source: trunk/CrypPlugins/PeerToPeerManager_NEW/P2PManagerBase_NEW.cs @ 1178

Last change on this file since 1178 was 1178, checked in by arnold, 12 years ago

P2PManager: Added a job distribution Expander in the QuickWatchPresentation
Bug fixes: Cyclic Register-Message-Sending, Allocating more than one job to each worker

File size: 24.1 KB
Line 
1/* Copyright 2010 Team CrypTool (Christian Arnold), Uni Duisburg-Essen
2
3   Licensed under the Apache License, Version 2.0 (the "License");
4   you may not use this file except in compliance with the License.
5   You may obtain a copy of the License at
6
7       http://www.apache.org/licenses/LICENSE-2.0
8
9   Unless required by applicable law or agreed to in writing, software
10   distributed under the License is distributed on an "AS IS" BASIS,
11   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12   See the License for the specific language governing permissions and
13   limitations under the License.
14*/
15
16using System;
17using System.Collections.Generic;
18using System.Linq;
19using System.Text;
20using Cryptool.PluginBase.Control;
21using Cryptool.Plugins.PeerToPeer.Jobs;
22using Cryptool.PluginBase;
23using Cryptool.PluginBase.Miscellaneous;
24
/* TODO:
 * - A Publisher change is possible, but taking over the old Publisher's subscriber
 *   list isn't implemented yet ((de)serialization of the subscribers is
 *   implemented and tested)
 * - Make a Manager change possible ((de)serialization of the job management lists)
 * - Benchmark the working peers
 *   (this.distributableJobControl.SetResult() returns the TimeSpan for the result)
 * - Insert an internal Start/Stop button, so the Manager can stop its work without
 *   losing any job information (at present this happens when pressing the Stop
 *   button of the CrypTool workspace)
 */
36
namespace Cryptool.Plugins.PeerToPeer
{
    /// <summary>
    /// Publisher-side manager of a distributable job. It pops job parts from the
    /// <see cref="IDistributableJob"/> control, sends them to free workers, keeps track
    /// of which worker was sent / has accepted which job, collects the job results and
    /// pushes jobs of workers that leave the network back to the job control.
    /// </summary>
    public class P2PManagerBase_NEW : P2PPublisherBase
    {
        #region Events and Delegates

        public delegate void ProcessProgress(double progressInPercent);
        public delegate void NewJobAllocated(BigInteger jobId);
        public delegate void ResultReceived(BigInteger jobId);
        public delegate void JobCanceled(BigInteger jobId);
        public delegate void NoMoreJobsLeft();
        public delegate void AllJobResultsReceived(BigInteger lastJobId);

        /// <summary>
        /// Thrown every time the progress of the whole job was recalculated
        /// (the value is between 0 and 100)
        /// </summary>
        public event ProcessProgress OnProcessProgress;
        /// <summary>
        /// When a new job was successfully allocated to a worker (after receiving
        /// its "JobAccepted"-Message), this event is thrown
        /// </summary>
        public event NewJobAllocated OnNewJobAllocated;
        /// <summary>
        /// When a new job result was received (and accepted) this event is thrown
        /// </summary>
        public event ResultReceived OnResultReceived;
        /// <summary>
        /// Thrown when an active worker leaves the network and its jobs are pushed back
        /// </summary>
        public event JobCanceled OnJobCanceled;
        /// <summary>
        /// When the last job from the DistributableJob-Stack is allocated, but
        /// the Manager is still waiting for some JobResults this event is thrown
        /// </summary>
        public event NoMoreJobsLeft OnNoMoreJobsLeft;
        /// <summary>
        /// When no more jobs are left AND the last outstanding JobResult comes in
        /// (finished amount reaches the total amount), this event is thrown once per manager run
        /// </summary>
        public event AllJobResultsReceived OnAllJobResultsReceived;

        #endregion

        #region Variables

        /// <summary>
        /// weight of the allocated jobs in the progress calculation (in percent)
        /// </summary>
        private const double AllocatedJobsProgressWeight = 30.0;
        /// <summary>
        /// weight of the finished jobs in the progress calculation (in percent)
        /// </summary>
        private const double FinishedJobsProgressWeight = 100.0;

