source: trunk/CrypPlugins/PeerToPeerPublisher/SubscribersManagement.cs @ 1137

Last change on this file since 1137 was 1137, checked in by arnold, 12 years ago

Completely redesigned Manager-JobAdmin-Worker-infrastructure to distribute Jobs with a Peer-to-Peer infrastructure to remote CT2-Workspaces.

To test this infrastructure, open two instances of CT: load P2P_Manager_NEW_DES.cte in one instance and P2P_Worker_NEW.cte in the other.
HINT: Working with remote peers is not always possible, because the so-called "SuperNode", which is necessary for relaying, sometimes goes down. Testing this infrastructure on different computers in the same network should always work.

File size: 10.2 KB
Line 
1using System;
2using System.Collections.Generic;
3using System.Linq;
4using System.Text;
5using System.Collections;
6using System.IO;
7using Cryptool.PluginBase.Control;
8
namespace Cryptool.Plugins.PeerToPeer
{
    /// <summary>
    /// Keeps a timestamped list of subscribers/workers and removes those that
    /// have not been updated within the expiration time. A subscriber that is
    /// found expired for the first time is put on a "second chance" list; if it
    /// is still expired on the next vitality check, it is removed.
    /// All access to the internal lists is serialized through one private lock.
    /// </summary>
    public class SubscriberManagement
    {
        public delegate void SubscriberRemoved(PeerId peerId);
        /// <summary>
        /// Raised after a subscriber/worker has been removed from the management lists.
        /// The event is raised outside of the internal lock.
        /// </summary>
        public event SubscriberRemoved OnSubscriberRemoved;

        /// <summary>
        /// Milliseconds added to the configured expiration time to tolerate network latency.
        /// </summary>
        private const long LatencyToleranceInMilliseconds = 3000;

        /// <summary>
        /// Guards checkList and secondChanceList. A dedicated object is used so that
        /// locking never depends on the state of the collections themselves.
        /// </summary>
        private readonly object syncRoot = new object();
        /// <summary>
        /// contains all active subscribers together with the (UTC) time of their last update
        /// </summary>
        private readonly Dictionary<PeerId, DateTime> checkList;
        /// <summary>
        /// when a peer is in this list, it will be deleted on the next vitality check
        /// </summary>
        private readonly HashSet<PeerId> secondChanceList;

        private long expirationTime;
        /// <summary>
        /// Timespan (in milliseconds) after which a subscriber gets marked as "second chance"
        /// and, if it expires again, is removed from the subscriber list.
        /// The getter returns the stored value plus 3 seconds of latency tolerance; the setter
        /// stores the value as given. Reading the property and writing the result back
        /// therefore increases the effective time by 3 seconds.
        /// </summary>
        public long ExpirationTime
        {
            get { return this.expirationTime + LatencyToleranceInMilliseconds; }
            set { this.expirationTime = value; }
        }

        /// <summary>
        /// Creates an empty subscriber management.
        /// </summary>
        /// <param name="expirationTime">expiration Time of a subscriber in milliseconds</param>
        public SubscriberManagement(long expirationTime)
        {
            this.checkList = new Dictionary<PeerId, DateTime>();
            this.secondChanceList = new HashSet<PeerId>();
            this.ExpirationTime = expirationTime;
        }

        /// <summary>
        /// Add a subscriber to the subscriber list if it doesn't already exist
        /// </summary>
        /// <param name="subscriberId">ID of the Subscriber</param>
        /// <returns>true if subscriber wasn't in List and is added, otherwise false</returns>
        public virtual bool Add(PeerId subscriberId)
        {
            // existence check and insertion have to happen under the same lock,
            // otherwise two threads could both pass the check and the second Add would throw
            lock (this.syncRoot)
            {
                if (this.checkList.ContainsKey(subscriberId))
                {
                    return false;
                }
                this.checkList.Add(subscriberId, DateTime.UtcNow);
                return true;
            }
        }

        /// <summary>
        /// Updates the Timestamp of the given subscriber if it exists and
        /// takes it off the second chance list.
        /// </summary>
        /// <param name="subscriberId">ID of the subscriber which has shown a sign of life</param>
        /// <returns>true if the subscriber is known and was updated, otherwise false</returns>
        public bool Update(PeerId subscriberId)
        {
            lock (this.syncRoot)
            {
                if (!this.checkList.ContainsKey(subscriberId))
                {
                    return false;
                }
                this.checkList[subscriberId] = DateTime.UtcNow;
                // remove subscriber from this list, because it's updated now and hence alive!
                this.secondChanceList.Remove(subscriberId);
                return true;
            }
        }

        /// <summary>
        /// Removes Subscriber/Worker from all management lists and raises
        /// <see cref="OnSubscriberRemoved"/> if it was an active subscriber.
        /// </summary>
        /// <param name="subscriberId">ID of the removed subscriber/worker</param>
        /// <returns>true if the subscriber was in the active list and has been removed, otherwise false</returns>
        public virtual bool Remove(PeerId subscriberId)
        {
            bool removed;
            lock (this.syncRoot)
            {
                this.secondChanceList.Remove(subscriberId);
                removed = this.checkList.Remove(subscriberId);
            }

            if (removed)
            {
                // copy the delegate so a concurrent unsubscribe cannot null it between check and call;
                // invoking outside the lock keeps handlers from blocking other callers
                SubscriberRemoved handler = this.OnSubscriberRemoved;
                if (handler != null)
                {
                    handler(subscriberId);
                }
            }
            return removed;
        }

