From 500b3c8502ad1993f3a6e390bb33199c72bf6be3 Mon Sep 17 00:00:00 2001
From: Toni Finger
Date: Wed, 20 Nov 2024 08:31:20 +0100
Subject: [PATCH] Fixup: Update from flake8

Signed-off-by: Toni Finger
---
 .../plugin_cluster_stacks_remote_api.py       | 92 +++++++++++++------
 1 file changed, 62 insertions(+), 30 deletions(-)

diff --git a/Tests/kaas/plugin/plugin_cluster_stacks_remote_api.py b/Tests/kaas/plugin/plugin_cluster_stacks_remote_api.py
index 6ac52f4e4..566772a4d 100644
--- a/Tests/kaas/plugin/plugin_cluster_stacks_remote_api.py
+++ b/Tests/kaas/plugin/plugin_cluster_stacks_remote_api.py
@@ -1,13 +1,13 @@
 import os
 import yaml
 import subprocess
-import base64
 import time
 import logging
 from interface import KubernetesClusterPlugin
 
 logger = logging.getLogger("PluginClusterStacks")
 
+
 # Helper functions
 def wait_for_pods(self, namespaces, timeout=240, interval=15, kubeconfig=None):
     """
@@ -27,13 +27,19 @@ def wait_for_pods(self, namespaces, timeout=240, interval=15, kubeconfig=None):
         for namespace in namespaces:
             try:
                 # Get pod status in the namespace
-                wait_pods_command = (
-                    f"kubectl wait -n {namespace} --for=condition=Ready --timeout={timeout}s pod --all"
+                wait_pods_command = f"kubectl wait --kubeconfig={kubeconfig} -n {namespace} --for=condition=Ready --timeout={timeout}s pod --all"
+                result = self._run_subprocess(
+                    wait_pods_command,
+                    f"Error fetching pods in {namespace}",
+                    shell=True,
+                    capture_output=True,
+                    text=True,
                 )
-                result = self._run_subprocess(wait_pods_command, f"Error fetching pods in {namespace}", shell=True, capture_output=True, text=True, kubeconfig=kubeconfig)
 
                 if result.returncode != 0:
-                    logger.warning(f"Not all pods in namespace {namespace} are ready. Details: {result.stderr}")
+                    logger.warning(
+                        f"Not all pods in namespace {namespace} are ready. Details: {result.stderr}"
+                    )
                     all_pods_ready = False
                 else:
                     logger.info(f"All pods in namespace {namespace} are ready.")
@@ -49,7 +55,9 @@ def wait_for_pods(self, namespaces, timeout=240, interval=15, kubeconfig=None):
         logger.info("Waiting for all pods in specified namespaces to become ready...")
         time.sleep(interval)
 
-    raise TimeoutError(f"Timed out after {timeout} seconds waiting for pods in namespaces {namespaces} to become ready.")
+    raise TimeoutError(
+        f"Timed out after {timeout} seconds waiting for pods in namespaces {namespaces} to become ready."
+    )
 
 
 
@@ -57,20 +65,20 @@ def load_config(config_path):
     """
     Loads the configuration from a YAML file.
     """
-    with open(config_path, 'r') as file:
+    with open(config_path, "r") as file:
         config = yaml.safe_load(file) or {}
     return config
 
+
 class PluginClusterStacksRemoteAPI(KubernetesClusterPlugin):
     def __init__(self, config_file=None):
         self.config = load_config(config_file) if config_file else {}
         logger.debug(self.config)
         self.working_directory = os.getcwd()
         logger.debug(f"Working from {self.working_directory}")
-        self.kubeconfig_mgmnt = self.config['kubeconfig']
-        self.workloadclusters = self.config['workloadcluster']
-        self.cs_namespace = self.config['namespace']
-
+        self.kubeconfig_mgmnt = self.config["kubeconfig"]
+        self.workloadclusters = self.config["workloadcluster"]
+        self.cs_namespace = self.config["namespace"]
 
     def create_cluster(self, cluster_name=None, version=None, kubeconfig_filepath=None):
         self.cluster_name = cluster_name
@@ -78,40 +86,56 @@ def create_cluster(self, cluster_name=None, version=None, kubeconfig_filepath=No
         self.kubeconfig_cs_cluster = kubeconfig_filepath
 
         # Create workload cluster
-        self._apply_yaml(self.workloadclusters, "Error applying cluster.yaml", kubeconfig=self.kubeconfig_mgmnt)
+        self._apply_yaml(
+            self.workloadclusters,
+            "Error applying cluster.yaml",
+            kubeconfig=self.kubeconfig_mgmnt,
+        )
 
-        #TODO:!!! We also need to introduce a waiting function here
+        # TODO:!!! We also need to introduce a waiting function here
         print("retrieve kubeconfig to path")
-        self._retrieve_kubeconfig(namespace=self.cs_namespace, kubeconfig=self.kubeconfig_mgmnt)
+        self._retrieve_kubeconfig(
+            namespace=self.cs_namespace, kubeconfig=self.kubeconfig_mgmnt
+        )
 
         # Wait for workload system pods to be ready
         # wait_for_workload_pods_ready(kubeconfig_path=self.kubeconfig_cs_cluster)
         # ~ wait_for_pods(self, ["kube-system"], timeout=600, interval=15, kubeconfig=self.kubeconfig_cs_cluster)
 
-
-    def delete_cluster(self, cluster_name=None): #TODO:!!! need to adjust delete method
+    def delete_cluster(
+        self, cluster_name=None
+    ):  # TODO:!!! need to adjust delete method
         self.cluster_name = cluster_name
 
-        #Get the name of the workloadcluster from the config file
+        # Get the name of the workloadcluster from the config file
         workload_cluster_config = load_config(self.workloadclusters)
-        workload_cluster_name = workload_cluster_config['metadata']['name']
+        workload_cluster_name = workload_cluster_config["metadata"]["name"]
 
         try:
             # Check if the cluster exists
             check_cluster_command = f"kubectl --kubeconfig={self.kubeconfig_mgmnt} get cluster {workload_cluster_name} -n {self.cs_namespace}"
-            result = self._run_subprocess(check_cluster_command, "Failed to get cluster resource", shell=True, capture_output=True, text=True)
+            result = self._run_subprocess(
+                check_cluster_command,
+                "Failed to get cluster resource",
+                shell=True,
+                capture_output=True,
+                text=True,
+            )
 
             # Proceed with deletion only if the cluster exists
             if result.returncode == 0:
                 delete_command = f"kubectl --kubeconfig={self.kubeconfig_mgmnt} delete cluster {workload_cluster_name} --timeout=600s -n {self.cs_namespace}"
-                self._run_subprocess(delete_command, "Timeout while deleting the cluster", shell=True)
+                self._run_subprocess(
+                    delete_command, "Timeout while deleting the cluster", shell=True
+                )
         except subprocess.CalledProcessError as error:
             if "NotFound" in error.stderr:
-                logger.info(f"Cluster {workload_cluster_name} not found. Skipping deletion.")
+                logger.info(
+                    f"Cluster {workload_cluster_name} not found. Skipping deletion."
+                )
             else:
                 raise RuntimeError(f"Error checking for cluster existence: {error}")
 
-
     def _apply_yaml(self, yaml_file, error_msg, kubeconfig=None):
         """
         Applies a Kubernetes YAML configuration file to the cluster, substituting environment variables as needed.
