using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Threading;
using Ionic.Crc;

namespace Ionic.Zlib
{
    /// <summary>
    /// A write-only stream that splits incoming data into fixed-size buffers,
    /// deflates each buffer on a thread-pool thread, and writes the compressed
    /// blocks to the underlying stream in the order the data was written.
    /// </summary>
    /// <remarks>
    /// The worker pool is created lazily on the first non-empty <see cref="Write"/>,
    /// so <see cref="MaxBufferPairs"/> and <see cref="BufferSize"/> may be changed
    /// after construction but before that first write.
    /// </remarks>
    public class ParallelDeflateOutputStream : Stream
    {
        private static readonly int IO_BUFFER_SIZE_DEFAULT = 65536;
        private static readonly int BufferPairsPerCore = 4;

        // Work items, indexed by WorkItem.index. The two queues hold indices into this list.
        private List<WorkItem> _pool;
        private Queue<int> _toWrite;   // compressed, waiting to be emitted; also used as its own lock
        private Queue<int> _toFill;    // free, available for new input

        private bool _leaveOpen;
        private bool emitting;
        private Stream _outStream;
        private int _maxBufferPairs;
        private int _bufferSize = IO_BUFFER_SIZE_DEFAULT;
        private AutoResetEvent _newlyCompressedBlob;
        private readonly object _outputLock = new object();
        private bool _isClosed;
        private bool _firstWriteDone;

        // -1 means "none"; these are set again whenever the pool is (re)initialized.
        private int _currentlyFilling = -1;
        private int _lastFilled = -1;
        private int _lastWritten = -1;
        private int _latestCompressed = -1;

        private int _Crc32;
        private CRC32 _runningCrc;
        private readonly object _latestLock = new object();
        private long _totalBytesProcessed;
        private CompressionLevel _compressLevel;

        // First failure raised by a compression worker; rethrown by Write/Flush/Close.
        private volatile Exception _pendingException;
        private bool _handlingException;
        private readonly object _eLock = new object();

        private TraceBits _DesiredTrace =
            TraceBits.EmitLock | TraceBits.EmitEnter | TraceBits.EmitBegin |
            TraceBits.EmitDone | TraceBits.EmitSkip | TraceBits.Session |
            TraceBits.Compress | TraceBits.WriteEnter | TraceBits.WriteTake;

        /// <summary>Categories of diagnostic output selectable through <c>_DesiredTrace</c>.</summary>
        [Flags]
        private enum TraceBits : uint
        {
            None = 0u,
            NotUsed1 = 1u,
            EmitLock = 2u,
            EmitEnter = 4u,
            EmitBegin = 8u,
            EmitDone = 16u,
            EmitSkip = 32u,
            EmitAll = 58u,
            Flush = 64u,
            Lifecycle = 128u,
            Session = 256u,
            Synch = 512u,
            Instance = 1024u,
            Compress = 2048u,
            Write = 4096u,
            WriteEnter = 8192u,
            WriteTake = 16384u,
            All = 4294967295u
        }

        /// <summary>Creates a stream with default level and strategy that closes <paramref name="stream"/> on Close.</summary>
        public ParallelDeflateOutputStream(Stream stream)
            : this(stream, CompressionLevel.Default, CompressionStrategy.Default, false)
        {
        }

        /// <summary>Creates a stream with the given level and default strategy that closes <paramref name="stream"/> on Close.</summary>
        public ParallelDeflateOutputStream(Stream stream, CompressionLevel level)
            : this(stream, level, CompressionStrategy.Default, false)
        {
        }

        /// <summary>Creates a stream with default level and strategy.</summary>
        public ParallelDeflateOutputStream(Stream stream, bool leaveOpen)
            : this(stream, CompressionLevel.Default, CompressionStrategy.Default, leaveOpen)
        {
        }

        /// <summary>Creates a stream with the given level and default strategy.</summary>
        public ParallelDeflateOutputStream(Stream stream, CompressionLevel level, bool leaveOpen)
            : this(stream, level, CompressionStrategy.Default, leaveOpen)
        {
            // Previously this overload forwarded CompressionLevel.Default and ignored 'level'.
        }

