Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Let file sources choose a path for uploaded files #19154

Open
wants to merge 5 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions lib/galaxy/files/sources/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ def write_from(
native_path: str,
user_context: "OptionalUserContext" = None,
opts: Optional[FilesSourceOptions] = None,
):
) -> str:
"""Write file at native path to target_path (relative to uri root).

:param target_path: url of the target file to write to within the filesource. e.g. `gxfiles://myftp1/myfile.txt`
Expand All @@ -231,6 +231,9 @@ def write_from(
:type user_context: _type_, optional
:param opts: A set of options to exercise additional control over the write_from method. Filesource specific, defaults to None
:type opts: Optional[FilesSourceOptions], optional
:return: Actual URL of the written file, as determined by the service backing the FileSource. May differ
from the target path.
:rtype: str
"""

@abc.abstractmethod
Expand Down Expand Up @@ -511,10 +514,10 @@ def write_from(
native_path: str,
user_context: "OptionalUserContext" = None,
opts: Optional[FilesSourceOptions] = None,
):
) -> str:
self._ensure_writeable()
self._check_user_access(user_context)
self._write_from(target_path, native_path, user_context=user_context, opts=opts)
return self._write_from(target_path, native_path, user_context=user_context, opts=opts) or target_path

@abc.abstractmethod
def _write_from(
Expand All @@ -523,7 +526,7 @@ def _write_from(
native_path: str,
user_context: "OptionalUserContext" = None,
opts: Optional[FilesSourceOptions] = None,
):
) -> Optional[str]:
pass

def realize_to(
Expand Down
40 changes: 28 additions & 12 deletions lib/galaxy/managers/model_stores.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,11 +140,13 @@ def prepare_history_content_download(self, request: GenerateHistoryContentDownlo
) as export_store:
if request.content_type == HistoryContentType.dataset:
hda = self._sa_session.get(model.HistoryDatasetAssociation, request.content_id)
export_store.add_dataset(hda)
export_store.add_dataset(hda) # type: ignore[arg-type]
else:
hdca = self._sa_session.get(model.HistoryDatasetCollectionAssociation, request.content_id)
export_store.export_collection(
hdca, include_hidden=request.include_hidden, include_deleted=request.include_deleted
hdca, # type: ignore[arg-type]
include_hidden=request.include_hidden,
include_deleted=request.include_deleted,
)

def prepare_invocation_download(self, request: GenerateInvocationDownload):
Expand All @@ -161,7 +163,9 @@ def prepare_invocation_download(self, request: GenerateInvocationDownload):
)(short_term_storage_target.path) as export_store:
invocation = self._sa_session.get(model.WorkflowInvocation, request.invocation_id)
export_store.export_workflow_invocation(
invocation, include_hidden=request.include_hidden, include_deleted=request.include_deleted
invocation, # type: ignore[arg-type]
include_hidden=request.include_hidden,
include_deleted=request.include_deleted,
)

def write_invocation_to(self, request: WriteInvocationTo):
Expand All @@ -178,7 +182,9 @@ def write_invocation_to(self, request: WriteInvocationTo):
)(target_uri) as export_store:
invocation = self._sa_session.get(model.WorkflowInvocation, request.invocation_id)
export_store.export_workflow_invocation(
invocation, include_hidden=request.include_hidden, include_deleted=request.include_deleted
invocation, # type: ignore[arg-type]
include_hidden=request.include_hidden,
include_deleted=request.include_deleted,
)

def _bco_export_options(self, request: BcoGenerationTaskParametersMixin):
Expand All @@ -202,33 +208,43 @@ def write_history_content_to(self, request: WriteHistoryContentTo):
)(target_uri) as export_store:
if request.content_type == HistoryContentType.dataset:
hda = self._sa_session.get(model.HistoryDatasetAssociation, request.content_id)
export_store.add_dataset(hda)
export_store.add_dataset(hda) # type: ignore[arg-type]
else:
hdca = self._sa_session.get(model.HistoryDatasetCollectionAssociation, request.content_id)
export_store.export_collection(
hdca, include_hidden=request.include_hidden, include_deleted=request.include_deleted
hdca, # type: ignore[arg-type]
include_hidden=request.include_hidden,
include_deleted=request.include_deleted,
)

def write_history_to(self, request: WriteHistoryTo):
model_store_format = request.model_store_format
export_files = "symlink" if request.include_files else None
target_uri = request.target_uri
user_context = self._build_user_context(request.user.user_id)
export_metadata = self.set_history_export_request_metadata(request)

