source: trunk/CrypPlugins/PeerToPeerBase/PeerToPeerBase.cs @ 836

Last change on this file since 836 was 836, checked in by arnold, 12 years ago

P2P Publish/Subscriber (buggy, aber für Präsentationszwecke geeignet)

File size: 20.0 KB
Line 
1/* Copyright 2009 Team CrypTool (Christian Arnold), Uni Duisburg-Essen
2
3   Licensed under the Apache License, Version 2.0 (the "License");
4   you may not use this file except in compliance with the License.
5   You may obtain a copy of the License at
6
7       http://www.apache.org/licenses/LICENSE-2.0
8
9   Unless required by applicable law or agreed to in writing, software
10   distributed under the License is distributed on an "AS IS" BASIS,
11   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12   See the License for the specific language governing permissions and
13   limitations under the License.
14*/
15
16using System;
17using System.Collections.Generic;
18using System.Linq;
19using System.Text;
20using PeersAtPlay.P2PStorage.DHT;
21using PeersAtPlay.P2PStorage.FullMeshDHT;
22using PeersAtPlay.P2PLink.SimpleSnalNG;
23using PeersAtPlay.P2POverlay.Bootstrapper;
24using PeersAtPlay.P2POverlay;
25using PeersAtPlay.P2POverlay.Bootstrapper.LocalMachineBootstrapper;
26using PeersAtPlay.P2POverlay.FullMeshOverlay;
27using PeersAtPlay.P2PLink;
28using PeersAtPlay.P2POverlay.Bootstrapper.IrcBootstrapper;
29using System.Threading;
30using Cryptool.PluginBase.Control;
31using Cryptool.PluginBase;
32using Cryptool.PluginBase.Miscellaneous;
33using System.ComponentModel;
34using Cryptool.PluginBase.IO;
35using PeersAtPlay;
36
37/*
38 * Synchronous functions successfully tested (store, retrieve)
39 * !!! remove-Function is faulty - open field !!!
40 *
41 * TODO:
42 * - The dht.Remove method causes problems: it throws an "ArgumentNotNullException"
43 *   even though the parameter is correctly set to a valid value!
44 *   --> forwarded to the p@p-Team
45 * - Testing asynchronous methods incl. EventHandlers
46 */
47namespace Cryptool.Plugins.PeerToPeer
48{
49    /* Advantages of this wrapper class:
50     * - The peers@play libraries are only referenced in this project
51     *   --> so they're easy to update
52     * - peers@play only works with asynchronous methods, so this class
53     *   "synchronizes" these methods.
54     * - The peer-to-peer layers are unimportant for CT2 developers, so these
55     *   details are hidden by this wrapper class
56     */
57    /// <summary>
58    /// Wrapper class to integrate peer@play environment into CrypTool.
59    /// This class synchronizes asynchronous methods for easier usage in CT2. For future
60    /// </summary>
61    public class P2PBase
62    {
63        #region Delegates and Events for asynchronous p2p functions
64
65        public delegate void SystemJoined();
66        public event SystemJoined OnSystemJoined;
67
68        public delegate void SystemLeft();
69        public event SystemLeft OnSystemLeft;
70
71        public delegate void P2PMessageReceived(byte[] byteSourceAddr, string sData);
72        public event P2PMessageReceived OnP2PMessageReceived;
73
74        /// <summary>
75        /// returns true if key-value-pair is successfully stored in the DHT
76        /// </summary>
77        /// <param name="result"></param>
78        public delegate void DHTStoreCompleted(bool result);
79        public event DHTStoreCompleted OnDhtStore_Completed;
80
81        public delegate void DHTLoadCompleted(byte[] loadedData);
82        public event DHTLoadCompleted OnDhtLoad_Completed;
83
84        /// <summary>
85        /// returns true if key was found and removed successfully from the DHT
86        /// </summary>
87        /// <param name="result"></param>
88        public delegate void DHTRemoveCompleted(bool result);
89        public event DHTRemoveCompleted OnDhtRemove_Completed;
90
91        #endregion
92
93        #region Variables
94
95        private IDHT dht;
96        private IP2PLinkManager linkmanager;
97        private IBootstrapper bootstrapper;
98        private P2POverlay overlay;
99        private AutoResetEvent systemJoined;
100        private AutoResetEvent systemLeft;
101
102        /// <summary>
103        /// Dictionary for synchronizing asynchronous DHT retrieves.
104        /// Cryptool doesn't offers an asynchronous environment, so this workaround is necessary
105        /// </summary>
106        private Dictionary<Guid, ResponseWait> waitDict;
107
108        #endregion
109
110        public P2PBase()
111        {
112            this.waitDict = new Dictionary<Guid, ResponseWait>();
113            this.systemJoined = new AutoResetEvent(false);
114            this.systemLeft = new AutoResetEvent(false);
115        }
116
117        #region Basic P2P Methods (Init, Start, Stop) - synch and asynch
118
119        /// <summary>
120        /// Initializing is the first step to build a new or access an existing p2p network
121        /// </summary>
122        /// <param name="sUserName">Choose an individual name for the user</param>
123        /// <param name="sWorldName">fundamental: two peers are only in the SAME P2P system, when they initialized the SAME WORLD!</param>
124        /// <param name="linkManagerType"></param>
125        /// <param name="bsType"></param>
126        /// <param name="overlayType"></param>
127        /// <param name="dhtType"></param>
128        public void Initialize(string sUserName, string sWorldName, P2PLinkManagerType linkManagerType, P2PBootstrapperType bsType, P2POverlayType overlayType, P2PDHTType dhtType)
129        {
130            #region Setting LinkManager, Bootstrapper, Overlay and DHT to the specified types
131            switch (linkManagerType)
132            {
133                case P2PLinkManagerType.Snal:
134                    //snal = secure network abstraction layer
135                    this.linkmanager = new Snal();
136                    break;
137                default:
138                    throw (new NotImplementedException());
139            }
140            switch (bsType)
141            {
142                case P2PBootstrapperType.LocalMachineBootstrapper:
143                    //LocalMachineBootstrapper = only local connection (runs only on one machine)
144                    this.bootstrapper = new LocalMachineBootstrapper();
145                    break;
146                case P2PBootstrapperType.IrcBootstrapper:
147                    this.bootstrapper = new IrcBootstrapper();
148                    break;
149                default:
150                    throw (new NotImplementedException());
151            }
152            switch (overlayType)
153            {
154                case P2POverlayType.FullMeshOverlay:
155                    // changing overlay example: this.overlay = new ChordOverlay();
156                    this.overlay = new FullMeshOverlay();
157                    break;
158                default:
159                    throw (new NotImplementedException());
160            }
161            switch (dhtType)
162            {
163                case P2PDHTType.FullMeshDHT:
164                    this.dht = new FullMeshDHT();
165                    break;
166                default:
167                    throw (new NotImplementedException());
168            }
169            #endregion
170
171            this.dht.Initialize(sUserName, sWorldName, this.overlay, this.bootstrapper, this.linkmanager, null);
172
173            this.dht.MessageReceived += new EventHandler<MessageReceived>(OnDHT_MessageReceived);
174            this.overlay.MessageReceived += new EventHandler<OverlayMessageEventArgs>(overlay_MessageReceived);
175            this.dht.SystemJoined += new EventHandler(OnDHT_SystemJoined);
176            this.dht.SystemLeft += new EventHandler(OnDHT_SystemLeft);
177        }
178
179        /// <summary>
180        /// Starts the P2P System. When the given P2P world doesn't exist yet,
181        /// inclusive creating the and bootstrapping to the P2P network.
182        /// In either case joining the P2P world.
183        /// This synchronized method returns true not before the peer has
184        /// successfully joined the network (this may take one or two minutes).
185        /// </summary>
186        /// <returns>True, if the peer has completely joined the p2p network</returns>
187        public bool SynchStart()
188        {
189            //Start != system joined
190            //Only starts the system asynchronous, the possible callback is useless,
191            //because it's invoked before the peer completly joined the P2P system
192            this.dht.BeginStart(null);
193            //Wait for event SystemJoined. When it's invoked, the peer completly joined the P2P system
194            this.systemJoined.WaitOne();
195            return true;
196        }
197
198        /*TESTING AREA - COMPLETELY STOP THE WHOLE P2P SYSTEM*/
199        /// <summary>
200        /// Disjoins the peer from the system. The P2P system survive while one peer is still in the network.
201        /// </summary>
202        /// <returns>True, if the peer has completely disjoined the p2p network</returns>
203        public bool SynchStop()
204        {
205            if (this.dht != null)
206            {
207                this.dht.BeginStop(null);
208                //wait till systemLeft Event is invoked
209                this.overlay.BeginStop(null);
210                this.linkmanager.BeginStop(null);
211                this.bootstrapper.Dispose();
212                this.systemLeft.WaitOne();
213            }
214            return true;
215        }
216
217
218        /// <summary>
219        /// Asynchronously starting the peer. When the given P2P world doesn't
220        /// exist yet, inclusive creating the and bootstrapping to the P2P network.
221        /// In either case joining the P2P world. To ensure that peer has successfully
222        /// joined the p2p world, catch the event OnSystemJoined.
223        /// </summary>
224        public void AsynchStart()
225        {
226            // no callback usefull, because starting and joining isn't the same
227            // everything else is done by the EventHandler OnDHT_SystemJoined
228            this.dht.BeginStart(null);
229        }
230
231        /// <summary>
232        /// Asynchronously disjoining the actual peer of the p2p system. To ensure
233        /// disjoining, catch the event OnDHT_SystemLeft.
234        /// </summary>
235        public void AsynchStop()
236        {
237            if (this.dht != null)
238            {
239                // no callback usefull.
240                // Everything else is done by the EventHandler OnDHT_SystemLeft
241                this.dht.BeginStop(null);
242            }
243        }
244
245        #endregion
246
247        /// <summary>
248        /// Get PeerName of the actual peer
249        /// </summary>
250        /// <param name="sPeerName">out: additional peer information UserName on LinkManager</param>
251        /// <returns>PeerID as an byte array (suitable for correct addressing on the overlay)</returns>
252        public byte[] GetPeerID(out string sPeerName)
253        {
254            sPeerName = this.linkmanager.UserName;
255            return this.overlay.LocalAddress.ToByteArray();
256        }
257
258        // overlay.LocalAddress = Overlay-Peer-Address/Names
259        public void SendToPeer(string sData, byte[] byteDestinationPeerName)
260        {
261            ByteStack byteData = new ByteStack();
262            byteData.PushUTF8String(sData);
263
264            OverlayAddress destinationAddr = this.overlay.GetAddress(byteDestinationPeerName);
265
266            // create own message receiver type... => P2PBase, so only this Application
267            // receives Messages and not all active application on the SAME overlay!
268            OverlayMessage overlayMsg = new OverlayMessage(MessageReceiverType.P2PBase,
269                this.overlay.LocalAddress,destinationAddr, byteData);
270            this.overlay.Send(overlayMsg);
271        }
272
273        #region Event Handling (System Joined, Left and Message Received)
274
275        private void OnDHT_SystemJoined(object sender, EventArgs e)
276        {
277            if (OnSystemJoined != null)
278                OnSystemJoined();
279            this.systemJoined.Set();
280        }
281
282        private void OnDHT_SystemLeft(object sender, EventArgs e)
283        {
284            if (OnSystemLeft != null)
285                OnSystemLeft();
286            // as an experiment
287            this.dht = null;
288            this.systemLeft.Set();
289        }
290
291        private void overlay_MessageReceived(object sender, OverlayMessageEventArgs e)
292        {
293            if (OnP2PMessageReceived != null)
294                OnP2PMessageReceived(e.Message.Source.ToByteArray(), e.Message.Data.PopUTF8String());
295        }
296
297        private void OnDHT_MessageReceived(object sender, MessageReceived e)
298        {
299            if (OnP2PMessageReceived != null)
300                OnP2PMessageReceived(e.Source.ToByteArray(), e.Data.PopUTF8String());
301        }
302
303        #endregion
304
305        /*
306         * Attention: The asynchronous methods are not tested at the moment
307         */
308        #region Asynchronous Methods incl. Callbacks
309
310        /// <summary>
311        /// Asynchronously retrieving a key from the DHT. To get value, catch
312        /// event OnDhtLoad_Completed.
313        /// </summary>
314        /// <param name="sKey">Existing key in DHT</param>
315        public void AsynchRetrieve(string sKey)
316        {
317            Guid g = this.dht.Retrieve(OnAsynchRetrieve_Completed, sKey);
318        }
319        private void OnAsynchRetrieve_Completed(RetrieveResult rr)
320        {
321            if (OnDhtLoad_Completed != null)
322            {
323                OnDhtLoad_Completed(rr.Data);
324            }
325        }
326
327        /// <summary>
328        /// Asynchronously storing a Key-Value-Pair in the DHT. To ensure that
329        /// storing is completed, catch event OnDhtStore_Completed.
330        /// </summary>
331        /// <param name="sKey"></param>
332        /// <param name="sValue"></param>
333        public void AsynchStore(string sKey, string sValue)
334        {
335            this.dht.Store(OnAsynchStore_Completed, sKey, UTF8Encoding.UTF8.GetBytes(sValue));
336        }
337
338        private void OnAsynchStore_Completed(StoreResult sr)
339        {
340            if (OnDhtStore_Completed != null)
341            {
342                if (sr.Status == OperationStatus.Success)
343                    OnDhtStore_Completed(true);
344                else
345                    OnDhtStore_Completed(false);
346            }
347               
348        }
349
350        /// <summary>
351        /// Asynchronously removing an existing key out of the DHT. To ensure
352        /// that removing is completed, catch event OnDhtRemove_Completed.
353        /// </summary>
354        /// <param name="sKey"></param>
355        public void AsynchRemove(string sKey)
356        {
357            this.dht.Remove(OnAsynchRemove_Completed, sKey);
358        }
359        private void OnAsynchRemove_Completed(RemoveResult rr)
360        {
361            if (OnDhtRemove_Completed != null)
362            {
363                if(rr.Status == OperationStatus.Success)
364                    OnDhtRemove_Completed(true);
365                else
366                    OnDhtRemove_Completed(false);
367            }
368        }
369
370        #endregion
371
372        #region Synchronous Methods incl. Callbacks
373
374        /// <summary>
375        /// Stores a value in the DHT at the given key
376        /// </summary>
377        /// <param name="sKey">Key of DHT Entry</param>
378        /// <param name="byteData">Value of DHT Entry</param>
379        /// <returns>True, when storing is completed!</returns>
380        public bool SynchStore(string sKey, byte[] byteData)
381        {
382            AutoResetEvent are = new AutoResetEvent(false);
383            // this method returns always a GUID to distinguish between asynchronous actions
384            Guid g = this.dht.Store(OnSynchStoreCompleted, sKey, byteData);
385
386            ResponseWait rw = new ResponseWait() { WaitHandle = are };
387
388            waitDict.Add(g, rw);
389            //blocking till response
390            are.WaitOne();
391            return true;
392        }
393
394        /// <summary>
395        /// Stores a value in the DHT at the given key
396        /// </summary>
397        /// <param name="sKey">Key of DHT Entry</param>
398        /// <param name="sValue">Value of DHT Entry</param>
399        /// <returns>True, when storing is completed!</returns>
400        public bool SynchStore(string sKey, string sData)
401        {
402            return SynchStore(sKey, UTF8Encoding.UTF8.GetBytes(sData));
403        }
404
405        /// <summary>
406        /// Get the value of the given DHT Key or null, if it doesn't exist.
407        /// For synchronous environments use the Synch* methods.
408        /// </summary>
409        /// <param name="sKey">Key of DHT Entry</param>
410        /// <returns>Value of DHT Entry</returns>
411        public byte[] SynchRetrieve(string sKey)
412        {
413            AutoResetEvent are = new AutoResetEvent(false);
414            // this method returns always a GUID to distinguish between asynchronous actions
415            Guid g = this.dht.Retrieve(OnSynchRetrieveCompleted, sKey);
416           
417            ResponseWait rw = new ResponseWait() {WaitHandle = are };
418           
419            waitDict.Add(g,rw  );
420            // blocking till response
421            are.WaitOne();
422            //Rückgabe der Daten
423            return rw.Message;
424        }
425
426        /// <summary>
427        /// Removes a key/value pair out of the DHT
428        /// </summary>
429        /// <param name="sKey">Key of the DHT Entry</param>
430        /// <returns>True, when removing is completed!</returns>
431        public bool SynchRemove(string sKey)
432        {
433            AutoResetEvent are = new AutoResetEvent(false);
434            // this method returns always a GUID to distinguish between asynchronous actions
435
436            // ROAD WORKS: This function throws an error (ArgumentNotNullException).
437            //             I think that's an error in the p@p-environment --> forwarded to the p@p-Team
438            Guid g = this.dht.Remove(OnSynchRemoveCompleted, sKey);
439
440            ResponseWait rw = new ResponseWait() { WaitHandle = are };
441
442            waitDict.Add(g, rw);
443            // blocking till response
444            are.WaitOne();
445            return true;
446        }
447
448        /// <summary>
449        /// Callback for a the synchronized store method
450        /// </summary>
451        /// <param name="rr"></param>
452        private void OnSynchStoreCompleted(StoreResult sr)
453        {
454            ResponseWait rw;
455            if (this.waitDict.TryGetValue(sr.Guid, out rw))
456            {
457                // if Status == Error, than the version of the value is out of date.
458                // There is a versioning system in the DHT. So you must retrieve the
459                // key and than store the new value --> that's it, but much traffic.
460                // to be fixed in a few weeks from M.Helling
461                rw.Message = UTF8Encoding.UTF8.GetBytes(sr.Status.ToString());
462
463                //unblock WaitHandle in the synchronous method
464                rw.WaitHandle.Set();
465                // don't know if this accelerates the system...
466                this.waitDict.Remove(sr.Guid);
467            }
468        }
469
470        /// <summary>
471        /// Callback for a the synchronized retrieval method
472        /// </summary>
473        /// <param name="rr"></param>
474        private void OnSynchRetrieveCompleted(RetrieveResult rr)
475        {
476            ResponseWait rw;
477
478            if (this.waitDict.TryGetValue(rr.Guid, out rw))
479            {
480                rw.Message = rr.Data;
481
482                //unblock WaitHandle in the synchronous method
483                rw.WaitHandle.Set(); 
484                // don't know if this accelerates the system...
485                this.waitDict.Remove(rr.Guid);
486            }
487        }
488
489        /// <summary>
490        /// Callback for a the synchronized remove method
491        /// </summary>
492        /// <param name="rr"></param>
493        private void OnSynchRemoveCompleted(RemoveResult rr)
494        {
495            ResponseWait rw;
496            if (this.waitDict.TryGetValue(rr.Guid, out rw))
497            {
498                rw.Message = UTF8Encoding.UTF8.GetBytes(rr.Status.ToString());
499
500                //unblock WaitHandle in the synchronous method
501                rw.WaitHandle.Set();
502                // don't know if this accelerates the system...
503                this.waitDict.Remove(rr.Guid);
504            }
505        }
506
507        #endregion
508
509        /// <summary>
510        /// To log the internal state in the Monitoring Software of P@play
511        /// </summary>
512        public void LogInternalState()
513        {
514            if (this.dht != null)
515            {
516                this.dht.LogInternalState();
517            }
518        }
519    }
520}
Note: See TracBrowser for help on using the repository browser.