source: trunk/CrypPlugins/QuadraticSieve/PeerToPeer.cs @ 1678

Last change on this file since 1678 was 1678, checked in by Sven Rech, 12 years ago

quadratic sieve changes

File size: 24.5 KB
Line 
1/*                             
2   Copyright 2010 Sven Rech, Uni Duisburg-Essen
3
4   Licensed under the Apache License, Version 2.0 (the "License");
5   you may not use this file except in compliance with the License.
6   You may obtain a copy of the License at
7
8       http://www.apache.org/licenses/LICENSE-2.0
9
10   Unless required by applicable law or agreed to in writing, software
11   distributed under the License is distributed on an "AS IS" BASIS,
12   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   See the License for the specific language governing permissions and
14   limitations under the License.
15*/
16
17using System;
18using System.Collections.Generic;
19using System.Linq;
20using System.Text;
21using System.IO.Compression;
22using System.IO;
23using Cryptool.P2P;
24using System.Numerics;
25using System.Diagnostics;
26using System.Collections;
27using System.Threading;
28using System.Windows.Threading;
29using System.Runtime.Serialization.Formatters.Binary;
30using System.Management;
31using System.Security.Principal;
32using System.Security.Cryptography;
33
34namespace Cryptool.Plugins.QuadraticSieve
35{
36    class PeerToPeer
37    {
38        /// <summary>
39        /// This structure is used to hold all important peer performance and alive informations in a queue
40        /// </summary>
41        private struct PeerPerformanceInformations
42        {
43            public DateTime lastChecked;
44            public int peerID;
45            public double performance;
46            public int lastAlive;           
47
48            public PeerPerformanceInformations(DateTime lastChecked, int peerID, double performance, int lastAlive)
49            {
50                this.lastAlive = lastAlive;
51                this.lastChecked = lastChecked;
52                this.peerID = peerID;
53                this.performance = performance;
54            }
55        }
56
57        private string channel;
58        private BigInteger number;
59        private BigInteger factor;
60        private int head;
61        private Queue storequeue;   //yields to store in the DHT
62        private Queue loadqueue;    //yields that have been loaded from the DHT
63        private Thread loadStoreThread;
64        private bool stopLoadStoreThread;
65        private QuadraticSievePresentation quadraticSieveQuickWatchPresentation;
66        private AutoResetEvent yieldEvent;
67        private int ourID;           //Our ID
68        private Dictionary<int, string> nameCache;  //associates the ids with the names
69        private Queue<PeerPerformanceInformations> peerPerformances;      //A queue of performances from the different peers ordered by the date last checked.
70        private HashSet<int> activePeers;                                 //A set of active peers
71        private double ourPerformance = 0;
72        private int aliveCounter = 0;       //This is stored together with the performance in the DHT
73        private string ourName;
74        private uint downloaded = 0;
75        private uint uploaded = 0;
76
77        public delegate void P2PWarningHandler(String warning);
78        public event P2PWarningHandler P2PWarning;       
79
80        public PeerToPeer(QuadraticSievePresentation presentation, AutoResetEvent yieldEvent)
81        {
82            quadraticSieveQuickWatchPresentation = presentation;
83            this.yieldEvent = yieldEvent;
84            SetOurID();
85        }
86
87        private void SetOurID()
88        {
89            string username = WindowsIdentity.GetCurrent().Name;
90            string mac = GetMac();
91
92            MD5 md5 = new MD5CryptoServiceProvider();
93            byte[] idBytes = md5.ComputeHash(System.Text.ASCIIEncoding.ASCII.GetBytes(username + mac));           
94           
95            //Is it really a good idea to calculate the ID like this?
96            for (int c = 0; c < idBytes.Length; c++)           
97                ourID = ourID + (idBytes[c] << c);
98
99            quadraticSieveQuickWatchPresentation.ProgressYields.setOurID(ourID);
100
101            ourName = System.Net.Dns.GetHostName();
102        }
103
104        /// <summary>
105        /// Reads yield at position "index" from DHT and returns the ownerID and the decompressed packet itself.
106        /// </summary>
107        private byte[] ReadYield(int index, out int ownerID)
108        {
109            ownerID = 0;
110            byte[] yield = P2PManager.Retrieve(YieldIdentifier(index)).Data;
111            if (yield == null)
112                return null;
113            downloaded += (uint)yield.Length;
114
115            byte[] decompressedYield = DecompressYield(yield);
116
117            byte[] idbytes = new byte[4];
118            Array.Copy(decompressedYield, idbytes, 4);
119            ownerID = BitConverter.ToInt32(idbytes, 0);
120            byte[] y = new byte[decompressedYield.Length - 4];
121            Array.Copy(decompressedYield, 4, y, 0, y.Length);
122           
123            return y;
124        }
125
126        private static byte[] DecompressYield(byte[] yield)
127        {
128            MemoryStream memStream = new MemoryStream();
129            DeflateStream defStream = new DeflateStream(memStream, CompressionMode.Decompress);
130            memStream.Write(yield, 0, yield.Length);
131            memStream.Position = 0;
132            MemoryStream memStream2 = new MemoryStream();
133            defStream.CopyTo(memStream2);
134            defStream.Close();
135            byte[] decompressedYield = memStream2.ToArray();
136            return decompressedYield;
137        }
138
139        private bool ReadAndEnqueueYield(int loadIndex, bool checkAlive)
140        {
141            int ownerID;
142            byte[] yield = ReadYield(loadIndex, out ownerID);
143            if (yield != null)
144            {
145                ShowTransfered(downloaded, uploaded);
146                loadqueue.Enqueue(yield);
147
148                string name = null;
149
150                if (ownerID != ourID)
151                {
152                    //Progress Yield:
153                    if (!nameCache.ContainsKey(ownerID))
154                    {
155                        byte[] n = P2PManager.Retrieve(NameIdentifier(ownerID)).Data;
156                        if (n != null)
157                            nameCache.Add(ownerID, System.Text.ASCIIEncoding.ASCII.GetString(n));
158                    }
159                    if (nameCache.ContainsKey(ownerID))
160                        name = nameCache[ownerID];
161                                       
162                    if (checkAlive && !activePeers.Contains(ownerID))
163                    {
164                        //check performance and alive informations:
165                        byte[] performancebytes = P2PManager.Retrieve(PerformanceIdentifier(ownerID)).Data;
166                        if (performancebytes != null)
167                        {
168                            double performance = BitConverter.ToDouble(performancebytes, 0);
169                            int peerAliveCounter = BitConverter.ToInt32(performancebytes, 8);
170                            peerPerformances.Enqueue(new PeerPerformanceInformations(DateTime.Now, ownerID, performance, peerAliveCounter));
171                        }
172                        activePeers.Add(ownerID);
173                        UpdateActivePeerInformation();
174                    }
175                }
176
177                //Set progress info:
178                SetProgressYield(loadIndex, ownerID, name);
179
180                yieldEvent.Set();
181                return true;
182            }
183            return false;
184        }
185
186        /// <summary>
187        /// Tries to read and enqueue yield on position "loadIndex".
188        /// If it fails, it stores the index in lostIndices queue.
189        /// </summary>
190        private void TryReadAndEnqueueYield(int index, bool checkAlive, Queue<KeyValuePair<int, DateTime>> lostIndices)
191        {
192            bool succ = ReadAndEnqueueYield(index, checkAlive);
193            if (!succ)               
194            {
195                var e = new KeyValuePair<int, DateTime>(index, DateTime.Now);
196                lostIndices.Enqueue(e);
197                SetProgressYield(index, -1, null);
198            }
199        }
200
201        private void LoadStoreThreadProc()
202        {
203            int loadIndex = 0;
204            downloaded = 0;
205            uploaded = 0;
206            HashSet<int> ourIndices = new HashSet<int>();   //Stores all the indices which belong to our packets
207            //Stores all the indices (together with there check date) which belong to lost packets (i.e. packets that can't be load anymore):
208            Queue<KeyValuePair<int, DateTime>> lostIndices = new Queue<KeyValuePair<int, DateTime>>();
209            double lastPerformance = 0;
210            DateTime performanceLastPut = new DateTime();
211
212            try
213            {
214                SynchronizeHead();
215                int startHead = head;
216
217                while (!stopLoadStoreThread)
218                {
219                    //Store our performance and our alive counter in the DHT, either if the performance changed or when the last write was more than 1 minute ago:
220                    if (ourPerformance != lastPerformance || performanceLastPut.CompareTo(DateTime.Now.Subtract(new TimeSpan(0, 1, 0))) < 0)
221                    {
222                        P2PManager.Retrieve(PerformanceIdentifier(ourID));      //just to outsmart the versioning system
223                        P2PManager.Store(PerformanceIdentifier(ourID), concat(BitConverter.GetBytes(ourPerformance), BitConverter.GetBytes(aliveCounter++)));
224                        lastPerformance = ourPerformance;
225                        performanceLastPut = DateTime.Now;
226                    }
227
228                    //updates all peer performances which have last been checked more than 1 minutes and 20 seconds ago and check if they are still alive:
229                    while (peerPerformances.Count != 0 && peerPerformances.Peek().lastChecked.CompareTo(DateTime.Now.Subtract(new TimeSpan(0, 1, 20))) < 0)
230                    {
231                        var e = peerPerformances.Dequeue();
232
233                        byte[] performancebytes = P2PManager.Retrieve(PerformanceIdentifier(e.peerID)).Data;
234                        if (performancebytes != null)
235                        {
236                            double performance = BitConverter.ToDouble(performancebytes, 0);
237                            int peerAliveCounter = BitConverter.ToInt32(performancebytes, 8);
238                            if (peerAliveCounter <= e.lastAlive)
239                            {
240                                activePeers.Remove(e.peerID);
241                                UpdateActivePeerInformation();
242                            }
243                            else
244                                peerPerformances.Enqueue(new PeerPerformanceInformations(DateTime.Now, e.peerID, performance, peerAliveCounter));
245                        }
246                        else
247                        {
248                            activePeers.Remove(e.peerID);
249                            UpdateActivePeerInformation();
250                        }                       
251                    }
252
253                    SynchronizeHead();
254
255                    bool busy = false;
256
257                    if (storequeue.Count != 0)  //store our packages
258                    {
259                        byte[] yield = (byte[])storequeue.Dequeue();
260                        bool success = P2PManager.Store(YieldIdentifier(head), yield).IsSuccessful();
261                        while (!success)
262                        {
263                            SynchronizeHead();
264                            success = P2PManager.Store(YieldIdentifier(head), yield).IsSuccessful();
265                        }
266                        SetProgressYield(head, ourID, null);
267                        ourIndices.Add(head);
268                       
269                        head++;
270
271                        //show informations about the uploaded yield:
272                        uploaded += (uint)yield.Length;
273                        ShowTransfered(downloaded, uploaded);
274                        busy = true;
275                    }
276
277                    //load the other yields:
278
279                    //skip all indices which are uploaded by us:
280                    while (ourIndices.Contains(loadIndex))
281                        loadIndex++;
282
283                    if (loadIndex < head)
284                    {
285                        bool checkAlive = loadIndex >= startHead;       //we don't check the peer alive informations of the packages that existed before we joined
286                        TryReadAndEnqueueYield(loadIndex, checkAlive, lostIndices);
287                        loadIndex++;
288                        busy = true;
289                    }
290                   
291                    //check all lost indices which are last checked longer than 2 minutes ago (but only if we have nothing else to do):
292                    if (!busy)
293                    {
294                        int count = 0;
295                        //TODO: Maybe we should throw away those indices, which have been checked more than several times.
296                        while (lostIndices.Count != 0 && lostIndices.Peek().Value.CompareTo(DateTime.Now.Subtract(new TimeSpan(0, 2, 0))) < 0)
297                        {
298                            var e = lostIndices.Dequeue();
299                            TryReadAndEnqueueYield(loadIndex, true, lostIndices);
300                            count++;
301                        }
302
303                        if (count == 0)
304                            Thread.Sleep(5000);    //Wait 5 seconds
305                    }
306
307                }
308            }
309            catch (ThreadInterruptedException)
310            {
311                return;
312            }
313        }
314
315        private void UpdateActivePeerInformation()
316        {
317            quadraticSieveQuickWatchPresentation.Dispatcher.BeginInvoke(DispatcherPriority.Normal, (SendOrPostCallback)delegate
318            {
319                quadraticSieveQuickWatchPresentation.amountOfPeers.Content = "" + activePeers.Count + " other peer" + (activePeers.Count!=1 ? "s" : "") + " active!";
320            }, null);
321        }
322
323        /// <summary>
324        /// Loads head from DHT. If ours is greater (or there is no entry yet), we store ours.
325        /// </summary>
326        private void SynchronizeHead()
327        {
328            byte[] h = P2PManager.Retrieve(HeadIdentifier()).Data;
329            if (h != null)
330            {
331                int dhthead = int.Parse(System.Text.ASCIIEncoding.ASCII.GetString(h));
332                if (head > dhthead)
333                {
334                    bool success = P2PManager.Store(HeadIdentifier(), System.Text.ASCIIEncoding.ASCII.GetBytes(head.ToString())).IsSuccessful();
335                    if (!success)
336                        SynchronizeHead();
337                }
338                else if (head < dhthead)
339                {
340                    head = dhthead;
341                    if (head != 0)
342                        SetProgressYield(head - 1, 0, null);
343                }
344            }
345            else
346            {
347                bool success = P2PManager.Store(HeadIdentifier(), System.Text.ASCIIEncoding.ASCII.GetBytes(head.ToString())).IsSuccessful();
348                if (!success)
349                    SynchronizeHead();
350            }           
351        }
352
353        private void ShowTransfered(uint downloaded, uint uploaded)
354        {
355            string s1 = ((downloaded / 1024.0) / 1024).ToString();
356            string size1 = s1.Substring(0, (s1.Length < 3) ? s1.Length : 3);
357            string s2 = ((uploaded / 1024.0) / 1024).ToString();
358            string size2 = s2.Substring(0, (s2.Length < 3) ? s2.Length : 3);
359            quadraticSieveQuickWatchPresentation.Dispatcher.BeginInvoke(DispatcherPriority.Normal, (SendOrPostCallback)delegate
360            {
361                quadraticSieveQuickWatchPresentation.relationsInfo.Content = "Downloaded " + size1 + " MB! Uploaded " + size2 + " MB!";
362            }, null);
363        }
364
365        private void SetProgressYield(int index, int id, string name)
366        {
367            quadraticSieveQuickWatchPresentation.Dispatcher.BeginInvoke(DispatcherPriority.Normal, (SendOrPostCallback)delegate
368            {
369                quadraticSieveQuickWatchPresentation.ProgressYields.Set(index, id, name);
370            }, null);
371        }
372
373        private void ClearProgressYields()
374        {
375            quadraticSieveQuickWatchPresentation.Dispatcher.Invoke(DispatcherPriority.Normal, (SendOrPostCallback)delegate
376            {
377                quadraticSieveQuickWatchPresentation.ProgressYields.Clear();
378            }, null);
379        }
380
381        private string HeadIdentifier()
382        {
383            return channel + "#" + number + "-" + factor + "HEAD";
384        }
385
386        private string FactorListIdentifier()
387        {
388            return channel + "#" + number + "FACTORLIST";
389        }
390
391        private string YieldIdentifier(int index)
392        {
393            return channel + "#" + number + "-" + factor + "!" + index;
394        }
395
396        private string NameIdentifier(int ID)
397        {
398            return channel + "#" + number + "NAME" + ID.ToString();
399        }
400
401        private string PerformanceIdentifier(int ID)
402        {
403            return channel + "#" + number + "-" + factor + "PERFORMANCE" + ID.ToString();
404        }   
405
406        private void StartLoadStoreThread()
407        {
408            storequeue = Queue.Synchronized(new Queue());
409            loadqueue = Queue.Synchronized(new Queue());
410
411            stopLoadStoreThread = false;
412            loadStoreThread = new Thread(LoadStoreThreadProc);
413            loadStoreThread.Start();
414        }
415
416        public void StopLoadStoreThread()
417        {
418            stopLoadStoreThread = true;
419            loadStoreThread.Interrupt();
420            loadStoreThread.Join();
421            loadStoreThread = null;
422        }
423
424        /// <summary>
425        /// Concatenates the two byte arrays a1 and a2 an returns the result array.
426        /// </summary>
427        private byte[] concat(byte[] a1, byte[] a2)
428        {
429            byte[] res = new byte[a1.Length + a2.Length];
430            System.Buffer.BlockCopy(a1, 0, res, 0, a1.Length);
431            System.Buffer.BlockCopy(a2, 0, res, a1.Length, a2.Length);
432            return res;
433        }
434       
435        /// <summary>
436        /// Returns our MAC address
437        /// </summary>       
438        private string GetMac()
439        {
440            string Mac = string.Empty;
441            ManagementClass MC = new ManagementClass("Win32_NetworkAdapter");
442            ManagementObjectCollection MOCol = MC.GetInstances();
443            foreach (ManagementObject MO in MOCol)
444                if (MO != null)
445                {
446                    if (MO["MacAddress"] != null)
447                    {
448                        Mac = MO["MACAddress"].ToString();
449                        if (Mac != string.Empty)
450                            break;
451                    }
452                }
453            return Mac;
454        }
455
456        public Queue GetLoadedYieldsQueue()
457        {
458            return loadqueue;
459        }
460
461        /// <summary>
462        /// Compresses the yield and puts it in the DHT.
463        /// </summary>
464        public void Put(byte[] serializedYield)
465        {
466            //Add our ID:
467            byte[] idbytes = BitConverter.GetBytes(ourID);
468            Debug.Assert(idbytes.Length == 4);
469            serializedYield = concat(idbytes, serializedYield); ;
470
471            //Compress:
472            MemoryStream memStream = new MemoryStream();
473            DeflateStream defStream = new DeflateStream(memStream, CompressionMode.Compress);
474            defStream.Write(serializedYield, 0, serializedYield.Length);
475            defStream.Close();
476            byte[] compressedYield = memStream.ToArray();
477
478            //Debug stuff:
479            byte[] decompr = DecompressYield(compressedYield);
480            Debug.Assert(decompr.Length == serializedYield.Length);
481
482            //store in queue, so the LoadStoreThread can store it in the DHT later:
483            storequeue.Enqueue(compressedYield);           
484        }
485
486        /// <summary>
487        /// Sets the channel in which we want to sieve
488        /// </summary>
489        public void SetChannel(string channel)
490        {
491            this.channel = channel;
492        }
493
494        /// <summary>
495        /// Sets the number to sieve
496        /// </summary>
497        public void SetNumber(BigInteger number)
498        {
499            this.number = number;
500        }
501
502        /// <summary>
503        /// Sets the factor to sieve next and starts reading informations from the DHT.
504        /// </summary>
505        public void SetFactor(BigInteger factor)
506        {
507            this.factor = factor;
508            Debug.Assert(this.number % this.factor == 0);
509
510            ClearProgressYields();
511            nameCache = new Dictionary<int, string>();
512            peerPerformances = new Queue<PeerPerformanceInformations>();
513            activePeers = new HashSet<int>();
514
515            head = 0;
516            SynchronizeHead();
517           
518            //store our name:
519            P2PManager.Retrieve(NameIdentifier(ourID));     //just to outsmart the versioning system
520            P2PManager.Store(NameIdentifier(ourID),  System.Text.ASCIIEncoding.ASCII.GetBytes(ourName.ToCharArray()));
521
522            if (loadStoreThread != null)
523                throw new Exception("LoadStoreThread already started");
524            StartLoadStoreThread();
525        }
526
527        /// <summary>
528        /// Synchronizes the factorManager with the DHT.
529        /// Return false if this.factor is not a composite factor in the DHT (which means, that another peer already finished sieving this.factor).
530        /// </summary>
531        public bool SyncFactorManager(FactorManager factorManager)
532        {
533            FactorManager dhtFactorManager = null;
534            //load DHT Factor Manager:
535            byte[] dhtFactorManagerBytes = P2PManager.Retrieve(FactorListIdentifier()).Data;
536            if (dhtFactorManagerBytes != null)
537            {
538                MemoryStream memstream = new MemoryStream();
539                memstream.Write(dhtFactorManagerBytes, 0, dhtFactorManagerBytes.Length);
540                memstream.Position = 0;
541                BinaryFormatter bformatter = new BinaryFormatter();
542                bformatter.AssemblyFormat = System.Runtime.Serialization.Formatters.FormatterAssemblyStyle.Simple;
543                try
544                {
545                    dhtFactorManager = (FactorManager)bformatter.Deserialize(memstream);
546                }
547                catch (System.Runtime.Serialization.SerializationException)
548                {
549                    P2PWarning("DHT factor list is broken!");
550                    P2PManager.Remove(FactorListIdentifier());
551                    return SyncFactorManager(factorManager);
552                }               
553            }
554
555            //Synchronize DHT Factor Manager with our Factor List
556            if (dhtFactorManager == null || factorManager.Synchronize(dhtFactorManager))
557            {
558                //Our Factor Manager has more informations, so let's store it in the DHT:
559                MemoryStream memstream = new MemoryStream();
560                BinaryFormatter bformatter = new BinaryFormatter();
561                bformatter.AssemblyFormat = System.Runtime.Serialization.Formatters.FormatterAssemblyStyle.Simple;
562                bformatter.Serialize(memstream, factorManager);
563                bool success = P2PManager.Store(FactorListIdentifier(), memstream.ToArray()).IsSuccessful();
564                if (!success)
565                {
566                    Thread.Sleep(1000);
567                    return SyncFactorManager(factorManager);       //Just try again
568                }
569            }
570
571            return factorManager.ContainsComposite(this.factor);
572        }
573
574        /// <summary>
575        /// Sets the performance of this machine, so that this class can write it to the DHT later.
576        /// The performance is meassured in relations per ms.
577        /// </summary>
578        public void SetOurPerformance(double[] relationsPerMS)
579        {
580            double globalPerformance = 0;
581            foreach (double r in relationsPerMS)
582                globalPerformance += r;
583
584            ourPerformance = globalPerformance;
585        }
586
587        /// <summary>
588        /// Returns the performance of all peers (excluding ourselve) in relations per ms.
589        /// </summary>
590        public double GetP2PPerformance()
591        {
592            double perf = 0;
593
594            foreach (var p in peerPerformances)
595                perf += p.performance;
596
597            return perf;
598        }
599    }
600}
Note: See TracBrowser for help on using the repository browser.