source: trunk/CrypPlugins/PeerToPeerWorker_NEW/P2PJobAdminBase.cs @ 1264

Last change on this file since 1264 was 1264, checked in by arnold, 12 years ago

Little changes in P2PManager and P2PWorker

File size: 13.9 KB
Line 
1/* Copyright 2010 Team CrypTool (Christian Arnold), Uni Duisburg-Essen
2
3   Licensed under the Apache License, Version 2.0 (the "License");
4   you may not use this file except in compliance with the License.
5   You may obtain a copy of the License at
6
7       http://www.apache.org/licenses/LICENSE-2.0
8
9   Unless required by applicable law or agreed to in writing, software
10   distributed under the License is distributed on an "AS IS" BASIS,
11   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12   See the License for the specific language governing permissions and
13   limitations under the License.
14*/
15
16using System;
17using System.Collections.Generic;
18using System.Linq;
19using System.Text;
20using Cryptool.PluginBase.Control;
21using Cryptool.PluginBase;
22using Cryptool.PluginBase.Miscellaneous;
23using Cryptool.Plugins.PeerToPeer.Jobs;
24using System.Timers;
25
/* TWO DIFFERENT STOPPING CASES:
 * 1) When the P2P-Admin is stopped, the WorkerControl events are deregistered,
 *    so the (unfinished) JobPart-Result won't be sent to the Manager.
 *    - Register the WorkerControl events in "StartWorkerControl"
 *    - Unregister the WorkerControl events in "StopWorkerControl"
 *
 * 2) When the P2P-Admin is stopped, the WorkerControl sends the (unfinished)
 *    JobPart-Result to the Manager (the Manager can handle this case without
 *    any problem).
 *    - Register the WorkerControl events in the constructor
 *    - Comment out the unregistering of the WorkerControl events in the
 *      "StopWorkerControl" method
 *
 * The code below implements case 1.
 */
39
namespace Cryptool.Plugins.PeerToPeer
{
    /// <summary>
    /// Subscriber-side job administration: receives JobPart messages, hands them to an
    /// <see cref="IControlWorker"/>, and reports job acceptance, job results and the
    /// "free worker" status back to the publisher (the Manager).
    /// </summary>
    public class P2PJobAdminBase : P2PSubscriberBase
    {
        #region Events for PlugIn-Color-Status

        public delegate void StartWorking();
        public delegate void SuccessfullyEnded();
        public delegate void CanceledWorking();
        public delegate void WorkerStopped();

        /// <summary>Raised after the worker accepted a job and started processing it.</summary>
        public event StartWorking OnStartWorking;
        /// <summary>Raised after a job result was sent to the Manager, or when a Solution message is handled.</summary>
        public event SuccessfullyEnded OnSuccessfullyEnded;
        /// <summary>Raised when the worker reports that processing was canceled.</summary>
        public event CanceledWorking OnCanceledWorking;
        /// <summary>Raised when a Stop or Unregister message is handled.</summary>
        public event WorkerStopped OnWorkerStopped;

        #endregion

        /// <summary>Initial interval (in milliseconds) of the "waiting for jobs" retry timer.</summary>
        private const int WAITING_FOR_JOBS = 5000;

        /// <summary>Amount (in milliseconds) by which the retry interval grows after every unanswered "free" message.</summary>
        private const int WAITING_FOR_JOBS_INCREMENT = 2000;

        private readonly IControlWorker workerControl;

        /// <summary>
        /// if worker sends a "free" msg to the manager, but it doesn't react on this,
        /// although there are some jobs to distribute, this timer re-sends the message.
        /// </summary>
        private readonly Timer timerWaitingForJobs = new Timer(WAITING_FOR_JOBS);

        /// <summary>
        /// if more than one job arrived at the same time, buffer it in this stack
        /// (can't happen under normal circumstances, but when it happens, we are prepared)
        /// </summary>
        private readonly Stack<byte[]> waitingJobStack = new Stack<byte[]>();

        // volatile: the flag is also read in the Elapsed handler of timerWaitingForJobs,
        // which System.Timers.Timer raises on a thread-pool thread when no
        // SynchronizingObject is set.
        private volatile bool isWorking = false;

        /// <summary>True while the worker is processing a job.</summary>
        public bool IsWorking
        {
            get { return this.isWorking; }
            private set { this.isWorking = value; }
        }

