source: trunk/CrypPlugins/QuadraticSieve/PeerToPeer.cs @ 1630

Last change on this file since 1630 was 1630, checked in by Sven Rech, 12 years ago

quadratic sieve changes

File size: 23.0 KB
Line 
1/*                             
2   Copyright 2010 Sven Rech, Uni Duisburg-Essen
3
4   Licensed under the Apache License, Version 2.0 (the "License");
5   you may not use this file except in compliance with the License.
6   You may obtain a copy of the License at
7
8       http://www.apache.org/licenses/LICENSE-2.0
9
10   Unless required by applicable law or agreed to in writing, software
11   distributed under the License is distributed on an "AS IS" BASIS,
12   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   See the License for the specific language governing permissions and
14   limitations under the License.
15*/
16
17using System;
18using System.Collections.Generic;
19using System.Linq;
20using System.Text;
21using System.IO.Compression;
22using System.IO;
23using Cryptool.P2P;
24using System.Numerics;
25using System.Diagnostics;
26using System.Collections;
27using System.Threading;
28using System.Windows.Threading;
29using System.Runtime.Serialization.Formatters.Binary;
30
31namespace Cryptool.Plugins.QuadraticSieve
32{
33    class PeerToPeer
34    {
35        private struct PeerPerformanceInformations
36        {
37            public DateTime lastChecked;
38            public int peerID;
39            public double performance;
40            public int lastAlive;           
41
42            public PeerPerformanceInformations(DateTime lastChecked, int peerID, double performance, int lastAlive)
43            {
44                this.lastAlive = lastAlive;
45                this.lastChecked = lastChecked;
46                this.peerID = peerID;
47                this.performance = performance;
48            }
49        }
50
51        private string channel;
52        private BigInteger number;
53        private BigInteger factor;
54        private int head;
55        private Queue storequeue;   //yields to store in the DHT
56        private Queue loadqueue;    //yields that have been loaded from the DHT
57        private Thread loadStoreThread;
58        private bool stopLoadStoreThread;
59        private QuadraticSievePresentation quadraticSieveQuickWatchPresentation;
60        private AutoResetEvent yieldEvent;
61        private int ourID;           //Our ID
62        private Dictionary<int, string> nameCache;  //associates the ids with the names
63        private Queue<PeerPerformanceInformations> peerPerformances;      //A queue of performances from the different peers ordered by the date last checked.
64        private HashSet<int> activePeers;                                 //A set of active peers
65        private double ourPerformance = 0;
66        private int aliveCounter = 0;       //This is stored together with the performance in the DHT
67        private string ourName;
68
69        public delegate void P2PWarningHandler(String warning);
70        public event P2PWarningHandler P2PWarning;       
71
72        public PeerToPeer(QuadraticSievePresentation presentation, AutoResetEvent yieldEvent)
73        {
74            quadraticSieveQuickWatchPresentation = presentation;
75            this.yieldEvent = yieldEvent;
76            SetOurID();
77        }
78
79        private void SetOurID()
80        {
81            Random random = new Random();
82            ourID = random.Next(1, Int32.MaxValue);        //TODO: Maybe we should calculate an id based on mac address and username?
83            quadraticSieveQuickWatchPresentation.ProgressYields.setOurID(ourID);
84            ourName = System.Net.Dns.GetHostName();
85        }
86
87        /// <summary>
88        /// Reads yield at position "index" from DHT and returns the ownerID and the decompressed packet itself.
89        /// </summary>
90        private byte[] ReadYield(int index, out int ownerID)
91        {
92            ownerID = 0;
93            byte[] yield = P2PManager.Retrieve(YieldIdentifier(index));
94            if (yield == null)
95                return null;
96
97            byte[] decompressedYield = DecompressYield(yield);
98
99            byte[] idbytes = new byte[4];
100            Array.Copy(decompressedYield, idbytes, 4);
101            ownerID = BitConverter.ToInt32(idbytes, 0);
102            byte[] y = new byte[decompressedYield.Length - 4];
103            Array.Copy(decompressedYield, 4, y, 0, y.Length);
104
105            return y;
106        }
107
108        private static byte[] DecompressYield(byte[] yield)
109        {
110            MemoryStream memStream = new MemoryStream();
111            DeflateStream defStream = new DeflateStream(memStream, CompressionMode.Decompress);
112            memStream.Write(yield, 0, yield.Length);
113            memStream.Position = 0;
114            MemoryStream memStream2 = new MemoryStream();
115            defStream.CopyTo(memStream2);
116            defStream.Close();
117            byte[] decompressedYield = memStream2.ToArray();
118            return decompressedYield;
119        }
120
121        private uint ReadAndEnqueueYield(int loadIndex, uint downloaded, uint uploaded)
122        {
123            int ownerID;
124            byte[] yield = ReadYield(loadIndex, out ownerID);
125            if (yield != null)
126            {
127                downloaded += (uint)yield.Length;
128                ShowTransfered(downloaded, uploaded);
129                loadqueue.Enqueue(yield);
130
131                //Progress Yield:
132                if (!nameCache.ContainsKey(ownerID))
133                {
134                    byte[] n = P2PManager.Retrieve(NameIdentifier(ownerID));
135                    if (n != null)
136                        nameCache.Add(ownerID, System.Text.ASCIIEncoding.ASCII.GetString(n));
137                }
138                string name = null;
139                if (nameCache.ContainsKey(ownerID))
140                    name = nameCache[ownerID];
141                SetProgressYield(loadIndex, ownerID, name);
142
143                //Performance and alive informations:
144                if (!activePeers.Contains(ownerID))
145                {
146                    UpdatePeerPerformanceAndAliveInformation(ownerID);
147                    activePeers.Add(ownerID);
148                    UpdateActivePeerInformation();
149                }
150
151                yieldEvent.Set();
152                return downloaded;
153            }
154            return 0;
155        }
156
157        /// <summary>
158        /// Tries to read and enqueue yield on position "loadIndex".
159        /// If it fails, it stores the index in lostIndices queue.
160        /// </summary>
161        private uint TryReadAndEnqueueYield(int index, uint downloaded, uint uploaded, Queue<KeyValuePair<int, DateTime>> lostIndices)
162        {
163            uint res = ReadAndEnqueueYield(index, downloaded, uploaded);
164            if (res != 0)
165                downloaded = res;
166            else
167            {
168                var e = new KeyValuePair<int, DateTime>(index, DateTime.Now);
169                lostIndices.Enqueue(e);
170                SetProgressYield(index, -1, null);
171            }
172
173            return downloaded;
174        }
175
176        private void LoadStoreThreadProc()
177        {
178            int loadIndex = 0;
179            uint downloaded = 0;
180            uint uploaded = 0;
181            HashSet<int> ourIndices = new HashSet<int>();   //Stores all the indices which belong to our packets
182            //Stores all the indices (together with there check date) which belong to lost packets (i.e. packets that can't be load anymore):
183            Queue<KeyValuePair<int, DateTime>> lostIndices = new Queue<KeyValuePair<int, DateTime>>();
184            double lastPerformance = 0;
185            DateTime performanceLastPut = new DateTime();
186
187            try
188            {
189                while (!stopLoadStoreThread)
190                {
191                    //Store our performance and our alive counter in the DHT, either if the performance changed or when the last write was more than 1 minute ago:
192                    if (ourPerformance != lastPerformance || performanceLastPut.CompareTo(DateTime.Now.Subtract(new TimeSpan(0, 1, 0))) < 0)
193                    {
194                        P2PManager.Retrieve(PerformanceIdentifier(ourID));      //just to outsmart the versioning system
195                        P2PManager.Store(PerformanceIdentifier(ourID), concat(BitConverter.GetBytes(ourPerformance), BitConverter.GetBytes(aliveCounter++)));
196                        lastPerformance = ourPerformance;
197                        performanceLastPut = DateTime.Now;
198                    }
199
200                    //updates all peer performances which have last been checked more than 2 minutes ago and check if they are still alive:
201                    while (peerPerformances.Count != 0 && peerPerformances.Peek().lastChecked.CompareTo(DateTime.Now.Subtract(new TimeSpan(0, 2, 0))) < 0)
202                    {
203                        var e = peerPerformances.Dequeue();
204
205                        byte[] performancebytes = P2PManager.Retrieve(PerformanceIdentifier(e.peerID));
206                        if (performancebytes != null)
207                        {
208                            double performance = BitConverter.ToDouble(performancebytes, 0);
209                            int peerAliveCounter = BitConverter.ToInt32(performancebytes, 8);
210                            if (peerAliveCounter <= e.lastAlive)
211                            {
212                                activePeers.Remove(e.peerID);
213                                UpdateActivePeerInformation();
214                            }
215                            else
216                                peerPerformances.Enqueue(new PeerPerformanceInformations(DateTime.Now, e.peerID, performance, peerAliveCounter));
217                        }
218                        else
219                        {
220                            activePeers.Remove(e.peerID);
221                        }                       
222                    }
223
224                    SynchronizeHead();
225
226                    bool busy = false;
227
228                    if (storequeue.Count != 0)  //store our packages
229                    {
230                        byte[] yield = (byte[])storequeue.Dequeue();
231                        bool success = P2PManager.Store(YieldIdentifier(head), yield);
232                        while (!success)
233                        {
234                            SynchronizeHead();
235                            success = P2PManager.Store(YieldIdentifier(head), yield);
236                        }
237                        SetProgressYield(head, ourID, null);
238                        ourIndices.Add(head);
239                       
240                        head++;
241
242                        //show informations about the uploaded yield:
243                        uploaded += (uint)yield.Length;
244                        ShowTransfered(downloaded, uploaded);
245                        busy = true;
246                    }
247
248                    //load the other yields:
249
250                    //skip all indices which are uploaded by us:
251                    while (ourIndices.Contains(loadIndex))
252                        loadIndex++;
253
254                    if (loadIndex < head)
255                    {
256                        downloaded = TryReadAndEnqueueYield(loadIndex, downloaded, uploaded, lostIndices);
257                        loadIndex++;
258                        busy = true;
259                    }
260                   
261                    //check all lost indices which are last checked longer than 2 minutes ago (but only if we have nothing else to do):
262                    if (!busy)
263                    {
264                        int count = 0;                       
265                        //TODO: Maybe we should throw away those indices, which have been checked more than several times.
266                        while (lostIndices.Count != 0 && lostIndices.Peek().Value.CompareTo(DateTime.Now.Subtract(new TimeSpan(0, 2, 0))) < 0)
267                        {
268                            var e = lostIndices.Dequeue();
269                            downloaded = TryReadAndEnqueueYield(loadIndex, downloaded, uploaded, lostIndices);
270                            count++;
271                        }
272
273                        if (count == 0)
274                            Thread.Sleep(5000);    //Wait 5 seconds
275                    }
276
277                }
278            }
279            catch (ThreadInterruptedException)
280            {
281                return;
282            }
283        }
284
285        private void UpdateActivePeerInformation()
286        {
287            quadraticSieveQuickWatchPresentation.Dispatcher.BeginInvoke(DispatcherPriority.Normal, (SendOrPostCallback)delegate
288            {
289                quadraticSieveQuickWatchPresentation.amountOfPeers.Content = "" + activePeers.Count + " peers active!";
290            }, null);
291        }
292
293        private void UpdatePeerPerformanceAndAliveInformation(int peerID)
294        {
295            byte[] performancebytes = P2PManager.Retrieve(PerformanceIdentifier(peerID));
296            if (performancebytes != null)
297            {
298                double performance = BitConverter.ToDouble(performancebytes, 0);
299                int peerAliveCounter = BitConverter.ToInt32(performancebytes, 8);
300                peerPerformances.Enqueue(new PeerPerformanceInformations(DateTime.Now, peerID, performance, peerAliveCounter));
301            }
302        }
303
304        /// <summary>
305        /// Loads head from DHT. If ours is greater (or there is no entry yet), we store ours.
306        /// </summary>
307        private void SynchronizeHead()
308        {
309            byte[] h = P2PManager.Retrieve(HeadIdentifier());
310            if (h != null)
311            {
312                int dhthead = int.Parse(System.Text.ASCIIEncoding.ASCII.GetString(h));
313                if (head > dhthead)
314                {
315                    bool success = P2PManager.Store(HeadIdentifier(), System.Text.ASCIIEncoding.ASCII.GetBytes(head.ToString()));
316                    if (!success)
317                        SynchronizeHead();
318                }
319                else if (head < dhthead)
320                {
321                    head = dhthead;
322                    SetProgressYield(head - 1, 0, null);
323                }
324            }
325            else
326            {
327                bool success = P2PManager.Store(HeadIdentifier(), System.Text.ASCIIEncoding.ASCII.GetBytes(head.ToString()));
328                if (!success)
329                    SynchronizeHead();
330            }
331        }
332
333        private void ShowTransfered(uint downloaded, uint uploaded)
334        {
335            string s1 = ((downloaded / 1024.0) / 1024).ToString();
336            string size1 = s1.Substring(0, (s1.Length < 3) ? s1.Length : 3);
337            string s2 = ((uploaded / 1024.0) / 1024).ToString();
338            string size2 = s2.Substring(0, (s2.Length < 3) ? s2.Length : 3);
339            quadraticSieveQuickWatchPresentation.Dispatcher.BeginInvoke(DispatcherPriority.Normal, (SendOrPostCallback)delegate
340            {
341                quadraticSieveQuickWatchPresentation.relationsInfo.Content = "Downloaded " + size1 + " MB! Uploaded " + size2 + " MB!";
342            }, null);
343        }
344
345        private void SetProgressYield(int index, int id, string name)
346        {
347            quadraticSieveQuickWatchPresentation.Dispatcher.BeginInvoke(DispatcherPriority.Normal, (SendOrPostCallback)delegate
348            {
349                quadraticSieveQuickWatchPresentation.ProgressYields.Set(index, id, name);
350            }, null);
351        }
352
353        private void ClearProgressYields()
354        {
355            quadraticSieveQuickWatchPresentation.Dispatcher.Invoke(DispatcherPriority.Normal, (SendOrPostCallback)delegate
356            {
357                quadraticSieveQuickWatchPresentation.ProgressYields.Clear();
358            }, null);
359        }
360
361        private string HeadIdentifier()
362        {
363            return channel + "#" + number + "-" + factor + "HEAD";
364        }
365
366        private string FactorListIdentifier()
367        {
368            return channel + "#" + number + "FACTORLIST";
369        }
370
371        private string YieldIdentifier(int index)
372        {
373            return channel + "#" + number + "-" + factor + "!" + index;
374        }
375
376        private string NameIdentifier(int ID)
377        {
378            return channel + "#" + number + "NAME" + ID.ToString();
379        }
380
381        private string PerformanceIdentifier(int ID)
382        {
383            return channel + "#" + number + "-" + factor + "PERFORMANCE" + ID.ToString();
384        }   
385
386        private void StartLoadStoreThread()
387        {
388            storequeue = Queue.Synchronized(new Queue());
389            loadqueue = Queue.Synchronized(new Queue());
390
391            stopLoadStoreThread = false;
392            loadStoreThread = new Thread(LoadStoreThreadProc);
393            loadStoreThread.Start();
394        }
395
396        /// <summary>
397        /// Concatenates the two byte arrays a1 and a2 an returns the result array.
398        /// </summary>
399        private byte[] concat(byte[] a1, byte[] a2)
400        {
401            byte[] res = new byte[a1.Length + a2.Length];
402            System.Buffer.BlockCopy(a1, 0, res, 0, a1.Length);
403            System.Buffer.BlockCopy(a2, 0, res, a1.Length, a2.Length);
404            return res;
405        }
406
407        public void StopLoadStoreThread()
408        {
409            stopLoadStoreThread = true;
410            loadStoreThread.Interrupt();
411            loadStoreThread.Join();
412            loadStoreThread = null;
413        }
414
415        public Queue GetLoadedYieldsQueue()
416        {
417            return loadqueue;
418        }
419
420        /// <summary>
421        /// Compresses the yield and puts it in the DHT.
422        /// </summary>
423        public void Put(byte[] serializedYield)
424        {
425            //Add our ID:
426            byte[] idbytes = BitConverter.GetBytes(ourID);
427            Debug.Assert(idbytes.Length == 4);
428            serializedYield = concat(idbytes, serializedYield); ;
429
430            //Compress:
431            MemoryStream memStream = new MemoryStream();
432            DeflateStream defStream = new DeflateStream(memStream, CompressionMode.Compress);
433            defStream.Write(serializedYield, 0, serializedYield.Length);
434            defStream.Close();
435            byte[] compressedYield = memStream.ToArray();
436
437            //Debug stuff:
438            byte[] decompr = DecompressYield(compressedYield);
439            Debug.Assert(decompr.Length == serializedYield.Length);
440
441            //store in queue, so the LoadStoreThread can store it in the DHT later:
442            storequeue.Enqueue(compressedYield);           
443        }
444
445        /// <summary>
446        /// Sets the channel in which we want to sieve
447        /// </summary>
448        public void SetChannel(string channel)
449        {
450            this.channel = channel;
451        }
452
453        /// <summary>
454        /// Sets the number to sieve
455        /// </summary>
456        public void SetNumber(BigInteger number)
457        {
458            this.number = number;
459        }
460
461        /// <summary>
462        /// Sets the factor to sieve next and starts reading informations from the DHT.
463        /// </summary>
464        public void SetFactor(BigInteger factor)
465        {
466            this.factor = factor;
467            Debug.Assert(this.number % this.factor == 0);
468
469            ClearProgressYields();
470            nameCache = new Dictionary<int, string>();
471            peerPerformances = new Queue<PeerPerformanceInformations>();
472            activePeers = new HashSet<int>();
473
474            //load head:
475            byte[] h = P2PManager.Retrieve(HeadIdentifier());
476            if (h != null)
477            {               
478                head = int.Parse(System.Text.ASCIIEncoding.ASCII.GetString(h));
479                SetProgressYield(head-1, 0, null);
480            }
481            else
482                head = 0;
483
484            //SetOurID();
485
486            //store our name:
487            P2PManager.Retrieve(NameIdentifier(ourID));     //just to outsmart the versioning system
488            P2PManager.Store(NameIdentifier(ourID),  System.Text.ASCIIEncoding.ASCII.GetBytes(ourName.ToCharArray()));
489
490            if (loadStoreThread != null)
491                throw new Exception("LoadStoreThread already started");
492            StartLoadStoreThread();
493        }
494
495        /// <summary>
496        /// Synchronizes the factorManager with the DHT.
497        /// Return false if this.factor is not a composite factor in the DHT (which means, that another peer already finished sieving this.factor).
498        /// </summary>
499        public bool SyncFactorManager(FactorManager factorManager)
500        {
501            FactorManager dhtFactorManager = null;
502            //load DHT Factor Manager:
503            byte[] dhtFactorManagerBytes = P2PManager.Retrieve(FactorListIdentifier());
504            if (dhtFactorManagerBytes != null)
505            {
506                MemoryStream memstream = new MemoryStream();
507                memstream.Write(dhtFactorManagerBytes, 0, dhtFactorManagerBytes.Length);
508                memstream.Position = 0;
509                BinaryFormatter bformatter = new BinaryFormatter();
510                bformatter.AssemblyFormat = System.Runtime.Serialization.Formatters.FormatterAssemblyStyle.Simple;
511                try
512                {
513                    dhtFactorManager = (FactorManager)bformatter.Deserialize(memstream);
514                }
515                catch (System.Runtime.Serialization.SerializationException)
516                {
517                    P2PWarning("DHT factor list is broken!");
518                    P2PManager.Remove(FactorListIdentifier());
519                    return SyncFactorManager(factorManager);
520                }               
521            }
522
523            //Synchronize DHT Factor Manager with our Factor List
524            if (dhtFactorManager == null || factorManager.Synchronize(dhtFactorManager))
525            {
526                //Our Factor Manager has more informations, so let's store it in the DHT:
527                MemoryStream memstream = new MemoryStream();
528                BinaryFormatter bformatter = new BinaryFormatter();
529                bformatter.AssemblyFormat = System.Runtime.Serialization.Formatters.FormatterAssemblyStyle.Simple;
530                bformatter.Serialize(memstream, factorManager);
531                bool success = P2PManager.Store(FactorListIdentifier(), memstream.ToArray());
532                if (!success)
533                {
534                    Thread.Sleep(1000);
535                    return SyncFactorManager(factorManager);       //Just try again
536                }
537            }
538
539            return factorManager.ContainsComposite(this.factor);
540        }
541
542        /// <summary>
543        /// Sets the performance of this machine, so that this class can write it to the DHT later.
544        /// The performance is meassured in relations per ms.
545        /// </summary>
546        public void SetOurPerformance(double[] relationsPerMS)
547        {
548            double globalPerformance = 0;
549            foreach (double r in relationsPerMS)
550                globalPerformance += r;
551
552            ourPerformance = globalPerformance;
553        }
554
555        /// <summary>
556        /// Returns the performance of all peers (excluding ourselve) in relations per ms.
557        /// </summary>
558        public double GetP2PPerformance()
559        {
560            double perf = 0;
561
562            foreach (var p in peerPerformances)
563                perf += p.performance;
564
565            return perf;
566        }
567    }
568}
Note: See TracBrowser for help on using the repository browser.