source: trunk/CrypPlugins/PeerToPeerManager_NEW/P2PManagerBase_NEW.cs @ 1179

Last change on this file since 1179 was 1179, checked in by arnold, 12 years ago

ProgressChunk stuff for P2PManager QuickWatchPresentation

File size: 24.2 KB
Line 
1/* Copyright 2010 Team CrypTool (Christian Arnold), Uni Duisburg-Essen
2
3   Licensed under the Apache License, Version 2.0 (the "License");
4   you may not use this file except in compliance with the License.
5   You may obtain a copy of the License at
6
7       http://www.apache.org/licenses/LICENSE-2.0
8
9   Unless required by applicable law or agreed to in writing, software
10   distributed under the License is distributed on an "AS IS" BASIS,
11   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12   See the License for the specific language governing permissions and
13   limitations under the License.
14*/
15
16using System;
17using System.Collections.Generic;
18using System.Linq;
19using System.Text;
20using Cryptool.PluginBase.Control;
21using Cryptool.Plugins.PeerToPeer.Jobs;
22using Cryptool.PluginBase;
23using Cryptool.PluginBase.Miscellaneous;
24
/* TODO:
 * - Publisher-change is possible, but catching the old Publisher's subscriber
 *   list isn't implemented yet ((de)serialization of the subscribers is
 *   implemented and tested)
 * - Manager-change is possible, but catching the job history isn't implemented
 *   yet ((de)serialization of job management lists)
 * - Benchmarking the working peers
 *   (this.distributableJobControl.SetResult() returns the TimeSpan for the result)
 * - Insert internal Start-/Stop-Button, so Manager can stop its work without
 *   losing any Job-Information (this happens at present, when pressing the Stop
 *   button of the CrypTool Workspace)
 */