        #region Constructor and Event Handling

        /// <summary>
        /// Creates the job admin. The WorkerControl events are NOT registered here but in
        /// <see cref="StartWorkerControl"/> (see the comment at the top of this file about
        /// the two stopping cases).
        /// </summary>
        /// <param name="p2pControl">P2P control, passed on to the subscriber base class</param>
        /// <param name="controlWorker">worker which processes the incoming job parts</param>
        public P2PJobAdminBase(IP2PControl p2pControl, IControlWorker controlWorker) : base(p2pControl)
        {
            this.workerControl = controlWorker;
            this.timerWaitingForJobs.Elapsed += new ElapsedEventHandler(timerWaitingForJobs_Elapsed);
        }

        /// <summary>Forwards info texts of the worker to the GUI log.</summary>
        void workerControl_OnInfoTextReceived(string sText, NotificationLevel notLevel)
        {
            base.GuiLogging(sText, notLevel);
        }

        /// <summary>
        /// Registers the WorkerControl events and starts the subscriber, unless it is already started.
        /// </summary>
        /// <param name="sTopicName">passed unchanged to the subscriber's Start method</param>
        /// <param name="lCheckPublishersAvailability">passed unchanged to the subscriber's Start method</param>
        /// <param name="lPublishersReplyTimespan">passed unchanged to the subscriber's Start method</param>
        public void StartWorkerControl(string sTopicName, long lCheckPublishersAvailability, long lPublishersReplyTimespan)
        {
            if (base.Started)
            {
                base.GuiLogging("P2PJobAdmin is already started.", NotificationLevel.Info);
                return;
            }

            SubscribeWorkerControlEvents();

            // starts subscriber
            base.Start(sTopicName, lCheckPublishersAvailability, lPublishersReplyTimespan);
        }

        /// <summary>
        /// Unregisters the WorkerControl events, stops the retry timer and the subscriber,
        /// drops all buffered jobs and stops the worker. Does nothing but logging when the
        /// admin isn't started.
        /// </summary>
        /// <param name="msgType">message type passed to the subscriber's Stop method</param>
        public void StopWorkerControl(PubSubMessageType msgType)
        {
            this.IsWorking = false;

            if (!base.Started)
            {
                GuiLogging("P2P-Job-Admin isn't started yet. So stopping-events won't be executed.", NotificationLevel.Info);
                return;
            }

            // unregister first, so the result of the aborted job isn't sent to the Manager
            UnsubscribeWorkerControlEvents();

            StopWaitingTimer();

            base.Stop(msgType);

            // delete the waiting Job List, so after re-registering, this worker
            // will process the new incoming jobs and not old jobs, which were
            // already pushed to the global Job List of the Manager after receiving
            // the unregister message from this worker.
            this.waitingJobStack.Clear();

            if (this.workerControl != null)
            {
                this.workerControl.StopProcessing();
            }
            GuiLogging("P2P-Job-Admin is successfully stopped (Unregistering with Manager, Processing of the Worker is stopped)", NotificationLevel.Info);
        }

        /// <summary>
        /// Registers the handlers of this class with the worker. Any previous registration
        /// is removed first, so calling this method twice never leads to duplicate handlers.
        /// </summary>
        private void SubscribeWorkerControlEvents()
        {
            if (this.workerControl == null)
            {
                return;
            }

            UnsubscribeWorkerControlEvents();
            this.workerControl.OnProcessingCanceled += new ProcessingCanceled(workerControl_OnProcessingCanceled);
            this.workerControl.OnProcessingSuccessfullyEnded += new ProcessingSuccessfullyEnded(workerControl_OnProcessingSuccessfullyEnded);
            this.workerControl.OnInfoTextReceived += new InfoText(workerControl_OnInfoTextReceived);
        }

        /// <summary>Removes the handlers of this class from the worker (no-op when none is registered).</summary>
        private void UnsubscribeWorkerControlEvents()
        {
            if (this.workerControl == null)
            {
                return;
            }

            this.workerControl.OnProcessingCanceled -= workerControl_OnProcessingCanceled;
            this.workerControl.OnProcessingSuccessfullyEnded -= workerControl_OnProcessingSuccessfullyEnded;
            this.workerControl.OnInfoTextReceived -= workerControl_OnInfoTextReceived;
        }

