Skip to content

Commit

Permalink
Use new Json Pipeline overloads (#7838)
Browse files Browse the repository at this point in the history
  • Loading branch information
benaadams authored Nov 29, 2024
1 parent d6b08ac commit 323893e
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 29 deletions.
16 changes: 8 additions & 8 deletions src/Nethermind/Nethermind.Runner/JsonRpc/Startup.cs
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ public void Configure(IApplicationBuilder app, IWebHostEnvironment env, IJsonRpc
using (result)
{
await using Stream stream = jsonRpcConfig.BufferResponses ? RecyclableStream.GetStream("http") : null;
ICountingBufferWriter resultWriter = stream is not null ? new CountingStreamPipeWriter(stream) : new CountingPipeWriter(ctx.Response.BodyWriter);
CountingWriter resultWriter = stream is not null ? new CountingStreamPipeWriter(stream) : new CountingPipeWriter(ctx.Response.BodyWriter);
try
{
ctx.Response.ContentType = "application/json";
Expand All @@ -210,7 +210,7 @@ public void Configure(IApplicationBuilder app, IWebHostEnvironment env, IJsonRpc
}

first = false;
jsonSerializer.Serialize(resultWriter, entry.Response);
await jsonSerializer.SerializeAsync(resultWriter, entry.Response);
_ = jsonRpcLocalStats.ReportCall(entry.Report);

// We reached the limit and don't want to responded to more request in the batch
Expand All @@ -233,7 +233,7 @@ public void Configure(IApplicationBuilder app, IWebHostEnvironment env, IJsonRpc
}
else
{
jsonSerializer.Serialize(resultWriter, result.Response);
await jsonSerializer.SerializeAsync(resultWriter, result.Response);
}
await resultWriter.CompleteAsync();
if (stream is not null)
Expand All @@ -245,11 +245,11 @@ public void Configure(IApplicationBuilder app, IWebHostEnvironment env, IJsonRpc
}
catch (Exception e) when (e.InnerException is OperationCanceledException)
{
SerializeTimeoutException(resultWriter);
await SerializeTimeoutException(resultWriter);
}
catch (OperationCanceledException)
{
SerializeTimeoutException(resultWriter);
await SerializeTimeoutException(resultWriter);
}
finally
{
Expand Down Expand Up @@ -281,17 +281,17 @@ await PushErrorResponse(e.StatusCode, e.StatusCode == StatusCodes.Status413Paylo
Interlocked.Add(ref Metrics.JsonRpcBytesReceivedHttp, ctx.Request.ContentLength ?? request.Length);
}
}
void SerializeTimeoutException(IBufferWriter<byte> resultStream)
Task SerializeTimeoutException(CountingWriter resultStream)
{
JsonRpcErrorResponse? error = jsonRpcService.GetErrorResponse(ErrorCodes.Timeout, "Request was canceled due to enabled timeout.");
jsonSerializer.Serialize(resultStream, error);
return jsonSerializer.SerializeAsync(resultStream, error);
}
async Task PushErrorResponse(int statusCode, int errorCode, string message)
{
JsonRpcErrorResponse? response = jsonRpcService.GetErrorResponse(errorCode, message);
ctx.Response.ContentType = "application/json";
ctx.Response.StatusCode = statusCode;
jsonSerializer.Serialize(ctx.Response.BodyWriter, response);
await jsonSerializer.SerializeAsync(ctx.Response.BodyWriter, response);
await ctx.Response.CompleteAsync();
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,18 +143,15 @@ private JsonWriterOptions CreateWriterOptions(bool indented)
public async ValueTask<long> SerializeAsync<T>(Stream stream, T value, bool indented = false, bool leaveOpen = true)
{
var writer = GetPipeWriter(stream, leaveOpen);
Serialize(writer, value, indented);
await JsonSerializer.SerializeAsync(writer, value, indented ? JsonOptionsIndented : _jsonOptions);
await writer.CompleteAsync();

long outputCount = writer.WrittenCount;
return outputCount;
}

public void Serialize<T>(IBufferWriter<byte> writer, T value, bool indented = false)
{
using var jsonWriter = new Utf8JsonWriter(writer, CreateWriterOptions(indented));
JsonSerializer.Serialize(jsonWriter, value, indented ? JsonOptionsIndented : _jsonOptions);
}
public Task SerializeAsync<T>(PipeWriter writer, T value, bool indented = false)
=> JsonSerializer.SerializeAsync(writer, value, indented ? JsonOptionsIndented : _jsonOptions);

public static void SerializeToStream<T>(Stream stream, T value, bool indented = false)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using System;
using System.Buffers;
using System.IO;
using System.IO.Pipelines;
using System.Threading.Tasks;

namespace Nethermind.Serialization.Json
Expand All @@ -16,6 +17,6 @@ public interface IJsonSerializer
string Serialize<T>(T value, bool indented = false);
long Serialize<T>(Stream stream, T value, bool indented = false, bool leaveOpen = true);
ValueTask<long> SerializeAsync<T>(Stream stream, T value, bool indented = false, bool leaveOpen = true);
void Serialize<T>(IBufferWriter<byte> writer, T value, bool indented = false);
Task SerializeAsync<T>(PipeWriter writer, T value, bool indented = false);
}
}
34 changes: 20 additions & 14 deletions src/Nethermind/Nethermind.Serialization.Json/StreamPipeWriter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,14 @@ namespace Nethermind.Serialization.Json;

#nullable enable

public interface ICountingBufferWriter : IBufferWriter<byte>
public abstract class CountingWriter : PipeWriter
{
long WrittenCount { get; }
ValueTask CompleteAsync(Exception? exception = null);
public long WrittenCount { get; protected set; }
}

public sealed class CountingPipeWriter : ICountingBufferWriter
public sealed class CountingPipeWriter : CountingWriter
{
private readonly PipeWriter _writer;
public long WrittenCount { get; private set; }

public CountingPipeWriter(PipeWriter writer)
{
Expand All @@ -34,24 +32,33 @@ public CountingPipeWriter(PipeWriter writer)
_writer = writer;
}

public void Advance(int count)
public override void Advance(int count)
{
_writer.Advance(count);
WrittenCount += count;
}

public Memory<byte> GetMemory(int sizeHint = 0) => _writer.GetMemory(sizeHint);
public override Memory<byte> GetMemory(int sizeHint = 0) => _writer.GetMemory(sizeHint);

public Span<byte> GetSpan(int sizeHint = 0) => _writer.GetSpan(sizeHint);
public override Span<byte> GetSpan(int sizeHint = 0) => _writer.GetSpan(sizeHint);

public ValueTask CompleteAsync(Exception? exception = null)
{
return _writer.CompleteAsync();
}
public override ValueTask CompleteAsync(Exception? exception = null)
=> _writer.CompleteAsync();

public override void CancelPendingFlush()
=> _writer.CancelPendingFlush();

public override void Complete(Exception? exception = null)
=> _writer.Complete(exception);

public override ValueTask<FlushResult> FlushAsync(CancellationToken cancellationToken = default)
=> _writer.FlushAsync(cancellationToken);

public override bool CanGetUnflushedBytes => _writer.CanGetUnflushedBytes;
public override long UnflushedBytes => _writer.UnflushedBytes;
}

public sealed class CountingStreamPipeWriter : PipeWriter, ICountingBufferWriter
public sealed class CountingStreamPipeWriter : CountingWriter
{
internal const int InitialSegmentPoolSize = 4; // 16K
internal const int MaxSegmentPoolSize = 256; // 1MB
Expand Down Expand Up @@ -103,7 +110,6 @@ public CountingStreamPipeWriter(Stream writingStream, StreamPipeWriterOptions? o
/// Gets the inner stream that is being written to.
/// </summary>
public Stream InnerStream { get; }
public long WrittenCount { get; set; }

/// <inheritdoc />
public override void Advance(int bytes)
Expand Down

0 comments on commit 323893e

Please sign in to comment.