        /// <summary>
        /// this control contains a JobStack and other special
        /// management for a SPECIAL distributable Job
        /// </summary>
        private readonly IDistributableJob distributableJobControl;
        /// <summary>
        /// this list contains all jobs, which were sent to workers,
        /// but the workers haven't accepted/declined the Job yet
        /// (always accessed under its own lock)
        /// </summary>
        private readonly Dictionary<BigInteger, PeerId> jobsWaitingForAcceptanceInfo;
        /// <summary>
        /// this dict contains all jobs/workers, which were successfully
        /// distributed (so the manager already had received a JobAccepted Msg)
        /// (always accessed under its own lock)
        /// </summary>
        private readonly Dictionary<BigInteger, PeerId> jobsInProgress;
        /// <summary>
        /// serializes the job allocation, so parallel "free workers available"
        /// notifications can't hand more than one job to the same worker
        /// </summary>
        private readonly object allocationLock = new object();
        /// <summary>
        /// true, after OnAllJobResultsReceived was thrown in the current manager run
        /// (guarded by the jobsInProgress lock)
        /// </summary>
        private bool allJobResultsReported = false;

        private bool managerStarted = false;
        public bool ManagerStarted
        {
            get { return this.managerStarted; }
            private set { this.managerStarted = value; }
        }

        /// <summary>
        /// When the Manager is started, this variable must be set.
        /// </summary>
        private string sTopic = String.Empty;
        public string TopicName
        {
            get { return this.sTopic; }
            private set { this.sTopic = value; }
        }

        private long lAliveMessageInterval;
        /// <summary>
        /// alive message interval, which is forwarded to the publisher base when starting
        /// (the misspelled name is kept for compatibility with existing callers)
        /// </summary>
        public long AliveMesageInterval
        {
            get { return this.lAliveMessageInterval; }
            set { this.lAliveMessageInterval = value; }
        }

        private DateTime startWorkingTime = DateTime.MinValue;
        /// <summary>
        /// This value will be initialized when the first job is sent to a worker.
        /// Before initialization this is MinValue! Used for end time approximation
        /// </summary>
        public DateTime StartWorkingTime
        {
            get { return this.startWorkingTime; }
        }

        #endregion

        public P2PManagerBase_NEW(IP2PControl p2pControl, IDistributableJob distributableJob) : base(p2pControl)
        {
            this.distributableJobControl = distributableJob;

            this.jobsWaitingForAcceptanceInfo = new Dictionary<BigInteger, PeerId>();
            this.jobsInProgress = new Dictionary<BigInteger, PeerId>();
        }

        /// <summary>
        /// Starts the manager as publisher of the given topic. Only possible when both the
        /// P2P control and the distributable job are available, otherwise a warning is logged.
        /// ManagerStarted becomes true in PeerCompletelyStarted.
        /// </summary>
        /// <param name="sTopic">topic name (stored in TopicName)</param>
        /// <param name="aliveMessageInterval">alive message interval, forwarded to the base class</param>
        public void StartManager(string sTopic, long aliveMessageInterval)
        {
            // only when the main manager plugin is connected with a Peer-PlugIn
            // and a IWorkerControl-PlugIn, this Manager can start its work
            if (this.distributableJobControl == null || this.p2pControl == null)
            {
                GuiLogging("Manager can't be started, because P2P-Peer- or Distributable-Job-PlugIn isn't connected with the Manager or the connection is broken...", NotificationLevel.Warning);
                return;
            }

            // reset the start time and the "all results reported" state when restarting the manager
            this.startWorkingTime = DateTime.MinValue;
            lock (this.jobsInProgress)
            {
                this.allJobResultsReported = false;
            }
            this.TopicName = sTopic;
            this.AliveMesageInterval = aliveMessageInterval;
            base.Start(this.TopicName, this.AliveMesageInterval);
        }

        protected override void PeerCompletelyStarted()
        {
            base.PeerCompletelyStarted();

            this.ManagerStarted = true;
            GetProgressInformation();
            GuiLogging("P2PManager is started right now.", NotificationLevel.Info);
        }

        /// <summary>
        /// Stops the publisher, marks the manager as stopped and unsubscribes from
        /// the worker management events (if a worker management was assigned).
        /// </summary>
        public override void Stop(PubSubMessageType msgType)
        {
            base.Stop(msgType);

            this.ManagerStarted = false;
            // peerManagement is only assigned in AssignManagement, so guard against a Stop before any start
            WorkersManagement workersManagement = this.peerManagement as WorkersManagement;
            if (workersManagement != null)
            {
                workersManagement.OnFreeWorkersAvailable -= peerManagement_OnFreeWorkersAvailable;
                workersManagement.OnSubscriberRemoved -= peerManagement_OnSubscriberRemoved;
            }

            GuiLogging("P2PManager was stopped successully.", NotificationLevel.Info);
        }

