Skip to content

Commit

Permalink
buffer: Simplify get_next_chunk
Browse files Browse the repository at this point in the history
Signed-off-by: Takuro Ashie <[email protected]>
  • Loading branch information
ashie committed Dec 13, 2023
1 parent fb37f08 commit 861d59e
Showing 1 changed file with 11 additions and 11 deletions.
22 changes: 11 additions & 11 deletions lib/fluent/plugin/buffer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -748,23 +748,23 @@ def write_step_by_step(metadata, data, format, splits_count, &block)
modified_chunks = []
modified_metadata = metadata
get_next_chunk = ->(){
c = if staged_chunk_used
# Staging new chunk here is bad idea:
# Recovering whole state including newly staged chunks is much harder than current implementation.
modified_metadata = modified_metadata.dup_next
generate_chunk(modified_metadata)
else
synchronize { @stage[modified_metadata] ||= generate_chunk(modified_metadata).staged! }
end
modified_chunks << {chunk: c, adding_bytesize: 0, errors: []}
return c, modified_chunks.last[:errors]
if staged_chunk_used
# Staging new chunk here is bad idea:
# Recovering whole state including newly staged chunks is much harder than current implementation.
modified_metadata = modified_metadata.dup_next
generate_chunk(modified_metadata)
else
synchronize { @stage[modified_metadata] ||= generate_chunk(modified_metadata).staged! }
end
}

writing_splits_index = 0
enqueue_chunk_before_retry = false

while writing_splits_index < splits.size
chunk, errors = get_next_chunk.call
chunk = get_next_chunk.call
errors = []
modified_chunks << {chunk: chunk, adding_bytesize: 0, errors: errors}
chunk.synchronize do
raise ShouldRetry unless chunk.writable?
staged_chunk_used = true if chunk.staged?
Expand Down

0 comments on commit 861d59e

Please sign in to comment.