source: trunk/CrypP2P/Internal/P2PBase.cs @ 2438

Last change on this file since 2438 was 2438, checked in by Sven Rech, 11 years ago

fixed exception messages

File size: 22.5 KB
RevLine 
[1545]1/*
[1616]2   Copyright 2010 Paul Lelgemann and Christian Arnold,
3                  University of Duisburg-Essen
[783]4
5   Licensed under the Apache License, Version 2.0 (the "License");
6   you may not use this file except in compliance with the License.
7   You may obtain a copy of the License at
8
9       http://www.apache.org/licenses/LICENSE-2.0
10
11   Unless required by applicable law or agreed to in writing, software
12   distributed under the License is distributed on an "AS IS" BASIS,
13   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   See the License for the specific language governing permissions and
15   limitations under the License.
16*/
17
18using System;
[1704]19using System.Linq;
[783]20using System.Text;
[1545]21using System.Threading;
[1579]22using Cryptool.PluginBase;
[1545]23using Cryptool.Plugins.PeerToPeer.Internal;
24using Gears4Net;
25using PeersAtPlay;
26using PeersAtPlay.Monitoring;
27using PeersAtPlay.P2PLink;
28using PeersAtPlay.P2PLink.SnalNG;
29using PeersAtPlay.P2POverlay;
[783]30using PeersAtPlay.P2POverlay.Bootstrapper;
[2269]31using PeersAtPlay.P2POverlay.Bootstrapper.DnsBootstrapper;
[1579]32using PeersAtPlay.P2POverlay.Bootstrapper.IrcBootstrapperV2;
[783]33using PeersAtPlay.P2POverlay.Bootstrapper.LocalMachineBootstrapper;
34using PeersAtPlay.P2POverlay.FullMeshOverlay;
[1545]35using PeersAtPlay.P2PStorage.DHT;
36using PeersAtPlay.P2PStorage.FullMeshDHT;
[1909]37using PeersAtPlay.PapsClient;
[1107]38using PeersAtPlay.Util.Logging;
[1672]39using PeersAtPlay.P2POverlay.Chord;
[2154]40using PeersAtPlay.P2PStorage.WebDHT;
[2379]41using System.Security.Cryptography;
[783]42
/* TODO:
 * - Delete the UseNatTraversal flag and insert CertificateCheck and CertificateSetup
 * - Test the asynchronous methods incl. event handlers
 */
