diff --git a/lib/fluent/plugin/buffer.rb b/lib/fluent/plugin/buffer.rb index b34f6c824f..cda7926fab 100644 --- a/lib/fluent/plugin/buffer.rb +++ b/lib/fluent/plugin/buffer.rb @@ -748,23 +748,23 @@ def write_step_by_step(metadata, data, format, splits_count, &block) modified_chunks = [] modified_metadata = metadata get_next_chunk = ->(){ - c = if staged_chunk_used - # Staging new chunk here is bad idea: - # Recovering whole state including newly staged chunks is much harder than current implementation. - modified_metadata = modified_metadata.dup_next - generate_chunk(modified_metadata) - else - synchronize { @stage[modified_metadata] ||= generate_chunk(modified_metadata).staged! } - end - modified_chunks << {chunk: c, adding_bytesize: 0, errors: []} - return c, modified_chunks.last[:errors] + if staged_chunk_used + # Staging new chunk here is bad idea: + # Recovering whole state including newly staged chunks is much harder than current implementation. + modified_metadata = modified_metadata.dup_next + generate_chunk(modified_metadata) + else + synchronize { @stage[modified_metadata] ||= generate_chunk(modified_metadata).staged! } + end } writing_splits_index = 0 enqueue_chunk_before_retry = false while writing_splits_index < splits.size - chunk, errors = get_next_chunk.call + chunk = get_next_chunk.call + errors = [] + modified_chunks << {chunk: chunk, adding_bytesize: 0, errors: errors} chunk.synchronize do raise ShouldRetry unless chunk.writable? staged_chunk_used = true if chunk.staged?