source: trunk/CrypPlugins/PeerToPeerManager_NEW/P2PManagerBase_NEW.cs @ 1348

Last change on this file since 1348 was 1348, checked in by arnold, 12 years ago

P2PManager/Worker: The Manager sends a "no more jobs left" message to a requesting worker when no jobs are left, and to all active registered workers after receiving the last job result. After receiving this message, a worker stops its "waiting for new jobs" timer, which periodically sends "free worker" messages to the Manager.
For future use: After a worker has received the "no more jobs left" message from the Manager, it should register with a new Manager to compute more jobs.

File size: 26.0 KB
Line 
1/* Copyright 2010 Team CrypTool (Christian Arnold), Uni Duisburg-Essen
2
3   Licensed under the Apache License, Version 2.0 (the "License");
4   you may not use this file except in compliance with the License.
5   You may obtain a copy of the License at
6
7       http://www.apache.org/licenses/LICENSE-2.0
8
9   Unless required by applicable law or agreed to in writing, software
10   distributed under the License is distributed on an "AS IS" BASIS,
11   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12   See the License for the specific language governing permissions and
13   limitations under the License.
14*/
15
16using System;
17using System.Collections.Generic;
18using System.Linq;
19using System.Text;
20using Cryptool.PluginBase.Control;
21using Cryptool.Plugins.PeerToPeer.Jobs;
22using Cryptool.PluginBase;
23using Cryptool.PluginBase.Miscellaneous;
24
25/* TODO:
26 * - Publisher change is possible, but taking over the old Publisher's subscriber list
27 *   isn't implemented yet ((de)serialization of the subscribers is
28 *   implemented and tested)
29 * - Manager change is possible, but taking over the job history isn't implemented yet
30 *   ((de)serialization of job management lists)
31 * - Benchmarking the working peers
32 *   (this.distributableJobControl.SetResult() returns the TimeSpan for the result)
33 * - Insert an internal Start-/Stop-Button, so the Manager can stop its work without
34 *   losing any Job-Information (at present this happens when pressing the Stop
35 *   button of the CrypTool Workspace)
36 */
37
namespace Cryptool.Plugins.PeerToPeer
{
    /// <summary>
    /// Job manager of the P2P "solution network". It publishes a topic (via
    /// <see cref="P2PPublisherBase"/>), hands out job parts of an <see cref="IDistributableJob"/>
    /// to free workers, keeps track of which worker holds which job and collects the job results.
    /// </summary>
    public class P2PManagerBase_NEW : P2PPublisherBase
    {
        #region Events and Delegates

        public delegate void ProcessProgress(double progressInPercent);
        public delegate void NewJobAllocated(BigInteger jobId);
        public delegate void ResultReceived(BigInteger jobId);
        public delegate void JobCanceled(BigInteger jobId);
        public delegate void NoMoreJobsLeft();
        public delegate void AllJobResultsReceived(BigInteger lastJobId);

        /// <summary>
        /// Thrown every time the progress of the whole job was (re)calculated.
        /// The reported value is between 0 and 100.
        /// </summary>
        public event ProcessProgress OnProcessProgress;
        /// <summary>
        /// Thrown when a new job was successfully allocated to a worker, i.e. after
        /// receiving its "JobAccepted" message. It is thrown once per accepted job.
        /// </summary>
        public event NewJobAllocated OnNewJobAllocated;
        /// <summary>
        /// Thrown when a new job result was received (and accepted).
        /// </summary>
        public event ResultReceived OnResultReceived;
        /// <summary>
        /// Thrown when an active worker leaves the network and its jobs are pushed
        /// back to the job stack.
        /// </summary>
        public event JobCanceled OnJobCanceled;
        /// <summary>
        /// Thrown (at most once per allocation round) when free workers ask for jobs,
        /// but the DistributableJob stack is empty. The Manager may still be waiting
        /// for some outstanding job results at this point.
        /// </summary>
        public event NoMoreJobsLeft OnNoMoreJobsLeft;
        /// <summary>
        /// Thrown when no more jobs are left AND the last outstanding job result comes in.
        /// </summary>
        public event AllJobResultsReceived OnAllJobResultsReceived;

