source: trunk/CrypPlugins/PeerToPeerBaseProxy/P2PPeer.cs @ 1709

Last change on this file since 1709 was 1665, checked in by Paul Lelgemann, 12 years ago

+ CrypP2P: Return types of synchronous methods Store/Retrieve/Remove changed
o Work on the distributed KeySearcher

File size: 18.1 KB
Line 
1/*
2   Copyright 2010 Paul Lelgemann, University of Duisburg-Essen
3
4   Licensed under the Apache License, Version 2.0 (the "License");
5   you may not use this file except in compliance with the License.
6   You may obtain a copy of the License at
7
8       http://www.apache.org/licenses/LICENSE-2.0
9
10   Unless required by applicable law or agreed to in writing, software
11   distributed under the License is distributed on an "AS IS" BASIS,
12   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   See the License for the specific language governing permissions and
14   limitations under the License.
15*/
16
17using System;
18using System.ComponentModel;
19using System.Text;
20using System.Threading;
21using System.Windows.Controls;
22using Cryptool.P2P;
23using Cryptool.P2P.Internal;
24using Cryptool.PluginBase;
25using Cryptool.PluginBase.Control;
26using Cryptool.PluginBase.IO;
27using Cryptool.PluginBase.Miscellaneous;
28using Cryptool.Plugins.PeerToPeer.Internal;
29
30namespace Cryptool.Plugins.PeerToPeerProxy
31{
32    [Author("Paul Lelgemann", "lelgemann@cryptool.org", "Uni Duisburg-Essen", "http://www.uni-due.de")]
33    [PluginInfo(false, "P2P_Peer_Proxy",
34        "Creates a new Peer. Uses the CrypTool2 built-in P2P network and can be used as a replacement for P2P_Peer.", ""
35        , "PeerToPeerBaseProxy/icons/peer_inactive.png", "PeerToPeerBaseProxy/icons/peer_connecting.png",
36        "PeerToPeerBaseProxy/icons/peer_online.png", "PeerToPeerBaseProxy/icons/peer_error.png")]
37    public class P2PPeer : IIOMisc
38    {
39        #region IIOMisc Members
40
41        public event GuiLogNotificationEventHandler OnGuiLogNotificationOccured;
42
43        public event PluginProgressChangedEventHandler OnPluginProgressChanged;
44
45        public event PropertyChangedEventHandler PropertyChanged;
46
47        #endregion
48
49        // to forward event from overlay/dht MessageReceived-Event from P2PBase
50        public event P2PBase.P2PMessageReceived OnPeerMessageReceived;
51
52        public void OnPropertyChanged(string name)
53        {
54            EventsHelper.PropertyChanged(PropertyChanged, this, new PropertyChangedEventArgs(name));
55        }
56
57        private void GuiLogMessage(string p, NotificationLevel notificationLevel)
58        {
59            // for evaluation issues only
60            EventsHelper.GuiLogMessage(OnGuiLogNotificationOccured, this,
61                                       new GuiLogEventArgs(p + "(" + DebugToFile.GetTimeStamp() + ")", this,
62                                                           notificationLevel));
63        }
64
65        #region In and Output
66
67        [PropertyInfo(Direction.ControlSlave, "Master Peer", "One peer to rule them all", "", true, false,
68            DisplayLevel.Beginner, QuickWatchFormat.Text, null)]
69        public IP2PControl P2PControlSlave
70        {
71            get { return _p2PSlave ?? (_p2PSlave = new P2PPeerMaster(this)); }
72        }
73
74        #endregion
75
76        #region Start and Stop Peer
77
78        /// <summary>
79        /// Status flag for starting and stopping peer only once.
80        /// </summary>
81        public bool PeerStarted()
82        {
83            return P2PManager.IsConnected && !P2PManager.IsConnecting;
84        }
85
86        public void StartPeer()
87        {
88            _settings.PeerStatusChanged(P2PPeerSettings.PeerStatus.Connecting);
89            P2PManager.P2PBase.OnP2PMessageReceived += P2PBaseOnP2PMessageReceived;
90
91            if (P2PManager.IsConnected)
92            {
93                GuiLogMessage("P2P connected.", NotificationLevel.Info);
94                _settings.PeerStatusChanged(P2PPeerSettings.PeerStatus.Online);
95            }
96            else if (!P2PManager.IsConnected && P2PManager.IsConnecting)
97            {
98                HandleAlreadyConnecting();
99            }
100            else
101            {
102                if (_settings.Autoconnect)
103                {
104                    HandleAutoconnect();
105                }
106                else
107                {
108                    GuiLogMessage("P2P network offline. Please check the autoconnect checkbox of the proxy plugin or connect manually first.",
109                                      NotificationLevel.Error);
110                    _settings.PeerStatusChanged(P2PPeerSettings.PeerStatus.Error);
111                    throw new ApplicationException("Workaround for wrong error handling... Workspace should be stopped now.");
112                }
113            }
114        }
115
116        private void HandleAlreadyConnecting()
117        {
118            P2PManager.ConnectionManager.OnP2PConnectionStateChangeOccurred += HandleConnectionStateChange;
119            _connectResetEvent = new AutoResetEvent(false);
120            _connectResetEvent.WaitOne();
121
122            if (P2PManager.IsConnected)
123            {
124                GuiLogMessage("P2P connected.", NotificationLevel.Info);
125                _settings.PeerStatusChanged(P2PPeerSettings.PeerStatus.Online);
126            }
127            else
128            {
129                GuiLogMessage("P2P network could not be connected.",
130                              NotificationLevel.Error);
131                _settings.PeerStatusChanged(P2PPeerSettings.PeerStatus.Error);
132                throw new ApplicationException("Workaround for wrong error handling... Workspace should be stopped now.");
133            }
134        }
135
136        private void HandleAutoconnect()
137        {
138            P2PManager.ConnectionManager.OnP2PConnectionStateChangeOccurred += HandleConnectionStateChange;
139            _connectResetEvent = new AutoResetEvent(false);
140
141            P2PManager.Connect();
142
143            _connectResetEvent.WaitOne();
144
145            if (P2PManager.IsConnected)
146            {
147                GuiLogMessage("P2P network was connected due to plugin setting.",
148                              NotificationLevel.Info);
149                _settings.PeerStatusChanged(P2PPeerSettings.PeerStatus.Online);
150            }
151            else
152            {
153                GuiLogMessage("P2P network could not be connected.",
154                              NotificationLevel.Error);
155                _settings.PeerStatusChanged(P2PPeerSettings.PeerStatus.Error);
156                throw new ApplicationException("Workaround for wrong error handling... Workspace should be stopped now.");
157            }
158        }
159
160        void HandleConnectionStateChange(object sender, bool newState)
161        {
162            _connectResetEvent.Set();
163        }
164
165        public void StopPeer()
166        {
167            P2PManager.P2PBase.OnP2PMessageReceived -= P2PBaseOnP2PMessageReceived;
168            GuiLogMessage("Removed event registration, but peer cannot be stopped, it is running in CrypTool!", NotificationLevel.Info);
169        }
170
171        #endregion Start and Stop Peer
172
173        #region Variables
174
175        private IP2PControl _p2PSlave;
176        private P2PPeerSettings _settings;
177        private AutoResetEvent _connectResetEvent;
178
179        #endregion
180
181        #region Standard functionality
182
183        public P2PPeer()
184        {
185            _settings = new P2PPeerSettings();
186            _settings.OnPluginStatusChanged += SettingsOnPluginStatusChanged;
187        }
188
189        public event StatusChangedEventHandler OnPluginStatusChanged;
190
191        public ISettings Settings
192        {
193            set { _settings = (P2PPeerSettings) value; }
194            get { return _settings; }
195        }
196
197        public UserControl Presentation
198        {
199            get { return null; }
200        }
201
202        public UserControl QuickWatchPresentation
203        {
204            get { return null; }
205        }
206
207        public void PreExecution()
208        {
209            StartPeer();
210        }
211
212        public void Execute()
213        {
214        }
215
216        public void PostExecution()
217        {
218        }
219
220        public void Pause()
221        {
222        }
223
224        public void Stop()
225        {
226        }
227
228        public void Initialize()
229        {
230        }
231
232        public void Dispose()
233        {
234            StopPeer();
235        }
236
237        private void SettingsOnPluginStatusChanged(IPlugin sender, StatusEventArgs args)
238        {
239            if (OnPluginStatusChanged != null)
240                OnPluginStatusChanged(this, args);
241        }
242
243        // to forward event from overlay/dht MessageReceived-Event from P2PBase
244        private void P2PBaseOnP2PMessageReceived(PeerId sourceAddr, byte[] data)
245        {
246            if (OnPeerMessageReceived != null)
247                OnPeerMessageReceived(sourceAddr, data);
248        }
249
250        private static void SettingsTaskPaneAttributeChanged(ISettings settings, TaskPaneAttributeChangedEventArgs args)
251        {
252        }
253
254        #endregion
255    }
256
257
258    public class P2PPeerMaster : IP2PControl
259    {
260        private readonly Encoding _enc = Encoding.UTF8;
261        private readonly P2PPeer _p2PPeer;
262        private readonly AutoResetEvent _systemJoined;
263        private PeerId _peerId;
264        private string _sPeerName;
265        // used for every encoding stuff
266
267        public P2PPeerMaster(P2PPeer p2PPeer)
268        {
269            _p2PPeer = p2PPeer;
270            _systemJoined = new AutoResetEvent(false);
271
272            P2PManager.P2PBase.OnSystemJoined += P2PBaseOnSystemJoined;
273            P2PManager.P2PBase.OnP2PMessageReceived += P2PPeerOnPeerMessageReceived;
274            OnStatusChanged += P2PPeerMaster_OnStatusChanged;
275        }
276
277        #region Events and Event-Handling
278
279        // to forward event from overlay MessageReceived-Event from P2PBase
280        // analyzes the type of message and throws depend upon this anaysis an event
281        public event P2PPayloadMessageReceived OnPayloadMessageReceived;
282        public event P2PSystemMessageReceived OnSystemMessageReceived;
283
284        public event IControlStatusChangedEventHandler OnStatusChanged;
285
286        private void P2PBaseOnSystemJoined()
287        {
288            _systemJoined.Set();
289        }
290
291        private void P2PPeerOnPeerMessageReceived(PeerId sourceAddr, byte[] data)
292        {
293            switch (GetMessageType(data[0])) //analyses the first byte of data (index, which represents the MessageType)
294            {
295                case P2PMessageIndex.PubSub:
296                    if (data.Length == 2)
297                    {
298                        if (OnSystemMessageReceived != null)
299                            OnSystemMessageReceived(sourceAddr, GetPubSubType(data[1]));
300                    }
301                    else
302                    {
303                        throw (new Exception("Data seems to be from type 'PubSub', but is to long for it... Data: '" +
304                                             _enc.GetString(data) + "'"));
305                    }
306                    break;
307                case P2PMessageIndex.Payload:
308                    if (OnPayloadMessageReceived != null)
309                        OnPayloadMessageReceived(sourceAddr, GetMessagePayload(data));
310                    break;
311                default:
312                    // not implemented. System ignores these messages completely at present
313                    break;
314            }
315        }
316
317        private void P2PPeerMaster_OnStatusChanged(IControl sender, bool readyForExecution)
318        {
319            if (OnStatusChanged != null)
320                OnStatusChanged(sender, readyForExecution);
321        }
322
323        #endregion
324
325        #region IP2PControl Members
326
327        public bool PeerStarted()
328        {
329            return _p2PPeer.PeerStarted();
330        }
331
332        public bool DHTstore(string sKey, byte[] byteValue)
333        {
334            return P2PManager.IsConnected && P2PManager.Store(sKey, byteValue).IsSuccessful();
335        }
336
337        public bool DHTstore(string sKey, string sValue)
338        {
339            return P2PManager.IsConnected && P2PManager.Store(sKey, sValue).IsSuccessful();
340        }
341
342        public byte[] DHTload(string sKey)
343        {
344            return P2PManager.IsConnected ? P2PManager.Retrieve(sKey).Data : null;
345        }
346
347        public bool DHTremove(string sKey)
348        {
349            return P2PManager.IsConnected && P2PManager.Remove(sKey).IsSuccessful();
350        }
351
352        /// <summary>
353        /// This method only contacts the p2p system, if the peerID wasn't requested before
354        /// </summary>
355        /// <param name="sPeerName">returns the Peer Name</param>
356        /// <returns>returns the Peer ID</returns>
357        public PeerId GetPeerID(out string sPeerName)
358        {
359            if (SystemJoinedCompletely())
360            {
361                if (_peerId == null)
362                {
363                    _peerId = P2PManager.P2PBase.GetPeerId(out _sPeerName);
364                }
365                sPeerName = _sPeerName;
366                return _peerId;
367            }
368
369            sPeerName = _sPeerName;
370            return null;
371        }
372
373        public PeerId GetPeerID(byte[] byteId)
374        {
375            return P2PManager.P2PBase.GetPeerId(byteId);
376        }
377
378        // adds the P2PMessageIndex to the given byte-array
379        public void SendToPeer(byte[] data, PeerId destinationAddress)
380        {
381            byte[] newData = GenerateMessage(data, P2PMessageIndex.Payload);
382            SendReadilyMessage(newData, destinationAddress);
383        }
384
385        public void SendToPeer(string sData, PeerId destinationAddress)
386        {
387            byte[] data = GenerateMessage(sData, P2PMessageIndex.Payload);
388            SendReadilyMessage(data, destinationAddress);
389        }
390
391        public void SendToPeer(PubSubMessageType msgType, PeerId destinationAddress)
392        {
393            byte[] data = GenerateMessage(msgType);
394            SendReadilyMessage(data, destinationAddress);
395        }
396
397        #endregion
398
399        #region Communication protocol
400
401        /// <summary>
402        /// generates a ct2- and p2p-compatible and processable message
403        /// </summary>
404        /// <param name="payload">payload data in bytes</param>
405        /// <param name="msgIndex">type of message (system message, simple payload for a special use case, etc.)</param>
406        /// <returns>the message, which is processable by the ct2/p2p system</returns>
407        private static byte[] GenerateMessage(byte[] payload, P2PMessageIndex msgIndex)
408        {
409            // first byte is the index, if it is payload or Publish/Subscriber stuff
410            var retByte = new byte[1 + payload.Length];
411            retByte[0] = (byte) msgIndex;
412            payload.CopyTo(retByte, 1);
413            return retByte;
414        }
415
416        /// <summary>
417        /// generates a ct2- and p2p-compatible and processable message
418        /// </summary>
419        /// <param name="sPayload">payload data as a string</param>
420        /// <param name="msgIndex">type of message (system message, simple payload for a special use case, etc.)</param>
421        /// <returns>the message, which is processable by the ct2/p2p system</returns>
422        private byte[] GenerateMessage(string sPayload, P2PMessageIndex msgIndex)
423        {
424            return GenerateMessage(_enc.GetBytes(sPayload), msgIndex);
425        }
426
427        /// <summary>
428        /// generates a ct2- and p2p-compatible and processable message
429        /// </summary>
430        /// <param name="pubSubData">PubSubMessageType</param>
431        /// <returns>the message, which is processable by the ct2/p2p system</returns>
432        private static byte[] GenerateMessage(PubSubMessageType pubSubData)
433        {
434            var bytePubSubData = new[] {(byte) pubSubData};
435            return GenerateMessage(bytePubSubData, P2PMessageIndex.PubSub);
436        }
437
438        /// <summary>
439        /// returns the message type, e.g. PubSub or Payload message
440        /// </summary>
441        /// <param name="msgType">the FIRST byte of a raw message, received by the system</param>
442        /// <returns>the message type</returns>
443        private static P2PMessageIndex GetMessageType(byte msgType)
444        {
445            return (P2PMessageIndex) msgType;
446        }
447
448        /// <summary>
449        /// returns only the payload part of the message
450        /// </summary>
451        /// <param name="message">the raw message, received by the system, as an byte array (with the first index byte!!!)</param>
452        /// <returns>only the payload part of the message</returns>
453        private static byte[] GetMessagePayload(byte[] message)
454        {
455            if (message.Length > 1)
456            {
457                var retMsg = new byte[message.Length - 1];
458                // workaround because CopyTo doesn't work...
459                //for (int i = 0; i < message.Length-1; i++)
460                //{
461                //    retMsg[i] = message[i + 1];
462                //}
463                Buffer.BlockCopy(message, 1, retMsg, 0, retMsg.Length);
464                return retMsg;
465            }
466            return null;
467        }
468
469        #endregion
470
471        /// <summary>
472        /// workaround method. If the PAP functions are used, but the PAP system isn't
473        /// started yet. This could happen because of the plugin hierarchy and
474        /// when a p2p-using plugin uses PAP functions in the PreExecution method,
475        /// this could run into a race condition (peer plugin not computed by the CT-system
476        /// yet, but p2p-using plugin is already executed)
477        /// </summary>
478        /// <returns></returns>
479        private static bool SystemJoinedCompletely()
480        {
481            return P2PManager.IsConnected && !P2PManager.IsConnecting;
482        }
483
484        private static void SendReadilyMessage(byte[] data, PeerId destinationAddress)
485        {
486            if (SystemJoinedCompletely())
487                P2PManager.P2PBase.SendToPeer(data, destinationAddress.ToByteArray());
488        }
489
490        /// <summary>
491        /// Converts a string to the PubSubMessageType if possible. Otherwise return null.
492        /// </summary>
493        /// <param name="data">Data</param>
494        /// <returns>PubSubMessageType if possible. Otherwise null.</returns>
495        private static PubSubMessageType GetPubSubType(byte data)
496        {
497            // Convert one byte data to PublishSubscribeMessageType-Enum
498            return (PubSubMessageType) data;
499        }
500    }
501}
Note: See TracBrowser for help on using the repository browser.