exception_exporting_history: Optional[Exception] = None
try:
with model.store.get_export_store_factory(
export_store = model.store.get_export_store_factory(
self._app, model_store_format, export_files=export_files, user_context=user_context
)(target_uri) as export_store:
)(target_uri)
with export_store:
history = self._history_manager.by_id(request.history_id)
export_store.export_history(
history, include_hidden=request.include_hidden, include_deleted=request.include_deleted
)
self.set_history_export_result_metadata(request.export_association_id, export_metadata, success=True)
request.target_uri = str(export_store.file_source_uri) or request.target_uri
except Exception as e:
exception_exporting_history = e
raise
finally:
export_metadata = self.set_history_export_request_metadata(request)
self.set_history_export_result_metadata(
request.export_association_id, export_metadata, success=False, error=str(e)
request.export_association_id,
export_metadata,
success=not bool(exception_exporting_history),
error=str(exception_exporting_history) if exception_exporting_history else None,
)
raise
Comment on lines -217 to -231
Copy link
Contributor Author

@kysrpex kysrpex Nov 28, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Before applying the set of changes from 53012bf, the method ModelStoreManager.set_history_export_request_metadata() instantiates a ExportObjectMetadata Pydantic model and dumps it to the database in the form of JSON as the field StoreExportAssociation.export_metadata. After the export is complete, the method set_history_export_result_metadata() takes the same instance of ExportObjectMetadata, instantiates a ExportObjectResultMetadata Pydantic model, sets it as the result_data of the ExportObjectMetadata instance, and then saves the ExportObjectMetadata Pydantic model in the form of JSON to the database again.

After applying the set of changes, the call to ModelStoreManager.set_history_export_request_metadata() is delayed until the file has already been saved to the file source, as the actual URI that the file source assigns to the file is otherwise unknown.

The URI assigned by the file source overwrites the original target URI in the request. This involves a slight deviation from the previous behavior: if, for example, power gets cut at the right time, StoreExportAssociation.export_metadata may not exist despite the history having already been saved to the file source, because database writes happen within the finally: block.

Moreover, overwriting the original target URI from the request is formally wrong, because the actual URI assigned by the file source should be part of the export result metadata, as it becomes known when the export completes. However, that implies modifying the other parts of the codebase that reference the URI from the request.

Despite the slight deviation in behavior and the formal incorrectness, rather than jumping straight into fixing these issues, I think it makes sense to leave the chance for discussion open, as doing things this way may still be an interesting tradeoff. Let me know what you think.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking...

After applying the set of changes, the call to ModelStoreManager.set_history_export_request_metadata() is delayed until the file has already been saved to the file source, as the actual URI that the file source assigns to the file is otherwise unknown.

If we merge the PR as it is, then we'd never see an export that is in progress in the list from the UI. If the file is large, that alone would justify attempting to fix

overwriting the original target URI from the request is formally wrong, because the actual URI assigned by the file source should be part of the export result metadata, as it becomes known when the export completes

right? I guess it makes sense to make an attempt.


def set_history_export_request_metadata(
self, request: Union[WriteHistoryTo, GenerateHistoryDownload]
Expand Down
153 changes: 60 additions & 93 deletions lib/galaxy/model/store/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
TYPE_CHECKING,
Union,
)
from urllib.parse import urlparse

from bdbag import bdbag_api as bdb
from boltons.iterutils import remap
Expand Down Expand Up @@ -2615,8 +2616,18 @@ class BcoExportOptions:
override_xref: Optional[List[XrefItem]] = None


class BcoModelExportStore(WorkflowInvocationOnlyExportStore):
def __init__(self, uri, export_options: BcoExportOptions, **kwds):
class FileSourceModelExportStore(abc.ABC, DirectoryModelExportStore):
"""
Export to file sources, from which the data can be retrieved later using a URI.
"""

file_source_uri: Optional[StrPath] = None
# data can be retrieved later using this URI

out_file: StrPath
# the output file is written to this path, from which it is written to the file source

def __init__(self, uri, **kwds):
temp_output_dir = tempfile.mkdtemp()
self.temp_output_dir = temp_output_dir
if "://" in str(uri):
Expand All @@ -2627,18 +2638,46 @@ def __init__(self, uri, export_options: BcoExportOptions, **kwds):
self.out_file = uri
self.file_source_uri = None
export_directory = temp_output_dir
self.export_options = export_options
super().__init__(export_directory, **kwds)

