Ignore:
Timestamp:
May 31, 2010, 7:22:34 PM (12 years ago)
Author:
Paul Lelgemann
Message:

o CrypP2P: refactored P2PBase, removed race condition and unused code

File:
1 edited

Legend:

Unmodified
Added
Removed
  • trunk/CrypP2P/Internal/P2PBase.cs

    r1537 r1545  
    1 /* Copyright 2009 Team CrypTool (Christian Arnold), Uni Duisburg-Essen
     1/*
     2   Copyright 2010 Paul Lelgemann, University of Duisburg-Essen
    23
    34   Licensed under the Apache License, Version 2.0 (the "License");
     
    1516
    1617using System;
    17 using System.Collections.Generic;
    1818using System.Text;
     19using System.Threading;
     20using Cryptool.Plugins.PeerToPeer.Internal;
     21using Gears4Net;
     22using PeersAtPlay;
     23using PeersAtPlay.Monitoring;
     24using PeersAtPlay.P2PLink;
     25using PeersAtPlay.P2PLink.SnalNG;
     26using PeersAtPlay.P2POverlay;
     27using PeersAtPlay.P2POverlay.Bootstrapper;
     28using PeersAtPlay.P2POverlay.Bootstrapper.IrcBootstrapperV2;
     29using PeersAtPlay.P2POverlay.Bootstrapper.LocalMachineBootstrapper;
     30using PeersAtPlay.P2POverlay.FullMeshOverlay;
    1931using PeersAtPlay.P2PStorage.DHT;
    2032using PeersAtPlay.P2PStorage.FullMeshDHT;
    21 using PeersAtPlay.P2POverlay.Bootstrapper;
    22 using PeersAtPlay.P2POverlay;
    23 using PeersAtPlay.P2POverlay.Bootstrapper.LocalMachineBootstrapper;
    24 using PeersAtPlay.P2POverlay.FullMeshOverlay;
    25 using PeersAtPlay.P2PLink;
    26 using PeersAtPlay.P2POverlay.Bootstrapper.IrcBootstrapperV2;
    27 using System.Threading;
    28 using PeersAtPlay;
    2933using PeersAtPlay.Util.Logging;
    30 using Gears4Net;
    31 using Cryptool.Plugins.PeerToPeer.Internal;
     34using Settings = PeersAtPlay.P2PLink.SnalNG.Settings;
    3235
    3336/* TODO:
    34  * - Catch errors, which can occur when using the DHT (network-based errors)
    35  */
    36 
    37 /* - Synchronous functions successfully tested (store, retrieve)
    38  * - The DHT has an integrated versioning system. When a peer wants
    39  *   to store data in an entry, which already holds data, the version
    40  *   number will be compared with the peers' version number. If the
    41  *   peer hasn't read/write the entry the last time, the storing instruction
    42  *   will be rejected. You must first read the actual data and than you can
    43  *   store your data in this entry...
    44  *
    45  * INFO:
    46  * - Have considered the DHT-own versioning system in the SynchStore method.
    47  *   If this versioning system will be abolished, the SynchStore method must
    48  *   be change!
    49  * - Everything switched to SnalNG, SimpleSnal isn't used anymore, because
    50  *   certification stuff runs now
    51  *
    52  * TODO:
    5337 * - Delete UseNatTraversal-Flag and insert CertificateCheck and CertificateSetup
    5438 * - Testing asynchronous methods incl. EventHandlers
    5539 */
     40
    5641namespace Cryptool.P2P.Internal
    5742{
    58     /* Advantages of this wrapper class:
    59      * - The PeerAtPlay-Libraries are only referenced in this project
    60      *   --> so they're easy to update
    61      * - PeerAtPlay only works with asynchronous methods, so this class
    62      *   "synchronizes" this methods.
    63      * - The PeerToPeer-Layers are unimportant for CT2-Developers, so this
    64      *   issue is obfuscated by this wrapper class
    65      */
    6643    /// <summary>
    67     /// Wrapper class to integrate peer@play environment into CrypTool.
    68     /// This class synchronizes asynchronous methods for easier usage in CT2. For future
     44    ///   Wrapper class to integrate peer@play environment into CrypTool.
     45    ///   This class synchronizes asynchronous methods for easier usage in CT2.
    6946    /// </summary>
    7047    public class P2PBase
    7148    {
    72         #region Delegates and Events for asynchronous p2p functions
    73 
     49        #region Variables
     50
     51        private readonly AutoResetEvent _systemJoined;
     52        private readonly AutoResetEvent _systemLeft;
     53        private IBootstrapper _bootstrapper;
     54        private IDHT _dht;
     55        private IP2PLinkManager _linkmanager;
     56        private P2POverlay _overlay;
     57        private IVersionedDHT _versionedDht;
     58
     59        /// <summary>
     60        ///   True if system was successfully joined, false if system is COMPLETELY left
     61        /// </summary>
     62        public bool Started { get; private set; }
     63
     64        /// <summary>
     65        ///   True if the underlying peer to peer system has been fully initialized
     66        /// </summary>
     67        public bool Initialized { get; private set; }
     68
     69        #endregion
     70
     71        #region Delegates
     72
     73        public event P2PMessageReceived OnP2PMessageReceived;
     74        public delegate void P2PMessageReceived(PeerId sourceAddr, byte[] data);
     75
     76        public event SystemJoined OnSystemJoined;
    7477        public delegate void SystemJoined();
    75         public event SystemJoined OnSystemJoined;
    76 
     78
     79        public event SystemLeft OnSystemLeft;
    7780        public delegate void SystemLeft();
    78         public event SystemLeft OnSystemLeft;
    79 
    80         public delegate void P2PMessageReceived(PeerId sourceAddr, byte[] data);
    81         public event P2PMessageReceived OnP2PMessageReceived;
    82 
    83         /// <summary>
    84         /// returns true if key-value-pair is successfully stored in the DHT
    85         /// </summary>
    86         /// <param name="result"></param>
    87         public delegate void DHTStoreCompleted(bool result);
    88         public event DHTStoreCompleted OnDhtStore_Completed;
    89 
    90         public delegate void DHTLoadCompleted(byte[] loadedData);
    91         public event DHTLoadCompleted OnDhtLoad_Completed;
    92 
    93         /// <summary>
    94         /// returns true if key was found and removed successfully from the DHT
    95         /// </summary>
    96         /// <param name="result"></param>
    97         public delegate void DHTRemoveCompleted(bool result);
    98         public event DHTRemoveCompleted OnDhtRemove_Completed;
    99 
    100         #endregion
    101 
    102         #region Variables
    103 
    104         private bool allowLoggingToMonitor;
    105         /// <summary>
    106         /// If true, all kinds of actions will be logged in the PeersAtPlay LogMonitor.
    107         /// </summary>
    108         public bool AllowLoggingToMonitor
    109         {
    110             get { return this.allowLoggingToMonitor; }
    111             set { this.allowLoggingToMonitor = value; }
    112         }
    113 
    114         private const bool ALLOW_LOGGING_TO_MONITOR = true;
    115 
    116         private bool started = false;
    117         /// <summary>
    118         /// True if system was successfully joined, false if system is COMPLETELY left
    119         /// </summary>
    120         public bool Started
    121         {
    122             get { return this.started; }
    123             private set { this.started = value; }
    124         }
    125 
    126         private IDHT dht;
    127         private IP2PLinkManager linkmanager;
    128         private IBootstrapper bootstrapper;
    129         private P2POverlay overlay;
    130         private AutoResetEvent systemJoined;
    131         private AutoResetEvent systemLeft;
    132 
    133         /// <summary>
    134         /// Dictionary for synchronizing asynchronous DHT retrieves.
    135         /// Cryptool doesn't offers an asynchronous environment, so this workaround is necessary
    136         /// </summary>
    137         private Dictionary<Guid, ResponseWait> waitDict;
    13881
    13982        #endregion
     
    14184        public P2PBase()
    14285        {
    143             this.waitDict = new Dictionary<Guid, ResponseWait>();
    144             this.systemJoined = new AutoResetEvent(false);
    145             this.systemLeft = new AutoResetEvent(false);
    146         }
    147 
    148         #region Basic P2P Methods (Init, Start, Stop) - synch and asynch
    149 
    150         public void Initialize(P2PSettings p2PSettings)
    151         {
    152             Initialize(p2PSettings.PeerName, p2PSettings.WorldName, p2PSettings.LinkManager, p2PSettings.Bootstrapper, p2PSettings.Overlay, p2PSettings.Dht);
    153         }
    154 
    155         /// <summary>
    156         /// Initializing is the first step to build a new or access an existing p2p network
    157         /// </summary>
    158         /// <param name="sUserName">Choose an individual name for the user</param>
    159         /// <param name="sWorldName">fundamental: two peers are only in the SAME
    160         /// P2P system, when they initialized the SAME WORLD!</param>
    161         /// <param name="bolUseNatTraversal">When you want to use NAT-Traversal #
    162         /// (tunneling the P2P connection through NATs and Firewalls), you have to
    163         /// set this flag to true</param>
    164         /// <param name="linkManagerType"></param>
    165         /// <param name="bsType"></param>
    166         /// <param name="overlayType"></param>
    167         /// <param name="dhtType"></param>
    168         public void Initialize(string sUserName, string sWorldName, P2PLinkManagerType linkManagerType, P2PBootstrapperType bsType, P2POverlayType overlayType, P2PDHTType dhtType)
    169         {
    170             #region Setting LinkManager, Bootstrapper, Overlay and DHT to the specified types
    171 
     86            Started = false;
     87            Initialized = false;
     88
     89            _systemJoined = new AutoResetEvent(false);
     90            _systemLeft = new AutoResetEvent(false);
     91        }
     92
     93        #region Basic P2P Methods (Init, Start, Stop)
     94
     95        /// <summary>
     96        ///   Initializes the underlying peer-to-peer system with settings configured in P2PSettings. This step is required in order to be able to establish a connection.
     97        /// </summary>
     98        public void Initialize()
     99        {
    172100            Scheduler scheduler = new STAScheduler("pap");
    173101
    174             switch (linkManagerType)
     102            switch (P2PSettings.Default.LinkManager)
    175103            {
    176104                case P2PLinkManagerType.Snal:
    177105                    LogToMonitor("Init LinkMgr: Using NAT Traversal stuff");
     106
    178107                    // NAT-Traversal stuff needs a different Snal-Version
    179                     this.linkmanager = new PeersAtPlay.P2PLink.SnalNG.Snal(scheduler);
    180 
    181                     PeersAtPlay.P2PLink.SnalNG.Settings settings = new PeersAtPlay.P2PLink.SnalNG.Settings();
     108                    _linkmanager = new Snal(scheduler);
     109
     110                    var settings = new Settings();
    182111                    settings.LoadDefaults();
    183112                    settings.ConnectInternal = true;
     
    189118                    settings.UseNetworkMonitorServer = true;
    190119
    191                     this.linkmanager.Settings = settings;
    192                     this.linkmanager.ApplicationType = PeersAtPlay.Monitoring.ApplicationType.CrypTool;
     120                    _linkmanager.Settings = settings;
     121                    _linkmanager.ApplicationType = ApplicationType.CrypTool;
    193122
    194123                    break;
     
    196125                    throw (new NotImplementedException());
    197126            }
    198             switch (bsType)
     127
     128            switch (P2PSettings.Default.Bootstrapper)
    199129            {
    200130                case P2PBootstrapperType.LocalMachineBootstrapper:
    201                     //LocalMachineBootstrapper = only local connection (runs only on one machine)
    202                     this.bootstrapper = new LocalMachineBootstrapper();
     131                    // LocalMachineBootstrapper = only local connection (runs only on one machine)
     132                    _bootstrapper = new LocalMachineBootstrapper();
    203133                    break;
    204134                case P2PBootstrapperType.IrcBootstrapper:
     
    209139                    PeersAtPlay.P2POverlay.Bootstrapper.IrcBootstrapper.Settings.SymmetricResponseDelay = 6000;
    210140
    211                     this.bootstrapper = new IrcBootstrapper(scheduler);
     141                    _bootstrapper = new IrcBootstrapper(scheduler);
    212142                    break;
    213143                default:
    214144                    throw (new NotImplementedException());
    215145            }
    216             switch (overlayType)
     146
     147            switch (P2PSettings.Default.Overlay)
    217148            {
    218149                case P2POverlayType.FullMeshOverlay:
    219150                    // changing overlay example: this.overlay = new ChordOverlay();
    220                     this.overlay = new FullMeshOverlay(scheduler);
     151                    _overlay = new FullMeshOverlay(scheduler);
    221152                    break;
    222153                default:
    223154                    throw (new NotImplementedException());
    224155            }
    225             switch (dhtType)
     156
     157            switch (P2PSettings.Default.Dht)
    226158            {
    227159                case P2PDHTType.FullMeshDHT:
    228                     this.dht = new FullMeshDHT(scheduler);
     160                    _dht = new FullMeshDHT(scheduler);
    229161                    break;
    230162                default:
    231163                    throw (new NotImplementedException());
    232164            }
    233             #endregion
    234 
    235             this.overlay.MessageReceived += new EventHandler<OverlayMessageEventArgs>(overlay_MessageReceived);
    236             this.dht.SystemJoined += new EventHandler(OnDHT_SystemJoined);
    237             this.dht.SystemLeft += new EventHandler<SystemLeftEventArgs>(OnDHT_SystemLeft);
    238 
    239             this.dht.Initialize(sUserName, "", sWorldName, this.overlay, this.bootstrapper, this.linkmanager, null);
    240         }
    241 
    242         /// <summary>
    243         /// Starts the P2P System. When the given P2P world doesn't exist yet,
    244         /// inclusive creating the and bootstrapping to the P2P network.
    245         /// In either case joining the P2P world.
    246         /// This synchronized method returns true not before the peer has
    247         /// successfully joined the network (this may take one or two minutes).
    248         /// </summary>
     165
     166            _overlay.MessageReceived += OverlayMessageReceived;
     167            _dht.SystemJoined += OnDhtSystemJoined;
     168            _dht.SystemLeft += OnDhtSystemLeft;
     169
     170            _versionedDht = (IVersionedDHT) _dht;
     171
     172            _dht.Initialize(P2PSettings.Default.PeerName, string.Empty, P2PSettings.Default.WorldName, _overlay,
     173                            _bootstrapper,
     174                            _linkmanager, null);
     175
     176            Initialized = true;
     177        }
     178
     179        /// <summary>
     180        ///   Starts the P2P System. When the given P2P world doesn't exist yet,
     181        ///   inclusive creating the and bootstrapping to the P2P network.
     182        ///   In either case joining the P2P world.
     183        ///   This synchronized method returns true not before the peer has
     184        ///   successfully joined the network (this may take one or two minutes).
     185        /// </summary>
     186        /// <exception cref = "InvalidOperationException">When the peer-to-peer system has not been initialized.
     187        /// After validating the settings, this can be done by calling Initialize().</exception>
    249188        /// <returns>True, if the peer has completely joined the p2p network</returns>
    250189        public bool SynchStart()
    251190        {
    252             //Start != system joined
    253             //Only starts the system asynchronous, the possible callback is useless,
    254             //because it's invoked before the peer completly joined the P2P system
    255             this.dht.BeginStart(null);
    256             //Wait for event SystemJoined. When it's invoked, the peer completly joined the P2P system
    257             this.systemJoined.WaitOne();
     191            if (!Initialized)
     192            {
     193                throw new InvalidOperationException("Peer-to-peer is not initialized.");
     194            }
     195
     196            if (Started)
     197            {
     198                return true;
     199            }
     200
     201            _dht.BeginStart(null);
     202
     203            // Wait for event SystemJoined. When it's invoked, the peer completely joined the P2P system
     204            _systemJoined.WaitOne();
     205
    258206            return true;
    259207        }
    260208
    261209        /// <summary>
    262         /// Disjoins the peer from the system. The P2P system survive while one peer is still in the network.
    263         /// </summary>
    264         /// <returns>True, if the peer has completely disjoined the p2p network</returns>
     210        ///   Disconnects from the peer-to-peer system.
     211        /// </summary>
     212        /// <returns>True, if the peer has completely left the p2p network</returns>
    265213        public bool SynchStop()
    266214        {
    267             if (this.dht != null)
    268             {
    269                 this.dht.BeginStop(null);
    270                 //don't stop anything else, because BOOM
    271 
    272                 //wait till systemLeft Event is invoked
    273                 this.systemLeft.WaitOne();
    274             }
     215            if (_dht == null) return false;
     216
     217            _dht.BeginStop(null);
     218
     219            // wait till systemLeft Event is invoked
     220            _systemLeft.WaitOne();
     221
    275222            return true;
    276223        }
    277224
    278         /// <summary>
    279         /// Asynchronously starting the peer. When the given P2P world doesn't
    280         /// exist yet, inclusive creating the and bootstrapping to the P2P network.
    281         /// In either case joining the P2P world. To ensure that peer has successfully
    282         /// joined the p2p world, catch the event OnSystemJoined.
    283         /// </summary>
    284         public void AsynchStart()
    285         {
    286             // no callback usefull, because starting and joining isn't the same
    287             // everything else is done by the EventHandler OnDHT_SystemJoined
    288             this.dht.BeginStart(null);
    289         }
    290 
    291         /// <summary>
    292         /// Asynchronously disjoining the actual peer of the p2p system. To ensure
    293         /// disjoining, catch the event OnDHT_SystemLeft.
    294         /// </summary>
    295         public void AsynchStop()
    296         {
    297             if (this.dht != null)
    298             {
    299                 // no callback usefull.
    300                 // Everything else is done by the EventHandler OnDHT_SystemLeft
    301                 this.dht.BeginStop(null);
    302             }
    303         }
    304 
    305         #endregion
    306 
    307         /// <summary>
    308         /// Get PeerName of the actual peer
    309         /// </summary>
    310         /// <param name="sPeerName">out: additional peer information UserName on LinkManager</param>
     225        #endregion
     226
     227        #region Peer related method (GetPeerId, Send message to peer)
     228
     229        /// <summary>
     230        ///   Get PeerName of the actual peer
     231        /// </summary>
     232        /// <param name = "sPeerName">out: additional peer information UserName on LinkManager</param>
    311233        /// <returns>PeerID as a String</returns>
    312         public PeerId GetPeerID(out string sPeerName)
    313         {
    314             sPeerName = this.linkmanager.UserName;
    315             return new PeerId(this.overlay.LocalAddress);
    316         }
    317 
    318         /// <summary>
    319         /// Construct PeerId object for a specific byte[] id
    320         /// </summary>
    321         /// <param name="byteId">overlay address as byte array</param>
     234        public PeerId GetPeerId(out string sPeerName)
     235        {
     236            sPeerName = _linkmanager.UserName;
     237            return new PeerId(_overlay.LocalAddress);
     238        }
     239
     240        /// <summary>
     241        ///   Construct PeerId object for a specific byte[] id
     242        /// </summary>
     243        /// <param name = "byteId">overlay address as byte array</param>
    322244        /// <returns>corresponding PeerId for given byte[] id</returns>
    323         public PeerId GetPeerID(byte[] byteId)
     245        public PeerId GetPeerId(byte[] byteId)
    324246        {
    325247            LogToMonitor("GetPeerID: Converting byte[] to PeerId-Object");
    326             return new PeerId(this.overlay.GetAddress(byteId));
     248            return new PeerId(_overlay.GetAddress(byteId));
    327249        }
    328250
     
    331253        {
    332254            // get stack size of the pap use-data and add own use data (for optimizing Stack size)
    333             int realStackSize = this.overlay.GetHeaderSize() + data.Length;
    334             ByteStack stackData = new ByteStack(realStackSize);
    335 
     255            var realStackSize = _overlay.GetHeaderSize() + data.Length;
     256
     257            var stackData = new ByteStack(realStackSize);
    336258            stackData.Push(data);
    337259
    338             OverlayAddress destinationAddr = this.overlay.GetAddress(destinationPeer);
    339             OverlayMessage overlayMsg = new OverlayMessage(MessageReceiverType.P2PBase,
    340                 this.overlay.LocalAddress, destinationAddr, stackData);
    341             this.overlay.Send(overlayMsg);
    342         }
    343 
    344         private void overlay_MessageReceived(object sender, OverlayMessageEventArgs e)
    345         {
    346             if (OnP2PMessageReceived != null)
    347             {
    348                 PeerId pid = new PeerId(e.Message.Source);
    349                 /* You have to fire this event asynchronous, because the main
     260            var destinationAddr = _overlay.GetAddress(destinationPeer);
     261            var overlayMsg = new OverlayMessage(MessageReceiverType.P2PBase,
     262                                                _overlay.LocalAddress, destinationAddr, stackData);
     263
     264            _overlay.Send(overlayMsg);
     265        }
     266
     267        private void OverlayMessageReceived(object sender, OverlayMessageEventArgs e)
     268        {
     269            if (OnP2PMessageReceived == null) return;
     270
     271            var pid = new PeerId(e.Message.Source);
     272            /* You have to fire this event asynchronous, because the main
    350273                 * thread will be stopped in this wrapper class for synchronizing
    351274                 * the asynchronous stuff (AutoResetEvent) --> so this could run
     
    354277                              no problem, but maybe in future cases... */
    355278
    356                 // TODO: not safe: The delegate must have only one target
    357                 //OnP2PMessageReceived.BeginInvoke(pid, e.Message.Data.PopBytes(e.Message.Data.CurrentStackSize), null, null);
    358 
    359                 foreach (Delegate del in OnP2PMessageReceived.GetInvocationList())
    360                 {
    361                     del.DynamicInvoke(pid, e.Message.Data.PopBytes(e.Message.Data.CurrentStackSize));
    362                 }
    363 
    364                 //OnP2PMessageReceived(pid, e.Message.Data.PopUTF8String());
    365             }
    366         }
     279            // TODO: not safe: The delegate must have only one target
     280            // OnP2PMessageReceived.BeginInvoke(pid, e.Message.Data.PopBytes(e.Message.Data.CurrentStackSize), null, null);
     281
     282            foreach (var del in OnP2PMessageReceived.GetInvocationList())
     283            {
     284                del.DynamicInvoke(pid, e.Message.Data.PopBytes(e.Message.Data.CurrentStackSize));
     285            }
     286        }
     287
     288        #endregion
    367289
    368290        #region Event Handling (System Joined, Left and Message Received)
    369291
    370         private void OnDHT_SystemJoined(object sender, EventArgs e)
     292        private void OnDhtSystemJoined(object sender, EventArgs e)
    371293        {
    372294            if (OnSystemJoined != null)
    373295                OnSystemJoined();
    374             this.systemJoined.Set();
     296
     297            _systemJoined.Set();
    375298            Started = true;
    376299        }
    377300
    378         private void OnDHT_SystemLeft(object sender, SystemLeftEventArgs e)
     301        private void OnDhtSystemLeft(object sender, SystemLeftEventArgs e)
    379302        {
    380303            if (OnSystemLeft != null)
    381304                OnSystemLeft();
     305
    382306            // as an experiment
    383             this.dht = null;
    384             this.systemLeft.Set();
     307            _dht = null;
     308
     309            _systemLeft.Set();
    385310            Started = false;
    386 
    387             LogToMonitor("OnDHT_SystemLeft has nulled the dht and setted the systemLeft Waithandle");
    388         }
    389 
    390         #endregion
    391 
    392         /* Attention: The asynchronous methods are not tested at the moment */
    393         #region Asynchronous Methods incl. Callbacks
    394 
    395         /// <summary>
    396         /// Asynchronously retrieving a key from the DHT. To get value, catch
    397         /// event OnDhtLoad_Completed.
    398         /// </summary>
    399         /// <param name="sKey">Existing key in DHT</param>
    400         public void AsynchRetrieve(string sKey)
    401         {
    402             Guid g = this.dht.Retrieve(OnAsynchRetrieve_Completed, sKey);
    403         }
    404         private void OnAsynchRetrieve_Completed(RetrieveResult rr)
    405         {
    406             if (OnDhtLoad_Completed != null)
    407             {
    408                 OnDhtLoad_Completed(rr.Data);
    409             }
    410         }
    411 
    412         /// <summary>
    413         /// Asynchronously storing a Key-Value-Pair in the DHT. To ensure that
    414         /// storing is completed, catch event OnDhtStore_Completed.
    415         /// </summary>
    416         /// <param name="sKey"></param>
    417         /// <param name="sValue"></param>
    418         public void AsynchStore(string sKey, string sValue)
    419         {
    420             //this.dht.Store(OnAsynchStore_Completed, sKey, UTF8Encoding.UTF8.GetBytes(sValue), IGNORE_DHT_VERSIONING_SYSTEM);
    421             this.dht.Store(OnAsynchStore_Completed, sKey, UTF8Encoding.UTF8.GetBytes(sValue));
    422         }
    423 
    424         private void OnAsynchStore_Completed(StoreResult sr)
    425         {
    426             if (OnDhtStore_Completed != null)
    427             {
    428                 if (sr.Status == OperationStatus.Success)
    429                     OnDhtStore_Completed(true);
    430                 else
    431                     OnDhtStore_Completed(false);
    432             }
    433 
    434         }
    435 
    436         /// <summary>
    437         /// Asynchronously removing an existing key out of the DHT. To ensure
    438         /// that removing is completed, catch event OnDhtRemove_Completed.
    439         /// </summary>
    440         /// <param name="sKey"></param>
    441         public void AsynchRemove(string sKey)
    442         {
    443             //this.dht.Remove(OnAsynchRemove_Completed, sKey, IGNORE_DHT_VERSIONING_SYSTEM);
    444             this.dht.Remove(OnAsynchRemove_Completed, sKey);
    445         }
    446         private void OnAsynchRemove_Completed(RemoveResult rr)
    447         {
    448             if (OnDhtRemove_Completed != null)
    449             {
    450                 if (rr.Status == OperationStatus.Success)
    451                     OnDhtRemove_Completed(true);
    452                 else
    453                     OnDhtRemove_Completed(false);
    454             }
    455         }
    456 
    457         #endregion
    458 
    459         #region Synchronous Methods incl. Callbacks
    460 
    461         #region SynchStore incl.Callback and SecondTrialCallback
    462 
    463         /// <summary>
    464         /// Stores a value in the DHT at the given key
    465         /// </summary>
    466         /// <param name="sKey">Key of DHT Entry</param>
    467         /// <param name="byteData">Value of DHT Entry</param>
     311            Initialized = false;
     312
     313            LogToMonitor("CrypP2P left the system.");
     314        }
     315
     316        #endregion
     317
     318        #region Synchronous Methods + their Callbacks
     319
     320        /// <summary>
     321        ///   Stores a value in the DHT at the given key
     322        /// </summary>
     323        /// <param name = "key">Key of DHT Entry</param>
     324        /// <param name = "data">Value of DHT Entry</param>
    468325        /// <returns>True, when storing is completed!</returns>
    469         public bool SynchStore(string sKey, byte[] byteData)
    470         {
    471             LogToMonitor("Begin: SynchStore. Key: " + sKey + ", " + byteData.Length + " bytes");
    472             AutoResetEvent are = new AutoResetEvent(false);
    473             // this method returns always a GUID to distinguish between asynchronous actions
    474             Guid g = this.dht.Store(OnSynchStoreCompleted, sKey, byteData);
    475 
    476             ResponseWait rw = new ResponseWait() { WaitHandle = are, key = sKey, value = byteData };
    477 
    478             waitDict.Add(g, rw);
    479             //blocking till response
    480             are.WaitOne();
    481 
    482             LogToMonitor("End: SynchStore. Key: " + sKey + ". Success: " + rw.success.ToString());
    483 
    484             return rw.success;
    485         }
    486 
    487         /// <summary>
    488         /// Stores a value in the DHT at the given key
    489         /// </summary>
    490         /// <param name="sKey">Key of DHT Entry</param>
    491         /// <param name="sValue">Value of DHT Entry</param>
     326        public bool SynchStore(string key, byte[] data)
     327        {
     328            var autoResetEvent = new AutoResetEvent(false);
     329
     330            // LogToMonitor("testcrash" + Encoding.UTF8.GetString(new byte[5000]));
     331            LogToMonitor("Begin: SynchStore. Key: " + key + ", " + data.Length + " bytes");
     332
     333            var responseWait = new ResponseWait {WaitHandle = autoResetEvent, key = key, value = data};
     334            _versionedDht.Store(OnSynchStoreCompleted, key, data, responseWait);
     335
     336            // blocking till response
     337            autoResetEvent.WaitOne();
     338
     339            LogToMonitor("End: SynchStore. Key: " + key + ". Success: " + responseWait.success);
     340
     341            return responseWait.success;
     342        }
     343
     344        /// <summary>
     345        ///   Stores a value in the DHT at the given key
     346        /// </summary>
     347        /// <param name = "key">Key of DHT Entry</param>
     348        /// <param name = "value">Value of DHT Entry</param>
    492349        /// <returns>True, when storing is completed!</returns>
    493         public bool SynchStore(string sKey, string sData)
    494         {
    495             return SynchStore(sKey, UTF8Encoding.UTF8.GetBytes(sData));
    496         }
    497         /// <summary>
    498         /// Callback for a the synchronized store method
    499         /// </summary>
    500         /// <param name="rr"></param>
    501         private void OnSynchStoreCompleted(StoreResult sr)
    502         {
    503             ResponseWait rw;
    504             if (this.waitDict.TryGetValue(sr.Guid, out rw))
    505             {
    506                 // if Status == Error, than the version of the value is out of date.
    507                 // There is a versioning system in the DHT. So you must retrieve the
    508                 // key and than store the new value
    509                 if (sr.Status == OperationStatus.Failure)
    510                 {
    511                     byte[] byteTemp = this.SynchRetrieve(rw.key);
    512 
    513                     // Only try a second time. When it's still not possible, abort storing
    514                     AutoResetEvent are = new AutoResetEvent(false);
    515                     Guid g = this.dht.Store(OnSecondTrialStoring, rw.key, rw.value);
    516                     ResponseWait rw2 = new ResponseWait() { WaitHandle = are, key = rw.key, value = rw.value };
    517 
    518                     waitDict.Add(g, rw2);
    519                     // blocking till response
    520                     are.WaitOne();
    521                     rw.success = rw2.success;
    522                     rw.Message = rw2.Message;
    523                 }
    524                 else
    525                 {
    526                     rw.Message = UTF8Encoding.UTF8.GetBytes(sr.Status.ToString());
    527                     if (sr.Status == OperationStatus.KeyNotFound)
    528                         rw.success = false;
    529                     else
    530                         rw.success = true;
    531                 }
    532             }
    533             //unblock WaitHandle in the synchronous method
    534             rw.WaitHandle.Set();
    535             // don't know if this accelerates the system...
    536             this.waitDict.Remove(sr.Guid);
    537         }
    538 
    539         private void OnSecondTrialStoring(StoreResult sr)
    540         {
    541             ResponseWait rw;
    542             if (this.waitDict.TryGetValue(sr.Guid, out rw))
    543             {
    544                 if (sr.Status == OperationStatus.Failure)
    545                 {
    546                     //Abort storing, because it's already the second trial
    547                     rw.Message = UTF8Encoding.UTF8.GetBytes("Storing also not possible on second trial.");
    548                     rw.success = false;
    549                 }
    550                 else
    551                 {
    552                     //works the second trial, so it was the versioning system
    553                     rw.success = true;
    554                 }
    555             }
    556             //unblock WaitHandle in the synchronous method
    557             rw.WaitHandle.Set();
    558             // don't know if this accelerates the system...
    559             this.waitDict.Remove(sr.Guid);
    560         }
    561 
    562         #endregion
    563 
    564         /// <summary>
    565         /// Get the value of the given DHT Key or null, if it doesn't exist.
    566         /// For synchronous environments use the Synch* methods.
    567         /// </summary>
    568         /// <param name="sKey">Key of DHT Entry</param>
     350        public bool SynchStore(string key, string value)
     351        {
     352            return SynchStore(key, Encoding.UTF8.GetBytes(value));
     353        }
     354
     355        /// <summary>
     356        ///   Callback for a the synchronized store method
     357        /// </summary>
     358        /// <param name = "storeResult">retrieved data container</param>
     359        private static void OnSynchStoreCompleted(StoreResult storeResult)
     360        {
     361            var responseWait = storeResult.AsyncState as ResponseWait;
     362            if (responseWait == null)
     363            {
     364                LogToMonitor("Received OnSynchStoreCompleted, but ResponseWait object is missing. Discarding.");
     365                return;
     366            }
     367
     368            responseWait.success = storeResult.Status != OperationStatus.KeyNotFound;
     369            responseWait.Message = Encoding.UTF8.GetBytes(storeResult.Status.ToString());
     370
     371            // unblock WaitHandle in the synchronous method
     372            responseWait.WaitHandle.Set();
     373
     374            LogToMonitor("Received and handled OnSynchStoreCompleted.");
     375        }
     376
     377        /// <summary>
     378        ///   Get the value of the given DHT Key or null, if it doesn't exist.
     379        /// </summary>
     380        /// <param name = "key">Key of DHT Entry</param>
    569381        /// <returns>Value of DHT Entry</returns>
    570         public byte[] SynchRetrieve(string sKey)
    571         {
    572             LogToMonitor("ThreadId (P2PBase SynchRetrieve): " + Thread.CurrentThread.ManagedThreadId.ToString());
    573 
    574             AutoResetEvent are = new AutoResetEvent(false);
    575             // this method returns always a GUID to distinguish between asynchronous actions
    576 
    577             LogToMonitor("Begin: SynchRetrieve. Key: " + sKey);
    578 
    579             Guid g = this.dht.Retrieve(OnSynchRetrieveCompleted, sKey);
    580 
    581             ResponseWait rw = new ResponseWait() { WaitHandle = are };
    582 
    583             waitDict.Add(g, rw);
     382        public byte[] SynchRetrieve(string key)
     383        {
     384            LogToMonitor("Begin: SynchRetrieve. Key: " + key);
     385
     386            var autoResetEvent = new AutoResetEvent(false);
     387            var responseWait = new ResponseWait {WaitHandle = autoResetEvent, key = key};
     388            _dht.Retrieve(OnSynchRetrieveCompleted, key, responseWait);
     389
    584390            // blocking till response
    585             are.WaitOne();
    586 
    587             LogToMonitor("End: SynchRetrieve. Key: " + sKey + ". Success: " + rw.success.ToString());
    588 
    589             //Rückgabe der Daten
    590             return rw.Message;
    591         }
    592 
    593         /// <summary>
    594         /// Callback for a the synchronized retrieval method
    595         /// </summary>
    596         /// <param name="rr"></param>
    597         private void OnSynchRetrieveCompleted(RetrieveResult rr)
    598         {
    599             LogToMonitor(rr.Guid.ToString());
    600 
    601             ResponseWait rw;
    602 
    603             LogToMonitor("ThreadId (P2PBase retrieve callback): " + Thread.CurrentThread.ManagedThreadId.ToString());
    604 
    605             if (this.waitDict.TryGetValue(rr.Guid, out rw))
    606             {
    607                 // successful as long as no error occured
    608                 rw.success = true;
    609                 if (rr.Status == OperationStatus.Failure)
    610                 {
    611                     rw.Message = null;
    612                     rw.success = false;
    613                 }
    614                 else if (rr.Status == OperationStatus.KeyNotFound)
    615                     rw.Message = null;
    616                 else
    617                     rw.Message = rr.Data;
    618 
    619                 //unblock WaitHandle in the synchronous method
    620                 rw.WaitHandle.Set();
    621                 // don't know if this accelerates the system...
    622                 this.waitDict.Remove(rr.Guid);
    623             }
    624         }
    625         /// <summary>
    626         /// Removes a key/value pair out of the DHT
    627         /// </summary>
    628         /// <param name="sKey">Key of the DHT Entry</param>
     391            autoResetEvent.WaitOne();
     392
     393            LogToMonitor("End: SynchRetrieve. Key: " + key + ". Success: " + responseWait.success);
     394
     395            return responseWait.Message;
     396        }
     397
     398        /// <summary>
     399        ///   Callback for a the synchronized retrieval method
     400        /// </summary>
     401        /// <param name = "retrieveResult"></param>
     402        private static void OnSynchRetrieveCompleted(RetrieveResult retrieveResult)
     403        {
     404            var responseWait = retrieveResult.AsyncState as ResponseWait;
     405            if (responseWait == null)
     406            {
     407                LogToMonitor("Received OnSynchRetrieveCompleted, but ResponseWait object is missing. Discarding.");
     408                return;
     409            }
     410
     411            LogToMonitor("Received retrieve callback, local ThreadId: " + Thread.CurrentThread.ManagedThreadId);
     412
     413            switch (retrieveResult.Status)
     414            {
     415                case OperationStatus.Success:
     416                    responseWait.success = true;
     417                    responseWait.Message = retrieveResult.Data;
     418                    break;
     419                case OperationStatus.KeyNotFound:
     420                    responseWait.success = true;
     421                    responseWait.Message = null;
     422                    break;
     423                default:
     424                    responseWait.success = false;
     425                    responseWait.Message = null;
     426                    break;
     427            }
     428
     429            // unblock WaitHandle in the synchronous method
     430            responseWait.WaitHandle.Set();
     431
     432            LogToMonitor("Received and handled OnSynchRetrieveCompleted.");
     433        }
     434
     435        /// <summary>
     436        ///   Removes a key/value pair out of the DHT
     437        /// </summary>
     438        /// <param name = "key">Key of the DHT Entry</param>
    629439        /// <returns>True, when removing is completed!</returns>
    630         public bool SynchRemove(string sKey)
    631         {
    632             LogToMonitor("Begin SynchRemove. Key: " + sKey);
    633 
    634             AutoResetEvent are = new AutoResetEvent(false);
    635             // this method returns always a GUID to distinguish between asynchronous actions
    636             Guid g = this.dht.Remove(OnSynchRemoveCompleted, sKey);
    637             //Guid g = this.dht.Remove(OnSynchRemoveCompleted, sKey, IGNORE_DHT_VERSIONING_SYSTEM);
    638 
    639             ResponseWait rw = new ResponseWait() { WaitHandle = are };
    640 
    641             waitDict.Add(g, rw);
     440        public bool SynchRemove(string key)
     441        {
     442            LogToMonitor("Begin SynchRemove. Key: " + key);
     443
     444            var autoResetEvent = new AutoResetEvent(false);
     445            var responseWait = new ResponseWait {WaitHandle = autoResetEvent, key = key};
     446            _versionedDht.Remove(OnSynchRemoveCompleted, key, responseWait);
     447
    642448            // blocking till response
    643             are.WaitOne();
    644 
    645             LogToMonitor("Ended SynchRemove. Key: " + sKey + ". Success: " + rw.success.ToString());
    646 
    647             return rw.success;
    648         }
    649 
    650         /// <summary>
    651         /// Callback for a the synchronized remove method
    652         /// </summary>
    653         /// <param name="rr"></param>
    654         private void OnSynchRemoveCompleted(RemoveResult rr)
    655         {
    656             ResponseWait rw;
    657             if (this.waitDict.TryGetValue(rr.Guid, out rw))
    658             {
    659                 rw.Message = UTF8Encoding.UTF8.GetBytes(rr.Status.ToString());
    660 
    661                 if (rr.Status == OperationStatus.Failure || rr.Status == OperationStatus.KeyNotFound)
    662                     rw.success = false;
    663                 else
    664                     rw.success = true;
    665 
    666                 //unblock WaitHandle in the synchronous method
    667                 rw.WaitHandle.Set();
    668                 // don't know if this accelerates the system...
    669                 this.waitDict.Remove(rr.Guid);
    670             }
    671         }
    672 
    673         #endregion
    674 
    675         /// <summary>
    676         /// To log the internal state in the Monitoring Software of P@play
     449            autoResetEvent.WaitOne();
     450
     451            LogToMonitor("End: SynchRemove. Key: " + key + ". Success: " + responseWait.success);
     452
     453            return responseWait.success;
     454        }
     455
     456        /// <summary>
     457        ///   Callback for a the synchronized remove method
     458        /// </summary>
     459        /// <param name = "removeResult"></param>
     460        private static void OnSynchRemoveCompleted(RemoveResult removeResult)
     461        {
     462            var responseWait = removeResult.AsyncState as ResponseWait;
     463            if (responseWait == null)
     464            {
     465                LogToMonitor("Received OnSynchRemoveCompleted, but ResponseWait object is missing. Discarding.");
     466                return;
     467            }
     468
     469            responseWait.success = removeResult.Status == OperationStatus.Success;
     470            responseWait.Message = Encoding.UTF8.GetBytes(removeResult.Status.ToString());
     471
     472            // unblock WaitHandle in the synchronous method
     473            responseWait.WaitHandle.Set();
     474
     475            LogToMonitor("Received and handled OnSynchRemoveCompleted.");
     476        }
     477
     478        #endregion
     479
     480        #region Log facility
     481
     482        /// <summary>
     483        ///   To log the internal state in the Monitoring Software of P@play
    677484        /// </summary>
    678485        public void LogInternalState()
    679486        {
    680             if (this.dht != null)
    681             {
    682                 this.dht.LogInternalState();
    683             }
    684         }
    685 
    686         public void LogToMonitor(string sTextToLog)
    687         {
    688             if (AllowLoggingToMonitor)
     487            if (_dht != null)
     488            {
     489                _dht.LogInternalState();
     490            }
     491        }
     492
     493        private static void LogToMonitor(string sTextToLog)
     494        {
     495            if (P2PSettings.Default.Log2Monitor)
    689496                Log.Debug(sTextToLog);
    690497        }
    691498
     499        #endregion
    692500    }
    693501}
Note: See TracChangeset for help on using the changeset viewer.