source: trunk/CrypPlugins/PeerToPeerManager_NEW/P2PManagerBase_NEW.cs @ 1173

Last change on this file since 1173 was 1173, checked in by arnold, 12 years ago

some bug fixes.

File size: 23.8 KB
Line 
1/* Copyright 2010 Team CrypTool (Christian Arnold), Uni Duisburg-Essen
2
3   Licensed under the Apache License, Version 2.0 (the "License");
4   you may not use this file except in compliance with the License.
5   You may obtain a copy of the License at
6
7       http://www.apache.org/licenses/LICENSE-2.0
8
9   Unless required by applicable law or agreed to in writing, software
10   distributed under the License is distributed on an "AS IS" BASIS,
11   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12   See the License for the specific language governing permissions and
13   limitations under the License.
14*/
15
16using System;
17using System.Collections.Generic;
18using System.Linq;
19using System.Text;
20using Cryptool.PluginBase.Control;
21using Cryptool.Plugins.PeerToPeer.Jobs;
22using Cryptool.PluginBase;
23using Cryptool.PluginBase.Miscellaneous;
24
/* TODO:
 * - Changing the Publisher is possible, but taking over the old Publisher's
 *   subscriber list isn't implemented yet ((de)serialization of the
 *   subscribers is implemented and tested)
 * - Make a Manager change possible ((de)serialization of the job management lists)
 * - Benchmark the working peers
 *   (this.distributableJobControl.SetResult() returns the TimeSpan for the result)
 * - Add an internal Start/Stop button, so the Manager can stop its work without
 *   losing any job information (this currently happens when pressing the Stop
 *   button of the CrypTool Workspace)
 */
36
namespace Cryptool.Plugins.PeerToPeer
{
    /// <summary>
    /// Publisher that additionally manages a distributable job: it hands job
    /// parts to workers, tracks which parts are waiting for acceptance or are
    /// in progress, and forwards incoming results to the
    /// <see cref="IDistributableJob"/> control.
    /// </summary>
    public class P2PManagerBase_NEW : P2PPublisherBase
    {
        #region Events and Delegates

        public delegate void ProcessProgress(double progressInPercent);
        public delegate void NewJobAllocated(BigInteger jobId);
        public delegate void ResultReceived(BigInteger jobId);
        public delegate void NoMoreJobsLeft();
        public delegate void AllJobResultsReceived(BigInteger lastJobId);

        /// <summary>
        /// Raised by GetProgressInformation with the current progress in percent.
        /// </summary>
        public event ProcessProgress OnProcessProgress;
        /// <summary>
        /// Raised when a job part was sent to a worker and raised again when the
        /// worker's "JobAccepted" message for that job arrives.
        /// NOTE(review): the original documentation only mentions the acceptance
        /// case - confirm whether the raise on sending is intended.
        /// </summary>
        public event NewJobAllocated OnNewJobAllocated;
        /// <summary>
        /// Raised when a job result was received and passed to the job control.
        /// </summary>
        public event ResultReceived OnResultReceived;
        /// <summary>
        /// Raised when the job control has no more job parts to hand out while
        /// the Manager may still be waiting for outstanding job results.
        /// </summary>
        public event NoMoreJobsLeft OnNoMoreJobsLeft;
        /// <summary>
        /// Intended to be raised when no more jobs are left AND the last
        /// outstanding job result has come in.
        /// NOTE(review): no code in this class raises this event yet.
        /// </summary>
        public event AllJobResultsReceived OnAllJobResultsReceived;

        #endregion

        #region Variables

        /// <summary>
        /// Control that contains the job stack and the management logic for one
        /// specific kind of distributable job.
        /// </summary>
        private readonly IDistributableJob distributableJobControl;
        /// <summary>
        /// Jobs that were sent to a worker which has neither accepted nor
        /// declined them yet (job id -> worker). Always access under
        /// <c>lock (jobsWaitingForAcceptanceInfo)</c>.
        /// </summary>
        private readonly Dictionary<BigInteger, PeerId> jobsWaitingForAcceptanceInfo;
        /// <summary>
        /// Jobs for which a "JobAccepted" message was received (job id -> worker).
        /// Always access under <c>lock (jobsInProgress)</c>.
        /// </summary>
        private readonly Dictionary<BigInteger, PeerId> jobsInProgress;
        /// <summary>
        /// Serializes job allocation so that concurrently incoming "free worker"
        /// notifications cannot hand out jobs in parallel. A private object is
        /// used instead of <c>this</c> so that no outside code can take the same lock.
        /// </summary>
        private readonly object allocationGate = new object();

