source: trunk/CrypP2P/Internal/P2PBase.cs @ 1433

Last change on this file since 1433 was 1433, checked in by Paul Lelgemann, 12 years ago

o Extracted common classes from PeerToPeerBase plugin into new PeerToPeer plugin as a preparation for the new P2P proxy
o Modified directory properties to ignore the CrypBuild directory

File size: 27.5 KB
Line 
1/* Copyright 2009 Team CrypTool (Christian Arnold), Uni Duisburg-Essen
2
3   Licensed under the Apache License, Version 2.0 (the "License");
4   you may not use this file except in compliance with the License.
5   You may obtain a copy of the License at
6
7       http://www.apache.org/licenses/LICENSE-2.0
8
9   Unless required by applicable law or agreed to in writing, software
10   distributed under the License is distributed on an "AS IS" BASIS,
11   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12   See the License for the specific language governing permissions and
13   limitations under the License.
14*/
15
16using System;
17using System.Collections.Generic;
18using System.Linq;
19using System.Text;
20using PeersAtPlay.P2PStorage.DHT;
21using PeersAtPlay.P2PStorage.FullMeshDHT;
22using PeersAtPlay.P2POverlay.Bootstrapper;
23using PeersAtPlay.P2POverlay;
24using PeersAtPlay.P2POverlay.Bootstrapper.LocalMachineBootstrapper;
25using PeersAtPlay.P2POverlay.FullMeshOverlay;
26using PeersAtPlay.P2PLink;
27using PeersAtPlay.P2POverlay.Bootstrapper.IrcBootstrapper;
28using System.Threading;
29using Cryptool.PluginBase.Control;
30using System.ComponentModel;
31using PeersAtPlay;
32using PeersAtPlay.Util.Logging;
33using Gears4Net;
34using Cryptool.Plugins.PeerToPeer.Internal;
35
36/* TODO:
37 * - Catch errors, which can occur when using the DHT (network-based errors)
38 */
39
40/* - Synchronous functions successfully tested (store, retrieve)
41 * - The DHT has an integrated versioning system. When a peer wants
42 *   to store data in an entry, which already holds data, the version
43 *   number will be compared with the peers' version number. If the
44 *   peer hasn't read/write the entry the last time, the storing instruction
45 *   will be rejected. You must first read the actual data and than you can
46 *   store your data in this entry...
47 *
48 * INFO:
49 * - Have considered the DHT-own versioning system in the SynchStore method.
50 *   If this versioning system will be abolished, the SynchStore method must
51 *   be change!
52 * - Everything switched to SnalNG, SimpleSnal isn't used anymore, because
53 *   certification stuff runs now
54 *
55 * TODO:
56 * - Delete UseNatTraversal-Flag and insert CertificateCheck and CertificateSetup
57 * - Testing asynchronous methods incl. EventHandlers
58 */
59namespace Cryptool.P2P.Internal
60{
61    /* Advantages of this wrapper class:
62     * - The PeerAtPlay-Libraries are only referenced in this project
63     *   --> so they're easy to update
64     * - PeerAtPlay only works with asynchronous methods, so this class
65     *   "synchronizes" this methods.
66     * - The PeerToPeer-Layers are unimportant for CT2-Developers, so this
67     *   issue is obfuscated by this wrapper class
68     */
69    /// <summary>
70    /// Wrapper class to integrate peer@play environment into CrypTool.
71    /// This class synchronizes asynchronous methods for easier usage in CT2. For future
72    /// </summary>
73    public class P2PBase
74    {
75        #region Delegates and Events for asynchronous p2p functions
76
77        public delegate void SystemJoined();
78        public event SystemJoined OnSystemJoined;
79
80        public delegate void SystemLeft();
81        public event SystemLeft OnSystemLeft;
82
83        public delegate void P2PMessageReceived(PeerId sourceAddr, byte[] data);
84        public event P2PMessageReceived OnP2PMessageReceived;
85
86        /// <summary>
87        /// returns true if key-value-pair is successfully stored in the DHT
88        /// </summary>
89        /// <param name="result"></param>
90        public delegate void DHTStoreCompleted(bool result);
91        public event DHTStoreCompleted OnDhtStore_Completed;
92
93        public delegate void DHTLoadCompleted(byte[] loadedData);
94        public event DHTLoadCompleted OnDhtLoad_Completed;
95
96        /// <summary>
97        /// returns true if key was found and removed successfully from the DHT
98        /// </summary>
99        /// <param name="result"></param>
100        public delegate void DHTRemoveCompleted(bool result);
101        public event DHTRemoveCompleted OnDhtRemove_Completed;
102
103        #endregion
104
105        #region Variables
106
107        private bool allowLoggingToMonitor;
108        /// <summary>
109        /// If true, all kinds of actions will be logged in the PeersAtPlay LogMonitor.
110        /// </summary>
111        public bool AllowLoggingToMonitor
112        {
113            get { return this.allowLoggingToMonitor; }
114            set { this.allowLoggingToMonitor = value; }
115        }
116
117        private const bool ALLOW_LOGGING_TO_MONITOR = true;
118
119        private bool started = false;
120        /// <summary>
121        /// True if system was successfully joined, false if system is COMPLETELY left
122        /// </summary>
123        public bool Started
124        {
125            get { return this.started; }
126            private set { this.started = value; }
127        }
128
129        private IDHT dht;
130        private IP2PLinkManager linkmanager;
131        private IBootstrapper bootstrapper;
132        private P2POverlay overlay;
133        private AutoResetEvent systemJoined;
134        private AutoResetEvent systemLeft;
135
136        /// <summary>
137        /// Dictionary for synchronizing asynchronous DHT retrieves.
138        /// Cryptool doesn't offers an asynchronous environment, so this workaround is necessary
139        /// </summary>
140        private Dictionary<Guid, ResponseWait> waitDict;
141
142        #endregion
143
144        public P2PBase()
145        {
146            this.waitDict = new Dictionary<Guid, ResponseWait>();
147            this.systemJoined = new AutoResetEvent(false);
148            this.systemLeft = new AutoResetEvent(false);
149        }
150
151        #region Basic P2P Methods (Init, Start, Stop) - synch and asynch
152
153        public void Initialize(P2PSettings p2pSettings)
154        {
155            Initialize(p2pSettings.PeerName, p2pSettings.WorldName,
156                (P2PLinkManagerType)p2pSettings.LinkManagerType, (P2PBootstrapperType)p2pSettings.BSType,
157                (P2POverlayType)p2pSettings.OverlayType, (P2PDHTType)p2pSettings.DhtType);
158        }
159
160        /// <summary>
161        /// Initializing is the first step to build a new or access an existing p2p network
162        /// </summary>
163        /// <param name="sUserName">Choose an individual name for the user</param>
164        /// <param name="sWorldName">fundamental: two peers are only in the SAME
165        /// P2P system, when they initialized the SAME WORLD!</param>
166        /// <param name="bolUseNatTraversal">When you want to use NAT-Traversal #
167        /// (tunneling the P2P connection through NATs and Firewalls), you have to
168        /// set this flag to true</param>
169        /// <param name="linkManagerType"></param>
170        /// <param name="bsType"></param>
171        /// <param name="overlayType"></param>
172        /// <param name="dhtType"></param>
173        public void Initialize(string sUserName, string sWorldName, P2PLinkManagerType linkManagerType, P2PBootstrapperType bsType, P2POverlayType overlayType, P2PDHTType dhtType)
174        {
175            #region Setting LinkManager, Bootstrapper, Overlay and DHT to the specified types
176
177            Scheduler scheduler = new STAScheduler("pap");
178
179            switch (linkManagerType)
180            {
181                case P2PLinkManagerType.Snal:
182                    LogToMonitor("Init LinkMgr: Using NAT Traversal stuff");
183                    // NAT-Traversal stuff needs a different Snal-Version
184                    this.linkmanager = new PeersAtPlay.P2PLink.SnalNG.Snal(scheduler);
185
186                    PeersAtPlay.P2PLink.SnalNG.Settings settings = new PeersAtPlay.P2PLink.SnalNG.Settings();
187                    settings.LoadDefaults();
188                    settings.ConnectInternal = true;
189                    settings.LocalReceivingPort = 0;
190                    settings.UseLocalAddressDetection = false;
191                    settings.AutoReconnect = false;
192                    settings.NoDelay = false;
193                    settings.ReuseAddress = false;
194                    settings.UseNetworkMonitorServer = true;
195
196                    this.linkmanager.Settings = settings;
197                    this.linkmanager.ApplicationType = PeersAtPlay.Monitoring.ApplicationType.CrypTool;
198
199                    break;
200                default:
201                    throw (new NotImplementedException());
202            }
203            switch (bsType)
204            {
205                case P2PBootstrapperType.LocalMachineBootstrapper:
206                    //LocalMachineBootstrapper = only local connection (runs only on one machine)
207                    this.bootstrapper = new LocalMachineBootstrapper();
208                    break;
209                case P2PBootstrapperType.IrcBootstrapper:
210                    // setup nat traversal stuff
211                    LogToMonitor("Init Bootstrapper: Using NAT Traversal stuff");
212                    PeersAtPlay.P2POverlay.Bootstrapper.IrcBootstrapper.Settings.DelaySymmetricResponse = true;
213                    PeersAtPlay.P2POverlay.Bootstrapper.IrcBootstrapper.Settings.IncludeSymmetricInResponse = false;
214                    PeersAtPlay.P2POverlay.Bootstrapper.IrcBootstrapper.Settings.SymmetricResponseDelay = 6000;
215
216                    this.bootstrapper = new IrcBootstrapper(scheduler);
217                    break;
218                default:
219                    throw (new NotImplementedException());
220            }
221            switch (overlayType)
222            {
223                case P2POverlayType.FullMeshOverlay:
224                    // changing overlay example: this.overlay = new ChordOverlay();
225                    this.overlay = new FullMeshOverlay(scheduler);
226                    break;
227                default:
228                    throw (new NotImplementedException());
229            }
230            switch (dhtType)
231            {
232                case P2PDHTType.FullMeshDHT:
233                    this.dht = new FullMeshDHT(scheduler);
234                    break;
235                default:
236                    throw (new NotImplementedException());
237            }
238            #endregion
239
240            this.overlay.MessageReceived += new EventHandler<OverlayMessageEventArgs>(overlay_MessageReceived);
241            this.dht.SystemJoined += new EventHandler(OnDHT_SystemJoined);
242            this.dht.SystemLeft += new EventHandler<SystemLeftEventArgs>(OnDHT_SystemLeft);
243
244            this.dht.Initialize(sUserName, "", sWorldName, this.overlay, this.bootstrapper, this.linkmanager, null);
245        }
246
247        /// <summary>
248        /// Starts the P2P System. When the given P2P world doesn't exist yet,
249        /// inclusive creating the and bootstrapping to the P2P network.
250        /// In either case joining the P2P world.
251        /// This synchronized method returns true not before the peer has
252        /// successfully joined the network (this may take one or two minutes).
253        /// </summary>
254        /// <returns>True, if the peer has completely joined the p2p network</returns>
255        public bool SynchStart()
256        {
257            //Start != system joined
258            //Only starts the system asynchronous, the possible callback is useless,
259            //because it's invoked before the peer completly joined the P2P system
260            this.dht.BeginStart(null);
261            //Wait for event SystemJoined. When it's invoked, the peer completly joined the P2P system
262            this.systemJoined.WaitOne();
263            return true;
264        }
265
266        /// <summary>
267        /// Disjoins the peer from the system. The P2P system survive while one peer is still in the network.
268        /// </summary>
269        /// <returns>True, if the peer has completely disjoined the p2p network</returns>
270        public bool SynchStop()
271        {
272            if (this.dht != null)
273            {
274                this.dht.BeginStop(null);
275                //don't stop anything else, because BOOM
276
277                //wait till systemLeft Event is invoked
278                this.systemLeft.WaitOne();
279            }
280            return true;
281        }
282
283        /// <summary>
284        /// Asynchronously starting the peer. When the given P2P world doesn't
285        /// exist yet, inclusive creating the and bootstrapping to the P2P network.
286        /// In either case joining the P2P world. To ensure that peer has successfully
287        /// joined the p2p world, catch the event OnSystemJoined.
288        /// </summary>
289        public void AsynchStart()
290        {
291            // no callback usefull, because starting and joining isn't the same
292            // everything else is done by the EventHandler OnDHT_SystemJoined
293            this.dht.BeginStart(null);
294        }
295
296        /// <summary>
297        /// Asynchronously disjoining the actual peer of the p2p system. To ensure
298        /// disjoining, catch the event OnDHT_SystemLeft.
299        /// </summary>
300        public void AsynchStop()
301        {
302            if (this.dht != null)
303            {
304                // no callback usefull.
305                // Everything else is done by the EventHandler OnDHT_SystemLeft
306                this.dht.BeginStop(null);
307            }
308        }
309
310        #endregion
311
312        /// <summary>
313        /// Get PeerName of the actual peer
314        /// </summary>
315        /// <param name="sPeerName">out: additional peer information UserName on LinkManager</param>
316        /// <returns>PeerID as a String</returns>
317        public PeerId GetPeerID(out string sPeerName)
318        {
319            sPeerName = this.linkmanager.UserName;
320            return new PeerId(this.overlay.LocalAddress);
321        }
322
323        /// <summary>
324        /// Construct PeerId object for a specific byte[] id
325        /// </summary>
326        /// <param name="byteId">overlay address as byte array</param>
327        /// <returns>corresponding PeerId for given byte[] id</returns>
328        public PeerId GetPeerID(byte[] byteId)
329        {
330            LogToMonitor("GetPeerID: Converting byte[] to PeerId-Object");
331            return new PeerId(this.overlay.GetAddress(byteId));
332        }
333
334        // overlay.LocalAddress = Overlay-Peer-Address/Names
335        public void SendToPeer(byte[] data, byte[] destinationPeer)
336        {
337            // get stack size of the pap use-data and add own use data (for optimizing Stack size)
338            int realStackSize = this.overlay.GetHeaderSize() + data.Length;
339            ByteStack stackData = new ByteStack(realStackSize);
340
341            stackData.Push(data);
342
343            OverlayAddress destinationAddr = this.overlay.GetAddress(destinationPeer);
344            OverlayMessage overlayMsg = new OverlayMessage(MessageReceiverType.P2PBase,
345                this.overlay.LocalAddress, destinationAddr, stackData);
346            this.overlay.Send(overlayMsg);
347        }
348
349        private void overlay_MessageReceived(object sender, OverlayMessageEventArgs e)
350        {
351            if (OnP2PMessageReceived != null)
352            {
353                PeerId pid = new PeerId(e.Message.Source);
354                /* You have to fire this event asynchronous, because the main
355                 * thread will be stopped in this wrapper class for synchronizing
356                 * the asynchronous stuff (AutoResetEvent) --> so this could run
357                 * into a deadlock, when you fire this event synchronous (normal Invoke)
358                 * ATTENTION: This could change the invocation order!!! In my case
359                              no problem, but maybe in future cases... */
360                OnP2PMessageReceived.BeginInvoke(pid, e.Message.Data.PopBytes(e.Message.Data.CurrentStackSize), null, null);
361                //OnP2PMessageReceived(pid, e.Message.Data.PopUTF8String());
362            }
363        }
364
365        #region Event Handling (System Joined, Left and Message Received)
366
367        private void OnDHT_SystemJoined(object sender, EventArgs e)
368        {
369            if (OnSystemJoined != null)
370                OnSystemJoined();
371            this.systemJoined.Set();
372            Started = true;
373        }
374
375        private void OnDHT_SystemLeft(object sender, SystemLeftEventArgs e)
376        {
377            if (OnSystemLeft != null)
378                OnSystemLeft();
379            // as an experiment
380            this.dht = null;
381            this.systemLeft.Set();
382            Started = false;
383
384            LogToMonitor("OnDHT_SystemLeft has nulled the dht and setted the systemLeft Waithandle");
385        }
386
387        #endregion
388
389        /* Attention: The asynchronous methods are not tested at the moment */
390        #region Asynchronous Methods incl. Callbacks
391
392        /// <summary>
393        /// Asynchronously retrieving a key from the DHT. To get value, catch
394        /// event OnDhtLoad_Completed.
395        /// </summary>
396        /// <param name="sKey">Existing key in DHT</param>
397        public void AsynchRetrieve(string sKey)
398        {
399            Guid g = this.dht.Retrieve(OnAsynchRetrieve_Completed, sKey);
400        }
401        private void OnAsynchRetrieve_Completed(RetrieveResult rr)
402        {
403            if (OnDhtLoad_Completed != null)
404            {
405                OnDhtLoad_Completed(rr.Data);
406            }
407        }
408
409        /// <summary>
410        /// Asynchronously storing a Key-Value-Pair in the DHT. To ensure that
411        /// storing is completed, catch event OnDhtStore_Completed.
412        /// </summary>
413        /// <param name="sKey"></param>
414        /// <param name="sValue"></param>
415        public void AsynchStore(string sKey, string sValue)
416        {
417            //this.dht.Store(OnAsynchStore_Completed, sKey, UTF8Encoding.UTF8.GetBytes(sValue), IGNORE_DHT_VERSIONING_SYSTEM);
418            this.dht.Store(OnAsynchStore_Completed, sKey, UTF8Encoding.UTF8.GetBytes(sValue));
419        }
420
421        private void OnAsynchStore_Completed(StoreResult sr)
422        {
423            if (OnDhtStore_Completed != null)
424            {
425                if (sr.Status == OperationStatus.Success)
426                    OnDhtStore_Completed(true);
427                else
428                    OnDhtStore_Completed(false);
429            }
430
431        }
432
433        /// <summary>
434        /// Asynchronously removing an existing key out of the DHT. To ensure
435        /// that removing is completed, catch event OnDhtRemove_Completed.
436        /// </summary>
437        /// <param name="sKey"></param>
438        public void AsynchRemove(string sKey)
439        {
440            //this.dht.Remove(OnAsynchRemove_Completed, sKey, IGNORE_DHT_VERSIONING_SYSTEM);
441            this.dht.Remove(OnAsynchRemove_Completed, sKey);
442        }
443        private void OnAsynchRemove_Completed(RemoveResult rr)
444        {
445            if (OnDhtRemove_Completed != null)
446            {
447                if (rr.Status == OperationStatus.Success)
448                    OnDhtRemove_Completed(true);
449                else
450                    OnDhtRemove_Completed(false);
451            }
452        }
453
454        #endregion
455
456        #region Synchronous Methods incl. Callbacks
457
458        #region SynchStore incl.Callback and SecondTrialCallback
459
460        /// <summary>
461        /// Stores a value in the DHT at the given key
462        /// </summary>
463        /// <param name="sKey">Key of DHT Entry</param>
464        /// <param name="byteData">Value of DHT Entry</param>
465        /// <returns>True, when storing is completed!</returns>
466        public bool SynchStore(string sKey, byte[] byteData)
467        {
468            LogToMonitor("Begin: SynchStore. Key: " + sKey + ", Data: " + Encoding.UTF8.GetString(byteData));
469            AutoResetEvent are = new AutoResetEvent(false);
470            // this method returns always a GUID to distinguish between asynchronous actions
471            Guid g = this.dht.Store(OnSynchStoreCompleted, sKey, byteData);
472
473            ResponseWait rw = new ResponseWait() { WaitHandle = are, key = sKey, value = byteData };
474
475            waitDict.Add(g, rw);
476            //blocking till response
477            are.WaitOne();
478
479            LogToMonitor("End: SynchStore. Key: " + sKey + ". Success: " + rw.success.ToString());
480
481            return rw.success;
482        }
483
484        /// <summary>
485        /// Stores a value in the DHT at the given key
486        /// </summary>
487        /// <param name="sKey">Key of DHT Entry</param>
488        /// <param name="sValue">Value of DHT Entry</param>
489        /// <returns>True, when storing is completed!</returns>
490        public bool SynchStore(string sKey, string sData)
491        {
492            return SynchStore(sKey, UTF8Encoding.UTF8.GetBytes(sData));
493        }
494        /// <summary>
495        /// Callback for a the synchronized store method
496        /// </summary>
497        /// <param name="rr"></param>
498        private void OnSynchStoreCompleted(StoreResult sr)
499        {
500            ResponseWait rw;
501            if (this.waitDict.TryGetValue(sr.Guid, out rw))
502            {
503                // if Status == Error, than the version of the value is out of date.
504                // There is a versioning system in the DHT. So you must retrieve the
505                // key and than store the new value
506                if (sr.Status == OperationStatus.Failure)
507                {
508                    byte[] byteTemp = this.SynchRetrieve(rw.key);
509
510                    // Only try a second time. When it's still not possible, abort storing
511                    AutoResetEvent are = new AutoResetEvent(false);
512                    Guid g = this.dht.Store(OnSecondTrialStoring, rw.key, rw.value);
513                    ResponseWait rw2 = new ResponseWait() { WaitHandle = are, key = rw.key, value = rw.value };
514
515                    waitDict.Add(g, rw2);
516                    // blocking till response
517                    are.WaitOne();
518                    rw.success = rw2.success;
519                    rw.Message = rw2.Message;
520                }
521                else
522                {
523                    rw.Message = UTF8Encoding.UTF8.GetBytes(sr.Status.ToString());
524                    if (sr.Status == OperationStatus.KeyNotFound)
525                        rw.success = false;
526                    else
527                        rw.success = true;
528                }
529            }
530            //unblock WaitHandle in the synchronous method
531            rw.WaitHandle.Set();
532            // don't know if this accelerates the system...
533            this.waitDict.Remove(sr.Guid);
534        }
535
536        private void OnSecondTrialStoring(StoreResult sr)
537        {
538            ResponseWait rw;
539            if (this.waitDict.TryGetValue(sr.Guid, out rw))
540            {
541                if (sr.Status == OperationStatus.Failure)
542                {
543                    //Abort storing, because it's already the second trial
544                    rw.Message = UTF8Encoding.UTF8.GetBytes("Storing also not possible on second trial.");
545                    rw.success = false;
546                }
547                else
548                {
549                    //works the second trial, so it was the versioning system
550                    rw.success = true;
551                }
552            }
553            //unblock WaitHandle in the synchronous method
554            rw.WaitHandle.Set();
555            // don't know if this accelerates the system...
556            this.waitDict.Remove(sr.Guid);
557        }
558
559        #endregion
560
561        /// <summary>
562        /// Get the value of the given DHT Key or null, if it doesn't exist.
563        /// For synchronous environments use the Synch* methods.
564        /// </summary>
565        /// <param name="sKey">Key of DHT Entry</param>
566        /// <returns>Value of DHT Entry</returns>
567        public byte[] SynchRetrieve(string sKey)
568        {
569            LogToMonitor("ThreadId (P2PBase SynchRetrieve): " + Thread.CurrentThread.ManagedThreadId.ToString());
570
571            AutoResetEvent are = new AutoResetEvent(false);
572            // this method returns always a GUID to distinguish between asynchronous actions
573
574            LogToMonitor("Begin: SynchRetrieve. Key: " + sKey);
575
576            Guid g = this.dht.Retrieve(OnSynchRetrieveCompleted, sKey);
577
578            ResponseWait rw = new ResponseWait() { WaitHandle = are };
579
580            waitDict.Add(g, rw);
581            // blocking till response
582            are.WaitOne();
583
584            LogToMonitor("End: SynchRetrieve. Key: " + sKey + ". Success: " + rw.success.ToString());
585
586            //Rückgabe der Daten
587            return rw.Message;
588        }
589
590        /// <summary>
591        /// Callback for a the synchronized retrieval method
592        /// </summary>
593        /// <param name="rr"></param>
594        private void OnSynchRetrieveCompleted(RetrieveResult rr)
595        {
596            LogToMonitor(rr.Guid.ToString());
597
598            ResponseWait rw;
599
600            LogToMonitor("ThreadId (P2PBase retrieve callback): " + Thread.CurrentThread.ManagedThreadId.ToString());
601
602            if (this.waitDict.TryGetValue(rr.Guid, out rw))
603            {
604                // successful as long as no error occured
605                rw.success = true;
606                if (rr.Status == OperationStatus.Failure)
607                {
608                    rw.Message = null;
609                    rw.success = false;
610                }
611                else if (rr.Status == OperationStatus.KeyNotFound)
612                    rw.Message = null;
613                else
614                    rw.Message = rr.Data;
615
616                //unblock WaitHandle in the synchronous method
617                rw.WaitHandle.Set();
618                // don't know if this accelerates the system...
619                this.waitDict.Remove(rr.Guid);
620            }
621        }
622        /// <summary>
623        /// Removes a key/value pair out of the DHT
624        /// </summary>
625        /// <param name="sKey">Key of the DHT Entry</param>
626        /// <returns>True, when removing is completed!</returns>
627        public bool SynchRemove(string sKey)
628        {
629            LogToMonitor("Begin SynchRemove. Key: " + sKey);
630
631            AutoResetEvent are = new AutoResetEvent(false);
632            // this method returns always a GUID to distinguish between asynchronous actions
633            Guid g = this.dht.Remove(OnSynchRemoveCompleted, sKey);
634            //Guid g = this.dht.Remove(OnSynchRemoveCompleted, sKey, IGNORE_DHT_VERSIONING_SYSTEM);
635
636            ResponseWait rw = new ResponseWait() { WaitHandle = are };
637
638            waitDict.Add(g, rw);
639            // blocking till response
640            are.WaitOne();
641
642            LogToMonitor("Ended SynchRemove. Key: " + sKey + ". Success: " + rw.success.ToString());
643
644            return rw.success;
645        }
646
647        /// <summary>
648        /// Callback for a the synchronized remove method
649        /// </summary>
650        /// <param name="rr"></param>
651        private void OnSynchRemoveCompleted(RemoveResult rr)
652        {
653            ResponseWait rw;
654            if (this.waitDict.TryGetValue(rr.Guid, out rw))
655            {
656                rw.Message = UTF8Encoding.UTF8.GetBytes(rr.Status.ToString());
657
658                if (rr.Status == OperationStatus.Failure || rr.Status == OperationStatus.KeyNotFound)
659                    rw.success = false;
660                else
661                    rw.success = true;
662
663                //unblock WaitHandle in the synchronous method
664                rw.WaitHandle.Set();
665                // don't know if this accelerates the system...
666                this.waitDict.Remove(rr.Guid);
667            }
668        }
669
670        #endregion
671
672        /// <summary>
673        /// To log the internal state in the Monitoring Software of P@play
674        /// </summary>
675        public void LogInternalState()
676        {
677            if (this.dht != null)
678            {
679                this.dht.LogInternalState();
680            }
681        }
682
683        public void LogToMonitor(string sTextToLog)
684        {
685            if (AllowLoggingToMonitor)
686                Log.Debug(sTextToLog);
687        }
688
689    }
690}
Note: See TracBrowser for help on using the repository browser.