source: trunk/CrypPlugins/WorkspaceManager/Execution/ExecutionEngine.cs @ 1806

Last change on this file since 1806 was 1806, checked in by kopal, 11 years ago

execution exceptions will now be caught

File size: 25.0 KB
Line 
1/*                             
2   Copyright 2010 Nils Kopal, Viktor M.
3
4   Licensed under the Apache License, Version 2.0 (the "License");
5   you may not use this file except in compliance with the License.
6   You may obtain a copy of the License at
7
8       http://www.apache.org/licenses/LICENSE-2.0
9
10   Unless required by applicable law or agreed to in writing, software
11   distributed under the License is distributed on an "AS IS" BASIS,
12   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   See the License for the specific language governing permissions and
14   limitations under the License.
15*/
16
17using System;
18using System.Collections.Generic;
19using System.Linq;
20using System.Text;
21
22using WorkspaceManager.Model;
23using System.Threading;
24using System.Collections;
25using Cryptool.PluginBase;
26using System.Reflection;
27using Gears4Net;
28using System.Windows.Threading;
29using System.Runtime.Remoting.Contexts;
30
31namespace WorkspaceManager.Execution
32{
33    /// <summary>
34    /// Engine to execute a model of the WorkspaceManager
35    /// This class needs a WorkspaceManager to be instantiated
36    /// To run an execution process it also needs a WorkspaceModel
37    ///
38    /// This class uses Gears4Net to execute the plugins
39    /// </summary>
40    public class ExecutionEngine
41    {
42        private WorkspaceManager WorkspaceManagerEditor;
43        private Scheduler[] schedulers;
44        private WorkspaceModel workspaceModel;
45
46        public long ExecutedPluginsCounter { get; set; }
47        public bool BenchmarkPlugins { get; set; }
48        public long GuiUpdateInterval { get; set; }
49        public int SleepTime { get; set; }
50
51        /// <summary>
52        /// Creates a new ExecutionEngine
53        /// </summary>
54        /// <param name="workspaceManagerEditor"></param>
55        public ExecutionEngine(WorkspaceManager workspaceManagerEditor)
56        {
57            WorkspaceManagerEditor = workspaceManagerEditor;
58        }
59
60        /// <summary>
61        /// Is this ExecutionEngine running?
62        /// </summary>
63        public bool IsRunning
64        {
65            get;
66            private set;
67        }
68
69        /// <summary>
70        /// Execute the given Model
71        /// </summary>
72        /// <param name="workspaceModel"></param>
73        public void Execute(WorkspaceModel workspaceModel)
74        {
75            if (!IsRunning)
76            {
77                IsRunning = true;
78                this.workspaceModel = workspaceModel;
79                int amountSchedulers = System.Environment.ProcessorCount * 2;
80
81                //Here we create n = "ProcessorsCount * 2" Gears4Net schedulers
82                //We do this, because measurements showed that we get the best performance if we
83                //use this amount of schedulers
84                schedulers = new Scheduler[amountSchedulers];
85                for (int i = 0; i < amountSchedulers; i++)
86                {
87                    schedulers[i] = new WorkspaceManagerScheduler("Scheduler" + i);                   
88                }
89               
90                //We have to reset all states of PluginModels, ConnectorModels and ConnectionModels:
91                workspaceModel.resetStates();
92
93                //The UpdateGuiProtocol is a kind of "daemon" which will update the view elements if necessary
94                UpdateGuiProtocol updateGuiProtocol = new UpdateGuiProtocol(schedulers[0], workspaceModel, this);
95                schedulers[0].AddProtocol(updateGuiProtocol);
96                updateGuiProtocol.Start();
97
98                //The BenchmarkProtocl counts the amount of executed plugins per seconds and writes this to debug
99                if (this.BenchmarkPlugins)
100                {
101                    BenchmarkProtocol benchmarkProtocol = new BenchmarkProtocol(schedulers[0], this.workspaceModel, this);
102                    schedulers[0].AddProtocol(benchmarkProtocol);
103                    benchmarkProtocol.Start();
104                }
105
106                //Here we create for each PluginModel an own PluginProtocol
107                //By using round-robin we give each protocol to another scheduler to gain
108                //a good average load balancing of the schedulers
109                //we also initalize each plugin
110                int counter=0;
111                foreach (PluginModel pluginModel in workspaceModel.AllPluginModels)
112                {
113                    pluginModel.Plugin.PreExecution();
114                    PluginProtocol pluginProtocol = new PluginProtocol(schedulers[counter], pluginModel,this);
115                    pluginModel.PluginProtocol = pluginProtocol;
116                    schedulers[counter].AddProtocol(pluginProtocol);
117                   
118                    pluginProtocol.Start();
119                    counter = (counter + 1) % (amountSchedulers);
120
121                    if (pluginModel.Startable)
122                    {
123                        MessageExecution msg = new MessageExecution();
124                        msg.PluginModel = pluginModel;
125                        pluginProtocol.BroadcastMessageReliably(msg);
126                    }
127                }
128
129                foreach (Scheduler scheduler in schedulers)
130                {
131                    ((WorkspaceManagerScheduler)scheduler).startScheduler();
132                }
133            }
134        }     
135     
136        /// <summary>
137        /// Stop the execution process:
138        /// calls shutdown on all schedulers + calls stop() on each plugin
139        /// </summary>
140        public void Stop()
141        {
142            //First stop alle plugins
143            foreach (PluginModel pluginModel in workspaceModel.AllPluginModels)
144            {
145                pluginModel.Plugin.Stop();
146                pluginModel.Plugin.PostExecution();
147            }           
148
149            IsRunning = false;
150            //Secondly stop all Gears4Net Schedulers
151            foreach (Scheduler scheduler in schedulers)
152            {
153                scheduler.Shutdown();
154            }
155             
156        }
157
158        /// <summary>
159        /// Pause the execution
160        /// </summary>
161        public void Pause()
162        {
163            //not implemented yet
164        }
165
166        /// <summary>
167        /// Use the logger of the WorkspaceManagerEditor
168        /// </summary>
169        /// <param name="message"></param>
170        /// <param name="level"></param>
171        public void GuiLogMessage(string message, NotificationLevel level)
172        {           
173            WorkspaceManagerEditor.GuiLogMessage(message, level);
174        }           
175    }
176 
177    /// <summary>
178    /// Message send to scheduler for a Plugin to trigger the Execution
179    /// </summary>
180    public class MessageExecution : MessageBase
181    {
182        public PluginModel PluginModel;
183    }
184
185    /// <summary>
186    /// A Protocol for updating the GUI in time intervals
187    /// </summary>
188    public class UpdateGuiProtocol : ProtocolBase
189    {
190        private WorkspaceModel workspaceModel;
191        private ExecutionEngine executionEngine;     
192
193        /// <summary>
194        /// Create a new protocol. Each protocol requires a scheduler which provides
195        /// a thread for execution.
196        /// </summary>
197        /// <param name="scheduler"></param>
198        public UpdateGuiProtocol(Scheduler scheduler, WorkspaceModel workspaceModel, ExecutionEngine executionEngine)
199            : base(scheduler)
200        {
201            this.workspaceModel = workspaceModel;
202            this.executionEngine = executionEngine;           
203        }
204
205        /// <summary>
206        /// The main function of the protocol
207        /// </summary>
208        /// <param name="stateMachine"></param>
209        /// <returns></returns>
210        public override System.Collections.Generic.IEnumerator<ReceiverBase> Execute(AbstractStateMachine stateMachine)
211        {
212            while (this.executionEngine.IsRunning)
213            {
214                yield return Timeout(this.executionEngine.GuiUpdateInterval, HandleUpdateGui);
215            }
216        }
217
218        /// <summary>
219        /// Handler function for a message.
220        /// This handler must not block, because it executes inside the thread of the scheduler.
221        /// </summary>
222        /// <param name="msg"></param>
223        private void HandleUpdateGui()
224        {
225            //Get the gui Thread
226            this.workspaceModel.WorkspaceManagerEditor.Presentation.Dispatcher.Invoke(DispatcherPriority.Normal, (SendOrPostCallback)delegate
227            {
228                foreach (PluginModel pluginModel in workspaceModel.AllPluginModels)
229                {
230                    if (pluginModel.GuiNeedsUpdate)
231                    {
232                        pluginModel.GuiNeedsUpdate = false;
233                        pluginModel.paint();
234                        if (pluginModel.UpdateableView != null)
235                        {
236                            pluginModel.UpdateableView.update();
237                        }
238                    }
239                }
240                foreach (ConnectionModel connectionModel in workspaceModel.AllConnectionModels)
241                {
242                    if (connectionModel.GuiNeedsUpdate)
243                    {
244                        if (connectionModel.UpdateableView != null)
245                        {
246                            connectionModel.UpdateableView.update();
247                        }
248                    }
249                }
250            }
251            , null);
252        }
253    }
254   
255    /// <summary>
256    /// A Protocol for benchmarking
257    /// </summary>
258    public class BenchmarkProtocol : ProtocolBase
259    {
260        private WorkspaceModel workspaceModel;
261        private ExecutionEngine executionEngine;
262
263        /// <summary>
264        /// Create a new protocol. Each protocol requires a scheduler which provides
265        /// a thread for execution.
266        /// </summary>
267        /// <param name="scheduler"></param>
268        public BenchmarkProtocol(Scheduler scheduler, WorkspaceModel workspaceModel, ExecutionEngine executionEngine)
269            : base(scheduler)
270        {
271            this.workspaceModel = workspaceModel;
272            this.executionEngine = executionEngine;
273        }
274
275        /// <summary>
276        /// The main function of the protocol
277        /// </summary>
278        /// <param name="stateMachine"></param>
279        /// <returns></returns>
280        public override System.Collections.Generic.IEnumerator<ReceiverBase> Execute(AbstractStateMachine stateMachine)
281        {
282            while (this.executionEngine.IsRunning)
283            {
284                yield return Timeout(1000, HandleBenchmark);
285            }
286        }
287
288        /// <summary>
289        /// Handler function for a message.
290        /// This handler must not block, because it executes inside the thread of the scheduler.
291        /// </summary>
292        /// <param name="msg"></param>
293        private void HandleBenchmark()
294        {
295            this.workspaceModel.WorkspaceManagerEditor.GuiLogMessage("Executing at about " + this.executionEngine.ExecutedPluginsCounter + " Plugins/s", NotificationLevel.Debug);
296            this.executionEngine.ExecutedPluginsCounter = 0;
297        }
298
299    }
300
301    /// <summary>
302    /// A Protocol for a PluginModel
303    /// </summary>
304    public class PluginProtocol : ProtocolBase
305    {       
306        public PluginModel PluginModel;
307        private ExecutionEngine executionEngine;
308
309        /// <summary>
310        /// Create a new protocol. Each protocol requires a scheduler which provides
311        /// a thread for execution.
312        /// </summary>
313        /// <param name="scheduler"></param>
314        public PluginProtocol(Scheduler scheduler, PluginModel pluginModel,ExecutionEngine executionEngine)
315            : base(scheduler)
316        {
317            this.PluginModel = pluginModel;
318            this.executionEngine = executionEngine;
319        }
320
321        /// <summary>
322        /// The main function of the protocol     
323        /// </summary>
324        /// <param name="stateMachine"></param>
325        /// <returns></returns>
326        public override System.Collections.Generic.IEnumerator<ReceiverBase> Execute(AbstractStateMachine stateMachine)
327        {
328            while (this.executionEngine.IsRunning)
329            {
330                yield return Receive<MessageExecution>(null, HandleExecute);
331            }
332        }
333
334        /// <summary>
335        /// Handle an execution of a plugin
336        /// </summary>
337        /// <param name="msg"></param>
338        private void HandleExecute(MessageExecution msg)
339        {
340            // 1. Check if Plugin may Execute
341            if (!mayExecute(PluginModel))
342            {
343                return;
344            }
345
346            //2. Fill all Inputs of the plugin, if this fails, stop executing the plugin
347            if (!fillInputs())
348            {
349                return;
350            }
351
352            //3. Execute the Plugin -> call the IPlugin.Execute()
353            try
354            {
355                PluginModel.Plugin.Execute();
356            }
357            catch (Exception ex)
358            {
359                this.PluginModel.WorkspaceModel.WorkspaceManagerEditor.GuiLogMessage("An error occured while executing  \"" + PluginModel.Name + "\": " + ex.Message, NotificationLevel.Error);
360                this.PluginModel.State = PluginModelState.Error;
361                this.PluginModel.GuiNeedsUpdate = true;
362                return;
363            }
364
365            //4. Count for the benchmark
366            if (this.executionEngine.BenchmarkPlugins)
367            {
368                this.executionEngine.ExecutedPluginsCounter++;
369            }
370
371            //5. If the user wants to, sleep some time
372            if (this.executionEngine.SleepTime > 0)
373            {
374                Thread.Sleep(this.executionEngine.SleepTime);
375            }
376
377            //6. Clear all used inputs
378            clearInputs();
379
380            //7. Fill all outputs of our plugin
381            fillOutputs();
382
383            //8. Send start messages to possible executable next plugins
384            runNextPlugins();
385        }
386
387        /// <summary>
388        /// Send execute messages to possible executable next plugins
389        /// </summary>
390        private void runNextPlugins()
391        {
392            foreach (ConnectorModel connectorModel in PluginModel.OutputConnectors)
393            {
394                foreach (ConnectionModel connectionModel in connectorModel.OutputConnections)
395                {
396                    if (mayExecute(connectionModel.To.PluginModel))
397                    {
398                        MessageExecution msg_exce = new MessageExecution();
399                        msg_exce.PluginModel = connectionModel.To.PluginModel;
400                        connectionModel.To.PluginModel.PluginProtocol.BroadcastMessage(msg_exce);
401                    }
402                }
403            }
404
405            foreach (ConnectorModel connectorModel in PluginModel.InputConnectors)
406            {
407                foreach (ConnectionModel connectionModel in connectorModel.InputConnections)
408                {
409                    if (!connectionModel.From.PluginModel.Startable ||
410                        (connectionModel.From.PluginModel.Startable && connectionModel.From.PluginModel.RepeatStart))
411                    {
412                        if (mayExecute(connectionModel.From.PluginModel))
413                        {
414                            MessageExecution message_exec = new MessageExecution();
415                            message_exec.PluginModel = connectionModel.From.PluginModel;
416                            connectionModel.From.PluginModel.PluginProtocol.BroadcastMessageReliably(message_exec);
417                        }
418                    }
419                }
420            }
421        }
422
423        /// <summary>
424        /// Fill all outputs of the plugin
425        /// </summary>
426        private void fillOutputs()
427        {
428            foreach (ConnectorModel connectorModel in PluginModel.OutputConnectors)
429            {
430                object data;
431
432                if (connectorModel.IsDynamic)
433                {
434                    data = connectorModel.PluginModel.Plugin.GetType().GetMethod(connectorModel.DynamicGetterName).Invoke(connectorModel.PluginModel.Plugin, new object[] { connectorModel.PropertyName });
435                }
436                else
437                {
438                    data = connectorModel.PluginModel.Plugin.GetType().GetProperty(connectorModel.PropertyName).GetValue(connectorModel.PluginModel.Plugin, null);
439                }
440
441                if (data != null)
442                {
443                    foreach (ConnectionModel connectionModel in connectorModel.OutputConnections)
444                    {
445                        Data Data = new Data();
446                        Data.value = data;
447                        connectionModel.To.Data = Data;
448                        connectionModel.To.HasData = true;
449                        connectionModel.Active = true;
450                        connectionModel.GuiNeedsUpdate = true;
451                        connectorModel.Data = Data;
452                    }
453                }
454            }
455        }
456
457        /// <summary>
458        /// Delete all input data of inputs of the plugin
459        /// </summary>
460        private void clearInputs()
461        {
462            foreach (ConnectorModel connectorModel in PluginModel.InputConnectors)
463            {
464                if (connectorModel.HasData)
465                {
466                    connectorModel.Data = null;
467                    connectorModel.HasData = false;
468                    foreach (ConnectionModel connectionModel in connectorModel.InputConnections)
469                    {
470                        connectionModel.Active = false;
471                        connectorModel.GuiNeedsUpdate = true;
472                    }
473                }
474            }
475        }
476
477        /// <summary>
478        /// Fill all inputs of the plugin
479        /// </summary>
480        /// <returns></returns>
481        private bool fillInputs()
482        {
483            //Fill the plugins inputs with data
484            foreach (ConnectorModel connectorModel in PluginModel.InputConnectors)
485            {
486                try
487                {
488                    if (connectorModel.HasData && connectorModel.Data.value != null)
489                    {
490                        if (connectorModel.IsDynamic)
491                        {
492                            MethodInfo propertyInfo = PluginModel.Plugin.GetType().GetMethod(connectorModel.DynamicSetterName);
493                            propertyInfo.Invoke(PluginModel.Plugin, new object[] { connectorModel.PropertyName, connectorModel.Data.value });
494                        }
495                        else
496                        {
497                            PropertyInfo propertyInfo = PluginModel.Plugin.GetType().GetProperty(connectorModel.PropertyName);
498                            propertyInfo.SetValue(PluginModel.Plugin, connectorModel.Data.value, null);
499                        }
500                    }
501                }
502                catch (Exception ex)
503                {
504                    this.PluginModel.WorkspaceModel.WorkspaceManagerEditor.GuiLogMessage("An error occured while setting value of connector \"" + connectorModel.Name + "\" of \"" + PluginModel.Name + "\": " + ex.Message, NotificationLevel.Error);
505                    this.PluginModel.State = PluginModelState.Error;
506                    this.PluginModel.GuiNeedsUpdate = true;
507                    return false;
508                }
509            }
510            return true;
511        }
512
513        /// <summary>
514        /// Check if the PluginModel may execute
515        /// </summary>
516        /// <param name="pluginModel"></param>
517        /// <returns></returns>
518        private bool mayExecute(PluginModel pluginModel)
519        {
520            if (!pluginModel.WorkspaceModel.WorkspaceManagerEditor.isExecuting())
521            {
522                return false;
523            }
524
525            //Check if all necessary inputs are set
526            foreach (ConnectorModel connectorModel in pluginModel.InputConnectors)
527            {
528                if (!connectorModel.IControl &&
529                    (connectorModel.IsMandatory || connectorModel.InputConnections.Count > 0) && (!connectorModel.HasData ||
530                    connectorModel.Data == null))
531                {
532                    return false;
533                }
534            }
535
536            //Check if all outputs are free
537            foreach (ConnectorModel connectorModel in pluginModel.OutputConnectors)
538            {
539                if (!connectorModel.IControl)
540                {
541                    foreach (ConnectionModel connectionModel in connectorModel.OutputConnections)
542                    {
543                        if (connectionModel.To.HasData)
544                        {
545                            return false;
546                        }
547                    }
548                }
549            }
550            return true;
551        }
552    }
553
554    /// <summary>
555    /// Gears4Net Scheduler
556    /// </summary>
557    public class WorkspaceManagerScheduler : Scheduler
558    {
559        private System.Threading.AutoResetEvent wakeup = new System.Threading.AutoResetEvent(false);
560        private bool shutdown = false;
561        private System.Threading.Thread thread;
562        private Context currentContext;
563
564                public WorkspaceManagerScheduler() : this(String.Empty)
565                {
566
567                }
568
569        public WorkspaceManagerScheduler(string name)
570            : base()
571        {
572            this.currentContext = Thread.CurrentContext;
573
574            thread = new System.Threading.Thread(this.Start);
575            thread.SetApartmentState(System.Threading.ApartmentState.MTA);
576                        thread.Name = name;
577           
578        }
579
580        public void startScheduler()
581        {
582            thread.Start();
583        }
584
585        private void Start()
586        {
587            if (this.currentContext != Thread.CurrentContext)
588                this.currentContext.DoCallBack(Start);
589
590            // Loop forever
591            while (true)
592            {
593                this.wakeup.WaitOne();
594
595                // Loop while there are more protocols waiting
596                while (true)
597                {
598                    // Should the scheduler stop?
599                    if (this.shutdown)
600                        return;
601                   
602                    ProtocolBase protocol = null;
603                    lock (this)
604                    {
605                        // No more protocols? -> Wait
606                        if (this.waitingProtocols.Count == 0)
607                            break;
608                    }
609
610                    protocol = this.waitingProtocols.Dequeue();
611                    ProtocolStatus status = protocol.Run();
612
613                    lock (this)
614                    {
615                        switch (status)
616                        {
617                            case ProtocolStatus.Created:
618                                System.Diagnostics.Debug.Assert(false);
619                                break;
620                            case ProtocolStatus.Ready:
621                                this.waitingProtocols.Enqueue(protocol);
622                                break;
623                            case ProtocolStatus.Waiting:
624                                break;
625                            case ProtocolStatus.Terminated:
626                                System.Diagnostics.Debug.Assert(!this.waitingProtocols.Contains(protocol));
627                                this.RemoveProtocol(protocol);
628                                break;
629                        }
630                    }
631                   
632                }
633            }
634        }
635
636        /// <summary>
637        /// Removes a protocol from the internal queue
638        /// </summary>
639        /// <param name="protocol"></param>
640        public override void RemoveProtocol(ProtocolBase protocol)
641        {
642            lock (this)
643            {
644                this.protocols.Remove(protocol);
645                if (this.protocols.Count == 0)
646                    this.Shutdown();
647            }
648        }
649
650        /// <summary>
651        /// Adds a protocol to the internal queue
652        /// </summary>
653        /// <param name="protocol"></param>
654        public override void AddProtocol(ProtocolBase protocol)
655        {
656            lock (this)
657            {
658                this.protocols.Add(protocol);
659            }
660        }
661
662        /// <summary>
663        /// Wakeup this scheduler
664        /// </summary>
665        /// <param name="protocol"></param>
666        public override void Wakeup(ProtocolBase protocol)
667        {
668            lock (this)
669            {
670                if (!this.waitingProtocols.Contains(protocol))
671                    this.waitingProtocols.Enqueue(protocol);
672                this.wakeup.Set();
673            }
674        }
675
676        /// <summary>
677        /// Terminates the scheduler
678        /// </summary>
679        public override void Shutdown()
680        {
681            this.shutdown = true;
682            this.wakeup.Set();
683        }
684    }
685}
Note: See TracBrowser for help on using the repository browser.