source: trunk/CrypPlugins/PeerToPeerWorker_NEW/P2PJobAdminBase.cs @ 1448

Last change on this file since 1448 was 1448, checked in by Sven Rech, 12 years ago

replaced all BigInteger stuff with the new BigInteger class from .net 4.0

But there are still problems with some plugins (Keysearcher, BigInteger Operations...)

File size: 15.4 KB
Line 
1/* Copyright 2010 Team CrypTool (Christian Arnold), Uni Duisburg-Essen
2
3   Licensed under the Apache License, Version 2.0 (the "License");
4   you may not use this file except in compliance with the License.
5   You may obtain a copy of the License at
6
7       http://www.apache.org/licenses/LICENSE-2.0
8
9   Unless required by applicable law or agreed to in writing, software
10   distributed under the License is distributed on an "AS IS" BASIS,
11   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12   See the License for the specific language governing permissions and
13   limitations under the License.
14*/
15
16using System;
17using System.Collections.Generic;
18using System.Linq;
19using System.Text;
20using Cryptool.PluginBase.Control;
21using Cryptool.PluginBase;
22using Cryptool.PluginBase.Miscellaneous;
23using Cryptool.Plugins.PeerToPeer.Jobs;
24using System.Timers;
25using Cryptool.Plugins.PeerToPeer.Internal;
26using System.Numerics;
27
/* TWO DIFFERENT STOPPING CASES:
 * 1) When the P2P-Admin is stopped, deregister the WorkerControl events, so
 *    the (unfinished) JobPart result won't be sent to the Manager.
 *    - Register the WorkerControl events in "StartWorkerControl"
 *    - Unregister the WorkerControl events in "StopWorkerControl"
 *
 * 2) When the P2P-Admin is stopped, the WorkerControl still sends the (unfinished)
 *    JobPart result to the Manager (the Manager can handle this case without
 *    any problem).
 *    - Register the WorkerControl events in the constructor
 *    - Comment out the unregistering of the WorkerControl events in the
 *      "StopWorkerControl" method
 */