        /// <summary>Creates a stream over <paramref name="stream"/>.</summary>
        /// <param name="stream">Destination for the compressed output.</param>
        /// <param name="level">Compression level handed to each work item and to the final block.</param>
        /// <param name="strategy">Compression strategy handed to each work item.</param>
        /// <param name="leaveOpen">When true, Close does not close <paramref name="stream"/>.</param>
        public ParallelDeflateOutputStream(Stream stream, CompressionLevel level, CompressionStrategy strategy, bool leaveOpen)
        {
            _outStream = stream;
            _compressLevel = level;
            Strategy = strategy;
            _leaveOpen = leaveOpen;
            MaxBufferPairs = 16;
        }

        /// <summary>Compression strategy given at construction.</summary>
        public CompressionStrategy Strategy { get; private set; }

        /// <summary>
        /// Upper bound on the number of work items allocated on the first write; the
        /// actual number is the smaller of this value and 4 per processor.
        /// </summary>
        /// <exception cref="ArgumentException">The value is less than 4.</exception>
        public int MaxBufferPairs
        {
            get
            {
                return _maxBufferPairs;
            }
            set
            {
                if (value < 4)
                {
                    // ArgumentException takes (message, paramName), in that order.
                    throw new ArgumentException("Value must be 4 or greater.", "MaxBufferPairs");
                }
                _maxBufferPairs = value;
            }
        }

        /// <summary>
        /// Buffer size handed to each work item when the pool is created.
        /// </summary>
        /// <exception cref="ArgumentOutOfRangeException">The value is less than 1024.</exception>
        public int BufferSize
        {
            get
            {
                return _bufferSize;
            }
            set
            {
                if (value < 1024)
                {
                    throw new ArgumentOutOfRangeException("BufferSize", "BufferSize must be at least 1024 bytes");
                }
                _bufferSize = value;
            }
        }

        /// <summary>CRC captured from the running CRC when the final block is written (i.e. by Close).</summary>
        public int Crc32
        {
            get { return _Crc32; }
        }

        /// <summary>Number of uncompressed bytes whose compressed form has been written to the output so far.</summary>
        public long BytesProcessed
        {
            get { return _totalBytesProcessed; }
        }

        /// <summary>Always false.</summary>
        public override bool CanSeek
        {
            get { return false; }
        }

        /// <summary>Always false.</summary>
        public override bool CanRead
        {
            get { return false; }
        }

        /// <summary>Mirrors <c>CanWrite</c> of the underlying stream.</summary>
        public override bool CanWrite
        {
            get { return _outStream.CanWrite; }
        }

        /// <summary>Not supported.</summary>
        public override long Length
        {
            get { throw new NotSupportedException(); }
        }

        /// <summary>Getter returns the underlying stream's position; the setter is not supported.</summary>
        public override long Position
        {
            get { return _outStream.Position; }
            set { throw new NotSupportedException(); }
        }

        /// <summary>
        /// Allocates the work items, the two index queues, the signalling event and the
        /// running CRC, and resets the ordinal bookkeeping. Called on the first non-empty write
        /// of each session.
        /// </summary>
        private void _InitializePoolOfWorkItems()
        {
            _toWrite = new Queue<int>();
            _toFill = new Queue<int>();
            _pool = new List<WorkItem>();

            int pairCount = Math.Min(BufferPairsPerCore * Environment.ProcessorCount, _maxBufferPairs);
            for (int i = 0; i < pairCount; i++)
            {
                _pool.Add(new WorkItem(_bufferSize, _compressLevel, Strategy, i));
                _toFill.Enqueue(i);
            }

            _newlyCompressedBlob = new AutoResetEvent(false);
            _runningCrc = new CRC32();
            _currentlyFilling = -1;
            _lastFilled = -1;
            _lastWritten = -1;
            _latestCompressed = -1;
        }

        /// <summary>
        /// Rethrows (once) the first exception recorded by a compression worker and marks
        /// the stream as handling an exception so that later Flush/Close calls become no-ops.
        /// </summary>
        private void ThrowPendingException()
        {
            Exception pending = _pendingException;
            if (pending != null)
            {
                _handlingException = true;
                _pendingException = null;
                throw pending;
            }
        }

