diff --git a/em-posix-spawn/Gemfile b/em-posix-spawn/Gemfile new file mode 100644 index 00000000..1aa98e4b --- /dev/null +++ b/em-posix-spawn/Gemfile @@ -0,0 +1,2 @@ +source "http://rubygems.org" +gemspec diff --git a/em-posix-spawn/Gemfile.lock b/em-posix-spawn/Gemfile.lock new file mode 100644 index 00000000..877fb6e8 --- /dev/null +++ b/em-posix-spawn/Gemfile.lock @@ -0,0 +1,25 @@ +PATH + remote: . + specs: + em-posix-spawn (0.1.10) + eventmachine + posix-spawn + +GEM + remote: http://rubygems.org/ + specs: + eventmachine (1.0.8) + minitest (5.8.3) + posix-spawn (0.3.11) + rake (10.4.2) + +PLATFORMS + ruby + +DEPENDENCIES + em-posix-spawn! + minitest + rake + +BUNDLED WITH + 1.11.2 diff --git a/em-posix-spawn/README.md b/em-posix-spawn/README.md new file mode 100644 index 00000000..9ef62f19 --- /dev/null +++ b/em-posix-spawn/README.md @@ -0,0 +1,57 @@ +# `em-posix-spawn` + +This module provides an interface to `POSIX::Spawn` for EventMachine. In +particular, it contains an EventMachine equivalent to `POSIX::Spawn::Child`. +This class encapsulates writing to the child process its stdin and reading from +both its stdout and stderr. Only when the process has exited, it triggers a +callback to notify others of its completion. Just as `POSIX::Spawn::Child`, +this module allows the caller to include limits for execution time and number +of bytes read from stdout and stderr. + +# Usage + +Please refer to the documentation of `POSIX::Spawn::Child` for the complete set +of options that can be passed when creating `Child`. + +```ruby +require "em/posix/spawn" + +EM.run { + p = EM::POSIX::Spawn::Child.new("echo something") + + p.callback { + puts "Child process echo'd: #{p.out.inspect}" + EM.stop + } + + p.errback { |err| + puts "Error running child process: #{err.inspect}" + EM.stop + } + + # Add callbacks to listen to the child process' output streams. + listeners = p.add_streams_listener { |listener, data| + # Do something with the data. + # Use listener.name to get the name of the stream. + # Use listener.closed? to check if listener is closed. + # This block is called exactly once after the listener is closed. + } + + # Optionally, wait for all the listeners to be closed. + while !listeners.all?(&:closed?) { + ... + } + + # Sends SIGTERM to the process, and SIGKILL after 5 seconds. + # Returns true if this kill was successful, false otherwise. + # The timeout is optional, default timeout is 0 (immediate SIGKILL + # after SIGTERM). + p.kill(5) +} +``` + +# Credit + +The implementation for `EM::POSIX::Spawn::Child` and its tests are based on the +implementation and tests for `POSIX::Spawn::Child`, which is Copyright (c) 2011 +by Ryan Tomayko and Aman Gupta . diff --git a/em-posix-spawn/Rakefile b/em-posix-spawn/Rakefile new file mode 100644 index 00000000..0b700501 --- /dev/null +++ b/em-posix-spawn/Rakefile @@ -0,0 +1,7 @@ +require "bundler/gem_tasks" + +require 'rake/testtask' + +Rake::TestTask.new 'test' do |t| + t.test_files = FileList['test/test_*.rb'] +end diff --git a/em-posix-spawn/em-posix-spawn.gemspec b/em-posix-spawn/em-posix-spawn.gemspec new file mode 100644 index 00000000..b32b9a88 --- /dev/null +++ b/em-posix-spawn/em-posix-spawn.gemspec @@ -0,0 +1,21 @@ +# -*- encoding: utf-8 -*- +$:.push File.expand_path("../lib", __FILE__) +require "em/posix/spawn/version" + +Gem::Specification.new do |s| + s.name = "em-posix-spawn" + s.version = EventMachine::POSIX::Spawn::VERSION + + s.authors = ["Pieter Noordhuis"] + s.email = ["pcnoordhuis@gmail.com"] + s.summary = "EventMachine-aware POSIX::Spawn::Child" + + s.files = Dir.glob("lib/**/*") + s.test_files = Dir.glob("test/**/*") + s.require_paths = ["lib"] + + s.add_runtime_dependency "eventmachine" + s.add_runtime_dependency "posix-spawn" + s.add_development_dependency "rake" + s.add_development_dependency "minitest" +end diff --git a/em-posix-spawn/lib/em-posix-spawn.rb b/em-posix-spawn/lib/em-posix-spawn.rb new file mode 100644 index 00000000..a4c4fa89 --- /dev/null +++ b/em-posix-spawn/lib/em-posix-spawn.rb @@ -0,0 +1 @@ +require "em/posix/spawn" diff --git a/em-posix-spawn/lib/em/posix/spawn.rb b/em-posix-spawn/lib/em/posix/spawn.rb new file mode 100644 index 00000000..fde46924 --- /dev/null +++ b/em-posix-spawn/lib/em/posix/spawn.rb @@ -0,0 +1,2 @@ +require "em/posix/spawn/version" +require "em/posix/spawn/child" diff --git a/em-posix-spawn/lib/em/posix/spawn/child.rb b/em-posix-spawn/lib/em/posix/spawn/child.rb new file mode 100644 index 00000000..806851a7 --- /dev/null +++ b/em-posix-spawn/lib/em/posix/spawn/child.rb @@ -0,0 +1,488 @@ +require 'eventmachine' +require 'posix/spawn' + +module EventMachine + + module POSIX + + module Spawn + + include ::POSIX::Spawn + + class Child + + include Spawn + include Deferrable + + # Spawn a new process, write all input and read all output. Supports + # the standard spawn interface as described in the POSIX::Spawn module + # documentation: + # + # new([env], command, [argv1, ...], [options]) + # + # The following options are supported in addition to the standard + # POSIX::Spawn options: + # + # :input => str Write str to the new process's standard input. + # :timeout => int Maximum number of seconds to allow the process + # to execute before aborting with a TimeoutExceeded + # exception. + # :max => total Maximum number of bytes of output to allow the + # process to generate before aborting with a + # MaximumOutputExceeded exception. + # :prepend_stdout => str Data to prepend to stdout + # :prepend_stderr => str Data to prepend to stderr + # + # Returns a new Child instance that is being executed. The object + # includes the Deferrable module, and executes the success callback + # when the process has exited, or the failure callback when the process + # was killed because of exceeding the timeout, or exceeding the maximum + # number of bytes to read from stdout and stderr combined. Once the + # success callback is triggered, this objects's out, err and status + # attributes are available. Clients can register callbacks to listen to + # updates from out and err streams of the process. + def initialize(*args) + @env, @argv, options = extract_process_spawn_arguments(*args) + @options = options.dup + @input = @options.delete(:input) + @timeout = @options.delete(:timeout) + @max = @options.delete(:max) + @discard_output = @options.delete(:discard_output) + @prepend_stdout = @options.delete(:prepend_stdout) || "" + @prepend_stderr = @options.delete(:prepend_stderr) || "" + @options.delete(:chdir) if @options[:chdir].nil? + + exec! + end + + # All data written to the child process's stdout stream as a String. + attr_reader :out + + # All data written to the child process's stderr stream as a String. + attr_reader :err + + # A Process::Status object with information on how the child exited. + attr_reader :status + + # Total command execution time (wall-clock time) + attr_reader :runtime + + attr_reader :pid + + # Determine if the process did exit with a zero exit status. + def success? + @status && @status.success? + end + + # Determine if the process has already terminated. + def terminated? + !! @status + end + + # Send the SIGTERM signal to the process. + # Then send the SIGKILL signal to the process after the + # specified timeout. + def kill(timeout = 0) + return false if terminated? || @sigkill_timer + timeout ||= 0 + request_termination + @sigkill_timer = Timer.new(timeout) { + ::Process.kill('KILL', @pid) rescue nil + } + + true + end + + # Send the SIGTERM signal to the process. + # + # Returns the Process::Status object obtained by reaping the process. + def request_termination + @sigterm_timer.cancel if @sigterm_timer + ::Process.kill('TERM', @pid) rescue nil + end + + def add_streams_listener(&listener) + [@cout.after_read(&listener), @cerr.after_read(&listener)] + end + + class SignalHandler + + def self.setup! + @instance ||= begin + new.tap do |instance| + instance.setup! + end + end + end + + def self.teardown! + if @instance + @instance.teardown! + @instance = nil + end + end + + def self.instance + @instance + end + + def initialize + @pid_callback = {} + @pid_to_process_status = {} + end + + def setup! + @pipe = ::IO.pipe + @notifier = ::EM.watch @pipe[0], SignalNotifier, self + @notifier.notify_readable = true + + @prev_handler = ::Signal.trap(:CHLD) do + begin + @pipe[1].write_nonblock("x") + rescue IO::WaitWritable + end + + @prev_handler.call unless @prev_handler == 'SYSTEM_DEFAULT' + end + + @prev_handler ||= lambda { |*_| ; } + end + + def teardown! + ::Signal.trap(:CHLD, @prev_handler) + + @notifier.detach if ::EM.reactor_running? + @pipe[0].close rescue nil + @pipe[1].close rescue nil + end + + def pid_callback(pid, &blk) + @pid_callback[pid] = blk + end + + def pid_to_process_status(pid) + @pid_to_process_status.delete(pid) + end + + def signal + # The SIGCHLD handler may not be called exactly once for every + # child. I.e., multiple children exiting concurrently may trigger + # only one SIGCHLD in the parent. Therefore, reap all processes + # that can be reaped. + while pid = ::Process.wait(-1, ::Process::WNOHANG) + @pid_to_process_status[pid] = $? + blk = @pid_callback.delete(pid) + EM.next_tick(&blk) if blk + end + rescue ::Errno::ECHILD + end + + class SignalNotifier < ::EM::Connection + def initialize(handler) + @handler = handler + end + + def notify_readable + begin + @io.read_nonblock(65536) + rescue IO::WaitReadable + end + + @handler.signal + end + end + end + + # Execute command, write input, and read output. This is called + # immediately when a new instance of this object is initialized. + def exec! + # The signal handler MUST be installed before spawning a new process + SignalHandler.setup! + + if RUBY_PLATFORM =~ /linux/i && @options.delete(:close_others) + @options[:in] = :in + @options[:out] = :out + @options[:err] = :err + + ::Dir.glob("/proc/%d/fd/*" % Process.pid).map do |file| + fd = File.basename(file).to_i + + if fd > 2 + @options[fd] = :close + end + end + end + + @pid, stdin, stdout, stderr = popen4(@env, *(@argv + [@options])) + @start = Time.now + + # Don't leak into processes spawned after us. + [stdin, stdout, stderr].each { |io| io.close_on_exec = true } + + # watch fds + @cin = EM.watch stdin, WritableStream, (@input || "").dup, "stdin" + @cout = EM.watch stdout, ReadableStream, @prepend_stdout, "stdout", @discard_output + @cerr = EM.watch stderr, ReadableStream, @prepend_stderr, "stderr", @discard_output + + # register events + @cin.notify_writable = true + @cout.notify_readable = true + @cerr.notify_readable = true + + # keep track of open fds + in_flight = [@cin, @cout, @cerr].compact + in_flight.each { |io| + # force binary encoding + io.force_encoding + + # register finalize hook + io.callback { in_flight.delete(io) } + } + + failure = nil + + # keep track of max output + max = @max + if max && max > 0 + check_buffer_size = lambda { |listener, _| + if !terminated? && !listener.closed? + if @cout.buffer.size + @cerr.buffer.size > max + failure = MaximumOutputExceeded + in_flight.each(&:close) + in_flight.clear + request_termination + end + end + } + + @cout.after_read(&check_buffer_size) + @cerr.after_read(&check_buffer_size) + end + + # request termination of process when it doesn't terminate + # in time + timeout = @timeout + if timeout && timeout > 0 + @sigterm_timer = Timer.new(timeout) { + failure = TimeoutExceeded + in_flight.each(&:close) + in_flight.clear + request_termination + } + end + + # run block when pid is reaped + SignalHandler.instance.pid_callback(@pid) { + @sigterm_timer.cancel if @sigterm_timer + @sigkill_timer.cancel if @sigkill_timer + @runtime = Time.now - @start + @status = SignalHandler.instance.pid_to_process_status(@pid) + + in_flight.each do |io| + # Trigger final read to make sure buffer is drained + if io.respond_to?(:notify_readable) + io.notify_readable + end + + io.close + end + + in_flight.clear + + @out = @cout.buffer + @err = @cerr.buffer + + if failure + set_deferred_failure failure + else + set_deferred_success + end + } + end + + class Stream < Connection + + include Deferrable + + attr_reader :buffer + + def initialize(buffer, name) + @buffer = buffer + @name = name + @closed = false + end + + def force_encoding + if @buffer.respond_to?(:force_encoding) + @io.set_encoding('BINARY', 'BINARY') + @buffer.force_encoding('BINARY') + end + end + + def close + return if closed? + + + # NB: Defer detach to the next tick, because EventMachine blows up + # when a file descriptor is attached and detached in the same + # tick. This can happen when the child process dies in the same + # tick it started, and the `#waitpid` loop in the signal + # handler picks it up afterwards. The signal handler, in turn, + # queues the child's callback to the executed via + # `EM#next_tick`. If the blocks queued by `EM#next_tick` are + # executed after that, still in the same tick, the child's file + # descriptors can be detached in the same tick they were + # attached. + EM.next_tick do + # NB: The ordering here is important. If we're using epoll, + # detach() attempts to deregister the associated fd via + # EPOLL_CTL_DEL and marks the EventableDescriptor for + # deletion upon completion of the iteration of the event + # loop. However, if the fd was closed before calling + # detach(), epoll_ctl() will sometimes return EBADFD and fail + # to remove the fd. This can lead to epoll_wait() returning + # an event whose data pointer is invalid (since it was + # deleted in a prior iteration of the event loop). + detach + @io.close rescue nil + end + + @closed = true + end + + def closed? + @closed + end + end + + class ReadableStream < Stream + + class Listener + + attr_reader :name + + def initialize(name, &block) + @name = name + @block = block + @offset = 0 + end + + # Sends the part of the buffer that has not yet been sent. + def call(buffer) + return if @block.nil? + + to_call = @block + to_call.call(self, slice_from_buffer(buffer)) + end + + # Sends the part of the buffer that has not yet been sent, + # after closing the listener. After this, the listener + # will not receive any more calls. + def close(buffer = "") + return if @block.nil? + + to_call, @block = @block, nil + to_call.call(self, slice_from_buffer(buffer)) + end + + def closed? + @block.nil? + end + + private + + def slice_from_buffer(buffer) + to_be_sent = buffer.slice(@offset..-1) + to_be_sent ||= "" + @offset = buffer.length + to_be_sent + end + end + + # Maximum buffer size for reading + BUFSIZE = (64 * 1024) + + def initialize(buffer, name, discard_output = false, &block) + super(buffer, name, &block) + @discard_output = discard_output + @after_read = [] + end + + def close + # Ensure that the listener receives the entire buffer if it + # attaches to the process only just before the stream is closed. + @after_read.each do |listener| + listener.close(@buffer) + end + + @after_read.clear + + super + end + + def after_read(&block) + if block + listener = Listener.new(@name, &block) + if @closed + # If this stream is already closed, then close the listener in + # the next Event Machine tick. This ensures that the listener + # receives the entire buffer if it attaches to the process only + # after its completion. + EM.next_tick do + listener.close(@buffer) + end + elsif !@buffer.empty? + # If this stream's buffer is non-empty, pass it to the listener + # in the next tick to avoid having to wait for the next piece + # of data to be read. + EM.next_tick do + listener.call(@buffer) + end + end + + @after_read << listener + listener + end + end + + def notify_readable + # Close and detach are decoupled, check if this notification is + # supposed to go through. + return if closed? + + begin + out = @io.read_nonblock(BUFSIZE) + @buffer << out unless @discard_output + @after_read.each { |listener| listener.call(@buffer) } + rescue Errno::EAGAIN, Errno::EINTR + rescue EOFError + close + set_deferred_success + end + end + end + + class WritableStream < Stream + + def notify_writable + # Close and detach are decoupled, check if this notification is + # supposed to go through. + return if closed? + + begin + boom = nil + size = @io.write_nonblock(@buffer) + @buffer = @buffer[size, @buffer.size] + rescue Errno::EPIPE => boom + rescue Errno::EAGAIN, Errno::EINTR + end + if boom || @buffer.size == 0 + close + set_deferred_success + end + end + end + end + end + end +end diff --git a/em-posix-spawn/lib/em/posix/spawn/version.rb b/em-posix-spawn/lib/em/posix/spawn/version.rb new file mode 100644 index 00000000..45b06a06 --- /dev/null +++ b/em-posix-spawn/lib/em/posix/spawn/version.rb @@ -0,0 +1,7 @@ +module EventMachine + module POSIX + module Spawn + VERSION = "0.1.10" + end + end +end diff --git a/em-posix-spawn/test/test_child.rb b/em-posix-spawn/test/test_child.rb new file mode 100644 index 00000000..7cc5bdc7 --- /dev/null +++ b/em-posix-spawn/test/test_child.rb @@ -0,0 +1,578 @@ +# coding: UTF-8 + +require 'minitest/autorun' +require 'em/posix/spawn/child' + +module Helpers + + def em(options = {}) + raise "no block given" unless block_given? + timeout = options[:timeout] ||= 1.0 + + ::EM.run do + quantum = 0.005 + ::EM.set_quantum(quantum * 1000) # Lowest possible timer resolution + ::EM.set_heartbeat_interval(quantum) # Timeout connections asap + ::EM.add_timer(timeout) { raise "timeout" } + yield + end + + ::EM::POSIX::Spawn::Child::SignalHandler.teardown! + end + + def done + raise "reactor not running" if !::EM.reactor_running? + + ::EM.next_tick do + # Assert something to show a spec-pass + assert true + ::EM.stop_event_loop + end + end +end + +class ChildTest < Minitest::Test + + include ::EM::POSIX::Spawn + include Helpers + + def teardown + ::EM::POSIX::Spawn::Child::SignalHandler.teardown! + end + + def test_sanity + assert_same ::EM::POSIX::Spawn::Child, Child + end + + def test_argv_string_uses_sh + em do + p = Child.new("echo via /bin/sh") + p.callback do + assert p.success? + assert_equal "via /bin/sh\n", p.out + done + end + end + end + + def test_stdout + em do + p = Child.new('echo', 'boom') + p.callback do + assert_equal "boom\n", p.out + assert_equal "", p.err + done + end + end + end + + def test_stderr + em do + p = Child.new('echo boom 1>&2') + p.callback do + assert_equal "", p.out + assert_equal "boom\n", p.err + done + end + end + end + + def test_status + em do + p = Child.new('exit 3') + p.callback do + assert !p.status.success? + assert_equal 3, p.status.exitstatus + done + end + end + end + + def test_env + em do + p = Child.new({ 'FOO' => 'BOOYAH' }, 'echo $FOO') + p.callback do + assert_equal "BOOYAH\n", p.out + done + end + end + end + + def test_chdir + em do + p = Child.new("pwd", :chdir => File.dirname(Dir.pwd)) + p.callback do + assert_equal File.dirname(Dir.pwd) + "\n", p.out + done + end + end + end + + def test_input + input = "HEY NOW\n" * 100_000 # 800K + + em do + p = Child.new('wc', '-l', :input => input) + p.callback do + assert_equal 100_000, p.out.strip.to_i + done + end + end + end + + def test_max + em do + p = Child.new('yes', :max => 100_000) + p.callback { fail } + p.errback do |err| + assert_equal MaximumOutputExceeded, err + done + end + end + end + + def test_discard_output + em do + p = Child.new('echo hi', :discard_output => true) + p.callback do + assert_equal 0, p.out.size + assert_equal 0, p.err.size + done + end + end + end + + def test_max_with_child_hierarchy + em do + p = Child.new('/bin/sh', '-c', 'yes', :max => 100_000) + p.callback { fail } + p.errback do |err| + assert_equal MaximumOutputExceeded, err + done + end + end + end + + def test_max_with_stubborn_child + em do + p = Child.new("trap '' TERM; yes", :max => 100_000) + p.callback { fail } + p.errback do |err| + assert_equal MaximumOutputExceeded, err + done + end + end + end + + def test_timeout + em do + start = Time.now + p = Child.new('sleep', '1', :timeout => 0.05) + p.callback { fail } + p.errback do |err| + assert_equal TimeoutExceeded, err + assert (Time.now-start) <= 0.2 + done + end + end + end + + def test_timeout_with_child_hierarchy + em do + p = Child.new('/bin/sh', '-c', 'sleep 1', :timeout => 0.05) + p.callback { fail } + p.errback do |err| + assert_equal TimeoutExceeded, err + done + end + end + end + + def test_lots_of_input_and_lots_of_output_at_the_same_time + input = "stuff on stdin \n" * 1_000 + command = " + while read line + do + echo stuff on stdout; + echo stuff on stderr 1>&2; + done + " + + em do + p = Child.new(command, :input => input) + p.callback do + assert_equal input.size, p.out.size + assert_equal input.size, p.err.size + assert p.success? + done + end + end + end + + def test_input_cannot_be_written_due_to_broken_pipe + input = "1" * 100_000 + + em do + p = Child.new('false', :input => input) + p.callback do + assert !p.success? + done + end + end + end + + def test_utf8_input + input = "hålø" + + em do + p = Child.new('cat', :input => input) + p.callback do + assert p.success? + done + end + end + end + + def test_many_pending_processes + EM.epoll + + em do + target = 100 + finished = 0 + + finish = lambda do |p| + finished += 1 + + if finished == target + done + end + end + + spawn = lambda do |i| + EM.next_tick do + if i < target + p = Child.new('sleep %.6f' % (rand(10_000) / 1_000_000.0)) + p.callback { finish.call(p) } + spawn.call(i+1) + end + end + end + + spawn.call(0) + end + end + + # This tries to exercise faulty EventMachine behavior. + # EventMachine crashes when a file descriptor is attached and + # detached in the same event loop tick. + def test_short_lived_process_started_from_io_callback + EM.epoll + + em do + m = Module.new do + def initialize(handlers) + @handlers = handlers + end + + def notify_readable + begin + @io.read_nonblock(1) + @handlers[:readable].call + rescue EOFError + @handlers[:eof].call + end + end + end + + r, w = IO.pipe + + s = lambda do + Child.new("echo") + end + + l = EM.watch(r, m, :readable => s, :eof => method(:done)) + l.notify_readable = true + + # Trigger listener (it reads one byte per tick) + w.write_nonblock("x" * 100) + w.close + end + end + + # Tests if expected listeners are returned by + # Child#add_stream_listeners(&block). + def test_add_listeners + em do + p = Child.new("printf ''") + + listeners = p.add_streams_listener { |*args| } + + assert listeners + assert_equal 2, listeners.size + listeners = listeners.sort_by { |x| x.name } + + assert !listeners[0].closed? + assert "stderr", listeners[0].name + + assert !listeners[1].closed? + assert "stdout", listeners[1].name + + p.callback do + assert p.success? + done + end + end + end + + def test_listener_closed_on_exceeding_max_output + em do + p = Child.new("yes", :max => 2) + + listeners = p.add_streams_listener do |listener, data| + if listener.closed? + listeners.delete(listener) + end + end + + p.errback do + assert listeners.empty? + done + end + end + end + + def test_listener_closed_on_exceeding_timeout + em do + p = Child.new("sleep 0.1", :timeout => 0.05) + + listeners = p.add_streams_listener do |listener, data| + if listener.closed? + listeners.delete(listener) + end + end + + p.errback do + assert listeners.empty? + done + end + end + end + + # Tests if a listener correctly receives stream updates after it attaches to a + # process that has already finished execution without producing any output in + # its stdout and stderr. + def test_listener_empty_streams_completed_process + em do + p = Child.new("printf ''") + p.callback do + assert p.success? + + num_calls = 0 + listeners = p.add_streams_listener do |listener, data| + assert listeners.include?(listener) + assert listener.closed? + + assert data.empty? + + listeners.delete(listener) + num_calls += 1 + # The test times out if listeners are not called required number + # of times. + done if num_calls == 2 + end + end + end + end + + # Tests if a listener correctly receives out and err stream updates after it + # attaches to a process that has already finished execution, and has produced + # some output in its stdout and stderr. + def test_listener_nonempty_streams_completed_process + em do + p = Child.new("printf test >& 1; printf test >& 2") + p.callback do + assert p.success? + + num_calls = 0 + listeners = p.add_streams_listener do |listener, data| + assert listeners.include?(listener) + assert listener.closed? + + assert_equal "test", data + + listeners.delete(listener) + num_calls += 1 + + # The test times out if listeners are not called required number + # of times. + done if num_calls == 2 + end + end + end + end + + # Tests if a listener correctly receives incremental stream updates after it + # attaches to an active process that produces large output in stdout. + def test_listener_large_stdout + output_a = "a" * 1024 * 32 + output_b = "b" * 1024 * 32 + + em do + p = Child.new("printf #{output_a}; sleep 0.1; printf #{output_b}") + received_data = '' + listeners = p.add_streams_listener do |listener, data| + assert listener + assert data + if listener.name == "stdout" + received_data << data + end + end + + p.callback do + assert p.success? + assert "#{output_a}#{output_b}", received_data + done + end + end + end + + # Tests if multiple listeners correctly receives stream updates after they + # attached to the same process. + def test_listener_nonempty_streams_active_process + em do + command = ['A', 'B', 'C'].map do |e| + 'printf %s; sleep 0.01' % e + end.join(';') + + p = Child.new(command) + + data = ['', ''] + closed = [false, false] + called = false + p.add_streams_listener do |listener_outer, data_outer| + data[0] << data_outer + if listener_outer.closed? + closed[0] = true + end + unless called + EM.next_tick do + p.add_streams_listener do |listener_inner, data_inner| + data[1] << data_inner + if listener_inner.closed? + closed[1] = true + end + end + end + + called = true + end + end + + p.callback do + assert p.success? + assert_equal "ABC", data[0] + assert_equal "ABC", data[1] + done + end + end + end + + # Tests if a listener receives the current buffer when it attaches to a process. + def test_listener_is_called_with_buffer_first + em do + command = "printf A; sleep 0.1" + command << "; printf B; sleep 0.1" + command << "; printf C; sleep 0.1" + p = Child.new(command) + + i = 0 + p.add_streams_listener do |listener_outer, data_outer| + i += 1 + + case i + when 1 + assert_equal listener_outer.name, "stdout" + assert_equal data_outer, "A" + + # Add streams listener from fresh stack to avoid mutating @after_read while iterating + EM.next_tick do + j = 0 + p.add_streams_listener do |listener_inner, data_inner| + j += 1 + + case j + when 1 + assert_equal "stdout", listener_inner.name + assert_equal "A", data_inner + when 2 + assert_equal "stdout", listener_inner.name + assert_equal "B", data_inner + when 3 + assert_equal "stdout", listener_inner.name + assert_equal "C", data_inner + done + end + end + end + end + end + end + end + + # Test if duplicate kill is ignored. + def test_duplicate_kill + em do + command = "trap ':' TERM; while :; do :; done" + p = Child.new(command) + p.callback do + done + end + + sleep 0.005 + assert p.kill(0.005) + assert !p.kill(0.005) + end + end + + # Test if kill on terminated job is ignored + def test_kill_terminated_job + em do + command = "printf ''" + p = Child.new(command) + p.callback do + assert !p.kill(1) + done + end + end + end + + # Test kill on active job. + def test_kill_active_job + em do + command = "trap ':' TERM; while :; do :; done" + p = Child.new(command) + p.callback do + done + end + + sleep 0.005 + assert p.kill(0.005) + end + end + + def test_close_others_true + r, w = IO.pipe + + em do + p = Child.new("ls /proc/$$/fd", :close_others => true) + p.callback do + fds = p.out.split.map(&:to_i) + assert !fds.empty? + + assert !fds.include?(r.fileno) + assert !fds.include?(w.fileno) + done + end + end + end +end