source: trunk/CrypPlugins/WorkspaceManager/Execution/ExecutionEngine.cs @ 1713

Last change on this file since 1713 was 1713, checked in by kopal, 11 years ago
  • some execution engine fixes
File size: 20.1 KB
Line 
1/*                             
2   Copyright 2010 Nils Kopal, Viktor M.
3
4   Licensed under the Apache License, Version 2.0 (the "License");
5   you may not use this file except in compliance with the License.
6   You may obtain a copy of the License at
7
8       http://www.apache.org/licenses/LICENSE-2.0
9
10   Unless required by applicable law or agreed to in writing, software
11   distributed under the License is distributed on an "AS IS" BASIS,
12   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   See the License for the specific language governing permissions and
14   limitations under the License.
15*/
16
17using System;
18using System.Collections.Generic;
19using System.Linq;
20using System.Text;
21
22using WorkspaceManager.Model;
23using System.Threading;
24using System.Collections;
25using Cryptool.PluginBase;
26using System.Reflection;
27using Gears4Net;
28using System.Windows.Threading;
29using System.Runtime.Remoting.Contexts;
30
31namespace WorkspaceManager.Execution
32{
33    /// <summary>
34    /// Engine to execute a model of the WorkspaceManager
35    /// This class needs a WorkspaceManager to be instantiated
36    /// To run an execution process it also needs a WorkspaceModel
37    ///
38    /// This class uses Gears4Net to execute the plugins
39    /// </summary>
40    public class ExecutionEngine
41    {
42        private WorkspaceManager WorkspaceManagerEditor;
43        private Scheduler[] schedulers;
44        private WorkspaceModel workspaceModel;
45
46        public long ExecutedPluginsCounter { get; set; }
47        public bool BenchmarkPlugins { get; set; }
48        public long GuiUpdateInterval { get; set; }
49
50        /// <summary>
51        /// Creates a new ExecutionEngine
52        /// </summary>
53        /// <param name="workspaceManagerEditor"></param>
54        public ExecutionEngine(WorkspaceManager workspaceManagerEditor)
55        {
56            WorkspaceManagerEditor = workspaceManagerEditor;
57        }
58
59        /// <summary>
60        /// Is this ExecutionEngine running?
61        /// </summary>
62        public bool IsRunning
63        {
64            get;
65            private set;
66        }
67
68        /// <summary>
69        /// Execute the given Model
70        /// </summary>
71        /// <param name="workspaceModel"></param>
72        public void Execute(WorkspaceModel workspaceModel)
73        {
74            this.workspaceModel = workspaceModel;
75
76            if (!IsRunning)
77            {
78                IsRunning = true;
79
80                //Here we create n = "ProcessorsCount * 2" Gears4Net schedulers
81                //We do this, because measurements showed that we get the best performance if we
82                //use this amount of schedulers
83                schedulers = new Scheduler[System.Environment.ProcessorCount*2];
84                for(int i=0;i< System.Environment.ProcessorCount*2;i++){
85                    schedulers[i] = new WorkspaceManagerScheduler("Scheduler" + i);                   
86                }
87               
88                //We have to reset all states of PluginModels, ConnectorModels and ConnectionModels:
89                workspaceModel.resetStates();
90
91                //The UpdateGuiProtocol is a kind of "daemon" which will update the view elements if necessary
92                UpdateGuiProtocol updateGuiProtocol = new UpdateGuiProtocol(schedulers[0], workspaceModel, this);
93                schedulers[0].AddProtocol(updateGuiProtocol);
94                updateGuiProtocol.Start();
95
96                //The BenchmarkProtocl counts the amount of executed plugins per seconds and writes this to debug
97                if (this.BenchmarkPlugins)
98                {
99                    BenchmarkProtocol benchmarkProtocol = new BenchmarkProtocol(schedulers[0], this.workspaceModel, this);
100                    schedulers[0].AddProtocol(benchmarkProtocol);
101                    benchmarkProtocol.Start();
102                }
103
104                //Here we create for each PluginModel an own PluginProtocol
105                //By using round-robin we give each protocol to another scheduler to gain
106                //a good average load balancing of the schedulers
107                //we also initalize each plugin
108                int counter=0;
109                foreach (PluginModel pluginModel in workspaceModel.AllPluginModels)
110                {
111                    pluginModel.Plugin.PreExecution();
112                    PluginProtocol pluginProtocol = new PluginProtocol(schedulers[counter], pluginModel,this);
113                    pluginModel.PluginProtocol = pluginProtocol;
114                    schedulers[counter].AddProtocol(pluginProtocol);
115                   
116                    pluginProtocol.Start();
117                    counter = (counter + 1) % (System.Environment.ProcessorCount*2);
118
119                    if (pluginModel.Startable)
120                    {
121                        MessageExecution msg = new MessageExecution();
122                        msg.PluginModel = pluginModel;
123                        pluginProtocol.BroadcastMessageReliably(msg);
124                    }
125                }
126
127                foreach (Scheduler scheduler in schedulers)
128                {
129                    ((WorkspaceManagerScheduler)scheduler).startScheduler();
130                }
131            }
132        }     
133     
134        /// <summary>
135        /// Stop the execution process:
136        /// calls shutdown on all schedulers + calls stop() on each plugin
137        /// </summary>
138        public void Stop()
139        {
140            //First stop alle plugins
141            foreach (PluginModel pluginModel in workspaceModel.AllPluginModels)
142            {
143                pluginModel.Plugin.Stop();
144                pluginModel.Plugin.PostExecution();
145            }           
146
147            IsRunning = false;
148            //Secondly stop all Gears4Net Schedulers
149            foreach (Scheduler scheduler in schedulers)
150            {
151                scheduler.Shutdown();
152            }
153             
154        }
155
156        /// <summary>
157        /// Pause the execution
158        /// </summary>
159        public void Pause()
160        {
161            //not implemented yet
162        }
163
164        /// <summary>
165        /// Use the logger of the WorkspaceManagerEditor
166        /// </summary>
167        /// <param name="message"></param>
168        /// <param name="level"></param>
169        public void GuiLogMessage(string message, NotificationLevel level)
170        {           
171            WorkspaceManagerEditor.GuiLogMessage(message, level);
172        }           
173    }
174 
175    /// <summary>
176    /// Message send to scheduler for a Plugin to trigger the Execution
177    /// </summary>
178    public class MessageExecution : MessageBase
179    {
180        public PluginModel PluginModel;
181    }
182
183    /// <summary>
184    /// A Protocol for updating the GUI in time intervals
185    /// </summary>
186    public class UpdateGuiProtocol : ProtocolBase
187    {
188        private WorkspaceModel workspaceModel;
189        private ExecutionEngine executionEngine;     
190
191        /// <summary>
192        /// Create a new protocol. Each protocol requires a scheduler which provides
193        /// a thread for execution.
194        /// </summary>
195        /// <param name="scheduler"></param>
196        public UpdateGuiProtocol(Scheduler scheduler, WorkspaceModel workspaceModel, ExecutionEngine executionEngine)
197            : base(scheduler)
198        {
199            this.workspaceModel = workspaceModel;
200            this.executionEngine = executionEngine;           
201        }
202
203        /// <summary>
204        /// The main function of the protocol
205        /// </summary>
206        /// <param name="stateMachine"></param>
207        /// <returns></returns>
208        public override System.Collections.Generic.IEnumerator<ReceiverBase> Execute(AbstractStateMachine stateMachine)
209        {
210            while (this.executionEngine.IsRunning)
211            {
212                yield return Timeout(this.executionEngine.GuiUpdateInterval, HandleUpdateGui);
213            }
214        }
215
216        /// <summary>
217        /// Handler function for a message.
218        /// This handler must not block, because it executes inside the thread of the scheduler.
219        /// </summary>
220        /// <param name="msg"></param>
221        private void HandleUpdateGui()
222        {
223            //Get the gui Thread
224            this.workspaceModel.WorkspaceManagerEditor.Presentation.Dispatcher.Invoke(DispatcherPriority.Normal, (SendOrPostCallback)delegate
225            {
226                foreach (PluginModel pluginModel in workspaceModel.AllPluginModels)
227                {
228                    if (pluginModel.GuiNeedsUpdate)
229                    {
230                        //executionEngine.GuiLogMessage("UpdateGui for \"" + pluginModel.Name + "\"", NotificationLevel.Debug);
231                        pluginModel.GuiNeedsUpdate = false;
232                        pluginModel.paint();
233                        if (pluginModel.UpdateableView != null)
234                        {
235                            pluginModel.UpdateableView.update();
236                        }
237                    }
238                }
239                foreach (ConnectionModel connectionModel in workspaceModel.AllConnectionModels)
240                {
241                    if (connectionModel.GuiNeedsUpdate)
242                    {
243                        if (connectionModel.UpdateableView != null)
244                        {
245                            connectionModel.UpdateableView.update();
246                        }
247                    }
248                }
249            }
250            , null);
251        }
252    }
253
254   
255    /// <summary>
256    /// A Protocol for benchmarking
257    /// </summary>
258    public class BenchmarkProtocol : ProtocolBase
259    {
260        private WorkspaceModel workspaceModel;
261        private ExecutionEngine executionEngine;
262
263        /// <summary>
264        /// Create a new protocol. Each protocol requires a scheduler which provides
265        /// a thread for execution.
266        /// </summary>
267        /// <param name="scheduler"></param>
268        public BenchmarkProtocol(Scheduler scheduler, WorkspaceModel workspaceModel, ExecutionEngine executionEngine)
269            : base(scheduler)
270        {
271            this.workspaceModel = workspaceModel;
272            this.executionEngine = executionEngine;
273        }
274
275        /// <summary>
276        /// The main function of the protocol
277        /// </summary>
278        /// <param name="stateMachine"></param>
279        /// <returns></returns>
280        public override System.Collections.Generic.IEnumerator<ReceiverBase> Execute(AbstractStateMachine stateMachine)
281        {
282            while (this.executionEngine.IsRunning)
283            {
284                yield return Timeout(1000, HandleBenchmark);
285            }
286        }
287
288        /// <summary>
289        /// Handler function for a message.
290        /// This handler must not block, because it executes inside the thread of the scheduler.
291        /// </summary>
292        /// <param name="msg"></param>
293        private void HandleBenchmark()
294        {
295            this.workspaceModel.WorkspaceManagerEditor.GuiLogMessage("Executing at about " + this.executionEngine.ExecutedPluginsCounter + " Plugins/s", NotificationLevel.Debug);
296            this.executionEngine.ExecutedPluginsCounter = 0;
297        }
298
299    }
300
301    /// <summary>
302    /// A Protocol for a PluginModel
303    /// </summary>
304    public class PluginProtocol : ProtocolBase
305    {
306        public PluginModel PluginModel;
307        private ExecutionEngine executionEngine;
308
309        /// <summary>
310        /// Create a new protocol. Each protocol requires a scheduler which provides
311        /// a thread for execution.
312        /// </summary>
313        /// <param name="scheduler"></param>
314        public PluginProtocol(Scheduler scheduler, PluginModel pluginModel,ExecutionEngine executionEngine)
315            : base(scheduler)
316        {
317            this.PluginModel = pluginModel;
318            this.executionEngine = executionEngine;
319        }
320
321        /// <summary>
322        /// The main function of the protocol     
323        /// </summary>
324        /// <param name="stateMachine"></param>
325        /// <returns></returns>
326        public override System.Collections.Generic.IEnumerator<ReceiverBase> Execute(AbstractStateMachine stateMachine)
327        {
328            while (this.executionEngine.IsRunning)
329            {
330                yield return Receive<MessageExecution>(null, this.HandleExecute);
331                //yield return Parallel(1,new PluginWaitReceiver()) & Receive<MessageExecution>(null, this.HandleExecute);
332                //yield return new PluginWaitReceiver() + Receive<MessageExecution>(null, this.HandleExecute);
333                //yield return Parallel(1,new PluginWaitReceiver()) + Receive<MessageExecution>(null, this.HandleExecute);
334            }
335        }
336
337        /// <summary>
338        /// Call the execution function of the wrapped IPlugin
339        /// </summary>
340        /// <param name="msg"></param>
341        private void HandleExecute(MessageExecution msg)
342        {
343            //executionEngine.GuiLogMessage("HandleExecute for \"" + msg.PluginModel.Name + "\"", NotificationLevel.Debug);
344            //Fill the plugins Inputs with data
345            foreach (ConnectorModel connectorModel in PluginModel.InputConnectors)
346            {
347                if (connectorModel.HasData)
348                {
349                    try
350                    {
351                        if (connectorModel.IsDynamic)
352                        {
353                            MethodInfo propertyInfo = PluginModel.Plugin.GetType().GetMethod(connectorModel.DynamicSetterName);
354                            propertyInfo.Invoke(PluginModel.Plugin, new object[] { connectorModel.PropertyName, connectorModel.Data });
355                        }
356                        else
357                        {
358                            PropertyInfo propertyInfo = PluginModel.Plugin.GetType().GetProperty(connectorModel.PropertyName);
359                            propertyInfo.SetValue(PluginModel.Plugin, connectorModel.Data, null);
360                        }
361
362                    }
363                    catch (Exception ex)
364                    {
365                        this.PluginModel.WorkspaceModel.WorkspaceManagerEditor.GuiLogMessage("An error occured while setting value of connector \"" + connectorModel.Name + "\" of \"" + PluginModel + "\": " + ex.Message, NotificationLevel.Error);
366                        this.PluginModel.State = PluginModelState.Error;
367                        this.PluginModel.GuiNeedsUpdate = true;
368                        return;
369                    }
370                    finally
371                    {
372                        connectorModel.HasData = false;
373                        connectorModel.Data = null;
374                        foreach (ConnectionModel connectionModel in connectorModel.InputConnections)
375                        {
376                            connectionModel.Active = false;
377                            connectorModel.GuiNeedsUpdate = true;
378                        }
379                    }
380                }
381            }
382           
383            msg.PluginModel.Plugin.Execute();         
384
385            if (this.executionEngine.BenchmarkPlugins)
386            {
387                this.executionEngine.ExecutedPluginsCounter++;                               
388            }
389        }
390     
391    }
392
393    public class WorkspaceManagerScheduler : Scheduler
394    {
395        private System.Threading.AutoResetEvent wakeup = new System.Threading.AutoResetEvent(false);
396        private bool shutdown = false;
397        private System.Threading.Thread thread;
398        private Context currentContext;
399
400                public WorkspaceManagerScheduler() : this(String.Empty)
401                {
402
403                }
404
405        public WorkspaceManagerScheduler(string name)
406            : base()
407        {
408            this.currentContext = Thread.CurrentContext;
409
410            thread = new System.Threading.Thread(this.Start);
411            thread.SetApartmentState(System.Threading.ApartmentState.MTA);
412                        thread.Name = name;
413           
414        }
415
416        public void startScheduler()
417        {
418            thread.Start();
419        }
420
421        private void Start()
422        {
423            if (this.currentContext != Thread.CurrentContext)
424                this.currentContext.DoCallBack(Start);
425
426            // Loop forever
427            while (true)
428            {
429                this.wakeup.WaitOne();
430
431                // Loop while there are more protocols waiting
432                while (true)
433                {
434                    // Should the scheduler stop?
435                    if (this.shutdown)
436                        return;
437                   
438                    bool donotrun = false;
439                    ProtocolBase protocol = null;
440                    lock (this)
441                    {
442                        // No more protocols? -> Wait
443                        if (this.waitingProtocols.Count == 0)
444                            break;
445
446                        protocol = this.waitingProtocols.Dequeue();
447
448                        if (protocol is PluginProtocol)
449                        {
450                            PluginProtocol pluginProtocol = (PluginProtocol)protocol;
451                            foreach (ConnectorModel outputConnector in pluginProtocol.PluginModel.OutputConnectors)
452                            {
453                                foreach (ConnectionModel connection in outputConnector.OutputConnections)
454                                {                                   
455                                    if (connection.To.HasData &&
456                                        connection.To.PluginModel != pluginProtocol.PluginModel &&
457                                        donotrun == false)
458                                    {                                           
459                                        this.waitingProtocols.Enqueue(protocol);
460                                        donotrun = true;
461                                    }
462                                 
463                                }
464                            }               
465                        }
466
467                    }
468
469                    if (donotrun == false)
470                    {
471                        ProtocolStatus status = protocol.Run();
472
473                        lock (this)
474                        {
475                            switch (status)
476                            {
477                                case ProtocolStatus.Created:
478                                    System.Diagnostics.Debug.Assert(false);
479                                    break;
480                                case ProtocolStatus.Ready:
481                                    this.waitingProtocols.Enqueue(protocol);
482                                    break;
483                                case ProtocolStatus.Waiting:
484                                    break;
485                                case ProtocolStatus.Terminated:
486                                    System.Diagnostics.Debug.Assert(!this.waitingProtocols.Contains(protocol));
487                                    this.RemoveProtocol(protocol);
488                                    break;
489                            }
490                        }
491                    }
492                }
493            }
494        }
495
496        public override void RemoveProtocol(ProtocolBase protocol)
497        {
498            lock (this)
499            {
500                this.protocols.Remove(protocol);
501                if (this.protocols.Count == 0)
502                    this.Shutdown();
503            }
504        }
505
506        public override void AddProtocol(ProtocolBase protocol)
507        {
508            lock (this)
509            {
510                this.protocols.Add(protocol);
511            }
512        }
513
514        public override void Wakeup(ProtocolBase protocol)
515        {
516            lock (this)
517            {
518                if (!this.waitingProtocols.Contains(protocol))
519                    this.waitingProtocols.Enqueue(protocol);
520                this.wakeup.Set();
521            }
522        }
523
524        public override void Shutdown()
525        {
526            this.shutdown = true;
527            this.wakeup.Set();
528        }
529    }
530}
Note: See TracBrowser for help on using the repository browser.