From 304ec54d89515186ce7a2572d72d8cc25cbe1a16 Mon Sep 17 00:00:00 2001 From: Yuxuan 'fishy' Wang Date: Mon, 8 Jan 2024 11:08:13 -0800 Subject: [PATCH] kafkabp: Add NewConsumerWithConfigOverriders We occasionally have cases that services need to set some sarama config that's not supported by kafkabp.ConsumerConfig, and there's currently no way for them to do that besides forking kafkabp code locally. Provide this as an escape hatch for easier iteration. --- kafkabp/consumer.go | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/kafkabp/consumer.go b/kafkabp/consumer.go index 79ba2e2ac..7f3a7e3da 100644 --- a/kafkabp/consumer.go +++ b/kafkabp/consumer.go @@ -114,11 +114,25 @@ type consumer struct { // topic. This implementation of Kafka consumer is suitable for use cases like // deliver config/data through Kafka to services. func NewConsumer(cfg ConsumerConfig) (Consumer, error) { + return NewConsumerWithConfigOverriders(cfg) +} + +// SaramaConfigOverrider provides a way for users to override certain fields in +// *sarama.Config generated from ConsumerConfig. +type SaramaConfigOverrider func(*sarama.Config) + +// NewConsumerWithConfigOverriders is provided as an escape hatch for use cases +// requiring specific sarama config not supported by ConsumerConfig. +func NewConsumerWithConfigOverriders(cfg ConsumerConfig, overriders ...SaramaConfigOverrider) (Consumer, error) { sc, err := cfg.NewSaramaConfig() if err != nil { return nil, err } + for _, override := range overriders { + override(sc) + } + switch { default: return newTopicConsumer(cfg, sc)