source: trunk/CrypPlugins/WorkspaceManager/Execution/ExecutionEngine.cs @ 1803

Last change on this file since 1803 was 1803, checked in by kopal, 11 years ago

removed events of connectors and put date exchange directly into the ExecutionEngine to improve the performance

File size: 22.5 KB
Line 
1/*                             
2   Copyright 2010 Nils Kopal, Viktor M.
3
4   Licensed under the Apache License, Version 2.0 (the "License");
5   you may not use this file except in compliance with the License.
6   You may obtain a copy of the License at
7
8       http://www.apache.org/licenses/LICENSE-2.0
9
10   Unless required by applicable law or agreed to in writing, software
11   distributed under the License is distributed on an "AS IS" BASIS,
12   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   See the License for the specific language governing permissions and
14   limitations under the License.
15*/
16
17using System;
18using System.Collections.Generic;
19using System.Linq;
20using System.Text;
21
22using WorkspaceManager.Model;
23using System.Threading;
24using System.Collections;
25using Cryptool.PluginBase;
26using System.Reflection;
27using Gears4Net;
28using System.Windows.Threading;
29using System.Runtime.Remoting.Contexts;
30
31namespace WorkspaceManager.Execution
32{
33    /// <summary>
34    /// Engine to execute a model of the WorkspaceManager
35    /// This class needs a WorkspaceManager to be instantiated
36    /// To run an execution process it also needs a WorkspaceModel
37    ///
38    /// This class uses Gears4Net to execute the plugins
39    /// </summary>
40    public class ExecutionEngine
41    {
42        private WorkspaceManager WorkspaceManagerEditor;
43        private Scheduler[] schedulers;
44        private WorkspaceModel workspaceModel;
45
46        public long ExecutedPluginsCounter { get; set; }
47        public bool BenchmarkPlugins { get; set; }
48        public long GuiUpdateInterval { get; set; }
49
50        /// <summary>
51        /// Creates a new ExecutionEngine
52        /// </summary>
53        /// <param name="workspaceManagerEditor"></param>
54        public ExecutionEngine(WorkspaceManager workspaceManagerEditor)
55        {
56            WorkspaceManagerEditor = workspaceManagerEditor;
57        }
58
59        /// <summary>
60        /// Is this ExecutionEngine running?
61        /// </summary>
62        public bool IsRunning
63        {
64            get;
65            private set;
66        }
67
68        /// <summary>
69        /// Execute the given Model
70        /// </summary>
71        /// <param name="workspaceModel"></param>
72        public void Execute(WorkspaceModel workspaceModel)
73        {
74            if (!IsRunning)
75            {
76                IsRunning = true;
77                this.workspaceModel = workspaceModel;
78                int amountSchedulers = System.Environment.ProcessorCount * 2;
79
80                //Here we create n = "ProcessorsCount * 2" Gears4Net schedulers
81                //We do this, because measurements showed that we get the best performance if we
82                //use this amount of schedulers
83                schedulers = new Scheduler[amountSchedulers];
84                for (int i = 0; i < amountSchedulers; i++)
85                {
86                    schedulers[i] = new WorkspaceManagerScheduler("Scheduler" + i);                   
87                }
88               
89                //We have to reset all states of PluginModels, ConnectorModels and ConnectionModels:
90                workspaceModel.resetStates();
91
92                //The UpdateGuiProtocol is a kind of "daemon" which will update the view elements if necessary
93                UpdateGuiProtocol updateGuiProtocol = new UpdateGuiProtocol(schedulers[0], workspaceModel, this);
94                schedulers[0].AddProtocol(updateGuiProtocol);
95                updateGuiProtocol.Start();
96
97                //The BenchmarkProtocl counts the amount of executed plugins per seconds and writes this to debug
98                if (this.BenchmarkPlugins)
99                {
100                    BenchmarkProtocol benchmarkProtocol = new BenchmarkProtocol(schedulers[0], this.workspaceModel, this);
101                    schedulers[0].AddProtocol(benchmarkProtocol);
102                    benchmarkProtocol.Start();
103                }
104
105                //Here we create for each PluginModel an own PluginProtocol
106                //By using round-robin we give each protocol to another scheduler to gain
107                //a good average load balancing of the schedulers
108                //we also initalize each plugin
109                int counter=0;
110                foreach (PluginModel pluginModel in workspaceModel.AllPluginModels)
111                {
112                    pluginModel.Plugin.PreExecution();
113                    PluginProtocol pluginProtocol = new PluginProtocol(schedulers[counter], pluginModel,this);
114                    pluginModel.PluginProtocol = pluginProtocol;
115                    schedulers[counter].AddProtocol(pluginProtocol);
116                   
117                    pluginProtocol.Start();
118                    counter = (counter + 1) % (amountSchedulers);
119
120                    if (pluginModel.Startable)
121                    {
122                        MessageExecution msg = new MessageExecution();
123                        msg.PluginModel = pluginModel;
124                        pluginProtocol.BroadcastMessageReliably(msg);
125                    }
126                }
127
128                foreach (Scheduler scheduler in schedulers)
129                {
130                    ((WorkspaceManagerScheduler)scheduler).startScheduler();
131                }
132            }
133        }     
134     
135        /// <summary>
136        /// Stop the execution process:
137        /// calls shutdown on all schedulers + calls stop() on each plugin
138        /// </summary>
139        public void Stop()
140        {
141            //First stop alle plugins
142            foreach (PluginModel pluginModel in workspaceModel.AllPluginModels)
143            {
144                pluginModel.Plugin.Stop();
145                pluginModel.Plugin.PostExecution();
146            }           
147
148            IsRunning = false;
149            //Secondly stop all Gears4Net Schedulers
150            foreach (Scheduler scheduler in schedulers)
151            {
152                scheduler.Shutdown();
153            }
154             
155        }
156
157        /// <summary>
158        /// Pause the execution
159        /// </summary>
160        public void Pause()
161        {
162            //not implemented yet
163        }
164
165        /// <summary>
166        /// Use the logger of the WorkspaceManagerEditor
167        /// </summary>
168        /// <param name="message"></param>
169        /// <param name="level"></param>
170        public void GuiLogMessage(string message, NotificationLevel level)
171        {           
172            WorkspaceManagerEditor.GuiLogMessage(message, level);
173        }           
174    }
175 
176    /// <summary>
177    /// Message send to scheduler for a Plugin to trigger the Execution
178    /// </summary>
179    public class MessageExecution : MessageBase
180    {
181        public PluginModel PluginModel;
182    }
183
184    /// <summary>
185    /// A Protocol for updating the GUI in time intervals
186    /// </summary>
187    public class UpdateGuiProtocol : ProtocolBase
188    {
189        private WorkspaceModel workspaceModel;
190        private ExecutionEngine executionEngine;     
191
192        /// <summary>
193        /// Create a new protocol. Each protocol requires a scheduler which provides
194        /// a thread for execution.
195        /// </summary>
196        /// <param name="scheduler"></param>
197        public UpdateGuiProtocol(Scheduler scheduler, WorkspaceModel workspaceModel, ExecutionEngine executionEngine)
198            : base(scheduler)
199        {
200            this.workspaceModel = workspaceModel;
201            this.executionEngine = executionEngine;           
202        }
203
204        /// <summary>
205        /// The main function of the protocol
206        /// </summary>
207        /// <param name="stateMachine"></param>
208        /// <returns></returns>
209        public override System.Collections.Generic.IEnumerator<ReceiverBase> Execute(AbstractStateMachine stateMachine)
210        {
211            while (this.executionEngine.IsRunning)
212            {
213                yield return Timeout(this.executionEngine.GuiUpdateInterval, HandleUpdateGui);
214            }
215        }
216
217        /// <summary>
218        /// Handler function for a message.
219        /// This handler must not block, because it executes inside the thread of the scheduler.
220        /// </summary>
221        /// <param name="msg"></param>
222        private void HandleUpdateGui()
223        {
224            //Get the gui Thread
225            this.workspaceModel.WorkspaceManagerEditor.Presentation.Dispatcher.Invoke(DispatcherPriority.Normal, (SendOrPostCallback)delegate
226            {
227                foreach (PluginModel pluginModel in workspaceModel.AllPluginModels)
228                {
229                    if (pluginModel.GuiNeedsUpdate)
230                    {
231                        pluginModel.GuiNeedsUpdate = false;
232                        pluginModel.paint();
233                        if (pluginModel.UpdateableView != null)
234                        {
235                            pluginModel.UpdateableView.update();
236                        }
237                    }
238                }
239                foreach (ConnectionModel connectionModel in workspaceModel.AllConnectionModels)
240                {
241                    if (connectionModel.GuiNeedsUpdate)
242                    {
243                        if (connectionModel.UpdateableView != null)
244                        {
245                            connectionModel.UpdateableView.update();
246                        }
247                    }
248                }
249            }
250            , null);
251        }
252    }
253   
254    /// <summary>
255    /// A Protocol for benchmarking
256    /// </summary>
257    public class BenchmarkProtocol : ProtocolBase
258    {
259        private WorkspaceModel workspaceModel;
260        private ExecutionEngine executionEngine;
261
262        /// <summary>
263        /// Create a new protocol. Each protocol requires a scheduler which provides
264        /// a thread for execution.
265        /// </summary>
266        /// <param name="scheduler"></param>
267        public BenchmarkProtocol(Scheduler scheduler, WorkspaceModel workspaceModel, ExecutionEngine executionEngine)
268            : base(scheduler)
269        {
270            this.workspaceModel = workspaceModel;
271            this.executionEngine = executionEngine;
272        }
273
274        /// <summary>
275        /// The main function of the protocol
276        /// </summary>
277        /// <param name="stateMachine"></param>
278        /// <returns></returns>
279        public override System.Collections.Generic.IEnumerator<ReceiverBase> Execute(AbstractStateMachine stateMachine)
280        {
281            while (this.executionEngine.IsRunning)
282            {
283                yield return Timeout(1000, HandleBenchmark);
284            }
285        }
286
287        /// <summary>
288        /// Handler function for a message.
289        /// This handler must not block, because it executes inside the thread of the scheduler.
290        /// </summary>
291        /// <param name="msg"></param>
292        private void HandleBenchmark()
293        {
294            this.workspaceModel.WorkspaceManagerEditor.GuiLogMessage("Executing at about " + this.executionEngine.ExecutedPluginsCounter + " Plugins/s", NotificationLevel.Debug);
295            this.executionEngine.ExecutedPluginsCounter = 0;
296        }
297
298    }
299
300    /// <summary>
301    /// A Protocol for a PluginModel
302    /// </summary>
303    public class PluginProtocol : ProtocolBase
304    {       
305        public PluginModel PluginModel;
306        private ExecutionEngine executionEngine;
307
308        /// <summary>
309        /// Create a new protocol. Each protocol requires a scheduler which provides
310        /// a thread for execution.
311        /// </summary>
312        /// <param name="scheduler"></param>
313        public PluginProtocol(Scheduler scheduler, PluginModel pluginModel,ExecutionEngine executionEngine)
314            : base(scheduler)
315        {
316            this.PluginModel = pluginModel;
317            this.executionEngine = executionEngine;
318        }
319
320        /// <summary>
321        /// The main function of the protocol     
322        /// </summary>
323        /// <param name="stateMachine"></param>
324        /// <returns></returns>
325        public override System.Collections.Generic.IEnumerator<ReceiverBase> Execute(AbstractStateMachine stateMachine)
326        {
327            while (this.executionEngine.IsRunning)
328            {
329                yield return Receive<MessageExecution>(null,HandleExecute);
330                //this.HandleExecute((MessageExecution)stateMachine.CurrentMessage);
331            }
332        }
333
334        /// <summary>
335        /// Call the execution function of the wrapped IPlugin
336        /// </summary>
337        /// <param name="msg"></param>
338        private void HandleExecute(MessageExecution msg)
339        {
340           
341            if (!msg.PluginModel.WorkspaceModel.WorkspaceManagerEditor.isExecuting())
342            {
343                return;
344            }
345           
346            //Check if all necessary inputs are set
347            foreach (ConnectorModel connectorModel in msg.PluginModel.InputConnectors)
348            {
349                if (!connectorModel.IControl && 
350                    (connectorModel.IsMandatory || connectorModel.InputConnections.Count > 0) && (!connectorModel.HasData ||
351                    connectorModel.Data==null))
352                {
353                    return;
354                }
355            }
356
357            //Check if all outputs are free
358            foreach (ConnectorModel connectorModel in msg.PluginModel.OutputConnectors)
359            {
360                if (!connectorModel.IControl)
361                {
362                    foreach (ConnectionModel connectionModel in connectorModel.OutputConnections)
363                    {
364                        if (connectionModel.To.HasData)
365                        {
366                            return;
367                        }
368                    }
369                }
370            }
371
372            //Fill the plugins inputs with data
373            foreach (ConnectorModel connectorModel in PluginModel.InputConnectors)
374            {
375                try
376                {
377                    if (connectorModel.HasData && connectorModel.Data.value != null)
378                    {
379                        if (connectorModel.IsDynamic)
380                        {
381                            MethodInfo propertyInfo = PluginModel.Plugin.GetType().GetMethod(connectorModel.DynamicSetterName);
382                            propertyInfo.Invoke(PluginModel.Plugin, new object[] { connectorModel.PropertyName, connectorModel.Data.value });
383                        }
384                        else
385                        {
386                            PropertyInfo propertyInfo = PluginModel.Plugin.GetType().GetProperty(connectorModel.PropertyName);
387                            propertyInfo.SetValue(PluginModel.Plugin, connectorModel.Data.value, null);
388                        }
389                    }
390                }
391                catch (Exception ex)
392                {
393                    this.PluginModel.WorkspaceModel.WorkspaceManagerEditor.GuiLogMessage("An error occured while setting value of connector \"" + connectorModel.Name + "\" of \"" + PluginModel + "\": " + ex.Message, NotificationLevel.Error);
394                    this.PluginModel.State = PluginModelState.Error;
395                    this.PluginModel.GuiNeedsUpdate = true;
396                    return;
397                }
398            }
399           
400            msg.PluginModel.Plugin.Execute();
401
402            if (this.executionEngine.BenchmarkPlugins)
403            {
404                this.executionEngine.ExecutedPluginsCounter++;
405            }
406
407            foreach (ConnectorModel connectorModel in PluginModel.InputConnectors)
408            {
409                if (connectorModel.HasData)
410                {                   
411                    connectorModel.Data = null;
412                    connectorModel.HasData = false;
413                    foreach (ConnectionModel connectionModel in connectorModel.InputConnections)
414                    {
415                        connectionModel.Active = false;
416                        connectorModel.GuiNeedsUpdate = true;
417                    }
418                }
419            }
420
421            foreach (ConnectorModel connectorModel in PluginModel.OutputConnectors)
422            {
423                object data;
424
425                if (connectorModel.IsDynamic)
426                {
427                    data = connectorModel.PluginModel.Plugin.GetType().GetMethod(connectorModel.DynamicGetterName).Invoke(connectorModel.PluginModel.Plugin, new object[] { connectorModel.PropertyName });
428                }
429                else
430                {
431                    data = connectorModel.PluginModel.Plugin.GetType().GetProperty(connectorModel.PropertyName).GetValue(connectorModel.PluginModel.Plugin, null);
432                }
433
434                if (data != null)
435                {
436                    foreach (ConnectionModel connectionModel in connectorModel.OutputConnections)
437                    {
438
439                        Data Data = new Data();
440                        Data.value = data;
441                        connectionModel.To.Data = Data;
442                        connectionModel.To.HasData = true;
443                        connectionModel.Active = true;
444                        connectionModel.GuiNeedsUpdate = true;
445                    }
446                }
447            }
448           
449            foreach (ConnectorModel connectorModel in PluginModel.OutputConnectors)
450            {
451                foreach (ConnectionModel connectionModel in connectorModel.OutputConnections)
452                {
453                    MessageExecution msg_exce = new MessageExecution();
454                    msg_exce.PluginModel = connectionModel.To.PluginModel;
455                    connectionModel.To.PluginModel.PluginProtocol.BroadcastMessage(msg_exce);
456                }
457            }
458
459
460            foreach (ConnectorModel connectorModel in PluginModel.InputConnectors)
461            {
462                foreach (ConnectionModel connectionModel in connectorModel.InputConnections)
463                {
464                    if (!connectionModel.From.PluginModel.Startable || 
465                        (connectionModel.From.PluginModel.Startable && connectionModel.From.PluginModel.RepeatStart))
466                    {
467                        MessageExecution message_exec = new MessageExecution();
468                        message_exec.PluginModel = connectionModel.From.PluginModel;
469                        connectionModel.From.PluginModel.PluginProtocol.BroadcastMessageReliably(message_exec);
470                    }
471                }
472            }
473        }
474    }
475
476    /// <summary>
477    /// Gears4Net Scheduler
478    /// </summary>
479    public class WorkspaceManagerScheduler : Scheduler
480    {
481        private System.Threading.AutoResetEvent wakeup = new System.Threading.AutoResetEvent(false);
482        private bool shutdown = false;
483        private System.Threading.Thread thread;
484        private Context currentContext;
485
486                public WorkspaceManagerScheduler() : this(String.Empty)
487                {
488
489                }
490
491        public WorkspaceManagerScheduler(string name)
492            : base()
493        {
494            this.currentContext = Thread.CurrentContext;
495
496            thread = new System.Threading.Thread(this.Start);
497            thread.SetApartmentState(System.Threading.ApartmentState.MTA);
498                        thread.Name = name;
499           
500        }
501
502        public void startScheduler()
503        {
504            thread.Start();
505        }
506
507        private void Start()
508        {
509            if (this.currentContext != Thread.CurrentContext)
510                this.currentContext.DoCallBack(Start);
511
512            // Loop forever
513            while (true)
514            {
515                this.wakeup.WaitOne();
516
517                // Loop while there are more protocols waiting
518                while (true)
519                {
520                    // Should the scheduler stop?
521                    if (this.shutdown)
522                        return;
523                   
524                    ProtocolBase protocol = null;
525                    lock (this)
526                    {
527                        // No more protocols? -> Wait
528                        if (this.waitingProtocols.Count == 0)
529                            break;
530                    }
531
532                    protocol = this.waitingProtocols.Dequeue();
533                    ProtocolStatus status = protocol.Run();
534
535                    lock (this)
536                    {
537                        switch (status)
538                        {
539                            case ProtocolStatus.Created:
540                                System.Diagnostics.Debug.Assert(false);
541                                break;
542                            case ProtocolStatus.Ready:
543                                this.waitingProtocols.Enqueue(protocol);
544                                break;
545                            case ProtocolStatus.Waiting:
546                                break;
547                            case ProtocolStatus.Terminated:
548                                System.Diagnostics.Debug.Assert(!this.waitingProtocols.Contains(protocol));
549                                this.RemoveProtocol(protocol);
550                                break;
551                        }
552                    }
553                   
554                }
555            }
556        }
557
558        /// <summary>
559        /// Removes a protocol from the internal queue
560        /// </summary>
561        /// <param name="protocol"></param>
562        public override void RemoveProtocol(ProtocolBase protocol)
563        {
564            lock (this)
565            {
566                this.protocols.Remove(protocol);
567                if (this.protocols.Count == 0)
568                    this.Shutdown();
569            }
570        }
571
572        /// <summary>
573        /// Adds a protocol to the internal queue
574        /// </summary>
575        /// <param name="protocol"></param>
576        public override void AddProtocol(ProtocolBase protocol)
577        {
578            lock (this)
579            {
580                this.protocols.Add(protocol);
581            }
582        }
583
584        /// <summary>
585        /// Wakeup this scheduler
586        /// </summary>
587        /// <param name="protocol"></param>
588        public override void Wakeup(ProtocolBase protocol)
589        {
590            lock (this)
591            {
592                if (!this.waitingProtocols.Contains(protocol))
593                    this.waitingProtocols.Enqueue(protocol);
594                this.wakeup.Set();
595            }
596        }
597
598        /// <summary>
599        /// Terminates the scheduler
600        /// </summary>
601        public override void Shutdown()
602        {
603            this.shutdown = true;
604            this.wakeup.Set();
605        }
606    }
607}
Note: See TracBrowser for help on using the repository browser.