source: trunk/CrypPlugins/PeerToPeerSubscriber/P2PSubscriberBase.cs @ 2280

Last change on this file since 2280 was 1587, checked in by Paul Lelgemann, 12 years ago

+ CrypP2P: Added configuration option for "Use local local address detection" (defaults to false)
o Minor bugfixes in PeerToPeer(Publisher|Subscriber), which seem to handle an unconnected p2p network wrong

File size: 17.5 KB
Line 
1using System;
2using System.Collections.Generic;
3using System.Linq;
4using System.Text;
5using Cryptool.PluginBase.Control;
6using Cryptool.PluginBase;
7using System.Timers;
8using Cryptool.Plugins.PeerToPeer.Internal;
9
10/*
11 * All Subscriber functions work problem-free!
12 *
13 * IDEAS:
14 * - Publisher takes the subscriber list out of the DHT and proactively registers
15 *   itself with all subscribers (handle Register-Msg in Subscriber!)
16 */
17
18namespace Cryptool.Plugins.PeerToPeer
19{
20    public class P2PSubscriberBase
21    {
22        public delegate void GuiMessage(string sData, NotificationLevel notificationLevel);
23        public event GuiMessage OnGuiMessage;
24        public delegate void TextArrivedFromPublisher(byte[] data, PeerId pid);
25        public event TextArrivedFromPublisher OnTextArrivedFromPublisher;
26        public delegate void ReceivedStopFromPublisher(PubSubMessageType stopType, string sData);
27        /// <summary>
28        /// fired when Manager sent "stop" message to the worker.
29        /// </summary>
30        public event ReceivedStopFromPublisher OnReceivedStopMessageFromPublisher;
31
32        #region Variables
33
34        protected IP2PControl p2pControl;
35        private long sendAliveMessageInterval;
36        private long checkPublishersAvailability;
37        private long publisherReplyTimespan;
38        private string sTopic;
39
40        /// <summary>
41        /// Checking liveness, availability and/or changes (new peer) of the Publisher.
42        /// Retrieves the required DHT entries and initiates the necessary steps
43        /// </summary>
44        private Timer timerCheckPubAvailability;
45        /// <summary>
46        /// To inform the publisher pro-active, that this subscriber
47        /// is still alive and interested in this Task, send periodical
48        /// alive messages.
49        /// </summary>
50        private Timer timerSendingAliveMsg;
51        /// <summary>
52        /// This timer gets started when a DHT entry for a Publisher exists and
53        /// we want to check the liveness of the publisher,
54        /// at which the subscriber had registered. Therefore we send a Ping message
55        /// to the Publisher. If the timer callback is called and no Pong-Response was
56        /// received from the Publisher, the probability is high, that the Publisher is down!
57        /// </summary>
58        private Timer timeoutForPublishersPong;
59        /// <summary>
60        /// After register message is sent to publisher, this timer is started.
61        /// If the publisher doesn't responds with a RegisteringAccepted-Message,
62        /// the probability is high, that the publisher is down!
63        /// </summary>
64        private Timer timeoutForPublishersRegAccept;
65        /// <summary>
66        /// PeerID of the actual publisher. This ID is will be checked continious
67        /// on liveliness and/or updated if Publisher had changed.
68        /// </summary>
69        private PeerId actualPublisher;
70        public PeerId ActualPublisher
71        {
72            get { return this.actualPublisher; }
73            set { this.actualPublisher = value; }
74        }
75
76        /// <summary>
77        /// if true, check whether a new Publisher is the actual one
78        /// and renew Settings
79        /// </summary>
80        private bool bolStopped = true;
81
82        private bool started = false;
83        /// <summary>
84        /// Status flag which contains the state of the Subscriber
85        /// </summary>
86        public bool Started
87        {
88            get { return this.started; }
89            private set { this.started = value; }
90        }
91
92        #endregion
93
94        /* BEGIN: Only for experimental cases */
95
96        public void SolutionFound(byte[] solutionData)
97        {
98            SendMessage(actualPublisher, PubSubMessageType.Solution);
99            this.p2pControl.SendToPeer(solutionData, actualPublisher);
100        }
101
102        /* END: Only for experimental cases */
103
104        public P2PSubscriberBase(IP2PControl p2pControl)
105        {
106            this.p2pControl = p2pControl;
107
108            this.timeoutForPublishersPong = new Timer();
109            this.timeoutForPublishersPong.Elapsed += new ElapsedEventHandler(OnTimeoutPublishersPong);
110
111            this.timeoutForPublishersRegAccept = new Timer();
112            this.timeoutForPublishersRegAccept.Elapsed += new ElapsedEventHandler(OnTimeoutRegisteringAccepted);
113
114            this.timerCheckPubAvailability = new Timer();
115            this.timerCheckPubAvailability.Elapsed += new ElapsedEventHandler(OnCheckPubAvailability);
116
117            this.timerSendingAliveMsg = new Timer();
118            this.timerSendingAliveMsg.Elapsed += new ElapsedEventHandler(OnSendAliveMessage);
119        }
120
121        public void Start(string sTopic, long checkPublishersAvailability, long publishersReplyTimespan)
122        {
123            this.actualPublisher = null;
124
125            this.sTopic = sTopic;
126            this.checkPublishersAvailability = checkPublishersAvailability;
127            this.publisherReplyTimespan = publishersReplyTimespan;
128
129            #region Initialize network-maintanance-Timers
130
131            double pubTimeResponseTimeout = Convert.ToDouble(this.publisherReplyTimespan);
132
133            this.timerCheckPubAvailability.Interval = Convert.ToDouble(this.checkPublishersAvailability);
134            this.timerCheckPubAvailability.AutoReset = true;
135            this.timeoutForPublishersPong.Interval = pubTimeResponseTimeout;
136            this.timeoutForPublishersRegAccept.Interval = pubTimeResponseTimeout;
137
138            #endregion
139
140            this.p2pControl.OnPayloadMessageReceived += new P2PPayloadMessageReceived(p2pControl_OnPayloadMessageReceived);
141            this.p2pControl.OnSystemMessageReceived += new P2PSystemMessageReceived(p2pControl_OnSystemMessageReceived);
142
143            if (this.p2pControl != null)
144            {
145                string sNonrelevant;
146                PeerId myPeerId = this.p2pControl.GetPeerID(out sNonrelevant);
147
148                if (myPeerId != null)
149                {
150                    GuiLogging("Started Subscriber with ID: '" + myPeerId.ToString() + "'", NotificationLevel.Info);
151                } else
152                {
153                    GuiLogging("myPeerId not available. Aborting.", NotificationLevel.Error);
154                    return;
155                }
156            }
157
158            this.Started = true;
159
160            CheckPublishersAvailability();
161        }
162
163        private void CheckPublishersAvailability()
164        {
165            // retrieve publisher information from the DHT
166            PeerId pid = DHT_CommonManagement.GetTopicsPublisherId(ref this.p2pControl, this.sTopic);
167            this.sendAliveMessageInterval = DHT_CommonManagement.GetAliveMessageInterval(ref this.p2pControl, this.sTopic);
168
169            if (pid == null || this.sendAliveMessageInterval == 0)
170            {
171                GuiLogging("No Publisher/Manager information for registering found in the DHT.", NotificationLevel.Info);
172            }
173            else
174            {
175                this.timerSendingAliveMsg.Interval = Convert.ToDouble(this.sendAliveMessageInterval);
176                this.timerSendingAliveMsg.Start();
177
178                if (actualPublisher == null)
179                {
180                    GuiLogging("Found a Publisher/Manager with ID '" + pid.ToString() + ", so register with it.", NotificationLevel.Info);
181                    SendMessage(pid, PubSubMessageType.Register);
182                }
183                else if (actualPublisher == pid)
184                {
185                    // Timer will be only stopped, when OnMessageReceived-Event received
186                    // a Pong-Response from the publisher!
187                    SendMessage(pid, PubSubMessageType.Ping);
188                    this.timeoutForPublishersPong.Start();
189                    GuiLogging("Successfully checked publishers'/managers' information in the DHT. To check liveness, a Ping message was sended to '" + pid.ToString() + "'.", NotificationLevel.Debug);
190                }
191                else
192                {
193                    GuiLogging("The Publisher/Manager had changed from '" + this.actualPublisher.ToString()
194                        + "' to '" + pid.ToString() + "'. Register with the new Publisher/Manager.", NotificationLevel.Info);
195                    SendMessage(pid, PubSubMessageType.Register);
196                }
197                this.actualPublisher = pid;
198            }
199            this.timerCheckPubAvailability.Enabled = true;
200        }
201
202        private void p2pControl_OnSystemMessageReceived(PeerId sender, PubSubMessageType msgType)
203        {
204            if (sender != this.actualPublisher)
205            {
206                GuiLogging("RECEIVED message from third party peer (not the publisher!): " + msgType.ToString() + ", ID: " + sender, NotificationLevel.Debug);
207                return;
208            }
209            switch (msgType)
210            {
211                case PubSubMessageType.RegisteringAccepted:
212                    GuiLogging("REGISTERING ACCEPTED received from publisher!", NotificationLevel.Info);
213                    this.timeoutForPublishersRegAccept.Stop();
214                    break;
215                case PubSubMessageType.Ping:
216                    SendMessage(sender, PubSubMessageType.Pong);
217                    GuiLogging("REPLIED to a ping message from " + sender, NotificationLevel.Debug);
218                    break;
219                case PubSubMessageType.Register:
220                case PubSubMessageType.Unregister:
221                    GuiLogging(msgType.ToString().ToUpper() + " received from PUBLISHER.", NotificationLevel.Debug);
222                    // continuously trying to get a unregister and than re-register with publisher
223                    Stop(msgType);
224                    CheckPublishersAvailability();
225                    break;
226                case PubSubMessageType.Solution:
227                    Stop(msgType);
228                    GuiLogging("Another Subscriber had found the solution!", NotificationLevel.Info);
229                    break;
230                case PubSubMessageType.Stop:
231                    Stop(msgType);
232                    GuiLogging("STOP received from publisher. Subscriber is stopped!", NotificationLevel.Warning);
233                    break;
234                case PubSubMessageType.Pong:
235                    this.timeoutForPublishersPong.Stop();
236                    break;
237                case PubSubMessageType.Alive:
238                default:
239                    // not possible at the moment
240                    break;
241            }
242            // workaround, because Timers.Timer doesn't contains a "Reset" method --> when receiving a
243            // message from the Publisher, we can reset the "check pub availability"-interval time!
244            this.timerCheckPubAvailability.Enabled = false;
245            this.timerCheckPubAvailability.Enabled = true;
246        }
247
248        private void p2pControl_OnPayloadMessageReceived(PeerId sender, byte[] data)
249        {
250            if (sender != actualPublisher)
251            {
252                GuiLogging("RECEIVED message from third party peer (not the publisher!): " + UTF8Encoding.UTF8.GetString(data) + ", ID: " + sender, NotificationLevel.Debug);
253                return;
254            }
255            // functionality swapped for better inheritance
256            HandleIncomingData(sender, data);
257        }
258
259        /// <summary>
260        /// Incoming data will be printed in the information field and the OnTextArrivedEvent will be thrown
261        /// </summary>
262        /// <param name="senderId"></param>
263        /// <param name="sData"></param>
264        protected virtual void HandleIncomingData(PeerId senderId, byte[] data)
265        {
266            GuiLogging("RECEIVED: Message from '" + senderId
267                    + "' with data: '" + UTF8Encoding.UTF8.GetString(data) + "'", NotificationLevel.Debug);
268
269            if (OnTextArrivedFromPublisher != null)
270                OnTextArrivedFromPublisher(data, senderId);
271        }
272
273        // added by Arnold - 2010.03.22
274        // so Worker can send additional reg messages, if connection got lost
275        public void SendRegMsg()
276        {
277            SendMessage(actualPublisher, PubSubMessageType.Register);
278        }
279
280        private void SendMessage(PeerId pubPeerId, PubSubMessageType msgType)
281        {
282            switch (msgType)
283            {
284                case PubSubMessageType.Register:
285                    // start waiting interval for RegAccept Message
286                    this.timeoutForPublishersRegAccept.Start();
287                    break;
288                case PubSubMessageType.Alive:
289                case PubSubMessageType.Ping:
290                case PubSubMessageType.Pong:
291                case PubSubMessageType.Unregister:
292                case PubSubMessageType.Stop:
293                case PubSubMessageType.Solution:
294                    break;
295                //case PubSubMessageType.Solution:
296                //    // when i send Solution to the Stop method, we will run into a recursive loop between SendMessage and Stop!
297                //    Stop(PubSubMessageType.NULL);
298                //    break;
299                default:
300                    GuiLogging("No Message sent, because MessageType wasn't supported: " + msgType.ToString(), NotificationLevel.Warning);
301                    return;
302            }
303            // it doesn't care which message type was sent, but when it was sent to the publisher, reset this AliveMsg Timer
304            if (pubPeerId == actualPublisher)
305            {
306                this.timerSendingAliveMsg.Enabled = false;
307                this.timerSendingAliveMsg.Enabled = true;
308            }
309
310            this.p2pControl.SendToPeer(msgType, pubPeerId);
311
312            GuiLogging(msgType.ToString() + " message sent to Publisher ID '" + pubPeerId.ToString() + "'.", NotificationLevel.Debug);
313        }
314
315        private void OnSendAliveMessage(object sender, ElapsedEventArgs e)
316        {
317            SendMessage(actualPublisher, PubSubMessageType.Alive);
318        }
319
320        /// <summary>
321        /// Callback for timerCheckPubAvailability (adjustable parameter
322        /// in settings, usually every 60 seconds). If another Peer
323        /// takes over Publishing the Task, this and many other things
324        /// will be initiated here
325        /// </summary>
326        /// <param name="state"></param>
327        private void OnCheckPubAvailability(object sender, ElapsedEventArgs e)
328        {
329            CheckPublishersAvailability();
330        }
331
332        /// <summary>
333        /// This callback is only fired, when the publisher didn't sent a response on the register message.
334        /// </summary>
335        /// <param name="state"></param>
336        private void OnTimeoutRegisteringAccepted(object sender, ElapsedEventArgs e)
337        {
338            GuiLogging("TIMEOUT: Waiting for registering accepted message from publisher!", NotificationLevel.Debug);
339            // try to register again
340            CheckPublishersAvailability();
341        }
342
343        /// <summary>
344        /// This callback os only fired, when the publisher didn't sent a response on the ping message.
345        /// </summary>
346        /// <param name="state"></param>
347        private void OnTimeoutPublishersPong(object sender, ElapsedEventArgs e)
348        {
349            GuiLogging("Publisher didn't respond on subscribers' ping message in the given time span!", NotificationLevel.Info);
350            this.timeoutForPublishersPong.Stop();
351            // try to get an active publisher and re-register
352
353            CheckPublishersAvailability();
354        }
355
356        /// <summary>
357        /// Will stop all timers, so Subscriber ends with sending
358        /// Register-, Alive- and Pong-messages. Furthermore an
359        /// unregister message will be send to the publisher
360        /// </summary>
361        public void Stop(PubSubMessageType msgType)
362        {
363            if (actualPublisher != null)
364                SendMessage(actualPublisher, msgType);
365
366            #region stopping all timers, if they are still active
367
368            this.timeoutForPublishersPong.Stop();
369            this.timeoutForPublishersRegAccept.Stop();
370            this.timerSendingAliveMsg.Stop();
371
372            //when Sub received a UnReg message, it haven't stop
373            //the registering not possible worker, to connect to
374            //a new incoming Publisher
375            if (msgType == PubSubMessageType.Stop)
376            {
377                this.bolStopped = true;
378
379                this.timerCheckPubAvailability.Stop();
380
381                this.Started = false;
382                this.p2pControl.OnSystemMessageReceived -= p2pControl_OnSystemMessageReceived;
383                this.p2pControl.OnPayloadMessageReceived -= p2pControl_OnPayloadMessageReceived;
384                GuiLogging("Subscriber/Worker is completely stopped", NotificationLevel.Debug);
385            }
386            else
387            {
388                GuiLogging("Publisher/Manager had left the network, waiting for its comeback or takeover by a new Publisher/Manager.", NotificationLevel.Info);
389            }
390
391            #endregion
392
393            GuiLogging("All Timers were stopped successfully", NotificationLevel.Debug);
394        }
395
396        protected void GuiLogging(string sText, NotificationLevel notLev)
397        {
398            if (OnGuiMessage != null)
399            {
400                // for evaluation issues, DateTime is added
401                OnGuiMessage(sText + "(" + DebugToFile.GetTimeStamp() + ")", notLev);
402                //OnGuiMessage(sText, notLev);
403            }
404        }
405    }
406}
Note: See TracBrowser for help on using the repository browser.