source: trunk/CrypPlugins/PeerToPeerManager_NEW/P2PManagerBase_NEW.cs @ 1144

Last change on this file since 1144 was 1144, checked in by arnold, 12 years ago

P2PManager/P2PJobAdmin: Fine-tuning of leaving and re-joining the network.
Implemented miscellaneous null checks, so that some P2P-side errors are now caught.

File size: 20.8 KB
Line 
1/* Copyright 2010 Team CrypTool (Christian Arnold), Uni Duisburg-Essen
2
3   Licensed under the Apache License, Version 2.0 (the "License");
4   you may not use this file except in compliance with the License.
5   You may obtain a copy of the License at
6
7       http://www.apache.org/licenses/LICENSE-2.0
8
9   Unless required by applicable law or agreed to in writing, software
10   distributed under the License is distributed on an "AS IS" BASIS,
11   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12   See the License for the specific language governing permissions and
13   limitations under the License.
14*/
15
16using System;
17using System.Collections.Generic;
18using System.Linq;
19using System.Text;
20using Cryptool.PluginBase.Control;
21using Cryptool.Plugins.PeerToPeer.Jobs;
22using Cryptool.PluginBase;
23using Cryptool.PluginBase.Miscellaneous;
24
25/* TODO:
26 * - Make Publisher-change possible ((de)serialize subscribers into DHT)
27 * - Make Manager-change possible
28 * - Benchmarking the working peers
29 *   (this.distributableJobControl.SetResult() returns the TimeSpan for the result)
30 */
31
namespace Cryptool.Plugins.PeerToPeer
{
    /// <summary>
    /// Publisher-side manager of a distributable job. It hands out job parts from an
    /// IDistributableJob to free workers, keeps track of jobs that are waiting for an
    /// acceptance answer or are in progress, collects job results and pushes the jobs
    /// of leaving workers back onto the job stack.
    /// </summary>
    public class P2PManagerBase_NEW : P2PPublisherBase
    {
        #region Events and Delegates

        public delegate void ProcessProgress(double progressInPercent);
        public delegate void NewJobAllocated(BigInteger jobId);
        public delegate void ResultReceived(BigInteger jobId);
        public delegate void NoMoreJobsLeft();
        public delegate void AllJobResultsReceived(BigInteger lastJobId);

        /// <summary>
        /// Thrown every time the overall progress is recalculated
        /// (value between 0 and 100, see GetProgressInformation).
        /// </summary>
        public event ProcessProgress OnProcessProgress;
        /// <summary>
        /// When a new job was successfully allocated to a worker (after receiving
        /// its "JobAccepted"-Message), this event is thrown. It is thrown only once
        /// per accepted job, not when the job is merely sent to the worker.
        /// </summary>
        public event NewJobAllocated OnNewJobAllocated;
        /// <summary>
        /// When a new job result was received (and handed to the distributable job)
        /// this event is thrown
        /// </summary>
        public event ResultReceived OnResultReceived;
        /// <summary>
        /// When the last job from the DistributableJob-Stack is allocated, but
        /// the Manager is still waiting for some JobResults this event is thrown
        /// </summary>
        public event NoMoreJobsLeft OnNoMoreJobsLeft;
        /// <summary>
        /// Intended to be thrown when no more jobs are left AND the last outstanding
        /// JobResult comes in.
        /// NOTE(review): a field-like event can only be raised inside its declaring
        /// class, and this class never raises it at present - confirm intended behavior.
        /// </summary>
        public event AllJobResultsReceived OnAllJobResultsReceived;

        #endregion

        #region Variables

