source: trunk/CrypPlugins/PeerToPeerSubscriber/P2PSubscriberBase.cs @ 1144

Last change on this file since 1144 was 1144, checked in by arnold, 12 years ago

P2PManager/P2PJobAdmin: Finetuning of leaving and re-joining the network.
Miscellaneous null checks implemented, so some P2P-side errors are now caught.

File size: 21.2 KB
Line 
1using System;
2using System.Collections.Generic;
3using System.Linq;
4using System.Text;
5using Cryptool.PluginBase.Control;
6using Cryptool.PluginBase;
7using System.Timers;
8
9/*
10 * All Subscriber functions work problem-free!
11 *
12 * IDEAS:
13 * - Publisher takes subscriber list out of the DHT and registered
14 *   itself with all subscribers pro-active (handle Register-Msg in Subscriber!)
15 */
16
17namespace Cryptool.Plugins.PeerToPeer
18{
19    public class P2PSubscriberBase
20    {
21        public delegate void GuiMessage(string sData, NotificationLevel notificationLevel);
22        public event GuiMessage OnGuiMessage;
23        public delegate void TextArrivedFromPublisher(byte[] data, PeerId pid);
24        public event TextArrivedFromPublisher OnTextArrivedFromPublisher;
25        public delegate void ReceivedStopFromPublisher(PubSubMessageType stopType, string sData);
26        /// <summary>
27        /// fired when Manager sent "stop" message to the worker.
28        /// </summary>
29        public event ReceivedStopFromPublisher OnReceivedStopMessageFromPublisher;
30
31        #region Variables
32
33        protected IP2PControl p2pControl;
34        private long sendAliveMessageInterval;
35        private long checkPublishersAvailability;
36        private long publisherReplyTimespan;
37        private string sTopic;
38
39        /// <summary>
40        /// Checking liveness, availability and/or changes (new peer) of the Publisher.
41        /// Retrieves the required DHT entries and initiates the necessary steps
42        /// </summary>
43        private Timer timerCheckPubAvailability;
44        /// <summary>
45        /// To inform the publisher pro-active, that this subscriber
46        /// is still alive and interested in this Task, send periodical
47        /// alive messages.
48        /// </summary>
49        private Timer timerSendingAliveMsg;
50        /// <summary>
51        /// This timer gets started when a DHT entry for a Publisher exists and
52        /// we want to check the liveness of the publisher,
53        /// at which the subscriber had registered. Therefore we send a Ping message
54        /// to the Publisher. If the timer callback is called and no Pong-Response was
55        /// received from the Publisher, the probability is high, that the Publisher is down!
56        /// </summary>
57        private Timer timeoutForPublishersPong;
58        /// <summary>
59        /// After register message is sent to publisher, this timer is started.
60        /// If the publisher doesn't responds with a RegisteringAccepted-Message,
61        /// the probability is high, that the publisher is down!
62        /// </summary>
63        private Timer timeoutForPublishersRegAccept;
64        /// <summary>
65        /// PeerID of the actual publisher. This ID is will be checked continious
66        /// on liveliness and/or updated if Publisher had changed.
67        /// </summary>
68        private PeerId actualPublisher;
69        public PeerId ActualPublisher
70        {
71            get { return this.actualPublisher; }
72            set { this.actualPublisher = value; }
73        }
74
75        /// <summary>
76        /// if true, check whether a new Publisher is the actual one
77        /// and renew Settings
78        /// </summary>
79        private bool bolStopped = true;
80
81        private bool started = false;
82        /// <summary>
83        /// Status flag which contains the state of the Subscriber
84        /// </summary>
85        public bool Started
86        {
87            get { return this.started; }
88            private set { this.started = value; }
89        }
90
91        #endregion
92
93        /* BEGIN: Only for experimental cases */
94
95        public void SolutionFound(byte[] solutionData)
96        {
97            SendMessage(actualPublisher, PubSubMessageType.Solution);
98            this.p2pControl.SendToPeer(solutionData, actualPublisher);
99        }
100
101        /* END: Only for experimental cases */
102
103        public P2PSubscriberBase(IP2PControl p2pControl)
104        {
105            this.p2pControl = p2pControl;
106
107            this.timeoutForPublishersPong = new Timer();
108            this.timeoutForPublishersPong.Elapsed += new ElapsedEventHandler(OnTimeoutPublishersPong);
109
110            this.timeoutForPublishersRegAccept = new Timer();
111            this.timeoutForPublishersRegAccept.Elapsed += new ElapsedEventHandler(OnTimeoutRegisteringAccepted);
112
113            this.timerCheckPubAvailability = new Timer();
114            this.timerCheckPubAvailability.Elapsed += new ElapsedEventHandler(OnCheckPubAvailability);
115
116            this.timerSendingAliveMsg = new Timer();
117            this.timerSendingAliveMsg.Elapsed += new ElapsedEventHandler(OnSendAliveMessage);
118        }
119
120        public void Start(string sTopic, long checkPublishersAvailability, long publishersReplyTimespan)
121        {
122            this.actualPublisher = null;
123
124            this.sTopic = sTopic;
125            this.checkPublishersAvailability = checkPublishersAvailability;
126            this.publisherReplyTimespan = publishersReplyTimespan;
127
128            #region Initialize network-maintanance-Timers
129
130            double pubTimeResponseTimeout = Convert.ToDouble(this.publisherReplyTimespan);
131
132            this.timerCheckPubAvailability.Interval = Convert.ToDouble(this.checkPublishersAvailability);
133            this.timerCheckPubAvailability.AutoReset = true;
134            this.timeoutForPublishersPong.Interval = pubTimeResponseTimeout;
135            this.timeoutForPublishersRegAccept.Interval = pubTimeResponseTimeout;
136
137            #endregion
138
139            this.p2pControl.OnPayloadMessageReceived += new P2PPayloadMessageReceived(p2pControl_OnPayloadMessageReceived);
140            this.p2pControl.OnSystemMessageReceived += new P2PSystemMessageReceived(p2pControl_OnSystemMessageReceived);
141
142            if (this.p2pControl != null)
143            {
144                string sNonrelevant;
145                PeerId myPeerId = this.p2pControl.GetPeerID(out sNonrelevant);
146                GuiLogging("Started Subscriber with ID: '" + myPeerId.ToString() + "'", NotificationLevel.Info);
147            }
148
149            this.Started = true;
150
151            CheckPublishersAvailability2();
152        }
153
154        private void CheckPublishersAvailability2()
155        {
156            // retrieve publisher information from the DHT
157            PeerId pid = DHT_CommonManagement.GetTopicsPublisherId(ref this.p2pControl, this.sTopic);
158            this.sendAliveMessageInterval = DHT_CommonManagement.GetAliveMessageInterval(ref this.p2pControl, this.sTopic);
159
160            if (pid == null || this.sendAliveMessageInterval == 0)
161            {
162                GuiLogging("No Publisher/Manager information for registering found in the DHT.", NotificationLevel.Info);
163            }
164            else
165            {
166                this.timerSendingAliveMsg.Interval = Convert.ToDouble(this.sendAliveMessageInterval);
167                this.timerSendingAliveMsg.Start();
168
169                if (actualPublisher == null)
170                {
171                    GuiLogging("Found a Publisher/Manager with ID '" + pid.ToString() + ", so register with it.", NotificationLevel.Info);
172                    SendMessage(pid, PubSubMessageType.Register);
173                    timeoutForPublishersRegAccept.Start();
174                }
175                else if (actualPublisher == pid)
176                {
177                    // Timer will be only stopped, when OnMessageReceived-Event received
178                    // a Pong-Response from the publisher!
179                    SendMessage(pid, PubSubMessageType.Ping);
180                    this.timeoutForPublishersPong.Start();
181                    GuiLogging("Successfully checked publishers'/managers' information in the DHT. To check liveness, a Ping message was sended to '" + pid.ToString() + "'.", NotificationLevel.Debug);
182                }
183                else
184                {
185                    GuiLogging("The Publisher/Manager had changed from '" + this.actualPublisher.ToString()
186                        + "' to '" + pid.ToString() + "'. Register with the new Publisher/Manager.", NotificationLevel.Info);
187                    SendMessage(pid, PubSubMessageType.Register);
188                    timeoutForPublishersRegAccept.Start();
189                }
190                this.actualPublisher = pid;
191            }
192            this.timerCheckPubAvailability.Enabled = true;
193        }
194
195        private void p2pControl_OnSystemMessageReceived(PeerId sender, PubSubMessageType msgType)
196        {
197            if (sender != this.actualPublisher)
198            {
199                GuiLogging("RECEIVED message from third party peer (not the publisher!): " + msgType.ToString() + ", ID: " + sender, NotificationLevel.Debug);
200                return;
201            }
202            switch (msgType)
203            {
204                case PubSubMessageType.RegisteringAccepted:
205                    GuiLogging("REGISTERING ACCEPTED received from publisher!", NotificationLevel.Info);
206                    this.timeoutForPublishersRegAccept.Stop();
207                    break;
208                case PubSubMessageType.Ping:
209                    SendMessage(sender, PubSubMessageType.Pong);
210                    GuiLogging("REPLIED to a ping message from " + sender, NotificationLevel.Debug);
211                    break;
212                case PubSubMessageType.Register:
213                case PubSubMessageType.Unregister:
214                    GuiLogging(msgType.ToString().ToUpper() + " received from PUBLISHER.", NotificationLevel.Debug);
215                    // continuously try to get a unregister and than re-register with publisher
216                    Stop(msgType);
217                    CheckPublishersAvailability2();
218                    break;
219                case PubSubMessageType.Solution:
220                    Stop(msgType);
221                    GuiLogging("Another Subscriber had found the solution!", NotificationLevel.Info);
222                    break;
223                case PubSubMessageType.Stop:
224                    Stop(msgType);
225                    GuiLogging("STOP received from publisher. Subscriber is stopped!", NotificationLevel.Warning);
226                    break;
227                case PubSubMessageType.Pong:
228                    SendMessage(sender, PubSubMessageType.Register);
229                    this.timeoutForPublishersPong.Stop();
230                    break;
231                case PubSubMessageType.Alive:
232                default:
233                    // not possible at the moment
234                    break;
235            }
236            // workaround, because Timers.Timer doesn't contains a "Reset" method --> when receiving a
237            // message from the Publisher, we can reset the "check pub availability"-interval time!
238            this.timerCheckPubAvailability.Enabled = false;
239            this.timerCheckPubAvailability.Enabled = true;
240        }
241
242        private void p2pControl_OnPayloadMessageReceived(PeerId sender, byte[] data)
243        {
244            if (sender != actualPublisher)
245            {
246                GuiLogging("RECEIVED message from third party peer (not the publisher!): " + UTF8Encoding.UTF8.GetString(data) + ", ID: " + sender, NotificationLevel.Debug);
247                return;
248            }
249            // functionality swapped for better inheritance
250            HandleIncomingData(sender, data);
251        }
252
253        /// <summary>
254        /// Incoming data will be printed in the information field and the OnTextArrivedEvent will be thrown
255        /// </summary>
256        /// <param name="senderId"></param>
257        /// <param name="sData"></param>
258        protected virtual void HandleIncomingData(PeerId senderId, byte[] data)
259        {
260            GuiLogging("RECEIVED: Message from '" + senderId
261                    + "' with data: '" + UTF8Encoding.UTF8.GetString(data) + "'", NotificationLevel.Debug);
262
263            if (OnTextArrivedFromPublisher != null)
264                OnTextArrivedFromPublisher(data, senderId);
265        }
266
267        private void SendMessage(PeerId pubPeerId, PubSubMessageType msgType)
268        {
269            switch (msgType)
270            {
271                case PubSubMessageType.Register:
272                    // start waiting interval for RegAccept Message
273                    this.timeoutForPublishersRegAccept.Start();
274                    break;
275                case PubSubMessageType.Alive:
276                case PubSubMessageType.Ping:
277                case PubSubMessageType.Pong:
278                case PubSubMessageType.Unregister:
279                case PubSubMessageType.Stop:
280                case PubSubMessageType.Solution:
281                    break;
282                //case PubSubMessageType.Solution:
283                //    // when i send Solution to the Stop method, we will run into a recursive loop between SendMessage and Stop!
284                //    Stop(PubSubMessageType.NULL);
285                //    break;
286                default:
287                    GuiLogging("No Message sent, because MessageType wasn't supported: " + msgType.ToString(), NotificationLevel.Warning);
288                    return;
289            }
290            // it doesn't care which message type was sent, but when it was sent to the publisher, reset this AliveMsg Timer
291            if (pubPeerId == actualPublisher)
292            {
293                this.timerSendingAliveMsg.Enabled = false;
294                this.timerSendingAliveMsg.Enabled = true;
295            }
296
297            this.p2pControl.SendToPeer(msgType, pubPeerId);
298
299            GuiLogging(msgType.ToString() + " message sent to Publisher ID '" + pubPeerId.ToString() + "'.", NotificationLevel.Debug);
300        }
301
302        private void OnSendAliveMessage(object sender, ElapsedEventArgs e)
303        {
304            SendMessage(actualPublisher, PubSubMessageType.Alive);
305        }
306
307        /// <summary>
308        /// Callback for timerCheckPubAvailability (adjustable parameter
309        /// in settings, usually every 60 seconds). If another Peer
310        /// takes over Publishing the Task, this and many other things
311        /// will be initiated here
312        /// </summary>
313        /// <param name="state"></param>
314        private void OnCheckPubAvailability(object sender, ElapsedEventArgs e)
315        {
316            CheckPublishersAvailability2();
317
318            //PeerId newPubId = CheckPublishersAvailability();
319
320            //if (newPubId == actualPublisher)
321            //{
322            //    // Timer will be only stopped, when OnMessageReceived-Event received
323            //    // a Pong-Response from the publisher!
324            //    SendMessage(actualPublisher, PubSubMessageType.Ping);
325            //    this.timeoutForPublishersPong.Start();
326            //}
327        }
328
329        /// <summary>
330        /// This callback is only fired, when the publisher didn't sent a response on the register message.
331        /// </summary>
332        /// <param name="state"></param>
333        private void OnTimeoutRegisteringAccepted(object sender, ElapsedEventArgs e)
334        {
335            GuiLogging("TIMEOUT: Waiting for registering accepted message from publisher!", NotificationLevel.Debug);
336            // try to register again
337           
338            //Register();
339            CheckPublishersAvailability2();
340        }
341
342        /// <summary>
343        /// This callback os only fired, when the publisher didn't sent a response on the ping message.
344        /// </summary>
345        /// <param name="state"></param>
346        private void OnTimeoutPublishersPong(object sender, ElapsedEventArgs e)
347        {
348            GuiLogging("Publisher didn't respond on subscribers' ping message in the given time span!", NotificationLevel.Info);
349            this.timeoutForPublishersPong.Stop();
350            // try to get an active publisher and re-register
351
352            //CheckPublishersAvailability();
353            CheckPublishersAvailability2();
354        }
355
356        /// <summary>
357        /// Will stop all timers, so Subscriber ends with sending
358        /// Register-, Alive- and Pong-messages. Furthermore an
359        /// unregister message will be send to the publisher
360        /// </summary>
361        public void Stop(PubSubMessageType msgType)
362        {
363            if (actualPublisher != null)
364                SendMessage(actualPublisher, msgType);
365
366            #region stopping all timers, if they are still active
367
368            this.timeoutForPublishersPong.Stop();
369            this.timeoutForPublishersRegAccept.Stop();
370            this.timerSendingAliveMsg.Stop();
371
372            //when Sub received a UnReg message, it haven't stop
373            //the registering not possible worker, to connect to
374            //a new incoming Publisher
375            if (msgType == PubSubMessageType.Stop)
376            {
377                this.bolStopped = true;
378
379                this.timerCheckPubAvailability.Stop();
380
381                this.Started = false;
382                this.p2pControl.OnSystemMessageReceived -= p2pControl_OnSystemMessageReceived;
383                this.p2pControl.OnPayloadMessageReceived -= p2pControl_OnPayloadMessageReceived;
384                GuiLogging("Subscriber/Worker is completely stopped", NotificationLevel.Debug);
385            }
386            else
387            {
388                GuiLogging("Publisher/Manager had left the network, waiting for its comeback or takeover by a new Publisher/Manager.", NotificationLevel.Info);
389            }
390
391            #endregion
392
393            GuiLogging("All Timers were stopped successfully", NotificationLevel.Debug);
394        }
395
396        protected void GuiLogging(string sText, NotificationLevel notLev)
397        {
398            if (OnGuiMessage != null)
399                OnGuiMessage(sText, notLev);
400        }
401    }
402}
403
404
405
406//private void Register()
407//        {
408//            // because CheckPublishersAvailability checks this value, set it for the first time here...
409//            // if bolStopped = true, the Timer for Checking Publishers liveliness doesn't start
410//            this.bolStopped = false;
411//            PeerId pubId = CheckPublishersAvailability();
412//            // if DHT Entry for the task is empty, no Publisher exists at present.
413//            // The method CheckPublishersAvailability starts a Timer for this case to continous proof Publisher-DHT-Entry
414//            if (pubId == null)
415//            {
416//                this.Started = false;
417//                // if PubId is null, the Publisher isn't started!
418//                this.bolStopped = true;
419//                GuiLogging("No publisher for registering found.", NotificationLevel.Info);
420//                return;
421//            }
422
423//            // when the actual publisher differs from the new detected publisher, change it
424//            if (pubId != null && (actualPublisher != null && actualPublisher != pubId))
425//            {
426//                GuiLogging("Publisher has been changed from ID '" + actualPublisher + "' to '" + pubId + "'", NotificationLevel.Debug);
427//                actualPublisher = pubId;
428//            }
429//            SendMessage(pubId, PubSubMessageType.Register);
430//            this.timeoutForPublishersRegAccept.Start();
431//            this.started = true;
432//        }
433
434//        /// <summary>
435//        /// Returns the actual Publishers ID or null, when a publisher wasn't found in the DHT. In the second case,
436//        /// a Timer will be started, to check periodically the DHT entry.
437//        /// When the publishers entry changed the Publishers ID, a Register-message will be send to the new Publisher.
438//        /// The Timer for periodically checking the Publishers availability is also started here.
439//        /// </summary>
440//        /// <returns>the actual Publishers ID or null, when a publisher wasn't found in the DHT</returns>
441//        private PeerId CheckPublishersAvailability()
442//        {
443//            PeerId pid = DHT_CommonManagement.GetTopicsPublisherId(ref this.p2pControl, this.sTopic);
444
445//            if (pid == null)
446//            {
447//                // do nothing, because every time this method will be invoked by
448//                // the timerCheckPubAvailability-Event, the DHT entry will be checked
449//                GuiLogging("Publisher wasn't found in DHT or settings didn't stored on the right way.", NotificationLevel.Debug);
450//                return null;
451//            }
452
453//            sendAliveMessageInterval = DHT_CommonManagement.GetAliveMessageInterval(ref this.p2pControl, this.sTopic);
454
455//            if (sendAliveMessageInterval == 0)
456//            {
457//                GuiLogging("Can't find AliveMsg-Settings from Publisher for the Subscriber.", NotificationLevel.Error);
458//                return null;
459//            }
460//            this.timerSendingAliveMsg.Interval = Convert.ToDouble(sendAliveMessageInterval);
461//            this.timerSendingAliveMsg.Start();
462
463//            if (actualPublisher == null) //first time initialization
464//            {
465//                actualPublisher = pid;
466//                GuiLogging("First time received publishers ID.", NotificationLevel.Debug);
467//            }
468
469//            GuiLogging("RECEIVED: Publishers' peer ID '" + pid + "', Alive-Msg-Interval: " + sendAliveMessageInterval / 1000 + " sec!", NotificationLevel.Debug);
470
471//            this.timerCheckPubAvailability.Start();
472//            // setting timer to check periodical the availability of the publishing peer
473
474//            return pid;
475//        }
Note: See TracBrowser for help on using the repository browser.