source: trunk/CrypPlugins/WorkspaceManager/Execution/ExecutionEngine.cs @ 1776

Last change on this file since 1776 was 1776, checked in by kopal, 11 years ago

some ExecutionEngine changes

File size: 20.7 KB
Line 
1/*                             
2   Copyright 2010 Nils Kopal, Viktor M.
3
4   Licensed under the Apache License, Version 2.0 (the "License");
5   you may not use this file except in compliance with the License.
6   You may obtain a copy of the License at
7
8       http://www.apache.org/licenses/LICENSE-2.0
9
10   Unless required by applicable law or agreed to in writing, software
11   distributed under the License is distributed on an "AS IS" BASIS,
12   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   See the License for the specific language governing permissions and
14   limitations under the License.
15*/
16
17using System;
18using System.Collections.Generic;
19using System.Linq;
20using System.Text;
21
22using WorkspaceManager.Model;
23using System.Threading;
24using System.Collections;
25using Cryptool.PluginBase;
26using System.Reflection;
27using Gears4Net;
28using System.Windows.Threading;
29using System.Runtime.Remoting.Contexts;
30
31namespace WorkspaceManager.Execution
32{
33    /// <summary>
34    /// Engine to execute a model of the WorkspaceManager
35    /// This class needs a WorkspaceManager to be instantiated
36    /// To run an execution process it also needs a WorkspaceModel
37    ///
38    /// This class uses Gears4Net to execute the plugins
39    /// </summary>
40    public class ExecutionEngine
41    {
42        private WorkspaceManager WorkspaceManagerEditor;
43        private Scheduler[] schedulers;
44        private WorkspaceModel workspaceModel;
45
46        public long ExecutedPluginsCounter { get; set; }
47        public bool BenchmarkPlugins { get; set; }
48        public long GuiUpdateInterval { get; set; }
49
50        /// <summary>
51        /// Creates a new ExecutionEngine
52        /// </summary>
53        /// <param name="workspaceManagerEditor"></param>
54        public ExecutionEngine(WorkspaceManager workspaceManagerEditor)
55        {
56            WorkspaceManagerEditor = workspaceManagerEditor;
57        }
58
59        /// <summary>
60        /// Is this ExecutionEngine running?
61        /// </summary>
62        public bool IsRunning
63        {
64            get;
65            private set;
66        }
67
68        /// <summary>
69        /// Execute the given Model
70        /// </summary>
71        /// <param name="workspaceModel"></param>
72        public void Execute(WorkspaceModel workspaceModel)
73        {
74            this.workspaceModel = workspaceModel;
75
76            if (!IsRunning)
77            {
78                IsRunning = true;
79                int amountSchedulers = System.Environment.ProcessorCount * 2;               
80
81                //Here we create n = "ProcessorsCount * 2" Gears4Net schedulers
82                //We do this, because measurements showed that we get the best performance if we
83                //use this amount of schedulers
84                schedulers = new Scheduler[amountSchedulers];
85                for (int i = 0; i < amountSchedulers; i++)
86                {
87                    schedulers[i] = new WorkspaceManagerScheduler("Scheduler" + i);                   
88                }
89               
90                //We have to reset all states of PluginModels, ConnectorModels and ConnectionModels:
91                workspaceModel.resetStates();
92
93                //The UpdateGuiProtocol is a kind of "daemon" which will update the view elements if necessary
94                UpdateGuiProtocol updateGuiProtocol = new UpdateGuiProtocol(schedulers[0], workspaceModel, this);
95                schedulers[0].AddProtocol(updateGuiProtocol);
96                updateGuiProtocol.Start();
97
98                //The BenchmarkProtocl counts the amount of executed plugins per seconds and writes this to debug
99                if (this.BenchmarkPlugins)
100                {
101                    BenchmarkProtocol benchmarkProtocol = new BenchmarkProtocol(schedulers[0], this.workspaceModel, this);
102                    schedulers[0].AddProtocol(benchmarkProtocol);
103                    benchmarkProtocol.Start();
104                }
105
106                //Here we create for each PluginModel an own PluginProtocol
107                //By using round-robin we give each protocol to another scheduler to gain
108                //a good average load balancing of the schedulers
109                //we also initalize each plugin
110                int counter=0;
111                foreach (PluginModel pluginModel in workspaceModel.AllPluginModels)
112                {
113                    pluginModel.Plugin.PreExecution();
114                    PluginProtocol pluginProtocol = new PluginProtocol(schedulers[counter], pluginModel,this);
115                    pluginModel.PluginProtocol = pluginProtocol;
116                    schedulers[counter].AddProtocol(pluginProtocol);
117                   
118                    pluginProtocol.Start();
119                    counter = (counter + 1) % (amountSchedulers);
120
121                    if (pluginModel.Startable)
122                    {
123                        MessageExecution msg = new MessageExecution();
124                        msg.PluginModel = pluginModel;
125                        pluginProtocol.BroadcastMessageReliably(msg);
126                    }
127                }
128
129                foreach (Scheduler scheduler in schedulers)
130                {
131                    ((WorkspaceManagerScheduler)scheduler).startScheduler();
132                }
133            }
134        }     
135     
136        /// <summary>
137        /// Stop the execution process:
138        /// calls shutdown on all schedulers + calls stop() on each plugin
139        /// </summary>
140        public void Stop()
141        {
142            //First stop alle plugins
143            foreach (PluginModel pluginModel in workspaceModel.AllPluginModels)
144            {
145                pluginModel.Plugin.Stop();
146                pluginModel.Plugin.PostExecution();
147            }           
148
149            IsRunning = false;
150            //Secondly stop all Gears4Net Schedulers
151            foreach (Scheduler scheduler in schedulers)
152            {
153                scheduler.Shutdown();
154            }
155             
156        }
157
158        /// <summary>
159        /// Pause the execution
160        /// </summary>
161        public void Pause()
162        {
163            //not implemented yet
164        }
165
166        /// <summary>
167        /// Use the logger of the WorkspaceManagerEditor
168        /// </summary>
169        /// <param name="message"></param>
170        /// <param name="level"></param>
171        public void GuiLogMessage(string message, NotificationLevel level)
172        {           
173            WorkspaceManagerEditor.GuiLogMessage(message, level);
174        }           
175    }
176 
177    /// <summary>
178    /// Message send to scheduler for a Plugin to trigger the Execution
179    /// </summary>
180    public class MessageExecution : MessageBase
181    {
182        public PluginModel PluginModel;
183    }
184
185    /// <summary>
186    /// A Protocol for updating the GUI in time intervals
187    /// </summary>
188    public class UpdateGuiProtocol : ProtocolBase
189    {
190        private WorkspaceModel workspaceModel;
191        private ExecutionEngine executionEngine;     
192
193        /// <summary>
194        /// Create a new protocol. Each protocol requires a scheduler which provides
195        /// a thread for execution.
196        /// </summary>
197        /// <param name="scheduler"></param>
198        public UpdateGuiProtocol(Scheduler scheduler, WorkspaceModel workspaceModel, ExecutionEngine executionEngine)
199            : base(scheduler)
200        {
201            this.workspaceModel = workspaceModel;
202            this.executionEngine = executionEngine;           
203        }
204
205        /// <summary>
206        /// The main function of the protocol
207        /// </summary>
208        /// <param name="stateMachine"></param>
209        /// <returns></returns>
210        public override System.Collections.Generic.IEnumerator<ReceiverBase> Execute(AbstractStateMachine stateMachine)
211        {
212            while (this.executionEngine.IsRunning)
213            {
214                yield return Timeout(this.executionEngine.GuiUpdateInterval, HandleUpdateGui);
215            }
216        }
217
218        /// <summary>
219        /// Handler function for a message.
220        /// This handler must not block, because it executes inside the thread of the scheduler.
221        /// </summary>
222        /// <param name="msg"></param>
223        private void HandleUpdateGui()
224        {
225            //Get the gui Thread
226            this.workspaceModel.WorkspaceManagerEditor.Presentation.Dispatcher.Invoke(DispatcherPriority.Normal, (SendOrPostCallback)delegate
227            {
228                foreach (PluginModel pluginModel in workspaceModel.AllPluginModels)
229                {
230                    if (pluginModel.GuiNeedsUpdate)
231                    {
232                        pluginModel.GuiNeedsUpdate = false;
233                        pluginModel.paint();
234                        if (pluginModel.UpdateableView != null)
235                        {
236                            pluginModel.UpdateableView.update();
237                        }
238                    }
239                }
240                foreach (ConnectionModel connectionModel in workspaceModel.AllConnectionModels)
241                {
242                    if (connectionModel.GuiNeedsUpdate)
243                    {
244                        if (connectionModel.UpdateableView != null)
245                        {
246                            connectionModel.UpdateableView.update();
247                        }
248                    }
249                }
250            }
251            , null);
252        }
253    }
254   
255    /// <summary>
256    /// A Protocol for benchmarking
257    /// </summary>
258    public class BenchmarkProtocol : ProtocolBase
259    {
260        private WorkspaceModel workspaceModel;
261        private ExecutionEngine executionEngine;
262
263        /// <summary>
264        /// Create a new protocol. Each protocol requires a scheduler which provides
265        /// a thread for execution.
266        /// </summary>
267        /// <param name="scheduler"></param>
268        public BenchmarkProtocol(Scheduler scheduler, WorkspaceModel workspaceModel, ExecutionEngine executionEngine)
269            : base(scheduler)
270        {
271            this.workspaceModel = workspaceModel;
272            this.executionEngine = executionEngine;
273        }
274
275        /// <summary>
276        /// The main function of the protocol
277        /// </summary>
278        /// <param name="stateMachine"></param>
279        /// <returns></returns>
280        public override System.Collections.Generic.IEnumerator<ReceiverBase> Execute(AbstractStateMachine stateMachine)
281        {
282            while (this.executionEngine.IsRunning)
283            {
284                yield return Timeout(1000, HandleBenchmark);
285            }
286        }
287
288        /// <summary>
289        /// Handler function for a message.
290        /// This handler must not block, because it executes inside the thread of the scheduler.
291        /// </summary>
292        /// <param name="msg"></param>
293        private void HandleBenchmark()
294        {
295            this.workspaceModel.WorkspaceManagerEditor.GuiLogMessage("Executing at about " + this.executionEngine.ExecutedPluginsCounter + " Plugins/s", NotificationLevel.Debug);
296            this.executionEngine.ExecutedPluginsCounter = 0;
297        }
298
299    }
300
301    /// <summary>
302    /// A Protocol for a PluginModel
303    /// </summary>
304    public class PluginProtocol : ProtocolBase
305    {
306        public PluginModel PluginModel;
307        private ExecutionEngine executionEngine;
308
309        /// <summary>
310        /// Create a new protocol. Each protocol requires a scheduler which provides
311        /// a thread for execution.
312        /// </summary>
313        /// <param name="scheduler"></param>
314        public PluginProtocol(Scheduler scheduler, PluginModel pluginModel,ExecutionEngine executionEngine)
315            : base(scheduler)
316        {
317            this.PluginModel = pluginModel;
318            this.executionEngine = executionEngine;
319        }
320
321        /// <summary>
322        /// The main function of the protocol     
323        /// </summary>
324        /// <param name="stateMachine"></param>
325        /// <returns></returns>
326        public override System.Collections.Generic.IEnumerator<ReceiverBase> Execute(AbstractStateMachine stateMachine)
327        {
328            while (this.executionEngine.IsRunning)
329            {
330                yield return Receive<MessageExecution>(null, this.HandleExecute);               
331            }
332        }
333
334        /// <summary>
335        /// Call the execution function of the wrapped IPlugin
336        /// </summary>
337        /// <param name="msg"></param>
338        private void HandleExecute(MessageExecution msg)
339        {
340           
341
342            if (!msg.PluginModel.WorkspaceModel.WorkspaceManagerEditor.isExecuting())
343            {
344                return;
345            }
346
347            foreach (ConnectorModel connectorModel in msg.PluginModel.InputConnectors)
348            {
349                if (!connectorModel.IControl && (connectorModel.IsMandatory || connectorModel.InputConnections.Count > 0) && !connectorModel.HasData)
350                {
351                    return;
352                }
353            }
354
355            foreach (ConnectorModel connectorModel in msg.PluginModel.OutputConnectors)
356            {
357                if (!connectorModel.IControl)
358                {
359                    foreach (ConnectionModel connectionModel in connectorModel.OutputConnections)
360                    {
361                        if (connectionModel.To.HasData)
362                        {
363                            return;
364                        }
365                    }
366                }
367            }
368
369            //Fill the plugins Inputs with data
370            foreach (ConnectorModel connectorModel in PluginModel.InputConnectors)
371            {
372                try
373                {
374                    if (connectorModel.HasData)
375                    {
376                        if (connectorModel.IsDynamic)
377                        {
378                            MethodInfo propertyInfo = PluginModel.Plugin.GetType().GetMethod(connectorModel.DynamicSetterName);
379                            propertyInfo.Invoke(PluginModel.Plugin, new object[] { connectorModel.PropertyName, connectorModel.Data.value });
380                        }
381                        else
382                        {
383                            PropertyInfo propertyInfo = PluginModel.Plugin.GetType().GetProperty(connectorModel.PropertyName);
384                            propertyInfo.SetValue(PluginModel.Plugin, connectorModel.Data.value, null);
385                        }
386                    }
387                }
388                catch (Exception ex)
389                {
390                    this.PluginModel.WorkspaceModel.WorkspaceManagerEditor.GuiLogMessage("An error occured while setting value of connector \"" + connectorModel.Name + "\" of \"" + PluginModel + "\": " + ex.Message, NotificationLevel.Error);
391                    this.PluginModel.State = PluginModelState.Error;
392                    this.PluginModel.GuiNeedsUpdate = true;
393                    return;
394                }
395            }
396           
397            msg.PluginModel.Plugin.Execute();
398           
399            if (this.executionEngine.BenchmarkPlugins)
400            {
401                this.executionEngine.ExecutedPluginsCounter++;
402            }
403
404            foreach (ConnectorModel connectorModel in PluginModel.InputConnectors)
405            {
406                if (connectorModel.HasData)
407                {
408                    connectorModel.HasData = false;
409                    connectorModel.Data = null;
410                    foreach (ConnectionModel connectionModel in connectorModel.InputConnections)
411                    {
412                        connectionModel.Active = false;
413                        connectorModel.GuiNeedsUpdate = true;
414                    }
415                }
416            }
417           
418           
419
420            foreach (ConnectorModel connectorModel in PluginModel.InputConnectors)
421            {
422                foreach (ConnectionModel connectionModel in connectorModel.InputConnections)
423                {
424                    if (!connectionModel.From.PluginModel.Startable || (connectionModel.From.PluginModel.Startable && connectionModel.From.PluginModel.RepeatStart))
425                    {
426                        MessageExecution message_exec = new MessageExecution();
427                        message_exec.PluginModel = connectionModel.From.PluginModel;
428                        connectionModel.From.PluginModel.PluginProtocol.BroadcastMessage(message_exec);
429                    }
430                }
431            }
432           
433        }
434     
435    }
436
437    /// <summary>
438    /// Gears4Net Scheduler. The scheduler only runs protocols which do not have a waiting
439    /// plugin on the protocol plugins outputs
440    /// </summary>
441    public class WorkspaceManagerScheduler : Scheduler
442    {
443        private System.Threading.AutoResetEvent wakeup = new System.Threading.AutoResetEvent(false);
444        private bool shutdown = false;
445        private System.Threading.Thread thread;
446        private Context currentContext;
447
448                public WorkspaceManagerScheduler() : this(String.Empty)
449                {
450
451                }
452
453        public WorkspaceManagerScheduler(string name)
454            : base()
455        {
456            this.currentContext = Thread.CurrentContext;
457
458            thread = new System.Threading.Thread(this.Start);
459            thread.SetApartmentState(System.Threading.ApartmentState.MTA);
460                        thread.Name = name;
461           
462        }
463
464        public void startScheduler()
465        {
466            thread.Start();
467        }
468
469        private void Start()
470        {
471            if (this.currentContext != Thread.CurrentContext)
472                this.currentContext.DoCallBack(Start);
473
474            // Loop forever
475            while (true)
476            {
477                this.wakeup.WaitOne();
478
479                // Loop while there are more protocols waiting
480                while (true)
481                {
482                    // Should the scheduler stop?
483                    if (this.shutdown)
484                        return;
485                   
486                    ProtocolBase protocol = null;
487                    lock (this)
488                    {
489                        // No more protocols? -> Wait
490                        if (this.waitingProtocols.Count == 0)
491                            break;
492                    }
493                    protocol = this.waitingProtocols.Dequeue();                                           
494                    ProtocolStatus status = protocol.Run();
495
496                    lock (this)
497                    {
498                        switch (status)
499                        {
500                            case ProtocolStatus.Created:
501                                System.Diagnostics.Debug.Assert(false);
502                                break;
503                            case ProtocolStatus.Ready:
504                                this.waitingProtocols.Enqueue(protocol);
505                                break;
506                            case ProtocolStatus.Waiting:
507                                break;
508                            case ProtocolStatus.Terminated:
509                                System.Diagnostics.Debug.Assert(!this.waitingProtocols.Contains(protocol));
510                                this.RemoveProtocol(protocol);
511                                break;
512                        }
513                    }
514                }
515            }
516        }
517
518        /// <summary>
519        /// Removes a protocol from the internal queue
520        /// </summary>
521        /// <param name="protocol"></param>
522        public override void RemoveProtocol(ProtocolBase protocol)
523        {
524            lock (this)
525            {
526                this.protocols.Remove(protocol);
527                if (this.protocols.Count == 0)
528                    this.Shutdown();
529            }
530        }
531
532        /// <summary>
533        /// Adds a protocol to the internal queue
534        /// </summary>
535        /// <param name="protocol"></param>
536        public override void AddProtocol(ProtocolBase protocol)
537        {
538            lock (this)
539            {
540                this.protocols.Add(protocol);
541            }
542        }
543
544        /// <summary>
545        /// Wakeup this scheduler
546        /// </summary>
547        /// <param name="protocol"></param>
548        public override void Wakeup(ProtocolBase protocol)
549        {
550            lock (this)
551            {
552                if (!this.waitingProtocols.Contains(protocol))
553                    this.waitingProtocols.Enqueue(protocol);
554                this.wakeup.Set();
555            }
556        }
557
558        /// <summary>
559        /// Terminates the scheduler
560        /// </summary>
561        public override void Shutdown()
562        {
563            this.shutdown = true;
564            this.wakeup.Set();
565        }
566    }
567}
Note: See TracBrowser for help on using the repository browser.