        /// <summary>
        /// this control contains a JobStack and other special
        /// management for a SPECIAL distributable Job
        /// </summary>
        private readonly IDistributableJob distributableJobControl;
        /// <summary>
        /// this dict contains all jobs which were sent to workers, but for which
        /// the workers haven't accepted/declined the job yet (jobId -> worker).
        /// Access is synchronized by locking the dictionary itself.
        /// </summary>
        private readonly Dictionary<BigInteger, PeerId> jobsWaitingForAcceptanceInfo;
        /// <summary>
        /// this dict contains all jobs/workers which were successfully distributed
        /// (so the manager already received a JobAccepted Msg), jobId -> worker.
        /// Access is synchronized by locking the dictionary itself.
        /// </summary>
        private readonly Dictionary<BigInteger, PeerId> jobsInProgress;

        private bool managerStarted = false;
        /// <summary>
        /// True between PeerCompletelyStarted and Stop.
        /// </summary>
        public bool ManagerStarted
        {
            get { return this.managerStarted; }
            private set { this.managerStarted = value; }
        }

        /// <summary>
        /// When the Manager is started, this variable must be set.
        /// </summary>
        private string sTopic = String.Empty;
        public string TopicName
        {
            get { return this.sTopic; }
            private set { this.sTopic = value; }
        }

        private long lAliveMessageInterval;
        /// <summary>
        /// Alive-message interval handed to the base publisher on start.
        /// (The misspelled property name is kept for backward compatibility.)
        /// </summary>
        public long AliveMesageInterval
        {
            get { return this.lAliveMessageInterval; }
            set { this.lAliveMessageInterval = value; }
        }

        #endregion

        /// <summary>
        /// Creates a manager for the given distributable job. Null arguments are
        /// accepted here; StartManager refuses to start while either is missing.
        /// </summary>
        /// <param name="p2pControl">connection to the P2P peer plugin</param>
        /// <param name="distributableJob">job whose parts are distributed to the workers</param>
        public P2PManagerBase_NEW(IP2PControl p2pControl, IDistributableJob distributableJob) : base(p2pControl)
        {
            this.distributableJobControl = distributableJob;

            this.jobsWaitingForAcceptanceInfo = new Dictionary<BigInteger, PeerId>();
            this.jobsInProgress = new Dictionary<BigInteger, PeerId>();
        }

        /// <summary>
        /// Starts the manager (publisher) on the given topic, but only when both the
        /// P2P control and the distributable job are available; otherwise a warning is logged.
        /// </summary>
        /// <param name="sTopic">topic name of the job network</param>
        /// <param name="aliveMessageInterval">alive-message interval passed to base.Start</param>
        public void StartManager(string sTopic, long aliveMessageInterval)
        {
            // only when the main manager plugin is connected with a Peer-PlugIn
            // and a IWorkerControl-PlugIn, this Manager can start its work
            if (this.distributableJobControl != null && this.p2pControl != null)
            {
                this.TopicName = sTopic;
                this.AliveMesageInterval = aliveMessageInterval;
                base.Start(this.TopicName, this.AliveMesageInterval);
            }
            else
            {
                GuiLogging("Manager can't be started, because P2P-Peer- or Distributable-Job-PlugIn isn't connected with the Manager or the connection is broken...", NotificationLevel.Warning);
            }
        }

        /// <summary>
        /// Marks the manager as started once the base publisher is completely up
        /// and publishes an initial progress value.
        /// </summary>
        protected override void PeerCompletelyStarted()
        {
            base.PeerCompletelyStarted();

            this.ManagerStarted = true;
            GetProgressInformation();
            GuiLogging("P2PManager is started right now.", NotificationLevel.Info);
        }

        /// <summary>
        /// Stops the base publisher, marks the manager as stopped and detaches the
        /// worker-management event handlers.
        /// </summary>
        /// <param name="msgType">message type forwarded to base.Stop</param>
        public override void Stop(PubSubMessageType msgType)
        {
            base.Stop(msgType);

            this.ManagerStarted = false;

            // peerManagement is created in AssignManagement; guard against Stop being
            // called before that ever happened (the original cast threw a NullReferenceException)
            WorkersManagement workers = this.peerManagement as WorkersManagement;
            if (workers != null)
            {
                workers.OnFreeWorkersAvailable -= peerManagement_OnFreeWorkersAvailable;
                workers.OnSubscriberRemoved -= peerManagement_OnSubscriberRemoved;
            }

            GuiLogging("P2PManager was stopped successully.", NotificationLevel.Info);
        }

