Skip to content

Commit

Permalink
Add app-owned scatter/gather buffer management (#4758)
Browse files Browse the repository at this point in the history
Defines a new QUIC_RECV_BUF_APP_OWNED mode and StreamProvideReceiveBuffers API

The receive buffer can now handle App owned buffers. App owned buffers will be written to sequentially once (no reuse).
The connection receive window is adjusted based on the buffer space available.
The application can provide buffers using the StreamProvideReceiveBuffers API at any time, and must ensure enough buffer space is provided to receive the full initial receive window.
  • Loading branch information
guhetier authored Feb 19, 2025
1 parent 9d10d84 commit 7da6fea
Show file tree
Hide file tree
Showing 30 changed files with 1,855 additions and 201 deletions.
1 change: 1 addition & 0 deletions docs/api/StreamOpen.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ Value | Meaning
**QUIC_STREAM_OPEN_FLAG_UNIDIRECTIONAL**<br>1 | Opens a unidirectional stream.
**QUIC_STREAM_OPEN_FLAG_0_RTT**<br>2 | Indicates that the stream may be sent in 0-RTT.
**QUIC_STREAM_OPEN_FLAG_DELAY_ID_FC_UPDATES**<br>4 | Indicates stream ID flow control limit updates for the connection should be delayed to StreamClose.
**QUIC_STREAM_OPEN_FLAG_APP_OWNED_BUFFERS**<br>8 | Receive buffers are owned by the app and will be provided using StreamProvideReceiveBuffers. MsQuic won't allocate any buffers for the stream.
`Handler`
Expand Down
152 changes: 152 additions & 0 deletions src/core/api.c
Original file line number Diff line number Diff line change
Expand Up @@ -1351,6 +1351,158 @@ MsQuicStreamReceiveComplete(
"[ api] Exit");
}

