Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Multiops #295

Merged
merged 7 commits into from
Jan 4, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
257 changes: 185 additions & 72 deletions hsds/attr_dn.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,18 @@
from aiohttp.web import json_response

from .util.attrUtil import validateAttributeName
from .util.hdf5dtype import getItemSize, createDataType
from .util.dsetUtil import getShapeDims
from .util.arrayUtil import arrayToBytes, jsonToArray, decodeData
from .util.arrayUtil import bytesToArray, bytesArrayToList
from .datanode_lib import get_obj_id, get_metadata_obj, save_metadata_obj
from . import hsds_logger as log


def _index(items, marker, create_order=False):
"""Locate the leftmost value exactly equal to x"""
if create_order:
# list is not ordered, juse search linearly
# list is not ordered, just search linearly
for i in range(len(items)):
if items[i] == marker:
return i
Expand All @@ -39,6 +43,79 @@ def _index(items, marker, create_order=False):
return -1


def _getAttribute(attr_name, obj_json, include_data=True, encoding=None):
""" copy relevant fields from src to target """

if not isinstance(obj_json, dict):
msg = f"expected dict but got: {type(obj_json)}"
log.error(msg)
raise HTTPInternalServerError()

if "attributes" not in obj_json:
msg = "expected to find attributes key in obj_json"
log.error(msg)
raise HTTPInternalServerError()

attributes = obj_json["attributes"]
if attr_name not in attributes:
# this should be checked before calling this function
msg = f"attribute {attr_name} not found"
log.error(msg)
raise HTTPInternalServerError()

src_attr = attributes[attr_name]
log.debug(f"_getAttribute - src_attr: {src_attr}")

for key in ("created", "type", "shape", "value"):
if key not in src_attr:
msg = f"Expected to find key: {key} in {src_attr}"
log.error(msg)
raise HTTPInternalServerError()

des_attr = {}
type_json = src_attr["type"]
shape_json = src_attr["shape"]
des_attr["created"] = src_attr["created"]
des_attr["type"] = type_json
des_attr["shape"] = shape_json
des_attr["name"] = attr_name

if encoding:
item_size = getItemSize(type_json)
if item_size == "H5T_VARIABLE":
msg = "encoded value request but only json can be returned for "
msg = f"{attr_name} since it has variable length type"
log.warn(msg)
encoding = None
log.debug("base64 encoding requested")

if include_data:
value_json = src_attr["value"]
if "encoding" in src_attr:
des_attr["encoding"] = src_attr["encoding"]
# just copy the encoded value
des_attr["value"] = value_json
elif encoding:
# return base64 encoded value
if value_json is None:
des_attr["value"] = None
else:
arr_dtype = createDataType(type_json)
np_shape = getShapeDims(shape_json)
try:
arr = jsonToArray(np_shape, arr_dtype, value_json)
except ValueError as e:
msg = f"Bad Request: input data doesn't match selection: {e}"
log.warn(msg)
raise HTTPBadRequest(reason=msg)
output_data = arrayToBytes(arr, encoding=encoding)
des_attr["value"] = output_data.decode("ascii")
des_attr["encoding"] = encoding
else:
des_attr["value"] = src_attr["value"]
return des_attr


async def GET_Attributes(request):
""" Return JSON for attribute collection
"""
Expand All @@ -50,14 +127,20 @@ async def GET_Attributes(request):
if "bucket" in params:
bucket = params["bucket"]
else:
bucket = None
msg = "POST Attributes without bucket param"
log.warn(msg)
raise HTTPBadRequest(reason=msg)

create_order = False
if "CreateOrder" in params and params["CreateOrder"]:
if params.get("CreateOrder"):
create_order = True

encoding = None
if params.get("encoding"):
encoding = params["encoding"]

include_data = False
if "IncludeData" in params and params["IncludeData"]:
if params.get("IncludeData"):
include_data = True

