source: trunk/CrypPlugins/KeySearcher/P2P/Storage/StorageHelper.cs @ 2250

Last change on this file since 2250 was 2250, checked in by nolte, 11 years ago

MTC back to life.
Statistic + Updater Fix.
Working version!!!

File size: 14.2 KB
Line 
1using System;
2using System.Collections.Generic;
3using System.IO;
4using Cryptool.P2P;
5using Cryptool.P2P.Internal;
6using Cryptool.PluginBase;
7using KeySearcher.P2P.Exceptions;
8using KeySearcher.P2P.Presentation;
9using KeySearcher.P2P.Tree;
10
11namespace KeySearcher.P2P.Storage
12{
13    class StorageHelper
14    {
15        private readonly KeySearcher keySearcher;
16        private readonly StatisticsGenerator statisticsGenerator;
17        private readonly StatusContainer statusContainer;
18
19        //VERSIONNUMBER: Important. Set it +1 manually everytime the length of the MemoryStream Changes
20        private const int version = 4;
21        /*
22        -----------------------------Versionnumber Changelog---------------------------------------------
23        |   Version 1: Added the Versionnumber to the Stream (Leafs)
24        |   Version 2: Added the first User Statistics (Avatar,ID,Count) to the Stream
25        |   Version 3: Added version question (in front of results) + more statistic information (hostname,date) to the Stream
26        |   Version 4: Reorganisation of the DHT structure. Update fully working now + features from previous versions available
27        -------------------------------------------------------------------------------------------------
28         */
29
30        public StorageHelper(KeySearcher keySearcher, StatisticsGenerator statisticsGenerator, StatusContainer statusContainer)
31        {
32            this.keySearcher = keySearcher;
33            this.statisticsGenerator = statisticsGenerator;
34            this.statusContainer = statusContainer;
35        }
36
37        //-------------------------------------------------------------------------------------------
38        //AFTER CHANGING THE FOLLOWING METHODS INCREASE THE VERSION-NUMBER AT THE TOP OF THIS CLASS!
39        //-------------------------------------------------------------------------------------------
40        internal RequestResult UpdateInDht(NodeBase nodeToUpdate)
41        {
42            var memoryStream = new MemoryStream();
43            var binaryWriter = new BinaryWriter(memoryStream);
44
45            //TODO: Append Updater Version
46            binaryWriter.Write('W');
47            binaryWriter.Write(version); 
48
49            if (nodeToUpdate is Node)
50            {
51                UpdateNodeInDht((Node) nodeToUpdate, binaryWriter);
52            } else
53            {
54                UpdateLeafInDht((Leaf) nodeToUpdate, binaryWriter);
55            }
56
57            // Append results
58            binaryWriter.Write(nodeToUpdate.Result.Count);
59            foreach (var valueKey in nodeToUpdate.Result)
60            {
61                binaryWriter.Write(valueKey.key);
62                binaryWriter.Write(valueKey.value);
63                binaryWriter.Write(valueKey.decryption.Length);
64                binaryWriter.Write(valueKey.decryption);
65            }                       
66             
67            //Creating a copy of the activity dictionary
68            var copyAct = nodeToUpdate.Activity;
69
70            //Write number of avatarnames
71            binaryWriter.Write(copyAct.Keys.Count);
72            foreach (string avatar in copyAct.Keys)
73            {
74                var maschCopy = copyAct[avatar];
75                //write avatarname
76                binaryWriter.Write(avatar);
77                //write the number of maschines for this avatar
78                binaryWriter.Write(maschCopy.Keys.Count);
79
80                foreach (long maschID in maschCopy.Keys)
81                {
82                    //write the maschines and their patterncount
83                    binaryWriter.Write(maschID);
84                    binaryWriter.Write(maschCopy[maschID].Count); //int 32
85                    binaryWriter.Write(maschCopy[maschID].Hostname); //String
86                    binaryWriter.Write(maschCopy[maschID].Date.ToBinary()); //DateTime
87                }
88            }
89            return StoreWithStatistic(KeyInDht(nodeToUpdate), memoryStream.ToArray());
90        }
91
92        private static void UpdateNodeInDht(Node nodeToUpdate, BinaryWriter binaryWriter)
93        {
94            binaryWriter.Write(nodeToUpdate.LeftChildFinished);
95            binaryWriter.Write(nodeToUpdate.RightChildFinished);
96        }
97
98        private static void UpdateLeafInDht(Leaf nodeToUpdate, BinaryWriter binaryWriter)
99        {
100            binaryWriter.Write('V');
101            binaryWriter.Write(version);
102            var buffer = nodeToUpdate.LastReservationDate.ToBinary();
103            binaryWriter.Write(buffer);
104            binaryWriter.Write(nodeToUpdate.getClientIdentifier());
105        }
106
107        internal RequestResult UpdateFromDht(NodeBase nodeToUpdate, bool forceUpdate = false)
108        {
109
110            if (!forceUpdate && nodeToUpdate.LastUpdate > DateTime.Now.Subtract(new TimeSpan(0, 0, 5)))
111            {
112                return new RequestResult { Status = RequestResultType.Success };
113            }
114
115            nodeToUpdate.LastUpdate = DateTime.Now;
116
117            var requestResult = RetrieveWithStatistic(KeyInDht(nodeToUpdate));
118            var nodeBytes = requestResult.Data;
119
120            if (nodeBytes == null)
121            {
122                return requestResult;
123            }
124
125            var binaryReader = new BinaryReader(new MemoryStream(nodeBytes));
126
127            //oldVersionFlag will be used to garantee further changes in the Stream
128            var oldVersionFlag = CheckNodeVersion(binaryReader);
129
130            if (nodeToUpdate is Node)
131            {
132                UpdateNodeFromDht((Node) nodeToUpdate, binaryReader);
133            } else
134            {
135                UpdateLeafFromDht((Leaf)nodeToUpdate, binaryReader);
136            }
137
138            // Load results
139            var resultCount = binaryReader.ReadInt32();
140            for (var i = 0; i < resultCount; i++)
141            {
142                var newResult = new KeySearcher.ValueKey
143                                    {
144                                        key = binaryReader.ReadString(),
145                                        value = binaryReader.ReadDouble(),
146                                        decryption = binaryReader.ReadBytes(binaryReader.ReadInt32())
147                                    };
148                nodeToUpdate.Result.AddLast(newResult);
149            }
150           
151            if (binaryReader.BaseStream.Length != binaryReader.BaseStream.Position)
152            { 
153                //Reading the number of avatarnames
154                int avatarcount = binaryReader.ReadInt32();
155                for(int i=0; i<avatarcount;i++)
156                {
157                    //Reading the avatarname and the maschine-count for this name
158                    string avatarname = binaryReader.ReadString();
159                    int maschcount = binaryReader.ReadInt32();
160                    var readMaschcount = new Dictionary<long, Information>();
161                   
162                    for (int j = 0; j < maschcount; j++)
163                    {
164                        //reading the IDs and patterncount
165                        long maschID = binaryReader.ReadInt64();
166                        int count = binaryReader.ReadInt32();
167                        string host = binaryReader.ReadString(); 
168                        var date = DateTime.FromBinary(binaryReader.ReadInt64());
169                        readMaschcount.Add(maschID, new Information() {Count = count, Hostname = host, Date = date});
170                       
171                    }
172                   
173                    if (nodeToUpdate.Activity.ContainsKey(avatarname))
174                    {
175                        nodeToUpdate.Activity[avatarname] = readMaschcount;
176                    }
177                    else
178                    {
179                        nodeToUpdate.Activity.Add(avatarname, readMaschcount);                     
180                    }
181                }               
182            }
183                       
184            if (resultCount > 0)
185            {
186                keySearcher.IntegrateNewResults(nodeToUpdate.Result, nodeToUpdate.Activity, nodeToUpdate.DistributedJobIdentifier);
187                statisticsGenerator.ProcessPatternResults(nodeToUpdate.Result);
188            }
189
190            nodeToUpdate.UpdateCache();
191            return requestResult;
192        }
193
194        private static void UpdateNodeFromDht(Node nodeToUpdate, BinaryReader binaryReader)
195        {
196            nodeToUpdate.LeftChildFinished = binaryReader.ReadBoolean() || nodeToUpdate.LeftChildFinished;
197            nodeToUpdate.RightChildFinished = binaryReader.ReadBoolean() || nodeToUpdate.RightChildFinished;
198        }
199
200        private static void UpdateLeafFromDht(Leaf nodeToUpdate, BinaryReader binaryReader)
201        {
202            var oldVersionFlag = CheckVersion(binaryReader);
203               
204            var date = DateTime.FromBinary(binaryReader.ReadInt64());
205            if (date > nodeToUpdate.LastReservationDate)
206            {
207                nodeToUpdate.LastReservationDate = date;
208            }
209           
210            try
211            {
212                if (binaryReader.BaseStream.Length - binaryReader.BaseStream.Position >= 8)
213                {
214                    nodeToUpdate.setClientIdentifier(binaryReader.ReadInt64());
215                }
216                else
217                {
218                    throw new Exception();
219                }
220            }
221            catch (Exception)
222            {
223                // client id not available, use default
224                nodeToUpdate.setClientIdentifier(-1);
225            }
226           
227        }
228
229        internal static string KeyInDht(NodeBase node)
230        {
231            return string.Format("{0}_node_{1}_{2}", node.DistributedJobIdentifier, node.From, node.To);
232        }
233
234        private static int CheckVersion(BinaryReader binaryReader)
235        {           
236            try
237            {
238                //Checking if there's a version in the stream
239                int vers = binaryReader.PeekChar();
240                if (vers == 86) //V infront of a Leaf
241                {
242                    //Reading the char and the versionnumber
243                    char magic = binaryReader.ReadChar();
244                    int versionInUse = binaryReader.ReadInt32();
245                    //Check if a newer Version is in use
246                    if (versionInUse > version)
247                    {
248                        throw new KeySearcherStopException();
249                    }
250                    return versionInUse;
251                }
252                else
253                {
254                    return 0;
255                }
256            }
257            catch(KeySearcherStopException)
258            {
259                throw new KeySearcherStopException();
260            }           
261        }
262
263        private static int CheckNodeVersion(BinaryReader binaryReader)
264        {
265            try
266            {
267                //Checking if there's a version in the stream
268                int vers = binaryReader.PeekChar();
269                if (vers == 87) //W infront of a Node
270                {
271                    //Reading the char and the versionnumber
272                    char magic = binaryReader.ReadChar();
273                    int versionInUse = binaryReader.ReadInt32();
274                    //Check if a newer Version is in use
275                    if (versionInUse > version)
276                    {
277                        throw new KeySearcherStopException();
278                    }
279                    return versionInUse;
280                }
281                else
282                {
283                    return 0;
284                }
285            }
286            catch (KeySearcherStopException)
287            {
288                throw new KeySearcherStopException();
289            }
290        }
291
292        public DateTime StartDate(String ofJobIdentifier)
293        {
294            var key = ofJobIdentifier + "_startdate";
295            var requestResult = RetrieveWithStatistic(key);
296
297            if (requestResult.IsSuccessful() && requestResult.Data != null)
298            {
299                var startTimeUtc = DateTime.SpecifyKind(
300                    DateTime.FromBinary(BitConverter.ToInt64(requestResult.Data, 0)), DateTimeKind.Utc);
301                return startTimeUtc.ToLocalTime();
302            }
303
304            StoreWithStatistic(key, BitConverter.GetBytes((DateTime.UtcNow.ToBinary())));
305            return DateTime.Now;
306        }
307
308        public long SubmitterID(String ofJobIdentifier)
309        {
310            var key = ofJobIdentifier + "_submitterid";
311            var requestResult = RetrieveWithStatistic(key);
312
313            if (requestResult.IsSuccessful() && requestResult.Data != null)
314            {
315                var submitterid = BitConverter.ToInt64(requestResult.Data, 0);
316                return submitterid;
317            }
318
319            StoreWithStatistic(key, BitConverter.GetBytes(Cryptool.PluginBase.Miscellaneous.UniqueIdentifier.GetID()));
320            return Cryptool.PluginBase.Miscellaneous.UniqueIdentifier.GetID();
321        }
322
323        public RequestResult RetrieveWithStatistic(string key)
324        {
325            statusContainer.RetrieveRequests++;
326            statusContainer.TotalDhtRequests++;
327            var requestResult = P2PManager.Retrieve(key);
328
329            if (requestResult.Data != null)
330            {
331                statusContainer.RetrievedBytes += requestResult.Data.Length;
332                statusContainer.TotalBytes += requestResult.Data.Length;
333            }
334
335            return requestResult;
336        }
337
338        public RequestResult RemoveWithStatistic(string key)
339        {
340            statusContainer.RemoveRequests++;
341            statusContainer.TotalDhtRequests++;
342            return P2PManager.Remove(key);
343        }
344
345        public RequestResult StoreWithStatistic(string key, byte[] data)
346        {
347            statusContainer.StoreRequests++;
348            statusContainer.TotalDhtRequests++;
349            var requestResult = P2PManager.Store(key, data);
350
351            if (requestResult.Data != null)
352            {
353                statusContainer.StoredBytes += requestResult.Data.Length;
354                statusContainer.TotalBytes += requestResult.Data.Length;
355            }
356
357            return requestResult;
358        }
359    }
360}
Note: See TracBrowser for help on using the repository browser.