        /// <summary>
        /// because the manager needs additional peer information for all workers,
        /// this method is overridden. WorkersManagement throws events, when
        /// a Worker leaves or joins the "solution network", so we can re-add or
        /// allocate a job.
        /// </summary>
        /// <param name="aliveMessageInterval"></param>
        protected override void AssignManagement(long aliveMessageInterval)
        {
            this.peerManagement = new WorkersManagement(aliveMessageInterval);
            this.peerManagement.OnSubscriberRemoved += new SubscriberManagement.SubscriberRemoved(peerManagement_OnSubscriberRemoved);
            // waiting for new workers joining the manager or already joined worker, who were set to "free" again
            ((WorkersManagement)this.peerManagement).OnFreeWorkersAvailable += new WorkersManagement.FreeWorkersAvailable(peerManagement_OnFreeWorkersAvailable);
        }

        /// <summary>
        /// only accepts DistributableJob-specific messages (created, checked and transformed by
        /// the static class JobMessages). All other messages are dropped!
        /// </summary>
        /// <param name="sender"></param>
        /// <param name="data"></param>
        protected override void p2pControl_OnPayloadMessageReceived(PeerId sender, byte[] data)
        {
            // the first byte determines the message type, so an empty payload can't be interpreted
            if (data == null || data.Length == 0)
            {
                GuiLogging("Received an empty payload message from Peer '" + sender + "'.", NotificationLevel.Debug);
                return;
            }
            if (!JobMessages.IsJobMessageType(data[0]))
            {
                GuiLogging("Received an undefined message (not a job accepted message or a job result).", NotificationLevel.Debug);
                return;
            }
            switch (JobMessages.GetMessageJobType(data[0]))
            {
                case MessageJobType.JobAcceptanceInfo:
                    HandleJobAcceptanceMessage(sender, data);
                    break;
                case MessageJobType.JobResult:
                    GuiLogging("Received JobResult message from Peer '" + sender.ToString() + "'. Beginning to set result now.", NotificationLevel.Debug);
                    HandleJobResultMessage(sender, data);
                    break;
                case MessageJobType.Free:
                    HandleFreeMessage(sender, data);
                    break;
                default:
                    GuiLogging("Obscure Message (" + Encoding.UTF8.GetString(data) + ") received from '" + sender.ToString() + "'.", NotificationLevel.Info);
                    break;
            } // end switch
            GetProgressInformation();
        }

        /// <summary>
        /// This method is only overridden because we have to ignore the Solution-case in
        /// the System-Message-Handling (a Peer mustn't send a Solution message, which influences
        /// the working status of the Manager, because it hasn't the overview of the JobParts)
        /// </summary>
        /// <param name="sender"></param>
        /// <param name="msgType"></param>
        protected override void p2pControl_OnSystemMessageReceived(PeerId sender, PubSubMessageType msgType)
        {
            // ignore Solution case, because other worker could work on...
            if (msgType != PubSubMessageType.Solution)
                // base class handles all administration cases (register, alive, unregister, ping, pong, ...)
                base.p2pControl_OnSystemMessageReceived(sender, msgType);
        }

        #region Handle different DistributableJob-specific, incoming messages

