source: trunk/CrypP2P/Internal/P2PBase.cs @ 1665

Last change on this file since 1665 was 1665, checked in by Paul Lelgemann, 11 years ago

+ CrypP2P: Return types of synchronous methods Store/Retrieve/Remove changed
o Work on the distributed KeySearcher

File size: 19.0 KB
Line 
1/*
2   Copyright 2010 Paul Lelgemann and Christian Arnold,
3                  University of Duisburg-Essen
4
5   Licensed under the Apache License, Version 2.0 (the "License");
6   you may not use this file except in compliance with the License.
7   You may obtain a copy of the License at
8
9       http://www.apache.org/licenses/LICENSE-2.0
10
11   Unless required by applicable law or agreed to in writing, software
12   distributed under the License is distributed on an "AS IS" BASIS,
13   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   See the License for the specific language governing permissions and
15   limitations under the License.
16*/
17
18using System;
19using System.Text;
20using System.Threading;
21using Cryptool.PluginBase;
22using Cryptool.Plugins.PeerToPeer.Internal;
23using Gears4Net;
24using PeersAtPlay;
25using PeersAtPlay.Monitoring;
26using PeersAtPlay.P2PLink;
27using PeersAtPlay.P2PLink.SnalNG;
28using PeersAtPlay.P2POverlay;
29using PeersAtPlay.P2POverlay.Bootstrapper;
30using PeersAtPlay.P2POverlay.Bootstrapper.IrcBootstrapperV2;
31using PeersAtPlay.P2POverlay.Bootstrapper.LocalMachineBootstrapper;
32using PeersAtPlay.P2POverlay.FullMeshOverlay;
33using PeersAtPlay.P2PStorage.DHT;
34using PeersAtPlay.P2PStorage.FullMeshDHT;
35using PeersAtPlay.Util.Logging;
36
37/* TODO:
38 * - Delete UseNatTraversal-Flag and insert CertificateCheck and CertificateSetup
39 * - Testing asynchronous methods incl. EventHandlers
40 */
41
namespace Cryptool.P2P.Internal
{
    /// <summary>
    ///   Wrapper class to integrate the peer@play environment into CrypTool.
    ///   It wraps the asynchronous peer@play calls into blocking ("Synch...") methods for easier usage in CT2.
    /// </summary>
    public class P2PBase
    {
        #region Variables

        /// <summary>
        ///   Maximum message size (in bytes, 10 MiB) written into the link manager settings.
        /// </summary>
        private const int MaximumMessageSizeInBytes = 10485760;

        // Signalled by the DHT event handlers; SynchStart / SynchStop block on these handles.
        private readonly AutoResetEvent _systemJoined;
        private readonly AutoResetEvent _systemLeft;
        private IBootstrapper _bootstrapper;
        private IP2PLinkManager _linkmanager;
        private P2POverlay _overlay;
        internal IDHT Dht;
        internal IVersionedDHT VersionedDht;

        /// <summary>
        ///   True if system was successfully joined, false if system is COMPLETELY left
        /// </summary>
        public bool IsConnected { get; private set; }

        /// <summary>
        ///   True if the underlying peer to peer system has been fully initialized
        /// </summary>
        public bool IsInitialized { get; private set; }

        #endregion

        #region Delegates

        public event P2PMessageReceived OnP2PMessageReceived;
        public delegate void P2PMessageReceived(PeerId sourceAddr, byte[] data);

        public event SystemJoined OnSystemJoined;
        public delegate void SystemJoined();

        public event SystemLeft OnSystemLeft;
        public delegate void SystemLeft();

        #endregion

        /// <summary>
        ///   Creates an unconnected, uninitialized wrapper. Call <see cref = "Initialize" /> and
        ///   <see cref = "SynchStart" /> to join the peer-to-peer system.
        /// </summary>
        public P2PBase()
        {
            IsConnected = false;
            IsInitialized = false;

            _systemJoined = new AutoResetEvent(false);
            _systemLeft = new AutoResetEvent(false);
        }

        #region Basic P2P Methods (Init, Start, Stop)

