Skip to content

Commit

Permalink
Add with-source-only feature
Browse files Browse the repository at this point in the history
* Mainly intended to be used by command option `--with-source-only`
* We can use system-config option `with_source_only` as well
* Not supported on Windows

It launches Fluentd with Input plugins only.
Here is the specification.

* Those Input plugins emits the events to SourceOnlyBufferAgent.
* The events is kept in the buf_file during the source-only-mode.
* Send SIGWINCH to the supervisor to cancels the mode.
* After canceled, the new agent starts to load the buffer.

Signed-off-by: Daijiro Fukuda <[email protected]>
  • Loading branch information
daipom committed Nov 7, 2024
1 parent 5c3b71f commit 7fd34a2
Show file tree
Hide file tree
Showing 9 changed files with 256 additions and 24 deletions.
6 changes: 6 additions & 0 deletions lib/fluent/command/fluentd.rb
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,12 @@
cmd_opts[:without_source] = b
}

unless Fluent.windows?
op.on('--with-source-only', "Invoke a fluentd only with input plugins. The data is stored in a temporary buffer. Send SIGWINCH to cancel this mode and process the data (Not supported on Windows).", TrueClass) {|b|
cmd_opts[:with_source_only] = b
}
end

op.on('--config-file-type VALU', 'guessing file type of fluentd configuration. yaml/yml or guess') { |s|
if (s == 'yaml') || (s == 'yml')
cmd_opts[:config_file_type] = s.to_sym
Expand Down
24 changes: 20 additions & 4 deletions lib/fluent/engine.rb
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ def initialize
@system_config = SystemConfig.new

@supervisor_mode = false

@root_agent_mutex = Mutex.new
end

MAINLOOP_SLEEP_INTERVAL = 0.3
Expand Down Expand Up @@ -133,7 +135,15 @@ def emit_stream(tag, es)
end

def flush!
@root_agent.flush!
@root_agent_mutex.synchronize do
@root_agent.flush!
end
end

def cancel_source_only!
@root_agent_mutex.synchronize do
@root_agent.cancel_source_only!
end
end

def now
Expand Down Expand Up @@ -230,7 +240,9 @@ def stop_phase(root_agent)
@fluent_log_event_router.graceful_stop
end
$log.info 'shutting down fluentd worker', worker: worker_id
root_agent.shutdown
@root_agent_mutex.synchronize do
root_agent.shutdown
end

@fluent_log_event_router.stop
end
Expand All @@ -241,11 +253,15 @@ def start_phase(root_agent)
$log.enable_event(true)
end

@root_agent.start
@root_agent_mutex.synchronize do
@root_agent.start
end
end

def start
@root_agent.start
@root_agent_mutex.synchronize do
@root_agent.start
end
end
end

Expand Down
40 changes: 40 additions & 0 deletions lib/fluent/plugin/out_buffer.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
#
# Fluentd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

require 'fluent/plugin/output'

module Fluent::Plugin
class BufferOutput < Output
Fluent::Plugin.register_output("buffer", self)
helpers :event_emitter

config_section :buffer do
config_set_default :@type, "file"
config_set_default :chunk_keys, ["tag"]
config_set_default :flush_mode, :interval
config_set_default :flush_interval, 10
end

def multi_workers_ready?
true
end

def write(chunk)
return if chunk.empty?
router.emit_stream(chunk.metadata.tag, Fluent::MessagePackEventStream.new(chunk.read))
end
end
end
2 changes: 2 additions & 0 deletions lib/fluent/plugin/output.rb
Original file line number Diff line number Diff line change
Expand Up @@ -1384,6 +1384,7 @@ def retry_state(randomize)
end

def submit_flush_once
return unless @buffer_config.flush_thread_count > 0
# Without locks: it is rough but enough to select "next" writer selection
@output_flush_thread_current_position = (@output_flush_thread_current_position + 1) % @buffer_config.flush_thread_count
state = @output_flush_threads[@output_flush_thread_current_position]
Expand All @@ -1406,6 +1407,7 @@ def force_flush
end

def submit_flush_all
return unless @buffer_config.flush_thread_count > 0
while !@retry && @buffer.queued?
submit_flush_once
sleep @buffer_config.flush_thread_burst_interval
Expand Down
12 changes: 12 additions & 0 deletions lib/fluent/plugin_helper/event_emitter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ module EventEmitter

def router
@_event_emitter_used_actually = true

return Engine.root_agent.source_only_router if @_event_emitter_force_source_only_router

if @_event_emitter_lazy_init
@router = @primary_instance.router
end
Expand All @@ -48,6 +51,14 @@ def event_emitter_used_actually?
@_event_emitter_used_actually
end

def event_emitter_set_source_only
@_event_emitter_force_source_only_router = true
end

def event_emitter_cancel_source_only
@_event_emitter_force_source_only_router = false
end

def event_emitter_router(label_name)
if label_name
if label_name == "@ROOT"
Expand All @@ -72,6 +83,7 @@ def initialize
super
@_event_emitter_used_actually = false
@_event_emitter_lazy_init = false
@_event_emitter_force_source_only_router = false
@router = nil
end

Expand Down
104 changes: 86 additions & 18 deletions lib/fluent/root_agent.rb
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
require 'fluent/plugin'
require 'fluent/system_config'
require 'fluent/time'
require 'fluent/source_only_buffer_agent'

module Fluent
#
Expand Down Expand Up @@ -54,17 +55,30 @@ def initialize(log:, system_config: SystemConfig.new)
@inputs = []
@suppress_emit_error_log_interval = 0
@next_emit_error_log_time = nil
@without_source = false
@enable_input_metrics = false
@without_source = system_config.without_source || false
@source_only_buffer_agent = nil
@enable_input_metrics = system_config.enable_input_metrics || false

@with_source_only = false
if system_config.with_source_only
if Fluent.windows?
log.warn "with-source-only is disabled because it is not supported on Windows"
else
@with_source_only = true
end
end

suppress_interval(system_config.emit_error_log_interval) unless system_config.emit_error_log_interval.nil?
@without_source = system_config.without_source unless system_config.without_source.nil?
@enable_input_metrics = !!system_config.enable_input_metrics
end

attr_reader :inputs
attr_reader :labels

def source_only_router
raise "[BUG] 'RootAgent#source_only_router' should not be called when 'with_source_only' is false" unless @with_source_only
@source_only_buffer_agent.event_router
end

def configure(conf)
used_worker_ids = []
available_worker_ids = (0..Fluent::Engine.system_config.workers - 1).to_a
Expand Down Expand Up @@ -148,6 +162,8 @@ def configure(conf)

super

setup_source_only_buffer_agent if @with_source_only

# initialize <source> elements
if @without_source
log.info :worker0, "'--without-source' is applied. Ignore <source> sections"
Expand All @@ -169,16 +185,33 @@ def setup_error_label(e)
@error_collector = error_label.event_router
end

def lifecycle(desc: false, kind_callback: nil)
kind_or_label_list = if desc
[:output, :filter, @labels.values.reverse, :output_with_router, :input].flatten
else
[:input, :output_with_router, @labels.values, :filter, :output].flatten
end
kind_or_label_list.each do |kind|
def setup_source_only_buffer_agent(flush: false)
@source_only_buffer_agent = SourceOnlyBufferAgent.new(log: log, system_config: Fluent::Engine.system_config)
@source_only_buffer_agent.configure(flush: flush)
end

def cleanup_source_only_buffer_agent
@source_only_buffer_agent&.cleanup
end

def lifecycle(desc: false, kind_callback: nil, kind_or_agent_list: nil)
unless kind_or_agent_list
if @with_source_only
kind_or_agent_list = [:input, @source_only_buffer_agent]
elsif @source_only_buffer_agent
# source_only_buffer_agent can re-reroute events, so the priority is equal to output_with_router.
kind_or_agent_list = [:input, :output_with_router, @source_only_buffer_agent, @labels.values, :filter, :output].flatten
else
kind_or_agent_list = [:input, :output_with_router, @labels.values, :filter, :output].flatten
end

kind_or_agent_list.reverse! if desc
end

kind_or_agent_list.each do |kind|
if kind.respond_to?(:lifecycle)
label = kind
label.lifecycle(desc: desc) do |plugin, display_kind|
agent = kind
agent.lifecycle(desc: desc) do |plugin, display_kind|
yield plugin, display_kind
end
else
Expand All @@ -198,8 +231,8 @@ def lifecycle(desc: false, kind_callback: nil)
end
end

def start
lifecycle(desc: true) do |i| # instance
def start(kind_or_agent_list: nil)
lifecycle(desc: true, kind_or_agent_list: kind_or_agent_list) do |i| # instance
i.start unless i.started?
# Input#start sometimes emits lots of events with in_tail/`read_from_head true` case
# and it causes deadlock for small buffer/queue output. To avoid such problem,
Expand Down Expand Up @@ -231,13 +264,45 @@ def flush!
flushing_threads.each{|t| t.join }
end

def shutdown # Fluentd's shutdown sequence is stop, before_shutdown, shutdown, after_shutdown, close, terminate for plugins
def cancel_source_only!
unless @with_source_only
log.info "do nothing for canceling with-source-only because the current mode is not with-source-only."
return
end

log.info "cancel with-source-only mode and start the other plugins"
start

lifecycle_control_list[:input].each(&:event_emitter_cancel_source_only)

# Want to make sure that the source_only_router finishes all process before
# shutting down the agent.
# Strictly speaking, it would be necessary to have exclusive lock between
# EventRouter and the shutting down process of this agent.
# However, adding lock to EventRouter would worsen its performance, and
# the entire shutting down process does not care about it either.
# So, just sleep a few seconds here, though it may not be strictly safe.
sleep 5

shutdown(kind_or_agent_list: [@source_only_buffer_agent])
@source_only_buffer_agent = nil

# This agent can stop after flushing its all buffer, but it is not implemented for now.
log.info "starts the loading agent for with-source-only"
setup_source_only_buffer_agent(flush: true)
start(kind_or_agent_list: [@source_only_buffer_agent])

@with_source_only = false
end

def shutdown(kind_or_agent_list: nil)
# Fluentd's shutdown sequence is stop, before_shutdown, shutdown, after_shutdown, close, terminate for plugins
# These method callers does `rescue Exception` to call methods of shutdown sequence as far as possible
# if plugin methods does something like infinite recursive call, `exit`, unregistering signal handlers or others.
# Plugins should be separated and be in sandbox to protect data in each plugins/buffers.

lifecycle_safe_sequence = ->(method, checker) {
lifecycle do |instance, kind|
lifecycle(kind_or_agent_list: kind_or_agent_list) do |instance, kind|
begin
log.debug "calling #{method} on #{kind} plugin", type: Plugin.lookup_type_from_class(instance.class), plugin_id: instance.plugin_id
instance.__send__(method) unless instance.__send__(checker)
Expand All @@ -260,7 +325,7 @@ def shutdown # Fluentd's shutdown sequence is stop, before_shutdown, shutdown, a
operation_threads.each{|t| t.join }
operation_threads.clear
}
lifecycle(kind_callback: callback) do |instance, kind|
lifecycle(kind_callback: callback, kind_or_agent_list: kind_or_agent_list) do |instance, kind|
t = Thread.new do
Thread.current.abort_on_exception = true
begin
Expand Down Expand Up @@ -301,6 +366,8 @@ def shutdown # Fluentd's shutdown sequence is stop, before_shutdown, shutdown, a
lifecycle_unsafe_sequence.call(:close, :closed?)

lifecycle_safe_sequence.call(:terminate, :terminated?)

cleanup_source_only_buffer_agent unless kind_or_agent_list
end

def suppress_interval(interval_time)
Expand All @@ -318,6 +385,7 @@ def add_source(type, conf)
# See also 'fluentd/plugin/input.rb'
input.context_router = @event_router
input.configure(conf)
input.event_emitter_set_source_only if @with_source_only
if @enable_input_metrics
@event_router.add_metric_callbacks(input.plugin_id, Proc.new {|es| input.metric_callback(es) })
end
Expand Down
61 changes: 61 additions & 0 deletions lib/fluent/source_only_buffer_agent.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
#
# Fluentd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

require 'securerandom'

require 'fluent/agent'
require 'fluent/system_config'

module Fluent
class SourceOnlyBufferAgent < Agent
BUFFER_DIR_NAME = SecureRandom.uuid

def initialize(log:, system_config:)
super(log: log)

@buffer_path = File.join(system_config.root_dir || DEFAULT_BACKUP_DIR, 'source-only-buffer', BUFFER_DIR_NAME)
end

def configure(flush: false)
super(
Config::Element.new('SOURCE_ONLY_BUFFER', '', {}, [
Config::Element.new('match', '**', {'@type' => 'buffer', '@label' => '@ROOT'}, [
Config::Element.new('buffer', '', {
'path' => @buffer_path,
'flush_at_shutdown' => flush ? 'true' : 'false',
'flush_thread_count' => flush ? 1 : 0,
'overflow_action' => "drop_oldest_chunk",
}, [])
])
])
)
end

def cleanup
unless Dir.empty?(@buffer_path)
log.warn "some buffer files remain in #{@buffer_path}. They need to be manually recovered." +
" Please consider recovering or saving the buffer files in the directory."
return
end

begin
FileUtils.remove_dir(@buffer_path)
rescue => e
log.warn "failed to remove the empty buffer dirctory: #{@buffer_path}", error: e
end
end
end
end
Loading

0 comments on commit 7fd34a2

Please sign in to comment.