        /// <summary>
        /// Handles the two job-acceptance cases (accepted or declined). Removes the job in every
        /// case from the waitingForAcceptance list, adds accepted jobs to the "inProgress"
        /// Dictionary and sets a busy declining worker to free.
        /// </summary>
        /// <param name="sender"></param>
        /// <param name="data"></param>
        private void HandleJobAcceptanceMessage(PeerId sender, byte[] data)
        {
            BigInteger jobId;
            bool accepted = JobMessages.GetJobAcceptanceMessage(data, out jobId);

            // Remove the job from the waiting list BEFORE anything else: setting a declining
            // worker to free throws OnFreeWorkersAvailable, which runs AllocateJobs and may
            // hand out the very same jobId again (presumably JobDeclined returns it to the stack).
            // Removing the entry afterwards would delete that fresh allocation.
            lock (this.jobsWaitingForAcceptanceInfo)
            {
                this.jobsWaitingForAcceptanceInfo.Remove(jobId);
            }

            if (accepted)
            {
                this.distributableJobControl.JobAccepted(jobId);
                bool isNewJob = false;
                lock (this.jobsInProgress)
                {
                    // a repeated JobAccepted message for an already accepted job is ignored
                    if (!this.jobsInProgress.ContainsKey(jobId))
                    {
                        // add to jobs in progress, because P2PJobAdmin has accepted the job!
                        this.jobsInProgress.Add(jobId, sender);
                        isNewJob = true;
                    }
                }

                // raise the event outside of the lock, so handlers can't block other message handling
                NewJobAllocated newJobHandler = OnNewJobAllocated;
                if (isNewJob && newJobHandler != null)
                    newJobHandler(jobId);

                GuiLogging("JobId '" + jobId.ToString() + "' was accepted by Peer '" + sender.ToString() + "'.", NotificationLevel.Info);
            }
            else // AcceptanceInfo is declined
            {
                this.distributableJobControl.JobDeclined(jobId);

                // set busy worker to free, because he declined the job
                // TODO: maybe create a "black list" for peers, who had declined this kind of Job twice...
                ((WorkersManagement)this.peerManagement).SetBusyWorkerToFree(sender);
                GuiLogging("JobId '" + jobId.ToString() + "' was declined by Peer '" + sender.ToString() + "'.", NotificationLevel.Info);
            }
        }

        /// <summary>
        /// Sets the incoming result in the DistributableJob class, throws the OnResultReceived
        /// event, removes the job from the JobsInProgress Dictionary and, when the finished
        /// amount has reached the total amount, throws OnAllJobResultsReceived (once per run).
        /// </summary>
        /// <param name="sender"></param>
        /// <param name="data"></param>
        private void HandleJobResultMessage(PeerId sender, byte[] data)
        {
            BigInteger jobId;

            byte[] serializedJobResult = JobMessages.GetJobResult(data, out jobId);
            TimeSpan jobProcessingTime = this.distributableJobControl.SetResult(jobId, serializedJobResult);

            ResultReceived resultHandler = OnResultReceived;
            if (resultHandler != null)
                resultHandler(jobId);

            GuiLogging("JobResult for Job '" + jobId.ToString() + "' received. Processing Time: "
                + jobProcessingTime.TotalMinutes.ToString() + " minutes. Worker-Id: '" + sender.ToString() + "'.", NotificationLevel.Info);

            bool allResultsJustReceived = false;
            lock (this.jobsInProgress)
            {
                // unknown jobIds are no error: P2PJobAdmin sends the result msg twice
                this.jobsInProgress.Remove(jobId);

                if (!this.allJobResultsReported && AllJobsFinished())
                {
                    this.allJobResultsReported = true;
                    allResultsJustReceived = true;
                }
            }

            AllJobResultsReceived allResultsHandler = OnAllJobResultsReceived;
            if (allResultsJustReceived && allResultsHandler != null)
                allResultsHandler(jobId);
        }

        /// <summary>
        /// true, when the distributable job reports at least as many finished as total job parts
        /// (and the total amount is positive)
        /// </summary>
        private bool AllJobsFinished()
        {
            long totalAmount = this.distributableJobControl.TotalAmount.LongValue();
            return totalAmount > 0 && this.distributableJobControl.FinishedAmount.LongValue() >= totalAmount;
        }

        /// <summary>
        /// If message content declares the sender as a free worker,
        /// set this worker from busy to free, otherwise do nothing
        /// </summary>
        /// <param name="sender"></param>
        /// <param name="data"></param>
        private void HandleFreeMessage(PeerId sender, byte[] data)
        {
            // only handle the "true"-case, because otherwise there is nothing to do
            if (JobMessages.GetFreeWorkerStatusMessage(data))
            {
                GuiLogging("Received a 'free'-message from Peer '" + sender.ToString() + "'.", NotificationLevel.Debug);
                // only if worker already exists in the "busy list", it will set to free and event will be thrown
                ((WorkersManagement)this.peerManagement).SetBusyWorkerToFree(sender);
            }
        }

        #endregion

        #region Worker-action-handling

