source: trunk/CrypPlugins/PeerToPeerSubscriber/P2PSubscriberBase.cs @ 1137

Last change on this file since 1137 was 1137, checked in by arnold, 12 years ago

Completely redesigned Manager-JobAdmin-Worker-infrastructure to distribute Jobs with a Peer-to-Peer infrastructure to remote CT2-Workspaces.

To test this infrastructure, open two instances of CT; load P2P_Manager_NEW_DES.cte in one instance and P2P_Worker_NEW.cte in the other.
HINT: Working with remote peers is not always possible, because the so-called "SuperNode", which is necessary for relaying, sometimes goes down. Testing this infrastructure on different computers in the same network should, however, always work.

File size: 39.3 KB
Line 
1using System;
2using System.Collections.Generic;
3using System.Linq;
4using System.Text;
5using Cryptool.PluginBase.Control;
6using Cryptool.PluginBase;
7using System.Threading;
8
9/*
10 * All Subscriber functions work problem-free!
11 *
12 * IDEAS:
13 * - Publisher takes subscriber list out of the DHT and registered
14 *   itself with all subscribers pro-active (handle Register-Msg in Subscriber!)
15 */
16
namespace Cryptool.Plugins.PeerToPeer
{
    /// <summary>
    /// Subscriber side of a publish/subscribe protocol running over an <see cref="IP2PControl"/>.
    /// The subscriber looks up the publisher of a topic via <c>DHT_CommonManagement</c>, registers
    /// with it, periodically sends Alive messages, periodically re-checks the publisher (DHT lookup
    /// plus Ping/Pong) and forwards payload messages of the current publisher to
    /// <see cref="OnTextArrivedFromPublisher"/>.
    /// </summary>
    /// <remarks>
    /// All timers are <see cref="System.Threading.Timer"/> instances, so their callbacks do not run
    /// on the thread that called <see cref="Start"/>. This class does not take any locks.
    /// </remarks>
    public class P2PSubscriberBase
    {
        public delegate void GuiMessage(string sData, NotificationLevel notificationLevel);
        /// <summary>
        /// Raised by <see cref="GuiLogging"/> for every log message of this subscriber.
        /// </summary>
        public event GuiMessage OnGuiMessage;
        public delegate void TextArrivedFromPublisher(byte[] data, PeerId pid);
        /// <summary>
        /// Raised by <see cref="HandleIncomingData"/> for every payload message that was sent
        /// by the current publisher.
        /// </summary>
        public event TextArrivedFromPublisher OnTextArrivedFromPublisher;
        public delegate void ReceivedStopFromPublisher(PubSubMessageType stopType, string sData);
        /// <summary>
        /// Intended to be fired when the Manager sent a "stop" message to the worker.
        /// NOTE(review): nothing in this class raises this event; it is kept for interface
        /// compatibility. Confirm whether Stop/Solution handling should raise it.
        /// </summary>
        public event ReceivedStopFromPublisher OnReceivedStopMessageFromPublisher;

        #region Variables

        /// <summary>
        /// Period (in milliseconds) of the timer that retries registering while no
        /// publisher can be found in the DHT.
        /// </summary>
        private const int RetryRegisteringInterval = 10000;

        protected IP2PControl p2pControl;
        /// <summary>Period of the Alive timer; read from the DHT in CheckPublishersAvailability.</summary>
        private long sendAliveMessageInterval;
        /// <summary>Period of the publisher availability check; passed to Start.</summary>
        private long checkPublishersAvailability;
        /// <summary>Timespan to wait for a publisher's reply (RegisteringAccepted, Pong); passed to Start.</summary>
        private long publisherReplyTimespan;
        private string sTopic;

        /// <summary>
        /// If the DHT entry of the given task is empty, continuously try to
        /// find a publisher's PeerID that was stored in the meantime.
        /// </summary>
        private Timer timerRegisteringNotPossible;
        /// <summary>
        /// For proactively informing the publisher that this subscriber
        /// is still interested in this task.
        /// </summary>
        private Timer timerSendingAliveMsg;
        /// <summary>
        /// Checks liveness, availability and/or changes (new peer) of the publisher.
        /// </summary>
        private Timer timerCheckPubAvailability;
        /// <summary>
        /// Started when the availability of the publisher, at which the subscriber
        /// had registered, is checked. If the timer callback is called, no Pong
        /// message was received in time, so the publisher is probably down.
        /// </summary>
        private Timer timeoutForPublishersPong;
        /// <summary>
        /// Started after a Register message is sent to the publisher. If the publisher
        /// doesn't respond with a RegisteringAccepted message, the callback fires and
        /// registering is tried again.
        /// </summary>
        private Timer timeoutForPublishersRegAccept;
        /// <summary>
        /// PeerID of the current publisher. It is checked periodically and updated
        /// if the publisher has changed.
        /// </summary>
        private PeerId actualPublisher;
        public PeerId ActualPublisher
        {
            get { return this.actualPublisher; }
            set { this.actualPublisher = value; }
        }

        /// <summary>
        /// While true, the Alive timer, the "registering not possible" retry timer and the
        /// publisher availability timer are not (re-)created. Set by Stop and by a failed
        /// Register, cleared at the beginning of Register.
        /// </summary>
        private bool bolStopped = true;

