Ignore:
Timestamp:
Nov 10, 2009, 1:02:33 PM (12 years ago)
Author:
arnold
Message:

P2P Publish/Subscriber (buggy, aber für Präsentationszwecke geeignet)

File:
1 edited

Legend:

Unmodified
Added
Removed
  • trunk/CrypPlugins/PeerToPeerBase/PeerToPeerBase.cs

    r813 r836  
    3333using System.ComponentModel;
    3434using Cryptool.PluginBase.IO;
     35using PeersAtPlay;
    3536
    3637/*
    3738 * Synchronous functions successfully tested (store, retrieve)
    38  * !!! remove-Function is faulty !!!
    39  * Standard connection for test issues is LocalMachineBootstrapper.
    40  * IrcBootstrapper also works, but it's to lame for testing issues!
     39 * !!! remove-Function is faulty - open field !!!
    4140 *
    4241 * TODO:
    43  * - Add enums (BS,LinkManager,Overlay) to class - make it public to use in cross-cutting classes
    4442 * - dht.Remove-Method makes problems... "ArgumentNotNullException"
    4543 *   event though the Parameter is correctly set to a valid value!
     
    4947namespace Cryptool.Plugins.PeerToPeer
    5048{
     49    /* Advantages of this wrapper class:
     50     * - The PeerAtPlay-Libraries are only referenced in this project
     51     *   --> so they're easy to update
     52     * - PeerAtPlay only works with asynchronous methods, so this class
     53     *   "synchronizes" this methods.
     54     * - The PeerToPeer-Layers are unimportant for CT2-Developers, so this
     55     *   issue is obfuscated by this wrapper class
     56     */
    5157    /// <summary>
    5258    /// Wrapper class to integrate peer@play environment into CrypTool.
     
    6369        public event SystemLeft OnSystemLeft;
    6470
    65         public delegate void P2PMessageReceived(string sMsg);
     71        public delegate void P2PMessageReceived(byte[] byteSourceAddr, string sData);
    6672        public event P2PMessageReceived OnP2PMessageReceived;
    6773
    6874        /// <summary>
    69         /// returns true if key-value-pair was successfully stored in the DHT
     75        /// returns true if key-value-pair is successfully stored in the DHT
    7076        /// </summary>
    7177        /// <param name="result"></param>
     
    120126        /// <param name="overlayType"></param>
    121127        /// <param name="dhtType"></param>
    122         public void InitializeAll(string sUserName, string sWorldName, P2PLinkManagerType linkManagerType, P2PBootstrapperType bsType, P2POverlayType overlayType, P2PDHTType dhtType)
     128        public void Initialize(string sUserName, string sWorldName, P2PLinkManagerType linkManagerType, P2PBootstrapperType bsType, P2POverlayType overlayType, P2PDHTType dhtType)
    123129        {
    124130            #region Setting LinkManager, Bootstrapper, Overlay and DHT to the specified types
     
    131137                default:
    132138                    throw (new NotImplementedException());
    133                     break;
    134139            }
    135140            switch (bsType)
     
    144149                default:
    145150                    throw (new NotImplementedException());
    146                     break;
    147151            }
    148152            switch (overlayType)
     
    154158                default:
    155159                    throw (new NotImplementedException());
    156                     break;
    157160            }
    158161            switch (dhtType)
     
    163166                default:
    164167                    throw (new NotImplementedException());
    165                     break;
    166168            }
    167169            #endregion
     
    170172
    171173            this.dht.MessageReceived += new EventHandler<MessageReceived>(OnDHT_MessageReceived);
    172             this.dht.SystemJoined += new EventHandler(OnDHT_SystemJoined);
    173             this.dht.SystemLeft += new EventHandler(OnDHT_SystemLeft);
    174         }
    175 
    176         /// <summary>
    177         /// Initializing is the first step to build a new or access an existing p2p network
    178         /// </summary>
    179         /// <param name="sUserName">Choose an individual name for the user</param>
    180         /// <param name="sWorldName">fundamental: two peers are only in the SAME P2P system, when they initialized the SAME WORLD!</param>
    181         public void Initialize(string sUserName, string sWorldName)
    182         {
    183             //snal = secure network abstraction layer
    184             this.linkmanager = new Snal();
    185             //LocalMachineBootstrapper = only local connection (runs only on one machine)
    186             //for more machines use this line:
    187            
    188             this.bootstrapper = new IrcBootstrapper();
    189             //this.bootstrapper = new LocalMachineBootstrapper();
    190 
    191             // changing overlay example: this.overlay = new ChordOverlay();
    192             this.overlay = new FullMeshOverlay();
    193             //changing overlay example: this.overlay = new ExampleDHT();
    194             this.dht = new FullMeshDHT();
    195 
    196             this.dht.Initialize(sUserName, sWorldName, this.overlay, this.bootstrapper, this.linkmanager, null);
    197 
    198             this.dht.MessageReceived += new EventHandler<MessageReceived>(OnDHT_MessageReceived);
     174            this.overlay.MessageReceived += new EventHandler<OverlayMessageEventArgs>(overlay_MessageReceived);
    199175            this.dht.SystemJoined += new EventHandler(OnDHT_SystemJoined);
    200176            this.dht.SystemLeft += new EventHandler(OnDHT_SystemLeft);
     
    220196        }
    221197
     198        /*TESTING AREA - COMPLETELY STOP THE WHOLE P2P SYSTEM*/
    222199        /// <summary>
    223200        /// Disjoins the peer from the system. The P2P system survive while one peer is still in the network.
     
    230207                this.dht.BeginStop(null);
    231208                //wait till systemLeft Event is invoked
     209                this.overlay.BeginStop(null);
     210                this.linkmanager.BeginStop(null);
     211                this.bootstrapper.Dispose();
    232212                this.systemLeft.WaitOne();
    233213            }
     
    265245        #endregion
    266246
    267         // not tested and not sure that this function return the right PeerName...
    268247        /// <summary>
    269248        /// Get PeerName of the actual peer
    270249        /// </summary>
    271         /// <returns>Peer name of the actual peer</returns>
    272         public string GetPeerName()
    273         {
    274             return this.overlay.LocalAddress.ToString();
     250        /// <param name="sPeerName">out: additional peer information UserName on LinkManager</param>
     251        /// <returns>PeerID as an byte array (suitable for correct addressing on the overlay)</returns>
     252        public byte[] GetPeerID(out string sPeerName)
     253        {
     254            sPeerName = this.linkmanager.UserName;
     255            return this.overlay.LocalAddress.ToByteArray();
     256        }
     257
     258        // overlay.LocalAddress = Overlay-Peer-Address/Names
     259        public void SendToPeer(string sData, byte[] byteDestinationPeerName)
     260        {
     261            ByteStack byteData = new ByteStack();
     262            byteData.PushUTF8String(sData);
     263
     264            OverlayAddress destinationAddr = this.overlay.GetAddress(byteDestinationPeerName);
     265
     266            // create own message receiver type... => P2PBase, so only this Application
     267            // receives Messages and not all active application on the SAME overlay!
     268            OverlayMessage overlayMsg = new OverlayMessage(MessageReceiverType.P2PBase,
     269                this.overlay.LocalAddress,destinationAddr, byteData);
     270            this.overlay.Send(overlayMsg);
    275271        }
    276272
     
    293289        }
    294290
     291        private void overlay_MessageReceived(object sender, OverlayMessageEventArgs e)
     292        {
     293            if (OnP2PMessageReceived != null)
     294                OnP2PMessageReceived(e.Message.Source.ToByteArray(), e.Message.Data.PopUTF8String());
     295        }
     296
    295297        private void OnDHT_MessageReceived(object sender, MessageReceived e)
    296298        {
    297299            if (OnP2PMessageReceived != null)
    298                 OnP2PMessageReceived("Source: " + e.Source + ", Data:" + e.Data);
     300                OnP2PMessageReceived(e.Source.ToByteArray(), e.Data.PopUTF8String());
    299301        }
    300302
     
    374376        /// </summary>
    375377        /// <param name="sKey">Key of DHT Entry</param>
    376         /// <param name="sValue">Value of DHT Entry</param>
     378        /// <param name="byteData">Value of DHT Entry</param>
    377379        /// <returns>True, when storing is completed!</returns>
    378         public bool SynchStore(string sKey, string sValue)
     380        public bool SynchStore(string sKey, byte[] byteData)
    379381        {
    380382            AutoResetEvent are = new AutoResetEvent(false);
    381383            // this method returns always a GUID to distinguish between asynchronous actions
    382             Guid g = this.dht.Store(OnSynchStoreCompleted, sKey, UTF8Encoding.UTF8.GetBytes(sValue));
     384            Guid g = this.dht.Store(OnSynchStoreCompleted, sKey, byteData);
    383385
    384386            ResponseWait rw = new ResponseWait() { WaitHandle = are };
     
    388390            are.WaitOne();
    389391            return true;
     392        }
     393
     394        /// <summary>
     395        /// Stores a value in the DHT at the given key
     396        /// </summary>
     397        /// <param name="sKey">Key of DHT Entry</param>
     398        /// <param name="sValue">Value of DHT Entry</param>
     399        /// <returns>True, when storing is completed!</returns>
     400        public bool SynchStore(string sKey, string sData)
     401        {
     402            return SynchStore(sKey, UTF8Encoding.UTF8.GetBytes(sData));
    390403        }
    391404
     
    442455            if (this.waitDict.TryGetValue(sr.Guid, out rw))
    443456            {
     457                // if Status == Error, than the version of the value is out of date.
     458                // There is a versioning system in the DHT. So you must retrieve the
     459                // key and than store the new value --> that's it, but much traffic.
     460                // to be fixed in a few weeks from M.Helling
    444461                rw.Message = UTF8Encoding.UTF8.GetBytes(sr.Status.ToString());
    445462
Note: See TracChangeset for help on using the changeset viewer.