source: trunk/CrypP2P/Internal/P2PBase.cs @ 1537

Last change on this file since 1537 was 1537, checked in by Paul Lelgemann, 12 years ago

o (probably) fixed bug with a locked P2P instance when storing large values
+ CrypP2P: Added event for connection state changes

File size: 27.6 KB
Line 
1/* Copyright 2009 Team CrypTool (Christian Arnold), Uni Duisburg-Essen
2
3   Licensed under the Apache License, Version 2.0 (the "License");
4   you may not use this file except in compliance with the License.
5   You may obtain a copy of the License at
6
7       http://www.apache.org/licenses/LICENSE-2.0
8
9   Unless required by applicable law or agreed to in writing, software
10   distributed under the License is distributed on an "AS IS" BASIS,
11   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12   See the License for the specific language governing permissions and
13   limitations under the License.
14*/
15
16using System;
17using System.Collections.Generic;
18using System.Text;
19using PeersAtPlay.P2PStorage.DHT;
20using PeersAtPlay.P2PStorage.FullMeshDHT;
21using PeersAtPlay.P2POverlay.Bootstrapper;
22using PeersAtPlay.P2POverlay;
23using PeersAtPlay.P2POverlay.Bootstrapper.LocalMachineBootstrapper;
24using PeersAtPlay.P2POverlay.FullMeshOverlay;
25using PeersAtPlay.P2PLink;
26using PeersAtPlay.P2POverlay.Bootstrapper.IrcBootstrapperV2;
27using System.Threading;
28using PeersAtPlay;
29using PeersAtPlay.Util.Logging;
30using Gears4Net;
31using Cryptool.Plugins.PeerToPeer.Internal;
32
33/* TODO:
34 * - Catch errors, which can occur when using the DHT (network-based errors)
35 */
36
37/* - Synchronous functions successfully tested (store, retrieve)
38 * - The DHT has an integrated versioning system. When a peer wants
39 *   to store data in an entry, which already holds data, the version
40 *   number will be compared with the peers' version number. If the
41 *   peer hasn't read/write the entry the last time, the storing instruction
42 *   will be rejected. You must first read the actual data and than you can
43 *   store your data in this entry...
44 *
45 * INFO:
46 * - Have considered the DHT-own versioning system in the SynchStore method.
47 *   If this versioning system will be abolished, the SynchStore method must
48 *   be change!
49 * - Everything switched to SnalNG, SimpleSnal isn't used anymore, because
50 *   certification stuff runs now
51 *
52 * TODO:
53 * - Delete UseNatTraversal-Flag and insert CertificateCheck and CertificateSetup
54 * - Testing asynchronous methods incl. EventHandlers
55 */
56namespace Cryptool.P2P.Internal
57{
58    /* Advantages of this wrapper class:
59     * - The PeerAtPlay-Libraries are only referenced in this project
60     *   --> so they're easy to update
61     * - PeerAtPlay only works with asynchronous methods, so this class
62     *   "synchronizes" this methods.
63     * - The PeerToPeer-Layers are unimportant for CT2-Developers, so this
64     *   issue is obfuscated by this wrapper class
65     */
66    /// <summary>
67    /// Wrapper class to integrate peer@play environment into CrypTool.
68    /// This class synchronizes asynchronous methods for easier usage in CT2. For future
69    /// </summary>
70    public class P2PBase
71    {
72        #region Delegates and Events for asynchronous p2p functions
73
74        public delegate void SystemJoined();
75        public event SystemJoined OnSystemJoined;
76
77        public delegate void SystemLeft();
78        public event SystemLeft OnSystemLeft;
79
80        public delegate void P2PMessageReceived(PeerId sourceAddr, byte[] data);
81        public event P2PMessageReceived OnP2PMessageReceived;
82
83        /// <summary>
84        /// returns true if key-value-pair is successfully stored in the DHT
85        /// </summary>
86        /// <param name="result"></param>
87        public delegate void DHTStoreCompleted(bool result);
88        public event DHTStoreCompleted OnDhtStore_Completed;
89
90        public delegate void DHTLoadCompleted(byte[] loadedData);
91        public event DHTLoadCompleted OnDhtLoad_Completed;
92
93        /// <summary>
94        /// returns true if key was found and removed successfully from the DHT
95        /// </summary>
96        /// <param name="result"></param>
97        public delegate void DHTRemoveCompleted(bool result);
98        public event DHTRemoveCompleted OnDhtRemove_Completed;
99
100        #endregion
101
102        #region Variables
103
104        private bool allowLoggingToMonitor;
105        /// <summary>
106        /// If true, all kinds of actions will be logged in the PeersAtPlay LogMonitor.
107        /// </summary>
108        public bool AllowLoggingToMonitor
109        {
110            get { return this.allowLoggingToMonitor; }
111            set { this.allowLoggingToMonitor = value; }
112        }
113
114        private const bool ALLOW_LOGGING_TO_MONITOR = true;
115
116        private bool started = false;
117        /// <summary>
118        /// True if system was successfully joined, false if system is COMPLETELY left
119        /// </summary>
120        public bool Started
121        {
122            get { return this.started; }
123            private set { this.started = value; }
124        }
125
126        private IDHT dht;
127        private IP2PLinkManager linkmanager;
128        private IBootstrapper bootstrapper;
129        private P2POverlay overlay;
130        private AutoResetEvent systemJoined;
131        private AutoResetEvent systemLeft;
132
133        /// <summary>
134        /// Dictionary for synchronizing asynchronous DHT retrieves.
135        /// Cryptool doesn't offers an asynchronous environment, so this workaround is necessary
136        /// </summary>
137        private Dictionary<Guid, ResponseWait> waitDict;
138
139        #endregion
140
141        public P2PBase()
142        {
143            this.waitDict = new Dictionary<Guid, ResponseWait>();
144            this.systemJoined = new AutoResetEvent(false);
145            this.systemLeft = new AutoResetEvent(false);
146        }
147
148        #region Basic P2P Methods (Init, Start, Stop) - synch and asynch
149
150        public void Initialize(P2PSettings p2PSettings)
151        {
152            Initialize(p2PSettings.PeerName, p2PSettings.WorldName, p2PSettings.LinkManager, p2PSettings.Bootstrapper, p2PSettings.Overlay, p2PSettings.Dht);
153        }
154
155        /// <summary>
156        /// Initializing is the first step to build a new or access an existing p2p network
157        /// </summary>
158        /// <param name="sUserName">Choose an individual name for the user</param>
159        /// <param name="sWorldName">fundamental: two peers are only in the SAME
160        /// P2P system, when they initialized the SAME WORLD!</param>
161        /// <param name="bolUseNatTraversal">When you want to use NAT-Traversal #
162        /// (tunneling the P2P connection through NATs and Firewalls), you have to
163        /// set this flag to true</param>
164        /// <param name="linkManagerType"></param>
165        /// <param name="bsType"></param>
166        /// <param name="overlayType"></param>
167        /// <param name="dhtType"></param>
168        public void Initialize(string sUserName, string sWorldName, P2PLinkManagerType linkManagerType, P2PBootstrapperType bsType, P2POverlayType overlayType, P2PDHTType dhtType)
169        {
170            #region Setting LinkManager, Bootstrapper, Overlay and DHT to the specified types
171
172            Scheduler scheduler = new STAScheduler("pap");
173
174            switch (linkManagerType)
175            {
176                case P2PLinkManagerType.Snal:
177                    LogToMonitor("Init LinkMgr: Using NAT Traversal stuff");
178                    // NAT-Traversal stuff needs a different Snal-Version
179                    this.linkmanager = new PeersAtPlay.P2PLink.SnalNG.Snal(scheduler);
180
181                    PeersAtPlay.P2PLink.SnalNG.Settings settings = new PeersAtPlay.P2PLink.SnalNG.Settings();
182                    settings.LoadDefaults();
183                    settings.ConnectInternal = true;
184                    settings.LocalReceivingPort = 0;
185                    settings.UseLocalAddressDetection = false;
186                    settings.AutoReconnect = false;
187                    settings.NoDelay = false;
188                    settings.ReuseAddress = false;
189                    settings.UseNetworkMonitorServer = true;
190
191                    this.linkmanager.Settings = settings;
192                    this.linkmanager.ApplicationType = PeersAtPlay.Monitoring.ApplicationType.CrypTool;
193
194                    break;
195                default:
196                    throw (new NotImplementedException());
197            }
198            switch (bsType)
199            {
200                case P2PBootstrapperType.LocalMachineBootstrapper:
201                    //LocalMachineBootstrapper = only local connection (runs only on one machine)
202                    this.bootstrapper = new LocalMachineBootstrapper();
203                    break;
204                case P2PBootstrapperType.IrcBootstrapper:
205                    // setup nat traversal stuff
206                    LogToMonitor("Init Bootstrapper: Using NAT Traversal stuff");
207                    PeersAtPlay.P2POverlay.Bootstrapper.IrcBootstrapper.Settings.DelaySymmetricResponse = true;
208                    PeersAtPlay.P2POverlay.Bootstrapper.IrcBootstrapper.Settings.IncludeSymmetricInResponse = false;
209                    PeersAtPlay.P2POverlay.Bootstrapper.IrcBootstrapper.Settings.SymmetricResponseDelay = 6000;
210
211                    this.bootstrapper = new IrcBootstrapper(scheduler);
212                    break;
213                default:
214                    throw (new NotImplementedException());
215            }
216            switch (overlayType)
217            {
218                case P2POverlayType.FullMeshOverlay:
219                    // changing overlay example: this.overlay = new ChordOverlay();
220                    this.overlay = new FullMeshOverlay(scheduler);
221                    break;
222                default:
223                    throw (new NotImplementedException());
224            }
225            switch (dhtType)
226            {
227                case P2PDHTType.FullMeshDHT:
228                    this.dht = new FullMeshDHT(scheduler);
229                    break;
230                default:
231                    throw (new NotImplementedException());
232            }
233            #endregion
234
235            this.overlay.MessageReceived += new EventHandler<OverlayMessageEventArgs>(overlay_MessageReceived);
236            this.dht.SystemJoined += new EventHandler(OnDHT_SystemJoined);
237            this.dht.SystemLeft += new EventHandler<SystemLeftEventArgs>(OnDHT_SystemLeft);
238
239            this.dht.Initialize(sUserName, "", sWorldName, this.overlay, this.bootstrapper, this.linkmanager, null);
240        }
241
242        /// <summary>
243        /// Starts the P2P System. When the given P2P world doesn't exist yet,
244        /// inclusive creating the and bootstrapping to the P2P network.
245        /// In either case joining the P2P world.
246        /// This synchronized method returns true not before the peer has
247        /// successfully joined the network (this may take one or two minutes).
248        /// </summary>
249        /// <returns>True, if the peer has completely joined the p2p network</returns>
250        public bool SynchStart()
251        {
252            //Start != system joined
253            //Only starts the system asynchronous, the possible callback is useless,
254            //because it's invoked before the peer completly joined the P2P system
255            this.dht.BeginStart(null);
256            //Wait for event SystemJoined. When it's invoked, the peer completly joined the P2P system
257            this.systemJoined.WaitOne();
258            return true;
259        }
260
261        /// <summary>
262        /// Disjoins the peer from the system. The P2P system survive while one peer is still in the network.
263        /// </summary>
264        /// <returns>True, if the peer has completely disjoined the p2p network</returns>
265        public bool SynchStop()
266        {
267            if (this.dht != null)
268            {
269                this.dht.BeginStop(null);
270                //don't stop anything else, because BOOM
271
272                //wait till systemLeft Event is invoked
273                this.systemLeft.WaitOne();
274            }
275            return true;
276        }
277
278        /// <summary>
279        /// Asynchronously starting the peer. When the given P2P world doesn't
280        /// exist yet, inclusive creating the and bootstrapping to the P2P network.
281        /// In either case joining the P2P world. To ensure that peer has successfully
282        /// joined the p2p world, catch the event OnSystemJoined.
283        /// </summary>
284        public void AsynchStart()
285        {
286            // no callback usefull, because starting and joining isn't the same
287            // everything else is done by the EventHandler OnDHT_SystemJoined
288            this.dht.BeginStart(null);
289        }
290
291        /// <summary>
292        /// Asynchronously disjoining the actual peer of the p2p system. To ensure
293        /// disjoining, catch the event OnDHT_SystemLeft.
294        /// </summary>
295        public void AsynchStop()
296        {
297            if (this.dht != null)
298            {
299                // no callback usefull.
300                // Everything else is done by the EventHandler OnDHT_SystemLeft
301                this.dht.BeginStop(null);
302            }
303        }
304
305        #endregion
306
307        /// <summary>
308        /// Get PeerName of the actual peer
309        /// </summary>
310        /// <param name="sPeerName">out: additional peer information UserName on LinkManager</param>
311        /// <returns>PeerID as a String</returns>
312        public PeerId GetPeerID(out string sPeerName)
313        {
314            sPeerName = this.linkmanager.UserName;
315            return new PeerId(this.overlay.LocalAddress);
316        }
317
318        /// <summary>
319        /// Construct PeerId object for a specific byte[] id
320        /// </summary>
321        /// <param name="byteId">overlay address as byte array</param>
322        /// <returns>corresponding PeerId for given byte[] id</returns>
323        public PeerId GetPeerID(byte[] byteId)
324        {
325            LogToMonitor("GetPeerID: Converting byte[] to PeerId-Object");
326            return new PeerId(this.overlay.GetAddress(byteId));
327        }
328
329        // overlay.LocalAddress = Overlay-Peer-Address/Names
330        public void SendToPeer(byte[] data, byte[] destinationPeer)
331        {
332            // get stack size of the pap use-data and add own use data (for optimizing Stack size)
333            int realStackSize = this.overlay.GetHeaderSize() + data.Length;
334            ByteStack stackData = new ByteStack(realStackSize);
335
336            stackData.Push(data);
337
338            OverlayAddress destinationAddr = this.overlay.GetAddress(destinationPeer);
339            OverlayMessage overlayMsg = new OverlayMessage(MessageReceiverType.P2PBase,
340                this.overlay.LocalAddress, destinationAddr, stackData);
341            this.overlay.Send(overlayMsg);
342        }
343
344        private void overlay_MessageReceived(object sender, OverlayMessageEventArgs e)
345        {
346            if (OnP2PMessageReceived != null)
347            {
348                PeerId pid = new PeerId(e.Message.Source);
349                /* You have to fire this event asynchronous, because the main
350                 * thread will be stopped in this wrapper class for synchronizing
351                 * the asynchronous stuff (AutoResetEvent) --> so this could run
352                 * into a deadlock, when you fire this event synchronous (normal Invoke)
353                 * ATTENTION: This could change the invocation order!!! In my case
354                              no problem, but maybe in future cases... */
355
356                // TODO: not safe: The delegate must have only one target
357                //OnP2PMessageReceived.BeginInvoke(pid, e.Message.Data.PopBytes(e.Message.Data.CurrentStackSize), null, null);
358
359                foreach (Delegate del in OnP2PMessageReceived.GetInvocationList())
360                {
361                    del.DynamicInvoke(pid, e.Message.Data.PopBytes(e.Message.Data.CurrentStackSize));
362                }
363
364                //OnP2PMessageReceived(pid, e.Message.Data.PopUTF8String());
365            }
366        }
367
368        #region Event Handling (System Joined, Left and Message Received)
369
370        private void OnDHT_SystemJoined(object sender, EventArgs e)
371        {
372            if (OnSystemJoined != null)
373                OnSystemJoined();
374            this.systemJoined.Set();
375            Started = true;
376        }
377
378        private void OnDHT_SystemLeft(object sender, SystemLeftEventArgs e)
379        {
380            if (OnSystemLeft != null)
381                OnSystemLeft();
382            // as an experiment
383            this.dht = null;
384            this.systemLeft.Set();
385            Started = false;
386
387            LogToMonitor("OnDHT_SystemLeft has nulled the dht and setted the systemLeft Waithandle");
388        }
389
390        #endregion
391
392        /* Attention: The asynchronous methods are not tested at the moment */
393        #region Asynchronous Methods incl. Callbacks
394
395        /// <summary>
396        /// Asynchronously retrieving a key from the DHT. To get value, catch
397        /// event OnDhtLoad_Completed.
398        /// </summary>
399        /// <param name="sKey">Existing key in DHT</param>
400        public void AsynchRetrieve(string sKey)
401        {
402            Guid g = this.dht.Retrieve(OnAsynchRetrieve_Completed, sKey);
403        }
404        private void OnAsynchRetrieve_Completed(RetrieveResult rr)
405        {
406            if (OnDhtLoad_Completed != null)
407            {
408                OnDhtLoad_Completed(rr.Data);
409            }
410        }
411
412        /// <summary>
413        /// Asynchronously storing a Key-Value-Pair in the DHT. To ensure that
414        /// storing is completed, catch event OnDhtStore_Completed.
415        /// </summary>
416        /// <param name="sKey"></param>
417        /// <param name="sValue"></param>
418        public void AsynchStore(string sKey, string sValue)
419        {
420            //this.dht.Store(OnAsynchStore_Completed, sKey, UTF8Encoding.UTF8.GetBytes(sValue), IGNORE_DHT_VERSIONING_SYSTEM);
421            this.dht.Store(OnAsynchStore_Completed, sKey, UTF8Encoding.UTF8.GetBytes(sValue));
422        }
423
424        private void OnAsynchStore_Completed(StoreResult sr)
425        {
426            if (OnDhtStore_Completed != null)
427            {
428                if (sr.Status == OperationStatus.Success)
429                    OnDhtStore_Completed(true);
430                else
431                    OnDhtStore_Completed(false);
432            }
433
434        }
435
436        /// <summary>
437        /// Asynchronously removing an existing key out of the DHT. To ensure
438        /// that removing is completed, catch event OnDhtRemove_Completed.
439        /// </summary>
440        /// <param name="sKey"></param>
441        public void AsynchRemove(string sKey)
442        {
443            //this.dht.Remove(OnAsynchRemove_Completed, sKey, IGNORE_DHT_VERSIONING_SYSTEM);
444            this.dht.Remove(OnAsynchRemove_Completed, sKey);
445        }
446        private void OnAsynchRemove_Completed(RemoveResult rr)
447        {
448            if (OnDhtRemove_Completed != null)
449            {
450                if (rr.Status == OperationStatus.Success)
451                    OnDhtRemove_Completed(true);
452                else
453                    OnDhtRemove_Completed(false);
454            }
455        }
456
457        #endregion
458
459        #region Synchronous Methods incl. Callbacks
460
461        #region SynchStore incl.Callback and SecondTrialCallback
462
463        /// <summary>
464        /// Stores a value in the DHT at the given key
465        /// </summary>
466        /// <param name="sKey">Key of DHT Entry</param>
467        /// <param name="byteData">Value of DHT Entry</param>
468        /// <returns>True, when storing is completed!</returns>
469        public bool SynchStore(string sKey, byte[] byteData)
470        {
471            LogToMonitor("Begin: SynchStore. Key: " + sKey + ", " + byteData.Length + " bytes");
472            AutoResetEvent are = new AutoResetEvent(false);
473            // this method returns always a GUID to distinguish between asynchronous actions
474            Guid g = this.dht.Store(OnSynchStoreCompleted, sKey, byteData);
475
476            ResponseWait rw = new ResponseWait() { WaitHandle = are, key = sKey, value = byteData };
477
478            waitDict.Add(g, rw);
479            //blocking till response
480            are.WaitOne();
481
482            LogToMonitor("End: SynchStore. Key: " + sKey + ". Success: " + rw.success.ToString());
483
484            return rw.success;
485        }
486
487        /// <summary>
488        /// Stores a value in the DHT at the given key
489        /// </summary>
490        /// <param name="sKey">Key of DHT Entry</param>
491        /// <param name="sValue">Value of DHT Entry</param>
492        /// <returns>True, when storing is completed!</returns>
493        public bool SynchStore(string sKey, string sData)
494        {
495            return SynchStore(sKey, UTF8Encoding.UTF8.GetBytes(sData));
496        }
497        /// <summary>
498        /// Callback for a the synchronized store method
499        /// </summary>
500        /// <param name="rr"></param>
501        private void OnSynchStoreCompleted(StoreResult sr)
502        {
503            ResponseWait rw;
504            if (this.waitDict.TryGetValue(sr.Guid, out rw))
505            {
506                // if Status == Error, than the version of the value is out of date.
507                // There is a versioning system in the DHT. So you must retrieve the
508                // key and than store the new value
509                if (sr.Status == OperationStatus.Failure)
510                {
511                    byte[] byteTemp = this.SynchRetrieve(rw.key);
512
513                    // Only try a second time. When it's still not possible, abort storing
514                    AutoResetEvent are = new AutoResetEvent(false);
515                    Guid g = this.dht.Store(OnSecondTrialStoring, rw.key, rw.value);
516                    ResponseWait rw2 = new ResponseWait() { WaitHandle = are, key = rw.key, value = rw.value };
517
518                    waitDict.Add(g, rw2);
519                    // blocking till response
520                    are.WaitOne();
521                    rw.success = rw2.success;
522                    rw.Message = rw2.Message;
523                }
524                else
525                {
526                    rw.Message = UTF8Encoding.UTF8.GetBytes(sr.Status.ToString());
527                    if (sr.Status == OperationStatus.KeyNotFound)
528                        rw.success = false;
529                    else
530                        rw.success = true;
531                }
532            }
533            //unblock WaitHandle in the synchronous method
534            rw.WaitHandle.Set();
535            // don't know if this accelerates the system...
536            this.waitDict.Remove(sr.Guid);
537        }
538
539        private void OnSecondTrialStoring(StoreResult sr)
540        {
541            ResponseWait rw;
542            if (this.waitDict.TryGetValue(sr.Guid, out rw))
543            {
544                if (sr.Status == OperationStatus.Failure)
545                {
546                    //Abort storing, because it's already the second trial
547                    rw.Message = UTF8Encoding.UTF8.GetBytes("Storing also not possible on second trial.");
548                    rw.success = false;
549                }
550                else
551                {
552                    //works the second trial, so it was the versioning system
553                    rw.success = true;
554                }
555            }
556            //unblock WaitHandle in the synchronous method
557            rw.WaitHandle.Set();
558            // don't know if this accelerates the system...
559            this.waitDict.Remove(sr.Guid);
560        }
561
562        #endregion
563
564        /// <summary>
565        /// Get the value of the given DHT Key or null, if it doesn't exist.
566        /// For synchronous environments use the Synch* methods.
567        /// </summary>
568        /// <param name="sKey">Key of DHT Entry</param>
569        /// <returns>Value of DHT Entry</returns>
570        public byte[] SynchRetrieve(string sKey)
571        {
572            LogToMonitor("ThreadId (P2PBase SynchRetrieve): " + Thread.CurrentThread.ManagedThreadId.ToString());
573
574            AutoResetEvent are = new AutoResetEvent(false);
575            // this method returns always a GUID to distinguish between asynchronous actions
576
577            LogToMonitor("Begin: SynchRetrieve. Key: " + sKey);
578
579            Guid g = this.dht.Retrieve(OnSynchRetrieveCompleted, sKey);
580
581            ResponseWait rw = new ResponseWait() { WaitHandle = are };
582
583            waitDict.Add(g, rw);
584            // blocking till response
585            are.WaitOne();
586
587            LogToMonitor("End: SynchRetrieve. Key: " + sKey + ". Success: " + rw.success.ToString());
588
589            //Rückgabe der Daten
590            return rw.Message;
591        }
592
593        /// <summary>
594        /// Callback for a the synchronized retrieval method
595        /// </summary>
596        /// <param name="rr"></param>
597        private void OnSynchRetrieveCompleted(RetrieveResult rr)
598        {
599            LogToMonitor(rr.Guid.ToString());
600
601            ResponseWait rw;
602
603            LogToMonitor("ThreadId (P2PBase retrieve callback): " + Thread.CurrentThread.ManagedThreadId.ToString());
604
605            if (this.waitDict.TryGetValue(rr.Guid, out rw))
606            {
607                // successful as long as no error occured
608                rw.success = true;
609                if (rr.Status == OperationStatus.Failure)
610                {
611                    rw.Message = null;
612                    rw.success = false;
613                }
614                else if (rr.Status == OperationStatus.KeyNotFound)
615                    rw.Message = null;
616                else
617                    rw.Message = rr.Data;
618
619                //unblock WaitHandle in the synchronous method
620                rw.WaitHandle.Set();
621                // don't know if this accelerates the system...
622                this.waitDict.Remove(rr.Guid);
623            }
624        }
625        /// <summary>
626        /// Removes a key/value pair out of the DHT
627        /// </summary>
628        /// <param name="sKey">Key of the DHT Entry</param>
629        /// <returns>True, when removing is completed!</returns>
630        public bool SynchRemove(string sKey)
631        {
632            LogToMonitor("Begin SynchRemove. Key: " + sKey);
633
634            AutoResetEvent are = new AutoResetEvent(false);
635            // this method returns always a GUID to distinguish between asynchronous actions
636            Guid g = this.dht.Remove(OnSynchRemoveCompleted, sKey);
637            //Guid g = this.dht.Remove(OnSynchRemoveCompleted, sKey, IGNORE_DHT_VERSIONING_SYSTEM);
638
639            ResponseWait rw = new ResponseWait() { WaitHandle = are };
640
641            waitDict.Add(g, rw);
642            // blocking till response
643            are.WaitOne();
644
645            LogToMonitor("Ended SynchRemove. Key: " + sKey + ". Success: " + rw.success.ToString());
646
647            return rw.success;
648        }
649
650        /// <summary>
651        /// Callback for a the synchronized remove method
652        /// </summary>
653        /// <param name="rr"></param>
654        private void OnSynchRemoveCompleted(RemoveResult rr)
655        {
656            ResponseWait rw;
657            if (this.waitDict.TryGetValue(rr.Guid, out rw))
658            {
659                rw.Message = UTF8Encoding.UTF8.GetBytes(rr.Status.ToString());
660
661                if (rr.Status == OperationStatus.Failure || rr.Status == OperationStatus.KeyNotFound)
662                    rw.success = false;
663                else
664                    rw.success = true;
665
666                //unblock WaitHandle in the synchronous method
667                rw.WaitHandle.Set();
668                // don't know if this accelerates the system...
669                this.waitDict.Remove(rr.Guid);
670            }
671        }
672
673        #endregion
674
675        /// <summary>
676        /// To log the internal state in the Monitoring Software of P@play
677        /// </summary>
678        public void LogInternalState()
679        {
680            if (this.dht != null)
681            {
682                this.dht.LogInternalState();
683            }
684        }
685
686        public void LogToMonitor(string sTextToLog)
687        {
688            if (AllowLoggingToMonitor)
689                Log.Debug(sTextToLog);
690        }
691
692    }
693}
Note: See TracBrowser for help on using the repository browser.