Skip to content

Commit

Permalink
Merge pull request #540 from trauter/rhmaster
Browse files Browse the repository at this point in the history
Allow Static Membership Assignment Strategy
  • Loading branch information
ask authored Oct 9, 2020
2 parents b6ebdd9 + 82ecdf6 commit b5e159f
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 0 deletions.
19 changes: 19 additions & 0 deletions docs/includes/settingref.txt
Original file line number Diff line number Diff line change
Expand Up @@ -1018,6 +1018,25 @@ exists, e.g. when starting a new consumer for the first time.
Options include 'earliest', 'latest', 'none'.


.. setting:: consumer_group_instance_id

``consumer_group_instance_id``
------------------------------

.. versionadded:: 2.1

:type: :class:`str`
:default: :const:`None`
:environment: :envvar:`CONSUMER_GROUP_INSTANCE_ID`

Consumer group instance id.

The group_instance_id for static partition assignment.

If not set, default assignment strategy is used. Otherwise,
each consumer instance has to have a unique id.


.. setting:: ConsumerScheduler

``ConsumerScheduler``
Expand Down
1 change: 1 addition & 0 deletions faust/transport/drivers/aiokafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,7 @@ def _create_worker_consumer(
api_version=conf.consumer_api_version,
client_id=conf.broker_client_id,
group_id=conf.id,
group_instance_id=conf.consumer_group_instance_id,
bootstrap_servers=server_list(
transport.url, transport.default_port),
partition_assignment_strategy=[self._assignor],
Expand Down
16 changes: 16 additions & 0 deletions faust/types/settings/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ def __init__(
consumer_api_version: str = None,
consumer_max_fetch_size: int = None,
consumer_auto_offset_reset: str = None,
consumer_group_instance_id: str = None,
# Topic serialization settings:
key_serializer: CodecArg = None,
value_serializer: CodecArg = None,
Expand Down Expand Up @@ -1099,6 +1100,21 @@ def consumer_auto_offset_reset(self) -> str:
Options include 'earliest', 'latest', 'none'.
"""

@sections.Consumer.setting(
params.Str,
version_introduced='2.1',
env_name='CONSUMER_GROUP_INSTANCE_ID',
default=None,
)
def consumer_group_instance_id(self) -> str:
"""Consumer group instance id.
The group_instance_id for static partition assignment.
If not set, default assignment strategy is used. Otherwise,
each consumer instance has to have a unique id.
"""

@sections.Serialization.setting(
params.Codec,
env_name='APP_KEY_SERIALIZER',
Expand Down
1 change: 1 addition & 0 deletions t/unit/transport/drivers/test_aiokafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -737,6 +737,7 @@ def assert_create_worker_consumer(self, cthread, app,
api_version=app.conf.consumer_api_version,
client_id=conf.broker_client_id,
group_id=conf.id,
group_instance_id=conf.consumer_group_instance_id,
bootstrap_servers=server_list(
transport.url, transport.default_port),
partition_assignment_strategy=[cthread._assignor],
Expand Down

0 comments on commit b5e159f

Please sign in to comment.