Multiops #295
Merged (7 commits) on Jan 4, 2024
383 changes: 290 additions & 93 deletions hsds/attr_dn.py

Large diffs are not rendered by default.

1,043 changes: 838 additions & 205 deletions hsds/attr_sn.py

Large diffs are not rendered by default.

24 changes: 11 additions & 13 deletions hsds/datanode.py
@@ -31,8 +31,8 @@
from .group_dn import GET_Group, POST_Group, DELETE_Group, PUT_Group
from .group_dn import POST_Root
from .link_dn import GET_Links, GET_Link, PUT_Link, DELETE_Link
-from .attr_dn import GET_Attributes, GET_Attribute, PUT_Attribute
-from .attr_dn import DELETE_Attribute
+from .attr_dn import GET_Attributes, POST_Attributes
+from .attr_dn import PUT_Attributes, DELETE_Attributes
from .ctype_dn import GET_Datatype, POST_Datatype, DELETE_Datatype
from .dset_dn import GET_Dataset, POST_Dataset, DELETE_Dataset
from .dset_dn import PUT_DatasetShape
@@ -63,26 +63,24 @@ async def init():
app.router.add_route("DELETE", "/groups/{id}/links/{title}", DELETE_Link)
app.router.add_route("PUT", "/groups/{id}/links/{title}", PUT_Link)
app.router.add_route("GET", "/groups/{id}/attributes", GET_Attributes)
app.router.add_route("GET", "/groups/{id}/attributes/{name}", GET_Attribute)
app.router.add_route("DELETE", "/groups/{id}/attributes/{name}", DELETE_Attribute)
app.router.add_route("PUT", "/groups/{id}/attributes/{name}", PUT_Attribute)
app.router.add_route("POST", "/groups/{id}/attributes", POST_Attributes)
app.router.add_route("DELETE", "/groups/{id}/attributes", DELETE_Attributes)
app.router.add_route("PUT", "/groups/{id}/attributes", PUT_Attributes)
app.router.add_route("GET", "/datatypes/{id}", GET_Datatype)
app.router.add_route("DELETE", "/datatypes/{id}", DELETE_Datatype)
app.router.add_route("POST", "/datatypes", POST_Datatype)
app.router.add_route("GET", "/datatypes/{id}/attributes", GET_Attributes)
app.router.add_route("GET", "/datatypes/{id}/attributes/{name}", GET_Attribute)
app.router.add_route(
"DELETE", "/datatypes/{id}/attributes/{name}", DELETE_Attribute
)
app.router.add_route("PUT", "/datatypes/{id}/attributes/{name}", PUT_Attribute)
app.router.add_route("POST", "/datatypes/{id}/attributes", POST_Attributes)
app.router.add_route("DELETE", "/datatypes/{id}/attributes", DELETE_Attributes)
app.router.add_route("PUT", "/datatypes/{id}/attributes", PUT_Attributes)
app.router.add_route("GET", "/datasets/{id}", GET_Dataset)
app.router.add_route("DELETE", "/datasets/{id}", DELETE_Dataset)
app.router.add_route("POST", "/datasets", POST_Dataset)
app.router.add_route("PUT", "/datasets/{id}/shape", PUT_DatasetShape)
app.router.add_route("GET", "/datasets/{id}/attributes", GET_Attributes)
app.router.add_route("GET", "/datasets/{id}/attributes/{name}", GET_Attribute)
app.router.add_route("DELETE", "/datasets/{id}/attributes/{name}", DELETE_Attribute)
app.router.add_route("PUT", "/datasets/{id}/attributes/{name}", PUT_Attribute)
app.router.add_route("POST", "/datasets/{id}/attributes", POST_Attributes)
app.router.add_route("DELETE", "/datasets/{id}/attributes", DELETE_Attributes)
app.router.add_route("PUT", "/datasets/{id}/attributes", PUT_Attributes)
app.router.add_route("PUT", "/chunks/{id}", PUT_Chunk)
app.router.add_route("GET", "/chunks/{id}", GET_Chunk)
app.router.add_route("POST", "/chunks/{id}", POST_Chunk)
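The per-attribute routes are replaced here by collection-level routes, so one DN request can read, write, or delete several attributes at once. Below is a hedged sketch of how a caller might exercise the new POST /groups/{id}/attributes route with aiohttp; the endpoint address, object id, bucket name, and the {"attributes": [...]} body shape are illustrative assumptions, since the actual request format is defined in attr_dn.py (not rendered above).

import asyncio
import aiohttp

DN_BASE = "http://localhost:6101"  # hypothetical datanode endpoint
GROUP_ID = "g-0123"  # hypothetical group id

async def fetch_attributes(attr_names, bucket="mybucket"):
    # ask the DN for several attributes of one group in a single round trip
    url = f"{DN_BASE}/groups/{GROUP_ID}/attributes"
    payload = {"attributes": attr_names}  # assumed body shape
    async with aiohttp.ClientSession() as session:
        async with session.post(url, json=payload, params={"bucket": bucket}) as rsp:
            rsp.raise_for_status()
            return await rsp.json()

if __name__ == "__main__":
    print(asyncio.run(fetch_attributes(["units", "scale"])))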
4 changes: 1 addition & 3 deletions hsds/datanode_lib.py
@@ -328,9 +328,7 @@ async def get_metadata_obj(app, obj_id, bucket=None):
if isValidDomain(obj_id):
domain_bucket = getBucketForDomain(obj_id)
if bucket and domain_bucket and bucket != domain_bucket:
-msg = (
-    f"get_metadata_obj for domain: {obj_id} but bucket param was: {bucket}"
-)
+msg = f"get_metadata_obj for domain: {obj_id} but bucket param was: {bucket}"
log.error(msg)
raise HTTPInternalServerError()
if not bucket:
326 changes: 326 additions & 0 deletions hsds/domain_crawl.py
@@ -0,0 +1,326 @@
##############################################################################
# Copyright by The HDF Group. #
# All rights reserved. #
# #
# This file is part of HSDS (HDF5 Scalable Data Service), Libraries and #
# Utilities. The full HSDS copyright notice, including #
# terms governing use, modification, and redistribution, is contained in #
# the file COPYING, which can be found at the root of the source code #
# distribution tree. If you do not have access to this file, you may #
# request a copy from [email protected]. #
##############################################################################
#
# domain crawler
#

import asyncio

from aiohttp.web_exceptions import HTTPServiceUnavailable, HTTPConflict, HTTPBadRequest
from aiohttp.web_exceptions import HTTPInternalServerError, HTTPNotFound, HTTPGone


from .util.idUtil import getCollectionForId, getDataNodeUrl

from .servicenode_lib import getObjectJson, getAttributes, putAttributes
from . import hsds_logger as log


class DomainCrawler:
def __init__(
self,
app,
objs,
action="get_obj",
params=None,
max_tasks=40,
max_objects_limit=0,
raise_error=False
):
log.info(f"DomainCrawler.__init__ root_id: {len(objs)} objs")
log.debug(f"params: {params}")
self._app = app
self._action = action
self._max_objects_limit = max_objects_limit
self._params = params
self._max_tasks = max_tasks
self._q = asyncio.Queue()
self._obj_dict = {}
self.seen_ids = set()
self._raise_error = raise_error
if not objs:
log.error("no objs for crawler to crawl!")
raise ValueError()

for obj_id in objs:
log.debug(f"adding {obj_id} to the queue")
self._q.put_nowait(obj_id)
if isinstance(objs, dict):
self._objs = objs
else:
self._objs = None

async def get_attributes(self, obj_id, attr_names):
Review comment (Contributor): Could this be merged with _get_attributes from attr_sn? The domain crawler's self seems to contain the app and parameter fields needed for _get_attributes.

Reply (Member, author): Good idea. Created a getAttributes function in servicenode_lib.py that both attr_sn and DomainCrawler call.

# get the given attributes for the obj_id
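# note: getAttributes is the shared helper added to servicenode_lib.py
# (see the review thread above). Only the call site below appears in this
# diff; judging from the kwargs assembled from self._params, its interface
# is presumably along the lines of
#   async def getAttributes(app, obj_id, attr_names=None,
#                           include_data=False, ignore_nan=False, bucket=None)
# with the keyword defaults being an assumption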
msg = f"get_attributes for {obj_id}"
if attr_names:
msg += f", {len(attr_names)} attributes"
log.debug(msg)

kwargs = {}
for key in ("include_data", "ignore_nan", "bucket"):
if key in self._params:
kwargs[key] = self._params[key]
if attr_names:
kwargs["attr_names"] = attr_names
log.debug(f"using kwargs: {kwargs}")

status = 200
# make sure to catch all expected exceptions, otherwise
# the task will never complete
try:
attributes = await getAttributes(self._app, obj_id, **kwargs)
except HTTPBadRequest:
status = 400
except HTTPNotFound:
status = 404
except HTTPGone:
status = 410
except HTTPServiceUnavailable:
status = 503
except HTTPInternalServerError:
status = 500
except Exception as e:
log.error(f"unexpected exception from post request: {e}")
status = 500

if status == 200:
log.debug(f"got attributes: {attributes}")
self._obj_dict[obj_id] = attributes
else:
log.warn(f"Domain crawler - got {status} status for obj_id {obj_id}")
self._obj_dict[obj_id] = {"status": status}

async def put_attributes(self, obj_id, attr_items):
# write the given attributes for the obj_id
log.debug(f"put_attributes for {obj_id}, {len(attr_items)} attributes")
req = getDataNodeUrl(self._app, obj_id)
collection = getCollectionForId(obj_id)
req += f"/{collection}/{obj_id}/attributes"
kwargs = {}
if "bucket" in self._params:
kwargs["bucket"] = self._params["bucket"]
if "replace" in self._params:
kwargs["replace"] = self._params["replace"]
status = None
try:
status = await putAttributes(self._app, obj_id, attr_items, **kwargs)
except HTTPConflict:
log.warn("DomainCrawler - got HTTPConflict from http_put")
status = 409
except HTTPServiceUnavailable:
status = 503
except HTTPInternalServerError:
status = 500
except Exception as e:
log.error(f"unexpected exception {e}")

log.debug(f"DomainCrawler fetch for {obj_id} - returning status: {status}")
self._obj_dict[obj_id] = {"status": status}

async def get_obj_json(self, obj_id):
""" get the given obj_json for the obj_id.
for each group found, search the links if include_links is set """
log.debug(f"get_obj_json: {obj_id}")
collection = getCollectionForId(obj_id)
kwargs = {}

for k in ("include_links", "include_attrs", "bucket"):
if k in self._params:
kwargs[k] = self._params[k]
if collection == "groups" and self._params.get("follow_links"):
follow_links = True
kwargs["include_links"] = True # get them so we can follow them
else:
follow_links = False
if follow_links or self._params.get("include_attrs"):
kwargs["refresh"] = True # don't want a cached version in this case

log.debug(f"follow_links: {follow_links}")
log.debug(f"getObjectJson kwargs: {kwargs}")
obj_json = None
status = 200
try:
obj_json = await getObjectJson(self._app, obj_id, **kwargs)
except HTTPNotFound:
status = 404
except HTTPServiceUnavailable:
status = 503
except HTTPInternalServerError:
status = 500
except Exception as e:
log.error(f"unexpected exception {e}")
status = 500
log.debug(f"getObjectJson status: {status}")

if obj_json is None:
msg = f"DomainCrawler - getObjectJson for {obj_id} "
if status >= 500:
msg += f"failed, status: {status}"
log.error(msg)
else:
msg += f"returned status: {status}"
log.warn(msg)
return

log.debug(f"DomainCrawler - got json for {obj_id}")
log.debug(f"obj_json: {obj_json}")

log.debug("store obj json")
self._obj_dict[obj_id] = obj_json # store the obj_json

# for groups iterate through all the hard links and
# add to the lookup ids set

log.debug(f"gotCollection: {collection}")

if collection == "groups" and follow_links:
if "links" not in obj_json:
log.error("expected links key in obj_json")
return
links = obj_json["links"]
log.debug(f"DomainCrawler links: {links}")
for title in links:
log.debug(f"DomainCrawler - got link: {title}")
link_obj = links[title]
num_objects = len(self._obj_dict)
if self._params.get("max_objects_limit") is not None:
max_objects_limit = self._params["max_objects_limit"]
if num_objects >= max_objects_limit:
msg = "DomainCrawler reached limit of "
msg += f"{max_objects_limit}"
log.info(msg)
break
if link_obj["class"] != "H5L_TYPE_HARD":
# just follow hardlinks
continue
link_id = link_obj["id"]
if link_id not in self._obj_dict:
# haven't seen this object yet, get obj json
log.debug(f"DomainCrawler - adding link_id: {link_id}")
self._obj_dict[link_id] = {} # placeholder for obj id
self._q.put_nowait(link_id)

def get_status(self):
""" return the highest status of any of the returned objects """
status = None
for obj_id in self._obj_dict:
item = self._obj_dict[obj_id]
log.debug(f"item: {item}")
if "status" in item:
item_status = item["status"]
if status is None or item_status > status:
# return the more severe error
log.debug(f"setting status to {item_status}")
status = item_status
return status

async def crawl(self):
workers = [asyncio.Task(self.work()) for _ in range(self._max_tasks)]
# When all work is done, exit.
msg = "DomainCrawler - await queue.join - "
msg += f"count: {len(self._obj_dict)}"
log.info(msg)
await self._q.join()
msg = "DomainCrawler - join complete - "
msg += f"count: {len(self._obj_dict)}"
log.info(msg)

for w in workers:
w.cancel()
log.debug("DomainCrawler - workers canceled")

status = self.get_status()
if status:
log.debug(f"DomainCrawler -- status: {status}")
log.debug(f"raise_error: {self._raise_error}")
if self._raise_error:
# throw the appropriate exception if other than 200, 201
if status == 200:
pass # ok
elif status == 201:
pass # also ok
elif status == 400:
log.warn("DomainCrawler - BadRequest")
raise HTTPBadRequest(reason="unkown")
elif status == 404:
log.warn("DomainCrawler - not found")
raise HTTPNotFound()
elif status == 409:
log.warn("DomainCrawler - conflict")
raise HTTPConflict()
elif status == 410:
log.warn("DomainCrawler - gone")
raise HTTPGone()
elif status == 500:
log.error("DomainCrawler - internal server error")
raise HTTPInternalServerError()
elif status == 503:
log.error("DomainCrawler - server busy")
raise HTTPServiceUnavailable()
else:
log.error(f"DomainCrawler - unexpected status: {status}")
raise HTTPInternalServerError()

async def work(self):
while True:
obj_id = await self._q.get()
await self.fetch(obj_id)
self._q.task_done()

async def fetch(self, obj_id):
log.debug(f"DomainCrawler fetch for id: {obj_id}")
log.debug(f"action: {self._action}")
if self._action == "get_obj":
log.debug("DomainCrawler - get obj")
# just get the obj json
await self.get_obj_json(obj_id)
elif self._action == "get_attr":
log.debug("DomainCrawler - get attributes")
# fetch the given attributes
if self._objs is None:
log.error("DomainCrawler - self._objs not set")
return
if obj_id not in self._objs:
log.error(f"couldn't find {obj_id} in self._objs")
return
attr_names = self._objs[obj_id]
if attr_names is None:
log.debug(f"fetch all attributes for {obj_id}")
else:
if not isinstance(attr_names, list):
log.error("expected list for attribute names")
return
if len(attr_names) == 0:
log.warn("expected at least one name in attr_names list")
return

log.debug(f"DomainCrawler - got attribute names: {attr_names}")
await self.get_attributes(obj_id, attr_names)
elif self._action == "put_attr":
log.debug("DomainCrawler - put attributes")
# write attributes
if self._objs and obj_id not in self._objs:
log.error(f"couldn't find {obj_id} in self._objs")
return
attr_items = self._objs[obj_id]
log.debug(f"got {len(attr_items)} attr_items")

await self.put_attributes(obj_id, attr_items)
else:
msg = f"DomainCrawler: unexpected action: {self._action}"
log.error(msg)

msg = f"DomainCrawler - fetch complete obj_id: {obj_id}, "
msg += f"{len(self._obj_dict)} objects found"
log.debug(msg)
log.debug(f"obj_dict: {self._obj_dict}")