Skip to content
This repository has been archived by the owner on May 17, 2024. It is now read-only.

Commit

Permalink
Merge pull request #19 from justcoding121/develop
Browse files Browse the repository at this point in the history
leaveopen parameter added
  • Loading branch information
honfika authored Mar 24, 2018
2 parents 3209a23 + 08701e5 commit a59ce90
Showing 1 changed file with 58 additions and 137 deletions.
195 changes: 58 additions & 137 deletions StreamExtended/Network/CustomBufferedStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,8 @@ namespace StreamExtended.Network
/// <seealso cref="System.IO.Stream" />
public class CustomBufferedStream : Stream, IBufferedStream
{
#if NET45
private AsyncCallback readCallback;
#endif

private readonly Stream baseStream;

private readonly bool leaveOpen;
private byte[] streamBuffer;

private readonly byte[] oneByteBuffer = new byte[1];
Expand All @@ -30,17 +26,18 @@ public class CustomBufferedStream : Stream, IBufferedStream

private bool disposed;

private bool closed;

/// <summary>
/// Initializes a new instance of the <see cref="CustomBufferedStream"/> class.
/// </summary>
/// <param name="baseStream">The base stream.</param>
/// <param name="bufferSize">Size of the buffer.</param>
public CustomBufferedStream(Stream baseStream, int bufferSize)
/// <param name="leaveOpen"><see langword="true" /> to leave the stream open after disposing the <see cref="T:CustomBufferedStream" /> object; otherwise, <see langword="false" />.</param>
public CustomBufferedStream(Stream baseStream, int bufferSize, bool leaveOpen = false)
{
#if NET45
readCallback = ReadCallback;
#endif
this.baseStream = baseStream;
this.leaveOpen = leaveOpen;
streamBuffer = BufferPool.GetBuffer(bufferSize);
}

Expand Down Expand Up @@ -116,61 +113,6 @@ public override void Write(byte[] buffer, int offset, int count)
baseStream.Write(buffer, offset, count);
}

#if NET45
/// <summary>
/// Begins an asynchronous read operation. (Consider using <see cref="M:System.IO.Stream.ReadAsync(System.Byte[],System.Int32,System.Int32)" /> instead; see the Remarks section.)
/// </summary>
/// <param name="buffer">The buffer to read the data into.</param>
/// <param name="offset">The byte offset in <paramref name="buffer" /> at which to begin writing data read from the stream.</param>
/// <param name="count">The maximum number of bytes to read.</param>
/// <param name="callback">An optional asynchronous callback, to be called when the read is complete.</param>
/// <param name="state">A user-provided object that distinguishes this particular asynchronous read request from other requests.</param>
/// <returns>
/// An <see cref="T:System.IAsyncResult" /> that represents the asynchronous read, which could still be pending.
/// </returns>
[DebuggerStepThrough]
public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, object state)
{
if (bufferLength > 0)
{
int available = Math.Min(bufferLength, count);
Buffer.BlockCopy(streamBuffer, bufferPos, buffer, offset, available);
bufferPos += available;
bufferLength -= available;
return new ReadAsyncResult(buffer, offset, available, state, callback);
}

var result = new ReadAsyncResult(buffer, offset, 0, state, callback);
result.BaseResult = baseStream.BeginRead(buffer, offset, count, readCallback, result);
return result;
}

private void ReadCallback(IAsyncResult ar)
{
var readResult = (ReadAsyncResult)ar.AsyncState;
readResult.BaseResult = ar;
readResult.Callback(readResult);
}

/// <summary>
/// Begins an asynchronous write operation. (Consider using <see cref="M:System.IO.Stream.WriteAsync(System.Byte[],System.Int32,System.Int32)" /> instead; see the Remarks section.)
/// </summary>
/// <param name="buffer">The buffer to write data from.</param>
/// <param name="offset">The byte offset in <paramref name="buffer" /> from which to begin writing.</param>
/// <param name="count">The maximum number of bytes to write.</param>
/// <param name="callback">An optional asynchronous callback, to be called when the write is complete.</param>
/// <param name="state">A user-provided object that distinguishes this particular asynchronous write request from other requests.</param>
/// <returns>
/// An IAsyncResult that represents the asynchronous write, which could still be pending.
/// </returns>
[DebuggerStepThrough]
public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, object state)
{
OnDataSent(buffer, offset, count);
return baseStream.BeginWrite(buffer, offset, count, callback, state);
}
#endif

/// <summary>
/// Asynchronously reads the bytes from the current stream and writes them to another stream, using a specified buffer size and cancellation token.
/// </summary>
Expand All @@ -191,35 +133,6 @@ public override async Task CopyToAsync(Stream destination, int bufferSize, Cance
await base.CopyToAsync(destination, bufferSize, cancellationToken);
}

