source: trunk/CrypPlugins/QuadraticSieve/PeerToPeer.cs @ 1689

Last change on this file since 1689 was 1689, checked in by Sven Rech, 12 years ago

changed quadratic sieve color generation

File size: 24.3 KB
Line 
1/*                             
2   Copyright 2010 Sven Rech, Uni Duisburg-Essen
3
4   Licensed under the Apache License, Version 2.0 (the "License");
5   you may not use this file except in compliance with the License.
6   You may obtain a copy of the License at
7
8       http://www.apache.org/licenses/LICENSE-2.0
9
10   Unless required by applicable law or agreed to in writing, software
11   distributed under the License is distributed on an "AS IS" BASIS,
12   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   See the License for the specific language governing permissions and
14   limitations under the License.
15*/
16
17using System;
18using System.Collections.Generic;
19using System.Linq;
20using System.Text;
21using System.IO.Compression;
22using System.IO;
23using Cryptool.P2P;
24using System.Numerics;
25using System.Diagnostics;
26using System.Collections;
27using System.Threading;
28using System.Windows.Threading;
29using System.Runtime.Serialization.Formatters.Binary;
30using System.Management;
31using System.Security.Principal;
32using System.Security.Cryptography;
33
34namespace Cryptool.Plugins.QuadraticSieve
35{
36    class PeerToPeer
37    {
38        /// <summary>
39        /// This structure is used to hold all important peer performance and alive informations in a queue
40        /// </summary>
41        private struct PeerPerformanceInformations
42        {
43            public DateTime lastChecked;
44            public int peerID;
45            public double performance;
46            public int lastAlive;           
47
48            public PeerPerformanceInformations(DateTime lastChecked, int peerID, double performance, int lastAlive)
49            {
50                this.lastAlive = lastAlive;
51                this.lastChecked = lastChecked;
52                this.peerID = peerID;
53                this.performance = performance;
54            }
55        }
56
57        private string channel;
58        private BigInteger number;
59        private BigInteger factor;
60        private int head;
61        private Queue storequeue;   //yields to store in the DHT
62        private Queue loadqueue;    //yields that have been loaded from the DHT
63        private Thread loadStoreThread;
64        private bool stopLoadStoreThread;
65        private QuadraticSievePresentation quadraticSieveQuickWatchPresentation;
66        private AutoResetEvent yieldEvent;
67        private int ourID;           //Our ID
68        private Dictionary<int, string> nameCache;  //associates the ids with the names
69        private Queue<PeerPerformanceInformations> peerPerformances;      //A queue of performances from the different peers ordered by the date last checked.
70        private HashSet<int> activePeers;                                 //A set of active peers
71        private double ourPerformance = 0;
72        private int aliveCounter = 0;       //This is stored together with the performance in the DHT
73        private string ourName;
74        private uint downloaded = 0;
75        private uint uploaded = 0;
76
77        public delegate void P2PWarningHandler(String warning);
78        public event P2PWarningHandler P2PWarning;       
79
80        public PeerToPeer(QuadraticSievePresentation presentation, AutoResetEvent yieldEvent)
81        {
82            quadraticSieveQuickWatchPresentation = presentation;
83            this.yieldEvent = yieldEvent;
84            SetOurID();
85        }
86
87        private void SetOurID()
88        {
89            string username = WindowsIdentity.GetCurrent().Name;
90            string mac = GetMacIdentifier();
91
92            MD5 md5 = new MD5CryptoServiceProvider();
93            byte[] idBytes = md5.ComputeHash(System.Text.ASCIIEncoding.ASCII.GetBytes(username + mac));
94
95            ourID = BitConverter.ToInt32(idBytes, 3);
96            quadraticSieveQuickWatchPresentation.ProgressYields.setOurID(ourID);
97
98            ourName = System.Net.Dns.GetHostName();
99        }
100
101        /// <summary>
102        /// Reads yield at position "index" from DHT and returns the ownerID and the decompressed packet itself.
103        /// </summary>
104        private byte[] ReadYield(int index, out int ownerID)
105        {
106            ownerID = 0;
107            byte[] yield = P2PManager.Retrieve(YieldIdentifier(index)).Data;
108            if (yield == null)
109                return null;
110            downloaded += (uint)yield.Length;
111
112            byte[] decompressedYield = DecompressYield(yield);
113
114            byte[] idbytes = new byte[4];
115            Array.Copy(decompressedYield, idbytes, 4);
116            ownerID = BitConverter.ToInt32(idbytes, 0);
117            byte[] y = new byte[decompressedYield.Length - 4];
118            Array.Copy(decompressedYield, 4, y, 0, y.Length);
119           
120            return y;
121        }
122
123        private static byte[] DecompressYield(byte[] yield)
124        {
125            MemoryStream memStream = new MemoryStream();
126            DeflateStream defStream = new DeflateStream(memStream, CompressionMode.Decompress);
127            memStream.Write(yield, 0, yield.Length);
128            memStream.Position = 0;
129            MemoryStream memStream2 = new MemoryStream();
130            defStream.CopyTo(memStream2);
131            defStream.Close();
132            byte[] decompressedYield = memStream2.ToArray();
133            return decompressedYield;
134        }
135
136        private bool ReadAndEnqueueYield(int loadIndex, bool checkAlive)
137        {
138            int ownerID;
139            byte[] yield = ReadYield(loadIndex, out ownerID);
140            if (yield != null)
141            {
142                ShowTransfered(downloaded, uploaded);
143                loadqueue.Enqueue(yield);
144
145                string name = null;
146
147                if (ownerID != ourID)
148                {
149                    //Progress Yield:
150                    if (!nameCache.ContainsKey(ownerID))
151                    {
152                        byte[] n = P2PManager.Retrieve(NameIdentifier(ownerID)).Data;
153                        if (n != null)
154                            nameCache.Add(ownerID, System.Text.ASCIIEncoding.ASCII.GetString(n));
155                    }
156                    if (nameCache.ContainsKey(ownerID))
157                        name = nameCache[ownerID];
158                                       
159                    if (checkAlive && !activePeers.Contains(ownerID))
160                    {
161                        //check performance and alive informations:
162                        byte[] performancebytes = P2PManager.Retrieve(PerformanceIdentifier(ownerID)).Data;
163                        if (performancebytes != null)
164                        {
165                            double performance = BitConverter.ToDouble(performancebytes, 0);
166                            int peerAliveCounter = BitConverter.ToInt32(performancebytes, 8);
167                            peerPerformances.Enqueue(new PeerPerformanceInformations(DateTime.Now, ownerID, performance, peerAliveCounter));
168                        }
169                        activePeers.Add(ownerID);
170                        UpdateActivePeerInformation();
171                    }
172                }
173
174                //Set progress info:
175                SetProgressYield(loadIndex, ownerID, name);
176
177                yieldEvent.Set();
178                return true;
179            }
180            return false;
181        }
182
183        /// <summary>
184        /// Tries to read and enqueue yield on position "loadIndex".
185        /// If it fails, it stores the index in lostIndices queue.
186        /// </summary>
187        private void TryReadAndEnqueueYield(int index, bool checkAlive, Queue<KeyValuePair<int, DateTime>> lostIndices)
188        {
189            bool succ = ReadAndEnqueueYield(index, checkAlive);
190            if (!succ)               
191            {
192                var e = new KeyValuePair<int, DateTime>(index, DateTime.Now);
193                lostIndices.Enqueue(e);
194                SetProgressYield(index, -1, null);
195            }
196        }
197
198        private void LoadStoreThreadProc()
199        {
200            int loadIndex = 0;
201            downloaded = 0;
202            uploaded = 0;
203            HashSet<int> ourIndices = new HashSet<int>();   //Stores all the indices which belong to our packets
204            //Stores all the indices (together with there check date) which belong to lost packets (i.e. packets that can't be load anymore):
205            Queue<KeyValuePair<int, DateTime>> lostIndices = new Queue<KeyValuePair<int, DateTime>>();
206            double lastPerformance = 0;
207            DateTime performanceLastPut = new DateTime();
208
209            try
210            {
211                SynchronizeHead();
212                int startHead = head;
213
214                while (!stopLoadStoreThread)
215                {
216                    //Store our performance and our alive counter in the DHT, either if the performance changed or when the last write was more than 1 minute ago:
217                    if (ourPerformance != lastPerformance || performanceLastPut.CompareTo(DateTime.Now.Subtract(new TimeSpan(0, 1, 0))) < 0)
218                    {
219                        P2PManager.Retrieve(PerformanceIdentifier(ourID));      //just to outsmart the versioning system
220                        P2PManager.Store(PerformanceIdentifier(ourID), concat(BitConverter.GetBytes(ourPerformance), BitConverter.GetBytes(aliveCounter++)));
221                        lastPerformance = ourPerformance;
222                        performanceLastPut = DateTime.Now;
223                    }
224
225                    //updates all peer performances which have last been checked more than 1 minutes and 20 seconds ago and check if they are still alive:
226                    while (peerPerformances.Count != 0 && peerPerformances.Peek().lastChecked.CompareTo(DateTime.Now.Subtract(new TimeSpan(0, 1, 20))) < 0)
227                    {
228                        var e = peerPerformances.Dequeue();
229
230                        byte[] performancebytes = P2PManager.Retrieve(PerformanceIdentifier(e.peerID)).Data;
231                        if (performancebytes != null)
232                        {
233                            double performance = BitConverter.ToDouble(performancebytes, 0);
234                            int peerAliveCounter = BitConverter.ToInt32(performancebytes, 8);
235                            if (peerAliveCounter <= e.lastAlive)
236                            {
237                                activePeers.Remove(e.peerID);
238                                UpdateActivePeerInformation();
239                            }
240                            else
241                                peerPerformances.Enqueue(new PeerPerformanceInformations(DateTime.Now, e.peerID, performance, peerAliveCounter));
242                        }
243                        else
244                        {
245                            activePeers.Remove(e.peerID);
246                            UpdateActivePeerInformation();
247                        }                       
248                    }
249
250                    SynchronizeHead();
251
252                    bool busy = false;
253
254                    if (storequeue.Count != 0)  //store our packages
255                    {
256                        byte[] yield = (byte[])storequeue.Dequeue();
257                        bool success = P2PManager.Store(YieldIdentifier(head), yield).IsSuccessful();
258                        while (!success)
259                        {
260                            SynchronizeHead();
261                            success = P2PManager.Store(YieldIdentifier(head), yield).IsSuccessful();
262                        }
263                        SetProgressYield(head, ourID, null);
264                        ourIndices.Add(head);
265                       
266                        head++;
267
268                        //show informations about the uploaded yield:
269                        uploaded += (uint)yield.Length;
270                        ShowTransfered(downloaded, uploaded);
271                        busy = true;
272                    }
273
274                    //load the other yields:
275
276                    //skip all indices which are uploaded by us:
277                    while (ourIndices.Contains(loadIndex))
278                        loadIndex++;
279
280                    if (loadIndex < head)
281                    {
282                        bool checkAlive = loadIndex >= startHead;       //we don't check the peer alive informations of the packages that existed before we joined
283                        TryReadAndEnqueueYield(loadIndex, checkAlive, lostIndices);
284                        loadIndex++;
285                        busy = true;
286                    }
287                   
288                    //check all lost indices which are last checked longer than 2 minutes ago (but only if we have nothing else to do):
289                    if (!busy)
290                    {
291                        int count = 0;
292                        //TODO: Maybe we should throw away those indices, which have been checked more than several times.
293                        while (lostIndices.Count != 0 && lostIndices.Peek().Value.CompareTo(DateTime.Now.Subtract(new TimeSpan(0, 2, 0))) < 0)
294                        {
295                            var e = lostIndices.Dequeue();
296                            TryReadAndEnqueueYield(loadIndex, true, lostIndices);
297                            count++;
298                        }
299
300                        if (count == 0)
301                            Thread.Sleep(5000);    //Wait 5 seconds
302                    }
303
304                }
305            }
306            catch (ThreadInterruptedException)
307            {
308                return;
309            }
310        }
311
312        private void UpdateActivePeerInformation()
313        {
314            quadraticSieveQuickWatchPresentation.Dispatcher.BeginInvoke(DispatcherPriority.Normal, (SendOrPostCallback)delegate
315            {
316                quadraticSieveQuickWatchPresentation.amountOfPeers.Content = "" + activePeers.Count + " other peer" + (activePeers.Count!=1 ? "s" : "") + " active!";
317            }, null);
318        }
319
320        /// <summary>
321        /// Loads head from DHT. If ours is greater (or there is no entry yet), we store ours.
322        /// </summary>
323        private void SynchronizeHead()
324        {
325            byte[] h = P2PManager.Retrieve(HeadIdentifier()).Data;
326            if (h != null)
327            {
328                int dhthead = int.Parse(System.Text.ASCIIEncoding.ASCII.GetString(h));
329                if (head > dhthead)
330                {
331                    bool success = P2PManager.Store(HeadIdentifier(), System.Text.ASCIIEncoding.ASCII.GetBytes(head.ToString())).IsSuccessful();
332                    if (!success)
333                        SynchronizeHead();
334                }
335                else if (head < dhthead)
336                {
337                    head = dhthead;
338                    if (head != 0)
339                        SetProgressYield(head - 1, 0, null);
340                }
341            }
342            else
343            {
344                bool success = P2PManager.Store(HeadIdentifier(), System.Text.ASCIIEncoding.ASCII.GetBytes(head.ToString())).IsSuccessful();
345                if (!success)
346                    SynchronizeHead();
347            }           
348        }
349
350        private void ShowTransfered(uint downloaded, uint uploaded)
351        {
352            string s1 = ((downloaded / 1024.0) / 1024).ToString();
353            string size1 = s1.Substring(0, (s1.Length < 3) ? s1.Length : 3);
354            string s2 = ((uploaded / 1024.0) / 1024).ToString();
355            string size2 = s2.Substring(0, (s2.Length < 3) ? s2.Length : 3);
356            quadraticSieveQuickWatchPresentation.Dispatcher.BeginInvoke(DispatcherPriority.Normal, (SendOrPostCallback)delegate
357            {
358                quadraticSieveQuickWatchPresentation.relationsInfo.Content = "Downloaded " + size1 + " MB! Uploaded " + size2 + " MB!";
359            }, null);
360        }
361
362        private void SetProgressYield(int index, int id, string name)
363        {
364            quadraticSieveQuickWatchPresentation.Dispatcher.BeginInvoke(DispatcherPriority.Normal, (SendOrPostCallback)delegate
365            {
366                quadraticSieveQuickWatchPresentation.ProgressYields.Set(index, id, name);
367            }, null);
368        }
369
370        private void ClearProgressYields()
371        {
372            quadraticSieveQuickWatchPresentation.Dispatcher.Invoke(DispatcherPriority.Normal, (SendOrPostCallback)delegate
373            {
374                quadraticSieveQuickWatchPresentation.ProgressYields.Clear();
375            }, null);
376        }
377
378        private string HeadIdentifier()
379        {
380            return channel + "#" + number + "-" + factor + "HEAD";
381        }
382
383        private string FactorListIdentifier()
384        {
385            return channel + "#" + number + "FACTORLIST";
386        }
387
388        private string YieldIdentifier(int index)
389        {
390            return channel + "#" + number + "-" + factor + "!" + index;
391        }
392
393        private string NameIdentifier(int ID)
394        {
395            return channel + "#" + number + "NAME" + ID.ToString();
396        }
397
398        private string PerformanceIdentifier(int ID)
399        {
400            return channel + "#" + number + "-" + factor + "PERFORMANCE" + ID.ToString();
401        }   
402
403        private void StartLoadStoreThread()
404        {
405            storequeue = Queue.Synchronized(new Queue());
406            loadqueue = Queue.Synchronized(new Queue());
407
408            stopLoadStoreThread = false;
409            loadStoreThread = new Thread(LoadStoreThreadProc);
410            loadStoreThread.Start();
411        }
412
413        public void StopLoadStoreThread()
414        {
415            stopLoadStoreThread = true;
416            loadStoreThread.Interrupt();
417            loadStoreThread.Join();
418            loadStoreThread = null;
419        }
420
421        /// <summary>
422        /// Concatenates the two byte arrays a1 and a2 an returns the result array.
423        /// </summary>
424        private byte[] concat(byte[] a1, byte[] a2)
425        {
426            byte[] res = new byte[a1.Length + a2.Length];
427            System.Buffer.BlockCopy(a1, 0, res, 0, a1.Length);
428            System.Buffer.BlockCopy(a2, 0, res, a1.Length, a2.Length);
429            return res;
430        }
431       
432        /// <summary>
433        /// Returns an identifier that depends on the MAC addresses of this system
434        /// </summary>       
435        private string GetMacIdentifier()
436        {
437            string MacID = "";
438            ManagementClass MC = new ManagementClass("Win32_NetworkAdapter");
439            ManagementObjectCollection MOCol = MC.GetInstances();
440            foreach (ManagementObject MO in MOCol)
441                if (MO != null)
442                    if (MO["MacAddress"] != null)
443                        MacID += MO["MACAddress"].ToString();
444            return MacID;
445        }
446
447        public Queue GetLoadedYieldsQueue()
448        {
449            return loadqueue;
450        }
451
452        /// <summary>
453        /// Compresses the yield and puts it in the DHT.
454        /// </summary>
455        public void Put(byte[] serializedYield)
456        {
457            //Add our ID:
458            byte[] idbytes = BitConverter.GetBytes(ourID);
459            Debug.Assert(idbytes.Length == 4);
460            serializedYield = concat(idbytes, serializedYield); ;
461
462            //Compress:
463            MemoryStream memStream = new MemoryStream();
464            DeflateStream defStream = new DeflateStream(memStream, CompressionMode.Compress);
465            defStream.Write(serializedYield, 0, serializedYield.Length);
466            defStream.Close();
467            byte[] compressedYield = memStream.ToArray();
468
469            //Debug stuff:
470            byte[] decompr = DecompressYield(compressedYield);
471            Debug.Assert(decompr.Length == serializedYield.Length);
472
473            //store in queue, so the LoadStoreThread can store it in the DHT later:
474            storequeue.Enqueue(compressedYield);           
475        }
476
477        /// <summary>
478        /// Sets the channel in which we want to sieve
479        /// </summary>
480        public void SetChannel(string channel)
481        {
482            this.channel = channel;
483        }
484
485        /// <summary>
486        /// Sets the number to sieve
487        /// </summary>
488        public void SetNumber(BigInteger number)
489        {
490            this.number = number;
491        }
492
493        /// <summary>
494        /// Sets the factor to sieve next and starts reading informations from the DHT.
495        /// </summary>
496        public void SetFactor(BigInteger factor)
497        {
498            this.factor = factor;
499            Debug.Assert(this.number % this.factor == 0);
500
501            ClearProgressYields();
502            nameCache = new Dictionary<int, string>();
503            peerPerformances = new Queue<PeerPerformanceInformations>();
504            activePeers = new HashSet<int>();
505
506            head = 0;
507            SynchronizeHead();
508           
509            //store our name:
510            P2PManager.Retrieve(NameIdentifier(ourID));     //just to outsmart the versioning system
511            P2PManager.Store(NameIdentifier(ourID),  System.Text.ASCIIEncoding.ASCII.GetBytes(ourName.ToCharArray()));
512
513            if (loadStoreThread != null)
514                throw new Exception("LoadStoreThread already started");
515            StartLoadStoreThread();
516        }
517
518        /// <summary>
519        /// Synchronizes the factorManager with the DHT.
520        /// Return false if this.factor is not a composite factor in the DHT (which means, that another peer already finished sieving this.factor).
521        /// </summary>
522        public bool SyncFactorManager(FactorManager factorManager)
523        {
524            FactorManager dhtFactorManager = null;
525            //load DHT Factor Manager:
526            byte[] dhtFactorManagerBytes = P2PManager.Retrieve(FactorListIdentifier()).Data;
527            if (dhtFactorManagerBytes != null)
528            {
529                MemoryStream memstream = new MemoryStream();
530                memstream.Write(dhtFactorManagerBytes, 0, dhtFactorManagerBytes.Length);
531                memstream.Position = 0;
532                BinaryFormatter bformatter = new BinaryFormatter();
533                bformatter.AssemblyFormat = System.Runtime.Serialization.Formatters.FormatterAssemblyStyle.Simple;
534                try
535                {
536                    dhtFactorManager = (FactorManager)bformatter.Deserialize(memstream);
537                }
538                catch (System.Runtime.Serialization.SerializationException)
539                {
540                    P2PWarning("DHT factor list is broken!");
541                    P2PManager.Remove(FactorListIdentifier());
542                    return SyncFactorManager(factorManager);
543                }               
544            }
545
546            //Synchronize DHT Factor Manager with our Factor List
547            if (dhtFactorManager == null || factorManager.Synchronize(dhtFactorManager))
548            {
549                //Our Factor Manager has more informations, so let's store it in the DHT:
550                MemoryStream memstream = new MemoryStream();
551                BinaryFormatter bformatter = new BinaryFormatter();
552                bformatter.AssemblyFormat = System.Runtime.Serialization.Formatters.FormatterAssemblyStyle.Simple;
553                bformatter.Serialize(memstream, factorManager);
554                bool success = P2PManager.Store(FactorListIdentifier(), memstream.ToArray()).IsSuccessful();
555                if (!success)
556                {
557                    Thread.Sleep(1000);
558                    return SyncFactorManager(factorManager);       //Just try again
559                }
560            }
561
562            return factorManager.ContainsComposite(this.factor);
563        }
564
565        /// <summary>
566        /// Sets the performance of this machine, so that this class can write it to the DHT later.
567        /// The performance is meassured in relations per ms.
568        /// </summary>
569        public void SetOurPerformance(double[] relationsPerMS)
570        {
571            double globalPerformance = 0;
572            foreach (double r in relationsPerMS)
573                globalPerformance += r;
574
575            ourPerformance = globalPerformance;
576        }
577
578        /// <summary>
579        /// Returns the performance of all peers (excluding ourselve) in relations per ms.
580        /// </summary>
581        public double GetP2PPerformance()
582        {
583            double perf = 0;
584
585            foreach (var p in peerPerformances)
586                perf += p.performance;
587
588            return perf;
589        }
590    }
591}
Note: See TracBrowser for help on using the repository browser.