Ignore:
Timestamp:
Feb 11, 2010, 3:48:49 PM (12 years ago)
Author:
arnold
Message:

Completely redesigned Manager-JobAdmin-Worker-infrastructure to distribute Jobs with a Peer-to-Peer infrastructure to remote CT2-Workspaces.

To test this infrastructure, open 2 instances of CT and load P2P_Manager_NEW_DES.cte and in the other instance P2P_Worker_NEW.cte.
HINT: Working with remote peers isn't possible every time, because the so called "SuperNode", which is necessary for relaying, sometimes goes down. But testing this infrastructure on different computers in the same network should work every time.

File:
1 edited

Legend:

Unmodified
Added
Removed
  • trunk/CrypPlugins/PeerToPeerSubscriber/P2PSubscriberBase.cs

    r1130 r1137  
    6969        /// </summary>
    7070        private PeerId actualPublisher;
     71        public PeerId ActualPublisher
     72        {
     73            get { return this.actualPublisher; }
     74            set { this.actualPublisher = value; }
     75        }
     76
    7177        /// <summary>
    7278        /// if true, check whether a new Publisher is the actual one
     
    7985        /// Status flag which contains the state of the Subscriber
    8086        /// </summary>
    81         public bool Started 
    82         {
    83             get { return this.started;  }
    84             private set { this.started = value;  }
     87        public bool Started
     88        {
     89            get { return this.started; }
     90            private set { this.started = value; }
    8591        }
    8692
     
    96102
    97103        /* END: Only for experimental cases */
    98 
    99104
    100105        public P2PSubscriberBase(IP2PControl p2pControl)
     
    243248                case PubSubMessageType.Pong:
    244249                case PubSubMessageType.Unregister:
    245                 case PubSubMessageType.Stop: 
     250                case PubSubMessageType.Stop:
    246251                case PubSubMessageType.Solution:
    247252                    break;
     
    260265
    261266        // registering isn't possible if no publisher has stored
    262         // its ID in the DHT Entry with the Key TaskName
     267        // his ID in the DHT Entry with the Key TaskName
    263268        private void OnRegisteringNotPossible(object state)
    264269        {
     
    306311                GuiLogging("First time received publishers ID.", NotificationLevel.Debug);
    307312            }
    308             //else if (actualPublisher != pid)
    309             //{
    310             //    GuiLogging("Publisher has been changed from ID '" + actualPublisher + "' to '" + pubId + "'", NotificationLevel.Debug);
    311             //    SendMessage(pubId, PubSubMessageType.Register);
    312             //    actualPublisher = pid;
    313             //}
    314313
    315314            GuiLogging("RECEIVED: Publishers' peer ID '" + pid + "', Alive-Msg-Interval: " + sendAliveMessageInterval / 1000 + " sec!", NotificationLevel.Debug);
     
    342341                }
    343342            }
    344             if (newPubId != actualPublisher)
    345                 Register();
    346343        }
    347344
     
    363360        private void OnTimeoutPublishersPong(object state)
    364361        {
    365             GuiLogging("Publisher didn't answer on Ping in the given time span!", NotificationLevel.Warning);
     362            GuiLogging("Publisher didn't answer on Ping in the given time span!", NotificationLevel.Info);
    366363            if (timeoutForPublishersPong != null)
    367364            {
     
    417414
    418415            this.Started = false;
    419             GuiLogging("Subscriber is completely stopped",NotificationLevel.Debug);
     416            GuiLogging("Subscriber is completely stopped", NotificationLevel.Debug);
    420417        }
    421418
     
    427424    }
    428425}
     426
     427
     428//using System;
     429//using System.Collections.Generic;
     430//using System.Linq;
     431//using System.Text;
     432//using Cryptool.PluginBase.Control;
     433//using Cryptool.PluginBase;
     434//using System.Threading;
     435
     436///*
     437// * All Subscriber functions work problem-free!
     438// *
     439// * IDEAS:
     440// * - Publisher takes subscriber list out of the DHT and registered
     441// *   itself with all subscribers pro-active (handle Register-Msg in Subscriber!)
     442// */
     443
     444//namespace Cryptool.Plugins.PeerToPeer
     445//{
     446//    public class P2PSubscriberBase
     447//    {
     448//        public delegate void GuiMessage(string sData, NotificationLevel notificationLevel);
     449//        public event GuiMessage OnGuiMessage;
     450//        public delegate void TextArrivedFromPublisher(byte[] data, PeerId pid);
     451//        public event TextArrivedFromPublisher OnTextArrivedFromPublisher;
     452//        public delegate void ReceivedStopFromPublisher(PubSubMessageType stopType, string sData);
     453//        /// <summary>
     454//        /// fired when Manager sent "stop" message to the worker.
     455//        /// </summary>
     456//        public event ReceivedStopFromPublisher OnReceivedStopMessageFromPublisher;
     457
     458//        #region Variables
     459
     460//        protected IP2PControl p2pControl;
     461//        private long sendAliveMessageInterval;
     462//        private long checkPublishersAvailability;
     463//        private long publisherReplyTimespan;
     464//        private string sTopic;
     465
     466//        /// <summary>
     467//        /// If the DHT Entry of the given Task is empty, continous try to
     468//        /// find a meanwhile inscribed Publishers PeerID
     469//        /// </summary>
     470//        private Timer timerRegisteringNotPossible;
     471//        /// <summary>
     472//        /// For informing the publisher pro-active, that this subscriber
     473//        /// is still interested in this Task.
     474//        /// </summary>
     475//        private Timer timerSendingAliveMsg;
     476//        /// <summary>
     477//        /// checking liveness, availability and/or changes (new peer) of the Publisher
     478//        /// </summary>
     479//        private Timer timerCheckPubAvailability;
     480//        /// <summary>
     481//        /// this timer gets started when the availability of the publisher,
     482//        /// at which the subscriber had registered, is checked. If the timer
     483//        /// callback is called and no Pong-message was received, the probability
     484//        /// that the Publisher is down is high!
     485//        /// </summary>
     486//        private Timer timeoutForPublishersPong;
     487//        /// <summary>
     488//        /// After register message is sent to publisher, this timer gets started.
     489//        /// If the publisher doesn't response with a RegisteringAccepted-Message,
     490//        /// the probability that the publisher is down is high!
     491//        /// </summary>
     492//        private Timer timeoutForPublishersRegAccept;
     493//        /// <summary>
     494//        /// PeerID of the actual publisher. This ID is will be checked continious
     495//        /// on liveliness and/or updated if Publisher had changed.
     496//        /// </summary>
     497//        private PeerId actualPublisher;
     498//        /// <summary>
     499//        /// PeerID of the actual publisher. This ID is will be checked continious
     500//        /// on liveliness and/or updated if Publisher had changed
     501//        /// </summary>
     502//        public PeerId ActualPublisher
     503//        {
     504//            get { return this.actualPublisher; }
     505//        }
     506
     507//        /// <summary>
     508//        /// if true, check whether a new Publisher is the actual one
     509//        /// and renew Settings
     510//        /// </summary>
     511//        private bool bolStopped = true;
     512
     513//        private bool started = false;
     514//        /// <summary>
     515//        /// Status flag which contains the state of the Subscriber
     516//        /// </summary>
     517//        public bool Started
     518//        {
     519//            get { return this.started;  }
     520//            private set { this.started = value;  }
     521//        }
     522
     523//        #endregion
     524
     525//        /* BEGIN: Only for experimental cases */
     526
     527//        public void SolutionFound(byte[] solutionData)
     528//        {
     529//            SendMessage(actualPublisher, PubSubMessageType.Solution);
     530//            this.p2pControl.SendToPeer(solutionData, actualPublisher);
     531//        }
     532
     533//        /* END: Only for experimental cases - 2010.02.07  */
     534
     535//        /* BEGIN: Added for P2PJobAdmin */
     536//        public P2PSubscriberBase()
     537//        {
     538//        }
     539
     540//        public void Start(IP2PControl p2pControl, string sTopic, long checkPublishersAvailability, long publishersReplyTimespan)
     541//        {
     542//            this.p2pControl = p2pControl;
     543//            Start(sTopic, checkPublishersAvailability, publisherReplyTimespan);
     544//        }
     545//        /* END: Added for P2PJobAdmin - 2010.02.07 */
     546
     547//        public P2PSubscriberBase(IP2PControl p2pControl)
     548//        {
     549//            this.p2pControl = p2pControl;
     550//        }     
     551
     552//        public void Start(string sTopic, long checkPublishersAvailability, long publishersReplyTimespan)
     553//        {
     554//            this.sTopic = sTopic;
     555//            this.checkPublishersAvailability = checkPublishersAvailability;
     556//            this.publisherReplyTimespan = publishersReplyTimespan;
     557//            Register();
     558//        }
     559
     560//        private void Register()
     561//        {
     562//            // Unfortunately you have to register this events every time, because this events will be deregistered, when
     563//            // Publisher/Manager sends a Unregister/Stop-Message... There isn't any possibility to check,
     564//            // whether the Events are already registered (if(dings != null) or anything else).
     565//            this.p2pControl.OnPayloadMessageReceived -= p2pControl_OnPayloadMessageReceived;
     566//            this.p2pControl.OnSystemMessageReceived -= p2pControl_OnSystemMessageReceived;
     567//            this.p2pControl.OnPayloadMessageReceived += new P2PPayloadMessageReceived(p2pControl_OnPayloadMessageReceived);
     568//            this.p2pControl.OnSystemMessageReceived += new P2PSystemMessageReceived(p2pControl_OnSystemMessageReceived);
     569
     570//            // because CheckPublishersAvailability checks this value, set it for the first time here...
     571//            // if bolStopped = true, the Timer for Checking Publishers liveliness doesn't start
     572//            this.bolStopped = false;
     573//            PeerId pubId = CheckPublishersAvailability();
     574//            // if DHT Entry for the task is empty, no Publisher exists at present.
     575//            // The method CheckPublishersAvailability starts a Timer for this case to continous proof Publisher-DHT-Entry
     576//            if (pubId == null)
     577//            {
     578//                this.Started = false;
     579//                // if PubId is null, the Publisher isn't started!
     580//                this.bolStopped = true;
     581//                GuiLogging("No publisher for registering found.", NotificationLevel.Info);
     582//                return;
     583//            }
     584
     585//            // when the actual publisher differs from the new detected publisher, change it
     586//            if (pubId != null && (actualPublisher != null && actualPublisher != pubId))
     587//            {
     588//                GuiLogging("Publisher has been changed from ID '" + actualPublisher + "' to '" + pubId + "'", NotificationLevel.Debug);
     589//                actualPublisher = pubId;
     590//            }
     591//            SendMessage(pubId, PubSubMessageType.Register);
     592//            this.started = true;
     593//        }
     594
     595//        private void p2pControl_OnSystemMessageReceived(PeerId sender, PubSubMessageType msgType)
     596//        {
     597//            if (sender != actualPublisher)
     598//            {
     599//                GuiLogging("RECEIVED message from third party peer (not the publisher!): " + msgType.ToString() + ", ID: " + sender, NotificationLevel.Debug);
     600//                return;
     601//            }
     602//            switch (msgType)
     603//            {
     604//                case PubSubMessageType.RegisteringAccepted:
     605//                    GuiLogging("REGISTERING ACCEPTED received from publisher!", NotificationLevel.Info);
     606//                    if (this.timeoutForPublishersRegAccept != null)
     607//                    {
     608//                        this.timeoutForPublishersRegAccept.Dispose();
     609//                        this.timeoutForPublishersRegAccept = null;
     610//                    }
     611//                    break;
     612//                case PubSubMessageType.Ping:
     613//                    SendMessage(sender, PubSubMessageType.Pong);
     614//                    GuiLogging("REPLIED to a ping message from " + sender, NotificationLevel.Debug);
     615//                    break;
     616//                case PubSubMessageType.Register:
     617//                case PubSubMessageType.Unregister:
     618//                    GuiLogging(msgType.ToString().ToUpper() + " received from PUBLISHER.", NotificationLevel.Debug);
     619//                    // continuously try to get a unregister and than re-register with publisher
     620//                    Stop(msgType);
     621//                    Register();
     622//                    break;
     623//                case PubSubMessageType.Solution:
     624//                    Stop(msgType);
     625//                    GuiLogging("Another Subscriber had found the solution!", NotificationLevel.Info);
     626//                    break;
     627//                case PubSubMessageType.Stop:
     628//                    Stop(msgType);
     629//                    GuiLogging("STOP received from publisher. Subscriber is stopped!", NotificationLevel.Warning);
     630//                    break;
     631//                case PubSubMessageType.Pong:
     632//                    if (this.timeoutForPublishersPong != null)
     633//                    {
     634//                        this.timeoutForPublishersPong.Dispose();
     635//                        this.timeoutForPublishersPong = null;
     636//                    }
     637//                    break;
     638//                case PubSubMessageType.Alive:
     639//                default:
     640//                    // not possible at the moment
     641//                    break;
     642//            }
     643//        }
     644
     645//        private void p2pControl_OnPayloadMessageReceived(PeerId sender, byte[] data)
     646//        {
     647//            if (sender != actualPublisher)
     648//            {
     649//                GuiLogging("RECEIVED message from third party peer (not the publisher!): " + UTF8Encoding.UTF8.GetString(data) + ", ID: " + sender, NotificationLevel.Debug);
     650//                return;
     651//            }
     652//            // functionality swapped for better inheritance
     653//            HandleIncomingData(sender, data);
     654//        }
     655
     656//        /// <summary>
     657//        /// Incoming data will be printed in the information field and the OnTextArrivedEvent will be thrown
     658//        /// </summary>
     659//        /// <param name="senderId"></param>
     660//        /// <param name="sData"></param>
     661//        protected virtual void HandleIncomingData(PeerId senderId, byte[] data)
     662//        {
     663//            GuiLogging("RECEIVED: Message from '" + senderId
     664//                    + "' with data: '" + UTF8Encoding.UTF8.GetString(data) + "'", NotificationLevel.Debug);
     665
     666//            if (OnTextArrivedFromPublisher != null)
     667//                OnTextArrivedFromPublisher(data, senderId);
     668//        }
     669
     670//        private void SendMessage(PeerId pubPeerId, PubSubMessageType msgType)
     671//        {
     672//            if (timerSendingAliveMsg == null && !this.bolStopped)
     673//                timerSendingAliveMsg = new Timer(OnSendAliveMessage, null, sendAliveMessageInterval, sendAliveMessageInterval);
     674
     675//            switch (msgType)
     676//            {
     677//                case PubSubMessageType.Register:
     678//                    // stop "RegisteringNotPossibleTimer
     679//                    if (timerRegisteringNotPossible != null)
     680//                    {
     681//                        timerRegisteringNotPossible.Dispose();
     682//                        timerRegisteringNotPossible = null;
     683//                    }
     684//                    // start waiting interval for RegAccept Message
     685//                    if (this.timeoutForPublishersRegAccept == null)
     686//                        this.timeoutForPublishersRegAccept = new Timer(OnTimeoutRegisteringAccepted, null, this.publisherReplyTimespan, this.publisherReplyTimespan);
     687//                    break;
     688//                case PubSubMessageType.Alive:
     689//                case PubSubMessageType.Ping:
     690//                case PubSubMessageType.Pong:
     691//                case PubSubMessageType.Unregister:
     692//                case PubSubMessageType.Stop:
     693//                case PubSubMessageType.Solution:
     694//                    break;
     695//                //case PubSubMessageType.Solution:
     696//                //    // when i send Solution to the Stop method, we will run into a recursive loop between SendMessage and Stop!
     697//                //    Stop(PubSubMessageType.NULL);
     698//                //    break;
     699//                default:
     700//                    GuiLogging("No Message sent, because MessageType wasn't supported: " + msgType.ToString(), NotificationLevel.Warning);
     701//                    return;
     702//            }
     703//            this.p2pControl.SendToPeer(msgType, pubPeerId);
     704
     705//            GuiLogging(msgType.ToString() + " message sent to Publisher", NotificationLevel.Debug);
     706//        }
     707
     708//        // registering isn't possible if no publisher has stored
     709//        // its ID in the DHT Entry with the Key TaskName
     710//        private void OnRegisteringNotPossible(object state)
     711//        {
     712//            Register();
     713//        }
     714
     715//        private void OnSendAliveMessage(object state)
     716//        {
     717//            SendMessage(actualPublisher, PubSubMessageType.Alive);
     718//        }
     719
     720//        /// <summary>
     721//        /// Returns the actual Publishers ID or null, when a publisher wasn't found in the DHT. In the second case,
     722//        /// a Timer will be started, to check periodically the DHT entry.
     723//        /// When the publishers entry changed the Publishers ID, a Register-message will be send to the new Publisher.
     724//        /// The Timer for periodically checking the Publishers availability is also started here.
     725//        /// </summary>
     726//        /// <returns>the actual Publishers ID or null, when a publisher wasn't found in the DHT</returns>
     727//        private PeerId CheckPublishersAvailability()
     728//        {
     729//            PeerId pid = DHT_CommonManagement.GetTopicsPublisherId(ref this.p2pControl, this.sTopic);
     730
     731//            if (pid == null)
     732//            {
     733//                if (timerRegisteringNotPossible == null && !this.bolStopped)
     734//                {
     735//                    // if DHT value doesn't exist at this moment, wait for 10 seconds and try again
     736//                    timerRegisteringNotPossible = new Timer(OnRegisteringNotPossible, null, 10000, 10000);
     737//                }
     738//                GuiLogging("Publisher wasn't found in DHT or settings didn't stored on the right way.", NotificationLevel.Debug);
     739//                return null;
     740//            }
     741
     742//            sendAliveMessageInterval = DHT_CommonManagement.GetAliveMessageInterval(ref this.p2pControl, this.sTopic);
     743
     744//            if (sendAliveMessageInterval == 0)
     745//            {
     746//                GuiLogging("Can't find AliveMsg-Settings from Publisher for the Subscriber.", NotificationLevel.Error);
     747//                return null;
     748//            }
     749
     750//            if (actualPublisher == null) //first time initialization
     751//            {
     752//                actualPublisher = pid;
     753//                GuiLogging("First time received publishers ID.", NotificationLevel.Debug);
     754//            }
     755//            //else if (actualPublisher != pid)
     756//            //{
     757//            //    GuiLogging("Publisher has been changed from ID '" + actualPublisher + "' to '" + pubId + "'", NotificationLevel.Debug);
     758//            //    SendMessage(pubId, PubSubMessageType.Register);
     759//            //    actualPublisher = pid;
     760//            //}
     761
     762//            GuiLogging("RECEIVED: Publishers' peer ID '" + pid + "', Alive-Msg-Interval: " + sendAliveMessageInterval / 1000 + " sec!", NotificationLevel.Debug);
     763
     764//            // setting timer to check periodical the availability of the publishing peer
     765//            if (timerCheckPubAvailability == null && !this.bolStopped)
     766//                timerCheckPubAvailability = new Timer(OnCheckPubAvailability, null, this.checkPublishersAvailability, this.checkPublishersAvailability);
     767
     768//            return pid;
     769//        }
     770
     771//        /// <summary>
     772//        /// Callback for timerCheckPubAvailability (adjustable parameter
     773//        /// in settings, usually every 60 seconds). If another Peer
     774//        /// takes over Publishing the Task, this will be handled in this callback, too.
     775//        /// </summary>
     776//        /// <param name="state"></param>
     777//        private void OnCheckPubAvailability(object state)
     778//        {
     779//            PeerId newPubId = CheckPublishersAvailability();
     780
     781//            if (newPubId == actualPublisher)
     782//            {
     783//                // Timer will be only stopped, when OnMessageReceived-Event received
     784//                // a Pong-Response from the publisher!
     785//                SendMessage(actualPublisher, PubSubMessageType.Ping);
     786//                if (timeoutForPublishersPong == null)
     787//                {
     788//                    timeoutForPublishersPong = new Timer(OnTimeoutPublishersPong, null, this.publisherReplyTimespan, this.publisherReplyTimespan);
     789//                }
     790//            }
     791//            if (newPubId != actualPublisher)
     792//                Register();
     793//        }
     794
     795//        /// <summary>
     796//        /// This callback is only fired, when the publisher didn't sent a response on the register message.
     797//        /// </summary>
     798//        /// <param name="state"></param>
     799//        private void OnTimeoutRegisteringAccepted(object state)
     800//        {
     801//            GuiLogging("TIMEOUT: Waiting for registering accepted message from publisher!", NotificationLevel.Debug);
     802//            // try to register again
     803//            Register();
     804//        }
     805
     806//        /// <summary>
     807//        /// This callback os only fired, when the publisher didn't sent a response on the ping message.
     808//        /// </summary>
     809//        /// <param name="state"></param>
     810//        private void OnTimeoutPublishersPong(object state)
     811//        {
     812//            GuiLogging("Publisher didn't answer on Ping in the given time span!", NotificationLevel.Warning);
     813//            if (timeoutForPublishersPong != null)
     814//            {
     815//                timeoutForPublishersPong.Dispose();
     816//                timeoutForPublishersPong = null;
     817//            }
     818//            // try to get an active publisher and re-register
     819//            CheckPublishersAvailability();
     820//        }
     821
     822//        /// <summary>
     823//        /// Will stop all timers, so Subscriber ends with sending
     824//        /// Register-, Alive- and Pong-messages. Furthermore an
     825//        /// unregister message will be send to the publisher
     826//        /// </summary>
     827//        public void Stop(PubSubMessageType msgType)
     828//        {
     829//            this.bolStopped = true;
     830//            if (actualPublisher != null && msgType != PubSubMessageType.NULL)
     831//                SendMessage(actualPublisher, msgType);
     832
     833//            #region stopping all timers, if they are still active
     834//            if (this.timerSendingAliveMsg != null)
     835//            {
     836//                this.timerSendingAliveMsg.Dispose();
     837//                this.timerSendingAliveMsg = null;
     838//            }
     839//            if (this.timerRegisteringNotPossible != null)
     840//            {
     841//                this.timerRegisteringNotPossible.Dispose();
     842//                this.timerRegisteringNotPossible = null;
     843//            }
     844//            if (this.timerCheckPubAvailability != null)
     845//            {
     846//                this.timerCheckPubAvailability.Dispose();
     847//                this.timerCheckPubAvailability = null;
     848//            }
     849//            if (this.timeoutForPublishersRegAccept != null)
     850//            {
     851//                this.timeoutForPublishersRegAccept.Dispose();
     852//                this.timeoutForPublishersRegAccept = null;
     853//            }
     854//            if (this.timeoutForPublishersPong != null)
     855//            {
     856//                this.timeoutForPublishersPong.Dispose();
     857//                this.timeoutForPublishersPong = null;
     858//            }
     859//            #endregion
     860//            GuiLogging("All Timers were stopped successfully", NotificationLevel.Debug);
     861
     862//            this.p2pControl.OnSystemMessageReceived -= p2pControl_OnSystemMessageReceived;
     863//            this.p2pControl.OnPayloadMessageReceived -= p2pControl_OnPayloadMessageReceived;
     864
     865//            this.Started = false;
     866//            GuiLogging("Subscriber is completely stopped",NotificationLevel.Debug);
     867//        }
     868
     869//        protected void GuiLogging(string sText, NotificationLevel notLev)
     870//        {
     871//            if (OnGuiMessage != null)
     872//                OnGuiMessage(sText, notLev);
     873//        }
     874//    }
     875//}
Note: See TracChangeset for help on using the changeset viewer.