From 8703c326180a8fbb2f3d49e416eaf3e8d0a67f53 Mon Sep 17 00:00:00 2001 From: Andrei Matveyeu Date: Wed, 10 Apr 2024 10:31:20 +0200 Subject: [PATCH 01/18] Add OpenTelemetry tracing --- requirements.txt | 3 ++ setup.cfg | 5 ++- src/environment_provider/environment.py | 40 ++++++++++++------- .../environment_provider.py | 37 ++++++++++++----- src/environment_provider/lib/otel_tracing.py | 34 ++++++++++++++++ .../utilities/external_provider.py | 22 ++++++++-- .../utilities/external_provider.py | 22 ++++++++-- .../utilities/external_provider.py | 23 +++++++++-- 8 files changed, 151 insertions(+), 35 deletions(-) create mode 100644 src/environment_provider/lib/otel_tracing.py diff --git a/requirements.txt b/requirements.txt index c7e6e3e..e5a4c5d 100644 --- a/requirements.txt +++ b/requirements.txt @@ -16,3 +16,6 @@ jsontas~=1.3 packageurl-python~=0.11 etcd3gw~=2.3 etos_lib==4.0.0 +opentelemetry-api~=1.21 +opentelemetry-exporter-otlp~=1.21 +opentelemetry-sdk~=1.21 diff --git a/setup.cfg b/setup.cfg index 4bb365f..6d3f218 100644 --- a/setup.cfg +++ b/setup.cfg @@ -34,7 +34,10 @@ install_requires = jsontas~=1.3 packageurl-python~=0.11 etcd3gw~=2.3 - etos_lib==4.0.0 + #etos_lib==4.0.0 + opentelemetry-api~=1.21 + opentelemetry-exporter-otlp~=1.21 + opentelemetry-sdk~=1.21 python_requires = >=3.8 diff --git a/src/environment_provider/environment.py b/src/environment_provider/environment.py index c034d4f..36bf439 100644 --- a/src/environment_provider/environment.py +++ b/src/environment_provider/environment.py @@ -20,6 +20,7 @@ from etos_lib import ETOS from jsontas.jsontas import JsonTas +from opentelemetry import trace from environment_provider.lib.database import ETCDPath from environment_provider.lib.registry import ProviderRegistry @@ -31,6 +32,8 @@ from log_area_provider.log_area import LogArea +TRACER = trace.get_tracer(__name__) + def checkin_provider( item: dict, provider: Union[IutProvider, ExecutionSpaceProvider, LogAreaProvider] ) -> tuple[bool, Optional[Exception]]: @@ -75,21 +78,30 @@ def release_environment( ).get("log") failure = None - success, exception = checkin_provider(Iut(**iut), IutProvider(etos, jsontas, iut_ruleset)) - if not success: - failure = exception - success, exception = checkin_provider( - LogArea(**log_area), LogAreaProvider(etos, jsontas, log_area_ruleset) - ) - if not success: - failure = exception - success, exception = checkin_provider( - ExecutionSpace(**executor), - ExecutionSpaceProvider(etos, jsontas, executor_ruleset), - ) - if not success: - failure = exception + span_name="stop_iuts" + with TRACER.start_as_current_span(span_name): + success, exception = checkin_provider(Iut(**iut), IutProvider(etos, jsontas, iut_ruleset)) + if not success: + failure = exception + + span_name="stop_log_area" + with TRACER.start_as_current_span(span_name): + success, exception = checkin_provider( + LogArea(**log_area), LogAreaProvider(etos, jsontas, log_area_ruleset) + ) + if not success: + failure = exception + + span_name="stop_execution_space" + with TRACER.start_as_current_span(span_name): + success, exception = checkin_provider( + ExecutionSpace(**executor), + ExecutionSpaceProvider(etos, jsontas, executor_ruleset), + ) + if not success: + failure = exception + return failure diff --git a/src/environment_provider/environment_provider.py b/src/environment_provider/environment_provider.py index 8a13677..ca71b3f 100644 --- a/src/environment_provider/environment_provider.py +++ b/src/environment_provider/environment_provider.py @@ -30,6 +30,7 @@ from etos_lib.lib.events import EiffelEnvironmentDefinedEvent from etos_lib.logging.logger import FORMAT_CONFIG from jsontas.jsontas import JsonTas +import opentelemetry from execution_space_provider.execution_space import ExecutionSpace from log_area_provider.log_area import LogArea @@ -45,6 +46,7 @@ from .lib.registry import ProviderRegistry from .lib.test_suite import TestSuite from .lib.uuid_generate import UuidGenerate +from .lib.otel_tracing import get_current_context from .splitter.split import Splitter logging.getLogger("pika").setLevel(logging.WARNING) @@ -78,6 +80,8 @@ def __init__(self, suite_id: str, suite_runner_ids: list[str], copy: bool = True """ FORMAT_CONFIG.identifier = suite_id self.logger.info("Initializing EnvironmentProvider task.") + self.tracer = opentelemetry.trace.get_tracer(__name__) + self.tracer_context = get_current_context() self.etos = ETOS("ETOS Environment Provider", os.getenv("HOSTNAME"), "Environment Provider") @@ -414,14 +418,17 @@ def checkout( timeout = self.checkout_timeout() while time.time() < timeout: self.set_total_test_count_and_test_runners(test_runners) - # Check out and assign IUTs to test runners. - iuts = self.iut_provider.wait_for_and_checkout_iuts( - minimum_amount=1, - maximum_amount=self.dataset.get( - "maximum_amount", self.etos.config.get("TOTAL_TEST_COUNT") - ), - ) - self.splitter.assign_iuts(test_runners, iuts) + + with self.tracer.start_as_current_span("request_iuts") as span: + # Check out and assign IUTs to test runners. + iuts = self.iut_provider.wait_for_and_checkout_iuts( + minimum_amount=1, + maximum_amount=self.dataset.get( + "maximum_amount", self.etos.config.get("TOTAL_TEST_COUNT") + ), + ) + self.splitter.assign_iuts(test_runners, iuts) + span.set_attribute("iuts", str(iuts)) for test_runner in test_runners.keys(): self.dataset.add("test_runner", test_runner) @@ -434,9 +441,17 @@ def checkout( for iut, suite in test_runners[test_runner].get("iuts", {}).items(): self.dataset.add("iut", iut) self.dataset.add("suite", suite) - suite["executor"] = self.checkout_an_execution_space() - self.dataset.add("executor", suite["executor"]) - suite["log_area"] = self.checkout_a_log_area() + + span_name = f"request_execution_space" + with self.tracer.start_as_current_span(span_name) as span: + span.set_attribute("test_runner", test_runner) + suite["executor"] = self.checkout_an_execution_space() + self.dataset.add("executor", suite["executor"]) + + span_name = f"request_log_area" + with self.tracer.start_as_current_span(span_name) as span: + span.set_attribute("test_runner", test_runner) + suite["log_area"] = self.checkout_a_log_area() # Split the tests into sub suites self.splitter.split(test_runners[test_runner]) diff --git a/src/environment_provider/lib/otel_tracing.py b/src/environment_provider/lib/otel_tracing.py new file mode 100644 index 0000000..4965f30 --- /dev/null +++ b/src/environment_provider/lib/otel_tracing.py @@ -0,0 +1,34 @@ +#!/usr/bin/env python +# Copyright Axis Communications AB. +# +# For a full list of individual contributors, please see the commit history. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# -*- coding: utf-8 -*- +import logging +import os +from opentelemetry.propagate import extract + +LOGGER = logging.getLogger(__name__) + +def get_current_context(): + # Get the context from the environment variable + carrier = {} + LOGGER.info("Current OpenTelemetry context env: %s", os.environ.get("OTEL_CONTEXT")) + for kv in os.environ.get("OTEL_CONTEXT", "").split(","): + if kv: + k, v = kv.split("=", 1) + carrier[k] = v + ctx = extract(carrier) + LOGGER.info("Current OpenTelemetry context %s", ctx) + return ctx diff --git a/src/execution_space_provider/utilities/external_provider.py b/src/execution_space_provider/utilities/external_provider.py index 0e891d9..adba95e 100644 --- a/src/execution_space_provider/utilities/external_provider.py +++ b/src/execution_space_provider/utilities/external_provider.py @@ -15,11 +15,13 @@ # limitations under the License. """External Execution Space provider.""" import logging +import json import os import time from copy import deepcopy from json.decoder import JSONDecodeError +import opentelemetry import requests from etos_lib import ETOS from etos_lib.lib.http import Http @@ -114,6 +116,13 @@ def checkin(self, execution_space: ExecutionSpace) -> None: execution_spaces = [execution_space.as_dict for execution_space in execution_space] host = self.ruleset.get("stop", {}).get("host") + headers = {"X-ETOS-ID": self.identifier} + otel_span = opentelemetry.trace.get_current_span() + if otel_span is not None: + otel_span.set_attribute("request.host", host) + otel_span.set_attribute("request.headers", json.dumps(headers, indent=4)) + otel_span.set_attribute("request.body", json.dumps(execution_spaces, indent=4)) + timeout = time.time() + end first_iteration = True while time.time() < timeout: @@ -123,7 +132,7 @@ def checkin(self, execution_space: ExecutionSpace) -> None: time.sleep(2) try: response = requests.post( - host, json=execution_spaces, headers={"X-ETOS-ID": self.identifier} + host, json=execution_spaces, headers=headers ) if response.status_code == requests.codes["no_content"]: return @@ -191,11 +200,18 @@ def start(self, minimum_amount: int, maximum_amount: int) -> str: "dataset": self.dataset.get("dataset"), "context": self.dataset.get("context"), } + host = self.ruleset.get("start", {}).get("host") + headers = {"X-ETOS-ID": self.identifier} + otel_span = opentelemetry.trace.get_current_span() + if otel_span is not None: + otel_span.set_attribute("request.host", host) + otel_span.set_attribute("request.headers", json.dumps(headers, indent=4)) + otel_span.set_attribute("request.body", json.dumps(data, indent=4)) try: response = self.http.post( - self.ruleset.get("start", {}).get("host"), + host, json=data, - headers={"X-ETOS-ID": self.identifier}, + headers=headers, ) response.raise_for_status() return response.json().get("id") diff --git a/src/iut_provider/utilities/external_provider.py b/src/iut_provider/utilities/external_provider.py index b285442..5a58d4e 100644 --- a/src/iut_provider/utilities/external_provider.py +++ b/src/iut_provider/utilities/external_provider.py @@ -15,11 +15,13 @@ # limitations under the License. """IUT provider for external providers.""" import logging +import json import os import time from copy import deepcopy from json.decoder import JSONDecodeError +import opentelemetry import requests from etos_lib import ETOS from etos_lib.lib.http import Http @@ -108,6 +110,13 @@ def checkin(self, iut: Iut) -> None: iuts = [iut.as_dict for iut in iut] host = self.ruleset.get("stop", {}).get("host") + headers = {"X-ETOS-ID": self.identifier} + otel_span = opentelemetry.trace.get_current_span() + if otel_span is not None: + otel_span.set_attribute("request.host", host) + otel_span.set_attribute("request.headers", json.dumps(headers, indent=4)) + otel_span.set_attribute("request.body", json.dumps(iuts, indent=4)) + timeout = time.time() + end first_iteration = True while time.time() < timeout: @@ -116,7 +125,7 @@ def checkin(self, iut: Iut) -> None: else: time.sleep(2) try: - response = requests.post(host, json=iuts, headers={"X-ETOS-ID": self.identifier}) + response = requests.post(host, json=iuts, headers=headers) if response.status_code == requests.codes["no_content"]: return response = response.json() @@ -159,11 +168,18 @@ def start(self, minimum_amount: int, maximum_amount: int) -> str: "dataset": self.dataset.get("dataset"), "context": self.dataset.get("context"), } + host = self.ruleset.get("start", {}).get("host") + headers = {"X-ETOS-ID": self.identifier} + otel_span = opentelemetry.trace.get_current_span() + if otel_span is not None: + otel_span.set_attribute("request.host", host) + otel_span.set_attribute("request.headers", json.dumps(headers, indent=4)) + otel_span.set_attribute("request.body", json.dumps(data, indent=4)) try: response = self.http.post( - self.ruleset.get("start", {}).get("host"), + host, json=data, - headers={"X-ETOS-ID": self.identifier}, + headers=headers, ) response.raise_for_status() return response.json().get("id") diff --git a/src/log_area_provider/utilities/external_provider.py b/src/log_area_provider/utilities/external_provider.py index 415ad15..13e878a 100644 --- a/src/log_area_provider/utilities/external_provider.py +++ b/src/log_area_provider/utilities/external_provider.py @@ -15,11 +15,13 @@ # limitations under the License. """External log area provider.""" import logging +import json import os import time from copy import deepcopy from json.decoder import JSONDecodeError +import opentelemetry import requests from etos_lib import ETOS from etos_lib.lib.http import Http @@ -108,6 +110,13 @@ def checkin(self, log_area: LogArea) -> None: log_areas = [log_area.as_dict for log_area in log_area] host = self.ruleset.get("stop", {}).get("host") + headers = {"X-ETOS-ID": self.identifier} + otel_span = opentelemetry.trace.get_current_span() + if otel_span is not None: + otel_span.set_attribute("request.host", host) + otel_span.set_attribute("request.headers", json.dumps(headers, indent=4)) + otel_span.set_attribute("request.body", json.dumps(log_areas, indent=4)) + timeout = time.time() + end first_iteration = True while time.time() < timeout: @@ -117,7 +126,7 @@ def checkin(self, log_area: LogArea) -> None: time.sleep(2) try: response = requests.post( - host, json=log_areas, headers={"X-ETOS-ID": self.identifier} + host, json=log_areas, headers=headers ) if response.status_code == requests.codes["no_content"]: return @@ -163,11 +172,19 @@ def start(self, minimum_amount: int, maximum_amount: int) -> str: "dataset": self.dataset.get("dataset"), "context": self.dataset.get("context"), } + host = self.ruleset.get("start", {}).get("host") + headers = {"X-ETOS-ID": self.identifier} + otel_span = opentelemetry.trace.get_current_span() + if otel_span is not None: + otel_span.set_attribute("request.host", host) + otel_span.set_attribute("request.headers", str(headers)) + otel_span.set_attribute("request.body", str(data)) + try: response = self.http.post( - self.ruleset.get("start", {}).get("host"), + host, json=data, - headers={"X-ETOS-ID": self.identifier}, + headers=headers, ) response.raise_for_status() return response.json().get("id") From d71111557dddfba17773622e53554f83632c3247 Mon Sep 17 00:00:00 2001 From: Andrei Matveyeu Date: Fri, 19 Apr 2024 14:24:50 +0200 Subject: [PATCH 02/18] Additional fixes for OpenTelemetry tracing --- requirements.txt | 2 +- setup.cfg | 2 +- .../environment_provider.py | 2 -- src/environment_provider/lib/otel_tracing.py | 34 ------------------- .../utilities/external_provider.py | 24 +++++++------ .../utilities/external_provider.py | 14 ++++---- .../utilities/external_provider.py | 14 ++++---- 7 files changed, 28 insertions(+), 64 deletions(-) delete mode 100644 src/environment_provider/lib/otel_tracing.py diff --git a/requirements.txt b/requirements.txt index e5a4c5d..43a0171 100644 --- a/requirements.txt +++ b/requirements.txt @@ -15,7 +15,7 @@ gunicorn~=19.9 jsontas~=1.3 packageurl-python~=0.11 etcd3gw~=2.3 -etos_lib==4.0.0 +etos_lib==4.1.1 opentelemetry-api~=1.21 opentelemetry-exporter-otlp~=1.21 opentelemetry-sdk~=1.21 diff --git a/setup.cfg b/setup.cfg index 6d3f218..b7185df 100644 --- a/setup.cfg +++ b/setup.cfg @@ -34,7 +34,7 @@ install_requires = jsontas~=1.3 packageurl-python~=0.11 etcd3gw~=2.3 - #etos_lib==4.0.0 + etos_lib==4.1.1 opentelemetry-api~=1.21 opentelemetry-exporter-otlp~=1.21 opentelemetry-sdk~=1.21 diff --git a/src/environment_provider/environment_provider.py b/src/environment_provider/environment_provider.py index ca71b3f..f52f9aa 100644 --- a/src/environment_provider/environment_provider.py +++ b/src/environment_provider/environment_provider.py @@ -46,7 +46,6 @@ from .lib.registry import ProviderRegistry from .lib.test_suite import TestSuite from .lib.uuid_generate import UuidGenerate -from .lib.otel_tracing import get_current_context from .splitter.split import Splitter logging.getLogger("pika").setLevel(logging.WARNING) @@ -81,7 +80,6 @@ def __init__(self, suite_id: str, suite_runner_ids: list[str], copy: bool = True FORMAT_CONFIG.identifier = suite_id self.logger.info("Initializing EnvironmentProvider task.") self.tracer = opentelemetry.trace.get_tracer(__name__) - self.tracer_context = get_current_context() self.etos = ETOS("ETOS Environment Provider", os.getenv("HOSTNAME"), "Environment Provider") diff --git a/src/environment_provider/lib/otel_tracing.py b/src/environment_provider/lib/otel_tracing.py deleted file mode 100644 index 4965f30..0000000 --- a/src/environment_provider/lib/otel_tracing.py +++ /dev/null @@ -1,34 +0,0 @@ -#!/usr/bin/env python -# Copyright Axis Communications AB. -# -# For a full list of individual contributors, please see the commit history. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -*- coding: utf-8 -*- -import logging -import os -from opentelemetry.propagate import extract - -LOGGER = logging.getLogger(__name__) - -def get_current_context(): - # Get the context from the environment variable - carrier = {} - LOGGER.info("Current OpenTelemetry context env: %s", os.environ.get("OTEL_CONTEXT")) - for kv in os.environ.get("OTEL_CONTEXT", "").split(","): - if kv: - k, v = kv.split("=", 1) - carrier[k] = v - ctx = extract(carrier) - LOGGER.info("Current OpenTelemetry context %s", ctx) - return ctx diff --git a/src/execution_space_provider/utilities/external_provider.py b/src/execution_space_provider/utilities/external_provider.py index adba95e..3f9bc1a 100644 --- a/src/execution_space_provider/utilities/external_provider.py +++ b/src/execution_space_provider/utilities/external_provider.py @@ -118,11 +118,11 @@ def checkin(self, execution_space: ExecutionSpace) -> None: host = self.ruleset.get("stop", {}).get("host") headers = {"X-ETOS-ID": self.identifier} otel_span = opentelemetry.trace.get_current_span() - if otel_span is not None: - otel_span.set_attribute("request.host", host) - otel_span.set_attribute("request.headers", json.dumps(headers, indent=4)) - otel_span.set_attribute("request.body", json.dumps(execution_spaces, indent=4)) - + otel_span.set_attribute("request.host", host) + otel_span.set_attribute("request.headers", json.dumps(headers, indent=4)) + otel_span.set_attribute("request.body", json.dumps(execution_spaces, indent=4)) + #opentelemetry.propagate.inject(headers) # inject current OpenTelemetry conext to HTTP request headers + self.logger.info("OpenTelemetry context headers: %s", headers) timeout = time.time() + end first_iteration = True while time.time() < timeout: @@ -203,10 +203,11 @@ def start(self, minimum_amount: int, maximum_amount: int) -> str: host = self.ruleset.get("start", {}).get("host") headers = {"X-ETOS-ID": self.identifier} otel_span = opentelemetry.trace.get_current_span() - if otel_span is not None: - otel_span.set_attribute("request.host", host) - otel_span.set_attribute("request.headers", json.dumps(headers, indent=4)) - otel_span.set_attribute("request.body", json.dumps(data, indent=4)) + otel_span.set_attribute("request.host", host) + otel_span.set_attribute("request.headers", json.dumps(headers, indent=4)) + otel_span.set_attribute("request.body", json.dumps(data, indent=4)) + #opentelemetry.propagate.inject(headers) # inject current OpenTelemetry conext to HTTP request headers + self.logger.info("OpenTelemetry context headers: %s", headers) try: response = self.http.post( host, @@ -239,11 +240,14 @@ def wait(self, provider_id: str) -> dict: first_iteration = False else: time.sleep(2) + headers = {"X-ETOS-ID": self.identifier} + #opentelemetry.propagate.inject(headers) # inject current OpenTelemetry conext to HTTP request headers + self.logger.info("OpenTelemetry context headers: %s", headers) try: response = requests.get( host, params={"id": provider_id}, - headers={"X-ETOS-ID": self.identifier}, + headers=headers, ) self.check_error(response) response = response.json() diff --git a/src/iut_provider/utilities/external_provider.py b/src/iut_provider/utilities/external_provider.py index 5a58d4e..8ca35e4 100644 --- a/src/iut_provider/utilities/external_provider.py +++ b/src/iut_provider/utilities/external_provider.py @@ -112,10 +112,9 @@ def checkin(self, iut: Iut) -> None: host = self.ruleset.get("stop", {}).get("host") headers = {"X-ETOS-ID": self.identifier} otel_span = opentelemetry.trace.get_current_span() - if otel_span is not None: - otel_span.set_attribute("request.host", host) - otel_span.set_attribute("request.headers", json.dumps(headers, indent=4)) - otel_span.set_attribute("request.body", json.dumps(iuts, indent=4)) + otel_span.set_attribute("request.host", host) + otel_span.set_attribute("request.headers", json.dumps(headers, indent=4)) + otel_span.set_attribute("request.body", json.dumps(iuts, indent=4)) timeout = time.time() + end first_iteration = True @@ -171,10 +170,9 @@ def start(self, minimum_amount: int, maximum_amount: int) -> str: host = self.ruleset.get("start", {}).get("host") headers = {"X-ETOS-ID": self.identifier} otel_span = opentelemetry.trace.get_current_span() - if otel_span is not None: - otel_span.set_attribute("request.host", host) - otel_span.set_attribute("request.headers", json.dumps(headers, indent=4)) - otel_span.set_attribute("request.body", json.dumps(data, indent=4)) + otel_span.set_attribute("request.host", host) + otel_span.set_attribute("request.headers", json.dumps(headers, indent=4)) + otel_span.set_attribute("request.body", json.dumps(data, indent=4)) try: response = self.http.post( host, diff --git a/src/log_area_provider/utilities/external_provider.py b/src/log_area_provider/utilities/external_provider.py index 13e878a..5c8cadd 100644 --- a/src/log_area_provider/utilities/external_provider.py +++ b/src/log_area_provider/utilities/external_provider.py @@ -112,10 +112,9 @@ def checkin(self, log_area: LogArea) -> None: host = self.ruleset.get("stop", {}).get("host") headers = {"X-ETOS-ID": self.identifier} otel_span = opentelemetry.trace.get_current_span() - if otel_span is not None: - otel_span.set_attribute("request.host", host) - otel_span.set_attribute("request.headers", json.dumps(headers, indent=4)) - otel_span.set_attribute("request.body", json.dumps(log_areas, indent=4)) + otel_span.set_attribute("request.host", host) + otel_span.set_attribute("request.headers", json.dumps(headers, indent=4)) + otel_span.set_attribute("request.body", json.dumps(log_areas, indent=4)) timeout = time.time() + end first_iteration = True @@ -175,10 +174,9 @@ def start(self, minimum_amount: int, maximum_amount: int) -> str: host = self.ruleset.get("start", {}).get("host") headers = {"X-ETOS-ID": self.identifier} otel_span = opentelemetry.trace.get_current_span() - if otel_span is not None: - otel_span.set_attribute("request.host", host) - otel_span.set_attribute("request.headers", str(headers)) - otel_span.set_attribute("request.body", str(data)) + otel_span.set_attribute("request.host", host) + otel_span.set_attribute("request.headers", str(headers)) + otel_span.set_attribute("request.body", str(data)) try: response = self.http.post( From 2573ccb805d8110a3827c91634ac9da492e84cd1 Mon Sep 17 00:00:00 2001 From: Andrei Matveyeu Date: Fri, 19 Apr 2024 14:56:26 +0200 Subject: [PATCH 03/18] minor fixes --- src/environment_provider/environment_provider.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/src/environment_provider/environment_provider.py b/src/environment_provider/environment_provider.py index f52f9aa..39b9d1a 100644 --- a/src/environment_provider/environment_provider.py +++ b/src/environment_provider/environment_provider.py @@ -440,14 +440,12 @@ def checkout( self.dataset.add("iut", iut) self.dataset.add("suite", suite) - span_name = f"request_execution_space" - with self.tracer.start_as_current_span(span_name) as span: + with self.tracer.start_as_current_span("request_execution_space") as span: span.set_attribute("test_runner", test_runner) suite["executor"] = self.checkout_an_execution_space() self.dataset.add("executor", suite["executor"]) - span_name = f"request_log_area" - with self.tracer.start_as_current_span(span_name) as span: + with self.tracer.start_as_current_span("request_log_area") as span: span.set_attribute("test_runner", test_runner) suite["log_area"] = self.checkout_a_log_area() From cc6b598ace5a4c01dfb214a7bcb6718a22a32b55 Mon Sep 17 00:00:00 2001 From: Andrei Matveyeu Date: Mon, 22 Apr 2024 11:42:07 +0200 Subject: [PATCH 04/18] minor fixes --- src/environment_provider/environment.py | 7 ++++--- src/environment_provider/environment_provider.py | 3 +-- .../utilities/external_provider.py | 7 +------ src/log_area_provider/utilities/external_provider.py | 4 +--- 4 files changed, 7 insertions(+), 14 deletions(-) diff --git a/src/environment_provider/environment.py b/src/environment_provider/environment.py index 36bf439..8f1fbda 100644 --- a/src/environment_provider/environment.py +++ b/src/environment_provider/environment.py @@ -34,6 +34,7 @@ TRACER = trace.get_tracer(__name__) + def checkin_provider( item: dict, provider: Union[IutProvider, ExecutionSpaceProvider, LogAreaProvider] ) -> tuple[bool, Optional[Exception]]: @@ -79,13 +80,13 @@ def release_environment( failure = None - span_name="stop_iuts" + span_name = "stop_iuts" with TRACER.start_as_current_span(span_name): success, exception = checkin_provider(Iut(**iut), IutProvider(etos, jsontas, iut_ruleset)) if not success: failure = exception - span_name="stop_log_area" + span_name = "stop_log_area" with TRACER.start_as_current_span(span_name): success, exception = checkin_provider( LogArea(**log_area), LogAreaProvider(etos, jsontas, log_area_ruleset) @@ -93,7 +94,7 @@ def release_environment( if not success: failure = exception - span_name="stop_execution_space" + span_name = "stop_execution_space" with TRACER.start_as_current_span(span_name): success, exception = checkin_provider( ExecutionSpace(**executor), diff --git a/src/environment_provider/environment_provider.py b/src/environment_provider/environment_provider.py index 39b9d1a..293c0e9 100644 --- a/src/environment_provider/environment_provider.py +++ b/src/environment_provider/environment_provider.py @@ -413,8 +413,7 @@ def checkout( self.environment_provider_config, ) finished = [] - timeout = self.checkout_timeout() - while time.time() < timeout: + while time.time() < self.checkout_timeout(): self.set_total_test_count_and_test_runners(test_runners) with self.tracer.start_as_current_span("request_iuts") as span: diff --git a/src/execution_space_provider/utilities/external_provider.py b/src/execution_space_provider/utilities/external_provider.py index 3f9bc1a..69ce5b1 100644 --- a/src/execution_space_provider/utilities/external_provider.py +++ b/src/execution_space_provider/utilities/external_provider.py @@ -121,7 +121,6 @@ def checkin(self, execution_space: ExecutionSpace) -> None: otel_span.set_attribute("request.host", host) otel_span.set_attribute("request.headers", json.dumps(headers, indent=4)) otel_span.set_attribute("request.body", json.dumps(execution_spaces, indent=4)) - #opentelemetry.propagate.inject(headers) # inject current OpenTelemetry conext to HTTP request headers self.logger.info("OpenTelemetry context headers: %s", headers) timeout = time.time() + end first_iteration = True @@ -131,9 +130,7 @@ def checkin(self, execution_space: ExecutionSpace) -> None: else: time.sleep(2) try: - response = requests.post( - host, json=execution_spaces, headers=headers - ) + response = requests.post(host, json=execution_spaces, headers=headers) if response.status_code == requests.codes["no_content"]: return response = response.json() @@ -206,7 +203,6 @@ def start(self, minimum_amount: int, maximum_amount: int) -> str: otel_span.set_attribute("request.host", host) otel_span.set_attribute("request.headers", json.dumps(headers, indent=4)) otel_span.set_attribute("request.body", json.dumps(data, indent=4)) - #opentelemetry.propagate.inject(headers) # inject current OpenTelemetry conext to HTTP request headers self.logger.info("OpenTelemetry context headers: %s", headers) try: response = self.http.post( @@ -241,7 +237,6 @@ def wait(self, provider_id: str) -> dict: else: time.sleep(2) headers = {"X-ETOS-ID": self.identifier} - #opentelemetry.propagate.inject(headers) # inject current OpenTelemetry conext to HTTP request headers self.logger.info("OpenTelemetry context headers: %s", headers) try: response = requests.get( diff --git a/src/log_area_provider/utilities/external_provider.py b/src/log_area_provider/utilities/external_provider.py index 5c8cadd..9cd49ed 100644 --- a/src/log_area_provider/utilities/external_provider.py +++ b/src/log_area_provider/utilities/external_provider.py @@ -124,9 +124,7 @@ def checkin(self, log_area: LogArea) -> None: else: time.sleep(2) try: - response = requests.post( - host, json=log_areas, headers=headers - ) + response = requests.post(host, json=log_areas, headers=headers) if response.status_code == requests.codes["no_content"]: return response = response.json() From 1841c2d5aa5dcf43b7ff1c5bd40cc452cdb58a07 Mon Sep 17 00:00:00 2001 From: Andrei Matveyeu Date: Mon, 22 Apr 2024 11:45:56 +0200 Subject: [PATCH 05/18] minor fixes --- src/environment_provider/environment_provider.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/environment_provider/environment_provider.py b/src/environment_provider/environment_provider.py index 293c0e9..30ea8c9 100644 --- a/src/environment_provider/environment_provider.py +++ b/src/environment_provider/environment_provider.py @@ -413,7 +413,8 @@ def checkout( self.environment_provider_config, ) finished = [] - while time.time() < self.checkout_timeout(): + timeout = self.checkout_timeout() + while time.time() < timeout: self.set_total_test_count_and_test_runners(test_runners) with self.tracer.start_as_current_span("request_iuts") as span: @@ -456,8 +457,7 @@ def checkout( sub_suite = test_suite.add( test_runner, iut, suite, test_runners[test_runner]["priority"] ) - url = self.upload_sub_suite(sub_suite) - self.send_environment_events(url, sub_suite) + self.send_environment_events(self.upload_sub_suite(sub_suite), sub_suite) self.logger.info( "Environment for %r checked out and is ready for use", From c06985468df06d6378c26ec302f2a633f0f12775 Mon Sep 17 00:00:00 2001 From: Andrei Matveyeu Date: Mon, 22 Apr 2024 11:55:27 +0200 Subject: [PATCH 06/18] minor fix --- src/execution_space_provider/utilities/external_provider.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/execution_space_provider/utilities/external_provider.py b/src/execution_space_provider/utilities/external_provider.py index 69ce5b1..65dc3bf 100644 --- a/src/execution_space_provider/utilities/external_provider.py +++ b/src/execution_space_provider/utilities/external_provider.py @@ -236,7 +236,7 @@ def wait(self, provider_id: str) -> dict: first_iteration = False else: time.sleep(2) - headers = {"X-ETOS-ID": self.identifier} + headers = {"X-ETOS-ID": self.identifier} self.logger.info("OpenTelemetry context headers: %s", headers) try: response = requests.get( From 7d40bf7ddbbb8610bfa28182b2852606f5768664 Mon Sep 17 00:00:00 2001 From: Andrei Matveyeu Date: Tue, 23 Apr 2024 13:46:14 +0200 Subject: [PATCH 07/18] review updates --- setup.cfg | 2 +- src/environment_provider/environment.py | 10 ++++++++-- .../environment_provider.py | 7 ++++--- .../utilities/external_provider.py | 17 ++++++++--------- src/iut_provider/utilities/external_provider.py | 15 ++++++++------- .../utilities/external_provider.py | 16 ++++++++-------- 6 files changed, 37 insertions(+), 30 deletions(-) diff --git a/setup.cfg b/setup.cfg index b7185df..e030d66 100644 --- a/setup.cfg +++ b/setup.cfg @@ -34,7 +34,7 @@ install_requires = jsontas~=1.3 packageurl-python~=0.11 etcd3gw~=2.3 - etos_lib==4.1.1 + #etos_lib==4.1.1 opentelemetry-api~=1.21 opentelemetry-exporter-otlp~=1.21 opentelemetry-sdk~=1.21 diff --git a/src/environment_provider/environment.py b/src/environment_provider/environment.py index 8f1fbda..102c079 100644 --- a/src/environment_provider/environment.py +++ b/src/environment_provider/environment.py @@ -81,17 +81,21 @@ def release_environment( failure = None span_name = "stop_iuts" - with TRACER.start_as_current_span(span_name): + with TRACER.start_as_current_span(span_name) as span: success, exception = checkin_provider(Iut(**iut), IutProvider(etos, jsontas, iut_ruleset)) if not success: + span.record_exception(exception) + span.set_status(trace.Status(trace.StatusCode.ERROR)) failure = exception span_name = "stop_log_area" - with TRACER.start_as_current_span(span_name): + with TRACER.start_as_current_span(span_name) as span: success, exception = checkin_provider( LogArea(**log_area), LogAreaProvider(etos, jsontas, log_area_ruleset) ) if not success: + span.record_exception(exception) + span.set_status(trace.Status(trace.StatusCode.ERROR)) failure = exception span_name = "stop_execution_space" @@ -101,6 +105,8 @@ def release_environment( ExecutionSpaceProvider(etos, jsontas, executor_ruleset), ) if not success: + span.record_exception(exception) + span.set_status(trace.Status(trace.StatusCode.ERROR)) failure = exception return failure diff --git a/src/environment_provider/environment_provider.py b/src/environment_provider/environment_provider.py index 30ea8c9..0d734f2 100644 --- a/src/environment_provider/environment_provider.py +++ b/src/environment_provider/environment_provider.py @@ -29,6 +29,7 @@ from etos_lib.etos import ETOS from etos_lib.lib.events import EiffelEnvironmentDefinedEvent from etos_lib.logging.logger import FORMAT_CONFIG +from etos_lib.opentelemetry.semconv import Attributes as SemConvAttributes from jsontas.jsontas import JsonTas import opentelemetry @@ -426,7 +427,7 @@ def checkout( ), ) self.splitter.assign_iuts(test_runners, iuts) - span.set_attribute("iuts", str(iuts)) + span.set_attribute(SemConvAttributes.IUT_DESCRIPTION, str(iuts)) for test_runner in test_runners.keys(): self.dataset.add("test_runner", test_runner) @@ -441,12 +442,12 @@ def checkout( self.dataset.add("suite", suite) with self.tracer.start_as_current_span("request_execution_space") as span: - span.set_attribute("test_runner", test_runner) + span.set_attribute(SemConvAttributes.TESTRUNNER_ID, test_runner) suite["executor"] = self.checkout_an_execution_space() self.dataset.add("executor", suite["executor"]) with self.tracer.start_as_current_span("request_log_area") as span: - span.set_attribute("test_runner", test_runner) + span.set_attribute(SemConvAttributes.TESTRUNNER_ID, test_runner) suite["log_area"] = self.checkout_a_log_area() # Split the tests into sub suites diff --git a/src/execution_space_provider/utilities/external_provider.py b/src/execution_space_provider/utilities/external_provider.py index 65dc3bf..99fcbee 100644 --- a/src/execution_space_provider/utilities/external_provider.py +++ b/src/execution_space_provider/utilities/external_provider.py @@ -118,10 +118,10 @@ def checkin(self, execution_space: ExecutionSpace) -> None: host = self.ruleset.get("stop", {}).get("host") headers = {"X-ETOS-ID": self.identifier} otel_span = opentelemetry.trace.get_current_span() - otel_span.set_attribute("request.host", host) - otel_span.set_attribute("request.headers", json.dumps(headers, indent=4)) - otel_span.set_attribute("request.body", json.dumps(execution_spaces, indent=4)) - self.logger.info("OpenTelemetry context headers: %s", headers) + otel_span.set_attribute("http.request.host", host) + otel_span.set_attribute("http.request.body", json.dumps(execution_spaces, indent=4)) + for header, value in headers.items(): + otel_span.set_attribute(f"http.request.headers.{header.lower()}", value) timeout = time.time() + end first_iteration = True while time.time() < timeout: @@ -200,10 +200,10 @@ def start(self, minimum_amount: int, maximum_amount: int) -> str: host = self.ruleset.get("start", {}).get("host") headers = {"X-ETOS-ID": self.identifier} otel_span = opentelemetry.trace.get_current_span() - otel_span.set_attribute("request.host", host) - otel_span.set_attribute("request.headers", json.dumps(headers, indent=4)) - otel_span.set_attribute("request.body", json.dumps(data, indent=4)) - self.logger.info("OpenTelemetry context headers: %s", headers) + otel_span.set_attribute("http.request.host", host) + otel_span.set_attribute("http.request.body", json.dumps(data, indent=4)) + for header, value in headers.items(): + otel_span.set_attribute(f"http.request.headers.{header.lower()}", value) try: response = self.http.post( host, @@ -237,7 +237,6 @@ def wait(self, provider_id: str) -> dict: else: time.sleep(2) headers = {"X-ETOS-ID": self.identifier} - self.logger.info("OpenTelemetry context headers: %s", headers) try: response = requests.get( host, diff --git a/src/iut_provider/utilities/external_provider.py b/src/iut_provider/utilities/external_provider.py index 8ca35e4..05feb89 100644 --- a/src/iut_provider/utilities/external_provider.py +++ b/src/iut_provider/utilities/external_provider.py @@ -112,10 +112,10 @@ def checkin(self, iut: Iut) -> None: host = self.ruleset.get("stop", {}).get("host") headers = {"X-ETOS-ID": self.identifier} otel_span = opentelemetry.trace.get_current_span() - otel_span.set_attribute("request.host", host) - otel_span.set_attribute("request.headers", json.dumps(headers, indent=4)) - otel_span.set_attribute("request.body", json.dumps(iuts, indent=4)) - + otel_span.set_attribute("http.request.host", host) + otel_span.set_attribute("http.request.body", json.dumps(iuts, indent=4)) + for header, value in headers.items(): + otel_span.set_attribute(f"http.request.headers.{header.lower()}", value) timeout = time.time() + end first_iteration = True while time.time() < timeout: @@ -170,9 +170,10 @@ def start(self, minimum_amount: int, maximum_amount: int) -> str: host = self.ruleset.get("start", {}).get("host") headers = {"X-ETOS-ID": self.identifier} otel_span = opentelemetry.trace.get_current_span() - otel_span.set_attribute("request.host", host) - otel_span.set_attribute("request.headers", json.dumps(headers, indent=4)) - otel_span.set_attribute("request.body", json.dumps(data, indent=4)) + otel_span.set_attribute("http.request.host", host) + otel_span.set_attribute("http.request.body", json.dumps(data, indent=4)) + for header, value in headers.items(): + otel_span.set_attribute(f"http.request.headers.{header.lower()}", value) try: response = self.http.post( host, diff --git a/src/log_area_provider/utilities/external_provider.py b/src/log_area_provider/utilities/external_provider.py index 9cd49ed..38f0629 100644 --- a/src/log_area_provider/utilities/external_provider.py +++ b/src/log_area_provider/utilities/external_provider.py @@ -112,10 +112,10 @@ def checkin(self, log_area: LogArea) -> None: host = self.ruleset.get("stop", {}).get("host") headers = {"X-ETOS-ID": self.identifier} otel_span = opentelemetry.trace.get_current_span() - otel_span.set_attribute("request.host", host) - otel_span.set_attribute("request.headers", json.dumps(headers, indent=4)) - otel_span.set_attribute("request.body", json.dumps(log_areas, indent=4)) - + otel_span.set_attribute("http.request.host", host) + otel_span.set_attribute("http.request.body", json.dumps(log_areas, indent=4)) + for header, value in headers.items(): + otel_span.set_attribute(f"http.request.headers.{header.lower()}", value) timeout = time.time() + end first_iteration = True while time.time() < timeout: @@ -172,10 +172,10 @@ def start(self, minimum_amount: int, maximum_amount: int) -> str: host = self.ruleset.get("start", {}).get("host") headers = {"X-ETOS-ID": self.identifier} otel_span = opentelemetry.trace.get_current_span() - otel_span.set_attribute("request.host", host) - otel_span.set_attribute("request.headers", str(headers)) - otel_span.set_attribute("request.body", str(data)) - + otel_span.set_attribute("http.request.host", host) + otel_span.set_attribute("http.request.body", json.dumps(data, indent=4)) + for header, value in headers.items(): + otel_span.set_attribute(f"http.request.headers.{header.lower()}", value) try: response = self.http.post( host, From eb4023bb63dd2580fbb205658b8801e4bbe02426 Mon Sep 17 00:00:00 2001 From: Andrei Matveyeu Date: Thu, 25 Apr 2024 13:04:17 +0200 Subject: [PATCH 08/18] Review updates --- src/environment_provider/environment_provider.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/environment_provider/environment_provider.py b/src/environment_provider/environment_provider.py index 0d734f2..5e99bc0 100644 --- a/src/environment_provider/environment_provider.py +++ b/src/environment_provider/environment_provider.py @@ -442,12 +442,12 @@ def checkout( self.dataset.add("suite", suite) with self.tracer.start_as_current_span("request_execution_space") as span: - span.set_attribute(SemConvAttributes.TESTRUNNER_ID, test_runner) + span.set_attribute(SemConvAttributes.TEST_RUNNER_ID, test_runner) suite["executor"] = self.checkout_an_execution_space() self.dataset.add("executor", suite["executor"]) with self.tracer.start_as_current_span("request_log_area") as span: - span.set_attribute(SemConvAttributes.TESTRUNNER_ID, test_runner) + span.set_attribute(SemConvAttributes.TEST_RUNNER_ID, test_runner) suite["log_area"] = self.checkout_a_log_area() # Split the tests into sub suites From 9c06b7442b9462b14eb858d561ab861ee3c0b5e0 Mon Sep 17 00:00:00 2001 From: Andrei Matveyeu Date: Fri, 26 Apr 2024 13:11:32 +0200 Subject: [PATCH 09/18] review updates --- .../utilities/external_provider.py | 22 ++++++++++++++----- 1 file changed, 16 insertions(+), 6 deletions(-) diff --git a/src/execution_space_provider/utilities/external_provider.py b/src/execution_space_provider/utilities/external_provider.py index 99fcbee..ddbe078 100644 --- a/src/execution_space_provider/utilities/external_provider.py +++ b/src/execution_space_provider/utilities/external_provider.py @@ -117,11 +117,13 @@ def checkin(self, execution_space: ExecutionSpace) -> None: host = self.ruleset.get("stop", {}).get("host") headers = {"X-ETOS-ID": self.identifier} - otel_span = opentelemetry.trace.get_current_span() - otel_span.set_attribute("http.request.host", host) - otel_span.set_attribute("http.request.body", json.dumps(execution_spaces, indent=4)) + span = opentelemetry.trace.get_current_span() + span.set_attribute("http.request.body", json.dumps(execution_spaces, indent=4)) + span.set_attribute("http.request.host", host) + span.set_attribute("http.request.method", "POST") + span.set_attribute("network.protocol.name", "http") for header, value in headers.items(): - otel_span.set_attribute(f"http.request.headers.{header.lower()}", value) + span.set_attribute(f"http.request.headers.{header.lower()}", value) timeout = time.time() + end first_iteration = True while time.time() < timeout: @@ -131,22 +133,30 @@ def checkin(self, execution_space: ExecutionSpace) -> None: time.sleep(2) try: response = requests.post(host, json=execution_spaces, headers=headers) + span.set_attribute("http.response.status_code", response.status_code) if response.status_code == requests.codes["no_content"]: return response = response.json() if response.get("error") is not None: - raise ExecutionSpaceCheckinFailed( + exc = ExecutionSpaceCheckinFailed( f"Unable to check in {execution_spaces} " f"({response.get('error')})" ) + span.record_exception(exc) + raise exc except RequestsConnectionError as error: if "connection refused" in str(error).lower(): self.logger.error("Error connecting to %r: %r", host, error) continue + span.record_exception(exc) + raise exc raise except ConnectionError: self.logger.error("Error connecting to %r", host) continue - raise TimeoutError(f"Unable to stop external provider {self.id!r}") + exc = TimeoutError(f"Unable to stop external provider {self.id!r}") + span.record_exception(exc) + raise exc + def checkin_all(self) -> None: """Check in all execution spaces. From be0b8e7eb37b1fbaac354b0e21495e91c185fa96 Mon Sep 17 00:00:00 2001 From: Andrei Matveyeu Date: Fri, 26 Apr 2024 13:37:55 +0200 Subject: [PATCH 10/18] Review updates --- .../utilities/external_provider.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/src/execution_space_provider/utilities/external_provider.py b/src/execution_space_provider/utilities/external_provider.py index ddbe078..bac5ce9 100644 --- a/src/execution_space_provider/utilities/external_provider.py +++ b/src/execution_space_provider/utilities/external_provider.py @@ -22,6 +22,8 @@ from json.decoder import JSONDecodeError import opentelemetry +from opentelemetry.semconv.trace import SpanAttributes + import requests from etos_lib import ETOS from etos_lib.lib.http import Http @@ -119,9 +121,9 @@ def checkin(self, execution_space: ExecutionSpace) -> None: headers = {"X-ETOS-ID": self.identifier} span = opentelemetry.trace.get_current_span() span.set_attribute("http.request.body", json.dumps(execution_spaces, indent=4)) - span.set_attribute("http.request.host", host) - span.set_attribute("http.request.method", "POST") - span.set_attribute("network.protocol.name", "http") + span.set_attribute(SpanAttributes.HTTP_HOST, host) + span.set_attribute(SpanAttributes.HTTP_REQUEST_METHOD, "POST") + span.set_attribute(SpanAttributes.NETWORK_PROTOCOL_NAME, "http") for header, value in headers.items(): span.set_attribute(f"http.request.headers.{header.lower()}", value) timeout = time.time() + end @@ -133,7 +135,7 @@ def checkin(self, execution_space: ExecutionSpace) -> None: time.sleep(2) try: response = requests.post(host, json=execution_spaces, headers=headers) - span.set_attribute("http.response.status_code", response.status_code) + span.set_attribute(SpanAttributes.HTTP_RESPONSE_STATUS_CODE, response.status_code) if response.status_code == requests.codes["no_content"]: return response = response.json() From 1fbaa551410ab00e992736019cceb99f99459a96 Mon Sep 17 00:00:00 2001 From: Andrei Matveyeu Date: Mon, 29 Apr 2024 12:11:42 +0200 Subject: [PATCH 11/18] Code review updates --- requirements.txt | 1 + setup.cfg | 2 +- .../utilities/external_provider.py | 8 +++++++- src/iut_provider/utilities/external_provider.py | 7 ++++++- src/log_area_provider/utilities/external_provider.py | 7 ++++++- 5 files changed, 21 insertions(+), 4 deletions(-) diff --git a/requirements.txt b/requirements.txt index 43a0171..58c8092 100644 --- a/requirements.txt +++ b/requirements.txt @@ -19,3 +19,4 @@ etos_lib==4.1.1 opentelemetry-api~=1.21 opentelemetry-exporter-otlp~=1.21 opentelemetry-sdk~=1.21 +opentelemetry-instrumentation-requests==0.45b0 diff --git a/setup.cfg b/setup.cfg index e030d66..6ed6f2d 100644 --- a/setup.cfg +++ b/setup.cfg @@ -34,7 +34,7 @@ install_requires = jsontas~=1.3 packageurl-python~=0.11 etcd3gw~=2.3 - #etos_lib==4.1.1 + etos_lib==4.2.0 opentelemetry-api~=1.21 opentelemetry-exporter-otlp~=1.21 opentelemetry-sdk~=1.21 diff --git a/src/execution_space_provider/utilities/external_provider.py b/src/execution_space_provider/utilities/external_provider.py index bac5ce9..6935018 100644 --- a/src/execution_space_provider/utilities/external_provider.py +++ b/src/execution_space_provider/utilities/external_provider.py @@ -23,6 +23,8 @@ import opentelemetry from opentelemetry.semconv.trace import SpanAttributes +from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator +from opentelemetry.propagate import inject import requests from etos_lib import ETOS @@ -119,6 +121,7 @@ def checkin(self, execution_space: ExecutionSpace) -> None: host = self.ruleset.get("stop", {}).get("host") headers = {"X-ETOS-ID": self.identifier} + TraceContextTextMapPropagator().inject(headers) span = opentelemetry.trace.get_current_span() span.set_attribute("http.request.body", json.dumps(execution_spaces, indent=4)) span.set_attribute(SpanAttributes.HTTP_HOST, host) @@ -211,11 +214,13 @@ def start(self, minimum_amount: int, maximum_amount: int) -> str: } host = self.ruleset.get("start", {}).get("host") headers = {"X-ETOS-ID": self.identifier} + TraceContextTextMapPropagator().inject(headers) otel_span = opentelemetry.trace.get_current_span() otel_span.set_attribute("http.request.host", host) otel_span.set_attribute("http.request.body", json.dumps(data, indent=4)) for header, value in headers.items(): otel_span.set_attribute(f"http.request.headers.{header.lower()}", value) + try: response = self.http.post( host, @@ -243,12 +248,13 @@ def wait(self, provider_id: str) -> dict: response = None first_iteration = True + headers = {"X-ETOS-ID": self.identifier} + TraceContextTextMapPropagator().inject(headers) while time.time() < timeout: if first_iteration: first_iteration = False else: time.sleep(2) - headers = {"X-ETOS-ID": self.identifier} try: response = requests.get( host, diff --git a/src/iut_provider/utilities/external_provider.py b/src/iut_provider/utilities/external_provider.py index 05feb89..d78aad0 100644 --- a/src/iut_provider/utilities/external_provider.py +++ b/src/iut_provider/utilities/external_provider.py @@ -22,6 +22,7 @@ from json.decoder import JSONDecodeError import opentelemetry +from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator import requests from etos_lib import ETOS from etos_lib.lib.http import Http @@ -111,6 +112,7 @@ def checkin(self, iut: Iut) -> None: host = self.ruleset.get("stop", {}).get("host") headers = {"X-ETOS-ID": self.identifier} + TraceContextTextMapPropagator().inject(headers) otel_span = opentelemetry.trace.get_current_span() otel_span.set_attribute("http.request.host", host) otel_span.set_attribute("http.request.body", json.dumps(iuts, indent=4)) @@ -169,6 +171,7 @@ def start(self, minimum_amount: int, maximum_amount: int) -> str: } host = self.ruleset.get("start", {}).get("host") headers = {"X-ETOS-ID": self.identifier} + TraceContextTextMapPropagator().inject(headers) otel_span = opentelemetry.trace.get_current_span() otel_span.set_attribute("http.request.host", host) otel_span.set_attribute("http.request.body", json.dumps(data, indent=4)) @@ -200,13 +203,15 @@ def wait(self, provider_id: str) -> dict: timeout = time.time() + self.etos.config.get("WAIT_FOR_IUT_TIMEOUT") response = None + headers = {"X-ETOS-ID": self.identifier} + TraceContextTextMapPropagator().inject(headers) while time.time() < timeout: time.sleep(2) try: response = requests.get( host, params={"id": provider_id}, - headers={"X-ETOS-ID": self.identifier}, + headers=headers, ) self.check_error(response) response = response.json() diff --git a/src/log_area_provider/utilities/external_provider.py b/src/log_area_provider/utilities/external_provider.py index 38f0629..bc1c494 100644 --- a/src/log_area_provider/utilities/external_provider.py +++ b/src/log_area_provider/utilities/external_provider.py @@ -22,6 +22,7 @@ from json.decoder import JSONDecodeError import opentelemetry +from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator import requests from etos_lib import ETOS from etos_lib.lib.http import Http @@ -111,6 +112,7 @@ def checkin(self, log_area: LogArea) -> None: host = self.ruleset.get("stop", {}).get("host") headers = {"X-ETOS-ID": self.identifier} + TraceContextTextMapPropagator().inject(headers) otel_span = opentelemetry.trace.get_current_span() otel_span.set_attribute("http.request.host", host) otel_span.set_attribute("http.request.body", json.dumps(log_areas, indent=4)) @@ -171,6 +173,7 @@ def start(self, minimum_amount: int, maximum_amount: int) -> str: } host = self.ruleset.get("start", {}).get("host") headers = {"X-ETOS-ID": self.identifier} + TraceContextTextMapPropagator().inject(headers) otel_span = opentelemetry.trace.get_current_span() otel_span.set_attribute("http.request.host", host) otel_span.set_attribute("http.request.body", json.dumps(data, indent=4)) @@ -205,6 +208,8 @@ def wait(self, provider_id: str) -> dict: response = None first_iteration = True + headers={"X-ETOS-ID": self.identifier} + TraceContextTextMapPropagator().inject(headers) while time.time() < timeout: if first_iteration: first_iteration = False @@ -214,7 +219,7 @@ def wait(self, provider_id: str) -> dict: response = requests.get( host, params={"id": provider_id}, - headers={"X-ETOS-ID": self.identifier}, + headers=headers, ) self.check_error(response) response = response.json() From 6d63052ef1ede3819f290322ee74f51ce46a113b Mon Sep 17 00:00:00 2001 From: Andrei Matveyeu Date: Mon, 29 Apr 2024 12:41:45 +0200 Subject: [PATCH 12/18] Code review updates --- .../utilities/external_provider.py | 64 +++++++++++++------ 1 file changed, 45 insertions(+), 19 deletions(-) diff --git a/src/execution_space_provider/utilities/external_provider.py b/src/execution_space_provider/utilities/external_provider.py index 6935018..5cf4714 100644 --- a/src/execution_space_provider/utilities/external_provider.py +++ b/src/execution_space_provider/utilities/external_provider.py @@ -24,7 +24,6 @@ import opentelemetry from opentelemetry.semconv.trace import SpanAttributes from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator -from opentelemetry.propagate import inject import requests from etos_lib import ETOS @@ -100,6 +99,14 @@ def identity(self) -> PackageURL: """ return self.dataset.get("identity") + @staticmethod + def _record_exception(exc) -> None: + """Record the given exception to the current OpenTelemetry span.""" + span = opentelemetry.trace.get_current_span() + span.set_attribute("error.type", exc.__name__) + span.record_exception(exc) + span.set_status(opentelemetry.trace.Status(opentelemetry.StatusCode.ERROR)) + def checkin(self, execution_space: ExecutionSpace) -> None: """Check in execution spaces. @@ -125,8 +132,6 @@ def checkin(self, execution_space: ExecutionSpace) -> None: span = opentelemetry.trace.get_current_span() span.set_attribute("http.request.body", json.dumps(execution_spaces, indent=4)) span.set_attribute(SpanAttributes.HTTP_HOST, host) - span.set_attribute(SpanAttributes.HTTP_REQUEST_METHOD, "POST") - span.set_attribute(SpanAttributes.NETWORK_PROTOCOL_NAME, "http") for header, value in headers.items(): span.set_attribute(f"http.request.headers.{header.lower()}", value) timeout = time.time() + end @@ -146,7 +151,7 @@ def checkin(self, execution_space: ExecutionSpace) -> None: exc = ExecutionSpaceCheckinFailed( f"Unable to check in {execution_spaces} " f"({response.get('error')})" ) - span.record_exception(exc) + self._record_exception(exc) raise exc except RequestsConnectionError as error: if "connection refused" in str(error).lower(): @@ -159,7 +164,7 @@ def checkin(self, execution_space: ExecutionSpace) -> None: self.logger.error("Error connecting to %r", host) continue exc = TimeoutError(f"Unable to stop external provider {self.id!r}") - span.record_exception(exc) + self._record_exception(exc) raise exc @@ -215,11 +220,11 @@ def start(self, minimum_amount: int, maximum_amount: int) -> str: host = self.ruleset.get("start", {}).get("host") headers = {"X-ETOS-ID": self.identifier} TraceContextTextMapPropagator().inject(headers) - otel_span = opentelemetry.trace.get_current_span() - otel_span.set_attribute("http.request.host", host) - otel_span.set_attribute("http.request.body", json.dumps(data, indent=4)) + span = opentelemetry.trace.get_current_span() + span.set_attribute(SpanAttributes.HTTP_HOST, host) + span.set_attribute("http.request.body", json.dumps(data, indent=4)) for header, value in headers.items(): - otel_span.set_attribute(f"http.request.headers.{header.lower()}", value) + span.set_attribute(f"http.request.headers.{header.lower()}", value) try: response = self.http.post( @@ -227,10 +232,13 @@ def start(self, minimum_amount: int, maximum_amount: int) -> str: json=data, headers=headers, ) + span.set_attribute(SpanAttributes.HTTP_RESPONSE_STATUS_CODE, response.status_code) response.raise_for_status() return response.json().get("id") except (HTTPError, JSONDecodeError) as error: - raise Exception(f"Could not start external provider {self.id!r}") from error + exc = Exception(f"Could not start external provider {self.id!r}") + self._record_exception(exc) + raise exc from error def wait(self, provider_id: str) -> dict: """Wait for external execution space provider to finish its request. @@ -245,11 +253,14 @@ def wait(self, provider_id: str) -> dict: host = self.ruleset.get("status", {}).get("host") timeout = time.time() + self.etos.config.get("WAIT_FOR_EXECUTION_SPACE_TIMEOUT") - + params = {"id": provider_id} response = None first_iteration = True headers = {"X-ETOS-ID": self.identifier} TraceContextTextMapPropagator().inject(headers) + span = opentelemetry.trace.get_current_span() + span.set_attribute("http.request.params", json.dumps(params, indent=4)) + span.set_attribute(SpanAttributes.HTTP_HOST, host) while time.time() < timeout: if first_iteration: first_iteration = False @@ -258,7 +269,7 @@ def wait(self, provider_id: str) -> dict: try: response = requests.get( host, - params={"id": provider_id}, + params=params, headers=headers, ) self.check_error(response) @@ -268,14 +279,18 @@ def wait(self, provider_id: str) -> dict: continue if response.get("status") == "FAILED": - raise ExecutionSpaceCheckoutFailed(response.get("description")) + exc = ExecutionSpaceCheckoutFailed(response.get("description")) + self._record_exception(exc) + raise exc if response.get("status") == "DONE": break else: - raise TimeoutError( + exc = TimeoutError( "Status request timed out after " f"{self.etos.config.get('WAIT_FOR_EXECUTION_SPACE_TIMEOUT')}s" ) + self._record_exception(exc) + raise exc return response def check_error(self, response: dict) -> None: @@ -283,6 +298,7 @@ def check_error(self, response: dict) -> None: :param response: The response from the external execution space provider. """ + span = opentelemetry.trace.get_current_span() self.logger.debug("Checking response from external execution space provider") try: if response.json().get("error") is not None: @@ -291,13 +307,18 @@ def check_error(self, response: dict) -> None: self.logger.error("Could not parse response as JSON") if response.status_code == requests.codes["not_found"]: - raise ExecutionSpaceNotAvailable( + exc = ExecutionSpaceNotAvailable( f"External provider {self.id!r} did not respond properly" ) + self._record_exception(exc) + raise exc if response.status_code == requests.codes["bad_request"]: - raise RuntimeError( + exc = RuntimeError( f"Execution space provider for {self.id!r} is not properly configured" ) + self._record_exception(exc) + raise exc + # This should work, no other errors found. # If this does not work, propagate JSONDecodeError up the stack. @@ -326,12 +347,15 @@ def request_and_wait_for_execution_spaces( :param maximum_amount: Maximum amount of execution spaces to checkout. :return: List of checked out execution spaces. """ + span = opentelemetry.trace.get_current_span() try: provider_id = self.start(minimum_amount, maximum_amount) response = self.wait(provider_id) execution_spaces = self.build_execution_spaces(response) if len(execution_spaces) < minimum_amount: - raise ExecutionSpaceNotAvailable(self.id) + exc = ExecutionSpaceNotAvailable(self.id) + self._record_exception(exc) + raise exc if len(execution_spaces) > maximum_amount: self.logger.warning( "Too many execution spaces from external execution space provider " @@ -364,6 +388,7 @@ def wait_for_and_checkout_execution_spaces( :param maximum_amount: Maximum amount of execution spaces to checkout. :return: List of checked out execution spaces. """ + span = opentelemetry.trace.get_current_span() error = None triggered = None try: @@ -385,8 +410,9 @@ def wait_for_and_checkout_execution_spaces( ) self.etos.events.send_activity_started(triggered) return self.request_and_wait_for_execution_spaces(minimum_amount, maximum_amount) - except Exception as exception: - error = exception + except Exception as exc: + self._record_exception(exc) + error = exc raise finally: if error is None: From e5b7a54ac250c7ec198d9f5c9777f77e7f596c5e Mon Sep 17 00:00:00 2001 From: Andrei Matveyeu Date: Thu, 2 May 2024 09:14:51 +0200 Subject: [PATCH 13/18] Code review updates --- src/environment_provider/environment.py | 6 +-- .../environment_provider.py | 7 +-- .../utilities/external_provider.py | 14 +++--- .../utilities/external_provider.py | 35 ++++++++++---- .../utilities/external_provider.py | 48 ++++++++++++++----- 5 files changed, 74 insertions(+), 36 deletions(-) diff --git a/src/environment_provider/environment.py b/src/environment_provider/environment.py index 102c079..176a3e5 100644 --- a/src/environment_provider/environment.py +++ b/src/environment_provider/environment.py @@ -81,7 +81,7 @@ def release_environment( failure = None span_name = "stop_iuts" - with TRACER.start_as_current_span(span_name) as span: + with TRACER.start_as_current_span(span_name, kind=trace.SpanKind.CLIENT) as span: success, exception = checkin_provider(Iut(**iut), IutProvider(etos, jsontas, iut_ruleset)) if not success: span.record_exception(exception) @@ -89,7 +89,7 @@ def release_environment( failure = exception span_name = "stop_log_area" - with TRACER.start_as_current_span(span_name) as span: + with TRACER.start_as_current_span(span_name, kind=trace.SpanKind.CLIENT) as span: success, exception = checkin_provider( LogArea(**log_area), LogAreaProvider(etos, jsontas, log_area_ruleset) ) @@ -99,7 +99,7 @@ def release_environment( failure = exception span_name = "stop_execution_space" - with TRACER.start_as_current_span(span_name): + with TRACER.start_as_current_span(span_name, kind=trace.SpanKind.CLIENT): success, exception = checkin_provider( ExecutionSpace(**executor), ExecutionSpaceProvider(etos, jsontas, executor_ruleset), diff --git a/src/environment_provider/environment_provider.py b/src/environment_provider/environment_provider.py index 5e99bc0..36eebbc 100644 --- a/src/environment_provider/environment_provider.py +++ b/src/environment_provider/environment_provider.py @@ -32,6 +32,7 @@ from etos_lib.opentelemetry.semconv import Attributes as SemConvAttributes from jsontas.jsontas import JsonTas import opentelemetry +from opentelemetry.trace import SpanKind from execution_space_provider.execution_space import ExecutionSpace from log_area_provider.log_area import LogArea @@ -418,7 +419,7 @@ def checkout( while time.time() < timeout: self.set_total_test_count_and_test_runners(test_runners) - with self.tracer.start_as_current_span("request_iuts") as span: + with self.tracer.start_as_current_span("request_iuts", kind=SpanKind.CLIENT) as span: # Check out and assign IUTs to test runners. iuts = self.iut_provider.wait_for_and_checkout_iuts( minimum_amount=1, @@ -441,12 +442,12 @@ def checkout( self.dataset.add("iut", iut) self.dataset.add("suite", suite) - with self.tracer.start_as_current_span("request_execution_space") as span: + with self.tracer.start_as_current_span("request_execution_space", kind=SpanKind.CLIENT) as span: span.set_attribute(SemConvAttributes.TEST_RUNNER_ID, test_runner) suite["executor"] = self.checkout_an_execution_space() self.dataset.add("executor", suite["executor"]) - with self.tracer.start_as_current_span("request_log_area") as span: + with self.tracer.start_as_current_span("request_log_area", kind=SpanKind.CLIENT) as span: span.set_attribute(SemConvAttributes.TEST_RUNNER_ID, test_runner) suite["log_area"] = self.checkout_a_log_area() diff --git a/src/execution_space_provider/utilities/external_provider.py b/src/execution_space_provider/utilities/external_provider.py index 5cf4714..357c92d 100644 --- a/src/execution_space_provider/utilities/external_provider.py +++ b/src/execution_space_provider/utilities/external_provider.py @@ -103,9 +103,9 @@ def identity(self) -> PackageURL: def _record_exception(exc) -> None: """Record the given exception to the current OpenTelemetry span.""" span = opentelemetry.trace.get_current_span() - span.set_attribute("error.type", exc.__name__) + span.set_attribute("error.type", exc.__class__.__name__) span.record_exception(exc) - span.set_status(opentelemetry.trace.Status(opentelemetry.StatusCode.ERROR)) + span.set_status(opentelemetry.trace.Status(opentelemetry.trace.StatusCode.ERROR)) def checkin(self, execution_space: ExecutionSpace) -> None: """Check in execution spaces. @@ -130,8 +130,8 @@ def checkin(self, execution_space: ExecutionSpace) -> None: headers = {"X-ETOS-ID": self.identifier} TraceContextTextMapPropagator().inject(headers) span = opentelemetry.trace.get_current_span() - span.set_attribute("http.request.body", json.dumps(execution_spaces, indent=4)) - span.set_attribute(SpanAttributes.HTTP_HOST, host) + span.set_attribute("http.request.body", json.dumps(execution_spaces)) + span.set_attribute(SpanAttributes.URL_FULL, host) for header, value in headers.items(): span.set_attribute(f"http.request.headers.{header.lower()}", value) timeout = time.time() + end @@ -222,7 +222,7 @@ def start(self, minimum_amount: int, maximum_amount: int) -> str: TraceContextTextMapPropagator().inject(headers) span = opentelemetry.trace.get_current_span() span.set_attribute(SpanAttributes.HTTP_HOST, host) - span.set_attribute("http.request.body", json.dumps(data, indent=4)) + span.set_attribute("http.request.body", json.dumps(data)) for header, value in headers.items(): span.set_attribute(f"http.request.headers.{header.lower()}", value) @@ -259,7 +259,7 @@ def wait(self, provider_id: str) -> dict: headers = {"X-ETOS-ID": self.identifier} TraceContextTextMapPropagator().inject(headers) span = opentelemetry.trace.get_current_span() - span.set_attribute("http.request.params", json.dumps(params, indent=4)) + span.set_attribute("http.request.params", json.dumps(params)) span.set_attribute(SpanAttributes.HTTP_HOST, host) while time.time() < timeout: if first_iteration: @@ -347,7 +347,6 @@ def request_and_wait_for_execution_spaces( :param maximum_amount: Maximum amount of execution spaces to checkout. :return: List of checked out execution spaces. """ - span = opentelemetry.trace.get_current_span() try: provider_id = self.start(minimum_amount, maximum_amount) response = self.wait(provider_id) @@ -388,7 +387,6 @@ def wait_for_and_checkout_execution_spaces( :param maximum_amount: Maximum amount of execution spaces to checkout. :return: List of checked out execution spaces. """ - span = opentelemetry.trace.get_current_span() error = None triggered = None try: diff --git a/src/iut_provider/utilities/external_provider.py b/src/iut_provider/utilities/external_provider.py index d78aad0..b1264a2 100644 --- a/src/iut_provider/utilities/external_provider.py +++ b/src/iut_provider/utilities/external_provider.py @@ -23,6 +23,7 @@ import opentelemetry from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator +from opentelemetry.semconv.trace import SpanAttributes import requests from etos_lib import ETOS from etos_lib.lib.http import Http @@ -91,6 +92,14 @@ def identity(self) -> PackageURL: """ return self.dataset.get("identity") + @staticmethod + def _record_exception(exc) -> None: + """Record the given exception to the current OpenTelemetry span.""" + span = opentelemetry.trace.get_current_span() + span.set_attribute("error.type", exc.__class__.__name__) + span.record_exception(exc) + span.set_status(opentelemetry.trace.Status(opentelemetry.trace.StatusCode.ERROR)) + def checkin(self, iut: Iut) -> None: """Check in IUTs. @@ -113,11 +122,11 @@ def checkin(self, iut: Iut) -> None: host = self.ruleset.get("stop", {}).get("host") headers = {"X-ETOS-ID": self.identifier} TraceContextTextMapPropagator().inject(headers) - otel_span = opentelemetry.trace.get_current_span() - otel_span.set_attribute("http.request.host", host) - otel_span.set_attribute("http.request.body", json.dumps(iuts, indent=4)) + span = opentelemetry.trace.get_current_span() + span.set_attribute(SpanAttributes.URL_FULL, host) + span.set_attribute("http.request.body", json.dumps(iuts)) for header, value in headers.items(): - otel_span.set_attribute(f"http.request.headers.{header.lower()}", value) + span.set_attribute(f"http.request.headers.{header.lower()}", value) timeout = time.time() + end first_iteration = True while time.time() < timeout: @@ -131,16 +140,21 @@ def checkin(self, iut: Iut) -> None: return response = response.json() if response.get("error") is not None: - raise IutCheckinFailed(f"Unable to check in {iuts} ({response.get('error')})") + exc = IutCheckinFailed(f"Unable to check in {iuts} ({response.get('error')})") + self._record_exception(exc) + raise exc except RequestsConnectionError as error: if "connection refused" in str(error).lower(): self.logger.error("Error connecting to %r: %r", host, error) continue + self._record_exception(error) raise except ConnectionError: self.logger.error("Error connecting to %r", host) continue - raise TimeoutError(f"Unable to stop external provider {self.id!r}") + exc = TimeoutError(f"Unable to stop external provider {self.id!r}") + self._record_exception(exc) + raise exc def checkin_all(self) -> None: """Check in all IUTs. @@ -172,11 +186,11 @@ def start(self, minimum_amount: int, maximum_amount: int) -> str: host = self.ruleset.get("start", {}).get("host") headers = {"X-ETOS-ID": self.identifier} TraceContextTextMapPropagator().inject(headers) - otel_span = opentelemetry.trace.get_current_span() - otel_span.set_attribute("http.request.host", host) - otel_span.set_attribute("http.request.body", json.dumps(data, indent=4)) + span = opentelemetry.trace.get_current_span() + span.set_attribute(SpanAttributes.URL_FULL, host) + span.set_attribute("http.request.body", json.dumps(data)) for header, value in headers.items(): - otel_span.set_attribute(f"http.request.headers.{header.lower()}", value) + span.set_attribute(f"http.request.headers.{header.lower()}", value) try: response = self.http.post( host, @@ -186,6 +200,7 @@ def start(self, minimum_amount: int, maximum_amount: int) -> str: response.raise_for_status() return response.json().get("id") except (HTTPError, JSONDecodeError) as error: + self._record_exception(error) raise Exception(f"Could not start external provider {self.id!r}") from error def wait(self, provider_id: str) -> dict: diff --git a/src/log_area_provider/utilities/external_provider.py b/src/log_area_provider/utilities/external_provider.py index bc1c494..2c49f6d 100644 --- a/src/log_area_provider/utilities/external_provider.py +++ b/src/log_area_provider/utilities/external_provider.py @@ -23,6 +23,7 @@ import opentelemetry from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator +from opentelemetry.semconv.trace import SpanAttributes import requests from etos_lib import ETOS from etos_lib.lib.http import Http @@ -91,6 +92,14 @@ def identity(self) -> PackageURL: """ return self.dataset.get("identity") + @staticmethod + def _record_exception(exc) -> None: + """Record the given exception to the current OpenTelemetry span.""" + span = opentelemetry.trace.get_current_span() + span.set_attribute("error.type", exc.__class__.__name__) + span.record_exception(exc) + span.set_status(opentelemetry.trace.Status(opentelemetry.trace.StatusCode.ERROR)) + def checkin(self, log_area: LogArea) -> None: """Check in log areas. @@ -114,8 +123,8 @@ def checkin(self, log_area: LogArea) -> None: headers = {"X-ETOS-ID": self.identifier} TraceContextTextMapPropagator().inject(headers) otel_span = opentelemetry.trace.get_current_span() - otel_span.set_attribute("http.request.host", host) - otel_span.set_attribute("http.request.body", json.dumps(log_areas, indent=4)) + otel_span.set_attribute(SpanAttributes.URL_FULL, host) + otel_span.set_attribute("http.request.body", json.dumps(log_areas)) for header, value in headers.items(): otel_span.set_attribute(f"http.request.headers.{header.lower()}", value) timeout = time.time() + end @@ -131,18 +140,23 @@ def checkin(self, log_area: LogArea) -> None: return response = response.json() if response.get("error") is not None: - raise LogAreaCheckinFailed( + exc = LogAreaCheckinFailed( f"Unable to check in {log_areas} ({response.get('error')})" ) + self._record_exception(exc) + raise exc except RequestsConnectionError as error: if "connection refused" in str(error).lower(): self.logger.error("Error connecting to %r: %r", host, error) continue + self._record_exception(error) raise except ConnectionError: self.logger.error("Error connecting to %r", host) continue - raise TimeoutError(f"Unable to stop external provider {self.id!r}") + exc = TimeoutError(f"Unable to stop external provider {self.id!r}") + self._record_exception(exc) + raise exc def checkin_all(self) -> None: """Check in all log areas. @@ -174,11 +188,11 @@ def start(self, minimum_amount: int, maximum_amount: int) -> str: host = self.ruleset.get("start", {}).get("host") headers = {"X-ETOS-ID": self.identifier} TraceContextTextMapPropagator().inject(headers) - otel_span = opentelemetry.trace.get_current_span() - otel_span.set_attribute("http.request.host", host) - otel_span.set_attribute("http.request.body", json.dumps(data, indent=4)) + span = opentelemetry.trace.get_current_span() + span.set_attribute(SpanAttributes.URL_FULL, host) + span.set_attribute("http.request.body", json.dumps(data)) for header, value in headers.items(): - otel_span.set_attribute(f"http.request.headers.{header.lower()}", value) + span.set_attribute(f"http.request.headers.{header.lower()}", value) try: response = self.http.post( host, @@ -188,6 +202,7 @@ def start(self, minimum_amount: int, maximum_amount: int) -> str: response.raise_for_status() return response.json().get("id") except (HTTPError, JSONDecodeError) as error: + self._record_exception(error) raise Exception(f"Could not start external provider {self.id!r}") from error def wait(self, provider_id: str) -> dict: @@ -228,14 +243,18 @@ def wait(self, provider_id: str) -> dict: continue if response.get("status") == "FAILED": - raise LogAreaCheckoutFailed(response.get("description")) + exc = LogAreaCheckoutFailed(response.get("description")) + self._record_exception(exc) + raise exc if response.get("status") == "DONE": break else: - raise TimeoutError( + exc = TimeoutError( "Status request timed out after " f"{self.etos.config.get('WAIT_FOR_LOG_AREA_TIMEOUT')}s" ) + self._record_exception(exc) + raise exc return response def check_error(self, response: dict) -> None: @@ -251,9 +270,13 @@ def check_error(self, response: dict) -> None: self.logger.error("Could not parse response as JSON") if response.status_code == requests.codes["not_found"]: - raise LogAreaNotAvailable(f"External provider {self.id!r} did not respond properly") + exc = LogAreaNotAvailable(f"External provider {self.id!r} did not respond properly") + self._record_exception(exc) + raise exc if response.status_code == requests.codes["bad_request"]: - raise RuntimeError(f"Log area provider for {self.id!r} is not properly configured") + exc = RuntimeError(f"Log area provider for {self.id!r} is not properly configured") + self._record_exception(exc) + raise exc # This should work, no other errors found. # If this does not work, propagate JSONDecodeError up the stack. @@ -335,6 +358,7 @@ def wait_for_and_checkout_log_areas( self.etos.events.send_activity_started(triggered) return self.request_and_wait_for_log_areas(minimum_amount, maximum_amount) except Exception as exception: + self._record_exception(exception) error = exception raise finally: From e53b9847764024c77eaa6b6d8780b5acff41f2da Mon Sep 17 00:00:00 2001 From: Andrei Matveyeu Date: Thu, 2 May 2024 10:50:28 +0200 Subject: [PATCH 14/18] Code review updates --- requirements.txt | 2 +- src/log_area_provider/utilities/external_provider.py | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/requirements.txt b/requirements.txt index 58c8092..5d16d4d 100644 --- a/requirements.txt +++ b/requirements.txt @@ -15,7 +15,7 @@ gunicorn~=19.9 jsontas~=1.3 packageurl-python~=0.11 etcd3gw~=2.3 -etos_lib==4.1.1 +etos_lib==4.2.0 opentelemetry-api~=1.21 opentelemetry-exporter-otlp~=1.21 opentelemetry-sdk~=1.21 diff --git a/src/log_area_provider/utilities/external_provider.py b/src/log_area_provider/utilities/external_provider.py index 2c49f6d..e8045db 100644 --- a/src/log_area_provider/utilities/external_provider.py +++ b/src/log_area_provider/utilities/external_provider.py @@ -122,11 +122,11 @@ def checkin(self, log_area: LogArea) -> None: host = self.ruleset.get("stop", {}).get("host") headers = {"X-ETOS-ID": self.identifier} TraceContextTextMapPropagator().inject(headers) - otel_span = opentelemetry.trace.get_current_span() - otel_span.set_attribute(SpanAttributes.URL_FULL, host) - otel_span.set_attribute("http.request.body", json.dumps(log_areas)) + span = opentelemetry.trace.get_current_span() + span.set_attribute(SpanAttributes.URL_FULL, host) + span.set_attribute("http.request.body", json.dumps(log_areas)) for header, value in headers.items(): - otel_span.set_attribute(f"http.request.headers.{header.lower()}", value) + span.set_attribute(f"http.request.headers.{header.lower()}", value) timeout = time.time() + end first_iteration = True while time.time() < timeout: From 8c5237dae5117752e72221f49b4345f7007bf885 Mon Sep 17 00:00:00 2001 From: Andrei Matveyeu Date: Thu, 2 May 2024 12:19:49 +0200 Subject: [PATCH 15/18] Code review updates --- src/execution_space_provider/utilities/external_provider.py | 1 - src/log_area_provider/utilities/external_provider.py | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/src/execution_space_provider/utilities/external_provider.py b/src/execution_space_provider/utilities/external_provider.py index 357c92d..6f05a5e 100644 --- a/src/execution_space_provider/utilities/external_provider.py +++ b/src/execution_space_provider/utilities/external_provider.py @@ -167,7 +167,6 @@ def checkin(self, execution_space: ExecutionSpace) -> None: self._record_exception(exc) raise exc - def checkin_all(self) -> None: """Check in all execution spaces. diff --git a/src/log_area_provider/utilities/external_provider.py b/src/log_area_provider/utilities/external_provider.py index e8045db..152c871 100644 --- a/src/log_area_provider/utilities/external_provider.py +++ b/src/log_area_provider/utilities/external_provider.py @@ -223,7 +223,7 @@ def wait(self, provider_id: str) -> dict: response = None first_iteration = True - headers={"X-ETOS-ID": self.identifier} + headers = {"X-ETOS-ID": self.identifier} TraceContextTextMapPropagator().inject(headers) while time.time() < timeout: if first_iteration: From 0c56c5effff8c480f837289bfea19868fc3ecd67 Mon Sep 17 00:00:00 2001 From: Andrei Matveyeu Date: Thu, 2 May 2024 12:35:46 +0200 Subject: [PATCH 16/18] Code review changes --- src/environment_provider/environment_provider.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/environment_provider/environment_provider.py b/src/environment_provider/environment_provider.py index 36eebbc..2657ceb 100644 --- a/src/environment_provider/environment_provider.py +++ b/src/environment_provider/environment_provider.py @@ -442,12 +442,16 @@ def checkout( self.dataset.add("iut", iut) self.dataset.add("suite", suite) - with self.tracer.start_as_current_span("request_execution_space", kind=SpanKind.CLIENT) as span: + with self.tracer.start_as_current_span( + "request_execution_space", kind=SpanKind.CLIENT + ) as span: span.set_attribute(SemConvAttributes.TEST_RUNNER_ID, test_runner) suite["executor"] = self.checkout_an_execution_space() self.dataset.add("executor", suite["executor"]) - with self.tracer.start_as_current_span("request_log_area", kind=SpanKind.CLIENT) as span: + with self.tracer.start_as_current_span( + "request_log_area", kind=SpanKind.CLIENT + ) as span: span.set_attribute(SemConvAttributes.TEST_RUNNER_ID, test_runner) suite["log_area"] = self.checkout_a_log_area() From cfb7b5e02cbde967fed9a0a0f944ec2e183ce27c Mon Sep 17 00:00:00 2001 From: Andrei Matveyeu Date: Thu, 2 May 2024 12:39:23 +0200 Subject: [PATCH 17/18] Code review changes --- src/environment_provider/environment_provider.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/environment_provider/environment_provider.py b/src/environment_provider/environment_provider.py index 2657ceb..8dcf1f1 100644 --- a/src/environment_provider/environment_provider.py +++ b/src/environment_provider/environment_provider.py @@ -444,14 +444,14 @@ def checkout( with self.tracer.start_as_current_span( "request_execution_space", kind=SpanKind.CLIENT - ) as span: + ) as span: span.set_attribute(SemConvAttributes.TEST_RUNNER_ID, test_runner) suite["executor"] = self.checkout_an_execution_space() self.dataset.add("executor", suite["executor"]) with self.tracer.start_as_current_span( "request_log_area", kind=SpanKind.CLIENT - ) as span: + ) as span: span.set_attribute(SemConvAttributes.TEST_RUNNER_ID, test_runner) suite["log_area"] = self.checkout_a_log_area() From f1dcf420b4b9e4e9f1350ac9ca610f56b4f867b9 Mon Sep 17 00:00:00 2001 From: Andrei Matveyeu Date: Thu, 2 May 2024 12:44:34 +0200 Subject: [PATCH 18/18] Code review changes --- src/execution_space_provider/utilities/external_provider.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/execution_space_provider/utilities/external_provider.py b/src/execution_space_provider/utilities/external_provider.py index 6f05a5e..e79bdff 100644 --- a/src/execution_space_provider/utilities/external_provider.py +++ b/src/execution_space_provider/utilities/external_provider.py @@ -318,7 +318,6 @@ def check_error(self, response: dict) -> None: self._record_exception(exc) raise exc - # This should work, no other errors found. # If this does not work, propagate JSONDecodeError up the stack. self.logger.debug("Status for response %r", response.json().get("status"))