        /// <summary>
        /// Stops the retry timer and resets its interval, so the back-off of an earlier
        /// waiting period doesn't leak into the next one.
        /// </summary>
        private void StopWaitingTimer()
        {
            this.timerWaitingForJobs.Stop();
            this.timerWaitingForJobs.Interval = WAITING_FOR_JOBS;
        }

        /// <summary>
        /// Translates a stop message of the publisher into the matching status event.
        /// NOTE(review): no subscription of this handler is visible in this class.
        /// </summary>
        void P2PWorker_OnReceivedStopMessageFromPublisher(PubSubMessageType stopType, string sData)
        {
            switch (stopType)
            {
                case PubSubMessageType.Stop:
                case PubSubMessageType.Unregister:
                    WorkerStopped stoppedHandler = OnWorkerStopped;
                    if (stoppedHandler != null)
                    {
                        stoppedHandler();
                    }
                    break;
                case PubSubMessageType.Solution:
                    SuccessfullyEnded endedHandler = OnSuccessfullyEnded;
                    if (endedHandler != null)
                    {
                        endedHandler();
                    }
                    break;
                default:
                    break;
            }
        }

        #endregion

        #region THE RELEVANT PART

        // this method only processes Payload Data, Internal-organisation Data will be handled internally!
        /// <summary>
        /// Handles payload data of a peer: a JobPart message is unpacked and handed to
        /// <see cref="StartProcessing"/>, everything else is only logged.
        /// </summary>
        /// <param name="senderId">peer which sent the data</param>
        /// <param name="data">raw message; its first byte determines the message job type</param>
        protected override void HandleIncomingData(PeerId senderId, byte[] data)
        {
            // without this guard, data[0] below would throw on an empty message
            if (data == null || data.Length == 0)
            {
                GuiLogging("Received an empty message from peer '" + senderId + "'. It will be ignored.", NotificationLevel.Debug);
                return;
            }

            if (JobMessages.GetMessageJobType(data[0]) != MessageJobType.JobPart)
            {
                GuiLogging("Received some strange data (no JobPart) from peer '" + senderId.ToString()
                    + "'. Data: " + Encoding.UTF8.GetString(data), NotificationLevel.Debug);
                return;
            }

            // when receiving a new job, the retry timer can be stopped
            StopWaitingTimer();

            GuiLogging("Received a JobPart from '" + senderId.ToString() + "'", NotificationLevel.Debug);
            BigInteger jobId; // only needed to satisfy the out parameter
            byte[] serializedRawJobPartData = JobMessages.GetJobPartMessage(data, out jobId);
            StartProcessing(senderId, serializedRawJobPartData);
        }

        /// <summary>
        /// Starts processing the incoming JobPart when the worker is idle. While the worker
        /// is still busy, the JobPart is buffered on the waiting stack instead.
        /// </summary>
        /// <param name="senderId">Sender Id</param>
        /// <param name="data">already DistributableJob-"unpacked" serialized JobPart data</param>
        private void StartProcessing(PeerId senderId, byte[] data)
        {
            if (this.workerControl == null) // eventually check if sender is the actual publisher, too...
            {
                GuiLogging("Processing a new job isn't possible, because IWorkerControl isn't initialized yet.", NotificationLevel.Info);
                return;
            }

            if (this.IsWorking) // if it's still working, add Job to a waiting stack
            {
                this.waitingJobStack.Push(data);
                GuiLogging("New incoming job will be pushed to the 'waitingJobStack', because the Worker is still processing a job.", NotificationLevel.Debug);
                return;
            }

            this.IsWorking = true;
            BigInteger jobId;
            bool result = this.workerControl.StartProcessing(data, out jobId);

            // visualize Working-Status in PlugIn, if it has registered with this event
            if (result)
            {
                StartWorking startHandler = OnStartWorking;
                if (startHandler != null)
                {
                    startHandler();
                }
            }

            byte[] jobAcceptanceStatus = JobMessages.CreateJobAcceptanceMessage(jobId, result);
            this.p2pControl.SendToPeer(jobAcceptanceStatus, senderId);

            // Only take the flag back when the job wasn't accepted. Writing "true" again
            // could overwrite the state set by a completion handler which already ran.
            if (!result)
            {
                this.IsWorking = false;
            }

            // ReferenceEquals instead of "==", so no user-defined operator of BigInteger gets involved
            string jobIdText = object.ReferenceEquals(jobId, null) ? "<unknown>" : jobId.ToString();
            GuiLogging("Processing started? " + result + ". JobId: " + jobIdText, NotificationLevel.Debug);
        }

