Skip to content

Commit

Permalink
Implement the API
Browse files Browse the repository at this point in the history
  • Loading branch information
guhetier committed Feb 1, 2025
1 parent d4013db commit 52633e7
Show file tree
Hide file tree
Showing 13 changed files with 373 additions and 13 deletions.
1 change: 1 addition & 0 deletions docs/api/StreamOpen.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ Value | Meaning
**QUIC_STREAM_OPEN_FLAG_UNIDIRECTIONAL**<br>1 | Opens a unidirectional stream.
**QUIC_STREAM_OPEN_FLAG_0_RTT**<br>2 | Indicates that the stream may be sent in 0-RTT.
**QUIC_STREAM_OPEN_FLAG_DELAY_ID_FC_UPDATES**<br>4 | Indicates stream ID flow control limit updates for the connection should be delayed to StreamClose.
**QUIC_STREAM_OPEN_FLAG_EXTERNAL_BUFFERS**<br>5 | No buffer will be allocated for the stream, external buffers must be provided using StreamProvideReceiveBuffers.
`Handler`
Expand Down
150 changes: 150 additions & 0 deletions src/core/api.c
Original file line number Diff line number Diff line change
Expand Up @@ -658,6 +658,15 @@ MsQuicStreamOpen(
goto Error;
}

if (!!(Flags & QUIC_STREAM_OPEN_FLAG_EXTERNAL_BUFFERS) &&
Connection->Settings.StreamMultiReceiveEnabled) {
//
// External buffers are not supported with multi-receive.
//
Status = QUIC_STATUS_INVALID_PARAMETER;
goto Error;
}

Status = QuicStreamInitialize(Connection, FALSE, Flags, (QUIC_STREAM**)NewStream);
if (QUIC_FAILED(Status)) {
goto Error;
Expand Down Expand Up @@ -1351,6 +1360,147 @@ MsQuicStreamReceiveComplete(
"[ api] Exit");
}

