source: trunk/CrypPlugins/QuadraticSieve/PeerToPeer.cs @ 1691

Last change on this file since 1691 was 1691, checked in by Sven Rech, 12 years ago

quadratic sieve queue informations

File size: 25.3 KB
Line 
1/*                             
2   Copyright 2010 Sven Rech, Uni Duisburg-Essen
3
4   Licensed under the Apache License, Version 2.0 (the "License");
5   you may not use this file except in compliance with the License.
6   You may obtain a copy of the License at
7
8       http://www.apache.org/licenses/LICENSE-2.0
9
10   Unless required by applicable law or agreed to in writing, software
11   distributed under the License is distributed on an "AS IS" BASIS,
12   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   See the License for the specific language governing permissions and
14   limitations under the License.
15*/
16
17using System;
18using System.Collections.Generic;
19using System.Linq;
20using System.Text;
21using System.IO.Compression;
22using System.IO;
23using Cryptool.P2P;
24using System.Numerics;
25using System.Diagnostics;
26using System.Collections;
27using System.Threading;
28using System.Windows.Threading;
29using System.Runtime.Serialization.Formatters.Binary;
30using System.Management;
31using System.Security.Principal;
32using System.Security.Cryptography;
33
34namespace Cryptool.Plugins.QuadraticSieve
35{
36    class PeerToPeer
37    {
38        /// <summary>
39        /// This structure is used to hold all important peer performance and alive informations in a queue
40        /// </summary>
41        private struct PeerPerformanceInformations
42        {
43            public DateTime lastChecked;
44            public int peerID;
45            public double performance;
46            public int lastAlive;           
47
48            public PeerPerformanceInformations(DateTime lastChecked, int peerID, double performance, int lastAlive)
49            {
50                this.lastAlive = lastAlive;
51                this.lastChecked = lastChecked;
52                this.peerID = peerID;
53                this.performance = performance;
54            }
55        }
56
57        private string channel;
58        private BigInteger number;
59        private BigInteger factor;
60        private int head;
61        private Queue storequeue;   //yields to store in the DHT
62        private Queue loadqueue;    //yields that have been loaded from the DHT
63        private Thread loadStoreThread;
64        private bool stopLoadStoreThread;
65        private QuadraticSievePresentation quadraticSieveQuickWatchPresentation;
66        private AutoResetEvent yieldEvent;
67        private int ourID;           //Our ID
68        private Dictionary<int, string> nameCache;  //associates the ids with the names
69        private Queue<PeerPerformanceInformations> peerPerformances;      //A queue of performances from the different peers ordered by the date last checked.
70        private HashSet<int> activePeers;                                 //A set of active peers
71        private double ourPerformance = 0;
72        private int aliveCounter = 0;       //This is stored together with the performance in the DHT
73        private string ourName;
74        private uint downloaded = 0;
75        private uint uploaded = 0;
76        private HashSet<int> ourIndices;
77        private Queue<KeyValuePair<int, DateTime>> lostIndices;
78        private int loadIndex;
79
80        public delegate void P2PWarningHandler(String warning);
81        public event P2PWarningHandler P2PWarning;       
82
83        public PeerToPeer(QuadraticSievePresentation presentation, AutoResetEvent yieldEvent)
84        {
85            quadraticSieveQuickWatchPresentation = presentation;
86            this.yieldEvent = yieldEvent;
87            SetOurID();
88        }
89
90        private void SetOurID()
91        {
92            string username = WindowsIdentity.GetCurrent().Name;
93            string mac = GetMacIdentifier();
94
95            MD5 md5 = new MD5CryptoServiceProvider();
96            byte[] idBytes = md5.ComputeHash(System.Text.ASCIIEncoding.ASCII.GetBytes(username + mac));
97
98            ourID = BitConverter.ToInt32(idBytes, 3);
99            quadraticSieveQuickWatchPresentation.ProgressYields.setOurID(ourID);
100
101            ourName = System.Net.Dns.GetHostName();
102        }
103
104        /// <summary>
105        /// Reads yield at position "index" from DHT and returns the ownerID and the decompressed packet itself.
106        /// </summary>
107        private byte[] ReadYield(int index, out int ownerID)
108        {
109            ownerID = 0;
110            byte[] yield = P2PManager.Retrieve(YieldIdentifier(index)).Data;
111            if (yield == null)
112                return null;
113            downloaded += (uint)yield.Length;
114
115            byte[] decompressedYield = DecompressYield(yield);
116
117            byte[] idbytes = new byte[4];
118            Array.Copy(decompressedYield, idbytes, 4);
119            ownerID = BitConverter.ToInt32(idbytes, 0);
120            byte[] y = new byte[decompressedYield.Length - 4];
121            Array.Copy(decompressedYield, 4, y, 0, y.Length);
122           
123            return y;
124        }
125
126        private static byte[] DecompressYield(byte[] yield)
127        {
128            MemoryStream memStream = new MemoryStream();
129            DeflateStream defStream = new DeflateStream(memStream, CompressionMode.Decompress);
130            memStream.Write(yield, 0, yield.Length);
131            memStream.Position = 0;
132            MemoryStream memStream2 = new MemoryStream();
133            defStream.CopyTo(memStream2);
134            defStream.Close();
135            byte[] decompressedYield = memStream2.ToArray();
136            return decompressedYield;
137        }
138
139        private bool ReadAndEnqueueYield(int loadIndex, bool checkAlive)
140        {
141            int ownerID;
142            byte[] yield = ReadYield(loadIndex, out ownerID);
143            if (yield != null)
144            {
145                ShowTransfered(downloaded, uploaded);
146                loadqueue.Enqueue(yield);
147
148                string name = null;
149
150                if (ownerID != ourID)
151                {
152                    //Progress Yield:
153                    if (!nameCache.ContainsKey(ownerID))
154                    {
155                        byte[] n = P2PManager.Retrieve(NameIdentifier(ownerID)).Data;
156                        if (n != null)
157                            nameCache.Add(ownerID, System.Text.ASCIIEncoding.ASCII.GetString(n));
158                    }
159                    if (nameCache.ContainsKey(ownerID))
160                        name = nameCache[ownerID];
161                                       
162                    if (checkAlive && !activePeers.Contains(ownerID))
163                    {
164                        //check performance and alive informations:
165                        byte[] performancebytes = P2PManager.Retrieve(PerformanceIdentifier(ownerID)).Data;
166                        if (performancebytes != null)
167                        {
168                            double performance = BitConverter.ToDouble(performancebytes, 0);
169                            int peerAliveCounter = BitConverter.ToInt32(performancebytes, 8);
170                            peerPerformances.Enqueue(new PeerPerformanceInformations(DateTime.Now, ownerID, performance, peerAliveCounter));
171                        }
172                        activePeers.Add(ownerID);
173                        UpdateActivePeerInformation();
174                    }
175                }
176
177                //Set progress info:
178                SetProgressYield(loadIndex, ownerID, name);
179
180                yieldEvent.Set();
181                return true;
182            }
183            return false;
184        }
185
186        /// <summary>
187        /// Tries to read and enqueue yield on position "loadIndex".
188        /// If it fails, it stores the index in lostIndices queue.
189        /// </summary>
190        private void TryReadAndEnqueueYield(int index, bool checkAlive, Queue<KeyValuePair<int, DateTime>> lostIndices)
191        {
192            bool succ = ReadAndEnqueueYield(index, checkAlive);
193            if (!succ)               
194            {
195                var e = new KeyValuePair<int, DateTime>(index, DateTime.Now);
196                lostIndices.Enqueue(e);
197                SetProgressYield(index, -1, null);
198            }
199        }
200
201        private void LoadStoreThreadProc()
202        {
203            loadIndex = 0;
204            downloaded = 0;
205            uploaded = 0;
206            ourIndices = new HashSet<int>();   //Stores all the indices which belong to our packets
207            //Stores all the indices (together with there check date) which belong to lost packets (i.e. packets that can't be load anymore):
208            lostIndices = new Queue<KeyValuePair<int, DateTime>>();
209            double lastPerformance = 0;
210            DateTime performanceLastPut = new DateTime();
211
212            try
213            {
214                SynchronizeHead();
215                int startHead = head;
216
217                while (!stopLoadStoreThread)
218                {
219                    //Store our performance and our alive counter in the DHT, either if the performance changed or when the last write was more than 1 minute ago:
220                    if (ourPerformance != lastPerformance || performanceLastPut.CompareTo(DateTime.Now.Subtract(new TimeSpan(0, 1, 0))) < 0)
221                    {
222                        P2PManager.Retrieve(PerformanceIdentifier(ourID));      //just to outsmart the versioning system
223                        P2PManager.Store(PerformanceIdentifier(ourID), concat(BitConverter.GetBytes(ourPerformance), BitConverter.GetBytes(aliveCounter++)));
224                        lastPerformance = ourPerformance;
225                        performanceLastPut = DateTime.Now;
226                    }
227
228                    //updates all peer performances which have last been checked more than 1 minutes and 20 seconds ago and check if they are still alive:
229                    while (peerPerformances.Count != 0 && peerPerformances.Peek().lastChecked.CompareTo(DateTime.Now.Subtract(new TimeSpan(0, 1, 20))) < 0)
230                    {
231                        var e = peerPerformances.Dequeue();
232
233                        byte[] performancebytes = P2PManager.Retrieve(PerformanceIdentifier(e.peerID)).Data;
234                        if (performancebytes != null)
235                        {
236                            double performance = BitConverter.ToDouble(performancebytes, 0);
237                            int peerAliveCounter = BitConverter.ToInt32(performancebytes, 8);
238                            if (peerAliveCounter <= e.lastAlive)
239                            {
240                                activePeers.Remove(e.peerID);
241                                UpdateActivePeerInformation();
242                            }
243                            else
244                                peerPerformances.Enqueue(new PeerPerformanceInformations(DateTime.Now, e.peerID, performance, peerAliveCounter));
245                        }
246                        else
247                        {
248                            activePeers.Remove(e.peerID);
249                            UpdateActivePeerInformation();
250                        }                       
251                    }
252                                       
253                    SynchronizeHead();
254                    UpdateStoreLoadQueueInformation();
255
256                    bool busy = false;
257
258                    if (storequeue.Count != 0)  //store our packages
259                    {
260                        byte[] yield = (byte[])storequeue.Dequeue();
261                        bool success = P2PManager.Store(YieldIdentifier(head), yield).IsSuccessful();
262                        while (!success)
263                        {
264                            SynchronizeHead();
265                            success = P2PManager.Store(YieldIdentifier(head), yield).IsSuccessful();
266                        }
267                        SetProgressYield(head, ourID, null);
268                        ourIndices.Add(head);
269                       
270                        head++;
271
272                        //show informations about the uploaded yield:
273                        uploaded += (uint)yield.Length;
274                        ShowTransfered(downloaded, uploaded);
275                        UpdateStoreLoadQueueInformation();
276                        busy = true;
277                    }
278
279                    //load the other yields:
280
281                    //skip all indices which are uploaded by us:
282                    while (ourIndices.Contains(loadIndex))
283                        loadIndex++;
284
285                    if (loadIndex < head)
286                    {
287                        bool checkAlive = loadIndex >= startHead;       //we don't check the peer alive informations of the packages that existed before we joined
288                        TryReadAndEnqueueYield(loadIndex, checkAlive, lostIndices);
289                        loadIndex++;
290                        UpdateStoreLoadQueueInformation();
291                        busy = true;
292                    }
293                   
294                    //check lost indices which are last checked longer than 2 minutes ago (but only if we have nothing else to do):
295                    if (!busy)
296                    {
297                        //TODO: Maybe we should throw away those indices, which have been checked more than several times.
298                        if (lostIndices.Count != 0 && lostIndices.Peek().Value.CompareTo(DateTime.Now.Subtract(new TimeSpan(0, 2, 0))) < 0)
299                        {
300                            var e = lostIndices.Dequeue();
301                            TryReadAndEnqueueYield(loadIndex, true, lostIndices);
302                            UpdateStoreLoadQueueInformation();                           
303                        }
304                        else
305                            Thread.Sleep(5000);    //Wait 5 seconds
306                    }
307
308                }
309            }
310            catch (ThreadInterruptedException)
311            {
312                return;
313            }
314        }
315
316        private void UpdateActivePeerInformation()
317        {
318            quadraticSieveQuickWatchPresentation.Dispatcher.BeginInvoke(DispatcherPriority.Normal, (SendOrPostCallback)delegate
319            {
320                quadraticSieveQuickWatchPresentation.amountOfPeers.Content = "" + activePeers.Count + " other peer" + (activePeers.Count!=1 ? "s" : "") + " active!";
321            }, null);
322        }
323
324        private void UpdateStoreLoadQueueInformation()
325        {
326            if (storequeue != null && ourIndices != null && lostIndices != null)
327            {
328                quadraticSieveQuickWatchPresentation.Dispatcher.BeginInvoke(DispatcherPriority.Normal, (SendOrPostCallback)delegate
329                {
330                    int upload = storequeue.Count;
331                    int download = head - loadIndex - ourIndices.Count(x => x > loadIndex);
332                    int lost = this.lostIndices.Count;
333                    quadraticSieveQuickWatchPresentation.queueInformation.Content = "Queue: Upload " + upload + "! Download " + download + "! Lost " + lost + "!";
334                }, null);
335            }
336        }
337
338        /// <summary>
339        /// Loads head from DHT. If ours is greater (or there is no entry yet), we store ours.
340        /// </summary>
341        private void SynchronizeHead()
342        {
343            byte[] h = P2PManager.Retrieve(HeadIdentifier()).Data;
344            if (h != null)
345            {
346                int dhthead = int.Parse(System.Text.ASCIIEncoding.ASCII.GetString(h));
347                if (head > dhthead)
348                {
349                    bool success = P2PManager.Store(HeadIdentifier(), System.Text.ASCIIEncoding.ASCII.GetBytes(head.ToString())).IsSuccessful();
350                    if (!success)
351                        SynchronizeHead();
352                }
353                else if (head < dhthead)
354                {
355                    head = dhthead;
356                    if (head != 0)
357                        SetProgressYield(head - 1, 0, null);
358                }
359            }
360            else
361            {
362                bool success = P2PManager.Store(HeadIdentifier(), System.Text.ASCIIEncoding.ASCII.GetBytes(head.ToString())).IsSuccessful();
363                if (!success)
364                    SynchronizeHead();
365            }           
366        }
367
368        private void ShowTransfered(uint downloaded, uint uploaded)
369        {
370            string s1 = ((downloaded / 1024.0) / 1024).ToString();
371            string size1 = s1.Substring(0, (s1.Length < 3) ? s1.Length : 3);
372            string s2 = ((uploaded / 1024.0) / 1024).ToString();
373            string size2 = s2.Substring(0, (s2.Length < 3) ? s2.Length : 3);
374            quadraticSieveQuickWatchPresentation.Dispatcher.BeginInvoke(DispatcherPriority.Normal, (SendOrPostCallback)delegate
375            {
376                quadraticSieveQuickWatchPresentation.relationsInfo.Content = "Downloaded " + size1 + " MB! Uploaded " + size2 + " MB!";
377            }, null);
378        }
379
380        private void SetProgressYield(int index, int id, string name)
381        {
382            quadraticSieveQuickWatchPresentation.Dispatcher.BeginInvoke(DispatcherPriority.Normal, (SendOrPostCallback)delegate
383            {
384                quadraticSieveQuickWatchPresentation.ProgressYields.Set(index, id, name);
385            }, null);
386        }
387
388        private void ClearProgressYields()
389        {
390            quadraticSieveQuickWatchPresentation.Dispatcher.Invoke(DispatcherPriority.Normal, (SendOrPostCallback)delegate
391            {
392                quadraticSieveQuickWatchPresentation.ProgressYields.Clear();
393            }, null);
394        }
395
396        private string HeadIdentifier()
397        {
398            return channel + "#" + number + "-" + factor + "HEAD";
399        }
400
401        private string FactorListIdentifier()
402        {
403            return channel + "#" + number + "FACTORLIST";
404        }
405
406        private string YieldIdentifier(int index)
407        {
408            return channel + "#" + number + "-" + factor + "!" + index;
409        }
410
411        private string NameIdentifier(int ID)
412        {
413            return channel + "#" + number + "NAME" + ID.ToString();
414        }
415
416        private string PerformanceIdentifier(int ID)
417        {
418            return channel + "#" + number + "-" + factor + "PERFORMANCE" + ID.ToString();
419        }   
420
421        private void StartLoadStoreThread()
422        {
423            storequeue = Queue.Synchronized(new Queue());
424            loadqueue = Queue.Synchronized(new Queue());
425
426            stopLoadStoreThread = false;
427            loadStoreThread = new Thread(LoadStoreThreadProc);
428            loadStoreThread.Start();
429        }
430
431        public void StopLoadStoreThread()
432        {
433            stopLoadStoreThread = true;
434            loadStoreThread.Interrupt();
435            loadStoreThread.Join();
436            loadStoreThread = null;
437        }
438
439        /// <summary>
440        /// Concatenates the two byte arrays a1 and a2 an returns the result array.
441        /// </summary>
442        private byte[] concat(byte[] a1, byte[] a2)
443        {
444            byte[] res = new byte[a1.Length + a2.Length];
445            System.Buffer.BlockCopy(a1, 0, res, 0, a1.Length);
446            System.Buffer.BlockCopy(a2, 0, res, a1.Length, a2.Length);
447            return res;
448        }
449       
450        /// <summary>
451        /// Returns an identifier that depends on the MAC addresses of this system
452        /// </summary>       
453        private string GetMacIdentifier()
454        {
455            string MacID = "";
456            ManagementClass MC = new ManagementClass("Win32_NetworkAdapter");
457            ManagementObjectCollection MOCol = MC.GetInstances();
458            foreach (ManagementObject MO in MOCol)
459                if (MO != null)
460                    if (MO["MacAddress"] != null)
461                        MacID += MO["MACAddress"].ToString();
462            return MacID;
463        }
464
465        public Queue GetLoadedYieldsQueue()
466        {
467            return loadqueue;
468        }
469
470        /// <summary>
471        /// Compresses the yield and puts it in the DHT.
472        /// </summary>
473        public void Put(byte[] serializedYield)
474        {
475            //Add our ID:
476            byte[] idbytes = BitConverter.GetBytes(ourID);
477            Debug.Assert(idbytes.Length == 4);
478            serializedYield = concat(idbytes, serializedYield); ;
479
480            //Compress:
481            MemoryStream memStream = new MemoryStream();
482            DeflateStream defStream = new DeflateStream(memStream, CompressionMode.Compress);
483            defStream.Write(serializedYield, 0, serializedYield.Length);
484            defStream.Close();
485            byte[] compressedYield = memStream.ToArray();
486
487            //Debug stuff:
488            byte[] decompr = DecompressYield(compressedYield);
489            Debug.Assert(decompr.Length == serializedYield.Length);
490
491            //store in queue, so the LoadStoreThread can store it in the DHT later:
492            storequeue.Enqueue(compressedYield);
493
494            UpdateStoreLoadQueueInformation();
495        }
496
497        /// <summary>
498        /// Sets the channel in which we want to sieve
499        /// </summary>
500        public void SetChannel(string channel)
501        {
502            this.channel = channel;
503        }
504
505        /// <summary>
506        /// Sets the number to sieve
507        /// </summary>
508        public void SetNumber(BigInteger number)
509        {
510            this.number = number;
511        }
512
513        /// <summary>
514        /// Sets the factor to sieve next and starts reading informations from the DHT.
515        /// </summary>
516        public void SetFactor(BigInteger factor)
517        {
518            this.factor = factor;
519            Debug.Assert(this.number % this.factor == 0);
520
521            ClearProgressYields();
522            nameCache = new Dictionary<int, string>();
523            peerPerformances = new Queue<PeerPerformanceInformations>();
524            activePeers = new HashSet<int>();
525
526            head = 0;
527            SynchronizeHead();
528           
529            //store our name:
530            P2PManager.Retrieve(NameIdentifier(ourID));     //just to outsmart the versioning system
531            P2PManager.Store(NameIdentifier(ourID),  System.Text.ASCIIEncoding.ASCII.GetBytes(ourName.ToCharArray()));
532
533            if (loadStoreThread != null)
534                throw new Exception("LoadStoreThread already started");
535            StartLoadStoreThread();
536        }
537
538        /// <summary>
539        /// Synchronizes the factorManager with the DHT.
540        /// Return false if this.factor is not a composite factor in the DHT (which means, that another peer already finished sieving this.factor).
541        /// </summary>
542        public bool SyncFactorManager(FactorManager factorManager)
543        {
544            FactorManager dhtFactorManager = null;
545            //load DHT Factor Manager:
546            byte[] dhtFactorManagerBytes = P2PManager.Retrieve(FactorListIdentifier()).Data;
547            if (dhtFactorManagerBytes != null)
548            {
549                MemoryStream memstream = new MemoryStream();
550                memstream.Write(dhtFactorManagerBytes, 0, dhtFactorManagerBytes.Length);
551                memstream.Position = 0;
552                BinaryFormatter bformatter = new BinaryFormatter();
553                bformatter.AssemblyFormat = System.Runtime.Serialization.Formatters.FormatterAssemblyStyle.Simple;
554                try
555                {
556                    dhtFactorManager = (FactorManager)bformatter.Deserialize(memstream);
557                }
558                catch (System.Runtime.Serialization.SerializationException)
559                {
560                    P2PWarning("DHT factor list is broken!");
561                    P2PManager.Remove(FactorListIdentifier());
562                    return SyncFactorManager(factorManager);
563                }               
564            }
565
566            //Synchronize DHT Factor Manager with our Factor List
567            if (dhtFactorManager == null || factorManager.Synchronize(dhtFactorManager))
568            {
569                //Our Factor Manager has more informations, so let's store it in the DHT:
570                MemoryStream memstream = new MemoryStream();
571                BinaryFormatter bformatter = new BinaryFormatter();
572                bformatter.AssemblyFormat = System.Runtime.Serialization.Formatters.FormatterAssemblyStyle.Simple;
573                bformatter.Serialize(memstream, factorManager);
574                bool success = P2PManager.Store(FactorListIdentifier(), memstream.ToArray()).IsSuccessful();
575                if (!success)
576                {
577                    Thread.Sleep(1000);
578                    return SyncFactorManager(factorManager);       //Just try again
579                }
580            }
581
582            return factorManager.ContainsComposite(this.factor);
583        }
584
585        /// <summary>
586        /// Sets the performance of this machine, so that this class can write it to the DHT later.
587        /// The performance is meassured in relations per ms.
588        /// </summary>
589        public void SetOurPerformance(double[] relationsPerMS)
590        {
591            double globalPerformance = 0;
592            foreach (double r in relationsPerMS)
593                globalPerformance += r;
594
595            ourPerformance = globalPerformance;
596        }
597
598        /// <summary>
599        /// Returns the performance of all peers (excluding ourselve) in relations per ms.
600        /// </summary>
601        public double GetP2PPerformance()
602        {
603            double perf = 0;
604
605            foreach (var p in peerPerformances)
606                perf += p.performance;
607
608            return perf;
609        }
610    }
611}
Note: See TracBrowser for help on using the repository browser.