source: trunk/CrypPlugins/PeerToPeerSubscriber/P2PSubscriberBase.cs @ 1173

Last change on this file since 1173 was 1173, checked in by arnold, 12 years ago

some bug fixes.

File size: 16.8 KB
Line 
1using System;
2using System.Collections.Generic;
3using System.Linq;
4using System.Text;
5using Cryptool.PluginBase.Control;
6using Cryptool.PluginBase;
7using System.Timers;
8
9/*
10 * All Subscriber functions work problem-free!
11 *
12 * IDEAS:
13 * - Publisher takes subscriber list out of the DHT and registered
14 *   itself with all subscribers pro-active (handle Register-Msg in Subscriber!)
15 */
16
17namespace Cryptool.Plugins.PeerToPeer
18{
19    public class P2PSubscriberBase
20    {
21        public delegate void GuiMessage(string sData, NotificationLevel notificationLevel);
22        public event GuiMessage OnGuiMessage;
23        public delegate void TextArrivedFromPublisher(byte[] data, PeerId pid);
24        public event TextArrivedFromPublisher OnTextArrivedFromPublisher;
25        public delegate void ReceivedStopFromPublisher(PubSubMessageType stopType, string sData);
26        /// <summary>
27        /// fired when Manager sent "stop" message to the worker.
28        /// </summary>
29        public event ReceivedStopFromPublisher OnReceivedStopMessageFromPublisher;
30
31        #region Variables
32
33        protected IP2PControl p2pControl;
34        private long sendAliveMessageInterval;
35        private long checkPublishersAvailability;
36        private long publisherReplyTimespan;
37        private string sTopic;
38
39        /// <summary>
40        /// Checking liveness, availability and/or changes (new peer) of the Publisher.
41        /// Retrieves the required DHT entries and initiates the necessary steps
42        /// </summary>
43        private Timer timerCheckPubAvailability;
44        /// <summary>
45        /// To inform the publisher pro-active, that this subscriber
46        /// is still alive and interested in this Task, send periodical
47        /// alive messages.
48        /// </summary>
49        private Timer timerSendingAliveMsg;
50        /// <summary>
51        /// This timer gets started when a DHT entry for a Publisher exists and
52        /// we want to check the liveness of the publisher,
53        /// at which the subscriber had registered. Therefore we send a Ping message
54        /// to the Publisher. If the timer callback is called and no Pong-Response was
55        /// received from the Publisher, the probability is high, that the Publisher is down!
56        /// </summary>
57        private Timer timeoutForPublishersPong;
58        /// <summary>
59        /// After register message is sent to publisher, this timer is started.
60        /// If the publisher doesn't responds with a RegisteringAccepted-Message,
61        /// the probability is high, that the publisher is down!
62        /// </summary>
63        private Timer timeoutForPublishersRegAccept;
64        /// <summary>
65        /// PeerID of the actual publisher. This ID is will be checked continious
66        /// on liveliness and/or updated if Publisher had changed.
67        /// </summary>
68        private PeerId actualPublisher;
69        public PeerId ActualPublisher
70        {
71            get { return this.actualPublisher; }
72            set { this.actualPublisher = value; }
73        }
74
75        /// <summary>
76        /// if true, check whether a new Publisher is the actual one
77        /// and renew Settings
78        /// </summary>
79        private bool bolStopped = true;
80
81        private bool started = false;
82        /// <summary>
83        /// Status flag which contains the state of the Subscriber
84        /// </summary>
85        public bool Started
86        {
87            get { return this.started; }
88            private set { this.started = value; }
89        }
90
91        #endregion
92
93        /* BEGIN: Only for experimental cases */
94
95        public void SolutionFound(byte[] solutionData)
96        {
97            SendMessage(actualPublisher, PubSubMessageType.Solution);
98            this.p2pControl.SendToPeer(solutionData, actualPublisher);
99        }
100
101        /* END: Only for experimental cases */
102
103        public P2PSubscriberBase(IP2PControl p2pControl)
104        {
105            this.p2pControl = p2pControl;
106
107            this.timeoutForPublishersPong = new Timer();
108            this.timeoutForPublishersPong.Elapsed += new ElapsedEventHandler(OnTimeoutPublishersPong);
109
110            this.timeoutForPublishersRegAccept = new Timer();
111            this.timeoutForPublishersRegAccept.Elapsed += new ElapsedEventHandler(OnTimeoutRegisteringAccepted);
112
113            this.timerCheckPubAvailability = new Timer();
114            this.timerCheckPubAvailability.Elapsed += new ElapsedEventHandler(OnCheckPubAvailability);
115
116            this.timerSendingAliveMsg = new Timer();
117            this.timerSendingAliveMsg.Elapsed += new ElapsedEventHandler(OnSendAliveMessage);
118        }
119
120        public void Start(string sTopic, long checkPublishersAvailability, long publishersReplyTimespan)
121        {
122            this.actualPublisher = null;
123
124            this.sTopic = sTopic;
125            this.checkPublishersAvailability = checkPublishersAvailability;
126            this.publisherReplyTimespan = publishersReplyTimespan;
127
128            #region Initialize network-maintanance-Timers
129
130            double pubTimeResponseTimeout = Convert.ToDouble(this.publisherReplyTimespan);
131
132            this.timerCheckPubAvailability.Interval = Convert.ToDouble(this.checkPublishersAvailability);
133            this.timerCheckPubAvailability.AutoReset = true;
134            this.timeoutForPublishersPong.Interval = pubTimeResponseTimeout;
135            this.timeoutForPublishersRegAccept.Interval = pubTimeResponseTimeout;
136
137            #endregion
138
139            this.p2pControl.OnPayloadMessageReceived += new P2PPayloadMessageReceived(p2pControl_OnPayloadMessageReceived);
140            this.p2pControl.OnSystemMessageReceived += new P2PSystemMessageReceived(p2pControl_OnSystemMessageReceived);
141
142            if (this.p2pControl != null)
143            {
144                string sNonrelevant;
145                PeerId myPeerId = this.p2pControl.GetPeerID(out sNonrelevant);
146                GuiLogging("Started Subscriber with ID: '" + myPeerId.ToString() + "'", NotificationLevel.Info);
147            }
148
149            this.Started = true;
150
151            CheckPublishersAvailability2();
152        }
153
154        private void CheckPublishersAvailability2()
155        {
156            // retrieve publisher information from the DHT
157            PeerId pid = DHT_CommonManagement.GetTopicsPublisherId(ref this.p2pControl, this.sTopic);
158            this.sendAliveMessageInterval = DHT_CommonManagement.GetAliveMessageInterval(ref this.p2pControl, this.sTopic);
159
160            if (pid == null || this.sendAliveMessageInterval == 0)
161            {
162                GuiLogging("No Publisher/Manager information for registering found in the DHT.", NotificationLevel.Info);
163            }
164            else
165            {
166                this.timerSendingAliveMsg.Interval = Convert.ToDouble(this.sendAliveMessageInterval);
167                this.timerSendingAliveMsg.Start();
168
169                if (actualPublisher == null)
170                {
171                    GuiLogging("Found a Publisher/Manager with ID '" + pid.ToString() + ", so register with it.", NotificationLevel.Info);
172                    SendMessage(pid, PubSubMessageType.Register);
173                }
174                else if (actualPublisher == pid)
175                {
176                    // Timer will be only stopped, when OnMessageReceived-Event received
177                    // a Pong-Response from the publisher!
178                    SendMessage(pid, PubSubMessageType.Ping);
179                    this.timeoutForPublishersPong.Start();
180                    GuiLogging("Successfully checked publishers'/managers' information in the DHT. To check liveness, a Ping message was sended to '" + pid.ToString() + "'.", NotificationLevel.Debug);
181                }
182                else
183                {
184                    GuiLogging("The Publisher/Manager had changed from '" + this.actualPublisher.ToString()
185                        + "' to '" + pid.ToString() + "'. Register with the new Publisher/Manager.", NotificationLevel.Info);
186                    SendMessage(pid, PubSubMessageType.Register);
187                }
188                this.actualPublisher = pid;
189            }
190            this.timerCheckPubAvailability.Enabled = true;
191        }
192
193        private void p2pControl_OnSystemMessageReceived(PeerId sender, PubSubMessageType msgType)
194        {
195            if (sender != this.actualPublisher)
196            {
197                GuiLogging("RECEIVED message from third party peer (not the publisher!): " + msgType.ToString() + ", ID: " + sender, NotificationLevel.Debug);
198                return;
199            }
200            switch (msgType)
201            {
202                case PubSubMessageType.RegisteringAccepted:
203                    GuiLogging("REGISTERING ACCEPTED received from publisher!", NotificationLevel.Info);
204                    this.timeoutForPublishersRegAccept.Stop();
205                    break;
206                case PubSubMessageType.Ping:
207                    SendMessage(sender, PubSubMessageType.Pong);
208                    GuiLogging("REPLIED to a ping message from " + sender, NotificationLevel.Debug);
209                    break;
210                case PubSubMessageType.Register:
211                case PubSubMessageType.Unregister:
212                    GuiLogging(msgType.ToString().ToUpper() + " received from PUBLISHER.", NotificationLevel.Debug);
213                    // continuously try to get a unregister and than re-register with publisher
214                    Stop(msgType);
215                    CheckPublishersAvailability2();
216                    break;
217                case PubSubMessageType.Solution:
218                    Stop(msgType);
219                    GuiLogging("Another Subscriber had found the solution!", NotificationLevel.Info);
220                    break;
221                case PubSubMessageType.Stop:
222                    Stop(msgType);
223                    GuiLogging("STOP received from publisher. Subscriber is stopped!", NotificationLevel.Warning);
224                    break;
225                case PubSubMessageType.Pong:
226                    this.timeoutForPublishersPong.Stop();
227                    break;
228                case PubSubMessageType.Alive:
229                default:
230                    // not possible at the moment
231                    break;
232            }
233            // workaround, because Timers.Timer doesn't contains a "Reset" method --> when receiving a
234            // message from the Publisher, we can reset the "check pub availability"-interval time!
235            this.timerCheckPubAvailability.Enabled = false;
236            this.timerCheckPubAvailability.Enabled = true;
237        }
238
239        private void p2pControl_OnPayloadMessageReceived(PeerId sender, byte[] data)
240        {
241            if (sender != actualPublisher)
242            {
243                GuiLogging("RECEIVED message from third party peer (not the publisher!): " + UTF8Encoding.UTF8.GetString(data) + ", ID: " + sender, NotificationLevel.Debug);
244                return;
245            }
246            // functionality swapped for better inheritance
247            HandleIncomingData(sender, data);
248        }
249
250        /// <summary>
251        /// Incoming data will be printed in the information field and the OnTextArrivedEvent will be thrown
252        /// </summary>
253        /// <param name="senderId"></param>
254        /// <param name="sData"></param>
255        protected virtual void HandleIncomingData(PeerId senderId, byte[] data)
256        {
257            GuiLogging("RECEIVED: Message from '" + senderId
258                    + "' with data: '" + UTF8Encoding.UTF8.GetString(data) + "'", NotificationLevel.Debug);
259
260            if (OnTextArrivedFromPublisher != null)
261                OnTextArrivedFromPublisher(data, senderId);
262        }
263
264        private void SendMessage(PeerId pubPeerId, PubSubMessageType msgType)
265        {
266            switch (msgType)
267            {
268                case PubSubMessageType.Register:
269                    // start waiting interval for RegAccept Message
270                    this.timeoutForPublishersRegAccept.Start();
271                    break;
272                case PubSubMessageType.Alive:
273                case PubSubMessageType.Ping:
274                case PubSubMessageType.Pong:
275                case PubSubMessageType.Unregister:
276                case PubSubMessageType.Stop:
277                case PubSubMessageType.Solution:
278                    break;
279                //case PubSubMessageType.Solution:
280                //    // when i send Solution to the Stop method, we will run into a recursive loop between SendMessage and Stop!
281                //    Stop(PubSubMessageType.NULL);
282                //    break;
283                default:
284                    GuiLogging("No Message sent, because MessageType wasn't supported: " + msgType.ToString(), NotificationLevel.Warning);
285                    return;
286            }
287            // it doesn't care which message type was sent, but when it was sent to the publisher, reset this AliveMsg Timer
288            if (pubPeerId == actualPublisher)
289            {
290                this.timerSendingAliveMsg.Enabled = false;
291                this.timerSendingAliveMsg.Enabled = true;
292            }
293
294            this.p2pControl.SendToPeer(msgType, pubPeerId);
295
296            GuiLogging(msgType.ToString() + " message sent to Publisher ID '" + pubPeerId.ToString() + "'.", NotificationLevel.Debug);
297        }
298
299        private void OnSendAliveMessage(object sender, ElapsedEventArgs e)
300        {
301            SendMessage(actualPublisher, PubSubMessageType.Alive);
302        }
303
304        /// <summary>
305        /// Callback for timerCheckPubAvailability (adjustable parameter
306        /// in settings, usually every 60 seconds). If another Peer
307        /// takes over Publishing the Task, this and many other things
308        /// will be initiated here
309        /// </summary>
310        /// <param name="state"></param>
311        private void OnCheckPubAvailability(object sender, ElapsedEventArgs e)
312        {
313            CheckPublishersAvailability2();
314        }
315
316        /// <summary>
317        /// This callback is only fired, when the publisher didn't sent a response on the register message.
318        /// </summary>
319        /// <param name="state"></param>
320        private void OnTimeoutRegisteringAccepted(object sender, ElapsedEventArgs e)
321        {
322            GuiLogging("TIMEOUT: Waiting for registering accepted message from publisher!", NotificationLevel.Debug);
323            // try to register again
324            CheckPublishersAvailability2();
325        }
326
327        /// <summary>
328        /// This callback os only fired, when the publisher didn't sent a response on the ping message.
329        /// </summary>
330        /// <param name="state"></param>
331        private void OnTimeoutPublishersPong(object sender, ElapsedEventArgs e)
332        {
333            GuiLogging("Publisher didn't respond on subscribers' ping message in the given time span!", NotificationLevel.Info);
334            this.timeoutForPublishersPong.Stop();
335            // try to get an active publisher and re-register
336
337            CheckPublishersAvailability2();
338        }
339
340        /// <summary>
341        /// Will stop all timers, so Subscriber ends with sending
342        /// Register-, Alive- and Pong-messages. Furthermore an
343        /// unregister message will be send to the publisher
344        /// </summary>
345        public void Stop(PubSubMessageType msgType)
346        {
347            if (actualPublisher != null)
348                SendMessage(actualPublisher, msgType);
349
350            #region stopping all timers, if they are still active
351
352            this.timeoutForPublishersPong.Stop();
353            this.timeoutForPublishersRegAccept.Stop();
354            this.timerSendingAliveMsg.Stop();
355
356            //when Sub received a UnReg message, it haven't stop
357            //the registering not possible worker, to connect to
358            //a new incoming Publisher
359            if (msgType == PubSubMessageType.Stop)
360            {
361                this.bolStopped = true;
362
363                this.timerCheckPubAvailability.Stop();
364
365                this.Started = false;
366                this.p2pControl.OnSystemMessageReceived -= p2pControl_OnSystemMessageReceived;
367                this.p2pControl.OnPayloadMessageReceived -= p2pControl_OnPayloadMessageReceived;
368                GuiLogging("Subscriber/Worker is completely stopped", NotificationLevel.Debug);
369            }
370            else
371            {
372                GuiLogging("Publisher/Manager had left the network, waiting for its comeback or takeover by a new Publisher/Manager.", NotificationLevel.Info);
373            }
374
375            #endregion
376
377            GuiLogging("All Timers were stopped successfully", NotificationLevel.Debug);
378        }
379
380        protected void GuiLogging(string sText, NotificationLevel notLev)
381        {
382            if (OnGuiMessage != null)
383                OnGuiMessage(sText, notLev);
384        }
385    }
386}
Note: See TracBrowser for help on using the repository browser.