source: trunk/CrypPlugins/WorkspaceManager/Execution/ExecutionEngine.cs @ 1805

Last change on this file since 1805 was 1805, checked in by kopal, 11 years ago
  • added SleepTime Settings
  • structured the ExecutionEngine for better overview
File size: 24.6 KB
Line 
1/*                             
2   Copyright 2010 Nils Kopal, Viktor M.
3
4   Licensed under the Apache License, Version 2.0 (the "License");
5   you may not use this file except in compliance with the License.
6   You may obtain a copy of the License at
7
8       http://www.apache.org/licenses/LICENSE-2.0
9
10   Unless required by applicable law or agreed to in writing, software
11   distributed under the License is distributed on an "AS IS" BASIS,
12   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   See the License for the specific language governing permissions and
14   limitations under the License.
15*/
16
17using System;
18using System.Collections.Generic;
19using System.Linq;
20using System.Text;
21
22using WorkspaceManager.Model;
23using System.Threading;
24using System.Collections;
25using Cryptool.PluginBase;
26using System.Reflection;
27using Gears4Net;
28using System.Windows.Threading;
29using System.Runtime.Remoting.Contexts;
30
31namespace WorkspaceManager.Execution
32{
33    /// <summary>
34    /// Engine to execute a model of the WorkspaceManager
35    /// This class needs a WorkspaceManager to be instantiated
36    /// To run an execution process it also needs a WorkspaceModel
37    ///
38    /// This class uses Gears4Net to execute the plugins
39    /// </summary>
40    public class ExecutionEngine
41    {
42        private WorkspaceManager WorkspaceManagerEditor;
43        private Scheduler[] schedulers;
44        private WorkspaceModel workspaceModel;
45
46        public long ExecutedPluginsCounter { get; set; }
47        public bool BenchmarkPlugins { get; set; }
48        public long GuiUpdateInterval { get; set; }
49        public int SleepTime { get; set; }
50
51        /// <summary>
52        /// Creates a new ExecutionEngine
53        /// </summary>
54        /// <param name="workspaceManagerEditor"></param>
55        public ExecutionEngine(WorkspaceManager workspaceManagerEditor)
56        {
57            WorkspaceManagerEditor = workspaceManagerEditor;
58        }
59
60        /// <summary>
61        /// Is this ExecutionEngine running?
62        /// </summary>
63        public bool IsRunning
64        {
65            get;
66            private set;
67        }
68
69        /// <summary>
70        /// Execute the given Model
71        /// </summary>
72        /// <param name="workspaceModel"></param>
73        public void Execute(WorkspaceModel workspaceModel)
74        {
75            if (!IsRunning)
76            {
77                IsRunning = true;
78                this.workspaceModel = workspaceModel;
79                int amountSchedulers = System.Environment.ProcessorCount * 2;
80
81                //Here we create n = "ProcessorsCount * 2" Gears4Net schedulers
82                //We do this, because measurements showed that we get the best performance if we
83                //use this amount of schedulers
84                schedulers = new Scheduler[amountSchedulers];
85                for (int i = 0; i < amountSchedulers; i++)
86                {
87                    schedulers[i] = new WorkspaceManagerScheduler("Scheduler" + i);                   
88                }
89               
90                //We have to reset all states of PluginModels, ConnectorModels and ConnectionModels:
91                workspaceModel.resetStates();
92
93                //The UpdateGuiProtocol is a kind of "daemon" which will update the view elements if necessary
94                UpdateGuiProtocol updateGuiProtocol = new UpdateGuiProtocol(schedulers[0], workspaceModel, this);
95                schedulers[0].AddProtocol(updateGuiProtocol);
96                updateGuiProtocol.Start();
97
98                //The BenchmarkProtocl counts the amount of executed plugins per seconds and writes this to debug
99                if (this.BenchmarkPlugins)
100                {
101                    BenchmarkProtocol benchmarkProtocol = new BenchmarkProtocol(schedulers[0], this.workspaceModel, this);
102                    schedulers[0].AddProtocol(benchmarkProtocol);
103                    benchmarkProtocol.Start();
104                }
105
106                //Here we create for each PluginModel an own PluginProtocol
107                //By using round-robin we give each protocol to another scheduler to gain
108                //a good average load balancing of the schedulers
109                //we also initalize each plugin
110                int counter=0;
111                foreach (PluginModel pluginModel in workspaceModel.AllPluginModels)
112                {
113                    pluginModel.Plugin.PreExecution();
114                    PluginProtocol pluginProtocol = new PluginProtocol(schedulers[counter], pluginModel,this);
115                    pluginModel.PluginProtocol = pluginProtocol;
116                    schedulers[counter].AddProtocol(pluginProtocol);
117                   
118                    pluginProtocol.Start();
119                    counter = (counter + 1) % (amountSchedulers);
120
121                    if (pluginModel.Startable)
122                    {
123                        MessageExecution msg = new MessageExecution();
124                        msg.PluginModel = pluginModel;
125                        pluginProtocol.BroadcastMessageReliably(msg);
126                    }
127                }
128
129                foreach (Scheduler scheduler in schedulers)
130                {
131                    ((WorkspaceManagerScheduler)scheduler).startScheduler();
132                }
133            }
134        }     
135     
136        /// <summary>
137        /// Stop the execution process:
138        /// calls shutdown on all schedulers + calls stop() on each plugin
139        /// </summary>
140        public void Stop()
141        {
142            //First stop alle plugins
143            foreach (PluginModel pluginModel in workspaceModel.AllPluginModels)
144            {
145                pluginModel.Plugin.Stop();
146                pluginModel.Plugin.PostExecution();
147            }           
148
149            IsRunning = false;
150            //Secondly stop all Gears4Net Schedulers
151            foreach (Scheduler scheduler in schedulers)
152            {
153                scheduler.Shutdown();
154            }
155             
156        }
157
158        /// <summary>
159        /// Pause the execution
160        /// </summary>
161        public void Pause()
162        {
163            //not implemented yet
164        }
165
166        /// <summary>
167        /// Use the logger of the WorkspaceManagerEditor
168        /// </summary>
169        /// <param name="message"></param>
170        /// <param name="level"></param>
171        public void GuiLogMessage(string message, NotificationLevel level)
172        {           
173            WorkspaceManagerEditor.GuiLogMessage(message, level);
174        }           
175    }
176 
177    /// <summary>
178    /// Message send to scheduler for a Plugin to trigger the Execution
179    /// </summary>
180    public class MessageExecution : MessageBase
181    {
182        public PluginModel PluginModel;
183    }
184
185    /// <summary>
186    /// A Protocol for updating the GUI in time intervals
187    /// </summary>
188    public class UpdateGuiProtocol : ProtocolBase
189    {
190        private WorkspaceModel workspaceModel;
191        private ExecutionEngine executionEngine;     
192
193        /// <summary>
194        /// Create a new protocol. Each protocol requires a scheduler which provides
195        /// a thread for execution.
196        /// </summary>
197        /// <param name="scheduler"></param>
198        public UpdateGuiProtocol(Scheduler scheduler, WorkspaceModel workspaceModel, ExecutionEngine executionEngine)
199            : base(scheduler)
200        {
201            this.workspaceModel = workspaceModel;
202            this.executionEngine = executionEngine;           
203        }
204
205        /// <summary>
206        /// The main function of the protocol
207        /// </summary>
208        /// <param name="stateMachine"></param>
209        /// <returns></returns>
210        public override System.Collections.Generic.IEnumerator<ReceiverBase> Execute(AbstractStateMachine stateMachine)
211        {
212            while (this.executionEngine.IsRunning)
213            {
214                yield return Timeout(this.executionEngine.GuiUpdateInterval, HandleUpdateGui);
215            }
216        }
217
218        /// <summary>
219        /// Handler function for a message.
220        /// This handler must not block, because it executes inside the thread of the scheduler.
221        /// </summary>
222        /// <param name="msg"></param>
223        private void HandleUpdateGui()
224        {
225            //Get the gui Thread
226            this.workspaceModel.WorkspaceManagerEditor.Presentation.Dispatcher.Invoke(DispatcherPriority.Normal, (SendOrPostCallback)delegate
227            {
228                foreach (PluginModel pluginModel in workspaceModel.AllPluginModels)
229                {
230                    if (pluginModel.GuiNeedsUpdate)
231                    {
232                        pluginModel.GuiNeedsUpdate = false;
233                        pluginModel.paint();
234                        if (pluginModel.UpdateableView != null)
235                        {
236                            pluginModel.UpdateableView.update();
237                        }
238                    }
239                }
240                foreach (ConnectionModel connectionModel in workspaceModel.AllConnectionModels)
241                {
242                    if (connectionModel.GuiNeedsUpdate)
243                    {
244                        if (connectionModel.UpdateableView != null)
245                        {
246                            connectionModel.UpdateableView.update();
247                        }
248                    }
249                }
250            }
251            , null);
252        }
253    }
254   
255    /// <summary>
256    /// A Protocol for benchmarking
257    /// </summary>
258    public class BenchmarkProtocol : ProtocolBase
259    {
260        private WorkspaceModel workspaceModel;
261        private ExecutionEngine executionEngine;
262
263        /// <summary>
264        /// Create a new protocol. Each protocol requires a scheduler which provides
265        /// a thread for execution.
266        /// </summary>
267        /// <param name="scheduler"></param>
268        public BenchmarkProtocol(Scheduler scheduler, WorkspaceModel workspaceModel, ExecutionEngine executionEngine)
269            : base(scheduler)
270        {
271            this.workspaceModel = workspaceModel;
272            this.executionEngine = executionEngine;
273        }
274
275        /// <summary>
276        /// The main function of the protocol
277        /// </summary>
278        /// <param name="stateMachine"></param>
279        /// <returns></returns>
280        public override System.Collections.Generic.IEnumerator<ReceiverBase> Execute(AbstractStateMachine stateMachine)
281        {
282            while (this.executionEngine.IsRunning)
283            {
284                yield return Timeout(1000, HandleBenchmark);
285            }
286        }
287
288        /// <summary>
289        /// Handler function for a message.
290        /// This handler must not block, because it executes inside the thread of the scheduler.
291        /// </summary>
292        /// <param name="msg"></param>
293        private void HandleBenchmark()
294        {
295            this.workspaceModel.WorkspaceManagerEditor.GuiLogMessage("Executing at about " + this.executionEngine.ExecutedPluginsCounter + " Plugins/s", NotificationLevel.Debug);
296            this.executionEngine.ExecutedPluginsCounter = 0;
297        }
298
299    }
300
301    /// <summary>
302    /// A Protocol for a PluginModel
303    /// </summary>
304    public class PluginProtocol : ProtocolBase
305    {       
306        public PluginModel PluginModel;
307        private ExecutionEngine executionEngine;
308
309        /// <summary>
310        /// Create a new protocol. Each protocol requires a scheduler which provides
311        /// a thread for execution.
312        /// </summary>
313        /// <param name="scheduler"></param>
314        public PluginProtocol(Scheduler scheduler, PluginModel pluginModel,ExecutionEngine executionEngine)
315            : base(scheduler)
316        {
317            this.PluginModel = pluginModel;
318            this.executionEngine = executionEngine;
319        }
320
321        /// <summary>
322        /// The main function of the protocol     
323        /// </summary>
324        /// <param name="stateMachine"></param>
325        /// <returns></returns>
326        public override System.Collections.Generic.IEnumerator<ReceiverBase> Execute(AbstractStateMachine stateMachine)
327        {
328            while (this.executionEngine.IsRunning)
329            {
330                yield return Receive<MessageExecution>(null, HandleExecute);
331            }
332        }
333
334        /// <summary>
335        /// Handle an execution of a plugin
336        /// </summary>
337        /// <param name="msg"></param>
338        private void HandleExecute(MessageExecution msg)
339        {
340            // 1. Check if Plugin may Execute
341            if (!mayExecute(PluginModel))
342            {
343                return;
344            }
345
346            //2. Fill all Inputs of the plugin, if this fails, stop executing the plugin
347            if (!fillInputs())
348            {
349                return;
350            }
351
352            //3. Execute the Plugin -> call the IPlugin.Execute()
353            PluginModel.Plugin.Execute();
354
355            //4. Count for the benchmark
356            if (this.executionEngine.BenchmarkPlugins)
357            {
358                this.executionEngine.ExecutedPluginsCounter++;
359            }
360
361            //5. If the user wants to, sleep some time
362            if (this.executionEngine.SleepTime > 0)
363            {
364                Thread.Sleep(this.executionEngine.SleepTime);
365            }
366
367            //6. Clear all used inputs
368            clearInputs();
369
370            //7. Fill all outputs of our plugin
371            fillOutputs();
372
373            //8. Send start messages to possible executable next plugins
374            runNextPlugins();
375        }
376
377        /// <summary>
378        /// Send execute messages to possible executable next plugins
379        /// </summary>
380        private void runNextPlugins()
381        {
382            foreach (ConnectorModel connectorModel in PluginModel.OutputConnectors)
383            {
384                foreach (ConnectionModel connectionModel in connectorModel.OutputConnections)
385                {
386                    if (mayExecute(connectionModel.To.PluginModel))
387                    {
388                        MessageExecution msg_exce = new MessageExecution();
389                        msg_exce.PluginModel = connectionModel.To.PluginModel;
390                        connectionModel.To.PluginModel.PluginProtocol.BroadcastMessage(msg_exce);
391                    }
392                }
393            }
394
395            foreach (ConnectorModel connectorModel in PluginModel.InputConnectors)
396            {
397                foreach (ConnectionModel connectionModel in connectorModel.InputConnections)
398                {
399                    if (!connectionModel.From.PluginModel.Startable ||
400                        (connectionModel.From.PluginModel.Startable && connectionModel.From.PluginModel.RepeatStart))
401                    {
402                        if (mayExecute(connectionModel.From.PluginModel))
403                        {
404                            MessageExecution message_exec = new MessageExecution();
405                            message_exec.PluginModel = connectionModel.From.PluginModel;
406                            connectionModel.From.PluginModel.PluginProtocol.BroadcastMessageReliably(message_exec);
407                        }
408                    }
409                }
410            }
411        }
412
413        /// <summary>
414        /// Fill all outputs of the plugin
415        /// </summary>
416        private void fillOutputs()
417        {
418            foreach (ConnectorModel connectorModel in PluginModel.OutputConnectors)
419            {
420                object data;
421
422                if (connectorModel.IsDynamic)
423                {
424                    data = connectorModel.PluginModel.Plugin.GetType().GetMethod(connectorModel.DynamicGetterName).Invoke(connectorModel.PluginModel.Plugin, new object[] { connectorModel.PropertyName });
425                }
426                else
427                {
428                    data = connectorModel.PluginModel.Plugin.GetType().GetProperty(connectorModel.PropertyName).GetValue(connectorModel.PluginModel.Plugin, null);
429                }
430
431                if (data != null)
432                {
433                    foreach (ConnectionModel connectionModel in connectorModel.OutputConnections)
434                    {
435                        Data Data = new Data();
436                        Data.value = data;
437                        connectionModel.To.Data = Data;
438                        connectionModel.To.HasData = true;
439                        connectionModel.Active = true;
440                        connectionModel.GuiNeedsUpdate = true;
441                        connectorModel.Data = Data;
442                    }
443                }
444            }
445        }
446
447        /// <summary>
448        /// Delete all input data of inputs of the plugin
449        /// </summary>
450        private void clearInputs()
451        {
452            foreach (ConnectorModel connectorModel in PluginModel.InputConnectors)
453            {
454                if (connectorModel.HasData)
455                {
456                    connectorModel.Data = null;
457                    connectorModel.HasData = false;
458                    foreach (ConnectionModel connectionModel in connectorModel.InputConnections)
459                    {
460                        connectionModel.Active = false;
461                        connectorModel.GuiNeedsUpdate = true;
462                    }
463                }
464            }
465        }
466
467        /// <summary>
468        /// Fill all inputs of the plugin
469        /// </summary>
470        /// <returns></returns>
471        private bool fillInputs()
472        {
473            //Fill the plugins inputs with data
474            foreach (ConnectorModel connectorModel in PluginModel.InputConnectors)
475            {
476                try
477                {
478                    if (connectorModel.HasData && connectorModel.Data.value != null)
479                    {
480                        if (connectorModel.IsDynamic)
481                        {
482                            MethodInfo propertyInfo = PluginModel.Plugin.GetType().GetMethod(connectorModel.DynamicSetterName);
483                            propertyInfo.Invoke(PluginModel.Plugin, new object[] { connectorModel.PropertyName, connectorModel.Data.value });
484                        }
485                        else
486                        {
487                            PropertyInfo propertyInfo = PluginModel.Plugin.GetType().GetProperty(connectorModel.PropertyName);
488                            propertyInfo.SetValue(PluginModel.Plugin, connectorModel.Data.value, null);
489                        }
490                    }
491                }
492                catch (Exception ex)
493                {
494                    this.PluginModel.WorkspaceModel.WorkspaceManagerEditor.GuiLogMessage("An error occured while setting value of connector \"" + connectorModel.Name + "\" of \"" + PluginModel + "\": " + ex.Message, NotificationLevel.Error);
495                    this.PluginModel.State = PluginModelState.Error;
496                    this.PluginModel.GuiNeedsUpdate = true;
497                    return false;
498                }
499            }
500            return true;
501        }
502
503        /// <summary>
504        /// Check if the PluginModel may execute
505        /// </summary>
506        /// <param name="pluginModel"></param>
507        /// <returns></returns>
508        private bool mayExecute(PluginModel pluginModel)
509        {
510            if (!pluginModel.WorkspaceModel.WorkspaceManagerEditor.isExecuting())
511            {
512                return false;
513            }
514
515            //Check if all necessary inputs are set
516            foreach (ConnectorModel connectorModel in pluginModel.InputConnectors)
517            {
518                if (!connectorModel.IControl &&
519                    (connectorModel.IsMandatory || connectorModel.InputConnections.Count > 0) && (!connectorModel.HasData ||
520                    connectorModel.Data == null))
521                {
522                    return false;
523                }
524            }
525
526            //Check if all outputs are free
527            foreach (ConnectorModel connectorModel in pluginModel.OutputConnectors)
528            {
529                if (!connectorModel.IControl)
530                {
531                    foreach (ConnectionModel connectionModel in connectorModel.OutputConnections)
532                    {
533                        if (connectionModel.To.HasData)
534                        {
535                            return false;
536                        }
537                    }
538                }
539            }
540            return true;
541        }
542    }
543
544    /// <summary>
545    /// Gears4Net Scheduler
546    /// </summary>
547    public class WorkspaceManagerScheduler : Scheduler
548    {
549        private System.Threading.AutoResetEvent wakeup = new System.Threading.AutoResetEvent(false);
550        private bool shutdown = false;
551        private System.Threading.Thread thread;
552        private Context currentContext;
553
554                public WorkspaceManagerScheduler() : this(String.Empty)
555                {
556
557                }
558
559        public WorkspaceManagerScheduler(string name)
560            : base()
561        {
562            this.currentContext = Thread.CurrentContext;
563
564            thread = new System.Threading.Thread(this.Start);
565            thread.SetApartmentState(System.Threading.ApartmentState.MTA);
566                        thread.Name = name;
567           
568        }
569
570        public void startScheduler()
571        {
572            thread.Start();
573        }
574
575        private void Start()
576        {
577            if (this.currentContext != Thread.CurrentContext)
578                this.currentContext.DoCallBack(Start);
579
580            // Loop forever
581            while (true)
582            {
583                this.wakeup.WaitOne();
584
585                // Loop while there are more protocols waiting
586                while (true)
587                {
588                    // Should the scheduler stop?
589                    if (this.shutdown)
590                        return;
591                   
592                    ProtocolBase protocol = null;
593                    lock (this)
594                    {
595                        // No more protocols? -> Wait
596                        if (this.waitingProtocols.Count == 0)
597                            break;
598                    }
599
600                    protocol = this.waitingProtocols.Dequeue();
601                    ProtocolStatus status = protocol.Run();
602
603                    lock (this)
604                    {
605                        switch (status)
606                        {
607                            case ProtocolStatus.Created:
608                                System.Diagnostics.Debug.Assert(false);
609                                break;
610                            case ProtocolStatus.Ready:
611                                this.waitingProtocols.Enqueue(protocol);
612                                break;
613                            case ProtocolStatus.Waiting:
614                                break;
615                            case ProtocolStatus.Terminated:
616                                System.Diagnostics.Debug.Assert(!this.waitingProtocols.Contains(protocol));
617                                this.RemoveProtocol(protocol);
618                                break;
619                        }
620                    }
621                   
622                }
623            }
624        }
625
626        /// <summary>
627        /// Removes a protocol from the internal queue
628        /// </summary>
629        /// <param name="protocol"></param>
630        public override void RemoveProtocol(ProtocolBase protocol)
631        {
632            lock (this)
633            {
634                this.protocols.Remove(protocol);
635                if (this.protocols.Count == 0)
636                    this.Shutdown();
637            }
638        }
639
640        /// <summary>
641        /// Adds a protocol to the internal queue
642        /// </summary>
643        /// <param name="protocol"></param>
644        public override void AddProtocol(ProtocolBase protocol)
645        {
646            lock (this)
647            {
648                this.protocols.Add(protocol);
649            }
650        }
651
652        /// <summary>
653        /// Wakeup this scheduler
654        /// </summary>
655        /// <param name="protocol"></param>
656        public override void Wakeup(ProtocolBase protocol)
657        {
658            lock (this)
659            {
660                if (!this.waitingProtocols.Contains(protocol))
661                    this.waitingProtocols.Enqueue(protocol);
662                this.wakeup.Set();
663            }
664        }
665
666        /// <summary>
667        /// Terminates the scheduler
668        /// </summary>
669        public override void Shutdown()
670        {
671            this.shutdown = true;
672            this.wakeup.Set();
673        }
674    }
675}
Note: See TracBrowser for help on using the repository browser.