        /// <summary>
        ///   Initializes the underlying peer-to-peer system with settings configured in P2PSettings.
        ///   This step is required in order to be able to establish a connection.
        /// </summary>
        /// <exception cref = "NotImplementedException">When P2PSettings selects a link manager, bootstrapper,
        /// overlay or DHT type that this wrapper does not handle.</exception>
        public void Initialize()
        {
            Scheduler scheduler = new STAScheduler("pap");

            // The order matters only for the final Dht.Initialize call, which needs all four components.
            SetUpLinkManager(scheduler);
            SetUpBootstrapper(scheduler);
            SetUpOverlay(scheduler);
            SetUpDht(scheduler);

            _overlay.MessageReceived += OverlayMessageReceived;
            Dht.SystemJoined += OnDhtSystemJoined;
            Dht.SystemLeft += OnDhtSystemLeft;

            VersionedDht = (IVersionedDHT) Dht;

            P2PManager.GuiLogMessage("Initializing DHT with world name " + P2PSettings.Default.WorldName,
                                     NotificationLevel.Info);
            Dht.Initialize(P2PSettings.Default.PeerName, string.Empty, P2PSettings.Default.WorldName, _overlay,
                           _bootstrapper,
                           _linkmanager, null);

            IsInitialized = true;
        }

        /// <summary>
        ///   Creates and configures the link manager selected in P2PSettings.
        /// </summary>
        private void SetUpLinkManager(Scheduler scheduler)
        {
            switch (P2PSettings.Default.LinkManager)
            {
                case P2PLinkManagerType.Snal:
                    LogToMonitor("Init LinkMgr: Using NAT Traversal stuff");

                    // NAT-Traversal stuff needs a different Snal-Version
                    _linkmanager = new Snal(scheduler);

                    var settings = new PeersAtPlay.P2PLink.SnalNG.Settings();
                    settings.LoadDefaults();
                    settings.ConnectInternal = true;
                    settings.LocalReceivingPort = P2PSettings.Default.LocalReceivingPort;
                    settings.UseLocalAddressDetection = P2PSettings.Default.UseLocalAddressDetection;
                    settings.NoDelay = false;
                    settings.ReuseAddress = false;
                    settings.UseNetworkMonitorServer = true;
                    settings.CloseConnectionAfterPingTimeout = false;

                    settings.MaximumMessageSize = MaximumMessageSizeInBytes;
                    settings.IgnoreMaximumMessageSize = true;

                    // Anything that is neither UDP nor TCP_UDP falls back to plain TCP.
                    switch (P2PSettings.Default.TransportProtocol)
                    {
                        case P2PTransportProtocol.UDP:
                            settings.TransportProtocol = TransportProtocol.UDP;
                            break;
                        case P2PTransportProtocol.TCP_UDP:
                            settings.TransportProtocol = TransportProtocol.TCP_UDP;
                            break;
                        default:
                            settings.TransportProtocol = TransportProtocol.TCP;
                            break;
                    }

                    _linkmanager.Settings = settings;
                    _linkmanager.ApplicationType = ApplicationType.CrypTool;
                    break;
                default:
                    throw new NotImplementedException(
                        "Unsupported link manager type: " + P2PSettings.Default.LinkManager);
            }
        }

        /// <summary>
        ///   Creates the bootstrapper selected in P2PSettings.
        /// </summary>
        private void SetUpBootstrapper(Scheduler scheduler)
        {
            switch (P2PSettings.Default.Bootstrapper)
            {
                case P2PBootstrapperType.LocalMachineBootstrapper:
                    // LocalMachineBootstrapper = only local connection (runs only on one machine)
                    _bootstrapper = new LocalMachineBootstrapper();
                    break;
                case P2PBootstrapperType.IrcBootstrapper:
                    PeersAtPlay.P2POverlay.Bootstrapper.IrcBootstrapperV2.Settings.DelaySymmetricResponse = true;
                    PeersAtPlay.P2POverlay.Bootstrapper.IrcBootstrapperV2.Settings.IncludeSymmetricResponse = false;
                    PeersAtPlay.P2POverlay.Bootstrapper.IrcBootstrapperV2.Settings.UsePeerCache = false;

                    _bootstrapper = new IrcBootstrapper(scheduler);
                    break;
                default:
                    throw new NotImplementedException(
                        "Unsupported bootstrapper type: " + P2PSettings.Default.Bootstrapper);
            }
        }

        /// <summary>
        ///   Creates the overlay selected in P2PSettings.
        /// </summary>
        private void SetUpOverlay(Scheduler scheduler)
        {
            switch (P2PSettings.Default.Overlay)
            {
                case P2POverlayType.FullMeshOverlay:
                    // changing overlay example: this.overlay = new ChordOverlay();
                    _overlay = new FullMeshOverlay(scheduler);
                    break;
                default:
                    throw new NotImplementedException(
                        "Unsupported overlay type: " + P2PSettings.Default.Overlay);
            }
        }

