source: trunk/CrypP2P/Internal/P2PBase.cs @ 2269

Last change on this file since 2269 was 2269, checked in by Matthäus Wander, 11 years ago
  • Replaced peers@play DLLs with pap rev 626
  • This commit breaks the nightly build. Work in progress...
  • moved AnyCPU assemblies from x86/x64 to generic AppReferences directory
  • moved NativeCrypto project to subdirectory, see #164

override-bad-extension: MySql.data.dll
override-bad-extension: PacketDotNet.dll
override-bad-extension: SharpPcap.dll

File size: 20.4 KB
Line 
1/*
2   Copyright 2010 Paul Lelgemann and Christian Arnold,
3                  University of Duisburg-Essen
4
5   Licensed under the Apache License, Version 2.0 (the "License");
6   you may not use this file except in compliance with the License.
7   You may obtain a copy of the License at
8
9       http://www.apache.org/licenses/LICENSE-2.0
10
11   Unless required by applicable law or agreed to in writing, software
12   distributed under the License is distributed on an "AS IS" BASIS,
13   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   See the License for the specific language governing permissions and
15   limitations under the License.
16*/
17
18using System;
19using System.Linq;
20using System.Text;
21using System.Threading;
22using Cryptool.PluginBase;
23using Cryptool.Plugins.PeerToPeer.Internal;
24using Gears4Net;
25using PeersAtPlay;
26using PeersAtPlay.Monitoring;
27using PeersAtPlay.P2PLink;
28using PeersAtPlay.P2PLink.SnalNG;
29using PeersAtPlay.P2POverlay;
30using PeersAtPlay.P2POverlay.Bootstrapper;
31using PeersAtPlay.P2POverlay.Bootstrapper.DnsBootstrapper;
32using PeersAtPlay.P2POverlay.Bootstrapper.IrcBootstrapperV2;
33using PeersAtPlay.P2POverlay.Bootstrapper.LocalMachineBootstrapper;
34using PeersAtPlay.P2POverlay.FullMeshOverlay;
35using PeersAtPlay.P2PStorage.DHT;
36using PeersAtPlay.P2PStorage.FullMeshDHT;
37using PeersAtPlay.PapsClient;
38using PeersAtPlay.Util.Logging;
39using PeersAtPlay.P2POverlay.Chord;
40using PeersAtPlay.P2PStorage.MySqlDHT;
41using PeersAtPlay.P2PStorage.WebDHT;
42
43/* TODO:
44 * - Delete UseNatTraversal-Flag and insert CertificateCheck and CertificateSetup
45 * - Testing asynchronous methods incl. EventHandlers
46 */
47
48namespace Cryptool.P2P.Internal
49{
50    /// <summary>
51    ///   Wrapper class to integrate peer@play environment into CrypTool.
52    ///   This class synchronizes asynchronous methods for easier usage in CT2.
53    /// </summary>
54    public class P2PBase
55    {
56        #region Variables
57
58        private AutoResetEvent systemJoined;
59        private readonly AutoResetEvent systemLeft;
60        private IBootstrapper bootstrapper;
61        private IP2PLinkManager linkmanager;
62        private P2POverlay overlay;
63        internal IDHT Dht;
64        internal IVersionedDHT VersionedDht;
65
66        /// <summary>
67        ///   True if system was successfully joined, false if system is COMPLETELY left
68        /// </summary>
69        public bool IsConnected { get; private set; }
70
71        /// <summary>
72        ///   True if the underlying peer to peer system has been fully initialized
73        /// </summary>
74        public bool IsInitialized { get; private set; }
75
76        #endregion
77
78        #region Delegates
79
80        public event P2PMessageReceived OnP2PMessageReceived;
81        public delegate void P2PMessageReceived(PeerId sourceAddr, byte[] data);
82
83        public event SystemJoined OnSystemJoined;
84        public delegate void SystemJoined();
85
86        public event SystemLeft OnSystemLeft;
87        public delegate void SystemLeft();
88
89        #endregion
90
91        public P2PBase()
92        {
93            IsConnected = false;
94            IsInitialized = false;
95
96            systemJoined = new AutoResetEvent(false);
97            systemLeft = new AutoResetEvent(false);
98        }
99
100        #region Basic P2P Methods (Init, Start, Stop)
101
102        /// <summary>
103        ///   Initializes the underlying peer-to-peer system with settings configured in P2PSettings. This step is required in order to be able to establish a connection.
104        /// </summary>
105        public void Initialize()
106        {
107            Scheduler scheduler = new STAScheduler("pap_snal");
108            Scheduler scheduler_2 = new STAScheduler("pap_mysql");
109
110            switch (P2PSettings.Default.LinkManager)
111            {
112                case P2PLinkManagerType.Snal:
113                    LogToMonitor("Init LinkMgr: Using NAT Traversal stuff");
114
115                    // NAT-Traversal stuff needs a different Snal-Version
116                    linkmanager = new Snal(scheduler);
117
118                    var settings = new PeersAtPlay.P2PLink.SnalNG.Settings();
119                    settings.LoadDefaults();
120                    settings.ConnectInternal = true;
121                    settings.LocalReceivingPort = P2PSettings.Default.LocalReceivingPort;
122                    settings.UseLocalAddressDetection = P2PSettings.Default.UseLocalAddressDetection;
123                    settings.NoDelay = false;
124                    settings.ReuseAddress = false;
125                    settings.UseNetworkMonitorServer = true;
126                    settings.CloseConnectionAfterPingTimeout = true;
127
128                    settings.FragmentMessages = true;
129                    settings.FragmentMessageSize = 10*1024;
130
131                    linkmanager.Settings = settings;
132                    linkmanager.ApplicationType = ApplicationType.CrypTool;
133
134                    break;
135                default:
136                    throw new NotImplementedException();
137            }
138
139            switch (P2PSettings.Default.Bootstrapper)
140            {
141                case P2PBootstrapperType.LocalMachineBootstrapper:
142                    // LocalMachineBootstrapper = only local connection (runs only on one machine)
143                    bootstrapper = new LocalMachineBootstrapper();
144                    break;
145                case P2PBootstrapperType.IrcBootstrapper:
146                    PeersAtPlay.P2POverlay.Bootstrapper.IrcBootstrapperV2.Settings.DelaySymmetricResponse = true;
147                    PeersAtPlay.P2POverlay.Bootstrapper.IrcBootstrapperV2.Settings.IncludeSymmetricResponse = false;
148                    PeersAtPlay.P2POverlay.Bootstrapper.IrcBootstrapperV2.Settings.UsePeerCache = false;
149
150                    bootstrapper = new IrcBootstrapper(scheduler);
151                    break;
152                case P2PBootstrapperType.DnsBootstrapper:
153                    bootstrapper = new DnsBootstrapper();
154                    break;
155                default:
156                    throw new NotImplementedException();
157            }
158
159            try
160            {
161                switch (P2PSettings.Default.Architecture)
162                {
163                    case P2PArchitecture.FullMesh:
164                        overlay = new FullMeshOverlay(scheduler);
165                        Dht = new FullMeshDHT(scheduler);
166                        break;
167                    case P2PArchitecture.Chord:
168                        overlay = new ChordNGCore(scheduler);
169                        Dht = (IDHT) overlay;
170                        break;
171                    case P2PArchitecture.Server:
172                        PeersAtPlay.PapsClient.Properties.Settings.Default.ServerHost = P2PSettings.Default.ServerHost;
173                        PeersAtPlay.PapsClient.Properties.Settings.Default.ServerPort = P2PSettings.Default.ServerPort;
174                        bootstrapper = new LocalMachineBootstrapper();
175                        overlay = new PapsClientOverlay();
176                        Dht = new PapsClientDht(scheduler_2);
177                        break;
178                    case P2PArchitecture.SQLDB:
179                        Dht = new MySqlDHT(scheduler_2);
180                        break;
181                    case P2PArchitecture.WebDHT:
182                        Dht = new WebDHT(scheduler_2);
183                        break;
184                    default:
185                        throw new NotImplementedException();
186                }
187            }
188            catch(Exception e)
189            {
190                P2PManager.GuiLogMessage("Error initializing P2P network: " + e.Message, NotificationLevel.Error);
191                return;
192            }
193
194            if (overlay != null)
195            {
196                overlay.MessageReceived += OverlayMessageReceived;
197            }
198            Dht.SystemJoined += OnDhtSystemJoined;
199            Dht.SystemLeft += OnDhtSystemLeft;
200
201            VersionedDht = (IVersionedDHT) Dht;
202
203            P2PManager.GuiLogMessage("Initializing DHT with world name " + P2PSettings.Default.WorldName,
204                                        NotificationLevel.Info);
205            IsInitialized = true;
206            Dht.Initialize(P2PSettings.Default.PeerName, string.Empty, P2PSettings.Default.WorldName, overlay,
207                           bootstrapper, linkmanager, null);
208        }
209
210        /// <summary>
211        ///   Starts the P2P System. When the given P2P world doesn't exist yet,
212        ///   inclusive creating the and bootstrapping to the P2P network.
213        ///   In either case joining the P2P world.
214        ///   This synchronized method returns true not before the peer has
215        ///   successfully joined the network (this may take one or two minutes).
216        /// </summary>
217        /// <exception cref = "InvalidOperationException">When the peer-to-peer system has not been initialized.
218        /// After validating the settings, this can be done by calling Initialize().</exception>
219        /// <returns>True, if the peer has completely joined the p2p network</returns>
220        public bool SynchStart()
221        {
222            if (!IsInitialized)
223            {
224                throw new InvalidOperationException("Peer-to-peer is not initialized.");
225            }
226
227            if (IsConnected)
228            {
229                return true;
230            }
231
232            try
233            {
234                Dht.BeginStart(BeginStartEventHandler);
235
236                // Wait for event SystemJoined. When it's invoked, the peer completely joined the P2P system
237                systemJoined.WaitOne();
238                P2PManager.GuiLogMessage("System join process ended.", NotificationLevel.Debug);
239            }
240            catch (Exception e)
241            {
242                e.GetBaseException();
243            }
244
245            return true;
246        }
247
248        private void BeginStartEventHandler(DHTEventArgs eventArgs)
249        {
250            P2PManager.GuiLogMessage("Received DHTEventArgs: " + eventArgs + ", state: " + eventArgs.State, NotificationLevel.Debug);
251        }
252
253        /// <summary>
254        ///   Disconnects from the peer-to-peer system.
255        /// </summary>
256        /// <returns>True, if the peer has completely left the p2p network</returns>
257        public bool SynchStop()
258        {
259            if (Dht == null) return false;
260
261            Dht.BeginStop(null);
262
263            if (IsConnected)
264                systemLeft.WaitOne();
265
266            systemJoined.Reset();
267            systemLeft.Reset();
268
269            return true;
270        }
271
272        #endregion
273
274        #region Peer related method (GetPeerId, Send message to peer)
275
276        /// <summary>
277        ///   Get PeerName of the actual peer
278        /// </summary>
279        /// <param name = "sPeerName">out: additional peer information UserName on LinkManager</param>
280        /// <returns>PeerID as a String</returns>
281        public PeerId GetPeerId(out string sPeerName)
282        {
283            sPeerName = linkmanager.UserName;
284            return new PeerId(overlay.LocalAddress);
285        }
286
287        /// <summary>
288        ///   Construct PeerId object for a specific byte[] id
289        /// </summary>
290        /// <param name = "byteId">overlay address as byte array</param>
291        /// <returns>corresponding PeerId for given byte[] id</returns>
292        public PeerId GetPeerId(byte[] byteId)
293        {
294            LogToMonitor("GetPeerID: Converting byte[] to PeerId-Object");
295            return new PeerId(overlay.GetAddress(byteId));
296        }
297
298        // overlay.LocalAddress = Overlay-Peer-Address/Names
299        public void SendToPeer(byte[] data, byte[] destinationPeer)
300        {
301            // get stack size of the pap use-data and add own use data (for optimizing Stack size)
302            var realStackSize = overlay.GetHeaderSize() + data.Length;
303
304            var stackData = new ByteStack(realStackSize);
305            stackData.Push(data);
306
307            var destinationAddr = overlay.GetAddress(destinationPeer);
308            var overlayMsg = new OverlayMessage(MessageReceiverType.P2PBase, 
309                                                overlay.LocalAddress, destinationAddr, stackData);
310
311            overlay.Send(overlayMsg);
312        }
313
314        private void OverlayMessageReceived(object sender, OverlayMessageEventArgs e)
315        {
316            if (OnP2PMessageReceived == null) return;
317
318            var pid = new PeerId(e.Message.Source);
319            /* You have to fire this event asynchronous, because the main
320                 * thread will be stopped in this wrapper class for synchronizing
321                 * the asynchronous stuff (AutoResetEvent) --> so this could run
322                 * into a deadlock, when you fire this event synchronous (normal Invoke)
323                 * ATTENTION: This could change the invocation order!!! In my case
324                              no problem, but maybe in future cases... */
325
326            // TODO: not safe: The delegate must have only one target
327            // OnP2PMessageReceived.BeginInvoke(pid, e.Message.Data.PopBytes(e.Message.Data.CurrentStackSize), null, null);
328
329            foreach (var del in OnP2PMessageReceived.GetInvocationList())
330            {
331                var data = e.Message.Data.ToArray();
332                if (e.Message.Data.CurrentStackSize != 0)
333                    data = e.Message.Data.PopBytes(e.Message.Data.CurrentStackSize);
334
335                del.DynamicInvoke(pid, data);
336            }
337        }
338
339        #endregion
340
341        #region Event Handling (System Joined, Left and Message Received)
342
343        private void OnDhtSystemJoined(object sender, EventArgs e)
344        {
345            IsConnected = true;
346
347            if (OnSystemJoined != null)
348                OnSystemJoined();
349
350            systemJoined.Set();
351        }
352
353        private void OnDhtSystemLeft(object sender, SystemLeftEventArgs e)
354        {
355            IsConnected = false;
356            IsInitialized = false;
357
358            // Allow new connection to start and check for waiting / blocked tasks
359            // TODO reset running ConnectionWorkers?
360            systemLeft.Set();
361            systemJoined.Set();
362
363            LogToMonitor("CrypP2P left the system.");
364
365            if (OnSystemLeft != null)
366                OnSystemLeft();
367        }
368
369        #endregion
370
371        #region Synchronous Methods + their Callbacks
372
373        /// <summary>
374        ///   Stores a value in the DHT at the given key
375        /// </summary>
376        /// <param name = "key">Key of DHT Entry</param>
377        /// <param name = "value">Value of DHT Entry</param>
378        /// <returns>True, when storing is completed!</returns>
379        public RequestResult SynchStore(string key, string value)
380        {
381            return SynchStore(key, Encoding.UTF8.GetBytes(value));
382        }
383
384        /// <summary>
385        ///   Stores a value in the DHT at the given key
386        /// </summary>
387        /// <param name = "key">Key of DHT Entry</param>
388        /// <param name = "data">Value of DHT Entry</param>
389        /// <returns>True, when storing is completed!</returns>
390        public RequestResult SynchStore(string key, byte[] data)
391        {
392            var autoResetEvent = new AutoResetEvent(false);
393
394            // LogToMonitor("testcrash" + Encoding.UTF8.GetString(new byte[5000]));
395            LogToMonitor("Begin: SynchStore. Key: " + key + ", " + (data != null ? data.Length : 0) + " bytes");
396
397            var requestResult = new RequestResult {WaitHandle = autoResetEvent, Key = key, Data = data};
398            VersionedDht.Store(OnSynchStoreCompleted, key, data, requestResult);
399
400            // blocking till response
401            autoResetEvent.WaitOne();
402
403            LogToMonitor("End: SynchStore. Key: " + key + ". Status: " + requestResult.Status);
404
405            return requestResult;
406        }
407
408        /// <summary>
409        ///   Callback for a the synchronized store method
410        /// </summary>
411        /// <param name = "storeResult">retrieved data container</param>
412        private static void OnSynchStoreCompleted(StoreResult storeResult)
413        {
414            var requestResult = storeResult.AsyncState as RequestResult;
415            if (requestResult == null)
416            {
417                LogToMonitor("Received OnSynchStoreCompleted, but RequestResult object is missing. Discarding.");
418                return;
419            }
420
421            requestResult.Parse(storeResult);
422
423            // unblock WaitHandle in the synchronous method
424            requestResult.WaitHandle.Set();
425        }
426
427        /// <summary>
428        ///   Get the value of the given DHT Key or null, if it doesn't exist.
429        /// </summary>
430        /// <param name = "key">Key of DHT Entry</param>
431        /// <returns>Value of DHT Entry</returns>
432        public RequestResult SynchRetrieve(string key)
433        {
434            LogToMonitor("Begin: SynchRetrieve. Key: " + key);
435
436            var autoResetEvent = new AutoResetEvent(false);
437            var requestResult = new RequestResult {WaitHandle = autoResetEvent, Key = key};
438
439            Dht.Retrieve(OnSynchRetrieveCompleted, key, requestResult);
440
441            // blocking till response
442            autoResetEvent.WaitOne();
443
444            LogToMonitor("End: SynchRetrieve. Key: " + key + ". Status: " + requestResult.Status);
445
446            return requestResult;
447        }
448
449        /// <summary>
450        ///   Callback for a the synchronized retrieval method
451        /// </summary>
452        /// <param name = "retrieveResult"></param>
453        private static void OnSynchRetrieveCompleted(RetrieveResult retrieveResult)
454        {
455            var requestResult = retrieveResult.AsyncState as RequestResult;
456            if (requestResult == null)
457            {
458                LogToMonitor("Received OnSynchRetrieveCompleted, but RequestResult object is missing. Discarding.");
459                return;
460            }
461
462            requestResult.Parse(retrieveResult);
463
464            // unblock WaitHandle in the synchronous method
465            requestResult.WaitHandle.Set();
466        }
467
468        /// <summary>
469        ///   Removes a key/value pair out of the DHT
470        /// </summary>
471        /// <param name = "key">Key of the DHT Entry</param>
472        /// <returns>True, when removing is completed!</returns>
473        public RequestResult SynchRemove(string key)
474        {
475            LogToMonitor("Begin SynchRemove. Key: " + key);
476
477            var autoResetEvent = new AutoResetEvent(false);
478            var requestResult = new RequestResult { WaitHandle = autoResetEvent, Key = key };
479            VersionedDht.Remove(OnSynchRemoveCompleted, key, requestResult);
480
481            // blocking till response
482            autoResetEvent.WaitOne();
483
484            LogToMonitor("End: SynchRemove. Key: " + key + ". Status: " + requestResult.Status);
485
486            return requestResult;
487        }
488
489        /// <summary>
490        ///   Callback for a the synchronized remove method
491        /// </summary>
492        /// <param name = "removeResult"></param>
493        private static void OnSynchRemoveCompleted(RemoveResult removeResult)
494        {
495            var requestResult = removeResult.AsyncState as RequestResult;
496            if (requestResult == null)
497            {
498                LogToMonitor("Received OnSynchRemoveCompleted, but RequestResult object is missing. Discarding.");
499                return;
500            }
501
502            requestResult.Parse(removeResult);
503
504            // unblock WaitHandle in the synchronous method
505            requestResult.WaitHandle.Set();
506        }
507
508        #endregion
509
510        #region Statistic Methods
511
512        public long TotalBytesSentOnAllLinks()
513        {
514            return (long) linkmanager.GetAllLinkInformation().Sum(linkInformation => linkInformation.TotalBytesSent);
515        }
516
517        public long TotalBytesReceivedOnAllLinks()
518        {
519            return (long) linkmanager.GetAllLinkInformation().Sum(linkInformation => linkInformation.TotalBytesReceived);
520        }
521
522        #endregion
523
524        #region Log facility
525
526        /// <summary>
527        ///   To log the internal state in the Monitoring Software of P@play
528        /// </summary>
529        public void LogInternalState()
530        {
531            if (Dht != null)
532            {
533                Dht.LogInternalState();
534            }
535        }
536
537        private static void LogToMonitor(string sTextToLog)
538        {
539            if (P2PSettings.Default.Log2Monitor)
540                Log.Debug(sTextToLog);
541        }
542
543        #endregion
544    }
545}
Note: See TracBrowser for help on using the repository browser.