        private bool managerStarted = false;
        /// <summary>
        /// True between PeerCompletelyStarted and Stop.
        /// </summary>
        public bool ManagerStarted
        {
            get { return this.managerStarted; }
            private set { this.managerStarted = value; }
        }

        /// <summary>
        /// When the Manager is started, this variable must be set.
        /// </summary>
        private string sTopic = String.Empty;
        public string TopicName
        {
            get { return this.sTopic; }
            private set { this.sTopic = value; }
        }

        private long lAliveMessageInterval;
        /// <summary>
        /// Alive message interval handed to the base publisher on start.
        /// (The misspelled name is kept because it is part of the public interface.)
        /// </summary>
        public long AliveMesageInterval
        {
            get { return this.lAliveMessageInterval; }
            set { this.lAliveMessageInterval = value; }
        }

        private DateTime startWorkingTime = DateTime.MinValue;
        /// <summary>
        /// This value is initialized when the first job is allocated to a worker.
        /// Before initialization this is MinValue! Used for end time approximation.
        /// </summary>
        public DateTime StartWorkingTime
        {
            get { return this.startWorkingTime; }
        }

        #endregion

        /// <summary>
        /// Creates a manager on top of the given peer control.
        /// </summary>
        /// <param name="p2pControl">peer control, forwarded to the base publisher</param>
        /// <param name="distributableJob">job control that provides and collects the job parts</param>
        public P2PManagerBase_NEW(IP2PControl p2pControl, IDistributableJob distributableJob) : base(p2pControl)
        {
            this.distributableJobControl = distributableJob;

            this.jobsWaitingForAcceptanceInfo = new Dictionary<BigInteger, PeerId>();
            this.jobsInProgress = new Dictionary<BigInteger, PeerId>();
        }

        /// <summary>
        /// Starts the underlying publisher for the given topic. Only logs a
        /// warning and does nothing else when the job control or the peer
        /// control is missing.
        /// </summary>
        /// <param name="sTopic">topic name the manager publishes under</param>
        /// <param name="aliveMessageInterval">alive message interval, forwarded to the base class</param>
        public void StartManager(string sTopic, long aliveMessageInterval)
        {
            // only when the main manager plugin is connected with a Peer-PlugIn
            // and a job control, this Manager can start its work
            if (this.distributableJobControl == null || this.p2pControl == null)
            {
                GuiLogging("Manager can't be started, because P2P-Peer- or Distributable-Job-PlugIn isn't connected with the Manager or the connection is broken...", NotificationLevel.Warning);
                return;
            }

            // reset the start time, so a restarted manager measures anew
            this.startWorkingTime = DateTime.MinValue;
            this.TopicName = sTopic;
            this.AliveMesageInterval = aliveMessageInterval;
            base.Start(this.TopicName, this.AliveMesageInterval);
        }

        /// <summary>
        /// Marks the manager as started and publishes the initial progress
        /// after the base class has completed its own start-up handling.
        /// </summary>
        protected override void PeerCompletelyStarted()
        {
            base.PeerCompletelyStarted();

            this.ManagerStarted = true;
            GetProgressInformation();
            GuiLogging("P2PManager is started right now.", NotificationLevel.Info);
        }

