diff --git a/include/haproxy/mux_quic-t.h b/include/haproxy/mux_quic-t.h index 381a49264..b3cca303e 100644 --- a/include/haproxy/mux_quic-t.h +++ b/include/haproxy/mux_quic-t.h @@ -69,6 +69,7 @@ struct qcc { struct quic_fctl fc; /* stream flow control applied on sending */ uint64_t buf_in_flight; /* sum of currently allocated Tx buffer sizes */ struct list frms; /* prepared STREAM frames */ + ullong next; } tx; uint64_t largest_bidi_r; /* largest remote bidi stream ID opened. */ diff --git a/include/haproxy/quic_conn.h b/include/haproxy/quic_conn.h index 36bff3be8..783c82e01 100644 --- a/include/haproxy/quic_conn.h +++ b/include/haproxy/quic_conn.h @@ -163,7 +163,7 @@ void quic_set_connection_close(struct quic_conn *qc, const struct quic_err err); void quic_set_tls_alert(struct quic_conn *qc, int alert); int quic_set_app_ops(struct quic_conn *qc, const unsigned char *alpn, size_t alpn_len); int qc_check_dcid(struct quic_conn *qc, unsigned char *dcid, size_t dcid_len); -enum quic_tx_err qc_send_mux(struct quic_conn *qc, struct list *frms, int max_pkts); +enum quic_tx_err qc_send_mux(struct quic_conn *qc, struct list *frms, int *max_pkts); void qc_notify_err(struct quic_conn *qc); int qc_notify_send(struct quic_conn *qc); diff --git a/include/haproxy/tools.h b/include/haproxy/tools.h index 8d1afb5d7..be994bc3c 100644 --- a/include/haproxy/tools.h +++ b/include/haproxy/tools.h @@ -1246,4 +1246,7 @@ int backup_env(void); int clean_env(void); int restore_env(void); +void work_gtod(int usec); + + #endif /* _HAPROXY_TOOLS_H */ diff --git a/src/mux_quic.c b/src/mux_quic.c index e35b17d9d..b3a0d3b34 100644 --- a/src/mux_quic.c +++ b/src/mux_quic.c @@ -2082,7 +2082,12 @@ static int qcc_subscribe_send(struct qcc *qcc) static int qcc_send_frames(struct qcc *qcc, struct list *frms, int strm_content) { enum quic_tx_err ret; - int max_burst = strm_content ? global.tune.quic_frontend_max_tx_burst : 0; + //int max_burst = strm_content ? global.tune.quic_frontend_max_tx_burst : 0; + + struct quic_conn *qc = qcc->conn->handle.qc; + ullong ns_pkts = qc->path->loss.srtt * 1000000 / (qc->path->cwnd / 1200 + 1); + int max_burst = strm_content ? 4000000 / (ns_pkts + 1) + 1 : 0; + //int max_burst = 1; TRACE_ENTER(QMUX_EV_QCC_SEND, qcc->conn); @@ -2091,14 +2096,14 @@ static int qcc_send_frames(struct qcc *qcc, struct list *frms, int strm_content) return -1; } - ret = qc_send_mux(qcc->conn->handle.qc, frms, max_burst); + ret = qc_send_mux(qcc->conn->handle.qc, frms, &max_burst); if (ret == QUIC_TX_ERR_FATAL) { TRACE_DEVEL("error on sending", QMUX_EV_QCC_SEND, qcc->conn); qcc_subscribe_send(qcc); goto err; } - BUG_ON(ret == QUIC_TX_ERR_AGAIN && !max_burst); + //BUG_ON(ret == QUIC_TX_ERR_AGAIN && !max_burst); /* If there is frames left at this stage, transport layer is blocked. * Subscribe on it to retry later. @@ -2109,6 +2114,10 @@ static int qcc_send_frames(struct qcc *qcc, struct list *frms, int strm_content) goto err; } + BUG_ON(ret == QUIC_TX_ERR_AGAIN && !max_burst); + qcc->tx.next = now_mono_time() + (qc->path->loss.srtt * 1000000 / (qc->path->cwnd / 1200 + 1)) * max_burst; + //qcc->tx.next = now_mono_time() + (MAX(qc->path->loss.srtt, 10) * 800000 / (qc->path->cwnd / 1200 + 1)) * max_burst; + TRACE_LEAVE(QMUX_EV_QCC_SEND, qcc->conn); return ret == QUIC_TX_ERR_AGAIN ? 1 : 0; @@ -2261,7 +2270,7 @@ static int qcc_io_send(struct qcc *qcc) struct list qcs_failed = LIST_HEAD_INIT(qcs_failed); struct qcs *qcs, *qcs_tmp, *first_qcs = NULL; uint64_t window_conn = qfctl_rcap(&qcc->tx.fc); - int ret, total = 0, resent; + int ret = 0, total = 0, resent; TRACE_ENTER(QMUX_EV_QCC_SEND, qcc->conn); @@ -2371,6 +2380,11 @@ static int qcc_io_send(struct qcc *qcc) } } + if (qcc->tx.next > now_mono_time()) { + qcc_wakeup_pacing(qcc); + return 1; + } + /* Retry sending until no frame to send, data rejected or connection * flow-control limit reached. */ @@ -2407,6 +2421,9 @@ static int qcc_io_send(struct qcc *qcc) sent_done: /* Deallocate frames that the transport layer has rejected. */ if (ret == 1) { + //struct quic_conn *qc = qcc->conn->handle.qc; + //qcc->tx.next = now_ns + global.tune.pipesize; + //qcc->tx.next = now_mono_time() + qc->path->loss.srtt * 1000000 / (qc->path->cwnd / 1200 + 1); qcc_wakeup_pacing(qcc); } else if (!LIST_ISEMPTY(&qcc->tx.frms)) { @@ -2767,9 +2784,17 @@ static int qcc_purge_sending(struct qcc *qcc) { int ret; + if (qcc->tx.next > now_mono_time()) { + qcc_wakeup_pacing(qcc); + return 1; + } + //fprintf(stderr, "%s\n", __func__); ret = qcc_send_frames(qcc, &qcc->tx.frms, 1); if (ret > 0) { + //struct quic_conn *qc = qcc->conn->handle.qc; + //qcc->tx.next = now_ns + global.tune.pipesize; + //qcc->tx.next = now_mono_time() + qc->path->loss.srtt * 1000000 / (qc->path->cwnd / 1200 + 1); qcc_wakeup_pacing(qcc); return 1; } @@ -2824,6 +2849,7 @@ static struct task *qcc_timeout_task(struct task *t, void *ctx, unsigned int sta int expired = tick_is_expired(t->expire, now_ms); TRACE_ENTER(QMUX_EV_QCC_WAKE, qcc ? qcc->conn : NULL); + //ABORT_NOW(); if (qcc) { if (!expired) { @@ -2876,6 +2902,7 @@ static void _qcc_init(struct qcc *qcc) qcc->wait_event.tasklet = NULL; qcc->app_ops = NULL; qcc->streams_by_id = EB_ROOT_UNIQUE; + qcc->tx.next = 0; LIST_INIT(&qcc->lfctl.frms); LIST_INIT(&qcc->tx.frms); } diff --git a/src/quic_tx.c b/src/quic_tx.c index c6cf77e4e..9ebce96e8 100644 --- a/src/quic_tx.c +++ b/src/quic_tx.c @@ -359,6 +359,8 @@ static int qc_send_ppkts(struct buffer *buf, struct ssl_sock_ctx *ctx) qc->bytes.tx += tmpbuf.data; time_sent = now_ms; + //work_gtod(global.tune.pipesize); + for (pkt = first_pkt; pkt; pkt = next_pkt) { struct quic_cc *cc = &qc->path->cc; @@ -469,11 +471,11 @@ int qc_purge_txbuf(struct quic_conn *qc, struct buffer *buf) * Returns the result from qc_send() function. */ enum quic_tx_err qc_send_mux(struct quic_conn *qc, struct list *frms, - int max_dgram) + int *max_dgram) { struct list send_list = LIST_HEAD_INIT(send_list); enum quic_tx_err ret = QUIC_TX_ERR_NONE; - int max = max_dgram; + int max = *max_dgram; TRACE_ENTER(QUIC_EV_CONN_TXPKT, qc); BUG_ON(qc->mux_state != QC_MUX_READY); /* Only MUX can uses this function so it must be ready. */ @@ -494,14 +496,14 @@ enum quic_tx_err qc_send_mux(struct quic_conn *qc, struct list *frms, TRACE_STATE("preparing data (from MUX)", QUIC_EV_CONN_TXPKT, qc); qel_register_send(&send_list, qc->ael, frms); - if (!qc_send(qc, 0, &send_list, max_dgram ? &max : NULL)) { + if (!qc_send(qc, 0, &send_list, *max_dgram ? &max : NULL)) ret = QUIC_TX_ERR_FATAL; - ABORT_NOW(); - } - - if (max_dgram && !max) { + else if (*max_dgram && !max) ret = QUIC_TX_ERR_AGAIN; - //ABORT_NOW(); + else { + if (*max_dgram) + *max_dgram = *max_dgram - max; + } TRACE_LEAVE(QUIC_EV_CONN_TXPKT, qc); diff --git a/src/tools.c b/src/tools.c index 49c273bb6..1fb32744b 100644 --- a/src/tools.c +++ b/src/tools.c @@ -6958,6 +6958,26 @@ void free_all_file_names() HA_RWLOCK_WRUNLOCK(OTHER_LOCK, &file_names.lock); } +void work_gtod(int usec) +{ + struct timeval now, expire; + + gettimeofday(&expire, NULL); + expire.tv_sec += usec / 1000000; + expire.tv_usec += usec % 1000000; + + if (expire.tv_usec >= 1000000) { + expire.tv_usec -= 1000000; + expire.tv_sec += 1; + } + + do { + gettimeofday(&now, NULL); + } while (now.tv_sec < expire.tv_sec || + (now.tv_sec == expire.tv_sec && + now.tv_usec < expire.tv_usec)); +} + /* * Local variables: * c-indent-level: 8