Skip to content

Commit

Permalink
Merge branch 'rpma-update-RPMA-engines-with-new-librpma-completions-A…
Browse files Browse the repository at this point in the history
…PI' of https://github.com/ldorau/fio

* 'rpma-update-RPMA-engines-with-new-librpma-completions-API' of https://github.com/ldorau/fio:
  rpma: update RPMA engines with new librpma completions API
  rpma: RPMA engines require librpma>=v0.11.0 with rpma_cq_get_wc()
  • Loading branch information
axboe committed Feb 18, 2022
2 parents c99c81a + 4ef7dd2 commit 933651e
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 52 deletions.
4 changes: 2 additions & 2 deletions configure
Original file line number Diff line number Diff line change
Expand Up @@ -974,15 +974,15 @@ print_config "rdmacm" "$rdmacm"

##########################################
# librpma probe
# The librpma engine requires librpma>=v0.10.0 with rpma_mr_advise().
# The librpma engines require librpma>=v0.11.0 with rpma_cq_get_wc().
if test "$librpma" != "yes" ; then
librpma="no"
fi
cat > $TMPC << EOF
#include <librpma.h>
int main(void)
{
void *ptr = rpma_mr_advise;
void *ptr = rpma_cq_get_wc;
(void) ptr; /* unused */
return 0;
}
Expand Down
8 changes: 3 additions & 5 deletions engines/librpma_apm.c
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,7 @@ static inline int client_io_flush(struct thread_data *td,
struct io_u *first_io_u, struct io_u *last_io_u,
unsigned long long int len);

static int client_get_io_u_index(struct rpma_completion *cmpl,
unsigned int *io_u_index);
static int client_get_io_u_index(struct ibv_wc *wc, unsigned int *io_u_index);

