source: trunk/CrypPlugins/PeerToPeerManager_NEW/P2PManagerBase_NEW.cs @ 1433

Last change on this file since 1433 was 1433, checked in by Paul Lelgemann, 12 years ago

o Extracted common classes from PeerToPeerBase plugin into new PeerToPeer plugin as a preparation for the new P2P proxy
o Modified directory properties to ignore the CrypBuild directory

File size: 26.0 KB
Line 
1/* Copyright 2010 Team CrypTool (Christian Arnold), Uni Duisburg-Essen
2
3   Licensed under the Apache License, Version 2.0 (the "License");
4   you may not use this file except in compliance with the License.
5   You may obtain a copy of the License at
6
7       http://www.apache.org/licenses/LICENSE-2.0
8
9   Unless required by applicable law or agreed to in writing, software
10   distributed under the License is distributed on an "AS IS" BASIS,
11   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12   See the License for the specific language governing permissions and
13   limitations under the License.
14*/
15
16using System;
17using System.Collections.Generic;
18using System.Linq;
19using System.Text;
20using Cryptool.PluginBase.Control;
21using Cryptool.Plugins.PeerToPeer.Jobs;
22using Cryptool.PluginBase;
23using Cryptool.PluginBase.Miscellaneous;
24using Cryptool.Plugins.PeerToPeer.Internal;
25
/* TODO:
 * - Changing the Publisher is possible, but taking over the old Publisher's
 *   subscriber list isn't implemented yet ((de)serialization of the
 *   subscribers is implemented and tested)
 * - Changing the Manager is possible, but taking over the job history isn't
 *   implemented yet ((de)serialization of the job management lists)
 * - Benchmarking the working peers
 *   (this.distributableJobControl.SetResult() returns the TimeSpan for the result)
 * - Insert an internal Start/Stop button, so the Manager can stop its work without
 *   losing any job information (this currently happens when pressing the Stop
 *   button of the CrypTool Workspace)
 */
