source: trunk/CrypPlugins/WorkspaceManager/Execution/ExecutionEngine.cs @ 1812

Last change on this file since 1812 was 1812, checked in by kopal, 11 years ago
  • added two experimental plugins with CStream support
File size: 24.6 KB
Line 
1/*                             
2   Copyright 2010 Nils Kopal, Viktor M.
3
4   Licensed under the Apache License, Version 2.0 (the "License");
5   you may not use this file except in compliance with the License.
6   You may obtain a copy of the License at
7
8       http://www.apache.org/licenses/LICENSE-2.0
9
10   Unless required by applicable law or agreed to in writing, software
11   distributed under the License is distributed on an "AS IS" BASIS,
12   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   See the License for the specific language governing permissions and
14   limitations under the License.
15*/
16
17using System;
18using System.Collections.Generic;
19using System.Linq;
20using System.Text;
21
22using WorkspaceManager.Model;
23using System.Threading;
24using System.Collections;
25using Cryptool.PluginBase;
26using System.Reflection;
27using Gears4Net;
28using System.Windows.Threading;
29using System.Runtime.Remoting.Contexts;
30
31namespace WorkspaceManager.Execution
32{
33    /// <summary>
34    /// Engine to execute a model of the WorkspaceManager
35    /// This class needs a WorkspaceManager to be instantiated
36    /// To run an execution process it also needs a WorkspaceModel
37    ///
38    /// This class uses Gears4Net to execute the plugins
39    /// </summary>
40    public class ExecutionEngine
41    {
42        private WorkspaceManager WorkspaceManagerEditor;
43        private Scheduler[] schedulers;
44        private WorkspaceModel workspaceModel;
45
46        public long ExecutedPluginsCounter { get; set; }
47        public bool BenchmarkPlugins { get; set; }
48        public long GuiUpdateInterval { get; set; }
49        public int SleepTime { get; set; }
50
51        /// <summary>
52        /// Creates a new ExecutionEngine
53        /// </summary>
54        /// <param name="workspaceManagerEditor"></param>
55        public ExecutionEngine(WorkspaceManager workspaceManagerEditor)
56        {
57            WorkspaceManagerEditor = workspaceManagerEditor;
58        }
59
60        /// <summary>
61        /// Is this ExecutionEngine running?
62        /// </summary>
63        public bool IsRunning
64        {
65            get;
66            private set;
67        }
68
69        /// <summary>
70        /// Execute the given Model
71        /// </summary>
72        /// <param name="workspaceModel"></param>
73        public void Execute(WorkspaceModel workspaceModel)
74        {
75            if (!IsRunning)
76            {
77                IsRunning = true;
78                this.workspaceModel = workspaceModel;
79                int amountSchedulers = System.Environment.ProcessorCount * 2;
80
81                //Here we create n = "ProcessorsCount * 2" Gears4Net schedulers
82                //We do this, because measurements showed that we get the best performance if we
83                //use this amount of schedulers
84                schedulers = new Scheduler[amountSchedulers];
85                for (int i = 0; i < amountSchedulers; i++)
86                {
87                    schedulers[i] = new WorkspaceManagerScheduler("Scheduler" + i);                   
88                }
89               
90                //We have to reset all states of PluginModels, ConnectorModels and ConnectionModels:
91                workspaceModel.resetStates();
92
93                //The UpdateGuiProtocol is a kind of "daemon" which will update the view elements if necessary
94                UpdateGuiProtocol updateGuiProtocol = new UpdateGuiProtocol(schedulers[0], workspaceModel, this);
95                schedulers[0].AddProtocol(updateGuiProtocol);
96                updateGuiProtocol.Start();
97
98                //The BenchmarkProtocl counts the amount of executed plugins per seconds and writes this to debug
99                if (this.BenchmarkPlugins)
100                {
101                    BenchmarkProtocol benchmarkProtocol = new BenchmarkProtocol(schedulers[0], this.workspaceModel, this);
102                    schedulers[0].AddProtocol(benchmarkProtocol);
103                    benchmarkProtocol.Start();
104                }
105
106                //Here we create for each PluginModel an own PluginProtocol
107                //By using round-robin we give each protocol to another scheduler to gain
108                //a good average load balancing of the schedulers
109                //we also initalize each plugin
110                //It is possible that a plugin is also a PluginProtocol
111                //if that is true we do not create a new one but use the plugin instead the created one
112                int counter=0;
113                foreach (PluginModel pluginModel in workspaceModel.AllPluginModels)
114                {
115                    PluginProtocol pluginProtocol = null;
116
117                    if (pluginModel.Plugin is PluginProtocol)
118                    {
119                        pluginProtocol = (PluginProtocol)pluginModel.Plugin;
120                        pluginProtocol.setExecutionEngineSettings(schedulers[counter], pluginModel, this);
121                    }
122                    else
123                    {
124                        pluginProtocol = new PluginProtocol(schedulers[counter], pluginModel, this);
125                    }
126
127                    pluginModel.Plugin.PreExecution();                   
128                    pluginModel.PluginProtocol = pluginProtocol;
129                    schedulers[counter].AddProtocol(pluginProtocol);
130
131                    if (pluginProtocol.Status == ProtocolStatus.Created || pluginProtocol.Status == ProtocolStatus.Terminated)
132                    {
133                        pluginProtocol.Start();
134                    }
135
136                    counter = (counter + 1) % (amountSchedulers);
137
138                    if (pluginModel.Startable)
139                    {
140                        MessageExecution msg = new MessageExecution();
141                        msg.PluginModel = pluginModel;
142                        pluginProtocol.BroadcastMessageReliably(msg);
143                    }
144                }
145
146                foreach (Scheduler scheduler in schedulers)
147                {
148                    ((WorkspaceManagerScheduler)scheduler).startScheduler();
149                }
150            }
151        }     
152     
153        /// <summary>
154        /// Stop the execution process:
155        /// calls shutdown on all schedulers + calls stop() on each plugin
156        /// </summary>
157        public void Stop()
158        {
159            //First stop alle plugins
160            foreach (PluginModel pluginModel in workspaceModel.AllPluginModels)
161            {
162                pluginModel.Plugin.Stop();
163                pluginModel.Plugin.PostExecution();
164            }           
165
166            IsRunning = false;
167            //Secondly stop all Gears4Net Schedulers
168            foreach (Scheduler scheduler in schedulers)
169            {
170                scheduler.Shutdown();
171            }
172             
173        }
174
175        /// <summary>
176        /// Pause the execution
177        /// </summary>
178        public void Pause()
179        {
180            //not implemented yet
181        }
182
183        /// <summary>
184        /// Use the logger of the WorkspaceManagerEditor
185        /// </summary>
186        /// <param name="message"></param>
187        /// <param name="level"></param>
188        public void GuiLogMessage(string message, NotificationLevel level)
189        {           
190            WorkspaceManagerEditor.GuiLogMessage(message, level);
191        }           
192    }
193 
194    /// <summary>
195    /// Message send to scheduler for a Plugin to trigger the Execution
196    /// </summary>
197    public class MessageExecution : MessageBase
198    {
199        public PluginModel PluginModel;
200    }
201
202    /// <summary>
203    /// A Protocol for updating the GUI in time intervals
204    /// </summary>
205    public class UpdateGuiProtocol : ProtocolBase
206    {
207        private WorkspaceModel workspaceModel;
208        private ExecutionEngine executionEngine;     
209
210        /// <summary>
211        /// Create a new protocol. Each protocol requires a scheduler which provides
212        /// a thread for execution.
213        /// </summary>
214        /// <param name="scheduler"></param>
215        public UpdateGuiProtocol(Scheduler scheduler, WorkspaceModel workspaceModel, ExecutionEngine executionEngine)
216            : base(scheduler)
217        {
218            this.workspaceModel = workspaceModel;
219            this.executionEngine = executionEngine;           
220        }
221
222        /// <summary>
223        /// The main function of the protocol
224        /// </summary>
225        /// <param name="stateMachine"></param>
226        /// <returns></returns>
227        public override System.Collections.Generic.IEnumerator<ReceiverBase> Execute(AbstractStateMachine stateMachine)
228        {
229            while (this.executionEngine.IsRunning)
230            {
231                yield return Timeout(this.executionEngine.GuiUpdateInterval, HandleUpdateGui);
232            }
233        }
234
235        /// <summary>
236        /// Handler function for a message.
237        /// This handler must not block, because it executes inside the thread of the scheduler.
238        /// </summary>
239        /// <param name="msg"></param>
240        private void HandleUpdateGui()
241        {
242            //Get the gui Thread
243            this.workspaceModel.WorkspaceManagerEditor.Presentation.Dispatcher.Invoke(DispatcherPriority.Normal, (SendOrPostCallback)delegate
244            {
245                foreach (PluginModel pluginModel in workspaceModel.AllPluginModels)
246                {
247                    if (pluginModel.GuiNeedsUpdate)
248                    {
249                        pluginModel.GuiNeedsUpdate = false;
250                        pluginModel.paint();
251                        if (pluginModel.UpdateableView != null)
252                        {
253                            pluginModel.UpdateableView.update();
254                        }
255                    }
256                }
257                foreach (ConnectionModel connectionModel in workspaceModel.AllConnectionModels)
258                {
259                    if (connectionModel.GuiNeedsUpdate)
260                    {
261                        if (connectionModel.UpdateableView != null)
262                        {
263                            connectionModel.UpdateableView.update();
264                        }
265                    }
266                }
267            }
268            , null);
269        }
270    }
271   
272    /// <summary>
273    /// A Protocol for benchmarking
274    /// </summary>
275    public class BenchmarkProtocol : ProtocolBase
276    {
277        private WorkspaceModel workspaceModel;
278        private ExecutionEngine executionEngine;
279
280        /// <summary>
281        /// Create a new protocol. Each protocol requires a scheduler which provides
282        /// a thread for execution.
283        /// </summary>
284        /// <param name="scheduler"></param>
285        public BenchmarkProtocol(Scheduler scheduler, WorkspaceModel workspaceModel, ExecutionEngine executionEngine)
286            : base(scheduler)
287        {
288            this.workspaceModel = workspaceModel;
289            this.executionEngine = executionEngine;
290        }
291
292        /// <summary>
293        /// The main function of the protocol
294        /// </summary>
295        /// <param name="stateMachine"></param>
296        /// <returns></returns>
297        public override System.Collections.Generic.IEnumerator<ReceiverBase> Execute(AbstractStateMachine stateMachine)
298        {
299            while (this.executionEngine.IsRunning)
300            {
301                yield return Timeout(1000, HandleBenchmark);
302            }
303        }
304
305        /// <summary>
306        /// Handler function for a message.
307        /// This handler must not block, because it executes inside the thread of the scheduler.
308        /// </summary>
309        /// <param name="msg"></param>
310        private void HandleBenchmark()
311        {
312            this.workspaceModel.WorkspaceManagerEditor.GuiLogMessage("Executing at about " + this.executionEngine.ExecutedPluginsCounter + " Plugins/s", NotificationLevel.Debug);
313            this.executionEngine.ExecutedPluginsCounter = 0;
314        }
315
316    }
317
318    /// <summary>
319    /// A Protocol for a PluginModel
320    /// </summary>
321    public class PluginProtocol : ProtocolBase
322    {       
323        public PluginModel PluginModel;
324        private ExecutionEngine executionEngine;
325
326        /// <summary>
327        /// Create a new protocol. Each protocol requires a scheduler which provides
328        /// a thread for execution.
329        /// </summary>
330        /// <param name="scheduler"></param>
331        public PluginProtocol(Scheduler scheduler, PluginModel pluginModel,ExecutionEngine executionEngine)
332            : base(scheduler)
333        {
334            this.PluginModel = pluginModel;
335            this.executionEngine = executionEngine;
336        }
337
338        /// <summary>
339        ///
340        /// </summary>
341        /// <param name="scheduler"></param>
342        /// <param name="pluginModel"></param>
343        /// <param name="executionEngine"></param>
344        public void setExecutionEngineSettings(Scheduler scheduler, PluginModel pluginModel, ExecutionEngine executionEngine)
345        {
346            this.Scheduler = scheduler;
347            this.PluginModel = pluginModel;
348            this.executionEngine = executionEngine;
349        }
350
351
352        /// <summary>
353        /// The main function of the protocol     
354        /// </summary>
355        /// <param name="stateMachine"></param>
356        /// <returns></returns>
357        public override System.Collections.Generic.IEnumerator<ReceiverBase> Execute(AbstractStateMachine stateMachine)
358        {
359            while (this.executionEngine.IsRunning)
360            {
361                yield return Receive<MessageExecution>(null, HandleExecute);
362            }
363        }
364
365        /// <summary>
366        /// Handle an execution of a plugin
367        /// </summary>
368        /// <param name="msg"></param>
369        private void HandleExecute(MessageExecution msg)
370        {
371            // 1. Check if Plugin may Execute
372            if (!mayExecute(PluginModel))
373            {
374                return;
375            }
376
377            //2. Fill all Inputs of the plugin, if this fails, stop executing the plugin
378            if (!fillInputs())
379            {
380                return;
381            }
382
383            //3. Execute the Plugin -> call the IPlugin.Execute()
384            try
385            {
386                PluginModel.Plugin.Execute();
387            }
388            catch (Exception ex)
389            {
390                this.PluginModel.WorkspaceModel.WorkspaceManagerEditor.GuiLogMessage("An error occured while executing  \"" + PluginModel.Name + "\": " + ex.Message, NotificationLevel.Error);
391                this.PluginModel.State = PluginModelState.Error;
392                this.PluginModel.GuiNeedsUpdate = true;
393                return;
394            }
395
396            //4. Count for the benchmark
397            if (this.executionEngine.BenchmarkPlugins)
398            {
399                this.executionEngine.ExecutedPluginsCounter++;
400            }
401
402            //5. If the user wants to, sleep some time
403            if (this.executionEngine.SleepTime > 0)
404            {
405                Thread.Sleep(this.executionEngine.SleepTime);
406            }
407
408            //6. Clear all used inputs
409            clearInputs();
410
411            //7. Send execute messages to possible executable next plugins
412            runNextPlugins();
413        }
414
415        /// <summary>
416        /// Send execute messages to possible executable next plugins
417        /// </summary>
418        public void runNextPlugins()
419        {           
420            foreach (ConnectorModel connectorModel in PluginModel.InputConnectors)
421            {
422                foreach (ConnectionModel connectionModel in connectorModel.InputConnections)
423                {
424                    if (!connectionModel.From.PluginModel.Startable ||
425                        (connectionModel.From.PluginModel.Startable && connectionModel.From.PluginModel.RepeatStart))
426                    {
427                        if (mayExecute(connectionModel.From.PluginModel))
428                        {
429                            MessageExecution message_exec = new MessageExecution();
430                            message_exec.PluginModel = connectionModel.From.PluginModel;
431                            connectionModel.From.PluginModel.PluginProtocol.BroadcastMessageReliably(message_exec);
432                        }
433                    }
434                }
435            }
436        }
437     
438        /// <summary>
439        /// Delete all input data of inputs of the plugin
440        /// </summary>
441        public void clearInputs()
442        {
443            foreach (ConnectorModel connectorModel in PluginModel.InputConnectors)
444            {
445                if (connectorModel.HasData)
446                {
447                    connectorModel.Data = null;
448                    connectorModel.HasData = false;
449                    foreach (ConnectionModel connectionModel in connectorModel.InputConnections)
450                    {
451                        connectionModel.Active = false;
452                        connectorModel.GuiNeedsUpdate = true;
453                    }
454                }
455            }
456        }
457
458        /// <summary>
459        /// Fill all inputs of the plugin
460        /// </summary>
461        /// <returns></returns>
462        public bool fillInputs()
463        {
464            //Fill the plugins inputs with data
465            foreach (ConnectorModel connectorModel in PluginModel.InputConnectors)
466            {
467                try
468                {
469                    if (connectorModel.HasData && connectorModel.Data.value != null)
470                    {
471                        if (connectorModel.IsDynamic)
472                        {
473                            MethodInfo propertyInfo = PluginModel.Plugin.GetType().GetMethod(connectorModel.DynamicSetterName);
474                            propertyInfo.Invoke(PluginModel.Plugin, new object[] { connectorModel.PropertyName, connectorModel.Data.value });
475                        }
476                        else
477                        {
478                            PropertyInfo propertyInfo = PluginModel.Plugin.GetType().GetProperty(connectorModel.PropertyName);
479                            propertyInfo.SetValue(PluginModel.Plugin, connectorModel.Data.value, null);
480                        }
481                    }
482                }
483                catch (Exception ex)
484                {
485                    this.PluginModel.WorkspaceModel.WorkspaceManagerEditor.GuiLogMessage("An error occured while setting value of connector \"" + connectorModel.Name + "\" of \"" + PluginModel.Name + "\": " + ex.Message, NotificationLevel.Error);
486                    this.PluginModel.State = PluginModelState.Error;
487                    this.PluginModel.GuiNeedsUpdate = true;
488                    return false;
489                }
490            }
491            return true;
492        }
493
494        /// <summary>
495        ///
496        /// </summary>
497        /// <returns></returns>
498        public bool mayExecute()
499        {
500            return mayExecute(this.PluginModel);
501        }
502
503        /// <summary>
504        /// Check if the PluginModel may execute
505        /// </summary>
506        /// <param name="pluginModel"></param>
507        /// <returns></returns>
508        private bool mayExecute(PluginModel pluginModel)
509        {
510            if (!pluginModel.WorkspaceModel.WorkspaceManagerEditor.isExecuting())
511            {
512                return false;
513            }
514
515            //Check if all necessary inputs are set
516            foreach (ConnectorModel connectorModel in pluginModel.InputConnectors)
517            {
518                if (!connectorModel.IControl &&
519                    (connectorModel.IsMandatory || connectorModel.InputConnections.Count > 0) && (!connectorModel.HasData ||
520                    connectorModel.Data == null))
521                {
522                    return false;
523                }
524            }
525
526            //Check if all outputs are free
527            foreach (ConnectorModel connectorModel in pluginModel.OutputConnectors)
528            {
529                if (!connectorModel.IControl)
530                {
531                    foreach (ConnectionModel connectionModel in connectorModel.OutputConnections)
532                    {
533                        if (connectionModel.To.HasData)
534                        {
535                            return false;
536                        }
537                    }
538                }
539            }
540            return true;
541        }
542
543        /// <summary>
544        ///
545        /// </summary>
546        public ExecutionEngine ExecutionEngine
547        {
548            get { return this.executionEngine; }           
549        }
550    }
551
552    /// <summary>
553    /// Gears4Net Scheduler
554    /// </summary>
555    public class WorkspaceManagerScheduler : Scheduler
556    {
557        private System.Threading.AutoResetEvent wakeup = new System.Threading.AutoResetEvent(false);
558        private bool shutdown = false;
559        private System.Threading.Thread thread;
560        private Context currentContext;
561
562                public WorkspaceManagerScheduler() : this(String.Empty)
563                {
564
565                }
566
567        public WorkspaceManagerScheduler(string name)
568            : base()
569        {
570            this.currentContext = Thread.CurrentContext;
571
572            thread = new System.Threading.Thread(this.Start);
573            thread.SetApartmentState(System.Threading.ApartmentState.MTA);
574                        thread.Name = name;
575           
576        }
577
578        public void startScheduler()
579        {
580            thread.Start();
581        }
582
583        private void Start()
584        {
585            if (this.currentContext != Thread.CurrentContext)
586                this.currentContext.DoCallBack(Start);
587
588            // Loop forever
589            while (true)
590            {
591                this.wakeup.WaitOne();
592
593                // Loop while there are more protocols waiting
594                while (true)
595                {
596                    // Should the scheduler stop?
597                    if (this.shutdown)
598                        return;
599                   
600                    ProtocolBase protocol = null;
601                    lock (this)
602                    {
603                        // No more protocols? -> Wait
604                        if (this.waitingProtocols.Count == 0)
605                            break;
606                    }
607
608                    protocol = this.waitingProtocols.Dequeue();
609                    ProtocolStatus status = protocol.Run();
610
611                    lock (this)
612                    {
613                        switch (status)
614                        {
615                            case ProtocolStatus.Created:
616                                System.Diagnostics.Debug.Assert(false);
617                                break;
618                            case ProtocolStatus.Ready:
619                                this.waitingProtocols.Enqueue(protocol);
620                                break;
621                            case ProtocolStatus.Waiting:
622                                break;
623                            case ProtocolStatus.Terminated:
624                                System.Diagnostics.Debug.Assert(!this.waitingProtocols.Contains(protocol));
625                                this.RemoveProtocol(protocol);
626                                break;
627                        }
628                    }
629                   
630                }
631            }
632        }
633
634        /// <summary>
635        /// Removes a protocol from the internal queue
636        /// </summary>
637        /// <param name="protocol"></param>
638        public override void RemoveProtocol(ProtocolBase protocol)
639        {
640            lock (this)
641            {
642                this.protocols.Remove(protocol);
643                if (this.protocols.Count == 0)
644                    this.Shutdown();
645            }
646        }
647
648        /// <summary>
649        /// Adds a protocol to the internal queue
650        /// </summary>
651        /// <param name="protocol"></param>
652        public override void AddProtocol(ProtocolBase protocol)
653        {
654            lock (this)
655            {
656                this.protocols.Add(protocol);
657            }
658        }
659
660        /// <summary>
661        /// Wakeup this scheduler
662        /// </summary>
663        /// <param name="protocol"></param>
664        public override void Wakeup(ProtocolBase protocol)
665        {
666            lock (this)
667            {
668                if (!this.waitingProtocols.Contains(protocol))
669                    this.waitingProtocols.Enqueue(protocol);
670                this.wakeup.Set();
671            }
672        }
673
674        /// <summary>
675        /// Terminates the scheduler
676        /// </summary>
677        public override void Shutdown()
678        {
679            this.shutdown = true;
680            this.wakeup.Set();
681        }
682    }
683}
Note: See TracBrowser for help on using the repository browser.