source: trunk/CrypPlugins/WorkspaceManager/Execution/ExecutionEngine.cs @ 1846

Last change on this file since 1846 was 1846, checked in by kopal, 11 years ago

schedulers now log startups and endings

File size: 25.5 KB
Line 
1/*                             
2   Copyright 2010 Nils Kopal, Viktor M.
3
4   Licensed under the Apache License, Version 2.0 (the "License");
5   you may not use this file except in compliance with the License.
6   You may obtain a copy of the License at
7
8       http://www.apache.org/licenses/LICENSE-2.0
9
10   Unless required by applicable law or agreed to in writing, software
11   distributed under the License is distributed on an "AS IS" BASIS,
12   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   See the License for the specific language governing permissions and
14   limitations under the License.
15*/
16
17using System;
18using System.Collections.Generic;
19using System.Linq;
20using System.Text;
21
22using WorkspaceManager.Model;
23using System.Threading;
24using System.Collections;
25using Cryptool.PluginBase;
26using System.Reflection;
27using Gears4Net;
28using System.Windows.Threading;
29using System.Runtime.Remoting.Contexts;
30
31namespace WorkspaceManager.Execution
32{
33    /// <summary>
34    /// Engine to execute a model of the WorkspaceManager
35    /// This class needs a WorkspaceManager to be instantiated
36    /// To run an execution process it also needs a WorkspaceModel
37    ///
38    /// This class uses Gears4Net to execute the plugins
39    /// </summary>
40    public class ExecutionEngine
41    {
42        private WorkspaceManager WorkspaceManagerEditor;
43        private Scheduler[] schedulers;
44        private WorkspaceModel workspaceModel;
45        private volatile bool isRunning = false;
46
47        public long ExecutedPluginsCounter { get; set; }
48        public bool BenchmarkPlugins { get; set; }
49        public long GuiUpdateInterval { get; set; }
50        public int SleepTime { get; set; }
51
52        /// <summary>
53        /// Creates a new ExecutionEngine
54        /// </summary>
55        /// <param name="workspaceManagerEditor"></param>
56        public ExecutionEngine(WorkspaceManager workspaceManagerEditor)
57        {
58            WorkspaceManagerEditor = workspaceManagerEditor;
59        }
60
61        /// <summary>
62        /// Is this ExecutionEngine running?
63        /// </summary>
64        public bool IsRunning
65        {
66            get{return this.isRunning;}
67            private set{this.isRunning = value;}
68        }
69
70        /// <summary>
71        /// Execute the given Model
72        /// </summary>
73        /// <param name="workspaceModel"></param>
74        public void Execute(WorkspaceModel workspaceModel)
75        {
76            if (!IsRunning)
77            {
78                IsRunning = true;
79                this.workspaceModel = workspaceModel;
80                int amountSchedulers = System.Environment.ProcessorCount * 2;
81
82                //Here we create n = "ProcessorsCount * 2" Gears4Net schedulers
83                //We do this, because measurements showed that we get the best performance if we
84                //use this amount of schedulers
85                schedulers = new Scheduler[amountSchedulers];
86                for (int i = 0; i < amountSchedulers; i++)
87                {
88                    schedulers[i] = new WorkspaceManagerScheduler("WorkspaceManagerScheduler-" + i);
89                    ((WorkspaceManagerScheduler)schedulers[i]).executionEngine = this;
90                }
91               
92                //We have to reset all states of PluginModels, ConnectorModels and ConnectionModels:
93                workspaceModel.resetStates();
94
95                //The UpdateGuiProtocol is a kind of "daemon" which will update the view elements if necessary
96                UpdateGuiProtocol updateGuiProtocol = new UpdateGuiProtocol(schedulers[0], workspaceModel, this);
97                schedulers[0].AddProtocol(updateGuiProtocol);
98                updateGuiProtocol.Start();
99
100                //The BenchmarkProtocl counts the amount of executed plugins per seconds and writes this to debug
101                if (this.BenchmarkPlugins)
102                {
103                    BenchmarkProtocol benchmarkProtocol = new BenchmarkProtocol(schedulers[0], this.workspaceModel, this);
104                    schedulers[0].AddProtocol(benchmarkProtocol);
105                    benchmarkProtocol.Start();
106                }
107
108                //Here we create for each PluginModel an own PluginProtocol
109                //By using round-robin we give each protocol to another scheduler to gain
110                //a good average load balancing of the schedulers
111                //we also initalize each plugin
112                //It is possible that a plugin is also a PluginProtocol
113                //if that is true we do not create a new one but use the plugin instead the created one
114                int counter=0;
115                foreach (PluginModel pluginModel in workspaceModel.AllPluginModels)
116                {
117                    PluginProtocol pluginProtocol = null;
118
119                    if (pluginModel.Plugin is PluginProtocol)
120                    {
121                        pluginProtocol = (PluginProtocol)pluginModel.Plugin;
122                        pluginProtocol.setExecutionEngineSettings(schedulers[counter], pluginModel, this);
123                    }
124                    else
125                    {
126                        pluginProtocol = new PluginProtocol(schedulers[counter], pluginModel, this);
127                    }
128
129                    pluginModel.Plugin.PreExecution();                   
130                    pluginModel.PluginProtocol = pluginProtocol;
131                    schedulers[counter].AddProtocol(pluginProtocol);
132
133                    if (pluginProtocol.Status == ProtocolStatus.Created || pluginProtocol.Status == ProtocolStatus.Terminated)
134                    {
135                        pluginProtocol.Start();
136                    }
137
138                    counter = (counter + 1) % (amountSchedulers);
139
140                    if (pluginModel.Startable)
141                    {
142                        MessageExecution msg = new MessageExecution();
143                        msg.PluginModel = pluginModel;
144                        pluginProtocol.BroadcastMessageReliably(msg);
145                    }
146                }
147
148                foreach (Scheduler scheduler in schedulers)
149                {
150                    ((WorkspaceManagerScheduler)scheduler).startScheduler();
151                }
152            }
153        }     
154     
155        /// <summary>
156        /// Stop the execution process:
157        /// calls shutdown on all schedulers + calls stop() on each plugin
158        /// </summary>
159        public void Stop()
160        {
161            //First stop alle plugins
162            foreach (PluginModel pluginModel in workspaceModel.AllPluginModels)
163            {
164                pluginModel.Plugin.Stop();
165                pluginModel.Plugin.PostExecution();
166            }           
167
168            IsRunning = false;
169            //Secondly stop all Gears4Net Schedulers
170            foreach (Scheduler scheduler in schedulers)
171            {
172                scheduler.Shutdown();
173            }
174             
175        }
176
177        /// <summary>
178        /// Pause the execution
179        /// </summary>
180        public void Pause()
181        {
182            //not implemented yet
183        }
184
185        /// <summary>
186        /// Use the logger of the WorkspaceManagerEditor
187        /// </summary>
188        /// <param name="message"></param>
189        /// <param name="level"></param>
190        public void GuiLogMessage(string message, NotificationLevel level)
191        {           
192            WorkspaceManagerEditor.GuiLogMessage(message, level);
193        }           
194    }
195 
196    /// <summary>
197    /// Message send to scheduler for a Plugin to trigger the Execution
198    /// </summary>
199    public class MessageExecution : MessageBase
200    {
201        public PluginModel PluginModel;
202    }
203
204    /// <summary>
205    /// A Protocol for updating the GUI in time intervals
206    /// </summary>
207    public class UpdateGuiProtocol : ProtocolBase
208    {
209        private WorkspaceModel workspaceModel;
210        private ExecutionEngine executionEngine;     
211
212        /// <summary>
213        /// Create a new protocol. Each protocol requires a scheduler which provides
214        /// a thread for execution.
215        /// </summary>
216        /// <param name="scheduler"></param>
217        public UpdateGuiProtocol(Scheduler scheduler, WorkspaceModel workspaceModel, ExecutionEngine executionEngine)
218            : base(scheduler)
219        {
220            this.workspaceModel = workspaceModel;
221            this.executionEngine = executionEngine;           
222        }
223
224        /// <summary>
225        /// The main function of the protocol
226        /// </summary>
227        /// <param name="stateMachine"></param>
228        /// <returns></returns>
229        public override System.Collections.Generic.IEnumerator<ReceiverBase> Execute(AbstractStateMachine stateMachine)
230        {
231            while (this.executionEngine.IsRunning)
232            {
233                yield return Timeout(this.executionEngine.GuiUpdateInterval, HandleUpdateGui);
234            }
235        }
236
237        /// <summary>
238        /// Handler function for a message.
239        /// This handler must not block, because it executes inside the thread of the scheduler.
240        /// </summary>
241        /// <param name="msg"></param>
242        private void HandleUpdateGui()
243        {
244            //Get the gui Thread
245            this.workspaceModel.WorkspaceManagerEditor.Presentation.Dispatcher.Invoke(DispatcherPriority.Normal, (SendOrPostCallback)delegate
246            {
247                foreach (PluginModel pluginModel in workspaceModel.AllPluginModels)
248                {
249                    if (pluginModel.GuiNeedsUpdate)
250                    {
251                        pluginModel.GuiNeedsUpdate = false;
252                        pluginModel.paint();
253                        if (pluginModel.UpdateableView != null)
254                        {
255                            pluginModel.UpdateableView.update();
256                        }
257                    }
258                }
259                foreach (ConnectionModel connectionModel in workspaceModel.AllConnectionModels)
260                {
261                    if (connectionModel.GuiNeedsUpdate)
262                    {
263                        if (connectionModel.UpdateableView != null)
264                        {
265                            connectionModel.UpdateableView.update();
266                        }
267                    }
268                }
269            }
270            , null);
271        }
272    }
273   
274    /// <summary>
275    /// A Protocol for benchmarking
276    /// </summary>
277    public class BenchmarkProtocol : ProtocolBase
278    {
279        private WorkspaceModel workspaceModel;
280        private ExecutionEngine executionEngine;
281
282        /// <summary>
283        /// Create a new protocol. Each protocol requires a scheduler which provides
284        /// a thread for execution.
285        /// </summary>
286        /// <param name="scheduler"></param>
287        public BenchmarkProtocol(Scheduler scheduler, WorkspaceModel workspaceModel, ExecutionEngine executionEngine)
288            : base(scheduler)
289        {
290            this.workspaceModel = workspaceModel;
291            this.executionEngine = executionEngine;
292        }
293
294        /// <summary>
295        /// The main function of the protocol
296        /// </summary>
297        /// <param name="stateMachine"></param>
298        /// <returns></returns>
299        public override System.Collections.Generic.IEnumerator<ReceiverBase> Execute(AbstractStateMachine stateMachine)
300        {
301            while (this.executionEngine.IsRunning)
302            {
303                yield return Timeout(1000, HandleBenchmark);
304            }
305        }
306
307        /// <summary>
308        /// Handler function for a message.
309        /// This handler must not block, because it executes inside the thread of the scheduler.
310        /// </summary>
311        /// <param name="msg"></param>
312        private void HandleBenchmark()
313        {
314            this.workspaceModel.WorkspaceManagerEditor.GuiLogMessage("Executing at about " + this.executionEngine.ExecutedPluginsCounter + " Plugins/s", NotificationLevel.Debug);
315            this.executionEngine.ExecutedPluginsCounter = 0;
316        }
317
318    }
319
320    /// <summary>
321    /// A Protocol for a PluginModel
322    /// </summary>
323    public class PluginProtocol : ProtocolBase
324    {       
325        public PluginModel PluginModel;
326        private ExecutionEngine executionEngine;
327
328        /// <summary>
329        /// Create a new protocol. Each protocol requires a scheduler which provides
330        /// a thread for execution.
331        /// </summary>
332        /// <param name="scheduler"></param>
333        public PluginProtocol(Scheduler scheduler, PluginModel pluginModel,ExecutionEngine executionEngine)
334            : base(scheduler)
335        {
336            this.PluginModel = pluginModel;
337            this.executionEngine = executionEngine;
338        }
339
340        /// <summary>
341        ///
342        /// </summary>
343        /// <param name="scheduler"></param>
344        /// <param name="pluginModel"></param>
345        /// <param name="executionEngine"></param>
346        public void setExecutionEngineSettings(Scheduler scheduler, PluginModel pluginModel, ExecutionEngine executionEngine)
347        {
348            this.Scheduler = scheduler;
349            this.PluginModel = pluginModel;
350            this.executionEngine = executionEngine;
351        }
352
353
354        /// <summary>
355        /// The main function of the protocol     
356        /// </summary>
357        /// <param name="stateMachine"></param>
358        /// <returns></returns>
359        public override System.Collections.Generic.IEnumerator<ReceiverBase> Execute(AbstractStateMachine stateMachine)
360        {
361            while (this.executionEngine.IsRunning)
362            {
363                yield return Receive<MessageExecution>(null, HandleExecute);
364            }
365        }
366
367        /// <summary>
368        /// Handle an execution of a plugin
369        /// </summary>
370        /// <param name="msg"></param>
371        private void HandleExecute(MessageExecution msg)
372        {
373            // 1. Check if Plugin may Execute
374            if (!mayExecute(PluginModel))
375            {
376                return;
377            }
378
379            //2. Fill all Inputs of the plugin, if this fails, stop executing the plugin
380            if (!fillInputs())
381            {
382                return;
383            }
384
385            //3. Execute the Plugin -> call the IPlugin.Execute()
386            try
387            {
388                PluginModel.Plugin.Execute();
389            }
390            catch (Exception ex)
391            {
392                this.PluginModel.WorkspaceModel.WorkspaceManagerEditor.GuiLogMessage("An error occured while executing  \"" + PluginModel.Name + "\": " + ex.Message, NotificationLevel.Error);
393                this.PluginModel.State = PluginModelState.Error;
394                this.PluginModel.GuiNeedsUpdate = true;
395                return;
396            }
397
398            //4. Count for the benchmark
399            if (this.executionEngine.BenchmarkPlugins)
400            {
401                this.executionEngine.ExecutedPluginsCounter++;
402            }
403
404            //5. If the user wants to, sleep some time
405            if (this.executionEngine.SleepTime > 0)
406            {
407                Thread.Sleep(this.executionEngine.SleepTime);
408            }
409
410            //6. Clear all used inputs
411            //clearInputs();
412
413            //7. Send execute messages to possible executable next plugins
414            //runNextPlugins();
415        }
416
417        /// <summary>
418        /// Send execute messages to possible executable next plugins
419        /// </summary>
420        public void runNextPlugins()
421        {           
422            foreach (ConnectorModel connectorModel in PluginModel.InputConnectors)
423            {
424                foreach (ConnectionModel connectionModel in connectorModel.InputConnections)
425                {
426                    if (!connectionModel.From.PluginModel.Startable ||
427                        (connectionModel.From.PluginModel.Startable && connectionModel.From.PluginModel.RepeatStart))
428                    {
429                        if (mayExecute(connectionModel.From.PluginModel))
430                        {
431                            MessageExecution message_exec = new MessageExecution();
432                            message_exec.PluginModel = connectionModel.From.PluginModel;
433                            connectionModel.From.PluginModel.PluginProtocol.BroadcastMessageReliably(message_exec);
434                        }
435                    }
436                }
437            }
438        }
439     
440        /// <summary>
441        /// Delete all input data of inputs of the plugin
442        /// </summary>
443        public void clearInputs()
444        {
445            foreach (ConnectorModel connectorModel in PluginModel.InputConnectors)
446            {
447                if (connectorModel.HasData)
448                {
449                    connectorModel.Data = null;
450                    connectorModel.HasData = false;
451                    connectorModel.GuiNeedsUpdate = true;
452
453                    foreach (ConnectionModel connectionModel in connectorModel.InputConnections)
454                    {
455                        connectionModel.Active = false;
456                        connectorModel.GuiNeedsUpdate = true;
457                    }
458                }
459            }
460        }
461
462        /// <summary>
463        /// Fill all inputs of the plugin
464        /// </summary>
465        /// <returns></returns>
466        public bool fillInputs()
467        {
468            //Fill the plugins inputs with data
469            foreach (ConnectorModel connectorModel in PluginModel.InputConnectors)
470            {
471                try
472                {
473                    if (connectorModel.HasData && connectorModel.Data.value != null)
474                    {
475                        if (connectorModel.IsDynamic)
476                        {
477                            MethodInfo propertyInfo = PluginModel.Plugin.GetType().GetMethod(connectorModel.DynamicSetterName);
478                            propertyInfo.Invoke(PluginModel.Plugin, new object[] { connectorModel.PropertyName, connectorModel.Data.value });
479                        }
480                        else
481                        {
482                            PropertyInfo propertyInfo = PluginModel.Plugin.GetType().GetProperty(connectorModel.PropertyName);
483                            propertyInfo.SetValue(PluginModel.Plugin, connectorModel.Data.value, null);
484                        }
485                    }
486                }
487                catch (Exception ex)
488                {
489                    this.PluginModel.WorkspaceModel.WorkspaceManagerEditor.GuiLogMessage("An error occured while setting value of connector \"" + connectorModel.Name + "\" of \"" + PluginModel.Name + "\": " + ex.Message, NotificationLevel.Error);
490                    this.PluginModel.State = PluginModelState.Error;
491                    this.PluginModel.GuiNeedsUpdate = true;
492                    return false;
493                }
494            }
495            return true;
496        }
497
498        /// <summary>
499        ///
500        /// </summary>
501        /// <returns></returns>
502        public bool mayExecute()
503        {
504            return mayExecute(this.PluginModel);
505        }
506
507        /// <summary>
508        /// Check if the PluginModel may execute
509        /// </summary>
510        /// <param name="pluginModel"></param>
511        /// <returns></returns>
512        private bool mayExecute(PluginModel pluginModel)
513        {
514            if (!pluginModel.WorkspaceModel.WorkspaceManagerEditor.isExecuting())
515            {
516                return false;
517            }
518
519            //Check if all necessary inputs are set
520            foreach (ConnectorModel connectorModel in pluginModel.InputConnectors)
521            {
522                if (!connectorModel.IControl &&
523                    (connectorModel.IsMandatory || connectorModel.InputConnections.Count > 0) && (!connectorModel.HasData ||
524                    connectorModel.Data == null))
525                {
526                    return false;
527                }
528            }
529
530            //Check if all outputs are free
531            foreach (ConnectorModel connectorModel in pluginModel.OutputConnectors)
532            {
533                if (!connectorModel.IControl)
534                {
535                    foreach (ConnectionModel connectionModel in connectorModel.OutputConnections)
536                    {
537                        if (connectionModel.To.HasData)
538                        {
539                            return false;
540                        }
541                    }
542                }
543            }
544            return true;
545        }
546
547        /// <summary>
548        ///
549        /// </summary>
550        public ExecutionEngine ExecutionEngine
551        {
552            get { return this.executionEngine; }           
553        }
554    }
555
556    /// <summary>
557    /// Gears4Net Scheduler
558    /// </summary>
559    public class WorkspaceManagerScheduler : Scheduler
560    {
561        private System.Threading.AutoResetEvent wakeup = new System.Threading.AutoResetEvent(false);
562        private bool shutdown = false;
563        private System.Threading.Thread thread;
564        private Context currentContext;
565        public ExecutionEngine executionEngine = null;
566
567                public WorkspaceManagerScheduler() : this(String.Empty)
568                {
569
570                }
571
572        public WorkspaceManagerScheduler(string name)
573            : base()
574        {
575            this.currentContext = Thread.CurrentContext;
576
577            thread = new System.Threading.Thread(this.Start);
578            thread.SetApartmentState(System.Threading.ApartmentState.MTA);
579                        thread.Name = name;
580           
581        }
582
583        public void startScheduler()
584        {
585            thread.Start();
586        }
587
588        private void Start()
589        {
590            if (this.currentContext != Thread.CurrentContext){
591                this.currentContext.DoCallBack(Start);}
592           
593            this.executionEngine.GuiLogMessage("Scheduler " + this.thread.Name + " up and running", NotificationLevel.Debug);
594
595            // Loop forever
596            while (true)
597            {
598                this.wakeup.WaitOne();
599
600                // Loop while there are more protocols waiting
601                while (true)
602                {
603                    // Should the scheduler stop?
604                    if (this.shutdown)
605                    {
606                        this.executionEngine.GuiLogMessage("Scheduler " + this.thread.Name + " terminated", NotificationLevel.Debug);
607                        return;
608                    }
609                   
610                    ProtocolBase protocol = null;
611                    lock (this)
612                    {
613                        // No more protocols? -> Wait
614                        if (this.waitingProtocols.Count == 0)
615                            break;
616                    }
617
618                    try
619                    {
620                        protocol = this.waitingProtocols.Dequeue();
621                        ProtocolStatus status = protocol.Run();
622
623                        lock (this)
624                        {
625                            switch (status)
626                            {
627                                case ProtocolStatus.Created:
628                                    System.Diagnostics.Debug.Assert(false);
629                                    break;
630                                case ProtocolStatus.Ready:
631                                    this.waitingProtocols.Enqueue(protocol);
632                                    break;
633                                case ProtocolStatus.Waiting:
634                                    break;
635                                case ProtocolStatus.Terminated:
636                                    System.Diagnostics.Debug.Assert(!this.waitingProtocols.Contains(protocol));
637                                    this.RemoveProtocol(protocol);
638                                    break;
639                            }
640                        }
641                    }
642                    catch (Exception ex) 
643                    {
644                        System.Diagnostics.Debug.Fail("Error during scheduling: " + ex.Message + " - " + ex.InnerException);
645                    }
646                }
647            }
648        }
649
650        /// <summary>
651        /// Removes a protocol from the internal queue
652        /// </summary>
653        /// <param name="protocol"></param>
654        public override void RemoveProtocol(ProtocolBase protocol)
655        {
656            lock (this)
657            {
658                this.protocols.Remove(protocol);
659                if (this.protocols.Count == 0)
660                    this.Shutdown();
661            }
662        }
663
664        /// <summary>
665        /// Adds a protocol to the internal queue
666        /// </summary>
667        /// <param name="protocol"></param>
668        public override void AddProtocol(ProtocolBase protocol)
669        {
670            lock (this)
671            {
672                this.protocols.Add(protocol);
673            }
674        }
675
676        /// <summary>
677        /// Wakeup this scheduler
678        /// </summary>
679        /// <param name="protocol"></param>
680        public override void Wakeup(ProtocolBase protocol)
681        {
682            lock (this)
683            {
684                if (!this.waitingProtocols.Contains(protocol))
685                    this.waitingProtocols.Enqueue(protocol);
686                this.wakeup.Set();
687            }
688        }
689
690        /// <summary>
691        /// Terminates the scheduler
692        /// </summary>
693        public override void Shutdown()
694        {
695            this.shutdown = true;
696            this.wakeup.Set();
697        }
698    }
699}
Note: See TracBrowser for help on using the repository browser.