        private bool started = false;
        /// <summary>
        /// Status flag which contains the state of the subscriber.
        /// </summary>
        public bool Started
        {
            get { return this.started; }
            private set { this.started = value; }
        }

        #endregion

        /* BEGIN: Only for experimental cases */

        /// <summary>
        /// Sends a Solution system message followed by the solution payload to the current publisher.
        /// Does nothing except logging a warning when no publisher is known yet.
        /// </summary>
        /// <param name="solutionData">payload that is sent to the publisher after the Solution message</param>
        public void SolutionFound(byte[] solutionData)
        {
            PeerId publisher = this.actualPublisher;
            if (publisher == null)
            {
                // previously both send calls were issued with a null receiver
                GuiLogging("Can't send solution, because no publisher is known.", NotificationLevel.Warning);
                return;
            }
            SendMessage(publisher, PubSubMessageType.Solution);
            this.p2pControl.SendToPeer(solutionData, publisher);
        }

        /* END: Only for experimental cases */

        /// <summary>
        /// Creates a subscriber that communicates over the given P2P control. Nothing is sent
        /// and no event is subscribed until <see cref="Start"/> is called.
        /// </summary>
        /// <param name="p2pControl">P2P control used for the DHT lookups and all message traffic</param>
        public P2PSubscriberBase(IP2PControl p2pControl)
        {
            this.p2pControl = p2pControl;
        }

        /// <summary>
        /// Stores the settings and tries to register with the publisher of the topic.
        /// </summary>
        /// <param name="sTopic">topic (task name) whose publisher is looked up</param>
        /// <param name="checkPublishersAvailability">period of the publisher availability check (used as Timer period)</param>
        /// <param name="publishersReplyTimespan">timespan to wait for RegisteringAccepted/Pong replies (used as Timer period)</param>
        public void Start(string sTopic, long checkPublishersAvailability, long publishersReplyTimespan)
        {
            this.sTopic = sTopic;
            this.checkPublishersAvailability = checkPublishersAvailability;
            this.publisherReplyTimespan = publishersReplyTimespan;
            Register();
        }

        /// <summary>
        /// (Re-)subscribes to the message events of the P2P control, looks up the publisher
        /// and sends a Register message to it. If no publisher is found, the subscriber is
        /// marked as not started; the retry timer created by CheckPublishersAvailability
        /// calls this method again later.
        /// </summary>
        private void Register()
        {
            // The handlers are removed by Stop, so they have to be added on every registering.
            // Removing them first makes sure that each handler is attached only once.
            this.p2pControl.OnPayloadMessageReceived -= p2pControl_OnPayloadMessageReceived;
            this.p2pControl.OnSystemMessageReceived -= p2pControl_OnSystemMessageReceived;
            this.p2pControl.OnPayloadMessageReceived += new P2PPayloadMessageReceived(p2pControl_OnPayloadMessageReceived);
            this.p2pControl.OnSystemMessageReceived += new P2PSystemMessageReceived(p2pControl_OnSystemMessageReceived);

            // CheckPublishersAvailability only creates its timers while bolStopped is false,
            // so the flag has to be cleared before calling it.
            this.bolStopped = false;
            PeerId pubId = CheckPublishersAvailability();
            if (pubId == null)
            {
                // no usable DHT entry: no publisher exists at present
                this.Started = false;
                this.bolStopped = true;
                GuiLogging("No publisher for registering found.", NotificationLevel.Info);
                return;
            }

            AdoptPublisher(pubId);
            SendMessage(pubId, PubSubMessageType.Register);
            this.Started = true;
        }

        /// <summary>
        /// Replaces the current publisher by <paramref name="pubId"/> (and logs the change)
        /// when a publisher is already known and differs from it.
        /// </summary>
        /// <param name="pubId">non-null publisher ID that was read from the DHT</param>
        private void AdoptPublisher(PeerId pubId)
        {
            if (actualPublisher != null && actualPublisher != pubId)
            {
                GuiLogging("Publisher has been changed from ID '" + actualPublisher + "' to '" + pubId + "'", NotificationLevel.Debug);
                actualPublisher = pubId;
            }
        }

        /// <summary>
        /// Handles system messages. Messages of peers other than the current publisher are
        /// only logged.
        /// </summary>
        /// <param name="sender">peer that sent the message</param>
        /// <param name="msgType">type of the received system message</param>
        private void p2pControl_OnSystemMessageReceived(PeerId sender, PubSubMessageType msgType)
        {
            if (sender != actualPublisher)
            {
                GuiLogging("RECEIVED message from third party peer (not the publisher!): " + msgType.ToString() + ", ID: " + sender, NotificationLevel.Debug);
                return;
            }
            // NOTE(review): Stop(msgType) also sends msgType back to the publisher.
            switch (msgType)
            {
                case PubSubMessageType.RegisteringAccepted:
                    GuiLogging("REGISTERING ACCEPTED received from publisher!", NotificationLevel.Info);
                    // the publisher answered, so the registering timeout is obsolete
                    DisposeTimer(ref this.timeoutForPublishersRegAccept);
                    break;
                case PubSubMessageType.Ping:
                    SendMessage(sender, PubSubMessageType.Pong);
                    GuiLogging("REPLIED to a ping message from " + sender, NotificationLevel.Debug);
                    break;
                case PubSubMessageType.Register:
                case PubSubMessageType.Unregister:
                    GuiLogging(msgType.ToString().ToUpper() + " received from PUBLISHER.", NotificationLevel.Debug);
                    // shut down completely and then register with the publisher again
                    Stop(msgType);
                    Register();
                    break;
                case PubSubMessageType.Solution:
                    Stop(msgType);
                    GuiLogging("Another Subscriber had found the solution!", NotificationLevel.Info);
                    break;
                case PubSubMessageType.Stop:
                    Stop(msgType);
                    GuiLogging("STOP received from publisher. Subscriber is stopped!", NotificationLevel.Warning);
                    break;
                case PubSubMessageType.Pong:
                    // the publisher is alive, so the pong timeout is obsolete
                    DisposeTimer(ref this.timeoutForPublishersPong);
                    break;
                case PubSubMessageType.Alive:
                default:
                    // not handled at the moment
                    break;
            }
        }

