Changeset 1615


Ignore:
Timestamp:
Jun 9, 2010, 10:28:25 PM (12 years ago)
Author:
Sven Rech
Message:

better p2p loading and storing mechanism for quadratic sieve

File:
1 edited

Legend:

Unmodified
Added
Removed
  • trunk/CrypPlugins/QuadraticSieve/PeerToPeer.cs

    r1614 r1615  
    5959                return null;
    6060
    61             byte[] decompressedYield = DecompressYield(yield);
    62            
     61            byte[] decompressedYield = DecompressYield(yield);           
    6362            return decompressedYield;
    6463        }
     
    7978        private void LoadStoreThreadProc()
    8079        {
    81             int loadEnd = head; //we know, that the DHT entries from 1 to "loadEnd" are unknown to us, so we will load these when we have no other things to do
    8280            int loadIndex = 0;
    8381            uint downloaded = 0;
    8482            uint uploaded = 0;
     83            HashSet<int> ourIndices = new HashSet<int>();   //Stores all the indices which belong to our packets
     84            //Stores all the indices (together with there check date) which belong to lost packets (i.e. packets that can't be load anymore):
     85            Queue<KeyValuePair<int, DateTime>> lostIndices = new Queue<KeyValuePair<int, DateTime>>();
    8586
    8687            try
     
    8889                while (!stopLoadStoreThread)
    8990                {
     91                    SynchronizeHead();
     92
    9093                    if (storequeue.Count != 0)  //storing has priority
    9194                    {
     
    9497                        while (!success)
    9598                        {
    96                             downloaded = ReadAndEnqueueYield(head, downloaded, uploaded);
    97                             head++;
     99                            SynchronizeHead();
    98100                            success = P2PManager.Store(YieldIdentifier(head), yield);
    99101                        }
    100102                        SetProgressYield(head, YieldStatus.Ours);
    101 
    102                         //increment and store new head:
     103                        ourIndices.Add(head);
     104                       
    103105                        head++;
    104                         success = P2PManager.Store(HeadIdentifier(), System.Text.ASCIIEncoding.ASCII.GetBytes(head.ToString()));
    105                         if (!success)
    106                         {
    107                             //read head and check if ours is higher:
    108                             byte[] h = P2PManager.Retrieve(HeadIdentifier());
    109                             if (h != null)
    110                             {
    111                                 int dhthead = int.Parse(System.Text.ASCIIEncoding.ASCII.GetString(h));
    112                                 if (head > dhthead)
    113                                     P2PManager.Store(HeadIdentifier(), System.Text.ASCIIEncoding.ASCII.GetBytes(head.ToString()));  //try to store again
    114                             }
    115                         }
    116106
    117107                        //show informations about the uploaded yield:
     
    119109                        ShowTransfered(downloaded, uploaded);
    120110                    }
    121                     else                      //if there is nothing to store, we can load the yields up to "loadEnd".
     111                    else                      //if there is nothing to store, we can load the other yields.
    122112                    {
    123                         if (loadIndex < loadEnd)
     113                        while (ourIndices.Contains(loadIndex))
     114                            loadIndex++;
     115
     116                        if (loadIndex < head)
    124117                        {
    125                             downloaded = ReadAndEnqueueYield(loadIndex, downloaded, uploaded);
    126                             loadIndex++;                           
     118                            downloaded = TryReadAndEnqueueYield(loadIndex, downloaded, uploaded, lostIndices);
     119
     120                            loadIndex++;
    127121                        }
    128                         else                //if there is nothing left to load, we can check if there is new data behind our local head
     122                        else
    129123                        {
    130                             Thread.Sleep(10000);        //wait 10 seconds
     124                            //check all lost indices which are last checked longer than 1 minutes ago:
     125                            //TODO: Maybe we should throw away those indices, which have been checked more than several times.
     126                            while (lostIndices.Peek().Value.CompareTo(DateTime.Now.Subtract(new TimeSpan(0,1,0))) < 0)
     127                            {
     128                                var e = lostIndices.Dequeue();
     129                                downloaded = TryReadAndEnqueueYield(loadIndex, downloaded, uploaded, lostIndices);
     130                            }
     131                            Thread.Sleep(5000);    //Wait 5 seconds
    131132                        }
    132 
    133133                    }
    134134                }
     
    138138                return;
    139139            }
     140        }
     141
     142        /// <summary>
     143        /// Tries to read and enqueue yield on position "loadIndex".
     144        /// If it fails, it stores the index in lostIndices queue.
     145        /// </summary>
     146        private uint TryReadAndEnqueueYield(int loadIndex, uint downloaded, uint uploaded, Queue<KeyValuePair<int, DateTime>> lostIndices)
     147        {
     148            uint res = ReadAndEnqueueYield(loadIndex, downloaded, uploaded);
     149            if (res != 0)
     150                downloaded = res;
     151            else
     152            {
     153                var e = new KeyValuePair<int, DateTime>(loadIndex, DateTime.Now);
     154                lostIndices.Enqueue(e);
     155            }
     156            return downloaded;
     157        }
     158
     159        /// <summary>
     160        /// Loads head from DHT. If ours is greater, we store ours.
     161        /// </summary>
     162        private void SynchronizeHead()
     163        {
     164            byte[] h = P2PManager.Retrieve(HeadIdentifier());
     165            if (h != null)
     166            {
     167                int dhthead = int.Parse(System.Text.ASCIIEncoding.ASCII.GetString(h));
     168                if (head > dhthead)
     169                {
     170                    bool success = P2PManager.Store(HeadIdentifier(), System.Text.ASCIIEncoding.ASCII.GetBytes(head.ToString()));
     171                    if (!success)
     172                        SynchronizeHead();
     173                }
     174                else
     175                    head = dhthead;
     176            }           
    140177        }
    141178
     
    305342                    P2PManager.Remove(FactorListIdentifier());
    306343                    return SyncFactorManager(factorManager);
    307                 }
     344                }               
    308345            }
    309346
     
    320357                {
    321358                    Thread.Sleep(1000);
    322                     SyncFactorManager(factorManager);       //Just try again
     359                    return SyncFactorManager(factorManager);       //Just try again
    323360                }
    324361            }
Note: See TracChangeset for help on using the changeset viewer.