static int client_init(struct thread_data *td)
{
Expand Down Expand Up @@ -188,10 +187,9 @@ static inline int client_io_flush(struct thread_data *td,
return 0;
}

static int client_get_io_u_index(struct rpma_completion *cmpl,
unsigned int *io_u_index)
static int client_get_io_u_index(struct ibv_wc *wc, unsigned int *io_u_index)
{
memcpy(io_u_index, &cmpl->op_context, sizeof(*io_u_index));
memcpy(io_u_index, &wc->wr_id, sizeof(*io_u_index));

return 1;
}
Expand Down
46 changes: 29 additions & 17 deletions engines/librpma_fio.c
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,12 @@ int librpma_fio_client_init(struct thread_data *td,
if (ccd->conn == NULL)
goto err_peer_delete;

/* get the connection's main CQ */
if ((ret = rpma_conn_get_cq(ccd->conn, &ccd->cq))) {
librpma_td_verror(td, ret, "rpma_conn_get_cq");
goto err_conn_delete;
}

/* get the connection's private data sent from the server */
if ((ret = rpma_conn_get_private_data(ccd->conn, &pdata))) {
librpma_td_verror(td, ret, "rpma_conn_get_private_data");
Expand Down Expand Up @@ -455,7 +461,7 @@ static enum fio_q_status client_queue_sync(struct thread_data *td,
struct io_u *io_u)
{
struct librpma_fio_client_data *ccd = td->io_ops_data;
struct rpma_completion cmpl;
struct ibv_wc wc;
unsigned io_u_index;
int ret;

Expand All @@ -478,31 +484,31 @@ static enum fio_q_status client_queue_sync(struct thread_data *td,

do {
/* get a completion */
ret = rpma_conn_completion_get(ccd->conn, &cmpl);
ret = rpma_cq_get_wc(ccd->cq, 1, &wc, NULL);
if (ret == RPMA_E_NO_COMPLETION) {
/* lack of completion is not an error */
continue;
} else if (ret != 0) {
/* an error occurred */
librpma_td_verror(td, ret, "rpma_conn_completion_get");
librpma_td_verror(td, ret, "rpma_cq_get_wc");
goto err;
}

/* if io_us has completed with an error */
if (cmpl.op_status != IBV_WC_SUCCESS)
if (wc.status != IBV_WC_SUCCESS)
goto err;

if (cmpl.op == RPMA_OP_SEND)
if (wc.opcode == IBV_WC_SEND)
++ccd->op_send_completed;
else {
if (cmpl.op == RPMA_OP_RECV)
if (wc.opcode == IBV_WC_RECV)
++ccd->op_recv_completed;

break;
}
} while (1);

if (ccd->get_io_u_index(&cmpl, &io_u_index) != 1)
if (ccd->get_io_u_index(&wc, &io_u_index) != 1)
goto err;

if (io_u->index != io_u_index) {
Expand Down Expand Up @@ -654,8 +660,8 @@ int librpma_fio_client_commit(struct thread_data *td)
static int client_getevent_process(struct thread_data *td)
{
struct librpma_fio_client_data *ccd = td->io_ops_data;
struct rpma_completion cmpl;
/* io_u->index of completed io_u (cmpl.op_context) */
struct ibv_wc wc;
/* io_u->index of completed io_u (wc.wr_id) */
unsigned int io_u_index;
/* # of completed io_us */
int cmpl_num = 0;
Expand All @@ -665,30 +671,30 @@ static int client_getevent_process(struct thread_data *td)
int ret;

/* get a completion */
if ((ret = rpma_conn_completion_get(ccd->conn, &cmpl))) {
if ((ret = rpma_cq_get_wc(ccd->cq, 1, &wc, NULL))) {
/* lack of completion is not an error */
if (ret == RPMA_E_NO_COMPLETION) {
/* lack of completion is not an error */
return 0;
}

/* an error occurred */
librpma_td_verror(td, ret, "rpma_conn_completion_get");
librpma_td_verror(td, ret, "rpma_cq_get_wc");
return -1;
}

/* if io_us has completed with an error */
if (cmpl.op_status != IBV_WC_SUCCESS) {
td->error = cmpl.op_status;
if (wc.status != IBV_WC_SUCCESS) {
td->error = wc.status;
return -1;
}

if (cmpl.op == RPMA_OP_SEND)
if (wc.opcode == IBV_WC_SEND)
++ccd->op_send_completed;
else if (cmpl.op == RPMA_OP_RECV)
else if (wc.opcode == IBV_WC_RECV)
++ccd->op_recv_completed;

if ((ret = ccd->get_io_u_index(&cmpl, &io_u_index)) != 1)
if ((ret = ccd->get_io_u_index(&wc, &io_u_index)) != 1)
return ret;

/* look for an io_u being completed */
Expand Down Expand Up @@ -750,7 +756,7 @@ int librpma_fio_client_getevents(struct thread_data *td, unsigned int min,

/*
* To reduce CPU consumption one can use
* the rpma_conn_completion_wait() function.
* the rpma_cq_wait() function.
* Note this greatly increase the latency
* and make the results less stable.
* The bandwidth stays more or less the same.
Expand Down Expand Up @@ -1029,6 +1035,12 @@ int librpma_fio_server_open_file(struct thread_data *td, struct fio_file *f,
csd->ws_ptr = ws_ptr;
csd->conn = conn;

/* get the connection's main CQ */
if ((ret = rpma_conn_get_cq(csd->conn, &csd->cq))) {
librpma_td_verror(td, ret, "rpma_conn_get_cq");
goto err_conn_delete;
}

return 0;

err_conn_delete:
Expand Down
16 changes: 9 additions & 7 deletions engines/librpma_fio.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,12 +94,13 @@ typedef int (*librpma_fio_flush_t)(struct thread_data *td,
* - ( 0) - skip
* - (-1) - on error
*/
typedef int (*librpma_fio_get_io_u_index_t)(struct rpma_completion *cmpl,
typedef int (*librpma_fio_get_io_u_index_t)(struct ibv_wc *wc,
unsigned int *io_u_index);

struct librpma_fio_client_data {
struct rpma_peer *peer;
struct rpma_conn *conn;
struct rpma_cq *cq;

/* aligned td->orig_buffer */
char *orig_buffer_aligned;
Expand Down Expand Up @@ -199,29 +200,29 @@ static inline int librpma_fio_client_io_complete_all_sends(
struct thread_data *td)
{
struct librpma_fio_client_data *ccd = td->io_ops_data;
struct rpma_completion cmpl;
struct ibv_wc wc;
int ret;

while (ccd->op_send_posted != ccd->op_send_completed) {
/* get a completion */
ret = rpma_conn_completion_get(ccd->conn, &cmpl);
ret = rpma_cq_get_wc(ccd->cq, 1, &wc, NULL);
if (ret == RPMA_E_NO_COMPLETION) {
/* lack of completion is not an error */
continue;
} else if (ret != 0) {
/* an error occurred */
librpma_td_verror(td, ret, "rpma_conn_completion_get");
librpma_td_verror(td, ret, "rpma_cq_get_wc");
break;
}

if (cmpl.op_status != IBV_WC_SUCCESS)
if (wc.status != IBV_WC_SUCCESS)
return -1;

if (cmpl.op == RPMA_OP_SEND)
if (wc.opcode == IBV_WC_SEND)
++ccd->op_send_completed;
else {
log_err(
"A completion other than RPMA_OP_SEND got during cleaning up the CQ from SENDs\n");
"A completion other than IBV_WC_SEND got during cleaning up the CQ from SENDs\n");
return -1;
}
}
Expand Down Expand Up @@ -251,6 +252,7 @@ struct librpma_fio_server_data {

/* resources of an incoming connection */
struct rpma_conn *conn;
struct rpma_cq *cq;

char *ws_ptr;
struct rpma_mr_local *ws_mr;
Expand Down
39 changes: 18 additions & 21 deletions engines/librpma_gpspm.c
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,7 @@ static inline int client_io_flush(struct thread_data *td,
struct io_u *first_io_u, struct io_u *last_io_u,
unsigned long long int len);

static int client_get_io_u_index(struct rpma_completion *cmpl,
unsigned int *io_u_index);
static int client_get_io_u_index(struct ibv_wc *wc, unsigned int *io_u_index);

static int client_init(struct thread_data *td)
{
Expand Down Expand Up @@ -317,17 +316,16 @@ static inline int client_io_flush(struct thread_data *td,
return 0;
}

static int client_get_io_u_index(struct rpma_completion *cmpl,
unsigned int *io_u_index)
static int client_get_io_u_index(struct ibv_wc *wc, unsigned int *io_u_index)
{
GPSPMFlushResponse *flush_resp;

if (cmpl->op != RPMA_OP_RECV)
if (wc->opcode != IBV_WC_RECV)
return 0;

/* unpack a response from the received buffer */
flush_resp = gpspm_flush_response__unpack(NULL,
cmpl->byte_len, cmpl->op_context);
wc->byte_len, (void *)wc->wr_id);
if (flush_resp == NULL) {
log_err("Cannot unpack the flush response buffer\n");
return -1;
Expand Down Expand Up @@ -373,7 +371,7 @@ struct server_data {
uint32_t msg_sqe_available; /* # of free SQ slots */

/* in-memory queues */
struct rpma_completion *msgs_queued;
struct ibv_wc *msgs_queued;
uint32_t msg_queued_nr;
};

Expand Down Expand Up @@ -562,8 +560,7 @@ static int server_open_file(struct thread_data *td, struct fio_file *f)
return ret;
}

static int server_qe_process(struct thread_data *td,
struct rpma_completion *cmpl)
static int server_qe_process(struct thread_data *td, struct ibv_wc *wc)
{
struct librpma_fio_server_data *csd = td->io_ops_data;
struct server_data *sd = csd->server_data;
Expand All @@ -580,15 +577,15 @@ static int server_qe_process(struct thread_data *td,
int ret;

/* calculate SEND/RECV pair parameters */
msg_index = (int)(uintptr_t)cmpl->op_context;
msg_index = (int)(uintptr_t)wc->wr_id;
io_u_buff_offset = IO_U_BUFF_OFF_SERVER(msg_index);
send_buff_offset = io_u_buff_offset + SEND_OFFSET;
recv_buff_offset = io_u_buff_offset + RECV_OFFSET;
send_buff_ptr = sd->orig_buffer_aligned + send_buff_offset;
recv_buff_ptr = sd->orig_buffer_aligned + recv_buff_offset;

/* unpack a flush request from the received buffer */
flush_req = gpspm_flush_request__unpack(NULL, cmpl->byte_len,
flush_req = gpspm_flush_request__unpack(NULL, wc->byte_len,
recv_buff_ptr);
if (flush_req == NULL) {
log_err("cannot unpack the flush request buffer\n");
Expand Down Expand Up @@ -682,46 +679,46 @@ static int server_cmpl_process(struct thread_data *td)
{
struct librpma_fio_server_data *csd = td->io_ops_data;
struct server_data *sd = csd->server_data;
struct rpma_completion *cmpl = &sd->msgs_queued[sd->msg_queued_nr];
struct ibv_wc *wc = &sd->msgs_queued[sd->msg_queued_nr];
struct librpma_fio_options_values *o = td->eo;
int ret;

ret = rpma_conn_completion_get(csd->conn, cmpl);
ret = rpma_cq_get_wc(csd->cq, 1, wc, NULL);
if (ret == RPMA_E_NO_COMPLETION) {
if (o->busy_wait_polling == 0) {
ret = rpma_conn_completion_wait(csd->conn);
ret = rpma_cq_wait(csd->cq);
if (ret == RPMA_E_NO_COMPLETION) {
/* lack of completion is not an error */
return 0;
} else if (ret != 0) {
librpma_td_verror(td, ret, "rpma_conn_completion_wait");
librpma_td_verror(td, ret, "rpma_cq_wait");
goto err_terminate;
}

ret = rpma_conn_completion_get(csd->conn, cmpl);
ret = rpma_cq_get_wc(csd->cq, 1, wc, NULL);
if (ret == RPMA_E_NO_COMPLETION) {
/* lack of completion is not an error */
return 0;
} else if (ret != 0) {
librpma_td_verror(td, ret, "rpma_conn_completion_get");
librpma_td_verror(td, ret, "rpma_cq_get_wc");
goto err_terminate;
}
} else {
/* lack of completion is not an error */
return 0;
}
} else if (ret != 0) {
librpma_td_verror(td, ret, "rpma_conn_completion_get");
librpma_td_verror(td, ret, "rpma_cq_get_wc");
goto err_terminate;
}

/* validate the completion */
if (cmpl->op_status != IBV_WC_SUCCESS)
if (wc->status != IBV_WC_SUCCESS)
goto err_terminate;

if (cmpl->op == RPMA_OP_RECV)
if (wc->opcode == IBV_WC_RECV)
++sd->msg_queued_nr;
else if (cmpl->op == RPMA_OP_SEND)
else if (wc->opcode == IBV_WC_SEND)
++sd->msg_sqe_available;

return 0;
Expand Down

0 comments on commit 933651e

Please sign in to comment.