source: trunk/CrypPlugins/PeerToPeerBase/P2PBase.cs @ 2061

Last change on this file since 2061 was 2027, checked in by Arno Wacker, 11 years ago

Adapted P2PBase to new PAP DLLs: Replaced all references to IrcBootstrapper with IrcBootstrapperV2 (V1 is no longer available)
(this might fix the nightly build)

File size: 27.1 KB
Line 
1/* Copyright 2009 Team CrypTool (Christian Arnold), Uni Duisburg-Essen
2
3   Licensed under the Apache License, Version 2.0 (the "License");
4   you may not use this file except in compliance with the License.
5   You may obtain a copy of the License at
6
7       http://www.apache.org/licenses/LICENSE-2.0
8
9   Unless required by applicable law or agreed to in writing, software
10   distributed under the License is distributed on an "AS IS" BASIS,
11   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12   See the License for the specific language governing permissions and
13   limitations under the License.
14*/
15
16using System;
17using System.Collections.Generic;
18using System.Linq;
19using System.Text;
20using PeersAtPlay.P2PStorage.DHT;
21using PeersAtPlay.P2PStorage.FullMeshDHT;
22using PeersAtPlay.P2POverlay.Bootstrapper;
23using PeersAtPlay.P2POverlay;
24using PeersAtPlay.P2POverlay.Bootstrapper.LocalMachineBootstrapper;
25using PeersAtPlay.P2POverlay.FullMeshOverlay;
26using PeersAtPlay.P2PLink;
27using PeersAtPlay.P2POverlay.Bootstrapper.IrcBootstrapperV2;
28using System.Threading;
29using Cryptool.PluginBase.Control;
30using System.ComponentModel;
31using PeersAtPlay;
32using PeersAtPlay.Util.Logging;
33using Gears4Net;
34using Cryptool.Plugins.PeerToPeer.Internal;
35
36/* TODO:
37 * - Catch errors, which can occur when using the DHT (network-based errors)
38 */
39
/* - Synchronous functions successfully tested (store, retrieve)
 * - The DHT has an integrated versioning system. When a peer wants
 *   to store data in an entry which already holds data, the entry's
 *   version number is compared with the peer's version number. If the
 *   peer was not the last one to read/write the entry, the store request
 *   is rejected. You must first read the current data, and then you can
 *   store your data in this entry.
 *
 * INFO:
 * - The DHT's own versioning system is taken into account in the SynchStore
 *   method. If this versioning system is ever abolished, the SynchStore
 *   method must be changed!
 * - Everything was switched to SnalNG; SimpleSnal isn't used anymore, because
 *   the certificate handling works now.
 *
 * TODO:
 * - Delete the UseNatTraversal flag and insert CertificateCheck and CertificateSetup
 * - Test the asynchronous methods incl. EventHandlers
 */
