source: trunk/CrypPlugins/PeerToPeerManager_NEW/P2PManagerBase_NEW.cs @ 1166

Last change on this file since 1166 was 1166, checked in by arnold, 12 years ago

P2PManager: Design embellished and added an "estimated end time" information.
Samples: Output Boxes for KeySearcher Outputs

File size: 23.1 KB
Line 
1/* Copyright 2010 Team CrypTool (Christian Arnold), Uni Duisburg-Essen
2
3   Licensed under the Apache License, Version 2.0 (the "License");
4   you may not use this file except in compliance with the License.
5   You may obtain a copy of the License at
6
7       http://www.apache.org/licenses/LICENSE-2.0
8
9   Unless required by applicable law or agreed to in writing, software
10   distributed under the License is distributed on an "AS IS" BASIS,
11   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12   See the License for the specific language governing permissions and
13   limitations under the License.
14*/
15
16using System;
17using System.Collections.Generic;
18using System.Linq;
19using System.Text;
20using Cryptool.PluginBase.Control;
21using Cryptool.Plugins.PeerToPeer.Jobs;
22using Cryptool.PluginBase;
23using Cryptool.PluginBase.Miscellaneous;
24
25/* TODO:
26 * - Make Publisher-change possible ((de)serialize subscribers into DHT)
27 * - Make Manager-change possible
28 * - Benchmarking the working peers
29 *   (this.distributableJobControl.SetResult() returns the TimeSpan for the result)
30 */
31
32namespace Cryptool.Plugins.PeerToPeer
33{
34    public class P2PManagerBase_NEW : P2PPublisherBase
35    {
36        #region Events and Delegates
37
38        public delegate void ProcessProgress(double progressInPercent);       
39        public delegate void NewJobAllocated(BigInteger jobId);
40        public delegate void ResultReceived(BigInteger jobId);
41        public delegate void NoMoreJobsLeft();
42        public delegate void AllJobResultsReceived(BigInteger lastJobId);
43        public event ProcessProgress OnProcessProgress;
44        /// <summary>
45        /// When a new job was successfully allocated to a worker (after receiving
46        /// its "JobAccepted"-Message), this event is thrown
47        /// </summary>
48        public event NewJobAllocated OnNewJobAllocated;
49        /// <summary>
50        /// When a new job result was received (and accepted) this event is thrown
51        /// </summary>
52        public event ResultReceived OnResultReceived;
53        /// <summary>
54        /// When the last job from the DistributableJob-Stack is allocated, but
55        /// the Manager is still waiting for some JobResults this event is thrown
56        /// </summary>
57        public event NoMoreJobsLeft OnNoMoreJobsLeft;
58        /// <summary>
59        /// When no more jobs left AND the last "ausstehendes" JobResult comes in,
60        /// this event is thrown
61        /// </summary>
62        public event AllJobResultsReceived OnAllJobResultsReceived;
63
64        #endregion
65
66        #region Variables
67
68        /// <summary>
69        /// this control contains a JobStack and other special
70        /// management for a SPECIAL distributable Job
71        /// </summary>
72        private IDistributableJob distributableJobControl;
73        /// <summary>
74        /// this list contains all jobs, which were sent to workers,
75        /// but the workers hadn't accept/decline the Job at present
76        /// </summary>
77        private Dictionary<BigInteger, PeerId> jobsWaitingForAcceptanceInfo;
78        /// <summary>
79        /// this dict contains all jobs/workers, who were successfully
80        /// distributed (so the manager already had received a JobAccepted Msg)
81        /// </summary>
82        private Dictionary<BigInteger, PeerId> jobsInProgress;
83
84        private bool managerStarted = false;
85        public bool ManagerStarted
86        {
87            get { return this.managerStarted; }
88            private set { this.managerStarted = value; }
89        }
90
91        /// <summary>
92        /// When the Manager is started, this variable must be set.
93        /// </summary>
94        private string sTopic = String.Empty;
95        public string TopicName
96        {
97            get { return this.sTopic; }
98            private set { this.sTopic = value; }
99        }
100
101        private long lAliveMessageInterval;
102        public long AliveMesageInterval
103        {
104            get { return this.lAliveMessageInterval ; }
105            set { this.lAliveMessageInterval = value; }
106        }
107
108        private DateTime startWorkingTime = DateTime.MinValue;
109        /// <summary>
110        /// This value will be initialized after allocating the first job to a worker.
111        /// Before initialization this is MinValue! Used for end time approximation
112        /// </summary>
113        public DateTime StartWorkingTime
114        {
115            get { return this.startWorkingTime; } 
116        }
117
118        #endregion
119
120        public P2PManagerBase_NEW(IP2PControl p2pControl, IDistributableJob distributableJob) : base(p2pControl)
121        {
122            this.distributableJobControl = distributableJob;
123
124            this.jobsWaitingForAcceptanceInfo = new Dictionary<BigInteger, PeerId>();
125            this.jobsInProgress = new Dictionary<BigInteger, PeerId>();
126        }
127
128        public void StartManager(string sTopic, long aliveMessageInterval)
129        {
130            // only when the main manager plugin is connected with a Peer-PlugIn
131            // and a IWorkerControl-PlugIn, this Manager can start its work
132            if (this.distributableJobControl != null && this.p2pControl != null)
133            {
134                //set value to null, when restarting the manager
135                this.startWorkingTime = DateTime.MinValue; 
136                this.TopicName = sTopic;
137                this.AliveMesageInterval = aliveMessageInterval;
138                base.Start(this.TopicName, this.AliveMesageInterval);
139            }
140            else
141            {
142                GuiLogging("Manager can't be started, because P2P-Peer- or Distributable-Job-PlugIn isn't connected with the Manager or the connection is broken...", NotificationLevel.Warning);
143            }
144        }
145
146        protected override void PeerCompletelyStarted()
147        {
148            base.PeerCompletelyStarted();
149
150            this.ManagerStarted = true;
151            GetProgressInformation();
152            GuiLogging("P2PManager is started right now.", NotificationLevel.Info);
153        }
154
155        public override void Stop(PubSubMessageType msgType)
156        {
157            base.Stop(msgType);
158
159            this.ManagerStarted = false;
160            ((WorkersManagement)this.peerManagement).OnFreeWorkersAvailable -= peerManagement_OnFreeWorkersAvailable;
161            ((WorkersManagement)this.peerManagement).OnSubscriberRemoved -= peerManagement_OnSubscriberRemoved;
162
163            GuiLogging("P2PManager was stopped successully.", NotificationLevel.Info);
164        }
165
166        /// <summary>
167        /// because the manager needs additional peer information for all workers,
168        /// this method is overwritten. WorkersManagement throws events, when
169        /// a Worker leaves or joins the "solution network", so we can re-add or
170        /// allocate a job.
171        /// </summary>
172        /// <param name="aliveMessageInterval"></param>
173        protected override void AssignManagement(long aliveMessageInterval)
174        {
175            this.peerManagement = new WorkersManagement(aliveMessageInterval);
176            this.peerManagement.OnSubscriberRemoved +=new SubscriberManagement.SubscriberRemoved(peerManagement_OnSubscriberRemoved);
177            // waiting for new workers joining the manager or already joined worker, who were set to "free" again
178            ((WorkersManagement)this.peerManagement).OnFreeWorkersAvailable += new WorkersManagement.FreeWorkersAvailable(peerManagement_OnFreeWorkersAvailable);
179        }
180
181        /// <summary>
182        /// only accepts DistributableJob-specific messages (created, checked and transformed by
183        /// the static class JobMessages). All other message are dropped!
184        /// </summary>
185        /// <param name="sender"></param>
186        /// <param name="data"></param>
187        protected override void p2pControl_OnPayloadMessageReceived(PeerId sender, byte[] data)
188        {
189            if (!JobMessages.IsJobMessageType(data[0]))
190            {
191                GuiLogging("Received an undefined message (not a job accepted message or a job result).", NotificationLevel.Debug);
192                return;
193            } 
194            switch (JobMessages.GetMessageJobType(data[0]))
195            {
196                case MessageJobType.JobAcceptanceInfo:
197                    HandleJobAcceptanceMessage(sender, data);
198                    break;
199                case MessageJobType.JobResult:
200                    GuiLogging("Received JobResult message from Peer '" + sender.ToString() + "'. Beginning to set result now.", NotificationLevel.Debug);
201                    HandleJobResultMessage(sender, data);
202                    break;
203                case MessageJobType.Free:
204                    HandleFreeMessage(sender, data);
205                    break;
206                default:
207                    GuiLogging("Obscure Message (" + Encoding.UTF8.GetString(data) + ") received from '" + sender.ToString() + "'.", NotificationLevel.Info);
208                    break;
209            } // end switch
210            GetProgressInformation();
211        }
212
213        /// <summary>
214        /// This method is only overwritten because we have to ignore the Solution-case in
215        /// the System-Message-Handling (a Peer mustn't send a Solution message, which influences
216        /// the working status of the Manager, because it havn't the overview of the JobParts)
217        /// </summary>
218        /// <param name="sender"></param>
219        /// <param name="msgType"></param>
220        protected override void p2pControl_OnSystemMessageReceived(PeerId sender, PubSubMessageType msgType)
221        {
222            // ignore Solution case, because other worker could work on...
223            if (msgType != PubSubMessageType.Solution)
224                // base class handles all administration cases (register, alive, unregister, ping, pong, ...)
225                base.p2pControl_OnSystemMessageReceived(sender, msgType);
226        }
227
228        #region Handle different DistributableJob-specific, incoming messages
229
230        /// <summary>
231        /// Handles the two job-acceptance cases (accepted or declined). Adds accepted jobs
232        /// to the "inProgress" Dictionary, sets a busy declined worker to free (when message
233        /// is JobDeclined) and removes the job in every case from the waitingForAcceptance list
234        /// </summary>
235        /// <param name="sender"></param>
236        /// <param name="data"></param>
237        private void HandleJobAcceptanceMessage(PeerId sender, byte[] data)
238        {
239            BigInteger jobId = null;
240            if (JobMessages.GetJobAcceptanceMessage(data, out jobId))
241            {
242                this.distributableJobControl.JobAccepted(jobId);
243                lock (this.jobsInProgress)
244                {
245                    if (!this.jobsInProgress.ContainsKey(jobId))
246                    {
247                        // add to jobs in progress, because P2PJobAdmin has accepted the job!
248                        this.jobsInProgress.Add(jobId, sender);
249                        if (OnNewJobAllocated != null)
250                            OnNewJobAllocated(jobId);
251                    }
252                    //else
253                    //    throw (new Exception("Received a JobAccepted message for a already accepted JobId... JobId: " + jobId.ToString()));
254                }
255                GuiLogging("JobId '" + jobId.ToString() + "' was accepted by Peer '" + sender.ToString() + "'.", NotificationLevel.Info);
256            }
257            else // if AcceptanceInfo is declined
258            {
259                this.distributableJobControl.JobDeclined(jobId);
260
261                // set busy worker to free, because he delined the job
262
263                // TODO: maybe create a "black list" for peers, who had declined this kind of Job twice...
264                ((WorkersManagement)this.peerManagement).SetBusyWorkerToFree(sender);
265                GuiLogging("JobId '" + jobId.ToString() + "' was declined by Peer '" + sender.ToString() + "'.", NotificationLevel.Info);
266            }
267            // in every case remove the job from thew waiting Dictionary
268            lock (this.jobsWaitingForAcceptanceInfo)
269            {
270                if (this.jobsWaitingForAcceptanceInfo.ContainsKey(jobId))
271                {
272                    this.jobsWaitingForAcceptanceInfo.Remove(jobId);
273                }
274                //else
275                //    throw (new Exception("Received a JobAcceptance-Message for a jobId, which isn't in the waitingForAcceptance-List... JobId: " + jobId.ToString()));
276            }
277        }
278
279        /// <summary>
280        /// Sets the incoming result in the DistributableJob class, removes the job from
281        /// the JobsInProgress Dictionary and throws the OnResultReceivedEvent
282        /// </summary>
283        /// <param name="sender"></param>
284        /// <param name="data"></param>
285        private void HandleJobResultMessage(PeerId sender, byte[] data)
286        {
287            BigInteger jobId;
288
289            byte[] serializedJobResult = JobMessages.GetJobResult(data, out jobId);
290            TimeSpan jobProcessingTime = this.distributableJobControl.SetResult(jobId, serializedJobResult);
291
292            if (OnResultReceived != null)
293                OnResultReceived(jobId);
294
295            GuiLogging("JobResult for Job '" + jobId.ToString() + "' received. Processing Time: "
296                + jobProcessingTime.TotalMinutes.ToString() + " minutes. Worker-Id: '" + sender.ToString() + "'.", NotificationLevel.Info);
297
298            lock (this.jobsInProgress)
299            {
300                if (this.jobsInProgress.ContainsKey(jobId))
301                    this.jobsInProgress.Remove(jobId);
302                //dirty workaround because P2PJobAdmin sends the result msg twice...
303                //else
304                //    throw (new Exception("Received a valid job result, which wasn't allocated before!!!"));
305            }
306        }
307
308        /// <summary>
309        /// If message content declares the sender as a free worker,
310        /// set this worker from busy to free, otherwise do nothing
311        /// </summary>
312        /// <param name="sender"></param>
313        /// <param name="data"></param>
314        private void HandleFreeMessage(PeerId sender, byte[] data)
315        {
316            // only handle the "true"-case, because otherwise there is nothing to do
317            if (JobMessages.GetFreeWorkerStatusMessage(data))
318            {
319                GuiLogging("Received a 'free'-message from Peer '" + sender.ToString() + "'.", NotificationLevel.Debug);
320                // only if worker already exists in the "busy list", it will set to free and event will be thrown
321                ((WorkersManagement)this.peerManagement).SetBusyWorkerToFree(sender);
322            }
323        }
324
325        #endregion
326
327        #region Worker-action-handling
328
329        /// <summary>
330        /// every time when new workers are available, continue distribution of Jobs (if any JobParts left)
331        /// </summary>
332        private void peerManagement_OnFreeWorkersAvailable()
333        {
334            if (!this.ManagerStarted)
335            {
336                GuiLogging("Manager isn't started at present, so I can't disperse the patterns.", NotificationLevel.Error);
337                bool removeSettings = DHT_CommonManagement.DeleteAllPublishersEntries(ref this.p2pControl, this.TopicName);
338                if (removeSettings)
339                    GuiLogging("Manager is stopped, but DHT entries were still existing, so they were deleted!", NotificationLevel.Info);
340                else
341                    throw (new Exception("Critical error in P2PManager. Manager isn't started yet, but the workers can register... Even removing DHT entries weren't possible..."));
342            }
343            else
344                AllocateJobs();
345
346            GetProgressInformation();
347        }
348
349        /// <summary>
350        /// When a Worker leaves the network, its (maybe) allocated JobParts have to
351        /// be pushed back to the main Jobstack
352        /// </summary>
353        /// <param name="peerId"></param>
354        private void peerManagement_OnSubscriberRemoved(PeerId peerId)
355        {
356            GuiLogging("REMOVED worker " + peerId, NotificationLevel.Info);
357
358            // necessary lock, because the amount of jobs in Progress could change while traversing this list
359            lock (this.jobsInProgress)
360            {
361                // push job back and remove list entries for "jobs in progress"
362                List<BigInteger> allJobsForRemovedPeer = (from k in this.jobsInProgress where k.Value == peerId select k.Key).ToList<BigInteger>();
363
364                BigInteger jobId;
365                for (int i = 0; i < allJobsForRemovedPeer.Count; i++)
366                {
367                    jobId = allJobsForRemovedPeer[i];
368                    this.distributableJobControl.Push(jobId);
369                    this.jobsInProgress.Remove(jobId);
370                    GuiLogging("Pushed job '" + jobId.ToString() + "' back to the stack, because peer left the network.", NotificationLevel.Debug);
371                }
372            }
373
374            // necessary lock, because the amount of jobs in Progress could change while traversing this list
375            lock (this.jobsWaitingForAcceptanceInfo)
376            {
377                // Set the JobDeclined-status for all jobs of the removed peer, which are still waiting
378                // for an acceptance information. Than remove all jobs from the "jobs waiting for acceptance info" List
379                List<BigInteger> allWaitingEntriesForRemovedPeer = (from k in this.jobsWaitingForAcceptanceInfo where k.Value == peerId select k.Key).ToList<BigInteger>();
380
381                for (int i = 0; i < allWaitingEntriesForRemovedPeer.Count; i++)
382                {
383                    this.distributableJobControl.JobDeclined(allWaitingEntriesForRemovedPeer[i]);
384                    this.jobsWaitingForAcceptanceInfo.Remove(allWaitingEntriesForRemovedPeer[i]);
385                    GuiLogging("Declined job '" + allWaitingEntriesForRemovedPeer[i].ToString() + "', because peer left the network.", NotificationLevel.Debug);
386                }
387            }
388
389            GetProgressInformation();
390        }
391       
392        /// <summary>
393        /// Allocates new JobParts to new registered or calling-for-jobs Workers.
394        /// Additionally it adds the allocated job to a waitingForAcceptance Dictionary,
395        /// so it can be checked, if the Worker respond to the Job-allocation
396        /// </summary>
397        private void AllocateJobs()
398        {
399            int i = 0;
400            BigInteger temp_jobId = null;
401            List<PeerId> freePeers = ((WorkersManagement)this.peerManagement).GetAllSubscribers();
402
403            GuiLogging("Trying to allocate " + freePeers.Count + " job(s) to workers.", NotificationLevel.Debug);
404
405            // set the start working time after allocating the FIRST job
406            if (this.startWorkingTime == DateTime.MinValue && freePeers.Count > 0)
407                this.startWorkingTime = DateTime.Now;
408
409            foreach (PeerId worker in freePeers)
410            {
411                byte[] serializedNewJob = this.distributableJobControl.Pop(out temp_jobId);
412                if (serializedNewJob != null) // if this is null, there are no more JobParts on the main stack!
413                {
414                    this.jobsWaitingForAcceptanceInfo.Add(temp_jobId, worker);
415                    // get actual subscriber/worker and send the new job
416                    base.p2pControl.SendToPeer(JobMessages.CreateJobPartMessage(temp_jobId, serializedNewJob), worker);
417
418                    if (OnNewJobAllocated != null)
419                        OnNewJobAllocated(temp_jobId);
420
421                    // set free worker to busy in the peerManagement class
422                    ((WorkersManagement)this.peerManagement).SetFreeWorkerToBusy(worker);
423
424                    GuiLogging("Job '" + temp_jobId.ToString() + "' were sent to worker id '" + worker.ToString() + "'", NotificationLevel.Info);
425                    i++;
426                }
427                else
428                {
429                    GuiLogging("No more jobs left. So wait for the last results, than close this task.", NotificationLevel.Debug);
430                    if (OnNoMoreJobsLeft != null)
431                        OnNoMoreJobsLeft();
432                }
433                GuiLogging(i + " Job(s) allocated to worker(s).", NotificationLevel.Debug);
434            } // end foreach
435        }
436
437        #endregion
438
439        /// <summary>
440        /// returns the percental progress information of the whole job (value is between 0 and 100)
441        /// </summary>
442        /// <returns>the percental progress information of the whole job</returns>
443        private double GetProgressInformation()
444        {
445            double jobProgressInPercent;
446            double lFinishedAmount = (double)this.distributableJobControl.FinishedAmount.LongValue();
447            double lAllocatedAmount = (double)this.distributableJobControl.AllocatedAmount.LongValue();
448            double lTotalAmount = (double)this.distributableJobControl.TotalAmount.LongValue();
449
450            if (lFinishedAmount > 0 && lAllocatedAmount > 0)
451            {
452                jobProgressInPercent = 30 * (lAllocatedAmount / lTotalAmount) + 100 * (lFinishedAmount / lTotalAmount);
453            }
454            else if (lAllocatedAmount > 0)
455            {
456                jobProgressInPercent = 30 * (lAllocatedAmount / lTotalAmount);
457            }
458            else if (lFinishedAmount > 0)
459            {
460                jobProgressInPercent = 100 * (lFinishedAmount / lTotalAmount);
461            }
462            else
463            {
464                jobProgressInPercent = 0.0;
465            }
466
467            if (OnProcessProgress != null)
468                OnProcessProgress(jobProgressInPercent);
469
470            return jobProgressInPercent;
471        }
472
473        /// <summary>
474        /// returns the estimated end time (correlation between Start Time, Total amount of jobs and finished jobs).
475        /// When no job is finished yet, it returns an empty timespan
476        /// </summary>
477        /// <returns></returns>
478        public DateTime EstimatedEndTime()
479        {
480            DateTime retTime = DateTime.MaxValue;
481            if (this.distributableJobControl.FinishedAmount.LongValue() > 0)
482            {
483                TimeSpan bruteforcingTime = DateTime.Now.Subtract(this.StartWorkingTime);
484                double jobsPerSecond = bruteforcingTime.TotalSeconds / this.distributableJobControl.FinishedAmount.LongValue();
485                double restSeconds = jobsPerSecond * 
486                    (this.distributableJobControl.TotalAmount - this.distributableJobControl.FinishedAmount).LongValue();
487                //retTime.TotalSeconds = jobsPerSecond * (2 - (progressInPercent / 100));
488                retTime = DateTime.Now.AddSeconds(restSeconds);
489            }
490            return retTime;
491        }
492
493        #region Forward PeerManagement Values
494
495        public int FreeWorkers() { return ((WorkersManagement)peerManagement).GetFreeWorkersAmount(); }
496        public int BusyWorkers() { return ((WorkersManagement)peerManagement).GetBusyWorkersAmount(); }
497
498        #endregion
499    }
500}
Note: See TracBrowser for help on using the repository browser.