Changeset 1144


Ignore:
Timestamp:
Feb 13, 2010, 7:56:47 PM (12 years ago)
Author:
arnold
Message:

P2PManager/P2PJobAdmin: Finetuning of leaving and re-joining the network.
Miscellaneous null-Checks implemented, so some p2p-sided errors where catched.

Location:
trunk
Files:
10 edited

Legend:

Unmodified
Added
Removed
  • trunk/CrypPluginBase/CrypPluginBase.csproj

    r1106 r1144  
    44    <Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
    55    <Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
    6     <ProductVersion>9.0.30729</ProductVersion>
     6    <ProductVersion>9.0.21022</ProductVersion>
    77    <SchemaVersion>2.0</SchemaVersion>
    88    <ProjectGuid>{25DB2B47-A457-4EC2-960B-395CE07AE093}</ProjectGuid>
  • trunk/CrypPlugins/KeySearcher_IControl/KeySearcher_IControl.cs

    r1139 r1144  
    116116        void keySearcher_OnBruteforcingEnded(LinkedList<KeySearcher.KeySearcher.ValueKey> top10List)
    117117        {
    118             this.dtEndProcessing = DateTime.Now;
    119             // Create a new JobResult
    120             TimeSpan processingTime = this.dtEndProcessing.Subtract(this.dtStartProcessing);
    121             KeyPatternJobResult jobResult =
    122                 new KeyPatternJobResult(this.JobId, top10List, processingTime);
    123 
    124             GuiLogging("Ended bruteforcing JobId '" + this.JobId.ToString() + "' in "
    125                 + processingTime.TotalMinutes.ToString() + " minutes",NotificationLevel.Info);
    126 
    127             // if registered, sending the serialized Job Result
    128             if (OnProcessingSuccessfullyEnded != null)
    129             {
    130                 OnProcessingSuccessfullyEnded(this.JobId, jobResult.Serialize());
     118            if (this.JobId != null || top10List != null)
     119            {
     120                this.dtEndProcessing = DateTime.Now;
     121                // Create a new JobResult
     122                TimeSpan processingTime = this.dtEndProcessing.Subtract(this.dtStartProcessing);
     123                KeyPatternJobResult jobResult =
     124                    new KeyPatternJobResult(this.JobId, top10List, processingTime);
     125
     126                GuiLogging("Ended bruteforcing JobId '" + this.JobId.ToString() + "' in "
     127                    + processingTime.TotalMinutes.ToString() + " minutes", NotificationLevel.Info);
     128
     129                // if registered, sending the serialized Job Result
     130                if (OnProcessingSuccessfullyEnded != null)
     131                {
     132                    OnProcessingSuccessfullyEnded(this.JobId, jobResult.Serialize());
     133                }
     134            }
     135            else
     136            {
     137                GuiLogging("Bruteforcing was canceled, because jobId and/or jobResult are null.", NotificationLevel.Info);
     138                if (OnProcessingCanceled != null)
     139                    OnProcessingCanceled(null);
    131140            }
    132141        }
  • trunk/CrypPlugins/PeerToPeerBase/IP2PControl.cs

    r1137 r1144  
    7171        Stop = 7,
    7272        /// <summary>
    73         /// because Enum is non-nullable, I used this workaround
     73        /// When a new Publisher A canceled to takeover the Topic of another Publisher B,
     74        /// because B is still active, so A can't takeover functionality of B!
    7475        /// </summary>
    75         NULL = 666
     76        PublisherRivalryProblem = 8,
     77        /// <summary>
     78        /// Only send this msg type, when a fatal error occured at Publisher or Subscriber
     79        /// </summary>
     80        Error = 222
    7681    }
    7782
  • trunk/CrypPlugins/PeerToPeerBase/P2PPeer.cs

    r1137 r1144  
    5151        {
    5252            this.p2pBase = new P2PBase();
    53             // to forward event from overlay/dht MessageReceived-Event from P2PBase
    54             this.p2pBase.OnP2PMessageReceived += new P2PBase.P2PMessageReceived(p2pBase_OnP2PMessageReceived);
    5553            this.settings = new P2PPeerSettings(this);
    5654            this.settings.TaskPaneAttributeChanged += new TaskPaneAttributeChangedHandler(settings_TaskPaneAttributeChanged);
     
    212210
    213211                this.p2pBase.AllowLoggingToMonitor = this.settings.Log2Monitor;
     212
     213                // to forward event from overlay/dht MessageReceived-Event from P2PBase
     214                this.p2pBase.OnP2PMessageReceived += new P2PBase.P2PMessageReceived(p2pBase_OnP2PMessageReceived);
    214215
    215216                if (CheckAndInstallPAPCertificates())
     
    253254                else
    254255                {
     256                    this.p2pBase.OnP2PMessageReceived -= p2pBase_OnP2PMessageReceived;
    255257                    this.settings.PeerStatusChanged(P2PPeerSettings.PeerStatus.NotConnected);
    256258                    GuiLogMessage("Peer stopped: " + !this.PeerStarted, NotificationLevel.Info);
  • trunk/CrypPlugins/PeerToPeerBase/PeerToPeerBase.cs

    r1129 r1144  
    675675        public PeerId(OverlayAddress oAddress)
    676676        {
    677             this.stringId = oAddress.ToString();
    678             this.byteId = oAddress.ToByteArray();
    679 
    680             // FNV-1 hashing
    681             uint fnvHash = OFFSET_BASIS;
    682             foreach (byte b in byteId)
    683             {
    684                 fnvHash = (fnvHash * PRIME) ^ b;
    685             }
    686             hashCode = (int)fnvHash;
     677            if (oAddress != null)
     678            {
     679                this.stringId = oAddress.ToString();
     680                this.byteId = oAddress.ToByteArray();
     681
     682                // FNV-1 hashing
     683                uint fnvHash = OFFSET_BASIS;
     684                foreach (byte b in byteId)
     685                {
     686                    fnvHash = (fnvHash * PRIME) ^ b;
     687                }
     688                hashCode = (int)fnvHash;
     689            }
    687690        }
    688691
  • trunk/CrypPlugins/PeerToPeerManager_NEW/P2PManagerBase_NEW.cs

    r1137 r1144  
    116116        }
    117117
    118         public bool StartManager(string sTopic, long aliveMessageInterval)
    119         {
    120             bool result = false;
     118        public void StartManager(string sTopic, long aliveMessageInterval)
     119        {
    121120            // only when the main manager plugin is connected with a Peer-PlugIn
    122121            // and a IWorkerControl-PlugIn, this Manager can start its work
     
    125124                this.TopicName = sTopic;
    126125                this.AliveMesageInterval = aliveMessageInterval;
    127                 result = base.Start(this.TopicName, this.AliveMesageInterval);
    128 
    129                 this.ManagerStarted = result;
    130 
    131                 GetProgressInformation();
    132             }
    133             GuiLogging("P2PManager starting status: " + result.ToString(), NotificationLevel.Info);
    134             return result;
     126                base.Start(this.TopicName, this.AliveMesageInterval);
     127            }
     128            else
     129            {
     130                GuiLogging("Manager can't be started, because P2P-Peer- or Distributable-Job-PlugIn isn't connected with the Manager or the connection is broken...", NotificationLevel.Warning);
     131            }
     132        }
     133
     134        protected override void PeerCompletelyStarted()
     135        {
     136            base.PeerCompletelyStarted();
     137
     138            this.ManagerStarted = true;
     139            GetProgressInformation();
     140            GuiLogging("P2PManager is started right now.", NotificationLevel.Info);
    135141        }
    136142
     
    141147            this.ManagerStarted = false;
    142148            ((WorkersManagement)this.peerManagement).OnFreeWorkersAvailable -= peerManagement_OnFreeWorkersAvailable;
    143 
    144             GuiLogging("P2PManager was stopped successully", NotificationLevel.Info);
     149            ((WorkersManagement)this.peerManagement).OnSubscriberRemoved -= peerManagement_OnSubscriberRemoved;
     150
     151            GuiLogging("P2PManager was stopped successully.", NotificationLevel.Info);
    145152        }
    146153
     
    313320        private void peerManagement_OnFreeWorkersAvailable()
    314321        {
    315             if (!this.managerStarted)
     322            if (!this.ManagerStarted)
    316323            {
    317324                GuiLogging("Manager isn't started at present, so I can't disperse the patterns.", NotificationLevel.Error);
    318                 throw (new Exception("Critical error in P2PManager. Manager isn't started yet, but the workers can register..."));
    319             }
    320 
    321             AllocateJobs();
     325                bool removeSettings = DHT_CommonManagement.DeleteAllPublishersEntries(ref this.p2pControl, this.TopicName);
     326                if (removeSettings)
     327                    GuiLogging("Manager is stopped, but DHT entries were still existing, so they were deleted!", NotificationLevel.Info);
     328                else
     329                    throw (new Exception("Critical error in P2PManager. Manager isn't started yet, but the workers can register... Even removing DHT entries weren't possible..."));
     330            }
     331            else
     332                AllocateJobs();
    322333
    323334            GetProgressInformation();
  • trunk/CrypPlugins/PeerToPeerPublisher/P2PPublisherBase.cs

    r1137 r1144  
    55using Cryptool.PluginBase;
    66using Cryptool.PluginBase.Control;
    7 using System.Threading;
     7//using System.Threading;
     8using System.Timers;
    89using System.IO;
    910
     
    2627        private string topic = String.Empty;
    2728        private Timer timerWaitingForAliveMsg;
     29       
    2830        private PeerId ownPeerId;
    2931        /// <summary>
     
    4648        /// Interval for waiting for other Publishers Pong in milliseconds!
    4749        /// </summary>
    48         const long INTERVAL_WAITING_FOR_OTHER_PUBS_PONG = 10000;
    49         Timer waitingForOtherPublishersPong;
     50        private const long INTERVAL_WAITING_FOR_OTHER_PUBS_PONG = 10000;
     51        private Timer waitingForOtherPublishersPong;
    5052        /// <summary>
    5153        /// if this value is set, you are between the TimeSpan of checking liveness of the other peer.
    5254        /// If Timespan runs out without receiving a Pong-Msg from the other Publisher, assume its functionality
    5355        /// </summary>
    54         PeerId otherPublisherPeer = null;
    55         bool otherPublisherHadResponded = false;
     56        private PeerId otherPublisherPeer = null;
     57        private bool otherPublisherHadResponded = false;
    5658
    5759        #endregion
     
    6062        {
    6163            this.p2pControl = p2pControl;
     64
     65            this.timerWaitingForAliveMsg = new Timer();
     66            this.timerWaitingForAliveMsg.AutoReset = true;
     67            this.timerWaitingForAliveMsg.Elapsed += new ElapsedEventHandler(OnWaitingForAliveMsg);
     68
     69            this.waitingForOtherPublishersPong = new Timer();
     70            this.waitingForOtherPublishersPong.AutoReset = false;
     71            this.waitingForOtherPublishersPong.Interval = INTERVAL_WAITING_FOR_OTHER_PUBS_PONG;
     72            this.waitingForOtherPublishersPong.Elapsed += new ElapsedEventHandler(OnWaitingForOtherPublishersPong);
     73        }
     74
     75        // Publisher-exchange extension - Arnie 2010.02.02
     76        /// <summary>
     77        /// Callback function for waitingForOtherPublishersPong-object. Will be only executed, when a
     78        /// different Publisher-ID was found in the DHT, to check if the "old" Publisher is still
     79        /// alive!
     80        /// </summary>
     81        /// <param name="sender"></param>
     82        /// <param name="e"></param>
     83        private void OnWaitingForOtherPublishersPong(object sender, ElapsedEventArgs e)
     84        {
     85            if (this.otherPublisherHadResponded)
     86            {
     87                GuiLogging("Can't assume functionality of an alive Publishers. So starting this workspace isn't possible!", NotificationLevel.Error);
     88                Stop(PubSubMessageType.PublisherRivalryProblem);
     89            }
     90            else
     91            {
     92                this.waitingForOtherPublishersPong.Stop();
     93                GuiLogging("First trial to assume functionality of the old Publisher.", NotificationLevel.Debug);
     94                // we have to delete all OLD Publishers entries to assume its functionality
     95                DHT_CommonManagement.DeleteAllPublishersEntries(ref this.p2pControl, this.topic);
     96                Start(this.topic, this.aliveMessageInterval);
     97            }
     98        }
     99
     100        /// <summary>
     101        /// if peers are outdated (alive message doesn't arrive in the given interval)
     102        /// ping them to give them a second chance
     103        /// </summary>
     104        /// <param name="sender"></param>
     105        /// <param name="e"></param>
     106        private void OnWaitingForAliveMsg(object sender, ElapsedEventArgs e)
     107        {
     108            // get second chance list from SubscribersManagement (Second chance list = The timespans of this subscribers are expired)
     109            List<PeerId> lstOutdatedSubscribers = this.peerManagement.CheckVitality();
     110            foreach (PeerId outdatedSubscriber in lstOutdatedSubscribers)
     111            {
     112                this.p2pControl.SendToPeer(PubSubMessageType.Ping, outdatedSubscriber);
     113                GuiLogging("PING outdated peer " + outdatedSubscriber, NotificationLevel.Debug);
     114            }
    62115        }
    63116
     
    76129        /// <param name="aliveMessageInterval">Declare interval (in sec) in which every subscriber has to send an alive message to the publisher</param>
    77130        /// <returns>true, if writing all necessary information was written in DHT, otherwise false</returns>
    78         public bool Start(string sTopic, long aliveMessageInterval)
    79         {
     131        public void Start(string sTopic, long aliveMessageInterval)
     132        {
     133            this.p2pControl.OnPayloadMessageReceived -= p2pControl_OnPayloadMessageReceived;
     134            this.p2pControl.OnSystemMessageReceived -= p2pControl_OnSystemMessageReceived;
    80135            this.p2pControl.OnPayloadMessageReceived += new P2PPayloadMessageReceived(p2pControl_OnPayloadMessageReceived);
    81136            this.p2pControl.OnSystemMessageReceived += new P2PSystemMessageReceived(p2pControl_OnSystemMessageReceived);
     
    103158                if (byRead != myPeerId)
    104159                {   
    105                     if (this.waitingForOtherPublishersPong == null)
    106                     {
    107                         this.otherPublisherHadResponded = false;
    108                         this.otherPublisherPeer = byRead;
    109                         this.waitingForOtherPublishersPong = new Timer(OnWaitingForOtherPublishersPong,
    110                             null, INTERVAL_WAITING_FOR_OTHER_PUBS_PONG, INTERVAL_WAITING_FOR_OTHER_PUBS_PONG);
    111 
    112                         this.p2pControl.SendToPeer(PubSubMessageType.Ping, byRead);
    113 
    114                         GuiLogging("Another Publisher was found. Waiting for Pong-Response for "
    115                             + INTERVAL_WAITING_FOR_OTHER_PUBS_PONG / 1000 + " seconds. When it won't response "
    116                             + "assume its functionality.", NotificationLevel.Debug);
    117                         return false;
    118                     }
    119                     else
    120                     {
    121                         // if this code will be executed, there's an error in this class logic
    122                         GuiLogging("Can't store Publisher in the DHT because the Entry was already occupied.", NotificationLevel.Error);
    123                         return false;
    124                     }
     160                    this.otherPublisherHadResponded = false;
     161                    this.otherPublisherPeer = byRead;
     162                    this.waitingForOtherPublishersPong.Enabled = true;
     163
     164                    this.p2pControl.SendToPeer(PubSubMessageType.Ping, byRead);
     165
     166                    GuiLogging("Another Publisher was found. Waiting for Pong-Response for "
     167                        + INTERVAL_WAITING_FOR_OTHER_PUBS_PONG / 1000 + " seconds. When it won't response "
     168                        + "assume its functionality.", NotificationLevel.Debug);
     169                    return;
    125170                }
    126171            }
     
    133178            {
    134179                GuiLogging("Storing Publishers ID and/or Publishers Settings wasn't possible.", NotificationLevel.Error);
    135                 return false;
     180                return;
    136181            }
    137182
     
    139184            this.Started = true;
    140185
    141             return true;
    142         }
     186            PeerCompletelyStarted();
     187        }
     188
     189        /// <summary>
     190        /// Will only be thrown, when Publisher has successfully registered in the p2p network!
     191        /// </summary>
     192        protected virtual void PeerCompletelyStarted()
     193        {  }
    143194
    144195        /// <summary>
     
    167218        public virtual void Stop(PubSubMessageType msgType)
    168219        {
    169             if (this.p2pControl != null && this.p2pControl.PeerStarted())
    170             {
    171 
     220            // don't remove informations, when it occurs a rivalry problem between two
     221            // Publishers, because otherwise this will kill the whole topic-solution-network.
     222            if (this.p2pControl != null && this.p2pControl.PeerStarted() && msgType != PubSubMessageType.PublisherRivalryProblem)
     223            {
    172224                GuiLogging("Begin removing the information from the DHT", NotificationLevel.Debug);
    173225
     
    186238            GuiLogging("Stopping all timers.", NotificationLevel.Debug);
    187239
    188             if (this.timerWaitingForAliveMsg != null)
    189             {
    190                 this.timerWaitingForAliveMsg.Dispose();
    191                 this.timerWaitingForAliveMsg = null;
    192             }
    193             // Publisher-exchange extension - Arnie 2010.02.02
    194             if (this.waitingForOtherPublishersPong != null)
    195             {
    196                 this.waitingForOtherPublishersPong.Dispose();
    197                 this.waitingForOtherPublishersPong = null;
    198             }
     240            this.timerWaitingForAliveMsg.Stop();
     241            this.waitingForOtherPublishersPong.Stop();
    199242
    200243            GuiLogging("Deregister message-received-events", NotificationLevel.Debug);
     
    224267                    if (this.peerManagement.Add(sender))
    225268                    {
     269                        if(!this.timerWaitingForAliveMsg.Enabled)
     270                            this.timerWaitingForAliveMsg.Start();
    226271                        GuiLogging("REGISTERED: Peer with ID " + sender + "- RegExepted Msg was sent.", NotificationLevel.Info);
    227272                        this.p2pControl.SendToPeer(PubSubMessageType.RegisteringAccepted, sender);
     
    232277                    }
    233278                    break;
     279                case PubSubMessageType.Stop:
    234280                case PubSubMessageType.Unregister:
    235281                    if (!this.peerManagement.Remove(sender))
     
    252298                    Stop(msgType);
    253299                    break;
    254                 case PubSubMessageType.Stop: //ignore this case. No subscriber can't command the Publisher to stop!
    255                     break;
    256300                default:
    257301                    throw (new NotImplementedException());
    258302            } // end switch
    259             if (timerWaitingForAliveMsg == null)
    260                 timerWaitingForAliveMsg = new Timer(OnWaitingForAliveMsg, null, this.aliveMessageInterval,
    261                     this.aliveMessageInterval);
    262 
    263             if (msgType != PubSubMessageType.Unregister)
     303
     304            if (msgType != PubSubMessageType.Unregister && msgType != PubSubMessageType.Stop)
    264305            {
    265306                if(this.peerManagement.Update(sender))
     
    297338        }
    298339
    299         /// <summary>
    300         /// if peers are outdated (alive message doesn't arrive in the given interval)
    301         /// ping them to give them a second chance
    302         /// </summary>
    303         /// <param name="state"></param>
    304         private void OnWaitingForAliveMsg(object state)
    305         {
    306             // get second chance list from SubscribersManagement (Second chance list = The timespans of this subscribers are expired)
    307             List<PeerId> lstOutdatedSubscribers = this.peerManagement.CheckVitality();
    308             foreach (PeerId outdatedSubscriber in lstOutdatedSubscribers)
    309             {
    310                 this.p2pControl.SendToPeer(PubSubMessageType.Ping, outdatedSubscriber);
    311                 GuiLogging("PING outdated peer " + outdatedSubscriber, NotificationLevel.Debug);
    312             }
    313         }
    314 
    315340        private void peerManagement_OnSubscriberRemoved(PeerId peerId)
    316341        {
     
    325350
    326351        #endregion
    327 
    328         // Publisher-exchange extension - Arnie 2010.02.02
    329         /// <summary>
    330         /// Callback function for waitingForOtherPublishersPong-object. Will be only executed, when a
    331         /// different Publisher-ID was found in the DHT, to check if the "old" Publisher is still
    332         /// alive!
    333         /// </summary>
    334         /// <param name="state"></param>
    335         private void OnWaitingForOtherPublishersPong(object state)
    336         {
    337             if (this.otherPublisherHadResponded)
    338             {
    339                 GuiLogging("Can't assume functionality of an alive Publishers. So starting this workspace isn't possible!", NotificationLevel.Error);
    340             }
    341             else
    342             {
    343                 if (this.waitingForOtherPublishersPong != null)
    344                 {
    345                     this.waitingForOtherPublishersPong.Dispose();
    346                     this.waitingForOtherPublishersPong = null;
    347                 }
    348                 GuiLogging("First trial to assume old Publishers functionality.", NotificationLevel.Debug);
    349                 // we have to delete all OLD Publishers entries to assume its functionality
    350                 DHT_CommonManagement.DeleteAllPublishersEntries(ref this.p2pControl, this.topic);
    351                 Start(this.topic, this.aliveMessageInterval);
    352             }
    353         }
    354352
    355353        // Only for testing the (De-)Serialization of SubscribersManagement
  • trunk/CrypPlugins/PeerToPeerSubscriber/P2PSubscriberBase.cs

    r1139 r1144  
    3838
    3939        /// <summary>
    40         /// If the DHT Entry of the given Task is empty, continous try to
    41         /// find a meanwhile inscribed Publishers PeerID
    42         /// </summary>
    43         private Timer timerRegisteringNotPossible;
    44         /// <summary>
    45         /// For informing the publisher pro-active, that this subscriber
    46         /// is still interested in this Task.
     40        /// Checking liveness, availability and/or changes (new peer) of the Publisher.
     41        /// Retrieves the required DHT entries and initiates the necessary steps
     42        /// </summary>
     43        private Timer timerCheckPubAvailability;
     44        /// <summary>
     45        /// To inform the publisher pro-active, that this subscriber
     46        /// is still alive and interested in this Task, send periodical
     47        /// alive messages.
    4748        /// </summary>
    4849        private Timer timerSendingAliveMsg;
    4950        /// <summary>
    50         /// checking liveness, availability and/or changes (new peer) of the Publisher
    51         /// </summary>
    52         private Timer timerCheckPubAvailability;
    53         /// <summary>
    54         /// this timer gets started when the availability of the publisher,
    55         /// at which the subscriber had registered, is checked. If the timer
    56         /// callback is called and no Pong-message was received, the probability
    57         /// that the Publisher is down is high!
     51        /// This timer gets started when a DHT entry for a Publisher exists and
     52        /// we want to check the liveness of the publisher,
     53        /// at which the subscriber had registered. Therefore we send a Ping message
     54        /// to the Publisher. If the timer callback is called and no Pong-Response was
     55        /// received from the Publisher, the probability is high, that the Publisher is down!
    5856        /// </summary>
    5957        private Timer timeoutForPublishersPong;
    6058        /// <summary>
    61         /// After register message is sent to publisher, this timer gets started.
    62         /// If the publisher doesn't response with a RegisteringAccepted-Message,
    63         /// the probability that the publisher is down is high!
     59        /// After register message is sent to publisher, this timer is started.
     60        /// If the publisher doesn't responds with a RegisteringAccepted-Message,
     61        /// the probability is high, that the publisher is down!
    6462        /// </summary>
    6563        private Timer timeoutForPublishersRegAccept;
     
    108106
    109107            this.timeoutForPublishersPong = new Timer();
     108            this.timeoutForPublishersPong.Elapsed += new ElapsedEventHandler(OnTimeoutPublishersPong);
     109
    110110            this.timeoutForPublishersRegAccept = new Timer();
     111            this.timeoutForPublishersRegAccept.Elapsed += new ElapsedEventHandler(OnTimeoutRegisteringAccepted);
     112
    111113            this.timerCheckPubAvailability = new Timer();
    112             this.timerRegisteringNotPossible = new Timer();
     114            this.timerCheckPubAvailability.Elapsed += new ElapsedEventHandler(OnCheckPubAvailability);
     115
    113116            this.timerSendingAliveMsg = new Timer();
     117            this.timerSendingAliveMsg.Elapsed += new ElapsedEventHandler(OnSendAliveMessage);
    114118        }
    115119
    116120        public void Start(string sTopic, long checkPublishersAvailability, long publishersReplyTimespan)
    117121        {
    118             /* Initialize all timers */
    119             double pubTimeResponseTimeout = Convert.ToDouble(publishersReplyTimespan);
    120 
    121             this.timeoutForPublishersPong.Interval = pubTimeResponseTimeout;
    122             this.timeoutForPublishersPong.Elapsed += new ElapsedEventHandler(OnTimeoutPublishersPong);
    123 
    124             this.timeoutForPublishersRegAccept.Interval = pubTimeResponseTimeout;
    125             this.timeoutForPublishersRegAccept.Elapsed += new ElapsedEventHandler(OnTimeoutRegisteringAccepted);
    126 
    127             this.timerCheckPubAvailability.Interval = Convert.ToDouble(checkPublishersAvailability);
    128             this.timerCheckPubAvailability.Elapsed += new ElapsedEventHandler(OnCheckPubAvailability);
    129 
    130             this.timerRegisteringNotPossible.Interval = Convert.ToDouble(10000);
    131             this.timerRegisteringNotPossible.Elapsed += new ElapsedEventHandler(OnRegisteringNotPossible);
     122            this.actualPublisher = null;
    132123
    133124            this.sTopic = sTopic;
    134125            this.checkPublishersAvailability = checkPublishersAvailability;
    135126            this.publisherReplyTimespan = publishersReplyTimespan;
    136             Register();
    137         }
    138 
    139         private void Register()
    140         {
    141             // Unfortunately you have to register this events every time, because this events will be deregistered, when
    142             // Publisher/Manager sends a Unregister/Stop-Message... There isn't any possibility to check,
    143             // whether the Events are already registered (if(dings != null) or anything else).
    144             this.p2pControl.OnPayloadMessageReceived -= p2pControl_OnPayloadMessageReceived;
    145             this.p2pControl.OnSystemMessageReceived -= p2pControl_OnSystemMessageReceived;
     127
     128            #region Initialize network-maintanance-Timers
     129
     130            double pubTimeResponseTimeout = Convert.ToDouble(this.publisherReplyTimespan);
     131
     132            this.timerCheckPubAvailability.Interval = Convert.ToDouble(this.checkPublishersAvailability);
     133            this.timerCheckPubAvailability.AutoReset = true;
     134            this.timeoutForPublishersPong.Interval = pubTimeResponseTimeout;
     135            this.timeoutForPublishersRegAccept.Interval = pubTimeResponseTimeout;
     136
     137            #endregion
     138
    146139            this.p2pControl.OnPayloadMessageReceived += new P2PPayloadMessageReceived(p2pControl_OnPayloadMessageReceived);
    147140            this.p2pControl.OnSystemMessageReceived += new P2PSystemMessageReceived(p2pControl_OnSystemMessageReceived);
    148141
    149             // because CheckPublishersAvailability checks this value, set it for the first time here...
    150             // if bolStopped = true, the Timer for Checking Publishers liveliness doesn't start
    151             this.bolStopped = false;
    152             PeerId pubId = CheckPublishersAvailability();
    153             // if DHT Entry for the task is empty, no Publisher exists at present.
    154             // The method CheckPublishersAvailability starts a Timer for this case to continous proof Publisher-DHT-Entry
    155             if (pubId == null)
    156             {
    157                 this.Started = false;
    158                 // if PubId is null, the Publisher isn't started!
    159                 this.bolStopped = true;
    160                 GuiLogging("No publisher for registering found.", NotificationLevel.Info);
    161                 return;
    162             }
    163 
    164             // when the actual publisher differs from the new detected publisher, change it
    165             if (pubId != null && (actualPublisher != null && actualPublisher != pubId))
    166             {
    167                 GuiLogging("Publisher has been changed from ID '" + actualPublisher + "' to '" + pubId + "'", NotificationLevel.Debug);
    168                 actualPublisher = pubId;
    169             }
    170             SendMessage(pubId, PubSubMessageType.Register);
    171             this.started = true;
     142            if (this.p2pControl != null)
     143            {
     144                string sNonrelevant;
     145                PeerId myPeerId = this.p2pControl.GetPeerID(out sNonrelevant);
     146                GuiLogging("Started Subscriber with ID: '" + myPeerId.ToString() + "'", NotificationLevel.Info);
     147            }
     148
     149            this.Started = true;
     150
     151            CheckPublishersAvailability2();
     152        }
     153
     154        private void CheckPublishersAvailability2()
     155        {
     156            // retrieve publisher information from the DHT
     157            PeerId pid = DHT_CommonManagement.GetTopicsPublisherId(ref this.p2pControl, this.sTopic);
     158            this.sendAliveMessageInterval = DHT_CommonManagement.GetAliveMessageInterval(ref this.p2pControl, this.sTopic);
     159
     160            if (pid == null || this.sendAliveMessageInterval == 0)
     161            {
     162                GuiLogging("No Publisher/Manager information for registering found in the DHT.", NotificationLevel.Info);
     163            }
     164            else
     165            {
     166                this.timerSendingAliveMsg.Interval = Convert.ToDouble(this.sendAliveMessageInterval);
     167                this.timerSendingAliveMsg.Start();
     168
     169                if (actualPublisher == null)
     170                {
     171                    GuiLogging("Found a Publisher/Manager with ID '" + pid.ToString() + ", so register with it.", NotificationLevel.Info);
     172                    SendMessage(pid, PubSubMessageType.Register);
     173                    timeoutForPublishersRegAccept.Start();
     174                }
     175                else if (actualPublisher == pid)
     176                {
     177                    // Timer will be only stopped, when OnMessageReceived-Event received
     178                    // a Pong-Response from the publisher!
     179                    SendMessage(pid, PubSubMessageType.Ping);
     180                    this.timeoutForPublishersPong.Start();
     181                    GuiLogging("Successfully checked publishers'/managers' information in the DHT. To check liveness, a Ping message was sended to '" + pid.ToString() + "'.", NotificationLevel.Debug);
     182                }
     183                else
     184                {
     185                    GuiLogging("The Publisher/Manager had changed from '" + this.actualPublisher.ToString()
     186                        + "' to '" + pid.ToString() + "'. Register with the new Publisher/Manager.", NotificationLevel.Info);
     187                    SendMessage(pid, PubSubMessageType.Register);
     188                    timeoutForPublishersRegAccept.Start();
     189                }
     190                this.actualPublisher = pid;
     191            }
     192            this.timerCheckPubAvailability.Enabled = true;
    172193        }
    173194
    174195        private void p2pControl_OnSystemMessageReceived(PeerId sender, PubSubMessageType msgType)
    175196        {
    176             if (sender != actualPublisher)
     197            if (sender != this.actualPublisher)
    177198            {
    178199                GuiLogging("RECEIVED message from third party peer (not the publisher!): " + msgType.ToString() + ", ID: " + sender, NotificationLevel.Debug);
     
    194215                    // continuously try to get a unregister and than re-register with publisher
    195216                    Stop(msgType);
    196                     Register();
     217                    CheckPublishersAvailability2();
    197218                    break;
    198219                case PubSubMessageType.Solution:
     
    205226                    break;
    206227                case PubSubMessageType.Pong:
     228                    SendMessage(sender, PubSubMessageType.Register);
    207229                    this.timeoutForPublishersPong.Stop();
    208230                    break;
     
    212234                    break;
    213235            }
    214             // workaround, because Timers.Timer doesn't contains a "Reset" method
     236            // workaround, because Timers.Timer doesn't contains a "Reset" method --> when receiving a
     237            // message from the Publisher, we can reset the "check pub availability"-interval time!
    215238            this.timerCheckPubAvailability.Enabled = false;
    216             this.timerCheckPubAvailability.Enabled = false;
     239            this.timerCheckPubAvailability.Enabled = true;
    217240        }
    218241
     
    247270            {
    248271                case PubSubMessageType.Register:
    249                     // stop "RegisteringNotPossibleTimer
    250                     this.timerRegisteringNotPossible.Stop();
    251272                    // start waiting interval for RegAccept Message
    252                     this.timeoutForPublishersRegAccept.Stop();
     273                    this.timeoutForPublishersRegAccept.Start();
    253274                    break;
    254275                case PubSubMessageType.Alive:
     
    276297            this.p2pControl.SendToPeer(msgType, pubPeerId);
    277298
    278             GuiLogging(msgType.ToString() + " message sent to Publisher", NotificationLevel.Debug);
    279         }
    280 
    281         // registering isn't possible if no publisher has stored
    282         // his ID in the DHT Entry with the Key TaskName
    283         private void OnRegisteringNotPossible(object sender, ElapsedEventArgs e)
    284         {
    285             Register();
    286         }
    287 
    288         private void OnSendAliveMessage(object state)
     299            GuiLogging(msgType.ToString() + " message sent to Publisher ID '" + pubPeerId.ToString() + "'.", NotificationLevel.Debug);
     300        }
     301
     302        private void OnSendAliveMessage(object sender, ElapsedEventArgs e)
    289303        {
    290304            SendMessage(actualPublisher, PubSubMessageType.Alive);
    291         }
    292 
    293         /// <summary>
    294         /// Returns the actual Publishers ID or null, when a publisher wasn't found in the DHT. In the second case,
    295         /// a Timer will be started, to check periodically the DHT entry.
    296         /// When the publishers entry changed the Publishers ID, a Register-message will be send to the new Publisher.
    297         /// The Timer for periodically checking the Publishers availability is also started here.
    298         /// </summary>
    299         /// <returns>the actual Publishers ID or null, when a publisher wasn't found in the DHT</returns>
    300         private PeerId CheckPublishersAvailability()
    301         {
    302             PeerId pid = DHT_CommonManagement.GetTopicsPublisherId(ref this.p2pControl, this.sTopic);
    303 
    304             if (pid == null)
    305             {
    306                 this.timerRegisteringNotPossible.Start();
    307                 GuiLogging("Publisher wasn't found in DHT or settings didn't stored on the right way.", NotificationLevel.Debug);
    308                 return null;
    309             }
    310 
    311             sendAliveMessageInterval = DHT_CommonManagement.GetAliveMessageInterval(ref this.p2pControl, this.sTopic);
    312 
    313             if (sendAliveMessageInterval == 0)
    314             {
    315                 GuiLogging("Can't find AliveMsg-Settings from Publisher for the Subscriber.", NotificationLevel.Error);
    316                 return null;
    317             }
    318             this.timerSendingAliveMsg.Interval = Convert.ToDouble(sendAliveMessageInterval);
    319             this.timerSendingAliveMsg.Start();
    320 
    321             if (actualPublisher == null) //first time initialization
    322             {
    323                 actualPublisher = pid;
    324                 GuiLogging("First time received publishers ID.", NotificationLevel.Debug);
    325             }
    326 
    327             GuiLogging("RECEIVED: Publishers' peer ID '" + pid + "', Alive-Msg-Interval: " + sendAliveMessageInterval / 1000 + " sec!", NotificationLevel.Debug);
    328 
    329             this.timerCheckPubAvailability.Start();
    330             // setting timer to check periodical the availability of the publishing peer
    331 
    332             return pid;
    333305        }
    334306
     
    336308        /// Callback for timerCheckPubAvailability (adjustable parameter
    337309        /// in settings, usually every 60 seconds). If another Peer
    338         /// takes over Publishing the Task, this will be handled in this callback, too.
     310        /// takes over Publishing the Task, this and many other things
     311        /// will be initiated here
    339312        /// </summary>
    340313        /// <param name="state"></param>
    341314        private void OnCheckPubAvailability(object sender, ElapsedEventArgs e)
    342315        {
    343             PeerId newPubId = CheckPublishersAvailability();
    344 
    345             if (newPubId == actualPublisher)
    346             {
    347                 // Timer will be only stopped, when OnMessageReceived-Event received
    348                 // a Pong-Response from the publisher!
    349                 SendMessage(actualPublisher, PubSubMessageType.Ping);
    350                 this.timeoutForPublishersPong.Start();
    351             }
     316            CheckPublishersAvailability2();
     317
     318            //PeerId newPubId = CheckPublishersAvailability();
     319
     320            //if (newPubId == actualPublisher)
     321            //{
     322            //    // Timer will be only stopped, when OnMessageReceived-Event received
     323            //    // a Pong-Response from the publisher!
     324            //    SendMessage(actualPublisher, PubSubMessageType.Ping);
     325            //    this.timeoutForPublishersPong.Start();
     326            //}
    352327        }
    353328
     
    360335            GuiLogging("TIMEOUT: Waiting for registering accepted message from publisher!", NotificationLevel.Debug);
    361336            // try to register again
    362             Register();
     337           
     338            //Register();
     339            CheckPublishersAvailability2();
    363340        }
    364341
     
    369346        private void OnTimeoutPublishersPong(object sender, ElapsedEventArgs e)
    370347        {
    371             GuiLogging("Publisher didn't answer on Ping in the given time span!", NotificationLevel.Info);
     348            GuiLogging("Publisher didn't respond on subscribers' ping message in the given time span!", NotificationLevel.Info);
    372349            this.timeoutForPublishersPong.Stop();
    373350            // try to get an active publisher and re-register
    374             CheckPublishersAvailability();
     351
     352            //CheckPublishersAvailability();
     353            CheckPublishersAvailability2();
    375354        }
    376355
     
    382361        public void Stop(PubSubMessageType msgType)
    383362        {
    384             if (actualPublisher != null && msgType != PubSubMessageType.NULL)
     363            if (actualPublisher != null)
    385364                SendMessage(actualPublisher, msgType);
    386365
     
    390369            this.timeoutForPublishersRegAccept.Stop();
    391370            this.timerSendingAliveMsg.Stop();
    392             this.timerCheckPubAvailability.Start();
    393371
    394372            //when Sub received a UnReg message, it haven't stop
     
    399377                this.bolStopped = true;
    400378
    401                 this.timerRegisteringNotPossible.Stop();
     379                this.timerCheckPubAvailability.Stop();
    402380
    403381                this.Started = false;
     
    410388                GuiLogging("Publisher/Manager had left the network, waiting for its comeback or takeover by a new Publisher/Manager.", NotificationLevel.Info);
    411389            }
     390
    412391            #endregion
     392
    413393            GuiLogging("All Timers were stopped successfully", NotificationLevel.Debug);
    414394        }
     
    421401    }
    422402}
     403
     404
     405
     406//private void Register()
     407//        {
     408//            // because CheckPublishersAvailability checks this value, set it for the first time here...
     409//            // if bolStopped = true, the Timer for Checking Publishers liveliness doesn't start
     410//            this.bolStopped = false;
     411//            PeerId pubId = CheckPublishersAvailability();
     412//            // if DHT Entry for the task is empty, no Publisher exists at present.
     413//            // The method CheckPublishersAvailability starts a Timer for this case to continous proof Publisher-DHT-Entry
     414//            if (pubId == null)
     415//            {
     416//                this.Started = false;
     417//                // if PubId is null, the Publisher isn't started!
     418//                this.bolStopped = true;
     419//                GuiLogging("No publisher for registering found.", NotificationLevel.Info);
     420//                return;
     421//            }
     422
     423//            // when the actual publisher differs from the new detected publisher, change it
     424//            if (pubId != null && (actualPublisher != null && actualPublisher != pubId))
     425//            {
     426//                GuiLogging("Publisher has been changed from ID '" + actualPublisher + "' to '" + pubId + "'", NotificationLevel.Debug);
     427//                actualPublisher = pubId;
     428//            }
     429//            SendMessage(pubId, PubSubMessageType.Register);
     430//            this.timeoutForPublishersRegAccept.Start();
     431//            this.started = true;
     432//        }
     433
     434//        /// <summary>
     435//        /// Returns the actual Publishers ID or null, when a publisher wasn't found in the DHT. In the second case,
     436//        /// a Timer will be started, to check periodically the DHT entry.
     437//        /// When the publishers entry changed the Publishers ID, a Register-message will be send to the new Publisher.
     438//        /// The Timer for periodically checking the Publishers availability is also started here.
     439//        /// </summary>
     440//        /// <returns>the actual Publishers ID or null, when a publisher wasn't found in the DHT</returns>
     441//        private PeerId CheckPublishersAvailability()
     442//        {
     443//            PeerId pid = DHT_CommonManagement.GetTopicsPublisherId(ref this.p2pControl, this.sTopic);
     444
     445//            if (pid == null)
     446//            {
     447//                // do nothing, because every time this method will be invoked by
     448//                // the timerCheckPubAvailability-Event, the DHT entry will be checked
     449//                GuiLogging("Publisher wasn't found in DHT or settings didn't stored on the right way.", NotificationLevel.Debug);
     450//                return null;
     451//            }
     452
     453//            sendAliveMessageInterval = DHT_CommonManagement.GetAliveMessageInterval(ref this.p2pControl, this.sTopic);
     454
     455//            if (sendAliveMessageInterval == 0)
     456//            {
     457//                GuiLogging("Can't find AliveMsg-Settings from Publisher for the Subscriber.", NotificationLevel.Error);
     458//                return null;
     459//            }
     460//            this.timerSendingAliveMsg.Interval = Convert.ToDouble(sendAliveMessageInterval);
     461//            this.timerSendingAliveMsg.Start();
     462
     463//            if (actualPublisher == null) //first time initialization
     464//            {
     465//                actualPublisher = pid;
     466//                GuiLogging("First time received publishers ID.", NotificationLevel.Debug);
     467//            }
     468
     469//            GuiLogging("RECEIVED: Publishers' peer ID '" + pid + "', Alive-Msg-Interval: " + sendAliveMessageInterval / 1000 + " sec!", NotificationLevel.Debug);
     470
     471//            this.timerCheckPubAvailability.Start();
     472//            // setting timer to check periodical the availability of the publishing peer
     473
     474//            return pid;
     475//        }
  • trunk/CrypPlugins/PeerToPeerWorker_NEW/P2PJobAdminBase.cs

    r1137 r1144  
    2323using Cryptool.Plugins.PeerToPeer.Jobs;
    2424
    25 /* TODO:
    26  * - Check why Register-Messages were send so often
     25/* TWO DIFFERENT STOPPING CASE:
     26 * 1) When P2P-Admin is stopped, deregister WorkerControl-Events, so
     27 *    the (unfinished) JobPart-Result won't be sent to the Manager
     28 *    - Copy registering the WorkerControl-Events to "StartWorkerControl"
     29 *    - Unregister WorkerControl-Events in "StopWorkerControl"
     30 *   
     31 * 2) When P2P-Admin is stopped, the WorkerControl sends the (unfinished)
     32 *    JobPart-Result to the Manager (the Manager can handle this case without
     33 *    any problem).
     34 *    - Copy registering the WorkerControl-Events to the constructor
     35 *    - Comment the unregistering of the WorkerControl-Events in the
     36 *      "StopWorkerControl" method
    2737 */
    2838
     
    7080        {
    7181            this.workerControl = controlWorker;
    72             this.workerControl.OnProcessingCanceled += new ProcessingCanceled(workerControl_OnProcessingCanceled);
    73             this.workerControl.OnProcessingSuccessfullyEnded += new ProcessingSuccessfullyEnded(workerControl_OnProcessingSuccessfullyEnded);
    74             this.workerControl.OnInfoTextReceived += new InfoText(workerControl_OnInfoTextReceived);
     82
     83            // see comment above, to know why the following lines are commented
     84            //this.workerControl.OnProcessingCanceled += new ProcessingCanceled(workerControl_OnProcessingCanceled);
     85            //this.workerControl.OnProcessingSuccessfullyEnded += new ProcessingSuccessfullyEnded(workerControl_OnProcessingSuccessfullyEnded);
     86            //this.workerControl.OnInfoTextReceived += new InfoText(workerControl_OnInfoTextReceived);
    7587
    7688            this.waitingJobStack = new Stack<byte[]>();
     
    8698            if (!base.Started)
    8799            {
     100                // see comment above, to know why the following lines are uncommented
     101                this.workerControl.OnProcessingCanceled += new ProcessingCanceled(workerControl_OnProcessingCanceled);
     102                this.workerControl.OnProcessingSuccessfullyEnded += new ProcessingSuccessfullyEnded(workerControl_OnProcessingSuccessfullyEnded);
     103                this.workerControl.OnInfoTextReceived += new InfoText(workerControl_OnInfoTextReceived);
     104
    88105                // starts subscriber
    89106                base.Start(sTopicName, lCheckPublishersAvailability, lPublishersReplyTimespan);
     
    95112        public void StopWorkerControl(PubSubMessageType msgType)
    96113        {
    97             if (this.workerControl != null)
    98                 this.workerControl.StopProcessing();
    99114            if (base.Started)
     115            {
     116                // see comment above, to know why the following lines are uncommented
     117                this.workerControl.OnProcessingCanceled -= workerControl_OnProcessingCanceled;
     118                this.workerControl.OnProcessingSuccessfullyEnded -= workerControl_OnProcessingSuccessfullyEnded;
     119                this.workerControl.OnInfoTextReceived -= workerControl_OnInfoTextReceived;
    100120                base.Stop(msgType);
     121
     122                // delete the waiting Job List, so after re-registering, this worker
     123                // will process the new incoming jobs and not old jobs, which were
     124                // already pushed to the global Job List of the Manager after receiving
     125                // the unregister message from this worker.
     126                if(this.waitingJobStack != null && this.waitingJobStack.Count > 0)
     127                    this.waitingJobStack.Clear();
     128
     129                if (this.workerControl != null)
     130                    this.workerControl.StopProcessing();
     131                GuiLogging("P2P-Job-Admin is successfully stopped (Unregistering with Manager, Processing of the Worker is stopped)",NotificationLevel.Info);
     132            }
     133            else
     134                GuiLogging("P2P-Job-Admin isn't started yet. So stopping-events won't be executed.", NotificationLevel.Info);
    101135        }
    102136
     
    190224        private void workerControl_OnProcessingSuccessfullyEnded(BigInteger jobId, byte[] result)
    191225        {
    192             base.GuiLogging("Sending job result to Manager. JobId: " + jobId.ToString() + ". Mngr-Id: '" + base.ActualPublisher.ToString() + "'.", NotificationLevel.Info);
     226            GuiLogging("Sending job result to Manager. JobId: " + jobId.ToString() + ". Mngr-Id: '" + base.ActualPublisher.ToString() + "'.", NotificationLevel.Info);
    193227            this.p2pControl.SendToPeer(JobMessages.CreateJobResultMessage(jobId, result), base.ActualPublisher);
    194228
     
    199233                OnSuccessfullyEnded();
    200234
     235            CheckIfAnyJobsLeft();
     236           
     237        }
     238
     239        private void CheckIfAnyJobsLeft()
     240        {
    201241            if (this.waitingJobStack.Count > 0)
    202242            {
     
    210250                GuiLogging("No jobs in the 'waitingJob'-Stack, so send 'free'-information to the Manager. Mngr-Id: '" + base.ActualPublisher.ToString() + "'.", NotificationLevel.Info);
    211251            }
    212            
    213252        }
    214253
     
    217256            if (OnCanceledWorking != null)
    218257                OnCanceledWorking();
     258            CheckIfAnyJobsLeft();
    219259        }
    220260
Note: See TracChangeset for help on using the changeset viewer.