diff --git a/TODO.md b/TODO.md
index 85d803c..73c635d 100644
--- a/TODO.md
+++ b/TODO.md
@@ -28,3 +28,7 @@
 # read or timeout in 3 seconds
 ring.prep_read(fd: fd, buffer: +'', len: 4096, timeout: 3)
 ```
+
+- Add support for Ractors
+
+  https://news.ycombinator.com/item?id=41490988
diff --git a/ext/iou/iou.h b/ext/iou/iou.h
index c2ee242..0d429e2 100644
--- a/ext/iou/iou.h
+++ b/ext/iou/iou.h
@@ -32,7 +32,7 @@ struct buf_ring_descriptor {
 
 #define BUFFER_RING_MAX_COUNT 10
 
-typedef struct IOU_t {
+typedef struct IOURing_t {
   struct io_uring ring;
   unsigned int ring_initialized;
   unsigned int op_counter;
@@ -41,7 +41,7 @@ typedef struct IOU_t {
 
   struct buf_ring_descriptor brs[BUFFER_RING_MAX_COUNT];
   unsigned int br_counter;
-} IOU_t;
+} IOURing_t;
 
 struct sa_data {
   struct sockaddr addr;
diff --git a/ext/iou/iou_ext.c b/ext/iou/iou_ext.c
index 9aeed15..2b6dbda 100644
--- a/ext/iou/iou_ext.c
+++ b/ext/iou/iou_ext.c
@@ -1,9 +1,9 @@
 #include "iou.h"
 
-void Init_IOU();
+void Init_IOURing();
 void Init_OpCtx();
 
 void Init_iou_ext(void) {
-  Init_IOU();
+  Init_IOURing();
   Init_OpCtx();
 }
diff --git a/ext/iou/ring.c b/ext/iou/ring.c
index 88dbdab..05e0fbe 100644
--- a/ext/iou/ring.c
+++ b/ext/iou/ring.c
@@ -18,6 +18,7 @@ VALUE SYM_fd;
 VALUE SYM_id;
 VALUE SYM_interval;
 VALUE SYM_len;
+VALUE SYM_link;
 VALUE SYM_multishot;
 VALUE SYM_op;
 VALUE SYM_read;
@@ -30,56 +31,56 @@ VALUE SYM_timeout;
 VALUE SYM_utf8;
 VALUE SYM_write;
 
-static void IOU_mark(void *ptr) {
-  IOU_t *iou = ptr;
-  rb_gc_mark_movable(iou->pending_ops);
+static void IOURing_mark(void *ptr) {
+  IOURing_t *iour = ptr;
+  rb_gc_mark_movable(iour->pending_ops);
 }
 
-static void IOU_compact(void *ptr) {
-  IOU_t *iou = ptr;
-  iou->pending_ops = rb_gc_location(iou->pending_ops);
+static void IOURing_compact(void *ptr) {
+  IOURing_t *iour = ptr;
+  iour->pending_ops = rb_gc_location(iour->pending_ops);
 }
 
-void cleanup_iou(IOU_t *iou) {
-  if (!iou->ring_initialized) return;
+void cleanup_iour(IOURing_t *iour) {
+  if (!iour->ring_initialized) return;
 
-  for (unsigned i = 0; i < iou->br_counter; i++) {
-    struct buf_ring_descriptor *desc = iou->brs + i;
-    io_uring_free_buf_ring(&iou->ring, desc->br, desc->buf_count, i);
+  for (unsigned i = 0; i < iour->br_counter; i++) {
+    struct buf_ring_descriptor *desc = iour->brs + i;
+    io_uring_free_buf_ring(&iour->ring, desc->br, desc->buf_count, i);
     free(desc->buf_base);
   }
-  iou->br_counter = 0;
-  io_uring_queue_exit(&iou->ring);
-  iou->ring_initialized = 0;
+  iour->br_counter = 0;
+  io_uring_queue_exit(&iour->ring);
+  iour->ring_initialized = 0;
 }
 
 static void IOU_free(void *ptr) {
-  cleanup_iou((IOU_t *)ptr);
+  cleanup_iour((IOURing_t *)ptr);
 }
 
-static size_t IOU_size(const void *ptr) {
-  return sizeof(IOU_t);
+static size_t IOURing_size(const void *ptr) {
+  return sizeof(IOURing_t);
 }
 
-static const rb_data_type_t IOU_type = {
+static const rb_data_type_t IOURing_type = {
   "IOURing",
-  {IOU_mark, IOU_free, IOU_size, IOU_compact},
+  {IOURing_mark, IOU_free, IOURing_size, IOURing_compact},
   0, 0, RUBY_TYPED_FREE_IMMEDIATELY | RUBY_TYPED_WB_PROTECTED
 };
 
-static VALUE IOU_allocate(VALUE klass) {
-  IOU_t *iou = ALLOC(IOU_t);
+static VALUE IOURing_allocate(VALUE klass) {
+  IOURing_t *iour = ALLOC(IOURing_t);
 
-  return TypedData_Wrap_Struct(klass, &IOU_type, iou);
+  return TypedData_Wrap_Struct(klass, &IOURing_type, iour);
 }
 
-VALUE IOU_initialize(VALUE self) {
-  IOU_t *iou = RTYPEDDATA_DATA(self);
+VALUE IOURing_initialize(VALUE self) {
+  IOURing_t *iour = RTYPEDDATA_DATA(self);
 
-  iou->ring_initialized = 0;
-  iou->op_counter = 0;
-  iou->unsubmitted_sqes = 0;
-  iou->br_counter = 0;
+  iour->ring_initialized = 0;
+  iour->op_counter = 0;
+  iour->unsubmitted_sqes = 0;
+  iour->br_counter = 0;
 
-  RB_OBJ_WRITE(self, &iou->pending_ops, rb_hash_new());
+  RB_OBJ_WRITE(self, &iour->pending_ops, rb_hash_new());
 
@@ -93,7 +94,7 @@ VALUE IOU_initialize(VALUE self) {
 #endif
 
   while (1) {
-    int ret = io_uring_queue_init(prepared_limit, &iou->ring, flags);
+    int ret = io_uring_queue_init(prepared_limit, &iour->ring, flags);
     if (likely(!ret)) break;
 
     // if ENOMEM is returned, try with half as many entries
@@ -102,37 +103,37 @@
     else rb_syserr_fail(-ret, strerror(-ret));
   }
 
-  iou->ring_initialized = 1;
+  iour->ring_initialized = 1;
 
   return self;
 }
 
-VALUE IOU_close(VALUE self) {
-  IOU_t *iou = RTYPEDDATA_DATA(self);
-  cleanup_iou(iou);
+VALUE IOURing_close(VALUE self) {
+  IOURing_t *iour = RTYPEDDATA_DATA(self);
+  cleanup_iour(iour);
   return self;
 }
 
-VALUE IOU_closed_p(VALUE self) {
-  IOU_t *iou = RTYPEDDATA_DATA(self);
-  return iou->ring_initialized ? Qfalse : Qtrue;
+VALUE IOURing_closed_p(VALUE self) {
+  IOURing_t *iour = RTYPEDDATA_DATA(self);
+  return iour->ring_initialized ? Qfalse : Qtrue;
 }
 
-VALUE IOU_pending_ops(VALUE self) {
-  IOU_t *iou = RTYPEDDATA_DATA(self);
-  return iou->pending_ops;
+VALUE IOURing_pending_ops(VALUE self) {
+  IOURing_t *iour = RTYPEDDATA_DATA(self);
+  return iour->pending_ops;
}
 
-inline IOU_t *get_iou(VALUE self) {
-  IOU_t *iou = RTYPEDDATA_DATA(self);
-  if (!iou->ring_initialized)
+inline IOURing_t *get_iou(VALUE self) {
+  IOURing_t *iour = RTYPEDDATA_DATA(self);
+  if (!iour->ring_initialized)
     rb_raise(rb_eRuntimeError, "IOU ring was not initialized");
-  return iou;
+  return iour;
 }
 
-static inline struct io_uring_sqe *get_sqe(IOU_t *iou) {
+static inline struct io_uring_sqe *get_sqe(IOURing_t *iour) {
   struct io_uring_sqe *sqe;
-  sqe = io_uring_get_sqe(&iou->ring);
+  sqe = io_uring_get_sqe(&iour->ring);
   if (likely(sqe)) goto done;
 
   rb_raise(rb_eRuntimeError, "Failed to get SQE");
@@ -165,10 +166,10 @@ static inline void get_required_kwargs(VALUE spec, VALUE *values, int argc, ...)
   va_end(ptr);
 }
 
-VALUE IOU_setup_buffer_ring(VALUE self, VALUE opts) {
-  IOU_t *iou = get_iou(self);
+VALUE IOURing_setup_buffer_ring(VALUE self, VALUE opts) {
+  IOURing_t *iour = get_iou(self);
 
-  if (iou->br_counter == BUFFER_RING_MAX_COUNT)
+  if (iour->br_counter == BUFFER_RING_MAX_COUNT)
     rb_raise(rb_eRuntimeError, "Cannot setup more than BUFFER_RING_MAX_COUNT buffer rings");
 
   VALUE values[2];
@@ -176,7 +177,7 @@ VALUE IOU_setup_buffer_ring(VALUE self, VALUE opts) {
   VALUE count = values[0];
   VALUE size = values[1];
 
-  struct buf_ring_descriptor *desc = iou->brs + iou->br_counter;
+  struct buf_ring_descriptor *desc = iour->brs + iour->br_counter;
   desc->buf_count = NUM2UINT(count);
   desc->buf_size = NUM2UINT(size);
 
@@ -191,13 +192,13 @@ VALUE IOU_setup_buffer_ring(VALUE self, VALUE opts) {
   desc->br = (struct io_uring_buf_ring *)mapped;
   io_uring_buf_ring_init(desc->br);
 
-  unsigned bg_id = iou->br_counter;
+  unsigned bg_id = iour->br_counter;
   struct io_uring_buf_reg reg = {
     .ring_addr = (unsigned long)desc->br,
     .ring_entries = desc->buf_count,
     .bgid = bg_id
   };
-  int ret = io_uring_register_buf_ring(&iou->ring, &reg, 0);
+  int ret = io_uring_register_buf_ring(&iour->ring, &reg, 0);
   if (ret) {
     munmap(desc->br, desc->br_size);
     rb_syserr_fail(-ret, strerror(-ret));
@@ -205,7 +206,7 @@ VALUE IOU_setup_buffer_ring(VALUE self, VALUE opts) {
   }
 
   desc->buf_base = malloc(desc->buf_count * desc->buf_size);
   if (!desc->buf_base) {
-    io_uring_free_buf_ring(&iou->ring, desc->br, desc->buf_count, bg_id);
+    io_uring_free_buf_ring(&iour->ring, desc->br, desc->buf_count, bg_id);
     rb_raise(rb_eRuntimeError, "Failed to allocate buffers");
   }
@@ -216,11 +217,11 @@ VALUE IOU_setup_buffer_ring(VALUE self, VALUE opts) {
       i, mask, i);
   }
   io_uring_buf_ring_advance(desc->br, desc->buf_count);
-  iou->br_counter++;
+  iour->br_counter++;
   return UINT2NUM(bg_id);
 }
 
-static inline VALUE setup_op_ctx(IOU_t *iou, enum op_type type, VALUE op, VALUE id, VALUE spec) {
+static inline VALUE setup_op_ctx(IOURing_t *iour, enum op_type type, VALUE op, VALUE id, VALUE spec) {
   rb_hash_aset(spec, SYM_id, id);
   rb_hash_aset(spec, SYM_op, op);
   VALUE block_proc = rb_block_given_p() ? rb_block_proc() : Qnil;
@@ -228,33 +229,40 @@ static inline VALUE setup_op_ctx(IOU_t *iou, enum op_type type, VALUE
   rb_hash_aset(spec, SYM_block, block_proc);
   VALUE ctx = rb_funcall(cOpCtx, rb_intern("new"), 2, spec, block_proc);
   OpCtx_type_set(ctx, type);
-  rb_hash_aset(iou->pending_ops, id, ctx);
+  rb_hash_aset(iour->pending_ops, id, ctx);
   return ctx;
 }
 
-VALUE IOU_emit(VALUE self, VALUE spec) {
-  IOU_t *iou = get_iou(self);
-  unsigned id_i = ++iou->op_counter;
+static inline void setup_sqe(struct io_uring_sqe *sqe, int id, VALUE spec) {
+  sqe->user_data = id;
+  sqe->flags = 0;
+  if (spec != Qnil && RTEST(rb_hash_aref(spec, SYM_link)))
+    sqe->flags |= IOSQE_IO_LINK;
+}
+
+VALUE IOURing_emit(VALUE self, VALUE spec) {
+  IOURing_t *iour = get_iou(self);
+  unsigned id_i = ++iour->op_counter;
   VALUE id = UINT2NUM(id_i);
 
-  struct io_uring_sqe *sqe = get_sqe(iou);
+  struct io_uring_sqe *sqe = get_sqe(iour);
   sqe->user_data = id_i;
 
-  VALUE ctx = setup_op_ctx(iou, OP_emit, SYM_emit, id, spec);
+  VALUE ctx = setup_op_ctx(iour, OP_emit, SYM_emit, id, spec);
   if (rb_hash_aref(spec, SYM_signal) == SYM_stop)
     OpCtx_stop_signal_set(ctx);
 
   io_uring_prep_nop(sqe);
   // immediately submit
-  io_uring_submit(&iou->ring);
-  iou->unsubmitted_sqes = 0;
+  io_uring_submit(&iour->ring);
+  iour->unsubmitted_sqes = 0;
   return id;
 }
 
-VALUE IOU_prep_accept(VALUE self, VALUE spec) {
-  IOU_t *iou = get_iou(self);
-  unsigned id_i = ++iou->op_counter;
+VALUE IOURing_prep_accept(VALUE self, VALUE spec) {
+  IOURing_t *iour = get_iou(self);
+  unsigned id_i = ++iour->op_counter;
   VALUE id = UINT2NUM(id_i);
 
   VALUE values[1];
   get_required_kwargs(spec, values, 1, SYM_fd);
@@ -262,75 +270,75 @@ VALUE IOU_prep_accept(VALUE self, VALUE spec) {
   VALUE fd = values[0];
   VALUE multishot = rb_hash_aref(spec, SYM_multishot);
 
-  struct io_uring_sqe *sqe = get_sqe(iou);
-  sqe->user_data = id_i;
+  struct io_uring_sqe *sqe = get_sqe(iour);
+  setup_sqe(sqe, id_i, spec);
 
-  VALUE ctx = setup_op_ctx(iou, OP_accept, SYM_accept, id, spec);
+  VALUE ctx = setup_op_ctx(iour, OP_accept, SYM_accept, id, spec);
   struct sa_data *sa = OpCtx_sa_get(ctx);
   if (RTEST(multishot))
     io_uring_prep_multishot_accept(sqe, NUM2INT(fd), &sa->addr, &sa->len, 0);
   else
     io_uring_prep_accept(sqe, NUM2INT(fd), &sa->addr, &sa->len, 0);
-  iou->unsubmitted_sqes++;
+  iour->unsubmitted_sqes++;
   return id;
 }
 
-VALUE prep_cancel_id(IOU_t *iou, unsigned op_id_i) {
-  unsigned id_i = ++iou->op_counter;
+VALUE prep_cancel_id(IOURing_t *iour, unsigned op_id_i) {
+  unsigned id_i = ++iour->op_counter;
   VALUE id = UINT2NUM(id_i);
 
-  struct io_uring_sqe *sqe = get_sqe(iou);
+  struct io_uring_sqe *sqe = get_sqe(iour);
   io_uring_prep_cancel64(sqe, op_id_i, 0);
   sqe->user_data = id_i;
-  iou->unsubmitted_sqes++;
+  iour->unsubmitted_sqes++;
 
   return id;
 }
 
-VALUE IOU_prep_cancel(VALUE self, VALUE spec) {
-  IOU_t *iou = get_iou(self);
+VALUE IOURing_prep_cancel(VALUE self, VALUE spec) {
+  IOURing_t *iour = get_iou(self);
 
   if (TYPE(spec) == T_FIXNUM)
-    return prep_cancel_id(iou, NUM2UINT(spec));
+    return prep_cancel_id(iour, NUM2UINT(spec));
 
   if (TYPE(spec) != T_HASH)
     rb_raise(cArgumentError, "Expected operation id or keyword arguments");
 
   VALUE id = rb_hash_aref(spec, SYM_id);
   if (!NIL_P(id))
-    return prep_cancel_id(iou, NUM2UINT(id));
+    return prep_cancel_id(iour, NUM2UINT(id));
 
   rb_raise(cArgumentError, "Missing operation id");
 }
 
-VALUE IOU_prep_close(VALUE self, VALUE spec) {
-  IOU_t *iou = get_iou(self);
-  unsigned id_i = ++iou->op_counter;
+VALUE IOURing_prep_close(VALUE self, VALUE spec) {
+  IOURing_t *iour = get_iou(self);
+  unsigned id_i = ++iour->op_counter;
   VALUE id = UINT2NUM(id_i);
 
   VALUE values[1];
   get_required_kwargs(spec, values, 1, SYM_fd);
   VALUE fd = values[0];
 
-  struct io_uring_sqe *sqe = get_sqe(iou);
-  sqe->user_data = id_i;
+  struct io_uring_sqe *sqe = get_sqe(iour);
+  setup_sqe(sqe, id_i, spec);
 
-  setup_op_ctx(iou, OP_close, SYM_close, id, spec);
+  setup_op_ctx(iour, OP_close, SYM_close, id, spec);
 
   io_uring_prep_close(sqe, NUM2INT(fd));
-  iou->unsubmitted_sqes++;
+  iour->unsubmitted_sqes++;
   return id;
 }
 
-VALUE IOU_prep_nop(VALUE self) {
-  IOU_t *iou = get_iou(self);
-  unsigned id_i = ++iou->op_counter;
+VALUE IOURing_prep_nop(VALUE self) {
+  IOURing_t *iour = get_iou(self);
+  unsigned id_i = ++iour->op_counter;
   VALUE id = UINT2NUM(id_i);
 
-  struct io_uring_sqe *sqe = get_sqe(iou);
+  struct io_uring_sqe *sqe = get_sqe(iour);
   io_uring_prep_nop(sqe);
   sqe->user_data = id_i;
-  iou->unsubmitted_sqes++;
+  iour->unsubmitted_sqes++;
 
   return id;
 }
@@ -355,8 +363,8 @@ static inline void adjust_read_buffer_len(VALUE buffer, int result, int ofs) {
   rb_str_set_len(buffer, len + (unsigned)ofs);
 }
 
-VALUE prep_read_multishot(IOU_t *iou, VALUE spec) {
-  unsigned id_i = ++iou->op_counter;
+VALUE prep_read_multishot(IOURing_t *iour, VALUE spec) {
+  unsigned id_i = ++iour->op_counter;
   VALUE id = UINT2NUM(id_i);
 
   VALUE values[2];
@@ -365,24 +373,24 @@ VALUE prep_read_multishot(IOU_t *iou, VALUE spec) {
   unsigned bg_id = NUM2UINT(values[1]);
   int utf8 = RTEST(rb_hash_aref(spec, SYM_utf8));
 
-  struct io_uring_sqe *sqe = get_sqe(iou);
-  sqe->user_data = id_i;
+  struct io_uring_sqe *sqe = get_sqe(iour);
+  setup_sqe(sqe, id_i, spec);
 
-  VALUE ctx = setup_op_ctx(iou, OP_read, SYM_read, id, spec);
+  VALUE ctx = setup_op_ctx(iour, OP_read, SYM_read, id, spec);
   OpCtx_rd_set(ctx, Qnil, 0, bg_id, utf8);
 
   io_uring_prep_read_multishot(sqe, fd, 0, -1, bg_id);
-  iou->unsubmitted_sqes++;
+  iour->unsubmitted_sqes++;
   return id;
 }
 
-VALUE IOU_prep_read(VALUE self, VALUE spec) {
-  IOU_t *iou = get_iou(self);
+VALUE IOURing_prep_read(VALUE self, VALUE spec) {
+  IOURing_t *iour = get_iou(self);
 
   if (RTEST(rb_hash_aref(spec, SYM_multishot)))
-    return prep_read_multishot(iou, spec);
+    return prep_read_multishot(iour, spec);
 
-  unsigned id_i = ++iou->op_counter;
+  unsigned id_i = ++iour->op_counter;
   VALUE id = UINT2NUM(id_i);
 
   VALUE values[3];
@@ -397,21 +405,21 @@ VALUE IOU_prep_read(VALUE self, VALUE spec) {
   int buffer_offset_i = NIL_P(buffer_offset) ? 0 : NUM2INT(buffer_offset);
   int utf8 = RTEST(rb_hash_aref(spec, SYM_utf8));
 
-  struct io_uring_sqe *sqe = get_sqe(iou);
-  sqe->user_data = id_i;
+  struct io_uring_sqe *sqe = get_sqe(iour);
+  setup_sqe(sqe, id_i, spec);
 
-  VALUE ctx = setup_op_ctx(iou, OP_read, SYM_read, id, spec);
+  VALUE ctx = setup_op_ctx(iour, OP_read, SYM_read, id, spec);
   OpCtx_rd_set(ctx, buffer, buffer_offset_i, 0, utf8);
 
   void *ptr = prepare_read_buffer(buffer, len_i, buffer_offset_i);
   io_uring_prep_read(sqe, NUM2INT(fd), ptr, len_i, -1);
-  iou->unsubmitted_sqes++;
+  iour->unsubmitted_sqes++;
   return id;
 }
 
-VALUE IOU_prep_timeout(VALUE self, VALUE spec) {
-  IOU_t *iou = get_iou(self);
-  unsigned id_i = ++iou->op_counter;
+VALUE IOURing_prep_timeout(VALUE self, VALUE spec) {
+  IOURing_t *iour = get_iou(self);
+  unsigned id_i = ++iour->op_counter;
   VALUE id = UINT2NUM(id_i);
 
   VALUE values[1];
@@ -420,20 +428,20 @@ VALUE IOU_prep_timeout(VALUE self, VALUE spec) {
 
   VALUE multishot = rb_hash_aref(spec, SYM_multishot);
   unsigned flags = RTEST(multishot) ? IORING_TIMEOUT_MULTISHOT : 0;
 
-  struct io_uring_sqe *sqe = get_sqe(iou);
-  sqe->user_data = id_i;
+  struct io_uring_sqe *sqe = get_sqe(iour);
+  setup_sqe(sqe, id_i, spec);
 
-  VALUE ctx = setup_op_ctx(iou, OP_timeout, SYM_timeout, id, spec);
+  VALUE ctx = setup_op_ctx(iour, OP_timeout, SYM_timeout, id, spec);
   OpCtx_ts_set(ctx, interval);
 
   io_uring_prep_timeout(sqe, OpCtx_ts_get(ctx), 0, flags);
-  iou->unsubmitted_sqes++;
+  iour->unsubmitted_sqes++;
   return id;
 }
 
-VALUE IOU_prep_write(VALUE self, VALUE spec) {
-  IOU_t *iou = get_iou(self);
-  unsigned id_i = ++iou->op_counter;
+VALUE IOURing_prep_write(VALUE self, VALUE spec) {
+  IOURing_t *iour = get_iou(self);
+  unsigned id_i = ++iour->op_counter;
   VALUE id = UINT2NUM(id_i);
 
   VALUE values[2];
@@ -443,27 +451,27 @@ VALUE IOU_prep_write(VALUE self, VALUE spec) {
   VALUE len = rb_hash_aref(spec, SYM_len);
   unsigned nbytes = NIL_P(len) ? RSTRING_LEN(buffer) : NUM2UINT(len);
 
-  struct io_uring_sqe *sqe = get_sqe(iou);
-  sqe->user_data = id_i;
+  struct io_uring_sqe *sqe = get_sqe(iour);
+  setup_sqe(sqe, id_i, spec);
 
-  setup_op_ctx(iou, OP_write, SYM_write, id, spec);
+  setup_op_ctx(iour, OP_write, SYM_write, id, spec);
 
   io_uring_prep_write(sqe, NUM2INT(fd), RSTRING_PTR(buffer), nbytes, -1);
-  iou->unsubmitted_sqes++;
+  iour->unsubmitted_sqes++;
   return id;
 }
 
-VALUE IOU_submit(VALUE self) {
-  IOU_t *iou = get_iou(self);
-  if (!iou->unsubmitted_sqes) goto done;
+VALUE IOURing_submit(VALUE self) {
+  IOURing_t *iour = get_iou(self);
+  if (!iour->unsubmitted_sqes)
+    return INT2NUM(0);
 
-  iou->unsubmitted_sqes = 0;
-  int ret = io_uring_submit(&iou->ring);
+  iour->unsubmitted_sqes = 0;
+  int ret = io_uring_submit(&iour->ring);
   if (ret < 0)
     rb_syserr_fail(-ret, strerror(-ret));
 
-done:
-  return self;
+  return INT2NUM(ret);
 }
 
 inline VALUE make_empty_op_with_result(VALUE id, VALUE result) {
@@ -475,18 +483,18 @@ inline VALUE make_empty_op_with_result(VALUE id, VALUE result) {
 }
 
 typedef struct {
-  IOU_t *iou;
+  IOURing_t *iour;
   struct io_uring_cqe *cqe;
   int ret;
 } wait_for_completion_ctx_t;
 
 void *wait_for_completion_without_gvl(void *ptr) {
   wait_for_completion_ctx_t *ctx = (wait_for_completion_ctx_t *)ptr;
-  ctx->ret = io_uring_wait_cqe(&ctx->iou->ring, &ctx->cqe);
+  ctx->ret = io_uring_wait_cqe(&ctx->iour->ring, &ctx->cqe);
   return NULL;
 }
 
-static inline void update_read_buffer_from_buffer_ring(IOU_t *iou, VALUE ctx, struct io_uring_cqe *cqe) {
+static inline void update_read_buffer_from_buffer_ring(IOURing_t *iour, VALUE ctx, struct io_uring_cqe *cqe) {
   VALUE buf = Qnil;
   if (cqe->res == 0) {
     buf = rb_str_new_literal("");
@@ -496,7 +504,7 @@ static inline void update_read_buffer_from_buffer_ring(IOU_t *iou, VALUE ctx, st
 
   struct read_data *rd = OpCtx_rd_get(ctx);
   unsigned buf_idx = cqe->flags >> IORING_CQE_BUFFER_SHIFT;
 
-  struct buf_ring_descriptor *desc = iou->brs + rd->bg_id;
+  struct buf_ring_descriptor *desc = iour->brs + rd->bg_id;
   char *src = desc->buf_base + desc->buf_size * buf_idx;
   buf = rd->utf8_encoding ? rb_utf8_str_new(src, cqe->res) : rb_str_new(src, cqe->res);
@@ -512,11 +520,11 @@ static inline void update_read_buffer_from_buffer_ring(IOU_t *iou, VALUE ctx, st
   return;
 }
 
-static inline void update_read_buffer(IOU_t *iou, VALUE ctx, struct io_uring_cqe *cqe) {
+static inline void update_read_buffer(IOURing_t *iour, VALUE ctx, struct io_uring_cqe *cqe) {
   if (cqe->res < 0) return;
 
   if (cqe->flags & IORING_CQE_F_BUFFER) {
-    update_read_buffer_from_buffer_ring(iou, ctx, cqe);
+    update_read_buffer_from_buffer_ring(iour, ctx, cqe);
     return;
   }
@@ -526,9 +534,9 @@ static inline void update_read_buffer(IOU_t *iou, VALUE ctx, struct io_uring_cqe
   adjust_read_buffer_len(rd->buffer, cqe->res, rd->buffer_offset);
 }
 
-static inline VALUE get_cqe_ctx(IOU_t *iou, struct io_uring_cqe *cqe, int *stop_flag, VALUE *spec) {
+static inline VALUE get_cqe_ctx(IOURing_t *iour, struct io_uring_cqe *cqe, int *stop_flag, VALUE *spec) {
   VALUE id = UINT2NUM(cqe->user_data);
-  VALUE ctx = rb_hash_aref(iou->pending_ops, id);
+  VALUE ctx = rb_hash_aref(iour->pending_ops, id);
   VALUE result = INT2NUM(cqe->res);
   if (NIL_P(ctx)) {
     *spec = make_empty_op_with_result(id, result);
@@ -538,7 +546,7 @@ static inline VALUE get_cqe_ctx(IOU_t *iou, struct io_uring_cqe *cqe, int *stop_
   // post completion work
   switch (OpCtx_type_get(ctx)) {
     case OP_read:
-      update_read_buffer(iou, ctx, cqe);
+      update_read_buffer(iour, ctx, cqe);
       break;
     case OP_emit:
       if (stop_flag && OpCtx_stop_signal_p(ctx))
@@ -550,7 +558,7 @@ static inline VALUE get_cqe_ctx(IOU_t *iou, struct io_uring_cqe *cqe, int *stop_
   // for multishot ops, the IORING_CQE_F_MORE flag indicates more completions
   // will be coming, so we need to keep the spec. Otherwise, we remove it.
   if (!(cqe->flags & IORING_CQE_F_MORE))
-    rb_hash_delete(iou->pending_ops, id);
+    rb_hash_delete(iour->pending_ops, id);
 
   *spec = OpCtx_spec_get(ctx);
   rb_hash_aset(*spec, SYM_result, result);
@@ -558,11 +566,11 @@ static inline VALUE get_cqe_ctx(IOU_t *iou, struct io_uring_cqe *cqe, int *stop_
   return ctx;
 }
 
-VALUE IOU_wait_for_completion(VALUE self) {
-  IOU_t *iou = get_iou(self);
+VALUE IOURing_wait_for_completion(VALUE self) {
+  IOURing_t *iour = get_iou(self);
 
   wait_for_completion_ctx_t cqe_ctx = {
-    .iou = iou
+    .iour = iour
   };
 
   rb_thread_call_without_gvl(wait_for_completion_without_gvl, (void *)&cqe_ctx, RUBY_UBF_IO, 0);
@@ -570,17 +578,17 @@
   if (unlikely(cqe_ctx.ret < 0)) {
     rb_syserr_fail(-cqe_ctx.ret, strerror(-cqe_ctx.ret));
   }
-  io_uring_cqe_seen(&iou->ring, cqe_ctx.cqe);
+  io_uring_cqe_seen(&iour->ring, cqe_ctx.cqe);
 
   VALUE spec = Qnil;
-  get_cqe_ctx(iou, cqe_ctx.cqe, 0, &spec);
+  get_cqe_ctx(iour, cqe_ctx.cqe, 0, &spec);
   return spec;
 }
 
-static inline void process_cqe(IOU_t *iou, struct io_uring_cqe *cqe, int block_given, int *stop_flag) {
+static inline void process_cqe(IOURing_t *iour, struct io_uring_cqe *cqe, int block_given, int *stop_flag) {
   if (stop_flag) *stop_flag = 0;
   VALUE spec;
-  VALUE ctx = get_cqe_ctx(iou, cqe, stop_flag, &spec);
+  VALUE ctx = get_cqe_ctx(iour, cqe, stop_flag, &spec);
   if (stop_flag && *stop_flag) return;
 
   if (block_given)
@@ -601,7 +609,7 @@ static inline bool cq_ring_needs_flush(struct io_uring *ring) {
 
 // adapted from io_uring_peek_batch_cqe in liburing/queue.c
 // this peeks at cqes and handles each available cqe
-static inline int process_ready_cqes(IOU_t *iou, int block_given, int *stop_flag) {
+static inline int process_ready_cqes(IOURing_t *iour, int block_given, int *stop_flag) {
   unsigned total_count = 0;
 
 iterate:
@@ -609,21 +617,21 @@ static inline int process_ready_cqes(IOU_t *iou, int block_given, int *stop_flag
   struct io_uring_cqe *cqe;
   unsigned head;
   unsigned count = 0;
-  io_uring_for_each_cqe(&iou->ring, head, cqe) {
+  io_uring_for_each_cqe(&iour->ring, head, cqe) {
     ++count;
     if (stop_flag) *stop_flag = 0;
-    process_cqe(iou, cqe, block_given, stop_flag);
+    process_cqe(iour, cqe, block_given, stop_flag);
     if (stop_flag && *stop_flag)
       break;
   }
-  io_uring_cq_advance(&iou->ring, count);
+  io_uring_cq_advance(&iour->ring, count);
   total_count += count;
 
   if (overflow_checked) goto done;
   if (stop_flag && *stop_flag) goto done;
 
-  if (cq_ring_needs_flush(&iou->ring)) {
-    io_uring_enter(iou->ring.ring_fd, 0, 0, IORING_ENTER_GETEVENTS, NULL);
+  if (cq_ring_needs_flush(&iour->ring)) {
+    io_uring_enter(iour->ring.ring_fd, 0, 0, IORING_ENTER_GETEVENTS, NULL);
     overflow_checked = true;
     goto iterate;
   }
@@ -632,8 +640,8 @@ static inline int process_ready_cqes(IOU_t *iou, int block_given, int *stop_flag
   return total_count;
 }
 
-VALUE IOU_process_completions(int argc, VALUE *argv, VALUE self) {
-  IOU_t *iou = get_iou(self);
+VALUE IOURing_process_completions(int argc, VALUE *argv, VALUE self) {
+  IOURing_t *iour = get_iou(self);
   int block_given = rb_block_given_p();
 
   VALUE wait;
@@ -642,49 +650,49 @@ VALUE IOU_process_completions(int argc, VALUE *argv, VALUE self) {
   unsigned count = 0;
   // automatically submit any unsubmitted SQEs
-  if (iou->unsubmitted_sqes) {
-    io_uring_submit(&iou->ring);
-    iou->unsubmitted_sqes = 0;
+  if (iour->unsubmitted_sqes) {
+    io_uring_submit(&iour->ring);
+    iour->unsubmitted_sqes = 0;
   }
 
   if (wait_i) {
-    wait_for_completion_ctx_t ctx = { .iou = iou };
+    wait_for_completion_ctx_t ctx = { .iour = iour };
 
    rb_thread_call_without_gvl(wait_for_completion_without_gvl, (void *)&ctx, RUBY_UBF_IO, 0);
    if (unlikely(ctx.ret < 0)) {
      rb_syserr_fail(-ctx.ret, strerror(-ctx.ret));
    }
    ++count;
-    io_uring_cqe_seen(&iou->ring, ctx.cqe);
-    process_cqe(iou, ctx.cqe, block_given, 0);
+    io_uring_cqe_seen(&iour->ring, ctx.cqe);
+    process_cqe(iour, ctx.cqe, block_given, 0);
   }
 
-  count += process_ready_cqes(iou, block_given, 0);
+  count += process_ready_cqes(iour, block_given, 0);
   return UINT2NUM(count);
 }
 
-VALUE IOU_process_completions_loop(VALUE self) {
-  IOU_t *iou = get_iou(self);
+VALUE IOURing_process_completions_loop(VALUE self) {
+  IOURing_t *iour = get_iou(self);
   int block_given = rb_block_given_p();
   int stop_flag = 0;
 
-  wait_for_completion_ctx_t ctx = { .iou = iou };
+  wait_for_completion_ctx_t ctx = { .iour = iour };
 
   while (1) {
    // automatically submit any unsubmitted SQEs
-    if (iou->unsubmitted_sqes) {
-      io_uring_submit(&iou->ring);
-      iou->unsubmitted_sqes = 0;
+    if (iour->unsubmitted_sqes) {
+      io_uring_submit(&iour->ring);
+      iour->unsubmitted_sqes = 0;
    }
 
    rb_thread_call_without_gvl(wait_for_completion_without_gvl, (void *)&ctx, RUBY_UBF_IO, 0);
    if (unlikely(ctx.ret < 0)) {
      rb_syserr_fail(-ctx.ret, strerror(-ctx.ret));
    }
-    io_uring_cqe_seen(&iou->ring, ctx.cqe);
-    process_cqe(iou, ctx.cqe, block_given, &stop_flag);
+    io_uring_cqe_seen(&iour->ring, ctx.cqe);
+    process_cqe(iour, ctx.cqe, block_given, &stop_flag);
    if (stop_flag) goto done;
 
-    process_ready_cqes(iou, block_given, &stop_flag);
+    process_ready_cqes(iour, block_given, &stop_flag);
    if (stop_flag) goto done;
  }
done:
@@ -693,31 +701,31 @@ VALUE IOU_process_completions_loop(VALUE self) {
 
 #define MAKE_SYM(sym) ID2SYM(rb_intern(sym))
 
-void Init_IOU(void) {
+void Init_IOURing(void) {
   mIOU = rb_define_module("IOU");
   cRing = rb_define_class_under(mIOU, "Ring", rb_cObject);
-  rb_define_alloc_func(cRing, IOU_allocate);
+  rb_define_alloc_func(cRing, IOURing_allocate);
 
-  rb_define_method(cRing, "initialize", IOU_initialize, 0);
-  rb_define_method(cRing, "close", IOU_close, 0);
-  rb_define_method(cRing, "closed?", IOU_closed_p, 0);
-  rb_define_method(cRing, "pending_ops", IOU_pending_ops, 0);
-  rb_define_method(cRing, "setup_buffer_ring", IOU_setup_buffer_ring, 1);
+  rb_define_method(cRing, "initialize", IOURing_initialize, 0);
+  rb_define_method(cRing, "close", IOURing_close, 0);
+  rb_define_method(cRing, "closed?", IOURing_closed_p, 0);
+  rb_define_method(cRing, "pending_ops", IOURing_pending_ops, 0);
+  rb_define_method(cRing, "setup_buffer_ring", IOURing_setup_buffer_ring, 1);
 
-  rb_define_method(cRing, "emit", IOU_emit, 1);
+  rb_define_method(cRing, "emit", IOURing_emit, 1);
 
-  rb_define_method(cRing, "prep_accept", IOU_prep_accept, 1);
-  rb_define_method(cRing, "prep_cancel", IOU_prep_cancel, 1);
-  rb_define_method(cRing, "prep_close", IOU_prep_close, 1);
-  rb_define_method(cRing, "prep_nop", IOU_prep_nop, 0);
-  rb_define_method(cRing, "prep_read", IOU_prep_read, 1);
-  rb_define_method(cRing, "prep_timeout", IOU_prep_timeout, 1);
-  rb_define_method(cRing, "prep_write", IOU_prep_write, 1);
+  rb_define_method(cRing, "prep_accept", IOURing_prep_accept, 1);
+  rb_define_method(cRing, "prep_cancel", IOURing_prep_cancel, 1);
+  rb_define_method(cRing, "prep_close", IOURing_prep_close, 1);
+  rb_define_method(cRing, "prep_nop", IOURing_prep_nop, 0);
+  rb_define_method(cRing, "prep_read", IOURing_prep_read, 1);
+  rb_define_method(cRing, "prep_timeout", IOURing_prep_timeout, 1);
+  rb_define_method(cRing, "prep_write", IOURing_prep_write, 1);
 
-  rb_define_method(cRing, "submit", IOU_submit, 0);
-  rb_define_method(cRing, "wait_for_completion", IOU_wait_for_completion, 0);
-  rb_define_method(cRing, "process_completions", IOU_process_completions, -1);
-  rb_define_method(cRing, "process_completions_loop", IOU_process_completions_loop, 0);
+  rb_define_method(cRing, "submit", IOURing_submit, 0);
+  rb_define_method(cRing, "wait_for_completion", IOURing_wait_for_completion, 0);
+  rb_define_method(cRing, "process_completions", IOURing_process_completions, -1);
+  rb_define_method(cRing, "process_completions_loop", IOURing_process_completions_loop, 0);
 
   cArgumentError = rb_const_get(rb_cObject, rb_intern("ArgumentError"));
 
@@ -733,6 +741,7 @@ void Init_IOU(void) {
   SYM_id = MAKE_SYM("id");
   SYM_interval = MAKE_SYM("interval");
   SYM_len = MAKE_SYM("len");
+  SYM_link = MAKE_SYM("link");
   SYM_multishot = MAKE_SYM("multishot");
   SYM_op = MAKE_SYM("op");
   SYM_read = MAKE_SYM("read");
diff --git a/test/test_iou.rb b/test/test_iou.rb
index e7bf165..ef234b1 100644
--- a/test/test_iou.rb
+++ b/test/test_iou.rb
@@ -828,3 +828,20 @@ def test_ctx_type
     assert_equal :close, ring.pending_ops[id].spec[:op]
   end
 end
+
+class LinkTest < IOURingBaseTest
+  def test_linked_submissions
+    r, w = IO.pipe
+    id1 = ring.prep_write(fd: w.fileno, buffer: 'foo', link: true)
+    id2 = ring.prep_write(fd: w.fileno, buffer: 'bar')
+
+    ret = ring.submit
+    assert_equal 2, ret
+
+    ret = ring.process_completions(true)
+    assert_equal 2, ret
+
+    w.close
+    assert_equal 'foobar', r.read
+  end
+end
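
A note on the semantics introduced above (commentary, not part of the commit): `IOSQE_IO_LINK` tells the kernel to hold back the next SQE in the same submission until the linked SQE completes, and if a linked request fails, the remaining requests in the chain complete with `-ECANCELED`. A chain ends at the first SQE that does not carry the flag. Below is a minimal usage sketch of the new `link:` option, assuming the gem is loaded with `require 'iou'` and using only methods defined in this diff:

```ruby
require 'iou'

ring = IOU::Ring.new
r, w = IO.pipe

# link: true sets IOSQE_IO_LINK on the write's SQE, so the kernel starts the
# close only after the write has completed.
ring.prep_write(fd: w.fileno, buffer: 'hello', link: true)
ring.prep_close(fd: w.fileno)

ring.submit                    # => 2; submit now returns the number of SQEs submitted
ring.process_completions(true) # wait for a completion, then drain any ready CQEs

puts r.read # => "hello", followed by EOF, since the write side was closed in order
ring.close
```

Note that `prep_close` here does not itself need `link: true`; it merely terminates the chain. Changing `submit` to return the submission count (instead of `self`) is what lets `LinkTest` assert that both linked SQEs reached the kernel in a single `io_uring_submit` call.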