source: trunk/CrypP2P/Internal/P2PBase.cs @ 1672

Last change on this file since 1672 was 1672, checked in by Matthäus Wander, 11 years ago

P2P:

  • new pap dlls based on pap rev. 5081
  • settings change to make ct2 compile against new pap dlls
  • added chord overlay+dht
File size: 19.3 KB
Line 
1/*
2   Copyright 2010 Paul Lelgemann and Christian Arnold,
3                  University of Duisburg-Essen
4
5   Licensed under the Apache License, Version 2.0 (the "License");
6   you may not use this file except in compliance with the License.
7   You may obtain a copy of the License at
8
9       http://www.apache.org/licenses/LICENSE-2.0
10
11   Unless required by applicable law or agreed to in writing, software
12   distributed under the License is distributed on an "AS IS" BASIS,
13   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   See the License for the specific language governing permissions and
15   limitations under the License.
16*/
17
18using System;
19using System.Text;
20using System.Threading;
21using Cryptool.PluginBase;
22using Cryptool.Plugins.PeerToPeer.Internal;
23using Gears4Net;
24using PeersAtPlay;
25using PeersAtPlay.Monitoring;
26using PeersAtPlay.P2PLink;
27using PeersAtPlay.P2PLink.SnalNG;
28using PeersAtPlay.P2POverlay;
29using PeersAtPlay.P2POverlay.Bootstrapper;
30using PeersAtPlay.P2POverlay.Bootstrapper.IrcBootstrapperV2;
31using PeersAtPlay.P2POverlay.Bootstrapper.LocalMachineBootstrapper;
32using PeersAtPlay.P2POverlay.FullMeshOverlay;
33using PeersAtPlay.P2PStorage.DHT;
34using PeersAtPlay.P2PStorage.FullMeshDHT;
35using PeersAtPlay.Util.Logging;
36using PeersAtPlay.P2POverlay.Chord;
37
38/* TODO:
39 * - Delete UseNatTraversal-Flag and insert CertificateCheck and CertificateSetup
40 * - Testing asynchronous methods incl. EventHandlers
41 */
42
43namespace Cryptool.P2P.Internal
44{
45    /// <summary>
46    ///   Wrapper class to integrate peer@play environment into CrypTool.
47    ///   This class synchronizes asynchronous methods for easier usage in CT2.
48    /// </summary>
49    public class P2PBase
50    {
51        #region Variables
52
53        private readonly AutoResetEvent _systemJoined;
54        private readonly AutoResetEvent _systemLeft;
55        private IBootstrapper _bootstrapper;
56        private IP2PLinkManager _linkmanager;
57        private P2POverlay _overlay;
58        internal IDHT Dht;
59        internal IVersionedDHT VersionedDht;
60
61        /// <summary>
62        ///   True if system was successfully joined, false if system is COMPLETELY left
63        /// </summary>
64        public bool IsConnected { get; private set; }
65
66        /// <summary>
67        ///   True if the underlying peer to peer system has been fully initialized
68        /// </summary>
69        public bool IsInitialized { get; private set; }
70
71        #endregion
72
73        #region Delegates
74
75        public event P2PMessageReceived OnP2PMessageReceived;
76        public delegate void P2PMessageReceived(PeerId sourceAddr, byte[] data);
77
78        public event SystemJoined OnSystemJoined;
79        public delegate void SystemJoined();
80
81        public event SystemLeft OnSystemLeft;
82        public delegate void SystemLeft();
83
84        #endregion
85
86        public P2PBase()
87        {
88            IsConnected = false;
89            IsInitialized = false;
90
91            _systemJoined = new AutoResetEvent(false);
92            _systemLeft = new AutoResetEvent(false);
93        }
94
95        #region Basic P2P Methods (Init, Start, Stop)
96
97        /// <summary>
98        ///   Initializes the underlying peer-to-peer system with settings configured in P2PSettings. This step is required in order to be able to establish a connection.
99        /// </summary>
100        public void Initialize()
101        {
102            Scheduler scheduler = new STAScheduler("pap");
103
104            switch (P2PSettings.Default.LinkManager)
105            {
106                case P2PLinkManagerType.Snal:
107                    LogToMonitor("Init LinkMgr: Using NAT Traversal stuff");
108
109                    // NAT-Traversal stuff needs a different Snal-Version
110                    _linkmanager = new Snal(scheduler);
111
112                    var settings = new PeersAtPlay.P2PLink.SnalNG.Settings();
113                    settings.LoadDefaults();
114                    settings.ConnectInternal = true;
115                    settings.LocalReceivingPort = P2PSettings.Default.LocalReceivingPort;
116                    settings.UseLocalAddressDetection = P2PSettings.Default.UseLocalAddressDetection;
117                    settings.NoDelay = false;
118                    settings.ReuseAddress = false;
119                    settings.UseNetworkMonitorServer = true;
120                    settings.CloseConnectionAfterPingTimeout = false;
121
122                    settings.FragmentMessages = true;
123                    settings.FragmentMessageSize = 10*1024;
124                       
125                    switch(P2PSettings.Default.TransportProtocol)
126                    {
127                        case P2PTransportProtocol.UDP:
128                            settings.TransportProtocol = TransportProtocol.UDP;
129                            break;
130                        case P2PTransportProtocol.TCP_UDP:
131                            settings.TransportProtocol = TransportProtocol.TCP_UDP;
132                            break;
133                        default:
134                            settings.TransportProtocol = TransportProtocol.TCP;
135                            break;
136                    }
137
138                    _linkmanager.Settings = settings;
139                    _linkmanager.ApplicationType = ApplicationType.CrypTool;
140
141                    break;
142                default:
143                    throw (new NotImplementedException());
144            }
145
146            switch (P2PSettings.Default.Bootstrapper)
147            {
148                case P2PBootstrapperType.LocalMachineBootstrapper:
149                    // LocalMachineBootstrapper = only local connection (runs only on one machine)
150                    _bootstrapper = new LocalMachineBootstrapper();
151                    break;
152                case P2PBootstrapperType.IrcBootstrapper:
153                    PeersAtPlay.P2POverlay.Bootstrapper.IrcBootstrapperV2.Settings.DelaySymmetricResponse = true;
154                    PeersAtPlay.P2POverlay.Bootstrapper.IrcBootstrapperV2.Settings.IncludeSymmetricResponse = false;
155                    PeersAtPlay.P2POverlay.Bootstrapper.IrcBootstrapperV2.Settings.UsePeerCache = false;
156
157                    _bootstrapper = new IrcBootstrapper(scheduler);
158                    break;
159                default:
160                    throw (new NotImplementedException());
161            }
162
163            switch (P2PSettings.Default.Overlay)
164            {
165                case P2POverlayType.FullMeshOverlay:
166                    // changing overlay example: this.overlay = new ChordOverlay();
167                    _overlay = new FullMeshOverlay(scheduler);
168                    break;
169                case P2POverlayType.ChordOverlay:
170                    _overlay = new ChordNGCore(scheduler);
171                    break;
172                default:
173                    throw (new NotImplementedException());
174            }
175
176            switch (P2PSettings.Default.Dht)
177            {
178                case P2PDHTType.FullMeshDHT:
179                    Dht = new FullMeshDHT(scheduler);
180                    break;
181                case P2PDHTType.ChordDHT:
182                    Dht = (IDHT) _overlay;
183                    break;
184                default:
185                    throw (new NotImplementedException());
186            }
187
188            _overlay.MessageReceived += OverlayMessageReceived;
189            Dht.SystemJoined += OnDhtSystemJoined;
190            Dht.SystemLeft += OnDhtSystemLeft;
191
192            VersionedDht = (IVersionedDHT) Dht;
193
194            P2PManager.GuiLogMessage("Initializing DHT with world name " + P2PSettings.Default.WorldName,
195                                                NotificationLevel.Info);
196            Dht.Initialize(P2PSettings.Default.PeerName, string.Empty, P2PSettings.Default.WorldName, _overlay,
197                            _bootstrapper,
198                            _linkmanager, null);
199
200            IsInitialized = true;
201        }
202
203        /// <summary>
204        ///   Starts the P2P System. When the given P2P world doesn't exist yet,
205        ///   inclusive creating the and bootstrapping to the P2P network.
206        ///   In either case joining the P2P world.
207        ///   This synchronized method returns true not before the peer has
208        ///   successfully joined the network (this may take one or two minutes).
209        /// </summary>
210        /// <exception cref = "InvalidOperationException">When the peer-to-peer system has not been initialized.
211        /// After validating the settings, this can be done by calling Initialize().</exception>
212        /// <returns>True, if the peer has completely joined the p2p network</returns>
213        public bool SynchStart()
214        {
215            if (!IsInitialized)
216            {
217                throw new InvalidOperationException("Peer-to-peer is not initialized.");
218            }
219
220            if (IsConnected)
221            {
222                return true;
223            }
224
225            Dht.BeginStart(BeginStartEventHandler);
226
227            // Wait for event SystemJoined. When it's invoked, the peer completely joined the P2P system
228            _systemJoined.WaitOne();
229            P2PManager.GuiLogMessage("System join process ended.", NotificationLevel.Debug);
230
231            return true;
232        }
233
234        private void BeginStartEventHandler(DHTEventArgs eventArgs)
235        {
236            P2PManager.GuiLogMessage("Received DHTEventArgs: " + eventArgs + ", state: " + eventArgs.State, NotificationLevel.Debug);
237        }
238
239        /// <summary>
240        ///   Disconnects from the peer-to-peer system.
241        /// </summary>
242        /// <returns>True, if the peer has completely left the p2p network</returns>
243        public bool SynchStop()
244        {
245            if (Dht == null) return false;
246
247            Dht.BeginStop(null);
248
249            if (!IsConnected)
250            {
251                return true;
252            }
253
254            // wait till systemLeft Event is invoked
255            _systemLeft.WaitOne();
256
257            return true;
258        }
259
260        #endregion
261
262        #region Peer related method (GetPeerId, Send message to peer)
263
264        /// <summary>
265        ///   Get PeerName of the actual peer
266        /// </summary>
267        /// <param name = "sPeerName">out: additional peer information UserName on LinkManager</param>
268        /// <returns>PeerID as a String</returns>
269        public PeerId GetPeerId(out string sPeerName)
270        {
271            sPeerName = _linkmanager.UserName;
272            return new PeerId(_overlay.LocalAddress);
273        }
274
275        /// <summary>
276        ///   Construct PeerId object for a specific byte[] id
277        /// </summary>
278        /// <param name = "byteId">overlay address as byte array</param>
279        /// <returns>corresponding PeerId for given byte[] id</returns>
280        public PeerId GetPeerId(byte[] byteId)
281        {
282            LogToMonitor("GetPeerID: Converting byte[] to PeerId-Object");
283            return new PeerId(_overlay.GetAddress(byteId));
284        }
285
286        // overlay.LocalAddress = Overlay-Peer-Address/Names
287        public void SendToPeer(byte[] data, byte[] destinationPeer)
288        {
289            // get stack size of the pap use-data and add own use data (for optimizing Stack size)
290            var realStackSize = _overlay.GetHeaderSize() + data.Length;
291
292            var stackData = new ByteStack(realStackSize);
293            stackData.Push(data);
294
295            var destinationAddr = _overlay.GetAddress(destinationPeer);
296            var overlayMsg = new OverlayMessage(MessageReceiverType.P2PBase, 
297                                                _overlay.LocalAddress, destinationAddr, stackData);
298
299            _overlay.Send(overlayMsg);
300        }
301
302        private void OverlayMessageReceived(object sender, OverlayMessageEventArgs e)
303        {
304            if (OnP2PMessageReceived == null) return;
305
306            var pid = new PeerId(e.Message.Source);
307            /* You have to fire this event asynchronous, because the main
308                 * thread will be stopped in this wrapper class for synchronizing
309                 * the asynchronous stuff (AutoResetEvent) --> so this could run
310                 * into a deadlock, when you fire this event synchronous (normal Invoke)
311                 * ATTENTION: This could change the invocation order!!! In my case
312                              no problem, but maybe in future cases... */
313
314            // TODO: not safe: The delegate must have only one target
315            // OnP2PMessageReceived.BeginInvoke(pid, e.Message.Data.PopBytes(e.Message.Data.CurrentStackSize), null, null);
316
317            foreach (var del in OnP2PMessageReceived.GetInvocationList())
318            {
319                del.DynamicInvoke(pid, e.Message.Data.PopBytes(e.Message.Data.CurrentStackSize));
320            }
321        }
322
323        #endregion
324
325        #region Event Handling (System Joined, Left and Message Received)
326
327        private void OnDhtSystemJoined(object sender, EventArgs e)
328        {
329            if (OnSystemJoined != null)
330                OnSystemJoined();
331
332            _systemJoined.Set();
333            IsConnected = true;
334        }
335
336        private void OnDhtSystemLeft(object sender, SystemLeftEventArgs e)
337        {
338            if (OnSystemLeft != null)
339                OnSystemLeft();
340
341            IsConnected = false;
342            IsInitialized = false;
343
344            // Allow new connection to start and check for waiting / blocked tasks
345            // TODO reset running ConnectionWorkers?
346            _systemLeft.Set();
347            _systemJoined.Set();
348
349            LogToMonitor("CrypP2P left the system.");
350        }
351
352        #endregion
353
354        #region Synchronous Methods + their Callbacks
355
356        /// <summary>
357        ///   Stores a value in the DHT at the given key
358        /// </summary>
359        /// <param name = "key">Key of DHT Entry</param>
360        /// <param name = "value">Value of DHT Entry</param>
361        /// <returns>True, when storing is completed!</returns>
362        public RequestResult SynchStore(string key, string value)
363        {
364            return SynchStore(key, Encoding.UTF8.GetBytes(value));
365        }
366
367        /// <summary>
368        ///   Stores a value in the DHT at the given key
369        /// </summary>
370        /// <param name = "key">Key of DHT Entry</param>
371        /// <param name = "data">Value of DHT Entry</param>
372        /// <returns>True, when storing is completed!</returns>
373        public RequestResult SynchStore(string key, byte[] data)
374        {
375            var autoResetEvent = new AutoResetEvent(false);
376
377            // LogToMonitor("testcrash" + Encoding.UTF8.GetString(new byte[5000]));
378            LogToMonitor("Begin: SynchStore. Key: " + key + ", " + data.Length + " bytes");
379
380            var requestResult = new RequestResult {WaitHandle = autoResetEvent, Key = key, Data = data};
381            VersionedDht.Store(OnSynchStoreCompleted, key, data, requestResult);
382
383            // blocking till response
384            autoResetEvent.WaitOne();
385
386            LogToMonitor("End: SynchStore. Key: " + key + ". Status: " + requestResult.Status);
387
388            return requestResult;
389        }
390
391        /// <summary>
392        ///   Callback for a the synchronized store method
393        /// </summary>
394        /// <param name = "storeResult">retrieved data container</param>
395        private static void OnSynchStoreCompleted(StoreResult storeResult)
396        {
397            var requestResult = storeResult.AsyncState as RequestResult;
398            if (requestResult == null)
399            {
400                LogToMonitor("Received OnSynchStoreCompleted, but RequestResult object is missing. Discarding.");
401                return;
402            }
403
404            requestResult.Parse(storeResult);
405
406            // unblock WaitHandle in the synchronous method
407            requestResult.WaitHandle.Set();
408        }
409
410        /// <summary>
411        ///   Get the value of the given DHT Key or null, if it doesn't exist.
412        /// </summary>
413        /// <param name = "key">Key of DHT Entry</param>
414        /// <returns>Value of DHT Entry</returns>
415        public RequestResult SynchRetrieve(string key)
416        {
417            LogToMonitor("Begin: SynchRetrieve. Key: " + key);
418
419            var autoResetEvent = new AutoResetEvent(false);
420            var requestResult = new RequestResult() {WaitHandle = autoResetEvent, Key = key};
421            Dht.Retrieve(OnSynchRetrieveCompleted, key, requestResult);
422
423            // blocking till response
424            autoResetEvent.WaitOne();
425
426            LogToMonitor("End: SynchRetrieve. Key: " + key + ". Status: " + requestResult.Status);
427
428            return requestResult;
429        }
430
431        /// <summary>
432        ///   Callback for a the synchronized retrieval method
433        /// </summary>
434        /// <param name = "retrieveResult"></param>
435        private static void OnSynchRetrieveCompleted(RetrieveResult retrieveResult)
436        {
437            var requestResult = retrieveResult.AsyncState as RequestResult;
438            if (requestResult == null)
439            {
440                LogToMonitor("Received OnSynchRetrieveCompleted, but RequestResult object is missing. Discarding.");
441                return;
442            }
443
444            requestResult.Parse(retrieveResult);
445
446            // unblock WaitHandle in the synchronous method
447            requestResult.WaitHandle.Set();
448        }
449
450        /// <summary>
451        ///   Removes a key/value pair out of the DHT
452        /// </summary>
453        /// <param name = "key">Key of the DHT Entry</param>
454        /// <returns>True, when removing is completed!</returns>
455        public RequestResult SynchRemove(string key)
456        {
457            LogToMonitor("Begin SynchRemove. Key: " + key);
458
459            var autoResetEvent = new AutoResetEvent(false);
460            var requestResult = new RequestResult { WaitHandle = autoResetEvent, Key = key };
461            VersionedDht.Remove(OnSynchRemoveCompleted, key, requestResult);
462
463            // blocking till response
464            autoResetEvent.WaitOne();
465
466            LogToMonitor("End: SynchRemove. Key: " + key + ". Status: " + requestResult.Status);
467
468            return requestResult;
469        }
470
471        /// <summary>
472        ///   Callback for a the synchronized remove method
473        /// </summary>
474        /// <param name = "removeResult"></param>
475        private static void OnSynchRemoveCompleted(RemoveResult removeResult)
476        {
477            var requestResult = removeResult.AsyncState as RequestResult;
478            if (requestResult == null)
479            {
480                LogToMonitor("Received OnSynchRemoveCompleted, but RequestResult object is missing. Discarding.");
481                return;
482            }
483
484            requestResult.Parse(removeResult);
485
486            // unblock WaitHandle in the synchronous method
487            requestResult.WaitHandle.Set();
488        }
489
490        #endregion
491
492        #region Log facility
493
494        /// <summary>
495        ///   To log the internal state in the Monitoring Software of P@play
496        /// </summary>
497        public void LogInternalState()
498        {
499            if (Dht != null)
500            {
501                Dht.LogInternalState();
502            }
503        }
504
505        private static void LogToMonitor(string sTextToLog)
506        {
507            if (P2PSettings.Default.Log2Monitor)
508                Log.Debug(sTextToLog);
509        }
510
511        #endregion
512    }
513}
Note: See TracBrowser for help on using the repository browser.