Changeset 1091 for trunk/CrypPluginBase


Ignore:
Timestamp:
Jan 19, 2010, 1:15:25 PM (12 years ago)
Author:
Matthäus Wander
Message:

finished CStreamWriter and Reader

Location:
trunk/CrypPluginBase/IO
Files:
2 edited

Legend:

Unmodified
Added
Removed
  • trunk/CrypPluginBase/IO/CStreamReader.cs

    r1085 r1091  
    2121using System.IO;
    2222using System.Threading;
     23using System.Diagnostics;
    2324
    2425namespace Cryptool.PluginBase.IO
    2526{
    26     public class CStreamReader : Stream
     27    /// <summary>
     28    /// Read from a corresponding CStreamWriter.
     29    /// </summary>
     30    public class CStream : Stream, IDisposable
    2731    {
     32        #region Private fields and constructors
     33
    2834        private readonly CStreamWriter _writer;
    2935
    3036        private FileStream _readStream;
    31         private int _readPtr = 0;
    32 
    33         public CStreamReader(CStreamWriter writer)
     37        private int _readPtr;
     38
     39        private bool _disposed;
     40
     41        public CStream(CStreamWriter writer)
    3442        {
    3543            _writer = writer;
     44            _writer.ShutdownEvent += shutdownHandler;
     45            _writer.SwapEvent += swapHandler;
    3646
    3747            if (_writer.IsSwapped)
    3848            {
    39                 SwapHandler();
    40             }
    41             else
    42             {
    43                 _writer.SwitchToSwapEvent += SwapHandler;
    44             }
    45         }
     49                swapHandler();
     50            }
     51        }
     52
     53        ~CStream()
     54        {
     55            Dispose(false);
     56        }
     57
     58        #endregion
     59
     60        #region Public properties
    4661
    4762        public override bool CanRead
     
    6075        }
    6176
    62         public override void Flush()
    63         {
    64             throw new NotSupportedException();
     77        public bool IsSwapped
     78        {
     79            get
     80            {
     81                return _readStream != null;
     82            }
    6583        }
    6684
     
    82100                    return _readPtr;
    83101            }
     102
     103            // Seeking currently not supported.
    84104            set
    85105            {
     
    88108        }
    89109
    90         private int availableRead()
    91         {
    92             long avail = _writer.Position - (IsSwapped ? _readStream.Position : _readPtr);
    93             return (int)Math.Min(int.MaxValue, avail);
    94         }
    95 
    96         /// <summary>
    97         /// Convenience method for Read(byte[], int, int)
     110        #endregion
     111
     112        #region Public methods
     113
     114        public override void Close()
     115        {
     116            Dispose();
     117        }
     118
     119        public new void Dispose()
     120        {
     121            if (!_disposed)
     122            {
     123                Dispose(true);
     124                GC.SuppressFinalize(this);
     125            }
     126        }
     127
     128        public override void Flush()
     129        {
     130            throw new NotSupportedException();
     131        }
     132
     133        /// <summary>
     134        /// Convenience method for Read(byte[] buf, 0, buf.Length)
    98135        /// </summary>
    99136        /// <param name="buffer"></param>
     
    105142
    106143        /// <summary>
    107         /// POSIX-like read: Reads 1 to count amount of bytes into given buffer.
    108         /// Blocks until at least 1 byte has been read. Does not guarantee to read the requested amount of data.
     144        /// Read POSIX-like 1 to count amount of bytes into given byte array.
     145        /// Blocks until at least 1 byte has been read or underlying stream has been closed.
     146        /// Does not guarantee to read the requested amount of data, can read less.
    109147        /// </summary>
    110148        /// <param name="buffer"></param>
    111149        /// <param name="offset"></param>
    112150        /// <param name="count"></param>
    113         /// <returns>amount of bytes that has been read into buffer</returns>
     151        /// <returns>amount of bytes that has been read into buffer or 0 if EOF</returns>
    114152        public override int Read(byte[] buffer, int offset, int count)
    115153        {
    116154            lock (_writer.InternalMonitor)
    117155            {
    118                 while (availableRead() < 1)
     156                int available;
     157
     158                while ((available = availableRead()) < 1)
    119159                {
     160                    if (_writer.IsClosed)
     161                        return 0; // EOF
     162
    120163                    Monitor.Wait(_writer.InternalMonitor);
    121164                }
    122165
    123                 int readAttempt = Math.Min(availableRead(), count);
     166                int readAttempt = Math.Min(available, count);
    124167
    125168                if (IsSwapped)
    126169                {
    127170                    // MUST NOT block, otherwise we're potentially deadlocked
     171                    Debug.Assert(_writer.Length - _readStream.Position > 0);
    128172                    return _readStream.Read(buffer, offset, readAttempt);
    129173                }
     
    138182        }
    139183
     184        /// <summary>
     185        /// Convenience method for Read (non-POSIX): block until array is full or EOF occurs.
     186        /// </summary>
     187        public int ReadFully(byte[] buffer)
     188        {
     189            return ReadFully(buffer, 0, buffer.Length);
     190        }
     191
     192        /// <summary>
     193        /// Convenience method for Read (non-POSIX): block until required amount of data has
     194        /// been retrieved or EOF occurs.
     195        /// </summary>
    140196        public int ReadFully(byte[] buffer, int offset, int count)
    141197        {
    142             throw new NotSupportedException();
    143         }
    144 
    145         // seeking is currently not supported
     198            int readSum = 0;
     199            while (readSum < count)
     200            {
     201                int read = Read(buffer, offset, (count - readSum));
     202               
     203                if (read == 0) // EOF
     204                    return readSum;
     205
     206                readSum += read;
     207            }
     208
     209            return readSum;
     210        }
     211
     212        /// <summary>
     213        /// Seeking is currently not supported
     214        /// </summary>
    146215        public override long Seek(long offset, SeekOrigin origin)
    147216        {
     
    154223        }
    155224
     225        /// <summary>
     226        /// Reader can't write.
     227        /// </summary>
    156228        public override void Write(byte[] buffer, int offset, int count)
    157229        {
     
    159231        }
    160232
    161         public bool IsSwapped
    162         {
    163             get
    164             {
    165                 return _readStream != null;
    166             }
    167         }
    168 
    169         private void SwapHandler()
     233        #endregion
     234
     235        #region Private/protected methods
     236
     237        private int availableRead()
     238        {
     239            long avail = _writer.Position - (IsSwapped ? _readStream.Position : _readPtr);
     240            return (int)Math.Min(int.MaxValue, avail);
     241        }
     242
     243        /// <summary>
     244        /// Switch from membuff to swapfile
     245        /// </summary>
     246        private void swapHandler()
    170247        {
    171248            _readStream = new FileStream(_writer.FilePath, FileMode.Open, FileAccess.Read, FileShare.ReadWrite);
     
    175252            }
    176253        }
     254
     255        /// <summary>
     256        /// Writer shutting down, release file.
     257        /// </summary>
     258        private void shutdownHandler()
     259        {
     260            if (IsSwapped)
     261            {
     262                _readStream.Close();
     263            }
     264        }
     265
     266        protected override void Dispose(bool disposing)
     267        {
     268            if (disposing)
     269            {
     270                base.Dispose(disposing);
     271            }
     272
     273            if (IsSwapped)
     274            {
     275                _readStream.Close();
     276                _readStream = null;
     277            }
     278
     279            _writer.ShutdownEvent -= shutdownHandler;
     280            _writer.SwapEvent -= swapHandler;
     281
     282            _disposed = true;
     283        }
     284
     285        #endregion
    177286    }
    178287}
  • trunk/CrypPluginBase/IO/CStreamWriter.cs

    r1085 r1091  
    2121using System.IO;
    2222using System.Threading;
     23using System.Diagnostics;
     24using System.Runtime.ConstrainedExecution;
    2325
    2426namespace Cryptool.PluginBase.IO
    2527{
    26     public class CStreamWriter : Stream
     28    /// <summary>
     29    /// Create a stream to pass data with arbitrary size to another CT2 plugin.
     30    /// The stream is internally backed by a non-cyclic memory buffer and switches automagically to a
     31    /// temporary file if the membuff exceeds a certain size. Please note that the buffer does not
     32    /// forget old data, therefore you can derive an arbitary number of stream readers at any time.
     33    ///
     34    /// You MAY CreateReader() and pass it even if the writing has not been finished yet.
     35    /// You SHOULD Flush() the stream when you're writing large data amounts and expect the readers
     36    /// to perform intermediate processing before writing has been finished.
     37    /// You MUST Close() the stream when you're finished with writing, otherwise the reader will block
     38    /// and wait for more data infinitely.
     39    ///
     40    /// You SHOULD Dispose() the stream when you're done using it (or use the C# "using" keyword) in
     41    /// order to remove the temporary swapfile, however if you forget to, the GC finalizer will clean
     42    /// up for you.
     43    /// </public>
     44    public class CStreamWriter : Stream, IDisposable
    2745    {
    2846        #region Private fields and constructors
    2947
    3048        private readonly object _monitor;
    31 
     49        private bool _closed;
     50        private bool _disposed;
     51
     52        // membuff
    3253        private byte[] _buffer;
     54        private int _bufPtr;
     55
     56        // swapfile
    3357        private FileStream _writeStream;
    34         private string _writePath;
    35         private int _writePtr;
    36 
    37         // Need length information of closed file streams.
    38         private bool _closed;
    39         private long _closedLength;
     58        private string _filePath;
     59        private long _closedFileLength;
    4060
    4161        /// <summary>
     
    5878        ~CStreamWriter()
    5979        {
    60             if (IsSwapped)
    61             {
    62                 _writeStream.Close();
    63                 File.Delete(_writePath);
    64             }
     80            Dispose(false);
    6581        }
    6682
     
    85101
    86102        /// <summary>
     103        /// File path of swapfile (if any).
     104        /// </summary>
     105        public string FilePath
     106        {
     107            get { return _filePath; }
     108        }
     109
     110        /// <summary>
    87111        /// Returns whether the underlying buffer is swapped out to filesystem or not.
    88112        /// </summary>
     
    101125                if (IsSwapped)
    102126                {
    103                     return _closed ? _closedLength : _writeStream.Length;
     127                    return _closed ? _closedFileLength : _writeStream.Length;
    104128                }
    105129                else
    106130                {
    107                     return _writePtr;
     131                    return _bufPtr;
    108132                }
    109133            }
     
    116140                if (IsSwapped)
    117141                {
    118                     return _closed ? _closedLength : _writeStream.Position;
     142                    return _closed ? _closedFileLength : _writeStream.Position;
    119143                }
    120144                else
    121145                {
    122                     return _writePtr;
    123                 }
    124             }
    125 
    126             // can not seek
     146                    return _bufPtr;
     147                }
     148            }
     149
     150            // writer can not seek
    127151            set { throw new NotSupportedException(); }
    128152        }
     
    132156        #region Public methods
    133157
     158        /// <summary>
     159        /// You MUST call Close() when you're done writing or the readers will be stuck in an infinite loop.
     160        ///
     161        /// Please note: Contrary to the API description of Stream.Close() this method DOES NOT release all
     162        /// resources associated to this CStream. Call Dispose() to release resources or let the GC call
     163        /// the finalizer.
     164        /// </summary>
    134165        public override void Close()
    135166        {
     167            /*
     168             * Note 1: We're NOT following the advice of implementing cleanup code in Dispose() without
     169             * touching Close(), because we explicitly want to mark a stream as closed without disposing
     170             * the underlying mem/file buffer.
     171             *
     172             * Note 2: There is no call to base.Close() as it would suppress the finalization.
     173             */
     174
    136175            lock (_monitor)
    137176            {
    138                 base.Close();
     177                // do nothing if already closed
     178                if (_closed)
     179                    return;
     180
     181                _closed = true;
    139182
    140183                if (IsSwapped)
    141184                {
    142                     _closed = true;
    143                     _closedLength = _writeStream.Length;
     185                    _closedFileLength = _writeStream.Length;
    144186                    _writeStream.Close();
    145187                }
     
    149191        }
    150192
    151         public CStreamReader CreateReader()
    152         {
    153             return new CStreamReader(this);
     193        /// <summary>
     194        /// Create a new instance to read from this CStream.
     195        /// </summary>
     196        /// <returns></returns>
     197        public CStream CreateReader()
     198        {
     199            return new CStream(this);
     200        }
     201
     202        public void Dispose()
     203        {
     204            if (!_disposed)
     205            {
     206                Dispose(true);
     207                GC.SuppressFinalize(this);
     208            }
    154209        }
    155210
     
    169224
    170225        /// <summary>
    171         /// can not read, get CStreamReader instead
     226        /// Can not read, use CStream instead.
    172227        /// </summary>
    173228        public override int Read(byte[] buffer, int offset, int count)
     
    176231        }
    177232
    178         // can not seek
     233        /// <summary>
     234        /// Writer can not seek.
     235        /// </summary>
     236        /// <param name="offset"></param>
     237        /// <param name="origin"></param>
     238        /// <returns></returns>
    179239        public override long Seek(long offset, SeekOrigin origin)
    180240        {
     
    182242        }
    183243
    184         // can not seek
     244        /// <summary>
     245        /// Writer can not seek.
     246        /// </summary>
     247        /// <param name="value"></param>
    185248        public override void SetLength(long value)
    186249        {
     
    189252
    190253        /// <summary>
    191         /// convenience method for Write(byte[], int, int)
     254        /// Convenience method for Write(byte[] buf, 0, buf.Length)
    192255        /// </summary>
    193256        /// <param name="buf"></param>
     
    198261
    199262        /// <summary>
    200         /// write to mem/file buffer (switches transparently if required)
     263        /// Write to mem/file buffer (switches transparently if required)
    201264        /// </summary>
    202265        public override void Write(byte[] buf, int offset, int count)
    203266        {
     267            if (_closed || _disposed)
     268                throw new InvalidOperationException("Can't write, CStream already closed/disposed");
     269
    204270            lock(_monitor)
    205271            {
    206272                if (!IsSwapped && hasMemorySpace(count))
    207273                {
    208                     Array.Copy(buf, offset, _buffer, _writePtr, count);
    209                     _writePtr += count;
     274                    Array.Copy(buf, offset, _buffer, _bufPtr, count);
     275                    _bufPtr += count;
    210276                }
    211277                else
     
    214280                    {
    215281                        createSwapFile();
    216                         _writeStream.Write(_buffer, 0, _writePtr);
    217                         _writeStream.Flush(); // required before announcing swap event
     282                        _writeStream.Write(_buffer, 0, _bufPtr);
     283                        _writeStream.Flush(); // ensure reader can seek before announcing swap event
    218284                        _buffer = null;
    219285
    220                         if (SwitchToSwapEvent != null)
    221                             SwitchToSwapEvent();
     286                        if (SwapEvent != null)
     287                            SwapEvent();
    222288                    }
    223289
     
    231297        #endregion
    232298
    233         #region Private methods
     299        #region Private/protected methods
     300
     301        protected override void Dispose(bool disposing)
     302        {
     303            if (disposing)
     304            {
     305                base.Dispose(disposing);
     306            }
     307
     308            if (IsSwapped)
     309            {
     310                // Force readers to close file.
     311                if (ShutdownEvent != null)
     312                    ShutdownEvent();
     313
     314                // Remove swapfile
     315                _writeStream.Close();
     316                _writeStream = null;
     317                File.Delete(_filePath);
     318            }
     319
     320            ShutdownEvent = null;
     321            SwapEvent = null;
     322
     323            _disposed = true;
     324        }
    234325
    235326        private bool hasMemorySpace(int count)
    236327        {
    237             return _writePtr + count <= _buffer.Length;
     328            return _bufPtr + count <= _buffer.Length;
    238329        }
    239330
    240331        private void createSwapFile()
    241332        {
    242             _writePath = DirectoryHelper.GetNewTempFilePath();
    243             _writeStream = new FileStream(_writePath, FileMode.CreateNew, FileAccess.ReadWrite, FileShare.Read);
     333            _filePath = DirectoryHelper.GetNewTempFilePath();
     334            _writeStream = new FileStream(_filePath, FileMode.CreateNew, FileAccess.ReadWrite, FileShare.Read);
    244335        }
    245336
     
    253344        }
    254345
    255         internal delegate void UseSwapFileHandler();
    256 
    257         internal event UseSwapFileHandler SwitchToSwapEvent;
     346        internal delegate void ReaderCallback();
     347
     348        internal event ReaderCallback SwapEvent;
     349
     350        internal event ReaderCallback ShutdownEvent;
    258351
    259352        internal byte[] MemBuff
     
    262355        }
    263356
    264         internal string FilePath
    265         {
    266             get { return _writePath; }
    267         }
    268 
    269         #endregion
     357        internal bool IsClosed
     358        {
     359            get { return _closed; }
     360        }
     361
     362        #endregion
     363
    270364    }
    271365}
Note: See TracChangeset for help on using the changeset viewer.