59namespace Cryptool.Plugins.PeerToPeer
60{
61    /* Advantages of this wrapper class:
62     * - The PeerAtPlay-Libraries are only referenced in this project
63     *   --> so they're easy to update
64     * - PeerAtPlay only works with asynchronous methods, so this class
65     *   "synchronizes" this methods.
66     * - The PeerToPeer-Layers are unimportant for CT2-Developers, so this
67     *   issue is obfuscated by this wrapper class
68     */
    /// <summary>
    /// Wrapper class to integrate the peer@play environment into CrypTool.
    /// This class synchronizes asynchronous methods for easier usage in CT2.
    /// </summary>
73    public class P2PBase
74    {
75        #region Delegates and Events for asynchronous p2p functions
76
77        public delegate void SystemJoined();
78        public event SystemJoined OnSystemJoined;
79
80        public delegate void SystemLeft();
81        public event SystemLeft OnSystemLeft;
82
83        public delegate void P2PMessageReceived(PeerId sourceAddr, byte[] data);
84        public event P2PMessageReceived OnP2PMessageReceived;
85
86        /// <summary>
87        /// returns true if key-value-pair is successfully stored in the DHT
88        /// </summary>
89        /// <param name="result"></param>
90        public delegate void DHTStoreCompleted(bool result);
91        public event DHTStoreCompleted OnDhtStore_Completed;
92
93        public delegate void DHTLoadCompleted(byte[] loadedData);
94        public event DHTLoadCompleted OnDhtLoad_Completed;
95
96        /// <summary>
97        /// returns true if key was found and removed successfully from the DHT
98        /// </summary>
99        /// <param name="result"></param>
100        public delegate void DHTRemoveCompleted(bool result);
101        public event DHTRemoveCompleted OnDhtRemove_Completed;
102
103        #endregion
104
105        #region Variables
106
107        private bool allowLoggingToMonitor;
108        /// <summary>
109        /// If true, all kinds of actions will be logged in the PeersAtPlay LogMonitor.
110        /// </summary>
111        public bool AllowLoggingToMonitor
112        {
113            get { return this.allowLoggingToMonitor; }
114            set { this.allowLoggingToMonitor = value; }
115        }
116
117        private const bool ALLOW_LOGGING_TO_MONITOR = true;
118
119        private bool started = false;
120        /// <summary>
121        /// True if system was successfully joined, false if system is COMPLETELY left
122        /// </summary>
123        public bool Started
124        {
125            get { return this.started; }
126            private set { this.started = value; } 
127        }
128
129        private IDHT dht;
130        private IP2PLinkManager linkmanager;
131        private IBootstrapper bootstrapper;
132        private P2POverlay overlay;
133        private AutoResetEvent systemJoined;
134        private AutoResetEvent systemLeft;
135
136        /// <summary>
137        /// Dictionary for synchronizing asynchronous DHT retrieves.
138        /// Cryptool doesn't offers an asynchronous environment, so this workaround is necessary
139        /// </summary>
140        private Dictionary<Guid, ResponseWait> waitDict;
141
142        #endregion
143
144        public P2PBase()
145        {
146            this.waitDict = new Dictionary<Guid, ResponseWait>();
147            this.systemJoined = new AutoResetEvent(false);
148            this.systemLeft = new AutoResetEvent(false);
149        }
150
151        #region Basic P2P Methods (Init, Start, Stop) - synch and asynch
152
153        /// <summary>
154        /// Initializing is the first step to build a new or access an existing p2p network
155        /// </summary>
156        /// <param name="sUserName">Choose an individual name for the user</param>
157        /// <param name="sWorldName">fundamental: two peers are only in the SAME
158        /// P2P system, when they initialized the SAME WORLD!</param>
159        /// <param name="bolUseNatTraversal">When you want to use NAT-Traversal #
160        /// (tunneling the P2P connection through NATs and Firewalls), you have to
161        /// set this flag to true</param>
162        /// <param name="linkManagerType"></param>
163        /// <param name="bsType"></param>
164        /// <param name="overlayType"></param>
165        /// <param name="dhtType"></param>
166        public void Initialize(string sUserName, string sWorldName, P2PLinkManagerType linkManagerType, P2PBootstrapperType bsType, P2PArchitecture architecture)
167        {
168            #region Setting LinkManager, Bootstrapper, Overlay and DHT to the specified types
169
170            Scheduler scheduler = new STAScheduler("pap");
171
172            switch (linkManagerType)
173            {
174                case P2PLinkManagerType.Snal:
175                    LogToMonitor("Init LinkMgr: Using NAT Traversal stuff");
176                    // NAT-Traversal stuff needs a different Snal-Version
177                    this.linkmanager = new PeersAtPlay.P2PLink.SnalNG.Snal(scheduler);
178
179                    PeersAtPlay.P2PLink.SnalNG.Settings settings = new PeersAtPlay.P2PLink.SnalNG.Settings();
180                    settings.LoadDefaults();
181                    settings.ConnectInternal = true;
182                    settings.LocalReceivingPort = 0;
183                    settings.UseLocalAddressDetection = false;
184                    settings.NoDelay = false;
185                    settings.ReuseAddress = false;
186                    settings.UseNetworkMonitorServer = true;
187
188                    this.linkmanager.Settings = settings;
189                    this.linkmanager.ApplicationType = PeersAtPlay.Monitoring.ApplicationType.CrypTool;
190
191                    break;
192                default:
193                    throw (new NotImplementedException());
194            }
195            switch (bsType)
196            {
197                case P2PBootstrapperType.LocalMachineBootstrapper:
198                    //LocalMachineBootstrapper = only local connection (runs only on one machine)
199                    this.bootstrapper = new LocalMachineBootstrapper();
200                    break;
201                case P2PBootstrapperType.IrcBootstrapper:
202                    // setup nat traversal stuff
203                    LogToMonitor("Init Bootstrapper: Using NAT Traversal stuff");
204                    PeersAtPlay.P2POverlay.Bootstrapper.IrcBootstrapperV2.Settings.DelaySymmetricResponse = true;
205                    PeersAtPlay.P2POverlay.Bootstrapper.IrcBootstrapperV2.Settings.IncludeSymmetricResponse = false;
206                    PeersAtPlay.P2POverlay.Bootstrapper.IrcBootstrapperV2.Settings.DelaySymmetricResponseTime = 6000;
207
208                    this.bootstrapper = new IrcBootstrapper(scheduler);
209                    break;
210                default:
211                    throw (new NotImplementedException());
212            }
213            switch (architecture)
214            {
215                case P2PArchitecture.FullMesh:
216                    // changing overlay example: this.overlay = new ChordOverlay();
217                    this.overlay = new FullMeshOverlay(scheduler);
218                    this.dht = new FullMeshDHT(scheduler);
219                    break;
220                case P2PArchitecture.Chord:
221                    throw (new NotImplementedException());
222                default:
223                    throw (new NotImplementedException());
224            }
225            #endregion
226
227            this.overlay.MessageReceived += new EventHandler<OverlayMessageEventArgs>(overlay_MessageReceived);
228            this.dht.SystemJoined += new EventHandler(OnDHT_SystemJoined);
229            this.dht.SystemLeft += new EventHandler<SystemLeftEventArgs>(OnDHT_SystemLeft);
230
231            this.dht.Initialize(sUserName, "", sWorldName, this.overlay, this.bootstrapper, this.linkmanager, null);
232        }
233
234        /// <summary>
235        /// Starts the P2P System. When the given P2P world doesn't exist yet,
236        /// inclusive creating the and bootstrapping to the P2P network.
237        /// In either case joining the P2P world.
238        /// This synchronized method returns true not before the peer has
239        /// successfully joined the network (this may take one or two minutes).
240        /// </summary>
241        /// <returns>True, if the peer has completely joined the p2p network</returns>
242        public bool SynchStart()
243        {
244            //Start != system joined
245            //Only starts the system asynchronous, the possible callback is useless,
246            //because it's invoked before the peer completly joined the P2P system
247            this.dht.BeginStart(null);
248            //Wait for event SystemJoined. When it's invoked, the peer completly joined the P2P system
249            this.systemJoined.WaitOne();
250            return true;
251        }
252
253        /// <summary>
254        /// Disjoins the peer from the system. The P2P system survive while one peer is still in the network.
255        /// </summary>
256        /// <returns>True, if the peer has completely disjoined the p2p network</returns>
257        public bool SynchStop()
258        {
259            if (this.dht != null)
260            {
261                this.dht.BeginStop(null);
262                //don't stop anything else, because BOOM
263
264                //wait till systemLeft Event is invoked
265                this.systemLeft.WaitOne();
266            }
267            return true;
268        }
269
270        /// <summary>
271        /// Asynchronously starting the peer. When the given P2P world doesn't
272        /// exist yet, inclusive creating the and bootstrapping to the P2P network.
273        /// In either case joining the P2P world. To ensure that peer has successfully
274        /// joined the p2p world, catch the event OnSystemJoined.
275        /// </summary>
276        public void AsynchStart()
277        {
278            // no callback usefull, because starting and joining isn't the same
279            // everything else is done by the EventHandler OnDHT_SystemJoined
280            this.dht.BeginStart(null);
281        }
282
283        /// <summary>
284        /// Asynchronously disjoining the actual peer of the p2p system. To ensure
285        /// disjoining, catch the event OnDHT_SystemLeft.
286        /// </summary>
287        public void AsynchStop()
288        {
289            if (this.dht != null)
290            {
291                // no callback usefull.
292                // Everything else is done by the EventHandler OnDHT_SystemLeft
293                this.dht.BeginStop(null);
294            }
295        }
296
297        #endregion
298
299        /// <summary>
300        /// Get PeerName of the actual peer
301        /// </summary>
302        /// <param name="sPeerName">out: additional peer information UserName on LinkManager</param>
303        /// <returns>PeerID as a String</returns>
304        public PeerId GetPeerID(out string sPeerName)
305        {
306            sPeerName = this.linkmanager.UserName;
307            return new PeerId(this.overlay.LocalAddress);
308        }
309
310        /// <summary>
311        /// Construct PeerId object for a specific byte[] id
312        /// </summary>
313        /// <param name="byteId">overlay address as byte array</param>
314        /// <returns>corresponding PeerId for given byte[] id</returns>
315        public PeerId GetPeerID(byte[] byteId)
316        {
317            LogToMonitor("GetPeerID: Converting byte[] to PeerId-Object");
318            return new PeerId(this.overlay.GetAddress(byteId));
319        }
320
321        // overlay.LocalAddress = Overlay-Peer-Address/Names
322        public void SendToPeer(byte[] data, byte[] destinationPeer)
323        {
324            // get stack size of the pap use-data and add own use data (for optimizing Stack size)
325            int realStackSize = this.overlay.GetHeaderSize() + data.Length;
326            ByteStack stackData = new ByteStack(realStackSize);
327
328            stackData.Push(data);
329
330            OverlayAddress destinationAddr = this.overlay.GetAddress(destinationPeer);
331            OverlayMessage overlayMsg = new OverlayMessage(MessageReceiverType.P2PBase,
332                this.overlay.LocalAddress, destinationAddr, stackData);
333            this.overlay.Send(overlayMsg);
334        }
335
336        private void overlay_MessageReceived(object sender, OverlayMessageEventArgs e)
337        {
338            if (OnP2PMessageReceived != null)
339            {
340                PeerId pid = new PeerId(e.Message.Source);
341                /* You have to fire this event asynchronous, because the main
342                 * thread will be stopped in this wrapper class for synchronizing
343                 * the asynchronous stuff (AutoResetEvent) --> so this could run
344                 * into a deadlock, when you fire this event synchronous (normal Invoke)
345                 * ATTENTION: This could change the invocation order!!! In my case
346                              no problem, but maybe in future cases... */
347                OnP2PMessageReceived.BeginInvoke(pid, e.Message.Data.PopBytes(e.Message.Data.CurrentStackSize),null,null);
348                //OnP2PMessageReceived(pid, e.Message.Data.PopUTF8String());
349            }
350        }
351
352        #region Event Handling (System Joined, Left and Message Received)
353
354        private void OnDHT_SystemJoined(object sender, EventArgs e)
355        {
356            if (OnSystemJoined != null)
357                OnSystemJoined();
358            this.systemJoined.Set();
359            Started = true;
360        }
361
362        private void OnDHT_SystemLeft(object sender, SystemLeftEventArgs e)
363        {
364            if (OnSystemLeft != null)
365                OnSystemLeft();
366            // as an experiment
367            this.dht = null;
368            this.systemLeft.Set();
369            Started = false;
370
371            LogToMonitor("OnDHT_SystemLeft has nulled the dht and setted the systemLeft Waithandle");
372        }
373
374        #endregion
375
376        /* Attention: The asynchronous methods are not tested at the moment */
377        #region Asynchronous Methods incl. Callbacks
378
379        /// <summary>
380        /// Asynchronously retrieving a key from the DHT. To get value, catch
381        /// event OnDhtLoad_Completed.
382        /// </summary>
383        /// <param name="sKey">Existing key in DHT</param>
384        public void AsynchRetrieve(string sKey)
385        {
386            Guid g = this.dht.Retrieve(OnAsynchRetrieve_Completed, sKey);
387        }
388        private void OnAsynchRetrieve_Completed(RetrieveResult rr)
389        {
390            if (OnDhtLoad_Completed != null)
391            {
392                OnDhtLoad_Completed(rr.Data);
393            }
394        }
395
396        /// <summary>
397        /// Asynchronously storing a Key-Value-Pair in the DHT. To ensure that
398        /// storing is completed, catch event OnDhtStore_Completed.
399        /// </summary>
400        /// <param name="sKey"></param>
401        /// <param name="sValue"></param>
402        public void AsynchStore(string sKey, string sValue)
403        {
404            //this.dht.Store(OnAsynchStore_Completed, sKey, UTF8Encoding.UTF8.GetBytes(sValue), IGNORE_DHT_VERSIONING_SYSTEM);
405            this.dht.Store(OnAsynchStore_Completed, sKey, UTF8Encoding.UTF8.GetBytes(sValue));
406        }
407
408        private void OnAsynchStore_Completed(StoreResult sr)
409        {
410            if (OnDhtStore_Completed != null)
411            {
412                if (sr.Status == OperationStatus.Success)
413                    OnDhtStore_Completed(true);
414                else
415                    OnDhtStore_Completed(false);
416            }
417               
418        }
419
420        /// <summary>
421        /// Asynchronously removing an existing key out of the DHT. To ensure
422        /// that removing is completed, catch event OnDhtRemove_Completed.
423        /// </summary>
424        /// <param name="sKey"></param>
425        public void AsynchRemove(string sKey)
426        {
427            //this.dht.Remove(OnAsynchRemove_Completed, sKey, IGNORE_DHT_VERSIONING_SYSTEM);
428            this.dht.Remove(OnAsynchRemove_Completed, sKey);
429        }
430        private void OnAsynchRemove_Completed(RemoveResult rr)
431        {
432            if (OnDhtRemove_Completed != null)
433            {
434                if(rr.Status == OperationStatus.Success)
435                    OnDhtRemove_Completed(true);
436                else
437                    OnDhtRemove_Completed(false);
438            }
439        }
440
441        #endregion
442
443        #region Synchronous Methods incl. Callbacks
444
445        #region SynchStore incl.Callback and SecondTrialCallback
446
447        /// <summary>
448        /// Stores a value in the DHT at the given key
449        /// </summary>
450        /// <param name="sKey">Key of DHT Entry</param>
451        /// <param name="byteData">Value of DHT Entry</param>
452        /// <returns>True, when storing is completed!</returns>
453        public bool SynchStore(string sKey, byte[] byteData)
454        {
455            LogToMonitor("Begin: SynchStore. Key: " + sKey + ", Data: " + Encoding.UTF8.GetString(byteData));
456            AutoResetEvent are = new AutoResetEvent(false);
457            // this method returns always a GUID to distinguish between asynchronous actions
458            Guid g = this.dht.Store(OnSynchStoreCompleted, sKey, byteData);
459
460            ResponseWait rw = new ResponseWait() { WaitHandle = are, key=sKey , value = byteData };
461
462            waitDict.Add(g, rw);
463            //blocking till response
464            are.WaitOne();
465
466            LogToMonitor("End: SynchStore. Key: " + sKey + ". Success: " + rw.success.ToString());
467
468            return rw.success;
469        }
470
471        /// <summary>
472        /// Stores a value in the DHT at the given key
473        /// </summary>
474        /// <param name="sKey">Key of DHT Entry</param>
475        /// <param name="sValue">Value of DHT Entry</param>
476        /// <returns>True, when storing is completed!</returns>
477        public bool SynchStore(string sKey, string sData)
478        {
479            return SynchStore(sKey, UTF8Encoding.UTF8.GetBytes(sData));
480        }
481        /// <summary>
482        /// Callback for a the synchronized store method
483        /// </summary>
484        /// <param name="rr"></param>
485        private void OnSynchStoreCompleted(StoreResult sr)
486        {
487            ResponseWait rw;
488            if (this.waitDict.TryGetValue(sr.Guid, out rw))
489            {
490                // if Status == Error, than the version of the value is out of date.
491                // There is a versioning system in the DHT. So you must retrieve the
492                // key and than store the new value
493                if (sr.Status == OperationStatus.Failure)
494                {
495                    byte[] byteTemp = this.SynchRetrieve(rw.key);
496
497                    // Only try a second time. When it's still not possible, abort storing
498                    AutoResetEvent are = new AutoResetEvent(false);
499                    Guid g = this.dht.Store(OnSecondTrialStoring, rw.key, rw.value);
500                    ResponseWait rw2 = new ResponseWait() { WaitHandle = are, key = rw.key, value = rw.value };
501
502                    waitDict.Add(g, rw2);
503                    // blocking till response
504                    are.WaitOne();
505                    rw.success = rw2.success;
506                    rw.Message = rw2.Message;
507                }
508                else
509                {
510                    rw.Message = UTF8Encoding.UTF8.GetBytes(sr.Status.ToString());
511                    if (sr.Status == OperationStatus.KeyNotFound)
512                        rw.success = false;
513                    else
514                        rw.success = true;
515                }
516            }
517            //unblock WaitHandle in the synchronous method
518            rw.WaitHandle.Set();
519            // don't know if this accelerates the system...
520            this.waitDict.Remove(sr.Guid);
521        }
522
523        private void OnSecondTrialStoring(StoreResult sr)
524        {
525            ResponseWait rw;
526            if (this.waitDict.TryGetValue(sr.Guid, out rw))
527            {
528                if (sr.Status == OperationStatus.Failure)
529                {
530                    //Abort storing, because it's already the second trial
531                    rw.Message = UTF8Encoding.UTF8.GetBytes("Storing also not possible on second trial.");
532                    rw.success = false;
533                }
534                else
535                {
536                    //works the second trial, so it was the versioning system
537                    rw.success = true;
538                }
539            }
540            //unblock WaitHandle in the synchronous method
541            rw.WaitHandle.Set();
542            // don't know if this accelerates the system...
543            this.waitDict.Remove(sr.Guid);
544        }
545
546        #endregion
547
548        /// <summary>
549        /// Get the value of the given DHT Key or null, if it doesn't exist.
550        /// For synchronous environments use the Synch* methods.
551        /// </summary>
552        /// <param name="sKey">Key of DHT Entry</param>
553        /// <returns>Value of DHT Entry</returns>
554        public byte[] SynchRetrieve(string sKey)
555        {
556            LogToMonitor("ThreadId (P2PBase SynchRetrieve): " + Thread.CurrentThread.ManagedThreadId.ToString());
557
558            AutoResetEvent are = new AutoResetEvent(false);
559            // this method returns always a GUID to distinguish between asynchronous actions
560
561            LogToMonitor("Begin: SynchRetrieve. Key: " + sKey);
562             
563            Guid g = this.dht.Retrieve(OnSynchRetrieveCompleted, sKey);
564           
565            ResponseWait rw = new ResponseWait() {WaitHandle = are };
566           
567            waitDict.Add(g,rw  );
568            // blocking till response
569            are.WaitOne();
570
571            LogToMonitor("End: SynchRetrieve. Key: " + sKey + ". Success: " + rw.success.ToString());
572
573            //Rückgabe der Daten
574            return rw.Message;
575        }
576
577        /// <summary>
578        /// Callback for a the synchronized retrieval method
579        /// </summary>
580        /// <param name="rr"></param>
581        private void OnSynchRetrieveCompleted(RetrieveResult rr)
582        {
583            LogToMonitor(rr.Guid.ToString());
584           
585            ResponseWait rw;
586
587            LogToMonitor("ThreadId (P2PBase retrieve callback): " + Thread.CurrentThread.ManagedThreadId.ToString());
588
589            if (this.waitDict.TryGetValue(rr.Guid, out rw))
590            {
591                // successful as long as no error occured
592                rw.success = true;
593                if (rr.Status == OperationStatus.Failure)
594                {
595                    rw.Message = null;
596                    rw.success = false;
597                }
598                else if (rr.Status == OperationStatus.KeyNotFound)
599                    rw.Message = null;
600                else
601                    rw.Message = rr.Data;
602
603                //unblock WaitHandle in the synchronous method
604                rw.WaitHandle.Set();
605                // don't know if this accelerates the system...
606                this.waitDict.Remove(rr.Guid);
607            }
608        }
609        /// <summary>
610        /// Removes a key/value pair out of the DHT
611        /// </summary>
612        /// <param name="sKey">Key of the DHT Entry</param>
613        /// <returns>True, when removing is completed!</returns>
614        public bool SynchRemove(string sKey)
615        {
616            LogToMonitor("Begin SynchRemove. Key: " + sKey);
617
618            AutoResetEvent are = new AutoResetEvent(false);
619            // this method returns always a GUID to distinguish between asynchronous actions
620            Guid g = this.dht.Remove(OnSynchRemoveCompleted, sKey);
621            //Guid g = this.dht.Remove(OnSynchRemoveCompleted, sKey, IGNORE_DHT_VERSIONING_SYSTEM);
622
623            ResponseWait rw = new ResponseWait() { WaitHandle = are };
624
625            waitDict.Add(g, rw);
626            // blocking till response
627            are.WaitOne();
628
629            LogToMonitor("Ended SynchRemove. Key: " + sKey + ". Success: " + rw.success.ToString());
630
631            return rw.success;
632        }
633
634        /// <summary>
635        /// Callback for a the synchronized remove method
636        /// </summary>
637        /// <param name="rr"></param>
638        private void OnSynchRemoveCompleted(RemoveResult rr)
639        {
640            ResponseWait rw;
641            if (this.waitDict.TryGetValue(rr.Guid, out rw))
642            {
643                rw.Message = UTF8Encoding.UTF8.GetBytes(rr.Status.ToString());
644
645                if (rr.Status == OperationStatus.Failure || rr.Status == OperationStatus.KeyNotFound)
646                    rw.success = false;
647                else
648                    rw.success = true;
649
650                //unblock WaitHandle in the synchronous method
651                rw.WaitHandle.Set();
652                // don't know if this accelerates the system...
653                this.waitDict.Remove(rr.Guid);
654            }
655        }
656
657        #endregion
658
659        /// <summary>
660        /// To log the internal state in the Monitoring Software of P@play
661        /// </summary>
662        public void LogInternalState()
663        {
664            if (this.dht != null)
665            {
666                this.dht.LogInternalState();
667            }
668        }
669
670        public void LogToMonitor(string sTextToLog)
671        {
672            if(AllowLoggingToMonitor)
673                Log.Debug(sTextToLog);
674        }
675
676    }
677}
Note: See TracBrowser for help on using the repository browser.