From 037a9b711afff8f5e78b18f0becada44411bd1d6 Mon Sep 17 00:00:00 2001 From: Andrei Matveyeu Date: Thu, 28 Mar 2024 08:09:13 +0100 Subject: [PATCH] Remove provider registration and configuration (#95) * Remove provider registration and configuration * more unused code removed * more unused code removed --- requirements.txt | 1 - setup.cfg | 1 - .../environment.py | 69 +-- src/environment_provider/lib/registry.py | 68 +-- src/environment_provider_api/.gitignore | 41 -- src/environment_provider_api/__init__.py | 31 -- .../backend/__init__.py | 16 - .../backend/common.py | 49 -- .../backend/configure.py | 135 ----- .../backend/register.py | 88 ---- .../middleware/__init__.py | 18 - .../middleware/json_translator.py | 32 -- .../middleware/require_json.py | 38 -- src/environment_provider_api/webserver.py | 368 -------------- tests/backend/__init__.py | 16 - tests/backend/test_common.py | 83 ---- tests/backend/test_configure.py | 461 ------------------ tests/backend/test_environment.py | 282 ----------- tests/backend/test_register.py | 376 -------------- tests/library/fake_celery.py | 62 --- tests/library/fake_request.py | 63 --- tests/test_environment_provider.py | 93 ---- tests/test_webserver_configure.py | 279 ----------- tests/test_webserver_environment.py | 193 -------- tests/test_webserver_register.py | 113 ----- 25 files changed, 8 insertions(+), 2968 deletions(-) rename src/{environment_provider_api/backend => environment_provider}/environment.py (66%) delete mode 100644 src/environment_provider_api/.gitignore delete mode 100644 src/environment_provider_api/__init__.py delete mode 100644 src/environment_provider_api/backend/__init__.py delete mode 100644 src/environment_provider_api/backend/common.py delete mode 100644 src/environment_provider_api/backend/configure.py delete mode 100644 src/environment_provider_api/backend/register.py delete mode 100644 src/environment_provider_api/middleware/__init__.py delete mode 100644 src/environment_provider_api/middleware/json_translator.py delete mode 100644 src/environment_provider_api/middleware/require_json.py delete mode 100644 src/environment_provider_api/webserver.py delete mode 100644 tests/backend/__init__.py delete mode 100644 tests/backend/test_common.py delete mode 100644 tests/backend/test_configure.py delete mode 100644 tests/backend/test_environment.py delete mode 100644 tests/backend/test_register.py delete mode 100644 tests/library/fake_celery.py delete mode 100644 tests/library/fake_request.py delete mode 100644 tests/test_webserver_configure.py delete mode 100644 tests/test_webserver_environment.py delete mode 100644 tests/test_webserver_register.py diff --git a/requirements.txt b/requirements.txt index b9b5fd0..c7e6e3e 100644 --- a/requirements.txt +++ b/requirements.txt @@ -12,7 +12,6 @@ celery~=5.3 cryptography>=42.0.4,<43.0.0 gevent~=23.7 gunicorn~=19.9 -falcon~=3.1 jsontas~=1.3 packageurl-python~=0.11 etcd3gw~=2.3 diff --git a/setup.cfg b/setup.cfg index dd8946e..4bb365f 100644 --- a/setup.cfg +++ b/setup.cfg @@ -31,7 +31,6 @@ install_requires = cryptography>=42.0.4,<43.0.0 gevent~=23.7 gunicorn~=19.9 - falcon~=3.1 jsontas~=1.3 packageurl-python~=0.11 etcd3gw~=2.3 diff --git a/src/environment_provider_api/backend/environment.py b/src/environment_provider/environment.py similarity index 66% rename from src/environment_provider_api/backend/environment.py rename to src/environment_provider/environment.py index e94602c..c034d4f 100644 --- a/src/environment_provider_api/backend/environment.py +++ b/src/environment_provider/environment.py @@ -18,12 +18,9 @@ import traceback from typing import Optional, Union -from celery import Celery from etos_lib import ETOS -from falcon import Request from jsontas.jsontas import JsonTas -from environment_provider.environment_provider import get_environment from environment_provider.lib.database import ETCDPath from environment_provider.lib.registry import ProviderRegistry from execution_space_provider import ExecutionSpaceProvider @@ -34,33 +31,6 @@ from log_area_provider.log_area import LogArea -def get_environment_id(request: Request) -> Optional[str]: - """Get the environment ID from request. - - :param request: The falcon request object. - :return: The ID of the environment. - """ - return request.get_param("id") - - -def get_release_id(request: Request) -> Optional[str]: - """Get the task ID to release, from request. - - :param request: The falcon request object. - :return: The ID of the environment to release. - """ - return request.get_param("release") - - -def get_single_release_id(request: Request) -> Optional[str]: - """Get the environment ID to release, from request. - - :param request: The falcon request object. - :return: The ID of the environment to release. - """ - return request.get_param("single_release") - - def checkin_provider( item: dict, provider: Union[IutProvider, ExecutionSpaceProvider, LogAreaProvider] ) -> tuple[bool, Optional[Exception]]: @@ -134,10 +104,11 @@ def release_full_environment(etos: ETOS, jsontas: JsonTas, suite_id: str) -> tup registry = ProviderRegistry(etos, jsontas, suite_id) for suite, metadata in registry.testrun.join("suite").read_all(): suite = json.loads(suite) - try: - failure = release_environment(etos, jsontas, registry, suite) - except json.JSONDecodeError as exception: - failure = exception + for sub_suite in suite.get("sub_suites", []): + try: + failure = release_environment(etos, jsontas, registry, sub_suite) + except json.JSONDecodeError as exception: + failure = exception ETCDPath(metadata.get("key")).delete() registry.testrun.delete_all() @@ -147,33 +118,3 @@ def release_full_environment(etos: ETOS, jsontas: JsonTas, suite_id: str) -> tup traceback.format_exception(failure, value=failure, tb=failure.__traceback__) ) return True, "" - - -def check_environment_status(celery_worker: Celery, environment_id: str) -> dict: - """Check the status of the environment that is being requested. - - :param celery_worker: The worker holding the task results. - :param environment_id: The environment ID to check status on. - :return: A dictionary of status and and result. - """ - task_result = celery_worker.AsyncResult(environment_id) - result = task_result.result - status = task_result.status - if isinstance(result, Exception): - status = "FAILURE" - result = str(result) - elif result and result.get("error") is not None: - status = "FAILURE" - if result: - task_result.get() - return {"status": status, "result": result} - - -def request_environment(suite_id: str, suite_runner_ids: list[str]) -> str: - """Request an environment for a test suite ID. - - :param suite_id: Suite ID to request an environment for. - :param suite_runner_ids: Suite runner correlation IDs. - :return: The task ID for the request. - """ - return get_environment.delay(suite_id, suite_runner_ids).id diff --git a/src/environment_provider/lib/registry.py b/src/environment_provider/lib/registry.py index 1903741..14c7226 100644 --- a/src/environment_provider/lib/registry.py +++ b/src/environment_provider/lib/registry.py @@ -23,9 +23,9 @@ from etos_lib.etos import ETOS from jsontas.jsontas import JsonTas -from execution_space_provider import ExecutionSpaceProvider, execution_space_provider_schema -from iut_provider import IutProvider, iut_provider_schema -from log_area_provider import LogAreaProvider, log_area_provider_schema +from execution_space_provider import ExecutionSpaceProvider +from iut_provider import IutProvider +from log_area_provider import LogAreaProvider from .database import ETCDPath @@ -128,35 +128,6 @@ def get_execution_space_provider_by_id(self, provider_id: str) -> Optional[dict] return json.loads(provider, object_pairs_hook=OrderedDict) return None - def register_log_area_provider(self, ruleset: dict) -> None: - """Register a new log area provider. - - :param ruleset: Log area JSON definition to register. - """ - data = self.validate(ruleset, log_area_provider_schema(ruleset)) - self.logger.info("Registering %r", data["log"]["id"]) - self.providers.join(f"log-area/{data['log']['id']}").write(json.dumps(data)) - - def register_iut_provider(self, ruleset: dict) -> None: - """Register a new IUT provider. - - :param ruleset: IUT provider JSON definition to register. - """ - data = self.validate(ruleset, iut_provider_schema(ruleset)) - self.logger.info("Registering %r", data["iut"]["id"]) - self.providers.join(f"iut/{data['iut']['id']}").write(json.dumps(data)) - - def register_execution_space_provider(self, ruleset: dict) -> None: - """Register a new execution space provider. - - :param ruleset: Execution space provider JSON definition to register. - """ - data = self.validate(ruleset, execution_space_provider_schema(ruleset)) - self.logger.info("Registering %r", data["execution_space"]["id"]) - self.providers.join(f"execution-space/{data['execution_space']['id']}").write( - json.dumps(data) - ) - def execution_space_provider(self) -> Optional[ExecutionSpaceProvider]: """Get the execution space provider configured to suite ID. @@ -214,36 +185,3 @@ def dataset(self) -> Optional[dict]: if dataset: return json.loads(dataset) return None - - # pylint:disable=too-many-arguments - def configure_environment_provider_for_suite( - self, - iut_provider: dict, - log_area_provider: dict, - execution_space_provider: dict, - dataset: dict, - ) -> None: - """Configure environment provider for a suite ID with providers and dataset. - - :param iut_provider: IUT provider definition to configure for suite ID. - :param log_area_provider: Log area provider definition to configure for suite ID. - :param execution_space_provider: Execution space provider definition to configure - for suite ID. - :param dataset: Dataset to configure for suite ID. - """ - self.logger.info("Configuring environment provider.") - self.logger.info("Dataset: %r", dataset) - self.logger.info("IUT provider: %r", iut_provider.get("iut", {}).get("id")) - self.logger.info( - "Execution space provider: %r", - execution_space_provider.get("execution_space", {}).get("id"), - ) - self.logger.info("Log area provider: %r", log_area_provider.get("log", {}).get("id")) - self.logger.info("Expire: 3600") - - self.testrun.join("provider/dataset").write(json.dumps(dataset), expire=3600) - self.testrun.join("provider/iut").write(json.dumps(iut_provider), expire=3600) - self.testrun.join("provider/log-area").write(json.dumps(log_area_provider), expire=3600) - self.testrun.join("provider/execution-space").write( - json.dumps(execution_space_provider), expire=3600 - ) diff --git a/src/environment_provider_api/.gitignore b/src/environment_provider_api/.gitignore deleted file mode 100644 index f41c0ad..0000000 --- a/src/environment_provider_api/.gitignore +++ /dev/null @@ -1,41 +0,0 @@ -# Temporary and binary files -*~ -*.py[cod] -*.so -*.cfg -!setup.cfg -*.orig -*.log -*.pot -__pycache__/* -.cache/* -.*.swp - -# Project files -.ropeproject -.project -.pydevproject -.settings -.idea - -# Package files -*.egg -*.eggs/ -.installed.cfg -*.egg-info - -# Unittest and coverage -htmlcov/* -.coverage -.tox -junit.xml -coverage.xml - -# Build and docs folder/files -build/* -dist/* -sdist/* -docs/api/* -docs/_build/* -cover/* -MANIFEST diff --git a/src/environment_provider_api/__init__.py b/src/environment_provider_api/__init__.py deleted file mode 100644 index 810b34e..0000000 --- a/src/environment_provider_api/__init__.py +++ /dev/null @@ -1,31 +0,0 @@ -# Copyright Axis Communications AB. -# -# For a full list of individual contributors, please see the commit history. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""ETOS environment provider module.""" -import os -from importlib.metadata import PackageNotFoundError, version - -from etos_lib.logging.logger import setup_logging - -try: - VERSION = version("environment_provider") -except PackageNotFoundError: - VERSION = "Unknown" - -DEV = os.getenv("DEV", "false").lower() == "true" -ENVIRONMENT = "development" if DEV else "production" -# Disable extra logging, if the environment provider is imported instead of executed via celery -if os.getenv("ENVIRONMENT_PROVIDER_DISABLE_LOGGING", "false") == "false": - setup_logging("ETOS Environment Provider API", VERSION, ENVIRONMENT) diff --git a/src/environment_provider_api/backend/__init__.py b/src/environment_provider_api/backend/__init__.py deleted file mode 100644 index e18ab22..0000000 --- a/src/environment_provider_api/backend/__init__.py +++ /dev/null @@ -1,16 +0,0 @@ -# Copyright Axis Communications AB. -# -# For a full list of individual contributors, please see the commit history. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""Backend services for the environment provider webserver.""" diff --git a/src/environment_provider_api/backend/common.py b/src/environment_provider_api/backend/common.py deleted file mode 100644 index 3c47ea2..0000000 --- a/src/environment_provider_api/backend/common.py +++ /dev/null @@ -1,49 +0,0 @@ -# Copyright Axis Communications AB. -# -# For a full list of individual contributors, please see the commit history. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""Common functionality for all backend types.""" -from typing import Optional - -from falcon import Request -from falcon.errors import MediaNotFoundError - - -def get_suite_id(request: Request) -> Optional[str]: - """Get a suite ID from the request. - - :param request: The falcon request object. - :return: A Suite ID. - """ - try: - media = request.get_media() - return media.get("suite_id") - except MediaNotFoundError: - return request.get_param("suite_id") - - -def get_suite_runner_ids(request: Request) -> Optional[str]: - """Get suite runner IDs from the request. - - :param request: The falcon request object. - :return: Suite runner IDs. - """ - try: - media = request.get_media() - param = media.get("suite_runner_ids") - except MediaNotFoundError: - param = request.get_param("suite_runner_ids") - if param is None: - return param - return param.split(",") diff --git a/src/environment_provider_api/backend/configure.py b/src/environment_provider_api/backend/configure.py deleted file mode 100644 index e567c4b..0000000 --- a/src/environment_provider_api/backend/configure.py +++ /dev/null @@ -1,135 +0,0 @@ -# Copyright Axis Communications AB. -# -# For a full list of individual contributors, please see the commit history. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""Backend for the configuration requests.""" -import json -from typing import Optional, Union - -from falcon import Request - -from environment_provider.lib.registry import ProviderRegistry - - -def get_iut_provider_id(request: Request) -> Optional[str]: - """Get an IUT provider ID from the request. - - :param request: The falcon request object. - :return: An IUT provider ID. - """ - return request.get_media().get("iut_provider") - - -def get_execution_space_provider_id(request: Request) -> Optional[str]: - """Get an execution space provider ID from the request. - - :param request: The falcon request object. - :return: An execution space provider ID. - """ - return request.get_media().get("execution_space_provider") - - -def get_log_area_provider_id(request: Request) -> Optional[str]: - """Get an log area provider ID from the request. - - :param request: The falcon request object. - :return: A log area provider ID. - """ - return request.get_media().get("log_area_provider") - - -def get_dataset(request: Request) -> Union[None, dict, list]: - """Get a dataset from the request. - - :param request: The falcon request object. - :return: A dataset. - """ - dataset = request.get_media().get("dataset") - if dataset is not None: - if not isinstance(dataset, (dict, list)): - dataset = json.loads(dataset) - return dataset - - -# pylint:disable=too-many-arguments -def configure( - provider_registry: ProviderRegistry, - iut_provider_id: str, - execution_space_provider_id: str, - log_area_provider_id: str, - dataset: dict, -) -> tuple[bool, str]: - """Configure the environment provider. - - :param provider_registry: The provider registry to store configuration in. - :param iut_provider_id: The ID of the IUT provider to configure with. - :param execution_space_provider_id: The ID of the execution space provider to configure with. - :param log_area_provider_id: The ID of the log area provider to configure with. - :param dataset: The dataset to configure with. - :return: Whether or not the configuration was successful. - """ - if not all( - [ - provider_registry, - iut_provider_id, - execution_space_provider_id, - log_area_provider_id, - isinstance(dataset, (dict, list)), - ] - ): - return False, "Missing parameters to configure request" - iut_provider = provider_registry.get_iut_provider_by_id(iut_provider_id) - execution_space_provider = provider_registry.get_execution_space_provider_by_id( - execution_space_provider_id - ) - log_area_provider = provider_registry.get_log_area_provider_by_id(log_area_provider_id) - if not all( - [ - iut_provider, - execution_space_provider, - log_area_provider, - ] - ): - return ( - False, - f"Could not find providers {iut_provider_id!r}, {execution_space_provider_id!r} " - f" or {log_area_provider_id!r} registered in database", - ) - provider_registry.configure_environment_provider_for_suite( - iut_provider, - log_area_provider, - execution_space_provider, - dataset, - ) - return True, "" - - -def get_configuration(provider_registry: ProviderRegistry) -> dict: - """Get a stored configuration by suite ID. - - :param provider_registry: The provider registry to get configuration from. - :return: The configuration stored for suite ID. - """ - iut_provider = provider_registry.iut_provider() - execution_space_provider = provider_registry.execution_space_provider() - log_area_provider = provider_registry.log_area_provider() - dataset = provider_registry.dataset() - return { - "iut_provider": iut_provider.ruleset if iut_provider else None, - "execution_space_provider": ( - execution_space_provider.ruleset if execution_space_provider else None - ), - "log_area_provider": log_area_provider.ruleset if log_area_provider else None, - "dataset": dataset, - } diff --git a/src/environment_provider_api/backend/register.py b/src/environment_provider_api/backend/register.py deleted file mode 100644 index 7de1ed0..0000000 --- a/src/environment_provider_api/backend/register.py +++ /dev/null @@ -1,88 +0,0 @@ -# Copyright Axis Communications AB. -# -# For a full list of individual contributors, please see the commit history. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""Backend services for the register endpoint.""" -import json -from typing import Optional - -from falcon import Request - -from environment_provider.lib.registry import ProviderRegistry - - -def get_iut_provider(request: Request) -> Optional[dict]: - """Get IUT provider JSON from request. - - :param request: The falcon request object. - :return: An IUT provider from request. - """ - return json_to_dict(request.get_media().get("iut_provider")) - - -def get_log_area_provider(request: Request) -> Optional[dict]: - """Get log area provider JSON from request. - - :param request: The falcon request object. - :return: A log area provider from request. - """ - return json_to_dict(request.get_media().get("log_area_provider")) - - -def get_execution_space_provider(request: Request) -> Optional[dict]: - """Get execution space provider JSON from request. - - :param request: The falcon request object. - :return: An execution space provider from request. - """ - return json_to_dict(request.get_media().get("execution_space_provider")) - - -def register( - provider_registry: ProviderRegistry, - iut_provider: Optional[dict] = None, - log_area_provider: Optional[dict] = None, - execution_space_provider: Optional[dict] = None, -) -> bool: - """Register one or many providers. - - :param provider_registry: The provider registry to store providers in. - :param iut_provider: An IUT provider to register. - :param log_area_provider: A log area provider to register. - :param execution_space_provider: An execution space provider to register. - :return: The result of the registering. - """ - if not any([iut_provider, log_area_provider, execution_space_provider]): - # At least one provider must be supplied. - return False - if iut_provider is not None: - provider_registry.register_iut_provider(iut_provider) - if log_area_provider is not None: - provider_registry.register_log_area_provider(log_area_provider) - if execution_space_provider is not None: - provider_registry.register_execution_space_provider(execution_space_provider) - return True - - -def json_to_dict(json_str: str) -> Optional[dict]: - """Convert a JSON string to a dictionary if not already a dictionary. - - :param json_str: JSON string to convert. - :return: JSON string as a dictionary. - """ - if json_str: - if isinstance(json_str, dict): - return json_str - return json.loads(json_str) - return json_str diff --git a/src/environment_provider_api/middleware/__init__.py b/src/environment_provider_api/middleware/__init__.py deleted file mode 100644 index 9bf6710..0000000 --- a/src/environment_provider_api/middleware/__init__.py +++ /dev/null @@ -1,18 +0,0 @@ -# Copyright Axis Communications AB. -# -# For a full list of individual contributors, please see the commit history. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""ETOS environment provider API middleware module.""" -from .json_translator import JSONTranslator -from .require_json import RequireJSON diff --git a/src/environment_provider_api/middleware/json_translator.py b/src/environment_provider_api/middleware/json_translator.py deleted file mode 100644 index e0bd841..0000000 --- a/src/environment_provider_api/middleware/json_translator.py +++ /dev/null @@ -1,32 +0,0 @@ -# Copyright Axis Communications AB. -# -# For a full list of individual contributors, please see the commit history. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""JSON translator module.""" -import falcon - -# pylint: disable=too-few-public-methods - - -class JSONTranslator: - """Translate request media to JSON.""" - - def process_request(self, req: falcon.Request, _) -> None: - """Process request.""" - if req.content_length in (None, 0): - return - - body = req.media - if not body: - raise falcon.HTTPBadRequest("Empty request body", "A valid JSON document is required.") diff --git a/src/environment_provider_api/middleware/require_json.py b/src/environment_provider_api/middleware/require_json.py deleted file mode 100644 index 06a825c..0000000 --- a/src/environment_provider_api/middleware/require_json.py +++ /dev/null @@ -1,38 +0,0 @@ -# Copyright Axis Communications AB. -# -# For a full list of individual contributors, please see the commit history. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""Require JSON module.""" -import falcon - -# pylint: disable=too-few-public-methods - - -class RequireJSON: - """Require Accept: application/json headers for this API.""" - - def process_request(self, req: falcon.Request, _) -> None: - """Process request.""" - if not req.client_accepts_json: - raise falcon.HTTPNotAcceptable( - "This API only supports responses encoded as JSON.", - href="http://docs.examples.com/api/json", - ) - - if req.method in ("POST", "PUT"): - if req.content_type is None or "application/json" not in req.content_type: - raise falcon.HTTPUnsupportedMediaType( - "This API only supports requests encoded as JSON.", - href="http://docs.examples.com/api/json", - ) diff --git a/src/environment_provider_api/webserver.py b/src/environment_provider_api/webserver.py deleted file mode 100644 index e477db2..0000000 --- a/src/environment_provider_api/webserver.py +++ /dev/null @@ -1,368 +0,0 @@ -# Copyright Axis Communications AB. -# -# For a full list of individual contributors, please see the commit history. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""ETOS Environment Provider webserver module.""" -import json -import logging -import os -from pathlib import Path -from typing import Iterator - -import falcon -from celery import Celery -from etos_lib.etos import ETOS -from etos_lib.logging.logger import FORMAT_CONFIG -from jsontas.jsontas import JsonTas - -from environment_provider.lib.celery import APP -from environment_provider.lib.database import ETCDPath -from environment_provider.lib.registry import ProviderRegistry - -from .backend.common import get_suite_id, get_suite_runner_ids -from .backend.configure import ( - configure, - get_configuration, - get_dataset, - get_execution_space_provider_id, - get_iut_provider_id, - get_log_area_provider_id, -) -from .backend.environment import ( - check_environment_status, - get_environment_id, - get_release_id, - get_single_release_id, - release_environment, - release_full_environment, - request_environment, -) -from .backend.register import ( - get_execution_space_provider, - get_iut_provider, - get_log_area_provider, - register, -) -from .middleware import JSONTranslator, RequireJSON - - -class Webserver: - """Environment provider base endpoint.""" - - logger = logging.getLogger(__name__) - - def __init__(self, celery_worker: Celery) -> None: - """Init with a db class. - - :param celery_worker: The celery app to use. - """ - self.celery_worker = celery_worker - - def release_by_environment_id(self, response: falcon.Response, environment_id: str) -> None: - """Release a single environment. - - This is a backwards compatibility layer for the ESR so that it can still continue working - with just sending in the environment ID. - - :param response: Response object to edit and return. - :param environment_id: Environment to release. - """ - environment = ETCDPath(f"/environment/{environment_id}") - testrun_id = environment.join("testrun-id").read() - suite_id = environment.join("suite-id").read() - if testrun_id is None or suite_id is None: - self.logger.warning("Environment for %r already checked in", environment_id) - return - try: - self.release_single(response, environment_id, testrun_id.decode(), suite_id.decode()) - finally: - environment.delete_all() - - def release_single( - self, response: falcon.Response, environment_id: str, testrun_id: str, suite_id: str - ) -> None: - """Release a single environment using suite and test run IDs. - - :param response: Response object to edit and return. - :param environment_id: Environment to release. - :param testrun_id: ID of the testrun where the environment to release resides. - :param suite_id: Test suite started ID where the environment to release resides. - """ - etos = ETOS( - "ETOS Environment Provider", - os.getenv("HOSTNAME"), - "Environment Provider", - ) - jsontas = JsonTas() - registry = ProviderRegistry(etos, jsontas, testrun_id) - suite = ETCDPath( - f"/testrun/{testrun_id}/suite/{suite_id}/subsuite/{environment_id}/suite" - ).read() - if suite is None: - self.logger.warning( - f"/testrun/{testrun_id}/suite/{suite_id}/subsuite/{environment_id}/suite" - ) - try: - failure = release_environment(etos, jsontas, registry, json.loads(suite)) - except json.JSONDecodeError as exc: - self.logger.error("Failed to decode test suite JSON: %r", suite) - failure = exc - - if failure: - response.media = { - "error": "Failed to release environment", - "details": "".join( - traceback.format_exception(failure, value=failure, tb=failure.__traceback__) - ), - "status": "FAILURE", - } - return - - response.status = falcon.HTTP_200 - response.media = {"status": "SUCCESS"} - - def release_full(self, response: falcon.Response, testrun_id: str) -> None: - """Release a full test environment using test run ID. - - :param response: Response object to edit and return. - :param testrun_id: Testrun to release. - """ - etos = ETOS( - "ETOS Environment Provider", - os.getenv("HOSTNAME"), - "Environment Provider", - ) - jsontas = JsonTas() - - task_id = ETCDPath(f"/testrun/{testrun_id}/environment-provider/task-id").read().decode() - task_result = None - if task_id is not None: - task_result = self.celery_worker.AsyncResult(task_id) - task_result.forget() - - success, message = release_full_environment(etos, jsontas, testrun_id) - if not success: - self.logger.error(message) - response.media = { - "error": "Failed to release environment", - "details": message, - "status": task_result.status if task_result else "PENDING", - } - return - - response.status = falcon.HTTP_200 - response.media = {"status": task_result.status if task_result else "PENDING"} - - def release_by_task_id(self, response: falcon.Response, task_id: str) -> None: - """Release a full environment. - - This is a backwards compatibility layer for the ESR to continue to release using the task - ID that the environment provider returns when requesting an environment. - - :param response: Response object to edit and return. - :param task_id: Task to release. - """ - suite_id = ETCDPath(f"/environment/{task_id}/suite-id").read() - if suite_id is None: - self.logger.warning("Environment for %r already checked in", task_id) - return - self.release_full(response, suite_id.decode()) - ETCDPath(f"/environment/{task_id}/suite-id").delete() - - def on_get(self, request: falcon.Request, response: falcon.Response) -> None: - """GET endpoint for environment provider API. - - Get environment task or release environment. - - :param request: Falcon request object. - :param response: Falcon response object. - """ - task_id = get_environment_id(request) - release = get_release_id(request) - single_release = get_single_release_id(request) - if not any([task_id, release, single_release]): - raise falcon.HTTPBadRequest( - title="Missing parameters", - description="'id', 'release' or 'single_release' are required parameters.", - ) - if single_release: - self.release_by_environment_id(response, single_release) - elif release: - self.release_by_task_id(response, release) - else: - result = check_environment_status(self.celery_worker, task_id) - response.status = falcon.HTTP_200 - response.media = result - - @staticmethod - def on_post(request: falcon.Request, response: falcon.Response) -> None: - """POST endpoint for environment provider API. - - Create a new environment and return it. - - :param request: Falcon request object. - :param response: Falcon response object. - """ - suite_id = get_suite_id(request) - suite_runner_ids = get_suite_runner_ids(request) - if not all([suite_runner_ids, suite_id]): - raise falcon.HTTPBadRequest( - title="Missing parameters", - description="the 'suite_id' and 'suite_runner_ids' parameters are required.", - ) - - task_id = request_environment(suite_id, suite_runner_ids) - - # TODO: This shall be removed when API version v1 is used by the ESR and API. - ETCDPath(f"/environment/{task_id}/suite-id").write(suite_id) - ETCDPath(f"/testrun/{suite_id}/environment-provider/task-id").write(task_id) - - response.status = falcon.HTTP_200 - response.media = {"result": "success", "data": {"id": task_id}} - - -class Configure: - """Configure endpoint for environment provider. Configure an environment for checkout. - - This endpoint should be called before attempting to checkout an environment so that - the environment provider is configured to handle it. - """ - - logger = logging.getLogger(__name__) - - def on_post(self, request: falcon.Request, response: falcon.Response) -> None: - """Verify that all parameters are available and configure the provider registry. - - :param request: Falcon request object. - :param response: Falcon response object. - """ - etos = ETOS("ETOS Environment Provider", os.getenv("HOSTNAME"), "Environment Provider") - jsontas = JsonTas() - suite_id = get_suite_id(request) - if suite_id is None: - self.logger.error("Missing suite_id in request") - raise falcon.HTTPBadRequest( - title="Bad request", description="missing suite_id in request" - ) - registry = ProviderRegistry(etos, jsontas, suite_id) - FORMAT_CONFIG.identifier = suite_id - - success, message = configure( - registry, - get_iut_provider_id(request), - get_execution_space_provider_id(request), - get_log_area_provider_id(request), - get_dataset(request), - ) - if not success: - self.logger.error(message) - raise falcon.HTTPBadRequest(title="Bad request", description=message) - response.status = falcon.HTTP_200 - - def on_get(self, request: falcon.Request, response: falcon.Response) -> None: - """Get an already configured environment based on suite ID. - - Use only to verify that the environment has been configured properly. - - :param request: Falcon request object. - :param response: Falcon response object. - """ - etos = ETOS("ETOS Environment Provider", os.getenv("HOSTNAME"), "Environment Provider") - jsontas = JsonTas() - suite_id = get_suite_id(request) - if suite_id is None: - raise falcon.HTTPBadRequest( - title="Missing parameters", description="'suite_id' is a required parameter." - ) - - registry = ProviderRegistry(etos, jsontas, suite_id) - - FORMAT_CONFIG.identifier = suite_id - response.status = falcon.HTTP_200 - response.media = get_configuration(registry) - - -class Register: # pylint:disable=too-few-public-methods - """Register one or several new providers to the environment provider.""" - - logger = logging.getLogger(__name__) - - def __init__(self) -> None: - """Load providers.""" - self.load_providers_from_disk() - - def providers(self, directory: Path) -> Iterator[dict]: - """Read provider json files from a directory. - - :param directory: Directory to read provider json files from. - :return: An iterator of the json files. - """ - try: - filenames = os.listdir(directory) - except FileNotFoundError: - return - for provider_filename in filenames: - if not directory.joinpath(provider_filename).is_file(): - self.logger.warn("Not a file: %r", provider_filename) - continue - with directory.joinpath(provider_filename).open() as provider_file: - yield json.load(provider_file) - - def load_providers_from_disk(self) -> None: - """Register provider files from file system, should environment variables be set.""" - etos = ETOS("ETOS Environment Provider", os.getenv("HOSTNAME"), "Environment Provider") - jsontas = JsonTas() - registry = ProviderRegistry(etos, jsontas, None) - - if os.getenv("EXECUTION_SPACE_PROVIDERS"): - for provider in self.providers(Path(os.getenv("EXECUTION_SPACE_PROVIDERS"))): - register(registry, execution_space_provider=provider) - if os.getenv("LOG_AREA_PROVIDERS"): - for provider in self.providers(Path(os.getenv("LOG_AREA_PROVIDERS"))): - register(registry, log_area_provider=provider) - if os.getenv("IUT_PROVIDERS"): - for provider in self.providers(Path(os.getenv("IUT_PROVIDERS"))): - register(registry, iut_provider=provider) - - def on_post(self, request: falcon.Request, response: falcon.Response) -> None: - """Register a new provider. - - :param request: Falcon request object. - :param response: Falcon response object. - """ - etos = ETOS("ETOS Environment Provider", os.getenv("HOSTNAME"), "Environment Provider") - jsontas = JsonTas() - registry = ProviderRegistry(etos, jsontas, None) - registered = register( - registry, - iut_provider=get_iut_provider(request), - log_area_provider=get_log_area_provider(request), - execution_space_provider=get_execution_space_provider(request), - ) - if registered is False: - raise falcon.HTTPBadRequest( - title="Missing parameters", - description="At least one of 'iut_provider', 'log_area_provider' " - "& 'execution_space_provider' is a required parameter.", - ) - response.status = falcon.HTTP_204 - - -FALCON_APP = falcon.App(middleware=[RequireJSON(), JSONTranslator()]) -WEBSERVER = Webserver(APP) -CONFIGURE = Configure() -REGISTER = Register() -FALCON_APP.add_route("/", WEBSERVER) -FALCON_APP.add_route("/configure", CONFIGURE) -FALCON_APP.add_route("/register", REGISTER) diff --git a/tests/backend/__init__.py b/tests/backend/__init__.py deleted file mode 100644 index b7083ce..0000000 --- a/tests/backend/__init__.py +++ /dev/null @@ -1,16 +0,0 @@ -# Copyright Axis Communications AB. -# -# For a full list of individual contributors, please see the commit history. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""Tests for the backend services.""" diff --git a/tests/backend/test_common.py b/tests/backend/test_common.py deleted file mode 100644 index fc2986e..0000000 --- a/tests/backend/test_common.py +++ /dev/null @@ -1,83 +0,0 @@ -# Copyright Axis Communications AB. -# -# For a full list of individual contributors, please see the commit history. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""Tests for common functionality.""" -import logging -import unittest - -from tests.library.fake_request import FakeRequest - -from environment_provider_api.backend.common import get_suite_id - - -class TestCommonFunctionality(unittest.TestCase): - """Test the common backend functionality.""" - - logger = logging.getLogger(__name__) - - def test_suite_id(self): - """Test that the configure backend can return suite id. - - Approval criteria: - - The configure backend shall be able to get the suite id from request. - - Test steps: - 1. Get suite id from request via the configure backend. - 2. Verify that the backend returns the correct suite id. - """ - request = FakeRequest() - test_suite_id = "b58415d4-2f39-4ab0-8763-7277e18f9606" - request.fake_params["suite_id"] = test_suite_id - self.logger.info("STEP: Get suite id from request via the configure backend.") - response_suite_id = get_suite_id(request) - - self.logger.info("STEP: Verify that the backend returns the correct suite id.") - self.assertEqual(test_suite_id, response_suite_id) - - def test_suite_id_media_is_none(self): - """Test that the configure backend returns the result of get_param if media is not set. - - Approval criteria: - - The configure backend shall return the value of get_param if media is None. - - Test steps: - 1. Get suite id from request via the configure backend without media. - 2. Verify that the backend returns the suite id. - """ - request = FakeRequest() - request.force_media_none = True - test_suite_id = "b58415d4-2f39-4ab0-8763-7277e18f9606" - request.fake_params["suite_id"] = test_suite_id - self.logger.info("STEP: Get suite id from request via the configure backend without media.") - response_suite_id = get_suite_id(request) - - self.logger.info("STEP: Verify that the backend returns the suite id.") - self.assertEqual(test_suite_id, response_suite_id) - - def test_suite_id_none(self): - """Test that the configure backend returns None if suite id is not set. - - Approval criteria: - - The configure backend shall return None if suite id is not in request. - - Test steps: - 1. Get suite id from request via the configure backend. - 2. Verify that the backend returns None. - """ - self.logger.info("STEP: Get suite id from request via the configure backend.") - response = get_suite_id(FakeRequest()) - - self.logger.info("STEP: Verify that the backend returns None.") - self.assertIsNone(response) diff --git a/tests/backend/test_configure.py b/tests/backend/test_configure.py deleted file mode 100644 index 31e223a..0000000 --- a/tests/backend/test_configure.py +++ /dev/null @@ -1,461 +0,0 @@ -# Copyright Axis Communications AB. -# -# For a full list of individual contributors, please see the commit history. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""Tests for the configure backend system.""" -import json -import logging -import unittest -from typing import OrderedDict - -from etos_lib import ETOS -from etos_lib.lib.config import Config -from jsontas.jsontas import JsonTas - -from environment_provider.lib.registry import ProviderRegistry -from environment_provider_api.backend.configure import ( - configure, - get_configuration, - get_dataset, - get_execution_space_provider_id, - get_iut_provider_id, - get_log_area_provider_id, -) -from tests.library.fake_database import FakeDatabase -from tests.library.fake_request import FakeRequest - - -class TestConfigureBackend(unittest.TestCase): - """Test the configure backend.""" - - maxDiff = None - - logger = logging.getLogger(__name__) - - def tearDown(self): - """Reset all globally stored data for the next test.""" - Config().reset() - - def test_iut_provider_id(self): - """Test that the configure backend can return IUT provider id. - - Approval criteria: - - The configure backend shall be able to get the IUT provider id from request. - - Test steps: - 1. Get IUT provider id from request via the configure backend. - 2. Verify that the backend returns the correct iut provider id. - """ - request = FakeRequest() - test_provider_id = "test_iut_provider" - request.fake_params["iut_provider"] = test_provider_id - self.logger.info("STEP: Get IUT provider id from request via the configure backend.") - response_provider_id = get_iut_provider_id(request) - - self.logger.info("STEP: Verify that the backend returns the correct iut provider id.") - self.assertEqual(test_provider_id, response_provider_id) - - def test_iut_provider_id_none(self): - """Test that the configure backend returns None if IUT provider id is not set. - - Approval criteria: - - The configure backend shall return None if IUT provider is not in request. - - Test steps: - 1. Get IUT provider from request via the configure backend. - 2. Verify that the backend returns None. - """ - self.logger.info("STEP: Get IUT provider id from request via the configure backend.") - response = get_iut_provider_id(FakeRequest()) - - self.logger.info("STEP: Verify that the backend returns None.") - self.assertIsNone(response) - - def test_execution_space_provider_id(self): - """Test that the configure backend can return execution space provider id. - - Approval criteria: - - The configure backend shall be able to get the execution space provider id - from request. - - Test steps: - 1. Get execution space provider id from request via the configure backend. - 2. Verify that the backend returns the correct execution space provider id. - """ - request = FakeRequest() - test_provider_id = "test_execution_space_provider_id" - request.fake_params["execution_space_provider"] = test_provider_id - self.logger.info( - "STEP: Get execution space provider id from request via the configure backend." - ) - response_provider_id = get_execution_space_provider_id(request) - - self.logger.info( - "STEP: Verify that the backend returns the correct execution space provider id." - ) - self.assertEqual(test_provider_id, response_provider_id) - - def test_execution_space_provider_id_none(self): - """Test that the configure backend returns None if execution space provider id is not set. - - Approval criteria: - - The configure backend shall return None if execution space provider is not in request. - - Test steps: - 1. Get execution space provider from request via the configure backend. - 2. Verify that the backend returns None. - """ - self.logger.info( - "STEP: Get execution space provider id from request via the configure backend." - ) - response = get_execution_space_provider_id(FakeRequest()) - - self.logger.info("STEP: Verify that the backend returns None.") - self.assertIsNone(response) - - def test_log_area_provider_id(self): - """Test that the configure backend can return log area provider id. - - Approval criteria: - - The configure backend shall be able to get the log area provider id from request. - - Test steps: - 1. Get log area provider id from request via the configure backend. - 2. Verify that the backend returns the correct log area provider id. - """ - request = FakeRequest() - test_provider_id = "test_log_area_provider_id" - request.fake_params["log_area_provider"] = test_provider_id - self.logger.info("STEP: Get log area provider id from request via the configure backend.") - response_provider_id = get_log_area_provider_id(request) - - self.logger.info("STEP: Verify that the backend returns the correct log area provider id.") - self.assertEqual(test_provider_id, response_provider_id) - - def test_log_area_provider_id_none(self): - """Test that the configure backend returns None if log area provider id is not set. - - Approval criteria: - - The configure backend shall return None if log area provider is not in request. - - Test steps: - 1. Get log area provider from request via the configure backend. - 2. Verify that the backend returns None. - """ - self.logger.info("STEP: Get log area provider id from request via the configure backend.") - response = get_log_area_provider_id(FakeRequest()) - - self.logger.info("STEP: Verify that the backend returns None.") - self.assertIsNone(response) - - def test_dataset(self): - """Test that the configure backend can return dataset. - - Approval criteria: - - The configure backend shall be able to get the dataset from request. - - Test steps: - 1. Get dataset from request via the configure backend. - 2. Verify that the backend returns the correct dataset. - """ - request = FakeRequest() - test_dataset = {"test_dataset": "my ultimate dataset"} - request.fake_params["dataset"] = json.dumps(test_dataset) - self.logger.info("STEP: Get dataset from request via the configure backend.") - response_dataset = get_dataset(request) - - self.logger.info("STEP: Verify that the backend returns the correct dataset.") - self.assertDictEqual(test_dataset, response_dataset) - - def test_dataset_none(self): - """Test that the configure backend returns None if dataset is not set. - - Approval criteria: - - The configure backend shall return None if dataset is not in request. - - Test steps: - 1. Get dataset from request via the configure backend. - 2. Verify that the backend returns None. - """ - self.logger.info("STEP: Get dataset from request via the configure backend.") - response = get_dataset(FakeRequest()) - - self.logger.info("STEP: Verify that the backend returns None.") - self.assertIsNone(response) - - def test_configure(self): - """Test that it is possible to configure the environment provider. - - Approval criteria: - - The configure backend shall set the correct configuration. - - Test steps: - 1. Add providers into the database. - 2. Attempt to configure the environment provider using the provider ids. - 3. Verify that the configuration was stored in the database. - """ - database = FakeDatabase() - Config().set("database", database) - test_iut_provider = { - "iut": { - "id": "iut_provider_test", - "list": {"available": [], "possible": []}, - } - } - test_execution_space_provider = { - "execution_space": { - "id": "execution_space_provider_test", - "list": {"available": [{"identifier": "123"}], "possible": []}, - } - } - test_log_area_provider = { - "log": { - "id": "log_area_provider_test", - "list": {"available": [], "possible": []}, - } - } - self.logger.info("STEP: Add providers into the database.") - database.put( - f"/environment/provider/iut/{test_iut_provider['iut']['id']}", - json.dumps(test_iut_provider), - ) - provider_id = test_execution_space_provider["execution_space"]["id"] - database.put( - f"/environment/provider/execution-space/{provider_id}", - json.dumps(test_execution_space_provider), - ) - database.put( - f"/environment/provider/log-area/{test_log_area_provider['log']['id']}", - json.dumps(test_log_area_provider), - ) - - test_suite_id = "740d1e2a-2309-4c53-beda-569da70c315c" - test_dataset = {"a": "dataset"} - registry = ProviderRegistry(ETOS("", "", ""), JsonTas(), test_suite_id) - self.logger.info( - "STEP: Attempt to configure the environment provider using the provider ids." - ) - success, _ = configure( - registry, - test_iut_provider["iut"]["id"], - test_execution_space_provider["execution_space"]["id"], - test_log_area_provider["log"]["id"], - test_dataset, - ) - - self.logger.info("STEP: Verify that the configuration was stored in the database.") - self.assertTrue(success) - stored_iut_provider = json.loads(database.get(f"/testrun/{test_suite_id}/provider/iut")[0]) - self.assertDictEqual(stored_iut_provider, test_iut_provider) - stored_execution_space_provider = json.loads( - database.get(f"/testrun/{test_suite_id}/provider/execution-space")[0] - ) - self.assertDictEqual(stored_execution_space_provider, test_execution_space_provider) - stored_log_area_provider = json.loads( - database.get(f"/testrun/{test_suite_id}/provider/log-area")[0] - ) - self.assertDictEqual(stored_log_area_provider, test_log_area_provider) - stored_dataset = json.loads(database.get(f"/testrun/{test_suite_id}/provider/dataset")[0]) - self.assertDictEqual(stored_dataset, test_dataset) - - def test_configure_missing_parameter(self): - """Test that the configure backend does not configure if any parameter is missing. - - Approval criteria: - - The configure backend shall return False and not configure if any parameter - is missing. - - Test steps: - 1. Attempt to configure the environment provider without any parameters. - 2. Verify that False was returned and no configuration was made. - """ - database = FakeDatabase() - Config().set("database", database) - self.logger.info( - "STEP: Attempt to configure the environment provider without any parameters." - ) - registry = ProviderRegistry(ETOS("", "", ""), JsonTas(), None) - success, _ = configure(registry, None, None, None, None) - - self.logger.info("STEP: Verify that False was returned and no configuration was made.") - self.assertFalse(success) - self.assertDictEqual(database.db_dict, {}) - - def test_configure_empty_dataset(self): - """Test that it is possible to configure the environment provider if dataset is empty. - - Approval criteria: - - It shall be possible to configure using an empty dataset. - - Test steps: - 1. Add providers into the database. - 2. Attempt to configure the environment provider with an empty dataset. - 3. Verify that the configuration was stored in the database. - """ - database = FakeDatabase() - Config().set("database", database) - test_iut_provider = { - "iut": { - "id": "iut_provider_test", - "list": {"available": [], "possible": []}, - } - } - test_execution_space_provider = { - "execution_space": { - "id": "execution_space_provider_test", - "list": {"available": [{"identifier": "123"}], "possible": []}, - } - } - test_log_area_provider = { - "log": { - "id": "log_area_provider_test", - "list": {"available": [], "possible": []}, - } - } - self.logger.info("STEP: Add providers into the database.") - database.put( - f"/environment/provider/iut/{test_iut_provider['iut']['id']}", - json.dumps(test_iut_provider), - ) - provider_id = test_execution_space_provider["execution_space"]["id"] - database.put( - f"/environment/provider/execution-space/{provider_id}", - json.dumps(test_execution_space_provider), - ) - database.put( - f"/environment/provider/log-area/{test_log_area_provider['log']['id']}", - json.dumps(test_log_area_provider), - ) - - test_suite_id = "740d1e2a-2309-4c53-beda-569da70c315c" - test_dataset = {} - registry = ProviderRegistry(ETOS("", "", ""), JsonTas(), test_suite_id) - self.logger.info( - "STEP: Attempt to configure the environment provider with an empty dataset." - ) - success, _ = configure( - registry, - test_iut_provider["iut"]["id"], - test_execution_space_provider["execution_space"]["id"], - test_log_area_provider["log"]["id"], - test_dataset, - ) - - self.logger.info("STEP: Verify that the configuration was stored in the database.") - self.assertTrue(success) - stored_iut_provider = json.loads(database.get(f"/testrun/{test_suite_id}/provider/iut")[0]) - self.assertDictEqual(stored_iut_provider, test_iut_provider) - stored_execution_space_provider = json.loads( - database.get(f"/testrun/{test_suite_id}/provider/execution-space")[0] - ) - self.assertDictEqual(stored_execution_space_provider, test_execution_space_provider) - stored_log_area_provider = json.loads( - database.get(f"/testrun/{test_suite_id}/provider/log-area")[0] - ) - self.assertDictEqual(stored_log_area_provider, test_log_area_provider) - stored_dataset = json.loads(database.get(f"/testrun/{test_suite_id}/provider/dataset")[0]) - self.assertDictEqual(stored_dataset, test_dataset) - - def test_get_configuration(self): - """Test that it is possible to get a stored configuration. - - Approval criteria: - - It shall be possible to get a stored configuration. - - Test steps: - 1. Store a configuration into the database. - 2. Verify that it is possible to get the stored configuration. - """ - database = FakeDatabase() - Config().set("database", database) - test_suite_id = "8d9344e3-a246-43ec-92b4-fc81ea31067a" - test_dataset = {"dataset": "test"} - test_iut_provider = OrderedDict( - { - "iut": { - "id": "iut_provider_test", - "list": {"available": [], "possible": []}, - } - } - ) - test_execution_space_provider = OrderedDict( - { - "execution_space": { - "id": "execution_space_provider_test", - "list": {"available": [{"identifier": "123"}], "possible": []}, - } - } - ) - test_log_area_provider = OrderedDict( - { - "log": { - "id": "log_area_provider_test", - "list": {"available": [], "possible": []}, - } - } - ) - self.logger.info("STEP: Store a configuration into the database.") - database.put(f"/testrun/{test_suite_id}/provider/dataset", json.dumps(test_dataset)) - database.put(f"/testrun/{test_suite_id}/provider/iut", json.dumps(test_iut_provider)) - database.put( - f"/testrun/{test_suite_id}/provider/log-area", json.dumps(test_log_area_provider) - ) - database.put( - f"/testrun/{test_suite_id}/provider/execution-space", - json.dumps(test_execution_space_provider), - ) - - self.logger.info("STEP: Verify that it is possible to get the stored configuration.") - registry = ProviderRegistry(ETOS("", "", ""), JsonTas(), test_suite_id) - stored_configuration = get_configuration(registry) - self.assertDictEqual( - stored_configuration, - { - "iut_provider": test_iut_provider["iut"], - "execution_space_provider": test_execution_space_provider["execution_space"], - "log_area_provider": test_log_area_provider["log"], - "dataset": test_dataset, - }, - ) - - def test_get_configuration_missing(self): - """Test that if a configuration is missing, a partial result is returned. - - Approval criteria: - - The configure backend shall return a partial configuration if configuration - is missing. - - Test steps: - 1. Store a faulty configuration into the database. - 2. Verify that it is possible to get the partial configuration. - """ - database = FakeDatabase() - Config().set("database", database) - test_suite_id = "ca51601e-6c9a-4b5d-8038-7dc2561283d2" - test_dataset = {"dataset": "test"} - self.logger.info("STEP: Store a faulty configuration into the database.") - database.put(f"/testrun/{test_suite_id}/provider/dataset", json.dumps(test_dataset)) - - self.logger.info("STEP: Verify that it is possible to get the partial configuration.") - registry = ProviderRegistry(ETOS("", "", ""), JsonTas(), test_suite_id) - stored_configuration = get_configuration(registry) - self.assertDictEqual( - stored_configuration, - { - "dataset": test_dataset, - "iut_provider": None, - "execution_space_provider": None, - "log_area_provider": None, - }, - ) diff --git a/tests/backend/test_environment.py b/tests/backend/test_environment.py deleted file mode 100644 index 433c51d..0000000 --- a/tests/backend/test_environment.py +++ /dev/null @@ -1,282 +0,0 @@ -# Copyright Axis Communications AB. -# -# For a full list of individual contributors, please see the commit history. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""Tests for the environment backend system.""" -import json -import logging -import unittest -from typing import OrderedDict - -from etos_lib import ETOS -from etos_lib.lib.config import Config -from jsontas.jsontas import JsonTas -from mock import patch - -from environment_provider_api.backend.common import get_suite_id -from environment_provider_api.backend.environment import ( - check_environment_status, - get_environment_id, - get_release_id, - release_full_environment, - request_environment, -) -from tests.library.fake_celery import FakeCelery, Task -from tests.library.fake_database import FakeDatabase -from tests.library.fake_request import FakeRequest - - -class TestEnvironmentBackend(unittest.TestCase): - """Test the environment backend.""" - - logger = logging.getLogger(__name__) - - def tearDown(self): - """Reset all globally stored data for the next test.""" - Config().reset() - - def test_get_from_request(self): - """Test that it is possible to get all parameters from request. - - Approval criteria: - - It shall be possible to get parameters from request. - - Data driven repetitions: - - Repeat for the following parameters: ["id", "release", "suite_id"] - - Test steps: - 1. For each parameter: - 1.1: Call the function to get the parameter. - 1.2: Verify that the parameter is correct. - """ - requests = ( - ("id", get_environment_id), - ("release", get_release_id), - ("suite_id", get_suite_id), - ) - self.logger.info("STEP: For each parameter:") - for parameter, func in requests: - test_value = f"testing_{parameter}" - request = FakeRequest() - request.fake_params[parameter] = test_value - self.logger.info("STEP: Call the function to get the parameter %r", parameter) - response_value = func(request) - self.logger.info("STEP: Verify that the parameter is correct.") - self.assertEqual(response_value, test_value) - - def test_release_full_environment(self): - """Test that it is possible to release an environment. - - Approval criteria: - - It shall be possible to release an environment. - - Note: - - This is not perfectly testable today due to how the providers are used - when checking in provider items. - - Test steps: - 1. Attempt to release an environment. - 2. Verify that it was possible to release that environment. - """ - database = FakeDatabase() - Config().set("database", database) - test_iut_provider = OrderedDict( - { - "iut": { - "id": "iut_provider_test", - "list": {"available": [], "possible": []}, - } - } - ) - test_execution_space_provider = OrderedDict( - { - "execution_space": { - "id": "execution_space_provider_test", - "list": {"available": [{"identifier": "123"}], "possible": []}, - } - } - ) - test_log_area_provider = OrderedDict( - { - "log": { - "id": "log_area_provider_test", - "list": {"available": [], "possible": []}, - } - } - ) - database.put( - f"/environment/provider/iut/{test_iut_provider['iut']['id']}", - json.dumps(test_iut_provider), - ) - provider_id = test_execution_space_provider["execution_space"]["id"] - database.put( - f"/environment/provider/execution-space/{provider_id}", - json.dumps(test_execution_space_provider), - ) - database.put( - f"/environment/provider/log-area/{test_log_area_provider['log']['id']}", - json.dumps(test_log_area_provider), - ) - iut = {"id": "test_iut", "provider_id": test_iut_provider["iut"]["id"]} - executor = { - "id": "test_executor", - "provider_id": test_execution_space_provider["execution_space"]["id"], - } - log_area = { - "id": "test_log_area", - "provider_id": test_log_area_provider["log"]["id"], - } - test_suite_id = "ce63f53e-1797-42bb-ae72-861a0b6b7ef6" - jsontas = JsonTas() - etos = ETOS("", "", "") - database.put( - f"/testrun/{test_suite_id}/suite/fakeid/subsuite/fakeid/suite", - json.dumps( - { - "iut": iut, - "executor": executor, - "log_area": log_area, - } - ), - ) - - self.logger.info("STEP: Attempt to release an environment.") - success, _ = release_full_environment(etos, jsontas, test_suite_id) - - self.logger.info("STEP: Verify that it was possible to release that environment.") - self.assertTrue(success) - self.assertListEqual(database.get_prefix("/testrun"), []) - - def test_release_full_environment_failure(self): - """Test that a failure is returned when there is a problem with releasing. - - Approval criteria: - - The environment provider shall return failure if one provider failed. - - Note: - - This is not perfectly testable today due to how the providers are used - when checking in provider items. - - Test steps: - 1. Release an environment where one provider will fail to check in. - 2. Verify that the release return failure. - """ - database = FakeDatabase() - Config().set("database", database) - test_iut_provider = { - "iut": { - "id": "iut_provider_test", - "list": {"available": [], "possible": []}, - } - } - test_execution_space_provider = { - "execution_space": { - "id": "execution_space_provider_test", - "list": {"available": [{"identifier": "123"}], "possible": []}, - "checkin": False, - } - } - test_log_area_provider = { - "log": { - "id": "log_area_provider_test", - "list": {"available": [], "possible": []}, - } - } - database.put( - f"/environment/provider/iut/{test_iut_provider['iut']['id']}", - json.dumps(test_iut_provider), - ) - provider_id = test_execution_space_provider["execution_space"]["id"] - database.put( - f"/environment/provider/execution-space/{provider_id}", - json.dumps(test_execution_space_provider), - ) - database.put( - f"/environment/provider/log-area/{test_log_area_provider['log']['id']}", - json.dumps(test_log_area_provider), - ) - - iut = {"id": "test_iut", "provider_id": test_iut_provider["iut"]["id"]} - executor = { - "id": "test_executor", - "provider_id": test_execution_space_provider["execution_space"]["id"], - } - log_area = { - "id": "test_log_area", - "provider_id": test_log_area_provider["log"]["id"], - } - test_suite_id = "ce63f53e-1797-42bb-ae72-861a0b6b7ef6" - jsontas = JsonTas() - etos = ETOS("", "", "") - database.put( - f"/testrun/{test_suite_id}/suite/fakeid/subsuite/fakeid/suite", - json.dumps( - { - "iut": iut, - "executor": executor, - "log_area": log_area, - } - ), - ) - - self.logger.info("STEP: Release an environment where one provider will fail to check in.") - success, _ = release_full_environment(etos, jsontas, test_suite_id) - - self.logger.info("STEP: Verify that the release return failure.") - self.assertFalse(success) - self.assertListEqual(database.get_prefix("/testrun"), []) - - def test_check_environment_status(self): - """Test that it is possible to get the status of an environment. - - Approval criteria: - - The environment provider shall return the status of an environment. - - Test steps: - 1. Get status of an environment. - 2. Verify that the correct status is returned. - """ - environment_id = "49b59fc9-4eab-4747-bd53-c638d95a87ea" - result = {"this": "is", "a": "test"} - status = "PENDING" - worker = FakeCelery(environment_id, status, result) - self.logger.info("STEP: Get status of an environment.") - environment_status = check_environment_status(worker, environment_id) - - self.logger.info("STEP: Verify that the correct status is returned.") - self.assertDictEqual(environment_status, {"status": status, "result": result}) - self.assertIn(environment_id, worker.received) - - @patch("environment_provider_api.backend.environment.get_environment") - def test_request_environment(self, get_environment_mock): - """Test that it is possible to start the environment provider. - - Approval criteria: - - It shall be possible to request an environment from the environment provider. - - Test steps: - 1. Request an environment from the environment provider. - 2. Verify that the environment provider starts the celery task. - """ - task_id = "f3286e6e-946c-4510-a935-abd7c7bdbe17" - get_environment_mock.delay.return_value = Task(task_id) - suite_id = "ca950c50-03d3-4a3c-8507-b4229dd3f8ea" - suite_runner_id = ["dba8267b-d393-4e37-89ee-7657ea286564"] - - self.logger.info("STEP: Request an environment from the environment provider.") - response = request_environment(suite_id, suite_runner_id) - - self.logger.info("STEP: Verify that the environment provider starts the celery task.") - self.assertEqual(response, task_id) - get_environment_mock.delay.assert_called_once_with(suite_id, suite_runner_id) diff --git a/tests/backend/test_register.py b/tests/backend/test_register.py deleted file mode 100644 index d3fc706..0000000 --- a/tests/backend/test_register.py +++ /dev/null @@ -1,376 +0,0 @@ -# Copyright Axis Communications AB. -# -# For a full list of individual contributors, please see the commit history. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""Tests for the register backend system.""" -import json -import logging -import unittest - -from etos_lib import ETOS -from etos_lib.lib.config import Config -from jsontas.jsontas import JsonTas - -from environment_provider.lib.registry import ProviderRegistry -from environment_provider_api.backend.register import ( - get_execution_space_provider, - get_iut_provider, - get_log_area_provider, - json_to_dict, - register, -) -from tests.library.fake_database import FakeDatabase -from tests.library.fake_request import FakeRequest - - -class TestRegisterBackend(unittest.TestCase): - """Test the register backend.""" - - logger = logging.getLogger(__name__) - - def tearDown(self): - """Reset all globally stored data for the next test.""" - Config().reset() - - def test_iut_provider(self): - """Test that the register backend can return IUT provider. - - Approval criteria: - - The register backend shall be able to get the IUT provider from request parameters. - - Test steps: - 1. Get IUT provider from request via the register backend. - 2. Verify that the backend returns the correct iut provider. - """ - request = FakeRequest() - test_iut_provider = {"iut": {"id": "providertest"}} - request.fake_params["iut_provider"] = test_iut_provider - self.logger.info("STEP: Get IUT provider from request via the register backend.") - response_iut_provider = get_iut_provider(request) - - self.logger.info("STEP: Verify that the backend returns the correct iut provider.") - self.assertDictEqual(test_iut_provider, response_iut_provider) - - def test_iut_provider_none(self): - """Test that the register backend returns None if IUT provider is not set. - - Approval criteria: - - The register backend shall return None if IUT provider is not in request. - - Test steps: - 1. Get IUT provider from request via the register backend. - 2. Verify that the backend returns None. - """ - request = FakeRequest() - self.logger.info("STEP: Get IUT provider from request via the register backend.") - response_iut_provider = get_iut_provider(request) - - self.logger.info("STEP: Verify that the backend returns None.") - self.assertIsNone(response_iut_provider) - - def test_execution_space_provider(self): - """Test that the register backend can return execution space provider. - - Approval criteria: - - The register backend shall be able to get the execution space provider from - request parameters. - - Test steps: - 1. Get execution space provider from request via the register backend. - 2. Verify that the backend returns the correct execution space provider. - """ - request = FakeRequest() - test_execution_space_provider = {"execution_space": {"id": "providertest"}} - request.fake_params["execution_space_provider"] = test_execution_space_provider - self.logger.info( - "STEP: Get execution space provider from request via the register backend." - ) - response_execution_space_provider = get_execution_space_provider(request) - - self.logger.info( - "STEP: Verify that the backend returns the correct execution space provider." - ) - self.assertDictEqual(test_execution_space_provider, response_execution_space_provider) - - def test_execution_space_provider_none(self): - """Test that the register backend returns None if execution space provider is not set. - - Approval criteria: - - The register backend shall return None if execution space provider is not in request. - - Test steps: - 1. Get execution space provider from request via the register backend. - 2. Verify that the backend returns None. - """ - request = FakeRequest() - self.logger.info( - "STEP: Get execution space provider from request via the register backend." - ) - response_execution_space_provider = get_execution_space_provider(request) - - self.logger.info("STEP: Verify that the backend returns None.") - self.assertIsNone(response_execution_space_provider) - - def test_log_area_provider(self): - """Test that the register backend can return log area provider. - - Approval criteria: - - The register backend shall be able to get the log area provider from - request parameters. - - Test steps: - 1. Get log area provider from request via the register backend. - 2. Verify that the backend returns the correct log area provider. - """ - request = FakeRequest() - test_log_area_provider = {"log": {"id": "providertest"}} - request.fake_params["log_area_provider"] = test_log_area_provider - self.logger.info("STEP: Get log area provider from request via the register backend.") - response_log_area_provider = get_log_area_provider(request) - - self.logger.info("STEP: Verify that the backend returns the correct log area provider.") - self.assertDictEqual(test_log_area_provider, response_log_area_provider) - - def test_log_area_provider_none(self): - """Test that the register backend returns None if log area provider is not set. - - Approval criteria: - - The register backend shall return None if log area provider is not in request. - - Test steps: - 1. Get log area provider from request via the register backend. - 2. Verify that the backend returns None. - """ - request = FakeRequest() - self.logger.info("STEP: Get log area provider from request via the register backend.") - response_log_area_provider = get_log_area_provider(request) - - self.logger.info("STEP: Verify that the backend returns None.") - self.assertIsNone(response_log_area_provider) - - def test_json_to_dict(self): - """Test that the json to dict function returns a dictionary. - - Approval criteria: - - The json_to_dict function shall convert strings to dictionary. - - Test steps: - 1. Verify that the json_to_dict converts JSON strings to dictionary. - """ - json_string = '{"data": "testing"}' - self.logger.info("STEP: Verify that the json_to_dict converts JSON strings to dictionary.") - json_dict = json_to_dict(json_string) - self.assertDictEqual(json_dict, json.loads(json_string)) - - def test_json_to_dict_none(self): - """Test that the json to dict function returns None. - - Approval criteria: - - The json_to_dict function shall return None if json_str is None. - - Test steps: - 1. Verify that the json_to_dict returns None when json_str is None. - """ - self.logger.info("Verify that the json_to_dict returns None when json_str is None.") - self.assertIsNone(json_to_dict(None)) - - def test_json_to_dict_already_dict(self): - """Test that the json to dict function does not do anything if input is already a dict. - - Approval criteria: - - The json_to_dict function shall return dictionary as it is. - - Test steps: - 1. Verify that the json_to_dict returns the same dictionary when provided with one. - """ - json_dict = {"data": "testing"} - self.logger.info( - "STEP: Verify that the json_to_dict returns the same dictionary when provided with one." - ) - json_dict_response = json_to_dict(json_dict) - self.assertDictEqual(json_dict, json_dict_response) - - def test_register_iut_provider(self): - """Test that the register backend can register iut providers. - - Approval criteria: - - The register backend shall be able to register an IUT provider. - - Test steps: - 1. Register an IUT provider with the register backend. - 2. Verify that the IUT provider was stored in the database. - """ - fake_database = FakeDatabase() - Config().set("database", fake_database) - etos = ETOS("testing_etos", "testing_etos", "testing_etos") - jsontas = JsonTas() - provider = { - "iut": { - "id": "iut_provider_test", - "list": {"possible": [], "available": []}, - } - } - provider_registry = ProviderRegistry(etos, jsontas, None) - self.logger.info("STEP: Register an IUT provider with the register backend.") - response = register(provider_registry, iut_provider=provider) - - self.logger.info("STEP: Verify that the IUT provider was stored in the database.") - stored_provider = json.loads( - fake_database.get("/environment/provider/iut/iut_provider_test")[0] - ) - self.assertDictEqual(stored_provider, provider) - self.assertTrue(response) - - def test_register_log_area_provider(self): - """Test that the register backend can register log area providers. - - Approval criteria: - - The register backend shall be able to register a log area provider. - - Test steps: - 1. Register a log area provider with the register backend. - 2. Verify that the log area provider was stored in the database. - """ - fake_database = FakeDatabase() - Config().set("database", fake_database) - etos = ETOS("testing_etos", "testing_etos", "testing_etos") - jsontas = JsonTas() - provider = { - "log": { - "id": "log_area_provider_test", - "list": {"available": [], "possible": []}, - } - } - provider_registry = ProviderRegistry(etos, jsontas, None) - self.logger.info("STEP: Register a log area provider with the register backend.") - response = register(provider_registry, log_area_provider=provider) - - self.logger.info("STEP: Verify that the log area provider was stored in the database.") - stored_provider = json.loads( - fake_database.get("/environment/provider/log-area/log_area_provider_test")[0] - ) - self.assertDictEqual(stored_provider, provider) - self.assertTrue(response) - - def test_register_execution_space_provider(self): - """Test that the register backend can register execution space providers. - - Approval criteria: - - The register backend shall be able to register an execution space provider. - - Test steps: - 1. Register an execution space provider with the register backend. - 2. Verify that the execution space provider was stored in the database. - """ - fake_database = FakeDatabase() - Config().set("database", fake_database) - etos = ETOS("testing_etos", "testing_etos", "testing_etos") - jsontas = JsonTas() - provider = { - "execution_space": { - "id": "execution_space_provider_test", - "list": {"available": [{"identifier": "123"}], "possible": []}, - } - } - provider_registry = ProviderRegistry(etos, jsontas, None) - self.logger.info("STEP: Register an execution space provider with the register backend.") - response = register(provider_registry, execution_space_provider=provider) - - self.logger.info( - "STEP: Verify that the execution space provider was stored in the database." - ) - path = "/environment/provider/execution-space/execution_space_provider_test" - stored_provider = json.loads(fake_database.get(path)[0]) - self.assertDictEqual(stored_provider, provider) - self.assertTrue(response) - - def test_register_all_providers(self): - """Test that the register backend can register all providers. - - Approval criteria: - - The register backend shall be able to register all providers. - - Test steps: - 1. Register one of each provider with the register backend. - 2. Verify that the providers were stored in the database. - """ - fake_database = FakeDatabase() - Config().set("database", fake_database) - etos = ETOS("testing_etos", "testing_etos", "testing_etos") - jsontas = JsonTas() - test_iut_provider = { - "iut": { - "id": "iut_provider_test", - "list": {"available": [], "possible": []}, - } - } - test_execution_space_provider = { - "execution_space": { - "id": "execution_space_provider_test", - "list": {"available": [{"identifier": "123"}], "possible": []}, - } - } - test_log_area_provider = { - "log": { - "id": "log_area_provider_test", - "list": {"available": [], "possible": []}, - } - } - provider_registry = ProviderRegistry(etos, jsontas, None) - self.logger.info("STEP: Register one of each provider with the register backend.") - response = register( - provider_registry, - iut_provider=test_iut_provider, - log_area_provider=test_log_area_provider, - execution_space_provider=test_execution_space_provider, - ) - - self.logger.info("STEP: Verify that the providers were stored in the database.") - path = "/environment/provider/execution-space/execution_space_provider_test" - stored_execution_space_provider = json.loads(fake_database.get(path)[0]) - self.assertDictEqual( - stored_execution_space_provider, - test_execution_space_provider, - ) - stored_log_area_provider = json.loads( - fake_database.get("/environment/provider/log-area/log_area_provider_test")[0] - ) - self.assertDictEqual(stored_log_area_provider, test_log_area_provider) - stored_iut_provider = json.loads( - fake_database.get("/environment/provider/iut/iut_provider_test")[0] - ) - self.assertDictEqual(stored_iut_provider, test_iut_provider) - self.assertTrue(response) - - def test_register_provider_none(self): - """Test that the register backend return false if no provider is supplied. - - Approval criteria: - - The register backend shall return False if no provider is supplied. - - Test steps: - 1. Register no provider with the register backend. - 2. Verify that the register backend return False. - """ - fake_database = FakeDatabase() - Config().set("database", fake_database) - etos = ETOS("testing_etos", "testing_etos", "testing_etos") - jsontas = JsonTas() - provider_registry = ProviderRegistry(etos, jsontas, None) - - self.logger.info("STEP: Register no provider with the register backend.") - response = register(provider_registry) - - self.logger.info("STEP: Verify that the register backend return False.") - self.assertFalse(response) diff --git a/tests/library/fake_celery.py b/tests/library/fake_celery.py deleted file mode 100644 index 6ed9277..0000000 --- a/tests/library/fake_celery.py +++ /dev/null @@ -1,62 +0,0 @@ -# Copyright Axis Communications AB. -# -# For a full list of individual contributors, please see the commit history. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""Fake celery library helpers.""" - - -class FakeCeleryResult: - """Fake AsyncResult for celery.""" - - def __init__(self, status, result, task_id, fake_app): - """Init with a celery status and a result dictionary.""" - self.status = status - self.result = result - self.task_id = task_id - self.fake_app = fake_app - - def forget(self): - """Forget a result.""" - self.fake_app.results.pop(self.task_id) - - def get(self): - """Get a result.""" - self.fake_app.received.append(self.task_id) - - -class Task: # pylint:disable=too-few-public-methods - """Fake task.""" - - def __init__(self, task_id): - """Fake task.""" - # pylint:disable=invalid-name - self.id = task_id - - -class FakeCelery: # pylint:disable=too-few-public-methods - """A fake celery application.""" - - def __init__(self, task_id, status, result): - """Init with a task_id status and a result dictionary. - - The results dictionary created after init can be appended - after the fact if necessary. - """ - self.received = [] - self.results = {task_id: FakeCeleryResult(status, result, task_id, self)} - - # pylint:disable=invalid-name - def AsyncResult(self, task_id): - """Get the results of a specific task ID.""" - return self.results.get(task_id) diff --git a/tests/library/fake_request.py b/tests/library/fake_request.py deleted file mode 100644 index 00db202..0000000 --- a/tests/library/fake_request.py +++ /dev/null @@ -1,63 +0,0 @@ -# Copyright Axis Communications AB. -# -# For a full list of individual contributors, please see the commit history. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""A fake request and response structure that can be used in tests.""" -from falcon.errors import MediaNotFoundError - - -class FakeRequest: # pylint:disable=too-few-public-methods - """Fake request structure.""" - - force_media_none = False - - def __init__(self): - """Init some fake parameters.""" - self.fake_params = {} - - def get_param(self, name): - """Get a parameter from the fake params dictionary. - - This is a fake version of the :obj:`falcon.Request` object. - - :param name: Name of parameter to get. - :type name: str - :return: The value in fake params. - :rtype: any - """ - return self.fake_params.get(name) - - def get_media(self): - """Media is used for POST requests.""" - if self.force_media_none: - raise MediaNotFoundError("") - return self.fake_params - - -class FakeResponse: # pylint:disable=too-few-public-methods - """Fake response structure.""" - - fake_responses = None - media = {} - status = 0 - - def __init__(self): - """Init a fake response storage dict.""" - self.fake_responses = {} - - def __setattr__(self, key, value): - """Set attributes to the fake responses dictionary.""" - if self.fake_responses is not None: - self.fake_responses[key] = value - super().__setattr__(key, value) diff --git a/tests/test_environment_provider.py b/tests/test_environment_provider.py index df001c3..73fab81 100644 --- a/tests/test_environment_provider.py +++ b/tests/test_environment_provider.py @@ -24,10 +24,7 @@ from etos_lib.lib.debug import Debug from environment_provider.environment_provider import get_environment -from environment_provider_api.webserver import Webserver -from tests.library.fake_celery import FakeCelery from tests.library.fake_database import FakeDatabase -from tests.library.fake_request import FakeRequest, FakeResponse from tests.library.fake_server import FakeServer from tests.library.graphql_handler import GraphQLHandler @@ -160,7 +157,6 @@ def test_get_environment_sub_suites(self): json.dumps({"host": server.host}), ) os.environ["ETOS_GRAPHQL_SERVER"] = server.host - os.environ["ETOS_ENVIRONMENT_PROVIDER"] = server.host os.environ["ETOS_API"] = server.host self.logger.info("STEP: Run the environment provider.") @@ -209,7 +205,6 @@ def test_get_environment(self): json.dumps({"host": server.host}), ) os.environ["ETOS_GRAPHQL_SERVER"] = server.host - os.environ["ETOS_ENVIRONMENT_PROVIDER"] = server.host os.environ["ETOS_API"] = server.host self.logger.info("STEP: Run the environment provider.") @@ -261,7 +256,6 @@ def test_get_environment_permutation(self): json.dumps({"host": server.host}), ) os.environ["ETOS_GRAPHQL_SERVER"] = server.host - os.environ["ETOS_ENVIRONMENT_PROVIDER"] = server.host os.environ["ETOS_API"] = server.host self.logger.info("STEP: Run the environment provider.") @@ -311,7 +305,6 @@ def test_get_environment_sub_suites_sequential(self): json.dumps({"host": server.host}), ) os.environ["ETOS_GRAPHQL_SERVER"] = server.host - os.environ["ETOS_ENVIRONMENT_PROVIDER"] = server.host os.environ["ETOS_API"] = server.host self.logger.info("STEP: Run the environment provider.") @@ -325,89 +318,3 @@ def test_get_environment_sub_suites_sequential(self): if event.meta.type == "EiffelEnvironmentDefinedEvent": environments.append(event) self.assertEqual(len(environments), 2) - - def test_release_environment(self): # pylint:disable=too-many-locals - """Test that it is possible to release an environment. - - Approval criteria: - - It shall be possible to release an environment. - - Test steps: - 1. Start up a fake server. - 2. Run the environment provider. - 3. Verify that two environments were sent. - 4. Store the environments in celery task. - 5. Send a release request for that environment. - 6. Verify that the environment was released. - """ - tercc = TERCC_SUB_SUITES - database = FakeDatabase() - Config().set("database", database) - - suite_id = tercc["meta"]["id"] - suite_runner_ids = ["14ffc8d7-572a-4f2f-9382-923de2bcf50a"] - task_id = "d9689ea5-837b-48c1-87b1-3de122b3f2fe" - database.put(f"/environment/{task_id}/suite-id", suite_id) - database.put( - f"/environment/provider/iut/{IUT_PROVIDER['iut']['id']}", json.dumps(IUT_PROVIDER) - ) - database.put( - f"/environment/provider/log-area/{LOG_AREA_PROVIDER['log']['id']}", - json.dumps(LOG_AREA_PROVIDER), - ) - database.put( - ( - "/environment/provider/execution-space/" - f"{EXECUTION_SPACE_PROVIDER['execution_space']['id']}" - ), - json.dumps(EXECUTION_SPACE_PROVIDER), - ) - database.put(f"/testrun/{suite_id}/environment-provider/task-id", task_id) - - database.put(f"/testrun/{suite_id}/tercc", json.dumps(tercc)) - database.put(f"/testrun/{suite_id}/provider/iut", json.dumps(IUT_PROVIDER)) - database.put(f"/testrun/{suite_id}/provider/log-area", json.dumps(LOG_AREA_PROVIDER)) - database.put( - f"/testrun/{suite_id}/provider/execution-space", json.dumps(EXECUTION_SPACE_PROVIDER) - ) - - handler = functools.partial(GraphQLHandler, tercc) - - self.logger.info("STEP: Start up a fake server.") - with FakeServer(None, None, handler) as server: - database.put( - f"/testrun/{suite_id}/provider/dataset", - json.dumps({"host": server.host}), - ) - os.environ["ETOS_GRAPHQL_SERVER"] = server.host - os.environ["ETOS_ENVIRONMENT_PROVIDER"] = server.host - os.environ["ETOS_API"] = server.host - - self.logger.info("STEP: Run the environment provider.") - result = get_environment(suite_id, suite_runner_ids) - print(result) - self.assertIsNone(result.get("error")) - - self.logger.info("STEP: Verify that two environments were sent.") - environments = [] - for event in Debug().events_published: - if event.meta.type == "EiffelEnvironmentDefinedEvent": - environments.append(event) - self.assertEqual(len(environments), 2) - - request = FakeRequest() - request.fake_params = {"release": task_id} - response = FakeResponse() - - self.logger.info("STEP: Store the environments in celery task.") - test_status = "SUCCESS" - worker = FakeCelery(task_id, test_status, result) - - self.logger.info("STEP: Send a release request for that environment.") - environment = Webserver(worker) - environment.on_get(request, response) - - self.logger.info("STEP: Verify that the environment was released.") - self.assertDictEqual(response.media, {"status": test_status}) - self.assertIsNone(worker.results.get(task_id)) - self.assertListEqual(database.get_prefix("/testrun"), []) diff --git a/tests/test_webserver_configure.py b/tests/test_webserver_configure.py deleted file mode 100644 index b94ae96..0000000 --- a/tests/test_webserver_configure.py +++ /dev/null @@ -1,279 +0,0 @@ -# Copyright Axis Communications AB. -# -# For a full list of individual contributors, please see the commit history. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""Tests for webserver. Specifically the configure endpoint.""" -import json -import logging -import unittest -from uuid import uuid4 - -import falcon -from etos_lib.lib.config import Config - -from environment_provider_api.webserver import Configure -from tests.library.fake_database import FakeDatabase -from tests.library.fake_request import FakeRequest, FakeResponse - - -class TestConfigure(unittest.TestCase): - """Tests for the configure endpoint.""" - - logger = logging.getLogger(__name__) - - def tearDown(self): - """Reset all globally stored data for the next test.""" - Config().reset() - - def test_get_configuration(self): - """Test that it is possible to get a stored configuration. - - Approval criteria: - - It shall be possible to get a stored configuration. - - Test steps: - 1. Store a configuration in the database. - 2. Send a GET request to the configure endpoint. - 3. Verify that the configuration is the same as in the database. - """ - database = FakeDatabase() - Config().set("database", database) - test_suite_id = "5ef5a01c-8ff9-448d-9ac5-21836a2fa6ff" - test_dataset = {"dataset": "test"} - test_iut_provider = { - "iut": { - "id": "iut_provider_test", - "list": {"available": [], "possible": []}, - } - } - test_execution_space_provider = { - "execution_space": { - "id": "execution_space_provider_test", - "list": {"available": [{"identifier": "123"}], "possible": []}, - } - } - test_log_area_provider = { - "log": { - "id": "log_area_provider_test", - "list": {"available": [], "possible": []}, - } - } - self.logger.info("STEP: Store a configuration in the database.") - database.put(f"/testrun/{test_suite_id}/provider/dataset", json.dumps(test_dataset)) - database.put(f"/testrun/{test_suite_id}/provider/iut", json.dumps(test_iut_provider)) - database.put( - f"/testrun/{test_suite_id}/provider/log-area", json.dumps(test_log_area_provider) - ) - database.put( - f"/testrun/{test_suite_id}/provider/execution-space", - json.dumps(test_execution_space_provider), - ) - - response = FakeResponse() - request = FakeRequest() - request.fake_params["suite_id"] = test_suite_id - self.logger.info("STEP: Send a GET request to the configure endpoint.") - Configure().on_get(request, response) - - self.logger.info("STEP: Verify that the configuration is the same as in the database.") - self.assertEqual(response.status, falcon.HTTP_200) - self.assertDictEqual(response.media.get("iut_provider", {}), test_iut_provider["iut"]) - self.assertDictEqual( - response.media.get("log_area_provider", {}), test_log_area_provider["log"] - ) - self.assertDictEqual( - response.media.get("execution_space_provider", {}), - test_execution_space_provider["execution_space"], - ) - self.assertDictEqual(response.media.get("dataset", {}), test_dataset) - - def test_get_configuration_no_suite_id(self): - """Test that it is not possible to get a configuration without suite id. - - Approval criteria: - - The configure endpoint shall return BadRequest when missing suite id. - - Test steps: - 1. Send a GET request to the configure endpoint without suite id. - 2. Verify that a BadRequest is returned. - """ - database = FakeDatabase() - Config().set("database", database) - response = FakeResponse() - request = FakeRequest() - self.logger.info("STEP: Send a GET request to the configure endpoint without suite id.") - with self.assertRaises(falcon.HTTPBadRequest): - self.logger.info("STEP: Verify that a BadRequest is returned.") - Configure().on_get(request, response) - - def test_configure(self): - """Test that it is possible to configure the environment provider for a suite. - - Approval criteria: - - It shall be possible to configure the environment provider. - - The configure endpoint shall return with the configured IUT, execution space & - log area provider. - - Test steps: - 1. Store some providers in the database. - 2. Send a configure request to use those providers. - 3. Verify that the configuration matches the providers in database. - """ - database = FakeDatabase() - Config().set("database", database) - test_iut_provider = { - "iut": { - "id": "iut_provider_test", - "list": {"available": [], "possible": []}, - } - } - test_execution_space_provider = { - "execution_space": { - "id": "execution_space_provider_test", - "list": {"available": [{"identifier": "123"}], "possible": []}, - } - } - test_log_area_provider = { - "log": { - "id": "log_area_provider_test", - "list": {"available": [], "possible": []}, - } - } - test_suite_id = "2a4cb06d-4ebf-4aaa-a53b-1293194827d8" - self.logger.info("STEP: Store some providers in the database.") - database.put( - f"/environment/provider/iut/{test_iut_provider['iut']['id']}", - json.dumps(test_iut_provider), - ) - provider_id = test_execution_space_provider["execution_space"]["id"] - database.put( - f"/environment/provider/execution-space/{provider_id}", - json.dumps(test_execution_space_provider), - ) - database.put( - f"/environment/provider/log-area/{test_log_area_provider['log']['id']}", - json.dumps(test_log_area_provider), - ) - - response = FakeResponse() - request = FakeRequest() - request.fake_params = { - "iut_provider": test_iut_provider["iut"]["id"], - "execution_space_provider": test_execution_space_provider["execution_space"]["id"], - "log_area_provider": test_log_area_provider["log"]["id"], - "dataset": {}, - "suite_id": test_suite_id, - } - self.logger.info("STEP: Send a configure request to use those providers.") - Configure().on_post(request, response) - - self.logger.info("STEP: Verify that the configuration matches the providers in database.") - self.assertEqual(response.status, falcon.HTTP_200) - stored_iut_provider = json.loads(database.get(f"/testrun/{test_suite_id}/provider/iut")[0]) - self.assertDictEqual(stored_iut_provider, test_iut_provider) - stored_execution_space_provider = json.loads( - database.get(f"/testrun/{test_suite_id}/provider/execution-space")[0] - ) - self.assertDictEqual(stored_execution_space_provider, test_execution_space_provider) - stored_log_area_provider = json.loads( - database.get(f"/testrun/{test_suite_id}/provider/log-area")[0] - ) - self.assertDictEqual(stored_log_area_provider, test_log_area_provider) - stored_dataset = json.loads(database.get(f"/testrun/{test_suite_id}/provider/dataset")[0]) - self.assertDictEqual(stored_dataset, {}) - - def test_configure_missing_parameters(self): - """Test that it is not possible to configure the environment provider without providers. - - Approval criteria: - - It shall not be possible to configure the environment provider when - missing parameters. - - Test steps: - 1. Store some providers in the database. - 2. For each parameter: - 2.1. Send a configure request missing that parameter. - 2.2. Verify that it was not possible to configure. - """ - database = FakeDatabase() - Config().set("database", database) - test_iut_provider = { - "iut": { - "id": "iut_provider_test", - "list": {"available": [], "possible": []}, - } - } - test_execution_space_provider = { - "execution_space": { - "id": "execution_space_provider_test", - "list": {"available": [{"identifier": "123"}], "possible": []}, - } - } - test_log_area_provider = { - "log": { - "id": "log_area_provider_test", - "list": {"available": [], "possible": []}, - } - } - self.logger.info("STEP: Store some providers in the database.") - database.put( - f"/environment/provider/iut/{test_iut_provider['iut']['id']}", - json.dumps(test_iut_provider), - ) - provider_id = test_execution_space_provider["execution_space"]["id"] - database.put( - f"/environment/provider/execution-space/{provider_id}", - json.dumps(test_execution_space_provider), - ) - database.put( - f"/environment/provider/log-area/{test_log_area_provider['log']['id']}", - json.dumps(test_log_area_provider), - ) - - response = FakeResponse() - request = FakeRequest() - test_params = { - "iut_provider": test_iut_provider["iut"]["id"], - "execution_space_provider": test_execution_space_provider["execution_space"]["id"], - "log_area_provider": test_log_area_provider["log"]["id"], - "dataset": {}, - } - self.logger.info("STEP: For each parameter:") - for parameter in ( - "iut_provider", - "log_area_provider", - "execution_space_provider", - "dataset", - "suite_id", - ): - self.logger.info("Missing parameter: %s", parameter) - # Make sure we get a new suite id for each test. - # that way we don't have to clear the database every time. - test_suite_id = str(uuid4()) - test_params["suite_id"] = test_suite_id - - request.fake_params = test_params.copy() - request.fake_params.pop(parameter) - - self.logger.info("STEP: Send a configure request missing that parameter.") - with self.assertRaises(falcon.HTTPBadRequest): - Configure().on_post(request, response) - - self.logger.info("STEP: Verify that it was not possible to configure.") - self.assertListEqual(database.get(f"/testrun/{test_suite_id}/provider/iut"), []) - self.assertListEqual( - database.get(f"/testrun/{test_suite_id}/provider/execution-space"), [] - ) - self.assertListEqual(database.get(f"/testrun/{test_suite_id}/provider/log-area"), []) - self.assertListEqual(database.get(f"/testrun/{test_suite_id}/provider/dataset"), []) diff --git a/tests/test_webserver_environment.py b/tests/test_webserver_environment.py deleted file mode 100644 index 9b554ff..0000000 --- a/tests/test_webserver_environment.py +++ /dev/null @@ -1,193 +0,0 @@ -# Copyright Axis Communications AB. -# -# For a full list of individual contributors, please see the commit history. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""Tests for webserver. Specifically the environment endpoint.""" -import json -import logging -import unittest - -import falcon -from etos_lib.lib.config import Config -from mock import patch - -from environment_provider_api.webserver import Webserver -from tests.library.fake_celery import FakeCelery, Task -from tests.library.fake_database import FakeDatabase -from tests.library.fake_request import FakeRequest, FakeResponse - - -class TestEnvironment(unittest.TestCase): - """Tests for the environment endpoint.""" - - logger = logging.getLogger(__name__) - - def tearDown(self): - """Reset all globally stored data for the next test.""" - Config().reset() - - def test_release_environment(self): # pylint:disable=too-many-locals - """Test that it is possible to release an environment. - - Approval criteria: - - It shall be possible to release an environment. - - Test steps: - 1. Store an environment i celery task. - 2. Send a release request for that environment. - 3. Verify that the environment was released. - """ - database = FakeDatabase() - Config().set("database", database) - test_iut_provider = { - "iut": { - "id": "iut_provider_test", - "list": {"available": [], "possible": []}, - } - } - test_execution_space_provider = { - "execution_space": { - "id": "execution_space_provider_test", - "list": {"available": [{"identifier": "123"}], "possible": []}, - } - } - test_log_area_provider = { - "log": { - "id": "log_area_provider_test", - "list": {"available": [], "possible": []}, - } - } - database.put( - f"/environment/provider/iut/{test_iut_provider['iut']['id']}", - json.dumps(test_iut_provider), - ) - provider_id = test_execution_space_provider["execution_space"]["id"] - database.put( - f"/environment/provider/execution-space/{provider_id}", - json.dumps(test_execution_space_provider), - ) - database.put( - f"/environment/provider/log-area/{test_log_area_provider['log']['id']}", - json.dumps(test_log_area_provider), - ) - task_id = "d9689ea5-837b-48c1-87b1-3de122b3f2fe" - suite_id = "6d779f97-67e7-4dfa-8260-c053fc0d7a2c" - database.put(f"/environment/{task_id}/suite-id", suite_id) - database.put(f"/testrun/{suite_id}/environment-provider/task-id", task_id) - request = FakeRequest() - request.fake_params = {"release": task_id} - response = FakeResponse() - - iut = {"id": "test_iut", "provider_id": test_iut_provider["iut"]["id"]} - executor = { - "id": "test_executor", - "provider_id": test_execution_space_provider["execution_space"]["id"], - } - log_area = { - "id": "test_log_area", - "provider_id": test_log_area_provider["log"]["id"], - } - - self.logger.info("STEP: Store an environment i celery task.") - test_status = "SUCCESS" - worker = FakeCelery( - task_id, - test_status, - { - "suites": [ - { - "iut": iut, - "executor": executor, - "log_area": log_area, - } - ] - }, - ) - - self.logger.info("STEP: Send a release request for that environment.") - environment = Webserver(worker) - environment.on_get(request, response) - - self.logger.info("STEP: Verify that the environment was released.") - self.assertDictEqual(response.media, {"status": test_status}) - self.assertIsNone(worker.results.get(task_id)) - - def test_get_environment_status(self): - """Test that it is possible to get status from an environment. - - Approval criteria: - - It shall be possible to get status from environment that is being checked out. - - Test steps: - 1. Store a PENDING environment request in a celery task. - 2. Send a status request for that environment. - 3. Verify that the status for the environment was returned. - """ - task_id = "d9689ea5-837b-48c1-87b1-3de122b3f2fe" - database = FakeDatabase() - Config().set("database", database) - request = FakeRequest() - request.fake_params = {"id": task_id} - response = FakeResponse() - test_result = {"this": "is", "results": ":)"} - test_status = "PENDING" - - self.logger.info("STEP: Store a PENDING environment request in a celery task.") - celery_worker = FakeCelery(task_id, test_status, test_result) - - self.logger.info("STEP: Send a status request for that environment.") - environment = Webserver(celery_worker) - environment.on_get(request, response) - - self.logger.info("STEP: Verify that the status for the environment was returned.") - self.assertEqual(response.status, falcon.HTTP_200) - self.assertDictEqual(response.media, {"status": test_status, "result": test_result}) - - @patch("environment_provider_api.backend.environment.get_environment") - def test_get_environment(self, get_environment_mock): - """Test that it is possible to get environments from the environment provider. - - Approval criteria: - - It shall be possible to get an environment from the environment provider. - - Note: - - This test is mocked due to not wanting to run celery tasks. - - Test steps: - 1. Send a request for an environment. - 2. Verify that the environment provider gets an environment. - """ - task_id = "f3286e6e-946c-4510-a935-abd7c7bdbe17" - database = FakeDatabase() - Config().set("database", database) - get_environment_mock.delay.return_value = Task(task_id) - celery_worker = FakeCelery(task_id, "", {}) - suite_id = "ca950c50-03d3-4a3c-8507-b4229dd3f8ea" - suite_runner_ids = ( - "835cd892-7eda-408a-9e4c-84aaa71d05be,50146754-8b4f-4253-b5a9-2ee56960612c" - ) - request = FakeRequest() - request.fake_params = { - "suite_id": suite_id, - "suite_runner_ids": suite_runner_ids, - } - response = FakeResponse() - - self.logger.info("STEP: Send a request for an environment.") - environment = Webserver(celery_worker) - environment.on_post(request, response) - - self.logger.info("STEP: Verify that the environment provider gets an environment.") - self.assertEqual(response.media, {"result": "success", "data": {"id": task_id}}) - get_environment_mock.delay.assert_called_once_with(suite_id, suite_runner_ids.split(",")) diff --git a/tests/test_webserver_register.py b/tests/test_webserver_register.py deleted file mode 100644 index cf74681..0000000 --- a/tests/test_webserver_register.py +++ /dev/null @@ -1,113 +0,0 @@ -# Copyright Axis Communications AB. -# -# For a full list of individual contributors, please see the commit history. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""Tests for webserver. Specifically the register endpoint.""" -import json -import logging -import unittest - -import falcon -from etos_lib.lib.config import Config - -from environment_provider_api.webserver import Register -from tests.library.fake_database import FakeDatabase -from tests.library.fake_request import FakeRequest, FakeResponse - - -class TestRegister(unittest.TestCase): - """Tests for the register endpoint.""" - - logger = logging.getLogger(__name__) - - def tearDown(self): - """Reset all globally stored data for the next test.""" - Config().reset() - - def test_register_all_providers(self): - """Test that it is possible to register providers via the register endpoint - - Approval criteria: - - It shall be possible to register providers using the endpoint. - - Test steps: - 1. Send a register request for new providers. - 2. Verify that the new providers were registered in the database. - """ - database = FakeDatabase() - Config().set("database", database) - test_iut_provider = { - "iut": { - "id": "iut_provider_test", - "list": {"available": [], "possible": []}, - } - } - test_execution_space_provider = { - "execution_space": { - "id": "execution_space_provider_test", - "list": {"available": [{"identifier": "123"}], "possible": []}, - } - } - test_log_area_provider = { - "log": { - "id": "log_area_provider_test", - "list": {"available": [], "possible": []}, - } - } - fake_request = FakeRequest() - fake_request.fake_params = { - "iut_provider": json.dumps(test_iut_provider), - "execution_space_provider": json.dumps(test_execution_space_provider), - "log_area_provider": json.dumps(test_log_area_provider), - } - fake_response = FakeResponse() - self.logger.info("STEP: Send a register request for new providers.") - Register().on_post(fake_request, fake_response) - - self.logger.info("STEP: Verify that the new providers were registered in the database.") - self.assertEqual(fake_response.fake_responses.get("status"), falcon.HTTP_204) - stored_execution_space_provider = json.loads( - database.get("/environment/provider/execution-space/execution_space_provider_test")[0] - ) - self.assertDictEqual( - stored_execution_space_provider, - test_execution_space_provider, - ) - stored_log_area_provider = json.loads( - database.get("/environment/provider/log-area/log_area_provider_test")[0] - ) - self.assertDictEqual(stored_log_area_provider, test_log_area_provider) - stored_iut_provider = json.loads( - database.get("/environment/provider/iut/iut_provider_test")[0] - ) - self.assertDictEqual(stored_iut_provider, test_iut_provider) - - def test_register_no_providers(self): - """Test that it is not possible to register no providers. - - Approval criteria: - - It shall not be possible to register no providers. - - Test steps: - 1. Send a register request with no providers. - 2. Verify that a 400 Bad Request is returned. - """ - database = FakeDatabase() - Config().set("database", database) - fake_request = FakeRequest() - fake_response = FakeResponse() - self.logger.info("STEP: Send a register request with no providers.") - with self.assertRaises(falcon.HTTPBadRequest): - self.logger.info("STEP: Verify that a 400 Bad Request is returned.") - Register().on_post(fake_request, fake_response)