source: trunk/CrypPlugins/PeerToPeerSubscriber/P2PSubscriberBase.cs @ 1130

Last change on this file since 1130 was 1130, checked in by arnold, 12 years ago

Added references to some P2PProjects, so the global SLN compiles correctly again

File size: 19.0 KB
Line 
1using System;
2using System.Collections.Generic;
3using System.Linq;
4using System.Text;
5using Cryptool.PluginBase.Control;
6using Cryptool.PluginBase;
7using System.Threading;
8
9/*
10 * All Subscriber functions work problem-free!
11 *
12 * IDEAS:
13 * - Publisher takes subscriber list out of the DHT and registered
14 *   itself with all subscribers pro-active (handle Register-Msg in Subscriber!)
15 */
16
17namespace Cryptool.Plugins.PeerToPeer
18{
19    public class P2PSubscriberBase
20    {
21        public delegate void GuiMessage(string sData, NotificationLevel notificationLevel);
22        public event GuiMessage OnGuiMessage;
23        public delegate void TextArrivedFromPublisher(byte[] data, PeerId pid);
24        public event TextArrivedFromPublisher OnTextArrivedFromPublisher;
25        public delegate void ReceivedStopFromPublisher(PubSubMessageType stopType, string sData);
26        /// <summary>
27        /// fired when Manager sent "stop" message to the worker.
28        /// </summary>
29        public event ReceivedStopFromPublisher OnReceivedStopMessageFromPublisher;
30
31        #region Variables
32
33        protected IP2PControl p2pControl;
34        private long sendAliveMessageInterval;
35        private long checkPublishersAvailability;
36        private long publisherReplyTimespan;
37        private string sTopic;
38
39        /// <summary>
40        /// If the DHT entry of the given task is empty, continuously try to
41        /// find the PeerID of a publisher that has registered in the meantime.
42        /// </summary>
43        private Timer timerRegisteringNotPossible;
44        /// <summary>
45        /// For informing the publisher pro-active, that this subscriber
46        /// is still interested in this Task.
47        /// </summary>
48        private Timer timerSendingAliveMsg;
49        /// <summary>
50        /// checking liveness, availability and/or changes (new peer) of the Publisher
51        /// </summary>
52        private Timer timerCheckPubAvailability;
53        /// <summary>
54        /// this timer gets started when the availability of the publisher,
55        /// at which the subscriber had registered, is checked. If the timer
56        /// callback is called and no Pong-message was received, the probability
57        /// that the Publisher is down is high!
58        /// </summary>
59        private Timer timeoutForPublishersPong;
60        /// <summary>
61        /// After register message is sent to publisher, this timer gets started.
62        /// If the publisher doesn't response with a RegisteringAccepted-Message,
63        /// the probability that the publisher is down is high!
64        /// </summary>
65        private Timer timeoutForPublishersRegAccept;
66        /// <summary>
67        /// PeerID of the current publisher. This ID is continuously checked
68        /// for liveness and updated if the publisher has changed.
69        /// </summary>
70        private PeerId actualPublisher;
71        /// <summary>
72        /// if true, check whether a new Publisher is the actual one
73        /// and renew Settings
74        /// </summary>
75        private bool bolStopped = true;
76
77        private bool started = false;
78        /// <summary>
79        /// Status flag which contains the state of the Subscriber
80        /// </summary>
81        public bool Started
82        {
83            get { return this.started;  }
84            private set { this.started = value;  } 
85        }
86
87        #endregion
88
89        /* BEGIN: Only for experimental cases */
90
/// <summary>
/// Announces a found solution to the current publisher: first a Solution
/// system message is sent, then the solution payload itself.
/// (Marked as experimental in the surrounding file.)
/// </summary>
/// <param name="solutionData">Payload that is forwarded unchanged to the publisher.</param>
public void SolutionFound(byte[] solutionData)
{
    // actualPublisher is only assigned after a successful DHT lookup; without it
    // both sends would be addressed to a null peer.
    if (actualPublisher == null)
    {
        GuiLogging("Solution wasn't sent, because no publisher is known at present.", NotificationLevel.Warning);
        return;
    }

    SendMessage(actualPublisher, PubSubMessageType.Solution);
    this.p2pControl.SendToPeer(solutionData, actualPublisher);
}
96
97        /* END: Only for experimental cases */
98
99
/// <summary>
/// Creates a subscriber that uses the given P2P control for all
/// communication. Nothing is sent and no timer is created until Start is called.
/// </summary>
/// <param name="p2pControl">Control used for sending messages and receiving message events.</param>
public P2PSubscriberBase(IP2PControl p2pControl)
{
    // stored as-is; event handlers are attached later in Register
    this.p2pControl = p2pControl;
}
104
/// <summary>
/// Stores the topic and the timing settings and then tries to register
/// with the publisher of the topic.
/// </summary>
/// <param name="sTopic">Topic name used for the DHT lookups of the publisher.</param>
/// <param name="checkPublishersAvailability">Due time and period handed to the timer that periodically re-checks the publisher.</param>
/// <param name="publishersReplyTimespan">Due time and period handed to the timers that wait for the publisher's RegisteringAccepted and Pong replies.</param>
public void Start(string sTopic, long checkPublishersAvailability, long publishersReplyTimespan)
{
    // the three settings are independent of each other
    this.publisherReplyTimespan = publishersReplyTimespan;
    this.checkPublishersAvailability = checkPublishersAvailability;
    this.sTopic = sTopic;

    this.Register();
}
112
/// <summary>
/// Attaches the message handlers, looks up the publisher and sends it a
/// Register message. When no publisher can be determined the subscriber is
/// marked as stopped and not started.
/// </summary>
private void Register()
{
    // The handlers are detached whenever Stop runs, and there is no way to ask an
    // event whether a handler is already attached. Detaching before attaching
    // guarantees that each handler is attached exactly once.
    this.p2pControl.OnPayloadMessageReceived -= p2pControl_OnPayloadMessageReceived;
    this.p2pControl.OnPayloadMessageReceived += p2pControl_OnPayloadMessageReceived;
    this.p2pControl.OnSystemMessageReceived -= p2pControl_OnSystemMessageReceived;
    this.p2pControl.OnSystemMessageReceived += p2pControl_OnSystemMessageReceived;

    // CheckPublishersAvailability only creates its timers while bolStopped is
    // false, so the flag has to be cleared before the lookup.
    this.bolStopped = false;
    PeerId pubId = CheckPublishersAvailability();

    if (pubId == null)
    {
        // no usable publisher at present
        this.bolStopped = true;
        this.Started = false;
        GuiLogging("No publisher for registering found.", NotificationLevel.Info);
        return;
    }

    // pubId is known to be non-null from here on
    bool publisherChanged = actualPublisher != null && actualPublisher != pubId;
    if (publisherChanged)
    {
        GuiLogging("Publisher has been changed from ID '" + actualPublisher + "' to '" + pubId + "'", NotificationLevel.Debug);
        actualPublisher = pubId;
    }

    SendMessage(pubId, PubSubMessageType.Register);
    this.Started = true;
}
147
/// <summary>
/// Handles system messages. Messages from any peer other than the current
/// publisher are only logged. Stop and Solution messages stop the
/// subscriber and raise OnReceivedStopMessageFromPublisher.
/// </summary>
/// <param name="sender">Peer that sent the message.</param>
/// <param name="msgType">Type of the received system message.</param>
private void p2pControl_OnSystemMessageReceived(PeerId sender, PubSubMessageType msgType)
{
    if (sender != actualPublisher)
    {
        GuiLogging("RECEIVED message from third party peer (not the publisher!): " + msgType.ToString() + ", ID: " + sender, NotificationLevel.Debug);
        return;
    }
    switch (msgType)
    {
        case PubSubMessageType.RegisteringAccepted:
            GuiLogging("REGISTERING ACCEPTED received from publisher!", NotificationLevel.Info);
            // the publisher answered, so the registration timeout is obsolete
            if (this.timeoutForPublishersRegAccept != null)
            {
                this.timeoutForPublishersRegAccept.Dispose();
                this.timeoutForPublishersRegAccept = null;
            }
            break;
        case PubSubMessageType.Ping:
            SendMessage(sender, PubSubMessageType.Pong);
            GuiLogging("REPLIED to a ping message from " + sender, NotificationLevel.Debug);
            break;
        case PubSubMessageType.Register:
        case PubSubMessageType.Unregister:
            GuiLogging(msgType.ToString().ToUpper() + " received from PUBLISHER.", NotificationLevel.Debug);
            // stop completely (timers, handlers) and start over with a fresh registration
            Stop(msgType);
            Register();
            break;
        case PubSubMessageType.Solution:
            Stop(msgType);
            GuiLogging("Another Subscriber had found the solution!", NotificationLevel.Info);
            NotifyStopReceived(msgType, "Another Subscriber had found the solution!");
            break;
        case PubSubMessageType.Stop:
            Stop(msgType);
            GuiLogging("STOP received from publisher. Subscriber is stopped!", NotificationLevel.Warning);
            NotifyStopReceived(msgType, "STOP received from publisher. Subscriber is stopped!");
            break;
        case PubSubMessageType.Pong:
            // the publisher answered the ping, so the pong timeout is obsolete
            if (this.timeoutForPublishersPong != null)
            {
                this.timeoutForPublishersPong.Dispose();
                this.timeoutForPublishersPong = null;
            }
            break;
        case PubSubMessageType.Alive:
        default:
            // nothing to do for these message types
            break;
    }
}

/// <summary>
/// Raises OnReceivedStopMessageFromPublisher if a handler is attached.
/// </summary>
/// <param name="stopType">Message type that caused the subscriber to stop.</param>
/// <param name="sData">Human-readable description of the reason.</param>
private void NotifyStopReceived(PubSubMessageType stopType, string sData)
{
    // copy first, so a handler detached in the meantime can't cause a null call
    ReceivedStopFromPublisher handler = OnReceivedStopMessageFromPublisher;
    if (handler != null)
        handler(stopType, sData);
}
197
/// <summary>
/// Handles payload messages: data from the current publisher is passed to
/// HandleIncomingData, data from any other peer is only logged.
/// </summary>
/// <param name="sender">Peer that sent the payload.</param>
/// <param name="data">Received payload; decoded as UTF-8 for the log entry.</param>
private void p2pControl_OnPayloadMessageReceived(PeerId sender, byte[] data)
{
    if (sender != actualPublisher)
    {
        string foreignText = UTF8Encoding.UTF8.GetString(data);
        GuiLogging(
            "RECEIVED message from third party peer (not the publisher!): " + foreignText + ", ID: " + sender,
            NotificationLevel.Debug);
        return;
    }

    // the actual processing is virtual, so derived classes can replace it
    HandleIncomingData(sender, data);
}
208
/// <summary>
/// Logs the incoming data (decoded as UTF-8) and raises
/// OnTextArrivedFromPublisher with the raw bytes.
/// </summary>
/// <param name="senderId">Peer the data came from.</param>
/// <param name="data">Received payload.</param>
protected virtual void HandleIncomingData(PeerId senderId, byte[] data)
{
    string text = UTF8Encoding.UTF8.GetString(data);
    GuiLogging("RECEIVED: Message from '" + senderId + "' with data: '" + text + "'", NotificationLevel.Debug);

    TextArrivedFromPublisher handler = OnTextArrivedFromPublisher;
    if (handler != null)
    {
        handler(data, senderId);
    }
}
222
/// <summary>
/// Sends a system message of the given type to the given peer. As a side
/// effect the Alive timer is created if it doesn't exist yet and the
/// subscriber isn't stopped; for Register messages the registration
/// timers are adjusted as well. Unsupported message types are logged and
/// not sent.
/// </summary>
/// <param name="pubPeerId">Receiver of the message.</param>
/// <param name="msgType">Type of the message to send.</param>
private void SendMessage(PeerId pubPeerId, PubSubMessageType msgType)
{
    bool aliveTimerMissing = timerSendingAliveMsg == null;
    if (aliveTimerMissing && !this.bolStopped)
    {
        timerSendingAliveMsg = new Timer(OnSendAliveMessage, null, sendAliveMessageInterval, sendAliveMessageInterval);
    }

    switch (msgType)
    {
        case PubSubMessageType.Register:
            PrepareTimersForRegister();
            break;

        // these types are sent without any preparation
        case PubSubMessageType.Alive:
        case PubSubMessageType.Ping:
        case PubSubMessageType.Pong:
        case PubSubMessageType.Unregister:
        case PubSubMessageType.Stop:
        case PubSubMessageType.Solution:
            break;

        default:
            GuiLogging("No Message sent, because MessageType wasn't supported: " + msgType.ToString(), NotificationLevel.Warning);
            return;
    }

    this.p2pControl.SendToPeer(msgType, pubPeerId);
    GuiLogging(msgType.ToString() + " message sent to Publisher", NotificationLevel.Debug);
}

/// <summary>
/// Stops the "registering not possible" retry timer and starts the timeout
/// that waits for the publisher's RegisteringAccepted reply (unless it is
/// already running).
/// </summary>
private void PrepareTimersForRegister()
{
    if (timerRegisteringNotPossible != null)
    {
        timerRegisteringNotPossible.Dispose();
        timerRegisteringNotPossible = null;
    }

    if (this.timeoutForPublishersRegAccept == null)
    {
        this.timeoutForPublishersRegAccept = new Timer(OnTimeoutRegisteringAccepted, null, this.publisherReplyTimespan, this.publisherReplyTimespan);
    }
}
260
/// <summary>
/// Callback of timerRegisteringNotPossible. That timer is created when the
/// publisher lookup in the DHT failed; each tick simply retries the
/// registration.
/// </summary>
/// <param name="state">Unused; the timer is created with a null state.</param>
private void OnRegisteringNotPossible(object state)
{
    this.Register();
}
267
/// <summary>
/// Callback of timerSendingAliveMsg: sends an Alive message to the current
/// publisher.
/// </summary>
/// <param name="state">Unused; the timer is created with a null state.</param>
private void OnSendAliveMessage(object state)
{
    this.SendMessage(this.actualPublisher, PubSubMessageType.Alive);
}
272
/// <summary>
/// Reads the publisher's ID and the Alive message interval for the topic
/// from the DHT. When either of them is missing, null is returned and
/// (unless the subscriber is stopped) a timer is started that periodically
/// retries the registration. On success the timer for periodically
/// checking the publisher's availability is started as well. A changed
/// publisher ID is not handled here; callers compare the result with
/// actualPublisher themselves.
/// </summary>
/// <returns>the current publisher's ID, or null when the publisher or its Alive settings weren't found in the DHT</returns>
private PeerId CheckPublishersAvailability()
{
    PeerId pid = DHT_CommonManagement.GetTopicsPublisherId(ref this.p2pControl, this.sTopic);

    if (pid == null)
    {
        StartRegisteringRetryTimer();
        GuiLogging("Publisher wasn't found in DHT or settings didn't stored on the right way.", NotificationLevel.Debug);
        return null;
    }

    // Read into a local first, so a failed lookup doesn't overwrite an
    // interval that was valid before.
    long aliveInterval = DHT_CommonManagement.GetAliveMessageInterval(ref this.p2pControl, this.sTopic);

    if (aliveInterval == 0)
    {
        // Callers treat null as "no publisher", so this path needs the same
        // retry as the missing-publisher path; otherwise nothing would ever
        // try to register again.
        StartRegisteringRetryTimer();
        GuiLogging("Can't find AliveMsg-Settings from Publisher for the Subscriber.", NotificationLevel.Error);
        return null;
    }
    sendAliveMessageInterval = aliveInterval;

    if (actualPublisher == null) //first time initialization
    {
        actualPublisher = pid;
        GuiLogging("First time received publishers ID.", NotificationLevel.Debug);
    }

    GuiLogging("RECEIVED: Publishers' peer ID '" + pid + "', Alive-Msg-Interval: " + sendAliveMessageInterval / 1000 + " sec!", NotificationLevel.Debug);

    // setting timer to check periodically the availability of the publishing peer
    if (timerCheckPubAvailability == null && !this.bolStopped)
        timerCheckPubAvailability = new Timer(OnCheckPubAvailability, null, this.checkPublishersAvailability, this.checkPublishersAvailability);

    return pid;
}

/// <summary>
/// Starts the timer that periodically retries the registration, unless it
/// is already running or the subscriber is stopped.
/// </summary>
private void StartRegisteringRetryTimer()
{
    // wait for 10 seconds between two attempts
    const int registeringRetryIntervalMs = 10000;

    if (timerRegisteringNotPossible == null && !this.bolStopped)
    {
        timerRegisteringNotPossible = new Timer(OnRegisteringNotPossible, null, registeringRetryIntervalMs, registeringRetryIntervalMs);
    }
}
323
/// <summary>
/// Callback of timerCheckPubAvailability. Looks up the publisher again:
/// if it is still the known one, it is pinged and the Pong timeout is
/// started; otherwise (another peer took over, or the lookup failed) the
/// registration is repeated.
/// </summary>
/// <param name="state">Unused; the timer is created with a null state.</param>
private void OnCheckPubAvailability(object state)
{
    PeerId newPubId = CheckPublishersAvailability();
    bool samePublisher = newPubId == actualPublisher;

    if (samePublisher)
    {
        SendMessage(actualPublisher, PubSubMessageType.Ping);

        // The timeout is only disposed when a Pong arrives from the publisher
        // (or when it elapses, or when the subscriber is stopped).
        if (timeoutForPublishersPong == null)
            timeoutForPublishersPong = new Timer(OnTimeoutPublishersPong, null, this.publisherReplyTimespan, this.publisherReplyTimespan);
    }
    else
    {
        Register();
    }
}
347
/// <summary>
/// Callback of timeoutForPublishersRegAccept. It fires when the publisher
/// hasn't answered the Register message before the timer elapsed; the
/// registration is then attempted again.
/// </summary>
/// <param name="state">Unused; the timer is created with a null state.</param>
private void OnTimeoutRegisteringAccepted(object state)
{
    GuiLogging("TIMEOUT: Waiting for registering accepted message from publisher!", NotificationLevel.Debug);

    this.Register();
}
358
/// <summary>
/// Callback of timeoutForPublishersPong. It fires when the publisher hasn't
/// answered the Ping message before the timer elapsed. The timeout is
/// disposed and the publisher is looked up again.
/// </summary>
/// <param name="state">Unused; the timer is created with a null state.</param>
private void OnTimeoutPublishersPong(object state)
{
    GuiLogging("Publisher didn't answer on Ping in the given time span!", NotificationLevel.Warning);

    Timer pongTimeout = timeoutForPublishersPong;
    if (pongTimeout != null)
    {
        pongTimeout.Dispose();
        timeoutForPublishersPong = null;
    }

    // Only the lookup happens here and its result is not used; a changed
    // publisher is picked up by the next run of OnCheckPubAvailability.
    CheckPublishersAvailability();
}
374
/// <summary>
/// Stops the subscriber: marks it as stopped, sends the given message type
/// to the current publisher (if one is known), disposes all timers and
/// detaches the message handlers from the P2P control.
/// </summary>
/// <param name="msgType">Message type to send to the publisher before stopping; PubSubMessageType.NULL suppresses the message.</param>
public void Stop(PubSubMessageType msgType)
{
    this.bolStopped = true;

    bool notifyPublisher = actualPublisher != null && msgType != PubSubMessageType.NULL;
    if (notifyPublisher)
    {
        SendMessage(actualPublisher, msgType);
    }

    // stopping all timers, if they are still active
    ReleaseTimer(ref this.timerSendingAliveMsg);
    ReleaseTimer(ref this.timerRegisteringNotPossible);
    ReleaseTimer(ref this.timerCheckPubAvailability);
    ReleaseTimer(ref this.timeoutForPublishersRegAccept);
    ReleaseTimer(ref this.timeoutForPublishersPong);
    GuiLogging("All Timers were stopped successfully", NotificationLevel.Debug);

    this.p2pControl.OnSystemMessageReceived -= p2pControl_OnSystemMessageReceived;
    this.p2pControl.OnPayloadMessageReceived -= p2pControl_OnPayloadMessageReceived;

    this.Started = false;
    GuiLogging("Subscriber is completely stopped",NotificationLevel.Debug);
}

/// <summary>
/// Disposes the given timer and clears the field; does nothing when the
/// field is already null.
/// </summary>
/// <param name="timer">Timer field to release.</param>
private static void ReleaseTimer(ref Timer timer)
{
    if (timer == null)
    {
        return;
    }

    timer.Dispose();
    timer = null;
}
421
/// <summary>
/// Forwards a log message to the subscribers of OnGuiMessage; does nothing
/// when no handler is attached.
/// </summary>
/// <param name="sText">Message text.</param>
/// <param name="notLev">Notification level passed along with the message.</param>
protected void GuiLogging(string sText, NotificationLevel notLev)
{
    GuiMessage handler = OnGuiMessage;
    if (handler == null)
    {
        return;
    }

    handler(sText, notLev);
}
427    }
428}
Note: See TracBrowser for help on using the repository browser.