diff --git a/lib/fluent/engine.rb b/lib/fluent/engine.rb index b6677c6443..263d1cfc5b 100644 --- a/lib/fluent/engine.rb +++ b/lib/fluent/engine.rb @@ -51,7 +51,7 @@ def initialize attr_reader :root_agent, :system_config, :supervisor_mode - def init(system_config, supervisor_mode: false) + def init(system_config, supervisor_mode: false, start_in_parallel: false) @system_config = system_config @supervisor_mode = supervisor_mode @@ -60,7 +60,7 @@ def init(system_config, supervisor_mode: false) @log_event_verbose = system_config.log_event_verbose unless system_config.log_event_verbose.nil? - @root_agent = RootAgent.new(log: log, system_config: @system_config) + @root_agent = RootAgent.new(log: log, system_config: @system_config, start_in_parallel: start_in_parallel) self end diff --git a/lib/fluent/plugin/in_syslog.rb b/lib/fluent/plugin/in_syslog.rb index 28ad1c5dd9..3d4b205c98 100644 --- a/lib/fluent/plugin/in_syslog.rb +++ b/lib/fluent/plugin/in_syslog.rb @@ -156,6 +156,10 @@ def multi_workers_ready? true end + def zero_downtime_restart_ready? + true + end + def start super diff --git a/lib/fluent/plugin/in_tcp.rb b/lib/fluent/plugin/in_tcp.rb index bd2ea83e5b..9ecb6b793d 100644 --- a/lib/fluent/plugin/in_tcp.rb +++ b/lib/fluent/plugin/in_tcp.rb @@ -101,6 +101,10 @@ def multi_workers_ready? true end + def zero_downtime_restart_ready? + true + end + def start super diff --git a/lib/fluent/plugin/in_udp.rb b/lib/fluent/plugin/in_udp.rb index c2d436115f..645adf8f08 100644 --- a/lib/fluent/plugin/in_udp.rb +++ b/lib/fluent/plugin/in_udp.rb @@ -65,6 +65,10 @@ def multi_workers_ready? true end + def zero_downtime_restart_ready? + true + end + def start super diff --git a/lib/fluent/plugin/input.rb b/lib/fluent/plugin/input.rb index 7a6909f7a9..d465ed3a22 100644 --- a/lib/fluent/plugin/input.rb +++ b/lib/fluent/plugin/input.rb @@ -70,6 +70,10 @@ def metric_callback(es) def multi_workers_ready? false end + + def zero_downtime_restart_ready? + false + end end end end diff --git a/lib/fluent/root_agent.rb b/lib/fluent/root_agent.rb index c66ccb489c..0a7a34b1cb 100644 --- a/lib/fluent/root_agent.rb +++ b/lib/fluent/root_agent.rb @@ -48,7 +48,35 @@ module Fluent class RootAgent < Agent ERROR_LABEL = "@ERROR".freeze # @ERROR is built-in error label - def initialize(log:, system_config: SystemConfig.new) + class SourceOnlyMode + DISABELD = 0 + NORMAL = 1 + ONLY_ZERO_DOWNTIME_RESTART_READY = 2 + + def initialize(with_source_only, start_in_parallel) + if start_in_parallel + @mode = ONLY_ZERO_DOWNTIME_RESTART_READY + elsif with_source_only + @mode = NORMAL + else + @mode = DISABELD + end + end + + def enabled? + @mode != DISABELD + end + + def only_zero_downtime_restart_ready? + @mode == ONLY_ZERO_DOWNTIME_RESTART_READY + end + + def disable! + @mode = DISABELD + end + end + + def initialize(log:, system_config: SystemConfig.new, start_in_parallel: false) super(log: log) @labels = {} @@ -56,7 +84,7 @@ def initialize(log:, system_config: SystemConfig.new) @suppress_emit_error_log_interval = 0 @next_emit_error_log_time = nil @without_source = system_config.without_source || false - @with_source_only = system_config.with_source_only || false + @source_only_mode = SourceOnlyMode.new(system_config.with_source_only, start_in_parallel) @source_only_buffer_agent = nil @enable_input_metrics = system_config.enable_input_metrics || false @@ -67,7 +95,7 @@ def initialize(log:, system_config: SystemConfig.new) attr_reader :labels def source_only_router - raise "[BUG] 'RootAgent#source_only_router' should not be called when 'with_source_only' is false" unless @with_source_only + raise "[BUG] 'RootAgent#source_only_router' should not be called when 'with_source_only' is false" unless @source_only_mode.enabled? @source_only_buffer_agent.event_router end @@ -154,7 +182,7 @@ def configure(conf) super - setup_source_only_buffer_agent if @with_source_only + setup_source_only_buffer_agent if @source_only_mode.enabled? # initialize elements if @without_source @@ -187,9 +215,12 @@ def cleanup_source_only_buffer_agent end def lifecycle(desc: false, kind_callback: nil, kind_or_agent_list: nil) + only_zero_downtime_restart_ready = false + unless kind_or_agent_list - if @with_source_only + if @source_only_mode.enabled? kind_or_agent_list = [:input, @source_only_buffer_agent] + only_zero_downtime_restart_ready = @source_only_mode.only_zero_downtime_restart_ready? elsif @source_only_buffer_agent # source_only_buffer_agent can re-reroute events, so the priority is equal to output_with_router. kind_or_agent_list = [:input, :output_with_router, @source_only_buffer_agent, @labels.values, :filter, :output].flatten @@ -214,6 +245,9 @@ def lifecycle(desc: false, kind_callback: nil, kind_or_agent_list: nil) end display_kind = (kind == :output_with_router ? :output : kind) list.each do |instance| + if only_zero_downtime_restart_ready + next unless instance.respond_to?(:zero_downtime_restart_ready?) and instance.zero_downtime_restart_ready? + end yield instance, display_kind end end @@ -257,7 +291,7 @@ def flush! end def cancel_source_only! - unless @with_source_only + unless @source_only_mode.enabled? log.info "do nothing for canceling with-source-only because the current mode is not with-source-only." return end @@ -285,7 +319,7 @@ def cancel_source_only! setup_source_only_buffer_agent(flush: true) start(kind_or_agent_list: [@source_only_buffer_agent]) - @with_source_only = false + @source_only_mode.disable! end def shutdown(kind_or_agent_list: nil) @@ -378,7 +412,7 @@ def add_source(type, conf) # See also 'fluentd/plugin/input.rb' input.context_router = @event_router input.configure(conf) - input.event_emitter_apply_source_only if @with_source_only + input.event_emitter_apply_source_only if @source_only_mode.enabled? if @enable_input_metrics @event_router.add_metric_callbacks(input.plugin_id, Proc.new {|es| input.metric_callback(es) }) end diff --git a/lib/fluent/supervisor.rb b/lib/fluent/supervisor.rb index a76bd1d40a..1352112a0b 100644 --- a/lib/fluent/supervisor.rb +++ b/lib/fluent/supervisor.rb @@ -43,11 +43,16 @@ def before_run @rpc_endpoint = nil @rpc_server = nil @counter = nil + @socket_manager_server = nil + @starting_new_supervisor_with_zero_downtime = false + @new_supervisor_pid = nil + start_in_parallel = ENV.key?("FLUENT_RUNNING_IN_PARALLEL_WITH_OLD") + @zero_downtime_restart_mutex = Mutex.new @fluentd_lock_dir = Dir.mktmpdir("fluentd-lock-") ENV['FLUENTD_LOCK_DIR'] = @fluentd_lock_dir - if config[:rpc_endpoint] + if config[:rpc_endpoint] and not start_in_parallel @rpc_endpoint = config[:rpc_endpoint] @enable_get_dump = config[:enable_get_dump] run_rpc_server @@ -59,16 +64,27 @@ def before_run install_supervisor_signal_handlers end - if counter = config[:counter_server] + if counter = config[:counter_server] and not start_in_parallel run_counter_server(counter) end if config[:disable_shared_socket] $log.info "shared socket for multiple workers is disabled" + elsif start_in_parallel + begin + raise "[BUG] SERVERENGINE_SOCKETMANAGER_PATH env var must exist when starting in parallel" unless ENV.key?('SERVERENGINE_SOCKETMANAGER_PATH') + @socket_manager_server = ServerEngine::SocketManager::Server.share_sockets_with_another_server(ENV['SERVERENGINE_SOCKETMANAGER_PATH']) + $log.info "zero-downtime-restart: took over the shared sockets", path: ENV['SERVERENGINE_SOCKETMANAGER_PATH'] + rescue => e + $log.error "zero-downtime-restart: cancel sequence because failed to take over the shared sockets", error: e + raise + end else - server = ServerEngine::SocketManager::Server.open - ENV['SERVERENGINE_SOCKETMANAGER_PATH'] = server.path.to_s + @socket_manager_server = ServerEngine::SocketManager::Server.open + ENV['SERVERENGINE_SOCKETMANAGER_PATH'] = @socket_manager_server.path.to_s end + + stop_parallel_old_supervisor_after_delay if start_in_parallel end def after_run @@ -76,7 +92,9 @@ def after_run stop_rpc_server if @rpc_endpoint stop_counter_server if @counter cleanup_lock_dir - Fluent::Supervisor.cleanup_resources + Fluent::Supervisor.cleanup_socketmanager_path unless @starting_new_supervisor_with_zero_downtime + + notify_new_supervisor_that_old_one_has_stopped if @starting_new_supervisor_with_zero_downtime end def cleanup_lock_dir @@ -109,6 +127,13 @@ def run_rpc_server end nil } + unless Fluent.windows? + @rpc_server.mount_proc('/api/processes.zeroDowntimeRestart') { |req, res| + $log.debug "fluentd RPC got /api/processes.zeroDowntimeRestart request" + Process.kill :USR2, Process.pid + nil + } + end @rpc_server.mount_proc('/api/plugins.flushBuffers') { |req, res| $log.debug "fluentd RPC got /api/plugins.flushBuffers request" if Fluent.windows? @@ -137,27 +162,24 @@ def run_rpc_server @rpc_server.mount_proc('/api/config.gracefulReload') { |req, res| $log.debug "fluentd RPC got /api/config.gracefulReload request" - if Fluent.windows? - supervisor_sigusr2_handler - else - Process.kill :USR2, Process.pid - end - + graceful_reload nil } - @rpc_server.mount_proc('/api/config.getDump') { |req, res| - $log.debug "fluentd RPC got /api/config.getDump request" - $log.info "get dump in-memory config via HTTP" - res.body = supervisor_get_dump_config_handler - [nil, nil, res] - } if @enable_get_dump + if @enable_get_dump + @rpc_server.mount_proc('/api/config.getDump') { |req, res| + $log.debug "fluentd RPC got /api/config.getDump request" + $log.info "get dump in-memory config via HTTP" + res.body = supervisor_get_dump_config_handler + [nil, nil, res] + } + end @rpc_server.start end def stop_rpc_server - @rpc_server.shutdown + @rpc_server&.shutdown end def run_counter_server(counter_conf) @@ -172,6 +194,44 @@ def stop_counter_server @counter.stop end + def stop_parallel_old_supervisor_after_delay + Thread.new do + # Delay to wait the new workers to start up. + # Even if it takes a long time to start the new workers and stop the old Fluentd first, + # it is no problem because the socket buffer works, as long as the capacity is not exceeded. + sleep 10 + old_pid = ENV["FLUENT_RUNNING_IN_PARALLEL_WITH_OLD"]&.to_i + if old_pid + $log.info "zero-downtime-restart: stop the old supervisor" + Process.kill :TERM, old_pid + end + rescue => e + $log.warn "zero-downtime-restart: failed to stop the old supervisor." + + " If the old one does not exist, please send SIGWINCH to this new process to start to work fully." + + " If it exists, something went wrong. Please kill the old one manually.", + error: e + end + end + + def notify_new_supervisor_that_old_one_has_stopped + if config[:pid_path] + new_pid = File.read(config[:pid_path]).to_i + else + raise "[BUG] new_supervisor_pid is not saved" unless @new_supervisor_pid + new_pid = @new_supervisor_pid + end + + $log.info "zero-downtime-restart: notify the new supervisor (pid: #{new_pid}) that old one has stopped" + Process.kill :WINCH, new_pid + rescue => e + $log.error( + "zero-downtime-restart: failed to notify the new supervisor." + + " Please send SIGWINCH to the new supervisor process manually" + + " if it does not start to work fully.", + error: e + ) + end + def install_supervisor_signal_handlers return if Fluent.windows? @@ -187,7 +247,11 @@ def install_supervisor_signal_handlers trap :USR2 do $log.debug 'fluentd supervisor process got SIGUSR2' - supervisor_sigusr2_handler + if Fluent.windows? + graceful_reload + else + zero_downtime_restart + end end trap :WINCH do @@ -259,7 +323,7 @@ def install_windows_event_handler when :usr1 supervisor_sigusr1_handler when :usr2 - supervisor_sigusr2_handler + graceful_reload when :cont supervisor_dump_handler_for_windows when :stop_event_thread @@ -289,7 +353,7 @@ def supervisor_sigusr1_handler send_signal_to_workers(:USR1) end - def supervisor_sigusr2_handler + def graceful_reload conf = nil t = Thread.new do $log.info 'Reloading new config' @@ -317,7 +381,76 @@ def supervisor_sigusr2_handler $log.error "Failed to reload config file: #{e}" end + def zero_downtime_restart + Thread.new do + @zero_downtime_restart_mutex.synchronize do + $log.info "start zero-downtime-restart sequence" + + if @starting_new_supervisor_with_zero_downtime + $log.warn "zero-downtime-restart: canceled because it is already starting" + Thread.exit + end + if ENV.key?("FLUENT_RUNNING_IN_PARALLEL_WITH_OLD") + $log.warn "zero-downtime-restart: canceled because the previous sequence is still running" + Thread.exit + end + + @starting_new_supervisor_with_zero_downtime = true + commands = [ServerEngine.ruby_bin_path, $0] + ARGV + env_to_add = { + "SERVERENGINE_SOCKETMANAGER_INTERNAL_TOKEN" => ServerEngine::SocketManager::INTERNAL_TOKEN, + "FLUENT_RUNNING_IN_PARALLEL_WITH_OLD" => "#{Process.pid}", + } + pid = Process.spawn(env_to_add, commands.join(" ")) + @new_supervisor_pid = pid unless config[:daemonize] + + if config[:daemonize] + Thread.new(pid) do |pid| + _, status = Process.wait2(pid) + # check if `ServerEngine::Daemon#daemonize_with_double_fork` succeeded or not + unless status.success? + @starting_new_supervisor_with_zero_downtime = false + $log.error "zero-downtime-restart: failed because new supervisor exits unexpectedly" + end + end + else + Thread.new(pid) do |pid| + _, status = Process.wait2(pid) + @starting_new_supervisor_with_zero_downtime = false + $log.error "zero-downtime-restart: failed because new supervisor exits unexpectedly", status: status + end + end + end + rescue => e + $log.error "zero-downtime-restart: failed", error: e + @starting_new_supervisor_with_zero_downtime = false + end + end + def cancel_source_only + if ENV.key?("FLUENT_RUNNING_IN_PARALLEL_WITH_OLD") + if config[:rpc_endpoint] + begin + @rpc_endpoint = config[:rpc_endpoint] + @enable_get_dump = config[:enable_get_dump] + run_rpc_server + rescue => e + $log.error "failed to start RPC server", error: e + end + end + + if counter = config[:counter_server] + begin + run_counter_server(counter) + rescue => e + $log.error "failed to start counter server", error: e + end + end + + $log.info "zero-downtime-restart: done all sequences, now new processes start to work fully" + ENV.delete("FLUENT_RUNNING_IN_PARALLEL_WITH_OLD") + end + send_signal_to_workers(:WINCH) end @@ -510,12 +643,11 @@ def self.default_options } end - def self.cleanup_resources - unless Fluent.windows? - if ENV.has_key?('SERVERENGINE_SOCKETMANAGER_PATH') - FileUtils.rm_f(ENV['SERVERENGINE_SOCKETMANAGER_PATH']) - end - end + def self.cleanup_socketmanager_path + return if Fluent.windows? + return unless ENV.key?('SERVERENGINE_SOCKETMANAGER_PATH') + + FileUtils.rm_f(ENV['SERVERENGINE_SOCKETMANAGER_PATH']) end def initialize(cl_opt) @@ -583,7 +715,7 @@ def run_supervisor(dry_run: false) begin ServerEngine::Privilege.change(@chuser, @chgroup) MessagePackFactory.init(enable_time_support: @system_config.enable_msgpack_time_support) - Fluent::Engine.init(@system_config, supervisor_mode: true) + Fluent::Engine.init(@system_config, supervisor_mode: true, start_in_parallel: ENV.key?("FLUENT_RUNNING_IN_PARALLEL_WITH_OLD")) Fluent::Engine.run_configure(@conf, dry_run: dry_run) rescue Fluent::ConfigError => e $log.error 'config error', file: @config_path, error: e @@ -632,10 +764,10 @@ def run_worker File.umask(@chumask.to_i(8)) end MessagePackFactory.init(enable_time_support: @system_config.enable_msgpack_time_support) - Fluent::Engine.init(@system_config) + Fluent::Engine.init(@system_config, start_in_parallel: ENV.key?("FLUENT_RUNNING_IN_PARALLEL_WITH_OLD")) Fluent::Engine.run_configure(@conf) Fluent::Engine.run - self.class.cleanup_resources if @standalone_worker + self.class.cleanup_socketmanager_path if @standalone_worker exit 0 end end @@ -853,6 +985,10 @@ def install_main_process_signal_handlers end trap :USR2 do + # Leave the old GracefulReload feature, just in case. + # We can send SIGUSR2 to the worker process to use this old GracefulReload feature. + # (Note: Normally, we can send SIGUSR2 to the supervisor process to use + # zero-downtime-restart feature as GracefulReload on non-Windows.) reload_config end diff --git a/test/command/test_fluentd.rb b/test/command/test_fluentd.rb index 0b40b93c2a..9ff952883a 100644 --- a/test/command/test_fluentd.rb +++ b/test/command/test_fluentd.rb @@ -1350,4 +1350,188 @@ def multi_workers_ready?; true; end patterns_not_match: ["[error]"]) end end + + sub_test_case "zero_downtime_restart" do + setup do + omit "Not supported on Windows" if Fluent.windows? + end + + def conf(udp_port, tcp_port, syslog_port) + <<~CONF + + rpc_endpoint localhost:24444 + + + @type monitor_agent + + + @type udp + tag test.udp + port #{udp_port} + + @type none + + + + @type tcp + tag test.tcp + port #{tcp_port} + + @type none + + + + @type syslog + tag test.syslog + port #{syslog_port} + + + @type record_transformer + + foo foo + + + + @type stdout + + CONF + end + + def run_fluentd(config) + conf_path = create_conf_file("test.conf", config) + assert File.exist?(conf_path) + cmdline = create_cmdline(conf_path) + + stdio_buf = "" + execute_command(cmdline) do |pid, stdout| + begin + waiting(60) do + while true + readables, _, _ = IO.select([stdout], nil, nil, 1) + next unless readables + break if readables.first.eof? + + buf = eager_read(readables.first) + stdio_buf << buf + logs = buf.split("\n") + + yield logs + + break if buf.include? "finish test" + end + end + ensure + supervisor_pids = stdio_buf.scan(SUPERVISOR_PID_PATTERN) + @supervisor_pid = supervisor_pids.last.first.to_i if supervisor_pids.size >= 2 + stdio_buf.scan(WORKER_PID_PATTERN) do |worker_pid| + @worker_pids << worker_pid.first.to_i + end + end + end + end + + def send_udp(port, count:, interval_sec:) + count.times do |i| + s = UDPSocket.new + s.send("udp-#{i}", 0, "localhost", port) + s.close + sleep interval_sec + end + end + + def send_tcp(port, count:, interval_sec:) + count.times do |i| + s = TCPSocket.new("localhost", port) + s.write("tcp-#{i}\n") + s.close + sleep interval_sec + end + end + + def send_syslog(port, count:, interval_sec:) + count.times do |i| + s = UDPSocket.new + s.send("<6>Sep 10 00:00:00 localhost test: syslog-#{i}", 0, "localhost", port) + s.close + sleep interval_sec + end + end + + def send_end(port) + s = TCPSocket.new("localhost", port) + s.write("finish test\n") + s.close + end + + test "should restart with zero downtime (no data loss)" do + print_logs = false # set true if you need to print logs for debug + + udp_port, syslog_port = unused_port(2, protocol: :udp) + tcp_port = unused_port(protocol: :tcp) + + client_threads = [] + end_thread = nil + records_by_type = { + "udp" => [], + "tcp" => [], + "syslog" => [], + } + + phase = "startup" + run_fluentd(conf(udp_port, tcp_port, syslog_port)) do |logs| + logs.map do |log| + if /"message":"(udp|tcp|syslog)-(\d+)","foo":"foo"}/ =~ log + [$1, $2.to_i] + else + p log if print_logs + nil + end + end.compact.each do |type, num| + assert_true records_by_type.key?(type) + records_by_type[type].append(num) + end + + if phase == "startup" and logs.any? { |log| log.include?("fluentd worker is now running worker") } + phase = "zero-downtime-restart" + + client_threads << Thread.new do + send_udp(udp_port, count: 500, interval_sec: 0.01) + end + client_threads << Thread.new do + send_tcp(tcp_port, count: 500, interval_sec: 0.01) + end + client_threads << Thread.new do + send_syslog(syslog_port, count: 500, interval_sec: 0.01) + end + + sleep 1 + response = Net::HTTP.get(URI.parse("http://localhost:24444/api/processes.zeroDowntimeRestart")) + assert_equal '{"ok":true}', response + elsif phase == "zero-downtime-restart" and logs.any? { |log| log.include?("zero-downtime-restart: done all sequences") } + phase = "flush" + response = Net::HTTP.get(URI.parse("http://localhost:24444/api/plugins.flushBuffers")) + assert_equal '{"ok":true}', response + elsif phase == "flush" + phase = "done" + end_thread = Thread.new do + client_threads.each(&:join) + sleep 5 # make sure to flush each chunk (1s flush interval for 1chunk) + send_end(tcp_port) + end + end + end + + assert_equal( + [(0..499).to_a, (0..499).to_a, (0..499).to_a], + [ + records_by_type["udp"].sort, + records_by_type["tcp"].sort, + records_by_type["syslog"].sort, + ] + ) + ensure + client_threads.each(&:kill) + end_thread&.kill + end + end end diff --git a/test/test_plugin_classes.rb b/test/test_plugin_classes.rb index ccd6a1469c..a973cfba73 100644 --- a/test/test_plugin_classes.rb +++ b/test/test_plugin_classes.rb @@ -106,6 +106,14 @@ def initialize @emit_size_metrics = FluentTest::FluentTestCounterMetrics.new end + def multi_workers_ready? + true + end + + def zero_downtime_restart_ready? + true + end + def start super @started = true diff --git a/test/test_root_agent.rb b/test/test_root_agent.rb index d10174d0f3..8b44b68699 100644 --- a/test/test_root_agent.rb +++ b/test/test_root_agent.rb @@ -1045,6 +1045,122 @@ def setup sleep 1 until @root_agent.outputs[0].events["test.event"].any? { |record| record["num"] == 0 } end + waiting(3) do + # Wait the last data output + sleep 1 until @root_agent.outputs[0].events["test.event"].any? { |record| record["num"] == 19 } + end + + # all data should be outputted + assert { @root_agent.outputs[0].events["test.event"].size == 20 } + ensure + @root_agent.shutdown + end + end + + sub_test_case 'start_in_parallel' do + def conf + <<~EOC + + @type test_in_gen + @id test_in_gen + num 20 + interval_sec 0.1 + async + + + + @type test_in + @id test_in + + + + @type record_transformer + @id record_transformer + + foo foo + + + + + @type test_out + @id test_out + + EOC + end + + def setup + omit "Not supported on Windows" if Fluent.windows? + system_config = SystemConfig.new( + Config::Element.new('system', '', {}, [ + Config::Element.new('source_only_buffer', '', { + 'flush_interval' => 1, + }, []), + ]) + ) + @root_agent = RootAgent.new(log: $log, system_config: system_config, start_in_parallel: true) + stub(Engine).root_agent { @root_agent } + stub(Engine).system_config { system_config } + @root_agent.configure(Config.parse(conf, "(test)", "(test_dir)")) + end + + test 'only input plugins should start' do + @root_agent.start + + assert_equal( + { + "input started?" => [true, false], + "filter started?" => [false], + "output started?" => [false], + }, + { + "input started?" => @root_agent.inputs.map { |plugin| plugin.started? }, + "filter started?" => @root_agent.filters.map { |plugin| plugin.started? }, + "output started?" => @root_agent.outputs.map { |plugin| plugin.started? }, + } + ) + ensure + @root_agent.shutdown + # Buffer files remain because not cancelling source-only. + # As a test, they should be clean-up-ed. + buf_dir = @root_agent.instance_variable_get(:@source_only_buffer_agent).instance_variable_get(:@base_buffer_dir) + FileUtils.remove_dir(buf_dir) + end + + test '#cancel_source_only! should start all plugins' do + @root_agent.start + @root_agent.cancel_source_only! + + assert_equal( + { + "input started?" => [true, true], + "filter started?" => [true], + "output started?" => [true], + }, + { + "input started?" => @root_agent.inputs.map { |plugin| plugin.started? }, + "filter started?" => @root_agent.filters.map { |plugin| plugin.started? }, + "output started?" => @root_agent.outputs.map { |plugin| plugin.started? }, + } + ) + ensure + @root_agent.shutdown + end + + test 'buffer should be loaded after #cancel_source_only!' do + @root_agent.start + sleep 1 + @root_agent.cancel_source_only! + + waiting(3) do + # Wait buffer loaded after source-only cancelled + sleep 1 until @root_agent.outputs[0].events["test.event"].any? { |record| record["num"] == 0 } + end + + waiting(3) do + # Wait the last data output + sleep 1 until @root_agent.outputs[0].events["test.event"].any? { |record| record["num"] == 19 } + end + # all data should be outputted assert { @root_agent.outputs[0].events["test.event"].size == 20 } ensure diff --git a/test/test_supervisor.rb b/test/test_supervisor.rb index 6d3363630e..28c20b2e39 100644 --- a/test/test_supervisor.rb +++ b/test/test_supervisor.rb @@ -19,7 +19,7 @@ class SupervisorTest < ::Test::Unit::TestCase class DummyServer include Fluent::ServerModule - attr_accessor :rpc_endpoint, :enable_get_dump + attr_accessor :rpc_endpoint, :enable_get_dump, :socket_manager_server def config {} end @@ -891,6 +891,127 @@ def server.config end end + sub_test_case "zero_downtime_restart" do + setup do + omit "Not supported on Windows" if Fluent.windows? + end + + data( + # When daemonize, exit-status is important. The new spawned process does double-fork and exits soon. + "daemonize and succeeded double-fork of new process" => [true, true, 0, false], + "daemonize and failed double-fork of new process" => [true, false, 0, true], + # When no daemon, whether the new spawned process is alive is important, not exit-status. + "no daemon and new process alive" => [false, false, 3, false], + "no daemon and new process dead" => [false, false, 0, true], + ) + def test_zero_downtime_restart((daemonize, wait_success, wait_sleep, restart_canceled)) + # == Arrange == + env_spawn = {} + pid_wait = nil + + server = DummyServer.new + + stub(server).config do + { + daemonize: daemonize, + pid_path: "test-pid-file", + } + end + process_stub = stub(Process) + process_stub.spawn do |env, commands| + env_spawn = env + -1 + end + process_stub.wait2 do |pid| + pid_wait = pid + sleep wait_sleep + if wait_success + status = Class.new{def success?; true; end}.new + else + status = Class.new{def success?; false; end}.new + end + [pid, status] + end + stub(File).read("test-pid-file") { -1 } + + # mock to check notify_new_supervisor_that_old_one_has_stopped sends SIGWINCH + if restart_canceled + mock(Process).kill(:WINCH, -1).never + else + mock(Process).kill(:WINCH, -1) + end + + # == Act and Assert == + server.before_run + server.zero_downtime_restart.join + sleep 1 # To wait a sub thread for waitpid in zero_downtime_restart + server.after_run + + assert_equal( + [ + !restart_canceled, + true, + Process.pid, + -1, + ], + [ + server.instance_variable_get(:@starting_new_supervisor_with_zero_downtime), + env_spawn.key?("SERVERENGINE_SOCKETMANAGER_INTERNAL_TOKEN"), + env_spawn["FLUENT_RUNNING_IN_PARALLEL_WITH_OLD"].to_i, + pid_wait, + ] + ) + ensure + Fluent::Supervisor.cleanup_socketmanager_path + ENV.delete('SERVERENGINE_SOCKETMANAGER_PATH') + end + + def test_share_sockets + server = DummyServer.new + server.before_run + path = ENV['SERVERENGINE_SOCKETMANAGER_PATH'] + + client = ServerEngine::SocketManager::Client.new(path) + udp_port = unused_port(protocol: :udp) + tcp_port = unused_port(protocol: :tcp) + client.listen_udp("localhost", udp_port) + client.listen_tcp("localhost", tcp_port) + + ENV['FLUENT_RUNNING_IN_PARALLEL_WITH_OLD'] = "" + new_server = DummyServer.new + stub(new_server).stop_parallel_old_supervisor_after_delay + new_server.before_run + + assert_equal( + [[udp_port], [tcp_port]], + [ + new_server.socket_manager_server.udp_sockets.values.map { |v| v.addr[1] }, + new_server.socket_manager_server.tcp_sockets.values.map { |v| v.addr[1] }, + ] + ) + ensure + server&.after_run + new_server&.after_run + ENV.delete('SERVERENGINE_SOCKETMANAGER_PATH') + ENV.delete("FLUENT_RUNNING_IN_PARALLEL_WITH_OLD") + end + + def test_stop_parallel_old_supervisor_after_delay + ENV['SERVERENGINE_SOCKETMANAGER_PATH'] = "" + ENV['FLUENT_RUNNING_IN_PARALLEL_WITH_OLD'] = "-1" + stub(ServerEngine::SocketManager::Server).share_sockets_with_another_server + mock(Process).kill(:TERM, -1) + + server = DummyServer.new + server.before_run + sleep 12 # Can't we skip the delay for this test? + ensure + server&.after_run + ENV.delete('SERVERENGINE_SOCKETMANAGER_PATH') + ENV.delete("FLUENT_RUNNING_IN_PARALLEL_WITH_OLD") + end + end + def create_debug_dummy_logger dl_opts = {} dl_opts[:log_level] = ServerEngine::DaemonLogger::DEBUG