Skip to content

Commit

Permalink
filter_parser: add error event for multiple parsed results
Browse files Browse the repository at this point in the history
It would be not happy if some events are lost without any
warnings or errors.

Signed-off-by: Daijiro Fukuda <[email protected]>
  • Loading branch information
daipom committed Apr 30, 2024
1 parent 35e2210 commit bf7c1b8
Showing 1 changed file with 26 additions and 12 deletions.
38 changes: 26 additions & 12 deletions lib/fluent/plugin/filter_parser.rb
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,13 @@ def filter_with_time(tag, time, record)
end
end
begin
# Note: https://github.com/fluent/fluentd/issues/4100
# If the parser returns multiple records from one raw_value,
# this returns only the first one record.
# This should be fixed in the future version.
result_time = nil
result_record = nil

@parser.parse(raw_value) do |t, values|
if values
t = if @reserve_time
Expand All @@ -79,24 +86,31 @@ def filter_with_time(tag, time, record)
end
@accessor.delete(record) if @remove_key_name_field
r = handle_parsed(tag, record, t, values)
# Note: https://github.com/fluent/fluentd/issues/4100
# If the parser returns multiple records from one raw_value,
# this returns only the first one record.
# This should be fixed in the future version.
return t, r

if result_record.nil?
result_time = t
result_record = r
else
if @emit_invalid_record_to_error
router.emit_error_event(tag, t, r, Fluent::Plugin::Parser::ParserError.new(
"Could not emit the event. The parser returned multiple results, but currently filter_parser plugin only returns the first parsed result. Raw data: '#{raw_value}'"
))
end
end
else
if @emit_invalid_record_to_error
router.emit_error_event(tag, time, record, Fluent::Plugin::Parser::ParserError.new("pattern not matched with data '#{raw_value}'"))
end
if @reserve_data
t = time
r = handle_parsed(tag, record, time, {})
return t, r
else
return FAILED_RESULT
end

next unless @reserve_data
next unless result_record.nil?

result_time = time
result_record = handle_parsed(tag, record, time, {})
end
end

return result_time, result_record
rescue Fluent::Plugin::Parser::ParserError => e
if @emit_invalid_record_to_error
raise e
Expand Down

0 comments on commit bf7c1b8

Please sign in to comment.