        /// <summary>
        /// Stops the base publisher, marks the manager as stopped and detaches
        /// the worker management event handlers.
        /// </summary>
        /// <param name="msgType">message type, forwarded to the base class</param>
        public override void Stop(PubSubMessageType msgType)
        {
            base.Stop(msgType);

            this.ManagerStarted = false;

            // peerManagement is only assigned in AssignManagement, so it can
            // still be null when Stop is called on a never-started manager
            WorkersManagement workers = this.peerManagement as WorkersManagement;
            if (workers != null)
            {
                workers.OnFreeWorkersAvailable -= peerManagement_OnFreeWorkersAvailable;
                workers.OnSubscriberRemoved -= peerManagement_OnSubscriberRemoved;
            }

            GuiLogging("P2PManager was stopped successully.", NotificationLevel.Info);
        }

        /// <summary>
        /// Because the manager needs additional peer information for all workers,
        /// this method is overridden. WorkersManagement raises events when
        /// a worker leaves or becomes free, so we can push back or allocate jobs.
        /// </summary>
        /// <param name="aliveMessageInterval">interval handed to the WorkersManagement constructor</param>
        protected override void AssignManagement(long aliveMessageInterval)
        {
            WorkersManagement workers = new WorkersManagement(aliveMessageInterval);
            this.peerManagement = workers;

            this.peerManagement.OnSubscriberRemoved += new SubscriberManagement.SubscriberRemoved(peerManagement_OnSubscriberRemoved);
            // waiting for new workers joining the manager or already joined workers, which were set to "free" again
            workers.OnFreeWorkersAvailable += new WorkersManagement.FreeWorkersAvailable(peerManagement_OnFreeWorkersAvailable);
        }

        /// <summary>
        /// Only accepts DistributableJob-specific messages (as recognized by
        /// the static class JobMessages). All other messages - including null or
        /// empty payloads - are logged and dropped.
        /// </summary>
        /// <param name="sender">peer that sent the payload</param>
        /// <param name="data">raw payload; the first byte selects the job message type</param>
        protected override void p2pControl_OnPayloadMessageReceived(PeerId sender, byte[] data)
        {
            // the payload comes from the network, so never index it unchecked
            if (data == null || data.Length == 0 || !JobMessages.IsJobMessageType(data[0]))
            {
                GuiLogging("Received an undefined message (not a job accepted message or a job result).", NotificationLevel.Debug);
                return;
            }

            switch (JobMessages.GetMessageJobType(data[0]))
            {
                case MessageJobType.JobAcceptanceInfo:
                    HandleJobAcceptanceMessage(sender, data);
                    break;
                case MessageJobType.JobResult:
                    GuiLogging("Received JobResult message from Peer '" + sender.ToString() + "'. Beginning to set result now.", NotificationLevel.Debug);
                    HandleJobResultMessage(sender, data);
                    break;
                case MessageJobType.Free:
                    HandleFreeMessage(sender, data);
                    break;
                default:
                    GuiLogging("Obscure Message (" + Encoding.UTF8.GetString(data) + ") received from '" + sender.ToString() + "'.", NotificationLevel.Info);
                    break;
            }

            GetProgressInformation();
        }

        /// <summary>
        /// Overridden only to ignore the Solution case of the system message
        /// handling: a single peer must not end the Manager's work, because it
        /// has no overview of all job parts.
        /// </summary>
        /// <param name="sender">peer that sent the system message</param>
        /// <param name="msgType">type of the system message</param>
        protected override void p2pControl_OnSystemMessageReceived(PeerId sender, PubSubMessageType msgType)
        {
            // ignore Solution case, because other workers could still work on...
            if (msgType == PubSubMessageType.Solution)
                return;

            // base class handles all administration cases (register, alive, unregister, ping, pong, ...)
            base.p2pControl_OnSystemMessageReceived(sender, msgType);
        }

        #region Handle different DistributableJob-specific, incoming messages