[1545]47
[1374]48namespace Cryptool.P2P.Internal
[783]49{
50    /// <summary>
[1545]51    ///   Wrapper class to integrate peer@play environment into CrypTool.
52    ///   This class synchronizes asynchronous methods for easier usage in CT2.
[783]53    /// </summary>
54    public class P2PBase
55    {
[1545]56        #region Variables
[783]57
[1919]58        private AutoResetEvent systemJoined;
[1698]59        private readonly AutoResetEvent systemLeft;
60        private IBootstrapper bootstrapper;
61        private IP2PLinkManager linkmanager;
62        private P2POverlay overlay;
[1579]63        internal IDHT Dht;
64        internal IVersionedDHT VersionedDht;
[783]65
66        /// <summary>
[1545]67        ///   True if system was successfully joined, false if system is COMPLETELY left
[783]68        /// </summary>
[1616]69        public bool IsConnected { get; private set; }
[783]70
71        /// <summary>
[1545]72        ///   True if the underlying peer to peer system has been fully initialized
[783]73        /// </summary>
[1616]74        public bool IsInitialized { get; private set; }
[783]75
76        #endregion
77
[1545]78        #region Delegates
[783]79
[1545]80        public event P2PMessageReceived OnP2PMessageReceived;
81        public delegate void P2PMessageReceived(PeerId sourceAddr, byte[] data);
[1107]82
[1545]83        public event SystemJoined OnSystemJoined;
84        public delegate void SystemJoined();
[1107]85
[1545]86        public event SystemLeft OnSystemLeft;
87        public delegate void SystemLeft();
[862]88
[783]89        #endregion
90
[1616]91        public P2PBase()
[783]92        {
[1616]93            IsConnected = false;
94            IsInitialized = false;
[1545]95
[1698]96            systemJoined = new AutoResetEvent(false);
97            systemLeft = new AutoResetEvent(false);
[783]98        }
99
[1545]100        #region Basic P2P Methods (Init, Start, Stop)
[783]101
102        /// <summary>
[1545]103        ///   Initializes the underlying peer-to-peer system with settings configured in P2PSettings. This step is required in order to be able to establish a connection.
[783]104        /// </summary>
[1545]105        public void Initialize()
[813]106        {
[2020]107            Scheduler scheduler = new STAScheduler("pap_snal");
108            Scheduler scheduler_2 = new STAScheduler("pap_mysql");
[1616]109
110            switch (P2PSettings.Default.LinkManager)
[813]111            {
[1616]112                case P2PLinkManagerType.Snal:
113                    LogToMonitor("Init LinkMgr: Using NAT Traversal stuff");
[1545]114
[1616]115                    // NAT-Traversal stuff needs a different Snal-Version
[1698]116                    linkmanager = new Snal(scheduler);
[1261]117
[1616]118                    var settings = new PeersAtPlay.P2PLink.SnalNG.Settings();
119                    settings.LoadDefaults();
120                    settings.ConnectInternal = true;
121                    settings.LocalReceivingPort = P2PSettings.Default.LocalReceivingPort;
122                    settings.UseLocalAddressDetection = P2PSettings.Default.UseLocalAddressDetection;
123                    settings.NoDelay = false;
124                    settings.ReuseAddress = false;
125                    settings.UseNetworkMonitorServer = true;
[1702]126                    settings.CloseConnectionAfterPingTimeout = true;
[1645]127
[1672]128                    settings.FragmentMessages = true;
129                    settings.FragmentMessageSize = 10*1024;
[1261]130
[1698]131                    linkmanager.Settings = settings;
132                    linkmanager.ApplicationType = ApplicationType.CrypTool;
[1545]133
[1616]134                    break;
135                default:
[1909]136                    throw new NotImplementedException();
[1616]137            }
[1115]138
[1616]139            switch (P2PSettings.Default.Bootstrapper)
140            {
141                case P2PBootstrapperType.LocalMachineBootstrapper:
142                    // LocalMachineBootstrapper = only local connection (runs only on one machine)
[1698]143                    bootstrapper = new LocalMachineBootstrapper();
[1616]144                    break;
145                case P2PBootstrapperType.IrcBootstrapper:
146                    PeersAtPlay.P2POverlay.Bootstrapper.IrcBootstrapperV2.Settings.DelaySymmetricResponse = true;
147                    PeersAtPlay.P2POverlay.Bootstrapper.IrcBootstrapperV2.Settings.IncludeSymmetricResponse = false;
[1643]148                    PeersAtPlay.P2POverlay.Bootstrapper.IrcBootstrapperV2.Settings.UsePeerCache = false;
[1545]149
[1698]150                    bootstrapper = new IrcBootstrapper(scheduler);
[1616]151                    break;
[2269]152                case P2PBootstrapperType.DnsBootstrapper:
153                    bootstrapper = new DnsBootstrapper();
154                    break;
[1616]155                default:
[1909]156                    throw new NotImplementedException();
[1616]157            }
[813]158
[1909]159            try
[1616]160            {
[1909]161                switch (P2PSettings.Default.Architecture)
162                {
163                    case P2PArchitecture.FullMesh:
164                        overlay = new FullMeshOverlay(scheduler);
165                        Dht = new FullMeshDHT(scheduler);
166                        break;
167                    case P2PArchitecture.Chord:
168                        overlay = new ChordNGCore(scheduler);
169                        Dht = (IDHT) overlay;
170                        break;
171                    case P2PArchitecture.Server:
[1914]172                        PeersAtPlay.PapsClient.Properties.Settings.Default.ServerHost = P2PSettings.Default.ServerHost;
[1909]173                        PeersAtPlay.PapsClient.Properties.Settings.Default.ServerPort = P2PSettings.Default.ServerPort;
174                        bootstrapper = new LocalMachineBootstrapper();
175                        overlay = new PapsClientOverlay();
[2020]176                        Dht = new PapsClientDht(scheduler_2);
[1909]177                        break;
[2154]178                    case P2PArchitecture.WebDHT:
179                        Dht = new WebDHT(scheduler_2);
180                        break;
[1909]181                    default:
182                        throw new NotImplementedException();
183                }
[1616]184            }
[1909]185            catch(Exception e)
186            {
187                P2PManager.GuiLogMessage("Error initializing P2P network: " + e.Message, NotificationLevel.Error);
188                return;
189            }
[1545]190
[2020]191            if (overlay != null)
192            {
193                overlay.MessageReceived += OverlayMessageReceived;
194            }
[1616]195            Dht.SystemJoined += OnDhtSystemJoined;
196            Dht.SystemLeft += OnDhtSystemLeft;
[1545]197
[1616]198            VersionedDht = (IVersionedDHT) Dht;
[1589]199
[1616]200            P2PManager.GuiLogMessage("Initializing DHT with world name " + P2PSettings.Default.WorldName,
[1909]201                                        NotificationLevel.Info);
202            IsInitialized = true;
[2379]203            Dht.Initialize(P2PSettings.Default.PeerName, DecryptString(P2PSettings.Default.Password), P2PSettings.Default.WorldName, overlay,
[1919]204                           bootstrapper, linkmanager, null);
[813]205        }
206
207        /// <summary>
[1545]208        ///   Starts the P2P System. When the given P2P world doesn't exist yet,
209        ///   inclusive creating the and bootstrapping to the P2P network.
210        ///   In either case joining the P2P world.
211        ///   This synchronized method returns true not before the peer has
212        ///   successfully joined the network (this may take one or two minutes).
[783]213        /// </summary>
[1545]214        /// <exception cref = "InvalidOperationException">When the peer-to-peer system has not been initialized.
215        /// After validating the settings, this can be done by calling Initialize().</exception>
[783]216        /// <returns>True, if the peer has completely joined the p2p network</returns>
217        public bool SynchStart()
218        {
[1616]219            if (!IsInitialized)
[1545]220            {
[1616]221                throw new InvalidOperationException("Peer-to-peer is not initialized.");
222            }
[783]223
[1616]224            if (IsConnected)
225            {
226                return true;
227            }
[928]228
[1909]229            try
230            {
231                Dht.BeginStart(BeginStartEventHandler);
[1545]232
[1909]233                // Wait for event SystemJoined. When it's invoked, the peer completely joined the P2P system
234                systemJoined.WaitOne();
235                P2PManager.GuiLogMessage("System join process ended.", NotificationLevel.Debug);
236            }
237            catch (Exception e)
238            {
239                e.GetBaseException();
240            }
[1545]241
[1616]242            return true;
[783]243        }
244
[1589]245        private void BeginStartEventHandler(DHTEventArgs eventArgs)
246        {
[1616]247            P2PManager.GuiLogMessage("Received DHTEventArgs: " + eventArgs + ", state: " + eventArgs.State, NotificationLevel.Debug);
[1589]248        }
249
[783]250        /// <summary>
[1545]251        ///   Disconnects from the peer-to-peer system.
[783]252        /// </summary>
[1545]253        /// <returns>True, if the peer has completely left the p2p network</returns>
254        public bool SynchStop()
[783]255        {
[1616]256            if (Dht == null) return false;
[783]257
[1616]258            Dht.BeginStop(null);
[1545]259
[1919]260            if (IsConnected)
261                systemLeft.WaitOne();
[1616]262
[1919]263            systemJoined.Reset();
264            systemLeft.Reset();
[1616]265
266            return true;
[783]267        }
268
269        #endregion
270
[1545]271        #region Peer related method (GetPeerId, Send message to peer)
272
[813]273        /// <summary>
[1545]274        ///   Get PeerName of the actual peer
[813]275        /// </summary>
[1545]276        /// <param name = "sPeerName">out: additional peer information UserName on LinkManager</param>
[862]277        /// <returns>PeerID as a String</returns>
[1545]278        public PeerId GetPeerId(out string sPeerName)
[813]279        {
[1698]280            sPeerName = linkmanager.UserName;
281            return new PeerId(overlay.LocalAddress);
[813]282        }
283
[1022]284        /// <summary>
[1545]285        ///   Construct PeerId object for a specific byte[] id
[1022]286        /// </summary>
[1545]287        /// <param name = "byteId">overlay address as byte array</param>
[1022]288        /// <returns>corresponding PeerId for given byte[] id</returns>
[1545]289        public PeerId GetPeerId(byte[] byteId)
[1022]290        {
[1107]291            LogToMonitor("GetPeerID: Converting byte[] to PeerId-Object");
[1698]292            return new PeerId(overlay.GetAddress(byteId));
[1022]293        }
294
[836]295        // overlay.LocalAddress = Overlay-Peer-Address/Names
[1067]296        public void SendToPeer(byte[] data, byte[] destinationPeer)
[836]297        {
[1067]298            // get stack size of the pap use-data and add own use data (for optimizing Stack size)
[1698]299            var realStackSize = overlay.GetHeaderSize() + data.Length;
[836]300
[1545]301            var stackData = new ByteStack(realStackSize);
[1067]302            stackData.Push(data);
[836]303
[1698]304            var destinationAddr = overlay.GetAddress(destinationPeer);
[1545]305            var overlayMsg = new OverlayMessage(MessageReceiverType.P2PBase, 
[1698]306                                                overlay.LocalAddress, destinationAddr, stackData);
[1545]307
[1698]308            overlay.Send(overlayMsg);
[836]309        }
310
[1545]311        private void OverlayMessageReceived(object sender, OverlayMessageEventArgs e)
[980]312        {
[1545]313            if (OnP2PMessageReceived == null) return;
314
315            var pid = new PeerId(e.Message.Source);
316            /* You have to fire this event asynchronous, because the main
[1107]317                 * thread will be stopped in this wrapper class for synchronizing
318                 * the asynchronous stuff (AutoResetEvent) --> so this could run
319                 * into a deadlock, when you fire this event synchronous (normal Invoke)
320                 * ATTENTION: This could change the invocation order!!! In my case
321                              no problem, but maybe in future cases... */
[1436]322
[1545]323            // TODO: not safe: The delegate must have only one target
324            // OnP2PMessageReceived.BeginInvoke(pid, e.Message.Data.PopBytes(e.Message.Data.CurrentStackSize), null, null);
[1436]325
[1545]326            foreach (var del in OnP2PMessageReceived.GetInvocationList())
327            {
[1919]328                var data = e.Message.Data.ToArray();
329                if (e.Message.Data.CurrentStackSize != 0)
330                    data = e.Message.Data.PopBytes(e.Message.Data.CurrentStackSize);
331
332                del.DynamicInvoke(pid, data);
[980]333            }
334        }
335
[1545]336        #endregion
337
[783]338        #region Event Handling (System Joined, Left and Message Received)
339
[1545]340        private void OnDhtSystemJoined(object sender, EventArgs e)
[783]341        {
[1909]342            IsConnected = true;
343
[783]344            if (OnSystemJoined != null)
345                OnSystemJoined();
[1545]346
[1698]347            systemJoined.Set();
[783]348        }
349
[1545]350        private void OnDhtSystemLeft(object sender, SystemLeftEventArgs e)
[783]351        {
[1616]352            IsConnected = false;
353            IsInitialized = false;
[1107]354
[1549]355            // Allow new connection to start and check for waiting / blocked tasks
[1616]356            // TODO reset running ConnectionWorkers?
[1698]357            systemLeft.Set();
358            systemJoined.Set();
[1549]359
[1545]360            LogToMonitor("CrypP2P left the system.");
[2081]361
362            if (OnSystemLeft != null)
363                OnSystemLeft();
[783]364        }
365
366        #endregion
367
[1545]368        #region Synchronous Methods + their Callbacks
[783]369
370        /// <summary>
[1545]371        ///   Stores a value in the DHT at the given key
[783]372        /// </summary>
[1545]373        /// <param name = "key">Key of DHT Entry</param>
[1665]374        /// <param name = "value">Value of DHT Entry</param>
375        /// <returns>True, when storing is completed!</returns>
376        public RequestResult SynchStore(string key, string value)
377        {
378            return SynchStore(key, Encoding.UTF8.GetBytes(value));
379        }
380
381        /// <summary>
382        ///   Stores a value in the DHT at the given key
383        /// </summary>
384        /// <param name = "key">Key of DHT Entry</param>
[1545]385        /// <param name = "data">Value of DHT Entry</param>
386        /// <returns>True, when storing is completed!</returns>
[1665]387        public RequestResult SynchStore(string key, byte[] data)
[783]388        {
[1545]389            var autoResetEvent = new AutoResetEvent(false);
[783]390
[1545]391            // LogToMonitor("testcrash" + Encoding.UTF8.GetString(new byte[5000]));
[1919]392            LogToMonitor("Begin: SynchStore. Key: " + key + ", " + (data != null ? data.Length : 0) + " bytes");
[783]393
[1665]394            var requestResult = new RequestResult {WaitHandle = autoResetEvent, Key = key, Data = data};
395            VersionedDht.Store(OnSynchStoreCompleted, key, data, requestResult);
[1374]396
[1545]397            // blocking till response
[2437]398            bool success = autoResetEvent.WaitOne(1000*60*2);
399            if (!success)
400            {
401                if (!IsConnected)
402                    throw new NotConnectedException();
403                else
404                    throw new InvalidOperationException("SynchStore failed for some reason!");
405            }
[1665]406            LogToMonitor("End: SynchStore. Key: " + key + ". Status: " + requestResult.Status);
[1545]407
[1665]408            return requestResult;
[783]409        }
410
411        /// <summary>
[1545]412        ///   Callback for a the synchronized store method
[836]413        /// </summary>
[1545]414        /// <param name = "storeResult">retrieved data container</param>
415        private static void OnSynchStoreCompleted(StoreResult storeResult)
[836]416        {
[1665]417            var requestResult = storeResult.AsyncState as RequestResult;
418            if (requestResult == null)
[862]419            {
[1665]420                LogToMonitor("Received OnSynchStoreCompleted, but RequestResult object is missing. Discarding.");
[1545]421                return;
422            }
[836]423
[1665]424            requestResult.Parse(storeResult);
[862]425
[1545]426            // unblock WaitHandle in the synchronous method
[1665]427            requestResult.WaitHandle.Set();
[862]428        }
429
[836]430        /// <summary>
[1545]431        ///   Get the value of the given DHT Key or null, if it doesn't exist.
[783]432        /// </summary>
[1545]433        /// <param name = "key">Key of DHT Entry</param>
[783]434        /// <returns>Value of DHT Entry</returns>
[1665]435        public RequestResult SynchRetrieve(string key)
[783]436        {
[1545]437            LogToMonitor("Begin: SynchRetrieve. Key: " + key);
[1107]438
[1545]439            var autoResetEvent = new AutoResetEvent(false);
[1914]440            var requestResult = new RequestResult {WaitHandle = autoResetEvent, Key = key};
441
[1665]442            Dht.Retrieve(OnSynchRetrieveCompleted, key, requestResult);
[1107]443
[783]444            // blocking till response
[2437]445            bool success = autoResetEvent.WaitOne(1000 * 60 * 2);
446            if (!success)
447            {
448                if (!IsConnected)
449                    throw new NotConnectedException();
450                else
[2438]451                    throw new InvalidOperationException("SynchRetrieve failed for some reason!");
[2437]452            }
[1107]453
[1665]454            LogToMonitor("End: SynchRetrieve. Key: " + key + ". Status: " + requestResult.Status);
[1107]455
[1665]456            return requestResult;
[783]457        }
458
459        /// <summary>
[1545]460        ///   Callback for a the synchronized retrieval method
[862]461        /// </summary>
[1545]462        /// <param name = "retrieveResult"></param>
463        private static void OnSynchRetrieveCompleted(RetrieveResult retrieveResult)
[862]464        {
[1665]465            var requestResult = retrieveResult.AsyncState as RequestResult;
466            if (requestResult == null)
[1545]467            {
[1665]468                LogToMonitor("Received OnSynchRetrieveCompleted, but RequestResult object is missing. Discarding.");
[1545]469                return;
470            }
[1374]471
[1665]472            requestResult.Parse(retrieveResult);
[862]473
[1545]474            // unblock WaitHandle in the synchronous method
[1665]475            requestResult.WaitHandle.Set();
[862]476        }
[1545]477
[862]478        /// <summary>
[1545]479        ///   Removes a key/value pair out of the DHT
[783]480        /// </summary>
[1545]481        /// <param name = "key">Key of the DHT Entry</param>
[783]482        /// <returns>True, when removing is completed!</returns>
[1665]483        public RequestResult SynchRemove(string key)
[783]484        {
[1545]485            LogToMonitor("Begin SynchRemove. Key: " + key);
[1107]486
[1545]487            var autoResetEvent = new AutoResetEvent(false);
[1665]488            var requestResult = new RequestResult { WaitHandle = autoResetEvent, Key = key };
489            VersionedDht.Remove(OnSynchRemoveCompleted, key, requestResult);
[794]490
[783]491            // blocking till response
[2437]492            bool success = autoResetEvent.WaitOne(1000 * 60 * 2);
493            if (!success)
494            {
495                if (!IsConnected)
496                    throw new NotConnectedException();
497                else
[2438]498                    throw new InvalidOperationException("SynchRemove failed for some reason!");
[2437]499            }
[1107]500
[1665]501            LogToMonitor("End: SynchRemove. Key: " + key + ". Status: " + requestResult.Status);
[1107]502
[1665]503            return requestResult;
[783]504        }
505
506        /// <summary>
[1545]507        ///   Callback for a the synchronized remove method
[783]508        /// </summary>
[1545]509        /// <param name = "removeResult"></param>
510        private static void OnSynchRemoveCompleted(RemoveResult removeResult)
[783]511        {
[1665]512            var requestResult = removeResult.AsyncState as RequestResult;
513            if (requestResult == null)
[783]514            {
[1665]515                LogToMonitor("Received OnSynchRemoveCompleted, but RequestResult object is missing. Discarding.");
[1545]516                return;
517            }
[783]518
[1665]519            requestResult.Parse(removeResult);
[862]520
[1545]521            // unblock WaitHandle in the synchronous method
[1665]522            requestResult.WaitHandle.Set();
[783]523        }
524
525        #endregion
526
[1704]527        #region Statistic Methods
528
529        public long TotalBytesSentOnAllLinks()
530        {
531            return (long) linkmanager.GetAllLinkInformation().Sum(linkInformation => linkInformation.TotalBytesSent);
532        }
533
534        public long TotalBytesReceivedOnAllLinks()
535        {
536            return (long) linkmanager.GetAllLinkInformation().Sum(linkInformation => linkInformation.TotalBytesReceived);
537        }
538
539        #endregion
540
[1545]541        #region Log facility
542
[783]543        /// <summary>
[1545]544        ///   To log the internal state in the Monitoring Software of P@play
[783]545        /// </summary>
546        public void LogInternalState()
547        {
[1579]548            if (Dht != null)
[783]549            {
[1579]550                Dht.LogInternalState();
[783]551            }
552        }
[1107]553
[1545]554        private static void LogToMonitor(string sTextToLog)
[1107]555        {
[1545]556            if (P2PSettings.Default.Log2Monitor)
[1107]557                Log.Debug(sTextToLog);
558        }
559
[2379]560        /// <summary>
561        /// Encrypts the given string using the current windows user password and converts
562        /// this to a base64 string
563        /// </summary>
564        /// <param name="s"></param>
565        /// <returns>encrypted base64 string</returns>
566        public static string EncryptString(string s)
567        {
568            byte[] bytes = Encoding.Unicode.GetBytes(s);
569            byte[] encBytes = ProtectedData.Protect(bytes, null, DataProtectionScope.CurrentUser);
570            return Convert.ToBase64String(encBytes);
571        }
572
573        /// <summary>
574        /// Decrypts the given base64 string using the current windows user password
575        /// </summary>
576        /// <param name="s"></param>
577        /// <returns>decrypted string</returns>
578        public static string DecryptString(string s)
579        {
580            if (string.IsNullOrEmpty(s))
581            {
582                return "";
583            }
584            try
585            {
586                byte[] encBytes = Convert.FromBase64String(s);
587                byte[] bytes = ProtectedData.Unprotect(encBytes, null, DataProtectionScope.CurrentUser);
588                return Encoding.Unicode.GetString(bytes);
589            }
590            catch (Exception)
591            {
592                return "";
593            }
594        }
595
596
[1545]597        #endregion
[783]598    }
[1545]599}
Note: See TracBrowser for help on using the repository browser.