source: trunk/CrypPlugins/PeerToPeerWorker_NEW/P2PJobAdminBase.cs @ 1433

Last change on this file since 1433 was 1433, checked in by Paul Lelgemann, 12 years ago

o Extracted common classes from PeerToPeerBase plugin into new PeerToPeer plugin as a preparation for the new P2P proxy
o Modified directory properties to ignore the CrypBuild directory

File size: 15.4 KB
Line 
1/* Copyright 2010 Team CrypTool (Christian Arnold), Uni Duisburg-Essen
2
3   Licensed under the Apache License, Version 2.0 (the "License");
4   you may not use this file except in compliance with the License.
5   You may obtain a copy of the License at
6
7       http://www.apache.org/licenses/LICENSE-2.0
8
9   Unless required by applicable law or agreed to in writing, software
10   distributed under the License is distributed on an "AS IS" BASIS,
11   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12   See the License for the specific language governing permissions and
13   limitations under the License.
14*/
15
16using System;
17using System.Collections.Generic;
18using System.Linq;
19using System.Text;
20using Cryptool.PluginBase.Control;
21using Cryptool.PluginBase;
22using Cryptool.PluginBase.Miscellaneous;
23using Cryptool.Plugins.PeerToPeer.Jobs;
24using System.Timers;
25using Cryptool.Plugins.PeerToPeer.Internal;
26
/* TWO DIFFERENT STOPPING CASES:
 * 1) When the P2P-Admin is stopped, deregister the WorkerControl-Events, so
 *    the (unfinished) JobPart-Result won't be sent to the Manager
 *    - Move the registration of the WorkerControl-Events to "StartWorkerControl"
 *    - Unregister the WorkerControl-Events in "StopWorkerControl"
 *
 * 2) When the P2P-Admin is stopped, the WorkerControl sends the (unfinished)
 *    JobPart-Result to the Manager (the Manager can handle this case without
 *    any problem).
 *    - Move the registration of the WorkerControl-Events to the constructor
 *    - Comment out the unregistering of the WorkerControl-Events in the
 *      "StopWorkerControl" method
 */
