diff --git a/lib/fluent/plugin/in_http.rb b/lib/fluent/plugin/in_http.rb index e738f2db69..92148fd8a2 100644 --- a/lib/fluent/plugin/in_http.rb +++ b/lib/fluent/plugin/in_http.rb @@ -203,54 +203,24 @@ def on_request(path_info, params) begin path = path_info[1..-1] # remove / tag = path.split('/').join('.') - record_time, record = parse_params(params) - # Skip nil record - if record.nil? - log.debug { "incoming event is invalid: path=#{path_info} params=#{params.to_json}" } - if @respond_with_empty_img - return RESPONSE_IMG - else - if @use_204_response - return RESPONSE_204 - else - return RESPONSE_200 - end + mes = Fluent::MultiEventStream.new + parse_params(params) do |record_time, record| + if record.nil? + log.debug { "incoming event is invalid: path=#{path_info} params=#{params.to_json}" } + next end - end - mes = nil - # Support batched requests - if record.is_a?(Array) - mes = Fluent::MultiEventStream.new - record.each do |single_record| - add_params_to_record(single_record, params) - - if param_time = params['time'] - param_time = param_time.to_f - single_time = param_time.zero? ? Fluent::EventTime.now : @float_time_parser.parse(param_time) - elsif @custom_parser - single_time = @custom_parser.parse_time(single_record) - single_time, single_record = @custom_parser.convert_values(single_time, single_record) - else - single_time = convert_time_field(single_record) - end - - mes.add(single_time, single_record) - end - else add_params_to_record(record, params) time = if param_time = params['time'] param_time = param_time.to_f param_time.zero? ? Fluent::EventTime.now : @float_time_parser.parse(param_time) else - if record_time.nil? - convert_time_field(record) - else - record_time - end + record_time.nil? ? convert_time_field(record) : record_time end + + mes.add(time, record) end rescue => e if @dump_error_log @@ -261,11 +231,7 @@ def on_request(path_info, params) # TODO server error begin - if mes - router.emit_stream(tag, mes) - else - router.emit(tag, time, record) - end + router.emit_stream(tag, mes) unless mes.empty? rescue => e if @dump_error_log log.error "failed to emit data", error: e @@ -308,20 +274,27 @@ def on_server_connect(conn) def parse_params_default(params) if msgpack = params['msgpack'] @parser_msgpack.parse(msgpack) do |_time, record| - return nil, record + if record.is_a?(Array) + # TODO: Temporarily supporting this case for compatibility. + # We should not consider this case here. + # We should fix MessagePackParser so that it doesn't return Array. + record.each do |single_record| + yield nil, single_record + end + else + yield nil, record + end end elsif js = params['json'] @parser_json.parse(js) do |_time, record| - return nil, record + yield nil, record end elsif ndjson = params['ndjson'] - events = [] ndjson.split(/\r?\n/).each do |js| @parser_json.parse(js) do |_time, record| - events.push(record) + yield nil, record end end - return nil, events else raise "'json', 'ndjson' or 'msgpack' parameter is required" end @@ -329,10 +302,9 @@ def parse_params_default(params) def parse_params_with_parser(params) if content = params[EVENT_RECORD_PARAMETER] - @custom_parser.parse(content) { |time, record| - raise "Received event is not #{@format_name}: #{content}" if record.nil? - return time, record - } + @custom_parser.parse(content) do |time, record| + yield time, record + end else raise "'#{EVENT_RECORD_PARAMETER}' parameter is required" end