From 7dd9541dab4febc3af630b28c148a3c0f82a5989 Mon Sep 17 00:00:00 2001 From: Amaury Denoyelle Date: Wed, 16 Oct 2024 18:17:14 +0200 Subject: [PATCH] TMP --- include/haproxy/quic_conn.h | 2 +- include/haproxy/quic_tx-t.h | 6 ++++ src/mux_quic.c | 65 ++++++++++++++++++++++++++++++------- src/quic_tx.c | 13 +++++--- 4 files changed, 70 insertions(+), 16 deletions(-) diff --git a/include/haproxy/quic_conn.h b/include/haproxy/quic_conn.h index 212e187ba..36bff3be8 100644 --- a/include/haproxy/quic_conn.h +++ b/include/haproxy/quic_conn.h @@ -163,7 +163,7 @@ void quic_set_connection_close(struct quic_conn *qc, const struct quic_err err); void quic_set_tls_alert(struct quic_conn *qc, int alert); int quic_set_app_ops(struct quic_conn *qc, const unsigned char *alpn, size_t alpn_len); int qc_check_dcid(struct quic_conn *qc, unsigned char *dcid, size_t dcid_len); -int qc_send_mux(struct quic_conn *qc, struct list *frms); +enum quic_tx_err qc_send_mux(struct quic_conn *qc, struct list *frms, int max_pkts); void qc_notify_err(struct quic_conn *qc); int qc_notify_send(struct quic_conn *qc); diff --git a/include/haproxy/quic_tx-t.h b/include/haproxy/quic_tx-t.h index efbdfe687..5f7e23422 100644 --- a/include/haproxy/quic_tx-t.h +++ b/include/haproxy/quic_tx-t.h @@ -64,4 +64,10 @@ enum qc_build_pkt_err { QC_BUILD_PKT_ERR_BUFROOM, /* no more room in input buf or congestion window */ }; +enum quic_tx_err { + QUIC_TX_ERR_NONE, + QUIC_TX_ERR_AGAIN, + QUIC_TX_ERR_FATAL, +}; + #endif /* _HAPROXY_TX_T_H */ diff --git a/src/mux_quic.c b/src/mux_quic.c index 0ebbd6d0b..e35b17d9d 100644 --- a/src/mux_quic.c +++ b/src/mux_quic.c @@ -22,6 +22,7 @@ #include #include #include +#include #include #include #include @@ -388,6 +389,13 @@ static void qcc_refresh_timeout(struct qcc *qcc) static void qcc_wakeup(struct qcc *qcc) { + HA_ATOMIC_AND(&qcc->wait_event.tasklet->state, ~TASK_F_USR1); + tasklet_wakeup(qcc->wait_event.tasklet); +} + +static void qcc_wakeup_pacing(struct qcc *qcc) +{ + HA_ATOMIC_OR(&qcc->wait_event.tasklet->state, TASK_F_USR1); tasklet_wakeup(qcc->wait_event.tasklet); } @@ -2071,36 +2079,42 @@ static int qcc_subscribe_send(struct qcc *qcc) * * Returns 0 if all data sent with success else non-zero. */ -static int qcc_send_frames(struct qcc *qcc, struct list *frms) +static int qcc_send_frames(struct qcc *qcc, struct list *frms, int strm_content) { + enum quic_tx_err ret; + int max_burst = strm_content ? global.tune.quic_frontend_max_tx_burst : 0; + TRACE_ENTER(QMUX_EV_QCC_SEND, qcc->conn); if (LIST_ISEMPTY(frms)) { TRACE_DEVEL("leaving on no frame to send", QMUX_EV_QCC_SEND, qcc->conn); - return 1; + return -1; } - if (!qc_send_mux(qcc->conn->handle.qc, frms)) { + ret = qc_send_mux(qcc->conn->handle.qc, frms, max_burst); + if (ret == QUIC_TX_ERR_FATAL) { TRACE_DEVEL("error on sending", QMUX_EV_QCC_SEND, qcc->conn); qcc_subscribe_send(qcc); goto err; } + BUG_ON(ret == QUIC_TX_ERR_AGAIN && !max_burst); + /* If there is frames left at this stage, transport layer is blocked. * Subscribe on it to retry later. */ - if (!LIST_ISEMPTY(frms)) { + if (!LIST_ISEMPTY(frms) && ret != QUIC_TX_ERR_AGAIN) { TRACE_DEVEL("remaining frames to send", QMUX_EV_QCC_SEND, qcc->conn); qcc_subscribe_send(qcc); goto err; } TRACE_LEAVE(QMUX_EV_QCC_SEND, qcc->conn); - return 0; + return ret == QUIC_TX_ERR_AGAIN ? 1 : 0; err: TRACE_DEVEL("leaving on error", QMUX_EV_QCC_SEND, qcc->conn); - return 1; + return -1; } /* Emit a RESET_STREAM on . @@ -2125,7 +2139,7 @@ static int qcs_send_reset(struct qcs *qcs) frm->reset_stream.final_size = qcs->tx.fc.off_real; LIST_APPEND(&frms, &frm->list); - if (qcc_send_frames(qcs->qcc, &frms)) { + if (qcc_send_frames(qcs->qcc, &frms, 0)) { if (!LIST_ISEMPTY(&frms)) qc_frm_free(qcs->qcc->conn->handle.qc, &frm); TRACE_DEVEL("cannot send RESET_STREAM", QMUX_EV_QCS_SEND, qcs->qcc->conn, qcs); @@ -2176,7 +2190,7 @@ static int qcs_send_stop_sending(struct qcs *qcs) frm->stop_sending.app_error_code = qcs->err; LIST_APPEND(&frms, &frm->list); - if (qcc_send_frames(qcs->qcc, &frms)) { + if (qcc_send_frames(qcs->qcc, &frms, 0)) { if (!LIST_ISEMPTY(&frms)) qc_frm_free(qcc->conn->handle.qc, &frm); TRACE_DEVEL("cannot send STOP_SENDING", QMUX_EV_QCS_SEND, qcs->qcc->conn, qcs); @@ -2281,7 +2295,7 @@ static int qcc_io_send(struct qcc *qcc) } if (!LIST_ISEMPTY(&qcc->lfctl.frms)) { - if (qcc_send_frames(qcc, &qcc->lfctl.frms)) { + if (qcc_send_frames(qcc, &qcc->lfctl.frms, 0)) { TRACE_DEVEL("flow-control frames rejected by transport, aborting send", QMUX_EV_QCC_SEND, qcc->conn); goto out; } @@ -2360,7 +2374,7 @@ static int qcc_io_send(struct qcc *qcc) /* Retry sending until no frame to send, data rejected or connection * flow-control limit reached. */ - while (qcc_send_frames(qcc, &qcc->tx.frms) == 0 && !qfctl_rblocked(&qcc->tx.fc)) { + while ((ret = qcc_send_frames(qcc, &qcc->tx.frms, 1)) == 0 && !qfctl_rblocked(&qcc->tx.fc)) { window_conn = qfctl_rcap(&qcc->tx.fc); resent = 0; @@ -2392,7 +2406,10 @@ static int qcc_io_send(struct qcc *qcc) sent_done: /* Deallocate frames that the transport layer has rejected. */ - if (!LIST_ISEMPTY(&qcc->tx.frms)) { + if (ret == 1) { + qcc_wakeup_pacing(qcc); + } + else if (!LIST_ISEMPTY(&qcc->tx.frms)) { struct quic_frame *frm, *frm2; list_for_each_entry_safe(frm, frm2, &qcc->tx.frms, list) @@ -2746,12 +2763,38 @@ static void qcc_release(struct qcc *qcc) TRACE_LEAVE(QMUX_EV_QCC_END); } +static int qcc_purge_sending(struct qcc *qcc) +{ + int ret; + + //fprintf(stderr, "%s\n", __func__); + ret = qcc_send_frames(qcc, &qcc->tx.frms, 1); + if (ret > 0) { + qcc_wakeup_pacing(qcc); + return 1; + } + + return 0; +} + struct task *qcc_io_cb(struct task *t, void *ctx, unsigned int status) { struct qcc *qcc = ctx; TRACE_ENTER(QMUX_EV_QCC_WAKE, qcc->conn); + if (status & TASK_F_USR1) { + qcc_purge_sending(qcc); + return NULL; + } + else { + while (!LIST_ISEMPTY(&qcc->tx.frms)) { + struct quic_frame *frm = LIST_ELEM(qcc->tx.frms.n, struct quic_frame *, list); + qc_frm_free(qcc->conn->handle.qc, &frm); + } + LIST_INIT(&qcc->tx.frms); + } + if (!(qcc->wait_event.events & SUB_RETRY_SEND)) qcc_io_send(qcc); diff --git a/src/quic_tx.c b/src/quic_tx.c index 2f2201f8c..5569ed780 100644 --- a/src/quic_tx.c +++ b/src/quic_tx.c @@ -468,10 +468,12 @@ int qc_purge_txbuf(struct quic_conn *qc, struct buffer *buf) * * Returns the result from qc_send() function. */ -int qc_send_mux(struct quic_conn *qc, struct list *frms) +enum quic_tx_err qc_send_mux(struct quic_conn *qc, struct list *frms, + int max_dgram) { struct list send_list = LIST_HEAD_INIT(send_list); - int ret; + enum quic_tx_err ret = QUIC_TX_ERR_NONE; + int max = max_dgram; TRACE_ENTER(QUIC_EV_CONN_TXPKT, qc); BUG_ON(qc->mux_state != QC_MUX_READY); /* Only MUX can uses this function so it must be ready. */ @@ -479,7 +481,7 @@ int qc_send_mux(struct quic_conn *qc, struct list *frms) if (qc->conn->flags & CO_FL_SOCK_WR_SH) { qc->conn->flags |= CO_FL_ERROR | CO_FL_SOCK_RD_SH; TRACE_DEVEL("connection on error", QUIC_EV_CONN_TXPKT, qc); - return 0; + return QUIC_TX_ERR_FATAL; } /* Try to send post handshake frames first unless on 0-RTT. */ @@ -492,7 +494,10 @@ int qc_send_mux(struct quic_conn *qc, struct list *frms) TRACE_STATE("preparing data (from MUX)", QUIC_EV_CONN_TXPKT, qc); qel_register_send(&send_list, qc->ael, frms); - ret = qc_send(qc, 0, &send_list, NULL); + if (!qc_send(qc, 0, &send_list, max_dgram ? &max : NULL)) + ret = QUIC_TX_ERR_FATAL; + else if (max_dgram && !max) + ret = QUIC_TX_ERR_AGAIN; TRACE_LEAVE(QUIC_EV_CONN_TXPKT, qc); return ret;