source: trunk/CrypPlugins/PeerToPeerManager_NEW/P2PManagerBase_NEW.cs @ 1448

Last change on this file since 1448 was 1448, checked in by Sven Rech, 12 years ago

replaced all BigInteger stuff with the new BigInteger class from .net 4.0

But there are still problems with some plugins (Keysearcher, BigInteger Operations...)

File size: 26.0 KB
Line 
1/* Copyright 2010 Team CrypTool (Christian Arnold), Uni Duisburg-Essen
2
3   Licensed under the Apache License, Version 2.0 (the "License");
4   you may not use this file except in compliance with the License.
5   You may obtain a copy of the License at
6
7       http://www.apache.org/licenses/LICENSE-2.0
8
9   Unless required by applicable law or agreed to in writing, software
10   distributed under the License is distributed on an "AS IS" BASIS,
11   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12   See the License for the specific language governing permissions and
13   limitations under the License.
14*/
15
16using System;
17using System.Collections.Generic;
18using System.Linq;
19using System.Text;
20using Cryptool.PluginBase.Control;
21using Cryptool.Plugins.PeerToPeer.Jobs;
22using Cryptool.PluginBase;
23using Cryptool.PluginBase.Miscellaneous;
24using Cryptool.Plugins.PeerToPeer.Internal;
25using System.Numerics;
26
27/* TODO:
28 * - Publisher-change is possible, but catch old Publishers subscriber list
29 *   isn't implemented yet ((de)serialization of the subscribers is
30 *   implemented and tested)
31 * - Manager-change is possible, but catch job history isn't implemented yet
32 *   ((de)serialization of job management lists)
33 * - Benchmarking the working peers
34 *   (this.distributableJobControl.SetResult() returns the TimeSpan for the result)
35 * - Insert internal Start-/Stop-Button, so the Manager can stop its work without
36 *   losing any Job-Information (this happens at present, when pressing the Stop
37 *   button of the CrypTool Workspace)
38 */
39
40namespace Cryptool.Plugins.PeerToPeer
41{
42    public class P2PManagerBase_NEW : P2PPublisherBase
43    {
44        #region Events and Delegates
45
46        public delegate void ProcessProgress(double progressInPercent);       
47        public delegate void NewJobAllocated(BigInteger jobId);
48        public delegate void ResultReceived(BigInteger jobId);
49        public delegate void JobCanceled(BigInteger jobId);
50        public delegate void NoMoreJobsLeft();
51        public delegate void AllJobResultsReceived(BigInteger lastJobId);
52        public event ProcessProgress OnProcessProgress;
53        /// <summary>
54        /// When a new job was successfully allocated to a worker (after receiving
55        /// its "JobAccepted"-Message), this event is thrown
56        /// </summary>
57        public event NewJobAllocated OnNewJobAllocated;
58        /// <summary>
59        /// When a new job result was received (and accepted) this event is thrown
60        /// </summary>
61        public event ResultReceived OnResultReceived;
62        /// <summary>
63        /// is thrown when an active worker leaves the network and the jobs come back
64        /// </summary>
65        public event JobCanceled OnJobCanceled;
66        /// <summary>
67        /// When the last job from the DistributableJob-Stack is allocated, but
68        /// the Manager is still waiting for some JobResults this event is thrown
69        /// </summary>
70        public event NoMoreJobsLeft OnNoMoreJobsLeft;
71        /// <summary>
72        /// When no more jobs left AND the last "ausstehendes" JobResult comes in,
73        /// this event is thrown
74        /// </summary>
75        public event AllJobResultsReceived OnAllJobResultsReceived;
76
77        #endregion
78
79        #region Variables
80
81        /// <summary>
82        /// this control contains a JobStack and other special
83        /// management for a SPECIAL distributable Job
84        /// </summary>
85        private IDistributableJob distributableJobControl;
86        /// <summary>
87        /// this list contains all jobs, which were sent to workers,
88        /// but the workers hadn't accept/decline the Job at present
89        /// </summary>
90        private Dictionary<BigInteger, PeerId> jobsWaitingForAcceptanceInfo;
91        /// <summary>
92        /// this dict contains all jobs/workers, who were successfully
93        /// distributed (so the manager already had received a JobAccepted Msg)
94        /// </summary>
95        private Dictionary<BigInteger, PeerId> jobsInProgress;
96
97        private bool managerStarted = false;
98        public bool ManagerStarted
99        {
100            get { return this.managerStarted; }
101            private set { this.managerStarted = value; }
102        }
103
104        /// <summary>
105        /// When the Manager is started, this variable must be set.
106        /// </summary>
107        private string sTopic = String.Empty;
108        public string TopicName
109        {
110            get { return this.sTopic; }
111            private set { this.sTopic = value; }
112        }
113
114        private long lAliveMessageInterval;
115        public long AliveMesageInterval
116        {
117            get { return this.lAliveMessageInterval ; }
118            set { this.lAliveMessageInterval = value; }
119        }
120
121        private DateTime startWorkingTime = DateTime.MinValue;
122        /// <summary>
123        /// This value will be initialized after allocating the first job to a worker.
124        /// Before initialization this is MinValue! Used for end time approximation
125        /// </summary>
126        public DateTime StartWorkingTime
127        {
128            get { return this.startWorkingTime; } 
129        }
130
131        private bool lastJobAllocated = false;
132
133        #endregion
134
135        public P2PManagerBase_NEW(IP2PControl p2pControl, IDistributableJob distributableJob) : base(p2pControl)
136        {
137            this.distributableJobControl = distributableJob;
138
139            this.jobsWaitingForAcceptanceInfo = new Dictionary<BigInteger, PeerId>();
140            this.jobsInProgress = new Dictionary<BigInteger, PeerId>();
141        }
142
143        public void StartManager(string sTopic, long aliveMessageInterval)
144        {
145            // only when the main manager plugin is connected with a Peer-PlugIn
146            // and a IWorkerControl-PlugIn, this Manager can start its work
147            if (this.distributableJobControl != null && this.p2pControl != null)
148            {
149                this.distributableJobControl.OnLastResultReceived += new LastResultReceived(distributableJobControl_OnLastResultReceived);
150
151                //set value to null, when restarting the manager
152                this.startWorkingTime = DateTime.MinValue; 
153                this.TopicName = sTopic;
154                this.AliveMesageInterval = aliveMessageInterval;
155                base.Start(this.TopicName, this.AliveMesageInterval);
156            }
157            else
158            {
159                GuiLogging("Manager couldn't be started, because P2P-Peer- or Distributable-Job-PlugIn isn't connected with the Manager or the connection is broken...", NotificationLevel.Warning);
160            }
161        }
162
163        void distributableJobControl_OnLastResultReceived(BigInteger jobId)
164        {
165            if (OnAllJobResultsReceived != null)
166                OnAllJobResultsReceived(jobId);
167
168            /* New Feature. When all JobResults are received, stop Manager, so another
169             * Manager can replace it and then allocate Jobs to the free workers*/
170            GuiLogging("All Job results received, so deregistering from the solution network to accomodate this topic for another Manager.",NotificationLevel.Info);
171
172            // added by Arnie 2010.04.24
173            SendNoMoreJobsLeftMsgToAllWorkers();
174
175            Stop(PubSubMessageType.Unregister);
176        }
177
178        // added by Arnie 2010.04.24
179        private void SendNoMoreJobsLeftMsgToAllWorkers()
180        {
181            int i = 0;
182            byte[] noMoreJobsLeftMsg = JobMessages.CreateNoMoreJobsLeftMessage();
183            foreach (PeerId wkr in base.peerManagement.GetAllSubscribers())
184            {
185                this.p2pControl.SendToPeer(noMoreJobsLeftMsg, wkr);
186                i++;
187            }
188            GuiLogging("Sended 'No more jobs left' messages to " + i + " workers.", NotificationLevel.Debug);
189        }
190
191        protected override void PeerCompletelyStarted()
192        {
193            base.PeerCompletelyStarted();
194
195            this.ManagerStarted = true;
196            GetProgressInformation();
197            GuiLogging("P2PManager is started right now.", NotificationLevel.Info);
198        }
199
200        public override void Stop(PubSubMessageType msgType)
201        {
202            base.Stop(msgType);
203
204            this.ManagerStarted = false;
205            ((WorkersManagement)this.peerManagement).OnFreeWorkersAvailable -= peerManagement_OnFreeWorkersAvailable;
206            ((WorkersManagement)this.peerManagement).OnSubscriberRemoved -= peerManagement_OnSubscriberRemoved;
207            this.distributableJobControl.OnLastResultReceived -= distributableJobControl_OnLastResultReceived;
208
209            GuiLogging("P2PManager was stopped successully.", NotificationLevel.Info);
210        }
211
212        /// <summary>
213        /// because the manager needs additional peer information for all workers,
214        /// this method is overwritten. WorkersManagement throws events, when
215        /// a Worker leaves or joins the "solution network", so we can re-add or
216        /// allocate a job.
217        /// </summary>
218        /// <param name="aliveMessageInterval"></param>
219        protected override void AssignManagement(long aliveMessageInterval)
220        {
221            this.peerManagement = new WorkersManagement(aliveMessageInterval);
222            this.peerManagement.OnSubscriberRemoved +=new SubscriberManagement.SubscriberRemoved(peerManagement_OnSubscriberRemoved);
223            // waiting for new workers joining the manager or already joined worker, who were set to "free" again
224            ((WorkersManagement)this.peerManagement).OnFreeWorkersAvailable += new WorkersManagement.FreeWorkersAvailable(peerManagement_OnFreeWorkersAvailable);
225        }
226
227        /// <summary>
228        /// only accepts DistributableJob-specific messages (created, checked and transformed by
229        /// the static class JobMessages). All other message are dropped!
230        /// </summary>
231        /// <param name="sender"></param>
232        /// <param name="data"></param>
233        protected override void p2pControl_OnPayloadMessageReceived(PeerId sender, byte[] data)
234        {
235            if (!JobMessages.IsJobMessageType(data[0]))
236            {
237                GuiLogging("Received an undefined message (not a job accepted message or a job result).", NotificationLevel.Debug);
238                return;
239            } 
240            switch (JobMessages.GetMessageJobType(data[0]))
241            {
242                case MessageJobType.JobAcceptanceInfo:
243                    HandleJobAcceptanceMessage(sender, data);
244                    break;
245                case MessageJobType.JobResult:
246                    GuiLogging("Received JobResult message from Peer '" + sender.ToString() + "'. Beginning to set result now.", NotificationLevel.Debug);
247                    HandleJobResultMessage(sender, data);
248                    break;
249                case MessageJobType.Free:
250                    HandleFreeMessage(sender, data);
251                    break;
252                default:
253                    GuiLogging("Obscure Message. First byte: " + Convert.ToInt32(data[0]) + ". Data: (" + Encoding.UTF8.GetString(data) + ") received from '" + sender.ToString() + "'.", NotificationLevel.Info);
254                    break;
255            } // end switch
256            GetProgressInformation();
257        }
258
259        /// <summary>
260        /// This method is only overwritten because we have to ignore the Solution-case in
261        /// the System-Message-Handling (a Peer mustn't send a Solution message, which influences
262        /// the working status of the Manager, because it havn't the overview of the JobParts)
263        /// </summary>
264        /// <param name="sender"></param>
265        /// <param name="msgType"></param>
266        protected override void p2pControl_OnSystemMessageReceived(PeerId sender, PubSubMessageType msgType)
267        {
268            // ignore Solution case, because other worker could work on...
269            if (msgType != PubSubMessageType.Solution)
270                // base class handles all administration cases (register, alive, unregister, ping, pong, ...)
271                base.p2pControl_OnSystemMessageReceived(sender, msgType);
272        }
273
274        #region Handle different DistributableJob-specific, incoming messages
275
276        /// <summary>
277        /// Handles the two job-acceptance cases (accepted or declined). Adds accepted jobs
278        /// to the "inProgress" Dictionary, sets a busy declined worker to free (when message
279        /// is JobDeclined) and removes the job in every case from the waitingForAcceptance list
280        /// </summary>
281        /// <param name="sender"></param>
282        /// <param name="data"></param>
283        private void HandleJobAcceptanceMessage(PeerId sender, byte[] data)
284        {
285            BigInteger jobId = 0;
286            if (JobMessages.GetJobAcceptanceMessage(data, out jobId))
287            {
288                this.distributableJobControl.JobAccepted(jobId);
289                lock (this.jobsInProgress)
290                {
291                    if (!this.jobsInProgress.ContainsKey(jobId))
292                    {
293                        // add to jobs in progress, because P2PJobAdmin has accepted the job!
294                        this.jobsInProgress.Add(jobId, sender);
295                        if (OnNewJobAllocated != null)
296                            OnNewJobAllocated(jobId);
297                    }
298                    //else
299                    //    throw (new Exception("Received a JobAccepted message for a already accepted JobId... JobId: " + jobId.ToString()));
300                }
301                GuiLogging("JobId '" + jobId.ToString() + "' was accepted by Peer '" + sender.ToString() + "'.", NotificationLevel.Info);
302            }
303            else // if AcceptanceInfo is declined
304            {
305                this.distributableJobControl.JobDeclined(jobId);
306
307                // set busy worker to free, because he delined the job
308
309                // TODO: maybe create a "black list" for peers, who had declined this kind of Job twice or more...
310                ((WorkersManagement)this.peerManagement).SetBusyWorkerToFree(sender);
311                GuiLogging("JobId '" + jobId.ToString() + "' was declined by Peer '" + sender.ToString() + "'.", NotificationLevel.Info);
312            }
313            // in every case remove the job from thew waiting Dictionary
314            lock (this.jobsWaitingForAcceptanceInfo)
315            {
316                if (this.jobsWaitingForAcceptanceInfo.ContainsKey(jobId))
317                {
318                    this.jobsWaitingForAcceptanceInfo.Remove(jobId);
319                }
320                //else
321                //    throw (new Exception("Received a JobAcceptance-Message for a jobId, which isn't in the waitingForAcceptance-List... JobId: " + jobId.ToString()));
322            }
323        }
324
325        /// <summary>
326        /// Sets the incoming result in the DistributableJob class, removes the job from
327        /// the JobsInProgress Dictionary and throws the OnResultReceivedEvent
328        /// </summary>
329        /// <param name="sender"></param>
330        /// <param name="data"></param>
331        private void HandleJobResultMessage(PeerId sender, byte[] data)
332        {
333            BigInteger jobId;
334
335            byte[] serializedJobResult = JobMessages.GetJobResult(data, out jobId);
336            TimeSpan jobProcessingTime = this.distributableJobControl.SetResult(jobId, serializedJobResult);
337
338            if (OnResultReceived != null)
339                OnResultReceived(jobId);
340
341            GuiLogging("JobResult for Job '" + jobId.ToString() + "' received. Processing Time: "
342                + jobProcessingTime.TotalMinutes.ToString() + " minutes. Worker-Id: '" + sender.ToString() + "'.", NotificationLevel.Info);
343
344            lock (this.jobsInProgress)
345            {
346                if (this.jobsInProgress.ContainsKey(jobId))
347                    this.jobsInProgress.Remove(jobId);
348                //dirty workaround because P2PJobAdmin sends the result msg twice...
349                //else
350                //    throw (new Exception("Received a valid job result, which wasn't allocated before!!!"));
351            }
352        }
353
354        /// <summary>
355        /// If message content declares the sender as a free worker,
356        /// set this worker from busy to free, otherwise do nothing
357        /// </summary>
358        /// <param name="sender"></param>
359        /// <param name="data"></param>
360        private void HandleFreeMessage(PeerId sender, byte[] data)
361        {
362            // only handle the "true"-case, because otherwise there is nothing to do
363            if (JobMessages.GetFreeWorkerStatusMessage(data))
364            {
365                GuiLogging("Received a 'free'-message from Peer '" + sender.ToString() + "'.", NotificationLevel.Debug);
366                // only if worker already exists in the "busy list", it will set to free and event will be thrown
367                ((WorkersManagement)this.peerManagement).SetBusyWorkerToFree(sender);
368            }
369        }
370
371        #endregion
372
373        #region Worker-action-handling
374
375        /// <summary>
376        /// every time when new workers are available, continue distribution of Jobs (if any JobParts left)
377        /// </summary>
378        private void peerManagement_OnFreeWorkersAvailable()
379        {
380            if (!this.ManagerStarted)
381            {
382                GuiLogging("Manager isn't started at present, so I can't disperse the patterns.", NotificationLevel.Error);
383                bool removeSettings = DHT_CommonManagement.DeleteAllPublishersEntries(ref this.p2pControl, this.TopicName);
384                if (removeSettings)
385                    GuiLogging("Manager is stopped, but DHT entries were still existing, so they were deleted!", NotificationLevel.Info);
386                else
387                    throw (new Exception("Critical error in P2PManager. Manager isn't started yet, but the workers can register... Even removing DHT entries weren't possible..."));
388            }
389            else
390            {
391                /* edited by Arnold - 2010.02.23 */
392                // because parallel incoming free workers could run
393                // into concurrence in this method, so some workers
394                // could get more than one job - so they have to
395                // queue the additional jobs.
396                lock (this)
397                {
398                    AllocateJobs();
399                }
400            }
401
402            GetProgressInformation();
403        }
404
405        /// <summary>
406        /// When a Worker leaves the network, its (maybe) allocated JobParts have to
407        /// be pushed back to the main Jobstack
408        /// </summary>
409        /// <param name="peerId"></param>
410        private void peerManagement_OnSubscriberRemoved(PeerId peerId)
411        {
412            GuiLogging("REMOVED worker " + peerId, NotificationLevel.Info);
413
414            // necessary lock, because the amount of jobs in Progress could change while traversing this list
415            lock (this.jobsInProgress)
416            {
417                // push job back and remove list entries for "jobs in progress"
418                List<BigInteger> allJobsForRemovedPeer = (from k in this.jobsInProgress where k.Value == peerId select k.Key).ToList<BigInteger>();
419
420                BigInteger jobId;
421                for (int i = 0; i < allJobsForRemovedPeer.Count; i++)
422                {
423                    jobId = allJobsForRemovedPeer[i];
424                    this.distributableJobControl.Push(jobId);
425                    this.jobsInProgress.Remove(jobId);
426                    if (OnJobCanceled != null)
427                        OnJobCanceled(jobId);
428                    GuiLogging("Pushed job '" + jobId.ToString() + "' back to the stack, because peer left the network.", NotificationLevel.Debug);
429                }
430            }
431
432            // necessary lock, because the amount of jobs in Progress could change while traversing this list
433            lock (this.jobsWaitingForAcceptanceInfo)
434            {
435                // Set the JobDeclined-status for all jobs of the removed peer, which are still waiting
436                // for an acceptance information. Than remove all jobs from the "jobs waiting for acceptance info" List
437                List<BigInteger> allWaitingEntriesForRemovedPeer = (from k in this.jobsWaitingForAcceptanceInfo where k.Value == peerId select k.Key).ToList<BigInteger>();
438
439                for (int i = 0; i < allWaitingEntriesForRemovedPeer.Count; i++)
440                {
441                    this.distributableJobControl.JobDeclined(allWaitingEntriesForRemovedPeer[i]);
442                    this.jobsWaitingForAcceptanceInfo.Remove(allWaitingEntriesForRemovedPeer[i]);
443                    GuiLogging("Declined job '" + allWaitingEntriesForRemovedPeer[i].ToString() + "', because peer left the network.", NotificationLevel.Debug);
444                }
445            }
446
447            GetProgressInformation();
448        }
449       
450        /// <summary>
451        /// Allocates new JobParts to new registered or calling-for-jobs Workers.
452        /// Additionally it adds the allocated job to a waitingForAcceptance Dictionary,
453        /// so it can be checked, if the Worker respond to the Job-allocation
454        /// </summary>
455        private void AllocateJobs()
456        {
457            int i = 0;
458            BigInteger temp_jobId = 0;
459            List<PeerId> freePeers = ((WorkersManagement)this.peerManagement).GetFreeWorkers();
460
461            GuiLogging("Trying to allocate " + freePeers.Count + " job(s) to workers.", NotificationLevel.Debug);
462
463            // set the start working time after allocating the FIRST job
464            if (this.startWorkingTime == DateTime.MinValue && freePeers.Count > 0)
465                this.startWorkingTime = DateTime.Now;
466
467            foreach (PeerId worker in freePeers)
468            {
469                byte[] serializedNewJob = this.distributableJobControl.Pop(out temp_jobId);
470                if (serializedNewJob != null) // if this is null, there are no more JobParts on the main stack!
471                {
472                    this.jobsWaitingForAcceptanceInfo.Add(temp_jobId, worker);
473
474                    // set free worker to busy in the peerManagement class
475                    ((WorkersManagement)this.peerManagement).SetFreeWorkerToBusy(worker);
476
477                    // get actual subscriber/worker and send the new job
478                    base.p2pControl.SendToPeer(JobMessages.CreateJobPartMessage(temp_jobId, serializedNewJob), worker);
479
480                    if (OnNewJobAllocated != null)
481                        OnNewJobAllocated(temp_jobId);
482
483                    GuiLogging("Job '" + temp_jobId.ToString() + "' was sent to worker id '" + worker.ToString() + "'", NotificationLevel.Info);
484                    i++;
485                }
486                else
487                {
488                    //edited by Arnie 2010.04.24
489                    //sending "no more jobs left msg" to the free worker, so it can stop its Free-Msg-Timer
490                    base.p2pControl.SendToPeer(JobMessages.CreateNoMoreJobsLeftMessage(), worker);
491
492                    GuiLogging("No more jobs left. So wait for the last results, than close this task.", NotificationLevel.Debug);
493                    if (OnNoMoreJobsLeft != null)
494                        OnNoMoreJobsLeft();
495                }
496            } // end foreach
497            GuiLogging(i + " Job(s) allocated to worker(s).", NotificationLevel.Debug);
498        }
499
500        #endregion
501
502        /// <summary>
503        /// returns the percental progress information of the whole job (value is between 0 and 100)
504        /// </summary>
505        /// <returns>the percental progress information of the whole job</returns>
506        private double GetProgressInformation()
507        {
508            double jobProgressInPercent;
509            double lFinishedAmount = (double)(long)this.distributableJobControl.FinishedAmount;
510            double lAllocatedAmount = (double)(long)this.distributableJobControl.AllocatedAmount;
511            double lTotalAmount = (double)(long)this.distributableJobControl.TotalAmount;
512
513            if (lFinishedAmount > 0 && lAllocatedAmount > 0)
514            {
515                jobProgressInPercent = 30 * (lAllocatedAmount / lTotalAmount) + 100 * (lFinishedAmount / lTotalAmount);
516            }
517            else if (lAllocatedAmount > 0)
518            {
519                jobProgressInPercent = 30 * (lAllocatedAmount / lTotalAmount);
520            }
521            else if (lFinishedAmount > 0)
522            {
523                jobProgressInPercent = 100 * (lFinishedAmount / lTotalAmount);
524            }
525            else
526            {
527                jobProgressInPercent = 0.0;
528            }
529
530            if (OnProcessProgress != null)
531                OnProcessProgress(jobProgressInPercent);
532
533            return jobProgressInPercent;
534        }
535
536        /// <summary>
537        /// returns the estimated end time (correlation between Start Time, Total amount of jobs and finished jobs).
538        /// When no job is finished yet, it returns an empty timespan
539        /// </summary>
540        /// <returns></returns>
541        public DateTime EstimatedEndTime()
542        {
543            DateTime retTime = DateTime.MaxValue;
544            if ((long)this.distributableJobControl.FinishedAmount > 0)
545            {
546                TimeSpan bruteforcingTime = DateTime.Now.Subtract(this.StartWorkingTime);
547                double jobsPerSecond = bruteforcingTime.TotalSeconds / (long)this.distributableJobControl.FinishedAmount;
548                double restSeconds = jobsPerSecond * 
549                    (long)(this.distributableJobControl.TotalAmount - this.distributableJobControl.FinishedAmount);
550                //retTime.TotalSeconds = jobsPerSecond * (2 - (progressInPercent / 100));
551                retTime = DateTime.Now.AddSeconds(restSeconds);
552            }
553            return retTime;
554        }
555
556        #region Forward PeerManagement Values
557
558        public int FreeWorkers() { return ((WorkersManagement)peerManagement).GetFreeWorkersAmount(); }
559        public int BusyWorkers() { return ((WorkersManagement)peerManagement).GetBusyWorkersAmount(); }
560
561        #endregion
562    }
563}
Note: See TracBrowser for help on using the repository browser.