        /// <summary>
        /// When IWorkerControl throws this event, processing is (successfully) ended. So send the
        /// Result to the Manager and decide whether to process a "stacked" job or ask for new Jobs
        /// </summary>
        /// <param name="jobId">JobId of the (successfully) ended Result</param>
        /// <param name="result">serialized Result data</param>
        private void workerControl_OnProcessingSuccessfullyEnded(BigInteger jobId, byte[] result)
        {
            GuiLogging("Sending job result to Manager. JobId: " + jobId.ToString() + ". Mngr-Id: '" + base.ActualPublisher.ToString() + "'.", NotificationLevel.Info);
            this.p2pControl.SendToPeer(JobMessages.CreateJobResultMessage(jobId, result), base.ActualPublisher);

            // set working flag to false, so processing a new job is possible
            this.IsWorking = false;

            // visualizes the Working-Status, if PlugIn registered with this event
            SuccessfullyEnded endedHandler = OnSuccessfullyEnded;
            if (endedHandler != null)
            {
                endedHandler();
            }

            CheckIfAnyJobsLeft();
        }

        /// <summary>
        /// Processes the next buffered job, if there is one. Otherwise tells the Manager that
        /// this worker is free and starts the retry timer for the case that this message gets lost.
        /// </summary>
        private void CheckIfAnyJobsLeft()
        {
            if (this.waitingJobStack.Count > 0)
            {
                GuiLogging("There's a job in the 'waitingJob'-Stack, so process it before processing new incoming jobs from the Manager.", NotificationLevel.Info);
                StartProcessing(base.ActualPublisher, this.waitingJobStack.Pop());
                return;
            }

            // no more jobs in the waiting stack, so send Mngr the information, that Worker is waiting for new jobs now
            this.p2pControl.SendToPeer(JobMessages.CreateFreeWorkerStatusMessage(true), base.ActualPublisher);
            GuiLogging("No jobs in the 'waitingJob'-Stack, so send 'free'-information to the Manager. Mngr-Id: '" + base.ActualPublisher.ToString() + "'.", NotificationLevel.Info);

            // If this timer elapses, it will check if the isWorking flag is true. Then it will stop the timer.
            // Otherwise it will send a new free msg to the Manager, if the last free msg got lost.
            // Every waiting period starts with the initial interval again.
            StopWaitingTimer();
            this.timerWaitingForJobs.Start();
        }

        // Added by Arnold - 2010.03.22
        /// <summary>
        /// this method is only necessary when the regular free message got lost on the way to the manager.
        /// In this case the Worker won't get any more jobs, so it sends a free message
        /// </summary>
        /// <param name="sender">the timer which raised the event (unused)</param>
        /// <param name="e">event data (unused)</param>
        void timerWaitingForJobs_Elapsed(object sender, ElapsedEventArgs e)
        {
            if (this.isWorking)
            {
                // when Worker is working, the timer can be stopped (this resets the interval, too)
                StopWaitingTimer();
                return;
            }

            // every time this event is thrown, heighten the timer interval
            this.timerWaitingForJobs.Interval += WAITING_FOR_JOBS_INCREMENT;
            base.SendRegMsg();
            this.p2pControl.SendToPeer(JobMessages.CreateFreeWorkerStatusMessage(true), base.ActualPublisher);
            GuiLogging("Because the last 'free worker'-Message got lost, try again.", NotificationLevel.Info);
        }

        /// <summary>
        /// When IWorkerControl throws this event, processing was canceled. The worker is idle
        /// again, so decide whether to process a "stacked" job or ask for new Jobs.
        /// </summary>
        /// <param name="result">data delivered with the cancel event (unused here)</param>
        void workerControl_OnProcessingCanceled(byte[] result)
        {
            // Without resetting the flag, every following job would only be pushed to the
            // waiting stack and never be processed.
            this.IsWorking = false;

            CanceledWorking canceledHandler = OnCanceledWorking;
            if (canceledHandler != null)
            {
                canceledHandler();
            }

            CheckIfAnyJobsLeft();
        }

        #endregion
    }
}
Note: See TracBrowser for help on using the repository browser.