@abc.abstractmethod
def _generate_output_file(self) -> None:
"""
Generate the output file that will be uploaded to the file source.

Produce an output file and save it to `self.out_file`. A common pattern for this method is to create an
archive out of `self.export_directory`.

This method runs after `DirectoryModelExportStore._finalize()`. Therefore, `self.export_directory` will already
have been populated when it runs.
"""

def _finalize(self):
super()._finalize()
core_biocompute_object, object_id = self._core_biocompute_object_and_object_id()
write_to_file(object_id, core_biocompute_object, self.out_file)
super()._finalize() # populate `self.export_directory`
self._generate_output_file() # generate the output file `self.out_file`
if self.file_source_uri:
# upload output file to file source
if not self.file_sources:
raise Exception(f"Need self.file_sources but {type(self)} is missing it: {self.file_sources}.")
file_source_uri = urlparse(str(self.file_source_uri))
file_source_path = self.file_sources.get_file_source_path(self.file_source_uri)
file_source = file_source_path.file_source
assert os.path.exists(self.out_file)
file_source.write_from(file_source_path.path, self.out_file, user_context=self.user_context)
self.file_source_uri = f"{file_source_uri.scheme}://{file_source_uri.netloc}" + file_source.write_from(
file_source_path.path, self.out_file, user_context=self.user_context
)
shutil.rmtree(self.temp_output_dir)


class BcoModelExportStore(FileSourceModelExportStore, WorkflowInvocationOnlyExportStore):

def __init__(self, uri, export_options: BcoExportOptions, **kwds):
self.export_options = export_options
super().__init__(uri, **kwds)

def _generate_output_file(self):
    """Serialize the BioCompute Object for this export to ``self.out_file``."""
    bco, bco_id = self._core_biocompute_object_and_object_id()
    write_to_file(bco_id, bco, self.out_file)

def _core_biocompute_object_and_object_id(self) -> Tuple[BioComputeObjectCore, str]:
assert self.app # need app.security to do anything...
Expand Down Expand Up @@ -2828,74 +2867,28 @@ def _finalize(self) -> None:
ro_crate.write(self.crate_directory)


class ROCrateArchiveModelExportStore(DirectoryModelExportStore, WriteCrates):
file_source_uri: Optional[StrPath]
out_file: StrPath

def __init__(self, uri: StrPath, **kwds) -> None:
temp_output_dir = tempfile.mkdtemp()
self.temp_output_dir = temp_output_dir
if "://" in str(uri):
self.out_file = os.path.join(temp_output_dir, "out")
self.file_source_uri = uri
export_directory = os.path.join(temp_output_dir, "export")
else:
self.out_file = uri
self.file_source_uri = None
export_directory = temp_output_dir
super().__init__(export_directory, **kwds)
class ROCrateArchiveModelExportStore(FileSourceModelExportStore, WriteCrates):

def _finalize(self) -> None:
super()._finalize()
def _generate_output_file(self):
ro_crate = self._init_crate()
ro_crate.write(self.export_directory)
out_file_name = str(self.out_file)
if out_file_name.endswith(".zip"):
out_file = out_file_name[: -len(".zip")]
else:
out_file = out_file_name
rval = shutil.make_archive(out_file, "fastzip", self.export_directory)
if not self.file_source_uri:
shutil.move(rval, self.out_file)
else:
if not self.file_sources:
raise Exception(f"Need self.file_sources but {type(self)} is missing it: {self.file_sources}.")
file_source_path = self.file_sources.get_file_source_path(self.file_source_uri)
file_source = file_source_path.file_source
assert os.path.exists(rval), rval
file_source.write_from(file_source_path.path, rval, user_context=self.user_context)
shutil.rmtree(self.temp_output_dir)
archive = shutil.make_archive(out_file, "fastzip", self.export_directory)
shutil.move(archive, self.out_file)


class TarModelExportStore(DirectoryModelExportStore):
file_source_uri: Optional[StrPath]
out_file: StrPath
class TarModelExportStore(FileSourceModelExportStore):

def __init__(self, uri: StrPath, gzip: bool = True, **kwds) -> None:
self.gzip = gzip
temp_output_dir = tempfile.mkdtemp()
self.temp_output_dir = temp_output_dir
if "://" in str(uri):
self.out_file = os.path.join(temp_output_dir, "out")
self.file_source_uri = uri
export_directory = os.path.join(temp_output_dir, "export")
else:
self.out_file = uri
self.file_source_uri = None
export_directory = temp_output_dir
super().__init__(export_directory, **kwds)
super().__init__(uri, **kwds)