        #endregion

        #region Variables

        /// <summary>
        /// Weight (in percent) with which a merely allocated - not yet finished - job
        /// contributes to the progress value. Finished jobs count 100%.
        /// </summary>
        private const double AllocatedJobWeightInPercent = 30.0;

        /// <summary>
        /// This control contains the job stack and other special
        /// management for ONE specific distributable job.
        /// </summary>
        private readonly IDistributableJob distributableJobControl;
        /// <summary>
        /// All jobs which were sent to workers, whose workers haven't
        /// accepted/declined the job yet (job id -> worker). Guarded by locking the dictionary itself.
        /// </summary>
        private readonly Dictionary<BigInteger, PeerId> jobsWaitingForAcceptanceInfo;
        /// <summary>
        /// All jobs which were successfully distributed, i.e. the Manager already
        /// received a JobAccepted message (job id -> worker). Guarded by locking the dictionary itself.
        /// </summary>
        private readonly Dictionary<BigInteger, PeerId> jobsInProgress;
        /// <summary>
        /// Serializes job allocation: free workers arriving in parallel could otherwise run
        /// concurrently through the allocation and some of them could get more than one job.
        /// A private lock object is used instead of lock(this), so foreign code can't interfere.
        /// </summary>
        private readonly object allocationLock = new object();

        private bool managerStarted = false;
        public bool ManagerStarted
        {
            get { return this.managerStarted; }
            private set { this.managerStarted = value; }
        }

        /// <summary>
        /// When the Manager is started, this variable must be set.
        /// </summary>
        private string sTopic = String.Empty;
        public string TopicName
        {
            get { return this.sTopic; }
            private set { this.sTopic = value; }
        }

        private long lAliveMessageInterval;
        public long AliveMesageInterval
        {
            get { return this.lAliveMessageInterval; }
            set { this.lAliveMessageInterval = value; }
        }

        private DateTime startWorkingTime = DateTime.MinValue;
        /// <summary>
        /// This value will be initialized after allocating the first job to a worker.
        /// Before initialization this is MinValue! Used for end time approximation.
        /// </summary>
        public DateTime StartWorkingTime
        {
            get { return this.startWorkingTime; }
        }

        #endregion

        /// <summary>
        /// Creates a Manager for the given distributable job.
        /// </summary>
        /// <param name="p2pControl">connection to the P2P peer plugin</param>
        /// <param name="distributableJob">the job whose parts will be distributed</param>
        public P2PManagerBase_NEW(IP2PControl p2pControl, IDistributableJob distributableJob) : base(p2pControl)
        {
            this.distributableJobControl = distributableJob;

            this.jobsWaitingForAcceptanceInfo = new Dictionary<BigInteger, PeerId>();
            this.jobsInProgress = new Dictionary<BigInteger, PeerId>();
        }

        /// <summary>
        /// Starts the Manager (publisher) for the given topic. Does nothing but log a
        /// warning when the P2P peer or the distributable job isn't connected.
        /// </summary>
        /// <param name="sTopic">name of the topic to publish</param>
        /// <param name="aliveMessageInterval">interval for alive messages, passed to the publisher base</param>
        public void StartManager(string sTopic, long aliveMessageInterval)
        {
            // only when the main manager plugin is connected with a Peer-PlugIn
            // and a DistributableJob-PlugIn, this Manager can start its work
            if (this.distributableJobControl == null || this.p2pControl == null)
            {
                GuiLogging("Manager couldn't be started, because P2P-Peer- or Distributable-Job-PlugIn isn't connected with the Manager or the connection is broken...", NotificationLevel.Warning);
                return;
            }

            // unsubscribe first, so that starting the Manager again without a Stop()
            // in between doesn't register the handler twice
            this.distributableJobControl.OnLastResultReceived -= distributableJobControl_OnLastResultReceived;
            this.distributableJobControl.OnLastResultReceived += new LastResultReceived(distributableJobControl_OnLastResultReceived);

            // reset, so the start time is taken again from the first allocation after a (re)start
            this.startWorkingTime = DateTime.MinValue;
            this.TopicName = sTopic;
            this.AliveMesageInterval = aliveMessageInterval;
            base.Start(this.TopicName, this.AliveMesageInterval);
        }