limit = None
Expand Down Expand Up @@ -121,14 +204,9 @@ async def GET_Attributes(request):
attr_list = []
for i in range(start_index, end_index):
attr_name = titles[i]
src_attr = attr_dict[attr_name]
des_attr = {}
des_attr["created"] = src_attr["created"]
des_attr["type"] = src_attr["type"]
des_attr["shape"] = src_attr["shape"]
des_attr["name"] = attr_name
if include_data:
des_attr["value"] = src_attr["value"]
kwargs = {"include_data": include_data, "encoding": encoding}
log.debug(f"_getAttribute kwargs: {kwargs}")
des_attr = _getAttribute(attr_name, obj_json, **kwargs)
attr_list.append(des_attr)

resp_json = {"attributes": attr_list}
Expand Down Expand Up @@ -162,13 +240,20 @@ async def POST_Attributes(request):
if "bucket" in params:
bucket = params["bucket"]
else:
bucket = None
msg = "POST Attributes without bucket param"
log.warn(msg)
raise HTTPBadRequest(reason=msg)

include_data = False
log.debug(f"got params: {params}")
if "IncludeData" in params and params["IncludeData"]:
include_data = True
log.debug("include attr data")
if params.get("encoding"):
encoding = params["encoding"]
log.debug("POST_Attributes requested base64 encoding")
else:
encoding = None

obj_json = await get_metadata_obj(app, obj_id, bucket=bucket)

Expand All @@ -181,61 +266,28 @@ async def POST_Attributes(request):
# return a list of attributes based on sorted dictionary keys
attr_dict = obj_json["attributes"]
attr_list = []
kwargs = {"include_data": include_data}
if encoding:
kwargs["encoding"] = encoding

for attr_name in titles:
if attr_name not in attr_dict:
continue
src_attr = attr_dict[attr_name]
des_attr = {}
des_attr["created"] = src_attr["created"]
des_attr["type"] = src_attr["type"]
des_attr["shape"] = src_attr["shape"]
des_attr["name"] = attr_name
if include_data:
des_attr["value"] = src_attr["value"]
des_attr = _getAttribute(attr_name, obj_json, **kwargs)
attr_list.append(des_attr)

resp_json = {"attributes": attr_list}
log.debug(f"POST attributes returning: {resp_json}")
resp = json_response(resp_json)
log.response(request, resp=resp)
return resp


async def GET_Attribute(request):
"""HTTP GET method to return JSON for /(obj)/<id>/attributes/<name>
"""
log.request(request)
app = request.app
params = request.rel_url.query

obj_id = get_obj_id(request)

attr_name = request.match_info.get('name')
validateAttributeName(attr_name)
if "bucket" in params:
bucket = params["bucket"]
else:
bucket = None

obj_json = await get_metadata_obj(app, obj_id, bucket=bucket)
msg = f"GET attribute obj_id: {obj_id} name: {attr_name} bucket: {bucket}"
log.info(msg)
log.debug(f"got obj_json: {obj_json}")

if "attributes" not in obj_json:
log.error(f"unexpected obj data for id: {obj_id}")
raise HTTPInternalServerError()

attributes = obj_json["attributes"]
if attr_name not in attributes:
msg = f"Attribute '{attr_name}' not found for id: {obj_id}"
if not attr_list:
msg = f"POST attributes - requested {len(titles)} but none were found"
log.warn(msg)
raise HTTPNotFound()

attr_json = attributes[attr_name]

resp = json_response(attr_json)
if len(attr_list) != len(titles):
msg = f"POST attributes - requested {len(titles)} attributes but only "
msg += f"{len(attr_list)} were found"
log.warn(msg)
raise HTTPNotFound()
log.debug(f"POST attributes returning: {resp_json}")
resp = json_response(resp_json)
log.response(request, resp=resp)
return resp

Expand All @@ -254,12 +306,15 @@ async def PUT_Attributes(request):
raise HTTPBadRequest(message="body expected")

body = await request.json()
log.debug(f"got body: {body}")
if "bucket" in params:
bucket = params["bucket"]
elif "bucket" in body:
bucket = params["bucket"]
else:
bucket = None
msg = "PUT Attributes without bucket param"
log.warn(msg)
raise HTTPBadRequest(reason=msg)

replace = False
if "replace" in params and params["replace"]:
Expand All @@ -283,6 +338,8 @@ async def PUT_Attributes(request):
attribute["shape"] = body["shape"]
if "value" in body:
attribute["value"] = body["value"]
if "encoding" in body:
attribute["encoding"] = body["encoding"]
items[attr_name] = attribute

