Skip to content

Commit

Permalink
Fix race condition in Thread#join
Browse files Browse the repository at this point in the history
  • Loading branch information
noteflakes committed Aug 5, 2023
1 parent a7fc4e9 commit 3964332
Show file tree
Hide file tree
Showing 7 changed files with 88 additions and 42 deletions.
2 changes: 0 additions & 2 deletions ext/polyphony/fiber.c
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

ID ID_ivar_auto_watcher;
ID ID_ivar_mailbox;
ID ID_ivar_result;
ID ID_ivar_waiting_fibers;

VALUE SYM_dead;
Expand Down Expand Up @@ -239,7 +238,6 @@ void Init_Fiber(void) {

ID_ivar_auto_watcher = rb_intern("@auto_watcher");
ID_ivar_mailbox = rb_intern("@mailbox");
ID_ivar_result = rb_intern("@result");
ID_ivar_waiting_fibers = rb_intern("@waiting_fibers");

SYM_spin = ID2SYM(rb_intern("spin"));
Expand Down
2 changes: 2 additions & 0 deletions ext/polyphony/polyphony.c
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ ID ID_ivar_blocking_mode;
ID ID_ivar_io;
ID ID_ivar_multishot_accept_queue;
ID ID_ivar_parked;
ID ID_ivar_result;
ID ID_ivar_runnable;
ID ID_ivar_running;
ID ID_ivar_thread;
Expand Down Expand Up @@ -474,6 +475,7 @@ void Init_Polyphony(void) {
ID_ivar_io = rb_intern("@io");
ID_ivar_multishot_accept_queue = rb_intern("@multishot_accept_queue");
ID_ivar_parked = rb_intern("@parked");
ID_ivar_result = rb_intern("@result");
ID_ivar_runnable = rb_intern("@runnable");
ID_ivar_running = rb_intern("@running");
ID_ivar_thread = rb_intern("@thread");
Expand Down
4 changes: 4 additions & 0 deletions ext/polyphony/polyphony.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ extern ID ID_ivar_blocking_mode;
extern ID ID_ivar_io;
extern ID ID_ivar_multishot_accept_queue;
extern ID ID_ivar_parked;
extern ID ID_ivar_result;
extern ID ID_ivar_runnable;
extern ID ID_ivar_running;
extern ID ID_ivar_thread;
Expand Down Expand Up @@ -148,6 +149,9 @@ void Thread_schedule_fiber(VALUE thread, VALUE fiber, VALUE value);
void Thread_schedule_fiber_with_priority(VALUE thread, VALUE fiber, VALUE value);
VALUE Thread_switch_fiber(VALUE thread);

VALUE Event_signal(int argc, VALUE *argv, VALUE event);
VALUE Event_await(VALUE event);

VALUE Polyphony_snooze(VALUE self);

#endif /* POLYPHONY_H */
63 changes: 63 additions & 0 deletions ext/polyphony/thread.c
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,12 @@

ID ID_deactivate_all_watchers_post_fork;
ID ID_ivar_backend;
ID ID_ivar_done;
ID ID_ivar_join_wait_queue;
ID ID_ivar_main_fiber;
ID ID_ivar_ready;
ID ID_ivar_terminated;
ID ID_ivar_waiters;
ID ID_stop;

/* :nop-doc: */
Expand Down Expand Up @@ -83,20 +86,80 @@ VALUE Thread_class_backend(VALUE _self) {
return rb_ivar_get(rb_thread_current(), ID_ivar_backend);
}


VALUE Thread_done_p(VALUE self)
{
return rb_ivar_get(self, ID_ivar_done);
}

VALUE Thread_kill_safe(VALUE self)
{
static VALUE eTerminate = Qnil;
if (rb_ivar_get(self, ID_ivar_done) == Qtrue) return self;

if (eTerminate == Qnil)
eTerminate = rb_const_get(mPolyphony, rb_intern("Terminate"));

while (rb_ivar_get(self, ID_ivar_ready) != Qtrue)
rb_thread_schedule();

VALUE main_fiber = rb_ivar_get(self, ID_ivar_main_fiber);
VALUE exception = rb_funcall(eTerminate, ID_new, 0);
Thread_schedule_fiber(self, main_fiber, exception);
return self;
}

VALUE Thread_mark_as_done(VALUE self, VALUE result)
{
rb_ivar_set(self, ID_ivar_done, Qtrue);
VALUE waiters = rb_ivar_get(self, ID_ivar_waiters);
if (waiters == Qnil) return self;

int len = RARRAY_LEN(waiters);
for (int i = 0; i < len; i++) {
VALUE waiter = RARRAY_AREF(waiters, i);
Event_signal(1, &result, waiter);
}
return self;
}

VALUE Thread_await_done(VALUE self)
{
if (Thread_done_p(self) == Qtrue) return rb_ivar_get(self, ID_ivar_result);

VALUE waiter = Fiber_auto_watcher(rb_fiber_current());
VALUE waiters = rb_ivar_get(self, ID_ivar_waiters);
if (waiters == Qnil) {
waiters = rb_ary_new();
rb_ivar_set(self, ID_ivar_waiters, waiters);
}
rb_ary_push(waiters, waiter);

return Event_await(waiter);
}

void Init_Thread(void) {
rb_define_method(rb_cThread, "setup_fiber_scheduling", Thread_setup_fiber_scheduling, 0);
rb_define_method(rb_cThread, "schedule_and_wakeup", Thread_fiber_schedule_and_wakeup, 2);
rb_define_method(rb_cThread, "switch_fiber", Thread_switch_fiber, 0);
rb_define_method(rb_cThread, "fiber_unschedule", Thread_fiber_unschedule, 1);

rb_define_method(rb_cThread, "done?", Thread_done_p, 0);
rb_define_method(rb_cThread, "kill_safe", Thread_kill_safe, 0);
rb_define_method(rb_cThread, "mark_as_done", Thread_mark_as_done, 1);
rb_define_method(rb_cThread, "await_done", Thread_await_done, 0);

rb_define_singleton_method(rb_cThread, "backend", Thread_class_backend, 0);

rb_define_method(rb_cThread, "debug!", Thread_debug, 0);

ID_deactivate_all_watchers_post_fork = rb_intern("deactivate_all_watchers_post_fork");
ID_ivar_backend = rb_intern("@backend");
ID_ivar_done = rb_intern("@done");
ID_ivar_join_wait_queue = rb_intern("@join_wait_queue");
ID_ivar_main_fiber = rb_intern("@main_fiber");
ID_ivar_ready = rb_intern("@ready");
ID_ivar_terminated = rb_intern("@terminated");
ID_ivar_waiters = rb_intern("@waiters");
ID_stop = rb_intern("stop");
}
11 changes: 4 additions & 7 deletions lib/polyphony.rb
Original file line number Diff line number Diff line change
Expand Up @@ -104,9 +104,6 @@ def terminate_threads
threads = Thread.list - [Thread.current]
return if threads.empty?

trace '*' * 40
trace threads_left: threads

threads.each(&:kill)
threads.each(&:join)
end
Expand All @@ -123,10 +120,10 @@ def install_at_exit_handler
# processes,) we use a separate mechanism to terminate fibers in forked
# processes (see Polyphony.fork).
at_exit do
next unless @original_pid == ::Process.pid

terminate_threads
Fiber.current.shutdown_all_children
if @original_pid == ::Process.pid
terminate_threads
Fiber.current.shutdown_all_children
end
end
end
end
Expand Down
20 changes: 8 additions & 12 deletions lib/polyphony/core/sync.rb
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,7 @@ def synchronize(&block)
#
# @return [nil]
def conditional_release
@store << @token
@token = nil
@store << true
@holding_fiber = nil
end

Expand All @@ -39,7 +38,7 @@ def conditional_release
#
# @return [Fiber] current fiber
def conditional_reacquire
@token = @store.shift
@store.shift
@holding_fiber = Fiber.current
end

Expand All @@ -65,7 +64,7 @@ def lock
check_dead_holder
raise ThreadError if owned?

@token = @store.shift
@store.shift
@holding_fiber = Fiber.current
self
end
Expand All @@ -78,8 +77,7 @@ def unlock
raise ThreadError if !owned?

@holding_fiber = nil
@store << @token if @token
@token = nil
@store << true
end

# Attempts to obtain the lock and returns immediately. Returns `true` if the
Expand All @@ -89,7 +87,7 @@ def unlock
def try_lock
return false if @holding_fiber

@token = @store.shift
@store.shift
@holding_fiber = Fiber.current
true
end
Expand Down Expand Up @@ -117,23 +115,21 @@ def sleep(timeout = nil)
#
# @return [any] return value of given block.
def synchronize_not_holding
@token = @store.shift
@store.shift
begin
@holding_fiber = Fiber.current
yield
ensure
@holding_fiber = nil
@store << @token if @token
@token = nil
@store << true
end
end

def check_dead_holder
return if !@holding_fiber&.dead?

@holding_fiber = nil
@store << @token if @token
@token = nil
@store << true
end
end

Expand Down
28 changes: 7 additions & 21 deletions lib/polyphony/extensions/thread.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ class ::Thread
# @param args [Array] arguments to pass to thread block
def initialize(*args, &block)
@join_wait_queue = []
@finalization_mutex = Mutex.new
@args = args
@block = block
orig_initialize { execute }
Expand All @@ -41,16 +40,7 @@ def setup
# @param timeout [Number] timeout interval
# @return [any] thread's return value
def join(timeout = nil)
watcher = Fiber.current.auto_watcher

@finalization_mutex.synchronize do
if @terminated
@result.is_a?(Exception) ? (raise @result) : (return @result)
else
@join_wait_queue << watcher
end
end
timeout ? move_on_after(timeout) { watcher.await } : watcher.await
timeout ? move_on_after(timeout) { await_done } : await_done
end
alias_method :await, :join

Expand Down Expand Up @@ -82,11 +72,7 @@ def raise(error = nil)
# Terminates the thread.
#
# @return [Thread] self
def kill
return self if @terminated

self.raise Polyphony::Terminate
end
alias_method :kill, :kill_safe

# @!visibility private
alias_method :orig_inspect, :inspect
Expand Down Expand Up @@ -169,13 +155,13 @@ def execute
#
# @param result [any] thread's return value
def finalize(result)
# We need to make sure the fiber is not on the runqueue. This, in order to
# prevent a race condition between #finalize and #kill.
fiber_unschedule(Fiber.current)
Fiber.current.shutdown_all_children if !Fiber.current.children.empty?

@finalization_mutex.synchronize do
@terminated = true
@result = result
signal_waiters(result)
end
@result = result
mark_as_done(result)
@backend&.finalize
end

Expand Down

0 comments on commit 3964332

Please sign in to comment.