diff --git a/docs/conf.py b/docs/conf.py index e4c6495..cc72923 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -92,8 +92,8 @@ master_doc = "index" # General information about the project. -project = u"etos_suite_runner" -copyright = u"2020, Axis Communications AB" +project = "etos_suite_runner" +copyright = "Axis Communications AB" # The version info for the project you're documenting, acts as replacement for # |version| and |release|, also used in various other places throughout the @@ -241,7 +241,7 @@ # Grouping the document tree into LaTeX files. List of tuples # (source start file, target name, title, author, documentclass [howto/manual]). latex_documents = [ - ("index", "user_guide.tex", u"etos_suite_runner Documentation", u"", "manual"), + ("index", "user_guide.tex", "etos_suite_runner Documentation", "", "manual"), ] # The name of an image file (relative to this directory) to place at the top of diff --git a/pylintrc b/pylintrc index a7b08fa..ea32629 100644 --- a/pylintrc +++ b/pylintrc @@ -1,3 +1,3 @@ [messages control] disable= - duplicate-code + duplicate-code,fixme \ No newline at end of file diff --git a/src/etos_suite_runner/__main__.py b/src/etos_suite_runner/__main__.py index 1a4672a..a83b542 100644 --- a/src/etos_suite_runner/__main__.py +++ b/src/etos_suite_runner/__main__.py @@ -1,5 +1,5 @@ #!/usr/bin/env python -# Copyright 2020-2021 Axis Communications AB. +# Copyright 2020-2022 Axis Communications AB. # # For a full list of individual contributors, please see the commit history. # @@ -20,12 +20,15 @@ import logging import traceback import signal +import threading +from uuid import uuid4 from etos_lib import ETOS from etos_lib.logging.logger import FORMAT_CONFIG from etos_suite_runner.lib.runner import SuiteRunner from etos_suite_runner.lib.esr_parameters import ESRParameters +from etos_suite_runner.lib.exceptions import EnvironmentProviderException # Remove spam from pika. logging.getLogger("pika").setLevel(logging.WARNING) @@ -34,15 +37,6 @@ BASE_DIR = os.path.dirname(os.path.relpath(__file__)) -class EnvironmentProviderException(Exception): - """Exception from EnvironmentProvider.""" - - def __init__(self, msg, task_id): - """Initialize with task_id.""" - self.task_id = task_id - super().__init__(msg) - - class ESR: # pylint:disable=too-many-instance-attributes """Suite runner for ETOS main program. @@ -67,13 +61,19 @@ def __init__(self): int(os.getenv("ESR_WAIT_FOR_ENVIRONMENT_TIMEOUT")), ) - def _request_environment(self): + def _request_environment(self, ids): """Request an environment from the environment provider. + :param ids: Generated suite runner IDs used to correlate environments and the suite + runners. + :type ids: list :return: Task ID and an error message. :rtype: tuple """ - params = {"suite_id": self.params.tercc.meta.event_id} + params = { + "suite_id": self.params.tercc.meta.event_id, + "suite_runner_ids": ",".join(ids), + } wait_generator = self.etos.http.retry( "POST", self.etos.debug.environment_provider, json=params ) @@ -92,13 +92,11 @@ def _request_environment(self): return None, str(exception) return task_id, "" - def _wait_for_environment(self, task_id): + def _get_environment_status(self, task_id): """Wait for an environment being provided. :param task_id: Task ID to wait for. :type task_id: str - :return: Environment and an error message. - :rtype: tuple """ timeout = self.etos.config.get("WAIT_FOR_ENVIRONMENT_TIMEOUT") wait_generator = self.etos.utils.wait( @@ -107,30 +105,26 @@ def _wait_for_environment(self, task_id): timeout=timeout, params={"id": task_id}, ) - environment = None result = {} response = None for generator in wait_generator: for response in generator: - result = response.get("result", {}) - if response and result and result.get("error") is None: - environment = response + result = ( + response.get("result", {}) + if response.get("result") is not None + else {} + ) + self.params.set_status(response.get("status"), result.get("error")) + if response and result: break - if result and result.get("error"): - return None, result.get("error") - if environment is not None: + if response and result: break else: - if result and result.get("error"): - return None, result.get("error") - return ( - None, - ( - "Unknown Error: Did not receive an environment " - f"within {self.etos.debug.default_http_timeout}s" - ), + self.params.set_status( + "FAILURE", + "Unknown Error: Did not receive an environment " + f"within {self.etos.debug.default_http_timeout}s", ) - return environment, "" def _release_environment(self, task_id): """Release an environment from the environment provider. @@ -145,21 +139,23 @@ def _release_environment(self, task_id): if response: break - def _reserve_workers(self): - """Reserve workers for test.""" + def _reserve_workers(self, ids): + """Reserve workers for test. + + :param ids: Generated suite runner IDs used to correlate environments and the suite + runners. + :type ids: list + :return: The environment provider task ID + :rtype: str + """ LOGGER.info("Request environment from environment provider") - task_id, msg = self._request_environment() + task_id, msg = self._request_environment(ids) if task_id is None: raise EnvironmentProviderException(msg, task_id) + return task_id - LOGGER.info("Wait for environment to become ready.") - environment, msg = self._wait_for_environment(task_id) - if environment is None: - raise EnvironmentProviderException(msg, task_id) - return environment, task_id - - def run_suite(self, triggered): - """Trigger an activity and starts the actual test runner. + def run_suites(self, triggered): + """Start up a suite runner handling multiple suites that execute within test runners. Will only start the test activity if there's a 'slot' available. @@ -173,22 +169,30 @@ def run_suite(self, triggered): ) runner = SuiteRunner(self.params, self.etos, context) + ids = [] + for suite in self.params.test_suite: + suite["test_suite_started_id"] = str(uuid4()) + ids.append(suite["test_suite_started_id"]) + task_id = None try: LOGGER.info("Wait for test environment.") - environment, task_id = self._reserve_workers() + task_id = self._reserve_workers(ids) + self.etos.config.set("task_id", task_id) + threading.Thread( + target=self._get_environment_status, args=(task_id,), daemon=True + ).start() self.etos.events.send_activity_started(triggered, {"CONTEXT": context}) LOGGER.info("Starting ESR.") - runner.run(environment.get("result")) + runner.start_suites_and_wait() except EnvironmentProviderException as exception: task_id = exception.task_id - raise - finally: LOGGER.info("Release test environment.") if task_id is not None: self._release_environment(task_id) + raise @staticmethod def verify_input(): @@ -239,7 +243,7 @@ def run(self): raise try: - self.run_suite(triggered) + self.run_suites(triggered) self.etos.events.send_activity_finished( triggered, {"conclusion": "SUCCESSFUL"}, {"CONTEXT": context} ) diff --git a/src/etos_suite_runner/lib/esr_parameters.py b/src/etos_suite_runner/lib/esr_parameters.py index 6de1338..b3faf1c 100644 --- a/src/etos_suite_runner/lib/esr_parameters.py +++ b/src/etos_suite_runner/lib/esr_parameters.py @@ -1,4 +1,4 @@ -# Copyright 2020 Axis Communications AB. +# Copyright 2020-2022 Axis Communications AB. # # For a full list of individual contributors, please see the commit history. # @@ -17,6 +17,7 @@ import os import json import logging +from threading import Lock from packageurl import PackageURL from eiffellib.events import EiffelTestExecutionRecipeCollectionCreatedEvent @@ -44,11 +45,20 @@ class ESRParameters: """Parameters required for ESR.""" logger = logging.getLogger("ESRParameters") + lock = Lock() + __test_suite = None def __init__(self, etos): """ESR parameters instance.""" self.etos = etos self.issuer = {"name": "ETOS Suite Runner"} + self.environment_status = {"status": "NOT_STARTED", "error": None} + + def set_status(self, status, error): + """Set environment provider status.""" + with self.lock: + self.environment_status["status"] = status + self.environment_status["error"] = error def get_node(self, response): """Get a single node from a GraphQL response. @@ -106,6 +116,28 @@ def tercc(self): self.etos.config.set("tercc", tercc) return self.etos.config.get("tercc") + @property + def test_suite(self): + """Download and return test batches. + + :return: Batches. + :rtype: list + """ + with self.lock: + if self.__test_suite is None: + tercc = self.tercc.json + batch_uri = tercc.get("data", {}).get("batchesUri") + json_header = {"Accept": "application/json"} + json_response = self.etos.http.wait_for_request( + batch_uri, + headers=json_header, + ) + response = {} + for response in json_response: + break + self.__test_suite = response + return self.__test_suite if self.__test_suite else [] + @property def product(self): """Product name from artifact created event. diff --git a/src/etos_suite_runner/lib/exceptions.py b/src/etos_suite_runner/lib/exceptions.py new file mode 100644 index 0000000..29cc663 --- /dev/null +++ b/src/etos_suite_runner/lib/exceptions.py @@ -0,0 +1,25 @@ +# Copyright 2022 Axis Communications AB. +# +# For a full list of individual contributors, please see the commit history. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""ESR exceptions.""" + + +class EnvironmentProviderException(Exception): + """Exception from EnvironmentProvider.""" + + def __init__(self, msg, task_id): + """Initialize with task_id.""" + self.task_id = task_id + super().__init__(msg) diff --git a/src/etos_suite_runner/lib/executor.py b/src/etos_suite_runner/lib/executor.py index 11866f2..4ccf6b1 100644 --- a/src/etos_suite_runner/lib/executor.py +++ b/src/etos_suite_runner/lib/executor.py @@ -1,4 +1,4 @@ -# Copyright 2020 Axis Communications AB. +# Copyright 2020-2022 Axis Communications AB. # # For a full list of individual contributors, please see the commit history. # @@ -52,8 +52,8 @@ def __auth(username, password, type="basic"): # pylint:disable=redefined-builti def run_tests(self, test_suite): """Run tests in jenkins. - :param test_file: Tests to execute. - :type test_file: dict + :param test_suite: Tests to execute. + :type test_suite: dict """ executor = test_suite.get("executor") request = executor.get("request") diff --git a/src/etos_suite_runner/lib/graphql.py b/src/etos_suite_runner/lib/graphql.py index 233f54e..4eb3c7e 100644 --- a/src/etos_suite_runner/lib/graphql.py +++ b/src/etos_suite_runner/lib/graphql.py @@ -1,4 +1,4 @@ -# Copyright 2020-2021 Axis Communications AB. +# Copyright 2020-2022 Axis Communications AB. # # For a full list of individual contributors, please see the commit history. # @@ -15,9 +15,9 @@ # limitations under the License. """Graphql query handler module.""" from .graphql_queries import ( - ACTIVITY_TRIGGERED, TEST_SUITE_STARTED, TEST_SUITE_FINISHED, + ENVIRONMENTS, ) @@ -35,39 +35,17 @@ def request(etos, query): yield from wait_generator -def request_activity(etos, suite_id): - """Request an activity event from graphql. - - :param etos: ETOS client instance. - :type etos: :obj:`etos_lib.etos.Etos` - :param suite_id: ID of execution recipe triggering this activity. - :type suite_id: str - :return: Response from graphql or None - :rtype: dict or None - """ - for response in request(etos, ACTIVITY_TRIGGERED % suite_id): - if response: - try: - _, activity = next( - etos.graphql.search_for_nodes(response, "activityTriggered") - ) - except StopIteration: - return None - return activity - return None - - -def request_test_suite_started(etos, activity_id): +def request_test_suite_started(etos, main_suite_id): """Request test suite started from graphql. :param etos: ETOS client instance. :type etos: :obj:`etos_lib.etos.Etos` - :param activity_id: ID of activity in which the test suites started - :type activity_id: str + :param main_suite_id: ID of test suite which caused the test suites started + :type main_suite_id: str :return: Iterator of test suite started graphql responses. :rtype: iterator """ - for response in request(etos, TEST_SUITE_STARTED % activity_id): + for response in request(etos, TEST_SUITE_STARTED % main_suite_id): if response: for _, test_suite_started in etos.graphql.search_for_nodes( response, "testSuiteStarted" @@ -103,3 +81,23 @@ def request_test_suite_finished(etos, test_suite_ids): yield test_suite_finished return None # StopIteration return None # StopIteration + + +def request_environment_defined(etos, activity_id): + """Request environment defined from graphql. + + :param etos: ETOS client instance. + :type etos: :obj:`etos_lib.etos.Etos` + :param activity_id: ID of activity in which the environment defined are sent + :type activity_id: str + :return: Iterator of environment defined graphql responses. + :rtype: iterator + """ + for response in request(etos, ENVIRONMENTS % activity_id): + if response: + for _, environment in etos.graphql.search_for_nodes( + response, "environmentDefined" + ): + yield environment + return None # StopIteration + return None # StopIteration diff --git a/src/etos_suite_runner/lib/graphql_queries.py b/src/etos_suite_runner/lib/graphql_queries.py index c4e0475..d127541 100644 --- a/src/etos_suite_runner/lib/graphql_queries.py +++ b/src/etos_suite_runner/lib/graphql_queries.py @@ -1,4 +1,4 @@ -# Copyright 2020 Axis Communications AB. +# Copyright 2020-2022 Axis Communications AB. # # For a full list of individual contributors, please see the commit history. # @@ -14,11 +14,16 @@ # See the License for the specific language governing permissions and # limitations under the License. """Graphql queries.""" -ACTIVITY_TRIGGERED = """ +TEST_SUITE_STARTED = """ { - activityTriggered(search: "{'links.type': 'CAUSE', 'links.target': '%s'}") { + testSuiteStarted(search:"{'links.type': 'CAUSE', 'links.target': '%s'}") { edges { node { + data { + testSuiteCategories { + type + } + } meta { id } @@ -28,34 +33,33 @@ } """ -TEST_SUITE_STARTED = """ +TEST_SUITE_FINISHED = """ { - testSuiteStarted(search:"{'links.type': 'CONTEXT', 'links.target': '%s'}") { + testSuiteFinished(search: "%s") { edges { node { data { - testSuiteCategories { - type + testSuiteOutcome { + verdict } } - meta { - id - } } } } } """ -TEST_SUITE_FINISHED = """ +ENVIRONMENTS = """ { - testSuiteFinished(search: "%s") { + environmentDefined(search:"{'links.type': 'CONTEXT', 'links.target': '%s'}") { edges { node { data { - testSuiteOutcome { - verdict - } + name + uri + } + meta { + id } } } diff --git a/src/etos_suite_runner/lib/log_filter.py b/src/etos_suite_runner/lib/log_filter.py index 2ef80d8..a07d0d4 100644 --- a/src/etos_suite_runner/lib/log_filter.py +++ b/src/etos_suite_runner/lib/log_filter.py @@ -40,7 +40,7 @@ def filter(self, record): :return: Whether or not to filter. :rtype: bool """ - msg = str(record.msg) + msg = f"{record.msg}{record.args}" is_duplicate = msg in self.msgs if not is_duplicate: self.msgs.append(msg) diff --git a/src/etos_suite_runner/lib/result_handler.py b/src/etos_suite_runner/lib/result_handler.py index 96fa019..672c657 100644 --- a/src/etos_suite_runner/lib/result_handler.py +++ b/src/etos_suite_runner/lib/result_handler.py @@ -18,7 +18,6 @@ import logging from .log_filter import DuplicateFilter from .graphql import ( - request_activity, request_test_suite_finished, request_test_suite_started, ) @@ -27,19 +26,24 @@ class ResultHandler: """ESR test result handler.""" - activity_id = None - logger = logging.getLogger("ESR - ResultHandler") + results = None + expected_number_of_suites = 0 - def __init__(self, etos): + def __init__(self, etos, test_suite_started): """ESR test result handler.""" self.etos = etos + self.test_suite_started = test_suite_started.json + test_suite_name = self.test_suite_started["meta"]["id"] + self.logger = logging.getLogger(f"ESR - ResultHandler({test_suite_name})") self.events = {} @property def has_started(self): """Whether or not test suites have started.""" - expected_number_of_suites = self.etos.config.get("nbr_of_suites") - return len(self.events.get("subSuiteStarted", [])) == expected_number_of_suites + return ( + len(self.events.get("subSuiteStarted", [])) + == self.expected_number_of_suites + ) @property def has_finished(self): @@ -54,8 +58,7 @@ def has_finished(self): if not self.events.get("subSuiteFinished"): return False nbr_of_finished = len(self.events.get("subSuiteFinished")) - expected_number_of_suites = self.etos.config.get("nbr_of_suites") - return nbr_of_finished == expected_number_of_suites + return nbr_of_finished == self.expected_number_of_suites @property def test_suites_finished(self): @@ -73,7 +76,7 @@ def test_results(self): conclusion = "SUCCESSFUL" description = "" - if self.etos.config.get("results") is None: + if self.results is None: verdict = "INCONCLUSIVE" conclusion = "FAILED" description = "Did not receive test results from sub suites." @@ -95,42 +98,29 @@ def test_results(self): description = "No description received from ESR or ETR." return verdict, conclusion, description - def get_events(self, suite_id): + def get_events(self): """Get events from an activity started from suite id. - :param suite_id: ID of test execution recipe that triggered this activity. - :type suite_id: str :return: Dictionary of all events generated for this suite. :rtype: dict """ self.logger.info("Requesting events from GraphQL") - if self.activity_id is None: - self.logger.info("Getting activity ID.") - activity = request_activity(self.etos, suite_id) - if activity is None: - self.logger.warning("Activity ID not found yet.") - return - self.logger.info("Activity id: %r", activity["meta"]["id"]) - self.activity_id = activity["meta"]["id"] - - expected_number_of_suites = self.etos.config.get("nbr_of_suites") - self.logger.info("Expected number of suites: %r", expected_number_of_suites) + self.logger.info( + "Expected number of suites: %r", self.expected_number_of_suites + ) events = { "subSuiteStarted": [], "subSuiteFinished": [], } - main_suite = self.etos.config.get("test_suite_started") - self.logger.info("Main suite: %r", main_suite["meta"]["id"]) - if len(self.events.get("subSuiteStarted", [])) != expected_number_of_suites: + main_suite_id = self.test_suite_started["meta"]["id"] + self.logger.info("Main suite: %r", main_suite_id) + if ( + len(self.events.get("subSuiteStarted", [])) + != self.expected_number_of_suites + ): self.logger.info("Getting subSuiteStarted") - started = [ - test_suite_started - for test_suite_started in request_test_suite_started( - self.etos, self.activity_id - ) - if test_suite_started["meta"]["id"] != main_suite["meta"]["id"] - ] + started = list(request_test_suite_started(self.etos, main_suite_id)) if not started: self.logger.info("No subSuitesStarted yet.") self.events = events @@ -143,7 +133,10 @@ def get_events(self, suite_id): started_ids = [ test_suite_started["meta"]["id"] for test_suite_started in started ] - if len(self.events.get("subSuiteFinished", [])) != expected_number_of_suites: + if ( + len(self.events.get("subSuiteFinished", [])) + != self.expected_number_of_suites + ): self.logger.info("Getting subSuiteFinished") finished = list(request_test_suite_finished(self.etos, started_ids)) if not finished: @@ -155,20 +148,23 @@ def get_events(self, suite_id): events["subSuiteFinished"] = self.events.get("subSuiteFinished", []) self.events = events - def wait_for_test_suite_finished(self): - """Wait for test suites to finish.""" - tercc = self.etos.config.get("tercc") + def wait_for_test_suite_finished(self, expected): + """Wait for test suites to finish. + + :param expected: Expected number of test suites. + :type expected: int + """ + self.expected_number_of_suites = expected timeout = time.time() + self.etos.debug.default_test_result_timeout print_once = False with DuplicateFilter(self.logger): while time.time() < timeout: time.sleep(10) - self.get_events(tercc.meta.event_id) - expected_number_of_suites = self.etos.config.get("nbr_of_suites") + self.get_events() self.logger.info( "Expected number of test suites: %r, currently active: %r", - expected_number_of_suites, + self.expected_number_of_suites, len(self.events.get("subSuiteStarted", [])), ) if not self.has_started: @@ -178,7 +174,7 @@ def wait_for_test_suite_finished(self): self.logger.info("Test suites have started.") if self.has_finished: self.logger.info("Test suites have finished.") - self.etos.config.set("results", self.events) + self.results = self.events return True self.logger.info("Waiting for test suites to finish.") diff --git a/src/etos_suite_runner/lib/runner.py b/src/etos_suite_runner/lib/runner.py index ba535f2..65d107c 100644 --- a/src/etos_suite_runner/lib/runner.py +++ b/src/etos_suite_runner/lib/runner.py @@ -1,4 +1,4 @@ -# Copyright 2020-2021 Axis Communications AB. +# Copyright 2020-2022 Axis Communications AB. # # For a full list of individual contributors, please see the commit history. # @@ -16,10 +16,18 @@ """ETOS suite runner executor.""" import logging import time -import threading +from threading import Lock, Thread +from multiprocessing.pool import ThreadPool + +from etos_lib.logging.logger import FORMAT_CONFIG +from eiffellib.events.eiffel_test_suite_started_event import EiffelTestSuiteStartedEvent from etos_suite_runner.lib.result_handler import ResultHandler from etos_suite_runner.lib.executor import Executor +from etos_suite_runner.lib.exceptions import EnvironmentProviderException +from etos_suite_runner.lib.graphql import ( + request_environment_defined, +) class SuiteRunner: # pylint:disable=too-few-public-methods @@ -29,8 +37,9 @@ class SuiteRunner: # pylint:disable=too-few-public-methods Starts ETOS test runner (ETR) and sends out a test suite finished. """ - test_suite_started = None - result_handler = None + lock = Lock() + environment_provider_done = False + error = False logger = logging.getLogger("ESR - Runner") def __init__(self, params, etos, context): @@ -45,37 +54,133 @@ def __init__(self, params, etos, context): """ self.params = params self.etos = etos - self.result_handler = ResultHandler(self.etos) - self.context = context + self.sub_suites = {} - def _run_etr_and_wait(self, environment): - """Run ETR based on number of IUTs and wait for them to finish. + def _release_environment(self, task_id): + """Release an environment from the environment provider. + + :param task_id: Task ID to release. + :type task_id: str + """ + wait_generator = self.etos.http.wait_for_request( + self.etos.debug.environment_provider, params={"release": task_id} + ) + for response in wait_generator: + if response: + break + + def _run_etr(self, environment): + """Trigger an instance of ETR. :param environment: Environment which to execute in. :type environment: dict - :return: List of test results from all ETR instances. - :rtype: list """ + executor = Executor(self.etos) + executor.run_tests(environment) + + def _environments(self): + """Get environments for all test suites in this ETOS run.""" + FORMAT_CONFIG.identifier = self.params.tercc.meta.event_id + downloaded = [] + status = { + "status": "FAILURE", + "error": "Couldn't collect any error information", + } + timeout = time.time() + self.etos.config.get("WAIT_FOR_ENVIRONMENT_TIMEOUT") + while time.time() < timeout: + status = self.params.environment_status + self.logger.info(status) + for environment in request_environment_defined(self.etos, self.context): + if environment["meta"]["id"] in downloaded: + continue + suite = self._download_sub_suite(environment) + if self.error: + break + downloaded.append(environment["meta"]["id"]) + if suite is None: # Not a real sub suite environment defined event. + continue + with self.lock: + self.sub_suites.setdefault(suite["test_suite_started_id"], []) + self.sub_suites[suite["test_suite_started_id"]].append(suite) + # We must have found at least one environment for each test suite. + if status["status"] != "PENDING" and len(downloaded) >= len( + self.params.test_suite + ): + self.environment_provider_done = True + break + time.sleep(5) + if status["status"] == "FAILURE": + self.error = EnvironmentProviderException( + status["error"], self.etos.config.get("task_id") + ) + + def _download_sub_suite(self, environment): + """Download a sub suite from an EnvironmentDefined event. + + :param environment: Environment defined event to download from. + :type environment: dict + :return: Downloaded sub suite information. + :rtype: dict + """ + if environment["data"].get("uri") is None: + return None + uri = environment["data"]["uri"] + json_header = {"Accept": "application/json"} + json_response = self.etos.http.wait_for_request( + uri, + headers=json_header, + ) + suite = {} + for suite in json_response: + break + else: + self.error = Exception("Could not download sub suite instructions") + return suite + + def _sub_suites(self, main_suite_id): + """Get all sub suites that correlates with ID. + + :param main_suite_id: Main suite ID to correlate sub suites to. + :type main_suite_id: str + :return: Each correlated sub suite. + :rtype: Iterator + """ + while not self.error: + downloaded_all = self.environment_provider_done + time.sleep(1) + with self.lock: + sub_suites = self.sub_suites.get(main_suite_id, []).copy() + for sub_suite in sub_suites: + with self.lock: + self.sub_suites[main_suite_id].remove(sub_suite) + yield sub_suite + if downloaded_all: + break + + def start_sub_suites(self, suite): + """Start up all sub suites within a TERCC suite. + + :param suite: TERCC suite to start up sub suites from. + :type suite: dict + """ + suite_name = suite.get("name") self.etos.events.send_announcement_published( "[ESR] Starting tests.", "Starting test suites on all checked out IUTs.", "MINOR", {"CONTEXT": self.context}, ) - self.etos.config.set("nbr_of_suites", len(environment.get("suites", []))) + self.logger.info("Starting sub suites for %r", suite_name) + started = [] + for sub_suite in self._sub_suites(suite["test_suite_started_id"]): + started.append(sub_suite) - executor = Executor(self.etos) - threads = [] - for suite in environment.get("suites", []): - thread = threading.Thread(target=executor.run_tests, args=(suite,)) - threads.append(thread) - thread.start() - time.sleep(5) - self.logger.info("Test suites triggered.") - for thread in threads: - thread.join() - self.logger.info("Test suites started.") + self.logger.info("Triggering sub suite %r", sub_suite["name"]) + self._run_etr(sub_suite) + self.logger.info("%r Triggered", sub_suite["name"]) + time.sleep(1) + self.logger.info("All %d sub suites for %r started", len(started), suite_name) self.etos.events.send_announcement_published( "[ESR] Waiting.", @@ -83,36 +188,40 @@ def _run_etr_and_wait(self, environment): "MINOR", {"CONTEXT": self.context}, ) + return started - self.logger.info("Wait for test results.") - self.result_handler.wait_for_test_suite_finished() + def start_suite(self, suite): + """Send test suite events and launch test runners. - def run(self, environment): - """Run the suite runner. - - :param environment: Environment in which to run the suite. - :type environment: dict + :param suite: Test suite to start. + :type suite: dict """ - self.logger.info("Started.") + FORMAT_CONFIG.identifier = self.params.tercc.meta.event_id + suite_name = suite.get("name") + self.logger.info("Starting %s.", suite_name) categories = ["Regression test suite"] if self.params.product: categories.append(self.params.product) - test_suite_started = self.etos.events.send_test_suite_started( - environment.get("suite_name"), - {"CONTEXT": self.context}, - categories=categories, - types=["FUNCTIONAL"], - ) - self.etos.config.set("test_suite_started", test_suite_started.json) + + test_suite_started = EiffelTestSuiteStartedEvent() + + # This ID has been stored in Environment so that the ETR know which test suite to link to. + test_suite_started.meta.event_id = suite.get("test_suite_started_id") + data = {"name": suite_name, "categories": categories, "types": ["FUNCTIONAL"]} + links = {"CONTEXT": self.context} + self.etos.events.send(test_suite_started, links, data) verdict = "INCONCLUSIVE" conclusion = "INCONCLUSIVE" description = "" + result_handler = ResultHandler(self.etos, test_suite_started) try: - self._run_etr_and_wait(environment) - verdict, conclusion, description = self.result_handler.test_results() + started = self.start_sub_suites(suite) + self.logger.info("Wait for test results.") + result_handler.wait_for_test_suite_finished(len(started)) + verdict, conclusion, description = result_handler.test_results() time.sleep(5) except Exception as exc: conclusion = "FAILED" @@ -128,4 +237,19 @@ def run(self, environment): "description": description, }, ) + # TODO: Add releasing of environment defined IDs when that is supported self.logger.info("Test suite finished.") + + def start_suites_and_wait(self): + """Get environments and start all test suites.""" + Thread(target=self._environments, daemon=True).start() + try: + with ThreadPool() as pool: + pool.map(self.start_suite, self.params.test_suite) + if self.error: + raise self.error + finally: + task_id = self.etos.config.get("task_id") + self.logger.info("Release test environment.") + if task_id is not None: + self._release_environment(task_id)