Skip to content

Commit

Permalink
implement active-ns pacing
Browse files Browse the repository at this point in the history
  • Loading branch information
a-denoyelle committed Oct 17, 2024
1 parent 34a777a commit 2a01c39
Show file tree
Hide file tree
Showing 6 changed files with 66 additions and 13 deletions.
1 change: 1 addition & 0 deletions include/haproxy/mux_quic-t.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ struct qcc {
struct quic_fctl fc; /* stream flow control applied on sending */
uint64_t buf_in_flight; /* sum of currently allocated Tx buffer sizes */
struct list frms; /* prepared STREAM frames */
ullong next;
} tx;

uint64_t largest_bidi_r; /* largest remote bidi stream ID opened. */
Expand Down
2 changes: 1 addition & 1 deletion include/haproxy/quic_conn.h
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ void quic_set_connection_close(struct quic_conn *qc, const struct quic_err err);
void quic_set_tls_alert(struct quic_conn *qc, int alert);
int quic_set_app_ops(struct quic_conn *qc, const unsigned char *alpn, size_t alpn_len);
int qc_check_dcid(struct quic_conn *qc, unsigned char *dcid, size_t dcid_len);
enum quic_tx_err qc_send_mux(struct quic_conn *qc, struct list *frms, int max_pkts);
enum quic_tx_err qc_send_mux(struct quic_conn *qc, struct list *frms, int *max_pkts);

void qc_notify_err(struct quic_conn *qc);
int qc_notify_send(struct quic_conn *qc);
Expand Down
3 changes: 3 additions & 0 deletions include/haproxy/tools.h
Original file line number Diff line number Diff line change
Expand Up @@ -1246,4 +1246,7 @@ int backup_env(void);
int clean_env(void);
int restore_env(void);

void work_gtod(int usec);


#endif /* _HAPROXY_TOOLS_H */
35 changes: 31 additions & 4 deletions src/mux_quic.c
Original file line number Diff line number Diff line change
Expand Up @@ -2082,7 +2082,12 @@ static int qcc_subscribe_send(struct qcc *qcc)
static int qcc_send_frames(struct qcc *qcc, struct list *frms, int strm_content)
{
enum quic_tx_err ret;
int max_burst = strm_content ? global.tune.quic_frontend_max_tx_burst : 0;
//int max_burst = strm_content ? global.tune.quic_frontend_max_tx_burst : 0;

struct quic_conn *qc = qcc->conn->handle.qc;
ullong ns_pkts = qc->path->loss.srtt * 1000000 / (qc->path->cwnd / 1200 + 1);
int max_burst = strm_content ? 4000000 / (ns_pkts + 1) + 1 : 0;
//int max_burst = 1;

TRACE_ENTER(QMUX_EV_QCC_SEND, qcc->conn);

Expand All @@ -2091,14 +2096,14 @@ static int qcc_send_frames(struct qcc *qcc, struct list *frms, int strm_content)
return -1;
}

ret = qc_send_mux(qcc->conn->handle.qc, frms, max_burst);
ret = qc_send_mux(qcc->conn->handle.qc, frms, &max_burst);
if (ret == QUIC_TX_ERR_FATAL) {
TRACE_DEVEL("error on sending", QMUX_EV_QCC_SEND, qcc->conn);
qcc_subscribe_send(qcc);
goto err;
}

BUG_ON(ret == QUIC_TX_ERR_AGAIN && !max_burst);
//BUG_ON(ret == QUIC_TX_ERR_AGAIN && !max_burst);