        /// <summary>
        /// Called by the distributable job when the last job result was set: notifies
        /// all workers that no jobs are left and unregisters this Manager.
        /// </summary>
        /// <param name="jobId">id of the last received job result</param>
        void distributableJobControl_OnLastResultReceived(BigInteger jobId)
        {
            AllJobResultsReceived handler = OnAllJobResultsReceived;
            if (handler != null)
                handler(jobId);

            /* New Feature. When all JobResults are received, stop Manager, so another
             * Manager can replace it and then allocate Jobs to the free workers*/
            GuiLogging("All Job results received, so deregistering from the solution network to accomodate this topic for another Manager.", NotificationLevel.Info);

            // added by Arnie 2010.04.24
            SendNoMoreJobsLeftMsgToAllWorkers();

            Stop(PubSubMessageType.Unregister);
        }

        /// <summary>
        /// Sends a "no more jobs left" message to every subscriber, so the workers
        /// can stop their periodic "free worker" messages.
        /// </summary>
        private void SendNoMoreJobsLeftMsgToAllWorkers()
        {
            int sentMessages = 0;
            byte[] noMoreJobsLeftMsg = JobMessages.CreateNoMoreJobsLeftMessage();
            foreach (PeerId wkr in base.peerManagement.GetAllSubscribers())
            {
                this.p2pControl.SendToPeer(noMoreJobsLeftMsg, wkr);
                sentMessages++;
            }
            GuiLogging("Sended 'No more jobs left' messages to " + sentMessages + " workers.", NotificationLevel.Debug);
        }

        protected override void PeerCompletelyStarted()
        {
            base.PeerCompletelyStarted();

            this.ManagerStarted = true;
            GetProgressInformation();
            GuiLogging("P2PManager is started right now.", NotificationLevel.Info);
        }

        /// <summary>
        /// Stops the publisher and detaches all event handlers of this Manager.
        /// </summary>
        /// <param name="msgType">message type passed on to the publisher base</param>
        public override void Stop(PubSubMessageType msgType)
        {
            base.Stop(msgType);

            this.ManagerStarted = false;

            // peerManagement is created in AssignManagement and may not exist (yet),
            // so don't blindly cast/dereference it
            WorkersManagement workers = this.peerManagement as WorkersManagement;
            if (workers != null)
            {
                workers.OnFreeWorkersAvailable -= peerManagement_OnFreeWorkersAvailable;
                workers.OnSubscriberRemoved -= peerManagement_OnSubscriberRemoved;
            }
            if (this.distributableJobControl != null)
                this.distributableJobControl.OnLastResultReceived -= distributableJobControl_OnLastResultReceived;

            GuiLogging("P2PManager was stopped successully.", NotificationLevel.Info);
        }

        /// <summary>
        /// Because the manager needs additional peer information for all workers,
        /// this method is overridden. WorkersManagement throws events when
        /// a Worker leaves or joins the "solution network", so we can re-add or
        /// allocate a job.
        /// </summary>
        /// <param name="aliveMessageInterval">interval passed on to the WorkersManagement</param>
        protected override void AssignManagement(long aliveMessageInterval)
        {
            this.peerManagement = new WorkersManagement(aliveMessageInterval);
            this.peerManagement.OnSubscriberRemoved += new SubscriberManagement.SubscriberRemoved(peerManagement_OnSubscriberRemoved);
            // waiting for new workers joining the manager or already joined workers, which were set to "free" again
            ((WorkersManagement)this.peerManagement).OnFreeWorkersAvailable += new WorkersManagement.FreeWorkersAvailable(peerManagement_OnFreeWorkersAvailable);
        }

