Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce coupling to argparse, and separate Endpoint definitions from the provided parameters. #2021

Open
wants to merge 27 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 26 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
ff59eaf
Setting up initial function definitions.
twcurrie May 23, 2023
7b8a04f
Pulling out first function.
twcurrie May 23, 2023
ec4eb09
Adding initial tests.
twcurrie May 23, 2023
d42b534
Pulling out handling of modifications, providing test.
twcurrie May 24, 2023
7ec6cde
Updating comments.
twcurrie May 24, 2023
b52a1ea
Constructing lengthy test to trigger exception.
twcurrie May 24, 2023
7dc0ec0
Adding a container for parameters.
twcurrie May 24, 2023
34e47c2
Adding boolean flag to indicate broker id requirement.
twcurrie May 24, 2023
829d555
Extending parameter set
twcurrie May 24, 2023
6df3cbe
Shifting away from class=based ExecutionContext
twcurrie May 24, 2023
389dfe2
Adding some tests around ParameterSet
twcurrie May 24, 2023
18d9c23
Pushing default to parameter definition, rather than __init__
twcurrie May 24, 2023
4df2177
Pivoting endpoint to not hold parameters.
twcurrie May 24, 2023
74f7c1d
Removing difference between parameter name and request flag.
twcurrie May 24, 2023
9a6ec4d
Removing parameter checks, relying on default rather than __init__ de…
twcurrie May 24, 2023
d294c4f
Correcting argument check.
twcurrie May 24, 2023
c604ced
Modifying test.
twcurrie May 24, 2023
8e1e68b
Reorder class methods.
twcurrie May 24, 2023
7e64ce3
Updating comments.
twcurrie May 24, 2023
f443566
Reversing broker id switch
twcurrie May 25, 2023
630aa14
Providing argument name alignment.
twcurrie May 25, 2023
4b6efcc
Merge remote-tracking branch 'origin/migrate_to_kafka_2_5' into argparse
twcurrie May 25, 2023
d6c3294
Removing redundant class variable.
twcurrie May 25, 2023
15ecbeb
Shifting more logic from endpoint to parameter set.
twcurrie May 25, 2023
558667c
Adding back the deprecated function, maybe a bit much.
twcurrie May 25, 2023
bcd8d46
Updating function names.
twcurrie May 25, 2023
9c6b512
UPdating docs.
twcurrie May 25, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,4 @@ fileStore/failedBrokers.txt
cruise-control/src/main/resources/webroot/.openapi-generator-ignore
cruise-control/src/main/resources/webroot/.openapi-generator/
cruise-control/src/main/resources/webroot/README.md
*.pyc
Original file line number Diff line number Diff line change
Expand Up @@ -65,5 +65,5 @@ class SubstatesParameter(AbstractSetOfChoicesParameter):
'args': ('--substate', '--substates'),
# nargs='+' allows for multiple of the set to be specified
'kwargs': dict(help=description, metavar='SUBSTATE', choices=lowercase_set_of_choices, nargs='+',
type=str.lower)
type=str.lower, default='executor')
Copy link
Author

@twcurrie twcurrie May 25, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently, this is set in the __init__ method of the StateEndpoint definition, but I think it's more appropriate to handle the definition a layer higher which surface it to the user of the cli.

}
182 changes: 28 additions & 154 deletions cruise-control-client/cruisecontrolclient/client/Endpoint.py

Large diffs are not rendered by default.

220 changes: 85 additions & 135 deletions cruise-control-client/cruisecontrolclient/client/ExecutionContext.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,143 +5,93 @@
import cruisecontrolclient.client.Endpoint as Endpoint

# To be able to make more precise type hints
from typing import Dict, Tuple, Type
from typing import Dict, List, Set, Type

# Define the in-order available endpoints for programmatically building the argparse CLI
AVAILABLE_ENDPOINTS: List[Type[Endpoint.AbstractEndpoint]] = [
Endpoint.AddBrokerEndpoint,
Endpoint.AdminEndpoint,
Endpoint.BootstrapEndpoint,
Endpoint.DemoteBrokerEndpoint,
Endpoint.FixOfflineReplicasEndpoint,
Endpoint.KafkaClusterStateEndpoint,
Endpoint.LoadEndpoint,
Endpoint.PartitionLoadEndpoint,
Endpoint.PauseSamplingEndpoint,
Endpoint.ProposalsEndpoint,
Endpoint.RebalanceEndpoint,
Endpoint.RemoveBrokerEndpoint,
Endpoint.ResumeSamplingEndpoint,
Endpoint.RightsizeEndpoint,
Endpoint.ReviewBoardEndpoint,
Endpoint.ReviewEndpoint,
Endpoint.StateEndpoint,
Endpoint.StopProposalExecutionEndpoint,
Endpoint.TrainEndpoint,
Endpoint.TopicConfigurationEndpoint,
Endpoint.UserTasksEndpoint
]

