Skip to content

Commit

Permalink
Support listing file sources asynchronously
Browse files Browse the repository at this point in the history
The method `list()` from `galaxy.files.sources.BaseFilesSource` lists the directories and files within a file source. An optional keyword argument `recursive` (`False` by default) lets it recursively retrieve directories and files within a specific directory.

This operation is very cheap in terms of CPU and expensive in IO terms, be it network or filesystem IO. Depending on how the underlying system is built, it may support retrieving directories and files recursively or not. If it does not, then every time a directory is listed, it is necessary to make another request to list each subdirectory. This may end up involving hundreds of requests. Done sequentially, this can be extremely slow, especially if each one involves network access.

This commit makes the `list()` method asynchronous, which enables Galaxy to wait for the underlying system to complete the requests concurrently, resulting in a massive speedup. The price to pay is the extra complexity of using the async primitives.

Since this change implies that all functions in the chain up to the API endpoints and the test functions must also be made asynchronous, this commit also takes care of it.
  • Loading branch information
kysrpex committed Dec 5, 2024
1 parent f12646d commit 668437a
Show file tree
Hide file tree
Showing 23 changed files with 207 additions and 144 deletions.
8 changes: 4 additions & 4 deletions lib/galaxy/files/sources/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ def get_uri_root(self) -> str:
"""Return a prefix for the root (e.g. gxfiles://prefix/)."""

