source: trunk/CrypPlugins/PeerToPeerBase/P2PPeer.cs @ 1696

Last change on this file since 1696 was 1696, checked in by Paul Lelgemann, 12 years ago

o Simplified peer-to-peer settings: overlay and DHT type are now selected by architecture

File size: 22.8 KB
Line 
1/* Copyright 2009 Team CrypTool (Christian Arnold), Uni Duisburg-Essen
2
3   Licensed under the Apache License, Version 2.0 (the "License");
4   you may not use this file except in compliance with the License.
5   You may obtain a copy of the License at
6
7       http://www.apache.org/licenses/LICENSE-2.0
8
9   Unless required by applicable law or agreed to in writing, software
10   distributed under the License is distributed on an "AS IS" BASIS,
11   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12   See the License for the specific language governing permissions and
13   limitations under the License.
14*/
15
16using System;
17using System.Collections.Generic;
18using System.Linq;
19using System.Text;
20using System.Threading;
21using Cryptool.PluginBase.Control;
22using Cryptool.PluginBase;
23using Cryptool.PluginBase.Miscellaneous;
24using System.ComponentModel;
25using Cryptool.PluginBase.IO;
26using Cryptool.Plugins.PeerToPeer.Internal;
27
28namespace Cryptool.Plugins.PeerToPeer
29{
30    [Author("Christian Arnold", "arnold@cryptool.org", "Uni Duisburg-Essen", "http://www.uni-due.de")]
31    [PluginInfo(false, "P2P_Peer", "Creates a new Peer", "", "PeerToPeerBase/icons/peer_inaktiv.png", "PeerToPeerBase/icons/peer_connecting.png", "PeerToPeerBase/icons/peer_online.png", "PeerToPeerBase/icons/peer_error.png")]
32    public class P2PPeer : IIOMisc
33    {
34        // to forward event from overlay/dht MessageReceived-Event from P2PBase
35        public event P2PBase.P2PMessageReceived OnPeerMessageReceived;
36
37        #region Variables
38
39        public P2PBase p2pBase;
40        private P2PPeerSettings settings;
41        private IP2PControl p2pSlave;
42
43        #endregion
44
45        #region Standard functionality
46
47        public P2PPeer()
48        {
49            this.p2pBase = new P2PBase();
50            this.settings = new P2PPeerSettings(this);
51            this.settings.TaskPaneAttributeChanged += new TaskPaneAttributeChangedHandler(settings_TaskPaneAttributeChanged);
52            this.settings.OnPluginStatusChanged += new StatusChangedEventHandler(settings_OnPluginStatusChanged);
53        }
54
55        public event StatusChangedEventHandler OnPluginStatusChanged;
56        private void settings_OnPluginStatusChanged(IPlugin sender, StatusEventArgs args)
57        {
58            if (OnPluginStatusChanged != null) 
59                OnPluginStatusChanged(this, args);
60        }
61
62        // to forward event from overlay/dht MessageReceived-Event from P2PBase
63        private void p2pBase_OnP2PMessageReceived(PeerId sourceAddr, byte[] data)
64        {
65            if (OnPeerMessageReceived != null)
66                OnPeerMessageReceived(sourceAddr, data);
67        }
68
69        void settings_TaskPaneAttributeChanged(ISettings settings, TaskPaneAttributeChangedEventArgs args)
70        {
71            //throw new NotImplementedException();
72        }
73
74        public ISettings Settings
75        {
76            set { this.settings = (P2PPeerSettings)value; }
77            get { return this.settings; }
78        }
79
80        public System.Windows.Controls.UserControl Presentation
81        {
82            get { return null; }
83        }
84
85        public System.Windows.Controls.UserControl QuickWatchPresentation
86        {
87            get { return null; }
88        }
89
90        public void PreExecution()
91        {
92            StartPeer();
93        }
94
95        public void Execute()
96        {
97            // TODO: For future use copy functionality to Execute instead of PreExecute
98            //       so we don't need the workaround anymore!!!
99            // StartPeer();
100        }
101
102        public void PostExecution()
103        {
104        }
105
106        public void Pause()
107        {
108        }
109
110        public void Stop()
111        {
112        }
113
114        public void Initialize()
115        {
116        }
117
118        public void Dispose()
119        {
120            StopPeer();
121        }
122
123        #endregion
124
125        #region IPlugin Members
126
127        public event GuiLogNotificationEventHandler OnGuiLogNotificationOccured;
128
129        public event PluginProgressChangedEventHandler OnPluginProgressChanged;
130
131        #endregion
132
133        #region INotifyPropertyChanged Members
134
135        public event System.ComponentModel.PropertyChangedEventHandler PropertyChanged;
136
137        public void OnPropertyChanged(string name)
138        {
139            EventsHelper.PropertyChanged(PropertyChanged, this, new PropertyChangedEventArgs(name));
140        }
141
142        public event PluginProgressChangedEventHandler OnPluginProcessChanged;
143
144        private void ProgressChanged(double value, double max)
145        {
146            EventsHelper.ProgressChanged(OnPluginProgressChanged, this, new PluginProgressEventArgs(value, max));
147        }
148
149        private void GuiLogMessage(string p, NotificationLevel notificationLevel)
150        {
151            // for evaluation issues only
152            EventsHelper.GuiLogMessage(OnGuiLogNotificationOccured, this, new GuiLogEventArgs(p + "(" + DebugToFile.GetTimeStamp() + ")", this, notificationLevel));
153            //EventsHelper.GuiLogMessage(OnGuiLogNotificationOccured, this, new GuiLogEventArgs(p, this, notificationLevel));
154        }
155
156        #endregion
157
158        #region In and Output
159
160        [PropertyInfo(Direction.ControlSlave, "Master Peer", "One peer to rule them all", "", true, false, DisplayLevel.Beginner, QuickWatchFormat.Text, null)]
161        public IP2PControl P2PControlSlave
162        {
163            get
164            {
165                if (this.p2pSlave == null)
166                    // to commit the settings of the plugin to the IControl
167                    this.p2pSlave = new P2PPeerMaster(this);
168                return this.p2pSlave;
169            }
170        }
171
172        #endregion
173
174        #region Start and Stop Peer
175
176        /// <summary>
177        /// Status flag for starting and stopping peer only once.
178        /// </summary>
179        private bool peerStarted = false;
180        /// <summary>
181        /// Status flag for starting and stopping peer only once.
182        /// </summary>
183        public bool PeerStarted
184        {
185            get { return this.peerStarted; }
186            private set { this.peerStarted = value; }
187        }
188
189        public void StartPeer()
190        {
191            if (!this.PeerStarted)
192            {
193                if (this.p2pBase == null)
194                {
195                    this.settings.PeerStatusChanged(P2PPeerSettings.PeerStatus.Error); 
196                    GuiLogMessage("Starting Peer failed, because Base-Object is null.",NotificationLevel.Error);
197                    return;
198                }
199
200                this.settings.PeerStatusChanged(P2PPeerSettings.PeerStatus.Connecting);
201
202                this.p2pBase.AllowLoggingToMonitor = this.settings.Log2Monitor;
203
204                // to forward event from overlay/dht MessageReceived-Event from P2PBase
205                this.p2pBase.OnP2PMessageReceived += new P2PBase.P2PMessageReceived(p2pBase_OnP2PMessageReceived);
206
207                if (CheckAndInstallPAPCertificates())
208                {
209                    this.p2pBase.Initialize(this.settings.P2PPeerName, this.settings.P2PWorldName,
210                        (P2PLinkManagerType)this.settings.P2PLinkMngrType, (P2PBootstrapperType)this.settings.P2PBSType,
211                        (P2PArchitecture)this.settings.P2PArchitecture);
212                    this.PeerStarted = this.p2pBase.SynchStart();
213
214                    if (this.PeerStarted)
215                    {
216                        this.settings.PeerStatusChanged(P2PPeerSettings.PeerStatus.Online);
217                    }
218                    string joiningStatus = this.PeerStarted == true ? "successful" : "canceled";
219                    GuiLogMessage("Status of joining the P2P System: " + joiningStatus, NotificationLevel.Info);
220                }
221                else
222                {
223                    GuiLogMessage("Because not all p2p certificates were installed, you can't start the p2p system!", NotificationLevel.Error);
224                }
225            }
226            else
227            {
228                GuiLogMessage("Peer is already started!", NotificationLevel.Info);
229            }
230        }
231
232        public void StopPeer()
233        {
234            if (this.PeerStarted && this.p2pBase != null)
235            {
236                this.settings.PeerStatusChanged(P2PPeerSettings.PeerStatus.Connecting);
237
238                this.PeerStarted = !this.p2pBase.SynchStop();
239
240                if (this.PeerStarted)
241                {
242                    this.settings.PeerStatusChanged(P2PPeerSettings.PeerStatus.Online);
243                    GuiLogMessage("Peer stopped: " + !this.PeerStarted, NotificationLevel.Warning);
244                }
245                else
246                {
247                    this.p2pBase.OnP2PMessageReceived -= p2pBase_OnP2PMessageReceived;
248                    this.settings.PeerStatusChanged(P2PPeerSettings.PeerStatus.NotConnected);
249                    GuiLogMessage("Peer stopped: " + !this.PeerStarted, NotificationLevel.Info);
250                }
251            }
252            else
253            {
254                GuiLogMessage("Peer is already stopped!", NotificationLevel.Info);
255            }
256        }
257
258        /// <summary>
259        /// Checks if all certificates for using the pap p2p system are installed.
260        /// Otherwise it tries to install the missing certificates. If all operations
261        /// succeed, return value is true. Only when value is true, you can try
262        /// to initialize the PAP System.
263        /// </summary>
264        /// <returns>If all operations succeed, return value is true. Only when value
265        /// is true, you can try to initialize the PAP System.</returns>
266        private bool CheckAndInstallPAPCertificates()
267        {
268            bool retValue = false;
269
270            // get exe directory, because there resides the certificate directory
271            System.Reflection.Assembly assemb = System.Reflection.Assembly.GetEntryAssembly();
272            string applicationDir = System.IO.Path.GetDirectoryName(assemb.Location);
273            // check if all necessary certs are installed
274            GuiLogMessage("Check installation of all certificates, which are necessary to run the p2p system", NotificationLevel.Info);
275            List<PAPCertificate.PAP_Certificates> lstMissingCerts = PAPCertificate.CheckAvailabilityOfPAPCertificates(applicationDir);
276            if (lstMissingCerts.Count == 0)
277            {
278                GuiLogMessage("All neccessary p2p certificates are installed.", NotificationLevel.Info);
279                retValue = true;
280            }
281            else
282            {
283                StringBuilder sbMissingCerts = new StringBuilder();
284                for (int i = 0; i < lstMissingCerts.Count; i++)
285                {
286                    sbMissingCerts.AppendLine(Enum.GetName(typeof(PAPCertificate.PAP_Certificates),lstMissingCerts[i]));
287                }
288                GuiLogMessage("Following certificates are missing. They will be installed now.\n" + sbMissingCerts.ToString(), NotificationLevel.Info);
289
290                // try/catch neccessary because the CT-Editor doesn't support the whole exception display process (e.g. shows only "unknown error.")
291                try
292                {
293                    if (PAPCertificate.InstallMissingCertificates(lstMissingCerts, applicationDir))
294                    {
295                        GuiLogMessage("Installation of all missing certificates was successful.", NotificationLevel.Info);
296                        retValue = true;
297                    }
298                    else
299                    {
300                        GuiLogMessage("No/not all missing certificates were installed successful.", NotificationLevel.Error);
301                    }
302                }
303                catch (Exception ex)
304                {
305                    GuiLogMessage("Error occured while installing certificates. Exception: " + ex.ToString(), NotificationLevel.Error);
306                }
307            }
308            return retValue;
309        }
310
311        public void LogInternalState()
312        {
313            if(this.p2pBase != null)
314                this.p2pBase.LogInternalState();
315        }
316        #endregion
317    }
318
319    public class P2PPeerMaster : IP2PControl
320    {
321        private AutoResetEvent systemJoined;
322        private P2PPeer p2pPeer;
323        private PeerId peerID;
324        private string sPeerName;
325        // used for every encoding stuff
326        private Encoding enc = UTF8Encoding.UTF8;
327
328        public P2PPeerMaster(P2PPeer p2pPeer)
329        {
330            this.p2pPeer = p2pPeer;
331            this.systemJoined = new AutoResetEvent(false);
332
333            this.p2pPeer.p2pBase.OnSystemJoined += new P2PBase.SystemJoined(p2pBase_OnSystemJoined);
334            this.p2pPeer.OnPeerMessageReceived += new P2PBase.P2PMessageReceived(p2pPeer_OnPeerMessageReceived);
335            this.OnStatusChanged += new IControlStatusChangedEventHandler(P2PPeerMaster_OnStatusChanged);
336        }
337
338        #region Events and Event-Handling
339       
340        private void p2pBase_OnSystemJoined()
341        {
342            systemJoined.Set();
343        }
344
345        // to forward event from overlay MessageReceived-Event from P2PBase
346        // analyzes the type of message and throws depend upon this anaysis an event
347        public event P2PPayloadMessageReceived OnPayloadMessageReceived;
348        public event P2PSystemMessageReceived OnSystemMessageReceived;
349        private void p2pPeer_OnPeerMessageReceived(PeerId sourceAddr, byte[] data)
350        {
351            switch (GetMessageType(data[0])) //analyses the first byte of data (index, which represents the MessageType)
352            {
353                case P2PMessageIndex.PubSub:
354                    if (data.Length == 2)
355                    {
356                        if(OnSystemMessageReceived != null)
357                            OnSystemMessageReceived(sourceAddr, GetPubSubType(data[1]));
358                    }
359                    else
360                    {
361                        throw (new Exception("Data seems to be from type 'PubSub', but is to long for it... Data: '" + enc.GetString(data) + "'"));
362                    }
363                    break;
364                case P2PMessageIndex.Payload:
365                    if(OnPayloadMessageReceived != null)
366                        OnPayloadMessageReceived(sourceAddr, GetMessagePayload(data));
367                    break;
368                default:
369                    // not implemented. System ignores these messages completely at present
370                    break;
371            }
372        }
373
374        public event IControlStatusChangedEventHandler OnStatusChanged;
375        private void P2PPeerMaster_OnStatusChanged(IControl sender, bool readyForExecution)
376        {
377            if (OnStatusChanged != null)
378                OnStatusChanged(sender, readyForExecution);
379        }
380
381        #endregion
382
383        public bool PeerStarted()
384        {
385            return this.p2pPeer.PeerStarted;
386        }
387
388        /// <summary>
389        /// workaround method. If the PAP functions are used, but the PAP system isn't
390        /// started yet. This could happen because of the plugin hierarchy and
391        /// when a p2p-using plugin uses PAP functions in the PreExecution method,
392        /// this could run into a race condition (peer plugin not computed by the CT-system
393        /// yet, but p2p-using plugin is already executed)
394        /// </summary>
395        /// <returns></returns>
396        private bool SystemJoinedCompletely()
397        {
398            if (!this.p2pPeer.PeerStarted)
399            {
400                this.p2pPeer.StartPeer();
401                this.systemJoined.WaitOne();
402            }
403            return true;
404        }
405
406        #region IP2PControl Members
407
408        public bool DHTstore(string sKey, byte[] byteValue)
409        {
410            if(SystemJoinedCompletely())
411                return this.p2pPeer.p2pBase.SynchStore(sKey, byteValue);
412            return false;
413        }
414
415        public bool DHTstore(string sKey, string sValue)
416        {
417            if (SystemJoinedCompletely())
418                return this.p2pPeer.p2pBase.SynchStore(sKey, sValue);
419            return false;
420        }
421
422        public byte[] DHTload(string sKey)
423        {
424            if (SystemJoinedCompletely())
425                return this.p2pPeer.p2pBase.SynchRetrieve(sKey);
426            return null;
427        }
428
429        public bool DHTremove(string sKey)
430        {
431            if (SystemJoinedCompletely())
432                return this.p2pPeer.p2pBase.SynchRemove(sKey);
433            return false;
434        }
435
436        /// <summary>
437        /// This method only contacts the p2p system, if the peerID wasn't requested before
438        /// </summary>
439        /// <param name="sPeerName">returns the Peer Name</param>
440        /// <returns>returns the Peer ID</returns>
441        public PeerId GetPeerID(out string sPeerName)
442        {
443            if (SystemJoinedCompletely())
444            {
445                if (this.peerID == null)
446                {
447                    this.peerID = this.p2pPeer.p2pBase.GetPeerID(out this.sPeerName);
448                }
449                sPeerName = this.sPeerName;
450                return this.peerID;
451            }
452            sPeerName = this.sPeerName;
453            return null;
454        }
455
456        public PeerId GetPeerID(byte[] byteId)
457        {
458            return p2pPeer.p2pBase.GetPeerID(byteId);
459        }
460
461        private void SendReadilyMessage(byte[] data, PeerId destinationAddress)
462        {
463            if (SystemJoinedCompletely())
464                this.p2pPeer.p2pBase.SendToPeer(data, destinationAddress.ToByteArray());
465        }
466
467        // adds the P2PMessageIndex to the given byte-array
468        public void SendToPeer(byte[] data, PeerId destinationAddress)
469        {
470            byte[] newData = GenerateMessage(data, P2PMessageIndex.Payload);
471            SendReadilyMessage(newData, destinationAddress);
472        }
473
474        public void SendToPeer(string sData, PeerId destinationAddress)
475        {
476            byte[] data = GenerateMessage(sData, P2PMessageIndex.Payload);
477            SendReadilyMessage(data, destinationAddress);
478        }
479        public void SendToPeer(PubSubMessageType msgType, PeerId destinationAddress)
480        {
481            byte[] data = GenerateMessage(msgType);
482            SendReadilyMessage(data, destinationAddress);
483        }
484
485        #region Communication protocol
486
487        /// <summary>
488        /// generates a ct2- and p2p-compatible and processable message
489        /// </summary>
490        /// <param name="payload">payload data in bytes</param>
491        /// <param name="msgIndex">type of message (system message, simple payload for a special use case, etc.)</param>
492        /// <returns>the message, which is processable by the ct2/p2p system</returns>
493        private byte[] GenerateMessage(byte[] payload, P2PMessageIndex msgIndex)
494        {
495            // first byte is the index, if it is payload or Publish/Subscriber stuff
496            byte[] retByte = new byte[1 + payload.Length];
497            retByte[0] = (byte)msgIndex;
498            payload.CopyTo(retByte, 1);
499            return retByte;
500        }
501
502        /// <summary>
503        /// generates a ct2- and p2p-compatible and processable message
504        /// </summary>
505        /// <param name="sPayload">payload data as a string</param>
506        /// <param name="msgIndex">type of message (system message, simple payload for a special use case, etc.)</param>
507        /// <returns>the message, which is processable by the ct2/p2p system</returns>
508        private byte[] GenerateMessage(string sPayload, P2PMessageIndex msgIndex)
509        {
510            return GenerateMessage(enc.GetBytes(sPayload), msgIndex);
511        }
512
513        /// <summary>
514        /// generates a ct2- and p2p-compatible and processable message
515        /// </summary>
516        /// <param name="pubSubData">PubSubMessageType</param>
517        /// <returns>the message, which is processable by the ct2/p2p system<</returns>
518        private byte[] GenerateMessage(PubSubMessageType pubSubData)
519        {
520            byte[] bytePubSubData = new byte[] { (byte)pubSubData };
521            return GenerateMessage(bytePubSubData, P2PMessageIndex.PubSub);
522        }
523
524        /// <summary>
525        /// returns the message type, e.g. PubSub or Payload message
526        /// </summary>
527        /// <param name="msgType">the FIRST byte of a raw message, received by the system</param>
528        /// <returns>the message type</returns>
529        private P2PMessageIndex GetMessageType(byte msgType)
530        {
531            try
532            {
533                return (P2PMessageIndex)msgType;
534            }
535            catch (Exception ex)
536            {
537                throw (ex);
538            }
539        }
540
541        /// <summary>
542        /// returns the message type, e.g. PubSub or Payload message (to accelarate this process, only assign first byte of the whole array message)
543        /// </summary>
544        /// <param name="message">the whole message as an byte array</param>
545        /// <returns>the message type</returns>
546        private P2PMessageIndex GetMessageType(byte[] message)
547        {
548            try
549            {
550                return (P2PMessageIndex)message[0];
551            }
552            catch (Exception ex)
553            {
554                throw (ex);
555            }
556        }
557
558        /// <summary>
559        /// returns only the payload part of the message
560        /// </summary>
561        /// <param name="message">the raw message, received by the system, as an byte array (with the first index byte!!!)</param>
562        /// <returns>only the payload part of the message</returns>
563        private byte[] GetMessagePayload(byte[] message)
564        {
565            if (message.Length > 1)
566            {
567                byte[] retMsg = new byte[message.Length - 1];
568                // workaround because CopyTo doesn't work...
569                //for (int i = 0; i < message.Length-1; i++)
570                //{
571                //    retMsg[i] = message[i + 1];
572                //}
573                Buffer.BlockCopy(message, 1, retMsg, 0, retMsg.Length);
574                return retMsg;
575            }
576            return null;
577        }
578
579        #endregion
580
581
582        /// <summary>
583        /// Converts a string to the PubSubMessageType if possible. Otherwise return null.
584        /// </summary>
585        /// <param name="sData">Data</param>
586        /// <returns>PubSubMessageType if possible. Otherwise null.</returns>
587        private PubSubMessageType GetPubSubType(byte data)
588        {
589            // Convert one byte data to PublishSubscribeMessageType-Enum
590            try
591            {
592                return (PubSubMessageType)data;
593            }
594            catch (Exception ex)
595            {
596                throw(ex);
597            }
598        }
599
600        #endregion
601    }
602}
Note: See TracBrowser for help on using the repository browser.