Ignore:
Timestamp:
Jun 24, 2010, 4:32:45 PM (11 years ago)
Author:
kopal
Message:
  • WorkspaceManager now allows multiple Connections on InputConnectors
  • Created own Gears4Net Scheduler which implements flow control in WorkspaceManagers ExecutionEngine to support loops
  • some small bugfixes
File:
1 edited

Legend:

Unmodified
Added
Removed
  • trunk/CrypPlugins/WorkspaceManager/Execution/ExecutionEngine.cs

    r1684 r1700  
    2727using Gears4Net;
    2828using System.Windows.Threading;
     29using System.Runtime.Remoting.Contexts;
    2930
    3031namespace WorkspaceManager.Execution
     
    8182                //use this amount of schedulers
    8283                schedulers = new Scheduler[System.Environment.ProcessorCount*2];
    83                 for(int i=0;i<System.Environment.ProcessorCount*2;i++){
    84                     schedulers[i] = new STAScheduler("Scheduler" + i);
    85                 }
    86 
     84                for(int i=0;i< System.Environment.ProcessorCount*2;i++){
     85                    schedulers[i] = new WorkspaceManagerScheduler("Scheduler" + i);                   
     86                }
     87               
    8788                //We have to reset all states of PluginModels, ConnectorModels and ConnectionModels:
    8889                workspaceModel.resetStates();
     
    122123                        pluginProtocol.BroadcastMessageReliably(msg);
    123124                    }
     125                }
     126
     127                foreach (Scheduler scheduler in schedulers)
     128                {
     129                    ((WorkspaceManagerScheduler)scheduler).startScheduler();
    124130                }
    125131            }
     
    298304    public class PluginProtocol : ProtocolBase
    299305    {
    300         private PluginModel pluginModel;
     306        public PluginModel PluginModel;
    301307        private ExecutionEngine executionEngine;
    302308
     
    309315            : base(scheduler)
    310316        {
    311             this.pluginModel = pluginModel;
     317            this.PluginModel = pluginModel;
    312318            this.executionEngine = executionEngine;
    313319        }
     
    322328            while (this.executionEngine.IsRunning)
    323329            {
    324                 yield return Receive<MessageExecution>(null, this.HandleExecute);             
     330                yield return Receive<MessageExecution>(null, this.HandleExecute);
     331                //yield return Parallel(1,new PluginWaitReceiver()) & Receive<MessageExecution>(null, this.HandleExecute);
     332                //yield return new PluginWaitReceiver() + Receive<MessageExecution>(null, this.HandleExecute);
     333                //yield return Parallel(1,new PluginWaitReceiver()) + Receive<MessageExecution>(null, this.HandleExecute);
    325334            }
    326335        }
     
    332341        private void HandleExecute(MessageExecution msg)
    333342        {
    334            
    335343            //executionEngine.GuiLogMessage("HandleExecute for \"" + msg.PluginModel.Name + "\"", NotificationLevel.Debug);
    336344            //Fill the plugins Inputs with data
    337             foreach (ConnectorModel connectorModel in pluginModel.InputConnectors)
     345            foreach (ConnectorModel connectorModel in PluginModel.InputConnectors)
    338346            {
    339347                if (connectorModel.HasData)
    340                 {
    341                     if (connectorModel.IsDynamic)
    342                     {
    343                         MethodInfo propertyInfo = pluginModel.Plugin.GetType().GetMethod(connectorModel.DynamicSetterName);
    344                         propertyInfo.Invoke(pluginModel.Plugin, new object[]{connectorModel.PropertyName, connectorModel.Data});
    345                     }
    346                     else
    347                     {
    348                         PropertyInfo propertyInfo = pluginModel.Plugin.GetType().GetProperty(connectorModel.PropertyName);
    349                         propertyInfo.SetValue(pluginModel.Plugin, connectorModel.Data, null);
     348                {                   
     349                    try
     350                    {
     351                        if (connectorModel.IsDynamic)
     352                        {
     353                            MethodInfo propertyInfo = PluginModel.Plugin.GetType().GetMethod(connectorModel.DynamicSetterName);
     354                            propertyInfo.Invoke(PluginModel.Plugin, new object[] { connectorModel.PropertyName, connectorModel.Data });
     355                        }
     356                        else
     357                        {
     358                            PropertyInfo propertyInfo = PluginModel.Plugin.GetType().GetProperty(connectorModel.PropertyName);
     359                            propertyInfo.SetValue(PluginModel.Plugin, connectorModel.Data, null);
     360                        }
     361                    }
     362                    catch (Exception ex)
     363                    {
     364                        this.PluginModel.WorkspaceModel.WorkspaceManagerEditor.GuiLogMessage("An error occured while setting value of connector \"" + connectorModel.Name + "\" of \"" + PluginModel + "\": " + ex.Message, NotificationLevel.Error);
     365                        this.PluginModel.State = PluginModelState.Error;
     366                        this.PluginModel.GuiNeedsUpdate = true;
     367                        return;
    350368                    }
    351369                }
     
    361379     
    362380    }
     381
     382    public class WorkspaceManagerScheduler : Scheduler
     383    {
     384        private System.Threading.AutoResetEvent wakeup = new System.Threading.AutoResetEvent(false);
     385        private bool shutdown = false;
     386        private System.Threading.Thread thread;
     387        private Context currentContext;
     388
     389                public WorkspaceManagerScheduler() : this(String.Empty)
     390                {
     391
     392                }
     393
     394        public WorkspaceManagerScheduler(string name)
     395            : base()
     396        {
     397            this.currentContext = Thread.CurrentContext;
     398
     399            thread = new System.Threading.Thread(this.Start);
     400            thread.SetApartmentState(System.Threading.ApartmentState.STA);
     401                        thread.Name = name;
     402           
     403        }
     404
     405        public void startScheduler()
     406        {
     407            thread.Start();
     408        }
     409
     410        private void Start()
     411        {
     412            if (this.currentContext != Thread.CurrentContext)
     413                this.currentContext.DoCallBack(Start);
     414
     415            // Loop forever
     416            while (true)
     417            {
     418                this.wakeup.WaitOne();
     419
     420                // Loop while there are more protocols waiting
     421                while (true)
     422                {
     423                    // Should the scheduler stop?
     424                    if (this.shutdown)
     425                        return;
     426                   
     427                    bool donotrun = false;
     428                    ProtocolBase protocol = null;
     429                    lock (this)
     430                    {
     431                        // No more protocols? -> Wait
     432                        if (this.waitingProtocols.Count == 0)
     433                            break;
     434                        protocol = this.waitingProtocols.Dequeue();
     435
     436                        if (protocol is PluginProtocol)
     437                        {
     438                            PluginProtocol pluginProtocol = (PluginProtocol)protocol;
     439                            foreach (ConnectorModel outputConnector in pluginProtocol.PluginModel.OutputConnectors)
     440                            {
     441                                foreach (ConnectionModel connection in outputConnector.OutputConnections)
     442                                {
     443                                   
     444                                    if (connection.To.PluginModel.PluginProtocol.QueueLength > 0 &&
     445                                        connection.To.PluginModel != pluginProtocol.PluginModel &&
     446                                        donotrun == false)
     447                                    {                                           
     448                                        this.waitingProtocols.Enqueue(protocol);
     449                                        donotrun = true;
     450                                    }
     451                                 
     452                                }
     453                            }               
     454                        }
     455
     456                    }
     457
     458                    if (donotrun == false)
     459                    {
     460                        ProtocolStatus status = protocol.Run();
     461
     462                        lock (this)
     463                        {
     464                            switch (status)
     465                            {
     466                                case ProtocolStatus.Created:
     467                                    System.Diagnostics.Debug.Assert(false);
     468                                    break;
     469                                case ProtocolStatus.Ready:
     470                                    this.waitingProtocols.Enqueue(protocol);
     471                                    break;
     472                                case ProtocolStatus.Waiting:
     473                                    break;
     474                                case ProtocolStatus.Terminated:
     475                                    System.Diagnostics.Debug.Assert(!this.waitingProtocols.Contains(protocol));
     476                                    this.RemoveProtocol(protocol);
     477                                    break;
     478                            }
     479                        }
     480                    }
     481                }
     482            }
     483        }
     484
     485        public override void RemoveProtocol(ProtocolBase protocol)
     486        {
     487            lock (this)
     488            {
     489                this.protocols.Remove(protocol);
     490                if (this.protocols.Count == 0)
     491                    this.Shutdown();
     492            }
     493        }
     494
     495        public override void AddProtocol(ProtocolBase protocol)
     496        {
     497            lock (this)
     498            {
     499                this.protocols.Add(protocol);
     500            }
     501        }
     502
     503        public override void Wakeup(ProtocolBase protocol)
     504        {
     505            lock (this)
     506            {
     507                if (!this.waitingProtocols.Contains(protocol))
     508                    this.waitingProtocols.Enqueue(protocol);
     509                this.wakeup.Set();
     510            }
     511        }
     512
     513        public override void Shutdown()
     514        {
     515            this.shutdown = true;
     516            this.wakeup.Set();
     517        }
     518    }
    363519}
Note: See TracChangeset for help on using the changeset viewer.