_IRQL_requires_max_(DISPATCH_LEVEL)
QUIC_STATUS
QUIC_API
MsQuicStreamProvideReceiveBuffers(
_In_ _Pre_defensive_ HQUIC Handle,
_In_ uint32_t BufferCount,
_In_reads_(BufferCount) const QUIC_BUFFER* Buffers
)
{
QUIC_STATUS Status;
QUIC_OPERATION* Oper;
CXPLAT_LIST_ENTRY ChunkList;
CxPlatListInitializeHead(&ChunkList);

QuicTraceEvent(
ApiEnter,
"[ api] Enter %u (%p).",
QUIC_TRACE_API_STREAM_PROVIDE_RECEIVE_BUFFERS,
Handle);

if (!IS_STREAM_HANDLE(Handle) || Buffers == NULL || BufferCount == 0) {
Status = QUIC_STATUS_INVALID_PARAMETER;
goto Error;
}

for (uint32_t i = 0; i < BufferCount; ++i) {
if (Buffers[i].Length == 0) {
Status = QUIC_STATUS_INVALID_PARAMETER;
goto Error;
}
}

#pragma prefast(suppress: __WARNING_25024, "Pointer cast already validated.")
QUIC_STREAM* Stream = (QUIC_STREAM*)Handle;

CXPLAT_TEL_ASSERT(!Stream->Flags.HandleClosed);
CXPLAT_TEL_ASSERT(!Stream->Flags.Freed);

QUIC_CONNECTION* Connection = Stream->Connection;
QUIC_CONN_VERIFY(Connection, !Connection->State.Freed);

//
// Execute this API call inline if called on the worker thread.
//
BOOLEAN IsWorkerThread = Connection->WorkerThreadID == CxPlatCurThreadID();
BOOLEAN IsAlreadyInline = Connection->State.InlineApiExecution;

if (!Stream->Flags.UseExternalRecvBuffers) {
if (Stream->Flags.PeerStreamStartEventActive) {
CXPLAT_DBG_ASSERT(IsWorkerThread);
//
// We are inline from the callback indicating a peer opened a stream.
// No data was received yet so we can setup external buffers.
//
Connection->State.InlineApiExecution = TRUE;
QuicStreamSwitchToExternalBuffers(Stream);
Connection->State.InlineApiExecution = IsAlreadyInline;
} else {
//
// External buffers can't be provided after the stream has been
// started using internal buffers.
//
Status = QUIC_STATUS_INVALID_STATE;
goto Error;
}
}

//
// Allocate a chunk for each buffer, linking them together.
// The allocation is done here to make the worker thread task failure free.
//
for (uint32_t i = 0; i < BufferCount; ++i) {
QUIC_RECV_CHUNK* Chunk = CXPLAT_ALLOC_NONPAGED(sizeof(QUIC_RECV_CHUNK), QUIC_POOL_RECVBUF);
if (Chunk == NULL) {
QuicTraceEvent(
AllocFailure,
"Allocation of '%s' failed. (%llu bytes)",
"provided_chunk",
0);
Status = QUIC_STATUS_OUT_OF_MEMORY;
goto Error;
}
QuicRecvChunkInitialize(Chunk, Buffers[i].Length, Buffers[i].Buffer);
CxPlatListInsertTail(&ChunkList, &Chunk->Link);
}

if (IsWorkerThread) {
//
// Execute this API call inline if called on the worker thread.
//
Connection->State.InlineApiExecution = TRUE;
Status = QuicStreamProvideRecvBuffers(Stream, &ChunkList);
Connection->State.InlineApiExecution = IsAlreadyInline;
} else {
//
// Queue the operation to insert the chunks in the recv buffer, without waiting for the result.
//
Oper = QuicOperationAlloc(Connection->Worker, QUIC_OPER_TYPE_API_CALL);
if (Oper == NULL) {
Status = QUIC_STATUS_OUT_OF_MEMORY;
QuicTraceEvent(
AllocFailure,
"Allocation of '%s' failed. (%llu bytes)",
"STRM_PROVIDE_RECV_BUFFERS, operation",
0);
goto Error;
}
Oper->API_CALL.Context->Type = QUIC_API_TYPE_STRM_PROVIDE_RECV_BUFFERS;
Oper->API_CALL.Context->STRM_PROVIDE_RECV_BUFFERS.Stream = Stream;
CxPlatListMoveItems(&ChunkList, &Oper->API_CALL.Context->STRM_PROVIDE_RECV_BUFFERS.Chunks);

//
// Async stream operations need to hold a ref on the stream so that the
// stream isn't freed before the operation can be processed. The ref is
// released after the operation is processed.
//
QuicStreamAddRef(Stream, QUIC_STREAM_REF_OPERATION);

//
// Queue the operation but don't wait for the completion.
//
QuicConnQueueOper(Connection, Oper);
}

Status = QUIC_STATUS_SUCCESS;

Error:
// Cleanup allocated chunks if the operation failed.
while (!CxPlatListIsEmpty(&ChunkList)) {
QUIC_RECV_CHUNK* Chunk = CXPLAT_CONTAINING_RECORD(CxPlatListRemoveHead(&ChunkList), QUIC_RECV_CHUNK, Link);
CXPLAT_FREE(Chunk, QUIC_POOL_RECVBUF);
}

QuicTraceEvent(
ApiExitStatus,
"[ api] Exit %u",
Status);

return Status;
}

_IRQL_requires_max_(PASSIVE_LEVEL)
QUIC_STATUS
QUIC_API
Expand Down
19 changes: 19 additions & 0 deletions src/core/connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -7508,6 +7508,25 @@ QuicConnProcessApiOperation(
ApiCtx->STRM_RECV_SET_ENABLED.IsEnabled);
break;

case QUIC_API_TYPE_STRM_PROVIDE_RECV_BUFFERS:
Status =
QuicStreamProvideRecvBuffers(
ApiCtx->STRM_PROVIDE_RECV_BUFFERS.Stream,
&ApiCtx->STRM_PROVIDE_RECV_BUFFERS.Chunks);

if (Status != QUIC_STATUS_SUCCESS) {
//
// If we cannot accept the app provided buffers at this point, we need to abort
// the connection: otherwise, we break the contract with the app about writting
// data to the provided buffers in order.
//
QuicConnFatalError(
ApiCtx->STRM_PROVIDE_RECV_BUFFERS.Stream->Connection,
Status,
"Failed to accept app provided receive buffers");
}
break;

