source: trunk/CrypPlugins/WorkspaceManager/Execution/ExecutionEngine.cs @ 1759

Last change on this file since 1759 was 1759, checked in by kopal, 11 years ago

optimized executionEngine

File size: 19.4 KB
Line 
1/*                             
2   Copyright 2010 Nils Kopal, Viktor M.
3
4   Licensed under the Apache License, Version 2.0 (the "License");
5   you may not use this file except in compliance with the License.
6   You may obtain a copy of the License at
7
8       http://www.apache.org/licenses/LICENSE-2.0
9
10   Unless required by applicable law or agreed to in writing, software
11   distributed under the License is distributed on an "AS IS" BASIS,
12   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   See the License for the specific language governing permissions and
14   limitations under the License.
15*/
16
17using System;
18using System.Collections.Generic;
19using System.Linq;
20using System.Text;
21
22using WorkspaceManager.Model;
23using System.Threading;
24using System.Collections;
25using Cryptool.PluginBase;
26using System.Reflection;
27using Gears4Net;
28using System.Windows.Threading;
29using System.Runtime.Remoting.Contexts;
30
31namespace WorkspaceManager.Execution
32{
33    /// <summary>
34    /// Engine to execute a model of the WorkspaceManager
35    /// This class needs a WorkspaceManager to be instantiated
36    /// To run an execution process it also needs a WorkspaceModel
37    ///
38    /// This class uses Gears4Net to execute the plugins
39    /// </summary>
40    public class ExecutionEngine
41    {
42        private WorkspaceManager WorkspaceManagerEditor;
43        private Scheduler[] schedulers;
44        private WorkspaceModel workspaceModel;
45
46        public long ExecutedPluginsCounter { get; set; }
47        public bool BenchmarkPlugins { get; set; }
48        public long GuiUpdateInterval { get; set; }
49
50        /// <summary>
51        /// Creates a new ExecutionEngine
52        /// </summary>
53        /// <param name="workspaceManagerEditor"></param>
54        public ExecutionEngine(WorkspaceManager workspaceManagerEditor)
55        {
56            WorkspaceManagerEditor = workspaceManagerEditor;
57        }
58
59        /// <summary>
60        /// Is this ExecutionEngine running?
61        /// </summary>
62        public bool IsRunning
63        {
64            get;
65            private set;
66        }
67
68        /// <summary>
69        /// Execute the given Model
70        /// </summary>
71        /// <param name="workspaceModel"></param>
72        public void Execute(WorkspaceModel workspaceModel)
73        {
74            this.workspaceModel = workspaceModel;
75
76            if (!IsRunning)
77            {
78                IsRunning = true;
79
80                //Here we create n = "ProcessorsCount * 2" Gears4Net schedulers
81                //We do this, because measurements showed that we get the best performance if we
82                //use this amount of schedulers
83                schedulers = new Scheduler[System.Environment.ProcessorCount*2];
84                for(int i=0;i< System.Environment.ProcessorCount*2;i++){
85                    schedulers[i] = new WorkspaceManagerScheduler("Scheduler" + i);                   
86                }
87               
88                //We have to reset all states of PluginModels, ConnectorModels and ConnectionModels:
89                workspaceModel.resetStates();
90
91                //The UpdateGuiProtocol is a kind of "daemon" which will update the view elements if necessary
92                UpdateGuiProtocol updateGuiProtocol = new UpdateGuiProtocol(schedulers[0], workspaceModel, this);
93                schedulers[0].AddProtocol(updateGuiProtocol);
94                updateGuiProtocol.Start();
95
96                //The BenchmarkProtocl counts the amount of executed plugins per seconds and writes this to debug
97                if (this.BenchmarkPlugins)
98                {
99                    BenchmarkProtocol benchmarkProtocol = new BenchmarkProtocol(schedulers[0], this.workspaceModel, this);
100                    schedulers[0].AddProtocol(benchmarkProtocol);
101                    benchmarkProtocol.Start();
102                }
103
104                //Here we create for each PluginModel an own PluginProtocol
105                //By using round-robin we give each protocol to another scheduler to gain
106                //a good average load balancing of the schedulers
107                //we also initalize each plugin
108                int counter=0;
109                foreach (PluginModel pluginModel in workspaceModel.AllPluginModels)
110                {
111                    pluginModel.Plugin.PreExecution();
112                    PluginProtocol pluginProtocol = new PluginProtocol(schedulers[counter], pluginModel,this);
113                    pluginModel.PluginProtocol = pluginProtocol;
114                    schedulers[counter].AddProtocol(pluginProtocol);
115                   
116                    pluginProtocol.Start();
117                    counter = (counter + 1) % (System.Environment.ProcessorCount*2);
118
119                    if (pluginModel.Startable)
120                    {
121                        MessageExecution msg = new MessageExecution();
122                        msg.PluginModel = pluginModel;
123                        pluginProtocol.BroadcastMessageReliably(msg);
124                    }
125                }
126
127                foreach (Scheduler scheduler in schedulers)
128                {
129                    ((WorkspaceManagerScheduler)scheduler).startScheduler();
130                }
131            }
132        }     
133     
134        /// <summary>
135        /// Stop the execution process:
136        /// calls shutdown on all schedulers + calls stop() on each plugin
137        /// </summary>
138        public void Stop()
139        {
140            //First stop alle plugins
141            foreach (PluginModel pluginModel in workspaceModel.AllPluginModels)
142            {
143                pluginModel.Plugin.Stop();
144                pluginModel.Plugin.PostExecution();
145            }           
146
147            IsRunning = false;
148            //Secondly stop all Gears4Net Schedulers
149            foreach (Scheduler scheduler in schedulers)
150            {
151                scheduler.Shutdown();
152            }
153             
154        }
155
156        /// <summary>
157        /// Pause the execution
158        /// </summary>
159        public void Pause()
160        {
161            //not implemented yet
162        }
163
164        /// <summary>
165        /// Use the logger of the WorkspaceManagerEditor
166        /// </summary>
167        /// <param name="message"></param>
168        /// <param name="level"></param>
169        public void GuiLogMessage(string message, NotificationLevel level)
170        {           
171            WorkspaceManagerEditor.GuiLogMessage(message, level);
172        }           
173    }
174 
175    /// <summary>
176    /// Message send to scheduler for a Plugin to trigger the Execution
177    /// </summary>
178    public class MessageExecution : MessageBase
179    {
180        public PluginModel PluginModel;
181    }
182
183    /// <summary>
184    /// A Protocol for updating the GUI in time intervals
185    /// </summary>
186    public class UpdateGuiProtocol : ProtocolBase
187    {
188        private WorkspaceModel workspaceModel;
189        private ExecutionEngine executionEngine;     
190
191        /// <summary>
192        /// Create a new protocol. Each protocol requires a scheduler which provides
193        /// a thread for execution.
194        /// </summary>
195        /// <param name="scheduler"></param>
196        public UpdateGuiProtocol(Scheduler scheduler, WorkspaceModel workspaceModel, ExecutionEngine executionEngine)
197            : base(scheduler)
198        {
199            this.workspaceModel = workspaceModel;
200            this.executionEngine = executionEngine;           
201        }
202
203        /// <summary>
204        /// The main function of the protocol
205        /// </summary>
206        /// <param name="stateMachine"></param>
207        /// <returns></returns>
208        public override System.Collections.Generic.IEnumerator<ReceiverBase> Execute(AbstractStateMachine stateMachine)
209        {
210            while (this.executionEngine.IsRunning)
211            {
212                yield return Timeout(this.executionEngine.GuiUpdateInterval, HandleUpdateGui);
213            }
214        }
215
216        /// <summary>
217        /// Handler function for a message.
218        /// This handler must not block, because it executes inside the thread of the scheduler.
219        /// </summary>
220        /// <param name="msg"></param>
221        private void HandleUpdateGui()
222        {
223            //Get the gui Thread
224            this.workspaceModel.WorkspaceManagerEditor.Presentation.Dispatcher.Invoke(DispatcherPriority.Normal, (SendOrPostCallback)delegate
225            {
226                foreach (PluginModel pluginModel in workspaceModel.AllPluginModels)
227                {
228                    if (pluginModel.GuiNeedsUpdate)
229                    {
230                        pluginModel.GuiNeedsUpdate = false;
231                        pluginModel.paint();
232                        if (pluginModel.UpdateableView != null)
233                        {
234                            pluginModel.UpdateableView.update();
235                        }
236                    }
237                }
238                foreach (ConnectionModel connectionModel in workspaceModel.AllConnectionModels)
239                {
240                    if (connectionModel.GuiNeedsUpdate)
241                    {
242                        if (connectionModel.UpdateableView != null)
243                        {
244                            connectionModel.UpdateableView.update();
245                        }
246                    }
247                }
248            }
249            , null);
250        }
251    }
252   
253    /// <summary>
254    /// A Protocol for benchmarking
255    /// </summary>
256    public class BenchmarkProtocol : ProtocolBase
257    {
258        private WorkspaceModel workspaceModel;
259        private ExecutionEngine executionEngine;
260
261        /// <summary>
262        /// Create a new protocol. Each protocol requires a scheduler which provides
263        /// a thread for execution.
264        /// </summary>
265        /// <param name="scheduler"></param>
266        public BenchmarkProtocol(Scheduler scheduler, WorkspaceModel workspaceModel, ExecutionEngine executionEngine)
267            : base(scheduler)
268        {
269            this.workspaceModel = workspaceModel;
270            this.executionEngine = executionEngine;
271        }
272
273        /// <summary>
274        /// The main function of the protocol
275        /// </summary>
276        /// <param name="stateMachine"></param>
277        /// <returns></returns>
278        public override System.Collections.Generic.IEnumerator<ReceiverBase> Execute(AbstractStateMachine stateMachine)
279        {
280            while (this.executionEngine.IsRunning)
281            {
282                yield return Timeout(1000, HandleBenchmark);
283            }
284        }
285
286        /// <summary>
287        /// Handler function for a message.
288        /// This handler must not block, because it executes inside the thread of the scheduler.
289        /// </summary>
290        /// <param name="msg"></param>
291        private void HandleBenchmark()
292        {
293            this.workspaceModel.WorkspaceManagerEditor.GuiLogMessage("Executing at about " + this.executionEngine.ExecutedPluginsCounter + " Plugins/s", NotificationLevel.Debug);
294            this.executionEngine.ExecutedPluginsCounter = 0;
295        }
296
297    }
298
299    /// <summary>
300    /// A Protocol for a PluginModel
301    /// </summary>
302    public class PluginProtocol : ProtocolBase
303    {
304        public PluginModel PluginModel;
305        private ExecutionEngine executionEngine;
306
307        /// <summary>
308        /// Create a new protocol. Each protocol requires a scheduler which provides
309        /// a thread for execution.
310        /// </summary>
311        /// <param name="scheduler"></param>
312        public PluginProtocol(Scheduler scheduler, PluginModel pluginModel,ExecutionEngine executionEngine)
313            : base(scheduler)
314        {
315            this.PluginModel = pluginModel;
316            this.executionEngine = executionEngine;
317        }
318
319        /// <summary>
320        /// The main function of the protocol     
321        /// </summary>
322        /// <param name="stateMachine"></param>
323        /// <returns></returns>
324        public override System.Collections.Generic.IEnumerator<ReceiverBase> Execute(AbstractStateMachine stateMachine)
325        {
326            while (this.executionEngine.IsRunning)
327            {
328                yield return Receive<MessageExecution>(null, this.HandleExecute);               
329            }
330        }
331
332        /// <summary>
333        /// Call the execution function of the wrapped IPlugin
334        /// </summary>
335        /// <param name="msg"></param>
336        private void HandleExecute(MessageExecution msg)
337        {
338            //executionEngine.GuiLogMessage("HandleExecute for \"" + msg.PluginModel.Name + "\"", NotificationLevel.Debug);
339            //Fill the plugins Inputs with data
340            foreach (ConnectorModel connectorModel in PluginModel.InputConnectors)
341            {
342                if (connectorModel.HasData)
343                {
344                    try
345                    {
346                        if (connectorModel.IsDynamic)
347                        {
348                            MethodInfo propertyInfo = PluginModel.Plugin.GetType().GetMethod(connectorModel.DynamicSetterName);
349                            propertyInfo.Invoke(PluginModel.Plugin, new object[] { connectorModel.PropertyName, connectorModel.Data });
350                        }
351                        else
352                        {
353                            PropertyInfo propertyInfo = PluginModel.Plugin.GetType().GetProperty(connectorModel.PropertyName);
354                            propertyInfo.SetValue(PluginModel.Plugin, connectorModel.Data, null);
355                        }
356
357                    }
358                    catch (Exception ex)
359                    {
360                        this.PluginModel.WorkspaceModel.WorkspaceManagerEditor.GuiLogMessage("An error occured while setting value of connector \"" + connectorModel.Name + "\" of \"" + PluginModel + "\": " + ex.Message, NotificationLevel.Error);
361                        this.PluginModel.State = PluginModelState.Error;
362                        this.PluginModel.GuiNeedsUpdate = true;
363                        return;
364                    }
365                }
366            }
367           
368            msg.PluginModel.Plugin.Execute();
369           
370            if (this.executionEngine.BenchmarkPlugins)
371            {
372                this.executionEngine.ExecutedPluginsCounter++;
373            }
374
375            foreach (ConnectorModel connectorModel in PluginModel.InputConnectors)
376            {
377                if (connectorModel.HasData)
378                {
379
380                    connectorModel.HasData = false;
381                    connectorModel.Data = null;
382                    foreach (ConnectionModel connectionModel in connectorModel.InputConnections)
383                    {
384                        connectionModel.Active = false;
385                        connectorModel.GuiNeedsUpdate = true;
386
387                        if (!connectionModel.From.PluginModel.Startable) 
388                        {
389                            connectionModel.From.PluginModel.checkExecutable(connectionModel.From.PluginModel.PluginProtocol);
390                        }
391                    }                   
392                }
393            }
394           
395        }
396     
397    }
398
399    /// <summary>
400    /// Gears4Net Scheduler. The scheduler only runs protocols which do not have a waiting
401    /// plugin on the protocol plugins outputs
402    /// </summary>
403    public class WorkspaceManagerScheduler : Scheduler
404    {
405        private System.Threading.AutoResetEvent wakeup = new System.Threading.AutoResetEvent(false);
406        private bool shutdown = false;
407        private System.Threading.Thread thread;
408        private Context currentContext;
409
410                public WorkspaceManagerScheduler() : this(String.Empty)
411                {
412
413                }
414
415        public WorkspaceManagerScheduler(string name)
416            : base()
417        {
418            this.currentContext = Thread.CurrentContext;
419
420            thread = new System.Threading.Thread(this.Start);
421            thread.SetApartmentState(System.Threading.ApartmentState.MTA);
422                        thread.Name = name;
423           
424        }
425
426        public void startScheduler()
427        {
428            thread.Start();
429        }
430
431        private void Start()
432        {
433            if (this.currentContext != Thread.CurrentContext)
434                this.currentContext.DoCallBack(Start);
435
436            // Loop forever
437            while (true)
438            {
439                this.wakeup.WaitOne();
440
441                // Loop while there are more protocols waiting
442                while (true)
443                {
444                    // Should the scheduler stop?
445                    if (this.shutdown)
446                        return;
447                   
448                    ProtocolBase protocol = null;
449                    lock (this)
450                    {
451                        // No more protocols? -> Wait
452                        if (this.waitingProtocols.Count == 0)
453                            break;
454                    }
455                    protocol = this.waitingProtocols.Dequeue();                                           
456                    ProtocolStatus status = protocol.Run();
457
458                    lock (this)
459                    {
460                        switch (status)
461                        {
462                            case ProtocolStatus.Created:
463                                System.Diagnostics.Debug.Assert(false);
464                                break;
465                            case ProtocolStatus.Ready:
466                                this.waitingProtocols.Enqueue(protocol);
467                                break;
468                            case ProtocolStatus.Waiting:
469                                break;
470                            case ProtocolStatus.Terminated:
471                                System.Diagnostics.Debug.Assert(!this.waitingProtocols.Contains(protocol));
472                                this.RemoveProtocol(protocol);
473                                break;
474                        }
475                    }
476                }
477            }
478        }
479
480        /// <summary>
481        /// Removes a protocol from the internal queue
482        /// </summary>
483        /// <param name="protocol"></param>
484        public override void RemoveProtocol(ProtocolBase protocol)
485        {
486            lock (this)
487            {
488                this.protocols.Remove(protocol);
489                if (this.protocols.Count == 0)
490                    this.Shutdown();
491            }
492        }
493
494        /// <summary>
495        /// Adds a protocol to the internal queue
496        /// </summary>
497        /// <param name="protocol"></param>
498        public override void AddProtocol(ProtocolBase protocol)
499        {
500            lock (this)
501            {
502                this.protocols.Add(protocol);
503            }
504        }
505
506        /// <summary>
507        /// Wakeup this scheduler
508        /// </summary>
509        /// <param name="protocol"></param>
510        public override void Wakeup(ProtocolBase protocol)
511        {
512            lock (this)
513            {
514                if (!this.waitingProtocols.Contains(protocol))
515                    this.waitingProtocols.Enqueue(protocol);
516                this.wakeup.Set();
517            }
518        }
519
520        /// <summary>
521        /// Terminates the scheduler
522        /// </summary>
523        public override void Shutdown()
524        {
525            this.shutdown = true;
526            this.wakeup.Set();
527        }
528    }
529}
Note: See TracBrowser for help on using the repository browser.