        /// <summary>
        /// Only accepts DistributableJob-specific messages (created, checked and transformed by
        /// the static class JobMessages). All other messages (including empty ones) are dropped!
        /// </summary>
        /// <param name="sender">peer which sent the message</param>
        /// <param name="data">raw message; the first byte encodes the message type</param>
        protected override void p2pControl_OnPayloadMessageReceived(PeerId sender, byte[] data)
        {
            // data[0] is needed to determine the message type
            if (data == null || data.Length == 0)
            {
                GuiLogging("Received an empty message from Peer '" + sender + "'. Message dropped.", NotificationLevel.Debug);
                return;
            }
            if (!JobMessages.IsJobMessageType(data[0]))
            {
                GuiLogging("Received an undefined message (not a job accepted message or a job result).", NotificationLevel.Debug);
                return;
            }
            switch (JobMessages.GetMessageJobType(data[0]))
            {
                case MessageJobType.JobAcceptanceInfo:
                    HandleJobAcceptanceMessage(sender, data);
                    break;
                case MessageJobType.JobResult:
                    GuiLogging("Received JobResult message from Peer '" + sender.ToString() + "'. Beginning to set result now.", NotificationLevel.Debug);
                    HandleJobResultMessage(sender, data);
                    break;
                case MessageJobType.Free:
                    HandleFreeMessage(sender, data);
                    break;
                default:
                    GuiLogging("Obscure Message. First byte: " + Convert.ToInt32(data[0]) + ". Data: (" + Encoding.UTF8.GetString(data) + ") received from '" + sender.ToString() + "'.", NotificationLevel.Info);
                    break;
            }
            GetProgressInformation();
        }

        /// <summary>
        /// This method is only overridden because we have to ignore the Solution case in
        /// the system message handling (a Peer mustn't send a Solution message, which influences
        /// the working status of the Manager, because it doesn't have the overview of the job parts).
        /// </summary>
        /// <param name="sender">peer which sent the message</param>
        /// <param name="msgType">type of the system message</param>
        protected override void p2pControl_OnSystemMessageReceived(PeerId sender, PubSubMessageType msgType)
        {
            // ignore Solution case, because other workers could work on...
            if (msgType != PubSubMessageType.Solution)
                // base class handles all administration cases (register, alive, unregister, ping, pong, ...)
                base.p2pControl_OnSystemMessageReceived(sender, msgType);
        }

        #region Handle different DistributableJob-specific, incoming messages

        /// <summary>
        /// Handles the two job-acceptance cases (accepted or declined). Removes the job in every
        /// case from the waitingForAcceptance list, adds accepted jobs to the "inProgress"
        /// Dictionary and sets a declining busy worker to free again.
        /// </summary>
        /// <param name="sender">worker which accepted/declined the job</param>
        /// <param name="data">raw JobAcceptanceInfo message</param>
        private void HandleJobAcceptanceMessage(PeerId sender, byte[] data)
        {
            BigInteger jobId = null;
            bool accepted = JobMessages.GetJobAcceptanceMessage(data, out jobId);

            // In every case remove the job from the waiting Dictionary. This is done FIRST:
            // setting a declining worker to free (below) may trigger a new allocation round
            // (OnFreeWorkersAvailable), which could hand out and re-register this very job id
            // (if JobDeclined put it back on the stack) - that new entry must not be removed here.
            lock (this.jobsWaitingForAcceptanceInfo)
            {
                this.jobsWaitingForAcceptanceInfo.Remove(jobId);
            }

            if (accepted)
            {
                this.distributableJobControl.JobAccepted(jobId);

                bool isNewJob;
                lock (this.jobsInProgress)
                {
                    // a repeated JobAccepted message for an already accepted job is ignored
                    isNewJob = !this.jobsInProgress.ContainsKey(jobId);
                    if (isNewJob)
                        this.jobsInProgress.Add(jobId, sender);
                }

                // raise the event outside of the lock, so handlers can't block other message threads
                if (isNewJob)
                {
                    NewJobAllocated handler = OnNewJobAllocated;
                    if (handler != null)
                        handler(jobId);
                }
                GuiLogging("JobId '" + jobId.ToString() + "' was accepted by Peer '" + sender.ToString() + "'.", NotificationLevel.Info);
            }
            else
            {
                this.distributableJobControl.JobDeclined(jobId);

                // set busy worker to free, because he declined the job
                // TODO: maybe create a "black list" for peers, who had declined this kind of Job twice or more...
                ((WorkersManagement)this.peerManagement).SetBusyWorkerToFree(sender);
                GuiLogging("JobId '" + jobId.ToString() + "' was declined by Peer '" + sender.ToString() + "'.", NotificationLevel.Info);
            }
        }

