From 52633e762bf53a2de01aaac39d97082d1767d28b Mon Sep 17 00:00:00 2001 From: Guillaume Hetier Date: Fri, 31 Jan 2025 17:05:59 -0800 Subject: [PATCH] Implement the API --- docs/api/StreamOpen.md | 1 + src/core/api.c | 150 +++++++++++++++++++++++++++++++++++++++++ src/core/connection.c | 19 ++++++ src/core/operation.c | 2 + src/core/operation.h | 5 ++ src/core/recv_buffer.c | 62 ++++++++++++++++- src/core/recv_buffer.h | 10 +++ src/core/stream.c | 43 ++++++++++-- src/core/stream.h | 29 ++++++++ src/core/stream_recv.c | 43 ++++++++++-- src/core/stream_set.c | 3 + src/inc/msquic.h | 18 +++++ src/inc/quic_trace.h | 1 + 13 files changed, 373 insertions(+), 13 deletions(-) diff --git a/docs/api/StreamOpen.md b/docs/api/StreamOpen.md index f843045935..c878559365 100644 --- a/docs/api/StreamOpen.md +++ b/docs/api/StreamOpen.md @@ -35,6 +35,7 @@ Value | Meaning **QUIC_STREAM_OPEN_FLAG_UNIDIRECTIONAL**
1 | Opens a unidirectional stream. **QUIC_STREAM_OPEN_FLAG_0_RTT**
2 | Indicates that the stream may be sent in 0-RTT. **QUIC_STREAM_OPEN_FLAG_DELAY_ID_FC_UPDATES**
4 | Indicates stream ID flow control limit updates for the connection should be delayed to StreamClose. +**QUIC_STREAM_OPEN_FLAG_EXTERNAL_BUFFERS**
5 | No buffer will be allocated for the stream, external buffers must be provided using StreamProvideReceiveBuffers. `Handler` diff --git a/src/core/api.c b/src/core/api.c index f18a7f106c..f9bfd0d1c7 100644 --- a/src/core/api.c +++ b/src/core/api.c @@ -658,6 +658,15 @@ MsQuicStreamOpen( goto Error; } + if (!!(Flags & QUIC_STREAM_OPEN_FLAG_EXTERNAL_BUFFERS) && + Connection->Settings.StreamMultiReceiveEnabled) { + // + // External buffers are not supported with multi-receive. + // + Status = QUIC_STATUS_INVALID_PARAMETER; + goto Error; + } + Status = QuicStreamInitialize(Connection, FALSE, Flags, (QUIC_STREAM**)NewStream); if (QUIC_FAILED(Status)) { goto Error; @@ -1351,6 +1360,147 @@ MsQuicStreamReceiveComplete( "[ api] Exit"); } +_IRQL_requires_max_(DISPATCH_LEVEL) +QUIC_STATUS +QUIC_API +MsQuicStreamProvideReceiveBuffers( + _In_ _Pre_defensive_ HQUIC Handle, + _In_ uint32_t BufferCount, + _In_reads_(BufferCount) const QUIC_BUFFER* Buffers + ) +{ + QUIC_STATUS Status; + QUIC_OPERATION* Oper; + CXPLAT_LIST_ENTRY ChunkList; + CxPlatListInitializeHead(&ChunkList); + + QuicTraceEvent( + ApiEnter, + "[ api] Enter %u (%p).", + QUIC_TRACE_API_STREAM_PROVIDE_RECEIVE_BUFFERS, + Handle); + + if (!IS_STREAM_HANDLE(Handle) || Buffers == NULL || BufferCount == 0) { + Status = QUIC_STATUS_INVALID_PARAMETER; + goto Error; + } + + for (uint32_t i = 0; i < BufferCount; ++i) { + if (Buffers[i].Length == 0) { + Status = QUIC_STATUS_INVALID_PARAMETER; + goto Error; + } + } + +#pragma prefast(suppress: __WARNING_25024, "Pointer cast already validated.") + QUIC_STREAM* Stream = (QUIC_STREAM*)Handle; + + CXPLAT_TEL_ASSERT(!Stream->Flags.HandleClosed); + CXPLAT_TEL_ASSERT(!Stream->Flags.Freed); + + QUIC_CONNECTION* Connection = Stream->Connection; + QUIC_CONN_VERIFY(Connection, !Connection->State.Freed); + + // + // Execute this API call inline if called on the worker thread. + // + BOOLEAN IsWorkerThread = Connection->WorkerThreadID == CxPlatCurThreadID(); + BOOLEAN IsAlreadyInline = Connection->State.InlineApiExecution; + + if (!Stream->Flags.UseExternalRecvBuffers) { + if (Stream->Flags.PeerStreamStartEventActive) { + CXPLAT_DBG_ASSERT(IsWorkerThread); + // + // We are inline from the callback indicating a peer opened a stream. + // No data was received yet so we can setup external buffers. + // + Connection->State.InlineApiExecution = TRUE; + QuicStreamSwitchToExternalBuffers(Stream); + Connection->State.InlineApiExecution = IsAlreadyInline; + } else { + // + // External buffers can't be provided after the stream has been + // started using internal buffers. + // + Status = QUIC_STATUS_INVALID_STATE; + goto Error; + } + } + + // + // Allocate a chunk for each buffer, linking them together. + // The allocation is done here to make the worker thread task failure free. + // + for (uint32_t i = 0; i < BufferCount; ++i) { + QUIC_RECV_CHUNK* Chunk = CXPLAT_ALLOC_NONPAGED(sizeof(QUIC_RECV_CHUNK), QUIC_POOL_RECVBUF); + if (Chunk == NULL) { + QuicTraceEvent( + AllocFailure, + "Allocation of '%s' failed. (%llu bytes)", + "provided_chunk", + 0); + Status = QUIC_STATUS_OUT_OF_MEMORY; + goto Error; + } + QuicRecvChunkInitialize(Chunk, Buffers[i].Length, Buffers[i].Buffer); + CxPlatListInsertTail(&ChunkList, &Chunk->Link); + } + + if (IsWorkerThread) { + // + // Execute this API call inline if called on the worker thread. + // + Connection->State.InlineApiExecution = TRUE; + Status = QuicStreamProvideRecvBuffers(Stream, &ChunkList); + Connection->State.InlineApiExecution = IsAlreadyInline; + } else { + // + // Queue the operation to insert the chunks in the recv buffer, without waiting for the result. + // + Oper = QuicOperationAlloc(Connection->Worker, QUIC_OPER_TYPE_API_CALL); + if (Oper == NULL) { + Status = QUIC_STATUS_OUT_OF_MEMORY; + QuicTraceEvent( + AllocFailure, + "Allocation of '%s' failed. (%llu bytes)", + "STRM_PROVIDE_RECV_BUFFERS, operation", + 0); + goto Error; + } + Oper->API_CALL.Context->Type = QUIC_API_TYPE_STRM_PROVIDE_RECV_BUFFERS; + Oper->API_CALL.Context->STRM_PROVIDE_RECV_BUFFERS.Stream = Stream; + CxPlatListMoveItems(&ChunkList, &Oper->API_CALL.Context->STRM_PROVIDE_RECV_BUFFERS.Chunks); + + // + // Async stream operations need to hold a ref on the stream so that the + // stream isn't freed before the operation can be processed. The ref is + // released after the operation is processed. + // + QuicStreamAddRef(Stream, QUIC_STREAM_REF_OPERATION); + + // + // Queue the operation but don't wait for the completion. + // + QuicConnQueueOper(Connection, Oper); + } + + Status = QUIC_STATUS_SUCCESS; + +Error: + // Cleanup allocated chunks if the operation failed. + while (!CxPlatListIsEmpty(&ChunkList)) { + QUIC_RECV_CHUNK* Chunk = CXPLAT_CONTAINING_RECORD(CxPlatListRemoveHead(&ChunkList), QUIC_RECV_CHUNK, Link); + CXPLAT_FREE(Chunk, QUIC_POOL_RECVBUF); + } + + QuicTraceEvent( + ApiExitStatus, + "[ api] Exit %u", + Status); + + return Status; +} + _IRQL_requires_max_(PASSIVE_LEVEL) QUIC_STATUS QUIC_API diff --git a/src/core/connection.c b/src/core/connection.c index b59d19ba45..04a9b0fb3b 100644 --- a/src/core/connection.c +++ b/src/core/connection.c @@ -7508,6 +7508,25 @@ QuicConnProcessApiOperation( ApiCtx->STRM_RECV_SET_ENABLED.IsEnabled); break; + case QUIC_API_TYPE_STRM_PROVIDE_RECV_BUFFERS: + Status = + QuicStreamProvideRecvBuffers( + ApiCtx->STRM_PROVIDE_RECV_BUFFERS.Stream, + &ApiCtx->STRM_PROVIDE_RECV_BUFFERS.Chunks); + + if (Status != QUIC_STATUS_SUCCESS) { + // + // If we cannot accept the app provided buffers at this point, we need to abort + // the connection: otherwise, we break the contract with the app about writting + // data to the provided buffers in order. + // + QuicConnFatalError( + ApiCtx->STRM_PROVIDE_RECV_BUFFERS.Stream->Connection, + Status, + "Failed to accept app provided receive buffers"); + } + break; + case QUIC_API_TYPE_SET_PARAM: Status = QuicLibrarySetParam( diff --git a/src/core/operation.c b/src/core/operation.c index 96f1bfcc86..574abdff06 100644 --- a/src/core/operation.c +++ b/src/core/operation.c @@ -119,6 +119,8 @@ QuicOperationFree( } } else if (ApiCtx->Type == QUIC_API_TYPE_STRM_RECV_SET_ENABLED) { QuicStreamRelease(ApiCtx->STRM_RECV_SET_ENABLED.Stream, QUIC_STREAM_REF_OPERATION); + } else if (ApiCtx->Type == QUIC_API_TYPE_STRM_PROVIDE_RECV_BUFFERS) { + QuicStreamRelease(ApiCtx->STRM_PROVIDE_RECV_BUFFERS.Stream, QUIC_STREAM_REF_OPERATION); } CxPlatPoolFree(&Worker->ApiContextPool, ApiCtx); } else if (Oper->Type == QUIC_OPER_TYPE_FLUSH_STREAM_RECV) { diff --git a/src/core/operation.h b/src/core/operation.h index 3146588a07..7abb760a10 100644 --- a/src/core/operation.h +++ b/src/core/operation.h @@ -56,6 +56,7 @@ typedef enum QUIC_API_TYPE { QUIC_API_TYPE_STRM_SEND, QUIC_API_TYPE_STRM_RECV_COMPLETE, QUIC_API_TYPE_STRM_RECV_SET_ENABLED, + QUIC_API_TYPE_STRM_PROVIDE_RECV_BUFFERS, QUIC_API_TYPE_SET_PARAM, QUIC_API_TYPE_GET_PARAM, @@ -154,6 +155,10 @@ typedef struct QUIC_API_CONTEXT { QUIC_STREAM* Stream; BOOLEAN IsEnabled; } STRM_RECV_SET_ENABLED; + struct { + QUIC_STREAM* Stream; + CXPLAT_LIST_ENTRY /* QUIC_RECV_CHUNK */ Chunks; + } STRM_PROVIDE_RECV_BUFFERS; struct { HQUIC Handle; diff --git a/src/core/recv_buffer.c b/src/core/recv_buffer.c index f806dc2102..b8900fccb3 100644 --- a/src/core/recv_buffer.c +++ b/src/core/recv_buffer.c @@ -717,8 +717,65 @@ QuicRecvBufferWrite( return QUIC_STATUS_SUCCESS; } -// TODO guhetier: This could return the number of buffers needed / extra buffer needed? -// For now, only fill up to the number of provided buffer and follow up +_IRQL_requires_max_(DISPATCH_LEVEL) +uint32_t +QuicRecvBufferReadBufferNeededCount( + _In_ const QUIC_RECV_BUFFER* RecvBuffer +) +{ + if (RecvBuffer->RecvMode == QUIC_RECV_BUF_MODE_SINGLE) { + // + // Single mode only ever need one buffer, that's what it's designed for. + // + return 1; + } else if (RecvBuffer->RecvMode == QUIC_RECV_BUF_MODE_CIRCULAR) { + // + // Circular mode need up to two buffers to deal with wrap around. + // + return 2; + } else if (RecvBuffer->RecvMode == QUIC_RECV_BUF_MODE_MULTIPLE) { + // + // Multiple mode need up to three buffers to deal with wrap around and a + // potential second chunk for overflow data. + // + return 3; + } else { // RecvBuffer->RecvMode == QUIC_RECV_BUF_MODE_EXTERNAL + // + // External mode can need any number of buffer, we must count. + // + + // + // Determine how much data is readable + // + const QUIC_SUBRANGE* FirstRange = QuicRangeGet(&RecvBuffer->WrittenRanges, 0); + const uint64_t ReadableData = FirstRange->Count - RecvBuffer->BaseOffset; + + // + // Iterate through the chunks until they can contain all the readable data, + // to find the number of buffers needed. + // + CXPLAT_DBG_ASSERT(!CxPlatListIsEmpty(&RecvBuffer->Chunks)); + QUIC_RECV_CHUNK* Chunk = + CXPLAT_CONTAINING_RECORD( + RecvBuffer->Chunks.Flink, + QUIC_RECV_CHUNK, + Link); + uint32_t DataInChunks = RecvBuffer->Capacity; + uint32_t BufferCount = 1; + + while (ReadableData > DataInChunks) { + Chunk = + CXPLAT_CONTAINING_RECORD( + Chunk->Link.Flink, + QUIC_RECV_CHUNK, + Link); + DataInChunks += Chunk->AllocLength; + BufferCount++; + } + return BufferCount; + } +} + _IRQL_requires_max_(DISPATCH_LEVEL) void QuicRecvBufferRead( @@ -765,7 +822,6 @@ QuicRecvBufferRead( Link); CXPLAT_DBG_ASSERT(!Chunk->ExternalReference); CXPLAT_DBG_ASSERT(RecvBuffer->ReadStart == 0); - CXPLAT_DBG_ASSERT(*BufferCount >= 1); CXPLAT_DBG_ASSERT(ContiguousLength <= (uint64_t)Chunk->AllocLength); *BufferCount = 1; diff --git a/src/core/recv_buffer.h b/src/core/recv_buffer.h index 94b6114f6e..50c2f2a8a2 100644 --- a/src/core/recv_buffer.h +++ b/src/core/recv_buffer.h @@ -177,6 +177,16 @@ QuicRecvBufferWrite( _Out_ BOOLEAN* NewDataReady ); +// +// Returns how many QUIC_BUFFERs should be passed to `QuicRecvBufferRead` to +// read all the available data in the buffer. +// +_IRQL_requires_max_(DISPATCH_LEVEL) +uint32_t +QuicRecvBufferReadBufferNeededCount( + _In_ const QUIC_RECV_BUFFER *RecvBuffer + ); + // // Returns a pointer into the buffer for data ready to be delivered to the // client. diff --git a/src/core/stream.c b/src/core/stream.c index a96ca37d7a..652a78f711 100644 --- a/src/core/stream.c +++ b/src/core/stream.c @@ -28,7 +28,6 @@ QuicStreamInitialize( QUIC_STATUS Status; QUIC_STREAM* Stream; QUIC_RECV_CHUNK* PreallocatedRecvChunk = NULL; - uint32_t InitialRecvBufferLength; QUIC_WORKER* Worker = Connection->Worker; Stream = CxPlatPoolAlloc(&Worker->StreamPool); @@ -63,6 +62,7 @@ QuicStreamInitialize( Stream, "Configured for delayed ID FC updates"); } + Stream->Flags.UseExternalRecvBuffers = !!(Flags & QUIC_STREAM_OPEN_FLAG_EXTERNAL_BUFFERS); Stream->Flags.Allocated = TRUE; Stream->Flags.SendEnabled = TRUE; Stream->Flags.ReceiveEnabled = TRUE; @@ -112,13 +112,26 @@ QuicStreamInitialize( } } - InitialRecvBufferLength = Connection->Settings.StreamRecvBufferDefault; - if (InitialRecvBufferLength == QUIC_DEFAULT_STREAM_RECV_BUFFER_SIZE) { + const uint32_t InitialRecvBufferLength = Connection->Settings.StreamRecvBufferDefault; + + QUIC_RECV_BUF_MODE RecvBufferMode = QUIC_RECV_BUF_MODE_CIRCULAR; + if (Stream->Flags.ReceiveMultiple) { + RecvBufferMode = QUIC_RECV_BUF_MODE_MULTIPLE; + } else if (Stream->Flags.UseExternalRecvBuffers) { + RecvBufferMode = QUIC_RECV_BUF_MODE_EXTERNAL; + } + + if (InitialRecvBufferLength == QUIC_DEFAULT_STREAM_RECV_BUFFER_SIZE && + RecvBufferMode != QUIC_RECV_BUF_MODE_EXTERNAL) { PreallocatedRecvChunk = CxPlatPoolAlloc(&Worker->DefaultReceiveBufferPool); if (PreallocatedRecvChunk == NULL) { Status = QUIC_STATUS_OUT_OF_MEMORY; goto Exit; } + QuicRecvChunkInitialize( + PreallocatedRecvChunk, + InitialRecvBufferLength, + (uint8_t *)(PreallocatedRecvChunk + 1)); } const uint32_t FlowControlWindowSize = Stream->Flags.Unidirectional @@ -132,8 +145,7 @@ QuicStreamInitialize( &Stream->RecvBuffer, InitialRecvBufferLength, FlowControlWindowSize, - Stream->Flags.ReceiveMultiple ? - QUIC_RECV_BUF_MODE_MULTIPLE : QUIC_RECV_BUF_MODE_CIRCULAR, + RecvBufferMode, PreallocatedRecvChunk); if (QUIC_FAILED(Status)) { goto Exit; @@ -957,3 +969,24 @@ QuicStreamParamGet( return Status; } + +_IRQL_requires_max_(DISPATCH_LEVEL) +void +QuicStreamSwitchToExternalBuffers( + _In_ QUIC_STREAM* Stream +) +{ + QuicRecvBufferUninitialize(&Stream->RecvBuffer); + (void)QuicRecvBufferInitialize(&Stream->RecvBuffer, 0, 0, QUIC_RECV_BUF_MODE_EXTERNAL, NULL); + Stream->Flags.UseExternalRecvBuffers = TRUE; +} + +_IRQL_requires_max_(DISPATCH_LEVEL) +QUIC_STATUS +QuicStreamProvideRecvBuffers( + _In_ QUIC_STREAM* Stream, + _Inout_ CXPLAT_LIST_ENTRY* /* QUIC_RECV_CHUNK */ Chunks +) +{ + return QuicRecvBufferProvideChunks(&Stream->RecvBuffer, Chunks); +} \ No newline at end of file diff --git a/src/core/stream.h b/src/core/stream.h index c210e129fb..87021e194d 100644 --- a/src/core/stream.h +++ b/src/core/stream.h @@ -112,6 +112,7 @@ typedef union QUIC_STREAM_FLAGS { BOOLEAN Initialized : 1; // Initialized successfully. Used for Debugging. BOOLEAN Started : 1; // The app has started the stream. BOOLEAN StartedIndicated : 1; // The app received a start complete event. + BOOLEAN PeerStreamStartEventActive : 1; // The app is processing QUIC_CONNECTION_EVENT_PEER_STREAM_STARTED BOOLEAN Unidirectional : 1; // Sends/receives in 1 direction only. BOOLEAN Opened0Rtt : 1; // A 0-RTT packet opened the stream. BOOLEAN IndicatePeerAccepted : 1; // The app requested the PEER_ACCEPTED event. @@ -140,6 +141,7 @@ typedef union QUIC_STREAM_FLAGS { BOOLEAN SendEnabled : 1; // Application is allowed to send data. BOOLEAN ReceiveEnabled : 1; // Application is ready for receive callbacks. BOOLEAN ReceiveMultiple : 1; // The app supports multiple parallel receive indications. + BOOLEAN UseExternalRecvBuffers : 1; // The stream is using external, app provided receive buffers. BOOLEAN ReceiveFlushQueued : 1; // The receive flush operation is queued. BOOLEAN ReceiveDataPending : 1; // Data (or FIN) is queued and ready for delivery. BOOLEAN ReceiveCallActive : 1; // There is an active receive to the app. @@ -389,7 +391,14 @@ typedef struct QUIC_STREAM { // uint64_t MaxAllowedRecvOffset; + // + // The number of bytes received since the last recv window update. + // uint64_t RecvWindowBytesDelivered; + + // + // Timestamp of the last recv window update. + // uint64_t RecvWindowLastUpdate; // @@ -1005,3 +1014,23 @@ QuicStreamRecvSetEnabledState( _In_ QUIC_STREAM* Stream, _In_ BOOLEAN NewRecvEnabled ); + +// +// Convert a stream receive buffer to external mode. +// +_IRQL_requires_max_(DISPATCH_LEVEL) +void +QuicStreamSwitchToExternalBuffers( + _In_ QUIC_STREAM *Stream + ); + +// +// Provide new chunks for the stream receive buffer. +// Terminate the connection on failure. +// +_IRQL_requires_max_(DISPATCH_LEVEL) +QUIC_STATUS +QuicStreamProvideRecvBuffers( + _In_ QUIC_STREAM* Stream, + _Inout_ CXPLAT_LIST_ENTRY* /* QUIC_RECV_CHUNK */ Chunks + ); \ No newline at end of file diff --git a/src/core/stream_recv.c b/src/core/stream_recv.c index b1ff769d0a..5223156d34 100644 --- a/src/core/stream_recv.c +++ b/src/core/stream_recv.c @@ -870,29 +870,55 @@ QuicStreamRecvFlush( return; } + // + // Have 3 buffers one the stack, it's enough for most use cases. + // + QUIC_BUFFER StackRecvBuffers[3]; + QUIC_BUFFER *RecvBuffers = StackRecvBuffers; + uint32_t RecvBufferCount = ARRAYSIZE(StackRecvBuffers); + BOOLEAN FlushRecv = TRUE; while (FlushRecv) { CXPLAT_DBG_ASSERT(!Stream->Flags.SentStopSending); - // TODO guhetier: Need to allocate a variable nb of buffers - QUIC_BUFFER RecvBuffers[3]; QUIC_STREAM_EVENT Event = {0}; Event.Type = QUIC_STREAM_EVENT_RECEIVE; - Event.RECEIVE.BufferCount = ARRAYSIZE(RecvBuffers); - Event.RECEIVE.Buffers = RecvBuffers; + Event.RECEIVE.BufferCount = ARRAYSIZE(StackRecvBuffers); + Event.RECEIVE.Buffers = StackRecvBuffers; // // Try to read the next available buffers. // BOOLEAN DataAvailable = QuicRecvBufferHasUnreadData(&Stream->RecvBuffer); if (DataAvailable) { + uint32_t NumBuffersNeeded = QuicRecvBufferReadBufferNeededCount(&Stream->RecvBuffer); + if (NumBuffersNeeded > RecvBufferCount) { + // + // We need more buffer than what we have currently, allocate them. + // If the allocation fails, we read what we can with the buffers we have. + // + QUIC_BUFFER* NewRecvBuffers = + CXPLAT_ALLOC_NONPAGED(sizeof(QUIC_BUFFER) * NumBuffersNeeded, QUIC_POOL_RECVBUF); + + if (NewRecvBuffers != NULL) { + if (RecvBuffers != StackRecvBuffers) { + CXPLAT_FREE(RecvBuffers, QUIC_POOL_RECVBUF); + } + RecvBuffers = NewRecvBuffers; + RecvBufferCount = NumBuffersNeeded; + } + } + + Event.RECEIVE.Buffers = RecvBuffers; + Event.RECEIVE.BufferCount = RecvBufferCount; + QuicRecvBufferRead( &Stream->RecvBuffer, &Event.RECEIVE.AbsoluteOffset, &Event.RECEIVE.BufferCount, RecvBuffers); for (uint32_t i = 0; i < Event.RECEIVE.BufferCount; ++i) { - Event.RECEIVE.TotalBufferLength += RecvBuffers[i].Length; + Event.RECEIVE.TotalBufferLength += Event.RECEIVE.Buffers[i].Length; } CXPLAT_DBG_ASSERT(Event.RECEIVE.TotalBufferLength != 0); @@ -984,6 +1010,13 @@ QuicStreamRecvFlush( FlushRecv = QuicStreamReceiveComplete(Stream, BufferLength); } } + + // + // Cleanup receive buffers if needed + // + if (RecvBuffers != StackRecvBuffers) { + CXPLAT_FREE(RecvBuffers, QUIC_POOL_RECVBUF); + } } _IRQL_requires_max_(PASSIVE_LEVEL) diff --git a/src/core/stream_set.c b/src/core/stream_set.c index 5d37a8a15c..fe7d71009b 100644 --- a/src/core/stream_set.c +++ b/src/core/stream_set.c @@ -706,7 +706,10 @@ QuicStreamSetGetStreamForPeer( "Indicating QUIC_CONNECTION_EVENT_PEER_STREAM_STARTED [%p, 0x%x]", Event.PEER_STREAM_STARTED.Stream, Event.PEER_STREAM_STARTED.Flags); + + Stream->Flags.PeerStreamStartEventActive = TRUE; Status = QuicConnIndicateEvent(Connection, &Event); + Stream->Flags.PeerStreamStartEventActive = FALSE; if (QUIC_FAILED(Status)) { QuicTraceLogStreamWarning( diff --git a/src/inc/msquic.h b/src/inc/msquic.h index b8dc5471db..ccf85e9995 100644 --- a/src/inc/msquic.h +++ b/src/inc/msquic.h @@ -199,6 +199,8 @@ typedef enum QUIC_STREAM_OPEN_FLAGS { QUIC_STREAM_OPEN_FLAG_0_RTT = 0x0002, // The stream was opened via a 0-RTT packet. QUIC_STREAM_OPEN_FLAG_DELAY_ID_FC_UPDATES = 0x0004, // Indicates stream ID flow control limit updates for the // connection should be delayed to StreamClose. + QUIC_STREAM_OPEN_FLAG_EXTERNAL_BUFFERS = 0x0005, // No buffer will be allocated for the stream, external buffers + // must be provided (see StreamProvideReceiveBuffers) } QUIC_STREAM_OPEN_FLAGS; DEFINE_ENUM_FLAG_OPERATORS(QUIC_STREAM_OPEN_FLAGS) @@ -1563,6 +1565,20 @@ QUIC_STATUS _In_ BOOLEAN IsEnabled ); +// +// Provides receive buffers to the stream. +// The buffers are owned by the caller and must remain valid until a receive +// indication for all bytes in the buffer, or the stream is closed. +// +typedef +_IRQL_requires_max_(DISPATCH_LEVEL) +QUIC_STATUS +(QUIC_API * QUIC_STREAM_PROVIDE_RECEIVE_BUFFERS_FN)( + _In_ _Pre_defensive_ HQUIC Stream, + _In_ uint32_t BufferCount, + _In_reads_(BufferCount) const QUIC_BUFFER* Buffers + ); + // // Datagrams // @@ -1631,6 +1647,8 @@ typedef struct QUIC_API_TABLE { QUIC_CONNECTION_COMP_RESUMPTION_FN ConnectionResumptionTicketValidationComplete; // Available from v2.2 QUIC_CONNECTION_COMP_CERT_FN ConnectionCertificateValidationComplete; // Available from v2.2 + QUIC_STREAM_PROVIDE_RECEIVE_BUFFERS_FN + StreamProvideReceiveBuffers; // Available from v2.2 } QUIC_API_TABLE; #define QUIC_API_VERSION_1 1 // Not supported any more diff --git a/src/inc/quic_trace.h b/src/inc/quic_trace.h index 367088438d..e868d06741 100644 --- a/src/inc/quic_trace.h +++ b/src/inc/quic_trace.h @@ -103,6 +103,7 @@ typedef enum QUIC_TRACE_API_TYPE { QUIC_TRACE_API_DATAGRAM_SEND, QUIC_TRACE_API_CONNECTION_COMPLETE_RESUMPTION_TICKET_VALIDATION, QUIC_TRACE_API_CONNECTION_COMPLETE_CERTIFICATE_VALIDATION, + QUIC_TRACE_API_STREAM_PROVIDE_RECEIVE_BUFFERS, QUIC_TRACE_API_COUNT // Must be last } QUIC_TRACE_API_TYPE;