        /// <summary>
        /// Copies <paramref name="count"/> bytes into work-item buffers; each buffer that
        /// becomes full is queued to the thread pool for compression.
        /// </summary>
        /// <exception cref="InvalidOperationException">The stream has been closed.</exception>
        /// <exception cref="Exception">
        /// A work item could not be queued, or a compression worker failed earlier
        /// (the worker's exception is rethrown).
        /// </exception>
        public override void Write(byte[] buffer, int offset, int count)
        {
            if (_isClosed)
            {
                throw new InvalidOperationException();
            }

            ThrowPendingException();

            if (count == 0)
            {
                return;
            }

            if (!_firstWriteDone)
            {
                // Deferred until now so the pool-sizing properties can be set after construction.
                _InitializePoolOfWorkItems();
                _firstWriteDone = true;
            }

            bool mustWait = false;
            do
            {
                // Drain whatever is ready; blocks for a compressed block only when no buffer is free.
                EmitPendingBuffers(false, mustWait);
                mustWait = false;

                int slot;
                if (_currentlyFilling >= 0)
                {
                    slot = _currentlyFilling;
                }
                else
                {
                    if (_toFill.Count == 0)
                    {
                        // Every buffer is in flight: go around again and wait for one to be emitted.
                        mustWait = true;
                        continue;
                    }
                    slot = _toFill.Dequeue();
                    _lastFilled++;
                }

                WorkItem workItem = _pool[slot];
                int room = workItem.buffer.Length - workItem.inputBytesAvailable;
                int chunk = (room > count) ? count : room;

                workItem.ordinal = _lastFilled;
                Buffer.BlockCopy(buffer, offset, workItem.buffer, workItem.inputBytesAvailable, chunk);
                count -= chunk;
                offset += chunk;
                workItem.inputBytesAvailable += chunk;

                if (workItem.inputBytesAvailable == workItem.buffer.Length)
                {
                    if (!ThreadPool.QueueUserWorkItem(new WaitCallback(_DeflateOne), workItem))
                    {
                        throw new Exception("Cannot enqueue workitem");
                    }
                    _currentlyFilling = -1;
                }
                else
                {
                    _currentlyFilling = slot;
                }
            }
            while (count > 0);
        }

        /// <summary>
        /// Writes the terminating deflate block (a Finish flush with no input) to the output
        /// and captures the running CRC into <see cref="Crc32"/>.
        /// </summary>
        private void _FlushFinish()
        {
            byte[] tail = new byte[128];
            ZlibCodec codec = new ZlibCodec();
            codec.InitializeDeflate(_compressLevel, false);
            codec.InputBuffer = null;
            codec.NextIn = 0;
            codec.AvailableBytesIn = 0;
            codec.OutputBuffer = tail;
            codec.NextOut = 0;
            codec.AvailableBytesOut = tail.Length;

            // Only return codes 1 and 0 are accepted
            // (presumably Z_STREAM_END and Z_OK; confirm against ZlibConstants).
            int rc = codec.Deflate(FlushType.Finish);
            if (rc != 1 && rc != 0)
            {
                throw new Exception("deflating: " + codec.Message);
            }

            int produced = tail.Length - codec.AvailableBytesOut;
            if (produced > 0)
            {
                _outStream.Write(tail, 0, produced);
            }

            codec.EndDeflate();
            _Crc32 = _runningCrc.Crc32Result;
        }

        /// <summary>
        /// Compresses the partially filled buffer (if any) on the calling thread and emits
        /// compressed blocks. When <paramref name="lastInput"/> is true, waits for all
        /// blocks and then writes the terminating block.
        /// </summary>
        /// <exception cref="InvalidOperationException">The stream has been closed.</exception>
        private void _Flush(bool lastInput)
        {
            if (_isClosed)
            {
                throw new InvalidOperationException();
            }

            if (emitting)
            {
                return;
            }

            if (!_firstWriteDone)
            {
                // Nothing was written in this session: there may be no pool, event or CRC yet,
                // and there is no compressed block to wait for. Just terminate the stream.
                if (lastInput)
                {
                    if (_runningCrc == null)
                    {
                        _runningCrc = new CRC32();
                    }
                    _FlushFinish();
                }
                return;
            }

            if (_currentlyFilling >= 0)
            {
                _DeflateOne(_pool[_currentlyFilling]);
                _currentlyFilling = -1;

                // _DeflateOne records failures instead of throwing; surface one now rather
                // than waiting below for a block that will never be queued.
                ThrowPendingException();
            }

            if (lastInput)
            {
                EmitPendingBuffers(true, false);
                _FlushFinish();
            }
            else
            {
                EmitPendingBuffers(false, false);
            }
        }