        /// <summary>
        /// Sets the incoming result in the DistributableJob class, throws the OnResultReceived
        /// event and removes the job from the JobsInProgress Dictionary.
        /// </summary>
        /// <param name="sender">worker which sent the result</param>
        /// <param name="data">raw JobResult message</param>
        private void HandleJobResultMessage(PeerId sender, byte[] data)
        {
            BigInteger jobId;

            byte[] serializedJobResult = JobMessages.GetJobResult(data, out jobId);
            TimeSpan jobProcessingTime = this.distributableJobControl.SetResult(jobId, serializedJobResult);

            ResultReceived handler = OnResultReceived;
            if (handler != null)
                handler(jobId);

            GuiLogging("JobResult for Job '" + jobId.ToString() + "' received. Processing Time: "
                + jobProcessingTime.TotalMinutes.ToString() + " minutes. Worker-Id: '" + sender.ToString() + "'.", NotificationLevel.Info);

            lock (this.jobsInProgress)
            {
                // Remove() simply returns false for an unknown job - workaround because
                // P2PJobAdmin sends the result message twice
                this.jobsInProgress.Remove(jobId);
            }
        }

        /// <summary>
        /// If the message content declares the sender as a free worker,
        /// set this worker from busy to free, otherwise do nothing.
        /// </summary>
        /// <param name="sender">worker which sent the message</param>
        /// <param name="data">raw Free message</param>
        private void HandleFreeMessage(PeerId sender, byte[] data)
        {
            // only handle the "true" case, because otherwise there is nothing to do
            if (JobMessages.GetFreeWorkerStatusMessage(data))
            {
                GuiLogging("Received a 'free'-message from Peer '" + sender.ToString() + "'.", NotificationLevel.Debug);
                // only if worker already exists in the "busy list", it will be set to free and the event will be thrown
                ((WorkersManagement)this.peerManagement).SetBusyWorkerToFree(sender);
            }
        }

        #endregion

        #region Worker-action-handling

        /// <summary>
        /// Every time new workers are available, continue distribution of jobs (if any job parts are left).
        /// </summary>
        /// <exception cref="InvalidOperationException">Manager isn't started and the stale DHT
        /// entries couldn't be removed</exception>
        private void peerManagement_OnFreeWorkersAvailable()
        {
            if (!this.ManagerStarted)
            {
                GuiLogging("Manager isn't started at present, so I can't disperse the patterns.", NotificationLevel.Error);
                bool removeSettings = DHT_CommonManagement.DeleteAllPublishersEntries(ref this.p2pControl, this.TopicName);
                if (!removeSettings)
                    throw new InvalidOperationException("Critical error in P2PManager. Manager isn't started yet, but the workers can register... Even removing DHT entries weren't possible...");
                GuiLogging("Manager is stopped, but DHT entries were still existing, so they were deleted!", NotificationLevel.Info);
            }
            else
            {
                /* edited by Arnold - 2010.02.23 */
                // parallel incoming free workers could run concurrently through the
                // allocation, so some workers could get more than one job
                lock (this.allocationLock)
                {
                    AllocateJobs();
                }
            }

            GetProgressInformation();
        }

