source: trunk/CrypP2P/Internal/P2PBase.cs @ 1587

Last change on this file since 1587 was 1587, checked in by Paul Lelgemann, 12 years ago

+ CrypP2P: Added configuration option for "Use local address detection" (defaults to false)
o Minor bugfixes in PeerToPeer(Publisher|Subscriber), which seem to handle an unconnected p2p network wrong

File size: 19.0 KB
Line 
1/*
2   Copyright 2010 Paul Lelgemann, University of Duisburg-Essen
3
4   Licensed under the Apache License, Version 2.0 (the "License");
5   you may not use this file except in compliance with the License.
6   You may obtain a copy of the License at
7
8       http://www.apache.org/licenses/LICENSE-2.0
9
10   Unless required by applicable law or agreed to in writing, software
11   distributed under the License is distributed on an "AS IS" BASIS,
12   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   See the License for the specific language governing permissions and
14   limitations under the License.
15*/
16
17using System;
18using System.Text;
19using System.Threading;
20using Cryptool.PluginBase;
21using Cryptool.Plugins.PeerToPeer.Internal;
22using Gears4Net;
23using PeersAtPlay;
24using PeersAtPlay.Monitoring;
25using PeersAtPlay.P2PLink;
26using PeersAtPlay.P2PLink.SnalNG;
27using PeersAtPlay.P2POverlay;
28using PeersAtPlay.P2POverlay.Bootstrapper;
29using PeersAtPlay.P2POverlay.Bootstrapper.IrcBootstrapperV2;
30using PeersAtPlay.P2POverlay.Bootstrapper.LocalMachineBootstrapper;
31using PeersAtPlay.P2POverlay.FullMeshOverlay;
32using PeersAtPlay.P2PStorage.DHT;
33using PeersAtPlay.P2PStorage.FullMeshDHT;
34using PeersAtPlay.Util.Logging;
35
36/* TODO:
37 * - Delete UseNatTraversal-Flag and insert CertificateCheck and CertificateSetup
38 * - Testing asynchronous methods incl. EventHandlers
39 */
40
41namespace Cryptool.P2P.Internal
42{
43    /// <summary>
44    ///   Wrapper class to integrate peer@play environment into CrypTool.
45    ///   This class synchronizes asynchronous methods for easier usage in CT2.
46    /// </summary>
47    public class P2PBase
48    {
49        #region Variables
50
51        private readonly AutoResetEvent _systemJoined;
52        private readonly AutoResetEvent _systemLeft;
53        private IBootstrapper _bootstrapper;
54        private IP2PLinkManager _linkmanager;
55        private P2POverlay _overlay;
56        internal IDHT Dht;
57        internal IVersionedDHT VersionedDht;
58
59        /// <summary>
60        ///   True if system was successfully joined, false if system is COMPLETELY left
61        /// </summary>
62        public bool Started { get; private set; }
63
64        /// <summary>
65        ///   True if the underlying peer to peer system has been fully initialized
66        /// </summary>
67        public bool Initialized { get; private set; }
68
69        #endregion
70
71        #region Delegates
72
73        public event P2PMessageReceived OnP2PMessageReceived;
74        public delegate void P2PMessageReceived(PeerId sourceAddr, byte[] data);
75
76        public event SystemJoined OnSystemJoined;
77        public delegate void SystemJoined();
78
79        public event SystemLeft OnSystemLeft;
80        public delegate void SystemLeft();
81
82        #endregion
83
84        public P2PBase()
85        {
86            Started = false;
87            Initialized = false;
88
89            _systemJoined = new AutoResetEvent(false);
90            _systemLeft = new AutoResetEvent(false);
91        }
92
93        #region Basic P2P Methods (Init, Start, Stop)
94
95        /// <summary>
96        ///   Initializes the underlying peer-to-peer system with settings configured in P2PSettings. This step is required in order to be able to establish a connection.
97        /// </summary>
98        public void Initialize()
99        {
100            Scheduler scheduler = new STAScheduler("pap");
101
102            switch (P2PSettings.Default.LinkManager)
103            {
104                case P2PLinkManagerType.Snal:
105                    LogToMonitor("Init LinkMgr: Using NAT Traversal stuff");
106
107                    // NAT-Traversal stuff needs a different Snal-Version
108                    _linkmanager = new Snal(scheduler);
109
110                    var settings = new PeersAtPlay.P2PLink.SnalNG.Settings();
111                    settings.LoadDefaults();
112                    settings.ConnectInternal = true;
113                    settings.LocalReceivingPort = P2PSettings.Default.LocalReceivingPort;
114                    settings.UseLocalAddressDetection = P2PSettings.Default.UseLocalAddressDetection;
115                    settings.AutoReconnect = false;
116                    settings.NoDelay = false;
117                    settings.ReuseAddress = false;
118                    settings.UseNetworkMonitorServer = true;
119
120                    _linkmanager.Settings = settings;
121                    _linkmanager.ApplicationType = ApplicationType.CrypTool;
122
123                    break;
124                default:
125                    throw (new NotImplementedException());
126            }
127
128            switch (P2PSettings.Default.Bootstrapper)
129            {
130                case P2PBootstrapperType.LocalMachineBootstrapper:
131                    // LocalMachineBootstrapper = only local connection (runs only on one machine)
132                    _bootstrapper = new LocalMachineBootstrapper();
133                    break;
134                case P2PBootstrapperType.IrcBootstrapper:
135                    // setup nat traversal stuff
136                    LogToMonitor("Init Bootstrapper: Using NAT Traversal stuff");
137                    PeersAtPlay.P2POverlay.Bootstrapper.IrcBootstrapper.Settings.DelaySymmetricResponse = true;
138                    PeersAtPlay.P2POverlay.Bootstrapper.IrcBootstrapper.Settings.IncludeSymmetricInResponse = false;
139                    PeersAtPlay.P2POverlay.Bootstrapper.IrcBootstrapper.Settings.SymmetricResponseDelay = 6000;
140                   
141                    PeersAtPlay.P2POverlay.Bootstrapper.IrcBootstrapperV2.Settings.DelaySymmetricResponse = true;
142                    PeersAtPlay.P2POverlay.Bootstrapper.IrcBootstrapperV2.Settings.IncludeSymmetricResponse = false;
143
144                    _bootstrapper = new IrcBootstrapper(scheduler);
145                    break;
146                default:
147                    throw (new NotImplementedException());
148            }
149
150            switch (P2PSettings.Default.Overlay)
151            {
152                case P2POverlayType.FullMeshOverlay:
153                    // changing overlay example: this.overlay = new ChordOverlay();
154                    _overlay = new FullMeshOverlay(scheduler);
155                    break;
156                default:
157                    throw (new NotImplementedException());
158            }
159
160            switch (P2PSettings.Default.Dht)
161            {
162                case P2PDHTType.FullMeshDHT:
163                    Dht = new FullMeshDHT(scheduler);
164                    break;
165                default:
166                    throw (new NotImplementedException());
167            }
168
169            _overlay.MessageReceived += OverlayMessageReceived;
170            Dht.SystemJoined += OnDhtSystemJoined;
171            Dht.SystemLeft += OnDhtSystemLeft;
172
173            VersionedDht = (IVersionedDHT) Dht;
174
175            Dht.Initialize(P2PSettings.Default.PeerName, string.Empty, P2PSettings.Default.WorldName, _overlay, 
176                            _bootstrapper, 
177                            _linkmanager, null);
178
179            Initialized = true;
180        }
181
182        /// <summary>
183        ///   Starts the P2P System. When the given P2P world doesn't exist yet,
184        ///   inclusive creating the and bootstrapping to the P2P network.
185        ///   In either case joining the P2P world.
186        ///   This synchronized method returns true not before the peer has
187        ///   successfully joined the network (this may take one or two minutes).
188        /// </summary>
189        /// <exception cref = "InvalidOperationException">When the peer-to-peer system has not been initialized.
190        /// After validating the settings, this can be done by calling Initialize().</exception>
191        /// <returns>True, if the peer has completely joined the p2p network</returns>
192        public bool SynchStart()
193        {
194            if (!Initialized)
195            {
196                throw new InvalidOperationException("Peer-to-peer is not initialized.");
197            }
198
199            if (Started)
200            {
201                return true;
202            }
203
204            Dht.BeginStart(null);
205
206            // Wait for event SystemJoined. When it's invoked, the peer completely joined the P2P system
207            _systemJoined.WaitOne();
208
209            return true;
210        }
211
212        /// <summary>
213        ///   Disconnects from the peer-to-peer system.
214        /// </summary>
215        /// <returns>True, if the peer has completely left the p2p network</returns>
216        public bool SynchStop()
217        {
218            if (Dht == null) return false;
219
220            Dht.BeginStop(null);
221
222            // wait till systemLeft Event is invoked
223            _systemLeft.WaitOne();
224
225            return true;
226        }
227
228        #endregion
229
230        #region Peer related method (GetPeerId, Send message to peer)
231
232        /// <summary>
233        ///   Get PeerName of the actual peer
234        /// </summary>
235        /// <param name = "sPeerName">out: additional peer information UserName on LinkManager</param>
236        /// <returns>PeerID as a String</returns>
237        public PeerId GetPeerId(out string sPeerName)
238        {
239            sPeerName = _linkmanager.UserName;
240            return new PeerId(_overlay.LocalAddress);
241        }
242
243        /// <summary>
244        ///   Construct PeerId object for a specific byte[] id
245        /// </summary>
246        /// <param name = "byteId">overlay address as byte array</param>
247        /// <returns>corresponding PeerId for given byte[] id</returns>
248        public PeerId GetPeerId(byte[] byteId)
249        {
250            LogToMonitor("GetPeerID: Converting byte[] to PeerId-Object");
251            return new PeerId(_overlay.GetAddress(byteId));
252        }
253
254        // overlay.LocalAddress = Overlay-Peer-Address/Names
255        public void SendToPeer(byte[] data, byte[] destinationPeer)
256        {
257            // get stack size of the pap use-data and add own use data (for optimizing Stack size)
258            var realStackSize = _overlay.GetHeaderSize() + data.Length;
259
260            var stackData = new ByteStack(realStackSize);
261            stackData.Push(data);
262
263            var destinationAddr = _overlay.GetAddress(destinationPeer);
264            var overlayMsg = new OverlayMessage(MessageReceiverType.P2PBase, 
265                                                _overlay.LocalAddress, destinationAddr, stackData);
266
267            _overlay.Send(overlayMsg);
268        }
269
270        private void OverlayMessageReceived(object sender, OverlayMessageEventArgs e)
271        {
272            if (OnP2PMessageReceived == null) return;
273
274            var pid = new PeerId(e.Message.Source);
275            /* You have to fire this event asynchronous, because the main
276                 * thread will be stopped in this wrapper class for synchronizing
277                 * the asynchronous stuff (AutoResetEvent) --> so this could run
278                 * into a deadlock, when you fire this event synchronous (normal Invoke)
279                 * ATTENTION: This could change the invocation order!!! In my case
280                              no problem, but maybe in future cases... */
281
282            // TODO: not safe: The delegate must have only one target
283            // OnP2PMessageReceived.BeginInvoke(pid, e.Message.Data.PopBytes(e.Message.Data.CurrentStackSize), null, null);
284
285            foreach (var del in OnP2PMessageReceived.GetInvocationList())
286            {
287                del.DynamicInvoke(pid, e.Message.Data.PopBytes(e.Message.Data.CurrentStackSize));
288            }
289        }
290
291        #endregion
292
293        #region Event Handling (System Joined, Left and Message Received)
294
295        private void OnDhtSystemJoined(object sender, EventArgs e)
296        {
297            if (OnSystemJoined != null)
298                OnSystemJoined();
299
300            _systemJoined.Set();
301            Started = true;
302        }
303
304        private void OnDhtSystemLeft(object sender, SystemLeftEventArgs e)
305        {
306            if (OnSystemLeft != null)
307                OnSystemLeft();
308
309            Started = false;
310            Initialized = false;
311
312            // Allow new connection to start and check for waiting / blocked tasks
313            P2PManager.Instance.IsP2PConnecting = false;
314            _systemLeft.Set();
315            _systemJoined.Set();
316
317            LogToMonitor("CrypP2P left the system.");
318        }
319
320        #endregion
321
322        #region Synchronous Methods + their Callbacks
323
324        /// <summary>
325        ///   Stores a value in the DHT at the given key
326        /// </summary>
327        /// <param name = "key">Key of DHT Entry</param>
328        /// <param name = "data">Value of DHT Entry</param>
329        /// <returns>True, when storing is completed!</returns>
330        public bool SynchStore(string key, byte[] data)
331        {
332            var autoResetEvent = new AutoResetEvent(false);
333
334            // LogToMonitor("testcrash" + Encoding.UTF8.GetString(new byte[5000]));
335            LogToMonitor("Begin: SynchStore. Key: " + key + ", " + data.Length + " bytes");
336
337            var responseWait = new ResponseWait {WaitHandle = autoResetEvent, key = key, value = data};
338            VersionedDht.Store(OnSynchStoreCompleted, key, data, responseWait);
339
340            // blocking till response
341            autoResetEvent.WaitOne();
342
343            LogToMonitor("End: SynchStore. Key: " + key + ". Success: " + responseWait.success);
344
345            return responseWait.success;
346        }
347
348        /// <summary>
349        ///   Stores a value in the DHT at the given key
350        /// </summary>
351        /// <param name = "key">Key of DHT Entry</param>
352        /// <param name = "value">Value of DHT Entry</param>
353        /// <returns>True, when storing is completed!</returns>
354        public bool SynchStore(string key, string value)
355        {
356            return SynchStore(key, Encoding.UTF8.GetBytes(value));
357        }
358
359        /// <summary>
360        ///   Callback for a the synchronized store method
361        /// </summary>
362        /// <param name = "storeResult">retrieved data container</param>
363        private static void OnSynchStoreCompleted(StoreResult storeResult)
364        {
365            var responseWait = storeResult.AsyncState as ResponseWait;
366            if (responseWait == null)
367            {
368                LogToMonitor("Received OnSynchStoreCompleted, but ResponseWait object is missing. Discarding.");
369                return;
370            }
371
372            responseWait.success = storeResult.Status != OperationStatus.KeyNotFound;
373            responseWait.Message = Encoding.UTF8.GetBytes(storeResult.Status.ToString());
374
375            // unblock WaitHandle in the synchronous method
376            responseWait.WaitHandle.Set();
377
378            LogToMonitor("Received and handled OnSynchStoreCompleted.");
379        }
380
381        /// <summary>
382        ///   Get the value of the given DHT Key or null, if it doesn't exist.
383        /// </summary>
384        /// <param name = "key">Key of DHT Entry</param>
385        /// <returns>Value of DHT Entry</returns>
386        public byte[] SynchRetrieve(string key)
387        {
388            LogToMonitor("Begin: SynchRetrieve. Key: " + key);
389
390            var autoResetEvent = new AutoResetEvent(false);
391            var responseWait = new ResponseWait {WaitHandle = autoResetEvent, key = key};
392            Dht.Retrieve(OnSynchRetrieveCompleted, key, responseWait);
393
394            // blocking till response
395            autoResetEvent.WaitOne();
396
397            LogToMonitor("End: SynchRetrieve. Key: " + key + ". Success: " + responseWait.success);
398
399            return responseWait.Message;
400        }
401
402        /// <summary>
403        ///   Callback for a the synchronized retrieval method
404        /// </summary>
405        /// <param name = "retrieveResult"></param>
406        private static void OnSynchRetrieveCompleted(RetrieveResult retrieveResult)
407        {
408            var responseWait = retrieveResult.AsyncState as ResponseWait;
409            if (responseWait == null)
410            {
411                LogToMonitor("Received OnSynchRetrieveCompleted, but ResponseWait object is missing. Discarding.");
412                return;
413            }
414
415            LogToMonitor("Received retrieve callback, local ThreadId: " + Thread.CurrentThread.ManagedThreadId);
416
417            switch (retrieveResult.Status)
418            {
419                case OperationStatus.Success:
420                    responseWait.success = true;
421                    responseWait.Message = retrieveResult.Data;
422                    break;
423                case OperationStatus.KeyNotFound:
424                    responseWait.success = true;
425                    responseWait.Message = null;
426                    break;
427                default:
428                    responseWait.success = false;
429                    responseWait.Message = null;
430                    break;
431            }
432
433            // unblock WaitHandle in the synchronous method
434            responseWait.WaitHandle.Set();
435
436            LogToMonitor("Received and handled OnSynchRetrieveCompleted.");
437        }
438
439        /// <summary>
440        ///   Removes a key/value pair out of the DHT
441        /// </summary>
442        /// <param name = "key">Key of the DHT Entry</param>
443        /// <returns>True, when removing is completed!</returns>
444        public bool SynchRemove(string key)
445        {
446            LogToMonitor("Begin SynchRemove. Key: " + key);
447
448            var autoResetEvent = new AutoResetEvent(false);
449            var responseWait = new ResponseWait {WaitHandle = autoResetEvent, key = key};
450            VersionedDht.Remove(OnSynchRemoveCompleted, key, responseWait);
451
452            // blocking till response
453            autoResetEvent.WaitOne();
454
455            LogToMonitor("End: SynchRemove. Key: " + key + ". Success: " + responseWait.success);
456
457            return responseWait.success;
458        }
459
460        /// <summary>
461        ///   Callback for a the synchronized remove method
462        /// </summary>
463        /// <param name = "removeResult"></param>
464        private static void OnSynchRemoveCompleted(RemoveResult removeResult)
465        {
466            var responseWait = removeResult.AsyncState as ResponseWait;
467            if (responseWait == null)
468            {
469                LogToMonitor("Received OnSynchRemoveCompleted, but ResponseWait object is missing. Discarding.");
470                return;
471            }
472
473            responseWait.success = removeResult.Status == OperationStatus.Success;
474            responseWait.Message = Encoding.UTF8.GetBytes(removeResult.Status.ToString());
475
476            // unblock WaitHandle in the synchronous method
477            responseWait.WaitHandle.Set();
478
479            LogToMonitor("Received and handled OnSynchRemoveCompleted.");
480        }
481
482        #endregion
483
484        #region Log facility
485
486        /// <summary>
487        ///   To log the internal state in the Monitoring Software of P@play
488        /// </summary>
489        public void LogInternalState()
490        {
491            if (Dht != null)
492            {
493                Dht.LogInternalState();
494            }
495        }
496
497        private static void LogToMonitor(string sTextToLog)
498        {
499            if (P2PSettings.Default.Log2Monitor)
500                Log.Debug(sTextToLog);
501        }
502
503        #endregion
504    }
505}
Note: See TracBrowser for help on using the repository browser.