Skip to content

Commit

Permalink
Set MaxPollPartitionBytes upper limit
Browse files Browse the repository at this point in the history
Signed-off-by: Marc Lopez Rubio <[email protected]>
  • Loading branch information
marclop committed Dec 9, 2024
1 parent 422f99d commit 0544375
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 50 deletions.
6 changes: 6 additions & 0 deletions kafka/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,12 @@ func (cfg *ConsumerConfig) finalize() error {
if cfg.BrokerMaxReadBytes < 0 {
errs = append(errs, errors.New("kafka: broker max read bytes cannot be negative"))
}
if cfg.MaxPollPartitionBytes > 0 {
if cfg.MaxPollPartitionBytes > 1<<30 {
cfg.Logger.Info("kafka: MaxPollPartitionBytes exceeds 1GiB, setting to 1GiB")
cfg.MaxPollPartitionBytes = 1 << 30
}
}
if cfg.BrokerMaxReadBytes > 1<<30 {
cfg.Logger.Info("kafka: BrokerMaxReadBytes exceeds 1GiB, setting to 1GiB")
cfg.BrokerMaxReadBytes = 1 << 30
Expand Down
110 changes: 60 additions & 50 deletions kafka/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -771,11 +771,12 @@ func TestConsumerConfigFinalizer(t *testing.T) {
}
t.Run("MaxPollBytes set to 1 << 20", func(t *testing.T) {
cfg := ConsumerConfig{
CommonConfig: ccfg,
Processor: proc,
Topics: []apmqueue.Topic{"topic"},
GroupID: "groupid",
MaxPollBytes: 1 << 20,
CommonConfig: ccfg,
Processor: proc,
Topics: []apmqueue.Topic{"topic"},
GroupID: "groupid",
MaxPollBytes: 1 << 20,
MaxPollPartitionBytes: 1 << 20,
}
err := cfg.finalize()
require.NoError(t, err)
Expand All @@ -785,20 +786,22 @@ func TestConsumerConfigFinalizer(t *testing.T) {
cfg.Logger = nil

assert.Equal(t, ConsumerConfig{
CommonConfig: CommonConfig{Brokers: []string{"localhost:9092"}},
Topics: []apmqueue.Topic{"topic"},
GroupID: "groupid",
MaxPollBytes: 1 << 20,
BrokerMaxReadBytes: 1 << 21,
CommonConfig: CommonConfig{Brokers: []string{"localhost:9092"}},
Topics: []apmqueue.Topic{"topic"},
GroupID: "groupid",
MaxPollBytes: 1 << 20,
MaxPollPartitionBytes: 1 << 20,
BrokerMaxReadBytes: 1 << 21,
}, cfg)
})
t.Run("MaxPollBytes set to 1 << 28", func(t *testing.T) {
cfg := ConsumerConfig{
CommonConfig: ccfg,
Processor: proc,
Topics: []apmqueue.Topic{"topic"},
GroupID: "groupid",
MaxPollBytes: 1 << 28,
CommonConfig: ccfg,
Processor: proc,
Topics: []apmqueue.Topic{"topic"},
GroupID: "groupid",
MaxPollBytes: 1 << 28,
MaxPollPartitionBytes: 1 << 28,
}
err := cfg.finalize()
require.NoError(t, err)
Expand All @@ -808,20 +811,22 @@ func TestConsumerConfigFinalizer(t *testing.T) {
cfg.Logger = nil

assert.Equal(t, ConsumerConfig{
CommonConfig: CommonConfig{Brokers: []string{"localhost:9092"}},
Topics: []apmqueue.Topic{"topic"},
GroupID: "groupid",
MaxPollBytes: 1 << 28,
BrokerMaxReadBytes: 1 << 29,
CommonConfig: CommonConfig{Brokers: []string{"localhost:9092"}},
Topics: []apmqueue.Topic{"topic"},
GroupID: "groupid",
MaxPollBytes: 1 << 28,
MaxPollPartitionBytes: 1 << 28,
BrokerMaxReadBytes: 1 << 29,
}, cfg)
})
t.Run("MaxPollBytes set to 1 << 29", func(t *testing.T) {
cfg := ConsumerConfig{
CommonConfig: ccfg,
Processor: proc,
Topics: []apmqueue.Topic{"topic"},
GroupID: "groupid",
MaxPollBytes: 1 << 29,
CommonConfig: ccfg,
Processor: proc,
Topics: []apmqueue.Topic{"topic"},
GroupID: "groupid",
MaxPollBytes: 1 << 29,
MaxPollPartitionBytes: 1 << 29,
}
err := cfg.finalize()
require.NoError(t, err)
Expand All @@ -831,20 +836,22 @@ func TestConsumerConfigFinalizer(t *testing.T) {
cfg.Logger = nil

assert.Equal(t, ConsumerConfig{
CommonConfig: CommonConfig{Brokers: []string{"localhost:9092"}},
Topics: []apmqueue.Topic{"topic"},
GroupID: "groupid",
MaxPollBytes: 1 << 29,
BrokerMaxReadBytes: 1 << 30,
CommonConfig: CommonConfig{Brokers: []string{"localhost:9092"}},
Topics: []apmqueue.Topic{"topic"},
GroupID: "groupid",
MaxPollBytes: 1 << 29,
MaxPollPartitionBytes: 1 << 29,
BrokerMaxReadBytes: 1 << 30,
}, cfg)
})
t.Run("MaxPollBytes set to 1 << 30", func(t *testing.T) {
cfg := ConsumerConfig{
CommonConfig: ccfg,
Processor: proc,
Topics: []apmqueue.Topic{"topic"},
GroupID: "groupid",
MaxPollBytes: 1 << 30,
CommonConfig: ccfg,
Processor: proc,
Topics: []apmqueue.Topic{"topic"},
GroupID: "groupid",
MaxPollBytes: 1 << 30,
MaxPollPartitionBytes: 1 << 30,
}
err := cfg.finalize()
require.NoError(t, err)
Expand All @@ -854,20 +861,22 @@ func TestConsumerConfigFinalizer(t *testing.T) {
cfg.Logger = nil

assert.Equal(t, ConsumerConfig{
CommonConfig: CommonConfig{Brokers: []string{"localhost:9092"}},
Topics: []apmqueue.Topic{"topic"},
GroupID: "groupid",
MaxPollBytes: 1 << 30,
BrokerMaxReadBytes: 1 << 30,
CommonConfig: CommonConfig{Brokers: []string{"localhost:9092"}},
Topics: []apmqueue.Topic{"topic"},
GroupID: "groupid",
MaxPollBytes: 1 << 30,
MaxPollPartitionBytes: 1 << 30,
BrokerMaxReadBytes: 1 << 30,
}, cfg)
})
t.Run("MaxPollBytes set to 1 << 31-1", func(t *testing.T) {
cfg := ConsumerConfig{
CommonConfig: ccfg,
Processor: proc,
Topics: []apmqueue.Topic{"topic"},
GroupID: "groupid",
MaxPollBytes: 1<<31 - 1,
CommonConfig: ccfg,
Processor: proc,
Topics: []apmqueue.Topic{"topic"},
GroupID: "groupid",
MaxPollBytes: 1<<31 - 1,
MaxPollPartitionBytes: 1<<31 - 1,
}
err := cfg.finalize()
require.NoError(t, err)
Expand All @@ -877,11 +886,12 @@ func TestConsumerConfigFinalizer(t *testing.T) {
cfg.Logger = nil

assert.Equal(t, ConsumerConfig{
CommonConfig: CommonConfig{Brokers: []string{"localhost:9092"}},
Topics: []apmqueue.Topic{"topic"},
GroupID: "groupid",
MaxPollBytes: 1 << 30,
BrokerMaxReadBytes: 1 << 30,
CommonConfig: CommonConfig{Brokers: []string{"localhost:9092"}},
Topics: []apmqueue.Topic{"topic"},
GroupID: "groupid",
MaxPollBytes: 1 << 30,
MaxPollPartitionBytes: 1 << 30,
BrokerMaxReadBytes: 1 << 30,
}, cfg)
})
}
Expand Down

0 comments on commit 0544375

Please sign in to comment.