Skip to content

Commit

Permalink
initial support for OpenTelemetry
Browse files Browse the repository at this point in the history
  • Loading branch information
jschlyter committed Sep 9, 2024
1 parent d487ff0 commit 5774103
Show file tree
Hide file tree
Showing 8 changed files with 649 additions and 43 deletions.
61 changes: 42 additions & 19 deletions aggrec/aggregates.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from bson.objectid import ObjectId
from fastapi import APIRouter, Header, HTTPException, Request, Response, status
from fastapi.responses import StreamingResponse
from opentelemetry import metrics, trace
from pydantic import BaseModel, Field

from aggrec.helpers import RequestVerifier
Expand All @@ -22,6 +23,14 @@

logger = logging.getLogger(__name__)

tracer = trace.get_tracer("aggrec.tracer")
meter = metrics.get_meter("aggrec.meter")

aggregates_counter = meter.create_counter(
"aggregates.counter",
description="The number of aggregates stored",
)


METADATA_HTTP_HEADERS = [
"User-Agent",
Expand Down Expand Up @@ -236,10 +245,13 @@ async def create_aggregate(
],
request: Request,
):
http_request_verifier = RequestVerifier(
client_database=request.app.settings.clients_database
)
res = await http_request_verifier.verify(request)
span = trace.get_current_span()

with tracer.start_as_current_span("http_request_verifier"):
http_request_verifier = RequestVerifier(
client_database=request.app.settings.clients_database
)
res = await http_request_verifier.verify(request)

