-
Notifications
You must be signed in to change notification settings - Fork 604
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Reduce coupling to argparse, and separate Endpoint definitions from the provided parameters. #2021
Open
twcurrie
wants to merge
27
commits into
linkedin:main
Choose a base branch
from
twcurrie:argparse
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from 25 commits
Commits
Show all changes
27 commits
Select commit
Hold shift + click to select a range
ff59eaf
Setting up initial function definitions.
twcurrie 7b8a04f
Pulling out first function.
twcurrie ec4eb09
Adding initial tests.
twcurrie d42b534
Pulling out handling of modifications, providing test.
twcurrie 7ec6cde
Updating comments.
twcurrie b52a1ea
Constructing lengthy test to trigger exception.
twcurrie 7dc0ec0
Adding a container for parameters.
twcurrie 34e47c2
Adding boolean flag to indicate broker id requirement.
twcurrie 829d555
Extending parameter set
twcurrie 6df3cbe
Shifting away from class=based ExecutionContext
twcurrie 389dfe2
Adding some tests around ParameterSet
twcurrie 18d9c23
Pushing default to parameter definition, rather than __init__
twcurrie 4df2177
Pivoting endpoint to not hold parameters.
twcurrie 74f7c1d
Removing difference between parameter name and request flag.
twcurrie 9a6ec4d
Removing parameter checks, relying on default rather than __init__ de…
twcurrie d294c4f
Correcting argument check.
twcurrie c604ced
Modifying test.
twcurrie 8e1e68b
Reorder class methods.
twcurrie 7e64ce3
Updating comments.
twcurrie f443566
Reversing broker id switch
twcurrie 630aa14
Providing argument name alignment.
twcurrie 4b6efcc
Merge remote-tracking branch 'origin/migrate_to_kafka_2_5' into argparse
twcurrie d6c3294
Removing redundant class variable.
twcurrie 15ecbeb
Shifting more logic from endpoint to parameter set.
twcurrie 558667c
Adding back the deprecated function, maybe a bit much.
twcurrie bcd8d46
Updating function names.
twcurrie 9c6b512
UPdating docs.
twcurrie File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
182 changes: 28 additions & 154 deletions
182
cruise-control-client/cruisecontrolclient/client/Endpoint.py
Large diffs are not rendered by default.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,143 +5,93 @@ | |
import cruisecontrolclient.client.Endpoint as Endpoint | ||
|
||
# To be able to make more precise type hints | ||
from typing import Dict, Tuple, Type | ||
from typing import Dict, List, Set, Type | ||
|
||
# Define the in-order available endpoints for programmatically building the argparse CLI | ||
AVAILABLE_ENDPOINTS: List[Type[Endpoint.AbstractEndpoint]] = [ | ||
Endpoint.AddBrokerEndpoint, | ||
Endpoint.AdminEndpoint, | ||
Endpoint.BootstrapEndpoint, | ||
Endpoint.DemoteBrokerEndpoint, | ||
Endpoint.FixOfflineReplicasEndpoint, | ||
Endpoint.KafkaClusterStateEndpoint, | ||
Endpoint.LoadEndpoint, | ||
Endpoint.PartitionLoadEndpoint, | ||
Endpoint.PauseSamplingEndpoint, | ||
Endpoint.ProposalsEndpoint, | ||
Endpoint.RebalanceEndpoint, | ||
Endpoint.RemoveBrokerEndpoint, | ||
Endpoint.ResumeSamplingEndpoint, | ||
Endpoint.RightsizeEndpoint, | ||
Endpoint.ReviewBoardEndpoint, | ||
Endpoint.ReviewEndpoint, | ||
Endpoint.StateEndpoint, | ||
Endpoint.StopProposalExecutionEndpoint, | ||
Endpoint.TrainEndpoint, | ||
Endpoint.TopicConfigurationEndpoint, | ||
Endpoint.UserTasksEndpoint | ||
] | ||
|
||
class ExecutionContext: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is not a requirement, but there isn't a major benefit in storing these properties within a class. It merely adds an additional layer between the values and things accessing them. |
||
# Define the in-order available endpoints for programmatically building the argparse CLI | ||
available_endpoints: Tuple[Type[Endpoint.AbstractEndpoint]] = ( | ||
Endpoint.AddBrokerEndpoint, | ||
Endpoint.AdminEndpoint, | ||
Endpoint.BootstrapEndpoint, | ||
Endpoint.DemoteBrokerEndpoint, | ||
Endpoint.FixOfflineReplicasEndpoint, | ||
Endpoint.KafkaClusterStateEndpoint, | ||
Endpoint.LoadEndpoint, | ||
Endpoint.PartitionLoadEndpoint, | ||
Endpoint.PauseSamplingEndpoint, | ||
Endpoint.ProposalsEndpoint, | ||
Endpoint.RebalanceEndpoint, | ||
Endpoint.RemoveBrokerEndpoint, | ||
Endpoint.ResumeSamplingEndpoint, | ||
Endpoint.RightsizeEndpoint, | ||
Endpoint.ReviewBoardEndpoint, | ||
Endpoint.ReviewEndpoint, | ||
Endpoint.StateEndpoint, | ||
Endpoint.StopProposalExecutionEndpoint, | ||
Endpoint.TrainEndpoint, | ||
Endpoint.TopicConfigurationEndpoint, | ||
Endpoint.UserTasksEndpoint | ||
) | ||
# Define the argparse flags that cannot be used for endpoint parameters | ||
NON_PARAMETER_FLAGS: Set[str] = { | ||
'add_parameter', | ||
'endpoint_subparser', | ||
'remove_parameter', | ||
'socket_address' | ||
} | ||
|
||
# A mapping from the names of the subparsers to the Endpoint object they should instantiate. | ||
# | ||
# Like: | ||
# {"add_broker": AddBrokerEndpoint, | ||
# "add_brokers": AddBrokerEndpoint, | ||
# "admin": AdminEndpoint, | ||
# "bootstrap": BootstrapEndpoint, | ||
# "demote_broker": DemoteBrokerEndpoint, | ||
# "demote_brokers": DemoteBrokerEndpoint, | ||
# "fix_offline_replicas": FixOfflineReplicasEndpoint, | ||
# "kafka_cluster_state": KafkaClusterStateEndpoint, | ||
# "load": LoadEndpoint, | ||
# "partition_load": PartitionLoadEndpoint, | ||
# "pause_sampling": PauseSamplingEndpoint, | ||
# "proposals": ProposalsEndpoint, | ||
# "rebalance": RebalanceEndpoint, | ||
# "remove_broker": RemoveBrokerEndpoint, | ||
# "remove_brokers": RemoveBrokerEndpoint, | ||
# "resume_sampling": ResumeSamplingEndpoint, | ||
# "rightsize": RightsizeEndpoint, | ||
# "state": StateEndpoint, | ||
# "stop_proposal_execution": StopProposalExecutionEndpoint, | ||
# "stop": StopProposalExecutionEndpoint, | ||
# "train": TrainEndpoint, | ||
# "user_task": UserTasksEndpoint, | ||
# "user_tasks": UserTasksEndpoint} | ||
dest_to_Endpoint: Dict[str, Type[Endpoint.AbstractEndpoint]] = {} | ||
available_parameter_set = set() | ||
for endpoint in available_endpoints: | ||
dest_to_Endpoint[endpoint.name] = endpoint | ||
if 'aliases' in endpoint.argparse_properties['kwargs']: | ||
endpoint_aliases = endpoint.argparse_properties['kwargs']['aliases'] | ||
for alias in endpoint_aliases: | ||
dest_to_Endpoint[alias] = endpoint | ||
for parameter in endpoint.available_Parameters: | ||
available_parameter_set.add(parameter) | ||
# A mapping from the names of the subparsers to the Endpoint object they should instantiate. | ||
# | ||
# Like: | ||
# {"add_broker": AddBrokerEndpoint, | ||
# "add_brokers": AddBrokerEndpoint, | ||
# "admin": AdminEndpoint, | ||
# "bootstrap": BootstrapEndpoint, | ||
# "demote_broker": DemoteBrokerEndpoint, | ||
# ... | ||
# "user_task": UserTasksEndpoint, | ||
# "user_tasks": UserTasksEndpoint} | ||
NAME_TO_ENDPOINT: Dict[str, Type[Endpoint.AbstractEndpoint]] = {} | ||
AVAILABLE_PARAMETER_SET = set() | ||
for endpoint in AVAILABLE_ENDPOINTS: | ||
NAME_TO_ENDPOINT[endpoint.name] = endpoint | ||
if 'aliases' in endpoint.argparse_properties['kwargs']: | ||
endpoint_aliases = endpoint.argparse_properties['kwargs']['aliases'] | ||
for alias in endpoint_aliases: | ||
NAME_TO_ENDPOINT[alias] = endpoint | ||
for parameter in endpoint.available_parameters: | ||
AVAILABLE_PARAMETER_SET.add(parameter) | ||
|
||
# A mapping from the names of the argparse parameters to their cruise-control parameter name | ||
# | ||
# Like: | ||
# {'allow_capacity_estimation': 'allow_capacity_estimation', | ||
# 'brokers': 'brokerid', | ||
# 'clearmetrics': 'clearmetrics', | ||
# 'client_id': 'client_ids', | ||
# 'concurrency': 'concurrent_partition_movements_per_broker', | ||
# 'data_from': 'data_from', | ||
# 'disable_self_healing_for': 'disable_self_healing_for', | ||
# 'dry_run': 'dryRun', | ||
# 'enable_self_healing_for': 'enable_self_healing_for', | ||
# 'end_timestamp': 'end', | ||
# 'endpoint': 'endpoints', | ||
# 'exclude_follower_demotion': 'exclude_follower_demotion', | ||
# 'exclude_recently_demoted_brokers': 'exclude_recently_demoted_brokers', | ||
# 'exclude_recently_removed_brokers': 'exclude_recently_removed_brokers', | ||
# 'excluded_topics': 'excluded_topics', | ||
# 'goals': 'goals', | ||
# 'ignore_proposal_cache': 'ignore_proposal_cache', | ||
# 'leader_concurrency': 'concurrent_leader_movements', | ||
# 'max_load': 'max_load', | ||
# 'min_valid_partition_ratio': 'min_valid_partition_ratio', | ||
# 'number_of_entries_to_show': 'entries', | ||
# 'partition': 'partition', | ||
# 'reason': 'reason', | ||
# 'resource': 'resource', | ||
# 'skip_hard_goal_check': 'skip_hard_goal_check', | ||
# 'skip_urp_demotion': 'skip_urp_demotion', | ||
# 'start_timestamp': 'start', | ||
# 'strategies': 'replica_movement_strategies', | ||
# 'substate': 'substates', | ||
# 'super_verbose': 'super_verbose', | ||
# 'text': 'json', | ||
# 'throttle_removed_broker': 'throttle_removed_broker', | ||
# 'timestamp': 'time', | ||
# 'topic': 'topic', | ||
# 'types': 'types', | ||
# 'use_ready_default_goals': 'use_ready_default_goals', | ||
# 'user_task_ids': 'user_task_ids', | ||
# 'verbose': 'verbose'} | ||
flag_to_parameter_name: Dict[str, str] = {} | ||
for parameter in available_parameter_set: | ||
argparse_parameter_name = None | ||
# argparse names this parameter's flag after the first string in 'args' | ||
for possible_argparse_name in parameter.argparse_properties['args']: | ||
# argparse chooses flag names only from the --flags | ||
if not possible_argparse_name.startswith('-'): | ||
argparse_parameter_name = possible_argparse_name.replace('-', '_') | ||
elif not possible_argparse_name.startswith('--'): | ||
continue | ||
else: | ||
argparse_parameter_name = possible_argparse_name.lstrip('-').replace('-', '_') | ||
break | ||
if argparse_parameter_name and argparse_parameter_name in flag_to_parameter_name: | ||
raise ValueError(f"Colliding parameter flags: {argparse_parameter_name}") | ||
# A mapping from the names of the argparse parameters to their cruise-control parameter name | ||
# | ||
# Like: | ||
# {'allow_capacity_estimation': 'allow_capacity_estimation', | ||
# 'brokers': 'brokerid', | ||
# 'clearmetrics': 'clearmetrics', | ||
# 'client_id': 'client_ids', | ||
# 'concurrency': 'concurrent_partition_movements_per_broker', | ||
# 'data_from': 'data_from', | ||
# 'disable_self_healing_for': 'disable_self_healing_for', | ||
# 'dry_run': 'dryRun', | ||
# ... | ||
# 'types': 'types', | ||
# 'use_ready_default_goals': 'use_ready_default_goals', | ||
# 'user_task_ids': 'user_task_ids', | ||
# 'verbose': 'verbose'} | ||
FLAG_TO_PARAMETER_NAME: Dict[str, str] = {} | ||
for parameter in AVAILABLE_PARAMETER_SET: | ||
argparse_parameter_name = None | ||
# argparse names this parameter's flag after the first string in 'args' | ||
for possible_argparse_name in parameter.argparse_properties['args']: | ||
# argparse chooses flag names only from the --flags | ||
if not possible_argparse_name.startswith('-'): | ||
argparse_parameter_name = possible_argparse_name.replace('-', '_') | ||
elif not possible_argparse_name.startswith('--'): | ||
continue | ||
else: | ||
flag_to_parameter_name[argparse_parameter_name] = parameter.name | ||
|
||
def __init__(self): | ||
# Define the argparse flags that cannot be used for endpoint parameters | ||
# | ||
# This helps the multi-stage argument parsing not to conflict with itself | ||
# during the different stages. | ||
# | ||
# To this will probably be added things like: | ||
# 'add_parameter' | ||
# 'endpoint_subparser' | ||
# 'socket_address' | ||
# 'remove_parameter' | ||
# | ||
# If you've added flags to the argument_parser, they should be added to | ||
# this set. | ||
self.non_parameter_flags = set() | ||
argparse_parameter_name = possible_argparse_name.lstrip('-').replace('-', '_') | ||
break | ||
if argparse_parameter_name and argparse_parameter_name in FLAG_TO_PARAMETER_NAME: | ||
raise ValueError(f"Colliding parameter flags: {argparse_parameter_name}") | ||
else: | ||
FLAG_TO_PARAMETER_NAME[argparse_parameter_name] = parameter.name |
79 changes: 79 additions & 0 deletions
79
cruise-control-client/cruisecontrolclient/client/ParameterSet.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,79 @@ | ||
from collections import MutableSet | ||
from itertools import chain | ||
from typing import Callable, Dict, Iterator, Optional, Set, Tuple, Type, Union | ||
|
||
from cruisecontrolclient.client.CCParameter import AbstractParameter | ||
|
||
primitive = Union[bool, float, int, str] | ||
|
||
|
||
class ParameterSet(MutableSet): | ||
|
||
def __init__(self, allowed_parameters: Optional[Tuple[Type[AbstractParameter]]] = None): | ||
self._parameter_name_to_allowed_parameters: Dict[ | ||
str, Callable[[Union[str, int, bool]], AbstractParameter]] = \ | ||
{ap.name: ap for ap in allowed_parameters} if allowed_parameters else {} | ||
self.adhoc_parameters: Dict[str, primitive] = {} | ||
self.instantiated_parameters: Dict[str, AbstractParameter] = {} | ||
|
||
def __len__(self) -> int: | ||
return len(self.adhoc_parameters) + len(self.instantiated_parameters) | ||
|
||
def __contains__(self, parameter: Union[AbstractParameter, str]) -> bool: | ||
try: | ||
return parameter.name in self.instantiated_parameters | ||
except AttributeError: | ||
return parameter in self.adhoc_parameters | ||
|
||
def __iter__(self) -> Iterator[Union[AbstractParameter, Tuple[str, primitive]]]: | ||
yield from chain(self.instantiated_parameters.values(), list(self.adhoc_parameters.items())) | ||
|
||
def is_allowed(self, parameter: Union[AbstractParameter, str]): | ||
try: | ||
return parameter.name in self._parameter_name_to_allowed_parameters | ||
except AttributeError: | ||
return parameter in self._parameter_name_to_allowed_parameters | ||
|
||
def add(self, parameter: Union[AbstractParameter, Tuple[str, primitive]]) -> None: | ||
try: | ||
key, value = parameter | ||
except ValueError: | ||
raise ValueError("Must provide two item tuple with key, value") | ||
except TypeError: | ||
if not self.is_allowed(parameter): | ||
raise ValueError(f"Parameter {parameter.name} not allowed within set.") | ||
if parameter.value is None: | ||
raise ValueError("Parameter has no value") | ||
self.instantiated_parameters[parameter.name] = parameter | ||
return | ||
|
||
if value is None: | ||
raise ValueError("Parameter has no value") | ||
|
||
if self.is_allowed(parameter): | ||
self.instantiated_parameters[key] = self._parameter_name_to_allowed_parameters[key](value) | ||
self.adhoc_parameters[key] = value | ||
|
||
def discard(self, parameter: Union[Type[AbstractParameter], str]) -> None: | ||
try: | ||
del self.instantiated_parameters[parameter.name] | ||
except AttributeError: | ||
if parameter in self.adhoc_parameters: | ||
del self.adhoc_parameters[parameter] | ||
except KeyError: | ||
pass | ||
|
||
def compose(self) -> Dict[str, primitive]: | ||
return {**self.adhoc_parameters, | ||
**{parameter.name: parameter.value for parameter in self.instantiated_parameters.values()}} | ||
|
||
def get(self, parameter: Union[Type[AbstractParameter], str]) -> Optional[Union[AbstractParameter, primitive]]: | ||
try: | ||
lookup = parameter.name | ||
except AttributeError: | ||
lookup = parameter | ||
|
||
try: | ||
return self.instantiated_parameters[lookup] | ||
except KeyError: | ||
return self.adhoc_parameters[lookup] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Currently, this is set in the
__init__
method of the StateEndpoint definition, but I think it's more appropriate to handle the definition a layer higher which surface it to the user of the cli.