        /// <summary>
        ///   Creates the DHT selected in P2PSettings.
        /// </summary>
        private void SetUpDht(Scheduler scheduler)
        {
            switch (P2PSettings.Default.Dht)
            {
                case P2PDHTType.FullMeshDHT:
                    Dht = new FullMeshDHT(scheduler);
                    break;
                default:
                    throw new NotImplementedException(
                        "Unsupported DHT type: " + P2PSettings.Default.Dht);
            }
        }

        /// <summary>
        ///   Starts the P2P system and blocks until the join attempt has ended.
        ///   The call returns when the DHT reports that the system was joined, or when it reports
        ///   that the system was left while the join was still pending.
        /// </summary>
        /// <exception cref = "InvalidOperationException">When the peer-to-peer system has not been initialized.
        /// After validating the settings, this can be done by calling Initialize().</exception>
        /// <returns>True, if the peer has completely joined the p2p network; false if the join attempt
        /// ended because the system was left.</returns>
        public bool SynchStart()
        {
            if (!IsInitialized)
            {
                throw new InvalidOperationException("Peer-to-peer is not initialized.");
            }

            if (IsConnected)
            {
                return true;
            }

            // OnDhtSystemLeft also signals this handle to release blocked callers. Discard such a
            // leftover signal, otherwise WaitOne below would return before the join has happened.
            _systemJoined.Reset();

            Dht.BeginStart(BeginStartEventHandler);

            // Wait for event SystemJoined (or SystemLeft, which aborts the wait)
            _systemJoined.WaitOne();
            P2PManager.GuiLogMessage("System join process ended.", NotificationLevel.Debug);

            // The handle is signalled on join AND on leave, so report the actual state.
            return IsConnected;
        }

        /// <summary>
        ///   Completion callback passed to Dht.BeginStart; only logs the received state.
        /// </summary>
        private void BeginStartEventHandler(DHTEventArgs eventArgs)
        {
            P2PManager.GuiLogMessage("Received DHTEventArgs: " + eventArgs + ", state: " + eventArgs.State, NotificationLevel.Debug);
        }

        /// <summary>
        ///   Disconnects from the peer-to-peer system. If the peer was connected, the call blocks
        ///   until the DHT reports that the system was left.
        /// </summary>
        /// <returns>False if no DHT exists yet (Initialize() was never called), otherwise true.</returns>
        public bool SynchStop()
        {
            if (Dht == null) return false;

            // Drop a signal left over from an earlier leave that nobody waited for, and remember the
            // connection state before the stop request can change it.
            _systemLeft.Reset();
            var wasConnected = IsConnected;

            // The stop is requested even when not connected, as in the original implementation
            // (presumably to abort a join that is still in progress).
            Dht.BeginStop(null);

            if (!wasConnected)
            {
                return true;
            }

            // wait till systemLeft Event is invoked
            _systemLeft.WaitOne();

            return true;
        }

        #endregion

        #region Peer related method (GetPeerId, Send message to peer)

        /// <summary>
        ///   Gets the PeerId of the local peer.
        /// </summary>
        /// <param name = "sPeerName">out: the UserName reported by the link manager</param>
        /// <returns>PeerId built from the local overlay address</returns>
        /// <exception cref = "InvalidOperationException">When Initialize() has not been called.</exception>
        public PeerId GetPeerId(out string sPeerName)
        {
            EnsureInitialized();

            sPeerName = _linkmanager.UserName;
            return new PeerId(_overlay.LocalAddress);
        }

        /// <summary>
        ///   Construct PeerId object for a specific byte[] id
        /// </summary>
        /// <param name = "byteId">overlay address as byte array</param>
        /// <returns>corresponding PeerId for given byte[] id</returns>
        /// <exception cref = "InvalidOperationException">When Initialize() has not been called.</exception>
        public PeerId GetPeerId(byte[] byteId)
        {
            EnsureInitialized();

            LogToMonitor("GetPeerID: Converting byte[] to PeerId-Object");
            return new PeerId(_overlay.GetAddress(byteId));
        }

        /// <summary>
        ///   Sends a message to another peer via the overlay.
        /// </summary>
        /// <param name = "data">payload to send</param>
        /// <param name = "destinationPeer">overlay address of the receiver as byte array</param>
        /// <exception cref = "ArgumentNullException">When data is null.</exception>
        /// <exception cref = "InvalidOperationException">When Initialize() has not been called.</exception>
        public void SendToPeer(byte[] data, byte[] destinationPeer)
        {
            if (data == null)
            {
                throw new ArgumentNullException("data");
            }

            EnsureInitialized();

            // reserve room for the overlay header plus the own payload (for optimizing stack size)
            var realStackSize = _overlay.GetHeaderSize() + data.Length;

            var stackData = new ByteStack(realStackSize);
            stackData.Push(data);

            var destinationAddr = _overlay.GetAddress(destinationPeer);
            var overlayMsg = new OverlayMessage(MessageReceiverType.P2PBase,
                                                _overlay.LocalAddress, destinationAddr, stackData);

            _overlay.Send(overlayMsg);
        }

