source: trunk/CrypPlugins/PeerToPeerBase/PeerToPeerBase.cs @ 797

Last change on this file since 797 was 797, checked in by arnold, 12 years ago

P2P DHT Operations

File size: 15.9 KB
Line 
1/* Copyright 2009 Team CrypTool (Christian Arnold), Uni Duisburg-Essen
2
3   Licensed under the Apache License, Version 2.0 (the "License");
4   you may not use this file except in compliance with the License.
5   You may obtain a copy of the License at
6
7       http://www.apache.org/licenses/LICENSE-2.0
8
9   Unless required by applicable law or agreed to in writing, software
10   distributed under the License is distributed on an "AS IS" BASIS,
11   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12   See the License for the specific language governing permissions and
13   limitations under the License.
14*/
15
16using System;
17using System.Collections.Generic;
18using System.Linq;
19using System.Text;
20using PeersAtPlay.P2PStorage.DHT;
21using PeersAtPlay.P2PStorage.FullMeshDHT;
22using PeersAtPlay.P2PLink.SimpleSnalNG;
23using PeersAtPlay.P2POverlay.Bootstrapper;
24using PeersAtPlay.P2POverlay;
25using PeersAtPlay.P2POverlay.Bootstrapper.LocalMachineBootstrapper;
26using PeersAtPlay.P2POverlay.FullMeshOverlay;
27using PeersAtPlay.P2PLink;
28using PeersAtPlay.P2POverlay.Bootstrapper.IrcBootstrapper;
29using System.Threading;
30using Cryptool.PluginBase.Control;
31using Cryptool.PluginBase;
32using Cryptool.PluginBase.Miscellaneous;
33using System.ComponentModel;
34using Cryptool.PluginBase.IO;
35
/*
 * Synchronous functions successfully tested (store, retrieve)
 * Standard connection for testing purposes is LocalMachineBootstrapper.
 * IrcBootstrapper also works, but it's too slow for testing purposes!
 *
 * TODO:
 * - dht.Remove-Method causes problems... "ArgumentNotNullException"
 *   even though the parameter is correctly set to a valid value!
 *   --> forwarded to the p@p-Team
 * - Testing asynchronous methods incl. EventHandlers
 *
 * TO DO in Related Projects:
 * - Change stand-alone P2P-Apps to IControl-Slaves from one P2P-Master (store, load, (remove))
 */
