source: trunk/CrypP2P/Internal/P2PBase.cs @ 1909

Last change on this file since 1909 was 1909, checked in by Paul Lelgemann, 11 years ago

o Updated peers@play libraries (including early PapsClient version)

override-bad-extension: PapsClient.dll
override-bad-extension: PapsClient.pdb

File size: 20.3 KB
Line 
1/*
2   Copyright 2010 Paul Lelgemann and Christian Arnold,
3                  University of Duisburg-Essen
4
5   Licensed under the Apache License, Version 2.0 (the "License");
6   you may not use this file except in compliance with the License.
7   You may obtain a copy of the License at
8
9       http://www.apache.org/licenses/LICENSE-2.0
10
11   Unless required by applicable law or agreed to in writing, software
12   distributed under the License is distributed on an "AS IS" BASIS,
13   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   See the License for the specific language governing permissions and
15   limitations under the License.
16*/
17
18using System;
19using System.Linq;
20using System.Text;
21using System.Threading;
22using Cryptool.PluginBase;
23using Cryptool.Plugins.PeerToPeer.Internal;
24using Gears4Net;
25using PeersAtPlay;
26using PeersAtPlay.Monitoring;
27using PeersAtPlay.P2PLink;
28using PeersAtPlay.P2PLink.SnalNG;
29using PeersAtPlay.P2POverlay;
30using PeersAtPlay.P2POverlay.Bootstrapper;
31using PeersAtPlay.P2POverlay.Bootstrapper.IrcBootstrapperV2;
32using PeersAtPlay.P2POverlay.Bootstrapper.LocalMachineBootstrapper;
33using PeersAtPlay.P2POverlay.FullMeshOverlay;
34using PeersAtPlay.P2PStorage.DHT;
35using PeersAtPlay.P2PStorage.FullMeshDHT;
36using PeersAtPlay.PapsClient;
37using PeersAtPlay.Util.Logging;
38using PeersAtPlay.P2POverlay.Chord;
39
40/* TODO:
41 * - Delete UseNatTraversal-Flag and insert CertificateCheck and CertificateSetup
42 * - Testing asynchronous methods incl. EventHandlers
43 */
44
45namespace Cryptool.P2P.Internal
46{
47    /// <summary>
48    ///   Wrapper class to integrate peer@play environment into CrypTool.
49    ///   This class synchronizes asynchronous methods for easier usage in CT2.
50    /// </summary>
51    public class P2PBase
52    {
53        #region Variables
54
55        private readonly AutoResetEvent systemJoined;
56        private readonly AutoResetEvent systemLeft;
57        private IBootstrapper bootstrapper;
58        private IP2PLinkManager linkmanager;
59        private P2POverlay overlay;
60        internal IDHT Dht;
61        internal IVersionedDHT VersionedDht;
62
63        /// <summary>
64        ///   True if system was successfully joined, false if system is COMPLETELY left
65        /// </summary>
66        public bool IsConnected { get; private set; }
67
68        /// <summary>
69        ///   True if the underlying peer to peer system has been fully initialized
70        /// </summary>
71        public bool IsInitialized { get; private set; }
72
73        #endregion
74
75        #region Delegates
76
77        public event P2PMessageReceived OnP2PMessageReceived;
78        public delegate void P2PMessageReceived(PeerId sourceAddr, byte[] data);
79
80        public event SystemJoined OnSystemJoined;
81        public delegate void SystemJoined();
82
83        public event SystemLeft OnSystemLeft;
84        public delegate void SystemLeft();
85
86        #endregion
87
88        public P2PBase()
89        {
90            IsConnected = false;
91            IsInitialized = false;
92
93            systemJoined = new AutoResetEvent(false);
94            systemLeft = new AutoResetEvent(false);
95        }
96
97        #region Basic P2P Methods (Init, Start, Stop)
98
99        /// <summary>
100        ///   Initializes the underlying peer-to-peer system with settings configured in P2PSettings. This step is required in order to be able to establish a connection.
101        /// </summary>
102        public void Initialize()
103        {
104            Scheduler scheduler = new STAScheduler("pap");
105
106            switch (P2PSettings.Default.LinkManager)
107            {
108                case P2PLinkManagerType.Snal:
109                    LogToMonitor("Init LinkMgr: Using NAT Traversal stuff");
110
111                    // NAT-Traversal stuff needs a different Snal-Version
112                    linkmanager = new Snal(scheduler);
113
114                    var settings = new PeersAtPlay.P2PLink.SnalNG.Settings();
115                    settings.LoadDefaults();
116                    settings.ConnectInternal = true;
117                    settings.LocalReceivingPort = P2PSettings.Default.LocalReceivingPort;
118                    settings.UseLocalAddressDetection = P2PSettings.Default.UseLocalAddressDetection;
119                    settings.NoDelay = false;
120                    settings.ReuseAddress = false;
121                    settings.UseNetworkMonitorServer = true;
122                    settings.CloseConnectionAfterPingTimeout = true;
123
124                    settings.FragmentMessages = true;
125                    settings.FragmentMessageSize = 10*1024;
126                       
127                    switch(P2PSettings.Default.TransportProtocol)
128                    {
129                        case P2PTransportProtocol.UDP:
130                            settings.TransportProtocol = TransportProtocol.UDP;
131                            break;
132                        case P2PTransportProtocol.TCP_UDP:
133                            settings.TransportProtocol = TransportProtocol.TCP_UDP;
134                            break;
135                        default:
136                            settings.TransportProtocol = TransportProtocol.TCP;
137                            break;
138                    }
139
140                    linkmanager.Settings = settings;
141                    linkmanager.ApplicationType = ApplicationType.CrypTool;
142
143                    break;
144                default:
145                    throw new NotImplementedException();
146            }
147
148            switch (P2PSettings.Default.Bootstrapper)
149            {
150                case P2PBootstrapperType.LocalMachineBootstrapper:
151                    // LocalMachineBootstrapper = only local connection (runs only on one machine)
152                    bootstrapper = new LocalMachineBootstrapper();
153                    break;
154                case P2PBootstrapperType.IrcBootstrapper:
155                    PeersAtPlay.P2POverlay.Bootstrapper.IrcBootstrapperV2.Settings.DelaySymmetricResponse = true;
156                    PeersAtPlay.P2POverlay.Bootstrapper.IrcBootstrapperV2.Settings.IncludeSymmetricResponse = false;
157                    PeersAtPlay.P2POverlay.Bootstrapper.IrcBootstrapperV2.Settings.UsePeerCache = false;
158
159                    bootstrapper = new IrcBootstrapper(scheduler);
160                    break;
161                default:
162                    throw new NotImplementedException();
163            }
164
165            try
166            {
167                switch (P2PSettings.Default.Architecture)
168                {
169                    case P2PArchitecture.FullMesh:
170                        overlay = new FullMeshOverlay(scheduler);
171                        Dht = new FullMeshDHT(scheduler);
172                        break;
173                    case P2PArchitecture.Chord:
174                        overlay = new ChordNGCore(scheduler);
175                        Dht = (IDHT) overlay;
176                        break;
177                    case P2PArchitecture.Server:
178                        PeersAtPlay.PapsClient.Properties.Settings.Default.ServerIP = P2PSettings.Default.ServerIP;
179                        PeersAtPlay.PapsClient.Properties.Settings.Default.ServerPort = P2PSettings.Default.ServerPort;
180                        bootstrapper = new LocalMachineBootstrapper();
181                        overlay = new PapsClientOverlay();
182                        Dht = new PapsClientDht(scheduler);
183                        break;
184                    default:
185                        throw new NotImplementedException();
186                }
187            }
188            catch(Exception e)
189            {
190                P2PManager.GuiLogMessage("Error initializing P2P network: " + e.Message, NotificationLevel.Error);
191                return;
192            }
193
194            overlay.MessageReceived += OverlayMessageReceived;
195            Dht.SystemJoined += OnDhtSystemJoined;
196            Dht.SystemLeft += OnDhtSystemLeft;
197
198            VersionedDht = (IVersionedDHT) Dht;
199
200            P2PManager.GuiLogMessage("Initializing DHT with world name " + P2PSettings.Default.WorldName,
201                                        NotificationLevel.Info);
202            IsInitialized = true;
203            Dht.Initialize(P2PSettings.Default.PeerName, string.Empty, P2PSettings.Default.WorldName, overlay,
204                            bootstrapper,
205                            linkmanager, null);
206        }
207
208        /// <summary>
209        ///   Starts the P2P System. When the given P2P world doesn't exist yet,
210        ///   inclusive creating the and bootstrapping to the P2P network.
211        ///   In either case joining the P2P world.
212        ///   This synchronized method returns true not before the peer has
213        ///   successfully joined the network (this may take one or two minutes).
214        /// </summary>
215        /// <exception cref = "InvalidOperationException">When the peer-to-peer system has not been initialized.
216        /// After validating the settings, this can be done by calling Initialize().</exception>
217        /// <returns>True, if the peer has completely joined the p2p network</returns>
218        public bool SynchStart()
219        {
220            if (!IsInitialized)
221            {
222                throw new InvalidOperationException("Peer-to-peer is not initialized.");
223            }
224
225            if (IsConnected)
226            {
227                return true;
228            }
229
230            try
231            {
232                Dht.BeginStart(BeginStartEventHandler);
233
234                // Wait for event SystemJoined. When it's invoked, the peer completely joined the P2P system
235                systemJoined.WaitOne();
236                P2PManager.GuiLogMessage("System join process ended.", NotificationLevel.Debug);
237            }
238            catch (Exception e)
239            {
240                e.GetBaseException();
241            }
242
243            return true;
244        }
245
246        private void BeginStartEventHandler(DHTEventArgs eventArgs)
247        {
248            P2PManager.GuiLogMessage("Received DHTEventArgs: " + eventArgs + ", state: " + eventArgs.State, NotificationLevel.Debug);
249        }
250
251        /// <summary>
252        ///   Disconnects from the peer-to-peer system.
253        /// </summary>
254        /// <returns>True, if the peer has completely left the p2p network</returns>
255        public bool SynchStop()
256        {
257            if (Dht == null) return false;
258
259            Dht.BeginStop(null);
260
261            if (!IsConnected)
262            {
263                return true;
264            }
265
266            // wait till systemLeft Event is invoked
267            systemLeft.WaitOne();
268
269            return true;
270        }
271
272        #endregion
273
274        #region Peer related method (GetPeerId, Send message to peer)
275
276        /// <summary>
277        ///   Get PeerName of the actual peer
278        /// </summary>
279        /// <param name = "sPeerName">out: additional peer information UserName on LinkManager</param>
280        /// <returns>PeerID as a String</returns>
281        public PeerId GetPeerId(out string sPeerName)
282        {
283            sPeerName = linkmanager.UserName;
284            return new PeerId(overlay.LocalAddress);
285        }
286
287        /// <summary>
288        ///   Construct PeerId object for a specific byte[] id
289        /// </summary>
290        /// <param name = "byteId">overlay address as byte array</param>
291        /// <returns>corresponding PeerId for given byte[] id</returns>
292        public PeerId GetPeerId(byte[] byteId)
293        {
294            LogToMonitor("GetPeerID: Converting byte[] to PeerId-Object");
295            return new PeerId(overlay.GetAddress(byteId));
296        }
297
298        // overlay.LocalAddress = Overlay-Peer-Address/Names
299        public void SendToPeer(byte[] data, byte[] destinationPeer)
300        {
301            // get stack size of the pap use-data and add own use data (for optimizing Stack size)
302            var realStackSize = overlay.GetHeaderSize() + data.Length;
303
304            var stackData = new ByteStack(realStackSize);
305            stackData.Push(data);
306
307            var destinationAddr = overlay.GetAddress(destinationPeer);
308            var overlayMsg = new OverlayMessage(MessageReceiverType.P2PBase, 
309                                                overlay.LocalAddress, destinationAddr, stackData);
310
311            overlay.Send(overlayMsg);
312        }
313
314        private void OverlayMessageReceived(object sender, OverlayMessageEventArgs e)
315        {
316            if (OnP2PMessageReceived == null) return;
317
318            var pid = new PeerId(e.Message.Source);
319            /* You have to fire this event asynchronous, because the main
320                 * thread will be stopped in this wrapper class for synchronizing
321                 * the asynchronous stuff (AutoResetEvent) --> so this could run
322                 * into a deadlock, when you fire this event synchronous (normal Invoke)
323                 * ATTENTION: This could change the invocation order!!! In my case
324                              no problem, but maybe in future cases... */
325
326            // TODO: not safe: The delegate must have only one target
327            // OnP2PMessageReceived.BeginInvoke(pid, e.Message.Data.PopBytes(e.Message.Data.CurrentStackSize), null, null);
328
329            foreach (var del in OnP2PMessageReceived.GetInvocationList())
330            {
331                del.DynamicInvoke(pid, e.Message.Data.PopBytes(e.Message.Data.CurrentStackSize));
332            }
333        }
334
335        #endregion
336
337        #region Event Handling (System Joined, Left and Message Received)
338
339        private void OnDhtSystemJoined(object sender, EventArgs e)
340        {
341            IsConnected = true;
342
343            if (OnSystemJoined != null)
344                OnSystemJoined();
345
346            systemJoined.Set();
347        }
348
349        private void OnDhtSystemLeft(object sender, SystemLeftEventArgs e)
350        {
351            if (OnSystemLeft != null)
352                OnSystemLeft();
353
354            IsConnected = false;
355            IsInitialized = false;
356
357            // Allow new connection to start and check for waiting / blocked tasks
358            // TODO reset running ConnectionWorkers?
359            systemLeft.Set();
360            systemJoined.Set();
361
362            LogToMonitor("CrypP2P left the system.");
363        }
364
365        #endregion
366
367        #region Synchronous Methods + their Callbacks
368
369        /// <summary>
370        ///   Stores a value in the DHT at the given key
371        /// </summary>
372        /// <param name = "key">Key of DHT Entry</param>
373        /// <param name = "value">Value of DHT Entry</param>
374        /// <returns>True, when storing is completed!</returns>
375        public RequestResult SynchStore(string key, string value)
376        {
377            return SynchStore(key, Encoding.UTF8.GetBytes(value));
378        }
379
380        /// <summary>
381        ///   Stores a value in the DHT at the given key
382        /// </summary>
383        /// <param name = "key">Key of DHT Entry</param>
384        /// <param name = "data">Value of DHT Entry</param>
385        /// <returns>True, when storing is completed!</returns>
386        public RequestResult SynchStore(string key, byte[] data)
387        {
388            var autoResetEvent = new AutoResetEvent(false);
389
390            // LogToMonitor("testcrash" + Encoding.UTF8.GetString(new byte[5000]));
391            LogToMonitor("Begin: SynchStore. Key: " + key + ", " + data.Length + " bytes");
392
393            var requestResult = new RequestResult {WaitHandle = autoResetEvent, Key = key, Data = data};
394            VersionedDht.Store(OnSynchStoreCompleted, key, data, requestResult);
395
396            // blocking till response
397            autoResetEvent.WaitOne();
398
399            LogToMonitor("End: SynchStore. Key: " + key + ". Status: " + requestResult.Status);
400
401            return requestResult;
402        }
403
404        /// <summary>
405        ///   Callback for a the synchronized store method
406        /// </summary>
407        /// <param name = "storeResult">retrieved data container</param>
408        private static void OnSynchStoreCompleted(StoreResult storeResult)
409        {
410            var requestResult = storeResult.AsyncState as RequestResult;
411            if (requestResult == null)
412            {
413                LogToMonitor("Received OnSynchStoreCompleted, but RequestResult object is missing. Discarding.");
414                return;
415            }
416
417            requestResult.Parse(storeResult);
418
419            // unblock WaitHandle in the synchronous method
420            requestResult.WaitHandle.Set();
421        }
422
423        /// <summary>
424        ///   Get the value of the given DHT Key or null, if it doesn't exist.
425        /// </summary>
426        /// <param name = "key">Key of DHT Entry</param>
427        /// <returns>Value of DHT Entry</returns>
428        public RequestResult SynchRetrieve(string key)
429        {
430            LogToMonitor("Begin: SynchRetrieve. Key: " + key);
431
432            var autoResetEvent = new AutoResetEvent(false);
433            var requestResult = new RequestResult() {WaitHandle = autoResetEvent, Key = key};
434            Dht.Retrieve(OnSynchRetrieveCompleted, key, requestResult);
435
436            // blocking till response
437            autoResetEvent.WaitOne();
438
439            LogToMonitor("End: SynchRetrieve. Key: " + key + ". Status: " + requestResult.Status);
440
441            return requestResult;
442        }
443
444        /// <summary>
445        ///   Callback for a the synchronized retrieval method
446        /// </summary>
447        /// <param name = "retrieveResult"></param>
448        private static void OnSynchRetrieveCompleted(RetrieveResult retrieveResult)
449        {
450            var requestResult = retrieveResult.AsyncState as RequestResult;
451            if (requestResult == null)
452            {
453                LogToMonitor("Received OnSynchRetrieveCompleted, but RequestResult object is missing. Discarding.");
454                return;
455            }
456
457            requestResult.Parse(retrieveResult);
458
459            // unblock WaitHandle in the synchronous method
460            requestResult.WaitHandle.Set();
461        }
462
463        /// <summary>
464        ///   Removes a key/value pair out of the DHT
465        /// </summary>
466        /// <param name = "key">Key of the DHT Entry</param>
467        /// <returns>True, when removing is completed!</returns>
468        public RequestResult SynchRemove(string key)
469        {
470            LogToMonitor("Begin SynchRemove. Key: " + key);
471
472            var autoResetEvent = new AutoResetEvent(false);
473            var requestResult = new RequestResult { WaitHandle = autoResetEvent, Key = key };
474            VersionedDht.Remove(OnSynchRemoveCompleted, key, requestResult);
475
476            // blocking till response
477            autoResetEvent.WaitOne();
478
479            LogToMonitor("End: SynchRemove. Key: " + key + ". Status: " + requestResult.Status);
480
481            return requestResult;
482        }
483
484        /// <summary>
485        ///   Callback for a the synchronized remove method
486        /// </summary>
487        /// <param name = "removeResult"></param>
488        private static void OnSynchRemoveCompleted(RemoveResult removeResult)
489        {
490            var requestResult = removeResult.AsyncState as RequestResult;
491            if (requestResult == null)
492            {
493                LogToMonitor("Received OnSynchRemoveCompleted, but RequestResult object is missing. Discarding.");
494                return;
495            }
496
497            requestResult.Parse(removeResult);
498
499            // unblock WaitHandle in the synchronous method
500            requestResult.WaitHandle.Set();
501        }
502
503        #endregion
504
505        #region Statistic Methods
506
507        public long TotalBytesSentOnAllLinks()
508        {
509            return (long) linkmanager.GetAllLinkInformation().Sum(linkInformation => linkInformation.TotalBytesSent);
510        }
511
512        public long TotalBytesReceivedOnAllLinks()
513        {
514            return (long) linkmanager.GetAllLinkInformation().Sum(linkInformation => linkInformation.TotalBytesReceived);
515        }
516
517        #endregion
518
519        #region Log facility
520
521        /// <summary>
522        ///   To log the internal state in the Monitoring Software of P@play
523        /// </summary>
524        public void LogInternalState()
525        {
526            if (Dht != null)
527            {
528                Dht.LogInternalState();
529            }
530        }
531
532        private static void LogToMonitor(string sTextToLog)
533        {
534            if (P2PSettings.Default.Log2Monitor)
535                Log.Debug(sTextToLog);
536        }
537
538        #endregion
539    }
540}
Note: See TracBrowser for help on using the repository browser.