        /// <summary>
        /// Handles the two job-acceptance cases (accepted or declined). Adds accepted jobs
        /// to the "inProgress" dictionary, sets a declining worker to free and removes the
        /// job in every case from the waitingForAcceptance dictionary.
        /// </summary>
        /// <param name="sender">worker that answered the job allocation</param>
        /// <param name="data">serialized job acceptance message</param>
        private void HandleJobAcceptanceMessage(PeerId sender, byte[] data)
        {
            BigInteger jobId = null;
            if (JobMessages.GetJobAcceptanceMessage(data, out jobId))
            {
                this.distributableJobControl.JobAccepted(jobId);

                bool newlyAccepted = false;
                lock (this.jobsInProgress)
                {
                    // a repeated acceptance for the same job id is silently ignored
                    if (!this.jobsInProgress.ContainsKey(jobId))
                    {
                        this.jobsInProgress.Add(jobId, sender);
                        newlyAccepted = true;
                    }
                }

                // raise the event outside of the lock, so subscriber code never
                // runs while the dictionary is blocked for other threads
                if (newlyAccepted && OnNewJobAllocated != null)
                    OnNewJobAllocated(jobId);

                GuiLogging("JobId '" + jobId.ToString() + "' was accepted by Peer '" + sender.ToString() + "'.", NotificationLevel.Info);
            }
            else // if AcceptanceInfo is declined
            {
                this.distributableJobControl.JobDeclined(jobId);

                // set busy worker to free, because he declined the job
                // TODO: maybe create a "black list" for peers, who had declined this kind of Job twice...
                ((WorkersManagement)this.peerManagement).SetBusyWorkerToFree(sender);
                GuiLogging("JobId '" + jobId.ToString() + "' was declined by Peer '" + sender.ToString() + "'.", NotificationLevel.Info);
            }

            // in every case remove the job from the waiting dictionary;
            // Remove simply returns false when the job id is unknown
            lock (this.jobsWaitingForAcceptanceInfo)
            {
                this.jobsWaitingForAcceptanceInfo.Remove(jobId);
            }
        }

        /// <summary>
        /// Sets the incoming result in the DistributableJob control, raises
        /// OnResultReceived and removes the job from the jobsInProgress dictionary.
        /// </summary>
        /// <param name="sender">worker that sent the result</param>
        /// <param name="data">serialized job result message</param>
        private void HandleJobResultMessage(PeerId sender, byte[] data)
        {
            BigInteger jobId;

            byte[] serializedJobResult = JobMessages.GetJobResult(data, out jobId);
            TimeSpan jobProcessingTime = this.distributableJobControl.SetResult(jobId, serializedJobResult);

            if (OnResultReceived != null)
                OnResultReceived(jobId);

            GuiLogging("JobResult for Job '" + jobId.ToString() + "' received. Processing Time: "
                + jobProcessingTime.TotalMinutes.ToString() + " minutes. Worker-Id: '" + sender.ToString() + "'.", NotificationLevel.Info);

            lock (this.jobsInProgress)
            {
                // an unknown job id is tolerated (Remove returns false), because
                // P2PJobAdmin sends the result message twice
                this.jobsInProgress.Remove(jobId);
            }
        }

        /// <summary>
        /// If the message content declares the sender as a free worker,
        /// sets this worker from busy to free, otherwise does nothing.
        /// </summary>
        /// <param name="sender">worker that sent the status message</param>
        /// <param name="data">serialized free-worker status message</param>
        private void HandleFreeMessage(PeerId sender, byte[] data)
        {
            // only handle the "true"-case, because otherwise there is nothing to do
            if (!JobMessages.GetFreeWorkerStatusMessage(data))
                return;

            GuiLogging("Received a 'free'-message from Peer '" + sender.ToString() + "'.", NotificationLevel.Debug);
            // only if the worker already exists in the "busy list", it will be set to free and the event will be raised
            ((WorkersManagement)this.peerManagement).SetBusyWorkerToFree(sender);
        }

        #endregion

        #region Worker-action-handling

