diff --git a/Dockerfile b/Dockerfile
new file mode 100644
index 0000000..5deecca
--- /dev/null
+++ b/Dockerfile
@@ -0,0 +1,11 @@
+FROM python:3.11 AS builder
+RUN pip3 install poetry
+WORKDIR /src
+ADD . /src
+RUN poetry build
+
+FROM python:3.11
+WORKDIR /tmp
+COPY --from=builder /src/dist/*.whl .
+RUN pip3 install *.whl && rm *.whl
+ENTRYPOINT aggrec_server
diff --git a/aggrec/aggregates.py b/aggrec/aggregates.py
index a0f5337..e3aae41 100644
--- a/aggrec/aggregates.py
+++ b/aggrec/aggregates.py
@@ -1,21 +1,24 @@
 import json
 import logging
+import re
+from datetime import datetime, timezone
 from enum import Enum
 from functools import lru_cache
-from typing import Annotated, Dict
+from typing import Annotated, Dict, List
 from urllib.parse import urljoin
 
 import aiobotocore.session
 import aiomqtt
 import boto3
 import bson
+import pendulum
 from bson.objectid import ObjectId
 from fastapi import APIRouter, Depends, HTTPException, Request, Response, status
 from fastapi.responses import StreamingResponse
-from pydantic import BaseModel
+from pydantic import BaseModel, Field
 
 from .db_models import AggregateMetadata
-from .helpers import RequestVerifier, rfc_3339_datetime_now
+from .helpers import RequestVerifier, pendulum_as_datetime, rfc_3339_datetime_now
 from .settings import Settings
 
 logger = logging.getLogger(__name__)
@@ -43,16 +46,22 @@ class AggregateType(str, Enum):
 
 
 class AggregateMetadataResponse(BaseModel):
-    aggregate_id: str
-    aggregate_type: AggregateType
-    created: str
-    creator: str
-    headers: dict
-    content_type: str
-    content_length: int
-    content_location: str
-    s3_bucket: str
-    s3_object_key: str
+    aggregate_id: str = Field(title="Aggregate identifier")
+    aggregate_type: AggregateType = Field(title="Aggregate type")
+    created: datetime = Field(title="Aggregate creation timestamp")
+    creator: str = Field(title="Aggregate creator")
+    headers: dict = Field(title="Dictionary of relevant HTTP headers")
+    content_type: str = Field(title="Content MIME type")
+    content_length: int = Field(title="Content length")
+    content_location: str = Field(title="Content location (URL)")
+    s3_bucket: str = Field(title="S3 Bucket Name")
+    s3_object_key: str = Field(title="S3 Object Key")
+    aggregate_interval_start: datetime | None = Field(
+        default=None, title="Aggregate interval start"
+    )
+    aggregate_interval_duration: int | None = Field(
+        default=None, title="Aggregate interval duration (seconds)"
+    )
 
     @classmethod
     def from_db_model(cls, metadata: AggregateMetadata, settings: Settings):
@@ -60,7 +69,9 @@ def from_db_model(cls, metadata: AggregateMetadata, settings: Settings):
         return cls(
             aggregate_id=aggregate_id,
             aggregate_type=metadata.aggregate_type.value,
-            created=metadata.id.generation_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
+            aggregate_interval_start=metadata.aggregate_interval_start,
+            aggregate_interval_duration=metadata.aggregate_interval_duration,
+            created=metadata.id.generation_time,
             creator=str(metadata.creator),
             headers=metadata.http_headers,
             content_type=metadata.content_type,
@@ -102,10 +113,19 @@ async def mqtt_client(settings: Annotated[Settings, Depends(get_settings)]):
         yield client
 
 
-def get_http_headers(request: Request) -> Dict[str, str]:
+def get_http_headers(
+    request: Request, covered_components_headers: List[str]
+) -> Dict[str, str]:
     """Get dictionary of relevant metadata HTTP headers"""
+
+    relevant_headers = set([header.lower() for header in METADATA_HTTP_HEADERS])
+
+    for header in covered_components_headers:
+        if match := re.match(r"^\"([^@].+)\"$", header):
+            relevant_headers.add(match.group(1))
+
     res = {}
-    for header in METADATA_HTTP_HEADERS:
+    for header in relevant_headers:
         if value := request.headers.get(header):
             res[header] = value
     return res
@@ -133,22 +153,36 @@ def get_new_aggregate_event_message(
         ),
         "s3_bucket": metadata.s3_bucket,
         "s3_object_key": metadata.s3_object_key,
+        **(
+            {
+                "interval_start": metadata.aggregate_interval_start.astimezone(
+                    tz=timezone.utc
+                ).strftime("%Y-%m-%dT%H:%M:%SZ"),
+                "interval_duration": metadata.aggregate_interval_duration,
+            }
+            if metadata.aggregate_interval_start
+            and metadata.aggregate_interval_duration
+            else {}
+        ),
     }
 
 
 def get_s3_object_key(metadata: AggregateMetadata) -> str:
     """Get S3 object key from metadata"""
-    dt = metadata.id.generation_time
-    return "/".join(
-        [
-            f"type={metadata.aggregate_type.name.lower()}",
-            f"year={dt.year}",
-            f"month={dt.month}",
-            f"day={dt.day}",
-            f"creator={metadata.creator}",
-            f"id={metadata.id}",
-        ]
-    )
+    dt = metadata.aggregate_interval_start or metadata.id.generation_time
+    dt = dt.astimezone(tz=timezone.utc)
+    fields_dict = {
+        "type": metadata.aggregate_type.name.lower(),
+        "year": f"{dt.year:04}",
+        "month": f"{dt.month:02}",
+        "day": f"{dt.day:02}",
+        "hour": f"{dt.hour:02}",
+        "minute": f"{dt.minute:02}",
+        "creator": metadata.creator,
+        "id": metadata.id,
+    }
+    fields_list = [f"{k}={v}" for k, v in fields_dict.items() if v is not None]
+    return "/".join(fields_list)
 
 
 @router.post("/api/v1/aggregate/{aggregate_type}")
@@ -170,19 +204,35 @@ async def create_aggregate(
 
     res = await http_request_verifier.verify(request)
 
-    creator = res.get("keyid")
+    creator = res.parameters.get("keyid")
     logger.info("Create aggregate request by keyid=%s", creator)
 
+    http_headers = get_http_headers(request, res.covered_components.keys())
+
     aggregate_id = ObjectId()
     location = f"/api/v1/aggregates/{aggregate_id}"
 
     s3_bucket = settings.s3_bucket
 
+    if aggregate_interval := request.headers.get("Aggregate-Interval"):
+        period = pendulum.parse(aggregate_interval)
+        if not isinstance(period, pendulum.Period):
+            raise HTTPException(
+                status.HTTP_400_BAD_REQUEST, "Invalid Aggregate-Interval"
+            )
+        aggregate_interval_start = pendulum_as_datetime(period.start)
+        aggregate_interval_duration = period.start.diff(period.end).in_seconds()
+    else:
+        aggregate_interval_start = None
+        aggregate_interval_duration = None
+
     metadata = AggregateMetadata(
         id=aggregate_id,
         aggregate_type=aggregate_type,
+        aggregate_interval_start=aggregate_interval_start,
+        aggregate_interval_duration=aggregate_interval_duration,
         creator=creator,
-        http_headers=get_http_headers(request),
+        http_headers=http_headers,
         content_type=content_type,
         s3_bucket=s3_bucket,
     )
diff --git a/aggrec/client.py b/aggrec/client.py
index e19dfff..d6c12d5 100644
--- a/aggrec/client.py
+++ b/aggrec/client.py
@@ -6,6 +6,7 @@
 from urllib.parse import urljoin
 
 import http_sfv
+import pendulum
 import requests
 from cryptography.hazmat.primitives.serialization import load_pem_private_key
 from http_message_signatures import (
@@ -15,7 +16,12 @@
 )
 
 DEFAULT_CONTENT_TYPE = "application/vnd.apache.parquet"
-DEFAULT_COVERED_COMPONENT_IDS = ["content-type", "content-digest", "content-length"]
+DEFAULT_COVERED_COMPONENT_IDS = [
+    "content-type",
+    "content-digest",
+    "content-length",
+    "aggregate-interval",
+]
 
 
 class MyHTTPSignatureKeyResolver(HTTPSignatureKeyResolver):
@@ -32,11 +38,19 @@ def main() -> None:
 
     parser = argparse.ArgumentParser(description="Aggregate Sender")
 
+    default_interval = f"{pendulum.now().to_iso8601_string()}/PT1M"
+
     parser.add_argument(
         "aggregate",
         metavar="filename",
         help="Aggregate payload",
     )
+    parser.add_argument(
+        "--interval",
+        metavar="interval",
+        help="Aggregate interval",
+        default=default_interval,
+    )
     parser.add_argument(
         "--tls-cert-file",
         metavar="filename",
@@ -105,6 +119,8 @@ def main() -> None:
         req.headers["Content-Encoding"] = "gzip"
         covered_component_ids.append("content-encoding")
 
+    req.headers["Aggregate-Interval"] = args.interval
+
     req = req.prepare()
     req.headers["Content-Type"] = DEFAULT_CONTENT_TYPE
     req.headers["Content-Digest"] = str(
@@ -142,7 +158,7 @@ def main() -> None:
     resp.raise_for_status()
     print(resp)
     print(resp.headers)
-    print(json.loads(resp.content))
+    print(json.dumps(json.loads(resp.content), indent=4))
 
     resp = session.get(resp.json()["content_location"])
     resp.raise_for_status()
diff --git a/aggrec/db_models.py b/aggrec/db_models.py
index a46d171..318a246 100644
--- a/aggrec/db_models.py
+++ b/aggrec/db_models.py
@@ -1,6 +1,13 @@
 from enum import Enum
 
-from mongoengine import DictField, Document, EnumField, IntField, StringField
+from mongoengine import (
+    DateTimeField,
+    DictField,
+    Document,
+    EnumField,
+    IntField,
+    StringField,
+)
 
 
 class AggregateType(Enum):
@@ -22,3 +29,6 @@ class AggregateMetadata(Document):
 
     s3_bucket = StringField()
     s3_object_key = StringField()
+
+    aggregate_interval_start = DateTimeField()
+    aggregate_interval_duration = IntField()
diff --git a/aggrec/helpers.py b/aggrec/helpers.py
index fffb1a8..54a1d6a 100644
--- a/aggrec/helpers.py
+++ b/aggrec/helpers.py
@@ -3,6 +3,7 @@
 from datetime import datetime, timezone
 
 import http_sfv
+import pendulum
 from cryptography.hazmat.primitives.serialization import load_pem_public_key
 from fastapi import HTTPException, Request, status
 from http_message_signatures import (
@@ -65,7 +66,7 @@ async def verify_content_digest(self, result: VerifyResult, request: Request):
                     raise UnsupportedContentDigestAlgorithm
         raise ContentDigestMissing
 
-    async def verify(self, request: Request) -> dict:
+    async def verify(self, request: Request) -> VerifyResult:
         """Verify request and return signer"""
         verifier = HTTPMessageVerifier(
             signature_algorithm=self.algorithm,
@@ -89,7 +90,7 @@ async def verify(self, request: Request) -> dict:
         try:
             await self.verify_content_digest(result, request)
             self.logger.debug("Content-Digest verified")
-            return result.parameters
+            return result
         except InvalidContentDigest:
             raise HTTPException(
                 status.HTTP_401_UNAUTHORIZED, "Content-Digest verification failed"
@@ -107,3 +108,16 @@
 def rfc_3339_datetime_now() -> str:
     """Return current time(UTC) as ISO 8601 timestamp"""
     return str(datetime.now(tz=timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"))
+
+
+def pendulum_as_datetime(dt: pendulum.DateTime) -> datetime:
+    return datetime(
+        year=dt.year,
+        month=dt.month,
+        day=dt.day,
+        hour=dt.hour,
+        minute=dt.minute,
+        second=dt.second,
+        microsecond=dt.microsecond,
+        tzinfo=dt.tzinfo,
+    )
diff --git a/aggrec/openapi.yaml b/aggrec/openapi.yaml
index e1cf585..af305cc 100644
--- a/aggrec/openapi.yaml
+++ b/aggrec/openapi.yaml
@@ -52,6 +52,13 @@ paths:
           required: false
          schema:
            type: string
+        - name: Aggregate-Interval
+          description: Aggregate window as an ISO 8601 time interval (start and duration)
+          in: header
+          required: false
+          schema:
+            type: string
+            example: "1984-01-01T12:00:00Z/PT1M"
       requestBody:
         description: Aggregate as Apache Parquet
         content:
@@ -117,15 +124,15 @@ components:
       description: Aggregate metadata
       type: object
       required:
-        - id
+        - aggregate_id
         - aggregate_type
         - created
         - content_type
         - content_payload_location
       properties:
-        id:
+        aggregate_id:
           $ref: '#/components/schemas/aggregate_id'
-        type:
+        aggregate_type:
           $ref: '#/components/schemas/aggregate_type'
         created:
           type: string
@@ -147,6 +154,13 @@ components:
           type: string
         s3_object_key:
           type: string
+        aggregate_interval_start:
+          type: string
+          format: date-time
+        aggregate_interval_duration:
+          type: integer
+          format: int64
+          minimum: 0
 
     aggregate_id:
       type: string
diff --git a/poetry.lock b/poetry.lock
index 34dc434..9cdd502 100644
--- a/poetry.lock
+++ b/poetry.lock
@@ -978,6 +978,40 @@ files = [
     {file = "pathspec-0.11.2.tar.gz", hash = "sha256:e0d8d0ac2f12da61956eb2306b69f9469b42f4deb0f3cb6ed47b9cce9996ced3"},
 ]
 
+[[package]]
+name = "pendulum"
+version = "2.1.2"
+description = "Python datetimes made easy"
+optional = false
+python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*"
+files = [
+    {file = "pendulum-2.1.2-cp27-cp27m-macosx_10_15_x86_64.whl", hash = "sha256:b6c352f4bd32dff1ea7066bd31ad0f71f8d8100b9ff709fb343f3b86cee43efe"},
+    {file = "pendulum-2.1.2-cp27-cp27m-win_amd64.whl", hash = "sha256:318f72f62e8e23cd6660dbafe1e346950281a9aed144b5c596b2ddabc1d19739"},
+    {file = "pendulum-2.1.2-cp35-cp35m-macosx_10_15_x86_64.whl", hash = "sha256:0731f0c661a3cb779d398803655494893c9f581f6488048b3fb629c2342b5394"},
+    {file = "pendulum-2.1.2-cp35-cp35m-manylinux1_i686.whl", hash = "sha256:3481fad1dc3f6f6738bd575a951d3c15d4b4ce7c82dce37cf8ac1483fde6e8b0"},
+    {file = "pendulum-2.1.2-cp35-cp35m-manylinux1_x86_64.whl", hash = "sha256:9702069c694306297ed362ce7e3c1ef8404ac8ede39f9b28b7c1a7ad8c3959e3"},
+    {file = "pendulum-2.1.2-cp35-cp35m-win_amd64.whl", hash = "sha256:fb53ffa0085002ddd43b6ca61a7b34f2d4d7c3ed66f931fe599e1a531b42af9b"},
+    {file = "pendulum-2.1.2-cp36-cp36m-macosx_10_15_x86_64.whl", hash = "sha256:c501749fdd3d6f9e726086bf0cd4437281ed47e7bca132ddb522f86a1645d360"},
+    {file = "pendulum-2.1.2-cp36-cp36m-manylinux1_i686.whl", hash = "sha256:c807a578a532eeb226150d5006f156632df2cc8c5693d778324b43ff8c515dd0"},
+    {file = "pendulum-2.1.2-cp36-cp36m-manylinux1_x86_64.whl", hash = "sha256:2d1619a721df661e506eff8db8614016f0720ac171fe80dda1333ee44e684087"},
+    {file = "pendulum-2.1.2-cp36-cp36m-win_amd64.whl", hash = "sha256:f888f2d2909a414680a29ae74d0592758f2b9fcdee3549887779cd4055e975db"},
+    {file = "pendulum-2.1.2-cp37-cp37m-macosx_10_15_x86_64.whl", hash = "sha256:e95d329384717c7bf627bf27e204bc3b15c8238fa8d9d9781d93712776c14002"},
+    {file = "pendulum-2.1.2-cp37-cp37m-manylinux1_i686.whl", hash = "sha256:4c9c689747f39d0d02a9f94fcee737b34a5773803a64a5fdb046ee9cac7442c5"},
+    {file = "pendulum-2.1.2-cp37-cp37m-manylinux1_x86_64.whl", hash = "sha256:1245cd0075a3c6d889f581f6325dd8404aca5884dea7223a5566c38aab94642b"},
+    {file = "pendulum-2.1.2-cp37-cp37m-win_amd64.whl", hash = "sha256:db0a40d8bcd27b4fb46676e8eb3c732c67a5a5e6bfab8927028224fbced0b40b"},
+    {file = "pendulum-2.1.2-cp38-cp38-macosx_10_15_x86_64.whl", hash = "sha256:f5e236e7730cab1644e1b87aca3d2ff3e375a608542e90fe25685dae46310116"},
+    {file = "pendulum-2.1.2-cp38-cp38-manylinux1_i686.whl", hash = "sha256:de42ea3e2943171a9e95141f2eecf972480636e8e484ccffaf1e833929e9e052"},
+    {file = "pendulum-2.1.2-cp38-cp38-manylinux1_x86_64.whl", hash = "sha256:7c5ec650cb4bec4c63a89a0242cc8c3cebcec92fcfe937c417ba18277d8560be"},
+    {file = "pendulum-2.1.2-cp38-cp38-win_amd64.whl", hash = "sha256:33fb61601083f3eb1d15edeb45274f73c63b3c44a8524703dc143f4212bf3269"},
+    {file = "pendulum-2.1.2-cp39-cp39-manylinux1_i686.whl", hash = "sha256:29c40a6f2942376185728c9a0347d7c0f07905638c83007e1d262781f1e6953a"},
+    {file = "pendulum-2.1.2-cp39-cp39-manylinux1_x86_64.whl", hash = "sha256:94b1fc947bfe38579b28e1cccb36f7e28a15e841f30384b5ad6c5e31055c85d7"},
+    {file = "pendulum-2.1.2.tar.gz", hash = "sha256:b06a0ca1bfe41c990bbf0c029f0b6501a7f2ec4e38bfec730712015e8860f207"},
+]
+
+[package.dependencies]
+python-dateutil = ">=2.6,<3.0"
+pytzdata = ">=2020.1"
+
 [[package]]
 name = "platformdirs"
 version = "4.0.0"
@@ -1412,6 +1446,17 @@ files = [
 [package.extras]
 cli = ["click (>=5.0)"]
 
+[[package]]
+name = "pytzdata"
+version = "2020.1"
+description = "The Olson timezone database for Python."
+optional = false
+python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*"
+files = [
+    {file = "pytzdata-2020.1-py2.py3-none-any.whl", hash = "sha256:e1e14750bcf95016381e4d472bad004eef710f2d6417240904070b3d6654485f"},
+    {file = "pytzdata-2020.1.tar.gz", hash = "sha256:3efa13b335a00a8de1d345ae41ec78dd11c9f8807f522d39850f2dd828681540"},
+]
+
 [[package]]
 name = "requests"
 version = "2.31.0"
@@ -1759,4 +1804,4 @@ multidict = ">=4.0"
 [metadata]
 lock-version = "2.0"
 python-versions = "^3.11"
-content-hash = "87389ad5cafc8044ece42cfacab2bf2bae0bfffa10ff0e77beec21e90f16d558"
+content-hash = "c3ff6916296c736ad5c2c391dde076bd41148cc79d5ecf8bd4bfba1a5fc0cf9b"
diff --git a/pyproject.toml b/pyproject.toml
index 939d2f9..dbd7b03 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,6 +1,6 @@
 [tool.poetry]
 name = "aggrec"
-version = "0.4.0"
+version = "0.5.0"
 description = "DNS TAPIR Aggregate Receiver"
 authors = ["Jakob Schlyter <jakob@kirei.se>"]
 readme = "README.md"
@@ -23,6 +23,7 @@ aiobotocore = "^2.7.0"
 aiomqtt = "^1.2.1"
 setuptools = "^68.2.2"
 jsonformatter = "^0.3.2"
+pendulum = "^2.1.2"
 
 [tool.poetry.group.dev.dependencies]
 isort = "^5.12.0"