From 054437553ec91e5d976c308f1a2382b465a4978d Mon Sep 17 00:00:00 2001 From: Marc Lopez Rubio Date: Mon, 9 Dec 2024 11:26:26 +0800 Subject: [PATCH] Set MaxPollPartitionBytes upper limit Signed-off-by: Marc Lopez Rubio --- kafka/consumer.go | 6 +++ kafka/consumer_test.go | 110 ++++++++++++++++++++++------------------- 2 files changed, 66 insertions(+), 50 deletions(-) diff --git a/kafka/consumer.go b/kafka/consumer.go index d0f85f8c..2f89ea2b 100644 --- a/kafka/consumer.go +++ b/kafka/consumer.go @@ -154,6 +154,12 @@ func (cfg *ConsumerConfig) finalize() error { if cfg.BrokerMaxReadBytes < 0 { errs = append(errs, errors.New("kafka: broker max read bytes cannot be negative")) } + if cfg.MaxPollPartitionBytes > 0 { + if cfg.MaxPollPartitionBytes > 1<<30 { + cfg.Logger.Info("kafka: MaxPollPartitionBytes exceeds 1GiB, setting to 1GiB") + cfg.MaxPollPartitionBytes = 1 << 30 + } + } if cfg.BrokerMaxReadBytes > 1<<30 { cfg.Logger.Info("kafka: BrokerMaxReadBytes exceeds 1GiB, setting to 1GiB") cfg.BrokerMaxReadBytes = 1 << 30 diff --git a/kafka/consumer_test.go b/kafka/consumer_test.go index 8b24ed7b..60775fa7 100644 --- a/kafka/consumer_test.go +++ b/kafka/consumer_test.go @@ -771,11 +771,12 @@ func TestConsumerConfigFinalizer(t *testing.T) { } t.Run("MaxPollBytes set to 1 << 20", func(t *testing.T) { cfg := ConsumerConfig{ - CommonConfig: ccfg, - Processor: proc, - Topics: []apmqueue.Topic{"topic"}, - GroupID: "groupid", - MaxPollBytes: 1 << 20, + CommonConfig: ccfg, + Processor: proc, + Topics: []apmqueue.Topic{"topic"}, + GroupID: "groupid", + MaxPollBytes: 1 << 20, + MaxPollPartitionBytes: 1 << 20, } err := cfg.finalize() require.NoError(t, err) @@ -785,20 +786,22 @@ func TestConsumerConfigFinalizer(t *testing.T) { cfg.Logger = nil assert.Equal(t, ConsumerConfig{ - CommonConfig: CommonConfig{Brokers: []string{"localhost:9092"}}, - Topics: []apmqueue.Topic{"topic"}, - GroupID: "groupid", - MaxPollBytes: 1 << 20, - BrokerMaxReadBytes: 1 << 21, + CommonConfig: CommonConfig{Brokers: []string{"localhost:9092"}}, + Topics: []apmqueue.Topic{"topic"}, + GroupID: "groupid", + MaxPollBytes: 1 << 20, + MaxPollPartitionBytes: 1 << 20, + BrokerMaxReadBytes: 1 << 21, }, cfg) }) t.Run("MaxPollBytes set to 1 << 28", func(t *testing.T) { cfg := ConsumerConfig{ - CommonConfig: ccfg, - Processor: proc, - Topics: []apmqueue.Topic{"topic"}, - GroupID: "groupid", - MaxPollBytes: 1 << 28, + CommonConfig: ccfg, + Processor: proc, + Topics: []apmqueue.Topic{"topic"}, + GroupID: "groupid", + MaxPollBytes: 1 << 28, + MaxPollPartitionBytes: 1 << 28, } err := cfg.finalize() require.NoError(t, err) @@ -808,20 +811,22 @@ func TestConsumerConfigFinalizer(t *testing.T) { cfg.Logger = nil assert.Equal(t, ConsumerConfig{ - CommonConfig: CommonConfig{Brokers: []string{"localhost:9092"}}, - Topics: []apmqueue.Topic{"topic"}, - GroupID: "groupid", - MaxPollBytes: 1 << 28, - BrokerMaxReadBytes: 1 << 29, + CommonConfig: CommonConfig{Brokers: []string{"localhost:9092"}}, + Topics: []apmqueue.Topic{"topic"}, + GroupID: "groupid", + MaxPollBytes: 1 << 28, + MaxPollPartitionBytes: 1 << 28, + BrokerMaxReadBytes: 1 << 29, }, cfg) }) t.Run("MaxPollBytes set to 1 << 29", func(t *testing.T) { cfg := ConsumerConfig{ - CommonConfig: ccfg, - Processor: proc, - Topics: []apmqueue.Topic{"topic"}, - GroupID: "groupid", - MaxPollBytes: 1 << 29, + CommonConfig: ccfg, + Processor: proc, + Topics: []apmqueue.Topic{"topic"}, + GroupID: "groupid", + MaxPollBytes: 1 << 29, + MaxPollPartitionBytes: 1 << 29, } err := cfg.finalize() require.NoError(t, err) @@ -831,20 +836,22 @@ func TestConsumerConfigFinalizer(t *testing.T) { cfg.Logger = nil assert.Equal(t, ConsumerConfig{ - CommonConfig: CommonConfig{Brokers: []string{"localhost:9092"}}, - Topics: []apmqueue.Topic{"topic"}, - GroupID: "groupid", - MaxPollBytes: 1 << 29, - BrokerMaxReadBytes: 1 << 30, + CommonConfig: CommonConfig{Brokers: []string{"localhost:9092"}}, + Topics: []apmqueue.Topic{"topic"}, + GroupID: "groupid", + MaxPollBytes: 1 << 29, + MaxPollPartitionBytes: 1 << 29, + BrokerMaxReadBytes: 1 << 30, }, cfg) }) t.Run("MaxPollBytes set to 1 << 30", func(t *testing.T) { cfg := ConsumerConfig{ - CommonConfig: ccfg, - Processor: proc, - Topics: []apmqueue.Topic{"topic"}, - GroupID: "groupid", - MaxPollBytes: 1 << 30, + CommonConfig: ccfg, + Processor: proc, + Topics: []apmqueue.Topic{"topic"}, + GroupID: "groupid", + MaxPollBytes: 1 << 30, + MaxPollPartitionBytes: 1 << 30, } err := cfg.finalize() require.NoError(t, err) @@ -854,20 +861,22 @@ func TestConsumerConfigFinalizer(t *testing.T) { cfg.Logger = nil assert.Equal(t, ConsumerConfig{ - CommonConfig: CommonConfig{Brokers: []string{"localhost:9092"}}, - Topics: []apmqueue.Topic{"topic"}, - GroupID: "groupid", - MaxPollBytes: 1 << 30, - BrokerMaxReadBytes: 1 << 30, + CommonConfig: CommonConfig{Brokers: []string{"localhost:9092"}}, + Topics: []apmqueue.Topic{"topic"}, + GroupID: "groupid", + MaxPollBytes: 1 << 30, + MaxPollPartitionBytes: 1 << 30, + BrokerMaxReadBytes: 1 << 30, }, cfg) }) t.Run("MaxPollBytes set to 1 << 31-1", func(t *testing.T) { cfg := ConsumerConfig{ - CommonConfig: ccfg, - Processor: proc, - Topics: []apmqueue.Topic{"topic"}, - GroupID: "groupid", - MaxPollBytes: 1<<31 - 1, + CommonConfig: ccfg, + Processor: proc, + Topics: []apmqueue.Topic{"topic"}, + GroupID: "groupid", + MaxPollBytes: 1<<31 - 1, + MaxPollPartitionBytes: 1<<31 - 1, } err := cfg.finalize() require.NoError(t, err) @@ -877,11 +886,12 @@ func TestConsumerConfigFinalizer(t *testing.T) { cfg.Logger = nil assert.Equal(t, ConsumerConfig{ - CommonConfig: CommonConfig{Brokers: []string{"localhost:9092"}}, - Topics: []apmqueue.Topic{"topic"}, - GroupID: "groupid", - MaxPollBytes: 1 << 30, - BrokerMaxReadBytes: 1 << 30, + CommonConfig: CommonConfig{Brokers: []string{"localhost:9092"}}, + Topics: []apmqueue.Topic{"topic"}, + GroupID: "groupid", + MaxPollBytes: 1 << 30, + MaxPollPartitionBytes: 1 << 30, + BrokerMaxReadBytes: 1 << 30, }, cfg) }) }