        /// <summary>
        /// Every time new workers are available, continues the distribution of
        /// jobs (if any job parts are left). When the manager is not started,
        /// it tries to delete the publisher's DHT entries instead.
        /// </summary>
        /// <exception cref="InvalidOperationException">
        /// The manager is not started and deleting the DHT entries failed.
        /// </exception>
        private void peerManagement_OnFreeWorkersAvailable()
        {
            if (!this.ManagerStarted)
            {
                GuiLogging("Manager isn't started at present, so I can't disperse the patterns.", NotificationLevel.Error);
                bool removeSettings = DHT_CommonManagement.DeleteAllPublishersEntries(ref this.p2pControl, this.TopicName);
                if (removeSettings)
                    GuiLogging("Manager is stopped, but DHT entries were still existing, so they were deleted!", NotificationLevel.Info);
                else
                    throw new InvalidOperationException("Critical error in P2PManager. Manager isn't started yet, but the workers can register... Even removing DHT entries weren't possible...");
            }
            else
            {
                // Parallel incoming free workers could run concurrently into
                // this method, so some workers could get more than one job.
                // Therefore the allocation is serialized.
                lock (this.allocationGate)
                {
                    AllocateJobs();
                }
            }

            GetProgressInformation();
        }

        /// <summary>
        /// When a worker leaves the network, its accepted job parts are pushed
        /// back to the main job stack and its not yet accepted job parts are
        /// reported as declined.
        /// </summary>
        /// <param name="peerId">id of the removed worker</param>
        private void peerManagement_OnSubscriberRemoved(PeerId peerId)
        {
            GuiLogging("REMOVED worker " + peerId, NotificationLevel.Info);

            // necessary lock, because the amount of jobs in progress could change while traversing this list
            lock (this.jobsInProgress)
            {
                // materialize the keys first, because the dictionary is modified inside the loop
                List<BigInteger> jobsOfRemovedPeer = this.jobsInProgress
                    .Where(entry => entry.Value == peerId)
                    .Select(entry => entry.Key)
                    .ToList();

                foreach (BigInteger jobId in jobsOfRemovedPeer)
                {
                    this.distributableJobControl.Push(jobId);
                    this.jobsInProgress.Remove(jobId);
                    GuiLogging("Pushed job '" + jobId.ToString() + "' back to the stack, because peer left the network.", NotificationLevel.Debug);
                }
            }

            // necessary lock, because the amount of waiting jobs could change while traversing this list
            lock (this.jobsWaitingForAcceptanceInfo)
            {
                // Set the JobDeclined status for all jobs of the removed peer, which are still waiting
                // for an acceptance information. Then remove them from the waiting dictionary.
                List<BigInteger> waitingJobsOfRemovedPeer = this.jobsWaitingForAcceptanceInfo
                    .Where(entry => entry.Value == peerId)
                    .Select(entry => entry.Key)
                    .ToList();

                foreach (BigInteger jobId in waitingJobsOfRemovedPeer)
                {
                    this.distributableJobControl.JobDeclined(jobId);
                    this.jobsWaitingForAcceptanceInfo.Remove(jobId);
                    GuiLogging("Declined job '" + jobId.ToString() + "', because peer left the network.", NotificationLevel.Debug);
                }
            }

            GetProgressInformation();
        }

        /// <summary>
        /// Allocates new job parts to the workers returned by the worker management.
        /// Additionally it adds each allocated job to the waitingForAcceptance dictionary,
        /// so it can be checked whether the worker responds to the job allocation.
        /// Stops at the first worker for which no job part is left and raises
        /// OnNoMoreJobsLeft once in that case.
        /// </summary>
        private void AllocateJobs()
        {
            int allocatedJobs = 0;
            BigInteger jobId = null;
            // NOTE(review): presumably WorkersManagement returns only the free workers here - confirm
            List<PeerId> freePeers = ((WorkersManagement)this.peerManagement).GetAllSubscribers();

            GuiLogging("Trying to allocate " + freePeers.Count + " job(s) to workers.", NotificationLevel.Debug);

            foreach (PeerId worker in freePeers)
            {
                byte[] serializedNewJob = this.distributableJobControl.Pop(out jobId);
                if (serializedNewJob == null) // if this is null, there are no more JobParts on the main stack!
                {
                    GuiLogging("No more jobs left. So wait for the last results, than close this task.", NotificationLevel.Debug);
                    if (OnNoMoreJobsLeft != null)
                        OnNoMoreJobsLeft();
                    // nothing left for the remaining workers either
                    break;
                }

                // set the start working time when the FIRST job is really allocated
                if (this.startWorkingTime == DateTime.MinValue)
                    this.startWorkingTime = DateTime.Now;

                // same lock as in the message handlers, which remove entries concurrently
                lock (this.jobsWaitingForAcceptanceInfo)
                {
                    this.jobsWaitingForAcceptanceInfo.Add(jobId, worker);
                }

                // send the new job to the worker
                base.p2pControl.SendToPeer(JobMessages.CreateJobPartMessage(jobId, serializedNewJob), worker);

                if (OnNewJobAllocated != null)
                    OnNewJobAllocated(jobId);

                // set free worker to busy in the peerManagement class
                ((WorkersManagement)this.peerManagement).SetFreeWorkerToBusy(worker);

                GuiLogging("Job '" + jobId.ToString() + "' were sent to worker id '" + worker.ToString() + "'", NotificationLevel.Info);
                allocatedJobs++;
            }

            GuiLogging(allocatedJobs + " Job(s) allocated to worker(s).", NotificationLevel.Debug);
        }

