Skip to content

Commit

Permalink
fluentd command: add --with-source-only
Browse files Browse the repository at this point in the history
It launches Fluentd with Input plugins only.
Here is the specification.

* Those Input plugins emits the events to SourceOnlyBufferAgent.
* The events is kept in the buf_file during the source-only-mode.
* SIGRTMIN(34) cancels the mode.
* After canceled, the new agent starts to load the buffer.

Signed-off-by: Daijiro Fukuda <[email protected]>
  • Loading branch information
daipom committed Oct 10, 2024
1 parent 5c3b71f commit 7ab49c3
Show file tree
Hide file tree
Showing 10 changed files with 247 additions and 20 deletions.
4 changes: 4 additions & 0 deletions lib/fluent/command/fluentd.rb
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,10 @@
cmd_opts[:without_source] = b
}

op.on('--with-source-only', "invoke a fluentd only with input plugins", TrueClass) {|b|
cmd_opts[:with_source_only] = b
}

op.on('--config-file-type VALU', 'guessing file type of fluentd configuration. yaml/yml or guess') { |s|
if (s == 'yaml') || (s == 'yml')
cmd_opts[:config_file_type] = s.to_sym
Expand Down
4 changes: 4 additions & 0 deletions lib/fluent/engine.rb
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,10 @@ def flush!
@root_agent.flush!
end

def cancel_source_only!
@root_agent.cancel_source_only!
end

def now
# TODO thread update
Fluent::EventTime.now
Expand Down
2 changes: 2 additions & 0 deletions lib/fluent/plugin/buf_file.rb
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ class FileBuffer < Fluent::Plugin::Buffer
config_param :file_permission, :string, default: nil # '0644' (Fluent::DEFAULT_FILE_PERMISSION)
config_param :dir_permission, :string, default: nil # '0755' (Fluent::DEFAULT_DIR_PERMISSION)

attr_reader :buffer_path

def initialize
super
@symlink_path = nil
Expand Down
78 changes: 78 additions & 0 deletions lib/fluent/plugin/out_buffer.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
#
# Fluentd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

require 'fluent/plugin/output'

module Fluent::Plugin
class BufferOutput < Output
Fluent::Plugin.register_output("buffer", self)
helpers :event_emitter

desc "Try to remove the buffer directory after terminate." +
" Mainly for the --with-source-only feature."
config_param :cleanup_after_shutdown, :bool, default: false

config_section :buffer do
config_set_default :@type, "file"
config_set_default :chunk_keys, ["tag"]
config_set_default :flush_mode, :interval
config_set_default :flush_interval, 10
end

def multi_workers_ready?
true
end

def initialize
super

@buffer_path = nil
end

def configure(conf)
super

raise Fluent::ConfigError, "The buffer plugin does not be supported. It must have 'buffer_path' getter. You can use 'file' buffer." unless @buffer.respond_to?(:buffer_path)

@buffer_path = @buffer.buffer_path
end

def write(chunk)
return if chunk.empty?
router.emit_stream(chunk.metadata.tag, Fluent::MessagePackEventStream.new(chunk.read))
end

def terminate
super

return unless cleanup_after_shutdown

unless Dir.empty?(@buffer_path)
if @buffer_config.flush_at_shutdown
log.warn "some buffer files remain in #{@buffer_path}, so cancel cleanup the directory." +
" Please consider saving or recovering the buffer files in the directory."
end
return
end

begin
FileUtils.remove_dir(@buffer_path)
rescue => e
log.warn "failed to remove the empty buffer dirctory: #{@buffer_path}", error: e
end
end
end
end
1 change: 1 addition & 0 deletions lib/fluent/plugin/output.rb
Original file line number Diff line number Diff line change
Expand Up @@ -1384,6 +1384,7 @@ def retry_state(randomize)
end

def submit_flush_once
return unless @buffer_config.flush_thread_count > 0
# Without locks: it is rough but enough to select "next" writer selection
@output_flush_thread_current_position = (@output_flush_thread_current_position + 1) % @buffer_config.flush_thread_count
state = @output_flush_threads[@output_flush_thread_current_position]
Expand Down
12 changes: 12 additions & 0 deletions lib/fluent/plugin_helper/event_emitter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ module EventEmitter

def router
@_event_emitter_used_actually = true

return Engine.root_agent.source_only_router if @_event_emitter_force_source_only_router

if @_event_emitter_lazy_init
@router = @primary_instance.router
end
Expand All @@ -48,6 +51,14 @@ def event_emitter_used_actually?
@_event_emitter_used_actually
end

def event_emitter_set_source_only
@_event_emitter_force_source_only_router = true
end

def event_emitter_cancel_source_only
@_event_emitter_force_source_only_router = false
end

def event_emitter_router(label_name)
if label_name
if label_name == "@ROOT"
Expand All @@ -72,6 +83,7 @@ def initialize
super
@_event_emitter_used_actually = false
@_event_emitter_lazy_init = false
@_event_emitter_force_source_only_router = false
@router = nil
end

Expand Down
90 changes: 72 additions & 18 deletions lib/fluent/root_agent.rb
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
require 'fluent/plugin'
require 'fluent/system_config'
require 'fluent/time'
require 'fluent/source_only_buffer_agent'

module Fluent
#
Expand Down Expand Up @@ -54,17 +55,22 @@ def initialize(log:, system_config: SystemConfig.new)
@inputs = []
@suppress_emit_error_log_interval = 0
@next_emit_error_log_time = nil
@without_source = false
@enable_input_metrics = false
@without_source = system_config.without_source || false
@with_source_only = system_config.with_source_only || false
@source_only_buffer_agent = nil
@enable_input_metrics = system_config.enable_input_metrics || false

suppress_interval(system_config.emit_error_log_interval) unless system_config.emit_error_log_interval.nil?
@without_source = system_config.without_source unless system_config.without_source.nil?
@enable_input_metrics = !!system_config.enable_input_metrics
end

attr_reader :inputs
attr_reader :labels

def source_only_router
raise "[BUG] 'RootAgent#source_only_router' should not be called when 'with_source_only' is false" unless @with_source_only
@source_only_buffer_agent.event_router
end

def configure(conf)
used_worker_ids = []
available_worker_ids = (0..Fluent::Engine.system_config.workers - 1).to_a
Expand Down Expand Up @@ -148,6 +154,8 @@ def configure(conf)

super

setup_source_only_buffer_agent if @with_source_only

# initialize <source> elements
if @without_source
log.info :worker0, "'--without-source' is applied. Ignore <source> sections"
Expand All @@ -169,16 +177,29 @@ def setup_error_label(e)
@error_collector = error_label.event_router
end

def lifecycle(desc: false, kind_callback: nil)
kind_or_label_list = if desc
[:output, :filter, @labels.values.reverse, :output_with_router, :input].flatten
else
[:input, :output_with_router, @labels.values, :filter, :output].flatten
end
kind_or_label_list.each do |kind|
def setup_source_only_buffer_agent(flush: false)
@source_only_buffer_agent = SourceOnlyBufferAgent.new(log: log, system_config: Fluent::Engine.system_config)
@source_only_buffer_agent.configure(flush: flush)
end

def lifecycle(desc: false, kind_callback: nil, kind_or_agent_list: nil)
unless kind_or_agent_list
if @with_source_only
kind_or_agent_list = [:input, @source_only_buffer_agent]
elsif @source_only_buffer_agent
# source_only_buffer_agent can re-reroute events, so the priority is equal to output_with_router.
kind_or_agent_list = [:input, :output_with_router, @source_only_buffer_agent, @labels.values, :filter, :output].flatten
else
kind_or_agent_list = [:input, :output_with_router, @labels.values, :filter, :output].flatten
end

kind_or_agent_list.reverse! if desc
end

kind_or_agent_list.each do |kind|
if kind.respond_to?(:lifecycle)
label = kind
label.lifecycle(desc: desc) do |plugin, display_kind|
agent = kind
agent.lifecycle(desc: desc) do |plugin, display_kind|
yield plugin, display_kind
end
else
Expand All @@ -198,8 +219,8 @@ def lifecycle(desc: false, kind_callback: nil)
end
end

def start
lifecycle(desc: true) do |i| # instance
def start(kind_or_agent_list: nil)
lifecycle(desc: true, kind_or_agent_list: kind_or_agent_list) do |i| # instance
i.start unless i.started?
# Input#start sometimes emits lots of events with in_tail/`read_from_head true` case
# and it causes deadlock for small buffer/queue output. To avoid such problem,
Expand Down Expand Up @@ -231,13 +252,45 @@ def flush!
flushing_threads.each{|t| t.join }
end

def shutdown # Fluentd's shutdown sequence is stop, before_shutdown, shutdown, after_shutdown, close, terminate for plugins
def cancel_source_only!
# TODO exclusive lock
if @with_source_only
log.info "cancel --with-source-only mode and start the other plugins"
@with_source_only = false
start

lifecycle_control_list[:input].each(&:event_emitter_cancel_source_only)

# Want to make sure that the source_only_router finishes all process.
# Strictly speaking, we must make a forwarding feature for the source_only_router
# to make sure it.
# However, it would need exclusive lock for EventRouter and worsen its performance.
# So, just sleep a few seconds here.
sleep 5

shutdown(kind_or_agent_list: [@source_only_buffer_agent])
@source_only_buffer_agent = nil
end

if @source_only_buffer_agent
log.info "do nothing for canceling --with-source-only because it is already canceled, and the loading agent already exists"
return
end

# TODO This agent should stop after flushing its all buffer.
log.info "starts the loading agent for --with-source-only"
setup_source_only_buffer_agent(flush: true)
start(kind_or_agent_list: [@source_only_buffer_agent])
end

def shutdown(kind_or_agent_list: nil)
# Fluentd's shutdown sequence is stop, before_shutdown, shutdown, after_shutdown, close, terminate for plugins
# These method callers does `rescue Exception` to call methods of shutdown sequence as far as possible
# if plugin methods does something like infinite recursive call, `exit`, unregistering signal handlers or others.
# Plugins should be separated and be in sandbox to protect data in each plugins/buffers.

lifecycle_safe_sequence = ->(method, checker) {
lifecycle do |instance, kind|
lifecycle(kind_or_agent_list: kind_or_agent_list) do |instance, kind|
begin
log.debug "calling #{method} on #{kind} plugin", type: Plugin.lookup_type_from_class(instance.class), plugin_id: instance.plugin_id
instance.__send__(method) unless instance.__send__(checker)
Expand All @@ -260,7 +313,7 @@ def shutdown # Fluentd's shutdown sequence is stop, before_shutdown, shutdown, a
operation_threads.each{|t| t.join }
operation_threads.clear
}
lifecycle(kind_callback: callback) do |instance, kind|
lifecycle(kind_callback: callback, kind_or_agent_list: kind_or_agent_list) do |instance, kind|
t = Thread.new do
Thread.current.abort_on_exception = true
begin
Expand Down Expand Up @@ -318,6 +371,7 @@ def add_source(type, conf)
# See also 'fluentd/plugin/input.rb'
input.context_router = @event_router
input.configure(conf)
input.event_emitter_set_source_only if @with_source_only
if @enable_input_metrics
@event_router.add_metric_callbacks(input.plugin_id, Proc.new {|es| input.metric_callback(es) })
end
Expand Down
47 changes: 47 additions & 0 deletions lib/fluent/source_only_buffer_agent.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
#
# Fluentd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

require 'securerandom'

require 'fluent/agent'
require 'fluent/system_config'

module Fluent
class SourceOnlyBufferAgent < Agent
BUFFER_DIR_NAME = SecureRandom.uuid

def initialize(log:, system_config:)
super(log: log)

@buffer_path = File.join(system_config.root_dir || DEFAULT_BACKUP_DIR, 'source-only-buffer', BUFFER_DIR_NAME)
end

def configure(flush: false)
super(
Config::Element.new('SOURCE_ONLY_BUFFER', '', {}, [
Config::Element.new('match', '**', {'@type' => 'buffer', '@label' => '@ROOT', 'cleanup_after_shutdown' => 'true'}, [
Config::Element.new('buffer', '', {
'path' => @buffer_path,
'flush_at_shutdown' => flush ? 'true' : 'false',
'flush_thread_count' => flush ? 1 : 0,
'overflow_action' => "drop_oldest_chunk",
}, [])
])
])
)
end
end
end
Loading

0 comments on commit 7ab49c3

Please sign in to comment.