        /// <summary>
        /// because the manager needs additional peer information for all workers,
        /// this method is overwritten. WorkersManagement throws events, when
        /// a Worker leaves or joins the "solution network", so we can re-add or
        /// allocate a job.
        /// </summary>
        /// <param name="aliveMessageInterval">interval handed to the WorkersManagement</param>
        protected override void AssignManagement(long aliveMessageInterval)
        {
            WorkersManagement workers = new WorkersManagement(aliveMessageInterval);
            this.peerManagement = workers;
            workers.OnSubscriberRemoved += new SubscriberManagement.SubscriberRemoved(peerManagement_OnSubscriberRemoved);
            // waiting for new workers joining the manager or already joined worker, who were set to "free" again
            workers.OnFreeWorkersAvailable += new WorkersManagement.FreeWorkersAvailable(peerManagement_OnFreeWorkersAvailable);
        }

        /// <summary>
        /// only accepts DistributableJob-specific messages (created, checked and transformed by
        /// the static class JobMessages). All other messages, including empty ones, are dropped!
        /// </summary>
        /// <param name="sender">peer that sent the message</param>
        /// <param name="data">raw payload; the first byte encodes the job message type</param>
        protected override void p2pControl_OnPayloadMessageReceived(PeerId sender, byte[] data)
        {
            // the message type is read from data[0], so an empty payload can't be classified
            if (data == null || data.Length == 0)
            {
                GuiLogging("Received an empty message from Peer '" + sender + "'. Message dropped.", NotificationLevel.Debug);
                return;
            }
            if (!JobMessages.IsJobMessageType(data[0]))
            {
                GuiLogging("Received an undefined message (not a job accepted message or a job result).", NotificationLevel.Debug);
                return;
            }
            switch (JobMessages.GetMessageJobType(data[0]))
            {
                case MessageJobType.JobAcceptanceInfo:
                    HandleJobAcceptanceMessage(sender, data);
                    break;
                case MessageJobType.JobResult:
                    GuiLogging("Received JobResult message from Peer '" + sender.ToString() + "'. Beginning to set result now.", NotificationLevel.Debug);
                    HandleJobResultMessage(sender, data);
                    break;
                case MessageJobType.Free:
                    HandleFreeMessage(sender, data);
                    break;
                default:
                    GuiLogging("Obscure Message (" + Encoding.UTF8.GetString(data) + ") received from '" + sender.ToString() + "'.", NotificationLevel.Info);
                    break;
            } // end switch
            GetProgressInformation();
        }

        /// <summary>
        /// This method is only overwritten because we have to ignore the Solution-case in
        /// the System-Message-Handling (a Peer mustn't send a Solution message, which influences
        /// the working status of the Manager, because it hasn't the overview of the JobParts)
        /// </summary>
        /// <param name="sender">peer that sent the message</param>
        /// <param name="msgType">system message type</param>
        protected override void p2pControl_OnSystemMessageReceived(PeerId sender, PubSubMessageType msgType)
        {
            // ignore Solution case, because other worker could work on...
            if (msgType != PubSubMessageType.Solution)
            {
                // base class handles all administration cases (register, alive, unregister, ping, pong, ...)
                base.p2pControl_OnSystemMessageReceived(sender, msgType);
            }
        }

        #region Handle different DistributableJob-specific, incoming messages

