Skip to content

Commit

Permalink
Fix possible race condition of processing buffer chunk
Browse files Browse the repository at this point in the history
Fix #3089

Signed-off-by: Takuro Ashie <[email protected]>
  • Loading branch information
ashie committed Oct 31, 2023
1 parent 36220e5 commit 2888c2c
Showing 1 changed file with 14 additions and 10 deletions.
24 changes: 14 additions & 10 deletions lib/fluent/plugin/buffer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -271,13 +271,13 @@ def close
synchronize do
log.debug "closing buffer", instance: self.object_id
@dequeued.each_pair do |chunk_id, chunk|
chunk.close
chunk.synchronize { chunk.close }
end
until @queue.empty?
@queue.shift.close
end
@stage.each_pair do |metadata, chunk|
chunk.close
chunk.synchronize { chunk.close }
end
end
end
Expand Down Expand Up @@ -407,9 +407,11 @@ def write(metadata_and_data, format: nil, size: nil, enqueue: false)
# FIX FOR stage_size miscomputation - https://github.com/fluent/fluentd/issues/2712
#
staged_bytesizes_by_chunk.each do |chunk, bytesize|
chunk.synchronize do
synchronize { @stage_size_metrics.add(bytesize) }
log.on_trace { log.trace { "chunk #{chunk.path} size_added: #{bytesize} new_size: #{chunk.bytesize}" } }
synchronize do
chunk.synchronize do
@stage_size_metrics.add(bytesize)
log.on_trace { log.trace { "chunk #{chunk.path} size_added: #{bytesize} new_size: #{chunk.bytesize}" } }
end
end
end

Expand Down Expand Up @@ -485,8 +487,8 @@ def enqueue_chunk(metadata)
end
return nil unless chunk

chunk.synchronize do
synchronize do
synchronize do
chunk.synchronize do
if chunk.empty?
chunk.close
else
Expand Down Expand Up @@ -597,9 +599,11 @@ def purge_chunk(chunk_id)
log.on_trace { log.trace "purging a chunk", instance: self.object_id, chunk_id: dump_unique_id_hex(chunk_id), metadata: metadata }

begin
bytesize = chunk.bytesize
chunk.purge
@queue_size_metrics.sub(bytesize)
chunk.synchronize do
bytesize = chunk.bytesize
chunk.purge
@queue_size_metrics.sub(bytesize)
end
rescue => e
log.error "failed to purge buffer chunk", chunk_id: dump_unique_id_hex(chunk_id), error_class: e.class, error: e
log.error_backtrace
Expand Down

0 comments on commit 2888c2c

Please sign in to comment.