50namespace Cryptool.Plugins.PeerToPeer
51{
    /// <summary>
    /// Wrapper class to integrate the peer@play environment into CrypTool.
    /// This class synchronizes asynchronous methods for easier usage in CT2.
    /// </summary>
56    public class P2PBase
57    {
58        #region Delegates and Events for asynchronous p2p functions
59
60        public delegate void SystemJoined();
61        public event SystemJoined OnSystemJoined;
62
63        public delegate void SystemLeft();
64        public event SystemLeft OnSystemLeft;
65
66        public delegate void P2PMessageReceived(string sMsg);
67        public event P2PMessageReceived OnP2PMessageReceived;
68
69        /// <summary>
70        /// returns true if key-value-pair was successfully stored in the DHT
71        /// </summary>
72        /// <param name="result"></param>
73        public delegate void DHTStoreCompleted(bool result);
74        public event DHTStoreCompleted OnDhtStore_Completed;
75
76        public delegate void DHTLoadCompleted(byte[] loadedData);
77        public event DHTLoadCompleted OnDhtLoad_Completed;
78
79        /// <summary>
80        /// returns true if key was found and removed successfully from the DHT
81        /// </summary>
82        /// <param name="result"></param>
83        public delegate void DHTRemoveCompleted(bool result);
84        public event DHTRemoveCompleted OnDhtRemove_Completed;
85
86        #endregion
87
88        #region Variables
89
90        private IDHT dht;
91        private IP2PLinkManager linkmanager;
92        private IBootstrapper bootstrapper;
93        private P2POverlay overlay;
94        private AutoResetEvent systemJoined;
95        private AutoResetEvent systemLeft;
96
97        /// <summary>
98        /// Dictionary for synchronizing asynchronous DHT retrieves.
99        /// Cryptool doesn't offers an asynchronous environment, so this workaround is necessary
100        /// </summary>
101        private Dictionary<Guid, ResponseWait> waitDict;
102
103        #endregion
104
105        public P2PBase()
106        {
107            this.waitDict = new Dictionary<Guid, ResponseWait>();
108            this.systemJoined = new AutoResetEvent(false);
109            this.systemLeft = new AutoResetEvent(false);
110        }
111
112        #region Basic P2P Methods (Init, Start, Stop) - synch and asynch
113
114        /// <summary>
115        /// Initializing is the first step to build a new or access an existing p2p network
116        /// </summary>
117        /// <param name="sUserName">Choose an individual name for the user</param>
118        /// <param name="sWorldName">fundamental: two peers are only in the SAME P2P system, when they initialized the SAME WORLD!</param>
119        public void Initialize(string sUserName, string sWorldName)
120        {
121            //snal = secure network abstraction layer
122            this.linkmanager = new Snal();
123            //LocalMachineBootstrapper = only local connection (runs only on one machine)
124            //for more machines use this line:
125           
126            this.bootstrapper = new IrcBootstrapper();
127            //this.bootstrapper = new LocalMachineBootstrapper();
128
129            // changing overlay example: this.overlay = new ChordOverlay();
130            this.overlay = new FullMeshOverlay();
131            //changing overlay example: this.overlay = new ExampleDHT();
132            this.dht = new FullMeshDHT();
133
134            this.dht.Initialize(sUserName, sWorldName, this.overlay, this.bootstrapper, this.linkmanager, null);
135
136            this.dht.MessageReceived += new EventHandler<MessageReceived>(OnDHT_MessageReceived);
137            this.dht.SystemJoined += new EventHandler(OnDHT_SystemJoined);
138            this.dht.SystemLeft += new EventHandler(OnDHT_SystemLeft);
139        }
140
141        /// <summary>
142        /// Starts the P2P System. When the given P2P world doesn't exist yet,
143        /// inclusive creating the and bootstrapping to the P2P network.
144        /// In either case joining the P2P world.
145        /// This synchronized method returns true not before the peer has
146        /// successfully joined the network (this may take one or two minutes).
147        /// </summary>
148        /// <returns>True, if the peer has completely joined the p2p network</returns>
149        public bool SynchStart()
150        {
151            //Start != system joined
152            //Only starts the system asynchronous, the possible callback is useless,
153            //because it's invoked before the peer completly joined the P2P system
154            this.dht.BeginStart(null);
155            //Wait for event SystemJoined. When it's invoked, the peer completly joined the P2P system
156            this.systemJoined.WaitOne();
157            return true;
158        }
159
160        /// <summary>
161        /// Disjoins the peer from the system. The P2P system survive while one peer is still in the network.
162        /// </summary>
163        /// <returns>True, if the peer has completely disjoined the p2p network</returns>
164        public bool SynchStop()
165        {
166            if (this.dht != null)
167            {
168                this.dht.BeginStop(null);
169                //wait till systemLeft Event is invoked
170                this.systemLeft.WaitOne();
171            }
172            return true;
173        }
174
175
176        /// <summary>
177        /// Asynchronously starting the peer. When the given P2P world doesn't
178        /// exist yet, inclusive creating the and bootstrapping to the P2P network.
179        /// In either case joining the P2P world. To ensure that peer has successfully
180        /// joined the p2p world, catch the event OnSystemJoined.
181        /// </summary>
182        public void AsynchStart()
183        {
184            // no callback usefull, because starting and joining isn't the same
185            // everything else is done by the EventHandler OnDHT_SystemJoined
186            this.dht.BeginStart(null);
187        }
188
189        /// <summary>
190        /// Asynchronously disjoining the actual peer of the p2p system. To ensure
191        /// disjoining, catch the event OnDHT_SystemLeft.
192        /// </summary>
193        public void AsynchStop()
194        {
195            if (this.dht != null)
196            {
197                // no callback usefull.
198                // Everything else is done by the EventHandler OnDHT_SystemLeft
199                this.dht.BeginStop(null);
200            }
201        }
202
203        #endregion
204
205        #region Event Handling (System Joined, Left and Message Received)
206
207        private void OnDHT_SystemJoined(object sender, EventArgs e)
208        {
209            if (OnSystemJoined != null)
210                OnSystemJoined();
211            this.systemJoined.Set();
212        }
213
214        private void OnDHT_SystemLeft(object sender, EventArgs e)
215        {
216            if (OnSystemLeft != null)
217                OnSystemLeft();
218            this.systemLeft.Set();
219        }
220
221        private void OnDHT_MessageReceived(object sender, MessageReceived e)
222        {
223            if (OnP2PMessageReceived != null)
224                OnP2PMessageReceived("Source: " + e.Source + ", Data:" + e.Data);
225        }
226
227        #endregion
228
229        /*
230         * Attention: The asynchronous methods are not tested at the moment
231         */
232        #region Asynchronous Methods incl. Callbacks
233
234        /// <summary>
235        /// Asynchronously retrieving a key from the DHT. To get value, catch
236        /// event OnDhtLoad_Completed.
237        /// </summary>
238        /// <param name="sKey">Existing key in DHT</param>
239        public void AsynchRetrieve(string sKey)
240        {
241            Guid g = this.dht.Retrieve(OnAsynchRetrieve_Completed, sKey);
242        }
243        private void OnAsynchRetrieve_Completed(RetrieveResult rr)
244        {
245            if (OnDhtLoad_Completed != null)
246            {
247                OnDhtLoad_Completed(rr.Data);
248            }
249        }
250
251        /// <summary>
252        /// Asynchronously storing a Key-Value-Pair in the DHT. To ensure that
253        /// storing is completed, catch event OnDhtStore_Completed.
254        /// </summary>
255        /// <param name="sKey"></param>
256        /// <param name="sValue"></param>
257        public void AsynchStore(string sKey, string sValue)
258        {
259            this.dht.Store(OnAsynchStore_Completed, sKey, UTF8Encoding.UTF8.GetBytes(sValue));
260        }
261
262        private void OnAsynchStore_Completed(StoreResult sr)
263        {
264            if (OnDhtStore_Completed != null)
265            {
266                if (sr.Status == OperationStatus.Success)
267                    OnDhtStore_Completed(true);
268                else
269                    OnDhtStore_Completed(false);
270            }
271               
272        }
273
274        /// <summary>
275        /// Asynchronously removing an existing key out of the DHT. To ensure
276        /// that removing is completed, catch event OnDhtRemove_Completed.
277        /// </summary>
278        /// <param name="sKey"></param>
279        public void AsynchRemove(string sKey)
280        {
281            this.dht.Remove(OnAsynchRemove_Completed, sKey);
282        }
283        private void OnAsynchRemove_Completed(RemoveResult rr)
284        {
285            if (OnDhtRemove_Completed != null)
286            {
287                if(rr.Status == OperationStatus.Success)
288                    OnDhtRemove_Completed(true);
289                else
290                    OnDhtRemove_Completed(false);
291            }
292        }
293
294        #endregion
295
296        #region Synchronous Methods incl. Callbacks
297
298        /// <summary>
299        /// Stores a value in the DHT at the given key
300        /// </summary>
301        /// <param name="sKey">Key of DHT Entry</param>
302        /// <param name="sValue">Value of DHT Entry</param>
303        /// <returns>True, when storing is completed!</returns>
304        public bool SynchStore(string sKey, string sValue)
305        {
306            AutoResetEvent are = new AutoResetEvent(false);
307            // this method returns always a GUID to distinguish between asynchronous actions
308            Guid g = this.dht.Store(OnSynchStoreCompleted, sKey, UTF8Encoding.UTF8.GetBytes(sValue));
309
310            ResponseWait rw = new ResponseWait() { WaitHandle = are };
311
312            waitDict.Add(g, rw);
313            //blocking till response
314            are.WaitOne();
315            return true;
316        }
317
318        /// <summary>
319        /// Get the value of the given DHT Key or null, if it doesn't exist.
320        /// For synchronous environments use the Synch* methods.
321        /// </summary>
322        /// <param name="sKey">Key of DHT Entry</param>
323        /// <returns>Value of DHT Entry</returns>
324        public byte[] SynchRetrieve(string sKey)
325        {
326            AutoResetEvent are = new AutoResetEvent(false);
327            // this method returns always a GUID to distinguish between asynchronous actions
328            Guid g = this.dht.Retrieve(OnSynchRetrieveCompleted, sKey);
329           
330            ResponseWait rw = new ResponseWait() {WaitHandle = are };
331           
332            waitDict.Add(g,rw  );
333            // blocking till response
334            are.WaitOne();
335            //Rückgabe der Daten
336            return rw.Message;
337        }
338
339        /// <summary>
340        /// Removes a key/value pair out of the DHT
341        /// </summary>
342        /// <param name="sKey">Key of the DHT Entry</param>
343        /// <returns>True, when removing is completed!</returns>
344        public bool SynchRemove(string sKey)
345        {
346            AutoResetEvent are = new AutoResetEvent(false);
347            // this method returns always a GUID to distinguish between asynchronous actions
348
349            // ROAD WORKS: This function throws an error (ArgumentNotNullException).
350            //             I think that's an error in the p@p-environment --> forwarded to the p@p-Team
351            Guid g = this.dht.Remove(OnSynchRemoveCompleted, sKey);
352
353            ResponseWait rw = new ResponseWait() { WaitHandle = are };
354
355            waitDict.Add(g, rw);
356            // blocking till response
357            are.WaitOne();
358            return true;
359        }
360
361        /// <summary>
362        /// Callback for a the synchronized store method
363        /// </summary>
364        /// <param name="rr"></param>
365        private void OnSynchStoreCompleted(StoreResult sr)
366        {
367            ResponseWait rw;
368            if (this.waitDict.TryGetValue(sr.Guid, out rw))
369            {
370                rw.Message = UTF8Encoding.UTF8.GetBytes(sr.Status.ToString());
371
372                //unblock WaitHandle in the synchronous method
373                rw.WaitHandle.Set();
374                // don't know if this accelerates the system...
375                this.waitDict.Remove(sr.Guid);
376            }
377        }
378
379        /// <summary>
380        /// Callback for a the synchronized retrieval method
381        /// </summary>
382        /// <param name="rr"></param>
383        private void OnSynchRetrieveCompleted(RetrieveResult rr)
384        {
385            ResponseWait rw;
386
387            if (this.waitDict.TryGetValue(rr.Guid, out rw))
388            {
389                rw.Message = rr.Data;
390
391                //unblock WaitHandle in the synchronous method
392                rw.WaitHandle.Set(); 
393                // don't know if this accelerates the system...
394                this.waitDict.Remove(rr.Guid);
395            }
396        }
397
398        /// <summary>
399        /// Callback for a the synchronized remove method
400        /// </summary>
401        /// <param name="rr"></param>
402        private void OnSynchRemoveCompleted(RemoveResult rr)
403        {
404            ResponseWait rw;
405            if (this.waitDict.TryGetValue(rr.Guid, out rw))
406            {
407                rw.Message = UTF8Encoding.UTF8.GetBytes(rr.Status.ToString());
408
409                //unblock WaitHandle in the synchronous method
410                rw.WaitHandle.Set();
411                // don't know if this accelerates the system...
412                this.waitDict.Remove(rr.Guid);
413            }
414        }
415
416        #endregion
417
418        /// <summary>
419        /// To log the internal state in the Monitoring Software of P@play
420        /// </summary>
421        public void LogInternalState()
422        {
423            if (this.dht != null)
424            {
425                this.dht.LogInternalState();
426            }
427        }
428    }
429}
Note: See TracBrowser for help on using the repository browser.