source: trunk/CrypPlugins/WorkspaceManager/Execution/ExecutionEngine.cs @ 1876

Last change on this file since 1876 was 1876, checked in by kopal, 11 years ago

some ExecutionEngine optimizations

File size: 26.1 KB
Line 
1/*                             
2   Copyright 2010 Nils Kopal, Viktor M.
3
4   Licensed under the Apache License, Version 2.0 (the "License");
5   you may not use this file except in compliance with the License.
6   You may obtain a copy of the License at
7
8       http://www.apache.org/licenses/LICENSE-2.0
9
10   Unless required by applicable law or agreed to in writing, software
11   distributed under the License is distributed on an "AS IS" BASIS,
12   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   See the License for the specific language governing permissions and
14   limitations under the License.
15*/
16
17using System;
18using System.Collections.Generic;
19using System.Linq;
20using System.Text;
21
22using WorkspaceManager.Model;
23using System.Threading;
24using System.Collections;
25using Cryptool.PluginBase;
26using System.Reflection;
27using Gears4Net;
28using System.Windows.Threading;
29using System.Runtime.Remoting.Contexts;
30
31namespace WorkspaceManager.Execution
32{
33    /// <summary>
34    /// Engine to execute a model of the WorkspaceManager
35    /// This class needs a WorkspaceManager to be instantiated
36    /// To run an execution process it also needs a WorkspaceModel
37    ///
38    /// This class uses Gears4Net to execute the plugins
39    /// </summary>
40    public class ExecutionEngine
41    {
42        private WorkspaceManager WorkspaceManagerEditor;
43        private Scheduler[] schedulers;
44        private WorkspaceModel workspaceModel;
45        private volatile bool isRunning = false;
46
47        public long ExecutedPluginsCounter { get; set; }
48        public bool BenchmarkPlugins { get; set; }
49        public long GuiUpdateInterval { get; set; }
50        public int SleepTime { get; set; }
51
52        /// <summary>
53        /// Creates a new ExecutionEngine
54        /// </summary>
55        /// <param name="workspaceManagerEditor"></param>
56        public ExecutionEngine(WorkspaceManager workspaceManagerEditor)
57        {
58            WorkspaceManagerEditor = workspaceManagerEditor;
59        }
60
61        /// <summary>
62        /// Is this ExecutionEngine running?
63        /// </summary>
64        public bool IsRunning
65        {
66            get{return this.isRunning;}
67            private set{this.isRunning = value;}
68        }
69
70        /// <summary>
71        /// Execute the given Model
72        /// </summary>
73        /// <param name="workspaceModel"></param>
74        public void Execute(WorkspaceModel workspaceModel, int amountSchedulers)
75        {
76            if (!IsRunning)
77            {
78                IsRunning = true;
79                this.workspaceModel = workspaceModel;
80
81                if (amountSchedulers <= 0)
82                {
83                    amountSchedulers = 1;
84                }
85
86                //Here we create n = "ProcessorsCount * 2" Gears4Net schedulers
87                //We do this, because measurements showed that we get the best performance if we
88                //use this amount of schedulers
89                schedulers = new Scheduler[amountSchedulers];
90                for (int i = 0; i < amountSchedulers; i++)
91                {
92                    schedulers[i] = new WorkspaceManagerScheduler("WorkspaceManagerScheduler-" + i);
93                    ((WorkspaceManagerScheduler)schedulers[i]).executionEngine = this;
94                }
95               
96                //We have to reset all states of PluginModels, ConnectorModels and ConnectionModels:
97                workspaceModel.resetStates();
98
99                //The UpdateGuiProtocol is a kind of "daemon" which will update the view elements if necessary
100                UpdateGuiProtocol updateGuiProtocol = new UpdateGuiProtocol(schedulers[0], workspaceModel, this);
101                schedulers[0].AddProtocol(updateGuiProtocol);
102                updateGuiProtocol.Start();
103
104                //The BenchmarkProtocl counts the amount of executed plugins per seconds and writes this to debug
105                if (this.BenchmarkPlugins)
106                {
107                    BenchmarkProtocol benchmarkProtocol = new BenchmarkProtocol(schedulers[0], this.workspaceModel, this);
108                    schedulers[0].AddProtocol(benchmarkProtocol);
109                    benchmarkProtocol.Start();
110                }
111
112                //Here we create for each PluginModel an own PluginProtocol
113                //By using round-robin we give each protocol to another scheduler to gain
114                //a good average load balancing of the schedulers
115                //we also initalize each plugin
116                //It is possible that a plugin is also a PluginProtocol
117                //if that is true we do not create a new one but use the plugin instead the created one
118                int counter=0;
119                foreach (PluginModel pluginModel in workspaceModel.AllPluginModels)
120                {
121                    PluginProtocol pluginProtocol = null;
122
123                    if (pluginModel.Plugin is PluginProtocol)
124                    {
125                        pluginProtocol = (PluginProtocol)pluginModel.Plugin;
126                        pluginProtocol.setExecutionEngineSettings(schedulers[counter], pluginModel, this);
127                    }
128                    else
129                    {
130                        pluginProtocol = new PluginProtocol(schedulers[counter], pluginModel, this);
131                    }
132
133                    pluginModel.Plugin.PreExecution();                   
134                    pluginModel.PluginProtocol = pluginProtocol;
135                    schedulers[counter].AddProtocol(pluginProtocol);
136
137                    if (pluginProtocol.Status == ProtocolStatus.Created || pluginProtocol.Status == ProtocolStatus.Terminated)
138                    {
139                        pluginProtocol.Start();
140                    }
141
142                    counter = (counter + 1) % (amountSchedulers);
143
144                    if (pluginModel.Startable)
145                    {
146                        MessageExecution msg = new MessageExecution();
147                        msg.PluginModel = pluginModel;
148                        pluginProtocol.BroadcastMessageReliably(msg);
149                    }
150                }
151
152                foreach (Scheduler scheduler in schedulers)
153                {
154                    ((WorkspaceManagerScheduler)scheduler).startScheduler();
155                }
156            }
157        }     
158     
159        /// <summary>
160        /// Stop the execution process:
161        /// calls shutdown on all schedulers + calls stop() on each plugin
162        /// </summary>
163        public void Stop()
164        {
165            //First stop alle plugins
166            foreach (PluginModel pluginModel in workspaceModel.AllPluginModels)
167            {
168                pluginModel.Plugin.Stop();
169                pluginModel.Plugin.PostExecution();
170            }           
171
172            IsRunning = false;
173            //Secondly stop all Gears4Net Schedulers
174            foreach (Scheduler scheduler in schedulers)
175            {
176                scheduler.Shutdown();
177            }
178             
179        }
180
181        /// <summary>
182        /// Pause the execution
183        /// </summary>
184        public void Pause()
185        {
186            //not implemented yet
187        }
188
189        /// <summary>
190        /// Use the logger of the WorkspaceManagerEditor
191        /// </summary>
192        /// <param name="message"></param>
193        /// <param name="level"></param>
194        public void GuiLogMessage(string message, NotificationLevel level)
195        {           
196            WorkspaceManagerEditor.GuiLogMessage(message, level);
197        }           
198    }
199 
200    /// <summary>
201    /// Message send to scheduler for a Plugin to trigger the Execution
202    /// </summary>
203    public class MessageExecution : MessageBase
204    {
205        public PluginModel PluginModel;
206    }
207
208    /// <summary>
209    /// A Protocol for updating the GUI in time intervals
210    /// </summary>
211    public class UpdateGuiProtocol : ProtocolBase
212    {
213        private WorkspaceModel workspaceModel;
214        private ExecutionEngine executionEngine;     
215
216        /// <summary>
217        /// Create a new protocol. Each protocol requires a scheduler which provides
218        /// a thread for execution.
219        /// </summary>
220        /// <param name="scheduler"></param>
221        public UpdateGuiProtocol(Scheduler scheduler, WorkspaceModel workspaceModel, ExecutionEngine executionEngine)
222            : base(scheduler)
223        {
224            this.workspaceModel = workspaceModel;
225            this.executionEngine = executionEngine;           
226        }
227
228        /// <summary>
229        /// The main function of the protocol
230        /// </summary>
231        /// <param name="stateMachine"></param>
232        /// <returns></returns>
233        public override System.Collections.Generic.IEnumerator<ReceiverBase> Execute(AbstractStateMachine stateMachine)
234        {
235            while (this.executionEngine.IsRunning)
236            {
237                yield return Timeout(this.executionEngine.GuiUpdateInterval, HandleUpdateGui);
238            }
239        }
240
241        /// <summary>
242        /// Handler function for a message.
243        /// This handler must not block, because it executes inside the thread of the scheduler.
244        /// </summary>
245        /// <param name="msg"></param>
246        private void HandleUpdateGui()
247        {
248            //Get the gui Thread
249            this.workspaceModel.WorkspaceManagerEditor.Presentation.Dispatcher.Invoke(DispatcherPriority.Normal, (SendOrPostCallback)delegate
250            {
251                foreach (PluginModel pluginModel in workspaceModel.AllPluginModels)
252                {
253                    if (pluginModel.GuiNeedsUpdate)
254                    {
255                        pluginModel.GuiNeedsUpdate = false;
256                        pluginModel.paint();
257                        if (pluginModel.UpdateableView != null)
258                        {
259                            pluginModel.UpdateableView.update();
260                        }
261                    }
262                }
263                foreach (ConnectionModel connectionModel in workspaceModel.AllConnectionModels)
264                {
265                    if (connectionModel.GuiNeedsUpdate)
266                    {
267                        if (connectionModel.UpdateableView != null)
268                        {
269                            connectionModel.UpdateableView.update();
270                        }
271                    }
272                }
273            }
274            , null);
275        }
276    }
277   
278    /// <summary>
279    /// A Protocol for benchmarking
280    /// </summary>
281    public class BenchmarkProtocol : ProtocolBase
282    {
283        private WorkspaceModel workspaceModel;
284        private ExecutionEngine executionEngine;
285
286        /// <summary>
287        /// Create a new protocol. Each protocol requires a scheduler which provides
288        /// a thread for execution.
289        /// </summary>
290        /// <param name="scheduler"></param>
291        public BenchmarkProtocol(Scheduler scheduler, WorkspaceModel workspaceModel, ExecutionEngine executionEngine)
292            : base(scheduler)
293        {
294            this.workspaceModel = workspaceModel;
295            this.executionEngine = executionEngine;
296        }
297
298        /// <summary>
299        /// The main function of the protocol
300        /// </summary>
301        /// <param name="stateMachine"></param>
302        /// <returns></returns>
303        public override System.Collections.Generic.IEnumerator<ReceiverBase> Execute(AbstractStateMachine stateMachine)
304        {
305            while (this.executionEngine.IsRunning)
306            {
307                yield return Timeout(1000, HandleBenchmark);
308            }
309        }
310
311        /// <summary>
312        /// Handler function for a message.
313        /// This handler must not block, because it executes inside the thread of the scheduler.
314        /// </summary>
315        /// <param name="msg"></param>
316        private void HandleBenchmark()
317        {
318            this.workspaceModel.WorkspaceManagerEditor.GuiLogMessage("Executing at about " + this.executionEngine.ExecutedPluginsCounter + " Plugins/s", NotificationLevel.Debug);
319            this.executionEngine.ExecutedPluginsCounter = 0;
320        }
321
322    }
323
324    /// <summary>
325    /// A Protocol for a PluginModel
326    /// </summary>
327    public class PluginProtocol : ProtocolBase
328    {       
329        public PluginModel PluginModel;
330        private ExecutionEngine executionEngine;
331
332        /// <summary>
333        /// Create a new protocol. Each protocol requires a scheduler which provides
334        /// a thread for execution.
335        /// </summary>
336        /// <param name="scheduler"></param>
337        public PluginProtocol(Scheduler scheduler, PluginModel pluginModel,ExecutionEngine executionEngine)
338            : base(scheduler)
339        {
340            this.PluginModel = pluginModel;
341            this.executionEngine = executionEngine;
342        }
343
344        /// <summary>
345        ///
346        /// </summary>
347        /// <param name="scheduler"></param>
348        /// <param name="pluginModel"></param>
349        /// <param name="executionEngine"></param>
350        public void setExecutionEngineSettings(Scheduler scheduler, PluginModel pluginModel, ExecutionEngine executionEngine)
351        {
352            this.Scheduler = scheduler;
353            this.PluginModel = pluginModel;
354            this.executionEngine = executionEngine;
355        }
356
357
358        /// <summary>
359        /// The main function of the protocol     
360        /// </summary>
361        /// <param name="stateMachine"></param>
362        /// <returns></returns>
363        public override System.Collections.Generic.IEnumerator<ReceiverBase> Execute(AbstractStateMachine stateMachine)
364        {
365            while (this.executionEngine.IsRunning)
366            {
367                yield return Receive<MessageExecution>(this.Filter, HandleExecute);
368            }
369        }
370
371        /// <summary>
372        /// Filter that checks wether the Plugin fits to the internal Plugin reference of this PluginProtocl
373        /// </summary>
374        /// <param name="msg"></param>
375        /// <returns></returns>
376        private bool Filter(MessageExecution msg)
377        {
378            if (msg.PluginModel != this.PluginModel)
379            {
380                return false;
381            }
382            return true;
383        }
384        /// <summary>
385        /// Handle an execution of a plugin
386        /// </summary>
387        /// <param name="msg"></param>
388        private void HandleExecute(MessageExecution msg)
389        {
390            // 1. Check if Plugin may Execute
391            if (!mayExecute(PluginModel))
392            {
393                return;
394            }
395
396            //2. Fill all Inputs of the plugin, if this fails, stop executing the plugin
397            if (!fillInputs())
398            {
399                return;
400            }
401
402            //3. Execute the Plugin -> call the IPlugin.Execute()
403            try
404            {
405                PluginModel.Plugin.Execute();
406            }
407            catch (Exception ex)
408            {
409                this.PluginModel.WorkspaceModel.WorkspaceManagerEditor.GuiLogMessage("An error occured while executing  \"" + PluginModel.Name + "\": " + ex.Message, NotificationLevel.Error);
410                this.PluginModel.State = PluginModelState.Error;
411                this.PluginModel.GuiNeedsUpdate = true;
412                return;
413            }
414
415            //4. Count for the benchmark
416            if (this.executionEngine.BenchmarkPlugins)
417            {
418                this.executionEngine.ExecutedPluginsCounter++;
419            }
420
421            //5. If the user wants to, sleep some time
422            if (this.executionEngine.SleepTime > 0)
423            {
424                Thread.Sleep(this.executionEngine.SleepTime);
425            }
426
427            //6. Clear all used inputs
428            //clearInputs();
429
430            //7. Send execute messages to possible executable next plugins
431            //runNextPlugins();
432        }
433
434        /// <summary>
435        /// Send execute messages to possible executable next plugins
436        /// </summary>
437        public void runNextPlugins()
438        {           
439            foreach (ConnectorModel connectorModel in PluginModel.InputConnectors)
440            {
441                foreach (ConnectionModel connectionModel in connectorModel.InputConnections)
442                {
443                    if (!connectionModel.From.PluginModel.Startable ||
444                        (connectionModel.From.PluginModel.Startable && connectionModel.From.PluginModel.RepeatStart))
445                    {
446                        if (mayExecute(connectionModel.From.PluginModel))
447                        {
448                            MessageExecution message_exec = new MessageExecution();
449                            message_exec.PluginModel = connectionModel.From.PluginModel;
450                            connectionModel.From.PluginModel.PluginProtocol.BroadcastMessageReliably(message_exec);
451                        }
452                    }
453                }
454            }
455        }
456     
457        /// <summary>
458        /// Delete all input data of inputs of the plugin
459        /// </summary>
460        public void clearInputs()
461        {
462            foreach (ConnectorModel connectorModel in PluginModel.InputConnectors)
463            {
464                if (connectorModel.HasData)
465                {
466                    connectorModel.Data = null;
467                    connectorModel.HasData = false;
468                    connectorModel.GuiNeedsUpdate = true;
469
470                    foreach (ConnectionModel connectionModel in connectorModel.InputConnections)
471                    {
472                        connectionModel.Active = false;
473                        connectorModel.GuiNeedsUpdate = true;
474                    }
475                }
476            }
477        }
478
479        /// <summary>
480        /// Fill all inputs of the plugin
481        /// </summary>
482        /// <returns></returns>
483        public bool fillInputs()
484        {
485            //Fill the plugins inputs with data
486            foreach (ConnectorModel connectorModel in PluginModel.InputConnectors)
487            {
488                try
489                {
490                    if (connectorModel.HasData && connectorModel.Data.value != null)
491                    {
492                        if (connectorModel.IsDynamic)
493                        {
494                            MethodInfo propertyInfo = PluginModel.Plugin.GetType().GetMethod(connectorModel.DynamicSetterName);
495                            propertyInfo.Invoke(PluginModel.Plugin, new object[] { connectorModel.PropertyName, connectorModel.Data.value });
496                        }
497                        else
498                        {
499                            PropertyInfo propertyInfo = PluginModel.Plugin.GetType().GetProperty(connectorModel.PropertyName);
500                            propertyInfo.SetValue(PluginModel.Plugin, connectorModel.Data.value, null);
501                        }
502                    }
503                }
504                catch (Exception ex)
505                {
506                    this.PluginModel.WorkspaceModel.WorkspaceManagerEditor.GuiLogMessage("An error occured while setting value of connector \"" + connectorModel.Name + "\" of \"" + PluginModel.Name + "\": " + ex.Message, NotificationLevel.Error);
507                    this.PluginModel.State = PluginModelState.Error;
508                    this.PluginModel.GuiNeedsUpdate = true;
509                    return false;
510                }
511            }
512            return true;
513        }
514
515        /// <summary>
516        ///
517        /// </summary>
518        /// <returns></returns>
519        public bool mayExecute()
520        {
521            return mayExecute(this.PluginModel);
522        }
523
524        /// <summary>
525        /// Check if the PluginModel may execute
526        /// </summary>
527        /// <param name="pluginModel"></param>
528        /// <returns></returns>
529        private bool mayExecute(PluginModel pluginModel)
530        {
531            if (!pluginModel.WorkspaceModel.WorkspaceManagerEditor.isExecuting())
532            {
533                return false;
534            }
535
536            //Check if all necessary inputs are set
537            foreach (ConnectorModel connectorModel in pluginModel.InputConnectors)
538            {
539                if (!connectorModel.IControl &&
540                    (connectorModel.IsMandatory || connectorModel.InputConnections.Count > 0) && (!connectorModel.HasData ||
541                    connectorModel.Data == null))
542                {
543                    return false;
544                }
545            }
546
547            //Check if all outputs are free
548            foreach (ConnectorModel connectorModel in pluginModel.OutputConnectors)
549            {
550                if (!connectorModel.IControl)
551                {
552                    foreach (ConnectionModel connectionModel in connectorModel.OutputConnections)
553                    {
554                        if (connectionModel.To.HasData)
555                        {
556                            return false;
557                        }
558                    }
559                }
560            }
561            return true;
562        }
563
564        /// <summary>
565        ///
566        /// </summary>
567        public ExecutionEngine ExecutionEngine
568        {
569            get { return this.executionEngine; }           
570        }
571    }
572
573    /// <summary>
574    /// Gears4Net Scheduler
575    /// </summary>
576    public class WorkspaceManagerScheduler : Scheduler
577    {
578        private System.Threading.AutoResetEvent wakeup = new System.Threading.AutoResetEvent(false);
579        private bool shutdown = false;
580        private System.Threading.Thread thread;
581        private Context currentContext;
582        public ExecutionEngine executionEngine = null;
583
584                public WorkspaceManagerScheduler() : this(String.Empty)
585                {
586
587                }
588
589        public WorkspaceManagerScheduler(string name)
590            : base()
591        {
592            this.currentContext = Thread.CurrentContext;
593
594            thread = new System.Threading.Thread(this.Start);
595            thread.SetApartmentState(System.Threading.ApartmentState.MTA);
596                        thread.Name = name;
597           
598        }
599
600        public void startScheduler()
601        {
602            thread.Start();
603        }
604
605        private void Start()
606        {
607            if (this.currentContext != Thread.CurrentContext){
608                this.currentContext.DoCallBack(Start);}
609           
610            this.executionEngine.GuiLogMessage("Scheduler " + this.thread.Name + " up and running", NotificationLevel.Debug);
611
612            // Loop forever
613            while (true)
614            {
615                this.wakeup.WaitOne();
616
617                // Loop while there are more protocols waiting
618                while (true)
619                {
620                    // Should the scheduler stop?
621                    if (this.shutdown)
622                    {
623                        this.executionEngine.GuiLogMessage("Scheduler " + this.thread.Name + " terminated", NotificationLevel.Debug);
624                        return;
625                    }
626                   
627                    ProtocolBase protocol = null;
628                    lock (this)
629                    {
630                        // No more protocols? -> Wait
631                        if (this.waitingProtocols.Count == 0)
632                            break;
633                    }
634
635                    try
636                    {
637                        protocol = this.waitingProtocols.Dequeue();
638                        ProtocolStatus status = protocol.Run();
639
640                        lock (this)
641                        {
642                            switch (status)
643                            {
644                                case ProtocolStatus.Created:
645                                    System.Diagnostics.Debug.Assert(false);
646                                    break;
647                                case ProtocolStatus.Ready:
648                                    this.waitingProtocols.Enqueue(protocol);
649                                    break;
650                                case ProtocolStatus.Waiting:
651                                    break;
652                                case ProtocolStatus.Terminated:
653                                    System.Diagnostics.Debug.Assert(!this.waitingProtocols.Contains(protocol));
654                                    this.RemoveProtocol(protocol);
655                                    break;
656                            }
657                        }
658                    }
659                    catch (Exception ex) 
660                    {
661                        System.Diagnostics.Debug.Fail("Error during scheduling: " + ex.Message + " - " + ex.InnerException);
662                    }
663                }
664            }
665        }
666
667        /// <summary>
668        /// Removes a protocol from the internal queue
669        /// </summary>
670        /// <param name="protocol"></param>
671        public override void RemoveProtocol(ProtocolBase protocol)
672        {
673            lock (this)
674            {
675                this.protocols.Remove(protocol);
676                if (this.protocols.Count == 0)
677                    this.Shutdown();
678            }
679        }
680
681        /// <summary>
682        /// Adds a protocol to the internal queue
683        /// </summary>
684        /// <param name="protocol"></param>
685        public override void AddProtocol(ProtocolBase protocol)
686        {
687            lock (this)
688            {
689                this.protocols.Add(protocol);
690            }
691        }
692
693        /// <summary>
694        /// Wakeup this scheduler
695        /// </summary>
696        /// <param name="protocol"></param>
697        public override void Wakeup(ProtocolBase protocol)
698        {
699            lock (this)
700            {
701                if (!this.waitingProtocols.Contains(protocol))
702                    this.waitingProtocols.Enqueue(protocol);
703                this.wakeup.Set();
704            }
705        }
706
707        /// <summary>
708        /// Terminates the scheduler
709        /// </summary>
710        public override void Shutdown()
711        {
712            this.shutdown = true;
713            this.wakeup.Set();
714        }
715    }
716}
Note: See TracBrowser for help on using the repository browser.