Skip to content

Commit

Permalink
Deleted duplex buffered stream (close #8)
Browse files Browse the repository at this point in the history
  • Loading branch information
solyutor committed Mar 12, 2020
1 parent c05175a commit 1f567d0
Show file tree
Hide file tree
Showing 6 changed files with 77 additions and 372 deletions.
93 changes: 0 additions & 93 deletions src/SolarWind.Tests/DuplexBufferedStreamTests.cs

This file was deleted.

85 changes: 71 additions & 14 deletions src/SolarWind/Internals/Connection.cs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
using System;
using System.Buffers;
using System.Diagnostics;
using System.IO;
using System.Net;
using System.Net.Sockets;
using System.Threading;
using System.Threading.Tasks;
using Codestellation.SolarWind.Protocol;
using Codestellation.SolarWind.Threading;
using Microsoft.Extensions.Logging;

namespace Codestellation.SolarWind.Internals
Expand All @@ -15,23 +16,29 @@ internal class Connection : IDisposable
private readonly AsyncNetworkStream _networkStream;
private readonly ILogger _logger;
private readonly Action _reconnect;
private readonly DuplexBufferedStream _mainStream;
private bool _disposed;
private readonly byte[] _readBuffer;
private int _readPosition;
private int _readLength;

private readonly byte[] _writeBuffer;
private int _writePosition;

public bool Connected => !_disposed;

private Connection(AsyncNetworkStream networkStream, ILogger logger, Action reconnect)
{
_networkStream = networkStream ?? throw new ArgumentNullException(nameof(networkStream));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_reconnect = reconnect;

_mainStream = new DuplexBufferedStream(_networkStream);
_readBuffer = ArrayPool<byte>.Shared.Rent(64 * 1024);
_writeBuffer = ArrayPool<byte>.Shared.Rent(64 * 1024);
}

public void Reconnect() => _reconnect?.Invoke();


public async ValueTask ReceiveAsync(PooledMemoryStream readBuffer, int bytesToReceive, CancellationToken cancellation)
public async ValueTask ReceiveAsync(PooledMemoryStream destination, int bytesToReceive, CancellationToken cancellation)
{
try
{
Expand All @@ -43,7 +50,21 @@ public async ValueTask ReceiveAsync(PooledMemoryStream readBuffer, int bytesToRe
throw new TaskCanceledException();
}

left -= await readBuffer.WriteAsync(_mainStream, bytesToReceive, cancellation).ConfigureAwait(ContinueOn.IOScheduler);
var available = _readLength - _readPosition;
if (available == 0)
{
_readPosition = 0;
_readLength = 0;
_readLength = await _networkStream
.ReadAsync(_readBuffer, 0, _readBuffer.Length, cancellation)
.ConfigureAwait(false);
available = _readLength;
}

var bytesToStream = Math.Min(available, left);
destination.Write(_readBuffer, _readPosition, bytesToStream);
_readPosition += bytesToStream;
left -= bytesToStream;
}
}
catch (ObjectDisposedException e)
Expand All @@ -52,18 +73,47 @@ public async ValueTask ReceiveAsync(PooledMemoryStream readBuffer, int bytesToRe
}
}

public ValueTask WriteAsync(in Message message, CancellationToken cancellation)
public async ValueTask WriteAsync(Message message, CancellationToken cancellation)
{
try
{
_logger.LogDebug($"Writing message {message.Header.ToString()}");
var wireHeader = new WireHeader(message.Header, new PayloadSize((int)message.Payload.Length));
WireHeader.WriteTo(wireHeader, _mainStream);
return message.Payload.CopyIntoAsync(_mainStream, cancellation);

var available = _writeBuffer.Length - _writePosition;
PooledMemoryStream payload = message.Payload;
var wireHeader = new WireHeader(message.Header, new PayloadSize((int)payload.Length));

if (available < WireHeader.Size)
{
await FlushAsync(cancellation).ConfigureAwait(false);
}

WireHeader.WriteTo(in wireHeader, _writeBuffer, _writePosition);
_writePosition += WireHeader.Size;

var bytesToSend = (int)payload.Length;
payload.Position = 0;
while (bytesToSend > 0)
{
available = _writeBuffer.Length - _writePosition;

if (available == 0)
{
await FlushAsync(cancellation).ConfigureAwait(false);
}

var sliceSize = Math.Min(available, bytesToSend);
var readFromPayload = payload.Read(_writeBuffer, _writePosition, sliceSize);

Debug.Assert(sliceSize == readFromPayload);

bytesToSend -= readFromPayload;
_writePosition += readFromPayload;
}
}
catch (ObjectDisposedException e)
{
throw new IOException("Receive failed", e);
throw new IOException("Send failed", e);
}
}

Expand Down Expand Up @@ -178,13 +228,20 @@ private static void ConfigureSocket(Socket socket, SolarWindHubOptions options)
socket.LingerState = new LingerOption(true, 1);
}

public Task FlushAsync(CancellationToken cancellation) => _mainStream.FlushAsync(cancellation);
public Task FlushAsync(CancellationToken cancellation)
{
var length = _writePosition;
_writePosition = 0; //Zero it here to avoid making the method async
return _networkStream.WriteAsync(_writeBuffer, 0, length, cancellation);
}

public void Dispose()
{
_disposed = true;
_mainStream.Close();
_mainStream.Dispose();
ArrayPool<byte>.Shared.Return(_readBuffer);
ArrayPool<byte>.Shared.Return(_writeBuffer);
_networkStream.Close();
_networkStream.Dispose();
}
}
}
Loading

0 comments on commit 1f567d0

Please sign in to comment.