40
41namespace Cryptool.Plugins.PeerToPeer
42{
43    public class P2PJobAdminBase : P2PSubscriberBase
44    {
45        #region Events for PlugIn-Color-Status
46
47        public delegate void StartWorking();
48        public delegate void SuccessfullyEnded();
49        public delegate void CanceledWorking();
50        public delegate void WorkerStopped();
51
52        public event StartWorking OnStartWorking;
53        public event SuccessfullyEnded OnSuccessfullyEnded;
54        public event CanceledWorking OnCanceledWorking;
55        public event WorkerStopped OnWorkerStopped;
56
57        #endregion
58
59        private IControlWorker workerControl;
60        private static int WAITING_FOR_JOBS = 5000;
61
62        /// <summary>
63        /// if worker sends a "free" msg to the manager, but it doesn't react on this,
64        /// although there are some jobs to distribute...
65        /// </summary>
66        Timer timerWaitingForJobs = new Timer(WAITING_FOR_JOBS);
67
68        /// <summary>
69        /// if more than one job arrived at the same time, buffer it in this dictionary
70        /// (can't happen under normal circumstances, but when it happens, we are prepared)
71        /// </summary>
72        Stack<byte[]> waitingJobStack;
73        private bool isWorking = false;
74        public bool IsWorking
75        {
76            get { return this.isWorking; }
77            private set { this.isWorking = value; }
78        }
79
80        // added by Arnie 2010.04.24
81        private bool actualMngrHasNoMoreJobsLeft = false;
82        public bool ActualMngrHasNoMoreJobsLeft
83        {
84            get { return this.actualMngrHasNoMoreJobsLeft; }
85            private set{ this.actualMngrHasNoMoreJobsLeft = value;} 
86        }
87
88        ///// <summary>
89        ///// everytime, when start processing a new job, set
90        ///// its jobId in this variable, so processing the
91        ///// same job several times can be avoided
92        ///// </summary>
93        //private BigInteger actualJobId = null;
94
95        #region Constructor and Event Handling
96
97        public P2PJobAdminBase(IP2PControl p2pControl, IControlWorker controlWorker) : base(p2pControl)
98        {
99            this.timerWaitingForJobs.Elapsed += new ElapsedEventHandler(timerWaitingForJobs_Elapsed);
100            this.workerControl = controlWorker;
101
102            // see comment above, to know why the following lines are commented
103            //this.workerControl.OnProcessingCanceled += new ProcessingCanceled(workerControl_OnProcessingCanceled);
104            //this.workerControl.OnProcessingSuccessfullyEnded += new ProcessingSuccessfullyEnded(workerControl_OnProcessingSuccessfullyEnded);
105            //this.workerControl.OnInfoTextReceived += new InfoText(workerControl_OnInfoTextReceived);
106
107            this.waitingJobStack = new Stack<byte[]>();
108        }
109
110        void workerControl_OnInfoTextReceived(string sText, NotificationLevel notLevel)
111        {
112            base.GuiLogging(sText, notLevel);
113        }
114
115        public void StartWorkerControl(string sTopicName, long lCheckPublishersAvailability, long lPublishersReplyTimespan)
116        {
117            if (!base.Started)
118            {
119                // see comment above, to know why the following lines are uncommented
120                this.workerControl.OnProcessingCanceled += new ProcessingCanceled(workerControl_OnProcessingCanceled);
121                this.workerControl.OnProcessingSuccessfullyEnded += new ProcessingSuccessfullyEnded(workerControl_OnProcessingSuccessfullyEnded);
122                this.workerControl.OnInfoTextReceived += new InfoText(workerControl_OnInfoTextReceived);
123
124                // starts subscriber
125                base.Start(sTopicName, lCheckPublishersAvailability, lPublishersReplyTimespan);
126            }
127            else
128                base.GuiLogging("P2PJobAdmin is already started.", NotificationLevel.Info);
129        }
130
131        public void StopWorkerControl(PubSubMessageType msgType)
132        {
133            this.IsWorking = false;
134
135            if (base.Started)
136            {
137                // see comment above, to know why the following lines are uncommented
138                this.workerControl.OnProcessingCanceled -= workerControl_OnProcessingCanceled;
139                this.workerControl.OnProcessingSuccessfullyEnded -= workerControl_OnProcessingSuccessfullyEnded;
140                this.workerControl.OnInfoTextReceived -= workerControl_OnInfoTextReceived;
141
142                this.timerWaitingForJobs.Stop();
143
144                base.Stop(msgType);
145
146                // delete the waiting Job List, so after re-registering, this worker
147                // will process the new incoming jobs and not old jobs, which were
148                // already pushed to the global Job List of the Manager after receiving
149                // the unregister message from this worker.
150                if(this.waitingJobStack != null && this.waitingJobStack.Count > 0)
151                    this.waitingJobStack.Clear();
152
153                if (this.workerControl != null)
154                    this.workerControl.StopProcessing();
155                GuiLogging("P2P-Job-Admin is successfully stopped (Unregistering with Manager, Processing of the Worker is stopped)",NotificationLevel.Info);
156            }
157            else
158                GuiLogging("P2P-Job-Admin isn't started yet. So stopping-events won't be executed.", NotificationLevel.Info);
159        }
160
161        void P2PWorker_OnReceivedStopMessageFromPublisher(PubSubMessageType stopType, string sData)
162        {
163            switch (stopType)
164            {
165                case PubSubMessageType.Stop:
166                case PubSubMessageType.Unregister:
167                    if (OnWorkerStopped != null)
168                        OnWorkerStopped();
169                    break;
170                case PubSubMessageType.Solution:
171                    if (OnSuccessfullyEnded != null)
172                        OnSuccessfullyEnded();
173                    break;
174                default:
175                    break;
176            }
177        }
178
179        #endregion
180
181        #region THE RELEVANT PART
182
183        // this method only processes Payload Data, Internal-organisation Data will be handled internally!
184        protected override void HandleIncomingData(PeerId senderId, byte[] data)
185        {
186            if (JobMessages.GetMessageJobType(data[0]) == MessageJobType.JobPart)
187            {
188                //added by Arnold 2010.03.22
189                this.timerWaitingForJobs.Stop(); //when receiving a new job, time can be stopped
190
191                BigInteger jobId = null;
192                GuiLogging("Received a JobPart from '" + senderId.ToString() + "'", NotificationLevel.Debug);
193                byte[] serializedRawJobPartData = JobMessages.GetJobPartMessage(data, out jobId);
194                StartProcessing(senderId, serializedRawJobPartData);
195            }
196            else if (JobMessages.GetMessageJobType(data[0]) == MessageJobType.NoMoreJobsLeft)
197            {
198                this.ActualMngrHasNoMoreJobsLeft = true;
199                this.timerWaitingForJobs.Stop();
200                GuiLogging("Received 'no more jobs left' message from the Manager. Stopped waiting for jobs timer.", NotificationLevel.Debug);
201                // TODO: for future use maybe register to another task, because this task has no more
202                //       jobs to computate --> after registering with a new Manager, set this.ActualMngrHasNoMoreJobsLeft to true!!!
203            }
204            else
205            {
206                GuiLogging("Received some strange data (no JobPart) from peer '" + senderId.ToString()
207                    + "'. Data: " + Encoding.UTF8.GetString(data), NotificationLevel.Debug);
208            }
209        }
210
211        /// <summary>
212        /// Starts processing the incoming JobPart, when the waiting Stack has no elements
213        /// (it will be priorized).
214        /// </summary>
215        /// <param name="senderId">Sender Id</param>
216        /// <param name="data">already DistributableJob-"unpacked" serialized JobPart data</param>
217        private void StartProcessing(PeerId senderId, byte[] data)
218        {
219            if (this.workerControl == null) // eventually check if sender is the actual publisher, too...
220            {
221                GuiLogging("Processing a new job isn't possible, because IWorkerControl isn't initialized yet.", NotificationLevel.Info);
222                return;
223            }
224
225            BigInteger jobId;           
226
227            if (this.IsWorking) // if it's still working, add Job to a waiting stack
228            {               
229                this.waitingJobStack.Push(data);
230                GuiLogging("New incoming job will be pushed to the 'waitingJobStack', because the Worker is still processing a job.", NotificationLevel.Debug);
231            }
232            else
233            {
234                this.IsWorking = true;
235                bool result = this.workerControl.StartProcessing(data, out jobId);
236
237                // visualize Working-Status in PlugIn, if it has registered with this event
238                if(result)
239                {
240                    if (OnStartWorking != null)
241                        OnStartWorking();
242                }
243
244                byte[] jobAcceptanceStatus = JobMessages.CreateJobAcceptanceMessage(jobId, result);
245                this.p2pControl.SendToPeer(jobAcceptanceStatus, senderId);
246
247                this.IsWorking = result;
248
249                GuiLogging("Processing started? " + result + ". JobId: " + jobId.ToString(), NotificationLevel.Debug);
250            }
251        }
252
253        /// <summary>
254        /// When IWorkerControl throws this event, processing is (successfully) ended. So send the
255        /// Result to the Manager and decide whether to process a "stacked" job or ask for new Jobs
256        /// </summary>
257        /// <param name="jobId">JobId of the (successfully) ended Result</param>
258        /// <param name="result">serialized Result data</param>
259        private void workerControl_OnProcessingSuccessfullyEnded(BigInteger jobId, byte[] result)
260        {
261            //GuiLogging("Sending job result to Manager. JobId: " + jobId.ToString() + ". Mngr-Id: '" + base.ActualPublisher.ToString() + "'.", NotificationLevel.Info);
262            GuiLogging("Sending job result to Manager. JobId: " + jobId.ToString() + ". Mngr-Id: '" + base.ActualPublisher.ToString() + "'.", NotificationLevel.Info);
263            this.p2pControl.SendToPeer(JobMessages.CreateJobResultMessage(jobId, result), base.ActualPublisher);
264
265            // set working flag to false, so processing a new job is possible
266            this.IsWorking = false;
267            // visualizes the Working-Status, if PlugIn registered with this event
268            if (OnSuccessfullyEnded != null)
269                OnSuccessfullyEnded();
270
271            CheckIfAnyJobsLeft();   
272        }
273
274        private void CheckIfAnyJobsLeft()
275        {
276            if (this.waitingJobStack.Count > 0)
277            {
278                GuiLogging("There's a job in the 'waitingJob'-Stack, so process it before processing new incoming jobs from the Manager.", NotificationLevel.Info);
279                StartProcessing(base.ActualPublisher, this.waitingJobStack.Pop());
280            }
281            else
282            {
283                if (!this.ActualMngrHasNoMoreJobsLeft)
284                {
285                    // no more jobs in the waiting stack, so send Mngr the information, that Worker is waiting for new jobs now
286                    this.p2pControl.SendToPeer(JobMessages.CreateFreeWorkerStatusMessage(true), base.ActualPublisher);
287                    GuiLogging("No jobs in the 'waitingJob'-Stack, so send 'free'-information to the Manager. Mngr-Id: '" + base.ActualPublisher.ToString() + "'.", NotificationLevel.Info);
288                    // If this timer elapses, it will check if the isWorking flag is true. Than it will stop the timer.
289                    // Otherwise it will send a new free msg to the Manager, if the last free msg got lost
290                    this.timerWaitingForJobs.Start();
291                }
292                else
293                {
294                    GuiLogging("Worker has noticed that Manager has no more jobs left.", NotificationLevel.Debug);
295                    // TODO: for future use maybe register to another task, because this task has no more
296                    //       jobs to computate --> after registering with a new Manager, set this.ActualMngrHasNoMoreJobsLeft to true!!!
297                }
298            }
299        }
300
301        // Added by Arnold - 2010.03.22
302        /// <summary>
303        /// this method is only necessary when the regular free message got lost on the way to the manager.
304        /// In this case the Worker won't get any more jobs, so it sends a free message
305        /// </summary>
306        /// <param name="sender"></param>
307        /// <param name="e"></param>
308        void timerWaitingForJobs_Elapsed(object sender, ElapsedEventArgs e)
309        {
310            if (!isWorking)
311            {
312                this.timerWaitingForJobs.Interval += 2000; // every time this event is thrown, heighten the timer interval
313                base.SendRegMsg();
314                this.p2pControl.SendToPeer(JobMessages.CreateFreeWorkerStatusMessage(true), base.ActualPublisher);
315                GuiLogging("Because the last 'free worker'-Message got lost, try again.", NotificationLevel.Info);
316            }
317            else
318            {
319                // reset timer interval - it could be heighten in the if brace...
320                this.timerWaitingForJobs.Interval = WAITING_FOR_JOBS;
321                // when Worker is working, than the time can be stopped
322                this.timerWaitingForJobs.Stop();
323            }
324        }
325
326        void workerControl_OnProcessingCanceled(byte[] result)
327        {
328            if (OnCanceledWorking != null)
329                OnCanceledWorking();
330            CheckIfAnyJobsLeft();
331        }
332
333        #endregion
334    }
335}
Note: See TracBrowser for help on using the repository browser.