#if NET45
/// <summary>
/// Waits for the pending asynchronous read to complete. (Consider using <see cref="M:System.IO.Stream.ReadAsync(System.Byte[],System.Int32,System.Int32)" /> instead; see the Remarks section.)
/// </summary>
/// <param name="asyncResult">The reference to the pending asynchronous request to finish.</param>
/// <returns>
/// The number of bytes read from the stream, between zero (0) and the number of bytes you requested. Streams return zero (0) only at the end of the stream, otherwise, they should block until at least one byte is available.
/// </returns>
[DebuggerStepThrough]
public override int EndRead(IAsyncResult asyncResult)
{
var readResult = (ReadAsyncResult)asyncResult;
int result = readResult.BaseResult == null ? readResult.ReadBytes : baseStream.EndRead(readResult.BaseResult);

OnDataReceived(readResult.Buffer, readResult.Offset, result);
return result;
}

/// <summary>
/// Ends an asynchronous write operation. (Consider using <see cref="M:System.IO.Stream.WriteAsync(System.Byte[],System.Int32,System.Int32)" /> instead; see the Remarks section.)
/// </summary>
/// <param name="asyncResult">A reference to the outstanding asynchronous I/O request.</param>
[DebuggerStepThrough]
public override void EndWrite(IAsyncResult asyncResult)
{
baseStream.EndWrite(asyncResult);
}
#endif

/// <summary>
/// Asynchronously clears all buffers for this stream, causes any buffered data to be written to the underlying device, and monitors cancellation requests.
/// </summary>
Expand Down Expand Up @@ -389,13 +302,15 @@ protected override void Dispose(bool disposing)
if (!disposed)
{
disposed = true;
baseStream.Dispose();
closed = true;
if (!leaveOpen)
{
baseStream.Dispose();
}

var buffer = streamBuffer;
streamBuffer = null;
BufferPool.ReturnBuffer(buffer);
#if NET45
readCallback = null;
#endif
}
}

Expand Down Expand Up @@ -466,6 +381,11 @@ public override int WriteTimeout
/// </summary>
public bool FillBuffer()
{
if (closed)
{
return false;
}

if (bufferLength > 0)
{
//normally we fill the buffer only when it is empty, but sometimes we need more data
Expand All @@ -474,14 +394,27 @@ public bool FillBuffer()
}

bufferPos = 0;
int readBytes = baseStream.Read(streamBuffer, bufferLength, streamBuffer.Length - bufferLength);
if (readBytes > 0)
try
{
OnDataReceived(streamBuffer, bufferLength, readBytes);
bufferLength += readBytes;
int readBytes = baseStream.Read(streamBuffer, bufferLength, streamBuffer.Length - bufferLength);
bool result = readBytes > 0;
if (result)
{
OnDataReceived(streamBuffer, bufferLength, readBytes);
bufferLength += readBytes;
}
else
{
closed = true;
}

return result;
}
catch
{
closed = true;
return false;
}

return readBytes > 0;
}

/// <summary>
Expand All @@ -500,6 +433,11 @@ public Task<bool> FillBufferAsync()
/// <returns></returns>
public async Task<bool> FillBufferAsync(CancellationToken cancellationToken)
{
if (closed)
{
return false;
}

if (bufferLength > 0)
{
//normally we fill the buffer only when it is empty, but sometimes we need more data
Expand All @@ -514,43 +452,26 @@ public async Task<bool> FillBufferAsync(CancellationToken cancellationToken)
}

bufferPos = 0;
int readBytes = await baseStream.ReadAsync(streamBuffer, bufferLength, bytesToRead, cancellationToken);
if (readBytes > 0)
try
{
OnDataReceived(streamBuffer, bufferLength, readBytes);
bufferLength += readBytes;
int readBytes = await baseStream.ReadAsync(streamBuffer, bufferLength, bytesToRead, cancellationToken);
bool result = readBytes > 0;
if (result)
{
OnDataReceived(streamBuffer, bufferLength, readBytes);
bufferLength += readBytes;
}
else
{
closed = true;
}

return result;
}

return readBytes > 0;
}

private class ReadAsyncResult : IAsyncResult
{
public byte[] Buffer { get; }

public int Offset { get; }

public IAsyncResult BaseResult { get; set; }

public int ReadBytes { get; }

public object AsyncState { get; }

public AsyncCallback Callback { get; }

public bool IsCompleted => CompletedSynchronously || BaseResult.IsCompleted;

public WaitHandle AsyncWaitHandle => BaseResult?.AsyncWaitHandle;

public bool CompletedSynchronously => BaseResult == null || BaseResult.CompletedSynchronously;

public ReadAsyncResult(byte[] buffer, int offset, int readBytes, object state, AsyncCallback callback)
catch
{
Buffer = buffer;
Offset = offset;
ReadBytes = readBytes;
AsyncState = state;
Callback = callback;
closed = true;
return false;
}
}
}
Expand Down

0 comments on commit a59ce90

Please sign in to comment.