        /// <summary>
        /// Forwards payload messages of the current publisher to <see cref="HandleIncomingData"/>.
        /// Payload of other peers is only logged.
        /// </summary>
        /// <param name="sender">peer that sent the payload</param>
        /// <param name="data">received payload</param>
        private void p2pControl_OnPayloadMessageReceived(PeerId sender, byte[] data)
        {
            if (sender != actualPublisher)
            {
                GuiLogging("RECEIVED message from third party peer (not the publisher!): " + Encoding.UTF8.GetString(data) + ", ID: " + sender, NotificationLevel.Debug);
                return;
            }
            // the actual processing is virtual, so derived classes can replace it
            HandleIncomingData(sender, data);
        }

        /// <summary>
        /// Logs the incoming data (decoded as UTF-8) and raises <see cref="OnTextArrivedFromPublisher"/>.
        /// </summary>
        /// <param name="senderId">peer that sent the data</param>
        /// <param name="data">received payload</param>
        protected virtual void HandleIncomingData(PeerId senderId, byte[] data)
        {
            GuiLogging("RECEIVED: Message from '" + senderId
                    + "' with data: '" + Encoding.UTF8.GetString(data) + "'", NotificationLevel.Debug);

            // local copy: a handler may be removed by another thread between check and call
            TextArrivedFromPublisher handler = OnTextArrivedFromPublisher;
            if (handler != null)
                handler(data, senderId);
        }

        /// <summary>
        /// Sends a system message to the given peer. As a side effect the Alive timer is created
        /// if it doesn't exist and the subscriber isn't stopped. For Register messages the
        /// "registering not possible" timer is disposed and the timeout for the
        /// RegisteringAccepted reply is started.
        /// </summary>
        /// <param name="pubPeerId">receiver of the message</param>
        /// <param name="msgType">type of the message; unsupported types are logged and not sent</param>
        private void SendMessage(PeerId pubPeerId, PubSubMessageType msgType)
        {
            if (timerSendingAliveMsg == null && !this.bolStopped)
                timerSendingAliveMsg = new Timer(OnSendAliveMessage, null, sendAliveMessageInterval, sendAliveMessageInterval);

            switch (msgType)
            {
                case PubSubMessageType.Register:
                    // a publisher was found, so the retry timer isn't necessary any more
                    DisposeTimer(ref this.timerRegisteringNotPossible);
                    // start waiting interval for the RegisteringAccepted message
                    if (this.timeoutForPublishersRegAccept == null)
                        this.timeoutForPublishersRegAccept = new Timer(OnTimeoutRegisteringAccepted, null, this.publisherReplyTimespan, this.publisherReplyTimespan);
                    break;
                case PubSubMessageType.Alive:
                case PubSubMessageType.Ping:
                case PubSubMessageType.Pong:
                case PubSubMessageType.Unregister:
                case PubSubMessageType.Stop:
                case PubSubMessageType.Solution:
                    // Solution must not call Stop here: Stop itself calls SendMessage
                    break;
                default:
                    GuiLogging("No Message sent, because MessageType wasn't supported: " + msgType.ToString(), NotificationLevel.Warning);
                    return;
            }
            this.p2pControl.SendToPeer(msgType, pubPeerId);

            GuiLogging(msgType.ToString() + " message sent to Publisher", NotificationLevel.Debug);
        }

        /// <summary>
        /// Callback of timerRegisteringNotPossible: registering wasn't possible because no
        /// publisher had stored its ID in the DHT entry of the topic, so try again.
        /// </summary>
        /// <param name="state">unused</param>
        private void OnRegisteringNotPossible(object state)
        {
            Register();
        }

        /// <summary>
        /// Callback of timerSendingAliveMsg: sends an Alive message to the current publisher.
        /// </summary>
        /// <param name="state">unused</param>
        private void OnSendAliveMessage(object state)
        {
            SendMessage(actualPublisher, PubSubMessageType.Alive);
        }