/* If there is frames left at this stage, transport layer is blocked.
* Subscribe on it to retry later.
Expand All @@ -2109,6 +2114,10 @@ static int qcc_send_frames(struct qcc *qcc, struct list *frms, int strm_content)
goto err;
}

BUG_ON(ret == QUIC_TX_ERR_AGAIN && !max_burst);
qcc->tx.next = now_mono_time() + (qc->path->loss.srtt * 1000000 / (qc->path->cwnd / 1200 + 1)) * max_burst;
//qcc->tx.next = now_mono_time() + (MAX(qc->path->loss.srtt, 10) * 800000 / (qc->path->cwnd / 1200 + 1)) * max_burst;

TRACE_LEAVE(QMUX_EV_QCC_SEND, qcc->conn);
return ret == QUIC_TX_ERR_AGAIN ? 1 : 0;

Expand Down Expand Up @@ -2261,7 +2270,7 @@ static int qcc_io_send(struct qcc *qcc)
struct list qcs_failed = LIST_HEAD_INIT(qcs_failed);
struct qcs *qcs, *qcs_tmp, *first_qcs = NULL;
uint64_t window_conn = qfctl_rcap(&qcc->tx.fc);
int ret, total = 0, resent;
int ret = 0, total = 0, resent;

TRACE_ENTER(QMUX_EV_QCC_SEND, qcc->conn);

Expand Down Expand Up @@ -2371,6 +2380,11 @@ static int qcc_io_send(struct qcc *qcc)
}
}

if (qcc->tx.next > now_mono_time()) {
qcc_wakeup_pacing(qcc);
return 1;
}

/* Retry sending until no frame to send, data rejected or connection
* flow-control limit reached.
*/
Expand Down Expand Up @@ -2407,6 +2421,9 @@ static int qcc_io_send(struct qcc *qcc)
sent_done:
/* Deallocate frames that the transport layer has rejected. */
if (ret == 1) {
//struct quic_conn *qc = qcc->conn->handle.qc;
//qcc->tx.next = now_ns + global.tune.pipesize;
//qcc->tx.next = now_mono_time() + qc->path->loss.srtt * 1000000 / (qc->path->cwnd / 1200 + 1);
qcc_wakeup_pacing(qcc);
}
else if (!LIST_ISEMPTY(&qcc->tx.frms)) {
Expand Down Expand Up @@ -2767,9 +2784,17 @@ static int qcc_purge_sending(struct qcc *qcc)
{
int ret;

if (qcc->tx.next > now_mono_time()) {
qcc_wakeup_pacing(qcc);
return 1;
}

//fprintf(stderr, "%s\n", __func__);
ret = qcc_send_frames(qcc, &qcc->tx.frms, 1);
if (ret > 0) {
//struct quic_conn *qc = qcc->conn->handle.qc;
//qcc->tx.next = now_ns + global.tune.pipesize;
//qcc->tx.next = now_mono_time() + qc->path->loss.srtt * 1000000 / (qc->path->cwnd / 1200 + 1);
qcc_wakeup_pacing(qcc);
return 1;
}
Expand Down Expand Up @@ -2824,6 +2849,7 @@ static struct task *qcc_timeout_task(struct task *t, void *ctx, unsigned int sta
int expired = tick_is_expired(t->expire, now_ms);

TRACE_ENTER(QMUX_EV_QCC_WAKE, qcc ? qcc->conn : NULL);
//ABORT_NOW();

if (qcc) {
if (!expired) {
Expand Down Expand Up @@ -2876,6 +2902,7 @@ static void _qcc_init(struct qcc *qcc)
qcc->wait_event.tasklet = NULL;
qcc->app_ops = NULL;
qcc->streams_by_id = EB_ROOT_UNIQUE;
qcc->tx.next = 0;
LIST_INIT(&qcc->lfctl.frms);
LIST_INIT(&qcc->tx.frms);
}
Expand Down
18 changes: 10 additions & 8 deletions src/quic_tx.c
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,8 @@ static int qc_send_ppkts(struct buffer *buf, struct ssl_sock_ctx *ctx)
qc->bytes.tx += tmpbuf.data;
time_sent = now_ms;

//work_gtod(global.tune.pipesize);

for (pkt = first_pkt; pkt; pkt = next_pkt) {
struct quic_cc *cc = &qc->path->cc;

Expand Down Expand Up @@ -469,11 +471,11 @@ int qc_purge_txbuf(struct quic_conn *qc, struct buffer *buf)
* Returns the result from qc_send() function.
*/
enum quic_tx_err qc_send_mux(struct quic_conn *qc, struct list *frms,
int max_dgram)
int *max_dgram)
{
struct list send_list = LIST_HEAD_INIT(send_list);
enum quic_tx_err ret = QUIC_TX_ERR_NONE;
int max = max_dgram;
int max = *max_dgram;

TRACE_ENTER(QUIC_EV_CONN_TXPKT, qc);
BUG_ON(qc->mux_state != QC_MUX_READY); /* Only MUX can uses this function so it must be ready. */
Expand All @@ -494,14 +496,14 @@ enum quic_tx_err qc_send_mux(struct quic_conn *qc, struct list *frms,

TRACE_STATE("preparing data (from MUX)", QUIC_EV_CONN_TXPKT, qc);
qel_register_send(&send_list, qc->ael, frms);
if (!qc_send(qc, 0, &send_list, max_dgram ? &max : NULL)) {
if (!qc_send(qc, 0, &send_list, *max_dgram ? &max : NULL))
ret = QUIC_TX_ERR_FATAL;
ABORT_NOW();
}

if (max_dgram && !max) {
else if (*max_dgram && !max)
ret = QUIC_TX_ERR_AGAIN;
//ABORT_NOW();
else {
if (*max_dgram)
*max_dgram = *max_dgram - max;

}

TRACE_LEAVE(QUIC_EV_CONN_TXPKT, qc);
Expand Down
20 changes: 20 additions & 0 deletions src/tools.c
Original file line number Diff line number Diff line change
Expand Up @@ -6958,6 +6958,26 @@ void free_all_file_names()
HA_RWLOCK_WRUNLOCK(OTHER_LOCK, &file_names.lock);
}

void work_gtod(int usec)
{
struct timeval now, expire;

gettimeofday(&expire, NULL);
expire.tv_sec += usec / 1000000;
expire.tv_usec += usec % 1000000;

if (expire.tv_usec >= 1000000) {
expire.tv_usec -= 1000000;
expire.tv_sec += 1;
}

do {
gettimeofday(&now, NULL);
} while (now.tv_sec < expire.tv_sec ||
(now.tv_sec == expire.tv_sec &&
now.tv_usec < expire.tv_usec));
}

/*
* Local variables:
* c-indent-level: 8
Expand Down

0 comments on commit 2a01c39

Please sign in to comment.