        /// <summary>
        ///   Forwards a message received by the overlay to the subscribers of OnP2PMessageReceived.
        /// </summary>
        private void OverlayMessageReceived(object sender, OverlayMessageEventArgs e)
        {
            // Work on a local copy so an unsubscribe between check and call cannot cause a null reference.
            var handler = OnP2PMessageReceived;
            if (handler == null) return;

            var pid = new PeerId(e.Message.Source);

            // Take the payload off the message exactly once. Popping inside a per-subscriber loop would
            // (presumably, given the stack semantics) hand the data to the first subscriber only.
            var payload = e.Message.Data.PopBytes(e.Message.Data.CurrentStackSize);

            // Subscribers are called synchronously, in subscription order, on the thread that raised
            // the overlay event.
            // NOTE(review): the original comment warned that blocking in a subscriber (e.g. on the
            // Synch* methods of this class) may deadlock - confirm against the peer@play threading model.
            handler(pid, payload);
        }

        /// <summary>
        ///   Throws when the peer-to-peer components have not been created by Initialize() yet.
        /// </summary>
        private void EnsureInitialized()
        {
            if (Dht == null || VersionedDht == null || _overlay == null || _linkmanager == null)
            {
                throw new InvalidOperationException("Peer-to-peer is not initialized.");
            }
        }

        #endregion

        #region Event Handling (System Joined, Left and Message Received)

        /// <summary>
        ///   Handles the SystemJoined event of the DHT: marks the peer as connected, notifies
        ///   subscribers and releases a caller blocked in SynchStart.
        /// </summary>
        private void OnDhtSystemJoined(object sender, EventArgs e)
        {
            // Publish the state first, so that subscribers and the thread woken below both see it.
            IsConnected = true;

            try
            {
                var handler = OnSystemJoined;
                if (handler != null)
                {
                    handler();
                }
            }
            finally
            {
                // Always release SynchStart, even if a subscriber threw.
                _systemJoined.Set();
            }
        }

        /// <summary>
        ///   Handles the SystemLeft event of the DHT: marks the peer as disconnected and
        ///   uninitialized, notifies subscribers and releases callers blocked in SynchStart / SynchStop.
        /// </summary>
        private void OnDhtSystemLeft(object sender, SystemLeftEventArgs e)
        {
            IsConnected = false;
            IsInitialized = false;

            try
            {
                var handler = OnSystemLeft;
                if (handler != null)
                {
                    handler();
                }
            }
            finally
            {
                // Allow new connection to start and check for waiting / blocked tasks
                // TODO reset running ConnectionWorkers?
                _systemLeft.Set();
                _systemJoined.Set();
            }

            LogToMonitor("CrypP2P left the system.");
        }

        #endregion

        #region Synchronous Methods + their Callbacks

        /// <summary>
        ///   Stores a string value (encoded as UTF-8) in the DHT at the given key
        /// </summary>
        /// <param name = "key">Key of DHT Entry</param>
        /// <param name = "value">Value of DHT Entry</param>
        /// <returns>The request result, available once the store operation has completed</returns>
        /// <exception cref = "ArgumentNullException">When value is null.</exception>
        /// <exception cref = "InvalidOperationException">When Initialize() has not been called.</exception>
        public RequestResult SynchStore(string key, string value)
        {
            if (value == null)
            {
                throw new ArgumentNullException("value");
            }

            return SynchStore(key, Encoding.UTF8.GetBytes(value));
        }

        /// <summary>
        ///   Stores a value in the DHT at the given key and blocks until the DHT has answered
        /// </summary>
        /// <param name = "key">Key of DHT Entry</param>
        /// <param name = "data">Value of DHT Entry</param>
        /// <returns>The request result, available once the store operation has completed</returns>
        /// <exception cref = "ArgumentNullException">When data is null.</exception>
        /// <exception cref = "InvalidOperationException">When Initialize() has not been called.</exception>
        public RequestResult SynchStore(string key, byte[] data)
        {
            if (data == null)
            {
                throw new ArgumentNullException("data");
            }

            EnsureInitialized();

            var autoResetEvent = new AutoResetEvent(false);

            LogToMonitor("Begin: SynchStore. Key: " + key + ", " + data.Length + " bytes");

            // The request result travels through the DHT as async state and comes back in the callback.
            var requestResult = new RequestResult {WaitHandle = autoResetEvent, Key = key, Data = data};
            VersionedDht.Store(OnSynchStoreCompleted, key, data, requestResult);

            // blocking till response
            autoResetEvent.WaitOne();

            LogToMonitor("End: SynchStore. Key: " + key + ". Status: " + requestResult.Status);

            return requestResult;
        }