37
38namespace Cryptool.Plugins.PeerToPeer
39{
40    public class P2PManagerBase_NEW : P2PPublisherBase
41    {
42        #region Events and Delegates
43
        /// <summary>Handler signature for <see cref="OnProcessProgress"/>.</summary>
        /// <param name="progressInPercent">progress value computed by GetProgressInformation</param>
        public delegate void ProcessProgress(double progressInPercent);       
        /// <summary>Handler signature for <see cref="OnNewJobAllocated"/>.</summary>
        public delegate void NewJobAllocated(BigInteger jobId);
        /// <summary>Handler signature for <see cref="OnResultReceived"/>.</summary>
        public delegate void ResultReceived(BigInteger jobId);
        /// <summary>Handler signature for <see cref="OnJobCanceled"/>.</summary>
        public delegate void JobCanceled(BigInteger jobId);
        /// <summary>Handler signature for <see cref="OnNoMoreJobsLeft"/>.</summary>
        public delegate void NoMoreJobsLeft();
        /// <summary>Handler signature for <see cref="OnAllJobResultsReceived"/>.</summary>
        public delegate void AllJobResultsReceived(BigInteger lastJobId);
        /// <summary>
        /// Raised by GetProgressInformation every time the progress value is recomputed.
        /// </summary>
        public event ProcessProgress OnProcessProgress;
        /// <summary>
        /// Raised when a worker's "JobAccepted" message for a not yet tracked job
        /// arrives. NOTE(review): AllocateJobs raises this event as well, directly
        /// after sending a job to a worker, so subscribers may see the same job id
        /// twice - confirm whether that is intended.
        /// </summary>
        public event NewJobAllocated OnNewJobAllocated;
        /// <summary>
        /// Raised after an incoming job result was handed to the distributable job control.
        /// </summary>
        public event ResultReceived OnResultReceived;
        /// <summary>
        /// Raised for every in-progress job that is pushed back to the job stack
        /// because its worker was removed from the network.
        /// </summary>
        public event JobCanceled OnJobCanceled;
        /// <summary>
        /// Raised by AllocateJobs when the distributable job control has no more
        /// job parts to hand out, while results may still be outstanding.
        /// </summary>
        public event NoMoreJobsLeft OnNoMoreJobsLeft;
        /// <summary>
        /// Intended to be raised when no more jobs are left AND the last outstanding
        /// job result has come in.
        /// NOTE(review): nothing in the visible part of this class raises this event.
        /// </summary>
        public event AllJobResultsReceived OnAllJobResultsReceived;
74
75        #endregion
76
77        #region Variables
78
        /// <summary>
        /// Job control for one specific kind of distributable job. This class pops
        /// job parts from it, pushes canceled ones back, reports accepted/declined
        /// jobs and results to it and reads its Finished/Allocated/Total amounts.
        /// </summary>
        private IDistributableJob distributableJobControl;
        /// <summary>
        /// Maps job id to worker for all jobs which were sent to a worker, but for
        /// which no accept/decline answer has been handled yet.
        /// </summary>
        private Dictionary<BigInteger, PeerId> jobsWaitingForAcceptanceInfo;
        /// <summary>
        /// Maps job id to worker for all jobs whose "JobAccepted" message was
        /// received. Entries are removed when the result arrives or when the
        /// worker is removed.
        /// </summary>
        private Dictionary<BigInteger, PeerId> jobsInProgress;

        // backing field of ManagerStarted
        private bool managerStarted = false;
        /// <summary>
        /// True between PeerCompletelyStarted and Stop; only this class can change it.
        /// </summary>
        public bool ManagerStarted
        {
            get { return this.managerStarted; }
            private set { this.managerStarted = value; }
        }

        /// <summary>
        /// Topic name of this manager. It is set by StartManager and is empty
        /// before the first start.
        /// </summary>
        private string sTopic = String.Empty;
        /// <summary>
        /// Topic name passed to the last StartManager call (read-only for other classes).
        /// </summary>
        public string TopicName
        {
            get { return this.sTopic; }
            private set { this.sTopic = value; }
        }

        // backing field of AliveMesageInterval
        private long lAliveMessageInterval;
        /// <summary>
        /// Alive message interval which StartManager stores and forwards to the
        /// base class Start method. NOTE(review): the unit is not visible in this
        /// file - confirm against the base class.
        /// </summary>
        public long AliveMesageInterval
        {
            get { return this.lAliveMessageInterval ; }
            set { this.lAliveMessageInterval = value; }
        }

        // backing field of StartWorkingTime; reset to MinValue by StartManager
        private DateTime startWorkingTime = DateTime.MinValue;
        /// <summary>
        /// This value is initialized by the first job allocation run that finds
        /// free workers. Before that it is DateTime.MinValue! Used for the
        /// end time approximation.
        /// </summary>
        public DateTime StartWorkingTime
        {
            get { return this.startWorkingTime; } 
        }
128
129        #endregion
130
131        public P2PManagerBase_NEW(IP2PControl p2pControl, IDistributableJob distributableJob) : base(p2pControl)
132        {
133            this.distributableJobControl = distributableJob;
134
135            this.jobsWaitingForAcceptanceInfo = new Dictionary<BigInteger, PeerId>();
136            this.jobsInProgress = new Dictionary<BigInteger, PeerId>();
137        }
138
139        public void StartManager(string sTopic, long aliveMessageInterval)
140        {
141            // only when the main manager plugin is connected with a Peer-PlugIn
142            // and a IWorkerControl-PlugIn, this Manager can start its work
143            if (this.distributableJobControl != null && this.p2pControl != null)
144            {
145                //set value to null, when restarting the manager
146                this.startWorkingTime = DateTime.MinValue; 
147                this.TopicName = sTopic;
148                this.AliveMesageInterval = aliveMessageInterval;
149                base.Start(this.TopicName, this.AliveMesageInterval);
150            }
151            else
152            {
153                GuiLogging("Manager can't be started, because P2P-Peer- or Distributable-Job-PlugIn isn't connected with the Manager or the connection is broken...", NotificationLevel.Warning);
154            }
155        }
156
157        protected override void PeerCompletelyStarted()
158        {
159            base.PeerCompletelyStarted();
160
161            this.ManagerStarted = true;
162            GetProgressInformation();
163            GuiLogging("P2PManager is started right now.", NotificationLevel.Info);
164        }
165
166        public override void Stop(PubSubMessageType msgType)
167        {
168            base.Stop(msgType);
169
170            this.ManagerStarted = false;
171            ((WorkersManagement)this.peerManagement).OnFreeWorkersAvailable -= peerManagement_OnFreeWorkersAvailable;
172            ((WorkersManagement)this.peerManagement).OnSubscriberRemoved -= peerManagement_OnSubscriberRemoved;
173
174            GuiLogging("P2PManager was stopped successully.", NotificationLevel.Info);
175        }
176
177        /// <summary>
178        /// because the manager needs additional peer information for all workers,
179        /// this method is overwritten. WorkersManagement throws events, when
180        /// a Worker leaves or joins the "solution network", so we can re-add or
181        /// allocate a job.
182        /// </summary>
183        /// <param name="aliveMessageInterval"></param>
184        protected override void AssignManagement(long aliveMessageInterval)
185        {
186            this.peerManagement = new WorkersManagement(aliveMessageInterval);
187            this.peerManagement.OnSubscriberRemoved +=new SubscriberManagement.SubscriberRemoved(peerManagement_OnSubscriberRemoved);
188            // waiting for new workers joining the manager or already joined worker, who were set to "free" again
189            ((WorkersManagement)this.peerManagement).OnFreeWorkersAvailable += new WorkersManagement.FreeWorkersAvailable(peerManagement_OnFreeWorkersAvailable);
190        }
191
192        /// <summary>
193        /// only accepts DistributableJob-specific messages (created, checked and transformed by
194        /// the static class JobMessages). All other message are dropped!
195        /// </summary>
196        /// <param name="sender"></param>
197        /// <param name="data"></param>
198        protected override void p2pControl_OnPayloadMessageReceived(PeerId sender, byte[] data)
199        {
200            if (!JobMessages.IsJobMessageType(data[0]))
201            {
202                GuiLogging("Received an undefined message (not a job accepted message or a job result).", NotificationLevel.Debug);
203                return;
204            } 
205            switch (JobMessages.GetMessageJobType(data[0]))
206            {
207                case MessageJobType.JobAcceptanceInfo:
208                    HandleJobAcceptanceMessage(sender, data);
209                    break;
210                case MessageJobType.JobResult:
211                    GuiLogging("Received JobResult message from Peer '" + sender.ToString() + "'. Beginning to set result now.", NotificationLevel.Debug);
212                    HandleJobResultMessage(sender, data);
213                    break;
214                case MessageJobType.Free:
215                    HandleFreeMessage(sender, data);
216                    break;
217                default:
218                    GuiLogging("Obscure Message (" + Encoding.UTF8.GetString(data) + ") received from '" + sender.ToString() + "'.", NotificationLevel.Info);
219                    break;
220            } // end switch
221            GetProgressInformation();
222        }
223
224        /// <summary>
225        /// This method is only overwritten because we have to ignore the Solution-case in
226        /// the System-Message-Handling (a Peer mustn't send a Solution message, which influences
227        /// the working status of the Manager, because it havn't the overview of the JobParts)
228        /// </summary>
229        /// <param name="sender"></param>
230        /// <param name="msgType"></param>
231        protected override void p2pControl_OnSystemMessageReceived(PeerId sender, PubSubMessageType msgType)
232        {
233            // ignore Solution case, because other worker could work on...
234            if (msgType != PubSubMessageType.Solution)
235                // base class handles all administration cases (register, alive, unregister, ping, pong, ...)
236                base.p2pControl_OnSystemMessageReceived(sender, msgType);
237        }
238
239        #region Handle different DistributableJob-specific, incoming messages
240
241        /// <summary>
242        /// Handles the two job-acceptance cases (accepted or declined). Adds accepted jobs
243        /// to the "inProgress" Dictionary, sets a busy declined worker to free (when message
244        /// is JobDeclined) and removes the job in every case from the waitingForAcceptance list
245        /// </summary>
246        /// <param name="sender"></param>
247        /// <param name="data"></param>
248        private void HandleJobAcceptanceMessage(PeerId sender, byte[] data)
249        {
250            BigInteger jobId = null;
251            if (JobMessages.GetJobAcceptanceMessage(data, out jobId))
252            {
253                this.distributableJobControl.JobAccepted(jobId);
254                lock (this.jobsInProgress)
255                {
256                    if (!this.jobsInProgress.ContainsKey(jobId))
257                    {
258                        // add to jobs in progress, because P2PJobAdmin has accepted the job!
259                        this.jobsInProgress.Add(jobId, sender);
260                        if (OnNewJobAllocated != null)
261                            OnNewJobAllocated(jobId);
262                    }
263                    //else
264                    //    throw (new Exception("Received a JobAccepted message for a already accepted JobId... JobId: " + jobId.ToString()));
265                }
266                GuiLogging("JobId '" + jobId.ToString() + "' was accepted by Peer '" + sender.ToString() + "'.", NotificationLevel.Info);
267            }
268            else // if AcceptanceInfo is declined
269            {
270                this.distributableJobControl.JobDeclined(jobId);
271
272                // set busy worker to free, because he delined the job
273
274                // TODO: maybe create a "black list" for peers, who had declined this kind of Job twice...
275                ((WorkersManagement)this.peerManagement).SetBusyWorkerToFree(sender);
276                GuiLogging("JobId '" + jobId.ToString() + "' was declined by Peer '" + sender.ToString() + "'.", NotificationLevel.Info);
277            }
278            // in every case remove the job from thew waiting Dictionary
279            lock (this.jobsWaitingForAcceptanceInfo)
280            {
281                if (this.jobsWaitingForAcceptanceInfo.ContainsKey(jobId))
282                {
283                    this.jobsWaitingForAcceptanceInfo.Remove(jobId);
284                }
285                //else
286                //    throw (new Exception("Received a JobAcceptance-Message for a jobId, which isn't in the waitingForAcceptance-List... JobId: " + jobId.ToString()));
287            }
288        }
289
290        /// <summary>
291        /// Sets the incoming result in the DistributableJob class, removes the job from
292        /// the JobsInProgress Dictionary and throws the OnResultReceivedEvent
293        /// </summary>
294        /// <param name="sender"></param>
295        /// <param name="data"></param>
296        private void HandleJobResultMessage(PeerId sender, byte[] data)
297        {
298            BigInteger jobId;
299
300            byte[] serializedJobResult = JobMessages.GetJobResult(data, out jobId);
301            TimeSpan jobProcessingTime = this.distributableJobControl.SetResult(jobId, serializedJobResult);
302
303            if (OnResultReceived != null)
304                OnResultReceived(jobId);
305
306            GuiLogging("JobResult for Job '" + jobId.ToString() + "' received. Processing Time: "
307                + jobProcessingTime.TotalMinutes.ToString() + " minutes. Worker-Id: '" + sender.ToString() + "'.", NotificationLevel.Info);
308
309            lock (this.jobsInProgress)
310            {
311                if (this.jobsInProgress.ContainsKey(jobId))
312                    this.jobsInProgress.Remove(jobId);
313                //dirty workaround because P2PJobAdmin sends the result msg twice...
314                //else
315                //    throw (new Exception("Received a valid job result, which wasn't allocated before!!!"));
316            }
317        }
318
319        /// <summary>
320        /// If message content declares the sender as a free worker,
321        /// set this worker from busy to free, otherwise do nothing
322        /// </summary>
323        /// <param name="sender"></param>
324        /// <param name="data"></param>
325        private void HandleFreeMessage(PeerId sender, byte[] data)
326        {
327            // only handle the "true"-case, because otherwise there is nothing to do
328            if (JobMessages.GetFreeWorkerStatusMessage(data))
329            {
330                GuiLogging("Received a 'free'-message from Peer '" + sender.ToString() + "'.", NotificationLevel.Debug);
331                // only if worker already exists in the "busy list", it will set to free and event will be thrown
332                ((WorkersManagement)this.peerManagement).SetBusyWorkerToFree(sender);
333            }
334        }
335
336        #endregion
337
338        #region Worker-action-handling
339
340        /// <summary>
341        /// every time when new workers are available, continue distribution of Jobs (if any JobParts left)
342        /// </summary>
343        private void peerManagement_OnFreeWorkersAvailable()
344        {
345            if (!this.ManagerStarted)
346            {
347                GuiLogging("Manager isn't started at present, so I can't disperse the patterns.", NotificationLevel.Error);
348                bool removeSettings = DHT_CommonManagement.DeleteAllPublishersEntries(ref this.p2pControl, this.TopicName);
349                if (removeSettings)
350                    GuiLogging("Manager is stopped, but DHT entries were still existing, so they were deleted!", NotificationLevel.Info);
351                else
352                    throw (new Exception("Critical error in P2PManager. Manager isn't started yet, but the workers can register... Even removing DHT entries weren't possible..."));
353            }
354            else
355            {
356                /* edited by Arnold - 2010.02.23 */
357                // because parallel incoming free workers could run
358                // into concurrence in this method, so some workers
359                // could get more than one job - so they have to
360                // queue the additional jobs.
361                lock (this)
362                {
363                    AllocateJobs();
364                }
365            }
366
367            GetProgressInformation();
368        }
369
370        /// <summary>
371        /// When a Worker leaves the network, its (maybe) allocated JobParts have to
372        /// be pushed back to the main Jobstack
373        /// </summary>
374        /// <param name="peerId"></param>
375        private void peerManagement_OnSubscriberRemoved(PeerId peerId)
376        {
377            GuiLogging("REMOVED worker " + peerId, NotificationLevel.Info);
378
379            // necessary lock, because the amount of jobs in Progress could change while traversing this list
380            lock (this.jobsInProgress)
381            {
382                // push job back and remove list entries for "jobs in progress"
383                List<BigInteger> allJobsForRemovedPeer = (from k in this.jobsInProgress where k.Value == peerId select k.Key).ToList<BigInteger>();
384
385                BigInteger jobId;
386                for (int i = 0; i < allJobsForRemovedPeer.Count; i++)
387                {
388                    jobId = allJobsForRemovedPeer[i];
389                    this.distributableJobControl.Push(jobId);
390                    this.jobsInProgress.Remove(jobId);
391                    if (OnJobCanceled != null)
392                        OnJobCanceled(jobId);
393                    GuiLogging("Pushed job '" + jobId.ToString() + "' back to the stack, because peer left the network.", NotificationLevel.Debug);
394                }
395            }
396
397            // necessary lock, because the amount of jobs in Progress could change while traversing this list
398            lock (this.jobsWaitingForAcceptanceInfo)
399            {
400                // Set the JobDeclined-status for all jobs of the removed peer, which are still waiting
401                // for an acceptance information. Than remove all jobs from the "jobs waiting for acceptance info" List
402                List<BigInteger> allWaitingEntriesForRemovedPeer = (from k in this.jobsWaitingForAcceptanceInfo where k.Value == peerId select k.Key).ToList<BigInteger>();
403
404                for (int i = 0; i < allWaitingEntriesForRemovedPeer.Count; i++)
405                {
406                    this.distributableJobControl.JobDeclined(allWaitingEntriesForRemovedPeer[i]);
407                    this.jobsWaitingForAcceptanceInfo.Remove(allWaitingEntriesForRemovedPeer[i]);
408                    GuiLogging("Declined job '" + allWaitingEntriesForRemovedPeer[i].ToString() + "', because peer left the network.", NotificationLevel.Debug);
409                }
410            }
411
412            GetProgressInformation();
413        }
414       
415        /// <summary>
416        /// Allocates new JobParts to new registered or calling-for-jobs Workers.
417        /// Additionally it adds the allocated job to a waitingForAcceptance Dictionary,
418        /// so it can be checked, if the Worker respond to the Job-allocation
419        /// </summary>
420        private void AllocateJobs()
421        {
422            int i = 0;
423            BigInteger temp_jobId = null;
424            List<PeerId> freePeers = ((WorkersManagement)this.peerManagement).GetFreeWorkers();
425
426            GuiLogging("Trying to allocate " + freePeers.Count + " job(s) to workers.", NotificationLevel.Debug);
427
428            // set the start working time after allocating the FIRST job
429            if (this.startWorkingTime == DateTime.MinValue && freePeers.Count > 0)
430                this.startWorkingTime = DateTime.Now;
431
432            foreach (PeerId worker in freePeers)
433            {
434                byte[] serializedNewJob = this.distributableJobControl.Pop(out temp_jobId);
435                if (serializedNewJob != null) // if this is null, there are no more JobParts on the main stack!
436                {
437                    this.jobsWaitingForAcceptanceInfo.Add(temp_jobId, worker);
438
439                    // set free worker to busy in the peerManagement class
440                    ((WorkersManagement)this.peerManagement).SetFreeWorkerToBusy(worker);
441
442                    // get actual subscriber/worker and send the new job
443                    base.p2pControl.SendToPeer(JobMessages.CreateJobPartMessage(temp_jobId, serializedNewJob), worker);
444
445                    if (OnNewJobAllocated != null)
446                        OnNewJobAllocated(temp_jobId);
447
448                    GuiLogging("Job '" + temp_jobId.ToString() + "' were sent to worker id '" + worker.ToString() + "'", NotificationLevel.Info);
449                    i++;
450                }
451                else
452                {
453                    GuiLogging("No more jobs left. So wait for the last results, than close this task.", NotificationLevel.Debug);
454                    if (OnNoMoreJobsLeft != null)
455                        OnNoMoreJobsLeft();
456                }
457            } // end foreach
458            GuiLogging(i + " Job(s) allocated to worker(s).", NotificationLevel.Debug);
459        }
460
461        #endregion
462
463        /// <summary>
464        /// returns the percental progress information of the whole job (value is between 0 and 100)
465        /// </summary>
466        /// <returns>the percental progress information of the whole job</returns>
467        private double GetProgressInformation()
468        {
469            double jobProgressInPercent;
470            double lFinishedAmount = (double)this.distributableJobControl.FinishedAmount.LongValue();
471            double lAllocatedAmount = (double)this.distributableJobControl.AllocatedAmount.LongValue();
472            double lTotalAmount = (double)this.distributableJobControl.TotalAmount.LongValue();
473
474            if (lFinishedAmount > 0 && lAllocatedAmount > 0)
475            {
476                jobProgressInPercent = 30 * (lAllocatedAmount / lTotalAmount) + 100 * (lFinishedAmount / lTotalAmount);
477            }
478            else if (lAllocatedAmount > 0)
479            {
480                jobProgressInPercent = 30 * (lAllocatedAmount / lTotalAmount);
481            }
482            else if (lFinishedAmount > 0)
483            {
484                jobProgressInPercent = 100 * (lFinishedAmount / lTotalAmount);
485            }
486            else
487            {
488                jobProgressInPercent = 0.0;
489            }
490
491            if (OnProcessProgress != null)
492                OnProcessProgress(jobProgressInPercent);
493
494            return jobProgressInPercent;
495        }
496
497        /// <summary>
498        /// returns the estimated end time (correlation between Start Time, Total amount of jobs and finished jobs).
499        /// When no job is finished yet, it returns an empty timespan
500        /// </summary>
501        /// <returns></returns>
502        public DateTime EstimatedEndTime()
503        {
504            DateTime retTime = DateTime.MaxValue;
505            if (this.distributableJobControl.FinishedAmount.LongValue() > 0)
506            {
507                TimeSpan bruteforcingTime = DateTime.Now.Subtract(this.StartWorkingTime);
508                double jobsPerSecond = bruteforcingTime.TotalSeconds / this.distributableJobControl.FinishedAmount.LongValue();
509                double restSeconds = jobsPerSecond * 
510                    (this.distributableJobControl.TotalAmount - this.distributableJobControl.FinishedAmount).LongValue();
511                //retTime.TotalSeconds = jobsPerSecond * (2 - (progressInPercent / 100));
512                retTime = DateTime.Now.AddSeconds(restSeconds);
513            }
514            return retTime;
515        }
516
517        #region Forward PeerManagement Values
518
519        public int FreeWorkers() { return ((WorkersManagement)peerManagement).GetFreeWorkersAmount(); }
520        public int BusyWorkers() { return ((WorkersManagement)peerManagement).GetBusyWorkersAmount(); }
521
522        #endregion
523    }
524}
Note: See TracBrowser for help on using the repository browser.