source: trunk/CrypPlugins/WorkspaceManager/Execution/ExecutionEngine.cs @ 1770

Last change on this file since 1770 was 1770, checked in by kopal, 11 years ago
  • some executionEngine changes
File size: 19.5 KB
Line 
1/*                             
2   Copyright 2010 Nils Kopal, Viktor M.
3
4   Licensed under the Apache License, Version 2.0 (the "License");
5   you may not use this file except in compliance with the License.
6   You may obtain a copy of the License at
7
8       http://www.apache.org/licenses/LICENSE-2.0
9
10   Unless required by applicable law or agreed to in writing, software
11   distributed under the License is distributed on an "AS IS" BASIS,
12   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   See the License for the specific language governing permissions and
14   limitations under the License.
15*/
16
17using System;
18using System.Collections.Generic;
19using System.Linq;
20using System.Text;
21
22using WorkspaceManager.Model;
23using System.Threading;
24using System.Collections;
25using Cryptool.PluginBase;
26using System.Reflection;
27using Gears4Net;
28using System.Windows.Threading;
29using System.Runtime.Remoting.Contexts;
30
31namespace WorkspaceManager.Execution
32{
33    /// <summary>
34    /// Engine to execute a model of the WorkspaceManager
35    /// This class needs a WorkspaceManager to be instantiated
36    /// To run an execution process it also needs a WorkspaceModel
37    ///
38    /// This class uses Gears4Net to execute the plugins
39    /// </summary>
40    public class ExecutionEngine
41    {
42        private WorkspaceManager WorkspaceManagerEditor;
43        private Scheduler[] schedulers;
44        private WorkspaceModel workspaceModel;
45
46        public long ExecutedPluginsCounter { get; set; }
47        public bool BenchmarkPlugins { get; set; }
48        public long GuiUpdateInterval { get; set; }
49
50        /// <summary>
51        /// Creates a new ExecutionEngine
52        /// </summary>
53        /// <param name="workspaceManagerEditor"></param>
54        public ExecutionEngine(WorkspaceManager workspaceManagerEditor)
55        {
56            WorkspaceManagerEditor = workspaceManagerEditor;
57        }
58
59        /// <summary>
60        /// Is this ExecutionEngine running?
61        /// </summary>
62        public bool IsRunning
63        {
64            get;
65            private set;
66        }
67
68        /// <summary>
69        /// Execute the given Model
70        /// </summary>
71        /// <param name="workspaceModel"></param>
72        public void Execute(WorkspaceModel workspaceModel)
73        {
74            this.workspaceModel = workspaceModel;
75
76            if (!IsRunning)
77            {
78                IsRunning = true;
79                int amountSchedulers = System.Environment.ProcessorCount * 2;
80                //amountSchedulers = 1;
81
82                //Here we create n = "ProcessorsCount * 2" Gears4Net schedulers
83                //We do this, because measurements showed that we get the best performance if we
84                //use this amount of schedulers
85                schedulers = new Scheduler[amountSchedulers];
86                for (int i = 0; i < amountSchedulers; i++)
87                {
88                    schedulers[i] = new WorkspaceManagerScheduler("Scheduler" + i);                   
89                }
90               
91                //We have to reset all states of PluginModels, ConnectorModels and ConnectionModels:
92                workspaceModel.resetStates();
93
94                //The UpdateGuiProtocol is a kind of "daemon" which will update the view elements if necessary
95                UpdateGuiProtocol updateGuiProtocol = new UpdateGuiProtocol(schedulers[0], workspaceModel, this);
96                schedulers[0].AddProtocol(updateGuiProtocol);
97                updateGuiProtocol.Start();
98
99                //The BenchmarkProtocl counts the amount of executed plugins per seconds and writes this to debug
100                if (this.BenchmarkPlugins)
101                {
102                    BenchmarkProtocol benchmarkProtocol = new BenchmarkProtocol(schedulers[0], this.workspaceModel, this);
103                    schedulers[0].AddProtocol(benchmarkProtocol);
104                    benchmarkProtocol.Start();
105                }
106
107                //Here we create for each PluginModel an own PluginProtocol
108                //By using round-robin we give each protocol to another scheduler to gain
109                //a good average load balancing of the schedulers
110                //we also initalize each plugin
111                int counter=0;
112                foreach (PluginModel pluginModel in workspaceModel.AllPluginModels)
113                {
114                    pluginModel.Plugin.PreExecution();
115                    PluginProtocol pluginProtocol = new PluginProtocol(schedulers[counter], pluginModel,this);
116                    pluginModel.PluginProtocol = pluginProtocol;
117                    schedulers[counter].AddProtocol(pluginProtocol);
118                   
119                    pluginProtocol.Start();
120                    counter = (counter + 1) % (amountSchedulers);
121
122                    if (pluginModel.Startable)
123                    {
124                        MessageExecution msg = new MessageExecution();
125                        msg.PluginModel = pluginModel;
126                        pluginProtocol.BroadcastMessageReliably(msg);
127                    }
128                }
129
130                foreach (Scheduler scheduler in schedulers)
131                {
132                    ((WorkspaceManagerScheduler)scheduler).startScheduler();
133                }
134            }
135        }     
136     
137        /// <summary>
138        /// Stop the execution process:
139        /// calls shutdown on all schedulers + calls stop() on each plugin
140        /// </summary>
141        public void Stop()
142        {
143            //First stop alle plugins
144            foreach (PluginModel pluginModel in workspaceModel.AllPluginModels)
145            {
146                pluginModel.Plugin.Stop();
147                pluginModel.Plugin.PostExecution();
148            }           
149
150            IsRunning = false;
151            //Secondly stop all Gears4Net Schedulers
152            foreach (Scheduler scheduler in schedulers)
153            {
154                scheduler.Shutdown();
155            }
156             
157        }
158
159        /// <summary>
160        /// Pause the execution
161        /// </summary>
162        public void Pause()
163        {
164            //not implemented yet
165        }
166
167        /// <summary>
168        /// Use the logger of the WorkspaceManagerEditor
169        /// </summary>
170        /// <param name="message"></param>
171        /// <param name="level"></param>
172        public void GuiLogMessage(string message, NotificationLevel level)
173        {           
174            WorkspaceManagerEditor.GuiLogMessage(message, level);
175        }           
176    }
177 
178    /// <summary>
179    /// Message send to scheduler for a Plugin to trigger the Execution
180    /// </summary>
181    public class MessageExecution : MessageBase
182    {
183        public PluginModel PluginModel;
184    }
185
186    /// <summary>
187    /// A Protocol for updating the GUI in time intervals
188    /// </summary>
189    public class UpdateGuiProtocol : ProtocolBase
190    {
191        private WorkspaceModel workspaceModel;
192        private ExecutionEngine executionEngine;     
193
194        /// <summary>
195        /// Create a new protocol. Each protocol requires a scheduler which provides
196        /// a thread for execution.
197        /// </summary>
198        /// <param name="scheduler"></param>
199        public UpdateGuiProtocol(Scheduler scheduler, WorkspaceModel workspaceModel, ExecutionEngine executionEngine)
200            : base(scheduler)
201        {
202            this.workspaceModel = workspaceModel;
203            this.executionEngine = executionEngine;           
204        }
205
206        /// <summary>
207        /// The main function of the protocol
208        /// </summary>
209        /// <param name="stateMachine"></param>
210        /// <returns></returns>
211        public override System.Collections.Generic.IEnumerator<ReceiverBase> Execute(AbstractStateMachine stateMachine)
212        {
213            while (this.executionEngine.IsRunning)
214            {
215                yield return Timeout(this.executionEngine.GuiUpdateInterval, HandleUpdateGui);
216            }
217        }
218
219        /// <summary>
220        /// Handler function for a message.
221        /// This handler must not block, because it executes inside the thread of the scheduler.
222        /// </summary>
223        /// <param name="msg"></param>
224        private void HandleUpdateGui()
225        {
226            //Get the gui Thread
227            this.workspaceModel.WorkspaceManagerEditor.Presentation.Dispatcher.Invoke(DispatcherPriority.Normal, (SendOrPostCallback)delegate
228            {
229                foreach (PluginModel pluginModel in workspaceModel.AllPluginModels)
230                {
231                    if (pluginModel.GuiNeedsUpdate)
232                    {
233                        pluginModel.GuiNeedsUpdate = false;
234                        pluginModel.paint();
235                        if (pluginModel.UpdateableView != null)
236                        {
237                            pluginModel.UpdateableView.update();
238                        }
239                    }
240                }
241                foreach (ConnectionModel connectionModel in workspaceModel.AllConnectionModels)
242                {
243                    if (connectionModel.GuiNeedsUpdate)
244                    {
245                        if (connectionModel.UpdateableView != null)
246                        {
247                            connectionModel.UpdateableView.update();
248                        }
249                    }
250                }
251            }
252            , null);
253        }
254    }
255   
256    /// <summary>
257    /// A Protocol for benchmarking
258    /// </summary>
259    public class BenchmarkProtocol : ProtocolBase
260    {
261        private WorkspaceModel workspaceModel;
262        private ExecutionEngine executionEngine;
263
264        /// <summary>
265        /// Create a new protocol. Each protocol requires a scheduler which provides
266        /// a thread for execution.
267        /// </summary>
268        /// <param name="scheduler"></param>
269        public BenchmarkProtocol(Scheduler scheduler, WorkspaceModel workspaceModel, ExecutionEngine executionEngine)
270            : base(scheduler)
271        {
272            this.workspaceModel = workspaceModel;
273            this.executionEngine = executionEngine;
274        }
275
276        /// <summary>
277        /// The main function of the protocol
278        /// </summary>
279        /// <param name="stateMachine"></param>
280        /// <returns></returns>
281        public override System.Collections.Generic.IEnumerator<ReceiverBase> Execute(AbstractStateMachine stateMachine)
282        {
283            while (this.executionEngine.IsRunning)
284            {
285                yield return Timeout(1000, HandleBenchmark);
286            }
287        }
288
289        /// <summary>
290        /// Handler function for a message.
291        /// This handler must not block, because it executes inside the thread of the scheduler.
292        /// </summary>
293        /// <param name="msg"></param>
294        private void HandleBenchmark()
295        {
296            this.workspaceModel.WorkspaceManagerEditor.GuiLogMessage("Executing at about " + this.executionEngine.ExecutedPluginsCounter + " Plugins/s", NotificationLevel.Debug);
297            this.executionEngine.ExecutedPluginsCounter = 0;
298        }
299
300    }
301
302    /// <summary>
303    /// A Protocol for a PluginModel
304    /// </summary>
305    public class PluginProtocol : ProtocolBase
306    {
307        public PluginModel PluginModel;
308        private ExecutionEngine executionEngine;
309
310        /// <summary>
311        /// Create a new protocol. Each protocol requires a scheduler which provides
312        /// a thread for execution.
313        /// </summary>
314        /// <param name="scheduler"></param>
315        public PluginProtocol(Scheduler scheduler, PluginModel pluginModel,ExecutionEngine executionEngine)
316            : base(scheduler)
317        {
318            this.PluginModel = pluginModel;
319            this.executionEngine = executionEngine;
320        }
321
322        /// <summary>
323        /// The main function of the protocol     
324        /// </summary>
325        /// <param name="stateMachine"></param>
326        /// <returns></returns>
327        public override System.Collections.Generic.IEnumerator<ReceiverBase> Execute(AbstractStateMachine stateMachine)
328        {
329            while (this.executionEngine.IsRunning)
330            {
331                yield return Receive<MessageExecution>(null, this.HandleExecute);               
332            }
333        }
334
335        /// <summary>
336        /// Call the execution function of the wrapped IPlugin
337        /// </summary>
338        /// <param name="msg"></param>
339        private void HandleExecute(MessageExecution msg)
340        {
341           
342            //executionEngine.GuiLogMessage("HandleExecute for \"" + msg.PluginModel.Name + "\"", NotificationLevel.Debug);
343            //Fill the plugins Inputs with data
344            foreach (ConnectorModel connectorModel in PluginModel.InputConnectors)
345            {
346                if (connectorModel.HasData)
347                {
348                    try
349                    {
350                        if (connectorModel.IsDynamic)
351                        {
352                            MethodInfo propertyInfo = PluginModel.Plugin.GetType().GetMethod(connectorModel.DynamicSetterName);
353                            propertyInfo.Invoke(PluginModel.Plugin, new object[] { connectorModel.PropertyName, connectorModel.Data });
354                        }
355                        else
356                        {
357                            PropertyInfo propertyInfo = PluginModel.Plugin.GetType().GetProperty(connectorModel.PropertyName);
358                            propertyInfo.SetValue(PluginModel.Plugin, connectorModel.Data, null);
359                        }
360
361                    }
362                    catch (Exception ex)
363                    {
364                        this.PluginModel.WorkspaceModel.WorkspaceManagerEditor.GuiLogMessage("An error occured while setting value of connector \"" + connectorModel.Name + "\" of \"" + PluginModel + "\": " + ex.Message, NotificationLevel.Error);
365                        this.PluginModel.State = PluginModelState.Error;
366                        this.PluginModel.GuiNeedsUpdate = true;
367                        return;
368                    }
369                }
370            }
371
372            msg.PluginModel.Plugin.Execute();
373
374            if (this.executionEngine.BenchmarkPlugins)
375            {
376                this.executionEngine.ExecutedPluginsCounter++;
377            }
378
379            foreach (ConnectorModel connectorModel in PluginModel.InputConnectors)
380            {
381                if (connectorModel.HasData)
382                {
383                    connectorModel.HasData = false;
384                    connectorModel.Data = null;
385                    foreach (ConnectionModel connectionModel in connectorModel.InputConnections)
386                    {
387                        connectionModel.Active = false;
388                        connectorModel.GuiNeedsUpdate = true;
389
390                        if (!connectionModel.From.PluginModel.Startable || (connectionModel.From.PluginModel.Startable && connectionModel.From.PluginModel.RepeatStart))
391                        {
392                            connectionModel.From.PluginModel.checkExecutable(connectionModel.From.PluginModel.PluginProtocol);
393                        }
394                    }
395                }
396            }             
397        }
398     
399    }
400
401    /// <summary>
402    /// Gears4Net Scheduler. The scheduler only runs protocols which do not have a waiting
403    /// plugin on the protocol plugins outputs
404    /// </summary>
405    public class WorkspaceManagerScheduler : Scheduler
406    {
407        private System.Threading.AutoResetEvent wakeup = new System.Threading.AutoResetEvent(false);
408        private bool shutdown = false;
409        private System.Threading.Thread thread;
410        private Context currentContext;
411
412                public WorkspaceManagerScheduler() : this(String.Empty)
413                {
414
415                }
416
417        public WorkspaceManagerScheduler(string name)
418            : base()
419        {
420            this.currentContext = Thread.CurrentContext;
421
422            thread = new System.Threading.Thread(this.Start);
423            thread.SetApartmentState(System.Threading.ApartmentState.MTA);
424                        thread.Name = name;
425           
426        }
427
428        public void startScheduler()
429        {
430            thread.Start();
431        }
432
433        private void Start()
434        {
435            if (this.currentContext != Thread.CurrentContext)
436                this.currentContext.DoCallBack(Start);
437
438            // Loop forever
439            while (true)
440            {
441                this.wakeup.WaitOne();
442
443                // Loop while there are more protocols waiting
444                while (true)
445                {
446                    // Should the scheduler stop?
447                    if (this.shutdown)
448                        return;
449                   
450                    ProtocolBase protocol = null;
451                    lock (this)
452                    {
453                        // No more protocols? -> Wait
454                        if (this.waitingProtocols.Count == 0)
455                            break;
456                    }
457                    protocol = this.waitingProtocols.Dequeue();                                           
458                    ProtocolStatus status = protocol.Run();
459
460                    lock (this)
461                    {
462                        switch (status)
463                        {
464                            case ProtocolStatus.Created:
465                                System.Diagnostics.Debug.Assert(false);
466                                break;
467                            case ProtocolStatus.Ready:
468                                this.waitingProtocols.Enqueue(protocol);
469                                break;
470                            case ProtocolStatus.Waiting:
471                                break;
472                            case ProtocolStatus.Terminated:
473                                System.Diagnostics.Debug.Assert(!this.waitingProtocols.Contains(protocol));
474                                this.RemoveProtocol(protocol);
475                                break;
476                        }
477                    }
478                }
479            }
480        }
481
482        /// <summary>
483        /// Removes a protocol from the internal queue
484        /// </summary>
485        /// <param name="protocol"></param>
486        public override void RemoveProtocol(ProtocolBase protocol)
487        {
488            lock (this)
489            {
490                this.protocols.Remove(protocol);
491                if (this.protocols.Count == 0)
492                    this.Shutdown();
493            }
494        }
495
496        /// <summary>
497        /// Adds a protocol to the internal queue
498        /// </summary>
499        /// <param name="protocol"></param>
500        public override void AddProtocol(ProtocolBase protocol)
501        {
502            lock (this)
503            {
504                this.protocols.Add(protocol);
505            }
506        }
507
508        /// <summary>
509        /// Wakeup this scheduler
510        /// </summary>
511        /// <param name="protocol"></param>
512        public override void Wakeup(ProtocolBase protocol)
513        {
514            lock (this)
515            {
516                if (!this.waitingProtocols.Contains(protocol))
517                    this.waitingProtocols.Enqueue(protocol);
518                this.wakeup.Set();
519            }
520        }
521
522        /// <summary>
523        /// Terminates the scheduler
524        /// </summary>
525        public override void Shutdown()
526        {
527            this.shutdown = true;
528            this.wakeup.Set();
529        }
530    }
531}
Note: See TracBrowser for help on using the repository browser.