class ExecutionContext:
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not a requirement, but there isn't a major benefit in storing these properties within a class. It merely adds an additional layer between the values and things accessing them.

# Define the in-order available endpoints for programmatically building the argparse CLI
available_endpoints: Tuple[Type[Endpoint.AbstractEndpoint]] = (
Endpoint.AddBrokerEndpoint,
Endpoint.AdminEndpoint,
Endpoint.BootstrapEndpoint,
Endpoint.DemoteBrokerEndpoint,
Endpoint.FixOfflineReplicasEndpoint,
Endpoint.KafkaClusterStateEndpoint,
Endpoint.LoadEndpoint,
Endpoint.PartitionLoadEndpoint,
Endpoint.PauseSamplingEndpoint,
Endpoint.ProposalsEndpoint,
Endpoint.RebalanceEndpoint,
Endpoint.RemoveBrokerEndpoint,
Endpoint.ResumeSamplingEndpoint,
Endpoint.RightsizeEndpoint,
Endpoint.ReviewBoardEndpoint,
Endpoint.ReviewEndpoint,
Endpoint.StateEndpoint,
Endpoint.StopProposalExecutionEndpoint,
Endpoint.TrainEndpoint,
Endpoint.TopicConfigurationEndpoint,
Endpoint.UserTasksEndpoint
)
# Define the argparse flags that cannot be used for endpoint parameters
NON_PARAMETER_FLAGS: Set[str] = {
'add_parameter',
'endpoint_subparser',
'remove_parameter',
'socket_address'
}

# A mapping from the names of the subparsers to the Endpoint object they should instantiate.
#
# Like:
# {"add_broker": AddBrokerEndpoint,
# "add_brokers": AddBrokerEndpoint,
# "admin": AdminEndpoint,
# "bootstrap": BootstrapEndpoint,
# "demote_broker": DemoteBrokerEndpoint,
# "demote_brokers": DemoteBrokerEndpoint,
# "fix_offline_replicas": FixOfflineReplicasEndpoint,
# "kafka_cluster_state": KafkaClusterStateEndpoint,
# "load": LoadEndpoint,
# "partition_load": PartitionLoadEndpoint,
# "pause_sampling": PauseSamplingEndpoint,
# "proposals": ProposalsEndpoint,
# "rebalance": RebalanceEndpoint,
# "remove_broker": RemoveBrokerEndpoint,
# "remove_brokers": RemoveBrokerEndpoint,
# "resume_sampling": ResumeSamplingEndpoint,
# "rightsize": RightsizeEndpoint,
# "state": StateEndpoint,
# "stop_proposal_execution": StopProposalExecutionEndpoint,
# "stop": StopProposalExecutionEndpoint,
# "train": TrainEndpoint,
# "user_task": UserTasksEndpoint,
# "user_tasks": UserTasksEndpoint}
dest_to_Endpoint: Dict[str, Type[Endpoint.AbstractEndpoint]] = {}
available_parameter_set = set()
for endpoint in available_endpoints:
dest_to_Endpoint[endpoint.name] = endpoint
if 'aliases' in endpoint.argparse_properties['kwargs']:
endpoint_aliases = endpoint.argparse_properties['kwargs']['aliases']
for alias in endpoint_aliases:
dest_to_Endpoint[alias] = endpoint
for parameter in endpoint.available_Parameters:
available_parameter_set.add(parameter)
# A mapping from the names of the subparsers to the Endpoint object they should instantiate.
#
# Like:
# {"add_broker": AddBrokerEndpoint,
# "add_brokers": AddBrokerEndpoint,
# "admin": AdminEndpoint,
# "bootstrap": BootstrapEndpoint,
# "demote_broker": DemoteBrokerEndpoint,
# ...
# "user_task": UserTasksEndpoint,
# "user_tasks": UserTasksEndpoint}
NAME_TO_ENDPOINT: Dict[str, Type[Endpoint.AbstractEndpoint]] = {}
AVAILABLE_PARAMETER_SET = set()
for endpoint in AVAILABLE_ENDPOINTS:
NAME_TO_ENDPOINT[endpoint.name] = endpoint
if 'aliases' in endpoint.argparse_properties['kwargs']:
endpoint_aliases = endpoint.argparse_properties['kwargs']['aliases']
for alias in endpoint_aliases:
NAME_TO_ENDPOINT[alias] = endpoint
for parameter in endpoint.available_parameters:
AVAILABLE_PARAMETER_SET.add(parameter)

