add aggregate interval
jschlyter committed Nov 15, 2023
1 parent 9fd5b39 commit b4a03ce
Showing 7 changed files with 145 additions and 17 deletions.
65 changes: 52 additions & 13 deletions aggrec/aggregates.py
@@ -1,21 +1,23 @@
import json
import logging
from datetime import datetime, timezone
from enum import Enum
from functools import lru_cache
from typing import Annotated, Dict
from typing import Annotated, Dict, Optional
from urllib.parse import urljoin

import aiobotocore.session
import aiomqtt
import boto3
import bson
import pendulum
from bson.objectid import ObjectId
from fastapi import APIRouter, Depends, HTTPException, Request, Response, status
from fastapi.responses import StreamingResponse
from pydantic import BaseModel

from .db_models import AggregateMetadata
from .helpers import RequestVerifier, rfc_3339_datetime_now
from .helpers import RequestVerifier, pendulum_as_datetime, rfc_3339_datetime_now
from .settings import Settings

logger = logging.getLogger(__name__)
@@ -53,13 +55,19 @@ class AggregateMetadataResponse(BaseModel):
content_location: str
s3_bucket: str
s3_object_key: str
aggregate_interval: Optional[str] = None
aggregate_interval_start: Optional[datetime] = None
aggregate_interval_duration: Optional[int] = None

@classmethod
def from_db_model(cls, metadata: AggregateMetadata, settings: Settings):
aggregate_id = str(metadata.id)
return cls(
aggregate_id=aggregate_id,
aggregate_type=metadata.aggregate_type.value,
aggregate_interval=metadata.aggregate_interval,
aggregate_interval_start=metadata.aggregate_interval_start,
aggregate_interval_duration=metadata.aggregate_interval_duration,
created=metadata.id.generation_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
creator=str(metadata.creator),
headers=metadata.http_headers,
@@ -133,22 +141,38 @@ def get_new_aggregate_event_message(
),
"s3_bucket": metadata.s3_bucket,
"s3_object_key": metadata.s3_object_key,
**(
{
"aggregate_interval": metadata.aggregate_interval,
"aggregate_interval_start": metadata.aggregate_interval_start.astimezone(
tz=timezone.utc
).strftime(
"%Y-%m-%dT%H:%M:%SZ"
),
"aggregate_interval_duration": metadata.aggregate_interval_duration,
}
if metadata.aggregate_interval
else {}
),
}
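The interval fields above are merged into the event message only when an interval was supplied, using conditional dict unpacking. A minimal standalone sketch of the pattern (the bucket name and interval value are placeholders, not taken from this commit):

aggregate_interval = "2023-11-15T12:00:00Z/PT1M"  # or None when no Aggregate-Interval header was sent
event = {
    "s3_bucket": "aggregates",  # placeholder bucket name
    **({"aggregate_interval": aggregate_interval} if aggregate_interval else {}),
}
# event == {"s3_bucket": "aggregates", "aggregate_interval": "2023-11-15T12:00:00Z/PT1M"}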


def get_s3_object_key(metadata: AggregateMetadata) -> str:
"""Get S3 object key from metadata"""
dt = metadata.id.generation_time
return "/".join(
[
f"type={metadata.aggregate_type.name.lower()}",
f"year={dt.year}",
f"month={dt.month}",
f"day={dt.day}",
f"creator={metadata.creator}",
f"id={metadata.id}",
]
)
dt = metadata.aggregate_interval_start or metadata.id.generation_time
fields_dict = {
"type": metadata.aggregate_type.name.lower(),
"length": metadata.aggregate_interval_duration,
"year": dt.year,
"month": dt.month,
"day": dt.day,
"hour": dt.minute,
"minute": dt.second,
"creator": metadata.creator,
"id": metadata.id,
}
fields_list = [f"{k}={v}" for k, v in fields_dict.items() if v is not None]
return "/".join(fields_list)


@router.post("/api/v1/aggregate/{aggregate_type}")
@@ -178,9 +202,24 @@ async def create_aggregate(

s3_bucket = settings.s3_bucket

if aggregate_interval := request.headers.get("Aggregate-Interval"):
period = pendulum.parse(aggregate_interval)
if not isinstance(period, pendulum.Period):
raise HTTPException(
status.HTTP_400_BAD_REQUEST, "Invalid Aggregate-Interval"
)
aggregate_interval_start = pendulum_as_datetime(period.start)
aggregate_interval_duration = period.start.diff(period.end).in_seconds()
else:
aggregate_interval_start = None
aggregate_interval_duration = None

metadata = AggregateMetadata(
id=aggregate_id,
aggregate_type=aggregate_type,
aggregate_interval=aggregate_interval,
aggregate_interval_start=aggregate_interval_start,
aggregate_interval_duration=aggregate_interval_duration,
creator=creator,
http_headers=get_http_headers(request),
content_type=content_type,
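The Aggregate-Interval request header is interpreted as an ISO 8601 time interval (start instant plus duration). A minimal sketch of the parsing step above, assuming pendulum 2.x as pinned in pyproject.toml and using the example value from openapi.yaml:

import pendulum

header_value = "1984-01-01T12:00:00Z/PT1M"

period = pendulum.parse(header_value)
assert isinstance(period, pendulum.Period)  # any non-interval value is rejected with HTTP 400

start = period.start                                   # 1984-01-01T12:00:00+00:00
duration = period.start.diff(period.end).in_seconds()  # 60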
13 changes: 12 additions & 1 deletion aggrec/client.py
@@ -6,6 +6,7 @@
from urllib.parse import urljoin

import http_sfv
import pendulum
import requests
from cryptography.hazmat.primitives.serialization import load_pem_private_key
from http_message_signatures import (
@@ -32,11 +33,19 @@ def main() -> None:

parser = argparse.ArgumentParser(description="Aggregate Sender")

default_interval = f"{pendulum.now().to_iso8601_string()}/PT1M"

parser.add_argument(
"aggregate",
metavar="filename",
help="Aggregate payload",
)
parser.add_argument(
"--interval",
metavar="interval",
help="Aggregate interval",
default=default_interval,
)
parser.add_argument(
"--tls-cert-file",
metavar="filename",
@@ -105,6 +114,8 @@ def main() -> None:
req.headers["Content-Encoding"] = "gzip"
covered_component_ids.append("content-encoding")

req.headers["Aggregate-Interval"] = args.interval

req = req.prepare()
req.headers["Content-Type"] = DEFAULT_CONTENT_TYPE
req.headers["Content-Digest"] = str(
@@ -142,7 +153,7 @@ def main() -> None:
resp.raise_for_status()
print(resp)
print(resp.headers)
print(json.loads(resp.content))
print(json.dumps(json.loads(resp.content), indent=4))

resp = session.get(resp.json()["content_location"])
resp.raise_for_status()
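By default the client announces a one-minute aggregation window starting at the current time. A small sketch of how that default header value is built (pendulum 2.x; the timestamp in the comment is illustrative):

import pendulum

default_interval = f"{pendulum.now().to_iso8601_string()}/PT1M"
# e.g. "2023-11-15T12:34:56.789012+01:00/PT1M", i.e. a one-minute interval starting now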
13 changes: 12 additions & 1 deletion aggrec/db_models.py
@@ -1,6 +1,13 @@
from enum import Enum

from mongoengine import DictField, Document, EnumField, IntField, StringField
from mongoengine import (
DateTimeField,
DictField,
Document,
EnumField,
IntField,
StringField,
)


class AggregateType(Enum):
@@ -22,3 +29,7 @@ class AggregateMetadata(Document):

s3_bucket = StringField()
s3_object_key = StringField()

aggregate_interval = StringField()
aggregate_interval_start = DateTimeField()
aggregate_interval_duration = IntField()
14 changes: 14 additions & 0 deletions aggrec/helpers.py
@@ -3,6 +3,7 @@
from datetime import datetime, timezone

import http_sfv
import pendulum
from cryptography.hazmat.primitives.serialization import load_pem_public_key
from fastapi import HTTPException, Request, status
from http_message_signatures import (
@@ -107,3 +108,16 @@ async def verify(self, request: Request) -> dict:
def rfc_3339_datetime_now() -> str:
"""Return current time(UTC) as ISO 8601 timestamp"""
return str(datetime.now(tz=timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"))


def pendulum_as_datetime(dt: pendulum.DateTime) -> datetime:
return datetime(
year=dt.year,
month=dt.month,
day=dt.day,
hour=dt.hour,
minute=dt.minute,
second=dt.second,
microsecond=dt.microsecond,
tzinfo=dt.tzinfo,
)
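This helper converts a pendulum DateTime into an equivalent stdlib datetime, which is what ends up in the aggregate_interval_start DateTimeField added to db_models.py. A quick illustrative use (assuming pendulum 2.x):

import pendulum

from aggrec.helpers import pendulum_as_datetime

dt = pendulum_as_datetime(pendulum.parse("2023-11-15T12:00:00Z"))
# roughly datetime.datetime(2023, 11, 15, 12, 0, tzinfo=Timezone('UTC')), a stdlib datetime instance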
7 changes: 7 additions & 0 deletions aggrec/openapi.yaml
@@ -52,6 +52,13 @@ paths:
required: false
schema:
type: string
- name: Aggregate-Interval
description: Aggregate window as an ISO 8601 time interval (start and duration)
in: header
required: false
schema:
type: string
example: "1984-01-01T12:00:00Z/PT1M"
requestBody:
description: Aggregate as Apache Parquet
content:
47 changes: 46 additions & 1 deletion poetry.lock

Some generated files are not rendered by default.

3 changes: 2 additions & 1 deletion pyproject.toml
@@ -1,6 +1,6 @@
[tool.poetry]
name = "aggrec"
version = "0.4.0"
version = "0.5.0"
description = "DNS TAPIR Aggregate Receiver"
authors = ["Jakob Schlyter <[email protected]>"]
readme = "README.md"
@@ -23,6 +23,7 @@ aiobotocore = "^2.7.0"
aiomqtt = "^1.2.1"
setuptools = "^68.2.2"
jsonformatter = "^0.3.2"
pendulum = "^2.1.2"

[tool.poetry.group.dev.dependencies]
isort = "^5.12.0"
