source: trunk/CrypPlugins/PeerToPeerBase/P2PBase.cs @ 1597

Last change on this file since 1597 was 1433, checked in by Paul Lelgemann, 12 years ago

o Extracted common classes from PeerToPeerBase plugin into new PeerToPeer plugin as a preparation for the new P2P proxy
o Modified directory properties to ignore the CrypBuild directory

File size: 27.3 KB
Line 
1/* Copyright 2009 Team CrypTool (Christian Arnold), Uni Duisburg-Essen
2
3   Licensed under the Apache License, Version 2.0 (the "License");
4   you may not use this file except in compliance with the License.
5   You may obtain a copy of the License at
6
7       http://www.apache.org/licenses/LICENSE-2.0
8
9   Unless required by applicable law or agreed to in writing, software
10   distributed under the License is distributed on an "AS IS" BASIS,
11   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12   See the License for the specific language governing permissions and
13   limitations under the License.
14*/
15
16using System;
17using System.Collections.Generic;
18using System.Linq;
19using System.Text;
20using PeersAtPlay.P2PStorage.DHT;
21using PeersAtPlay.P2PStorage.FullMeshDHT;
22using PeersAtPlay.P2POverlay.Bootstrapper;
23using PeersAtPlay.P2POverlay;
24using PeersAtPlay.P2POverlay.Bootstrapper.LocalMachineBootstrapper;
25using PeersAtPlay.P2POverlay.FullMeshOverlay;
26using PeersAtPlay.P2PLink;
27using PeersAtPlay.P2POverlay.Bootstrapper.IrcBootstrapper;
28using System.Threading;
29using Cryptool.PluginBase.Control;
30using System.ComponentModel;
31using PeersAtPlay;
32using PeersAtPlay.Util.Logging;
33using Gears4Net;
34using Cryptool.Plugins.PeerToPeer.Internal;
35
36/* TODO:
37 * - Catch errors, which can occur when using the DHT (network-based errors)
38 */
39
40/* - Synchronous functions successfully tested (store, retrieve)
41 * - The DHT has an integrated versioning system. When a peer wants
42 *   to store data in an entry which already holds data, the entry's version
43 *   number is compared with the peer's version number. If the peer
44 *   did not read/write the entry most recently, the store instruction
45 *   is rejected. You must first read the current data and then you can
46 *   store your data in this entry...
47 *
48 * INFO:
49 * - The DHT's own versioning system is taken into account in the SynchStore method.
50 *   If this versioning system is ever abolished, the SynchStore method must
51 *   be changed!
52 * - Everything switched to SnalNG, SimpleSnal isn't used anymore, because
53 *   certification stuff runs now
54 *
55 * TODO:
56 * - Delete UseNatTraversal-Flag and insert CertificateCheck and CertificateSetup
57 * - Testing asynchronous methods incl. EventHandlers
58 */
59namespace Cryptool.Plugins.PeerToPeer
60{
61    /* Advantages of this wrapper class:
62     * - The PeerAtPlay-Libraries are only referenced in this project
63     *   --> so they're easy to update
64     * - PeerAtPlay only works with asynchronous methods, so this class
65     *   "synchronizes" this methods.
66     * - The PeerToPeer-Layers are unimportant for CT2-Developers, so this
67     *   issue is obfuscated by this wrapper class
68     */
69    /// <summary>
70    /// Wrapper class to integrate peer@play environment into CrypTool.
71    /// This class synchronizes asynchronous methods for easier usage in CT2.
72    /// </summary>
73    public class P2PBase
74    {
75        #region Delegates and Events for asynchronous p2p functions
76
/// <summary>
/// Handler signature for <see cref="OnSystemJoined"/>.
/// </summary>
public delegate void SystemJoined();

/// <summary>
/// Raised by this wrapper when the DHT reports that the system was joined.
/// </summary>
public event SystemJoined OnSystemJoined;

/// <summary>
/// Handler signature for <see cref="OnSystemLeft"/>.
/// </summary>
public delegate void SystemLeft();

/// <summary>
/// Raised by this wrapper when the DHT reports that the system was left.
/// </summary>
public event SystemLeft OnSystemLeft;

/// <summary>
/// Handler signature for <see cref="OnP2PMessageReceived"/>.
/// </summary>
/// <param name="sourceAddr">PeerId built from the source address of the overlay message</param>
/// <param name="data">Bytes taken from the data stack of the overlay message</param>
public delegate void P2PMessageReceived(PeerId sourceAddr, byte[] data);

/// <summary>
/// Raised (asynchronously) when the overlay delivers a message to this peer.
/// </summary>
public event P2PMessageReceived OnP2PMessageReceived;

/// <summary>
/// Handler signature for <see cref="OnDhtStore_Completed"/>.
/// </summary>
/// <param name="result">true if the key-value-pair was successfully stored in the DHT</param>
public delegate void DHTStoreCompleted(bool result);

/// <summary>
/// Raised when a store started with AsynchStore has completed.
/// </summary>
public event DHTStoreCompleted OnDhtStore_Completed;

/// <summary>
/// Handler signature for <see cref="OnDhtLoad_Completed"/>.
/// </summary>
/// <param name="loadedData">Data carried by the retrieve result</param>
public delegate void DHTLoadCompleted(byte[] loadedData);

/// <summary>
/// Raised when a retrieve started with AsynchRetrieve has completed.
/// </summary>
public event DHTLoadCompleted OnDhtLoad_Completed;

/// <summary>
/// Handler signature for <see cref="OnDhtRemove_Completed"/>.
/// </summary>
/// <param name="result">true if the key was found and removed successfully from the DHT</param>
public delegate void DHTRemoveCompleted(bool result);

/// <summary>
/// Raised when a remove started with AsynchRemove has completed.
/// </summary>
public event DHTRemoveCompleted OnDhtRemove_Completed;
102
103        #endregion
104
105        #region Variables
106
// Backing field of AllowLoggingToMonitor.
private bool allowLoggingToMonitor;

/// <summary>
/// When set to true, LogToMonitor forwards its messages to the
/// PeersAtPlay LogMonitor; otherwise they are dropped.
/// </summary>
public bool AllowLoggingToMonitor
{
    get { return allowLoggingToMonitor; }
    set { allowLoggingToMonitor = value; }
}

// NOTE(review): this constant is not referenced anywhere in this class.
private const bool ALLOW_LOGGING_TO_MONITOR = true;

// Backing field of Started.
private bool started = false;

/// <summary>
/// True once the system was successfully joined, false once the system
/// is COMPLETELY left. Only this class can change the value.
/// </summary>
public bool Started
{
    get { return started; }
    private set { started = value; }
}

// The peer@play components created in Initialize.
private IDHT dht;
private IP2PLinkManager linkmanager;
private IBootstrapper bootstrapper;
private P2POverlay overlay;

// Signaled by the DHT event handlers; SynchStart / SynchStop block on them.
private AutoResetEvent systemJoined;
private AutoResetEvent systemLeft;

/// <summary>
/// Pending synchronous DHT requests, keyed by the Guid the DHT returned for
/// the request. CrypTool offers no asynchronous environment, so the Synch*
/// methods park their wait handle here until the matching callback arrives.
/// </summary>
private Dictionary<Guid, ResponseWait> waitDict;
141
142        #endregion
143
/// <summary>
/// Creates the wrapper with both wait handles unsignaled and an empty
/// table of pending synchronous requests.
/// </summary>
public P2PBase()
{
    systemJoined = new AutoResetEvent(false);
    systemLeft = new AutoResetEvent(false);
    waitDict = new Dictionary<Guid, ResponseWait>();
}
150
151        #region Basic P2P Methods (Init, Start, Stop) - synch and asynch
152
/// <summary>
/// Initializing is the first step to build a new or access an existing p2p network.
/// Creates the link manager, bootstrapper, overlay and DHT of the requested types,
/// subscribes to the overlay/DHT events and initializes the DHT.
/// </summary>
/// <param name="sUserName">Choose an individual name for the user</param>
/// <param name="sWorldName">fundamental: two peers are only in the SAME
/// P2P system, when they initialized the SAME WORLD!</param>
/// <param name="linkManagerType">Link manager to create; only Snal is handled here</param>
/// <param name="bsType">Bootstrapper to create; LocalMachineBootstrapper (runs only
/// on one machine) or IrcBootstrapper</param>
/// <param name="overlayType">Overlay to create; only FullMeshOverlay is handled here</param>
/// <param name="dhtType">DHT to create; only FullMeshDHT is handled here</param>
/// <exception cref="NotImplementedException">One of the type arguments has a value
/// that this method does not handle</exception>
public void Initialize(string sUserName, string sWorldName, P2PLinkManagerType linkManagerType, P2PBootstrapperType bsType, P2POverlayType overlayType, P2PDHTType dhtType)
{
    // One scheduler instance is handed to every component that takes one.
    Scheduler scheduler = new STAScheduler("pap");

    // Creation order is kept: link manager, bootstrapper, overlay, DHT.
    this.linkmanager = CreateLinkManager(linkManagerType, scheduler);
    this.bootstrapper = CreateBootstrapper(bsType, scheduler);
    this.overlay = CreateOverlay(overlayType, scheduler);
    this.dht = CreateDht(dhtType, scheduler);

    this.overlay.MessageReceived += new EventHandler<OverlayMessageEventArgs>(overlay_MessageReceived);
    this.dht.SystemJoined += new EventHandler(OnDHT_SystemJoined);
    this.dht.SystemLeft += new EventHandler<SystemLeftEventArgs>(OnDHT_SystemLeft);

    this.dht.Initialize(sUserName, "", sWorldName, this.overlay, this.bootstrapper, this.linkmanager, null);
}

// Builds and configures the link manager for the requested type.
private IP2PLinkManager CreateLinkManager(P2PLinkManagerType linkManagerType, Scheduler scheduler)
{
    switch (linkManagerType)
    {
        case P2PLinkManagerType.Snal:
            LogToMonitor("Init LinkMgr: Using NAT Traversal stuff");
            // NAT-Traversal stuff needs a different Snal-Version
            IP2PLinkManager manager = new PeersAtPlay.P2PLink.SnalNG.Snal(scheduler);

            PeersAtPlay.P2PLink.SnalNG.Settings settings = new PeersAtPlay.P2PLink.SnalNG.Settings();
            settings.LoadDefaults();
            settings.ConnectInternal = true;
            settings.LocalReceivingPort = 0;
            settings.UseLocalAddressDetection = false;
            settings.AutoReconnect = false;
            settings.NoDelay = false;
            settings.ReuseAddress = false;
            settings.UseNetworkMonitorServer = true;

            manager.Settings = settings;
            manager.ApplicationType = PeersAtPlay.Monitoring.ApplicationType.CrypTool;
            return manager;
        default:
            throw new NotImplementedException();
    }
}

// Builds the bootstrapper for the requested type.
private IBootstrapper CreateBootstrapper(P2PBootstrapperType bsType, Scheduler scheduler)
{
    switch (bsType)
    {
        case P2PBootstrapperType.LocalMachineBootstrapper:
            // LocalMachineBootstrapper = only local connection (runs only on one machine)
            return new LocalMachineBootstrapper();
        case P2PBootstrapperType.IrcBootstrapper:
            // setup nat traversal stuff
            LogToMonitor("Init Bootstrapper: Using NAT Traversal stuff");
            PeersAtPlay.P2POverlay.Bootstrapper.IrcBootstrapper.Settings.DelaySymmetricResponse = true;
            PeersAtPlay.P2POverlay.Bootstrapper.IrcBootstrapper.Settings.IncludeSymmetricInResponse = false;
            PeersAtPlay.P2POverlay.Bootstrapper.IrcBootstrapper.Settings.SymmetricResponseDelay = 6000;

            return new IrcBootstrapper(scheduler);
        default:
            throw new NotImplementedException();
    }
}

// Builds the overlay for the requested type.
private P2POverlay CreateOverlay(P2POverlayType overlayType, Scheduler scheduler)
{
    switch (overlayType)
    {
        case P2POverlayType.FullMeshOverlay:
            // changing overlay example: return new ChordOverlay();
            return new FullMeshOverlay(scheduler);
        default:
            throw new NotImplementedException();
    }
}

// Builds the DHT for the requested type.
private IDHT CreateDht(P2PDHTType dhtType, Scheduler scheduler)
{
    switch (dhtType)
    {
        case P2PDHTType.FullMeshDHT:
            return new FullMeshDHT(scheduler);
        default:
            throw new NotImplementedException();
    }
}
239
/// <summary>
/// Starts the P2P System. When the given P2P world doesn't exist yet, this
/// includes creating it and bootstrapping to the P2P network.
/// In either case joining the P2P world.
/// This synchronized method does not return before the peer has
/// successfully joined the network (this may take one or two minutes).
/// </summary>
/// <returns>True, if the peer has completely joined the p2p network</returns>
/// <exception cref="InvalidOperationException">No DHT is available, i.e. Initialize
/// was not called yet or the system-left handler has already cleared it</exception>
public bool SynchStart()
{
    // Read the field once; the system-left handler sets it to null.
    IDHT currentDht = this.dht;
    if (currentDht == null)
        throw new InvalidOperationException("P2PBase.Initialize must be called before the peer can be started.");

    // Start != system joined.
    // This only starts the system asynchronously; a callback would be useless
    // because it is invoked before the peer has completely joined the P2P system.
    currentDht.BeginStart(null);

    // Block until the SystemJoined handler signals that the join is complete.
    this.systemJoined.WaitOne();
    return true;
}
258
/// <summary>
/// Disjoins the peer from the system. The P2P system survives while one peer is still in the network.
/// Returns immediately when no DHT is available.
/// </summary>
/// <returns>True, if the peer has completely disjoined the p2p network</returns>
public bool SynchStop()
{
    // Read the field once: the system-left handler sets it to null, so a
    // second read after the null check could otherwise hit a null reference.
    IDHT currentDht = this.dht;
    if (currentDht != null)
    {
        currentDht.BeginStop(null);
        // don't stop anything else, because BOOM

        // wait till the systemLeft event is invoked
        this.systemLeft.WaitOne();
    }
    return true;
}
275
/// <summary>
/// Asynchronously starting the peer. When the given P2P world doesn't
/// exist yet, this includes creating it and bootstrapping to the P2P network.
/// In either case joining the P2P world. To ensure that the peer has successfully
/// joined the p2p world, catch the event OnSystemJoined.
/// </summary>
/// <exception cref="InvalidOperationException">No DHT is available, i.e. Initialize
/// was not called yet or the system-left handler has already cleared it</exception>
public void AsynchStart()
{
    // Read the field once; the system-left handler sets it to null.
    IDHT currentDht = this.dht;
    if (currentDht == null)
        throw new InvalidOperationException("P2PBase.Initialize must be called before the peer can be started.");

    // No callback is useful, because starting and joining isn't the same;
    // everything else is done by the EventHandler OnDHT_SystemJoined.
    currentDht.BeginStart(null);
}
288
/// <summary>
/// Asynchronously disjoining the actual peer of the p2p system. To ensure
/// disjoining, catch the event OnSystemLeft. Does nothing when no DHT is available.
/// </summary>
public void AsynchStop()
{
    // Read the field once: the system-left handler sets it to null, so a
    // second read after the null check could otherwise hit a null reference.
    IDHT currentDht = this.dht;
    if (currentDht != null)
    {
        // No callback is useful.
        // Everything else is done by the EventHandler OnDHT_SystemLeft.
        currentDht.BeginStop(null);
    }
}
302
303        #endregion
304
/// <summary>
/// Returns the id of the local peer together with its user name.
/// </summary>
/// <param name="sPeerName">out: the UserName reported by the link manager</param>
/// <returns>PeerId wrapping the overlay's local address</returns>
public PeerId GetPeerID(out string sPeerName)
{
    // The name comes from the link layer, the id from the overlay layer.
    sPeerName = linkmanager.UserName;

    return new PeerId(overlay.LocalAddress);
}
315
/// <summary>
/// Builds the PeerId that belongs to a raw overlay address.
/// </summary>
/// <param name="byteId">overlay address as byte array</param>
/// <returns>PeerId wrapping the address the overlay resolves for the given bytes</returns>
public PeerId GetPeerID(byte[] byteId)
{
    LogToMonitor("GetPeerID: Converting byte[] to PeerId-Object");

    // The overlay turns the raw bytes into its own address object.
    return new PeerId(overlay.GetAddress(byteId));
}
326
/// <summary>
/// Sends a block of bytes to another peer through the overlay.
/// overlay.LocalAddress (the Overlay-Peer-Address) is used as the sender.
/// </summary>
/// <param name="data">Payload to send</param>
/// <param name="destinationPeer">Overlay address of the receiver as byte array</param>
public void SendToPeer(byte[] data, byte[] destinationPeer)
{
    // Size the stack for the overlay header plus our own payload up front
    // (optimizes the stack size).
    ByteStack payload = new ByteStack(this.overlay.GetHeaderSize() + data.Length);
    payload.Push(data);

    OverlayAddress receiver = this.overlay.GetAddress(destinationPeer);
    this.overlay.Send(new OverlayMessage(MessageReceiverType.P2PBase,
        this.overlay.LocalAddress, receiver, payload));
}
341
/// <summary>
/// Handles a message delivered by the overlay and forwards its payload to
/// every subscriber of OnP2PMessageReceived without blocking the caller.
/// </summary>
/// <param name="sender">Event source (not used)</param>
/// <param name="e">Carries the received overlay message</param>
private void overlay_MessageReceived(object sender, OverlayMessageEventArgs e)
{
    // Snapshot the event so a concurrent unsubscribe cannot null it after the check.
    P2PMessageReceived subscribers = OnP2PMessageReceived;
    if (subscribers == null)
        return;

    PeerId pid = new PeerId(e.Message.Source);
    byte[] payload = e.Message.Data.PopBytes(e.Message.Data.CurrentStackSize);

    /* You have to fire this event asynchronously, because the main
     * thread will be stopped in this wrapper class for synchronizing
     * the asynchronous stuff (AutoResetEvent) --> so this could run
     * into a deadlock, when you fire this event synchronously (normal Invoke).
     * ATTENTION: This could change the invocation order!!!
     *
     * BeginInvoke is only allowed on a delegate with a single target, so
     * each subscriber is started separately. */
    foreach (P2PMessageReceived subscriber in subscribers.GetInvocationList())
    {
        P2PMessageReceived target = subscriber;
        target.BeginInvoke(pid, payload, delegate(IAsyncResult ar)
        {
            // EndInvoke must be called for every BeginInvoke. It rethrows whatever
            // the handler threw; log it instead of letting it escape on the worker thread.
            try
            {
                target.EndInvoke(ar);
            }
            catch (Exception ex)
            {
                LogToMonitor("OnP2PMessageReceived handler failed: " + ex.Message);
            }
        }, null);
    }
}
357
358        #region Event Handling (System Joined, Left and Message Received)
359
/// <summary>
/// Handles the DHT's SystemJoined event: marks the wrapper as started,
/// notifies subscribers of OnSystemJoined and releases a thread blocked in SynchStart.
/// </summary>
/// <param name="sender">Event source (not used)</param>
/// <param name="e">Event data (not used)</param>
private void OnDHT_SystemJoined(object sender, EventArgs e)
{
    // Set the flag first, so subscribers and the released waiter both see Started == true.
    Started = true;
    try
    {
        // Snapshot the event so a concurrent unsubscribe cannot null it after the check.
        SystemJoined subscribers = OnSystemJoined;
        if (subscribers != null)
            subscribers();
    }
    finally
    {
        // Release the waiter even if a subscriber throws; otherwise SynchStart would block forever.
        this.systemJoined.Set();
    }
}
367
/// <summary>
/// Handles the DHT's SystemLeft event: marks the wrapper as stopped, notifies
/// subscribers of OnSystemLeft, drops the DHT reference and releases a thread
/// blocked in SynchStop.
/// </summary>
/// <param name="sender">Event source (not used)</param>
/// <param name="e">Event data (not used)</param>
private void OnDHT_SystemLeft(object sender, SystemLeftEventArgs e)
{
    // Clear the flag first, so subscribers and the released waiter both see Started == false.
    Started = false;
    try
    {
        // Snapshot the event so a concurrent unsubscribe cannot null it after the check.
        SystemLeft subscribers = OnSystemLeft;
        if (subscribers != null)
            subscribers();
    }
    finally
    {
        // as an experiment
        this.dht = null;
        // Release the waiter even if a subscriber throws; otherwise SynchStop would block forever.
        this.systemLeft.Set();
    }

    LogToMonitor("OnDHT_SystemLeft has nulled the dht and setted the systemLeft Waithandle");
}
379
380        #endregion
381
382        /* Attention: The asynchronous methods are not tested at the moment */
383        #region Asynchronous Methods incl. Callbacks
384
/// <summary>
/// Starts a retrieve for the given key without waiting for the answer.
/// The loaded value is delivered through the event OnDhtLoad_Completed.
/// </summary>
/// <param name="sKey">Existing key in DHT</param>
public void AsynchRetrieve(string sKey)
{
    // The Guid returned by the DHT is not needed here; the callback raises the event.
    this.dht.Retrieve(OnAsynchRetrieve_Completed, sKey);
}
/// <summary>
/// Callback of AsynchRetrieve; passes the retrieved data on to the
/// subscribers of OnDhtLoad_Completed, if there are any.
/// </summary>
/// <param name="rr">Result delivered by the DHT</param>
private void OnAsynchRetrieve_Completed(RetrieveResult rr)
{
    if (OnDhtLoad_Completed == null)
        return;

    OnDhtLoad_Completed(rr.Data);
}
401
/// <summary>
/// Starts storing a Key-Value-Pair in the DHT without waiting for the answer.
/// The outcome is delivered through the event OnDhtStore_Completed.
/// </summary>
/// <param name="sKey">Key of DHT Entry</param>
/// <param name="sValue">Value of DHT Entry; stored as its UTF-8 bytes</param>
public void AsynchStore(string sKey, string sValue)
{
    //this.dht.Store(OnAsynchStore_Completed, sKey, UTF8Encoding.UTF8.GetBytes(sValue), IGNORE_DHT_VERSIONING_SYSTEM);
    byte[] encodedValue = UTF8Encoding.UTF8.GetBytes(sValue);
    this.dht.Store(OnAsynchStore_Completed, sKey, encodedValue);
}
413
/// <summary>
/// Callback of AsynchStore; tells the subscribers of OnDhtStore_Completed
/// whether the store finished with status Success.
/// </summary>
/// <param name="sr">Result delivered by the DHT</param>
private void OnAsynchStore_Completed(StoreResult sr)
{
    if (OnDhtStore_Completed == null)
        return;

    // Every status other than Success is reported as false.
    OnDhtStore_Completed(sr.Status == OperationStatus.Success);
}
425
/// <summary>
/// Starts removing a key from the DHT without waiting for the answer.
/// The outcome is delivered through the event OnDhtRemove_Completed.
/// </summary>
/// <param name="sKey">Key of the DHT Entry to remove</param>
public void AsynchRemove(string sKey)
{
    //this.dht.Remove(OnAsynchRemove_Completed, sKey, IGNORE_DHT_VERSIONING_SYSTEM);
    dht.Remove(OnAsynchRemove_Completed, sKey);
}
/// <summary>
/// Callback of AsynchRemove; tells the subscribers of OnDhtRemove_Completed
/// whether the remove finished with status Success.
/// </summary>
/// <param name="rr">Result delivered by the DHT</param>
private void OnAsynchRemove_Completed(RemoveResult rr)
{
    if (OnDhtRemove_Completed == null)
        return;

    // Every status other than Success is reported as false.
    OnDhtRemove_Completed(rr.Status == OperationStatus.Success);
}
446
447        #endregion
448
449        #region Synchronous Methods incl. Callbacks
450
451        #region SynchStore incl.Callback and SecondTrialCallback
452
/// <summary>
/// Stores a value in the DHT at the given key and blocks until the
/// store callback has reported the outcome.
/// </summary>
/// <param name="sKey">Key of DHT Entry</param>
/// <param name="byteData">Value of DHT Entry</param>
/// <returns>The success flag set by the store callback</returns>
/// <exception cref="ArgumentNullException"><paramref name="byteData"/> is null</exception>
public bool SynchStore(string sKey, byte[] byteData)
{
    if (byteData == null)
        throw new ArgumentNullException("byteData");

    // Decoding the payload is only worth it when the message is actually logged.
    if (AllowLoggingToMonitor)
        LogToMonitor("Begin: SynchStore. Key: " + sKey + ", Data: " + Encoding.UTF8.GetString(byteData));

    // The wait handle is only needed for this one request; release it afterwards.
    using (AutoResetEvent are = new AutoResetEvent(false))
    {
        ResponseWait rw = new ResponseWait() { WaitHandle = are, key = sKey, value = byteData };

        // this method returns always a GUID to distinguish between asynchronous actions
        // NOTE(review): the request can only be registered after Store returned the Guid;
        // a callback arriving before the Add below would not find it - confirm the
        // DHT never calls back that early.
        Guid g = this.dht.Store(OnSynchStoreCompleted, sKey, byteData);
        waitDict.Add(g, rw);

        // blocking till response
        are.WaitOne();

        LogToMonitor("End: SynchStore. Key: " + sKey + ". Success: " + rw.success.ToString());

        return rw.success;
    }
}
476
/// <summary>
/// Convenience overload: stores a string value in the DHT at the given key
/// by handing its UTF-8 bytes to the byte[] overload.
/// </summary>
/// <param name="sKey">Key of DHT Entry</param>
/// <param name="sData">Value of DHT Entry</param>
/// <returns>Result of the byte[] overload</returns>
public bool SynchStore(string sKey, string sData)
{
    byte[] encodedData = UTF8Encoding.UTF8.GetBytes(sData);
    return SynchStore(sKey, encodedData);
}
/// <summary>
/// Callback for the synchronized store method. Records the outcome in the
/// pending request that belongs to the result's Guid and releases the thread
/// waiting in SynchStore. When the store failed, the key is retrieved once and
/// the store is tried a second time before the outcome is reported.
/// </summary>
/// <param name="sr">Result delivered by the DHT</param>
private void OnSynchStoreCompleted(StoreResult sr)
{
    ResponseWait rw;
    if (!this.waitDict.TryGetValue(sr.Guid, out rw))
    {
        // Nothing is waiting for this Guid, so there is no handle to signal.
        LogToMonitor("OnSynchStoreCompleted: no pending store request for Guid " + sr.Guid.ToString());
        return;
    }

    try
    {
        // if Status == Failure, then the version of the value is out of date.
        // There is a versioning system in the DHT. So you must retrieve the
        // key and then store the new value
        if (sr.Status == OperationStatus.Failure)
        {
            // The retrieved data itself is not needed, only the read access.
            this.SynchRetrieve(rw.key);

            // Only try a second time. When it's still not possible, abort storing
            using (AutoResetEvent are = new AutoResetEvent(false))
            {
                ResponseWait rw2 = new ResponseWait() { WaitHandle = are, key = rw.key, value = rw.value };
                Guid g = this.dht.Store(OnSecondTrialStoring, rw.key, rw.value);
                waitDict.Add(g, rw2);

                // blocking till response
                are.WaitOne();
                rw.success = rw2.success;
                rw.Message = rw2.Message;
            }
        }
        else
        {
            rw.Message = UTF8Encoding.UTF8.GetBytes(sr.Status.ToString());
            rw.success = sr.Status != OperationStatus.KeyNotFound;
        }
    }
    finally
    {
        // Drop the bookkeeping entry first, then unblock the WaitHandle in the
        // synchronous method - also when the second trial threw, so the
        // waiting thread is never left blocked.
        this.waitDict.Remove(sr.Guid);
        rw.WaitHandle.Set();
    }
}
528
/// <summary>
/// Callback for the second store attempt started by OnSynchStoreCompleted.
/// Records the outcome in the pending request that belongs to the result's
/// Guid and releases the thread waiting for it.
/// </summary>
/// <param name="sr">Result delivered by the DHT</param>
private void OnSecondTrialStoring(StoreResult sr)
{
    ResponseWait rw;
    if (!this.waitDict.TryGetValue(sr.Guid, out rw))
    {
        // Nothing is waiting for this Guid, so there is no handle to signal.
        LogToMonitor("OnSecondTrialStoring: no pending store request for Guid " + sr.Guid.ToString());
        return;
    }

    if (sr.Status == OperationStatus.Failure)
    {
        // Abort storing, because it's already the second trial
        rw.Message = UTF8Encoding.UTF8.GetBytes("Storing also not possible on second trial.");
        rw.success = false;
    }
    else
    {
        // works the second trial, so it was the versioning system
        rw.success = true;
    }

    // Drop the bookkeeping entry first, then unblock the waiting thread.
    this.waitDict.Remove(sr.Guid);
    rw.WaitHandle.Set();
}
551
552        #endregion
553
/// <summary>
/// Gets the value of the given DHT key and blocks until the retrieve callback
/// has reported the outcome. The callback leaves the value null when the
/// retrieve failed or the key was not found.
/// For synchronous environments use the Synch* methods.
/// </summary>
/// <param name="sKey">Key of DHT Entry</param>
/// <returns>Value of DHT Entry as set by the retrieve callback</returns>
public byte[] SynchRetrieve(string sKey)
{
    LogToMonitor("ThreadId (P2PBase SynchRetrieve): " + Thread.CurrentThread.ManagedThreadId.ToString());

    // The wait handle is only needed for this one request; release it afterwards.
    using (AutoResetEvent are = new AutoResetEvent(false))
    {
        ResponseWait rw = new ResponseWait() { WaitHandle = are };

        LogToMonitor("Begin: SynchRetrieve. Key: " + sKey);

        // this method returns always a GUID to distinguish between asynchronous actions
        Guid g = this.dht.Retrieve(OnSynchRetrieveCompleted, sKey);
        waitDict.Add(g, rw);

        // blocking till response
        are.WaitOne();

        LogToMonitor("End: SynchRetrieve. Key: " + sKey + ". Success: " + rw.success.ToString());

        // hand back the data
        return rw.Message;
    }
}
582
/// <summary>
/// Callback for the synchronized retrieve method. Copies the outcome into the
/// pending request that belongs to the result's Guid and wakes the waiting thread.
/// Results with an unknown Guid are only logged.
/// </summary>
/// <param name="rr">Result delivered by the DHT</param>
private void OnSynchRetrieveCompleted(RetrieveResult rr)
{
    LogToMonitor(rr.Guid.ToString());
    LogToMonitor("ThreadId (P2PBase retrieve callback): " + Thread.CurrentThread.ManagedThreadId.ToString());

    ResponseWait pending;
    if (!this.waitDict.TryGetValue(rr.Guid, out pending))
        return;

    // Only a Failure counts as unsuccessful; a missing key is a
    // successful retrieve without data.
    bool failed = rr.Status == OperationStatus.Failure;
    bool noData = failed || rr.Status == OperationStatus.KeyNotFound;

    pending.success = !failed;
    pending.Message = noData ? null : rr.Data;

    // unblock WaitHandle in the synchronous method
    pending.WaitHandle.Set();
    // the request is answered, so its entry is not needed any longer
    this.waitDict.Remove(rr.Guid);
}
/// <summary>
/// Removes a key/value pair out of the DHT and blocks until the remove
/// callback has reported the outcome.
/// </summary>
/// <param name="sKey">Key of the DHT Entry</param>
/// <returns>The success flag set by the remove callback</returns>
public bool SynchRemove(string sKey)
{
    LogToMonitor("Begin SynchRemove. Key: " + sKey);

    // The wait handle is only needed for this one request; release it afterwards.
    using (AutoResetEvent are = new AutoResetEvent(false))
    {
        ResponseWait rw = new ResponseWait() { WaitHandle = are };

        // this method returns always a GUID to distinguish between asynchronous actions
        Guid g = this.dht.Remove(OnSynchRemoveCompleted, sKey);
        //Guid g = this.dht.Remove(OnSynchRemoveCompleted, sKey, IGNORE_DHT_VERSIONING_SYSTEM);
        waitDict.Add(g, rw);

        // blocking till response
        are.WaitOne();

        LogToMonitor("Ended SynchRemove. Key: " + sKey + ". Success: " + rw.success.ToString());

        return rw.success;
    }
}
639
/// <summary>
/// Callback for the synchronized remove method. Copies the outcome into the
/// pending request that belongs to the result's Guid and wakes the waiting thread.
/// Results with an unknown Guid are ignored.
/// </summary>
/// <param name="rr">Result delivered by the DHT</param>
private void OnSynchRemoveCompleted(RemoveResult rr)
{
    ResponseWait pending;
    if (!this.waitDict.TryGetValue(rr.Guid, out pending))
        return;

    // The status name doubles as the message of the request.
    pending.Message = UTF8Encoding.UTF8.GetBytes(rr.Status.ToString());

    // Both a failure and a missing key count as "not removed".
    bool notRemoved = rr.Status == OperationStatus.Failure || rr.Status == OperationStatus.KeyNotFound;
    pending.success = !notRemoved;

    // unblock WaitHandle in the synchronous method
    pending.WaitHandle.Set();
    // the request is answered, so its entry is not needed any longer
    this.waitDict.Remove(rr.Guid);
}
662
663        #endregion
664
/// <summary>
/// To log the internal state in the Monitoring Software of P@play.
/// Does nothing when no DHT is available.
/// </summary>
public void LogInternalState()
{
    // Read the field once: the system-left handler sets it to null, so a
    // second read after the null check could otherwise hit a null reference.
    IDHT currentDht = this.dht;
    if (currentDht != null)
    {
        currentDht.LogInternalState();
    }
}
675
/// <summary>
/// Writes a debug message to the log, but only while
/// AllowLoggingToMonitor is switched on.
/// </summary>
/// <param name="sTextToLog">Message to log</param>
public void LogToMonitor(string sTextToLog)
{
    if (!AllowLoggingToMonitor)
        return;

    Log.Debug(sTextToLog);
}
681
682    }
683}
Note: See TracBrowser for help on using the repository browser.