diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 0baa39bc5..387ab60aa 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -84,9 +84,9 @@ jobs: echo "test_namespace" >> ./CI/tests/functional_tests echo "test_net_chaos" >> ./CI/tests/functional_tests echo "test_time" >> ./CI/tests/functional_tests - echo "test_arca_cpu_hog" >> ./CI/tests/functional_tests - echo "test_arca_memory_hog" >> ./CI/tests/functional_tests - echo "test_arca_io_hog" >> ./CI/tests/functional_tests + echo "test_cpu_hog" >> ./CI/tests/functional_tests + echo "test_memory_hog" >> ./CI/tests/functional_tests + echo "test_io_hog" >> ./CI/tests/functional_tests # Push on main only steps + all other functional to collect coverage @@ -113,9 +113,9 @@ jobs: echo "test_namespace" >> ./CI/tests/functional_tests echo "test_net_chaos" >> ./CI/tests/functional_tests echo "test_time" >> ./CI/tests/functional_tests - echo "test_arca_cpu_hog" >> ./CI/tests/functional_tests - echo "test_arca_memory_hog" >> ./CI/tests/functional_tests - echo "test_arca_io_hog" >> ./CI/tests/functional_tests + echo "test_cpu_hog" >> ./CI/tests/functional_tests + echo "test_memory_hog" >> ./CI/tests/functional_tests + echo "test_io_hog" >> ./CI/tests/functional_tests # Final common steps - name: Run Functional tests diff --git a/CI/tests/test_arca_cpu_hog.sh b/CI/tests/test_arca_cpu_hog.sh deleted file mode 100644 index 7989be212..000000000 --- a/CI/tests/test_arca_cpu_hog.sh +++ /dev/null @@ -1,19 +0,0 @@ -set -xeEo pipefail - -source CI/tests/common.sh - -trap error ERR -trap finish EXIT - - -function functional_test_arca_cpu_hog { - yq -i '.input_list[0].node_selector={"kubernetes.io/hostname":"kind-worker2"}' scenarios/kube/cpu-hog/input.yaml - export scenario_type="hog_scenarios" - export scenario_file="scenarios/kube/cpu-hog/input.yaml" - export post_config="" - envsubst < CI/config/common_test_config.yaml > CI/config/arca_cpu_hog.yaml - python3 -m coverage run -a run_kraken.py -c CI/config/arca_cpu_hog.yaml - echo "Arcaflow CPU Hog: Success" -} - -functional_test_arca_cpu_hog \ No newline at end of file diff --git a/CI/tests/test_arca_io_hog.sh b/CI/tests/test_arca_io_hog.sh deleted file mode 100644 index 155cbd118..000000000 --- a/CI/tests/test_arca_io_hog.sh +++ /dev/null @@ -1,19 +0,0 @@ -set -xeEo pipefail - -source CI/tests/common.sh - -trap error ERR -trap finish EXIT - - -function functional_test_arca_io_hog { - yq -i '.input_list[0].node_selector={"kubernetes.io/hostname":"kind-worker2"}' scenarios/kube/io-hog/input.yaml - export scenario_type="hog_scenarios" - export scenario_file="scenarios/kube/io-hog/input.yaml" - export post_config="" - envsubst < CI/config/common_test_config.yaml > CI/config/arca_io_hog.yaml - python3 -m coverage run -a run_kraken.py -c CI/config/arca_io_hog.yaml - echo "Arcaflow IO Hog: Success" -} - -functional_test_arca_io_hog \ No newline at end of file diff --git a/CI/tests/test_arca_memory_hog.sh b/CI/tests/test_arca_memory_hog.sh deleted file mode 100644 index 83e12961b..000000000 --- a/CI/tests/test_arca_memory_hog.sh +++ /dev/null @@ -1,19 +0,0 @@ -set -xeEo pipefail - -source CI/tests/common.sh - -trap error ERR -trap finish EXIT - - -function functional_test_arca_memory_hog { - yq -i '.input_list[0].node_selector={"kubernetes.io/hostname":"kind-worker2"}' scenarios/kube/memory-hog/input.yaml - export scenario_type="hog_scenarios" - export scenario_file="scenarios/kube/memory-hog/input.yaml" - export post_config="" - envsubst < 
CI/config/common_test_config.yaml > CI/config/arca_memory_hog.yaml - python3 -m coverage run -a run_kraken.py -c CI/config/arca_memory_hog.yaml - echo "Arcaflow Memory Hog: Success" -} - -functional_test_arca_memory_hog \ No newline at end of file diff --git a/CI/tests/test_cpu_hog.sh b/CI/tests/test_cpu_hog.sh new file mode 100644 index 000000000..edc09e7ed --- /dev/null +++ b/CI/tests/test_cpu_hog.sh @@ -0,0 +1,20 @@ +set -xeEo pipefail + +source CI/tests/common.sh + +trap error ERR +trap finish EXIT + + +function functional_test_cpu_hog { + yq -i '.node_selector="kubernetes.io/hostname=kind-worker2"' scenarios/kube/cpu-hog.yml + + export scenario_type="hog_scenarios" + export scenario_file="scenarios/kube/cpu-hog.yml" + export post_config="" + envsubst < CI/config/common_test_config.yaml > CI/config/cpu_hog.yaml + python3 -m coverage run -a run_kraken.py -c CI/config/cpu_hog.yaml + echo "CPU Hog: Success" +} + +functional_test_cpu_hog \ No newline at end of file diff --git a/CI/tests/test_io_hog.sh b/CI/tests/test_io_hog.sh new file mode 100644 index 000000000..bf7b44ff7 --- /dev/null +++ b/CI/tests/test_io_hog.sh @@ -0,0 +1,19 @@ +set -xeEo pipefail + +source CI/tests/common.sh + +trap error ERR +trap finish EXIT + + +function functional_test_io_hog { + yq -i '.node_selector="kubernetes.io/hostname=kind-worker2"' scenarios/kube/io-hog.yml + export scenario_type="hog_scenarios" + export scenario_file="scenarios/kube/io-hog.yml" + export post_config="" + envsubst < CI/config/common_test_config.yaml > CI/config/io_hog.yaml + python3 -m coverage run -a run_kraken.py -c CI/config/io_hog.yaml + echo "IO Hog: Success" +} + +functional_test_io_hog \ No newline at end of file diff --git a/CI/tests/test_memory_hog.sh b/CI/tests/test_memory_hog.sh new file mode 100644 index 000000000..5239e2819 --- /dev/null +++ b/CI/tests/test_memory_hog.sh @@ -0,0 +1,19 @@ +set -xeEo pipefail + +source CI/tests/common.sh + +trap error ERR +trap finish EXIT + + +function functional_test_memory_hog { + yq -i '.node_selector="kubernetes.io/hostname=kind-worker2"' scenarios/kube/memory-hog.yml + export scenario_type="hog_scenarios" + export scenario_file="scenarios/kube/memory-hog.yml" + export post_config="" + envsubst < CI/config/common_test_config.yaml > CI/config/memory_hog.yaml + python3 -m coverage run -a run_kraken.py -c CI/config/memory_hog.yaml + echo "Memory Hog: Success" +} + +functional_test_memory_hog \ No newline at end of file diff --git a/README.md b/README.md index c75052f93..184c48971 100644 --- a/README.md +++ b/README.md @@ -55,7 +55,7 @@ Scenario type | Kubernetes [Container Scenarios](docs/container_scenarios.md) | :heavy_check_mark: | [Node Scenarios](docs/node_scenarios.md) | :heavy_check_mark: | [Time Scenarios](docs/time_scenarios.md) | :heavy_check_mark: | -[Hog Scenarios: CPU, Memory](docs/arcaflow_scenarios.md) | :heavy_check_mark: | +[Hog Scenarios: CPU, Memory](docs/hog_scenarios.md) | :heavy_check_mark: | [Cluster Shut Down Scenarios](docs/cluster_shut_down_scenarios.md) | :heavy_check_mark: | [Service Disruption Scenarios](docs/service_disruption_scenarios.md.md) | :heavy_check_mark: | [Zone Outage Scenarios](docs/zone_outage.md) | :heavy_check_mark: | diff --git a/config/config.yaml b/config/config.yaml index f26121de4..bf95b95ed 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -9,10 +9,9 @@ kraken: chaos_scenarios: # List of policies/chaos scenarios to load - hog_scenarios: - - scenarios/kube/cpu-hog/input.yaml - - scenarios/kube/memory-hog/input.yaml - - 
scenarios/kube/io-hog/input.yaml - - scenarios/kube/io-hog/input.yaml + - scenarios/kube/cpu-hog.yml + - scenarios/kube/memory-hog.yml + - scenarios/kube/io-hog.yml - application_outages_scenarios: - scenarios/openshift/app_outage.yaml - container_scenarios: # List of chaos pod scenarios to load diff --git a/docs/arcaflow_scenarios.md b/docs/arcaflow_scenarios.md deleted file mode 100644 index 02483d1dd..000000000 --- a/docs/arcaflow_scenarios.md +++ /dev/null @@ -1,70 +0,0 @@ -## Arcaflow Scenarios -Arcaflow is a workflow engine in development which provides the ability to execute workflow steps in sequence, in parallel, repeatedly, etc. The main difference to competitors such as Netflix Conductor is the ability to run ad-hoc workflows without an infrastructure setup required. - -The engine uses containers to execute plugins and runs them either locally in Docker/Podman or remotely on a Kubernetes cluster. The workflow system is strongly typed and allows for generating JSON schema and OpenAPI documents for all data formats involved. - -### Available Scenarios -#### Hog scenarios: -- [CPU Hog](arcaflow_scenarios/cpu_hog.md) -- [Memory Hog](arcaflow_scenarios/memory_hog.md) -- [I/O Hog](arcaflow_scenarios/io_hog.md) - - -### Prequisites -Arcaflow supports three deployment technologies: -- Docker -- Podman -- Kubernetes - -#### Docker -In order to run Arcaflow Scenarios with the Docker deployer, be sure that: -- Docker is correctly installed in your Operating System (to find instructions on how to install docker please refer to [Docker Documentation](https://www.docker.com/)) -- The Docker daemon is running - -#### Podman -The podman deployer is built around the podman CLI and doesn't need necessarily to be run along with the podman daemon. -To run Arcaflow Scenarios in your Operating system be sure that: -- podman is correctly installed in your Operating System (to find instructions on how to install podman refer to [Podman Documentation](https://podman.io/)) -- the podman CLI is in your shell PATH - -#### Kubernetes -The kubernetes deployer integrates directly the Kubernetes API Client and needs only a valid kubeconfig file and a reachable Kubernetes/OpenShift Cluster. - -### Usage - -To enable arcaflow scenarios edit the kraken config file, go to the section `kraken -> chaos_scenarios` of the yaml structure -and add a new element to the list named `arcaflow_scenarios` then add the desired scenario -pointing to the `input.yaml` file. -``` -kraken: - ... - chaos_scenarios: - - arcaflow_scenarios: - - scenarios/arcaflow/cpu-hog/input.yaml -``` - -#### input.yaml -The implemented scenarios can be found in *scenarios/arcaflow/* folder. -The entrypoint of each scenario is the *input.yaml* file. -In this file there are all the options to set up the scenario accordingly to the desired target -### config.yaml -The arcaflow config file. Here you can set the arcaflow deployer and the arcaflow log level. -The supported deployers are: -- Docker -- Podman (podman daemon not needed, suggested option) -- Kubernetes - -The supported log levels are: -- debug -- info -- warning -- error -### workflow.yaml -This file contains the steps that will be executed to perform the scenario against the target. -Each step is represented by a container that will be executed from the deployer and its options. -Note that we provide the scenarios as a template, but they can be manipulated to define more complex workflows. 
-To have more details regarding the arcaflow workflows architecture and syntax it is suggested to refer to the [Arcaflow Documentation](https://arcalot.io/arcaflow/). - -This edit is no longer in quay image -Working on fix in ticket: https://issues.redhat.com/browse/CHAOS-494 -This will effect all versions 4.12 and higher of OpenShift \ No newline at end of file diff --git a/docs/arcaflow_scenarios/cpu_hog.md b/docs/arcaflow_scenarios/cpu_hog.md deleted file mode 100644 index 981a3ae42..000000000 --- a/docs/arcaflow_scenarios/cpu_hog.md +++ /dev/null @@ -1,19 +0,0 @@ -# CPU Hog -This scenario is based on the arcaflow [arcaflow-plugin-stressng](https://github.com/arcalot/arcaflow-plugin-stressng) plugin. -The purpose of this scenario is to create cpu pressure on a particular node of the Kubernetes/OpenShift cluster for a time span. -To enable this plugin add the pointer to the scenario input file `scenarios/arcaflow/cpu-hog/input.yaml` as described in the -Usage section. -This scenario takes a list of objects named `input_list` with the following properties: - -- **kubeconfig :** *string* the kubeconfig needed by the deployer to deploy the sysbench plugin in the target cluster -- **namespace :** *string* the namespace where the scenario container will be deployed -**Note:** this parameter will be automatically filled by kraken if the `kubeconfig_path` property is correctly set -- **node_selector :** *key-value map* the node label that will be used as `nodeSelector` by the pod to target a specific cluster node -- **duration :** *string* stop stress test after N seconds. One can also specify the units of time in seconds, minutes, hours, days or years with the suffix s, m, h, d or y. -- **cpu_count :** *int* the number of CPU cores to be used (0 means all) -- **cpu_method :** *string* a fine-grained control of which cpu stressors to use (ackermann, cfloat etc. see [manpage](https://manpages.org/sysbench) for all the cpu_method options) -- **cpu_load_percentage :** *int* the CPU load by percentage - -To perform several load tests in the same run simultaneously (eg. stress two or more nodes in the same run) add another item -to the `input_list` with the same properties (and eventually different values eg. different node_selectors -to schedule the pod on different nodes). To reduce (or increase) the parallelism change the value `parallelism` in `workload.yaml` file \ No newline at end of file diff --git a/docs/arcaflow_scenarios/io_hog.md b/docs/arcaflow_scenarios/io_hog.md deleted file mode 100644 index d6ebb6983..000000000 --- a/docs/arcaflow_scenarios/io_hog.md +++ /dev/null @@ -1,21 +0,0 @@ -# I/O Hog -This scenario is based on the arcaflow [arcaflow-plugin-stressng](https://github.com/arcalot/arcaflow-plugin-stressng) plugin. -The purpose of this scenario is to create disk pressure on a particular node of the Kubernetes/OpenShift cluster for a time span. -The scenario allows to attach a node path to the pod as a `hostPath` volume. -To enable this plugin add the pointer to the scenario input file `scenarios/arcaflow/io-hog/input.yaml` as described in the -Usage section. 
-This scenario takes a list of objects named `input_list` with the following properties: - -- **kubeconfig :** *string* the kubeconfig needed by the deployer to deploy the sysbench plugin in the target cluster -- **namespace :** *string* the namespace where the scenario container will be deployed -**Note:** this parameter will be automatically filled by kraken if the `kubeconfig_path` property is correctly set -- **node_selector :** *key-value map* the node label that will be used as `nodeSelector` by the pod to target a specific cluster node -- **duration :** *string* stop stress test after N seconds. One can also specify the units of time in seconds, minutes, hours, days or years with the suffix s, m, h, d or y. -- **target_pod_folder :** *string* the path in the pod where the volume is mounted -- **target_pod_volume :** *object* the `hostPath` volume definition in the [Kubernetes/OpenShift](https://docs.openshift.com/container-platform/3.11/install_config/persistent_storage/using_hostpath.html) format, that will be attached to the pod as a volume -- **io_write_bytes :** *string* writes N bytes for each hdd process. The size can be expressed as % of free space on the file system or in units of Bytes, KBytes, MBytes and GBytes using the suffix b, k, m or g -- **io_block_size :** *string* size of each write in bytes. Size can be from 1 byte to 4m. - -To perform several load tests in the same run simultaneously (eg. stress two or more nodes in the same run) add another item -to the `input_list` with the same properties (and eventually different values eg. different node_selectors -to schedule the pod on different nodes). To reduce (or increase) the parallelism change the value `parallelism` in `workload.yaml` file \ No newline at end of file diff --git a/docs/arcaflow_scenarios/memory_hog.md b/docs/arcaflow_scenarios/memory_hog.md deleted file mode 100644 index 2eee25f41..000000000 --- a/docs/arcaflow_scenarios/memory_hog.md +++ /dev/null @@ -1,18 +0,0 @@ -# Memory Hog -This scenario is based on the arcaflow [arcaflow-plugin-stressng](https://github.com/arcalot/arcaflow-plugin-stressng) plugin. -The purpose of this scenario is to create Virtual Memory pressure on a particular node of the Kubernetes/OpenShift cluster for a time span. -To enable this plugin add the pointer to the scenario input file `scenarios/arcaflow/memory-hog/input.yaml` as described in the -Usage section. -This scenario takes a list of objects named `input_list` with the following properties: - -- **kubeconfig :** *string* the kubeconfig needed by the deployer to deploy the sysbench plugin in the target cluster -- **namespace :** *string* the namespace where the scenario container will be deployed -**Note:** this parameter will be automatically filled by kraken if the `kubeconfig_path` property is correctly set -- **node_selector :** *key-value map* the node label that will be used as `nodeSelector` by the pod to target a specific cluster node -- **duration :** *string* stop stress test after N seconds. One can also specify the units of time in seconds, minutes, hours, days or years with the suffix s, m, h, d or y. -- **vm_bytes :** *string* N bytes per vm process or percentage of memory used (using the % symbol). The size can be expressed in units of Bytes, KBytes, MBytes and GBytes using the suffix b, k, m or g. -- **vm_workers :** *int* Number of VM stressors to be run (0 means 1 stressor per CPU) - -To perform several load tests in the same run simultaneously (eg. 
stress two or more nodes in the same run) add another item -to the `input_list` with the same properties (and eventually different values eg. different node_selectors -to schedule the pod on different nodes). To reduce (or increase) the parallelism change the value `parallelism` in `workload.yaml` file \ No newline at end of file diff --git a/docs/hog_scenarios.md b/docs/hog_scenarios.md new file mode 100644 index 000000000..fa10108a6 --- /dev/null +++ b/docs/hog_scenarios.md @@ -0,0 +1,49 @@ +### Hog Scenarios + +Hog Scenarios are designed to push the limits of memory, CPU, or I/O on one or more nodes in your cluster. +They also serve to evaluate whether your cluster can withstand rogue pods that excessively consume resources +without any limits. + +These scenarios involve deploying one or more workloads in the cluster. Based on the specific configuration, +these workloads will use a predetermined amount of resources for a specified duration. + +#### Common options + +| Option | Type | Description | +|---------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +|`duration`| number | the duration of the stress test in seconds | +|`workers`| number (Optional) | the number of threads instantiated by stress-ng, if left empty the number of workers will match the number of available cores in the node. | +|`hog-type`| string (Enum) | can be cpu, memory or io. | +|`image`| string | the container image of the stress workload | +|`namespace`| string | the namespace where the stress workload will be deployed | +|`node-selector`| string (Optional) | defines the node selector for choosing target nodes. If not specified, one schedulable node in the cluster will be chosen at random. If multiple nodes match the selector, all of them will be subjected to stress. If number-of-nodes is specified, that many nodes will be randomly selected from those identified by the selector. | +|`number-of-nodes`| number (Optional) | restricts the number of selected nodes by the selector| + + +#### `cpu-hog` options + +| Option | Type |Description| +|---|--------|---| +|`cpu-load-percentage`| number | the amount of cpu that will be consumed by the hog| +|`cpu-method`| string | reflects the cpu load strategy adopted by stress-ng, please refer to the stress-ng documentation for all the available options| + + + + +#### `io-hog` options + +| Option | Type | Description | +|-----------------------|--------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| `io-block-size` |string| the block size written by the stressor | +| `io-write-bytes` |string| the total amount of data that will be written by the stressor. 
The size can be specified as % of free space on the file system or in units of Bytes, KBytes, MBytes and GBytes using the suffix b, k, m or g | +| `io-target-pod-folder` |string| the folder where the volume will be mounted in the pod | +| `io-target-pod-volume`| dictionary | the pod volume definition that will be stressed by the scenario. | + +> [!CAUTION] +> Modifying the structure of `io-target-pod-volume` might alter how the hog operates, potentially rendering it ineffective. + +#### `memory-hog` options + +| Option | Type |Description| +|-----------------------|--------|---| +|`memory-vm-bytes`| string | the amount of memory that the scenario will try to hog.The size can be specified as % of free space on the file system or in units of Bytes, KBytes, MBytes and GBytes using the suffix b, k, m or g | \ No newline at end of file diff --git a/krkn/scenario_plugins/abstract_scenario_plugin.py b/krkn/scenario_plugins/abstract_scenario_plugin.py index 060d9ec36..378f001ca 100644 --- a/krkn/scenario_plugins/abstract_scenario_plugin.py +++ b/krkn/scenario_plugins/abstract_scenario_plugin.py @@ -56,6 +56,7 @@ def run_scenarios( scenario_telemetries: list[ScenarioTelemetry] = [] failed_scenarios = [] wait_duration = krkn_config["tunings"]["wait_duration"] + events_backup = krkn_config["telemetry"]["events_backup"] for scenario_config in scenarios_list: if isinstance(scenario_config, list): logging.error( @@ -99,13 +100,15 @@ def run_scenarios( int(scenario_telemetry.start_timestamp), int(scenario_telemetry.end_timestamp), ) - utils.populate_cluster_events( - scenario_telemetry, - parsed_scenario_config, - telemetry.get_lib_kubernetes(), - int(scenario_telemetry.start_timestamp), - int(scenario_telemetry.end_timestamp), - ) + + if events_backup: + utils.populate_cluster_events( + scenario_telemetry, + parsed_scenario_config, + telemetry.get_lib_kubernetes(), + int(scenario_telemetry.start_timestamp), + int(scenario_telemetry.end_timestamp), + ) if scenario_telemetry.exit_status != 0: failed_scenarios.append(scenario_config) diff --git a/krkn/scenario_plugins/arcaflow/arcaflow_scenario_plugin.py b/krkn/scenario_plugins/arcaflow/arcaflow_scenario_plugin.py deleted file mode 100644 index a61cd167c..000000000 --- a/krkn/scenario_plugins/arcaflow/arcaflow_scenario_plugin.py +++ /dev/null @@ -1,197 +0,0 @@ -import logging -import os -from pathlib import Path -import arcaflow -import yaml -from krkn_lib.models.telemetry import ScenarioTelemetry -from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift -from krkn.scenario_plugins.abstract_scenario_plugin import AbstractScenarioPlugin -from krkn.scenario_plugins.arcaflow.context_auth import ContextAuth - - -class ArcaflowScenarioPlugin(AbstractScenarioPlugin): - - def run( - self, - run_uuid: str, - scenario: str, - krkn_config: dict[str, any], - lib_telemetry: KrknTelemetryOpenshift, - scenario_telemetry: ScenarioTelemetry, - ) -> int: - try: - engine_args = self.build_args(scenario) - status_code = self.run_workflow( - engine_args, lib_telemetry.get_lib_kubernetes().get_kubeconfig_path() - ) - return status_code - except Exception as e: - logging.error("ArcaflowScenarioPlugin exiting due to Exception %s" % e) - return 1 - - def get_scenario_types(self) -> [str]: - return ["hog_scenarios", "arcaflow_scenario"] - - def run_workflow( - self, engine_args: arcaflow.EngineArgs, kubeconfig_path: str - ) -> int: - self.set_arca_kubeconfig(engine_args, kubeconfig_path) - exit_status = arcaflow.run(engine_args) - return exit_status - - def build_args(self, 
input_file: str) -> arcaflow.EngineArgs: - """sets the kubeconfig parsed by setArcaKubeConfig as an input to the arcaflow workflow""" - current_path = Path().resolve() - context = f"{current_path}/{Path(input_file).parent}" - workflow = f"{context}/workflow.yaml" - config = f"{context}/config.yaml" - if not os.path.exists(context): - raise Exception( - "context folder for arcaflow workflow not found: {}".format(context) - ) - if not os.path.exists(input_file): - raise Exception( - "input file for arcaflow workflow not found: {}".format(input_file) - ) - if not os.path.exists(workflow): - raise Exception( - "workflow file for arcaflow workflow not found: {}".format(workflow) - ) - if not os.path.exists(config): - raise Exception( - "configuration file for arcaflow workflow not found: {}".format(config) - ) - - engine_args = arcaflow.EngineArgs() - engine_args.context = context - engine_args.config = config - engine_args.workflow = workflow - engine_args.input = f"{current_path}/{input_file}" - return engine_args - - def set_arca_kubeconfig( - self, engine_args: arcaflow.EngineArgs, kubeconfig_path: str - ): - - context_auth = ContextAuth() - if not os.path.exists(kubeconfig_path): - raise Exception("kubeconfig not found in {}".format(kubeconfig_path)) - - with open(kubeconfig_path, "r") as stream: - try: - kubeconfig = yaml.safe_load(stream) - context_auth.fetch_auth_data(kubeconfig) - except Exception as e: - logging.error( - "impossible to read kubeconfig file in: {}".format(kubeconfig_path) - ) - raise e - - kubeconfig_str = self.set_kubeconfig_auth(kubeconfig, context_auth) - - with open(engine_args.input, "r") as stream: - input_file = yaml.safe_load(stream) - if "input_list" in input_file and isinstance( - input_file["input_list"], list - ): - for index, _ in enumerate(input_file["input_list"]): - if isinstance(input_file["input_list"][index], dict): - input_file["input_list"][index]["kubeconfig"] = kubeconfig_str - else: - input_file["kubeconfig"] = kubeconfig_str - stream.close() - with open(engine_args.input, "w") as stream: - yaml.safe_dump(input_file, stream) - - with open(engine_args.config, "r") as stream: - config_file = yaml.safe_load(stream) - if config_file["deployers"]["image"]["deployer_name"] == "kubernetes": - kube_connection = self.set_kubernetes_deployer_auth( - config_file["deployers"]["image"]["connection"], context_auth - ) - config_file["deployers"]["image"]["connection"] = kube_connection - with open(engine_args.config, "w") as stream: - yaml.safe_dump(config_file, stream, explicit_start=True, width=4096) - - def set_kubernetes_deployer_auth( - self, deployer: any, context_auth: ContextAuth - ) -> any: - if context_auth.clusterHost is not None: - deployer["host"] = context_auth.clusterHost - if context_auth.clientCertificateData is not None: - deployer["cert"] = context_auth.clientCertificateData - if context_auth.clientKeyData is not None: - deployer["key"] = context_auth.clientKeyData - if context_auth.clusterCertificateData is not None: - deployer["cacert"] = context_auth.clusterCertificateData - if context_auth.username is not None: - deployer["username"] = context_auth.username - if context_auth.password is not None: - deployer["password"] = context_auth.password - if context_auth.bearerToken is not None: - deployer["bearerToken"] = context_auth.bearerToken - return deployer - - def set_kubeconfig_auth(self, kubeconfig: any, context_auth: ContextAuth) -> str: - """ - Builds an arcaflow-compatible kubeconfig representation and returns it as a string. 
- In order to run arcaflow plugins in kubernetes/openshift the kubeconfig must contain client certificate/key - and server certificate base64 encoded within the kubeconfig file itself in *-data fields. That is not always the - case, infact kubeconfig may contain filesystem paths to those files, this function builds an arcaflow-compatible - kubeconfig file and returns it as a string that can be safely included in input.yaml - """ - - if "current-context" not in kubeconfig.keys(): - raise Exception( - "invalid kubeconfig file, impossible to determine current-context" - ) - user_id = None - cluster_id = None - user_name = None - cluster_name = None - current_context = kubeconfig["current-context"] - for context in kubeconfig["contexts"]: - if context["name"] == current_context: - user_name = context["context"]["user"] - cluster_name = context["context"]["cluster"] - if user_name is None: - raise Exception( - "user not set for context {} in kubeconfig file".format(current_context) - ) - if cluster_name is None: - raise Exception( - "cluster not set for context {} in kubeconfig file".format( - current_context - ) - ) - - for index, user in enumerate(kubeconfig["users"]): - if user["name"] == user_name: - user_id = index - for index, cluster in enumerate(kubeconfig["clusters"]): - if cluster["name"] == cluster_name: - cluster_id = index - - if cluster_id is None: - raise Exception( - "no cluster {} found in kubeconfig users".format(cluster_name) - ) - if "client-certificate" in kubeconfig["users"][user_id]["user"]: - kubeconfig["users"][user_id]["user"][ - "client-certificate-data" - ] = context_auth.clientCertificateDataBase64 - del kubeconfig["users"][user_id]["user"]["client-certificate"] - - if "client-key" in kubeconfig["users"][user_id]["user"]: - kubeconfig["users"][user_id]["user"][ - "client-key-data" - ] = context_auth.clientKeyDataBase64 - del kubeconfig["users"][user_id]["user"]["client-key"] - - if "certificate-authority" in kubeconfig["clusters"][cluster_id]["cluster"]: - kubeconfig["clusters"][cluster_id]["cluster"][ - "certificate-authority-data" - ] = context_auth.clusterCertificateDataBase64 - del kubeconfig["clusters"][cluster_id]["cluster"]["certificate-authority"] - kubeconfig_str = yaml.dump(kubeconfig) - return kubeconfig_str diff --git a/krkn/scenario_plugins/arcaflow/context_auth.py b/krkn/scenario_plugins/arcaflow/context_auth.py deleted file mode 100644 index bb07e926b..000000000 --- a/krkn/scenario_plugins/arcaflow/context_auth.py +++ /dev/null @@ -1,142 +0,0 @@ -import os -import base64 - - -class ContextAuth: - clusterCertificate: str = None - clusterCertificateData: str = None - clusterHost: str = None - clientCertificate: str = None - clientCertificateData: str = None - clientKey: str = None - clientKeyData: str = None - clusterName: str = None - username: str = None - password: str = None - bearerToken: str = None - # TODO: integrate in krkn-lib-kubernetes in the next iteration - - @property - def clusterCertificateDataBase64(self): - if self.clusterCertificateData is not None: - return base64.b64encode(bytes(self.clusterCertificateData, "utf8")).decode( - "ascii" - ) - return - - @property - def clientCertificateDataBase64(self): - if self.clientCertificateData is not None: - return base64.b64encode(bytes(self.clientCertificateData, "utf8")).decode( - "ascii" - ) - return - - @property - def clientKeyDataBase64(self): - if self.clientKeyData is not None: - return base64.b64encode(bytes(self.clientKeyData, "utf-8")).decode("ascii") - return - - def 
fetch_auth_data(self, kubeconfig: any): - context_username = None - current_context = kubeconfig["current-context"] - if current_context is None: - raise Exception("no current-context found in kubeconfig") - - for context in kubeconfig["contexts"]: - if context["name"] == current_context: - context_username = context["context"]["user"] - self.clusterName = context["context"]["cluster"] - if context_username is None: - raise Exception("user not found for context {0}".format(current_context)) - if self.clusterName is None: - raise Exception("cluster not found for context {0}".format(current_context)) - cluster_id = None - user_id = None - for index, user in enumerate(kubeconfig["users"]): - if user["name"] == context_username: - user_id = index - if user_id is None: - raise Exception( - "user {0} not found in kubeconfig users".format(context_username) - ) - - for index, cluster in enumerate(kubeconfig["clusters"]): - if cluster["name"] == self.clusterName: - cluster_id = index - - if cluster_id is None: - raise Exception( - "no cluster {} found in kubeconfig users".format(self.clusterName) - ) - - user = kubeconfig["users"][user_id]["user"] - cluster = kubeconfig["clusters"][cluster_id]["cluster"] - # sets cluster api URL - self.clusterHost = cluster["server"] - # client certificates - - if "client-key" in user: - try: - self.clientKey = user["client-key"] - self.clientKeyData = self.read_file(user["client-key"]) - except Exception as e: - raise e - - if "client-key-data" in user: - try: - self.clientKeyData = base64.b64decode(user["client-key-data"]).decode( - "utf-8" - ) - except Exception as e: - raise Exception("impossible to decode client-key-data") - - if "client-certificate" in user: - try: - self.clientCertificate = user["client-certificate"] - self.clientCertificateData = self.read_file(user["client-certificate"]) - except Exception as e: - raise e - - if "client-certificate-data" in user: - try: - self.clientCertificateData = base64.b64decode( - user["client-certificate-data"] - ).decode("utf-8") - except Exception as e: - raise Exception("impossible to decode client-certificate-data") - - # cluster certificate authority - - if "certificate-authority" in cluster: - try: - self.clusterCertificate = cluster["certificate-authority"] - self.clusterCertificateData = self.read_file( - cluster["certificate-authority"] - ) - except Exception as e: - raise e - - if "certificate-authority-data" in cluster: - try: - self.clusterCertificateData = base64.b64decode( - cluster["certificate-authority-data"] - ).decode("utf-8") - except Exception as e: - raise Exception("impossible to decode certificate-authority-data") - - if "username" in user: - self.username = user["username"] - - if "password" in user: - self.password = user["password"] - - if "token" in user: - self.bearerToken = user["token"] - - def read_file(self, filename: str) -> str: - if not os.path.exists(filename): - raise Exception("file not found {0} ".format(filename)) - with open(filename, "rb") as file_stream: - return file_stream.read().decode("utf-8") diff --git a/krkn/scenario_plugins/arcaflow/fixtures/ca.crt b/krkn/scenario_plugins/arcaflow/fixtures/ca.crt deleted file mode 100644 index e3264358f..000000000 --- a/krkn/scenario_plugins/arcaflow/fixtures/ca.crt +++ /dev/null @@ -1,19 +0,0 @@ ------BEGIN CERTIFICATE----- -MIIDBjCCAe6gAwIBAgIBATANBgkqhkiG9w0BAQsFADAVMRMwEQYDVQQDEwptaW5p -a3ViZUNBMB4XDTIzMDMxMzE1NDAxM1oXDTMzMDMxMTE1NDAxM1owFTETMBEGA1UE -AxMKbWluaWt1YmVDQTCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBAMnz 
-U/gIbJBRGOgNYVKX2fV03ANOwnM4VjquR28QMAdxURqgOFZ6IxYNysHEyxxE9I+I -DAm9hi4vQPbOX7FlxUezuzw+ExEfa6RRJ+n+AGJOV1lezCVph6OaJxB1+L1UqaDZ -eM3B4cUf/iCc5Y4bs927+CBG3MJL/jmCVPCO+MiSn/l73PXSFNJAYMvRj42zkXqD -CVG9CwY2vWgZnnzl01l7jNGtie871AmV2uqKakJrQ2ILhD+8fZk4jE5JBDTCZnqQ -pXIc+vERNKLUS8cvjO6Ux8dMv/Z7+xonpXOU59LlpUdHWP9jgCvMTwiOriwqGjJ+ -pQJWpX9Dm+oxJiVOJzsCAwEAAaNhMF8wDgYDVR0PAQH/BAQDAgKkMB0GA1UdJQQW -MBQGCCsGAQUFBwMCBggrBgEFBQcDATAPBgNVHRMBAf8EBTADAQH/MB0GA1UdDgQW -BBQU9pDMtbayJdNM6bp0IG8dcs15qTANBgkqhkiG9w0BAQsFAAOCAQEAtl9TVKPA -hTnPODqv0AGTqreS9kLg4WUUjZRaPUkPWmtCoTh2Yf55nRWdHOHeZnCWDSg24x42 -lpt+13IdqKew1RKTpKCTkicMFi090A01bYu/w39Cm6nOAA5h8zkgSkV5czvQotuV -SoN2vB+nbuY28ah5PkdqjMHEZbNwa59cgEke8wB1R1DWFQ/pqflrH2v9ACAuY+5Q -i673tA6CXrb1YfaCQnVBzcfvjGS1MqShPKpOLMF+/GccPczNimaBxMnKvYLvf3pN -qEUrJC00mAcein8HmxR2Xz8wredbMUUyrQxW29pZJwfGE5GU0olnlsA0lZLbTwio -xoolo5y+fsK/dA== ------END CERTIFICATE----- \ No newline at end of file diff --git a/krkn/scenario_plugins/arcaflow/fixtures/client.crt b/krkn/scenario_plugins/arcaflow/fixtures/client.crt deleted file mode 100644 index 64e4aef91..000000000 --- a/krkn/scenario_plugins/arcaflow/fixtures/client.crt +++ /dev/null @@ -1,19 +0,0 @@ ------BEGIN CERTIFICATE----- -MIIDITCCAgmgAwIBAgIBAjANBgkqhkiG9w0BAQsFADAVMRMwEQYDVQQDEwptaW5p -a3ViZUNBMB4XDTIzMDUwMTA4NTc0N1oXDTI2MDUwMTA4NTc0N1owMTEXMBUGA1UE -ChMOc3lzdGVtOm1hc3RlcnMxFjAUBgNVBAMTDW1pbmlrdWJlLXVzZXIwggEiMA0G -CSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQC0b7uy9nQYrh7uC5NODve7dFNLAgo5 -pWRS6Kx13ULA55gOpieZiI5/1jwUBjOz0Hhl5QAdHC1HDNu5wf4MmwIEheuq3kMA -mfuvNxW2BnWSDuXyUMlBfqlwg5o6W8ndEWaK33D7wd2WQsSsAnhQPJSjnzWKvWKq -+Kbcygc4hdss/ZWN+SXLTahNpHBw0sw8AcJqddNeXs2WI5GdZmbXL4QZI36EaNUm -m4xKmKRKYIP9wYkmXOV/D2h1meM44y4lul5v2qvo6I+umJ84q4W1/W1vVmAzyVfL -v1TQCUx8cpKMHzw3ma6CTBCtU3Oq9HKHBnf8GyHZicmV7ESzf/phJu4ZAgMBAAGj -YDBeMA4GA1UdDwEB/wQEAwIFoDAdBgNVHSUEFjAUBggrBgEFBQcDAQYIKwYBBQUH -AwIwDAYDVR0TAQH/BAIwADAfBgNVHSMEGDAWgBQU9pDMtbayJdNM6bp0IG8dcs15 -qTANBgkqhkiG9w0BAQsFAAOCAQEABNzEQQMYUcLsBASHladEjr46avKn7gREfaDl -Y5PBvgCPP42q/sW/9iCNY3UpT9TJZWM6s01+0p6I96jYbRQER1NX7O4OgQYHmFw2 -PF6UOG2vMo54w11OvL7sbr4d+nkE6ItdM9fLDIJ3fEOYJZkSoxhOL/U3jSjIl7Wu -KCIlpM/M/gcZ4w2IvcLrWtvswbFNUd+dwQfBGcQTmSQDOLE7MqSvzYAkeNv73GLB -ieba7gs/PmoTFsf9nW60iXymDDF4MtODn15kqT/y1uD6coujmiEiIomBfxqAkUCU -0ciP/KF5oOEMmMedm7/peQxaRTMdRSk4yu7vbj/BxnTcj039Qg== ------END CERTIFICATE----- \ No newline at end of file diff --git a/krkn/scenario_plugins/arcaflow/fixtures/client.key b/krkn/scenario_plugins/arcaflow/fixtures/client.key deleted file mode 100644 index 03cfb8798..000000000 --- a/krkn/scenario_plugins/arcaflow/fixtures/client.key +++ /dev/null @@ -1,27 +0,0 @@ ------BEGIN RSA PRIVATE KEY----- -MIIEowIBAAKCAQEAtG+7svZ0GK4e7guTTg73u3RTSwIKOaVkUuisdd1CwOeYDqYn -mYiOf9Y8FAYzs9B4ZeUAHRwtRwzbucH+DJsCBIXrqt5DAJn7rzcVtgZ1kg7l8lDJ -QX6pcIOaOlvJ3RFmit9w+8HdlkLErAJ4UDyUo581ir1iqvim3MoHOIXbLP2Vjfkl -y02oTaRwcNLMPAHCanXTXl7NliORnWZm1y+EGSN+hGjVJpuMSpikSmCD/cGJJlzl -fw9odZnjOOMuJbpeb9qr6OiPrpifOKuFtf1tb1ZgM8lXy79U0AlMfHKSjB88N5mu -gkwQrVNzqvRyhwZ3/Bsh2YnJlexEs3/6YSbuGQIDAQABAoIBAQCdJxPb8zt6o2zc -98f8nJy378D7+3LccmjGrVBH98ZELXIKkDy9RGqYfQcmiaBOZKv4U1OeBwSIdXKK -f6O9ZuSC/AEeeSbyRysmmFuYhlewNrmgKyyelqsNDBIv8fIHUTh2i9Xj8B4G2XBi -QGR5vcnYGLqRdBGTx63Nb0iKuksDCwPAuPA/e0ySz9HdWL1j4bqpVSYsOIXsqTDr -CVnxUeSIL0fFQnRm3IASXQD7zdq9eEFX7vESeleZoz8qNcKb4Na/C3N6crScjgH7 -qyNZ2zNLfy1LT84k8uc1TMX2KcEVEmfdDv5cCnUH2ic12CwXMZ0vgId5LJTaHx4x -ytIQIe5hAoGBANB+TsRXP4KzcjZlUUfiAp/pWUM4kVktbsfZa1R2NEuIGJUxPk3P -7WS0WX5W75QKRg+UWTubg5kfd0f9fklLgofmliBnY/HrpgdyugJmUZBgzIxmy0k+ 
-aCe0biD1gULfyyrKtfe8k5wRFstzhfGszlOf2ebR87sSVNBuF2lEwPTvAoGBAN2M -0/XrsodGU4B9Mj86Go2gb2k2WU2izI0cO+tm2S5U5DvKmVEnmjXfPRaOFj2UUQjo -cljnDAinbN+O0+Inc35qsEeYdAIepNAPglzcpfTHagja9mhx2idLYTXGhbZLL+Ei -TRzMyP27NF+GVVfYU/cA86ns6NboG6spohmnqh13AoGAKPc4aNGv0/GIVnHP56zb -0SnbdR7PSFNp+fCZay4Slmi2U9IqKMXbIjdhgjZ4uoDORU9jvReQYuzQ1h9TyfkB -O8yt4M4P0D/6DmqXa9NI4XJznn6wIMMXWf3UybsTW913IQBVgsjVxAuDjBQ11Eec -/sdg3D6SgkZWzeFjzjZJJ5cCgYBSYVg7fE3hERxhjawOaJuRCBQFSklAngVzfwkk -yhR9ruFC/l2uGIy19XFwnprUgP700gIa3qbR3PeV1TUiRcsjOaacqKqSUzSzjODL -iNxIvZHHAyxWv+b/b38REOWNWD3QeAG2cMtX1bFux7OaO31VPkxcZhRaPOp05cE5 -yudtlwKBgDBbR7RLYn03OPm3NDBLLjTybhD8Iu8Oj7UeNCiEWAdZpqIKYnwSxMzQ -kdo4aTENA/seEwq+XDV7TwbUIFFJg5gDXIhkcK2c9kiO2bObCAmKpBlQCcrp0a5X -NSBk1N/ZG/Qhqns7z8k01KN4LNcdpRoNiYYPgY+p3xbY8+nWhv+q ------END RSA PRIVATE KEY----- \ No newline at end of file diff --git a/krkn/scenario_plugins/arcaflow/test_context_auth.py b/krkn/scenario_plugins/arcaflow/test_context_auth.py deleted file mode 100644 index 75e48113a..000000000 --- a/krkn/scenario_plugins/arcaflow/test_context_auth.py +++ /dev/null @@ -1,98 +0,0 @@ -import os -import unittest - -import yaml - -from .context_auth import ContextAuth - - -class TestCurrentContext(unittest.TestCase): - - def get_kubeconfig_with_data(self) -> str: - """ - This function returns a test kubeconfig file as a string. - - :return: a test kubeconfig file in string format (for unit testing purposes) - """ # NOQA - return """apiVersion: v1 -clusters: -- cluster: - certificate-authority-data: LS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSUM5ekNDQWQrZ0F3SUJBZ0lVV01PTVBNMVUrRi9uNXN6TSthYzlMcGZISHB3d0RRWUpLb1pJaHZjTkFRRUwKQlFBd0hqRWNNQm9HQTFVRUF3d1RhM1ZpZFc1MGRTNXNiMk5oYkdSdmJXRnBiakFlRncweU1URXlNRFl4T0RBdwpNRFJhRncwek1URXlNRFF4T0RBd01EUmFNQjR4SERBYUJnTlZCQU1NRTJ0MVluVnVkSFV1Ykc5allXeGtiMjFoCmFXNHdnZ0VpTUEwR0NTcUdTSWIzRFFFQkFRVUFBNElCRHdBd2dnRUtBb0lCQVFDNExhcG00SDB0T1NuYTNXVisKdzI4a0tOWWRwaHhYOUtvNjUwVGlOK2c5ZFNQU3VZK0V6T1JVOWVONlgyWUZkMEJmVFNodno4Y25rclAvNysxegpETEoxQ3MwRi9haEV3ZDQxQXN5UGFjbnRiVE80dGRLWm9POUdyODR3YVdBN1hSZmtEc2ZxRGN1YW5UTmVmT1hpCkdGbmdDVzU5Q285M056alB1eEFrakJxdVF6eE5GQkgwRlJPbXJtVFJ4cnVLZXo0aFFuUW1OWEFUNnp0M21udzMKWUtWTzU4b2xlcUxUcjVHNlRtVFQyYTZpVGdtdWY2N0cvaVZlalJGbkw3YkNHWmgzSjlCSTNMcVpqRzE4dWxvbgpaVDdQcGQrQTlnaTJOTm9UZlI2TVB5SndxU1BCL0xZQU5ZNGRoZDVJYlVydDZzbmViTlRZSHV2T0tZTDdNTWRMCmVMSzFBZ01CQUFHakxUQXJNQWtHQTFVZEV3UUNNQUF3SGdZRFZSMFJCQmN3RllJVGEzVmlkVzUwZFM1c2IyTmgKYkdSdmJXRnBiakFOQmdrcWhraUc5dzBCQVFzRkFBT0NBUUVBQTVqUHVpZVlnMExySE1PSkxYY0N4d3EvVzBDNApZeFpncVd3VHF5VHNCZjVKdDlhYTk0SkZTc2dHQWdzUTN3NnA2SlBtL0MyR05MY3U4ZWxjV0E4UXViQWxueXRRCnF1cEh5WnYrZ08wMG83TXdrejZrTUxqQVZ0QllkRzJnZ21FRjViTEk5czBKSEhjUGpHUkl1VHV0Z0tHV1dPWHgKSEg4T0RzaG9wZHRXMktrR2c2aThKaEpYaWVIbzkzTHptM00xRUNGcXAvMEdtNkN1RFphVVA2SGpJMWRrYllLdgpsSHNVZ1U1SmZjSWhNYmJLdUllTzRkc1YvT3FHcm9iNW5vcmRjaExBQmRDTnc1cmU5T1NXZGZ1VVhSK0ViZVhrCjVFM0tFYzA1RGNjcGV2a1NTdlJ4SVQrQzNMOTltWGcxL3B5NEw3VUhvNFFLTXlqWXJXTWlLRlVKV1E9PQotLS0tLUVORCBDRVJUSUZJQ0FURS0tLS0tCg== - server: https://127.0.0.1:6443 - name: default -contexts: -- context: - cluster: default - namespace: default - user: testuser - name: default -current-context: default -kind: Config -preferences: {} -users: -- name: testuser - user: - client-certificate-data: 
LS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSUM5ekNDQWQrZ0F3SUJBZ0lVV01PTVBNMVUrRi9uNXN6TSthYzlMcGZISHB3d0RRWUpLb1pJaHZjTkFRRUwKQlFBd0hqRWNNQm9HQTFVRUF3d1RhM1ZpZFc1MGRTNXNiMk5oYkdSdmJXRnBiakFlRncweU1URXlNRFl4T0RBdwpNRFJhRncwek1URXlNRFF4T0RBd01EUmFNQjR4SERBYUJnTlZCQU1NRTJ0MVluVnVkSFV1Ykc5allXeGtiMjFoCmFXNHdnZ0VpTUEwR0NTcUdTSWIzRFFFQkFRVUFBNElCRHdBd2dnRUtBb0lCQVFDNExhcG00SDB0T1NuYTNXVisKdzI4a0tOWWRwaHhYOUtvNjUwVGlOK2c5ZFNQU3VZK0V6T1JVOWVONlgyWUZkMEJmVFNodno4Y25rclAvNysxegpETEoxQ3MwRi9haEV3ZDQxQXN5UGFjbnRiVE80dGRLWm9POUdyODR3YVdBN1hSZmtEc2ZxRGN1YW5UTmVmT1hpCkdGbmdDVzU5Q285M056alB1eEFrakJxdVF6eE5GQkgwRlJPbXJtVFJ4cnVLZXo0aFFuUW1OWEFUNnp0M21udzMKWUtWTzU4b2xlcUxUcjVHNlRtVFQyYTZpVGdtdWY2N0cvaVZlalJGbkw3YkNHWmgzSjlCSTNMcVpqRzE4dWxvbgpaVDdQcGQrQTlnaTJOTm9UZlI2TVB5SndxU1BCL0xZQU5ZNGRoZDVJYlVydDZzbmViTlRZSHV2T0tZTDdNTWRMCmVMSzFBZ01CQUFHakxUQXJNQWtHQTFVZEV3UUNNQUF3SGdZRFZSMFJCQmN3RllJVGEzVmlkVzUwZFM1c2IyTmgKYkdSdmJXRnBiakFOQmdrcWhraUc5dzBCQVFzRkFBT0NBUUVBQTVqUHVpZVlnMExySE1PSkxYY0N4d3EvVzBDNApZeFpncVd3VHF5VHNCZjVKdDlhYTk0SkZTc2dHQWdzUTN3NnA2SlBtL0MyR05MY3U4ZWxjV0E4UXViQWxueXRRCnF1cEh5WnYrZ08wMG83TXdrejZrTUxqQVZ0QllkRzJnZ21FRjViTEk5czBKSEhjUGpHUkl1VHV0Z0tHV1dPWHgKSEg4T0RzaG9wZHRXMktrR2c2aThKaEpYaWVIbzkzTHptM00xRUNGcXAvMEdtNkN1RFphVVA2SGpJMWRrYllLdgpsSHNVZ1U1SmZjSWhNYmJLdUllTzRkc1YvT3FHcm9iNW5vcmRjaExBQmRDTnc1cmU5T1NXZGZ1VVhSK0ViZVhrCjVFM0tFYzA1RGNjcGV2a1NTdlJ4SVQrQzNMOTltWGcxL3B5NEw3VUhvNFFLTXlqWXJXTWlLRlVKV1E9PQotLS0tLUVORCBDRVJUSUZJQ0FURS0tLS0tCg== - client-key-data: LS0tLS1CRUdJTiBQUklWQVRFIEtFWS0tLS0tCk1JSUV2QUlCQURBTkJna3Foa2lHOXcwQkFRRUZBQVNDQktZd2dnU2lBZ0VBQW9JQkFRQzRMYXBtNEgwdE9TbmEKM1dWK3cyOGtLTllkcGh4WDlLbzY1MFRpTitnOWRTUFN1WStFek9SVTllTjZYMllGZDBCZlRTaHZ6OGNua3JQLwo3KzF6RExKMUNzMEYvYWhFd2Q0MUFzeVBhY250YlRPNHRkS1pvTzlHcjg0d2FXQTdYUmZrRHNmcURjdWFuVE5lCmZPWGlHRm5nQ1c1OUNvOTNOempQdXhBa2pCcXVRenhORkJIMEZST21ybVRSeHJ1S2V6NGhRblFtTlhBVDZ6dDMKbW53M1lLVk81OG9sZXFMVHI1RzZUbVRUMmE2aVRnbXVmNjdHL2lWZWpSRm5MN2JDR1poM0o5QkkzTHFaakcxOAp1bG9uWlQ3UHBkK0E5Z2kyTk5vVGZSNk1QeUp3cVNQQi9MWUFOWTRkaGQ1SWJVcnQ2c25lYk5UWUh1dk9LWUw3Ck1NZExlTEsxQWdNQkFBRUNnZ0VBQ28rank4NW5ueVk5L2l6ZjJ3cjkzb2J3OERaTVBjYnIxQURhOUZYY1hWblEKT2c4bDZhbU9Ga2tiU0RNY09JZ0VDdkx6dEtXbmQ5OXpydU5sTEVtNEdmb0trNk5kK01OZEtKRUdoZHE5RjM1Qgpqdi91R1owZTIyRE5ZLzFHNVdDTE5DcWMwQkVHY2RFOTF0YzJuMlppRVBTNWZ6WVJ6L1k4cmJ5K1NqbzJkWE9RCmRHYWRlUFplbi9UbmlHTFlqZWhrbXZNQjJvU0FDbVMycTd2OUNrcmdmR1RZbWJzeGVjSU1QK0JONG9KS3BOZ28KOUpnRWJ5SUxkR1pZS2pQb2lLaHNjMVhmSy8zZStXSmxuYjJBaEE5Y1JMUzhMcDdtcEYySWp4SjNSNE93QTg3WQpNeGZvZWFGdnNuVUFHWUdFWFo4Z3BkWmhQMEoxNWRGdERjajIrcngrQVFLQmdRRDFoSE9nVGdFbERrVEc5bm5TCjE1eXYxRzUxYnJMQU1UaWpzNklEMU1qelhzck0xY2ZvazVaaUlxNVJsQ3dReTlYNDdtV1RhY0lZRGR4TGJEcXEKY0IydjR5Wm1YK1VleGJ3cDU1OWY0V05HdzF5YzQrQjdaNFF5aTRFelN4WmFjbldjMnBzcHJMUFVoOUFXRXVNcApOaW1vcXNiVGNnNGs5QWRxeUIrbWhIWmJRUUtCZ1FEQUNzU09qNXZMU1VtaVpxYWcrOVMySUxZOVNOdDZzS1VyCkprcjdCZEVpN3N2YmU5cldRR2RBb0xkQXNzcU94aENydmtPNkpSSHB1YjlRRjlYdlF4Riszc2ZpZm4yYkQ0ZloKMlVsclA1emF3RlNrNDNLbjdMZzRscURpaVUxVGlqTkJBL3dUcFlmbTB4dW5WeFRWNDZpNVViQW1XRk12TWV0bQozWUZYQmJkK2RRS0JnRGl6Q1B6cFpzeEcrazAwbUxlL2dYajl4ekNwaXZCbHJaM29teTdsVWk4YUloMmg5VlBaCjJhMzZNbVcyb1dLVG9HdW5xcCtibWU1eUxRRGlFcjVQdkJ0bGl2V3ppYmRNbFFMY2Nlcnpveml4WDA4QU5WUnEKZUpZdnIzdklDSGFFM25LRjdiVjNJK1NlSk1ra1BYL0QrV1R4WTQ5clZLYm1FRnh4c1JXRW04ekJBb0dBWEZ3UgpZanJoQTZqUW1DRmtYQ0loa0NJMVkwNEorSHpDUXZsY3NGT0EzSnNhUWduVUdwekl5OFUvdlFiLzhpQ0IzZ2RZCmpVck16YXErdnVkbnhYVnRFYVpWWGJIVitPQkVSdHFBdStyUkprZS9yYm1SNS84cUxsVUxOVWd4ZjA4RkRXeTgKTERxOUhKOUZPbnJnRTJvMU9FTjRRMGpSWU81U041dXFXODd0REEwQ2dZQXpXbk1KSFgrbmlyMjhRRXFyVnJKRAo4ZUEwOHIwWTJRMDhMRlcvMjNIVWQ4WU12VnhTUTdwcUwzaE4
1RXVJQ2dCbEpGVFI3TndBREo3eDY2M002akFMCm1DNlI4dWxSZStwa08xN2Y0UUs3MnVRanJGZEhESnlXQmdDL0RKSkV6d1dwY0Q4VVNPK3A5bVVIbllLTUJTOEsKTVB1ejYrZ3h0VEtsRU5pZUVacXhxZz09Ci0tLS0tRU5EIFBSSVZBVEUgS0VZLS0tLS0K - username: testuser - password: testpassword - token: sha256~fFyEqjf1xxFMO0tbEyGRvWeNOd7QByuEgS4hyEq_A9o - """ # NOQA - - def get_kubeconfig_with_paths(self) -> str: - """ - This function returns a test kubeconfig file as a string. - - :return: a test kubeconfig file in string format (for unit testing purposes) - """ # NOQA - return """apiVersion: v1 -clusters: -- cluster: - certificate-authority: fixtures/ca.crt - server: https://127.0.0.1:6443 - name: default -contexts: -- context: - cluster: default - namespace: default - user: testuser - name: default -current-context: default -kind: Config -preferences: {} -users: -- name: testuser - user: - client-certificate: fixtures/client.crt - client-key: fixtures/client.key - username: testuser - password: testpassword - token: sha256~fFyEqjf1xxFMO0tbEyGRvWeNOd7QByuEgS4hyEq_A9o - """ # NOQA - - def test_current_context(self): - cwd = os.getcwd() - current_context_data = ContextAuth() - data = yaml.safe_load(self.get_kubeconfig_with_data()) - current_context_data.fetch_auth_data(data) - self.assertIsNotNone(current_context_data.clusterCertificateData) - self.assertIsNotNone(current_context_data.clientCertificateData) - self.assertIsNotNone(current_context_data.clientKeyData) - self.assertIsNotNone(current_context_data.username) - self.assertIsNotNone(current_context_data.password) - self.assertIsNotNone(current_context_data.bearerToken) - self.assertIsNotNone(current_context_data.clusterHost) - - current_context_no_data = ContextAuth() - data = yaml.safe_load(self.get_kubeconfig_with_paths()) - current_context_no_data.fetch_auth_data(data) - self.assertIsNotNone(current_context_no_data.clusterCertificate) - self.assertIsNotNone(current_context_no_data.clusterCertificateData) - self.assertIsNotNone(current_context_no_data.clientCertificate) - self.assertIsNotNone(current_context_no_data.clientCertificateData) - self.assertIsNotNone(current_context_no_data.clientKey) - self.assertIsNotNone(current_context_no_data.clientKeyData) - self.assertIsNotNone(current_context_no_data.username) - self.assertIsNotNone(current_context_no_data.password) - self.assertIsNotNone(current_context_no_data.bearerToken) - self.assertIsNotNone(current_context_data.clusterHost) diff --git a/krkn/scenario_plugins/arcaflow/__init__.py b/krkn/scenario_plugins/hogs/__init__.py similarity index 100% rename from krkn/scenario_plugins/arcaflow/__init__.py rename to krkn/scenario_plugins/hogs/__init__.py diff --git a/krkn/scenario_plugins/hogs/hogs_scenario_plugin.py b/krkn/scenario_plugins/hogs/hogs_scenario_plugin.py new file mode 100644 index 000000000..2ef0180ce --- /dev/null +++ b/krkn/scenario_plugins/hogs/hogs_scenario_plugin.py @@ -0,0 +1,142 @@ +import copy +import logging +import queue +import random +import re +import threading +import time + + +import yaml +from krkn_lib.models.telemetry import ScenarioTelemetry +from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift +from krkn_lib.models.krkn import HogConfig, HogType +from krkn_lib.models.k8s import NodeResources +from krkn_lib.k8s import KrknKubernetes +from krkn_lib.utils import get_random_string + +from krkn.scenario_plugins.abstract_scenario_plugin import AbstractScenarioPlugin + + +class HogsScenarioPlugin(AbstractScenarioPlugin): + def run(self, run_uuid: str, scenario: str, krkn_config: dict[str, any], 
lib_telemetry: KrknTelemetryOpenshift, + scenario_telemetry: ScenarioTelemetry) -> int: + try: + with open(scenario, "r") as f: + scenario = yaml.full_load(f) + scenario_config = HogConfig.from_yaml_dict(scenario) + has_selector = True + if not scenario_config.node_selector or not re.match("^.+=.*$", scenario_config.node_selector): + if scenario_config.node_selector: + logging.warning(f"node selector {scenario_config.node_selector} not in right format (key=value)") + node_selector = "" + else: + node_selector = scenario_config.node_selector + + available_nodes = lib_telemetry.get_lib_kubernetes().list_schedulable_nodes(node_selector) + if len(available_nodes) == 0: + raise Exception("no available nodes to schedule workload") + + if not has_selector: + # if selector not specified picks a random node between the available + available_nodes = [available_nodes[random.randint(0, len(available_nodes))]] + + if scenario_config.number_of_nodes and len(available_nodes) > scenario_config.number_of_nodes: + available_nodes = random.sample(available_nodes, scenario_config.number_of_nodes) + + exception_queue = queue.Queue() + self.run_scenario(scenario_config, lib_telemetry.get_lib_kubernetes(), available_nodes, exception_queue) + return 0 + except Exception as e: + logging.error(f"scenario exception: {e}") + return 1 + + def get_scenario_types(self) -> list[str]: + return ["hog_scenarios"] + + def run_scenario_worker(self, config: HogConfig, + lib_k8s: KrknKubernetes, node: str, + exception_queue: queue.Queue): + try: + if not config.workers: + config.workers = lib_k8s.get_node_cpu_count(node) + logging.info(f"[{node}] detected {config.workers} cpus for node {node}") + + logging.info(f"[{node}] workers number: {config.workers}") + + # using kubernetes.io/hostname = selector to + # precisely deploy each workload on each selected node + config.node_selector = f"kubernetes.io/hostname={node}" + pod_name = f"{config.type.value}-hog-{get_random_string(5)}" + node_resources_start = lib_k8s.get_node_resources_info(node) + lib_k8s.deploy_hog(pod_name, config) + start = time.time() + # waiting 3 seconds before starting sample collection + time.sleep(3) + node_resources_end = lib_k8s.get_node_resources_info(node) + + samples: list[NodeResources] = [] + avg_node_resources = NodeResources() + + while time.time() - start < config.duration-1: + samples.append(lib_k8s.get_node_resources_info(node)) + + max_wait = 30 + wait = 0 + logging.info(f"[{node}] waiting {max_wait} up to seconds pod: {pod_name} namespace: {config.namespace} to finish") + while lib_k8s.is_pod_running(pod_name, config.namespace): + if wait >= max_wait: + raise Exception(f"[{node}] hog workload pod: {pod_name} namespace: {config.namespace} " + f"didn't finish after {max_wait}") + time.sleep(1) + wait += 1 + continue + + logging.info(f"[{node}] deleting pod: {pod_name} namespace: {config.namespace}") + lib_k8s.delete_pod(pod_name, config.namespace) + + for resource in samples: + avg_node_resources.cpu += resource.cpu + avg_node_resources.memory += resource.memory + avg_node_resources.disk_space += resource.disk_space + + avg_node_resources.cpu = avg_node_resources.cpu/len(samples) + avg_node_resources.memory = avg_node_resources.memory / len(samples) + avg_node_resources.disk_space = avg_node_resources.disk_space / len(samples) + + if config.type == HogType.cpu: + logging.info(f"[{node}] detected cpu consumption: " + f"{(avg_node_resources.cpu / (config.workers * 1000000000)) * 100} %") + if config.type == HogType.memory: + 
logging.info(f"[{node}] detected memory increase: " + f"{avg_node_resources.memory / node_resources_start.memory * 100} %") + if config.type == HogType.io: + logging.info(f"[{node}] detected disk space allocated: " + f"{(avg_node_resources.disk_space - node_resources_end.disk_space) / 1024 / 1024} MB") + except Exception as e: + exception_queue.put(e) + + def run_scenario(self, config: HogConfig, + lib_k8s: KrknKubernetes, + available_nodes: list[str], + exception_queue: queue.Queue): + workers = [] + logging.info(f"running {config.type.value} hog scenario") + logging.info(f"targeting nodes: [{','.join(available_nodes)}]") + for node in available_nodes: + config_copy = copy.deepcopy(config) + worker = threading.Thread(target=self.run_scenario_worker, + args=(config_copy, lib_k8s, node, exception_queue)) + worker.daemon = True + worker.start() + workers.append(worker) + + for worker in workers: + worker.join() + + try: + while True: + exception = exception_queue.get_nowait() + raise exception + except queue.Empty: + pass diff --git a/krkn/scenario_plugins/node_actions/abstract_node_scenarios.py b/krkn/scenario_plugins/node_actions/abstract_node_scenarios.py index 0602dff76..f9552f5e3 100644 --- a/krkn/scenario_plugins/node_actions/abstract_node_scenarios.py +++ b/krkn/scenario_plugins/node_actions/abstract_node_scenarios.py @@ -4,14 +4,16 @@ import krkn.invoke.command as runcommand import krkn.scenario_plugins.node_actions.common_node_functions as nodeaction from krkn_lib.k8s import KrknKubernetes - +from krkn_lib.models.k8s import AffectedNode, AffectedNodeStatus # krkn_lib class abstract_node_scenarios: kubecli: KrknKubernetes + affected_nodes_status: AffectedNodeStatus - def __init__(self, kubecli: KrknKubernetes): + def __init__(self, kubecli: KrknKubernetes, affected_nodes_status: AffectedNodeStatus): self.kubecli = kubecli + self.affected_nodes_status = affected_nodes_status # Node scenario to start the node def node_start_scenario(self, instance_kill_count, node, timeout): @@ -28,6 +30,7 @@ def node_stop_start_scenario(self, instance_kill_count, node, timeout, duration) logging.info("Waiting for %s seconds before starting the node" % (duration)) time.sleep(duration) self.node_start_scenario(instance_kill_count, node, timeout) + self.affected_nodes_status.merge_affected_nodes() logging.info("node_stop_start_scenario has been successfully injected!") def helper_node_stop_start_scenario(self, instance_kill_count, node, timeout): @@ -61,13 +64,15 @@ def node_reboot_scenario(self, instance_kill_count, node, timeout): # Node scenario to stop the kubelet def stop_kubelet_scenario(self, instance_kill_count, node, timeout): for _ in range(instance_kill_count): + affected_node = AffectedNode(node) try: logging.info("Starting stop_kubelet_scenario injection") logging.info("Stopping the kubelet of the node %s" % (node)) runcommand.run( "oc debug node/" + node + " -- chroot /host systemctl stop kubelet" ) - nodeaction.wait_for_unknown_status(node, timeout, self.kubecli) + nodeaction.wait_for_unknown_status(node, timeout, self.kubecli, affected_node) + logging.info("The kubelet of the node %s has been stopped" % (node)) logging.info("stop_kubelet_scenario has been successfuly injected!") except Exception as e: @@ -77,17 +82,20 @@ def stop_kubelet_scenario(self, instance_kill_count, node, timeout): ) logging.error("stop_kubelet_scenario injection failed!") raise e + self.add_affected_node(affected_node) # Node scenario to stop and start the kubelet def stop_start_kubelet_scenario(self, 
instance_kill_count, node, timeout): logging.info("Starting stop_start_kubelet_scenario injection") self.stop_kubelet_scenario(instance_kill_count, node, timeout) self.node_reboot_scenario(instance_kill_count, node, timeout) + self.affected_nodes_status.merge_affected_nodes() logging.info("stop_start_kubelet_scenario has been successfully injected!") # Node scenario to restart the kubelet def restart_kubelet_scenario(self, instance_kill_count, node, timeout): for _ in range(instance_kill_count): + affected_node = AffectedNode(node) try: logging.info("Starting restart_kubelet_scenario injection") logging.info("Restarting the kubelet of the node %s" % (node)) @@ -96,8 +104,8 @@ def restart_kubelet_scenario(self, instance_kill_count, node, timeout): + node + " -- chroot /host systemctl restart kubelet &" ) - nodeaction.wait_for_not_ready_status(node, timeout, self.kubecli) - nodeaction.wait_for_ready_status(node, timeout, self.kubecli) + nodeaction.wait_for_not_ready_status(node, timeout, self.kubecli, affected_node) + nodeaction.wait_for_ready_status(node, timeout, self.kubecli,affected_node) logging.info("The kubelet of the node %s has been restarted" % (node)) logging.info("restart_kubelet_scenario has been successfuly injected!") except Exception as e: @@ -107,6 +115,7 @@ def restart_kubelet_scenario(self, instance_kill_count, node, timeout): ) logging.error("restart_kubelet_scenario injection failed!") raise e + self.add_affected_node(affected_node) # Node scenario to crash the node def node_crash_scenario(self, instance_kill_count, node, timeout): diff --git a/krkn/scenario_plugins/node_actions/alibaba_node_scenarios.py b/krkn/scenario_plugins/node_actions/alibaba_node_scenarios.py index b9ce0f491..fa47241c0 100644 --- a/krkn/scenario_plugins/node_actions/alibaba_node_scenarios.py +++ b/krkn/scenario_plugins/node_actions/alibaba_node_scenarios.py @@ -18,7 +18,7 @@ abstract_node_scenarios, ) from krkn_lib.k8s import KrknKubernetes - +from krkn_lib.models.k8s import AffectedNode, AffectedNodeStatus class Alibaba: def __init__(self): @@ -161,8 +161,9 @@ def get_vm_status(self, instance_id): return None # Wait until the node instance is running - def wait_until_running(self, instance_id, timeout): + def wait_until_running(self, instance_id, timeout, affected_node): time_counter = 0 + start_time = time.time() status = self.get_vm_status(instance_id) while status != "Running": status = self.get_vm_status(instance_id) @@ -174,11 +175,15 @@ def wait_until_running(self, instance_id, timeout): if time_counter >= timeout: logging.info("ECS %s is still not ready in allotted time" % instance_id) return False + end_time = time.time() + if affected_node: + affected_node.set_affected_node_status("running", end_time - start_time) return True # Wait until the node instance is stopped - def wait_until_stopped(self, instance_id, timeout): + def wait_until_stopped(self, instance_id, timeout, affected_node): time_counter = 0 + start_time = time.time() status = self.get_vm_status(instance_id) while status != "Stopped": status = self.get_vm_status(instance_id) @@ -192,10 +197,14 @@ def wait_until_stopped(self, instance_id, timeout): "Vm %s is still not stopped in allotted time" % instance_id ) return False + end_time = time.time() + if affected_node: + affected_node.set_affected_node_status("running", end_time - start_time) return True # Wait until the node instance is terminated - def wait_until_released(self, instance_id, timeout): + def wait_until_released(self, instance_id, timeout, affected_node): + 
start_time = time.time() statuses = self.get_vm_status(instance_id) time_counter = 0 while statuses and statuses != "Released": @@ -210,17 +219,23 @@ def wait_until_released(self, instance_id, timeout): return False logging.info("ECS %s is released" % instance_id) + end_time = time.time() + if affected_node: + affected_node.set_affected_node_status("terminated", end_time - start_time) return True # krkn_lib class alibaba_node_scenarios(abstract_node_scenarios): - def __init__(self, kubecli: KrknKubernetes): + def __init__(self, kubecli: KrknKubernetes, affected_nodes_status: AffectedNodeStatus): + super().__init__(kubecli, affected_nodes_status) self.alibaba = Alibaba() + # Node scenario to start the node def node_start_scenario(self, instance_kill_count, node, timeout): for _ in range(instance_kill_count): + affected_node = AffectedNode(node) try: logging.info("Starting node_start_scenario injection") vm_id = self.alibaba.get_instance_id(node) @@ -228,8 +243,8 @@ def node_start_scenario(self, instance_kill_count, node, timeout): "Starting the node %s with instance ID: %s " % (node, vm_id) ) self.alibaba.start_instances(vm_id) - self.alibaba.wait_until_running(vm_id, timeout) - nodeaction.wait_for_ready_status(node, timeout, self.kubecli) + self.alibaba.wait_until_running(vm_id, timeout, affected_node) + nodeaction.wait_for_ready_status(node, timeout, self.kubecli, affected_node) logging.info("Node with instance ID: %s is in running state" % node) logging.info("node_start_scenario has been successfully injected!") except Exception as e: @@ -239,10 +254,12 @@ def node_start_scenario(self, instance_kill_count, node, timeout): ) logging.error("node_start_scenario injection failed!") raise e + self.affected_nodes_status.affected_nodes.append(affected_node) # Node scenario to stop the node def node_stop_scenario(self, instance_kill_count, node, timeout): for _ in range(instance_kill_count): + affected_node = AffectedNode(node) try: logging.info("Starting node_stop_scenario injection") vm_id = self.alibaba.get_instance_id(node) @@ -250,9 +267,9 @@ def node_stop_scenario(self, instance_kill_count, node, timeout): "Stopping the node %s with instance ID: %s " % (node, vm_id) ) self.alibaba.stop_instances(vm_id) - self.alibaba.wait_until_stopped(vm_id, timeout) + self.alibaba.wait_until_stopped(vm_id, timeout, affected_node) logging.info("Node with instance ID: %s is in stopped state" % vm_id) - nodeaction.wait_for_unknown_status(node, timeout, self.kubecli) + nodeaction.wait_for_unknown_status(node, timeout, self.kubecli, affected_node) except Exception as e: logging.error( "Failed to stop node instance. Encountered following exception: %s. 
" @@ -260,23 +277,25 @@ def node_stop_scenario(self, instance_kill_count, node, timeout): ) logging.error("node_stop_scenario injection failed!") raise e + self.affected_nodes_status.affected_nodes.append(affected_node) # Might need to stop and then release the instance # Node scenario to terminate the node def node_termination_scenario(self, instance_kill_count, node, timeout): for _ in range(instance_kill_count): + affected_node = AffectedNode(node) try: logging.info( "Starting node_termination_scenario injection by first stopping instance" ) vm_id = self.alibaba.get_instance_id(node) self.alibaba.stop_instances(vm_id) - self.alibaba.wait_until_stopped(vm_id, timeout) + self.alibaba.wait_until_stopped(vm_id, timeout, affected_node) logging.info( "Releasing the node %s with instance ID: %s " % (node, vm_id) ) self.alibaba.release_instance(vm_id) - self.alibaba.wait_until_released(vm_id, timeout) + self.alibaba.wait_until_released(vm_id, timeout, affected_node) logging.info("Node with instance ID: %s has been released" % node) logging.info( "node_termination_scenario has been successfully injected!" @@ -288,17 +307,19 @@ def node_termination_scenario(self, instance_kill_count, node, timeout): ) logging.error("node_termination_scenario injection failed!") raise e + self.affected_nodes_status.affected_nodes.append(affected_node) # Node scenario to reboot the node def node_reboot_scenario(self, instance_kill_count, node, timeout): for _ in range(instance_kill_count): + affected_node = AffectedNode(node) try: logging.info("Starting node_reboot_scenario injection") instance_id = self.alibaba.get_instance_id(node) logging.info("Rebooting the node with instance ID: %s " % (instance_id)) self.alibaba.reboot_instances(instance_id) - nodeaction.wait_for_unknown_status(node, timeout, self.kubecli) - nodeaction.wait_for_ready_status(node, timeout, self.kubecli) + nodeaction.wait_for_unknown_status(node, timeout, self.kubecli, affected_node) + nodeaction.wait_for_ready_status(node, timeout, self.kubecli, affected_node) logging.info( "Node with instance ID: %s has been rebooted" % (instance_id) ) @@ -310,3 +331,4 @@ def node_reboot_scenario(self, instance_kill_count, node, timeout): ) logging.error("node_reboot_scenario injection failed!") raise e + self.affected_nodes_status.affected_nodes.append(affected_node) diff --git a/krkn/scenario_plugins/node_actions/aws_node_scenarios.py b/krkn/scenario_plugins/node_actions/aws_node_scenarios.py index f4784506b..205869ba4 100644 --- a/krkn/scenario_plugins/node_actions/aws_node_scenarios.py +++ b/krkn/scenario_plugins/node_actions/aws_node_scenarios.py @@ -7,7 +7,7 @@ abstract_node_scenarios, ) from krkn_lib.k8s import KrknKubernetes - +from krkn_lib.models.k8s import AffectedNode, AffectedNodeStatus class AWS: def __init__(self): @@ -77,9 +77,13 @@ def reboot_instances(self, instance_id): # until a successful state is reached. 
An error is returned after 40 failed checks # Setting timeout for consistency with other cloud functions # Wait until the node instance is running - def wait_until_running(self, instance_id, timeout=600): + def wait_until_running(self, instance_id, timeout=600, affected_node=None): try: + start_time = time.time() self.boto_instance.wait_until_running(InstanceIds=[instance_id]) + end_time = time.time() + if affected_node: + affected_node.set_affected_node_status("running", end_time - start_time) return True except Exception as e: logging.error( @@ -89,9 +93,13 @@ def wait_until_running(self, instance_id, timeout=600): return False # Wait until the node instance is stopped - def wait_until_stopped(self, instance_id, timeout=600): + def wait_until_stopped(self, instance_id, timeout=600, affected_node= None): try: + start_time = time.time() self.boto_instance.wait_until_stopped(InstanceIds=[instance_id]) + end_time = time.time() + if affected_node: + affected_node.set_affected_node_status("stopped", end_time - start_time) return True except Exception as e: logging.error( @@ -101,9 +109,13 @@ def wait_until_stopped(self, instance_id, timeout=600): return False # Wait until the node instance is terminated - def wait_until_terminated(self, instance_id, timeout=600): + def wait_until_terminated(self, instance_id, timeout=600, affected_node= None): try: + start_time = time.time() self.boto_instance.wait_until_terminated(InstanceIds=[instance_id]) + end_time = time.time() + if affected_node: + affected_node.set_affected_node_status("terminated", end_time - start_time) return True except Exception as e: logging.error( @@ -249,13 +261,14 @@ def get_volume_state(self, volume_id: str): # krkn_lib class aws_node_scenarios(abstract_node_scenarios): - def __init__(self, kubecli: KrknKubernetes): - super().__init__(kubecli) + def __init__(self, kubecli: KrknKubernetes, affected_nodes_status: AffectedNodeStatus): + super().__init__(kubecli, affected_nodes_status) self.aws = AWS() # Node scenario to start the node def node_start_scenario(self, instance_kill_count, node, timeout): for _ in range(instance_kill_count): + affected_node = AffectedNode(node) try: logging.info("Starting node_start_scenario injection") instance_id = self.aws.get_instance_id(node) @@ -263,8 +276,8 @@ def node_start_scenario(self, instance_kill_count, node, timeout): "Starting the node %s with instance ID: %s " % (node, instance_id) ) self.aws.start_instances(instance_id) - self.aws.wait_until_running(instance_id) - nodeaction.wait_for_ready_status(node, timeout, self.kubecli) + self.aws.wait_until_running(instance_id, affected_node=affected_node) + nodeaction.wait_for_ready_status(node, timeout, self.kubecli, affected_node) logging.info( "Node with instance ID: %s is in running state" % (instance_id) ) @@ -277,10 +290,12 @@ def node_start_scenario(self, instance_kill_count, node, timeout): logging.error("node_start_scenario injection failed!") raise RuntimeError() + self.affected_nodes_status.affected_nodes.append(affected_node) # Node scenario to stop the node def node_stop_scenario(self, instance_kill_count, node, timeout): for _ in range(instance_kill_count): + affected_node = AffectedNode(node) try: logging.info("Starting node_stop_scenario injection") instance_id = self.aws.get_instance_id(node) @@ -288,11 +303,11 @@ def node_stop_scenario(self, instance_kill_count, node, timeout): "Stopping the node %s with instance ID: %s " % (node, instance_id) ) self.aws.stop_instances(instance_id) - self.aws.wait_until_stopped(instance_id) + 
self.aws.wait_until_stopped(instance_id, affected_node=affected_node) logging.info( "Node with instance ID: %s is in stopped state" % (instance_id) ) - nodeaction.wait_for_unknown_status(node, timeout, self.kubecli) + nodeaction.wait_for_unknown_status(node, timeout, self.kubecli, affected_node=affected_node) except Exception as e: logging.error( "Failed to stop node instance. Encountered following exception: %s. " @@ -301,10 +316,12 @@ def node_stop_scenario(self, instance_kill_count, node, timeout): logging.error("node_stop_scenario injection failed!") raise RuntimeError() + self.affected_nodes_status.affected_nodes.append(affected_node) # Node scenario to terminate the node def node_termination_scenario(self, instance_kill_count, node, timeout): for _ in range(instance_kill_count): + affected_node = AffectedNode(node) try: logging.info("Starting node_termination_scenario injection") instance_id = self.aws.get_instance_id(node) @@ -313,7 +330,7 @@ def node_termination_scenario(self, instance_kill_count, node, timeout): % (node, instance_id) ) self.aws.terminate_instances(instance_id) - self.aws.wait_until_terminated(instance_id) + self.aws.wait_until_terminated(instance_id, affected_node=affected_node) for _ in range(timeout): if node not in self.kubecli.list_nodes(): break @@ -332,10 +349,12 @@ def node_termination_scenario(self, instance_kill_count, node, timeout): logging.error("node_termination_scenario injection failed!") raise RuntimeError() + self.affected_nodes_status.affected_nodes.append(affected_node) # Node scenario to reboot the node def node_reboot_scenario(self, instance_kill_count, node, timeout): for _ in range(instance_kill_count): + affected_node = AffectedNode(node) try: logging.info("Starting node_reboot_scenario injection" + str(node)) instance_id = self.aws.get_instance_id(node) @@ -343,8 +362,8 @@ def node_reboot_scenario(self, instance_kill_count, node, timeout): "Rebooting the node %s with instance ID: %s " % (node, instance_id) ) self.aws.reboot_instances(instance_id) - nodeaction.wait_for_unknown_status(node, timeout, self.kubecli) - nodeaction.wait_for_ready_status(node, timeout, self.kubecli) + nodeaction.wait_for_unknown_status(node, timeout, self.kubecli, affected_node) + nodeaction.wait_for_ready_status(node, timeout, self.kubecli, affected_node) logging.info( "Node with instance ID: %s has been rebooted" % (instance_id) ) @@ -357,6 +376,7 @@ def node_reboot_scenario(self, instance_kill_count, node, timeout): logging.error("node_reboot_scenario injection failed!") raise RuntimeError() + self.affected_nodes_status.affected_nodes.append(affected_node) # Get volume attachment info def get_disk_attachment_info(self, instance_kill_count, node): diff --git a/krkn/scenario_plugins/node_actions/az_node_scenarios.py b/krkn/scenario_plugins/node_actions/az_node_scenarios.py index 6cad8c12a..156a4bb40 100644 --- a/krkn/scenario_plugins/node_actions/az_node_scenarios.py +++ b/krkn/scenario_plugins/node_actions/az_node_scenarios.py @@ -8,7 +8,7 @@ from azure.mgmt.compute import ComputeManagementClient from azure.identity import DefaultAzureCredential from krkn_lib.k8s import KrknKubernetes - +from krkn_lib.models.k8s import AffectedNode, AffectedNodeStatus class Azure: def __init__(self): @@ -18,8 +18,11 @@ def __init__(self): logging.info("credential " + str(credentials)) # az_account = runcommand.invoke("az account list -o yaml") # az_account_yaml = yaml.safe_load(az_account, Loader=yaml.FullLoader) + logger = logging.getLogger("azure") + 
logger.setLevel(logging.WARNING) subscription_id = os.getenv("AZURE_SUBSCRIPTION_ID") - self.compute_client = ComputeManagementClient(credentials, subscription_id) + self.compute_client = ComputeManagementClient(credentials, subscription_id,logging=logger) + # Get the instance ID of the node def get_instance_id(self, node_name): @@ -90,8 +93,9 @@ def get_vm_status(self, resource_group, vm_name): return status # Wait until the node instance is running - def wait_until_running(self, resource_group, vm_name, timeout): + def wait_until_running(self, resource_group, vm_name, timeout, affected_node): time_counter = 0 + start_time = time.time() status = self.get_vm_status(resource_group, vm_name) while status and status.code != "PowerState/running": status = self.get_vm_status(resource_group, vm_name) @@ -101,11 +105,15 @@ def wait_until_running(self, resource_group, vm_name, timeout): if time_counter >= timeout: logging.info("Vm %s is still not ready in allotted time" % vm_name) return False + end_time = time.time() + if affected_node: + affected_node.set_affected_node_status("running", end_time - start_time) return True # Wait until the node instance is stopped - def wait_until_stopped(self, resource_group, vm_name, timeout): + def wait_until_stopped(self, resource_group, vm_name, timeout, affected_node): time_counter = 0 + start_time = time.time() status = self.get_vm_status(resource_group, vm_name) while status and status.code != "PowerState/stopped": status = self.get_vm_status(resource_group, vm_name) @@ -115,10 +123,14 @@ def wait_until_stopped(self, resource_group, vm_name, timeout): if time_counter >= timeout: logging.info("Vm %s is still not stopped in allotted time" % vm_name) return False + end_time = time.time() + if affected_node: + affected_node.set_affected_node_status("stopped", end_time - start_time) return True # Wait until the node instance is terminated - def wait_until_terminated(self, resource_group, vm_name, timeout): + def wait_until_terminated(self, resource_group, vm_name, timeout, affected_node): + start_time = time.time() statuses = self.compute_client.virtual_machines.instance_view( resource_group, vm_name ).statuses[0] @@ -137,29 +149,35 @@ def wait_until_terminated(self, resource_group, vm_name, timeout): return False except Exception: logging.info("Vm %s is terminated" % vm_name) + end_time = time.time() + if affected_node: + affected_node.set_affected_node_status("terminated", end_time - start_time) return True # krkn_lib class azure_node_scenarios(abstract_node_scenarios): - def __init__(self, kubecli: KrknKubernetes): - super().__init__(kubecli) + def __init__(self, kubecli: KrknKubernetes, affected_nodes_status: AffectedNodeStatus): + super().__init__(kubecli, affected_nodes_status) logging.info("init in azure") self.azure = Azure() + # Node scenario to start the node def node_start_scenario(self, instance_kill_count, node, timeout): for _ in range(instance_kill_count): + affected_node = AffectedNode(node) try: logging.info("Starting node_start_scenario injection") vm_name, resource_group = self.azure.get_instance_id(node) + logging.info( "Starting the node %s with instance ID: %s " % (vm_name, resource_group) ) self.azure.start_instances(resource_group, vm_name) - self.azure.wait_until_running(resource_group, vm_name, timeout) - nodeaction.wait_for_ready_status(vm_name, timeout, self.kubecli) + self.azure.wait_until_running(resource_group, vm_name, timeout, affected_node=affected_node) + nodeaction.wait_for_ready_status(vm_name, timeout, self.kubecli, 
affected_node) logging.info("Node with instance ID: %s is in running state" % node) logging.info("node_start_scenario has been successfully injected!") except Exception as e: @@ -170,10 +188,12 @@ def node_start_scenario(self, instance_kill_count, node, timeout): logging.error("node_start_scenario injection failed!") raise RuntimeError() + self.affected_nodes_status.affected_nodes.append(affected_node) # Node scenario to stop the node def node_stop_scenario(self, instance_kill_count, node, timeout): for _ in range(instance_kill_count): + affected_node = AffectedNode(node) try: logging.info("Starting node_stop_scenario injection") vm_name, resource_group = self.azure.get_instance_id(node) @@ -182,9 +202,9 @@ def node_stop_scenario(self, instance_kill_count, node, timeout): % (vm_name, resource_group) ) self.azure.stop_instances(resource_group, vm_name) - self.azure.wait_until_stopped(resource_group, vm_name, timeout) + self.azure.wait_until_stopped(resource_group, vm_name, timeout, affected_node=affected_node) logging.info("Node with instance ID: %s is in stopped state" % vm_name) - nodeaction.wait_for_unknown_status(vm_name, timeout, self.kubecli) + nodeaction.wait_for_unknown_status(vm_name, timeout, self.kubecli, affected_node) except Exception as e: logging.error( "Failed to stop node instance. Encountered following exception: %s. " @@ -193,19 +213,22 @@ def node_stop_scenario(self, instance_kill_count, node, timeout): logging.error("node_stop_scenario injection failed!") raise RuntimeError() + self.affected_nodes_status.affected_nodes.append(affected_node) # Node scenario to terminate the node def node_termination_scenario(self, instance_kill_count, node, timeout): for _ in range(instance_kill_count): + affected_node = AffectedNode(node) try: logging.info("Starting node_termination_scenario injection") + affected_node = AffectedNode(node) vm_name, resource_group = self.azure.get_instance_id(node) logging.info( "Terminating the node %s with instance ID: %s " % (vm_name, resource_group) ) self.azure.terminate_instances(resource_group, vm_name) - self.azure.wait_until_terminated(resource_group, vm_name, timeout) + self.azure.wait_until_terminated(resource_group, vm_name, timeout, affected_node) for _ in range(timeout): if vm_name not in self.kubecli.list_nodes(): break @@ -224,10 +247,13 @@ def node_termination_scenario(self, instance_kill_count, node, timeout): logging.error("node_termination_scenario injection failed!") raise RuntimeError() + self.affected_nodes_status.affected_nodes.append(affected_node) + # Node scenario to reboot the node def node_reboot_scenario(self, instance_kill_count, node, timeout): for _ in range(instance_kill_count): + affected_node = AffectedNode(node) try: logging.info("Starting node_reboot_scenario injection") vm_name, resource_group = self.azure.get_instance_id(node) @@ -235,9 +261,11 @@ def node_reboot_scenario(self, instance_kill_count, node, timeout): "Rebooting the node %s with instance ID: %s " % (vm_name, resource_group) ) + self.azure.reboot_instances(resource_group, vm_name) - nodeaction.wait_for_unknown_status(vm_name, timeout, self.kubecli) - nodeaction.wait_for_ready_status(vm_name, timeout, self.kubecli) + + nodeaction.wait_for_ready_status(vm_name, timeout, self.kubecli, affected_node) + logging.info("Node with instance ID: %s has been rebooted" % (vm_name)) logging.info("node_reboot_scenario has been successfully injected!") except Exception as e: @@ -248,3 +276,4 @@ def node_reboot_scenario(self, instance_kill_count, node, timeout): 
logging.error("node_reboot_scenario injection failed!") raise RuntimeError() + self.affected_nodes_status.affected_nodes.append(affected_node) diff --git a/krkn/scenario_plugins/node_actions/bm_node_scenarios.py b/krkn/scenario_plugins/node_actions/bm_node_scenarios.py index 27f7d35b2..4a9a4eb1e 100644 --- a/krkn/scenario_plugins/node_actions/bm_node_scenarios.py +++ b/krkn/scenario_plugins/node_actions/bm_node_scenarios.py @@ -9,7 +9,7 @@ import time import traceback from krkn_lib.k8s import KrknKubernetes - +from krkn_lib.models.k8s import AffectedNode, AffectedNodeStatus class BM: def __init__(self, bm_info, user, passwd): @@ -127,8 +127,8 @@ def wait_until_stopped(self, bmc_addr, node_name): # krkn_lib class bm_node_scenarios(abstract_node_scenarios): - def __init__(self, bm_info, user, passwd, kubecli: KrknKubernetes): - super().__init__(kubecli) + def __init__(self, bm_info, user, passwd, kubecli: KrknKubernetes,affected_nodes_status: AffectedNodeStatus): + super().__init__(kubecli, affected_nodes_status) self.bm = BM(bm_info, user, passwd) # Node scenario to start the node @@ -159,6 +159,7 @@ def node_start_scenario(self, instance_kill_count, node, timeout): # Node scenario to stop the node def node_stop_scenario(self, instance_kill_count, node, timeout): for _ in range(instance_kill_count): + affected_node = AffectedNode(node) try: logging.info("Starting node_stop_scenario injection") bmc_addr = self.bm.get_bmc_addr(node) @@ -166,11 +167,11 @@ def node_stop_scenario(self, instance_kill_count, node, timeout): "Stopping the node %s with bmc address: %s " % (node, bmc_addr) ) self.bm.stop_instances(bmc_addr, node) - self.bm.wait_until_stopped(bmc_addr, node) + self.bm.wait_until_stopped(bmc_addr, node, affected_node) logging.info( "Node with bmc address: %s is in stopped state" % (bmc_addr) ) - nodeaction.wait_for_unknown_status(node, timeout, self.kubecli) + nodeaction.wait_for_unknown_status(node, timeout, self.kubecli, affected_node) except Exception as e: logging.error( "Failed to stop node instance. Encountered following exception: %s. 
" @@ -179,6 +180,7 @@ def node_stop_scenario(self, instance_kill_count, node, timeout): ) logging.error("node_stop_scenario injection failed!") raise e + self.affected_nodes_status.affected_nodes.append(affected_node) # Node scenario to terminate the node def node_termination_scenario(self, instance_kill_count, node, timeout): @@ -187,6 +189,7 @@ def node_termination_scenario(self, instance_kill_count, node, timeout): # Node scenario to reboot the node def node_reboot_scenario(self, instance_kill_count, node, timeout): for _ in range(instance_kill_count): + affected_node = AffectedNode(node) try: logging.info("Starting node_reboot_scenario injection") bmc_addr = self.bm.get_bmc_addr(node) @@ -195,8 +198,8 @@ def node_reboot_scenario(self, instance_kill_count, node, timeout): "Rebooting the node %s with bmc address: %s " % (node, bmc_addr) ) self.bm.reboot_instances(bmc_addr, node) - nodeaction.wait_for_unknown_status(node, timeout, self.kubecli) - nodeaction.wait_for_ready_status(node, timeout, self.kubecli) + nodeaction.wait_for_unknown_status(node, timeout, self.kubecli, affected_node) + nodeaction.wait_for_ready_status(node, timeout, self.kubecli, affected_node) logging.info("Node with bmc address: %s has been rebooted" % (bmc_addr)) logging.info("node_reboot_scenario has been successfuly injected!") except Exception as e: @@ -208,3 +211,4 @@ def node_reboot_scenario(self, instance_kill_count, node, timeout): traceback.print_exc() logging.error("node_reboot_scenario injection failed!") raise e + self.affected_nodes_status.affected_nodes.append(affected_node) diff --git a/krkn/scenario_plugins/node_actions/common_node_functions.py b/krkn/scenario_plugins/node_actions/common_node_functions.py index ddd788076..43424f41d 100644 --- a/krkn/scenario_plugins/node_actions/common_node_functions.py +++ b/krkn/scenario_plugins/node_actions/common_node_functions.py @@ -1,9 +1,13 @@ +import datetime import time import random import logging import paramiko +from krkn_lib.models.k8s import AffectedNode import krkn.invoke.command as runcommand from krkn_lib.k8s import KrknKubernetes +from krkn_lib.models.k8s import AffectedNode, AffectedNodeStatus +from krkn_lib.models.k8s import AffectedNode node_general = False @@ -40,23 +44,25 @@ def get_node(label_selector, instance_kill_count, kubecli: KrknKubernetes): nodes.remove(node_to_add) return nodes_to_return - # krkn_lib # Wait until the node status becomes Ready -def wait_for_ready_status(node, timeout, kubecli: KrknKubernetes): - kubecli.watch_node_status(node, "True", timeout) - +def wait_for_ready_status(node, timeout, kubecli: KrknKubernetes, affected_node: AffectedNode = None): + affected_node = kubecli.watch_node_status(node, "True", timeout, affected_node) + return affected_node + # krkn_lib # Wait until the node status becomes Not Ready -def wait_for_not_ready_status(node, timeout, kubecli: KrknKubernetes): - kubecli.watch_node_status(node, "False", timeout) - +def wait_for_not_ready_status(node, timeout, kubecli: KrknKubernetes, affected_node: AffectedNode = None): + affected_node = kubecli.watch_node_status(node, "False", timeout, affected_node) + return affected_node + # krkn_lib # Wait until the node status becomes Unknown -def wait_for_unknown_status(node, timeout, kubecli: KrknKubernetes): - kubecli.watch_node_status(node, "Unknown", timeout) +def wait_for_unknown_status(node, timeout, kubecli: KrknKubernetes, affected_node: AffectedNode = None): + affected_node = kubecli.watch_node_status(node, "Unknown", timeout, affected_node) + return 
affected_node # Get the ip of the cluster node diff --git a/krkn/scenario_plugins/node_actions/docker_node_scenarios.py b/krkn/scenario_plugins/node_actions/docker_node_scenarios.py index a2cdf116e..2e050b6ef 100644 --- a/krkn/scenario_plugins/node_actions/docker_node_scenarios.py +++ b/krkn/scenario_plugins/node_actions/docker_node_scenarios.py @@ -5,7 +5,7 @@ import logging import docker from krkn_lib.k8s import KrknKubernetes - +from krkn_lib.models.k8s import AffectedNode, AffectedNodeStatus class Docker: def __init__(self): @@ -38,13 +38,14 @@ def terminate_instances(self, node_name): class docker_node_scenarios(abstract_node_scenarios): - def __init__(self, kubecli: KrknKubernetes): - super().__init__(kubecli) + def __init__(self, kubecli: KrknKubernetes, affected_nodes_status: AffectedNodeStatus): + super().__init__(kubecli, affected_nodes_status) self.docker = Docker() # Node scenario to start the node def node_start_scenario(self, instance_kill_count, node, timeout): for _ in range(instance_kill_count): + affected_node = AffectedNode(node) try: logging.info("Starting node_start_scenario injection") container_id = self.docker.get_container_id(node) @@ -52,7 +53,7 @@ def node_start_scenario(self, instance_kill_count, node, timeout): "Starting the node %s with container ID: %s " % (node, container_id) ) self.docker.start_instances(node) - nodeaction.wait_for_ready_status(node, timeout, self.kubecli) + nodeaction.wait_for_ready_status(node, timeout, self.kubecli, affected_node) logging.info( "Node with container ID: %s is in running state" % (container_id) ) @@ -64,10 +65,12 @@ def node_start_scenario(self, instance_kill_count, node, timeout): ) logging.error("node_start_scenario injection failed!") raise e + self.affected_nodes_status.affected_nodes.append(affected_node) # Node scenario to stop the node def node_stop_scenario(self, instance_kill_count, node, timeout): for _ in range(instance_kill_count): + affected_node = AffectedNode(node) try: logging.info("Starting node_stop_scenario injection") container_id = self.docker.get_container_id(node) @@ -78,7 +81,7 @@ def node_stop_scenario(self, instance_kill_count, node, timeout): logging.info( "Node with container ID: %s is in stopped state" % (container_id) ) - nodeaction.wait_for_unknown_status(node, timeout, self.kubecli) + nodeaction.wait_for_unknown_status(node, timeout, self.kubecli, affected_node) except Exception as e: logging.error( "Failed to stop node instance. Encountered following exception: %s. 
" @@ -86,6 +89,7 @@ def node_stop_scenario(self, instance_kill_count, node, timeout): ) logging.error("node_stop_scenario injection failed!") raise e + self.affected_nodes_status.affected_nodes.append(affected_node) # Node scenario to terminate the node def node_termination_scenario(self, instance_kill_count, node, timeout): @@ -113,6 +117,7 @@ def node_termination_scenario(self, instance_kill_count, node, timeout): # Node scenario to reboot the node def node_reboot_scenario(self, instance_kill_count, node, timeout): for _ in range(instance_kill_count): + affected_node = AffectedNode(node) try: logging.info("Starting node_reboot_scenario injection") container_id = self.docker.get_container_id(node) @@ -121,8 +126,8 @@ def node_reboot_scenario(self, instance_kill_count, node, timeout): % (node, container_id) ) self.docker.reboot_instances(node) - nodeaction.wait_for_unknown_status(node, timeout, self.kubecli) - nodeaction.wait_for_ready_status(node, timeout, self.kubecli) + nodeaction.wait_for_unknown_status(node, timeout, self.kubecli, affected_node) + nodeaction.wait_for_ready_status(node, timeout, self.kubecli, affected_node) logging.info( "Node with container ID: %s has been rebooted" % (container_id) ) @@ -134,3 +139,4 @@ def node_reboot_scenario(self, instance_kill_count, node, timeout): ) logging.error("node_reboot_scenario injection failed!") raise e + self.affected_nodes_status.affected_nodes.append(affected_node) diff --git a/krkn/scenario_plugins/node_actions/gcp_node_scenarios.py b/krkn/scenario_plugins/node_actions/gcp_node_scenarios.py index d982a5f83..ec39538d6 100644 --- a/krkn/scenario_plugins/node_actions/gcp_node_scenarios.py +++ b/krkn/scenario_plugins/node_actions/gcp_node_scenarios.py @@ -7,7 +7,7 @@ ) from google.cloud import compute_v1 from krkn_lib.k8s import KrknKubernetes - +from krkn_lib.models.k8s import AffectedNode, AffectedNodeStatus class GCP: def __init__(self): @@ -173,10 +173,10 @@ def get_instance_status(self, instance_id, expected_status, timeout): "Failed to get status of instance %s. Encountered following " "exception: %s." 
% (instance_id, e) ) - raise RuntimeError() if instance_status == expected_status: + logging.info("Instance status %s matches expected status %s" % (instance_status, expected_status)) return True time.sleep(sleeper) i += sleeper @@ -191,28 +191,44 @@ def wait_until_suspended(self, instance_id, timeout): return self.get_instance_status(instance_id, "SUSPENDED", timeout) # Wait until the node instance is running - def wait_until_running(self, instance_id, timeout): - return self.get_instance_status(instance_id, "RUNNING", timeout) + def wait_until_running(self, instance_id, timeout, affected_node): + start_time = time.time() + instance_status = self.get_instance_status(instance_id, "RUNNING", timeout) + end_time = time.time() + if affected_node: + affected_node.set_affected_node_status("running", end_time - start_time) + return instance_status # Wait until the node instance is stopped - def wait_until_stopped(self, instance_id, timeout): + def wait_until_stopped(self, instance_id, timeout, affected_node): # In GCP, the next state after STOPPING is TERMINATED - return self.get_instance_status(instance_id, "TERMINATED", timeout) + start_time = time.time() + instance_status = self.get_instance_status(instance_id, "TERMINATED", timeout) + end_time = time.time() + if affected_node: + affected_node.set_affected_node_status("stopped", end_time - start_time) + return instance_status # Wait until the node instance is terminated - def wait_until_terminated(self, instance_id, timeout): - return self.get_instance_status(instance_id, "TERMINATED", timeout) + def wait_until_terminated(self, instance_id, timeout, affected_node): + start_time = time.time() + instance_status = self.get_instance_status(instance_id, "TERMINATED", timeout) + end_time = time.time() + if affected_node: + affected_node.set_affected_node_status("terminated", end_time - start_time) + return instance_status # krkn_lib class gcp_node_scenarios(abstract_node_scenarios): - def __init__(self, kubecli: KrknKubernetes): - super().__init__(kubecli) + def __init__(self, kubecli: KrknKubernetes, affected_nodes_status: AffectedNodeStatus): + super().__init__(kubecli, affected_nodes_status) self.gcp = GCP() # Node scenario to start the node def node_start_scenario(self, instance_kill_count, node, timeout): for _ in range(instance_kill_count): + affected_node = AffectedNode(node) try: logging.info("Starting node_start_scenario injection") instance = self.gcp.get_node_instance(node) @@ -221,8 +238,8 @@ def node_start_scenario(self, instance_kill_count, node, timeout): "Starting the node %s with instance ID: %s " % (node, instance_id) ) self.gcp.start_instances(instance_id) - self.gcp.wait_until_running(instance_id, timeout) - nodeaction.wait_for_ready_status(node, timeout, self.kubecli) + self.gcp.wait_until_running(instance_id, timeout, affected_node) + nodeaction.wait_for_ready_status(node, timeout, self.kubecli, affected_node) logging.info( "Node with instance ID: %s is in running state" % instance_id ) @@ -235,10 +252,13 @@ def node_start_scenario(self, instance_kill_count, node, timeout): logging.error("node_start_scenario injection failed!") raise RuntimeError() + logging.info("started affected node: " + str(affected_node.to_json())) + self.affected_nodes_status.affected_nodes.append(affected_node) # Node scenario to stop the node def node_stop_scenario(self, instance_kill_count, node, timeout): for _ in range(instance_kill_count): + affected_node = AffectedNode(node) try: logging.info("Starting node_stop_scenario 
injection") instance = self.gcp.get_node_instance(node) @@ -247,11 +267,11 @@ def node_stop_scenario(self, instance_kill_count, node, timeout): "Stopping the node %s with instance ID: %s " % (node, instance_id) ) self.gcp.stop_instances(instance_id) - self.gcp.wait_until_stopped(instance_id, timeout) + self.gcp.wait_until_stopped(instance_id, timeout, affected_node=affected_node) logging.info( "Node with instance ID: %s is in stopped state" % instance_id ) - nodeaction.wait_for_unknown_status(node, timeout, self.kubecli) + nodeaction.wait_for_unknown_status(node, timeout, self.kubecli, affected_node) except Exception as e: logging.error( "Failed to stop node instance. Encountered following exception: %s. " @@ -260,10 +280,13 @@ def node_stop_scenario(self, instance_kill_count, node, timeout): logging.error("node_stop_scenario injection failed!") raise RuntimeError() + logging.info("stopedd affected node" + str(affected_node.to_json())) + self.affected_nodes_status.affected_nodes.append(affected_node) # Node scenario to terminate the node def node_termination_scenario(self, instance_kill_count, node, timeout): for _ in range(instance_kill_count): + affected_node = AffectedNode(node) try: logging.info("Starting node_termination_scenario injection") instance = self.gcp.get_node_instance(node) @@ -273,7 +296,7 @@ def node_termination_scenario(self, instance_kill_count, node, timeout): % (node, instance_id) ) self.gcp.terminate_instances(instance_id) - self.gcp.wait_until_terminated(instance_id, timeout) + self.gcp.wait_until_terminated(instance_id, timeout, affected_node=affected_node) for _ in range(timeout): if node not in self.kubecli.list_nodes(): break @@ -292,10 +315,12 @@ def node_termination_scenario(self, instance_kill_count, node, timeout): logging.error("node_termination_scenario injection failed!") raise RuntimeError() + self.affected_nodes_status.affected_nodes.append(affected_node) # Node scenario to reboot the node def node_reboot_scenario(self, instance_kill_count, node, timeout): for _ in range(instance_kill_count): + affected_node = AffectedNode(node) try: logging.info("Starting node_reboot_scenario injection") instance = self.gcp.get_node_instance(node) @@ -304,8 +329,9 @@ def node_reboot_scenario(self, instance_kill_count, node, timeout): "Rebooting the node %s with instance ID: %s " % (node, instance_id) ) self.gcp.reboot_instances(instance_id) - self.gcp.wait_until_running(instance_id, timeout) - nodeaction.wait_for_ready_status(node, timeout, self.kubecli) + nodeaction.wait_for_unknown_status(node, timeout, self.kubecli, affected_node) + self.gcp.wait_until_running(instance_id, timeout, affected_node) + nodeaction.wait_for_ready_status(node, timeout, self.kubecli, affected_node) logging.info( "Node with instance ID: %s has been rebooted" % instance_id ) @@ -318,3 +344,4 @@ def node_reboot_scenario(self, instance_kill_count, node, timeout): logging.error("node_reboot_scenario injection failed!") raise RuntimeError() + self.affected_nodes_status.affected_nodes.append(affected_node) diff --git a/krkn/scenario_plugins/node_actions/general_cloud_node_scenarios.py b/krkn/scenario_plugins/node_actions/general_cloud_node_scenarios.py index c0a7ac8b9..151de42d9 100644 --- a/krkn/scenario_plugins/node_actions/general_cloud_node_scenarios.py +++ b/krkn/scenario_plugins/node_actions/general_cloud_node_scenarios.py @@ -3,7 +3,7 @@ abstract_node_scenarios, ) from krkn_lib.k8s import KrknKubernetes - +from krkn_lib.models.k8s import AffectedNodeStatus class GENERAL: def __init__(self): 
@@ -12,8 +12,8 @@ def __init__(self): # krkn_lib class general_node_scenarios(abstract_node_scenarios): - def __init__(self, kubecli: KrknKubernetes): - super().__init__(kubecli) + def __init__(self, kubecli: KrknKubernetes, affected_nodes_status: AffectedNodeStatus): + super().__init__(kubecli, affected_nodes_status) self.general = GENERAL() # Node scenario to start the node diff --git a/krkn/scenario_plugins/node_actions/node_actions_scenario_plugin.py b/krkn/scenario_plugins/node_actions/node_actions_scenario_plugin.py index f5b917498..739b48d7a 100644 --- a/krkn/scenario_plugins/node_actions/node_actions_scenario_plugin.py +++ b/krkn/scenario_plugins/node_actions/node_actions_scenario_plugin.py @@ -6,6 +6,7 @@ import yaml from krkn_lib.k8s import KrknKubernetes from krkn_lib.models.telemetry import ScenarioTelemetry +from krkn_lib.models.k8s import AffectedNodeStatus from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift from krkn_lib.utils import get_yaml_item_value, log_exception @@ -49,6 +50,7 @@ def run( node_scenario, node_scenario_object, lib_telemetry.get_lib_kubernetes(), + scenario_telemetry, ) end_time = int(time.time()) cerberus.get_status(krkn_config, start_time, end_time) @@ -59,28 +61,29 @@ def run( return 0 def get_node_scenario_object(self, node_scenario, kubecli: KrknKubernetes): + affected_nodes_status = AffectedNodeStatus() if ( "cloud_type" not in node_scenario.keys() or node_scenario["cloud_type"] == "generic" ): global node_general node_general = True - return general_node_scenarios(kubecli) + return general_node_scenarios(kubecli, affected_nodes_status) if node_scenario["cloud_type"].lower() == "aws": - return aws_node_scenarios(kubecli) + return aws_node_scenarios(kubecli, affected_nodes_status) elif node_scenario["cloud_type"].lower() == "gcp": - return gcp_node_scenarios(kubecli) + return gcp_node_scenarios(kubecli, affected_nodes_status) elif node_scenario["cloud_type"].lower() == "openstack": from krkn.scenario_plugins.node_actions.openstack_node_scenarios import ( openstack_node_scenarios, ) - return openstack_node_scenarios(kubecli) + return openstack_node_scenarios(kubecli, affected_nodes_status) elif ( node_scenario["cloud_type"].lower() == "azure" or node_scenario["cloud_type"] == "az" ): - return azure_node_scenarios(kubecli) + return azure_node_scenarios(kubecli, affected_nodes_status) elif ( node_scenario["cloud_type"].lower() == "alibaba" or node_scenario["cloud_type"] == "alicloud" @@ -89,7 +92,7 @@ def get_node_scenario_object(self, node_scenario, kubecli: KrknKubernetes): alibaba_node_scenarios, ) - return alibaba_node_scenarios(kubecli) + return alibaba_node_scenarios(kubecli, affected_nodes_status) elif node_scenario["cloud_type"].lower() == "bm": from krkn.scenario_plugins.node_actions.bm_node_scenarios import ( bm_node_scenarios, @@ -100,9 +103,10 @@ def get_node_scenario_object(self, node_scenario, kubecli: KrknKubernetes): node_scenario.get("bmc_user", None), node_scenario.get("bmc_password", None), kubecli, + affected_nodes_status ) elif node_scenario["cloud_type"].lower() == "docker": - return docker_node_scenarios(kubecli) + return docker_node_scenarios(kubecli, affected_nodes_status) else: logging.error( "Cloud type " @@ -120,7 +124,7 @@ def get_node_scenario_object(self, node_scenario, kubecli: KrknKubernetes): ) def inject_node_scenario( - self, action, node_scenario, node_scenario_object, kubecli: KrknKubernetes + self, action, node_scenario, node_scenario_object, kubecli: KrknKubernetes, scenario_telemetry: ScenarioTelemetry ): # 
Get the node scenario configurations for setting nodes @@ -138,17 +142,18 @@ def inject_node_scenario( nodes = common_node_functions.get_node( label_selector, instance_kill_count, kubecli ) - + # GCP api doesn't support multiprocessing calls, will only actually run 1 - if parallel_nodes and node_scenario['cloud_type'].lower() != "gcp": + if parallel_nodes: self.multiprocess_nodes(nodes, node_scenario_object, action, node_scenario) else: for single_node in nodes: self.run_node(single_node, node_scenario_object, action, node_scenario) + affected_nodes_status = node_scenario_object.affected_nodes_status + scenario_telemetry.affected_nodes.extend(affected_nodes_status.affected_nodes) def multiprocess_nodes(self, nodes, node_scenario_object, action, node_scenario): try: - logging.info("parallely call to nodes") # pool object with number of element pool = ThreadPool(processes=len(nodes)) @@ -160,7 +165,6 @@ def multiprocess_nodes(self, nodes, node_scenario_object, action, node_scenario) def run_node(self, single_node, node_scenario_object, action, node_scenario): - logging.info("action" + str(action)) # Get the scenario specifics for running action nodes run_kill_count = get_yaml_item_value(node_scenario, "runs", 1) if action in ("node_stop_start_scenario", "node_disk_detach_attach_scenario"): @@ -248,5 +252,6 @@ def run_node(self, single_node, node_scenario_object, action, node_scenario): % action ) + def get_scenario_types(self) -> list[str]: return ["node_scenarios"] diff --git a/krkn/scenario_plugins/node_actions/openstack_node_scenarios.py b/krkn/scenario_plugins/node_actions/openstack_node_scenarios.py index f7ce8563e..8e5466ada 100644 --- a/krkn/scenario_plugins/node_actions/openstack_node_scenarios.py +++ b/krkn/scenario_plugins/node_actions/openstack_node_scenarios.py @@ -7,7 +7,7 @@ abstract_node_scenarios, ) from krkn_lib.k8s import KrknKubernetes - +from krkn_lib.models.k8s import AffectedNode, AffectedNodeStatus class OPENSTACKCLOUD: def __init__(self): @@ -56,12 +56,22 @@ def reboot_instances(self, node): raise RuntimeError() # Wait until the node instance is running - def wait_until_running(self, node, timeout): - return self.get_instance_status(node, "ACTIVE", timeout) + def wait_until_running(self, node, timeout, affected_node): + start_time = time.time() + instance_status= self.get_instance_status(node, "ACTIVE", timeout) + end_time = time.time() + if affected_node: + affected_node.set_affected_node_status("running", end_time - start_time) + return instance_status # Wait until the node instance is stopped - def wait_until_stopped(self, node, timeout): - return self.get_instance_status(node, "SHUTOFF", timeout) + def wait_until_stopped(self, node, timeout, affected_node): + start_time = time.time() + instance_status = self.get_instance_status(node, "SHUTOFF", timeout) + end_time = time.time() + if affected_node: + affected_node.set_affected_node_status("stopped", end_time - start_time) + return instance_status # Get instance status def get_instance_status(self, node, expected_status, timeout): @@ -107,19 +117,21 @@ def get_openstack_nodename(self, os_node_ip): # krkn_lib class openstack_node_scenarios(abstract_node_scenarios): - def __init__(self, kubecli: KrknKubernetes): + def __init__(self, kubecli: KrknKubernetes, affected_nodes_status: AffectedNodeStatus ): + super().__init__(kubecli, affected_nodes_status) self.openstackcloud = OPENSTACKCLOUD() - + # Node scenario to start the node def node_start_scenario(self, instance_kill_count, node, timeout): for _ in 
range(instance_kill_count): + affected_node = AffectedNode(node) try: logging.info("Starting node_start_scenario injection") logging.info("Starting the node %s" % (node)) openstack_node_name = self.openstackcloud.get_instance_id(node) self.openstackcloud.start_instances(openstack_node_name) - self.openstackcloud.wait_until_running(openstack_node_name, timeout) - nodeaction.wait_for_ready_status(node, timeout, self.kubecli) + self.openstackcloud.wait_until_running(openstack_node_name, timeout, affected_node) + nodeaction.wait_for_ready_status(node, timeout, self.kubecli, affected_node) logging.info("Node with instance ID: %s is in running state" % (node)) logging.info("node_start_scenario has been successfully injected!") except Exception as e: @@ -130,18 +142,20 @@ def node_start_scenario(self, instance_kill_count, node, timeout): logging.error("node_start_scenario injection failed!") raise RuntimeError() + self.affected_nodes_status.affected_nodes.append(affected_node) # Node scenario to stop the node def node_stop_scenario(self, instance_kill_count, node, timeout): for _ in range(instance_kill_count): + affected_node = AffectedNode(node) try: logging.info("Starting node_stop_scenario injection") logging.info("Stopping the node %s " % (node)) openstack_node_name = self.openstackcloud.get_instance_id(node) self.openstackcloud.stop_instances(openstack_node_name) - self.openstackcloud.wait_until_stopped(openstack_node_name, timeout) + self.openstackcloud.wait_until_stopped(openstack_node_name, timeout, affected_node) logging.info("Node with instance name: %s is in stopped state" % (node)) - nodeaction.wait_for_ready_status(node, timeout, self.kubecli) + nodeaction.wait_for_not_ready_status(node, timeout, self.kubecli, affected_node) except Exception as e: logging.error( "Failed to stop node instance. Encountered following exception: %s. 
" @@ -150,17 +164,19 @@ def node_stop_scenario(self, instance_kill_count, node, timeout): logging.error("node_stop_scenario injection failed!") raise RuntimeError() + self.affected_nodes_status.affected_nodes.append(affected_node) # Node scenario to reboot the node def node_reboot_scenario(self, instance_kill_count, node, timeout): for _ in range(instance_kill_count): + affected_node = AffectedNode(node) try: logging.info("Starting node_reboot_scenario injection") logging.info("Rebooting the node %s" % (node)) openstack_node_name = self.openstackcloud.get_instance_id(node) self.openstackcloud.reboot_instances(openstack_node_name) - nodeaction.wait_for_unknown_status(node, timeout, self.kubecli) - nodeaction.wait_for_ready_status(node, timeout, self.kubecli) + nodeaction.wait_for_unknown_status(node, timeout, self.kubecli, affected_node) + nodeaction.wait_for_ready_status(node, timeout, self.kubecli, affected_node) logging.info("Node with instance name: %s has been rebooted" % (node)) logging.info("node_reboot_scenario has been successfuly injected!") except Exception as e: @@ -171,10 +187,12 @@ def node_reboot_scenario(self, instance_kill_count, node, timeout): logging.error("node_reboot_scenario injection failed!") raise RuntimeError() + self.affected_nodes_status.affected_nodes.append(affected_node) # Node scenario to start the node def helper_node_start_scenario(self, instance_kill_count, node_ip, timeout): for _ in range(instance_kill_count): + affected_node = AffectedNode(node_ip) try: logging.info("Starting helper_node_start_scenario injection") openstack_node_name = self.openstackcloud.get_openstack_nodename( @@ -182,7 +200,7 @@ def helper_node_start_scenario(self, instance_kill_count, node_ip, timeout): ) logging.info("Starting the helper node %s" % (openstack_node_name)) self.openstackcloud.start_instances(openstack_node_name) - self.openstackcloud.wait_until_running(openstack_node_name, timeout) + self.openstackcloud.wait_until_running(openstack_node_name, timeout, affected_node) logging.info("Helper node with IP: %s is in running state" % (node_ip)) logging.info("node_start_scenario has been successfully injected!") except Exception as e: @@ -193,10 +211,12 @@ def helper_node_start_scenario(self, instance_kill_count, node_ip, timeout): logging.error("helper_node_start_scenario injection failed!") raise RuntimeError() + self.affected_nodes_status.affected_nodes.append(affected_node) # Node scenario to stop the node def helper_node_stop_scenario(self, instance_kill_count, node_ip, timeout): for _ in range(instance_kill_count): + affected_node = AffectedNode(node_ip) try: logging.info("Starting helper_node_stop_scenario injection") openstack_node_name = self.openstackcloud.get_openstack_nodename( @@ -204,7 +224,7 @@ def helper_node_stop_scenario(self, instance_kill_count, node_ip, timeout): ) logging.info("Stopping the helper node %s " % (openstack_node_name)) self.openstackcloud.stop_instances(openstack_node_name) - self.openstackcloud.wait_until_stopped(openstack_node_name, timeout) + self.openstackcloud.wait_until_stopped(openstack_node_name, timeout, affected_node) logging.info("Helper node with IP: %s is in stopped state" % (node_ip)) except Exception as e: logging.error( @@ -214,6 +234,7 @@ def helper_node_stop_scenario(self, instance_kill_count, node_ip, timeout): logging.error("helper_node_stop_scenario injection failed!") raise RuntimeError() + self.affected_nodes_status.affected_nodes.append(affected_node) def helper_node_service_status(self, node_ip, service, 
ssh_private_key, timeout): try: diff --git a/krkn/scenario_plugins/shut_down/shut_down_scenario_plugin.py b/krkn/scenario_plugins/shut_down/shut_down_scenario_plugin.py index b81906ff5..8c7ec751b 100644 --- a/krkn/scenario_plugins/shut_down/shut_down_scenario_plugin.py +++ b/krkn/scenario_plugins/shut_down/shut_down_scenario_plugin.py @@ -15,6 +15,7 @@ from krkn.scenario_plugins.node_actions.openstack_node_scenarios import OPENSTACKCLOUD from krkn.scenario_plugins.native.node_scenarios.ibmcloud_plugin import IbmCloud +from krkn_lib.models.k8s import AffectedNodeStatus, AffectedNode class ShutDownScenarioPlugin(AbstractScenarioPlugin): def run( @@ -32,9 +33,12 @@ def run( "cluster_shut_down_scenario" ] start_time = int(time.time()) + affected_nodes_status = AffectedNodeStatus() self.cluster_shut_down( - shut_down_config_scenario, lib_telemetry.get_lib_kubernetes() + shut_down_config_scenario, lib_telemetry.get_lib_kubernetes(), affected_nodes_status ) + + scenario_telemetry.affected_nodes = affected_nodes_status.affected_nodes end_time = int(time.time()) cerberus.publish_kraken_status(krkn_config, [], start_time, end_time) return 0 @@ -72,7 +76,7 @@ def multiprocess_nodes(self, cloud_object_function, nodes, processes=0): # Inject the cluster shut down scenario # krkn_lib - def cluster_shut_down(self, shut_down_config, kubecli: KrknKubernetes): + def cluster_shut_down(self, shut_down_config, kubecli: KrknKubernetes, affected_nodes_status: AffectedNodeStatus): runs = shut_down_config["runs"] shut_down_duration = shut_down_config["shut_down_duration"] cloud_type = shut_down_config["cloud_type"] @@ -101,6 +105,7 @@ def cluster_shut_down(self, shut_down_config, kubecli: KrknKubernetes): node_id = [] for node in nodes: instance_id = cloud_object.get_instance_id(node) + affected_nodes_status.affected_nodes.append(AffectedNode(node)) node_id.append(instance_id) logging.info("node id list " + str(node_id)) for _ in range(runs): @@ -108,14 +113,18 @@ def cluster_shut_down(self, shut_down_config, kubecli: KrknKubernetes): stopping_nodes = set(node_id) self.multiprocess_nodes(cloud_object.stop_instances, node_id, processes) stopped_nodes = stopping_nodes.copy() + start_time = time.time() while len(stopping_nodes) > 0: for node in stopping_nodes: + affected_node = affected_nodes_status.get_affected_node_index(node) + # need to add in time that is passing while waiting for other nodes to be stopped + affected_node.set_cloud_stopping_time(time.time() - start_time) if type(node) is tuple: node_status = cloud_object.wait_until_stopped( - node[1], node[0], timeout + node[1], node[0], timeout, affected_node ) else: - node_status = cloud_object.wait_until_stopped(node, timeout) + node_status = cloud_object.wait_until_stopped(node, timeout, affected_node) # Only want to remove node from stopping list # when fully stopped/no error @@ -132,16 +141,20 @@ def cluster_shut_down(self, shut_down_config, kubecli: KrknKubernetes): logging.info("Restarting the nodes") restarted_nodes = set(node_id) self.multiprocess_nodes(cloud_object.start_instances, node_id, processes) + start_time = time.time() logging.info("Wait for each node to be running again") not_running_nodes = restarted_nodes.copy() while len(not_running_nodes) > 0: for node in not_running_nodes: + affected_node = affected_nodes_status.get_affected_node_index(node) + # need to add in time that is passing while waiting for other nodes to be running + affected_node.set_cloud_running_time(time.time() - start_time) if type(node) is tuple: node_status = 
                         cloud_object.wait_until_running(
-                            node[1], node[0], timeout
+                            node[1], node[0], timeout, affected_node
                         )
                     else:
-                        node_status = cloud_object.wait_until_running(node, timeout)
+                        node_status = cloud_object.wait_until_running(node, timeout, affected_node)
                         if node_status:
                             restarted_nodes.remove(node)
             not_running_nodes = restarted_nodes.copy()
diff --git a/requirements.txt b/requirements.txt
index 54688893e..388f17586 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,7 +1,6 @@
 aliyun-python-sdk-core==2.13.36
 aliyun-python-sdk-ecs==4.24.25
 arcaflow-plugin-sdk==0.14.0
-arcaflow==0.19.1
 boto3==1.28.61
 azure-identity==1.16.1
 azure-keyvault==4.2.0
@@ -16,7 +15,7 @@ google-cloud-compute==1.22.0
 ibm_cloud_sdk_core==3.18.0
 ibm_vpc==0.20.0
 jinja2==3.1.5
-krkn-lib==4.0.4
+krkn-lib==4.0.7
 lxml==5.1.0
 kubernetes==28.1.0
 numpy==1.26.4
diff --git a/run_kraken.py b/run_kraken.py
index 03677cdee..3e143796a 100644
--- a/run_kraken.py
+++ b/run_kraken.py
@@ -31,6 +31,10 @@
     ScenarioPluginNotFound,
 )
 
