source: trunk/CrypPlugins/PeerToPeerJobs/JobMessages.cs @ 1348

Last change on this file since 1348 was 1348, checked in by arnold, 12 years ago

P2PManager/Worker: Manager sends a "no more jobs left" message to the requesting worker or after receiving the last job result to all active registered workers. After receiving this message, the workers stop their "waiting for new jobs" timer, which sends "free worker" messages to the Manager periodically.
For future use: After a worker has received the "no more jobs left" message from the Manager, it should register with a new Manager to compute more jobs.

File size: 15.5 KB
Line 
1/* Copyright 2010 Team CrypTool (Christian Arnold), Uni Duisburg-Essen
2
3   Licensed under the Apache License, Version 2.0 (the "License");
4   you may not use this file except in compliance with the License.
5   You may obtain a copy of the License at
6
7       http://www.apache.org/licenses/LICENSE-2.0
8
9   Unless required by applicable law or agreed to in writing, software
10   distributed under the License is distributed on an "AS IS" BASIS,
11   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12   See the License for the specific language governing permissions and
13   limitations under the License.
14*/
15
16using System;
17using System.Collections.Generic;
18using System.Linq;
19using System.Text;
20using Cryptool.PluginBase.Miscellaneous;
21
22namespace Cryptool.Plugins.PeerToPeer.Jobs
23{
24    /// <summary>
25    /// use this ENUM only in the context with JobMessages!
26    /// </summary>
27    public enum MessageJobType
28    {
29        /// <summary>
30        /// indicates that this message contains a JobAccepted/JobDeclined Information
31        /// </summary>
32        JobAcceptanceInfo = 200,
33        /// <summary>
34        /// indicates that this message contains a JobResult
35        /// </summary>
36        JobResult = 201,
37        /// <summary>
38        /// indicates, that it has capacities to process a new job (use it only
39        /// for messages FROM the P2PJobAdmin TO the Manager)
40        /// </summary>
41        Free = 202,
42        /// <summary>
43        /// indicates that this message contains a JobPart (use it only for
44        /// messages FROM the Manager TO the P2PJobAdmin)
45        /// </summary>
46        JobPart = 203,
47        /// <summary>
48        /// Manager should send this msgType to all requesting
49        /// workers, when it has no more jobs left
50        /// </summary>
51        NoMoreJobsLeft = 204
52    }
53
54    public static class JobMessages
55    {
56        #region Detection and Type-Request of JobMessages
57
58        /// <summary>
59        /// returns true, when the first byte indicates, that this is a JobMessage
60        /// </summary>
61        /// <param name="firstByte">first byte of a message</param>
62        /// <returns></returns>
63        public static bool IsJobMessageType(byte firstByte)
64        {
65            foreach (MessageJobType msgJobType in Enum.GetValues(typeof(MessageJobType)))
66            {
67                if ((MessageJobType)firstByte == msgJobType)
68                    return true;
69            }
70            return false;
71        }
72
73        /// <summary>
74        /// use this method only, when you are sure, that the first byte indicates
75        /// a JobMessage (use IsJobMessageType first), then it will return the
76        /// special MessageJobType
77        /// </summary>
78        /// <param name="firstByte">First byte of the message</param>
79        /// <returns></returns>
80        public static MessageJobType GetMessageJobType(byte firstByte)
81        {
82            foreach (MessageJobType msgJobType in Enum.GetValues(typeof(MessageJobType)))
83            {
84                if ((MessageJobType)firstByte == msgJobType)
85                    return msgJobType;
86            }
87            throw (new Exception("No other JobMessageTypes are available!"));
88        }
89
90        #endregion
91
92        /// <summary>
93        /// integrates the serialized JobPart into the DistributableJob-specific
94        /// communication protocol - returns a completely compatible DistributableJob-Message
95        /// </summary>
96        /// <param name="jobId">the jobId of the actual JobPart</param>
97        /// <param name="data">the already serialized JobPart-data</param>
98        /// <returns></returns>
99        public static byte[] CreateJobPartMessage(BigInteger jobId, byte[] data)
100        {
101            byte[] jobIdBytes = SerializeJobId(jobId);
102
103            byte[] result = new byte[jobIdBytes.Length + data.Length + 1];
104            result[0] = (byte)MessageJobType.JobPart;
105
106            Buffer.BlockCopy(jobIdBytes, 0, result, 1, jobIdBytes.Length);
107            Buffer.BlockCopy(data, 0, result, jobIdBytes.Length + 1, data.Length);
108
109            return result;
110        }
111
112        /// <summary>
113        /// Fetches the serialized JobPart data and the JobId of the JobPart out of
114        /// the DistributableJob-specific message. If the message doesn't fit to the
115        /// communication protocol, this method returns null, otherwise the raw serialized
116        /// JobPart data!
117        /// </summary>
118        /// <param name="data">DistributableJob-specific message, which was proved that it is a JobPartMessage before</param>
119        /// <param name="jobId">catches the JobId out of the raw byte data and returns it, too</param>
120        /// <returns></returns>
121        public static byte[] GetJobPartMessage(byte[] data, out BigInteger jobId)
122        {
123            byte[] serializedJobPart = null;
124            jobId = null;
125            if ((MessageJobType)data[0] == MessageJobType.JobPart)
126            {
127                byte[] tailBytes = new byte[data.Length - 1];
128
129                Buffer.BlockCopy(data, 1, tailBytes, 0, tailBytes.Length);
130                int bytesLeft;
131                jobId = DeserializeJobId(tailBytes, out bytesLeft);
132
133                serializedJobPart = new byte[bytesLeft];
134                Buffer.BlockCopy(tailBytes, tailBytes.Length - bytesLeft, serializedJobPart, 0, serializedJobPart.Length);
135            }
136            return serializedJobPart;
137        }
138
139        /* Byte representation of a Job Result Message:
140         * 1 Byte:  MessageJobType
141         * n Bytes: serialized job id (incl. byteLen; use DeserializeBigInt!)
142         * n Bytes: serialized job result */
143        /// <summary>
144        /// integrates the serialized JobResult into the DistributableJob-specific
145        /// communication protocol - returns a completely compatible DistributableJob-Message
146        /// </summary>
147        /// <param name="jobId">the jobId of the actual JobPart</param>
148        /// <param name="data">the already serialized JobResult-data</param>
149        /// <returns></returns>
150        public static byte[] CreateJobResultMessage(BigInteger jobId, byte[] data)
151        {
152            byte[] jobIdBytes = SerializeJobId(jobId);
153
154            byte[] result = new byte[jobIdBytes.Length + data.Length + 1];
155            result[0] = (byte)MessageJobType.JobResult;
156
157            Buffer.BlockCopy(jobIdBytes, 0, result, 1, jobIdBytes.Length);
158            Buffer.BlockCopy(data, 0, result, jobIdBytes.Length + 1, data.Length);
159
160            return result;
161        }
162
163        /// <summary>
164        /// Fetches the serialized JobResult data and the JobId of the JobResult out of
165        /// the DistributableJob-specific message. If the message doesn't fit to the
166        /// communication protocol, this method returns null, otherwise the raw serialized
167        /// JobResult data!
168        /// </summary>
169        /// <param name="data">DistributableJob-specific message, which was proved that it is a JobResultMessage before</param>
170        /// <param name="jobId">catches the JobId out of the raw byte data and returns it, too</param>
171        /// <returns></returns>
172        public static byte[] GetJobResult(byte[] data, out BigInteger jobId)
173        {
174            byte[] serializedJobResult = null;
175            jobId = null;
176            if ((MessageJobType)data[0] == MessageJobType.JobResult)
177            {
178                byte[] tailBytes = new byte[data.Length - 1];
179
180                Buffer.BlockCopy(data, 1, tailBytes, 0, tailBytes.Length);
181                int bytesLeft;
182                jobId = DeserializeJobId(tailBytes, out bytesLeft);
183                serializedJobResult = new byte[bytesLeft];
184                Buffer.BlockCopy(tailBytes, tailBytes.Length - bytesLeft, serializedJobResult, 0, serializedJobResult.Length);
185            }
186            return serializedJobResult;
187        }
188
189        /* Byte representation of a job acceptance Message:
190         * 1 Byte:  MessageJobType
191         * n Bytes: bool JobAcceptance
192         * n Bytes: serialized Job Id (incl. length, use DeserializeJobId!)*/
193        /// <summary>
194        /// creates a DistributableJob-specific message, which contains information
195        /// about the job acceptance (Accepted or Declined)
196        /// </summary>
197        /// <param name="jobId">the jobId of the actual JobPart</param>
198        /// <param name="jobAccepted">true, when JobPart was accepted by the P2PJobAdmin, otherwise false</param>
199        /// <returns></returns>
200        public static byte[] CreateJobAcceptanceMessage(BigInteger jobId, bool jobAccepted)
201        {
202            byte[] serializedJobId = SerializeJobId(jobId);
203
204            byte jobAcceptedByte = Convert.ToByte(jobAccepted);
205
206            byte[] serializedData = new byte[serializedJobId.Length + 2];
207            serializedData[0] = (byte)MessageJobType.JobAcceptanceInfo;
208            serializedData[1] = jobAcceptedByte;
209            Buffer.BlockCopy(serializedJobId, 0, serializedData, 2, serializedJobId.Length);
210
211            return serializedData;
212        }
213
214        /// <summary>
215        /// Fetches the JobAcceptance-Result out of the DistributableJob-specific
216        /// message. Returns true, if JobPart was accepted by the P2PJobAdmin,
217        /// otherwise false. Additionally it returns the JobId of the accepted/declined
218        /// JobPart.
219        /// </summary>
220        /// <param name="data">DistributableJob-specific message</param>
221        /// <param name="jobId">catches the JobId out of the raw byte data and returns it, too</param>
222        /// <returns></returns>
223        public static bool GetJobAcceptanceMessage(byte[] data, out BigInteger jobId)
224        {
225            bool result;
226            if ((MessageJobType)data[0] == MessageJobType.JobAcceptanceInfo)
227            {
228                result = Convert.ToBoolean(data[1]);
229                byte[] jobIdBytes = new byte[data.Length - 2];
230                Buffer.BlockCopy(data, 2, jobIdBytes, 0, jobIdBytes.Length);
231                int neverMind;
232                jobId = DeserializeJobId(jobIdBytes, out neverMind);
233            }
234            else
235            {
236                throw (new Exception("byte[] representation wasn't a JobAcceptance Message!"));
237            }
238            return result;
239        }
240
241        /// <summary>
242        /// When a P2PJobAdmin is ready to get a new JobPart for processing,
243        /// it have to send a "Free Worker"-Status-Message. This method returns
244        /// a DistributableJob-specific message.
245        /// </summary>
246        /// <param name="free">true, if worker is ready for processing new, incoming JobParts.</param>
247        /// <returns></returns>
248        public static byte[] CreateFreeWorkerStatusMessage(bool free)
249        {
250            byte[] retValue = new byte[2];
251            retValue[0] = (byte)MessageJobType.Free;
252            if (free)
253                retValue[1] = 1;
254            else
255                retValue[1] = 0;
256            return retValue;
257        }
258
259        /// <summary>
260        /// When a P2PJobAdmin is ready for processing new incoming JobParts
261        /// it sends a "Free Worker"-Status-Message. This method returns true
262        /// when Worker is ready (so send a new JobPart to it), otherwise false
263        /// </summary>
264        /// <param name="msg">the whole, DistributableJob-specific message</param>
265        /// <returns></returns>
266        public static bool GetFreeWorkerStatusMessage(byte[] msg)
267        {
268            bool retValue = false;
269            if ((MessageJobType)msg[0] == MessageJobType.Free && msg.Length == 2)
270            {
271                if (msg[1] == 0)
272                    retValue = false;
273                else
274                    retValue = true;
275            }
276            return retValue;
277        }
278
279        // TODO: Create NoMoreJobsLeft Msg
280
281        /// <summary>
282        /// When the Manager has no more jobs left, it should send a "no more
283        /// jobs left" message to all job-requesting Workers.
284        /// </summary>
285        /// <returns>serialized "no more jobs left" message</returns>
286        public static byte[] CreateNoMoreJobsLeftMessage()
287        {
288            byte[] retValue = new byte[2];
289            retValue[0] = (byte)MessageJobType.NoMoreJobsLeft;
290            retValue[1] = 1;
291            return retValue;
292        }
293
294        /// <summary>
295        /// Only workers can receive this message type. Manager sent
296        /// this Msg, when it has no more jobs left to allocate.
297        /// </summary>
298        /// <param name="msg"></param>
299        /// <returns></returns>
300        public static bool GetNoMoreJobsLeftMessage(byte[] msg)
301        {
302            bool retValue = false;
303            if ((MessageJobType)msg[0] == MessageJobType.NoMoreJobsLeft && msg.Length == 2)
304            {
305                if (msg[1] == 1)
306                    retValue = true;
307            }
308            return retValue;
309        }
310
311        #region (De-)Serialization of JobId
312
313        public static byte[] SerializeJobId(BigInteger jobId)
314        {
315            byte[] resultByte = null;
316            if (jobId != null)
317            {
318                // Note, there is a Bug in BigInt: BigInt b = 256; => b.dataLength = 1 -- it should be 2!
319                // As a workarround rely on getBytes().Length (the null bytes for the BigInt 0 should be fixed now)
320                byte[] jobIdBytes = jobId.getBytes();
321                byte[] jobIdBytesLen = BitConverter.GetBytes(jobIdBytes.Length);
322             
323                resultByte = new byte[jobIdBytes.Length + jobIdBytesLen.Length];
324               
325                Buffer.BlockCopy(jobIdBytesLen, 0, resultByte, 0, jobIdBytesLen.Length);
326                Buffer.BlockCopy(jobIdBytes, 0, resultByte, jobIdBytesLen.Length, jobIdBytes.Length);
327            }
328            return resultByte;
329        }
330
331        /// <summary>
332        /// Deserialized a jobId from any byte[] array. Requirement: The byte[] has
333        /// to start with a four byte (int32) JobId-Length-Information and have than enough
334        /// bytes left to deserialize the BigInteger Value. If any bytes left after
335        /// deserializing the JobId, the out value will specify this amount.
336        /// </summary>
337        /// <param name="serializedJobId">The byte[] has to start with a four bytes (int32)
338        /// JobId-Length-Information and have than enough bytes left to deserialize
339        /// the BigInteger Value</param>
340        /// <param name="bytesLeft">If any bytes left after
341        /// deserializing the JobId, the out value will specify this amount.</param>
342        /// <returns></returns>
343        public static BigInteger DeserializeJobId(byte[] serializedJobId, out int bytesLeft)
344        {
345            // byte length of Int32
346            int iInt32 = 4;
347
348            BigInteger result = null;
349            bytesLeft = serializedJobId.Length;
350            if (serializedJobId != null && serializedJobId.Length > iInt32)
351            {
352                byte[] bigIntByteLen = new byte[iInt32];
353                Buffer.BlockCopy(serializedJobId, 0, bigIntByteLen, 0, bigIntByteLen.Length);
354                int bigIntLen = BitConverter.ToInt32(bigIntByteLen, 0);
355                byte[] bigIntByte = new byte[bigIntLen];
356                Buffer.BlockCopy(serializedJobId, bigIntByteLen.Length, bigIntByte, 0, bigIntByte.Length);
357                result = new BigInteger(bigIntByte, bigIntByte.Length);
358                bytesLeft = serializedJobId.Length - iInt32 - bigIntByte.Length;
359            }
360            return result;
361        }
362#endregion
363    }
364}
Note: See TracBrowser for help on using the repository browser.