38
39namespace Cryptool.Plugins.PeerToPeer
40{
41    public class P2PManagerBase_NEW : P2PPublisherBase
42    {
43        #region Events and Delegates
44
45        public delegate void ProcessProgress(double progressInPercent);       
46        public delegate void NewJobAllocated(BigInteger jobId);
47        public delegate void ResultReceived(BigInteger jobId);
48        public delegate void JobCanceled(BigInteger jobId);
49        public delegate void NoMoreJobsLeft();
50        public delegate void AllJobResultsReceived(BigInteger lastJobId);
51        public event ProcessProgress OnProcessProgress;
52        /// <summary>
53        /// When a new job was successfully allocated to a worker (after receiving
54        /// its "JobAccepted"-Message), this event is thrown
55        /// </summary>
56        public event NewJobAllocated OnNewJobAllocated;
57        /// <summary>
58        /// When a new job result was received (and accepted) this event is thrown
59        /// </summary>
60        public event ResultReceived OnResultReceived;
61        /// <summary>
62        /// is thrown when an active worker leaves the network and the jobs come back
63        /// </summary>
64        public event JobCanceled OnJobCanceled;
65        /// <summary>
66        /// When the last job from the DistributableJob-Stack is allocated, but
67        /// the Manager is still waiting for some JobResults this event is thrown
68        /// </summary>
69        public event NoMoreJobsLeft OnNoMoreJobsLeft;
70        /// <summary>
71        /// When no more jobs left AND the last "ausstehendes" JobResult comes in,
72        /// this event is thrown
73        /// </summary>
74        public event AllJobResultsReceived OnAllJobResultsReceived;
75
76        #endregion
77
78        #region Variables
79
80        /// <summary>
81        /// this control contains a JobStack and other special
82        /// management for a SPECIAL distributable Job
83        /// </summary>
84        private IDistributableJob distributableJobControl;
85        /// <summary>
86        /// this list contains all jobs, which were sent to workers,
87        /// but the workers hadn't accept/decline the Job at present
88        /// </summary>
89        private Dictionary<BigInteger, PeerId> jobsWaitingForAcceptanceInfo;
90        /// <summary>
91        /// this dict contains all jobs/workers, who were successfully
92        /// distributed (so the manager already had received a JobAccepted Msg)
93        /// </summary>
94        private Dictionary<BigInteger, PeerId> jobsInProgress;
95
96        private bool managerStarted = false;
97        public bool ManagerStarted
98        {
99            get { return this.managerStarted; }
100            private set { this.managerStarted = value; }
101        }
102
103        /// <summary>
104        /// When the Manager is started, this variable must be set.
105        /// </summary>
106        private string sTopic = String.Empty;
107        public string TopicName
108        {
109            get { return this.sTopic; }
110            private set { this.sTopic = value; }
111        }
112
113        private long lAliveMessageInterval;
114        public long AliveMesageInterval
115        {
116            get { return this.lAliveMessageInterval ; }
117            set { this.lAliveMessageInterval = value; }
118        }
119
120        private DateTime startWorkingTime = DateTime.MinValue;
121        /// <summary>
122        /// This value will be initialized after allocating the first job to a worker.
123        /// Before initialization this is MinValue! Used for end time approximation
124        /// </summary>
125        public DateTime StartWorkingTime
126        {
127            get { return this.startWorkingTime; } 
128        }
129
130        private bool lastJobAllocated = false;
131
132        #endregion
133
134        public P2PManagerBase_NEW(IP2PControl p2pControl, IDistributableJob distributableJob) : base(p2pControl)
135        {
136            this.distributableJobControl = distributableJob;
137
138            this.jobsWaitingForAcceptanceInfo = new Dictionary<BigInteger, PeerId>();
139            this.jobsInProgress = new Dictionary<BigInteger, PeerId>();
140        }
141
142        public void StartManager(string sTopic, long aliveMessageInterval)
143        {
144            // only when the main manager plugin is connected with a Peer-PlugIn
145            // and a IWorkerControl-PlugIn, this Manager can start its work
146            if (this.distributableJobControl != null && this.p2pControl != null)
147            {
148                this.distributableJobControl.OnLastResultReceived += new LastResultReceived(distributableJobControl_OnLastResultReceived);
149
150                //set value to null, when restarting the manager
151                this.startWorkingTime = DateTime.MinValue; 
152                this.TopicName = sTopic;
153                this.AliveMesageInterval = aliveMessageInterval;
154                base.Start(this.TopicName, this.AliveMesageInterval);
155            }
156            else
157            {
158                GuiLogging("Manager couldn't be started, because P2P-Peer- or Distributable-Job-PlugIn isn't connected with the Manager or the connection is broken...", NotificationLevel.Warning);
159            }
160        }
161
162        void distributableJobControl_OnLastResultReceived(BigInteger jobId)
163        {
164            if (OnAllJobResultsReceived != null)
165                OnAllJobResultsReceived(jobId);
166
167            /* New Feature. When all JobResults are received, stop Manager, so another
168             * Manager can replace it and then allocate Jobs to the free workers*/
169            GuiLogging("All Job results received, so deregistering from the solution network to accomodate this topic for another Manager.",NotificationLevel.Info);
170
171            // added by Arnie 2010.04.24
172            SendNoMoreJobsLeftMsgToAllWorkers();
173
174            Stop(PubSubMessageType.Unregister);
175        }
176
177        // added by Arnie 2010.04.24
178        private void SendNoMoreJobsLeftMsgToAllWorkers()
179        {
180            int i = 0;
181            byte[] noMoreJobsLeftMsg = JobMessages.CreateNoMoreJobsLeftMessage();
182            foreach (PeerId wkr in base.peerManagement.GetAllSubscribers())
183            {
184                this.p2pControl.SendToPeer(noMoreJobsLeftMsg, wkr);
185                i++;
186            }
187            GuiLogging("Sended 'No more jobs left' messages to " + i + " workers.", NotificationLevel.Debug);
188        }
189
190        protected override void PeerCompletelyStarted()
191        {
192            base.PeerCompletelyStarted();
193
194            this.ManagerStarted = true;
195            GetProgressInformation();
196            GuiLogging("P2PManager is started right now.", NotificationLevel.Info);
197        }
198
199        public override void Stop(PubSubMessageType msgType)
200        {
201            base.Stop(msgType);
202
203            this.ManagerStarted = false;
204            ((WorkersManagement)this.peerManagement).OnFreeWorkersAvailable -= peerManagement_OnFreeWorkersAvailable;
205            ((WorkersManagement)this.peerManagement).OnSubscriberRemoved -= peerManagement_OnSubscriberRemoved;
206            this.distributableJobControl.OnLastResultReceived -= distributableJobControl_OnLastResultReceived;
207
208            GuiLogging("P2PManager was stopped successully.", NotificationLevel.Info);
209        }
210
211        /// <summary>
212        /// because the manager needs additional peer information for all workers,
213        /// this method is overwritten. WorkersManagement throws events, when
214        /// a Worker leaves or joins the "solution network", so we can re-add or
215        /// allocate a job.
216        /// </summary>
217        /// <param name="aliveMessageInterval"></param>
218        protected override void AssignManagement(long aliveMessageInterval)
219        {
220            this.peerManagement = new WorkersManagement(aliveMessageInterval);
221            this.peerManagement.OnSubscriberRemoved +=new SubscriberManagement.SubscriberRemoved(peerManagement_OnSubscriberRemoved);
222            // waiting for new workers joining the manager or already joined worker, who were set to "free" again
223            ((WorkersManagement)this.peerManagement).OnFreeWorkersAvailable += new WorkersManagement.FreeWorkersAvailable(peerManagement_OnFreeWorkersAvailable);
224        }
225
226        /// <summary>
227        /// only accepts DistributableJob-specific messages (created, checked and transformed by
228        /// the static class JobMessages). All other message are dropped!
229        /// </summary>
230        /// <param name="sender"></param>
231        /// <param name="data"></param>
232        protected override void p2pControl_OnPayloadMessageReceived(PeerId sender, byte[] data)
233        {
234            if (!JobMessages.IsJobMessageType(data[0]))
235            {
236                GuiLogging("Received an undefined message (not a job accepted message or a job result).", NotificationLevel.Debug);
237                return;
238            } 
239            switch (JobMessages.GetMessageJobType(data[0]))
240            {
241                case MessageJobType.JobAcceptanceInfo:
242                    HandleJobAcceptanceMessage(sender, data);
243                    break;
244                case MessageJobType.JobResult:
245                    GuiLogging("Received JobResult message from Peer '" + sender.ToString() + "'. Beginning to set result now.", NotificationLevel.Debug);
246                    HandleJobResultMessage(sender, data);
247                    break;
248                case MessageJobType.Free:
249                    HandleFreeMessage(sender, data);
250                    break;
251                default:
252                    GuiLogging("Obscure Message. First byte: " + Convert.ToInt32(data[0]) + ". Data: (" + Encoding.UTF8.GetString(data) + ") received from '" + sender.ToString() + "'.", NotificationLevel.Info);
253                    break;
254            } // end switch
255            GetProgressInformation();
256        }
257
258        /// <summary>
259        /// This method is only overwritten because we have to ignore the Solution-case in
260        /// the System-Message-Handling (a Peer mustn't send a Solution message, which influences
261        /// the working status of the Manager, because it havn't the overview of the JobParts)
262        /// </summary>
263        /// <param name="sender"></param>
264        /// <param name="msgType"></param>
265        protected override void p2pControl_OnSystemMessageReceived(PeerId sender, PubSubMessageType msgType)
266        {
267            // ignore Solution case, because other worker could work on...
268            if (msgType != PubSubMessageType.Solution)
269                // base class handles all administration cases (register, alive, unregister, ping, pong, ...)
270                base.p2pControl_OnSystemMessageReceived(sender, msgType);
271        }
272
273        #region Handle different DistributableJob-specific, incoming messages
274
275        /// <summary>
276        /// Handles the two job-acceptance cases (accepted or declined). Adds accepted jobs
277        /// to the "inProgress" Dictionary, sets a busy declined worker to free (when message
278        /// is JobDeclined) and removes the job in every case from the waitingForAcceptance list
279        /// </summary>
280        /// <param name="sender"></param>
281        /// <param name="data"></param>
282        private void HandleJobAcceptanceMessage(PeerId sender, byte[] data)
283        {
284            BigInteger jobId = null;
285            if (JobMessages.GetJobAcceptanceMessage(data, out jobId))
286            {
287                this.distributableJobControl.JobAccepted(jobId);
288                lock (this.jobsInProgress)
289                {
290                    if (!this.jobsInProgress.ContainsKey(jobId))
291                    {
292                        // add to jobs in progress, because P2PJobAdmin has accepted the job!
293                        this.jobsInProgress.Add(jobId, sender);
294                        if (OnNewJobAllocated != null)
295                            OnNewJobAllocated(jobId);
296                    }
297                    //else
298                    //    throw (new Exception("Received a JobAccepted message for a already accepted JobId... JobId: " + jobId.ToString()));
299                }
300                GuiLogging("JobId '" + jobId.ToString() + "' was accepted by Peer '" + sender.ToString() + "'.", NotificationLevel.Info);
301            }
302            else // if AcceptanceInfo is declined
303            {
304                this.distributableJobControl.JobDeclined(jobId);
305
306                // set busy worker to free, because he delined the job
307
308                // TODO: maybe create a "black list" for peers, who had declined this kind of Job twice or more...
309                ((WorkersManagement)this.peerManagement).SetBusyWorkerToFree(sender);
310                GuiLogging("JobId '" + jobId.ToString() + "' was declined by Peer '" + sender.ToString() + "'.", NotificationLevel.Info);
311            }
312            // in every case remove the job from thew waiting Dictionary
313            lock (this.jobsWaitingForAcceptanceInfo)
314            {
315                if (this.jobsWaitingForAcceptanceInfo.ContainsKey(jobId))
316                {
317                    this.jobsWaitingForAcceptanceInfo.Remove(jobId);
318                }
319                //else
320                //    throw (new Exception("Received a JobAcceptance-Message for a jobId, which isn't in the waitingForAcceptance-List... JobId: " + jobId.ToString()));
321            }
322        }
323
324        /// <summary>
325        /// Sets the incoming result in the DistributableJob class, removes the job from
326        /// the JobsInProgress Dictionary and throws the OnResultReceivedEvent
327        /// </summary>
328        /// <param name="sender"></param>
329        /// <param name="data"></param>
330        private void HandleJobResultMessage(PeerId sender, byte[] data)
331        {
332            BigInteger jobId;
333
334            byte[] serializedJobResult = JobMessages.GetJobResult(data, out jobId);
335            TimeSpan jobProcessingTime = this.distributableJobControl.SetResult(jobId, serializedJobResult);
336
337            if (OnResultReceived != null)
338                OnResultReceived(jobId);
339
340            GuiLogging("JobResult for Job '" + jobId.ToString() + "' received. Processing Time: "
341                + jobProcessingTime.TotalMinutes.ToString() + " minutes. Worker-Id: '" + sender.ToString() + "'.", NotificationLevel.Info);
342
343            lock (this.jobsInProgress)
344            {
345                if (this.jobsInProgress.ContainsKey(jobId))
346                    this.jobsInProgress.Remove(jobId);
347                //dirty workaround because P2PJobAdmin sends the result msg twice...
348                //else
349                //    throw (new Exception("Received a valid job result, which wasn't allocated before!!!"));
350            }
351        }
352
353        /// <summary>
354        /// If message content declares the sender as a free worker,
355        /// set this worker from busy to free, otherwise do nothing
356        /// </summary>
357        /// <param name="sender"></param>
358        /// <param name="data"></param>
359        private void HandleFreeMessage(PeerId sender, byte[] data)
360        {
361            // only handle the "true"-case, because otherwise there is nothing to do
362            if (JobMessages.GetFreeWorkerStatusMessage(data))
363            {
364                GuiLogging("Received a 'free'-message from Peer '" + sender.ToString() + "'.", NotificationLevel.Debug);
365                // only if worker already exists in the "busy list", it will set to free and event will be thrown
366                ((WorkersManagement)this.peerManagement).SetBusyWorkerToFree(sender);
367            }
368        }
369
370        #endregion
371
372        #region Worker-action-handling
373
374        /// <summary>
375        /// every time when new workers are available, continue distribution of Jobs (if any JobParts left)
376        /// </summary>
377        private void peerManagement_OnFreeWorkersAvailable()
378        {
379            if (!this.ManagerStarted)
380            {
381                GuiLogging("Manager isn't started at present, so I can't disperse the patterns.", NotificationLevel.Error);
382                bool removeSettings = DHT_CommonManagement.DeleteAllPublishersEntries(ref this.p2pControl, this.TopicName);
383                if (removeSettings)
384                    GuiLogging("Manager is stopped, but DHT entries were still existing, so they were deleted!", NotificationLevel.Info);
385                else
386                    throw (new Exception("Critical error in P2PManager. Manager isn't started yet, but the workers can register... Even removing DHT entries weren't possible..."));
387            }
388            else
389            {
390                /* edited by Arnold - 2010.02.23 */
391                // because parallel incoming free workers could run
392                // into concurrence in this method, so some workers
393                // could get more than one job - so they have to
394                // queue the additional jobs.
395                lock (this)
396                {
397                    AllocateJobs();
398                }
399            }
400
401            GetProgressInformation();
402        }
403
404        /// <summary>
405        /// When a Worker leaves the network, its (maybe) allocated JobParts have to
406        /// be pushed back to the main Jobstack
407        /// </summary>
408        /// <param name="peerId"></param>
409        private void peerManagement_OnSubscriberRemoved(PeerId peerId)
410        {
411            GuiLogging("REMOVED worker " + peerId, NotificationLevel.Info);
412
413            // necessary lock, because the amount of jobs in Progress could change while traversing this list
414            lock (this.jobsInProgress)
415            {
416                // push job back and remove list entries for "jobs in progress"
417                List<BigInteger> allJobsForRemovedPeer = (from k in this.jobsInProgress where k.Value == peerId select k.Key).ToList<BigInteger>();
418
419                BigInteger jobId;
420                for (int i = 0; i < allJobsForRemovedPeer.Count; i++)
421                {
422                    jobId = allJobsForRemovedPeer[i];
423                    this.distributableJobControl.Push(jobId);
424                    this.jobsInProgress.Remove(jobId);
425                    if (OnJobCanceled != null)
426                        OnJobCanceled(jobId);
427                    GuiLogging("Pushed job '" + jobId.ToString() + "' back to the stack, because peer left the network.", NotificationLevel.Debug);
428                }
429            }
430
431            // necessary lock, because the amount of jobs in Progress could change while traversing this list
432            lock (this.jobsWaitingForAcceptanceInfo)
433            {
434                // Set the JobDeclined-status for all jobs of the removed peer, which are still waiting
435                // for an acceptance information. Than remove all jobs from the "jobs waiting for acceptance info" List
436                List<BigInteger> allWaitingEntriesForRemovedPeer = (from k in this.jobsWaitingForAcceptanceInfo where k.Value == peerId select k.Key).ToList<BigInteger>();
437
438                for (int i = 0; i < allWaitingEntriesForRemovedPeer.Count; i++)
439                {
440                    this.distributableJobControl.JobDeclined(allWaitingEntriesForRemovedPeer[i]);
441                    this.jobsWaitingForAcceptanceInfo.Remove(allWaitingEntriesForRemovedPeer[i]);
442                    GuiLogging("Declined job '" + allWaitingEntriesForRemovedPeer[i].ToString() + "', because peer left the network.", NotificationLevel.Debug);
443                }
444            }
445
446            GetProgressInformation();
447        }
448       
449        /// <summary>
450        /// Allocates new JobParts to new registered or calling-for-jobs Workers.
451        /// Additionally it adds the allocated job to a waitingForAcceptance Dictionary,
452        /// so it can be checked, if the Worker respond to the Job-allocation
453        /// </summary>
454        private void AllocateJobs()
455        {
456            int i = 0;
457            BigInteger temp_jobId = null;
458            List<PeerId> freePeers = ((WorkersManagement)this.peerManagement).GetFreeWorkers();
459
460            GuiLogging("Trying to allocate " + freePeers.Count + " job(s) to workers.", NotificationLevel.Debug);
461
462            // set the start working time after allocating the FIRST job
463            if (this.startWorkingTime == DateTime.MinValue && freePeers.Count > 0)
464                this.startWorkingTime = DateTime.Now;
465
466            foreach (PeerId worker in freePeers)
467            {
468                byte[] serializedNewJob = this.distributableJobControl.Pop(out temp_jobId);
469                if (serializedNewJob != null) // if this is null, there are no more JobParts on the main stack!
470                {
471                    this.jobsWaitingForAcceptanceInfo.Add(temp_jobId, worker);
472
473                    // set free worker to busy in the peerManagement class
474                    ((WorkersManagement)this.peerManagement).SetFreeWorkerToBusy(worker);
475
476                    // get actual subscriber/worker and send the new job
477                    base.p2pControl.SendToPeer(JobMessages.CreateJobPartMessage(temp_jobId, serializedNewJob), worker);
478
479                    if (OnNewJobAllocated != null)
480                        OnNewJobAllocated(temp_jobId);
481
482                    GuiLogging("Job '" + temp_jobId.ToString() + "' was sent to worker id '" + worker.ToString() + "'", NotificationLevel.Info);
483                    i++;
484                }
485                else
486                {
487                    //edited by Arnie 2010.04.24
488                    //sending "no more jobs left msg" to the free worker, so it can stop its Free-Msg-Timer
489                    base.p2pControl.SendToPeer(JobMessages.CreateNoMoreJobsLeftMessage(), worker);
490
491                    GuiLogging("No more jobs left. So wait for the last results, than close this task.", NotificationLevel.Debug);
492                    if (OnNoMoreJobsLeft != null)
493                        OnNoMoreJobsLeft();
494                }
495            } // end foreach
496            GuiLogging(i + " Job(s) allocated to worker(s).", NotificationLevel.Debug);
497        }
498
499        #endregion
500
501        /// <summary>
502        /// returns the percental progress information of the whole job (value is between 0 and 100)
503        /// </summary>
504        /// <returns>the percental progress information of the whole job</returns>
505        private double GetProgressInformation()
506        {
507            double jobProgressInPercent;
508            double lFinishedAmount = (double)this.distributableJobControl.FinishedAmount.LongValue();
509            double lAllocatedAmount = (double)this.distributableJobControl.AllocatedAmount.LongValue();
510            double lTotalAmount = (double)this.distributableJobControl.TotalAmount.LongValue();
511
512            if (lFinishedAmount > 0 && lAllocatedAmount > 0)
513            {
514                jobProgressInPercent = 30 * (lAllocatedAmount / lTotalAmount) + 100 * (lFinishedAmount / lTotalAmount);
515            }
516            else if (lAllocatedAmount > 0)
517            {
518                jobProgressInPercent = 30 * (lAllocatedAmount / lTotalAmount);
519            }
520            else if (lFinishedAmount > 0)
521            {
522                jobProgressInPercent = 100 * (lFinishedAmount / lTotalAmount);
523            }
524            else
525            {
526                jobProgressInPercent = 0.0;
527            }
528
529            if (OnProcessProgress != null)
530                OnProcessProgress(jobProgressInPercent);
531
532            return jobProgressInPercent;
533        }
534
535        /// <summary>
536        /// returns the estimated end time (correlation between Start Time, Total amount of jobs and finished jobs).
537        /// When no job is finished yet, it returns an empty timespan
538        /// </summary>
539        /// <returns></returns>
540        public DateTime EstimatedEndTime()
541        {
542            DateTime retTime = DateTime.MaxValue;
543            if (this.distributableJobControl.FinishedAmount.LongValue() > 0)
544            {
545                TimeSpan bruteforcingTime = DateTime.Now.Subtract(this.StartWorkingTime);
546                double jobsPerSecond = bruteforcingTime.TotalSeconds / this.distributableJobControl.FinishedAmount.LongValue();
547                double restSeconds = jobsPerSecond * 
548                    (this.distributableJobControl.TotalAmount - this.distributableJobControl.FinishedAmount).LongValue();
549                //retTime.TotalSeconds = jobsPerSecond * (2 - (progressInPercent / 100));
550                retTime = DateTime.Now.AddSeconds(restSeconds);
551            }
552            return retTime;
553        }
554
555        #region Forward PeerManagement Values
556
557        public int FreeWorkers() { return ((WorkersManagement)peerManagement).GetFreeWorkersAmount(); }
558        public int BusyWorkers() { return ((WorkersManagement)peerManagement).GetBusyWorkersAmount(); }
559
560        #endregion
561    }
562}
Note: See TracBrowser for help on using the repository browser.