Changeset 1884


Ignore:
Timestamp:
Aug 29, 2010, 2:22:40 PM (11 years ago)
Author:
kopal
Message:
  • created multi processor gears4net scheduler for execution engine
  • added setting for thread priority
Location:
trunk/CrypPlugins/WorkspaceManager
Files:
3 edited

Legend:

Unmodified
Added
Removed
  • trunk/CrypPlugins/WorkspaceManager/Execution/ExecutionEngine.cs

    r1882 r1884  
    4141    {
    4242        private WorkspaceManager WorkspaceManagerEditor;
    43         private Scheduler[] schedulers;
     43        private Scheduler scheduler;
    4444        private WorkspaceModel workspaceModel;
    4545        private volatile bool isRunning = false;
     
    4949        public long GuiUpdateInterval { get; set; }
    5050        public int SleepTime { get; set; }
     51        public int ThreadPriority { get; set; }
    5152
    5253        /// <summary>
     
    7273        /// </summary>
    7374        /// <param name="workspaceModel"></param>
    74         public void Execute(WorkspaceModel workspaceModel, int amountSchedulers)
     75        public void Execute(WorkspaceModel workspaceModel, int amountThreads)
    7576        {
    7677            if (!IsRunning)
     
    7980                this.workspaceModel = workspaceModel;
    8081
    81                 if (amountSchedulers <= 0)
    82                 {
    83                     amountSchedulers = 1;
    84                 }
    85 
    86                 //Here we create n = "ProcessorsCount * 2" Gears4Net schedulers
    87                 //We do this, because measurements showed that we get the best performance if we
    88                 //use this amount of schedulers
    89                 schedulers = new Scheduler[amountSchedulers];
    90                 for (int i = 0; i < amountSchedulers; i++)
    91                 {
    92                     schedulers[i] = new WorkspaceManagerScheduler("WorkspaceManagerScheduler-" + i);
    93                     ((WorkspaceManagerScheduler)schedulers[i]).executionEngine = this;
    94                 }
     82                if (amountThreads <= 0)
     83                {
     84                    amountThreads = 1;
     85                }
     86
     87                scheduler = new WorkspaceManagerScheduler("WorkspaceManagerScheduler", amountThreads,this);
    9588               
    9689                //We have to reset all states of PluginModels, ConnectorModels and ConnectionModels:
     
    9891
    9992                //The UpdateGuiProtocol is a kind of "daemon" which will update the view elements if necessary
    100                 UpdateGuiProtocol updateGuiProtocol = new UpdateGuiProtocol(schedulers[0], workspaceModel, this);
    101                 schedulers[0].AddProtocol(updateGuiProtocol);
     93                UpdateGuiProtocol updateGuiProtocol = new UpdateGuiProtocol(scheduler, workspaceModel, this);
     94                scheduler.AddProtocol(updateGuiProtocol);
    10295                updateGuiProtocol.Start();
    10396
     
    10598                if (this.BenchmarkPlugins)
    10699                {
    107                     BenchmarkProtocol benchmarkProtocol = new BenchmarkProtocol(schedulers[0], this.workspaceModel, this);
    108                     schedulers[0].AddProtocol(benchmarkProtocol);
     100                    BenchmarkProtocol benchmarkProtocol = new BenchmarkProtocol(scheduler, this.workspaceModel, this);
     101                    scheduler.AddProtocol(benchmarkProtocol);
    109102                    benchmarkProtocol.Start();
    110103                }
     
    119112                foreach (PluginModel pluginModel in workspaceModel.AllPluginModels)
    120113                {
    121                     PluginProtocol pluginProtocol = null;
    122 
    123                     if (pluginModel.Plugin is PluginProtocol)
    124                     {
    125                         pluginProtocol = (PluginProtocol)pluginModel.Plugin;
    126                         pluginProtocol.setExecutionEngineSettings(schedulers[counter], pluginModel, this);
    127                     }
    128                     else
    129                     {
    130                         pluginProtocol = new PluginProtocol(schedulers[counter], pluginModel, this);
    131                     }
    132 
     114                    PluginProtocol pluginProtocol = new PluginProtocol(scheduler, pluginModel, this);
     115                   
    133116                    pluginModel.Plugin.PreExecution();                   
    134117                    pluginModel.PluginProtocol = pluginProtocol;
    135                     schedulers[counter].AddProtocol(pluginProtocol);
     118                    scheduler.AddProtocol(pluginProtocol);
    136119
    137120                    if (pluginProtocol.Status == ProtocolStatus.Created || pluginProtocol.Status == ProtocolStatus.Terminated)
     
    140123                    }
    141124
    142                     counter = (counter + 1) % (amountSchedulers);
     125                    counter = (counter + 1) % (amountThreads);
    143126
    144127                    if (pluginModel.Startable)
     
    150133                }
    151134
    152                 foreach (Scheduler scheduler in schedulers)
    153                 {
    154                     ((WorkspaceManagerScheduler)scheduler).startScheduler();
    155                 }
     135                ((WorkspaceManagerScheduler)scheduler).startScheduling();
     136               
    156137            }
    157138        }     
     
    172153            IsRunning = false;
    173154            //Secondly stop all Gears4Net Schedulers
    174             foreach (Scheduler scheduler in schedulers)
    175             {
    176                 scheduler.Shutdown();
    177             }
    178              
     155            scheduler.Shutdown();
     156           
    179157        }
    180158
     
    195173        {           
    196174            WorkspaceManagerEditor.GuiLogMessage(message, level);
    197         }           
     175        }
    198176    }
    199177 
     
    249227            this.workspaceModel.WorkspaceManagerEditor.Presentation.Dispatcher.Invoke(DispatcherPriority.Normal, (SendOrPostCallback)delegate
    250228            {
    251                 foreach (PluginModel pluginModel in workspaceModel.AllPluginModels)
     229                List<PluginModel> pluginModels = workspaceModel.AllPluginModels;
     230                foreach (PluginModel pluginModel in pluginModels)
    252231                {
    253232                    if (pluginModel.GuiNeedsUpdate)
     
    261240                    }
    262241                }
    263                 foreach (ConnectionModel connectionModel in workspaceModel.AllConnectionModels)
     242                List<ConnectionModel> connectionModels = workspaceModel.AllConnectionModels;
     243                foreach (ConnectionModel connectionModel in connectionModels)
    264244                {
    265245                    if (connectionModel.GuiNeedsUpdate)
     
    388368        private void HandleExecute(MessageExecution msg)
    389369        {
     370            // ################
    390371            // 1. Check if Plugin may Execute
    391             if (!mayExecute(PluginModel))
     372            // ################
     373
     374            if (!msg.PluginModel.WorkspaceModel.WorkspaceManagerEditor.isExecuting())
    392375            {
    393376                return;
    394377            }
    395378
     379            //Check if all necessary inputs are set
     380            List<ConnectorModel> inputConnectors = msg.PluginModel.InputConnectors;
     381            foreach (ConnectorModel connectorModel in inputConnectors)
     382            {
     383                if (!connectorModel.IControl &&
     384                    (connectorModel.IsMandatory || connectorModel.InputConnections.Count > 0) && !connectorModel.HasData)
     385                {
     386                    return;
     387                }
     388            }
     389
     390            //Check if all outputs are free
     391            List<ConnectorModel> outputConnectors = msg.PluginModel.OutputConnectors;
     392            foreach (ConnectorModel connectorModel in outputConnectors)
     393            {
     394                if (!connectorModel.IControl)
     395                {
     396                    List<ConnectionModel> outputConnections = connectorModel.OutputConnections;
     397                    foreach (ConnectionModel connectionModel in outputConnections)
     398                    {
     399                        if (connectionModel.To.HasData)
     400                        {
     401                            return;
     402                        }
     403                    }
     404                }
     405            }
     406
     407            // ################
    396408            //2. Fill all Inputs of the plugin, if this fails, stop executing the plugin
    397             if (!fillInputs())
    398             {
    399                 return;
    400             }
    401 
    402             //3. Execute the Plugin -> call the IPlugin.Execute()
    403             try
    404             {
    405                 PluginModel.Plugin.Execute();
    406             }
    407             catch (Exception ex)
    408             {
    409                 this.PluginModel.WorkspaceModel.WorkspaceManagerEditor.GuiLogMessage("An error occured while executing  \"" + PluginModel.Name + "\": " + ex.Message, NotificationLevel.Error);
    410                 this.PluginModel.State = PluginModelState.Error;
    411                 this.PluginModel.GuiNeedsUpdate = true;
    412                 return;
    413             }
    414 
    415             //4. Count for the benchmark
    416             if (this.executionEngine.BenchmarkPlugins)
    417             {
    418                 this.executionEngine.ExecutedPluginsCounter++;
    419             }
    420 
    421             //5. If the user wants to, sleep some time
    422             if (this.executionEngine.SleepTime > 0)
    423             {
    424                 Thread.Sleep(this.executionEngine.SleepTime);
    425             }
    426 
    427             //6. Clear all used inputs
    428             //clearInputs();
    429 
    430             //7. Send execute messages to possible executable next plugins
    431             //runNextPlugins();
    432         }       
    433      
    434         /// <summary>
    435         /// Delete all input data of inputs of the plugin
    436         /// </summary>
    437         public void clearInputs()
    438         {
    439             List<ConnectorModel> inputConnectors = PluginModel.InputConnectors;
    440             foreach (ConnectorModel connectorModel in inputConnectors)
    441             {
    442                 if (connectorModel.HasData)
    443                 {
    444                     connectorModel.Data = null;
    445                     connectorModel.HasData = false;
    446                     connectorModel.GuiNeedsUpdate = true;
    447 
    448                     List<ConnectionModel> inputConnections = connectorModel.InputConnections;
    449                     foreach (ConnectionModel connectionModel in inputConnections)
    450                     {
    451                         connectionModel.Active = false;
    452                         connectorModel.GuiNeedsUpdate = true;
    453                     }
    454                 }
    455             }
    456         }
    457 
    458         /// <summary>
    459         /// Fill all inputs of the plugin
    460         /// </summary>
    461         /// <returns></returns>
    462         public bool fillInputs()
    463         {
     409            // ################
     410
    464411            //Fill the plugins inputs with data
    465             List<ConnectorModel> inputConnectors = PluginModel.InputConnectors;
    466412            foreach (ConnectorModel connectorModel in inputConnectors)
    467413            {
     
    487433                    this.PluginModel.State = PluginModelState.Error;
    488434                    this.PluginModel.GuiNeedsUpdate = true;
    489                     return false;
    490                 }
    491             }
    492             return true;
    493         }
    494 
    495         /// <summary>
    496         ///
    497         /// </summary>
    498         /// <returns></returns>
    499         public bool mayExecute()
    500         {
    501             return mayExecute(this.PluginModel);
    502         }
    503 
    504         /// <summary>
    505         /// Check if the PluginModel may execute
    506         /// </summary>
    507         /// <param name="pluginModel"></param>
    508         /// <returns></returns>
    509         private bool mayExecute(PluginModel pluginModel)
    510         {
    511             if (!pluginModel.WorkspaceModel.WorkspaceManagerEditor.isExecuting())
    512             {
    513                 return false;
    514             }
    515 
    516             //Check if all necessary inputs are set
    517             List<ConnectorModel> inputConnectors = pluginModel.InputConnectors;
    518             foreach (ConnectorModel connectorModel in inputConnectors)
    519             {
    520                 if (!connectorModel.IControl &&
    521                     (connectorModel.IsMandatory || connectorModel.InputConnections.Count > 0) && (!connectorModel.HasData ||
    522                     connectorModel.Data == null))
    523                 {
    524                     return false;
    525                 }
    526             }
    527 
    528             //Check if all outputs are free
    529             List<ConnectorModel> outputConnectors = pluginModel.OutputConnectors;
    530             foreach (ConnectorModel connectorModel in outputConnectors)
    531             {
    532                 if (!connectorModel.IControl)
    533                 {
    534                     List<ConnectionModel> outputConnections = connectorModel.OutputConnections;
    535                     foreach (ConnectionModel connectionModel in outputConnections)
    536                     {
    537                         if (connectionModel.To.HasData)
    538                         {
    539                             return false;
    540                         }
    541                     }
    542                 }
    543             }
    544             return true;
    545         }
     435                    return;
     436                }
     437            }
     438
     439            // ################
     440            //3. Execute the Plugin -> call the IPlugin.Execute()
     441            // ################
     442
     443            try
     444            {
     445                PluginModel.Plugin.Execute();
     446            }
     447            catch (Exception ex)
     448            {
     449                this.PluginModel.WorkspaceModel.WorkspaceManagerEditor.GuiLogMessage("An error occured while executing  \"" + PluginModel.Name + "\": " + ex.Message, NotificationLevel.Error);
     450                this.PluginModel.State = PluginModelState.Error;
     451                this.PluginModel.GuiNeedsUpdate = true;
     452                return;
     453            }
     454
     455            // ################
     456            //4. Count for the benchmark
     457            // ################
     458
     459            if (this.executionEngine.BenchmarkPlugins)
     460            {
     461                this.executionEngine.ExecutedPluginsCounter++;
     462            }
     463
     464            // ################
     465            //5. If the user wants to, sleep some time
     466            // ################
     467
     468            if (this.executionEngine.SleepTime > 0)
     469            {
     470                Thread.Sleep(this.executionEngine.SleepTime);
     471            }
     472
     473        }       
    546474
    547475        /// <summary>
     
    561489        private System.Threading.AutoResetEvent wakeup = new System.Threading.AutoResetEvent(false);
    562490        private bool shutdown = false;
    563         private System.Threading.Thread thread;
    564         private Context currentContext;
    565         public ExecutionEngine executionEngine = null;
    566 
    567                 public WorkspaceManagerScheduler() : this(String.Empty)
    568                 {
    569 
    570                 }
    571 
    572         public WorkspaceManagerScheduler(string name)
     491        private Thread[] threads;
     492        public ExecutionEngine executionEngine = null;         
     493
     494        public WorkspaceManagerScheduler(string name, int amountThreads, ExecutionEngine executionEngine)
    573495            : base()
    574496        {
    575             this.currentContext = Thread.CurrentContext;
    576 
    577             thread = new System.Threading.Thread(this.Start);
    578             thread.SetApartmentState(System.Threading.ApartmentState.MTA);
    579                         thread.Name = name;
     497            threads = new Thread[amountThreads];
     498            this.executionEngine = executionEngine;
     499
     500            for (int i = 0; i < threads.Length;i++ )
     501            {
     502                threads[i] = new Thread(this.DoScheduling);
     503                threads[i].SetApartmentState(ApartmentState.MTA);
     504                threads[i].Name = name + "-Thread-" + i;
     505
     506                switch (this.executionEngine.ThreadPriority)
     507                {
     508                    case 0:
     509                        threads[i].Priority = ThreadPriority.AboveNormal;
     510                        break;
     511                    case 1:
     512                        threads[i].Priority = ThreadPriority.BelowNormal;
     513                        break;
     514                    case 2:
     515                        threads[i].Priority = ThreadPriority.Highest;
     516                        break;
     517                    case 3:
     518                        threads[i].Priority = ThreadPriority.Lowest;
     519                        break;
     520                    case 4:
     521                        threads[i].Priority = ThreadPriority.Normal;
     522                        break;
     523                }
     524            }
    580525           
    581526        }
    582527
    583         public void startScheduler()
    584         {
    585             thread.Start();
    586         }
    587 
    588         private void Start()
    589         {
    590             if (this.currentContext != Thread.CurrentContext){
    591                 this.currentContext.DoCallBack(Start);}
    592            
    593             this.executionEngine.GuiLogMessage("Scheduler " + this.thread.Name + " up and running", NotificationLevel.Debug);
     528        public void startScheduling()
     529        {
     530            foreach (Thread thread in threads)
     531            {
     532                thread.Start();
     533            }
     534        }
     535       
     536        private void DoScheduling()
     537        {           
     538
     539            this.executionEngine.GuiLogMessage(Thread.CurrentThread.Name + " up and running", NotificationLevel.Debug);
     540            Queue<ProtocolBase> waitingProtocols = this.waitingProtocols;
    594541
    595542            // Loop forever
     
    604551                    if (this.shutdown)
    605552                    {
    606                         this.executionEngine.GuiLogMessage("Scheduler " + this.thread.Name + " terminated", NotificationLevel.Debug);
     553                        this.executionEngine.GuiLogMessage(Thread.CurrentThread.Name + " terminated", NotificationLevel.Debug);
    607554                        return;
    608555                    }
    609                    
     556
    610557                    ProtocolBase protocol = null;
    611558                    lock (this)
    612559                    {
    613560                        // No more protocols? -> Wait
    614                         if (this.waitingProtocols.Count == 0)
     561                        if (waitingProtocols.Count == 0)
    615562                            break;
    616563                    }
     
    618565                    try
    619566                    {
    620                         protocol = this.waitingProtocols.Dequeue();
     567                        lock (this)
     568                        {
     569                            protocol = waitingProtocols.Dequeue();
     570                        }
    621571                        ProtocolStatus status = protocol.Run();
    622572
     
    629579                                    break;
    630580                                case ProtocolStatus.Ready:
    631                                     this.waitingProtocols.Enqueue(protocol);
     581                                    waitingProtocols.Enqueue(protocol);
    632582                                    break;
    633583                                case ProtocolStatus.Waiting:
     
    640590                        }
    641591                    }
    642                     catch (Exception ex) 
     592                    catch (Exception ex)
    643593                    {
    644594                        System.Diagnostics.Debug.Fail("Error during scheduling: " + ex.Message + " - " + ex.InnerException);
     
    647597            }
    648598        }
    649 
    650599        /// <summary>
    651600        /// Removes a protocol from the internal queue
  • trunk/CrypPlugins/WorkspaceManager/WorkspaceManager.cs

    r1872 r1884  
    413413                try
    414414                {
    415                    schedulers = int.Parse(((WorkspaceManagerSettings)this.Settings).Schedulers);
     415                   schedulers = int.Parse(((WorkspaceManagerSettings)this.Settings).Threads);
    416416                    if (ExecutionEngine.SleepTime < 0)
    417417                    {
     
    427427
    428428                ExecutionEngine.BenchmarkPlugins = ((WorkspaceManagerSettings)this.Settings).BenchmarkPlugins;
     429                ExecutionEngine.ThreadPriority = ((WorkspaceManagerSettings)this.Settings).ThreadPriority;
    429430
    430431                ExecutionEngine.Execute(WorkspaceModel, schedulers);               
  • trunk/CrypPlugins/WorkspaceManager/WorkspaceManagerSettings.cs

    r1872 r1884  
    55using Cryptool.PluginBase;
    66using System.ComponentModel;
     7using System.Threading;
    78
    89namespace WorkspaceManager
     
    1516        public WorkspaceManagerSettings()
    1617        {
    17             this.Schedulers = "" + System.Environment.ProcessorCount * 2;
     18            this.Threads = "" + System.Environment.ProcessorCount;           
    1819        }
    1920
     
    6061        }
    6162
    62         private String schedulers = "0";
    63         [TaskPane("Schedulers", "The amount of parallel gears4net schedulers.", null, 1, false, DisplayLevel.Beginner, ControlType.TextBox)]
    64         public String Schedulers
     63        private String threads = "0";
     64        [TaskPane("Threads", "The amount of used threads for scheduling.", null, 1, false, DisplayLevel.Beginner, ControlType.TextBox)]
     65        public String Threads
    6566        {
    6667            get
    6768            {
    68                 return schedulers;
     69                return threads;
    6970            }
    7071            set
    7172            {
    72                 schedulers = value;
    73                 OnPropertyChanged("Schedulers");
     73                threads = value;
     74                OnPropertyChanged("Threads");
    7475            }
    75         }     
     76        }
     77
     78        private int threadPriority = 4;
     79        [TaskPane("ThreadPriority", "Should the event handling be synchronous?", null, 1, false, DisplayLevel.Beginner, ControlType.ComboBox, new String[] { "AboveNormal", "BelowNormal", "Highest", "Lowest", "Normal" })]
     80        public int ThreadPriority
     81        {
     82            get
     83            {
     84                return threadPriority;
     85            }
     86            set
     87            {
     88                threadPriority = value;
     89                OnPropertyChanged("ThreadPriority");
     90            }
     91        } 
    7692
    7793        private bool benchmarkPlugins = false;
Note: See TracChangeset for help on using the changeset viewer.