source: trunk/CrypPlugins/WorkspaceManager/Execution/ExecutionEngine.cs @ 1929

Last change on this file since 1929 was 1929, checked in by kopal, 11 years ago
  • some optimization
  • Benchmark now loggs Plugins/sec and Memory Usage to logg files (only for testing purposes; will be removed in later releases)
File size: 25.1 KB
Line 
1/*                             
2   Copyright 2010 Nils Kopal, Viktor M.
3
4   Licensed under the Apache License, Version 2.0 (the "License");
5   you may not use this file except in compliance with the License.
6   You may obtain a copy of the License at
7
8       http://www.apache.org/licenses/LICENSE-2.0
9
10   Unless required by applicable law or agreed to in writing, software
11   distributed under the License is distributed on an "AS IS" BASIS,
12   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   See the License for the specific language governing permissions and
14   limitations under the License.
15*/
16
17using System;
18using System.Collections.Generic;
19using System.Linq;
20using System.Text;
21
22using WorkspaceManager.Model;
23using System.Threading;
24using System.Collections;
25using Cryptool.PluginBase;
26using System.Reflection;
27using Gears4Net;
28using System.Windows.Threading;
29using System.Runtime.Remoting.Contexts;
30using System.IO;
31using System.Diagnostics;
32
33namespace WorkspaceManager.Execution
34{
35    /// <summary>
36    /// Engine to execute a model of the WorkspaceManager
37    /// This class needs a WorkspaceManager to be instantiated
38    /// To run an execution process it also needs a WorkspaceModel
39    ///
40    /// This class uses Gears4Net to execute the plugins
41    /// </summary>
42    public class ExecutionEngine
43    {
44        private WorkspaceManager WorkspaceManagerEditor;
45        private Scheduler scheduler;
46        private WorkspaceModel workspaceModel;
47        private volatile bool isRunning = false;
48        private BenchmarkProtocol BenchmarkProtocol;
49
50        public volatile int ExecutedPluginsCounter = 0;
51        public bool BenchmarkPlugins = false;
52        public int GuiUpdateInterval = 0;
53        public int SleepTime = 0;
54        public int ThreadPriority = 0;
55
56        /// <summary>
57        /// Creates a new ExecutionEngine
58        /// </summary>
59        /// <param name="workspaceManagerEditor"></param>
60        public ExecutionEngine(WorkspaceManager workspaceManagerEditor)
61        {
62            WorkspaceManagerEditor = workspaceManagerEditor;
63        }
64
65        /// <summary>
66        /// Is this ExecutionEngine running?
67        /// </summary>
68        public bool IsRunning
69        {
70            get{return this.isRunning;}
71            private set{this.isRunning = value;}
72        }
73
74        /// <summary>
75        /// Execute the given Model
76        /// </summary>
77        /// <param name="workspaceModel"></param>
78        public void Execute(WorkspaceModel workspaceModel, int amountThreads)
79        {
80            if (!IsRunning)
81            {
82                IsRunning = true;
83                this.workspaceModel = workspaceModel;
84
85                if (amountThreads <= 0)
86                {
87                    amountThreads = 1;
88                }
89
90                scheduler = new WorkspaceManagerScheduler("WorkspaceManagerScheduler", amountThreads,this);
91               
92                //We have to reset all states of PluginModels, ConnectorModels and ConnectionModels:
93                workspaceModel.resetStates();
94
95                //The UpdateGuiProtocol is a kind of "daemon" which will update the view elements if necessary
96                UpdateGuiProtocol updateGuiProtocol = new UpdateGuiProtocol(scheduler, workspaceModel, this);
97                scheduler.AddProtocol(updateGuiProtocol);
98                updateGuiProtocol.Start();
99
100                //The BenchmarkProtocl counts the amount of executed plugins per seconds and writes this to debug
101                if (this.BenchmarkPlugins)
102                {
103                    BenchmarkProtocol = new BenchmarkProtocol(scheduler, this.workspaceModel, this);
104                    scheduler.AddProtocol(BenchmarkProtocol);
105                    BenchmarkProtocol.Start();
106                }
107
108                //Here we create for each PluginModel an own PluginProtocol
109                //By using round-robin we give each protocol to another scheduler to gain
110                //a good average load balancing of the schedulers
111                //we also initalize each plugin
112                //It is possible that a plugin is also a PluginProtocol
113                //if that is true we do not create a new one but use the plugin instead the created one               
114                foreach (PluginModel pluginModel in workspaceModel.AllPluginModels)
115                {
116                    PluginProtocol pluginProtocol = new PluginProtocol(scheduler, pluginModel, this);
117                    MessageExecution message = new MessageExecution();
118                    message.PluginModel = pluginModel;
119                    pluginModel.MessageExecution = message;
120
121                    pluginModel.Plugin.PreExecution();                   
122                    pluginModel.PluginProtocol = pluginProtocol;
123                    scheduler.AddProtocol(pluginProtocol);
124
125                    if (pluginProtocol.Status == ProtocolStatus.Created || pluginProtocol.Status == ProtocolStatus.Terminated)
126                    {
127                        pluginProtocol.Start();
128                    }
129                 
130                    if (pluginModel.Startable)
131                    {
132                        pluginProtocol.BroadcastMessage(pluginModel.MessageExecution);
133                    }
134                }
135
136                ((WorkspaceManagerScheduler)scheduler).startScheduling();
137               
138            }
139        }     
140     
141        /// <summary>
142        /// Stop the execution process:
143        /// calls shutdown on all schedulers + calls stop() on each plugin
144        /// </summary>
145        public void Stop()
146        {
147            try
148            {
149                //First stop alle plugins
150                foreach (PluginModel pluginModel in workspaceModel.AllPluginModels)
151                {
152                    pluginModel.Plugin.Stop();
153                    pluginModel.Plugin.PostExecution();
154                }
155
156                IsRunning = false;
157                //Secondly stop all Gears4Net Schedulers
158                scheduler.Shutdown();
159
160                foreach (PluginModel pluginModel in workspaceModel.AllPluginModels)
161                {
162                    pluginModel.PluginProtocol = null;
163                }
164
165
166
167                this.WorkspaceManagerEditor = null;
168                this.workspaceModel = null;
169            }
170            finally
171            {
172                BenchmarkProtocol.CloseWriters();
173            }
174        }
175
176        /// <summary>
177        /// Pause the execution
178        /// </summary>
179        public void Pause()
180        {
181            //not implemented yet
182        }
183
184        /// <summary>
185        /// Use the logger of the WorkspaceManagerEditor
186        /// </summary>
187        /// <param name="message"></param>
188        /// <param name="level"></param>
189        public void GuiLogMessage(string message, NotificationLevel level)
190        {
191            if (WorkspaceManagerEditor != null)
192            {
193                WorkspaceManagerEditor.GuiLogMessage(message, level);
194            }
195        }
196    }
197 
198    /// <summary>
199    /// Message send to scheduler for a Plugin to trigger the Execution
200    /// </summary>
201    public class MessageExecution : MessageBase
202    {
203        public PluginModel PluginModel;
204    }
205
206    /// <summary>
207    /// A Protocol for updating the GUI in time intervals
208    /// </summary>
209    public class UpdateGuiProtocol : ProtocolBase
210    {
211        private WorkspaceModel workspaceModel;
212        private ExecutionEngine executionEngine;
213
214        /// <summary>
215        /// Create a new protocol. Each protocol requires a scheduler which provides
216        /// a thread for execution.
217        /// </summary>
218        /// <param name="scheduler"></param>
219        public UpdateGuiProtocol(Scheduler scheduler, WorkspaceModel workspaceModel, ExecutionEngine executionEngine)
220            : base(scheduler)
221        {
222            this.workspaceModel = workspaceModel;
223            this.executionEngine = executionEngine;           
224        }
225       
226        /// <summary>
227        /// The main function of the protocol
228        /// </summary>
229        /// <param name="stateMachine"></param>
230        /// <returns></returns>
231        public override System.Collections.Generic.IEnumerator<ReceiverBase> Execute(AbstractStateMachine stateMachine)
232        {           
233            yield return new IntervalReceiver(this.executionEngine.GuiUpdateInterval,this.executionEngine.GuiUpdateInterval, HandleUpdateGui);
234        }
235
236        /// <summary>
237        /// Handler function for a message.
238        /// This handler must not block, because it executes inside the thread of the scheduler.
239        /// </summary>
240        /// <param name="msg"></param>
241        private void HandleUpdateGui()
242        {
243            //Get the gui Thread
244            this.workspaceModel.WorkspaceManagerEditor.Presentation.Dispatcher.Invoke(DispatcherPriority.Normal, (SendOrPostCallback)delegate
245            {
246                List<PluginModel> pluginModels = workspaceModel.AllPluginModels;
247                foreach (PluginModel pluginModel in pluginModels)
248                {
249                    if (pluginModel.GuiNeedsUpdate)
250                    {
251                        pluginModel.GuiNeedsUpdate = false;
252                        pluginModel.paint();
253                        if (pluginModel.UpdateableView != null)
254                        {
255                            pluginModel.UpdateableView.update();
256                        }
257                    }
258                }
259                List<ConnectionModel> connectionModels = workspaceModel.AllConnectionModels;
260                foreach (ConnectionModel connectionModel in connectionModels)
261                {
262                    if (connectionModel.GuiNeedsUpdate)
263                    {
264                        if (connectionModel.UpdateableView != null)
265                        {
266                            connectionModel.UpdateableView.update();
267                        }
268                    }
269                }
270            }
271            , null);
272
273        }
274    }
275   
276    /// <summary>
277    /// A Protocol for benchmarking
278    /// </summary>
279    public class BenchmarkProtocol : ProtocolBase
280    {
281        private WorkspaceModel workspaceModel;
282        private ExecutionEngine executionEngine;
283        private StreamWriter PerformanceWriter;
284        private StreamWriter MemoryPerformanceWriter;
285
286        /// <summary>
287        /// Create a new protocol. Each protocol requires a scheduler which provides
288        /// a thread for execution.
289        /// </summary>
290        /// <param name="scheduler"></param>
291        public BenchmarkProtocol(Scheduler scheduler, WorkspaceModel workspaceModel, ExecutionEngine executionEngine)
292            : base(scheduler)
293        {
294            this.workspaceModel = workspaceModel;
295            this.executionEngine = executionEngine;
296            string sFilename = "benchmark_" + DateTime.Now.Hour + DateTime.Now.Minute + DateTime.Now.Second + ".txt";
297            PerformanceWriter = new StreamWriter("plugins_" + sFilename);
298            MemoryPerformanceWriter = new StreamWriter("memory_" + sFilename);
299        }
300
301        /// <summary>
302        /// The main function of the protocol
303        /// </summary>
304        /// <param name="stateMachine"></param>
305        /// <returns></returns>
306        public override System.Collections.Generic.IEnumerator<ReceiverBase> Execute(AbstractStateMachine stateMachine)
307        {
308            yield return new IntervalReceiver(1000,1000, HandleBenchmark);           
309        }
310
311        /// <summary>
312        /// Handler function for a message.
313        /// This handler must not block, because it executes inside the thread of the scheduler.
314        /// </summary>
315        /// <param name="msg"></param>
316        private void HandleBenchmark()
317        {
318            StringBuilder sb = new StringBuilder();
319            sb.Append("Executing at about ");
320            sb.Append(this.executionEngine.ExecutedPluginsCounter); 
321            sb.Append(" Plugins/s");
322
323            this.workspaceModel.WorkspaceManagerEditor.GuiLogMessage(sb.ToString(), NotificationLevel.Debug);
324            PerformanceWriter.WriteLine(this.executionEngine.ExecutedPluginsCounter);
325            PerformanceWriter.Flush();
326            MemoryPerformanceWriter.WriteLine(Process.GetCurrentProcess().WorkingSet64);
327            MemoryPerformanceWriter.Flush();
328            this.executionEngine.ExecutedPluginsCounter = 0;
329        }
330
331        public void CloseWriters()
332        {
333            PerformanceWriter.Close();
334            MemoryPerformanceWriter.Close();
335        }
336    }
337
338    /// <summary>
339    /// A Protocol for a PluginModel
340    /// </summary>
341    public class PluginProtocol : ProtocolBase
342    {       
343        public PluginModel PluginModel;
344        private ExecutionEngine executionEngine;
345
346        /// <summary>
347        /// Create a new protocol. Each protocol requires a scheduler which provides
348        /// a thread for execution.
349        /// </summary>
350        /// <param name="scheduler"></param>
351        public PluginProtocol(Scheduler scheduler, PluginModel pluginModel,ExecutionEngine executionEngine)
352            : base(scheduler)
353        {
354            this.PluginModel = pluginModel;
355            this.executionEngine = executionEngine;
356        }       
357
358        /// <summary>
359        ///
360        /// </summary>
361        /// <param name="scheduler"></param>
362        /// <param name="pluginModel"></param>
363        /// <param name="executionEngine"></param>
364        public void setExecutionEngineSettings(Scheduler scheduler, PluginModel pluginModel, ExecutionEngine executionEngine)
365        {
366            this.Scheduler = scheduler;
367            this.PluginModel = pluginModel;
368            this.executionEngine = executionEngine;
369        }
370
371
372        /// <summary>
373        /// The main function of the protocol     
374        /// </summary>
375        /// <param name="stateMachine"></param>
376        /// <returns></returns>
377        public override System.Collections.Generic.IEnumerator<ReceiverBase> Execute(AbstractStateMachine stateMachine)
378        {           
379            yield return new PersistentReceiver(Receive<MessageExecution>(this.Filter, HandleExecute));               
380        }
381
382        /// <summary>
383        /// Filter that checks wether the Plugin fits to the internal Plugin reference of this PluginProtocl
384        /// </summary>
385        /// <param name="msg"></param>
386        /// <returns></returns>
387        private bool Filter(MessageExecution msg)
388        {
389            if (msg.PluginModel != this.PluginModel)
390            {
391                return false;
392            }
393            return true;
394        }
395        /// <summary>
396        /// Handle an execution of a plugin
397        /// </summary>
398        /// <param name="msg"></param>
399        private void HandleExecute(MessageExecution msg)
400        {
401            // ################
402            // 1. Check if Plugin may Execute
403            // ################
404
405            if (!msg.PluginModel.WorkspaceModel.WorkspaceManagerEditor.isExecuting())
406            {
407                return;
408            }
409
410            //Check if all necessary inputs are set
411            List<ConnectorModel> inputConnectors = msg.PluginModel.InputConnectors;
412            foreach (ConnectorModel connectorModel in inputConnectors)
413            {
414                if (!connectorModel.IControl &&
415                    (connectorModel.IsMandatory || connectorModel.InputConnections.Count > 0) && !connectorModel.HasData)
416                {
417                    return;
418                }
419            }
420
421            //Check if all outputs are free
422            List<ConnectorModel> outputConnectors = msg.PluginModel.OutputConnectors;
423            foreach (ConnectorModel connectorModel in outputConnectors)
424            {
425                if (!connectorModel.IControl)
426                {
427                    List<ConnectionModel> outputConnections = connectorModel.OutputConnections;
428                    foreach (ConnectionModel connectionModel in outputConnections)
429                    {
430                        if (connectionModel.To.HasData)
431                        {
432                            return;
433                        }
434                    }
435                }
436            }
437
438            // ################
439            //2. Fill all Inputs of the plugin, if this fails, stop executing the plugin
440            // ################
441
442            //Fill the plugins inputs with data
443            foreach (ConnectorModel connectorModel in inputConnectors)
444            {
445                try
446                {
447                    if (connectorModel.HasData && connectorModel.Data != null)
448                    {
449                        if (connectorModel.IsDynamic)
450                        {
451                            MethodInfo propertyInfo = PluginModel.Plugin.GetType().GetMethod(connectorModel.DynamicSetterName);
452                            propertyInfo.Invoke(PluginModel.Plugin, new object[] { connectorModel.PropertyName, connectorModel.Data });
453                        }
454                        else
455                        {
456                            PropertyInfo propertyInfo = PluginModel.Plugin.GetType().GetProperty(connectorModel.PropertyName);
457                            propertyInfo.SetValue(PluginModel.Plugin, connectorModel.Data, null);
458                        }
459                    }
460                }
461                catch (Exception ex)
462                {
463                    this.PluginModel.WorkspaceModel.WorkspaceManagerEditor.GuiLogMessage("An error occured while setting value of connector \"" + connectorModel.Name + "\" of \"" + PluginModel.Name + "\": " + ex.Message, NotificationLevel.Error);
464                    this.PluginModel.State = PluginModelState.Error;
465                    this.PluginModel.GuiNeedsUpdate = true;
466                    return;
467                }
468            }
469
470            // ################
471            //3. Execute the Plugin -> call the IPlugin.Execute()
472            // ################
473
474            try
475            {
476                PluginModel.Plugin.Execute();
477            }
478            catch (Exception ex)
479            {
480                this.PluginModel.WorkspaceModel.WorkspaceManagerEditor.GuiLogMessage("An error occured while executing  \"" + PluginModel.Name + "\": " + ex.Message, NotificationLevel.Error);
481                this.PluginModel.State = PluginModelState.Error;
482                this.PluginModel.GuiNeedsUpdate = true;
483                return;
484            }
485
486            // ################
487            //4. Count for the benchmark
488            // ################
489
490            if (this.executionEngine.BenchmarkPlugins)
491            {
492                this.executionEngine.ExecutedPluginsCounter++;               
493            }
494
495            // ################
496            //5. If the user wants to, sleep some time
497            // ################
498
499            if (this.executionEngine.SleepTime > 0)
500            {
501                Thread.Sleep(this.executionEngine.SleepTime);
502            }
503        }       
504
505        /// <summary>
506        ///
507        /// </summary>
508        public ExecutionEngine ExecutionEngine
509        {
510            get { return this.executionEngine; }           
511        }
512    }
513
514    /// <summary>
515    /// Gears4Net Scheduler
516    /// </summary>
517    public class WorkspaceManagerScheduler : Scheduler
518    {
519        private System.Threading.AutoResetEvent wakeup = new System.Threading.AutoResetEvent(false);
520        private bool shutdown = false;
521        private Thread[] threads;
522        public ExecutionEngine executionEngine = null;         
523
524        public WorkspaceManagerScheduler(string name, int amountThreads, ExecutionEngine executionEngine)
525            : base()
526        {
527            threads = new Thread[amountThreads];
528            this.executionEngine = executionEngine;
529
530            for (int i = 0; i < threads.Length;i++ )
531            {
532                threads[i] = new Thread(this.DoScheduling);
533                threads[i].SetApartmentState(ApartmentState.MTA);
534                threads[i].Name = name + "-Thread-" + i;
535
536                switch (this.executionEngine.ThreadPriority)
537                {
538                    case 0:
539                        threads[i].Priority = ThreadPriority.AboveNormal;
540                        break;
541                    case 1:
542                        threads[i].Priority = ThreadPriority.BelowNormal;
543                        break;
544                    case 2:
545                        threads[i].Priority = ThreadPriority.Highest;
546                        break;
547                    case 3:
548                        threads[i].Priority = ThreadPriority.Lowest;
549                        break;
550                    case 4:
551                        threads[i].Priority = ThreadPriority.Normal;
552                        break;
553                }
554            }
555           
556        }
557
558        public void startScheduling()
559        {
560            foreach (Thread thread in threads)
561            {
562                thread.Start();
563            }
564        }
565       
566        private void DoScheduling()
567        {           
568
569            this.executionEngine.GuiLogMessage(Thread.CurrentThread.Name + " up and running", NotificationLevel.Debug);
570            Queue<ProtocolBase> waitingProtocols = this.waitingProtocols;
571            int i = 0;
572
573            // Loop forever
574            while (true)
575            {               
576                this.wakeup.WaitOne();
577
578                // Loop while there are more protocols waiting
579                while (true)
580                {
581                    // Should the scheduler stop?
582                    if (this.shutdown)
583                    {
584                        this.waitingProtocols.Clear();
585                        this.protocols.Clear();
586                        this.executionEngine.GuiLogMessage(Thread.CurrentThread.Name + " terminated", NotificationLevel.Debug);
587                        return;
588                    }
589
590                    try
591                    {
592                        ProtocolBase protocol = null;
593                        lock (this)
594                        {
595                            if (waitingProtocols.Count == 0)
596                                break;
597                            protocol = waitingProtocols.Dequeue();
598                        }
599                       
600                        ProtocolStatus status = protocol.Run();
601
602                        lock (this)
603                        {
604                            switch (status)
605                            {
606                                case ProtocolStatus.Created:
607                                    System.Diagnostics.Debug.Assert(false);
608                                    break;
609                                case ProtocolStatus.Ready:
610                                    waitingProtocols.Enqueue(protocol);
611                                    break;
612                                case ProtocolStatus.Waiting:
613                                    break;
614                                case ProtocolStatus.Terminated:
615                                    System.Diagnostics.Debug.Assert(!this.waitingProtocols.Contains(protocol));
616                                    this.RemoveProtocol(protocol);
617                                    break;
618                            }
619                        }
620                    }
621                    catch (Exception ex)
622                    {
623                        System.Diagnostics.Debug.Fail("Error during scheduling: " + ex.Message + " - " + ex.InnerException);
624                    }
625                }
626            }
627        }
628        /// <summary>
629        /// Removes a protocol from the internal queue
630        /// </summary>
631        /// <param name="protocol"></param>
632        public override void RemoveProtocol(ProtocolBase protocol)
633        {
634            lock (this)
635            {
636                this.protocols.Remove(protocol);
637                if (this.protocols.Count == 0)
638                    this.Shutdown();
639            }
640        }
641
642        /// <summary>
643        /// Adds a protocol to the internal queue
644        /// </summary>
645        /// <param name="protocol"></param>
646        public override void AddProtocol(ProtocolBase protocol)
647        {
648            lock (this)
649            {
650                this.protocols.Add(protocol);
651            }
652        }
653
654        /// <summary>
655        /// Wakeup this scheduler
656        /// </summary>
657        /// <param name="protocol"></param>
658        public override void Wakeup(ProtocolBase protocol)
659        {
660            lock (this)
661            {
662                if (!this.waitingProtocols.Contains(protocol))
663                    this.waitingProtocols.Enqueue(protocol);
664                this.wakeup.Set();
665            }
666        }
667
668        /// <summary>
669        /// Terminates the scheduler
670        /// </summary>
671        public override void Shutdown()
672        {
673            this.shutdown = true;
674            this.wakeup.Set();
675        }       
676    }
677}
Note: See TracBrowser for help on using the repository browser.