source: trunk/CrypPlugins/WorkspaceManager/Execution/ExecutionEngine.cs @ 1777

Last change on this file since 1777 was 1777, checked in by kopal, 11 years ago

some ExecutionEngine changes

File size: 20.9 KB
Line 
1/*                             
2   Copyright 2010 Nils Kopal, Viktor M.
3
4   Licensed under the Apache License, Version 2.0 (the "License");
5   you may not use this file except in compliance with the License.
6   You may obtain a copy of the License at
7
8       http://www.apache.org/licenses/LICENSE-2.0
9
10   Unless required by applicable law or agreed to in writing, software
11   distributed under the License is distributed on an "AS IS" BASIS,
12   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   See the License for the specific language governing permissions and
14   limitations under the License.
15*/
16
17using System;
18using System.Collections.Generic;
19using System.Linq;
20using System.Text;
21
22using WorkspaceManager.Model;
23using System.Threading;
24using System.Collections;
25using Cryptool.PluginBase;
26using System.Reflection;
27using Gears4Net;
28using System.Windows.Threading;
29using System.Runtime.Remoting.Contexts;
30
31namespace WorkspaceManager.Execution
32{
33    /// <summary>
34    /// Engine to execute a model of the WorkspaceManager
35    /// This class needs a WorkspaceManager to be instantiated
36    /// To run an execution process it also needs a WorkspaceModel
37    ///
38    /// This class uses Gears4Net to execute the plugins
39    /// </summary>
40    public class ExecutionEngine
41    {
42        private WorkspaceManager WorkspaceManagerEditor;
43        private Scheduler[] schedulers;
44        private WorkspaceModel workspaceModel;
45
46        public long ExecutedPluginsCounter { get; set; }
47        public bool BenchmarkPlugins { get; set; }
48        public long GuiUpdateInterval { get; set; }
49
50        /// <summary>
51        /// Creates a new ExecutionEngine
52        /// </summary>
53        /// <param name="workspaceManagerEditor"></param>
54        public ExecutionEngine(WorkspaceManager workspaceManagerEditor)
55        {
56            WorkspaceManagerEditor = workspaceManagerEditor;
57        }
58
59        /// <summary>
60        /// Is this ExecutionEngine running?
61        /// </summary>
62        public bool IsRunning
63        {
64            get;
65            private set;
66        }
67
68        /// <summary>
69        /// Execute the given Model
70        /// </summary>
71        /// <param name="workspaceModel"></param>
72        public void Execute(WorkspaceModel workspaceModel)
73        {
74            if (!IsRunning)
75            {
76                IsRunning = true;
77                this.workspaceModel = workspaceModel;
78                int amountSchedulers = System.Environment.ProcessorCount * 2;
79 
80                //Here we create n = "ProcessorsCount * 2" Gears4Net schedulers
81                //We do this, because measurements showed that we get the best performance if we
82                //use this amount of schedulers
83                schedulers = new Scheduler[amountSchedulers];
84                for (int i = 0; i < amountSchedulers; i++)
85                {
86                    schedulers[i] = new WorkspaceManagerScheduler("Scheduler" + i);                   
87                }
88               
89                //We have to reset all states of PluginModels, ConnectorModels and ConnectionModels:
90                workspaceModel.resetStates();
91
92                //The UpdateGuiProtocol is a kind of "daemon" which will update the view elements if necessary
93                UpdateGuiProtocol updateGuiProtocol = new UpdateGuiProtocol(schedulers[0], workspaceModel, this);
94                schedulers[0].AddProtocol(updateGuiProtocol);
95                updateGuiProtocol.Start();
96
97                //The BenchmarkProtocl counts the amount of executed plugins per seconds and writes this to debug
98                if (this.BenchmarkPlugins)
99                {
100                    BenchmarkProtocol benchmarkProtocol = new BenchmarkProtocol(schedulers[0], this.workspaceModel, this);
101                    schedulers[0].AddProtocol(benchmarkProtocol);
102                    benchmarkProtocol.Start();
103                }
104
105                //Here we create for each PluginModel an own PluginProtocol
106                //By using round-robin we give each protocol to another scheduler to gain
107                //a good average load balancing of the schedulers
108                //we also initalize each plugin
109                int counter=0;
110                foreach (PluginModel pluginModel in workspaceModel.AllPluginModels)
111                {
112                    pluginModel.Plugin.PreExecution();
113                    PluginProtocol pluginProtocol = new PluginProtocol(schedulers[counter], pluginModel,this);
114                    pluginModel.PluginProtocol = pluginProtocol;
115                    schedulers[counter].AddProtocol(pluginProtocol);
116                   
117                    pluginProtocol.Start();
118                    counter = (counter + 1) % (amountSchedulers);
119
120                    if (pluginModel.Startable)
121                    {
122                        MessageExecution msg = new MessageExecution();
123                        msg.PluginModel = pluginModel;
124                        pluginProtocol.BroadcastMessageReliably(msg);
125                    }
126                }
127
128                foreach (Scheduler scheduler in schedulers)
129                {
130                    ((WorkspaceManagerScheduler)scheduler).startScheduler();
131                }
132            }
133        }     
134     
135        /// <summary>
136        /// Stop the execution process:
137        /// calls shutdown on all schedulers + calls stop() on each plugin
138        /// </summary>
139        public void Stop()
140        {
141            //First stop alle plugins
142            foreach (PluginModel pluginModel in workspaceModel.AllPluginModels)
143            {
144                pluginModel.Plugin.Stop();
145                pluginModel.Plugin.PostExecution();
146            }           
147
148            IsRunning = false;
149            //Secondly stop all Gears4Net Schedulers
150            foreach (Scheduler scheduler in schedulers)
151            {
152                scheduler.Shutdown();
153            }
154             
155        }
156
157        /// <summary>
158        /// Pause the execution
159        /// </summary>
160        public void Pause()
161        {
162            //not implemented yet
163        }
164
165        /// <summary>
166        /// Use the logger of the WorkspaceManagerEditor
167        /// </summary>
168        /// <param name="message"></param>
169        /// <param name="level"></param>
170        public void GuiLogMessage(string message, NotificationLevel level)
171        {           
172            WorkspaceManagerEditor.GuiLogMessage(message, level);
173        }           
174    }
175 
176    /// <summary>
177    /// Message send to scheduler for a Plugin to trigger the Execution
178    /// </summary>
179    public class MessageExecution : MessageBase
180    {
181        public PluginModel PluginModel;
182    }
183
184    /// <summary>
185    /// A Protocol for updating the GUI in time intervals
186    /// </summary>
187    public class UpdateGuiProtocol : ProtocolBase
188    {
189        private WorkspaceModel workspaceModel;
190        private ExecutionEngine executionEngine;     
191
192        /// <summary>
193        /// Create a new protocol. Each protocol requires a scheduler which provides
194        /// a thread for execution.
195        /// </summary>
196        /// <param name="scheduler"></param>
197        public UpdateGuiProtocol(Scheduler scheduler, WorkspaceModel workspaceModel, ExecutionEngine executionEngine)
198            : base(scheduler)
199        {
200            this.workspaceModel = workspaceModel;
201            this.executionEngine = executionEngine;           
202        }
203
204        /// <summary>
205        /// The main function of the protocol
206        /// </summary>
207        /// <param name="stateMachine"></param>
208        /// <returns></returns>
209        public override System.Collections.Generic.IEnumerator<ReceiverBase> Execute(AbstractStateMachine stateMachine)
210        {
211            while (this.executionEngine.IsRunning)
212            {
213                yield return Timeout(this.executionEngine.GuiUpdateInterval, HandleUpdateGui);
214            }
215        }
216
217        /// <summary>
218        /// Handler function for a message.
219        /// This handler must not block, because it executes inside the thread of the scheduler.
220        /// </summary>
221        /// <param name="msg"></param>
222        private void HandleUpdateGui()
223        {
224            //Get the gui Thread
225            this.workspaceModel.WorkspaceManagerEditor.Presentation.Dispatcher.Invoke(DispatcherPriority.Normal, (SendOrPostCallback)delegate
226            {
227                foreach (PluginModel pluginModel in workspaceModel.AllPluginModels)
228                {
229                    if (pluginModel.GuiNeedsUpdate)
230                    {
231                        pluginModel.GuiNeedsUpdate = false;
232                        pluginModel.paint();
233                        if (pluginModel.UpdateableView != null)
234                        {
235                            pluginModel.UpdateableView.update();
236                        }
237                    }
238                }
239                foreach (ConnectionModel connectionModel in workspaceModel.AllConnectionModels)
240                {
241                    if (connectionModel.GuiNeedsUpdate)
242                    {
243                        if (connectionModel.UpdateableView != null)
244                        {
245                            connectionModel.UpdateableView.update();
246                        }
247                    }
248                }
249            }
250            , null);
251        }
252    }
253   
254    /// <summary>
255    /// A Protocol for benchmarking
256    /// </summary>
257    public class BenchmarkProtocol : ProtocolBase
258    {
259        private WorkspaceModel workspaceModel;
260        private ExecutionEngine executionEngine;
261
262        /// <summary>
263        /// Create a new protocol. Each protocol requires a scheduler which provides
264        /// a thread for execution.
265        /// </summary>
266        /// <param name="scheduler"></param>
267        public BenchmarkProtocol(Scheduler scheduler, WorkspaceModel workspaceModel, ExecutionEngine executionEngine)
268            : base(scheduler)
269        {
270            this.workspaceModel = workspaceModel;
271            this.executionEngine = executionEngine;
272        }
273
274        /// <summary>
275        /// The main function of the protocol
276        /// </summary>
277        /// <param name="stateMachine"></param>
278        /// <returns></returns>
279        public override System.Collections.Generic.IEnumerator<ReceiverBase> Execute(AbstractStateMachine stateMachine)
280        {
281            while (this.executionEngine.IsRunning)
282            {
283                yield return Timeout(1000, HandleBenchmark);
284            }
285        }
286
287        /// <summary>
288        /// Handler function for a message.
289        /// This handler must not block, because it executes inside the thread of the scheduler.
290        /// </summary>
291        /// <param name="msg"></param>
292        private void HandleBenchmark()
293        {
294            this.workspaceModel.WorkspaceManagerEditor.GuiLogMessage("Executing at about " + this.executionEngine.ExecutedPluginsCounter + " Plugins/s", NotificationLevel.Debug);
295            this.executionEngine.ExecutedPluginsCounter = 0;
296        }
297
298    }
299
300    /// <summary>
301    /// A Protocol for a PluginModel
302    /// </summary>
303    public class PluginProtocol : ProtocolBase
304    {
305       
306        public PluginModel PluginModel;
307        private ExecutionEngine executionEngine;
308
309        /// <summary>
310        /// Create a new protocol. Each protocol requires a scheduler which provides
311        /// a thread for execution.
312        /// </summary>
313        /// <param name="scheduler"></param>
314        public PluginProtocol(Scheduler scheduler, PluginModel pluginModel,ExecutionEngine executionEngine)
315            : base(scheduler)
316        {
317            this.PluginModel = pluginModel;
318            this.executionEngine = executionEngine;
319        }
320
321        /// <summary>
322        /// The main function of the protocol     
323        /// </summary>
324        /// <param name="stateMachine"></param>
325        /// <returns></returns>
326        public override System.Collections.Generic.IEnumerator<ReceiverBase> Execute(AbstractStateMachine stateMachine)
327        {
328            while (this.executionEngine.IsRunning)
329            {
330                yield return Receive<MessageExecution>(null, this.HandleExecute);               
331            }
332        }
333
334        /// <summary>
335        /// Call the execution function of the wrapped IPlugin
336        /// </summary>
337        /// <param name="msg"></param>
338        private void HandleExecute(MessageExecution msg)
339        {
340           
341            if (!msg.PluginModel.WorkspaceModel.WorkspaceManagerEditor.isExecuting())
342            {
343                return;
344            }
345
346            //Check if all necessary inputs are set
347            foreach (ConnectorModel connectorModel in msg.PluginModel.InputConnectors)
348            {
349                if (!connectorModel.IControl && 
350                    (connectorModel.IsMandatory || connectorModel.InputConnections.Count > 0) && (!connectorModel.HasData ||
351                    connectorModel.Data==null))
352                {
353                    return;
354                }
355            }
356
357            //Check if all outputs are free
358            foreach (ConnectorModel connectorModel in msg.PluginModel.OutputConnectors)
359            {
360                if (!connectorModel.IControl)
361                {
362                    foreach (ConnectionModel connectionModel in connectorModel.OutputConnections)
363                    {
364                        if (connectionModel.To.HasData)
365                        {
366                            return;
367                        }
368                    }
369                }
370            }
371
372            //Fill the plugins inputs with data
373            foreach (ConnectorModel connectorModel in PluginModel.InputConnectors)
374            {
375                try
376                {
377                    if (connectorModel.HasData && connectorModel.Data.value != null)
378                    {
379                        if (connectorModel.IsDynamic)
380                        {
381                            MethodInfo propertyInfo = PluginModel.Plugin.GetType().GetMethod(connectorModel.DynamicSetterName);
382                            propertyInfo.Invoke(PluginModel.Plugin, new object[] { connectorModel.PropertyName, connectorModel.Data.value });
383                        }
384                        else
385                        {
386                            PropertyInfo propertyInfo = PluginModel.Plugin.GetType().GetProperty(connectorModel.PropertyName);
387                            propertyInfo.SetValue(PluginModel.Plugin, connectorModel.Data.value, null);
388                        }
389                    }
390                }
391                catch (Exception ex)
392                {
393                    this.PluginModel.WorkspaceModel.WorkspaceManagerEditor.GuiLogMessage("An error occured while setting value of connector \"" + connectorModel.Name + "\" of \"" + PluginModel + "\": " + ex.Message, NotificationLevel.Error);
394                    this.PluginModel.State = PluginModelState.Error;
395                    this.PluginModel.GuiNeedsUpdate = true;
396                    return;
397                }
398            }
399           
400            msg.PluginModel.Plugin.Execute();
401           
402            if (this.executionEngine.BenchmarkPlugins)
403            {
404                this.executionEngine.ExecutedPluginsCounter++;
405            }
406
407            foreach (ConnectorModel connectorModel in PluginModel.InputConnectors)
408            {
409                if (connectorModel.HasData)
410                {                   
411                    connectorModel.Data = null;
412                    connectorModel.HasData = false;
413                    foreach (ConnectionModel connectionModel in connectorModel.InputConnections)
414                    {
415                        connectionModel.Active = false;
416                        connectorModel.GuiNeedsUpdate = true;
417                    }
418                }
419            }
420           
421            foreach (ConnectorModel connectorModel in PluginModel.InputConnectors)
422            {
423                foreach (ConnectionModel connectionModel in connectorModel.InputConnections)
424                {
425                    if (!connectionModel.From.PluginModel.Startable || 
426                        (connectionModel.From.PluginModel.Startable && connectionModel.From.PluginModel.RepeatStart))
427                    {
428                        MessageExecution message_exec = new MessageExecution();
429                        message_exec.PluginModel = connectionModel.From.PluginModel;
430                        connectionModel.From.PluginModel.PluginProtocol.BroadcastMessage(message_exec);
431                    }
432                }
433            }           
434        }
435    }
436
437    /// <summary>
438    /// Gears4Net Scheduler. The scheduler only runs protocols which do not have a waiting
439    /// plugin on the protocol plugins outputs
440    /// </summary>
441    public class WorkspaceManagerScheduler : Scheduler
442    {
443        private System.Threading.AutoResetEvent wakeup = new System.Threading.AutoResetEvent(false);
444        private bool shutdown = false;
445        private System.Threading.Thread thread;
446        private Context currentContext;
447
448                public WorkspaceManagerScheduler() : this(String.Empty)
449                {
450
451                }
452
453        public WorkspaceManagerScheduler(string name)
454            : base()
455        {
456            this.currentContext = Thread.CurrentContext;
457
458            thread = new System.Threading.Thread(this.Start);
459            thread.SetApartmentState(System.Threading.ApartmentState.MTA);
460                        thread.Name = name;
461           
462        }
463
464        public void startScheduler()
465        {
466            thread.Start();
467        }
468
469        private void Start()
470        {
471            if (this.currentContext != Thread.CurrentContext)
472                this.currentContext.DoCallBack(Start);
473
474            // Loop forever
475            while (true)
476            {
477                this.wakeup.WaitOne();
478
479                // Loop while there are more protocols waiting
480                while (true)
481                {
482                    // Should the scheduler stop?
483                    if (this.shutdown)
484                        return;
485                   
486                    ProtocolBase protocol = null;
487                    lock (this)
488                    {
489                        // No more protocols? -> Wait
490                        if (this.waitingProtocols.Count == 0)
491                            break;
492                    }
493                    protocol = this.waitingProtocols.Dequeue();                                           
494                    ProtocolStatus status = protocol.Run();
495
496                    lock (this)
497                    {
498                        switch (status)
499                        {
500                            case ProtocolStatus.Created:
501                                System.Diagnostics.Debug.Assert(false);
502                                break;
503                            case ProtocolStatus.Ready:
504                                this.waitingProtocols.Enqueue(protocol);
505                                break;
506                            case ProtocolStatus.Waiting:
507                                break;
508                            case ProtocolStatus.Terminated:
509                                System.Diagnostics.Debug.Assert(!this.waitingProtocols.Contains(protocol));
510                                this.RemoveProtocol(protocol);
511                                break;
512                        }
513                    }
514                }
515            }
516        }
517
518        /// <summary>
519        /// Removes a protocol from the internal queue
520        /// </summary>
521        /// <param name="protocol"></param>
522        public override void RemoveProtocol(ProtocolBase protocol)
523        {
524            lock (this)
525            {
526                this.protocols.Remove(protocol);
527                if (this.protocols.Count == 0)
528                    this.Shutdown();
529            }
530        }
531
532        /// <summary>
533        /// Adds a protocol to the internal queue
534        /// </summary>
535        /// <param name="protocol"></param>
536        public override void AddProtocol(ProtocolBase protocol)
537        {
538            lock (this)
539            {
540                this.protocols.Add(protocol);
541            }
542        }
543
544        /// <summary>
545        /// Wakeup this scheduler
546        /// </summary>
547        /// <param name="protocol"></param>
548        public override void Wakeup(ProtocolBase protocol)
549        {
550            lock (this)
551            {
552                if (!this.waitingProtocols.Contains(protocol))
553                    this.waitingProtocols.Enqueue(protocol);
554                this.wakeup.Set();
555            }
556        }
557
558        /// <summary>
559        /// Terminates the scheduler
560        /// </summary>
561        public override void Shutdown()
562        {
563            this.shutdown = true;
564            this.wakeup.Set();
565        }
566    }
567}
Note: See TracBrowser for help on using the repository browser.