added timestamp parsing, md5 calculations, fixed prefixes
krokicki committed Aug 19, 2024
1 parent 38fbae3 commit 7d42e71
Showing 4 changed files with 114 additions and 69 deletions.
11 changes: 7 additions & 4 deletions jproxy/app.py
@@ -138,11 +138,14 @@ async def browse_bucket(request: Request,
xml = response.body.decode("utf-8")
root = parse_xml(xml)

cps = root.find('CommonPrefixes')
common_prefixes = [dir_path(e.text) for e in cps.iter('Prefix')] if cps else []
common_prefixes = []
cps = root.findall('CommonPrefixes')
if cps:
for cp in cps:
common_prefixes += [dir_path(e.text) for e in cp.iter('Prefix')]

contents = []
cs =[c for c in root.findall('Contents')]
cs = [c for c in root.findall('Contents')]
if cs:
for c in cs:
key_elem = c.find('Key')
@@ -157,7 +160,7 @@ async def browse_bucket(request: Request,

lm_elem = c.find('LastModified')
if lm_elem is not None and lm_elem.text:
content['lastmod'] = lm_elem.text
content['lastmod'] = format_isoformat_as_local(lm_elem.text)

contents.append(content)

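For context, the app.py change sidesteps a classic ElementTree pitfall: an S3-style ListBucketResult carries one CommonPrefixes element per prefix, root.find() returns only the first match, and an Element with no children is falsy, so the old "if cps" test could silently drop prefixes. A minimal, self-contained sketch of the new parsing pattern (the sample XML here is invented):

import xml.etree.ElementTree as ET

# Sample ListBucketResult payload, invented for illustration
xml = """<ListBucketResult>
  <CommonPrefixes><Prefix>data/a/</Prefix></CommonPrefixes>
  <CommonPrefixes><Prefix>data/b/</Prefix></CommonPrefixes>
</ListBucketResult>"""

root = ET.fromstring(xml)

# findall() returns every CommonPrefixes element, not just the first
common_prefixes = []
for cp in root.findall('CommonPrefixes'):
    common_prefixes += [e.text for e in cp.iter('Prefix')]

assert common_prefixes == ['data/a/', 'data/b/']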
82 changes: 40 additions & 42 deletions jproxy/client_aioboto.py
@@ -42,7 +42,7 @@ def __init__(self, proxy_kwargs, **kwargs):

self.proxy_kwargs = proxy_kwargs or {}
self.target_name = self.proxy_kwargs['target_name']
self.prefix = self.proxy_kwargs.get('prefix')
self.target_prefix = self.proxy_kwargs.get('prefix')
self.bucket_name = kwargs.get('bucket', self.target_name)

self.anonymous = True
@@ -96,8 +96,8 @@ async def head_object(self, key: str):

@override
async def get_object(self, key: str):
if self.prefix:
key = os.path.join(self.prefix, key) if key else self.prefix
if self.target_prefix:
key = os.path.join(self.target_prefix, key) if key else self.target_prefix

filename = os.path.basename(key)
headers = {}
@@ -129,12 +129,13 @@ async def list_objects_v2(self,
start_after: str):

# prefix user-supplied prefix with configured prefix
if self.prefix:
prefix = os.path.join(self.prefix, prefix) if prefix else self.prefix
real_prefix = prefix
if self.target_prefix:
real_prefix = os.path.join(self.target_prefix, prefix) if prefix else self.target_prefix

# ensure the prefix ends with a slash
if prefix and not prefix.endswith('/'):
prefix += '/'
if real_prefix and not real_prefix.endswith('/'):
real_prefix += '/'

async with self.get_client_creator() as client:
try:
@@ -145,48 +146,46 @@ async def list_objects_v2(self,
"EncodingType": encoding_type,
"FetchOwner": fetch_owner,
"MaxKeys": max_keys,
"Prefix": prefix,
"Prefix": real_prefix,
"StartAfter": start_after
}
# Remove any None values because boto3 doesn't like those
params = {k: v for k, v in params.items() if v is not None}

response = await client.list_objects_v2(**params)
res_prefix = remove_prefix(self.prefix, prefix)
next_token = remove_prefix(self.prefix, response.get("NextContinuationToken", ""))
truncated = "true" if response.get("IsTruncated", False) else "false"

root = ET.Element("ListBucketResult")
add_telem(root, "IsTruncated", truncated)
add_telem(root, "Name", self.target_name)
add_telem(root, "Prefix", res_prefix)
add_telem(root, "Delimiter", delimiter)
add_telem(root, "MaxKeys", max_keys)
add_telem(root, "EncodingType", encoding_type)
add_telem(root, "KeyCount", response.get("KeyCount", 0))
add_telem(root, "ContinuationToken", continuation_token)
add_telem(root, "NextContinuationToken", next_token)
add_telem(root, "StartAfter", start_after)

common_prefixes = add_elem(root, "CommonPrefixes")
for cp in response.get("CommonPrefixes", []):
common_prefix = remove_prefix(self.prefix, cp["Prefix"])
add_telem(common_prefixes, "Prefix", common_prefix)
next_token = remove_prefix(self.target_prefix, response.get("NextContinuationToken", ""))
is_truncated = "true" if response.get("IsTruncated", False) else "false"

contents = []
for obj in response.get("Contents", []):
contents = add_elem(root, "Contents")
add_telem(contents, "Key", remove_prefix(self.prefix, obj["Key"]))
add_telem(contents, "LastModified", obj["LastModified"].isoformat())
add_telem(contents, "ETag", obj["ETag"])
add_telem(contents, "Size", obj["Size"])
add_telem(contents, "StorageClass", obj.get("StorageClass", ""))

if "Owner" in obj:
display_name = obj["Owner"]["DisplayName"] if "DisplayName" in obj["Owner"] else ''
owner_id = obj["Owner"]["ID"] if "ID" in obj["Owner"] else ''
owner = add_elem(root, "Owner")
add_telem(owner, "DisplayName", display_name)
add_telem(owner, "ID", owner_id)
contents.append({
'Key': remove_prefix(self.target_prefix, obj["Key"]),
'LastModified': obj["LastModified"].isoformat(),
'ETag': obj.get("ETag"),
'Size': obj.get("Size"),
'StorageClass': obj.get("StorageClass")
})
logger.info(contents)

common_prefixes = []
for cp in response.get("CommonPrefixes", []):
common_prefix = remove_prefix(self.target_prefix, cp["Prefix"])
common_prefixes.append(common_prefix)

kwargs = {
'Name': self.target_name,
'Prefix': prefix,
'Delimiter': delimiter,
'MaxKeys': max_keys,
'EncodingType': encoding_type,
'KeyCount': response.get("KeyCount", 0),
'IsTruncated': is_truncated,
'ContinuationToken': continuation_token,
'NextContinuationToken': next_token,
'StartAfter': start_after
}

root = get_list_xml_elem(contents, common_prefixes, **kwargs)

xml_output = elem_to_str(root)
return Response(content=xml_output, media_type="application/xml")
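The rewritten list_objects_v2 now collects plain dicts and delegates XML assembly to get_list_xml_elem in jproxy/utils.py. For reference, a standalone sketch of the underlying call pattern: build the params dict, drop the None values boto3 rejects, and follow continuation tokens (the bucket and prefix names are invented):

import asyncio
import aioboto3

async def list_all(bucket, prefix=None, max_keys=1000):
    """List every key under a prefix, following continuation tokens."""
    session = aioboto3.Session()
    keys, token = [], None
    async with session.client("s3") as client:
        while True:
            params = {
                "Bucket": bucket,
                "Prefix": prefix,
                "MaxKeys": max_keys,
                "ContinuationToken": token,
            }
            # boto3 rejects None parameter values, so filter them out
            params = {k: v for k, v in params.items() if v is not None}
            response = await client.list_objects_v2(**params)
            keys += [obj["Key"] for obj in response.get("Contents", [])]
            if not response.get("IsTruncated"):
                return keys
            token = response.get("NextContinuationToken")

# keys = asyncio.run(list_all("my-bucket", prefix="data/"))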
@@ -216,7 +215,6 @@ def __init__(
self.key = key

async def stream_response(self, send) -> None:
logger.info(self.media_type)
async with self.client_creator() as client:
try:
result = await client.get_object(Bucket=self.bucket, Key=self.key)
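Several call sites above rely on the remove_prefix helper from jproxy/utils.py, which this diff does not show. Its actual implementation may differ, but judging from the call sites (it takes the configured prefix first, then the key) it plausibly behaves like this hypothetical sketch:

def remove_prefix(prefix, path):
    """Hypothetical sketch: strip a configured target prefix (plus its
    trailing slash) from an S3 key, leaving non-matching keys untouched."""
    if not prefix or not path:
        return path
    full = prefix if prefix.endswith('/') else prefix + '/'
    return path[len(full):] if path.startswith(full) else path

# e.g. remove_prefix('internal/data', 'internal/data/img.png') -> 'img.png'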
62 changes: 43 additions & 19 deletions jproxy/client_file.py
@@ -1,15 +1,17 @@
import os
import sys
import time
from hashlib import md5
from pathlib import Path
from typing_extensions import override

from loguru import logger
from fastapi.responses import Response, StreamingResponse, JSONResponse

from pathlib import Path
from jproxy.utils import *
from jproxy.client import ProxyClient

# Calculating ETags introduces latency and is usually not necessary
CALCULATE_ETAGS = False

def handle_exception(e, key=None):
""" Handle various cases of generic errors.
@@ -51,10 +53,7 @@ async def head_object(self, key: str):
stats = os.stat(path)
file_size = stats.st_size
headers["Content-Length"] = str(file_size)

last_modified_time = time.gmtime(stats.st_mtime)
last_modified = time.strftime("%a, %d %b %Y %H:%M:%S GMT", last_modified_time)
headers["Last-Modified"] = last_modified
headers["Last-Modified"] = format_timestamp_s3(stats.st_mtime)

return Response(headers=headers)
except Exception as e:
@@ -82,10 +81,7 @@ async def get_object(self, key: str):
stats = os.stat(path)
file_size = stats.st_size
headers["Content-Length"] = str(file_size)

last_modified_time = time.gmtime(stats.st_mtime)
last_modified = time.strftime("%a, %d %b %Y %H:%M:%S GMT", last_modified_time)
headers["Last-Modified"] = last_modified
headers["Last-Modified"] = format_timestamp_s3(stats.st_mtime)

return StreamingResponse(file_iterator(path), headers=headers, media_type=content_type)

@@ -122,6 +118,7 @@ async def list_objects_v2(self,

res = self.walk_path(path, continuation_token, delimiter, max_keys)
contents = res['contents']
is_truncated = res['is_truncated']
common_prefixes = sorted(res['common_prefixes'])

kwargs = {
@@ -130,13 +127,14 @@
'Delimiter': delimiter,
'MaxKeys': max_keys,
'EncodingType': encoding_type,
'KeyCount': len(contents),
'KeyCount': len(contents) + len(common_prefixes),
'IsTruncated': is_truncated,
'ContinuationToken': continuation_token,
'NextContinuationToken': res['next_token'],
'StartAfter': start_after
}

root = get_list_xml_elem(contents, common_prefixes, **kwargs)
return Response(content=elem_to_str(root), media_type="application/xml")

except Exception as e:
Expand All @@ -163,24 +161,41 @@ def walk_path(self, path, continuation_token, delimiter, max_keys):
started = started or continuation_token == key
logger.trace(f"found {key} (started={started}, len={len(contents)})")

if len(contents)==max_keys:
if len(contents)+len(commons) == max_keys:
# Reached max keys to be retrieved
return {
'contents': contents,
'common_prefixes': commons,
'next_token': key
'next_token': key,
'is_truncated': 'true'
}

if started:
# Get details
stats = os.stat(file_path)
file_size = stats.st_size

etag = '"48ed760a742c2263777c00b27df3024c"'
if CALCULATE_ETAGS:
# This is VERY slow because it needs to read every file it
# the 8388608 part size is used by AWS CLI and boto3
etag = f'"{calc_etag(file_path, 8388608)}"'

contents.append({
'Key': key,
'Size': str(os.stat(file_path).st_size),
'Key': remove_prefix(self.target_prefix, key),
'Size': str(file_size),
'ETag': etag,
'LastModified': format_timestamp_s3(stats.st_mtime),
'StorageClass': 'STANDARD'
})
logger.info(contents)

if started:
if started and delimiter:
# CommonPrefixes are only generated when there is a delimiter
for d in dirs:
commons.add(os.path.join(p, d))
common_prefix = dir_path(os.path.join(p, d))
common_prefix = remove_prefix(self.target_prefix, common_prefix)
commons.add(common_prefix)

if delimiter=='/':
# Do not recurse
@@ -189,5 +204,14 @@ def walk_path(self, path, continuation_token, delimiter, max_keys):
return {
'contents': contents,
'common_prefixes': commons,
'next_token': None
'next_token': None,
'is_truncated': 'false'
}

# From https://teppen.io/2018/10/23/aws_s3_verify_etags/
def calc_etag(inputfile, partsize):
md5_digests = []
with open(inputfile, 'rb') as f:
for chunk in iter(lambda: f.read(partsize), b''):
md5_digests.append(md5(chunk).digest())
return md5(b''.join(md5_digests)).hexdigest() + '-' + str(len(md5_digests))
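The new calc_etag mirrors S3's multipart ETag scheme: each part is MD5-hashed, the concatenated digests are hashed again, and the part count is appended after a dash. The 8388608-byte part size (8 MiB) matches the AWS CLI and boto3 defaults, so locally computed values can be compared against objects uploaded by those tools. A usage sketch (the file path is invented):

etag = calc_etag('/tmp/archive.bin', 8388608)  # 8 MiB parts
print(etag)  # e.g. '3c0f...9ab2-3' for a file spanning three parts

Note that S3 reports a plain MD5 hex digest, with no -N suffix, for objects uploaded in a single PUT, so this helper only reproduces multipart-style ETags.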
28 changes: 24 additions & 4 deletions jproxy/utils.py
@@ -1,7 +1,8 @@
import inspect
from datetime import datetime, timezone
import xml.etree.ElementTree as ET
from mimetypes import guess_type

from dateutil import parser
from fastapi.responses import Response

# From https://stackoverflow.com/questions/1094841/get-a-human-readable-version-of-a-file-size
@@ -68,9 +69,9 @@ def get_list_xml_elem(contents, common_prefixes, **kwargs):
'Name',
'Prefix',
'Delimiter',
'KeyCount',
'MaxKeys',
'EncodingType',
'KeyCount',
'IsTruncated',
'ContinuationToken',
'NextContinuationToken',
@@ -81,8 +82,8 @@ def get_list_xml_elem(contents, common_prefixes, **kwargs):
add_telem(root, key, kwargs.get(key))

if common_prefixes:
common_prefixes_elem = add_elem(root, "CommonPrefixes")
for cp in common_prefixes:
common_prefixes_elem = add_elem(root, "CommonPrefixes")
add_telem(common_prefixes_elem, "Prefix", cp)

if contents:
@@ -93,10 +94,29 @@ def get_list_xml_elem(contents, common_prefixes, **kwargs):
add_telem(contents_elem, "Size", obj.get("Size"))
add_telem(contents_elem, "LastModified", obj.get("LastModified"))
add_telem(contents_elem, "StorageClass", obj.get("StorageClass"))

return root


def format_timestamp_s3(timestamp):
""" Format the given timestamp to ISO date format compatible with AWS S3.
"""
dt = datetime.fromtimestamp(timestamp, tz=timezone.utc)
return f"{dt.isoformat().split('+')[0][:-3]}Z"


def format_isoformat_as_local(isodate):
""" Given a date formatted with ISO format, parse it and output it as a
local date string for human consumption.
"""
# Parse it
dt = parser.isoparse(isodate)
# Convert it to the local timezone
dt = dt.astimezone()
# Format it for humans
return dt.strftime("%Y-%m-%d at %I:%M %p")


def get_nosuchkey_response(key):
return Response(content=inspect.cleandoc(f"""
<?xml version="1.0" encoding="UTF-8"?>
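A usage sketch for the two new date helpers (the timestamp is invented; the second result depends on the machine's local timezone):

print(format_timestamp_s3(1724064000.0))
# -> '2024-08-19T10:40:00.000Z'  (UTC with millisecond precision, as S3 emits)

print(format_isoformat_as_local('2024-08-19T10:40:00.000Z'))
# -> e.g. '2024-08-19 at 06:40 AM' when the local timezone is US Eastern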
