source: trunk/CrypP2P/Internal/P2PBase.cs @ 1589

Last change on this file since 1589 was 1589, checked in by Paul Lelgemann, 12 years ago

+ Added log message for P2P initialization
+ Added locking to P2P methods initialize/connect/disconnect for debugging

  • Removed fix in P2PPublisherBase from r1587
File size: 20.2 KB
Line 
1/*
2   Copyright 2010 Paul Lelgemann, University of Duisburg-Essen
3
4   Licensed under the Apache License, Version 2.0 (the "License");
5   you may not use this file except in compliance with the License.
6   You may obtain a copy of the License at
7
8       http://www.apache.org/licenses/LICENSE-2.0
9
10   Unless required by applicable law or agreed to in writing, software
11   distributed under the License is distributed on an "AS IS" BASIS,
12   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   See the License for the specific language governing permissions and
14   limitations under the License.
15*/
16
17using System;
18using System.Text;
19using System.Threading;
20using Cryptool.PluginBase;
21using Cryptool.Plugins.PeerToPeer.Internal;
22using Gears4Net;
23using PeersAtPlay;
24using PeersAtPlay.Monitoring;
25using PeersAtPlay.P2PLink;
26using PeersAtPlay.P2PLink.SnalNG;
27using PeersAtPlay.P2POverlay;
28using PeersAtPlay.P2POverlay.Bootstrapper;
29using PeersAtPlay.P2POverlay.Bootstrapper.IrcBootstrapperV2;
30using PeersAtPlay.P2POverlay.Bootstrapper.LocalMachineBootstrapper;
31using PeersAtPlay.P2POverlay.FullMeshOverlay;
32using PeersAtPlay.P2PStorage.DHT;
33using PeersAtPlay.P2PStorage.FullMeshDHT;
34using PeersAtPlay.Util.Logging;
35
36/* TODO:
37 * - Delete UseNatTraversal-Flag and insert CertificateCheck and CertificateSetup
38 * - Testing asynchronous methods incl. EventHandlers
39 */
40
41namespace Cryptool.P2P.Internal
42{
43    /// <summary>
44    ///   Wrapper class to integrate peer@play environment into CrypTool.
45    ///   This class synchronizes asynchronous methods for easier usage in CT2.
46    /// </summary>
47    public class P2PBase
48    {
// Owning manager; used for GUI log messages and for resetting its IsP2PConnecting flag.
private readonly P2PManager _p2PManager;

#region Variables

// Serializes Initialize, SynchStart and SynchStop.
private readonly object _connectLock = new object();

// Wait handles released by the DHT event handlers to wake the blocking SynchStart / SynchStop calls.
private readonly AutoResetEvent _systemJoined = new AutoResetEvent(false);
private readonly AutoResetEvent _systemLeft = new AutoResetEvent(false);

// Building blocks of the peer-to-peer stack; all of them are assigned in Initialize().
private IBootstrapper _bootstrapper;
private IP2PLinkManager _linkmanager;
private P2POverlay _overlay;
internal IDHT Dht;
internal IVersionedDHT VersionedDht;

/// <summary>
///   True after the DHT reported that the system was joined; set back to false when the
///   DHT reports that the system was left.
/// </summary>
public bool Started { get; private set; }

/// <summary>
///   True after Initialize() completed; set back to false when the DHT reports that the
///   system was left.
/// </summary>
public bool Initialized { get; private set; }

#endregion

#region Delegates

/// <summary>Signature of handlers receiving a message that another peer sent to this peer.</summary>
public delegate void P2PMessageReceived(PeerId sourceAddr, byte[] data);

/// <summary>Raised for every overlay message delivered to this class.</summary>
public event P2PMessageReceived OnP2PMessageReceived;

/// <summary>Signature of handlers notified when the system was joined.</summary>
public delegate void SystemJoined();

/// <summary>Raised when the DHT reports that the system was joined.</summary>
public event SystemJoined OnSystemJoined;

/// <summary>Signature of handlers notified when the system was left.</summary>
public delegate void SystemLeft();

/// <summary>Raised when the DHT reports that the system was left.</summary>
public event SystemLeft OnSystemLeft;

#endregion

/// <summary>
///   Creates an uninitialized, unstarted wrapper. Started and Initialized keep their
///   default value (false) until Initialize() / SynchStart() succeed.
/// </summary>
/// <param name = "p2PManager">Manager that owns this instance</param>
public P2PBase(P2PManager p2PManager)
{
    _p2PManager = p2PManager;
}
96
97        #region Basic P2P Methods (Init, Start, Stop)
98
/// <summary>
///   Builds the peer-to-peer stack (link manager, bootstrapper, overlay and DHT) from the
///   values configured in P2PSettings, wires up the event handlers and initializes the DHT.
///   This step is required before a connection can be established with SynchStart().
/// </summary>
/// <exception cref = "NotImplementedException">When a configured component type is not handled by this class.</exception>
public void Initialize()
{
    lock (_connectLock)
    {
        // All components share one scheduler.
        Scheduler scheduler = new STAScheduler("pap");

        SetUpLinkManager(scheduler);
        SetUpBootstrapper(scheduler);
        SetUpOverlay(scheduler);
        SetUpDht(scheduler);

        _overlay.MessageReceived += OverlayMessageReceived;
        Dht.SystemJoined += OnDhtSystemJoined;
        Dht.SystemLeft += OnDhtSystemLeft;

        VersionedDht = (IVersionedDHT) Dht;

        var worldName = P2PSettings.Default.WorldName;
        _p2PManager.GuiLogMessage("Initializing DHT with world name " + worldName, NotificationLevel.Info);

        Dht.Initialize(P2PSettings.Default.PeerName, string.Empty, P2PSettings.Default.WorldName,
                       _overlay, _bootstrapper, _linkmanager, null);

        Initialized = true;
    }
}

// Creates and configures the link manager selected in P2PSettings (only Snal is handled).
private void SetUpLinkManager(Scheduler scheduler)
{
    if (P2PSettings.Default.LinkManager != P2PLinkManagerType.Snal)
    {
        throw (new NotImplementedException());
    }

    LogToMonitor("Init LinkMgr: Using NAT Traversal stuff");

    // NAT-Traversal stuff needs a different Snal-Version
    _linkmanager = new Snal(scheduler);

    // Load the defaults first, then override individual values.
    var snalSettings = new PeersAtPlay.P2PLink.SnalNG.Settings();
    snalSettings.LoadDefaults();
    snalSettings.ConnectInternal = true;
    snalSettings.LocalReceivingPort = P2PSettings.Default.LocalReceivingPort;
    snalSettings.UseLocalAddressDetection = P2PSettings.Default.UseLocalAddressDetection;
    snalSettings.AutoReconnect = false;
    snalSettings.NoDelay = false;
    snalSettings.ReuseAddress = false;
    snalSettings.UseNetworkMonitorServer = true;

    _linkmanager.Settings = snalSettings;
    _linkmanager.ApplicationType = ApplicationType.CrypTool;
}

// Creates the bootstrapper selected in P2PSettings.
private void SetUpBootstrapper(Scheduler scheduler)
{
    var bootstrapperType = P2PSettings.Default.Bootstrapper;

    if (bootstrapperType == P2PBootstrapperType.LocalMachineBootstrapper)
    {
        // LocalMachineBootstrapper = only local connection (runs only on one machine)
        _bootstrapper = new LocalMachineBootstrapper();
    }
    else if (bootstrapperType == P2PBootstrapperType.IrcBootstrapper)
    {
        // setup nat traversal stuff
        LogToMonitor("Init Bootstrapper: Using NAT Traversal stuff");
        PeersAtPlay.P2POverlay.Bootstrapper.IrcBootstrapper.Settings.DelaySymmetricResponse = true;
        PeersAtPlay.P2POverlay.Bootstrapper.IrcBootstrapper.Settings.IncludeSymmetricInResponse = false;
        PeersAtPlay.P2POverlay.Bootstrapper.IrcBootstrapper.Settings.SymmetricResponseDelay = 6000;

        PeersAtPlay.P2POverlay.Bootstrapper.IrcBootstrapperV2.Settings.DelaySymmetricResponse = true;
        PeersAtPlay.P2POverlay.Bootstrapper.IrcBootstrapperV2.Settings.IncludeSymmetricResponse = false;

        _bootstrapper = new IrcBootstrapper(scheduler);
    }
    else
    {
        throw (new NotImplementedException());
    }
}

// Creates the overlay selected in P2PSettings (only FullMeshOverlay is handled).
private void SetUpOverlay(Scheduler scheduler)
{
    if (P2PSettings.Default.Overlay != P2POverlayType.FullMeshOverlay)
    {
        throw (new NotImplementedException());
    }

    // changing overlay example: this.overlay = new ChordOverlay();
    _overlay = new FullMeshOverlay(scheduler);
}

// Creates the DHT selected in P2PSettings (only FullMeshDHT is handled).
private void SetUpDht(Scheduler scheduler)
{
    if (P2PSettings.Default.Dht != P2PDHTType.FullMeshDHT)
    {
        throw (new NotImplementedException());
    }

    Dht = new FullMeshDHT(scheduler);
}
190
/// <summary>
///   Starts the P2P system and blocks until the join process has ended. If the configured
///   P2P world does not exist yet it is created and bootstrapped; in either case the peer
///   joins the world (this may take one or two minutes).
///   Note that the wait is also released when the DHT reports that the system was left,
///   because the SystemLeft handler signals the same wait handle.
/// </summary>
/// <exception cref = "InvalidOperationException">When the peer-to-peer system has not been initialized.
/// After validating the settings, this can be done by calling Initialize().</exception>
/// <returns>Always true once the wait has ended (or immediately when already started)</returns>
public bool SynchStart()
{
    lock (_connectLock)
    {
        if (!Initialized)
        {
            throw new InvalidOperationException("Peer-to-peer is not initialized.");
        }

        if (Started)
        {
            return true;
        }

        // The SystemLeft handler also sets _systemJoined. When nobody is waiting at that
        // moment (e.g. during SynchStop) the AutoResetEvent stays signalled, and without
        // this reset the next start would return before the peer has actually joined.
        _systemJoined.Reset();

        Dht.BeginStart(BeginStartEventHandler);

        // Wait for event SystemJoined. When it's invoked, the peer completely joined the P2P system
        _systemJoined.WaitOne();
        _p2PManager.GuiLogMessage("System join process ended.", NotificationLevel.Debug);

        return true;
    }
}
224
/// <summary>
///   Callback handed to Dht.BeginStart; only writes the received event arguments and
///   their state to the GUI log at debug level.
/// </summary>
/// <param name = "eventArgs">Arguments delivered by the DHT</param>
private void BeginStartEventHandler(DHTEventArgs eventArgs)
{
    var message = "Received DHTEventArgs: " + eventArgs + ", state: " + eventArgs.State;
    _p2PManager.GuiLogMessage(message, NotificationLevel.Debug);
}
229
/// <summary>
///   Disconnects from the peer-to-peer system and blocks until the SystemLeft handler
///   signals that the system was left.
/// </summary>
/// <returns>False when no DHT exists (Initialize() was never called), otherwise true after the wait ended</returns>
public bool SynchStop()
{
    lock (_connectLock)
    {
        if (Dht != null)
        {
            Dht.BeginStop(null);

            // Blocks until the SystemLeft handler sets the event.
            // NOTE(review): the handler sets this event on every leave, so a signal left over
            // from an earlier leave would end this wait immediately - confirm whether intended.
            _systemLeft.WaitOne();

            return true;
        }

        return false;
    }
}
248
249        #endregion
250
251        #region Peer related method (GetPeerId, Send message to peer)
252
/// <summary>
///   Returns the PeerId built from the local overlay address.
/// </summary>
/// <param name = "sPeerName">out: the UserName reported by the link manager</param>
/// <returns>PeerId wrapping the overlay's local address</returns>
public PeerId GetPeerId(out string sPeerName)
{
    sPeerName = _linkmanager.UserName;

    var localAddress = _overlay.LocalAddress;
    return new PeerId(localAddress);
}
263
/// <summary>
///   Builds a PeerId for an overlay address given as byte array.
/// </summary>
/// <param name = "byteId">overlay address as byte array</param>
/// <returns>PeerId wrapping the overlay address resolved from byteId</returns>
public PeerId GetPeerId(byte[] byteId)
{
    LogToMonitor("GetPeerID: Converting byte[] to PeerId-Object");

    var overlayAddress = _overlay.GetAddress(byteId);
    return new PeerId(overlayAddress);
}
274
/// <summary>
///   Sends raw data to another peer through the overlay.
///   (overlay.LocalAddress = Overlay-Peer-Address/Names)
/// </summary>
/// <param name = "data">payload to transmit</param>
/// <param name = "destinationPeer">overlay address of the receiving peer as byte array</param>
public void SendToPeer(byte[] data, byte[] destinationPeer)
{
    // Size the stack for the overlay header plus our own payload so it need not grow later.
    var payload = new ByteStack(_overlay.GetHeaderSize() + data.Length);
    payload.Push(data);

    var receiver = _overlay.GetAddress(destinationPeer);
    var message = new OverlayMessage(MessageReceiverType.P2PBase, _overlay.LocalAddress, receiver, payload);

    _overlay.Send(message);
}
290
/// <summary>
///   Handles a message delivered by the overlay and forwards its sender and payload to
///   the subscribers of OnP2PMessageReceived. Does nothing when there are no subscribers.
/// </summary>
/// <param name = "sender">event source (unused)</param>
/// <param name = "e">event arguments carrying the overlay message</param>
private void OverlayMessageReceived(object sender, OverlayMessageEventArgs e)
{
    // Snapshot the delegate so an unsubscribe on another thread cannot null it
    // between the check and the invocation.
    var handlers = OnP2PMessageReceived;
    if (handlers == null) return;

    var pid = new PeerId(e.Message.Source);

    // Pop the payload exactly once. Popping inside a per-subscriber loop presumably hands
    // every subscriber after the first an empty array, because the first pop drains the stack.
    var payload = e.Message.Data.PopBytes(e.Message.Data.CurrentStackSize);

    // Subscribers are called synchronously, in subscription order, via a typed call
    // (no reflection, so their exceptions are not wrapped in TargetInvocationException).
    // NOTE(review): the original author warned that a subscriber blocking on the
    // synchronous methods of this class may deadlock - confirm before adding such handlers.
    handlers(pid, payload);
}
311
312        #endregion
313
314        #region Event Handling (System Joined, Left and Message Received)
315
/// <summary>
///   Handles the DHT's SystemJoined event: marks the system as started, notifies the
///   subscribers of OnSystemJoined and releases a SynchStart() call waiting for the join.
/// </summary>
/// <param name = "sender">event source (unused)</param>
/// <param name = "e">event arguments (unused)</param>
private void OnDhtSystemJoined(object sender, EventArgs e)
{
    // Update the state first so subscribers and the released waiter both observe Started == true.
    Started = true;

    try
    {
        var handler = OnSystemJoined;
        if (handler != null)
        {
            handler();
        }
    }
    finally
    {
        // Release the waiter even if a subscriber throws; otherwise SynchStart would block forever.
        _systemJoined.Set();
    }
}
324
/// <summary>
///   Handles the DHT's SystemLeft event: marks the system as stopped and uninitialized,
///   notifies the subscribers of OnSystemLeft and releases any SynchStop() / SynchStart()
///   call that is blocked waiting.
/// </summary>
/// <param name = "sender">event source (unused)</param>
/// <param name = "e">event arguments (unused)</param>
private void OnDhtSystemLeft(object sender, SystemLeftEventArgs e)
{
    // Update the state first so subscribers and released waiters observe the disconnected state.
    Started = false;
    Initialized = false;

    try
    {
        var handler = OnSystemLeft;
        if (handler != null)
        {
            handler();
        }
    }
    finally
    {
        // Allow new connection to start and check for waiting / blocked tasks.
        // Done in finally so a throwing subscriber cannot leave callers blocked forever.
        _p2PManager.IsP2PConnecting = false;
        _systemLeft.Set();
        _systemJoined.Set();
    }

    LogToMonitor("CrypP2P left the system.");
}
340
341        #endregion
342
343        #region Synchronous Methods + their Callbacks
344
/// <summary>
///   Stores a value in the DHT at the given key and blocks until the store callback
///   has been received.
/// </summary>
/// <param name = "key">Key of DHT Entry</param>
/// <param name = "data">Value of DHT Entry</param>
/// <returns>The success flag set by the store callback</returns>
public bool SynchStore(string key, byte[] data)
{
    LogToMonitor("Begin: SynchStore. Key: " + key + ", " + data.Length + " bytes");

    // The wait handle wraps an OS resource; dispose it once the callback has signalled it.
    using (var autoResetEvent = new AutoResetEvent(false))
    {
        var responseWait = new ResponseWait {WaitHandle = autoResetEvent, key = key, value = data};
        VersionedDht.Store(OnSynchStoreCompleted, key, data, responseWait);

        // blocking till response
        autoResetEvent.WaitOne();

        LogToMonitor("End: SynchStore. Key: " + key + ". Success: " + responseWait.success);

        return responseWait.success;
    }
}
368
/// <summary>
///   Stores a string in the DHT at the given key; the string is UTF-8 encoded and
///   passed on to the byte[] overload.
/// </summary>
/// <param name = "key">Key of DHT Entry</param>
/// <param name = "value">Value of DHT Entry</param>
/// <returns>Result of the byte[] overload</returns>
public bool SynchStore(string key, string value)
{
    var encodedValue = Encoding.UTF8.GetBytes(value);
    return SynchStore(key, encodedValue);
}
379
/// <summary>
///   Callback of the synchronized store method: records the outcome in the ResponseWait
///   object passed as async state and wakes the waiting caller.
/// </summary>
/// <param name = "storeResult">result container delivered by the DHT</param>
private static void OnSynchStoreCompleted(StoreResult storeResult)
{
    var pending = storeResult.AsyncState as ResponseWait;
    if (pending != null)
    {
        // NOTE(review): every status except KeyNotFound counts as success here, whereas the
        // remove callback requires Success - confirm that this difference is intended.
        pending.success = storeResult.Status != OperationStatus.KeyNotFound;

        // The status name is handed back as UTF-8 bytes in Message.
        var statusText = storeResult.Status.ToString();
        pending.Message = Encoding.UTF8.GetBytes(statusText);

        // unblock WaitHandle in the synchronous method
        pending.WaitHandle.Set();

        LogToMonitor("Received and handled OnSynchStoreCompleted.");
        return;
    }

    LogToMonitor("Received OnSynchStoreCompleted, but ResponseWait object is missing. Discarding.");
}
401
/// <summary>
///   Retrieves the value stored in the DHT at the given key and blocks until the
///   retrieve callback has been received.
/// </summary>
/// <param name = "key">Key of DHT Entry</param>
/// <returns>Value of the DHT entry, or null when the key was not found or the retrieval failed</returns>
public byte[] SynchRetrieve(string key)
{
    LogToMonitor("Begin: SynchRetrieve. Key: " + key);

    // The wait handle wraps an OS resource; dispose it once the callback has signalled it.
    using (var autoResetEvent = new AutoResetEvent(false))
    {
        var responseWait = new ResponseWait {WaitHandle = autoResetEvent, key = key};
        Dht.Retrieve(OnSynchRetrieveCompleted, key, responseWait);

        // blocking till response
        autoResetEvent.WaitOne();

        LogToMonitor("End: SynchRetrieve. Key: " + key + ". Success: " + responseWait.success);

        return responseWait.Message;
    }
}
422
/// <summary>
///   Callback of the synchronized retrieval method: records the outcome in the
///   ResponseWait object passed as async state and wakes the waiting caller.
/// </summary>
/// <param name = "retrieveResult">result container delivered by the DHT</param>
private static void OnSynchRetrieveCompleted(RetrieveResult retrieveResult)
{
    var pending = retrieveResult.AsyncState as ResponseWait;
    if (pending == null)
    {
        LogToMonitor("Received OnSynchRetrieveCompleted, but ResponseWait object is missing. Discarding.");
        return;
    }

    LogToMonitor("Received retrieve callback, local ThreadId: " + Thread.CurrentThread.ManagedThreadId);

    var status = retrieveResult.Status;
    var dataAvailable = status == OperationStatus.Success;

    // A missing key is a valid answer (success without data); any other status is a failure.
    pending.success = dataAvailable || status == OperationStatus.KeyNotFound;
    pending.Message = dataAvailable ? retrieveResult.Data : null;

    // unblock WaitHandle in the synchronous method
    pending.WaitHandle.Set();

    LogToMonitor("Received and handled OnSynchRetrieveCompleted.");
}
459
/// <summary>
///   Removes a key/value pair from the DHT and blocks until the remove callback
///   has been received.
/// </summary>
/// <param name = "key">Key of the DHT Entry</param>
/// <returns>The success flag set by the remove callback</returns>
public bool SynchRemove(string key)
{
    LogToMonitor("Begin SynchRemove. Key: " + key);

    // The wait handle wraps an OS resource; dispose it once the callback has signalled it.
    using (var autoResetEvent = new AutoResetEvent(false))
    {
        var responseWait = new ResponseWait {WaitHandle = autoResetEvent, key = key};
        VersionedDht.Remove(OnSynchRemoveCompleted, key, responseWait);

        // blocking till response
        autoResetEvent.WaitOne();

        LogToMonitor("End: SynchRemove. Key: " + key + ". Success: " + responseWait.success);

        return responseWait.success;
    }
}
480
/// <summary>
///   Callback of the synchronized remove method: records the outcome in the ResponseWait
///   object passed as async state and wakes the waiting caller.
/// </summary>
/// <param name = "removeResult">result container delivered by the DHT</param>
private static void OnSynchRemoveCompleted(RemoveResult removeResult)
{
    var pending = removeResult.AsyncState as ResponseWait;
    if (pending != null)
    {
        // Only an explicit Success status counts as a successful removal.
        pending.success = removeResult.Status == OperationStatus.Success;

        // The status name is handed back as UTF-8 bytes in Message.
        var statusText = removeResult.Status.ToString();
        pending.Message = Encoding.UTF8.GetBytes(statusText);

        // unblock WaitHandle in the synchronous method
        pending.WaitHandle.Set();

        LogToMonitor("Received and handled OnSynchRemoveCompleted.");
        return;
    }

    LogToMonitor("Received OnSynchRemoveCompleted, but ResponseWait object is missing. Discarding.");
}
502
503        #endregion
504
505        #region Log facility
506
/// <summary>
///   Asks the DHT to log its internal state (for the monitoring software of P@play).
///   Does nothing when no DHT has been created yet.
/// </summary>
public void LogInternalState()
{
    if (Dht == null)
    {
        return;
    }

    Dht.LogInternalState();
}
517
/// <summary>
///   Writes a debug message to the log, but only when Log2Monitor is enabled in P2PSettings.
/// </summary>
/// <param name = "sTextToLog">text to log</param>
private static void LogToMonitor(string sTextToLog)
{
    if (!P2PSettings.Default.Log2Monitor)
    {
        return;
    }

    Log.Debug(sTextToLog);
}
523
524        #endregion
525    }
526}
Note: See TracBrowser for help on using the repository browser.