Skip to content

Commit

Permalink
Add OpenTelemetry tracing (#96)
Browse files Browse the repository at this point in the history
* Add OpenTelemetry tracing
  • Loading branch information
andmat900 authored May 2, 2024
1 parent d8d20c6 commit 0acdb8d
Show file tree
Hide file tree
Showing 7 changed files with 238 additions and 65 deletions.
6 changes: 5 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,8 @@ gunicorn~=19.9
jsontas~=1.3
packageurl-python~=0.11
etcd3gw~=2.3
etos_lib==4.0.0
etos_lib==4.2.0
opentelemetry-api~=1.21
opentelemetry-exporter-otlp~=1.21
opentelemetry-sdk~=1.21
opentelemetry-instrumentation-requests==0.45b0
5 changes: 4 additions & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,10 @@ install_requires =
jsontas~=1.3
packageurl-python~=0.11
etcd3gw~=2.3
etos_lib==4.0.0
etos_lib==4.2.0
opentelemetry-api~=1.21
opentelemetry-exporter-otlp~=1.21
opentelemetry-sdk~=1.21

python_requires = >=3.8

Expand Down
47 changes: 33 additions & 14 deletions src/environment_provider/environment.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

from etos_lib import ETOS
from jsontas.jsontas import JsonTas
from opentelemetry import trace

from environment_provider.lib.database import ETCDPath
from environment_provider.lib.registry import ProviderRegistry
Expand All @@ -31,6 +32,9 @@
from log_area_provider.log_area import LogArea


TRACER = trace.get_tracer(__name__)


def checkin_provider(
item: dict, provider: Union[IutProvider, ExecutionSpaceProvider, LogAreaProvider]
) -> tuple[bool, Optional[Exception]]:
Expand Down Expand Up @@ -75,21 +79,36 @@ def release_environment(
).get("log")

failure = None
success, exception = checkin_provider(Iut(**iut), IutProvider(etos, jsontas, iut_ruleset))
if not success:
failure = exception

success, exception = checkin_provider(
LogArea(**log_area), LogAreaProvider(etos, jsontas, log_area_ruleset)
)
if not success:
failure = exception
success, exception = checkin_provider(
ExecutionSpace(**executor),
ExecutionSpaceProvider(etos, jsontas, executor_ruleset),
)
if not success:
failure = exception
span_name = "stop_iuts"
with TRACER.start_as_current_span(span_name, kind=trace.SpanKind.CLIENT) as span:
success, exception = checkin_provider(Iut(**iut), IutProvider(etos, jsontas, iut_ruleset))
if not success:
span.record_exception(exception)
span.set_status(trace.Status(trace.StatusCode.ERROR))
failure = exception

span_name = "stop_log_area"
with TRACER.start_as_current_span(span_name, kind=trace.SpanKind.CLIENT) as span:
success, exception = checkin_provider(
LogArea(**log_area), LogAreaProvider(etos, jsontas, log_area_ruleset)
)
if not success:
span.record_exception(exception)
span.set_status(trace.Status(trace.StatusCode.ERROR))
failure = exception

span_name = "stop_execution_space"
with TRACER.start_as_current_span(span_name, kind=trace.SpanKind.CLIENT):
success, exception = checkin_provider(
ExecutionSpace(**executor),
ExecutionSpaceProvider(etos, jsontas, executor_ruleset),
)
if not success:
span.record_exception(exception)
span.set_status(trace.Status(trace.StatusCode.ERROR))
failure = exception

return failure


Expand Down
42 changes: 29 additions & 13 deletions src/environment_provider/environment_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,10 @@
from etos_lib.etos import ETOS
from etos_lib.lib.events import EiffelEnvironmentDefinedEvent
from etos_lib.logging.logger import FORMAT_CONFIG
from etos_lib.opentelemetry.semconv import Attributes as SemConvAttributes
from jsontas.jsontas import JsonTas
import opentelemetry
from opentelemetry.trace import SpanKind

from execution_space_provider.execution_space import ExecutionSpace
from log_area_provider.log_area import LogArea
Expand Down Expand Up @@ -78,6 +81,7 @@ def __init__(self, suite_id: str, suite_runner_ids: list[str], copy: bool = True
"""
FORMAT_CONFIG.identifier = suite_id
self.logger.info("Initializing EnvironmentProvider task.")
self.tracer = opentelemetry.trace.get_tracer(__name__)

self.etos = ETOS("ETOS Environment Provider", os.getenv("HOSTNAME"), "Environment Provider")

Expand Down Expand Up @@ -414,14 +418,17 @@ def checkout(
timeout = self.checkout_timeout()
while time.time() < timeout:
self.set_total_test_count_and_test_runners(test_runners)
# Check out and assign IUTs to test runners.
iuts = self.iut_provider.wait_for_and_checkout_iuts(
minimum_amount=1,
maximum_amount=self.dataset.get(
"maximum_amount", self.etos.config.get("TOTAL_TEST_COUNT")
),
)
self.splitter.assign_iuts(test_runners, iuts)

with self.tracer.start_as_current_span("request_iuts", kind=SpanKind.CLIENT) as span:
# Check out and assign IUTs to test runners.
iuts = self.iut_provider.wait_for_and_checkout_iuts(
minimum_amount=1,
maximum_amount=self.dataset.get(
"maximum_amount", self.etos.config.get("TOTAL_TEST_COUNT")
),
)
self.splitter.assign_iuts(test_runners, iuts)
span.set_attribute(SemConvAttributes.IUT_DESCRIPTION, str(iuts))

for test_runner in test_runners.keys():
self.dataset.add("test_runner", test_runner)
Expand All @@ -434,9 +441,19 @@ def checkout(
for iut, suite in test_runners[test_runner].get("iuts", {}).items():
self.dataset.add("iut", iut)
self.dataset.add("suite", suite)
suite["executor"] = self.checkout_an_execution_space()
self.dataset.add("executor", suite["executor"])
suite["log_area"] = self.checkout_a_log_area()

with self.tracer.start_as_current_span(
"request_execution_space", kind=SpanKind.CLIENT
) as span:
span.set_attribute(SemConvAttributes.TEST_RUNNER_ID, test_runner)
suite["executor"] = self.checkout_an_execution_space()
self.dataset.add("executor", suite["executor"])

with self.tracer.start_as_current_span(
"request_log_area", kind=SpanKind.CLIENT
) as span:
span.set_attribute(SemConvAttributes.TEST_RUNNER_ID, test_runner)
suite["log_area"] = self.checkout_a_log_area()

# Split the tests into sub suites
self.splitter.split(test_runners[test_runner])
Expand All @@ -446,8 +463,7 @@ def checkout(
sub_suite = test_suite.add(
test_runner, iut, suite, test_runners[test_runner]["priority"]
)
url = self.upload_sub_suite(sub_suite)
self.send_environment_events(url, sub_suite)
self.send_environment_events(self.upload_sub_suite(sub_suite), sub_suite)

self.logger.info(
"Environment for %r checked out and is ready for use",
Expand Down
90 changes: 72 additions & 18 deletions src/execution_space_provider/utilities/external_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,16 @@
# limitations under the License.
"""External Execution Space provider."""
import logging
import json
import os
import time
from copy import deepcopy
from json.decoder import JSONDecodeError

import opentelemetry
from opentelemetry.semconv.trace import SpanAttributes
from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator

import requests
from etos_lib import ETOS
from etos_lib.lib.http import Http
Expand Down Expand Up @@ -94,6 +99,14 @@ def identity(self) -> PackageURL:
"""
return self.dataset.get("identity")

@staticmethod
def _record_exception(exc) -> None:
    """Mark the currently active OpenTelemetry span as failed by an exception.

    The class name of the exception is stored in the ``error.type`` span
    attribute, the exception itself is recorded on the span and the span
    status is set to ``ERROR``.

    :param exc: The exception to record on the current span.
    """
    otel_trace = opentelemetry.trace
    active_span = otel_trace.get_current_span()
    active_span.set_attribute("error.type", exc.__class__.__name__)
    active_span.record_exception(exc)
    error_status = otel_trace.Status(otel_trace.StatusCode.ERROR)
    active_span.set_status(error_status)

def checkin(self, execution_space: ExecutionSpace) -> None:
"""Check in execution spaces.
Expand All @@ -114,6 +127,13 @@ def checkin(self, execution_space: ExecutionSpace) -> None:
execution_spaces = [execution_space.as_dict for execution_space in execution_space]

host = self.ruleset.get("stop", {}).get("host")
headers = {"X-ETOS-ID": self.identifier}
TraceContextTextMapPropagator().inject(headers)
span = opentelemetry.trace.get_current_span()
span.set_attribute("http.request.body", json.dumps(execution_spaces))
span.set_attribute(SpanAttributes.URL_FULL, host)
for header, value in headers.items():
span.set_attribute(f"http.request.headers.{header.lower()}", value)
timeout = time.time() + end
first_iteration = True
while time.time() < timeout:
Expand All @@ -122,25 +142,30 @@ def checkin(self, execution_space: ExecutionSpace) -> None:
else:
time.sleep(2)
try:
response = requests.post(
host, json=execution_spaces, headers={"X-ETOS-ID": self.identifier}
)
response = requests.post(host, json=execution_spaces, headers=headers)
span.set_attribute(SpanAttributes.HTTP_RESPONSE_STATUS_CODE, response.status_code)
if response.status_code == requests.codes["no_content"]:
return
response = response.json()
if response.get("error") is not None:
raise ExecutionSpaceCheckinFailed(
exc = ExecutionSpaceCheckinFailed(
f"Unable to check in {execution_spaces} " f"({response.get('error')})"
)
self._record_exception(exc)
raise exc
except RequestsConnectionError as error:
if "connection refused" in str(error).lower():
self.logger.error("Error connecting to %r: %r", host, error)
continue
span.record_exception(exc)
raise exc
raise
except ConnectionError:
self.logger.error("Error connecting to %r", host)
continue
raise TimeoutError(f"Unable to stop external provider {self.id!r}")
exc = TimeoutError(f"Unable to stop external provider {self.id!r}")
self._record_exception(exc)
raise exc

def checkin_all(self) -> None:
"""Check in all execution spaces.
Expand Down Expand Up @@ -191,16 +216,28 @@ def start(self, minimum_amount: int, maximum_amount: int) -> str:
"dataset": self.dataset.get("dataset"),
"context": self.dataset.get("context"),
}
host = self.ruleset.get("start", {}).get("host")
headers = {"X-ETOS-ID": self.identifier}
TraceContextTextMapPropagator().inject(headers)
span = opentelemetry.trace.get_current_span()
span.set_attribute(SpanAttributes.HTTP_HOST, host)
span.set_attribute("http.request.body", json.dumps(data))
for header, value in headers.items():
span.set_attribute(f"http.request.headers.{header.lower()}", value)

try:
response = self.http.post(
self.ruleset.get("start", {}).get("host"),
host,
json=data,
headers={"X-ETOS-ID": self.identifier},
headers=headers,
)
span.set_attribute(SpanAttributes.HTTP_RESPONSE_STATUS_CODE, response.status_code)
response.raise_for_status()
return response.json().get("id")
except (HTTPError, JSONDecodeError) as error:
raise Exception(f"Could not start external provider {self.id!r}") from error
exc = Exception(f"Could not start external provider {self.id!r}")
self._record_exception(exc)
raise exc from error

def wait(self, provider_id: str) -> dict:
"""Wait for external execution space provider to finish its request.
Expand All @@ -215,9 +252,14 @@ def wait(self, provider_id: str) -> dict:

host = self.ruleset.get("status", {}).get("host")
timeout = time.time() + self.etos.config.get("WAIT_FOR_EXECUTION_SPACE_TIMEOUT")

params = {"id": provider_id}
response = None
first_iteration = True
headers = {"X-ETOS-ID": self.identifier}
TraceContextTextMapPropagator().inject(headers)
span = opentelemetry.trace.get_current_span()
span.set_attribute("http.request.params", json.dumps(params))
span.set_attribute(SpanAttributes.HTTP_HOST, host)
while time.time() < timeout:
if first_iteration:
first_iteration = False
Expand All @@ -226,8 +268,8 @@ def wait(self, provider_id: str) -> dict:
try:
response = requests.get(
host,
params={"id": provider_id},
headers={"X-ETOS-ID": self.identifier},
params=params,
headers=headers,
)
self.check_error(response)
response = response.json()
Expand All @@ -236,21 +278,26 @@ def wait(self, provider_id: str) -> dict:
continue

if response.get("status") == "FAILED":
raise ExecutionSpaceCheckoutFailed(response.get("description"))
exc = ExecutionSpaceCheckoutFailed(response.get("description"))
self._record_exception(exc)
raise exc
if response.get("status") == "DONE":
break
else:
raise TimeoutError(
exc = TimeoutError(
"Status request timed out after "
f"{self.etos.config.get('WAIT_FOR_EXECUTION_SPACE_TIMEOUT')}s"
)
self._record_exception(exc)
raise exc
return response

def check_error(self, response: dict) -> None:
"""Check response for errors and try to translate them to something usable.
:param response: The response from the external execution space provider.
"""
span = opentelemetry.trace.get_current_span()
self.logger.debug("Checking response from external execution space provider")
try:
if response.json().get("error") is not None:
Expand All @@ -259,13 +306,17 @@ def check_error(self, response: dict) -> None:
self.logger.error("Could not parse response as JSON")

if response.status_code == requests.codes["not_found"]:
raise ExecutionSpaceNotAvailable(
exc = ExecutionSpaceNotAvailable(
f"External provider {self.id!r} did not respond properly"
)
self._record_exception(exc)
raise exc
if response.status_code == requests.codes["bad_request"]:
raise RuntimeError(
exc = RuntimeError(
f"Execution space provider for {self.id!r} is not properly configured"
)
self._record_exception(exc)
raise exc

# This should work, no other errors found.
# If this does not work, propagate JSONDecodeError up the stack.
Expand Down Expand Up @@ -299,7 +350,9 @@ def request_and_wait_for_execution_spaces(
response = self.wait(provider_id)
execution_spaces = self.build_execution_spaces(response)
if len(execution_spaces) < minimum_amount:
raise ExecutionSpaceNotAvailable(self.id)
exc = ExecutionSpaceNotAvailable(self.id)
self._record_exception(exc)
raise exc
if len(execution_spaces) > maximum_amount:
self.logger.warning(
"Too many execution spaces from external execution space provider "
Expand Down Expand Up @@ -353,8 +406,9 @@ def wait_for_and_checkout_execution_spaces(
)
self.etos.events.send_activity_started(triggered)
return self.request_and_wait_for_execution_spaces(minimum_amount, maximum_amount)
except Exception as exception:
error = exception
except Exception as exc:
self._record_exception(exc)
error = exc
raise
finally:
if error is None:
Expand Down
Loading

0 comments on commit 0acdb8d

Please sign in to comment.