Ignore:
Timestamp:
Feb 13, 2010, 7:56:47 PM (12 years ago)
Author:
arnold
Message:

P2PManager/P2PJobAdmin: Finetuning of leaving and re-joining the network.
Miscellaneous null-Checks implemented, so some p2p-sided errors where catched.

File:
1 edited

Legend:

Unmodified
Added
Removed
  • trunk/CrypPlugins/PeerToPeerSubscriber/P2PSubscriberBase.cs

    r1139 r1144  
    3838
    3939        /// <summary>
    40         /// If the DHT Entry of the given Task is empty, continous try to
    41         /// find a meanwhile inscribed Publishers PeerID
    42         /// </summary>
    43         private Timer timerRegisteringNotPossible;
    44         /// <summary>
    45         /// For informing the publisher pro-active, that this subscriber
    46         /// is still interested in this Task.
     40        /// Checking liveness, availability and/or changes (new peer) of the Publisher.
     41        /// Retrieves the required DHT entries and initiates the necessary steps
     42        /// </summary>
     43        private Timer timerCheckPubAvailability;
     44        /// <summary>
     45        /// To inform the publisher pro-active, that this subscriber
     46        /// is still alive and interested in this Task, send periodical
     47        /// alive messages.
    4748        /// </summary>
    4849        private Timer timerSendingAliveMsg;
    4950        /// <summary>
    50         /// checking liveness, availability and/or changes (new peer) of the Publisher
    51         /// </summary>
    52         private Timer timerCheckPubAvailability;
    53         /// <summary>
    54         /// this timer gets started when the availability of the publisher,
    55         /// at which the subscriber had registered, is checked. If the timer
    56         /// callback is called and no Pong-message was received, the probability
    57         /// that the Publisher is down is high!
     51        /// This timer gets started when a DHT entry for a Publisher exists and
     52        /// we want to check the liveness of the publisher,
     53        /// at which the subscriber had registered. Therefore we send a Ping message
     54        /// to the Publisher. If the timer callback is called and no Pong-Response was
     55        /// received from the Publisher, the probability is high, that the Publisher is down!
    5856        /// </summary>
    5957        private Timer timeoutForPublishersPong;
    6058        /// <summary>
    61         /// After register message is sent to publisher, this timer gets started.
    62         /// If the publisher doesn't response with a RegisteringAccepted-Message,
    63         /// the probability that the publisher is down is high!
     59        /// After register message is sent to publisher, this timer is started.
     60        /// If the publisher doesn't responds with a RegisteringAccepted-Message,
     61        /// the probability is high, that the publisher is down!
    6462        /// </summary>
    6563        private Timer timeoutForPublishersRegAccept;
     
    108106
    109107            this.timeoutForPublishersPong = new Timer();
     108            this.timeoutForPublishersPong.Elapsed += new ElapsedEventHandler(OnTimeoutPublishersPong);
     109
    110110            this.timeoutForPublishersRegAccept = new Timer();
     111            this.timeoutForPublishersRegAccept.Elapsed += new ElapsedEventHandler(OnTimeoutRegisteringAccepted);
     112
    111113            this.timerCheckPubAvailability = new Timer();
    112             this.timerRegisteringNotPossible = new Timer();
     114            this.timerCheckPubAvailability.Elapsed += new ElapsedEventHandler(OnCheckPubAvailability);
     115
    113116            this.timerSendingAliveMsg = new Timer();
     117            this.timerSendingAliveMsg.Elapsed += new ElapsedEventHandler(OnSendAliveMessage);
    114118        }
    115119
    116120        public void Start(string sTopic, long checkPublishersAvailability, long publishersReplyTimespan)
    117121        {
    118             /* Initialize all timers */
    119             double pubTimeResponseTimeout = Convert.ToDouble(publishersReplyTimespan);
    120 
    121             this.timeoutForPublishersPong.Interval = pubTimeResponseTimeout;
    122             this.timeoutForPublishersPong.Elapsed += new ElapsedEventHandler(OnTimeoutPublishersPong);
    123 
    124             this.timeoutForPublishersRegAccept.Interval = pubTimeResponseTimeout;
    125             this.timeoutForPublishersRegAccept.Elapsed += new ElapsedEventHandler(OnTimeoutRegisteringAccepted);
    126 
    127             this.timerCheckPubAvailability.Interval = Convert.ToDouble(checkPublishersAvailability);
    128             this.timerCheckPubAvailability.Elapsed += new ElapsedEventHandler(OnCheckPubAvailability);
    129 
    130             this.timerRegisteringNotPossible.Interval = Convert.ToDouble(10000);
    131             this.timerRegisteringNotPossible.Elapsed += new ElapsedEventHandler(OnRegisteringNotPossible);
     122            this.actualPublisher = null;
    132123
    133124            this.sTopic = sTopic;
    134125            this.checkPublishersAvailability = checkPublishersAvailability;
    135126            this.publisherReplyTimespan = publishersReplyTimespan;
    136             Register();
    137         }
    138 
    139         private void Register()
    140         {
    141             // Unfortunately you have to register this events every time, because this events will be deregistered, when
    142             // Publisher/Manager sends a Unregister/Stop-Message... There isn't any possibility to check,
    143             // whether the Events are already registered (if(dings != null) or anything else).
    144             this.p2pControl.OnPayloadMessageReceived -= p2pControl_OnPayloadMessageReceived;
    145             this.p2pControl.OnSystemMessageReceived -= p2pControl_OnSystemMessageReceived;
     127
     128            #region Initialize network-maintanance-Timers
     129
     130            double pubTimeResponseTimeout = Convert.ToDouble(this.publisherReplyTimespan);
     131
     132            this.timerCheckPubAvailability.Interval = Convert.ToDouble(this.checkPublishersAvailability);
     133            this.timerCheckPubAvailability.AutoReset = true;
     134            this.timeoutForPublishersPong.Interval = pubTimeResponseTimeout;
     135            this.timeoutForPublishersRegAccept.Interval = pubTimeResponseTimeout;
     136
     137            #endregion
     138
    146139            this.p2pControl.OnPayloadMessageReceived += new P2PPayloadMessageReceived(p2pControl_OnPayloadMessageReceived);
    147140            this.p2pControl.OnSystemMessageReceived += new P2PSystemMessageReceived(p2pControl_OnSystemMessageReceived);
    148141
    149             // because CheckPublishersAvailability checks this value, set it for the first time here...
    150             // if bolStopped = true, the Timer for Checking Publishers liveliness doesn't start
    151             this.bolStopped = false;
    152             PeerId pubId = CheckPublishersAvailability();
    153             // if DHT Entry for the task is empty, no Publisher exists at present.
    154             // The method CheckPublishersAvailability starts a Timer for this case to continous proof Publisher-DHT-Entry
    155             if (pubId == null)
    156             {
    157                 this.Started = false;
    158                 // if PubId is null, the Publisher isn't started!
    159                 this.bolStopped = true;
    160                 GuiLogging("No publisher for registering found.", NotificationLevel.Info);
    161                 return;
    162             }
    163 
    164             // when the actual publisher differs from the new detected publisher, change it
    165             if (pubId != null && (actualPublisher != null && actualPublisher != pubId))
    166             {
    167                 GuiLogging("Publisher has been changed from ID '" + actualPublisher + "' to '" + pubId + "'", NotificationLevel.Debug);
    168                 actualPublisher = pubId;
    169             }
    170             SendMessage(pubId, PubSubMessageType.Register);
    171             this.started = true;
     142            if (this.p2pControl != null)
     143            {
     144                string sNonrelevant;
     145                PeerId myPeerId = this.p2pControl.GetPeerID(out sNonrelevant);
     146                GuiLogging("Started Subscriber with ID: '" + myPeerId.ToString() + "'", NotificationLevel.Info);
     147            }
     148
     149            this.Started = true;
     150
     151            CheckPublishersAvailability2();
     152        }
     153
     154        private void CheckPublishersAvailability2()
     155        {
     156            // retrieve publisher information from the DHT
     157            PeerId pid = DHT_CommonManagement.GetTopicsPublisherId(ref this.p2pControl, this.sTopic);
     158            this.sendAliveMessageInterval = DHT_CommonManagement.GetAliveMessageInterval(ref this.p2pControl, this.sTopic);
     159
     160            if (pid == null || this.sendAliveMessageInterval == 0)
     161            {
     162                GuiLogging("No Publisher/Manager information for registering found in the DHT.", NotificationLevel.Info);
     163            }
     164            else
     165            {
     166                this.timerSendingAliveMsg.Interval = Convert.ToDouble(this.sendAliveMessageInterval);
     167                this.timerSendingAliveMsg.Start();
     168
     169                if (actualPublisher == null)
     170                {
     171                    GuiLogging("Found a Publisher/Manager with ID '" + pid.ToString() + ", so register with it.", NotificationLevel.Info);
     172                    SendMessage(pid, PubSubMessageType.Register);
     173                    timeoutForPublishersRegAccept.Start();
     174                }
     175                else if (actualPublisher == pid)
     176                {
     177                    // Timer will be only stopped, when OnMessageReceived-Event received
     178                    // a Pong-Response from the publisher!
     179                    SendMessage(pid, PubSubMessageType.Ping);
     180                    this.timeoutForPublishersPong.Start();
     181                    GuiLogging("Successfully checked publishers'/managers' information in the DHT. To check liveness, a Ping message was sended to '" + pid.ToString() + "'.", NotificationLevel.Debug);
     182                }
     183                else
     184                {
     185                    GuiLogging("The Publisher/Manager had changed from '" + this.actualPublisher.ToString()
     186                        + "' to '" + pid.ToString() + "'. Register with the new Publisher/Manager.", NotificationLevel.Info);
     187                    SendMessage(pid, PubSubMessageType.Register);
     188                    timeoutForPublishersRegAccept.Start();
     189                }
     190                this.actualPublisher = pid;
     191            }
     192            this.timerCheckPubAvailability.Enabled = true;
    172193        }
    173194
    174195        private void p2pControl_OnSystemMessageReceived(PeerId sender, PubSubMessageType msgType)
    175196        {
    176             if (sender != actualPublisher)
     197            if (sender != this.actualPublisher)
    177198            {
    178199                GuiLogging("RECEIVED message from third party peer (not the publisher!): " + msgType.ToString() + ", ID: " + sender, NotificationLevel.Debug);
     
    194215                    // continuously try to get a unregister and than re-register with publisher
    195216                    Stop(msgType);
    196                     Register();
     217                    CheckPublishersAvailability2();
    197218                    break;
    198219                case PubSubMessageType.Solution:
     
    205226                    break;
    206227                case PubSubMessageType.Pong:
     228                    SendMessage(sender, PubSubMessageType.Register);
    207229                    this.timeoutForPublishersPong.Stop();
    208230                    break;
     
    212234                    break;
    213235            }
    214             // workaround, because Timers.Timer doesn't contains a "Reset" method
     236            // workaround, because Timers.Timer doesn't contains a "Reset" method --> when receiving a
     237            // message from the Publisher, we can reset the "check pub availability"-interval time!
    215238            this.timerCheckPubAvailability.Enabled = false;
    216             this.timerCheckPubAvailability.Enabled = false;
     239            this.timerCheckPubAvailability.Enabled = true;
    217240        }
    218241
     
    247270            {
    248271                case PubSubMessageType.Register:
    249                     // stop "RegisteringNotPossibleTimer
    250                     this.timerRegisteringNotPossible.Stop();
    251272                    // start waiting interval for RegAccept Message
    252                     this.timeoutForPublishersRegAccept.Stop();
     273                    this.timeoutForPublishersRegAccept.Start();
    253274                    break;
    254275                case PubSubMessageType.Alive:
     
    276297            this.p2pControl.SendToPeer(msgType, pubPeerId);
    277298
    278             GuiLogging(msgType.ToString() + " message sent to Publisher", NotificationLevel.Debug);
    279         }
    280 
    281         // registering isn't possible if no publisher has stored
    282         // his ID in the DHT Entry with the Key TaskName
    283         private void OnRegisteringNotPossible(object sender, ElapsedEventArgs e)
    284         {
    285             Register();
    286         }
    287 
    288         private void OnSendAliveMessage(object state)
     299            GuiLogging(msgType.ToString() + " message sent to Publisher ID '" + pubPeerId.ToString() + "'.", NotificationLevel.Debug);
     300        }
     301
     302        private void OnSendAliveMessage(object sender, ElapsedEventArgs e)
    289303        {
    290304            SendMessage(actualPublisher, PubSubMessageType.Alive);
    291         }
    292 
    293         /// <summary>
    294         /// Returns the actual Publishers ID or null, when a publisher wasn't found in the DHT. In the second case,
    295         /// a Timer will be started, to check periodically the DHT entry.
    296         /// When the publishers entry changed the Publishers ID, a Register-message will be send to the new Publisher.
    297         /// The Timer for periodically checking the Publishers availability is also started here.
    298         /// </summary>
    299         /// <returns>the actual Publishers ID or null, when a publisher wasn't found in the DHT</returns>
    300         private PeerId CheckPublishersAvailability()
    301         {
    302             PeerId pid = DHT_CommonManagement.GetTopicsPublisherId(ref this.p2pControl, this.sTopic);
    303 
    304             if (pid == null)
    305             {
    306                 this.timerRegisteringNotPossible.Start();
    307                 GuiLogging("Publisher wasn't found in DHT or settings didn't stored on the right way.", NotificationLevel.Debug);
    308                 return null;
    309             }
    310 
    311             sendAliveMessageInterval = DHT_CommonManagement.GetAliveMessageInterval(ref this.p2pControl, this.sTopic);
    312 
    313             if (sendAliveMessageInterval == 0)
    314             {
    315                 GuiLogging("Can't find AliveMsg-Settings from Publisher for the Subscriber.", NotificationLevel.Error);
    316                 return null;
    317             }
    318             this.timerSendingAliveMsg.Interval = Convert.ToDouble(sendAliveMessageInterval);
    319             this.timerSendingAliveMsg.Start();
    320 
    321             if (actualPublisher == null) //first time initialization
    322             {
    323                 actualPublisher = pid;
    324                 GuiLogging("First time received publishers ID.", NotificationLevel.Debug);
    325             }
    326 
    327             GuiLogging("RECEIVED: Publishers' peer ID '" + pid + "', Alive-Msg-Interval: " + sendAliveMessageInterval / 1000 + " sec!", NotificationLevel.Debug);
    328 
    329             this.timerCheckPubAvailability.Start();
    330             // setting timer to check periodical the availability of the publishing peer
    331 
    332             return pid;
    333305        }
    334306
     
    336308        /// Callback for timerCheckPubAvailability (adjustable parameter
    337309        /// in settings, usually every 60 seconds). If another Peer
    338         /// takes over Publishing the Task, this will be handled in this callback, too.
     310        /// takes over Publishing the Task, this and many other things
     311        /// will be initiated here
    339312        /// </summary>
    340313        /// <param name="state"></param>
    341314        private void OnCheckPubAvailability(object sender, ElapsedEventArgs e)
    342315        {
    343             PeerId newPubId = CheckPublishersAvailability();
    344 
    345             if (newPubId == actualPublisher)
    346             {
    347                 // Timer will be only stopped, when OnMessageReceived-Event received
    348                 // a Pong-Response from the publisher!
    349                 SendMessage(actualPublisher, PubSubMessageType.Ping);
    350                 this.timeoutForPublishersPong.Start();
    351             }
     316            CheckPublishersAvailability2();
     317
     318            //PeerId newPubId = CheckPublishersAvailability();
     319
     320            //if (newPubId == actualPublisher)
     321            //{
     322            //    // Timer will be only stopped, when OnMessageReceived-Event received
     323            //    // a Pong-Response from the publisher!
     324            //    SendMessage(actualPublisher, PubSubMessageType.Ping);
     325            //    this.timeoutForPublishersPong.Start();
     326            //}
    352327        }
    353328
     
    360335            GuiLogging("TIMEOUT: Waiting for registering accepted message from publisher!", NotificationLevel.Debug);
    361336            // try to register again
    362             Register();
     337           
     338            //Register();
     339            CheckPublishersAvailability2();
    363340        }
    364341
     
    369346        private void OnTimeoutPublishersPong(object sender, ElapsedEventArgs e)
    370347        {
    371             GuiLogging("Publisher didn't answer on Ping in the given time span!", NotificationLevel.Info);
     348            GuiLogging("Publisher didn't respond on subscribers' ping message in the given time span!", NotificationLevel.Info);
    372349            this.timeoutForPublishersPong.Stop();
    373350            // try to get an active publisher and re-register
    374             CheckPublishersAvailability();
     351
     352            //CheckPublishersAvailability();
     353            CheckPublishersAvailability2();
    375354        }
    376355
     
    382361        public void Stop(PubSubMessageType msgType)
    383362        {
    384             if (actualPublisher != null && msgType != PubSubMessageType.NULL)
     363            if (actualPublisher != null)
    385364                SendMessage(actualPublisher, msgType);
    386365
     
    390369            this.timeoutForPublishersRegAccept.Stop();
    391370            this.timerSendingAliveMsg.Stop();
    392             this.timerCheckPubAvailability.Start();
    393371
    394372            //when Sub received a UnReg message, it haven't stop
     
    399377                this.bolStopped = true;
    400378
    401                 this.timerRegisteringNotPossible.Stop();
     379                this.timerCheckPubAvailability.Stop();
    402380
    403381                this.Started = false;
     
    410388                GuiLogging("Publisher/Manager had left the network, waiting for its comeback or takeover by a new Publisher/Manager.", NotificationLevel.Info);
    411389            }
     390
    412391            #endregion
     392
    413393            GuiLogging("All Timers were stopped successfully", NotificationLevel.Debug);
    414394        }
     
    421401    }
    422402}
     403
     404
     405
     406//private void Register()
     407//        {
     408//            // because CheckPublishersAvailability checks this value, set it for the first time here...
     409//            // if bolStopped = true, the Timer for Checking Publishers liveliness doesn't start
     410//            this.bolStopped = false;
     411//            PeerId pubId = CheckPublishersAvailability();
     412//            // if DHT Entry for the task is empty, no Publisher exists at present.
     413//            // The method CheckPublishersAvailability starts a Timer for this case to continous proof Publisher-DHT-Entry
     414//            if (pubId == null)
     415//            {
     416//                this.Started = false;
     417//                // if PubId is null, the Publisher isn't started!
     418//                this.bolStopped = true;
     419//                GuiLogging("No publisher for registering found.", NotificationLevel.Info);
     420//                return;
     421//            }
     422
     423//            // when the actual publisher differs from the new detected publisher, change it
     424//            if (pubId != null && (actualPublisher != null && actualPublisher != pubId))
     425//            {
     426//                GuiLogging("Publisher has been changed from ID '" + actualPublisher + "' to '" + pubId + "'", NotificationLevel.Debug);
     427//                actualPublisher = pubId;
     428//            }
     429//            SendMessage(pubId, PubSubMessageType.Register);
     430//            this.timeoutForPublishersRegAccept.Start();
     431//            this.started = true;
     432//        }
     433
     434//        /// <summary>
     435//        /// Returns the actual Publishers ID or null, when a publisher wasn't found in the DHT. In the second case,
     436//        /// a Timer will be started, to check periodically the DHT entry.
     437//        /// When the publishers entry changed the Publishers ID, a Register-message will be send to the new Publisher.
     438//        /// The Timer for periodically checking the Publishers availability is also started here.
     439//        /// </summary>
     440//        /// <returns>the actual Publishers ID or null, when a publisher wasn't found in the DHT</returns>
     441//        private PeerId CheckPublishersAvailability()
     442//        {
     443//            PeerId pid = DHT_CommonManagement.GetTopicsPublisherId(ref this.p2pControl, this.sTopic);
     444
     445//            if (pid == null)
     446//            {
     447//                // do nothing, because every time this method will be invoked by
     448//                // the timerCheckPubAvailability-Event, the DHT entry will be checked
     449//                GuiLogging("Publisher wasn't found in DHT or settings didn't stored on the right way.", NotificationLevel.Debug);
     450//                return null;
     451//            }
     452
     453//            sendAliveMessageInterval = DHT_CommonManagement.GetAliveMessageInterval(ref this.p2pControl, this.sTopic);
     454
     455//            if (sendAliveMessageInterval == 0)
     456//            {
     457//                GuiLogging("Can't find AliveMsg-Settings from Publisher for the Subscriber.", NotificationLevel.Error);
     458//                return null;
     459//            }
     460//            this.timerSendingAliveMsg.Interval = Convert.ToDouble(sendAliveMessageInterval);
     461//            this.timerSendingAliveMsg.Start();
     462
     463//            if (actualPublisher == null) //first time initialization
     464//            {
     465//                actualPublisher = pid;
     466//                GuiLogging("First time received publishers ID.", NotificationLevel.Debug);
     467//            }
     468
     469//            GuiLogging("RECEIVED: Publishers' peer ID '" + pid + "', Alive-Msg-Interval: " + sendAliveMessageInterval / 1000 + " sec!", NotificationLevel.Debug);
     470
     471//            this.timerCheckPubAvailability.Start();
     472//            // setting timer to check periodical the availability of the publishing peer
     473
     474//            return pid;
     475//        }
Note: See TracChangeset for help on using the changeset viewer.