source: trunk/CrypPlugins/WorkspaceManager/Execution/ExecutionEngine.cs @ 1884

Last change on this file since 1884 was 1884, checked in by kopal, 11 years ago
  • created multi processor gears4net scheduler for execution engine
  • added setting for thread priority
File size: 23.7 KB
Line 
1/*                             
2   Copyright 2010 Nils Kopal, Viktor M.
3
4   Licensed under the Apache License, Version 2.0 (the "License");
5   you may not use this file except in compliance with the License.
6   You may obtain a copy of the License at
7
8       http://www.apache.org/licenses/LICENSE-2.0
9
10   Unless required by applicable law or agreed to in writing, software
11   distributed under the License is distributed on an "AS IS" BASIS,
12   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   See the License for the specific language governing permissions and
14   limitations under the License.
15*/
16
17using System;
18using System.Collections.Generic;
19using System.Linq;
20using System.Text;
21
22using WorkspaceManager.Model;
23using System.Threading;
24using System.Collections;
25using Cryptool.PluginBase;
26using System.Reflection;
27using Gears4Net;
28using System.Windows.Threading;
29using System.Runtime.Remoting.Contexts;
30
31namespace WorkspaceManager.Execution
32{
33    /// <summary>
34    /// Engine to execute a model of the WorkspaceManager
35    /// This class needs a WorkspaceManager to be instantiated
36    /// To run an execution process it also needs a WorkspaceModel
37    ///
38    /// This class uses Gears4Net to execute the plugins
39    /// </summary>
40    public class ExecutionEngine
41    {
42        private WorkspaceManager WorkspaceManagerEditor;
43        private Scheduler scheduler;
44        private WorkspaceModel workspaceModel;
45        private volatile bool isRunning = false;
46
47        public long ExecutedPluginsCounter { get; set; }
48        public bool BenchmarkPlugins { get; set; }
49        public long GuiUpdateInterval { get; set; }
50        public int SleepTime { get; set; }
51        public int ThreadPriority { get; set; }
52
53        /// <summary>
54        /// Creates a new ExecutionEngine
55        /// </summary>
56        /// <param name="workspaceManagerEditor"></param>
57        public ExecutionEngine(WorkspaceManager workspaceManagerEditor)
58        {
59            WorkspaceManagerEditor = workspaceManagerEditor;
60        }
61
62        /// <summary>
63        /// Is this ExecutionEngine running?
64        /// </summary>
65        public bool IsRunning
66        {
67            get{return this.isRunning;}
68            private set{this.isRunning = value;}
69        }
70
71        /// <summary>
72        /// Execute the given Model
73        /// </summary>
74        /// <param name="workspaceModel"></param>
75        public void Execute(WorkspaceModel workspaceModel, int amountThreads)
76        {
77            if (!IsRunning)
78            {
79                IsRunning = true;
80                this.workspaceModel = workspaceModel;
81
82                if (amountThreads <= 0)
83                {
84                    amountThreads = 1;
85                }
86
87                scheduler = new WorkspaceManagerScheduler("WorkspaceManagerScheduler", amountThreads,this);
88               
89                //We have to reset all states of PluginModels, ConnectorModels and ConnectionModels:
90                workspaceModel.resetStates();
91
92                //The UpdateGuiProtocol is a kind of "daemon" which will update the view elements if necessary
93                UpdateGuiProtocol updateGuiProtocol = new UpdateGuiProtocol(scheduler, workspaceModel, this);
94                scheduler.AddProtocol(updateGuiProtocol);
95                updateGuiProtocol.Start();
96
97                //The BenchmarkProtocl counts the amount of executed plugins per seconds and writes this to debug
98                if (this.BenchmarkPlugins)
99                {
100                    BenchmarkProtocol benchmarkProtocol = new BenchmarkProtocol(scheduler, this.workspaceModel, this);
101                    scheduler.AddProtocol(benchmarkProtocol);
102                    benchmarkProtocol.Start();
103                }
104
105                //Here we create for each PluginModel an own PluginProtocol
106                //By using round-robin we give each protocol to another scheduler to gain
107                //a good average load balancing of the schedulers
108                //we also initalize each plugin
109                //It is possible that a plugin is also a PluginProtocol
110                //if that is true we do not create a new one but use the plugin instead the created one
111                int counter=0;
112                foreach (PluginModel pluginModel in workspaceModel.AllPluginModels)
113                {
114                    PluginProtocol pluginProtocol = new PluginProtocol(scheduler, pluginModel, this);
115                   
116                    pluginModel.Plugin.PreExecution();                   
117                    pluginModel.PluginProtocol = pluginProtocol;
118                    scheduler.AddProtocol(pluginProtocol);
119
120                    if (pluginProtocol.Status == ProtocolStatus.Created || pluginProtocol.Status == ProtocolStatus.Terminated)
121                    {
122                        pluginProtocol.Start();
123                    }
124
125                    counter = (counter + 1) % (amountThreads);
126
127                    if (pluginModel.Startable)
128                    {
129                        MessageExecution msg = new MessageExecution();
130                        msg.PluginModel = pluginModel;
131                        pluginProtocol.BroadcastMessageReliably(msg);
132                    }
133                }
134
135                ((WorkspaceManagerScheduler)scheduler).startScheduling();
136               
137            }
138        }     
139     
140        /// <summary>
141        /// Stop the execution process:
142        /// calls shutdown on all schedulers + calls stop() on each plugin
143        /// </summary>
144        public void Stop()
145        {
146            //First stop alle plugins
147            foreach (PluginModel pluginModel in workspaceModel.AllPluginModels)
148            {
149                pluginModel.Plugin.Stop();
150                pluginModel.Plugin.PostExecution();
151            }           
152
153            IsRunning = false;
154            //Secondly stop all Gears4Net Schedulers
155            scheduler.Shutdown();
156           
157        }
158
159        /// <summary>
160        /// Pause the execution
161        /// </summary>
162        public void Pause()
163        {
164            //not implemented yet
165        }
166
167        /// <summary>
168        /// Use the logger of the WorkspaceManagerEditor
169        /// </summary>
170        /// <param name="message"></param>
171        /// <param name="level"></param>
172        public void GuiLogMessage(string message, NotificationLevel level)
173        {           
174            WorkspaceManagerEditor.GuiLogMessage(message, level);
175        }
176    }
177 
178    /// <summary>
179    /// Message send to scheduler for a Plugin to trigger the Execution
180    /// </summary>
181    public class MessageExecution : MessageBase
182    {
183        public PluginModel PluginModel;
184    }
185
186    /// <summary>
187    /// A Protocol for updating the GUI in time intervals
188    /// </summary>
189    public class UpdateGuiProtocol : ProtocolBase
190    {
191        private WorkspaceModel workspaceModel;
192        private ExecutionEngine executionEngine;     
193
194        /// <summary>
195        /// Create a new protocol. Each protocol requires a scheduler which provides
196        /// a thread for execution.
197        /// </summary>
198        /// <param name="scheduler"></param>
199        public UpdateGuiProtocol(Scheduler scheduler, WorkspaceModel workspaceModel, ExecutionEngine executionEngine)
200            : base(scheduler)
201        {
202            this.workspaceModel = workspaceModel;
203            this.executionEngine = executionEngine;           
204        }
205
206        /// <summary>
207        /// The main function of the protocol
208        /// </summary>
209        /// <param name="stateMachine"></param>
210        /// <returns></returns>
211        public override System.Collections.Generic.IEnumerator<ReceiverBase> Execute(AbstractStateMachine stateMachine)
212        {
213            while (this.executionEngine.IsRunning)
214            {
215                yield return Timeout(this.executionEngine.GuiUpdateInterval, HandleUpdateGui);
216            }
217        }
218
219        /// <summary>
220        /// Handler function for a message.
221        /// This handler must not block, because it executes inside the thread of the scheduler.
222        /// </summary>
223        /// <param name="msg"></param>
224        private void HandleUpdateGui()
225        {
226            //Get the gui Thread
227            this.workspaceModel.WorkspaceManagerEditor.Presentation.Dispatcher.Invoke(DispatcherPriority.Normal, (SendOrPostCallback)delegate
228            {
229                List<PluginModel> pluginModels = workspaceModel.AllPluginModels;
230                foreach (PluginModel pluginModel in pluginModels)
231                {
232                    if (pluginModel.GuiNeedsUpdate)
233                    {
234                        pluginModel.GuiNeedsUpdate = false;
235                        pluginModel.paint();
236                        if (pluginModel.UpdateableView != null)
237                        {
238                            pluginModel.UpdateableView.update();
239                        }
240                    }
241                }
242                List<ConnectionModel> connectionModels = workspaceModel.AllConnectionModels;
243                foreach (ConnectionModel connectionModel in connectionModels)
244                {
245                    if (connectionModel.GuiNeedsUpdate)
246                    {
247                        if (connectionModel.UpdateableView != null)
248                        {
249                            connectionModel.UpdateableView.update();
250                        }
251                    }
252                }
253            }
254            , null);
255        }
256    }
257   
258    /// <summary>
259    /// A Protocol for benchmarking
260    /// </summary>
261    public class BenchmarkProtocol : ProtocolBase
262    {
263        private WorkspaceModel workspaceModel;
264        private ExecutionEngine executionEngine;
265
266        /// <summary>
267        /// Create a new protocol. Each protocol requires a scheduler which provides
268        /// a thread for execution.
269        /// </summary>
270        /// <param name="scheduler"></param>
271        public BenchmarkProtocol(Scheduler scheduler, WorkspaceModel workspaceModel, ExecutionEngine executionEngine)
272            : base(scheduler)
273        {
274            this.workspaceModel = workspaceModel;
275            this.executionEngine = executionEngine;
276        }
277
278        /// <summary>
279        /// The main function of the protocol
280        /// </summary>
281        /// <param name="stateMachine"></param>
282        /// <returns></returns>
283        public override System.Collections.Generic.IEnumerator<ReceiverBase> Execute(AbstractStateMachine stateMachine)
284        {
285            while (this.executionEngine.IsRunning)
286            {
287                yield return Timeout(1000, HandleBenchmark);
288            }
289        }
290
291        /// <summary>
292        /// Handler function for a message.
293        /// This handler must not block, because it executes inside the thread of the scheduler.
294        /// </summary>
295        /// <param name="msg"></param>
296        private void HandleBenchmark()
297        {
298            this.workspaceModel.WorkspaceManagerEditor.GuiLogMessage("Executing at about " + this.executionEngine.ExecutedPluginsCounter + " Plugins/s", NotificationLevel.Debug);
299            this.executionEngine.ExecutedPluginsCounter = 0;
300        }
301
302    }
303
304    /// <summary>
305    /// A Protocol for a PluginModel
306    /// </summary>
307    public class PluginProtocol : ProtocolBase
308    {       
309        public PluginModel PluginModel;
310        private ExecutionEngine executionEngine;
311
312        /// <summary>
313        /// Create a new protocol. Each protocol requires a scheduler which provides
314        /// a thread for execution.
315        /// </summary>
316        /// <param name="scheduler"></param>
317        public PluginProtocol(Scheduler scheduler, PluginModel pluginModel,ExecutionEngine executionEngine)
318            : base(scheduler)
319        {
320            this.PluginModel = pluginModel;
321            this.executionEngine = executionEngine;
322        }
323
324        /// <summary>
325        ///
326        /// </summary>
327        /// <param name="scheduler"></param>
328        /// <param name="pluginModel"></param>
329        /// <param name="executionEngine"></param>
330        public void setExecutionEngineSettings(Scheduler scheduler, PluginModel pluginModel, ExecutionEngine executionEngine)
331        {
332            this.Scheduler = scheduler;
333            this.PluginModel = pluginModel;
334            this.executionEngine = executionEngine;
335        }
336
337
338        /// <summary>
339        /// The main function of the protocol     
340        /// </summary>
341        /// <param name="stateMachine"></param>
342        /// <returns></returns>
343        public override System.Collections.Generic.IEnumerator<ReceiverBase> Execute(AbstractStateMachine stateMachine)
344        {
345            while (this.executionEngine.IsRunning)
346            {
347                yield return Receive<MessageExecution>(this.Filter, HandleExecute);
348            }
349        }
350
351        /// <summary>
352        /// Filter that checks wether the Plugin fits to the internal Plugin reference of this PluginProtocl
353        /// </summary>
354        /// <param name="msg"></param>
355        /// <returns></returns>
356        private bool Filter(MessageExecution msg)
357        {
358            if (msg.PluginModel != this.PluginModel)
359            {
360                return false;
361            }
362            return true;
363        }
364        /// <summary>
365        /// Handle an execution of a plugin
366        /// </summary>
367        /// <param name="msg"></param>
368        private void HandleExecute(MessageExecution msg)
369        {
370            // ################
371            // 1. Check if Plugin may Execute
372            // ################
373
374            if (!msg.PluginModel.WorkspaceModel.WorkspaceManagerEditor.isExecuting())
375            {
376                return;
377            }
378
379            //Check if all necessary inputs are set
380            List<ConnectorModel> inputConnectors = msg.PluginModel.InputConnectors;
381            foreach (ConnectorModel connectorModel in inputConnectors)
382            {
383                if (!connectorModel.IControl &&
384                    (connectorModel.IsMandatory || connectorModel.InputConnections.Count > 0) && !connectorModel.HasData)
385                {
386                    return;
387                }
388            }
389
390            //Check if all outputs are free
391            List<ConnectorModel> outputConnectors = msg.PluginModel.OutputConnectors;
392            foreach (ConnectorModel connectorModel in outputConnectors)
393            {
394                if (!connectorModel.IControl)
395                {
396                    List<ConnectionModel> outputConnections = connectorModel.OutputConnections;
397                    foreach (ConnectionModel connectionModel in outputConnections)
398                    {
399                        if (connectionModel.To.HasData)
400                        {
401                            return;
402                        }
403                    }
404                }
405            }
406
407            // ################
408            //2. Fill all Inputs of the plugin, if this fails, stop executing the plugin
409            // ################
410
411            //Fill the plugins inputs with data
412            foreach (ConnectorModel connectorModel in inputConnectors)
413            {
414                try
415                {
416                    if (connectorModel.HasData && connectorModel.Data.value != null)
417                    {
418                        if (connectorModel.IsDynamic)
419                        {
420                            MethodInfo propertyInfo = PluginModel.Plugin.GetType().GetMethod(connectorModel.DynamicSetterName);
421                            propertyInfo.Invoke(PluginModel.Plugin, new object[] { connectorModel.PropertyName, connectorModel.Data.value });
422                        }
423                        else
424                        {
425                            PropertyInfo propertyInfo = PluginModel.Plugin.GetType().GetProperty(connectorModel.PropertyName);
426                            propertyInfo.SetValue(PluginModel.Plugin, connectorModel.Data.value, null);
427                        }
428                    }
429                }
430                catch (Exception ex)
431                {
432                    this.PluginModel.WorkspaceModel.WorkspaceManagerEditor.GuiLogMessage("An error occured while setting value of connector \"" + connectorModel.Name + "\" of \"" + PluginModel.Name + "\": " + ex.Message, NotificationLevel.Error);
433                    this.PluginModel.State = PluginModelState.Error;
434                    this.PluginModel.GuiNeedsUpdate = true;
435                    return;
436                }
437            }
438
439            // ################
440            //3. Execute the Plugin -> call the IPlugin.Execute()
441            // ################
442
443            try
444            {
445                PluginModel.Plugin.Execute();
446            }
447            catch (Exception ex)
448            {
449                this.PluginModel.WorkspaceModel.WorkspaceManagerEditor.GuiLogMessage("An error occured while executing  \"" + PluginModel.Name + "\": " + ex.Message, NotificationLevel.Error);
450                this.PluginModel.State = PluginModelState.Error;
451                this.PluginModel.GuiNeedsUpdate = true;
452                return;
453            }
454
455            // ################
456            //4. Count for the benchmark
457            // ################
458
459            if (this.executionEngine.BenchmarkPlugins)
460            {
461                this.executionEngine.ExecutedPluginsCounter++;
462            }
463
464            // ################
465            //5. If the user wants to, sleep some time
466            // ################
467
468            if (this.executionEngine.SleepTime > 0)
469            {
470                Thread.Sleep(this.executionEngine.SleepTime);
471            }
472
473        }       
474
475        /// <summary>
476        ///
477        /// </summary>
478        public ExecutionEngine ExecutionEngine
479        {
480            get { return this.executionEngine; }           
481        }
482    }
483
484    /// <summary>
485    /// Gears4Net Scheduler
486    /// </summary>
487    public class WorkspaceManagerScheduler : Scheduler
488    {
489        private System.Threading.AutoResetEvent wakeup = new System.Threading.AutoResetEvent(false);
490        private bool shutdown = false;
491        private Thread[] threads;
492        public ExecutionEngine executionEngine = null;         
493
494        public WorkspaceManagerScheduler(string name, int amountThreads, ExecutionEngine executionEngine)
495            : base()
496        {
497            threads = new Thread[amountThreads];
498            this.executionEngine = executionEngine;
499
500            for (int i = 0; i < threads.Length;i++ )
501            {
502                threads[i] = new Thread(this.DoScheduling);
503                threads[i].SetApartmentState(ApartmentState.MTA);
504                threads[i].Name = name + "-Thread-" + i;
505
506                switch (this.executionEngine.ThreadPriority)
507                {
508                    case 0:
509                        threads[i].Priority = ThreadPriority.AboveNormal;
510                        break;
511                    case 1:
512                        threads[i].Priority = ThreadPriority.BelowNormal;
513                        break;
514                    case 2:
515                        threads[i].Priority = ThreadPriority.Highest;
516                        break;
517                    case 3:
518                        threads[i].Priority = ThreadPriority.Lowest;
519                        break;
520                    case 4:
521                        threads[i].Priority = ThreadPriority.Normal;
522                        break;
523                }
524            }
525           
526        }
527
528        public void startScheduling()
529        {
530            foreach (Thread thread in threads)
531            {
532                thread.Start();
533            }
534        }
535       
536        private void DoScheduling()
537        {           
538
539            this.executionEngine.GuiLogMessage(Thread.CurrentThread.Name + " up and running", NotificationLevel.Debug);
540            Queue<ProtocolBase> waitingProtocols = this.waitingProtocols;
541
542            // Loop forever
543            while (true)
544            {
545                this.wakeup.WaitOne();
546
547                // Loop while there are more protocols waiting
548                while (true)
549                {
550                    // Should the scheduler stop?
551                    if (this.shutdown)
552                    {
553                        this.executionEngine.GuiLogMessage(Thread.CurrentThread.Name + " terminated", NotificationLevel.Debug);
554                        return;
555                    }
556
557                    ProtocolBase protocol = null;
558                    lock (this)
559                    {
560                        // No more protocols? -> Wait
561                        if (waitingProtocols.Count == 0)
562                            break;
563                    }
564
565                    try
566                    {
567                        lock (this)
568                        {
569                            protocol = waitingProtocols.Dequeue();
570                        }
571                        ProtocolStatus status = protocol.Run();
572
573                        lock (this)
574                        {
575                            switch (status)
576                            {
577                                case ProtocolStatus.Created:
578                                    System.Diagnostics.Debug.Assert(false);
579                                    break;
580                                case ProtocolStatus.Ready:
581                                    waitingProtocols.Enqueue(protocol);
582                                    break;
583                                case ProtocolStatus.Waiting:
584                                    break;
585                                case ProtocolStatus.Terminated:
586                                    System.Diagnostics.Debug.Assert(!this.waitingProtocols.Contains(protocol));
587                                    this.RemoveProtocol(protocol);
588                                    break;
589                            }
590                        }
591                    }
592                    catch (Exception ex)
593                    {
594                        System.Diagnostics.Debug.Fail("Error during scheduling: " + ex.Message + " - " + ex.InnerException);
595                    }
596                }
597            }
598        }
599        /// <summary>
600        /// Removes a protocol from the internal queue
601        /// </summary>
602        /// <param name="protocol"></param>
603        public override void RemoveProtocol(ProtocolBase protocol)
604        {
605            lock (this)
606            {
607                this.protocols.Remove(protocol);
608                if (this.protocols.Count == 0)
609                    this.Shutdown();
610            }
611        }
612
613        /// <summary>
614        /// Adds a protocol to the internal queue
615        /// </summary>
616        /// <param name="protocol"></param>
617        public override void AddProtocol(ProtocolBase protocol)
618        {
619            lock (this)
620            {
621                this.protocols.Add(protocol);
622            }
623        }
624
625        /// <summary>
626        /// Wakeup this scheduler
627        /// </summary>
628        /// <param name="protocol"></param>
629        public override void Wakeup(ProtocolBase protocol)
630        {
631            lock (this)
632            {
633                if (!this.waitingProtocols.Contains(protocol))
634                    this.waitingProtocols.Enqueue(protocol);
635                this.wakeup.Set();
636            }
637        }
638
639        /// <summary>
640        /// Terminates the scheduler
641        /// </summary>
642        public override void Shutdown()
643        {
644            this.shutdown = true;
645            this.wakeup.Set();
646        }
647    }
648}
Note: See TracBrowser for help on using the repository browser.