source: trunk/CrypPlugins/P2PEditor/Distributed/JobListManager.cs @ 2081

Last change on this file since 2081 was 2081, checked in by Sven Rech, 11 years ago

CrypP2P tries to reconnect now when SystemLeft event occurs

File size: 6.7 KB
RevLine 
[1727]1using System;
[1665]2using System.Collections.Generic;
[1477]3using System.IO;
4using Cryptool.P2P;
[1727]5using Cryptool.P2P.Internal;
[1477]6using Cryptool.PluginBase;
7
8namespace Cryptool.P2PEditor.Distributed
9{
[1527]10    public class JobListManager
[1477]11    {
// DHT key under which the serialised list of all distributed jobs is stored.
private const string JoblistKey = "Cryptool.P2PEditor.JobList";

// Segment placed between JoblistKey and a job's Guid when per-job DHT keys are built.
private const string WorkspaceKeyPrefix = "Workspace";

// Editor that owns this manager; used in this class for GUI log output only.
private readonly P2PEditor p2PEditor;

/// <summary>
/// Creates a job list manager bound to the given editor.
/// </summary>
/// <param name="p2PEditor">Editor whose GuiLogMessage receives this manager's log messages.</param>
public JobListManager(P2PEditor p2PEditor)
{
    this.p2PEditor = p2PEditor;
}
[1477]20
/// <summary>
/// Fetches the list of distributed jobs stored in the DHT under the job list key.
/// </summary>
/// <returns>
/// The deserialised job list. A new, empty list is returned when P2P is not connected
/// or when nothing is stored under the job list key.
/// </returns>
public ICollection<DistributedJob> JobList()
{
    p2PEditor.GuiLogMessage("Fetching DHT job list...", NotificationLevel.Debug);

    if (P2PManager.IsConnected)
    {
        var rawJobList = P2PManager.Retrieve(JoblistKey).Data;
        if (rawJobList != null)
        {
            return ByteArrayToJobList(rawJobList);
        }

        // Nothing stored yet: fall through to the empty list below.
        p2PEditor.GuiLogMessage("No list in DHT, creating empty list.", NotificationLevel.Debug);
    }
    else
    {
        p2PEditor.GuiLogMessage("P2P not connected, cannot fetch job list.", NotificationLevel.Error);
    }

    return new List<DistributedJob>();
}
41
/// <summary>
/// Publishes a job: appends it to the job list stored in the DHT and stores the
/// contents of its local workspace file under the job's workspace key.
/// </summary>
/// <param name="distributedJob">
/// Job to publish. Its LocalFilePath is read from disk; its Guid and Name are used
/// for the DHT key and the log output.
/// </param>
/// <remarks>
/// Logs an error and returns without doing anything when P2P is not connected.
/// Exceptions thrown by File.ReadAllBytes (e.g. missing or unreadable file) propagate
/// to the caller; they are raised before anything is written to the DHT.
/// </remarks>
public void AddDistributedJob(DistributedJob distributedJob)
{
    p2PEditor.GuiLogMessage("Distributing new job...", NotificationLevel.Debug);

    if (!P2PManager.IsConnected)
    {
        p2PEditor.GuiLogMessage("P2P not connected, cannot distribute job.", NotificationLevel.Error);
        return;
    }

    // Read the workspace BEFORE touching the DHT. If the local file cannot be read the
    // call fails here, and the shared job list never gets an entry without a workspace.
    var workspaceData = File.ReadAllBytes(distributedJob.LocalFilePath);
    var workspaceKey = GenerateWorkspaceKey(distributedJob);

    // Read-modify-write of the shared list: fetch, append, store back.
    var currentJobList = JobList();
    currentJobList.Add(distributedJob);
    P2PManager.Store(JoblistKey, JobListToByteArray(currentJobList));

    p2PEditor.GuiLogMessage(
        "Workspace size: " + workspaceData.Length + ", storing at " + workspaceKey,
        NotificationLevel.Debug);
    P2PManager.Store(workspaceKey, workspaceData);

    p2PEditor.GuiLogMessage("Distributed job " + distributedJob.Name, NotificationLevel.Info);
}
[1527]66
/// <summary>
/// Removes a job: takes it out of the job list stored in the DHT and removes its
/// workspace entry from the DHT.
/// </summary>
/// <param name="distributedJobToDelete">Job to remove from the list and from the DHT.</param>
/// <remarks>
/// Logs an error and returns without doing anything when P2P is not connected.
/// </remarks>
public void DeleteDistributedJob(DistributedJob distributedJobToDelete)
{
    p2PEditor.GuiLogMessage("Deleting job...", NotificationLevel.Debug);

    if (!P2PManager.IsConnected)
    {
        // Message previously said "cannot distribute job" (copied from AddDistributedJob).
        p2PEditor.GuiLogMessage("P2P not connected, cannot delete job.", NotificationLevel.Error);
        return;
    }

    // Read-modify-write of the shared list: fetch, remove, store back.
    var currentJobList = JobList();
    currentJobList.Remove(distributedJobToDelete);
    P2PManager.Store(JoblistKey, JobListToByteArray(currentJobList));

    var workspaceKey = GenerateWorkspaceKey(distributedJobToDelete);

    // Retrieve job first to satisfy versioned DHT
    P2PManager.Retrieve(workspaceKey);
    P2PManager.Remove(workspaceKey);

    p2PEditor.GuiLogMessage("Deleted distributed job " + distributedJobToDelete.Name, NotificationLevel.Info);
}
89
/// <summary>
/// Downloads the job's workspace from the DHT and hands the raw bytes to the job
/// so that it can convert them into a local file.
/// </summary>
/// <param name="distributedJob">Job whose workspace is fetched under its workspace key.</param>
public void CompleteDistributedJob(DistributedJob distributedJob)
{
    var workspaceKey = GenerateWorkspaceKey(distributedJob);
    var rawWorkspace = P2PManager.Retrieve(workspaceKey).Data;

    distributedJob.ConvertRawWorkspaceToLocalFile(rawWorkspace);
}
94
/// <summary>
/// Loads the job's download counter and last-download timestamp from the DHT
/// into <paramref name="distributedJob"/>.
/// </summary>
/// <param name="distributedJob">Job whose Downloads and LastDownload are updated.</param>
/// <remarks>
/// When the counter key does not exist, Downloads is set to 0. When the request returns
/// no data, or fewer bytes than one counter record, the job is left unchanged.
/// </remarks>
public void RetrieveDownloadCount(DistributedJob distributedJob)
{
    // Record layout as written by IncreaseDownloadCount: Int32 counter, then Int64 DateTime binary.
    const int recordLength = sizeof(int) + sizeof(long);

    var result = P2PManager.Retrieve(GenerateDownloadCounterKey(distributedJob));

    if (result.Status == RequestResultType.KeyNotFound)
    {
        distributedJob.Downloads = 0;
        return;
    }

    // No payload, or a truncated one: keep the values the job already has instead of
    // failing half-way through the read with only Downloads overwritten.
    if (result.Data == null || result.Data.Length < recordLength)
    {
        return;
    }

    using (var binaryReader = new BinaryReader(new MemoryStream(result.Data)))
    {
        // Decode both fields before assigning, so the job is updated all-or-nothing.
        var downloads = binaryReader.ReadInt32();
        var lastDownload = DateTime.FromBinary(binaryReader.ReadInt64());

        distributedJob.Downloads = downloads;
        distributedJob.LastDownload = lastDownload;
    }
}
112
/// <summary>
/// Loads the job's current status from the DHT entry named by its StatusKey
/// and stores it in the job's Status property.
/// </summary>
/// <param name="distributedJob">Job whose Status is updated.</param>
/// <remarks>
/// Does nothing when the job has no StatusKey, when the request does not report
/// success, or when the request returns no data.
/// </remarks>
public void RetrieveCurrentStatus(DistributedJob distributedJob)
{
    if (string.IsNullOrEmpty(distributedJob.StatusKey))
    {
        return;
    }

    var result = P2PManager.Retrieve(distributedJob.StatusKey);
    if (result.Status != RequestResultType.Success)
    {
        return;
    }

    // Same guard as RetrieveDownloadCount: a result without data must not reach
    // the MemoryStream constructor, which rejects a null buffer.
    if (result.Data == null)
    {
        return;
    }

    using (var binaryReader = new BinaryReader(new MemoryStream(result.Data)))
    {
        distributedJob.Status = DistributedJobSerializer.StatusFromReader(binaryReader);
    }
}
123
/// <summary>
/// Increments the job's download counter and writes the new counter, together with
/// the current UTC time, to the DHT under the job's download counter key.
/// </summary>
/// <param name="distributedJob">Job whose Downloads value is refreshed, incremented and stored.</param>
public void IncreaseDownloadCount(DistributedJob distributedJob)
{
    // Refresh from the DHT first, then bump the local value.
    RetrieveDownloadCount(distributedJob);
    distributedJob.Downloads += 1;

    // Record layout: counter first, timestamp (DateTime binary form) second.
    var buffer = new MemoryStream();
    var writer = new BinaryWriter(buffer);
    writer.Write(distributedJob.Downloads);
    writer.Write(DateTime.UtcNow.ToBinary());

    var counterKey = GenerateDownloadCounterKey(distributedJob);
    P2PManager.Store(counterKey, buffer.ToArray());
}
136
/// <summary>
/// Builds the DHT key of a job's workspace: job list key, workspace prefix and
/// the job's Guid, joined with dots.
/// </summary>
/// <param name="distributedJob">Job whose Guid forms the last key segment.</param>
/// <returns>The dot-separated workspace key.</returns>
private static string GenerateWorkspaceKey(DistributedJob distributedJob)
{
    return JoblistKey + "." + WorkspaceKeyPrefix + "." + distributedJob.Guid;
}
[1727]141
/// <summary>
/// Builds the DHT key of a job's download counter: the job's workspace key
/// followed by ".downloads".
/// </summary>
/// <param name="distributedJob">Job whose Guid identifies the counter.</param>
/// <returns>The dot-separated download counter key.</returns>
private static string GenerateDownloadCounterKey(DistributedJob distributedJob)
{
    var workspaceKey = GenerateWorkspaceKey(distributedJob);
    return workspaceKey + "." + "downloads";
}
146
/// <summary>
/// Serialises a job list: an Int32 element count followed by each job as written
/// by DistributedJobSerializer.ToWriter, in enumeration order.
/// </summary>
/// <param name="distributedJobList">Jobs to serialise.</param>
/// <returns>The serialised bytes.</returns>
private static byte[] JobListToByteArray(ICollection<DistributedJob> distributedJobList)
{
    var buffer = new MemoryStream();
    var writer = new BinaryWriter(buffer);

    // Count prefix first; ByteArrayToJobList reads it to know how many jobs follow.
    writer.Write(distributedJobList.Count);
    foreach (var job in distributedJobList)
    {
        DistributedJobSerializer.ToWriter(job, writer);
    }

    var serialised = buffer.ToArray();
    return serialised;
}
160
/// <summary>
/// Deserialises a job list produced by JobListToByteArray: reads the Int32 element
/// count, then that many jobs via DistributedJobSerializer.FromReader.
/// </summary>
/// <param name="rawData">Serialised job list bytes.</param>
/// <returns>The jobs in the order they were read.</returns>
private ICollection<DistributedJob> ByteArrayToJobList(byte[] rawData)
{
    var jobs = new List<DistributedJob>();
    var reader = new BinaryReader(new MemoryStream(rawData));

    // Count down from the stored element count; a non-positive count yields an empty list.
    var remaining = reader.ReadInt32();
    while (remaining > 0)
    {
        jobs.Add(DistributedJobSerializer.FromReader(reader));
        remaining--;
    }

    return jobs;
}
[1477]174    }
175}
Note: See TracBrowser for help on using the repository browser.