
Multilink #296

Merged: 18 commits, Jan 24, 2024

Changes from 1 commit
94 changes: 73 additions & 21 deletions hsds/attr_dn.py
@@ -15,11 +15,11 @@
import time
from bisect import bisect_left

from aiohttp.web_exceptions import HTTPBadRequest, HTTPConflict, HTTPNotFound
from aiohttp.web_exceptions import HTTPBadRequest, HTTPConflict, HTTPNotFound, HTTPGone
from aiohttp.web_exceptions import HTTPInternalServerError
from aiohttp.web import json_response

from .util.attrUtil import validateAttributeName
from .util.attrUtil import validateAttributeName, isEqualAttr
from .util.hdf5dtype import getItemSize, createDataType
from .util.dsetUtil import getShapeDims
from .util.arrayUtil import arrayToBytes, jsonToArray, decodeData
@@ -270,21 +270,31 @@ async def POST_Attributes(request):
if encoding:
kwargs["encoding"] = encoding

missing_names = set()

for attr_name in titles:
if attr_name not in attr_dict:
missing_names.add(attr_name)
continue
des_attr = _getAttribute(attr_name, obj_json, **kwargs)
attr_list.append(des_attr)

resp_json = {"attributes": attr_list}
if not attr_list:
msg = f"POST attributes - requested {len(titles)} but none were found"
log.warn(msg)
raise HTTPNotFound()
if len(attr_list) != len(titles):

if missing_names:
msg = f"POST attributes - requested {len(titles)} attributes but only "
msg += f"{len(attr_list)} were found"
log.warn(msg)
# one or more attributes not found, check to see if any
# had been previously deleted
deleted_attrs = app["deleted_attrs"]
if obj_id in deleted_attrs:
attr_delete_set = deleted_attrs[obj_id]
for attr_name in missing_names:
if attr_name in attr_delete_set:
log.info(f"attribute: {attr_name} was previously deleted, returning 410")
raise HTTPGone()
log.info("one or mores attributes not found, returning 404")
raise HTTPNotFound()
log.debug(f"POST attributes returning: {resp_json}")
resp = json_response(resp_json)
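
A condensed sketch of the classification this hunk introduces, assuming plain dicts in place of the aiohttp app state (an illustration, not the handler itself): names that were never created yield a 404, while names tombstoned by an earlier DELETE yield a 410.

    from aiohttp.web_exceptions import HTTPGone, HTTPNotFound

    # deleted_attrs mirrors app["deleted_attrs"]: obj_id -> set of deleted names
    def classify_missing(deleted_attrs, obj_id, missing_names):
        """Raise 410 if any missing attribute was previously deleted, else 404."""
        tombstones = deleted_attrs.get(obj_id, set())
        for name in missing_names:
            if name in tombstones:
                raise HTTPGone()  # deleted earlier in this DN's lifetime: 410
        raise HTTPNotFound()  # never existed on this object: 404
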
@@ -392,18 +402,28 @@ async def PUT_Attributes(request):

attributes = obj_json["attributes"]

# check for conflicts, also set timestamp
create_time = time.time()
new_attribute = False # set this if we have any new attributes
# check for conflicts
new_attributes = set() # attribute names that are new or replacements
for attr_name in items:
attribute = items[attr_name]
if attr_name in attributes:
log.debug(f"attribute {attr_name} exists")
if replace:
old_item = attributes[attr_name]
try:
is_dup = isEqualAttr(attribute, old_item)
except TypeError:
log.error(f"isEqualAttr TypeError - new: {attribute} old: {old_item}")
raise HTTPInternalServerError()
if is_dup:
log.debug(f"duplicate attribute: {attr_name}")
continue
elif replace:
# don't change the create timestamp
log.debug(f"attribute {attr_name} exists, but will be updated")
old_item = attributes[attr_name]
attribute["created"] = old_item["created"]
new_attributes.add(attr_name)
else:
# Attribute already exists, return a 409
msg = f"Attempt to overwrite attribute: {attr_name} "
@@ -414,18 +434,30 @@
# set the timestamp
log.debug(f"new attribute {attr_name}")
attribute["created"] = create_time
new_attribute = True
new_attributes.add(attr_name)

# ok - all set, create the attributes
for attr_name in items:
# if any of the attribute names was previously deleted,
# remove from the deleted set
deleted_attrs = app["deleted_attrs"]
if obj_id in deleted_attrs:
attr_delete_set = deleted_attrs[obj_id]
else:
attr_delete_set = set()

# ok - all set, add the attributes
for attr_name in new_attributes:
log.debug(f"adding attribute {attr_name}")
attr_json = items[attr_name]
attributes[attr_name] = attr_json

# write back to S3, save to metadata cache
await save_metadata_obj(app, obj_id, obj_json, bucket=bucket)

if new_attribute:
if attr_name in attr_delete_set:
attr_delete_set.remove(attr_name)

if new_attributes:
# update the obj lastModified
now = time.time()
obj_json["lastModified"] = now
# write back to S3, save to metadata cache
await save_metadata_obj(app, obj_id, obj_json, bucket=bucket)
status = 201
else:
status = 200
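
The duplicate check above delegates to isEqualAttr from util/attrUtil.py, whose implementation is not part of this diff. A hypothetical, simplified stand-in to illustrate the intent (skip the write when the attribute payloads match):

    def is_equal_attr_sketch(new_attr: dict, old_attr: dict) -> bool:
        """Hypothetical stand-in for isEqualAttr: treat two attribute items
        as duplicates when their type, shape, and value JSON all match."""
        for key in ("type", "shape", "value"):
            if new_attr.get(key) != old_attr.get(key):
                return False
        return True

Because duplicates never reach new_attributes, a re-PUT of identical attributes falls through to status 200 and leaves lastModified untouched.
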
@@ -490,15 +522,35 @@ async def DELETE_Attributes(request):
# return a list of attributes based on sorted dictionary keys
attributes = obj_json["attributes"]

# add attribute names to deleted set, so we can return a 410 if they
# are requested in the future
deleted_attrs = app["deleted_attrs"]
if obj_id in deleted_attrs:
attr_delete_set = deleted_attrs[obj_id]
else:
attr_delete_set = set()
deleted_attrs[obj_id] = attr_delete_set

save_obj = False # set to True if anything is actually modified
for attr_name in attr_names:
if attr_name in attr_delete_set:
log.warn(f"attribute {attr_name} already deleted")
continue

if attr_name not in attributes:
msg = f"Attribute {attr_name} not found in objid: {obj_id}"
msg = f"Attribute {attr_name} not found in obj id: {obj_id}"
log.warn(msg)
raise HTTPNotFound()

del attributes[attr_name]

await save_metadata_obj(app, obj_id, obj_json, bucket=bucket)
attr_delete_set.add(attr_name)
save_obj = True

if save_obj:
# update the object lastModified
now = time.time()
obj_json["lastModified"] = now
await save_metadata_obj(app, obj_id, obj_json, bucket=bucket)

resp_json = {}
resp = json_response(resp_json)
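
These attr_dn.py hunks share one piece of state: the per-object tombstone sets stored as app["deleted_attrs"] (created in create_app; see the datanode.py hunk below). A minimal sketch of that bookkeeping in isolation, assuming plain in-process dicts:

    # obj_id -> set of deleted attribute names, mirroring app["deleted_attrs"]
    deleted_attrs = {}

    def mark_deleted(obj_id, attr_name):
        deleted_attrs.setdefault(obj_id, set()).add(attr_name)

    def clear_tombstone(obj_id, attr_name):
        # a later PUT of the same name makes the attribute live again
        deleted_attrs.get(obj_id, set()).discard(attr_name)

    def was_deleted(obj_id, attr_name):
        return attr_name in deleted_attrs.get(obj_id, set())

Since the map is ordinary process memory, the 410 behavior lasts only for the lifetime of the datanode process.
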
2 changes: 2 additions & 0 deletions hsds/datanode.py
@@ -299,6 +299,8 @@ def create_app():
}
app["chunk_cache"] = LruCache(**kwargs)
app["deleted_ids"] = set()
app["deleted_attrs"] = {} # map of objectid to set of deleted attribute names
app["deleted_links"] = {} # map of objecctid to set of deleted link names
# map of objids to timestamp and bucket of which they were last updated
app["dirty_ids"] = {}
# map of dataset ids to deflate levels (if compressed)
35 changes: 34 additions & 1 deletion hsds/domain_crawl.py
@@ -19,7 +19,7 @@
from aiohttp.web_exceptions import HTTPInternalServerError, HTTPNotFound, HTTPGone

from .util.idUtil import getCollectionForId, getDataNodeUrl
from .servicenode_lib import getObjectJson, getAttributes, putAttributes, getLinks
from .servicenode_lib import getObjectJson, getAttributes, putAttributes, getLinks, putLinks
from . import hsds_logger as log


@@ -295,6 +295,30 @@ async def get_links(self, grp_id, titles=None):
else:
log.debug(f"link: {link_id} already in object dict")

async def put_links(self, grp_id, link_items):
# write the given links for the obj_id
log.debug(f"put_links for {grp_id}, {len(link_items)} links")
req = getDataNodeUrl(self._app, grp_id)
req += f"/groups/{grp_id}/links"
kwargs = {}
if "bucket" in self._params:
kwargs["bucket"] = self._params["bucket"]
status = None
try:
status = await putLinks(self._app, grp_id, link_items, **kwargs)
except HTTPConflict:
log.warn("DomainCrawler - got HTTPConflict from http_put")
status = 409
except HTTPServiceUnavailable:
status = 503
except HTTPInternalServerError:
status = 500
except Exception as e:
log.error(f"unexpected exception {e}")

log.debug(f"DomainCrawler fetch for {grp_id} - returning status: {status}")
self._obj_dict[grp_id] = {"status": status}

def get_status(self):
""" return the highest status of any of the returned objects """
status = None
@@ -419,7 +443,16 @@ async def fetch(self, obj_id):

log.debug(f"DomainCrawler - get link titles: {link_titles}")
await self.get_links(obj_id, link_titles)
elif self._action == "put_link":
log.debug("DomainCrawlwer - put links")
# write links
if self._objs and obj_id not in self._objs:
log.error(f"couldn't find {obj_id} in self._objs")
return
link_items = self._objs[obj_id]
log.debug(f"got {len(link_items)} link items for {obj_id}")

await self.put_links(obj_id, link_items)
else:
msg = f"DomainCrawler: unexpected action: {self._action}"
log.error(msg)
125 changes: 125 additions & 0 deletions hsds/domain_sn.py
@@ -624,6 +624,131 @@ async def getScanTime(app, root_id, bucket=None):
return root_scan


async def POST_Domain(request):
""" return object defined by h5path list """

log.request(request)
app = request.app
params = request.rel_url.query
log.debug(f"POST_Domain query params: {params}")

include_links = False
include_attrs = False
follow_soft_links = False
follow_external_links = False

if "include_links" in params and params["include_links"]:
include_links = True
if "include_attrs" in params and params["include_attrs"]:
include_attrs = True
if "follow_soft_links" in params and params["follow_soft_links"]:
follow_soft_links = True
if "follow_external_links" in params and params["follow_external_links"]:
follow_external_links = True

if not request.has_body:
msg = "POST Domain with no body"
log.warn(msg)
raise HTTPBadRequest(reason=msg)

try:
body = await request.json()
except json.JSONDecodeError:
msg = "Unable to load JSON body"
log.warn(msg)
raise HTTPBadRequest(reason=msg)

if "h5paths" in body:
h5paths = body["h5paths"]
if not isinstance(h5paths, list):
msg = f"expected list for h5paths but got: {type(h5paths)}"
log.warn(msg)
raise HTTPBadRequest(reason=msg)
else:
msg = "expected h5paths key in body"
log.warn(msg)
raise HTTPBadRequest(reason=msg)

(username, pswd) = getUserPasswordFromRequest(request)
if username is None and app["allow_noauth"]:
username = "default"
else:
await validateUserPassword(app, username, pswd)

domain = None
try:
domain = getDomainFromRequest(request)
except ValueError:
log.warn(f"Invalid domain: {domain}")
raise HTTPBadRequest(reason="Invalid domain name")

bucket = getBucketForDomain(domain)
log.debug(f"GET_Domain domain: {domain} bucket: {bucket}")

if not bucket:
# no bucket defined, raise 400
msg = "Bucket not provided"
log.warn(msg)
raise HTTPBadRequest(reason=msg)
if bucket:
checkBucketAccess(app, bucket)

if not domain:
msg = "no domain given"
log.warn(msg)
raise HTTPBadRequest(reason=msg)

log.info(f"got domain: {domain}")

domain_json = await getDomainJson(app, domain, reload=True)

if domain_json is None:
log.warn(f"domain: {domain} not found")
raise HTTPNotFound()

if "acls" not in domain_json:
log.error("No acls key found in domain")
raise HTTPInternalServerError()

log.debug(f"got domain_json: {domain_json}")
# validate that the requesting user has permission to read this domain
# aclCheck throws exception if not authorized
aclCheck(app, domain_json, "read", username)

json_objs = {}

for h5path in h5paths:
root_id = domain_json["root"]

# getObjectIdByPath throws 404 if not found
obj_id, domain, _ = await getObjectIdByPath(
app, root_id, h5path, bucket=bucket, domain=domain,
follow_soft_links=follow_soft_links,
follow_external_links=follow_external_links)
log.info(f"get obj_id: {obj_id} from h5path: {h5path}")
# get authoritative state for object from DN (even if
# it's in the meta_cache).
kwargs = {"refresh": True, "bucket": bucket,
"include_attrs": include_attrs, "include_links": include_links}
log.debug(f"kwargs for getObjectJson: {kwargs}")

obj_json = await getObjectJson(app, obj_id, **kwargs)

obj_json = respJsonAssemble(obj_json, params, obj_id)

obj_json["domain"] = getPathForDomain(domain)

# client may not know class of object retrieved via path
obj_json["class"] = getObjectClass(obj_id)

json_objs[h5path] = obj_json

Contributor:

Some response parsing callbacks in the REST VOL use hrefs to get information about the target domain, so POST_Domain not returning any hrefs becomes a problem. As a first pass, this solved the problem by attaching hrefs to each returned object individually (though a complete version might need to handle when some objects have different domains due to external links):


    hrefs = []
    hrefs.append({"rel": "self", "href": getHref(request, "/")})
    if "root" in domain_json:
        root_uuid = domain_json["root"]
        href = getHref(request, "/datasets")
        hrefs.append({"rel": "database", "href": href})
        href = getHref(request, "/groups")
        hrefs.append({"rel": "groupbase", "href": href})
        href = getHref(request, "/datatypes")
        hrefs.append({"rel": "typebase", "href": href})
        href = getHref(request, "/groups/" + root_uuid)
        hrefs.append({"rel": "root", "href": href})
        href = getHref(request, "/")
        hrefs.append({"rel": "home", "href": href})

    hrefs.append({"rel": "acls", "href": getHref(request, "/acls")})
    parent_domain = getParentDomain(domain)
    if not parent_domain or getPathForDomain(parent_domain) == "/":
        is_toplevel = True
    else:
        is_toplevel = False

    if not is_toplevel:
        href = getHref(request, "/", domain=parent_domain)
        hrefs.append({"rel": "parent", "href": href})

    for h5path in json_objs:
        (json_objs[h5path])["hrefs"] = hrefs

Member Author:

I've come to think of the hrefs as mainly for human consumption - i.e. clicking on a link that shows up in the browser. As such, hrefs for POST and PUT don't seem as useful.
Another issue is that some of the POST responses could potentially involve a lot of hrefs - e.g. multiple hrefs for each link in a POST_links response.
Could you describe what info you are currently pulling out of the hrefs? How about we just include whatever that is in the POST response as regular JSON?
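
For reference, the diff above already attaches the containing domain to each object via obj_json["domain"] = getPathForDomain(domain), so a plain-JSON response might look roughly like this (shape and values are illustrative only):

    # illustrative response shape; all values are hypothetical
    rsp = {
        "h5paths": {
            "/g1/dset1": {
                "class": "datasets",
                "domain": "/home/test/tall.h5",  # domain as plain JSON
                # ... plus type, shape, attributes, etc.
            }
        }
    }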

Contributor:

The REST VOL gets the domain that contains the object from the hrefs, to satisfy some library API calls that retrieve the object's file number (H5Oget_info3). It stores the hash of the domain name as a file number, so that comparisons between different objects' file numbers still work.

Returning the Domain that contains the object directly could also solve this. The REST VOL uses this information in GET_Datatype/Group/Dataset and POST_Domain requests.
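
The file-number scheme described here lives in the REST VOL's C code; this Python sketch (the hash choice is hypothetical) only shows the idea: hashing the containing domain yields a stable integer, so two objects compare as "same file" exactly when their domains match.

    import zlib

    def domain_fileno(domain_path: str) -> int:
        """Derive a stable pseudo file number from a domain path."""
        return zlib.crc32(domain_path.encode("utf-8"))

    # equal domains give equal file numbers; distinct domains differ
    # except with hash-collision probability
    assert domain_fileno("/home/test/tall.h5") == domain_fileno("/home/test/tall.h5")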

jsonRsp = {"h5paths": json_objs}
resp = await jsonResponse(request, jsonRsp)
log.response(request, resp=resp)
return resp
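
For context, a hypothetical client call against the new endpoint (server URL, domain, credentials, and h5paths are placeholders; the query parameters and body follow the handler above, assuming it is routed at the domain root like GET_Domain):

    import requests

    server = "http://localhost:5101"  # placeholder HSDS endpoint
    params = {
        "domain": "/home/test/tall.h5",  # placeholder domain
        "include_attrs": 1,
        "follow_soft_links": 1,
    }
    body = {"h5paths": ["/g1", "/g1/dset1"]}  # placeholder paths

    rsp = requests.post(f"{server}/", params=params, json=body,
                        auth=("user", "passwd"))
    if rsp.status_code == 200:
        for h5path, obj_json in rsp.json()["h5paths"].items():
            print(h5path, obj_json["class"], obj_json["domain"])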


async def PUT_Domain(request):
"""HTTP method to create a new domain"""
log.request(request)