        /// <summary>
        /// Removes all Subscribers which were already on the second chance list and are
        /// still outdated (expiration time is considered).
        /// The recently outdated subscribers will be added to the second chance list.
        /// </summary>
        /// <returns>a copy of the second chance list after the check, i.e. all recently outdated subscribers</returns>
        public List<PeerId> CheckVitality()
        {
            DateTime now = DateTime.UtcNow;
            long timeout = this.ExpirationTime;
            List<PeerId> expiredTwice = new List<PeerId>();

            lock (this.syncRoot)
            {
                foreach (KeyValuePair<PeerId, DateTime> entry in this.checkList)
                {
                    if (now <= entry.Value.AddMilliseconds(timeout))
                    {
                        continue; // still alive
                    }

                    // HashSet.Add returns false when the ID was already on the second chance
                    // list --> it expired twice and has to be removed
                    if (!this.secondChanceList.Add(entry.Key))
                    {
                        expiredTwice.Add(entry.Key);
                    }
                }
            }

            // removal happens after the enumeration, because Remove modifies checkList
            foreach (PeerId expiredSubscriber in expiredTwice)
            {
                Remove(expiredSubscriber);
            }

            lock (this.syncRoot)
            {
                return this.secondChanceList.ToList<PeerId>();
            }
        }

        /// <summary>
        /// Returns a copy of the IDs of all currently managed subscribers.
        /// </summary>
        /// <returns>list of all subscriber IDs in the active list</returns>
        public List<PeerId> GetAllSubscribers()
        {
            lock (this.syncRoot)
            {
                return this.checkList.Keys.ToList<PeerId>();
            }
        }

        /// <summary>
        /// Empties all management lists. No <see cref="OnSubscriberRemoved"/> events are raised.
        /// </summary>
        public virtual void Dispose()
        {
            lock (this.syncRoot)
            {
                this.checkList.Clear();
                this.secondChanceList.Clear();
            }
        }

        /// <summary>
        /// Serializes only the active subscribers list. Neither the second chance list
        /// nor the timestamps are written, because after deserializing the availability
        /// information is obsolete and has to be checked again.
        /// Layout: one byte with the number of IDs, then for every ID one length byte
        /// followed by the ID bytes.
        /// </summary>
        /// <returns>the serialized subscriber IDs, or null if there are no subscribers</returns>
        /// <exception cref="OverflowException">
        /// thrown by Convert.ToByte when there are more than 255 subscribers or an ID is
        /// longer than 255 bytes, because counts and lengths are stored in a single byte
        /// </exception>
        public virtual byte[] Serialize()
        {
            List<PeerId> activePeers;
            lock (this.syncRoot)
            {
                if (this.checkList.Count == 0)
                {
                    return null;
                }
                activePeers = this.checkList.Keys.ToList<PeerId>();
            }

            using (MemoryStream memStream = new MemoryStream())
            {
                // first write dataset count
                memStream.WriteByte(Convert.ToByte(activePeers.Count));
                // then write every peer id as a byte array - first byte is the length of the id
                foreach (PeerId pid in activePeers)
                {
                    byte[] idBytes = pid.ToByteArray();
                    memStream.WriteByte(Convert.ToByte(idBytes.Length));
                    memStream.Write(idBytes, 0, idBytes.Length);
                }
                return memStream.ToArray();
            }
        }

        /// <summary>
        /// Deserializes a Subscriber/Worker list and reconstructs the PeerIds. Returns
        /// the deserialized PeerIds, so you can ping the peers to check whether they are
        /// still available. The internal lists of this instance are not modified.
        /// </summary>
        /// <param name="serializedPeerIds">the already serialized peerId byte-array</param>
        /// <param name="p2pControl">a reference to the p2pControl to convert deserialized byte arrays to valid PeerIds</param>
        /// <returns>the reconstructed PeerIds in the order they were serialized</returns>
        /// <exception cref="ArgumentNullException">p2pControl is null</exception>
        /// <exception cref="Exception">
        /// the input is null or shorter than two bytes, or the data is truncated or otherwise
        /// malformed (the original error is attached as inner exception)
        /// </exception>
        public virtual List<PeerId> Deserialize(byte[] serializedPeerIds, ref IP2PControl p2pControl)
        {
            if (serializedPeerIds == null || serializedPeerIds.Length < 2)
            {
                throw (new Exception("Invalid byte[] input - deserialization not possible"));
            }
            if (p2pControl == null)
            {
                throw new ArgumentNullException("p2pControl");
            }

            const string malformedInput = "Deserialization process of the byte[] was canceled, because byte[] didn't achieve to the conventions.";
            List<PeerId> deserializedPeers = new List<PeerId>();

            try
            {
                using (MemoryStream memStream = new MemoryStream(serializedPeerIds))
                {
                    // cannot be -1, because the input has at least two bytes
                    int peerIdAmount = memStream.ReadByte();
                    for (int i = 0; i < peerIdAmount; i++)
                    {
                        // ReadByte returns -1 at the end of the stream; empty IDs are invalid, too
                        int peerIdLen = memStream.ReadByte();
                        if (peerIdLen <= 0)
                        {
                            throw (new Exception(malformedInput));
                        }

                        byte[] byPeerId = new byte[peerIdLen];
                        // a short read means the announced length exceeds the remaining data
                        int bytesRead = memStream.Read(byPeerId, 0, byPeerId.Length);
                        if (bytesRead != peerIdLen)
                        {
                            throw (new Exception(malformedInput));
                        }

                        // create a new PeerId Object and add it to the list
                        deserializedPeers.Add(p2pControl.GetPeerID(byPeerId));
                    }
                }
            }
            catch (Exception ex)
            {
                throw new Exception("Deserialization process of byte[] was canceled, because byte[] didn't achieve to the conventions.", ex);
            }
            return deserializedPeers;
        }
    }
}
Note: See TracBrowser for help on using the repository browser.