source: trunk/CrypP2P/Internal/P2PBase.cs @ 1642

Last change on this file since 1642 was 1642, checked in by Matthäus Wander, 11 years ago

removed unused setting to make ct2 p2p compile against head pap

File size: 20.3 KB
Line 
1/*
2   Copyright 2010 Paul Lelgemann and Christian Arnold,
3                  University of Duisburg-Essen
4
5   Licensed under the Apache License, Version 2.0 (the "License");
6   you may not use this file except in compliance with the License.
7   You may obtain a copy of the License at
8
9       http://www.apache.org/licenses/LICENSE-2.0
10
11   Unless required by applicable law or agreed to in writing, software
12   distributed under the License is distributed on an "AS IS" BASIS,
13   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   See the License for the specific language governing permissions and
15   limitations under the License.
16*/
17
18using System;
19using System.Text;
20using System.Threading;
21using Cryptool.PluginBase;
22using Cryptool.Plugins.PeerToPeer.Internal;
23using Gears4Net;
24using PeersAtPlay;
25using PeersAtPlay.Monitoring;
26using PeersAtPlay.P2PLink;
27using PeersAtPlay.P2PLink.SnalNG;
28using PeersAtPlay.P2POverlay;
29using PeersAtPlay.P2POverlay.Bootstrapper;
30using PeersAtPlay.P2POverlay.Bootstrapper.IrcBootstrapperV2;
31using PeersAtPlay.P2POverlay.Bootstrapper.LocalMachineBootstrapper;
32using PeersAtPlay.P2POverlay.FullMeshOverlay;
33using PeersAtPlay.P2PStorage.DHT;
34using PeersAtPlay.P2PStorage.FullMeshDHT;
35using PeersAtPlay.Util.Logging;
36
37/* TODO:
38 * - Delete UseNatTraversal-Flag and insert CertificateCheck and CertificateSetup
39 * - Testing asynchronous methods incl. EventHandlers
40 */
41
42namespace Cryptool.P2P.Internal
43{
44    /// <summary>
45    ///   Wrapper class to integrate peer@play environment into CrypTool.
46    ///   This class synchronizes asynchronous methods for easier usage in CT2.
47    /// </summary>
48    public class P2PBase
49    {
50        #region Variables
51
52        private readonly AutoResetEvent _systemJoined;
53        private readonly AutoResetEvent _systemLeft;
54        private IBootstrapper _bootstrapper;
55        private IP2PLinkManager _linkmanager;
56        private P2POverlay _overlay;
57        internal IDHT Dht;
58        internal IVersionedDHT VersionedDht;
59
60        /// <summary>
61        ///   True if system was successfully joined, false if system is COMPLETELY left
62        /// </summary>
63        public bool IsConnected { get; private set; }
64
65        /// <summary>
66        ///   True if the underlying peer to peer system has been fully initialized
67        /// </summary>
68        public bool IsInitialized { get; private set; }
69
70        #endregion
71
72        #region Delegates
73
74        public event P2PMessageReceived OnP2PMessageReceived;
75        public delegate void P2PMessageReceived(PeerId sourceAddr, byte[] data);
76
77        public event SystemJoined OnSystemJoined;
78        public delegate void SystemJoined();
79
80        public event SystemLeft OnSystemLeft;
81        public delegate void SystemLeft();
82
83        #endregion
84
/// <summary>
///   Creates a wrapper that is neither initialized nor connected and
///   allocates the wait handles used by SynchStart and SynchStop.
/// </summary>
public P2PBase()
{
    // Both handles start non-signaled; the DHT join/leave callbacks set them.
    _systemLeft = new AutoResetEvent(false);
    _systemJoined = new AutoResetEvent(false);

    IsInitialized = false;
    IsConnected = false;
}
93
94        #region Basic P2P Methods (Init, Start, Stop)
95
/// <summary>
///   Initializes the underlying peer-to-peer system with settings configured in P2PSettings.
///   This step is required in order to be able to establish a connection.
/// </summary>
/// <exception cref = "NotImplementedException">When the configured link manager, bootstrapper,
/// overlay or DHT type is not handled by this class.</exception>
public void Initialize()
{
    // All components share one scheduler instance.
    Scheduler scheduler = new STAScheduler("pap");

    SetUpLinkManager(scheduler);
    SetUpBootstrapper(scheduler);
    SetUpOverlay(scheduler);
    SetUpDht(scheduler);

    _overlay.MessageReceived += OverlayMessageReceived;
    Dht.SystemJoined += OnDhtSystemJoined;
    Dht.SystemLeft += OnDhtSystemLeft;

    VersionedDht = (IVersionedDHT) Dht;

    P2PManager.GuiLogMessage("Initializing DHT with world name " + P2PSettings.Default.WorldName,
                             NotificationLevel.Info);
    Dht.Initialize(P2PSettings.Default.PeerName, string.Empty, P2PSettings.Default.WorldName, _overlay,
                   _bootstrapper,
                   _linkmanager, null);

    IsInitialized = true;
}

/// <summary>
///   Creates and configures the link manager selected in P2PSettings and stores it in _linkmanager.
/// </summary>
private void SetUpLinkManager(Scheduler scheduler)
{
    var linkManagerType = P2PSettings.Default.LinkManager;

    switch (linkManagerType)
    {
        case P2PLinkManagerType.Snal:
            LogToMonitor("Init LinkMgr: Using NAT Traversal stuff");

            // NAT-Traversal stuff needs a different Snal-Version
            _linkmanager = new Snal(scheduler);

            var settings = new PeersAtPlay.P2PLink.SnalNG.Settings();
            // LoadDefaults() runs first so the explicit assignments below are applied on top of it.
            settings.LoadDefaults();
            settings.ConnectInternal = true;
            settings.LocalReceivingPort = P2PSettings.Default.LocalReceivingPort;
            settings.UseLocalAddressDetection = P2PSettings.Default.UseLocalAddressDetection;
            settings.NoDelay = false;
            settings.ReuseAddress = false;
            settings.UseNetworkMonitorServer = true;
            settings.CloseConnectionAfterPingTimeout = false;

            switch (P2PSettings.Default.TransportProtocol)
            {
                case P2PTransportProtocol.UDP:
                    settings.TransportProtocol = TransportProtocol.UDP;
                    break;
                case P2PTransportProtocol.TCP_UDP:
                    settings.TransportProtocol = TransportProtocol.TCP_UDP;
                    break;
                default:
                    // Every other configured value falls back to TCP.
                    settings.TransportProtocol = TransportProtocol.TCP;
                    break;
            }

            _linkmanager.Settings = settings;
            _linkmanager.ApplicationType = ApplicationType.CrypTool;
            break;
        default:
            throw new NotImplementedException("Unsupported link manager type: " + linkManagerType);
    }
}

/// <summary>
///   Creates the bootstrapper selected in P2PSettings and stores it in _bootstrapper.
/// </summary>
private void SetUpBootstrapper(Scheduler scheduler)
{
    var bootstrapperType = P2PSettings.Default.Bootstrapper;

    switch (bootstrapperType)
    {
        case P2PBootstrapperType.LocalMachineBootstrapper:
            // LocalMachineBootstrapper = only local connection (runs only on one machine)
            _bootstrapper = new LocalMachineBootstrapper();
            break;
        case P2PBootstrapperType.IrcBootstrapper:
            // setup nat traversal stuff
            LogToMonitor("Init Bootstrapper: Using NAT Traversal stuff");
            PeersAtPlay.P2POverlay.Bootstrapper.IrcBootstrapper.Settings.DelaySymmetricResponse = true;
            PeersAtPlay.P2POverlay.Bootstrapper.IrcBootstrapper.Settings.IncludeSymmetricInResponse = false;
            PeersAtPlay.P2POverlay.Bootstrapper.IrcBootstrapper.Settings.SymmetricResponseDelay = 6000;

            PeersAtPlay.P2POverlay.Bootstrapper.IrcBootstrapperV2.Settings.DelaySymmetricResponse = true;
            PeersAtPlay.P2POverlay.Bootstrapper.IrcBootstrapperV2.Settings.IncludeSymmetricResponse = false;

            _bootstrapper = new IrcBootstrapper(scheduler);
            break;
        default:
            throw new NotImplementedException("Unsupported bootstrapper type: " + bootstrapperType);
    }
}

/// <summary>
///   Creates the overlay selected in P2PSettings and stores it in _overlay.
/// </summary>
private void SetUpOverlay(Scheduler scheduler)
{
    var overlayType = P2PSettings.Default.Overlay;

    switch (overlayType)
    {
        case P2POverlayType.FullMeshOverlay:
            // changing overlay example: this.overlay = new ChordOverlay();
            _overlay = new FullMeshOverlay(scheduler);
            break;
        default:
            throw new NotImplementedException("Unsupported overlay type: " + overlayType);
    }
}

/// <summary>
///   Creates the DHT selected in P2PSettings and stores it in Dht.
/// </summary>
private void SetUpDht(Scheduler scheduler)
{
    var dhtType = P2PSettings.Default.Dht;

    switch (dhtType)
    {
        case P2PDHTType.FullMeshDHT:
            Dht = new FullMeshDHT(scheduler);
            break;
        default:
            throw new NotImplementedException("Unsupported DHT type: " + dhtType);
    }
}
197
/// <summary>
///   Starts the P2P system and blocks until the join wait handle is signaled.
///   The handle is signaled when the DHT reports that the system was joined, and also
///   when it reports that the system was left (to release blocked callers), so callers
///   that need certainty should check IsConnected after this method returns.
/// </summary>
/// <exception cref = "InvalidOperationException">When the peer-to-peer system has not been initialized.
/// After validating the settings, this can be done by calling Initialize().</exception>
/// <returns>True once waiting has ended, or immediately if already connected</returns>
public bool SynchStart()
{
    if (!IsInitialized)
    {
        throw new InvalidOperationException("Peer-to-peer is not initialized.");
    }

    if (IsConnected)
    {
        return true;
    }

    // OnDhtSystemLeft signals _systemJoined as well. If nobody was waiting at that time the
    // signal stays latched and the WaitOne below would return before the join has happened.
    // Clear any such leftover signal before starting.
    _systemJoined.Reset();

    Dht.BeginStart(BeginStartEventHandler);

    // Wait for the join (or leave) notification from the DHT.
    _systemJoined.WaitOne();
    P2PManager.GuiLogMessage("System join process ended.", NotificationLevel.Debug);

    return true;
}
228
/// <summary>
///   Callback handed to Dht.BeginStart; writes the received event arguments and their state to the GUI log.
/// </summary>
/// <param name = "eventArgs">event arguments passed in by the DHT</param>
private void BeginStartEventHandler(DHTEventArgs eventArgs)
{
    var logLine = "Received DHTEventArgs: " + eventArgs + ", state: " + eventArgs.State;
    P2PManager.GuiLogMessage(logLine, NotificationLevel.Debug);
}
233
/// <summary>
///   Disconnects from the peer-to-peer system and, if currently connected, blocks until
///   the DHT has reported that the system was left.
/// </summary>
/// <returns>False if no DHT exists (Initialize was never called); otherwise true</returns>
public bool SynchStop()
{
    if (Dht == null) return false;

    // A leave that happened while nobody was waiting keeps _systemLeft signaled. Clear that
    // leftover signal so the wait below only reacts to the stop requested here.
    _systemLeft.Reset();

    Dht.BeginStop(null);

    if (!IsConnected)
    {
        return true;
    }

    // wait till systemLeft Event is invoked
    _systemLeft.WaitOne();

    return true;
}
254
255        #endregion
256
257        #region Peer related method (GetPeerId, Send message to peer)
258
/// <summary>
///   Returns the PeerId built from the overlay's local address.
/// </summary>
/// <param name = "sPeerName">out: the UserName reported by the link manager</param>
/// <returns>PeerId wrapping the overlay's local address</returns>
public PeerId GetPeerId(out string sPeerName)
{
    sPeerName = _linkmanager.UserName;

    var localPeerId = new PeerId(_overlay.LocalAddress);
    return localPeerId;
}
269
/// <summary>
///   Builds a PeerId from a raw overlay address.
/// </summary>
/// <param name = "byteId">overlay address as byte array</param>
/// <returns>PeerId wrapping the overlay address resolved from byteId</returns>
public PeerId GetPeerId(byte[] byteId)
{
    LogToMonitor("GetPeerID: Converting byte[] to PeerId-Object");

    var overlayAddress = _overlay.GetAddress(byteId);
    return new PeerId(overlayAddress);
}
280
/// <summary>
///   Sends a payload to another peer through the overlay. The message carries the
///   overlay's local address (overlay.LocalAddress = Overlay-Peer-Address/Names) as its source.
/// </summary>
/// <param name = "data">payload to send</param>
/// <param name = "destinationPeer">receiver's overlay address as byte array</param>
/// <exception cref = "ArgumentNullException">When data is null.</exception>
public void SendToPeer(byte[] data, byte[] destinationPeer)
{
    if (data == null)
    {
        throw new ArgumentNullException("data");
    }

    // get stack size of the pap use-data and add own use data (for optimizing Stack size)
    var realStackSize = _overlay.GetHeaderSize() + data.Length;

    var stackData = new ByteStack(realStackSize);
    stackData.Push(data);

    var destinationAddr = _overlay.GetAddress(destinationPeer);
    var overlayMsg = new OverlayMessage(MessageReceiverType.P2PBase,
                                        _overlay.LocalAddress, destinationAddr, stackData);

    _overlay.Send(overlayMsg);
}
296
/// <summary>
///   Handles messages arriving from the overlay and forwards sender and payload to every
///   subscriber of OnP2PMessageReceived. Subscribers are invoked one after another on the
///   calling thread.
/// </summary>
/// <param name = "sender">event source (unused)</param>
/// <param name = "e">event arguments carrying the overlay message</param>
private void OverlayMessageReceived(object sender, OverlayMessageEventArgs e)
{
    // Work on a snapshot so that an unsubscribe between the null check and the loop
    // cannot turn the event into null underneath us.
    var subscribers = OnP2PMessageReceived;
    if (subscribers == null) return;

    var pid = new PeerId(e.Message.Source);

    // Pop the payload exactly once. Popping inside the loop handed the data to the first
    // subscriber only, because the pop removes the bytes from the message's stack.
    var payload = e.Message.Data.PopBytes(e.Message.Data.CurrentStackSize);

    foreach (var del in subscribers.GetInvocationList())
    {
        del.DynamicInvoke(pid, payload);
    }
}
317
318        #endregion
319
320        #region Event Handling (System Joined, Left and Message Received)
321
/// <summary>
///   Handler for the DHT's SystemJoined event: notifies subscribers, marks the wrapper as
///   connected and releases a caller blocked in SynchStart.
/// </summary>
private void OnDhtSystemJoined(object sender, EventArgs e)
{
    if (OnSystemJoined != null)
        OnSystemJoined();

    // Update the state BEFORE signaling. A thread woken by the signal may read IsConnected
    // right away and must not see the old value. This mirrors the order in OnDhtSystemLeft.
    IsConnected = true;
    _systemJoined.Set();
}
330
/// <summary>
///   Handler for the DHT's SystemLeft event: notifies subscribers, clears the connected
///   and initialized flags and signals both wait handles.
/// </summary>
private void OnDhtSystemLeft(object sender, SystemLeftEventArgs e)
{
    if (OnSystemLeft != null)
    {
        OnSystemLeft();
    }

    IsConnected = false;
    IsInitialized = false;

    // Signal both handles: this allows a new connection to start and releases
    // any caller still blocked in SynchStop or SynchStart.
    // TODO reset running ConnectionWorkers?
    _systemLeft.Set();
    _systemJoined.Set();

    LogToMonitor("CrypP2P left the system.");
}
346
347        #endregion
348
349        #region Synchronous Methods + their Callbacks
350
/// <summary>
///   Stores a value in the DHT at the given key and blocks until the store callback has fired.
/// </summary>
/// <param name = "key">Key of DHT Entry</param>
/// <param name = "data">Value of DHT Entry</param>
/// <returns>True if the DHT reported OperationStatus.Success, otherwise false</returns>
public bool SynchStore(string key, byte[] data)
{
    LogToMonitor("Begin: SynchStore. Key: " + key + ", " + data.Length + " bytes");

    // The wait handle wraps an OS handle; dispose it as soon as the call has completed
    // instead of leaving one handle per request to the finalizer.
    using (var autoResetEvent = new AutoResetEvent(false))
    {
        var responseWait = new ResponseWait {WaitHandle = autoResetEvent, key = key, value = data};
        VersionedDht.Store(OnSynchStoreCompleted, key, data, responseWait);

        // blocking till response
        autoResetEvent.WaitOne();

        LogToMonitor("End: SynchStore. Key: " + key + ". Success: " + responseWait.success);

        return responseWait.success;
    }
}
374
/// <summary>
///   Stores a string in the DHT at the given key. The string is UTF-8 encoded and
///   handed to the byte[] overload.
/// </summary>
/// <param name = "key">Key of DHT Entry</param>
/// <param name = "value">Value of DHT Entry</param>
/// <returns>Result of the byte[] overload of SynchStore</returns>
public bool SynchStore(string key, string value)
{
    var encodedValue = Encoding.UTF8.GetBytes(value);
    return SynchStore(key, encodedValue);
}
385
/// <summary>
///   Callback for the synchronized store method. Copies the outcome into the ResponseWait
///   object carried in AsyncState and releases the thread blocked in SynchStore.
/// </summary>
/// <param name = "storeResult">result object delivered by the DHT</param>
private static void OnSynchStoreCompleted(StoreResult storeResult)
{
    var pending = storeResult.AsyncState as ResponseWait;
    if (pending == null)
    {
        LogToMonitor("Received OnSynchStoreCompleted, but ResponseWait object is missing. Discarding.");
        return;
    }

    var status = storeResult.Status;

    pending.operationStatus = status;
    pending.success = status == OperationStatus.Success;
    // The status name doubles as the message payload.
    pending.Message = Encoding.UTF8.GetBytes(status.ToString());

    // unblock WaitHandle in the synchronous method
    pending.WaitHandle.Set();
}
406
/// <summary>
///   Get the value of the given DHT Key or null, if it doesn't exist. Blocks until the
///   retrieve callback has fired. Null is also returned when the operation failed.
/// </summary>
/// <param name = "key">Key of DHT Entry</param>
/// <returns>Value of DHT Entry, or null</returns>
public byte[] SynchRetrieve(string key)
{
    LogToMonitor("Begin: SynchRetrieve. Key: " + key);

    // The wait handle wraps an OS handle; dispose it as soon as the call has completed
    // instead of leaving one handle per request to the finalizer.
    using (var autoResetEvent = new AutoResetEvent(false))
    {
        var responseWait = new ResponseWait {WaitHandle = autoResetEvent, key = key};
        Dht.Retrieve(OnSynchRetrieveCompleted, key, responseWait);

        // blocking till response
        autoResetEvent.WaitOne();

        LogToMonitor("End: SynchRetrieve. Key: " + key + ". Success: " + responseWait.success);

        return responseWait.Message;
    }
}
427
/// <summary>
///   Callback for the synchronized retrieval method. Copies the outcome into the ResponseWait
///   object carried in AsyncState and releases the thread blocked in SynchRetrieve.
///   A missing key counts as success with a null message.
/// </summary>
/// <param name = "retrieveResult">result object delivered by the DHT</param>
private static void OnSynchRetrieveCompleted(RetrieveResult retrieveResult)
{
    var pending = retrieveResult.AsyncState as ResponseWait;
    if (pending == null)
    {
        LogToMonitor("Received OnSynchRetrieveCompleted, but ResponseWait object is missing. Discarding.");
        return;
    }

    var status = retrieveResult.Status;

    if (status == OperationStatus.Success)
    {
        pending.success = true;
        pending.Message = retrieveResult.Data;
    }
    else if (status == OperationStatus.KeyNotFound)
    {
        pending.success = true;
        pending.Message = null;
    }
    else
    {
        pending.success = false;
        pending.Message = null;
    }

    pending.operationStatus = status;

    // unblock WaitHandle in the synchronous method
    pending.WaitHandle.Set();
}
462
/// <summary>
///   Removes a key/value pair out of the DHT and blocks until the remove callback has fired.
/// </summary>
/// <param name = "key">Key of the DHT Entry</param>
/// <returns>True unless the DHT reported Failure or VersionMismatch</returns>
public bool SynchRemove(string key)
{
    LogToMonitor("Begin SynchRemove. Key: " + key);

    // The wait handle wraps an OS handle; dispose it as soon as the call has completed
    // instead of leaving one handle per request to the finalizer.
    using (var autoResetEvent = new AutoResetEvent(false))
    {
        var responseWait = new ResponseWait {WaitHandle = autoResetEvent, key = key};
        VersionedDht.Remove(OnSynchRemoveCompleted, key, responseWait);

        // blocking till response
        autoResetEvent.WaitOne();

        LogToMonitor("End: SynchRemove. Key: " + key + ". Success: " + responseWait.success);

        return responseWait.success;
    }
}
483
/// <summary>
///   Callback for the synchronized remove method. Copies the outcome into the ResponseWait
///   object carried in AsyncState and releases the thread blocked in SynchRemove.
/// </summary>
/// <param name = "removeResult">result object delivered by the DHT</param>
private static void OnSynchRemoveCompleted(RemoveResult removeResult)
{
    var pending = removeResult.AsyncState as ResponseWait;
    if (pending == null)
    {
        LogToMonitor("Received OnSynchRemoveCompleted, but ResponseWait object is missing. Discarding.");
        return;
    }

    var status = removeResult.Status;

    // Every status other than Failure and VersionMismatch is treated as success.
    var rejected = status == OperationStatus.Failure ||
                   status == OperationStatus.VersionMismatch;

    pending.success = !rejected;
    pending.operationStatus = status;
    pending.Message = Encoding.UTF8.GetBytes(status.ToString());

    // unblock WaitHandle in the synchronous method
    pending.WaitHandle.Set();
}
505
506        #endregion
507
508        #region Log facility
509
/// <summary>
///   Asks the DHT to log its internal state (Monitoring Software of P@play).
///   Does nothing while no DHT instance exists.
/// </summary>
public void LogInternalState()
{
    if (Dht == null)
    {
        return;
    }

    Dht.LogInternalState();
}
520
/// <summary>
///   Writes a debug line to the log, but only when the Log2Monitor setting is enabled.
/// </summary>
/// <param name = "sTextToLog">text to log</param>
private static void LogToMonitor(string sTextToLog)
{
    if (!P2PSettings.Default.Log2Monitor)
    {
        return;
    }

    Log.Debug(sTextToLog);
}
526
527        #endregion
528    }
529}
Note: See TracBrowser for help on using the repository browser.