Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#65] Performance improvements #66

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
126 changes: 84 additions & 42 deletions hsclient/hydroshare.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import getpass
import threading
import os
import pathlib
import pickle
Expand Down Expand Up @@ -62,12 +63,13 @@ class File(str):
:param checksum: the md5 checksum of the file
"""

def __new__(cls, value, file_url, checksum):
def __new__(cls, value, file_url, checksum, aggregation):
return super(File, cls).__new__(cls, value)

def __init__(self, value, file_url, checksum):
def __init__(self, value, file_url, checksum, aggregation):
self._file_url = file_url
self._checksum = checksum
self._aggregation = aggregation

@property
def path(self) -> str:
Expand All @@ -92,6 +94,10 @@ def folder(self) -> str:
@property
def checksum(self):
"""The md5 checksum of the file"""
if self._checksum is None:
_ = self._aggregation._checksums
path = urljoin('data/contents', quote(self.path))
self._checksum = self._aggregation._checksums.get(path, "")
return self._checksum

@property
Expand Down Expand Up @@ -173,21 +179,26 @@ def _files(self):
"data/contents/",
)[1]
)
f = File(file_path, unquote(file.path), self._checksums[file_checksum_path])
if self._parsed_checksums is not None:
checksum = self._checksums[file_checksum_path]
else:
checksum = None
f = File(file_path, unquote(file.path), checksum, self)

self._parsed_files.append(f)
return self._parsed_files

@property
def _aggregations(self):

def populate_metadata(_aggr):
_aggr._metadata
_ = _aggr._metadata

if not self._parsed_aggregations:
if self._parsed_aggregations is None:
self._parsed_aggregations = []
for file in self._map.describes.files:
if is_aggregation(str(file)):
self._parsed_aggregations.append(Aggregation(unquote(file.path), self._hs_session, self._checksums))
self._parsed_aggregations.append(Aggregation(unquote(file.path), self._hs_session, None))

# load metadata for all aggregations (metadata is needed to create any typed aggregation)
with ThreadPoolExecutor() as executor:
Expand Down Expand Up @@ -258,8 +269,22 @@ def _download(self, save_path: str = "", unzip_to: str = None) -> str:
return unzip_to
return downloaded_zip

def _reset(self):
self._retrieved_map = None
self._retrieved_metadata = None
self._parsed_files = None
self._parsed_aggregations = None
self._parsed_checksums = None
self._main_file_path = None

def _refetch(self):
# not refreshing the checksums here - they will be refreshed when needed
_ = self._map
_ = self._metadata
_ = self._files

@property
def metadata_file(self):
def metadata_file(self) -> str:
"""The path to the metadata file"""
return self.metadata_path.split("/data/contents/", 1)[1]

Expand Down Expand Up @@ -290,7 +315,6 @@ def main_file_path(self) -> str:
self._main_file_path = self.files()[0].path
return self._main_file_path

@refresh
def save(self) -> None:
"""
Saves the metadata back to HydroShare
Expand All @@ -316,7 +340,7 @@ def files(self, search_aggregations: bool = False, **kwargs) -> List[File]:
files = files + list(aggregation.files(search_aggregations=search_aggregations, **kwargs))
return files

def file(self, search_aggregations=False, **kwargs) -> File:
def file(self, search_aggregations=False, **kwargs) -> Union[File, None]:
"""
Returns a single file in the resource that matches the filtering parameters
:param search_aggregations: Defaults False, set to true to search aggregations
Expand Down Expand Up @@ -349,7 +373,7 @@ def aggregations(self, **kwargs) -> List[BaseMetadata]:
aggregations = filter(lambda agg: attribute_filter(agg.metadata, key, value), aggregations)
return list(aggregations)

def aggregation(self, **kwargs) -> BaseMetadata:
def aggregation(self, **kwargs) -> Union[BaseMetadata, None]:
"""
Returns a single Aggregation in the resource that matches the filtering parameters. Uses the same filtering
rules described in the aggregations method.
Expand All @@ -363,17 +387,12 @@ def aggregation(self, **kwargs) -> BaseMetadata:

def refresh(self) -> None:
"""
Forces the retrieval of the resource map and metadata files. Currently this is implemented to be lazy and will
only retrieve those files again after another call to access them is made. This will be later updated to be
eager and retrieve the files asynchronously.
Forces the retrieval of the resource map and metadata files. Files are retrieved asynchronously.
"""
# TODO, refresh should destroy the aggregation objects and async fetch everything.
self._retrieved_map = None
self._retrieved_metadata = None
self._parsed_files = None
self._parsed_aggregations = None
self._parsed_checksums = None
self._main_file_path = None

