source: trunk/CrypPlugins/WorkspaceManager/Execution/ExecutionEngine.cs @ 1844

Last change on this file since 1844 was 1844, checked in by kopal, 11 years ago
  • changed deleting of input data from executionEngine to PropertyChangedOnPlugin event (for flow control)
File size: 25.0 KB
Line 
1/*                             
2   Copyright 2010 Nils Kopal, Viktor M.
3
4   Licensed under the Apache License, Version 2.0 (the "License");
5   you may not use this file except in compliance with the License.
6   You may obtain a copy of the License at
7
8       http://www.apache.org/licenses/LICENSE-2.0
9
10   Unless required by applicable law or agreed to in writing, software
11   distributed under the License is distributed on an "AS IS" BASIS,
12   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   See the License for the specific language governing permissions and
14   limitations under the License.
15*/
16
17using System;
18using System.Collections.Generic;
19using System.Linq;
20using System.Text;
21
22using WorkspaceManager.Model;
23using System.Threading;
24using System.Collections;
25using Cryptool.PluginBase;
26using System.Reflection;
27using Gears4Net;
28using System.Windows.Threading;
29using System.Runtime.Remoting.Contexts;
30
31namespace WorkspaceManager.Execution
32{
33    /// <summary>
34    /// Engine to execute a model of the WorkspaceManager
35    /// This class needs a WorkspaceManager to be instantiated
36    /// To run an execution process it also needs a WorkspaceModel
37    ///
38    /// This class uses Gears4Net to execute the plugins
39    /// </summary>
40    public class ExecutionEngine
41    {
42        private WorkspaceManager WorkspaceManagerEditor;
43        private Scheduler[] schedulers;
44        private WorkspaceModel workspaceModel;
45
46        public long ExecutedPluginsCounter { get; set; }
47        public bool BenchmarkPlugins { get; set; }
48        public long GuiUpdateInterval { get; set; }
49        public int SleepTime { get; set; }
50
51        /// <summary>
52        /// Creates a new ExecutionEngine
53        /// </summary>
54        /// <param name="workspaceManagerEditor"></param>
55        public ExecutionEngine(WorkspaceManager workspaceManagerEditor)
56        {
57            WorkspaceManagerEditor = workspaceManagerEditor;
58        }
59
60        /// <summary>
61        /// Is this ExecutionEngine running?
62        /// </summary>
63        public bool IsRunning
64        {
65            get;
66            private set;
67        }
68
69        /// <summary>
70        /// Execute the given Model
71        /// </summary>
72        /// <param name="workspaceModel"></param>
73        public void Execute(WorkspaceModel workspaceModel)
74        {
75            if (!IsRunning)
76            {
77                IsRunning = true;
78                this.workspaceModel = workspaceModel;
79                int amountSchedulers = System.Environment.ProcessorCount * 2;
80
81                //Here we create n = "ProcessorsCount * 2" Gears4Net schedulers
82                //We do this, because measurements showed that we get the best performance if we
83                //use this amount of schedulers
84                schedulers = new Scheduler[amountSchedulers];
85                for (int i = 0; i < amountSchedulers; i++)
86                {
87                    schedulers[i] = new WorkspaceManagerScheduler("Scheduler" + i);                   
88                }
89               
90                //We have to reset all states of PluginModels, ConnectorModels and ConnectionModels:
91                workspaceModel.resetStates();
92
93                //The UpdateGuiProtocol is a kind of "daemon" which will update the view elements if necessary
94                UpdateGuiProtocol updateGuiProtocol = new UpdateGuiProtocol(schedulers[0], workspaceModel, this);
95                schedulers[0].AddProtocol(updateGuiProtocol);
96                updateGuiProtocol.Start();
97
98                //The BenchmarkProtocl counts the amount of executed plugins per seconds and writes this to debug
99                if (this.BenchmarkPlugins)
100                {
101                    BenchmarkProtocol benchmarkProtocol = new BenchmarkProtocol(schedulers[0], this.workspaceModel, this);
102                    schedulers[0].AddProtocol(benchmarkProtocol);
103                    benchmarkProtocol.Start();
104                }
105
106                //Here we create for each PluginModel an own PluginProtocol
107                //By using round-robin we give each protocol to another scheduler to gain
108                //a good average load balancing of the schedulers
109                //we also initalize each plugin
110                //It is possible that a plugin is also a PluginProtocol
111                //if that is true we do not create a new one but use the plugin instead the created one
112                int counter=0;
113                foreach (PluginModel pluginModel in workspaceModel.AllPluginModels)
114                {
115                    PluginProtocol pluginProtocol = null;
116
117                    if (pluginModel.Plugin is PluginProtocol)
118                    {
119                        pluginProtocol = (PluginProtocol)pluginModel.Plugin;
120                        pluginProtocol.setExecutionEngineSettings(schedulers[counter], pluginModel, this);
121                    }
122                    else
123                    {
124                        pluginProtocol = new PluginProtocol(schedulers[counter], pluginModel, this);
125                    }
126
127                    pluginModel.Plugin.PreExecution();                   
128                    pluginModel.PluginProtocol = pluginProtocol;
129                    schedulers[counter].AddProtocol(pluginProtocol);
130
131                    if (pluginProtocol.Status == ProtocolStatus.Created || pluginProtocol.Status == ProtocolStatus.Terminated)
132                    {
133                        pluginProtocol.Start();
134                    }
135
136                    counter = (counter + 1) % (amountSchedulers);
137
138                    if (pluginModel.Startable)
139                    {
140                        MessageExecution msg = new MessageExecution();
141                        msg.PluginModel = pluginModel;
142                        pluginProtocol.BroadcastMessageReliably(msg);
143                    }
144                }
145
146                foreach (Scheduler scheduler in schedulers)
147                {
148                    ((WorkspaceManagerScheduler)scheduler).startScheduler();
149                }
150            }
151        }     
152     
153        /// <summary>
154        /// Stop the execution process:
155        /// calls shutdown on all schedulers + calls stop() on each plugin
156        /// </summary>
157        public void Stop()
158        {
159            //First stop alle plugins
160            foreach (PluginModel pluginModel in workspaceModel.AllPluginModels)
161            {
162                pluginModel.Plugin.Stop();
163                pluginModel.Plugin.PostExecution();
164            }           
165
166            IsRunning = false;
167            //Secondly stop all Gears4Net Schedulers
168            foreach (Scheduler scheduler in schedulers)
169            {
170                scheduler.Shutdown();
171            }
172             
173        }
174
175        /// <summary>
176        /// Pause the execution
177        /// </summary>
178        public void Pause()
179        {
180            //not implemented yet
181        }
182
183        /// <summary>
184        /// Use the logger of the WorkspaceManagerEditor
185        /// </summary>
186        /// <param name="message"></param>
187        /// <param name="level"></param>
188        public void GuiLogMessage(string message, NotificationLevel level)
189        {           
190            WorkspaceManagerEditor.GuiLogMessage(message, level);
191        }           
192    }
193 
194    /// <summary>
195    /// Message send to scheduler for a Plugin to trigger the Execution
196    /// </summary>
197    public class MessageExecution : MessageBase
198    {
199        public PluginModel PluginModel;
200    }
201
202    /// <summary>
203    /// A Protocol for updating the GUI in time intervals
204    /// </summary>
205    public class UpdateGuiProtocol : ProtocolBase
206    {
207        private WorkspaceModel workspaceModel;
208        private ExecutionEngine executionEngine;     
209
210        /// <summary>
211        /// Create a new protocol. Each protocol requires a scheduler which provides
212        /// a thread for execution.
213        /// </summary>
214        /// <param name="scheduler"></param>
215        public UpdateGuiProtocol(Scheduler scheduler, WorkspaceModel workspaceModel, ExecutionEngine executionEngine)
216            : base(scheduler)
217        {
218            this.workspaceModel = workspaceModel;
219            this.executionEngine = executionEngine;           
220        }
221
222        /// <summary>
223        /// The main function of the protocol
224        /// </summary>
225        /// <param name="stateMachine"></param>
226        /// <returns></returns>
227        public override System.Collections.Generic.IEnumerator<ReceiverBase> Execute(AbstractStateMachine stateMachine)
228        {
229            while (this.executionEngine.IsRunning)
230            {
231                yield return Timeout(this.executionEngine.GuiUpdateInterval, HandleUpdateGui);
232            }
233        }
234
235        /// <summary>
236        /// Handler function for a message.
237        /// This handler must not block, because it executes inside the thread of the scheduler.
238        /// </summary>
239        /// <param name="msg"></param>
240        private void HandleUpdateGui()
241        {
242            //Get the gui Thread
243            this.workspaceModel.WorkspaceManagerEditor.Presentation.Dispatcher.Invoke(DispatcherPriority.Normal, (SendOrPostCallback)delegate
244            {
245                foreach (PluginModel pluginModel in workspaceModel.AllPluginModels)
246                {
247                    if (pluginModel.GuiNeedsUpdate)
248                    {
249                        pluginModel.GuiNeedsUpdate = false;
250                        pluginModel.paint();
251                        if (pluginModel.UpdateableView != null)
252                        {
253                            pluginModel.UpdateableView.update();
254                        }
255                    }
256                }
257                foreach (ConnectionModel connectionModel in workspaceModel.AllConnectionModels)
258                {
259                    if (connectionModel.GuiNeedsUpdate)
260                    {
261                        if (connectionModel.UpdateableView != null)
262                        {
263                            connectionModel.UpdateableView.update();
264                        }
265                    }
266                }
267            }
268            , null);
269        }
270    }
271   
272    /// <summary>
273    /// A Protocol for benchmarking
274    /// </summary>
275    public class BenchmarkProtocol : ProtocolBase
276    {
277        private WorkspaceModel workspaceModel;
278        private ExecutionEngine executionEngine;
279
280        /// <summary>
281        /// Create a new protocol. Each protocol requires a scheduler which provides
282        /// a thread for execution.
283        /// </summary>
284        /// <param name="scheduler"></param>
285        public BenchmarkProtocol(Scheduler scheduler, WorkspaceModel workspaceModel, ExecutionEngine executionEngine)
286            : base(scheduler)
287        {
288            this.workspaceModel = workspaceModel;
289            this.executionEngine = executionEngine;
290        }
291
292        /// <summary>
293        /// The main function of the protocol
294        /// </summary>
295        /// <param name="stateMachine"></param>
296        /// <returns></returns>
297        public override System.Collections.Generic.IEnumerator<ReceiverBase> Execute(AbstractStateMachine stateMachine)
298        {
299            while (this.executionEngine.IsRunning)
300            {
301                yield return Timeout(1000, HandleBenchmark);
302            }
303        }
304
305        /// <summary>
306        /// Handler function for a message.
307        /// This handler must not block, because it executes inside the thread of the scheduler.
308        /// </summary>
309        /// <param name="msg"></param>
310        private void HandleBenchmark()
311        {
312            this.workspaceModel.WorkspaceManagerEditor.GuiLogMessage("Executing at about " + this.executionEngine.ExecutedPluginsCounter + " Plugins/s", NotificationLevel.Debug);
313            this.executionEngine.ExecutedPluginsCounter = 0;
314        }
315
316    }
317
318    /// <summary>
319    /// A Protocol for a PluginModel
320    /// </summary>
321    public class PluginProtocol : ProtocolBase
322    {       
323        public PluginModel PluginModel;
324        private ExecutionEngine executionEngine;
325
326        /// <summary>
327        /// Create a new protocol. Each protocol requires a scheduler which provides
328        /// a thread for execution.
329        /// </summary>
330        /// <param name="scheduler"></param>
331        public PluginProtocol(Scheduler scheduler, PluginModel pluginModel,ExecutionEngine executionEngine)
332            : base(scheduler)
333        {
334            this.PluginModel = pluginModel;
335            this.executionEngine = executionEngine;
336        }
337
338        /// <summary>
339        ///
340        /// </summary>
341        /// <param name="scheduler"></param>
342        /// <param name="pluginModel"></param>
343        /// <param name="executionEngine"></param>
344        public void setExecutionEngineSettings(Scheduler scheduler, PluginModel pluginModel, ExecutionEngine executionEngine)
345        {
346            this.Scheduler = scheduler;
347            this.PluginModel = pluginModel;
348            this.executionEngine = executionEngine;
349        }
350
351
352        /// <summary>
353        /// The main function of the protocol     
354        /// </summary>
355        /// <param name="stateMachine"></param>
356        /// <returns></returns>
357        public override System.Collections.Generic.IEnumerator<ReceiverBase> Execute(AbstractStateMachine stateMachine)
358        {
359            while (this.executionEngine.IsRunning)
360            {
361                yield return Receive<MessageExecution>(null, HandleExecute);
362            }
363        }
364
365        /// <summary>
366        /// Handle an execution of a plugin
367        /// </summary>
368        /// <param name="msg"></param>
369        private void HandleExecute(MessageExecution msg)
370        {
371            // 1. Check if Plugin may Execute
372            if (!mayExecute(PluginModel))
373            {
374                return;
375            }
376
377            //2. Fill all Inputs of the plugin, if this fails, stop executing the plugin
378            if (!fillInputs())
379            {
380                return;
381            }
382
383            //3. Execute the Plugin -> call the IPlugin.Execute()
384            try
385            {
386                PluginModel.Plugin.Execute();
387            }
388            catch (Exception ex)
389            {
390                this.PluginModel.WorkspaceModel.WorkspaceManagerEditor.GuiLogMessage("An error occured while executing  \"" + PluginModel.Name + "\": " + ex.Message, NotificationLevel.Error);
391                this.PluginModel.State = PluginModelState.Error;
392                this.PluginModel.GuiNeedsUpdate = true;
393                return;
394            }
395
396            //4. Count for the benchmark
397            if (this.executionEngine.BenchmarkPlugins)
398            {
399                this.executionEngine.ExecutedPluginsCounter++;
400            }
401
402            //5. If the user wants to, sleep some time
403            if (this.executionEngine.SleepTime > 0)
404            {
405                Thread.Sleep(this.executionEngine.SleepTime);
406            }
407
408            //6. Clear all used inputs
409            //clearInputs();
410
411            //7. Send execute messages to possible executable next plugins
412            //runNextPlugins();
413        }
414
415        /// <summary>
416        /// Send execute messages to possible executable next plugins
417        /// </summary>
418        public void runNextPlugins()
419        {           
420            foreach (ConnectorModel connectorModel in PluginModel.InputConnectors)
421            {
422                foreach (ConnectionModel connectionModel in connectorModel.InputConnections)
423                {
424                    if (!connectionModel.From.PluginModel.Startable ||
425                        (connectionModel.From.PluginModel.Startable && connectionModel.From.PluginModel.RepeatStart))
426                    {
427                        if (mayExecute(connectionModel.From.PluginModel))
428                        {
429                            MessageExecution message_exec = new MessageExecution();
430                            message_exec.PluginModel = connectionModel.From.PluginModel;
431                            connectionModel.From.PluginModel.PluginProtocol.BroadcastMessageReliably(message_exec);
432                        }
433                    }
434                }
435            }
436        }
437     
438        /// <summary>
439        /// Delete all input data of inputs of the plugin
440        /// </summary>
441        public void clearInputs()
442        {
443            foreach (ConnectorModel connectorModel in PluginModel.InputConnectors)
444            {
445                if (connectorModel.HasData)
446                {
447                    connectorModel.Data = null;
448                    connectorModel.HasData = false;
449                    connectorModel.GuiNeedsUpdate = true;
450
451                    foreach (ConnectionModel connectionModel in connectorModel.InputConnections)
452                    {
453                        connectionModel.Active = false;
454                        connectorModel.GuiNeedsUpdate = true;
455                    }
456                }
457            }
458        }
459
460        /// <summary>
461        /// Fill all inputs of the plugin
462        /// </summary>
463        /// <returns></returns>
464        public bool fillInputs()
465        {
466            //Fill the plugins inputs with data
467            foreach (ConnectorModel connectorModel in PluginModel.InputConnectors)
468            {
469                try
470                {
471                    if (connectorModel.HasData && connectorModel.Data.value != null)
472                    {
473                        if (connectorModel.IsDynamic)
474                        {
475                            MethodInfo propertyInfo = PluginModel.Plugin.GetType().GetMethod(connectorModel.DynamicSetterName);
476                            propertyInfo.Invoke(PluginModel.Plugin, new object[] { connectorModel.PropertyName, connectorModel.Data.value });
477                        }
478                        else
479                        {
480                            PropertyInfo propertyInfo = PluginModel.Plugin.GetType().GetProperty(connectorModel.PropertyName);
481                            propertyInfo.SetValue(PluginModel.Plugin, connectorModel.Data.value, null);
482                        }
483                    }
484                }
485                catch (Exception ex)
486                {
487                    this.PluginModel.WorkspaceModel.WorkspaceManagerEditor.GuiLogMessage("An error occured while setting value of connector \"" + connectorModel.Name + "\" of \"" + PluginModel.Name + "\": " + ex.Message, NotificationLevel.Error);
488                    this.PluginModel.State = PluginModelState.Error;
489                    this.PluginModel.GuiNeedsUpdate = true;
490                    return false;
491                }
492            }
493            return true;
494        }
495
496        /// <summary>
497        ///
498        /// </summary>
499        /// <returns></returns>
500        public bool mayExecute()
501        {
502            return mayExecute(this.PluginModel);
503        }
504
505        /// <summary>
506        /// Check if the PluginModel may execute
507        /// </summary>
508        /// <param name="pluginModel"></param>
509        /// <returns></returns>
510        private bool mayExecute(PluginModel pluginModel)
511        {
512            if (!pluginModel.WorkspaceModel.WorkspaceManagerEditor.isExecuting())
513            {
514                return false;
515            }
516
517            //Check if all necessary inputs are set
518            foreach (ConnectorModel connectorModel in pluginModel.InputConnectors)
519            {
520                if (!connectorModel.IControl &&
521                    (connectorModel.IsMandatory || connectorModel.InputConnections.Count > 0) && (!connectorModel.HasData ||
522                    connectorModel.Data == null))
523                {
524                    return false;
525                }
526            }
527
528            //Check if all outputs are free
529            foreach (ConnectorModel connectorModel in pluginModel.OutputConnectors)
530            {
531                if (!connectorModel.IControl)
532                {
533                    foreach (ConnectionModel connectionModel in connectorModel.OutputConnections)
534                    {
535                        if (connectionModel.To.HasData)
536                        {
537                            return false;
538                        }
539                    }
540                }
541            }
542            return true;
543        }
544
545        /// <summary>
546        ///
547        /// </summary>
548        public ExecutionEngine ExecutionEngine
549        {
550            get { return this.executionEngine; }           
551        }
552    }
553
554    /// <summary>
555    /// Gears4Net Scheduler
556    /// </summary>
557    public class WorkspaceManagerScheduler : Scheduler
558    {
559        private System.Threading.AutoResetEvent wakeup = new System.Threading.AutoResetEvent(false);
560        private bool shutdown = false;
561        private System.Threading.Thread thread;
562        private Context currentContext;
563
564                public WorkspaceManagerScheduler() : this(String.Empty)
565                {
566
567                }
568
569        public WorkspaceManagerScheduler(string name)
570            : base()
571        {
572            this.currentContext = Thread.CurrentContext;
573
574            thread = new System.Threading.Thread(this.Start);
575            thread.SetApartmentState(System.Threading.ApartmentState.MTA);
576                        thread.Name = name;
577           
578        }
579
580        public void startScheduler()
581        {
582            thread.Start();
583        }
584
585        private void Start()
586        {
587            if (this.currentContext != Thread.CurrentContext)
588                this.currentContext.DoCallBack(Start);
589
590            // Loop forever
591            while (true)
592            {
593                this.wakeup.WaitOne();
594
595                // Loop while there are more protocols waiting
596                while (true)
597                {
598                    // Should the scheduler stop?
599                    if (this.shutdown)
600                        return;
601                   
602                    ProtocolBase protocol = null;
603                    lock (this)
604                    {
605                        // No more protocols? -> Wait
606                        if (this.waitingProtocols.Count == 0)
607                            break;
608                    }
609
610                    try
611                    {
612                        protocol = this.waitingProtocols.Dequeue();
613                        ProtocolStatus status = protocol.Run();
614
615                        lock (this)
616                        {
617                            switch (status)
618                            {
619                                case ProtocolStatus.Created:
620                                    System.Diagnostics.Debug.Assert(false);
621                                    break;
622                                case ProtocolStatus.Ready:
623                                    this.waitingProtocols.Enqueue(protocol);
624                                    break;
625                                case ProtocolStatus.Waiting:
626                                    break;
627                                case ProtocolStatus.Terminated:
628                                    System.Diagnostics.Debug.Assert(!this.waitingProtocols.Contains(protocol));
629                                    this.RemoveProtocol(protocol);
630                                    break;
631                            }
632                        }
633                    }
634                    catch (Exception ex) 
635                    {
636                        System.Diagnostics.Debug.Fail("Error during scheduling: " + ex.Message + " - " + ex.InnerException);
637                    }
638                }
639            }
640        }
641
642        /// <summary>
643        /// Removes a protocol from the internal queue
644        /// </summary>
645        /// <param name="protocol"></param>
646        public override void RemoveProtocol(ProtocolBase protocol)
647        {
648            lock (this)
649            {
650                this.protocols.Remove(protocol);
651                if (this.protocols.Count == 0)
652                    this.Shutdown();
653            }
654        }
655
656        /// <summary>
657        /// Adds a protocol to the internal queue
658        /// </summary>
659        /// <param name="protocol"></param>
660        public override void AddProtocol(ProtocolBase protocol)
661        {
662            lock (this)
663            {
664                this.protocols.Add(protocol);
665            }
666        }
667
668        /// <summary>
669        /// Wakeup this scheduler
670        /// </summary>
671        /// <param name="protocol"></param>
672        public override void Wakeup(ProtocolBase protocol)
673        {
674            lock (this)
675            {
676                if (!this.waitingProtocols.Contains(protocol))
677                    this.waitingProtocols.Enqueue(protocol);
678                this.wakeup.Set();
679            }
680        }
681
682        /// <summary>
683        /// Terminates the scheduler
684        /// </summary>
685        public override void Shutdown()
686        {
687            this.shutdown = true;
688            this.wakeup.Set();
689        }
690    }
691}
Note: See TracBrowser for help on using the repository browser.