Ignore:
Timestamp:
Nov 19, 2009, 6:03:03 PM (12 years ago)
Author:
arnold
Message:

Important updates for P2PBase, P2PPublisher and P2PSubscriber.

Location:
trunk/CrypPlugins/PeerToPeerSubscriber
Files:
1 added
3 edited

Legend:

Unmodified
Added
Removed
  • trunk/CrypPlugins/PeerToPeerSubscriber/P2PSubscriber.cs

    r862 r872  
    2525using System.ComponentModel;
    2626
    27 /*
    28  * IDEAS:
    29  * - Publisher takes subscriber list out of the DHT and registered
    30  *   itself with all subscribers pro-active (handle Register-Msg in Subscriber!)
    31  *
    32  * TODO:
    33  * - Handle "publisher-changed" case (reconfirm registration, etc.)
    34  */
    35 
    3627namespace Cryptool.Plugins.PeerToPeer
    3728{
     
    4334    public class P2PSubscriber : IInput
    4435    {
    45         #region Variables
    46 
    47         private string sDHTSettingsPostfix = "Settings";
    48         private long sendAliveMessageInterval;
    49         private long checkPublishersAvailability;
    50 
    5136        private P2PSubscriberSettings settings;
    52         private IP2PControl p2pMaster;
    53         /// <summary>
    54         /// If the DHT Entry of the given Task is empty, continous try to
    55         /// find a meanwhile inscribed Publishers PeerID
    56         /// </summary>
    57         private Timer timerRegisteringNotPossible;
    58         /// <summary>
    59         /// For informing the publisher pro-active, that this subscriber
    60         /// is still interested in this Task.
    61         /// </summary>
    62         private Timer timerSendingAliveMsg;
    63         /// <summary>
    64         /// checking liveness, availability and/or changes (new peer) of the Publisher
    65         /// </summary>
    66         private Timer timerCheckPubAvailability;
    67         /// <summary>
    68         /// this timer gets started when the availability of the publisher,
    69         /// at which the subscriber had registered, is checked. If the timer
    70         /// callback is called and no Pong-message was received, the probability
    71         /// that the Publisher is down is high!
    72         /// </summary>
    73         private Timer timeoutForPublishersPong;
    74         /// <summary>
    75         /// After register message is sent to publisher, this timer gets started.
    76         /// If the publisher doesn't response with a RegisteringAccepted-Message,
    77         /// the probability that the publisher is down is high!
    78         /// </summary>
    79         private Timer timeoutForPublishersRegAccept;
    80         /// <summary>
    81         /// PeerID of the actual publisher. This ID is will be checked continious
    82         /// on liveliness and/or updated if Publisher had changed.
    83         /// </summary>
    84         private string actualPublisher;
    85 
    86         #endregion
     37        private IP2PControl p2pControl;
     38        private P2PSubscriberBase p2pSubscriber;
    8739
    8840        #region In and Output
     
    9143        /// Catches the completely configurated, initialized and joined P2P object from the P2PPeer-Slave-PlugIn.
    9244        /// </summary>
    93         [PropertyInfo(Direction.ControlMaster,"P2P Slave","Input the P2P-Peer-PlugIn","",true,false,DisplayLevel.Beginner,QuickWatchFormat.Text,null)]
    94         public IP2PControl P2PMaster
     45        [PropertyInfo(Direction.ControlMaster, "P2P Slave", "Input the P2P-Peer-PlugIn", "", true, false, DisplayLevel.Beginner, QuickWatchFormat.Text, null)]
     46        public IP2PControl P2PControl
    9547        {
    9648            get
    9749            {
    98                 return this.p2pMaster;
     50                return this.p2pControl;
    9951            }
    10052            set
    10153            {
    102                 if (this.p2pMaster != null)
    103                 {
    104                     this.p2pMaster.OnPeerReceivedMsg -= P2PMaster_OnPeerReceivedMsg;
    105                     this.p2pMaster.OnStatusChanged -= P2PMaster_OnStatusChanged;
     54                if (this.p2pControl != null)
     55                {
     56                    this.p2pControl.OnStatusChanged -= P2PControl_OnStatusChanged;
    10657                }
    10758                if (value != null)
    10859                {
    109                     this.p2pMaster = (P2PPeerMaster)value;
    110                     this.p2pMaster.OnPeerReceivedMsg += new P2PBase.P2PMessageReceived(P2PMaster_OnPeerReceivedMsg);
    111                     this.p2pMaster.OnStatusChanged += new IControlStatusChangedEventHandler(P2PMaster_OnStatusChanged);
    112                     OnPropertyChanged("P2PMaster");
     60                    this.p2pControl = (P2PPeerMaster)value;
     61                    this.p2pControl.OnStatusChanged += new IControlStatusChangedEventHandler(P2PControl_OnStatusChanged);
     62                    OnPropertyChanged("P2PControl");
    11363                }
    11464                else
    11565                {
    116                     this.p2pMaster = null;
     66                    this.p2pControl = null;
    11767                }
    11868            }
     
    13989        #endregion
    14090
    141         private void P2PMaster_OnPeerReceivedMsg(string sSourceAddr, string sData)
    142         {
    143             MessageReceived(sSourceAddr, sData);
    144         }
    145 
    146         private void P2PMaster_OnStatusChanged(IControl sender, bool readyForExecution)
     91        private void P2PControl_OnStatusChanged(IControl sender, bool readyForExecution)
    14792        {
    14893            //throw new NotImplementedException();
     
    160105
    161106        #region Standard functionality
     107
    162108        public P2PSubscriber()
    163109        {
     
    176122            if (e.PropertyName == "BtnUnregister")
    177123            {
    178                 StopActiveWork();
    179                 GuiLogMessage("Subscriber unregistered from Publisher!", NotificationLevel.Info);
     124                if (this.p2pSubscriber != null)
     125                {
     126                    this.p2pSubscriber.Stop();
     127                    GuiLogMessage("Subscriber unregistered from Publisher!", NotificationLevel.Info);
     128                }
     129                else
     130                {
     131                    GuiLogMessage("Publisher isn't initialized, so this action isn't possible.", NotificationLevel.Info);
     132                }
    180133            }
    181134            if (e.PropertyName == "BtnRegister")
    182135            {
    183                 Register();
     136                RegisterSubscriber();
    184137                GuiLogMessage("Subscriber registers with Publisher!", NotificationLevel.Info);
    185138            }
    186139            if (e.PropertyName == "BtnSolutionFound")
    187140            {
    188                 SendMessage(actualPublisher, PubSubMessageType.Solution);
    189                 GuiLogMessage("Solution found message sent to Publisher.", NotificationLevel.Info);
     141                if (this.p2pSubscriber != null)
     142                {
     143                    this.p2pSubscriber.SolutionFound();
     144                    GuiLogMessage("Solution found message sent to Publisher.", NotificationLevel.Info);
     145                }
     146                else
     147                {
     148                    GuiLogMessage("Publisher isn't initialized, so this action isn't possible.", NotificationLevel.Info);
     149                }
    190150            }
    191151        }
     
    220180        public void Pause()
    221181        {
    222             StopActiveWork();
    223182        }
    224183
    225184        public void Stop()
    226185        {
    227             StopActiveWork();
     186            if(this.p2pSubscriber != null)
     187                this.p2pSubscriber.Stop();
    228188        }
    229189
     
    240200        {
    241201            // if no P2P Slave PlugIn is connected with this PlugIn --> No execution!
    242             if (P2PMaster == null)
     202            if (P2PControl == null)
    243203            {
    244204                GuiLogMessage("No P2P_Peer connected with this PlugIn!", NotificationLevel.Error);
     
    247207            if (this.settings.TopicName != null)
    248208            {
    249                 Register();
     209                RegisterSubscriber();
    250210            }
    251211            else
     
    255215        }
    256216
     217        private void RegisterSubscriber()
     218        {
     219            if (this.p2pSubscriber == null)
     220            {
     221                this.p2pSubscriber = new P2PSubscriberBase(this.P2PControl);
     222                this.p2pSubscriber.OnGuiMessage += new P2PSubscriberBase.GuiMessage(p2pSubscriber_OnGuiMessage);
     223                this.p2pSubscriber.OnTextArrivedFromPublisher += new P2PSubscriberBase.TextArrivedFromPublisher(p2pSubscriber_OnTextArrivedFromPublisher);
     224                this.p2pSubscriber.Register(this.settings.TopicName, (long)(this.settings.CheckPublishersAvailability * 1000),
     225                    (long)(this.settings.PublishersReplyTimespan * 1000));
     226            }
     227            else
     228            {
     229                this.p2pSubscriber.Register(this.settings.TopicName, (long)(this.settings.CheckPublishersAvailability * 1000),
     230                    (long)(this.settings.PublishersReplyTimespan * 1000));
     231            }
     232        }
     233
     234        void p2pSubscriber_OnTextArrivedFromPublisher(string sData, PeerId pid)
     235        {
     236            this.Outputvalue = sData;
     237        }
     238
     239        void p2pSubscriber_OnGuiMessage(string sData, NotificationLevel notificationLevel)
     240        {
     241            GuiLogMessage(sData, notificationLevel);
     242        }
     243
    257244        #region INotifyPropertyChanged Members
    258245
     
    276263        }
    277264
    278         #endregion
    279 
    280         #region Subscriber methods
    281 
    282         /// <summary>
    283         /// if true, check whether a new Publisher is the actual one
    284         /// and renew Settings
    285         /// </summary>
    286         private bool bolStopped = true;
    287 
    288         private void Register()
    289         {
    290             string sPubId = CheckPublisherAvailability();
    291             // if DHT Entry for Task is empty, there exist no Publisher at present.
    292             // The method PublisherIsAlive starts a Timer for this case to continous proof Publisher-DHT-Entry
    293             if (sPubId != null)
    294             {
    295                 SendMessage(sPubId, PubSubMessageType.Register);
    296                 long interval = this.settings.PublishersReplyTimespan * 1000;
    297                 if (this.timeoutForPublishersRegAccept == null)
    298                     this.timeoutForPublishersRegAccept = new Timer(OnTimeoutRegisteringAccepted, null, interval, interval);
    299                 this.bolStopped = false;
    300             }
    301         }
    302 
    303         private void MessageReceived(string sSourceAddr, string sData)
    304         {
    305             if (sSourceAddr != actualPublisher)
    306             {
    307                 GuiLogMessage("RECEIVED message from third party peer (not the publisher!): " + sData.Trim() + ", ID: " + sSourceAddr, NotificationLevel.Info);
    308                 return;
    309             }
    310 
    311             PubSubMessageType msgType = this.P2PMaster.GetMsgType(sData);
    312 
    313             switch (msgType)
    314             {
    315                 case PubSubMessageType.RegisteringAccepted:
    316                     GuiLogMessage("REGISTERING ACCEPTED received from publisher!", NotificationLevel.Info);
    317                     if (this.timeoutForPublishersRegAccept != null)
    318                     {
    319                         this.timeoutForPublishersRegAccept.Dispose();
    320                         this.timeoutForPublishersRegAccept = null;
    321                     }
    322                     break;
    323                 case PubSubMessageType.Ping:
    324                     SendMessage(sSourceAddr, PubSubMessageType.Pong);
    325                     GuiLogMessage("REPLIED to a ping message from " + sSourceAddr, NotificationLevel.Info);
    326                     break;
    327                 case PubSubMessageType.Register:
    328                 case PubSubMessageType.Unregister:
    329                     /* can't work the right way, because at present the
    330                      * publisher can't remove information of the DHT
    331                      * (point of entry for subscribers) */
    332                     GuiLogMessage(msgType.ToString().ToUpper() + " received from PUBLISHER.", NotificationLevel.Warning);
    333                     // continuously try to get a unregister and than re-register with publisher
    334                     StopActiveWork();
    335                     Register();
    336                     break;
    337                 case PubSubMessageType.Solution:
    338                     StopActiveWork();
    339                     GuiLogMessage("Another Subscriber had found the solution!",NotificationLevel.Info);
    340                     break;
    341                 case PubSubMessageType.Stop:
    342                     StopActiveWork();
    343                     GuiLogMessage("STOP received from publisher. Subscriber is stopped!", NotificationLevel.Warning);
    344                     break;
    345                 case PubSubMessageType.Pong:
    346                     if (this.timeoutForPublishersPong != null)
    347                     {
    348                         this.timeoutForPublishersPong.Dispose();
    349                         this.timeoutForPublishersPong = null;
    350                     }
    351                     break;
    352                 // if the received Data couldn't be casted to enum,
    353                 // it must be text-data
    354                 case PubSubMessageType.NULL:
    355                     GuiLogMessage("RECEIVED: Message from '" + sSourceAddr
    356                     + "' with data: '" + sData + "'", NotificationLevel.Info);
    357                     Outputvalue = sData;
    358                     break;
    359                 case PubSubMessageType.Alive:
    360                 default:
    361                     // not possible at the moment
    362                     break;
    363             }
    364         }
    365 
    366         private void SendMessage(string pubPeerId, PubSubMessageType msgType)
    367         {
    368             if (timerSendingAliveMsg == null && !this.bolStopped)
    369                 timerSendingAliveMsg = new Timer(OnSendAliveMessage, null, sendAliveMessageInterval, sendAliveMessageInterval);
    370 
    371             switch (msgType)
    372             {
    373                 case PubSubMessageType.Register:
    374                     // stop "RegisteringNotPossibleTimer
    375                     if (timerRegisteringNotPossible != null)
    376                     {
    377                         timerRegisteringNotPossible.Dispose();
    378                         timerRegisteringNotPossible = null;
    379                     }
    380                     break;
    381                 case PubSubMessageType.Alive:
    382                 case PubSubMessageType.Ping:
    383                 case PubSubMessageType.Pong:
    384                 case PubSubMessageType.Unregister:
    385                     break;
    386                 case PubSubMessageType.Solution:
    387                     StopActiveWork();
    388                     break;
    389                 default:
    390                     GuiLogMessage("No Message sent, because MessageType wasn't supported: " + msgType.ToString(),NotificationLevel.Warning);
    391                     return;
    392             }
    393             this.P2PMaster.SendToPeer(msgType, pubPeerId);
    394 
    395             // don't show every single alive message
    396             if(msgType != PubSubMessageType.Alive)
    397                 GuiLogMessage(msgType.ToString() + " message sent to Publisher", NotificationLevel.Info);
    398         }
    399 
    400         // registering isn't possible if no publisher has stored
    401         // his ID in the DHT Entry with the Key TaskName
    402         private void OnRegisteringNotPossible(object state)
    403         {
    404             string sPubId = CheckPublisherAvailability();
    405             // if DHT Entry for Task is empty, there exist no Publisher at present.
    406             // The method PublisherIsAlive starts a Timer for this case to continous proof Publisher-DHT-Entry
    407             if (sPubId != null)
    408                 SendMessage(sPubId, PubSubMessageType.Register);
    409         }
    410 
    411         private void OnSendAliveMessage(object state)
    412         {
    413             SendMessage(actualPublisher, PubSubMessageType.Alive);
    414         }
    415 
    416         private string CheckPublisherAvailability()
    417         {
    418             byte[] bytePubId = P2PMaster.DHTload(this.settings.TopicName);
    419             if (bytePubId == null)
    420             {
    421                 if (timerRegisteringNotPossible == null && !this.bolStopped)
    422                 {
    423                     // if DHT value doesn't exist at this moment, wait for 10 seconds and try again
    424                     timerRegisteringNotPossible = new Timer(OnRegisteringNotPossible, null, 10000, 10000);
    425                 }
    426                 return null;
    427             }
    428 
    429             byte[] byteISettings = P2PMaster.DHTload(this.settings.TopicName + this.sDHTSettingsPostfix);
    430             if (byteISettings == null)
    431             {
    432                 GuiLogMessage("Can't find settings from Publisher for the Subscriber.", NotificationLevel.Error);
    433                 return null;
    434             }
    435             sendAliveMessageInterval = System.BitConverter.ToInt32(byteISettings, 0);
    436 
    437             string sPubId = UTF8Encoding.UTF8.GetString(bytePubId);
    438             GuiLogMessage("RECEIVED: Publishers' peer name '" + sPubId + "', Alive-Msg-Interval: " + sendAliveMessageInterval / 1000 + " sec!", NotificationLevel.Info);
    439 
    440             if (actualPublisher == null) //first time initialization
    441                 actualPublisher = sPubId;
    442 
    443             checkPublishersAvailability = this.settings.CheckPublishersAvailability * 1000;
    444 
    445             // setting timer to check periodical the availability of the publishing peer
    446             if (timerCheckPubAvailability == null && !this.bolStopped)
    447                 timerCheckPubAvailability = new Timer(OnCheckPubAvailability, null, checkPublishersAvailability, checkPublishersAvailability);
    448 
    449             return sPubId;
    450         }
    451 
    452         private void OnCheckPubAvailability(object state)
    453         {
    454             string sNewPubId = CheckPublisherAvailability();
    455 
    456             if (sNewPubId == null)
    457             {
    458                 GuiLogMessage("Publisher wasn't found in DHT or settings didn't stored on the right way.", NotificationLevel.Warning);
    459                 return;
    460             }
    461             if (sNewPubId != actualPublisher)
    462             {
    463                 //Handle case, when publisher changed or isn't active at present (don't reply on response)
    464                 GuiLogMessage("CHANGED: Publisher from '" + actualPublisher
    465                     + "' to '" + sNewPubId + "'!", NotificationLevel.Info);
    466                 actualPublisher = sNewPubId;
    467                 // because the publisher has changed, send a new register msg
    468                 SendMessage(actualPublisher, PubSubMessageType.Register);
    469             }
    470             else
    471             {
    472                 // Timer will be only stopped, when OnMessageReceived-Event received
    473                 // a Pong-Response from the publisher!
    474                 SendMessage(actualPublisher, PubSubMessageType.Ping);
    475                 if (timeoutForPublishersPong == null)
    476                 {
    477                     long interval = this.settings.PublishersReplyTimespan * 1000;
    478                     timeoutForPublishersPong = new Timer(OnTimeoutPublishersPong, null, interval, interval);
    479                 }
    480             }
    481         }
    482 
    483         /// <summary>
    484         /// This callback only get fired, when the publisher didn't sent a response on the register message.
    485         /// </summary>
    486         /// <param name="state"></param>
    487         private void OnTimeoutRegisteringAccepted(object state)
    488         {
    489             GuiLogMessage("TIMEOUT: Waiting for registering accepted message from publisher!", NotificationLevel.Warning);
    490             // TODO: anything
    491         }
    492 
    493         /// <summary>
    494         /// This callback only get fired, when the publisher didn't sent a response on the ping message.
    495         /// </summary>
    496         /// <param name="state"></param>
    497         private void OnTimeoutPublishersPong(object state)
    498         {
    499             GuiLogMessage("Publisher didn't answer on Ping in the given time span!", NotificationLevel.Warning);
    500             timeoutForPublishersPong.Dispose();
    501             timeoutForPublishersPong = null;
    502             // try to get an active publisher and re-register
    503             CheckPublisherAvailability();
    504         }
    505 
    506         /// <summary>
    507         /// Will stop all timers, so Subscriber ends with sending
    508         /// Register-, Alive- and Pong-messages. Furthermore an
    509         /// unregister message will be send to the publisher
    510         /// </summary>
    511         private void StopActiveWork()
    512         {
    513             this.bolStopped = true;
    514             if(actualPublisher != null)
    515                 SendMessage(actualPublisher,PubSubMessageType.Unregister);
    516 
    517             #region stopping all timers, if they are still active
    518             if (this.timerSendingAliveMsg != null)
    519             {
    520                 this.timerSendingAliveMsg.Dispose();
    521                 this.timerSendingAliveMsg = null;
    522             }
    523             if (this.timerRegisteringNotPossible != null)
    524             {
    525                 this.timerRegisteringNotPossible.Dispose();
    526                 this.timerRegisteringNotPossible = null;
    527             }
    528             if (this.timerCheckPubAvailability != null)
    529             {
    530                 this.timerCheckPubAvailability.Dispose();
    531                 this.timerCheckPubAvailability = null;
    532             }
    533             if (this.timeoutForPublishersRegAccept != null)
    534             {
    535                 this.timeoutForPublishersRegAccept.Dispose();
    536                 this.timeoutForPublishersRegAccept = null;
    537             }
    538             if (this.timeoutForPublishersPong != null)
    539             {
    540                 this.timeoutForPublishersPong.Dispose();
    541                 this.timeoutForPublishersPong = null;
    542             }
    543             #endregion
    544         }
    545265        #endregion
    546266    }
  • trunk/CrypPlugins/PeerToPeerSubscriber/P2PSubscriberSettings.cs

    r862 r872  
    107107        }
    108108
    109         private int publishersReplyTimespan = 2;
     109        private int publishersReplyTimespan = 5;
    110110        [TaskPane("Publisher Reply Timespan (in sec)", "When checking publishers availability, ping message is sent. The publisher must answer with a pong message in the timespan!", "Intervals", 0, false, DisplayLevel.Beginner, ControlType.NumericUpDown, ValidationType.RangeInteger, 2, 60)]
    111111        public int PublishersReplyTimespan
  • trunk/CrypPlugins/PeerToPeerSubscriber/PeerToPeerSubscriber.csproj

    r836 r872  
    4242      <RequiredTargetFramework>3.5</RequiredTargetFramework>
    4343    </Reference>
     44    <Reference Include="System.Drawing" />
     45    <Reference Include="System.Windows.Forms" />
    4446    <Reference Include="System.Xml.Linq">
    4547      <RequiredTargetFramework>3.5</RequiredTargetFramework>
     
    5658  <ItemGroup>
    5759    <Compile Include="P2PSubscriber.cs" />
     60    <Compile Include="P2PSubscriberBase.cs" />
    5861    <Compile Include="P2PSubscriberSettings.cs" />
    5962    <Compile Include="Properties\AssemblyInfo.cs" />
Note: See TracChangeset for help on using the changeset viewer.