source: trunk/CrypP2P/Internal/P2PBase.cs @ 1702

Last change on this file since 1702 was 1702, checked in by Paul Lelgemann, 11 years ago

o CrypP2P: CloseConnectionAfterPingTimeout set to true
o Updated PAP-DLLs to revision 5150

File size: 18.8 KB
Line 
1/*
2   Copyright 2010 Paul Lelgemann and Christian Arnold,
3                  University of Duisburg-Essen
4
5   Licensed under the Apache License, Version 2.0 (the "License");
6   you may not use this file except in compliance with the License.
7   You may obtain a copy of the License at
8
9       http://www.apache.org/licenses/LICENSE-2.0
10
11   Unless required by applicable law or agreed to in writing, software
12   distributed under the License is distributed on an "AS IS" BASIS,
13   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   See the License for the specific language governing permissions and
15   limitations under the License.
16*/
17
18using System;
19using System.Text;
20using System.Threading;
21using Cryptool.PluginBase;
22using Cryptool.Plugins.PeerToPeer.Internal;
23using Gears4Net;
24using PeersAtPlay;
25using PeersAtPlay.Monitoring;
26using PeersAtPlay.P2PLink;
27using PeersAtPlay.P2PLink.SnalNG;
28using PeersAtPlay.P2POverlay;
29using PeersAtPlay.P2POverlay.Bootstrapper;
30using PeersAtPlay.P2POverlay.Bootstrapper.IrcBootstrapperV2;
31using PeersAtPlay.P2POverlay.Bootstrapper.LocalMachineBootstrapper;
32using PeersAtPlay.P2POverlay.FullMeshOverlay;
33using PeersAtPlay.P2PStorage.DHT;
34using PeersAtPlay.P2PStorage.FullMeshDHT;
35using PeersAtPlay.Util.Logging;
36using PeersAtPlay.P2POverlay.Chord;
37
38/* TODO:
39 * - Delete UseNatTraversal-Flag and insert CertificateCheck and CertificateSetup
40 * - Testing asynchronous methods incl. EventHandlers
41 */
42
43namespace Cryptool.P2P.Internal
44{
45    /// <summary>
46    ///   Wrapper class to integrate peer@play environment into CrypTool.
47    ///   This class synchronizes asynchronous methods for easier usage in CT2.
48    /// </summary>
49    public class P2PBase
50    {
51        #region Variables
52
53        private readonly AutoResetEvent systemJoined;
54        private readonly AutoResetEvent systemLeft;
55        private IBootstrapper bootstrapper;
56        private IP2PLinkManager linkmanager;
57        private P2POverlay overlay;
58        internal IDHT Dht;
59        internal IVersionedDHT VersionedDht;
60
61        /// <summary>
62        ///   True if system was successfully joined, false if system is COMPLETELY left
63        /// </summary>
64        public bool IsConnected { get; private set; }
65
66        /// <summary>
67        ///   True if the underlying peer to peer system has been fully initialized
68        /// </summary>
69        public bool IsInitialized { get; private set; }
70
71        #endregion
72
73        #region Delegates
74
75        public event P2PMessageReceived OnP2PMessageReceived;
76        public delegate void P2PMessageReceived(PeerId sourceAddr, byte[] data);
77
78        public event SystemJoined OnSystemJoined;
79        public delegate void SystemJoined();
80
81        public event SystemLeft OnSystemLeft;
82        public delegate void SystemLeft();
83
84        #endregion
85
86        public P2PBase()
87        {
88            IsConnected = false;
89            IsInitialized = false;
90
91            systemJoined = new AutoResetEvent(false);
92            systemLeft = new AutoResetEvent(false);
93        }
94
95        #region Basic P2P Methods (Init, Start, Stop)
96
97        /// <summary>
98        ///   Initializes the underlying peer-to-peer system with settings configured in P2PSettings. This step is required in order to be able to establish a connection.
99        /// </summary>
100        public void Initialize()
101        {
102            Scheduler scheduler = new STAScheduler("pap");
103
104            switch (P2PSettings.Default.LinkManager)
105            {
106                case P2PLinkManagerType.Snal:
107                    LogToMonitor("Init LinkMgr: Using NAT Traversal stuff");
108
109                    // NAT-Traversal stuff needs a different Snal-Version
110                    linkmanager = new Snal(scheduler);
111
112                    var settings = new PeersAtPlay.P2PLink.SnalNG.Settings();
113                    settings.LoadDefaults();
114                    settings.ConnectInternal = true;
115                    settings.LocalReceivingPort = P2PSettings.Default.LocalReceivingPort;
116                    settings.UseLocalAddressDetection = P2PSettings.Default.UseLocalAddressDetection;
117                    settings.NoDelay = false;
118                    settings.ReuseAddress = false;
119                    settings.UseNetworkMonitorServer = true;
120                    settings.CloseConnectionAfterPingTimeout = true;
121
122                    settings.FragmentMessages = true;
123                    settings.FragmentMessageSize = 10*1024;
124                       
125                    switch(P2PSettings.Default.TransportProtocol)
126                    {
127                        case P2PTransportProtocol.UDP:
128                            settings.TransportProtocol = TransportProtocol.UDP;
129                            break;
130                        case P2PTransportProtocol.TCP_UDP:
131                            settings.TransportProtocol = TransportProtocol.TCP_UDP;
132                            break;
133                        default:
134                            settings.TransportProtocol = TransportProtocol.TCP;
135                            break;
136                    }
137
138                    linkmanager.Settings = settings;
139                    linkmanager.ApplicationType = ApplicationType.CrypTool;
140
141                    break;
142                default:
143                    throw (new NotImplementedException());
144            }
145
146            switch (P2PSettings.Default.Bootstrapper)
147            {
148                case P2PBootstrapperType.LocalMachineBootstrapper:
149                    // LocalMachineBootstrapper = only local connection (runs only on one machine)
150                    bootstrapper = new LocalMachineBootstrapper();
151                    break;
152                case P2PBootstrapperType.IrcBootstrapper:
153                    PeersAtPlay.P2POverlay.Bootstrapper.IrcBootstrapperV2.Settings.DelaySymmetricResponse = true;
154                    PeersAtPlay.P2POverlay.Bootstrapper.IrcBootstrapperV2.Settings.IncludeSymmetricResponse = false;
155                    PeersAtPlay.P2POverlay.Bootstrapper.IrcBootstrapperV2.Settings.UsePeerCache = false;
156
157                    bootstrapper = new IrcBootstrapper(scheduler);
158                    break;
159                default:
160                    throw (new NotImplementedException());
161            }
162
163            switch (P2PSettings.Default.Architecture)
164            {
165                case P2PArchitecture.FullMesh:
166                    overlay = new FullMeshOverlay(scheduler);
167                    Dht = new FullMeshDHT(scheduler);
168                    break;
169                case P2PArchitecture.Chord:
170                    overlay = new ChordNGCore(scheduler);
171                    Dht = (IDHT) overlay;
172                    break;
173                default:
174                    throw (new NotImplementedException());
175            }
176
177            overlay.MessageReceived += OverlayMessageReceived;
178            Dht.SystemJoined += OnDhtSystemJoined;
179            Dht.SystemLeft += OnDhtSystemLeft;
180
181            VersionedDht = (IVersionedDHT) Dht;
182
183            P2PManager.GuiLogMessage("Initializing DHT with world name " + P2PSettings.Default.WorldName,
184                                                NotificationLevel.Info);
185            Dht.Initialize(P2PSettings.Default.PeerName, string.Empty, P2PSettings.Default.WorldName, overlay,
186                            bootstrapper,
187                            linkmanager, null);
188
189            IsInitialized = true;
190        }
191
192        /// <summary>
193        ///   Starts the P2P System. When the given P2P world doesn't exist yet,
194        ///   inclusive creating the and bootstrapping to the P2P network.
195        ///   In either case joining the P2P world.
196        ///   This synchronized method returns true not before the peer has
197        ///   successfully joined the network (this may take one or two minutes).
198        /// </summary>
199        /// <exception cref = "InvalidOperationException">When the peer-to-peer system has not been initialized.
200        /// After validating the settings, this can be done by calling Initialize().</exception>
201        /// <returns>True, if the peer has completely joined the p2p network</returns>
202        public bool SynchStart()
203        {
204            if (!IsInitialized)
205            {
206                throw new InvalidOperationException("Peer-to-peer is not initialized.");
207            }
208
209            if (IsConnected)
210            {
211                return true;
212            }
213
214            Dht.BeginStart(BeginStartEventHandler);
215
216            // Wait for event SystemJoined. When it's invoked, the peer completely joined the P2P system
217            systemJoined.WaitOne();
218            P2PManager.GuiLogMessage("System join process ended.", NotificationLevel.Debug);
219
220            return true;
221        }
222
223        private void BeginStartEventHandler(DHTEventArgs eventArgs)
224        {
225            P2PManager.GuiLogMessage("Received DHTEventArgs: " + eventArgs + ", state: " + eventArgs.State, NotificationLevel.Debug);
226        }
227
228        /// <summary>
229        ///   Disconnects from the peer-to-peer system.
230        /// </summary>
231        /// <returns>True, if the peer has completely left the p2p network</returns>
232        public bool SynchStop()
233        {
234            if (Dht == null) return false;
235
236            Dht.BeginStop(null);
237
238            if (!IsConnected)
239            {
240                return true;
241            }
242
243            // wait till systemLeft Event is invoked
244            systemLeft.WaitOne();
245
246            return true;
247        }
248
249        #endregion
250
251        #region Peer related method (GetPeerId, Send message to peer)
252
253        /// <summary>
254        ///   Get PeerName of the actual peer
255        /// </summary>
256        /// <param name = "sPeerName">out: additional peer information UserName on LinkManager</param>
257        /// <returns>PeerID as a String</returns>
258        public PeerId GetPeerId(out string sPeerName)
259        {
260            sPeerName = linkmanager.UserName;
261            return new PeerId(overlay.LocalAddress);
262        }
263
264        /// <summary>
265        ///   Construct PeerId object for a specific byte[] id
266        /// </summary>
267        /// <param name = "byteId">overlay address as byte array</param>
268        /// <returns>corresponding PeerId for given byte[] id</returns>
269        public PeerId GetPeerId(byte[] byteId)
270        {
271            LogToMonitor("GetPeerID: Converting byte[] to PeerId-Object");
272            return new PeerId(overlay.GetAddress(byteId));
273        }
274
275        // overlay.LocalAddress = Overlay-Peer-Address/Names
276        public void SendToPeer(byte[] data, byte[] destinationPeer)
277        {
278            // get stack size of the pap use-data and add own use data (for optimizing Stack size)
279            var realStackSize = overlay.GetHeaderSize() + data.Length;
280
281            var stackData = new ByteStack(realStackSize);
282            stackData.Push(data);
283
284            var destinationAddr = overlay.GetAddress(destinationPeer);
285            var overlayMsg = new OverlayMessage(MessageReceiverType.P2PBase, 
286                                                overlay.LocalAddress, destinationAddr, stackData);
287
288            overlay.Send(overlayMsg);
289        }
290
291        private void OverlayMessageReceived(object sender, OverlayMessageEventArgs e)
292        {
293            if (OnP2PMessageReceived == null) return;
294
295            var pid = new PeerId(e.Message.Source);
296            /* You have to fire this event asynchronous, because the main
297                 * thread will be stopped in this wrapper class for synchronizing
298                 * the asynchronous stuff (AutoResetEvent) --> so this could run
299                 * into a deadlock, when you fire this event synchronous (normal Invoke)
300                 * ATTENTION: This could change the invocation order!!! In my case
301                              no problem, but maybe in future cases... */
302
303            // TODO: not safe: The delegate must have only one target
304            // OnP2PMessageReceived.BeginInvoke(pid, e.Message.Data.PopBytes(e.Message.Data.CurrentStackSize), null, null);
305
306            foreach (var del in OnP2PMessageReceived.GetInvocationList())
307            {
308                del.DynamicInvoke(pid, e.Message.Data.PopBytes(e.Message.Data.CurrentStackSize));
309            }
310        }
311
312        #endregion
313
314        #region Event Handling (System Joined, Left and Message Received)
315
316        private void OnDhtSystemJoined(object sender, EventArgs e)
317        {
318            if (OnSystemJoined != null)
319                OnSystemJoined();
320
321            systemJoined.Set();
322            IsConnected = true;
323        }
324
325        private void OnDhtSystemLeft(object sender, SystemLeftEventArgs e)
326        {
327            if (OnSystemLeft != null)
328                OnSystemLeft();
329
330            IsConnected = false;
331            IsInitialized = false;
332
333            // Allow new connection to start and check for waiting / blocked tasks
334            // TODO reset running ConnectionWorkers?
335            systemLeft.Set();
336            systemJoined.Set();
337
338            LogToMonitor("CrypP2P left the system.");
339        }
340
341        #endregion
342
343        #region Synchronous Methods + their Callbacks
344
345        /// <summary>
346        ///   Stores a value in the DHT at the given key
347        /// </summary>
348        /// <param name = "key">Key of DHT Entry</param>
349        /// <param name = "value">Value of DHT Entry</param>
350        /// <returns>True, when storing is completed!</returns>
351        public RequestResult SynchStore(string key, string value)
352        {
353            return SynchStore(key, Encoding.UTF8.GetBytes(value));
354        }
355
356        /// <summary>
357        ///   Stores a value in the DHT at the given key
358        /// </summary>
359        /// <param name = "key">Key of DHT Entry</param>
360        /// <param name = "data">Value of DHT Entry</param>
361        /// <returns>True, when storing is completed!</returns>
362        public RequestResult SynchStore(string key, byte[] data)
363        {
364            var autoResetEvent = new AutoResetEvent(false);
365
366            // LogToMonitor("testcrash" + Encoding.UTF8.GetString(new byte[5000]));
367            LogToMonitor("Begin: SynchStore. Key: " + key + ", " + data.Length + " bytes");
368
369            var requestResult = new RequestResult {WaitHandle = autoResetEvent, Key = key, Data = data};
370            VersionedDht.Store(OnSynchStoreCompleted, key, data, requestResult);
371
372            // blocking till response
373            autoResetEvent.WaitOne();
374
375            LogToMonitor("End: SynchStore. Key: " + key + ". Status: " + requestResult.Status);
376
377            return requestResult;
378        }
379
380        /// <summary>
381        ///   Callback for a the synchronized store method
382        /// </summary>
383        /// <param name = "storeResult">retrieved data container</param>
384        private static void OnSynchStoreCompleted(StoreResult storeResult)
385        {
386            var requestResult = storeResult.AsyncState as RequestResult;
387            if (requestResult == null)
388            {
389                LogToMonitor("Received OnSynchStoreCompleted, but RequestResult object is missing. Discarding.");
390                return;
391            }
392
393            requestResult.Parse(storeResult);
394
395            // unblock WaitHandle in the synchronous method
396            requestResult.WaitHandle.Set();
397        }
398
399        /// <summary>
400        ///   Get the value of the given DHT Key or null, if it doesn't exist.
401        /// </summary>
402        /// <param name = "key">Key of DHT Entry</param>
403        /// <returns>Value of DHT Entry</returns>
404        public RequestResult SynchRetrieve(string key)
405        {
406            LogToMonitor("Begin: SynchRetrieve. Key: " + key);
407
408            var autoResetEvent = new AutoResetEvent(false);
409            var requestResult = new RequestResult() {WaitHandle = autoResetEvent, Key = key};
410            Dht.Retrieve(OnSynchRetrieveCompleted, key, requestResult);
411
412            // blocking till response
413            autoResetEvent.WaitOne();
414
415            LogToMonitor("End: SynchRetrieve. Key: " + key + ". Status: " + requestResult.Status);
416
417            return requestResult;
418        }
419
420        /// <summary>
421        ///   Callback for a the synchronized retrieval method
422        /// </summary>
423        /// <param name = "retrieveResult"></param>
424        private static void OnSynchRetrieveCompleted(RetrieveResult retrieveResult)
425        {
426            var requestResult = retrieveResult.AsyncState as RequestResult;
427            if (requestResult == null)
428            {
429                LogToMonitor("Received OnSynchRetrieveCompleted, but RequestResult object is missing. Discarding.");
430                return;
431            }
432
433            requestResult.Parse(retrieveResult);
434
435            // unblock WaitHandle in the synchronous method
436            requestResult.WaitHandle.Set();
437        }
438
439        /// <summary>
440        ///   Removes a key/value pair out of the DHT
441        /// </summary>
442        /// <param name = "key">Key of the DHT Entry</param>
443        /// <returns>True, when removing is completed!</returns>
444        public RequestResult SynchRemove(string key)
445        {
446            LogToMonitor("Begin SynchRemove. Key: " + key);
447
448            var autoResetEvent = new AutoResetEvent(false);
449            var requestResult = new RequestResult { WaitHandle = autoResetEvent, Key = key };
450            VersionedDht.Remove(OnSynchRemoveCompleted, key, requestResult);
451
452            // blocking till response
453            autoResetEvent.WaitOne();
454
455            LogToMonitor("End: SynchRemove. Key: " + key + ". Status: " + requestResult.Status);
456
457            return requestResult;
458        }
459
460        /// <summary>
461        ///   Callback for a the synchronized remove method
462        /// </summary>
463        /// <param name = "removeResult"></param>
464        private static void OnSynchRemoveCompleted(RemoveResult removeResult)
465        {
466            var requestResult = removeResult.AsyncState as RequestResult;
467            if (requestResult == null)
468            {
469                LogToMonitor("Received OnSynchRemoveCompleted, but RequestResult object is missing. Discarding.");
470                return;
471            }
472
473            requestResult.Parse(removeResult);
474
475            // unblock WaitHandle in the synchronous method
476            requestResult.WaitHandle.Set();
477        }
478
479        #endregion
480
481        #region Log facility
482
483        /// <summary>
484        ///   To log the internal state in the Monitoring Software of P@play
485        /// </summary>
486        public void LogInternalState()
487        {
488            if (Dht != null)
489            {
490                Dht.LogInternalState();
491            }
492        }
493
494        private static void LogToMonitor(string sTextToLog)
495        {
496            if (P2PSettings.Default.Log2Monitor)
497                Log.Debug(sTextToLog);
498        }
499
500        #endregion
501    }
502}
Note: See TracBrowser for help on using the repository browser.