diff --git a/krkn/scenario_plugins/abstract_scenario_plugin.py b/krkn/scenario_plugins/abstract_scenario_plugin.py index 060d9ec3..378f001c 100644 --- a/krkn/scenario_plugins/abstract_scenario_plugin.py +++ b/krkn/scenario_plugins/abstract_scenario_plugin.py @@ -56,6 +56,7 @@ def run_scenarios( scenario_telemetries: list[ScenarioTelemetry] = [] failed_scenarios = [] wait_duration = krkn_config["tunings"]["wait_duration"] + events_backup = krkn_config["telemetry"]["events_backup"] for scenario_config in scenarios_list: if isinstance(scenario_config, list): logging.error( @@ -99,13 +100,15 @@ def run_scenarios( int(scenario_telemetry.start_timestamp), int(scenario_telemetry.end_timestamp), ) - utils.populate_cluster_events( - scenario_telemetry, - parsed_scenario_config, - telemetry.get_lib_kubernetes(), - int(scenario_telemetry.start_timestamp), - int(scenario_telemetry.end_timestamp), - ) + + if events_backup: + utils.populate_cluster_events( + scenario_telemetry, + parsed_scenario_config, + telemetry.get_lib_kubernetes(), + int(scenario_telemetry.start_timestamp), + int(scenario_telemetry.end_timestamp), + ) if scenario_telemetry.exit_status != 0: failed_scenarios.append(scenario_config) diff --git a/krkn/scenario_plugins/node_actions/abstract_node_scenarios.py b/krkn/scenario_plugins/node_actions/abstract_node_scenarios.py index a4c14183..f9552f5e 100644 --- a/krkn/scenario_plugins/node_actions/abstract_node_scenarios.py +++ b/krkn/scenario_plugins/node_actions/abstract_node_scenarios.py @@ -4,33 +4,36 @@ import krkn.invoke.command as runcommand import krkn.scenario_plugins.node_actions.common_node_functions as nodeaction from krkn_lib.k8s import KrknKubernetes - +from krkn_lib.models.k8s import AffectedNode, AffectedNodeStatus # krkn_lib class abstract_node_scenarios: kubecli: KrknKubernetes + affected_nodes_status: AffectedNodeStatus - def __init__(self, kubecli: KrknKubernetes): + def __init__(self, kubecli: KrknKubernetes, affected_nodes_status: AffectedNodeStatus): self.kubecli = kubecli + self.affected_nodes_status = affected_nodes_status # Node scenario to start the node - def node_start_scenario(self, instance_kill_count, node, timeout, affected_node): + def node_start_scenario(self, instance_kill_count, node, timeout): pass # Node scenario to stop the node - def node_stop_scenario(self, instance_kill_count, node, timeout, affected_node): + def node_stop_scenario(self, instance_kill_count, node, timeout): pass # Node scenario to stop and then start the node - def node_stop_start_scenario(self, instance_kill_count, node, timeout, duration, affected_node): + def node_stop_start_scenario(self, instance_kill_count, node, timeout, duration): logging.info("Starting node_stop_start_scenario injection") self.node_stop_scenario(instance_kill_count, node, timeout) logging.info("Waiting for %s seconds before starting the node" % (duration)) time.sleep(duration) self.node_start_scenario(instance_kill_count, node, timeout) + self.affected_nodes_status.merge_affected_nodes() logging.info("node_stop_start_scenario has been successfully injected!") - def helper_node_stop_start_scenario(self, instance_kill_count, node, timeout, affected_node): + def helper_node_stop_start_scenario(self, instance_kill_count, node, timeout): logging.info("Starting helper_node_stop_start_scenario injection") self.helper_node_stop_scenario(instance_kill_count, node, timeout) self.helper_node_start_scenario(instance_kill_count, node, timeout) @@ -51,16 +54,17 @@ def node_disk_detach_attach_scenario(self, 
instance_kill_count, node, timeout, d logging.error("node_disk_detach_attach_scenario failed!") # Node scenario to terminate the node - def node_termination_scenario(self, instance_kill_count, node, timeout, affected_node): + def node_termination_scenario(self, instance_kill_count, node, timeout): pass # Node scenario to reboot the node - def node_reboot_scenario(self, instance_kill_count, node, timeout, affected_node): + def node_reboot_scenario(self, instance_kill_count, node, timeout): pass # Node scenario to stop the kubelet - def stop_kubelet_scenario(self, instance_kill_count, node, timeout, affected_node): + def stop_kubelet_scenario(self, instance_kill_count, node, timeout): for _ in range(instance_kill_count): + affected_node = AffectedNode(node) try: logging.info("Starting stop_kubelet_scenario injection") logging.info("Stopping the kubelet of the node %s" % (node)) @@ -78,17 +82,20 @@ def stop_kubelet_scenario(self, instance_kill_count, node, timeout, affected_nod ) logging.error("stop_kubelet_scenario injection failed!") raise e + self.add_affected_node(affected_node) # Node scenario to stop and start the kubelet - def stop_start_kubelet_scenario(self, instance_kill_count, node, timeout, affected_node): + def stop_start_kubelet_scenario(self, instance_kill_count, node, timeout): logging.info("Starting stop_start_kubelet_scenario injection") - self.stop_kubelet_scenario(instance_kill_count, node, timeout, affected_node) - self.node_reboot_scenario(instance_kill_count, node, timeout, affected_node) + self.stop_kubelet_scenario(instance_kill_count, node, timeout) + self.node_reboot_scenario(instance_kill_count, node, timeout) + self.affected_nodes_status.merge_affected_nodes() logging.info("stop_start_kubelet_scenario has been successfully injected!") # Node scenario to restart the kubelet - def restart_kubelet_scenario(self, instance_kill_count, node, timeout, affected_node): + def restart_kubelet_scenario(self, instance_kill_count, node, timeout): for _ in range(instance_kill_count): + affected_node = AffectedNode(node) try: logging.info("Starting restart_kubelet_scenario injection") logging.info("Restarting the kubelet of the node %s" % (node)) @@ -98,7 +105,7 @@ def restart_kubelet_scenario(self, instance_kill_count, node, timeout, affected_ + " -- chroot /host systemctl restart kubelet &" ) nodeaction.wait_for_not_ready_status(node, timeout, self.kubecli, affected_node) nodeaction.wait_for_ready_status(node, timeout, self.kubecli, affected_node) logging.info("The kubelet of the node %s has been restarted" % (node)) logging.info("restart_kubelet_scenario has been successfully injected!") except Exception as e: @@ -108,9 +115,10 @@ def restart_kubelet_scenario(self, instance_kill_count, node, timeout, affected_ ) logging.error("restart_kubelet_scenario injection failed!") raise e + self.add_affected_node(affected_node) # Node scenario to crash the node - def node_crash_scenario(self, instance_kill_count, node, timeout, affected_node): + def node_crash_scenario(self, instance_kill_count, node, timeout): for _ in range(instance_kill_count): try: logging.info("Starting node_crash_scenario injection") diff --git a/krkn/scenario_plugins/node_actions/alibaba_node_scenarios.py b/krkn/scenario_plugins/node_actions/alibaba_node_scenarios.py index b9ce0f49..fa47241c 100644 --- a/krkn/scenario_plugins/node_actions/alibaba_node_scenarios.py +++ b/krkn/scenario_plugins/node_actions/alibaba_node_scenarios.py @@
-18,7 +18,7 @@ abstract_node_scenarios, ) from krkn_lib.k8s import KrknKubernetes - +from krkn_lib.models.k8s import AffectedNode, AffectedNodeStatus class Alibaba: def __init__(self): @@ -161,8 +161,9 @@ def get_vm_status(self, instance_id): return None # Wait until the node instance is running - def wait_until_running(self, instance_id, timeout): + def wait_until_running(self, instance_id, timeout, affected_node): time_counter = 0 + start_time = time.time() status = self.get_vm_status(instance_id) while status != "Running": status = self.get_vm_status(instance_id) @@ -174,11 +175,15 @@ def wait_until_running(self, instance_id, timeout): if time_counter >= timeout: logging.info("ECS %s is still not ready in allotted time" % instance_id) return False + end_time = time.time() + if affected_node: + affected_node.set_affected_node_status("running", end_time - start_time) return True # Wait until the node instance is stopped - def wait_until_stopped(self, instance_id, timeout): + def wait_until_stopped(self, instance_id, timeout, affected_node): time_counter = 0 + start_time = time.time() status = self.get_vm_status(instance_id) while status != "Stopped": status = self.get_vm_status(instance_id) @@ -192,10 +197,14 @@ def wait_until_stopped(self, instance_id, timeout): if time_counter >= timeout: logging.info( "Vm %s is still not stopped in allotted time" % instance_id ) return False + end_time = time.time() + if affected_node: + affected_node.set_affected_node_status("stopped", end_time - start_time) return True # Wait until the node instance is terminated - def wait_until_released(self, instance_id, timeout): + def wait_until_released(self, instance_id, timeout, affected_node): + start_time = time.time() statuses = self.get_vm_status(instance_id) time_counter = 0 while statuses and statuses != "Released": @@ -210,17 +219,23 @@ def wait_until_released(self, instance_id, timeout): return False logging.info("ECS %s is released" % instance_id) + end_time = time.time() + if affected_node: + affected_node.set_affected_node_status("terminated", end_time - start_time) return True # krkn_lib class alibaba_node_scenarios(abstract_node_scenarios): - def __init__(self, kubecli: KrknKubernetes): + def __init__(self, kubecli: KrknKubernetes, affected_nodes_status: AffectedNodeStatus): + super().__init__(kubecli, affected_nodes_status) self.alibaba = Alibaba() + # Node scenario to start the node def node_start_scenario(self, instance_kill_count, node, timeout): for _ in range(instance_kill_count): + affected_node = AffectedNode(node) try: logging.info("Starting node_start_scenario injection") vm_id = self.alibaba.get_instance_id(node) @@ -228,8 +243,8 @@ def node_start_scenario(self, instance_kill_count, node, timeout): "Starting the node %s with instance ID: %s " % (node, vm_id) ) self.alibaba.start_instances(vm_id) - self.alibaba.wait_until_running(vm_id, timeout) - nodeaction.wait_for_ready_status(node, timeout, self.kubecli) + self.alibaba.wait_until_running(vm_id, timeout, affected_node) + nodeaction.wait_for_ready_status(node, timeout, self.kubecli, affected_node) logging.info("Node with instance ID: %s is in running state" % node) logging.info("node_start_scenario has been successfully injected!") except Exception as e: @@ -239,10 +254,12 @@ def node_start_scenario(self, instance_kill_count, node, timeout): ) logging.error("node_start_scenario injection failed!") raise e + self.affected_nodes_status.affected_nodes.append(affected_node) # Node scenario to stop the node def node_stop_scenario(self, instance_kill_count, node, timeout): for _ in
range(instance_kill_count): + affected_node = AffectedNode(node) try: logging.info("Starting node_stop_scenario injection") vm_id = self.alibaba.get_instance_id(node) @@ -250,9 +267,9 @@ def node_stop_scenario(self, instance_kill_count, node, timeout): "Stopping the node %s with instance ID: %s " % (node, vm_id) ) self.alibaba.stop_instances(vm_id) - self.alibaba.wait_until_stopped(vm_id, timeout) + self.alibaba.wait_until_stopped(vm_id, timeout, affected_node) logging.info("Node with instance ID: %s is in stopped state" % vm_id) - nodeaction.wait_for_unknown_status(node, timeout, self.kubecli) + nodeaction.wait_for_unknown_status(node, timeout, self.kubecli, affected_node) except Exception as e: logging.error( "Failed to stop node instance. Encountered following exception: %s. " @@ -260,23 +277,25 @@ def node_stop_scenario(self, instance_kill_count, node, timeout): ) logging.error("node_stop_scenario injection failed!") raise e + self.affected_nodes_status.affected_nodes.append(affected_node) # Might need to stop and then release the instance # Node scenario to terminate the node def node_termination_scenario(self, instance_kill_count, node, timeout): for _ in range(instance_kill_count): + affected_node = AffectedNode(node) try: logging.info( "Starting node_termination_scenario injection by first stopping instance" ) vm_id = self.alibaba.get_instance_id(node) self.alibaba.stop_instances(vm_id) - self.alibaba.wait_until_stopped(vm_id, timeout) + self.alibaba.wait_until_stopped(vm_id, timeout, affected_node) logging.info( "Releasing the node %s with instance ID: %s " % (node, vm_id) ) self.alibaba.release_instance(vm_id) - self.alibaba.wait_until_released(vm_id, timeout) + self.alibaba.wait_until_released(vm_id, timeout, affected_node) logging.info("Node with instance ID: %s has been released" % node) logging.info( "node_termination_scenario has been successfully injected!" 
@@ -288,17 +307,19 @@ def node_termination_scenario(self, instance_kill_count, node, timeout): ) logging.error("node_termination_scenario injection failed!") raise e + self.affected_nodes_status.affected_nodes.append(affected_node) # Node scenario to reboot the node def node_reboot_scenario(self, instance_kill_count, node, timeout): for _ in range(instance_kill_count): + affected_node = AffectedNode(node) try: logging.info("Starting node_reboot_scenario injection") instance_id = self.alibaba.get_instance_id(node) logging.info("Rebooting the node with instance ID: %s " % (instance_id)) self.alibaba.reboot_instances(instance_id) - nodeaction.wait_for_unknown_status(node, timeout, self.kubecli) - nodeaction.wait_for_ready_status(node, timeout, self.kubecli) + nodeaction.wait_for_unknown_status(node, timeout, self.kubecli, affected_node) + nodeaction.wait_for_ready_status(node, timeout, self.kubecli, affected_node) logging.info( "Node with instance ID: %s has been rebooted" % (instance_id) ) @@ -310,3 +331,4 @@ def node_reboot_scenario(self, instance_kill_count, node, timeout): ) logging.error("node_reboot_scenario injection failed!") raise e + self.affected_nodes_status.affected_nodes.append(affected_node) diff --git a/krkn/scenario_plugins/node_actions/aws_node_scenarios.py b/krkn/scenario_plugins/node_actions/aws_node_scenarios.py index f4784506..205869ba 100644 --- a/krkn/scenario_plugins/node_actions/aws_node_scenarios.py +++ b/krkn/scenario_plugins/node_actions/aws_node_scenarios.py @@ -7,7 +7,7 @@ abstract_node_scenarios, ) from krkn_lib.k8s import KrknKubernetes - +from krkn_lib.models.k8s import AffectedNode, AffectedNodeStatus class AWS: def __init__(self): @@ -77,9 +77,13 @@ def reboot_instances(self, instance_id): # until a successful state is reached. 
An error is returned after 40 failed checks # Setting timeout for consistency with other cloud functions # Wait until the node instance is running - def wait_until_running(self, instance_id, timeout=600): + def wait_until_running(self, instance_id, timeout=600, affected_node=None): try: + start_time = time.time() self.boto_instance.wait_until_running(InstanceIds=[instance_id]) + end_time = time.time() + if affected_node: + affected_node.set_affected_node_status("running", end_time - start_time) return True except Exception as e: logging.error( @@ -89,9 +93,13 @@ def wait_until_running(self, instance_id, timeout=600): return False # Wait until the node instance is stopped - def wait_until_stopped(self, instance_id, timeout=600): + def wait_until_stopped(self, instance_id, timeout=600, affected_node=None): try: + start_time = time.time() self.boto_instance.wait_until_stopped(InstanceIds=[instance_id]) + end_time = time.time() + if affected_node: + affected_node.set_affected_node_status("stopped", end_time - start_time) return True except Exception as e: logging.error( @@ -101,9 +109,13 @@ def wait_until_stopped(self, instance_id, timeout=600): return False # Wait until the node instance is terminated - def wait_until_terminated(self, instance_id, timeout=600): + def wait_until_terminated(self, instance_id, timeout=600, affected_node=None): try: + start_time = time.time() self.boto_instance.wait_until_terminated(InstanceIds=[instance_id]) + end_time = time.time() + if affected_node: + affected_node.set_affected_node_status("terminated", end_time - start_time) return True except Exception as e: logging.error( @@ -249,13 +261,14 @@ def get_volume_state(self, volume_id: str): # krkn_lib class aws_node_scenarios(abstract_node_scenarios): - def __init__(self, kubecli: KrknKubernetes): - super().__init__(kubecli) + def __init__(self, kubecli: KrknKubernetes, affected_nodes_status: AffectedNodeStatus): + super().__init__(kubecli, affected_nodes_status) self.aws = AWS() # Node scenario to start the node def node_start_scenario(self, instance_kill_count, node, timeout): for _ in range(instance_kill_count): + affected_node = AffectedNode(node) try: logging.info("Starting node_start_scenario injection") instance_id = self.aws.get_instance_id(node) @@ -263,8 +276,8 @@ def node_start_scenario(self, instance_kill_count, node, timeout): "Starting the node %s with instance ID: %s " % (node, instance_id) ) self.aws.start_instances(instance_id) - self.aws.wait_until_running(instance_id) - nodeaction.wait_for_ready_status(node, timeout, self.kubecli) + self.aws.wait_until_running(instance_id, affected_node=affected_node) + nodeaction.wait_for_ready_status(node, timeout, self.kubecli, affected_node) logging.info( "Node with instance ID: %s is in running state" % (instance_id) ) @@ -277,10 +290,12 @@ def node_start_scenario(self, instance_kill_count, node, timeout): logging.error("node_start_scenario injection failed!") raise RuntimeError() + self.affected_nodes_status.affected_nodes.append(affected_node) # Node scenario to stop the node def node_stop_scenario(self, instance_kill_count, node, timeout): for _ in range(instance_kill_count): + affected_node = AffectedNode(node) try: logging.info("Starting node_stop_scenario injection") instance_id = self.aws.get_instance_id(node) @@ -288,11 +303,11 @@ def node_stop_scenario(self, instance_kill_count, node, timeout): "Stopping the node %s with instance ID: %s " % (node, instance_id) ) self.aws.stop_instances(instance_id) - self.aws.wait_until_stopped(instance_id) +
self.aws.wait_until_stopped(instance_id, affected_node=affected_node) logging.info( "Node with instance ID: %s is in stopped state" % (instance_id) ) - nodeaction.wait_for_unknown_status(node, timeout, self.kubecli) + nodeaction.wait_for_unknown_status(node, timeout, self.kubecli, affected_node=affected_node) except Exception as e: logging.error( "Failed to stop node instance. Encountered following exception: %s. " @@ -301,10 +316,12 @@ def node_stop_scenario(self, instance_kill_count, node, timeout): logging.error("node_stop_scenario injection failed!") raise RuntimeError() + self.affected_nodes_status.affected_nodes.append(affected_node) # Node scenario to terminate the node def node_termination_scenario(self, instance_kill_count, node, timeout): for _ in range(instance_kill_count): + affected_node = AffectedNode(node) try: logging.info("Starting node_termination_scenario injection") instance_id = self.aws.get_instance_id(node) @@ -313,7 +330,7 @@ def node_termination_scenario(self, instance_kill_count, node, timeout): % (node, instance_id) ) self.aws.terminate_instances(instance_id) - self.aws.wait_until_terminated(instance_id) + self.aws.wait_until_terminated(instance_id, affected_node=affected_node) for _ in range(timeout): if node not in self.kubecli.list_nodes(): break @@ -332,10 +349,12 @@ def node_termination_scenario(self, instance_kill_count, node, timeout): logging.error("node_termination_scenario injection failed!") raise RuntimeError() + self.affected_nodes_status.affected_nodes.append(affected_node) # Node scenario to reboot the node def node_reboot_scenario(self, instance_kill_count, node, timeout): for _ in range(instance_kill_count): + affected_node = AffectedNode(node) try: logging.info("Starting node_reboot_scenario injection" + str(node)) instance_id = self.aws.get_instance_id(node) @@ -343,8 +362,8 @@ def node_reboot_scenario(self, instance_kill_count, node, timeout): "Rebooting the node %s with instance ID: %s " % (node, instance_id) ) self.aws.reboot_instances(instance_id) - nodeaction.wait_for_unknown_status(node, timeout, self.kubecli) - nodeaction.wait_for_ready_status(node, timeout, self.kubecli) + nodeaction.wait_for_unknown_status(node, timeout, self.kubecli, affected_node) + nodeaction.wait_for_ready_status(node, timeout, self.kubecli, affected_node) logging.info( "Node with instance ID: %s has been rebooted" % (instance_id) ) @@ -357,6 +376,7 @@ def node_reboot_scenario(self, instance_kill_count, node, timeout): logging.error("node_reboot_scenario injection failed!") raise RuntimeError() + self.affected_nodes_status.affected_nodes.append(affected_node) # Get volume attachment info def get_disk_attachment_info(self, instance_kill_count, node): diff --git a/krkn/scenario_plugins/node_actions/az_node_scenarios.py b/krkn/scenario_plugins/node_actions/az_node_scenarios.py index 6cad8c12..156a4bb4 100644 --- a/krkn/scenario_plugins/node_actions/az_node_scenarios.py +++ b/krkn/scenario_plugins/node_actions/az_node_scenarios.py @@ -8,7 +8,7 @@ from azure.mgmt.compute import ComputeManagementClient from azure.identity import DefaultAzureCredential from krkn_lib.k8s import KrknKubernetes - +from krkn_lib.models.k8s import AffectedNode, AffectedNodeStatus class Azure: def __init__(self): @@ -18,8 +18,11 @@ def __init__(self): logging.info("credential " + str(credentials)) # az_account = runcommand.invoke("az account list -o yaml") # az_account_yaml = yaml.safe_load(az_account, Loader=yaml.FullLoader) + logger = logging.getLogger("azure") + 
logger.setLevel(logging.WARNING) subscription_id = os.getenv("AZURE_SUBSCRIPTION_ID") - self.compute_client = ComputeManagementClient(credentials, subscription_id) + self.compute_client = ComputeManagementClient(credentials, subscription_id, logging=logger) + # Get the instance ID of the node def get_instance_id(self, node_name): @@ -90,8 +93,9 @@ def get_vm_status(self, resource_group, vm_name): return status # Wait until the node instance is running - def wait_until_running(self, resource_group, vm_name, timeout): + def wait_until_running(self, resource_group, vm_name, timeout, affected_node): time_counter = 0 + start_time = time.time() status = self.get_vm_status(resource_group, vm_name) while status and status.code != "PowerState/running": status = self.get_vm_status(resource_group, vm_name) @@ -101,11 +105,15 @@ def wait_until_running(self, resource_group, vm_name, timeout): if time_counter >= timeout: logging.info("Vm %s is still not ready in allotted time" % vm_name) return False + end_time = time.time() + if affected_node: + affected_node.set_affected_node_status("running", end_time - start_time) return True # Wait until the node instance is stopped - def wait_until_stopped(self, resource_group, vm_name, timeout): + def wait_until_stopped(self, resource_group, vm_name, timeout, affected_node): time_counter = 0 + start_time = time.time() status = self.get_vm_status(resource_group, vm_name) while status and status.code != "PowerState/stopped": status = self.get_vm_status(resource_group, vm_name) @@ -115,10 +123,14 @@ def wait_until_stopped(self, resource_group, vm_name, timeout): if time_counter >= timeout: logging.info("Vm %s is still not stopped in allotted time" % vm_name) return False + end_time = time.time() + if affected_node: + affected_node.set_affected_node_status("stopped", end_time - start_time) return True # Wait until the node instance is terminated - def wait_until_terminated(self, resource_group, vm_name, timeout): + def wait_until_terminated(self, resource_group, vm_name, timeout, affected_node): + start_time = time.time() statuses = self.compute_client.virtual_machines.instance_view( resource_group, vm_name ).statuses[0] @@ -137,29 +149,35 @@ def wait_until_terminated(self, resource_group, vm_name, timeout): return False except Exception: logging.info("Vm %s is terminated" % vm_name) + end_time = time.time() + if affected_node: + affected_node.set_affected_node_status("terminated", end_time - start_time) return True # krkn_lib class azure_node_scenarios(abstract_node_scenarios): - def __init__(self, kubecli: KrknKubernetes): - super().__init__(kubecli) + def __init__(self, kubecli: KrknKubernetes, affected_nodes_status: AffectedNodeStatus): + super().__init__(kubecli, affected_nodes_status) logging.info("init in azure") self.azure = Azure() + # Node scenario to start the node def node_start_scenario(self, instance_kill_count, node, timeout): for _ in range(instance_kill_count): + affected_node = AffectedNode(node) try: logging.info("Starting node_start_scenario injection") vm_name, resource_group = self.azure.get_instance_id(node) + logging.info( "Starting the node %s with instance ID: %s " % (vm_name, resource_group) ) self.azure.start_instances(resource_group, vm_name) - self.azure.wait_until_running(resource_group, vm_name, timeout) - nodeaction.wait_for_ready_status(vm_name, timeout, self.kubecli) + self.azure.wait_until_running(resource_group, vm_name, timeout, affected_node=affected_node) + nodeaction.wait_for_ready_status(vm_name, timeout, self.kubecli,
affected_node) logging.info("Node with instance ID: %s is in running state" % node) logging.info("node_start_scenario has been successfully injected!") except Exception as e: @@ -170,10 +188,12 @@ def node_start_scenario(self, instance_kill_count, node, timeout): logging.error("node_start_scenario injection failed!") raise RuntimeError() + self.affected_nodes_status.affected_nodes.append(affected_node) # Node scenario to stop the node def node_stop_scenario(self, instance_kill_count, node, timeout): for _ in range(instance_kill_count): + affected_node = AffectedNode(node) try: logging.info("Starting node_stop_scenario injection") vm_name, resource_group = self.azure.get_instance_id(node) @@ -182,9 +202,9 @@ def node_stop_scenario(self, instance_kill_count, node, timeout): % (vm_name, resource_group) ) self.azure.stop_instances(resource_group, vm_name) - self.azure.wait_until_stopped(resource_group, vm_name, timeout) + self.azure.wait_until_stopped(resource_group, vm_name, timeout, affected_node=affected_node) logging.info("Node with instance ID: %s is in stopped state" % vm_name) - nodeaction.wait_for_unknown_status(vm_name, timeout, self.kubecli) + nodeaction.wait_for_unknown_status(vm_name, timeout, self.kubecli, affected_node) except Exception as e: logging.error( "Failed to stop node instance. Encountered following exception: %s. " @@ -193,19 +213,22 @@ def node_stop_scenario(self, instance_kill_count, node, timeout): logging.error("node_stop_scenario injection failed!") raise RuntimeError() + self.affected_nodes_status.affected_nodes.append(affected_node) # Node scenario to terminate the node def node_termination_scenario(self, instance_kill_count, node, timeout): for _ in range(instance_kill_count): + affected_node = AffectedNode(node) try: logging.info("Starting node_termination_scenario injection") vm_name, resource_group = self.azure.get_instance_id(node) logging.info( "Terminating the node %s with instance ID: %s " % (vm_name, resource_group) ) self.azure.terminate_instances(resource_group, vm_name) - self.azure.wait_until_terminated(resource_group, vm_name, timeout) + self.azure.wait_until_terminated(resource_group, vm_name, timeout, affected_node) for _ in range(timeout): if vm_name not in self.kubecli.list_nodes(): break @@ -224,10 +247,13 @@ def node_termination_scenario(self, instance_kill_count, node, timeout): logging.error("node_termination_scenario injection failed!") raise RuntimeError() + self.affected_nodes_status.affected_nodes.append(affected_node) + # Node scenario to reboot the node def node_reboot_scenario(self, instance_kill_count, node, timeout): for _ in range(instance_kill_count): + affected_node = AffectedNode(node) try: logging.info("Starting node_reboot_scenario injection") vm_name, resource_group = self.azure.get_instance_id(node) @@ -235,9 +261,11 @@ def node_reboot_scenario(self, instance_kill_count, node, timeout): "Rebooting the node %s with instance ID: %s " % (vm_name, resource_group) ) + self.azure.reboot_instances(resource_group, vm_name) - nodeaction.wait_for_unknown_status(vm_name, timeout, self.kubecli) - nodeaction.wait_for_ready_status(vm_name, timeout, self.kubecli) + + nodeaction.wait_for_ready_status(vm_name, timeout, self.kubecli, affected_node) + logging.info("Node with instance ID: %s has been rebooted" % (vm_name)) logging.info("node_reboot_scenario has been successfully injected!") except Exception as e: @@ -248,3 +276,4 @@ def node_reboot_scenario(self, instance_kill_count, node, timeout):
logging.error("node_reboot_scenario injection failed!") raise RuntimeError() + self.affected_nodes_status.affected_nodes.append(affected_node) diff --git a/krkn/scenario_plugins/node_actions/bm_node_scenarios.py b/krkn/scenario_plugins/node_actions/bm_node_scenarios.py index 27f7d35b..4a9a4eb1 100644 --- a/krkn/scenario_plugins/node_actions/bm_node_scenarios.py +++ b/krkn/scenario_plugins/node_actions/bm_node_scenarios.py @@ -9,7 +9,7 @@ import time import traceback from krkn_lib.k8s import KrknKubernetes - +from krkn_lib.models.k8s import AffectedNode, AffectedNodeStatus class BM: def __init__(self, bm_info, user, passwd): @@ -127,8 +127,8 @@ def wait_until_stopped(self, bmc_addr, node_name): # krkn_lib class bm_node_scenarios(abstract_node_scenarios): - def __init__(self, bm_info, user, passwd, kubecli: KrknKubernetes): - super().__init__(kubecli) + def __init__(self, bm_info, user, passwd, kubecli: KrknKubernetes,affected_nodes_status: AffectedNodeStatus): + super().__init__(kubecli, affected_nodes_status) self.bm = BM(bm_info, user, passwd) # Node scenario to start the node @@ -159,6 +159,7 @@ def node_start_scenario(self, instance_kill_count, node, timeout): # Node scenario to stop the node def node_stop_scenario(self, instance_kill_count, node, timeout): for _ in range(instance_kill_count): + affected_node = AffectedNode(node) try: logging.info("Starting node_stop_scenario injection") bmc_addr = self.bm.get_bmc_addr(node) @@ -166,11 +167,11 @@ def node_stop_scenario(self, instance_kill_count, node, timeout): "Stopping the node %s with bmc address: %s " % (node, bmc_addr) ) self.bm.stop_instances(bmc_addr, node) - self.bm.wait_until_stopped(bmc_addr, node) + self.bm.wait_until_stopped(bmc_addr, node, affected_node) logging.info( "Node with bmc address: %s is in stopped state" % (bmc_addr) ) - nodeaction.wait_for_unknown_status(node, timeout, self.kubecli) + nodeaction.wait_for_unknown_status(node, timeout, self.kubecli, affected_node) except Exception as e: logging.error( "Failed to stop node instance. Encountered following exception: %s. 
" @@ -179,6 +180,7 @@ def node_stop_scenario(self, instance_kill_count, node, timeout): ) logging.error("node_stop_scenario injection failed!") raise e + self.affected_nodes_status.affected_nodes.append(affected_node) # Node scenario to terminate the node def node_termination_scenario(self, instance_kill_count, node, timeout): @@ -187,6 +189,7 @@ def node_termination_scenario(self, instance_kill_count, node, timeout): # Node scenario to reboot the node def node_reboot_scenario(self, instance_kill_count, node, timeout): for _ in range(instance_kill_count): + affected_node = AffectedNode(node) try: logging.info("Starting node_reboot_scenario injection") bmc_addr = self.bm.get_bmc_addr(node) @@ -195,8 +198,8 @@ def node_reboot_scenario(self, instance_kill_count, node, timeout): "Rebooting the node %s with bmc address: %s " % (node, bmc_addr) ) self.bm.reboot_instances(bmc_addr, node) - nodeaction.wait_for_unknown_status(node, timeout, self.kubecli) - nodeaction.wait_for_ready_status(node, timeout, self.kubecli) + nodeaction.wait_for_unknown_status(node, timeout, self.kubecli, affected_node) + nodeaction.wait_for_ready_status(node, timeout, self.kubecli, affected_node) logging.info("Node with bmc address: %s has been rebooted" % (bmc_addr)) logging.info("node_reboot_scenario has been successfuly injected!") except Exception as e: @@ -208,3 +211,4 @@ def node_reboot_scenario(self, instance_kill_count, node, timeout): traceback.print_exc() logging.error("node_reboot_scenario injection failed!") raise e + self.affected_nodes_status.affected_nodes.append(affected_node) diff --git a/krkn/scenario_plugins/node_actions/common_node_functions.py b/krkn/scenario_plugins/node_actions/common_node_functions.py index da1c6ea9..43424f41 100644 --- a/krkn/scenario_plugins/node_actions/common_node_functions.py +++ b/krkn/scenario_plugins/node_actions/common_node_functions.py @@ -6,6 +6,8 @@ from krkn_lib.models.k8s import AffectedNode import krkn.invoke.command as runcommand from krkn_lib.k8s import KrknKubernetes +from krkn_lib.models.k8s import AffectedNode, AffectedNodeStatus +from krkn_lib.models.k8s import AffectedNode node_general = False @@ -42,27 +44,25 @@ def get_node(label_selector, instance_kill_count, kubecli: KrknKubernetes): nodes.remove(node_to_add) return nodes_to_return - # krkn_lib # Wait until the node status becomes Ready -def wait_for_ready_status(node, timeout, kubecli: KrknKubernetes, affected_node: AffectedNode ): - ready_time = kubecli.watch_node_status(node, "True", timeout) - affected_node.set_not_ready_time(ready_time) +def wait_for_ready_status(node, timeout, kubecli: KrknKubernetes, affected_node: AffectedNode = None): + affected_node = kubecli.watch_node_status(node, "True", timeout, affected_node) + return affected_node - # krkn_lib # Wait until the node status becomes Not Ready -def wait_for_not_ready_status(node, timeout, kubecli: KrknKubernetes, affected_node: AffectedNode): - not_ready_time = kubecli.watch_node_status(node, "False", timeout) - affected_node.set_not_ready_time(not_ready_time) +def wait_for_not_ready_status(node, timeout, kubecli: KrknKubernetes, affected_node: AffectedNode = None): + affected_node = kubecli.watch_node_status(node, "False", timeout, affected_node) + return affected_node # krkn_lib # Wait until the node status becomes Unknown -def wait_for_unknown_status(node, timeout, kubecli: KrknKubernetes, affected_node: AffectedNode): - unknown_time = kubecli.watch_node_status(node, "Unknown", timeout) - affected_node.set_unknown_time(unknown_time) +def 
wait_for_unknown_status(node, timeout, kubecli: KrknKubernetes, affected_node: AffectedNode = None): + affected_node = kubecli.watch_node_status(node, "Unknown", timeout, affected_node) + return affected_node # Get the ip of the cluster node diff --git a/krkn/scenario_plugins/node_actions/docker_node_scenarios.py b/krkn/scenario_plugins/node_actions/docker_node_scenarios.py index a2cdf116..2e050b6e 100644 --- a/krkn/scenario_plugins/node_actions/docker_node_scenarios.py +++ b/krkn/scenario_plugins/node_actions/docker_node_scenarios.py @@ -5,7 +5,7 @@ import logging import docker from krkn_lib.k8s import KrknKubernetes - +from krkn_lib.models.k8s import AffectedNode, AffectedNodeStatus class Docker: def __init__(self): @@ -38,13 +38,14 @@ def terminate_instances(self, node_name): class docker_node_scenarios(abstract_node_scenarios): - def __init__(self, kubecli: KrknKubernetes): - super().__init__(kubecli) + def __init__(self, kubecli: KrknKubernetes, affected_nodes_status: AffectedNodeStatus): + super().__init__(kubecli, affected_nodes_status) self.docker = Docker() # Node scenario to start the node def node_start_scenario(self, instance_kill_count, node, timeout): for _ in range(instance_kill_count): + affected_node = AffectedNode(node) try: logging.info("Starting node_start_scenario injection") container_id = self.docker.get_container_id(node) @@ -52,7 +53,7 @@ def node_start_scenario(self, instance_kill_count, node, timeout): "Starting the node %s with container ID: %s " % (node, container_id) ) self.docker.start_instances(node) - nodeaction.wait_for_ready_status(node, timeout, self.kubecli) + nodeaction.wait_for_ready_status(node, timeout, self.kubecli, affected_node) logging.info( "Node with container ID: %s is in running state" % (container_id) ) @@ -64,10 +65,12 @@ def node_start_scenario(self, instance_kill_count, node, timeout): ) logging.error("node_start_scenario injection failed!") raise e + self.affected_nodes_status.affected_nodes.append(affected_node) # Node scenario to stop the node def node_stop_scenario(self, instance_kill_count, node, timeout): for _ in range(instance_kill_count): + affected_node = AffectedNode(node) try: logging.info("Starting node_stop_scenario injection") container_id = self.docker.get_container_id(node) @@ -78,7 +81,7 @@ def node_stop_scenario(self, instance_kill_count, node, timeout): logging.info( "Node with container ID: %s is in stopped state" % (container_id) ) - nodeaction.wait_for_unknown_status(node, timeout, self.kubecli) + nodeaction.wait_for_unknown_status(node, timeout, self.kubecli, affected_node) except Exception as e: logging.error( "Failed to stop node instance. Encountered following exception: %s. 
" @@ -86,6 +89,7 @@ def node_stop_scenario(self, instance_kill_count, node, timeout): ) logging.error("node_stop_scenario injection failed!") raise e + self.affected_nodes_status.affected_nodes.append(affected_node) # Node scenario to terminate the node def node_termination_scenario(self, instance_kill_count, node, timeout): @@ -113,6 +117,7 @@ def node_termination_scenario(self, instance_kill_count, node, timeout): # Node scenario to reboot the node def node_reboot_scenario(self, instance_kill_count, node, timeout): for _ in range(instance_kill_count): + affected_node = AffectedNode(node) try: logging.info("Starting node_reboot_scenario injection") container_id = self.docker.get_container_id(node) @@ -121,8 +126,8 @@ def node_reboot_scenario(self, instance_kill_count, node, timeout): % (node, container_id) ) self.docker.reboot_instances(node) - nodeaction.wait_for_unknown_status(node, timeout, self.kubecli) - nodeaction.wait_for_ready_status(node, timeout, self.kubecli) + nodeaction.wait_for_unknown_status(node, timeout, self.kubecli, affected_node) + nodeaction.wait_for_ready_status(node, timeout, self.kubecli, affected_node) logging.info( "Node with container ID: %s has been rebooted" % (container_id) ) @@ -134,3 +139,4 @@ def node_reboot_scenario(self, instance_kill_count, node, timeout): ) logging.error("node_reboot_scenario injection failed!") raise e + self.affected_nodes_status.affected_nodes.append(affected_node) diff --git a/krkn/scenario_plugins/node_actions/gcp_node_scenarios.py b/krkn/scenario_plugins/node_actions/gcp_node_scenarios.py index d982a5f8..ec39538d 100644 --- a/krkn/scenario_plugins/node_actions/gcp_node_scenarios.py +++ b/krkn/scenario_plugins/node_actions/gcp_node_scenarios.py @@ -7,7 +7,7 @@ ) from google.cloud import compute_v1 from krkn_lib.k8s import KrknKubernetes - +from krkn_lib.models.k8s import AffectedNode, AffectedNodeStatus class GCP: def __init__(self): @@ -173,10 +173,10 @@ def get_instance_status(self, instance_id, expected_status, timeout): "Failed to get status of instance %s. Encountered following " "exception: %s." 
% (instance_id, e) ) - raise RuntimeError() if instance_status == expected_status: return True time.sleep(sleeper) i += sleeper @@ -191,28 +191,45 @@ def wait_until_suspended(self, instance_id, timeout): return self.get_instance_status(instance_id, "SUSPENDED", timeout) # Wait until the node instance is running - def wait_until_running(self, instance_id, timeout): - return self.get_instance_status(instance_id, "RUNNING", timeout) + def wait_until_running(self, instance_id, timeout, affected_node): + start_time = time.time() + instance_status = self.get_instance_status(instance_id, "RUNNING", timeout) + end_time = time.time() + if affected_node: + affected_node.set_affected_node_status("running", end_time - start_time) + return instance_status # Wait until the node instance is stopped - def wait_until_stopped(self, instance_id, timeout): + def wait_until_stopped(self, instance_id, timeout, affected_node): # In GCP, the next state after STOPPING is TERMINATED - return self.get_instance_status(instance_id, "TERMINATED", timeout) + start_time = time.time() + instance_status = self.get_instance_status(instance_id, "TERMINATED", timeout) + end_time = time.time() + if affected_node: + affected_node.set_affected_node_status("stopped", end_time - start_time) + return instance_status # Wait until the node instance is terminated - def wait_until_terminated(self, instance_id, timeout): - return self.get_instance_status(instance_id, "TERMINATED", timeout) + def wait_until_terminated(self, instance_id, timeout, affected_node): + start_time = time.time() + instance_status = self.get_instance_status(instance_id, "TERMINATED", timeout) + end_time = time.time() + if affected_node: + affected_node.set_affected_node_status("terminated", end_time - start_time) + return instance_status # krkn_lib class gcp_node_scenarios(abstract_node_scenarios): - def __init__(self, kubecli: KrknKubernetes): - super().__init__(kubecli) + def __init__(self, kubecli: KrknKubernetes, affected_nodes_status: AffectedNodeStatus): + super().__init__(kubecli, affected_nodes_status) self.gcp = GCP() # Node scenario to start the node def node_start_scenario(self, instance_kill_count, node, timeout): for _ in range(instance_kill_count): + affected_node = AffectedNode(node) try: logging.info("Starting node_start_scenario injection") instance = self.gcp.get_node_instance(node) @@ -221,8 +238,8 @@ def node_start_scenario(self, instance_kill_count, node, timeout): "Starting the node %s with instance ID: %s " % (node, instance_id) ) self.gcp.start_instances(instance_id) - self.gcp.wait_until_running(instance_id, timeout) - nodeaction.wait_for_ready_status(node, timeout, self.kubecli) + self.gcp.wait_until_running(instance_id, timeout, affected_node) + nodeaction.wait_for_ready_status(node, timeout, self.kubecli, affected_node) logging.info( "Node with instance ID: %s is in running state" % instance_id ) @@ -235,10 +252,13 @@ def node_start_scenario(self, instance_kill_count, node, timeout): logging.error("node_start_scenario injection failed!") raise RuntimeError() + self.affected_nodes_status.affected_nodes.append(affected_node) # Node scenario to stop the node def node_stop_scenario(self, instance_kill_count, node, timeout): for _ in range(instance_kill_count): + affected_node = AffectedNode(node) try: logging.info("Starting node_stop_scenario
injection") instance = self.gcp.get_node_instance(node) @@ -247,11 +267,11 @@ def node_stop_scenario(self, instance_kill_count, node, timeout): "Stopping the node %s with instance ID: %s " % (node, instance_id) ) self.gcp.stop_instances(instance_id) - self.gcp.wait_until_stopped(instance_id, timeout) + self.gcp.wait_until_stopped(instance_id, timeout, affected_node=affected_node) logging.info( "Node with instance ID: %s is in stopped state" % instance_id ) - nodeaction.wait_for_unknown_status(node, timeout, self.kubecli) + nodeaction.wait_for_unknown_status(node, timeout, self.kubecli, affected_node) except Exception as e: logging.error( "Failed to stop node instance. Encountered following exception: %s. " @@ -260,10 +280,13 @@ def node_stop_scenario(self, instance_kill_count, node, timeout): logging.error("node_stop_scenario injection failed!") raise RuntimeError() + self.affected_nodes_status.affected_nodes.append(affected_node) # Node scenario to terminate the node def node_termination_scenario(self, instance_kill_count, node, timeout): for _ in range(instance_kill_count): + affected_node = AffectedNode(node) try: logging.info("Starting node_termination_scenario injection") instance = self.gcp.get_node_instance(node) @@ -273,7 +296,7 @@ def node_termination_scenario(self, instance_kill_count, node, timeout): % (node, instance_id) ) self.gcp.terminate_instances(instance_id) - self.gcp.wait_until_terminated(instance_id, timeout) + self.gcp.wait_until_terminated(instance_id, timeout, affected_node=affected_node) for _ in range(timeout): if node not in self.kubecli.list_nodes(): break @@ -292,10 +315,12 @@ def node_termination_scenario(self, instance_kill_count, node, timeout): logging.error("node_termination_scenario injection failed!") raise RuntimeError() + self.affected_nodes_status.affected_nodes.append(affected_node) # Node scenario to reboot the node def node_reboot_scenario(self, instance_kill_count, node, timeout): for _ in range(instance_kill_count): + affected_node = AffectedNode(node) try: logging.info("Starting node_reboot_scenario injection") instance = self.gcp.get_node_instance(node) @@ -304,8 +329,9 @@ def node_reboot_scenario(self, instance_kill_count, node, timeout): "Rebooting the node %s with instance ID: %s " % (node, instance_id) ) self.gcp.reboot_instances(instance_id) - self.gcp.wait_until_running(instance_id, timeout) - nodeaction.wait_for_ready_status(node, timeout, self.kubecli) + nodeaction.wait_for_unknown_status(node, timeout, self.kubecli, affected_node) + self.gcp.wait_until_running(instance_id, timeout, affected_node) + nodeaction.wait_for_ready_status(node, timeout, self.kubecli, affected_node) logging.info( "Node with instance ID: %s has been rebooted" % instance_id ) @@ -318,3 +344,4 @@ def node_reboot_scenario(self, instance_kill_count, node, timeout): logging.error("node_reboot_scenario injection failed!") raise RuntimeError() + self.affected_nodes_status.affected_nodes.append(affected_node) diff --git a/krkn/scenario_plugins/node_actions/general_cloud_node_scenarios.py b/krkn/scenario_plugins/node_actions/general_cloud_node_scenarios.py index c0a7ac8b..151de42d 100644 --- a/krkn/scenario_plugins/node_actions/general_cloud_node_scenarios.py +++ b/krkn/scenario_plugins/node_actions/general_cloud_node_scenarios.py @@ -3,7 +3,7 @@ abstract_node_scenarios, ) from krkn_lib.k8s import KrknKubernetes - +from krkn_lib.models.k8s import AffectedNodeStatus class GENERAL: def __init__(self):
@@ -12,8 +12,8 @@ def __init__(self): # krkn_lib class general_node_scenarios(abstract_node_scenarios): - def __init__(self, kubecli: KrknKubernetes): - super().__init__(kubecli) + def __init__(self, kubecli: KrknKubernetes, affected_nodes_status: AffectedNodeStatus): + super().__init__(kubecli, affected_nodes_status) self.general = GENERAL() # Node scenario to start the node diff --git a/krkn/scenario_plugins/node_actions/node_actions_scenario_plugin.py b/krkn/scenario_plugins/node_actions/node_actions_scenario_plugin.py index c84ef841..739b48d7 100644 --- a/krkn/scenario_plugins/node_actions/node_actions_scenario_plugin.py +++ b/krkn/scenario_plugins/node_actions/node_actions_scenario_plugin.py @@ -6,7 +6,7 @@ import yaml from krkn_lib.k8s import KrknKubernetes from krkn_lib.models.telemetry import ScenarioTelemetry -from krkn_lib.models.k8s import AffectedNode +from krkn_lib.models.k8s import AffectedNodeStatus from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift from krkn_lib.utils import get_yaml_item_value, log_exception @@ -61,28 +61,29 @@ def run( return 0 def get_node_scenario_object(self, node_scenario, kubecli: KrknKubernetes): + affected_nodes_status = AffectedNodeStatus() if ( "cloud_type" not in node_scenario.keys() or node_scenario["cloud_type"] == "generic" ): global node_general node_general = True - return general_node_scenarios(kubecli) + return general_node_scenarios(kubecli, affected_nodes_status) if node_scenario["cloud_type"].lower() == "aws": - return aws_node_scenarios(kubecli) + return aws_node_scenarios(kubecli, affected_nodes_status) elif node_scenario["cloud_type"].lower() == "gcp": - return gcp_node_scenarios(kubecli) + return gcp_node_scenarios(kubecli, affected_nodes_status) elif node_scenario["cloud_type"].lower() == "openstack": from krkn.scenario_plugins.node_actions.openstack_node_scenarios import ( openstack_node_scenarios, ) - return openstack_node_scenarios(kubecli) + return openstack_node_scenarios(kubecli, affected_nodes_status) elif ( node_scenario["cloud_type"].lower() == "azure" or node_scenario["cloud_type"] == "az" ): - return azure_node_scenarios(kubecli) + return azure_node_scenarios(kubecli, affected_nodes_status) elif ( node_scenario["cloud_type"].lower() == "alibaba" or node_scenario["cloud_type"] == "alicloud" @@ -91,7 +92,7 @@ def get_node_scenario_object(self, node_scenario, kubecli: KrknKubernetes): alibaba_node_scenarios, ) - return alibaba_node_scenarios(kubecli) + return alibaba_node_scenarios(kubecli, affected_nodes_status) elif node_scenario["cloud_type"].lower() == "bm": from krkn.scenario_plugins.node_actions.bm_node_scenarios import ( bm_node_scenarios, @@ -102,9 +103,10 @@ def get_node_scenario_object(self, node_scenario, kubecli: KrknKubernetes): node_scenario.get("bmc_user", None), node_scenario.get("bmc_password", None), kubecli, + affected_nodes_status ) elif node_scenario["cloud_type"].lower() == "docker": - return docker_node_scenarios(kubecli) + return docker_node_scenarios(kubecli, affected_nodes_status) else: logging.error( "Cloud type " @@ -140,18 +142,18 @@ def inject_node_scenario( nodes = common_node_functions.get_node( label_selector, instance_kill_count, kubecli ) - + # GCP api doesn't support multiprocessing calls, will only actually run 1 - if parallel_nodes and node_scenario['cloud_type'].lower() != "gcp": + if parallel_nodes: self.multiprocess_nodes(nodes, node_scenario_object, action, node_scenario) else: for single_node in nodes: self.run_node(single_node, node_scenario_object, action, node_scenario) 
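# A minimal sketch of the fan-out that multiprocess_nodes performs above, using
# Python's multiprocessing.pool.ThreadPool as the plugin does. fan_out and
# run_action are hypothetical stand-ins (run_action plays the role of run_node);
# only the ThreadPool usage is taken from the diff.
from multiprocessing.pool import ThreadPool

def fan_out(nodes, run_action):
    # One worker thread per node so every node action starts together.
    pool = ThreadPool(processes=len(nodes))
    try:
        pool.map(run_action, nodes)
    finally:
        pool.close()
        pool.join()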
- scenario_telemetry. + affected_nodes_status = node_scenario_object.affected_nodes_status + scenario_telemetry.affected_nodes.extend(affected_nodes_status.affected_nodes) def multiprocess_nodes(self, nodes, node_scenario_object, action, node_scenario): try: - logging.info("parallely call to nodes") # pool object with number of element pool = ThreadPool(processes=len(nodes)) @@ -163,7 +165,6 @@ def multiprocess_nodes(self, nodes, node_scenario_object, action, node_scenario) def run_node(self, single_node, node_scenario_object, action, node_scenario): - logging.info("action" + str(action)) # Get the scenario specifics for running action nodes run_kill_count = get_yaml_item_value(node_scenario, "runs", 1) if action in ("node_stop_start_scenario", "node_disk_detach_attach_scenario"): @@ -175,7 +176,7 @@ def run_node(self, single_node, node_scenario_object, action, node_scenario): node_scenario, "ssh_private_key", "~/.ssh/id_rsa" ) generic_cloud_scenarios = ("stop_kubelet_scenario", "node_crash_scenario") - affected_node = AffectedNode() + if node_general and action not in generic_cloud_scenarios: logging.info( "Scenario: " @@ -185,42 +186,42 @@ def run_node(self, single_node, node_scenario_object, action, node_scenario): else: if action == "node_start_scenario": node_scenario_object.node_start_scenario( - run_kill_count, single_node, timeout, affected_node + run_kill_count, single_node, timeout ) elif action == "node_stop_scenario": node_scenario_object.node_stop_scenario( - run_kill_count, single_node, timeout, affected_node + run_kill_count, single_node, timeout ) elif action == "node_stop_start_scenario": node_scenario_object.node_stop_start_scenario( - run_kill_count, single_node, timeout, duration, affected_node + run_kill_count, single_node, timeout, duration ) elif action == "node_termination_scenario": node_scenario_object.node_termination_scenario( - run_kill_count, single_node, timeout, affected_node + run_kill_count, single_node, timeout ) elif action == "node_reboot_scenario": node_scenario_object.node_reboot_scenario( - run_kill_count, single_node, timeout, affected_node + run_kill_count, single_node, timeout ) elif action == "node_disk_detach_attach_scenario": node_scenario_object.node_disk_detach_attach_scenario( run_kill_count, single_node, timeout, duration) elif action == "stop_start_kubelet_scenario": node_scenario_object.stop_start_kubelet_scenario( - run_kill_count, single_node, timeout, affected_node + run_kill_count, single_node, timeout ) elif action == "restart_kubelet_scenario": node_scenario_object.restart_kubelet_scenario( - run_kill_count, single_node, timeout, affected_node + run_kill_count, single_node, timeout ) elif action == "stop_kubelet_scenario": node_scenario_object.stop_kubelet_scenario( - run_kill_count, single_node, timeout, affected_node + run_kill_count, single_node, timeout ) elif action == "node_crash_scenario": node_scenario_object.node_crash_scenario( - run_kill_count, single_node, timeout, affected_node + run_kill_count, single_node, timeout ) elif action == "stop_start_helper_node_scenario": if node_scenario["cloud_type"] != "openstack": @@ -251,5 +252,6 @@ def run_node(self, single_node, node_scenario_object, action, node_scenario): % action ) + def get_scenario_types(self) -> list[str]: return ["node_scenarios"] diff --git a/krkn/scenario_plugins/node_actions/openstack_node_scenarios.py b/krkn/scenario_plugins/node_actions/openstack_node_scenarios.py index f7ce8563..8e5466ad 100644 --- 
a/krkn/scenario_plugins/node_actions/openstack_node_scenarios.py +++ b/krkn/scenario_plugins/node_actions/openstack_node_scenarios.py @@ -7,7 +7,7 @@ abstract_node_scenarios, ) from krkn_lib.k8s import KrknKubernetes - +from krkn_lib.models.k8s import AffectedNode, AffectedNodeStatus class OPENSTACKCLOUD: def __init__(self): @@ -56,12 +56,22 @@ def reboot_instances(self, node): raise RuntimeError() # Wait until the node instance is running - def wait_until_running(self, node, timeout): - return self.get_instance_status(node, "ACTIVE", timeout) + def wait_until_running(self, node, timeout, affected_node): + start_time = time.time() + instance_status = self.get_instance_status(node, "ACTIVE", timeout) + end_time = time.time() + if affected_node: + affected_node.set_affected_node_status("running", end_time - start_time) + return instance_status # Wait until the node instance is stopped - def wait_until_stopped(self, node, timeout): - return self.get_instance_status(node, "SHUTOFF", timeout) + def wait_until_stopped(self, node, timeout, affected_node): + start_time = time.time() + instance_status = self.get_instance_status(node, "SHUTOFF", timeout) + end_time = time.time() + if affected_node: + affected_node.set_affected_node_status("stopped", end_time - start_time) + return instance_status # Get instance status def get_instance_status(self, node, expected_status, timeout): @@ -107,19 +117,21 @@ def get_openstack_nodename(self, os_node_ip): # krkn_lib class openstack_node_scenarios(abstract_node_scenarios): - def __init__(self, kubecli: KrknKubernetes): + def __init__(self, kubecli: KrknKubernetes, affected_nodes_status: AffectedNodeStatus): + super().__init__(kubecli, affected_nodes_status) self.openstackcloud = OPENSTACKCLOUD() - + # Node scenario to start the node def node_start_scenario(self, instance_kill_count, node, timeout): for _ in range(instance_kill_count): + affected_node = AffectedNode(node) try: logging.info("Starting node_start_scenario injection") logging.info("Starting the node %s" % (node)) openstack_node_name = self.openstackcloud.get_instance_id(node) self.openstackcloud.start_instances(openstack_node_name) - self.openstackcloud.wait_until_running(openstack_node_name, timeout) - nodeaction.wait_for_ready_status(node, timeout, self.kubecli) + self.openstackcloud.wait_until_running(openstack_node_name, timeout, affected_node) + nodeaction.wait_for_ready_status(node, timeout, self.kubecli, affected_node) logging.info("Node with instance ID: %s is in running state" % (node)) logging.info("node_start_scenario has been successfully injected!") except Exception as e: @@ -130,18 +142,20 @@ def node_start_scenario(self, instance_kill_count, node, timeout): logging.error("node_start_scenario injection failed!") raise RuntimeError() + self.affected_nodes_status.affected_nodes.append(affected_node) # Node scenario to stop the node def node_stop_scenario(self, instance_kill_count, node, timeout): for _ in range(instance_kill_count): + affected_node = AffectedNode(node) try: logging.info("Starting node_stop_scenario injection") logging.info("Stopping the node %s " % (node)) openstack_node_name = self.openstackcloud.get_instance_id(node) self.openstackcloud.stop_instances(openstack_node_name) - self.openstackcloud.wait_until_stopped(openstack_node_name, timeout) + self.openstackcloud.wait_until_stopped(openstack_node_name, timeout, affected_node) logging.info("Node with instance name: %s is in stopped state" % (node)) - nodeaction.wait_for_ready_status(node, timeout, self.kubecli) +
nodeaction.wait_for_not_ready_status(node, timeout, self.kubecli, affected_node) except Exception as e: logging.error( "Failed to stop node instance. Encountered following exception: %s. " @@ -150,17 +164,19 @@ def node_stop_scenario(self, instance_kill_count, node, timeout): logging.error("node_stop_scenario injection failed!") raise RuntimeError() + self.affected_nodes_status.affected_nodes.append(affected_node) # Node scenario to reboot the node def node_reboot_scenario(self, instance_kill_count, node, timeout): for _ in range(instance_kill_count): + affected_node = AffectedNode(node) try: logging.info("Starting node_reboot_scenario injection") logging.info("Rebooting the node %s" % (node)) openstack_node_name = self.openstackcloud.get_instance_id(node) self.openstackcloud.reboot_instances(openstack_node_name) - nodeaction.wait_for_unknown_status(node, timeout, self.kubecli) - nodeaction.wait_for_ready_status(node, timeout, self.kubecli) + nodeaction.wait_for_unknown_status(node, timeout, self.kubecli, affected_node) + nodeaction.wait_for_ready_status(node, timeout, self.kubecli, affected_node) logging.info("Node with instance name: %s has been rebooted" % (node)) logging.info("node_reboot_scenario has been successfully injected!") except Exception as e: @@ -171,10 +187,12 @@ def node_reboot_scenario(self, instance_kill_count, node, timeout): logging.error("node_reboot_scenario injection failed!") raise RuntimeError() + self.affected_nodes_status.affected_nodes.append(affected_node) # Node scenario to start the node def helper_node_start_scenario(self, instance_kill_count, node_ip, timeout): for _ in range(instance_kill_count): + affected_node = AffectedNode(node_ip) try: logging.info("Starting helper_node_start_scenario injection") openstack_node_name = self.openstackcloud.get_openstack_nodename( @@ -182,7 +200,7 @@ def helper_node_start_scenario(self, instance_kill_count, node_ip, timeout): ) logging.info("Starting the helper node %s" % (openstack_node_name)) self.openstackcloud.start_instances(openstack_node_name) - self.openstackcloud.wait_until_running(openstack_node_name, timeout) + self.openstackcloud.wait_until_running(openstack_node_name, timeout, affected_node) logging.info("Helper node with IP: %s is in running state" % (node_ip)) logging.info("node_start_scenario has been successfully injected!") except Exception as e: @@ -193,10 +211,12 @@ def helper_node_start_scenario(self, instance_kill_count, node_ip, timeout): logging.error("helper_node_start_scenario injection failed!") raise RuntimeError() + self.affected_nodes_status.affected_nodes.append(affected_node) # Node scenario to stop the node def helper_node_stop_scenario(self, instance_kill_count, node_ip, timeout): for _ in range(instance_kill_count): + affected_node = AffectedNode(node_ip) try: logging.info("Starting helper_node_stop_scenario injection") openstack_node_name = self.openstackcloud.get_openstack_nodename( @@ -204,7 +224,7 @@ def helper_node_stop_scenario(self, instance_kill_count, node_ip, timeout): ) logging.info("Stopping the helper node %s " % (openstack_node_name)) self.openstackcloud.stop_instances(openstack_node_name) - self.openstackcloud.wait_until_stopped(openstack_node_name, timeout) + self.openstackcloud.wait_until_stopped(openstack_node_name, timeout, affected_node) logging.info("Helper node with IP: %s is in stopped state" % (node_ip)) except Exception as e: logging.error( @@ -214,6 +234,7 @@ def helper_node_stop_scenario(self, instance_kill_count, node_ip, timeout):
logging.error("helper_node_stop_scenario injection failed!") raise RuntimeError() + self.affected_nodes_status.affected_nodes.append(affected_node) def helper_node_service_status(self, node_ip, service, ssh_private_key, timeout): try: diff --git a/krkn/scenario_plugins/shut_down/shut_down_scenario_plugin.py b/krkn/scenario_plugins/shut_down/shut_down_scenario_plugin.py index b81906ff..8c7ec751 100644 --- a/krkn/scenario_plugins/shut_down/shut_down_scenario_plugin.py +++ b/krkn/scenario_plugins/shut_down/shut_down_scenario_plugin.py @@ -15,6 +15,7 @@ from krkn.scenario_plugins.node_actions.openstack_node_scenarios import OPENSTACKCLOUD from krkn.scenario_plugins.native.node_scenarios.ibmcloud_plugin import IbmCloud +from krkn_lib.models.k8s import AffectedNodeStatus, AffectedNode class ShutDownScenarioPlugin(AbstractScenarioPlugin): def run( @@ -32,9 +33,12 @@ def run( "cluster_shut_down_scenario" ] start_time = int(time.time()) + affected_nodes_status = AffectedNodeStatus() self.cluster_shut_down( - shut_down_config_scenario, lib_telemetry.get_lib_kubernetes() + shut_down_config_scenario, lib_telemetry.get_lib_kubernetes(), affected_nodes_status ) + + scenario_telemetry.affected_nodes = affected_nodes_status end_time = int(time.time()) cerberus.publish_kraken_status(krkn_config, [], start_time, end_time) return 0 @@ -72,7 +76,7 @@ def multiprocess_nodes(self, cloud_object_function, nodes, processes=0): # Inject the cluster shut down scenario # krkn_lib - def cluster_shut_down(self, shut_down_config, kubecli: KrknKubernetes): + def cluster_shut_down(self, shut_down_config, kubecli: KrknKubernetes, affected_nodes_status: AffectedNodeStatus): runs = shut_down_config["runs"] shut_down_duration = shut_down_config["shut_down_duration"] cloud_type = shut_down_config["cloud_type"] @@ -101,6 +105,7 @@ def cluster_shut_down(self, shut_down_config, kubecli: KrknKubernetes): node_id = [] for node in nodes: instance_id = cloud_object.get_instance_id(node) + affected_nodes_status.affected_nodes.append(AffectedNode(node)) node_id.append(instance_id) logging.info("node id list " + str(node_id)) for _ in range(runs): @@ -108,14 +113,18 @@ def cluster_shut_down(self, shut_down_config, kubecli: KrknKubernetes): stopping_nodes = set(node_id) self.multiprocess_nodes(cloud_object.stop_instances, node_id, processes) stopped_nodes = stopping_nodes.copy() + start_time = time.time() while len(stopping_nodes) > 0: for node in stopping_nodes: + affected_node = affected_nodes_status.get_affected_node_index(node) + # need to add in time that is passing while waiting for other nodes to be stopped + affected_node.set_cloud_stopping_time(time.time() - start_time) if type(node) is tuple: node_status = cloud_object.wait_until_stopped( - node[1], node[0], timeout + node[1], node[0], timeout, affected_node ) else: - node_status = cloud_object.wait_until_stopped(node, timeout) + node_status = cloud_object.wait_until_stopped(node, timeout, affected_node) # Only want to remove node from stopping list # when fully stopped/no error @@ -132,16 +141,20 @@ def cluster_shut_down(self, shut_down_config, kubecli: KrknKubernetes): logging.info("Restarting the nodes") restarted_nodes = set(node_id) self.multiprocess_nodes(cloud_object.start_instances, node_id, processes) + start_time = time.time() logging.info("Wait for each node to be running again") not_running_nodes = restarted_nodes.copy() while len(not_running_nodes) > 0: for node in not_running_nodes: + affected_node = affected_nodes_status.get_affected_node_index(node) + # need to 
@@ -132,16 +141,20 @@ def cluster_shut_down(self, shut_down_config, kubecli: KrknKubernetes):
             logging.info("Restarting the nodes")
             restarted_nodes = set(node_id)
             self.multiprocess_nodes(cloud_object.start_instances, node_id, processes)
+            start_time = time.time()
             logging.info("Wait for each node to be running again")
             not_running_nodes = restarted_nodes.copy()
             while len(not_running_nodes) > 0:
                 for node in not_running_nodes:
+                    affected_node = affected_nodes_status.get_affected_node_index(node)
+                    # need to add in time that is passing while waiting for other nodes to be running
+                    affected_node.set_cloud_running_time(time.time() - start_time)
                     if type(node) is tuple:
                         node_status = cloud_object.wait_until_running(
-                            node[1], node[0], timeout
+                            node[1], node[0], timeout, affected_node
                         )
                     else:
-                        node_status = cloud_object.wait_until_running(node, timeout)
+                        node_status = cloud_object.wait_until_running(node, timeout, affected_node)
                     if node_status:
                         restarted_nodes.remove(node)
                 not_running_nodes = restarted_nodes.copy()
diff --git a/requirements.txt b/requirements.txt
index 54688893..48300b1e 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -16,7 +16,7 @@ google-cloud-compute==1.22.0
 ibm_cloud_sdk_core==3.18.0
 ibm_vpc==0.20.0
 jinja2==3.1.5
-krkn-lib==4.0.4
+#krkn-lib==4.0.4
 lxml==5.1.0
 kubernetes==28.1.0
 numpy==1.26.4
@@ -36,6 +36,7 @@ werkzeug==3.0.6
 wheel==0.42.0
 zope.interface==5.4.0
 
+git+https://github.com/krkn-chaos/krkn-lib.git@node_timing
 git+https://github.com/krkn-chaos/arcaflow-plugin-kill-pod.git@v0.1.0
 git+https://github.com/vmware/vsphere-automation-sdk-python.git@v8.0.0.0
 cryptography>=42.0.4 # not directly required, pinned by Snyk to avoid a vulnerability
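The requirements change swaps the pinned krkn-lib release for the `node_timing` feature branch so the new `AffectedNode`/`AffectedNodeStatus` APIs are importable. A branch reference is a moving target, so before merge this would typically be flipped back to a pinned release or commit; a sketch of the two states (the post-release version is a hypothetical placeholder):

```
# During development of this feature, as in the diff:
#krkn-lib==4.0.4
git+https://github.com/krkn-chaos/krkn-lib.git@node_timing

# After the library release, restore a reproducible pin, e.g.:
# krkn-lib==<next-release-version>
```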