# A mapping from the names of the argparse parameters to their cruise-control parameter name
#
# Like:
# {'allow_capacity_estimation': 'allow_capacity_estimation',
# 'brokers': 'brokerid',
# 'clearmetrics': 'clearmetrics',
# 'client_id': 'client_ids',
# 'concurrency': 'concurrent_partition_movements_per_broker',
# 'data_from': 'data_from',
# 'disable_self_healing_for': 'disable_self_healing_for',
# 'dry_run': 'dryRun',
# 'enable_self_healing_for': 'enable_self_healing_for',
# 'end_timestamp': 'end',
# 'endpoint': 'endpoints',
# 'exclude_follower_demotion': 'exclude_follower_demotion',
# 'exclude_recently_demoted_brokers': 'exclude_recently_demoted_brokers',
# 'exclude_recently_removed_brokers': 'exclude_recently_removed_brokers',
# 'excluded_topics': 'excluded_topics',
# 'goals': 'goals',
# 'ignore_proposal_cache': 'ignore_proposal_cache',
# 'leader_concurrency': 'concurrent_leader_movements',
# 'max_load': 'max_load',
# 'min_valid_partition_ratio': 'min_valid_partition_ratio',
# 'number_of_entries_to_show': 'entries',
# 'partition': 'partition',
# 'reason': 'reason',
# 'resource': 'resource',
# 'skip_hard_goal_check': 'skip_hard_goal_check',
# 'skip_urp_demotion': 'skip_urp_demotion',
# 'start_timestamp': 'start',
# 'strategies': 'replica_movement_strategies',
# 'substate': 'substates',
# 'super_verbose': 'super_verbose',
# 'text': 'json',
# 'throttle_removed_broker': 'throttle_removed_broker',
# 'timestamp': 'time',
# 'topic': 'topic',
# 'types': 'types',
# 'use_ready_default_goals': 'use_ready_default_goals',
# 'user_task_ids': 'user_task_ids',
# 'verbose': 'verbose'}
flag_to_parameter_name: Dict[str, str] = {}
for parameter in available_parameter_set:
argparse_parameter_name = None
# argparse names this parameter's flag after the first string in 'args'
for possible_argparse_name in parameter.argparse_properties['args']:
# argparse chooses flag names only from the --flags
if not possible_argparse_name.startswith('-'):
argparse_parameter_name = possible_argparse_name.replace('-', '_')
elif not possible_argparse_name.startswith('--'):
continue
else:
argparse_parameter_name = possible_argparse_name.lstrip('-').replace('-', '_')
break
if argparse_parameter_name and argparse_parameter_name in flag_to_parameter_name:
raise ValueError(f"Colliding parameter flags: {argparse_parameter_name}")
# A mapping from the names of the argparse parameters to their cruise-control parameter name
#
# Like:
# {'allow_capacity_estimation': 'allow_capacity_estimation',
# 'brokers': 'brokerid',
# 'clearmetrics': 'clearmetrics',
# 'client_id': 'client_ids',
# 'concurrency': 'concurrent_partition_movements_per_broker',
# 'data_from': 'data_from',
# 'disable_self_healing_for': 'disable_self_healing_for',
# 'dry_run': 'dryRun',
# ...
# 'types': 'types',
# 'use_ready_default_goals': 'use_ready_default_goals',
# 'user_task_ids': 'user_task_ids',
# 'verbose': 'verbose'}
FLAG_TO_PARAMETER_NAME: Dict[str, str] = {}
for parameter in AVAILABLE_PARAMETER_SET:
argparse_parameter_name = None
# argparse names this parameter's flag after the first string in 'args'
for possible_argparse_name in parameter.argparse_properties['args']:
# argparse chooses flag names only from the --flags
if not possible_argparse_name.startswith('-'):
argparse_parameter_name = possible_argparse_name.replace('-', '_')
elif not possible_argparse_name.startswith('--'):
continue
else:
flag_to_parameter_name[argparse_parameter_name] = parameter.name

def __init__(self):
# Define the argparse flags that cannot be used for endpoint parameters
#
# This helps the multi-stage argument parsing not to conflict with itself
# during the different stages.
#
# To this will probably be added things like:
# 'add_parameter'
# 'endpoint_subparser'
# 'socket_address'
# 'remove_parameter'
#
# If you've added flags to the argument_parser, they should be added to
# this set.
self.non_parameter_flags = set()
argparse_parameter_name = possible_argparse_name.lstrip('-').replace('-', '_')
break
if argparse_parameter_name and argparse_parameter_name in FLAG_TO_PARAMETER_NAME:
raise ValueError(f"Colliding parameter flags: {argparse_parameter_name}")
else:
FLAG_TO_PARAMETER_NAME[argparse_parameter_name] = parameter.name
79 changes: 79 additions & 0 deletions cruise-control-client/cruisecontrolclient/client/ParameterSet.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
from collections import MutableSet
from itertools import chain
from typing import Callable, Dict, Iterator, Optional, Tuple, Type, Union