41
42namespace Cryptool.Plugins.PeerToPeer
43{
44    public class P2PJobAdminBase : P2PSubscriberBase
45    {
46        #region Events for PlugIn-Color-Status
47
48        public delegate void StartWorking();
49        public delegate void SuccessfullyEnded();
50        public delegate void CanceledWorking();
51        public delegate void WorkerStopped();
52
53        public event StartWorking OnStartWorking;
54        public event SuccessfullyEnded OnSuccessfullyEnded;
55        public event CanceledWorking OnCanceledWorking;
56        public event WorkerStopped OnWorkerStopped;
57
58        #endregion
59
60        private IControlWorker workerControl;
61        private static int WAITING_FOR_JOBS = 5000;
62
63        /// <summary>
64        /// if worker sends a "free" msg to the manager, but it doesn't react on this,
65        /// although there are some jobs to distribute...
66        /// </summary>
67        Timer timerWaitingForJobs = new Timer(WAITING_FOR_JOBS);
68
69        /// <summary>
70        /// if more than one job arrived at the same time, buffer it in this dictionary
71        /// (can't happen under normal circumstances, but when it happens, we are prepared)
72        /// </summary>
73        Stack<byte[]> waitingJobStack;
74        private bool isWorking = false;
75        public bool IsWorking
76        {
77            get { return this.isWorking; }
78            private set { this.isWorking = value; }
79        }
80
81        // added by Arnie 2010.04.24
82        private bool actualMngrHasNoMoreJobsLeft = false;
83        public bool ActualMngrHasNoMoreJobsLeft
84        {
85            get { return this.actualMngrHasNoMoreJobsLeft; }
86            private set{ this.actualMngrHasNoMoreJobsLeft = value;} 
87        }
88
89        ///// <summary>
90        ///// everytime, when start processing a new job, set
91        ///// its jobId in this variable, so processing the
92        ///// same job several times can be avoided
93        ///// </summary>
94        //private BigInteger actualJobId = null;
95
96        #region Constructor and Event Handling
97
98        public P2PJobAdminBase(IP2PControl p2pControl, IControlWorker controlWorker) : base(p2pControl)
99        {
100            this.timerWaitingForJobs.Elapsed += new ElapsedEventHandler(timerWaitingForJobs_Elapsed);
101            this.workerControl = controlWorker;
102
103            // see comment above, to know why the following lines are commented
104            //this.workerControl.OnProcessingCanceled += new ProcessingCanceled(workerControl_OnProcessingCanceled);
105            //this.workerControl.OnProcessingSuccessfullyEnded += new ProcessingSuccessfullyEnded(workerControl_OnProcessingSuccessfullyEnded);
106            //this.workerControl.OnInfoTextReceived += new InfoText(workerControl_OnInfoTextReceived);
107
108            this.waitingJobStack = new Stack<byte[]>();
109        }
110
111        void workerControl_OnInfoTextReceived(string sText, NotificationLevel notLevel)
112        {
113            base.GuiLogging(sText, notLevel);
114        }
115
116        public void StartWorkerControl(string sTopicName, long lCheckPublishersAvailability, long lPublishersReplyTimespan)
117        {
118            if (!base.Started)
119            {
120                // see comment above, to know why the following lines are uncommented
121                this.workerControl.OnProcessingCanceled += new ProcessingCanceled(workerControl_OnProcessingCanceled);
122                this.workerControl.OnProcessingSuccessfullyEnded += new ProcessingSuccessfullyEnded(workerControl_OnProcessingSuccessfullyEnded);
123                this.workerControl.OnInfoTextReceived += new InfoText(workerControl_OnInfoTextReceived);
124
125                // starts subscriber
126                base.Start(sTopicName, lCheckPublishersAvailability, lPublishersReplyTimespan);
127            }
128            else
129                base.GuiLogging("P2PJobAdmin is already started.", NotificationLevel.Info);
130        }
131
132        public void StopWorkerControl(PubSubMessageType msgType)
133        {
134            this.IsWorking = false;
135
136            if (base.Started)
137            {
138                // see comment above, to know why the following lines are uncommented
139                this.workerControl.OnProcessingCanceled -= workerControl_OnProcessingCanceled;
140                this.workerControl.OnProcessingSuccessfullyEnded -= workerControl_OnProcessingSuccessfullyEnded;
141                this.workerControl.OnInfoTextReceived -= workerControl_OnInfoTextReceived;
142
143                this.timerWaitingForJobs.Stop();
144
145                base.Stop(msgType);
146
147                // delete the waiting Job List, so after re-registering, this worker
148                // will process the new incoming jobs and not old jobs, which were
149                // already pushed to the global Job List of the Manager after receiving
150                // the unregister message from this worker.
151                if(this.waitingJobStack != null && this.waitingJobStack.Count > 0)
152                    this.waitingJobStack.Clear();
153
154                if (this.workerControl != null)
155                    this.workerControl.StopProcessing();
156                GuiLogging("P2P-Job-Admin is successfully stopped (Unregistering with Manager, Processing of the Worker is stopped)",NotificationLevel.Info);
157            }
158            else
159                GuiLogging("P2P-Job-Admin isn't started yet. So stopping-events won't be executed.", NotificationLevel.Info);
160        }
161
162        void P2PWorker_OnReceivedStopMessageFromPublisher(PubSubMessageType stopType, string sData)
163        {
164            switch (stopType)
165            {
166                case PubSubMessageType.Stop:
167                case PubSubMessageType.Unregister:
168                    if (OnWorkerStopped != null)
169                        OnWorkerStopped();
170                    break;
171                case PubSubMessageType.Solution:
172                    if (OnSuccessfullyEnded != null)
173                        OnSuccessfullyEnded();
174                    break;
175                default:
176                    break;
177            }
178        }
179
180        #endregion
181
182        #region THE RELEVANT PART
183
184        // this method only processes Payload Data, Internal-organisation Data will be handled internally!
185        protected override void HandleIncomingData(PeerId senderId, byte[] data)
186        {
187            if (JobMessages.GetMessageJobType(data[0]) == MessageJobType.JobPart)
188            {
189                //added by Arnold 2010.03.22
190                this.timerWaitingForJobs.Stop(); //when receiving a new job, time can be stopped
191
192                BigInteger jobId = 0;
193                GuiLogging("Received a JobPart from '" + senderId.ToString() + "'", NotificationLevel.Debug);
194                byte[] serializedRawJobPartData = JobMessages.GetJobPartMessage(data, out jobId);
195                StartProcessing(senderId, serializedRawJobPartData);
196            }
197            else if (JobMessages.GetMessageJobType(data[0]) == MessageJobType.NoMoreJobsLeft)
198            {
199                this.ActualMngrHasNoMoreJobsLeft = true;
200                this.timerWaitingForJobs.Stop();
201                GuiLogging("Received 'no more jobs left' message from the Manager. Stopped waiting for jobs timer.", NotificationLevel.Debug);
202                // TODO: for future use maybe register to another task, because this task has no more
203                //       jobs to computate --> after registering with a new Manager, set this.ActualMngrHasNoMoreJobsLeft to true!!!
204            }
205            else
206            {
207                GuiLogging("Received some strange data (no JobPart) from peer '" + senderId.ToString()
208                    + "'. Data: " + Encoding.UTF8.GetString(data), NotificationLevel.Debug);
209            }
210        }
211
212        /// <summary>
213        /// Starts processing the incoming JobPart, when the waiting Stack has no elements
214        /// (it will be priorized).
215        /// </summary>
216        /// <param name="senderId">Sender Id</param>
217        /// <param name="data">already DistributableJob-"unpacked" serialized JobPart data</param>
218        private void StartProcessing(PeerId senderId, byte[] data)
219        {
220            if (this.workerControl == null) // eventually check if sender is the actual publisher, too...
221            {
222                GuiLogging("Processing a new job isn't possible, because IWorkerControl isn't initialized yet.", NotificationLevel.Info);
223                return;
224            }
225
226            BigInteger jobId;           
227
228            if (this.IsWorking) // if it's still working, add Job to a waiting stack
229            {               
230                this.waitingJobStack.Push(data);
231                GuiLogging("New incoming job will be pushed to the 'waitingJobStack', because the Worker is still processing a job.", NotificationLevel.Debug);
232            }
233            else
234            {
235                this.IsWorking = true;
236                bool result = this.workerControl.StartProcessing(data, out jobId);
237
238                // visualize Working-Status in PlugIn, if it has registered with this event
239                if(result)
240                {
241                    if (OnStartWorking != null)
242                        OnStartWorking();
243                }
244
245                byte[] jobAcceptanceStatus = JobMessages.CreateJobAcceptanceMessage(jobId, result);
246                this.p2pControl.SendToPeer(jobAcceptanceStatus, senderId);
247
248                this.IsWorking = result;
249
250                GuiLogging("Processing started? " + result + ". JobId: " + jobId.ToString(), NotificationLevel.Debug);
251            }
252        }
253
254        /// <summary>
255        /// When IWorkerControl throws this event, processing is (successfully) ended. So send the
256        /// Result to the Manager and decide whether to process a "stacked" job or ask for new Jobs
257        /// </summary>
258        /// <param name="jobId">JobId of the (successfully) ended Result</param>
259        /// <param name="result">serialized Result data</param>
260        private void workerControl_OnProcessingSuccessfullyEnded(BigInteger jobId, byte[] result)
261        {
262            //GuiLogging("Sending job result to Manager. JobId: " + jobId.ToString() + ". Mngr-Id: '" + base.ActualPublisher.ToString() + "'.", NotificationLevel.Info);
263            GuiLogging("Sending job result to Manager. JobId: " + jobId.ToString() + ". Mngr-Id: '" + base.ActualPublisher.ToString() + "'.", NotificationLevel.Info);
264            this.p2pControl.SendToPeer(JobMessages.CreateJobResultMessage(jobId, result), base.ActualPublisher);
265
266            // set working flag to false, so processing a new job is possible
267            this.IsWorking = false;
268            // visualizes the Working-Status, if PlugIn registered with this event
269            if (OnSuccessfullyEnded != null)
270                OnSuccessfullyEnded();
271
272            CheckIfAnyJobsLeft();   
273        }
274
275        private void CheckIfAnyJobsLeft()
276        {
277            if (this.waitingJobStack.Count > 0)
278            {
279                GuiLogging("There's a job in the 'waitingJob'-Stack, so process it before processing new incoming jobs from the Manager.", NotificationLevel.Info);
280                StartProcessing(base.ActualPublisher, this.waitingJobStack.Pop());
281            }
282            else
283            {
284                if (!this.ActualMngrHasNoMoreJobsLeft)
285                {
286                    // no more jobs in the waiting stack, so send Mngr the information, that Worker is waiting for new jobs now
287                    this.p2pControl.SendToPeer(JobMessages.CreateFreeWorkerStatusMessage(true), base.ActualPublisher);
288                    GuiLogging("No jobs in the 'waitingJob'-Stack, so send 'free'-information to the Manager. Mngr-Id: '" + base.ActualPublisher.ToString() + "'.", NotificationLevel.Info);
289                    // If this timer elapses, it will check if the isWorking flag is true. Than it will stop the timer.
290                    // Otherwise it will send a new free msg to the Manager, if the last free msg got lost
291                    this.timerWaitingForJobs.Start();
292                }
293                else
294                {
295                    GuiLogging("Worker has noticed that Manager has no more jobs left.", NotificationLevel.Debug);
296                    // TODO: for future use maybe register to another task, because this task has no more
297                    //       jobs to computate --> after registering with a new Manager, set this.ActualMngrHasNoMoreJobsLeft to true!!!
298                }
299            }
300        }
301
302        // Added by Arnold - 2010.03.22
303        /// <summary>
304        /// this method is only necessary when the regular free message got lost on the way to the manager.
305        /// In this case the Worker won't get any more jobs, so it sends a free message
306        /// </summary>
307        /// <param name="sender"></param>
308        /// <param name="e"></param>
309        void timerWaitingForJobs_Elapsed(object sender, ElapsedEventArgs e)
310        {
311            if (!isWorking)
312            {
313                this.timerWaitingForJobs.Interval += 2000; // every time this event is thrown, heighten the timer interval
314                base.SendRegMsg();
315                this.p2pControl.SendToPeer(JobMessages.CreateFreeWorkerStatusMessage(true), base.ActualPublisher);
316                GuiLogging("Because the last 'free worker'-Message got lost, try again.", NotificationLevel.Info);
317            }
318            else
319            {
320                // reset timer interval - it could be heighten in the if brace...
321                this.timerWaitingForJobs.Interval = WAITING_FOR_JOBS;
322                // when Worker is working, than the time can be stopped
323                this.timerWaitingForJobs.Stop();
324            }
325        }
326
327        void workerControl_OnProcessingCanceled(byte[] result)
328        {
329            if (OnCanceledWorking != null)
330                OnCanceledWorking();
331            CheckIfAnyJobsLeft();
332        }
333
334        #endregion
335    }
336}
Note: See TracBrowser for help on using the repository browser.