source: trunk/CrypP2P/Internal/P2PBase.cs @ 1645

Last change on this file since 1645 was 1645, checked in by Paul Lelgemann, 11 years ago

+ MaximumMessageSize setting added
o NullPointer in P2P KeySearcher fixed

File size: 20.0 KB
Line 
1/*
2   Copyright 2010 Paul Lelgemann and Christian Arnold,
3                  University of Duisburg-Essen
4
5   Licensed under the Apache License, Version 2.0 (the "License");
6   you may not use this file except in compliance with the License.
7   You may obtain a copy of the License at
8
9       http://www.apache.org/licenses/LICENSE-2.0
10
11   Unless required by applicable law or agreed to in writing, software
12   distributed under the License is distributed on an "AS IS" BASIS,
13   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   See the License for the specific language governing permissions and
15   limitations under the License.
16*/
17
18using System;
19using System.Text;
20using System.Threading;
21using Cryptool.PluginBase;
22using Cryptool.Plugins.PeerToPeer.Internal;
23using Gears4Net;
24using PeersAtPlay;
25using PeersAtPlay.Monitoring;
26using PeersAtPlay.P2PLink;
27using PeersAtPlay.P2PLink.SnalNG;
28using PeersAtPlay.P2POverlay;
29using PeersAtPlay.P2POverlay.Bootstrapper;
30using PeersAtPlay.P2POverlay.Bootstrapper.IrcBootstrapperV2;
31using PeersAtPlay.P2POverlay.Bootstrapper.LocalMachineBootstrapper;
32using PeersAtPlay.P2POverlay.FullMeshOverlay;
33using PeersAtPlay.P2PStorage.DHT;
34using PeersAtPlay.P2PStorage.FullMeshDHT;
35using PeersAtPlay.Util.Logging;
36
37/* TODO:
38 * - Delete UseNatTraversal-Flag and insert CertificateCheck and CertificateSetup
39 * - Testing asynchronous methods incl. EventHandlers
40 */
41
42namespace Cryptool.P2P.Internal
43{
44    /// <summary>
45    ///   Wrapper class to integrate peer@play environment into CrypTool.
46    ///   This class synchronizes asynchronous methods for easier usage in CT2.
47    /// </summary>
48    public class P2PBase
49    {
50        #region Variables
51
52        private readonly AutoResetEvent _systemJoined;
53        private readonly AutoResetEvent _systemLeft;
54        private IBootstrapper _bootstrapper;
55        private IP2PLinkManager _linkmanager;
56        private P2POverlay _overlay;
57        internal IDHT Dht;
58        internal IVersionedDHT VersionedDht;
59
60        /// <summary>
61        ///   True if system was successfully joined, false if system is COMPLETELY left
62        /// </summary>
63        public bool IsConnected { get; private set; }
64
65        /// <summary>
66        ///   True if the underlying peer to peer system has been fully initialized
67        /// </summary>
68        public bool IsInitialized { get; private set; }
69
70        #endregion
71
72        #region Delegates
73
74        public event P2PMessageReceived OnP2PMessageReceived;
75        public delegate void P2PMessageReceived(PeerId sourceAddr, byte[] data);
76
77        public event SystemJoined OnSystemJoined;
78        public delegate void SystemJoined();
79
80        public event SystemLeft OnSystemLeft;
81        public delegate void SystemLeft();
82
83        #endregion
84
85        public P2PBase()
86        {
87            IsConnected = false;
88            IsInitialized = false;
89
90            _systemJoined = new AutoResetEvent(false);
91            _systemLeft = new AutoResetEvent(false);
92        }
93
94        #region Basic P2P Methods (Init, Start, Stop)
95
96        /// <summary>
97        ///   Initializes the underlying peer-to-peer system with settings configured in P2PSettings. This step is required in order to be able to establish a connection.
98        /// </summary>
99        public void Initialize()
100        {
101            Scheduler scheduler = new STAScheduler("pap");
102
103            switch (P2PSettings.Default.LinkManager)
104            {
105                case P2PLinkManagerType.Snal:
106                    LogToMonitor("Init LinkMgr: Using NAT Traversal stuff");
107
108                    // NAT-Traversal stuff needs a different Snal-Version
109                    _linkmanager = new Snal(scheduler);
110
111                    var settings = new PeersAtPlay.P2PLink.SnalNG.Settings();
112                    settings.LoadDefaults();
113                    settings.ConnectInternal = true;
114                    settings.LocalReceivingPort = P2PSettings.Default.LocalReceivingPort;
115                    settings.UseLocalAddressDetection = P2PSettings.Default.UseLocalAddressDetection;
116                    settings.NoDelay = false;
117                    settings.ReuseAddress = false;
118                    settings.UseNetworkMonitorServer = true;
119                    settings.CloseConnectionAfterPingTimeout = false;
120
121                    settings.MaximumMessageSize = 10485760;
122                    settings.IgnoreMaximumMessageSize = true;
123                       
124                    switch(P2PSettings.Default.TransportProtocol)
125                    {
126                        case P2PTransportProtocol.UDP:
127                            settings.TransportProtocol = TransportProtocol.UDP;
128                            break;
129                        case P2PTransportProtocol.TCP_UDP:
130                            settings.TransportProtocol = TransportProtocol.TCP_UDP;
131                            break;
132                        default:
133                            settings.TransportProtocol = TransportProtocol.TCP;
134                            break;
135                    }
136
137                    _linkmanager.Settings = settings;
138                    _linkmanager.ApplicationType = ApplicationType.CrypTool;
139
140                    break;
141                default:
142                    throw (new NotImplementedException());
143            }
144
145            switch (P2PSettings.Default.Bootstrapper)
146            {
147                case P2PBootstrapperType.LocalMachineBootstrapper:
148                    // LocalMachineBootstrapper = only local connection (runs only on one machine)
149                    _bootstrapper = new LocalMachineBootstrapper();
150                    break;
151                case P2PBootstrapperType.IrcBootstrapper:
152                    PeersAtPlay.P2POverlay.Bootstrapper.IrcBootstrapperV2.Settings.DelaySymmetricResponse = true;
153                    PeersAtPlay.P2POverlay.Bootstrapper.IrcBootstrapperV2.Settings.IncludeSymmetricResponse = false;
154                    PeersAtPlay.P2POverlay.Bootstrapper.IrcBootstrapperV2.Settings.UsePeerCache = false;
155
156                    _bootstrapper = new IrcBootstrapper(scheduler);
157                    break;
158                default:
159                    throw (new NotImplementedException());
160            }
161
162            switch (P2PSettings.Default.Overlay)
163            {
164                case P2POverlayType.FullMeshOverlay:
165                    // changing overlay example: this.overlay = new ChordOverlay();
166                    _overlay = new FullMeshOverlay(scheduler);
167                    break;
168                default:
169                    throw (new NotImplementedException());
170            }
171
172            switch (P2PSettings.Default.Dht)
173            {
174                case P2PDHTType.FullMeshDHT:
175                    Dht = new FullMeshDHT(scheduler);
176                    break;
177                default:
178                    throw (new NotImplementedException());
179            }
180
181            _overlay.MessageReceived += OverlayMessageReceived;
182            Dht.SystemJoined += OnDhtSystemJoined;
183            Dht.SystemLeft += OnDhtSystemLeft;
184
185            VersionedDht = (IVersionedDHT) Dht;
186
187            P2PManager.GuiLogMessage("Initializing DHT with world name " + P2PSettings.Default.WorldName,
188                                                NotificationLevel.Info);
189            Dht.Initialize(P2PSettings.Default.PeerName, string.Empty, P2PSettings.Default.WorldName, _overlay,
190                            _bootstrapper,
191                            _linkmanager, null);
192
193            IsInitialized = true;
194        }
195
196        /// <summary>
197        ///   Starts the P2P System. When the given P2P world doesn't exist yet,
198        ///   inclusive creating the and bootstrapping to the P2P network.
199        ///   In either case joining the P2P world.
200        ///   This synchronized method returns true not before the peer has
201        ///   successfully joined the network (this may take one or two minutes).
202        /// </summary>
203        /// <exception cref = "InvalidOperationException">When the peer-to-peer system has not been initialized.
204        /// After validating the settings, this can be done by calling Initialize().</exception>
205        /// <returns>True, if the peer has completely joined the p2p network</returns>
206        public bool SynchStart()
207        {
208            if (!IsInitialized)
209            {
210                throw new InvalidOperationException("Peer-to-peer is not initialized.");
211            }
212
213            if (IsConnected)
214            {
215                return true;
216            }
217
218            Dht.BeginStart(BeginStartEventHandler);
219
220            // Wait for event SystemJoined. When it's invoked, the peer completely joined the P2P system
221            _systemJoined.WaitOne();
222            P2PManager.GuiLogMessage("System join process ended.", NotificationLevel.Debug);
223
224            return true;
225        }
226
227        private void BeginStartEventHandler(DHTEventArgs eventArgs)
228        {
229            P2PManager.GuiLogMessage("Received DHTEventArgs: " + eventArgs + ", state: " + eventArgs.State, NotificationLevel.Debug);
230        }
231
232        /// <summary>
233        ///   Disconnects from the peer-to-peer system.
234        /// </summary>
235        /// <returns>True, if the peer has completely left the p2p network</returns>
236        public bool SynchStop()
237        {
238            if (Dht == null) return false;
239
240            Dht.BeginStop(null);
241
242            if (!IsConnected)
243            {
244                return true;
245            }
246
247            // wait till systemLeft Event is invoked
248            _systemLeft.WaitOne();
249
250            return true;
251        }
252
253        #endregion
254
255        #region Peer related method (GetPeerId, Send message to peer)
256
257        /// <summary>
258        ///   Get PeerName of the actual peer
259        /// </summary>
260        /// <param name = "sPeerName">out: additional peer information UserName on LinkManager</param>
261        /// <returns>PeerID as a String</returns>
262        public PeerId GetPeerId(out string sPeerName)
263        {
264            sPeerName = _linkmanager.UserName;
265            return new PeerId(_overlay.LocalAddress);
266        }
267
268        /// <summary>
269        ///   Construct PeerId object for a specific byte[] id
270        /// </summary>
271        /// <param name = "byteId">overlay address as byte array</param>
272        /// <returns>corresponding PeerId for given byte[] id</returns>
273        public PeerId GetPeerId(byte[] byteId)
274        {
275            LogToMonitor("GetPeerID: Converting byte[] to PeerId-Object");
276            return new PeerId(_overlay.GetAddress(byteId));
277        }
278
279        // overlay.LocalAddress = Overlay-Peer-Address/Names
280        public void SendToPeer(byte[] data, byte[] destinationPeer)
281        {
282            // get stack size of the pap use-data and add own use data (for optimizing Stack size)
283            var realStackSize = _overlay.GetHeaderSize() + data.Length;
284
285            var stackData = new ByteStack(realStackSize);
286            stackData.Push(data);
287
288            var destinationAddr = _overlay.GetAddress(destinationPeer);
289            var overlayMsg = new OverlayMessage(MessageReceiverType.P2PBase, 
290                                                _overlay.LocalAddress, destinationAddr, stackData);
291
292            _overlay.Send(overlayMsg);
293        }
294
295        private void OverlayMessageReceived(object sender, OverlayMessageEventArgs e)
296        {
297            if (OnP2PMessageReceived == null) return;
298
299            var pid = new PeerId(e.Message.Source);
300            /* You have to fire this event asynchronous, because the main
301                 * thread will be stopped in this wrapper class for synchronizing
302                 * the asynchronous stuff (AutoResetEvent) --> so this could run
303                 * into a deadlock, when you fire this event synchronous (normal Invoke)
304                 * ATTENTION: This could change the invocation order!!! In my case
305                              no problem, but maybe in future cases... */
306
307            // TODO: not safe: The delegate must have only one target
308            // OnP2PMessageReceived.BeginInvoke(pid, e.Message.Data.PopBytes(e.Message.Data.CurrentStackSize), null, null);
309
310            foreach (var del in OnP2PMessageReceived.GetInvocationList())
311            {
312                del.DynamicInvoke(pid, e.Message.Data.PopBytes(e.Message.Data.CurrentStackSize));
313            }
314        }
315
316        #endregion
317
318        #region Event Handling (System Joined, Left and Message Received)
319
320        private void OnDhtSystemJoined(object sender, EventArgs e)
321        {
322            if (OnSystemJoined != null)
323                OnSystemJoined();
324
325            _systemJoined.Set();
326            IsConnected = true;
327        }
328
329        private void OnDhtSystemLeft(object sender, SystemLeftEventArgs e)
330        {
331            if (OnSystemLeft != null)
332                OnSystemLeft();
333
334            IsConnected = false;
335            IsInitialized = false;
336
337            // Allow new connection to start and check for waiting / blocked tasks
338            // TODO reset running ConnectionWorkers?
339            _systemLeft.Set();
340            _systemJoined.Set();
341
342            LogToMonitor("CrypP2P left the system.");
343        }
344
345        #endregion
346
347        #region Synchronous Methods + their Callbacks
348
349        /// <summary>
350        ///   Stores a value in the DHT at the given key
351        /// </summary>
352        /// <param name = "key">Key of DHT Entry</param>
353        /// <param name = "data">Value of DHT Entry</param>
354        /// <returns>True, when storing is completed!</returns>
355        public bool SynchStore(string key, byte[] data)
356        {
357            var autoResetEvent = new AutoResetEvent(false);
358
359            // LogToMonitor("testcrash" + Encoding.UTF8.GetString(new byte[5000]));
360            LogToMonitor("Begin: SynchStore. Key: " + key + ", " + data.Length + " bytes");
361
362            var responseWait = new ResponseWait {WaitHandle = autoResetEvent, key = key, value = data};
363            VersionedDht.Store(OnSynchStoreCompleted, key, data, responseWait);
364
365            // blocking till response
366            autoResetEvent.WaitOne();
367
368            LogToMonitor("End: SynchStore. Key: " + key + ". Success: " + responseWait.success);
369
370            return responseWait.success;
371        }
372
373        /// <summary>
374        ///   Stores a value in the DHT at the given key
375        /// </summary>
376        /// <param name = "key">Key of DHT Entry</param>
377        /// <param name = "value">Value of DHT Entry</param>
378        /// <returns>True, when storing is completed!</returns>
379        public bool SynchStore(string key, string value)
380        {
381            return SynchStore(key, Encoding.UTF8.GetBytes(value));
382        }
383
384        /// <summary>
385        ///   Callback for a the synchronized store method
386        /// </summary>
387        /// <param name = "storeResult">retrieved data container</param>
388        private static void OnSynchStoreCompleted(StoreResult storeResult)
389        {
390            var responseWait = storeResult.AsyncState as ResponseWait;
391            if (responseWait == null)
392            {
393                LogToMonitor("Received OnSynchStoreCompleted, but ResponseWait object is missing. Discarding.");
394                return;
395            }
396
397            responseWait.success = storeResult.Status == OperationStatus.Success;
398            responseWait.operationStatus = storeResult.Status;
399            responseWait.Message = Encoding.UTF8.GetBytes(storeResult.Status.ToString());
400
401            // unblock WaitHandle in the synchronous method
402            responseWait.WaitHandle.Set();
403        }
404
405        /// <summary>
406        ///   Get the value of the given DHT Key or null, if it doesn't exist.
407        /// </summary>
408        /// <param name = "key">Key of DHT Entry</param>
409        /// <returns>Value of DHT Entry</returns>
410        public byte[] SynchRetrieve(string key)
411        {
412            LogToMonitor("Begin: SynchRetrieve. Key: " + key);
413
414            var autoResetEvent = new AutoResetEvent(false);
415            var responseWait = new ResponseWait {WaitHandle = autoResetEvent, key = key};
416            Dht.Retrieve(OnSynchRetrieveCompleted, key, responseWait);
417
418            // blocking till response
419            autoResetEvent.WaitOne();
420
421            LogToMonitor("End: SynchRetrieve. Key: " + key + ". Success: " + responseWait.success);
422
423            return responseWait.Message;
424        }
425
426        /// <summary>
427        ///   Callback for a the synchronized retrieval method
428        /// </summary>
429        /// <param name = "retrieveResult"></param>
430        private static void OnSynchRetrieveCompleted(RetrieveResult retrieveResult)
431        {
432            var responseWait = retrieveResult.AsyncState as ResponseWait;
433            if (responseWait == null)
434            {
435                LogToMonitor("Received OnSynchRetrieveCompleted, but ResponseWait object is missing. Discarding.");
436                return;
437            }
438
439            switch (retrieveResult.Status)
440            {
441                case OperationStatus.Success:
442                    responseWait.success = true;
443                    responseWait.Message = retrieveResult.Data;
444                    break;
445                case OperationStatus.KeyNotFound:
446                    responseWait.success = true;
447                    responseWait.Message = null;
448                    break;
449                default:
450                    responseWait.success = false;
451                    responseWait.Message = null;
452                    break;
453            }
454
455            responseWait.operationStatus = retrieveResult.Status;
456
457            // unblock WaitHandle in the synchronous method
458            responseWait.WaitHandle.Set();
459        }
460
461        /// <summary>
462        ///   Removes a key/value pair out of the DHT
463        /// </summary>
464        /// <param name = "key">Key of the DHT Entry</param>
465        /// <returns>True, when removing is completed!</returns>
466        public bool SynchRemove(string key)
467        {
468            LogToMonitor("Begin SynchRemove. Key: " + key);
469
470            var autoResetEvent = new AutoResetEvent(false);
471            var responseWait = new ResponseWait {WaitHandle = autoResetEvent, key = key};
472            VersionedDht.Remove(OnSynchRemoveCompleted, key, responseWait);
473
474            // blocking till response
475            autoResetEvent.WaitOne();
476
477            LogToMonitor("End: SynchRemove. Key: " + key + ". Success: " + responseWait.success);
478
479            return responseWait.success;
480        }
481
482        /// <summary>
483        ///   Callback for a the synchronized remove method
484        /// </summary>
485        /// <param name = "removeResult"></param>
486        private static void OnSynchRemoveCompleted(RemoveResult removeResult)
487        {
488            var responseWait = removeResult.AsyncState as ResponseWait;
489            if (responseWait == null)
490            {
491                LogToMonitor("Received OnSynchRemoveCompleted, but ResponseWait object is missing. Discarding.");
492                return;
493            }
494
495            responseWait.success = removeResult.Status != OperationStatus.Failure &&
496                                   removeResult.Status != OperationStatus.VersionMismatch;
497            responseWait.operationStatus = removeResult.Status;
498            responseWait.Message = Encoding.UTF8.GetBytes(removeResult.Status.ToString());
499
500            // unblock WaitHandle in the synchronous method
501            responseWait.WaitHandle.Set();
502        }
503
504        #endregion
505
506        #region Log facility
507
508        /// <summary>
509        ///   To log the internal state in the Monitoring Software of P@play
510        /// </summary>
511        public void LogInternalState()
512        {
513            if (Dht != null)
514            {
515                Dht.LogInternalState();
516            }
517        }
518
519        private static void LogToMonitor(string sTextToLog)
520        {
521            if (P2PSettings.Default.Log2Monitor)
522                Log.Debug(sTextToLog);
523        }
524
525        #endregion
526    }
527}
Note: See TracBrowser for help on using the repository browser.