Skip to content

Commit

Permalink
Configurable strategy for consuming multiple topics
Browse files Browse the repository at this point in the history
The current ConsumerSet implementation will only switch topics when the
current topic returns no messages after the poll timeout.

This is not practical when one or more of the topics has a 'high' message
volume as the broker will always return a message and Racecar may never
switch to poll the other topics.

* 'high' here means message frequency (hz) > poll interval (s).
  • Loading branch information
bestie committed Jan 18, 2024
1 parent d7d3545 commit 236478f
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 3 deletions.
3 changes: 3 additions & 0 deletions lib/racecar/config.rb
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,9 @@ class Config < KingKonf::Config
desc "Used only by the liveness probe: Max time (in seconds) between liveness events before the process is considered not healthy"
integer :liveness_probe_max_interval, default: 5

desc "Strategy for switching topics when there are multiple subscriptions. `exhaust-topic` will only switch when the consumer poll returns no messages. `round-robin` will switch after each poll regardless.\nWarning: `round-robin` will be the default in Racecar 3.x"
string :multi_subscription_strategy, allowed_values: %w(round-robin exhaust-topic), default: "exhaust-topic"

# The error handler must be set directly on the object.
attr_reader :error_handler

Expand Down
12 changes: 9 additions & 3 deletions lib/racecar/consumer_set.rb
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ def poll_with_retries(max_wait_time_ms)

# polls a message for the current consumer, handling any API edge cases.
def poll_current_consumer(max_wait_time_ms)
@last_poll_read_nil_message = false
msg = current.poll(max_wait_time_ms)
rescue Rdkafka::RdkafkaError => e
case e.code
Expand Down Expand Up @@ -212,9 +213,14 @@ def reset_current_consumer
end

def maybe_select_next_consumer
return unless @last_poll_read_nil_message
@last_poll_read_nil_message = false
select_next_consumer
case @config.multi_subscription_strategy
when "round-robin"
select_next_consumer
else # "exhaust-topic"
if @last_poll_read_nil_message
select_next_consumer
end
end
end

def select_next_consumer
Expand Down
54 changes: 54 additions & 0 deletions spec/consumer_set_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -572,5 +572,59 @@ def message_generator(messages)
count.times { polled += consumer_set.batch_poll(100) rescue [] }
expect(polled).to eq [:msg1, :msg1, :msg1, :msgN, :msgN, :msgN]
end

context "when multiple consumers are configured as 'round-robin'" do
before do
config.multi_subscription_strategy = "round-robin"
allow(rdconsumer1).to receive(:poll).and_return(topic1_message)
allow(rdconsumer2).to receive(:poll).and_return(topic2_message)
allow(rdconsumer3).to receive(:poll).and_return(topic3_message)
end

let(:config) { Racecar::Config.new }
let(:interval) { 1000.0 }
let(:topic1_message) { double(:topic1_message) }
let(:topic2_message) { double(:topic2_message) }
let(:topic3_message) { double(:topic3_message) }


describe "#poll" do
it "consumes 1 message from each topic in turn" do
messages = 6.times.map {
consumer_set.poll(interval)
}

expect(messages).to eq([
topic1_message,
topic2_message,
topic3_message,
topic1_message,
topic2_message,
topic3_message,
])
end
end

describe "#batch_poll" do
before do
config.fetch_messages = 1
end

it "consumes 1 batch from each topic in turn" do
messages = 6.times.map {
consumer_set.batch_poll(interval)
}

expect(messages).to eq([
[topic1_message],
[topic2_message],
[topic3_message],
[topic1_message],
[topic2_message],
[topic3_message],
])
end
end
end
end
end

0 comments on commit 236478f

Please sign in to comment.