source: trunk/CrypPlugins/QuadraticSieve/PeerToPeer.cs @ 1632

Last change on this file since 1632 was 1632, checked in by Sven Rech, 12 years ago

better id generation for quadratic sieve

File size: 24.1 KB
Line 
1/*                             
2   Copyright 2010 Sven Rech, Uni Duisburg-Essen
3
4   Licensed under the Apache License, Version 2.0 (the "License");
5   you may not use this file except in compliance with the License.
6   You may obtain a copy of the License at
7
8       http://www.apache.org/licenses/LICENSE-2.0
9
10   Unless required by applicable law or agreed to in writing, software
11   distributed under the License is distributed on an "AS IS" BASIS,
12   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   See the License for the specific language governing permissions and
14   limitations under the License.
15*/
16
17using System;
18using System.Collections.Generic;
19using System.Linq;
20using System.Text;
21using System.IO.Compression;
22using System.IO;
23using Cryptool.P2P;
24using System.Numerics;
25using System.Diagnostics;
26using System.Collections;
27using System.Threading;
28using System.Windows.Threading;
29using System.Runtime.Serialization.Formatters.Binary;
30using System.Management;
31using System.Security.Principal;
32using System.Security.Cryptography;
33
34namespace Cryptool.Plugins.QuadraticSieve
35{
36    class PeerToPeer
37    {
38        private struct PeerPerformanceInformations
39        {
40            public DateTime lastChecked;
41            public int peerID;
42            public double performance;
43            public int lastAlive;           
44
45            public PeerPerformanceInformations(DateTime lastChecked, int peerID, double performance, int lastAlive)
46            {
47                this.lastAlive = lastAlive;
48                this.lastChecked = lastChecked;
49                this.peerID = peerID;
50                this.performance = performance;
51            }
52        }
53
54        private string channel;
55        private BigInteger number;
56        private BigInteger factor;
57        private int head;
58        private Queue storequeue;   //yields to store in the DHT
59        private Queue loadqueue;    //yields that have been loaded from the DHT
60        private Thread loadStoreThread;
61        private bool stopLoadStoreThread;
62        private QuadraticSievePresentation quadraticSieveQuickWatchPresentation;
63        private AutoResetEvent yieldEvent;
64        private int ourID;           //Our ID
65        private Dictionary<int, string> nameCache;  //associates the ids with the names
66        private Queue<PeerPerformanceInformations> peerPerformances;      //A queue of performances from the different peers ordered by the date last checked.
67        private HashSet<int> activePeers;                                 //A set of active peers
68        private double ourPerformance = 0;
69        private int aliveCounter = 0;       //This is stored together with the performance in the DHT
70        private string ourName;
71
72        public delegate void P2PWarningHandler(String warning);
73        public event P2PWarningHandler P2PWarning;       
74
75        public PeerToPeer(QuadraticSievePresentation presentation, AutoResetEvent yieldEvent)
76        {
77            quadraticSieveQuickWatchPresentation = presentation;
78            this.yieldEvent = yieldEvent;
79            SetOurID();
80        }
81
82        private void SetOurID()
83        {
84            string username = WindowsIdentity.GetCurrent().Name;
85            string mac = GetMac();
86
87            MD5 md5 = new MD5CryptoServiceProvider();
88            byte[] idBytes = md5.ComputeHash(System.Text.ASCIIEncoding.ASCII.GetBytes(username + mac));           
89           
90            //Is it really a good idea to calculate the ID like this?
91            for (int c = 0; c < idBytes.Length; c++)           
92                ourID = ourID + (idBytes[c] << c);
93
94            quadraticSieveQuickWatchPresentation.ProgressYields.setOurID(ourID);
95
96            ourName = System.Net.Dns.GetHostName();
97        }
98
99        /// <summary>
100        /// Reads yield at position "index" from DHT and returns the ownerID and the decompressed packet itself.
101        /// </summary>
102        private byte[] ReadYield(int index, out int ownerID)
103        {
104            ownerID = 0;
105            byte[] yield = P2PManager.Retrieve(YieldIdentifier(index));
106            if (yield == null)
107                return null;
108
109            byte[] decompressedYield = DecompressYield(yield);
110
111            byte[] idbytes = new byte[4];
112            Array.Copy(decompressedYield, idbytes, 4);
113            ownerID = BitConverter.ToInt32(idbytes, 0);
114            byte[] y = new byte[decompressedYield.Length - 4];
115            Array.Copy(decompressedYield, 4, y, 0, y.Length);
116
117            return y;
118        }
119
120        private static byte[] DecompressYield(byte[] yield)
121        {
122            MemoryStream memStream = new MemoryStream();
123            DeflateStream defStream = new DeflateStream(memStream, CompressionMode.Decompress);
124            memStream.Write(yield, 0, yield.Length);
125            memStream.Position = 0;
126            MemoryStream memStream2 = new MemoryStream();
127            defStream.CopyTo(memStream2);
128            defStream.Close();
129            byte[] decompressedYield = memStream2.ToArray();
130            return decompressedYield;
131        }
132
133        private uint ReadAndEnqueueYield(int loadIndex, uint downloaded, uint uploaded)
134        {
135            int ownerID;
136            byte[] yield = ReadYield(loadIndex, out ownerID);
137            if (yield != null)
138            {
139                downloaded += (uint)yield.Length;
140                ShowTransfered(downloaded, uploaded);
141                loadqueue.Enqueue(yield);
142
143                //Progress Yield:
144                if (!nameCache.ContainsKey(ownerID))
145                {
146                    byte[] n = P2PManager.Retrieve(NameIdentifier(ownerID));
147                    if (n != null)
148                        nameCache.Add(ownerID, System.Text.ASCIIEncoding.ASCII.GetString(n));
149                }
150                string name = null;
151                if (nameCache.ContainsKey(ownerID))
152                    name = nameCache[ownerID];
153                SetProgressYield(loadIndex, ownerID, name);
154
155                //Performance and alive informations:
156                if (!activePeers.Contains(ownerID))
157                {
158                    UpdatePeerPerformanceAndAliveInformation(ownerID);
159                    activePeers.Add(ownerID);
160                    UpdateActivePeerInformation();
161                }
162
163                yieldEvent.Set();
164                return downloaded;
165            }
166            return 0;
167        }
168
169        /// <summary>
170        /// Tries to read and enqueue yield on position "loadIndex".
171        /// If it fails, it stores the index in lostIndices queue.
172        /// </summary>
173        private uint TryReadAndEnqueueYield(int index, uint downloaded, uint uploaded, Queue<KeyValuePair<int, DateTime>> lostIndices)
174        {
175            uint res = ReadAndEnqueueYield(index, downloaded, uploaded);
176            if (res != 0)
177                downloaded = res;
178            else
179            {
180                var e = new KeyValuePair<int, DateTime>(index, DateTime.Now);
181                lostIndices.Enqueue(e);
182                SetProgressYield(index, -1, null);
183            }
184
185            return downloaded;
186        }
187
188        private void LoadStoreThreadProc()
189        {
190            int loadIndex = 0;
191            uint downloaded = 0;
192            uint uploaded = 0;
193            HashSet<int> ourIndices = new HashSet<int>();   //Stores all the indices which belong to our packets
194            //Stores all the indices (together with there check date) which belong to lost packets (i.e. packets that can't be load anymore):
195            Queue<KeyValuePair<int, DateTime>> lostIndices = new Queue<KeyValuePair<int, DateTime>>();
196            double lastPerformance = 0;
197            DateTime performanceLastPut = new DateTime();
198
199            try
200            {
201                while (!stopLoadStoreThread)
202                {
203                    //Store our performance and our alive counter in the DHT, either if the performance changed or when the last write was more than 1 minute ago:
204                    if (ourPerformance != lastPerformance || performanceLastPut.CompareTo(DateTime.Now.Subtract(new TimeSpan(0, 1, 0))) < 0)
205                    {
206                        P2PManager.Retrieve(PerformanceIdentifier(ourID));      //just to outsmart the versioning system
207                        P2PManager.Store(PerformanceIdentifier(ourID), concat(BitConverter.GetBytes(ourPerformance), BitConverter.GetBytes(aliveCounter++)));
208                        lastPerformance = ourPerformance;
209                        performanceLastPut = DateTime.Now;
210                    }
211
212                    //updates all peer performances which have last been checked more than 2 minutes ago and check if they are still alive:
213                    while (peerPerformances.Count != 0 && peerPerformances.Peek().lastChecked.CompareTo(DateTime.Now.Subtract(new TimeSpan(0, 2, 0))) < 0)
214                    {
215                        var e = peerPerformances.Dequeue();
216
217                        byte[] performancebytes = P2PManager.Retrieve(PerformanceIdentifier(e.peerID));
218                        if (performancebytes != null)
219                        {
220                            double performance = BitConverter.ToDouble(performancebytes, 0);
221                            int peerAliveCounter = BitConverter.ToInt32(performancebytes, 8);
222                            if (peerAliveCounter <= e.lastAlive)
223                            {
224                                activePeers.Remove(e.peerID);
225                                UpdateActivePeerInformation();
226                            }
227                            else
228                                peerPerformances.Enqueue(new PeerPerformanceInformations(DateTime.Now, e.peerID, performance, peerAliveCounter));
229                        }
230                        else
231                        {
232                            activePeers.Remove(e.peerID);
233                        }                       
234                    }
235
236                    SynchronizeHead();
237
238                    bool busy = false;
239
240                    if (storequeue.Count != 0)  //store our packages
241                    {
242                        byte[] yield = (byte[])storequeue.Dequeue();
243                        bool success = P2PManager.Store(YieldIdentifier(head), yield);
244                        while (!success)
245                        {
246                            SynchronizeHead();
247                            success = P2PManager.Store(YieldIdentifier(head), yield);
248                        }
249                        SetProgressYield(head, ourID, null);
250                        ourIndices.Add(head);
251                       
252                        head++;
253
254                        //show informations about the uploaded yield:
255                        uploaded += (uint)yield.Length;
256                        ShowTransfered(downloaded, uploaded);
257                        busy = true;
258                    }
259
260                    //load the other yields:
261
262                    //skip all indices which are uploaded by us:
263                    while (ourIndices.Contains(loadIndex))
264                        loadIndex++;
265
266                    if (loadIndex < head)
267                    {
268                        downloaded = TryReadAndEnqueueYield(loadIndex, downloaded, uploaded, lostIndices);
269                        loadIndex++;
270                        busy = true;
271                    }
272                   
273                    //check all lost indices which are last checked longer than 2 minutes ago (but only if we have nothing else to do):
274                    if (!busy)
275                    {
276                        int count = 0;                       
277                        //TODO: Maybe we should throw away those indices, which have been checked more than several times.
278                        while (lostIndices.Count != 0 && lostIndices.Peek().Value.CompareTo(DateTime.Now.Subtract(new TimeSpan(0, 2, 0))) < 0)
279                        {
280                            var e = lostIndices.Dequeue();
281                            downloaded = TryReadAndEnqueueYield(loadIndex, downloaded, uploaded, lostIndices);
282                            count++;
283                        }
284
285                        if (count == 0)
286                            Thread.Sleep(5000);    //Wait 5 seconds
287                    }
288
289                }
290            }
291            catch (ThreadInterruptedException)
292            {
293                return;
294            }
295        }
296
297        private void UpdateActivePeerInformation()
298        {
299            quadraticSieveQuickWatchPresentation.Dispatcher.BeginInvoke(DispatcherPriority.Normal, (SendOrPostCallback)delegate
300            {
301                quadraticSieveQuickWatchPresentation.amountOfPeers.Content = "" + activePeers.Count + " peers active!";
302            }, null);
303        }
304
305        private void UpdatePeerPerformanceAndAliveInformation(int peerID)
306        {
307            byte[] performancebytes = P2PManager.Retrieve(PerformanceIdentifier(peerID));
308            if (performancebytes != null)
309            {
310                double performance = BitConverter.ToDouble(performancebytes, 0);
311                int peerAliveCounter = BitConverter.ToInt32(performancebytes, 8);
312                peerPerformances.Enqueue(new PeerPerformanceInformations(DateTime.Now, peerID, performance, peerAliveCounter));
313            }
314        }
315
316        /// <summary>
317        /// Loads head from DHT. If ours is greater (or there is no entry yet), we store ours.
318        /// </summary>
319        private void SynchronizeHead()
320        {
321            byte[] h = P2PManager.Retrieve(HeadIdentifier());
322            if (h != null)
323            {
324                int dhthead = int.Parse(System.Text.ASCIIEncoding.ASCII.GetString(h));
325                if (head > dhthead)
326                {
327                    bool success = P2PManager.Store(HeadIdentifier(), System.Text.ASCIIEncoding.ASCII.GetBytes(head.ToString()));
328                    if (!success)
329                        SynchronizeHead();
330                }
331                else if (head < dhthead)
332                {
333                    head = dhthead;
334                    SetProgressYield(head - 1, 0, null);
335                }
336            }
337            else
338            {
339                bool success = P2PManager.Store(HeadIdentifier(), System.Text.ASCIIEncoding.ASCII.GetBytes(head.ToString()));
340                if (!success)
341                    SynchronizeHead();
342            }
343        }
344
345        private void ShowTransfered(uint downloaded, uint uploaded)
346        {
347            string s1 = ((downloaded / 1024.0) / 1024).ToString();
348            string size1 = s1.Substring(0, (s1.Length < 3) ? s1.Length : 3);
349            string s2 = ((uploaded / 1024.0) / 1024).ToString();
350            string size2 = s2.Substring(0, (s2.Length < 3) ? s2.Length : 3);
351            quadraticSieveQuickWatchPresentation.Dispatcher.BeginInvoke(DispatcherPriority.Normal, (SendOrPostCallback)delegate
352            {
353                quadraticSieveQuickWatchPresentation.relationsInfo.Content = "Downloaded " + size1 + " MB! Uploaded " + size2 + " MB!";
354            }, null);
355        }
356
357        private void SetProgressYield(int index, int id, string name)
358        {
359            quadraticSieveQuickWatchPresentation.Dispatcher.BeginInvoke(DispatcherPriority.Normal, (SendOrPostCallback)delegate
360            {
361                quadraticSieveQuickWatchPresentation.ProgressYields.Set(index, id, name);
362            }, null);
363        }
364
365        private void ClearProgressYields()
366        {
367            quadraticSieveQuickWatchPresentation.Dispatcher.Invoke(DispatcherPriority.Normal, (SendOrPostCallback)delegate
368            {
369                quadraticSieveQuickWatchPresentation.ProgressYields.Clear();
370            }, null);
371        }
372
373        private string HeadIdentifier()
374        {
375            return channel + "#" + number + "-" + factor + "HEAD";
376        }
377
378        private string FactorListIdentifier()
379        {
380            return channel + "#" + number + "FACTORLIST";
381        }
382
383        private string YieldIdentifier(int index)
384        {
385            return channel + "#" + number + "-" + factor + "!" + index;
386        }
387
388        private string NameIdentifier(int ID)
389        {
390            return channel + "#" + number + "NAME" + ID.ToString();
391        }
392
393        private string PerformanceIdentifier(int ID)
394        {
395            return channel + "#" + number + "-" + factor + "PERFORMANCE" + ID.ToString();
396        }   
397
398        private void StartLoadStoreThread()
399        {
400            storequeue = Queue.Synchronized(new Queue());
401            loadqueue = Queue.Synchronized(new Queue());
402
403            stopLoadStoreThread = false;
404            loadStoreThread = new Thread(LoadStoreThreadProc);
405            loadStoreThread.Start();
406        }
407
408        public void StopLoadStoreThread()
409        {
410            stopLoadStoreThread = true;
411            loadStoreThread.Interrupt();
412            loadStoreThread.Join();
413            loadStoreThread = null;
414        }
415
416        /// <summary>
417        /// Concatenates the two byte arrays a1 and a2 an returns the result array.
418        /// </summary>
419        private byte[] concat(byte[] a1, byte[] a2)
420        {
421            byte[] res = new byte[a1.Length + a2.Length];
422            System.Buffer.BlockCopy(a1, 0, res, 0, a1.Length);
423            System.Buffer.BlockCopy(a2, 0, res, a1.Length, a2.Length);
424            return res;
425        }
426       
427        /// <summary>
428        /// Returns our MAC address
429        /// </summary>       
430        private string GetMac()
431        {
432            string Mac = string.Empty;
433            ManagementClass MC = new ManagementClass("Win32_NetworkAdapter");
434            ManagementObjectCollection MOCol = MC.GetInstances();
435            foreach (ManagementObject MO in MOCol)
436                if (MO != null)
437                {
438                    if (MO["MacAddress"] != null)
439                    {
440                        Mac = MO["MACAddress"].ToString();
441                        if (Mac != string.Empty)
442                            break;
443                    }
444                }
445            return Mac;
446        }
447
448        public Queue GetLoadedYieldsQueue()
449        {
450            return loadqueue;
451        }
452
453        /// <summary>
454        /// Compresses the yield and puts it in the DHT.
455        /// </summary>
456        public void Put(byte[] serializedYield)
457        {
458            //Add our ID:
459            byte[] idbytes = BitConverter.GetBytes(ourID);
460            Debug.Assert(idbytes.Length == 4);
461            serializedYield = concat(idbytes, serializedYield); ;
462
463            //Compress:
464            MemoryStream memStream = new MemoryStream();
465            DeflateStream defStream = new DeflateStream(memStream, CompressionMode.Compress);
466            defStream.Write(serializedYield, 0, serializedYield.Length);
467            defStream.Close();
468            byte[] compressedYield = memStream.ToArray();
469
470            //Debug stuff:
471            byte[] decompr = DecompressYield(compressedYield);
472            Debug.Assert(decompr.Length == serializedYield.Length);
473
474            //store in queue, so the LoadStoreThread can store it in the DHT later:
475            storequeue.Enqueue(compressedYield);           
476        }
477
478        /// <summary>
479        /// Sets the channel in which we want to sieve
480        /// </summary>
481        public void SetChannel(string channel)
482        {
483            this.channel = channel;
484        }
485
486        /// <summary>
487        /// Sets the number to sieve
488        /// </summary>
489        public void SetNumber(BigInteger number)
490        {
491            this.number = number;
492        }
493
494        /// <summary>
495        /// Sets the factor to sieve next and starts reading informations from the DHT.
496        /// </summary>
497        public void SetFactor(BigInteger factor)
498        {
499            this.factor = factor;
500            Debug.Assert(this.number % this.factor == 0);
501
502            ClearProgressYields();
503            nameCache = new Dictionary<int, string>();
504            peerPerformances = new Queue<PeerPerformanceInformations>();
505            activePeers = new HashSet<int>();
506
507            //load head:
508            byte[] h = P2PManager.Retrieve(HeadIdentifier());
509            if (h != null)
510            {               
511                head = int.Parse(System.Text.ASCIIEncoding.ASCII.GetString(h));
512                SetProgressYield(head-1, 0, null);
513            }
514            else
515                head = 0;
516           
517            //store our name:
518            P2PManager.Retrieve(NameIdentifier(ourID));     //just to outsmart the versioning system
519            P2PManager.Store(NameIdentifier(ourID),  System.Text.ASCIIEncoding.ASCII.GetBytes(ourName.ToCharArray()));
520
521            if (loadStoreThread != null)
522                throw new Exception("LoadStoreThread already started");
523            StartLoadStoreThread();
524        }
525
526        /// <summary>
527        /// Synchronizes the factorManager with the DHT.
528        /// Return false if this.factor is not a composite factor in the DHT (which means, that another peer already finished sieving this.factor).
529        /// </summary>
530        public bool SyncFactorManager(FactorManager factorManager)
531        {
532            FactorManager dhtFactorManager = null;
533            //load DHT Factor Manager:
534            byte[] dhtFactorManagerBytes = P2PManager.Retrieve(FactorListIdentifier());
535            if (dhtFactorManagerBytes != null)
536            {
537                MemoryStream memstream = new MemoryStream();
538                memstream.Write(dhtFactorManagerBytes, 0, dhtFactorManagerBytes.Length);
539                memstream.Position = 0;
540                BinaryFormatter bformatter = new BinaryFormatter();
541                bformatter.AssemblyFormat = System.Runtime.Serialization.Formatters.FormatterAssemblyStyle.Simple;
542                try
543                {
544                    dhtFactorManager = (FactorManager)bformatter.Deserialize(memstream);
545                }
546                catch (System.Runtime.Serialization.SerializationException)
547                {
548                    P2PWarning("DHT factor list is broken!");
549                    P2PManager.Remove(FactorListIdentifier());
550                    return SyncFactorManager(factorManager);
551                }               
552            }
553
554            //Synchronize DHT Factor Manager with our Factor List
555            if (dhtFactorManager == null || factorManager.Synchronize(dhtFactorManager))
556            {
557                //Our Factor Manager has more informations, so let's store it in the DHT:
558                MemoryStream memstream = new MemoryStream();
559                BinaryFormatter bformatter = new BinaryFormatter();
560                bformatter.AssemblyFormat = System.Runtime.Serialization.Formatters.FormatterAssemblyStyle.Simple;
561                bformatter.Serialize(memstream, factorManager);
562                bool success = P2PManager.Store(FactorListIdentifier(), memstream.ToArray());
563                if (!success)
564                {
565                    Thread.Sleep(1000);
566                    return SyncFactorManager(factorManager);       //Just try again
567                }
568            }
569
570            return factorManager.ContainsComposite(this.factor);
571        }
572
573        /// <summary>
574        /// Sets the performance of this machine, so that this class can write it to the DHT later.
575        /// The performance is meassured in relations per ms.
576        /// </summary>
577        public void SetOurPerformance(double[] relationsPerMS)
578        {
579            double globalPerformance = 0;
580            foreach (double r in relationsPerMS)
581                globalPerformance += r;
582
583            ourPerformance = globalPerformance;
584        }
585
586        /// <summary>
587        /// Returns the performance of all peers (excluding ourselve) in relations per ms.
588        /// </summary>
589        public double GetP2PPerformance()
590        {
591            double perf = 0;
592
593            foreach (var p in peerPerformances)
594                perf += p.performance;
595
596            return perf;
597        }
598    }
599}
Note: See TracBrowser for help on using the repository browser.