source: trunk/CrypPlugins/PeerToPeerWorker_NEW/P2PJobAdminBase.cs @ 1348

Last change on this file since 1348 was 1348, checked in by arnold, 12 years ago

P2PManager/Worker: Manager sends a "no more jobs left" message to the requesting worker or after receiving the last job result to all active registered workers. After receiving this message, the workers stop their "waiting for new jobs" timer, which sends "free worker" messages to the Manager periodically.
For future use: After worker has received the "no more jobs left" message from the Manager, it should register with a new Manager to computate more jobs.

File size: 15.3 KB
Line 
1/* Copyright 2010 Team CrypTool (Christian Arnold), Uni Duisburg-Essen
2
3   Licensed under the Apache License, Version 2.0 (the "License");
4   you may not use this file except in compliance with the License.
5   You may obtain a copy of the License at
6
7       http://www.apache.org/licenses/LICENSE-2.0
8
9   Unless required by applicable law or agreed to in writing, software
10   distributed under the License is distributed on an "AS IS" BASIS,
11   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12   See the License for the specific language governing permissions and
13   limitations under the License.
14*/
15
16using System;
17using System.Collections.Generic;
18using System.Linq;
19using System.Text;
20using Cryptool.PluginBase.Control;
21using Cryptool.PluginBase;
22using Cryptool.PluginBase.Miscellaneous;
23using Cryptool.Plugins.PeerToPeer.Jobs;
24using System.Timers;
25
26/* TWO DIFFERENT STOPPING CASE:
27 * 1) When P2P-Admin is stopped, deregister WorkerControl-Events, so
28 *    the (unfinished) JobPart-Result won't be sent to the Manager
29 *    - Copy registering the WorkerControl-Events to "StartWorkerControl"
30 *    - Unregister WorkerControl-Events in "StopWorkerControl"
31 *   
32 * 2) When P2P-Admin is stopped, the WorkerControl sends the (unfinished)
33 *    JobPart-Result to the Manager (the Manager can handle this case without
34 *    any problem).
35 *    - Copy registering the WorkerControl-Events to the constructor
36 *    - Comment the unregistering of the WorkerControl-Events in the
37 *      "StopWorkerControl" method
38 */
39
40namespace Cryptool.Plugins.PeerToPeer
41{
42    public class P2PJobAdminBase : P2PSubscriberBase
43    {
44        #region Events for PlugIn-Color-Status
45
46        public delegate void StartWorking();
47        public delegate void SuccessfullyEnded();
48        public delegate void CanceledWorking();
49        public delegate void WorkerStopped();
50
51        public event StartWorking OnStartWorking;
52        public event SuccessfullyEnded OnSuccessfullyEnded;
53        public event CanceledWorking OnCanceledWorking;
54        public event WorkerStopped OnWorkerStopped;
55
56        #endregion
57
58        private IControlWorker workerControl;
59        private static int WAITING_FOR_JOBS = 5000;
60
61        /// <summary>
62        /// if worker sends a "free" msg to the manager, but it doesn't react on this,
63        /// although there are some jobs to distribute...
64        /// </summary>
65        Timer timerWaitingForJobs = new Timer(WAITING_FOR_JOBS);
66
67        /// <summary>
68        /// if more than one job arrived at the same time, buffer it in this dictionary
69        /// (can't happen under normal circumstances, but when it happens, we are prepared)
70        /// </summary>
71        Stack<byte[]> waitingJobStack;
72        private bool isWorking = false;
73        public bool IsWorking
74        {
75            get { return this.isWorking; }
76            private set { this.isWorking = value; }
77        }
78
79        private bool actualMngrHasNoMoreJobsLeft = false;
80        public bool ActualMngrHasNoMoreJobsLeft
81        {
82            get { return this.actualMngrHasNoMoreJobsLeft; }
83            private set{ this.actualMngrHasNoMoreJobsLeft = value;} 
84        }
85
86        ///// <summary>
87        ///// everytime, when start processing a new job, set
88        ///// its jobId in this variable, so processing the
89        ///// same job several times can be avoided
90        ///// </summary>
91        //private BigInteger actualJobId = null;
92
93        #region Constructor and Event Handling
94
95        public P2PJobAdminBase(IP2PControl p2pControl, IControlWorker controlWorker) : base(p2pControl)
96        {
97            this.timerWaitingForJobs.Elapsed += new ElapsedEventHandler(timerWaitingForJobs_Elapsed);
98            this.workerControl = controlWorker;
99
100            // see comment above, to know why the following lines are commented
101            //this.workerControl.OnProcessingCanceled += new ProcessingCanceled(workerControl_OnProcessingCanceled);
102            //this.workerControl.OnProcessingSuccessfullyEnded += new ProcessingSuccessfullyEnded(workerControl_OnProcessingSuccessfullyEnded);
103            //this.workerControl.OnInfoTextReceived += new InfoText(workerControl_OnInfoTextReceived);
104
105            this.waitingJobStack = new Stack<byte[]>();
106        }
107
108        void workerControl_OnInfoTextReceived(string sText, NotificationLevel notLevel)
109        {
110            base.GuiLogging(sText, notLevel);
111        }
112
113        public void StartWorkerControl(string sTopicName, long lCheckPublishersAvailability, long lPublishersReplyTimespan)
114        {
115            if (!base.Started)
116            {
117                // see comment above, to know why the following lines are uncommented
118                this.workerControl.OnProcessingCanceled += new ProcessingCanceled(workerControl_OnProcessingCanceled);
119                this.workerControl.OnProcessingSuccessfullyEnded += new ProcessingSuccessfullyEnded(workerControl_OnProcessingSuccessfullyEnded);
120                this.workerControl.OnInfoTextReceived += new InfoText(workerControl_OnInfoTextReceived);
121
122                // starts subscriber
123                base.Start(sTopicName, lCheckPublishersAvailability, lPublishersReplyTimespan);
124            }
125            else
126                base.GuiLogging("P2PJobAdmin is already started.", NotificationLevel.Info);
127        }
128
129        public void StopWorkerControl(PubSubMessageType msgType)
130        {
131            this.IsWorking = false;
132
133            if (base.Started)
134            {
135                // see comment above, to know why the following lines are uncommented
136                this.workerControl.OnProcessingCanceled -= workerControl_OnProcessingCanceled;
137                this.workerControl.OnProcessingSuccessfullyEnded -= workerControl_OnProcessingSuccessfullyEnded;
138                this.workerControl.OnInfoTextReceived -= workerControl_OnInfoTextReceived;
139
140                this.timerWaitingForJobs.Stop();
141
142                base.Stop(msgType);
143
144                // delete the waiting Job List, so after re-registering, this worker
145                // will process the new incoming jobs and not old jobs, which were
146                // already pushed to the global Job List of the Manager after receiving
147                // the unregister message from this worker.
148                if(this.waitingJobStack != null && this.waitingJobStack.Count > 0)
149                    this.waitingJobStack.Clear();
150
151                if (this.workerControl != null)
152                    this.workerControl.StopProcessing();
153                GuiLogging("P2P-Job-Admin is successfully stopped (Unregistering with Manager, Processing of the Worker is stopped)",NotificationLevel.Info);
154            }
155            else
156                GuiLogging("P2P-Job-Admin isn't started yet. So stopping-events won't be executed.", NotificationLevel.Info);
157        }
158
159        void P2PWorker_OnReceivedStopMessageFromPublisher(PubSubMessageType stopType, string sData)
160        {
161            switch (stopType)
162            {
163                case PubSubMessageType.Stop:
164                case PubSubMessageType.Unregister:
165                    if (OnWorkerStopped != null)
166                        OnWorkerStopped();
167                    break;
168                case PubSubMessageType.Solution:
169                    if (OnSuccessfullyEnded != null)
170                        OnSuccessfullyEnded();
171                    break;
172                default:
173                    break;
174            }
175        }
176
177        #endregion
178
179        #region THE RELEVANT PART
180
181        // this method only processes Payload Data, Internal-organisation Data will be handled internally!
182        protected override void HandleIncomingData(PeerId senderId, byte[] data)
183        {
184            if (JobMessages.GetMessageJobType(data[0]) == MessageJobType.JobPart)
185            {
186                //added by Arnold 2010.03.22
187                this.timerWaitingForJobs.Stop(); //when receiving a new job, time can be stopped
188
189                BigInteger jobId = null;
190                GuiLogging("Received a JobPart from '" + senderId.ToString() + "'", NotificationLevel.Debug);
191                byte[] serializedRawJobPartData = JobMessages.GetJobPartMessage(data, out jobId);
192                StartProcessing(senderId, serializedRawJobPartData);
193            }
194            else if (JobMessages.GetMessageJobType(data[0]) == MessageJobType.NoMoreJobsLeft)
195            {
196                this.ActualMngrHasNoMoreJobsLeft = true;
197                this.timerWaitingForJobs.Stop();
198                GuiLogging("Received 'no more jobs left' message from the Manager. Stopped waiting for jobs timer.", NotificationLevel.Debug);
199                // TODO: for future use maybe register to another task, because this task has no more
200                //       jobs to computate --> after registering with a new Manager, set this.ActualMngrHasNoMoreJobsLeft to true!!!
201            }
202            else
203            {
204                GuiLogging("Received some strange data (no JobPart) from peer '" + senderId.ToString()
205                    + "'. Data: " + Encoding.UTF8.GetString(data), NotificationLevel.Debug);
206            }
207        }
208
209        /// <summary>
210        /// Starts processing the incoming JobPart, when the waiting Stack has no elements
211        /// (it will be priorized).
212        /// </summary>
213        /// <param name="senderId">Sender Id</param>
214        /// <param name="data">already DistributableJob-"unpacked" serialized JobPart data</param>
215        private void StartProcessing(PeerId senderId, byte[] data)
216        {
217            if (this.workerControl == null) // eventually check if sender is the actual publisher, too...
218            {
219                GuiLogging("Processing a new job isn't possible, because IWorkerControl isn't initialized yet.", NotificationLevel.Info);
220                return;
221            }
222
223            BigInteger jobId;           
224
225            if (this.IsWorking) // if it's still working, add Job to a waiting stack
226            {               
227                this.waitingJobStack.Push(data);
228                GuiLogging("New incoming job will be pushed to the 'waitingJobStack', because the Worker is still processing a job.", NotificationLevel.Debug);
229            }
230            else
231            {
232                this.IsWorking = true;
233                bool result = this.workerControl.StartProcessing(data, out jobId);
234
235                // visualize Working-Status in PlugIn, if it has registered with this event
236                if(result)
237                {
238                    if (OnStartWorking != null)
239                        OnStartWorking();
240                }
241
242                byte[] jobAcceptanceStatus = JobMessages.CreateJobAcceptanceMessage(jobId, result);
243                this.p2pControl.SendToPeer(jobAcceptanceStatus, senderId);
244
245                this.IsWorking = result;
246
247                GuiLogging("Processing started? " + result + ". JobId: " + jobId.ToString(), NotificationLevel.Debug);
248            }
249        }
250
251        /// <summary>
252        /// When IWorkerControl throws this event, processing is (successfully) ended. So send the
253        /// Result to the Manager and decide whether to process a "stacked" job or ask for new Jobs
254        /// </summary>
255        /// <param name="jobId">JobId of the (successfully) ended Result</param>
256        /// <param name="result">serialized Result data</param>
257        private void workerControl_OnProcessingSuccessfullyEnded(BigInteger jobId, byte[] result)
258        {
259            //GuiLogging("Sending job result to Manager. JobId: " + jobId.ToString() + ". Mngr-Id: '" + base.ActualPublisher.ToString() + "'.", NotificationLevel.Info);
260            GuiLogging("Sending job result to Manager. JobId: " + jobId.ToString() + ". Mngr-Id: '" + base.ActualPublisher.ToString() + "'.", NotificationLevel.Info);
261            this.p2pControl.SendToPeer(JobMessages.CreateJobResultMessage(jobId, result), base.ActualPublisher);
262
263            // set working flag to false, so processing a new job is possible
264            this.IsWorking = false;
265            // visualizes the Working-Status, if PlugIn registered with this event
266            if (OnSuccessfullyEnded != null)
267                OnSuccessfullyEnded();
268
269            CheckIfAnyJobsLeft();   
270        }
271
272        private void CheckIfAnyJobsLeft()
273        {
274            if (this.waitingJobStack.Count > 0)
275            {
276                GuiLogging("There's a job in the 'waitingJob'-Stack, so process it before processing new incoming jobs from the Manager.", NotificationLevel.Info);
277                StartProcessing(base.ActualPublisher, this.waitingJobStack.Pop());
278            }
279            else
280            {
281                if (!this.ActualMngrHasNoMoreJobsLeft)
282                {
283                    // no more jobs in the waiting stack, so send Mngr the information, that Worker is waiting for new jobs now
284                    this.p2pControl.SendToPeer(JobMessages.CreateFreeWorkerStatusMessage(true), base.ActualPublisher);
285                    GuiLogging("No jobs in the 'waitingJob'-Stack, so send 'free'-information to the Manager. Mngr-Id: '" + base.ActualPublisher.ToString() + "'.", NotificationLevel.Info);
286                    // If this timer elapses, it will check if the isWorking flag is true. Than it will stop the timer.
287                    // Otherwise it will send a new free msg to the Manager, if the last free msg got lost
288                    this.timerWaitingForJobs.Start();
289                }
290                else
291                {
292                    GuiLogging("Worker has noticed that Manager has no more jobs left.", NotificationLevel.Debug);
293                    // TODO: for future use maybe register to another task, because this task has no more
294                    //       jobs to computate --> after registering with a new Manager, set this.ActualMngrHasNoMoreJobsLeft to true!!!
295                }
296            }
297        }
298
299        // Added by Arnold - 2010.03.22
300        /// <summary>
301        /// this method is only necessary when the regular free message got lost on the way to the manager.
302        /// In this case the Worker won't get any more jobs, so it sends a free message
303        /// </summary>
304        /// <param name="sender"></param>
305        /// <param name="e"></param>
306        void timerWaitingForJobs_Elapsed(object sender, ElapsedEventArgs e)
307        {
308            if (!isWorking)
309            {
310                this.timerWaitingForJobs.Interval += 2000; // every time this event is thrown, heighten the timer interval
311                base.SendRegMsg();
312                this.p2pControl.SendToPeer(JobMessages.CreateFreeWorkerStatusMessage(true), base.ActualPublisher);
313                GuiLogging("Because the last 'free worker'-Message got lost, try again.", NotificationLevel.Info);
314            }
315            else
316            {
317                // reset timer interval - it could be heighten in the if brace...
318                this.timerWaitingForJobs.Interval = WAITING_FOR_JOBS;
319                // when Worker is working, than the time can be stopped
320                this.timerWaitingForJobs.Stop();
321            }
322        }
323
324        void workerControl_OnProcessingCanceled(byte[] result)
325        {
326            if (OnCanceledWorking != null)
327                OnCanceledWorking();
328            CheckIfAnyJobsLeft();
329        }
330
331        #endregion
332    }
333}
Note: See TracBrowser for help on using the repository browser.