source: trunk/CrypPlugins/PeerToPeerBase/PeerToPeerBase.cs @ 813

Last change on this file since 813 was 813, checked in by arnold, 12 years ago

P2P Classes updated (Remove, Base) and removed old P2P Classes (P2PStore, P2PLoad)

File size: 19.1 KB
Line 
1/* Copyright 2009 Team CrypTool (Christian Arnold), Uni Duisburg-Essen
2
3   Licensed under the Apache License, Version 2.0 (the "License");
4   you may not use this file except in compliance with the License.
5   You may obtain a copy of the License at
6
7       http://www.apache.org/licenses/LICENSE-2.0
8
9   Unless required by applicable law or agreed to in writing, software
10   distributed under the License is distributed on an "AS IS" BASIS,
11   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12   See the License for the specific language governing permissions and
13   limitations under the License.
14*/
15
16using System;
17using System.Collections.Generic;
18using System.Linq;
19using System.Text;
20using PeersAtPlay.P2PStorage.DHT;
21using PeersAtPlay.P2PStorage.FullMeshDHT;
22using PeersAtPlay.P2PLink.SimpleSnalNG;
23using PeersAtPlay.P2POverlay.Bootstrapper;
24using PeersAtPlay.P2POverlay;
25using PeersAtPlay.P2POverlay.Bootstrapper.LocalMachineBootstrapper;
26using PeersAtPlay.P2POverlay.FullMeshOverlay;
27using PeersAtPlay.P2PLink;
28using PeersAtPlay.P2POverlay.Bootstrapper.IrcBootstrapper;
29using System.Threading;
30using Cryptool.PluginBase.Control;
31using Cryptool.PluginBase;
32using Cryptool.PluginBase.Miscellaneous;
33using System.ComponentModel;
34using Cryptool.PluginBase.IO;
35
36/*
37 * Synchronous functions successfully tested (store, retrieve)
38 * !!! remove-Function is faulty !!!
39 * Standard connection for test issues is LocalMachineBootstrapper.
40 * IrcBootstrapper also works, but it's to lame for testing issues!
41 *
42 * TODO:
43 * - Add enums (BS,LinkManager,Overlay) to class - make it public to use in cross-cutting classes
44 * - dht.Remove-Method makes problems... "ArgumentNotNullException"
45 *   event though the Parameter is correctly set to a valid value!
46 *   --> forwarded to the p@p-Team
47 * - Testing asynchronous methods incl. EventHandlers
48 */
49namespace Cryptool.Plugins.PeerToPeer
50{
51    /// <summary>
52    /// Wrapper class to integrate peer@play environment into CrypTool.
53    /// This class synchronizes asynchronous methods for easier usage in CT2. For future
54    /// </summary>
55    public class P2PBase
56    {
57        #region Delegates and Events for asynchronous p2p functions
58
59        public delegate void SystemJoined();
60        public event SystemJoined OnSystemJoined;
61
62        public delegate void SystemLeft();
63        public event SystemLeft OnSystemLeft;
64
65        public delegate void P2PMessageReceived(string sMsg);
66        public event P2PMessageReceived OnP2PMessageReceived;
67
68        /// <summary>
69        /// returns true if key-value-pair was successfully stored in the DHT
70        /// </summary>
71        /// <param name="result"></param>
72        public delegate void DHTStoreCompleted(bool result);
73        public event DHTStoreCompleted OnDhtStore_Completed;
74
75        public delegate void DHTLoadCompleted(byte[] loadedData);
76        public event DHTLoadCompleted OnDhtLoad_Completed;
77
78        /// <summary>
79        /// returns true if key was found and removed successfully from the DHT
80        /// </summary>
81        /// <param name="result"></param>
82        public delegate void DHTRemoveCompleted(bool result);
83        public event DHTRemoveCompleted OnDhtRemove_Completed;
84
85        #endregion
86
87        #region Variables
88
89        private IDHT dht;
90        private IP2PLinkManager linkmanager;
91        private IBootstrapper bootstrapper;
92        private P2POverlay overlay;
93        private AutoResetEvent systemJoined;
94        private AutoResetEvent systemLeft;
95
96        /// <summary>
97        /// Dictionary for synchronizing asynchronous DHT retrieves.
98        /// Cryptool doesn't offers an asynchronous environment, so this workaround is necessary
99        /// </summary>
100        private Dictionary<Guid, ResponseWait> waitDict;
101
102        #endregion
103
104        public P2PBase()
105        {
106            this.waitDict = new Dictionary<Guid, ResponseWait>();
107            this.systemJoined = new AutoResetEvent(false);
108            this.systemLeft = new AutoResetEvent(false);
109        }
110
111        #region Basic P2P Methods (Init, Start, Stop) - synch and asynch
112
113        /// <summary>
114        /// Initializing is the first step to build a new or access an existing p2p network
115        /// </summary>
116        /// <param name="sUserName">Choose an individual name for the user</param>
117        /// <param name="sWorldName">fundamental: two peers are only in the SAME P2P system, when they initialized the SAME WORLD!</param>
118        /// <param name="linkManagerType"></param>
119        /// <param name="bsType"></param>
120        /// <param name="overlayType"></param>
121        /// <param name="dhtType"></param>
122        public void InitializeAll(string sUserName, string sWorldName, P2PLinkManagerType linkManagerType, P2PBootstrapperType bsType, P2POverlayType overlayType, P2PDHTType dhtType)
123        {
124            #region Setting LinkManager, Bootstrapper, Overlay and DHT to the specified types
125            switch (linkManagerType)
126            {
127                case P2PLinkManagerType.Snal:
128                    //snal = secure network abstraction layer
129                    this.linkmanager = new Snal();
130                    break;
131                default:
132                    throw (new NotImplementedException());
133                    break;
134            }
135            switch (bsType)
136            {
137                case P2PBootstrapperType.LocalMachineBootstrapper:
138                    //LocalMachineBootstrapper = only local connection (runs only on one machine)
139                    this.bootstrapper = new LocalMachineBootstrapper();
140                    break;
141                case P2PBootstrapperType.IrcBootstrapper:
142                    this.bootstrapper = new IrcBootstrapper();
143                    break;
144                default:
145                    throw (new NotImplementedException());
146                    break;
147            }
148            switch (overlayType)
149            {
150                case P2POverlayType.FullMeshOverlay:
151                    // changing overlay example: this.overlay = new ChordOverlay();
152                    this.overlay = new FullMeshOverlay();
153                    break;
154                default:
155                    throw (new NotImplementedException());
156                    break;
157            }
158            switch (dhtType)
159            {
160                case P2PDHTType.FullMeshDHT:
161                    this.dht = new FullMeshDHT();
162                    break;
163                default:
164                    throw (new NotImplementedException());
165                    break;
166            }
167            #endregion
168
169            this.dht.Initialize(sUserName, sWorldName, this.overlay, this.bootstrapper, this.linkmanager, null);
170
171            this.dht.MessageReceived += new EventHandler<MessageReceived>(OnDHT_MessageReceived);
172            this.dht.SystemJoined += new EventHandler(OnDHT_SystemJoined);
173            this.dht.SystemLeft += new EventHandler(OnDHT_SystemLeft);
174        }
175
176        /// <summary>
177        /// Initializing is the first step to build a new or access an existing p2p network
178        /// </summary>
179        /// <param name="sUserName">Choose an individual name for the user</param>
180        /// <param name="sWorldName">fundamental: two peers are only in the SAME P2P system, when they initialized the SAME WORLD!</param>
181        public void Initialize(string sUserName, string sWorldName)
182        {
183            //snal = secure network abstraction layer
184            this.linkmanager = new Snal();
185            //LocalMachineBootstrapper = only local connection (runs only on one machine)
186            //for more machines use this line:
187           
188            this.bootstrapper = new IrcBootstrapper();
189            //this.bootstrapper = new LocalMachineBootstrapper();
190
191            // changing overlay example: this.overlay = new ChordOverlay();
192            this.overlay = new FullMeshOverlay();
193            //changing overlay example: this.overlay = new ExampleDHT();
194            this.dht = new FullMeshDHT();
195
196            this.dht.Initialize(sUserName, sWorldName, this.overlay, this.bootstrapper, this.linkmanager, null);
197
198            this.dht.MessageReceived += new EventHandler<MessageReceived>(OnDHT_MessageReceived);
199            this.dht.SystemJoined += new EventHandler(OnDHT_SystemJoined);
200            this.dht.SystemLeft += new EventHandler(OnDHT_SystemLeft);
201        }
202
203        /// <summary>
204        /// Starts the P2P System. When the given P2P world doesn't exist yet,
205        /// inclusive creating the and bootstrapping to the P2P network.
206        /// In either case joining the P2P world.
207        /// This synchronized method returns true not before the peer has
208        /// successfully joined the network (this may take one or two minutes).
209        /// </summary>
210        /// <returns>True, if the peer has completely joined the p2p network</returns>
211        public bool SynchStart()
212        {
213            //Start != system joined
214            //Only starts the system asynchronous, the possible callback is useless,
215            //because it's invoked before the peer completly joined the P2P system
216            this.dht.BeginStart(null);
217            //Wait for event SystemJoined. When it's invoked, the peer completly joined the P2P system
218            this.systemJoined.WaitOne();
219            return true;
220        }
221
222        /// <summary>
223        /// Disjoins the peer from the system. The P2P system survive while one peer is still in the network.
224        /// </summary>
225        /// <returns>True, if the peer has completely disjoined the p2p network</returns>
226        public bool SynchStop()
227        {
228            if (this.dht != null)
229            {
230                this.dht.BeginStop(null);
231                //wait till systemLeft Event is invoked
232                this.systemLeft.WaitOne();
233            }
234            return true;
235        }
236
237
238        /// <summary>
239        /// Asynchronously starting the peer. When the given P2P world doesn't
240        /// exist yet, inclusive creating the and bootstrapping to the P2P network.
241        /// In either case joining the P2P world. To ensure that peer has successfully
242        /// joined the p2p world, catch the event OnSystemJoined.
243        /// </summary>
244        public void AsynchStart()
245        {
246            // no callback usefull, because starting and joining isn't the same
247            // everything else is done by the EventHandler OnDHT_SystemJoined
248            this.dht.BeginStart(null);
249        }
250
251        /// <summary>
252        /// Asynchronously disjoining the actual peer of the p2p system. To ensure
253        /// disjoining, catch the event OnDHT_SystemLeft.
254        /// </summary>
255        public void AsynchStop()
256        {
257            if (this.dht != null)
258            {
259                // no callback usefull.
260                // Everything else is done by the EventHandler OnDHT_SystemLeft
261                this.dht.BeginStop(null);
262            }
263        }
264
265        #endregion
266
267        // not tested and not sure that this function return the right PeerName...
268        /// <summary>
269        /// Get PeerName of the actual peer
270        /// </summary>
271        /// <returns>Peer name of the actual peer</returns>
272        public string GetPeerName()
273        {
274            return this.overlay.LocalAddress.ToString();
275        }
276
277        #region Event Handling (System Joined, Left and Message Received)
278
279        private void OnDHT_SystemJoined(object sender, EventArgs e)
280        {
281            if (OnSystemJoined != null)
282                OnSystemJoined();
283            this.systemJoined.Set();
284        }
285
286        private void OnDHT_SystemLeft(object sender, EventArgs e)
287        {
288            if (OnSystemLeft != null)
289                OnSystemLeft();
290            // as an experiment
291            this.dht = null;
292            this.systemLeft.Set();
293        }
294
295        private void OnDHT_MessageReceived(object sender, MessageReceived e)
296        {
297            if (OnP2PMessageReceived != null)
298                OnP2PMessageReceived("Source: " + e.Source + ", Data:" + e.Data);
299        }
300
301        #endregion
302
303        /*
304         * Attention: The asynchronous methods are not tested at the moment
305         */
306        #region Asynchronous Methods incl. Callbacks
307
308        /// <summary>
309        /// Asynchronously retrieving a key from the DHT. To get value, catch
310        /// event OnDhtLoad_Completed.
311        /// </summary>
312        /// <param name="sKey">Existing key in DHT</param>
313        public void AsynchRetrieve(string sKey)
314        {
315            Guid g = this.dht.Retrieve(OnAsynchRetrieve_Completed, sKey);
316        }
317        private void OnAsynchRetrieve_Completed(RetrieveResult rr)
318        {
319            if (OnDhtLoad_Completed != null)
320            {
321                OnDhtLoad_Completed(rr.Data);
322            }
323        }
324
325        /// <summary>
326        /// Asynchronously storing a Key-Value-Pair in the DHT. To ensure that
327        /// storing is completed, catch event OnDhtStore_Completed.
328        /// </summary>
329        /// <param name="sKey"></param>
330        /// <param name="sValue"></param>
331        public void AsynchStore(string sKey, string sValue)
332        {
333            this.dht.Store(OnAsynchStore_Completed, sKey, UTF8Encoding.UTF8.GetBytes(sValue));
334        }
335
336        private void OnAsynchStore_Completed(StoreResult sr)
337        {
338            if (OnDhtStore_Completed != null)
339            {
340                if (sr.Status == OperationStatus.Success)
341                    OnDhtStore_Completed(true);
342                else
343                    OnDhtStore_Completed(false);
344            }
345               
346        }
347
348        /// <summary>
349        /// Asynchronously removing an existing key out of the DHT. To ensure
350        /// that removing is completed, catch event OnDhtRemove_Completed.
351        /// </summary>
352        /// <param name="sKey"></param>
353        public void AsynchRemove(string sKey)
354        {
355            this.dht.Remove(OnAsynchRemove_Completed, sKey);
356        }
357        private void OnAsynchRemove_Completed(RemoveResult rr)
358        {
359            if (OnDhtRemove_Completed != null)
360            {
361                if(rr.Status == OperationStatus.Success)
362                    OnDhtRemove_Completed(true);
363                else
364                    OnDhtRemove_Completed(false);
365            }
366        }
367
368        #endregion
369
370        #region Synchronous Methods incl. Callbacks
371
372        /// <summary>
373        /// Stores a value in the DHT at the given key
374        /// </summary>
375        /// <param name="sKey">Key of DHT Entry</param>
376        /// <param name="sValue">Value of DHT Entry</param>
377        /// <returns>True, when storing is completed!</returns>
378        public bool SynchStore(string sKey, string sValue)
379        {
380            AutoResetEvent are = new AutoResetEvent(false);
381            // this method returns always a GUID to distinguish between asynchronous actions
382            Guid g = this.dht.Store(OnSynchStoreCompleted, sKey, UTF8Encoding.UTF8.GetBytes(sValue));
383
384            ResponseWait rw = new ResponseWait() { WaitHandle = are };
385
386            waitDict.Add(g, rw);
387            //blocking till response
388            are.WaitOne();
389            return true;
390        }
391
392        /// <summary>
393        /// Get the value of the given DHT Key or null, if it doesn't exist.
394        /// For synchronous environments use the Synch* methods.
395        /// </summary>
396        /// <param name="sKey">Key of DHT Entry</param>
397        /// <returns>Value of DHT Entry</returns>
398        public byte[] SynchRetrieve(string sKey)
399        {
400            AutoResetEvent are = new AutoResetEvent(false);
401            // this method returns always a GUID to distinguish between asynchronous actions
402            Guid g = this.dht.Retrieve(OnSynchRetrieveCompleted, sKey);
403           
404            ResponseWait rw = new ResponseWait() {WaitHandle = are };
405           
406            waitDict.Add(g,rw  );
407            // blocking till response
408            are.WaitOne();
409            //Rückgabe der Daten
410            return rw.Message;
411        }
412
413        /// <summary>
414        /// Removes a key/value pair out of the DHT
415        /// </summary>
416        /// <param name="sKey">Key of the DHT Entry</param>
417        /// <returns>True, when removing is completed!</returns>
418        public bool SynchRemove(string sKey)
419        {
420            AutoResetEvent are = new AutoResetEvent(false);
421            // this method returns always a GUID to distinguish between asynchronous actions
422
423            // ROAD WORKS: This function throws an error (ArgumentNotNullException).
424            //             I think that's an error in the p@p-environment --> forwarded to the p@p-Team
425            Guid g = this.dht.Remove(OnSynchRemoveCompleted, sKey);
426
427            ResponseWait rw = new ResponseWait() { WaitHandle = are };
428
429            waitDict.Add(g, rw);
430            // blocking till response
431            are.WaitOne();
432            return true;
433        }
434
435        /// <summary>
436        /// Callback for a the synchronized store method
437        /// </summary>
438        /// <param name="rr"></param>
439        private void OnSynchStoreCompleted(StoreResult sr)
440        {
441            ResponseWait rw;
442            if (this.waitDict.TryGetValue(sr.Guid, out rw))
443            {
444                rw.Message = UTF8Encoding.UTF8.GetBytes(sr.Status.ToString());
445
446                //unblock WaitHandle in the synchronous method
447                rw.WaitHandle.Set();
448                // don't know if this accelerates the system...
449                this.waitDict.Remove(sr.Guid);
450            }
451        }
452
453        /// <summary>
454        /// Callback for a the synchronized retrieval method
455        /// </summary>
456        /// <param name="rr"></param>
457        private void OnSynchRetrieveCompleted(RetrieveResult rr)
458        {
459            ResponseWait rw;
460
461            if (this.waitDict.TryGetValue(rr.Guid, out rw))
462            {
463                rw.Message = rr.Data;
464
465                //unblock WaitHandle in the synchronous method
466                rw.WaitHandle.Set(); 
467                // don't know if this accelerates the system...
468                this.waitDict.Remove(rr.Guid);
469            }
470        }
471
472        /// <summary>
473        /// Callback for a the synchronized remove method
474        /// </summary>
475        /// <param name="rr"></param>
476        private void OnSynchRemoveCompleted(RemoveResult rr)
477        {
478            ResponseWait rw;
479            if (this.waitDict.TryGetValue(rr.Guid, out rw))
480            {
481                rw.Message = UTF8Encoding.UTF8.GetBytes(rr.Status.ToString());
482
483                //unblock WaitHandle in the synchronous method
484                rw.WaitHandle.Set();
485                // don't know if this accelerates the system...
486                this.waitDict.Remove(rr.Guid);
487            }
488        }
489
490        #endregion
491
492        /// <summary>
493        /// To log the internal state in the Monitoring Software of P@play
494        /// </summary>
495        public void LogInternalState()
496        {
497            if (this.dht != null)
498            {
499                this.dht.LogInternalState();
500            }
501        }
502    }
503}
Note: See TracBrowser for help on using the repository browser.