self._reset()
t = threading.Thread(target=self._refetch, daemon=True)
t.start()

def delete(self) -> None:
"""Deletes this aggregation from HydroShare"""
Expand All @@ -385,7 +404,7 @@ def delete(self) -> None:
self.main_file_path,
)
self._hs_session.delete(path, status_code=200)
self.refresh()
self._reset()


class DataObjectSupportingAggregation(Aggregation):
Expand Down Expand Up @@ -890,7 +909,7 @@ def resource_id(self) -> str:
return self._map.identifier

@property
def metadata_file(self):
def metadata_file(self) -> str:
"""The path to the metadata file"""
return self.metadata_path.split("/data/", 1)[1]

Expand Down Expand Up @@ -925,7 +944,7 @@ def access_permission(self):

# resource operations

def new_version(self):
def new_version(self) -> 'Resource':
"""
Creates a new version of the resource on HydroShare
:return: A Resource object of the newly created resource version
Expand All @@ -935,7 +954,7 @@ def new_version(self):
resource_id = response.text
return Resource("/resource/{}/data/resourcemap.xml".format(resource_id), self._hs_session)

def copy(self):
def copy(self) -> 'Resource':
"""
Copies this Resource into a new resource on HydroShare
returns: A Resource object of the newly copied resource
Expand All @@ -953,16 +972,16 @@ def download(self, save_path: str = "") -> str:
"""
return self._hs_session.retrieve_bag(self._hsapi_path, save_path=save_path)

@refresh
def delete(self) -> None:
"""
Deletes the resource on HydroShare
:return: None
"""
hsapi_path = self._hsapi_path
self._hs_session.delete(hsapi_path, status_code=204)
# refresh the resource to clear the cache
self._reset()

@refresh
def save(self) -> None:
"""
Saves the metadata to HydroShare
Expand All @@ -972,6 +991,10 @@ def save(self) -> None:
path = urljoin(self._hsapi_path, "ingest_metadata")
self._hs_session.upload_file(path, files={'file': ('resourcemetadata.xml', metadata_string)})

# if new creators were added, refresh the resource to get the creator order
creators_with_no_order = [cr for cr in self.metadata.creators if cr.creator_order is None]
if creators_with_no_order:
self.refresh()
# referenced content operations

@refresh
Expand Down Expand Up @@ -1008,7 +1031,6 @@ def reference_update(self, file_name: str, url: str, path: str = '') -> None:

# file operations

@refresh
def folder_create(self, folder: str) -> None:
"""
Creates a folder on HydroShare
Expand Down Expand Up @@ -1037,7 +1059,7 @@ def folder_delete(self, path: str = None) -> None:
"""
self._delete_file_folder(path)

def folder_download(self, path: str, save_path: str = ""):
def folder_download(self, path: str, save_path: str = "") -> str:
"""
Downloads a folder from HydroShare
:param path: The path to folder
Expand All @@ -1048,7 +1070,7 @@ def folder_download(self, path: str, save_path: str = ""):
urljoin(self._resource_path, "data", "contents", path), save_path, params={"zipped": "true"}
)

def file_download(self, path: str, save_path: str = "", zipped: bool = False):
def file_download(self, path: str, save_path: str = "", zipped: bool = False) -> str:
"""
Downloads a file from HydroShare
:param path: The path to the file
Expand All @@ -1072,17 +1094,34 @@ def file_delete(self, path: str = None) -> None:
"""
self._delete_file(path)

@refresh
def file_rename(self, path: str, new_path: str) -> None:
def file_rename(self, path: str, new_path: str, refresh=False) -> None:
"""
Rename a file on HydroShare
:param path: The path to the file
:param new_path: the renamed path to the file
:param refresh: Defaults to False, set to True to automatically refresh the resource from HydroShare
:return: None
"""
rename_path = urljoin(self._hsapi_path, "functions", "move-or-rename")
self._hs_session.post(rename_path, status_code=200, data={"source_path": path, "target_path": new_path})

if refresh:
self.refresh()
return
if self._parsed_files is None:
self.refresh()
return
if path in self._parsed_files:
# path is a file path - just refresh checksums from hydroshare and update the cached parsed_files
self._parsed_checksums = None
# update the checksums
_ = self._checksums

# update the parsed_files
checksum_path = urljoin("data", "contents", new_path)
new_file = File(new_path, unquote(new_path), self._checksums[checksum_path], self)
self._parsed_files = [new_file if file == path else file for file in self._parsed_files]

@refresh
def file_zip(self, path: str, zip_name: str = None, remove_file: bool = True) -> None:
"""
Expand Down Expand Up @@ -1113,7 +1152,7 @@ def file_unzip(self, path: str, overwrite: bool = True, ingest_metadata=True) ->
unzip_path, status_code=200, data={"overwrite": overwrite, "ingest_metadata": ingest_metadata}
)

