source: trunk/CrypP2P/Internal/P2PBase.cs @ 1549

Last change on this file since 1549 was 1549, checked in by Paul Lelgemann, 12 years ago

o P2PEditor: L&F improvements for the listbox containing distributed jobs
o Changed DistributedJob serialization strategy to built-in

File size: 18.7 KB
Line 
1/*
2   Copyright 2010 Paul Lelgemann, University of Duisburg-Essen
3
4   Licensed under the Apache License, Version 2.0 (the "License");
5   you may not use this file except in compliance with the License.
6   You may obtain a copy of the License at
7
8       http://www.apache.org/licenses/LICENSE-2.0
9
10   Unless required by applicable law or agreed to in writing, software
11   distributed under the License is distributed on an "AS IS" BASIS,
12   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   See the License for the specific language governing permissions and
14   limitations under the License.
15*/
16
17using System;
18using System.Text;
19using System.Threading;
20using Cryptool.Plugins.PeerToPeer.Internal;
21using Gears4Net;
22using PeersAtPlay;
23using PeersAtPlay.Monitoring;
24using PeersAtPlay.P2PLink;
25using PeersAtPlay.P2PLink.SnalNG;
26using PeersAtPlay.P2POverlay;
27using PeersAtPlay.P2POverlay.Bootstrapper;
28using PeersAtPlay.P2POverlay.Bootstrapper.IrcBootstrapperV2;
29using PeersAtPlay.P2POverlay.Bootstrapper.LocalMachineBootstrapper;
30using PeersAtPlay.P2POverlay.FullMeshOverlay;
31using PeersAtPlay.P2PStorage.DHT;
32using PeersAtPlay.P2PStorage.FullMeshDHT;
33using PeersAtPlay.Util.Logging;
34using Settings = PeersAtPlay.P2PLink.SnalNG.Settings;
35
36/* TODO:
37 * - Delete UseNatTraversal-Flag and insert CertificateCheck and CertificateSetup
38 * - Testing asynchronous methods incl. EventHandlers
39 */
40
41namespace Cryptool.P2P.Internal
42{
43    /// <summary>
44    ///   Wrapper class to integrate peer@play environment into CrypTool.
45    ///   This class synchronizes asynchronous methods for easier usage in CT2.
46    /// </summary>
47    public class P2PBase
48    {
49        #region Variables
50
51        private readonly AutoResetEvent _systemJoined;
52        private readonly AutoResetEvent _systemLeft;
53        private IBootstrapper _bootstrapper;
54        private IDHT _dht;
55        private IP2PLinkManager _linkmanager;
56        private P2POverlay _overlay;
57        private IVersionedDHT _versionedDht;
58
59        /// <summary>
60        ///   True if system was successfully joined, false if system is COMPLETELY left
61        /// </summary>
62        public bool Started { get; private set; }
63
64        /// <summary>
65        ///   True if the underlying peer to peer system has been fully initialized
66        /// </summary>
67        public bool Initialized { get; private set; }
68
69        #endregion
70
71        #region Delegates
72
73        public event P2PMessageReceived OnP2PMessageReceived;
74        public delegate void P2PMessageReceived(PeerId sourceAddr, byte[] data);
75
76        public event SystemJoined OnSystemJoined;
77        public delegate void SystemJoined();
78
79        public event SystemLeft OnSystemLeft;
80        public delegate void SystemLeft();
81
82        #endregion
83
84        public P2PBase()
85        {
86            Started = false;
87            Initialized = false;
88
89            _systemJoined = new AutoResetEvent(false);
90            _systemLeft = new AutoResetEvent(false);
91        }
92
93        #region Basic P2P Methods (Init, Start, Stop)
94
95        /// <summary>
96        ///   Initializes the underlying peer-to-peer system with settings configured in P2PSettings. This step is required in order to be able to establish a connection.
97        /// </summary>
98        public void Initialize()
99        {
100            Scheduler scheduler = new STAScheduler("pap");
101
102            switch (P2PSettings.Default.LinkManager)
103            {
104                case P2PLinkManagerType.Snal:
105                    LogToMonitor("Init LinkMgr: Using NAT Traversal stuff");
106
107                    // NAT-Traversal stuff needs a different Snal-Version
108                    _linkmanager = new Snal(scheduler);
109
110                    var settings = new Settings();
111                    settings.LoadDefaults();
112                    settings.ConnectInternal = true;
113                    settings.LocalReceivingPort = 0;
114                    settings.UseLocalAddressDetection = false;
115                    settings.AutoReconnect = false;
116                    settings.NoDelay = false;
117                    settings.ReuseAddress = false;
118                    settings.UseNetworkMonitorServer = true;
119
120                    _linkmanager.Settings = settings;
121                    _linkmanager.ApplicationType = ApplicationType.CrypTool;
122
123                    break;
124                default:
125                    throw (new NotImplementedException());
126            }
127
128            switch (P2PSettings.Default.Bootstrapper)
129            {
130                case P2PBootstrapperType.LocalMachineBootstrapper:
131                    // LocalMachineBootstrapper = only local connection (runs only on one machine)
132                    _bootstrapper = new LocalMachineBootstrapper();
133                    break;
134                case P2PBootstrapperType.IrcBootstrapper:
135                    // setup nat traversal stuff
136                    LogToMonitor("Init Bootstrapper: Using NAT Traversal stuff");
137                    PeersAtPlay.P2POverlay.Bootstrapper.IrcBootstrapper.Settings.DelaySymmetricResponse = true;
138                    PeersAtPlay.P2POverlay.Bootstrapper.IrcBootstrapper.Settings.IncludeSymmetricInResponse = false;
139                    PeersAtPlay.P2POverlay.Bootstrapper.IrcBootstrapper.Settings.SymmetricResponseDelay = 6000;
140
141                    _bootstrapper = new IrcBootstrapper(scheduler);
142                    break;
143                default:
144                    throw (new NotImplementedException());
145            }
146
147            switch (P2PSettings.Default.Overlay)
148            {
149                case P2POverlayType.FullMeshOverlay:
150                    // changing overlay example: this.overlay = new ChordOverlay();
151                    _overlay = new FullMeshOverlay(scheduler);
152                    break;
153                default:
154                    throw (new NotImplementedException());
155            }
156
157            switch (P2PSettings.Default.Dht)
158            {
159                case P2PDHTType.FullMeshDHT:
160                    _dht = new FullMeshDHT(scheduler);
161                    break;
162                default:
163                    throw (new NotImplementedException());
164            }
165
166            _overlay.MessageReceived += OverlayMessageReceived;
167            _dht.SystemJoined += OnDhtSystemJoined;
168            _dht.SystemLeft += OnDhtSystemLeft;
169
170            _versionedDht = (IVersionedDHT) _dht;
171
172            _dht.Initialize(P2PSettings.Default.PeerName, string.Empty, P2PSettings.Default.WorldName, _overlay, 
173                            _bootstrapper, 
174                            _linkmanager, null);
175
176            Initialized = true;
177        }
178
179        /// <summary>
180        ///   Starts the P2P System. When the given P2P world doesn't exist yet,
181        ///   inclusive creating the and bootstrapping to the P2P network.
182        ///   In either case joining the P2P world.
183        ///   This synchronized method returns true not before the peer has
184        ///   successfully joined the network (this may take one or two minutes).
185        /// </summary>
186        /// <exception cref = "InvalidOperationException">When the peer-to-peer system has not been initialized.
187        /// After validating the settings, this can be done by calling Initialize().</exception>
188        /// <returns>True, if the peer has completely joined the p2p network</returns>
189        public bool SynchStart()
190        {
191            if (!Initialized)
192            {
193                throw new InvalidOperationException("Peer-to-peer is not initialized.");
194            }
195
196            if (Started)
197            {
198                return true;
199            }
200
201            _dht.BeginStart(null);
202
203            // Wait for event SystemJoined. When it's invoked, the peer completely joined the P2P system
204            _systemJoined.WaitOne();
205
206            return true;
207        }
208
209        /// <summary>
210        ///   Disconnects from the peer-to-peer system.
211        /// </summary>
212        /// <returns>True, if the peer has completely left the p2p network</returns>
213        public bool SynchStop()
214        {
215            if (_dht == null) return false;
216
217            _dht.BeginStop(null);
218
219            // wait till systemLeft Event is invoked
220            _systemLeft.WaitOne();
221
222            return true;
223        }
224
225        #endregion
226
227        #region Peer related method (GetPeerId, Send message to peer)
228
229        /// <summary>
230        ///   Get PeerName of the actual peer
231        /// </summary>
232        /// <param name = "sPeerName">out: additional peer information UserName on LinkManager</param>
233        /// <returns>PeerID as a String</returns>
234        public PeerId GetPeerId(out string sPeerName)
235        {
236            sPeerName = _linkmanager.UserName;
237            return new PeerId(_overlay.LocalAddress);
238        }
239
240        /// <summary>
241        ///   Construct PeerId object for a specific byte[] id
242        /// </summary>
243        /// <param name = "byteId">overlay address as byte array</param>
244        /// <returns>corresponding PeerId for given byte[] id</returns>
245        public PeerId GetPeerId(byte[] byteId)
246        {
247            LogToMonitor("GetPeerID: Converting byte[] to PeerId-Object");
248            return new PeerId(_overlay.GetAddress(byteId));
249        }
250
251        // overlay.LocalAddress = Overlay-Peer-Address/Names
252        public void SendToPeer(byte[] data, byte[] destinationPeer)
253        {
254            // get stack size of the pap use-data and add own use data (for optimizing Stack size)
255            var realStackSize = _overlay.GetHeaderSize() + data.Length;
256
257            var stackData = new ByteStack(realStackSize);
258            stackData.Push(data);
259
260            var destinationAddr = _overlay.GetAddress(destinationPeer);
261            var overlayMsg = new OverlayMessage(MessageReceiverType.P2PBase, 
262                                                _overlay.LocalAddress, destinationAddr, stackData);
263
264            _overlay.Send(overlayMsg);
265        }
266
267        private void OverlayMessageReceived(object sender, OverlayMessageEventArgs e)
268        {
269            if (OnP2PMessageReceived == null) return;
270
271            var pid = new PeerId(e.Message.Source);
272            /* You have to fire this event asynchronous, because the main
273                 * thread will be stopped in this wrapper class for synchronizing
274                 * the asynchronous stuff (AutoResetEvent) --> so this could run
275                 * into a deadlock, when you fire this event synchronous (normal Invoke)
276                 * ATTENTION: This could change the invocation order!!! In my case
277                              no problem, but maybe in future cases... */
278
279            // TODO: not safe: The delegate must have only one target
280            // OnP2PMessageReceived.BeginInvoke(pid, e.Message.Data.PopBytes(e.Message.Data.CurrentStackSize), null, null);
281
282            foreach (var del in OnP2PMessageReceived.GetInvocationList())
283            {
284                del.DynamicInvoke(pid, e.Message.Data.PopBytes(e.Message.Data.CurrentStackSize));
285            }
286        }
287
288        #endregion
289
290        #region Event Handling (System Joined, Left and Message Received)
291
292        private void OnDhtSystemJoined(object sender, EventArgs e)
293        {
294            if (OnSystemJoined != null)
295                OnSystemJoined();
296
297            _systemJoined.Set();
298            Started = true;
299        }
300
301        private void OnDhtSystemLeft(object sender, SystemLeftEventArgs e)
302        {
303            if (OnSystemLeft != null)
304                OnSystemLeft();
305
306            Started = false;
307            Initialized = false;
308
309            // Allow new connection to start and check for waiting / blocked tasks
310            P2PManager.Instance.IsP2PConnecting = false;
311            _systemLeft.Set();
312            _systemJoined.Set();
313
314            LogToMonitor("CrypP2P left the system.");
315        }
316
317        #endregion
318
319        #region Synchronous Methods + their Callbacks
320
321        /// <summary>
322        ///   Stores a value in the DHT at the given key
323        /// </summary>
324        /// <param name = "key">Key of DHT Entry</param>
325        /// <param name = "data">Value of DHT Entry</param>
326        /// <returns>True, when storing is completed!</returns>
327        public bool SynchStore(string key, byte[] data)
328        {
329            var autoResetEvent = new AutoResetEvent(false);
330
331            // LogToMonitor("testcrash" + Encoding.UTF8.GetString(new byte[5000]));
332            LogToMonitor("Begin: SynchStore. Key: " + key + ", " + data.Length + " bytes");
333
334            var responseWait = new ResponseWait {WaitHandle = autoResetEvent, key = key, value = data};
335            _versionedDht.Store(OnSynchStoreCompleted, key, data, responseWait);
336
337            // blocking till response
338            autoResetEvent.WaitOne();
339
340            LogToMonitor("End: SynchStore. Key: " + key + ". Success: " + responseWait.success);
341
342            return responseWait.success;
343        }
344
345        /// <summary>
346        ///   Stores a value in the DHT at the given key
347        /// </summary>
348        /// <param name = "key">Key of DHT Entry</param>
349        /// <param name = "value">Value of DHT Entry</param>
350        /// <returns>True, when storing is completed!</returns>
351        public bool SynchStore(string key, string value)
352        {
353            return SynchStore(key, Encoding.UTF8.GetBytes(value));
354        }
355
356        /// <summary>
357        ///   Callback for a the synchronized store method
358        /// </summary>
359        /// <param name = "storeResult">retrieved data container</param>
360        private static void OnSynchStoreCompleted(StoreResult storeResult)
361        {
362            var responseWait = storeResult.AsyncState as ResponseWait;
363            if (responseWait == null)
364            {
365                LogToMonitor("Received OnSynchStoreCompleted, but ResponseWait object is missing. Discarding.");
366                return;
367            }
368
369            responseWait.success = storeResult.Status != OperationStatus.KeyNotFound;
370            responseWait.Message = Encoding.UTF8.GetBytes(storeResult.Status.ToString());
371
372            // unblock WaitHandle in the synchronous method
373            responseWait.WaitHandle.Set();
374
375            LogToMonitor("Received and handled OnSynchStoreCompleted.");
376        }
377
378        /// <summary>
379        ///   Get the value of the given DHT Key or null, if it doesn't exist.
380        /// </summary>
381        /// <param name = "key">Key of DHT Entry</param>
382        /// <returns>Value of DHT Entry</returns>
383        public byte[] SynchRetrieve(string key)
384        {
385            LogToMonitor("Begin: SynchRetrieve. Key: " + key);
386
387            var autoResetEvent = new AutoResetEvent(false);
388            var responseWait = new ResponseWait {WaitHandle = autoResetEvent, key = key};
389            _dht.Retrieve(OnSynchRetrieveCompleted, key, responseWait);
390
391            // blocking till response
392            autoResetEvent.WaitOne();
393
394            LogToMonitor("End: SynchRetrieve. Key: " + key + ". Success: " + responseWait.success);
395
396            return responseWait.Message;
397        }
398
399        /// <summary>
400        ///   Callback for a the synchronized retrieval method
401        /// </summary>
402        /// <param name = "retrieveResult"></param>
403        private static void OnSynchRetrieveCompleted(RetrieveResult retrieveResult)
404        {
405            var responseWait = retrieveResult.AsyncState as ResponseWait;
406            if (responseWait == null)
407            {
408                LogToMonitor("Received OnSynchRetrieveCompleted, but ResponseWait object is missing. Discarding.");
409                return;
410            }
411
412            LogToMonitor("Received retrieve callback, local ThreadId: " + Thread.CurrentThread.ManagedThreadId);
413
414            switch (retrieveResult.Status)
415            {
416                case OperationStatus.Success:
417                    responseWait.success = true;
418                    responseWait.Message = retrieveResult.Data;
419                    break;
420                case OperationStatus.KeyNotFound:
421                    responseWait.success = true;
422                    responseWait.Message = null;
423                    break;
424                default:
425                    responseWait.success = false;
426                    responseWait.Message = null;
427                    break;
428            }
429
430            // unblock WaitHandle in the synchronous method
431            responseWait.WaitHandle.Set();
432
433            LogToMonitor("Received and handled OnSynchRetrieveCompleted.");
434        }
435
436        /// <summary>
437        ///   Removes a key/value pair out of the DHT
438        /// </summary>
439        /// <param name = "key">Key of the DHT Entry</param>
440        /// <returns>True, when removing is completed!</returns>
441        public bool SynchRemove(string key)
442        {
443            LogToMonitor("Begin SynchRemove. Key: " + key);
444
445            var autoResetEvent = new AutoResetEvent(false);
446            var responseWait = new ResponseWait {WaitHandle = autoResetEvent, key = key};
447            _versionedDht.Remove(OnSynchRemoveCompleted, key, responseWait);
448
449            // blocking till response
450            autoResetEvent.WaitOne();
451
452            LogToMonitor("End: SynchRemove. Key: " + key + ". Success: " + responseWait.success);
453
454            return responseWait.success;
455        }
456
457        /// <summary>
458        ///   Callback for a the synchronized remove method
459        /// </summary>
460        /// <param name = "removeResult"></param>
461        private static void OnSynchRemoveCompleted(RemoveResult removeResult)
462        {
463            var responseWait = removeResult.AsyncState as ResponseWait;
464            if (responseWait == null)
465            {
466                LogToMonitor("Received OnSynchRemoveCompleted, but ResponseWait object is missing. Discarding.");
467                return;
468            }
469
470            responseWait.success = removeResult.Status == OperationStatus.Success;
471            responseWait.Message = Encoding.UTF8.GetBytes(removeResult.Status.ToString());
472
473            // unblock WaitHandle in the synchronous method
474            responseWait.WaitHandle.Set();
475
476            LogToMonitor("Received and handled OnSynchRemoveCompleted.");
477        }
478
479        #endregion
480
481        #region Log facility
482
483        /// <summary>
484        ///   To log the internal state in the Monitoring Software of P@play
485        /// </summary>
486        public void LogInternalState()
487        {
488            if (_dht != null)
489            {
490                _dht.LogInternalState();
491            }
492        }
493
494        private static void LogToMonitor(string sTextToLog)
495        {
496            if (P2PSettings.Default.Log2Monitor)
497                Log.Debug(sTextToLog);
498        }
499
500        #endregion
501    }
502}
Note: See TracBrowser for help on using the repository browser.