        /// <summary>
        /// Handles the two job-acceptance cases (accepted or declined). Adds accepted jobs
        /// to the "inProgress" Dictionary, sets a busy declined worker to free (when message
        /// is JobDeclined) and removes the job in every case from the waitingForAcceptance list
        /// </summary>
        /// <param name="sender">worker that answered</param>
        /// <param name="data">raw JobAcceptanceInfo message</param>
        private void HandleJobAcceptanceMessage(PeerId sender, byte[] data)
        {
            BigInteger jobId = null;
            if (JobMessages.GetJobAcceptanceMessage(data, out jobId))
            {
                this.distributableJobControl.JobAccepted(jobId);
                bool newlyAccepted = false;
                lock (this.jobsInProgress)
                {
                    // a repeated JobAccepted message for a job already in progress is ignored
                    if (!this.jobsInProgress.ContainsKey(jobId))
                    {
                        // add to jobs in progress, because P2PJobAdmin has accepted the job!
                        this.jobsInProgress.Add(jobId, sender);
                        newlyAccepted = true;
                    }
                }
                RemoveFromWaitingList(jobId);

                // raised outside the lock, so subscribers can't block the job bookkeeping
                NewJobAllocated allocatedHandler = OnNewJobAllocated;
                if (newlyAccepted && allocatedHandler != null)
                    allocatedHandler(jobId);

                GuiLogging("JobId '" + jobId.ToString() + "' was accepted by Peer '" + sender.ToString() + "'.", NotificationLevel.Info);
            }
            else // if AcceptanceInfo is declined
            {
                this.distributableJobControl.JobDeclined(jobId);

                // remove the job from the waiting list BEFORE freeing the worker: freeing may
                // (via OnFreeWorkersAvailable) allocate jobs again, possibly the same job id
                RemoveFromWaitingList(jobId);

                // set busy worker to free, because he declined the job
                // TODO: maybe create a "black list" for peers, who had declined this kind of Job twice...
                ((WorkersManagement)this.peerManagement).SetBusyWorkerToFree(sender);
                GuiLogging("JobId '" + jobId.ToString() + "' was declined by Peer '" + sender.ToString() + "'.", NotificationLevel.Info);
            }
        }

        /// <summary>
        /// Removes the job from the "waiting for acceptance" dictionary; a missing entry is ignored.
        /// </summary>
        /// <param name="jobId">job to remove</param>
        private void RemoveFromWaitingList(BigInteger jobId)
        {
            lock (this.jobsWaitingForAcceptanceInfo)
            {
                this.jobsWaitingForAcceptanceInfo.Remove(jobId);
            }
        }

        /// <summary>
        /// Sets the incoming result in the DistributableJob class, removes the job from
        /// the JobsInProgress Dictionary and throws the OnResultReceived event
        /// </summary>
        /// <param name="sender">worker that sent the result</param>
        /// <param name="data">raw JobResult message</param>
        private void HandleJobResultMessage(PeerId sender, byte[] data)
        {
            BigInteger jobId;

            byte[] serializedJobResult = JobMessages.GetJobResult(data, out jobId);
            TimeSpan jobProcessingTime = this.distributableJobControl.SetResult(jobId, serializedJobResult);

            ResultReceived resultHandler = OnResultReceived;
            if (resultHandler != null)
                resultHandler(jobId);

            GuiLogging("JobResult for Job '" + jobId.ToString() + "' received. Processing Time: "
                + jobProcessingTime.TotalMinutes.ToString() + " minutes. Worker-Id: '" + sender.ToString() + "'.", NotificationLevel.Info);

            lock (this.jobsInProgress)
            {
                // Remove tolerates a missing key: P2PJobAdmin may send the result message twice
                this.jobsInProgress.Remove(jobId);
            }
        }

        /// <summary>
        /// If message content declares the sender as a free worker,
        /// set this worker from busy to free, otherwise do nothing
        /// </summary>
        /// <param name="sender">worker that sent the message</param>
        /// <param name="data">raw Free message</param>
        private void HandleFreeMessage(PeerId sender, byte[] data)
        {
            // only handle the "true"-case, because otherwise there is nothing to do
            if (JobMessages.GetFreeWorkerStatusMessage(data))
            {
                GuiLogging("Received a 'free'-message from Peer '" + sender.ToString() + "'.", NotificationLevel.Debug);
                // only if worker already exists in the "busy list", it will set to free and event will be thrown
                ((WorkersManagement)this.peerManagement).SetBusyWorkerToFree(sender);
            }
        }

