source: trunk/CrypPlugins/PeerToPeerBase/PeerToPeerBase.cs @ 783

Last change on this file since 783 was 783, checked in by arnold, 12 years ago

P2P Integration (bisher teilweise noch buggy)
P2PBase, P2PLoad und P2PStore funktionieren bereits
P2P_DHTstore funktioniert noch nicht --> IControl-Versuch

File size: 15.5 KB
Line 
1/* Copyright 2009 Team CrypTool (Christian Arnold), Uni Duisburg-Essen
2
3   Licensed under the Apache License, Version 2.0 (the "License");
4   you may not use this file except in compliance with the License.
5   You may obtain a copy of the License at
6
7       http://www.apache.org/licenses/LICENSE-2.0
8
9   Unless required by applicable law or agreed to in writing, software
10   distributed under the License is distributed on an "AS IS" BASIS,
11   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12   See the License for the specific language governing permissions and
13   limitations under the License.
14*/
15
16using System;
17using System.Collections.Generic;
18using System.Linq;
19using System.Text;
20using PeersAtPlay.P2PStorage.DHT;
21using PeersAtPlay.P2PStorage.FullMeshDHT;
22using PeersAtPlay.P2PLink.SimpleSnalNG;
23using PeersAtPlay.P2POverlay.Bootstrapper;
24using PeersAtPlay.P2POverlay;
25using PeersAtPlay.P2POverlay.Bootstrapper.LocalMachineBootstrapper;
26using PeersAtPlay.P2POverlay.FullMeshOverlay;
27using PeersAtPlay.P2PLink;
28using PeersAtPlay.P2POverlay.Bootstrapper.IrcBootstrapper;
29using System.Threading;
30using Cryptool.PluginBase.Control;
31using Cryptool.PluginBase;
32using Cryptool.PluginBase.Miscellaneous;
33using System.ComponentModel;
34using Cryptool.PluginBase.IO;
35
36/*
37 * Synchronous functions successfully tested (store, retrieve)
38 * The default connection for testing purposes is LocalMachineBootstrapper.
39 * IrcBootstrapper also works, but it is too slow for testing purposes.
40 *
41 * TODO:
42 * Testing asynchronous methods incl. EventHandlers
43 *
44 * TO DO in Related Projects:
45 * - Change stand-alone P2P-Apps to IControl-Slaves from one P2P-Master (store, load, (remove))
46 */
47namespace Cryptool.Plugins.PeerToPeer
48{
49    /// <summary>
50    /// Wrapper class to integrate peer@play environment into CrypTool.
51    /// This class synchronizes asynchronous methods for easier usage in CT2. For future
52    /// </summary>
53    public class P2PBase
54    {
55        #region Delegates and Events for asynchronous p2p functions
56
57        public delegate void SystemJoined();
58        public event SystemJoined OnSystemJoined;
59
60        public delegate void SystemLeft();
61        public event SystemLeft OnSystemLeft;
62
63        public delegate void P2PMessageReceived(string sMsg);
64        public event P2PMessageReceived OnP2PMessageReceived;
65
66        /// <summary>
67        /// returns true if key-value-pair was successfully stored in the DHT
68        /// </summary>
69        /// <param name="result"></param>
70        public delegate void DHTStoreCompleted(bool result);
71        public event DHTStoreCompleted OnDhtStore_Completed;
72
73        public delegate void DHTLoadCompleted(byte[] loadedData);
74        public event DHTLoadCompleted OnDhtLoad_Completed;
75
76        /// <summary>
77        /// returns true if key was found and removed successfully from the DHT
78        /// </summary>
79        /// <param name="result"></param>
80        public delegate void DHTRemoveCompleted(bool result);
81        public event DHTRemoveCompleted OnDhtRemove_Completed;
82
83        #endregion
84
85
86        #region Variables
87
88        private IDHT dht;
89        private IP2PLinkManager linkmanager;
90        private IBootstrapper bootstrapper;
91        private P2POverlay overlay;
92        private AutoResetEvent systemJoined;
93        private AutoResetEvent systemLeft;
94
95        /// <summary>
96        /// Dictionary for synchronizing asynchronous DHT retrieves.
97        /// Cryptool doesn't offers an asynchronous environment, so this workaround is necessary
98        /// </summary>
99        private Dictionary<Guid, ResponseWait> waitDict;
100
101        #endregion
102
103        public P2PBase()
104        {
105            this.waitDict = new Dictionary<Guid, ResponseWait>();
106            this.systemJoined = new AutoResetEvent(false);
107            this.systemLeft = new AutoResetEvent(false);
108        }
109
110        #region Basic P2P Methods (Init, Start, Stop) - synch and asynch
111
112        /// <summary>
113        /// Initializing is the first step to build a new or access an existing p2p network
114        /// </summary>
115        /// <param name="sUserName">Choose an individual name for the user</param>
116        /// <param name="sWorldName">fundamental: two peers are only in the SAME P2P system, when they initialized the SAME WORLD!</param>
117        public void Initialize(string sUserName, string sWorldName)
118        {
119            //snal = secure network abstraction layer
120            this.linkmanager = new Snal();
121            //LocalMachineBootstrapper = only local connection (runs only on one machine)
122            //for more machines use this line:
123           
124            //this.bootstrapper = new IrcBootstrapper();
125            this.bootstrapper = new LocalMachineBootstrapper();
126
127            // changing overlay example: this.overlay = new ChordOverlay();
128            this.overlay = new FullMeshOverlay();
129            //changing overlay example: this.overlay = new ExampleDHT();
130            this.dht = new FullMeshDHT();
131
132            this.dht.Initialize(sUserName, sWorldName, this.overlay, this.bootstrapper, this.linkmanager, null);
133
134            this.dht.MessageReceived += new EventHandler<MessageReceived>(OnDHT_MessageReceived);
135            this.dht.SystemJoined += new EventHandler(OnDHT_SystemJoined);
136            this.dht.SystemLeft += new EventHandler(OnDHT_SystemLeft);
137        }
138
139        /// <summary>
140        /// Starts the P2P System. When the given P2P world doesn't exist yet,
141        /// inclusive creating the and bootstrapping to the P2P network.
142        /// In either case joining the P2P world.
143        /// This synchronized method returns true not before the peer has
144        /// successfully joined the network (this may take one or two minutes).
145        /// </summary>
146        /// <returns>True, if the peer has completely joined the p2p network</returns>
147        public bool SynchStart()
148        {
149            //Start != system joined
150            //Only starts the system asynchronous, the possible callback is useless,
151            //because it's invoked before the peer completly joined the P2P system
152            this.dht.BeginStart(null);
153            //Wait for event SystemJoined. When it's invoked, the peer completly joined the P2P system
154            this.systemJoined.WaitOne();
155            return true;
156        }
157
158        /// <summary>
159        /// Disjoins the peer from the system. The P2P system survive while one peer is still in the network.
160        /// </summary>
161        /// <returns>True, if the peer has completely disjoined the p2p network</returns>
162        public bool SynchStop()
163        {
164            if (this.dht != null)
165            {
166                this.dht.BeginStop(null);
167                //wait till systemLeft Event is invoked
168                this.systemLeft.WaitOne();
169            }
170            return true;
171        }
172
173
174        /// <summary>
175        /// Asynchronously starting the peer. When the given P2P world doesn't
176        /// exist yet, inclusive creating the and bootstrapping to the P2P network.
177        /// In either case joining the P2P world. To ensure that peer has successfully
178        /// joined the p2p world, catch the event OnSystemJoined.
179        /// </summary>
180        public void AsynchStart()
181        {
182            // no callback usefull, because starting and joining isn't the same
183            // everything else is done by the EventHandler OnDHT_SystemJoined
184            this.dht.BeginStart(null);
185        }
186
187        /// <summary>
188        /// Asynchronously disjoining the actual peer of the p2p system. To ensure
189        /// disjoining, catch the event OnDHT_SystemLeft.
190        /// </summary>
191        public void AsynchStop()
192        {
193            if (this.dht != null)
194            {
195                // no callback usefull.
196                // Everything else is done by the EventHandler OnDHT_SystemLeft
197                this.dht.BeginStop(null);
198            }
199        }
200
201        #endregion
202
203        #region Event Handling (System Joined, Left and Message Received)
204
205        private void OnDHT_SystemJoined(object sender, EventArgs e)
206        {
207            if (OnSystemJoined != null)
208                OnSystemJoined();
209            this.systemJoined.Set();
210        }
211
212        private void OnDHT_SystemLeft(object sender, EventArgs e)
213        {
214            if (OnSystemLeft != null)
215                OnSystemLeft();
216            this.systemLeft.Set();
217        }
218
219        private void OnDHT_MessageReceived(object sender, MessageReceived e)
220        {
221            if (OnP2PMessageReceived != null)
222                OnP2PMessageReceived("Source: " + e.Source + ", Data:" + e.Data);
223        }
224
225        #endregion
226
227        /*
228         * Attention: The asynchronous methods are not tested at the moment
229         */
230        #region Asynchronous Methods incl. Callbacks
231
232        /// <summary>
233        /// Asynchronously retrieving a key from the DHT. To get value, catch
234        /// event OnDhtLoad_Completed.
235        /// </summary>
236        /// <param name="sKey">Existing key in DHT</param>
237        public void AsynchRetrieve(string sKey)
238        {
239            Guid g = this.dht.Retrieve(OnAsynchRetrieve_Completed, sKey);
240        }
241        private void OnAsynchRetrieve_Completed(RetrieveResult rr)
242        {
243            if (OnDhtLoad_Completed != null)
244            {
245                OnDhtLoad_Completed(rr.Data);
246            }
247        }
248
249        /// <summary>
250        /// Asynchronously storing a Key-Value-Pair in the DHT. To ensure that
251        /// storing is completed, catch event OnDhtStore_Completed.
252        /// </summary>
253        /// <param name="sKey"></param>
254        /// <param name="sValue"></param>
255        public void AsynchStore(string sKey, string sValue)
256        {
257            this.dht.Store(OnAsynchStore_Completed, sKey, UTF8Encoding.UTF8.GetBytes(sValue));
258        }
259
260        private void OnAsynchStore_Completed(StoreResult sr)
261        {
262            if (OnDhtStore_Completed != null)
263            {
264                if (sr.Status == OperationStatus.Success)
265                    OnDhtStore_Completed(true);
266                else
267                    OnDhtStore_Completed(false);
268            }
269               
270        }
271
272        /// <summary>
273        /// Asynchronously removing an existing key out of the DHT. To ensure
274        /// that removing is completed, catch event OnDhtRemove_Completed.
275        /// </summary>
276        /// <param name="sKey"></param>
277        public void AsynchRemove(string sKey)
278        {
279            this.dht.Remove(OnAsynchRemove_Completed, sKey);
280        }
281        private void OnAsynchRemove_Completed(RemoveResult rr)
282        {
283            if (OnDhtRemove_Completed != null)
284            {
285                if(rr.Status == OperationStatus.Success)
286                    OnDhtRemove_Completed(true);
287                else
288                    OnDhtRemove_Completed(false);
289            }
290        }
291
292        #endregion
293
294        #region Synchronous Methods incl. Callbacks
295
296        /// <summary>
297        /// Stores a value in the DHT at the given key
298        /// </summary>
299        /// <param name="sKey">Key of DHT Entry</param>
300        /// <param name="sValue">Value of DHT Entry</param>
301        /// <returns>True, when storing is completed!</returns>
302        public bool SynchStore(string sKey, string sValue)
303        {
304            AutoResetEvent are = new AutoResetEvent(false);
305            // this method returns always a GUID to distinguish between asynchronous actions
306            Guid g = this.dht.Store(OnSynchStoreCompleted, sKey, UTF8Encoding.UTF8.GetBytes(sValue));
307
308            ResponseWait rw = new ResponseWait() { WaitHandle = are };
309
310            waitDict.Add(g, rw);
311            //blocking till response
312            are.WaitOne();
313            return true;
314        }
315
316        /// <summary>
317        /// Get the value of the given DHT Key or null, if it doesn't exist.
318        /// For synchronous environments use the Synch* methods.
319        /// </summary>
320        /// <param name="sKey">Key of DHT Entry</param>
321        /// <returns>Value of DHT Entry</returns>
322        public byte[] SynchRetrieve(string sKey)
323        {
324            AutoResetEvent are = new AutoResetEvent(false);
325            // this method returns always a GUID to distinguish between asynchronous actions
326            Guid g = this.dht.Retrieve(OnSynchRetrieveCompleted, sKey);
327           
328            ResponseWait rw = new ResponseWait() {WaitHandle = are };
329           
330            waitDict.Add(g,rw  );
331            // blocking till response
332            are.WaitOne();
333            //Rückgabe der Daten
334            return rw.Message;
335        }
336
337        /// <summary>
338        /// Removes a key/value pair out of the DHT
339        /// </summary>
340        /// <param name="sKey">Key of the DHT Entry</param>
341        /// <returns>True, when removing is completed!</returns>
342        public bool SynchRemove(string sKey)
343        {
344            AutoResetEvent are = new AutoResetEvent(false);
345            // this method returns always a GUID to distinguish between asynchronous actions
346            Guid g = this.dht.Remove(OnSynchRemoveCompleted, sKey);
347
348            ResponseWait rw = new ResponseWait() { WaitHandle = are };
349
350            waitDict.Add(g, rw);
351            // blocking till response
352            are.WaitOne();
353            return true;
354        }
355
356        /// <summary>
357        /// Callback for a the synchronized store method
358        /// </summary>
359        /// <param name="rr"></param>
360        private void OnSynchStoreCompleted(StoreResult sr)
361        {
362            ResponseWait rw;
363            if (this.waitDict.TryGetValue(sr.Guid, out rw))
364            {
365                rw.Message = UTF8Encoding.UTF8.GetBytes(sr.Status.ToString());
366
367                //unblock WaitHandle in the synchronous method
368                rw.WaitHandle.Set();
369                // don't know if this accelerates the system...
370                this.waitDict.Remove(sr.Guid);
371            }
372        }
373
374        /// <summary>
375        /// Callback for a the synchronized retrieval method
376        /// </summary>
377        /// <param name="rr"></param>
378        private void OnSynchRetrieveCompleted(RetrieveResult rr)
379        {
380            ResponseWait rw;
381
382            if (this.waitDict.TryGetValue(rr.Guid, out rw))
383            {
384                rw.Message = rr.Data;
385
386                //unblock WaitHandle in the synchronous method
387                rw.WaitHandle.Set(); 
388                // don't know if this accelerates the system...
389                this.waitDict.Remove(rr.Guid);
390            }
391        }
392
393        /// <summary>
394        /// Callback for a the synchronized remove method
395        /// </summary>
396        /// <param name="rr"></param>
397        private void OnSynchRemoveCompleted(RemoveResult rr)
398        {
399            ResponseWait rw;
400            if (this.waitDict.TryGetValue(rr.Guid, out rw))
401            {
402                rw.Message = UTF8Encoding.UTF8.GetBytes(rr.Status.ToString());
403
404                //unblock WaitHandle in the synchronous method
405                rw.WaitHandle.Set();
406                // don't know if this accelerates the system...
407                this.waitDict.Remove(rr.Guid);
408            }
409        }
410
411        #endregion
412
413        /// <summary>
414        /// To log the internal state in the Monitoring Software of P@play
415        /// </summary>
416        public void LogInternalState()
417        {
418            if (this.dht != null)
419            {
420                this.dht.LogInternalState();
421            }
422        }
423    }
424}
Note: See TracBrowser for help on using the repository browser.