from cruisecontrolclient.client.CCParameter import AbstractParameter

primitive = Union[bool, float, int, str]


class ParameterSet(MutableSet):

def __init__(self, allowed_parameters: Optional[Tuple[Type[AbstractParameter]]] = None):
self._parameter_name_to_allowed_parameters: Dict[
str, Callable[[Union[str, int, bool]], AbstractParameter]] = \
{ap.name: ap for ap in allowed_parameters} if allowed_parameters else {}
self.adhoc_parameters: Dict[str, primitive] = {}
self.instantiated_parameters: Dict[str, AbstractParameter] = {}

def __len__(self) -> int:
return len(self.adhoc_parameters) + len(self.instantiated_parameters)

def __contains__(self, parameter: Union[AbstractParameter, str]) -> bool:
try:
return parameter.name in self.instantiated_parameters
except AttributeError:
return parameter in self.adhoc_parameters

def __iter__(self) -> Iterator[Union[AbstractParameter, Tuple[str, primitive]]]:
yield from chain(self.instantiated_parameters.values(), list(self.adhoc_parameters.items()))

def is_allowed(self, parameter: Union[AbstractParameter, str]):
try:
return parameter.name in self._parameter_name_to_allowed_parameters
except AttributeError:
return parameter in self._parameter_name_to_allowed_parameters

def add(self, parameter: Optional[Union[AbstractParameter, Tuple[str, primitive]]]) -> None:
try:
key, value = parameter
except ValueError:
raise ValueError("Must provide two item tuple with key, value")
except TypeError:
if not self.is_allowed(parameter):
raise ValueError(f"Parameter {parameter.name} not allowed within set.")
if parameter.value is None:
raise ValueError("Parameter has no value")
self.instantiated_parameters[parameter.name] = parameter
return

if value is None:
raise ValueError("Parameter has no value")

if self.is_allowed(parameter):
self.instantiated_parameters[key] = self._parameter_name_to_allowed_parameters[key](value)
self.adhoc_parameters[key] = value

def discard(self, parameter: Union[Type[AbstractParameter], str]) -> None:
try:
del self.instantiated_parameters[parameter.name]
except AttributeError:
if parameter in self.adhoc_parameters:
del self.adhoc_parameters[parameter]
except KeyError:
pass

def compose(self) -> Dict[str, primitive]:
return {**self.adhoc_parameters,
**{parameter.name: parameter.value for parameter in self.instantiated_parameters.values()}}

def get(self, parameter: Union[Type[AbstractParameter], str]) -> Optional[Union[AbstractParameter, primitive]]:
try:
lookup = parameter.name
except AttributeError:
lookup = parameter

try:
return self.instantiated_parameters[lookup]
except KeyError:
return self.adhoc_parameters[lookup]
2 changes: 0 additions & 2 deletions cruise-control-client/cruisecontrolclient/client/Query.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,7 @@ def generate_url_from_cc_socket_address(cc_socket_address: str, endpoint: Abstra
"""
Given a cruise-control hostname[:port] and an Endpoint, return the correct URL
for this cruise-control operation.

Note that this URL _includes_ parameters.

:param cc_socket_address: like hostname[:port], ip-address[:port]
:param endpoint:
:return: URL, the correct URL to perform the Endpoint's operation
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@
# To be able to more-easily retrieve the base url of cruise-control
from cruisecontrolclient.client.Query import generate_base_url_from_cc_socket_address

# To be able to separate an Endpoint class instance from the parameters it executes
from cruisecontrolclient.client.ParameterSet import ParameterSet

# To be able to make HTTP calls
import requests

Expand Down Expand Up @@ -124,6 +127,7 @@ def json_or_text_guesser():
def retrieve_response_from_Endpoint(self,
cc_socket_address: str,
endpoint: AbstractEndpoint,
parameters: ParameterSet,
**kwargs):
"""
Returns a final requests.Response object from cruise-control
Expand All @@ -134,11 +138,12 @@ def retrieve_response_from_Endpoint(self,
:return: requests.Response
:param cc_socket_address: like someCruiseControlAddress:9090
:param endpoint: an instance of an Endpoint
:param parameters: set of parameters to pass to endpoint
:return:
"""
return self.retrieve_response(
method=endpoint.http_method,
url=generate_base_url_from_cc_socket_address(cc_socket_address, endpoint),
params=endpoint.get_composed_params(),
params=parameters.compose(),
**kwargs
)
Loading