@abc.abstractmethod
def list(
async def list(
self,
path="/",
recursive=False,
Expand Down Expand Up @@ -443,7 +443,7 @@ def _serialization_props(self, user_context: "OptionalUserContext" = None) -> Fi
Used in to_dict method if for_serialization is True.
"""

def list(
async def list(
self,
path="/",
recursive=False,
Expand All @@ -467,9 +467,9 @@ def list(
if offset is not None and offset < 0:
raise RequestParameterInvalidException("Offset must be greater than or equal to 0.")

return self._list(path, recursive, user_context, opts, limit, offset, query)
return await self._list(path, recursive, user_context, opts, limit, offset, query)

def _list(
async def _list(
self,
path="/",
recursive=False,
Expand Down
2 changes: 1 addition & 1 deletion lib/galaxy/files/sources/_pyfilesystem2.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ def __init__(self, **kwd: Unpack[FilesSourceProperties]):
def _open_fs(self, user_context: OptionalUserContext = None, opts: Optional[FilesSourceOptions] = None) -> FS:
"""Subclasses must instantiate a PyFilesystem2 handle for this file system."""

def _list(
async def _list(
self,
path="/",
recursive=False,
Expand Down
2 changes: 1 addition & 1 deletion lib/galaxy/files/sources/invenio.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ def to_relative_path(self, url: str) -> str:
def get_repository_interactor(self, repository_url: str) -> RDMRepositoryInteractor:
return InvenioRepositoryInteractor(repository_url, self)

def _list(
async def _list(
self,
path="/",
recursive=True,
Expand Down
2 changes: 1 addition & 1 deletion lib/galaxy/files/sources/posix.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ def __init__(self, **kwd: Unpack[PosixFilesSourceProperties]):
def prefer_links(self) -> bool:
return self._prefer_links

def _list(
async def _list(
self,
path="/",
recursive=True,
Expand Down
2 changes: 1 addition & 1 deletion lib/galaxy/files/sources/s3fs.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ def __init__(self, **kwd: Unpack[S3FsFilesSourceProperties]):
if self._endpoint_url:
self._props.update({"client_kwargs": {"endpoint_url": self._endpoint_url}})

def _list(
async def _list(
self,
path="/",
recursive=True,
Expand Down
34 changes: 17 additions & 17 deletions lib/galaxy/managers/file_source_instances.py
Original file line number Diff line number Diff line change
Expand Up @@ -334,45 +334,45 @@ def create_instance(self, trans: ProvidesUserContext, payload: CreateInstancePay
self._save(persisted_file_source)
return self._to_model(trans, persisted_file_source)

def test_modify_instance(
async def test_modify_instance(
self, trans: ProvidesUserContext, id: UUID4, payload: TestModifyInstancePayload
) -> PluginStatus:
persisted_file_source = self._get(trans, id)
if isinstance(payload, TestUpgradeInstancePayload):
return self._plugin_status_for_upgrade(trans, payload, persisted_file_source)
return await self._plugin_status_for_upgrade(trans, payload, persisted_file_source)
else:
assert isinstance(payload, TestUpdateInstancePayload)
return self._plugin_status_for_update(trans, payload, persisted_file_source)
return await self._plugin_status_for_update(trans, payload, persisted_file_source)

def _plugin_status_for_update(
async def _plugin_status_for_update(
self, trans: ProvidesUserContext, payload: TestUpdateInstancePayload, persisted_file_source: UserFileSource
) -> PluginStatus:
template = self._get_template(persisted_file_source)
target = UpdateTestTarget(persisted_file_source, payload)
return self._plugin_status_for_template(trans, target, template)
return await self._plugin_status_for_template(trans, target, template)

def _plugin_status_for_upgrade(
async def _plugin_status_for_upgrade(
self, trans: ProvidesUserContext, payload: TestUpgradeInstancePayload, persisted_file_source: UserFileSource
) -> PluginStatus:
template = self._get_and_validate_target_upgrade_template(persisted_file_source, payload)
target = UpgradeTestTarget(persisted_file_source, payload)
return self._plugin_status_for_template(trans, target, template)
return await self._plugin_status_for_template(trans, target, template)

def plugin_status_for_instance(self, trans: ProvidesUserContext, id: UUID4):
async def plugin_status_for_instance(self, trans: ProvidesUserContext, id: UUID4):
persisted_file_source = self._get(trans, id)
return self._plugin_status(trans, persisted_file_source, to_template_reference(persisted_file_source))
return await self._plugin_status(trans, persisted_file_source, to_template_reference(persisted_file_source))

def plugin_status(self, trans: ProvidesUserContext, payload: CreateInstancePayload) -> PluginStatus:
async def plugin_status(self, trans: ProvidesUserContext, payload: CreateInstancePayload) -> PluginStatus:
target = CreateTestTarget(payload, UserFileSource)
return self._plugin_status(trans, target, payload)
return await self._plugin_status(trans, target, payload)

def _plugin_status(
async def _plugin_status(
self, trans: ProvidesUserContext, target: CanTestPluginStatus, template_reference: TemplateReference
):
template = self._catalog.find_template(template_reference)
return self._plugin_status_for_template(trans, target, template)
return await self._plugin_status_for_template(trans, target, template)

def _plugin_status_for_template(
async def _plugin_status_for_template(
self, trans: ProvidesUserContext, payload: CanTestPluginStatus, template: FileSourceTemplate
):
template_definition_status = status_template_definition(template)
Expand All @@ -396,7 +396,7 @@ def _plugin_status_for_template(
if template_settings_status.is_not_ok:
return PluginStatus(**status_kwds)
assert configuration
file_source, connection_status = self._connection_status(trans, payload, configuration)
file_source, connection_status = await self._connection_status(trans, payload, configuration)
status_kwds["connection"] = connection_status
if connection_status.is_not_ok:
return PluginStatus(**status_kwds)
Expand Down Expand Up @@ -443,7 +443,7 @@ def _template_settings_status(
exception = e
return configuration, settings_exception_to_status(exception)

def _connection_status(
async def _connection_status(
self, trans: ProvidesUserContext, target: CanTestPluginStatus, configuration: FileSourceConfiguration
) -> Tuple[Optional[BaseFilesSource], PluginAspectStatus]:
file_source = None
Expand Down Expand Up @@ -471,7 +471,7 @@ def _connection_status(
# a connection problem if we cannot
browsable_file_source = cast(SupportsBrowsing, file_source)
user_context = ProvidesFileSourcesUserContext(trans)
browsable_file_source.list("/", recursive=False, user_context=user_context)
await browsable_file_source.list("/", recursive=False, user_context=user_context)
except Exception as e:
exception = e
return file_source, connection_exception_to_status("file source", exception)
Expand Down
4 changes: 2 additions & 2 deletions lib/galaxy/managers/remote_files.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class RemoteFilesManager:
def __init__(self, app: MinimalManagerApp):
self._app = app

def index(
async def index(
self,
user_ctx: ProvidesUserContext,
target: str,
Expand Down Expand Up @@ -93,7 +93,7 @@ def index(
opts = FilesSourceOptions()
opts.writeable = writeable or False
try:
index, count = file_source.list(
index, count = await file_source.list(
file_source_path.path,
recursive=recursive,
user_context=user_file_source_context,
Expand Down
23 changes: 23 additions & 0 deletions lib/galaxy/util/unittest.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import inspect

import pytest


Expand Down Expand Up @@ -43,3 +45,24 @@ def assertRaises(self, exception):

def assertRaisesRegex(self, exception, regex):
return pytest.raises(exception, match=regex)


class MarkAsyncMeta(type):
"""
Metaclass that marks all asynchronous methods of a class as async tests.
Methods that are not recognized by pytest as tests will simply be ignored, despite having been marked as async
tests.
"""

def __new__(cls, name, bases, dict_):
for attribute_name, attribute_value in dict_.items():
if inspect.iscoroutinefunction(attribute_value):
dict_[attribute_name] = pytest.mark.asyncio(attribute_value)
return super().__new__(cls, name, bases, dict_)


class IsolatedAsyncioTestCase(TestCase, metaclass=MarkAsyncMeta):
"""
Partial re-implementation of standard library `unittest.IsolatedAsyncioTestCase` using pytest methods.
"""
12 changes: 6 additions & 6 deletions lib/galaxy/webapps/galaxy/api/file_sources.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,12 +96,12 @@ def create(
summary="Test payload for creating user-bound file source.",
operation_id="file_sources__test_new_instance_configuration",
)
def test_instance_configuration(
async def test_instance_configuration(
self,
trans: ProvidesUserContext = DependsOnTrans,
payload: CreateInstancePayload = Body(...),
) -> PluginStatus:
return self.file_source_instances_manager.plugin_status(trans, payload)
return await self.file_source_instances_manager.plugin_status(trans, payload)

@router.get(
"/api/file_source_instances",
Expand Down Expand Up @@ -131,12 +131,12 @@ def instances_show(
summary="Test a file source instance and return status.",
operation_id="file_sources__instances_test_instance",
)
def instance_test(
async def instance_test(
self,
trans: ProvidesUserContext = DependsOnTrans,
uuid: UUID4 = UserFileSourceIdPathParam,
) -> PluginStatus:
return self.file_source_instances_manager.plugin_status_for_instance(trans, uuid)
return await self.file_source_instances_manager.plugin_status_for_instance(trans, uuid)

@router.put(
"/api/file_source_instances/{uuid}",
Expand All @@ -156,13 +156,13 @@ def update_instance(
summary="Test updating or upgrading user file source instance.",
operation_id="file_sources__test_instances_update",
)
def test_update_instance(
async def test_update_instance(
self,
trans: ProvidesUserContext = DependsOnTrans,
uuid: UUID4 = UserFileSourceIdPathParam,
payload: TestModifyInstancePayload = Body(...),
) -> PluginStatus:
return self.file_source_instances_manager.test_modify_instance(trans, uuid, payload)
return await self.file_source_instances_manager.test_modify_instance(trans, uuid, payload)

@router.delete(
"/api/file_source_instances/{uuid}",
Expand Down
4 changes: 2 additions & 2 deletions lib/galaxy/webapps/galaxy/api/remote_files.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ class FastAPIRemoteFiles:
deprecated=True,
summary="Displays remote files available to the user. Please use /api/remote_files instead.",
)
def index(
async def index(
self,
response: Response,
user_ctx: ProvidesUserContext = DependsOnTrans,
Expand All @@ -146,7 +146,7 @@ def index(
The total count of files and directories is returned in the 'total_matches' header.
"""
result, count = self.manager.index(
result, count = await self.manager.index(
user_ctx, target, format, recursive, disable, writeable, limit, offset, query, sort_by
)
response.headers["total_matches"] = str(count)
Expand Down
1 change: 1 addition & 0 deletions packages/app/test-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,5 @@ mock-ssh-server
pkce
pykwalify
pytest
pytest-asyncio
testfixtures
1 change: 1 addition & 0 deletions packages/files/test-requirements.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pytest
pytest-asyncio
fs-gcsfs
s3fs>=2023.1.0,<2024
15 changes: 14 additions & 1 deletion test/unit/app/managers/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,14 @@

from galaxy.app_unittest_utils import galaxy_mock
from galaxy.managers.users import UserManager
from galaxy.util.unittest import TestCase
from galaxy.util.unittest import (
IsolatedAsyncioTestCase,
TestCase,
)
from galaxy.work.context import SessionRequestContext

__all__ = ("BaseIsolatedAsyncioTestCase", "BaseTestCase", "CreatesCollectionsMixin")

# =============================================================================
admin_email = "[email protected]"
admin_users = admin_email
Expand Down Expand Up @@ -104,6 +109,14 @@ def assertIsJsonifyable(self, item):
assert isinstance(json.dumps(item), str)


class BaseIsolatedAsyncioTestCase(BaseTestCase, IsolatedAsyncioTestCase):
"""
Asynchronous version of `BaseTestCase`.
Can run sync tests too.
"""


class CreatesCollectionsMixin:
trans: SessionRequestContext

Expand Down
Loading

0 comments on commit 668437a

Please sign in to comment.