        /// <summary>
        /// every time when new workers are available, continue distribution of Jobs (if any JobParts left).
        /// If the manager isn't started, the publisher's DHT entries are deleted instead.
        /// </summary>
        /// <exception cref="InvalidOperationException">
        /// the manager isn't started and the DHT entries couldn't be deleted
        /// </exception>
        private void peerManagement_OnFreeWorkersAvailable()
        {
            if (!this.ManagerStarted)
            {
                GuiLogging("Manager isn't started at present, so I can't disperse the patterns.", NotificationLevel.Error);
                bool removeSettings = DHT_CommonManagement.DeleteAllPublishersEntries(ref this.p2pControl, this.TopicName);
                if (removeSettings)
                    GuiLogging("Manager is stopped, but DHT entries were still existing, so they were deleted!", NotificationLevel.Info);
                else
                    throw new InvalidOperationException("Critical error in P2PManager. Manager isn't started yet, but the workers can register... Even removing DHT entries weren't possible...");
            }
            else
            {
                // Parallel incoming free-worker notifications must not run AllocateJobs
                // concurrently, otherwise some workers could get more than one job
                // (edited by Arnold - 2010.02.23). A private lock object is used instead
                // of lock(this), so external code can't interfere with this lock.
                lock (this.allocationLock)
                {
                    AllocateJobs();
                }
            }

            GetProgressInformation();
        }

        /// <summary>
        /// When a Worker leaves the network, its (maybe) allocated JobParts have to
        /// be pushed back to the main Jobstack and its jobs still waiting for an
        /// acceptance info are declined.
        /// </summary>
        /// <param name="peerId"></param>
        private void peerManagement_OnSubscriberRemoved(PeerId peerId)
        {
            GuiLogging("REMOVED worker " + peerId, NotificationLevel.Info);

            List<BigInteger> canceledJobs;
            // necessary lock, because the amount of jobs in Progress could change while traversing this list
            lock (this.jobsInProgress)
            {
                // push job back and remove list entries for "jobs in progress"
                canceledJobs = (from entry in this.jobsInProgress where entry.Value == peerId select entry.Key).ToList();

                foreach (BigInteger jobId in canceledJobs)
                {
                    this.distributableJobControl.Push(jobId);
                    this.jobsInProgress.Remove(jobId);
                    GuiLogging("Pushed job '" + jobId.ToString() + "' back to the stack, because peer left the network.", NotificationLevel.Debug);
                }
            }

            // raise the events outside of the lock
            JobCanceled canceledHandler = OnJobCanceled;
            if (canceledHandler != null)
            {
                foreach (BigInteger jobId in canceledJobs)
                    canceledHandler(jobId);
            }

            // necessary lock, because the amount of waiting jobs could change while traversing this list
            lock (this.jobsWaitingForAcceptanceInfo)
            {
                // Set the JobDeclined-status for all jobs of the removed peer, which are still waiting
                // for an acceptance information. Then remove them from the "jobs waiting for acceptance info" List
                List<BigInteger> waitingJobs = (from entry in this.jobsWaitingForAcceptanceInfo where entry.Value == peerId select entry.Key).ToList();

                foreach (BigInteger jobId in waitingJobs)
                {
                    this.distributableJobControl.JobDeclined(jobId);
                    this.jobsWaitingForAcceptanceInfo.Remove(jobId);
                    GuiLogging("Declined job '" + jobId.ToString() + "', because peer left the network.", NotificationLevel.Debug);
                }
            }

            // NOTE(review): the returned jobs are only handed out again on the next
            // OnFreeWorkersAvailable event; already idle free workers are not triggered here.
            GetProgressInformation();
        }

