source: trunk/CrypP2P/Internal/P2PBase.cs @ 1634

Last change on this file since 1634 was 1634, checked in by Paul Lelgemann, 11 years ago

o Fixed KeySearcher progress display
+ Work on the distributed KeySearcher part

File size: 20.3 KB
Line 
1/*
2   Copyright 2010 Paul Lelgemann and Christian Arnold,
3                  University of Duisburg-Essen
4
5   Licensed under the Apache License, Version 2.0 (the "License");
6   you may not use this file except in compliance with the License.
7   You may obtain a copy of the License at
8
9       http://www.apache.org/licenses/LICENSE-2.0
10
11   Unless required by applicable law or agreed to in writing, software
12   distributed under the License is distributed on an "AS IS" BASIS,
13   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   See the License for the specific language governing permissions and
15   limitations under the License.
16*/
17
18using System;
19using System.Text;
20using System.Threading;
21using Cryptool.PluginBase;
22using Cryptool.Plugins.PeerToPeer.Internal;
23using Gears4Net;
24using PeersAtPlay;
25using PeersAtPlay.Monitoring;
26using PeersAtPlay.P2PLink;
27using PeersAtPlay.P2PLink.SnalNG;
28using PeersAtPlay.P2POverlay;
29using PeersAtPlay.P2POverlay.Bootstrapper;
30using PeersAtPlay.P2POverlay.Bootstrapper.IrcBootstrapperV2;
31using PeersAtPlay.P2POverlay.Bootstrapper.LocalMachineBootstrapper;
32using PeersAtPlay.P2POverlay.FullMeshOverlay;
33using PeersAtPlay.P2PStorage.DHT;
34using PeersAtPlay.P2PStorage.FullMeshDHT;
35using PeersAtPlay.Util.Logging;
36
37/* TODO:
38 * - Delete UseNatTraversal-Flag and insert CertificateCheck and CertificateSetup
39 * - Testing asynchronous methods incl. EventHandlers
40 */
41
42namespace Cryptool.P2P.Internal
43{
44    /// <summary>
45    ///   Wrapper class to integrate peer@play environment into CrypTool.
46    ///   This class synchronizes asynchronous methods for easier usage in CT2.
47    /// </summary>
48    public class P2PBase
49    {
50        #region Variables
51
52        private readonly AutoResetEvent _systemJoined;
53        private readonly AutoResetEvent _systemLeft;
54        private IBootstrapper _bootstrapper;
55        private IP2PLinkManager _linkmanager;
56        private P2POverlay _overlay;
57        internal IDHT Dht;
58        internal IVersionedDHT VersionedDht;
59
60        /// <summary>
61        ///   True if system was successfully joined, false if system is COMPLETELY left
62        /// </summary>
63        public bool IsConnected { get; private set; }
64
65        /// <summary>
66        ///   True if the underlying peer to peer system has been fully initialized
67        /// </summary>
68        public bool IsInitialized { get; private set; }
69
70        #endregion
71
72        #region Delegates
73
74        public event P2PMessageReceived OnP2PMessageReceived;
75        public delegate void P2PMessageReceived(PeerId sourceAddr, byte[] data);
76
77        public event SystemJoined OnSystemJoined;
78        public delegate void SystemJoined();
79
80        public event SystemLeft OnSystemLeft;
81        public delegate void SystemLeft();
82
83        #endregion
84
85        public P2PBase()
86        {
87            IsConnected = false;
88            IsInitialized = false;
89
90            _systemJoined = new AutoResetEvent(false);
91            _systemLeft = new AutoResetEvent(false);
92        }
93
94        #region Basic P2P Methods (Init, Start, Stop)
95
96        /// <summary>
97        ///   Initializes the underlying peer-to-peer system with settings configured in P2PSettings. This step is required in order to be able to establish a connection.
98        /// </summary>
99        public void Initialize()
100        {
101            Scheduler scheduler = new STAScheduler("pap");
102
103            switch (P2PSettings.Default.LinkManager)
104            {
105                case P2PLinkManagerType.Snal:
106                    LogToMonitor("Init LinkMgr: Using NAT Traversal stuff");
107
108                    // NAT-Traversal stuff needs a different Snal-Version
109                    _linkmanager = new Snal(scheduler);
110
111                    var settings = new PeersAtPlay.P2PLink.SnalNG.Settings();
112                    settings.LoadDefaults();
113                    settings.ConnectInternal = true;
114                    settings.LocalReceivingPort = P2PSettings.Default.LocalReceivingPort;
115                    settings.UseLocalAddressDetection = P2PSettings.Default.UseLocalAddressDetection;
116                    settings.AutoReconnect = false;
117                    settings.NoDelay = false;
118                    settings.ReuseAddress = false;
119                    settings.UseNetworkMonitorServer = true;
120                    settings.CloseConnectionAfterPingTimeout = false;
121                       
122                    switch(P2PSettings.Default.TransportProtocol)
123                    {
124                        case P2PTransportProtocol.UDP:
125                            settings.TransportProtocol = TransportProtocol.UDP;
126                            break;
127                        case P2PTransportProtocol.TCP_UDP:
128                            settings.TransportProtocol = TransportProtocol.TCP_UDP;
129                            break;
130                        default:
131                            settings.TransportProtocol = TransportProtocol.TCP;
132                            break;
133                    }
134
135                    _linkmanager.Settings = settings;
136                    _linkmanager.ApplicationType = ApplicationType.CrypTool;
137
138                    break;
139                default:
140                    throw (new NotImplementedException());
141            }
142
143            switch (P2PSettings.Default.Bootstrapper)
144            {
145                case P2PBootstrapperType.LocalMachineBootstrapper:
146                    // LocalMachineBootstrapper = only local connection (runs only on one machine)
147                    _bootstrapper = new LocalMachineBootstrapper();
148                    break;
149                case P2PBootstrapperType.IrcBootstrapper:
150                    // setup nat traversal stuff
151                    LogToMonitor("Init Bootstrapper: Using NAT Traversal stuff");
152                    PeersAtPlay.P2POverlay.Bootstrapper.IrcBootstrapper.Settings.DelaySymmetricResponse = true;
153                    PeersAtPlay.P2POverlay.Bootstrapper.IrcBootstrapper.Settings.IncludeSymmetricInResponse = false;
154                    PeersAtPlay.P2POverlay.Bootstrapper.IrcBootstrapper.Settings.SymmetricResponseDelay = 6000;
155
156                    PeersAtPlay.P2POverlay.Bootstrapper.IrcBootstrapperV2.Settings.DelaySymmetricResponse = true;
157                    PeersAtPlay.P2POverlay.Bootstrapper.IrcBootstrapperV2.Settings.IncludeSymmetricResponse = false;
158
159                    _bootstrapper = new IrcBootstrapper(scheduler);
160                    break;
161                default:
162                    throw (new NotImplementedException());
163            }
164
165            switch (P2PSettings.Default.Overlay)
166            {
167                case P2POverlayType.FullMeshOverlay:
168                    // changing overlay example: this.overlay = new ChordOverlay();
169                    _overlay = new FullMeshOverlay(scheduler);
170                    break;
171                default:
172                    throw (new NotImplementedException());
173            }
174
175            switch (P2PSettings.Default.Dht)
176            {
177                case P2PDHTType.FullMeshDHT:
178                    Dht = new FullMeshDHT(scheduler);
179                    break;
180                default:
181                    throw (new NotImplementedException());
182            }
183
184            _overlay.MessageReceived += OverlayMessageReceived;
185            Dht.SystemJoined += OnDhtSystemJoined;
186            Dht.SystemLeft += OnDhtSystemLeft;
187
188            VersionedDht = (IVersionedDHT) Dht;
189
190            P2PManager.GuiLogMessage("Initializing DHT with world name " + P2PSettings.Default.WorldName,
191                                                NotificationLevel.Info);
192            Dht.Initialize(P2PSettings.Default.PeerName, string.Empty, P2PSettings.Default.WorldName, _overlay,
193                            _bootstrapper,
194                            _linkmanager, null);
195
196            IsInitialized = true;
197        }
198
199        /// <summary>
200        ///   Starts the P2P System. When the given P2P world doesn't exist yet,
201        ///   inclusive creating the and bootstrapping to the P2P network.
202        ///   In either case joining the P2P world.
203        ///   This synchronized method returns true not before the peer has
204        ///   successfully joined the network (this may take one or two minutes).
205        /// </summary>
206        /// <exception cref = "InvalidOperationException">When the peer-to-peer system has not been initialized.
207        /// After validating the settings, this can be done by calling Initialize().</exception>
208        /// <returns>True, if the peer has completely joined the p2p network</returns>
209        public bool SynchStart()
210        {
211            if (!IsInitialized)
212            {
213                throw new InvalidOperationException("Peer-to-peer is not initialized.");
214            }
215
216            if (IsConnected)
217            {
218                return true;
219            }
220
221            Dht.BeginStart(BeginStartEventHandler);
222
223            // Wait for event SystemJoined. When it's invoked, the peer completely joined the P2P system
224            _systemJoined.WaitOne();
225            P2PManager.GuiLogMessage("System join process ended.", NotificationLevel.Debug);
226
227            return true;
228        }
229
230        private void BeginStartEventHandler(DHTEventArgs eventArgs)
231        {
232            P2PManager.GuiLogMessage("Received DHTEventArgs: " + eventArgs + ", state: " + eventArgs.State, NotificationLevel.Debug);
233        }
234
235        /// <summary>
236        ///   Disconnects from the peer-to-peer system.
237        /// </summary>
238        /// <returns>True, if the peer has completely left the p2p network</returns>
239        public bool SynchStop()
240        {
241            if (Dht == null) return false;
242
243            Dht.BeginStop(null);
244
245            if (!IsConnected)
246            {
247                return true;
248            }
249
250            // wait till systemLeft Event is invoked
251            _systemLeft.WaitOne();
252
253            return true;
254        }
255
256        #endregion
257
258        #region Peer related method (GetPeerId, Send message to peer)
259
260        /// <summary>
261        ///   Get PeerName of the actual peer
262        /// </summary>
263        /// <param name = "sPeerName">out: additional peer information UserName on LinkManager</param>
264        /// <returns>PeerID as a String</returns>
265        public PeerId GetPeerId(out string sPeerName)
266        {
267            sPeerName = _linkmanager.UserName;
268            return new PeerId(_overlay.LocalAddress);
269        }
270
271        /// <summary>
272        ///   Construct PeerId object for a specific byte[] id
273        /// </summary>
274        /// <param name = "byteId">overlay address as byte array</param>
275        /// <returns>corresponding PeerId for given byte[] id</returns>
276        public PeerId GetPeerId(byte[] byteId)
277        {
278            LogToMonitor("GetPeerID: Converting byte[] to PeerId-Object");
279            return new PeerId(_overlay.GetAddress(byteId));
280        }
281
282        // overlay.LocalAddress = Overlay-Peer-Address/Names
283        public void SendToPeer(byte[] data, byte[] destinationPeer)
284        {
285            // get stack size of the pap use-data and add own use data (for optimizing Stack size)
286            var realStackSize = _overlay.GetHeaderSize() + data.Length;
287
288            var stackData = new ByteStack(realStackSize);
289            stackData.Push(data);
290
291            var destinationAddr = _overlay.GetAddress(destinationPeer);
292            var overlayMsg = new OverlayMessage(MessageReceiverType.P2PBase, 
293                                                _overlay.LocalAddress, destinationAddr, stackData);
294
295            _overlay.Send(overlayMsg);
296        }
297
298        private void OverlayMessageReceived(object sender, OverlayMessageEventArgs e)
299        {
300            if (OnP2PMessageReceived == null) return;
301
302            var pid = new PeerId(e.Message.Source);
303            /* You have to fire this event asynchronous, because the main
304                 * thread will be stopped in this wrapper class for synchronizing
305                 * the asynchronous stuff (AutoResetEvent) --> so this could run
306                 * into a deadlock, when you fire this event synchronous (normal Invoke)
307                 * ATTENTION: This could change the invocation order!!! In my case
308                              no problem, but maybe in future cases... */
309
310            // TODO: not safe: The delegate must have only one target
311            // OnP2PMessageReceived.BeginInvoke(pid, e.Message.Data.PopBytes(e.Message.Data.CurrentStackSize), null, null);
312
313            foreach (var del in OnP2PMessageReceived.GetInvocationList())
314            {
315                del.DynamicInvoke(pid, e.Message.Data.PopBytes(e.Message.Data.CurrentStackSize));
316            }
317        }
318
319        #endregion
320
321        #region Event Handling (System Joined, Left and Message Received)
322
323        private void OnDhtSystemJoined(object sender, EventArgs e)
324        {
325            if (OnSystemJoined != null)
326                OnSystemJoined();
327
328            _systemJoined.Set();
329            IsConnected = true;
330        }
331
332        private void OnDhtSystemLeft(object sender, SystemLeftEventArgs e)
333        {
334            if (OnSystemLeft != null)
335                OnSystemLeft();
336
337            IsConnected = false;
338            IsInitialized = false;
339
340            // Allow new connection to start and check for waiting / blocked tasks
341            // TODO reset running ConnectionWorkers?
342            _systemLeft.Set();
343            _systemJoined.Set();
344
345            LogToMonitor("CrypP2P left the system.");
346        }
347
348        #endregion
349
350        #region Synchronous Methods + their Callbacks
351
352        /// <summary>
353        ///   Stores a value in the DHT at the given key
354        /// </summary>
355        /// <param name = "key">Key of DHT Entry</param>
356        /// <param name = "data">Value of DHT Entry</param>
357        /// <returns>True, when storing is completed!</returns>
358        public bool SynchStore(string key, byte[] data)
359        {
360            var autoResetEvent = new AutoResetEvent(false);
361
362            // LogToMonitor("testcrash" + Encoding.UTF8.GetString(new byte[5000]));
363            LogToMonitor("Begin: SynchStore. Key: " + key + ", " + data.Length + " bytes");
364
365            var responseWait = new ResponseWait {WaitHandle = autoResetEvent, key = key, value = data};
366            VersionedDht.Store(OnSynchStoreCompleted, key, data, responseWait);
367
368            // blocking till response
369            autoResetEvent.WaitOne();
370
371            LogToMonitor("End: SynchStore. Key: " + key + ". Success: " + responseWait.success);
372
373            return responseWait.success;
374        }
375
376        /// <summary>
377        ///   Stores a value in the DHT at the given key
378        /// </summary>
379        /// <param name = "key">Key of DHT Entry</param>
380        /// <param name = "value">Value of DHT Entry</param>
381        /// <returns>True, when storing is completed!</returns>
382        public bool SynchStore(string key, string value)
383        {
384            return SynchStore(key, Encoding.UTF8.GetBytes(value));
385        }
386
387        /// <summary>
388        ///   Callback for a the synchronized store method
389        /// </summary>
390        /// <param name = "storeResult">retrieved data container</param>
391        private static void OnSynchStoreCompleted(StoreResult storeResult)
392        {
393            var responseWait = storeResult.AsyncState as ResponseWait;
394            if (responseWait == null)
395            {
396                LogToMonitor("Received OnSynchStoreCompleted, but ResponseWait object is missing. Discarding.");
397                return;
398            }
399
400            responseWait.success = storeResult.Status == OperationStatus.Success;
401            responseWait.operationStatus = storeResult.Status;
402            responseWait.Message = Encoding.UTF8.GetBytes(storeResult.Status.ToString());
403
404            // unblock WaitHandle in the synchronous method
405            responseWait.WaitHandle.Set();
406        }
407
408        /// <summary>
409        ///   Get the value of the given DHT Key or null, if it doesn't exist.
410        /// </summary>
411        /// <param name = "key">Key of DHT Entry</param>
412        /// <returns>Value of DHT Entry</returns>
413        public byte[] SynchRetrieve(string key)
414        {
415            LogToMonitor("Begin: SynchRetrieve. Key: " + key);
416
417            var autoResetEvent = new AutoResetEvent(false);
418            var responseWait = new ResponseWait {WaitHandle = autoResetEvent, key = key};
419            Dht.Retrieve(OnSynchRetrieveCompleted, key, responseWait);
420
421            // blocking till response
422            autoResetEvent.WaitOne();
423
424            LogToMonitor("End: SynchRetrieve. Key: " + key + ". Success: " + responseWait.success);
425
426            return responseWait.Message;
427        }
428
429        /// <summary>
430        ///   Callback for a the synchronized retrieval method
431        /// </summary>
432        /// <param name = "retrieveResult"></param>
433        private static void OnSynchRetrieveCompleted(RetrieveResult retrieveResult)
434        {
435            var responseWait = retrieveResult.AsyncState as ResponseWait;
436            if (responseWait == null)
437            {
438                LogToMonitor("Received OnSynchRetrieveCompleted, but ResponseWait object is missing. Discarding.");
439                return;
440            }
441
442            switch (retrieveResult.Status)
443            {
444                case OperationStatus.Success:
445                    responseWait.success = true;
446                    responseWait.Message = retrieveResult.Data;
447                    break;
448                case OperationStatus.KeyNotFound:
449                    responseWait.success = true;
450                    responseWait.Message = null;
451                    break;
452                default:
453                    responseWait.success = false;
454                    responseWait.Message = null;
455                    break;
456            }
457
458            responseWait.operationStatus = retrieveResult.Status;
459
460            // unblock WaitHandle in the synchronous method
461            responseWait.WaitHandle.Set();
462        }
463
464        /// <summary>
465        ///   Removes a key/value pair out of the DHT
466        /// </summary>
467        /// <param name = "key">Key of the DHT Entry</param>
468        /// <returns>True, when removing is completed!</returns>
469        public bool SynchRemove(string key)
470        {
471            LogToMonitor("Begin SynchRemove. Key: " + key);
472
473            var autoResetEvent = new AutoResetEvent(false);
474            var responseWait = new ResponseWait {WaitHandle = autoResetEvent, key = key};
475            VersionedDht.Remove(OnSynchRemoveCompleted, key, responseWait);
476
477            // blocking till response
478            autoResetEvent.WaitOne();
479
480            LogToMonitor("End: SynchRemove. Key: " + key + ". Success: " + responseWait.success);
481
482            return responseWait.success;
483        }
484
485        /// <summary>
486        ///   Callback for a the synchronized remove method
487        /// </summary>
488        /// <param name = "removeResult"></param>
489        private static void OnSynchRemoveCompleted(RemoveResult removeResult)
490        {
491            var responseWait = removeResult.AsyncState as ResponseWait;
492            if (responseWait == null)
493            {
494                LogToMonitor("Received OnSynchRemoveCompleted, but ResponseWait object is missing. Discarding.");
495                return;
496            }
497
498            responseWait.success = removeResult.Status != OperationStatus.Failure &&
499                                   removeResult.Status != OperationStatus.VersionMismatch;
500            responseWait.operationStatus = removeResult.Status;
501            responseWait.Message = Encoding.UTF8.GetBytes(removeResult.Status.ToString());
502
503            // unblock WaitHandle in the synchronous method
504            responseWait.WaitHandle.Set();
505        }
506
507        #endregion
508
509        #region Log facility
510
511        /// <summary>
512        ///   To log the internal state in the Monitoring Software of P@play
513        /// </summary>
514        public void LogInternalState()
515        {
516            if (Dht != null)
517            {
518                Dht.LogInternalState();
519            }
520        }
521
522        private static void LogToMonitor(string sTextToLog)
523        {
524            if (P2PSettings.Default.Log2Monitor)
525                Log.Debug(sTextToLog);
526        }
527
528        #endregion
529    }
530}
Note: See TracBrowser for help on using the repository browser.