source: trunk/CrypPlugins/WorkspaceManager/Execution/ExecutionEngine.cs @ 2142

Last change on this file since 2142 was 2142, checked in by nolte, 11 years ago

Full working Versionnumber code added.
INFO: THE UNCOMMENTED UPDATE WILL BE COMITTED THURSDAY 8PM (20Uhr :P )
PLEASE UPDATE YOUR SVN AFTERWARDS!

File size: 25.8 KB
Line 
1/*                             
2   Copyright 2010 Nils Kopal, Viktor M.
3
4   Licensed under the Apache License, Version 2.0 (the "License");
5   you may not use this file except in compliance with the License.
6   You may obtain a copy of the License at
7
8       http://www.apache.org/licenses/LICENSE-2.0
9
10   Unless required by applicable law or agreed to in writing, software
11   distributed under the License is distributed on an "AS IS" BASIS,
12   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   See the License for the specific language governing permissions and
14   limitations under the License.
15*/
16
17using System;
18using System.Collections.Generic;
19using System.Linq;
20using System.Text;
21
22using WorkspaceManager.Model;
23using System.Threading;
24using System.Collections;
25using Cryptool.PluginBase;
26using System.Reflection;
27using Gears4Net;
28using System.Windows.Threading;
29using System.Runtime.Remoting.Contexts;
30using System.IO;
31using System.Diagnostics;
32
33namespace WorkspaceManager.Execution
34{
35    /// <summary>
36    /// Engine to execute a model of the WorkspaceManager
37    /// This class needs a WorkspaceManager to be instantiated
38    /// To run an execution process it also needs a WorkspaceModel
39    ///
40    /// This class uses Gears4Net to execute the plugins
41    /// </summary>
42    public class ExecutionEngine
43    {
44        private WorkspaceManager WorkspaceManagerEditor;
45        private Scheduler scheduler;
46        private WorkspaceModel workspaceModel;
47        private volatile bool isRunning = false;
48        private BenchmarkProtocol BenchmarkProtocol = null;
49
50        public volatile int ExecutedPluginsCounter = 0;
51        public bool BenchmarkPlugins = false;
52        public int GuiUpdateInterval = 0;
53        public int SleepTime = 0;
54        public int ThreadPriority = 0;
55
56        /// <summary>
57        /// Creates a new ExecutionEngine
58        /// </summary>
59        /// <param name="workspaceManagerEditor"></param>
60        public ExecutionEngine(WorkspaceManager workspaceManagerEditor)
61        {
62            WorkspaceManagerEditor = workspaceManagerEditor;
63        }
64
65        /// <summary>
66        /// Is this ExecutionEngine running?
67        /// </summary>
68        public bool IsRunning
69        {
70            get{return this.isRunning;}
71            private set{this.isRunning = value;}
72        }
73
74        /// <summary>
75        /// Execute the given Model
76        /// </summary>
77        /// <param name="workspaceModel"></param>
78        public void Execute(WorkspaceModel workspaceModel, int amountThreads)
79        {
80            if (!IsRunning)
81            {
82                IsRunning = true;
83                this.workspaceModel = workspaceModel;
84
85                if (amountThreads <= 0)
86                {
87                    amountThreads = 1;
88                }
89
90                scheduler = new WorkspaceManagerScheduler("WorkspaceManagerScheduler", amountThreads,this);
91               
92                //We have to reset all states of PluginModels, ConnectorModels and ConnectionModels:
93                workspaceModel.resetStates();
94
95                //The UpdateGuiProtocol is a kind of "daemon" which will update the view elements if necessary
96                UpdateGuiProtocol updateGuiProtocol = new UpdateGuiProtocol(scheduler, workspaceModel, this);
97                scheduler.AddProtocol(updateGuiProtocol);
98                updateGuiProtocol.Start();
99
100                //The BenchmarkProtocl counts the amount of executed plugins per seconds and writes this to debug
101                if (this.BenchmarkPlugins)
102                {
103                    BenchmarkProtocol = new BenchmarkProtocol(scheduler, this.workspaceModel, this);
104                    scheduler.AddProtocol(BenchmarkProtocol);
105                    BenchmarkProtocol.Start();
106                }
107
108                //Here we create for each PluginModel an own PluginProtocol
109                //By using round-robin we give each protocol to another scheduler to gain
110                //a good average load balancing of the schedulers
111                //we also initalize each plugin
112                //It is possible that a plugin is also a PluginProtocol
113                //if that is true we do not create a new one but use the plugin instead the created one               
114                foreach (PluginModel pluginModel in workspaceModel.AllPluginModels)
115                {
116                    PluginProtocol pluginProtocol = new PluginProtocol(scheduler, pluginModel, this);
117                    MessageExecution message = new MessageExecution();
118                    message.PluginModel = pluginModel;
119                    pluginModel.MessageExecution = message;
120
121                    pluginModel.Plugin.PreExecution();                   
122                    pluginModel.PluginProtocol = pluginProtocol;
123                    scheduler.AddProtocol(pluginProtocol);
124
125                    if (pluginProtocol.Status == ProtocolStatus.Created || pluginProtocol.Status == ProtocolStatus.Terminated)
126                    {
127                        pluginProtocol.Start();
128                    }
129                 
130                    if (pluginModel.Startable)
131                    {
132                        pluginProtocol.BroadcastMessage(pluginModel.MessageExecution);
133                    }
134                }
135
136                ((WorkspaceManagerScheduler)scheduler).startScheduling();
137               
138            }
139        }     
140     
141        /// <summary>
142        /// Stop the execution process:
143        /// calls shutdown on all schedulers + calls stop() on each plugin
144        /// </summary>
145        public void Stop()
146        {
147           
148            //First stop alle plugins
149            foreach (PluginModel pluginModel in workspaceModel.AllPluginModels)
150            {
151                pluginModel.Plugin.Stop();                   
152            }
153
154            IsRunning = false;
155            //Secondly stop all Gears4Net Schedulers
156            scheduler.Shutdown();
157
158            //call all PostExecution methods of all plugins
159            foreach (PluginModel pluginModel in workspaceModel.AllPluginModels)
160            {
161                pluginModel.Plugin.PostExecution();
162            }
163
164            //remove the plugin protocol of each plugin model
165            foreach (PluginModel pluginModel in workspaceModel.AllPluginModels)
166            {
167                pluginModel.PluginProtocol = null;
168            }
169
170            this.WorkspaceManagerEditor = null;
171            this.workspaceModel = null;
172         
173        }
174
175        /// <summary>
176        /// Pause the execution
177        /// </summary>
178        public void Pause()
179        {
180            //not implemented yet
181        }
182
183        /// <summary>
184        /// Use the logger of the WorkspaceManagerEditor
185        /// </summary>
186        /// <param name="message"></param>
187        /// <param name="level"></param>
188        public void GuiLogMessage(string message, NotificationLevel level)
189        {
190            if (WorkspaceManagerEditor != null)
191            {
192                WorkspaceManagerEditor.GuiLogMessage(message, level);
193            }
194        }
195    }
196 
197    /// <summary>
198    /// Message send to scheduler for a Plugin to trigger the Execution
199    /// </summary>
200    public class MessageExecution : MessageBase
201    {
202        public PluginModel PluginModel;
203    }
204
205    /// <summary>
206    /// A Protocol for updating the GUI in time intervals
207    /// </summary>
208    public class UpdateGuiProtocol : ProtocolBase
209    {
210        private WorkspaceModel workspaceModel;
211        private ExecutionEngine executionEngine;
212
213        /// <summary>
214        /// Create a new protocol. Each protocol requires a scheduler which provides
215        /// a thread for execution.
216        /// </summary>
217        /// <param name="scheduler"></param>
218        public UpdateGuiProtocol(Scheduler scheduler, WorkspaceModel workspaceModel, ExecutionEngine executionEngine)
219            : base(scheduler)
220        {
221            this.workspaceModel = workspaceModel;
222            this.executionEngine = executionEngine;           
223        }
224       
225        /// <summary>
226        /// The main function of the protocol
227        /// </summary>
228        /// <param name="stateMachine"></param>
229        /// <returns></returns>
230        public override System.Collections.Generic.IEnumerator<ReceiverBase> Execute(AbstractStateMachine stateMachine)
231        {           
232            yield return new IntervalReceiver(this.executionEngine.GuiUpdateInterval,this.executionEngine.GuiUpdateInterval, HandleUpdateGui);
233        }
234
235        /// <summary>
236        /// Handler function for a message.
237        /// This handler must not block, because it executes inside the thread of the scheduler.
238        /// </summary>
239        /// <param name="msg"></param>
240        private void HandleUpdateGui()
241        {
242            //Get the gui Thread
243            this.workspaceModel.WorkspaceManagerEditor.Presentation.Dispatcher.Invoke(DispatcherPriority.Normal, (SendOrPostCallback)delegate
244            {
245                List<PluginModel> pluginModels = workspaceModel.AllPluginModels;
246                foreach (PluginModel pluginModel in pluginModels)
247                {
248                    if (pluginModel.GuiNeedsUpdate)
249                    {
250                        pluginModel.GuiNeedsUpdate = false;
251                        pluginModel.paint();
252                        if (pluginModel.UpdateableView != null)
253                        {
254                            pluginModel.UpdateableView.update();
255                        }
256                    }
257                }
258                List<ConnectionModel> connectionModels = workspaceModel.AllConnectionModels;
259                foreach (ConnectionModel connectionModel in connectionModels)
260                {
261                    if (connectionModel.GuiNeedsUpdate)
262                    {
263                        if (connectionModel.UpdateableView != null)
264                        {
265                            connectionModel.UpdateableView.update();
266                        }
267                    }
268                }
269            }
270            , null);
271
272        }
273    }
274   
275    /// <summary>
276    /// A Protocol for benchmarking
277    /// </summary>
278    public class BenchmarkProtocol : ProtocolBase
279    {
280        private WorkspaceModel workspaceModel;
281        private ExecutionEngine executionEngine;
282     
283        /// <summary>
284        /// Create a new protocol. Each protocol requires a scheduler which provides
285        /// a thread for execution.
286        /// </summary>
287        /// <param name="scheduler"></param>
288        public BenchmarkProtocol(Scheduler scheduler, WorkspaceModel workspaceModel, ExecutionEngine executionEngine)
289            : base(scheduler)
290        {
291            this.workspaceModel = workspaceModel;
292            this.executionEngine = executionEngine;         
293        }
294
295        /// <summary>
296        /// The main function of the protocol
297        /// </summary>
298        /// <param name="stateMachine"></param>
299        /// <returns></returns>
300        public override System.Collections.Generic.IEnumerator<ReceiverBase> Execute(AbstractStateMachine stateMachine)
301        {
302            yield return new IntervalReceiver(1000,1000, HandleBenchmark);           
303        }
304
305        /// <summary>
306        /// Handler function for a message.
307        /// This handler must not block, because it executes inside the thread of the scheduler.
308        /// </summary>
309        /// <param name="msg"></param>
310        private void HandleBenchmark()
311        {
312            StringBuilder sb = new StringBuilder();
313            sb.Append("Executing at about ");
314            sb.Append(this.executionEngine.ExecutedPluginsCounter); 
315            sb.Append(" Plugins/s");
316
317            this.workspaceModel.WorkspaceManagerEditor.GuiLogMessage(sb.ToString(), NotificationLevel.Debug);         
318            this.executionEngine.ExecutedPluginsCounter = 0;
319        }       
320    }
321
322    /// <summary>
323    /// A Protocol for a PluginModel
324    /// </summary>
325    public class PluginProtocol : ProtocolBase
326    {       
327        public PluginModel PluginModel;
328        private ExecutionEngine executionEngine;
329
330        /// <summary>
331        /// Create a new protocol. Each protocol requires a scheduler which provides
332        /// a thread for execution.
333        /// </summary>
334        /// <param name="scheduler"></param>
335        public PluginProtocol(Scheduler scheduler, PluginModel pluginModel,ExecutionEngine executionEngine)
336            : base(scheduler)
337        {
338            this.PluginModel = pluginModel;
339            this.executionEngine = executionEngine;
340        }       
341
342        /// <summary>
343        ///
344        /// </summary>
345        /// <param name="scheduler"></param>
346        /// <param name="pluginModel"></param>
347        /// <param name="executionEngine"></param>
348        public void setExecutionEngineSettings(Scheduler scheduler, PluginModel pluginModel, ExecutionEngine executionEngine)
349        {
350            this.Scheduler = scheduler;
351            this.PluginModel = pluginModel;
352            this.executionEngine = executionEngine;
353        }
354
355
356        /// <summary>
357        /// The main function of the protocol     
358        /// </summary>
359        /// <param name="stateMachine"></param>
360        /// <returns></returns>
361        public override System.Collections.Generic.IEnumerator<ReceiverBase> Execute(AbstractStateMachine stateMachine)
362        {           
363            yield return new PersistentReceiver(Receive<MessageExecution>(this.Filter, HandleExecute));               
364        }
365
366        /// <summary>
367        /// Filter that checks wether the Plugin fits to the internal Plugin reference of this PluginProtocl
368        /// </summary>
369        /// <param name="msg"></param>
370        /// <returns></returns>
371        private bool Filter(MessageExecution msg)
372        {
373            if (msg.PluginModel != this.PluginModel)
374            {
375                return false;
376            }
377            return true;
378        }
379        /// <summary>
380        /// Handle an execution of a plugin
381        /// </summary>
382        /// <param name="msg"></param>
383        private void HandleExecute(MessageExecution msg)
384        {
385            // ################
386            // 1. Check if Plugin may Execute
387            // ################
388
389            if (!msg.PluginModel.WorkspaceModel.WorkspaceManagerEditor.isExecuting())
390            {
391                return;
392            }
393
394            //Check if all necessary inputs are set
395            List<ConnectorModel> inputConnectors = msg.PluginModel.InputConnectors;
396            foreach (ConnectorModel connectorModel in inputConnectors)
397            {
398                if (!connectorModel.IControl &&
399                    (connectorModel.IsMandatory || connectorModel.InputConnections.Count > 0) && !connectorModel.HasData)
400                {
401                    return;
402                }
403            }
404
405            //Check if all outputs are free
406            List<ConnectorModel> outputConnectors = msg.PluginModel.OutputConnectors;
407            foreach (ConnectorModel connectorModel in outputConnectors)
408            {
409                if (!connectorModel.IControl)
410                {
411                    List<ConnectionModel> outputConnections = connectorModel.OutputConnections;
412                    foreach (ConnectionModel connectionModel in outputConnections)
413                    {
414                        if (connectionModel.To.HasData)
415                        {
416                            return;
417                        }
418                    }
419                }
420            }
421
422            // ################
423            //2. Fill all Inputs of the plugin, if this fails, stop executing the plugin
424            // ################
425
426            //Fill the plugins inputs with data
427            foreach (ConnectorModel connectorModel in inputConnectors)
428            {
429                try
430                {
431                    if (connectorModel.HasData && connectorModel.Data != null)
432                    {
433                        if (connectorModel.IsDynamic)
434                        {
435                       
436                            if(connectorModel.method == null)
437                            {
438                                connectorModel.method = PluginModel.Plugin.GetType().GetMethod(connectorModel.DynamicSetterName);
439                            }
440                            connectorModel.method.Invoke(PluginModel.Plugin, new object[] { connectorModel.PropertyName, connectorModel.Data });
441                        }
442                        else
443                        {
444                            if (connectorModel.property == null)
445                            {
446                                connectorModel.property = PluginModel.Plugin.GetType().GetProperty(connectorModel.PropertyName);
447                            }
448                            connectorModel.property.SetValue(PluginModel.Plugin, connectorModel.Data, null);
449                        }
450                    }
451                }
452                catch (Exception ex)
453                {
454                    this.PluginModel.WorkspaceModel.WorkspaceManagerEditor.GuiLogMessage("An error occured while setting value of connector \"" + connectorModel.Name + "\" of \"" + PluginModel.Name + "\": " + ex.Message, NotificationLevel.Error);
455                    this.PluginModel.State = PluginModelState.Error;
456                    this.PluginModel.GuiNeedsUpdate = true;
457                    return;
458                }
459            }
460
461            // ################
462            //3. Execute the Plugin -> call the IPlugin.Execute()
463            // ################
464
465            try
466            {
467                PluginModel.Plugin.Execute();
468            }
469            catch (Exception ex)
470            {
471                if (ex.Message.Equals("Exception of type 'KeySearcher.P2P.Exceptions.KeySearcherStopException' was thrown."))
472                {
473                    //KeysearcherStopExceptions need to shutdown the sheduler. Otherwise the process would start over and over again.
474                    this.Stop();
475                    return;
476                }
477                else
478                {
479                    this.PluginModel.WorkspaceModel.WorkspaceManagerEditor.GuiLogMessage("An error occured while executing  \"" + PluginModel.Name + "\": " + ex.Message, NotificationLevel.Error);
480                    this.PluginModel.State = PluginModelState.Error;
481                    this.PluginModel.GuiNeedsUpdate = true;
482                    return;
483                }
484            }
485
486            // ################
487            //4. Count for the benchmark
488            // ################
489
490            if (this.executionEngine.BenchmarkPlugins)
491            {
492                this.executionEngine.ExecutedPluginsCounter++;               
493            }
494
495            // ################
496            //5. If the user wants to, sleep some time
497            // ################
498
499            if (this.executionEngine.SleepTime > 0)
500            {
501                Thread.Sleep(this.executionEngine.SleepTime);
502            }
503        }       
504
505        /// <summary>
506        ///
507        /// </summary>
508        public ExecutionEngine ExecutionEngine
509        {
510            get { return this.executionEngine; }           
511        }
512    }
513
514    /// <summary>
515    /// Gears4Net Scheduler
516    /// </summary>
517    public class WorkspaceManagerScheduler : Scheduler
518    {
519        private System.Threading.AutoResetEvent wakeup = new System.Threading.AutoResetEvent(false);
520        private bool shutdown = false;
521        private Thread[] threads;
522        private volatile int runningThreads = 0;
523
524        public ExecutionEngine executionEngine = null;         
525
526        public WorkspaceManagerScheduler(string name, int amountThreads, ExecutionEngine executionEngine)
527            : base()
528        {
529            threads = new Thread[amountThreads];
530            this.executionEngine = executionEngine;
531
532            for (int i = 0; i < threads.Length;i++ )
533            {
534                threads[i] = new Thread(this.DoScheduling);
535                threads[i].SetApartmentState(ApartmentState.MTA);
536                threads[i].Name = name + "-Thread-" + i;
537
538                switch (this.executionEngine.ThreadPriority)
539                {
540                    case 0:
541                        threads[i].Priority = ThreadPriority.AboveNormal;
542                        break;
543                    case 1:
544                        threads[i].Priority = ThreadPriority.BelowNormal;
545                        break;
546                    case 2:
547                        threads[i].Priority = ThreadPriority.Highest;
548                        break;
549                    case 3:
550                        threads[i].Priority = ThreadPriority.Lowest;
551                        break;
552                    case 4:
553                        threads[i].Priority = ThreadPriority.Normal;
554                        break;
555                }
556            }
557           
558        }
559
560        public void startScheduling()
561        {
562            foreach (Thread thread in threads)
563            {
564                thread.Start();
565                lock (this)
566                {
567                    runningThreads++;
568                }
569            }
570        }
571       
572        private void DoScheduling()
573        {           
574
575            this.executionEngine.GuiLogMessage(Thread.CurrentThread.Name + " up and running", NotificationLevel.Debug);
576            Queue<ProtocolBase> waitingProtocols = this.waitingProtocols;
577           
578            // Loop forever
579            while (true)
580            {               
581                this.wakeup.WaitOne();
582
583                // Loop while there are more protocols waiting
584                while (true)
585                {
586                    // Should the scheduler stop?
587                    if (this.shutdown)
588                    {                       
589                        this.executionEngine.GuiLogMessage(Thread.CurrentThread.Name + " terminated", NotificationLevel.Debug);
590                        lock (this)
591                        {
592                            runningThreads--;
593                        }                                           
594                        return;
595                    }
596
597                    try
598                    {
599                        ProtocolBase protocol = null;
600                        lock (this)
601                        {
602                            if (waitingProtocols.Count == 0)
603                                break;
604                            protocol = waitingProtocols.Dequeue();
605                        }
606                       
607                        ProtocolStatus status = protocol.Run();
608
609                        lock (this)
610                        {
611                            switch (status)
612                            {
613                                case ProtocolStatus.Created:
614                                    System.Diagnostics.Debug.Assert(false);
615                                    break;
616                                case ProtocolStatus.Ready:
617                                    waitingProtocols.Enqueue(protocol);
618                                    break;
619                                case ProtocolStatus.Waiting:
620                                    break;
621                                case ProtocolStatus.Terminated:
622                                    System.Diagnostics.Debug.Assert(!this.waitingProtocols.Contains(protocol));
623                                    this.RemoveProtocol(protocol);
624                                    break;
625                            }
626                        }
627                    }
628                    catch (Exception ex)
629                    {
630                        this.executionEngine.GuiLogMessage("Error during scheduling: " + ex.Message + " - " + ex.InnerException,NotificationLevel.Error);
631                    }
632                }
633            }
634        }
635        /// <summary>
636        /// Removes a protocol from the internal queue
637        /// </summary>
638        /// <param name="protocol"></param>
639        public override void RemoveProtocol(ProtocolBase protocol)
640        {
641            lock (this)
642            {
643                this.protocols.Remove(protocol);
644                if (this.protocols.Count == 0)
645                    this.Shutdown();
646            }
647        }
648
649        /// <summary>
650        /// Adds a protocol to the internal queue
651        /// </summary>
652        /// <param name="protocol"></param>
653        public override void AddProtocol(ProtocolBase protocol)
654        {
655            lock (this)
656            {
657                this.protocols.Add(protocol);
658            }
659        }
660
661        /// <summary>
662        /// Wakeup this scheduler
663        /// </summary>
664        /// <param name="protocol"></param>
665        public override void Wakeup(ProtocolBase protocol)
666        {
667            lock (this)
668            {
669                if (!this.waitingProtocols.Contains(protocol))
670                    this.waitingProtocols.Enqueue(protocol);
671                this.wakeup.Set();
672            }
673        }
674
675        /// <summary>
676        /// Terminates the scheduler
677        /// </summary>
678        public override void Shutdown()
679        {
680            this.shutdown = true;
681            this.wakeup.Set();
682
683            this.executionEngine.GuiLogMessage("Waiting for all scheduler threads to stop", NotificationLevel.Debug);
684            while (runningThreads > 0)
685            {
686                Thread.Sleep(50);
687                this.wakeup.Set();
688            }
689            this.executionEngine.GuiLogMessage("All scheduler threads stopped", NotificationLevel.Debug);
690
691            this.waitingProtocols.Clear();
692            this.protocols.Clear();           
693        }
694    }
695}
Note: See TracBrowser for help on using the repository browser.