source: trunk/CrypPlugins/PeerToPeerBase/PeerToPeerBase.cs @ 1287

Last change on this file since 1287 was 1287, checked in by Matthäus Wander, 12 years ago

set P@P monitor client application type to CT2

File size: 29.9 KB
Line 
1/* Copyright 2009 Team CrypTool (Christian Arnold), Uni Duisburg-Essen
2
3   Licensed under the Apache License, Version 2.0 (the "License");
4   you may not use this file except in compliance with the License.
5   You may obtain a copy of the License at
6
7       http://www.apache.org/licenses/LICENSE-2.0
8
9   Unless required by applicable law or agreed to in writing, software
10   distributed under the License is distributed on an "AS IS" BASIS,
11   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12   See the License for the specific language governing permissions and
13   limitations under the License.
14*/
15
16using System;
17using System.Collections.Generic;
18using System.Linq;
19using System.Text;
20using PeersAtPlay.P2PStorage.DHT;
21using PeersAtPlay.P2PStorage.FullMeshDHT;
22using PeersAtPlay.P2POverlay.Bootstrapper;
23using PeersAtPlay.P2POverlay;
24using PeersAtPlay.P2POverlay.Bootstrapper.LocalMachineBootstrapper;
25using PeersAtPlay.P2POverlay.FullMeshOverlay;
26using PeersAtPlay.P2PLink;
27using PeersAtPlay.P2POverlay.Bootstrapper.IrcBootstrapper;
28using System.Threading;
29using Cryptool.PluginBase.Control;
30using System.ComponentModel;
31using PeersAtPlay;
32using PeersAtPlay.Util.Logging;
33using Gears4Net;
34
35/* TODO:
36 * - Catch errors, which can occur when using the DHT (network-based errors)
37 */
38
/* - Synchronous functions successfully tested (store, retrieve)
 * - The DHT has an integrated versioning system. When a peer wants
 *   to store data in an entry which already holds data, the version
 *   number will be compared with the peer's version number. If the
 *   peer wasn't the last one to read/write the entry, the store instruction
 *   will be rejected. You must first read the current data and then you can
 *   store your data in this entry...
 *
 * INFO:
 * - The DHT's own versioning system is taken into account in the SynchStore
 *   method. If this versioning system is abolished, the SynchStore method
 *   must be changed!
 * - Everything switched to SnalNG; SimpleSnal isn't used anymore, because
 *   the certification stuff works now
 *
 * TODO:
 * - Delete UseNatTraversal-Flag and insert CertificateCheck and CertificateSetup
 * - Test asynchronous methods incl. EventHandlers
 */
