source: trunk/CrypP2P/Internal/P2PBase.cs @ 1704

Last change on this file since 1704 was 1704, checked in by Paul Lelgemann, 11 years ago

+ KeySearcher: display of aggregated linkmanager statistics
o KeySearcher: job identifier extended with sample decryption data to preserve decryption algorithm settings

File size: 19.3 KB
Line 
1/*
2   Copyright 2010 Paul Lelgemann and Christian Arnold,
3                  University of Duisburg-Essen
4
5   Licensed under the Apache License, Version 2.0 (the "License");
6   you may not use this file except in compliance with the License.
7   You may obtain a copy of the License at
8
9       http://www.apache.org/licenses/LICENSE-2.0
10
11   Unless required by applicable law or agreed to in writing, software
12   distributed under the License is distributed on an "AS IS" BASIS,
13   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   See the License for the specific language governing permissions and
15   limitations under the License.
16*/
17
18using System;
19using System.Linq;
20using System.Text;
21using System.Threading;
22using Cryptool.PluginBase;
23using Cryptool.Plugins.PeerToPeer.Internal;
24using Gears4Net;
25using PeersAtPlay;
26using PeersAtPlay.Monitoring;
27using PeersAtPlay.P2PLink;
28using PeersAtPlay.P2PLink.SnalNG;
29using PeersAtPlay.P2POverlay;
30using PeersAtPlay.P2POverlay.Bootstrapper;
31using PeersAtPlay.P2POverlay.Bootstrapper.IrcBootstrapperV2;
32using PeersAtPlay.P2POverlay.Bootstrapper.LocalMachineBootstrapper;
33using PeersAtPlay.P2POverlay.FullMeshOverlay;
34using PeersAtPlay.P2PStorage.DHT;
35using PeersAtPlay.P2PStorage.FullMeshDHT;
36using PeersAtPlay.Util.Logging;
37using PeersAtPlay.P2POverlay.Chord;
38
39/* TODO:
40 * - Delete UseNatTraversal-Flag and insert CertificateCheck and CertificateSetup
41 * - Testing asynchronous methods incl. EventHandlers
42 */
43
44namespace Cryptool.P2P.Internal
45{
46    /// <summary>
47    ///   Wrapper class to integrate peer@play environment into CrypTool.
48    ///   This class synchronizes asynchronous methods for easier usage in CT2.
49    /// </summary>
50    public class P2PBase
51    {
52        #region Variables
53
54        private readonly AutoResetEvent systemJoined;
55        private readonly AutoResetEvent systemLeft;
56        private IBootstrapper bootstrapper;
57        private IP2PLinkManager linkmanager;
58        private P2POverlay overlay;
59        internal IDHT Dht;
60        internal IVersionedDHT VersionedDht;
61
62        /// <summary>
63        ///   True if system was successfully joined, false if system is COMPLETELY left
64        /// </summary>
65        public bool IsConnected { get; private set; }
66
67        /// <summary>
68        ///   True if the underlying peer to peer system has been fully initialized
69        /// </summary>
70        public bool IsInitialized { get; private set; }
71
72        #endregion
73
74        #region Delegates
75
76        public event P2PMessageReceived OnP2PMessageReceived;
77        public delegate void P2PMessageReceived(PeerId sourceAddr, byte[] data);
78
79        public event SystemJoined OnSystemJoined;
80        public delegate void SystemJoined();
81
82        public event SystemLeft OnSystemLeft;
83        public delegate void SystemLeft();
84
85        #endregion
86
87        public P2PBase()
88        {
89            IsConnected = false;
90            IsInitialized = false;
91
92            systemJoined = new AutoResetEvent(false);
93            systemLeft = new AutoResetEvent(false);
94        }
95
96        #region Basic P2P Methods (Init, Start, Stop)
97
98        /// <summary>
99        ///   Initializes the underlying peer-to-peer system with settings configured in P2PSettings. This step is required in order to be able to establish a connection.
100        /// </summary>
101        public void Initialize()
102        {
103            Scheduler scheduler = new STAScheduler("pap");
104
105            switch (P2PSettings.Default.LinkManager)
106            {
107                case P2PLinkManagerType.Snal:
108                    LogToMonitor("Init LinkMgr: Using NAT Traversal stuff");
109
110                    // NAT-Traversal stuff needs a different Snal-Version
111                    linkmanager = new Snal(scheduler);
112
113                    var settings = new PeersAtPlay.P2PLink.SnalNG.Settings();
114                    settings.LoadDefaults();
115                    settings.ConnectInternal = true;
116                    settings.LocalReceivingPort = P2PSettings.Default.LocalReceivingPort;
117                    settings.UseLocalAddressDetection = P2PSettings.Default.UseLocalAddressDetection;
118                    settings.NoDelay = false;
119                    settings.ReuseAddress = false;
120                    settings.UseNetworkMonitorServer = true;
121                    settings.CloseConnectionAfterPingTimeout = true;
122
123                    settings.FragmentMessages = true;
124                    settings.FragmentMessageSize = 10*1024;
125                       
126                    switch(P2PSettings.Default.TransportProtocol)
127                    {
128                        case P2PTransportProtocol.UDP:
129                            settings.TransportProtocol = TransportProtocol.UDP;
130                            break;
131                        case P2PTransportProtocol.TCP_UDP:
132                            settings.TransportProtocol = TransportProtocol.TCP_UDP;
133                            break;
134                        default:
135                            settings.TransportProtocol = TransportProtocol.TCP;
136                            break;
137                    }
138
139                    linkmanager.Settings = settings;
140                    linkmanager.ApplicationType = ApplicationType.CrypTool;
141
142                    break;
143                default:
144                    throw (new NotImplementedException());
145            }
146
147            switch (P2PSettings.Default.Bootstrapper)
148            {
149                case P2PBootstrapperType.LocalMachineBootstrapper:
150                    // LocalMachineBootstrapper = only local connection (runs only on one machine)
151                    bootstrapper = new LocalMachineBootstrapper();
152                    break;
153                case P2PBootstrapperType.IrcBootstrapper:
154                    PeersAtPlay.P2POverlay.Bootstrapper.IrcBootstrapperV2.Settings.DelaySymmetricResponse = true;
155                    PeersAtPlay.P2POverlay.Bootstrapper.IrcBootstrapperV2.Settings.IncludeSymmetricResponse = false;
156                    PeersAtPlay.P2POverlay.Bootstrapper.IrcBootstrapperV2.Settings.UsePeerCache = false;
157
158                    bootstrapper = new IrcBootstrapper(scheduler);
159                    break;
160                default:
161                    throw (new NotImplementedException());
162            }
163
164            switch (P2PSettings.Default.Architecture)
165            {
166                case P2PArchitecture.FullMesh:
167                    overlay = new FullMeshOverlay(scheduler);
168                    Dht = new FullMeshDHT(scheduler);
169                    break;
170                case P2PArchitecture.Chord:
171                    overlay = new ChordNGCore(scheduler);
172                    Dht = (IDHT) overlay;
173                    break;
174                default:
175                    throw (new NotImplementedException());
176            }
177
178            overlay.MessageReceived += OverlayMessageReceived;
179            Dht.SystemJoined += OnDhtSystemJoined;
180            Dht.SystemLeft += OnDhtSystemLeft;
181
182            VersionedDht = (IVersionedDHT) Dht;
183
184            P2PManager.GuiLogMessage("Initializing DHT with world name " + P2PSettings.Default.WorldName,
185                                                NotificationLevel.Info);
186            Dht.Initialize(P2PSettings.Default.PeerName, string.Empty, P2PSettings.Default.WorldName, overlay,
187                            bootstrapper,
188                            linkmanager, null);
189
190            IsInitialized = true;
191        }
192
193        /// <summary>
194        ///   Starts the P2P System. When the given P2P world doesn't exist yet,
195        ///   inclusive creating the and bootstrapping to the P2P network.
196        ///   In either case joining the P2P world.
197        ///   This synchronized method returns true not before the peer has
198        ///   successfully joined the network (this may take one or two minutes).
199        /// </summary>
200        /// <exception cref = "InvalidOperationException">When the peer-to-peer system has not been initialized.
201        /// After validating the settings, this can be done by calling Initialize().</exception>
202        /// <returns>True, if the peer has completely joined the p2p network</returns>
203        public bool SynchStart()
204        {
205            if (!IsInitialized)
206            {
207                throw new InvalidOperationException("Peer-to-peer is not initialized.");
208            }
209
210            if (IsConnected)
211            {
212                return true;
213            }
214
215            Dht.BeginStart(BeginStartEventHandler);
216
217            // Wait for event SystemJoined. When it's invoked, the peer completely joined the P2P system
218            systemJoined.WaitOne();
219            P2PManager.GuiLogMessage("System join process ended.", NotificationLevel.Debug);
220
221            return true;
222        }
223
224        private void BeginStartEventHandler(DHTEventArgs eventArgs)
225        {
226            P2PManager.GuiLogMessage("Received DHTEventArgs: " + eventArgs + ", state: " + eventArgs.State, NotificationLevel.Debug);
227        }
228
229        /// <summary>
230        ///   Disconnects from the peer-to-peer system.
231        /// </summary>
232        /// <returns>True, if the peer has completely left the p2p network</returns>
233        public bool SynchStop()
234        {
235            if (Dht == null) return false;
236
237            Dht.BeginStop(null);
238
239            if (!IsConnected)
240            {
241                return true;
242            }
243
244            // wait till systemLeft Event is invoked
245            systemLeft.WaitOne();
246
247            return true;
248        }
249
250        #endregion
251
252        #region Peer related method (GetPeerId, Send message to peer)
253
254        /// <summary>
255        ///   Get PeerName of the actual peer
256        /// </summary>
257        /// <param name = "sPeerName">out: additional peer information UserName on LinkManager</param>
258        /// <returns>PeerID as a String</returns>
259        public PeerId GetPeerId(out string sPeerName)
260        {
261            sPeerName = linkmanager.UserName;
262            return new PeerId(overlay.LocalAddress);
263        }
264
265        /// <summary>
266        ///   Construct PeerId object for a specific byte[] id
267        /// </summary>
268        /// <param name = "byteId">overlay address as byte array</param>
269        /// <returns>corresponding PeerId for given byte[] id</returns>
270        public PeerId GetPeerId(byte[] byteId)
271        {
272            LogToMonitor("GetPeerID: Converting byte[] to PeerId-Object");
273            return new PeerId(overlay.GetAddress(byteId));
274        }
275
276        // overlay.LocalAddress = Overlay-Peer-Address/Names
277        public void SendToPeer(byte[] data, byte[] destinationPeer)
278        {
279            // get stack size of the pap use-data and add own use data (for optimizing Stack size)
280            var realStackSize = overlay.GetHeaderSize() + data.Length;
281
282            var stackData = new ByteStack(realStackSize);
283            stackData.Push(data);
284
285            var destinationAddr = overlay.GetAddress(destinationPeer);
286            var overlayMsg = new OverlayMessage(MessageReceiverType.P2PBase, 
287                                                overlay.LocalAddress, destinationAddr, stackData);
288
289            overlay.Send(overlayMsg);
290        }
291
292        private void OverlayMessageReceived(object sender, OverlayMessageEventArgs e)
293        {
294            if (OnP2PMessageReceived == null) return;
295
296            var pid = new PeerId(e.Message.Source);
297            /* You have to fire this event asynchronous, because the main
298                 * thread will be stopped in this wrapper class for synchronizing
299                 * the asynchronous stuff (AutoResetEvent) --> so this could run
300                 * into a deadlock, when you fire this event synchronous (normal Invoke)
301                 * ATTENTION: This could change the invocation order!!! In my case
302                              no problem, but maybe in future cases... */
303
304            // TODO: not safe: The delegate must have only one target
305            // OnP2PMessageReceived.BeginInvoke(pid, e.Message.Data.PopBytes(e.Message.Data.CurrentStackSize), null, null);
306
307            foreach (var del in OnP2PMessageReceived.GetInvocationList())
308            {
309                del.DynamicInvoke(pid, e.Message.Data.PopBytes(e.Message.Data.CurrentStackSize));
310            }
311        }
312
313        #endregion
314
315        #region Event Handling (System Joined, Left and Message Received)
316
317        private void OnDhtSystemJoined(object sender, EventArgs e)
318        {
319            if (OnSystemJoined != null)
320                OnSystemJoined();
321
322            systemJoined.Set();
323            IsConnected = true;
324        }
325
326        private void OnDhtSystemLeft(object sender, SystemLeftEventArgs e)
327        {
328            if (OnSystemLeft != null)
329                OnSystemLeft();
330
331            IsConnected = false;
332            IsInitialized = false;
333
334            // Allow new connection to start and check for waiting / blocked tasks
335            // TODO reset running ConnectionWorkers?
336            systemLeft.Set();
337            systemJoined.Set();
338
339            LogToMonitor("CrypP2P left the system.");
340        }
341
342        #endregion
343
344        #region Synchronous Methods + their Callbacks
345
346        /// <summary>
347        ///   Stores a value in the DHT at the given key
348        /// </summary>
349        /// <param name = "key">Key of DHT Entry</param>
350        /// <param name = "value">Value of DHT Entry</param>
351        /// <returns>True, when storing is completed!</returns>
352        public RequestResult SynchStore(string key, string value)
353        {
354            return SynchStore(key, Encoding.UTF8.GetBytes(value));
355        }
356
357        /// <summary>
358        ///   Stores a value in the DHT at the given key
359        /// </summary>
360        /// <param name = "key">Key of DHT Entry</param>
361        /// <param name = "data">Value of DHT Entry</param>
362        /// <returns>True, when storing is completed!</returns>
363        public RequestResult SynchStore(string key, byte[] data)
364        {
365            var autoResetEvent = new AutoResetEvent(false);
366
367            // LogToMonitor("testcrash" + Encoding.UTF8.GetString(new byte[5000]));
368            LogToMonitor("Begin: SynchStore. Key: " + key + ", " + data.Length + " bytes");
369
370            var requestResult = new RequestResult {WaitHandle = autoResetEvent, Key = key, Data = data};
371            VersionedDht.Store(OnSynchStoreCompleted, key, data, requestResult);
372
373            // blocking till response
374            autoResetEvent.WaitOne();
375
376            LogToMonitor("End: SynchStore. Key: " + key + ". Status: " + requestResult.Status);
377
378            return requestResult;
379        }
380
381        /// <summary>
382        ///   Callback for a the synchronized store method
383        /// </summary>
384        /// <param name = "storeResult">retrieved data container</param>
385        private static void OnSynchStoreCompleted(StoreResult storeResult)
386        {
387            var requestResult = storeResult.AsyncState as RequestResult;
388            if (requestResult == null)
389            {
390                LogToMonitor("Received OnSynchStoreCompleted, but RequestResult object is missing. Discarding.");
391                return;
392            }
393
394            requestResult.Parse(storeResult);
395
396            // unblock WaitHandle in the synchronous method
397            requestResult.WaitHandle.Set();
398        }
399
400        /// <summary>
401        ///   Get the value of the given DHT Key or null, if it doesn't exist.
402        /// </summary>
403        /// <param name = "key">Key of DHT Entry</param>
404        /// <returns>Value of DHT Entry</returns>
405        public RequestResult SynchRetrieve(string key)
406        {
407            LogToMonitor("Begin: SynchRetrieve. Key: " + key);
408
409            var autoResetEvent = new AutoResetEvent(false);
410            var requestResult = new RequestResult() {WaitHandle = autoResetEvent, Key = key};
411            Dht.Retrieve(OnSynchRetrieveCompleted, key, requestResult);
412
413            // blocking till response
414            autoResetEvent.WaitOne();
415
416            LogToMonitor("End: SynchRetrieve. Key: " + key + ". Status: " + requestResult.Status);
417
418            return requestResult;
419        }
420
421        /// <summary>
422        ///   Callback for a the synchronized retrieval method
423        /// </summary>
424        /// <param name = "retrieveResult"></param>
425        private static void OnSynchRetrieveCompleted(RetrieveResult retrieveResult)
426        {
427            var requestResult = retrieveResult.AsyncState as RequestResult;
428            if (requestResult == null)
429            {
430                LogToMonitor("Received OnSynchRetrieveCompleted, but RequestResult object is missing. Discarding.");
431                return;
432            }
433
434            requestResult.Parse(retrieveResult);
435
436            // unblock WaitHandle in the synchronous method
437            requestResult.WaitHandle.Set();
438        }
439
440        /// <summary>
441        ///   Removes a key/value pair out of the DHT
442        /// </summary>
443        /// <param name = "key">Key of the DHT Entry</param>
444        /// <returns>True, when removing is completed!</returns>
445        public RequestResult SynchRemove(string key)
446        {
447            LogToMonitor("Begin SynchRemove. Key: " + key);
448
449            var autoResetEvent = new AutoResetEvent(false);
450            var requestResult = new RequestResult { WaitHandle = autoResetEvent, Key = key };
451            VersionedDht.Remove(OnSynchRemoveCompleted, key, requestResult);
452
453            // blocking till response
454            autoResetEvent.WaitOne();
455
456            LogToMonitor("End: SynchRemove. Key: " + key + ". Status: " + requestResult.Status);
457
458            return requestResult;
459        }
460
461        /// <summary>
462        ///   Callback for a the synchronized remove method
463        /// </summary>
464        /// <param name = "removeResult"></param>
465        private static void OnSynchRemoveCompleted(RemoveResult removeResult)
466        {
467            var requestResult = removeResult.AsyncState as RequestResult;
468            if (requestResult == null)
469            {
470                LogToMonitor("Received OnSynchRemoveCompleted, but RequestResult object is missing. Discarding.");
471                return;
472            }
473
474            requestResult.Parse(removeResult);
475
476            // unblock WaitHandle in the synchronous method
477            requestResult.WaitHandle.Set();
478        }
479
480        #endregion
481
482        #region Statistic Methods
483
484        public long TotalBytesSentOnAllLinks()
485        {
486            return (long) linkmanager.GetAllLinkInformation().Sum(linkInformation => linkInformation.TotalBytesSent);
487        }
488
489        public long TotalBytesReceivedOnAllLinks()
490        {
491            return (long) linkmanager.GetAllLinkInformation().Sum(linkInformation => linkInformation.TotalBytesReceived);
492        }
493
494        #endregion
495
496        #region Log facility
497
498        /// <summary>
499        ///   To log the internal state in the Monitoring Software of P@play
500        /// </summary>
501        public void LogInternalState()
502        {
503            if (Dht != null)
504            {
505                Dht.LogInternalState();
506            }
507        }
508
509        private static void LogToMonitor(string sTextToLog)
510        {
511            if (P2PSettings.Default.Log2Monitor)
512                Log.Debug(sTextToLog);
513        }
514
515        #endregion
516    }
517}
Note: See TracBrowser for help on using the repository browser.