Skip to content

Commit

Permalink
Initial support for pread/pwrite in uring backend.
Browse files Browse the repository at this point in the history
  • Loading branch information
ioquatix committed Oct 5, 2024
1 parent 795d143 commit 041f7db
Show file tree
Hide file tree
Showing 2 changed files with 130 additions and 7 deletions.
103 changes: 97 additions & 6 deletions ext/io/event/selector/uring.c
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
enum {
DEBUG = 0,
DEBUG_COMPLETION = 0,
DEBUG_IO_READ = 1,
};

enum {URING_ENTRIES = 64};
Expand Down Expand Up @@ -629,6 +630,7 @@ struct io_read_arguments {
struct IO_Event_Selector_URing *selector;
struct IO_Event_Selector_URing_Waiting *waiting;
int descriptor;
off_t offset;
char *buffer;
size_t length;
};
Expand All @@ -642,7 +644,7 @@ io_read_submit(VALUE _arguments)
if (DEBUG) fprintf(stderr, "io_read_submit:io_uring_prep_read(waiting=%p, completion=%p, descriptor=%d, buffer=%p, length=%ld)\n", (void*)arguments->waiting, (void*)arguments->waiting->completion, arguments->descriptor, arguments->buffer, arguments->length);

struct io_uring_sqe *sqe = io_get_sqe(selector);
io_uring_prep_read(sqe, arguments->descriptor, arguments->buffer, arguments->length, io_seekable(arguments->descriptor));
io_uring_prep_read(sqe, arguments->descriptor, arguments->buffer, arguments->length, arguments->offset);
io_uring_sqe_set_data(sqe, arguments->waiting->completion);
io_uring_submit_now(selector);

Expand Down Expand Up @@ -672,7 +674,7 @@ io_read_ensure(VALUE _arguments)
}

