From 919e91b6a9251ac227ac62a778f58cae25bdb2ab Mon Sep 17 00:00:00 2001 From: Athish Pranav D Date: Thu, 29 Aug 2024 11:54:01 +0530 Subject: [PATCH] Transition filter_parser from single record to stream Signed-off-by: Athish Pranav D --- lib/fluent/plugin/filter_parser.rb | 122 ++++++++++++++--------------- 1 file changed, 59 insertions(+), 63 deletions(-) diff --git a/lib/fluent/plugin/filter_parser.rb b/lib/fluent/plugin/filter_parser.rb index 6998a38286..9da07812dc 100644 --- a/lib/fluent/plugin/filter_parser.rb +++ b/lib/fluent/plugin/filter_parser.rb @@ -57,78 +57,74 @@ def configure(conf) FAILED_RESULT = [nil, nil].freeze # reduce allocation cost REPLACE_CHAR = '?'.freeze - def filter_with_time(tag, time, record) - raw_value = @accessor.call(record) - if raw_value.nil? - if @emit_invalid_record_to_error - router.emit_error_event(tag, time, record, ArgumentError.new("#{@key_name} does not exist")) - end - if @reserve_data - return time, handle_parsed(tag, record, time, {}) - else - return FAILED_RESULT - end - end - begin - # Note: https://github.com/fluent/fluentd/issues/4100 - # If the parser returns multiple records from one raw_value, - # this returns only the first one record. - # This should be fixed in the future version. - result_time = nil - result_record = nil - - @parser.parse(raw_value) do |t, values| - if values - t = if @reserve_time - time + def filter_stream(tag, es) + new_es = Fluent::MultiEventStream.new + es.each do |time, record| + begin + raw_value = @accessor.call(record) + if raw_value.nil? + if @emit_invalid_record_to_error + router.emit_error_event(tag, time, record, ArgumentError.new("#{@key_name} does not exist")) + end + if @reserve_data + new_es.add(time, handle_parsed(tag, record, time, {})) + end + next + end + begin + result_time = nil + result_record = nil + + @parser.parse(raw_value) do |t, values| + if values + t = if @reserve_time + time + else + t.nil? ? time : t + end + @accessor.delete(record) if @remove_key_name_field + r = handle_parsed(tag, record, t, values) + + if result_record.nil? + result_time = t + result_record = r else - t.nil? ? time : t + if @emit_invalid_record_to_error + router.emit_error_event(tag, t, r, Fluent::Plugin::Parser::ParserError.new( + "Could not emit the event. The parser returned multiple results, but currently filter_parser plugin only returns the first parsed result. Raw data: '#{raw_value}'" + )) + end end - @accessor.delete(record) if @remove_key_name_field - r = handle_parsed(tag, record, t, values) - - if result_record.nil? - result_time = t - result_record = r - else - if @emit_invalid_record_to_error - router.emit_error_event(tag, t, r, Fluent::Plugin::Parser::ParserError.new( - "Could not emit the event. The parser returned multiple results, but currently filter_parser plugin only returns the first parsed result. Raw data: '#{raw_value}'" - )) + else + if @emit_invalid_record_to_error + router.emit_error_event(tag, time, record, Fluent::Plugin::Parser::ParserError.new("pattern not matched with data '#{raw_value}'")) + end + + next unless @reserve_data + next unless result_record.nil? + + result_time = time + result_record = handle_parsed(tag, record, time, {}) end + new_es.add(result_time, result_record) end - else + + rescue Fluent::Plugin::Parser::ParserError => e if @emit_invalid_record_to_error - router.emit_error_event(tag, time, record, Fluent::Plugin::Parser::ParserError.new("pattern not matched with data '#{raw_value}'")) + raise e end + rescue ArgumentError => e + raise unless @replace_invalid_sequence + raise unless e.message.index("invalid byte sequence in") == 0 - next unless @reserve_data - next unless result_record.nil? - - result_time = time - result_record = handle_parsed(tag, record, time, {}) + raw_value = raw_value.scrub(REPLACE_CHAR) + retry + rescue => e + if @emit_invalid_record_to_error + raise Fluent::Plugin::Parser::ParserError, "parse failed #{e.message}" + end end end - - return result_time, result_record - rescue Fluent::Plugin::Parser::ParserError => e - if @emit_invalid_record_to_error - raise e - else - return FAILED_RESULT - end - rescue ArgumentError => e - raise unless @replace_invalid_sequence - raise unless e.message.index("invalid byte sequence in") == 0 - - raw_value = raw_value.scrub(REPLACE_CHAR) - retry - rescue => e - if @emit_invalid_record_to_error - raise Fluent::Plugin::Parser::ParserError, "parse failed #{e.message}" - else - return FAILED_RESULT - end end end