Skip to content

Commit

Permalink
Support Priority Work on Connections (#4279)
Browse files Browse the repository at this point in the history
* Support Priority Work on Connections

* few nits

* Make CodeCheck builds happy (hopefully)

* Name tweak

* Initial test

* blocking based

* fix priority section management

* Stream start/send priority test

* cleanup

* change test name and support Kernel mode

* add description for each test

* add one more element to QUIC_IOCTL_BUFFER_SIZES

* fix kernel build error by 'new'

* remove tail adjustment

* fix PriorityTail management

* update test

* revert spinquic change

---------

Co-authored-by: ami-GS <[email protected]>
  • Loading branch information
nibanks and ami-GS authored Jun 14, 2024
1 parent 466d8c9 commit b7b2021
Show file tree
Hide file tree
Showing 12 changed files with 337 additions and 10 deletions.
37 changes: 30 additions & 7 deletions src/core/api.c
Original file line number Diff line number Diff line change
Expand Up @@ -853,7 +853,11 @@ MsQuicStreamStart(
//
// Queue the operation but don't wait for the completion.
//
QuicConnQueueOper(Connection, Oper);
if (Flags & QUIC_STREAM_START_FLAG_PRIORITY_WORK) {
QuicConnQueuePriorityOper(Connection, Oper);
} else {
QuicConnQueueOper(Connection, Oper);
}
Status = QUIC_STATUS_PENDING;

Exit:
Expand Down Expand Up @@ -1004,6 +1008,7 @@ MsQuicStreamSend(
uint64_t TotalLength;
QUIC_SEND_REQUEST* SendRequest;
BOOLEAN QueueOper = TRUE;
const BOOLEAN IsPriority = !!(Flags & QUIC_SEND_FLAG_PRIORITY_WORK);
BOOLEAN SendInline;
QUIC_OPERATION* Oper;

Expand Down Expand Up @@ -1170,7 +1175,7 @@ MsQuicStreamSend(
Oper->API_CALL.Context->CONN_SHUTDOWN.ErrorCode = (QUIC_VAR_INT)QUIC_STATUS_OUT_OF_MEMORY;
Oper->API_CALL.Context->CONN_SHUTDOWN.RegistrationShutdown = FALSE;
Oper->API_CALL.Context->CONN_SHUTDOWN.TransportShutdown = TRUE;
QuicConnQueueOper(Connection, Oper);
QuicConnQueueHighestPriorityOper(Connection, Oper);
goto Exit;
}

Expand All @@ -1180,7 +1185,11 @@ MsQuicStreamSend(
//
// Queue the operation but don't wait for the completion.
//
QuicConnQueueOper(Connection, Oper);
if (IsPriority) {
QuicConnQueuePriorityOper(Connection, Oper);
} else {
QuicConnQueueOper(Connection, Oper);
}
}

Exit:
Expand Down Expand Up @@ -1357,6 +1366,9 @@ MsQuicSetParam(
{
CXPLAT_PASSIVE_CODE();

const BOOLEAN IsPriority = !!(Param & QUIC_PARAM_HIGH_PRIORITY);
Param &= ~QUIC_PARAM_HIGH_PRIORITY;

if ((Handle == NULL) ^ QUIC_PARAM_IS_GLOBAL(Param)) {
//
// Ensure global parameters don't have a handle passed in, and vice
Expand Down Expand Up @@ -1442,7 +1454,11 @@ MsQuicSetParam(
//
// Queue the operation and wait for it to be processed.
//
QuicConnQueueOper(Connection, &Oper);
if (IsPriority) {
QuicConnQueuePriorityOper(Connection, &Oper);
} else {
QuicConnQueueOper(Connection, &Oper);
}
QuicTraceEvent(
ApiWaitOperation,
"[ api] Waiting on operation");
Expand Down Expand Up @@ -1474,6 +1490,9 @@ MsQuicGetParam(
{
CXPLAT_PASSIVE_CODE();

const BOOLEAN IsPriority = !!(Param & QUIC_PARAM_HIGH_PRIORITY);
Param &= ~QUIC_PARAM_HIGH_PRIORITY;

if ((Handle == NULL) ^ QUIC_PARAM_IS_GLOBAL(Param) ||
BufferLength == NULL) {
//
Expand All @@ -1483,14 +1502,14 @@ MsQuicGetParam(
return QUIC_STATUS_INVALID_PARAMETER;
}

QUIC_STATUS Status;

QuicTraceEvent(
ApiEnter,
"[ api] Enter %u (%p).",
QUIC_TRACE_API_GET_PARAM,
Handle);

QUIC_STATUS Status;

if (QUIC_PARAM_IS_GLOBAL(Param)) {
//
// Global parameters are processed inline.
Expand Down Expand Up @@ -1560,7 +1579,11 @@ MsQuicGetParam(
//
// Queue the operation and wait for it to be processed.
//
QuicConnQueueOper(Connection, &Oper);
if (IsPriority) {
QuicConnQueuePriorityOper(Connection, &Oper);
} else {
QuicConnQueueOper(Connection, &Oper);
}
QuicTraceEvent(
ApiWaitOperation,
"[ api] Waiting on operation");
Expand Down
22 changes: 22 additions & 0 deletions src/core/connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -717,6 +717,28 @@ QuicConnQueueOper(
}
}

_IRQL_requires_max_(DISPATCH_LEVEL)
void
QuicConnQueuePriorityOper(
_In_ QUIC_CONNECTION* Connection,
_In_ QUIC_OPERATION* Oper
)
{
#if DEBUG
if (!Connection->State.Initialized) {
CXPLAT_DBG_ASSERT(QuicConnIsServer(Connection));
CXPLAT_DBG_ASSERT(Connection->SourceCids.Next != NULL || CxPlatIsRandomMemoryFailureEnabled());
}
#endif
if (QuicOperationEnqueuePriority(&Connection->OperQ, Oper)) {
//
// The connection needs to be queued on the worker because this was the
// first operation in our OperQ.
//
QuicWorkerQueueConnection(Connection->Worker, Connection); // TODO - Support priority connections on worker?
}
}

_IRQL_requires_max_(DISPATCH_LEVEL)
void
QuicConnQueueHighestPriorityOper(
Expand Down
7 changes: 7 additions & 0 deletions src/core/connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -1157,6 +1157,13 @@ QuicConnQueueOper(
_In_ QUIC_OPERATION* Oper
);

_IRQL_requires_max_(DISPATCH_LEVEL)
void
QuicConnQueuePriorityOper(
_In_ QUIC_CONNECTION* Connection,
_In_ QUIC_OPERATION* Oper
);

_IRQL_requires_max_(DISPATCH_LEVEL)
void
QuicConnQueueHighestPriorityOper(
Expand Down
7 changes: 6 additions & 1 deletion src/core/datagram.c
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,7 @@ QuicDatagramQueueSend(
{
QUIC_STATUS Status;
BOOLEAN QueueOper = TRUE;
const BOOLEAN IsPriority = !!(SendRequest->Flags & QUIC_SEND_FLAG_PRIORITY_WORK);
QUIC_CONNECTION* Connection = QuicDatagramGetConnection(Datagram);

CxPlatDispatchLockAcquire(&Datagram->ApiQueueLock);
Expand Down Expand Up @@ -387,7 +388,11 @@ QuicDatagramQueueSend(
//
// Queue the operation but don't wait for the completion.
//
QuicConnQueueOper(Connection, Oper);
if (IsPriority) {
QuicConnQueuePriorityOper(Connection, Oper);
} else {
QuicConnQueueOper(Connection, Oper);
}
}

Exit:
Expand Down
30 changes: 30 additions & 0 deletions src/core/operation.c
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ QuicOperationQueueInitialize(
OperQ->ActivelyProcessing = FALSE;
CxPlatDispatchLockInitialize(&OperQ->Lock);
CxPlatListInitializeHead(&OperQ->List);
OperQ->PriorityTail = &OperQ->List.Flink;
}

_IRQL_requires_max_(DISPATCH_LEVEL)
Expand All @@ -46,6 +47,7 @@ QuicOperationQueueUninitialize(
{
UNREFERENCED_PARAMETER(OperQ);
CXPLAT_DBG_ASSERT(CxPlatListIsEmpty(&OperQ->List));
CXPLAT_DBG_ASSERT(OperQ->PriorityTail == &OperQ->List.Flink);
CxPlatDispatchLockUninitialize(&OperQ->Lock);
}

Expand Down Expand Up @@ -149,6 +151,27 @@ QuicOperationEnqueue(
return StartProcessing;
}

_IRQL_requires_max_(DISPATCH_LEVEL)
BOOLEAN
QuicOperationEnqueuePriority(
_In_ QUIC_OPERATION_QUEUE* OperQ,
_In_ QUIC_OPERATION* Oper
)
{
BOOLEAN StartProcessing;
CxPlatDispatchLockAcquire(&OperQ->Lock);
#if DEBUG
CXPLAT_DBG_ASSERT(Oper->Link.Flink == NULL);
#endif
StartProcessing = CxPlatListIsEmpty(&OperQ->List) && !OperQ->ActivelyProcessing;
CxPlatListInsertTail(*OperQ->PriorityTail, &Oper->Link);
OperQ->PriorityTail = &Oper->Link.Flink;
CxPlatDispatchLockRelease(&OperQ->Lock);
QuicPerfCounterIncrement(QUIC_PERF_COUNTER_CONN_OPER_QUEUED);
QuicPerfCounterIncrement(QUIC_PERF_COUNTER_CONN_OPER_QUEUE_DEPTH);
return StartProcessing;
}

_IRQL_requires_max_(DISPATCH_LEVEL)
BOOLEAN
QuicOperationEnqueueFront(
Expand All @@ -163,6 +186,9 @@ QuicOperationEnqueueFront(
#endif
StartProcessing = CxPlatListIsEmpty(&OperQ->List) && !OperQ->ActivelyProcessing;
CxPlatListInsertHead(&OperQ->List, &Oper->Link);
if (OperQ->PriorityTail == &OperQ->List.Flink) {
OperQ->PriorityTail = &Oper->Link.Flink;
}
CxPlatDispatchLockRelease(&OperQ->Lock);
QuicPerfCounterIncrement(QUIC_PERF_COUNTER_CONN_OPER_QUEUED);
QuicPerfCounterIncrement(QUIC_PERF_COUNTER_CONN_OPER_QUEUE_DEPTH);
Expand All @@ -188,6 +214,9 @@ QuicOperationDequeue(
#if DEBUG
Oper->Link.Flink = NULL;
#endif
if (OperQ->PriorityTail == &Oper->Link.Flink) {
OperQ->PriorityTail = &OperQ->List.Flink;
}
}
CxPlatDispatchLockRelease(&OperQ->Lock);

Expand All @@ -210,6 +239,7 @@ QuicOperationQueueClear(
CxPlatDispatchLockAcquire(&OperQ->Lock);
OperQ->ActivelyProcessing = FALSE;
CxPlatListMoveItems(&OperQ->List, &OldList);
OperQ->PriorityTail = &OperQ->List.Flink;
CxPlatDispatchLockRelease(&OperQ->Lock);

int64_t OperationsDequeued = 0;
Expand Down
12 changes: 12 additions & 0 deletions src/core/operation.h
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,7 @@ typedef struct QUIC_OPERATION_QUEUE {
//
CXPLAT_DISPATCH_LOCK Lock;
CXPLAT_LIST_ENTRY List;
CXPLAT_LIST_ENTRY** PriorityTail; // Tail of the priority queue.

} QUIC_OPERATION_QUEUE;

Expand Down Expand Up @@ -348,6 +349,17 @@ QuicOperationEnqueue(
_In_ QUIC_OPERATION* Oper
);

//
// Enqueues an operation into the priority part of the queue. Returns TRUE if
// the queue was previously empty and not already being processed.
//
_IRQL_requires_max_(DISPATCH_LEVEL)
BOOLEAN
QuicOperationEnqueuePriority(
_In_ QUIC_OPERATION_QUEUE* OperQ,
_In_ QUIC_OPERATION* Oper
);

//
// Enqueues an operation at the front of the queue. Returns TRUE if the queue
// was previously empty and not already being processed.
Expand Down
5 changes: 5 additions & 0 deletions src/cs/lib/msquic_generated.cs
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ internal enum QUIC_STREAM_START_FLAGS
FAIL_BLOCKED = 0x0002,
SHUTDOWN_ON_FAIL = 0x0004,
INDICATE_PEER_ACCEPT = 0x0008,
PRIORITY_WORK = 0x0010,
}

[System.Flags]
Expand Down Expand Up @@ -194,6 +195,7 @@ internal enum QUIC_SEND_FLAGS
DGRAM_PRIORITY = 0x0008,
DELAY_SEND = 0x0010,
CANCEL_ON_LOSS = 0x0020,
PRIORITY_WORK = 0x0040,
}

internal enum QUIC_DATAGRAM_SEND_STATE
Expand Down Expand Up @@ -3261,6 +3263,9 @@ internal static unsafe partial class MsQuic
[NativeTypeName("#define QUIC_PARAM_PREFIX_STREAM 0x08000000")]
internal const uint QUIC_PARAM_PREFIX_STREAM = 0x08000000;

[NativeTypeName("#define QUIC_PARAM_HIGH_PRIORITY 0x40000000")]
internal const uint QUIC_PARAM_HIGH_PRIORITY = 0x40000000;

[NativeTypeName("#define QUIC_PARAM_GLOBAL_RETRY_MEMORY_PERCENT 0x01000000")]
internal const uint QUIC_PARAM_GLOBAL_RETRY_MEMORY_PERCENT = 0x01000000;

Expand Down
6 changes: 5 additions & 1 deletion src/inc/msquic.h
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ typedef enum QUIC_STREAM_START_FLAGS {
QUIC_STREAM_START_FLAG_FAIL_BLOCKED = 0x0002, // Only opens the stream if flow control allows.
QUIC_STREAM_START_FLAG_SHUTDOWN_ON_FAIL = 0x0004, // Shutdown the stream immediately after start failure.
QUIC_STREAM_START_FLAG_INDICATE_PEER_ACCEPT = 0x0008, // Indicate PEER_ACCEPTED event if not accepted at start.
QUIC_STREAM_START_FLAG_PRIORITY_WORK = 0x0010, // Higher priority than other connection work.
} QUIC_STREAM_START_FLAGS;

DEFINE_ENUM_FLAG_OPERATORS(QUIC_STREAM_START_FLAGS)
Expand Down Expand Up @@ -241,6 +242,7 @@ typedef enum QUIC_SEND_FLAGS {
QUIC_SEND_FLAG_DGRAM_PRIORITY = 0x0008, // Indicates the datagram is higher priority than others.
QUIC_SEND_FLAG_DELAY_SEND = 0x0010, // Indicates the send should be delayed because more will be queued soon.
QUIC_SEND_FLAG_CANCEL_ON_LOSS = 0x0020, // Indicates that a stream is to be cancelled when packet loss is detected.
QUIC_SEND_FLAG_PRIORITY_WORK = 0x0040, // Higher priority than other connection work.
} QUIC_SEND_FLAGS;

DEFINE_ENUM_FLAG_OPERATORS(QUIC_SEND_FLAGS)
Expand Down Expand Up @@ -828,7 +830,9 @@ void
#define QUIC_PARAM_PREFIX_TLS_SCHANNEL 0x07000000
#define QUIC_PARAM_PREFIX_STREAM 0x08000000

#define QUIC_PARAM_IS_GLOBAL(Param) ((Param & 0x7F000000) == QUIC_PARAM_PREFIX_GLOBAL)
#define QUIC_PARAM_HIGH_PRIORITY 0x40000000 // Combine with any param to make it high priority.

#define QUIC_PARAM_IS_GLOBAL(Param) ((Param & 0x3F000000) == QUIC_PARAM_PREFIX_GLOBAL)

//
// Parameters for Global.
Expand Down
13 changes: 12 additions & 1 deletion src/test/MsQuicTests.h
Original file line number Diff line number Diff line change
Expand Up @@ -575,6 +575,14 @@ QuicTestStreamBlockUnblockConnFlowControl(
_In_ BOOLEAN Bidirectional
);

void
QuicTestOperationPriority(
);

void
QuicTestConnectionStreamStartSendPriority(
);

void
QuicTestEcn(
_In_ int Family
Expand Down Expand Up @@ -1294,4 +1302,7 @@ typedef struct {
#define IOCTL_QUIC_RUN_NTH_PACKET_DROP \
QUIC_CTL_CODE(121, METHOD_BUFFERED, FILE_WRITE_DATA)

#define QUIC_MAX_IOCTL_FUNC_CODE 121
#define IOCTL_QUIC_RUN_OPERATION_PRIORITY \
QUIC_CTL_CODE(122, METHOD_BUFFERED, FILE_WRITE_DATA)

#define QUIC_MAX_IOCTL_FUNC_CODE 122
9 changes: 9 additions & 0 deletions src/test/bin/quic_gtest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2220,6 +2220,15 @@ TEST(Misc, StreamAbortConnFlowControl) {
}
}

TEST(Basic, OperationPriority) {
TestLogger Logger("OperationPriority");
if (TestingKernelMode) {
ASSERT_TRUE(DriverClient.Run(IOCTL_QUIC_RUN_OPERATION_PRIORITY));
} else {
QuicTestOperationPriority();
}
}

TEST(Drill, VarIntEncoder) {
TestLogger Logger("QuicDrillTestVarIntEncoder");
if (TestingKernelMode) {
Expand Down
5 changes: 5 additions & 0 deletions src/test/bin/winkernel/control.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -520,6 +520,7 @@ size_t QUIC_IOCTL_BUFFER_SIZES[] =
sizeof(uint32_t),
sizeof(BOOLEAN),
0,
0,
};

CXPLAT_STATIC_ASSERT(
Expand Down Expand Up @@ -1452,6 +1453,10 @@ QuicTestCtlEvtIoDeviceControl(
QuicTestCtlRun(QuicTestNthPacketDrop());
break;

case IOCTL_QUIC_RUN_OPERATION_PRIORITY:
QuicTestCtlRun(QuicTestOperationPriority());
break;

default:
Status = STATUS_NOT_IMPLEMENTED;
break;
Expand Down
Loading

0 comments on commit b7b2021

Please sign in to comment.