static int
io_read(struct IO_Event_Selector_URing *selector, VALUE fiber, int descriptor, char *buffer, size_t length)
io_read(struct IO_Event_Selector_URing *selector, VALUE fiber, int descriptor, char *buffer, size_t length, off_t offset)
{
struct IO_Event_Selector_URing_Waiting waiting = {
.fiber = fiber,
Expand All @@ -684,6 +686,7 @@ io_read(struct IO_Event_Selector_URing *selector, VALUE fiber, int descriptor, c
.selector = selector,
.waiting = &waiting,
.descriptor = descriptor,
.offset = offset,
.buffer = buffer,
.length = length
};
Expand All @@ -706,10 +709,11 @@ VALUE IO_Event_Selector_URing_io_read(VALUE self, VALUE fiber, VALUE io, VALUE b
size_t length = NUM2SIZET(_length);
size_t offset = NUM2SIZET(_offset);
size_t total = 0;
off_t from = io_seekable(descriptor);

size_t maximum_size = size - offset;
while (maximum_size) {
int result = io_read(selector, fiber, descriptor, (char*)base+offset, maximum_size);
int result = io_read(selector, fiber, descriptor, (char*)base+offset, maximum_size, from);

if (result > 0) {
total += result;
Expand Down Expand Up @@ -743,12 +747,52 @@ static VALUE IO_Event_Selector_URing_io_read_compatible(int argc, VALUE *argv, V
return IO_Event_Selector_URing_io_read(self, argv[0], argv[1], argv[2], argv[3], _offset);
}

VALUE IO_Event_Selector_URing_io_pread(VALUE self, VALUE fiber, VALUE io, VALUE buffer, VALUE _from, VALUE _length, VALUE _offset) {
struct IO_Event_Selector_URing *selector = NULL;
TypedData_Get_Struct(self, struct IO_Event_Selector_URing, &IO_Event_Selector_URing_Type, selector);

int descriptor = IO_Event_Selector_io_descriptor(io);

void *base;
size_t size;
rb_io_buffer_get_bytes_for_writing(buffer, &base, &size);

size_t length = NUM2SIZET(_length);
size_t offset = NUM2SIZET(_offset);
size_t total = 0;
off_t from = NUM2OFFT(_from);

size_t maximum_size = size - offset;
while (maximum_size) {
int result = io_read(selector, fiber, descriptor, (char*)base+offset, maximum_size, from);

if (result > 0) {
total += result;
offset += result;
from += result;
if ((size_t)result >= length) break;
length -= result;
} else if (result == 0) {
break;
} else if (length > 0 && IO_Event_try_again(-result)) {
IO_Event_Selector_URing_io_wait(self, fiber, io, RB_INT2NUM(IO_EVENT_READABLE));
} else {
return rb_fiber_scheduler_io_result(-1, -result);
}

maximum_size = size - offset;
}

return rb_fiber_scheduler_io_result(total, 0);
}

#pragma mark - IO#write

struct io_write_arguments {
struct IO_Event_Selector_URing *selector;
struct IO_Event_Selector_URing_Waiting *waiting;
int descriptor;
off_t offset;
char *buffer;
size_t length;
};
Expand All @@ -762,7 +806,7 @@ io_write_submit(VALUE _argument)
if (DEBUG) fprintf(stderr, "io_write_submit:io_uring_prep_write(waiting=%p, completion=%p, descriptor=%d, buffer=%p, length=%ld)\n", (void*)arguments->waiting, (void*)arguments->waiting->completion, arguments->descriptor, arguments->buffer, arguments->length);

struct io_uring_sqe *sqe = io_get_sqe(selector);
io_uring_prep_write(sqe, arguments->descriptor, arguments->buffer, arguments->length, io_seekable(arguments->descriptor));
io_uring_prep_write(sqe, arguments->descriptor, arguments->buffer, arguments->length, arguments->offset);
io_uring_sqe_set_data(sqe, arguments->waiting->completion);
io_uring_submit_pending(selector);

Expand Down Expand Up @@ -792,7 +836,7 @@ io_write_ensure(VALUE _argument)
}

static int
io_write(struct IO_Event_Selector_URing *selector, VALUE fiber, int descriptor, char *buffer, size_t length)
io_write(struct IO_Event_Selector_URing *selector, VALUE fiber, int descriptor, char *buffer, size_t length, off_t offset)
{
struct IO_Event_Selector_URing_Waiting waiting = {
.fiber = fiber,
Expand All @@ -804,6 +848,7 @@ io_write(struct IO_Event_Selector_URing *selector, VALUE fiber, int descriptor,
.selector = selector,
.waiting = &waiting,
.descriptor = descriptor,
.offset = offset,
.buffer = buffer,
.length = length,
};
Expand All @@ -826,14 +871,15 @@ VALUE IO_Event_Selector_URing_io_write(VALUE self, VALUE fiber, VALUE io, VALUE
size_t length = NUM2SIZET(_length);
size_t offset = NUM2SIZET(_offset);
size_t total = 0;
off_t from = io_seekable(descriptor);

if (length > size) {
rb_raise(rb_eRuntimeError, "Length exceeds size of buffer!");
}

size_t maximum_size = size - offset;
while (maximum_size) {
int result = io_write(selector, fiber, descriptor, (char*)base+offset, maximum_size);
int result = io_write(selector, fiber, descriptor, (char*)base+offset, maximum_size, from);

if (result > 0) {
total += result;
Expand Down Expand Up @@ -867,6 +913,49 @@ static VALUE IO_Event_Selector_URing_io_write_compatible(int argc, VALUE *argv,
return IO_Event_Selector_URing_io_write(self, argv[0], argv[1], argv[2], argv[3], _offset);
}

VALUE IO_Event_Selector_URing_io_pwrite(VALUE self, VALUE fiber, VALUE io, VALUE buffer, VALUE _from, VALUE _length, VALUE _offset) {
struct IO_Event_Selector_URing *selector = NULL;
TypedData_Get_Struct(self, struct IO_Event_Selector_URing, &IO_Event_Selector_URing_Type, selector);

int descriptor = IO_Event_Selector_io_descriptor(io);

const void *base;
size_t size;
rb_io_buffer_get_bytes_for_reading(buffer, &base, &size);

size_t length = NUM2SIZET(_length);
size_t offset = NUM2SIZET(_offset);
size_t total = 0;
off_t from = NUM2OFFT(_from);

if (length > size) {
rb_raise(rb_eRuntimeError, "Length exceeds size of buffer!");
}

size_t maximum_size = size - offset;
while (maximum_size) {
int result = io_write(selector, fiber, descriptor, (char*)base+offset, maximum_size, from);

if (result > 0) {
total += result;
offset += result;
from += result;
if ((size_t)result >= length) break;
length -= result;
} else if (result == 0) {
break;
} else if (length > 0 && IO_Event_try_again(-result)) {
IO_Event_Selector_URing_io_wait(self, fiber, io, RB_INT2NUM(IO_EVENT_WRITABLE));
} else {
return rb_fiber_scheduler_io_result(-1, -result);
}

maximum_size = size - offset;
}

return rb_fiber_scheduler_io_result(total, 0);
}

#endif

#pragma mark - IO#close
Expand Down Expand Up @@ -1118,6 +1207,8 @@ void Init_IO_Event_Selector_URing(VALUE IO_Event_Selector) {
#ifdef HAVE_RUBY_IO_BUFFER_H
rb_define_method(IO_Event_Selector_URing, "io_read", IO_Event_Selector_URing_io_read_compatible, -1);
rb_define_method(IO_Event_Selector_URing, "io_write", IO_Event_Selector_URing_io_write_compatible, -1);
rb_define_method(IO_Event_Selector_URing, "io_pread", IO_Event_Selector_URing_io_pread, 6);
rb_define_method(IO_Event_Selector_URing, "io_pwrite", IO_Event_Selector_URing_io_pwrite, 6);
#endif

rb_define_method(IO_Event_Selector_URing, "io_close", IO_Event_Selector_URing_io_close, 1);
Expand Down
34 changes: 33 additions & 1 deletion test/io/event/selector/file_io.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,42 @@
reader = Fiber.new do
buffer = IO::Buffer.new(64)
file.seek(0)

# The read will return 0 if the data is not written yet:
read_result = selector.io_read(Fiber.current, file, buffer, 0)
end

# The seek and the read/write are potentially racing since we use the same file descriptor. So we wait for the write to complete, and then wait for the read to complete.
writer.transfer

while write_result.nil?
selector.select(0)
end

reader.transfer

while read_result.nil?
selector.select(0)
end

expect(write_result).to be == 128
expect(read_result).to be == 64
end

it "can pread using a buffer" do
skip "io_pread is not implemented" unless selector.respond_to?(:io_pread)

write_result = nil
read_result = nil

writer = Fiber.new do
buffer = IO::Buffer.new(128)
write_result = selector.io_pwrite(Fiber.current, file, buffer, 0, 128, 0)
end

reader = Fiber.new do
buffer = IO::Buffer.new(64)
read_result = selector.io_pread(Fiber.current, file, buffer, 0, 64, 0)
end

writer.transfer

Expand Down

0 comments on commit 041f7db

Please sign in to comment.