        /// <summary>
        /// Reads the publisher's ID and the Alive message interval of the topic from the DHT.
        /// If no publisher is stored, a timer that retries registering periodically is started
        /// (unless the subscriber is stopped). On the first successful lookup the ID becomes the
        /// current publisher. The timer for periodically checking the publisher's availability
        /// is also started here (unless the subscriber is stopped).
        /// This method doesn't send any message; handling a changed publisher is up to the caller.
        /// </summary>
        /// <returns>
        /// the publisher's ID, or null when no publisher or no Alive message interval was found in the DHT
        /// </returns>
        private PeerId CheckPublishersAvailability()
        {
            PeerId pid = DHT_CommonManagement.GetTopicsPublisherId(ref this.p2pControl, this.sTopic);

            if (pid == null)
            {
                if (timerRegisteringNotPossible == null && !this.bolStopped)
                {
                    // the DHT value doesn't exist at this moment, so try again periodically
                    timerRegisteringNotPossible = new Timer(OnRegisteringNotPossible, null, RetryRegisteringInterval, RetryRegisteringInterval);
                }
                GuiLogging("Publisher wasn't found in DHT or settings didn't stored on the right way.", NotificationLevel.Debug);
                return null;
            }

            sendAliveMessageInterval = DHT_CommonManagement.GetAliveMessageInterval(ref this.p2pControl, this.sTopic);

            if (sendAliveMessageInterval == 0)
            {
                GuiLogging("Can't find AliveMsg-Settings from Publisher for the Subscriber.", NotificationLevel.Error);
                return null;
            }

            if (actualPublisher == null) // first time initialization
            {
                actualPublisher = pid;
                GuiLogging("First time received publishers ID.", NotificationLevel.Debug);
            }

            GuiLogging("RECEIVED: Publishers' peer ID '" + pid + "', Alive-Msg-Interval: " + sendAliveMessageInterval / 1000 + " sec!", NotificationLevel.Debug);

            // periodically check the availability of the publishing peer
            if (timerCheckPubAvailability == null && !this.bolStopped)
                timerCheckPubAvailability = new Timer(OnCheckPubAvailability, null, this.checkPublishersAvailability, this.checkPublishersAvailability);

            return pid;
        }

        /// <summary>
        /// Callback of timerCheckPubAvailability. If the DHT still contains the current publisher,
        /// a Ping is sent and the timeout for the Pong reply is started. If another peer took over
        /// publishing the topic, that peer becomes the current publisher and a Register message
        /// is sent to it.
        /// </summary>
        /// <param name="state">unused</param>
        private void OnCheckPubAvailability(object state)
        {
            PeerId newPubId = CheckPublishersAvailability();
            if (newPubId == null)
            {
                // nobody to ping; CheckPublishersAvailability has logged the reason
                return;
            }

            if (newPubId == actualPublisher)
            {
                // the timeout is disposed when a Pong message of the publisher is received
                SendMessage(actualPublisher, PubSubMessageType.Ping);
                if (timeoutForPublishersPong == null)
                {
                    timeoutForPublishersPong = new Timer(OnTimeoutPublishersPong, null, this.publisherReplyTimespan, this.publisherReplyTimespan);
                }
            }
            else if (!this.bolStopped)
            {
                // a pending Pong of the previous publisher isn't of interest any more
                DisposeTimer(ref this.timeoutForPublishersPong);
                AdoptPublisher(newPubId);
                SendMessage(newPubId, PubSubMessageType.Register);
            }
        }

        /// <summary>
        /// Callback of timeoutForPublishersRegAccept: the publisher didn't reply to the
        /// Register message in time, so registering is tried again.
        /// </summary>
        /// <param name="state">unused</param>
        private void OnTimeoutRegisteringAccepted(object state)
        {
            GuiLogging("TIMEOUT: Waiting for registering accepted message from publisher!", NotificationLevel.Debug);
            // try to register again
            Register();
        }

        /// <summary>
        /// Callback of timeoutForPublishersPong: the publisher didn't reply to the Ping message
        /// in time. The timeout is disposed, the publisher is looked up again and, if one is
        /// found while the subscriber is running, a Register message is sent to it.
        /// </summary>
        /// <param name="state">unused</param>
        private void OnTimeoutPublishersPong(object state)
        {
            GuiLogging("Publisher didn't answer on Ping in the given time span!", NotificationLevel.Info);
            DisposeTimer(ref this.timeoutForPublishersPong);

            // try to get an active publisher and re-register
            PeerId pubId = CheckPublishersAvailability();
            if (pubId == null || this.bolStopped)
            {
                // don't revive a subscriber that was stopped while this callback was pending
                return;
            }
            AdoptPublisher(pubId);
            SendMessage(pubId, PubSubMessageType.Register);
        }

        /// <summary>
        /// Stops the subscriber: sends <paramref name="msgType"/> to the current publisher
        /// (unless it is NULL or no publisher is known), disposes all timers, so no Register,
        /// Alive and Ping messages are sent any more, and removes the message handlers from
        /// the P2P control.
        /// </summary>
        /// <param name="msgType">
        /// message type that is sent to the publisher; pass PubSubMessageType.NULL to send nothing
        /// </param>
        public void Stop(PubSubMessageType msgType)
        {
            // set the flag first, so SendMessage doesn't create a new Alive timer
            this.bolStopped = true;
            if (actualPublisher != null && msgType != PubSubMessageType.NULL)
                SendMessage(actualPublisher, msgType);

            #region stopping all timers, if they are still active
            DisposeTimer(ref this.timerSendingAliveMsg);
            DisposeTimer(ref this.timerRegisteringNotPossible);
            DisposeTimer(ref this.timerCheckPubAvailability);
            DisposeTimer(ref this.timeoutForPublishersRegAccept);
            DisposeTimer(ref this.timeoutForPublishersPong);
            #endregion
            GuiLogging("All Timers were stopped successfully", NotificationLevel.Debug);

            this.p2pControl.OnSystemMessageReceived -= p2pControl_OnSystemMessageReceived;
            this.p2pControl.OnPayloadMessageReceived -= p2pControl_OnPayloadMessageReceived;

            this.Started = false;
            GuiLogging("Subscriber is completely stopped", NotificationLevel.Debug);
        }

