From 2888c2cdffd491579ab4dd5af21b8af0e166ea6c Mon Sep 17 00:00:00 2001 From: Takuro Ashie Date: Tue, 31 Oct 2023 17:50:54 +0900 Subject: [PATCH] Fix possible race condition of processing buffer chunk Fix #3089 Signed-off-by: Takuro Ashie --- lib/fluent/plugin/buffer.rb | 24 ++++++++++++++---------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/lib/fluent/plugin/buffer.rb b/lib/fluent/plugin/buffer.rb index 3871b4f25b..7fa678a3ce 100644 --- a/lib/fluent/plugin/buffer.rb +++ b/lib/fluent/plugin/buffer.rb @@ -271,13 +271,13 @@ def close synchronize do log.debug "closing buffer", instance: self.object_id @dequeued.each_pair do |chunk_id, chunk| - chunk.close + chunk.synchronize { chunk.close } end until @queue.empty? @queue.shift.close end @stage.each_pair do |metadata, chunk| - chunk.close + chunk.synchronize { chunk.close } end end end @@ -407,9 +407,11 @@ def write(metadata_and_data, format: nil, size: nil, enqueue: false) # FIX FOR stage_size miscomputation - https://github.com/fluent/fluentd/issues/2712 # staged_bytesizes_by_chunk.each do |chunk, bytesize| - chunk.synchronize do - synchronize { @stage_size_metrics.add(bytesize) } - log.on_trace { log.trace { "chunk #{chunk.path} size_added: #{bytesize} new_size: #{chunk.bytesize}" } } + synchronize do + chunk.synchronize do + @stage_size_metrics.add(bytesize) + log.on_trace { log.trace { "chunk #{chunk.path} size_added: #{bytesize} new_size: #{chunk.bytesize}" } } + end end end @@ -485,8 +487,8 @@ def enqueue_chunk(metadata) end return nil unless chunk - chunk.synchronize do - synchronize do + synchronize do + chunk.synchronize do if chunk.empty? chunk.close else @@ -597,9 +599,11 @@ def purge_chunk(chunk_id) log.on_trace { log.trace "purging a chunk", instance: self.object_id, chunk_id: dump_unique_id_hex(chunk_id), metadata: metadata } begin - bytesize = chunk.bytesize - chunk.purge - @queue_size_metrics.sub(bytesize) + chunk.synchronize do + bytesize = chunk.bytesize + chunk.purge + @queue_size_metrics.sub(bytesize) + end rescue => e log.error "failed to purge buffer chunk", chunk_id: dump_unique_id_hex(chunk_id), error_class: e.class, error: e log.error_backtrace