def _finalize(self) -> None:
super()._finalize()
def _generate_output_file(self):
    """Tar ``self.export_directory`` into ``self.out_file``, honoring the ``gzip`` flag given at construction."""
    tar_export_directory(self.export_directory, self.out_file, self.gzip)
if self.file_source_uri:
if not self.file_sources:
raise Exception(f"Need self.file_sources but {type(self)} is missing it: {self.file_sources}.")
file_source_path = self.file_sources.get_file_source_path(self.file_source_uri)
file_source = file_source_path.file_source
assert os.path.exists(self.out_file)
file_source.write_from(file_source_path.path, self.out_file, user_context=self.user_context)
shutil.rmtree(self.temp_output_dir)


class BagDirectoryModelExportStore(DirectoryModelExportStore):
Expand All @@ -2908,37 +2901,16 @@ def _finalize(self) -> None:
bdb.make_bag(self.out_directory)


class BagArchiveModelExportStore(BagDirectoryModelExportStore):
file_source_uri: Optional[StrPath]
class BagArchiveModelExportStore(FileSourceModelExportStore, BagDirectoryModelExportStore):

def __init__(self, uri: StrPath, bag_archiver: str = "tgz", **kwds) -> None:
# bag_archiver in tgz, zip, tar
self.bag_archiver = bag_archiver
temp_output_dir = tempfile.mkdtemp()
self.temp_output_dir = temp_output_dir
if "://" in str(uri):
# self.out_file = os.path.join(temp_output_dir, "out")
self.file_source_uri = uri
export_directory = os.path.join(temp_output_dir, "export")
else:
self.out_file = uri
self.file_source_uri = None
export_directory = temp_output_dir
super().__init__(export_directory, **kwds)
super().__init__(uri, **kwds)

def _finalize(self) -> None:
super()._finalize()
rval = bdb.archive_bag(self.export_directory, self.bag_archiver)
if not self.file_source_uri:
shutil.move(rval, self.out_file)
else:
if not self.file_sources:
raise Exception(f"Need self.file_sources but {type(self)} is missing it: {self.file_sources}.")
file_source_path = self.file_sources.get_file_source_path(self.file_source_uri)
file_source = file_source_path.file_source
assert os.path.exists(rval)
file_source.write_from(file_source_path.path, rval, user_context=self.user_context)
shutil.rmtree(self.temp_output_dir)
def _generate_output_file(self):
    """Archive the bag in ``self.export_directory`` and move the resulting archive to ``self.out_file``."""
    bag_archive_path = bdb.archive_bag(self.export_directory, self.bag_archiver)
    shutil.move(bag_archive_path, self.out_file)


def get_export_store_factory(
Expand All @@ -2947,13 +2919,8 @@ def get_export_store_factory(
export_files=None,
bco_export_options: Optional[BcoExportOptions] = None,
user_context=None,
) -> Callable[[StrPath], ModelExportStore]:
export_store_class: Union[
Type[TarModelExportStore],
Type[BagArchiveModelExportStore],
Type[ROCrateArchiveModelExportStore],
Type[BcoModelExportStore],
]
) -> Callable[[StrPath], FileSourceModelExportStore]:
export_store_class: Type[FileSourceModelExportStore]
export_store_class_kwds = {
"app": app,
"export_files": export_files,
Expand Down
8 changes: 5 additions & 3 deletions lib/galaxy/tools/imp_exp/export_history.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,16 +56,18 @@ def main(argv=None):
# Create archive.
exit = create_archive(temp_directory, out_file, gzip=gzip)
if destination_uri is not None and exit == 0:
_write_to_destination(options.file_sources, os.path.abspath(out_file), destination_uri)
actual_uri = _write_to_destination(options.file_sources, os.path.abspath(out_file), destination_uri)
if destination_uri != actual_uri:
print(f"Saved history archive to {actual_uri}.")
return exit


def _write_to_destination(file_sources_path: str, out_file: str, destination_uri: str):
def _write_to_destination(file_sources_path: str, out_file: str, destination_uri: str) -> str:
file_sources = get_file_sources(file_sources_path)
file_source_path = file_sources.get_file_source_path(destination_uri)
file_source = file_source_path.file_source
assert os.path.exists(out_file)
file_source.write_from(file_source_path.path, out_file)
return file_source.write_from(file_source_path.path, out_file)


def get_file_sources(file_sources_path: str) -> ConfiguredFileSources:
Expand Down
Loading
Loading