source: trunk/CrypPlugins/PeerToPeerBaseProxy/P2PPeer.cs @ 1436

Last change on this file since 1436 was 1436, checked in by Paul Lelgemann, 12 years ago

+ PeerToPeerBaseProxy: events are handled now, but functionality for stopping the workspace still missing

File size: 17.1 KB
Line 
1/* Copyright 2009 Team CrypTool (Christian Arnold), Uni Duisburg-Essen
2
3   Licensed under the Apache License, Version 2.0 (the "License");
4   you may not use this file except in compliance with the License.
5   You may obtain a copy of the License at
6
7       http://www.apache.org/licenses/LICENSE-2.0
8
9   Unless required by applicable law or agreed to in writing, software
10   distributed under the License is distributed on an "AS IS" BASIS,
11   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12   See the License for the specific language governing permissions and
13   limitations under the License.
14*/
15
16using System;
17using System.Collections.Generic;
18using System.Linq;
19using System.Text;
20using System.Threading;
21using Cryptool.PluginBase.Control;
22using Cryptool.PluginBase;
23using Cryptool.PluginBase.Miscellaneous;
24using System.ComponentModel;
25using Cryptool.PluginBase.IO;
26using Cryptool.Plugins.PeerToPeer.Internal;
27using Cryptool.P2P;
28using Cryptool.P2P.Internal;
29
30namespace Cryptool.Plugins.PeerToPeerProxy
31{
32    [Author("Christian Arnold", "arnold@cryptool.org", "Uni Duisburg-Essen", "http://www.uni-due.de")]
33    [PluginInfo(false, "P2P_Peer_Proxy", "Creates a new Peer", "", "PeerToPeerBase/icons/peer_inaktiv.png", "PeerToPeerBase/icons/peer_connecting.png", "PeerToPeerBase/icons/peer_online.png", "PeerToPeerBase/icons/peer_error.png")]
34    public class P2PPeer : IIOMisc
35    {
36        // to forward event from overlay/dht MessageReceived-Event from P2PBase
37        public event P2PBase.P2PMessageReceived OnPeerMessageReceived;
38
39        #region Variables
40
41        private P2PPeerSettings settings;
42        private IP2PControl p2pSlave;
43
44        #endregion
45
46        #region Standard functionality
47
48        public P2PPeer()
49        {
50            this.settings = new P2PPeerSettings(this);
51            this.settings.TaskPaneAttributeChanged += new TaskPaneAttributeChangedHandler(settings_TaskPaneAttributeChanged);
52            this.settings.OnPluginStatusChanged += new StatusChangedEventHandler(settings_OnPluginStatusChanged);
53        }
54
55        public event StatusChangedEventHandler OnPluginStatusChanged;
56        private void settings_OnPluginStatusChanged(IPlugin sender, StatusEventArgs args)
57        {
58            if (OnPluginStatusChanged != null)
59                OnPluginStatusChanged(this, args);
60        }
61
62        // to forward event from overlay/dht MessageReceived-Event from P2PBase
63        private void p2pBase_OnP2PMessageReceived(PeerId sourceAddr, byte[] data)
64        {
65            if (OnPeerMessageReceived != null)
66                OnPeerMessageReceived(sourceAddr, data);
67        }
68
69        void settings_TaskPaneAttributeChanged(ISettings settings, TaskPaneAttributeChangedEventArgs args)
70        {
71            //throw new NotImplementedException();
72        }
73
74        public ISettings Settings
75        {
76            set { this.settings = (P2PPeerSettings)value; }
77            get { return this.settings; }
78        }
79
80        public System.Windows.Controls.UserControl Presentation
81        {
82            get { return null; }
83        }
84
85        public System.Windows.Controls.UserControl QuickWatchPresentation
86        {
87            get { return null; }
88        }
89
90        public void PreExecution()
91        {
92            StartPeer();
93        }
94
95        public void Execute()
96        {
97            // TODO: For future use copy functionality to Execute instead of PreExecute
98            //       so we don't need the workaround anymore!!!
99            // StartPeer();
100        }
101
102        public void PostExecution()
103        {
104        }
105
106        public void Pause()
107        {
108        }
109
110        public void Stop()
111        {
112        }
113
114        public void Initialize()
115        {
116        }
117
118        public void Dispose()
119        {
120            StopPeer();
121        }
122
123        #endregion
124
125        #region IPlugin Members
126
127        public event GuiLogNotificationEventHandler OnGuiLogNotificationOccured;
128
129        public event PluginProgressChangedEventHandler OnPluginProgressChanged;
130
131        #endregion
132
133        #region INotifyPropertyChanged Members
134
135        public event System.ComponentModel.PropertyChangedEventHandler PropertyChanged;
136
137        public void OnPropertyChanged(string name)
138        {
139            EventsHelper.PropertyChanged(PropertyChanged, this, new PropertyChangedEventArgs(name));
140        }
141
142        public event PluginProgressChangedEventHandler OnPluginProcessChanged;
143
144        private void ProgressChanged(double value, double max)
145        {
146            EventsHelper.ProgressChanged(OnPluginProgressChanged, this, new PluginProgressEventArgs(value, max));
147        }
148
149        private void GuiLogMessage(string p, NotificationLevel notificationLevel)
150        {
151            // for evaluation issues only
152            EventsHelper.GuiLogMessage(OnGuiLogNotificationOccured, this, new GuiLogEventArgs(p + "(" + DebugToFile.GetTimeStamp() + ")", this, notificationLevel));
153            //EventsHelper.GuiLogMessage(OnGuiLogNotificationOccured, this, new GuiLogEventArgs(p, this, notificationLevel));
154        }
155
156        #endregion
157
158        #region In and Output
159
160        [PropertyInfo(Direction.ControlSlave, "Master Peer", "One peer to rule them all", "", true, false, DisplayLevel.Beginner, QuickWatchFormat.Text, null)]
161        public IP2PControl P2PControlSlave
162        {
163            get
164            {
165                if (this.p2pSlave == null)
166                    // to commit the settings of the plugin to the IControl
167                    this.p2pSlave = new P2PPeerMaster(this);
168                return this.p2pSlave;
169            }
170        }
171
172        #endregion
173
174        #region Start and Stop Peer
175        /// <summary>
176        /// Status flag for starting and stopping peer only once.
177        /// </summary>
178        public bool PeerStarted()
179        {
180            return P2PManager.Instance.P2PConnected();
181        }
182
183        public void StartPeer()
184        {
185            GuiLogMessage("Peer is already started by CrypTool!", NotificationLevel.Info);
186            P2PManager.Instance.P2PBase.OnP2PMessageReceived += new P2PBase.P2PMessageReceived(p2pBase_OnP2PMessageReceived);
187        }
188
189        public void StopPeer()
190        {
191
192            GuiLogMessage("Peer cannot be stopped, it is running in CrypTool!", NotificationLevel.Info);
193        }
194        #endregion Start and Stop Peer
195
196        public void LogInternalState()
197        {
198            P2PManager.Instance.P2PBase.LogInternalState();
199        }
200    }
201
202
203    public class P2PPeerMaster : IP2PControl
204    {
205        private AutoResetEvent systemJoined;
206        private P2PPeer p2pPeer;
207        private PeerId peerID;
208        private string sPeerName;
209        // used for every encoding stuff
210        private Encoding enc = UTF8Encoding.UTF8;
211
212        public P2PPeerMaster(P2PPeer p2pPeer)
213        {
214            this.p2pPeer = p2pPeer;
215            this.systemJoined = new AutoResetEvent(false);
216
217            P2PManager.Instance.P2PBase.OnSystemJoined += new P2PBase.SystemJoined(p2pBase_OnSystemJoined);
218            P2PManager.Instance.OnPeerMessageReceived += new P2PBase.P2PMessageReceived(p2pPeer_OnPeerMessageReceived);
219            this.OnStatusChanged += new IControlStatusChangedEventHandler(P2PPeerMaster_OnStatusChanged);
220        }
221
222        #region Events and Event-Handling
223       
224        private void p2pBase_OnSystemJoined()
225        {
226            systemJoined.Set();
227        }
228
229        // to forward event from overlay MessageReceived-Event from P2PBase
230        // analyzes the type of message and throws depend upon this anaysis an event
231        public event P2PPayloadMessageReceived OnPayloadMessageReceived;
232        public event P2PSystemMessageReceived OnSystemMessageReceived;
233        private void p2pPeer_OnPeerMessageReceived(PeerId sourceAddr, byte[] data)
234        {
235            switch (GetMessageType(data[0])) //analyses the first byte of data (index, which represents the MessageType)
236            {
237                case P2PMessageIndex.PubSub:
238                    if (data.Length == 2)
239                    {
240                        if(OnSystemMessageReceived != null)
241                            OnSystemMessageReceived(sourceAddr, GetPubSubType(data[1]));
242                    }
243                    else
244                    {
245                        throw (new Exception("Data seems to be from type 'PubSub', but is to long for it... Data: '" + enc.GetString(data) + "'"));
246                    }
247                    break;
248                case P2PMessageIndex.Payload:
249                    if(OnPayloadMessageReceived != null)
250                        OnPayloadMessageReceived(sourceAddr, GetMessagePayload(data));
251                    break;
252                default:
253                    // not implemented. System ignores these messages completely at present
254                    break;
255            }
256        }
257
258        public event IControlStatusChangedEventHandler OnStatusChanged;
259        private void P2PPeerMaster_OnStatusChanged(IControl sender, bool readyForExecution)
260        {
261            if (OnStatusChanged != null)
262                OnStatusChanged(sender, readyForExecution);
263        }
264
265        #endregion
266
267        public bool PeerStarted()
268        {
269            return this.p2pPeer.PeerStarted();
270        }
271
272        /// <summary>
273        /// workaround method. If the PAP functions are used, but the PAP system isn't
274        /// started yet. This could happen because of the plugin hierarchy and
275        /// when a p2p-using plugin uses PAP functions in the PreExecution method,
276        /// this could run into a race condition (peer plugin not computed by the CT-system
277        /// yet, but p2p-using plugin is already executed)
278        /// </summary>
279        /// <returns></returns>
280        private bool SystemJoinedCompletely()
281        {
282            return P2PManager.Instance.P2PConnected();
283        }
284
285        #region IP2PControl Members
286
287        public bool DHTstore(string sKey, byte[] byteValue)
288        {
289            if (P2PManager.Instance.P2PConnected())
290                return P2PManager.Store(sKey, byteValue);
291            return false;
292        }
293
294        public bool DHTstore(string sKey, string sValue)
295        {
296            if (P2PManager.Instance.P2PConnected())
297                return P2PManager.Store(sKey, sValue);
298            return false;
299        }
300
301        public byte[] DHTload(string sKey)
302        {
303            if (P2PManager.Instance.P2PConnected())
304                return P2PManager.Retrieve(sKey);
305            return null;
306        }
307
308        public bool DHTremove(string sKey)
309        {
310            if (P2PManager.Instance.P2PConnected())
311                return P2PManager.Remove(sKey);
312            return false;
313        }
314
315        /// <summary>
316        /// This method only contacts the p2p system, if the peerID wasn't requested before
317        /// </summary>
318        /// <param name="sPeerName">returns the Peer Name</param>
319        /// <returns>returns the Peer ID</returns>
320        public PeerId GetPeerID(out string sPeerName)
321        {
322            if (SystemJoinedCompletely())
323            {
324                if (this.peerID == null)
325                {
326                    this.peerID = P2PManager.Instance.P2PBase.GetPeerID(out this.sPeerName);
327                }
328                sPeerName = this.sPeerName;
329                return this.peerID;
330            }
331            sPeerName = this.sPeerName;
332            return null;
333        }
334
335        public PeerId GetPeerID(byte[] byteId)
336        {
337            return P2PManager.Instance.P2PBase.GetPeerID(byteId);
338        }
339
340        private void SendReadilyMessage(byte[] data, PeerId destinationAddress)
341        {
342            if (SystemJoinedCompletely())
343                P2PManager.Instance.P2PBase.SendToPeer(data, destinationAddress.ToByteArray());
344        }
345
346        // adds the P2PMessageIndex to the given byte-array
347        public void SendToPeer(byte[] data, PeerId destinationAddress)
348        {
349            byte[] newData = GenerateMessage(data, P2PMessageIndex.Payload);
350            SendReadilyMessage(newData, destinationAddress);
351        }
352
353        public void SendToPeer(string sData, PeerId destinationAddress)
354        {
355            byte[] data = GenerateMessage(sData, P2PMessageIndex.Payload);
356            SendReadilyMessage(data, destinationAddress);
357        }
358        public void SendToPeer(PubSubMessageType msgType, PeerId destinationAddress)
359        {
360            byte[] data = GenerateMessage(msgType);
361            SendReadilyMessage(data, destinationAddress);
362        }
363
364        #region Communication protocol
365
366        /// <summary>
367        /// generates a ct2- and p2p-compatible and processable message
368        /// </summary>
369        /// <param name="payload">payload data in bytes</param>
370        /// <param name="msgIndex">type of message (system message, simple payload for a special use case, etc.)</param>
371        /// <returns>the message, which is processable by the ct2/p2p system</returns>
372        private byte[] GenerateMessage(byte[] payload, P2PMessageIndex msgIndex)
373        {
374            // first byte is the index, if it is payload or Publish/Subscriber stuff
375            byte[] retByte = new byte[1 + payload.Length];
376            retByte[0] = (byte)msgIndex;
377            payload.CopyTo(retByte, 1);
378            return retByte;
379        }
380
381        /// <summary>
382        /// generates a ct2- and p2p-compatible and processable message
383        /// </summary>
384        /// <param name="sPayload">payload data as a string</param>
385        /// <param name="msgIndex">type of message (system message, simple payload for a special use case, etc.)</param>
386        /// <returns>the message, which is processable by the ct2/p2p system</returns>
387        private byte[] GenerateMessage(string sPayload, P2PMessageIndex msgIndex)
388        {
389            return GenerateMessage(enc.GetBytes(sPayload), msgIndex);
390        }
391
392        /// <summary>
393        /// generates a ct2- and p2p-compatible and processable message
394        /// </summary>
395        /// <param name="pubSubData">PubSubMessageType</param>
396        /// <returns>the message, which is processable by the ct2/p2p system<</returns>
397        private byte[] GenerateMessage(PubSubMessageType pubSubData)
398        {
399            byte[] bytePubSubData = new byte[] { (byte)pubSubData };
400            return GenerateMessage(bytePubSubData, P2PMessageIndex.PubSub);
401        }
402
403        /// <summary>
404        /// returns the message type, e.g. PubSub or Payload message
405        /// </summary>
406        /// <param name="msgType">the FIRST byte of a raw message, received by the system</param>
407        /// <returns>the message type</returns>
408        private P2PMessageIndex GetMessageType(byte msgType)
409        {
410            try
411            {
412                return (P2PMessageIndex)msgType;
413            }
414            catch (Exception ex)
415            {
416                throw (ex);
417            }
418        }
419
420        /// <summary>
421        /// returns the message type, e.g. PubSub or Payload message (to accelarate this process, only assign first byte of the whole array message)
422        /// </summary>
423        /// <param name="message">the whole message as an byte array</param>
424        /// <returns>the message type</returns>
425        private P2PMessageIndex GetMessageType(byte[] message)
426        {
427            try
428            {
429                return (P2PMessageIndex)message[0];
430            }
431            catch (Exception ex)
432            {
433                throw (ex);
434            }
435        }
436
437        /// <summary>
438        /// returns only the payload part of the message
439        /// </summary>
440        /// <param name="message">the raw message, received by the system, as an byte array (with the first index byte!!!)</param>
441        /// <returns>only the payload part of the message</returns>
442        private byte[] GetMessagePayload(byte[] message)
443        {
444            if (message.Length > 1)
445            {
446                byte[] retMsg = new byte[message.Length - 1];
447                // workaround because CopyTo doesn't work...
448                //for (int i = 0; i < message.Length-1; i++)
449                //{
450                //    retMsg[i] = message[i + 1];
451                //}
452                Buffer.BlockCopy(message, 1, retMsg, 0, retMsg.Length);
453                return retMsg;
454            }
455            return null;
456        }
457
458        #endregion
459
460
461        /// <summary>
462        /// Converts a string to the PubSubMessageType if possible. Otherwise return null.
463        /// </summary>
464        /// <param name="sData">Data</param>
465        /// <returns>PubSubMessageType if possible. Otherwise null.</returns>
466        private PubSubMessageType GetPubSubType(byte data)
467        {
468            // Convert one byte data to PublishSubscribeMessageType-Enum
469            try
470            {
471                return (PubSubMessageType)data;
472            }
473            catch (Exception ex)
474            {
475                throw(ex);
476            }
477        }
478
479        #endregion
480    }
481}
Note: See TracBrowser for help on using the repository browser.