        /// <summary>
        /// When a Worker leaves the network, its (maybe) allocated job parts have to
        /// be pushed back to the main job stack, and jobs still waiting for its
        /// acceptance info are declined.
        /// </summary>
        /// <param name="peerId">the worker which left</param>
        private void peerManagement_OnSubscriberRemoved(PeerId peerId)
        {
            GuiLogging("REMOVED worker " + peerId, NotificationLevel.Info);

            List<BigInteger> canceledJobs;
            // necessary lock, because the amount of jobs in progress could change while traversing this list
            lock (this.jobsInProgress)
            {
                canceledJobs = this.jobsInProgress.Where(entry => entry.Value == peerId).Select(entry => entry.Key).ToList();
                foreach (BigInteger jobId in canceledJobs)
                {
                    this.distributableJobControl.Push(jobId);
                    this.jobsInProgress.Remove(jobId);
                }
            }

            // raise the events outside of the lock, so handlers can't block other message threads
            foreach (BigInteger jobId in canceledJobs)
            {
                JobCanceled handler = OnJobCanceled;
                if (handler != null)
                    handler(jobId);
                GuiLogging("Pushed job '" + jobId.ToString() + "' back to the stack, because peer left the network.", NotificationLevel.Debug);
            }

            // necessary lock, because the amount of waiting jobs could change while traversing this list
            lock (this.jobsWaitingForAcceptanceInfo)
            {
                // Set the JobDeclined status for all jobs of the removed peer, which are still waiting
                // for an acceptance information. Then remove them from the "jobs waiting for acceptance info" list
                List<BigInteger> waitingJobs = this.jobsWaitingForAcceptanceInfo.Where(entry => entry.Value == peerId).Select(entry => entry.Key).ToList();
                foreach (BigInteger jobId in waitingJobs)
                {
                    this.distributableJobControl.JobDeclined(jobId);
                    this.jobsWaitingForAcceptanceInfo.Remove(jobId);
                    GuiLogging("Declined job '" + jobId.ToString() + "', because peer left the network.", NotificationLevel.Debug);
                }
            }

            GetProgressInformation();
        }

        /// <summary>
        /// Allocates new job parts to newly registered or calling-for-jobs workers.
        /// Additionally it adds each allocated job to the waitingForAcceptance Dictionary,
        /// so it can be checked whether the worker responds to the job allocation.
        /// Free workers which can't get a job receive a "no more jobs left" message.
        /// Caller must hold <see cref="allocationLock"/>.
        /// </summary>
        private void AllocateJobs()
        {
            WorkersManagement workers = (WorkersManagement)this.peerManagement;
            List<PeerId> freePeers = workers.GetFreeWorkers();
            int allocatedJobs = 0;
            bool jobStackExhausted = false;

            GuiLogging("Trying to allocate " + freePeers.Count + " job(s) to workers.", NotificationLevel.Debug);

            // set the start working time when allocating the FIRST job
            if (this.startWorkingTime == DateTime.MinValue && freePeers.Count > 0)
                this.startWorkingTime = DateTime.Now;

            foreach (PeerId worker in freePeers)
            {
                BigInteger jobId;
                byte[] serializedNewJob = this.distributableJobControl.Pop(out jobId);
                if (serializedNewJob == null) // null: there are no more job parts on the main stack!
                {
                    //edited by Arnie 2010.04.24
                    //sending "no more jobs left msg" to the free worker, so it can stop its Free-Msg-Timer
                    this.p2pControl.SendToPeer(JobMessages.CreateNoMoreJobsLeftMessage(), worker);
                    jobStackExhausted = true;
                    continue;
                }

                // Indexer instead of Add(): a job id which is handed out again must not throw
                // a duplicate-key exception. Locked like every other access to this dictionary.
                lock (this.jobsWaitingForAcceptanceInfo)
                {
                    this.jobsWaitingForAcceptanceInfo[jobId] = worker;
                }

                // set free worker to busy in the peerManagement class
                workers.SetFreeWorkerToBusy(worker);

                // send the new job to the worker. OnNewJobAllocated is thrown later,
                // when the worker's JobAccepted message arrives
                this.p2pControl.SendToPeer(JobMessages.CreateJobPartMessage(jobId, serializedNewJob), worker);

                GuiLogging("Job '" + jobId.ToString() + "' was sent to worker id '" + worker.ToString() + "'", NotificationLevel.Info);
                allocatedJobs++;
            }

            // report an empty job stack only once per allocation round, not once per free worker
            if (jobStackExhausted)
            {
                GuiLogging("No more jobs left. So wait for the last results, than close this task.", NotificationLevel.Debug);
                NoMoreJobsLeft handler = OnNoMoreJobsLeft;
                if (handler != null)
                    handler();
            }
            GuiLogging(allocatedJobs + " Job(s) allocated to worker(s).", NotificationLevel.Debug);
        }

