Ignore:
Timestamp:
Nov 17, 2009, 1:05:15 PM (12 years ago)
Author:
arnold
Message:

Buggy P2P version

File:
1 edited

Legend:

Unmodified
Added
Removed
  • trunk/CrypPlugins/PeerToPeerBase/PeerToPeerBase.cs

    r836 r862  
    3535using PeersAtPlay;
    3636
    37 /*
    38  * Synchronous functions successfully tested (store, retrieve)
    39  * !!! remove-Function is faulty - open field !!!
     37/* - Synchronous functions successfully tested (store, retrieve)
     38 * - !!! remove-Function is faulty - open field !!!
     39 * - The DHT has an integrated versioning system. When a peer wants
     40 *   to store data in an entry, which already holds data, the version
     41 *   number will be compared with the peers' version number. If the
     42 *   peer hasn't read/write the entry the last time, the storing instruction
     43 *   will be rejected. You must first read the actual data and than you can
     44 *   store your data in this entry...
     45 *
     46 * INFO:
     47 * - Have considered the DHT-own versioning system in the SynchStore method.
     48 *   If this versioning system will be abolished, the SynchStore method must
     49 *   be change!
    4050 *
    4151 * TODO:
     
    6979        public event SystemLeft OnSystemLeft;
    7080
    71         public delegate void P2PMessageReceived(byte[] byteSourceAddr, string sData);
     81        public delegate void P2PMessageReceived(string sSourceAddr, string sData);
    7282        public event P2PMessageReceived OnP2PMessageReceived;
    7383
     
    92102
    93103        #region Variables
     104
     105        private bool initialized = false;
     106        public bool Initialized
     107        {
     108            get { return this.initialized; }
     109            set { this.initialized = value; }
     110        }
    94111
    95112        private IDHT dht;
     
    196213        }
    197214
    198         /*TESTING AREA - COMPLETELY STOP THE WHOLE P2P SYSTEM*/
    199215        /// <summary>
    200216        /// Disjoins the peer from the system. The P2P system survive while one peer is still in the network.
     
    206222            {
    207223                this.dht.BeginStop(null);
    208                 //wait till systemLeft Event is invoked
    209224                this.overlay.BeginStop(null);
    210225                this.linkmanager.BeginStop(null);
    211226                this.bootstrapper.Dispose();
     227                //wait till systemLeft Event is invoked
    212228                this.systemLeft.WaitOne();
    213229            }
     
    249265        /// </summary>
    250266        /// <param name="sPeerName">out: additional peer information UserName on LinkManager</param>
    251         /// <returns>PeerID as an byte array (suitable for correct addressing on the overlay)</returns>
    252         public byte[] GetPeerID(out string sPeerName)
     267        /// <returns>PeerID as a String</returns>
     268        public string GetPeerID(out string sPeerName)
    253269        {
    254270            sPeerName = this.linkmanager.UserName;
    255             return this.overlay.LocalAddress.ToByteArray();
     271            return this.overlay.LocalAddress.ToString();
     272            // return this.overlay.LocalAddress.ToByteArray();
    256273        }
    257274
     
    271288        }
    272289
     290        public void SendToPeer(string sData, string sDestinationPeerId)
     291        {
     292            // necessary because the overlay.GetAddress(string)-method of PAP is very buggy.
     293            // It doesn't cast the string- to a Byte-Address, but returns the own address...
     294            string[] sSplitted = sDestinationPeerId.Split(new char[] { ':' }, StringSplitOptions.RemoveEmptyEntries);
     295            byte[] byteDestinationAddress = new byte[sSplitted.Length];
     296            for (int i = 0; i < byteDestinationAddress.Length; i++)
     297            {
     298                byteDestinationAddress[i] = System.Convert.ToByte(sSplitted[i]);
     299            }
     300            SendToPeer(sData, byteDestinationAddress);
     301        }
     302
     303        public void SendToPeer(PubSubMessageType msgType, string sDestinationAddress)
     304        {
     305            SendToPeer(((int)msgType).ToString(), sDestinationAddress);
     306        }
     307
    273308        #region Event Handling (System Joined, Left and Message Received)
    274309
     
    278313                OnSystemJoined();
    279314            this.systemJoined.Set();
     315            Initialized = true;
    280316        }
    281317
     
    287323            this.dht = null;
    288324            this.systemLeft.Set();
     325            Initialized = false;
    289326        }
    290327
     
    292329        {
    293330            if (OnP2PMessageReceived != null)
    294                 OnP2PMessageReceived(e.Message.Source.ToByteArray(), e.Message.Data.PopUTF8String());
     331                OnP2PMessageReceived(e.Message.Source.ToString(), e.Message.Data.PopUTF8String());
    295332        }
    296333
     
    298335        {
    299336            if (OnP2PMessageReceived != null)
    300                 OnP2PMessageReceived(e.Source.ToByteArray(), e.Data.PopUTF8String());
     337                OnP2PMessageReceived(e.Source.ToString(), e.Data.PopUTF8String());
    301338        }
    302339
    303340        #endregion
    304341
    305         /*
    306          * Attention: The asynchronous methods are not tested at the moment
    307          */
     342        /* Attention: The asynchronous methods are not tested at the moment */
    308343        #region Asynchronous Methods incl. Callbacks
    309344
     
    372407        #region Synchronous Methods incl. Callbacks
    373408
     409        #region SynchStore incl.Callback and SecondTrialCallback
     410
     411        /* The DHT has an integrated VERSIONING SYSTEM. When a peer wants
     412         * to store data in an entry, which already holds data, the version
     413         * number will be compared with the peers' version number. If the
     414         * peer hasn't read/write the entry the last time, the storing instruction
     415         * will be rejected. You must first read the actual data and than you can
     416         * store your data in this entry... */
    374417        /// <summary>
    375418        /// Stores a value in the DHT at the given key
     
    384427            Guid g = this.dht.Store(OnSynchStoreCompleted, sKey, byteData);
    385428
    386             ResponseWait rw = new ResponseWait() { WaitHandle = are };
     429            ResponseWait rw = new ResponseWait() { WaitHandle = are, key=sKey , value = byteData };
    387430
    388431            waitDict.Add(g, rw);
    389432            //blocking till response
    390433            are.WaitOne();
    391             return true;
     434            return rw.success;
    392435        }
    393436
     
    402445            return SynchStore(sKey, UTF8Encoding.UTF8.GetBytes(sData));
    403446        }
     447        /// <summary>
     448        /// Callback for a the synchronized store method
     449        /// </summary>
     450        /// <param name="rr"></param>
     451        private void OnSynchStoreCompleted(StoreResult sr)
     452        {
     453            ResponseWait rw;
     454            if (this.waitDict.TryGetValue(sr.Guid, out rw))
     455            {
     456                // if Status == Error, than the version of the value is out of date.
     457                // There is a versioning system in the DHT. So you must retrieve the
     458                // key and than store the new value --> that's it, but much traffic.
     459                // to be fixed in a few weeks from M.Helling
     460                if (sr.Status == OperationStatus.Failure)
     461                {
     462                    byte[] byteTemp = this.SynchRetrieve(rw.key);
     463
     464                    // Only try a second time. When it's still not possible, abort storing
     465                    AutoResetEvent are = new AutoResetEvent(false);
     466                    Guid g = this.dht.Store(OnSecondTrialStoring, rw.key, rw.value);
     467                    ResponseWait rw2 = new ResponseWait() { WaitHandle = are, key = rw.key, value = rw.value };
     468
     469                    waitDict.Add(g, rw2);
     470                    // blocking till response
     471                    are.WaitOne();
     472                    rw.success = rw2.success;
     473                    rw.Message = rw2.Message;
     474                }
     475                else
     476                {
     477                    rw.Message = UTF8Encoding.UTF8.GetBytes(sr.Status.ToString());
     478                    if (sr.Status == OperationStatus.KeyNotFound)
     479                        rw.success = false;
     480                    else
     481                        rw.success = true;
     482                }
     483            }
     484            //unblock WaitHandle in the synchronous method
     485            rw.WaitHandle.Set();
     486            // don't know if this accelerates the system...
     487            this.waitDict.Remove(sr.Guid);
     488        }
     489
     490        private void OnSecondTrialStoring(StoreResult sr)
     491        {
     492            ResponseWait rw;
     493            if (this.waitDict.TryGetValue(sr.Guid, out rw))
     494            {
     495                if (sr.Status == OperationStatus.Failure)
     496                {
     497                    //Abort storing, because it's already the second trial
     498                    rw.Message = UTF8Encoding.UTF8.GetBytes("Storing also not possible on second trial.");
     499                    rw.success = false;
     500                }
     501                else
     502                {
     503                    //works the second trial, so it was the versioning system
     504                    rw.success = true;
     505                }
     506            }
     507            //unblock WaitHandle in the synchronous method
     508            rw.WaitHandle.Set();
     509            // don't know if this accelerates the system...
     510            this.waitDict.Remove(sr.Guid);
     511        }
     512
     513        #endregion
    404514
    405515        /// <summary>
     
    425535
    426536        /// <summary>
    427         /// Removes a key/value pair out of the DHT
    428         /// </summary>
    429         /// <param name="sKey">Key of the DHT Entry</param>
    430         /// <returns>True, when removing is completed!</returns>
    431         public bool SynchRemove(string sKey)
    432         {
    433             AutoResetEvent are = new AutoResetEvent(false);
    434             // this method returns always a GUID to distinguish between asynchronous actions
    435 
    436             // ROAD WORKS: This function throws an error (ArgumentNotNullException).
    437             //             I think that's an error in the p@p-environment --> forwarded to the p@p-Team
    438             Guid g = this.dht.Remove(OnSynchRemoveCompleted, sKey);
    439 
    440             ResponseWait rw = new ResponseWait() { WaitHandle = are };
    441 
    442             waitDict.Add(g, rw);
    443             // blocking till response
    444             are.WaitOne();
    445             return true;
    446         }
    447 
    448         /// <summary>
    449         /// Callback for a the synchronized store method
    450         /// </summary>
    451         /// <param name="rr"></param>
    452         private void OnSynchStoreCompleted(StoreResult sr)
    453         {
    454             ResponseWait rw;
    455             if (this.waitDict.TryGetValue(sr.Guid, out rw))
    456             {
    457                 // if Status == Error, than the version of the value is out of date.
    458                 // There is a versioning system in the DHT. So you must retrieve the
    459                 // key and than store the new value --> that's it, but much traffic.
    460                 // to be fixed in a few weeks from M.Helling
    461                 rw.Message = UTF8Encoding.UTF8.GetBytes(sr.Status.ToString());
    462 
    463                 //unblock WaitHandle in the synchronous method
    464                 rw.WaitHandle.Set();
    465                 // don't know if this accelerates the system...
    466                 this.waitDict.Remove(sr.Guid);
    467             }
    468         }
    469 
    470         /// <summary>
    471537        /// Callback for a the synchronized retrieval method
    472538        /// </summary>
     
    478544            if (this.waitDict.TryGetValue(rr.Guid, out rw))
    479545            {
    480                 rw.Message = rr.Data;
    481 
    482                 //unblock WaitHandle in the synchronous method
    483                 rw.WaitHandle.Set();
    484                 // don't know if this accelerates the system...
    485                 this.waitDict.Remove(rr.Guid);
    486             }
    487         }
    488 
    489         /// <summary>
    490         /// Callback for a the synchronized remove method
    491         /// </summary>
    492         /// <param name="rr"></param>
    493         private void OnSynchRemoveCompleted(RemoveResult rr)
    494         {
    495             ResponseWait rw;
    496             if (this.waitDict.TryGetValue(rr.Guid, out rw))
    497             {
    498                 rw.Message = UTF8Encoding.UTF8.GetBytes(rr.Status.ToString());
     546                // successful as long as no error occured
     547                rw.success = true;
     548                if (rr.Status == OperationStatus.Failure)
     549                {
     550                    rw.Message = null;
     551                    rw.success = false;
     552                }
     553                else if (rr.Status == OperationStatus.KeyNotFound)
     554                    rw.Message = null;
     555                else
     556                    rw.Message = rr.Data;
    499557
    500558                //unblock WaitHandle in the synchronous method
     
    504562            }
    505563        }
     564        /// <summary>
     565        /// Removes a key/value pair out of the DHT
     566        /// </summary>
     567        /// <param name="sKey">Key of the DHT Entry</param>
     568        /// <returns>True, when removing is completed!</returns>
     569        public bool SynchRemove(string sKey)
     570        {
     571            AutoResetEvent are = new AutoResetEvent(false);
     572            // this method returns always a GUID to distinguish between asynchronous actions
     573
     574            // ROAD WORKS: This function throws an error (ArgumentNotNullException).
     575            //             I think that's an error in the p@p-environment --> forwarded to the p@p-Team
     576            Guid g = this.dht.Remove(OnSynchRemoveCompleted, sKey);
     577
     578            ResponseWait rw = new ResponseWait() { WaitHandle = are };
     579
     580            waitDict.Add(g, rw);
     581            // blocking till response
     582            are.WaitOne();
     583            return rw.success;
     584        }
     585
     586        /// <summary>
     587        /// Callback for a the synchronized remove method
     588        /// </summary>
     589        /// <param name="rr"></param>
     590        private void OnSynchRemoveCompleted(RemoveResult rr)
     591        {
     592            ResponseWait rw;
     593            if (this.waitDict.TryGetValue(rr.Guid, out rw))
     594            {
     595                rw.Message = UTF8Encoding.UTF8.GetBytes(rr.Status.ToString());
     596
     597                if (rr.Status == OperationStatus.Failure || rr.Status == OperationStatus.KeyNotFound)
     598                    rw.success = false;
     599                else
     600                    rw.success = true;
     601
     602                //unblock WaitHandle in the synchronous method
     603                rw.WaitHandle.Set();
     604                // don't know if this accelerates the system...
     605                this.waitDict.Remove(rr.Guid);
     606            }
     607        }
    506608
    507609        #endregion
Note: See TracChangeset for help on using the changeset viewer.