@@ -132,7 +156,6 @@ def _apply_yaml(self, yaml_file, error_msg, kubeconfig=None):
         except subprocess.CalledProcessError as error:
             raise RuntimeError(f"{error_msg}: {error}")
 
-
     def _retrieve_kubeconfig(self, namespace="default", kubeconfig=None):
         """
         Retrieves the kubeconfig for the specified cluster and saves it to a local file.
@@ -141,25 +164,28 @@ def _retrieve_kubeconfig(self, namespace="default", kubeconfig=None):
         :param kubeconfig: Optional path to the kubeconfig file for the target Kubernetes cluster.
         """
 
-        #Get the name of the workloadcluster from the config file
+        # Get the name of the workloadcluster from the config file
         workload_cluster_config = load_config(self.workloadclusters)
-        workload_cluster_name = workload_cluster_config['metadata']['name']
+        workload_cluster_name = workload_cluster_config["metadata"]["name"]
 
         command_args = [
             "kubectl ",
             f"--kubeconfig={self.kubeconfig_mgmnt}",
             f"-n {self.cs_namespace}",
-            f"get secret {workload_cluster_name}-kubeconfig", 
+            f"get secret {workload_cluster_name}-kubeconfig",
             "-o go-template='{{.data.value|base64decode}}'",
             f"> {self.kubeconfig_cs_cluster}",
         ]
         kubeconfig_command = ""
         for entry in command_args:
             kubeconfig_command += entry + " "
-        self._run_subprocess(kubeconfig_command, "Error retrieving kubeconfig", shell=True)
-
+        self._run_subprocess(
+            kubeconfig_command, "Error retrieving kubeconfig", shell=True
+        )
 
-    def _run_subprocess(self, command, error_msg, shell=False, capture_output=False, text=False):
+    def _run_subprocess(
+        self, command, error_msg, shell=False, capture_output=False, text=False
+    ):
         """
         Executes a subprocess command with the specified environment variables and parameters.
 
@@ -172,7 +198,13 @@ def _run_subprocess(self, command, error_msg, shell=False, capture_output=False,
         """
         try:
             # Run the subprocess
-            result = subprocess.run(command, shell=shell, capture_output=capture_output, text=text, check=True)
+            result = subprocess.run(
+                command,
+                shell=shell,
+                capture_output=capture_output,
+                text=text,
+                check=True,
+            )
             return result
         except subprocess.CalledProcessError as error:
             logger.error(f"{error_msg}: {error}")