From 7fd34a20edad5bf617a7a14785b8efbbbbf8315f Mon Sep 17 00:00:00 2001 From: Daijiro Fukuda Date: Thu, 7 Nov 2024 16:43:25 +0900 Subject: [PATCH] Add with-source-only feature * Mainly intended to be used by command option `--with-source-only` * We can use system-config option `with_source_only` as well * Not supported on Windows It launches Fluentd with Input plugins only. Here is the specification. * Those Input plugins emit the events to SourceOnlyBufferAgent. * The events are kept in the buf_file during the source-only mode. * Send SIGWINCH to the supervisor to cancel the mode. * After the mode is canceled, a new agent starts to load the buffer. Signed-off-by: Daijiro Fukuda --- lib/fluent/command/fluentd.rb | 6 ++ lib/fluent/engine.rb | 24 ++++- lib/fluent/plugin/out_buffer.rb | 40 +++++++++ lib/fluent/plugin/output.rb | 2 + lib/fluent/plugin_helper/event_emitter.rb | 12 +++ lib/fluent/root_agent.rb | 104 ++++++++++++++++++---- lib/fluent/source_only_buffer_agent.rb | 61 +++++++++++++ lib/fluent/supervisor.rb | 26 ++++++ lib/fluent/system_config.rb | 5 +- 9 files changed, 256 insertions(+), 24 deletions(-) create mode 100644 lib/fluent/plugin/out_buffer.rb create mode 100644 lib/fluent/source_only_buffer_agent.rb diff --git a/lib/fluent/command/fluentd.rb b/lib/fluent/command/fluentd.rb index da25b74cca..c5c4684089 100644 --- a/lib/fluent/command/fluentd.rb +++ b/lib/fluent/command/fluentd.rb @@ -127,6 +127,12 @@ cmd_opts[:without_source] = b } +unless Fluent.windows? + op.on('--with-source-only', "Invoke a fluentd only with input plugins. The data is stored in a temporary buffer. Send SIGWINCH to cancel this mode and process the data (Not supported on Windows).", TrueClass) {|b| + cmd_opts[:with_source_only] = b + } +end + op.on('--config-file-type VALU', 'guessing file type of fluentd configuration. 
yaml/yml or guess') { |s| if (s == 'yaml') || (s == 'yml') cmd_opts[:config_file_type] = s.to_sym diff --git a/lib/fluent/engine.rb b/lib/fluent/engine.rb index afac3167ca..6f5ec09e84 100644 --- a/lib/fluent/engine.rb +++ b/lib/fluent/engine.rb @@ -43,6 +43,8 @@ def initialize @system_config = SystemConfig.new @supervisor_mode = false + + @root_agent_mutex = Mutex.new end MAINLOOP_SLEEP_INTERVAL = 0.3 @@ -133,7 +135,15 @@ def emit_stream(tag, es) end def flush! - @root_agent.flush! + @root_agent_mutex.synchronize do + @root_agent.flush! + end + end + + def cancel_source_only! + @root_agent_mutex.synchronize do + @root_agent.cancel_source_only! + end end def now @@ -230,7 +240,9 @@ def stop_phase(root_agent) @fluent_log_event_router.graceful_stop end $log.info 'shutting down fluentd worker', worker: worker_id - root_agent.shutdown + @root_agent_mutex.synchronize do + root_agent.shutdown + end @fluent_log_event_router.stop end @@ -241,11 +253,15 @@ def start_phase(root_agent) $log.enable_event(true) end - @root_agent.start + @root_agent_mutex.synchronize do + @root_agent.start + end end def start - @root_agent.start + @root_agent_mutex.synchronize do + @root_agent.start + end end end diff --git a/lib/fluent/plugin/out_buffer.rb b/lib/fluent/plugin/out_buffer.rb new file mode 100644 index 0000000000..6f3fe8119e --- /dev/null +++ b/lib/fluent/plugin/out_buffer.rb @@ -0,0 +1,40 @@ +# +# Fluentd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +require 'fluent/plugin/output' + +module Fluent::Plugin + class BufferOutput < Output + Fluent::Plugin.register_output("buffer", self) + helpers :event_emitter + + config_section :buffer do + config_set_default :@type, "file" + config_set_default :chunk_keys, ["tag"] + config_set_default :flush_mode, :interval + config_set_default :flush_interval, 10 + end + + def multi_workers_ready? + true + end + + def write(chunk) + return if chunk.empty? + router.emit_stream(chunk.metadata.tag, Fluent::MessagePackEventStream.new(chunk.read)) + end + end +end diff --git a/lib/fluent/plugin/output.rb b/lib/fluent/plugin/output.rb index 0aed67db95..c600e35287 100644 --- a/lib/fluent/plugin/output.rb +++ b/lib/fluent/plugin/output.rb @@ -1384,6 +1384,7 @@ def retry_state(randomize) end def submit_flush_once + return unless @buffer_config.flush_thread_count > 0 # Without locks: it is rough but enough to select "next" writer selection @output_flush_thread_current_position = (@output_flush_thread_current_position + 1) % @buffer_config.flush_thread_count state = @output_flush_threads[@output_flush_thread_current_position] @@ -1406,6 +1407,7 @@ def force_flush end def submit_flush_all + return unless @buffer_config.flush_thread_count > 0 while !@retry && @buffer.queued? submit_flush_once sleep @buffer_config.flush_thread_burst_interval diff --git a/lib/fluent/plugin_helper/event_emitter.rb b/lib/fluent/plugin_helper/event_emitter.rb index ba089e485a..60d34391b7 100644 --- a/lib/fluent/plugin_helper/event_emitter.rb +++ b/lib/fluent/plugin_helper/event_emitter.rb @@ -26,6 +26,9 @@ module EventEmitter def router @_event_emitter_used_actually = true + + return Engine.root_agent.source_only_router if @_event_emitter_force_source_only_router + if @_event_emitter_lazy_init @router = @primary_instance.router end @@ -48,6 +51,14 @@ def event_emitter_used_actually? 
@_event_emitter_used_actually end + def event_emitter_set_source_only + @_event_emitter_force_source_only_router = true + end + + def event_emitter_cancel_source_only + @_event_emitter_force_source_only_router = false + end + def event_emitter_router(label_name) if label_name if label_name == "@ROOT" @@ -72,6 +83,7 @@ def initialize super @_event_emitter_used_actually = false @_event_emitter_lazy_init = false + @_event_emitter_force_source_only_router = false @router = nil end diff --git a/lib/fluent/root_agent.rb b/lib/fluent/root_agent.rb index d10f366044..568d170615 100644 --- a/lib/fluent/root_agent.rb +++ b/lib/fluent/root_agent.rb @@ -22,6 +22,7 @@ require 'fluent/plugin' require 'fluent/system_config' require 'fluent/time' +require 'fluent/source_only_buffer_agent' module Fluent # @@ -54,17 +55,30 @@ def initialize(log:, system_config: SystemConfig.new) @inputs = [] @suppress_emit_error_log_interval = 0 @next_emit_error_log_time = nil - @without_source = false - @enable_input_metrics = false + @without_source = system_config.without_source || false + @source_only_buffer_agent = nil + @enable_input_metrics = system_config.enable_input_metrics || false + + @with_source_only = false + if system_config.with_source_only + if Fluent.windows? + log.warn "with-source-only is disabled because it is not supported on Windows" + else + @with_source_only = true + end + end suppress_interval(system_config.emit_error_log_interval) unless system_config.emit_error_log_interval.nil? - @without_source = system_config.without_source unless system_config.without_source.nil? 
- @enable_input_metrics = !!system_config.enable_input_metrics end attr_reader :inputs attr_reader :labels + def source_only_router + raise "[BUG] 'RootAgent#source_only_router' should not be called when 'with_source_only' is false" unless @with_source_only + @source_only_buffer_agent.event_router + end + def configure(conf) used_worker_ids = [] available_worker_ids = (0..Fluent::Engine.system_config.workers - 1).to_a @@ -148,6 +162,8 @@ def configure(conf) super + setup_source_only_buffer_agent if @with_source_only + # initialize elements if @without_source log.info :worker0, "'--without-source' is applied. Ignore sections" @@ -169,16 +185,33 @@ def setup_error_label(e) @error_collector = error_label.event_router end - def lifecycle(desc: false, kind_callback: nil) - kind_or_label_list = if desc - [:output, :filter, @labels.values.reverse, :output_with_router, :input].flatten - else - [:input, :output_with_router, @labels.values, :filter, :output].flatten - end - kind_or_label_list.each do |kind| + def setup_source_only_buffer_agent(flush: false) + @source_only_buffer_agent = SourceOnlyBufferAgent.new(log: log, system_config: Fluent::Engine.system_config) + @source_only_buffer_agent.configure(flush: flush) + end + + def cleanup_source_only_buffer_agent + @source_only_buffer_agent&.cleanup + end + + def lifecycle(desc: false, kind_callback: nil, kind_or_agent_list: nil) + unless kind_or_agent_list + if @with_source_only + kind_or_agent_list = [:input, @source_only_buffer_agent] + elsif @source_only_buffer_agent + # source_only_buffer_agent can re-reroute events, so the priority is equal to output_with_router. + kind_or_agent_list = [:input, :output_with_router, @source_only_buffer_agent, @labels.values, :filter, :output].flatten + else + kind_or_agent_list = [:input, :output_with_router, @labels.values, :filter, :output].flatten + end + + kind_or_agent_list.reverse! 
if desc + end + + kind_or_agent_list.each do |kind| if kind.respond_to?(:lifecycle) - label = kind - label.lifecycle(desc: desc) do |plugin, display_kind| + agent = kind + agent.lifecycle(desc: desc) do |plugin, display_kind| yield plugin, display_kind end else @@ -198,8 +231,8 @@ def lifecycle(desc: false, kind_callback: nil) end end - def start - lifecycle(desc: true) do |i| # instance + def start(kind_or_agent_list: nil) + lifecycle(desc: true, kind_or_agent_list: kind_or_agent_list) do |i| # instance i.start unless i.started? # Input#start sometimes emits lots of events with in_tail/`read_from_head true` case # and it causes deadlock for small buffer/queue output. To avoid such problem, @@ -231,13 +264,45 @@ def flush! flushing_threads.each{|t| t.join } end - def shutdown # Fluentd's shutdown sequence is stop, before_shutdown, shutdown, after_shutdown, close, terminate for plugins + def cancel_source_only! + unless @with_source_only + log.info "do nothing for canceling with-source-only because the current mode is not with-source-only." + return + end + + log.info "cancel with-source-only mode and start the other plugins" + start + + lifecycle_control_list[:input].each(&:event_emitter_cancel_source_only) + + # Want to make sure that the source_only_router finishes all process before + # shutting down the agent. + # Strictly speaking, it would be necessary to have exclusive lock between + # EventRouter and the shutting down process of this agent. + # However, adding lock to EventRouter would worsen its performance, and + # the entire shutting down process does not care about it either. + # So, just sleep a few seconds here, though it may not be strictly safe. + sleep 5 + + shutdown(kind_or_agent_list: [@source_only_buffer_agent]) + @source_only_buffer_agent = nil + + # This agent can stop after flushing its all buffer, but it is not implemented for now. 
+ log.info "starts the loading agent for with-source-only" + setup_source_only_buffer_agent(flush: true) + start(kind_or_agent_list: [@source_only_buffer_agent]) + + @with_source_only = false + end + + def shutdown(kind_or_agent_list: nil) + # Fluentd's shutdown sequence is stop, before_shutdown, shutdown, after_shutdown, close, terminate for plugins # These method callers does `rescue Exception` to call methods of shutdown sequence as far as possible # if plugin methods does something like infinite recursive call, `exit`, unregistering signal handlers or others. # Plugins should be separated and be in sandbox to protect data in each plugins/buffers. lifecycle_safe_sequence = ->(method, checker) { - lifecycle do |instance, kind| + lifecycle(kind_or_agent_list: kind_or_agent_list) do |instance, kind| begin log.debug "calling #{method} on #{kind} plugin", type: Plugin.lookup_type_from_class(instance.class), plugin_id: instance.plugin_id instance.__send__(method) unless instance.__send__(checker) @@ -260,7 +325,7 @@ def shutdown # Fluentd's shutdown sequence is stop, before_shutdown, shutdown, a operation_threads.each{|t| t.join } operation_threads.clear } - lifecycle(kind_callback: callback) do |instance, kind| + lifecycle(kind_callback: callback, kind_or_agent_list: kind_or_agent_list) do |instance, kind| t = Thread.new do Thread.current.abort_on_exception = true begin @@ -301,6 +366,8 @@ def shutdown # Fluentd's shutdown sequence is stop, before_shutdown, shutdown, a lifecycle_unsafe_sequence.call(:close, :closed?) lifecycle_safe_sequence.call(:terminate, :terminated?) 
+ + cleanup_source_only_buffer_agent unless kind_or_agent_list end def suppress_interval(interval_time) @@ -318,6 +385,7 @@ def add_source(type, conf) # See also 'fluentd/plugin/input.rb' input.context_router = @event_router input.configure(conf) + input.event_emitter_set_source_only if @with_source_only if @enable_input_metrics @event_router.add_metric_callbacks(input.plugin_id, Proc.new {|es| input.metric_callback(es) }) end diff --git a/lib/fluent/source_only_buffer_agent.rb b/lib/fluent/source_only_buffer_agent.rb new file mode 100644 index 0000000000..dc4abb26db --- /dev/null +++ b/lib/fluent/source_only_buffer_agent.rb @@ -0,0 +1,61 @@ +# +# Fluentd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +require 'securerandom' + +require 'fluent/agent' +require 'fluent/system_config' + +module Fluent + class SourceOnlyBufferAgent < Agent + BUFFER_DIR_NAME = SecureRandom.uuid + + def initialize(log:, system_config:) + super(log: log) + + @buffer_path = File.join(system_config.root_dir || DEFAULT_BACKUP_DIR, 'source-only-buffer', BUFFER_DIR_NAME) + end + + def configure(flush: false) + super( + Config::Element.new('SOURCE_ONLY_BUFFER', '', {}, [ + Config::Element.new('match', '**', {'@type' => 'buffer', '@label' => '@ROOT'}, [ + Config::Element.new('buffer', '', { + 'path' => @buffer_path, + 'flush_at_shutdown' => flush ? 'true' : 'false', + 'flush_thread_count' => flush ? 
1 : 0, + 'overflow_action' => "drop_oldest_chunk", + }, []) + ]) + ]) + ) + end + + def cleanup + unless Dir.empty?(@buffer_path) + log.warn "some buffer files remain in #{@buffer_path}. They need to be manually recovered." + + " Please consider recovering or saving the buffer files in the directory." + return + end + + begin + FileUtils.remove_dir(@buffer_path) + rescue => e + log.warn "failed to remove the empty buffer dirctory: #{@buffer_path}", error: e + end + end + end +end diff --git a/lib/fluent/supervisor.rb b/lib/fluent/supervisor.rb index d565abf600..20f480023e 100644 --- a/lib/fluent/supervisor.rb +++ b/lib/fluent/supervisor.rb @@ -189,6 +189,11 @@ def install_supervisor_signal_handlers $log.debug 'fluentd supervisor process got SIGUSR2' supervisor_sigusr2_handler end + + trap :WINCH do + $log.debug 'fluentd supervisor process got SIGWINCH' + cancel_source_only + end end if Fluent.windows? @@ -312,6 +317,10 @@ def supervisor_sigusr2_handler $log.error "Failed to reload config file: #{e}" end + def cancel_source_only + send_signal_to_workers(:WINCH) + end + def supervisor_dump_handler_for_windows # As for UNIX-like, SIGCONT signal to each process makes the process output its dump-file, # and it is implemented before the implementation of the function for Windows. @@ -486,6 +495,7 @@ def self.default_options suppress_repeated_stacktrace: true, ignore_repeated_log_interval: nil, without_source: nil, + with_source_only: nil, enable_input_metrics: nil, enable_size_metrics: nil, use_v1_config: true, @@ -840,6 +850,10 @@ def install_main_process_signal_handlers trap :CONT do dump_non_windows end + + trap :WINCH do + cancel_source_only + end end end @@ -893,6 +907,18 @@ def flush_buffer end end + def cancel_source_only + Thread.new do + begin + $log.debug "fluentd main process get SIGWINCH" + $log.info "try to cancel with-source-only mode" + Fluent::Engine.cancel_source_only! 
+ rescue Exception => e + $log.warn "failed to cancel source only", error: e + end + end + end + def reload_config Thread.new do $log.debug('worker got SIGUSR2') diff --git a/lib/fluent/system_config.rb b/lib/fluent/system_config.rb index 917889018d..99e7e4742d 100644 --- a/lib/fluent/system_config.rb +++ b/lib/fluent/system_config.rb @@ -25,7 +25,7 @@ class SystemConfig :workers, :restart_worker_interval, :root_dir, :log_level, :suppress_repeated_stacktrace, :emit_error_log_interval, :suppress_config_dump, :log_event_verbose, :ignore_repeated_log_interval, :ignore_same_log_interval, - :without_source, :rpc_endpoint, :enable_get_dump, :process_name, + :without_source, :with_source_only, :rpc_endpoint, :enable_get_dump, :process_name, :file_permission, :dir_permission, :counter_server, :counter_client, :strict_config_value, :enable_msgpack_time_support, :disable_shared_socket, :metrics, :enable_input_metrics, :enable_size_metrics, :enable_jit @@ -41,7 +41,8 @@ class SystemConfig config_param :emit_error_log_interval, :time, default: nil config_param :suppress_config_dump, :bool, default: nil config_param :log_event_verbose, :bool, default: nil - config_param :without_source, :bool, default: nil + config_param :without_source, :bool, default: nil + config_param :with_source_only, :bool, default: nil config_param :rpc_endpoint, :string, default: nil config_param :enable_get_dump, :bool, default: nil config_param :process_name, :string, default: nil