Ignore:
Timestamp:
Feb 13, 2010, 7:56:47 PM (12 years ago)
Author:
arnold
Message:

P2PManager/P2PJobAdmin: Finetuning of leaving and re-joining the network.
Miscellaneous null-Checks implemented, so some p2p-sided errors where catched.

File:
1 edited

Legend:

Unmodified
Added
Removed
  • trunk/CrypPlugins/PeerToPeerPublisher/P2PPublisherBase.cs

    r1137 r1144  
    55using Cryptool.PluginBase;
    66using Cryptool.PluginBase.Control;
    7 using System.Threading;
     7//using System.Threading;
     8using System.Timers;
    89using System.IO;
    910
     
    2627        private string topic = String.Empty;
    2728        private Timer timerWaitingForAliveMsg;
     29       
    2830        private PeerId ownPeerId;
    2931        /// <summary>
     
    4648        /// Interval for waiting for other Publishers Pong in milliseconds!
    4749        /// </summary>
    48         const long INTERVAL_WAITING_FOR_OTHER_PUBS_PONG = 10000;
    49         Timer waitingForOtherPublishersPong;
     50        private const long INTERVAL_WAITING_FOR_OTHER_PUBS_PONG = 10000;
     51        private Timer waitingForOtherPublishersPong;
    5052        /// <summary>
    5153        /// if this value is set, you are between the TimeSpan of checking liveness of the other peer.
    5254        /// If Timespan runs out without receiving a Pong-Msg from the other Publisher, assume its functionality
    5355        /// </summary>
    54         PeerId otherPublisherPeer = null;
    55         bool otherPublisherHadResponded = false;
     56        private PeerId otherPublisherPeer = null;
     57        private bool otherPublisherHadResponded = false;
    5658
    5759        #endregion
     
    6062        {
    6163            this.p2pControl = p2pControl;
     64
     65            this.timerWaitingForAliveMsg = new Timer();
     66            this.timerWaitingForAliveMsg.AutoReset = true;
     67            this.timerWaitingForAliveMsg.Elapsed += new ElapsedEventHandler(OnWaitingForAliveMsg);
     68
     69            this.waitingForOtherPublishersPong = new Timer();
     70            this.waitingForOtherPublishersPong.AutoReset = false;
     71            this.waitingForOtherPublishersPong.Interval = INTERVAL_WAITING_FOR_OTHER_PUBS_PONG;
     72            this.waitingForOtherPublishersPong.Elapsed += new ElapsedEventHandler(OnWaitingForOtherPublishersPong);
     73        }
     74
     75        // Publisher-exchange extension - Arnie 2010.02.02
     76        /// <summary>
     77        /// Callback function for waitingForOtherPublishersPong-object. Will be only executed, when a
     78        /// different Publisher-ID was found in the DHT, to check if the "old" Publisher is still
     79        /// alive!
     80        /// </summary>
     81        /// <param name="sender"></param>
     82        /// <param name="e"></param>
     83        private void OnWaitingForOtherPublishersPong(object sender, ElapsedEventArgs e)
     84        {
     85            if (this.otherPublisherHadResponded)
     86            {
     87                GuiLogging("Can't assume functionality of an alive Publishers. So starting this workspace isn't possible!", NotificationLevel.Error);
     88                Stop(PubSubMessageType.PublisherRivalryProblem);
     89            }
     90            else
     91            {
     92                this.waitingForOtherPublishersPong.Stop();
     93                GuiLogging("First trial to assume functionality of the old Publisher.", NotificationLevel.Debug);
     94                // we have to delete all OLD Publishers entries to assume its functionality
     95                DHT_CommonManagement.DeleteAllPublishersEntries(ref this.p2pControl, this.topic);
     96                Start(this.topic, this.aliveMessageInterval);
     97            }
     98        }
     99
     100        /// <summary>
     101        /// if peers are outdated (alive message doesn't arrive in the given interval)
     102        /// ping them to give them a second chance
     103        /// </summary>
     104        /// <param name="sender"></param>
     105        /// <param name="e"></param>
     106        private void OnWaitingForAliveMsg(object sender, ElapsedEventArgs e)
     107        {
     108            // get second chance list from SubscribersManagement (Second chance list = The timespans of this subscribers are expired)
     109            List<PeerId> lstOutdatedSubscribers = this.peerManagement.CheckVitality();
     110            foreach (PeerId outdatedSubscriber in lstOutdatedSubscribers)
     111            {
     112                this.p2pControl.SendToPeer(PubSubMessageType.Ping, outdatedSubscriber);
     113                GuiLogging("PING outdated peer " + outdatedSubscriber, NotificationLevel.Debug);
     114            }
    62115        }
    63116
     
    76129        /// <param name="aliveMessageInterval">Declare interval (in sec) in which every subscriber has to send an alive message to the publisher</param>
    77130        /// <returns>true, if writing all necessary information was written in DHT, otherwise false</returns>
    78         public bool Start(string sTopic, long aliveMessageInterval)
    79         {
     131        public void Start(string sTopic, long aliveMessageInterval)
     132        {
     133            this.p2pControl.OnPayloadMessageReceived -= p2pControl_OnPayloadMessageReceived;
     134            this.p2pControl.OnSystemMessageReceived -= p2pControl_OnSystemMessageReceived;
    80135            this.p2pControl.OnPayloadMessageReceived += new P2PPayloadMessageReceived(p2pControl_OnPayloadMessageReceived);
    81136            this.p2pControl.OnSystemMessageReceived += new P2PSystemMessageReceived(p2pControl_OnSystemMessageReceived);
     
    103158                if (byRead != myPeerId)
    104159                {   
    105                     if (this.waitingForOtherPublishersPong == null)
    106                     {
    107                         this.otherPublisherHadResponded = false;
    108                         this.otherPublisherPeer = byRead;
    109                         this.waitingForOtherPublishersPong = new Timer(OnWaitingForOtherPublishersPong,
    110                             null, INTERVAL_WAITING_FOR_OTHER_PUBS_PONG, INTERVAL_WAITING_FOR_OTHER_PUBS_PONG);
    111 
    112                         this.p2pControl.SendToPeer(PubSubMessageType.Ping, byRead);
    113 
    114                         GuiLogging("Another Publisher was found. Waiting for Pong-Response for "
    115                             + INTERVAL_WAITING_FOR_OTHER_PUBS_PONG / 1000 + " seconds. When it won't response "
    116                             + "assume its functionality.", NotificationLevel.Debug);
    117                         return false;
    118                     }
    119                     else
    120                     {
    121                         // if this code will be executed, there's an error in this class logic
    122                         GuiLogging("Can't store Publisher in the DHT because the Entry was already occupied.", NotificationLevel.Error);
    123                         return false;
    124                     }
     160                    this.otherPublisherHadResponded = false;
     161                    this.otherPublisherPeer = byRead;
     162                    this.waitingForOtherPublishersPong.Enabled = true;
     163
     164                    this.p2pControl.SendToPeer(PubSubMessageType.Ping, byRead);
     165
     166                    GuiLogging("Another Publisher was found. Waiting for Pong-Response for "
     167                        + INTERVAL_WAITING_FOR_OTHER_PUBS_PONG / 1000 + " seconds. When it won't response "
     168                        + "assume its functionality.", NotificationLevel.Debug);
     169                    return;
    125170                }
    126171            }
     
    133178            {
    134179                GuiLogging("Storing Publishers ID and/or Publishers Settings wasn't possible.", NotificationLevel.Error);
    135                 return false;
     180                return;
    136181            }
    137182
     
    139184            this.Started = true;
    140185
    141             return true;
    142         }
     186            PeerCompletelyStarted();
     187        }
     188
     189        /// <summary>
     190        /// Will only be thrown, when Publisher has successfully registered in the p2p network!
     191        /// </summary>
     192        protected virtual void PeerCompletelyStarted()
     193        {  }
    143194
    144195        /// <summary>
     
    167218        public virtual void Stop(PubSubMessageType msgType)
    168219        {
    169             if (this.p2pControl != null && this.p2pControl.PeerStarted())
    170             {
    171 
     220            // don't remove informations, when it occurs a rivalry problem between two
     221            // Publishers, because otherwise this will kill the whole topic-solution-network.
     222            if (this.p2pControl != null && this.p2pControl.PeerStarted() && msgType != PubSubMessageType.PublisherRivalryProblem)
     223            {
    172224                GuiLogging("Begin removing the information from the DHT", NotificationLevel.Debug);
    173225
     
    186238            GuiLogging("Stopping all timers.", NotificationLevel.Debug);
    187239
    188             if (this.timerWaitingForAliveMsg != null)
    189             {
    190                 this.timerWaitingForAliveMsg.Dispose();
    191                 this.timerWaitingForAliveMsg = null;
    192             }
    193             // Publisher-exchange extension - Arnie 2010.02.02
    194             if (this.waitingForOtherPublishersPong != null)
    195             {
    196                 this.waitingForOtherPublishersPong.Dispose();
    197                 this.waitingForOtherPublishersPong = null;
    198             }
     240            this.timerWaitingForAliveMsg.Stop();
     241            this.waitingForOtherPublishersPong.Stop();
    199242
    200243            GuiLogging("Deregister message-received-events", NotificationLevel.Debug);
     
    224267                    if (this.peerManagement.Add(sender))
    225268                    {
     269                        if(!this.timerWaitingForAliveMsg.Enabled)
     270                            this.timerWaitingForAliveMsg.Start();
    226271                        GuiLogging("REGISTERED: Peer with ID " + sender + "- RegExepted Msg was sent.", NotificationLevel.Info);
    227272                        this.p2pControl.SendToPeer(PubSubMessageType.RegisteringAccepted, sender);
     
    232277                    }
    233278                    break;
     279                case PubSubMessageType.Stop:
    234280                case PubSubMessageType.Unregister:
    235281                    if (!this.peerManagement.Remove(sender))
     
    252298                    Stop(msgType);
    253299                    break;
    254                 case PubSubMessageType.Stop: //ignore this case. No subscriber can't command the Publisher to stop!
    255                     break;
    256300                default:
    257301                    throw (new NotImplementedException());
    258302            } // end switch
    259             if (timerWaitingForAliveMsg == null)
    260                 timerWaitingForAliveMsg = new Timer(OnWaitingForAliveMsg, null, this.aliveMessageInterval,
    261                     this.aliveMessageInterval);
    262 
    263             if (msgType != PubSubMessageType.Unregister)
     303
     304            if (msgType != PubSubMessageType.Unregister && msgType != PubSubMessageType.Stop)
    264305            {
    265306                if(this.peerManagement.Update(sender))
     
    297338        }
    298339
    299         /// <summary>
    300         /// if peers are outdated (alive message doesn't arrive in the given interval)
    301         /// ping them to give them a second chance
    302         /// </summary>
    303         /// <param name="state"></param>
    304         private void OnWaitingForAliveMsg(object state)
    305         {
    306             // get second chance list from SubscribersManagement (Second chance list = The timespans of this subscribers are expired)
    307             List<PeerId> lstOutdatedSubscribers = this.peerManagement.CheckVitality();
    308             foreach (PeerId outdatedSubscriber in lstOutdatedSubscribers)
    309             {
    310                 this.p2pControl.SendToPeer(PubSubMessageType.Ping, outdatedSubscriber);
    311                 GuiLogging("PING outdated peer " + outdatedSubscriber, NotificationLevel.Debug);
    312             }
    313         }
    314 
    315340        private void peerManagement_OnSubscriberRemoved(PeerId peerId)
    316341        {
     
    325350
    326351        #endregion
    327 
    328         // Publisher-exchange extension - Arnie 2010.02.02
    329         /// <summary>
    330         /// Callback function for waitingForOtherPublishersPong-object. Will be only executed, when a
    331         /// different Publisher-ID was found in the DHT, to check if the "old" Publisher is still
    332         /// alive!
    333         /// </summary>
    334         /// <param name="state"></param>
    335         private void OnWaitingForOtherPublishersPong(object state)
    336         {
    337             if (this.otherPublisherHadResponded)
    338             {
    339                 GuiLogging("Can't assume functionality of an alive Publishers. So starting this workspace isn't possible!", NotificationLevel.Error);
    340             }
    341             else
    342             {
    343                 if (this.waitingForOtherPublishersPong != null)
    344                 {
    345                     this.waitingForOtherPublishersPong.Dispose();
    346                     this.waitingForOtherPublishersPong = null;
    347                 }
    348                 GuiLogging("First trial to assume old Publishers functionality.", NotificationLevel.Debug);
    349                 // we have to delete all OLD Publishers entries to assume its functionality
    350                 DHT_CommonManagement.DeleteAllPublishersEntries(ref this.p2pControl, this.topic);
    351                 Start(this.topic, this.aliveMessageInterval);
    352             }
    353         }
    354352
    355353        // Only for testing the (De-)Serialization of SubscribersManagement
Note: See TracChangeset for help on using the changeset viewer.