source: trunk/CrypPlugins/PeerToPeerPublisher/P2PPublisherBase.cs @ 1166

Last change on this file since 1166 was 1166, checked in by arnold, 12 years ago

P2PManager: Design embellished and added an "estimated end time" information.
Samples: Output Boxes for KeySearcher Outputs

File size: 17.6 KB
Line 
1using System;
2using System.Collections.Generic;
3using System.Linq;
4using System.Text;
5using Cryptool.PluginBase;
6using Cryptool.PluginBase.Control;
7//using System.Threading;
8using System.Timers;
9using System.IO;
10
11/*
12 * TODO:
13 * - FUTURE: dual data management of subscriber list (on local peer and in DHT)
14 */
15
16namespace Cryptool.Plugins.PeerToPeer
17{
18    public class P2PPublisherBase
19    {
20        public delegate void GuiMessage(string sData, NotificationLevel notificationLevel);
21        public virtual event GuiMessage OnGuiMessage;
22
23        #region Variables
24
25        protected IP2PControl p2pControl;
26        protected SubscriberManagement peerManagement;
27        private string topic = String.Empty;
28        private Timer timerWaitingForAliveMsg;
29       
30        private PeerId ownPeerId;
31        /// <summary>
32        /// interval in milliseconds!!! Divide with 1000 to preserve seconds
33        /// </summary>
34        private long aliveMessageInterval;
35
36        private bool started = false;
37        /// <summary>
38        /// Status flag which contains the state of the Publisher
39        /// </summary>
40        public bool Started
41        {
42            get { return this.started; }
43            private set { this.started = value; }
44        }
45
46        // Publisher-exchange extension - Arnie 2010.02.02
47        /// <summary>
48        /// Interval for waiting for other Publishers Pong in milliseconds!
49        /// </summary>
50        private const long INTERVAL_WAITING_FOR_OTHER_PUBS_PONG = 10000;
51        private Timer waitingForOtherPublishersPong;
52        /// <summary>
53        /// if this value is set, you are between the TimeSpan of checking liveness of the other peer.
54        /// If Timespan runs out without receiving a Pong-Msg from the other Publisher, assume its functionality
55        /// </summary>
56        private PeerId otherPublisherPeer = null;
57        private bool otherPublisherHadResponded = false;
58
59        #endregion
60
61        public P2PPublisherBase(IP2PControl p2pControl)
62        {
63            this.p2pControl = p2pControl;
64
65            this.timerWaitingForAliveMsg = new Timer();
66            this.timerWaitingForAliveMsg.AutoReset = true;
67            this.timerWaitingForAliveMsg.Elapsed += new ElapsedEventHandler(OnWaitingForAliveMsg);
68
69            this.waitingForOtherPublishersPong = new Timer();
70            this.waitingForOtherPublishersPong.AutoReset = false;
71            this.waitingForOtherPublishersPong.Interval = INTERVAL_WAITING_FOR_OTHER_PUBS_PONG;
72            this.waitingForOtherPublishersPong.Elapsed += new ElapsedEventHandler(OnWaitingForOtherPublishersPong);
73        }
74
75        // Publisher-exchange extension - Arnie 2010.02.02
76        /// <summary>
77        /// Callback function for waitingForOtherPublishersPong-object. Will be only executed, when a
78        /// different Publisher-ID was found in the DHT, to check if the "old" Publisher is still
79        /// alive!
80        /// </summary>
81        /// <param name="sender"></param>
82        /// <param name="e"></param>
83        private void OnWaitingForOtherPublishersPong(object sender, ElapsedEventArgs e)
84        {
85            if (this.otherPublisherHadResponded)
86            {
87                GuiLogging("Can't assume functionality of an alive Publishers. So starting this workspace isn't possible!", NotificationLevel.Error);
88                Stop(PubSubMessageType.PublisherRivalryProblem);
89            }
90            else
91            {
92                this.waitingForOtherPublishersPong.Stop();
93                GuiLogging("First trial to assume functionality of the old Publisher.", NotificationLevel.Debug);
94                // we have to delete all OLD Publishers entries to assume its functionality
95                DHT_CommonManagement.DeleteAllPublishersEntries(ref this.p2pControl, this.topic);
96                Start(this.topic, this.aliveMessageInterval);
97            }
98        }
99
100        /// <summary>
101        /// if peers are outdated (alive message doesn't arrive in the given interval)
102        /// ping them to give them a second chance
103        /// </summary>
104        /// <param name="sender"></param>
105        /// <param name="e"></param>
106        private void OnWaitingForAliveMsg(object sender, ElapsedEventArgs e)
107        {
108            // get second chance list from SubscribersManagement (Second chance list = The timespans of this subscribers are expired)
109            List<PeerId> lstOutdatedSubscribers = this.peerManagement.CheckVitality();
110            foreach (PeerId outdatedSubscriber in lstOutdatedSubscribers)
111            {
112                this.p2pControl.SendToPeer(PubSubMessageType.Ping, outdatedSubscriber);
113                GuiLogging("PING outdated peer " + outdatedSubscriber, NotificationLevel.Debug);
114            }
115        }
116
117        #region public methods (Start, Publish, Stop)
118
119        protected virtual void AssignManagement(long aliveMessageInterval)
120        {
121            this.peerManagement = new SubscriberManagement(aliveMessageInterval);
122            this.peerManagement.OnSubscriberRemoved += new SubscriberManagement.SubscriberRemoved(peerManagement_OnSubscriberRemoved);
123        }
124
125        /// <summary>
126        /// Starts the publisher and checks whether there is already a publisher for this topic (than returns false)
127        /// </summary>
128        /// <param name="sTopic">Topic, at which the subscribers can register themselves</param>
129        /// <param name="aliveMessageInterval">Declare interval (in sec) in which every subscriber has to send an alive message to the publisher</param>
130        /// <returns>true, if writing all necessary information was written in DHT, otherwise false</returns>
131        public void Start(string sTopic, long aliveMessageInterval)
132        {
133            this.p2pControl.OnPayloadMessageReceived -= p2pControl_OnPayloadMessageReceived;
134            this.p2pControl.OnSystemMessageReceived -= p2pControl_OnSystemMessageReceived;
135            this.p2pControl.OnPayloadMessageReceived += new P2PPayloadMessageReceived(p2pControl_OnPayloadMessageReceived);
136            this.p2pControl.OnSystemMessageReceived += new P2PSystemMessageReceived(p2pControl_OnSystemMessageReceived);
137
138            /* BEGIN: CHECKING WHETHER THERE HAS ALREADY EXIST ANOTHER PUBLISHER */
139            this.topic = sTopic;
140            this.aliveMessageInterval = aliveMessageInterval;
141            AssignManagement(this.aliveMessageInterval);
142
143            // publish own PeerID to the DHT Entry with the key "TaskName", so every subscriber
144            // can retrieve the name and send a register-message to the publisher
145            string sPeerName;
146            PeerId myPeerId = this.p2pControl.GetPeerID(out sPeerName);
147            this.ownPeerId = myPeerId;
148
149            // before storing the publishers ID in the DHT, proof whether there already exists an entry
150            PeerId byRead = DHT_CommonManagement.GetTopicsPublisherId(ref this.p2pControl, sTopic);
151
152            // if byRead is not null, the DHT entry was already written
153            if (byRead != null)
154            {
155                // if a different Publisher was found at the DHT entry, send a ping msg
156                // and wait for Pong-Response. When this won't arrive in the given TimeSpan,
157                // assume the Publishing functionality
158                if (byRead != myPeerId)
159                {   
160                    this.otherPublisherHadResponded = false;
161                    this.otherPublisherPeer = byRead;
162                    this.waitingForOtherPublishersPong.Enabled = true;
163
164                    this.p2pControl.SendToPeer(PubSubMessageType.Ping, byRead);
165
166                    GuiLogging("Another Publisher was found. Waiting for Pong-Response for "
167                        + INTERVAL_WAITING_FOR_OTHER_PUBS_PONG / 1000 + " seconds. When it won't response "
168                        + "assume its functionality.", NotificationLevel.Debug);
169                    return;
170                }
171            }
172            /* END: CHECKING WHETHER THERE HAS ALREADY EXIST ANOTHER PUBLISHER */
173
174            bool bolTopicStored = DHT_CommonManagement.SetTopicsPublisherId(ref this.p2pControl, sTopic, myPeerId);
175            bool bolSettingsStored = DHT_CommonManagement.SetAliveMessageInterval(ref this.p2pControl, sTopic, this.aliveMessageInterval);
176
177            if (!bolTopicStored || !bolSettingsStored)
178            {
179                GuiLogging("Storing Publishers ID and/or Publishers Settings wasn't possible.", NotificationLevel.Error);
180                return;
181            }
182
183            GuiLogging("Peer ID '" + myPeerId + "' is published to DHT -Entry-Key '" + this.topic + "'", NotificationLevel.Info);
184            this.Started = true;
185
186            PeerCompletelyStarted();
187        }
188
189        /// <summary>
190        /// Will only be thrown, when Publisher has successfully registered in the p2p network!
191        /// </summary>
192        protected virtual void PeerCompletelyStarted()
193        {  }
194
195        /// <summary>
196        /// Publish text to ALL active subscribers (subscribers of the second chance list included)
197        /// </summary>
198        /// <param name="sText"></param>
199        /// <returns>Amount of Subscribers, to which the message was sent</returns>
200        public virtual int Publish(string sText)
201        {
202            int i = 0;
203            List<PeerId> lstSubscribers = this.peerManagement.GetAllSubscribers();
204
205            foreach (PeerId subscriber in lstSubscribers)
206            {
207                this.p2pControl.SendToPeer(sText, subscriber);
208                i++;
209            }
210            return i;
211        }
212
213        /// <summary>
214        /// Sends a given message to all subscribers, deletes publishers information from DHT and stops all timers.
215        /// </summary>
216        /// <param name="msgType">Choose an according MessageType (for example: Unregister or Stop), so every subscriber can handle
217        /// the LAST message from the publisher correctly!</param>
218        public virtual void Stop(PubSubMessageType msgType)
219        {
220            // don't remove informations, when it occurs a rivalry problem between two
221            // Publishers, because otherwise this will kill the whole topic-solution-network.
222            if (this.p2pControl != null && this.p2pControl.PeerStarted() && msgType != PubSubMessageType.PublisherRivalryProblem)
223            {
224                GuiLogging("Begin removing the information from the DHT", NotificationLevel.Debug);
225
226                bool removeSettings = DHT_CommonManagement.DeleteAllPublishersEntries(ref this.p2pControl, this.topic);
227
228                if (removeSettings)
229                    GuiLogging("Publishers/Managers ID and Alive Message Interval successfully removed from DHT.", NotificationLevel.Debug);
230                else
231                    GuiLogging("Neither Topic nor settings were removed from DHT.", NotificationLevel.Debug);
232
233                // send unregister message to all subscribers
234                int i = SendInternalMsg(msgType);
235                GuiLogging("Unregister messages were sent to " + i.ToString() + " subscribers!", NotificationLevel.Info);
236            }
237
238            GuiLogging("Stopping all timers.", NotificationLevel.Debug);
239
240            this.timerWaitingForAliveMsg.Stop();
241            this.waitingForOtherPublishersPong.Stop();
242
243            GuiLogging("Deregister message-received-events", NotificationLevel.Debug);
244            this.p2pControl.OnPayloadMessageReceived -= p2pControl_OnPayloadMessageReceived;
245            this.p2pControl.OnSystemMessageReceived -= p2pControl_OnSystemMessageReceived;
246
247            GuiLogging("Publisher completely stopped!", NotificationLevel.Info);
248
249            this.Started = false;
250        }
251
252        #endregion
253
254        #region private Methods (MessageReceived, SendInternalMsg, OnWaitingForAliveMsg, peerManagement_OnSubscriberRemoved)
255
256        protected virtual void p2pControl_OnSystemMessageReceived(PeerId sender, PubSubMessageType msgType)
257        {
258            if (sender == ownPeerId)
259            {
260                GuiLogging("Received Message from OWN Peer... Strange stuff.", NotificationLevel.Debug);
261                return;
262            }
263
264            switch (msgType)
265            {
266                case PubSubMessageType.Register:
267                    if (this.peerManagement.Add(sender))
268                    {
269                        if(!this.timerWaitingForAliveMsg.Enabled)
270                            this.timerWaitingForAliveMsg.Start();
271                        GuiLogging("REGISTERED: Peer with ID " + sender + "- RegExepted Msg was sent.", NotificationLevel.Info);
272                        this.p2pControl.SendToPeer(PubSubMessageType.RegisteringAccepted, sender);
273                    }
274                    else
275                    {
276                        GuiLogging("ALREADY REGISTERED peer with ID " + sender, NotificationLevel.Info);
277                        // this case only occurs, when some messages run into a misunderstanding,
278                        // but reply with a RegAccepted Msg, so the sender won't send new Reg Msgs
279                        // periodically.
280                        this.p2pControl.SendToPeer(PubSubMessageType.RegisteringAccepted, sender);
281                    }
282                    break;
283                case PubSubMessageType.Stop:
284                case PubSubMessageType.Unregister:
285                    if (!this.peerManagement.Remove(sender))
286                        GuiLogging("ALREADY REMOVED or had not registered anytime. ID " + sender, NotificationLevel.Info);
287                    break;
288                case PubSubMessageType.Alive:
289                case PubSubMessageType.Pong:
290                    if (this.otherPublisherPeer != null && this.otherPublisherPeer == sender)
291                    {
292                        this.otherPublisherHadResponded = true;
293                        this.otherPublisherPeer = null;
294                    }
295                    break;
296                case PubSubMessageType.Ping:
297                    this.p2pControl.SendToPeer(PubSubMessageType.Pong, sender);
298                    GuiLogging("REPLIED to a ping message from peer " + sender, NotificationLevel.Debug);
299                    break;
300                case PubSubMessageType.Solution:
301                    // Send solution msg to all subscriber peers and delete subList
302                    Stop(msgType);
303                    break;
304                default:
305                    throw (new NotImplementedException());
306            } // end switch
307
308            if (msgType != PubSubMessageType.Unregister && msgType != PubSubMessageType.Stop)
309            {
310                if(this.peerManagement.Update(sender))
311                    GuiLogging("UPDATED Peer " + sender + " successfully.", NotificationLevel.Debug);
312                else
313                    GuiLogging("UPDATING Peer " + sender + " failed.", NotificationLevel.Debug);
314            }
315        }
316
317        protected virtual void p2pControl_OnPayloadMessageReceived(PeerId sender, byte[] data)
318        {
319            GuiLogging("RECEIVED message from non subscribed peer: " + UTF8Encoding.UTF8.GetString(data) + ", ID: " + sender, NotificationLevel.Debug);
320            // if sender is already registered, update its entry in either case
321            if (this.peerManagement.Update(sender))
322            {
323                GuiLogging("UPDATED Peer " + sender + " successfully.", NotificationLevel.Debug);
324            }
325        }
326
327        /// <summary>
328        /// This functions can only send infrastructure-supporting messages to all subscribers
329        /// </summary>
330        /// <param name="msgType"></param>
331        /// <returns>Amount of Subscribers, to which a message was sent</returns>
332        private int SendInternalMsg(PubSubMessageType msgType)
333        {
334            int i = 0;
335            List<PeerId> lstSubscribers = this.peerManagement.GetAllSubscribers();
336            foreach (PeerId subscriber in lstSubscribers)   
337            {
338                this.p2pControl.SendToPeer(msgType, subscriber);
339                i++;
340            }
341            return i;
342        }
343
344        private void peerManagement_OnSubscriberRemoved(PeerId peerId)
345        {
346            GuiLogging("REMOVED peer " + peerId, NotificationLevel.Info);
347        }
348
349        protected void GuiLogging(string sText, NotificationLevel notLev)
350        {
351            if(OnGuiMessage != null)
352                OnGuiMessage(sText, notLev);
353        }
354
355        #endregion
356
357        // Only for testing the (De-)Serialization of SubscribersManagement
358        public void TestSerialization()
359        {
360            /* Get all Subs and serialize them manual */
361            List<PeerId> originalSubList = this.peerManagement.GetAllSubscribers();
362
363            /* Serialize and deserialize active subs automatically */
364            byte[] byResult = this.peerManagement.Serialize();
365            List<PeerId> pid = this.peerManagement.Deserialize(byResult, ref this.p2pControl);
366
367            /* Comparing the deserialized list with the original SubList */
368            bool result = true;
369
370            if (pid.Count != originalSubList.Count)
371            {
372                result = false;
373            }
374            else
375            {
376                int f = 0;
377                foreach (PeerId originalPeer in originalSubList)
378                {
379                    if (originalPeer != pid[f])
380                    {
381                        result = false;
382                        break;
383                    }
384                    f++;
385                }
386            }
387            GuiLogging("Result of serialization/deserialization: " + result, NotificationLevel.Debug);
388        }
389    }
390}
Note: See TracBrowser for help on using the repository browser.