Skip to content

Commit

Permalink
Merge pull request #318 from VelvetToroyashi/refactor/performant-gateway
Browse files Browse the repository at this point in the history
Optimize the websocket (again)
  • Loading branch information
Nihlus authored Oct 5, 2023
2 parents 29dade8 + 434c68a commit 0be68d7
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 49 deletions.
2 changes: 1 addition & 1 deletion Backend/Remora.Discord.Gateway/DiscordGatewayClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -984,7 +984,7 @@ CancellationToken disconnectRequested
{
sendResult = await rateLimitPolicy.ExecuteAsync
(
() => _transportService.SendPayloadAsync(userPayload, disconnectRequested)
() => _transportService.SendPayloadAsync(userPayload, disconnectRequested).AsTask()
);

if (sendResult.IsSuccess)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public interface IPayloadTransportService
/// <param name="payload">The payload.</param>
/// <param name="ct">The cancellation token for this operation.</param>
/// <returns>A send result which may or may not have succeeded.</returns>
Task<Result> SendPayloadAsync(IPayload payload, CancellationToken ct = default);
ValueTask<Result> SendPayloadAsync(IPayload payload, CancellationToken ct = default);

/// <summary>
/// Asynchronously receives a payload.
Expand All @@ -76,7 +76,7 @@ public interface IPayloadTransportService
/// </remarks>
/// <param name="ct">The cancellation token for this operation.</param>
/// <returns>A receive result which may or may not have succeeded.</returns>
Task<Result<IPayload>> ReceivePayloadAsync(CancellationToken ct = default);
ValueTask<Result<IPayload>> ReceivePayloadAsync(CancellationToken ct = default);

/// <summary>
/// Disconnects from the transport endpoint.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,12 @@
using System;
using System.Buffers;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Net.Http;
using System.Net.WebSockets;
using System.Runtime.CompilerServices;
using System.Text;
using System.Text.Json;
using System.Text.RegularExpressions;
using System.Threading;
Expand Down Expand Up @@ -149,7 +152,10 @@ public async Task<Result> ConnectAsync(Uri endpoint, CancellationToken ct = defa
}

/// <inheritdoc />
public async Task<Result> SendPayloadAsync(IPayload payload, CancellationToken ct = default)
#if NET6_0_OR_GREATER
[AsyncMethodBuilder(typeof(PoolingAsyncValueTaskMethodBuilder<>))]
#endif
public async ValueTask<Result> SendPayloadAsync(IPayload payload, CancellationToken ct = default)
{
if (_clientWebSocket is null)
{
Expand All @@ -161,60 +167,46 @@ public async Task<Result> SendPayloadAsync(IPayload payload, CancellationToken c
return new InvalidOperationError("The socket was not open.");
}

await using var memoryStream = new MemoryStream();
// Most common case is heartbeat ({"op":1"}), so this is more than enough
// if it isn't, the writer resizes the array, which is rare, and a cost we're
// fine with eating.
using var bufferWriter = new ArrayPoolBufferWriter<byte>(128);

byte[]? buffer = null;
try
{
await JsonSerializer.SerializeAsync(memoryStream, payload, _jsonOptions, ct);

if (memoryStream.Length > 4096)
{
return new NotSupportedError
(
"The payload was too large to be accepted by the gateway."
);
}

buffer = ArrayPool<byte>.Shared.Rent((int)memoryStream.Length);
memoryStream.Seek(0, SeekOrigin.Begin);

// Copy the data
var copiedBytes = 0;
while (copiedBytes < memoryStream.Length)
{
var bufferSegment = new ArraySegment<byte>(buffer, copiedBytes, (int)memoryStream.Length - copiedBytes);
copiedBytes += await memoryStream.ReadAsync(bufferSegment, ct);
}
// `SkipValidation = !Debugger.IsAttached` is to replicate what STJ does internally;
// it's worth noting however that STJ uses #if !DEBUG, while we simply check if
// there's a debugger, which is good enough.
await using var writer = new Utf8JsonWriter(bufferWriter, new JsonWriterOptions { Encoder = _jsonOptions.Encoder, Indented = false, SkipValidation = !Debugger.IsAttached });
JsonSerializer.Serialize(writer, payload, _jsonOptions);

// Send the whole payload as one chunk
var segment = new ArraySegment<byte>(buffer, 0, (int)memoryStream.Length);

await _clientWebSocket.SendAsync(segment, WebSocketMessageType.Text, true, ct);
if (bufferWriter.WrittenSpan.Length > 4096)
{
return new NotSupportedError
(
"The payload was too large to be accepted by the gateway."
);
}

if (_clientWebSocket.CloseStatus.HasValue)
{
if (Enum.IsDefined(typeof(GatewayCloseStatus), (int)_clientWebSocket.CloseStatus))
{
return new GatewayDiscordError((GatewayCloseStatus)_clientWebSocket.CloseStatus);
}
// Send the whole payload as one chunk
await _clientWebSocket.SendAsync(bufferWriter.WrittenMemory, WebSocketMessageType.Text, true, ct);

return new GatewayWebSocketError(_clientWebSocket.CloseStatus.Value);
}
if (!_clientWebSocket.CloseStatus.HasValue)
{
return Result.FromSuccess();
}
finally

if (Enum.IsDefined(typeof(GatewayCloseStatus), (int)_clientWebSocket.CloseStatus))
{
if (buffer is not null)
{
ArrayPool<byte>.Shared.Return(buffer);
}
return new GatewayDiscordError((GatewayCloseStatus)_clientWebSocket.CloseStatus);
}

return Result.FromSuccess();
return new GatewayWebSocketError(_clientWebSocket.CloseStatus.Value);
}

/// <inheritdoc />
public async Task<Result<IPayload>> ReceivePayloadAsync(CancellationToken ct = default)
#if NET6_0_OR_GREATER
[AsyncMethodBuilder(typeof(PoolingAsyncValueTaskMethodBuilder<>))]
#endif
public async ValueTask<Result<IPayload>> ReceivePayloadAsync(CancellationToken ct = default)
{
if (_clientWebSocket is null)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ public async Task<Result> ConnectAsync(Uri endpoint, CancellationToken ct = defa
}

/// <inheritdoc />
public async Task<Result> SendPayloadAsync(IPayload payload, CancellationToken ct = default)
public async ValueTask<Result> SendPayloadAsync(IPayload payload, CancellationToken ct = default)
{
try
{
Expand Down Expand Up @@ -295,7 +295,7 @@ public async Task<Result> SendPayloadAsync(IPayload payload, CancellationToken c
}

/// <inheritdoc />
public async Task<Result<IPayload>> ReceivePayloadAsync(CancellationToken ct = default)
public async ValueTask<Result<IPayload>> ReceivePayloadAsync(CancellationToken ct = default)
{
while (!ct.IsCancellationRequested)
{
Expand Down

0 comments on commit 0be68d7

Please sign in to comment.