source: trunk/CrypPlugins/WorkspaceManager/Execution/ExecutionEngine.cs @ 1977

Last change on this file since 1977 was 1977, checked in by kopal, 11 years ago
  • the executionEngine now stops its threads without exception
  • a plugin chain can only be started now if an "old" run of the executionEngine has been already terminated (that means all of its threads are terminated)
  • stopping of executionEngine threads is done in an own thread so that the rest of ct2 (and its gui) doesnt block (stopping threads waits for all threads to stop and logs this event to the log window)
  • the stop button now can be clicked several time without an error (also in an executing "stopping" run)
File size: 25.1 KB
Line 
1/*                             
2   Copyright 2010 Nils Kopal, Viktor M.
3
4   Licensed under the Apache License, Version 2.0 (the "License");
5   you may not use this file except in compliance with the License.
6   You may obtain a copy of the License at
7
8       http://www.apache.org/licenses/LICENSE-2.0
9
10   Unless required by applicable law or agreed to in writing, software
11   distributed under the License is distributed on an "AS IS" BASIS,
12   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   See the License for the specific language governing permissions and
14   limitations under the License.
15*/
16
17using System;
18using System.Collections.Generic;
19using System.Linq;
20using System.Text;
21
22using WorkspaceManager.Model;
23using System.Threading;
24using System.Collections;
25using Cryptool.PluginBase;
26using System.Reflection;
27using Gears4Net;
28using System.Windows.Threading;
29using System.Runtime.Remoting.Contexts;
30using System.IO;
31using System.Diagnostics;
32
33namespace WorkspaceManager.Execution
34{
35    /// <summary>
36    /// Engine to execute a model of the WorkspaceManager
37    /// This class needs a WorkspaceManager to be instantiated
38    /// To run an execution process it also needs a WorkspaceModel
39    ///
40    /// This class uses Gears4Net to execute the plugins
41    /// </summary>
42    public class ExecutionEngine
43    {
44        private WorkspaceManager WorkspaceManagerEditor;
45        private Scheduler scheduler;
46        private WorkspaceModel workspaceModel;
47        private volatile bool isRunning = false;
48        private BenchmarkProtocol BenchmarkProtocol = null;
49
50        public volatile int ExecutedPluginsCounter = 0;
51        public bool BenchmarkPlugins = false;
52        public int GuiUpdateInterval = 0;
53        public int SleepTime = 0;
54        public int ThreadPriority = 0;
55
56        /// <summary>
57        /// Creates a new ExecutionEngine
58        /// </summary>
59        /// <param name="workspaceManagerEditor"></param>
60        public ExecutionEngine(WorkspaceManager workspaceManagerEditor)
61        {
62            WorkspaceManagerEditor = workspaceManagerEditor;
63        }
64
65        /// <summary>
66        /// Is this ExecutionEngine running?
67        /// </summary>
68        public bool IsRunning
69        {
70            get{return this.isRunning;}
71            private set{this.isRunning = value;}
72        }
73
74        /// <summary>
75        /// Execute the given Model
76        /// </summary>
77        /// <param name="workspaceModel"></param>
78        public void Execute(WorkspaceModel workspaceModel, int amountThreads)
79        {
80            if (!IsRunning)
81            {
82                IsRunning = true;
83                this.workspaceModel = workspaceModel;
84
85                if (amountThreads <= 0)
86                {
87                    amountThreads = 1;
88                }
89
90                scheduler = new WorkspaceManagerScheduler("WorkspaceManagerScheduler", amountThreads,this);
91               
92                //We have to reset all states of PluginModels, ConnectorModels and ConnectionModels:
93                workspaceModel.resetStates();
94
95                //The UpdateGuiProtocol is a kind of "daemon" which will update the view elements if necessary
96                UpdateGuiProtocol updateGuiProtocol = new UpdateGuiProtocol(scheduler, workspaceModel, this);
97                scheduler.AddProtocol(updateGuiProtocol);
98                updateGuiProtocol.Start();
99
100                //The BenchmarkProtocl counts the amount of executed plugins per seconds and writes this to debug
101                if (this.BenchmarkPlugins)
102                {
103                    BenchmarkProtocol = new BenchmarkProtocol(scheduler, this.workspaceModel, this);
104                    scheduler.AddProtocol(BenchmarkProtocol);
105                    BenchmarkProtocol.Start();
106                }
107
108                //Here we create for each PluginModel an own PluginProtocol
109                //By using round-robin we give each protocol to another scheduler to gain
110                //a good average load balancing of the schedulers
111                //we also initalize each plugin
112                //It is possible that a plugin is also a PluginProtocol
113                //if that is true we do not create a new one but use the plugin instead the created one               
114                foreach (PluginModel pluginModel in workspaceModel.AllPluginModels)
115                {
116                    PluginProtocol pluginProtocol = new PluginProtocol(scheduler, pluginModel, this);
117                    MessageExecution message = new MessageExecution();
118                    message.PluginModel = pluginModel;
119                    pluginModel.MessageExecution = message;
120
121                    pluginModel.Plugin.PreExecution();                   
122                    pluginModel.PluginProtocol = pluginProtocol;
123                    scheduler.AddProtocol(pluginProtocol);
124
125                    if (pluginProtocol.Status == ProtocolStatus.Created || pluginProtocol.Status == ProtocolStatus.Terminated)
126                    {
127                        pluginProtocol.Start();
128                    }
129                 
130                    if (pluginModel.Startable)
131                    {
132                        pluginProtocol.BroadcastMessage(pluginModel.MessageExecution);
133                    }
134                }
135
136                ((WorkspaceManagerScheduler)scheduler).startScheduling();
137               
138            }
139        }     
140     
141        /// <summary>
142        /// Stop the execution process:
143        /// calls shutdown on all schedulers + calls stop() on each plugin
144        /// </summary>
145        public void Stop()
146        {
147           
148            //First stop alle plugins
149            foreach (PluginModel pluginModel in workspaceModel.AllPluginModels)
150            {
151                pluginModel.Plugin.Stop();                   
152            }
153
154            IsRunning = false;
155            //Secondly stop all Gears4Net Schedulers
156            scheduler.Shutdown();
157
158            //call all PostExecution methods of all plugins
159            foreach (PluginModel pluginModel in workspaceModel.AllPluginModels)
160            {
161                pluginModel.Plugin.PostExecution();
162            }
163
164            //remove the plugin protocol of each plugin model
165            foreach (PluginModel pluginModel in workspaceModel.AllPluginModels)
166            {
167                pluginModel.PluginProtocol = null;
168            }
169
170            this.WorkspaceManagerEditor = null;
171            this.workspaceModel = null;
172         
173        }
174
175        /// <summary>
176        /// Pause the execution
177        /// </summary>
178        public void Pause()
179        {
180            //not implemented yet
181        }
182
183        /// <summary>
184        /// Use the logger of the WorkspaceManagerEditor
185        /// </summary>
186        /// <param name="message"></param>
187        /// <param name="level"></param>
188        public void GuiLogMessage(string message, NotificationLevel level)
189        {
190            if (WorkspaceManagerEditor != null)
191            {
192                WorkspaceManagerEditor.GuiLogMessage(message, level);
193            }
194        }
195    }
196 
197    /// <summary>
198    /// Message send to scheduler for a Plugin to trigger the Execution
199    /// </summary>
200    public class MessageExecution : MessageBase
201    {
202        public PluginModel PluginModel;
203    }
204
205    /// <summary>
206    /// A Protocol for updating the GUI in time intervals
207    /// </summary>
208    public class UpdateGuiProtocol : ProtocolBase
209    {
210        private WorkspaceModel workspaceModel;
211        private ExecutionEngine executionEngine;
212
213        /// <summary>
214        /// Create a new protocol. Each protocol requires a scheduler which provides
215        /// a thread for execution.
216        /// </summary>
217        /// <param name="scheduler"></param>
218        public UpdateGuiProtocol(Scheduler scheduler, WorkspaceModel workspaceModel, ExecutionEngine executionEngine)
219            : base(scheduler)
220        {
221            this.workspaceModel = workspaceModel;
222            this.executionEngine = executionEngine;           
223        }
224       
225        /// <summary>
226        /// The main function of the protocol
227        /// </summary>
228        /// <param name="stateMachine"></param>
229        /// <returns></returns>
230        public override System.Collections.Generic.IEnumerator<ReceiverBase> Execute(AbstractStateMachine stateMachine)
231        {           
232            yield return new IntervalReceiver(this.executionEngine.GuiUpdateInterval,this.executionEngine.GuiUpdateInterval, HandleUpdateGui);
233        }
234
235        /// <summary>
236        /// Handler function for a message.
237        /// This handler must not block, because it executes inside the thread of the scheduler.
238        /// </summary>
239        /// <param name="msg"></param>
240        private void HandleUpdateGui()
241        {
242            //Get the gui Thread
243            this.workspaceModel.WorkspaceManagerEditor.Presentation.Dispatcher.Invoke(DispatcherPriority.Normal, (SendOrPostCallback)delegate
244            {
245                List<PluginModel> pluginModels = workspaceModel.AllPluginModels;
246                foreach (PluginModel pluginModel in pluginModels)
247                {
248                    if (pluginModel.GuiNeedsUpdate)
249                    {
250                        pluginModel.GuiNeedsUpdate = false;
251                        pluginModel.paint();
252                        if (pluginModel.UpdateableView != null)
253                        {
254                            pluginModel.UpdateableView.update();
255                        }
256                    }
257                }
258                List<ConnectionModel> connectionModels = workspaceModel.AllConnectionModels;
259                foreach (ConnectionModel connectionModel in connectionModels)
260                {
261                    if (connectionModel.GuiNeedsUpdate)
262                    {
263                        if (connectionModel.UpdateableView != null)
264                        {
265                            connectionModel.UpdateableView.update();
266                        }
267                    }
268                }
269            }
270            , null);
271
272        }
273    }
274   
275    /// <summary>
276    /// A Protocol for benchmarking
277    /// </summary>
278    public class BenchmarkProtocol : ProtocolBase
279    {
280        private WorkspaceModel workspaceModel;
281        private ExecutionEngine executionEngine;
282     
283        /// <summary>
284        /// Create a new protocol. Each protocol requires a scheduler which provides
285        /// a thread for execution.
286        /// </summary>
287        /// <param name="scheduler"></param>
288        public BenchmarkProtocol(Scheduler scheduler, WorkspaceModel workspaceModel, ExecutionEngine executionEngine)
289            : base(scheduler)
290        {
291            this.workspaceModel = workspaceModel;
292            this.executionEngine = executionEngine;         
293        }
294
295        /// <summary>
296        /// The main function of the protocol
297        /// </summary>
298        /// <param name="stateMachine"></param>
299        /// <returns></returns>
300        public override System.Collections.Generic.IEnumerator<ReceiverBase> Execute(AbstractStateMachine stateMachine)
301        {
302            yield return new IntervalReceiver(1000,1000, HandleBenchmark);           
303        }
304
305        /// <summary>
306        /// Handler function for a message.
307        /// This handler must not block, because it executes inside the thread of the scheduler.
308        /// </summary>
309        /// <param name="msg"></param>
310        private void HandleBenchmark()
311        {
312            StringBuilder sb = new StringBuilder();
313            sb.Append("Executing at about ");
314            sb.Append(this.executionEngine.ExecutedPluginsCounter); 
315            sb.Append(" Plugins/s");
316
317            this.workspaceModel.WorkspaceManagerEditor.GuiLogMessage(sb.ToString(), NotificationLevel.Debug);         
318            this.executionEngine.ExecutedPluginsCounter = 0;
319        }       
320    }
321
322    /// <summary>
323    /// A Protocol for a PluginModel
324    /// </summary>
325    public class PluginProtocol : ProtocolBase
326    {       
327        public PluginModel PluginModel;
328        private ExecutionEngine executionEngine;
329
330        /// <summary>
331        /// Create a new protocol. Each protocol requires a scheduler which provides
332        /// a thread for execution.
333        /// </summary>
334        /// <param name="scheduler"></param>
335        public PluginProtocol(Scheduler scheduler, PluginModel pluginModel,ExecutionEngine executionEngine)
336            : base(scheduler)
337        {
338            this.PluginModel = pluginModel;
339            this.executionEngine = executionEngine;
340        }       
341
342        /// <summary>
343        ///
344        /// </summary>
345        /// <param name="scheduler"></param>
346        /// <param name="pluginModel"></param>
347        /// <param name="executionEngine"></param>
348        public void setExecutionEngineSettings(Scheduler scheduler, PluginModel pluginModel, ExecutionEngine executionEngine)
349        {
350            this.Scheduler = scheduler;
351            this.PluginModel = pluginModel;
352            this.executionEngine = executionEngine;
353        }
354
355
356        /// <summary>
357        /// The main function of the protocol     
358        /// </summary>
359        /// <param name="stateMachine"></param>
360        /// <returns></returns>
361        public override System.Collections.Generic.IEnumerator<ReceiverBase> Execute(AbstractStateMachine stateMachine)
362        {           
363            yield return new PersistentReceiver(Receive<MessageExecution>(this.Filter, HandleExecute));               
364        }
365
366        /// <summary>
367        /// Filter that checks wether the Plugin fits to the internal Plugin reference of this PluginProtocl
368        /// </summary>
369        /// <param name="msg"></param>
370        /// <returns></returns>
371        private bool Filter(MessageExecution msg)
372        {
373            if (msg.PluginModel != this.PluginModel)
374            {
375                return false;
376            }
377            return true;
378        }
379        /// <summary>
380        /// Handle an execution of a plugin
381        /// </summary>
382        /// <param name="msg"></param>
383        private void HandleExecute(MessageExecution msg)
384        {
385            // ################
386            // 1. Check if Plugin may Execute
387            // ################
388
389            if (!msg.PluginModel.WorkspaceModel.WorkspaceManagerEditor.isExecuting())
390            {
391                return;
392            }
393
394            //Check if all necessary inputs are set
395            List<ConnectorModel> inputConnectors = msg.PluginModel.InputConnectors;
396            foreach (ConnectorModel connectorModel in inputConnectors)
397            {
398                if (!connectorModel.IControl &&
399                    (connectorModel.IsMandatory || connectorModel.InputConnections.Count > 0) && !connectorModel.HasData)
400                {
401                    return;
402                }
403            }
404
405            //Check if all outputs are free
406            List<ConnectorModel> outputConnectors = msg.PluginModel.OutputConnectors;
407            foreach (ConnectorModel connectorModel in outputConnectors)
408            {
409                if (!connectorModel.IControl)
410                {
411                    List<ConnectionModel> outputConnections = connectorModel.OutputConnections;
412                    foreach (ConnectionModel connectionModel in outputConnections)
413                    {
414                        if (connectionModel.To.HasData)
415                        {
416                            return;
417                        }
418                    }
419                }
420            }
421
422            // ################
423            //2. Fill all Inputs of the plugin, if this fails, stop executing the plugin
424            // ################
425
426            //Fill the plugins inputs with data
427            foreach (ConnectorModel connectorModel in inputConnectors)
428            {
429                try
430                {
431                    if (connectorModel.HasData && connectorModel.Data != null)
432                    {
433                        if (connectorModel.IsDynamic)
434                        {
435                            MethodInfo propertyInfo = PluginModel.Plugin.GetType().GetMethod(connectorModel.DynamicSetterName);
436                            propertyInfo.Invoke(PluginModel.Plugin, new object[] { connectorModel.PropertyName, connectorModel.Data });
437                        }
438                        else
439                        {
440                            PropertyInfo propertyInfo = PluginModel.Plugin.GetType().GetProperty(connectorModel.PropertyName);
441                            propertyInfo.SetValue(PluginModel.Plugin, connectorModel.Data, null);
442                        }
443                    }
444                }
445                catch (Exception ex)
446                {
447                    this.PluginModel.WorkspaceModel.WorkspaceManagerEditor.GuiLogMessage("An error occured while setting value of connector \"" + connectorModel.Name + "\" of \"" + PluginModel.Name + "\": " + ex.Message, NotificationLevel.Error);
448                    this.PluginModel.State = PluginModelState.Error;
449                    this.PluginModel.GuiNeedsUpdate = true;
450                    return;
451                }
452            }
453
454            // ################
455            //3. Execute the Plugin -> call the IPlugin.Execute()
456            // ################
457
458            try
459            {
460                PluginModel.Plugin.Execute();
461            }
462            catch (Exception ex)
463            {
464                this.PluginModel.WorkspaceModel.WorkspaceManagerEditor.GuiLogMessage("An error occured while executing  \"" + PluginModel.Name + "\": " + ex.Message, NotificationLevel.Error);
465                this.PluginModel.State = PluginModelState.Error;
466                this.PluginModel.GuiNeedsUpdate = true;
467                return;
468            }
469
470            // ################
471            //4. Count for the benchmark
472            // ################
473
474            if (this.executionEngine.BenchmarkPlugins)
475            {
476                this.executionEngine.ExecutedPluginsCounter++;               
477            }
478
479            // ################
480            //5. If the user wants to, sleep some time
481            // ################
482
483            if (this.executionEngine.SleepTime > 0)
484            {
485                Thread.Sleep(this.executionEngine.SleepTime);
486            }
487        }       
488
489        /// <summary>
490        ///
491        /// </summary>
492        public ExecutionEngine ExecutionEngine
493        {
494            get { return this.executionEngine; }           
495        }
496    }
497
498    /// <summary>
499    /// Gears4Net Scheduler
500    /// </summary>
501    public class WorkspaceManagerScheduler : Scheduler
502    {
503        private System.Threading.AutoResetEvent wakeup = new System.Threading.AutoResetEvent(false);
504        private bool shutdown = false;
505        private Thread[] threads;
506        private volatile int runningThreads = 0;
507
508        public ExecutionEngine executionEngine = null;         
509
510        public WorkspaceManagerScheduler(string name, int amountThreads, ExecutionEngine executionEngine)
511            : base()
512        {
513            threads = new Thread[amountThreads];
514            this.executionEngine = executionEngine;
515
516            for (int i = 0; i < threads.Length;i++ )
517            {
518                threads[i] = new Thread(this.DoScheduling);
519                threads[i].SetApartmentState(ApartmentState.MTA);
520                threads[i].Name = name + "-Thread-" + i;
521
522                switch (this.executionEngine.ThreadPriority)
523                {
524                    case 0:
525                        threads[i].Priority = ThreadPriority.AboveNormal;
526                        break;
527                    case 1:
528                        threads[i].Priority = ThreadPriority.BelowNormal;
529                        break;
530                    case 2:
531                        threads[i].Priority = ThreadPriority.Highest;
532                        break;
533                    case 3:
534                        threads[i].Priority = ThreadPriority.Lowest;
535                        break;
536                    case 4:
537                        threads[i].Priority = ThreadPriority.Normal;
538                        break;
539                }
540            }
541           
542        }
543
544        public void startScheduling()
545        {
546            foreach (Thread thread in threads)
547            {
548                thread.Start();
549                lock (this)
550                {
551                    runningThreads++;
552                }
553            }
554        }
555       
556        private void DoScheduling()
557        {           
558
559            this.executionEngine.GuiLogMessage(Thread.CurrentThread.Name + " up and running", NotificationLevel.Debug);
560            Queue<ProtocolBase> waitingProtocols = this.waitingProtocols;
561           
562            // Loop forever
563            while (true)
564            {               
565                this.wakeup.WaitOne();
566
567                // Loop while there are more protocols waiting
568                while (true)
569                {
570                    // Should the scheduler stop?
571                    if (this.shutdown)
572                    {                       
573                        this.executionEngine.GuiLogMessage(Thread.CurrentThread.Name + " terminated", NotificationLevel.Debug);
574                        lock (this)
575                        {
576                            runningThreads--;
577                        }                                           
578                        return;
579                    }
580
581                    try
582                    {
583                        ProtocolBase protocol = null;
584                        lock (this)
585                        {
586                            if (waitingProtocols.Count == 0)
587                                break;
588                            protocol = waitingProtocols.Dequeue();
589                        }
590                       
591                        ProtocolStatus status = protocol.Run();
592
593                        lock (this)
594                        {
595                            switch (status)
596                            {
597                                case ProtocolStatus.Created:
598                                    System.Diagnostics.Debug.Assert(false);
599                                    break;
600                                case ProtocolStatus.Ready:
601                                    waitingProtocols.Enqueue(protocol);
602                                    break;
603                                case ProtocolStatus.Waiting:
604                                    break;
605                                case ProtocolStatus.Terminated:
606                                    System.Diagnostics.Debug.Assert(!this.waitingProtocols.Contains(protocol));
607                                    this.RemoveProtocol(protocol);
608                                    break;
609                            }
610                        }
611                    }
612                    catch (Exception ex)
613                    {
614                        this.executionEngine.GuiLogMessage("Error during scheduling: " + ex.Message + " - " + ex.InnerException,NotificationLevel.Error);
615                    }
616                }
617            }
618        }
619        /// <summary>
620        /// Removes a protocol from the internal queue
621        /// </summary>
622        /// <param name="protocol"></param>
623        public override void RemoveProtocol(ProtocolBase protocol)
624        {
625            lock (this)
626            {
627                this.protocols.Remove(protocol);
628                if (this.protocols.Count == 0)
629                    this.Shutdown();
630            }
631        }
632
633        /// <summary>
634        /// Adds a protocol to the internal queue
635        /// </summary>
636        /// <param name="protocol"></param>
637        public override void AddProtocol(ProtocolBase protocol)
638        {
639            lock (this)
640            {
641                this.protocols.Add(protocol);
642            }
643        }
644
645        /// <summary>
646        /// Wakeup this scheduler
647        /// </summary>
648        /// <param name="protocol"></param>
649        public override void Wakeup(ProtocolBase protocol)
650        {
651            lock (this)
652            {
653                if (!this.waitingProtocols.Contains(protocol))
654                    this.waitingProtocols.Enqueue(protocol);
655                this.wakeup.Set();
656            }
657        }
658
659        /// <summary>
660        /// Terminates the scheduler
661        /// </summary>
662        public override void Shutdown()
663        {
664            this.shutdown = true;
665            this.wakeup.Set();
666
667            this.executionEngine.GuiLogMessage("Waiting for all scheduler threads to stop", NotificationLevel.Debug);
668            while (runningThreads > 0)
669            {
670                Thread.Sleep(50);
671                this.wakeup.Set();
672            }
673            this.executionEngine.GuiLogMessage("All scheduler threads stopped", NotificationLevel.Debug);
674
675            this.waitingProtocols.Clear();
676            this.protocols.Clear();           
677        }
678    }
679}
Note: See TracBrowser for help on using the repository browser.