source: trunk/CrypPlugins/P2PEditor/Distributed/JobListManager.cs @ 2501

Last change on this file since 2501 was 2501, checked in by Sven Rech, 11 years ago

catched exception in p2peditor which can happen when p2p operation failed

File size: 6.9 KB
Line 
1using System;
2using System.Collections.Generic;
3using System.IO;
4using Cryptool.P2P;
5using Cryptool.P2P.Internal;
6using Cryptool.PluginBase;
7
8namespace Cryptool.P2PEditor.Distributed
9{
10    public class JobListManager
11    {
12        private const string JoblistKey = "Cryptool.P2PEditor.JobList";
13        private const string WorkspaceKeyPrefix = "Workspace";
14        private readonly P2PEditor p2PEditor;
15
16        public JobListManager(P2PEditor p2PEditor)
17        {
18            this.p2PEditor = p2PEditor;
19        }
20
21        public ICollection<DistributedJob> JobList()
22        {
23            p2PEditor.GuiLogMessage("Fetching DHT job list...", NotificationLevel.Debug);
24
25            if (!P2PManager.IsConnected)
26            {
27                p2PEditor.GuiLogMessage("P2P not connected, cannot fetch job list.", NotificationLevel.Error);
28                return new List<DistributedJob>();
29            }
30
31            var serialisedJobList = P2PManager.Retrieve(JoblistKey).Data;
32            if (serialisedJobList == null)
33            {
34                // no job list in DHT, create empty list
35                p2PEditor.GuiLogMessage("No list in DHT, creating empty list.", NotificationLevel.Debug);
36                return new List<DistributedJob>();
37            }
38
39            return ByteArrayToJobList(serialisedJobList);
40        }
41
42        public void AddDistributedJob(DistributedJob distributedJob)
43        {
44            p2PEditor.GuiLogMessage("Distributing new job...", NotificationLevel.Debug);
45
46            if (!P2PManager.IsConnected)
47            {
48                p2PEditor.GuiLogMessage("P2P not connected, cannot distribute job.", NotificationLevel.Error);
49                return;
50            }
51
52            var currentJobList = JobList();
53            currentJobList.Add(distributedJob);
54
55            var serializedJobList = JobListToByteArray(currentJobList);
56            P2PManager.Store(JoblistKey, serializedJobList);
57
58            var workspaceData = File.ReadAllBytes(distributedJob.LocalFilePath);
59            p2PEditor.GuiLogMessage(
60                "Workspace size: " + workspaceData.Length + ", storing at " + GenerateWorkspaceKey(distributedJob),
61                NotificationLevel.Debug);
62            P2PManager.Store(GenerateWorkspaceKey(distributedJob), workspaceData);
63
64            p2PEditor.GuiLogMessage("Distributed job " + distributedJob.Name, NotificationLevel.Info);
65        }
66
67        public void DeleteDistributedJob(DistributedJob distributedJobToDelete)
68        {
69            p2PEditor.GuiLogMessage("Deleting job...", NotificationLevel.Debug);
70
71            if (!P2PManager.IsConnected)
72            {
73                p2PEditor.GuiLogMessage("P2P not connected, cannot distribute job.", NotificationLevel.Error);
74                return;
75            }
76
77            var currentJobList = JobList();
78            currentJobList.Remove(distributedJobToDelete);
79
80            var serializedJobList = JobListToByteArray(currentJobList);
81            P2PManager.Store(JoblistKey, serializedJobList);
82
83            // Retrieve job first to satify versioned DHT
84            P2PManager.Retrieve(GenerateWorkspaceKey(distributedJobToDelete));
85            P2PManager.Remove(GenerateWorkspaceKey(distributedJobToDelete));
86
87            p2PEditor.GuiLogMessage("Deleted distributed job " + distributedJobToDelete.Name, NotificationLevel.Info);
88        }
89
90        public void CompleteDistributedJob(DistributedJob distributedJob)
91        {
92            distributedJob.ConvertRawWorkspaceToLocalFile(P2PManager.Retrieve(GenerateWorkspaceKey(distributedJob)).Data);
93        }
94
95        public void RetrieveDownloadCount(DistributedJob distributedJob)
96        {
97            try
98            {
99                var result = P2PManager.Retrieve(GenerateDownloadCounterKey(distributedJob));
100
101                if (result.Status == RequestResultType.KeyNotFound)
102                {
103                    distributedJob.Downloads = 0;
104                    return;
105                }
106
107                if (result.Data != null)
108                {
109                    var binaryReader = new BinaryReader(new MemoryStream(result.Data));
110                    distributedJob.Downloads = binaryReader.ReadInt32();
111                    distributedJob.LastDownload = DateTime.FromBinary(binaryReader.ReadInt64());
112                }
113            }
114            catch (Exception)
115            {
116            }
117        }
118
119        public void RetrieveCurrentStatus(DistributedJob distributedJob)
120        {
121            if (string.IsNullOrEmpty(distributedJob.StatusKey)) return;
122
123            var result = P2PManager.Retrieve(distributedJob.StatusKey);
124            if (result.Status != RequestResultType.Success) return;
125
126            var status = DistributedJobSerializer.StatusFromReader(new BinaryReader(new MemoryStream(result.Data)));
127            distributedJob.Status = status;
128        }
129
130        public void IncreaseDownloadCount(DistributedJob distributedJob)
131        {
132            RetrieveDownloadCount(distributedJob);
133            distributedJob.Downloads++;
134
135            var memoryStream = new MemoryStream();
136            var binaryWriter = new BinaryWriter(memoryStream);
137            binaryWriter.Write(distributedJob.Downloads);
138            binaryWriter.Write(DateTime.UtcNow.ToBinary());
139
140            P2PManager.Store(GenerateDownloadCounterKey(distributedJob), memoryStream.ToArray());
141        }
142
143        private static string GenerateWorkspaceKey(DistributedJob distributedJob)
144        {
145            return string.Format("{0}.{1}.{2}", JoblistKey, WorkspaceKeyPrefix, distributedJob.Guid);
146        }
147
148        private static string GenerateDownloadCounterKey(DistributedJob distributedJob)
149        {
150            return string.Format("{0}.{1}.{2}.{3}", JoblistKey, WorkspaceKeyPrefix, distributedJob.Guid, "downloads");
151        }
152
153        private static byte[] JobListToByteArray(ICollection<DistributedJob> distributedJobList)
154        {
155            var memoryStream = new MemoryStream();
156            var binaryWriter = new BinaryWriter(memoryStream);
157            binaryWriter.Write(distributedJobList.Count);
158
159            foreach (var distributedJob in distributedJobList)
160            {
161                DistributedJobSerializer.ToWriter(distributedJob, binaryWriter);
162            }
163
164            return memoryStream.ToArray();
165        }
166
167        private ICollection<DistributedJob> ByteArrayToJobList(byte[] rawData)
168        {
169            var distributedJobList = new List<DistributedJob>();
170            var binaryReader = new BinaryReader(new MemoryStream(rawData));
171
172            var numberOfJobs = binaryReader.ReadInt32();
173            for (var i = 0; i < numberOfJobs; i++)
174            {
175                distributedJobList.Add(DistributedJobSerializer.FromReader(binaryReader));
176            }
177
178            return distributedJobList;
179        }
180    }
181}
Note: See TracBrowser for help on using the repository browser.