source: trunk/CrypPlugins/WorkspaceManager/Execution/ExecutionEngine.cs @ 1769

Last change on this file since 1769 was 1769, checked in by kopal, 11 years ago
  • added RepeatStart Property to PluginModel
  • some optimizations on executionEngine
File size: 19.5 KB
Line 
1/*                             
2   Copyright 2010 Nils Kopal, Viktor M.
3
4   Licensed under the Apache License, Version 2.0 (the "License");
5   you may not use this file except in compliance with the License.
6   You may obtain a copy of the License at
7
8       http://www.apache.org/licenses/LICENSE-2.0
9
10   Unless required by applicable law or agreed to in writing, software
11   distributed under the License is distributed on an "AS IS" BASIS,
12   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   See the License for the specific language governing permissions and
14   limitations under the License.
15*/
16
17using System;
18using System.Collections.Generic;
19using System.Linq;
20using System.Text;
21
22using WorkspaceManager.Model;
23using System.Threading;
24using System.Collections;
25using Cryptool.PluginBase;
26using System.Reflection;
27using Gears4Net;
28using System.Windows.Threading;
29using System.Runtime.Remoting.Contexts;
30
31namespace WorkspaceManager.Execution
32{
33    /// <summary>
34    /// Engine to execute a model of the WorkspaceManager
35    /// This class needs a WorkspaceManager to be instantiated
36    /// To run an execution process it also needs a WorkspaceModel
37    ///
38    /// This class uses Gears4Net to execute the plugins
39    /// </summary>
40    public class ExecutionEngine
41    {
42        private WorkspaceManager WorkspaceManagerEditor;
43        private Scheduler[] schedulers;
44        private WorkspaceModel workspaceModel;
45
46        public long ExecutedPluginsCounter { get; set; }
47        public bool BenchmarkPlugins { get; set; }
48        public long GuiUpdateInterval { get; set; }
49
50        /// <summary>
51        /// Creates a new ExecutionEngine
52        /// </summary>
53        /// <param name="workspaceManagerEditor"></param>
54        public ExecutionEngine(WorkspaceManager workspaceManagerEditor)
55        {
56            WorkspaceManagerEditor = workspaceManagerEditor;
57        }
58
59        /// <summary>
60        /// Is this ExecutionEngine running?
61        /// </summary>
62        public bool IsRunning
63        {
64            get;
65            private set;
66        }
67
68        /// <summary>
69        /// Execute the given Model
70        /// </summary>
71        /// <param name="workspaceModel"></param>
72        public void Execute(WorkspaceModel workspaceModel)
73        {
74            this.workspaceModel = workspaceModel;
75
76            if (!IsRunning)
77            {
78                IsRunning = true;
79                int amountSchedulers = System.Environment.ProcessorCount * 2;
80               
81                //Here we create n = "ProcessorsCount * 2" Gears4Net schedulers
82                //We do this, because measurements showed that we get the best performance if we
83                //use this amount of schedulers
84                schedulers = new Scheduler[amountSchedulers];
85                for (int i = 0; i < amountSchedulers; i++)
86                {
87                    schedulers[i] = new WorkspaceManagerScheduler("Scheduler" + i);                   
88                }
89               
90                //We have to reset all states of PluginModels, ConnectorModels and ConnectionModels:
91                workspaceModel.resetStates();
92
93                //The UpdateGuiProtocol is a kind of "daemon" which will update the view elements if necessary
94                UpdateGuiProtocol updateGuiProtocol = new UpdateGuiProtocol(schedulers[0], workspaceModel, this);
95                schedulers[0].AddProtocol(updateGuiProtocol);
96                updateGuiProtocol.Start();
97
98                //The BenchmarkProtocl counts the amount of executed plugins per seconds and writes this to debug
99                if (this.BenchmarkPlugins)
100                {
101                    BenchmarkProtocol benchmarkProtocol = new BenchmarkProtocol(schedulers[0], this.workspaceModel, this);
102                    schedulers[0].AddProtocol(benchmarkProtocol);
103                    benchmarkProtocol.Start();
104                }
105
106                //Here we create for each PluginModel an own PluginProtocol
107                //By using round-robin we give each protocol to another scheduler to gain
108                //a good average load balancing of the schedulers
109                //we also initalize each plugin
110                int counter=0;
111                foreach (PluginModel pluginModel in workspaceModel.AllPluginModels)
112                {
113                    pluginModel.Plugin.PreExecution();
114                    PluginProtocol pluginProtocol = new PluginProtocol(schedulers[counter], pluginModel,this);
115                    pluginModel.PluginProtocol = pluginProtocol;
116                    schedulers[counter].AddProtocol(pluginProtocol);
117                   
118                    pluginProtocol.Start();
119                    counter = (counter + 1) % (amountSchedulers);
120
121                    if (pluginModel.Startable)
122                    {
123                        MessageExecution msg = new MessageExecution();
124                        msg.PluginModel = pluginModel;
125                        pluginProtocol.BroadcastMessageReliably(msg);
126                    }
127                }
128
129                foreach (Scheduler scheduler in schedulers)
130                {
131                    ((WorkspaceManagerScheduler)scheduler).startScheduler();
132                }
133            }
134        }     
135     
136        /// <summary>
137        /// Stop the execution process:
138        /// calls shutdown on all schedulers + calls stop() on each plugin
139        /// </summary>
140        public void Stop()
141        {
142            //First stop alle plugins
143            foreach (PluginModel pluginModel in workspaceModel.AllPluginModels)
144            {
145                pluginModel.Plugin.Stop();
146                pluginModel.Plugin.PostExecution();
147            }           
148
149            IsRunning = false;
150            //Secondly stop all Gears4Net Schedulers
151            foreach (Scheduler scheduler in schedulers)
152            {
153                scheduler.Shutdown();
154            }
155             
156        }
157
158        /// <summary>
159        /// Pause the execution
160        /// </summary>
161        public void Pause()
162        {
163            //not implemented yet
164        }
165
166        /// <summary>
167        /// Use the logger of the WorkspaceManagerEditor
168        /// </summary>
169        /// <param name="message"></param>
170        /// <param name="level"></param>
171        public void GuiLogMessage(string message, NotificationLevel level)
172        {           
173            WorkspaceManagerEditor.GuiLogMessage(message, level);
174        }           
175    }
176 
177    /// <summary>
178    /// Message send to scheduler for a Plugin to trigger the Execution
179    /// </summary>
180    public class MessageExecution : MessageBase
181    {
182        public PluginModel PluginModel;
183    }
184
185    /// <summary>
186    /// A Protocol for updating the GUI in time intervals
187    /// </summary>
188    public class UpdateGuiProtocol : ProtocolBase
189    {
190        private WorkspaceModel workspaceModel;
191        private ExecutionEngine executionEngine;     
192
193        /// <summary>
194        /// Create a new protocol. Each protocol requires a scheduler which provides
195        /// a thread for execution.
196        /// </summary>
197        /// <param name="scheduler"></param>
198        public UpdateGuiProtocol(Scheduler scheduler, WorkspaceModel workspaceModel, ExecutionEngine executionEngine)
199            : base(scheduler)
200        {
201            this.workspaceModel = workspaceModel;
202            this.executionEngine = executionEngine;           
203        }
204
205        /// <summary>
206        /// The main function of the protocol
207        /// </summary>
208        /// <param name="stateMachine"></param>
209        /// <returns></returns>
210        public override System.Collections.Generic.IEnumerator<ReceiverBase> Execute(AbstractStateMachine stateMachine)
211        {
212            while (this.executionEngine.IsRunning)
213            {
214                yield return Timeout(this.executionEngine.GuiUpdateInterval, HandleUpdateGui);
215            }
216        }
217
218        /// <summary>
219        /// Handler function for a message.
220        /// This handler must not block, because it executes inside the thread of the scheduler.
221        /// </summary>
222        /// <param name="msg"></param>
223        private void HandleUpdateGui()
224        {
225            //Get the gui Thread
226            this.workspaceModel.WorkspaceManagerEditor.Presentation.Dispatcher.Invoke(DispatcherPriority.Normal, (SendOrPostCallback)delegate
227            {
228                foreach (PluginModel pluginModel in workspaceModel.AllPluginModels)
229                {
230                    if (pluginModel.GuiNeedsUpdate)
231                    {
232                        pluginModel.GuiNeedsUpdate = false;
233                        pluginModel.paint();
234                        if (pluginModel.UpdateableView != null)
235                        {
236                            pluginModel.UpdateableView.update();
237                        }
238                    }
239                }
240                foreach (ConnectionModel connectionModel in workspaceModel.AllConnectionModels)
241                {
242                    if (connectionModel.GuiNeedsUpdate)
243                    {
244                        if (connectionModel.UpdateableView != null)
245                        {
246                            connectionModel.UpdateableView.update();
247                        }
248                    }
249                }
250            }
251            , null);
252        }
253    }
254   
255    /// <summary>
256    /// A Protocol for benchmarking
257    /// </summary>
258    public class BenchmarkProtocol : ProtocolBase
259    {
260        private WorkspaceModel workspaceModel;
261        private ExecutionEngine executionEngine;
262
263        /// <summary>
264        /// Create a new protocol. Each protocol requires a scheduler which provides
265        /// a thread for execution.
266        /// </summary>
267        /// <param name="scheduler"></param>
268        public BenchmarkProtocol(Scheduler scheduler, WorkspaceModel workspaceModel, ExecutionEngine executionEngine)
269            : base(scheduler)
270        {
271            this.workspaceModel = workspaceModel;
272            this.executionEngine = executionEngine;
273        }
274
275        /// <summary>
276        /// The main function of the protocol
277        /// </summary>
278        /// <param name="stateMachine"></param>
279        /// <returns></returns>
280        public override System.Collections.Generic.IEnumerator<ReceiverBase> Execute(AbstractStateMachine stateMachine)
281        {
282            while (this.executionEngine.IsRunning)
283            {
284                yield return Timeout(1000, HandleBenchmark);
285            }
286        }
287
288        /// <summary>
289        /// Handler function for a message.
290        /// This handler must not block, because it executes inside the thread of the scheduler.
291        /// </summary>
292        /// <param name="msg"></param>
293        private void HandleBenchmark()
294        {
295            this.workspaceModel.WorkspaceManagerEditor.GuiLogMessage("Executing at about " + this.executionEngine.ExecutedPluginsCounter + " Plugins/s", NotificationLevel.Debug);
296            this.executionEngine.ExecutedPluginsCounter = 0;
297        }
298
299    }
300
301    /// <summary>
302    /// A Protocol for a PluginModel
303    /// </summary>
304    public class PluginProtocol : ProtocolBase
305    {
306        public PluginModel PluginModel;
307        private ExecutionEngine executionEngine;
308
309        /// <summary>
310        /// Create a new protocol. Each protocol requires a scheduler which provides
311        /// a thread for execution.
312        /// </summary>
313        /// <param name="scheduler"></param>
314        public PluginProtocol(Scheduler scheduler, PluginModel pluginModel,ExecutionEngine executionEngine)
315            : base(scheduler)
316        {
317            this.PluginModel = pluginModel;
318            this.executionEngine = executionEngine;
319        }
320
321        /// <summary>
322        /// The main function of the protocol     
323        /// </summary>
324        /// <param name="stateMachine"></param>
325        /// <returns></returns>
326        public override System.Collections.Generic.IEnumerator<ReceiverBase> Execute(AbstractStateMachine stateMachine)
327        {
328            while (this.executionEngine.IsRunning)
329            {
330                yield return Receive<MessageExecution>(null, this.HandleExecute);               
331            }
332        }
333
334        /// <summary>
335        /// Call the execution function of the wrapped IPlugin
336        /// </summary>
337        /// <param name="msg"></param>
338        private void HandleExecute(MessageExecution msg)
339        {
340           
341            //executionEngine.GuiLogMessage("HandleExecute for \"" + msg.PluginModel.Name + "\"", NotificationLevel.Debug);
342            //Fill the plugins Inputs with data
343            foreach (ConnectorModel connectorModel in PluginModel.InputConnectors)
344            {
345                if (connectorModel.HasData)
346                {
347                    try
348                    {
349                        if (connectorModel.IsDynamic)
350                        {
351                            MethodInfo propertyInfo = PluginModel.Plugin.GetType().GetMethod(connectorModel.DynamicSetterName);
352                            propertyInfo.Invoke(PluginModel.Plugin, new object[] { connectorModel.PropertyName, connectorModel.Data });
353                        }
354                        else
355                        {
356                            PropertyInfo propertyInfo = PluginModel.Plugin.GetType().GetProperty(connectorModel.PropertyName);
357                            propertyInfo.SetValue(PluginModel.Plugin, connectorModel.Data, null);
358                        }
359
360                    }
361                    catch (Exception ex)
362                    {
363                        this.PluginModel.WorkspaceModel.WorkspaceManagerEditor.GuiLogMessage("An error occured while setting value of connector \"" + connectorModel.Name + "\" of \"" + PluginModel + "\": " + ex.Message, NotificationLevel.Error);
364                        this.PluginModel.State = PluginModelState.Error;
365                        this.PluginModel.GuiNeedsUpdate = true;
366                        return;
367                    }
368                }
369            }
370
371            msg.PluginModel.Plugin.Execute();
372
373            if (this.executionEngine.BenchmarkPlugins)
374            {
375                this.executionEngine.ExecutedPluginsCounter++;
376            }
377
378            foreach (ConnectorModel connectorModel in PluginModel.InputConnectors)
379            {
380                if (connectorModel.HasData)
381                {
382
383                    connectorModel.HasData = false;
384                    connectorModel.Data = null;
385                    foreach (ConnectionModel connectionModel in connectorModel.InputConnections)
386                    {
387                        connectionModel.Active = false;
388                        connectorModel.GuiNeedsUpdate = true;
389
390                        if (!connectionModel.From.PluginModel.Startable || (connectionModel.From.PluginModel.Startable && connectionModel.From.PluginModel.RepeatStart))
391                        {
392                            connectionModel.From.PluginModel.checkExecutable(connectionModel.From.PluginModel.PluginProtocol);
393                        }
394                    }
395                }
396            }             
397        }
398     
399    }
400
401    /// <summary>
402    /// Gears4Net Scheduler. The scheduler only runs protocols which do not have a waiting
403    /// plugin on the protocol plugins outputs
404    /// </summary>
405    public class WorkspaceManagerScheduler : Scheduler
406    {
407        private System.Threading.AutoResetEvent wakeup = new System.Threading.AutoResetEvent(false);
408        private bool shutdown = false;
409        private System.Threading.Thread thread;
410        private Context currentContext;
411
412                public WorkspaceManagerScheduler() : this(String.Empty)
413                {
414
415                }
416
417        public WorkspaceManagerScheduler(string name)
418            : base()
419        {
420            this.currentContext = Thread.CurrentContext;
421
422            thread = new System.Threading.Thread(this.Start);
423            thread.SetApartmentState(System.Threading.ApartmentState.MTA);
424                        thread.Name = name;
425           
426        }
427
428        public void startScheduler()
429        {
430            thread.Start();
431        }
432
433        private void Start()
434        {
435            if (this.currentContext != Thread.CurrentContext)
436                this.currentContext.DoCallBack(Start);
437
438            // Loop forever
439            while (true)
440            {
441                this.wakeup.WaitOne();
442
443                // Loop while there are more protocols waiting
444                while (true)
445                {
446                    // Should the scheduler stop?
447                    if (this.shutdown)
448                        return;
449                   
450                    ProtocolBase protocol = null;
451                    lock (this)
452                    {
453                        // No more protocols? -> Wait
454                        if (this.waitingProtocols.Count == 0)
455                            break;
456                    }
457                    protocol = this.waitingProtocols.Dequeue();                                           
458                    ProtocolStatus status = protocol.Run();
459
460                    lock (this)
461                    {
462                        switch (status)
463                        {
464                            case ProtocolStatus.Created:
465                                System.Diagnostics.Debug.Assert(false);
466                                break;
467                            case ProtocolStatus.Ready:
468                                this.waitingProtocols.Enqueue(protocol);
469                                break;
470                            case ProtocolStatus.Waiting:
471                                break;
472                            case ProtocolStatus.Terminated:
473                                System.Diagnostics.Debug.Assert(!this.waitingProtocols.Contains(protocol));
474                                this.RemoveProtocol(protocol);
475                                break;
476                        }
477                    }
478                }
479            }
480        }
481
482        /// <summary>
483        /// Removes a protocol from the internal queue
484        /// </summary>
485        /// <param name="protocol"></param>
486        public override void RemoveProtocol(ProtocolBase protocol)
487        {
488            lock (this)
489            {
490                this.protocols.Remove(protocol);
491                if (this.protocols.Count == 0)
492                    this.Shutdown();
493            }
494        }
495
496        /// <summary>
497        /// Adds a protocol to the internal queue
498        /// </summary>
499        /// <param name="protocol"></param>
500        public override void AddProtocol(ProtocolBase protocol)
501        {
502            lock (this)
503            {
504                this.protocols.Add(protocol);
505            }
506        }
507
508        /// <summary>
509        /// Wakeup this scheduler
510        /// </summary>
511        /// <param name="protocol"></param>
512        public override void Wakeup(ProtocolBase protocol)
513        {
514            lock (this)
515            {
516                if (!this.waitingProtocols.Contains(protocol))
517                    this.waitingProtocols.Enqueue(protocol);
518                this.wakeup.Set();
519            }
520        }
521
522        /// <summary>
523        /// Terminates the scheduler
524        /// </summary>
525        public override void Shutdown()
526        {
527            this.shutdown = true;
528            this.wakeup.Set();
529        }
530    }
531}
Note: See TracBrowser for help on using the repository browser.