source: trunk/CrypPlugins/PeerToPeerSubscriber/P2PSubscriberBase.cs @ 1166

Last change on this file since 1166 was 1166, checked in by arnold, 12 years ago

P2PManager: Design embellished and added an "estimated end time" information.
Samples: Output Boxes for KeySearcher Outputs

File size: 17.0 KB
Line 
1using System;
2using System.Collections.Generic;
3using System.Linq;
4using System.Text;
5using Cryptool.PluginBase.Control;
6using Cryptool.PluginBase;
7using System.Timers;
8
/*
 * All Subscriber functions work without known problems.
 *
 * IDEAS:
 * - The Publisher takes the subscriber list out of the DHT and proactively
 *   registers itself with all subscribers (handle the Register message in the Subscriber!)
 */
16
17namespace Cryptool.Plugins.PeerToPeer
18{
19    public class P2PSubscriberBase
20    {
21        public delegate void GuiMessage(string sData, NotificationLevel notificationLevel);
22        public event GuiMessage OnGuiMessage;
23        public delegate void TextArrivedFromPublisher(byte[] data, PeerId pid);
24        public event TextArrivedFromPublisher OnTextArrivedFromPublisher;
25        public delegate void ReceivedStopFromPublisher(PubSubMessageType stopType, string sData);
26        /// <summary>
27        /// fired when Manager sent "stop" message to the worker.
28        /// </summary>
29        public event ReceivedStopFromPublisher OnReceivedStopMessageFromPublisher;
30
31        #region Variables
32
33        protected IP2PControl p2pControl;
34        private long sendAliveMessageInterval;
35        private long checkPublishersAvailability;
36        private long publisherReplyTimespan;
37        private string sTopic;
38
39        /// <summary>
40        /// Checking liveness, availability and/or changes (new peer) of the Publisher.
41        /// Retrieves the required DHT entries and initiates the necessary steps
42        /// </summary>
43        private Timer timerCheckPubAvailability;
44        /// <summary>
45        /// To inform the publisher pro-active, that this subscriber
46        /// is still alive and interested in this Task, send periodical
47        /// alive messages.
48        /// </summary>
49        private Timer timerSendingAliveMsg;
50        /// <summary>
51        /// This timer gets started when a DHT entry for a Publisher exists and
52        /// we want to check the liveness of the publisher,
53        /// at which the subscriber had registered. Therefore we send a Ping message
54        /// to the Publisher. If the timer callback is called and no Pong-Response was
55        /// received from the Publisher, the probability is high, that the Publisher is down!
56        /// </summary>
57        private Timer timeoutForPublishersPong;
58        /// <summary>
59        /// After register message is sent to publisher, this timer is started.
60        /// If the publisher doesn't responds with a RegisteringAccepted-Message,
61        /// the probability is high, that the publisher is down!
62        /// </summary>
63        private Timer timeoutForPublishersRegAccept;
64        /// <summary>
65        /// PeerID of the actual publisher. This ID is will be checked continious
66        /// on liveliness and/or updated if Publisher had changed.
67        /// </summary>
68        private PeerId actualPublisher;
69        public PeerId ActualPublisher
70        {
71            get { return this.actualPublisher; }
72            set { this.actualPublisher = value; }
73        }
74
75        /// <summary>
76        /// if true, check whether a new Publisher is the actual one
77        /// and renew Settings
78        /// </summary>
79        private bool bolStopped = true;
80
81        private bool started = false;
82        /// <summary>
83        /// Status flag which contains the state of the Subscriber
84        /// </summary>
85        public bool Started
86        {
87            get { return this.started; }
88            private set { this.started = value; }
89        }
90
91        #endregion
92
93        /* BEGIN: Only for experimental cases */
94
95        public void SolutionFound(byte[] solutionData)
96        {
97            SendMessage(actualPublisher, PubSubMessageType.Solution);
98            this.p2pControl.SendToPeer(solutionData, actualPublisher);
99        }
100
101        /* END: Only for experimental cases */
102
103        public P2PSubscriberBase(IP2PControl p2pControl)
104        {
105            this.p2pControl = p2pControl;
106
107            this.timeoutForPublishersPong = new Timer();
108            this.timeoutForPublishersPong.Elapsed += new ElapsedEventHandler(OnTimeoutPublishersPong);
109
110            this.timeoutForPublishersRegAccept = new Timer();
111            this.timeoutForPublishersRegAccept.Elapsed += new ElapsedEventHandler(OnTimeoutRegisteringAccepted);
112
113            this.timerCheckPubAvailability = new Timer();
114            this.timerCheckPubAvailability.Elapsed += new ElapsedEventHandler(OnCheckPubAvailability);
115
116            this.timerSendingAliveMsg = new Timer();
117            this.timerSendingAliveMsg.Elapsed += new ElapsedEventHandler(OnSendAliveMessage);
118        }
119
120        public void Start(string sTopic, long checkPublishersAvailability, long publishersReplyTimespan)
121        {
122            this.actualPublisher = null;
123
124            this.sTopic = sTopic;
125            this.checkPublishersAvailability = checkPublishersAvailability;
126            this.publisherReplyTimespan = publishersReplyTimespan;
127
128            #region Initialize network-maintanance-Timers
129
130            double pubTimeResponseTimeout = Convert.ToDouble(this.publisherReplyTimespan);
131
132            this.timerCheckPubAvailability.Interval = Convert.ToDouble(this.checkPublishersAvailability);
133            this.timerCheckPubAvailability.AutoReset = true;
134            this.timeoutForPublishersPong.Interval = pubTimeResponseTimeout;
135            this.timeoutForPublishersRegAccept.Interval = pubTimeResponseTimeout;
136
137            #endregion
138
139            this.p2pControl.OnPayloadMessageReceived += new P2PPayloadMessageReceived(p2pControl_OnPayloadMessageReceived);
140            this.p2pControl.OnSystemMessageReceived += new P2PSystemMessageReceived(p2pControl_OnSystemMessageReceived);
141
142            if (this.p2pControl != null)
143            {
144                string sNonrelevant;
145                PeerId myPeerId = this.p2pControl.GetPeerID(out sNonrelevant);
146                GuiLogging("Started Subscriber with ID: '" + myPeerId.ToString() + "'", NotificationLevel.Info);
147            }
148
149            this.Started = true;
150
151            CheckPublishersAvailability2();
152        }
153
154        private void CheckPublishersAvailability2()
155        {
156            // retrieve publisher information from the DHT
157            PeerId pid = DHT_CommonManagement.GetTopicsPublisherId(ref this.p2pControl, this.sTopic);
158            this.sendAliveMessageInterval = DHT_CommonManagement.GetAliveMessageInterval(ref this.p2pControl, this.sTopic);
159
160            if (pid == null || this.sendAliveMessageInterval == 0)
161            {
162                GuiLogging("No Publisher/Manager information for registering found in the DHT.", NotificationLevel.Info);
163            }
164            else
165            {
166                this.timerSendingAliveMsg.Interval = Convert.ToDouble(this.sendAliveMessageInterval);
167                this.timerSendingAliveMsg.Start();
168
169                if (actualPublisher == null)
170                {
171                    GuiLogging("Found a Publisher/Manager with ID '" + pid.ToString() + ", so register with it.", NotificationLevel.Info);
172                    SendMessage(pid, PubSubMessageType.Register);
173                    this.timeoutForPublishersRegAccept.Start();
174                }
175                else if (actualPublisher == pid)
176                {
177                    // Timer will be only stopped, when OnMessageReceived-Event received
178                    // a Pong-Response from the publisher!
179                    SendMessage(pid, PubSubMessageType.Ping);
180                    this.timeoutForPublishersPong.Start();
181                    GuiLogging("Successfully checked publishers'/managers' information in the DHT. To check liveness, a Ping message was sended to '" + pid.ToString() + "'.", NotificationLevel.Debug);
182                }
183                else
184                {
185                    GuiLogging("The Publisher/Manager had changed from '" + this.actualPublisher.ToString()
186                        + "' to '" + pid.ToString() + "'. Register with the new Publisher/Manager.", NotificationLevel.Info);
187                    SendMessage(pid, PubSubMessageType.Register);
188                    this.timeoutForPublishersRegAccept.Start();
189                }
190                this.actualPublisher = pid;
191            }
192            this.timerCheckPubAvailability.Enabled = true;
193        }
194
195        private void p2pControl_OnSystemMessageReceived(PeerId sender, PubSubMessageType msgType)
196        {
197            if (sender != this.actualPublisher)
198            {
199                GuiLogging("RECEIVED message from third party peer (not the publisher!): " + msgType.ToString() + ", ID: " + sender, NotificationLevel.Debug);
200                return;
201            }
202            switch (msgType)
203            {
204                case PubSubMessageType.RegisteringAccepted:
205                    GuiLogging("REGISTERING ACCEPTED received from publisher!", NotificationLevel.Info);
206                    this.timeoutForPublishersRegAccept.Stop();
207                    break;
208                case PubSubMessageType.Ping:
209                    SendMessage(sender, PubSubMessageType.Pong);
210                    GuiLogging("REPLIED to a ping message from " + sender, NotificationLevel.Debug);
211                    break;
212                case PubSubMessageType.Register:
213                case PubSubMessageType.Unregister:
214                    GuiLogging(msgType.ToString().ToUpper() + " received from PUBLISHER.", NotificationLevel.Debug);
215                    // continuously try to get a unregister and than re-register with publisher
216                    Stop(msgType);
217                    CheckPublishersAvailability2();
218                    break;
219                case PubSubMessageType.Solution:
220                    Stop(msgType);
221                    GuiLogging("Another Subscriber had found the solution!", NotificationLevel.Info);
222                    break;
223                case PubSubMessageType.Stop:
224                    Stop(msgType);
225                    GuiLogging("STOP received from publisher. Subscriber is stopped!", NotificationLevel.Warning);
226                    break;
227                case PubSubMessageType.Pong:
228                    SendMessage(sender, PubSubMessageType.Register);
229                    this.timeoutForPublishersPong.Stop();
230                    break;
231                case PubSubMessageType.Alive:
232                default:
233                    // not possible at the moment
234                    break;
235            }
236            // workaround, because Timers.Timer doesn't contains a "Reset" method --> when receiving a
237            // message from the Publisher, we can reset the "check pub availability"-interval time!
238            this.timerCheckPubAvailability.Enabled = false;
239            this.timerCheckPubAvailability.Enabled = true;
240        }
241
242        private void p2pControl_OnPayloadMessageReceived(PeerId sender, byte[] data)
243        {
244            if (sender != actualPublisher)
245            {
246                GuiLogging("RECEIVED message from third party peer (not the publisher!): " + UTF8Encoding.UTF8.GetString(data) + ", ID: " + sender, NotificationLevel.Debug);
247                return;
248            }
249            // functionality swapped for better inheritance
250            HandleIncomingData(sender, data);
251        }
252
253        /// <summary>
254        /// Incoming data will be printed in the information field and the OnTextArrivedEvent will be thrown
255        /// </summary>
256        /// <param name="senderId"></param>
257        /// <param name="sData"></param>
258        protected virtual void HandleIncomingData(PeerId senderId, byte[] data)
259        {
260            GuiLogging("RECEIVED: Message from '" + senderId
261                    + "' with data: '" + UTF8Encoding.UTF8.GetString(data) + "'", NotificationLevel.Debug);
262
263            if (OnTextArrivedFromPublisher != null)
264                OnTextArrivedFromPublisher(data, senderId);
265        }
266
267        private void SendMessage(PeerId pubPeerId, PubSubMessageType msgType)
268        {
269            switch (msgType)
270            {
271                case PubSubMessageType.Register:
272                    // start waiting interval for RegAccept Message
273                    this.timeoutForPublishersRegAccept.Start();
274                    break;
275                case PubSubMessageType.Alive:
276                case PubSubMessageType.Ping:
277                case PubSubMessageType.Pong:
278                case PubSubMessageType.Unregister:
279                case PubSubMessageType.Stop:
280                case PubSubMessageType.Solution:
281                    break;
282                //case PubSubMessageType.Solution:
283                //    // when i send Solution to the Stop method, we will run into a recursive loop between SendMessage and Stop!
284                //    Stop(PubSubMessageType.NULL);
285                //    break;
286                default:
287                    GuiLogging("No Message sent, because MessageType wasn't supported: " + msgType.ToString(), NotificationLevel.Warning);
288                    return;
289            }
290            // it doesn't care which message type was sent, but when it was sent to the publisher, reset this AliveMsg Timer
291            if (pubPeerId == actualPublisher)
292            {
293                this.timerSendingAliveMsg.Enabled = false;
294                this.timerSendingAliveMsg.Enabled = true;
295            }
296
297            this.p2pControl.SendToPeer(msgType, pubPeerId);
298
299            GuiLogging(msgType.ToString() + " message sent to Publisher ID '" + pubPeerId.ToString() + "'.", NotificationLevel.Debug);
300        }
301
302        private void OnSendAliveMessage(object sender, ElapsedEventArgs e)
303        {
304            SendMessage(actualPublisher, PubSubMessageType.Alive);
305        }
306
307        /// <summary>
308        /// Callback for timerCheckPubAvailability (adjustable parameter
309        /// in settings, usually every 60 seconds). If another Peer
310        /// takes over Publishing the Task, this and many other things
311        /// will be initiated here
312        /// </summary>
313        /// <param name="state"></param>
314        private void OnCheckPubAvailability(object sender, ElapsedEventArgs e)
315        {
316            CheckPublishersAvailability2();
317        }
318
319        /// <summary>
320        /// This callback is only fired, when the publisher didn't sent a response on the register message.
321        /// </summary>
322        /// <param name="state"></param>
323        private void OnTimeoutRegisteringAccepted(object sender, ElapsedEventArgs e)
324        {
325            GuiLogging("TIMEOUT: Waiting for registering accepted message from publisher!", NotificationLevel.Debug);
326            // try to register again
327            CheckPublishersAvailability2();
328        }
329
330        /// <summary>
331        /// This callback os only fired, when the publisher didn't sent a response on the ping message.
332        /// </summary>
333        /// <param name="state"></param>
334        private void OnTimeoutPublishersPong(object sender, ElapsedEventArgs e)
335        {
336            GuiLogging("Publisher didn't respond on subscribers' ping message in the given time span!", NotificationLevel.Info);
337            this.timeoutForPublishersPong.Stop();
338            // try to get an active publisher and re-register
339
340            CheckPublishersAvailability2();
341        }
342
343        /// <summary>
344        /// Will stop all timers, so Subscriber ends with sending
345        /// Register-, Alive- and Pong-messages. Furthermore an
346        /// unregister message will be send to the publisher
347        /// </summary>
348        public void Stop(PubSubMessageType msgType)
349        {
350            if (actualPublisher != null)
351                SendMessage(actualPublisher, msgType);
352
353            #region stopping all timers, if they are still active
354
355            this.timeoutForPublishersPong.Stop();
356            this.timeoutForPublishersRegAccept.Stop();
357            this.timerSendingAliveMsg.Stop();
358
359            //when Sub received a UnReg message, it haven't stop
360            //the registering not possible worker, to connect to
361            //a new incoming Publisher
362            if (msgType == PubSubMessageType.Stop)
363            {
364                this.bolStopped = true;
365
366                this.timerCheckPubAvailability.Stop();
367
368                this.Started = false;
369                this.p2pControl.OnSystemMessageReceived -= p2pControl_OnSystemMessageReceived;
370                this.p2pControl.OnPayloadMessageReceived -= p2pControl_OnPayloadMessageReceived;
371                GuiLogging("Subscriber/Worker is completely stopped", NotificationLevel.Debug);
372            }
373            else
374            {
375                GuiLogging("Publisher/Manager had left the network, waiting for its comeback or takeover by a new Publisher/Manager.", NotificationLevel.Info);
376            }
377
378            #endregion
379
380            GuiLogging("All Timers were stopped successfully", NotificationLevel.Debug);
381        }
382
383        protected void GuiLogging(string sText, NotificationLevel notLev)
384        {
385            if (OnGuiMessage != null)
386                OnGuiMessage(sText, notLev);
387        }
388    }
389}
Note: See TracBrowser for help on using the repository browser.