        #endregion

        /// <summary>
        /// Calculates the progress of the whole job (value between 0 and 100) and throws
        /// OnProcessProgress with it. Allocated jobs are weighted with
        /// <see cref="AllocatedJobWeightInPercent"/>, finished jobs with 100%.
        /// </summary>
        /// <returns>the progress of the whole job in percent</returns>
        private double GetProgressInformation()
        {
            double finishedAmount = (double)this.distributableJobControl.FinishedAmount.LongValue();
            double allocatedAmount = (double)this.distributableJobControl.AllocatedAmount.LongValue();
            double totalAmount = (double)this.distributableJobControl.TotalAmount.LongValue();

            double jobProgressInPercent = 0.0;
            // without jobs there is nothing to divide by (would produce NaN/Infinity)
            if (totalAmount > 0)
            {
                if (allocatedAmount > 0)
                    jobProgressInPercent += AllocatedJobWeightInPercent * (allocatedAmount / totalAmount);
                if (finishedAmount > 0)
                    jobProgressInPercent += 100 * (finishedAmount / totalAmount);

                // the weighted sum may overshoot, depending on whether AllocatedAmount
                // also counts finished jobs - keep the documented 0..100 range
                jobProgressInPercent = Math.Min(100.0, jobProgressInPercent);
            }

            ProcessProgress handler = OnProcessProgress;
            if (handler != null)
                handler(jobProgressInPercent);

            return jobProgressInPercent;
        }

        /// <summary>
        /// Returns the estimated end time, extrapolated from the start time, the amount of
        /// finished jobs and the amount of remaining jobs.
        /// </summary>
        /// <returns>the estimated end time, or DateTime.MaxValue when no job is finished yet,
        /// no start time is known or the estimate lies beyond the DateTime range</returns>
        public DateTime EstimatedEndTime()
        {
            long finishedJobs = this.distributableJobControl.FinishedAmount.LongValue();
            // nothing to extrapolate from (StartWorkingTime is reset on every StartManager call)
            if (finishedJobs <= 0 || this.StartWorkingTime == DateTime.MinValue)
                return DateTime.MaxValue;

            DateTime now = DateTime.Now;
            double secondsPerJob = now.Subtract(this.StartWorkingTime).TotalSeconds / finishedJobs;
            double restSeconds = secondsPerJob *
                (this.distributableJobControl.TotalAmount - this.distributableJobControl.FinishedAmount).LongValue();

            // DateTime.AddSeconds throws when the result leaves the DateTime range
            if (restSeconds >= (DateTime.MaxValue - now).TotalSeconds)
                return DateTime.MaxValue;
            return now.AddSeconds(restSeconds);
        }

        #region Forward PeerManagement Values

        /// <summary>Amount of free workers; 0 when the worker management doesn't exist (yet).</summary>
        public int FreeWorkers()
        {
            WorkersManagement workers = this.peerManagement as WorkersManagement;
            return workers != null ? workers.GetFreeWorkersAmount() : 0;
        }

        /// <summary>Amount of busy workers; 0 when the worker management doesn't exist (yet).</summary>
        public int BusyWorkers()
        {
            WorkersManagement workers = this.peerManagement as WorkersManagement;
            return workers != null ? workers.GetBusyWorkersAmount() : 0;
        }

        #endregion
    }
}
Note: See TracBrowser for help on using the repository browser.