source: trunk/CrypP2P/Internal/P2PBase.cs @ 1604

Last change on this file since 1604 was 1604, checked in by Paul Lelgemann, 12 years ago

+ CrypP2P: Added transport protocol configuration
o VersionedDHT: Fixed bug that reported success instead of failure while storing, if the key was existing

File size: 20.9 KB
Line 
1/*
2   Copyright 2010 Paul Lelgemann, University of Duisburg-Essen
3
4   Licensed under the Apache License, Version 2.0 (the "License");
5   you may not use this file except in compliance with the License.
6   You may obtain a copy of the License at
7
8       http://www.apache.org/licenses/LICENSE-2.0
9
10   Unless required by applicable law or agreed to in writing, software
11   distributed under the License is distributed on an "AS IS" BASIS,
12   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   See the License for the specific language governing permissions and
14   limitations under the License.
15*/
16
17using System;
18using System.Text;
19using System.Threading;
20using Cryptool.PluginBase;
21using Cryptool.Plugins.PeerToPeer.Internal;
22using Gears4Net;
23using PeersAtPlay;
24using PeersAtPlay.Monitoring;
25using PeersAtPlay.P2PLink;
26using PeersAtPlay.P2PLink.SnalNG;
27using PeersAtPlay.P2POverlay;
28using PeersAtPlay.P2POverlay.Bootstrapper;
29using PeersAtPlay.P2POverlay.Bootstrapper.IrcBootstrapperV2;
30using PeersAtPlay.P2POverlay.Bootstrapper.LocalMachineBootstrapper;
31using PeersAtPlay.P2POverlay.FullMeshOverlay;
32using PeersAtPlay.P2PStorage.DHT;
33using PeersAtPlay.P2PStorage.FullMeshDHT;
34using PeersAtPlay.Util.Logging;
35
36/* TODO:
37 * - Delete UseNatTraversal-Flag and insert CertificateCheck and CertificateSetup
38 * - Testing asynchronous methods incl. EventHandlers
39 */
40
41namespace Cryptool.P2P.Internal
42{
43    /// <summary>
44    ///   Wrapper class to integrate peer@play environment into CrypTool.
45    ///   This class synchronizes asynchronous methods for easier usage in CT2.
46    /// </summary>
47    public class P2PBase
48    {
49        private readonly P2PManager _p2PManager;
50
51        #region Variables
52
53        private readonly object _connectLock = new object();
54        private readonly AutoResetEvent _systemJoined;
55        private readonly AutoResetEvent _systemLeft;
56        private IBootstrapper _bootstrapper;
57        private IP2PLinkManager _linkmanager;
58        private P2POverlay _overlay;
59        internal IDHT Dht;
60        internal IVersionedDHT VersionedDht;
61
62        /// <summary>
63        ///   True if system was successfully joined, false if system is COMPLETELY left
64        /// </summary>
65        public bool Started { get; private set; }
66
67        /// <summary>
68        ///   True if the underlying peer to peer system has been fully initialized
69        /// </summary>
70        public bool Initialized { get; private set; }
71
72        #endregion
73
74        #region Delegates
75
76        public event P2PMessageReceived OnP2PMessageReceived;
77        public delegate void P2PMessageReceived(PeerId sourceAddr, byte[] data);
78
79        public event SystemJoined OnSystemJoined;
80        public delegate void SystemJoined();
81
82        public event SystemLeft OnSystemLeft;
83        public delegate void SystemLeft();
84
85        #endregion
86
87        public P2PBase(P2PManager p2PManager)
88        {
89            Started = false;
90            Initialized = false;
91
92            _p2PManager = p2PManager;
93            _systemJoined = new AutoResetEvent(false);
94            _systemLeft = new AutoResetEvent(false);
95        }
96
97        #region Basic P2P Methods (Init, Start, Stop)
98
99        /// <summary>
100        ///   Initializes the underlying peer-to-peer system with settings configured in P2PSettings. This step is required in order to be able to establish a connection.
101        /// </summary>
102        public void Initialize()
103        {
104            lock (_connectLock)
105            {
106                Scheduler scheduler = new STAScheduler("pap");
107
108                switch (P2PSettings.Default.LinkManager)
109                {
110                    case P2PLinkManagerType.Snal:
111                        LogToMonitor("Init LinkMgr: Using NAT Traversal stuff");
112
113                        // NAT-Traversal stuff needs a different Snal-Version
114                        _linkmanager = new Snal(scheduler);
115
116                        var settings = new PeersAtPlay.P2PLink.SnalNG.Settings();
117                        settings.LoadDefaults();
118                        settings.ConnectInternal = true;
119                        settings.LocalReceivingPort = P2PSettings.Default.LocalReceivingPort;
120                        settings.UseLocalAddressDetection = P2PSettings.Default.UseLocalAddressDetection;
121                        settings.AutoReconnect = false;
122                        settings.NoDelay = false;
123                        settings.ReuseAddress = false;
124                        settings.UseNetworkMonitorServer = true;
125                        settings.CloseConnectionAfterPingTimeout = false;
126                       
127                        switch(P2PSettings.Default.TransportProtocol)
128                        {
129                            case P2PTransportProtocol.UDP:
130                                settings.TransportProtocol = TransportProtocol.UDP;
131                                break;
132                            case P2PTransportProtocol.TCP_UDP:
133                                settings.TransportProtocol = TransportProtocol.TCP_UDP;
134                                break;
135                            default:
136                                settings.TransportProtocol = TransportProtocol.TCP;
137                                break;
138                        }
139
140                        _linkmanager.Settings = settings;
141                        _linkmanager.ApplicationType = ApplicationType.CrypTool;
142
143                        break;
144                    default:
145                        throw (new NotImplementedException());
146                }
147
148                switch (P2PSettings.Default.Bootstrapper)
149                {
150                    case P2PBootstrapperType.LocalMachineBootstrapper:
151                        // LocalMachineBootstrapper = only local connection (runs only on one machine)
152                        _bootstrapper = new LocalMachineBootstrapper();
153                        break;
154                    case P2PBootstrapperType.IrcBootstrapper:
155                        // setup nat traversal stuff
156                        LogToMonitor("Init Bootstrapper: Using NAT Traversal stuff");
157                        PeersAtPlay.P2POverlay.Bootstrapper.IrcBootstrapper.Settings.DelaySymmetricResponse = true;
158                        PeersAtPlay.P2POverlay.Bootstrapper.IrcBootstrapper.Settings.IncludeSymmetricInResponse = false;
159                        PeersAtPlay.P2POverlay.Bootstrapper.IrcBootstrapper.Settings.SymmetricResponseDelay = 6000;
160
161                        PeersAtPlay.P2POverlay.Bootstrapper.IrcBootstrapperV2.Settings.DelaySymmetricResponse = true;
162                        PeersAtPlay.P2POverlay.Bootstrapper.IrcBootstrapperV2.Settings.IncludeSymmetricResponse = false;
163
164                        _bootstrapper = new IrcBootstrapper(scheduler);
165                        break;
166                    default:
167                        throw (new NotImplementedException());
168                }
169
170                switch (P2PSettings.Default.Overlay)
171                {
172                    case P2POverlayType.FullMeshOverlay:
173                        // changing overlay example: this.overlay = new ChordOverlay();
174                        _overlay = new FullMeshOverlay(scheduler);
175                        break;
176                    default:
177                        throw (new NotImplementedException());
178                }
179
180                switch (P2PSettings.Default.Dht)
181                {
182                    case P2PDHTType.FullMeshDHT:
183                        Dht = new FullMeshDHT(scheduler);
184                        break;
185                    default:
186                        throw (new NotImplementedException());
187                }
188
189                _overlay.MessageReceived += OverlayMessageReceived;
190                Dht.SystemJoined += OnDhtSystemJoined;
191                Dht.SystemLeft += OnDhtSystemLeft;
192
193                VersionedDht = (IVersionedDHT) Dht;
194
195                _p2PManager.GuiLogMessage("Initializing DHT with world name " + P2PSettings.Default.WorldName,
196                                                  NotificationLevel.Info);
197                Dht.Initialize(P2PSettings.Default.PeerName, string.Empty, P2PSettings.Default.WorldName, _overlay,
198                               _bootstrapper,
199                               _linkmanager, null);
200
201                Initialized = true;
202            }
203        }
204
205        /// <summary>
206        ///   Starts the P2P System. When the given P2P world doesn't exist yet,
207        ///   inclusive creating the and bootstrapping to the P2P network.
208        ///   In either case joining the P2P world.
209        ///   This synchronized method returns true not before the peer has
210        ///   successfully joined the network (this may take one or two minutes).
211        /// </summary>
212        /// <exception cref = "InvalidOperationException">When the peer-to-peer system has not been initialized.
213        /// After validating the settings, this can be done by calling Initialize().</exception>
214        /// <returns>True, if the peer has completely joined the p2p network</returns>
215        public bool SynchStart()
216        {
217            lock (_connectLock)
218            {
219                if (!Initialized)
220                {
221                    throw new InvalidOperationException("Peer-to-peer is not initialized.");
222                }
223
224                if (Started)
225                {
226                    return true;
227                }
228
229                Dht.BeginStart(BeginStartEventHandler);
230
231                // Wait for event SystemJoined. When it's invoked, the peer completely joined the P2P system
232                _systemJoined.WaitOne();
233                _p2PManager.GuiLogMessage("System join process ended.", NotificationLevel.Debug);
234
235                return true;
236            }
237        }
238
239        private void BeginStartEventHandler(DHTEventArgs eventArgs)
240        {
241            _p2PManager.GuiLogMessage("Received DHTEventArgs: " + eventArgs + ", state: " + eventArgs.State, NotificationLevel.Debug);
242        }
243
244        /// <summary>
245        ///   Disconnects from the peer-to-peer system.
246        /// </summary>
247        /// <returns>True, if the peer has completely left the p2p network</returns>
248        public bool SynchStop()
249        {
250            lock (_connectLock)
251            {
252                if (Dht == null) return false;
253
254                Dht.BeginStop(null);
255
256                // wait till systemLeft Event is invoked
257                _systemLeft.WaitOne();
258
259                return true;
260            }
261        }
262
263        #endregion
264
265        #region Peer related method (GetPeerId, Send message to peer)
266
267        /// <summary>
268        ///   Get PeerName of the actual peer
269        /// </summary>
270        /// <param name = "sPeerName">out: additional peer information UserName on LinkManager</param>
271        /// <returns>PeerID as a String</returns>
272        public PeerId GetPeerId(out string sPeerName)
273        {
274            sPeerName = _linkmanager.UserName;
275            return new PeerId(_overlay.LocalAddress);
276        }
277
278        /// <summary>
279        ///   Construct PeerId object for a specific byte[] id
280        /// </summary>
281        /// <param name = "byteId">overlay address as byte array</param>
282        /// <returns>corresponding PeerId for given byte[] id</returns>
283        public PeerId GetPeerId(byte[] byteId)
284        {
285            LogToMonitor("GetPeerID: Converting byte[] to PeerId-Object");
286            return new PeerId(_overlay.GetAddress(byteId));
287        }
288
289        // overlay.LocalAddress = Overlay-Peer-Address/Names
290        public void SendToPeer(byte[] data, byte[] destinationPeer)
291        {
292            // get stack size of the pap use-data and add own use data (for optimizing Stack size)
293            var realStackSize = _overlay.GetHeaderSize() + data.Length;
294
295            var stackData = new ByteStack(realStackSize);
296            stackData.Push(data);
297
298            var destinationAddr = _overlay.GetAddress(destinationPeer);
299            var overlayMsg = new OverlayMessage(MessageReceiverType.P2PBase, 
300                                                _overlay.LocalAddress, destinationAddr, stackData);
301
302            _overlay.Send(overlayMsg);
303        }
304
305        private void OverlayMessageReceived(object sender, OverlayMessageEventArgs e)
306        {
307            if (OnP2PMessageReceived == null) return;
308
309            var pid = new PeerId(e.Message.Source);
310            /* You have to fire this event asynchronous, because the main
311                 * thread will be stopped in this wrapper class for synchronizing
312                 * the asynchronous stuff (AutoResetEvent) --> so this could run
313                 * into a deadlock, when you fire this event synchronous (normal Invoke)
314                 * ATTENTION: This could change the invocation order!!! In my case
315                              no problem, but maybe in future cases... */
316
317            // TODO: not safe: The delegate must have only one target
318            // OnP2PMessageReceived.BeginInvoke(pid, e.Message.Data.PopBytes(e.Message.Data.CurrentStackSize), null, null);
319
320            foreach (var del in OnP2PMessageReceived.GetInvocationList())
321            {
322                del.DynamicInvoke(pid, e.Message.Data.PopBytes(e.Message.Data.CurrentStackSize));
323            }
324        }
325
326        #endregion
327
328        #region Event Handling (System Joined, Left and Message Received)
329
330        private void OnDhtSystemJoined(object sender, EventArgs e)
331        {
332            if (OnSystemJoined != null)
333                OnSystemJoined();
334
335            _systemJoined.Set();
336            Started = true;
337        }
338
339        private void OnDhtSystemLeft(object sender, SystemLeftEventArgs e)
340        {
341            if (OnSystemLeft != null)
342                OnSystemLeft();
343
344            Started = false;
345            Initialized = false;
346
347            // Allow new connection to start and check for waiting / blocked tasks
348            _p2PManager.IsP2PConnecting = false;
349            _systemLeft.Set();
350            _systemJoined.Set();
351
352            LogToMonitor("CrypP2P left the system.");
353        }
354
355        #endregion
356
357        #region Synchronous Methods + their Callbacks
358
359        /// <summary>
360        ///   Stores a value in the DHT at the given key
361        /// </summary>
362        /// <param name = "key">Key of DHT Entry</param>
363        /// <param name = "data">Value of DHT Entry</param>
364        /// <returns>True, when storing is completed!</returns>
365        public bool SynchStore(string key, byte[] data)
366        {
367            var autoResetEvent = new AutoResetEvent(false);
368
369            // LogToMonitor("testcrash" + Encoding.UTF8.GetString(new byte[5000]));
370            LogToMonitor("Begin: SynchStore. Key: " + key + ", " + data.Length + " bytes");
371
372            var responseWait = new ResponseWait {WaitHandle = autoResetEvent, key = key, value = data};
373            VersionedDht.Store(OnSynchStoreCompleted, key, data, responseWait);
374
375            // blocking till response
376            autoResetEvent.WaitOne();
377
378            LogToMonitor("End: SynchStore. Key: " + key + ". Success: " + responseWait.success);
379
380            return responseWait.success;
381        }
382
383        /// <summary>
384        ///   Stores a value in the DHT at the given key
385        /// </summary>
386        /// <param name = "key">Key of DHT Entry</param>
387        /// <param name = "value">Value of DHT Entry</param>
388        /// <returns>True, when storing is completed!</returns>
389        public bool SynchStore(string key, string value)
390        {
391            return SynchStore(key, Encoding.UTF8.GetBytes(value));
392        }
393
394        /// <summary>
395        ///   Callback for a the synchronized store method
396        /// </summary>
397        /// <param name = "storeResult">retrieved data container</param>
398        private static void OnSynchStoreCompleted(StoreResult storeResult)
399        {
400            var responseWait = storeResult.AsyncState as ResponseWait;
401            if (responseWait == null)
402            {
403                LogToMonitor("Received OnSynchStoreCompleted, but ResponseWait object is missing. Discarding.");
404                return;
405            }
406
407            responseWait.success = storeResult.Status == OperationStatus.Success;
408            responseWait.Message = Encoding.UTF8.GetBytes(storeResult.Status.ToString());
409
410            // unblock WaitHandle in the synchronous method
411            responseWait.WaitHandle.Set();
412
413            LogToMonitor("Received and handled OnSynchStoreCompleted.");
414        }
415
416        /// <summary>
417        ///   Get the value of the given DHT Key or null, if it doesn't exist.
418        /// </summary>
419        /// <param name = "key">Key of DHT Entry</param>
420        /// <returns>Value of DHT Entry</returns>
421        public byte[] SynchRetrieve(string key)
422        {
423            LogToMonitor("Begin: SynchRetrieve. Key: " + key);
424
425            var autoResetEvent = new AutoResetEvent(false);
426            var responseWait = new ResponseWait {WaitHandle = autoResetEvent, key = key};
427            Dht.Retrieve(OnSynchRetrieveCompleted, key, responseWait);
428
429            // blocking till response
430            autoResetEvent.WaitOne();
431
432            LogToMonitor("End: SynchRetrieve. Key: " + key + ". Success: " + responseWait.success);
433
434            return responseWait.Message;
435        }
436
437        /// <summary>
438        ///   Callback for a the synchronized retrieval method
439        /// </summary>
440        /// <param name = "retrieveResult"></param>
441        private static void OnSynchRetrieveCompleted(RetrieveResult retrieveResult)
442        {
443            var responseWait = retrieveResult.AsyncState as ResponseWait;
444            if (responseWait == null)
445            {
446                LogToMonitor("Received OnSynchRetrieveCompleted, but ResponseWait object is missing. Discarding.");
447                return;
448            }
449
450            LogToMonitor("Received retrieve callback, local ThreadId: " + Thread.CurrentThread.ManagedThreadId);
451
452            switch (retrieveResult.Status)
453            {
454                case OperationStatus.Success:
455                    responseWait.success = true;
456                    responseWait.Message = retrieveResult.Data;
457                    break;
458                case OperationStatus.KeyNotFound:
459                    responseWait.success = true;
460                    responseWait.Message = null;
461                    break;
462                default:
463                    responseWait.success = false;
464                    responseWait.Message = null;
465                    break;
466            }
467
468            // unblock WaitHandle in the synchronous method
469            responseWait.WaitHandle.Set();
470
471            LogToMonitor("Received and handled OnSynchRetrieveCompleted.");
472        }
473
474        /// <summary>
475        ///   Removes a key/value pair out of the DHT
476        /// </summary>
477        /// <param name = "key">Key of the DHT Entry</param>
478        /// <returns>True, when removing is completed!</returns>
479        public bool SynchRemove(string key)
480        {
481            LogToMonitor("Begin SynchRemove. Key: " + key);
482
483            var autoResetEvent = new AutoResetEvent(false);
484            var responseWait = new ResponseWait {WaitHandle = autoResetEvent, key = key};
485            VersionedDht.Remove(OnSynchRemoveCompleted, key, responseWait);
486
487            // blocking till response
488            autoResetEvent.WaitOne();
489
490            LogToMonitor("End: SynchRemove. Key: " + key + ". Success: " + responseWait.success);
491
492            return responseWait.success;
493        }
494
495        /// <summary>
496        ///   Callback for a the synchronized remove method
497        /// </summary>
498        /// <param name = "removeResult"></param>
499        private static void OnSynchRemoveCompleted(RemoveResult removeResult)
500        {
501            var responseWait = removeResult.AsyncState as ResponseWait;
502            if (responseWait == null)
503            {
504                LogToMonitor("Received OnSynchRemoveCompleted, but ResponseWait object is missing. Discarding.");
505                return;
506            }
507
508            responseWait.success = removeResult.Status == OperationStatus.Success;
509            responseWait.Message = Encoding.UTF8.GetBytes(removeResult.Status.ToString());
510
511            // unblock WaitHandle in the synchronous method
512            responseWait.WaitHandle.Set();
513
514            LogToMonitor("Received and handled OnSynchRemoveCompleted.");
515        }
516
517        #endregion
518
519        #region Log facility
520
521        /// <summary>
522        ///   To log the internal state in the Monitoring Software of P@play
523        /// </summary>
524        public void LogInternalState()
525        {
526            if (Dht != null)
527            {
528                Dht.LogInternalState();
529            }
530        }
531
532        private static void LogToMonitor(string sTextToLog)
533        {
534            if (P2PSettings.Default.Log2Monitor)
535                Log.Debug(sTextToLog);
536        }
537
538        #endregion
539    }
540}
Note: See TracBrowser for help on using the repository browser.