Skip to content

Commit

Permalink
MAJOR: mux-quic: support pacing emission
Browse files Browse the repository at this point in the history
  • Loading branch information
a-denoyelle committed Oct 25, 2024
1 parent 51d8596 commit 07ac1b1
Showing 1 changed file with 74 additions and 12 deletions.
86 changes: 74 additions & 12 deletions src/mux_quic.c
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,13 @@ static void qcc_refresh_timeout(struct qcc *qcc)

static void qcc_wakeup(struct qcc *qcc)
{
HA_ATOMIC_AND(&qcc->wait_event.tasklet->state, ~TASK_F_USR1);
tasklet_wakeup(qcc->wait_event.tasklet);
}

static void qcc_wakeup_pacing(struct qcc *qcc)
{
HA_ATOMIC_OR(&qcc->wait_event.tasklet->state, TASK_F_USR1);
tasklet_wakeup(qcc->wait_event.tasklet);
}

Expand Down Expand Up @@ -2083,18 +2090,18 @@ static int qcc_subscribe_send(struct qcc *qcc)
*
* Returns 0 if all data sent with success else non-zero.
*/
static int qcc_send_frames(struct qcc *qcc, struct list *frms)
static int qcc_send_frames(struct qcc *qcc, struct list *frms, int stream)
{
enum quic_tx_err ret;

TRACE_ENTER(QMUX_EV_QCC_SEND, qcc->conn);

if (LIST_ISEMPTY(frms)) {
TRACE_DEVEL("leaving on no frame to send", QMUX_EV_QCC_SEND, qcc->conn);
return 1;
return -1;
}

ret = qc_send_mux(qcc->conn->handle.qc, frms, 0);
ret = qc_send_mux(qcc->conn->handle.qc, frms, stream);
if (ret == QUIC_TX_ERR_FATAL) {
TRACE_DEVEL("error on sending", QMUX_EV_QCC_SEND, qcc->conn);
qcc_subscribe_send(qcc);
Expand All @@ -2104,18 +2111,18 @@ static int qcc_send_frames(struct qcc *qcc, struct list *frms)
/* If there is frames left at this stage, transport layer is blocked.
* Subscribe on it to retry later.
*/
if (!LIST_ISEMPTY(frms)) {
if (!LIST_ISEMPTY(frms) && ret != QUIC_TX_ERR_AGAIN) {
TRACE_DEVEL("remaining frames to send", QMUX_EV_QCC_SEND, qcc->conn);
qcc_subscribe_send(qcc);
goto err;
}

TRACE_LEAVE(QMUX_EV_QCC_SEND, qcc->conn);
return 0;
return ret == QUIC_TX_ERR_AGAIN ? 1 : 0;

err:
TRACE_DEVEL("leaving on error", QMUX_EV_QCC_SEND, qcc->conn);
return 1;
return -1;
}

/* Emit a RESET_STREAM on <qcs>.
Expand All @@ -2140,7 +2147,7 @@ static int qcs_send_reset(struct qcs *qcs)
frm->reset_stream.final_size = qcs->tx.fc.off_real;

LIST_APPEND(&frms, &frm->list);
if (qcc_send_frames(qcs->qcc, &frms)) {
if (qcc_send_frames(qcs->qcc, &frms, 0)) {
if (!LIST_ISEMPTY(&frms))
qc_frm_free(qcs->qcc->conn->handle.qc, &frm);
TRACE_DEVEL("cannot send RESET_STREAM", QMUX_EV_QCS_SEND, qcs->qcc->conn, qcs);
Expand Down Expand Up @@ -2191,7 +2198,7 @@ static int qcs_send_stop_sending(struct qcs *qcs)
frm->stop_sending.app_error_code = qcs->err;

LIST_APPEND(&frms, &frm->list);
if (qcc_send_frames(qcs->qcc, &frms)) {
if (qcc_send_frames(qcs->qcc, &frms, 0)) {
if (!LIST_ISEMPTY(&frms))
qc_frm_free(qcc->conn->handle.qc, &frm);
TRACE_DEVEL("cannot send STOP_SENDING", QMUX_EV_QCS_SEND, qcs->qcc->conn, qcs);
Expand Down Expand Up @@ -2263,7 +2270,7 @@ static int qcc_io_send(struct qcc *qcc)
struct list qcs_failed = LIST_HEAD_INIT(qcs_failed);
struct qcs *qcs, *qcs_tmp, *first_qcs = NULL;
uint64_t window_conn = qfctl_rcap(&qcc->tx.fc);
int ret, total = 0, resent;
int ret = 0, total = 0, resent;

TRACE_ENTER(QMUX_EV_QCC_SEND, qcc->conn);

Expand Down Expand Up @@ -2297,7 +2304,7 @@ static int qcc_io_send(struct qcc *qcc)
}

if (!LIST_ISEMPTY(&qcc->lfctl.frms)) {
if (qcc_send_frames(qcc, &qcc->lfctl.frms)) {
if (qcc_send_frames(qcc, &qcc->lfctl.frms, 0)) {
TRACE_DEVEL("flow-control frames rejected by transport, aborting send", QMUX_EV_QCC_SEND, qcc->conn);
goto out;
}
Expand Down Expand Up @@ -2373,10 +2380,15 @@ static int qcc_io_send(struct qcc *qcc)
}
}

if (!quic_pacing_expired(pacer)) {
qcc_wakeup_pacing(qcc);
return 1;
}

/* Retry sending until no frame to send, data rejected or connection
* flow-control limit reached.
*/
while ((ret = qcc_send_frames(qcc, quic_pacing_frms(pacer))) == 0 && !qfctl_rblocked(&qcc->tx.fc)) {
while ((ret = qcc_send_frames(qcc, quic_pacing_frms(pacer), 1)) == 0 && !qfctl_rblocked(&qcc->tx.fc)) {
window_conn = qfctl_rcap(&qcc->tx.fc);
resent = 0;

Expand Down Expand Up @@ -2408,7 +2420,10 @@ static int qcc_io_send(struct qcc *qcc)

sent_done:
/* Deallocate frames that the transport layer has rejected. */
if (!LIST_ISEMPTY(quic_pacing_frms(pacer))) {
if (ret == 1) {
qcc_wakeup_pacing(qcc);
}
else if (!LIST_ISEMPTY(quic_pacing_frms(pacer))) {
struct quic_frame *frm, *frm2;

list_for_each_entry_safe(frm, frm2, quic_pacing_frms(pacer), list)
Expand Down Expand Up @@ -2760,12 +2775,59 @@ static void qcc_release(struct qcc *qcc)
TRACE_LEAVE(QMUX_EV_QCC_END);
}

static void qcc_purge_sending(struct qcc *qcc)
{
#if 0
int ret;

if (qcc->tx.next > now_mono_time()) {
qcc_wakeup_pacing(qcc);
return 1;
}

//fprintf(stderr, "%s\n", __func__);
ret = qcc_send_frames(qcc, &qcc->tx.frms, 1);
if (ret > 0) {
//struct quic_conn *qc = qcc->conn->handle.qc;
//qcc->tx.next = now_ns + global.tune.pipesize;
//qcc->tx.next = now_mono_time() + qc->path->loss.srtt * 1000000 / (qc->path->cwnd / 1200 + 1);
qcc_wakeup_pacing(qcc);
return 1;
}
#endif

struct quic_conn *qc = qcc->conn->handle.qc;
enum quic_tx_err ret;

ret = quic_pacing_send(qcc_tx_pacer(qcc), qc);
if (ret == QUIC_TX_ERR_AGAIN) {
BUG_ON(LIST_ISEMPTY(&qcc_tx_pacer(qcc)->frms));
qcc_wakeup_pacing(qcc);
}
else if (ret == QUIC_TX_ERR_FATAL) {
TRACE_DEVEL("error on sending", QMUX_EV_QCC_SEND, qcc->conn);
HA_ATOMIC_AND(&qcc->wait_event.tasklet->state, ~TASK_F_USR1);
qcc_subscribe_send(qcc);
}
else {
//HA_ATOMIC_AND(&qcc->wait_event.tasklet->state, ~TASK_F_USR1);
}
}

struct task *qcc_io_cb(struct task *t, void *ctx, unsigned int status)
{
struct qcc *qcc = ctx;

TRACE_ENTER(QMUX_EV_QCC_WAKE, qcc->conn);

if (status & TASK_F_USR1) {
qcc_purge_sending(qcc);
return NULL;
}
else {
quic_pacing_reset(qcc_tx_pacer(qcc));
}

if (!(qcc->wait_event.events & SUB_RETRY_SEND))
qcc_io_send(qcc);

Expand Down

0 comments on commit 07ac1b1

Please sign in to comment.