source: trunk/CrypPlugins/WorkspaceManager/Execution/ExecutionEngine.cs @ 2380

Last change on this file since 2380 was 2380, checked in by kopal, 11 years ago

removed stacktrace output from exception output of the executionEngine

File size: 25.4 KB
Line 
1/*                             
2   Copyright 2010 Nils Kopal, Viktor M.
3
4   Licensed under the Apache License, Version 2.0 (the "License");
5   you may not use this file except in compliance with the License.
6   You may obtain a copy of the License at
7
8       http://www.apache.org/licenses/LICENSE-2.0
9
10   Unless required by applicable law or agreed to in writing, software
11   distributed under the License is distributed on an "AS IS" BASIS,
12   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   See the License for the specific language governing permissions and
14   limitations under the License.
15*/
16
17using System;
18using System.Collections.Generic;
19using System.Linq;
20using System.Text;
21
22using WorkspaceManager.Model;
23using System.Threading;
24using System.Collections;
25using Cryptool.PluginBase;
26using System.Reflection;
27using Gears4Net;
28using System.Windows.Threading;
29using System.Runtime.Remoting.Contexts;
30using System.IO;
31using System.Diagnostics;
32
33namespace WorkspaceManager.Execution
34{
35    /// <summary>
36    /// Engine to execute a model of the WorkspaceManager
37    /// This class needs a WorkspaceManager to be instantiated
38    /// To run an execution process it also needs a WorkspaceModel
39    ///
40    /// This class uses Gears4Net to execute the plugins
41    /// </summary>
42    public class ExecutionEngine
43    {
44        private WorkspaceManager WorkspaceManagerEditor;
45        private Scheduler scheduler;
46        private WorkspaceModel workspaceModel;
47        private volatile bool isRunning = false;
48        private BenchmarkProtocol BenchmarkProtocol = null;
49
50        public volatile int ExecutedPluginsCounter = 0;
51        public bool BenchmarkPlugins = false;
52        public int GuiUpdateInterval = 0;
53        public int SleepTime = 0;
54        public int ThreadPriority = 0;
55
56        /// <summary>
57        /// Creates a new ExecutionEngine
58        /// </summary>
59        /// <param name="workspaceManagerEditor"></param>
60        public ExecutionEngine(WorkspaceManager workspaceManagerEditor)
61        {
62            WorkspaceManagerEditor = workspaceManagerEditor;
63        }
64
65        /// <summary>
66        /// Is this ExecutionEngine running?
67        /// </summary>
68        public bool IsRunning
69        {
70            get{return this.isRunning;}
71            private set{this.isRunning = value;}
72        }
73
74        /// <summary>
75        /// Execute the given Model
76        /// </summary>
77        /// <param name="workspaceModel"></param>
78        public void Execute(WorkspaceModel workspaceModel, int amountThreads)
79        {
80            if (!IsRunning)
81            {
82                IsRunning = true;
83                this.workspaceModel = workspaceModel;
84
85                if (amountThreads <= 0)
86                {
87                    amountThreads = 1;
88                }
89
90                scheduler = new WorkspaceManagerScheduler("WorkspaceManagerScheduler", amountThreads,this);
91               
92                //We have to reset all states of PluginModels, ConnectorModels and ConnectionModels:
93                workspaceModel.resetStates();
94
95                //The UpdateGuiProtocol is a kind of "daemon" which will update the view elements if necessary
96                UpdateGuiProtocol updateGuiProtocol = new UpdateGuiProtocol(scheduler, workspaceModel, this);
97                scheduler.AddProtocol(updateGuiProtocol);
98                updateGuiProtocol.Start();
99
100                //The BenchmarkProtocl counts the amount of executed plugins per seconds and writes this to debug
101                if (this.BenchmarkPlugins)
102                {
103                    BenchmarkProtocol = new BenchmarkProtocol(scheduler, this.workspaceModel, this);
104                    scheduler.AddProtocol(BenchmarkProtocol);
105                    BenchmarkProtocol.Start();
106                }
107
108                //Here we create for each PluginModel an own PluginProtocol
109                //By using round-robin we give each protocol to another scheduler to gain
110                //a good average load balancing of the schedulers
111                //we also initalize each plugin
112                //It is possible that a plugin is also a PluginProtocol
113                //if that is true we do not create a new one but use the plugin instead the created one               
114                foreach (PluginModel pluginModel in workspaceModel.AllPluginModels)
115                {
116                    PluginProtocol pluginProtocol = new PluginProtocol(scheduler, pluginModel, this);
117                    MessageExecution message = new MessageExecution();
118                    message.PluginModel = pluginModel;
119                    pluginModel.MessageExecution = message;
120
121                    pluginModel.Plugin.PreExecution();                   
122                    pluginModel.PluginProtocol = pluginProtocol;
123                    scheduler.AddProtocol(pluginProtocol);
124
125                    if (pluginProtocol.Status == ProtocolStatus.Created || pluginProtocol.Status == ProtocolStatus.Terminated)
126                    {
127                        pluginProtocol.Start();
128                    }
129                 
130                    if (pluginModel.Startable)
131                    {
132                        pluginProtocol.BroadcastMessage(pluginModel.MessageExecution);
133                    }
134                }
135
136                ((WorkspaceManagerScheduler)scheduler).startScheduling();
137               
138            }
139        }     
140     
141        /// <summary>
142        /// Stop the execution process:
143        /// calls shutdown on all schedulers + calls stop() on each plugin
144        /// </summary>
145        public void Stop()
146        {
147           
148            //First stop alle plugins
149            foreach (PluginModel pluginModel in workspaceModel.AllPluginModels)
150            {
151                pluginModel.Plugin.Stop();                   
152            }
153
154            IsRunning = false;
155            //Secondly stop all Gears4Net Schedulers
156            scheduler.Shutdown();
157
158            //call all PostExecution methods of all plugins
159            foreach (PluginModel pluginModel in workspaceModel.AllPluginModels)
160            {
161                pluginModel.Plugin.PostExecution();
162            }
163
164            //remove the plugin protocol of each plugin model
165            foreach (PluginModel pluginModel in workspaceModel.AllPluginModels)
166            {
167                pluginModel.PluginProtocol = null;
168            }
169
170            this.WorkspaceManagerEditor = null;
171            this.workspaceModel = null;
172         
173        }
174
175        /// <summary>
176        /// Pause the execution
177        /// </summary>
178        public void Pause()
179        {
180            //not implemented yet
181        }
182
183        /// <summary>
184        /// Use the logger of the WorkspaceManagerEditor
185        /// </summary>
186        /// <param name="message"></param>
187        /// <param name="level"></param>
188        public void GuiLogMessage(string message, NotificationLevel level)
189        {
190            if (WorkspaceManagerEditor != null)
191            {
192                WorkspaceManagerEditor.GuiLogMessage(message, level);
193            }
194        }
195    }
196 
197    /// <summary>
198    /// Message send to scheduler for a Plugin to trigger the Execution
199    /// </summary>
200    public class MessageExecution : MessageBase
201    {
202        public PluginModel PluginModel;
203    }
204
205    /// <summary>
206    /// A Protocol for updating the GUI in time intervals
207    /// </summary>
208    public class UpdateGuiProtocol : ProtocolBase
209    {
210        private WorkspaceModel workspaceModel;
211        private ExecutionEngine executionEngine;
212
213        /// <summary>
214        /// Create a new protocol. Each protocol requires a scheduler which provides
215        /// a thread for execution.
216        /// </summary>
217        /// <param name="scheduler"></param>
218        public UpdateGuiProtocol(Scheduler scheduler, WorkspaceModel workspaceModel, ExecutionEngine executionEngine)
219            : base(scheduler)
220        {
221            this.workspaceModel = workspaceModel;
222            this.executionEngine = executionEngine;           
223        }
224       
225        /// <summary>
226        /// The main function of the protocol
227        /// </summary>
228        /// <param name="stateMachine"></param>
229        /// <returns></returns>
230        public override System.Collections.Generic.IEnumerator<ReceiverBase> Execute(AbstractStateMachine stateMachine)
231        {           
232            yield return new IntervalReceiver(this.executionEngine.GuiUpdateInterval,this.executionEngine.GuiUpdateInterval, HandleUpdateGui);
233        }
234
235        /// <summary>
236        /// Handler function for a message.
237        /// This handler must not block, because it executes inside the thread of the scheduler.
238        /// </summary>
239        /// <param name="msg"></param>
240        private void HandleUpdateGui()
241        {
242            //Get the gui Thread
243            this.workspaceModel.WorkspaceManagerEditor.Presentation.Dispatcher.Invoke(DispatcherPriority.Normal, (SendOrPostCallback)delegate
244            {
245                List<PluginModel> pluginModels = workspaceModel.AllPluginModels;
246                foreach (PluginModel pluginModel in pluginModels)
247                {
248                    if (pluginModel.GuiNeedsUpdate)
249                    {
250                        pluginModel.GuiNeedsUpdate = false;
251                        pluginModel.paint();
252                        if (pluginModel.UpdateableView != null)
253                        {
254                            pluginModel.UpdateableView.update();
255                        }
256                    }
257                }
258                List<ConnectionModel> connectionModels = workspaceModel.AllConnectionModels;
259                foreach (ConnectionModel connectionModel in connectionModels)
260                {
261                    if (connectionModel.GuiNeedsUpdate)
262                    {
263                        if (connectionModel.UpdateableView != null)
264                        {
265                            connectionModel.UpdateableView.update();
266                        }
267                    }
268                }
269            }
270            , null);
271
272        }
273    }
274   
275    /// <summary>
276    /// A Protocol for benchmarking
277    /// </summary>
278    public class BenchmarkProtocol : ProtocolBase
279    {
280        private WorkspaceModel workspaceModel;
281        private ExecutionEngine executionEngine;
282     
283        /// <summary>
284        /// Create a new protocol. Each protocol requires a scheduler which provides
285        /// a thread for execution.
286        /// </summary>
287        /// <param name="scheduler"></param>
288        public BenchmarkProtocol(Scheduler scheduler, WorkspaceModel workspaceModel, ExecutionEngine executionEngine)
289            : base(scheduler)
290        {
291            this.workspaceModel = workspaceModel;
292            this.executionEngine = executionEngine;         
293        }
294
295        /// <summary>
296        /// The main function of the protocol
297        /// </summary>
298        /// <param name="stateMachine"></param>
299        /// <returns></returns>
300        public override System.Collections.Generic.IEnumerator<ReceiverBase> Execute(AbstractStateMachine stateMachine)
301        {
302            yield return new IntervalReceiver(1000,1000, HandleBenchmark);           
303        }
304
305        /// <summary>
306        /// Handler function for a message.
307        /// This handler must not block, because it executes inside the thread of the scheduler.
308        /// </summary>
309        /// <param name="msg"></param>
310        private void HandleBenchmark()
311        {
312            StringBuilder sb = new StringBuilder();
313            sb.Append("Executing at about ");
314            sb.Append(this.executionEngine.ExecutedPluginsCounter); 
315            sb.Append(" Plugins/s");
316
317            this.workspaceModel.WorkspaceManagerEditor.GuiLogMessage(sb.ToString(), NotificationLevel.Debug);         
318            this.executionEngine.ExecutedPluginsCounter = 0;
319        }       
320    }
321
322    /// <summary>
323    /// A Protocol for a PluginModel
324    /// </summary>
325    public class PluginProtocol : ProtocolBase
326    {       
327        public PluginModel PluginModel;
328        private ExecutionEngine executionEngine;
329
330        /// <summary>
331        /// Create a new protocol. Each protocol requires a scheduler which provides
332        /// a thread for execution.
333        /// </summary>
334        /// <param name="scheduler"></param>
335        public PluginProtocol(Scheduler scheduler, PluginModel pluginModel,ExecutionEngine executionEngine)
336            : base(scheduler)
337        {
338            this.PluginModel = pluginModel;
339            this.executionEngine = executionEngine;
340        }       
341
342        /// <summary>
343        ///
344        /// </summary>
345        /// <param name="scheduler"></param>
346        /// <param name="pluginModel"></param>
347        /// <param name="executionEngine"></param>
348        public void setExecutionEngineSettings(Scheduler scheduler, PluginModel pluginModel, ExecutionEngine executionEngine)
349        {
350            this.Scheduler = scheduler;
351            this.PluginModel = pluginModel;
352            this.executionEngine = executionEngine;
353        }
354
355
356        /// <summary>
357        /// The main function of the protocol     
358        /// </summary>
359        /// <param name="stateMachine"></param>
360        /// <returns></returns>
361        public override System.Collections.Generic.IEnumerator<ReceiverBase> Execute(AbstractStateMachine stateMachine)
362        {           
363            yield return new PersistentReceiver(Receive<MessageExecution>(this.Filter, HandleExecute));               
364        }
365
366        /// <summary>
367        /// Filter that checks wether the Plugin fits to the internal Plugin reference of this PluginProtocl
368        /// </summary>
369        /// <param name="msg"></param>
370        /// <returns></returns>
371        private bool Filter(MessageExecution msg)
372        {
373            if (msg.PluginModel != this.PluginModel)
374            {
375                return false;
376            }
377            return true;
378        }
379        /// <summary>
380        /// Handle an execution of a plugin
381        /// </summary>
382        /// <param name="msg"></param>
383        private void HandleExecute(MessageExecution msg)
384        {
385            // ################
386            // 1. Check if Plugin may Execute
387            // ################
388
389            if (!msg.PluginModel.WorkspaceModel.WorkspaceManagerEditor.isExecuting())
390            {
391                return;
392            }
393
394            //Check if all necessary inputs are set
395            List<ConnectorModel> inputConnectors = msg.PluginModel.InputConnectors;
396            foreach (ConnectorModel connectorModel in inputConnectors)
397            {
398                if (!connectorModel.IControl &&
399                    (connectorModel.IsMandatory || connectorModel.InputConnections.Count > 0) && !connectorModel.HasData)
400                {
401                    return;
402                }
403            }
404
405            //Check if all outputs are free
406            List<ConnectorModel> outputConnectors = msg.PluginModel.OutputConnectors;
407            foreach (ConnectorModel connectorModel in outputConnectors)
408            {
409                if (!connectorModel.IControl)
410                {
411                    List<ConnectionModel> outputConnections = connectorModel.OutputConnections;
412                    foreach (ConnectionModel connectionModel in outputConnections)
413                    {
414                        if (connectionModel.To.HasData)
415                        {
416                            return;
417                        }
418                    }
419                }
420            }
421
422            // ################
423            //2. Fill all Inputs of the plugin, if this fails, stop executing the plugin
424            // ################
425
426            //Fill the plugins inputs with data
427            foreach (ConnectorModel connectorModel in inputConnectors)
428            {
429                try
430                {
431                    if (connectorModel.HasData && connectorModel.Data != null)
432                    {
433                        if (connectorModel.IsDynamic)
434                        {
435                       
436                            if(connectorModel.method == null)
437                            {
438                                connectorModel.method = PluginModel.Plugin.GetType().GetMethod(connectorModel.DynamicSetterName);
439                            }
440                            connectorModel.method.Invoke(PluginModel.Plugin, new object[] { connectorModel.PropertyName, connectorModel.Data });
441                        }
442                        else
443                        {
444                            if (connectorModel.property == null)
445                            {
446                                connectorModel.property = PluginModel.Plugin.GetType().GetProperty(connectorModel.PropertyName);
447                            }
448                            connectorModel.property.SetValue(PluginModel.Plugin, connectorModel.Data, null);
449                        }
450                    }
451                }
452                catch (Exception ex)
453                {
454                    this.PluginModel.WorkspaceModel.WorkspaceManagerEditor.GuiLogMessage("An error occured while setting value of connector \"" + connectorModel.Name + "\" of \"" + PluginModel.Name + "\": " + ex.Message, NotificationLevel.Error);
455                    this.PluginModel.State = PluginModelState.Error;
456                    this.PluginModel.GuiNeedsUpdate = true;
457                    return;
458                }
459            }
460
461            // ################
462            //3. Execute the Plugin -> call the IPlugin.Execute()
463            // ################
464
465            try
466            {
467                PluginModel.Plugin.Execute();
468            }
469            catch (Exception ex)
470            {       
471                this.PluginModel.WorkspaceModel.WorkspaceManagerEditor.GuiLogMessage("An error occured while executing  \"" + PluginModel.Name + "\": " + ex.Message, NotificationLevel.Error);
472                this.PluginModel.State = PluginModelState.Error;
473                this.PluginModel.GuiNeedsUpdate = true;
474                return;               
475            }
476
477            // ################
478            //4. Count for the benchmark
479            // ################
480
481            if (this.executionEngine.BenchmarkPlugins)
482            {
483                this.executionEngine.ExecutedPluginsCounter++;               
484            }
485
486            // ################
487            //5. If the user wants to, sleep some time
488            // ################
489
490            if (this.executionEngine.SleepTime > 0)
491            {
492                Thread.Sleep(this.executionEngine.SleepTime);
493            }
494        }       
495
496        /// <summary>
497        ///
498        /// </summary>
499        public ExecutionEngine ExecutionEngine
500        {
501            get { return this.executionEngine; }           
502        }
503    }
504
505    /// <summary>
506    /// Gears4Net Scheduler
507    /// </summary>
508    public class WorkspaceManagerScheduler : Scheduler
509    {
510        private System.Threading.AutoResetEvent wakeup = new System.Threading.AutoResetEvent(false);
511        private bool shutdown = false;
512        private Thread[] threads;
513        private volatile int runningThreads = 0;
514
515        public ExecutionEngine executionEngine = null;         
516
517        public WorkspaceManagerScheduler(string name, int amountThreads, ExecutionEngine executionEngine)
518            : base()
519        {
520            threads = new Thread[amountThreads];
521            this.executionEngine = executionEngine;
522
523            for (int i = 0; i < threads.Length;i++ )
524            {
525                threads[i] = new Thread(this.DoScheduling);
526                threads[i].SetApartmentState(ApartmentState.MTA);
527                threads[i].Name = name + "-Thread-" + i;
528
529                switch (this.executionEngine.ThreadPriority)
530                {
531                    case 0:
532                        threads[i].Priority = ThreadPriority.AboveNormal;
533                        break;
534                    case 1:
535                        threads[i].Priority = ThreadPriority.BelowNormal;
536                        break;
537                    case 2:
538                        threads[i].Priority = ThreadPriority.Highest;
539                        break;
540                    case 3:
541                        threads[i].Priority = ThreadPriority.Lowest;
542                        break;
543                    case 4:
544                        threads[i].Priority = ThreadPriority.Normal;
545                        break;
546                }
547            }
548           
549        }
550
551        public void startScheduling()
552        {
553            foreach (Thread thread in threads)
554            {
555                thread.Start();
556                lock (this)
557                {
558                    runningThreads++;
559                }
560            }
561        }
562       
563        private void DoScheduling()
564        {           
565
566            this.executionEngine.GuiLogMessage(Thread.CurrentThread.Name + " up and running", NotificationLevel.Debug);
567            Queue<ProtocolBase> waitingProtocols = this.waitingProtocols;
568           
569            // Loop forever
570            while (true)
571            {               
572                this.wakeup.WaitOne();
573
574                // Loop while there are more protocols waiting
575                while (true)
576                {
577                    // Should the scheduler stop?
578                    if (this.shutdown)
579                    {                       
580                        this.executionEngine.GuiLogMessage(Thread.CurrentThread.Name + " terminated", NotificationLevel.Debug);
581                        lock (this)
582                        {
583                            runningThreads--;
584                        }                                           
585                        return;
586                    }
587
588                    try
589                    {
590                        ProtocolBase protocol = null;
591                        lock (this)
592                        {
593                            if (waitingProtocols.Count == 0)
594                                break;
595                            protocol = waitingProtocols.Dequeue();
596                        }
597                       
598                        ProtocolStatus status = protocol.Run();
599
600                        lock (this)
601                        {
602                            switch (status)
603                            {
604                                case ProtocolStatus.Created:
605                                    System.Diagnostics.Debug.Assert(false);
606                                    break;
607                                case ProtocolStatus.Ready:
608                                    waitingProtocols.Enqueue(protocol);
609                                    break;
610                                case ProtocolStatus.Waiting:
611                                    break;
612                                case ProtocolStatus.Terminated:
613                                    System.Diagnostics.Debug.Assert(!this.waitingProtocols.Contains(protocol));
614                                    this.RemoveProtocol(protocol);
615                                    break;
616                            }
617                        }
618                    }
619                    catch (Exception ex)
620                    {
621                        this.executionEngine.GuiLogMessage("Error during scheduling: " + ex.Message + " - " + ex.InnerException,NotificationLevel.Error);
622                    }
623                }
624            }
625        }
626        /// <summary>
627        /// Removes a protocol from the internal queue
628        /// </summary>
629        /// <param name="protocol"></param>
630        public override void RemoveProtocol(ProtocolBase protocol)
631        {
632            lock (this)
633            {
634                this.protocols.Remove(protocol);
635                if (this.protocols.Count == 0)
636                    this.Shutdown();
637            }
638        }
639
640        /// <summary>
641        /// Adds a protocol to the internal queue
642        /// </summary>
643        /// <param name="protocol"></param>
644        public override void AddProtocol(ProtocolBase protocol)
645        {
646            lock (this)
647            {
648                this.protocols.Add(protocol);
649            }
650        }
651
652        /// <summary>
653        /// Wakeup this scheduler
654        /// </summary>
655        /// <param name="protocol"></param>
656        public override void Wakeup(ProtocolBase protocol)
657        {
658            lock (this)
659            {
660                if (!this.waitingProtocols.Contains(protocol))
661                    this.waitingProtocols.Enqueue(protocol);
662                this.wakeup.Set();
663            }
664        }
665
666        /// <summary>
667        /// Terminates the scheduler
668        /// </summary>
669        public override void Shutdown()
670        {
671            this.shutdown = true;
672            this.wakeup.Set();
673
674            this.executionEngine.GuiLogMessage("Waiting for all scheduler threads to stop", NotificationLevel.Debug);
675            while (runningThreads > 0)
676            {
677                Thread.Sleep(50);
678                this.wakeup.Set();
679            }
680            this.executionEngine.GuiLogMessage("All scheduler threads stopped", NotificationLevel.Debug);
681
682            this.waitingProtocols.Clear();
683            this.protocols.Clear();           
684        }
685    }
686}
Note: See TracBrowser for help on using the repository browser.