source: trunk/CrypP2P/Internal/P2PBase.cs @ 1643

Last change on this file since 1643 was 1643, checked in by Paul Lelgemann, 11 years ago

o Updated Peers@Play libraries
o Work on the distributed KeySearcher

File size: 19.9 KB
Line 
1/*
2   Copyright 2010 Paul Lelgemann and Christian Arnold,
3                  University of Duisburg-Essen
4
5   Licensed under the Apache License, Version 2.0 (the "License");
6   you may not use this file except in compliance with the License.
7   You may obtain a copy of the License at
8
9       http://www.apache.org/licenses/LICENSE-2.0
10
11   Unless required by applicable law or agreed to in writing, software
12   distributed under the License is distributed on an "AS IS" BASIS,
13   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   See the License for the specific language governing permissions and
15   limitations under the License.
16*/
17
18using System;
19using System.Text;
20using System.Threading;
21using Cryptool.PluginBase;
22using Cryptool.Plugins.PeerToPeer.Internal;
23using Gears4Net;
24using PeersAtPlay;
25using PeersAtPlay.Monitoring;
26using PeersAtPlay.P2PLink;
27using PeersAtPlay.P2PLink.SnalNG;
28using PeersAtPlay.P2POverlay;
29using PeersAtPlay.P2POverlay.Bootstrapper;
30using PeersAtPlay.P2POverlay.Bootstrapper.IrcBootstrapperV2;
31using PeersAtPlay.P2POverlay.Bootstrapper.LocalMachineBootstrapper;
32using PeersAtPlay.P2POverlay.FullMeshOverlay;
33using PeersAtPlay.P2PStorage.DHT;
34using PeersAtPlay.P2PStorage.FullMeshDHT;
35using PeersAtPlay.Util.Logging;
36
37/* TODO:
38 * - Delete UseNatTraversal-Flag and insert CertificateCheck and CertificateSetup
39 * - Testing asynchronous methods incl. EventHandlers
40 */
41
42namespace Cryptool.P2P.Internal
43{
44    /// <summary>
45    ///   Wrapper class to integrate peer@play environment into CrypTool.
46    ///   This class synchronizes asynchronous methods for easier usage in CT2.
47    /// </summary>
48    public class P2PBase
49    {
50        #region Variables
51
52        private readonly AutoResetEvent _systemJoined;
53        private readonly AutoResetEvent _systemLeft;
54        private IBootstrapper _bootstrapper;
55        private IP2PLinkManager _linkmanager;
56        private P2POverlay _overlay;
57        internal IDHT Dht;
58        internal IVersionedDHT VersionedDht;
59
60        /// <summary>
61        ///   True if system was successfully joined, false if system is COMPLETELY left
62        /// </summary>
63        public bool IsConnected { get; private set; }
64
65        /// <summary>
66        ///   True if the underlying peer to peer system has been fully initialized
67        /// </summary>
68        public bool IsInitialized { get; private set; }
69
70        #endregion
71
72        #region Delegates
73
74        public event P2PMessageReceived OnP2PMessageReceived;
75        public delegate void P2PMessageReceived(PeerId sourceAddr, byte[] data);
76
77        public event SystemJoined OnSystemJoined;
78        public delegate void SystemJoined();
79
80        public event SystemLeft OnSystemLeft;
81        public delegate void SystemLeft();
82
83        #endregion
84
85        public P2PBase()
86        {
87            IsConnected = false;
88            IsInitialized = false;
89
90            _systemJoined = new AutoResetEvent(false);
91            _systemLeft = new AutoResetEvent(false);
92        }
93
94        #region Basic P2P Methods (Init, Start, Stop)
95
96        /// <summary>
97        ///   Initializes the underlying peer-to-peer system with settings configured in P2PSettings. This step is required in order to be able to establish a connection.
98        /// </summary>
99        public void Initialize()
100        {
101            Scheduler scheduler = new STAScheduler("pap");
102
103            switch (P2PSettings.Default.LinkManager)
104            {
105                case P2PLinkManagerType.Snal:
106                    LogToMonitor("Init LinkMgr: Using NAT Traversal stuff");
107
108                    // NAT-Traversal stuff needs a different Snal-Version
109                    _linkmanager = new Snal(scheduler);
110
111                    var settings = new PeersAtPlay.P2PLink.SnalNG.Settings();
112                    settings.LoadDefaults();
113                    settings.ConnectInternal = true;
114                    settings.LocalReceivingPort = P2PSettings.Default.LocalReceivingPort;
115                    settings.UseLocalAddressDetection = P2PSettings.Default.UseLocalAddressDetection;
116                    settings.NoDelay = false;
117                    settings.ReuseAddress = false;
118                    settings.UseNetworkMonitorServer = true;
119                    settings.CloseConnectionAfterPingTimeout = false;
120                       
121                    switch(P2PSettings.Default.TransportProtocol)
122                    {
123                        case P2PTransportProtocol.UDP:
124                            settings.TransportProtocol = TransportProtocol.UDP;
125                            break;
126                        case P2PTransportProtocol.TCP_UDP:
127                            settings.TransportProtocol = TransportProtocol.TCP_UDP;
128                            break;
129                        default:
130                            settings.TransportProtocol = TransportProtocol.TCP;
131                            break;
132                    }
133
134                    _linkmanager.Settings = settings;
135                    _linkmanager.ApplicationType = ApplicationType.CrypTool;
136
137                    break;
138                default:
139                    throw (new NotImplementedException());
140            }
141
142            switch (P2PSettings.Default.Bootstrapper)
143            {
144                case P2PBootstrapperType.LocalMachineBootstrapper:
145                    // LocalMachineBootstrapper = only local connection (runs only on one machine)
146                    _bootstrapper = new LocalMachineBootstrapper();
147                    break;
148                case P2PBootstrapperType.IrcBootstrapper:
149                    PeersAtPlay.P2POverlay.Bootstrapper.IrcBootstrapperV2.Settings.DelaySymmetricResponse = true;
150                    PeersAtPlay.P2POverlay.Bootstrapper.IrcBootstrapperV2.Settings.IncludeSymmetricResponse = false;
151                    PeersAtPlay.P2POverlay.Bootstrapper.IrcBootstrapperV2.Settings.UsePeerCache = false;
152
153                    _bootstrapper = new IrcBootstrapper(scheduler);
154                    break;
155                default:
156                    throw (new NotImplementedException());
157            }
158
159            switch (P2PSettings.Default.Overlay)
160            {
161                case P2POverlayType.FullMeshOverlay:
162                    // changing overlay example: this.overlay = new ChordOverlay();
163                    _overlay = new FullMeshOverlay(scheduler);
164                    break;
165                default:
166                    throw (new NotImplementedException());
167            }
168
169            switch (P2PSettings.Default.Dht)
170            {
171                case P2PDHTType.FullMeshDHT:
172                    Dht = new FullMeshDHT(scheduler);
173                    break;
174                default:
175                    throw (new NotImplementedException());
176            }
177
178            _overlay.MessageReceived += OverlayMessageReceived;
179            Dht.SystemJoined += OnDhtSystemJoined;
180            Dht.SystemLeft += OnDhtSystemLeft;
181
182            VersionedDht = (IVersionedDHT) Dht;
183
184            P2PManager.GuiLogMessage("Initializing DHT with world name " + P2PSettings.Default.WorldName,
185                                                NotificationLevel.Info);
186            Dht.Initialize(P2PSettings.Default.PeerName, string.Empty, P2PSettings.Default.WorldName, _overlay,
187                            _bootstrapper,
188                            _linkmanager, null);
189
190            IsInitialized = true;
191        }
192
193        /// <summary>
194        ///   Starts the P2P System. When the given P2P world doesn't exist yet,
195        ///   inclusive creating the and bootstrapping to the P2P network.
196        ///   In either case joining the P2P world.
197        ///   This synchronized method returns true not before the peer has
198        ///   successfully joined the network (this may take one or two minutes).
199        /// </summary>
200        /// <exception cref = "InvalidOperationException">When the peer-to-peer system has not been initialized.
201        /// After validating the settings, this can be done by calling Initialize().</exception>
202        /// <returns>True, if the peer has completely joined the p2p network</returns>
203        public bool SynchStart()
204        {
205            if (!IsInitialized)
206            {
207                throw new InvalidOperationException("Peer-to-peer is not initialized.");
208            }
209
210            if (IsConnected)
211            {
212                return true;
213            }
214
215            Dht.BeginStart(BeginStartEventHandler);
216
217            // Wait for event SystemJoined. When it's invoked, the peer completely joined the P2P system
218            _systemJoined.WaitOne();
219            P2PManager.GuiLogMessage("System join process ended.", NotificationLevel.Debug);
220
221            return true;
222        }
223
224        private void BeginStartEventHandler(DHTEventArgs eventArgs)
225        {
226            P2PManager.GuiLogMessage("Received DHTEventArgs: " + eventArgs + ", state: " + eventArgs.State, NotificationLevel.Debug);
227        }
228
229        /// <summary>
230        ///   Disconnects from the peer-to-peer system.
231        /// </summary>
232        /// <returns>True, if the peer has completely left the p2p network</returns>
233        public bool SynchStop()
234        {
235            if (Dht == null) return false;
236
237            Dht.BeginStop(null);
238
239            if (!IsConnected)
240            {
241                return true;
242            }
243
244            // wait till systemLeft Event is invoked
245            _systemLeft.WaitOne();
246
247            return true;
248        }
249
250        #endregion
251
252        #region Peer related method (GetPeerId, Send message to peer)
253
254        /// <summary>
255        ///   Get PeerName of the actual peer
256        /// </summary>
257        /// <param name = "sPeerName">out: additional peer information UserName on LinkManager</param>
258        /// <returns>PeerID as a String</returns>
259        public PeerId GetPeerId(out string sPeerName)
260        {
261            sPeerName = _linkmanager.UserName;
262            return new PeerId(_overlay.LocalAddress);
263        }
264
265        /// <summary>
266        ///   Construct PeerId object for a specific byte[] id
267        /// </summary>
268        /// <param name = "byteId">overlay address as byte array</param>
269        /// <returns>corresponding PeerId for given byte[] id</returns>
270        public PeerId GetPeerId(byte[] byteId)
271        {
272            LogToMonitor("GetPeerID: Converting byte[] to PeerId-Object");
273            return new PeerId(_overlay.GetAddress(byteId));
274        }
275
276        // overlay.LocalAddress = Overlay-Peer-Address/Names
277        public void SendToPeer(byte[] data, byte[] destinationPeer)
278        {
279            // get stack size of the pap use-data and add own use data (for optimizing Stack size)
280            var realStackSize = _overlay.GetHeaderSize() + data.Length;
281
282            var stackData = new ByteStack(realStackSize);
283            stackData.Push(data);
284
285            var destinationAddr = _overlay.GetAddress(destinationPeer);
286            var overlayMsg = new OverlayMessage(MessageReceiverType.P2PBase, 
287                                                _overlay.LocalAddress, destinationAddr, stackData);
288
289            _overlay.Send(overlayMsg);
290        }
291
292        private void OverlayMessageReceived(object sender, OverlayMessageEventArgs e)
293        {
294            if (OnP2PMessageReceived == null) return;
295
296            var pid = new PeerId(e.Message.Source);
297            /* You have to fire this event asynchronous, because the main
298                 * thread will be stopped in this wrapper class for synchronizing
299                 * the asynchronous stuff (AutoResetEvent) --> so this could run
300                 * into a deadlock, when you fire this event synchronous (normal Invoke)
301                 * ATTENTION: This could change the invocation order!!! In my case
302                              no problem, but maybe in future cases... */
303
304            // TODO: not safe: The delegate must have only one target
305            // OnP2PMessageReceived.BeginInvoke(pid, e.Message.Data.PopBytes(e.Message.Data.CurrentStackSize), null, null);
306
307            foreach (var del in OnP2PMessageReceived.GetInvocationList())
308            {
309                del.DynamicInvoke(pid, e.Message.Data.PopBytes(e.Message.Data.CurrentStackSize));
310            }
311        }
312
313        #endregion
314
315        #region Event Handling (System Joined, Left and Message Received)
316
317        private void OnDhtSystemJoined(object sender, EventArgs e)
318        {
319            if (OnSystemJoined != null)
320                OnSystemJoined();
321
322            _systemJoined.Set();
323            IsConnected = true;
324        }
325
326        private void OnDhtSystemLeft(object sender, SystemLeftEventArgs e)
327        {
328            if (OnSystemLeft != null)
329                OnSystemLeft();
330
331            IsConnected = false;
332            IsInitialized = false;
333
334            // Allow new connection to start and check for waiting / blocked tasks
335            // TODO reset running ConnectionWorkers?
336            _systemLeft.Set();
337            _systemJoined.Set();
338
339            LogToMonitor("CrypP2P left the system.");
340        }
341
342        #endregion
343
344        #region Synchronous Methods + their Callbacks
345
346        /// <summary>
347        ///   Stores a value in the DHT at the given key
348        /// </summary>
349        /// <param name = "key">Key of DHT Entry</param>
350        /// <param name = "data">Value of DHT Entry</param>
351        /// <returns>True, when storing is completed!</returns>
352        public bool SynchStore(string key, byte[] data)
353        {
354            var autoResetEvent = new AutoResetEvent(false);
355
356            // LogToMonitor("testcrash" + Encoding.UTF8.GetString(new byte[5000]));
357            LogToMonitor("Begin: SynchStore. Key: " + key + ", " + data.Length + " bytes");
358
359            var responseWait = new ResponseWait {WaitHandle = autoResetEvent, key = key, value = data};
360            VersionedDht.Store(OnSynchStoreCompleted, key, data, responseWait);
361
362            // blocking till response
363            autoResetEvent.WaitOne();
364
365            LogToMonitor("End: SynchStore. Key: " + key + ". Success: " + responseWait.success);
366
367            return responseWait.success;
368        }
369
370        /// <summary>
371        ///   Stores a value in the DHT at the given key
372        /// </summary>
373        /// <param name = "key">Key of DHT Entry</param>
374        /// <param name = "value">Value of DHT Entry</param>
375        /// <returns>True, when storing is completed!</returns>
376        public bool SynchStore(string key, string value)
377        {
378            return SynchStore(key, Encoding.UTF8.GetBytes(value));
379        }
380
381        /// <summary>
382        ///   Callback for a the synchronized store method
383        /// </summary>
384        /// <param name = "storeResult">retrieved data container</param>
385        private static void OnSynchStoreCompleted(StoreResult storeResult)
386        {
387            var responseWait = storeResult.AsyncState as ResponseWait;
388            if (responseWait == null)
389            {
390                LogToMonitor("Received OnSynchStoreCompleted, but ResponseWait object is missing. Discarding.");
391                return;
392            }
393
394            responseWait.success = storeResult.Status == OperationStatus.Success;
395            responseWait.operationStatus = storeResult.Status;
396            responseWait.Message = Encoding.UTF8.GetBytes(storeResult.Status.ToString());
397
398            // unblock WaitHandle in the synchronous method
399            responseWait.WaitHandle.Set();
400        }
401
402        /// <summary>
403        ///   Get the value of the given DHT Key or null, if it doesn't exist.
404        /// </summary>
405        /// <param name = "key">Key of DHT Entry</param>
406        /// <returns>Value of DHT Entry</returns>
407        public byte[] SynchRetrieve(string key)
408        {
409            LogToMonitor("Begin: SynchRetrieve. Key: " + key);
410
411            var autoResetEvent = new AutoResetEvent(false);
412            var responseWait = new ResponseWait {WaitHandle = autoResetEvent, key = key};
413            Dht.Retrieve(OnSynchRetrieveCompleted, key, responseWait);
414
415            // blocking till response
416            autoResetEvent.WaitOne();
417
418            LogToMonitor("End: SynchRetrieve. Key: " + key + ". Success: " + responseWait.success);
419
420            return responseWait.Message;
421        }
422
423        /// <summary>
424        ///   Callback for a the synchronized retrieval method
425        /// </summary>
426        /// <param name = "retrieveResult"></param>
427        private static void OnSynchRetrieveCompleted(RetrieveResult retrieveResult)
428        {
429            var responseWait = retrieveResult.AsyncState as ResponseWait;
430            if (responseWait == null)
431            {
432                LogToMonitor("Received OnSynchRetrieveCompleted, but ResponseWait object is missing. Discarding.");
433                return;
434            }
435
436            switch (retrieveResult.Status)
437            {
438                case OperationStatus.Success:
439                    responseWait.success = true;
440                    responseWait.Message = retrieveResult.Data;
441                    break;
442                case OperationStatus.KeyNotFound:
443                    responseWait.success = true;
444                    responseWait.Message = null;
445                    break;
446                default:
447                    responseWait.success = false;
448                    responseWait.Message = null;
449                    break;
450            }
451
452            responseWait.operationStatus = retrieveResult.Status;
453
454            // unblock WaitHandle in the synchronous method
455            responseWait.WaitHandle.Set();
456        }
457
458        /// <summary>
459        ///   Removes a key/value pair out of the DHT
460        /// </summary>
461        /// <param name = "key">Key of the DHT Entry</param>
462        /// <returns>True, when removing is completed!</returns>
463        public bool SynchRemove(string key)
464        {
465            LogToMonitor("Begin SynchRemove. Key: " + key);
466
467            var autoResetEvent = new AutoResetEvent(false);
468            var responseWait = new ResponseWait {WaitHandle = autoResetEvent, key = key};
469            VersionedDht.Remove(OnSynchRemoveCompleted, key, responseWait);
470
471            // blocking till response
472            autoResetEvent.WaitOne();
473
474            LogToMonitor("End: SynchRemove. Key: " + key + ". Success: " + responseWait.success);
475
476            return responseWait.success;
477        }
478
479        /// <summary>
480        ///   Callback for a the synchronized remove method
481        /// </summary>
482        /// <param name = "removeResult"></param>
483        private static void OnSynchRemoveCompleted(RemoveResult removeResult)
484        {
485            var responseWait = removeResult.AsyncState as ResponseWait;
486            if (responseWait == null)
487            {
488                LogToMonitor("Received OnSynchRemoveCompleted, but ResponseWait object is missing. Discarding.");
489                return;
490            }
491
492            responseWait.success = removeResult.Status != OperationStatus.Failure &&
493                                   removeResult.Status != OperationStatus.VersionMismatch;
494            responseWait.operationStatus = removeResult.Status;
495            responseWait.Message = Encoding.UTF8.GetBytes(removeResult.Status.ToString());
496
497            // unblock WaitHandle in the synchronous method
498            responseWait.WaitHandle.Set();
499        }
500
501        #endregion
502
503        #region Log facility
504
505        /// <summary>
506        ///   To log the internal state in the Monitoring Software of P@play
507        /// </summary>
508        public void LogInternalState()
509        {
510            if (Dht != null)
511            {
512                Dht.LogInternalState();
513            }
514        }
515
516        private static void LogToMonitor(string sTextToLog)
517        {
518            if (P2PSettings.Default.Log2Monitor)
519                Log.Debug(sTextToLog);
520        }
521
522        #endregion
523    }
524}
Note: See TracBrowser for help on using the repository browser.