From b4a03ced70505983ceaf4f0c0a79f04be3a58632 Mon Sep 17 00:00:00 2001
From: Jakob Schlyter
Date: Wed, 15 Nov 2023 12:09:49 +0100
Subject: [PATCH 01/15] add aggregate interval

---
 aggrec/aggregates.py | 65 +++++++++++++++++++++++++++++++++++---------
 aggrec/client.py     | 13 ++++++++-
 aggrec/db_models.py  | 13 ++++++++-
 aggrec/helpers.py    | 14 ++++++++++
 aggrec/openapi.yaml  |  7 +++++
 poetry.lock          | 47 +++++++++++++++++++++++++++++++-
 pyproject.toml       |  3 +-
 7 files changed, 145 insertions(+), 17 deletions(-)

diff --git a/aggrec/aggregates.py b/aggrec/aggregates.py
index a0f5337..eed2d2a 100644
--- a/aggrec/aggregates.py
+++ b/aggrec/aggregates.py
@@ -1,21 +1,23 @@
 import json
 import logging
+from datetime import datetime, timezone
 from enum import Enum
 from functools import lru_cache
-from typing import Annotated, Dict
+from typing import Annotated, Dict, Optional
 from urllib.parse import urljoin
 
 import aiobotocore.session
 import aiomqtt
 import boto3
 import bson
+import pendulum
 from bson.objectid import ObjectId
 from fastapi import APIRouter, Depends, HTTPException, Request, Response, status
 from fastapi.responses import StreamingResponse
 from pydantic import BaseModel
 
 from .db_models import AggregateMetadata
-from .helpers import RequestVerifier, rfc_3339_datetime_now
+from .helpers import RequestVerifier, pendulum_as_datetime, rfc_3339_datetime_now
 from .settings import Settings
 
 logger = logging.getLogger(__name__)
@@ -53,6 +55,9 @@ class AggregateMetadataResponse(BaseModel):
     content_location: str
     s3_bucket: str
     s3_object_key: str
+    aggregate_interval: Optional[str] = None
+    aggregate_interval_start: Optional[datetime] = None
+    aggregate_interval_duration: Optional[int] = None
 
     @classmethod
     def from_db_model(cls, metadata: AggregateMetadata, settings: Settings):
@@ -60,6 +65,9 @@ def from_db_model(cls, metadata: AggregateMetadata, settings: Settings):
         return cls(
             aggregate_id=aggregate_id,
             aggregate_type=metadata.aggregate_type.value,
+            aggregate_interval=metadata.aggregate_interval,
+            aggregate_interval_start=metadata.aggregate_interval_start,
+            aggregate_interval_duration=metadata.aggregate_interval_duration,
             created=metadata.id.generation_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
             creator=str(metadata.creator),
             headers=metadata.http_headers,
@@ -133,22 +141,38 @@ def get_new_aggregate_event_message(
         ),
         "s3_bucket": metadata.s3_bucket,
         "s3_object_key": metadata.s3_object_key,
+        **(
+            {
+                "aggregate_interval": metadata.aggregate_interval,
+                "aggregate_interval_start": metadata.aggregate_interval_start.astimezone(
+                    tz=timezone.utc
+                ).strftime(
+                    "%Y-%m-%dT%H:%M:%SZ"
+                ),
+                "aggregate_interval_duration": metadata.aggregate_interval_duration,
+            }
+            if metadata.aggregate_interval
+            else {}
+        ),
     }
 
 
 def get_s3_object_key(metadata: AggregateMetadata) -> str:
     """Get S3 object key from metadata"""
-    dt = metadata.id.generation_time
-    return "/".join(
-        [
-            f"type={metadata.aggregate_type.name.lower()}",
-            f"year={dt.year}",
-            f"month={dt.month}",
-            f"day={dt.day}",
-            f"creator={metadata.creator}",
-            f"id={metadata.id}",
-        ]
-    )
+    dt = metadata.aggregate_interval_start or metadata.id.generation_time
+    fields_dict = {
+        "type": metadata.aggregate_type.name.lower(),
+        "length": metadata.aggregate_interval_duration,
+        "year": dt.year,
+        "month": dt.month,
+        "day": dt.day,
+        "hour": dt.minute,
+        "minute": dt.second,
+        "creator": metadata.creator,
+        "id": metadata.id,
+    }
+    fields_list = [f"{k}={v}" for k, v in fields_dict.items() if v is not None]
+    return "/".join(fields_list)
 
 
 @router.post("/api/v1/aggregate/{aggregate_type}")
@@ -178,9 +202,24 @@ async def create_aggregate(
 
     s3_bucket = settings.s3_bucket
 
+    if aggregate_interval := request.headers.get("Aggregate-Interval"):
+        period = pendulum.parse(aggregate_interval)
+        if not isinstance(period, pendulum.Period):
+            raise HTTPException(
+                status.HTTP_400_BAD_REQUEST, "Invalid Aggregate-Interval"
+            )
+        aggregate_interval_start = pendulum_as_datetime(period.start)
+        aggregate_interval_duration = period.start.diff(period.end).in_seconds()
+    else:
+        aggregate_interval_start = None
+        aggregate_interval_duration = None
+
     metadata = AggregateMetadata(
         id=aggregate_id,
         aggregate_type=aggregate_type,
+        aggregate_interval=aggregate_interval,
+        aggregate_interval_start=aggregate_interval_start,
+        aggregate_interval_duration=aggregate_interval_duration,
         creator=creator,
         http_headers=get_http_headers(request),
         content_type=content_type,
diff --git a/aggrec/client.py b/aggrec/client.py
index e19dfff..56ec832 100644
--- a/aggrec/client.py
+++ b/aggrec/client.py
@@ -6,6 +6,7 @@
 from urllib.parse import urljoin
 
 import http_sfv
+import pendulum
 import requests
 from cryptography.hazmat.primitives.serialization import load_pem_private_key
 from http_message_signatures import (
@@ -32,11 +33,19 @@ def main() -> None:
 
     parser = argparse.ArgumentParser(description="Aggregate Sender")
 
+    default_interval = f"{pendulum.now().to_iso8601_string()}/PT1M"
+
     parser.add_argument(
         "aggregate",
         metavar="filename",
         help="Aggregate payload",
     )
+    parser.add_argument(
+        "--interval",
+        metavar="interval",
+        help="Aggregate interval",
+        default=default_interval,
+    )
     parser.add_argument(
         "--tls-cert-file",
         metavar="filename",
@@ -105,6 +114,8 @@ def main() -> None:
         req.headers["Content-Encoding"] = "gzip"
         covered_component_ids.append("content-encoding")
 
+    req.headers["Aggregate-Interval"] = args.interval
+
     req = req.prepare()
     req.headers["Content-Type"] = DEFAULT_CONTENT_TYPE
     req.headers["Content-Digest"] = str(
@@ -142,7 +153,7 @@ def main() -> None:
     resp.raise_for_status()
     print(resp)
     print(resp.headers)
-    print(json.loads(resp.content))
+    print(json.dumps(json.loads(resp.content), indent=4))
 
     resp = session.get(resp.json()["content_location"])
     resp.raise_for_status()
diff --git a/aggrec/db_models.py b/aggrec/db_models.py
index a46d171..7cefd7e 100644
--- a/aggrec/db_models.py
+++ b/aggrec/db_models.py
@@ -1,6 +1,13 @@
 from enum import Enum
 
-from mongoengine import DictField, Document, EnumField, IntField, StringField
+from mongoengine import (
+    DateTimeField,
+    DictField,
+    Document,
+    EnumField,
+    IntField,
+    StringField,
+)
 
 
 class AggregateType(Enum):
@@ -22,3 +29,7 @@ class AggregateMetadata(Document):
 
     s3_bucket = StringField()
     s3_object_key = StringField()
+
+    aggregate_interval = StringField()
+    aggregate_interval_start = DateTimeField()
+    aggregate_interval_duration = IntField()
diff --git a/aggrec/helpers.py b/aggrec/helpers.py
index fffb1a8..31fa946 100644
--- a/aggrec/helpers.py
+++ b/aggrec/helpers.py
@@ -3,6 +3,7 @@
 from datetime import datetime, timezone
 
 import http_sfv
+import pendulum
 from cryptography.hazmat.primitives.serialization import load_pem_public_key
 from fastapi import HTTPException, Request, status
 from http_message_signatures import (
@@ -107,3 +108,16 @@ async def verify(self, request: Request) -> dict:
 def rfc_3339_datetime_now() -> str:
     """Return current time(UTC) as ISO 8601 timestamp"""
     return str(datetime.now(tz=timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"))
+
+
+def pendulum_as_datetime(dt: pendulum.DateTime) -> datetime:
+    return datetime(
+        year=dt.year,
+        month=dt.month,
+        day=dt.day,
+        hour=dt.hour,
+        minute=dt.minute,
+        second=dt.second,
+        microsecond=dt.microsecond,
+        tzinfo=dt.tzinfo,
+    )
diff --git a/aggrec/openapi.yaml b/aggrec/openapi.yaml
index e1cf585..d51c61b 100644
--- a/aggrec/openapi.yaml
+++ b/aggrec/openapi.yaml
@@ -52,6 +52,13 @@ paths:
           required: false
           schema:
             type: string
+        - name: Aggregate-Interval
+          description: Aggregate window as an ISO 8601 time interval (start and duration)
+          in: header
+          required: false
+          schema:
+            type: string
+            example: "1984-01-01T12:00:00Z/PT1M"
       requestBody:
         description: Aggregate as Apache Parquet
         content:
diff --git a/poetry.lock b/poetry.lock
index 34dc434..9cdd502 100644
--- a/poetry.lock
+++ b/poetry.lock
@@ -978,6 +978,40 @@ files = [
     {file = "pathspec-0.11.2.tar.gz", hash = "sha256:e0d8d0ac2f12da61956eb2306b69f9469b42f4deb0f3cb6ed47b9cce9996ced3"},
 ]
 
+[[package]]
+name = "pendulum"
+version = "2.1.2"
+description = "Python datetimes made easy"
+optional = false
+python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*"
+files = [
+    {file = "pendulum-2.1.2-cp27-cp27m-macosx_10_15_x86_64.whl", hash = "sha256:b6c352f4bd32dff1ea7066bd31ad0f71f8d8100b9ff709fb343f3b86cee43efe"},
+    {file = "pendulum-2.1.2-cp27-cp27m-win_amd64.whl", hash = "sha256:318f72f62e8e23cd6660dbafe1e346950281a9aed144b5c596b2ddabc1d19739"},
+    {file = "pendulum-2.1.2-cp35-cp35m-macosx_10_15_x86_64.whl", hash = "sha256:0731f0c661a3cb779d398803655494893c9f581f6488048b3fb629c2342b5394"},
+    {file = "pendulum-2.1.2-cp35-cp35m-manylinux1_i686.whl", hash = "sha256:3481fad1dc3f6f6738bd575a951d3c15d4b4ce7c82dce37cf8ac1483fde6e8b0"},
+    {file = "pendulum-2.1.2-cp35-cp35m-manylinux1_x86_64.whl", hash = "sha256:9702069c694306297ed362ce7e3c1ef8404ac8ede39f9b28b7c1a7ad8c3959e3"},
+    {file = "pendulum-2.1.2-cp35-cp35m-win_amd64.whl", hash = "sha256:fb53ffa0085002ddd43b6ca61a7b34f2d4d7c3ed66f931fe599e1a531b42af9b"},
+    {file = "pendulum-2.1.2-cp36-cp36m-macosx_10_15_x86_64.whl", hash = "sha256:c501749fdd3d6f9e726086bf0cd4437281ed47e7bca132ddb522f86a1645d360"},
+    {file = "pendulum-2.1.2-cp36-cp36m-manylinux1_i686.whl", hash = "sha256:c807a578a532eeb226150d5006f156632df2cc8c5693d778324b43ff8c515dd0"},
+    {file = "pendulum-2.1.2-cp36-cp36m-manylinux1_x86_64.whl", hash = "sha256:2d1619a721df661e506eff8db8614016f0720ac171fe80dda1333ee44e684087"},
+    {file = "pendulum-2.1.2-cp36-cp36m-win_amd64.whl", hash = "sha256:f888f2d2909a414680a29ae74d0592758f2b9fcdee3549887779cd4055e975db"},
+    {file = "pendulum-2.1.2-cp37-cp37m-macosx_10_15_x86_64.whl", hash = "sha256:e95d329384717c7bf627bf27e204bc3b15c8238fa8d9d9781d93712776c14002"},
+    {file = "pendulum-2.1.2-cp37-cp37m-manylinux1_i686.whl", hash = "sha256:4c9c689747f39d0d02a9f94fcee737b34a5773803a64a5fdb046ee9cac7442c5"},
+    {file = "pendulum-2.1.2-cp37-cp37m-manylinux1_x86_64.whl", hash = "sha256:1245cd0075a3c6d889f581f6325dd8404aca5884dea7223a5566c38aab94642b"},
+    {file = "pendulum-2.1.2-cp37-cp37m-win_amd64.whl", hash = "sha256:db0a40d8bcd27b4fb46676e8eb3c732c67a5a5e6bfab8927028224fbced0b40b"},
+    {file = "pendulum-2.1.2-cp38-cp38-macosx_10_15_x86_64.whl", hash = "sha256:f5e236e7730cab1644e1b87aca3d2ff3e375a608542e90fe25685dae46310116"},
+    {file = "pendulum-2.1.2-cp38-cp38-manylinux1_i686.whl", hash = "sha256:de42ea3e2943171a9e95141f2eecf972480636e8e484ccffaf1e833929e9e052"},
+    {file = "pendulum-2.1.2-cp38-cp38-manylinux1_x86_64.whl", hash = "sha256:7c5ec650cb4bec4c63a89a0242cc8c3cebcec92fcfe937c417ba18277d8560be"},
+    {file = "pendulum-2.1.2-cp38-cp38-win_amd64.whl", hash = "sha256:33fb61601083f3eb1d15edeb45274f73c63b3c44a8524703dc143f4212bf3269"},
+    {file = "pendulum-2.1.2-cp39-cp39-manylinux1_i686.whl", hash = "sha256:29c40a6f2942376185728c9a0347d7c0f07905638c83007e1d262781f1e6953a"},
+    {file = "pendulum-2.1.2-cp39-cp39-manylinux1_x86_64.whl", hash = "sha256:94b1fc947bfe38579b28e1cccb36f7e28a15e841f30384b5ad6c5e31055c85d7"},
+    {file = "pendulum-2.1.2.tar.gz", hash = "sha256:b06a0ca1bfe41c990bbf0c029f0b6501a7f2ec4e38bfec730712015e8860f207"},
+]
+
+[package.dependencies]
+python-dateutil = ">=2.6,<3.0"
+pytzdata = ">=2020.1"
+
 [[package]]
 name = "platformdirs"
 version = "4.0.0"
@@ -1412,6 +1446,17 @@ files = [
 [package.extras]
 cli = ["click (>=5.0)"]
 
+[[package]]
+name = "pytzdata"
+version = "2020.1"
+description = "The Olson timezone database for Python."
+optional = false
+python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*"
+files = [
+    {file = "pytzdata-2020.1-py2.py3-none-any.whl", hash = "sha256:e1e14750bcf95016381e4d472bad004eef710f2d6417240904070b3d6654485f"},
+    {file = "pytzdata-2020.1.tar.gz", hash = "sha256:3efa13b335a00a8de1d345ae41ec78dd11c9f8807f522d39850f2dd828681540"},
+]
+
 [[package]]
 name = "requests"
 version = "2.31.0"
@@ -1759,4 +1804,4 @@ multidict = ">=4.0"
 [metadata]
 lock-version = "2.0"
 python-versions = "^3.11"
-content-hash = "87389ad5cafc8044ece42cfacab2bf2bae0bfffa10ff0e77beec21e90f16d558"
+content-hash = "c3ff6916296c736ad5c2c391dde076bd41148cc79d5ecf8bd4bfba1a5fc0cf9b"
diff --git a/pyproject.toml b/pyproject.toml
index 939d2f9..dbd7b03 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,6 +1,6 @@
 [tool.poetry]
 name = "aggrec"
-version = "0.4.0"
+version = "0.5.0"
 description = "DNS TAPIR Aggregate Receiver"
 authors = ["Jakob Schlyter "]
 readme = "README.md"
@@ -23,6 +23,7 @@ aiobotocore = "^2.7.0"
 aiomqtt = "^1.2.1"
 setuptools = "^68.2.2"
 jsonformatter = "^0.3.2"
+pendulum = "^2.1.2"
 
 [tool.poetry.group.dev.dependencies]
 isort = "^5.12.0"

From a5aba263c34ece8281fa1273872cb6be53b327d7 Mon Sep 17 00:00:00 2001
From: Jakob Schlyter
Date: Wed, 15 Nov 2023 12:49:23 +0100
Subject: [PATCH 02/15] rename some properties

drop raw aggregate_interval from responses
---
 aggrec/aggregates.py | 11 +++--------
 aggrec/openapi.yaml  | 13 ++++++++++---
 2 files changed, 13 insertions(+), 11 deletions(-)

diff --git a/aggrec/aggregates.py b/aggrec/aggregates.py
index eed2d2a..0d895c0 100644
--- a/aggrec/aggregates.py
+++ b/aggrec/aggregates.py
@@ -55,7 +55,6 @@ class AggregateMetadataResponse(BaseModel):
     content_location: str
     s3_bucket: str
     s3_object_key: str
-    aggregate_interval: Optional[str] = None
     aggregate_interval_start: Optional[datetime] = None
     aggregate_interval_duration: Optional[int] = None
 
@@ -65,7 +64,6 @@ def from_db_model(cls, metadata: AggregateMetadata, settings: Settings):
         return cls(
             aggregate_id=aggregate_id,
             aggregate_type=metadata.aggregate_type.value,
-            aggregate_interval=metadata.aggregate_interval,
             aggregate_interval_start=metadata.aggregate_interval_start,
             aggregate_interval_duration=metadata.aggregate_interval_duration,
             created=metadata.id.generation_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
@@ -143,13 +141,10 @@ def get_new_aggregate_event_message(
         "s3_object_key": metadata.s3_object_key,
         **(
             {
-                "aggregate_interval": metadata.aggregate_interval,
-                "aggregate_interval_start": metadata.aggregate_interval_start.astimezone(
+                "interval_start": metadata.aggregate_interval_start.astimezone(
                     tz=timezone.utc
-                ).strftime(
-                    "%Y-%m-%dT%H:%M:%SZ"
-                ),
-                "aggregate_interval_duration": metadata.aggregate_interval_duration,
+                ).strftime("%Y-%m-%dT%H:%M:%SZ"),
+                "interval_duration": metadata.aggregate_interval_duration,
             }
             if metadata.aggregate_interval
             else {}
diff --git a/aggrec/openapi.yaml b/aggrec/openapi.yaml
index d51c61b..af305cc 100644
--- a/aggrec/openapi.yaml
+++ b/aggrec/openapi.yaml
@@ -124,15 +124,15 @@ components:
       description: Aggregate metadata
       type: object
      required:
-        - id
+        - aggregate_id
         - aggregate_type
         - created
         - content_type
         - content_payload_location
       properties:
-        id:
+        aggregate_id:
           $ref: '#/components/schemas/aggregate_id'
-        type:
+        aggregate_type:
           $ref: '#/components/schemas/aggregate_type'
         created:
           type: string
@@ -154,6 +154,13 @@ components:
           type: string
         s3_object_key:
           type: string
+        aggregate_interval_start:
+          type: string
+          format: date-time
+        aggregate_interval_duration:
+          type: integer
+          format: int64
+          minimum: 0
 
     aggregate_id:
       type: string

From 351c3a7595348baf76013a00ee16ae7a18d65f15 Mon Sep 17 00:00:00 2001
From: Jakob Schlyter
Date: Wed, 15 Nov 2023 12:50:52 +0100
Subject: [PATCH 03/15] fix type

---
 aggrec/aggregates.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/aggrec/aggregates.py b/aggrec/aggregates.py
index 0d895c0..db40282 100644
--- a/aggrec/aggregates.py
+++ b/aggrec/aggregates.py
@@ -47,7 +47,7 @@ class AggregateType(str, Enum):
 class AggregateMetadataResponse(BaseModel):
     aggregate_id: str
     aggregate_type: AggregateType
-    created: str
+    created: Optional[datetime]
     creator: str
     headers: dict
     content_type: str

From 85e887f9b34998ec5740fd996b7c44daababbdd2 Mon Sep 17 00:00:00 2001
From: Jakob Schlyter
Date: Wed, 15 Nov 2023 12:55:33 +0100
Subject: [PATCH 04/15] store raw aggregate interval as header

---
 aggrec/aggregates.py | 5 +++--
 aggrec/db_models.py  | 1 -
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/aggrec/aggregates.py b/aggrec/aggregates.py
index db40282..0f58916 100644
--- a/aggrec/aggregates.py
+++ b/aggrec/aggregates.py
@@ -30,6 +30,7 @@
     "Content-Encoding",
     "Signature",
     "Signature-Input",
+    "Aggregate-Interval",
 ]
 
 ALLOWED_AGGREGATE_TYPES = ["histogram", "vector"]
@@ -146,7 +147,8 @@ def get_new_aggregate_event_message(
                 ).strftime("%Y-%m-%dT%H:%M:%SZ"),
                 "interval_duration": metadata.aggregate_interval_duration,
             }
-            if metadata.aggregate_interval
+            if metadata.aggregate_interval_start
+            and metadata.aggregate_interval_duration
             else {}
         ),
     }
@@ -212,7 +214,6 @@ async def create_aggregate(
     metadata = AggregateMetadata(
         id=aggregate_id,
         aggregate_type=aggregate_type,
-        aggregate_interval=aggregate_interval,
         aggregate_interval_start=aggregate_interval_start,
         aggregate_interval_duration=aggregate_interval_duration,
         creator=creator,
diff --git a/aggrec/db_models.py b/aggrec/db_models.py
index 7cefd7e..318a246 100644
--- a/aggrec/db_models.py
+++ b/aggrec/db_models.py
@@ -30,6 +30,5 @@ class AggregateMetadata(Document):
     s3_bucket = StringField()
     s3_object_key = StringField()
 
-    aggregate_interval = StringField()
     aggregate_interval_start = DateTimeField()
     aggregate_interval_duration = IntField()

From ec515f400f139e6c62c33075de7be286d19bb7af Mon Sep 17 00:00:00 2001
From: Jakob Schlyter
Date: Wed, 15 Nov 2023 13:06:10 +0100
Subject: [PATCH 05/15] pad for lazy people

---
 aggrec/aggregates.py | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/aggrec/aggregates.py b/aggrec/aggregates.py
index 0f58916..d1510b0 100644
--- a/aggrec/aggregates.py
+++ b/aggrec/aggregates.py
@@ -160,11 +160,11 @@ def get_s3_object_key(metadata: AggregateMetadata) -> str:
     fields_dict = {
         "type": metadata.aggregate_type.name.lower(),
         "length": metadata.aggregate_interval_duration,
-        "year": dt.year,
-        "month": dt.month,
-        "day": dt.day,
-        "hour": dt.minute,
-        "minute": dt.second,
+        "year": f"{dt.year:04}",
+        "month": f"{dt.month:02}",
+        "day": f"{dt.day:02}",
+        "hour": f"{dt.minute:02}",
+        "minute": f"{dt.second:02}",
         "creator": metadata.creator,
         "id": metadata.id,
     }

From 5a66c7e53d6b4852f6d2aa1829b6f4fd4da792fa Mon Sep 17 00:00:00 2001
From: Jakob Schlyter
Date: Wed, 15 Nov 2023 13:36:55 +0100
Subject: [PATCH 06/15] drop length from s3 object key

---
 aggrec/aggregates.py | 1 -
 1 file changed, 1 deletion(-)

diff --git a/aggrec/aggregates.py b/aggrec/aggregates.py
index d1510b0..fed047f 100644
--- a/aggrec/aggregates.py
+++ b/aggrec/aggregates.py
@@ -159,7 +159,6 @@ def get_s3_object_key(metadata: AggregateMetadata) -> str:
     dt = metadata.aggregate_interval_start or metadata.id.generation_time
     fields_dict = {
         "type": metadata.aggregate_type.name.lower(),
-        "length": metadata.aggregate_interval_duration,
         "year": f"{dt.year:04}",
         "month": f"{dt.month:02}",
         "day": f"{dt.day:02}",

From 2348efa69a882cbc42299830298a7cbf11072d86 Mon Sep 17 00:00:00 2001
From: Jakob Schlyter
Date: Wed, 15 Nov 2023 13:38:34 +0100
Subject: [PATCH 07/15] fix embarrasing typo

---
 aggrec/aggregates.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/aggrec/aggregates.py b/aggrec/aggregates.py
index fed047f..c9e31d1 100644
--- a/aggrec/aggregates.py
+++ b/aggrec/aggregates.py
@@ -162,8 +162,8 @@ def get_s3_object_key(metadata: AggregateMetadata) -> str:
         "year": f"{dt.year:04}",
         "month": f"{dt.month:02}",
         "day": f"{dt.day:02}",
-        "hour": f"{dt.minute:02}",
-        "minute": f"{dt.second:02}",
+        "hour": f"{dt.hour:02}",
+        "minute": f"{dt.minute:02}",
         "creator": metadata.creator,
         "id": metadata.id,
     }

From 73148f748ffa8788ca13f694e72aa5bc9b385210 Mon Sep 17 00:00:00 2001
From: Jakob Schlyter
Date: Wed, 15 Nov 2023 14:31:32 +0100
Subject: [PATCH 08/15] fix type

---
 aggrec/aggregates.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/aggrec/aggregates.py b/aggrec/aggregates.py
index c9e31d1..e521e46 100644
--- a/aggrec/aggregates.py
+++ b/aggrec/aggregates.py
@@ -48,7 +48,7 @@ class AggregateType(str, Enum):
 class AggregateMetadataResponse(BaseModel):
     aggregate_id: str
     aggregate_type: AggregateType
-    created: Optional[datetime]
+    created: str
     creator: str
     headers: dict
     content_type: str

From cabb8d86b42e22ab4176f648aa8d2870d986ce20 Mon Sep 17 00:00:00 2001
From: Jakob Schlyter
Date: Thu, 16 Nov 2023 07:35:27 +0100
Subject: [PATCH 09/15] annotate

---
 aggrec/aggregates.py | 32 ++++++++++++++++++--------------
 1 file changed, 18 insertions(+), 14 deletions(-)

diff --git a/aggrec/aggregates.py b/aggrec/aggregates.py
index e521e46..3b9f89e 100644
--- a/aggrec/aggregates.py
+++ b/aggrec/aggregates.py
@@ -14,7 +14,7 @@ from bson.objectid import ObjectId
 from fastapi import APIRouter, Depends, HTTPException, Request, Response, status
 from fastapi.responses import StreamingResponse
-from pydantic import BaseModel
+from pydantic import BaseModel, Field
 
 from .db_models import AggregateMetadata
 from .helpers import RequestVerifier, pendulum_as_datetime, rfc_3339_datetime_now
 from .settings import Settings
@@ -46,18 +46,22 @@ class AggregateType(str, Enum):
 
 
 class AggregateMetadataResponse(BaseModel):
-    aggregate_id: str
-    aggregate_type: AggregateType
-    created: str
-    creator: str
-    headers: dict
-    content_type: str
-    content_length: int
-    content_location: str
-    s3_bucket: str
-    s3_object_key: str
-    aggregate_interval_start: Optional[datetime] = None
-    aggregate_interval_duration: Optional[int] = None
+    aggregate_id: str = Field(title="Aggregate identifier")
+    aggregate_type: AggregateType = Field(title="Aggregate type")
+    created: datetime = Field(title="Aggregate creation timestamp")
+    creator: str = Field(title="Aggregate creator")
+    headers: dict = Field(title="Dictionary of relevant HTTP headers")
+    content_type: str = Field(title="Content MIME type")
+    content_length: int = Field(title="Content length")
+    content_location: str = Field(title="Content local (URL)")
+    s3_bucket: str = Field(title="S3 Bucket Name")
+    s3_object_key: str = Field(title="S3 Object Key")
+    aggregate_interval_start: datetime | None = Field(
+        default=None, title="Aggregate interval start"
+    )
+    aggregate_interval_duration: int | None = Field(
+        default=None, title="Aggregate interval duration (seconds)"
+    )
 
     @classmethod
     def from_db_model(cls, metadata: AggregateMetadata, settings: Settings):
@@ -67,7 +71,7 @@ def from_db_model(cls, metadata: AggregateMetadata, settings: Settings):
             aggregate_type=metadata.aggregate_type.value,
             aggregate_interval_start=metadata.aggregate_interval_start,
             aggregate_interval_duration=metadata.aggregate_interval_duration,
-            created=metadata.id.generation_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
+            created=metadata.id.generation_time,
             creator=str(metadata.creator),
             headers=metadata.http_headers,
             content_type=metadata.content_type,

From 96bcf335e745f35c27b506e6048394999262b5f0 Mon Sep 17 00:00:00 2001
From: Jakob Schlyter
Date: Thu, 16 Nov 2023 07:37:23 +0100
Subject: [PATCH 10/15] cover aggregate-interval

---
 aggrec/client.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/aggrec/client.py b/aggrec/client.py
index 56ec832..ce47fdd 100644
--- a/aggrec/client.py
+++ b/aggrec/client.py
@@ -16,7 +16,7 @@
 )
 
 DEFAULT_CONTENT_TYPE = "application/vnd.apache.parquet"
-DEFAULT_COVERED_COMPONENT_IDS = ["content-type", "content-digest", "content-length"]
+DEFAULT_COVERED_COMPONENT_IDS = ["content-type", "content-digest", "content-length", "aggregate-interval"]
 
 
 class MyHTTPSignatureKeyResolver(HTTPSignatureKeyResolver):

From f333a0def90bcc745bec6e0a26c923f42cda77c2 Mon Sep 17 00:00:00 2001
From: Jakob Schlyter
Date: Thu, 16 Nov 2023 07:41:09 +0100
Subject: [PATCH 11/15] reformat

---
 aggrec/client.py | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/aggrec/client.py b/aggrec/client.py
index ce47fdd..d6c12d5 100644
--- a/aggrec/client.py
+++ b/aggrec/client.py
@@ -16,7 +16,12 @@
 )
 
 DEFAULT_CONTENT_TYPE = "application/vnd.apache.parquet"
-DEFAULT_COVERED_COMPONENT_IDS = ["content-type", "content-digest", "content-length", "aggregate-interval"]
+DEFAULT_COVERED_COMPONENT_IDS = [
+    "content-type",
+    "content-digest",
+    "content-length",
+    "aggregate-interval",
+]
 
 
 class MyHTTPSignatureKeyResolver(HTTPSignatureKeyResolver):

From a65cd79512672a0684506e8afd4d39ecc3234ccd Mon Sep 17 00:00:00 2001
From: Jakob Schlyter
Date: Thu, 16 Nov 2023 07:41:28 +0100
Subject: [PATCH 12/15] lint

---
 aggrec/aggregates.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/aggrec/aggregates.py b/aggrec/aggregates.py
index 3b9f89e..b373b1c 100644
--- a/aggrec/aggregates.py
+++ b/aggrec/aggregates.py
@@ -3,7 +3,7 @@
 from datetime import datetime, timezone
 from enum import Enum
 from functools import lru_cache
-from typing import Annotated, Dict, Optional
+from typing import Annotated, Dict
 from urllib.parse import urljoin
 
 import aiobotocore.session

From 9d454ef217a43c5beb21c2b7459bc39e0ca7ac4f Mon Sep 17 00:00:00 2001
From: Jakob Schlyter
Date: Wed, 15 Nov 2023 14:23:01 +0100
Subject: [PATCH 13/15] init

---
 Dockerfile | 11 +++++++++++
 1 file changed, 11 insertions(+)
 create mode 100644 Dockerfile

diff --git a/Dockerfile b/Dockerfile
new file mode 100644
index 0000000..5deecca
--- /dev/null
+++ b/Dockerfile
@@ -0,0 +1,11 @@
+FROM python:3.11 AS builder
+RUN pip3 install poetry
+WORKDIR /src
+ADD . /src
+RUN poetry build
+
+FROM python:3.11
+WORKDIR /tmp
+COPY --from=builder /src/dist/*.whl .
+RUN pip3 install *.whl && rm *.whl
+ENTRYPOINT aggrec_server

From fbd7d04a740f6ff7767f80efde9b19d23e8b7991 Mon Sep 17 00:00:00 2001
From: Jakob Schlyter
Date: Thu, 16 Nov 2023 09:24:54 +0100
Subject: [PATCH 14/15] use utc for s3 object key

---
 aggrec/aggregates.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/aggrec/aggregates.py b/aggrec/aggregates.py
index b373b1c..be56f29 100644
--- a/aggrec/aggregates.py
+++ b/aggrec/aggregates.py
@@ -161,6 +161,7 @@ def get_new_aggregate_event_message(
 def get_s3_object_key(metadata: AggregateMetadata) -> str:
     """Get S3 object key from metadata"""
     dt = metadata.aggregate_interval_start or metadata.id.generation_time
+    dt = dt.astimezone(tz=timezone.utc)
     fields_dict = {
         "type": metadata.aggregate_type.name.lower(),
         "year": f"{dt.year:04}",

From 66c1e23598068b3483461da72d6217bfe106c974 Mon Sep 17 00:00:00 2001
From: Jakob Schlyter
Date: Thu, 16 Nov 2023 10:40:55 +0100
Subject: [PATCH 15/15] include all relevant headers

closes #6
---
 aggrec/aggregates.py | 23 +++++++++++++++++------
 aggrec/helpers.py    |  4 ++--
 2 files changed, 19 insertions(+), 8 deletions(-)

diff --git a/aggrec/aggregates.py b/aggrec/aggregates.py
index be56f29..e3aae41 100644
--- a/aggrec/aggregates.py
+++ b/aggrec/aggregates.py
@@ -1,9 +1,10 @@
 import json
 import logging
+import re
 from datetime import datetime, timezone
 from enum import Enum
 from functools import lru_cache
-from typing import Annotated, Dict
+from typing import Annotated, Dict, List
 from urllib.parse import urljoin
 
 import aiobotocore.session
@@ -30,7 +31,6 @@
     "Content-Encoding",
     "Signature",
     "Signature-Input",
-    "Aggregate-Interval",
 ]
 
 ALLOWED_AGGREGATE_TYPES = ["histogram", "vector"]
@@ -113,10 +113,19 @@ async def mqtt_client(settings: Annotated[Settings, Depends(get_settings)]):
         yield client
 
 
-def get_http_headers(request: Request) -> Dict[str, str]:
+def get_http_headers(
+    request: Request, covered_components_headers: List[str]
+) -> Dict[str, str]:
     """Get dictionary of relevant metadata HTTP headers"""
+
+    relevant_headers = set([header.lower() for header in METADATA_HTTP_HEADERS])
+
+    for header in covered_components_headers:
+        if match := re.match(r"^\"([^@].+)\"$", header):
+            relevant_headers.add(match.group(1))
+
     res = {}
-    for header in METADATA_HTTP_HEADERS:
+    for header in relevant_headers:
         if value := request.headers.get(header):
             res[header] = value
     return res
@@ -195,9 +204,11 @@ async def create_aggregate(
 
     res = await http_request_verifier.verify(request)
 
-    creator = res.get("keyid")
+    creator = res.parameters.get("keyid")
     logger.info("Create aggregate request by keyid=%s", creator)
 
+    http_headers = get_http_headers(request, res.covered_components.keys())
+
     aggregate_id = ObjectId()
     location = f"/api/v1/aggregates/{aggregate_id}"
 
@@ -221,7 +232,7 @@ async def create_aggregate(
         aggregate_interval_start=aggregate_interval_start,
         aggregate_interval_duration=aggregate_interval_duration,
         creator=creator,
-        http_headers=get_http_headers(request),
+        http_headers=http_headers,
         content_type=content_type,
         s3_bucket=s3_bucket,
     )
diff --git a/aggrec/helpers.py b/aggrec/helpers.py
index 31fa946..54a1d6a 100644
--- a/aggrec/helpers.py
+++ b/aggrec/helpers.py
@@ -66,7 +66,7 @@ async def verify_content_digest(self, result: VerifyResult, request: Request):
                 raise UnsupportedContentDigestAlgorithm
         raise ContentDigestMissing
 
-    async def verify(self, request: Request) -> dict:
+    async def verify(self, request: Request) -> VerifyResult:
         """Verify request and return signer"""
         verifier = HTTPMessageVerifier(
             signature_algorithm=self.algorithm,
@@ -90,7 +90,7 @@ async def verify(self, request: Request) -> dict:
         try:
             await self.verify_content_digest(result, request)
             self.logger.debug("Content-Digest verified")
-            return result.parameters
+            return result
         except InvalidContentDigest:
             raise HTTPException(
                 status.HTTP_401_UNAUTHORIZED, "Content-Digest verification failed"
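
A minimal usage sketch, not part of the patch series above: it shows how the Aggregate-Interval header introduced by these patches can be built by a sender and resolved into the start/duration pair that the receiver stores, using the same pendulum 2.x interval parsing that create_aggregate() relies on. The two helper function names are illustrative only and do not exist in aggrec.

# Illustrative sketch (hypothetical helpers, not part of aggrec): maps an ISO 8601
# interval such as "2023-11-15T12:00:00Z/PT1M" to the aggregate_interval_start /
# aggregate_interval_duration pair stored by the receiver.
from datetime import datetime, timezone

import pendulum


def make_interval_header(start: pendulum.DateTime, seconds: int) -> str:
    """Build an Aggregate-Interval header value: ISO 8601 start/duration."""
    return f"{start.to_iso8601_string()}/PT{seconds}S"


def parse_interval_header(value: str) -> tuple[datetime, int]:
    """Resolve the header into (interval start in UTC, duration in seconds)."""
    period = pendulum.parse(value)
    if not isinstance(period, pendulum.Period):  # same check as in create_aggregate()
        raise ValueError("not an ISO 8601 time interval")
    start = period.start.astimezone(tz=timezone.utc)
    duration = period.start.diff(period.end).in_seconds()
    return start, duration


if __name__ == "__main__":
    header = make_interval_header(pendulum.datetime(2023, 11, 15, 12, 0, tz="UTC"), 60)
    print(header)                         # 2023-11-15T12:00:00+00:00/PT60S
    print(parse_interval_header(header))  # (DateTime 2023-11-15 12:00 UTC, 60)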