source: trunk/CrypP2P/Internal/P2PBase.cs @ 2020

Last change on this file since 2020 was 2020, checked in by Christian Geihe, 11 years ago

Added MySQLDHT as a new architecture to the P2P-Interface expert settings tab as well as all necessary dlls.

override-bad-extension: MySql.Data.dll
override-bad-extension: MySQLDHT.dll
override-bad-extension: MySQLDHT.dll.config
override-bad-extension: MySQLDHT.pdb

File size: 20.7 KB
Line 
1/*
2   Copyright 2010 Paul Lelgemann and Christian Arnold,
3                  University of Duisburg-Essen
4
5   Licensed under the Apache License, Version 2.0 (the "License");
6   you may not use this file except in compliance with the License.
7   You may obtain a copy of the License at
8
9       http://www.apache.org/licenses/LICENSE-2.0
10
11   Unless required by applicable law or agreed to in writing, software
12   distributed under the License is distributed on an "AS IS" BASIS,
13   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   See the License for the specific language governing permissions and
15   limitations under the License.
16*/
17
18using System;
19using System.Linq;
20using System.Text;
21using System.Threading;
22using Cryptool.PluginBase;
23using Cryptool.Plugins.PeerToPeer.Internal;
24using Gears4Net;
25using PeersAtPlay;
26using PeersAtPlay.Monitoring;
27using PeersAtPlay.P2PLink;
28using PeersAtPlay.P2PLink.SnalNG;
29using PeersAtPlay.P2POverlay;
30using PeersAtPlay.P2POverlay.Bootstrapper;
31using PeersAtPlay.P2POverlay.Bootstrapper.IrcBootstrapperV2;
32using PeersAtPlay.P2POverlay.Bootstrapper.LocalMachineBootstrapper;
33using PeersAtPlay.P2POverlay.FullMeshOverlay;
34using PeersAtPlay.P2PStorage.DHT;
35using PeersAtPlay.P2PStorage.FullMeshDHT;
36using PeersAtPlay.PapsClient;
37using PeersAtPlay.Util.Logging;
38using PeersAtPlay.P2POverlay.Chord;
39using PeersAtPlay.P2PStorage.MySqlDHT;
40
41/* TODO:
42 * - Delete UseNatTraversal-Flag and insert CertificateCheck and CertificateSetup
43 * - Testing asynchronous methods incl. EventHandlers
44 */
45
46namespace Cryptool.P2P.Internal
47{
48    /// <summary>
49    ///   Wrapper class to integrate peer@play environment into CrypTool.
50    ///   This class synchronizes asynchronous methods for easier usage in CT2.
51    /// </summary>
52    public class P2PBase
53    {
54        #region Variables
55
56        private AutoResetEvent systemJoined;
57        private readonly AutoResetEvent systemLeft;
58        private IBootstrapper bootstrapper;
59        private IP2PLinkManager linkmanager;
60        private P2POverlay overlay;
61        internal IDHT Dht;
62        internal IVersionedDHT VersionedDht;
63
64        /// <summary>
65        ///   True if system was successfully joined, false if system is COMPLETELY left
66        /// </summary>
67        public bool IsConnected { get; private set; }
68
69        /// <summary>
70        ///   True if the underlying peer to peer system has been fully initialized
71        /// </summary>
72        public bool IsInitialized { get; private set; }
73
74        #endregion
75
76        #region Delegates
77
78        public event P2PMessageReceived OnP2PMessageReceived;
79        public delegate void P2PMessageReceived(PeerId sourceAddr, byte[] data);
80
81        public event SystemJoined OnSystemJoined;
82        public delegate void SystemJoined();
83
84        public event SystemLeft OnSystemLeft;
85        public delegate void SystemLeft();
86
87        #endregion
88
89        public P2PBase()
90        {
91            IsConnected = false;
92            IsInitialized = false;
93
94            systemJoined = new AutoResetEvent(false);
95            systemLeft = new AutoResetEvent(false);
96        }
97
98        #region Basic P2P Methods (Init, Start, Stop)
99
100        /// <summary>
101        ///   Initializes the underlying peer-to-peer system with settings configured in P2PSettings. This step is required in order to be able to establish a connection.
102        /// </summary>
103        public void Initialize()
104        {
105            Scheduler scheduler = new STAScheduler("pap_snal");
106            Scheduler scheduler_2 = new STAScheduler("pap_mysql");
107
108            switch (P2PSettings.Default.LinkManager)
109            {
110                case P2PLinkManagerType.Snal:
111                    LogToMonitor("Init LinkMgr: Using NAT Traversal stuff");
112
113                    // NAT-Traversal stuff needs a different Snal-Version
114                    linkmanager = new Snal(scheduler);
115
116                    var settings = new PeersAtPlay.P2PLink.SnalNG.Settings();
117                    settings.LoadDefaults();
118                    settings.ConnectInternal = true;
119                    settings.LocalReceivingPort = P2PSettings.Default.LocalReceivingPort;
120                    settings.UseLocalAddressDetection = P2PSettings.Default.UseLocalAddressDetection;
121                    settings.NoDelay = false;
122                    settings.ReuseAddress = false;
123                    settings.UseNetworkMonitorServer = true;
124                    settings.CloseConnectionAfterPingTimeout = true;
125
126                    settings.FragmentMessages = true;
127                    settings.FragmentMessageSize = 10*1024;
128                       
129                    switch(P2PSettings.Default.TransportProtocol)
130                    {
131                        case P2PTransportProtocol.UDP:
132                            settings.TransportProtocol = TransportProtocol.UDP;
133                            break;
134                        case P2PTransportProtocol.TCP_UDP:
135                            settings.TransportProtocol = TransportProtocol.TCP_UDP;
136                            break;
137                        default:
138                            settings.TransportProtocol = TransportProtocol.TCP;
139                            break;
140                    }
141
142                    linkmanager.Settings = settings;
143                    linkmanager.ApplicationType = ApplicationType.CrypTool;
144
145                    break;
146                default:
147                    throw new NotImplementedException();
148            }
149
150            switch (P2PSettings.Default.Bootstrapper)
151            {
152                case P2PBootstrapperType.LocalMachineBootstrapper:
153                    // LocalMachineBootstrapper = only local connection (runs only on one machine)
154                    bootstrapper = new LocalMachineBootstrapper();
155                    break;
156                case P2PBootstrapperType.IrcBootstrapper:
157                    PeersAtPlay.P2POverlay.Bootstrapper.IrcBootstrapperV2.Settings.DelaySymmetricResponse = true;
158                    PeersAtPlay.P2POverlay.Bootstrapper.IrcBootstrapperV2.Settings.IncludeSymmetricResponse = false;
159                    PeersAtPlay.P2POverlay.Bootstrapper.IrcBootstrapperV2.Settings.UsePeerCache = false;
160
161                    bootstrapper = new IrcBootstrapper(scheduler);
162                    break;
163                default:
164                    throw new NotImplementedException();
165            }
166
167            try
168            {
169                switch (P2PSettings.Default.Architecture)
170                {
171                    case P2PArchitecture.FullMesh:
172                        overlay = new FullMeshOverlay(scheduler);
173                        Dht = new FullMeshDHT(scheduler);
174                        break;
175                    case P2PArchitecture.Chord:
176                        overlay = new ChordNGCore(scheduler);
177                        Dht = (IDHT) overlay;
178                        break;
179                    case P2PArchitecture.Server:
180                        PeersAtPlay.PapsClient.Properties.Settings.Default.ServerHost = P2PSettings.Default.ServerHost;
181                        PeersAtPlay.PapsClient.Properties.Settings.Default.ServerPort = P2PSettings.Default.ServerPort;
182                        bootstrapper = new LocalMachineBootstrapper();
183                        overlay = new PapsClientOverlay();
184                        Dht = new PapsClientDht(scheduler_2);
185                        break;
186                    case P2PArchitecture.SQLDB:
187                        Dht = new MySqlDHT(scheduler_2);
188                        break;
189                    default:
190                        throw new NotImplementedException();
191                }
192            }
193            catch(Exception e)
194            {
195                P2PManager.GuiLogMessage("Error initializing P2P network: " + e.Message, NotificationLevel.Error);
196                return;
197            }
198
199            if (overlay != null)
200            {
201                overlay.MessageReceived += OverlayMessageReceived;
202            }
203            Dht.SystemJoined += OnDhtSystemJoined;
204            Dht.SystemLeft += OnDhtSystemLeft;
205
206            VersionedDht = (IVersionedDHT) Dht;
207
208            P2PManager.GuiLogMessage("Initializing DHT with world name " + P2PSettings.Default.WorldName,
209                                        NotificationLevel.Info);
210            IsInitialized = true;
211            Dht.Initialize(P2PSettings.Default.PeerName, string.Empty, P2PSettings.Default.WorldName, overlay,
212                           bootstrapper, linkmanager, null);
213        }
214
215        /// <summary>
216        ///   Starts the P2P System. When the given P2P world doesn't exist yet,
217        ///   inclusive creating the and bootstrapping to the P2P network.
218        ///   In either case joining the P2P world.
219        ///   This synchronized method returns true not before the peer has
220        ///   successfully joined the network (this may take one or two minutes).
221        /// </summary>
222        /// <exception cref = "InvalidOperationException">When the peer-to-peer system has not been initialized.
223        /// After validating the settings, this can be done by calling Initialize().</exception>
224        /// <returns>True, if the peer has completely joined the p2p network</returns>
225        public bool SynchStart()
226        {
227            if (!IsInitialized)
228            {
229                throw new InvalidOperationException("Peer-to-peer is not initialized.");
230            }
231
232            if (IsConnected)
233            {
234                return true;
235            }
236
237            try
238            {
239                Dht.BeginStart(BeginStartEventHandler);
240
241                // Wait for event SystemJoined. When it's invoked, the peer completely joined the P2P system
242                systemJoined.WaitOne();
243                P2PManager.GuiLogMessage("System join process ended.", NotificationLevel.Debug);
244            }
245            catch (Exception e)
246            {
247                e.GetBaseException();
248            }
249
250            return true;
251        }
252
253        private void BeginStartEventHandler(DHTEventArgs eventArgs)
254        {
255            P2PManager.GuiLogMessage("Received DHTEventArgs: " + eventArgs + ", state: " + eventArgs.State, NotificationLevel.Debug);
256        }
257
258        /// <summary>
259        ///   Disconnects from the peer-to-peer system.
260        /// </summary>
261        /// <returns>True, if the peer has completely left the p2p network</returns>
262        public bool SynchStop()
263        {
264            if (Dht == null) return false;
265
266            Dht.BeginStop(null);
267
268            if (IsConnected)
269                systemLeft.WaitOne();
270
271            systemJoined.Reset();
272            systemLeft.Reset();
273
274            return true;
275        }
276
277        #endregion
278
279        #region Peer related method (GetPeerId, Send message to peer)
280
281        /// <summary>
282        ///   Get PeerName of the actual peer
283        /// </summary>
284        /// <param name = "sPeerName">out: additional peer information UserName on LinkManager</param>
285        /// <returns>PeerID as a String</returns>
286        public PeerId GetPeerId(out string sPeerName)
287        {
288            sPeerName = linkmanager.UserName;
289            return new PeerId(overlay.LocalAddress);
290        }
291
292        /// <summary>
293        ///   Construct PeerId object for a specific byte[] id
294        /// </summary>
295        /// <param name = "byteId">overlay address as byte array</param>
296        /// <returns>corresponding PeerId for given byte[] id</returns>
297        public PeerId GetPeerId(byte[] byteId)
298        {
299            LogToMonitor("GetPeerID: Converting byte[] to PeerId-Object");
300            return new PeerId(overlay.GetAddress(byteId));
301        }
302
303        // overlay.LocalAddress = Overlay-Peer-Address/Names
304        public void SendToPeer(byte[] data, byte[] destinationPeer)
305        {
306            // get stack size of the pap use-data and add own use data (for optimizing Stack size)
307            var realStackSize = overlay.GetHeaderSize() + data.Length;
308
309            var stackData = new ByteStack(realStackSize);
310            stackData.Push(data);
311
312            var destinationAddr = overlay.GetAddress(destinationPeer);
313            var overlayMsg = new OverlayMessage(MessageReceiverType.P2PBase, 
314                                                overlay.LocalAddress, destinationAddr, stackData);
315
316            overlay.Send(overlayMsg);
317        }
318
319        private void OverlayMessageReceived(object sender, OverlayMessageEventArgs e)
320        {
321            if (OnP2PMessageReceived == null) return;
322
323            var pid = new PeerId(e.Message.Source);
324            /* You have to fire this event asynchronous, because the main
325                 * thread will be stopped in this wrapper class for synchronizing
326                 * the asynchronous stuff (AutoResetEvent) --> so this could run
327                 * into a deadlock, when you fire this event synchronous (normal Invoke)
328                 * ATTENTION: This could change the invocation order!!! In my case
329                              no problem, but maybe in future cases... */
330
331            // TODO: not safe: The delegate must have only one target
332            // OnP2PMessageReceived.BeginInvoke(pid, e.Message.Data.PopBytes(e.Message.Data.CurrentStackSize), null, null);
333
334            foreach (var del in OnP2PMessageReceived.GetInvocationList())
335            {
336                var data = e.Message.Data.ToArray();
337                if (e.Message.Data.CurrentStackSize != 0)
338                    data = e.Message.Data.PopBytes(e.Message.Data.CurrentStackSize);
339
340                del.DynamicInvoke(pid, data);
341            }
342        }
343
344        #endregion
345
346        #region Event Handling (System Joined, Left and Message Received)
347
348        private void OnDhtSystemJoined(object sender, EventArgs e)
349        {
350            IsConnected = true;
351
352            if (OnSystemJoined != null)
353                OnSystemJoined();
354
355            systemJoined.Set();
356        }
357
358        private void OnDhtSystemLeft(object sender, SystemLeftEventArgs e)
359        {
360            if (OnSystemLeft != null)
361                OnSystemLeft();
362
363            IsConnected = false;
364            IsInitialized = false;
365
366            // Allow new connection to start and check for waiting / blocked tasks
367            // TODO reset running ConnectionWorkers?
368            systemLeft.Set();
369            systemJoined.Set();
370
371            LogToMonitor("CrypP2P left the system.");
372        }
373
374        #endregion
375
376        #region Synchronous Methods + their Callbacks
377
378        /// <summary>
379        ///   Stores a value in the DHT at the given key
380        /// </summary>
381        /// <param name = "key">Key of DHT Entry</param>
382        /// <param name = "value">Value of DHT Entry</param>
383        /// <returns>True, when storing is completed!</returns>
384        public RequestResult SynchStore(string key, string value)
385        {
386            return SynchStore(key, Encoding.UTF8.GetBytes(value));
387        }
388
389        /// <summary>
390        ///   Stores a value in the DHT at the given key
391        /// </summary>
392        /// <param name = "key">Key of DHT Entry</param>
393        /// <param name = "data">Value of DHT Entry</param>
394        /// <returns>True, when storing is completed!</returns>
395        public RequestResult SynchStore(string key, byte[] data)
396        {
397            var autoResetEvent = new AutoResetEvent(false);
398
399            // LogToMonitor("testcrash" + Encoding.UTF8.GetString(new byte[5000]));
400            LogToMonitor("Begin: SynchStore. Key: " + key + ", " + (data != null ? data.Length : 0) + " bytes");
401
402            var requestResult = new RequestResult {WaitHandle = autoResetEvent, Key = key, Data = data};
403            VersionedDht.Store(OnSynchStoreCompleted, key, data, requestResult);
404
405            // blocking till response
406            autoResetEvent.WaitOne();
407
408            LogToMonitor("End: SynchStore. Key: " + key + ". Status: " + requestResult.Status);
409
410            return requestResult;
411        }
412
413        /// <summary>
414        ///   Callback for a the synchronized store method
415        /// </summary>
416        /// <param name = "storeResult">retrieved data container</param>
417        private static void OnSynchStoreCompleted(StoreResult storeResult)
418        {
419            var requestResult = storeResult.AsyncState as RequestResult;
420            if (requestResult == null)
421            {
422                LogToMonitor("Received OnSynchStoreCompleted, but RequestResult object is missing. Discarding.");
423                return;
424            }
425
426            requestResult.Parse(storeResult);
427
428            // unblock WaitHandle in the synchronous method
429            requestResult.WaitHandle.Set();
430        }
431
432        /// <summary>
433        ///   Get the value of the given DHT Key or null, if it doesn't exist.
434        /// </summary>
435        /// <param name = "key">Key of DHT Entry</param>
436        /// <returns>Value of DHT Entry</returns>
437        public RequestResult SynchRetrieve(string key)
438        {
439            LogToMonitor("Begin: SynchRetrieve. Key: " + key);
440
441            var autoResetEvent = new AutoResetEvent(false);
442            var requestResult = new RequestResult {WaitHandle = autoResetEvent, Key = key};
443
444            Dht.Retrieve(OnSynchRetrieveCompleted, key, requestResult);
445
446            // blocking till response
447            autoResetEvent.WaitOne();
448
449            LogToMonitor("End: SynchRetrieve. Key: " + key + ". Status: " + requestResult.Status);
450
451            return requestResult;
452        }
453
454        /// <summary>
455        ///   Callback for a the synchronized retrieval method
456        /// </summary>
457        /// <param name = "retrieveResult"></param>
458        private static void OnSynchRetrieveCompleted(RetrieveResult retrieveResult)
459        {
460            var requestResult = retrieveResult.AsyncState as RequestResult;
461            if (requestResult == null)
462            {
463                LogToMonitor("Received OnSynchRetrieveCompleted, but RequestResult object is missing. Discarding.");
464                return;
465            }
466
467            requestResult.Parse(retrieveResult);
468
469            // unblock WaitHandle in the synchronous method
470            requestResult.WaitHandle.Set();
471        }
472
473        /// <summary>
474        ///   Removes a key/value pair out of the DHT
475        /// </summary>
476        /// <param name = "key">Key of the DHT Entry</param>
477        /// <returns>True, when removing is completed!</returns>
478        public RequestResult SynchRemove(string key)
479        {
480            LogToMonitor("Begin SynchRemove. Key: " + key);
481
482            var autoResetEvent = new AutoResetEvent(false);
483            var requestResult = new RequestResult { WaitHandle = autoResetEvent, Key = key };
484            VersionedDht.Remove(OnSynchRemoveCompleted, key, requestResult);
485
486            // blocking till response
487            autoResetEvent.WaitOne();
488
489            LogToMonitor("End: SynchRemove. Key: " + key + ". Status: " + requestResult.Status);
490
491            return requestResult;
492        }
493
494        /// <summary>
495        ///   Callback for a the synchronized remove method
496        /// </summary>
497        /// <param name = "removeResult"></param>
498        private static void OnSynchRemoveCompleted(RemoveResult removeResult)
499        {
500            var requestResult = removeResult.AsyncState as RequestResult;
501            if (requestResult == null)
502            {
503                LogToMonitor("Received OnSynchRemoveCompleted, but RequestResult object is missing. Discarding.");
504                return;
505            }
506
507            requestResult.Parse(removeResult);
508
509            // unblock WaitHandle in the synchronous method
510            requestResult.WaitHandle.Set();
511        }
512
513        #endregion
514
515        #region Statistic Methods
516
517        public long TotalBytesSentOnAllLinks()
518        {
519            return (long) linkmanager.GetAllLinkInformation().Sum(linkInformation => linkInformation.TotalBytesSent);
520        }
521
522        public long TotalBytesReceivedOnAllLinks()
523        {
524            return (long) linkmanager.GetAllLinkInformation().Sum(linkInformation => linkInformation.TotalBytesReceived);
525        }
526
527        #endregion
528
529        #region Log facility
530
531        /// <summary>
532        ///   To log the internal state in the Monitoring Software of P@play
533        /// </summary>
534        public void LogInternalState()
535        {
536            if (Dht != null)
537            {
538                Dht.LogInternalState();
539            }
540        }
541
542        private static void LogToMonitor(string sTextToLog)
543        {
544            if (P2PSettings.Default.Log2Monitor)
545                Log.Debug(sTextToLog);
546        }
547
548        #endregion
549    }
550}
Note: See TracBrowser for help on using the repository browser.