Changeset 862


Ignore:
Timestamp:
Nov 17, 2009, 1:05:15 PM (12 years ago)
Author:
arnold
Message:

Buggy P2P version

Location:
trunk/CrypPlugins
Files:
51 edited

Legend:

Unmodified
Added
Removed
  • trunk/CrypPlugins/PeerToPeerBase/IP2PControl.cs

    r836 r862  
    3434    public enum PubSubMessageType
    3535    {
     36        /// <summary>
     37        /// To register the subscriber with the publisher
     38        /// </summary>
    3639        Register = 0,
    37         Alive = 1,
    38         Ping = 2,
    39         Pong = 3
     40        /// <summary>
     41        /// adequate response to a subscriber-sided
     42        /// registering message
     43        /// </summary>
     44        RegisteringAccepted = 1,
     45        /// <summary>
     46        /// when peer wants to leave the publish/subscriber union
     47        /// </summary>
     48        Unregister = 2,
     49        /// <summary>
     50        /// To signalize the publisher that subscriber is still online/alive
     51        /// </summary>
     52        Alive = 3,
     53        /// <summary>
     54        /// active liveliness-request, the other side
     55        /// must respond with a pong message
     56        /// </summary>
     57        Ping = 4,
     58        /// <summary>
     59        /// adequate response to a
     60        /// received ping message
     61        /// </summary>
     62        Pong = 5,
     63        /// <summary>
     64        /// subscriber sends this msg when solution was found
     65        /// </summary>
     66        Solution = 6,
     67        /// <summary>
     68        /// to immediately stop the subscribers work
     69        /// </summary>
     70        Stop = 7,
     71        /// <summary>
     72        /// because Enum is non-nullable, I used this workaround
     73        /// </summary>
     74        NULL = 666
    4075    }
    4176    #endregion
     
    4883        bool DHTremove(string sKey);
    4984
    50         byte[] GetPeerID(out string sPeerName);
    51 
    52         string ConvertPeerId(byte[] bytePeerId);
     85        string GetPeerID(out string sPeerName);
     86        //byte[] GetPeerID(out string sPeerName);
    5387
    5488        void SendToPeer(string sData, byte[] sDestinationPeerAddress);
     89        void SendToPeer(string sData, string sDestinationPeerAddress);
     90        void SendToPeer(PubSubMessageType msgType, string sDestinationAddress);
     91
     92        PubSubMessageType GetMsgType(string byteData);
    5593
    5694        event P2PBase.P2PMessageReceived OnPeerReceivedMsg;
  • trunk/CrypPlugins/PeerToPeerBase/P2PPeer.cs

    r836 r862  
    2525using Cryptool.PluginBase.IO;
    2626
     27
     28/*
     29 * TODO:
     30 * - Build a stable P2PPeer Version (only valid when initialized!)
     31 * - Integrate struct PeerId into functions and other classes,
     32 *   particularly P2PPublisherBase
     33 */
    2734namespace Cryptool.Plugins.PeerToPeer
    2835{
     
    4653        #endregion
    4754
     55        #region Standard functionality
     56
    4857        public P2PPeer()
    4958        {
     
    5160            // to forward event from overlay/dht MessageReceived-Event from P2PBase
    5261            this.p2pBase.OnP2PMessageReceived += new P2PBase.P2PMessageReceived(p2pBase_OnP2PMessageReceived);
    53             this.settings = new P2PPeerSettings(p2pBase);
     62            this.settings = new P2PPeerSettings(this);
    5463            this.settings.TaskPaneAttributeChanged += new TaskPaneAttributeChangedHandler(settings_TaskPaneAttributeChanged);
    5564            this.settings.OnPluginStatusChanged += new StatusChangedEventHandler(settings_OnPluginStatusChanged);
     
    5968        private void settings_OnPluginStatusChanged(IPlugin sender, StatusEventArgs args)
    6069        {
    61             if (OnPluginStatusChanged != null) OnPluginStatusChanged(this, args);
     70            if (OnPluginStatusChanged != null)
     71                OnPluginStatusChanged(this, args);
    6272        }
    6373
    6474        // to forward event from overlay/dht MessageReceived-Event from P2PBase
    65         private void p2pBase_OnP2PMessageReceived(byte[] byteSourceAddr, string sData)
     75        private void p2pBase_OnP2PMessageReceived(string sSourceAddr, string sData)
    6676        {
    6777            if (OnPeerMessageReceived != null)
    68                 OnPeerMessageReceived(byteSourceAddr, sData);
     78                OnPeerMessageReceived(sSourceAddr, sData);
    6979        }
    7080
     
    7383            //throw new NotImplementedException();
    7484        }
    75 
    76         #region IPlugin Members
    77 
    78         public event GuiLogNotificationEventHandler OnGuiLogNotificationOccured;
    79 
    80         public event PluginProgressChangedEventHandler OnPluginProgressChanged;
    8185
    8286        public ISettings Settings
     
    105109            else
    106110            {
    107                 if (!settings.PeerStarted)
    108                 {
    109                     // starts peer in the settings class and enables/disables Controlbuttons
    110                     this.settings.PeerStarted = true;
    111                     GuiLogMessage("Successfully joined the P2P System", NotificationLevel.Info);
    112                 }
     111                StartPeer();
    113112            }
    114113        }
     
    118117            // TODO: For future use copy functionality to Execute instead of PreExecute
    119118            //       so we don't need the workaround anymore!!!
    120             //if (!settings.PeerStarted)
    121             //{
    122             //    // starts peer in the settings class and enables/disables Controlbuttons
    123             //    this.settings.PeerStarted = true;
    124             //}
    125         }
    126 
    127         public void process(IP2PControl sender)
    128         {
    129             GuiLogMessage("P2P Peer method 'process' is executed, because status of P2PSlave has changed!", NotificationLevel.Debug);
     119            // StartPeer();
    130120        }
    131121
     
    148138        public void Dispose()
    149139        {
    150             //settings are already set to null in this Step...
    151             //unsolved design problem in CT2...
    152             //this.settings.PeerStopped = true;
    153             if (this.p2pBase != null)
    154             {
    155                 this.p2pBase.SynchStop();
    156                 this.p2pBase = null;
    157             }
    158         }
    159 
    160         #endregion
     140            StopPeer();
     141        }
     142
     143        #endregion
     144
     145        #region IPlugin Members
     146
     147        public event GuiLogNotificationEventHandler OnGuiLogNotificationOccured;
     148
     149        public event PluginProgressChangedEventHandler OnPluginProgressChanged;
     150
     151        #endregion
    161152
    162153        #region INotifyPropertyChanged Members
     
    195186                return this.p2pSlave;
    196187            }
    197             set
    198             {
    199                 if (this.p2pSlave != null)
    200                 {
    201                     this.p2pSlave.OnStatusChanged -= p2pSlave_OnStatusChanged;
    202                 }
    203 
    204                 this.p2pSlave.OnStatusChanged += new IControlStatusChangedEventHandler(p2pSlave_OnStatusChanged);
    205                 //Only when using asynchronous p2p-Start-method, add event handler for OnPeerJoinedCompletely
    206                 //this.p2pSlave.OnPeerJoinedCompletely += new PeerJoinedP2P(OnPeerJoinedCompletely);
    207 
    208                 if (this.p2pSlave != value)
    209                 {
    210                     this.p2pSlave = (P2PPeerMaster)value;
    211                     OnPropertyChanged("P2PControlSlave");
    212                 }
    213             }
    214         }
    215 
    216         void p2pSlave_OnStatusChanged(IControl sender, bool readyForExecution)
    217         {
    218             if (readyForExecution)
    219                 this.process((IP2PControl)sender);
    220         }
    221 
     188        }
     189
     190        #endregion
     191
     192        #region Start and Stop Peer
     193
     194        /// <summary>
     195        /// Status flag for starting and stopping peer only once.
     196        /// </summary>
     197        private bool peerStarted = false;
     198
     199        public void StartPeer()
     200        {
     201            if (!this.peerStarted)
     202            {
     203                if (this.p2pBase == null)
     204                {
     205                    this.settings.PeerStatusChanged(P2PPeerSettings.PeerStatus.Error);
     206                    GuiLogMessage("Starting Peer failed, because Base-Object is null.",NotificationLevel.Error);
     207                    return;
     208                }
     209
     210                this.settings.PeerStatusChanged(P2PPeerSettings.PeerStatus.Connecting);
     211
     212                this.p2pBase.Initialize(this.settings.P2PPeerName, this.settings.P2PWorldName,
     213                    (P2PLinkManagerType)this.settings.P2PLinkMngrType, (P2PBootstrapperType)this.settings.P2PBSType,
     214                    (P2POverlayType)this.settings.P2POverlType, (P2PDHTType)this.settings.P2PDhtType);
     215                this.peerStarted = this.p2pBase.SynchStart();
     216
     217                if (this.peerStarted)
     218                {
     219                    this.settings.PeerStatusChanged(P2PPeerSettings.PeerStatus.Online);
     220                    GuiLogMessage("Successfully joined the P2P System", NotificationLevel.Info);
     221                }
     222                else
     223                {
     224                    this.settings.PeerStatusChanged(P2PPeerSettings.PeerStatus.Error);
     225                    GuiLogMessage("Joining to P2P System failed!", NotificationLevel.Error);
     226                }
     227            }
     228            else
     229            {
     230                GuiLogMessage("Peer is already started!", NotificationLevel.Warning);
     231            }
     232        }
     233
     234        public void StopPeer()
     235        {
     236            if (this.peerStarted && this.p2pBase != null)
     237            {
     238                this.settings.PeerStatusChanged(P2PPeerSettings.PeerStatus.Connecting);
     239
     240                this.peerStarted = !this.p2pBase.SynchStop();
     241
     242                if (this.peerStarted)
     243                {
     244                    this.settings.PeerStatusChanged(P2PPeerSettings.PeerStatus.Online);
     245                    GuiLogMessage("Peer stopped: " + !this.peerStarted, NotificationLevel.Warning);
     246                }
     247                else
     248                {
     249                    this.settings.PeerStatusChanged(P2PPeerSettings.PeerStatus.NotConnected);
     250                    GuiLogMessage("Peer stopped: " + !this.peerStarted, NotificationLevel.Info);
     251                }
     252            }
     253            else
     254            {
     255                GuiLogMessage("Peer is already stopped!", NotificationLevel.Warning);
     256            }
     257        }
     258
     259        public void LogInternalState()
     260        {
     261            if(this.p2pBase != null)
     262                this.p2pBase.LogInternalState();
     263        }
    222264        #endregion
    223265    }
     
    226268    {
    227269        private P2PPeer p2pPeer;
    228         private byte[] bytePeerID;
    229270        private string sPeerID;
    230271        private string sPeerName;
    231272
     273        /* Required because comparing a byte-array is inefficient */
     274        // TODO: Previously unused!
     275        public struct PeerId
     276        {
     277            string stringId;
     278            byte[] byteId;
     279        }
     280
    232281        public P2PPeerMaster(P2PPeer p2pPeer)
    233282        {
    234283            this.p2pPeer = p2pPeer;
     284            //if (this.p2pPeer.p2pBase == null && this.p2pPeer.p2pBase.Initialized)
     285            //{
     286            //    throw (new Exception("P2PBase isn't completely initialized!"));
     287            //}
    235288            // to forward event from overlay/dht MessageReceived-Event from P2PBase
    236289            this.p2pPeer.OnPeerMessageReceived += new P2PBase.P2PMessageReceived(p2pPeer_OnPeerMessageReceived);
     
    242295        // to forward event from overlay/dht MessageReceived-Event from P2PBase
    243296        public event P2PBase.P2PMessageReceived OnPeerReceivedMsg;
    244         private void p2pPeer_OnPeerMessageReceived(byte[] byteSourceAddr, string sData)
     297        private void p2pPeer_OnPeerMessageReceived(string sSourceAddr, string sData)
    245298        {
    246299            if (OnPeerReceivedMsg != null)
    247                 OnPeerReceivedMsg(byteSourceAddr, sData);
     300                OnPeerReceivedMsg(sSourceAddr, sData);
    248301        }
    249302
     
    276329        public bool DHTremove(string sKey)
    277330        {
    278             // derzeit liegt wohl in peerq@play ein Fehler in der Methode...
    279             // erkennt den Übergabeparameter nicht an und wirft dann "ArgumentNotNullException"...
    280             // Problem an M.Helling und S.Holzapfel von p@p weitergegeben...
    281331            return this.p2pPeer.p2pBase.SynchRemove(sKey);
    282             //return false;
    283332        }
    284333
     
    288337        /// <param name="sPeerName">returns the Peer Name</param>
    289338        /// <returns>returns the Peer ID</returns>
    290         public byte[] GetPeerID(out string sPeerName)
    291         {
    292             if (this.bytePeerID == null)
    293             {
    294                 this.bytePeerID = this.p2pPeer.p2pBase.GetPeerID(out this.sPeerName);
    295                 this.sPeerID = ConvertPeerId(bytePeerID);
     339        public string GetPeerID(out string sPeerName)
     340        {
     341            if (this.sPeerID == null)
     342            {
     343                this.sPeerID = this.p2pPeer.p2pBase.GetPeerID(out this.sPeerName);
    296344            }
    297345            sPeerName = this.sPeerName;
    298             return this.bytePeerID;
    299         }
    300 
    301         public string ConvertPeerId(byte[] bytePeerId)
    302         {
    303             string sRet = String.Empty;
    304             for (int i = 0; i < bytePeerId.Length; i++)
    305             {
    306                 sRet += bytePeerId[i].ToString() + ":";
    307             }
    308             return sRet.Substring(0, sRet.Length - 1);
    309         }
    310 
     346            return this.sPeerID;
     347        }
     348
     349        public void SendToPeer(string sData, string sDestinationPeerAddress)
     350        {
     351            this.p2pPeer.p2pBase.SendToPeer(sData, sDestinationPeerAddress);
     352        }
    311353        public void SendToPeer(string sData, byte[] byteDestinationPeerAddress)
    312354        {
    313355            this.p2pPeer.p2pBase.SendToPeer(sData, byteDestinationPeerAddress);
    314356        }
     357        public void SendToPeer(PubSubMessageType msgType, string sDestinationAddress)
     358        {
     359            this.p2pPeer.p2pBase.SendToPeer(msgType, sDestinationAddress);
     360        }
     361
     362        /// <summary>
     363        /// Converts a string to the PubSubMessageType if possible. Otherwise return null.
     364        /// </summary>
     365        /// <param name="sData">Data</param>
     366        /// <returns>PubSubMessageType if possible. Otherwise null.</returns>
     367        public PubSubMessageType GetMsgType(string sData)
     368        {
     369            // Convert one byte data to PublishSubscribeMessageType-Enum
     370            int iMsg;
     371            if (sData.Trim().Length == 1 && Int32.TryParse(sData.Trim(), out iMsg))
     372            {
     373                return (PubSubMessageType)iMsg;
     374            }
     375            else
     376            {
     377                // because Enum is non-nullable, I used this workaround
     378                return PubSubMessageType.NULL;
     379            }
     380        }
    315381
    316382        #endregion
    317383    }
    318 
    319     //public class P2PPeerMaster : IP2PControl
    320     //{
    321     //    private P2PBase p2pBase;
    322 
    323     //    public P2PPeerMaster(P2PBase p2pBase)
    324     //    {
    325     //        this.p2pBase = p2pBase;
    326     //        this.OnPeerReceivedMsg += new P2PBase.P2PMessageReceived(P2PPeerMaster_OnPeerReceivedMsg);
    327     //        this.OnStatusChanged += new IControlStatusChangedEventHandler(P2PPeerMaster_OnStatusChanged);
    328     //    }
    329 
    330     //    #region Events and Event-Handling
    331 
    332     //    public event P2PBase.P2PMessageReceived OnPeerReceivedMsg;
    333     //    private void P2PPeerMaster_OnPeerReceivedMsg(byte[] byteSourceAddr, string sData)
    334     //    {
    335     //        if (OnPeerReceivedMsg != null)
    336     //            OnPeerReceivedMsg(byteSourceAddr, sData);
    337     //    }
    338 
    339     //    public event IControlStatusChangedEventHandler OnStatusChanged;
    340     //    private void P2PPeerMaster_OnStatusChanged(IControl sender, bool readyForExecution)
    341     //    {
    342     //        if (OnStatusChanged != null)
    343     //            OnStatusChanged(sender, readyForExecution);
    344     //    }
    345 
    346     //    #endregion
    347 
    348     //    #region IP2PControl Members
    349 
    350     //    public bool DHTstore(string sKey, string sValue)
    351     //    {
    352     //        return this.p2pBase.SynchStore(sKey, sValue);
    353     //    }
    354 
    355     //    public byte[] DHTload(string sKey)
    356     //    {
    357     //        return this.p2pBase.SynchRetrieve(sKey);
    358     //    }
    359 
    360     //    public bool DHTremove(string sKey)
    361     //    {
    362     //        // derzeit liegt wohl in peerq@play ein Fehler in der Methode...
    363     //        // erkennt den Übergabeparameter nicht an und wirft dann "ArgumentNotNullException"...
    364     //        // Problem an M.Helling und S.Holzapfel von p@p weitergegeben...
    365     //        return this.p2pBase.SynchRemove(sKey);
    366     //        //return false;
    367     //    }
    368 
    369     //    public byte[] GetPeerName()
    370     //    {
    371     //        return this.p2pBase.GetPeerName();
    372     //    }
    373 
    374     //    public void SendToPeer(string sData, byte[] byteDestinationPeerAddress)
    375     //    {
    376     //        this.p2pBase.SendToPeer(sData, byteDestinationPeerAddress);
    377     //    }
    378 
    379     //    #endregion
    380     //}
    381384}
  • trunk/CrypPlugins/PeerToPeerBase/P2PPeerSettings.cs

    r836 r862  
    1616
    1717        private bool hasChanges = false;
    18         private P2PBase p2pBase;
     18        private P2PPeer p2pPeer;
    1919
    2020        #region ISettings Members
     
    3636        #region taskPane
    3737
    38         public P2PPeerSettings (P2PBase p2pBase)
     38        public P2PPeerSettings (P2PPeer p2pPeer)
    3939            {
    4040            if(TaskPaneAttributeChanged != null)
    4141                TaskPaneAttributeChanged(this, new TaskPaneAttributeChangedEventArgs(new TaskPaneAttribteContainer("BtnStop", Visibility.Hidden)));
    42             this.p2pBase = p2pBase;
     42            this.p2pPeer = p2pPeer;
    4343            ChangePluginIcon(PeerStatus.NotConnected);
    4444            }
     
    4848        #region Start- and Stop-Buttons incl. functionality
    4949
    50         [TaskPane("Start", "Initializes and starts Peer", null, 3, false, DisplayLevel.Beginner, ControlType.Button)]
     50        public bool StartingPeer
     51        {
     52            set
     53            {
     54                if (value)
     55                {
     56                    BtnStart();
     57                }
     58            }
     59        }
     60
     61        [TaskPane("Start", "Initializes and starts Peer", null, 2, false, DisplayLevel.Beginner, ControlType.Button)]
    5162        public void BtnStart()
    5263        {
    53             PeerStarted = !this.peerStarted;
    54         }
    55 
    56         private bool peerStarted = false;
    57         /// <summary>
    58         /// If peer isn't started by clicking the Button, it will be started by setting to true
    59         /// </summary>
    60         public bool PeerStarted
    61         {
    62             get { return this.peerStarted; }
    63             set
    64             {
    65                 if (!this.peerStarted)
    66                 {
    67                     if (P2PPeerName != null && P2PWorldName != null)
    68                     {
    69                         ChangePluginIcon(PeerStatus.Connecting);
    70                         this.p2pBase.Initialize(P2PPeerName, P2PWorldName, (P2PLinkManagerType)P2PLinkMngrType,
    71                             (P2PBootstrapperType)P2PBSType, (P2POverlayType)P2POverlType, (P2PDHTType)P2PDhtType);
    72                         this.p2pBase.SynchStart();
    73                         TaskPaneAttributeChanged(this, new TaskPaneAttributeChangedEventArgs(new TaskPaneAttribteContainer("BtnStart", Visibility.Collapsed)));
    74                         TaskPaneAttributeChanged(this, new TaskPaneAttributeChangedEventArgs(new TaskPaneAttribteContainer("BtnStop", Visibility.Visible)));
    75                         ChangePluginIcon(PeerStatus.Online);
    76                     }
    77                     else
    78                     {
    79                         ChangePluginIcon(PeerStatus.Error);
    80                         // can not initialize Peer, because P2PUserName and/or P2PWorldName are missing
    81                         throw (new Exception("You must set P2PPeerName and/or P2PWorldName, otherwise starting the peer isn't possible"));
    82                     }
    83                 }
    84                 if (value != this.peerStarted)
    85                 {
    86                     this.peerStarted = value;
    87                     //use the private Var instead of the PeerStopped-Property, because you will run into a recursive loop!!!
    88                     this.peerStopped = !value;
    89                     OnPropertyChanged("PeerStarted");
    90                     HasChanges = true;
    91                 }
    92             }
    93         }
    94 
    95         [TaskPane("Stop", "Stops the Peer", null, 4, false, DisplayLevel.Beginner, ControlType.Button)]
     64            if (P2PPeerName != null && P2PWorldName != null)
     65            {
     66                this.p2pPeer.StartPeer();
     67            }
     68            else
     69            {
     70                PeerStatusChanged(PeerStatus.Error);
     71                // can not initialize Peer, because P2PUserName and/or P2PWorldName are missing
     72                throw (new Exception("You must set P2PPeerName and/or P2PWorldName, otherwise starting the peer isn't possible"));
     73            }
     74        }
     75
     76        [TaskPane("Stop", "Stops the Peer", null, 3, false, DisplayLevel.Beginner, ControlType.Button)]
    9677        public void BtnStop()
    9778        {
    98             PeerStopped = !this.peerStopped;
    99         }
    100 
    101         private bool peerStopped = true;
    102         /// <summary>
    103         /// if peer is already started, it will be stopped by setting to true
    104         /// </summary>
    105         public bool PeerStopped
    106         {
    107             get { return this.peerStopped; }
    108             set
    109             {
    110                 if (this.peerStarted)
    111                 {
    112                     ChangePluginIcon(PeerStatus.Connecting);
    113                     this.p2pBase.SynchStop();
    114                     TaskPaneAttributeChanged(this, new TaskPaneAttributeChangedEventArgs(new TaskPaneAttribteContainer("BtnStart", Visibility.Visible)));
    115                     TaskPaneAttributeChanged(this, new TaskPaneAttributeChangedEventArgs(new TaskPaneAttribteContainer("BtnStop", Visibility.Collapsed)));
    116                     ChangePluginIcon(PeerStatus.NotConnected);
    117                 }
    118                 if (value != this.peerStopped)
    119                 {
    120                     //don't use the PeerStarted-Property, because you will run into a recursive loop!!!
    121                     this.peerStarted = !value;
    122                     this.peerStopped = value;
    123                     OnPropertyChanged("PeerStopped");
    124                     HasChanges = true;
    125                 }
    126             }
     79            this.p2pPeer.StopPeer();
     80         
     81            OnPropertyChanged("PeerStopped");
     82            HasChanges = true;
     83        }
     84
     85        [TaskPane("Log internal state of peer", "Log internal state of peer", null, 4, false, DisplayLevel.Beginner, ControlType.Button)]
     86        public void BtnLogInternalState()
     87        {
     88            this.p2pPeer.LogInternalState();
    12789        }
    12890
     
    163125        }
    164126
    165         [TaskPane("Log internal state of peer", "Log internal state of peer", null, 2, false, DisplayLevel.Beginner, ControlType.Button)]
    166         public void btnTest()
    167         {
    168             this.p2pBase.LogInternalState();
    169         }
    170 
    171127        private P2PLinkManagerType p2pLinkManagerType = P2PLinkManagerType.Snal;
    172128        [TaskPane("LinkManager-Type", "Select the LinkManager-Type", "P2P Settings", 2, false, DisplayLevel.Beginner, ControlType.ComboBox, new string[] { "Snal" })]
     
    248204
    249205        // Index depends on icon-position in P2PPeer-Class properties
    250         private enum PeerStatus
     206        public enum PeerStatus
    251207        {
    252208            Connecting = 1,
     
    256212        }
    257213
     214        /// <summary>
     215        /// Changes icon of P2PPeer and visibility of the control buttons in settings
     216        /// </summary>
     217        /// <param name="peerStat"></param>
     218        public void PeerStatusChanged(PeerStatus peerStat)
     219        {
     220            ChangePluginIcon(peerStat);
     221            // Only set visibility in final states!
     222            switch (peerStat)
     223            {
     224                case PeerStatus.Online:
     225                    TaskPaneAttributeChanged(this, new TaskPaneAttributeChangedEventArgs(
     226                        new TaskPaneAttribteContainer("BtnStart", Visibility.Collapsed)));
     227                    TaskPaneAttributeChanged(this, new TaskPaneAttributeChangedEventArgs(
     228                        new TaskPaneAttribteContainer("BtnStop", Visibility.Visible)));
     229                    break;
     230                case PeerStatus.NotConnected:
     231                    TaskPaneAttributeChanged(this, new TaskPaneAttributeChangedEventArgs(
     232                        new TaskPaneAttribteContainer("BtnStart", Visibility.Visible)));
     233                    TaskPaneAttributeChanged(this, new TaskPaneAttributeChangedEventArgs(
     234                        new TaskPaneAttribteContainer("BtnStop", Visibility.Hidden)));
     235                    break;
     236                case PeerStatus.Error:
     237                case PeerStatus.Connecting:
     238                default:
     239                    break;
     240            }
     241        }
     242
    258243        public event StatusChangedEventHandler OnPluginStatusChanged;
    259244        private void ChangePluginIcon(PeerStatus peerStatus)
    260245        {
    261             if (OnPluginStatusChanged != null) OnPluginStatusChanged(null,
    262                 new StatusEventArgs(StatusChangedMode.ImageUpdate, (int)peerStatus));
     246            if (OnPluginStatusChanged != null)
     247                OnPluginStatusChanged(null, new StatusEventArgs(StatusChangedMode.ImageUpdate, (int)peerStatus));
    263248        }
    264249    }
  • trunk/CrypPlugins/PeerToPeerBase/PeerToPeerBase.cs

    r836 r862  
    3535using PeersAtPlay;
    3636
    37 /*
    38  * Synchronous functions successfully tested (store, retrieve)
    39  * !!! remove-Function is faulty - open field !!!
     37/* - Synchronous functions successfully tested (store, retrieve)
     38 * - !!! remove-Function is faulty - open field !!!
     39 * - The DHT has an integrated versioning system. When a peer wants
     40 *   to store data in an entry, which already holds data, the version
     41 *   number will be compared with the peers' version number. If the
     42 *   peer hasn't read/write the entry the last time, the storing instruction
     43 *   will be rejected. You must first read the actual data and than you can
     44 *   store your data in this entry...
     45 *
     46 * INFO:
     47 * - Have considered the DHT-own versioning system in the SynchStore method.
     48 *   If this versioning system will be abolished, the SynchStore method must
     49 *   be change!
    4050 *
    4151 * TODO:
     
    6979        public event SystemLeft OnSystemLeft;
    7080
    71         public delegate void P2PMessageReceived(byte[] byteSourceAddr, string sData);
     81        public delegate void P2PMessageReceived(string sSourceAddr, string sData);
    7282        public event P2PMessageReceived OnP2PMessageReceived;
    7383
     
    92102
    93103        #region Variables
     104
     105        private bool initialized = false;
     106        public bool Initialized
     107        {
     108            get { return this.initialized; }
     109            set { this.initialized = value; }
     110        }
    94111
    95112        private IDHT dht;
     
    196213        }
    197214
    198         /*TESTING AREA - COMPLETELY STOP THE WHOLE P2P SYSTEM*/
    199215        /// <summary>
    200216        /// Disjoins the peer from the system. The P2P system survive while one peer is still in the network.
     
    206222            {
    207223                this.dht.BeginStop(null);
    208                 //wait till systemLeft Event is invoked
    209224                this.overlay.BeginStop(null);
    210225                this.linkmanager.BeginStop(null);
    211226                this.bootstrapper.Dispose();
     227                //wait till systemLeft Event is invoked
    212228                this.systemLeft.WaitOne();
    213229            }
     
    249265        /// </summary>
    250266        /// <param name="sPeerName">out: additional peer information UserName on LinkManager</param>
    251         /// <returns>PeerID as an byte array (suitable for correct addressing on the overlay)</returns>
    252         public byte[] GetPeerID(out string sPeerName)
     267        /// <returns>PeerID as a String</returns>
     268        public string GetPeerID(out string sPeerName)
    253269        {
    254270            sPeerName = this.linkmanager.UserName;
    255             return this.overlay.LocalAddress.ToByteArray();
     271            return this.overlay.LocalAddress.ToString();
     272            // return this.overlay.LocalAddress.ToByteArray();
    256273        }
    257274
     
    271288        }
    272289
     290        public void SendToPeer(string sData, string sDestinationPeerId)
     291        {
     292            // necessary because the overlay.GetAddress(string)-method of PAP is very buggy.
     293            // It doesn't cast the string- to a Byte-Address, but returns the own address...
     294            string[] sSplitted = sDestinationPeerId.Split(new char[] { ':' }, StringSplitOptions.RemoveEmptyEntries);
     295            byte[] byteDestinationAddress = new byte[sSplitted.Length];
     296            for (int i = 0; i < byteDestinationAddress.Length; i++)
     297            {
     298                byteDestinationAddress[i] = System.Convert.ToByte(sSplitted[i]);
     299            }
     300            SendToPeer(sData, byteDestinationAddress);
     301        }
     302
     303        public void SendToPeer(PubSubMessageType msgType, string sDestinationAddress)
     304        {
     305            SendToPeer(((int)msgType).ToString(), sDestinationAddress);
     306        }
     307
    273308        #region Event Handling (System Joined, Left and Message Received)
    274309
     
    278313                OnSystemJoined();
    279314            this.systemJoined.Set();
     315            Initialized = true;
    280316        }
    281317
     
    287323            this.dht = null;
    288324            this.systemLeft.Set();
     325            Initialized = false;
    289326        }
    290327
     
    292329        {
    293330            if (OnP2PMessageReceived != null)
    294                 OnP2PMessageReceived(e.Message.Source.ToByteArray(), e.Message.Data.PopUTF8String());
     331                OnP2PMessageReceived(e.Message.Source.ToString(), e.Message.Data.PopUTF8String());
    295332        }
    296333
     
    298335        {
    299336            if (OnP2PMessageReceived != null)
    300                 OnP2PMessageReceived(e.Source.ToByteArray(), e.Data.PopUTF8String());
     337                OnP2PMessageReceived(e.Source.ToString(), e.Data.PopUTF8String());
    301338        }
    302339
    303340        #endregion
    304341
    305         /*
    306          * Attention: The asynchronous methods are not tested at the moment
    307          */
     342        /* Attention: The asynchronous methods are not tested at the moment */
    308343        #region Asynchronous Methods incl. Callbacks
    309344
     
    372407        #region Synchronous Methods incl. Callbacks
    373408
     409        #region SynchStore incl.Callback and SecondTrialCallback
     410
     411        /* The DHT has an integrated VERSIONING SYSTEM. When a peer wants
     412         * to store data in an entry, which already holds data, the version
     413         * number will be compared with the peers' version number. If the
     414         * peer hasn't read/write the entry the last time, the storing instruction
     415         * will be rejected. You must first read the actual data and than you can
     416         * store your data in this entry... */
    374417        /// <summary>
    375418        /// Stores a value in the DHT at the given key
     
    384427            Guid g = this.dht.Store(OnSynchStoreCompleted, sKey, byteData);
    385428
    386             ResponseWait rw = new ResponseWait() { WaitHandle = are };
     429            ResponseWait rw = new ResponseWait() { WaitHandle = are, key=sKey , value = byteData };
    387430
    388431            waitDict.Add(g, rw);
    389432            //blocking till response
    390433            are.WaitOne();
    391             return true;
     434            return rw.success;
    392435        }
    393436
     
    402445            return SynchStore(sKey, UTF8Encoding.UTF8.GetBytes(sData));
    403446        }
     447        /// <summary>
     448        /// Callback for a the synchronized store method
     449        /// </summary>
     450        /// <param name="rr"></param>
     451        private void OnSynchStoreCompleted(StoreResult sr)
     452        {
     453            ResponseWait rw;
     454            if (this.waitDict.TryGetValue(sr.Guid, out rw))
     455            {
     456                // if Status == Error, than the version of the value is out of date.
     457                // There is a versioning system in the DHT. So you must retrieve the
     458                // key and than store the new value --> that's it, but much traffic.
     459                // to be fixed in a few weeks from M.Helling
     460                if (sr.Status == OperationStatus.Failure)
     461                {
     462                    byte[] byteTemp = this.SynchRetrieve(rw.key);
     463
     464                    // Only try a second time. When it's still not possible, abort storing
     465                    AutoResetEvent are = new AutoResetEvent(false);
     466                    Guid g = this.dht.Store(OnSecondTrialStoring, rw.key, rw.value);
     467                    ResponseWait rw2 = new ResponseWait() { WaitHandle = are, key = rw.key, value = rw.value };
     468
     469                    waitDict.Add(g, rw2);
     470                    // blocking till response
     471                    are.WaitOne();
     472                    rw.success = rw2.success;
     473                    rw.Message = rw2.Message;
     474                }
     475                else
     476                {
     477                    rw.Message = UTF8Encoding.UTF8.GetBytes(sr.Status.ToString());
     478                    if (sr.Status == OperationStatus.KeyNotFound)
     479                        rw.success = false;
     480                    else
     481                        rw.success = true;
     482                }
     483            }
     484            //unblock WaitHandle in the synchronous method
     485            rw.WaitHandle.Set();
     486            // don't know if this accelerates the system...
     487            this.waitDict.Remove(sr.Guid);
     488        }
     489
     490        private void OnSecondTrialStoring(StoreResult sr)
     491        {
     492            ResponseWait rw;
     493            if (this.waitDict.TryGetValue(sr.Guid, out rw))
     494            {
     495                if (sr.Status == OperationStatus.Failure)
     496                {
     497                    //Abort storing, because it's already the second trial
     498                    rw.Message = UTF8Encoding.UTF8.GetBytes("Storing also not possible on second trial.");
     499                    rw.success = false;
     500                }
     501                else
     502                {
     503                    //works the second trial, so it was the versioning system
     504                    rw.success = true;
     505                }
     506            }
     507            //unblock WaitHandle in the synchronous method
     508            rw.WaitHandle.Set();
     509            // don't know if this accelerates the system...
     510            this.waitDict.Remove(sr.Guid);
     511        }
     512
     513        #endregion
    404514
    405515        /// <summary>
     
    425535
    426536        /// <summary>
    427         /// Removes a key/value pair out of the DHT
    428         /// </summary>
    429         /// <param name="sKey">Key of the DHT Entry</param>
    430         /// <returns>True, when removing is completed!</returns>
    431         public bool SynchRemove(string sKey)
    432         {
    433             AutoResetEvent are = new AutoResetEvent(false);
    434             // this method returns always a GUID to distinguish between asynchronous actions
    435 
    436             // ROAD WORKS: This function throws an error (ArgumentNotNullException).
    437             //             I think that's an error in the p@p-environment --> forwarded to the p@p-Team
    438             Guid g = this.dht.Remove(OnSynchRemoveCompleted, sKey);
    439 
    440             ResponseWait rw = new ResponseWait() { WaitHandle = are };
    441 
    442             waitDict.Add(g, rw);
    443             // blocking till response
    444             are.WaitOne();
    445             return true;
    446         }
    447 
    448         /// <summary>
    449         /// Callback for a the synchronized store method
    450         /// </summary>
    451         /// <param name="rr"></param>
    452         private void OnSynchStoreCompleted(StoreResult sr)
    453         {
    454             ResponseWait rw;
    455             if (this.waitDict.TryGetValue(sr.Guid, out rw))
    456             {
    457                 // if Status == Error, than the version of the value is out of date.
    458                 // There is a versioning system in the DHT. So you must retrieve the
    459                 // key and than store the new value --> that's it, but much traffic.
    460                 // to be fixed in a few weeks from M.Helling
    461                 rw.Message = UTF8Encoding.UTF8.GetBytes(sr.Status.ToString());
    462 
    463                 //unblock WaitHandle in the synchronous method
    464                 rw.WaitHandle.Set();
    465                 // don't know if this accelerates the system...
    466                 this.waitDict.Remove(sr.Guid);
    467             }
    468         }
    469 
    470         /// <summary>
    471537        /// Callback for a the synchronized retrieval method
    472538        /// </summary>
     
    478544            if (this.waitDict.TryGetValue(rr.Guid, out rw))
    479545            {
    480                 rw.Message = rr.Data;
    481 
    482                 //unblock WaitHandle in the synchronous method
    483                 rw.WaitHandle.Set();
    484                 // don't know if this accelerates the system...
    485                 this.waitDict.Remove(rr.Guid);
    486             }
    487         }
    488 
    489         /// <summary>
    490         /// Callback for a the synchronized remove method
    491         /// </summary>
    492         /// <param name="rr"></param>
    493         private void OnSynchRemoveCompleted(RemoveResult rr)
    494         {
    495             ResponseWait rw;
    496             if (this.waitDict.TryGetValue(rr.Guid, out rw))
    497             {
    498                 rw.Message = UTF8Encoding.UTF8.GetBytes(rr.Status.ToString());
     546                // successful as long as no error occured
     547                rw.success = true;
     548                if (rr.Status == OperationStatus.Failure)
     549                {
     550                    rw.Message = null;
     551                    rw.success = false;
     552                }
     553                else if (rr.Status == OperationStatus.KeyNotFound)
     554                    rw.Message = null;
     555                else
     556                    rw.Message = rr.Data;
    499557
    500558                //unblock WaitHandle in the synchronous method
     
    504562            }
    505563        }
     564        /// <summary>
     565        /// Removes a key/value pair out of the DHT
     566        /// </summary>
     567        /// <param name="sKey">Key of the DHT Entry</param>
     568        /// <returns>True, when removing is completed!</returns>
     569        public bool SynchRemove(string sKey)
     570        {
     571            AutoResetEvent are = new AutoResetEvent(false);
     572            // this method returns always a GUID to distinguish between asynchronous actions
     573
     574            // ROAD WORKS: This function throws an error (ArgumentNotNullException).
     575            //             I think that's an error in the p@p-environment --> forwarded to the p@p-Team
     576            Guid g = this.dht.Remove(OnSynchRemoveCompleted, sKey);
     577
     578            ResponseWait rw = new ResponseWait() { WaitHandle = are };
     579
     580            waitDict.Add(g, rw);
     581            // blocking till response
     582            are.WaitOne();
     583            return rw.success;
     584        }
     585
     586        /// <summary>
     587        /// Callback for a the synchronized remove method
     588        /// </summary>
     589        /// <param name="rr"></param>
     590        private void OnSynchRemoveCompleted(RemoveResult rr)
     591        {
     592            ResponseWait rw;
     593            if (this.waitDict.TryGetValue(rr.Guid, out rw))
     594            {
     595                rw.Message = UTF8Encoding.UTF8.GetBytes(rr.Status.ToString());
     596
     597                if (rr.Status == OperationStatus.Failure || rr.Status == OperationStatus.KeyNotFound)
     598                    rw.success = false;
     599                else
     600                    rw.success = true;
     601
     602                //unblock WaitHandle in the synchronous method
     603                rw.WaitHandle.Set();
     604                // don't know if this accelerates the system...
     605                this.waitDict.Remove(rr.Guid);
     606            }
     607        }
    506608
    507609        #endregion
  • trunk/CrypPlugins/PeerToPeerBase/ResponseWait.cs

    r783 r862  
    1111        public AutoResetEvent WaitHandle = null;
    1212        public byte[] Message = null;
     13        public string key = null;
     14        public byte[] value = null;
     15
     16        public bool success = false;
    1317    }
    1418
  • trunk/CrypPlugins/PeerToPeerPublisher/P2PPublisher.cs

    r836 r862  
    2727/*
    2828 * TODO:
    29  * - independent reverse counter per subscriber / timestamp dict (Key = SubID, Value = TimeStamp | SecondChanceStep)
    3029 * - FUTURE: dual data management of subscriber list (on local peer and in DHT)
    3130 */
     
    4039    public class P2PPublisher : IInput
    4140    {
    42         #region PrivVariables (Lists, Timer, etc)
    43 
    44         private string sDHTSettingsPostfix = "Settings";
    45 
    4641        private P2PPublisherSettings settings;
    4742        private IP2PControl p2pMaster;
    48         /*
    49         private List<byte[]> lstSubscribers = new List<byte[]>();
    50         private List<byte[]> lstWaitingForPong = new List<byte[]>();
    51         private List<byte[]> lstAliveArrived = new List<byte[]>();
    52         */
    53         private SubscriberInfo subManagement;
    54         private Timer timerWaitingForAliveMsg;
    55         private long aliveMessageInterval;
     43        private P2PPublisherBase p2pPublisher;
     44
     45        public P2PPublisher()
     46        {
     47            this.settings = new P2PPublisherSettings(this);
     48            this.settings.PropertyChanged += new PropertyChangedEventHandler(settings_PropertyChanged);
     49            this.settings.TaskPaneAttributeChanged += new TaskPaneAttributeChangedHandler(settings_TaskPaneAttributeChanged);
     50        }
     51
     52        #region SettingEvents
     53
     54        private void settings_TaskPaneAttributeChanged(ISettings settings, TaskPaneAttributeChangedEventArgs args)
     55        {
     56            //throw new NotImplementedException();
     57        }
     58
     59        private void settings_PropertyChanged(object sender, PropertyChangedEventArgs e)
     60        {
     61            // storing settings for subscribers in the DHT, so they can load them there
     62            if (e.PropertyName == "SendAliveMessageInterval")
     63            {
     64                this.p2pMaster.DHTstore(settings.TopicName + "Settings",
     65                    System.BitConverter.GetBytes(this.settings.SendAliveMessageInterval));
     66            }
     67            // if TaskName has changed, clear the Lists, because the Subscribers must reconfirm registering
     68            if (e.PropertyName == "TopicName")
     69            {
     70                GuiLogMessage("Topic Name has changed, so all subscribers must reconfirm registering!", NotificationLevel.Warning);
     71                // stop active publisher and tell all subscribers that topic name isn't valid anymore
     72                this.p2pPublisher.Stop(PubSubMessageType.Unregister);
     73                // start publisher for the changed topic
     74                this.p2pPublisher.Start(this.settings.TopicName, (long)this.settings.SendAliveMessageInterval);
     75            }
     76            if (e.PropertyName == "BtnUnregister")
     77            {
     78                this.p2pPublisher.Stop(PubSubMessageType.Unregister);
     79                GuiLogMessage("Unregister button pressed, Publisher has stopped!", NotificationLevel.Info);
     80            }
     81            if (e.PropertyName == "BtnRegister")
     82            {
     83                this.p2pPublisher.Start(this.settings.TopicName, (long)this.settings.SendAliveMessageInterval);
     84                GuiLogMessage("Register button pressed, Publisher has been started!", NotificationLevel.Info);
     85            }
     86            if (e.PropertyName == "BtnSolutionFound")
     87            {
     88                this.p2pPublisher.Stop(PubSubMessageType.Solution);
     89                GuiLogMessage("TEST: Emulate Solution-Found-message", NotificationLevel.Info);
     90            }
     91        }
    5692
    5793        #endregion
     
    124160        #region Standard PlugIn-Functionality
    125161
    126         public P2PPublisher()
    127         {
    128             this.settings = new P2PPublisherSettings(this);
    129             this.settings.PropertyChanged += new PropertyChangedEventHandler(settings_PropertyChanged);
    130         }
    131 
    132         void settings_PropertyChanged(object sender, PropertyChangedEventArgs e)
    133         {
    134             // storing settings for subscribers in the DHT, so they can load them there
    135             if (e.PropertyName == "SendAliveMessageInterval")
    136             {
    137                 this.p2pMaster.DHTstore(settings.TaskName + this.sDHTSettingsPostfix,
    138                     System.BitConverter.GetBytes(this.settings.SendAliveMessageInterval));
    139             }
    140             // if TaskName has changed, clear the Lists, because the Subscribers must reconfirm registering
    141             if (e.PropertyName == "TaskName")
    142             {
    143                 /*
    144                 this.lstSubscribers.Clear();
    145                 this.lstAliveArrived.Clear();
    146                 this.lstWaitingForPong.Clear();
    147                 */
    148                 GuiLogMessage("Taskname has changed, so all subscribers must reconfirm registering!",NotificationLevel.Warning);
    149             }
    150         }
    151 
    152162        public ISettings Settings
    153163        {
     
    168178        public void PreExecution()
    169179        {
    170             aliveMessageInterval = (long)this.settings.SendAliveMessageInterval * 1000;
    171             this.subManagement = new SubscriberInfo(aliveMessageInterval);
    172             this.subManagement.OnSubscriberRemoved += new SubscriberInfo.SubscriberRemoved(subManagement_OnSubscriberRemoved);
    173         }
    174 
    175         // Execute-Method is below this region
    176 
    177         public void PostExecution()
    178         {
    179             //throw new NotImplementedException();
    180         }
    181 
    182         public void Pause()
    183         {
    184             //throw new NotImplementedException();
    185         }
    186 
    187         public void Stop()
    188         {
    189             if (this.timerWaitingForAliveMsg != null)
    190             {
    191                 this.timerWaitingForAliveMsg.Dispose();
    192                 this.timerWaitingForAliveMsg = null;
    193             }
    194         }
    195 
    196         public void Initialize()
    197         {
    198         }
    199 
    200         public void Dispose()
    201         {
    202         }
    203 
    204         #endregion
    205 
    206         public void Execute()
    207         {
    208180            // if no P2P Slave PlugIn is connected with this PlugIn --> No execution!
    209181            if (P2PMaster == null)
     
    212184                return;
    213185            }
    214             if (this.settings.TaskName != null)
    215             {
    216                 string sActualPeerName;
    217 
    218                 // publish own PeerID to the DHT Entry with the key "TaskName", so every subscriber
    219                 // can retrieve the name and send a register-message to the publisher
    220                 byte[] bytePeerId = P2PMaster.GetPeerID(out sActualPeerName);
    221 
    222                 sActualPeerName = P2PMaster.ConvertPeerId(bytePeerId);
    223 
    224                 // Question: Why casting to String isn't possible?
    225                 P2PMaster.DHTstore(this.settings.TaskName, bytePeerId);
    226                 P2PMaster.DHTstore(this.settings.TaskName + this.sDHTSettingsPostfix,
    227                     System.BitConverter.GetBytes(aliveMessageInterval));
    228 
    229                 GuiLogMessage("Peer ID '" + P2PMaster.ConvertPeerId(bytePeerId) + "' is published to DHT -Entry-Key '" + this.settings.TaskName + "'", NotificationLevel.Info);
    230             }
    231             else
     186
     187            if (this.p2pPublisher == null)
     188            {
     189                this.p2pPublisher = new P2PPublisherBase(this.P2PMaster);
     190                this.p2pPublisher.OnGuiMessage += new P2PPublisherBase.GuiMessage(p2pPublisher_OnGuiMessage);
     191                this.p2pPublisher.Start(this.settings.TopicName, (long)this.settings.SendAliveMessageInterval);
     192            }
     193        }
     194
     195        public void Execute()
     196        {
     197            if (this.settings == null && this.settings.TopicName == null)
    232198            {
    233199                GuiLogMessage("There is no input and/or empty Settings. Storing isn't possible.", NotificationLevel.Error);
    234200                return;
    235201            }
     202
    236203            if (this.Inputvalue != null)
    237204            {
    238                 PublishText(this.Inputvalue);
    239             }
    240         }
    241 
    242         private void PublishText(string sText)
    243         {
    244             Dictionary<byte[], DateTime> lstSubscribers = this.subManagement.GetAllSubscribers();
    245             foreach (byte[] byteSubscriber in lstSubscribers.Keys)
    246             {
    247                 this.P2PMaster.SendToPeer(sText, byteSubscriber);
    248             }
    249         }
    250 
    251         private void p2pMaster_OnPeerReceivedMsg(byte[] byteSourceAddr, string sData)
    252         {
    253             if (sData.Trim() == "regi")
    254             {
    255                 if (this.subManagement.Add(byteSourceAddr))
    256                     GuiLogMessage("REGISTERED: Peer with ID " + P2PMaster.ConvertPeerId(byteSourceAddr), NotificationLevel.Info);
    257                 else
    258                     GuiLogMessage("ALREADY REGISTERED peer with ID " + P2PMaster.ConvertPeerId(byteSourceAddr), NotificationLevel.Info);
    259             }
    260             else
    261             {
    262                 if (this.subManagement.Update(byteSourceAddr))
    263                     GuiLogMessage("RECEIVED: " + sData.Trim() + " Message from " + P2PMaster.ConvertPeerId(byteSourceAddr), NotificationLevel.Info);
    264                 else
    265                     GuiLogMessage("UPDATE FAILED for " + P2PMaster.ConvertPeerId(byteSourceAddr) + " because it hasn't registered first.", NotificationLevel.Info);
    266                 if (sData.Trim() == "ping")
    267                     this.P2PMaster.SendToPeer("pong", byteSourceAddr);
    268             }
    269             if(timerWaitingForAliveMsg == null)
    270                 timerWaitingForAliveMsg = new Timer(OnWaitingForAliveMsg, null, this.settings.SendAliveMessageInterval * 1000,
    271                     this.settings.SendAliveMessageInterval * 1000);
    272         }
    273 
    274         private void OnWaitingForAliveMsg(object state)
    275         {
    276             List<byte[]> lstOutdatedSubscribers = this.subManagement.CheckVitality();
    277             foreach (byte[] outdatedSubscriber in lstOutdatedSubscribers)
    278             {
    279                 P2PMaster.SendToPeer("ping", outdatedSubscriber);
    280                 GuiLogMessage("PING outdated peer " + P2PMaster.ConvertPeerId(outdatedSubscriber), NotificationLevel.Info);
    281             }
    282         }
    283 
    284         private void subManagement_OnSubscriberRemoved(byte[] byPeerId)
    285         {
    286             GuiLogMessage("REMOVED subscriber " + P2PMaster.ConvertPeerId(byPeerId), NotificationLevel.Info);
    287         }
    288 
    289         /*OLD SOLUTION - WORKS, BUT NOT THE BEST WAY...*/
    290         /*
    291         void p2pMaster_OnPeerReceivedMsg(byte[] byteSourceAddr, string sData)
    292         {
    293             if (sData.Trim() == "regi")
    294             {
    295                 if (!lstSubscribers.Contains(byteSourceAddr))
    296                 {
    297                     lstSubscribers.Add(byteSourceAddr);
    298                     GuiLogMessage("REGISTERED: Peer with ID " + P2PMaster.ConvertPeerId(byteSourceAddr), NotificationLevel.Info);
    299                     StartWaitingTimer();
    300                 }
    301             }
    302             if (sData.Trim() == "aliv")
    303             {
    304                 if (!lstAliveArrived.Contains(byteSourceAddr))
    305                 {
    306                     lstAliveArrived.Add(byteSourceAddr);
    307                     GuiLogMessage("RECEIVED: Alive Message from " + P2PMaster.ConvertPeerId(byteSourceAddr), NotificationLevel.Info);
    308                     // if Alive Msg arrived to late, the subscriber was removed from list
    309                     // of active subscribers. So we must add him again.
    310                     if (!lstSubscribers.Contains(byteSourceAddr))
    311                         lstSubscribers.Add(byteSourceAddr);
    312                     StartWaitingTimer();
    313                 }
    314             }
    315             if (sData.Trim() == "pong")
    316             {
    317                 GuiLogMessage("RECEIVED: Pong Message from " + P2PMaster.ConvertPeerId(byteSourceAddr), NotificationLevel.Info);
    318                 if (lstWaitingForPong.Contains(byteSourceAddr))
    319                     lstWaitingForPong.Remove(byteSourceAddr);
    320             }
    321         }
    322 
    323         private void StartWaitingTimer()
    324         {
    325             // check every 30 seconds if the subscribers are still alive
    326             // Timer gets started not until retrieving the first register message from a Subscriber
    327             if (timerWaitingForAliveMsg == null)
    328                 timerWaitingForAliveMsg = new Timer(OnWaitingForAliveMsg, null, 30000, 30000);
    329         }
    330 
    331         private void OnWaitingForAliveMsg(object state)
    332         {
    333             GuiLogMessage("Checking vitality of Subscribers", NotificationLevel.Info);
    334             for (int i = 0; i < this.lstSubscribers.Count; i++)
    335             {
    336                 if (!this.lstAliveArrived.Contains(this.lstSubscribers[i]))
    337                 {
    338                     // only if Subscriber had not send a Alive message
    339                     // remove him from the subscriber list
    340                     GuiLogMessage("DEAD: Subscriber " + P2PMaster.ConvertPeerId(this.lstSubscribers[i]) + " is removed from SubList!", NotificationLevel.Info);
    341                     this.lstSubscribers.Remove(this.lstSubscribers[i]);
    342                 }
    343                 else
    344                 {
    345                     this.lstAliveArrived.Remove(this.lstSubscribers[i]);
    346                 }
    347             }
    348             // if there are no more subscribers, stop Timer of
    349             // waiting for alive messages
    350             if (this.lstSubscribers.Count == 0)
    351             {
    352                 timerWaitingForAliveMsg.Dispose();
    353                 return;
    354             }
    355         }
    356         */
     205                this.p2pPublisher.Publish(this.Inputvalue);
     206            }
     207        }
     208
     209        public void PostExecution()
     210        {
     211            //throw new NotImplementedException();
     212        }
     213
     214        public void Pause()
     215        {
     216            //throw new NotImplementedException();
     217        }
     218
     219        public void Stop()
     220        {
     221            if(this.p2pPublisher != null)
     222                this.p2pPublisher.Stop(PubSubMessageType.Unregister);
     223        }
     224
     225        public void Initialize()
     226        {
     227        }
     228
     229        public void Dispose()
     230        {
     231        }
     232
     233        #endregion
     234
     235        private void p2pMaster_OnPeerReceivedMsg(string sSourceAddr, string sData)
     236        {
     237            //this.p2pPublisher.MessageReceived(sSourceAddr, sData);
     238        }
     239
     240        private void p2pPublisher_OnGuiMessage(string sData, NotificationLevel notificationLevel)
     241        {
     242            GuiLogMessage(sData, notificationLevel);
     243        }
    357244
    358245        #region INotifyPropertyChanged Members
     
    378265
    379266        #endregion
     267
     268        //#region Publisher methods
     269
     270        //private string topic = String.Empty;
     271        //private string sDHTSettingsPostfix = "Settings";
     272        //private SubscriberManagement subManagement;
     273        //private Timer timerWaitingForAliveMsg;
     274        //private string sPeerName;
     275        //private long aliveMessageInterval;
     276
     277        //private bool StartPublisher(string sTopic, long aliveMessageInterval)
     278        //{
     279        //    this.topic = sTopic;
     280        //    aliveMessageInterval = aliveMessageInterval * 1000;
     281        //    this.subManagement = new SubscriberManagement(aliveMessageInterval);
     282        //    this.subManagement.OnSubscriberRemoved += new SubscriberManagement.SubscriberRemoved(subManagement_OnSubscriberRemoved);
     283
     284        //    string sActualPeerName;
     285
     286        //    // publish own PeerID to the DHT Entry with the key "TaskName", so every subscriber
     287        //    // can retrieve the name and send a register-message to the publisher
     288        //    string sPeerId = P2PMaster.GetPeerID(out sPeerName);
     289        //    //byte[] bytePeerId = P2PMaster.GetPeerID(out sActualPeerName);
     290
     291        //    sActualPeerName = sPeerId;
     292
     293        //    // before storing the publishers ID in the DHT, proof whether there already exist an entry
     294        //    byte[] byRead = P2PMaster.DHTload(sTopic);
     295        //    string sRead;
     296        //    // if byRead is not null, the DHT entry was already written
     297        //    if (byRead != null)
     298        //    {
     299        //        sRead = UTF8Encoding.UTF8.GetString(byRead);
     300        //        // if sRead equals sPeerId this Publisher with the same topic had written
     301        //        // this entry - no problem! Otherwise abort Starting the publisher!
     302        //        if (sRead != sPeerId)
     303        //        {
     304        //            GuiLogMessage("Can't store Publisher in the DHT because the Entry was already occupied.", NotificationLevel.Error);
     305        //            return false;
     306        //        }
     307        //    }
     308        //    bool bolTopicStored = P2PMaster.DHTstore(sTopic, sPeerId);
     309        //    bool bolSettingsStored = P2PMaster.DHTstore(sTopic + this.sDHTSettingsPostfix,
     310        //        System.BitConverter.GetBytes(aliveMessageInterval));
     311
     312        //    if (!bolTopicStored || !bolSettingsStored)
     313        //    {
     314        //        GuiLogMessage("Storing Publishers ID or Publishers Settings wasn't possible.", NotificationLevel.Error);
     315        //        return false;
     316        //    }
     317
     318        //    GuiLogMessage("Peer ID '" + sPeerId + "' is published to DHT -Entry-Key '" + this.settings.TopicName + "'", NotificationLevel.Info);
     319        //    return true;
     320        //}
     321
     322        //private void MessageReceived(string sSourceAddr, string sData)
     323        //{
     324        //    PubSubMessageType msgType = this.P2PMaster.GetMsgType(sData);
     325
     326        //    if (msgType != PubSubMessageType.NULL)
     327        //    {
     328        //        switch (msgType)
     329        //        {
     330        //            case PubSubMessageType.Register:
     331        //                if (this.subManagement.Add(sSourceAddr))
     332        //                {
     333        //                    GuiLogMessage("REGISTERED: Peer with ID " + sSourceAddr, NotificationLevel.Info);
     334        //                    this.P2PMaster.SendToPeer(PubSubMessageType.RegisteringAccepted, sSourceAddr);
     335        //                }
     336        //                else
     337        //                {
     338        //                    GuiLogMessage("ALREADY REGISTERED peer with ID " + sSourceAddr, NotificationLevel.Info);
     339        //                }
     340        //                break;
     341        //            case PubSubMessageType.Unregister:
     342        //                if (this.subManagement.Remove(sSourceAddr))
     343        //                    GuiLogMessage("REMOVED subscriber " + sSourceAddr + " because it had sent an unregister message", NotificationLevel.Info);
     344        //                else
     345        //                    GuiLogMessage("ALREADY REMOVED or had not registered anytime. ID " + sSourceAddr, NotificationLevel.Info);
     346        //                break;
     347        //            case PubSubMessageType.Alive:
     348        //            case PubSubMessageType.Pong:
     349        //                if (this.subManagement.Update(sSourceAddr))
     350        //                {
     351        //                    GuiLogMessage("RECEIVED: " + msgType.ToString() + " Message from " + sSourceAddr, NotificationLevel.Info);
     352        //                }
     353        //                else
     354        //                {
     355        //                    GuiLogMessage("UPDATE FAILED for " + sSourceAddr + " because it hasn't registered first. " + msgType.ToString(), NotificationLevel.Info);
     356        //                }
     357        //                break;
     358        //            case PubSubMessageType.Ping:
     359        //                this.P2PMaster.SendToPeer(PubSubMessageType.Pong, sSourceAddr);
     360        //                GuiLogMessage("REPLIED to a ping message from subscriber " + sSourceAddr, NotificationLevel.Info);
     361        //                break;
     362        //            case PubSubMessageType.Solution:
     363        //                // Send solution msg to all subscriber peers and delete subList
     364        //                StopPublisher(msgType);
     365        //                break;
     366        //            default:
     367        //                throw (new NotImplementedException());
     368        //        } // end switch
     369        //        if (timerWaitingForAliveMsg == null)
     370        //            timerWaitingForAliveMsg = new Timer(OnWaitingForAliveMsg, null, this.settings.SendAliveMessageInterval * 1000,
     371        //                this.settings.SendAliveMessageInterval * 1000);
     372        //    }
     373        //    // Received Data aren't PubSubMessageTypes or rather no action-relevant messages
     374        //    else
     375        //    {
     376        //        GuiLogMessage("RECEIVED message from non subscribed peer: " + sData.Trim() + ", ID: " + sSourceAddr, NotificationLevel.Warning);
     377        //    }
     378        //}
     379
     380        //private void Publish(string sText)
     381        //{
     382        //    Dictionary<string, DateTime> lstSubscribers = this.subManagement.GetAllSubscribers();
     383
     384        //    PubSubMessageType msgType = this.P2PMaster.GetMsgType(sText);
     385        //    if (msgType == PubSubMessageType.NULL)
     386        //    {
     387        //        foreach (string sSubscriber in lstSubscribers.Keys)
     388        //        {
     389        //            this.P2PMaster.SendToPeer(sText, sSubscriber);
     390        //        }
     391        //    }
     392        //    else
     393        //    {
     394        //        foreach (string sSubscriber in lstSubscribers.Keys)
     395        //        {
     396        //            this.P2PMaster.SendToPeer(msgType, sSubscriber);
     397        //        }
     398        //    }
     399        //}
     400
     401        ///// <summary>
     402        ///// if peers are outdated (alive message doesn't arrive in the given interval)
     403        ///// ping them to give them a second chance
     404        ///// </summary>
     405        ///// <param name="state"></param>
     406        //private void OnWaitingForAliveMsg(object state)
     407        //{
     408        //    List<string> lstOutdatedSubscribers = this.subManagement.CheckVitality();
     409        //    foreach (string outdatedSubscriber in lstOutdatedSubscribers)
     410        //    {
     411        //        P2PMaster.SendToPeer(PubSubMessageType.Ping, outdatedSubscriber);
     412        //        GuiLogMessage("PING outdated peer " + outdatedSubscriber, NotificationLevel.Info);
     413        //    }
     414        //}
     415
     416        //private void subManagement_OnSubscriberRemoved(string sPeerId)
     417        //{
     418        //    GuiLogMessage("REMOVED subscriber " + sPeerId, NotificationLevel.Info);
     419        //}
     420
     421        //private void StopPublisher(PubSubMessageType msgType)
     422        //{
     423        //    if(this.P2PMaster != null)
     424        //        // send unregister message to all subscribers
     425        //        Publish(((int)msgType).ToString());
     426        //    if (this.P2PMaster != null)
     427        //    {
     428        //        this.P2PMaster.DHTremove(this.topic);
     429        //        this.P2PMaster.DHTremove(this.topic + this.sDHTSettingsPostfix);
     430        //    }
     431        //    if (this.timerWaitingForAliveMsg != null)
     432        //    {
     433        //        this.timerWaitingForAliveMsg.Dispose();
     434        //        this.timerWaitingForAliveMsg = null;
     435        //    }
     436        //}
     437
     438        //#endregion
    380439    }
    381440
  • trunk/CrypPlugins/PeerToPeerPublisher/P2PPublisherSettings.cs

    r836 r862  
    1313    public class P2PPublisherSettings : ISettings
    1414    {
     15        public event TaskPaneAttributeChangedHandler TaskPaneAttributeChanged;
    1516        private bool hasChanges = false;
    1617        private P2PPublisher p2pPublisher;
     
    3536        {
    3637            this.p2pPublisher = p2pPublisher;
     38            if (TaskPaneAttributeChanged != null)
     39            {
     40                TaskPaneAttributeChanged(this, new TaskPaneAttributeChangedEventArgs(new TaskPaneAttribteContainer("BtnSolutionFound", Visibility.Collapsed)));
     41                TaskPaneAttributeChanged(this, new TaskPaneAttributeChangedEventArgs(new TaskPaneAttribteContainer("BtnUnregister", Visibility.Hidden)));
     42                TaskPaneAttributeChanged(this, new TaskPaneAttributeChangedEventArgs(new TaskPaneAttribteContainer("BtnRegister", Visibility.Visible)));
     43            }
    3744        }
    3845
    39         private string sTaskName = "NewCompTask";
    40         [TaskPane("Task Name","Choose a name for the computational task",null,0,false,DisplayLevel.Beginner,ControlType.TextBox)]
    41         public string TaskName
     46        private string sTopic = "NewTopic";
     47        [TaskPane("Topic Name","Choose a topic name with which all subscribers can register.",null,0,false,DisplayLevel.Beginner,ControlType.TextBox)]
     48        public string TopicName
    4249        {
    43             get { return this.sTaskName; }
     50            get { return this.sTopic; }
    4451            set
    4552            {
    46                 if (this.sTaskName != value && value != String.Empty && value != null)
     53                if (this.sTopic != value && value != String.Empty && value != null)
    4754                {
    48                     this.sTaskName = value;
     55                    this.sTopic = value;
    4956                    HasChanges = true;
    50                     OnPropertyChanged("TaskName");
     57                    OnPropertyChanged("TopicName");
    5158                }
    5259            }
    5360        }
     61
     62        /* FOR TESTING ISSUES */
     63        [TaskPane("Unregister", "Click here to Unregister the publisher from all registered subscribers!", "Control region", 0, true, DisplayLevel.Beginner, ControlType.Button)]
     64        public void BtnUnregister()
     65        {
     66            TaskPaneAttributeChanged(this, new TaskPaneAttributeChangedEventArgs(new TaskPaneAttribteContainer("BtnSolutionFound", Visibility.Collapsed)));
     67            TaskPaneAttributeChanged(this, new TaskPaneAttributeChangedEventArgs(new TaskPaneAttribteContainer("BtnUnregister", Visibility.Collapsed)));
     68            TaskPaneAttributeChanged(this, new TaskPaneAttributeChangedEventArgs(new TaskPaneAttribteContainer("BtnRegister", Visibility.Visible)));
     69            OnPropertyChanged("BtnUnregister");
     70        }
     71        [TaskPane("Register", "Click here to Register the publisher pro-active with all formely registered subscribers!", "Control region", 1, true, DisplayLevel.Beginner, ControlType.Button)]
     72        public void BtnRegister()
     73        {
     74            TaskPaneAttributeChanged(this, new TaskPaneAttributeChangedEventArgs(new TaskPaneAttribteContainer("BtnSolutionFound", Visibility.Visible)));
     75            TaskPaneAttributeChanged(this, new TaskPaneAttributeChangedEventArgs(new TaskPaneAttribteContainer("BtnRegister", Visibility.Collapsed)));
     76            TaskPaneAttributeChanged(this, new TaskPaneAttributeChangedEventArgs(new TaskPaneAttribteContainer("BtnUnregister", Visibility.Visible)));
     77            OnPropertyChanged("BtnRegister");
     78        }
     79        [TaskPane("Solution found", "TESTING: Emulate solution-found-case!", "Control region", 2, true, DisplayLevel.Beginner, ControlType.Button)]
     80        public void BtnSolutionFound()
     81        {
     82            TaskPaneAttributeChanged(this, new TaskPaneAttributeChangedEventArgs(new TaskPaneAttribteContainer("BtnSolutionFound", Visibility.Collapsed)));
     83            TaskPaneAttributeChanged(this, new TaskPaneAttributeChangedEventArgs(new TaskPaneAttribteContainer("BtnUnregister", Visibility.Collapsed)));
     84            TaskPaneAttributeChanged(this, new TaskPaneAttributeChangedEventArgs(new TaskPaneAttribteContainer("BtnRegister", Visibility.Visible)));
     85            OnPropertyChanged("BtnSolutionFound");
     86        }
     87        /* FOR TESTING ISSUES */
    5488
    5589        private int sendAliveMessageInterval = 20;
  • trunk/CrypPlugins/PeerToPeerPublisher/PeerToPeerPublisher.csproj

    r836 r862  
    5656  <ItemGroup>
    5757    <Compile Include="P2PPublisher.cs" />
     58    <Compile Include="P2PPublisherBase.cs" />
    5859    <Compile Include="P2PPublisherSettings.cs" />
    5960    <Compile Include="Properties\AssemblyInfo.cs" />
    60     <Compile Include="SubscribersInfo.cs" />
     61    <Compile Include="SubscribersManagement.cs" />
    6162  </ItemGroup>
    6263  <ItemGroup>
  • trunk/CrypPlugins/PeerToPeerSubscriber/P2PSubscriber.cs

    r836 r862  
    2626
    2727/*
     28 * IDEAS:
     29 * - Publisher takes subscriber list out of the DHT and registered
     30 *   itself with all subscribers pro-active (handle Register-Msg in Subscriber!)
     31 *
    2832 * TODO:
    29  * - Receive "add"-Message from Publisher (2-way-handshake),
    30  *   so you can be sure that the Publisher exists at present
    3133 * - Handle "publisher-changed" case (reconfirm registration, etc.)
    32  * - Check availability of Publisher periodically (make GuiLogMsg)
    33  * - Unregister subscriber on Stop-Action
    3434 */
    3535
     
    4848        private long sendAliveMessageInterval;
    4949        private long checkPublishersAvailability;
    50         /// <summary>
    51         /// if true, check whether a new Publisher is the actual one
    52         /// and renew Settings
    53         /// </summary>
    54         private bool bolStopped = true;
    5550
    5651        private P2PSubscriberSettings settings;
     
    7065        /// </summary>
    7166        private Timer timerCheckPubAvailability;
    72         private byte[] actualPublisher;
     67        /// <summary>
     68        /// this timer gets started when the availability of the publisher,
     69        /// at which the subscriber had registered, is checked. If the timer
     70        /// callback is called and no Pong-message was received, the probability
     71        /// that the Publisher is down is high!
     72        /// </summary>
     73        private Timer timeoutForPublishersPong;
     74        /// <summary>
     75        /// After register message is sent to publisher, this timer gets started.
     76        /// If the publisher doesn't response with a RegisteringAccepted-Message,
     77        /// the probability that the publisher is down is high!
     78        /// </summary>
     79        private Timer timeoutForPublishersRegAccept;
     80        /// <summary>
     81        /// PeerID of the actual publisher. This ID is will be checked continious
     82        /// on liveliness and/or updated if Publisher had changed.
     83        /// </summary>
     84        private string actualPublisher;
    7385
    7486        #endregion
     
    127139        #endregion
    128140
     141        private void P2PMaster_OnPeerReceivedMsg(string sSourceAddr, string sData)
     142        {
     143            MessageReceived(sSourceAddr, sData);
     144        }
     145
    129146        private void P2PMaster_OnStatusChanged(IControl sender, bool readyForExecution)
    130147        {
     
    146163        {
    147164            this.settings = new P2PSubscriberSettings(this);
     165            this.settings.PropertyChanged += new PropertyChangedEventHandler(settings_PropertyChanged);
     166            this.settings.TaskPaneAttributeChanged += new TaskPaneAttributeChangedHandler(settings_TaskPaneAttributeChanged);
     167        }
     168
     169        void settings_TaskPaneAttributeChanged(ISettings settings, TaskPaneAttributeChangedEventArgs args)
     170        {
     171            // throw new NotImplementedException();
     172        }
     173
     174        private void settings_PropertyChanged(object sender, PropertyChangedEventArgs e)
     175        {
     176            if (e.PropertyName == "BtnUnregister")
     177            {
     178                StopActiveWork();
     179                GuiLogMessage("Subscriber unregistered from Publisher!", NotificationLevel.Info);
     180            }
     181            if (e.PropertyName == "BtnRegister")
     182            {
     183                Register();
     184                GuiLogMessage("Subscriber registers with Publisher!", NotificationLevel.Info);
     185            }
     186            if (e.PropertyName == "BtnSolutionFound")
     187            {
     188                SendMessage(actualPublisher, PubSubMessageType.Solution);
     189                GuiLogMessage("Solution found message sent to Publisher.", NotificationLevel.Info);
     190            }
    148191        }
    149192
     
    168211        public void PreExecution()
    169212        {
    170             bolStopped = false;
    171213        }
    172214
     
    178220        public void Pause()
    179221        {
    180             //throw new NotImplementedException();
     222            StopActiveWork();
    181223        }
    182224
    183225        public void Stop()
    184226        {
    185             bolStopped = true;
    186             try
    187             {
    188                 if (this.timerSendingAliveMsg != null)
    189                 {
    190                     this.timerSendingAliveMsg.Dispose();
    191                     this.timerSendingAliveMsg = null;
    192                 }
    193                 if (this.timerRegisteringNotPossible != null)
    194                 {
    195                     this.timerRegisteringNotPossible.Dispose();
    196                     this.timerRegisteringNotPossible = null;
    197                 }
    198             }
    199             catch (Exception ex)
    200             {
    201                 GuiLogMessage(ex.ToString(), NotificationLevel.Error);
    202             }
     227            StopActiveWork();
    203228        }
    204229
     
    211236        }
    212237        #endregion
    213 
    214         #region Methods, EventHandler, etc. for Subscriber
    215238
    216239        public void Execute()
     
    222245                return;
    223246            }
    224             if (this.settings.TaskName != null)
    225             {
    226                 byte[] bytePubId = CheckPublisherAvailability();
    227                 // if DHT Entry for Task is empty, there exist no Publisher at present.
    228                 // The method PublisherIsAlive starts a Timer for this case to continous proof Publisher-DHT-Entry
    229                 if (bytePubId != null)
    230                     SendMsgToPublisher(bytePubId, PubSubMessageType.Register);
     247            if (this.settings.TopicName != null)
     248            {
     249                Register();
    231250            }
    232251            else
     
    236255        }
    237256
    238         private void SendMsgToPublisher(byte[] pubPeerId, PubSubMessageType msgType)
    239         {
    240             if (timerSendingAliveMsg == null && !bolStopped)
     257        #region INotifyPropertyChanged Members
     258
     259        public event System.ComponentModel.PropertyChangedEventHandler PropertyChanged;
     260
     261        public void OnPropertyChanged(string name)
     262        {
     263            EventsHelper.PropertyChanged(PropertyChanged, this, new PropertyChangedEventArgs(name));
     264        }
     265
     266        public event PluginProgressChangedEventHandler OnPluginProcessChanged;
     267
     268        private void ProgressChanged(double value, double max)
     269        {
     270            EventsHelper.ProgressChanged(OnPluginProgressChanged, this, new PluginProgressEventArgs(value, max));
     271        }
     272
     273        private void GuiLogMessage(string p, NotificationLevel notificationLevel)
     274        {
     275            EventsHelper.GuiLogMessage(OnGuiLogNotificationOccured, this, new GuiLogEventArgs(p, this, notificationLevel));
     276        }
     277
     278        #endregion
     279
     280        #region Subscriber methods
     281
     282        /// <summary>
     283        /// if true, check whether a new Publisher is the actual one
     284        /// and renew Settings
     285        /// </summary>
     286        private bool bolStopped = true;
     287
     288        private void Register()
     289        {
     290            string sPubId = CheckPublisherAvailability();
     291            // if DHT Entry for Task is empty, there exist no Publisher at present.
     292            // The method PublisherIsAlive starts a Timer for this case to continous proof Publisher-DHT-Entry
     293            if (sPubId != null)
     294            {
     295                SendMessage(sPubId, PubSubMessageType.Register);
     296                long interval = this.settings.PublishersReplyTimespan * 1000;
     297                if (this.timeoutForPublishersRegAccept == null)
     298                    this.timeoutForPublishersRegAccept = new Timer(OnTimeoutRegisteringAccepted, null, interval, interval);
     299                this.bolStopped = false;
     300            }
     301        }
     302
     303        private void MessageReceived(string sSourceAddr, string sData)
     304        {
     305            if (sSourceAddr != actualPublisher)
     306            {
     307                GuiLogMessage("RECEIVED message from third party peer (not the publisher!): " + sData.Trim() + ", ID: " + sSourceAddr, NotificationLevel.Info);
     308                return;
     309            }
     310
     311            PubSubMessageType msgType = this.P2PMaster.GetMsgType(sData);
     312
     313            switch (msgType)
     314            {
     315                case PubSubMessageType.RegisteringAccepted:
     316                    GuiLogMessage("REGISTERING ACCEPTED received from publisher!", NotificationLevel.Info);
     317                    if (this.timeoutForPublishersRegAccept != null)
     318                    {
     319                        this.timeoutForPublishersRegAccept.Dispose();
     320                        this.timeoutForPublishersRegAccept = null;
     321                    }
     322                    break;
     323                case PubSubMessageType.Ping:
     324                    SendMessage(sSourceAddr, PubSubMessageType.Pong);
     325                    GuiLogMessage("REPLIED to a ping message from " + sSourceAddr, NotificationLevel.Info);
     326                    break;
     327                case PubSubMessageType.Register:
     328                case PubSubMessageType.Unregister:
     329                    /* can't work the right way, because at present the
     330                     * publisher can't remove information of the DHT
     331                     * (point of entry for subscribers) */
     332                    GuiLogMessage(msgType.ToString().ToUpper() + " received from PUBLISHER.", NotificationLevel.Warning);
     333                    // continuously try to get a unregister and than re-register with publisher
     334                    StopActiveWork();
     335                    Register();
     336                    break;
     337                case PubSubMessageType.Solution:
     338                    StopActiveWork();
     339                    GuiLogMessage("Another Subscriber had found the solution!",NotificationLevel.Info);
     340                    break;
     341                case PubSubMessageType.Stop:
     342                    StopActiveWork();
     343                    GuiLogMessage("STOP received from publisher. Subscriber is stopped!", NotificationLevel.Warning);
     344                    break;
     345                case PubSubMessageType.Pong:
     346                    if (this.timeoutForPublishersPong != null)
     347                    {
     348                        this.timeoutForPublishersPong.Dispose();
     349                        this.timeoutForPublishersPong = null;
     350                    }
     351                    break;
     352                // if the received Data couldn't be casted to enum,
     353                // it must be text-data
     354                case PubSubMessageType.NULL:
     355                    GuiLogMessage("RECEIVED: Message from '" + sSourceAddr
     356                    + "' with data: '" + sData + "'", NotificationLevel.Info);
     357                    Outputvalue = sData;
     358                    break;
     359                case PubSubMessageType.Alive:
     360                default:
     361                    // not possible at the moment
     362                    break;
     363            }
     364        }
     365
     366        private void SendMessage(string pubPeerId, PubSubMessageType msgType)
     367        {
     368            if (timerSendingAliveMsg == null && !this.bolStopped)
    241369                timerSendingAliveMsg = new Timer(OnSendAliveMessage, null, sendAliveMessageInterval, sendAliveMessageInterval);
    242370
     
    245373                case PubSubMessageType.Register:
    246374                    // stop "RegisteringNotPossibleTimer
    247                     if(timerRegisteringNotPossible != null)
     375                    if (timerRegisteringNotPossible != null)
     376                    {
    248377                        timerRegisteringNotPossible.Dispose();
    249 
    250                     // send register message to the publisher peer
    251                     this.P2PMaster.SendToPeer("regi", pubPeerId);
    252                     GuiLogMessage("Register message sent to Publishing", NotificationLevel.Info);
     378                        timerRegisteringNotPossible = null;
     379                    }
    253380                    break;
    254381                case PubSubMessageType.Alive:
    255                     this.P2PMaster.SendToPeer("aliv", pubPeerId);
    256                     GuiLogMessage("Alive message sent to Publisher",NotificationLevel.Info);
    257                     break;
     382                case PubSubMessageType.Ping:
    258383                case PubSubMessageType.Pong:
    259                     this.P2PMaster.SendToPeer("pong", pubPeerId);
    260                     GuiLogMessage("Pong message sent to Publisher", NotificationLevel.Info);
     384                case PubSubMessageType.Unregister:
     385                    break;
     386                case PubSubMessageType.Solution:
     387                    StopActiveWork();
    261388                    break;
    262389                default:
    263                     break;
    264             }
     390                    GuiLogMessage("No Message sent, because MessageType wasn't supported: " + msgType.ToString(),NotificationLevel.Warning);
     391                    return;
     392            }
     393            this.P2PMaster.SendToPeer(msgType, pubPeerId);
     394
     395            // don't show every single alive message
     396            if(msgType != PubSubMessageType.Alive)
     397                GuiLogMessage(msgType.ToString() + " message sent to Publisher", NotificationLevel.Info);
    265398        }
    266399
     
    269402        private void OnRegisteringNotPossible(object state)
    270403        {
    271             byte[] bytePubId = CheckPublisherAvailability();
     404            string sPubId = CheckPublisherAvailability();
    272405            // if DHT Entry for Task is empty, there exist no Publisher at present.
    273406            // The method PublisherIsAlive starts a Timer for this case to continous proof Publisher-DHT-Entry
    274             if (bytePubId != null)
    275                 SendMsgToPublisher(bytePubId, PubSubMessageType.Register);
     407            if (sPubId != null)
     408                SendMessage(sPubId, PubSubMessageType.Register);
    276409        }
    277410
    278411        private void OnSendAliveMessage(object state)
    279412        {
    280             SendMsgToPublisher(actualPublisher, PubSubMessageType.Alive);
    281         }
    282 
    283         private void P2PMaster_OnPeerReceivedMsg(byte[] byteSourceAddr, string sData)
    284         {
    285             if (sData.Trim() == "ping")
    286             {
    287                 SendMsgToPublisher(byteSourceAddr, PubSubMessageType.Pong);
    288                 GuiLogMessage("REPLIED to a ping message from the publisher", NotificationLevel.Info);
    289             }
    290             else
    291             {
    292                 GuiLogMessage("RECEIVED: Message from '" + P2PMaster.ConvertPeerId(byteSourceAddr)
    293                     + "' with data: '" + sData + "'", NotificationLevel.Info);
    294                 Outputvalue = sData;
    295             }
    296         }
    297 
    298         private byte[] CheckPublisherAvailability()
    299         {
    300             byte[] bytePubId = P2PMaster.DHTload(this.settings.TaskName);
     413            SendMessage(actualPublisher, PubSubMessageType.Alive);
     414        }
     415
     416        private string CheckPublisherAvailability()
     417        {
     418            byte[] bytePubId = P2PMaster.DHTload(this.settings.TopicName);
    301419            if (bytePubId == null)
    302420            {
    303                 if (timerRegisteringNotPossible == null && !bolStopped)
     421                if (timerRegisteringNotPossible == null && !this.bolStopped)
    304422                {
    305423                    // if DHT value doesn't exist at this moment, wait for 10 seconds and try again
     
    308426                return null;
    309427            }
    310             byte[] byteISettings = P2PMaster.DHTload(this.settings.TaskName + this.sDHTSettingsPostfix);
     428
     429            byte[] byteISettings = P2PMaster.DHTload(this.settings.TopicName + this.sDHTSettingsPostfix);
    311430            if (byteISettings == null)
    312431            {
     
    316435            sendAliveMessageInterval = System.BitConverter.ToInt32(byteISettings, 0);
    317436
    318             string sPublisherName = P2PMaster.ConvertPeerId(bytePubId);
    319             GuiLogMessage("RECEIVED: Publishers' peer name '" + sPublisherName + "', Alive-Msg-Interval: " + sendAliveMessageInterval / 1000 + " sec!", NotificationLevel.Info);
     437            string sPubId = UTF8Encoding.UTF8.GetString(bytePubId);
     438            GuiLogMessage("RECEIVED: Publishers' peer name '" + sPubId + "', Alive-Msg-Interval: " + sendAliveMessageInterval / 1000 + " sec!", NotificationLevel.Info);
    320439
    321440            if (actualPublisher == null) //first time initialization
    322                 actualPublisher = bytePubId;
     441                actualPublisher = sPubId;
    323442
    324443            checkPublishersAvailability = this.settings.CheckPublishersAvailability * 1000;
    325444
    326445            // setting timer to check periodical the availability of the publishing peer
    327             if (timerCheckPubAvailability == null && !bolStopped)
     446            if (timerCheckPubAvailability == null && !this.bolStopped)
    328447                timerCheckPubAvailability = new Timer(OnCheckPubAvailability, null, checkPublishersAvailability, checkPublishersAvailability);
    329448
    330             return bytePubId;
     449            return sPubId;
    331450        }
    332451
    333452        private void OnCheckPubAvailability(object state)
    334453        {
    335             byte[] newPubId = CheckPublisherAvailability();
    336 
    337             string sNewPubId = P2PMaster.ConvertPeerId(newPubId);
    338             string sActualPeerId = P2PMaster.ConvertPeerId(actualPublisher);
    339             if (sNewPubId != sActualPeerId)
     454            string sNewPubId = CheckPublisherAvailability();
     455
     456            if (sNewPubId == null)
     457            {
     458                GuiLogMessage("Publisher wasn't found in DHT or settings didn't stored on the right way.", NotificationLevel.Warning);
     459                return;
     460            }
     461            if (sNewPubId != actualPublisher)
    340462            {
    341463                //Handle case, when publisher changed or isn't active at present (don't reply on response)
    342                 GuiLogMessage("CHANGED: Publisher from '" + sActualPeerId
     464                GuiLogMessage("CHANGED: Publisher from '" + actualPublisher
    343465                    + "' to '" + sNewPubId + "'!", NotificationLevel.Info);
    344             }
    345             SendMsgToPublisher(newPubId, PubSubMessageType.Ping);
    346             // TODO: handle asynchronous reply or timeout ...
    347             // bool unansweredPingToPub = true;
    348             // //Start Timer with settings-interval to check whether a pong arrived
    349             // this.settings.PublishersReplyTimespan
    350         }
    351 
    352         #endregion
    353 
    354         #region INotifyPropertyChanged Members
    355 
    356         public event System.ComponentModel.PropertyChangedEventHandler PropertyChanged;
    357 
    358         public void OnPropertyChanged(string name)
    359         {
    360             EventsHelper.PropertyChanged(PropertyChanged, this, new PropertyChangedEventArgs(name));
    361         }
    362 
    363         public event PluginProgressChangedEventHandler OnPluginProcessChanged;
    364 
    365         private void ProgressChanged(double value, double max)
    366         {
    367             EventsHelper.ProgressChanged(OnPluginProgressChanged, this, new PluginProgressEventArgs(value, max));
    368         }
    369 
    370         private void GuiLogMessage(string p, NotificationLevel notificationLevel)
    371         {
    372             EventsHelper.GuiLogMessage(OnGuiLogNotificationOccured, this, new GuiLogEventArgs(p, this, notificationLevel));
    373         }
    374 
     466                actualPublisher = sNewPubId;
     467                // because the publisher has changed, send a new register msg
     468                SendMessage(actualPublisher, PubSubMessageType.Register);
     469            }
     470            else
     471            {
     472                // Timer will be only stopped, when OnMessageReceived-Event received
     473                // a Pong-Response from the publisher!
     474                SendMessage(actualPublisher, PubSubMessageType.Ping);
     475                if (timeoutForPublishersPong == null)
     476                {
     477                    long interval = this.settings.PublishersReplyTimespan * 1000;
     478                    timeoutForPublishersPong = new Timer(OnTimeoutPublishersPong, null, interval, interval);
     479                }
     480            }
     481        }
     482
     483        /// <summary>
     484        /// This callback only get fired, when the publisher didn't sent a response on the register message.
     485        /// </summary>
     486        /// <param name="state"></param>
     487        private void OnTimeoutRegisteringAccepted(object state)
     488        {
     489            GuiLogMessage("TIMEOUT: Waiting for registering accepted message from publisher!", NotificationLevel.Warning);
     490            // TODO: anything
     491        }
     492
     493        /// <summary>
     494        /// This callback only get fired, when the publisher didn't sent a response on the ping message.
     495        /// </summary>
     496        /// <param name="state"></param>
     497        private void OnTimeoutPublishersPong(object state)
     498        {
     499            GuiLogMessage("Publisher didn't answer on Ping in the given time span!", NotificationLevel.Warning);
     500            timeoutForPublishersPong.Dispose();
     501            timeoutForPublishersPong = null;
     502            // try to get an active publisher and re-register
     503            CheckPublisherAvailability();
     504        }
     505
     506        /// <summary>
     507        /// Will stop all timers, so Subscriber ends with sending
     508        /// Register-, Alive- and Pong-messages. Furthermore an
     509        /// unregister message will be send to the publisher
     510        /// </summary>
     511        private void StopActiveWork()
     512        {
     513            this.bolStopped = true;
     514            if(actualPublisher != null)
     515                SendMessage(actualPublisher,PubSubMessageType.Unregister);
     516
     517            #region stopping all timers, if they are still active
     518            if (this.timerSendingAliveMsg != null)
     519            {
     520                this.timerSendingAliveMsg.Dispose();
     521                this.timerSendingAliveMsg = null;
     522            }
     523            if (this.timerRegisteringNotPossible != null)
     524            {
     525                this.timerRegisteringNotPossible.Dispose();
     526                this.timerRegisteringNotPossible = null;
     527            }
     528            if (this.timerCheckPubAvailability != null)
     529            {
     530                this.timerCheckPubAvailability.Dispose();
     531                this.timerCheckPubAvailability = null;
     532            }
     533            if (this.timeoutForPublishersRegAccept != null)
     534            {
     535                this.timeoutForPublishersRegAccept.Dispose();
     536                this.timeoutForPublishersRegAccept = null;
     537            }
     538            if (this.timeoutForPublishersPong != null)
     539            {
     540                this.timeoutForPublishersPong.Dispose();
     541                this.timeoutForPublishersPong = null;
     542            }
     543            #endregion
     544        }
    375545        #endregion
    376546    }
    377 
    378 
    379547}
  • trunk/CrypPlugins/PeerToPeerSubscriber/P2PSubscriberSettings.cs

    r836 r862  
    1313    public class P2PSubscriberSettings : ISettings
    1414    {
     15        public event TaskPaneAttributeChangedHandler TaskPaneAttributeChanged;
    1516        private bool hasChanges = false;
    1617        private P2PSubscriber p2pSubscriber;
     
    3536            {
    3637            this.p2pSubscriber = p2pSubscriber;
     38            if (TaskPaneAttributeChanged != null)
     39            {
     40                TaskPaneAttributeChanged(this, new TaskPaneAttributeChangedEventArgs(new TaskPaneAttribteContainer("BtnSolutionFound", Visibility.Collapsed)));
     41                TaskPaneAttributeChanged(this, new TaskPaneAttributeChangedEventArgs(new TaskPaneAttribteContainer("BtnUnregister", Visibility.Collapsed)));
     42                TaskPaneAttributeChanged(this, new TaskPaneAttributeChangedEventArgs(new TaskPaneAttribteContainer("BtnRegister", Visibility.Visible)));
     43            }
    3744            }
    3845
    39         private string sTaskName = "NewCompTask";
    40         [TaskPane("Task Name", "Choose the name of a published computational task", null, 0, false, DisplayLevel.Beginner, ControlType.TextBox)]
    41         public string TaskName
     46        private string sTopic = "NewTopic";
     47        [TaskPane("Topic Name", "Choose a topic name with which this subscriber shall be registered.", null, 0, false, DisplayLevel.Beginner, ControlType.TextBox)]
     48        public string TopicName
    4249        {
    43             get { return this.sTaskName; }
     50            get { return this.sTopic; }
    4451            set
    4552            {
    46                 if (this.sTaskName != value && value != String.Empty && value != null)
     53                if (this.sTopic != value && value != String.Empty && value != null)
    4754                {
    48                     this.sTaskName = value;
     55                    this.sTopic = value;
    4956                    HasChanges = true;
    50                     OnPropertyChanged(TaskName);
     57                    OnPropertyChanged(TopicName);
    5158                }
    5259            }
    5360        }
     61
     62        /* FOR TESTING ISSUES */
     63        [TaskPane("Unregister", "Click here to Unregister the publisher from all registered subscribers!", "Control region", 0, true, DisplayLevel.Beginner, ControlType.Button)]
     64        public void BtnUnregister()
     65        {
     66            TaskPaneAttributeChanged(this, new TaskPaneAttributeChangedEventArgs(new TaskPaneAttribteContainer("BtnSolutionFound", Visibility.Collapsed)));
     67            TaskPaneAttributeChanged(this, new TaskPaneAttributeChangedEventArgs(new TaskPaneAttribteContainer("BtnUnregister", Visibility.Collapsed)));
     68            TaskPaneAttributeChanged(this, new TaskPaneAttributeChangedEventArgs(new TaskPaneAttribteContainer("BtnRegister", Visibility.Visible)));
     69            OnPropertyChanged("BtnUnregister");
     70        }
     71        [TaskPane("Register", "Click here to Register the publisher pro-active with all formely registered subscribers!", "Control region", 1, true, DisplayLevel.Beginner, ControlType.Button)]
     72        public void BtnRegister()
     73        {
     74            TaskPaneAttributeChanged(this, new TaskPaneAttributeChangedEventArgs(new TaskPaneAttribteContainer("BtnSolutionFound", Visibility.Visible)));
     75            TaskPaneAttributeChanged(this, new TaskPaneAttributeChangedEventArgs(new TaskPaneAttribteContainer("BtnRegister", Visibility.Collapsed)));
     76            TaskPaneAttributeChanged(this, new TaskPaneAttributeChangedEventArgs(new TaskPaneAttribteContainer("BtnUnregister", Visibility.Visible)));
     77            OnPropertyChanged("BtnRegister");
     78        }
     79
     80        [TaskPane("Solution found", "TESTING: Emulate solution-found-case!", "Control region", 2, true, DisplayLevel.Beginner, ControlType.Button)]
     81        public void BtnSolutionFound()
     82        {
     83            TaskPaneAttributeChanged(this, new TaskPaneAttributeChangedEventArgs(new TaskPaneAttribteContainer("BtnSolutionFound", Visibility.Collapsed)));
     84            TaskPaneAttributeChanged(this, new TaskPaneAttributeChangedEventArgs(new TaskPaneAttribteContainer("BtnUnregister", Visibility.Collapsed)));
     85            TaskPaneAttributeChanged(this, new TaskPaneAttributeChangedEventArgs(new TaskPaneAttribteContainer("BtnRegister", Visibility.Visible)));
     86            OnPropertyChanged("BtnSolutionFound");
     87        }
     88        /* FOR TESTING ISSUES */
    5489
    5590        private int checkPublishersAvailability = 60;
     
    72107        }
    73108
    74         private int publishersReplyTimespan = 10;
    75         [TaskPane("Publisher Reply Timespan (in sec)", "When checking publishers availability, ping message is sent. The publisher must answer with a pong message in the timespan!", "Intervals", 0, false, DisplayLevel.Beginner, ControlType.NumericUpDown, ValidationType.RangeInteger, 10, 60)]
     109        private int publishersReplyTimespan = 2;
     110        [TaskPane("Publisher Reply Timespan (in sec)", "When checking publishers availability, ping message is sent. The publisher must answer with a pong message in the timespan!", "Intervals", 0, false, DisplayLevel.Beginner, ControlType.NumericUpDown, ValidationType.RangeInteger, 2, 60)]
    76111        public int PublishersReplyTimespan
    77112        {
  • trunk/CrypPlugins/PeerToPeer_DHTremove/P2PDHTremove.cs

    r813 r862  
    2626using Cryptool.PluginBase.Control;
    2727
     28/*TODO:
     29 * - PAP: dht.Remove wirft Fehler - sollen Dezember abgestellt werden...
     30 *   Daher ist PlugIn derzeit fehlerhaft!
     31 */
    2832namespace Cryptool.Plugins.PeerToPeer
    2933{
     
    136140            if (DhtKey != null)
    137141            {
    138                 P2PMaster.DHTremove(DhtKey);
    139                 GuiLogMessage("Entry with the key '" + DhtKey + "' will be removed from the DHT", NotificationLevel.Info);
     142                if(P2PMaster.DHTremove(DhtKey))
     143                    GuiLogMessage("Entry with the key '" + DhtKey + "' is removed from the DHT", NotificationLevel.Info);
     144                else
     145                    GuiLogMessage("Entry with the key '" + DhtKey + "' isn't removed from the DHT", NotificationLevel.Warning);
    140146            }
    141147            else
Note: See TracChangeset for help on using the changeset viewer.