case QUIC_API_TYPE_SET_PARAM:
Status =
QuicLibrarySetParam(
Expand Down
2 changes: 2 additions & 0 deletions src/core/operation.c
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,8 @@ QuicOperationFree(
}
} else if (ApiCtx->Type == QUIC_API_TYPE_STRM_RECV_SET_ENABLED) {
QuicStreamRelease(ApiCtx->STRM_RECV_SET_ENABLED.Stream, QUIC_STREAM_REF_OPERATION);
} else if (ApiCtx->Type == QUIC_API_TYPE_STRM_PROVIDE_RECV_BUFFERS) {
QuicStreamRelease(ApiCtx->STRM_PROVIDE_RECV_BUFFERS.Stream, QUIC_STREAM_REF_OPERATION);
}
CxPlatPoolFree(&Worker->ApiContextPool, ApiCtx);
} else if (Oper->Type == QUIC_OPER_TYPE_FLUSH_STREAM_RECV) {
Expand Down
5 changes: 5 additions & 0 deletions src/core/operation.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ typedef enum QUIC_API_TYPE {
QUIC_API_TYPE_STRM_SEND,
QUIC_API_TYPE_STRM_RECV_COMPLETE,
QUIC_API_TYPE_STRM_RECV_SET_ENABLED,
QUIC_API_TYPE_STRM_PROVIDE_RECV_BUFFERS,

QUIC_API_TYPE_SET_PARAM,
QUIC_API_TYPE_GET_PARAM,
Expand Down Expand Up @@ -154,6 +155,10 @@ typedef struct QUIC_API_CONTEXT {
QUIC_STREAM* Stream;
BOOLEAN IsEnabled;
} STRM_RECV_SET_ENABLED;
struct {
QUIC_STREAM* Stream;
CXPLAT_LIST_ENTRY /* QUIC_RECV_CHUNK */ Chunks;
} STRM_PROVIDE_RECV_BUFFERS;

struct {
HQUIC Handle;
Expand Down
62 changes: 59 additions & 3 deletions src/core/recv_buffer.c
Original file line number Diff line number Diff line change
Expand Up @@ -717,8 +717,65 @@ QuicRecvBufferWrite(
return QUIC_STATUS_SUCCESS;
}

// TODO guhetier: This could return the number of buffers needed / extra buffer needed?
// For now, only fill up to the number of provided buffer and follow up
_IRQL_requires_max_(DISPATCH_LEVEL)
uint32_t
QuicRecvBufferReadBufferNeededCount(
_In_ const QUIC_RECV_BUFFER* RecvBuffer
)
{
if (RecvBuffer->RecvMode == QUIC_RECV_BUF_MODE_SINGLE) {
//
// Single mode only ever need one buffer, that's what it's designed for.
//
return 1;
} else if (RecvBuffer->RecvMode == QUIC_RECV_BUF_MODE_CIRCULAR) {
//
// Circular mode need up to two buffers to deal with wrap around.
//
return 2;
} else if (RecvBuffer->RecvMode == QUIC_RECV_BUF_MODE_MULTIPLE) {
//
// Multiple mode need up to three buffers to deal with wrap around and a
// potential second chunk for overflow data.
//
return 3;
} else { // RecvBuffer->RecvMode == QUIC_RECV_BUF_MODE_EXTERNAL
//
// External mode can need any number of buffer, we must count.
//

//
// Determine how much data is readable
//
const QUIC_SUBRANGE* FirstRange = QuicRangeGet(&RecvBuffer->WrittenRanges, 0);
const uint64_t ReadableData = FirstRange->Count - RecvBuffer->BaseOffset;

//
// Iterate through the chunks until they can contain all the readable data,
// to find the number of buffers needed.
//
CXPLAT_DBG_ASSERT(!CxPlatListIsEmpty(&RecvBuffer->Chunks));
QUIC_RECV_CHUNK* Chunk =
CXPLAT_CONTAINING_RECORD(
RecvBuffer->Chunks.Flink,
QUIC_RECV_CHUNK,
Link);
uint32_t DataInChunks = RecvBuffer->Capacity;
uint32_t BufferCount = 1;

while (ReadableData > DataInChunks) {
Chunk =
CXPLAT_CONTAINING_RECORD(
Chunk->Link.Flink,
QUIC_RECV_CHUNK,
Link);
DataInChunks += Chunk->AllocLength;
BufferCount++;
}
return BufferCount;
}
}

_IRQL_requires_max_(DISPATCH_LEVEL)
void
QuicRecvBufferRead(
Expand Down Expand Up @@ -765,7 +822,6 @@ QuicRecvBufferRead(
Link);
CXPLAT_DBG_ASSERT(!Chunk->ExternalReference);
CXPLAT_DBG_ASSERT(RecvBuffer->ReadStart == 0);
CXPLAT_DBG_ASSERT(*BufferCount >= 1);
CXPLAT_DBG_ASSERT(ContiguousLength <= (uint64_t)Chunk->AllocLength);

*BufferCount = 1;
Expand Down
10 changes: 10 additions & 0 deletions src/core/recv_buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,16 @@ QuicRecvBufferWrite(
_Out_ BOOLEAN* NewDataReady
);

//
// Returns how many QUIC_BUFFERs should be passed to `QuicRecvBufferRead` to
// read all the available data in the buffer.
//
_IRQL_requires_max_(DISPATCH_LEVEL)
uint32_t
QuicRecvBufferReadBufferNeededCount(
_In_ const QUIC_RECV_BUFFER *RecvBuffer
);

//
// Returns a pointer into the buffer for data ready to be delivered to the
// client.
Expand Down
43 changes: 38 additions & 5 deletions src/core/stream.c
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ QuicStreamInitialize(
QUIC_STATUS Status;
QUIC_STREAM* Stream;
QUIC_RECV_CHUNK* PreallocatedRecvChunk = NULL;
uint32_t InitialRecvBufferLength;
QUIC_WORKER* Worker = Connection->Worker;

Stream = CxPlatPoolAlloc(&Worker->StreamPool);
Expand Down Expand Up @@ -63,6 +62,7 @@ QuicStreamInitialize(
Stream,
"Configured for delayed ID FC updates");
}
Stream->Flags.UseExternalRecvBuffers = !!(Flags & QUIC_STREAM_OPEN_FLAG_EXTERNAL_BUFFERS);
Stream->Flags.Allocated = TRUE;
Stream->Flags.SendEnabled = TRUE;
Stream->Flags.ReceiveEnabled = TRUE;
Expand Down Expand Up @@ -112,13 +112,26 @@ QuicStreamInitialize(
}
}

InitialRecvBufferLength = Connection->Settings.StreamRecvBufferDefault;
if (InitialRecvBufferLength == QUIC_DEFAULT_STREAM_RECV_BUFFER_SIZE) {
const uint32_t InitialRecvBufferLength = Connection->Settings.StreamRecvBufferDefault;

QUIC_RECV_BUF_MODE RecvBufferMode = QUIC_RECV_BUF_MODE_CIRCULAR;
if (Stream->Flags.ReceiveMultiple) {
RecvBufferMode = QUIC_RECV_BUF_MODE_MULTIPLE;
} else if (Stream->Flags.UseExternalRecvBuffers) {
RecvBufferMode = QUIC_RECV_BUF_MODE_EXTERNAL;
}

if (InitialRecvBufferLength == QUIC_DEFAULT_STREAM_RECV_BUFFER_SIZE &&
RecvBufferMode != QUIC_RECV_BUF_MODE_EXTERNAL) {
PreallocatedRecvChunk = CxPlatPoolAlloc(&Worker->DefaultReceiveBufferPool);
if (PreallocatedRecvChunk == NULL) {
Status = QUIC_STATUS_OUT_OF_MEMORY;
goto Exit;
}
QuicRecvChunkInitialize(
PreallocatedRecvChunk,
InitialRecvBufferLength,
(uint8_t *)(PreallocatedRecvChunk + 1));
}

const uint32_t FlowControlWindowSize = Stream->Flags.Unidirectional
Expand All @@ -132,8 +145,7 @@ QuicStreamInitialize(
&Stream->RecvBuffer,
InitialRecvBufferLength,
FlowControlWindowSize,
Stream->Flags.ReceiveMultiple ?
QUIC_RECV_BUF_MODE_MULTIPLE : QUIC_RECV_BUF_MODE_CIRCULAR,
RecvBufferMode,
PreallocatedRecvChunk);
if (QUIC_FAILED(Status)) {
goto Exit;
Expand Down Expand Up @@ -957,3 +969,24 @@ QuicStreamParamGet(

return Status;
}

_IRQL_requires_max_(DISPATCH_LEVEL)
void
QuicStreamSwitchToExternalBuffers(
_In_ QUIC_STREAM* Stream
)
{
QuicRecvBufferUninitialize(&Stream->RecvBuffer);
(void)QuicRecvBufferInitialize(&Stream->RecvBuffer, 0, 0, QUIC_RECV_BUF_MODE_EXTERNAL, NULL);
Stream->Flags.UseExternalRecvBuffers = TRUE;
}

_IRQL_requires_max_(DISPATCH_LEVEL)
QUIC_STATUS
QuicStreamProvideRecvBuffers(
_In_ QUIC_STREAM* Stream,
_Inout_ CXPLAT_LIST_ENTRY* /* QUIC_RECV_CHUNK */ Chunks
)
{
return QuicRecvBufferProvideChunks(&Stream->RecvBuffer, Chunks);
}
Loading

0 comments on commit 52633e7

Please sign in to comment.