source: trunk/CrypPlugins/PeerToPeerManager_NEW/P2PManagerBase_NEW.cs @ 1199

Last change on this file since 1199 was 1199, checked in by arnold, 12 years ago

P2PManager

  • Enlarged KeyPatternSize range to 1.000, because the new DES/AES-Implementation runs so fast, that a KeyPattern with KeyPatternSize of 150 will be processed in approximately 40 seconds on a modern PC
  • Embellished Layout
  • Enhanced information display (total processing time)
  • Jobs in progress change their color every second between Yellow and LightGray

Samples:

  • Changed from CFB to CBC, because this modus is much faster than CFB!
  • Changed KeyPatternSize to 150, because the new DES/AES Implementation runs much faster
File size: 24.7 KB
Line 
1/* Copyright 2010 Team CrypTool (Christian Arnold), Uni Duisburg-Essen
2
3   Licensed under the Apache License, Version 2.0 (the "License");
4   you may not use this file except in compliance with the License.
5   You may obtain a copy of the License at
6
7       http://www.apache.org/licenses/LICENSE-2.0
8
9   Unless required by applicable law or agreed to in writing, software
10   distributed under the License is distributed on an "AS IS" BASIS,
11   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12   See the License for the specific language governing permissions and
13   limitations under the License.
14*/
15
16using System;
17using System.Collections.Generic;
18using System.Linq;
19using System.Text;
20using Cryptool.PluginBase.Control;
21using Cryptool.Plugins.PeerToPeer.Jobs;
22using Cryptool.PluginBase;
23using Cryptool.PluginBase.Miscellaneous;
24
25/* TODO:
26 * - Publisher-change is possible, but catch old Publishers subscriber list
27 *   isn't implemented yet ((de)serialization of the subscribers is
28 *   implemented and tested)
29 * - Manager-change is possible, but catch job history isn't implemented yes
30 *   ((de)serialization of job management lists)
31 * - Benchmarking the working peers
32 *   (this.distributableJobControl.SetResult() returns the TimeSpan for the result)
33 * - Insert internal Start-/Stop-Button, so Manager can stop its works without
34 *   loosing any Job-Information (this happens at present, when pressing the Stop
35 *   button of the CrypTool Workspace)
36 */
37
38namespace Cryptool.Plugins.PeerToPeer
39{
40    public class P2PManagerBase_NEW : P2PPublisherBase
41    {
42        #region Events and Delegates
43
44        public delegate void ProcessProgress(double progressInPercent);       
45        public delegate void NewJobAllocated(BigInteger jobId);
46        public delegate void ResultReceived(BigInteger jobId);
47        public delegate void JobCanceled(BigInteger jobId);
48        public delegate void NoMoreJobsLeft();
49        public delegate void AllJobResultsReceived(BigInteger lastJobId);
50        public event ProcessProgress OnProcessProgress;
51        /// <summary>
52        /// When a new job was successfully allocated to a worker (after receiving
53        /// its "JobAccepted"-Message), this event is thrown
54        /// </summary>
55        public event NewJobAllocated OnNewJobAllocated;
56        /// <summary>
57        /// When a new job result was received (and accepted) this event is thrown
58        /// </summary>
59        public event ResultReceived OnResultReceived;
60        /// <summary>
61        /// is thrown when an active worker leaves the network and the jobs come back
62        /// </summary>
63        public event JobCanceled OnJobCanceled;
64        /// <summary>
65        /// When the last job from the DistributableJob-Stack is allocated, but
66        /// the Manager is still waiting for some JobResults this event is thrown
67        /// </summary>
68        public event NoMoreJobsLeft OnNoMoreJobsLeft;
69        /// <summary>
70        /// When no more jobs left AND the last "ausstehendes" JobResult comes in,
71        /// this event is thrown
72        /// </summary>
73        public event AllJobResultsReceived OnAllJobResultsReceived;
74
75        #endregion
76
77        #region Variables
78
79        /// <summary>
80        /// this control contains a JobStack and other special
81        /// management for a SPECIAL distributable Job
82        /// </summary>
83        private IDistributableJob distributableJobControl;
84        /// <summary>
85        /// this list contains all jobs, which were sent to workers,
86        /// but the workers hadn't accept/decline the Job at present
87        /// </summary>
88        private Dictionary<BigInteger, PeerId> jobsWaitingForAcceptanceInfo;
89        /// <summary>
90        /// this dict contains all jobs/workers, who were successfully
91        /// distributed (so the manager already had received a JobAccepted Msg)
92        /// </summary>
93        private Dictionary<BigInteger, PeerId> jobsInProgress;
94
95        private bool managerStarted = false;
96        public bool ManagerStarted
97        {
98            get { return this.managerStarted; }
99            private set { this.managerStarted = value; }
100        }
101
102        /// <summary>
103        /// When the Manager is started, this variable must be set.
104        /// </summary>
105        private string sTopic = String.Empty;
106        public string TopicName
107        {
108            get { return this.sTopic; }
109            private set { this.sTopic = value; }
110        }
111
112        private long lAliveMessageInterval;
113        public long AliveMesageInterval
114        {
115            get { return this.lAliveMessageInterval ; }
116            set { this.lAliveMessageInterval = value; }
117        }
118
119        private DateTime startWorkingTime = DateTime.MinValue;
120        /// <summary>
121        /// This value will be initialized after allocating the first job to a worker.
122        /// Before initialization this is MinValue! Used for end time approximation
123        /// </summary>
124        public DateTime StartWorkingTime
125        {
126            get { return this.startWorkingTime; } 
127        }
128
129        private bool lastJobAllocated = false;
130
131        #endregion
132
133        public P2PManagerBase_NEW(IP2PControl p2pControl, IDistributableJob distributableJob) : base(p2pControl)
134        {
135            this.distributableJobControl = distributableJob;
136
137            this.jobsWaitingForAcceptanceInfo = new Dictionary<BigInteger, PeerId>();
138            this.jobsInProgress = new Dictionary<BigInteger, PeerId>();
139        }
140
141        public void StartManager(string sTopic, long aliveMessageInterval)
142        {
143            // only when the main manager plugin is connected with a Peer-PlugIn
144            // and a IWorkerControl-PlugIn, this Manager can start its work
145            if (this.distributableJobControl != null && this.p2pControl != null)
146            {
147                this.distributableJobControl.OnLastResultReceived += new LastResultReceived(distributableJobControl_OnLastResultReceived);
148
149                //set value to null, when restarting the manager
150                this.startWorkingTime = DateTime.MinValue; 
151                this.TopicName = sTopic;
152                this.AliveMesageInterval = aliveMessageInterval;
153                base.Start(this.TopicName, this.AliveMesageInterval);
154            }
155            else
156            {
157                GuiLogging("Manager can't be started, because P2P-Peer- or Distributable-Job-PlugIn isn't connected with the Manager or the connection is broken...", NotificationLevel.Warning);
158            }
159        }
160
161        void distributableJobControl_OnLastResultReceived(BigInteger jobId)
162        {
163            if (OnAllJobResultsReceived != null)
164                OnAllJobResultsReceived(jobId);
165        }
166
167        protected override void PeerCompletelyStarted()
168        {
169            base.PeerCompletelyStarted();
170
171            this.ManagerStarted = true;
172            GetProgressInformation();
173            GuiLogging("P2PManager is started right now.", NotificationLevel.Info);
174        }
175
176        public override void Stop(PubSubMessageType msgType)
177        {
178            base.Stop(msgType);
179
180            this.ManagerStarted = false;
181            ((WorkersManagement)this.peerManagement).OnFreeWorkersAvailable -= peerManagement_OnFreeWorkersAvailable;
182            ((WorkersManagement)this.peerManagement).OnSubscriberRemoved -= peerManagement_OnSubscriberRemoved;
183            this.distributableJobControl.OnLastResultReceived -= distributableJobControl_OnLastResultReceived;
184
185            GuiLogging("P2PManager was stopped successully.", NotificationLevel.Info);
186        }
187
188        /// <summary>
189        /// because the manager needs additional peer information for all workers,
190        /// this method is overwritten. WorkersManagement throws events, when
191        /// a Worker leaves or joins the "solution network", so we can re-add or
192        /// allocate a job.
193        /// </summary>
194        /// <param name="aliveMessageInterval"></param>
195        protected override void AssignManagement(long aliveMessageInterval)
196        {
197            this.peerManagement = new WorkersManagement(aliveMessageInterval);
198            this.peerManagement.OnSubscriberRemoved +=new SubscriberManagement.SubscriberRemoved(peerManagement_OnSubscriberRemoved);
199            // waiting for new workers joining the manager or already joined worker, who were set to "free" again
200            ((WorkersManagement)this.peerManagement).OnFreeWorkersAvailable += new WorkersManagement.FreeWorkersAvailable(peerManagement_OnFreeWorkersAvailable);
201        }
202
203        /// <summary>
204        /// only accepts DistributableJob-specific messages (created, checked and transformed by
205        /// the static class JobMessages). All other message are dropped!
206        /// </summary>
207        /// <param name="sender"></param>
208        /// <param name="data"></param>
209        protected override void p2pControl_OnPayloadMessageReceived(PeerId sender, byte[] data)
210        {
211            if (!JobMessages.IsJobMessageType(data[0]))
212            {
213                GuiLogging("Received an undefined message (not a job accepted message or a job result).", NotificationLevel.Debug);
214                return;
215            } 
216            switch (JobMessages.GetMessageJobType(data[0]))
217            {
218                case MessageJobType.JobAcceptanceInfo:
219                    HandleJobAcceptanceMessage(sender, data);
220                    break;
221                case MessageJobType.JobResult:
222                    GuiLogging("Received JobResult message from Peer '" + sender.ToString() + "'. Beginning to set result now.", NotificationLevel.Debug);
223                    HandleJobResultMessage(sender, data);
224                    break;
225                case MessageJobType.Free:
226                    HandleFreeMessage(sender, data);
227                    break;
228                default:
229                    GuiLogging("Obscure Message (" + Encoding.UTF8.GetString(data) + ") received from '" + sender.ToString() + "'.", NotificationLevel.Info);
230                    break;
231            } // end switch
232            GetProgressInformation();
233        }
234
235        /// <summary>
236        /// This method is only overwritten because we have to ignore the Solution-case in
237        /// the System-Message-Handling (a Peer mustn't send a Solution message, which influences
238        /// the working status of the Manager, because it havn't the overview of the JobParts)
239        /// </summary>
240        /// <param name="sender"></param>
241        /// <param name="msgType"></param>
242        protected override void p2pControl_OnSystemMessageReceived(PeerId sender, PubSubMessageType msgType)
243        {
244            // ignore Solution case, because other worker could work on...
245            if (msgType != PubSubMessageType.Solution)
246                // base class handles all administration cases (register, alive, unregister, ping, pong, ...)
247                base.p2pControl_OnSystemMessageReceived(sender, msgType);
248        }
249
250        #region Handle different DistributableJob-specific, incoming messages
251
252        /// <summary>
253        /// Handles the two job-acceptance cases (accepted or declined). Adds accepted jobs
254        /// to the "inProgress" Dictionary, sets a busy declined worker to free (when message
255        /// is JobDeclined) and removes the job in every case from the waitingForAcceptance list
256        /// </summary>
257        /// <param name="sender"></param>
258        /// <param name="data"></param>
259        private void HandleJobAcceptanceMessage(PeerId sender, byte[] data)
260        {
261            BigInteger jobId = null;
262            if (JobMessages.GetJobAcceptanceMessage(data, out jobId))
263            {
264                this.distributableJobControl.JobAccepted(jobId);
265                lock (this.jobsInProgress)
266                {
267                    if (!this.jobsInProgress.ContainsKey(jobId))
268                    {
269                        // add to jobs in progress, because P2PJobAdmin has accepted the job!
270                        this.jobsInProgress.Add(jobId, sender);
271                        if (OnNewJobAllocated != null)
272                            OnNewJobAllocated(jobId);
273                    }
274                    //else
275                    //    throw (new Exception("Received a JobAccepted message for a already accepted JobId... JobId: " + jobId.ToString()));
276                }
277                GuiLogging("JobId '" + jobId.ToString() + "' was accepted by Peer '" + sender.ToString() + "'.", NotificationLevel.Info);
278            }
279            else // if AcceptanceInfo is declined
280            {
281                this.distributableJobControl.JobDeclined(jobId);
282
283                // set busy worker to free, because he delined the job
284
285                // TODO: maybe create a "black list" for peers, who had declined this kind of Job twice or more...
286                ((WorkersManagement)this.peerManagement).SetBusyWorkerToFree(sender);
287                GuiLogging("JobId '" + jobId.ToString() + "' was declined by Peer '" + sender.ToString() + "'.", NotificationLevel.Info);
288            }
289            // in every case remove the job from thew waiting Dictionary
290            lock (this.jobsWaitingForAcceptanceInfo)
291            {
292                if (this.jobsWaitingForAcceptanceInfo.ContainsKey(jobId))
293                {
294                    this.jobsWaitingForAcceptanceInfo.Remove(jobId);
295                }
296                //else
297                //    throw (new Exception("Received a JobAcceptance-Message for a jobId, which isn't in the waitingForAcceptance-List... JobId: " + jobId.ToString()));
298            }
299        }
300
301        /// <summary>
302        /// Sets the incoming result in the DistributableJob class, removes the job from
303        /// the JobsInProgress Dictionary and throws the OnResultReceivedEvent
304        /// </summary>
305        /// <param name="sender"></param>
306        /// <param name="data"></param>
307        private void HandleJobResultMessage(PeerId sender, byte[] data)
308        {
309            BigInteger jobId;
310
311            byte[] serializedJobResult = JobMessages.GetJobResult(data, out jobId);
312            TimeSpan jobProcessingTime = this.distributableJobControl.SetResult(jobId, serializedJobResult);
313
314            if (OnResultReceived != null)
315                OnResultReceived(jobId);
316
317            GuiLogging("JobResult for Job '" + jobId.ToString() + "' received. Processing Time: "
318                + jobProcessingTime.TotalMinutes.ToString() + " minutes. Worker-Id: '" + sender.ToString() + "'.", NotificationLevel.Info);
319
320            lock (this.jobsInProgress)
321            {
322                if (this.jobsInProgress.ContainsKey(jobId))
323                    this.jobsInProgress.Remove(jobId);
324                //dirty workaround because P2PJobAdmin sends the result msg twice...
325                //else
326                //    throw (new Exception("Received a valid job result, which wasn't allocated before!!!"));
327            }
328        }
329
330        /// <summary>
331        /// If message content declares the sender as a free worker,
332        /// set this worker from busy to free, otherwise do nothing
333        /// </summary>
334        /// <param name="sender"></param>
335        /// <param name="data"></param>
336        private void HandleFreeMessage(PeerId sender, byte[] data)
337        {
338            // only handle the "true"-case, because otherwise there is nothing to do
339            if (JobMessages.GetFreeWorkerStatusMessage(data))
340            {
341                GuiLogging("Received a 'free'-message from Peer '" + sender.ToString() + "'.", NotificationLevel.Debug);
342                // only if worker already exists in the "busy list", it will set to free and event will be thrown
343                ((WorkersManagement)this.peerManagement).SetBusyWorkerToFree(sender);
344            }
345        }
346
347        #endregion
348
349        #region Worker-action-handling
350
351        /// <summary>
352        /// every time when new workers are available, continue distribution of Jobs (if any JobParts left)
353        /// </summary>
354        private void peerManagement_OnFreeWorkersAvailable()
355        {
356            if (!this.ManagerStarted)
357            {
358                GuiLogging("Manager isn't started at present, so I can't disperse the patterns.", NotificationLevel.Error);
359                bool removeSettings = DHT_CommonManagement.DeleteAllPublishersEntries(ref this.p2pControl, this.TopicName);
360                if (removeSettings)
361                    GuiLogging("Manager is stopped, but DHT entries were still existing, so they were deleted!", NotificationLevel.Info);
362                else
363                    throw (new Exception("Critical error in P2PManager. Manager isn't started yet, but the workers can register... Even removing DHT entries weren't possible..."));
364            }
365            else
366            {
367                /* edited by Arnold - 2010.02.23 */
368                // because parallel incoming free workers could run
369                // into concurrence in this method, so some workers
370                // could get more than one job - so they have to
371                // queue the additional jobs.
372                lock (this)
373                {
374                    AllocateJobs();
375                }
376            }
377
378            GetProgressInformation();
379        }
380
381        /// <summary>
382        /// When a Worker leaves the network, its (maybe) allocated JobParts have to
383        /// be pushed back to the main Jobstack
384        /// </summary>
385        /// <param name="peerId"></param>
386        private void peerManagement_OnSubscriberRemoved(PeerId peerId)
387        {
388            GuiLogging("REMOVED worker " + peerId, NotificationLevel.Info);
389
390            // necessary lock, because the amount of jobs in Progress could change while traversing this list
391            lock (this.jobsInProgress)
392            {
393                // push job back and remove list entries for "jobs in progress"
394                List<BigInteger> allJobsForRemovedPeer = (from k in this.jobsInProgress where k.Value == peerId select k.Key).ToList<BigInteger>();
395
396                BigInteger jobId;
397                for (int i = 0; i < allJobsForRemovedPeer.Count; i++)
398                {
399                    jobId = allJobsForRemovedPeer[i];
400                    this.distributableJobControl.Push(jobId);
401                    this.jobsInProgress.Remove(jobId);
402                    if (OnJobCanceled != null)
403                        OnJobCanceled(jobId);
404                    GuiLogging("Pushed job '" + jobId.ToString() + "' back to the stack, because peer left the network.", NotificationLevel.Debug);
405                }
406            }
407
408            // necessary lock, because the amount of jobs in Progress could change while traversing this list
409            lock (this.jobsWaitingForAcceptanceInfo)
410            {
411                // Set the JobDeclined-status for all jobs of the removed peer, which are still waiting
412                // for an acceptance information. Than remove all jobs from the "jobs waiting for acceptance info" List
413                List<BigInteger> allWaitingEntriesForRemovedPeer = (from k in this.jobsWaitingForAcceptanceInfo where k.Value == peerId select k.Key).ToList<BigInteger>();
414
415                for (int i = 0; i < allWaitingEntriesForRemovedPeer.Count; i++)
416                {
417                    this.distributableJobControl.JobDeclined(allWaitingEntriesForRemovedPeer[i]);
418                    this.jobsWaitingForAcceptanceInfo.Remove(allWaitingEntriesForRemovedPeer[i]);
419                    GuiLogging("Declined job '" + allWaitingEntriesForRemovedPeer[i].ToString() + "', because peer left the network.", NotificationLevel.Debug);
420                }
421            }
422
423            GetProgressInformation();
424        }
425       
426        /// <summary>
427        /// Allocates new JobParts to new registered or calling-for-jobs Workers.
428        /// Additionally it adds the allocated job to a waitingForAcceptance Dictionary,
429        /// so it can be checked, if the Worker respond to the Job-allocation
430        /// </summary>
431        private void AllocateJobs()
432        {
433            int i = 0;
434            BigInteger temp_jobId = null;
435            List<PeerId> freePeers = ((WorkersManagement)this.peerManagement).GetFreeWorkers();
436
437            GuiLogging("Trying to allocate " + freePeers.Count + " job(s) to workers.", NotificationLevel.Debug);
438
439            // set the start working time after allocating the FIRST job
440            if (this.startWorkingTime == DateTime.MinValue && freePeers.Count > 0)
441                this.startWorkingTime = DateTime.Now;
442
443            foreach (PeerId worker in freePeers)
444            {
445                byte[] serializedNewJob = this.distributableJobControl.Pop(out temp_jobId);
446                if (serializedNewJob != null) // if this is null, there are no more JobParts on the main stack!
447                {
448                    this.jobsWaitingForAcceptanceInfo.Add(temp_jobId, worker);
449
450                    // set free worker to busy in the peerManagement class
451                    ((WorkersManagement)this.peerManagement).SetFreeWorkerToBusy(worker);
452
453                    // get actual subscriber/worker and send the new job
454                    base.p2pControl.SendToPeer(JobMessages.CreateJobPartMessage(temp_jobId, serializedNewJob), worker);
455
456                    if (OnNewJobAllocated != null)
457                        OnNewJobAllocated(temp_jobId);
458
459                    GuiLogging("Job '" + temp_jobId.ToString() + "' were sent to worker id '" + worker.ToString() + "'", NotificationLevel.Info);
460                    i++;
461                }
462                else
463                {
464                    GuiLogging("No more jobs left. So wait for the last results, than close this task.", NotificationLevel.Debug);
465                    if (OnNoMoreJobsLeft != null)
466                        OnNoMoreJobsLeft();
467                }
468            } // end foreach
469            GuiLogging(i + " Job(s) allocated to worker(s).", NotificationLevel.Debug);
470        }
471
472        #endregion
473
474        /// <summary>
475        /// returns the percental progress information of the whole job (value is between 0 and 100)
476        /// </summary>
477        /// <returns>the percental progress information of the whole job</returns>
478        private double GetProgressInformation()
479        {
480            double jobProgressInPercent;
481            double lFinishedAmount = (double)this.distributableJobControl.FinishedAmount.LongValue();
482            double lAllocatedAmount = (double)this.distributableJobControl.AllocatedAmount.LongValue();
483            double lTotalAmount = (double)this.distributableJobControl.TotalAmount.LongValue();
484
485            if (lFinishedAmount > 0 && lAllocatedAmount > 0)
486            {
487                jobProgressInPercent = 30 * (lAllocatedAmount / lTotalAmount) + 100 * (lFinishedAmount / lTotalAmount);
488            }
489            else if (lAllocatedAmount > 0)
490            {
491                jobProgressInPercent = 30 * (lAllocatedAmount / lTotalAmount);
492            }
493            else if (lFinishedAmount > 0)
494            {
495                jobProgressInPercent = 100 * (lFinishedAmount / lTotalAmount);
496            }
497            else
498            {
499                jobProgressInPercent = 0.0;
500            }
501
502            if (OnProcessProgress != null)
503                OnProcessProgress(jobProgressInPercent);
504
505            return jobProgressInPercent;
506        }
507
508        /// <summary>
509        /// returns the estimated end time (correlation between Start Time, Total amount of jobs and finished jobs).
510        /// When no job is finished yet, it returns an empty timespan
511        /// </summary>
512        /// <returns></returns>
513        public DateTime EstimatedEndTime()
514        {
515            DateTime retTime = DateTime.MaxValue;
516            if (this.distributableJobControl.FinishedAmount.LongValue() > 0)
517            {
518                TimeSpan bruteforcingTime = DateTime.Now.Subtract(this.StartWorkingTime);
519                double jobsPerSecond = bruteforcingTime.TotalSeconds / this.distributableJobControl.FinishedAmount.LongValue();
520                double restSeconds = jobsPerSecond * 
521                    (this.distributableJobControl.TotalAmount - this.distributableJobControl.FinishedAmount).LongValue();
522                //retTime.TotalSeconds = jobsPerSecond * (2 - (progressInPercent / 100));
523                retTime = DateTime.Now.AddSeconds(restSeconds);
524            }
525            return retTime;
526        }
527
528        #region Forward PeerManagement Values
529
530        public int FreeWorkers() { return ((WorkersManagement)peerManagement).GetFreeWorkersAmount(); }
531        public int BusyWorkers() { return ((WorkersManagement)peerManagement).GetBusyWorkersAmount(); }
532
533        #endregion
534    }
535}
Note: See TracBrowser for help on using the repository browser.