source: trunk/CrypP2P/Internal/P2PBase.cs @ 2154

Last change on this file since 2154 was 2154, checked in by Arno Wacker, 11 years ago
  • Added WebDHT to P2PBase and P2PEditor
File size: 20.8 KB
Line 
1/*
2   Copyright 2010 Paul Lelgemann and Christian Arnold,
3                  University of Duisburg-Essen
4
5   Licensed under the Apache License, Version 2.0 (the "License");
6   you may not use this file except in compliance with the License.
7   You may obtain a copy of the License at
8
9       http://www.apache.org/licenses/LICENSE-2.0
10
11   Unless required by applicable law or agreed to in writing, software
12   distributed under the License is distributed on an "AS IS" BASIS,
13   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   See the License for the specific language governing permissions and
15   limitations under the License.
16*/
17
18using System;
19using System.Linq;
20using System.Text;
21using System.Threading;
22using Cryptool.PluginBase;
23using Cryptool.Plugins.PeerToPeer.Internal;
24using Gears4Net;
25using PeersAtPlay;
26using PeersAtPlay.Monitoring;
27using PeersAtPlay.P2PLink;
28using PeersAtPlay.P2PLink.SnalNG;
29using PeersAtPlay.P2POverlay;
30using PeersAtPlay.P2POverlay.Bootstrapper;
31using PeersAtPlay.P2POverlay.Bootstrapper.IrcBootstrapperV2;
32using PeersAtPlay.P2POverlay.Bootstrapper.LocalMachineBootstrapper;
33using PeersAtPlay.P2POverlay.FullMeshOverlay;
34using PeersAtPlay.P2PStorage.DHT;
35using PeersAtPlay.P2PStorage.FullMeshDHT;
36using PeersAtPlay.PapsClient;
37using PeersAtPlay.Util.Logging;
38using PeersAtPlay.P2POverlay.Chord;
39using PeersAtPlay.P2PStorage.MySqlDHT;
40using PeersAtPlay.P2PStorage.WebDHT;
41
42/* TODO:
43 * - Delete UseNatTraversal-Flag and insert CertificateCheck and CertificateSetup
44 * - Testing asynchronous methods incl. EventHandlers
45 */
46
47namespace Cryptool.P2P.Internal
48{
49    /// <summary>
50    ///   Wrapper class to integrate peer@play environment into CrypTool.
51    ///   This class synchronizes asynchronous methods for easier usage in CT2.
52    /// </summary>
53    public class P2PBase
54    {
55        #region Variables
56
57        private AutoResetEvent systemJoined;
58        private readonly AutoResetEvent systemLeft;
59        private IBootstrapper bootstrapper;
60        private IP2PLinkManager linkmanager;
61        private P2POverlay overlay;
62        internal IDHT Dht;
63        internal IVersionedDHT VersionedDht;
64
65        /// <summary>
66        ///   True if system was successfully joined, false if system is COMPLETELY left
67        /// </summary>
68        public bool IsConnected { get; private set; }
69
70        /// <summary>
71        ///   True if the underlying peer to peer system has been fully initialized
72        /// </summary>
73        public bool IsInitialized { get; private set; }
74
75        #endregion
76
77        #region Delegates
78
79        public event P2PMessageReceived OnP2PMessageReceived;
80        public delegate void P2PMessageReceived(PeerId sourceAddr, byte[] data);
81
82        public event SystemJoined OnSystemJoined;
83        public delegate void SystemJoined();
84
85        public event SystemLeft OnSystemLeft;
86        public delegate void SystemLeft();
87
88        #endregion
89
90        public P2PBase()
91        {
92            IsConnected = false;
93            IsInitialized = false;
94
95            systemJoined = new AutoResetEvent(false);
96            systemLeft = new AutoResetEvent(false);
97        }
98
99        #region Basic P2P Methods (Init, Start, Stop)
100
101        /// <summary>
102        ///   Initializes the underlying peer-to-peer system with settings configured in P2PSettings. This step is required in order to be able to establish a connection.
103        /// </summary>
104        public void Initialize()
105        {
106            Scheduler scheduler = new STAScheduler("pap_snal");
107            Scheduler scheduler_2 = new STAScheduler("pap_mysql");
108
109            switch (P2PSettings.Default.LinkManager)
110            {
111                case P2PLinkManagerType.Snal:
112                    LogToMonitor("Init LinkMgr: Using NAT Traversal stuff");
113
114                    // NAT-Traversal stuff needs a different Snal-Version
115                    linkmanager = new Snal(scheduler);
116
117                    var settings = new PeersAtPlay.P2PLink.SnalNG.Settings();
118                    settings.LoadDefaults();
119                    settings.ConnectInternal = true;
120                    settings.LocalReceivingPort = P2PSettings.Default.LocalReceivingPort;
121                    settings.UseLocalAddressDetection = P2PSettings.Default.UseLocalAddressDetection;
122                    settings.NoDelay = false;
123                    settings.ReuseAddress = false;
124                    settings.UseNetworkMonitorServer = true;
125                    settings.CloseConnectionAfterPingTimeout = true;
126
127                    settings.FragmentMessages = true;
128                    settings.FragmentMessageSize = 10*1024;
129                       
130                    switch(P2PSettings.Default.TransportProtocol)
131                    {
132                        case P2PTransportProtocol.UDP:
133                            settings.TransportProtocol = TransportProtocol.UDP;
134                            break;
135                        case P2PTransportProtocol.TCP_UDP:
136                            settings.TransportProtocol = TransportProtocol.TCP_UDP;
137                            break;
138                        default:
139                            settings.TransportProtocol = TransportProtocol.TCP;
140                            break;
141                    }
142
143                    linkmanager.Settings = settings;
144                    linkmanager.ApplicationType = ApplicationType.CrypTool;
145
146                    break;
147                default:
148                    throw new NotImplementedException();
149            }
150
151            switch (P2PSettings.Default.Bootstrapper)
152            {
153                case P2PBootstrapperType.LocalMachineBootstrapper:
154                    // LocalMachineBootstrapper = only local connection (runs only on one machine)
155                    bootstrapper = new LocalMachineBootstrapper();
156                    break;
157                case P2PBootstrapperType.IrcBootstrapper:
158                    PeersAtPlay.P2POverlay.Bootstrapper.IrcBootstrapperV2.Settings.DelaySymmetricResponse = true;
159                    PeersAtPlay.P2POverlay.Bootstrapper.IrcBootstrapperV2.Settings.IncludeSymmetricResponse = false;
160                    PeersAtPlay.P2POverlay.Bootstrapper.IrcBootstrapperV2.Settings.UsePeerCache = false;
161
162                    bootstrapper = new IrcBootstrapper(scheduler);
163                    break;
164                default:
165                    throw new NotImplementedException();
166            }
167
168            try
169            {
170                switch (P2PSettings.Default.Architecture)
171                {
172                    case P2PArchitecture.FullMesh:
173                        overlay = new FullMeshOverlay(scheduler);
174                        Dht = new FullMeshDHT(scheduler);
175                        break;
176                    case P2PArchitecture.Chord:
177                        overlay = new ChordNGCore(scheduler);
178                        Dht = (IDHT) overlay;
179                        break;
180                    case P2PArchitecture.Server:
181                        PeersAtPlay.PapsClient.Properties.Settings.Default.ServerHost = P2PSettings.Default.ServerHost;
182                        PeersAtPlay.PapsClient.Properties.Settings.Default.ServerPort = P2PSettings.Default.ServerPort;
183                        bootstrapper = new LocalMachineBootstrapper();
184                        overlay = new PapsClientOverlay();
185                        Dht = new PapsClientDht(scheduler_2);
186                        break;
187                    case P2PArchitecture.SQLDB:
188                        Dht = new MySqlDHT(scheduler_2);
189                        break;
190                    case P2PArchitecture.WebDHT:
191                        Dht = new WebDHT(scheduler_2);
192                        break;
193                    default:
194                        throw new NotImplementedException();
195                }
196            }
197            catch(Exception e)
198            {
199                P2PManager.GuiLogMessage("Error initializing P2P network: " + e.Message, NotificationLevel.Error);
200                return;
201            }
202
203            if (overlay != null)
204            {
205                overlay.MessageReceived += OverlayMessageReceived;
206            }
207            Dht.SystemJoined += OnDhtSystemJoined;
208            Dht.SystemLeft += OnDhtSystemLeft;
209
210            VersionedDht = (IVersionedDHT) Dht;
211
212            P2PManager.GuiLogMessage("Initializing DHT with world name " + P2PSettings.Default.WorldName,
213                                        NotificationLevel.Info);
214            IsInitialized = true;
215            Dht.Initialize(P2PSettings.Default.PeerName, string.Empty, P2PSettings.Default.WorldName, overlay,
216                           bootstrapper, linkmanager, null);
217        }
218
219        /// <summary>
220        ///   Starts the P2P System. When the given P2P world doesn't exist yet,
221        ///   inclusive creating the and bootstrapping to the P2P network.
222        ///   In either case joining the P2P world.
223        ///   This synchronized method returns true not before the peer has
224        ///   successfully joined the network (this may take one or two minutes).
225        /// </summary>
226        /// <exception cref = "InvalidOperationException">When the peer-to-peer system has not been initialized.
227        /// After validating the settings, this can be done by calling Initialize().</exception>
228        /// <returns>True, if the peer has completely joined the p2p network</returns>
229        public bool SynchStart()
230        {
231            if (!IsInitialized)
232            {
233                throw new InvalidOperationException("Peer-to-peer is not initialized.");
234            }
235
236            if (IsConnected)
237            {
238                return true;
239            }
240
241            try
242            {
243                Dht.BeginStart(BeginStartEventHandler);
244
245                // Wait for event SystemJoined. When it's invoked, the peer completely joined the P2P system
246                systemJoined.WaitOne();
247                P2PManager.GuiLogMessage("System join process ended.", NotificationLevel.Debug);
248            }
249            catch (Exception e)
250            {
251                e.GetBaseException();
252            }
253
254            return true;
255        }
256
257        private void BeginStartEventHandler(DHTEventArgs eventArgs)
258        {
259            P2PManager.GuiLogMessage("Received DHTEventArgs: " + eventArgs + ", state: " + eventArgs.State, NotificationLevel.Debug);
260        }
261
262        /// <summary>
263        ///   Disconnects from the peer-to-peer system.
264        /// </summary>
265        /// <returns>True, if the peer has completely left the p2p network</returns>
266        public bool SynchStop()
267        {
268            if (Dht == null) return false;
269
270            Dht.BeginStop(null);
271
272            if (IsConnected)
273                systemLeft.WaitOne();
274
275            systemJoined.Reset();
276            systemLeft.Reset();
277
278            return true;
279        }
280
281        #endregion
282
283        #region Peer related method (GetPeerId, Send message to peer)
284
285        /// <summary>
286        ///   Get PeerName of the actual peer
287        /// </summary>
288        /// <param name = "sPeerName">out: additional peer information UserName on LinkManager</param>
289        /// <returns>PeerID as a String</returns>
290        public PeerId GetPeerId(out string sPeerName)
291        {
292            sPeerName = linkmanager.UserName;
293            return new PeerId(overlay.LocalAddress);
294        }
295
296        /// <summary>
297        ///   Construct PeerId object for a specific byte[] id
298        /// </summary>
299        /// <param name = "byteId">overlay address as byte array</param>
300        /// <returns>corresponding PeerId for given byte[] id</returns>
301        public PeerId GetPeerId(byte[] byteId)
302        {
303            LogToMonitor("GetPeerID: Converting byte[] to PeerId-Object");
304            return new PeerId(overlay.GetAddress(byteId));
305        }
306
307        // overlay.LocalAddress = Overlay-Peer-Address/Names
308        public void SendToPeer(byte[] data, byte[] destinationPeer)
309        {
310            // get stack size of the pap use-data and add own use data (for optimizing Stack size)
311            var realStackSize = overlay.GetHeaderSize() + data.Length;
312
313            var stackData = new ByteStack(realStackSize);
314            stackData.Push(data);
315
316            var destinationAddr = overlay.GetAddress(destinationPeer);
317            var overlayMsg = new OverlayMessage(MessageReceiverType.P2PBase, 
318                                                overlay.LocalAddress, destinationAddr, stackData);
319
320            overlay.Send(overlayMsg);
321        }
322
323        private void OverlayMessageReceived(object sender, OverlayMessageEventArgs e)
324        {
325            if (OnP2PMessageReceived == null) return;
326
327            var pid = new PeerId(e.Message.Source);
328            /* You have to fire this event asynchronous, because the main
329                 * thread will be stopped in this wrapper class for synchronizing
330                 * the asynchronous stuff (AutoResetEvent) --> so this could run
331                 * into a deadlock, when you fire this event synchronous (normal Invoke)
332                 * ATTENTION: This could change the invocation order!!! In my case
333                              no problem, but maybe in future cases... */
334
335            // TODO: not safe: The delegate must have only one target
336            // OnP2PMessageReceived.BeginInvoke(pid, e.Message.Data.PopBytes(e.Message.Data.CurrentStackSize), null, null);
337
338            foreach (var del in OnP2PMessageReceived.GetInvocationList())
339            {
340                var data = e.Message.Data.ToArray();
341                if (e.Message.Data.CurrentStackSize != 0)
342                    data = e.Message.Data.PopBytes(e.Message.Data.CurrentStackSize);
343
344                del.DynamicInvoke(pid, data);
345            }
346        }
347
348        #endregion
349
350        #region Event Handling (System Joined, Left and Message Received)
351
352        private void OnDhtSystemJoined(object sender, EventArgs e)
353        {
354            IsConnected = true;
355
356            if (OnSystemJoined != null)
357                OnSystemJoined();
358
359            systemJoined.Set();
360        }
361
362        private void OnDhtSystemLeft(object sender, SystemLeftEventArgs e)
363        {
364            IsConnected = false;
365            IsInitialized = false;
366
367            // Allow new connection to start and check for waiting / blocked tasks
368            // TODO reset running ConnectionWorkers?
369            systemLeft.Set();
370            systemJoined.Set();
371
372            LogToMonitor("CrypP2P left the system.");
373
374            if (OnSystemLeft != null)
375                OnSystemLeft();
376        }
377
378        #endregion
379
380        #region Synchronous Methods + their Callbacks
381
382        /// <summary>
383        ///   Stores a value in the DHT at the given key
384        /// </summary>
385        /// <param name = "key">Key of DHT Entry</param>
386        /// <param name = "value">Value of DHT Entry</param>
387        /// <returns>True, when storing is completed!</returns>
388        public RequestResult SynchStore(string key, string value)
389        {
390            return SynchStore(key, Encoding.UTF8.GetBytes(value));
391        }
392
393        /// <summary>
394        ///   Stores a value in the DHT at the given key
395        /// </summary>
396        /// <param name = "key">Key of DHT Entry</param>
397        /// <param name = "data">Value of DHT Entry</param>
398        /// <returns>True, when storing is completed!</returns>
399        public RequestResult SynchStore(string key, byte[] data)
400        {
401            var autoResetEvent = new AutoResetEvent(false);
402
403            // LogToMonitor("testcrash" + Encoding.UTF8.GetString(new byte[5000]));
404            LogToMonitor("Begin: SynchStore. Key: " + key + ", " + (data != null ? data.Length : 0) + " bytes");
405
406            var requestResult = new RequestResult {WaitHandle = autoResetEvent, Key = key, Data = data};
407            VersionedDht.Store(OnSynchStoreCompleted, key, data, requestResult);
408
409            // blocking till response
410            autoResetEvent.WaitOne();
411
412            LogToMonitor("End: SynchStore. Key: " + key + ". Status: " + requestResult.Status);
413
414            return requestResult;
415        }
416
417        /// <summary>
418        ///   Callback for a the synchronized store method
419        /// </summary>
420        /// <param name = "storeResult">retrieved data container</param>
421        private static void OnSynchStoreCompleted(StoreResult storeResult)
422        {
423            var requestResult = storeResult.AsyncState as RequestResult;
424            if (requestResult == null)
425            {
426                LogToMonitor("Received OnSynchStoreCompleted, but RequestResult object is missing. Discarding.");
427                return;
428            }
429
430            requestResult.Parse(storeResult);
431
432            // unblock WaitHandle in the synchronous method
433            requestResult.WaitHandle.Set();
434        }
435
436        /// <summary>
437        ///   Get the value of the given DHT Key or null, if it doesn't exist.
438        /// </summary>
439        /// <param name = "key">Key of DHT Entry</param>
440        /// <returns>Value of DHT Entry</returns>
441        public RequestResult SynchRetrieve(string key)
442        {
443            LogToMonitor("Begin: SynchRetrieve. Key: " + key);
444
445            var autoResetEvent = new AutoResetEvent(false);
446            var requestResult = new RequestResult {WaitHandle = autoResetEvent, Key = key};
447
448            Dht.Retrieve(OnSynchRetrieveCompleted, key, requestResult);
449
450            // blocking till response
451            autoResetEvent.WaitOne();
452
453            LogToMonitor("End: SynchRetrieve. Key: " + key + ". Status: " + requestResult.Status);
454
455            return requestResult;
456        }
457
458        /// <summary>
459        ///   Callback for a the synchronized retrieval method
460        /// </summary>
461        /// <param name = "retrieveResult"></param>
462        private static void OnSynchRetrieveCompleted(RetrieveResult retrieveResult)
463        {
464            var requestResult = retrieveResult.AsyncState as RequestResult;
465            if (requestResult == null)
466            {
467                LogToMonitor("Received OnSynchRetrieveCompleted, but RequestResult object is missing. Discarding.");
468                return;
469            }
470
471            requestResult.Parse(retrieveResult);
472
473            // unblock WaitHandle in the synchronous method
474            requestResult.WaitHandle.Set();
475        }
476
477        /// <summary>
478        ///   Removes a key/value pair out of the DHT
479        /// </summary>
480        /// <param name = "key">Key of the DHT Entry</param>
481        /// <returns>True, when removing is completed!</returns>
482        public RequestResult SynchRemove(string key)
483        {
484            LogToMonitor("Begin SynchRemove. Key: " + key);
485
486            var autoResetEvent = new AutoResetEvent(false);
487            var requestResult = new RequestResult { WaitHandle = autoResetEvent, Key = key };
488            VersionedDht.Remove(OnSynchRemoveCompleted, key, requestResult);
489
490            // blocking till response
491            autoResetEvent.WaitOne();
492
493            LogToMonitor("End: SynchRemove. Key: " + key + ". Status: " + requestResult.Status);
494
495            return requestResult;
496        }
497
498        /// <summary>
499        ///   Callback for a the synchronized remove method
500        /// </summary>
501        /// <param name = "removeResult"></param>
502        private static void OnSynchRemoveCompleted(RemoveResult removeResult)
503        {
504            var requestResult = removeResult.AsyncState as RequestResult;
505            if (requestResult == null)
506            {
507                LogToMonitor("Received OnSynchRemoveCompleted, but RequestResult object is missing. Discarding.");
508                return;
509            }
510
511            requestResult.Parse(removeResult);
512
513            // unblock WaitHandle in the synchronous method
514            requestResult.WaitHandle.Set();
515        }
516
517        #endregion
518
519        #region Statistic Methods
520
521        public long TotalBytesSentOnAllLinks()
522        {
523            return (long) linkmanager.GetAllLinkInformation().Sum(linkInformation => linkInformation.TotalBytesSent);
524        }
525
526        public long TotalBytesReceivedOnAllLinks()
527        {
528            return (long) linkmanager.GetAllLinkInformation().Sum(linkInformation => linkInformation.TotalBytesReceived);
529        }
530
531        #endregion
532
533        #region Log facility
534
535        /// <summary>
536        ///   To log the internal state in the Monitoring Software of P@play
537        /// </summary>
538        public void LogInternalState()
539        {
540            if (Dht != null)
541            {
542                Dht.LogInternalState();
543            }
544        }
545
546        private static void LogToMonitor(string sTextToLog)
547        {
548            if (P2PSettings.Default.Log2Monitor)
549                Log.Debug(sTextToLog);
550        }
551
552        #endregion
553    }
554}
Note: See TracBrowser for help on using the repository browser.