source: trunk/CrypPlugins/PeerToPeerBase/PeerToPeerBase.cs @ 1206

Last change on this file since 1206 was 1206, checked in by Arno Wacker, 12 years ago

P2PBase:

  • Added some P2P settings
  • Minor performance improvement for P2PBase (using only a single scheduler)
File size: 30.0 KB
Line 
1/* Copyright 2009 Team CrypTool (Christian Arnold), Uni Duisburg-Essen
2
3   Licensed under the Apache License, Version 2.0 (the "License");
4   you may not use this file except in compliance with the License.
5   You may obtain a copy of the License at
6
7       http://www.apache.org/licenses/LICENSE-2.0
8
9   Unless required by applicable law or agreed to in writing, software
10   distributed under the License is distributed on an "AS IS" BASIS,
11   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12   See the License for the specific language governing permissions and
13   limitations under the License.
14*/
15
16using System;
17using System.Collections.Generic;
18using System.Linq;
19using System.Text;
20using PeersAtPlay.P2PStorage.DHT;
21using PeersAtPlay.P2PStorage.FullMeshDHT;
22using PeersAtPlay.P2POverlay.Bootstrapper;
23using PeersAtPlay.P2POverlay;
24using PeersAtPlay.P2POverlay.Bootstrapper.LocalMachineBootstrapper;
25using PeersAtPlay.P2POverlay.FullMeshOverlay;
26using PeersAtPlay.P2PLink;
27using PeersAtPlay.P2POverlay.Bootstrapper.IrcBootstrapper;
28using System.Threading;
29using Cryptool.PluginBase.Control;
30using System.ComponentModel;
31using PeersAtPlay;
32using PeersAtPlay.Util.Logging;
33using Gears4Net;
34
35/* TODO:
36 * - Catch errors, which can occur when using the DHT (network-based errors)
37 */
38
39/* - Synchronous functions successfully tested (store, retrieve)
40 * - The DHT has an integrated versioning system. When a peer wants
41 *   to store data in an entry, which already holds data, the version
42 *   number will be compared with the peers' version number. If the
43 *   peer hasn't read/write the entry the last time, the storing instruction
44 *   will be rejected. You must first read the actual data and than you can
45 *   store your data in this entry...
46 *
47 * INFO:
48 * - Have considered the DHT-own versioning system in the SynchStore method.
49 *   If this versioning system will be abolished, the SynchStore method must
50 *   be change!
51 * - Everything switched to SnalNG, SimpleSnal isn't used anymore, because
52 *   certification stuff runs now
53 *
54 * TODO:
55 * - Delete UseNatTraversal-Flag and insert CertificateCheck and CertificateSetup
56 * - Testing asynchronous methods incl. EventHandlers
57 */
58namespace Cryptool.Plugins.PeerToPeer
59{
60    /* Advantages of this wrapper class:
61     * - The PeerAtPlay-Libraries are only referenced in this project
62     *   --> so they're easy to update
63     * - PeerAtPlay only works with asynchronous methods, so this class
64     *   "synchronizes" this methods.
65     * - The PeerToPeer-Layers are unimportant for CT2-Developers, so this
66     *   issue is obfuscated by this wrapper class
67     */
68    /// <summary>
69    /// Wrapper class to integrate peer@play environment into CrypTool.
70    /// This class synchronizes asynchronous methods for easier usage in CT2. For future
71    /// </summary>
72    public class P2PBase
73    {
74        #region Delegates and Events for asynchronous p2p functions
75
76        public delegate void SystemJoined();
77        public event SystemJoined OnSystemJoined;
78
79        public delegate void SystemLeft();
80        public event SystemLeft OnSystemLeft;
81
82        public delegate void P2PMessageReceived(PeerId sourceAddr, byte[] data);
83        public event P2PMessageReceived OnP2PMessageReceived;
84
85        /// <summary>
86        /// returns true if key-value-pair is successfully stored in the DHT
87        /// </summary>
88        /// <param name="result"></param>
89        public delegate void DHTStoreCompleted(bool result);
90        public event DHTStoreCompleted OnDhtStore_Completed;
91
92        public delegate void DHTLoadCompleted(byte[] loadedData);
93        public event DHTLoadCompleted OnDhtLoad_Completed;
94
95        /// <summary>
96        /// returns true if key was found and removed successfully from the DHT
97        /// </summary>
98        /// <param name="result"></param>
99        public delegate void DHTRemoveCompleted(bool result);
100        public event DHTRemoveCompleted OnDhtRemove_Completed;
101
102        #endregion
103
104        #region Variables
105
106        private bool allowLoggingToMonitor;
107        /// <summary>
108        /// If true, all kinds of actions will be logged in the PeersAtPlay LogMonitor.
109        /// </summary>
110        public bool AllowLoggingToMonitor
111        {
112            get { return this.allowLoggingToMonitor; }
113            set { this.allowLoggingToMonitor = value; }
114        }
115
116        private const bool ALLOW_LOGGING_TO_MONITOR = true;
117
118        private bool started = false;
119        /// <summary>
120        /// True if system was successfully joined, false if system is COMPLETELY left
121        /// </summary>
122        public bool Started
123        {
124            get { return this.started; }
125            private set { this.started = value; } 
126        }
127
128        private IDHT dht;
129        private IP2PLinkManager linkmanager;
130        private IBootstrapper bootstrapper;
131        private P2POverlay overlay;
132        private AutoResetEvent systemJoined;
133        private AutoResetEvent systemLeft;
134
135        /// <summary>
136        /// Dictionary for synchronizing asynchronous DHT retrieves.
137        /// Cryptool doesn't offers an asynchronous environment, so this workaround is necessary
138        /// </summary>
139        private Dictionary<Guid, ResponseWait> waitDict;
140
141        #endregion
142
143        public P2PBase()
144        {
145            this.waitDict = new Dictionary<Guid, ResponseWait>();
146            this.systemJoined = new AutoResetEvent(false);
147            this.systemLeft = new AutoResetEvent(false);
148        }
149
150        #region Basic P2P Methods (Init, Start, Stop) - synch and asynch
151
152        /// <summary>
153        /// Initializing is the first step to build a new or access an existing p2p network
154        /// </summary>
155        /// <param name="sUserName">Choose an individual name for the user</param>
156        /// <param name="sWorldName">fundamental: two peers are only in the SAME
157        /// P2P system, when they initialized the SAME WORLD!</param>
158        /// <param name="bolUseNatTraversal">When you want to use NAT-Traversal #
159        /// (tunneling the P2P connection through NATs and Firewalls), you have to
160        /// set this flag to true</param>
161        /// <param name="linkManagerType"></param>
162        /// <param name="bsType"></param>
163        /// <param name="overlayType"></param>
164        /// <param name="dhtType"></param>
165        public void Initialize(string sUserName, string sWorldName, P2PLinkManagerType linkManagerType, P2PBootstrapperType bsType, P2POverlayType overlayType, P2PDHTType dhtType)
166        {
167            #region Setting LinkManager, Bootstrapper, Overlay and DHT to the specified types
168
169            Scheduler scheduler = new STAScheduler("pap");
170
171            switch (linkManagerType)
172            {
173                case P2PLinkManagerType.Snal:
174                    LogToMonitor("Init LinkMgr: Using NAT Traversal stuff");
175                    // NAT-Traversal stuff needs a different Snal-Version
176                    this.linkmanager = new PeersAtPlay.P2PLink.SnalNG.Snal(scheduler);
177                    ((PeersAtPlay.P2PLink.SnalNG.Snal)this.linkmanager).Settings.ConnectInternal = true;
178                    ((PeersAtPlay.P2PLink.SnalNG.Snal)this.linkmanager).Settings.LocalReceivingPort = 0;
179                    ((PeersAtPlay.P2PLink.SnalNG.Snal)this.linkmanager).Settings.UseLocalAddressDetection = false;
180                    ((PeersAtPlay.P2PLink.SnalNG.Snal)this.linkmanager).Settings.AutoReconnect = false;
181                    ((PeersAtPlay.P2PLink.SnalNG.Snal)this.linkmanager).Settings.NoDelay = false;
182                    ((PeersAtPlay.P2PLink.SnalNG.Snal)this.linkmanager).Settings.ReuseAddress = false;
183                    ((PeersAtPlay.P2PLink.SnalNG.Snal)this.linkmanager).Settings.UseNetworkMonitorServer = true;
184                    break;
185                default:
186                    throw (new NotImplementedException());
187            }
188            switch (bsType)
189            {
190                case P2PBootstrapperType.LocalMachineBootstrapper:
191                    //LocalMachineBootstrapper = only local connection (runs only on one machine)
192                    this.bootstrapper = new LocalMachineBootstrapper();
193                    break;
194                case P2PBootstrapperType.IrcBootstrapper:
195                    // setup nat traversal stuff
196                    LogToMonitor("Init Bootstrapper: Using NAT Traversal stuff");
197                    PeersAtPlay.P2POverlay.Bootstrapper.IrcBootstrapper.Settings.DelaySymmetricResponse = true;
198                    PeersAtPlay.P2POverlay.Bootstrapper.IrcBootstrapper.Settings.IncludeSymmetricInResponse = false;
199                    PeersAtPlay.P2POverlay.Bootstrapper.IrcBootstrapper.Settings.SymmetricResponseDelay = 6000;
200
201                    this.bootstrapper = new IrcBootstrapper(scheduler);
202                    break;
203                default:
204                    throw (new NotImplementedException());
205            }
206            switch (overlayType)
207            {
208                case P2POverlayType.FullMeshOverlay:
209                    // changing overlay example: this.overlay = new ChordOverlay();
210                    this.overlay = new FullMeshOverlay(scheduler);
211                    break;
212                default:
213                    throw (new NotImplementedException());
214            }
215            switch (dhtType)
216            {
217                case P2PDHTType.FullMeshDHT:
218                    this.dht = new FullMeshDHT(scheduler);
219                    break;
220                default:
221                    throw (new NotImplementedException());
222            }
223            #endregion
224
225            this.overlay.MessageReceived += new EventHandler<OverlayMessageEventArgs>(overlay_MessageReceived);
226            this.dht.SystemJoined += new EventHandler(OnDHT_SystemJoined);
227            this.dht.SystemLeft += new EventHandler<SystemLeftEventArgs>(OnDHT_SystemLeft);
228
229            this.dht.Initialize(sUserName, "", sWorldName, this.overlay, this.bootstrapper, this.linkmanager, null);
230        }
231
232        /// <summary>
233        /// Starts the P2P System. When the given P2P world doesn't exist yet,
234        /// inclusive creating the and bootstrapping to the P2P network.
235        /// In either case joining the P2P world.
236        /// This synchronized method returns true not before the peer has
237        /// successfully joined the network (this may take one or two minutes).
238        /// </summary>
239        /// <returns>True, if the peer has completely joined the p2p network</returns>
240        public bool SynchStart()
241        {
242            //Start != system joined
243            //Only starts the system asynchronous, the possible callback is useless,
244            //because it's invoked before the peer completly joined the P2P system
245            this.dht.BeginStart(null);
246            //Wait for event SystemJoined. When it's invoked, the peer completly joined the P2P system
247            this.systemJoined.WaitOne();
248            return true;
249        }
250
251        /// <summary>
252        /// Disjoins the peer from the system. The P2P system survive while one peer is still in the network.
253        /// </summary>
254        /// <returns>True, if the peer has completely disjoined the p2p network</returns>
255        public bool SynchStop()
256        {
257            if (this.dht != null)
258            {
259                this.dht.BeginStop(null);
260                //don't stop anything else, because BOOM
261
262                //wait till systemLeft Event is invoked
263                this.systemLeft.WaitOne();
264            }
265            return true;
266        }
267
268        /// <summary>
269        /// Asynchronously starting the peer. When the given P2P world doesn't
270        /// exist yet, inclusive creating the and bootstrapping to the P2P network.
271        /// In either case joining the P2P world. To ensure that peer has successfully
272        /// joined the p2p world, catch the event OnSystemJoined.
273        /// </summary>
274        public void AsynchStart()
275        {
276            // no callback usefull, because starting and joining isn't the same
277            // everything else is done by the EventHandler OnDHT_SystemJoined
278            this.dht.BeginStart(null);
279        }
280
281        /// <summary>
282        /// Asynchronously disjoining the actual peer of the p2p system. To ensure
283        /// disjoining, catch the event OnDHT_SystemLeft.
284        /// </summary>
285        public void AsynchStop()
286        {
287            if (this.dht != null)
288            {
289                // no callback usefull.
290                // Everything else is done by the EventHandler OnDHT_SystemLeft
291                this.dht.BeginStop(null);
292            }
293        }
294
295        #endregion
296
297        /// <summary>
298        /// Get PeerName of the actual peer
299        /// </summary>
300        /// <param name="sPeerName">out: additional peer information UserName on LinkManager</param>
301        /// <returns>PeerID as a String</returns>
302        public PeerId GetPeerID(out string sPeerName)
303        {
304            sPeerName = this.linkmanager.UserName;
305            return new PeerId(this.overlay.LocalAddress);
306        }
307
308        /// <summary>
309        /// Construct PeerId object for a specific byte[] id
310        /// </summary>
311        /// <param name="byteId">overlay address as byte array</param>
312        /// <returns>corresponding PeerId for given byte[] id</returns>
313        public PeerId GetPeerID(byte[] byteId)
314        {
315            LogToMonitor("GetPeerID: Converting byte[] to PeerId-Object");
316            return new PeerId(this.overlay.GetAddress(byteId));
317        }
318
319        // overlay.LocalAddress = Overlay-Peer-Address/Names
320        public void SendToPeer(byte[] data, byte[] destinationPeer)
321        {
322            // get stack size of the pap use-data and add own use data (for optimizing Stack size)
323            int realStackSize = this.overlay.GetHeaderSize() + data.Length;
324            ByteStack stackData = new ByteStack(realStackSize);
325
326            stackData.Push(data);
327
328            OverlayAddress destinationAddr = this.overlay.GetAddress(destinationPeer);
329            OverlayMessage overlayMsg = new OverlayMessage(MessageReceiverType.P2PBase,
330                this.overlay.LocalAddress, destinationAddr, stackData);
331            this.overlay.Send(overlayMsg);
332        }
333
334        private void overlay_MessageReceived(object sender, OverlayMessageEventArgs e)
335        {
336            if (OnP2PMessageReceived != null)
337            {
338                PeerId pid = new PeerId(e.Message.Source);
339                /* You have to fire this event asynchronous, because the main
340                 * thread will be stopped in this wrapper class for synchronizing
341                 * the asynchronous stuff (AutoResetEvent) --> so this could run
342                 * into a deadlock, when you fire this event synchronous (normal Invoke)
343                 * ATTENTION: This could change the invocation order!!! In my case
344                              no problem, but maybe in future cases... */
345                OnP2PMessageReceived.BeginInvoke(pid, e.Message.Data.PopBytes(e.Message.Data.CurrentStackSize),null,null);
346                //OnP2PMessageReceived(pid, e.Message.Data.PopUTF8String());
347            }
348        }
349
350        #region Event Handling (System Joined, Left and Message Received)
351
352        private void OnDHT_SystemJoined(object sender, EventArgs e)
353        {
354            if (OnSystemJoined != null)
355                OnSystemJoined();
356            this.systemJoined.Set();
357            Started = true;
358        }
359
360        private void OnDHT_SystemLeft(object sender, SystemLeftEventArgs e)
361        {
362            if (OnSystemLeft != null)
363                OnSystemLeft();
364            // as an experiment
365            this.dht = null;
366            this.systemLeft.Set();
367            Started = false;
368
369            LogToMonitor("OnDHT_SystemLeft has nulled the dht and setted the systemLeft Waithandle");
370        }
371
372        #endregion
373
374        /* Attention: The asynchronous methods are not tested at the moment */
375        #region Asynchronous Methods incl. Callbacks
376
377        /// <summary>
378        /// Asynchronously retrieving a key from the DHT. To get value, catch
379        /// event OnDhtLoad_Completed.
380        /// </summary>
381        /// <param name="sKey">Existing key in DHT</param>
382        public void AsynchRetrieve(string sKey)
383        {
384            Guid g = this.dht.Retrieve(OnAsynchRetrieve_Completed, sKey);
385        }
386        private void OnAsynchRetrieve_Completed(RetrieveResult rr)
387        {
388            if (OnDhtLoad_Completed != null)
389            {
390                OnDhtLoad_Completed(rr.Data);
391            }
392        }
393
394        /// <summary>
395        /// Asynchronously storing a Key-Value-Pair in the DHT. To ensure that
396        /// storing is completed, catch event OnDhtStore_Completed.
397        /// </summary>
398        /// <param name="sKey"></param>
399        /// <param name="sValue"></param>
400        public void AsynchStore(string sKey, string sValue)
401        {
402            //this.dht.Store(OnAsynchStore_Completed, sKey, UTF8Encoding.UTF8.GetBytes(sValue), IGNORE_DHT_VERSIONING_SYSTEM);
403            this.dht.Store(OnAsynchStore_Completed, sKey, UTF8Encoding.UTF8.GetBytes(sValue));
404        }
405
406        private void OnAsynchStore_Completed(StoreResult sr)
407        {
408            if (OnDhtStore_Completed != null)
409            {
410                if (sr.Status == OperationStatus.Success)
411                    OnDhtStore_Completed(true);
412                else
413                    OnDhtStore_Completed(false);
414            }
415               
416        }
417
418        /// <summary>
419        /// Asynchronously removing an existing key out of the DHT. To ensure
420        /// that removing is completed, catch event OnDhtRemove_Completed.
421        /// </summary>
422        /// <param name="sKey"></param>
423        public void AsynchRemove(string sKey)
424        {
425            //this.dht.Remove(OnAsynchRemove_Completed, sKey, IGNORE_DHT_VERSIONING_SYSTEM);
426            this.dht.Remove(OnAsynchRemove_Completed, sKey);
427        }
428        private void OnAsynchRemove_Completed(RemoveResult rr)
429        {
430            if (OnDhtRemove_Completed != null)
431            {
432                if(rr.Status == OperationStatus.Success)
433                    OnDhtRemove_Completed(true);
434                else
435                    OnDhtRemove_Completed(false);
436            }
437        }
438
439        #endregion
440
441        #region Synchronous Methods incl. Callbacks
442
443        #region SynchStore incl.Callback and SecondTrialCallback
444
445        /// <summary>
446        /// Stores a value in the DHT at the given key
447        /// </summary>
448        /// <param name="sKey">Key of DHT Entry</param>
449        /// <param name="byteData">Value of DHT Entry</param>
450        /// <returns>True, when storing is completed!</returns>
451        public bool SynchStore(string sKey, byte[] byteData)
452        {
453            LogToMonitor("Begin: SynchStore. Key: " + sKey + ", Data: " + Encoding.UTF8.GetString(byteData));
454            AutoResetEvent are = new AutoResetEvent(false);
455            // this method returns always a GUID to distinguish between asynchronous actions
456            Guid g = this.dht.Store(OnSynchStoreCompleted, sKey, byteData);
457
458            ResponseWait rw = new ResponseWait() { WaitHandle = are, key=sKey , value = byteData };
459
460            waitDict.Add(g, rw);
461            //blocking till response
462            are.WaitOne();
463
464            LogToMonitor("End: SynchStore. Key: " + sKey + ". Success: " + rw.success.ToString());
465
466            return rw.success;
467        }
468
469        /// <summary>
470        /// Stores a value in the DHT at the given key
471        /// </summary>
472        /// <param name="sKey">Key of DHT Entry</param>
473        /// <param name="sValue">Value of DHT Entry</param>
474        /// <returns>True, when storing is completed!</returns>
475        public bool SynchStore(string sKey, string sData)
476        {
477            return SynchStore(sKey, UTF8Encoding.UTF8.GetBytes(sData));
478        }
479        /// <summary>
480        /// Callback for a the synchronized store method
481        /// </summary>
482        /// <param name="rr"></param>
483        private void OnSynchStoreCompleted(StoreResult sr)
484        {
485            ResponseWait rw;
486            if (this.waitDict.TryGetValue(sr.Guid, out rw))
487            {
488                // if Status == Error, than the version of the value is out of date.
489                // There is a versioning system in the DHT. So you must retrieve the
490                // key and than store the new value
491                if (sr.Status == OperationStatus.Failure)
492                {
493                    byte[] byteTemp = this.SynchRetrieve(rw.key);
494
495                    // Only try a second time. When it's still not possible, abort storing
496                    AutoResetEvent are = new AutoResetEvent(false);
497                    Guid g = this.dht.Store(OnSecondTrialStoring, rw.key, rw.value);
498                    ResponseWait rw2 = new ResponseWait() { WaitHandle = are, key = rw.key, value = rw.value };
499
500                    waitDict.Add(g, rw2);
501                    // blocking till response
502                    are.WaitOne();
503                    rw.success = rw2.success;
504                    rw.Message = rw2.Message;
505                }
506                else
507                {
508                    rw.Message = UTF8Encoding.UTF8.GetBytes(sr.Status.ToString());
509                    if (sr.Status == OperationStatus.KeyNotFound)
510                        rw.success = false;
511                    else
512                        rw.success = true;
513                }
514            }
515            //unblock WaitHandle in the synchronous method
516            rw.WaitHandle.Set();
517            // don't know if this accelerates the system...
518            this.waitDict.Remove(sr.Guid);
519        }
520
521        private void OnSecondTrialStoring(StoreResult sr)
522        {
523            ResponseWait rw;
524            if (this.waitDict.TryGetValue(sr.Guid, out rw))
525            {
526                if (sr.Status == OperationStatus.Failure)
527                {
528                    //Abort storing, because it's already the second trial
529                    rw.Message = UTF8Encoding.UTF8.GetBytes("Storing also not possible on second trial.");
530                    rw.success = false;
531                }
532                else
533                {
534                    //works the second trial, so it was the versioning system
535                    rw.success = true;
536                }
537            }
538            //unblock WaitHandle in the synchronous method
539            rw.WaitHandle.Set();
540            // don't know if this accelerates the system...
541            this.waitDict.Remove(sr.Guid);
542        }
543
544        #endregion
545
546        /// <summary>
547        /// Get the value of the given DHT Key or null, if it doesn't exist.
548        /// For synchronous environments use the Synch* methods.
549        /// </summary>
550        /// <param name="sKey">Key of DHT Entry</param>
551        /// <returns>Value of DHT Entry</returns>
552        public byte[] SynchRetrieve(string sKey)
553        {
554            LogToMonitor("ThreadId (P2PBase SynchRetrieve): " + Thread.CurrentThread.ManagedThreadId.ToString());
555
556            AutoResetEvent are = new AutoResetEvent(false);
557            // this method returns always a GUID to distinguish between asynchronous actions
558
559            LogToMonitor("Begin: SynchRetrieve. Key: " + sKey);
560             
561            Guid g = this.dht.Retrieve(OnSynchRetrieveCompleted, sKey);
562           
563            ResponseWait rw = new ResponseWait() {WaitHandle = are };
564           
565            waitDict.Add(g,rw  );
566            // blocking till response
567            are.WaitOne();
568
569            LogToMonitor("End: SynchRetrieve. Key: " + sKey + ". Success: " + rw.success.ToString());
570
571            //Rückgabe der Daten
572            return rw.Message;
573        }
574
575        /// <summary>
576        /// Callback for a the synchronized retrieval method
577        /// </summary>
578        /// <param name="rr"></param>
579        private void OnSynchRetrieveCompleted(RetrieveResult rr)
580        {
581            LogToMonitor(rr.Guid.ToString());
582           
583            ResponseWait rw;
584
585            LogToMonitor("ThreadId (P2PBase retrieve callback): " + Thread.CurrentThread.ManagedThreadId.ToString());
586
587            if (this.waitDict.TryGetValue(rr.Guid, out rw))
588            {
589                // successful as long as no error occured
590                rw.success = true;
591                if (rr.Status == OperationStatus.Failure)
592                {
593                    rw.Message = null;
594                    rw.success = false;
595                }
596                else if (rr.Status == OperationStatus.KeyNotFound)
597                    rw.Message = null;
598                else
599                    rw.Message = rr.Data;
600
601                //unblock WaitHandle in the synchronous method
602                rw.WaitHandle.Set();
603                // don't know if this accelerates the system...
604                this.waitDict.Remove(rr.Guid);
605            }
606        }
607        /// <summary>
608        /// Removes a key/value pair out of the DHT
609        /// </summary>
610        /// <param name="sKey">Key of the DHT Entry</param>
611        /// <returns>True, when removing is completed!</returns>
612        public bool SynchRemove(string sKey)
613        {
614            LogToMonitor("Begin SynchRemove. Key: " + sKey);
615
616            AutoResetEvent are = new AutoResetEvent(false);
617            // this method returns always a GUID to distinguish between asynchronous actions
618            Guid g = this.dht.Remove(OnSynchRemoveCompleted, sKey);
619            //Guid g = this.dht.Remove(OnSynchRemoveCompleted, sKey, IGNORE_DHT_VERSIONING_SYSTEM);
620
621            ResponseWait rw = new ResponseWait() { WaitHandle = are };
622
623            waitDict.Add(g, rw);
624            // blocking till response
625            are.WaitOne();
626
627            LogToMonitor("Ended SynchRemove. Key: " + sKey + ". Success: " + rw.success.ToString());
628
629            return rw.success;
630        }
631
632        /// <summary>
633        /// Callback for a the synchronized remove method
634        /// </summary>
635        /// <param name="rr"></param>
636        private void OnSynchRemoveCompleted(RemoveResult rr)
637        {
638            ResponseWait rw;
639            if (this.waitDict.TryGetValue(rr.Guid, out rw))
640            {
641                rw.Message = UTF8Encoding.UTF8.GetBytes(rr.Status.ToString());
642
643                if (rr.Status == OperationStatus.Failure || rr.Status == OperationStatus.KeyNotFound)
644                    rw.success = false;
645                else
646                    rw.success = true;
647
648                //unblock WaitHandle in the synchronous method
649                rw.WaitHandle.Set();
650                // don't know if this accelerates the system...
651                this.waitDict.Remove(rr.Guid);
652            }
653        }
654
655        #endregion
656
657        /// <summary>
658        /// To log the internal state in the Monitoring Software of P@play
659        /// </summary>
660        public void LogInternalState()
661        {
662            if (this.dht != null)
663            {
664                this.dht.LogInternalState();
665            }
666        }
667
668        public void LogToMonitor(string sTextToLog)
669        {
670            if(AllowLoggingToMonitor)
671                Log.Debug(sTextToLog);
672        }
673
674    }
675
676    public class PeerId
677    {
678        private readonly string stringId;
679        private readonly byte[] byteId;
680
681        private const uint OFFSET_BASIS = 2166136261;
682        private const uint PRIME = 16777619; // 2^24 + 2^8 + 0x93
683        private readonly int hashCode;
684
685        public PeerId(OverlayAddress oAddress)
686        {
687            if (oAddress != null)
688            {
689                this.stringId = oAddress.ToString();
690                this.byteId = oAddress.ToByteArray();
691
692                // FNV-1 hashing
693                uint fnvHash = OFFSET_BASIS;
694                foreach (byte b in byteId)
695                {
696                    fnvHash = (fnvHash * PRIME) ^ b;
697                }
698                hashCode = (int)fnvHash;
699            }
700        }
701
702        /// <summary>
703        /// Returns true when the byteId content is identical
704        /// </summary>
705        /// <param name="right"></param>
706        /// <returns></returns>
707        public override bool Equals(object right)
708        {
709            /* standard checks for reference types */
710
711            if (right == null)
712                return false;
713
714            if (object.ReferenceEquals(this, right))
715                return true;
716
717            if (this.GetType() != right.GetType())
718                return false;
719
720            // actual content comparison
721            return this == (PeerId)right;
722        }
723
724        public static bool operator ==(PeerId left, PeerId right)
725        {
726            // because it's possible that one parameter is null, catch this exception
727            /* Begin add - Christian Arnold, 2009.12.16, must cast the parameters, otherwise --> recursion */
728            if ((object)left == (object)right)
729                return true;
730
731            if ((object)left == null || (object)right == null)
732                return false;
733            /* End add */
734
735            if (left.hashCode != right.hashCode)
736                return false;
737
738            if (left.byteId.Length != right.byteId.Length)
739                return false;
740
741            for (int i = 0; i < left.byteId.Length; i++)
742            {
743                // uneven pattern content
744                if (left.byteId[i] != right.byteId[i])
745                    return false;
746            }
747
748            return true;
749        }
750
751        public static bool operator !=(PeerId left, PeerId right)
752        {
753            return !(left == right);
754        }
755
756        public override int GetHashCode()
757        {
758            return hashCode;
759        }
760
761        public override string ToString()
762        {
763            return this.stringId;
764        }
765
766        public byte[] ToByteArray()
767        {
768            return this.byteId;
769        }
770    }
771}
Note: See TracBrowser for help on using the repository browser.