        /// <summary>
        /// Rethrows a pending worker exception if there is one; otherwise compresses the
        /// partial buffer and emits whatever compressed blocks are ready.
        /// </summary>
        public override void Flush()
        {
            ThrowPendingException();

            if (_handlingException)
            {
                return;
            }

            _Flush(false);
        }

        /// <summary>
        /// Rethrows a pending worker exception if there is one; otherwise flushes all data,
        /// writes the terminating block and, unless leaveOpen was requested, closes the
        /// underlying stream. Does nothing if already closed or after a worker exception
        /// has been rethrown.
        /// </summary>
        public override void Close()
        {
            ThrowPendingException();

            if (_handlingException || _isClosed)
            {
                return;
            }

            _Flush(true);
            if (!_leaveOpen)
            {
                _outStream.Close();
            }
            _isClosed = true;
        }

        /// <summary>Closes the stream, drops the work-item pool and disposes the base stream.</summary>
        public new void Dispose()
        {
            Close();
            _pool = null;
            Dispose(true);
        }

        /// <summary>Forwards to the base implementation.</summary>
        protected override void Dispose(bool disposing)
        {
            base.Dispose(disposing);
        }

        /// <summary>
        /// Prepares the instance for a new compression session writing to
        /// <paramref name="stream"/>: clears counters, CRC and the closed flag. The pool is
        /// rebuilt on the next non-empty write.
        /// </summary>
        /// <param name="stream">Destination for the next session's compressed output.</param>
        public void Reset(Stream stream)
        {
            if (_firstWriteDone)
            {
                // Queues and pool only exist once a write has happened.
                _toWrite.Clear();
                _toFill.Clear();
                foreach (WorkItem workItem in _pool)
                {
                    _toFill.Enqueue(workItem.index);
                    workItem.ordinal = -1;
                }
                _firstWriteDone = false;
            }

            // Always rebind the output and clear session state, even if nothing was written;
            // previously an unwritten (or closed-without-write) instance ignored Reset entirely.
            _totalBytesProcessed = 0L;
            _runningCrc = new CRC32();
            _isClosed = false;
            _currentlyFilling = -1;
            _lastFilled = -1;
            _lastWritten = -1;
            _latestCompressed = -1;
            _outStream = stream;
        }

        /// <summary>
        /// Writes compressed blocks to the output in ordinal order and returns their buffers
        /// to the free queue. Re-entrant calls (while already emitting) return immediately.
        /// </summary>
        /// <param name="doAll">Wait for and emit every block up to the latest compressed ordinal.</param>
        /// <param name="mustWait">Block until at least one newly compressed block is signalled.</param>
        /// <remarks>
        /// NOTE(review): 'emitting' is not reset if the output stream throws; later flushes are
        /// then skipped. Left as-is because resetting it could make a later Close block on the
        /// event wait below.
        /// </remarks>
        private void EmitPendingBuffers(bool doAll, bool mustWait)
        {
            if (emitting)
            {
                return;
            }
            emitting = true;

            if (doAll || mustWait)
            {
                _newlyCompressedBlob.WaitOne();
            }

            do
            {
                int firstSkipped = -1;
                int lockTimeout = doAll ? 200 : (mustWait ? -1 : 0);
                int next;

                do
                {
                    next = -1;
                    if (!Monitor.TryEnter(_toWrite, lockTimeout))
                    {
                        break;
                    }

                    try
                    {
                        if (_toWrite.Count > 0)
                        {
                            next = _toWrite.Dequeue();
                        }
                    }
                    finally
                    {
                        Monitor.Exit(_toWrite);
                    }

                    if (next < 0)
                    {
                        break;
                    }

                    WorkItem workItem = _pool[next];
                    if (workItem.ordinal != _lastWritten + 1)
                    {
                        // Out of order: put it back and try the next one.
                        lock (_toWrite)
                        {
                            _toWrite.Enqueue(next);
                        }

                        if (firstSkipped == next)
                        {
                            // Went all the way round without finding the next ordinal;
                            // wait for a worker to deliver another block.
                            _newlyCompressedBlob.WaitOne();
                            firstSkipped = -1;
                        }
                        else if (firstSkipped == -1)
                        {
                            firstSkipped = next;
                        }
                        continue;
                    }

                    firstSkipped = -1;
                    _outStream.Write(workItem.compressed, 0, workItem.compressedBytesAvailable);
                    _runningCrc.Combine(workItem.crc, workItem.inputBytesAvailable);
                    _totalBytesProcessed += (long)workItem.inputBytesAvailable;
                    workItem.inputBytesAvailable = 0;
                    _lastWritten = workItem.ordinal;
                    _toFill.Enqueue(workItem.index);

                    // One block has been freed; do not block on the lock again.
                    if (lockTimeout == -1)
                    {
                        lockTimeout = 0;
                    }
                }
                while (next >= 0);
            }
            while (doAll && _lastWritten != _latestCompressed);

            emitting = false;
        }