def file_aggregate(self, path: str, agg_type: AggregationType, refresh: bool = True):
def file_aggregate(self, path: str, agg_type: AggregationType, refresh: bool = True) -> Union[Aggregation, None]:
"""
Aggregate a file to a HydroShare aggregation type. Aggregating files allows you to specify metadata specific
to the files associated with the aggregation. To set a FileSet aggregation, include the path to the folder or
Expand Down Expand Up @@ -1179,8 +1218,9 @@ def aggregation_remove(self, aggregation: Aggregation) -> None:
aggregation.metadata.type.value + "LogicalFile",
aggregation.main_file_path,
)

aggregation._hs_session.post(path, status_code=200)
aggregation.refresh()
self._parsed_aggregations = [agg for agg in self._parsed_aggregations if agg != aggregation]

@refresh
def aggregation_move(self, aggregation: Aggregation, dst_path: str = "") -> None:
Expand All @@ -1205,24 +1245,25 @@ def aggregation_move(self, aggregation: Aggregation, dst_path: str = "") -> None
if status in ("Not ready", "progress"):
while aggregation._hs_session.check_task(task_id) != 'true':
time.sleep(1)
aggregation.refresh()

@refresh
def aggregation_delete(self, aggregation: Aggregation) -> None:
"""
Deletes an aggregation from HydroShare. This deletes the files and metadata in the aggregation.
:param aggregation: The aggregation object to delete
:return: None
"""
# remove the aggregation from the cache
self._parsed_aggregations = [agg for agg in self._parsed_aggregations if agg != aggregation]
aggregation.delete()

def aggregation_download(self, aggregation: Aggregation, save_path: str = "", unzip_to: str = None) -> str:
@staticmethod
def aggregation_download(aggregation: Aggregation, save_path: str = "", unzip_to: str = None) -> str:
"""
Download an aggregation from HydroShare
:param aggregation: The aggregation to download
:param save_path: The local path to save the aggregation to, defaults to the current directory
:param unzip_to: If set, the resulting download will be unzipped to the specified path
:return: None
:return: The path to the downloaded file
"""
return aggregation._download(save_path=save_path, unzip_to=unzip_to)

Expand Down Expand Up @@ -1464,15 +1505,16 @@ def search(
):
"""
Query the GET /hsapi/resource/ REST end point of the HydroShare server.
:param creator: Filter results by the HydroShare username or email
:param author: Filter results by the HydroShare username or email
:param creator: Filter results by the HydroShare username or email of creator
:param contributor: Filter results by the HydroShare username or email of contributor
:param owner: Filter results by the HydroShare username or email
:param group_name: Filter results by the HydroShare group name associated with resources
:param from_date: Filter results to those created after from_date. Must be datetime.date.
:param to_date: Filter results to those created before to_date. Must be datetime.date. Because dates have
no time information, you must specify date+1 day to get results for date (e.g. use 2015-05-06 to get
resources created up to and including 2015-05-05)
:param types: Filter results to particular HydroShare resource types (Deprecated, all types are Composite)
:param resource_types: Filter results to particular HydroShare resource types
(Deprecated, all types are Composite)
:param subject: Filter by comma separated list of subjects
:param full_text_search: Filter by full text search
:param edit_permission: Filter by boolean edit permission
Expand Down Expand Up @@ -1548,7 +1590,7 @@ def resource(self, resource_id: str, validate: bool = True, use_cache: bool = Tr

res = Resource("/resource/{}/data/resourcemap.xml".format(resource_id), self._hs_session)
if validate:
res.metadata
_ = res.metadata

if use_cache:
self._resource_object_cache[resource_id] = res
Expand All @@ -1575,7 +1617,7 @@ def user(self, user_id: int) -> User:
response = self._hs_session.get(f'/hsapi/userDetails/{user_id}/', status_code=200)
return User(**response.json())

def my_user_info(self):
def my_user_info(self) -> dict:
"""
Retrieves the user info of the user's credentials provided
:return: JSON object representing the user info
Expand Down
Loading
Loading