diff --git a/lib/fluent/plugin/filter_parser.rb b/lib/fluent/plugin/filter_parser.rb index 6998a38286..94370314b1 100644 --- a/lib/fluent/plugin/filter_parser.rb +++ b/lib/fluent/plugin/filter_parser.rb @@ -54,29 +54,32 @@ def configure(conf) @parser = parser_create end - FAILED_RESULT = [nil, nil].freeze # reduce allocation cost REPLACE_CHAR = '?'.freeze - def filter_with_time(tag, time, record) - raw_value = @accessor.call(record) - if raw_value.nil? - if @emit_invalid_record_to_error - router.emit_error_event(tag, time, record, ArgumentError.new("#{@key_name} does not exist")) - end - if @reserve_data - return time, handle_parsed(tag, record, time, {}) - else - return FAILED_RESULT + def filter_stream(tag, es) + new_es = Fluent::MultiEventStream.new + es.each do |time, record| + begin + raw_value = @accessor.call(record) + if raw_value.nil? + new_es.add(time, handle_parsed(tag, record, time, {})) if @reserve_data + raise ArgumentError, "#{@key_name} does not exist" + else + filter_one_record(tag, time, record, raw_value) do |result_time, result_record| + new_es.add(result_time, result_record) + end + end + rescue => e + router.emit_error_event(tag, time, record, e) if @emit_invalid_record_to_error end end - begin - # Note: https://github.com/fluent/fluentd/issues/4100 - # If the parser returns multiple records from one raw_value, - # this returns only the first one record. - # This should be fixed in the future version. - result_time = nil - result_record = nil + new_es + end + + private + def filter_one_record(tag, time, record, raw_value) + begin @parser.parse(raw_value) do |t, values| if values t = if @reserve_time @@ -85,38 +88,17 @@ def filter_with_time(tag, time, record) t.nil? ? time : t end @accessor.delete(record) if @remove_key_name_field - r = handle_parsed(tag, record, t, values) - - if result_record.nil? - result_time = t - result_record = r - else - if @emit_invalid_record_to_error - router.emit_error_event(tag, t, r, Fluent::Plugin::Parser::ParserError.new( - "Could not emit the event. The parser returned multiple results, but currently filter_parser plugin only returns the first parsed result. Raw data: '#{raw_value}'" - )) - end - end else - if @emit_invalid_record_to_error - router.emit_error_event(tag, time, record, Fluent::Plugin::Parser::ParserError.new("pattern not matched with data '#{raw_value}'")) - end - + router.emit_error_event(tag, time, record, Fluent::Plugin::Parser::ParserError.new("pattern not matched with data '#{raw_value}'")) if @emit_invalid_record_to_error next unless @reserve_data - next unless result_record.nil? - - result_time = time - result_record = handle_parsed(tag, record, time, {}) + t = time + values = {} end + yield(t, handle_parsed(tag, record, t, values)) end - return result_time, result_record rescue Fluent::Plugin::Parser::ParserError => e - if @emit_invalid_record_to_error - raise e - else - return FAILED_RESULT - end + raise e rescue ArgumentError => e raise unless @replace_invalid_sequence raise unless e.message.index("invalid byte sequence in") == 0 @@ -124,16 +106,10 @@ def filter_with_time(tag, time, record) raw_value = raw_value.scrub(REPLACE_CHAR) retry rescue => e - if @emit_invalid_record_to_error - raise Fluent::Plugin::Parser::ParserError, "parse failed #{e.message}" - else - return FAILED_RESULT - end + raise Fluent::Plugin::Parser::ParserError, "parse failed #{e.message}" end end - private - def handle_parsed(tag, record, t, values) if values && @inject_key_prefix values = Hash[values.map { |k, v| [@inject_key_prefix + k, v] }] diff --git a/test/plugin/test_filter_parser.rb b/test/plugin/test_filter_parser.rb index 3b538798b6..d96e44a397 100644 --- a/test/plugin/test_filter_parser.rb +++ b/test/plugin/test_filter_parser.rb @@ -206,6 +206,23 @@ def test_filter end + def test_filter_with_multiple_records + d1 = create_driver(%[ + key_name data + + @type json + + ]) + time = Fluent::EventTime.from_time(@default_time) + d1.run(default_tag: @tag) do + d1.feed(time, {'data' => '[{"xxx_1":"first","yyy":"second"}, {"xxx_2":"first", "yyy_2":"second"}]'}) + end + filtered = d1.filtered + assert_equal 2, filtered.length + assert_equal ({"xxx_1"=>"first", "yyy"=>"second"}), filtered[0][1] + assert_equal ({"xxx_2"=>"first", "yyy_2"=>"second"}), filtered[1][1] + end + data(:keep_key_name => false, :remove_key_name => true) def test_filter_with_reserved_data(remove_key_name)