Skip to content

Commit

Permalink
out_rdkafka2: add patch for v0.16.0 or later
Browse files Browse the repository at this point in the history
Signed-off-by: Kentaro Hayashi <[email protected]>
  • Loading branch information
kenhys committed Jul 31, 2024
1 parent 1348acf commit a9fb20b
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 0 deletions.
2 changes: 2 additions & 0 deletions lib/fluent/plugin/out_rdkafka2.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
require_relative 'rdkafka_patch/0_12_0'
elsif rdkafka_version >= Gem::Version.create('0.14.0')
require_relative 'rdkafka_patch/0_14_0'
elsif rdkafka_version >= Gem::Version.create('0.16.0')
require_relative 'rdkafka_patch/0_16_0'
end
rescue LoadError, NameError
raise "unable to patch rdkafka."
Expand Down
55 changes: 55 additions & 0 deletions lib/fluent/plugin/rdkafka_patch/0_16_0.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
class Rdkafka::NativeKafka
# return false if producer is forcefully closed, otherwise return true
def close(timeout=nil, object_id=nil)
return true if closed?

synchronize do
# Indicate to the outside world that we are closing
@closing = true

thread_status = :unknown
if @polling_thread
# Indicate to polling thread that we're closing
@polling_thread[:closing] = true

# Wait for the polling thread to finish up,
# this can be aborted in practice if this
# code runs from a finalizer.
thread_status = @polling_thread.join(timeout)
end

# Destroy the client after locking both mutexes
@poll_mutex.lock

# This check prevents a race condition, where we would enter the close in two threads
# and after unlocking the primary one that hold the lock but finished, ours would be unlocked
# and would continue to run, trying to destroy inner twice
retun unless @inner

Rdkafka::Bindings.rd_kafka_destroy(@inner)
@inner = nil
@opaque = nil

!thread_status.nil?
end
end
end

class Rdkafka::Producer
def close(timeout = nil)
return true if closed?
ObjectSpace.undefine_finalizer(self)

@native_kafka.close(timeout) do
# We need to remove the topics references objects before we destroy the producer,
# otherwise they would leak out
@topics_refs_map.each_value do |refs|
refs.each_value do |ref|
Rdkafka::Bindings.rd_kafka_topic_destroy(ref)
end
end
end

@topics_refs_map.clear
end
end

0 comments on commit a9fb20b

Please sign in to comment.