source: trunk/CrypPlugins/WorkspaceManager/Execution/ExecutionEngine.cs @ 1809

Last change on this file since 1809 was 1809, checked in by kopal, 11 years ago
  • Editor now disables asynchronous PropertyChanged in EventHelpers at startup of ExecutionEngine and enables it at termination
  • now Connector PropertyChanged is used to propagate output data (so next plugins may be executed even if plugin before is running)
File size: 23.0 KB
Line 
1/*                             
2   Copyright 2010 Nils Kopal, Viktor M.
3
4   Licensed under the Apache License, Version 2.0 (the "License");
5   you may not use this file except in compliance with the License.
6   You may obtain a copy of the License at
7
8       http://www.apache.org/licenses/LICENSE-2.0
9
10   Unless required by applicable law or agreed to in writing, software
11   distributed under the License is distributed on an "AS IS" BASIS,
12   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   See the License for the specific language governing permissions and
14   limitations under the License.
15*/
16
17using System;
18using System.Collections.Generic;
19using System.Linq;
20using System.Text;
21
22using WorkspaceManager.Model;
23using System.Threading;
24using System.Collections;
25using Cryptool.PluginBase;
26using System.Reflection;
27using Gears4Net;
28using System.Windows.Threading;
29using System.Runtime.Remoting.Contexts;
30
31namespace WorkspaceManager.Execution
32{
33    /// <summary>
34    /// Engine to execute a model of the WorkspaceManager
35    /// This class needs a WorkspaceManager to be instantiated
36    /// To run an execution process it also needs a WorkspaceModel
37    ///
38    /// This class uses Gears4Net to execute the plugins
39    /// </summary>
40    public class ExecutionEngine
41    {
42        private WorkspaceManager WorkspaceManagerEditor;
43        private Scheduler[] schedulers;
44        private WorkspaceModel workspaceModel;
45
46        public long ExecutedPluginsCounter { get; set; }
47        public bool BenchmarkPlugins { get; set; }
48        public long GuiUpdateInterval { get; set; }
49        public int SleepTime { get; set; }
50
51        /// <summary>
52        /// Creates a new ExecutionEngine
53        /// </summary>
54        /// <param name="workspaceManagerEditor"></param>
55        public ExecutionEngine(WorkspaceManager workspaceManagerEditor)
56        {
57            WorkspaceManagerEditor = workspaceManagerEditor;
58        }
59
60        /// <summary>
61        /// Is this ExecutionEngine running?
62        /// </summary>
63        public bool IsRunning
64        {
65            get;
66            private set;
67        }
68
69        /// <summary>
70        /// Execute the given Model
71        /// </summary>
72        /// <param name="workspaceModel"></param>
73        public void Execute(WorkspaceModel workspaceModel)
74        {
75            if (!IsRunning)
76            {
77                IsRunning = true;
78                this.workspaceModel = workspaceModel;
79                int amountSchedulers = System.Environment.ProcessorCount * 2;
80
81                //Here we create n = "ProcessorsCount * 2" Gears4Net schedulers
82                //We do this, because measurements showed that we get the best performance if we
83                //use this amount of schedulers
84                schedulers = new Scheduler[amountSchedulers];
85                for (int i = 0; i < amountSchedulers; i++)
86                {
87                    schedulers[i] = new WorkspaceManagerScheduler("Scheduler" + i);                   
88                }
89               
90                //We have to reset all states of PluginModels, ConnectorModels and ConnectionModels:
91                workspaceModel.resetStates();
92
93                //The UpdateGuiProtocol is a kind of "daemon" which will update the view elements if necessary
94                UpdateGuiProtocol updateGuiProtocol = new UpdateGuiProtocol(schedulers[0], workspaceModel, this);
95                schedulers[0].AddProtocol(updateGuiProtocol);
96                updateGuiProtocol.Start();
97
98                //The BenchmarkProtocl counts the amount of executed plugins per seconds and writes this to debug
99                if (this.BenchmarkPlugins)
100                {
101                    BenchmarkProtocol benchmarkProtocol = new BenchmarkProtocol(schedulers[0], this.workspaceModel, this);
102                    schedulers[0].AddProtocol(benchmarkProtocol);
103                    benchmarkProtocol.Start();
104                }
105
106                //Here we create for each PluginModel an own PluginProtocol
107                //By using round-robin we give each protocol to another scheduler to gain
108                //a good average load balancing of the schedulers
109                //we also initalize each plugin
110                int counter=0;
111                foreach (PluginModel pluginModel in workspaceModel.AllPluginModels)
112                {
113                    pluginModel.Plugin.PreExecution();
114                    PluginProtocol pluginProtocol = new PluginProtocol(schedulers[counter], pluginModel,this);
115                    pluginModel.PluginProtocol = pluginProtocol;
116                    schedulers[counter].AddProtocol(pluginProtocol);
117                   
118                    pluginProtocol.Start();
119                    counter = (counter + 1) % (amountSchedulers);
120
121                    if (pluginModel.Startable)
122                    {
123                        MessageExecution msg = new MessageExecution();
124                        msg.PluginModel = pluginModel;
125                        pluginProtocol.BroadcastMessageReliably(msg);
126                    }
127                }
128
129                foreach (Scheduler scheduler in schedulers)
130                {
131                    ((WorkspaceManagerScheduler)scheduler).startScheduler();
132                }
133            }
134        }     
135     
136        /// <summary>
137        /// Stop the execution process:
138        /// calls shutdown on all schedulers + calls stop() on each plugin
139        /// </summary>
140        public void Stop()
141        {
142            //First stop alle plugins
143            foreach (PluginModel pluginModel in workspaceModel.AllPluginModels)
144            {
145                pluginModel.Plugin.Stop();
146                pluginModel.Plugin.PostExecution();
147            }           
148
149            IsRunning = false;
150            //Secondly stop all Gears4Net Schedulers
151            foreach (Scheduler scheduler in schedulers)
152            {
153                scheduler.Shutdown();
154            }
155             
156        }
157
158        /// <summary>
159        /// Pause the execution
160        /// </summary>
161        public void Pause()
162        {
163            //not implemented yet
164        }
165
166        /// <summary>
167        /// Use the logger of the WorkspaceManagerEditor
168        /// </summary>
169        /// <param name="message"></param>
170        /// <param name="level"></param>
171        public void GuiLogMessage(string message, NotificationLevel level)
172        {           
173            WorkspaceManagerEditor.GuiLogMessage(message, level);
174        }           
175    }
176 
177    /// <summary>
178    /// Message send to scheduler for a Plugin to trigger the Execution
179    /// </summary>
180    public class MessageExecution : MessageBase
181    {
182        public PluginModel PluginModel;
183    }
184
185    /// <summary>
186    /// A Protocol for updating the GUI in time intervals
187    /// </summary>
188    public class UpdateGuiProtocol : ProtocolBase
189    {
190        private WorkspaceModel workspaceModel;
191        private ExecutionEngine executionEngine;     
192
193        /// <summary>
194        /// Create a new protocol. Each protocol requires a scheduler which provides
195        /// a thread for execution.
196        /// </summary>
197        /// <param name="scheduler"></param>
198        public UpdateGuiProtocol(Scheduler scheduler, WorkspaceModel workspaceModel, ExecutionEngine executionEngine)
199            : base(scheduler)
200        {
201            this.workspaceModel = workspaceModel;
202            this.executionEngine = executionEngine;           
203        }
204
205        /// <summary>
206        /// The main function of the protocol
207        /// </summary>
208        /// <param name="stateMachine"></param>
209        /// <returns></returns>
210        public override System.Collections.Generic.IEnumerator<ReceiverBase> Execute(AbstractStateMachine stateMachine)
211        {
212            while (this.executionEngine.IsRunning)
213            {
214                yield return Timeout(this.executionEngine.GuiUpdateInterval, HandleUpdateGui);
215            }
216        }
217
218        /// <summary>
219        /// Handler function for a message.
220        /// This handler must not block, because it executes inside the thread of the scheduler.
221        /// </summary>
222        /// <param name="msg"></param>
223        private void HandleUpdateGui()
224        {
225            //Get the gui Thread
226            this.workspaceModel.WorkspaceManagerEditor.Presentation.Dispatcher.Invoke(DispatcherPriority.Normal, (SendOrPostCallback)delegate
227            {
228                foreach (PluginModel pluginModel in workspaceModel.AllPluginModels)
229                {
230                    if (pluginModel.GuiNeedsUpdate)
231                    {
232                        pluginModel.GuiNeedsUpdate = false;
233                        pluginModel.paint();
234                        if (pluginModel.UpdateableView != null)
235                        {
236                            pluginModel.UpdateableView.update();
237                        }
238                    }
239                }
240                foreach (ConnectionModel connectionModel in workspaceModel.AllConnectionModels)
241                {
242                    if (connectionModel.GuiNeedsUpdate)
243                    {
244                        if (connectionModel.UpdateableView != null)
245                        {
246                            connectionModel.UpdateableView.update();
247                        }
248                    }
249                }
250            }
251            , null);
252        }
253    }
254   
255    /// <summary>
256    /// A Protocol for benchmarking
257    /// </summary>
258    public class BenchmarkProtocol : ProtocolBase
259    {
260        private WorkspaceModel workspaceModel;
261        private ExecutionEngine executionEngine;
262
263        /// <summary>
264        /// Create a new protocol. Each protocol requires a scheduler which provides
265        /// a thread for execution.
266        /// </summary>
267        /// <param name="scheduler"></param>
268        public BenchmarkProtocol(Scheduler scheduler, WorkspaceModel workspaceModel, ExecutionEngine executionEngine)
269            : base(scheduler)
270        {
271            this.workspaceModel = workspaceModel;
272            this.executionEngine = executionEngine;
273        }
274
275        /// <summary>
276        /// The main function of the protocol
277        /// </summary>
278        /// <param name="stateMachine"></param>
279        /// <returns></returns>
280        public override System.Collections.Generic.IEnumerator<ReceiverBase> Execute(AbstractStateMachine stateMachine)
281        {
282            while (this.executionEngine.IsRunning)
283            {
284                yield return Timeout(1000, HandleBenchmark);
285            }
286        }
287
288        /// <summary>
289        /// Handler function for a message.
290        /// This handler must not block, because it executes inside the thread of the scheduler.
291        /// </summary>
292        /// <param name="msg"></param>
293        private void HandleBenchmark()
294        {
295            this.workspaceModel.WorkspaceManagerEditor.GuiLogMessage("Executing at about " + this.executionEngine.ExecutedPluginsCounter + " Plugins/s", NotificationLevel.Debug);
296            this.executionEngine.ExecutedPluginsCounter = 0;
297        }
298
299    }
300
301    /// <summary>
302    /// A Protocol for a PluginModel
303    /// </summary>
304    public class PluginProtocol : ProtocolBase
305    {       
306        public PluginModel PluginModel;
307        private ExecutionEngine executionEngine;
308
309        /// <summary>
310        /// Create a new protocol. Each protocol requires a scheduler which provides
311        /// a thread for execution.
312        /// </summary>
313        /// <param name="scheduler"></param>
314        public PluginProtocol(Scheduler scheduler, PluginModel pluginModel,ExecutionEngine executionEngine)
315            : base(scheduler)
316        {
317            this.PluginModel = pluginModel;
318            this.executionEngine = executionEngine;
319        }
320
321        /// <summary>
322        /// The main function of the protocol     
323        /// </summary>
324        /// <param name="stateMachine"></param>
325        /// <returns></returns>
326        public override System.Collections.Generic.IEnumerator<ReceiverBase> Execute(AbstractStateMachine stateMachine)
327        {
328            while (this.executionEngine.IsRunning)
329            {
330                yield return Receive<MessageExecution>(null, HandleExecute);
331            }
332        }
333
334        /// <summary>
335        /// Handle an execution of a plugin
336        /// </summary>
337        /// <param name="msg"></param>
338        private void HandleExecute(MessageExecution msg)
339        {
340            // 1. Check if Plugin may Execute
341            if (!mayExecute(PluginModel))
342            {
343                return;
344            }
345
346            //2. Fill all Inputs of the plugin, if this fails, stop executing the plugin
347            if (!fillInputs())
348            {
349                return;
350            }
351
352            //3. Execute the Plugin -> call the IPlugin.Execute()
353            try
354            {
355                PluginModel.Plugin.Execute();
356            }
357            catch (Exception ex)
358            {
359                this.PluginModel.WorkspaceModel.WorkspaceManagerEditor.GuiLogMessage("An error occured while executing  \"" + PluginModel.Name + "\": " + ex.Message, NotificationLevel.Error);
360                this.PluginModel.State = PluginModelState.Error;
361                this.PluginModel.GuiNeedsUpdate = true;
362                return;
363            }
364
365            //4. Count for the benchmark
366            if (this.executionEngine.BenchmarkPlugins)
367            {
368                this.executionEngine.ExecutedPluginsCounter++;
369            }
370
371            //5. If the user wants to, sleep some time
372            if (this.executionEngine.SleepTime > 0)
373            {
374                Thread.Sleep(this.executionEngine.SleepTime);
375            }
376
377            //6. Clear all used inputs
378            clearInputs();
379
380            //7. Send execute messages to possible executable next plugins
381            runNextPlugins();
382        }
383
384        /// <summary>
385        /// Send execute messages to possible executable next plugins
386        /// </summary>
387        private void runNextPlugins()
388        {           
389            foreach (ConnectorModel connectorModel in PluginModel.InputConnectors)
390            {
391                foreach (ConnectionModel connectionModel in connectorModel.InputConnections)
392                {
393                    if (!connectionModel.From.PluginModel.Startable ||
394                        (connectionModel.From.PluginModel.Startable && connectionModel.From.PluginModel.RepeatStart))
395                    {
396                        if (mayExecute(connectionModel.From.PluginModel))
397                        {
398                            MessageExecution message_exec = new MessageExecution();
399                            message_exec.PluginModel = connectionModel.From.PluginModel;
400                            connectionModel.From.PluginModel.PluginProtocol.BroadcastMessageReliably(message_exec);
401                        }
402                    }
403                }
404            }
405        }
406     
407        /// <summary>
408        /// Delete all input data of inputs of the plugin
409        /// </summary>
410        private void clearInputs()
411        {
412            foreach (ConnectorModel connectorModel in PluginModel.InputConnectors)
413            {
414                if (connectorModel.HasData)
415                {
416                    connectorModel.Data = null;
417                    connectorModel.HasData = false;
418                    foreach (ConnectionModel connectionModel in connectorModel.InputConnections)
419                    {
420                        connectionModel.Active = false;
421                        connectorModel.GuiNeedsUpdate = true;
422                    }
423                }
424            }
425        }
426
427        /// <summary>
428        /// Fill all inputs of the plugin
429        /// </summary>
430        /// <returns></returns>
431        private bool fillInputs()
432        {
433            //Fill the plugins inputs with data
434            foreach (ConnectorModel connectorModel in PluginModel.InputConnectors)
435            {
436                try
437                {
438                    if (connectorModel.HasData && connectorModel.Data.value != null)
439                    {
440                        if (connectorModel.IsDynamic)
441                        {
442                            MethodInfo propertyInfo = PluginModel.Plugin.GetType().GetMethod(connectorModel.DynamicSetterName);
443                            propertyInfo.Invoke(PluginModel.Plugin, new object[] { connectorModel.PropertyName, connectorModel.Data.value });
444                        }
445                        else
446                        {
447                            PropertyInfo propertyInfo = PluginModel.Plugin.GetType().GetProperty(connectorModel.PropertyName);
448                            propertyInfo.SetValue(PluginModel.Plugin, connectorModel.Data.value, null);
449                        }
450                    }
451                }
452                catch (Exception ex)
453                {
454                    this.PluginModel.WorkspaceModel.WorkspaceManagerEditor.GuiLogMessage("An error occured while setting value of connector \"" + connectorModel.Name + "\" of \"" + PluginModel.Name + "\": " + ex.Message, NotificationLevel.Error);
455                    this.PluginModel.State = PluginModelState.Error;
456                    this.PluginModel.GuiNeedsUpdate = true;
457                    return false;
458                }
459            }
460            return true;
461        }
462
463        /// <summary>
464        /// Check if the PluginModel may execute
465        /// </summary>
466        /// <param name="pluginModel"></param>
467        /// <returns></returns>
468        private bool mayExecute(PluginModel pluginModel)
469        {
470            if (!pluginModel.WorkspaceModel.WorkspaceManagerEditor.isExecuting())
471            {
472                return false;
473            }
474
475            //Check if all necessary inputs are set
476            foreach (ConnectorModel connectorModel in pluginModel.InputConnectors)
477            {
478                if (!connectorModel.IControl &&
479                    (connectorModel.IsMandatory || connectorModel.InputConnections.Count > 0) && (!connectorModel.HasData ||
480                    connectorModel.Data == null))
481                {
482                    return false;
483                }
484            }
485
486            //Check if all outputs are free
487            foreach (ConnectorModel connectorModel in pluginModel.OutputConnectors)
488            {
489                if (!connectorModel.IControl)
490                {
491                    foreach (ConnectionModel connectionModel in connectorModel.OutputConnections)
492                    {
493                        if (connectionModel.To.HasData)
494                        {
495                            return false;
496                        }
497                    }
498                }
499            }
500            return true;
501        }
502    }
503
504    /// <summary>
505    /// Gears4Net Scheduler
506    /// </summary>
507    public class WorkspaceManagerScheduler : Scheduler
508    {
509        private System.Threading.AutoResetEvent wakeup = new System.Threading.AutoResetEvent(false);
510        private bool shutdown = false;
511        private System.Threading.Thread thread;
512        private Context currentContext;
513
514                public WorkspaceManagerScheduler() : this(String.Empty)
515                {
516
517                }
518
519        public WorkspaceManagerScheduler(string name)
520            : base()
521        {
522            this.currentContext = Thread.CurrentContext;
523
524            thread = new System.Threading.Thread(this.Start);
525            thread.SetApartmentState(System.Threading.ApartmentState.MTA);
526                        thread.Name = name;
527           
528        }
529
530        public void startScheduler()
531        {
532            thread.Start();
533        }
534
535        private void Start()
536        {
537            if (this.currentContext != Thread.CurrentContext)
538                this.currentContext.DoCallBack(Start);
539
540            // Loop forever
541            while (true)
542            {
543                this.wakeup.WaitOne();
544
545                // Loop while there are more protocols waiting
546                while (true)
547                {
548                    // Should the scheduler stop?
549                    if (this.shutdown)
550                        return;
551                   
552                    ProtocolBase protocol = null;
553                    lock (this)
554                    {
555                        // No more protocols? -> Wait
556                        if (this.waitingProtocols.Count == 0)
557                            break;
558                    }
559
560                    protocol = this.waitingProtocols.Dequeue();
561                    ProtocolStatus status = protocol.Run();
562
563                    lock (this)
564                    {
565                        switch (status)
566                        {
567                            case ProtocolStatus.Created:
568                                System.Diagnostics.Debug.Assert(false);
569                                break;
570                            case ProtocolStatus.Ready:
571                                this.waitingProtocols.Enqueue(protocol);
572                                break;
573                            case ProtocolStatus.Waiting:
574                                break;
575                            case ProtocolStatus.Terminated:
576                                System.Diagnostics.Debug.Assert(!this.waitingProtocols.Contains(protocol));
577                                this.RemoveProtocol(protocol);
578                                break;
579                        }
580                    }
581                   
582                }
583            }
584        }
585
586        /// <summary>
587        /// Removes a protocol from the internal queue
588        /// </summary>
589        /// <param name="protocol"></param>
590        public override void RemoveProtocol(ProtocolBase protocol)
591        {
592            lock (this)
593            {
594                this.protocols.Remove(protocol);
595                if (this.protocols.Count == 0)
596                    this.Shutdown();
597            }
598        }
599
600        /// <summary>
601        /// Adds a protocol to the internal queue
602        /// </summary>
603        /// <param name="protocol"></param>
604        public override void AddProtocol(ProtocolBase protocol)
605        {
606            lock (this)
607            {
608                this.protocols.Add(protocol);
609            }
610        }
611
612        /// <summary>
613        /// Wakeup this scheduler
614        /// </summary>
615        /// <param name="protocol"></param>
616        public override void Wakeup(ProtocolBase protocol)
617        {
618            lock (this)
619            {
620                if (!this.waitingProtocols.Contains(protocol))
621                    this.waitingProtocols.Enqueue(protocol);
622                this.wakeup.Set();
623            }
624        }
625
626        /// <summary>
627        /// Terminates the scheduler
628        /// </summary>
629        public override void Shutdown()
630        {
631            this.shutdown = true;
632            this.wakeup.Set();
633        }
634    }
635}
Note: See TracBrowser for help on using the repository browser.