source: trunk/CrypPlugins/QuadraticSieve/PeerToPeer.cs @ 1676

Last change on this file since 1676 was 1676, checked in by Sven Rech, 12 years ago

quadratic sieve:

o altered the alive checking mechanism once again
o fixed msieve bug when pressing stop button

File size: 24.4 KB
Line 
1/*                             
2   Copyright 2010 Sven Rech, Uni Duisburg-Essen
3
4   Licensed under the Apache License, Version 2.0 (the "License");
5   you may not use this file except in compliance with the License.
6   You may obtain a copy of the License at
7
8       http://www.apache.org/licenses/LICENSE-2.0
9
10   Unless required by applicable law or agreed to in writing, software
11   distributed under the License is distributed on an "AS IS" BASIS,
12   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   See the License for the specific language governing permissions and
14   limitations under the License.
15*/
16
17using System;
18using System.Collections.Generic;
19using System.Linq;
20using System.Text;
21using System.IO.Compression;
22using System.IO;
23using Cryptool.P2P;
24using System.Numerics;
25using System.Diagnostics;
26using System.Collections;
27using System.Threading;
28using System.Windows.Threading;
29using System.Runtime.Serialization.Formatters.Binary;
30using System.Management;
31using System.Security.Principal;
32using System.Security.Cryptography;
33
34namespace Cryptool.Plugins.QuadraticSieve
35{
36    class PeerToPeer
37    {
38        /// <summary>
39        /// This structure is used to hold all important peer performance and alive informations in a queue
40        /// </summary>
41        private struct PeerPerformanceInformations
42        {
43            public DateTime lastChecked;
44            public int peerID;
45            public double performance;
46            public int lastAlive;           
47
48            public PeerPerformanceInformations(DateTime lastChecked, int peerID, double performance, int lastAlive)
49            {
50                this.lastAlive = lastAlive;
51                this.lastChecked = lastChecked;
52                this.peerID = peerID;
53                this.performance = performance;
54            }
55        }
56
57        private string channel;
58        private BigInteger number;
59        private BigInteger factor;
60        private int head;
61        private Queue storequeue;   //yields to store in the DHT
62        private Queue loadqueue;    //yields that have been loaded from the DHT
63        private Thread loadStoreThread;
64        private bool stopLoadStoreThread;
65        private QuadraticSievePresentation quadraticSieveQuickWatchPresentation;
66        private AutoResetEvent yieldEvent;
67        private int ourID;           //Our ID
68        private Dictionary<int, string> nameCache;  //associates the ids with the names
69        private Queue<PeerPerformanceInformations> peerPerformances;      //A queue of performances from the different peers ordered by the date last checked.
70        private HashSet<int> activePeers;                                 //A set of active peers
71        private double ourPerformance = 0;
72        private int aliveCounter = 0;       //This is stored together with the performance in the DHT
73        private string ourName;
74        private uint downloaded = 0;
75        private uint uploaded = 0;
76
77        public delegate void P2PWarningHandler(String warning);
78        public event P2PWarningHandler P2PWarning;       
79
80        public PeerToPeer(QuadraticSievePresentation presentation, AutoResetEvent yieldEvent)
81        {
82            quadraticSieveQuickWatchPresentation = presentation;
83            this.yieldEvent = yieldEvent;
84            SetOurID();
85        }
86
87        private void SetOurID()
88        {
89            string username = WindowsIdentity.GetCurrent().Name;
90            string mac = GetMac();
91
92            MD5 md5 = new MD5CryptoServiceProvider();
93            byte[] idBytes = md5.ComputeHash(System.Text.ASCIIEncoding.ASCII.GetBytes(username + mac));           
94           
95            //Is it really a good idea to calculate the ID like this?
96            for (int c = 0; c < idBytes.Length; c++)           
97                ourID = ourID + (idBytes[c] << c);
98
99            quadraticSieveQuickWatchPresentation.ProgressYields.setOurID(ourID);
100
101            ourName = System.Net.Dns.GetHostName();
102        }
103
104        /// <summary>
105        /// Reads yield at position "index" from DHT and returns the ownerID and the decompressed packet itself.
106        /// </summary>
107        private byte[] ReadYield(int index, out int ownerID)
108        {
109            ownerID = 0;
110            byte[] yield = P2PManager.Retrieve(YieldIdentifier(index)).Data;
111            if (yield == null)
112                return null;
113            downloaded += (uint)yield.Length;
114
115            byte[] decompressedYield = DecompressYield(yield);
116
117            byte[] idbytes = new byte[4];
118            Array.Copy(decompressedYield, idbytes, 4);
119            ownerID = BitConverter.ToInt32(idbytes, 0);
120            byte[] y = new byte[decompressedYield.Length - 4];
121            Array.Copy(decompressedYield, 4, y, 0, y.Length);
122           
123            return y;
124        }
125
126        private static byte[] DecompressYield(byte[] yield)
127        {
128            MemoryStream memStream = new MemoryStream();
129            DeflateStream defStream = new DeflateStream(memStream, CompressionMode.Decompress);
130            memStream.Write(yield, 0, yield.Length);
131            memStream.Position = 0;
132            MemoryStream memStream2 = new MemoryStream();
133            defStream.CopyTo(memStream2);
134            defStream.Close();
135            byte[] decompressedYield = memStream2.ToArray();
136            return decompressedYield;
137        }
138
139        private bool ReadAndEnqueueYield(int loadIndex, bool checkAlive)
140        {
141            int ownerID;
142            byte[] yield = ReadYield(loadIndex, out ownerID);
143            if (yield != null)
144            {
145                ShowTransfered(downloaded, uploaded);
146                loadqueue.Enqueue(yield);
147
148                string name = null;
149
150                if (ownerID != ourID)
151                {
152                    //Progress Yield:
153                    if (!nameCache.ContainsKey(ownerID))
154                    {
155                        byte[] n = P2PManager.Retrieve(NameIdentifier(ownerID)).Data;
156                        if (n != null)
157                            nameCache.Add(ownerID, System.Text.ASCIIEncoding.ASCII.GetString(n));
158                    }
159                    if (nameCache.ContainsKey(ownerID))
160                        name = nameCache[ownerID];
161                                       
162                    if (checkAlive && !activePeers.Contains(ownerID))
163                    {
164                        //check performance and alive informations:
165                        byte[] performancebytes = P2PManager.Retrieve(PerformanceIdentifier(ownerID)).Data;
166                        if (performancebytes != null)
167                        {
168                            double performance = BitConverter.ToDouble(performancebytes, 0);
169                            int peerAliveCounter = BitConverter.ToInt32(performancebytes, 8);
170                            peerPerformances.Enqueue(new PeerPerformanceInformations(DateTime.Now, ownerID, performance, peerAliveCounter));
171                        }
172                        activePeers.Add(ownerID);
173                        UpdateActivePeerInformation();
174                    }
175                }
176
177                //Set progress info:
178                SetProgressYield(loadIndex, ownerID, name);
179
180                yieldEvent.Set();
181                return true;
182            }
183            return false;
184        }
185
186        /// <summary>
187        /// Tries to read and enqueue yield on position "loadIndex".
188        /// If it fails, it stores the index in lostIndices queue.
189        /// </summary>
190        private void TryReadAndEnqueueYield(int index, bool checkAlive, Queue<KeyValuePair<int, DateTime>> lostIndices)
191        {
192            bool succ = ReadAndEnqueueYield(index, checkAlive);
193            if (!succ)               
194            {
195                var e = new KeyValuePair<int, DateTime>(index, DateTime.Now);
196                lostIndices.Enqueue(e);
197                SetProgressYield(index, -1, null);
198            }
199        }
200
201        private void LoadStoreThreadProc()
202        {
203            int loadIndex = 0;
204            downloaded = 0;
205            uploaded = 0;
206            HashSet<int> ourIndices = new HashSet<int>();   //Stores all the indices which belong to our packets
207            //Stores all the indices (together with there check date) which belong to lost packets (i.e. packets that can't be load anymore):
208            Queue<KeyValuePair<int, DateTime>> lostIndices = new Queue<KeyValuePair<int, DateTime>>();
209            double lastPerformance = 0;
210            DateTime performanceLastPut = new DateTime();
211
212            try
213            {
214                SynchronizeHead();
215                int startHead = head;
216
217                while (!stopLoadStoreThread)
218                {
219                    //Store our performance and our alive counter in the DHT, either if the performance changed or when the last write was more than 1 minute ago:
220                    if (ourPerformance != lastPerformance || performanceLastPut.CompareTo(DateTime.Now.Subtract(new TimeSpan(0, 1, 0))) < 0)
221                    {
222                        P2PManager.Retrieve(PerformanceIdentifier(ourID));      //just to outsmart the versioning system
223                        P2PManager.Store(PerformanceIdentifier(ourID), concat(BitConverter.GetBytes(ourPerformance), BitConverter.GetBytes(aliveCounter++)));
224                        lastPerformance = ourPerformance;
225                        performanceLastPut = DateTime.Now;
226                    }
227
228                    //updates all peer performances which have last been checked more than 2 minutes ago and check if they are still alive:
229                    while (peerPerformances.Count != 0 && peerPerformances.Peek().lastChecked.CompareTo(DateTime.Now.Subtract(new TimeSpan(0, 2, 0))) < 0)
230                    {
231                        var e = peerPerformances.Dequeue();
232
233                        byte[] performancebytes = P2PManager.Retrieve(PerformanceIdentifier(e.peerID)).Data;
234                        if (performancebytes != null)
235                        {
236                            double performance = BitConverter.ToDouble(performancebytes, 0);
237                            int peerAliveCounter = BitConverter.ToInt32(performancebytes, 8);
238                            if (peerAliveCounter <= e.lastAlive)
239                            {
240                                activePeers.Remove(e.peerID);
241                                UpdateActivePeerInformation();
242                            }
243                            else
244                                peerPerformances.Enqueue(new PeerPerformanceInformations(DateTime.Now, e.peerID, performance, peerAliveCounter));
245                        }
246                        else
247                        {
248                            activePeers.Remove(e.peerID);
249                        }                       
250                    }
251
252                    SynchronizeHead();
253
254                    bool busy = false;
255
256                    if (storequeue.Count != 0)  //store our packages
257                    {
258                        byte[] yield = (byte[])storequeue.Dequeue();
259                        bool success = P2PManager.Store(YieldIdentifier(head), yield).IsSuccessful();
260                        while (!success)
261                        {
262                            SynchronizeHead();
263                            success = P2PManager.Store(YieldIdentifier(head), yield).IsSuccessful();
264                        }
265                        SetProgressYield(head, ourID, null);
266                        ourIndices.Add(head);
267                       
268                        head++;
269
270                        //show informations about the uploaded yield:
271                        uploaded += (uint)yield.Length;
272                        ShowTransfered(downloaded, uploaded);
273                        busy = true;
274                    }
275
276                    //load the other yields:
277
278                    //skip all indices which are uploaded by us:
279                    while (ourIndices.Contains(loadIndex))
280                        loadIndex++;
281
282                    if (loadIndex < head)
283                    {
284                        bool checkAlive = loadIndex >= startHead;       //we don't check the peer alive informations of the packages that existed before we joined
285                        TryReadAndEnqueueYield(loadIndex, checkAlive, lostIndices);
286                        loadIndex++;
287                        busy = true;
288                    }
289                   
290                    //check all lost indices which are last checked longer than 2 minutes ago (but only if we have nothing else to do):
291                    if (!busy)
292                    {
293                        int count = 0;
294                        //TODO: Maybe we should throw away those indices, which have been checked more than several times.
295                        while (lostIndices.Count != 0 && lostIndices.Peek().Value.CompareTo(DateTime.Now.Subtract(new TimeSpan(0, 2, 0))) < 0)
296                        {
297                            var e = lostIndices.Dequeue();
298                            TryReadAndEnqueueYield(loadIndex, true, lostIndices);
299                            count++;
300                        }
301
302                        if (count == 0)
303                            Thread.Sleep(5000);    //Wait 5 seconds
304                    }
305
306                }
307            }
308            catch (ThreadInterruptedException)
309            {
310                return;
311            }
312        }
313
314        private void UpdateActivePeerInformation()
315        {
316            quadraticSieveQuickWatchPresentation.Dispatcher.BeginInvoke(DispatcherPriority.Normal, (SendOrPostCallback)delegate
317            {
318                quadraticSieveQuickWatchPresentation.amountOfPeers.Content = "" + activePeers.Count + " other peer" + (activePeers.Count!=1 ? "s" : "") + " active!";
319            }, null);
320        }
321
322        /// <summary>
323        /// Loads head from DHT. If ours is greater (or there is no entry yet), we store ours.
324        /// </summary>
325        private void SynchronizeHead()
326        {
327            byte[] h = P2PManager.Retrieve(HeadIdentifier()).Data;
328            if (h != null)
329            {
330                int dhthead = int.Parse(System.Text.ASCIIEncoding.ASCII.GetString(h));
331                if (head > dhthead)
332                {
333                    bool success = P2PManager.Store(HeadIdentifier(), System.Text.ASCIIEncoding.ASCII.GetBytes(head.ToString())).IsSuccessful();
334                    if (!success)
335                        SynchronizeHead();
336                }
337                else if (head < dhthead)
338                {
339                    head = dhthead;
340                    if (head != 0)
341                        SetProgressYield(head - 1, 0, null);
342                }
343            }
344            else
345            {
346                bool success = P2PManager.Store(HeadIdentifier(), System.Text.ASCIIEncoding.ASCII.GetBytes(head.ToString())).IsSuccessful();
347                if (!success)
348                    SynchronizeHead();
349            }           
350        }
351
352        private void ShowTransfered(uint downloaded, uint uploaded)
353        {
354            string s1 = ((downloaded / 1024.0) / 1024).ToString();
355            string size1 = s1.Substring(0, (s1.Length < 3) ? s1.Length : 3);
356            string s2 = ((uploaded / 1024.0) / 1024).ToString();
357            string size2 = s2.Substring(0, (s2.Length < 3) ? s2.Length : 3);
358            quadraticSieveQuickWatchPresentation.Dispatcher.BeginInvoke(DispatcherPriority.Normal, (SendOrPostCallback)delegate
359            {
360                quadraticSieveQuickWatchPresentation.relationsInfo.Content = "Downloaded " + size1 + " MB! Uploaded " + size2 + " MB!";
361            }, null);
362        }
363
364        private void SetProgressYield(int index, int id, string name)
365        {
366            quadraticSieveQuickWatchPresentation.Dispatcher.BeginInvoke(DispatcherPriority.Normal, (SendOrPostCallback)delegate
367            {
368                quadraticSieveQuickWatchPresentation.ProgressYields.Set(index, id, name);
369            }, null);
370        }
371
372        private void ClearProgressYields()
373        {
374            quadraticSieveQuickWatchPresentation.Dispatcher.Invoke(DispatcherPriority.Normal, (SendOrPostCallback)delegate
375            {
376                quadraticSieveQuickWatchPresentation.ProgressYields.Clear();
377            }, null);
378        }
379
380        private string HeadIdentifier()
381        {
382            return channel + "#" + number + "-" + factor + "HEAD";
383        }
384
385        private string FactorListIdentifier()
386        {
387            return channel + "#" + number + "FACTORLIST";
388        }
389
390        private string YieldIdentifier(int index)
391        {
392            return channel + "#" + number + "-" + factor + "!" + index;
393        }
394
395        private string NameIdentifier(int ID)
396        {
397            return channel + "#" + number + "NAME" + ID.ToString();
398        }
399
400        private string PerformanceIdentifier(int ID)
401        {
402            return channel + "#" + number + "-" + factor + "PERFORMANCE" + ID.ToString();
403        }   
404
405        private void StartLoadStoreThread()
406        {
407            storequeue = Queue.Synchronized(new Queue());
408            loadqueue = Queue.Synchronized(new Queue());
409
410            stopLoadStoreThread = false;
411            loadStoreThread = new Thread(LoadStoreThreadProc);
412            loadStoreThread.Start();
413        }
414
415        public void StopLoadStoreThread()
416        {
417            stopLoadStoreThread = true;
418            loadStoreThread.Interrupt();
419            loadStoreThread.Join();
420            loadStoreThread = null;
421        }
422
423        /// <summary>
424        /// Concatenates the two byte arrays a1 and a2 an returns the result array.
425        /// </summary>
426        private byte[] concat(byte[] a1, byte[] a2)
427        {
428            byte[] res = new byte[a1.Length + a2.Length];
429            System.Buffer.BlockCopy(a1, 0, res, 0, a1.Length);
430            System.Buffer.BlockCopy(a2, 0, res, a1.Length, a2.Length);
431            return res;
432        }
433       
434        /// <summary>
435        /// Returns our MAC address
436        /// </summary>       
437        private string GetMac()
438        {
439            string Mac = string.Empty;
440            ManagementClass MC = new ManagementClass("Win32_NetworkAdapter");
441            ManagementObjectCollection MOCol = MC.GetInstances();
442            foreach (ManagementObject MO in MOCol)
443                if (MO != null)
444                {
445                    if (MO["MacAddress"] != null)
446                    {
447                        Mac = MO["MACAddress"].ToString();
448                        if (Mac != string.Empty)
449                            break;
450                    }
451                }
452            return Mac;
453        }
454
455        public Queue GetLoadedYieldsQueue()
456        {
457            return loadqueue;
458        }
459
460        /// <summary>
461        /// Compresses the yield and puts it in the DHT.
462        /// </summary>
463        public void Put(byte[] serializedYield)
464        {
465            //Add our ID:
466            byte[] idbytes = BitConverter.GetBytes(ourID);
467            Debug.Assert(idbytes.Length == 4);
468            serializedYield = concat(idbytes, serializedYield); ;
469
470            //Compress:
471            MemoryStream memStream = new MemoryStream();
472            DeflateStream defStream = new DeflateStream(memStream, CompressionMode.Compress);
473            defStream.Write(serializedYield, 0, serializedYield.Length);
474            defStream.Close();
475            byte[] compressedYield = memStream.ToArray();
476
477            //Debug stuff:
478            byte[] decompr = DecompressYield(compressedYield);
479            Debug.Assert(decompr.Length == serializedYield.Length);
480
481            //store in queue, so the LoadStoreThread can store it in the DHT later:
482            storequeue.Enqueue(compressedYield);           
483        }
484
485        /// <summary>
486        /// Sets the channel in which we want to sieve
487        /// </summary>
488        public void SetChannel(string channel)
489        {
490            this.channel = channel;
491        }
492
493        /// <summary>
494        /// Sets the number to sieve
495        /// </summary>
496        public void SetNumber(BigInteger number)
497        {
498            this.number = number;
499        }
500
501        /// <summary>
502        /// Sets the factor to sieve next and starts reading informations from the DHT.
503        /// </summary>
504        public void SetFactor(BigInteger factor)
505        {
506            this.factor = factor;
507            Debug.Assert(this.number % this.factor == 0);
508
509            ClearProgressYields();
510            nameCache = new Dictionary<int, string>();
511            peerPerformances = new Queue<PeerPerformanceInformations>();
512            activePeers = new HashSet<int>();
513
514            head = 0;
515            SynchronizeHead();
516           
517            //store our name:
518            P2PManager.Retrieve(NameIdentifier(ourID));     //just to outsmart the versioning system
519            P2PManager.Store(NameIdentifier(ourID),  System.Text.ASCIIEncoding.ASCII.GetBytes(ourName.ToCharArray()));
520
521            if (loadStoreThread != null)
522                throw new Exception("LoadStoreThread already started");
523            StartLoadStoreThread();
524        }
525
526        /// <summary>
527        /// Synchronizes the factorManager with the DHT.
528        /// Return false if this.factor is not a composite factor in the DHT (which means, that another peer already finished sieving this.factor).
529        /// </summary>
530        public bool SyncFactorManager(FactorManager factorManager)
531        {
532            FactorManager dhtFactorManager = null;
533            //load DHT Factor Manager:
534            byte[] dhtFactorManagerBytes = P2PManager.Retrieve(FactorListIdentifier()).Data;
535            if (dhtFactorManagerBytes != null)
536            {
537                MemoryStream memstream = new MemoryStream();
538                memstream.Write(dhtFactorManagerBytes, 0, dhtFactorManagerBytes.Length);
539                memstream.Position = 0;
540                BinaryFormatter bformatter = new BinaryFormatter();
541                bformatter.AssemblyFormat = System.Runtime.Serialization.Formatters.FormatterAssemblyStyle.Simple;
542                try
543                {
544                    dhtFactorManager = (FactorManager)bformatter.Deserialize(memstream);
545                }
546                catch (System.Runtime.Serialization.SerializationException)
547                {
548                    P2PWarning("DHT factor list is broken!");
549                    P2PManager.Remove(FactorListIdentifier());
550                    return SyncFactorManager(factorManager);
551                }               
552            }
553
554            //Synchronize DHT Factor Manager with our Factor List
555            if (dhtFactorManager == null || factorManager.Synchronize(dhtFactorManager))
556            {
557                //Our Factor Manager has more informations, so let's store it in the DHT:
558                MemoryStream memstream = new MemoryStream();
559                BinaryFormatter bformatter = new BinaryFormatter();
560                bformatter.AssemblyFormat = System.Runtime.Serialization.Formatters.FormatterAssemblyStyle.Simple;
561                bformatter.Serialize(memstream, factorManager);
562                bool success = P2PManager.Store(FactorListIdentifier(), memstream.ToArray()).IsSuccessful();
563                if (!success)
564                {
565                    Thread.Sleep(1000);
566                    return SyncFactorManager(factorManager);       //Just try again
567                }
568            }
569
570            return factorManager.ContainsComposite(this.factor);
571        }
572
573        /// <summary>
574        /// Sets the performance of this machine, so that this class can write it to the DHT later.
575        /// The performance is meassured in relations per ms.
576        /// </summary>
577        public void SetOurPerformance(double[] relationsPerMS)
578        {
579            double globalPerformance = 0;
580            foreach (double r in relationsPerMS)
581                globalPerformance += r;
582
583            ourPerformance = globalPerformance;
584        }
585
586        /// <summary>
587        /// Returns the performance of all peers (excluding ourselve) in relations per ms.
588        /// </summary>
589        public double GetP2PPerformance()
590        {
591            double perf = 0;
592
593            foreach (var p in peerPerformances)
594                perf += p.performance;
595
596            return perf;
597        }
598    }
599}
Note: See TracBrowser for help on using the repository browser.