+# removes TripleDES warning
+import warnings
+warnings.filterwarnings(action='ignore', module='.*paramiko.*')
+
 report_file = ""
 
 
diff --git a/scenarios/kube/cpu-hog.yml b/scenarios/kube/cpu-hog.yml
new file mode 100644
index 000000000..0601efa7d
--- /dev/null
+++ b/scenarios/kube/cpu-hog.yml
@@ -0,0 +1,9 @@
+duration: 60
+workers: '' # leave it empty '' for node cpu auto-detection
+hog-type: cpu
+image: quay.io/krkn-chaos/krkn-hog
+namespace: default
+cpu-load-percentage: 90
+cpu-method: all
+node-selector: "node-role.kubernetes.io/worker="
+number-of-nodes: 2
diff --git a/scenarios/kube/cpu-hog/config.yaml b/scenarios/kube/cpu-hog/config.yaml
deleted file mode 100644
index 263cf5032..000000000
--- a/scenarios/kube/cpu-hog/config.yaml
+++ /dev/null
@@ -1,12 +0,0 @@
----
-deployers:
-  image:
-    connection: {}
-    deployer_name: kubernetes
-log:
-  level: error
-logged_outputs:
-  error:
-    level: error
-  success:
-    level: debug
diff --git a/scenarios/kube/cpu-hog/input.yaml b/scenarios/kube/cpu-hog/input.yaml
deleted file mode 100644
index 28152c428..000000000
--- a/scenarios/kube/cpu-hog/input.yaml
+++ /dev/null
@@ -1,13 +0,0 @@
-input_list:
-  - cpu_count: 1
-    cpu_load_percentage: 80
-    cpu_method: all
-    duration: 30
-    kubeconfig: ''
-    namespace: default
-    # set the node selector as a key-value pair eg.
-    # node_selector:
-    #   kubernetes.io/hostname: kind-worker2
-    node_selector: {}
-
-
diff --git a/scenarios/kube/cpu-hog/sub-workflow.yaml b/scenarios/kube/cpu-hog/sub-workflow.yaml
deleted file mode 100644
index 73c9fd1dd..000000000
--- a/scenarios/kube/cpu-hog/sub-workflow.yaml
+++ /dev/null
@@ -1,98 +0,0 @@
-version: v0.2.0
-input:
-  root: SubRootObject
-  objects:
-    SubRootObject:
-      id: SubRootObject
-      properties:
-        kubeconfig:
-          display:
-            description: The complete kubeconfig file as a string
-            name: Kubeconfig file contents
-          type:
-            type_id: string
-          required: true
-        namespace:
-          display:
-            description: The namespace where the container will be deployed
-            name: Namespace
-          type:
-            type_id: string
-          required: true
-        node_selector:
-          display:
-            description: kubernetes node name where the plugin must be deployed
-          type:
-            type_id: map
-            values:
-              type_id: string
-            keys:
-              type_id: string
-          required: true
-        duration:
-          display:
-            name: duration the scenario expressed in seconds
-            description: stop stress test after T seconds. One can also specify the units of time in
-              seconds, minutes, hours, days or years with the suffix s, m, h, d or y
-          type:
-            type_id: integer
-          required: true
-        cpu_count:
-          display:
-            description: Number of CPU cores to be used (0 means all)
-            name: number of CPUs
-          type:
-            type_id: integer
-          required: true
-        cpu_method:
-          display:
-            description: CPU stress method
-            name: fine grained control of which cpu stressors to use (ackermann, cfloat etc.)
-          type:
-            type_id: string
-          required: true
-        cpu_load_percentage:
-          display:
-            description: load CPU by percentage
-            name: CPU load
-          type:
-            type_id: integer
-          required: true
-
-steps:
-  kubeconfig:
-    plugin:
-      src: quay.io/arcalot/arcaflow-plugin-kubeconfig:0.2.0
-      deployment_type: image
-    input:
-      kubeconfig: !expr $.input.kubeconfig
-  stressng:
-    plugin:
-      src: quay.io/arcalot/arcaflow-plugin-stressng:0.6.0
-      deployment_type: image
-    step: workload
-    input:
-      cleanup: "true"
-
-      timeout: !expr $.input.duration
-      stressors:
-        - stressor: cpu
-          workers: !expr $.input.cpu_count
-          cpu-method: "all"
-          cpu-load: !expr $.input.cpu_load_percentage
-    deploy:
-      deployer_name: kubernetes
-      connection: !expr $.steps.kubeconfig.outputs.success.connection
-      pod:
-        metadata:
-          namespace: !expr $.input.namespace
-          labels:
-            arcaflow: stressng
-        spec:
-          nodeSelector: !expr $.input.node_selector
-          pluginContainer:
-            imagePullPolicy: Always
-outputs:
-  success:
-    stressng: !expr $.steps.stressng.outputs.success
-
diff --git a/scenarios/kube/cpu-hog/workflow.yaml b/scenarios/kube/cpu-hog/workflow.yaml
deleted file mode 100644
index ca33a0fc5..000000000
--- a/scenarios/kube/cpu-hog/workflow.yaml
+++ /dev/null
@@ -1,25 +0,0 @@
-version: v0.2.0
-input:
-  root: RootObject
-  objects:
-    RootObject:
-      id: RootObject
-      properties:
-        input_list:
-          type:
-            type_id: list
-            items:
-              id: SubRootObject
-              type_id: ref
-              namespace: $.steps.workload_loop.execute.inputs.items
-
-steps:
-  workload_loop:
-    kind: foreach
-    items: !expr $.input.input_list
-    workflow: sub-workflow.yaml
-    parallelism: 1000
-outputs:
-  success:
-    workloads: !expr $.steps.workload_loop.outputs.success.data
-
diff --git a/scenarios/kube/io-hog.yml b/scenarios/kube/io-hog.yml
new file mode 100644
index 000000000..bca41c2aa
--- /dev/null
+++ b/scenarios/kube/io-hog.yml
@@ -0,0 +1,14 @@
+duration: 30
+workers: '' # leave it empty '' for node cpu auto-detection
+hog-type: io
+image: quay.io/krkn-chaos/krkn-hog
+namespace: default
+io-block-size: 1m
+io-write-bytes: 1g
+io-target-pod-folder: /hog-data
+io-target-pod-volume:
+  name: node-volume
+  hostPath:
+    path: /root # a path writable by kubelet in the root filesystem of the node
+node-selector: "node-role.kubernetes.io/worker="
+number-of-nodes: ''
\ No newline at end of file
diff --git a/scenarios/kube/io-hog/config.yaml b/scenarios/kube/io-hog/config.yaml
deleted file mode 100644
index 431661a00..000000000
--- a/scenarios/kube/io-hog/config.yaml
+++ /dev/null
@@ -1,11 +0,0 @@
-deployers:
-  image:
-    connection: {}
-    deployer_name: kubernetes
-log:
-  level: error
-logged_outputs:
-  error:
-    level: error
-  success:
-    level: debug
diff --git a/scenarios/kube/io-hog/input.yaml b/scenarios/kube/io-hog/input.yaml
deleted file mode 100644
index bbb776cdc..000000000
--- a/scenarios/kube/io-hog/input.yaml
+++ /dev/null
@@ -1,16 +0,0 @@
-input_list:
-- duration: 30
-  io_block_size: 1m
-  io_workers: 1
-  io_write_bytes: 10m
-  kubeconfig: ''
-  namespace: default
-  # set the node selector as a key-value pair eg.
-  # node_selector:
-  #   kubernetes.io/hostname: kind-worker2
-  node_selector: {}
-  target_pod_folder: /hog-data
-  target_pod_volume:
-    hostPath:
-      path: /tmp
-    name: node-volume
diff --git a/scenarios/kube/io-hog/sub-workflow.yaml b/scenarios/kube/io-hog/sub-workflow.yaml
deleted file mode 100644
index c0f5dd0d9..000000000
--- a/scenarios/kube/io-hog/sub-workflow.yaml
+++ /dev/null
@@ -1,141 +0,0 @@
-version: v0.2.0
-input:
-  root: SubRootObject
-  objects:
-    hostPath:
-      id: HostPathVolumeSource
-      properties:
-        path:
-          type:
-            type_id: string
-    Volume:
-      id: Volume
-      properties:
-        name:
-          type:
-            type_id: string
-        hostPath:
-          type:
-            id: hostPath
-            type_id: ref
-    SubRootObject:
-      id: SubRootObject
-      properties:
-        kubeconfig:
-          display:
-            description: The complete kubeconfig file as a string
-            name: Kubeconfig file contents
-          type:
-            type_id: string
-          required: true
-        namespace:
-          display:
-            description: The namespace where the container will be deployed
-            name: Namespace
-          type:
-            type_id: string
-          required: true
-        node_selector:
-          display:
-            description: kubernetes node name where the plugin must be deployed
-          type:
-            type_id: map
-            values:
-              type_id: string
-            keys:
-              type_id: string
-          required: true
-        duration:
-          display:
-            name: duration the scenario expressed in seconds
-            description: stop stress test after T seconds. One can also specify the units of time in
-              seconds, minutes, hours, days or years with the suffix s, m, h, d or y
-          type:
-            type_id: integer
-          required: true
-        io_workers:
-          display:
-            description: number of workers
-            name: start N workers continually writing, reading and removing temporary files
-          type:
-            type_id: integer
-          required: true
-        io_block_size:
-          display:
-            description: single write size
-            name: specify size of each write in bytes. Size can be from 1 byte to 4MB.
-          type:
-            type_id: string
-          required: true
-        io_write_bytes:
-          display:
-            description: Total number of bytes written
-            name: write N bytes for each hdd process, the default is 1 GB. One can specify the size
-              as % of free space on the file system or in units of Bytes, KBytes, MBytes and
-              GBytes using the suffix b, k, m or g
-          type:
-            type_id: string
-          required: true
-        target_pod_folder:
-          display:
-            description: Target Folder
-            name: Folder in the pod where the test will be executed and the test files will be written
-          type:
-            type_id: string
-          required: true
-        target_pod_volume:
-          display:
-            name: kubernetes volume definition
-            description: the volume that will be attached to the pod. In order to stress
-              the node storage only hosPath mode is currently supported
-          type:
-            type_id: ref
-            id: Volume
-          required: true
-
-steps:
-  kubeconfig:
-    plugin:
-      src: quay.io/arcalot/arcaflow-plugin-kubeconfig:0.2.0
-      deployment_type: image
-    input:
-      kubeconfig: !expr $.input.kubeconfig
-  stressng:
-    plugin:
-      src: quay.io/arcalot/arcaflow-plugin-stressng:0.6.0
-      deployment_type: image
-    step: workload
-    input:
-      cleanup: "true"
-      timeout: !expr $.input.duration
-      workdir: !expr $.input.target_pod_folder
-      stressors:
-        - stressor: hdd
-          workers: !expr $.input.io_workers
-          hdd-bytes: !expr $.input.io_write_bytes
-          hdd-write-size: !expr $.input.io_block_size
-
-    deploy:
-      deployer_name: kubernetes
-      connection: !expr $.steps.kubeconfig.outputs.success.connection
-      pod:
-        metadata:
-          namespace: !expr $.input.namespace
-          labels:
-            arcaflow: stressng
-        spec:
-          nodeSelector: !expr $.input.node_selector
-          pluginContainer:
-            imagePullPolicy: Always
-            securityContext:
-              privileged: true
-            volumeMounts:
-              - mountPath: /hog-data
-                name: node-volume
-          volumes:
-            - !expr $.input.target_pod_volume
-
-outputs:
-  success:
-    stressng: !expr $.steps.stressng.outputs.success
-
diff --git a/scenarios/kube/io-hog/workflow.yaml b/scenarios/kube/io-hog/workflow.yaml
deleted file mode 100644
index 13df6485d..000000000
--- a/scenarios/kube/io-hog/workflow.yaml
+++ /dev/null
@@ -1,26 +0,0 @@
-version: v0.2.0
-input:
-  root: RootObject
-  objects:
-    RootObject:
-      id: RootObject
-      properties:
-        input_list:
-          type:
-            type_id: list
-            items:
-              id: SubRootObject
-              type_id: ref
-              namespace: $.steps.workload_loop.execute.inputs.items
-steps:
-  workload_loop:
-    kind: foreach
-    items: !expr $.input.input_list
-    workflow: sub-workflow.yaml
-    parallelism: 1000
-outputs:
-  success:
-    workloads: !expr $.steps.workload_loop.outputs.success.data
-
-
-
diff --git a/scenarios/kube/memory-hog.yml b/scenarios/kube/memory-hog.yml
new file mode 100644
index 000000000..c426fcb8b
--- /dev/null
+++ b/scenarios/kube/memory-hog.yml
@@ -0,0 +1,8 @@
+duration: 60
+workers: '' # leave it empty '' for node cpu auto-detection
+hog-type: memory
+image: quay.io/krkn-chaos/krkn-hog
+namespace: default
+memory-vm-bytes: 90%
+node-selector: "node-role.kubernetes.io/worker="
+number-of-nodes: ''
diff --git a/scenarios/kube/memory-hog/config.yaml b/scenarios/kube/memory-hog/config.yaml
deleted file mode 100644
index 263cf5032..000000000
--- a/scenarios/kube/memory-hog/config.yaml
+++ /dev/null
@@ -1,12 +0,0 @@
----
-deployers:
-  image:
-    connection: {}
-    deployer_name: kubernetes
-log:
-  level: error
-logged_outputs:
-  error:
-    level: error
-  success:
-    level: debug
diff --git a/scenarios/kube/memory-hog/input.yaml b/scenarios/kube/memory-hog/input.yaml
deleted file mode 100644
index 444b47676..000000000
--- a/scenarios/kube/memory-hog/input.yaml
+++ /dev/null
@@ -1,13 +0,0 @@
-input_list:
-- duration: 30
-  vm_bytes: 10%
-  vm_workers: 2
-  # set the node selector as a key-value pair eg.
-  # node_selector:
-  #   kubernetes.io/hostname: kind-worker2
-  node_selector: { }
-  kubeconfig: ""
-  namespace: default
-
-# duplicate this section to run simultaneous stressors in the same run
-
diff --git a/scenarios/kube/memory-hog/sub-workflow.yaml b/scenarios/kube/memory-hog/sub-workflow.yaml
deleted file mode 100644
index 06a0f5233..000000000
--- a/scenarios/kube/memory-hog/sub-workflow.yaml
+++ /dev/null
@@ -1,89 +0,0 @@
-version: v0.2.0
-input:
-  root: SubRootObject
-  objects:
-    SubRootObject:
-      id: SubRootObject
-      properties:
-        kubeconfig:
-          display:
-            description: The complete kubeconfig file as a string
-            name: Kubeconfig file contents
-          type:
-            type_id: string
-          required: true
-        namespace:
-          display:
-            description: The namespace where the container will be deployed
-            name: Namespace
-          type:
-            type_id: string
-          required: true
-        node_selector:
-          display:
-            description: kubernetes node name where the plugin must be deployed
-          type:
-            type_id: map
-            values:
-              type_id: string
-            keys:
-              type_id: string
-          required: true
-        duration:
-          display:
-            name: duration the scenario expressed in seconds
-            description: stop stress test after T seconds. One can also specify the units of time in seconds, minutes, hours, days or years with the suffix s, m, h, d or y
-          type:
-            type_id: integer
-          required: true
-        vm_workers:
-          display:
-            description: Number of VM stressors to be run (0 means 1 stressor per CPU)
-            name: Number of VM stressors
-          type:
-            type_id: integer
-          required: true
-        vm_bytes:
-          display:
-            description: N bytes per vm process, the default is 256MB. The size can be expressed in units of Bytes, KBytes, MBytes and GBytes using the suffix b, k, m or g.
-            name: Kubeconfig file contents
-          type:
-            type_id: string
-          required: true
-
-steps:
-  kubeconfig:
-    plugin:
-      src: quay.io/arcalot/arcaflow-plugin-kubeconfig:0.2.0
-      deployment_type: image
-    input:
-      kubeconfig: !expr $.input.kubeconfig
-  stressng:
-    plugin:
-      src: quay.io/arcalot/arcaflow-plugin-stressng:0.6.0
-      deployment_type: image
-    step: workload
-    input:
-      cleanup: "true"
-      timeout: !expr $.input.duration
-      stressors:
-        - stressor: vm
-          workers: !expr $.input.vm_workers
-          vm-bytes: !expr $.input.vm_bytes
-    deploy:
-      deployer_name: kubernetes
-      connection: !expr $.steps.kubeconfig.outputs.success.connection
-      pod:
-        metadata:
-          namespace: !expr $.input.namespace
-          labels:
-            arcaflow: stressng
-        spec:
-          nodeSelector: !expr $.input.node_selector
-          pluginContainer:
-            imagePullPolicy: Always
-
-outputs:
-  success:
-    stressng: !expr $.steps.stressng.outputs.success
-
diff --git a/scenarios/kube/memory-hog/workflow.yaml b/scenarios/kube/memory-hog/workflow.yaml
deleted file mode 100644
index fc51a5e4d..000000000
--- a/scenarios/kube/memory-hog/workflow.yaml
+++ /dev/null
@@ -1,29 +0,0 @@
-version: v0.2.0
-input:
-  root: RootObject
-  objects:
-    RootObject:
-      id: RootObject
-      properties:
-        input_list:
-          type:
-            type_id: list
-            items:
-              id: SubRootObject
-              type_id: ref
-              namespace: $.steps.workload_loop.execute.inputs.items
-
-steps:
-  workload_loop:
-    kind: foreach
-    items: !expr $.input.input_list
-    workflow: sub-workflow.yaml
-    parallelism: 1000
-outputs:
-  success:
-    workloads: !expr $.steps.workload_loop.outputs.success.data
-
-
-
-
-