Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add aggregate interval #5

Merged
merged 15 commits into from
Nov 16, 2023
11 changes: 11 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# Build stage: install Poetry and build the project wheel
FROM python:3.11 AS builder
RUN pip3 install poetry
WORKDIR /src
ADD . /src
RUN poetry build

# Runtime stage: install only the built wheel, keeping the image free of sources
FROM python:3.11
WORKDIR /tmp
COPY --from=builder /src/dist/*.whl .
RUN pip3 install *.whl && rm *.whl
# Exec form (JSON array) so aggrec_server runs as PID 1 and receives
# signals (SIGTERM on container stop) directly, instead of via /bin/sh -c
ENTRYPOINT ["aggrec_server"]
108 changes: 79 additions & 29 deletions aggrec/aggregates.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,24 @@
import json
import logging
import re
from datetime import datetime, timezone
from enum import Enum
from functools import lru_cache
from typing import Annotated, Dict
from typing import Annotated, Dict, List
from urllib.parse import urljoin

import aiobotocore.session
import aiomqtt
import boto3
import bson
import pendulum
from bson.objectid import ObjectId
from fastapi import APIRouter, Depends, HTTPException, Request, Response, status
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from pydantic import BaseModel, Field

from .db_models import AggregateMetadata
from .helpers import RequestVerifier, rfc_3339_datetime_now
from .helpers import RequestVerifier, pendulum_as_datetime, rfc_3339_datetime_now
from .settings import Settings

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -43,24 +46,32 @@ class AggregateType(str, Enum):


class AggregateMetadataResponse(BaseModel):
aggregate_id: str
aggregate_type: AggregateType
created: str
creator: str
headers: dict
content_type: str
content_length: int
content_location: str
s3_bucket: str
s3_object_key: str
aggregate_id: str = Field(title="Aggregate identifier")
aggregate_type: AggregateType = Field(title="Aggregate type")
created: datetime = Field(title="Aggregate creation timestamp")
creator: str = Field(title="Aggregate creator")
headers: dict = Field(title="Dictionary of relevant HTTP headers")
content_type: str = Field(title="Content MIME type")
content_length: int = Field(title="Content length")
content_location: str = Field(title="Content local (URL)")
s3_bucket: str = Field(title="S3 Bucket Name")
s3_object_key: str = Field(title="S3 Object Key")
aggregate_interval_start: datetime | None = Field(
default=None, title="Aggregate interval start"
)
aggregate_interval_duration: int | None = Field(
default=None, title="Aggregate interval duration (seconds)"
)

@classmethod
def from_db_model(cls, metadata: AggregateMetadata, settings: Settings):
aggregate_id = str(metadata.id)
return cls(
aggregate_id=aggregate_id,
aggregate_type=metadata.aggregate_type.value,
created=metadata.id.generation_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
aggregate_interval_start=metadata.aggregate_interval_start,
aggregate_interval_duration=metadata.aggregate_interval_duration,
created=metadata.id.generation_time,
creator=str(metadata.creator),
headers=metadata.http_headers,
content_type=metadata.content_type,
Expand Down Expand Up @@ -102,10 +113,19 @@ async def mqtt_client(settings: Annotated[Settings, Depends(get_settings)]):
yield client


def get_http_headers(
    request: Request, covered_components_headers: List[str]
) -> Dict[str, str]:
    """Get dictionary of relevant metadata HTTP headers.

    Collects the statically configured METADATA_HTTP_HEADERS plus every
    HTTP header field covered by the request's message signature, and
    returns a dict of those headers that are actually present on the
    request (headers with empty/missing values are omitted).

    Args:
        request: incoming FastAPI request.
        covered_components_headers: covered component identifiers from the
            verified HTTP message signature (quoted strings, e.g. '"content-type"').

    Returns:
        Mapping of lowercase header name to header value.
    """

    relevant_headers = set([header.lower() for header in METADATA_HTTP_HEADERS])

    # Covered component identifiers are quoted; derived components start
    # with "@" (e.g. "@method") and are not HTTP header fields, so skip them.
    for header in covered_components_headers:
        if match := re.match(r"^\"([^@].+)\"$", header):
            relevant_headers.add(match.group(1))

    res = {}
    for header in relevant_headers:
        # Starlette header lookup is case-insensitive; falsy values are skipped.
        if value := request.headers.get(header):
            res[header] = value
    return res
Expand Down Expand Up @@ -133,22 +153,36 @@ def get_new_aggregate_event_message(
),
"s3_bucket": metadata.s3_bucket,
"s3_object_key": metadata.s3_object_key,
**(
{
"interval_start": metadata.aggregate_interval_start.astimezone(
tz=timezone.utc
).strftime("%Y-%m-%dT%H:%M:%SZ"),
"interval_duration": metadata.aggregate_interval_duration,
}
if metadata.aggregate_interval_start
and metadata.aggregate_interval_duration
else {}
),
}


def get_s3_object_key(metadata: AggregateMetadata) -> str:
    """Get S3 object key from metadata.

    Builds a Hive-style partitioned key such as
    ``type=.../year=.../month=.../day=.../hour=.../minute=.../creator=.../id=...``
    using the aggregate interval start when present, otherwise the creation
    time encoded in the BSON ObjectId. The timestamp is normalized to UTC
    so partitions are stable regardless of the stored timezone.
    """
    dt = metadata.aggregate_interval_start or metadata.id.generation_time
    dt = dt.astimezone(tz=timezone.utc)
    fields_dict = {
        "type": metadata.aggregate_type.name.lower(),
        # Zero-pad date/time parts so keys sort lexicographically by time
        "year": f"{dt.year:04}",
        "month": f"{dt.month:02}",
        "day": f"{dt.day:02}",
        "hour": f"{dt.hour:02}",
        "minute": f"{dt.minute:02}",
        "creator": metadata.creator,
        "id": metadata.id,
    }
    # None-valued fields (e.g. missing creator) are omitted from the key
    fields_list = [f"{k}={v}" for k, v in fields_dict.items() if v is not None]
    return "/".join(fields_list)


@router.post("/api/v1/aggregate/{aggregate_type}")
Expand All @@ -170,19 +204,35 @@ async def create_aggregate(

res = await http_request_verifier.verify(request)

creator = res.get("keyid")
creator = res.parameters.get("keyid")
logger.info("Create aggregate request by keyid=%s", creator)

http_headers = get_http_headers(request, res.covered_components.keys())

aggregate_id = ObjectId()
location = f"/api/v1/aggregates/{aggregate_id}"

s3_bucket = settings.s3_bucket

if aggregate_interval := request.headers.get("Aggregate-Interval"):
period = pendulum.parse(aggregate_interval)
if not isinstance(period, pendulum.Period):
raise HTTPException(
status.HTTP_400_BAD_REQUEST, "Invalid Aggregate-Interval"
)
aggregate_interval_start = pendulum_as_datetime(period.start)
aggregate_interval_duration = period.start.diff(period.end).in_seconds()
else:
aggregate_interval_start = None
aggregate_interval_duration = None

metadata = AggregateMetadata(
id=aggregate_id,
aggregate_type=aggregate_type,
aggregate_interval_start=aggregate_interval_start,
aggregate_interval_duration=aggregate_interval_duration,
creator=creator,
http_headers=get_http_headers(request),
http_headers=http_headers,
content_type=content_type,
s3_bucket=s3_bucket,
)
Expand Down
20 changes: 18 additions & 2 deletions aggrec/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from urllib.parse import urljoin

import http_sfv
import pendulum
import requests
from cryptography.hazmat.primitives.serialization import load_pem_private_key
from http_message_signatures import (
Expand All @@ -15,7 +16,12 @@
)

DEFAULT_CONTENT_TYPE = "application/vnd.apache.parquet"
DEFAULT_COVERED_COMPONENT_IDS = ["content-type", "content-digest", "content-length"]
DEFAULT_COVERED_COMPONENT_IDS = [
"content-type",
"content-digest",
"content-length",
"aggregate-interval",
]


class MyHTTPSignatureKeyResolver(HTTPSignatureKeyResolver):
Expand All @@ -32,11 +38,19 @@ def main() -> None:

parser = argparse.ArgumentParser(description="Aggregate Sender")

default_interval = f"{pendulum.now().to_iso8601_string()}/PT1M"

parser.add_argument(
"aggregate",
metavar="filename",
help="Aggregate payload",
)
parser.add_argument(
"--interval",
metavar="interval",
help="Aggregate interval",
default=default_interval,
)
parser.add_argument(
"--tls-cert-file",
metavar="filename",
Expand Down Expand Up @@ -105,6 +119,8 @@ def main() -> None:
req.headers["Content-Encoding"] = "gzip"
covered_component_ids.append("content-encoding")

req.headers["Aggregate-Interval"] = args.interval

req = req.prepare()
req.headers["Content-Type"] = DEFAULT_CONTENT_TYPE
req.headers["Content-Digest"] = str(
Expand Down Expand Up @@ -142,7 +158,7 @@ def main() -> None:
resp.raise_for_status()
print(resp)
print(resp.headers)
print(json.loads(resp.content))
print(json.dumps(json.loads(resp.content), indent=4))

resp = session.get(resp.json()["content_location"])
resp.raise_for_status()
Expand Down
12 changes: 11 additions & 1 deletion aggrec/db_models.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
from enum import Enum

from mongoengine import DictField, Document, EnumField, IntField, StringField
from mongoengine import (
DateTimeField,
DictField,
Document,
EnumField,
IntField,
StringField,
)


class AggregateType(Enum):
Expand All @@ -22,3 +29,6 @@ class AggregateMetadata(Document):

s3_bucket = StringField()
s3_object_key = StringField()

aggregate_interval_start = DateTimeField()
aggregate_interval_duration = IntField()
18 changes: 16 additions & 2 deletions aggrec/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from datetime import datetime, timezone

import http_sfv
import pendulum
from cryptography.hazmat.primitives.serialization import load_pem_public_key
from fastapi import HTTPException, Request, status
from http_message_signatures import (
Expand Down Expand Up @@ -65,7 +66,7 @@ async def verify_content_digest(self, result: VerifyResult, request: Request):
raise UnsupportedContentDigestAlgorithm
raise ContentDigestMissing

async def verify(self, request: Request) -> dict:
async def verify(self, request: Request) -> VerifyResult:
"""Verify request and return signer"""
verifier = HTTPMessageVerifier(
signature_algorithm=self.algorithm,
Expand All @@ -89,7 +90,7 @@ async def verify(self, request: Request) -> dict:
try:
await self.verify_content_digest(result, request)
self.logger.debug("Content-Digest verified")
return result.parameters
return result
except InvalidContentDigest:
raise HTTPException(
status.HTTP_401_UNAUTHORIZED, "Content-Digest verification failed"
Expand All @@ -107,3 +108,16 @@ async def verify(self, request: Request) -> dict:
def rfc_3339_datetime_now() -> str:
    """Return current time (UTC) as an RFC 3339 / ISO 8601 timestamp,
    e.g. ``2023-11-16T12:34:56Z``."""
    # strftime already returns str; the previous str() wrapper was redundant
    return datetime.now(tz=timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")


def pendulum_as_datetime(dt: pendulum.DateTime) -> datetime:
    """Convert a pendulum.DateTime into a plain datetime.datetime,
    preserving date, time, microseconds and timezone."""
    components = (
        "year",
        "month",
        "day",
        "hour",
        "minute",
        "second",
        "microsecond",
    )
    # Copy each component explicitly so the result is an exact plain-datetime
    # mirror of the input (pendulum.DateTime subclasses datetime).
    kwargs = {name: getattr(dt, name) for name in components}
    return datetime(tzinfo=dt.tzinfo, **kwargs)
20 changes: 17 additions & 3 deletions aggrec/openapi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,13 @@ paths:
required: false
schema:
type: string
- name: Aggregate-Interval
description: Aggregate window as an ISO 8601 time interval (start and duration)
in: header
required: false
schema:
type: string
example: "1984-01-01T12:00:00Z/PT1M"
requestBody:
description: Aggregate as Apache Parquet
content:
Expand Down Expand Up @@ -117,15 +124,15 @@ components:
description: Aggregate metadata
type: object
required:
- id
- aggregate_id
- aggregate_type
- created
- content_type
- content_payload_location
properties:
id:
aggregate_id:
$ref: '#/components/schemas/aggregate_id'
type:
aggregate_type:
$ref: '#/components/schemas/aggregate_type'
created:
type: string
Expand All @@ -147,6 +154,13 @@ components:
type: string
s3_object_key:
type: string
aggregate_interval_start:
type: string
format: date-time
aggregate_interval_duration:
type: integer
format: int64
minimum: 0

aggregate_id:
type: string
Expand Down
Loading