source: trunk/CrypP2P/Internal/P2PBase.cs @ 1458

Last change on this file since 1458 was 1458, checked in by Paul Lelgemann, 12 years ago
  • Removed unused references and using statement in CrypP2P, PeerToPeer, PeerToPeerBaseProxy

+ Skeleton for P2PEditor

File size: 27.7 KB
Line 
1/* Copyright 2009 Team CrypTool (Christian Arnold), Uni Duisburg-Essen
2
3   Licensed under the Apache License, Version 2.0 (the "License");
4   you may not use this file except in compliance with the License.
5   You may obtain a copy of the License at
6
7       http://www.apache.org/licenses/LICENSE-2.0
8
9   Unless required by applicable law or agreed to in writing, software
10   distributed under the License is distributed on an "AS IS" BASIS,
11   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12   See the License for the specific language governing permissions and
13   limitations under the License.
14*/
15
16using System;
17using System.Collections.Generic;
18using System.Text;
19using PeersAtPlay.P2PStorage.DHT;
20using PeersAtPlay.P2PStorage.FullMeshDHT;
21using PeersAtPlay.P2POverlay.Bootstrapper;
22using PeersAtPlay.P2POverlay;
23using PeersAtPlay.P2POverlay.Bootstrapper.LocalMachineBootstrapper;
24using PeersAtPlay.P2POverlay.FullMeshOverlay;
25using PeersAtPlay.P2PLink;
26using PeersAtPlay.P2POverlay.Bootstrapper.IrcBootstrapper;
27using System.Threading;
28using PeersAtPlay;
29using PeersAtPlay.Util.Logging;
30using Gears4Net;
31using Cryptool.Plugins.PeerToPeer.Internal;
32
33/* TODO:
34 * - Catch errors, which can occur when using the DHT (network-based errors)
35 */
36
37/* - Synchronous functions successfully tested (store, retrieve)
38 * - The DHT has an integrated versioning system. When a peer wants
39 *   to store data in an entry, which already holds data, the version
40 *   number will be compared with the peers' version number. If the
41 *   peer hasn't read/write the entry the last time, the storing instruction
42 *   will be rejected. You must first read the actual data and than you can
43 *   store your data in this entry...
44 *
45 * INFO:
46 * - Have considered the DHT-own versioning system in the SynchStore method.
47 *   If this versioning system will be abolished, the SynchStore method must
48 *   be change!
49 * - Everything switched to SnalNG, SimpleSnal isn't used anymore, because
50 *   certification stuff runs now
51 *
52 * TODO:
53 * - Delete UseNatTraversal-Flag and insert CertificateCheck and CertificateSetup
54 * - Testing asynchronous methods incl. EventHandlers
55 */
56namespace Cryptool.P2P.Internal
57{
58    /* Advantages of this wrapper class:
59     * - The PeerAtPlay-Libraries are only referenced in this project
60     *   --> so they're easy to update
61     * - PeerAtPlay only works with asynchronous methods, so this class
62     *   "synchronizes" this methods.
63     * - The PeerToPeer-Layers are unimportant for CT2-Developers, so this
64     *   issue is obfuscated by this wrapper class
65     */
66    /// <summary>
67    /// Wrapper class to integrate peer@play environment into CrypTool.
68    /// This class synchronizes asynchronous methods for easier usage in CT2. For future
69    /// </summary>
70    public class P2PBase
71    {
72        #region Delegates and Events for asynchronous p2p functions
73
74        public delegate void SystemJoined();
75        public event SystemJoined OnSystemJoined;
76
77        public delegate void SystemLeft();
78        public event SystemLeft OnSystemLeft;
79
80        public delegate void P2PMessageReceived(PeerId sourceAddr, byte[] data);
81        public event P2PMessageReceived OnP2PMessageReceived;
82
83        /// <summary>
84        /// returns true if key-value-pair is successfully stored in the DHT
85        /// </summary>
86        /// <param name="result"></param>
87        public delegate void DHTStoreCompleted(bool result);
88        public event DHTStoreCompleted OnDhtStore_Completed;
89
90        public delegate void DHTLoadCompleted(byte[] loadedData);
91        public event DHTLoadCompleted OnDhtLoad_Completed;
92
93        /// <summary>
94        /// returns true if key was found and removed successfully from the DHT
95        /// </summary>
96        /// <param name="result"></param>
97        public delegate void DHTRemoveCompleted(bool result);
98        public event DHTRemoveCompleted OnDhtRemove_Completed;
99
100        #endregion
101
102        #region Variables
103
104        private bool allowLoggingToMonitor;
105        /// <summary>
106        /// If true, all kinds of actions will be logged in the PeersAtPlay LogMonitor.
107        /// </summary>
108        public bool AllowLoggingToMonitor
109        {
110            get { return this.allowLoggingToMonitor; }
111            set { this.allowLoggingToMonitor = value; }
112        }
113
114        private const bool ALLOW_LOGGING_TO_MONITOR = true;
115
116        private bool started = false;
117        /// <summary>
118        /// True if system was successfully joined, false if system is COMPLETELY left
119        /// </summary>
120        public bool Started
121        {
122            get { return this.started; }
123            private set { this.started = value; }
124        }
125
126        private IDHT dht;
127        private IP2PLinkManager linkmanager;
128        private IBootstrapper bootstrapper;
129        private P2POverlay overlay;
130        private AutoResetEvent systemJoined;
131        private AutoResetEvent systemLeft;
132
133        /// <summary>
134        /// Dictionary for synchronizing asynchronous DHT retrieves.
135        /// Cryptool doesn't offers an asynchronous environment, so this workaround is necessary
136        /// </summary>
137        private Dictionary<Guid, ResponseWait> waitDict;
138
139        #endregion
140
141        public P2PBase()
142        {
143            this.waitDict = new Dictionary<Guid, ResponseWait>();
144            this.systemJoined = new AutoResetEvent(false);
145            this.systemLeft = new AutoResetEvent(false);
146        }
147
148        #region Basic P2P Methods (Init, Start, Stop) - synch and asynch
149
150        public void Initialize(P2PSettings p2PSettings)
151        {
152            Initialize(p2PSettings.PeerName, p2PSettings.WorldName,
153                (P2PLinkManagerType)p2PSettings.LinkManagerType, (P2PBootstrapperType)p2PSettings.BsType,
154                (P2POverlayType)p2PSettings.OverlayType, (P2PDHTType)p2PSettings.DhtType);
155        }
156
157        /// <summary>
158        /// Initializing is the first step to build a new or access an existing p2p network
159        /// </summary>
160        /// <param name="sUserName">Choose an individual name for the user</param>
161        /// <param name="sWorldName">fundamental: two peers are only in the SAME
162        /// P2P system, when they initialized the SAME WORLD!</param>
163        /// <param name="bolUseNatTraversal">When you want to use NAT-Traversal #
164        /// (tunneling the P2P connection through NATs and Firewalls), you have to
165        /// set this flag to true</param>
166        /// <param name="linkManagerType"></param>
167        /// <param name="bsType"></param>
168        /// <param name="overlayType"></param>
169        /// <param name="dhtType"></param>
170        public void Initialize(string sUserName, string sWorldName, P2PLinkManagerType linkManagerType, P2PBootstrapperType bsType, P2POverlayType overlayType, P2PDHTType dhtType)
171        {
172            #region Setting LinkManager, Bootstrapper, Overlay and DHT to the specified types
173
174            Scheduler scheduler = new STAScheduler("pap");
175
176            switch (linkManagerType)
177            {
178                case P2PLinkManagerType.Snal:
179                    LogToMonitor("Init LinkMgr: Using NAT Traversal stuff");
180                    // NAT-Traversal stuff needs a different Snal-Version
181                    this.linkmanager = new PeersAtPlay.P2PLink.SnalNG.Snal(scheduler);
182
183                    PeersAtPlay.P2PLink.SnalNG.Settings settings = new PeersAtPlay.P2PLink.SnalNG.Settings();
184                    settings.LoadDefaults();
185                    settings.ConnectInternal = true;
186                    settings.LocalReceivingPort = 0;
187                    settings.UseLocalAddressDetection = false;
188                    settings.AutoReconnect = false;
189                    settings.NoDelay = false;
190                    settings.ReuseAddress = false;
191                    settings.UseNetworkMonitorServer = true;
192
193                    this.linkmanager.Settings = settings;
194                    this.linkmanager.ApplicationType = PeersAtPlay.Monitoring.ApplicationType.CrypTool;
195
196                    break;
197                default:
198                    throw (new NotImplementedException());
199            }
200            switch (bsType)
201            {
202                case P2PBootstrapperType.LocalMachineBootstrapper:
203                    //LocalMachineBootstrapper = only local connection (runs only on one machine)
204                    this.bootstrapper = new LocalMachineBootstrapper();
205                    break;
206                case P2PBootstrapperType.IrcBootstrapper:
207                    // setup nat traversal stuff
208                    LogToMonitor("Init Bootstrapper: Using NAT Traversal stuff");
209                    PeersAtPlay.P2POverlay.Bootstrapper.IrcBootstrapper.Settings.DelaySymmetricResponse = true;
210                    PeersAtPlay.P2POverlay.Bootstrapper.IrcBootstrapper.Settings.IncludeSymmetricInResponse = false;
211                    PeersAtPlay.P2POverlay.Bootstrapper.IrcBootstrapper.Settings.SymmetricResponseDelay = 6000;
212
213                    this.bootstrapper = new IrcBootstrapper(scheduler);
214                    break;
215                default:
216                    throw (new NotImplementedException());
217            }
218            switch (overlayType)
219            {
220                case P2POverlayType.FullMeshOverlay:
221                    // changing overlay example: this.overlay = new ChordOverlay();
222                    this.overlay = new FullMeshOverlay(scheduler);
223                    break;
224                default:
225                    throw (new NotImplementedException());
226            }
227            switch (dhtType)
228            {
229                case P2PDHTType.FullMeshDHT:
230                    this.dht = new FullMeshDHT(scheduler);
231                    break;
232                default:
233                    throw (new NotImplementedException());
234            }
235            #endregion
236
237            this.overlay.MessageReceived += new EventHandler<OverlayMessageEventArgs>(overlay_MessageReceived);
238            this.dht.SystemJoined += new EventHandler(OnDHT_SystemJoined);
239            this.dht.SystemLeft += new EventHandler<SystemLeftEventArgs>(OnDHT_SystemLeft);
240
241            this.dht.Initialize(sUserName, "", sWorldName, this.overlay, this.bootstrapper, this.linkmanager, null);
242        }
243
244        /// <summary>
245        /// Starts the P2P System. When the given P2P world doesn't exist yet,
246        /// inclusive creating the and bootstrapping to the P2P network.
247        /// In either case joining the P2P world.
248        /// This synchronized method returns true not before the peer has
249        /// successfully joined the network (this may take one or two minutes).
250        /// </summary>
251        /// <returns>True, if the peer has completely joined the p2p network</returns>
252        public bool SynchStart()
253        {
254            //Start != system joined
255            //Only starts the system asynchronous, the possible callback is useless,
256            //because it's invoked before the peer completly joined the P2P system
257            this.dht.BeginStart(null);
258            //Wait for event SystemJoined. When it's invoked, the peer completly joined the P2P system
259            this.systemJoined.WaitOne();
260            return true;
261        }
262
263        /// <summary>
264        /// Disjoins the peer from the system. The P2P system survive while one peer is still in the network.
265        /// </summary>
266        /// <returns>True, if the peer has completely disjoined the p2p network</returns>
267        public bool SynchStop()
268        {
269            if (this.dht != null)
270            {
271                this.dht.BeginStop(null);
272                //don't stop anything else, because BOOM
273
274                //wait till systemLeft Event is invoked
275                this.systemLeft.WaitOne();
276            }
277            return true;
278        }
279
280        /// <summary>
281        /// Asynchronously starting the peer. When the given P2P world doesn't
282        /// exist yet, inclusive creating the and bootstrapping to the P2P network.
283        /// In either case joining the P2P world. To ensure that peer has successfully
284        /// joined the p2p world, catch the event OnSystemJoined.
285        /// </summary>
286        public void AsynchStart()
287        {
288            // no callback usefull, because starting and joining isn't the same
289            // everything else is done by the EventHandler OnDHT_SystemJoined
290            this.dht.BeginStart(null);
291        }
292
293        /// <summary>
294        /// Asynchronously disjoining the actual peer of the p2p system. To ensure
295        /// disjoining, catch the event OnDHT_SystemLeft.
296        /// </summary>
297        public void AsynchStop()
298        {
299            if (this.dht != null)
300            {
301                // no callback usefull.
302                // Everything else is done by the EventHandler OnDHT_SystemLeft
303                this.dht.BeginStop(null);
304            }
305        }
306
307        #endregion
308
309        /// <summary>
310        /// Get PeerName of the actual peer
311        /// </summary>
312        /// <param name="sPeerName">out: additional peer information UserName on LinkManager</param>
313        /// <returns>PeerID as a String</returns>
314        public PeerId GetPeerID(out string sPeerName)
315        {
316            sPeerName = this.linkmanager.UserName;
317            return new PeerId(this.overlay.LocalAddress);
318        }
319
320        /// <summary>
321        /// Construct PeerId object for a specific byte[] id
322        /// </summary>
323        /// <param name="byteId">overlay address as byte array</param>
324        /// <returns>corresponding PeerId for given byte[] id</returns>
325        public PeerId GetPeerID(byte[] byteId)
326        {
327            LogToMonitor("GetPeerID: Converting byte[] to PeerId-Object");
328            return new PeerId(this.overlay.GetAddress(byteId));
329        }
330
331        // overlay.LocalAddress = Overlay-Peer-Address/Names
332        public void SendToPeer(byte[] data, byte[] destinationPeer)
333        {
334            // get stack size of the pap use-data and add own use data (for optimizing Stack size)
335            int realStackSize = this.overlay.GetHeaderSize() + data.Length;
336            ByteStack stackData = new ByteStack(realStackSize);
337
338            stackData.Push(data);
339
340            OverlayAddress destinationAddr = this.overlay.GetAddress(destinationPeer);
341            OverlayMessage overlayMsg = new OverlayMessage(MessageReceiverType.P2PBase,
342                this.overlay.LocalAddress, destinationAddr, stackData);
343            this.overlay.Send(overlayMsg);
344        }
345
346        private void overlay_MessageReceived(object sender, OverlayMessageEventArgs e)
347        {
348            if (OnP2PMessageReceived != null)
349            {
350                PeerId pid = new PeerId(e.Message.Source);
351                /* You have to fire this event asynchronous, because the main
352                 * thread will be stopped in this wrapper class for synchronizing
353                 * the asynchronous stuff (AutoResetEvent) --> so this could run
354                 * into a deadlock, when you fire this event synchronous (normal Invoke)
355                 * ATTENTION: This could change the invocation order!!! In my case
356                              no problem, but maybe in future cases... */
357
358                // TODO: not safe: The delegate must have only one target
359                //OnP2PMessageReceived.BeginInvoke(pid, e.Message.Data.PopBytes(e.Message.Data.CurrentStackSize), null, null);
360
361                foreach (Delegate del in OnP2PMessageReceived.GetInvocationList())
362                {
363                    del.DynamicInvoke(pid, e.Message.Data.PopBytes(e.Message.Data.CurrentStackSize));
364                }
365
366                //OnP2PMessageReceived(pid, e.Message.Data.PopUTF8String());
367            }
368        }
369
370        #region Event Handling (System Joined, Left and Message Received)
371
372        private void OnDHT_SystemJoined(object sender, EventArgs e)
373        {
374            if (OnSystemJoined != null)
375                OnSystemJoined();
376            this.systemJoined.Set();
377            Started = true;
378        }
379
380        private void OnDHT_SystemLeft(object sender, SystemLeftEventArgs e)
381        {
382            if (OnSystemLeft != null)
383                OnSystemLeft();
384            // as an experiment
385            this.dht = null;
386            this.systemLeft.Set();
387            Started = false;
388
389            LogToMonitor("OnDHT_SystemLeft has nulled the dht and setted the systemLeft Waithandle");
390        }
391
392        #endregion
393
394        /* Attention: The asynchronous methods are not tested at the moment */
395        #region Asynchronous Methods incl. Callbacks
396
397        /// <summary>
398        /// Asynchronously retrieving a key from the DHT. To get value, catch
399        /// event OnDhtLoad_Completed.
400        /// </summary>
401        /// <param name="sKey">Existing key in DHT</param>
402        public void AsynchRetrieve(string sKey)
403        {
404            Guid g = this.dht.Retrieve(OnAsynchRetrieve_Completed, sKey);
405        }
406        private void OnAsynchRetrieve_Completed(RetrieveResult rr)
407        {
408            if (OnDhtLoad_Completed != null)
409            {
410                OnDhtLoad_Completed(rr.Data);
411            }
412        }
413
414        /// <summary>
415        /// Asynchronously storing a Key-Value-Pair in the DHT. To ensure that
416        /// storing is completed, catch event OnDhtStore_Completed.
417        /// </summary>
418        /// <param name="sKey"></param>
419        /// <param name="sValue"></param>
420        public void AsynchStore(string sKey, string sValue)
421        {
422            //this.dht.Store(OnAsynchStore_Completed, sKey, UTF8Encoding.UTF8.GetBytes(sValue), IGNORE_DHT_VERSIONING_SYSTEM);
423            this.dht.Store(OnAsynchStore_Completed, sKey, UTF8Encoding.UTF8.GetBytes(sValue));
424        }
425
426        private void OnAsynchStore_Completed(StoreResult sr)
427        {
428            if (OnDhtStore_Completed != null)
429            {
430                if (sr.Status == OperationStatus.Success)
431                    OnDhtStore_Completed(true);
432                else
433                    OnDhtStore_Completed(false);
434            }
435
436        }
437
438        /// <summary>
439        /// Asynchronously removing an existing key out of the DHT. To ensure
440        /// that removing is completed, catch event OnDhtRemove_Completed.
441        /// </summary>
442        /// <param name="sKey"></param>
443        public void AsynchRemove(string sKey)
444        {
445            //this.dht.Remove(OnAsynchRemove_Completed, sKey, IGNORE_DHT_VERSIONING_SYSTEM);
446            this.dht.Remove(OnAsynchRemove_Completed, sKey);
447        }
448        private void OnAsynchRemove_Completed(RemoveResult rr)
449        {
450            if (OnDhtRemove_Completed != null)
451            {
452                if (rr.Status == OperationStatus.Success)
453                    OnDhtRemove_Completed(true);
454                else
455                    OnDhtRemove_Completed(false);
456            }
457        }
458
459        #endregion
460
461        #region Synchronous Methods incl. Callbacks
462
463        #region SynchStore incl.Callback and SecondTrialCallback
464
465        /// <summary>
466        /// Stores a value in the DHT at the given key
467        /// </summary>
468        /// <param name="sKey">Key of DHT Entry</param>
469        /// <param name="byteData">Value of DHT Entry</param>
470        /// <returns>True, when storing is completed!</returns>
471        public bool SynchStore(string sKey, byte[] byteData)
472        {
473            LogToMonitor("Begin: SynchStore. Key: " + sKey + ", Data: " + Encoding.UTF8.GetString(byteData));
474            AutoResetEvent are = new AutoResetEvent(false);
475            // this method returns always a GUID to distinguish between asynchronous actions
476            Guid g = this.dht.Store(OnSynchStoreCompleted, sKey, byteData);
477
478            ResponseWait rw = new ResponseWait() { WaitHandle = are, key = sKey, value = byteData };
479
480            waitDict.Add(g, rw);
481            //blocking till response
482            are.WaitOne();
483
484            LogToMonitor("End: SynchStore. Key: " + sKey + ". Success: " + rw.success.ToString());
485
486            return rw.success;
487        }
488
489        /// <summary>
490        /// Stores a value in the DHT at the given key
491        /// </summary>
492        /// <param name="sKey">Key of DHT Entry</param>
493        /// <param name="sValue">Value of DHT Entry</param>
494        /// <returns>True, when storing is completed!</returns>
495        public bool SynchStore(string sKey, string sData)
496        {
497            return SynchStore(sKey, UTF8Encoding.UTF8.GetBytes(sData));
498        }
499        /// <summary>
500        /// Callback for a the synchronized store method
501        /// </summary>
502        /// <param name="rr"></param>
503        private void OnSynchStoreCompleted(StoreResult sr)
504        {
505            ResponseWait rw;
506            if (this.waitDict.TryGetValue(sr.Guid, out rw))
507            {
508                // if Status == Error, than the version of the value is out of date.
509                // There is a versioning system in the DHT. So you must retrieve the
510                // key and than store the new value
511                if (sr.Status == OperationStatus.Failure)
512                {
513                    byte[] byteTemp = this.SynchRetrieve(rw.key);
514
515                    // Only try a second time. When it's still not possible, abort storing
516                    AutoResetEvent are = new AutoResetEvent(false);
517                    Guid g = this.dht.Store(OnSecondTrialStoring, rw.key, rw.value);
518                    ResponseWait rw2 = new ResponseWait() { WaitHandle = are, key = rw.key, value = rw.value };
519
520                    waitDict.Add(g, rw2);
521                    // blocking till response
522                    are.WaitOne();
523                    rw.success = rw2.success;
524                    rw.Message = rw2.Message;
525                }
526                else
527                {
528                    rw.Message = UTF8Encoding.UTF8.GetBytes(sr.Status.ToString());
529                    if (sr.Status == OperationStatus.KeyNotFound)
530                        rw.success = false;
531                    else
532                        rw.success = true;
533                }
534            }
535            //unblock WaitHandle in the synchronous method
536            rw.WaitHandle.Set();
537            // don't know if this accelerates the system...
538            this.waitDict.Remove(sr.Guid);
539        }
540
541        private void OnSecondTrialStoring(StoreResult sr)
542        {
543            ResponseWait rw;
544            if (this.waitDict.TryGetValue(sr.Guid, out rw))
545            {
546                if (sr.Status == OperationStatus.Failure)
547                {
548                    //Abort storing, because it's already the second trial
549                    rw.Message = UTF8Encoding.UTF8.GetBytes("Storing also not possible on second trial.");
550                    rw.success = false;
551                }
552                else
553                {
554                    //works the second trial, so it was the versioning system
555                    rw.success = true;
556                }
557            }
558            //unblock WaitHandle in the synchronous method
559            rw.WaitHandle.Set();
560            // don't know if this accelerates the system...
561            this.waitDict.Remove(sr.Guid);
562        }
563
564        #endregion
565
566        /// <summary>
567        /// Get the value of the given DHT Key or null, if it doesn't exist.
568        /// For synchronous environments use the Synch* methods.
569        /// </summary>
570        /// <param name="sKey">Key of DHT Entry</param>
571        /// <returns>Value of DHT Entry</returns>
572        public byte[] SynchRetrieve(string sKey)
573        {
574            LogToMonitor("ThreadId (P2PBase SynchRetrieve): " + Thread.CurrentThread.ManagedThreadId.ToString());
575
576            AutoResetEvent are = new AutoResetEvent(false);
577            // this method returns always a GUID to distinguish between asynchronous actions
578
579            LogToMonitor("Begin: SynchRetrieve. Key: " + sKey);
580
581            Guid g = this.dht.Retrieve(OnSynchRetrieveCompleted, sKey);
582
583            ResponseWait rw = new ResponseWait() { WaitHandle = are };
584
585            waitDict.Add(g, rw);
586            // blocking till response
587            are.WaitOne();
588
589            LogToMonitor("End: SynchRetrieve. Key: " + sKey + ". Success: " + rw.success.ToString());
590
591            //Rückgabe der Daten
592            return rw.Message;
593        }
594
595        /// <summary>
596        /// Callback for a the synchronized retrieval method
597        /// </summary>
598        /// <param name="rr"></param>
599        private void OnSynchRetrieveCompleted(RetrieveResult rr)
600        {
601            LogToMonitor(rr.Guid.ToString());
602
603            ResponseWait rw;
604
605            LogToMonitor("ThreadId (P2PBase retrieve callback): " + Thread.CurrentThread.ManagedThreadId.ToString());
606
607            if (this.waitDict.TryGetValue(rr.Guid, out rw))
608            {
609                // successful as long as no error occured
610                rw.success = true;
611                if (rr.Status == OperationStatus.Failure)
612                {
613                    rw.Message = null;
614                    rw.success = false;
615                }
616                else if (rr.Status == OperationStatus.KeyNotFound)
617                    rw.Message = null;
618                else
619                    rw.Message = rr.Data;
620
621                //unblock WaitHandle in the synchronous method
622                rw.WaitHandle.Set();
623                // don't know if this accelerates the system...
624                this.waitDict.Remove(rr.Guid);
625            }
626        }
627        /// <summary>
628        /// Removes a key/value pair out of the DHT
629        /// </summary>
630        /// <param name="sKey">Key of the DHT Entry</param>
631        /// <returns>True, when removing is completed!</returns>
632        public bool SynchRemove(string sKey)
633        {
634            LogToMonitor("Begin SynchRemove. Key: " + sKey);
635
636            AutoResetEvent are = new AutoResetEvent(false);
637            // this method returns always a GUID to distinguish between asynchronous actions
638            Guid g = this.dht.Remove(OnSynchRemoveCompleted, sKey);
639            //Guid g = this.dht.Remove(OnSynchRemoveCompleted, sKey, IGNORE_DHT_VERSIONING_SYSTEM);
640
641            ResponseWait rw = new ResponseWait() { WaitHandle = are };
642
643            waitDict.Add(g, rw);
644            // blocking till response
645            are.WaitOne();
646
647            LogToMonitor("Ended SynchRemove. Key: " + sKey + ". Success: " + rw.success.ToString());
648
649            return rw.success;
650        }
651
652        /// <summary>
653        /// Callback for a the synchronized remove method
654        /// </summary>
655        /// <param name="rr"></param>
656        private void OnSynchRemoveCompleted(RemoveResult rr)
657        {
658            ResponseWait rw;
659            if (this.waitDict.TryGetValue(rr.Guid, out rw))
660            {
661                rw.Message = UTF8Encoding.UTF8.GetBytes(rr.Status.ToString());
662
663                if (rr.Status == OperationStatus.Failure || rr.Status == OperationStatus.KeyNotFound)
664                    rw.success = false;
665                else
666                    rw.success = true;
667
668                //unblock WaitHandle in the synchronous method
669                rw.WaitHandle.Set();
670                // don't know if this accelerates the system...
671                this.waitDict.Remove(rr.Guid);
672            }
673        }
674
675        #endregion
676
677        /// <summary>
678        /// To log the internal state in the Monitoring Software of P@play
679        /// </summary>
680        public void LogInternalState()
681        {
682            if (this.dht != null)
683            {
684                this.dht.LogInternalState();
685            }
686        }
687
688        public void LogToMonitor(string sTextToLog)
689        {
690            if (AllowLoggingToMonitor)
691                Log.Debug(sTextToLog);
692        }
693
694    }
695}
Note: See TracBrowser for help on using the repository browser.