        #endregion

        /// <summary>
        /// Calculates the progress of the whole job in percent, raises
        /// OnProcessProgress with it and returns it. Allocated jobs are weighted
        /// with 30, finished jobs with 100, both relative to the total amount.
        /// The result is limited to the range 0..100; it is 0 when the total
        /// amount is not positive.
        /// </summary>
        /// <returns>the progress of the whole job in percent (between 0 and 100)</returns>
        private double GetProgressInformation()
        {
            double finishedAmount = (double)this.distributableJobControl.FinishedAmount.LongValue();
            double allocatedAmount = (double)this.distributableJobControl.AllocatedAmount.LongValue();
            double totalAmount = (double)this.distributableJobControl.TotalAmount.LongValue();

            double jobProgressInPercent = 0.0;
            // without a positive total amount the ratios below would be NaN or infinite
            if (totalAmount > 0)
            {
                if (allocatedAmount > 0)
                    jobProgressInPercent += 30 * (allocatedAmount / totalAmount);
                if (finishedAmount > 0)
                    jobProgressInPercent += 100 * (finishedAmount / totalAmount);

                // the two weighted parts can add up to more than 100
                jobProgressInPercent = Math.Min(100.0, jobProgressInPercent);
            }

            if (OnProcessProgress != null)
                OnProcessProgress(jobProgressInPercent);

            return jobProgressInPercent;
        }

        /// <summary>
        /// Returns the estimated end time, extrapolated from the start working
        /// time, the finished amount and the total amount of jobs.
        /// </summary>
        /// <returns>
        /// The estimated end time, or DateTime.MaxValue when no job is finished yet,
        /// when the start working time isn't set or when the estimate lies
        /// beyond DateTime.MaxValue.
        /// </returns>
        public DateTime EstimatedEndTime()
        {
            double finishedJobs = (double)this.distributableJobControl.FinishedAmount.LongValue();
            // without a finished job or a start time there is nothing to extrapolate from
            if (finishedJobs <= 0 || this.startWorkingTime == DateTime.MinValue)
                return DateTime.MaxValue;

            DateTime now = DateTime.Now;
            double secondsPerJob = now.Subtract(this.startWorkingTime).TotalSeconds / finishedJobs;
            double restSeconds = secondsPerJob *
                (this.distributableJobControl.TotalAmount - this.distributableJobControl.FinishedAmount).LongValue();

            // AddSeconds throws when the result isn't representable as DateTime
            if (restSeconds >= DateTime.MaxValue.Subtract(now).TotalSeconds)
                return DateTime.MaxValue;

            return now.AddSeconds(restSeconds);
        }

        #region Forward PeerManagement Values

        /// <summary>
        /// Forwards the amount of free workers from the worker management.
        /// </summary>
        public int FreeWorkers()
        {
            return ((WorkersManagement)peerManagement).GetFreeWorkersAmount();
        }

        /// <summary>
        /// Forwards the amount of busy workers from the worker management.
        /// </summary>
        public int BusyWorkers()
        {
            return ((WorkersManagement)peerManagement).GetBusyWorkersAmount();
        }

        #endregion
    }
}
Note: See TracBrowser for help on using the repository browser.