source: trunk/CrypP2P/Internal/P2PBase.cs @ 1603

Last change on this file since 1603 was 1603, checked in by Paul Lelgemann, 11 years ago

o Updated Peers@Play libraries

File size: 20.3 KB
Line 
1/*
2   Copyright 2010 Paul Lelgemann, University of Duisburg-Essen
3
4   Licensed under the Apache License, Version 2.0 (the "License");
5   you may not use this file except in compliance with the License.
6   You may obtain a copy of the License at
7
8       http://www.apache.org/licenses/LICENSE-2.0
9
10   Unless required by applicable law or agreed to in writing, software
11   distributed under the License is distributed on an "AS IS" BASIS,
12   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   See the License for the specific language governing permissions and
14   limitations under the License.
15*/
16
17using System;
18using System.Text;
19using System.Threading;
20using Cryptool.PluginBase;
21using Cryptool.Plugins.PeerToPeer.Internal;
22using Gears4Net;
23using PeersAtPlay;
24using PeersAtPlay.Monitoring;
25using PeersAtPlay.P2PLink;
26using PeersAtPlay.P2PLink.SnalNG;
27using PeersAtPlay.P2POverlay;
28using PeersAtPlay.P2POverlay.Bootstrapper;
29using PeersAtPlay.P2POverlay.Bootstrapper.IrcBootstrapperV2;
30using PeersAtPlay.P2POverlay.Bootstrapper.LocalMachineBootstrapper;
31using PeersAtPlay.P2POverlay.FullMeshOverlay;
32using PeersAtPlay.P2PStorage.DHT;
33using PeersAtPlay.P2PStorage.FullMeshDHT;
34using PeersAtPlay.Util.Logging;
35
36/* TODO:
37 * - Delete UseNatTraversal-Flag and insert CertificateCheck and CertificateSetup
38 * - Testing asynchronous methods incl. EventHandlers
39 */
40
41namespace Cryptool.P2P.Internal
42{
43    /// <summary>
44    ///   Wrapper class to integrate peer@play environment into CrypTool.
45    ///   This class synchronizes asynchronous methods for easier usage in CT2.
46    /// </summary>
47    public class P2PBase
48    {
49        private readonly P2PManager _p2PManager;
50
51        #region Variables
52
53        private readonly object _connectLock = new object();
54        private readonly AutoResetEvent _systemJoined;
55        private readonly AutoResetEvent _systemLeft;
56        private IBootstrapper _bootstrapper;
57        private IP2PLinkManager _linkmanager;
58        private P2POverlay _overlay;
59        internal IDHT Dht;
60        internal IVersionedDHT VersionedDht;
61
62        /// <summary>
63        ///   True if system was successfully joined, false if system is COMPLETELY left
64        /// </summary>
65        public bool Started { get; private set; }
66
67        /// <summary>
68        ///   True if the underlying peer to peer system has been fully initialized
69        /// </summary>
70        public bool Initialized { get; private set; }
71
72        #endregion
73
74        #region Delegates
75
76        public event P2PMessageReceived OnP2PMessageReceived;
77        public delegate void P2PMessageReceived(PeerId sourceAddr, byte[] data);
78
79        public event SystemJoined OnSystemJoined;
80        public delegate void SystemJoined();
81
82        public event SystemLeft OnSystemLeft;
83        public delegate void SystemLeft();
84
85        #endregion
86
87        public P2PBase(P2PManager p2PManager)
88        {
89            Started = false;
90            Initialized = false;
91
92            _p2PManager = p2PManager;
93            _systemJoined = new AutoResetEvent(false);
94            _systemLeft = new AutoResetEvent(false);
95        }
96
97        #region Basic P2P Methods (Init, Start, Stop)
98
99        /// <summary>
100        ///   Initializes the underlying peer-to-peer system with settings configured in P2PSettings. This step is required in order to be able to establish a connection.
101        /// </summary>
102        public void Initialize()
103        {
104            lock (_connectLock)
105            {
106                Scheduler scheduler = new STAScheduler("pap");
107
108                switch (P2PSettings.Default.LinkManager)
109                {
110                    case P2PLinkManagerType.Snal:
111                        LogToMonitor("Init LinkMgr: Using NAT Traversal stuff");
112
113                        // NAT-Traversal stuff needs a different Snal-Version
114                        _linkmanager = new Snal(scheduler);
115
116                        var settings = new PeersAtPlay.P2PLink.SnalNG.Settings();
117                        settings.LoadDefaults();
118                        settings.ConnectInternal = true;
119                        settings.LocalReceivingPort = P2PSettings.Default.LocalReceivingPort;
120                        settings.UseLocalAddressDetection = P2PSettings.Default.UseLocalAddressDetection;
121                        settings.AutoReconnect = false;
122                        settings.NoDelay = false;
123                        settings.ReuseAddress = false;
124                        settings.UseNetworkMonitorServer = true;
125                        settings.CloseConnectionAfterPingTimeout = false;
126
127                        _linkmanager.Settings = settings;
128                        _linkmanager.ApplicationType = ApplicationType.CrypTool;
129
130                        break;
131                    default:
132                        throw (new NotImplementedException());
133                }
134
135                switch (P2PSettings.Default.Bootstrapper)
136                {
137                    case P2PBootstrapperType.LocalMachineBootstrapper:
138                        // LocalMachineBootstrapper = only local connection (runs only on one machine)
139                        _bootstrapper = new LocalMachineBootstrapper();
140                        break;
141                    case P2PBootstrapperType.IrcBootstrapper:
142                        // setup nat traversal stuff
143                        LogToMonitor("Init Bootstrapper: Using NAT Traversal stuff");
144                        PeersAtPlay.P2POverlay.Bootstrapper.IrcBootstrapper.Settings.DelaySymmetricResponse = true;
145                        PeersAtPlay.P2POverlay.Bootstrapper.IrcBootstrapper.Settings.IncludeSymmetricInResponse = false;
146                        PeersAtPlay.P2POverlay.Bootstrapper.IrcBootstrapper.Settings.SymmetricResponseDelay = 6000;
147
148                        PeersAtPlay.P2POverlay.Bootstrapper.IrcBootstrapperV2.Settings.DelaySymmetricResponse = true;
149                        PeersAtPlay.P2POverlay.Bootstrapper.IrcBootstrapperV2.Settings.IncludeSymmetricResponse = false;
150
151                        _bootstrapper = new IrcBootstrapper(scheduler);
152                        break;
153                    default:
154                        throw (new NotImplementedException());
155                }
156
157                switch (P2PSettings.Default.Overlay)
158                {
159                    case P2POverlayType.FullMeshOverlay:
160                        // changing overlay example: this.overlay = new ChordOverlay();
161                        _overlay = new FullMeshOverlay(scheduler);
162                        break;
163                    default:
164                        throw (new NotImplementedException());
165                }
166
167                switch (P2PSettings.Default.Dht)
168                {
169                    case P2PDHTType.FullMeshDHT:
170                        Dht = new FullMeshDHT(scheduler);
171                        break;
172                    default:
173                        throw (new NotImplementedException());
174                }
175
176                _overlay.MessageReceived += OverlayMessageReceived;
177                Dht.SystemJoined += OnDhtSystemJoined;
178                Dht.SystemLeft += OnDhtSystemLeft;
179
180                VersionedDht = (IVersionedDHT) Dht;
181
182                _p2PManager.GuiLogMessage("Initializing DHT with world name " + P2PSettings.Default.WorldName,
183                                                  NotificationLevel.Info);
184                Dht.Initialize(P2PSettings.Default.PeerName, string.Empty, P2PSettings.Default.WorldName, _overlay,
185                               _bootstrapper,
186                               _linkmanager, null);
187
188                Initialized = true;
189            }
190        }
191
192        /// <summary>
193        ///   Starts the P2P System. When the given P2P world doesn't exist yet,
194        ///   inclusive creating the and bootstrapping to the P2P network.
195        ///   In either case joining the P2P world.
196        ///   This synchronized method returns true not before the peer has
197        ///   successfully joined the network (this may take one or two minutes).
198        /// </summary>
199        /// <exception cref = "InvalidOperationException">When the peer-to-peer system has not been initialized.
200        /// After validating the settings, this can be done by calling Initialize().</exception>
201        /// <returns>True, if the peer has completely joined the p2p network</returns>
202        public bool SynchStart()
203        {
204            lock (_connectLock)
205            {
206                if (!Initialized)
207                {
208                    throw new InvalidOperationException("Peer-to-peer is not initialized.");
209                }
210
211                if (Started)
212                {
213                    return true;
214                }
215
216                Dht.BeginStart(BeginStartEventHandler);
217
218                // Wait for event SystemJoined. When it's invoked, the peer completely joined the P2P system
219                _systemJoined.WaitOne();
220                _p2PManager.GuiLogMessage("System join process ended.", NotificationLevel.Debug);
221
222                return true;
223            }
224        }
225
226        private void BeginStartEventHandler(DHTEventArgs eventArgs)
227        {
228            _p2PManager.GuiLogMessage("Received DHTEventArgs: " + eventArgs + ", state: " + eventArgs.State, NotificationLevel.Debug);
229        }
230
231        /// <summary>
232        ///   Disconnects from the peer-to-peer system.
233        /// </summary>
234        /// <returns>True, if the peer has completely left the p2p network</returns>
235        public bool SynchStop()
236        {
237            lock (_connectLock)
238            {
239                if (Dht == null) return false;
240
241                Dht.BeginStop(null);
242
243                // wait till systemLeft Event is invoked
244                _systemLeft.WaitOne();
245
246                return true;
247            }
248        }
249
250        #endregion
251
252        #region Peer related method (GetPeerId, Send message to peer)
253
254        /// <summary>
255        ///   Get PeerName of the actual peer
256        /// </summary>
257        /// <param name = "sPeerName">out: additional peer information UserName on LinkManager</param>
258        /// <returns>PeerID as a String</returns>
259        public PeerId GetPeerId(out string sPeerName)
260        {
261            sPeerName = _linkmanager.UserName;
262            return new PeerId(_overlay.LocalAddress);
263        }
264
265        /// <summary>
266        ///   Construct PeerId object for a specific byte[] id
267        /// </summary>
268        /// <param name = "byteId">overlay address as byte array</param>
269        /// <returns>corresponding PeerId for given byte[] id</returns>
270        public PeerId GetPeerId(byte[] byteId)
271        {
272            LogToMonitor("GetPeerID: Converting byte[] to PeerId-Object");
273            return new PeerId(_overlay.GetAddress(byteId));
274        }
275
276        // overlay.LocalAddress = Overlay-Peer-Address/Names
277        public void SendToPeer(byte[] data, byte[] destinationPeer)
278        {
279            // get stack size of the pap use-data and add own use data (for optimizing Stack size)
280            var realStackSize = _overlay.GetHeaderSize() + data.Length;
281
282            var stackData = new ByteStack(realStackSize);
283            stackData.Push(data);
284
285            var destinationAddr = _overlay.GetAddress(destinationPeer);
286            var overlayMsg = new OverlayMessage(MessageReceiverType.P2PBase, 
287                                                _overlay.LocalAddress, destinationAddr, stackData);
288
289            _overlay.Send(overlayMsg);
290        }
291
292        private void OverlayMessageReceived(object sender, OverlayMessageEventArgs e)
293        {
294            if (OnP2PMessageReceived == null) return;
295
296            var pid = new PeerId(e.Message.Source);
297            /* You have to fire this event asynchronous, because the main
298                 * thread will be stopped in this wrapper class for synchronizing
299                 * the asynchronous stuff (AutoResetEvent) --> so this could run
300                 * into a deadlock, when you fire this event synchronous (normal Invoke)
301                 * ATTENTION: This could change the invocation order!!! In my case
302                              no problem, but maybe in future cases... */
303
304            // TODO: not safe: The delegate must have only one target
305            // OnP2PMessageReceived.BeginInvoke(pid, e.Message.Data.PopBytes(e.Message.Data.CurrentStackSize), null, null);
306
307            foreach (var del in OnP2PMessageReceived.GetInvocationList())
308            {
309                del.DynamicInvoke(pid, e.Message.Data.PopBytes(e.Message.Data.CurrentStackSize));
310            }
311        }
312
313        #endregion
314
315        #region Event Handling (System Joined, Left and Message Received)
316
317        private void OnDhtSystemJoined(object sender, EventArgs e)
318        {
319            if (OnSystemJoined != null)
320                OnSystemJoined();
321
322            _systemJoined.Set();
323            Started = true;
324        }
325
326        private void OnDhtSystemLeft(object sender, SystemLeftEventArgs e)
327        {
328            if (OnSystemLeft != null)
329                OnSystemLeft();
330
331            Started = false;
332            Initialized = false;
333
334            // Allow new connection to start and check for waiting / blocked tasks
335            _p2PManager.IsP2PConnecting = false;
336            _systemLeft.Set();
337            _systemJoined.Set();
338
339            LogToMonitor("CrypP2P left the system.");
340        }
341
342        #endregion
343
344        #region Synchronous Methods + their Callbacks
345
346        /// <summary>
347        ///   Stores a value in the DHT at the given key
348        /// </summary>
349        /// <param name = "key">Key of DHT Entry</param>
350        /// <param name = "data">Value of DHT Entry</param>
351        /// <returns>True, when storing is completed!</returns>
352        public bool SynchStore(string key, byte[] data)
353        {
354            var autoResetEvent = new AutoResetEvent(false);
355
356            // LogToMonitor("testcrash" + Encoding.UTF8.GetString(new byte[5000]));
357            LogToMonitor("Begin: SynchStore. Key: " + key + ", " + data.Length + " bytes");
358
359            var responseWait = new ResponseWait {WaitHandle = autoResetEvent, key = key, value = data};
360            VersionedDht.Store(OnSynchStoreCompleted, key, data, responseWait);
361
362            // blocking till response
363            autoResetEvent.WaitOne();
364
365            LogToMonitor("End: SynchStore. Key: " + key + ". Success: " + responseWait.success);
366
367            return responseWait.success;
368        }
369
370        /// <summary>
371        ///   Stores a value in the DHT at the given key
372        /// </summary>
373        /// <param name = "key">Key of DHT Entry</param>
374        /// <param name = "value">Value of DHT Entry</param>
375        /// <returns>True, when storing is completed!</returns>
376        public bool SynchStore(string key, string value)
377        {
378            return SynchStore(key, Encoding.UTF8.GetBytes(value));
379        }
380
381        /// <summary>
382        ///   Callback for a the synchronized store method
383        /// </summary>
384        /// <param name = "storeResult">retrieved data container</param>
385        private static void OnSynchStoreCompleted(StoreResult storeResult)
386        {
387            var responseWait = storeResult.AsyncState as ResponseWait;
388            if (responseWait == null)
389            {
390                LogToMonitor("Received OnSynchStoreCompleted, but ResponseWait object is missing. Discarding.");
391                return;
392            }
393
394            responseWait.success = storeResult.Status != OperationStatus.KeyNotFound;
395            responseWait.Message = Encoding.UTF8.GetBytes(storeResult.Status.ToString());
396
397            // unblock WaitHandle in the synchronous method
398            responseWait.WaitHandle.Set();
399
400            LogToMonitor("Received and handled OnSynchStoreCompleted.");
401        }
402
403        /// <summary>
404        ///   Get the value of the given DHT Key or null, if it doesn't exist.
405        /// </summary>
406        /// <param name = "key">Key of DHT Entry</param>
407        /// <returns>Value of DHT Entry</returns>
408        public byte[] SynchRetrieve(string key)
409        {
410            LogToMonitor("Begin: SynchRetrieve. Key: " + key);
411
412            var autoResetEvent = new AutoResetEvent(false);
413            var responseWait = new ResponseWait {WaitHandle = autoResetEvent, key = key};
414            Dht.Retrieve(OnSynchRetrieveCompleted, key, responseWait);
415
416            // blocking till response
417            autoResetEvent.WaitOne();
418
419            LogToMonitor("End: SynchRetrieve. Key: " + key + ". Success: " + responseWait.success);
420
421            return responseWait.Message;
422        }
423
424        /// <summary>
425        ///   Callback for a the synchronized retrieval method
426        /// </summary>
427        /// <param name = "retrieveResult"></param>
428        private static void OnSynchRetrieveCompleted(RetrieveResult retrieveResult)
429        {
430            var responseWait = retrieveResult.AsyncState as ResponseWait;
431            if (responseWait == null)
432            {
433                LogToMonitor("Received OnSynchRetrieveCompleted, but ResponseWait object is missing. Discarding.");
434                return;
435            }
436
437            LogToMonitor("Received retrieve callback, local ThreadId: " + Thread.CurrentThread.ManagedThreadId);
438
439            switch (retrieveResult.Status)
440            {
441                case OperationStatus.Success:
442                    responseWait.success = true;
443                    responseWait.Message = retrieveResult.Data;
444                    break;
445                case OperationStatus.KeyNotFound:
446                    responseWait.success = true;
447                    responseWait.Message = null;
448                    break;
449                default:
450                    responseWait.success = false;
451                    responseWait.Message = null;
452                    break;
453            }
454
455            // unblock WaitHandle in the synchronous method
456            responseWait.WaitHandle.Set();
457
458            LogToMonitor("Received and handled OnSynchRetrieveCompleted.");
459        }
460
461        /// <summary>
462        ///   Removes a key/value pair out of the DHT
463        /// </summary>
464        /// <param name = "key">Key of the DHT Entry</param>
465        /// <returns>True, when removing is completed!</returns>
466        public bool SynchRemove(string key)
467        {
468            LogToMonitor("Begin SynchRemove. Key: " + key);
469
470            var autoResetEvent = new AutoResetEvent(false);
471            var responseWait = new ResponseWait {WaitHandle = autoResetEvent, key = key};
472            VersionedDht.Remove(OnSynchRemoveCompleted, key, responseWait);
473
474            // blocking till response
475            autoResetEvent.WaitOne();
476
477            LogToMonitor("End: SynchRemove. Key: " + key + ". Success: " + responseWait.success);
478
479            return responseWait.success;
480        }
481
482        /// <summary>
483        ///   Callback for a the synchronized remove method
484        /// </summary>
485        /// <param name = "removeResult"></param>
486        private static void OnSynchRemoveCompleted(RemoveResult removeResult)
487        {
488            var responseWait = removeResult.AsyncState as ResponseWait;
489            if (responseWait == null)
490            {
491                LogToMonitor("Received OnSynchRemoveCompleted, but ResponseWait object is missing. Discarding.");
492                return;
493            }
494
495            responseWait.success = removeResult.Status == OperationStatus.Success;
496            responseWait.Message = Encoding.UTF8.GetBytes(removeResult.Status.ToString());
497
498            // unblock WaitHandle in the synchronous method
499            responseWait.WaitHandle.Set();
500
501            LogToMonitor("Received and handled OnSynchRemoveCompleted.");
502        }
503
504        #endregion
505
506        #region Log facility
507
508        /// <summary>
509        ///   To log the internal state in the Monitoring Software of P@play
510        /// </summary>
511        public void LogInternalState()
512        {
513            if (Dht != null)
514            {
515                Dht.LogInternalState();
516            }
517        }
518
519        private static void LogToMonitor(string sTextToLog)
520        {
521            if (P2PSettings.Default.Log2Monitor)
522                Log.Debug(sTextToLog);
523        }
524
525        #endregion
526    }
527}
Note: See TracBrowser for help on using the repository browser.