//
// Provides application-owned receive buffers to a stream.
//
// One QUIC_RECV_CHUNK is allocated from the worker's AppBufferChunkPool for
// each entry in Buffers. The chunks are then either handed to the stream
// inline (when called on the connection's worker thread) or queued to the
// connection as an API operation, without waiting for its completion.
//
// Returns:
//   QUIC_STATUS_INVALID_PARAMETER - Handle is not a stream handle, Buffers is
//       NULL, BufferCount is 0, or one of the buffers has a zero length.
//   QUIC_STATUS_INVALID_STATE - The stream doesn't use app-owned buffers and
//       the call is not made from the peer stream started notification.
//   QUIC_STATUS_OUT_OF_MEMORY - A chunk or the operation couldn't be allocated.
//   QUIC_STATUS_SUCCESS - The operation was queued. When executed inline, the
//       status of QuicStreamProvideRecvBuffers is returned instead.
//
_IRQL_requires_max_(DISPATCH_LEVEL)
QUIC_STATUS
QUIC_API
MsQuicStreamProvideReceiveBuffers(
    _In_ _Pre_defensive_ HQUIC Handle,
    _In_ uint32_t BufferCount,
    _In_reads_(BufferCount) const QUIC_BUFFER* Buffers
    )
{
    QUIC_STATUS Status;
    QUIC_OPERATION* Oper;
    QUIC_CONNECTION* Connection = NULL;
    CXPLAT_LIST_ENTRY ChunkList;
    CxPlatListInitializeHead(&ChunkList);

    QuicTraceEvent(
        ApiEnter,
        "[ api] Enter %u (%p).",
        QUIC_TRACE_API_STREAM_PROVIDE_RECEIVE_BUFFERS,
        Handle);

    if (!IS_STREAM_HANDLE(Handle) || Buffers == NULL || BufferCount == 0) {
        Status = QUIC_STATUS_INVALID_PARAMETER;
        goto Error;
    }

    //
    // Reject the whole call if any buffer is empty, before allocating anything.
    //
    for (uint32_t i = 0; i < BufferCount; ++i) {
        if (Buffers[i].Length == 0) {
            Status = QUIC_STATUS_INVALID_PARAMETER;
            goto Error;
        }
    }

#pragma prefast(suppress: __WARNING_25024, "Pointer cast already validated.")
    QUIC_STREAM* Stream = (QUIC_STREAM*)Handle;

    CXPLAT_TEL_ASSERT(!Stream->Flags.HandleClosed);
    CXPLAT_TEL_ASSERT(!Stream->Flags.Freed);

    Connection = Stream->Connection;
    QUIC_CONN_VERIFY(Connection, !Connection->State.Freed);

    //
    // Execute this API call inline if called on the worker thread.
    //
    BOOLEAN IsWorkerThread = Connection->WorkerThreadID == CxPlatCurThreadID();
    BOOLEAN IsAlreadyInline = Connection->State.InlineApiExecution;

    if (!Stream->Flags.UseAppOwnedRecvBuffers) {
        if (Stream->Flags.PeerStreamStartEventActive) {
            CXPLAT_DBG_ASSERT(IsWorkerThread);
            //
            // We are inline from the callback indicating a peer opened a stream.
            // No data was received yet so we can setup app-owned buffers.
            //
            Connection->State.InlineApiExecution = TRUE;
            QuicStreamSwitchToAppOwnedBuffers(Stream);
            Connection->State.InlineApiExecution = IsAlreadyInline;
        } else {
            //
            // App-owned buffers can't be provided after the stream has been
            // started using internal buffers.
            //
            Status = QUIC_STATUS_INVALID_STATE;
            goto Error;
        }
    }

    //
    // Allocate a chunk for each buffer, linking them together.
    // The allocation is done here to make the worker thread task failure free.
    //
    for (uint32_t i = 0; i < BufferCount; ++i) {
        QUIC_RECV_CHUNK* Chunk =
            CxPlatPoolAlloc(&Connection->Worker->AppBufferChunkPool);
        if (Chunk == NULL) {
            QuicTraceEvent(
                AllocFailure,
                "Allocation of '%s' failed. (%llu bytes)",
                "provided_chunk",
                0);
            Status = QUIC_STATUS_OUT_OF_MEMORY;
            goto Error;
        }
        QuicRecvChunkInitialize(Chunk, Buffers[i].Length, Buffers[i].Buffer, TRUE);
        CxPlatListInsertTail(&ChunkList, &Chunk->Link);
    }

    if (IsWorkerThread) {
        //
        // Execute this API call inline if called on the worker thread.
        //
        Connection->State.InlineApiExecution = TRUE;
        Status = QuicStreamProvideRecvBuffers(Stream, &ChunkList);
        Connection->State.InlineApiExecution = IsAlreadyInline;
    } else {
        //
        // Queue the operation to insert the chunks in the recv buffer, without waiting for the result.
        //
        Oper = QuicOperationAlloc(Connection->Worker, QUIC_OPER_TYPE_API_CALL);
        if (Oper == NULL) {
            Status = QUIC_STATUS_OUT_OF_MEMORY;
            QuicTraceEvent(
                AllocFailure,
                "Allocation of '%s' failed. (%llu bytes)",
                "STRM_PROVIDE_RECV_BUFFERS, operation",
                0);
            goto Error;
        }
        Oper->API_CALL.Context->Type = QUIC_API_TYPE_STRM_PROVIDE_RECV_BUFFERS;
        Oper->API_CALL.Context->STRM_PROVIDE_RECV_BUFFERS.Stream = Stream;
        CxPlatListInitializeHead(&Oper->API_CALL.Context->STRM_PROVIDE_RECV_BUFFERS.Chunks);
        CxPlatListMoveItems(&ChunkList, &Oper->API_CALL.Context->STRM_PROVIDE_RECV_BUFFERS.Chunks);

        //
        // Async stream operations need to hold a ref on the stream so that the
        // stream isn't freed before the operation can be processed. The ref is
        // released after the operation is processed.
        //
        QuicStreamAddRef(Stream, QUIC_STREAM_REF_OPERATION);

        //
        // Queue the operation but don't wait for the completion.
        //
        QuicConnQueueOper(Connection, Oper);
        Status = QUIC_STATUS_SUCCESS;
    }

Error:
    //
    // Cleanup allocated chunks if the operation failed. Each chunk is removed
    // from the list exactly once and returned to the pool it was allocated
    // from. The list can only be non-empty after Connection has been set.
    //
    while (!CxPlatListIsEmpty(&ChunkList)) {
        CXPLAT_DBG_ASSERT(Connection != NULL);
        CxPlatPoolFree(
            &Connection->Worker->AppBufferChunkPool,
            CXPLAT_CONTAINING_RECORD(
                CxPlatListRemoveHead(&ChunkList),
                QUIC_RECV_CHUNK,
                Link));
    }

    QuicTraceEvent(
        ApiExitStatus,
        "[ api] Exit %u",
        Status);

    return Status;
}

_IRQL_requires_max_(PASSIVE_LEVEL)
QUIC_STATUS
QUIC_API
Expand Down
9 changes: 9 additions & 0 deletions src/core/api.h
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,15 @@ MsQuicStreamReceiveComplete(
_In_ uint64_t BufferLength
);

//
// Provides BufferCount app-owned buffers, described by the Buffers array, to
// the stream identified by Handle so that received data can be written to them.
//
_IRQL_requires_max_(DISPATCH_LEVEL)
QUIC_STATUS
QUIC_API
MsQuicStreamProvideReceiveBuffers(
_In_ _Pre_defensive_ HQUIC Handle,
_In_ uint32_t BufferCount,
_In_reads_(BufferCount) const QUIC_BUFFER *Buffers
);

