source: trunk/CrypPlugins/P2PEditor/Distributed/JobListManager.cs @ 1727

Last change on this file since 1727 was 1727, checked in by Paul Lelgemann, 11 years ago

+ P2PEditor can display the status of jobs, if available; Participating displays overlay while loading workspace data
+ KeySearcher can upload status for P2PEditor display

File size: 6.6 KB
Line 
1using System;
2using System.Collections.Generic;
3using System.IO;
4using Cryptool.P2P;
5using Cryptool.P2P.Internal;
6using Cryptool.PluginBase;
7
8namespace Cryptool.P2PEditor.Distributed
9{
10    public class JobListManager
11    {
12        private const string JoblistKey = "Cryptool.P2PEditor.JobList";
13        private const string WorkspaceKeyPrefix = "Workspace";
14        private readonly P2PEditor p2PEditor;
15
16        public JobListManager(P2PEditor p2PEditor)
17        {
18            this.p2PEditor = p2PEditor;
19        }
20
21        public ICollection<DistributedJob> JobList()
22        {
23            p2PEditor.GuiLogMessage("Fetching DHT job list...", NotificationLevel.Debug);
24
25            if (!P2PManager.IsConnected)
26            {
27                p2PEditor.GuiLogMessage("P2P not connected, cannot fetch job list.", NotificationLevel.Error);
28                return new List<DistributedJob>();
29            }
30
31            var serialisedJobList = P2PManager.Retrieve(JoblistKey).Data;
32            if (serialisedJobList == null)
33            {
34                // no job list in DHT, create empty list
35                p2PEditor.GuiLogMessage("No list in DHT, creating empty list.", NotificationLevel.Debug);
36                return new List<DistributedJob>();
37            }
38
39            return ByteArrayToJobList(serialisedJobList);
40        }
41
42        public void AddDistributedJob(DistributedJob distributedJob)
43        {
44            p2PEditor.GuiLogMessage("Distributing new job...", NotificationLevel.Debug);
45
46            if (!P2PManager.IsConnected)
47            {
48                p2PEditor.GuiLogMessage("P2P not connected, cannot distribute job.", NotificationLevel.Error);
49                return;
50            }
51
52            var currentJobList = JobList();
53            currentJobList.Add(distributedJob);
54
55            var serializedJobList = JobListToByteArray(currentJobList);
56            P2PManager.Store(JoblistKey, serializedJobList);
57
58            var workspaceData = File.ReadAllBytes(distributedJob.LocalFilePath);
59            p2PEditor.GuiLogMessage(
60                "Workspace size: " + workspaceData.Length + ", storing at " + GenerateWorkspaceKey(distributedJob),
61                NotificationLevel.Debug);
62            P2PManager.Store(GenerateWorkspaceKey(distributedJob), workspaceData);
63
64            p2PEditor.GuiLogMessage("Distributed job " + distributedJob.Name, NotificationLevel.Info);
65        }
66
67        public void DeleteDistributedJob(DistributedJob distributedJobToDelete)
68        {
69            p2PEditor.GuiLogMessage("Deleting job...", NotificationLevel.Debug);
70
71            if (!P2PManager.IsConnected)
72            {
73                p2PEditor.GuiLogMessage("P2P not connected, cannot distribute job.", NotificationLevel.Error);
74                return;
75            }
76
77            var currentJobList = JobList();
78            currentJobList.Remove(distributedJobToDelete);
79
80            var serializedJobList = JobListToByteArray(currentJobList);
81            P2PManager.Store(JoblistKey, serializedJobList);
82
83            // Retrieve job first to satify versioned DHT
84            P2PManager.Retrieve(GenerateWorkspaceKey(distributedJobToDelete));
85            P2PManager.Remove(GenerateWorkspaceKey(distributedJobToDelete));
86
87            p2PEditor.GuiLogMessage("Deleted distributed job " + distributedJobToDelete.Name, NotificationLevel.Info);
88        }
89
90        public void CompleteDistributedJob(DistributedJob distributedJob)
91        {
92            distributedJob.ConvertRawWorkspaceToLocalFile(P2PManager.Retrieve(GenerateWorkspaceKey(distributedJob)).Data);
93        }
94
95        public void RetrieveDownloadCount(DistributedJob distributedJob)
96        {
97            var result = P2PManager.Retrieve(GenerateDownloadCounterKey(distributedJob));
98           
99            if (result.Status == RequestResultType.KeyNotFound)
100            {
101                distributedJob.Downloads = 0;
102                return;
103            }
104
105            var binaryReader = new BinaryReader(new MemoryStream(result.Data));
106            distributedJob.Downloads = binaryReader.ReadInt32();
107            distributedJob.LastDownload = DateTime.FromBinary(binaryReader.ReadInt64());
108        }
109
110        public void RetrieveCurrentStatus(DistributedJob distributedJob)
111        {
112            if (string.IsNullOrEmpty(distributedJob.StatusKey)) return;
113
114            var result = P2PManager.Retrieve(distributedJob.StatusKey);
115            if (result.Status != RequestResultType.Success) return;
116
117            var status = DistributedJobSerializer.StatusFromReader(new BinaryReader(new MemoryStream(result.Data)));
118            distributedJob.Status = status;
119        }
120
121        public void IncreaseDownloadCount(DistributedJob distributedJob)
122        {
123            RetrieveDownloadCount(distributedJob);
124            distributedJob.Downloads++;
125
126            var memoryStream = new MemoryStream();
127            var binaryWriter = new BinaryWriter(memoryStream);
128            binaryWriter.Write(distributedJob.Downloads);
129            binaryWriter.Write(DateTime.UtcNow.ToBinary());
130
131            P2PManager.Store(GenerateDownloadCounterKey(distributedJob), memoryStream.ToArray());
132        }
133
134        private static string GenerateWorkspaceKey(DistributedJob distributedJob)
135        {
136            return string.Format("{0}.{1}.{2}", JoblistKey, WorkspaceKeyPrefix, distributedJob.Guid);
137        }
138
139        private static string GenerateDownloadCounterKey(DistributedJob distributedJob)
140        {
141            return string.Format("{0}.{1}.{2}.{3}", JoblistKey, WorkspaceKeyPrefix, distributedJob.Guid, "downloads");
142        }
143
144        private static byte[] JobListToByteArray(ICollection<DistributedJob> distributedJobList)
145        {
146            var memoryStream = new MemoryStream();
147            var binaryWriter = new BinaryWriter(memoryStream);
148            binaryWriter.Write(distributedJobList.Count);
149
150            foreach (var distributedJob in distributedJobList)
151            {
152                DistributedJobSerializer.ToWriter(distributedJob, binaryWriter);
153            }
154
155            return memoryStream.ToArray();
156        }
157
158        private ICollection<DistributedJob> ByteArrayToJobList(byte[] rawData)
159        {
160            var distributedJobList = new List<DistributedJob>();
161            var binaryReader = new BinaryReader(new MemoryStream(rawData));
162
163            var numberOfJobs = binaryReader.ReadInt32();
164            for (var i = 0; i < numberOfJobs; i++)
165            {
166                distributedJobList.Add(DistributedJobSerializer.FromReader(binaryReader));
167            }
168
169            return distributedJobList;
170        }
171    }
172}
Note: See TracBrowser for help on using the repository browser.