Skip to content
This repository has been archived by the owner on May 17, 2024. It is now read-only.

Commit

Permalink
CancellationTokens added
Browse files Browse the repository at this point in the history
  • Loading branch information
honfika committed Apr 14, 2018
1 parent 08701e5 commit 183b66d
Show file tree
Hide file tree
Showing 9 changed files with 79 additions and 75 deletions.
4 changes: 3 additions & 1 deletion StreamExtended/Network/ClientHelloAlpnAdderStream.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System.Diagnostics;
using System.IO;
using System.Threading;

namespace StreamExtended.Network
{
Expand Down Expand Up @@ -47,7 +48,8 @@ public override void Write(byte[] buffer, int offset, int count)
var ms = new MemoryStream(buffer, offset, count);

//this can be non async, because reads from a memory stream
var clientHello = SslTools.PeekClientHello(new CustomBufferedStream(ms, (int)ms.Length)).Result;
var cts = new CancellationTokenSource();
var clientHello = SslTools.PeekClientHello(new CustomBufferedStream(ms, (int)ms.Length), cts.Token).Result;
if (clientHello != null)
{
// 0x00 0x10: ALPN identifier
Expand Down
19 changes: 10 additions & 9 deletions StreamExtended/Network/CopyStream.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using StreamExtended.Helpers;

Expand Down Expand Up @@ -30,18 +31,18 @@ public CopyStream(CustomBinaryReader reader, CustomBinaryWriter writer, int buff
buffer = BufferPool.GetBuffer(bufferSize);
}

public async Task<bool> FillBufferAsync()
public async Task<bool> FillBufferAsync(CancellationToken cancellationToken = default(CancellationToken))
{
await FlushAsync();
return await reader.FillBufferAsync();
await FlushAsync(cancellationToken);
return await reader.FillBufferAsync(cancellationToken);
}

public async Task FlushAsync()
public async Task FlushAsync(CancellationToken cancellationToken = default(CancellationToken))
{
//send out the current data from from the buffer
if (bufferLength > 0)
{
await writer.WriteAsync(buffer, 0, bufferLength);
await writer.WriteAsync(buffer, 0, bufferLength, cancellationToken);
bufferLength = 0;
}
}
Expand All @@ -54,20 +55,20 @@ public byte ReadByteFromBuffer()
return b;
}

public async Task<int> ReadAsync(byte[] buffer, int offset, int count)
public async Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken = default(CancellationToken))
{
int result = await reader.ReadBytesAsync(buffer, offset, count);
int result = await reader.ReadBytesAsync(buffer, offset, count, cancellationToken);
if (result > 0)
{
if (bufferLength + result > bufferSize)
{
await FlushAsync();
await FlushAsync(cancellationToken);
}

Buffer.BlockCopy(buffer, offset, this.buffer, bufferLength, result);
bufferLength += result;
ReadBytes += result;
await FlushAsync();
await FlushAsync(cancellationToken);
}

return result;
Expand Down
27 changes: 15 additions & 12 deletions StreamExtended/Network/CustomBinaryReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace StreamExtended.Network
Expand Down Expand Up @@ -33,7 +34,7 @@ public CustomBinaryReader(IBufferedStream stream, int bufferSize)
/// Read a line from the byte stream
/// </summary>
/// <returns></returns>
public async Task<string> ReadLineAsync()
public async Task<string> ReadLineAsync(CancellationToken cancellationToken = default(CancellationToken))
{
byte lastChar = default(byte);

Expand All @@ -42,7 +43,7 @@ public async Task<string> ReadLineAsync()
// try to use the thread static buffer, usually it is enough
var buffer = Buffer;

while (stream.DataAvailable || await stream.FillBufferAsync())
while (stream.DataAvailable || await stream.FillBufferAsync(cancellationToken))
{
byte newChar = stream.ReadByteFromBuffer();
buffer[bufferDataLength] = newChar;
Expand Down Expand Up @@ -87,11 +88,11 @@ public async Task<string> ReadLineAsync()
/// Read until the last new line
/// </summary>
/// <returns></returns>
public async Task<List<string>> ReadAllLinesAsync()
public async Task<List<string>> ReadAllLinesAsync(CancellationToken cancellationToken = default(CancellationToken))
{
string tmpLine;
var requestLines = new List<string>();
while (!string.IsNullOrEmpty(tmpLine = await ReadLineAsync()))
while (!string.IsNullOrEmpty(tmpLine = await ReadLineAsync(cancellationToken)))
{
requestLines.Add(tmpLine);
}
Expand All @@ -103,9 +104,9 @@ public async Task<List<string>> ReadAllLinesAsync()
/// Read until the last new line, ignores the result
/// </summary>
/// <returns></returns>
public async Task ReadAndIgnoreAllLinesAsync()
public async Task ReadAndIgnoreAllLinesAsync(CancellationToken cancellationToken = default(CancellationToken))
{
while (!string.IsNullOrEmpty(await ReadLineAsync()))
while (!string.IsNullOrEmpty(await ReadLineAsync(cancellationToken)))
{
}
}
Expand All @@ -115,10 +116,11 @@ public async Task ReadAndIgnoreAllLinesAsync()
/// </summary>
/// <param name="buffer"></param>
/// <param name="bytesToRead"></param>
/// <param name="cancellationToken"></param>
/// <returns>The number of bytes read</returns>
public Task<int> ReadBytesAsync(byte[] buffer, int bytesToRead)
public Task<int> ReadBytesAsync(byte[] buffer, int bytesToRead, CancellationToken cancellationToken = default(CancellationToken))
{
return stream.ReadAsync(buffer, 0, bytesToRead);
return stream.ReadAsync(buffer, 0, bytesToRead, cancellationToken);
}

/// <summary>
Expand All @@ -127,10 +129,11 @@ public Task<int> ReadBytesAsync(byte[] buffer, int bytesToRead)
/// <param name="buffer"></param>
/// <param name="offset"></param>
/// <param name="bytesToRead"></param>
/// <param name="cancellationToken"></param>
/// <returns>The number of bytes read</returns>
public Task<int> ReadBytesAsync(byte[] buffer, int offset, int bytesToRead)
public Task<int> ReadBytesAsync(byte[] buffer, int offset, int bytesToRead, CancellationToken cancellationToken = default(CancellationToken))
{
return stream.ReadAsync(buffer, offset, bytesToRead);
return stream.ReadAsync(buffer, offset, bytesToRead, cancellationToken);
}

public bool DataAvailable => stream.DataAvailable;
Expand All @@ -139,9 +142,9 @@ public Task<int> ReadBytesAsync(byte[] buffer, int offset, int bytesToRead)
/// Fills the buffer asynchronous.
/// </summary>
/// <returns></returns>
public Task<bool> FillBufferAsync()
public Task<bool> FillBufferAsync(CancellationToken cancellationToken = default(CancellationToken))
{
return stream.FillBufferAsync();
return stream.FillBufferAsync(cancellationToken);
}

public byte ReadByteFromBuffer()
Expand Down
9 changes: 5 additions & 4 deletions StreamExtended/Network/CustomBinaryWriter.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.IO;
using System.Threading;
using System.Threading.Tasks;

namespace StreamExtended.Network
Expand All @@ -12,14 +13,14 @@ protected CustomBinaryWriter(Stream stream)
this.stream = stream;
}

public Task WriteAsync(byte[] data, int offset, int count)
public Task WriteAsync(byte[] data, int offset, int count, CancellationToken cancellationToken = default(CancellationToken))
{
return stream.WriteAsync(data, offset, count);
return stream.WriteAsync(data, offset, count, cancellationToken);
}

protected Task FlushAsync()
protected Task FlushAsync(CancellationToken cancellationToken = default(CancellationToken))
{
return stream.FlushAsync();
return stream.FlushAsync(cancellationToken);
}
}
}
13 changes: 7 additions & 6 deletions StreamExtended/Network/CustomBufferedPeekStream.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Threading;
using System.Threading.Tasks;

namespace StreamExtended.Network
Expand All @@ -25,9 +26,9 @@ internal CustomBufferedPeekStream(CustomBufferedStream baseStream, int startPosi
/// </summary>
internal int Available => baseStream.Available - Position;

internal async Task<bool> EnsureBufferLength(int length)
internal async Task<bool> EnsureBufferLength(int length, CancellationToken cancellationToken)
{
var val = await baseStream.PeekByteAsync(Position + length - 1);
var val = await baseStream.PeekByteAsync(Position + length - 1, cancellationToken);
return val != -1;
}

Expand Down Expand Up @@ -66,9 +67,9 @@ internal byte[] ReadBytes(int length)
/// Fills the buffer asynchronous.
/// </summary>
/// <returns></returns>
Task<bool> IBufferedStream.FillBufferAsync()
Task<bool> IBufferedStream.FillBufferAsync(CancellationToken cancellationToken = default(CancellationToken))
{
return baseStream.FillBufferAsync();
return baseStream.FillBufferAsync(cancellationToken);
}

/// <summary>
Expand All @@ -81,9 +82,9 @@ byte IBufferedStream.ReadByteFromBuffer()
return ReadByte();
}

Task<int> IBufferedStream.ReadAsync(byte[] buffer, int offset, int count)
Task<int> IBufferedStream.ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
return baseStream.ReadAsync(buffer, offset, count);
return baseStream.ReadAsync(buffer, offset, count, cancellationToken);
}
}
}
24 changes: 8 additions & 16 deletions StreamExtended/Network/CustomBufferedStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ public override void Write(byte[] buffer, int offset, int count)
/// <returns>
/// A task that represents the asynchronous copy operation.
/// </returns>
public override async Task CopyToAsync(Stream destination, int bufferSize, CancellationToken cancellationToken)
public override async Task CopyToAsync(Stream destination, int bufferSize, CancellationToken cancellationToken = default(CancellationToken))
{
if (bufferLength > 0)
{
Expand All @@ -140,7 +140,7 @@ public override async Task CopyToAsync(Stream destination, int bufferSize, Cance
/// <returns>
/// A task that represents the asynchronous flush operation.
/// </returns>
public override Task FlushAsync(CancellationToken cancellationToken)
public override Task FlushAsync(CancellationToken cancellationToken = default(CancellationToken))
{
return baseStream.FlushAsync(cancellationToken);
}
Expand All @@ -165,7 +165,7 @@ public override Task FlushAsync(CancellationToken cancellationToken)
/// less than the requested number, or it can be 0 (zero)
/// if the end of the stream has been reached.
/// </returns>
public override async Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
public override async Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken = default(CancellationToken))
{
if (bufferLength == 0)
{
Expand Down Expand Up @@ -209,12 +209,13 @@ public override int ReadByte()
/// Peeks a byte asynchronous.
/// </summary>
/// <param name="index">The index.</param>
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns></returns>
public async Task<int> PeekByteAsync(int index)
public async Task<int> PeekByteAsync(int index, CancellationToken cancellationToken = default(CancellationToken))
{
if (Available <= index)
{
await FillBufferAsync();
await FillBufferAsync(cancellationToken);
}

if (Available <= index)
Expand Down Expand Up @@ -268,7 +269,7 @@ public byte ReadByteFromBuffer()
/// A task that represents the asynchronous write operation.
/// </returns>
[DebuggerStepThrough]
public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken = default(CancellationToken))
{
OnDataSent(buffer, offset, count);
return baseStream.WriteAsync(buffer, offset, count, cancellationToken);
Expand Down Expand Up @@ -417,21 +418,12 @@ public bool FillBuffer()
}
}

/// <summary>
/// Fills the buffer asynchronous.
/// </summary>
/// <returns></returns>
public Task<bool> FillBufferAsync()
{
return FillBufferAsync(CancellationToken.None);
}

/// <summary>
/// Fills the buffer asynchronous.
/// </summary>
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns></returns>
public async Task<bool> FillBufferAsync(CancellationToken cancellationToken)
public async Task<bool> FillBufferAsync(CancellationToken cancellationToken = default(CancellationToken))
{
if (closed)
{
Expand Down
7 changes: 4 additions & 3 deletions StreamExtended/Network/IBufferedStream.cs
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
using System.Threading.Tasks;
using System.Threading;
using System.Threading.Tasks;

namespace StreamExtended.Network
{
public interface IBufferedStream
{
bool DataAvailable { get; }

Task<bool> FillBufferAsync();
Task<bool> FillBufferAsync(CancellationToken cancellationToken);

byte ReadByteFromBuffer();

Task<int> ReadAsync(byte[] buffer, int offset, int count);
Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken);
}
}
4 changes: 3 additions & 1 deletion StreamExtended/Network/ServerHelloAlpnAdderStream.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System.Diagnostics;
using System.IO;
using System.Threading;

namespace StreamExtended.Network
{
Expand Down Expand Up @@ -47,7 +48,8 @@ public override void Write(byte[] buffer, int offset, int count)
var ms = new MemoryStream(buffer, offset, count);

//this can be non async, because reads from a memory stream
var serverHello = SslTools.PeekServerHello(new CustomBufferedStream(ms, (int)ms.Length)).Result;
var cts = new CancellationTokenSource();
var serverHello = SslTools.PeekServerHello(new CustomBufferedStream(ms, (int)ms.Length), cts.Token).Result;
if (serverHello != null)
{
// 0x00 0x10: ALPN identifier
Expand Down
Loading

0 comments on commit 183b66d

Please sign in to comment.