        /// <summary>
        /// Compresses one work item, computes its CRC, queues it for emission and signals
        /// the emitter. Runs on a thread-pool thread, or synchronously from <c>_Flush</c>.
        /// Failures are recorded rather than thrown and are rethrown by the next
        /// Write/Flush/Close.
        /// </summary>
        /// <param name="wi">The <c>WorkItem</c> to compress.</param>
        private void _DeflateOne(object wi)
        {
            WorkItem workItem = (WorkItem)wi;
            try
            {
                CRC32 crc = new CRC32();
                crc.SlurpBlock(workItem.buffer, 0, workItem.inputBytesAvailable);
                DeflateOneSegment(workItem);
                workItem.crc = crc.Crc32Result;

                lock (_latestLock)
                {
                    if (workItem.ordinal > _latestCompressed)
                    {
                        _latestCompressed = workItem.ordinal;
                    }
                }

                lock (_toWrite)
                {
                    _toWrite.Enqueue(workItem.index);
                }

                _newlyCompressedBlob.Set();
            }
            catch (Exception failure)
            {
                lock (_eLock)
                {
                    // Keep the first failure. The original test was inverted ("!= null"),
                    // so no worker exception was ever recorded.
                    if (_pendingException == null)
                    {
                        _pendingException = failure;
                    }
                }
                // NOTE(review): a caller already blocked waiting for this block is not woken here.
            }
        }

        /// <summary>
        /// Deflates the work item's input buffer into its compressed buffer, ending with a
        /// Sync flush, and records the number of compressed bytes produced.
        /// </summary>
        /// <returns>Always true.</returns>
        private bool DeflateOneSegment(WorkItem workitem)
        {
            ZlibCodec compressor = workitem.compressor;
            compressor.ResetDeflate(true);
            compressor.NextIn = 0;
            compressor.AvailableBytesIn = workitem.inputBytesAvailable;
            compressor.NextOut = 0;
            compressor.AvailableBytesOut = workitem.compressed.Length;

            do
            {
                compressor.Deflate(FlushType.None);
            }
            while (compressor.AvailableBytesIn > 0 || compressor.AvailableBytesOut == 0);

            compressor.Deflate(FlushType.Sync);
            workitem.compressedBytesAvailable = (int)compressor.TotalBytesOut;
            return true;
        }

        /// <summary>
        /// Diagnostic hook, compiled in only when the "Trace" symbol is defined. In this
        /// version it takes the output lock for selected categories but emits nothing.
        /// </summary>
        [Conditional("Trace")]
        private void TraceOutput(TraceBits bits, string format, params object[] varParams)
        {
            if ((bits & _DesiredTrace) > TraceBits.None)
            {
                lock (_outputLock)
                {
                    int hashCode = Thread.CurrentThread.GetHashCode();
                }
            }
        }

        /// <summary>Not supported; this stream is write-only.</summary>
        public override int Read(byte[] buffer, int offset, int count)
        {
            throw new NotSupportedException();
        }

        /// <summary>Not supported.</summary>
        public override long Seek(long offset, SeekOrigin origin)
        {
            throw new NotSupportedException();
        }

        /// <summary>Not supported.</summary>
        public override void SetLength(long value)
        {
            throw new NotSupportedException();
        }
    }
}