diff --git a/docs/includes/settingref.txt b/docs/includes/settingref.txt index 864374390..f9824b01e 100644 --- a/docs/includes/settingref.txt +++ b/docs/includes/settingref.txt @@ -1018,6 +1018,25 @@ exists, e.g. when starting a new consumer for the first time. Options include 'earliest', 'latest', 'none'. +.. setting:: consumer_group_instance_id + +``consumer_group_instance_id`` +------------------------------ + +.. versionadded:: 2.1 + +:type: :class:`str` +:default: :const:`None` +:environment: :envvar:`CONSUMER_GROUP_INSTANCE_ID` + +Consumer group instance id. + +The group_instance_id for static partition assignment. + +If not set, default assignment strategy is used. Otherwise, +each consumer instance has to have a unique id. + + .. setting:: ConsumerScheduler ``ConsumerScheduler`` diff --git a/faust/transport/drivers/aiokafka.py b/faust/transport/drivers/aiokafka.py index 24f30fce8..2255bf541 100644 --- a/faust/transport/drivers/aiokafka.py +++ b/faust/transport/drivers/aiokafka.py @@ -345,6 +345,7 @@ def _create_worker_consumer( api_version=conf.consumer_api_version, client_id=conf.broker_client_id, group_id=conf.id, + group_instance_id=conf.consumer_group_instance_id, bootstrap_servers=server_list( transport.url, transport.default_port), partition_assignment_strategy=[self._assignor], diff --git a/faust/types/settings/settings.py b/faust/types/settings/settings.py index 4abaa725d..1b37c55b0 100644 --- a/faust/types/settings/settings.py +++ b/faust/types/settings/settings.py @@ -109,6 +109,7 @@ def __init__( consumer_api_version: str = None, consumer_max_fetch_size: int = None, consumer_auto_offset_reset: str = None, + consumer_group_instance_id: str = None, # Topic serialization settings: key_serializer: CodecArg = None, value_serializer: CodecArg = None, @@ -1099,6 +1100,21 @@ def consumer_auto_offset_reset(self) -> str: Options include 'earliest', 'latest', 'none'. """ + @sections.Consumer.setting( + params.Str, + version_introduced='2.1', + env_name='CONSUMER_GROUP_INSTANCE_ID', + default=None, + ) + def consumer_group_instance_id(self) -> str: + """Consumer group instance id. + + The group_instance_id for static partition assignment. + + If not set, default assignment strategy is used. Otherwise, + each consumer instance has to have a unique id. + """ + @sections.Serialization.setting( params.Codec, env_name='APP_KEY_SERIALIZER', diff --git a/t/unit/transport/drivers/test_aiokafka.py b/t/unit/transport/drivers/test_aiokafka.py index 0c5dcf194..b184c9466 100644 --- a/t/unit/transport/drivers/test_aiokafka.py +++ b/t/unit/transport/drivers/test_aiokafka.py @@ -737,6 +737,7 @@ def assert_create_worker_consumer(self, cthread, app, api_version=app.conf.consumer_api_version, client_id=conf.broker_client_id, group_id=conf.id, + group_instance_id=conf.consumer_group_instance_id, bootstrap_servers=server_list( transport.url, transport.default_port), partition_assignment_strategy=[cthread._assignor],