source: trunk/CrypP2P/Internal/P2PBase.cs @ 1477

Last change on this file since 1477 was 1477, checked in by Paul Lelgemann, 12 years ago

+ Added early version of P2P editor interface

override-bad-extension: CrypWin.exe
override-bad-extension: P2PEditor.dll

File size: 27.8 KB
Line 
1/* Copyright 2009 Team CrypTool (Christian Arnold), Uni Duisburg-Essen
2
3   Licensed under the Apache License, Version 2.0 (the "License");
4   you may not use this file except in compliance with the License.
5   You may obtain a copy of the License at
6
7       http://www.apache.org/licenses/LICENSE-2.0
8
9   Unless required by applicable law or agreed to in writing, software
10   distributed under the License is distributed on an "AS IS" BASIS,
11   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12   See the License for the specific language governing permissions and
13   limitations under the License.
14*/
15
16using System;
17using System.Collections.Generic;
18using System.Text;
19using PeersAtPlay.P2PStorage.DHT;
20using PeersAtPlay.P2PStorage.FullMeshDHT;
21using PeersAtPlay.P2POverlay.Bootstrapper;
22using PeersAtPlay.P2POverlay;
23using PeersAtPlay.P2POverlay.Bootstrapper.LocalMachineBootstrapper;
24using PeersAtPlay.P2POverlay.FullMeshOverlay;
25using PeersAtPlay.P2PLink;
26using PeersAtPlay.P2POverlay.Bootstrapper.IrcBootstrapper;
27using System.Threading;
28using PeersAtPlay;
29using PeersAtPlay.Util.Logging;
30using Gears4Net;
31using Cryptool.Plugins.PeerToPeer.Internal;
32using PeersAtPlay.Util.Threading;
33
34/* TODO:
35 * - Catch errors, which can occur when using the DHT (network-based errors)
36 */
37
38/* - Synchronous functions successfully tested (store, retrieve)
39 * - The DHT has an integrated versioning system. When a peer wants
40 *   to store data in an entry that already holds data, the entry's version
41 *   number is compared with the peer's version number. If the peer was
42 *   not the last one to read or write the entry, the store request
43 *   is rejected. You must first read the current data, and then you can
44 *   store your data in this entry.
45 *
46 * INFO:
47 * - The DHT's own versioning system is taken into account in the SynchStore
48 *   method. If this versioning system is ever abolished, the SynchStore
49 *   method must be changed!
50 * - Everything switched to SnalNG, SimpleSnal isn't used anymore, because
51 *   certification stuff runs now
52 *
53 * TODO:
54 * - Delete UseNatTraversal-Flag and insert CertificateCheck and CertificateSetup
55 * - Testing asynchronous methods incl. EventHandlers
56 */
57namespace Cryptool.P2P.Internal
58{
59    /* Advantages of this wrapper class:
60     * - The PeerAtPlay-Libraries are only referenced in this project
61     *   --> so they're easy to update
62     * - PeerAtPlay only works with asynchronous methods, so this class
63     *   "synchronizes" this methods.
64     * - The PeerToPeer-Layers are unimportant for CT2-Developers, so this
65     *   issue is obfuscated by this wrapper class
66     */
67    /// <summary>
68    /// Wrapper class to integrate peer@play environment into CrypTool.
69    /// This class synchronizes asynchronous methods for easier usage in CT2.
70    /// </summary>
71    public class P2PBase
72    {
73        #region Delegates and Events for asynchronous p2p functions
74
75        public delegate void SystemJoined();
76        public event SystemJoined OnSystemJoined;
77
78        public delegate void SystemLeft();
79        public event SystemLeft OnSystemLeft;
80
81        public delegate void P2PMessageReceived(PeerId sourceAddr, byte[] data);
82        public event P2PMessageReceived OnP2PMessageReceived;
83
84        /// <summary>
85        /// returns true if key-value-pair is successfully stored in the DHT
86        /// </summary>
87        /// <param name="result"></param>
88        public delegate void DHTStoreCompleted(bool result);
89        public event DHTStoreCompleted OnDhtStore_Completed;
90
91        public delegate void DHTLoadCompleted(byte[] loadedData);
92        public event DHTLoadCompleted OnDhtLoad_Completed;
93
94        /// <summary>
95        /// returns true if key was found and removed successfully from the DHT
96        /// </summary>
97        /// <param name="result"></param>
98        public delegate void DHTRemoveCompleted(bool result);
99        public event DHTRemoveCompleted OnDhtRemove_Completed;
100
101        #endregion
102
103        #region Variables
104
105        private bool allowLoggingToMonitor;
106        /// <summary>
107        /// If true, all kinds of actions will be logged in the PeersAtPlay LogMonitor.
108        /// </summary>
109        public bool AllowLoggingToMonitor
110        {
111            get { return this.allowLoggingToMonitor; }
112            set { this.allowLoggingToMonitor = value; }
113        }
114
115        private const bool ALLOW_LOGGING_TO_MONITOR = true;
116
117        private bool started = false;
118        /// <summary>
119        /// True if system was successfully joined, false if system is COMPLETELY left
120        /// </summary>
121        public bool Started
122        {
123            get { return this.started; }
124            private set { this.started = value; }
125        }
126
127        private IDHT dht;
128        private IP2PLinkManager linkmanager;
129        private IBootstrapper bootstrapper;
130        private P2POverlay overlay;
131        private AutoResetEvent systemJoined;
132        private AutoResetEvent systemLeft;
133
134        /// <summary>
135        /// Dictionary for synchronizing asynchronous DHT retrieves.
136        /// Cryptool doesn't offers an asynchronous environment, so this workaround is necessary
137        /// </summary>
138        private Dictionary<Guid, ResponseWait> waitDict;
139
140        #endregion
141
/// <summary>
/// Creates the wrapper in its idle state: the joined/left signals are
/// unsignalled and the table of pending synchronous DHT requests is empty.
/// No peer@play component is created here; that happens in Initialize.
/// </summary>
public P2PBase()
{
    // Wait handles released by OnDHT_SystemLeft / OnDHT_SystemJoined.
    systemLeft = new AutoResetEvent(false);
    systemJoined = new AutoResetEvent(false);

    // Pending synchronous requests, keyed by the GUID the DHT hands out.
    waitDict = new Dictionary<Guid, ResponseWait>();
}
148
149        #region Basic P2P Methods (Init, Start, Stop) - synch and asynch
150
/// <summary>
/// Convenience overload: reads peer name, world name and the four component
/// type selectors from the settings object and forwards them to the
/// six-argument Initialize overload.
/// </summary>
/// <param name="p2PSettings">Settings object providing the names and type selectors.</param>
public void Initialize(P2PSettings p2PSettings)
{
    // Read the values in the same order the forwarding call lists them.
    string peerName = p2PSettings.PeerName;
    string worldName = p2PSettings.WorldName;
    P2PLinkManagerType linkKind = (P2PLinkManagerType)p2PSettings.LinkManagerType;
    P2PBootstrapperType bootstrapperKind = (P2PBootstrapperType)p2PSettings.BsType;
    P2POverlayType overlayKind = (P2POverlayType)p2PSettings.OverlayType;
    P2PDHTType dhtKind = (P2PDHTType)p2PSettings.DhtType;

    Initialize(peerName, worldName, linkKind, bootstrapperKind, overlayKind, dhtKind);
}
157
/// <summary>
/// Initializing is the first step to build a new or access an existing p2p network.
/// Creates the link manager, bootstrapper, overlay and DHT for the requested
/// types (all sharing one scheduler), subscribes this wrapper to the overlay's
/// message event and the DHT's joined/left events, and initializes the DHT.
/// </summary>
/// <param name="sUserName">Choose an individual name for the user</param>
/// <param name="sWorldName">fundamental: two peers are only in the SAME
/// P2P system, when they initialized the SAME WORLD!</param>
/// <param name="linkManagerType">Link manager to use; only Snal is handled.</param>
/// <param name="bsType">Bootstrapper to use; LocalMachineBootstrapper (runs only
/// on one machine) and IrcBootstrapper are handled.</param>
/// <param name="overlayType">Overlay to use; only FullMeshOverlay is handled.</param>
/// <param name="dhtType">DHT to use; only FullMeshDHT is handled.</param>
/// <exception cref="NotImplementedException">One of the type selectors has a
/// value this method does not handle.</exception>
public void Initialize(string sUserName, string sWorldName, P2PLinkManagerType linkManagerType, P2PBootstrapperType bsType, P2POverlayType overlayType, P2PDHTType dhtType)
{
    // One scheduler instance is handed to every component created below.
    Scheduler scheduler = new STAScheduler("pap");

    switch (linkManagerType)
    {
        case P2PLinkManagerType.Snal:
            LogToMonitor("Init LinkMgr: Using NAT Traversal stuff");
            // NAT-Traversal stuff needs a different Snal-Version
            this.linkmanager = new PeersAtPlay.P2PLink.SnalNG.Snal(scheduler);

            // Start from the library defaults, then override individual values.
            PeersAtPlay.P2PLink.SnalNG.Settings settings = new PeersAtPlay.P2PLink.SnalNG.Settings();
            settings.LoadDefaults();
            settings.ConnectInternal = true;
            settings.LocalReceivingPort = 0;
            settings.UseLocalAddressDetection = false;
            settings.AutoReconnect = false;
            settings.NoDelay = false;
            settings.ReuseAddress = false;
            settings.UseNetworkMonitorServer = true;

            this.linkmanager.Settings = settings;
            this.linkmanager.ApplicationType = PeersAtPlay.Monitoring.ApplicationType.CrypTool;
            break;
        default:
            throw new NotImplementedException("Link manager type is not supported: " + linkManagerType);
    }

    switch (bsType)
    {
        case P2PBootstrapperType.LocalMachineBootstrapper:
            // LocalMachineBootstrapper = only local connection (runs only on one machine)
            this.bootstrapper = new LocalMachineBootstrapper();
            break;
        case P2PBootstrapperType.IrcBootstrapper:
            // setup nat traversal stuff
            LogToMonitor("Init Bootstrapper: Using NAT Traversal stuff");
            // These are static settings of the IRC bootstrapper library.
            PeersAtPlay.P2POverlay.Bootstrapper.IrcBootstrapper.Settings.DelaySymmetricResponse = true;
            PeersAtPlay.P2POverlay.Bootstrapper.IrcBootstrapper.Settings.IncludeSymmetricInResponse = false;
            PeersAtPlay.P2POverlay.Bootstrapper.IrcBootstrapper.Settings.SymmetricResponseDelay = 6000;

            this.bootstrapper = new IrcBootstrapper(scheduler);
            break;
        default:
            throw new NotImplementedException("Bootstrapper type is not supported: " + bsType);
    }

    switch (overlayType)
    {
        case P2POverlayType.FullMeshOverlay:
            // changing overlay example: this.overlay = new ChordOverlay();
            this.overlay = new FullMeshOverlay(scheduler);
            break;
        default:
            throw new NotImplementedException("Overlay type is not supported: " + overlayType);
    }

    switch (dhtType)
    {
        case P2PDHTType.FullMeshDHT:
            this.dht = new FullMeshDHT(scheduler);
            break;
        default:
            throw new NotImplementedException("DHT type is not supported: " + dhtType);
    }

    // Forward overlay messages and DHT join/leave notifications to this wrapper.
    this.overlay.MessageReceived += new EventHandler<OverlayMessageEventArgs>(overlay_MessageReceived);
    this.dht.SystemJoined += new EventHandler(OnDHT_SystemJoined);
    this.dht.SystemLeft += new EventHandler<SystemLeftEventArgs>(OnDHT_SystemLeft);

    this.dht.Initialize(sUserName, "", sWorldName, this.overlay, this.bootstrapper, this.linkmanager, null);
}
244
/// <summary>
/// Starts the P2P system and blocks the calling thread until the DHT has
/// raised its SystemJoined event (handled by OnDHT_SystemJoined, which
/// releases the wait handle used here). According to the original notes
/// joining may take one or two minutes.
/// </summary>
/// <returns>Always true; the method only returns once the join was signalled.</returns>
public bool SynchStart()
{
    // "Started" is not the same as "joined": the start call is asynchronous
    // and no callback is passed because it would fire before the join is complete.
    dht.BeginStart(null);

    // Released by OnDHT_SystemJoined.
    systemJoined.WaitOne();

    return true;
}
263
/// <summary>
/// Disjoins the peer from the system and blocks until the DHT has raised its
/// SystemLeft event. The P2P system survives while one peer is still in the network.
/// Does nothing when no DHT instance is present.
/// </summary>
/// <returns>Always true; when a DHT was present the method returns only after
/// the leave was signalled.</returns>
public bool SynchStop()
{
    // Read the field once: OnDHT_SystemLeft sets it to null, so checking the
    // field and then dereferencing it again could hit a null reference.
    IDHT current = this.dht;
    if (current == null)
    {
        return true;
    }

    // Only the DHT is stopped; the original author warns against stopping anything else.
    current.BeginStop(null);

    // wait till the systemLeft event is signalled by OnDHT_SystemLeft
    this.systemLeft.WaitOne();
    return true;
}
280
/// <summary>
/// Starts the peer without blocking. Subscribe to OnSystemJoined to learn
/// when the peer has actually joined the P2P world.
/// </summary>
public void AsynchStart()
{
    // Starting and joining are different things, so no callback is passed;
    // the join itself is reported through OnDHT_SystemJoined.
    dht.BeginStart(null);
}
293
/// <summary>
/// Asynchronously disjoins the current peer from the p2p system. Subscribe to
/// OnSystemLeft to learn when the peer has actually left. Does nothing when no
/// DHT instance is present.
/// </summary>
public void AsynchStop()
{
    // Read the field once: OnDHT_SystemLeft sets it to null, so a second read
    // after the null check could fail.
    IDHT current = this.dht;
    if (current != null)
    {
        // No callback needed; everything else is done by OnDHT_SystemLeft.
        current.BeginStop(null);
    }
}
307
308        #endregion
309
/// <summary>
/// Returns the identity of the local peer.
/// </summary>
/// <param name="sPeerName">out: the UserName reported by the link manager</param>
/// <returns>A PeerId built from the overlay's local address</returns>
public PeerId GetPeerID(out string sPeerName)
{
    sPeerName = linkmanager.UserName;

    var localAddress = overlay.LocalAddress;
    return new PeerId(localAddress);
}
320
/// <summary>
/// Builds a PeerId from a raw overlay address.
/// </summary>
/// <param name="byteId">overlay address as byte array</param>
/// <returns>PeerId wrapping the overlay address resolved from <paramref name="byteId"/></returns>
public PeerId GetPeerID(byte[] byteId)
{
    LogToMonitor("GetPeerID: Converting byte[] to PeerId-Object");

    // The overlay turns the raw bytes into its own address representation.
    var address = overlay.GetAddress(byteId);
    return new PeerId(address);
}
331
/// <summary>
/// Sends a byte payload to another peer through the overlay.
/// The overlay's local address is used as the sender address.
/// </summary>
/// <param name="data">Payload to send</param>
/// <param name="destinationPeer">Overlay address of the receiver as byte array</param>
public void SendToPeer(byte[] data, byte[] destinationPeer)
{
    // Size the stack for the overlay's header plus our own payload
    // (the original notes this as a stack-size optimization).
    ByteStack payload = new ByteStack(overlay.GetHeaderSize() + data.Length);
    payload.Push(data);

    OverlayAddress receiver = overlay.GetAddress(destinationPeer);
    OverlayMessage message = new OverlayMessage(
        MessageReceiverType.P2PBase,
        overlay.LocalAddress,
        receiver,
        payload);

    overlay.Send(message);
}
346
/// <summary>
/// Handles the overlay's MessageReceived event and forwards the sender and the
/// message payload to all OnP2PMessageReceived subscribers.
/// </summary>
/// <param name="sender">Event source (unused)</param>
/// <param name="e">Event data carrying the received overlay message</param>
private void overlay_MessageReceived(object sender, OverlayMessageEventArgs e)
{
    // Snapshot the delegate so an unsubscribe between the null check and the
    // invocation cannot cause a NullReferenceException.
    P2PMessageReceived handler = OnP2PMessageReceived;
    if (handler == null)
    {
        return;
    }

    PeerId pid = new PeerId(e.Message.Source);

    // Take the payload off the message stack exactly ONCE. The previous code
    // popped inside the per-subscriber loop, so (assuming PopBytes consumes
    // the stack, as the name suggests) only the first subscriber received the
    // data. All subscribers now get the same array instance.
    byte[] payload = e.Message.Data.PopBytes(e.Message.Data.CurrentStackSize);

    // Subscribers are invoked synchronously, in subscription order, on the
    // thread that raised the overlay event. The original author warns that a
    // subscriber which blocks on one of the Synch* methods may deadlock.
    // Invoking the typed delegate (instead of reflection-based DynamicInvoke)
    // lets a subscriber's exception surface unwrapped.
    handler(pid, payload);
}
370
371        #region Event Handling (System Joined, Left and Message Received)
372
/// <summary>
/// Handles the DHT's SystemJoined event: notifies OnSystemJoined subscribers,
/// marks the wrapper as started and releases a thread waiting in SynchStart.
/// </summary>
/// <param name="sender">Event source (unused)</param>
/// <param name="e">Event data (unused)</param>
private void OnDHT_SystemJoined(object sender, EventArgs e)
{
    // Snapshot the delegate so a concurrent unsubscribe cannot null it
    // between the check and the call.
    SystemJoined handler = OnSystemJoined;
    try
    {
        if (handler != null)
        {
            handler();
        }
    }
    finally
    {
        // Publish the state BEFORE waking the waiter, so code running right
        // after SynchStart() returns already sees Started == true. Doing this
        // in finally keeps SynchStart from blocking forever when a subscriber throws.
        Started = true;
        this.systemJoined.Set();
    }
}
380
/// <summary>
/// Handles the DHT's SystemLeft event: notifies OnSystemLeft subscribers,
/// drops the DHT reference, marks the wrapper as not started and releases a
/// thread waiting in SynchStop.
/// </summary>
/// <param name="sender">Event source (unused)</param>
/// <param name="e">Event data (unused)</param>
private void OnDHT_SystemLeft(object sender, SystemLeftEventArgs e)
{
    // Snapshot the delegate so a concurrent unsubscribe cannot null it
    // between the check and the call.
    SystemLeft handler = OnSystemLeft;
    try
    {
        if (handler != null)
        {
            handler();
        }
    }
    finally
    {
        // Nulling the DHT was introduced "as an experiment" by the original author.
        this.dht = null;

        // Publish the state BEFORE waking the waiter, so code running right
        // after SynchStop() returns already sees Started == false. Doing this
        // in finally keeps SynchStop from blocking forever when a subscriber throws.
        Started = false;
        this.systemLeft.Set();
    }

    LogToMonitor("OnDHT_SystemLeft has nulled the dht and setted the systemLeft Waithandle");
}
392
393        #endregion
394
395        /* Attention: The asynchronous methods are not tested at the moment */
396        #region Asynchronous Methods incl. Callbacks
397
/// <summary>
/// Requests the value stored under a DHT key without blocking. The result is
/// delivered through the OnDhtLoad_Completed event.
/// </summary>
/// <param name="sKey">Key to look up in the DHT</param>
public void AsynchRetrieve(string sKey)
{
    // The request GUID returned by the DHT is not needed on this path.
    dht.Retrieve(OnAsynchRetrieve_Completed, sKey);
}
/// <summary>
/// DHT callback for AsynchRetrieve: passes the retrieved data on to the
/// OnDhtLoad_Completed subscribers.
/// </summary>
/// <param name="rr">Result object delivered by the DHT</param>
private void OnAsynchRetrieve_Completed(RetrieveResult rr)
{
    // Snapshot the delegate so a concurrent unsubscribe cannot null it
    // between the check and the call.
    DHTLoadCompleted handler = OnDhtLoad_Completed;
    if (handler != null)
    {
        handler(rr.Data);
    }
}
414
/// <summary>
/// Stores a key/value pair in the DHT without blocking. The value is sent as
/// its UTF-8 bytes; the outcome is delivered through the OnDhtStore_Completed event.
/// </summary>
/// <param name="sKey">Key of the DHT entry</param>
/// <param name="sValue">Text to store under the key</param>
public void AsynchStore(string sKey, string sValue)
{
    byte[] encodedValue = Encoding.UTF8.GetBytes(sValue);
    dht.Store(OnAsynchStore_Completed, sKey, encodedValue);
}
426
/// <summary>
/// DHT callback for AsynchStore: reports to the OnDhtStore_Completed
/// subscribers whether the store finished with status Success.
/// </summary>
/// <param name="sr">Result object delivered by the DHT</param>
private void OnAsynchStore_Completed(StoreResult sr)
{
    // Snapshot the delegate so a concurrent unsubscribe cannot null it
    // between the check and the call.
    DHTStoreCompleted handler = OnDhtStore_Completed;
    if (handler != null)
    {
        handler(sr.Status == OperationStatus.Success);
    }
}
438
/// <summary>
/// Removes a key from the DHT without blocking. The outcome is delivered
/// through the OnDhtRemove_Completed event.
/// </summary>
/// <param name="sKey">Key of the DHT entry to remove</param>
public void AsynchRemove(string sKey)
{
    // The request GUID returned by the DHT is not needed on this path.
    dht.Remove(OnAsynchRemove_Completed, sKey);
}
/// <summary>
/// DHT callback for AsynchRemove: reports to the OnDhtRemove_Completed
/// subscribers whether the removal finished with status Success.
/// </summary>
/// <param name="rr">Result object delivered by the DHT</param>
private void OnAsynchRemove_Completed(RemoveResult rr)
{
    // Snapshot the delegate so a concurrent unsubscribe cannot null it
    // between the check and the call.
    DHTRemoveCompleted handler = OnDhtRemove_Completed;
    if (handler != null)
    {
        handler(rr.Status == OperationStatus.Success);
    }
}
459
460        #endregion
461
462        #region Synchronous Methods incl. Callbacks
463
464        #region SynchStore incl.Callback and SecondTrialCallback
465
/// <summary>
/// Stores a value in the DHT at the given key and blocks until the DHT has
/// reported the outcome. When the first attempt ends with status Failure, the
/// key is read once and the store is retried a single time (see OnSynchStoreCompleted).
/// </summary>
/// <param name="sKey">Key of DHT Entry</param>
/// <param name="byteData">Value of DHT Entry</param>
/// <returns>True when the store (or its single retry) succeeded, false otherwise.</returns>
/// <exception cref="ArgumentNullException"><paramref name="byteData"/> is null.</exception>
public bool SynchStore(string sKey, byte[] byteData)
{
    if (byteData == null)
    {
        throw new ArgumentNullException("byteData");
    }

    // Only decode the payload for the log line when logging is actually enabled.
    if (AllowLoggingToMonitor)
    {
        LogToMonitor("Begin: SynchStore. Key: " + sKey + ", " + byteData.Length + " bytes of data: " + Encoding.UTF8.GetString(byteData) + " [" + byteData.Length + " bytes]");
    }

    AutoResetEvent are = new AutoResetEvent(false);
    ResponseWait rw = new ResponseWait() { WaitHandle = are, key = sKey, value = byteData };

    // The wait state is bound to the callback directly instead of being looked
    // up by GUID afterwards. Registering it in a dictionary only AFTER Store()
    // returned left a window in which the callback could run first, fail to
    // find the entry and leave this thread blocked forever; it also had two
    // threads mutating an unsynchronized Dictionary.
    this.dht.Store(delegate(StoreResult sr) { OnSynchStoreCompleted(sr, rw); }, sKey, byteData);

    // blocking till response
    are.WaitOne();

    LogToMonitor("End: SynchStore. Key: " + sKey + ". Success: " + rw.success.ToString());

    return rw.success;
}

/// <summary>
/// Stores a text value (as UTF-8 bytes) in the DHT at the given key and blocks
/// until the DHT has reported the outcome.
/// </summary>
/// <param name="sKey">Key of DHT Entry</param>
/// <param name="sData">Text value of DHT Entry</param>
/// <returns>True when the store succeeded, false otherwise.</returns>
public bool SynchStore(string sKey, string sData)
{
    return SynchStore(sKey, UTF8Encoding.UTF8.GetBytes(sData));
}

/// <summary>
/// Callback for the first attempt of the synchronized store method.
/// </summary>
/// <param name="sr">Result object delivered by the DHT</param>
/// <param name="rw">Wait state of the blocked SynchStore call</param>
private void OnSynchStoreCompleted(StoreResult sr, ResponseWait rw)
{
    try
    {
        if (sr.Status == OperationStatus.Failure)
        {
            // Per the original notes a Failure means the DHT's versioning
            // system rejected the write: the entry has to be read before it
            // may be stored again. The retrieved data itself is not needed.
            // NOTE(review): this blocks inside a DHT callback; confirm the DHT
            // can still deliver the nested retrieve/store callbacks meanwhile.
            this.SynchRetrieve(rw.key);

            // Only try a second time. When it's still not possible, abort storing
            AutoResetEvent retryDone = new AutoResetEvent(false);
            ResponseWait retry = new ResponseWait() { WaitHandle = retryDone, key = rw.key, value = rw.value };
            this.dht.Store(delegate(StoreResult second) { OnSecondTrialStoring(second, retry); }, rw.key, rw.value);

            // blocking till response
            retryDone.WaitOne();
            rw.success = retry.success;
            rw.Message = retry.Message;
        }
        else
        {
            rw.Message = UTF8Encoding.UTF8.GetBytes(sr.Status.ToString());
            rw.success = sr.Status != OperationStatus.KeyNotFound;
        }
    }
    finally
    {
        // Always unblock the synchronous caller, even when the retry path threw.
        rw.WaitHandle.Set();
    }
}

/// <summary>
/// Callback for the second (and last) store attempt.
/// </summary>
/// <param name="sr">Result object delivered by the DHT</param>
/// <param name="rw">Wait state of the retry started in OnSynchStoreCompleted</param>
private void OnSecondTrialStoring(StoreResult sr, ResponseWait rw)
{
    try
    {
        if (sr.Status == OperationStatus.Failure)
        {
            // Abort storing, because it's already the second trial
            rw.Message = UTF8Encoding.UTF8.GetBytes("Storing also not possible on second trial.");
            rw.success = false;
        }
        else
        {
            // works the second trial, so it was the versioning system
            rw.success = true;
        }
    }
    finally
    {
        // unblock the thread waiting in OnSynchStoreCompleted
        rw.WaitHandle.Set();
    }
}
564
565        #endregion
566
/// <summary>
/// Gets the value of the given DHT key and blocks until the DHT has answered.
/// </summary>
/// <param name="sKey">Key of DHT Entry</param>
/// <returns>Value of the DHT entry, or null when the key was not found or the
/// retrieve ended with status Failure.</returns>
public byte[] SynchRetrieve(string sKey)
{
    LogToMonitor("ThreadId (P2PBase SynchRetrieve): " + Thread.CurrentThread.ManagedThreadId.ToString());

    AutoResetEvent are = new AutoResetEvent(false);
    ResponseWait rw = new ResponseWait() { WaitHandle = are };

    LogToMonitor("Begin: SynchRetrieve. Key: " + sKey);

    // The wait state is bound to the callback directly instead of being looked
    // up by GUID afterwards. Registering it in a dictionary only AFTER
    // Retrieve() returned left a window in which the callback could run first,
    // not find the entry and leave this thread blocked forever; it also had two
    // threads mutating an unsynchronized Dictionary.
    this.dht.Retrieve(delegate(RetrieveResult rr) { OnSynchRetrieveCompleted(rr, rw); }, sKey);

    // blocking till response
    are.WaitOne();

    LogToMonitor("End: SynchRetrieve. Key: " + sKey + ". Success: " + rw.success.ToString());

    // return the retrieved data
    return rw.Message;
}

/// <summary>
/// Callback for the synchronized retrieval method.
/// </summary>
/// <param name="rr">Result object delivered by the DHT</param>
/// <param name="rw">Wait state of the blocked SynchRetrieve call</param>
private void OnSynchRetrieveCompleted(RetrieveResult rr, ResponseWait rw)
{
    try
    {
        LogToMonitor(rr.Guid.ToString());
        LogToMonitor("ThreadId (P2PBase retrieve callback): " + Thread.CurrentThread.ManagedThreadId.ToString());

        if (rr.Status == OperationStatus.Failure)
        {
            rw.Message = null;
            rw.success = false;
        }
        else if (rr.Status == OperationStatus.KeyNotFound)
        {
            // A missing key is not an error: successful, but no data.
            rw.Message = null;
            rw.success = true;
        }
        else
        {
            rw.Message = rr.Data;
            rw.success = true;
        }
    }
    finally
    {
        // unblock WaitHandle in the synchronous method
        rw.WaitHandle.Set();
    }
}
/// <summary>
/// Removes a key/value pair out of the DHT and blocks until the DHT has
/// reported the outcome.
/// </summary>
/// <param name="sKey">Key of the DHT Entry</param>
/// <returns>True when the removal succeeded; false when the DHT reported
/// Failure or KeyNotFound.</returns>
public bool SynchRemove(string sKey)
{
    LogToMonitor("Begin SynchRemove. Key: " + sKey);

    AutoResetEvent are = new AutoResetEvent(false);
    ResponseWait rw = new ResponseWait() { WaitHandle = are };

    // The wait state is bound to the callback directly instead of being looked
    // up by GUID afterwards. Registering it in a dictionary only AFTER Remove()
    // returned left a window in which the callback could run first, not find
    // the entry and leave this thread blocked forever; it also had two threads
    // mutating an unsynchronized Dictionary.
    this.dht.Remove(delegate(RemoveResult rr) { OnSynchRemoveCompleted(rr, rw); }, sKey);

    // blocking till response
    are.WaitOne();

    LogToMonitor("Ended SynchRemove. Key: " + sKey + ". Success: " + rw.success.ToString());

    return rw.success;
}

/// <summary>
/// Callback for the synchronized remove method.
/// </summary>
/// <param name="rr">Result object delivered by the DHT</param>
/// <param name="rw">Wait state of the blocked SynchRemove call</param>
private void OnSynchRemoveCompleted(RemoveResult rr, ResponseWait rw)
{
    try
    {
        // The status name is kept as the message of the wait state.
        rw.Message = UTF8Encoding.UTF8.GetBytes(rr.Status.ToString());

        bool failed = rr.Status == OperationStatus.Failure || rr.Status == OperationStatus.KeyNotFound;
        rw.success = !failed;
    }
    finally
    {
        // unblock WaitHandle in the synchronous method
        rw.WaitHandle.Set();
    }
}
675
676        #endregion
677
/// <summary>
/// Asks the DHT to log its internal state (for the monitoring software of
/// P@play). Does nothing when no DHT instance is present.
/// </summary>
public void LogInternalState()
{
    // Read the field once: OnDHT_SystemLeft sets it to null, so a second read
    // after the null check could fail.
    IDHT current = this.dht;
    if (current != null)
    {
        current.LogInternalState();
    }
}
688
/// <summary>
/// Writes a debug entry to the log, but only while AllowLoggingToMonitor is true.
/// </summary>
/// <param name="sTextToLog">Text of the log entry</param>
public void LogToMonitor(string sTextToLog)
{
    if (!AllowLoggingToMonitor)
    {
        return;
    }

    Log.Debug(sTextToLog);
}
694
695    }
696}
Note: See TracBrowser for help on using the repository browser.