source: trunk/CrypP2P/Internal/P2PBase.cs @ 2360

Last change on this file since 2360 was 2360, checked in by Matthäus Wander, 11 years ago

removed SQLDB from trunk

File size: 20.3 KB
Line 
1/*
2   Copyright 2010 Paul Lelgemann and Christian Arnold,
3                  University of Duisburg-Essen
4
5   Licensed under the Apache License, Version 2.0 (the "License");
6   you may not use this file except in compliance with the License.
7   You may obtain a copy of the License at
8
9       http://www.apache.org/licenses/LICENSE-2.0
10
11   Unless required by applicable law or agreed to in writing, software
12   distributed under the License is distributed on an "AS IS" BASIS,
13   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   See the License for the specific language governing permissions and
15   limitations under the License.
16*/
17
18using System;
19using System.Linq;
20using System.Text;
21using System.Threading;
22using Cryptool.PluginBase;
23using Cryptool.Plugins.PeerToPeer.Internal;
24using Gears4Net;
25using PeersAtPlay;
26using PeersAtPlay.Monitoring;
27using PeersAtPlay.P2PLink;
28using PeersAtPlay.P2PLink.SnalNG;
29using PeersAtPlay.P2POverlay;
30using PeersAtPlay.P2POverlay.Bootstrapper;
31using PeersAtPlay.P2POverlay.Bootstrapper.DnsBootstrapper;
32using PeersAtPlay.P2POverlay.Bootstrapper.IrcBootstrapperV2;
33using PeersAtPlay.P2POverlay.Bootstrapper.LocalMachineBootstrapper;
34using PeersAtPlay.P2POverlay.FullMeshOverlay;
35using PeersAtPlay.P2PStorage.DHT;
36using PeersAtPlay.P2PStorage.FullMeshDHT;
37using PeersAtPlay.PapsClient;
38using PeersAtPlay.Util.Logging;
39using PeersAtPlay.P2POverlay.Chord;
40using PeersAtPlay.P2PStorage.WebDHT;
41
42/* TODO:
43 * - Delete UseNatTraversal-Flag and insert CertificateCheck and CertificateSetup
44 * - Testing asynchronous methods incl. EventHandlers
45 */
46
47namespace Cryptool.P2P.Internal
48{
49    /// <summary>
50    ///   Wrapper class to integrate peer@play environment into CrypTool.
51    ///   This class synchronizes asynchronous methods for easier usage in CT2.
52    /// </summary>
53    public class P2PBase
54    {
55        #region Variables
56
/// <summary>
///   Signalled by OnDhtSystemJoined, and also by OnDhtSystemLeft so that a pending
///   SynchStart is released. Only assigned in the constructor.
/// </summary>
private readonly AutoResetEvent systemJoined;

/// <summary>
///   Signalled by OnDhtSystemLeft; SynchStop waits on it while the peer is still connected.
/// </summary>
private readonly AutoResetEvent systemLeft;

// Components created by Initialize() according to P2PSettings.
private IBootstrapper bootstrapper;
private IP2PLinkManager linkmanager;
// Not assigned by Initialize() when the WebDHT architecture is configured.
private P2POverlay overlay;
internal IDHT Dht;
// Same object as Dht, cast to IVersionedDHT in Initialize().
internal IVersionedDHT VersionedDht;

/// <summary>
///   True if system was successfully joined, false if system is COMPLETELY left
/// </summary>
public bool IsConnected { get; private set; }

/// <summary>
///   True if the underlying peer to peer system has been fully initialized
/// </summary>
public bool IsInitialized { get; private set; }
74
75        #endregion
76
77        #region Delegates
78
/// <summary>
///   Handler signature for OnP2PMessageReceived: sender of the message and its payload.
/// </summary>
public delegate void P2PMessageReceived(PeerId sourceAddr, byte[] data);

/// <summary>
///   Raised by OverlayMessageReceived for every message delivered by the overlay.
/// </summary>
public event P2PMessageReceived OnP2PMessageReceived;

/// <summary>
///   Handler signature for OnSystemJoined.
/// </summary>
public delegate void SystemJoined();

/// <summary>
///   Raised by OnDhtSystemJoined after IsConnected has been set to true.
/// </summary>
public event SystemJoined OnSystemJoined;

/// <summary>
///   Handler signature for OnSystemLeft.
/// </summary>
public delegate void SystemLeft();

/// <summary>
///   Raised by OnDhtSystemLeft after IsConnected and IsInitialized have been cleared.
/// </summary>
public event SystemLeft OnSystemLeft;
87
88        #endregion
89
/// <summary>
///   Creates a wrapper that is neither initialized nor connected and prepares the
///   two (initially unsignalled) events used to wait for join and leave notifications.
/// </summary>
public P2PBase()
{
    systemJoined = new AutoResetEvent(false);
    systemLeft = new AutoResetEvent(false);

    IsInitialized = false;
    IsConnected = false;
}
98
99        #region Basic P2P Methods (Init, Start, Stop)
100
/// <summary>
///   Initializes the underlying peer-to-peer system with settings configured in P2PSettings.
///   This step is required in order to be able to establish a connection.
///   If the configured architecture cannot be set up, the error is reported through
///   P2PManager.GuiLogMessage and the method returns without setting IsInitialized.
/// </summary>
/// <exception cref = "NotImplementedException">When the configured link manager or bootstrapper
/// type is not handled by this method.</exception>
public void Initialize()
{
    // "pap_snal" is handed to the link manager, the IRC bootstrapper and the overlay based
    // architectures; "pap_mysql" is handed to the Server and WebDHT storage back ends.
    Scheduler scheduler = new STAScheduler("pap_snal");
    Scheduler scheduler_2 = new STAScheduler("pap_mysql");

    switch (P2PSettings.Default.LinkManager)
    {
        case P2PLinkManagerType.Snal:
            LogToMonitor("Init LinkMgr: Using NAT Traversal stuff");

            // NAT-Traversal stuff needs a different Snal-Version
            linkmanager = new Snal(scheduler);

            // Start from the library defaults, then override what CrypTool needs.
            var settings = new PeersAtPlay.P2PLink.SnalNG.Settings();
            settings.LoadDefaults();
            settings.ConnectInternal = true;
            settings.LocalReceivingPort = P2PSettings.Default.LocalReceivingPort;
            settings.UseLocalAddressDetection = P2PSettings.Default.UseLocalAddressDetection;
            settings.NoDelay = false;
            settings.ReuseAddress = false;
            settings.UseNetworkMonitorServer = true;
            settings.CloseConnectionAfterPingTimeout = true;

            settings.FragmentMessages = true;
            settings.FragmentMessageSize = 10*1024;

            linkmanager.Settings = settings;
            linkmanager.ApplicationType = ApplicationType.CrypTool;

            break;
        default:
            throw new NotImplementedException();
    }

    switch (P2PSettings.Default.Bootstrapper)
    {
        case P2PBootstrapperType.LocalMachineBootstrapper:
            // LocalMachineBootstrapper = only local connection (runs only on one machine)
            bootstrapper = new LocalMachineBootstrapper();
            break;
        case P2PBootstrapperType.IrcBootstrapper:
            PeersAtPlay.P2POverlay.Bootstrapper.IrcBootstrapperV2.Settings.DelaySymmetricResponse = true;
            PeersAtPlay.P2POverlay.Bootstrapper.IrcBootstrapperV2.Settings.IncludeSymmetricResponse = false;
            PeersAtPlay.P2POverlay.Bootstrapper.IrcBootstrapperV2.Settings.UsePeerCache = false;

            bootstrapper = new IrcBootstrapper(scheduler);
            break;
        case P2PBootstrapperType.DnsBootstrapper:
            bootstrapper = new DnsBootstrapper();
            break;
        default:
            throw new NotImplementedException();
    }

    try
    {
        switch (P2PSettings.Default.Architecture)
        {
            case P2PArchitecture.FullMesh:
                overlay = new FullMeshOverlay(scheduler);
                Dht = new FullMeshDHT(scheduler);
                break;
            case P2PArchitecture.Chord:
                // The Chord core is used as overlay and as DHT at the same time.
                overlay = new ChordNGCore(scheduler);
                Dht = (IDHT) overlay;
                break;
            case P2PArchitecture.Server:
                PeersAtPlay.PapsClient.Properties.Settings.Default.ServerHost = P2PSettings.Default.ServerHost;
                PeersAtPlay.PapsClient.Properties.Settings.Default.ServerPort = P2PSettings.Default.ServerPort;
                // The server architecture always replaces the bootstrapper chosen above.
                bootstrapper = new LocalMachineBootstrapper();
                overlay = new PapsClientOverlay();
                Dht = new PapsClientDht(scheduler_2);
                break;
            case P2PArchitecture.WebDHT:
                // WebDHT is set up without an overlay. Drop an overlay left over from an
                // earlier Initialize() so it is neither subscribed to a second time nor
                // handed to the new DHT.
                overlay = null;
                Dht = new WebDHT(scheduler_2);
                break;
            default:
                throw new NotImplementedException();
        }
    }
    catch(Exception e)
    {
        P2PManager.GuiLogMessage("Error initializing P2P network: " + e.Message, NotificationLevel.Error);
        return;
    }

    if (overlay != null)
    {
        overlay.MessageReceived += OverlayMessageReceived;
    }
    Dht.SystemJoined += OnDhtSystemJoined;
    Dht.SystemLeft += OnDhtSystemLeft;

    VersionedDht = (IVersionedDHT) Dht;

    P2PManager.GuiLogMessage("Initializing DHT with world name " + P2PSettings.Default.WorldName,
                                NotificationLevel.Info);
    Dht.Initialize(P2PSettings.Default.PeerName, P2PSettings.Default.Password, P2PSettings.Default.WorldName, overlay,
                   bootstrapper, linkmanager, null);

    // Only report success once the DHT accepted its configuration; if Dht.Initialize throws,
    // the flag must not claim that SynchStart() can be called.
    IsInitialized = true;
}
205
/// <summary>
///   Starts the P2P system and joins the configured P2P world.
///   This synchronized method blocks until the join attempt has ended, i.e. until the DHT
///   reported that the system was joined or left (this may take one or two minutes).
///   Errors raised while starting are reported through P2PManager.GuiLogMessage and are
///   not rethrown.
/// </summary>
/// <exception cref = "InvalidOperationException">When the peer-to-peer system has not been initialized.
/// After validating the settings, this can be done by calling Initialize().</exception>
/// <returns>True, if the peer has completely joined the p2p network; false if the join attempt
/// ended without a connection.</returns>
public bool SynchStart()
{
    if (!IsInitialized)
    {
        throw new InvalidOperationException("Peer-to-peer is not initialized.");
    }

    if (IsConnected)
    {
        return true;
    }

    try
    {
        // OnDhtSystemLeft signals systemJoined even when nobody is waiting. Clear such a
        // left-over signal, otherwise WaitOne() below would return before this join ended.
        systemJoined.Reset();

        Dht.BeginStart(BeginStartEventHandler);

        // Wait for event SystemJoined. When it's invoked, the peer completely joined the P2P system
        systemJoined.WaitOne();
        P2PManager.GuiLogMessage("System join process ended.", NotificationLevel.Debug);
    }
    catch (Exception e)
    {
        // Best effort as before (no rethrow), but the failure is no longer dropped silently.
        P2PManager.GuiLogMessage("Error while joining the P2P network: " + e.GetBaseException().Message,
                                 NotificationLevel.Error);
    }

    // The wait is also released when the system was left, so report the real state.
    return IsConnected;
}
243
/// <summary>
///   Callback passed to Dht.BeginStart; only writes the received arguments and their
///   state to the GUI log at debug level.
/// </summary>
/// <param name = "eventArgs">arguments delivered by the DHT</param>
private void BeginStartEventHandler(DHTEventArgs eventArgs)
{
    var message = "Received DHTEventArgs: " + eventArgs + ", state: " + eventArgs.State;
    P2PManager.GuiLogMessage(message, NotificationLevel.Debug);
}
248
/// <summary>
///   Asks the DHT to stop and, if the peer is still connected, blocks until the
///   leave notification arrived. Both wait events are unsignalled afterwards.
/// </summary>
/// <returns>False when no DHT exists (nothing to stop), otherwise true</returns>
public bool SynchStop()
{
    if (Dht != null)
    {
        Dht.BeginStop(null);

        if (IsConnected)
        {
            systemLeft.WaitOne();
        }

        // Clear both signals so that a later start/stop cycle begins unsignalled.
        systemJoined.Reset();
        systemLeft.Reset();

        return true;
    }

    return false;
}
267
268        #endregion
269
270        #region Peer related method (GetPeerId, Send message to peer)
271
/// <summary>
///   Returns the PeerId of the local peer, built from the overlay's local address.
/// </summary>
/// <param name = "sPeerName">out: the UserName reported by the link manager</param>
/// <returns>PeerId wrapping overlay.LocalAddress</returns>
public PeerId GetPeerId(out string sPeerName)
{
    sPeerName = linkmanager.UserName;

    var localPeer = new PeerId(overlay.LocalAddress);
    return localPeer;
}
282
/// <summary>
///   Builds a PeerId from the raw byte representation of an overlay address.
/// </summary>
/// <param name = "byteId">overlay address as byte array</param>
/// <returns>PeerId wrapping the address the overlay resolved for byteId</returns>
public PeerId GetPeerId(byte[] byteId)
{
    LogToMonitor("GetPeerID: Converting byte[] to PeerId-Object");

    var address = overlay.GetAddress(byteId);
    return new PeerId(address);
}
293
/// <summary>
///   Sends a payload to another peer through the overlay.
/// </summary>
/// <param name = "data">payload to transmit</param>
/// <param name = "destinationPeer">overlay address of the receiver as byte array</param>
public void SendToPeer(byte[] data, byte[] destinationPeer)
{
    // Size the stack for the overlay header plus our payload, so it is allocated only once.
    var capacity = overlay.GetHeaderSize() + data.Length;
    var payload = new ByteStack(capacity);
    payload.Push(data);

    // overlay.LocalAddress = Overlay-Peer-Address/Names
    var receiver = overlay.GetAddress(destinationPeer);
    var sender = overlay.LocalAddress;
    var message = new OverlayMessage(MessageReceiverType.P2PBase, sender, receiver, payload);

    overlay.Send(message);
}
309
/// <summary>
///   Handler for overlay.MessageReceived: forwards source and payload of the message to
///   every subscriber of OnP2PMessageReceived.
/// </summary>
/// <param name = "sender">event source (unused)</param>
/// <param name = "e">event arguments carrying the received overlay message</param>
private void OverlayMessageReceived(object sender, OverlayMessageEventArgs e)
{
    // Work on a snapshot, so an unsubscribe on another thread cannot turn the event
    // into null between the check and the invocation.
    var subscribers = OnP2PMessageReceived;
    if (subscribers == null) return;

    var pid = new PeerId(e.Message.Source);

    // Extract the payload exactly once. The previous code re-read the stack for every
    // subscriber; after the first PopBytes call (which presumably removes the bytes from
    // the stack) all further subscribers received different data than the first one.
    var stack = e.Message.Data;
    var data = stack.ToArray();
    if (stack.CurrentStackSize != 0)
        data = stack.PopBytes(stack.CurrentStackSize);

    // NOTE: subscribers are invoked synchronously, one after another, on the thread that
    // delivers the message, and all of them receive the same array instance.
    // An earlier asynchronous variant (BeginInvoke on the event) was dropped because it
    // is only valid for a delegate with a single target.
    foreach (var del in subscribers.GetInvocationList())
    {
        del.DynamicInvoke(pid, data);
    }
}
334
335        #endregion
336
337        #region Event Handling (System Joined, Left and Message Received)
338
/// <summary>
///   Handler for Dht.SystemJoined: marks the peer as connected, notifies subscribers of
///   OnSystemJoined and releases a thread waiting in SynchStart.
/// </summary>
/// <param name = "sender">event source (unused)</param>
/// <param name = "e">event arguments (unused)</param>
private void OnDhtSystemJoined(object sender, EventArgs e)
{
    IsConnected = true;

    // Snapshot to avoid a NullReferenceException if the last subscriber detaches
    // between the null check and the call.
    var handler = OnSystemJoined;
    try
    {
        if (handler != null)
        {
            handler();
        }
    }
    finally
    {
        // Must run even if a subscriber throws, otherwise SynchStart() would block forever.
        systemJoined.Set();
    }
}
348
/// <summary>
///   Handler for Dht.SystemLeft: clears the connected and initialized flags, releases
///   threads waiting in SynchStop or SynchStart and notifies subscribers of OnSystemLeft.
/// </summary>
/// <param name = "sender">event source (unused)</param>
/// <param name = "e">event arguments (unused)</param>
private void OnDhtSystemLeft(object sender, SystemLeftEventArgs e)
{
    IsConnected = false;
    IsInitialized = false;

    // Allow new connection to start and check for waiting / blocked tasks
    // TODO reset running ConnectionWorkers?
    systemLeft.Set();
    // Also release a SynchStart() that is still waiting for a join that will not happen.
    systemJoined.Set();

    LogToMonitor("CrypP2P left the system.");

    // Snapshot to avoid a NullReferenceException if the last subscriber detaches
    // between the null check and the call.
    var handler = OnSystemLeft;
    if (handler != null)
    {
        handler();
    }
}
364
365        #endregion
366
367        #region Synchronous Methods + their Callbacks
368
/// <summary>
///   Stores a text value in the DHT at the given key. The text is UTF-8 encoded and
///   handed to the byte[] overload.
/// </summary>
/// <param name = "key">Key of DHT Entry</param>
/// <param name = "value">Value of DHT Entry</param>
/// <returns>Result object of the completed store request</returns>
public RequestResult SynchStore(string key, string value)
{
    var encoded = Encoding.UTF8.GetBytes(value);
    return SynchStore(key, encoded);
}
379
/// <summary>
///   Stores a value in the DHT at the given key and blocks until the DHT reported
///   the outcome of the request.
/// </summary>
/// <param name = "key">Key of DHT Entry</param>
/// <param name = "data">Value of DHT Entry</param>
/// <returns>Result object of the completed store request. Its WaitHandle has already
/// been consumed and closed and must not be waited on.</returns>
public RequestResult SynchStore(string key, byte[] data)
{
    LogToMonitor("Begin: SynchStore. Key: " + key + ", " + (data != null ? data.Length : 0) + " bytes");

    RequestResult requestResult;

    // The event is an OS handle; release it deterministically instead of leaving one
    // handle per request to the finalizer.
    using (var autoResetEvent = new AutoResetEvent(false))
    {
        requestResult = new RequestResult {WaitHandle = autoResetEvent, Key = key, Data = data};
        VersionedDht.Store(OnSynchStoreCompleted, key, data, requestResult);

        // blocking till response (signalled by OnSynchStoreCompleted)
        autoResetEvent.WaitOne();
    }

    LogToMonitor("End: SynchStore. Key: " + key + ". Status: " + requestResult.Status);

    return requestResult;
}
403
/// <summary>
///   Callback for the synchronized store method: copies the outcome into the
///   RequestResult passed along as AsyncState and wakes up the waiting caller.
/// </summary>
/// <param name = "storeResult">result container delivered by the DHT</param>
private static void OnSynchStoreCompleted(StoreResult storeResult)
{
    var requestResult = storeResult.AsyncState as RequestResult;
    if (requestResult == null)
    {
        LogToMonitor("Received OnSynchStoreCompleted, but RequestResult object is missing. Discarding.");
        return;
    }

    try
    {
        requestResult.Parse(storeResult);
    }
    finally
    {
        // unblock WaitHandle in the synchronous method - also when Parse fails,
        // otherwise the caller of SynchStore would wait forever
        requestResult.WaitHandle.Set();
    }
}
422
/// <summary>
///   Requests the value stored at the given DHT key and blocks until the DHT
///   reported the outcome of the request.
/// </summary>
/// <param name = "key">Key of DHT Entry</param>
/// <returns>Result object of the completed retrieve request. Its WaitHandle has already
/// been consumed and closed and must not be waited on.</returns>
public RequestResult SynchRetrieve(string key)
{
    LogToMonitor("Begin: SynchRetrieve. Key: " + key);

    RequestResult requestResult;

    // The event is an OS handle; release it deterministically instead of leaving one
    // handle per request to the finalizer.
    using (var autoResetEvent = new AutoResetEvent(false))
    {
        requestResult = new RequestResult {WaitHandle = autoResetEvent, Key = key};

        Dht.Retrieve(OnSynchRetrieveCompleted, key, requestResult);

        // blocking till response (signalled by OnSynchRetrieveCompleted)
        autoResetEvent.WaitOne();
    }

    LogToMonitor("End: SynchRetrieve. Key: " + key + ". Status: " + requestResult.Status);

    return requestResult;
}
444
/// <summary>
///   Callback for the synchronized retrieval method: copies the outcome into the
///   RequestResult passed along as AsyncState and wakes up the waiting caller.
/// </summary>
/// <param name = "retrieveResult">result container delivered by the DHT</param>
private static void OnSynchRetrieveCompleted(RetrieveResult retrieveResult)
{
    var requestResult = retrieveResult.AsyncState as RequestResult;
    if (requestResult == null)
    {
        LogToMonitor("Received OnSynchRetrieveCompleted, but RequestResult object is missing. Discarding.");
        return;
    }

    try
    {
        requestResult.Parse(retrieveResult);
    }
    finally
    {
        // unblock WaitHandle in the synchronous method - also when Parse fails,
        // otherwise the caller of SynchRetrieve would wait forever
        requestResult.WaitHandle.Set();
    }
}
463
/// <summary>
///   Removes a key/value pair out of the DHT and blocks until the DHT reported
///   the outcome of the request.
/// </summary>
/// <param name = "key">Key of the DHT Entry</param>
/// <returns>Result object of the completed remove request. Its WaitHandle has already
/// been consumed and closed and must not be waited on.</returns>
public RequestResult SynchRemove(string key)
{
    LogToMonitor("Begin SynchRemove. Key: " + key);

    RequestResult requestResult;

    // The event is an OS handle; release it deterministically instead of leaving one
    // handle per request to the finalizer.
    using (var autoResetEvent = new AutoResetEvent(false))
    {
        requestResult = new RequestResult { WaitHandle = autoResetEvent, Key = key };
        VersionedDht.Remove(OnSynchRemoveCompleted, key, requestResult);

        // blocking till response (signalled by OnSynchRemoveCompleted)
        autoResetEvent.WaitOne();
    }

    LogToMonitor("End: SynchRemove. Key: " + key + ". Status: " + requestResult.Status);

    return requestResult;
}
484
/// <summary>
///   Callback for the synchronized remove method: copies the outcome into the
///   RequestResult passed along as AsyncState and wakes up the waiting caller.
/// </summary>
/// <param name = "removeResult">result container delivered by the DHT</param>
private static void OnSynchRemoveCompleted(RemoveResult removeResult)
{
    var requestResult = removeResult.AsyncState as RequestResult;
    if (requestResult == null)
    {
        LogToMonitor("Received OnSynchRemoveCompleted, but RequestResult object is missing. Discarding.");
        return;
    }

    try
    {
        requestResult.Parse(removeResult);
    }
    finally
    {
        // unblock WaitHandle in the synchronous method - also when Parse fails,
        // otherwise the caller of SynchRemove would wait forever
        requestResult.WaitHandle.Set();
    }
}
503
504        #endregion
505
506        #region Statistic Methods
507
/// <summary>
///   Sums TotalBytesSent over all links currently reported by the link manager.
/// </summary>
/// <returns>The sum converted to long</returns>
public long TotalBytesSentOnAllLinks()
{
    var links = linkmanager.GetAllLinkInformation();
    var sentTotal = links.Sum(link => link.TotalBytesSent);

    return (long) sentTotal;
}
512
/// <summary>
///   Sums TotalBytesReceived over all links currently reported by the link manager.
/// </summary>
/// <returns>The sum converted to long</returns>
public long TotalBytesReceivedOnAllLinks()
{
    var links = linkmanager.GetAllLinkInformation();
    var receivedTotal = links.Sum(link => link.TotalBytesReceived);

    return (long) receivedTotal;
}
517
518        #endregion
519
520        #region Log facility
521
/// <summary>
///   Asks the DHT to log its internal state (for the Monitoring Software of P@play).
///   Does nothing as long as no DHT has been created.
/// </summary>
public void LogInternalState()
{
    if (Dht == null)
    {
        return;
    }

    Dht.LogInternalState();
}
532
/// <summary>
///   Writes a debug message to the log, but only when P2PSettings.Default.Log2Monitor is enabled.
/// </summary>
/// <param name = "sTextToLog">message to write</param>
private static void LogToMonitor(string sTextToLog)
{
    if (!P2PSettings.Default.Log2Monitor)
    {
        return;
    }

    Log.Debug(sTextToLog);
}
538
539        #endregion
540    }
541}
Note: See TracBrowser for help on using the repository browser.