Ignore:
Timestamp:
Jan 12, 2010, 11:57:12 AM (12 years ago)
Author:
arnold
Message:

Vulminous changes and updates (for example: enabling replacement of the publisher, WPF QuickWatchPresentation in P2PManager, etc.)

Location:
trunk/CrypPlugins/PeerToPeerSubscriber
Files:
3 edited

Legend:

Unmodified
Added
Removed
  • trunk/CrypPlugins/PeerToPeerSubscriber/P2PSubscriber.cs

    r971 r1068  
    134134            if (e.PropertyName == "BtnRegister")
    135135            {
    136                 RegisterSubscriber();
     136                StartSubscriber();
    137137                GuiLogMessage("Subscriber registers with Publisher!", NotificationLevel.Info);
    138138            }
     
    141141                if (this.p2pSubscriber != null)
    142142                {
    143                     this.p2pSubscriber.SolutionFound("");
     143                    this.p2pSubscriber.SolutionFound(new byte[]{0});
    144144                    GuiLogMessage("Solution found message sent to Publisher.", NotificationLevel.Info);
    145145                }
     
    185185        {
    186186            if(this.p2pSubscriber != null)
    187                 this.p2pSubscriber.Stop(PubSubMessageType.Stop);
     187                this.p2pSubscriber.Stop(PubSubMessageType.Unregister);
    188188        }
    189189
     
    207207            if (this.settings.TopicName != null)
    208208            {
    209                 RegisterSubscriber();
     209                StartSubscriber();
    210210            }
    211211            else
     
    215215        }
    216216
    217         private void RegisterSubscriber()
     217        private void StartSubscriber()
    218218        {
    219219            if (this.p2pSubscriber == null)
     
    222222                this.p2pSubscriber.OnGuiMessage += new P2PSubscriberBase.GuiMessage(p2pSubscriber_OnGuiMessage);
    223223                this.p2pSubscriber.OnTextArrivedFromPublisher += new P2PSubscriberBase.TextArrivedFromPublisher(p2pSubscriber_OnTextArrivedFromPublisher);
    224                 this.p2pSubscriber.Register(this.settings.TopicName, (long)(this.settings.CheckPublishersAvailability * 1000),
     224            }
     225            this.p2pSubscriber.Start(this.settings.TopicName, (long)(this.settings.CheckPublishersAvailability * 1000),
    225226                    (long)(this.settings.PublishersReplyTimespan * 1000));
    226             }
    227             else
    228             {
    229                 this.p2pSubscriber.Register(this.settings.TopicName, (long)(this.settings.CheckPublishersAvailability * 1000),
    230                     (long)(this.settings.PublishersReplyTimespan * 1000));
    231             }
    232         }
    233 
    234         void p2pSubscriber_OnTextArrivedFromPublisher(string sData, PeerId pid)
    235         {
    236             this.Outputvalue = sData;
     227        }
     228
     229        void p2pSubscriber_OnTextArrivedFromPublisher(byte[] data, PeerId pid)
     230        {
     231            this.Outputvalue = UTF8Encoding.UTF8.GetString(data);
    237232        }
    238233
  • trunk/CrypPlugins/PeerToPeerSubscriber/P2PSubscriberBase.cs

    r1022 r1068  
    2121        public delegate void GuiMessage(string sData, NotificationLevel notificationLevel);
    2222        public event GuiMessage OnGuiMessage;
    23         public delegate void TextArrivedFromPublisher(string sData, PeerId pid);
     23        public delegate void TextArrivedFromPublisher(byte[] data, PeerId pid);
    2424        public event TextArrivedFromPublisher OnTextArrivedFromPublisher;
    2525        public delegate void ReceivedStopFromPublisher(PubSubMessageType stopType, string sData);
     
    3131        #region Variables
    3232
     33        private const string DHT_ALIVE_POSTFIX = "AliveMsg";
     34
    3335        private IP2PControl p2pControl;
    34         private string sDHTSettingsPostfix = "Settings";
    3536        private long sendAliveMessageInterval;
    3637        private long checkPublishersAvailability;
     
    9091        /* BEGIN: Only for experimental cases */
    9192
    92         public void SolutionFound(string sSolutionData)
     93        public void SolutionFound(byte[] solutionData)
    9394        {
    9495            SendMessage(actualPublisher, PubSubMessageType.Solution);
    95             this.p2pControl.SendToPeer(sSolutionData, actualPublisher.ToByteArray());
    96         }
     96            this.p2pControl.SendToPeer(solutionData, actualPublisher);
     97        }
     98
    9799        /* END: Only for experimental cases */
    98100
     
    103105        }
    104106
    105         public void Register(string sTopic, long checkPublishersAvailability, long publishersReplyTimespan)
    106         {
    107             this.p2pControl.OnPeerReceivedMsg += new P2PBase.P2PMessageReceived(MessageReceived);
     107        public void Start(string sTopic, long checkPublishersAvailability, long publishersReplyTimespan)
     108        {
     109            this.p2pControl.OnPayloadMessageReceived += new P2PPayloadMessageReceived(p2pControl_OnPayloadMessageReceived);
     110            this.p2pControl.OnSystemMessageReceived += new P2PSystemMessageReceived(p2pControl_OnSystemMessageReceived);
    108111
    109112            this.sTopic = sTopic;
    110113            this.checkPublishersAvailability = checkPublishersAvailability;
    111114            this.publisherReplyTimespan = publishersReplyTimespan;
    112 
     115            Register();
     116        }
     117
     118        private void Register()
     119        {
    113120            // because CheckPublishersAvailability checks this value, set it for the first time here...
    114121            // if bolStopped = true, the Timer for Checking Publishers liveliness doesn't start
     
    132139        }
    133140
    134         private void MessageReceived(PeerId sourceAddr, string sData)
    135         {
    136             if (sourceAddr != actualPublisher)
    137             {
    138                 GuiLogging("RECEIVED message from third party peer (not the publisher!): " + sData.Trim() + ", ID: " + sourceAddr, NotificationLevel.Debug);
     141        private void p2pControl_OnSystemMessageReceived(PeerId sender, PubSubMessageType msgType)
     142        {
     143            if (sender != actualPublisher)
     144            {
     145                GuiLogging("RECEIVED message from third party peer (not the publisher!): " + msgType.ToString() + ", ID: " + sender, NotificationLevel.Debug);
    139146                return;
    140147            }
    141 
    142             PubSubMessageType msgType = this.p2pControl.GetMsgType(sData);
    143 
    144148            switch (msgType)
    145149            {
     
    153157                    break;
    154158                case PubSubMessageType.Ping:
    155                     SendMessage(sourceAddr, PubSubMessageType.Pong);
    156                     GuiLogging("REPLIED to a ping message from " + sourceAddr, NotificationLevel.Debug);
     159                    SendMessage(sender, PubSubMessageType.Pong);
     160                    GuiLogging("REPLIED to a ping message from " + sender, NotificationLevel.Debug);
    157161                    break;
    158162                case PubSubMessageType.Register:
    159163                case PubSubMessageType.Unregister:
    160                     GuiLogging(msgType.ToString().ToUpper() + " received from PUBLISHER.", NotificationLevel.Warning);
     164                    GuiLogging(msgType.ToString().ToUpper() + " received from PUBLISHER.", NotificationLevel.Debug);
    161165                    // continuously try to get a unregister and than re-register with publisher
    162166                    Stop(msgType);
    163                     Register(this.sTopic,this.checkPublishersAvailability, this.publisherReplyTimespan);
     167                    Register();
    164168                    break;
    165169                case PubSubMessageType.Solution:
     
    178182                    }
    179183                    break;
    180                 // if the received Data couldn't be casted to enum,
    181                 // it must be some data
    182                 case PubSubMessageType.NULL:
    183                     // functionality swapped for better inheritance
    184                     Thread handlingOtherIncomingData = new Thread(new ParameterizedThreadStart(HandleIncomingData));
    185                     handlingOtherIncomingData.Start(new IncomingData(sourceAddr, sData));
    186                     break;
    187184                case PubSubMessageType.Alive:
    188185                default:
     
    192189        }
    193190
    194         // helper class to realize the necessary two parameters for the threading method HandleIncomingMethod
    195         private class IncomingData
    196         {
    197             public PeerId sourceAddr = null;
    198             public string sData = null;
    199 
    200             public IncomingData(PeerId sourceAddr, string sData)
    201             {
    202                 this.sourceAddr = sourceAddr;
    203                 this.sData = sData;
    204             }
    205         }
    206 
    207         private void HandleIncomingData(object incomData)
    208         {
    209             if (incomData is IncomingData)
    210             {
    211                 IncomingData incomingData = incomData as IncomingData;
    212                 HandleIncomingData(incomingData.sourceAddr, incomingData.sData);
    213             }
    214             else
    215             {
    216                 throw (new Exception("Wrong object type in HandleIncomingData"));
    217             }
     191        private void p2pControl_OnPayloadMessageReceived(PeerId sender, byte[] data)
     192        {
     193            if (sender != actualPublisher)
     194            {
     195                GuiLogging("RECEIVED message from third party peer (not the publisher!): " + UTF8Encoding.UTF8.GetString(data) + ", ID: " + sender, NotificationLevel.Debug);
     196                return;
     197            }
     198            // functionality swapped for better inheritance
     199            HandleIncomingData(sender, data);
    218200        }
    219201
     
    223205        /// <param name="senderId"></param>
    224206        /// <param name="sData"></param>
    225         protected virtual void HandleIncomingData(PeerId senderId, string sData)
     207        protected virtual void HandleIncomingData(PeerId senderId, byte[] data)
    226208        {
    227209            GuiLogging("RECEIVED: Message from '" + senderId
    228                     + "' with data: '" + sData + "'", NotificationLevel.Debug);
     210                    + "' with data: '" + UTF8Encoding.UTF8.GetString(data) + "'", NotificationLevel.Debug);
    229211
    230212            if (OnTextArrivedFromPublisher != null)
    231                 OnTextArrivedFromPublisher(sData, senderId);
     213                OnTextArrivedFromPublisher(data, senderId);
    232214        }
    233215
     
    288270        }
    289271
     272        /// <summary>
     273        /// Returns the actual Publishers ID or null, when a publisher wasn't found in the DHT. In the second case,
     274        /// a Timer will be started, to check periodically the DHT entry.
     275        /// When the publishers entry changed the Publishers ID, a Register-message will be send to the new Publisher.
     276        /// The Timer for periodically checking the Publishers availability is also started here.
     277        /// </summary>
     278        /// <returns>the actual Publishers ID or null, when a publisher wasn't found in the DHT</returns>
    290279        private PeerId CheckPublisherAvailability()
    291280        {
     
    300289                    timerRegisteringNotPossible = new Timer(OnRegisteringNotPossible, null, 10000, 10000);
    301290                }
     291                GuiLogging("Publisher wasn't found in DHT or settings didn't stored on the right way.", NotificationLevel.Debug);
     292                //GuiLogging("Publisher wasn't found in DHT or settings didn't stored on the right way.", NotificationLevel.Warning);
    302293                return null;
    303294            }
    304295
    305             byte[] byteISettings = this.p2pControl.DHTload(this.sTopic + this.sDHTSettingsPostfix);
     296            byte[] byteISettings = this.p2pControl.DHTload(this.sTopic + DHT_ALIVE_POSTFIX);
    306297            if (byteISettings == null)
    307298            {
    308                 GuiLogging("Can't find settings from Publisher for the Subscriber.", NotificationLevel.Error);
     299                GuiLogging("Can't find AliveMsg-Settings from Publisher for the Subscriber.", NotificationLevel.Error);
    309300                return null;
    310301            }
     
    313304            pid = this.p2pControl.GetPeerID(bytePubId);
    314305            if (actualPublisher == null) //first time initialization
     306            {
    315307                actualPublisher = pid;
    316 
    317             GuiLogging("RECEIVED: Publishers' peer name '" + pid + "', Alive-Msg-Interval: " + sendAliveMessageInterval / 1000 + " sec!", NotificationLevel.Debug);
     308                GuiLogging("First time received publishers ID.", NotificationLevel.Debug);
     309            }
     310            else if (actualPublisher != pid)
     311            {
     312                GuiLogging("Publisher has been changed from ID '" +  actualPublisher + "' to '" + pid + "'", NotificationLevel.Debug);
     313                actualPublisher = pid;
     314                // because the publisher has changed, send a new register msg
     315                SendMessage(actualPublisher, PubSubMessageType.Register);
     316            }
     317
     318            GuiLogging("RECEIVED: Publishers' peer ID '" + pid + "', Alive-Msg-Interval: " + sendAliveMessageInterval / 1000 + " sec!", NotificationLevel.Debug);
    318319
    319320            // setting timer to check periodical the availability of the publishing peer
     
    334335            PeerId newPubId = CheckPublisherAvailability();
    335336
    336             if (newPubId == null)
    337             {
    338                 GuiLogging("Publisher wasn't found in DHT or settings didn't stored on the right way.", NotificationLevel.Warning);
    339                 return;
    340             }
    341             if (newPubId != actualPublisher)
    342             {
    343                 //Handle case, when publisher changed or isn't active at present (don't reply on response)
    344                 GuiLogging("CHANGED: Publisher from '" + actualPublisher
    345                     + "' to '" + newPubId + "'!", NotificationLevel.Info);
    346                 actualPublisher = newPubId;
    347                 // because the publisher has changed, send a new register msg
    348                 SendMessage(actualPublisher, PubSubMessageType.Register);
    349             }
    350             else
     337            if (newPubId == actualPublisher)
    351338            {
    352339                // Timer will be only stopped, when OnMessageReceived-Event received
     
    367354        {
    368355            GuiLogging("TIMEOUT: Waiting for registering accepted message from publisher!", NotificationLevel.Warning);
    369             // TODO: anything
     356            // try to register again
     357            Register();
    370358        }
    371359
     
    424412            }
    425413            #endregion
    426 
    427             this.p2pControl.OnPeerReceivedMsg -= MessageReceived;
     414            GuiLogging("All Timers were stopped successfully", NotificationLevel.Debug);
     415
     416            this.p2pControl.OnSystemMessageReceived -= p2pControl_OnSystemMessageReceived;
     417            this.p2pControl.OnPayloadMessageReceived -= p2pControl_OnPayloadMessageReceived;
    428418
    429419            this.Started = false;
     420            GuiLogging("Subscriber is completely stopped",NotificationLevel.Debug);
    430421        }
    431422
  • trunk/CrypPlugins/PeerToPeerSubscriber/P2PSubscriberSettings.cs

    r872 r1068  
    8888        /* FOR TESTING ISSUES */
    8989
    90         private int checkPublishersAvailability = 60;
     90        private int checkPublishersAvailability = 240;
    9191        [TaskPane("Check Publisher Interval (in sec)","To check liveness or possibly changed publishing peer in intervals","Intervals",0,false,DisplayLevel.Beginner,ControlType.NumericUpDown, ValidationType.RangeInteger,20,int.MaxValue)]
    9292        public int CheckPublishersAvailability
     
    107107        }
    108108
    109         private int publishersReplyTimespan = 5;
    110         [TaskPane("Publisher Reply Timespan (in sec)", "When checking publishers availability, ping message is sent. The publisher must answer with a pong message in the timespan!", "Intervals", 0, false, DisplayLevel.Beginner, ControlType.NumericUpDown, ValidationType.RangeInteger, 2, 60)]
     109        private int publishersReplyTimespan = 10;
     110        [TaskPane("Publisher Reply Timespan (in sec)", "When checking publishers availability, ping message is sent. The publisher must answer with a pong message in the timespan!", "Intervals", 0, false, DisplayLevel.Beginner, ControlType.NumericUpDown, ValidationType.RangeInteger, 2, 120)]
    111111        public int PublishersReplyTimespan
    112112        {
Note: See TracChangeset for help on using the changeset viewer.