source: trunk/CrypP2P/Internal/P2PBase.cs @ 1436

Last change on this file since 1436 was 1436, checked in by Paul Lelgemann, 12 years ago

+ PeerToPeerBaseProxy: events are handled now, but functionality for stopping the workspace still missing

File size: 27.8 KB
Line 
1/* Copyright 2009 Team CrypTool (Christian Arnold), Uni Duisburg-Essen
2
3   Licensed under the Apache License, Version 2.0 (the "License");
4   you may not use this file except in compliance with the License.
5   You may obtain a copy of the License at
6
7       http://www.apache.org/licenses/LICENSE-2.0
8
9   Unless required by applicable law or agreed to in writing, software
10   distributed under the License is distributed on an "AS IS" BASIS,
11   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12   See the License for the specific language governing permissions and
13   limitations under the License.
14*/
15
16using System;
17using System.Collections.Generic;
18using System.Linq;
19using System.Text;
20using PeersAtPlay.P2PStorage.DHT;
21using PeersAtPlay.P2PStorage.FullMeshDHT;
22using PeersAtPlay.P2POverlay.Bootstrapper;
23using PeersAtPlay.P2POverlay;
24using PeersAtPlay.P2POverlay.Bootstrapper.LocalMachineBootstrapper;
25using PeersAtPlay.P2POverlay.FullMeshOverlay;
26using PeersAtPlay.P2PLink;
27using PeersAtPlay.P2POverlay.Bootstrapper.IrcBootstrapper;
28using System.Threading;
29using Cryptool.PluginBase.Control;
30using System.ComponentModel;
31using PeersAtPlay;
32using PeersAtPlay.Util.Logging;
33using Gears4Net;
34using Cryptool.Plugins.PeerToPeer.Internal;
35
36/* TODO:
37 * - Catch errors, which can occur when using the DHT (network-based errors)
38 */
39
40/* - Synchronous functions successfully tested (store, retrieve)
41 * - The DHT has an integrated versioning system. When a peer wants
42 *   to store data in an entry, which already holds data, the version
43 *   number will be compared with the peers' version number. If the
44 *   peer hasn't read/write the entry the last time, the storing instruction
45 *   will be rejected. You must first read the actual data and than you can
46 *   store your data in this entry...
47 *
48 * INFO:
49 * - Have considered the DHT-own versioning system in the SynchStore method.
50 *   If this versioning system will be abolished, the SynchStore method must
51 *   be change!
52 * - Everything switched to SnalNG, SimpleSnal isn't used anymore, because
53 *   certification stuff runs now
54 *
55 * TODO:
56 * - Delete UseNatTraversal-Flag and insert CertificateCheck and CertificateSetup
57 * - Testing asynchronous methods incl. EventHandlers
58 */
59namespace Cryptool.P2P.Internal
60{
61    /* Advantages of this wrapper class:
62     * - The PeerAtPlay-Libraries are only referenced in this project
63     *   --> so they're easy to update
64     * - PeerAtPlay only works with asynchronous methods, so this class
65     *   "synchronizes" this methods.
66     * - The PeerToPeer-Layers are unimportant for CT2-Developers, so this
67     *   issue is obfuscated by this wrapper class
68     */
69    /// <summary>
70    /// Wrapper class to integrate peer@play environment into CrypTool.
71    /// This class synchronizes asynchronous methods for easier usage in CT2. For future
72    /// </summary>
73    public class P2PBase
74    {
75        #region Delegates and Events for asynchronous p2p functions
76
77        public delegate void SystemJoined();
78        public event SystemJoined OnSystemJoined;
79
80        public delegate void SystemLeft();
81        public event SystemLeft OnSystemLeft;
82
83        public delegate void P2PMessageReceived(PeerId sourceAddr, byte[] data);
84        public event P2PMessageReceived OnP2PMessageReceived;
85
86        /// <summary>
87        /// returns true if key-value-pair is successfully stored in the DHT
88        /// </summary>
89        /// <param name="result"></param>
90        public delegate void DHTStoreCompleted(bool result);
91        public event DHTStoreCompleted OnDhtStore_Completed;
92
93        public delegate void DHTLoadCompleted(byte[] loadedData);
94        public event DHTLoadCompleted OnDhtLoad_Completed;
95
96        /// <summary>
97        /// returns true if key was found and removed successfully from the DHT
98        /// </summary>
99        /// <param name="result"></param>
100        public delegate void DHTRemoveCompleted(bool result);
101        public event DHTRemoveCompleted OnDhtRemove_Completed;
102
103        #endregion
104
105        #region Variables
106
107        private bool allowLoggingToMonitor;
108        /// <summary>
109        /// If true, all kinds of actions will be logged in the PeersAtPlay LogMonitor.
110        /// </summary>
111        public bool AllowLoggingToMonitor
112        {
113            get { return this.allowLoggingToMonitor; }
114            set { this.allowLoggingToMonitor = value; }
115        }
116
117        private const bool ALLOW_LOGGING_TO_MONITOR = true;
118
119        private bool started = false;
120        /// <summary>
121        /// True if system was successfully joined, false if system is COMPLETELY left
122        /// </summary>
123        public bool Started
124        {
125            get { return this.started; }
126            private set { this.started = value; }
127        }
128
129        private IDHT dht;
130        private IP2PLinkManager linkmanager;
131        private IBootstrapper bootstrapper;
132        private P2POverlay overlay;
133        private AutoResetEvent systemJoined;
134        private AutoResetEvent systemLeft;
135
136        /// <summary>
137        /// Dictionary for synchronizing asynchronous DHT retrieves.
138        /// Cryptool doesn't offers an asynchronous environment, so this workaround is necessary
139        /// </summary>
140        private Dictionary<Guid, ResponseWait> waitDict;
141
142        #endregion
143
144        public P2PBase()
145        {
146            this.waitDict = new Dictionary<Guid, ResponseWait>();
147            this.systemJoined = new AutoResetEvent(false);
148            this.systemLeft = new AutoResetEvent(false);
149        }
150
151        #region Basic P2P Methods (Init, Start, Stop) - synch and asynch
152
153        public void Initialize(P2PSettings p2pSettings)
154        {
155            Initialize(p2pSettings.PeerName, p2pSettings.WorldName,
156                (P2PLinkManagerType)p2pSettings.LinkManagerType, (P2PBootstrapperType)p2pSettings.BSType,
157                (P2POverlayType)p2pSettings.OverlayType, (P2PDHTType)p2pSettings.DhtType);
158        }
159
160        /// <summary>
161        /// Initializing is the first step to build a new or access an existing p2p network
162        /// </summary>
163        /// <param name="sUserName">Choose an individual name for the user</param>
164        /// <param name="sWorldName">fundamental: two peers are only in the SAME
165        /// P2P system, when they initialized the SAME WORLD!</param>
166        /// <param name="bolUseNatTraversal">When you want to use NAT-Traversal #
167        /// (tunneling the P2P connection through NATs and Firewalls), you have to
168        /// set this flag to true</param>
169        /// <param name="linkManagerType"></param>
170        /// <param name="bsType"></param>
171        /// <param name="overlayType"></param>
172        /// <param name="dhtType"></param>
173        public void Initialize(string sUserName, string sWorldName, P2PLinkManagerType linkManagerType, P2PBootstrapperType bsType, P2POverlayType overlayType, P2PDHTType dhtType)
174        {
175            #region Setting LinkManager, Bootstrapper, Overlay and DHT to the specified types
176
177            Scheduler scheduler = new STAScheduler("pap");
178
179            switch (linkManagerType)
180            {
181                case P2PLinkManagerType.Snal:
182                    LogToMonitor("Init LinkMgr: Using NAT Traversal stuff");
183                    // NAT-Traversal stuff needs a different Snal-Version
184                    this.linkmanager = new PeersAtPlay.P2PLink.SnalNG.Snal(scheduler);
185
186                    PeersAtPlay.P2PLink.SnalNG.Settings settings = new PeersAtPlay.P2PLink.SnalNG.Settings();
187                    settings.LoadDefaults();
188                    settings.ConnectInternal = true;
189                    settings.LocalReceivingPort = 0;
190                    settings.UseLocalAddressDetection = false;
191                    settings.AutoReconnect = false;
192                    settings.NoDelay = false;
193                    settings.ReuseAddress = false;
194                    settings.UseNetworkMonitorServer = true;
195
196                    this.linkmanager.Settings = settings;
197                    this.linkmanager.ApplicationType = PeersAtPlay.Monitoring.ApplicationType.CrypTool;
198
199                    break;
200                default:
201                    throw (new NotImplementedException());
202            }
203            switch (bsType)
204            {
205                case P2PBootstrapperType.LocalMachineBootstrapper:
206                    //LocalMachineBootstrapper = only local connection (runs only on one machine)
207                    this.bootstrapper = new LocalMachineBootstrapper();
208                    break;
209                case P2PBootstrapperType.IrcBootstrapper:
210                    // setup nat traversal stuff
211                    LogToMonitor("Init Bootstrapper: Using NAT Traversal stuff");
212                    PeersAtPlay.P2POverlay.Bootstrapper.IrcBootstrapper.Settings.DelaySymmetricResponse = true;
213                    PeersAtPlay.P2POverlay.Bootstrapper.IrcBootstrapper.Settings.IncludeSymmetricInResponse = false;
214                    PeersAtPlay.P2POverlay.Bootstrapper.IrcBootstrapper.Settings.SymmetricResponseDelay = 6000;
215
216                    this.bootstrapper = new IrcBootstrapper(scheduler);
217                    break;
218                default:
219                    throw (new NotImplementedException());
220            }
221            switch (overlayType)
222            {
223                case P2POverlayType.FullMeshOverlay:
224                    // changing overlay example: this.overlay = new ChordOverlay();
225                    this.overlay = new FullMeshOverlay(scheduler);
226                    break;
227                default:
228                    throw (new NotImplementedException());
229            }
230            switch (dhtType)
231            {
232                case P2PDHTType.FullMeshDHT:
233                    this.dht = new FullMeshDHT(scheduler);
234                    break;
235                default:
236                    throw (new NotImplementedException());
237            }
238            #endregion
239
240            this.overlay.MessageReceived += new EventHandler<OverlayMessageEventArgs>(overlay_MessageReceived);
241            this.dht.SystemJoined += new EventHandler(OnDHT_SystemJoined);
242            this.dht.SystemLeft += new EventHandler<SystemLeftEventArgs>(OnDHT_SystemLeft);
243
244            this.dht.Initialize(sUserName, "", sWorldName, this.overlay, this.bootstrapper, this.linkmanager, null);
245        }
246
247        /// <summary>
248        /// Starts the P2P System. When the given P2P world doesn't exist yet,
249        /// inclusive creating the and bootstrapping to the P2P network.
250        /// In either case joining the P2P world.
251        /// This synchronized method returns true not before the peer has
252        /// successfully joined the network (this may take one or two minutes).
253        /// </summary>
254        /// <returns>True, if the peer has completely joined the p2p network</returns>
255        public bool SynchStart()
256        {
257            //Start != system joined
258            //Only starts the system asynchronous, the possible callback is useless,
259            //because it's invoked before the peer completly joined the P2P system
260            this.dht.BeginStart(null);
261            //Wait for event SystemJoined. When it's invoked, the peer completly joined the P2P system
262            this.systemJoined.WaitOne();
263            return true;
264        }
265
266        /// <summary>
267        /// Disjoins the peer from the system. The P2P system survive while one peer is still in the network.
268        /// </summary>
269        /// <returns>True, if the peer has completely disjoined the p2p network</returns>
270        public bool SynchStop()
271        {
272            if (this.dht != null)
273            {
274                this.dht.BeginStop(null);
275                //don't stop anything else, because BOOM
276
277                //wait till systemLeft Event is invoked
278                this.systemLeft.WaitOne();
279            }
280            return true;
281        }
282
283        /// <summary>
284        /// Asynchronously starting the peer. When the given P2P world doesn't
285        /// exist yet, inclusive creating the and bootstrapping to the P2P network.
286        /// In either case joining the P2P world. To ensure that peer has successfully
287        /// joined the p2p world, catch the event OnSystemJoined.
288        /// </summary>
289        public void AsynchStart()
290        {
291            // no callback usefull, because starting and joining isn't the same
292            // everything else is done by the EventHandler OnDHT_SystemJoined
293            this.dht.BeginStart(null);
294        }
295
296        /// <summary>
297        /// Asynchronously disjoining the actual peer of the p2p system. To ensure
298        /// disjoining, catch the event OnDHT_SystemLeft.
299        /// </summary>
300        public void AsynchStop()
301        {
302            if (this.dht != null)
303            {
304                // no callback usefull.
305                // Everything else is done by the EventHandler OnDHT_SystemLeft
306                this.dht.BeginStop(null);
307            }
308        }
309
310        #endregion
311
312        /// <summary>
313        /// Get PeerName of the actual peer
314        /// </summary>
315        /// <param name="sPeerName">out: additional peer information UserName on LinkManager</param>
316        /// <returns>PeerID as a String</returns>
317        public PeerId GetPeerID(out string sPeerName)
318        {
319            sPeerName = this.linkmanager.UserName;
320            return new PeerId(this.overlay.LocalAddress);
321        }
322
323        /// <summary>
324        /// Construct PeerId object for a specific byte[] id
325        /// </summary>
326        /// <param name="byteId">overlay address as byte array</param>
327        /// <returns>corresponding PeerId for given byte[] id</returns>
328        public PeerId GetPeerID(byte[] byteId)
329        {
330            LogToMonitor("GetPeerID: Converting byte[] to PeerId-Object");
331            return new PeerId(this.overlay.GetAddress(byteId));
332        }
333
334        // overlay.LocalAddress = Overlay-Peer-Address/Names
335        public void SendToPeer(byte[] data, byte[] destinationPeer)
336        {
337            // get stack size of the pap use-data and add own use data (for optimizing Stack size)
338            int realStackSize = this.overlay.GetHeaderSize() + data.Length;
339            ByteStack stackData = new ByteStack(realStackSize);
340
341            stackData.Push(data);
342
343            OverlayAddress destinationAddr = this.overlay.GetAddress(destinationPeer);
344            OverlayMessage overlayMsg = new OverlayMessage(MessageReceiverType.P2PBase,
345                this.overlay.LocalAddress, destinationAddr, stackData);
346            this.overlay.Send(overlayMsg);
347        }
348
349        private void overlay_MessageReceived(object sender, OverlayMessageEventArgs e)
350        {
351            if (OnP2PMessageReceived != null)
352            {
353                PeerId pid = new PeerId(e.Message.Source);
354                /* You have to fire this event asynchronous, because the main
355                 * thread will be stopped in this wrapper class for synchronizing
356                 * the asynchronous stuff (AutoResetEvent) --> so this could run
357                 * into a deadlock, when you fire this event synchronous (normal Invoke)
358                 * ATTENTION: This could change the invocation order!!! In my case
359                              no problem, but maybe in future cases... */
360
361                // TODO: not safe: The delegate must have only one target
362                //OnP2PMessageReceived.BeginInvoke(pid, e.Message.Data.PopBytes(e.Message.Data.CurrentStackSize), null, null);
363
364                foreach (Delegate del in OnP2PMessageReceived.GetInvocationList())
365                {
366                    del.DynamicInvoke(pid, e.Message.Data.PopBytes(e.Message.Data.CurrentStackSize));
367                }
368
369                //OnP2PMessageReceived(pid, e.Message.Data.PopUTF8String());
370            }
371        }
372
373        #region Event Handling (System Joined, Left and Message Received)
374
375        private void OnDHT_SystemJoined(object sender, EventArgs e)
376        {
377            if (OnSystemJoined != null)
378                OnSystemJoined();
379            this.systemJoined.Set();
380            Started = true;
381        }
382
383        private void OnDHT_SystemLeft(object sender, SystemLeftEventArgs e)
384        {
385            if (OnSystemLeft != null)
386                OnSystemLeft();
387            // as an experiment
388            this.dht = null;
389            this.systemLeft.Set();
390            Started = false;
391
392            LogToMonitor("OnDHT_SystemLeft has nulled the dht and setted the systemLeft Waithandle");
393        }
394
395        #endregion
396
397        /* Attention: The asynchronous methods are not tested at the moment */
398        #region Asynchronous Methods incl. Callbacks
399
400        /// <summary>
401        /// Asynchronously retrieving a key from the DHT. To get value, catch
402        /// event OnDhtLoad_Completed.
403        /// </summary>
404        /// <param name="sKey">Existing key in DHT</param>
405        public void AsynchRetrieve(string sKey)
406        {
407            Guid g = this.dht.Retrieve(OnAsynchRetrieve_Completed, sKey);
408        }
409        private void OnAsynchRetrieve_Completed(RetrieveResult rr)
410        {
411            if (OnDhtLoad_Completed != null)
412            {
413                OnDhtLoad_Completed(rr.Data);
414            }
415        }
416
417        /// <summary>
418        /// Asynchronously storing a Key-Value-Pair in the DHT. To ensure that
419        /// storing is completed, catch event OnDhtStore_Completed.
420        /// </summary>
421        /// <param name="sKey"></param>
422        /// <param name="sValue"></param>
423        public void AsynchStore(string sKey, string sValue)
424        {
425            //this.dht.Store(OnAsynchStore_Completed, sKey, UTF8Encoding.UTF8.GetBytes(sValue), IGNORE_DHT_VERSIONING_SYSTEM);
426            this.dht.Store(OnAsynchStore_Completed, sKey, UTF8Encoding.UTF8.GetBytes(sValue));
427        }
428
429        private void OnAsynchStore_Completed(StoreResult sr)
430        {
431            if (OnDhtStore_Completed != null)
432            {
433                if (sr.Status == OperationStatus.Success)
434                    OnDhtStore_Completed(true);
435                else
436                    OnDhtStore_Completed(false);
437            }
438
439        }
440
441        /// <summary>
442        /// Asynchronously removing an existing key out of the DHT. To ensure
443        /// that removing is completed, catch event OnDhtRemove_Completed.
444        /// </summary>
445        /// <param name="sKey"></param>
446        public void AsynchRemove(string sKey)
447        {
448            //this.dht.Remove(OnAsynchRemove_Completed, sKey, IGNORE_DHT_VERSIONING_SYSTEM);
449            this.dht.Remove(OnAsynchRemove_Completed, sKey);
450        }
451        private void OnAsynchRemove_Completed(RemoveResult rr)
452        {
453            if (OnDhtRemove_Completed != null)
454            {
455                if (rr.Status == OperationStatus.Success)
456                    OnDhtRemove_Completed(true);
457                else
458                    OnDhtRemove_Completed(false);
459            }
460        }
461
462        #endregion
463
464        #region Synchronous Methods incl. Callbacks
465
466        #region SynchStore incl.Callback and SecondTrialCallback
467
468        /// <summary>
469        /// Stores a value in the DHT at the given key
470        /// </summary>
471        /// <param name="sKey">Key of DHT Entry</param>
472        /// <param name="byteData">Value of DHT Entry</param>
473        /// <returns>True, when storing is completed!</returns>
474        public bool SynchStore(string sKey, byte[] byteData)
475        {
476            LogToMonitor("Begin: SynchStore. Key: " + sKey + ", Data: " + Encoding.UTF8.GetString(byteData));
477            AutoResetEvent are = new AutoResetEvent(false);
478            // this method returns always a GUID to distinguish between asynchronous actions
479            Guid g = this.dht.Store(OnSynchStoreCompleted, sKey, byteData);
480
481            ResponseWait rw = new ResponseWait() { WaitHandle = are, key = sKey, value = byteData };
482
483            waitDict.Add(g, rw);
484            //blocking till response
485            are.WaitOne();
486
487            LogToMonitor("End: SynchStore. Key: " + sKey + ". Success: " + rw.success.ToString());
488
489            return rw.success;
490        }
491
492        /// <summary>
493        /// Stores a value in the DHT at the given key
494        /// </summary>
495        /// <param name="sKey">Key of DHT Entry</param>
496        /// <param name="sValue">Value of DHT Entry</param>
497        /// <returns>True, when storing is completed!</returns>
498        public bool SynchStore(string sKey, string sData)
499        {
500            return SynchStore(sKey, UTF8Encoding.UTF8.GetBytes(sData));
501        }
502        /// <summary>
503        /// Callback for a the synchronized store method
504        /// </summary>
505        /// <param name="rr"></param>
506        private void OnSynchStoreCompleted(StoreResult sr)
507        {
508            ResponseWait rw;
509            if (this.waitDict.TryGetValue(sr.Guid, out rw))
510            {
511                // if Status == Error, than the version of the value is out of date.
512                // There is a versioning system in the DHT. So you must retrieve the
513                // key and than store the new value
514                if (sr.Status == OperationStatus.Failure)
515                {
516                    byte[] byteTemp = this.SynchRetrieve(rw.key);
517
518                    // Only try a second time. When it's still not possible, abort storing
519                    AutoResetEvent are = new AutoResetEvent(false);
520                    Guid g = this.dht.Store(OnSecondTrialStoring, rw.key, rw.value);
521                    ResponseWait rw2 = new ResponseWait() { WaitHandle = are, key = rw.key, value = rw.value };
522
523                    waitDict.Add(g, rw2);
524                    // blocking till response
525                    are.WaitOne();
526                    rw.success = rw2.success;
527                    rw.Message = rw2.Message;
528                }
529                else
530                {
531                    rw.Message = UTF8Encoding.UTF8.GetBytes(sr.Status.ToString());
532                    if (sr.Status == OperationStatus.KeyNotFound)
533                        rw.success = false;
534                    else
535                        rw.success = true;
536                }
537            }
538            //unblock WaitHandle in the synchronous method
539            rw.WaitHandle.Set();
540            // don't know if this accelerates the system...
541            this.waitDict.Remove(sr.Guid);
542        }
543
544        private void OnSecondTrialStoring(StoreResult sr)
545        {
546            ResponseWait rw;
547            if (this.waitDict.TryGetValue(sr.Guid, out rw))
548            {
549                if (sr.Status == OperationStatus.Failure)
550                {
551                    //Abort storing, because it's already the second trial
552                    rw.Message = UTF8Encoding.UTF8.GetBytes("Storing also not possible on second trial.");
553                    rw.success = false;
554                }
555                else
556                {
557                    //works the second trial, so it was the versioning system
558                    rw.success = true;
559                }
560            }
561            //unblock WaitHandle in the synchronous method
562            rw.WaitHandle.Set();
563            // don't know if this accelerates the system...
564            this.waitDict.Remove(sr.Guid);
565        }
566
567        #endregion
568
569        /// <summary>
570        /// Get the value of the given DHT Key or null, if it doesn't exist.
571        /// For synchronous environments use the Synch* methods.
572        /// </summary>
573        /// <param name="sKey">Key of DHT Entry</param>
574        /// <returns>Value of DHT Entry</returns>
575        public byte[] SynchRetrieve(string sKey)
576        {
577            LogToMonitor("ThreadId (P2PBase SynchRetrieve): " + Thread.CurrentThread.ManagedThreadId.ToString());
578
579            AutoResetEvent are = new AutoResetEvent(false);
580            // this method returns always a GUID to distinguish between asynchronous actions
581
582            LogToMonitor("Begin: SynchRetrieve. Key: " + sKey);
583
584            Guid g = this.dht.Retrieve(OnSynchRetrieveCompleted, sKey);
585
586            ResponseWait rw = new ResponseWait() { WaitHandle = are };
587
588            waitDict.Add(g, rw);
589            // blocking till response
590            are.WaitOne();
591
592            LogToMonitor("End: SynchRetrieve. Key: " + sKey + ". Success: " + rw.success.ToString());
593
594            //Rückgabe der Daten
595            return rw.Message;
596        }
597
598        /// <summary>
599        /// Callback for a the synchronized retrieval method
600        /// </summary>
601        /// <param name="rr"></param>
602        private void OnSynchRetrieveCompleted(RetrieveResult rr)
603        {
604            LogToMonitor(rr.Guid.ToString());
605
606            ResponseWait rw;
607
608            LogToMonitor("ThreadId (P2PBase retrieve callback): " + Thread.CurrentThread.ManagedThreadId.ToString());
609
610            if (this.waitDict.TryGetValue(rr.Guid, out rw))
611            {
612                // successful as long as no error occured
613                rw.success = true;
614                if (rr.Status == OperationStatus.Failure)
615                {
616                    rw.Message = null;
617                    rw.success = false;
618                }
619                else if (rr.Status == OperationStatus.KeyNotFound)
620                    rw.Message = null;
621                else
622                    rw.Message = rr.Data;
623
624                //unblock WaitHandle in the synchronous method
625                rw.WaitHandle.Set();
626                // don't know if this accelerates the system...
627                this.waitDict.Remove(rr.Guid);
628            }
629        }
630        /// <summary>
631        /// Removes a key/value pair out of the DHT
632        /// </summary>
633        /// <param name="sKey">Key of the DHT Entry</param>
634        /// <returns>True, when removing is completed!</returns>
635        public bool SynchRemove(string sKey)
636        {
637            LogToMonitor("Begin SynchRemove. Key: " + sKey);
638
639            AutoResetEvent are = new AutoResetEvent(false);
640            // this method returns always a GUID to distinguish between asynchronous actions
641            Guid g = this.dht.Remove(OnSynchRemoveCompleted, sKey);
642            //Guid g = this.dht.Remove(OnSynchRemoveCompleted, sKey, IGNORE_DHT_VERSIONING_SYSTEM);
643
644            ResponseWait rw = new ResponseWait() { WaitHandle = are };
645
646            waitDict.Add(g, rw);
647            // blocking till response
648            are.WaitOne();
649
650            LogToMonitor("Ended SynchRemove. Key: " + sKey + ". Success: " + rw.success.ToString());
651
652            return rw.success;
653        }
654
655        /// <summary>
656        /// Callback for a the synchronized remove method
657        /// </summary>
658        /// <param name="rr"></param>
659        private void OnSynchRemoveCompleted(RemoveResult rr)
660        {
661            ResponseWait rw;
662            if (this.waitDict.TryGetValue(rr.Guid, out rw))
663            {
664                rw.Message = UTF8Encoding.UTF8.GetBytes(rr.Status.ToString());
665
666                if (rr.Status == OperationStatus.Failure || rr.Status == OperationStatus.KeyNotFound)
667                    rw.success = false;
668                else
669                    rw.success = true;
670
671                //unblock WaitHandle in the synchronous method
672                rw.WaitHandle.Set();
673                // don't know if this accelerates the system...
674                this.waitDict.Remove(rr.Guid);
675            }
676        }
677
678        #endregion
679
680        /// <summary>
681        /// To log the internal state in the Monitoring Software of P@play
682        /// </summary>
683        public void LogInternalState()
684        {
685            if (this.dht != null)
686            {
687                this.dht.LogInternalState();
688            }
689        }
690
691        public void LogToMonitor(string sTextToLog)
692        {
693            if (AllowLoggingToMonitor)
694                Log.Debug(sTextToLog);
695        }
696
697    }
698}
Note: See TracBrowser for help on using the repository browser.