        #endregion

        #region Worker-action-handling

        /// <summary>
        /// every time when new workers are available, continue distribution of Jobs (if any JobParts left).
        /// If the manager isn't started, the publisher's DHT entries are removed instead.
        /// </summary>
        /// <exception cref="InvalidOperationException">the manager isn't started and the DHT entries couldn't be removed</exception>
        private void peerManagement_OnFreeWorkersAvailable()
        {
            if (!this.ManagerStarted)
            {
                GuiLogging("Manager isn't started at present, so I can't disperse the patterns.", NotificationLevel.Error);
                bool removeSettings = DHT_CommonManagement.DeleteAllPublishersEntries(ref this.p2pControl, this.TopicName);
                if (removeSettings)
                    GuiLogging("Manager is stopped, but DHT entries were still existing, so they were deleted!", NotificationLevel.Info);
                else
                    throw new InvalidOperationException("Critical error in P2PManager. Manager isn't started yet, but the workers can register... Even removing DHT entries weren't possible...");
            }
            else
            {
                AllocateJobs();
            }

            GetProgressInformation();
        }

        /// <summary>
        /// When a Worker leaves the network, its (maybe) allocated JobParts have to
        /// be pushed back to the main Jobstack, and jobs still waiting for its
        /// acceptance answer are treated as declined.
        /// </summary>
        /// <param name="peerId">the removed worker</param>
        private void peerManagement_OnSubscriberRemoved(PeerId peerId)
        {
            GuiLogging("REMOVED worker " + peerId, NotificationLevel.Info);

            // remove the "jobs in progress" entries of the peer under the same lock the
            // message handlers use, then push the jobs back outside of the lock
            List<BigInteger> jobsOfRemovedPeer;
            lock (this.jobsInProgress)
            {
                jobsOfRemovedPeer = (from entry in this.jobsInProgress where entry.Value == peerId select entry.Key).ToList<BigInteger>();
                foreach (BigInteger jobId in jobsOfRemovedPeer)
                    this.jobsInProgress.Remove(jobId);
            }
            foreach (BigInteger jobId in jobsOfRemovedPeer)
            {
                this.distributableJobControl.Push(jobId);
                GuiLogging("Pushed job '" + jobId.ToString() + "' back to the stack, because peer left the network.", NotificationLevel.Debug);
            }

            // Set the JobDeclined-status for all jobs of the removed peer, which are still waiting
            // for an acceptance information, and remove them from the "waiting" dictionary
            List<BigInteger> waitingJobsOfRemovedPeer;
            lock (this.jobsWaitingForAcceptanceInfo)
            {
                waitingJobsOfRemovedPeer = (from entry in this.jobsWaitingForAcceptanceInfo where entry.Value == peerId select entry.Key).ToList<BigInteger>();
                foreach (BigInteger jobId in waitingJobsOfRemovedPeer)
                    this.jobsWaitingForAcceptanceInfo.Remove(jobId);
            }
            foreach (BigInteger jobId in waitingJobsOfRemovedPeer)
            {
                this.distributableJobControl.JobDeclined(jobId);
                GuiLogging("Declined job '" + jobId.ToString() + "', because peer left the network.", NotificationLevel.Debug);
            }

            GetProgressInformation();
        }