        /// <summary>
        /// Sets the given timer field to null and disposes the timer it referenced, if any.
        /// The field is swapped atomically, so two threads can't dispose the same instance.
        /// </summary>
        /// <param name="timer">timer field to clear</param>
        private static void DisposeTimer(ref Timer timer)
        {
            Timer oldTimer = Interlocked.Exchange<Timer>(ref timer, null);
            if (oldTimer != null)
                oldTimer.Dispose();
        }

        /// <summary>
        /// Raises <see cref="OnGuiMessage"/> with the given text, if a handler is attached.
        /// </summary>
        /// <param name="sText">log message</param>
        /// <param name="notLev">notification level of the message</param>
        protected void GuiLogging(string sText, NotificationLevel notLev)
        {
            // local copy: a handler may be removed by another thread between check and call
            GuiMessage handler = OnGuiMessage;
            if (handler != null)
                handler(sText, notLev);
        }
    }
}
426
427
428//using System;
429//using System.Collections.Generic;
430//using System.Linq;
431//using System.Text;
432//using Cryptool.PluginBase.Control;
433//using Cryptool.PluginBase;
434//using System.Threading;
435
436///*
437// * All Subscriber functions work problem-free!
438// *
439// * IDEAS:
440// * - Publisher takes subscriber list out of the DHT and registered
441// *   itself with all subscribers pro-active (handle Register-Msg in Subscriber!)
442// */
443
444//namespace Cryptool.Plugins.PeerToPeer
445//{
446//    public class P2PSubscriberBase
447//    {
448//        public delegate void GuiMessage(string sData, NotificationLevel notificationLevel);
449//        public event GuiMessage OnGuiMessage;
450//        public delegate void TextArrivedFromPublisher(byte[] data, PeerId pid);
451//        public event TextArrivedFromPublisher OnTextArrivedFromPublisher;
452//        public delegate void ReceivedStopFromPublisher(PubSubMessageType stopType, string sData);
453//        /// <summary>
454//        /// fired when Manager sent "stop" message to the worker.
455//        /// </summary>
456//        public event ReceivedStopFromPublisher OnReceivedStopMessageFromPublisher;
457
458//        #region Variables
459
460//        protected IP2PControl p2pControl;
461//        private long sendAliveMessageInterval;
462//        private long checkPublishersAvailability;
463//        private long publisherReplyTimespan;
464//        private string sTopic;
465
466//        /// <summary>
467//        /// If the DHT Entry of the given Task is empty, continous try to
468//        /// find a meanwhile inscribed Publishers PeerID
469//        /// </summary>
470//        private Timer timerRegisteringNotPossible;
471//        /// <summary>
472//        /// For informing the publisher pro-active, that this subscriber
473//        /// is still interested in this Task.
474//        /// </summary>
475//        private Timer timerSendingAliveMsg;
476//        /// <summary>
477//        /// checking liveness, availability and/or changes (new peer) of the Publisher
478//        /// </summary>
479//        private Timer timerCheckPubAvailability;
480//        /// <summary>
481//        /// this timer gets started when the availability of the publisher,
482//        /// at which the subscriber had registered, is checked. If the timer
483//        /// callback is called and no Pong-message was received, the probability
484//        /// that the Publisher is down is high!
485//        /// </summary>
486//        private Timer timeoutForPublishersPong;
487//        /// <summary>
488//        /// After register message is sent to publisher, this timer gets started.
489//        /// If the publisher doesn't response with a RegisteringAccepted-Message,
490//        /// the probability that the publisher is down is high!
491//        /// </summary>
492//        private Timer timeoutForPublishersRegAccept;
493//        /// <summary>
494//        /// PeerID of the actual publisher. This ID is will be checked continious
495//        /// on liveliness and/or updated if Publisher had changed.
496//        /// </summary>
497//        private PeerId actualPublisher;
498//        /// <summary>
499//        /// PeerID of the actual publisher. This ID is will be checked continious
500//        /// on liveliness and/or updated if Publisher had changed
501//        /// </summary>
502//        public PeerId ActualPublisher
503//        {
504//            get { return this.actualPublisher; }
505//        }
506
507//        /// <summary>
508//        /// if true, check whether a new Publisher is the actual one
509//        /// and renew Settings
510//        /// </summary>
511//        private bool bolStopped = true;
512
513//        private bool started = false;
514//        /// <summary>
515//        /// Status flag which contains the state of the Subscriber
516//        /// </summary>
517//        public bool Started
518//        {
519//            get { return this.started;  }
520//            private set { this.started = value;  }
521//        }
522
523//        #endregion
524
525//        /* BEGIN: Only for experimental cases */
526
527//        public void SolutionFound(byte[] solutionData)
528//        {
529//            SendMessage(actualPublisher, PubSubMessageType.Solution);
530//            this.p2pControl.SendToPeer(solutionData, actualPublisher);
531//        }
532
533//        /* END: Only for experimental cases - 2010.02.07  */
534
535//        /* BEGIN: Added for P2PJobAdmin */
536//        public P2PSubscriberBase()
537//        {
538//        }
539
540//        public void Start(IP2PControl p2pControl, string sTopic, long checkPublishersAvailability, long publishersReplyTimespan)
541//        {
542//            this.p2pControl = p2pControl;
543//            Start(sTopic, checkPublishersAvailability, publisherReplyTimespan);
544//        }
545//        /* END: Added for P2PJobAdmin - 2010.02.07 */
546
547//        public P2PSubscriberBase(IP2PControl p2pControl)
548//        {
549//            this.p2pControl = p2pControl;
550//        }     
551
552//        public void Start(string sTopic, long checkPublishersAvailability, long publishersReplyTimespan)
553//        {
554//            this.sTopic = sTopic;
555//            this.checkPublishersAvailability = checkPublishersAvailability;
556//            this.publisherReplyTimespan = publishersReplyTimespan;
557//            Register();
558//        }
559
560//        private void Register()
561//        {
562//            // Unfortunately you have to register this events every time, because this events will be deregistered, when
563//            // Publisher/Manager sends a Unregister/Stop-Message... There isn't any possibility to check,
564//            // whether the Events are already registered (if(dings != null) or anything else).
565//            this.p2pControl.OnPayloadMessageReceived -= p2pControl_OnPayloadMessageReceived;
566//            this.p2pControl.OnSystemMessageReceived -= p2pControl_OnSystemMessageReceived;
567//            this.p2pControl.OnPayloadMessageReceived += new P2PPayloadMessageReceived(p2pControl_OnPayloadMessageReceived);
568//            this.p2pControl.OnSystemMessageReceived += new P2PSystemMessageReceived(p2pControl_OnSystemMessageReceived);
569
570//            // because CheckPublishersAvailability checks this value, set it for the first time here...
571//            // if bolStopped = true, the Timer for Checking Publishers liveliness doesn't start
572//            this.bolStopped = false;
573//            PeerId pubId = CheckPublishersAvailability();
574//            // if DHT Entry for the task is empty, no Publisher exists at present.
575//            // The method CheckPublishersAvailability starts a Timer for this case to continous proof Publisher-DHT-Entry
576//            if (pubId == null)
577//            {
578//                this.Started = false;
579//                // if PubId is null, the Publisher isn't started!
580//                this.bolStopped = true;
581//                GuiLogging("No publisher for registering found.", NotificationLevel.Info);
582//                return;
583//            }
584
585//            // when the actual publisher differs from the new detected publisher, change it
586//            if (pubId != null && (actualPublisher != null && actualPublisher != pubId))
587//            {
588//                GuiLogging("Publisher has been changed from ID '" + actualPublisher + "' to '" + pubId + "'", NotificationLevel.Debug);
589//                actualPublisher = pubId;
590//            }
591//            SendMessage(pubId, PubSubMessageType.Register);
592//            this.started = true;
593//        }
594
595//        private void p2pControl_OnSystemMessageReceived(PeerId sender, PubSubMessageType msgType)
596//        {
597//            if (sender != actualPublisher)
598//            {
599//                GuiLogging("RECEIVED message from third party peer (not the publisher!): " + msgType.ToString() + ", ID: " + sender, NotificationLevel.Debug);
600//                return;
601//            }
602//            switch (msgType)
603//            {
604//                case PubSubMessageType.RegisteringAccepted:
605//                    GuiLogging("REGISTERING ACCEPTED received from publisher!", NotificationLevel.Info);
606//                    if (this.timeoutForPublishersRegAccept != null)
607//                    {
608//                        this.timeoutForPublishersRegAccept.Dispose();
609//                        this.timeoutForPublishersRegAccept = null;
610//                    }
611//                    break;
612//                case PubSubMessageType.Ping:
613//                    SendMessage(sender, PubSubMessageType.Pong);
614//                    GuiLogging("REPLIED to a ping message from " + sender, NotificationLevel.Debug);
615//                    break;
616//                case PubSubMessageType.Register:
617//                case PubSubMessageType.Unregister:
618//                    GuiLogging(msgType.ToString().ToUpper() + " received from PUBLISHER.", NotificationLevel.Debug);
619//                    // continuously try to get a unregister and than re-register with publisher
620//                    Stop(msgType);
621//                    Register();
622//                    break;
623//                case PubSubMessageType.Solution:
624//                    Stop(msgType);
625//                    GuiLogging("Another Subscriber had found the solution!", NotificationLevel.Info);
626//                    break;
627//                case PubSubMessageType.Stop:
628//                    Stop(msgType);
629//                    GuiLogging("STOP received from publisher. Subscriber is stopped!", NotificationLevel.Warning);
630//                    break;
631//                case PubSubMessageType.Pong:
632//                    if (this.timeoutForPublishersPong != null)
633//                    {
634//                        this.timeoutForPublishersPong.Dispose();
635//                        this.timeoutForPublishersPong = null;
636//                    }
637//                    break;
638//                case PubSubMessageType.Alive:
639//                default:
640//                    // not possible at the moment
641//                    break;
642//            }
643//        }
644
645//        private void p2pControl_OnPayloadMessageReceived(PeerId sender, byte[] data)
646//        {
647//            if (sender != actualPublisher)
648//            {
649//                GuiLogging("RECEIVED message from third party peer (not the publisher!): " + UTF8Encoding.UTF8.GetString(data) + ", ID: " + sender, NotificationLevel.Debug);
650//                return;
651//            }
652//            // functionality swapped for better inheritance
653//            HandleIncomingData(sender, data);
654//        }
655
656//        /// <summary>
657//        /// Incoming data will be printed in the information field and the OnTextArrivedEvent will be thrown
658//        /// </summary>
659//        /// <param name="senderId"></param>
660//        /// <param name="sData"></param>
661//        protected virtual void HandleIncomingData(PeerId senderId, byte[] data)
662//        {
663//            GuiLogging("RECEIVED: Message from '" + senderId
664//                    + "' with data: '" + UTF8Encoding.UTF8.GetString(data) + "'", NotificationLevel.Debug);
665
666//            if (OnTextArrivedFromPublisher != null)
667//                OnTextArrivedFromPublisher(data, senderId);
668//        }
669
670//        private void SendMessage(PeerId pubPeerId, PubSubMessageType msgType)
671//        {
672//            if (timerSendingAliveMsg == null && !this.bolStopped)
673//                timerSendingAliveMsg = new Timer(OnSendAliveMessage, null, sendAliveMessageInterval, sendAliveMessageInterval);
674
675//            switch (msgType)
676//            {
677//                case PubSubMessageType.Register:
678//                    // stop the "RegisteringNotPossible" timer
679//                    if (timerRegisteringNotPossible != null)
680//                    {
681//                        timerRegisteringNotPossible.Dispose();
682//                        timerRegisteringNotPossible = null;
683//                    }
684//                    // start waiting interval for RegAccept Message
685//                    if (this.timeoutForPublishersRegAccept == null)
686//                        this.timeoutForPublishersRegAccept = new Timer(OnTimeoutRegisteringAccepted, null, this.publisherReplyTimespan, this.publisherReplyTimespan);
687//                    break;
688//                case PubSubMessageType.Alive:
689//                case PubSubMessageType.Ping:
690//                case PubSubMessageType.Pong:
691//                case PubSubMessageType.Unregister:
692//                case PubSubMessageType.Stop:
693//                case PubSubMessageType.Solution:
694//                    break;
695//                //case PubSubMessageType.Solution:
696//                //    // when i send Solution to the Stop method, we will run into a recursive loop between SendMessage and Stop!
697//                //    Stop(PubSubMessageType.NULL);
698//                //    break;
699//                default:
700//                    GuiLogging("No Message sent, because MessageType wasn't supported: " + msgType.ToString(), NotificationLevel.Warning);
701//                    return;
702//            }
703//            this.p2pControl.SendToPeer(msgType, pubPeerId);
704
705//            GuiLogging(msgType.ToString() + " message sent to Publisher", NotificationLevel.Debug);
706//        }
707
708//        // registering isn't possible if no publisher has stored
709//        // its ID in the DHT Entry with the Key TaskName
710//        private void OnRegisteringNotPossible(object state)
711//        {
712//            Register();
713//        }
714
715//        private void OnSendAliveMessage(object state)
716//        {
717//            SendMessage(actualPublisher, PubSubMessageType.Alive);
718//        }
719
720//        /// <summary>
721//        /// Returns the actual Publisher's ID, or null when no publisher was found in the DHT. In the second case,
722//        /// a Timer will be started to periodically check the DHT entry.
723//        /// When the publisher's entry changes the Publisher's ID, a Register message will be sent to the new Publisher.
724//        /// The Timer for periodically checking the Publisher's availability is also started here.
725//        /// </summary>
726//        /// <returns>the actual Publishers ID or null, when a publisher wasn't found in the DHT</returns>
727//        private PeerId CheckPublishersAvailability()
728//        {
729//            PeerId pid = DHT_CommonManagement.GetTopicsPublisherId(ref this.p2pControl, this.sTopic);
730
731//            if (pid == null)
732//            {
733//                if (timerRegisteringNotPossible == null && !this.bolStopped)
734//                {
735//                    // if DHT value doesn't exist at this moment, wait for 10 seconds and try again
736//                    timerRegisteringNotPossible = new Timer(OnRegisteringNotPossible, null, 10000, 10000);
737//                }
738//                GuiLogging("Publisher wasn't found in DHT or settings didn't stored on the right way.", NotificationLevel.Debug);
739//                return null;
740//            }
741
742//            sendAliveMessageInterval = DHT_CommonManagement.GetAliveMessageInterval(ref this.p2pControl, this.sTopic);
743
744//            if (sendAliveMessageInterval == 0)
745//            {
746//                GuiLogging("Can't find AliveMsg-Settings from Publisher for the Subscriber.", NotificationLevel.Error);
747//                return null;
748//            }
749
750//            if (actualPublisher == null) //first time initialization
751//            {
752//                actualPublisher = pid;
753//                GuiLogging("First time received publishers ID.", NotificationLevel.Debug);
754//            }
755//            //else if (actualPublisher != pid)
756//            //{
757//            //    GuiLogging("Publisher has been changed from ID '" + actualPublisher + "' to '" + pubId + "'", NotificationLevel.Debug);
758//            //    SendMessage(pubId, PubSubMessageType.Register);
759//            //    actualPublisher = pid;
760//            //}
761
762//            GuiLogging("RECEIVED: Publishers' peer ID '" + pid + "', Alive-Msg-Interval: " + sendAliveMessageInterval / 1000 + " sec!", NotificationLevel.Debug);
763
764//            // set a timer to periodically check the availability of the publishing peer
765//            if (timerCheckPubAvailability == null && !this.bolStopped)
766//                timerCheckPubAvailability = new Timer(OnCheckPubAvailability, null, this.checkPublishersAvailability, this.checkPublishersAvailability);
767
768//            return pid;
769//        }
770
771//        /// <summary>
772//        /// Callback for timerCheckPubAvailability (adjustable parameter
773//        /// in settings, usually every 60 seconds). If another Peer
774//        /// takes over Publishing the Task, this will be handled in this callback, too.
775//        /// </summary>
776//        /// <param name="state"></param>
777//        private void OnCheckPubAvailability(object state)
778//        {
779//            PeerId newPubId = CheckPublishersAvailability();
780
781//            if (newPubId == actualPublisher)
782//            {
783//                // Timer will be only stopped, when OnMessageReceived-Event received
784//                // a Pong-Response from the publisher!
785//                SendMessage(actualPublisher, PubSubMessageType.Ping);
786//                if (timeoutForPublishersPong == null)
787//                {
788//                    timeoutForPublishersPong = new Timer(OnTimeoutPublishersPong, null, this.publisherReplyTimespan, this.publisherReplyTimespan);
789//                }
790//            }
791//            if (newPubId != actualPublisher)
792//                Register();
793//        }
794
795//        /// <summary>
796//        /// This callback is only fired when the publisher didn't send a response to the register message.
797//        /// </summary>
798//        /// <param name="state"></param>
799//        private void OnTimeoutRegisteringAccepted(object state)
800//        {
801//            GuiLogging("TIMEOUT: Waiting for registering accepted message from publisher!", NotificationLevel.Debug);
802//            // try to register again
803//            Register();
804//        }
805
806//        /// <summary>
807//        /// This callback is only fired when the publisher didn't send a response to the ping message.
808//        /// </summary>
809//        /// <param name="state"></param>
810//        private void OnTimeoutPublishersPong(object state)
811//        {
812//            GuiLogging("Publisher didn't answer on Ping in the given time span!", NotificationLevel.Warning);
813//            if (timeoutForPublishersPong != null)
814//            {
815//                timeoutForPublishersPong.Dispose();
816//                timeoutForPublishersPong = null;
817//            }
818//            // try to get an active publisher and re-register
819//            CheckPublishersAvailability();
820//        }
821
822//        /// <summary>
823//        /// Stops all timers, so that the Subscriber stops sending
824//        /// Register-, Alive- and Pong-messages. Furthermore, an
825//        /// unregister message will be sent to the publisher
826//        /// </summary>
827//        public void Stop(PubSubMessageType msgType)
828//        {
829//            this.bolStopped = true;
830//            if (actualPublisher != null && msgType != PubSubMessageType.NULL)
831//                SendMessage(actualPublisher, msgType);
832
833//            #region stopping all timers, if they are still active
834//            if (this.timerSendingAliveMsg != null)
835//            {
836//                this.timerSendingAliveMsg.Dispose();
837//                this.timerSendingAliveMsg = null;
838//            }
839//            if (this.timerRegisteringNotPossible != null)
840//            {
841//                this.timerRegisteringNotPossible.Dispose();
842//                this.timerRegisteringNotPossible = null;
843//            }
844//            if (this.timerCheckPubAvailability != null)
845//            {
846//                this.timerCheckPubAvailability.Dispose();
847//                this.timerCheckPubAvailability = null;
848//            }
849//            if (this.timeoutForPublishersRegAccept != null)
850//            {
851//                this.timeoutForPublishersRegAccept.Dispose();
852//                this.timeoutForPublishersRegAccept = null;
853//            }
854//            if (this.timeoutForPublishersPong != null)
855//            {
856//                this.timeoutForPublishersPong.Dispose();
857//                this.timeoutForPublishersPong = null;
858//            }
859//            #endregion
860//            GuiLogging("All Timers were stopped successfully", NotificationLevel.Debug);
861
862//            this.p2pControl.OnSystemMessageReceived -= p2pControl_OnSystemMessageReceived;
863//            this.p2pControl.OnPayloadMessageReceived -= p2pControl_OnPayloadMessageReceived;
864
865//            this.Started = false;
866//            GuiLogging("Subscriber is completely stopped",NotificationLevel.Debug);
867//        }
868
869//        protected void GuiLogging(string sText, NotificationLevel notLev)
870//        {
871//            if (OnGuiMessage != null)
872//                OnGuiMessage(sText, notLev);
873//        }
874//    }
875//}
Note: See TracBrowser for help on using the repository browser.