# validate input
Expand All @@ -295,6 +352,36 @@ async def PUT_Attributes(request):
if "shape" not in attr_json:
log.error("PUT attribute with no shape in body")
raise HTTPInternalServerError()
if "value" in attr_json and attr_json.get("encoding"):
# decode and store as JSON if possible
value = attr_json["value"]
arr_dtype = createDataType(attr_json["type"]) # np datatype
attr_shape = attr_json["shape"]
np_dims = getShapeDims(attr_shape)
log.debug(f"np_dims: {np_dims}")
try:
arr = bytesToArray(value, arr_dtype, np_dims, encoding="base64")
except ValueError as e:
msg = f"Bad Request: encoded input data doesn't match shape and type: {e}"
log.warn(msg)
raise HTTPBadRequest(reason=msg)
log.debug(f"got arr: {arr}")
log.debug(f"arr.shape: {arr.shape}")
data = arr.tolist()
try:
json_data = bytesArrayToList(data)
log.debug(f"converted encoded data to {json_data}")
if attr_shape["class"] == "H5S_SCALAR":
attr_json["value"] = json_data[0] # just store the scalar
else:
attr_json["value"] = json_data
del attr_json["encoding"] # don't need to store as base64
except ValueError as err:
msg = f"Cannot decode bytes to list: {err}, will store as base64"
log.warn(msg)
attr_json["value"] = value # use the base64 data

log.debug(f"attribute {attr_name}: {attr_json}")

log.info(f"PUT {len(items)} attributes to obj_id: {obj_id} bucket: {bucket}")

Expand Down Expand Up @@ -350,8 +437,8 @@ async def PUT_Attributes(request):
return resp


async def DELETE_Attribute(request):
"""HTTP DELETE method for /(obj)/<id>/attributes/<name>
async def DELETE_Attributes(request):
"""HTTP DELETE method for /(obj)/<id>/attributes
"""
log.request(request)
app = request.app
Expand All @@ -361,15 +448,40 @@ async def DELETE_Attribute(request):
if "bucket" in params:
bucket = params["bucket"]
else:
bucket = None
msg = "DELETE Attributes without bucket param"
log.warn(msg)
raise HTTPBadRequest(reason=msg)

if "encoding" in params:
encoding = params["encoding"]
if encoding != "base64":
msg = "only base64 encoding is supported"
log.warn(msg)
raise HTTPBadRequest(reason=msg)
else:
encoding = None

if "seperator" in params:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seperator -> separator

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed

seperator = params["seperator"]
else:
seperator = "/"

attr_name = request.match_info.get('name')
log.info(f"DELETE attribute {attr_name} in {obj_id} bucket: {bucket}")
validateAttributeName(attr_name)
if "attr_names" not in params:
msg = "expected attr_names for DELETE attributes"
log.warn(msg)
raise HTTPBadRequest(reason=msg)

attr_names_param = params["attr_names"]
if encoding:
attr_names_param = decodeData(attr_names_param).decode("utf-8")

attr_names = attr_names_param.split(seperator)

log.info(f"DELETE attribute {attr_names} in {obj_id} bucket: {bucket}")

obj_json = await get_metadata_obj(app, obj_id, bucket=bucket)

log.debug(f"DELETE attribute obj_id: {obj_id} got json")
log.debug(f"DELETE attributes obj_id: {obj_id} got json")
if "attributes" not in obj_json:
msg = f"unexpected data for obj id: {obj_id}"
msg.error(msg)
Expand All @@ -378,12 +490,13 @@ async def DELETE_Attribute(request):
# return a list of attributes based on sorted dictionary keys
attributes = obj_json["attributes"]

if attr_name not in attributes:
msg = f"Attribute {attr_name} not found in objid: {obj_id}"
log.warn(msg)
raise HTTPNotFound()
for attr_name in attr_names:
if attr_name not in attributes:
msg = f"Attribute {attr_name} not found in objid: {obj_id}"
log.warn(msg)
raise HTTPNotFound()

del attributes[attr_name]
del attributes[attr_name]

await save_metadata_obj(app, obj_id, obj_json, bucket=bucket)

Expand Down
Loading
Loading