_IRQL_requires_max_(DISPATCH_LEVEL)
QUIC_STATUS
QUIC_API
Expand Down
19 changes: 19 additions & 0 deletions src/core/connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -7561,6 +7561,25 @@ QuicConnProcessApiOperation(
ApiCtx->STRM_RECV_SET_ENABLED.IsEnabled);
break;

case QUIC_API_TYPE_STRM_PROVIDE_RECV_BUFFERS:
Status =
QuicStreamProvideRecvBuffers(
ApiCtx->STRM_PROVIDE_RECV_BUFFERS.Stream,
&ApiCtx->STRM_PROVIDE_RECV_BUFFERS.Chunks);

if (Status != QUIC_STATUS_SUCCESS) {
//
// If we cannot accept the app provided buffers at this point, we need to abort
// the connection: otherwise, we break the contract with the app about writing
// data to the provided buffers in order.
//
QuicConnFatalError(
ApiCtx->STRM_PROVIDE_RECV_BUFFERS.Stream->Connection,
Status,
"Failed to accept app provided receive buffers");
}
break;

case QUIC_API_TYPE_SET_PARAM:
Status =
QuicLibrarySetParam(
Expand Down
1 change: 1 addition & 0 deletions src/core/crypto.c
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ QuicCryptoInitialize(
InitialRecvBufferLength,
QUIC_DEFAULT_STREAM_FC_WINDOW_SIZE / 2,
QUIC_RECV_BUF_MODE_SINGLE,
&Connection->Worker->AppBufferChunkPool,
NULL);
if (QUIC_FAILED(Status)) {
goto Exit;
Expand Down
1 change: 1 addition & 0 deletions src/core/library.c
Original file line number Diff line number Diff line change
Expand Up @@ -1798,6 +1798,7 @@ MsQuicOpenVersion(
Api->StreamSend = MsQuicStreamSend;
Api->StreamReceiveComplete = MsQuicStreamReceiveComplete;
Api->StreamReceiveSetEnabled = MsQuicStreamReceiveSetEnabled;
Api->StreamProvideReceiveBuffers = MsQuicStreamProvideReceiveBuffers;

Api->DatagramSend = MsQuicDatagramSend;

Expand Down
10 changes: 10 additions & 0 deletions src/core/operation.c
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,16 @@ QuicOperationFree(
}
} else if (ApiCtx->Type == QUIC_API_TYPE_STRM_RECV_SET_ENABLED) {
QuicStreamRelease(ApiCtx->STRM_RECV_SET_ENABLED.Stream, QUIC_STREAM_REF_OPERATION);
} else if (ApiCtx->Type == QUIC_API_TYPE_STRM_PROVIDE_RECV_BUFFERS) {
while (!CxPlatListIsEmpty(&ApiCtx->STRM_PROVIDE_RECV_BUFFERS.Chunks)) {
CXPLAT_FREE(
CXPLAT_CONTAINING_RECORD(
CxPlatListRemoveHead(&ApiCtx->STRM_PROVIDE_RECV_BUFFERS.Chunks),
QUIC_RECV_CHUNK,
Link),
QUIC_POOL_RECVBUF);
}
QuicStreamRelease(ApiCtx->STRM_PROVIDE_RECV_BUFFERS.Stream, QUIC_STREAM_REF_OPERATION);
}
CxPlatPoolFree(&Worker->ApiContextPool, ApiCtx);
} else if (Oper->Type == QUIC_OPER_TYPE_FLUSH_STREAM_RECV) {
Expand Down
5 changes: 5 additions & 0 deletions src/core/operation.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ typedef enum QUIC_API_TYPE {
QUIC_API_TYPE_STRM_SEND,
QUIC_API_TYPE_STRM_RECV_COMPLETE,
QUIC_API_TYPE_STRM_RECV_SET_ENABLED,
QUIC_API_TYPE_STRM_PROVIDE_RECV_BUFFERS,

QUIC_API_TYPE_SET_PARAM,
QUIC_API_TYPE_GET_PARAM,
Expand Down Expand Up @@ -154,6 +155,10 @@ typedef struct QUIC_API_CONTEXT {
QUIC_STREAM* Stream;
BOOLEAN IsEnabled;
} STRM_RECV_SET_ENABLED;
struct {
QUIC_STREAM* Stream;
CXPLAT_LIST_ENTRY /* QUIC_RECV_CHUNK */ Chunks;
} STRM_PROVIDE_RECV_BUFFERS;

struct {
HQUIC Handle;
Expand Down
Loading

0 comments on commit 7da6fea

Please sign in to comment.