source: trunk/CrypPlugins/WorkspaceManager/Execution/ExecutionEngine.cs @ 1780

Last change on this file since 1780 was 1780, checked in by kopal, 11 years ago

now no 2 connections between the same connectors are possible

File size: 20.9 KB
Line 
1/*                             
2   Copyright 2010 Nils Kopal, Viktor M.
3
4   Licensed under the Apache License, Version 2.0 (the "License");
5   you may not use this file except in compliance with the License.
6   You may obtain a copy of the License at
7
8       http://www.apache.org/licenses/LICENSE-2.0
9
10   Unless required by applicable law or agreed to in writing, software
11   distributed under the License is distributed on an "AS IS" BASIS,
12   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   See the License for the specific language governing permissions and
14   limitations under the License.
15*/
16
17using System;
18using System.Collections.Generic;
19using System.Linq;
20using System.Text;
21
22using WorkspaceManager.Model;
23using System.Threading;
24using System.Collections;
25using Cryptool.PluginBase;
26using System.Reflection;
27using Gears4Net;
28using System.Windows.Threading;
29using System.Runtime.Remoting.Contexts;
30
31namespace WorkspaceManager.Execution
32{
33    /// <summary>
34    /// Engine to execute a model of the WorkspaceManager
35    /// This class needs a WorkspaceManager to be instantiated
36    /// To run an execution process it also needs a WorkspaceModel
37    ///
38    /// This class uses Gears4Net to execute the plugins
39    /// </summary>
40    public class ExecutionEngine
41    {
42        private WorkspaceManager WorkspaceManagerEditor;
43        private Scheduler[] schedulers;
44        private WorkspaceModel workspaceModel;
45
46        public long ExecutedPluginsCounter { get; set; }
47        public bool BenchmarkPlugins { get; set; }
48        public long GuiUpdateInterval { get; set; }
49
50        /// <summary>
51        /// Creates a new ExecutionEngine
52        /// </summary>
53        /// <param name="workspaceManagerEditor"></param>
54        public ExecutionEngine(WorkspaceManager workspaceManagerEditor)
55        {
56            WorkspaceManagerEditor = workspaceManagerEditor;
57        }
58
59        /// <summary>
60        /// Is this ExecutionEngine running?
61        /// </summary>
62        public bool IsRunning
63        {
64            get;
65            private set;
66        }
67
68        /// <summary>
69        /// Execute the given Model
70        /// </summary>
71        /// <param name="workspaceModel"></param>
72        public void Execute(WorkspaceModel workspaceModel)
73        {
74            if (!IsRunning)
75            {
76                IsRunning = true;
77                this.workspaceModel = workspaceModel;
78                int amountSchedulers = System.Environment.ProcessorCount * 2;
79               
80                //Here we create n = "ProcessorsCount * 2" Gears4Net schedulers
81                //We do this, because measurements showed that we get the best performance if we
82                //use this amount of schedulers
83                schedulers = new Scheduler[amountSchedulers];
84                for (int i = 0; i < amountSchedulers; i++)
85                {
86                    schedulers[i] = new WorkspaceManagerScheduler("Scheduler" + i);                   
87                }
88               
89                //We have to reset all states of PluginModels, ConnectorModels and ConnectionModels:
90                workspaceModel.resetStates();
91
92                //The UpdateGuiProtocol is a kind of "daemon" which will update the view elements if necessary
93                UpdateGuiProtocol updateGuiProtocol = new UpdateGuiProtocol(schedulers[0], workspaceModel, this);
94                schedulers[0].AddProtocol(updateGuiProtocol);
95                updateGuiProtocol.Start();
96
97                //The BenchmarkProtocl counts the amount of executed plugins per seconds and writes this to debug
98                if (this.BenchmarkPlugins)
99                {
100                    BenchmarkProtocol benchmarkProtocol = new BenchmarkProtocol(schedulers[0], this.workspaceModel, this);
101                    schedulers[0].AddProtocol(benchmarkProtocol);
102                    benchmarkProtocol.Start();
103                }
104
105                //Here we create for each PluginModel an own PluginProtocol
106                //By using round-robin we give each protocol to another scheduler to gain
107                //a good average load balancing of the schedulers
108                //we also initalize each plugin
109                int counter=0;
110                foreach (PluginModel pluginModel in workspaceModel.AllPluginModels)
111                {
112                    pluginModel.Plugin.PreExecution();
113                    PluginProtocol pluginProtocol = new PluginProtocol(schedulers[counter], pluginModel,this);
114                    pluginModel.PluginProtocol = pluginProtocol;
115                    schedulers[counter].AddProtocol(pluginProtocol);
116                   
117                    pluginProtocol.Start();
118                    counter = (counter + 1) % (amountSchedulers);
119
120                    if (pluginModel.Startable)
121                    {
122                        MessageExecution msg = new MessageExecution();
123                        msg.PluginModel = pluginModel;
124                        pluginProtocol.BroadcastMessageReliably(msg);
125                    }
126                }
127
128                foreach (Scheduler scheduler in schedulers)
129                {
130                    ((WorkspaceManagerScheduler)scheduler).startScheduler();
131                }
132            }
133        }     
134     
135        /// <summary>
136        /// Stop the execution process:
137        /// calls shutdown on all schedulers + calls stop() on each plugin
138        /// </summary>
139        public void Stop()
140        {
141            //First stop alle plugins
142            foreach (PluginModel pluginModel in workspaceModel.AllPluginModels)
143            {
144                pluginModel.Plugin.Stop();
145                pluginModel.Plugin.PostExecution();
146            }           
147
148            IsRunning = false;
149            //Secondly stop all Gears4Net Schedulers
150            foreach (Scheduler scheduler in schedulers)
151            {
152                scheduler.Shutdown();
153            }
154             
155        }
156
157        /// <summary>
158        /// Pause the execution
159        /// </summary>
160        public void Pause()
161        {
162            //not implemented yet
163        }
164
165        /// <summary>
166        /// Use the logger of the WorkspaceManagerEditor
167        /// </summary>
168        /// <param name="message"></param>
169        /// <param name="level"></param>
170        public void GuiLogMessage(string message, NotificationLevel level)
171        {           
172            WorkspaceManagerEditor.GuiLogMessage(message, level);
173        }           
174    }
175 
176    /// <summary>
177    /// Message send to scheduler for a Plugin to trigger the Execution
178    /// </summary>
179    public class MessageExecution : MessageBase
180    {
181        public PluginModel PluginModel;
182    }
183
184    /// <summary>
185    /// A Protocol for updating the GUI in time intervals
186    /// </summary>
187    public class UpdateGuiProtocol : ProtocolBase
188    {
189        private WorkspaceModel workspaceModel;
190        private ExecutionEngine executionEngine;     
191
192        /// <summary>
193        /// Create a new protocol. Each protocol requires a scheduler which provides
194        /// a thread for execution.
195        /// </summary>
196        /// <param name="scheduler"></param>
197        public UpdateGuiProtocol(Scheduler scheduler, WorkspaceModel workspaceModel, ExecutionEngine executionEngine)
198            : base(scheduler)
199        {
200            this.workspaceModel = workspaceModel;
201            this.executionEngine = executionEngine;           
202        }
203
204        /// <summary>
205        /// The main function of the protocol
206        /// </summary>
207        /// <param name="stateMachine"></param>
208        /// <returns></returns>
209        public override System.Collections.Generic.IEnumerator<ReceiverBase> Execute(AbstractStateMachine stateMachine)
210        {
211            while (this.executionEngine.IsRunning)
212            {
213                yield return Timeout(this.executionEngine.GuiUpdateInterval, HandleUpdateGui);
214            }
215        }
216
217        /// <summary>
218        /// Handler function for a message.
219        /// This handler must not block, because it executes inside the thread of the scheduler.
220        /// </summary>
221        /// <param name="msg"></param>
222        private void HandleUpdateGui()
223        {
224            //Get the gui Thread
225            this.workspaceModel.WorkspaceManagerEditor.Presentation.Dispatcher.Invoke(DispatcherPriority.Normal, (SendOrPostCallback)delegate
226            {
227                foreach (PluginModel pluginModel in workspaceModel.AllPluginModels)
228                {
229                    if (pluginModel.GuiNeedsUpdate)
230                    {
231                        pluginModel.GuiNeedsUpdate = false;
232                        pluginModel.paint();
233                        if (pluginModel.UpdateableView != null)
234                        {
235                            pluginModel.UpdateableView.update();
236                        }
237                    }
238                }
239                foreach (ConnectionModel connectionModel in workspaceModel.AllConnectionModels)
240                {
241                    if (connectionModel.GuiNeedsUpdate)
242                    {
243                        if (connectionModel.UpdateableView != null)
244                        {
245                            connectionModel.UpdateableView.update();
246                        }
247                    }
248                }
249            }
250            , null);
251        }
252    }
253   
254    /// <summary>
255    /// A Protocol for benchmarking
256    /// </summary>
257    public class BenchmarkProtocol : ProtocolBase
258    {
259        private WorkspaceModel workspaceModel;
260        private ExecutionEngine executionEngine;
261
262        /// <summary>
263        /// Create a new protocol. Each protocol requires a scheduler which provides
264        /// a thread for execution.
265        /// </summary>
266        /// <param name="scheduler"></param>
267        public BenchmarkProtocol(Scheduler scheduler, WorkspaceModel workspaceModel, ExecutionEngine executionEngine)
268            : base(scheduler)
269        {
270            this.workspaceModel = workspaceModel;
271            this.executionEngine = executionEngine;
272        }
273
274        /// <summary>
275        /// The main function of the protocol
276        /// </summary>
277        /// <param name="stateMachine"></param>
278        /// <returns></returns>
279        public override System.Collections.Generic.IEnumerator<ReceiverBase> Execute(AbstractStateMachine stateMachine)
280        {
281            while (this.executionEngine.IsRunning)
282            {
283                yield return Timeout(1000, HandleBenchmark);
284            }
285        }
286
287        /// <summary>
288        /// Handler function for a message.
289        /// This handler must not block, because it executes inside the thread of the scheduler.
290        /// </summary>
291        /// <param name="msg"></param>
292        private void HandleBenchmark()
293        {
294            this.workspaceModel.WorkspaceManagerEditor.GuiLogMessage("Executing at about " + this.executionEngine.ExecutedPluginsCounter + " Plugins/s", NotificationLevel.Debug);
295            this.executionEngine.ExecutedPluginsCounter = 0;
296        }
297
298    }
299
300    /// <summary>
301    /// A Protocol for a PluginModel
302    /// </summary>
303    public class PluginProtocol : ProtocolBase
304    {       
305        public PluginModel PluginModel;
306        private ExecutionEngine executionEngine;
307
308        /// <summary>
309        /// Create a new protocol. Each protocol requires a scheduler which provides
310        /// a thread for execution.
311        /// </summary>
312        /// <param name="scheduler"></param>
313        public PluginProtocol(Scheduler scheduler, PluginModel pluginModel,ExecutionEngine executionEngine)
314            : base(scheduler)
315        {
316            this.PluginModel = pluginModel;
317            this.executionEngine = executionEngine;
318        }
319
320        /// <summary>
321        /// The main function of the protocol     
322        /// </summary>
323        /// <param name="stateMachine"></param>
324        /// <returns></returns>
325        public override System.Collections.Generic.IEnumerator<ReceiverBase> Execute(AbstractStateMachine stateMachine)
326        {
327            while (this.executionEngine.IsRunning)
328            {
329                yield return Receive<MessageExecution>(null, this.HandleExecute);               
330            }
331        }
332
333        /// <summary>
334        /// Call the execution function of the wrapped IPlugin
335        /// </summary>
336        /// <param name="msg"></param>
337        private void HandleExecute(MessageExecution msg)
338        {
339           
340            if (!msg.PluginModel.WorkspaceModel.WorkspaceManagerEditor.isExecuting())
341            {
342                return;
343            }
344
345            //Check if all necessary inputs are set
346            foreach (ConnectorModel connectorModel in msg.PluginModel.InputConnectors)
347            {
348                if (!connectorModel.IControl && 
349                    (connectorModel.IsMandatory || connectorModel.InputConnections.Count > 0) && (!connectorModel.HasData ||
350                    connectorModel.Data==null))
351                {
352                    return;
353                }
354            }
355
356            //Check if all outputs are free
357            foreach (ConnectorModel connectorModel in msg.PluginModel.OutputConnectors)
358            {
359                if (!connectorModel.IControl)
360                {
361                    foreach (ConnectionModel connectionModel in connectorModel.OutputConnections)
362                    {
363                        if (connectionModel.To.HasData)
364                        {
365                            return;
366                        }
367                    }
368                }
369            }
370
371            //Fill the plugins inputs with data
372            foreach (ConnectorModel connectorModel in PluginModel.InputConnectors)
373            {
374                try
375                {
376                    if (connectorModel.HasData && connectorModel.Data.value != null)
377                    {
378                        if (connectorModel.IsDynamic)
379                        {
380                            MethodInfo propertyInfo = PluginModel.Plugin.GetType().GetMethod(connectorModel.DynamicSetterName);
381                            propertyInfo.Invoke(PluginModel.Plugin, new object[] { connectorModel.PropertyName, connectorModel.Data.value });
382                        }
383                        else
384                        {
385                            PropertyInfo propertyInfo = PluginModel.Plugin.GetType().GetProperty(connectorModel.PropertyName);
386                            propertyInfo.SetValue(PluginModel.Plugin, connectorModel.Data.value, null);
387                        }
388                    }
389                }
390                catch (Exception ex)
391                {
392                    this.PluginModel.WorkspaceModel.WorkspaceManagerEditor.GuiLogMessage("An error occured while setting value of connector \"" + connectorModel.Name + "\" of \"" + PluginModel + "\": " + ex.Message, NotificationLevel.Error);
393                    this.PluginModel.State = PluginModelState.Error;
394                    this.PluginModel.GuiNeedsUpdate = true;
395                    return;
396                }
397            }
398           
399            msg.PluginModel.Plugin.Execute();
400
401            if (this.executionEngine.BenchmarkPlugins)
402            {
403                this.executionEngine.ExecutedPluginsCounter++;
404            }
405
406            foreach (ConnectorModel connectorModel in PluginModel.InputConnectors)
407            {
408                if (connectorModel.HasData)
409                {                   
410                    connectorModel.Data = null;
411                    connectorModel.HasData = false;
412                    foreach (ConnectionModel connectionModel in connectorModel.InputConnections)
413                    {
414                        connectionModel.Active = false;
415                        connectorModel.GuiNeedsUpdate = true;
416                    }
417                }
418            }
419           
420            foreach (ConnectorModel connectorModel in PluginModel.InputConnectors)
421            {
422                foreach (ConnectionModel connectionModel in connectorModel.InputConnections)
423                {
424                    if (!connectionModel.From.PluginModel.Startable || 
425                        (connectionModel.From.PluginModel.Startable && connectionModel.From.PluginModel.RepeatStart))
426                    {
427                        MessageExecution message_exec = new MessageExecution();
428                        message_exec.PluginModel = connectionModel.From.PluginModel;
429                        connectionModel.From.PluginModel.PluginProtocol.BroadcastMessage(message_exec);
430                    }
431                }
432            }           
433        }
434    }
435
436    /// <summary>
437    /// Gears4Net Scheduler. The scheduler only runs protocols which do not have a waiting
438    /// plugin on the protocol plugins outputs
439    /// </summary>
440    public class WorkspaceManagerScheduler : Scheduler
441    {
442        private System.Threading.AutoResetEvent wakeup = new System.Threading.AutoResetEvent(false);
443        private bool shutdown = false;
444        private System.Threading.Thread thread;
445        private Context currentContext;
446
447                public WorkspaceManagerScheduler() : this(String.Empty)
448                {
449
450                }
451
452        public WorkspaceManagerScheduler(string name)
453            : base()
454        {
455            this.currentContext = Thread.CurrentContext;
456
457            thread = new System.Threading.Thread(this.Start);
458            thread.SetApartmentState(System.Threading.ApartmentState.MTA);
459                        thread.Name = name;
460           
461        }
462
463        public void startScheduler()
464        {
465            thread.Start();
466        }
467
468        private void Start()
469        {
470            if (this.currentContext != Thread.CurrentContext)
471                this.currentContext.DoCallBack(Start);
472
473            // Loop forever
474            while (true)
475            {
476                this.wakeup.WaitOne();
477
478                // Loop while there are more protocols waiting
479                while (true)
480                {
481                    // Should the scheduler stop?
482                    if (this.shutdown)
483                        return;
484                   
485                    ProtocolBase protocol = null;
486                    lock (this)
487                    {
488                        // No more protocols? -> Wait
489                        if (this.waitingProtocols.Count == 0)
490                            break;
491                    }
492
493                    protocol = this.waitingProtocols.Dequeue();
494                    ProtocolStatus status = protocol.Run();
495
496                    lock (this)
497                    {
498                        switch (status)
499                        {
500                            case ProtocolStatus.Created:
501                                System.Diagnostics.Debug.Assert(false);
502                                break;
503                            case ProtocolStatus.Ready:
504                                this.waitingProtocols.Enqueue(protocol);
505                                break;
506                            case ProtocolStatus.Waiting:
507                                break;
508                            case ProtocolStatus.Terminated:
509                                System.Diagnostics.Debug.Assert(!this.waitingProtocols.Contains(protocol));
510                                this.RemoveProtocol(protocol);
511                                break;
512                        }
513                    }
514                   
515                }
516            }
517        }
518
519        /// <summary>
520        /// Removes a protocol from the internal queue
521        /// </summary>
522        /// <param name="protocol"></param>
523        public override void RemoveProtocol(ProtocolBase protocol)
524        {
525            lock (this)
526            {
527                this.protocols.Remove(protocol);
528                if (this.protocols.Count == 0)
529                    this.Shutdown();
530            }
531        }
532
533        /// <summary>
534        /// Adds a protocol to the internal queue
535        /// </summary>
536        /// <param name="protocol"></param>
537        public override void AddProtocol(ProtocolBase protocol)
538        {
539            lock (this)
540            {
541                this.protocols.Add(protocol);
542            }
543        }
544
545        /// <summary>
546        /// Wakeup this scheduler
547        /// </summary>
548        /// <param name="protocol"></param>
549        public override void Wakeup(ProtocolBase protocol)
550        {
551            lock (this)
552            {
553                if (!this.waitingProtocols.Contains(protocol))
554                    this.waitingProtocols.Enqueue(protocol);
555                this.wakeup.Set();
556            }
557        }
558
559        /// <summary>
560        /// Terminates the scheduler
561        /// </summary>
562        public override void Shutdown()
563        {
564            this.shutdown = true;
565            this.wakeup.Set();
566        }
567    }
568}
Note: See TracBrowser for help on using the repository browser.