From 7ab49c3f7209adcdbffcd51b053e4d4cf60add0a Mon Sep 17 00:00:00 2001 From: Daijiro Fukuda Date: Thu, 10 Oct 2024 23:33:42 +0900 Subject: [PATCH] fluentd command: add --with-source-only It launches Fluentd with Input plugins only. Here is the specification. * Those Input plugins emit the events to SourceOnlyBufferAgent. * The events are kept in the buf_file during the source-only-mode. * SIGRTMIN(34) cancels the mode. * After the mode is canceled, the new agent starts to load the buffer. Signed-off-by: Daijiro Fukuda --- lib/fluent/command/fluentd.rb | 4 + lib/fluent/engine.rb | 4 + lib/fluent/plugin/buf_file.rb | 2 + lib/fluent/plugin/out_buffer.rb | 78 ++++++++++++++++++++ lib/fluent/plugin/output.rb | 1 + lib/fluent/plugin_helper/event_emitter.rb | 12 +++ lib/fluent/root_agent.rb | 90 ++++++++++++++++++----- lib/fluent/source_only_buffer_agent.rb | 47 ++++++++++++ lib/fluent/supervisor.rb | 24 ++++++ lib/fluent/system_config.rb | 5 +- 10 files changed, 247 insertions(+), 20 deletions(-) create mode 100644 lib/fluent/plugin/out_buffer.rb create mode 100644 lib/fluent/source_only_buffer_agent.rb diff --git a/lib/fluent/command/fluentd.rb b/lib/fluent/command/fluentd.rb index da25b74cca..5e4cff55e9 100644 --- a/lib/fluent/command/fluentd.rb +++ b/lib/fluent/command/fluentd.rb @@ -127,6 +127,10 @@ cmd_opts[:without_source] = b } +op.on('--with-source-only', "invoke a fluentd only with input plugins", TrueClass) {|b| + cmd_opts[:with_source_only] = b +} + op.on('--config-file-type VALU', 'guessing file type of fluentd configuration. yaml/yml or guess') { |s| if (s == 'yaml') || (s == 'yml') cmd_opts[:config_file_type] = s.to_sym diff --git a/lib/fluent/engine.rb b/lib/fluent/engine.rb index afac3167ca..65db5c7a39 100644 --- a/lib/fluent/engine.rb +++ b/lib/fluent/engine.rb @@ -136,6 +136,10 @@ def flush! @root_agent.flush! end + def cancel_source_only! + @root_agent.cancel_source_only!
+ end + def now # TODO thread update Fluent::EventTime.now diff --git a/lib/fluent/plugin/buf_file.rb b/lib/fluent/plugin/buf_file.rb index 7f11478a01..e36d2a1dad 100644 --- a/lib/fluent/plugin/buf_file.rb +++ b/lib/fluent/plugin/buf_file.rb @@ -42,6 +42,8 @@ class FileBuffer < Fluent::Plugin::Buffer config_param :file_permission, :string, default: nil # '0644' (Fluent::DEFAULT_FILE_PERMISSION) config_param :dir_permission, :string, default: nil # '0755' (Fluent::DEFAULT_DIR_PERMISSION) + attr_reader :buffer_path + def initialize super @symlink_path = nil diff --git a/lib/fluent/plugin/out_buffer.rb b/lib/fluent/plugin/out_buffer.rb new file mode 100644 index 0000000000..e72b718084 --- /dev/null +++ b/lib/fluent/plugin/out_buffer.rb @@ -0,0 +1,78 @@ +# +# Fluentd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +require 'fluent/plugin/output' + +module Fluent::Plugin + class BufferOutput < Output + Fluent::Plugin.register_output("buffer", self) + helpers :event_emitter + + desc "Try to remove the buffer directory after terminate." + + " Mainly for the --with-source-only feature." + config_param :cleanup_after_shutdown, :bool, default: false + + config_section :buffer do + config_set_default :@type, "file" + config_set_default :chunk_keys, ["tag"] + config_set_default :flush_mode, :interval + config_set_default :flush_interval, 10 + end + + def multi_workers_ready? 
+ true + end + + def initialize + super + + @buffer_path = nil + end + + def configure(conf) + super + + raise Fluent::ConfigError, "The buffer plugin does not be supported. It must have 'buffer_path' getter. You can use 'file' buffer." unless @buffer.respond_to?(:buffer_path) + + @buffer_path = @buffer.buffer_path + end + + def write(chunk) + return if chunk.empty? + router.emit_stream(chunk.metadata.tag, Fluent::MessagePackEventStream.new(chunk.read)) + end + + def terminate + super + + return unless cleanup_after_shutdown + + unless Dir.empty?(@buffer_path) + if @buffer_config.flush_at_shutdown + log.warn "some buffer files remain in #{@buffer_path}, so cancel cleanup the directory." + + " Please consider saving or recovering the buffer files in the directory." + end + return + end + + begin + FileUtils.remove_dir(@buffer_path) + rescue => e + log.warn "failed to remove the empty buffer dirctory: #{@buffer_path}", error: e + end + end + end +end diff --git a/lib/fluent/plugin/output.rb b/lib/fluent/plugin/output.rb index 0aed67db95..8aa4fc8522 100644 --- a/lib/fluent/plugin/output.rb +++ b/lib/fluent/plugin/output.rb @@ -1384,6 +1384,7 @@ def retry_state(randomize) end def submit_flush_once + return unless @buffer_config.flush_thread_count > 0 # Without locks: it is rough but enough to select "next" writer selection @output_flush_thread_current_position = (@output_flush_thread_current_position + 1) % @buffer_config.flush_thread_count state = @output_flush_threads[@output_flush_thread_current_position] diff --git a/lib/fluent/plugin_helper/event_emitter.rb b/lib/fluent/plugin_helper/event_emitter.rb index ba089e485a..60d34391b7 100644 --- a/lib/fluent/plugin_helper/event_emitter.rb +++ b/lib/fluent/plugin_helper/event_emitter.rb @@ -26,6 +26,9 @@ module EventEmitter def router @_event_emitter_used_actually = true + + return Engine.root_agent.source_only_router if @_event_emitter_force_source_only_router + if @_event_emitter_lazy_init @router = 
@primary_instance.router end @@ -48,6 +51,14 @@ def event_emitter_used_actually? @_event_emitter_used_actually end + def event_emitter_set_source_only + @_event_emitter_force_source_only_router = true + end + + def event_emitter_cancel_source_only + @_event_emitter_force_source_only_router = false + end + def event_emitter_router(label_name) if label_name if label_name == "@ROOT" @@ -72,6 +83,7 @@ def initialize super @_event_emitter_used_actually = false @_event_emitter_lazy_init = false + @_event_emitter_force_source_only_router = false @router = nil end diff --git a/lib/fluent/root_agent.rb b/lib/fluent/root_agent.rb index d10f366044..8411152da4 100644 --- a/lib/fluent/root_agent.rb +++ b/lib/fluent/root_agent.rb @@ -22,6 +22,7 @@ require 'fluent/plugin' require 'fluent/system_config' require 'fluent/time' +require 'fluent/source_only_buffer_agent' module Fluent # @@ -54,17 +55,22 @@ def initialize(log:, system_config: SystemConfig.new) @inputs = [] @suppress_emit_error_log_interval = 0 @next_emit_error_log_time = nil - @without_source = false - @enable_input_metrics = false + @without_source = system_config.without_source || false + @with_source_only = system_config.with_source_only || false + @source_only_buffer_agent = nil + @enable_input_metrics = system_config.enable_input_metrics || false suppress_interval(system_config.emit_error_log_interval) unless system_config.emit_error_log_interval.nil? - @without_source = system_config.without_source unless system_config.without_source.nil? 
- @enable_input_metrics = !!system_config.enable_input_metrics end attr_reader :inputs attr_reader :labels + def source_only_router + raise "[BUG] 'RootAgent#source_only_router' should not be called when 'with_source_only' is false" unless @with_source_only + @source_only_buffer_agent.event_router + end + def configure(conf) used_worker_ids = [] available_worker_ids = (0..Fluent::Engine.system_config.workers - 1).to_a @@ -148,6 +154,8 @@ def configure(conf) super + setup_source_only_buffer_agent if @with_source_only + # initialize elements if @without_source log.info :worker0, "'--without-source' is applied. Ignore sections" @@ -169,16 +177,29 @@ def setup_error_label(e) @error_collector = error_label.event_router end - def lifecycle(desc: false, kind_callback: nil) - kind_or_label_list = if desc - [:output, :filter, @labels.values.reverse, :output_with_router, :input].flatten - else - [:input, :output_with_router, @labels.values, :filter, :output].flatten - end - kind_or_label_list.each do |kind| + def setup_source_only_buffer_agent(flush: false) + @source_only_buffer_agent = SourceOnlyBufferAgent.new(log: log, system_config: Fluent::Engine.system_config) + @source_only_buffer_agent.configure(flush: flush) + end + + def lifecycle(desc: false, kind_callback: nil, kind_or_agent_list: nil) + unless kind_or_agent_list + if @with_source_only + kind_or_agent_list = [:input, @source_only_buffer_agent] + elsif @source_only_buffer_agent + # source_only_buffer_agent can re-reroute events, so the priority is equal to output_with_router. + kind_or_agent_list = [:input, :output_with_router, @source_only_buffer_agent, @labels.values, :filter, :output].flatten + else + kind_or_agent_list = [:input, :output_with_router, @labels.values, :filter, :output].flatten + end + + kind_or_agent_list.reverse! 
if desc + end + + kind_or_agent_list.each do |kind| if kind.respond_to?(:lifecycle) - label = kind - label.lifecycle(desc: desc) do |plugin, display_kind| + agent = kind + agent.lifecycle(desc: desc) do |plugin, display_kind| yield plugin, display_kind end else @@ -198,8 +219,8 @@ def lifecycle(desc: false, kind_callback: nil) end end - def start - lifecycle(desc: true) do |i| # instance + def start(kind_or_agent_list: nil) + lifecycle(desc: true, kind_or_agent_list: kind_or_agent_list) do |i| # instance i.start unless i.started? # Input#start sometimes emits lots of events with in_tail/`read_from_head true` case # and it causes deadlock for small buffer/queue output. To avoid such problem, @@ -231,13 +252,45 @@ def flush! flushing_threads.each{|t| t.join } end - def shutdown # Fluentd's shutdown sequence is stop, before_shutdown, shutdown, after_shutdown, close, terminate for plugins + def cancel_source_only! + # TODO exclusive lock + if @with_source_only + log.info "cancel --with-source-only mode and start the other plugins" + @with_source_only = false + start + + lifecycle_control_list[:input].each(&:event_emitter_cancel_source_only) + + # Want to make sure that the source_only_router finishes all process. + # Strictly speaking, we must make a forwarding feature for the source_only_router + # to make sure it. + # However, it would need exclusive lock for EventRouter and worsen its performance. + # So, just sleep a few seconds here. + sleep 5 + + shutdown(kind_or_agent_list: [@source_only_buffer_agent]) + @source_only_buffer_agent = nil + end + + if @source_only_buffer_agent + log.info "do nothing for canceling --with-source-only because it is already canceled, and the loading agent already exists" + return + end + + # TODO This agent should stop after flushing its all buffer. 
+ log.info "starts the loading agent for --with-source-only" + setup_source_only_buffer_agent(flush: true) + start(kind_or_agent_list: [@source_only_buffer_agent]) + end + + def shutdown(kind_or_agent_list: nil) + # Fluentd's shutdown sequence is stop, before_shutdown, shutdown, after_shutdown, close, terminate for plugins # These method callers does `rescue Exception` to call methods of shutdown sequence as far as possible # if plugin methods does something like infinite recursive call, `exit`, unregistering signal handlers or others. # Plugins should be separated and be in sandbox to protect data in each plugins/buffers. lifecycle_safe_sequence = ->(method, checker) { - lifecycle do |instance, kind| + lifecycle(kind_or_agent_list: kind_or_agent_list) do |instance, kind| begin log.debug "calling #{method} on #{kind} plugin", type: Plugin.lookup_type_from_class(instance.class), plugin_id: instance.plugin_id instance.__send__(method) unless instance.__send__(checker) @@ -260,7 +313,7 @@ def shutdown # Fluentd's shutdown sequence is stop, before_shutdown, shutdown, a operation_threads.each{|t| t.join } operation_threads.clear } - lifecycle(kind_callback: callback) do |instance, kind| + lifecycle(kind_callback: callback, kind_or_agent_list: kind_or_agent_list) do |instance, kind| t = Thread.new do Thread.current.abort_on_exception = true begin @@ -318,6 +371,7 @@ def add_source(type, conf) # See also 'fluentd/plugin/input.rb' input.context_router = @event_router input.configure(conf) + input.event_emitter_set_source_only if @with_source_only if @enable_input_metrics @event_router.add_metric_callbacks(input.plugin_id, Proc.new {|es| input.metric_callback(es) }) end diff --git a/lib/fluent/source_only_buffer_agent.rb b/lib/fluent/source_only_buffer_agent.rb new file mode 100644 index 0000000000..3e31082d40 --- /dev/null +++ b/lib/fluent/source_only_buffer_agent.rb @@ -0,0 +1,47 @@ +# +# Fluentd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you 
may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +require 'securerandom' + +require 'fluent/agent' +require 'fluent/system_config' + +module Fluent + class SourceOnlyBufferAgent < Agent + BUFFER_DIR_NAME = SecureRandom.uuid + + def initialize(log:, system_config:) + super(log: log) + + @buffer_path = File.join(system_config.root_dir || DEFAULT_BACKUP_DIR, 'source-only-buffer', BUFFER_DIR_NAME) + end + + def configure(flush: false) + super( + Config::Element.new('SOURCE_ONLY_BUFFER', '', {}, [ + Config::Element.new('match', '**', {'@type' => 'buffer', '@label' => '@ROOT', 'cleanup_after_shutdown' => 'true'}, [ + Config::Element.new('buffer', '', { + 'path' => @buffer_path, + 'flush_at_shutdown' => flush ? 'true' : 'false', + 'flush_thread_count' => flush ? 1 : 0, + 'overflow_action' => "drop_oldest_chunk", + }, []) + ]) + ]) + ) + end + end +end diff --git a/lib/fluent/supervisor.rb b/lib/fluent/supervisor.rb index d565abf600..c95ab125fa 100644 --- a/lib/fluent/supervisor.rb +++ b/lib/fluent/supervisor.rb @@ -189,6 +189,11 @@ def install_supervisor_signal_handlers $log.debug 'fluentd supervisor process got SIGUSR2' supervisor_sigusr2_handler end + + trap 34 do + $log.debug 'fluentd supervisor process got SIGRTMIN' + cancel_source_only + end end if Fluent.windows? 
@@ -312,6 +317,10 @@ def supervisor_sigusr2_handler $log.error "Failed to reload config file: #{e}" end + def cancel_source_only + send_signal_to_workers(34) + end + def supervisor_dump_handler_for_windows # As for UNIX-like, SIGCONT signal to each process makes the process output its dump-file, # and it is implemented before the implementation of the function for Windows. @@ -486,6 +495,7 @@ def self.default_options suppress_repeated_stacktrace: true, ignore_repeated_log_interval: nil, without_source: nil, + with_source_only: nil, enable_input_metrics: nil, enable_size_metrics: nil, use_v1_config: true, @@ -840,6 +850,10 @@ def install_main_process_signal_handlers trap :CONT do dump_non_windows end + + trap 34 do + cancel_source_only + end end end @@ -893,6 +907,16 @@ def flush_buffer end end + def cancel_source_only + Thread.new do + begin + Fluent::Engine.cancel_source_only! + rescue Exception => e + $log.warn "failed to cancel source only", error: e + end + end + end + def reload_config Thread.new do $log.debug('worker got SIGUSR2') diff --git a/lib/fluent/system_config.rb b/lib/fluent/system_config.rb index 917889018d..99e7e4742d 100644 --- a/lib/fluent/system_config.rb +++ b/lib/fluent/system_config.rb @@ -25,7 +25,7 @@ class SystemConfig :workers, :restart_worker_interval, :root_dir, :log_level, :suppress_repeated_stacktrace, :emit_error_log_interval, :suppress_config_dump, :log_event_verbose, :ignore_repeated_log_interval, :ignore_same_log_interval, - :without_source, :rpc_endpoint, :enable_get_dump, :process_name, + :without_source, :with_source_only, :rpc_endpoint, :enable_get_dump, :process_name, :file_permission, :dir_permission, :counter_server, :counter_client, :strict_config_value, :enable_msgpack_time_support, :disable_shared_socket, :metrics, :enable_input_metrics, :enable_size_metrics, :enable_jit @@ -41,7 +41,8 @@ class SystemConfig config_param :emit_error_log_interval, :time, default: nil config_param :suppress_config_dump, :bool, default: nil 
config_param :log_event_verbose, :bool, default: nil - config_param :without_source, :bool, default: nil + config_param :without_source, :bool, default: nil + config_param :with_source_only, :bool, default: nil config_param :rpc_endpoint, :string, default: nil config_param :enable_get_dump, :bool, default: nil config_param :process_name, :string, default: nil