        /// <summary>
        ///   Callback for the synchronized store method
        /// </summary>
        /// <param name = "storeResult">result container of the store operation</param>
        private static void OnSynchStoreCompleted(StoreResult storeResult)
        {
            var requestResult = storeResult.AsyncState as RequestResult;
            if (requestResult == null)
            {
                LogToMonitor("Received OnSynchStoreCompleted, but RequestResult object is missing. Discarding.");
                return;
            }

            requestResult.Parse(storeResult);

            // unblock WaitHandle in the synchronous method
            requestResult.WaitHandle.Set();
        }

        /// <summary>
        ///   Retrieves the entry stored at the given DHT key and blocks until the DHT has answered
        /// </summary>
        /// <param name = "key">Key of DHT Entry</param>
        /// <returns>The request result, available once the retrieve operation has completed</returns>
        /// <exception cref = "InvalidOperationException">When Initialize() has not been called.</exception>
        public RequestResult SynchRetrieve(string key)
        {
            EnsureInitialized();

            LogToMonitor("Begin: SynchRetrieve. Key: " + key);

            var autoResetEvent = new AutoResetEvent(false);
            var requestResult = new RequestResult {WaitHandle = autoResetEvent, Key = key};
            Dht.Retrieve(OnSynchRetrieveCompleted, key, requestResult);

            // blocking till response
            autoResetEvent.WaitOne();

            LogToMonitor("End: SynchRetrieve. Key: " + key + ". Status: " + requestResult.Status);

            return requestResult;
        }

        /// <summary>
        ///   Callback for the synchronized retrieval method
        /// </summary>
        /// <param name = "retrieveResult">result container of the retrieve operation</param>
        private static void OnSynchRetrieveCompleted(RetrieveResult retrieveResult)
        {
            var requestResult = retrieveResult.AsyncState as RequestResult;
            if (requestResult == null)
            {
                LogToMonitor("Received OnSynchRetrieveCompleted, but RequestResult object is missing. Discarding.");
                return;
            }

            requestResult.Parse(retrieveResult);

            // unblock WaitHandle in the synchronous method
            requestResult.WaitHandle.Set();
        }

        /// <summary>
        ///   Removes a key/value pair out of the DHT and blocks until the DHT has answered
        /// </summary>
        /// <param name = "key">Key of the DHT Entry</param>
        /// <returns>The request result, available once the remove operation has completed</returns>
        /// <exception cref = "InvalidOperationException">When Initialize() has not been called.</exception>
        public RequestResult SynchRemove(string key)
        {
            EnsureInitialized();

            LogToMonitor("Begin SynchRemove. Key: " + key);

            var autoResetEvent = new AutoResetEvent(false);
            var requestResult = new RequestResult {WaitHandle = autoResetEvent, Key = key};
            VersionedDht.Remove(OnSynchRemoveCompleted, key, requestResult);

            // blocking till response
            autoResetEvent.WaitOne();

            LogToMonitor("End: SynchRemove. Key: " + key + ". Status: " + requestResult.Status);

            return requestResult;
        }

        /// <summary>
        ///   Callback for the synchronized remove method
        /// </summary>
        /// <param name = "removeResult">result container of the remove operation</param>
        private static void OnSynchRemoveCompleted(RemoveResult removeResult)
        {
            var requestResult = removeResult.AsyncState as RequestResult;
            if (requestResult == null)
            {
                LogToMonitor("Received OnSynchRemoveCompleted, but RequestResult object is missing. Discarding.");
                return;
            }

            requestResult.Parse(removeResult);

            // unblock WaitHandle in the synchronous method
            requestResult.WaitHandle.Set();
        }

        #endregion

        #region Log facility

        /// <summary>
        ///   To log the internal state in the Monitoring Software of P@play.
        ///   Does nothing when no DHT has been created yet.
        /// </summary>
        public void LogInternalState()
        {
            if (Dht != null)
            {
                Dht.LogInternalState();
            }
        }

        /// <summary>
        ///   Writes a debug message to the peer@play log when Log2Monitor is enabled in P2PSettings.
        /// </summary>
        private static void LogToMonitor(string sTextToLog)
        {
            if (P2PSettings.Default.Log2Monitor)
            {
                Log.Debug(sTextToLog);
            }
        }

        #endregion
    }
}
Note: See TracBrowser for help on using the repository browser.