58namespace Cryptool.Plugins.PeerToPeer
59{
    /* Advantages of this wrapper class:
     * - The PeersAtPlay libraries are only referenced in this project
     *   --> so they're easy to update
     * - PeersAtPlay only works with asynchronous methods, so this class
     *   "synchronizes" these methods.
     * - The PeerToPeer layers are unimportant for CT2 developers, so this
     *   detail is hidden by this wrapper class
     */
    /// <summary>
    /// Wrapper class to integrate the peer@play environment into CrypTool.
    /// This class synchronizes asynchronous methods for easier usage in CT2.
    /// </summary>
72    public class P2PBase
73    {
74        #region Delegates and Events for asynchronous p2p functions
75
76        public delegate void SystemJoined();
77        public event SystemJoined OnSystemJoined;
78
79        public delegate void SystemLeft();
80        public event SystemLeft OnSystemLeft;
81
82        public delegate void P2PMessageReceived(PeerId sourceAddr, byte[] data);
83        public event P2PMessageReceived OnP2PMessageReceived;
84
85        /// <summary>
86        /// returns true if key-value-pair is successfully stored in the DHT
87        /// </summary>
88        /// <param name="result"></param>
89        public delegate void DHTStoreCompleted(bool result);
90        public event DHTStoreCompleted OnDhtStore_Completed;
91
92        public delegate void DHTLoadCompleted(byte[] loadedData);
93        public event DHTLoadCompleted OnDhtLoad_Completed;
94
95        /// <summary>
96        /// returns true if key was found and removed successfully from the DHT
97        /// </summary>
98        /// <param name="result"></param>
99        public delegate void DHTRemoveCompleted(bool result);
100        public event DHTRemoveCompleted OnDhtRemove_Completed;
101
102        #endregion
103
104        #region Variables
105
106        private bool allowLoggingToMonitor;
107        /// <summary>
108        /// If true, all kinds of actions will be logged in the PeersAtPlay LogMonitor.
109        /// </summary>
110        public bool AllowLoggingToMonitor
111        {
112            get { return this.allowLoggingToMonitor; }
113            set { this.allowLoggingToMonitor = value; }
114        }
115
116        private const bool ALLOW_LOGGING_TO_MONITOR = true;
117
118        private bool started = false;
119        /// <summary>
120        /// True if system was successfully joined, false if system is COMPLETELY left
121        /// </summary>
122        public bool Started
123        {
124            get { return this.started; }
125            private set { this.started = value; } 
126        }
127
128        private IDHT dht;
129        private IP2PLinkManager linkmanager;
130        private IBootstrapper bootstrapper;
131        private P2POverlay overlay;
132        private AutoResetEvent systemJoined;
133        private AutoResetEvent systemLeft;
134
135        /// <summary>
136        /// Dictionary for synchronizing asynchronous DHT retrieves.
137        /// Cryptool doesn't offers an asynchronous environment, so this workaround is necessary
138        /// </summary>
139        private Dictionary<Guid, ResponseWait> waitDict;
140
141        #endregion
142
143        public P2PBase()
144        {
145            this.waitDict = new Dictionary<Guid, ResponseWait>();
146            this.systemJoined = new AutoResetEvent(false);
147            this.systemLeft = new AutoResetEvent(false);
148        }
149
150        #region Basic P2P Methods (Init, Start, Stop) - synch and asynch
151
152        /// <summary>
153        /// Initializing is the first step to build a new or access an existing p2p network
154        /// </summary>
155        /// <param name="sUserName">Choose an individual name for the user</param>
156        /// <param name="sWorldName">fundamental: two peers are only in the SAME
157        /// P2P system, when they initialized the SAME WORLD!</param>
158        /// <param name="bolUseNatTraversal">When you want to use NAT-Traversal #
159        /// (tunneling the P2P connection through NATs and Firewalls), you have to
160        /// set this flag to true</param>
161        /// <param name="linkManagerType"></param>
162        /// <param name="bsType"></param>
163        /// <param name="overlayType"></param>
164        /// <param name="dhtType"></param>
165        public void Initialize(string sUserName, string sWorldName, P2PLinkManagerType linkManagerType, P2PBootstrapperType bsType, P2POverlayType overlayType, P2PDHTType dhtType)
166        {
167            #region Setting LinkManager, Bootstrapper, Overlay and DHT to the specified types
168
169            Scheduler scheduler = new STAScheduler("pap");
170
171            switch (linkManagerType)
172            {
173                case P2PLinkManagerType.Snal:
174                    LogToMonitor("Init LinkMgr: Using NAT Traversal stuff");
175                    // NAT-Traversal stuff needs a different Snal-Version
176                    this.linkmanager = new PeersAtPlay.P2PLink.SnalNG.Snal(scheduler);
177
178                    PeersAtPlay.P2PLink.SnalNG.Settings settings = new PeersAtPlay.P2PLink.SnalNG.Settings();
179                    settings.LoadDefaults();
180                    settings.ConnectInternal = true;
181                    settings.LocalReceivingPort = 0;
182                    settings.UseLocalAddressDetection = false;
183                    settings.AutoReconnect = false;
184                    settings.NoDelay = false;
185                    settings.ReuseAddress = false;
186                    settings.UseNetworkMonitorServer = true;
187
188                    this.linkmanager.Settings = settings;
189                    this.linkmanager.ApplicationType = PeersAtPlay.Monitoring.ApplicationType.CrypTool;
190
191                    break;
192                default:
193                    throw (new NotImplementedException());
194            }
195            switch (bsType)
196            {
197                case P2PBootstrapperType.LocalMachineBootstrapper:
198                    //LocalMachineBootstrapper = only local connection (runs only on one machine)
199                    this.bootstrapper = new LocalMachineBootstrapper();
200                    break;
201                case P2PBootstrapperType.IrcBootstrapper:
202                    // setup nat traversal stuff
203                    LogToMonitor("Init Bootstrapper: Using NAT Traversal stuff");
204                    PeersAtPlay.P2POverlay.Bootstrapper.IrcBootstrapper.Settings.DelaySymmetricResponse = true;
205                    PeersAtPlay.P2POverlay.Bootstrapper.IrcBootstrapper.Settings.IncludeSymmetricInResponse = false;
206                    PeersAtPlay.P2POverlay.Bootstrapper.IrcBootstrapper.Settings.SymmetricResponseDelay = 6000;
207
208                    this.bootstrapper = new IrcBootstrapper(scheduler);
209                    break;
210                default:
211                    throw (new NotImplementedException());
212            }
213            switch (overlayType)
214            {
215                case P2POverlayType.FullMeshOverlay:
216                    // changing overlay example: this.overlay = new ChordOverlay();
217                    this.overlay = new FullMeshOverlay(scheduler);
218                    break;
219                default:
220                    throw (new NotImplementedException());
221            }
222            switch (dhtType)
223            {
224                case P2PDHTType.FullMeshDHT:
225                    this.dht = new FullMeshDHT(scheduler);
226                    break;
227                default:
228                    throw (new NotImplementedException());
229            }
230            #endregion
231
232            this.overlay.MessageReceived += new EventHandler<OverlayMessageEventArgs>(overlay_MessageReceived);
233            this.dht.SystemJoined += new EventHandler(OnDHT_SystemJoined);
234            this.dht.SystemLeft += new EventHandler<SystemLeftEventArgs>(OnDHT_SystemLeft);
235
236            this.dht.Initialize(sUserName, "", sWorldName, this.overlay, this.bootstrapper, this.linkmanager, null);
237        }
238
239        /// <summary>
240        /// Starts the P2P System. When the given P2P world doesn't exist yet,
241        /// inclusive creating the and bootstrapping to the P2P network.
242        /// In either case joining the P2P world.
243        /// This synchronized method returns true not before the peer has
244        /// successfully joined the network (this may take one or two minutes).
245        /// </summary>
246        /// <returns>True, if the peer has completely joined the p2p network</returns>
247        public bool SynchStart()
248        {
249            //Start != system joined
250            //Only starts the system asynchronous, the possible callback is useless,
251            //because it's invoked before the peer completly joined the P2P system
252            this.dht.BeginStart(null);
253            //Wait for event SystemJoined. When it's invoked, the peer completly joined the P2P system
254            this.systemJoined.WaitOne();
255            return true;
256        }
257
258        /// <summary>
259        /// Disjoins the peer from the system. The P2P system survive while one peer is still in the network.
260        /// </summary>
261        /// <returns>True, if the peer has completely disjoined the p2p network</returns>
262        public bool SynchStop()
263        {
264            if (this.dht != null)
265            {
266                this.dht.BeginStop(null);
267                //don't stop anything else, because BOOM
268
269                //wait till systemLeft Event is invoked
270                this.systemLeft.WaitOne();
271            }
272            return true;
273        }
274
275        /// <summary>
276        /// Asynchronously starting the peer. When the given P2P world doesn't
277        /// exist yet, inclusive creating the and bootstrapping to the P2P network.
278        /// In either case joining the P2P world. To ensure that peer has successfully
279        /// joined the p2p world, catch the event OnSystemJoined.
280        /// </summary>
281        public void AsynchStart()
282        {
283            // no callback usefull, because starting and joining isn't the same
284            // everything else is done by the EventHandler OnDHT_SystemJoined
285            this.dht.BeginStart(null);
286        }
287
288        /// <summary>
289        /// Asynchronously disjoining the actual peer of the p2p system. To ensure
290        /// disjoining, catch the event OnDHT_SystemLeft.
291        /// </summary>
292        public void AsynchStop()
293        {
294            if (this.dht != null)
295            {
296                // no callback usefull.
297                // Everything else is done by the EventHandler OnDHT_SystemLeft
298                this.dht.BeginStop(null);
299            }
300        }
301
302        #endregion
303
304        /// <summary>
305        /// Get PeerName of the actual peer
306        /// </summary>
307        /// <param name="sPeerName">out: additional peer information UserName on LinkManager</param>
308        /// <returns>PeerID as a String</returns>
309        public PeerId GetPeerID(out string sPeerName)
310        {
311            sPeerName = this.linkmanager.UserName;
312            return new PeerId(this.overlay.LocalAddress);
313        }
314
315        /// <summary>
316        /// Construct PeerId object for a specific byte[] id
317        /// </summary>
318        /// <param name="byteId">overlay address as byte array</param>
319        /// <returns>corresponding PeerId for given byte[] id</returns>
320        public PeerId GetPeerID(byte[] byteId)
321        {
322            LogToMonitor("GetPeerID: Converting byte[] to PeerId-Object");
323            return new PeerId(this.overlay.GetAddress(byteId));
324        }
325
326        // overlay.LocalAddress = Overlay-Peer-Address/Names
327        public void SendToPeer(byte[] data, byte[] destinationPeer)
328        {
329            // get stack size of the pap use-data and add own use data (for optimizing Stack size)
330            int realStackSize = this.overlay.GetHeaderSize() + data.Length;
331            ByteStack stackData = new ByteStack(realStackSize);
332
333            stackData.Push(data);
334
335            OverlayAddress destinationAddr = this.overlay.GetAddress(destinationPeer);
336            OverlayMessage overlayMsg = new OverlayMessage(MessageReceiverType.P2PBase,
337                this.overlay.LocalAddress, destinationAddr, stackData);
338            this.overlay.Send(overlayMsg);
339        }
340
341        private void overlay_MessageReceived(object sender, OverlayMessageEventArgs e)
342        {
343            if (OnP2PMessageReceived != null)
344            {
345                PeerId pid = new PeerId(e.Message.Source);
346                /* You have to fire this event asynchronous, because the main
347                 * thread will be stopped in this wrapper class for synchronizing
348                 * the asynchronous stuff (AutoResetEvent) --> so this could run
349                 * into a deadlock, when you fire this event synchronous (normal Invoke)
350                 * ATTENTION: This could change the invocation order!!! In my case
351                              no problem, but maybe in future cases... */
352                OnP2PMessageReceived.BeginInvoke(pid, e.Message.Data.PopBytes(e.Message.Data.CurrentStackSize),null,null);
353                //OnP2PMessageReceived(pid, e.Message.Data.PopUTF8String());
354            }
355        }
356
357        #region Event Handling (System Joined, Left and Message Received)
358
359        private void OnDHT_SystemJoined(object sender, EventArgs e)
360        {
361            if (OnSystemJoined != null)
362                OnSystemJoined();
363            this.systemJoined.Set();
364            Started = true;
365        }
366
367        private void OnDHT_SystemLeft(object sender, SystemLeftEventArgs e)
368        {
369            if (OnSystemLeft != null)
370                OnSystemLeft();
371            // as an experiment
372            this.dht = null;
373            this.systemLeft.Set();
374            Started = false;
375
376            LogToMonitor("OnDHT_SystemLeft has nulled the dht and setted the systemLeft Waithandle");
377        }
378
379        #endregion
380
381        /* Attention: The asynchronous methods are not tested at the moment */
382        #region Asynchronous Methods incl. Callbacks
383
384        /// <summary>
385        /// Asynchronously retrieving a key from the DHT. To get value, catch
386        /// event OnDhtLoad_Completed.
387        /// </summary>
388        /// <param name="sKey">Existing key in DHT</param>
389        public void AsynchRetrieve(string sKey)
390        {
391            Guid g = this.dht.Retrieve(OnAsynchRetrieve_Completed, sKey);
392        }
393        private void OnAsynchRetrieve_Completed(RetrieveResult rr)
394        {
395            if (OnDhtLoad_Completed != null)
396            {
397                OnDhtLoad_Completed(rr.Data);
398            }
399        }
400
401        /// <summary>
402        /// Asynchronously storing a Key-Value-Pair in the DHT. To ensure that
403        /// storing is completed, catch event OnDhtStore_Completed.
404        /// </summary>
405        /// <param name="sKey"></param>
406        /// <param name="sValue"></param>
407        public void AsynchStore(string sKey, string sValue)
408        {
409            //this.dht.Store(OnAsynchStore_Completed, sKey, UTF8Encoding.UTF8.GetBytes(sValue), IGNORE_DHT_VERSIONING_SYSTEM);
410            this.dht.Store(OnAsynchStore_Completed, sKey, UTF8Encoding.UTF8.GetBytes(sValue));
411        }
412
413        private void OnAsynchStore_Completed(StoreResult sr)
414        {
415            if (OnDhtStore_Completed != null)
416            {
417                if (sr.Status == OperationStatus.Success)
418                    OnDhtStore_Completed(true);
419                else
420                    OnDhtStore_Completed(false);
421            }
422               
423        }
424
425        /// <summary>
426        /// Asynchronously removing an existing key out of the DHT. To ensure
427        /// that removing is completed, catch event OnDhtRemove_Completed.
428        /// </summary>
429        /// <param name="sKey"></param>
430        public void AsynchRemove(string sKey)
431        {
432            //this.dht.Remove(OnAsynchRemove_Completed, sKey, IGNORE_DHT_VERSIONING_SYSTEM);
433            this.dht.Remove(OnAsynchRemove_Completed, sKey);
434        }
435        private void OnAsynchRemove_Completed(RemoveResult rr)
436        {
437            if (OnDhtRemove_Completed != null)
438            {
439                if(rr.Status == OperationStatus.Success)
440                    OnDhtRemove_Completed(true);
441                else
442                    OnDhtRemove_Completed(false);
443            }
444        }
445
446        #endregion
447
448        #region Synchronous Methods incl. Callbacks
449
450        #region SynchStore incl.Callback and SecondTrialCallback
451
452        /// <summary>
453        /// Stores a value in the DHT at the given key
454        /// </summary>
455        /// <param name="sKey">Key of DHT Entry</param>
456        /// <param name="byteData">Value of DHT Entry</param>
457        /// <returns>True, when storing is completed!</returns>
458        public bool SynchStore(string sKey, byte[] byteData)
459        {
460            LogToMonitor("Begin: SynchStore. Key: " + sKey + ", Data: " + Encoding.UTF8.GetString(byteData));
461            AutoResetEvent are = new AutoResetEvent(false);
462            // this method returns always a GUID to distinguish between asynchronous actions
463            Guid g = this.dht.Store(OnSynchStoreCompleted, sKey, byteData);
464
465            ResponseWait rw = new ResponseWait() { WaitHandle = are, key=sKey , value = byteData };
466
467            waitDict.Add(g, rw);
468            //blocking till response
469            are.WaitOne();
470
471            LogToMonitor("End: SynchStore. Key: " + sKey + ". Success: " + rw.success.ToString());
472
473            return rw.success;
474        }
475
476        /// <summary>
477        /// Stores a value in the DHT at the given key
478        /// </summary>
479        /// <param name="sKey">Key of DHT Entry</param>
480        /// <param name="sValue">Value of DHT Entry</param>
481        /// <returns>True, when storing is completed!</returns>
482        public bool SynchStore(string sKey, string sData)
483        {
484            return SynchStore(sKey, UTF8Encoding.UTF8.GetBytes(sData));
485        }
486        /// <summary>
487        /// Callback for a the synchronized store method
488        /// </summary>
489        /// <param name="rr"></param>
490        private void OnSynchStoreCompleted(StoreResult sr)
491        {
492            ResponseWait rw;
493            if (this.waitDict.TryGetValue(sr.Guid, out rw))
494            {
495                // if Status == Error, than the version of the value is out of date.
496                // There is a versioning system in the DHT. So you must retrieve the
497                // key and than store the new value
498                if (sr.Status == OperationStatus.Failure)
499                {
500                    byte[] byteTemp = this.SynchRetrieve(rw.key);
501
502                    // Only try a second time. When it's still not possible, abort storing
503                    AutoResetEvent are = new AutoResetEvent(false);
504                    Guid g = this.dht.Store(OnSecondTrialStoring, rw.key, rw.value);
505                    ResponseWait rw2 = new ResponseWait() { WaitHandle = are, key = rw.key, value = rw.value };
506
507                    waitDict.Add(g, rw2);
508                    // blocking till response
509                    are.WaitOne();
510                    rw.success = rw2.success;
511                    rw.Message = rw2.Message;
512                }
513                else
514                {
515                    rw.Message = UTF8Encoding.UTF8.GetBytes(sr.Status.ToString());
516                    if (sr.Status == OperationStatus.KeyNotFound)
517                        rw.success = false;
518                    else
519                        rw.success = true;
520                }
521            }
522            //unblock WaitHandle in the synchronous method
523            rw.WaitHandle.Set();
524            // don't know if this accelerates the system...
525            this.waitDict.Remove(sr.Guid);
526        }
527
528        private void OnSecondTrialStoring(StoreResult sr)
529        {
530            ResponseWait rw;
531            if (this.waitDict.TryGetValue(sr.Guid, out rw))
532            {
533                if (sr.Status == OperationStatus.Failure)
534                {
535                    //Abort storing, because it's already the second trial
536                    rw.Message = UTF8Encoding.UTF8.GetBytes("Storing also not possible on second trial.");
537                    rw.success = false;
538                }
539                else
540                {
541                    //works the second trial, so it was the versioning system
542                    rw.success = true;
543                }
544            }
545            //unblock WaitHandle in the synchronous method
546            rw.WaitHandle.Set();
547            // don't know if this accelerates the system...
548            this.waitDict.Remove(sr.Guid);
549        }
550
551        #endregion
552
553        /// <summary>
554        /// Get the value of the given DHT Key or null, if it doesn't exist.
555        /// For synchronous environments use the Synch* methods.
556        /// </summary>
557        /// <param name="sKey">Key of DHT Entry</param>
558        /// <returns>Value of DHT Entry</returns>
559        public byte[] SynchRetrieve(string sKey)
560        {
561            LogToMonitor("ThreadId (P2PBase SynchRetrieve): " + Thread.CurrentThread.ManagedThreadId.ToString());
562
563            AutoResetEvent are = new AutoResetEvent(false);
564            // this method returns always a GUID to distinguish between asynchronous actions
565
566            LogToMonitor("Begin: SynchRetrieve. Key: " + sKey);
567             
568            Guid g = this.dht.Retrieve(OnSynchRetrieveCompleted, sKey);
569           
570            ResponseWait rw = new ResponseWait() {WaitHandle = are };
571           
572            waitDict.Add(g,rw  );
573            // blocking till response
574            are.WaitOne();
575
576            LogToMonitor("End: SynchRetrieve. Key: " + sKey + ". Success: " + rw.success.ToString());
577
578            //Rückgabe der Daten
579            return rw.Message;
580        }
581
582        /// <summary>
583        /// Callback for a the synchronized retrieval method
584        /// </summary>
585        /// <param name="rr"></param>
586        private void OnSynchRetrieveCompleted(RetrieveResult rr)
587        {
588            LogToMonitor(rr.Guid.ToString());
589           
590            ResponseWait rw;
591
592            LogToMonitor("ThreadId (P2PBase retrieve callback): " + Thread.CurrentThread.ManagedThreadId.ToString());
593
594            if (this.waitDict.TryGetValue(rr.Guid, out rw))
595            {
596                // successful as long as no error occured
597                rw.success = true;
598                if (rr.Status == OperationStatus.Failure)
599                {
600                    rw.Message = null;
601                    rw.success = false;
602                }
603                else if (rr.Status == OperationStatus.KeyNotFound)
604                    rw.Message = null;
605                else
606                    rw.Message = rr.Data;
607
608                //unblock WaitHandle in the synchronous method
609                rw.WaitHandle.Set();
610                // don't know if this accelerates the system...
611                this.waitDict.Remove(rr.Guid);
612            }
613        }
614        /// <summary>
615        /// Removes a key/value pair out of the DHT
616        /// </summary>
617        /// <param name="sKey">Key of the DHT Entry</param>
618        /// <returns>True, when removing is completed!</returns>
619        public bool SynchRemove(string sKey)
620        {
621            LogToMonitor("Begin SynchRemove. Key: " + sKey);
622
623            AutoResetEvent are = new AutoResetEvent(false);
624            // this method returns always a GUID to distinguish between asynchronous actions
625            Guid g = this.dht.Remove(OnSynchRemoveCompleted, sKey);
626            //Guid g = this.dht.Remove(OnSynchRemoveCompleted, sKey, IGNORE_DHT_VERSIONING_SYSTEM);
627
628            ResponseWait rw = new ResponseWait() { WaitHandle = are };
629
630            waitDict.Add(g, rw);
631            // blocking till response
632            are.WaitOne();
633
634            LogToMonitor("Ended SynchRemove. Key: " + sKey + ". Success: " + rw.success.ToString());
635
636            return rw.success;
637        }
638
639        /// <summary>
640        /// Callback for a the synchronized remove method
641        /// </summary>
642        /// <param name="rr"></param>
643        private void OnSynchRemoveCompleted(RemoveResult rr)
644        {
645            ResponseWait rw;
646            if (this.waitDict.TryGetValue(rr.Guid, out rw))
647            {
648                rw.Message = UTF8Encoding.UTF8.GetBytes(rr.Status.ToString());
649
650                if (rr.Status == OperationStatus.Failure || rr.Status == OperationStatus.KeyNotFound)
651                    rw.success = false;
652                else
653                    rw.success = true;
654
655                //unblock WaitHandle in the synchronous method
656                rw.WaitHandle.Set();
657                // don't know if this accelerates the system...
658                this.waitDict.Remove(rr.Guid);
659            }
660        }
661
662        #endregion
663
664        /// <summary>
665        /// To log the internal state in the Monitoring Software of P@play
666        /// </summary>
667        public void LogInternalState()
668        {
669            if (this.dht != null)
670            {
671                this.dht.LogInternalState();
672            }
673        }
674
675        public void LogToMonitor(string sTextToLog)
676        {
677            if(AllowLoggingToMonitor)
678                Log.Debug(sTextToLog);
679        }
680
681    }
682
683    public class PeerId
684    {
685        private readonly string stringId;
686        private readonly byte[] byteId;
687
688        private const uint OFFSET_BASIS = 2166136261;
689        private const uint PRIME = 16777619; // 2^24 + 2^8 + 0x93
690        private readonly int hashCode;
691
692        public PeerId(OverlayAddress oAddress)
693        {
694            if (oAddress != null)
695            {
696                this.stringId = oAddress.ToString();
697                this.byteId = oAddress.ToByteArray();
698
699                // FNV-1 hashing
700                uint fnvHash = OFFSET_BASIS;
701                foreach (byte b in byteId)
702                {
703                    fnvHash = (fnvHash * PRIME) ^ b;
704                }
705                hashCode = (int)fnvHash;
706            }
707        }
708
709        /// <summary>
710        /// Returns true when the byteId content is identical
711        /// </summary>
712        /// <param name="right"></param>
713        /// <returns></returns>
714        public override bool Equals(object right)
715        {
716            /* standard checks for reference types */
717
718            if (right == null)
719                return false;
720
721            if (object.ReferenceEquals(this, right))
722                return true;
723
724            if (this.GetType() != right.GetType())
725                return false;
726
727            // actual content comparison
728            return this == (PeerId)right;
729        }
730
731        public static bool operator ==(PeerId left, PeerId right)
732        {
733            // because it's possible that one parameter is null, catch this exception
734            /* Begin add - Christian Arnold, 2009.12.16, must cast the parameters, otherwise --> recursion */
735            if ((object)left == (object)right)
736                return true;
737
738            if ((object)left == null || (object)right == null)
739                return false;
740            /* End add */
741
742            if (left.hashCode != right.hashCode)
743                return false;
744
745            if (left.byteId.Length != right.byteId.Length)
746                return false;
747
748            for (int i = 0; i < left.byteId.Length; i++)
749            {
750                // uneven pattern content
751                if (left.byteId[i] != right.byteId[i])
752                    return false;
753            }
754
755            return true;
756        }
757
758        public static bool operator !=(PeerId left, PeerId right)
759        {
760            return !(left == right);
761        }
762
763        public override int GetHashCode()
764        {
765            return hashCode;
766        }
767
768        public override string ToString()
769        {
770            return this.stringId;
771        }
772
773        public byte[] ToByteArray()
774        {
775            return this.byteId;
776        }
777    }
778}
Note: See TracBrowser for help on using the repository browser.