        /// <summary>
        /// Allocates new JobParts to new registered or calling-for-jobs Workers.
        /// Additionally it adds the allocated job to a waitingForAcceptance Dictionary,
        /// so it can be checked, if the Worker respond to the Job-allocation.
        /// OnNewJobAllocated is NOT raised here; it is raised once the worker accepts the job.
        /// </summary>
        private void AllocateJobs()
        {
            WorkersManagement workers = (WorkersManagement)this.peerManagement;
            // iterate over a copy: SetFreeWorkerToBusy below may change the worker lists
            // NOTE(review): assumes GetAllSubscribers yields the currently free workers - confirm
            List<PeerId> freePeers = new List<PeerId>(workers.GetAllSubscribers());
            int allocatedJobs = 0;

            GuiLogging("Trying to allocate " + freePeers.Count + " job(s) to workers.", NotificationLevel.Debug);

            foreach (PeerId worker in freePeers)
            {
                BigInteger jobId;
                byte[] serializedNewJob = this.distributableJobControl.Pop(out jobId);
                if (serializedNewJob == null) // if this is null, there are no more JobParts on the main stack!
                {
                    // notify once and stop, instead of popping (and notifying) for every remaining worker
                    GuiLogging("No more jobs left. So wait for the last results, than close this task.", NotificationLevel.Debug);
                    NoMoreJobsLeft noMoreJobsHandler = OnNoMoreJobsLeft;
                    if (noMoreJobsHandler != null)
                        noMoreJobsHandler();
                    break;
                }

                // register before sending, so a fast answer of the worker finds the entry;
                // the indexer (instead of Add) doesn't throw if the id is somehow still registered
                lock (this.jobsWaitingForAcceptanceInfo)
                {
                    this.jobsWaitingForAcceptanceInfo[jobId] = worker;
                }
                // get actual subscriber/worker and send the new job
                base.p2pControl.SendToPeer(JobMessages.CreateJobPartMessage(jobId, serializedNewJob), worker);

                // set free worker to busy in the peerManagement class
                workers.SetFreeWorkerToBusy(worker);

                GuiLogging("Job '" + jobId.ToString() + "' were sent to worker id '" + worker.ToString() + "'", NotificationLevel.Info);
                allocatedJobs++;
            } // end foreach

            GuiLogging(allocatedJobs + " Job(s) allocated to worker(s).", NotificationLevel.Debug);
        }

        #endregion

        /// <summary>
        /// returns the percental progress information of the whole job (value is between 0 and 100)
        /// and throws OnProcessProgress with that value. Allocated jobs are weighted with 30%,
        /// finished jobs with 100%.
        /// </summary>
        /// <returns>the percental progress information of the whole job</returns>
        private double GetProgressInformation()
        {
            double finishedAmount = (double)this.distributableJobControl.FinishedAmount.LongValue();
            double allocatedAmount = (double)this.distributableJobControl.AllocatedAmount.LongValue();
            double totalAmount = (double)this.distributableJobControl.TotalAmount.LongValue();

            double jobProgressInPercent = 0.0;
            // without a positive total the ratios would be NaN/Infinity
            if (totalAmount > 0)
            {
                jobProgressInPercent = 30 * (Math.Max(allocatedAmount, 0.0) / totalAmount)
                    + 100 * (Math.Max(finishedAmount, 0.0) / totalAmount);
                // clamp: the weighted sum can exceed 100 if AllocatedAmount also counts finished jobs
                jobProgressInPercent = Math.Min(100.0, jobProgressInPercent);
            }

            ProcessProgress progressHandler = OnProcessProgress;
            if (progressHandler != null)
                progressHandler(jobProgressInPercent);

            return jobProgressInPercent;
        }

        #region Forward PeerManagement Values

        /// <summary>
        /// Number of free workers, or 0 while no WorkersManagement has been assigned yet.
        /// </summary>
        public int FreeWorkers()
        {
            WorkersManagement workers = this.peerManagement as WorkersManagement;
            return workers != null ? workers.GetFreeWorkersAmount() : 0;
        }

        /// <summary>
        /// Number of busy workers, or 0 while no WorkersManagement has been assigned yet.
        /// </summary>
        public int BusyWorkers()
        {
            WorkersManagement workers = this.peerManagement as WorkersManagement;
            return workers != null ? workers.GetBusyWorkersAmount() : 0;
        }

        #endregion
    }
}
Note: See TracBrowser for help on using the repository browser.