creator = res.parameters.get("keyid")
logger.info("Create aggregate request by keyid=%s", creator)
Expand All @@ -249,6 +261,10 @@ async def create_aggregate(
aggregate_id = ObjectId()
location = f"/api/v1/aggregates/{aggregate_id}"

span.set_attribute("aggregate.id", str(aggregate_id))
span.set_attribute("aggregate.type", aggregate_type.value)
span.set_attribute("aggregate.creator", creator)

s3_bucket = request.app.settings.s3.get_bucket_name()

if aggregate_interval:
Expand Down Expand Up @@ -287,23 +303,29 @@ async def create_aggregate(
with suppress(Exception):
await s3_client.create_bucket(Bucket=s3_bucket)

await s3_client.put_object(
Bucket=s3_bucket,
Key=metadata.s3_object_key,
Metadata=s3_object_metadata,
ContentType=content_type,
Body=content,
)
with tracer.start_as_current_span("s3.put_object"):
await s3_client.put_object(
Bucket=s3_bucket,
Key=metadata.s3_object_key,
Metadata=s3_object_metadata,
ContentType=content_type,
Body=content,
)
logger.info("Object created: %s", metadata.s3_object_key)

metadata.save()
logger.info("Metadata saved: %s", metadata.id)

aggregates_counter.add(1, {"aggregate_type": aggregate_type.value})

async with request.app.get_mqtt_client() as mqtt_client:
await mqtt_client.publish(
request.app.settings.mqtt.topic,
json.dumps(get_new_aggregate_event_message(metadata, request.app.settings)),
)
with tracer.start_as_current_span("mqtt.publish"):
await mqtt_client.publish(
request.app.settings.mqtt.topic,
json.dumps(
get_new_aggregate_event_message(metadata, request.app.settings)
),
)

return Response(status_code=status.HTTP_201_CREATED, headers={"Location": location})

Expand Down Expand Up @@ -361,10 +383,11 @@ async def get_aggregate_payload(
raise HTTPException(status.HTTP_404_NOT_FOUND) from exc

if metadata := AggregateMetadata.objects(id=aggregate_object_id).first():
async with request.app.get_s3_client() as s3_client:
s3_obj = await s3_client.get_object(
Bucket=metadata.s3_bucket, Key=metadata.s3_object_key
)
with tracer.start_as_current_span("s3.get_object"):
async with request.app.get_s3_client() as s3_client:
s3_obj = await s3_client.get_object(
Bucket=metadata.s3_bucket, Key=metadata.s3_object_key
)

metadata_location = f"/api/v1/aggregates/{aggregate_id}"

Expand Down
54 changes: 33 additions & 21 deletions aggrec/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,13 @@ def main() -> None:
parser.add_argument(
"--gzip", action="store_true", help="Compress payload using GZIP"
)
parser.add_argument(
"--count",
metavar="number",
help="Number of aggregate copies to submit",
type=int,
default=1,
)
parser.add_argument("--debug", action="store_true", help="Enable debugging")

args = parser.parse_args()
Expand Down Expand Up @@ -144,27 +151,32 @@ def main() -> None:
print(f"{k}: {v}")
print("")

resp = session.send(req)
resp.raise_for_status()

print(resp)
for k, v in resp.headers.items():
print(f"{k}: {v}")
print("")
print(resp.text)

location = resp.headers["location"]
resp = session.get(urljoin(args.server, location))
resp.raise_for_status()
print(resp)
print(resp.headers)
print(json.dumps(json.loads(resp.content), indent=4))

resp = session.get(resp.json()["content_location"])
resp.raise_for_status()
print(resp)
print(resp.headers)
print(len(resp.content))
for _ in range(args.count):
resp = session.send(req)
resp.raise_for_status()
print(resp)

if args.count == 1:
for k, v in resp.headers.items():
print(f"{k}: {v}")
print("")
print(resp.text)
else:
print(resp.headers["location"])

if args.count == 1:
location = resp.headers["location"]
resp = session.get(urljoin(args.server, location))
resp.raise_for_status()
print(resp)
print(resp.headers)
print(json.dumps(json.loads(resp.content), indent=4))

resp = session.get(resp.json()["content_location"])
resp.raise_for_status()
print(resp)
print(resp.headers)
print(len(resp.content))


if __name__ == "__main__":
Expand Down
15 changes: 13 additions & 2 deletions aggrec/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from . import OPENAPI_METADATA, __verbose_version__
from .logging import JsonFormatter # noqa
from .settings import Settings
from .telemetry import configure_opentelemetry

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -60,6 +61,12 @@ def __init__(self, settings: Settings):
self.add_middleware(ProxyHeadersMiddleware)
self.include_router(aggrec.aggregates.router)
self.include_router(aggrec.extras.router)
configure_opentelemetry(
self,
spans_endpoint=str(settings.otlp.spans_endpoint),
metrics_endpoint=str(settings.otlp.metrics_endpoint),
insecure=settings.otlp.insecure,
)

@staticmethod
def connect_mongodb(settings: Settings):
Expand All @@ -74,23 +81,27 @@ def connect_mongodb(settings: Settings):
mongoengine.connect(**params, tz_aware=True)

def get_mqtt_client(self) -> aiomqtt.Client:
return aiomqtt.Client(
client = aiomqtt.Client(
hostname=self.settings.mqtt.broker.host,
port=self.settings.mqtt.broker.port,
username=self.settings.mqtt.broker.username,
password=self.settings.mqtt.broker.password,
)
self.logger.debug("Created MQTT client %s", client)
return client

def get_s3_client(self) -> aiobotocore.session.ClientCreatorContext:
session = aiobotocore.session.AioSession()
return session.create_client(
client = session.create_client(
service_name="s3",
endpoint_url=str(self.settings.s3.endpoint_url),
aws_access_key_id=self.settings.s3.access_key_id,
aws_secret_access_key=self.settings.s3.secret_access_key,
aws_session_token=None,
config=boto3.session.Config(signature_version="s3v4"),
)
self.logger.debug("Created S3 client %s", client)
return client

@classmethod
def factory(cls):
Expand Down
7 changes: 7 additions & 0 deletions aggrec/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,19 @@ def get_bucket_name(self) -> str:
return datetime.now(tz=timezone.utc).strftime(self.bucket)


class OtlpSettings(BaseModel):
spans_endpoint: AnyHttpUrl | None = None
metrics_endpoint: AnyHttpUrl | None = None
insecure: bool = False


class Settings(BaseSettings):
metadata_base_url: AnyHttpUrl = Field(default="http://127.0.0.1")
clients_database: DirectoryPath = Field(default="clients")
s3: S3 = Field(default=S3())
mqtt: MqttSettings = Field(default=MqttSettings())
mongodb: MongoDB = Field(default=MongoDB())
otlp: OtlpSettings = Field(default=OtlpSettings())

model_config = SettingsConfigDict(toml_file="aggrec.toml")

Expand Down
46 changes: 46 additions & 0 deletions aggrec/telemetry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
from fastapi import FastAPI
from opentelemetry import metrics, trace
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.instrumentation.botocore import BotocoreInstrumentor
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.instrumentation.pymongo import PymongoInstrumentor
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import (
ConsoleMetricExporter,
PeriodicExportingMetricReader,
)
from opentelemetry.sdk.resources import SERVICE_NAME, Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter


def configure_opentelemetry(
app: FastAPI,
spans_endpoint: str | None = None,
metrics_endpoint: str | None = None,
insecure: bool = True,
) -> None:
resource = Resource(attributes={SERVICE_NAME: "aggrec"})

traceProvider = TracerProvider(resource=resource)

processor = BatchSpanProcessor(
OTLPSpanExporter(endpoint=spans_endpoint, insecure=insecure)
if spans_endpoint
else ConsoleSpanExporter()
)
traceProvider.add_span_processor(processor)
trace.set_tracer_provider(traceProvider)

reader = PeriodicExportingMetricReader(
OTLPMetricExporter(endpoint=metrics_endpoint, insecure=insecure)
if metrics_endpoint
else ConsoleMetricExporter()
)
meterProvider = MeterProvider(resource=resource, metric_readers=[reader])
metrics.set_meter_provider(meterProvider)

FastAPIInstrumentor.instrument_app(app)
PymongoInstrumentor().instrument()
BotocoreInstrumentor().instrument()
68 changes: 68 additions & 0 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,77 @@ services:
configs:
- source: mosquitto.conf
target: /mosquitto/config/mosquitto.conf
otel-collector:
image: otel/opentelemetry-collector:latest
command: ["--config=/etc/otel-collector-config.yaml"]
ports:
- "4317:4317" # OTLP gRPC receiver
- "4318:4318" # OTLP HTTP receiver
- "8888:8888" # Prometheus metrics exposed by the collector
- "8889:8889" # Prometheus exporter metrics
configs:
- source: otel-collector-config.yaml
target: /etc/otel-collector-config.yaml
jaeger-all-in-one:
# http://127.0.0.1:16686
image: jaegertracing/all-in-one:latest
ports:
- "16686:16686"
- "14268"
- "14250"
prometheus:
# http://127.0.0.1:9090
image: prom/prometheus:latest
ports:
- "9090:9090"
configs:
- source: prometheus.yaml
target: /etc/prometheus/prometheus.yml

configs:
mosquitto.conf:
content: |
listener 1883
allow_anonymous true
otel-collector-config.yaml:
content: |
receivers:
otlp:
protocols:
grpc:
endpoint: 0.0.0.0:4317
http:
endpoint: 0.0.0.0:4318
exporters:
debug:
verbosity: detailed
otlp:
endpoint: jaeger-all-in-one:4317
tls:
insecure: true
prometheus:
endpoint: "0.0.0.0:8889"
processors:
batch:
service:
pipelines:
traces:
receivers: [otlp]
processors: [batch]
exporters: [debug, otlp]
metrics:
receivers: [otlp]
processors: [batch]
exporters: [debug, prometheus]
logs:
receivers: [otlp]
processors: [batch]
exporters: [debug]
prometheus.yaml:
content: |
scrape_configs:
- job_name: 'otel-collector'
scrape_interval: 10s
static_configs:
- targets: ['otel-collector:8889']
- targets: ['otel-collector:8888']
Loading

0 comments on commit 5774103

Please sign in to comment.