        /// <summary>
        /// Allocates new JobParts to newly registered or calling-for-jobs Workers.
        /// Additionally it adds the allocated job to the waitingForAcceptance Dictionary,
        /// so it can be checked, if the Worker responds to the Job-allocation.
        /// Must be called under allocationLock.
        /// </summary>
        private void AllocateJobs()
        {
            WorkersManagement workersManagement = (WorkersManagement)this.peerManagement;
            List<PeerId> freePeers = workersManagement.GetFreeWorkers();
            int allocatedJobs = 0;

            GuiLogging("Trying to allocate " + freePeers.Count + " job(s) to workers.", NotificationLevel.Debug);

            foreach (PeerId worker in freePeers)
            {
                BigInteger jobId;
                byte[] serializedNewJob = this.distributableJobControl.Pop(out jobId);
                if (serializedNewJob == null) // no more JobParts on the main stack!
                {
                    GuiLogging("No more jobs left. So wait for the last results, than close this task.", NotificationLevel.Debug);
                    NoMoreJobsLeft noMoreJobsHandler = OnNoMoreJobsLeft;
                    if (noMoreJobsHandler != null)
                        noMoreJobsHandler();
                    // the stack stays empty for the remaining free workers, so report it only once
                    break;
                }

                // set the start working time when the FIRST job is actually sent
                if (this.startWorkingTime == DateTime.MinValue)
                    this.startWorkingTime = DateTime.Now;

                lock (this.jobsWaitingForAcceptanceInfo)
                {
                    // indexer instead of Add, so a stale entry for a re-popped jobId can't throw
                    this.jobsWaitingForAcceptanceInfo[jobId] = worker;
                }

                // set free worker to busy in the peerManagement class
                workersManagement.SetFreeWorkerToBusy(worker);

                // send the new job to the worker. OnNewJobAllocated is thrown
                // when the worker's JobAccepted message arrives.
                base.p2pControl.SendToPeer(JobMessages.CreateJobPartMessage(jobId, serializedNewJob), worker);

                GuiLogging("Job '" + jobId.ToString() + "' were sent to worker id '" + worker.ToString() + "'", NotificationLevel.Info);
                allocatedJobs++;
            }
            GuiLogging(allocatedJobs + " Job(s) allocated to worker(s).", NotificationLevel.Debug);
        }

        #endregion

        /// <summary>
        /// Calculates the percental progress of the whole job (clamped to 0..100): allocated
        /// jobs count with AllocatedJobsProgressWeight, finished jobs with FinishedJobsProgressWeight
        /// (both relative to the total amount). Throws OnProcessProgress with the value.
        /// </summary>
        /// <returns>the percental progress information of the whole job</returns>
        private double GetProgressInformation()
        {
            double finishedAmount = (double)this.distributableJobControl.FinishedAmount.LongValue();
            double allocatedAmount = (double)this.distributableJobControl.AllocatedAmount.LongValue();
            double totalAmount = (double)this.distributableJobControl.TotalAmount.LongValue();

            double jobProgressInPercent = 0.0;
            // without a positive total amount there is no progress (and no division by zero)
            if (totalAmount > 0)
            {
                jobProgressInPercent = AllocatedJobsProgressWeight * (allocatedAmount / totalAmount)
                    + FinishedJobsProgressWeight * (finishedAmount / totalAmount);
                jobProgressInPercent = Math.Max(0.0, Math.Min(100.0, jobProgressInPercent));
            }

            ProcessProgress progressHandler = OnProcessProgress;
            if (progressHandler != null)
                progressHandler(jobProgressInPercent);

            return jobProgressInPercent;
        }

        /// <summary>
        /// returns the estimated end time (extrapolated from the elapsed time since StartWorkingTime,
        /// the finished amount and the total amount of jobs).
        /// Returns DateTime.MaxValue when no job is finished yet, the start time isn't known
        /// or the estimation exceeds the representable date range.
        /// </summary>
        /// <returns>the estimated end time or DateTime.MaxValue</returns>
        public DateTime EstimatedEndTime()
        {
            long finishedJobs = this.distributableJobControl.FinishedAmount.LongValue();
            if (finishedJobs <= 0 || this.startWorkingTime == DateTime.MinValue)
                return DateTime.MaxValue;

            DateTime now = DateTime.Now;
            TimeSpan elapsed = now.Subtract(this.startWorkingTime);
            double secondsPerJob = elapsed.TotalSeconds / finishedJobs;
            long remainingJobs = this.distributableJobControl.TotalAmount.LongValue() - finishedJobs;
            double remainingSeconds = Math.Max(0.0, secondsPerJob * remainingJobs);

            // DateTime.AddSeconds throws, if the result is beyond DateTime.MaxValue
            if (remainingSeconds >= (DateTime.MaxValue - now).TotalSeconds)
                return DateTime.MaxValue;

            return now.AddSeconds(remainingSeconds);
        }

        #region Forward PeerManagement Values

        public int FreeWorkers() { return ((WorkersManagement)peerManagement).GetFreeWorkersAmount(); }
        public int BusyWorkers() { return ((WorkersManagement)peerManagement).GetBusyWorkersAmount(); }

        #endregion
    }
}
Note: See TracBrowser for help on using the repository browser.