source: trunk/CrypP2P/Internal/P2PBase.cs @ 1545

Last change on this file since 1545 was 1545, checked in by Paul Lelgemann, 12 years ago

o CrypP2P: refactored P2PBase, removed race condition and unused code

File size: 18.6 KB
Line 
1/*
2   Copyright 2010 Paul Lelgemann, University of Duisburg-Essen
3
4   Licensed under the Apache License, Version 2.0 (the "License");
5   you may not use this file except in compliance with the License.
6   You may obtain a copy of the License at
7
8       http://www.apache.org/licenses/LICENSE-2.0
9
10   Unless required by applicable law or agreed to in writing, software
11   distributed under the License is distributed on an "AS IS" BASIS,
12   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   See the License for the specific language governing permissions and
14   limitations under the License.
15*/
16
17using System;
18using System.Text;
19using System.Threading;
20using Cryptool.Plugins.PeerToPeer.Internal;
21using Gears4Net;
22using PeersAtPlay;
23using PeersAtPlay.Monitoring;
24using PeersAtPlay.P2PLink;
25using PeersAtPlay.P2PLink.SnalNG;
26using PeersAtPlay.P2POverlay;
27using PeersAtPlay.P2POverlay.Bootstrapper;
28using PeersAtPlay.P2POverlay.Bootstrapper.IrcBootstrapperV2;
29using PeersAtPlay.P2POverlay.Bootstrapper.LocalMachineBootstrapper;
30using PeersAtPlay.P2POverlay.FullMeshOverlay;
31using PeersAtPlay.P2PStorage.DHT;
32using PeersAtPlay.P2PStorage.FullMeshDHT;
33using PeersAtPlay.Util.Logging;
34using Settings = PeersAtPlay.P2PLink.SnalNG.Settings;
35
36/* TODO:
37 * - Delete UseNatTraversal-Flag and insert CertificateCheck and CertificateSetup
38 * - Testing asynchronous methods incl. EventHandlers
39 */
40
41namespace Cryptool.P2P.Internal
42{
43    /// <summary>
44    ///   Wrapper class to integrate peer@play environment into CrypTool.
45    ///   This class synchronizes asynchronous methods for easier usage in CT2.
46    /// </summary>
47    public class P2PBase
48    {
49        #region Variables
50
51        private readonly AutoResetEvent _systemJoined;
52        private readonly AutoResetEvent _systemLeft;
53        private IBootstrapper _bootstrapper;
54        private IDHT _dht;
55        private IP2PLinkManager _linkmanager;
56        private P2POverlay _overlay;
57        private IVersionedDHT _versionedDht;
58
59        /// <summary>
60        ///   True if system was successfully joined, false if system is COMPLETELY left
61        /// </summary>
62        public bool Started { get; private set; }
63
64        /// <summary>
65        ///   True if the underlying peer to peer system has been fully initialized
66        /// </summary>
67        public bool Initialized { get; private set; }
68
69        #endregion
70
71        #region Delegates
72
73        public event P2PMessageReceived OnP2PMessageReceived;
74        public delegate void P2PMessageReceived(PeerId sourceAddr, byte[] data);
75
76        public event SystemJoined OnSystemJoined;
77        public delegate void SystemJoined();
78
79        public event SystemLeft OnSystemLeft;
80        public delegate void SystemLeft();
81
82        #endregion
83
84        public P2PBase()
85        {
86            Started = false;
87            Initialized = false;
88
89            _systemJoined = new AutoResetEvent(false);
90            _systemLeft = new AutoResetEvent(false);
91        }
92
93        #region Basic P2P Methods (Init, Start, Stop)
94
95        /// <summary>
96        ///   Initializes the underlying peer-to-peer system with settings configured in P2PSettings. This step is required in order to be able to establish a connection.
97        /// </summary>
98        public void Initialize()
99        {
100            Scheduler scheduler = new STAScheduler("pap");
101
102            switch (P2PSettings.Default.LinkManager)
103            {
104                case P2PLinkManagerType.Snal:
105                    LogToMonitor("Init LinkMgr: Using NAT Traversal stuff");
106
107                    // NAT-Traversal stuff needs a different Snal-Version
108                    _linkmanager = new Snal(scheduler);
109
110                    var settings = new Settings();
111                    settings.LoadDefaults();
112                    settings.ConnectInternal = true;
113                    settings.LocalReceivingPort = 0;
114                    settings.UseLocalAddressDetection = false;
115                    settings.AutoReconnect = false;
116                    settings.NoDelay = false;
117                    settings.ReuseAddress = false;
118                    settings.UseNetworkMonitorServer = true;
119
120                    _linkmanager.Settings = settings;
121                    _linkmanager.ApplicationType = ApplicationType.CrypTool;
122
123                    break;
124                default:
125                    throw (new NotImplementedException());
126            }
127
128            switch (P2PSettings.Default.Bootstrapper)
129            {
130                case P2PBootstrapperType.LocalMachineBootstrapper:
131                    // LocalMachineBootstrapper = only local connection (runs only on one machine)
132                    _bootstrapper = new LocalMachineBootstrapper();
133                    break;
134                case P2PBootstrapperType.IrcBootstrapper:
135                    // setup nat traversal stuff
136                    LogToMonitor("Init Bootstrapper: Using NAT Traversal stuff");
137                    PeersAtPlay.P2POverlay.Bootstrapper.IrcBootstrapper.Settings.DelaySymmetricResponse = true;
138                    PeersAtPlay.P2POverlay.Bootstrapper.IrcBootstrapper.Settings.IncludeSymmetricInResponse = false;
139                    PeersAtPlay.P2POverlay.Bootstrapper.IrcBootstrapper.Settings.SymmetricResponseDelay = 6000;
140
141                    _bootstrapper = new IrcBootstrapper(scheduler);
142                    break;
143                default:
144                    throw (new NotImplementedException());
145            }
146
147            switch (P2PSettings.Default.Overlay)
148            {
149                case P2POverlayType.FullMeshOverlay:
150                    // changing overlay example: this.overlay = new ChordOverlay();
151                    _overlay = new FullMeshOverlay(scheduler);
152                    break;
153                default:
154                    throw (new NotImplementedException());
155            }
156
157            switch (P2PSettings.Default.Dht)
158            {
159                case P2PDHTType.FullMeshDHT:
160                    _dht = new FullMeshDHT(scheduler);
161                    break;
162                default:
163                    throw (new NotImplementedException());
164            }
165
166            _overlay.MessageReceived += OverlayMessageReceived;
167            _dht.SystemJoined += OnDhtSystemJoined;
168            _dht.SystemLeft += OnDhtSystemLeft;
169
170            _versionedDht = (IVersionedDHT) _dht;
171
172            _dht.Initialize(P2PSettings.Default.PeerName, string.Empty, P2PSettings.Default.WorldName, _overlay, 
173                            _bootstrapper, 
174                            _linkmanager, null);
175
176            Initialized = true;
177        }
178
179        /// <summary>
180        ///   Starts the P2P System. When the given P2P world doesn't exist yet,
181        ///   inclusive creating the and bootstrapping to the P2P network.
182        ///   In either case joining the P2P world.
183        ///   This synchronized method returns true not before the peer has
184        ///   successfully joined the network (this may take one or two minutes).
185        /// </summary>
186        /// <exception cref = "InvalidOperationException">When the peer-to-peer system has not been initialized.
187        /// After validating the settings, this can be done by calling Initialize().</exception>
188        /// <returns>True, if the peer has completely joined the p2p network</returns>
189        public bool SynchStart()
190        {
191            if (!Initialized)
192            {
193                throw new InvalidOperationException("Peer-to-peer is not initialized.");
194            }
195
196            if (Started)
197            {
198                return true;
199            }
200
201            _dht.BeginStart(null);
202
203            // Wait for event SystemJoined. When it's invoked, the peer completely joined the P2P system
204            _systemJoined.WaitOne();
205
206            return true;
207        }
208
209        /// <summary>
210        ///   Disconnects from the peer-to-peer system.
211        /// </summary>
212        /// <returns>True, if the peer has completely left the p2p network</returns>
213        public bool SynchStop()
214        {
215            if (_dht == null) return false;
216
217            _dht.BeginStop(null);
218
219            // wait till systemLeft Event is invoked
220            _systemLeft.WaitOne();
221
222            return true;
223        }
224
225        #endregion
226
227        #region Peer related method (GetPeerId, Send message to peer)
228
229        /// <summary>
230        ///   Get PeerName of the actual peer
231        /// </summary>
232        /// <param name = "sPeerName">out: additional peer information UserName on LinkManager</param>
233        /// <returns>PeerID as a String</returns>
234        public PeerId GetPeerId(out string sPeerName)
235        {
236            sPeerName = _linkmanager.UserName;
237            return new PeerId(_overlay.LocalAddress);
238        }
239
240        /// <summary>
241        ///   Construct PeerId object for a specific byte[] id
242        /// </summary>
243        /// <param name = "byteId">overlay address as byte array</param>
244        /// <returns>corresponding PeerId for given byte[] id</returns>
245        public PeerId GetPeerId(byte[] byteId)
246        {
247            LogToMonitor("GetPeerID: Converting byte[] to PeerId-Object");
248            return new PeerId(_overlay.GetAddress(byteId));
249        }
250
251        // overlay.LocalAddress = Overlay-Peer-Address/Names
252        public void SendToPeer(byte[] data, byte[] destinationPeer)
253        {
254            // get stack size of the pap use-data and add own use data (for optimizing Stack size)
255            var realStackSize = _overlay.GetHeaderSize() + data.Length;
256
257            var stackData = new ByteStack(realStackSize);
258            stackData.Push(data);
259
260            var destinationAddr = _overlay.GetAddress(destinationPeer);
261            var overlayMsg = new OverlayMessage(MessageReceiverType.P2PBase, 
262                                                _overlay.LocalAddress, destinationAddr, stackData);
263
264            _overlay.Send(overlayMsg);
265        }
266
267        private void OverlayMessageReceived(object sender, OverlayMessageEventArgs e)
268        {
269            if (OnP2PMessageReceived == null) return;
270
271            var pid = new PeerId(e.Message.Source);
272            /* You have to fire this event asynchronous, because the main
273                 * thread will be stopped in this wrapper class for synchronizing
274                 * the asynchronous stuff (AutoResetEvent) --> so this could run
275                 * into a deadlock, when you fire this event synchronous (normal Invoke)
276                 * ATTENTION: This could change the invocation order!!! In my case
277                              no problem, but maybe in future cases... */
278
279            // TODO: not safe: The delegate must have only one target
280            // OnP2PMessageReceived.BeginInvoke(pid, e.Message.Data.PopBytes(e.Message.Data.CurrentStackSize), null, null);
281
282            foreach (var del in OnP2PMessageReceived.GetInvocationList())
283            {
284                del.DynamicInvoke(pid, e.Message.Data.PopBytes(e.Message.Data.CurrentStackSize));
285            }
286        }
287
288        #endregion
289
290        #region Event Handling (System Joined, Left and Message Received)
291
292        private void OnDhtSystemJoined(object sender, EventArgs e)
293        {
294            if (OnSystemJoined != null)
295                OnSystemJoined();
296
297            _systemJoined.Set();
298            Started = true;
299        }
300
301        private void OnDhtSystemLeft(object sender, SystemLeftEventArgs e)
302        {
303            if (OnSystemLeft != null)
304                OnSystemLeft();
305
306            // as an experiment
307            _dht = null;
308
309            _systemLeft.Set();
310            Started = false;
311            Initialized = false;
312
313            LogToMonitor("CrypP2P left the system.");
314        }
315
316        #endregion
317
318        #region Synchronous Methods + their Callbacks
319
320        /// <summary>
321        ///   Stores a value in the DHT at the given key
322        /// </summary>
323        /// <param name = "key">Key of DHT Entry</param>
324        /// <param name = "data">Value of DHT Entry</param>
325        /// <returns>True, when storing is completed!</returns>
326        public bool SynchStore(string key, byte[] data)
327        {
328            var autoResetEvent = new AutoResetEvent(false);
329
330            // LogToMonitor("testcrash" + Encoding.UTF8.GetString(new byte[5000]));
331            LogToMonitor("Begin: SynchStore. Key: " + key + ", " + data.Length + " bytes");
332
333            var responseWait = new ResponseWait {WaitHandle = autoResetEvent, key = key, value = data};
334            _versionedDht.Store(OnSynchStoreCompleted, key, data, responseWait);
335
336            // blocking till response
337            autoResetEvent.WaitOne();
338
339            LogToMonitor("End: SynchStore. Key: " + key + ". Success: " + responseWait.success);
340
341            return responseWait.success;
342        }
343
344        /// <summary>
345        ///   Stores a value in the DHT at the given key
346        /// </summary>
347        /// <param name = "key">Key of DHT Entry</param>
348        /// <param name = "value">Value of DHT Entry</param>
349        /// <returns>True, when storing is completed!</returns>
350        public bool SynchStore(string key, string value)
351        {
352            return SynchStore(key, Encoding.UTF8.GetBytes(value));
353        }
354
355        /// <summary>
356        ///   Callback for a the synchronized store method
357        /// </summary>
358        /// <param name = "storeResult">retrieved data container</param>
359        private static void OnSynchStoreCompleted(StoreResult storeResult)
360        {
361            var responseWait = storeResult.AsyncState as ResponseWait;
362            if (responseWait == null)
363            {
364                LogToMonitor("Received OnSynchStoreCompleted, but ResponseWait object is missing. Discarding.");
365                return;
366            }
367
368            responseWait.success = storeResult.Status != OperationStatus.KeyNotFound;
369            responseWait.Message = Encoding.UTF8.GetBytes(storeResult.Status.ToString());
370
371            // unblock WaitHandle in the synchronous method
372            responseWait.WaitHandle.Set();
373
374            LogToMonitor("Received and handled OnSynchStoreCompleted.");
375        }
376
377        /// <summary>
378        ///   Get the value of the given DHT Key or null, if it doesn't exist.
379        /// </summary>
380        /// <param name = "key">Key of DHT Entry</param>
381        /// <returns>Value of DHT Entry</returns>
382        public byte[] SynchRetrieve(string key)
383        {
384            LogToMonitor("Begin: SynchRetrieve. Key: " + key);
385
386            var autoResetEvent = new AutoResetEvent(false);
387            var responseWait = new ResponseWait {WaitHandle = autoResetEvent, key = key};
388            _dht.Retrieve(OnSynchRetrieveCompleted, key, responseWait);
389
390            // blocking till response
391            autoResetEvent.WaitOne();
392
393            LogToMonitor("End: SynchRetrieve. Key: " + key + ". Success: " + responseWait.success);
394
395            return responseWait.Message;
396        }
397
398        /// <summary>
399        ///   Callback for a the synchronized retrieval method
400        /// </summary>
401        /// <param name = "retrieveResult"></param>
402        private static void OnSynchRetrieveCompleted(RetrieveResult retrieveResult)
403        {
404            var responseWait = retrieveResult.AsyncState as ResponseWait;
405            if (responseWait == null)
406            {
407                LogToMonitor("Received OnSynchRetrieveCompleted, but ResponseWait object is missing. Discarding.");
408                return;
409            }
410
411            LogToMonitor("Received retrieve callback, local ThreadId: " + Thread.CurrentThread.ManagedThreadId);
412
413            switch (retrieveResult.Status)
414            {
415                case OperationStatus.Success:
416                    responseWait.success = true;
417                    responseWait.Message = retrieveResult.Data;
418                    break;
419                case OperationStatus.KeyNotFound:
420                    responseWait.success = true;
421                    responseWait.Message = null;
422                    break;
423                default:
424                    responseWait.success = false;
425                    responseWait.Message = null;
426                    break;
427            }
428
429            // unblock WaitHandle in the synchronous method
430            responseWait.WaitHandle.Set();
431
432            LogToMonitor("Received and handled OnSynchRetrieveCompleted.");
433        }
434
435        /// <summary>
436        ///   Removes a key/value pair out of the DHT
437        /// </summary>
438        /// <param name = "key">Key of the DHT Entry</param>
439        /// <returns>True, when removing is completed!</returns>
440        public bool SynchRemove(string key)
441        {
442            LogToMonitor("Begin SynchRemove. Key: " + key);
443
444            var autoResetEvent = new AutoResetEvent(false);
445            var responseWait = new ResponseWait {WaitHandle = autoResetEvent, key = key};
446            _versionedDht.Remove(OnSynchRemoveCompleted, key, responseWait);
447
448            // blocking till response
449            autoResetEvent.WaitOne();
450
451            LogToMonitor("End: SynchRemove. Key: " + key + ". Success: " + responseWait.success);
452
453            return responseWait.success;
454        }
455
456        /// <summary>
457        ///   Callback for a the synchronized remove method
458        /// </summary>
459        /// <param name = "removeResult"></param>
460        private static void OnSynchRemoveCompleted(RemoveResult removeResult)
461        {
462            var responseWait = removeResult.AsyncState as ResponseWait;
463            if (responseWait == null)
464            {
465                LogToMonitor("Received OnSynchRemoveCompleted, but ResponseWait object is missing. Discarding.");
466                return;
467            }
468
469            responseWait.success = removeResult.Status == OperationStatus.Success;
470            responseWait.Message = Encoding.UTF8.GetBytes(removeResult.Status.ToString());
471
472            // unblock WaitHandle in the synchronous method
473            responseWait.WaitHandle.Set();
474
475            LogToMonitor("Received and handled OnSynchRemoveCompleted.");
476        }
477
478        #endregion
479
480        #region Log facility
481
482        /// <summary>
483        ///   To log the internal state in the Monitoring Software of P@play
484        /// </summary>
485        public void LogInternalState()
486        {
487            if (_dht != null)
488            {
489                _dht.LogInternalState();
490            }
491        }
492
493        private static void LogToMonitor(string sTextToLog)
494        {
495            if (P2PSettings.Default.Log2Monitor)
496                Log.Debug(sTextToLog);
497        }
498
499        #endregion
500    }
501}
Note: See TracBrowser for help on using the repository browser.