RECORDS_RESOURCES_ALLOW_EMPTY_FILES = True
"""Allow empty files to be uploaded."""

# Each entry is a dotted import path (a string), not the class object itself.
# One entry per built-in transfer provider: local, fetch, remote, multipart.
RECORDS_RESOURCES_TRANSFERS = [
    "invenio_records_resources.services.files.transfer.LocalTransfer",
    "invenio_records_resources.services.files.transfer.FetchTransfer",
    "invenio_records_resources.services.files.transfer.RemoteTransfer",
    "invenio_records_resources.services.files.transfer.MultipartTransfer",
]
"""List of transfer classes to register."""


# NOTE: the value is a one-letter transfer *type code*, not a class or an
# import path; the accepted codes are listed in the string below.
RECORDS_RESOURCES_DEFAULT_TRANSFER_TYPE = "L"
"""Default transfer class to use.
One of 'L' (local), 'F' (fetch), 'R' (point to remote), 'M' (multipart)."""
def init_app(self, app):
    """Flask application initialization.

    Keeps a reference to ``app`` (read later by ``transfer_registry``),
    loads the default configuration, creates the service and notification
    registries and registers this extension on the application.
    """
    self.app = app
    self.init_config(app)
    self.registry = ServiceRegistry()
    self.notification_registry = NotificationRegistry()
    app.extensions["invenio-records-resources"] = self


@cached_property
def transfer_registry(self):
    """Return the transfer registry.

    Built on first access and then cached on the extension instance. The
    registry is created with ``RECORDS_RESOURCES_DEFAULT_TRANSFER_TYPE`` and
    every entry of ``RECORDS_RESOURCES_TRANSFERS`` is resolved with
    ``obj_or_import_string`` and registered.
    """
    # imported here to prevent circular imports
    from .services.files.transfer.registry import TransferRegistry

    app_config = self.app.config
    transfers = TransferRegistry(
        app_config["RECORDS_RESOURCES_DEFAULT_TRANSFER_TYPE"]
    )
    for import_path in app_config["RECORDS_RESOURCES_TRANSFERS"]:
        transfer_cls = obj_or_import_string(import_path)
        transfers.register(transfer_cls)
    return transfers
# Resolved lazily on each access: looks up the "invenio-records-resources"
# extension on the current Flask application and returns its
# ``transfer_registry`` attribute.
current_transfer_registry = LocalProxy(
    lambda: current_app.extensions["invenio-records-resources"].transfer_registry
)
"""Helper proxy to get the current transfer registry."""
@@ -224,6 +227,8 @@ def remove_all(cls, record_id): record_id = ModelField() _record = ModelField("record", dump=False) + transfer = TransferField() + def __repr__( self, ): diff --git a/invenio_records_resources/records/resolver.py b/invenio_records_resources/records/resolver.py index 975aed05..dd13e13d 100644 --- a/invenio_records_resources/records/resolver.py +++ b/invenio_records_resources/records/resolver.py @@ -11,7 +11,6 @@ import uuid from invenio_db import db -from invenio_pidstore.errors import PIDDeletedError from invenio_pidstore.models import PersistentIdentifier, PIDStatus diff --git a/invenio_records_resources/records/systemfields/entity_reference.py b/invenio_records_resources/records/systemfields/entity_reference.py index 26ebdfec..3d8dc3e5 100644 --- a/invenio_records_resources/records/systemfields/entity_reference.py +++ b/invenio_records_resources/records/systemfields/entity_reference.py @@ -8,7 +8,6 @@ """Systemfield for managing referenced entities in request.""" -from functools import partial from invenio_records.systemfields import SystemField diff --git a/invenio_records_resources/records/systemfields/files/manager.py b/invenio_records_resources/records/systemfields/files/manager.py index cfee538e..52cf9846 100644 --- a/invenio_records_resources/records/systemfields/files/manager.py +++ b/invenio_records_resources/records/systemfields/files/manager.py @@ -2,6 +2,7 @@ # # Copyright (C) 2020-2024 CERN. # Copyright (C) 2020-2021 Northwestern University. +# Copyright (C) 2025 CESNET. # # Invenio-Records-Resources is free software; you can redistribute it and/or # modify it under the terms of the MIT License; see LICENSE file for more @@ -153,7 +154,16 @@ def unlock(self): # TODO: "create" and "update" should be merged somehow... 
@ensure_enabled - def create(self, key, obj=None, stream=None, data=None, **kwargs): + def create( + self, + key, + *, + obj=None, + stream=None, + data=None, + transfer=None, + **kwargs, + ): """Create/initialize a file.""" assert not (obj and stream) @@ -172,6 +182,8 @@ def create(self, key, obj=None, stream=None, data=None, **kwargs): rf.object_version = obj if data: rf.update(data) + if transfer: + rf.transfer = transfer rf.commit() self._entries[key] = rf return rf @@ -499,7 +511,7 @@ def _parse_set_value(self, value): stream = obj_or_stream else: raise InvalidOperationError( - description=f"Item has to be ObjectVersion or " "file-like object" + description="Item has to be ObjectVersion or " "file-like object" ) return obj, stream, data diff --git a/invenio_records_resources/records/transfer.py b/invenio_records_resources/records/transfer.py new file mode 100644 index 00000000..c59efa3f --- /dev/null +++ b/invenio_records_resources/records/transfer.py @@ -0,0 +1,98 @@ +# -*- coding: utf-8 -*- +# +# Copyright (C) 2020-2023 CERN. +# Copyright (C) 2020-2021 Northwestern University. +# Copyright (C) 2025 CESNET. +# +# Invenio-Records-Resources is free software; you can redistribute it and/or +# modify it under the terms of the MIT License; see LICENSE file for more +# details. +"""Transfer-related system fields.""" + +# +# Implementation Note: +# +# This module cannot be placed under `systemfields/files` because `systemfields/files` +# imports several classes from outside the `records` module (e.g., `FilesAttrConfig` +# and `PartialFileDumper`). In turn, those classes import `records.api`, creating a +# circular import. +# +# Furthermore, we need `TransferField` defined directly on `FileRecord`. We cannot +# delegate this to the user (as is done with `FilesField`) because if a target +# repository has not declared the `transfer` field on its own `FileRecord`, file +# uploads would fail. Therefore, `TransferField` must be defined here. 
class TransferFieldData(Mapping):
    """Mapping view over the transfer metadata dict of a file record.

    The instance does not copy the dict it is given: reads and writes go
    straight to the wrapped ``field`` dict. The transfer type lives under the
    ``"type"`` key of that dict.
    """

    def __init__(self, field):
        """Wrap ``field``, the dict that stores the transfer metadata."""
        self._field = field

    @property
    def transfer_type(self):
        """Get the transfer type (``None`` when the ``"type"`` key is absent)."""
        return self._field.get("type", None)

    @transfer_type.setter
    def transfer_type(self, value):
        """Set the transfer type (stored under the ``"type"`` key)."""
        self._field["type"] = value

    def get(self, key, default=None):
        """Get the value from the transfer metadata, or ``default`` if missing."""
        return self._field.get(key, default)

    def set(self, values):
        """Replace the transfer metadata with ``values``, keeping the transfer type.

        :param values: mapping with the new metadata. Any ``"type"`` entry in
            it is overridden by the transfer type that was set before the call.
        """
        # Snapshot first: ``values`` may be this very object or the dict it
        # wraps, in which case clearing the backing dict would also empty
        # ``values`` and silently drop all metadata.
        new_values = dict(values)
        transfer_type = self.transfer_type
        self._field.clear()
        self._field.update(new_values)
        self.transfer_type = transfer_type

    def __iter__(self):
        """Iterate over the transfer metadata keys."""
        return iter(self._field)

    def __len__(self):
        """Length of the transfer metadata."""
        return len(self._field)

    def __getitem__(self, key):
        """Get a value from the transfer metadata (``KeyError`` if missing)."""
        return self._field[key]

    def __setitem__(self, key, value):
        """Set a value in the transfer metadata."""
        self._field[key] = value
class TransferField(SystemField):
    """System field giving access to the transfer metadata of a file record.

    Reading the attribute yields a ``TransferFieldData`` wrapper; assigning
    to it stores the given value under the field's dictionary key.
    """

    def __get__(self, record, owner=None):
        """Return the field on class access, otherwise the wrapped metadata."""
        if record is None:
            # accessed on the class, not on an instance
            return self

        stored = self.get_dictkey(record)
        if stored is not None:
            return TransferFieldData(stored)

        # Nothing stored yet: put an empty dict on the record and wrap that
        # same dict (presumably so later writes through the wrapper end up
        # in the record; depends on how ``set_dictkey`` stores the value).
        stored = {}
        self.set_dictkey(record, stored)
        return TransferFieldData(stored)

    def __set__(self, record, value):
        """Store ``value`` as the transfer metadata of ``record``."""
        self.set_dictkey(record, value)
@request_multipart_args
@request_stream
@response_handler()
def upload_multipart_content(self):
    """Upload the content of one part of a multipart file.

    ``pid_value``, ``key`` and ``part`` come from the view arguments; the
    request stream and its content length come from the parsed request data.
    Responds with the service result as a dict and status 200.
    """
    url_args = resource_requestctx.view_args
    body = resource_requestctx.data

    result = self.service.set_multipart_file_content(
        g.identity,
        url_args["pid_value"],
        url_args["key"],
        url_args["part"],
        body["request_stream"],
        content_length=body["request_content_length"],
    )
    return result.to_dict(), 200
# # Invenio-Records-Resources is free software; you can redistribute it and/or # modify it under the terms of the MIT License; see LICENSE file for more @@ -51,4 +52,13 @@ def set_file_content(self, identity, id_, file_key, stream, content_length, reco def get_file_content(self, identity, id_, file_key, record): """Get file content handler.""" - pass + + def get_file_transfer_metadata( + self, identity, id, file_key, record, transfer_metadata + ): + """Get file transfer metadata handler.""" + + def update_file_transfer_metadata( + self, identity, id, file_key, record, transfer_metadata + ): + """Update file transfer metadata handler.""" diff --git a/invenio_records_resources/services/files/components/content.py b/invenio_records_resources/services/files/components/content.py index fa27f357..1dfff1de 100644 --- a/invenio_records_resources/services/files/components/content.py +++ b/invenio_records_resources/services/files/components/content.py @@ -1,14 +1,16 @@ # -*- coding: utf-8 -*- # # Copyright (C) 2021 CERN. +# Copyright (C) 2025 CESNET. # # Invenio-Records-Resources is free software; you can redistribute it and/or # modify it under the terms of the MIT License; see LICENSE file for more # details. 
def delete_file(self, identity, id_, file_key, record, deleted_file):
    """Delete file handler.

    Looks up the transfer responsible for ``deleted_file`` in the transfer
    registry and delegates the deletion to it.
    """
    current_transfer_registry.get_transfer(
        record=record,
        file_record=deleted_file,
        file_service=self.service,
        uow=self.uow,
    ).delete_file()
# # Invenio-Records-Resources is free software; you can redistribute it and/or # modify it under the terms of the MIT License; see LICENSE file for more # details. - """Files metadata component components.""" -from copy import deepcopy +from flask import current_app +from flask_babel import gettext as _ +from invenio_files_rest.errors import FileSizeError +from ....proxies import current_transfer_registry from ...errors import FilesCountExceededException -from ..transfer import Transfer +from ...uow import RecordCommitOp from .base import FileServiceComponent class FileMetadataComponent(FileServiceComponent): """File metadata service component.""" - def init_files(self, identity, id, record, data): + def init_files(self, identity, id_, record, data): """Init files handler.""" - schema = self.service.file_schema.schema(many=True) - validated_data = schema.load(data) - # All brand-new drafts don't allow exceeding files limit (while added via rest API). # Old records that already had more files than limited can continue adding files. # In case files amount goes back to under limit, users lose the privilege of adding more files. 
def update_file_metadata(self, identity, id_, file_key, record, data):
    """Update file metadata handler.

    Loads ``data`` through the service's single-item file schema and writes
    the result to the file stored under ``file_key``.
    """
    loaded = self.service.file_schema.schema(many=False).load(data)
    record.files.update(file_key, data=loaded)
def update_transfer_metadata(
    self, identity, id, file_key, record, transfer_metadata
):
    """Update file transfer metadata handler.

    Replaces the transfer metadata of the file stored under ``file_key``
    (``TransferFieldData.set`` keeps the transfer type) and registers a
    commit of the file record on the unit of work.
    """
    # NOTE(review): the base component declares this hook as
    # ``update_file_transfer_metadata``; confirm which name the service runs.
    file_record = record.files[file_key]
    file_record.transfer.set(transfer_metadata)
    self.uow.register(RecordCommitOp(file_record))


def commit_file(self, identity, id_, file_key, record):
    """Commit file handler.

    Delegates the commit to the transfer registered for the file, then
    rejects zero-byte files when ``RECORDS_RESOURCES_ALLOW_EMPTY_FILES`` is
    switched off.

    :raises FileSizeError: the committed file is empty and empty files are
        not allowed.
    """
    current_transfer_registry.get_transfer(
        record=record,
        file_record=record.files.get(file_key),
        file_service=self.service,
        uow=self.uow,
    ).commit_file()

    # The file record is looked up again after the commit; a missing record,
    # a missing ``file`` or a missing ``size`` all yield ``None`` here.
    committed = record.files.get(file_key)
    size = getattr(getattr(committed, "file", None), "size", None)
    if size != 0:
        return

    if not current_app.config.get("RECORDS_RESOURCES_ALLOW_EMPTY_FILES", True):
        raise FileSizeError(description=_("Empty files are not accepted."))
class FileMultipartContentComponent(FileServiceComponent):
    """File multipart content service component."""

    def set_multipart_file_content(
        self, identity, id, file_key, part, stream, content_length, record
    ):
        """Set the content of one part of a multipart file.

        :param file_key: key of the file inside ``record.files``.
        :param part: part identifier handed to the transfer.
        :param stream: stream with the content of the part.
        :param content_length: content length handed to the transfer.
        :param record: record owning the file.
        :raises NotFound: no file record exists for ``file_key``.
        :raises TransferException: the transfer of the file is not a
            ``MultipartTransfer``.
        :raises FailedFileUploadException: the transfer failed to store the
            part; the original ``TransferException`` is kept as its cause.
        """
        # Only the existence of the file record is checked here. The explicit
        # ``is None`` test is intentional: it does not rely on the
        # truthiness of the file record.
        file_record = record.files.get(file_key)
        if file_record is None:
            raise NotFound(f'File with key "{file_key}" has not been initialized yet.')

        transfer = current_transfer_registry.get_transfer(
            record=record,
            file_record=file_record,
            file_service=self.service,
            uow=self.uow,
        )
        if not isinstance(transfer, MultipartTransfer):
            raise TransferException(
                f'Transfer type "{transfer.transfer_type}" does not support multipart uploads.'
            )

        try:
            transfer.set_file_multipart_content(part, stream, content_length)
        except TransferException as e:
            # chain the cause so the underlying transfer error stays visible
            # in tracebacks and logs
            raise FailedFileUploadException(
                file_key=file_key, recid=record.pid, file=file_record
            ) from e
class IfTransferType(ConditionalGenerator):
    """Conditional generator that checks the transfer type of a file."""

    def __init__(
        self,
        transfer_type: str,
        then_,
        else_=None,
    ):
        """Initialize the generator.

        :param transfer_type: transfer type the file is compared against.
        :param then_: generator, or list/tuple of generators, used when the
            condition holds.
        :param else_: generator, or list/tuple of generators, used otherwise.
        """

        def to_list(value):
            # falsy -> no generators; a single generator -> one-element list
            if not value:
                return []
            return value if isinstance(value, (list, tuple)) else [value]

        super().__init__(to_list(then_), to_list(else_))
        self._transfer_type = transfer_type

    def _condition(self, **kwargs):
        """Check if the transfer type of the file is the expected one.

        :raises AssertionError: the file record exists but carries no
            transfer type.
        """
        # initiating an upload - check if the transfer type is correct
        # in the file metadata
        file_metadata = kwargs.get("file_metadata")
        if file_metadata is not None:
            # ``or {}`` also covers an explicit ``"transfer": None``
            transfer = file_metadata.get("transfer") or {}
            return transfer.get("type", None) == self._transfer_type

        # already uploaded and checking access - check if the transfer type is
        # correct in the file record
        record = kwargs["record"]
        file_key = kwargs.get("file_key")
        if not file_key:
            return False
        file_record = record.files.get(file_key)
        if file_record is None:
            return False

        transfer_type = file_record.transfer.transfer_type
        if transfer_type is None:
            # Raised explicitly instead of via ``assert`` so the check is not
            # stripped when Python runs with -O (it would then silently
            # evaluate to ``False``).
            raise AssertionError("Transfer type not set on file record")

        return transfer_type == self._transfer_type
@property
def links(self):
    """Get links for this result item.

    Starts from the links expanded by the links template and merges in the
    links contributed by the file's transfer: a ``None`` value removes the
    link of that name, any other value sets or overrides it.
    """
    expanded = self._links_tpl.expand(self._identity, self._file)

    transfer = current_transfer_registry.get_transfer(
        file_record=self._file, file_service=self._service, record=self._record
    )
    transfer_links = transfer.expand_links(self._identity, expanded["self"])
    for name, url in transfer_links.items():
        if url is None:
            expanded.pop(name, None)
            continue
        expanded[name] = url
    return expanded


def send_file(self, restricted=True, as_attachment=False):
    """Return file stream.

    The response is produced by the transfer registered for the file.
    """
    return current_transfer_registry.get_transfer(
        file_record=self._file, file_service=self._service, record=self._record
    ).send_file(restricted=restricted, as_attachment=as_attachment)
class BaseTransferSchema(Schema):
    """Base transfer schema.

    This schema is used to dump transfer metadata during the transfer and when
    the transfer is finished.
    """

    # The Python attribute is ``type_``; both the object attribute and the
    # serialized key are ``type``. The field is required on load.
    type_ = Str(attribute="type", data_key="type", required=True)
    """Transfer type. Required field, the initial transfer type is filled
    automatically by the InitFileSchema."""
    # NOTE(review): the InitFileSchema named above is not visible in this part
    # of the change; confirm where the initial type is filled in.

    class Meta:
        """Meta."""

        # marshmallow ``RAISE``: unknown fields are rejected on load
        unknown = RAISE
- if data.file: - # mandatory fields - data["storage_class"] = data.file.storage_class - data["uri"] = data.file.uri - - # If Local -> remove uri as it contains internal file storage info - if not TransferType(data["storage_class"]).is_serializable(): - data.pop("uri") - - # optional fields - fields = ["checksum", "size"] - for field in fields: - value = getattr(data.file, field, None) - if value is not None: - data[field] = value + unknown = RAISE - return data + +class TransferTypeSchemas(Mapping): + """Mapping of transfer types to their schemas.""" + + def __getitem__(self, transfer_type): + """Get the schema for the given transfer type.""" + return current_transfer_registry.get_transfer_class(transfer_type).Schema + + def __iter__(self): + """Iterate over the transfer types.""" + return iter(current_transfer_registry.get_transfer_types()) + + def __len__(self): + """Return the number of transfer types.""" + return len(current_transfer_registry.get_transfer_types()) + + +class TransferSchema(OneOfSchema): + """Transfer schema. 
A polymorphic schema that can handle different transfer types.""" + + type_field = "type" + type_field_remove = False + type_schemas = TransferTypeSchemas() + + def get_obj_type(self, obj): + """Returns name of the schema during dump() calls, given the object being dumped.""" + return obj["type"] class FileAccessSchema(Schema): @@ -112,7 +77,7 @@ class Meta: hidden = Boolean() -class FileSchema(InitFileSchema): +class FileSchema(Schema): """Service schema for files.""" class Meta: @@ -123,7 +88,6 @@ class Meta: created = TZDateTime(timezone=timezone.utc, format="iso", dump_only=True) updated = TZDateTime(timezone=timezone.utc, format="iso", dump_only=True) - status = GenMethod("dump_status") mimetype = Str(dump_only=True, attribute="file.mimetype") version_id = UUID(attribute="file.version_id", dump_only=True) file_id = UUID(attribute="file.file_id", dump_only=True) @@ -134,14 +98,56 @@ class Meta: links = Links() + key = Str(required=True, dump_only=True) + storage_class = Str(dump_only=True, attribute="file.file.storage_class") + checksum = Str(dump_only=True, attribute="file.file.checksum") + size = Integer(dump_only=True, attribute="file.file.size") + transfer = Nested(TransferSchema, dump_only=True) + status = GenMethod("dump_status") + transfer = Nested(TransferSchema, dump_only=True) + def dump_status(self, obj): """Dump file status.""" - # due to time constraints the status check is done here - # however, ideally this class should not need knowledge of - # the TransferType class, it should be encapsulated at File - # wrapper class or lower. - has_file = obj.file is not None - if has_file and TransferType(obj.file.storage_class).is_completed: - return "completed" - - return "pending" + transfer = current_transfer_registry.get_transfer( + file_record=obj, + file_service=self.context.get("service"), + record=obj.record, + ) + return transfer.status + + +class InitFileSchemaMixin(Schema): + """Service (component) schema mixin for file initialization. 
+ + During file initialization, this mixin is merged with the FileSchema + above. + + """ + + class Meta: + """Meta.""" + + unknown = RAISE + + key = Str(required=True, dump_only=False) + storage_class = Str(default="L", dump_only=False) + checksum = Str(dump_only=False) + size = Integer(dump_only=False) + transfer = Nested(TransferSchema, dump_only=False) + + @pre_load + def _fill_initial_transfer(self, data, **kwargs): + """Fill in the default transfer type.""" + if not isinstance(data, dict): + # should be a dictionary, otherwise there will be validation error later on + return data + + data.setdefault("transfer", {}) + if not isinstance(data["transfer"], dict): + raise ValidationError( + {"transfer": "Transfer metadata must be a dictionary."} + ) + data["transfer"].setdefault( + "type", current_transfer_registry.default_transfer_type + ) + return data diff --git a/invenio_records_resources/services/files/service.py b/invenio_records_resources/services/files/service.py index 373f314f..524f60f5 100644 --- a/invenio_records_resources/services/files/service.py +++ b/invenio_records_resources/services/files/service.py @@ -2,6 +2,7 @@ # # Copyright (C) 2020-2021 CERN. # Copyright (C) 2020-2021 Northwestern University. +# Copyright (C) 2025 CESNET. # # Invenio-Records-Resources is free software; you can redistribute it and/or # modify it under the terms of the MIT License; see LICENSE file for more @@ -10,11 +11,13 @@ """File Service API.""" from flask import current_app +from marshmallow import ValidationError from ..base import LinksTemplate, Service from ..errors import FailedFileUploadException, FileKeyNotFoundError from ..records.schema import ServiceSchemaWrapper from ..uow import RecordCommitOp, unit_of_work +from .schema import InitFileSchemaMixin class FileService(Service): @@ -27,9 +30,28 @@ def record_cls(self): @property def file_schema(self): - """Returns the data schema instance.""" + """Returns the data schema instance. 
+ + The schema can be used for dumping file's metadata or updating them. + For the creation of a new file, use the `initial_file_schema` property + as it will include the necessary fields for initiating the file upload. + """ return ServiceSchemaWrapper(self, schema=self.config.file_schema) + @property + def initial_file_schema(self): + """Returns the data schema instance for initiating the file upload.""" + if not hasattr(self.config, "initial_file_schema"): + self.config.initial_file_schema = type( + self.config.file_schema.__name__ + "Initial", + ( + InitFileSchemaMixin, + self.config.file_schema, + ), + {}, + ) + return ServiceSchemaWrapper(self, schema=self.config.initial_file_schema) + def file_result_item(self, *args, **kwargs): """Create a new instance of the resource unit.""" return self.config.file_result_item_cls(*args, **kwargs) @@ -88,7 +110,23 @@ def list_files(self, identity, id_): @unit_of_work() def init_files(self, identity, id_, data, uow=None): """Initialize the file upload for the record.""" - record = self._get_record(id_, identity, "create_files") + # validate the input data at the beginning, as we need to check + # if user has permission for the transfer type that is on + # each of the uploaded files. This is done in the IfTransferType + # permission generator. 
+ schema = self.initial_file_schema.schema(many=True) + data = schema.load(data) + + if not data: + raise ValidationError("No files to upload.") + + # resolve the record and check permissions for each uploaded file + record = self.record_cls.pid.resolve(id_, registered_only=False) + + for created_file in data: + self.require_permission( + identity, "create_files", record=record, file_metadata=created_file + ) self.run_components("init_files", identity, id_, record, data, uow=uow) @@ -257,7 +295,7 @@ def set_file_content( except FailedFileUploadException as e: file = e.file - current_app.logger.exception(f"File upload transfer failed.") + current_app.logger.exception("File upload transfer failed.") # we gracefully fail so that uow can commit the cleanup operation in # FileContentComponent errors = "File upload transfer failed." @@ -287,3 +325,73 @@ def get_file_content(self, identity, id_, file_key): record, links_tpl=self.file_links_item_tpl(id_), ) + + def get_transfer_metadata(self, identity, id_, file_key): + """Retrieve file transfer metadata.""" + record = self._get_record( + id_, identity, "get_file_transfer_metadata", file_key=file_key + ) + file = record.files[file_key] + transfer_metadata = dict(file.transfer) + self.run_components( + "get_transfer_metadata", identity, id_, file_key, record, transfer_metadata + ) + return transfer_metadata + + @unit_of_work() + def update_transfer_metadata( + self, identity, id_, file_key, transfer_metadata, uow=None + ): + """Update file transfer metadata.""" + record = self._get_record( + id_, identity, "update_file_transfer_metadata", file_key=file_key + ) + self.run_components( + "update_transfer_metadata", + identity, + id_, + file_key, + record, + transfer_metadata, + uow=uow, + ) + + @unit_of_work() + def set_multipart_file_content( + self, identity, id_, file_key, part, stream, content_length=None, uow=None + ): + """Save file content of a single part. 
+ + :raises FileKeyNotFoundError: If the record has no file for the ``file_key`` + """ + record = self._get_record(id_, identity, "set_content_files", file_key=file_key) + errors = None + try: + self.run_components( + "set_multipart_file_content", + identity, + id_, + file_key, + part, + stream, + content_length, + record, + uow=uow, + ) + file = record.files[file_key] + + except FailedFileUploadException as e: + file = e.file + current_app.logger.exception("File upload transfer failed.") + # we gracefully fail so that uow can commit the cleanup operation in + # FileContentComponent + errors = "File upload transfer failed." + + return self.file_result_item( + self, + identity, + file, + record, + errors=errors, + links_tpl=self.file_links_item_tpl(id_), + ) diff --git a/invenio_records_resources/services/files/tasks.py b/invenio_records_resources/services/files/tasks.py index bacccfd0..f5066b91 100644 --- a/invenio_records_resources/services/files/tasks.py +++ b/invenio_records_resources/services/files/tasks.py @@ -1,6 +1,7 @@ # -*- coding: utf-8 -*- # # Copyright (C) 2022-2024 CERN. +# Copyright (C) 2025 CESNET. 
# # Invenio-Records-Resources is free software; you can redistribute it and/or # modify it under the terms of the MIT License; see LICENSE file for more @@ -8,6 +9,8 @@ """Files tasks.""" +import traceback + import requests from celery import shared_task from flask import current_app @@ -15,6 +18,7 @@ from ...proxies import current_service_registry from ...services.errors import FileKeyNotFoundError +from .transfer.constants import LOCAL_TRANSFER_TYPE @shared_task(ignore_result=True) @@ -22,20 +26,52 @@ def fetch_file(service_id, record_id, file_key): """Fetch file from external storage.""" try: service = current_service_registry.get(service_id) - file_record = service.read_file_metadata(system_identity, record_id, file_key) - source_url = file_record._file.file.uri + transfer_metadata = service.get_transfer_metadata( + system_identity, record_id, file_key + ) + source_url = transfer_metadata["url"] # download file # verify=True for self signed certificates by default - with requests.get(source_url, stream=True, allow_redirects=True) as response: - # save file - service.set_file_content( - system_identity, - record_id, - file_key, - response.raw, # has read method + try: + with requests.get( + source_url, stream=True, allow_redirects=True + ) as response: + # save file + if response.status_code != 200: + current_app.logger.error( + f"Failed to fetch file from {source_url} with status code {response.status_code}" + ) + transfer_metadata["error"] = response.text + service.update_transfer_metadata( + system_identity, record_id, file_key, transfer_metadata + ) + return + service.set_file_content( + system_identity, + record_id, + file_key, + response.raw, # has read method + ) + transfer_metadata.pop("url") + transfer_metadata["type"] = LOCAL_TRANSFER_TYPE + service.update_transfer_metadata( + system_identity, record_id, file_key, transfer_metadata + ) + # commit file + service.commit_file(system_identity, record_id, file_key) + except Exception as e: + 
current_app.logger.error(e) + transfer_metadata["error"] = str(e) + service.update_transfer_metadata( + system_identity, record_id, file_key, transfer_metadata ) - # commit file - service.commit_file(system_identity, record_id, file_key) + return except FileKeyNotFoundError as e: current_app.logger.error(e) + + except Exception as e: + current_app.logger.error(e) + traceback.print_exc() + # re-raise the exception as we want the task to be marked as errored + raise diff --git a/invenio_records_resources/services/files/transfer.py b/invenio_records_resources/services/files/transfer.py deleted file mode 100644 index d7c889d8..00000000 --- a/invenio_records_resources/services/files/transfer.py +++ /dev/null @@ -1,194 +0,0 @@ -# -*- coding: utf-8 -*- -# -# Copyright (C) 2022-2024 CERN. -# -# Invenio-Records-Resources is free software; you can redistribute it and/or -# modify it under the terms of the MIT License; see LICENSE file for more -# details. - -"""Files transfer.""" - -from abc import ABC, abstractmethod -from enum import Enum - -from flask import current_app -from fs.errors import CreateFailed -from invenio_files_rest.errors import FileSizeError -from invenio_i18n import lazy_gettext as _ -from werkzeug.exceptions import ClientDisconnected - -from ..errors import TransferException -from ..uow import TaskOp -from .tasks import fetch_file - - -class TransferType(str, Enum): - """File type, it inherits from str to be JSON serializable. - - LOCAL represents a file that is stored locally in the instance's storage. - FETCH represents a file that needs to be fetched from an external storage - and saved locally. - REMOTE represents a file that is stored externally and is linked to the record. 
- """ - - LOCAL = "L" - FETCH = "F" - REMOTE = "R" - - def __eq__(self, other): - """Equality test.""" - return self.value == other - - def __str__(self): - """Return its value.""" - return self.value - - @property - def is_completed(self): - """Return if the type represents a completed transfer.""" - return self in [TransferType.LOCAL, TransferType.REMOTE] - - def is_serializable(self): - """Return if the type represents a localy available file.""" - return self != TransferType.LOCAL - - -class BaseTransfer(ABC): - """Local transfer.""" - - def __init__(self, type, service=None, uow=None): - """Constructor.""" - self.type = type - self.service = service - self.uow = uow - - @abstractmethod - def init_file(self, record, file_data): - """Initialize a file.""" - raise NotImplementedError() - - def set_file_content(self, record, file, file_key, stream, content_length): - """Set file content.""" - bucket = record.files.bucket - - size_limit = bucket.size_limit - if content_length and size_limit and content_length > size_limit: - desc = ( - _("File size limit exceeded.") - if isinstance(size_limit, int) - else size_limit.reason - ) - raise FileSizeError(description=desc) - - try: - record.files.create_obj( - file_key, stream, size=content_length, size_limit=size_limit - ) - except (ClientDisconnected, CreateFailed) as e: - raise TransferException(f'Transfer of File with key "{file_key}" failed.') - - def commit_file(self, record, file_key): - """Commit a file.""" - # fetch files can be committed, its up to permissions to decide by who - # e.g. 
system, since its the one downloading the file - record.files.commit(file_key) - f_obj = record.files.get(file_key) - f_inst = getattr(f_obj, "file", None) - file_size = getattr(f_inst, "size", None) - if file_size == 0: - allow_empty_files = current_app.config.get( - "RECORDS_RESOURCES_ALLOW_EMPTY_FILES", True - ) - if not allow_empty_files: - raise FileSizeError(description=_("Empty files are not accepted.")) - - # @abstractmethod - # def read_file_content(self, record, data): - # """Read a file content.""" - # pass - - -class LocalTransfer(BaseTransfer): - """Local transfer.""" - - def __init__(self, **kwargs): - """Constructor.""" - super().__init__(TransferType.LOCAL, **kwargs) - - def init_file(self, record, file_data): - """Initialize a file.""" - uri = file_data.pop("uri", None) - if uri: - raise Exception("Cannot set URI for local files.") - - file = record.files.create(key=file_data.pop("key"), data=file_data) - - return file - - def set_file_content(self, record, file, file_key, stream, content_length): - """Set file content.""" - if file: - raise TransferException(f'File with key "{file_key}" is committed.') - - super().set_file_content(record, file, file_key, stream, content_length) - - -class FetchTransfer(BaseTransfer): - """Fetch transfer.""" - - def __init__(self, **kwargs): - """Constructor.""" - super().__init__(TransferType.FETCH, **kwargs) - - def init_file(self, record, file_data): - """Initialize a file.""" - uri = file_data.pop("uri", None) - if not uri: - raise Exception("URI is required for fetch files.") - - obj_kwargs = { - "file": { - "uri": uri, - "storage_class": self.type, - "checksum": file_data.pop("checksum", None), - "size": file_data.pop("size", None), - } - } - - file_key = file_data.pop("key") - file = record.files.create( - key=file_key, - data=file_data, - obj=obj_kwargs, - ) - - self.uow.register( - TaskOp( - fetch_file, - service_id=self.service.id, - record_id=record.pid.pid_value, - file_key=file_key, - ) - ) - return 
file - - -class Transfer: - """Transfer type.""" - - @classmethod - def get_transfer(cls, file_type, **kwargs): - """Get transfer type.""" - if file_type == TransferType.FETCH: - return FetchTransfer(**kwargs) - else: # default to local - return LocalTransfer(**kwargs) - - @classmethod - def commit_file(cls, record, file_key): - """Commit a file.""" - file = record.files.get(file_key).file - transfer = cls.get_transfer(getattr(file, "storage_class", None)) - # file is not passed since that is the current head of the OV - # committing means setting the latest of the bucket (OV.get) - transfer.commit_file(record, file_key) diff --git a/invenio_records_resources/services/files/transfer/__init__.py b/invenio_records_resources/services/files/transfer/__init__.py new file mode 100644 index 00000000..a9f0196c --- /dev/null +++ b/invenio_records_resources/services/files/transfer/__init__.py @@ -0,0 +1,35 @@ +# -*- coding: utf-8 -*- +# +# Copyright (C) 2022 CERN. +# Copyright (C) 2025 CESNET. +# +# Invenio-Records-Resources is free software; you can redistribute it and/or +# modify it under the terms of the MIT License; see LICENSE file for more +# details. 
+ +"""Files transfer.""" + +from .base import Transfer, TransferStatus +from .constants import ( + FETCH_TRANSFER_TYPE, + LOCAL_TRANSFER_TYPE, + MULTIPART_TRANSFER_TYPE, + REMOTE_TRANSFER_TYPE, +) +from .providers.fetch import FetchTransfer +from .providers.local import LocalTransfer +from .providers.multipart import MultipartTransfer +from .providers.remote import RemoteTransfer + +__all__ = ( + "Transfer", + "FETCH_TRANSFER_TYPE", + "LOCAL_TRANSFER_TYPE", + "MULTIPART_TRANSFER_TYPE", + "REMOTE_TRANSFER_TYPE", + "TransferStatus", + "FetchTransfer", + "LocalTransfer", + "MultipartTransfer", + "RemoteTransfer", +) diff --git a/invenio_records_resources/services/files/transfer/base.py b/invenio_records_resources/services/files/transfer/base.py new file mode 100644 index 00000000..db68ceb4 --- /dev/null +++ b/invenio_records_resources/services/files/transfer/base.py @@ -0,0 +1,134 @@ +# -*- coding: utf-8 -*- +# +# Copyright (C) 2021-2024 CERN. +# Copyright (C) 2025 CESNET. +# +# Invenio-Records-Resources is free software; you can redistribute it and/or +# modify it under the terms of the MIT License; see LICENSE file for more +# details. +"""Base transfer class.""" + +from abc import ABC + +from flask_babel import lazy_gettext as _ +from fs.errors import CreateFailed +from invenio_files_rest.errors import FileSizeError +from werkzeug.exceptions import ClientDisconnected + +from invenio_records_resources.records.api import Record +from invenio_records_resources.services.errors import TransferException +from invenio_records_resources.services.files.service import FileService + +from ..schema import BaseTransferSchema + + +class TransferStatus: + """Transfer status. Constants to be used as return values for get_status.""" + + # Can not be enum to be json serializable, so just a class with constants. 
+ + PENDING = "pending" + COMPLETED = "completed" + FAILED = "failed" + + +class Transfer(ABC): + """Base transfer.""" + + transfer_type: str + """ + The transfer type for this transfer instance. + Overriding classes must set this class attribute. + """ + + Schema = BaseTransferSchema + """ + Schema definition for transfer metadata. Transfer providers are free to supply + their own schema with additional fields. + """ + + def __init__( + self, + record: Record, + key: str, + file_service: FileService, + uow=None, + ): + """Constructor.""" + self.record = record + self.key = key + self.file_service = file_service + self.uow = uow + + def init_file(self, record, file_metadata): + """Initialize a file and return a file record.""" + return record.files.create( + key=file_metadata.pop("key"), + transfer=file_metadata.pop("transfer"), + data=file_metadata, + ) + + @property + def file_record(self): + """Get the file record.""" + return self.record.files[self.key] + + def set_file_content(self, stream, content_length): + """Set file content.""" + bucket = self.record.bucket + + size_limit = bucket.size_limit + if content_length and size_limit and content_length > size_limit: + desc = ( + _("File size limit exceeded.") + if isinstance(size_limit, int) + else size_limit.reason + ) + raise FileSizeError(description=desc) + + try: + self.record.files.create_obj( + self.file_record.key, stream, size=content_length, size_limit=size_limit + ) + except (ClientDisconnected, CreateFailed): + raise TransferException( + f'Transfer of File with key "{self.file_record.key}" failed.' + ) + + def commit_file(self): + """Commit a file.""" + # fetch files can be committed, it's up to permissions to decide by who + # e.g. system, since it's the one downloading the file + self.record.files.commit(self.file_record.key) + + def delete_file(self): + """ + Delete a file. + + This method is called before a file is removed from the record. 
+ It can be used, for example, to do a cleanup of the file in the storage. + """ + + @property + def status(self): + """ + Get status of the upload of the passed file record. + + Returns TransferStatus.COMPLETED if the file is uploaded, + TransferStatus.PENDING if the file is not uploaded yet or + TransferStatus.FAILED if the file upload failed. + """ + if self.file_record is not None and self.file_record.file is not None: + return TransferStatus.COMPLETED + + return TransferStatus.PENDING + + def expand_links(self, identity, self_url): + """Expand links.""" + return {} + + def send_file(self, *, restricted, as_attachment): + """Send file to the client.""" + return self.file_record.object_version.send_file( + restricted=restricted, as_attachment=as_attachment + ) diff --git a/invenio_records_resources/services/files/transfer/constants.py b/invenio_records_resources/services/files/transfer/constants.py new file mode 100644 index 00000000..d9bf5a95 --- /dev/null +++ b/invenio_records_resources/services/files/transfer/constants.py @@ -0,0 +1,15 @@ +# -*- coding: utf-8 -*- +# +# Copyright (C) 2021-2024 CERN. +# Copyright (C) 2025 CESNET. +# +# Invenio-Records-Resources is free software; you can redistribute it and/or +# modify it under the terms of the MIT License; see LICENSE file for more +# details. +"""Constants for file transfer types.""" + +# predefined transfer types +LOCAL_TRANSFER_TYPE = "L" +FETCH_TRANSFER_TYPE = "F" +REMOTE_TRANSFER_TYPE = "R" +MULTIPART_TRANSFER_TYPE = "M" diff --git a/invenio_records_resources/services/files/transfer/providers/__init__.py b/invenio_records_resources/services/files/transfer/providers/__init__.py new file mode 100644 index 00000000..396234a7 --- /dev/null +++ b/invenio_records_resources/services/files/transfer/providers/__init__.py @@ -0,0 +1,22 @@ +# -*- coding: utf-8 -*- +# +# Copyright (C) 2021-2024 CERN. +# Copyright (C) 2025 CESNET. 
+# +# Invenio-Records-Resources is free software; you can redistribute it and/or +# modify it under the terms of the MIT License; see LICENSE file for more +# details. +"""Transfer providers.""" + +from .fetch import FetchTransfer +from .local import LocalTransfer +from .multipart import MultipartTransfer +from .remote import RemoteTransfer, RemoteTransferBase + +__all__ = ( + "RemoteTransferBase", + "RemoteTransfer", + "LocalTransfer", + "FetchTransfer", + "MultipartTransfer", +) diff --git a/invenio_records_resources/services/files/transfer/providers/fetch.py b/invenio_records_resources/services/files/transfer/providers/fetch.py new file mode 100644 index 00000000..a0bc34ad --- /dev/null +++ b/invenio_records_resources/services/files/transfer/providers/fetch.py @@ -0,0 +1,90 @@ +# -*- coding: utf-8 -*- +# +# Copyright (C) 2021-2024 CERN. +# Copyright (C) 2025 CESNET. +# +# Invenio-Records-Resources is free software; you can redistribute it and/or +# modify it under the terms of the MIT License; see LICENSE file for more +# details. +"""Fetch transfer provider.""" + +from urllib.parse import urlparse + +from flask import current_app +from marshmallow import ValidationError, fields, validate, validates + +from invenio_records_resources.services.files.transfer.base import TransferStatus + +from ....uow import RecordCommitOp, TaskOp +from ...schema import BaseTransferSchema +from ...tasks import fetch_file +from ..constants import FETCH_TRANSFER_TYPE, LOCAL_TRANSFER_TYPE +from .remote import RemoteTransferBase + + +class FetchTransfer(RemoteTransferBase): + """Fetch transfer. + + This transfer provider is used to fetch a file from a remote location. The external + file will be downloaded and stored in the record's bucket. When this is done, the + transfer type will be changed to `local` and the file will be available for download. 
+ """ + + transfer_type = FETCH_TRANSFER_TYPE + + class Schema(BaseTransferSchema): + """Schema for fetch transfer.""" + + url = fields.Url(required=True, load_only=True) + """URL to fetch the file from. + + Note: the url is never dumped to the client as it can contain credentials ( + basic http authentication, pre-signed request, ...) and should not be exposed. + """ + error = fields.Str(dump_only=True) + + @validates("url") + def validate_names(self, value): + """Validate the domain of the URI is allowed.""" + # checking if storage class and uri are compatible is a + # business logic concern, not a schema concern. + if value: + validate.URL(error="Not a valid URL.")(value) + domain = urlparse(value).netloc + allowed_domains = current_app.config.get( + "RECORDS_RESOURCES_FILES_ALLOWED_DOMAINS" + ) + if domain not in allowed_domains: + raise ValidationError("Domain not allowed", field_name="uri") + + def init_file(self, record, file_metadata): + """Initialize a file and return a file record.""" + file = super().init_file(record, file_metadata) + + self.uow.register( + TaskOp( + fetch_file, + service_id=self.file_service.id, + record_id=record.pid.pid_value, + file_key=file.key, + ) + ) + return file + + def set_file_content(self, stream, content_length): + """Set file content.""" + super().set_file_content(stream, content_length) + + def commit_file(self): + """Commit the file.""" + super().commit_file() + self.file_record.transfer.transfer_type = LOCAL_TRANSFER_TYPE + self.uow.register(RecordCommitOp(self.file_record)) + + @property + def status(self): + """Get the status of the transfer.""" + # a recorded fetch error means the transfer failed; otherwise use the base status + if self.file_record.transfer.get("error"): + return TransferStatus.FAILED + return super().status diff --git a/invenio_records_resources/services/files/transfer/providers/local.py b/invenio_records_resources/services/files/transfer/providers/local.py new file mode 100644 index 00000000..abc0b986 --- /dev/null +++ 
b/invenio_records_resources/services/files/transfer/providers/local.py @@ -0,0 +1,34 @@ +# -*- coding: utf-8 -*- +# +# Copyright (C) 2021-2024 CERN. +# Copyright (C) 2025 CESNET. +# +# Invenio-Records-Resources is free software; you can redistribute it and/or +# modify it under the terms of the MIT License; see LICENSE file for more +# details. +"""Local transfer provider.""" + +from flask_babel import gettext as _ + +from ....errors import TransferException +from ..base import Transfer +from ..constants import LOCAL_TRANSFER_TYPE + + +class LocalTransfer(Transfer): + """Local transfer. + + This transfer expects the file to be uploaded directly in one go to the + server. The file content is stored in the record's bucket. + """ + + transfer_type = LOCAL_TRANSFER_TYPE + + def set_file_content(self, stream, content_length): + """Set file content.""" + if self.file_record.file is not None: + raise TransferException( + _(f'File with key "{self.file_record.key}" is already committed.') + ) + + super().set_file_content(stream, content_length) diff --git a/invenio_records_resources/services/files/transfer/providers/multipart.py b/invenio_records_resources/services/files/transfer/providers/multipart.py new file mode 100644 index 00000000..b4e6da4f --- /dev/null +++ b/invenio_records_resources/services/files/transfer/providers/multipart.py @@ -0,0 +1,314 @@ +# -*- coding: utf-8 -*- +# +# Copyright (C) 2025 CESNET. +# +# Invenio-Records-Resources is free software; you can redistribute it and/or +# modify it under the terms of the MIT License; see LICENSE file for more +# details. 
class MultipartStorageExt:
    """Multipart storage extension.

    Wraps a storage backend. If the backend exposes the ``multipart_*``
    methods, calls are delegated to it. Otherwise the file is preallocated
    and the parts are written in place through the ``seek`` argument of the
    backend's ``update`` method.
    """

    def __init__(self, storage):
        """Constructor.

        :param storage: The storage backend instance to wrap.
        """
        self._storage = storage

    def multipart_initialize_upload(self, parts, size, part_size):
        """
        Initialize a multipart upload.

        :param parts: The number of parts that will be uploaded.
        :param size: The total size of the file.
        :param part_size: The size of each part except the last one.

        :returns: a dictionary of additional metadata that should be stored between
            the initialization and the commit of the upload.
        :raises TransferException: if the backend is not multipart aware and
            no part_size was given.
        """
        # if the storage backend is multipart aware, use it
        if hasattr(self._storage, "multipart_initialize_upload"):
            return self._storage.multipart_initialize_upload(parts, size, part_size)

        # otherwise use it as a local storage and pre-allocate the file.
        # In this case, the part size is required as we will be uploading
        # the parts in place to not double the space required.
        if not part_size:
            raise TransferException(
                "Multipart file transfer to local storage requires part_size."
            )
        self._storage.initialize(size=size)
        return {}

    def multipart_set_content(self, part, stream, content_length, **multipart_metadata):
        """
        Set the content of a part.

        This method is called for each part of the
        multipart upload when the upload comes through the Invenio server (for example,
        the upload target is a local filesystem and not S3 or another external service).

        :param part: The part number (1-based).
        :param stream: The stream with the part content.
        :param content_length: The content length of the part. Must be equal to the
            part_size for all parts except the last one.
        :param multipart_metadata: The metadata returned by the multipart_initialize_upload
            together with "parts", "part_size" and "size".

        :returns: a dictionary of additional metadata that should be stored as a result of this
            part upload. This metadata will be passed to the commit_upload method.
        :raises TransferException: if the part number is out of range or the
            size of a non-final part differs from part_size.
        """
        if hasattr(self._storage, "multipart_set_content"):
            return self._storage.multipart_set_content(
                part, stream, content_length, **multipart_metadata
            )

        # generic implementation
        # coerce once so that the range checks and the offset computation
        # below all work on the same integer value
        part = int(part)
        part_size = int(multipart_metadata["part_size"])
        parts = int(multipart_metadata["parts"])

        if part < 1:
            # part 0 or a negative part would give a negative seek offset
            raise TransferException("Part number must be a positive integer.")

        if part > parts:
            raise TransferException(
                "Part number is higher than total parts sent in multipart initialization."
            )

        if part < parts and content_length != part_size:
            raise TransferException(
                "Size of this part must be equal to part_size sent in multipart initialization."
            )

        # parts are numbered from 1, offsets from 0
        self._storage.update(
            stream,
            seek=(part - 1) * part_size,
            size=content_length,
        )
        return {}

    def multipart_commit_upload(self, **multipart_metadata):
        """
        Commit the multipart upload.

        Does nothing if the backend has no ``multipart_commit_upload`` method.

        :param multipart_metadata: The metadata returned by the multipart_initialize_upload
            and the metadata returned by the multipart_set_content for each part.
        """
        if hasattr(self._storage, "multipart_commit_upload"):
            self._storage.multipart_commit_upload(**multipart_metadata)

    def multipart_abort_upload(self, **multipart_metadata):
        """
        Abort the multipart upload.

        Does nothing if the backend has no ``multipart_abort_upload`` method.

        :param multipart_metadata: The metadata returned by the multipart_initialize_upload
            and the metadata returned by the multipart_set_content for each part.
        :returns: whatever the backend returns, or None without backend support.
        """
        if hasattr(self._storage, "multipart_abort_upload"):
            return self._storage.multipart_abort_upload(**multipart_metadata)

    def multipart_links(self, base_url, **multipart_metadata):
        """
        Generate links for the parts of the multipart upload.

        :param base_url: The base URL of the file inside the repository.
        :param multipart_metadata: The metadata returned by the multipart_initialize_upload
            and the metadata returned by the multipart_set_content for each part.
        :returns: a dictionary of name of the link to link value
        :raises TransferException: if part links have to be generated here and
            the metadata carries no "parts" value.
        """
        if hasattr(self._storage, "multipart_links"):
            # a backend returning nothing is treated as "no links provided"
            links = self._storage.multipart_links(**multipart_metadata) or {}
            # TODO: permissions!!! Should not present part links to people that do not have rights to upload
        else:
            links = {}

        if "parts" not in links:
            # generic implementation
            parts = int(multipart_metadata.get("parts", 0))
            if not parts:
                raise TransferException(
                    "Implementation error: Multipart file missing parts tag."
                )
            # one expiration for the whole set of links; naive UTC ISO string
            expiration = (datetime.utcnow() + timedelta(days=14)).isoformat()
            links["parts"] = [
                {
                    "part": part_no,
                    "url": f"{base_url}/content/{part_no}",
                    "expiration": expiration,
                }
                for part_no in range(1, parts + 1)
            ]

        if "content" not in links:
            links["content"] = None

        return links

    def __hasattr__(self, name):
        """Return whether the storage backend has the given attribute.

        Note: this is not a hook Python calls; ``hasattr()`` goes through
        ``__getattr__``. Kept for code that invokes it explicitly.
        """
        return hasattr(self._storage, name)

    def __getattr__(self, name):
        """Delegate all other attribute lookups to the storage backend."""
        # __getattr__ only runs when normal lookup failed. If "_storage" itself
        # is missing (instance created without __init__), delegating would
        # recurse forever, so fail with a regular AttributeError instead.
        if name == "_storage":
            raise AttributeError(name)
        return getattr(self._storage, name)
class MultipartTransfer(Transfer):
    """Transfer provider for files that are uploaded in several parts."""

    transfer_type = MULTIPART_TRANSFER_TYPE

    class Schema(BaseTransferSchema):
        """Transfer metadata accepted for a multipart upload."""

        part_size = ma.fields.Int(required=False)
        parts = ma.fields.Int(required=True)

    def init_file(self, record, file_metadata):
        """Initialize a file and start the multipart upload on the storage.

        :param record: The record the file belongs to.
        :param file_metadata: The file metadata; "transfer" must hold "parts"
            (and optionally "part_size"), and "size" must be set.
        :returns: the created file record.
        :raises TransferException: if "parts" or "size" is missing or falsy.
        """
        transfer_metadata = file_metadata["transfer"]
        parts = transfer_metadata.get("parts")
        part_size = transfer_metadata.get("part_size")
        size = file_metadata.get("size")
        checksum = file_metadata.get("checksum")
        storage_class = file_metadata.get("storage_class")

        if not parts:
            raise TransferException("Multipart file transfer requires parts.")
        if not size:
            raise TransferException("Multipart file transfer requires file size.")

        file_record = super().init_file(record, file_metadata)

        # the object version (and the file instance attached below) holds
        # the storage_class
        object_version = ObjectVersion.create(record.bucket, file_record.key)
        file_record.object_version = object_version
        file_record.object_version_id = object_version.version_id
        file_record.commit()

        # the file instance is what the storage factory is resolved from;
        # the storage may also use it to preallocate the file
        file_instance = FileInstance.create()
        db.session.add(file_instance)
        object_version.set_file(file_instance)

        storage = self._get_storage(
            fileinstance=file_instance,
            default_location=object_version.bucket.location.uri,
            default_storage_class=storage_class,
        )

        # let the storage start the upload, then make sure the basic upload
        # parameters are always part of the stored metadata
        multipart_metadata = storage.multipart_initialize_upload(
            parts, size, part_size
        )
        if not multipart_metadata:
            multipart_metadata = {}
        for name, value in (
            ("parts", parts),
            ("part_size", part_size),
            ("size", size),
        ):
            multipart_metadata.setdefault(name, value)

        file_record.transfer["multipart_metadata"] = multipart_metadata

        # NOTE(review): the placeholder is spelled "mutlipart" - confirm nothing
        # depends on this value before correcting the spelling.
        stored_checksum = checksum if checksum else "mutlipart:unknown"
        file_instance.set_uri(
            storage.fileurl,
            size,
            stored_checksum,
            storage_class=storage_class,
        )

        db.session.add(file_instance)
        # the transfer metadata changed above, so commit once more
        file_record.commit()
        return file_record

    def set_file_content(self, stream, content_length):
        """Reject whole-file uploads; multipart files are uploaded by parts.

        :raises TransferException: always.
        """
        raise TransferException(
            "Can not set content for multipart file, use the parts instead."
        )

    def set_file_multipart_content(self, part, stream, content_length):
        """
        Store the content of a single part.

        Called once per part when the upload goes through the Invenio server
        (for example when the target is a local filesystem and not S3 or
        another external service).

        :param part: The part number.
        :param stream: The stream with the part content.
        :param content_length: The content length of the part. Must be equal to the
            part_size for all parts except the last one.
        """
        storage = self._get_storage()
        new_metadata = storage.multipart_set_content(
            part, stream, content_length, **self.multipart_metadata
        )
        # keep the current metadata when the storage reports nothing new
        if not new_metadata:
            new_metadata = self.multipart_metadata
        self.file_record.transfer["multipart_metadata"] = new_metadata

    def commit_file(self):
        """
        Commit the file once all parts have been uploaded.

        After the storage has committed the upload, the transfer type is
        switched to local, the same way Fetch turns a remote file into a
        locally served one.
        """
        super().commit_file()

        self._get_storage().multipart_commit_upload(**self.multipart_metadata)

        # from now on the file behaves as a local one
        self.file_record.transfer.transfer_type = LOCAL_TRANSFER_TYPE
        self.file_record.commit()

    def delete_file(self):
        """Abort the still active multipart upload of the file being deleted."""
        self._get_storage().multipart_abort_upload(**self.multipart_metadata)

    @property
    def status(self):
        """Get the status of the transfer.

        Always pending: after the commit the transfer type is changed to
        local (the same way as FETCH works).
        """
        return TransferStatus.PENDING

    def expand_links(self, identity, self_url):
        """Return the links of the upload, including the part urls."""
        multipart_storage = self._get_storage()
        return multipart_storage.multipart_links(self_url, **self.multipart_metadata)

    @property
    def multipart_metadata(self):
        """Get the multipart metadata stored in the file record's transfer."""
        return self.file_record.transfer["multipart_metadata"]

    def _get_storage(self, **kwargs):
        """Internal method to get the storage backend wrapped for multipart use.

        :param kwargs: Arguments for the storage factory; "fileinstance"
            defaults to the file of the file record.
        """
        if "fileinstance" in kwargs:
            factory_kwargs = kwargs
        else:
            factory_kwargs = {**kwargs, "fileinstance": self.file_record.file}
        backend = current_files_rest.storage_factory(**factory_kwargs)
        return MultipartStorageExt(backend)
class RemoteTransferBase(Transfer):
    """Remote transfer base class."""

    class Schema(BaseTransferSchema):
        """Schema for remote transfer."""

        url = fields.Str(required=True, load_only=True)
        """URL that points to the remote file.

        It is not dumped to the client as it would make download statistics impossible.
        The file is accessed by using the /content url and then a 302 redirect
        is sent to the client with the actual URI.
        """

        @validates("url")
        def validate_names(self, value, **kwargs):
            """Validate that the URL is well formed and its domain is allowed.

            :param value: The URL sent by the client.
            :param kwargs: Extra keyword arguments passed by marshmallow
                (version 4 passes ``data_key``); they are not used.
            :raises ValidationError: if the value is not a valid URL or its
                network location is not listed in
                ``RECORDS_RESOURCES_FILES_ALLOWED_REMOTE_DOMAINS``.
            """
            # The field is required, so an empty value is validated as well:
            # skipping it would let an empty URL bypass the allow-list.
            validate.URL(error="Not a valid URL.")(value)
            domain = urlparse(value).netloc
            allowed_domains = current_app.config.get(
                "RECORDS_RESOURCES_FILES_ALLOWED_REMOTE_DOMAINS", ()
            )
            if domain not in allowed_domains:
                raise ValidationError("Domain not allowed", field_name="url")
class RemoteTransfer(RemoteTransferBase):
    """Transfer for files served from the remote URL stored in the transfer."""

    transfer_type = REMOTE_TRANSFER_TYPE

    @property
    def status(self):
        """Get the status of the transfer (remote files are always completed)."""
        return TransferStatus.COMPLETED

    def send_file(self, *, as_attachment, **kwargs):
        """Answer with a 302 redirect to the URL of the remote file.

        :param as_attachment: Accepted as part of the signature, not used.
        :returns: a response with status 302 and the ``Location`` header set
            to the "url" stored in the file record's transfer.
        """
        response_cls = current_app.response_class
        redirect_headers = {
            "Location": self.file_record.transfer["url"],
        }
        return response_cls(status=302, headers=redirect_headers)
class TransferRegistry:
    """A registry for transfer providers."""

    def __init__(self, default_transfer_type: str):
        """Creates a new transfer registry.

        :param default_transfer_type: The default transfer type to use when no
            transfer type is provided in file upload initiation.
        """
        # maps transfer type -> transfer class
        self._transfers = {}
        self._default_transfer_type = default_transfer_type

    @property
    def default_transfer_type(self):
        """Get the default transfer type."""
        return self._default_transfer_type

    def register(self, transfer_cls):
        """Register a new transfer provider.

        :param transfer_cls: The transfer class; it is registered under its
            ``transfer_type`` attribute.
        :raises RuntimeError: if a transfer with the same type is already
            registered.
        """
        transfer_type = transfer_cls.transfer_type

        if transfer_type in self._transfers:
            raise RuntimeError(
                f"Transfer with type '{transfer_type}' is already registered."
            )

        self._transfers[transfer_type] = transfer_cls

    def get_transfer_class(self, transfer_type):
        """Get transfer class by transfer type.

        :raises KeyError: if no transfer is registered for the type.
        """
        return self._transfers[transfer_type]

    def get_transfer_types(self):
        """Get all registered transfer types."""
        return self._transfers.keys()

    def get_transfer(
        self,
        *,
        record,
        file_service,
        key=None,
        transfer_type=None,
        file_record=None,
        uow=None,
    ):
        """Get transfer for the given record and file service.

        To specify the file that the transfer should be performed on,
        either the file record or the key together with transfer_type
        must be provided.

        :param record: The record the file belongs to.
        :param file_service: The file service passed to the transfer.
        :param key: The file key; ignored when file_record is given.
        :param transfer_type: The transfer type; ignored when file_record is given.
        :param file_record: The file record to take key and transfer type from.
        :param uow: The unit of work passed to the transfer.
        :returns: an instance of the transfer class registered for the type.
        :raises ValueError: if the key or the transfer type can not be determined.
        :raises KeyError: if no transfer is registered for the type.
        """
        # Compare with None instead of testing truthiness: a file record
        # that evaluates as false (e.g. an empty dict-like object) is still
        # a file record and must not be treated as "not provided".
        if file_record is not None:
            key = file_record.key
            transfer_type = file_record.transfer.transfer_type
        if key is None:
            raise ValueError("Either key or file_record must be provided.")
        if transfer_type is None:
            raise ValueError("Either file_record or transfer_type must be provided.")

        return self._transfers[transfer_type](
            record=record, key=key, file_service=file_service, uow=uow
        )
# # Invenio-Records-Resources is free software; you can redistribute it and/or # modify it under the terms of the MIT License; see LICENSE file for more @@ -32,7 +33,9 @@ def app_config(app_config): app_config["RECORDS_RESOURCES_FILES_ALLOWED_DOMAINS"] = [ "inveniordm.test", ] - + app_config["RECORDS_RESOURCES_FILES_ALLOWED_REMOTE_DOMAINS"] = [ + "inveniordm.test", + ] app_config["FILES_REST_STORAGE_CLASS_LIST"] = { "L": "Local", "F": "Fetch", @@ -99,4 +102,5 @@ def identity_simple(): i = Identity(1) i.provides.add(UserNeed(1)) i.provides.add(Need(method="system_role", value="any_user")) + i.provides.add(Need(method="system_role", value="authenticated_user")) return i diff --git a/tests/factories/conftest.py b/tests/factories/conftest.py index 5acf1e62..52d2b04e 100644 --- a/tests/factories/conftest.py +++ b/tests/factories/conftest.py @@ -11,7 +11,6 @@ """Factories test configuration.""" import pytest -from flask_principal import Identity, Need, UserNeed from invenio_app.factory import create_api as _create_api diff --git a/tests/factories/test_factory.py b/tests/factories/test_factory.py index 79237bda..41e2c40c 100644 --- a/tests/factories/test_factory.py +++ b/tests/factories/test_factory.py @@ -15,7 +15,7 @@ from sqlalchemy.exc import InvalidRequestError from invenio_records_resources.factories.factory import RecordTypeFactory -from invenio_records_resources.services import RecordServiceConfig, SearchOptions +from invenio_records_resources.services import SearchOptions from invenio_records_resources.services.records.components import ServiceComponent from invenio_records_resources.services.records.facets import TermsFacet diff --git a/tests/mock_module/permissions.py b/tests/mock_module/permissions.py index 8b08c512..4301c7aa 100644 --- a/tests/mock_module/permissions.py +++ b/tests/mock_module/permissions.py @@ -3,6 +3,7 @@ # This file is part of Invenio. # Copyright (C) 2020 CERN. # Copyright (C) 2020 Northwestern University. +# Copyright (C) 2025 CESNET. 
# # Invenio-Records-Resources is free software; you can redistribute it and/or # modify it under the terms of the MIT License; see LICENSE file for more @@ -11,9 +12,19 @@ """Example of a permission policy.""" from invenio_records_permissions import RecordPermissionPolicy -from invenio_records_permissions.generators import AnyUser, SystemProcess +from invenio_records_permissions.generators import ( + AnyUser, + AuthenticatedUser, + SystemProcess, +) -from invenio_records_resources.services.files.generators import AnyUserIfFileIsLocal +from invenio_records_resources.services.files.generators import IfTransferType +from invenio_records_resources.services.files.transfer import ( + FETCH_TRANSFER_TYPE, + LOCAL_TRANSFER_TYPE, + MULTIPART_TRANSFER_TYPE, + REMOTE_TRANSFER_TYPE, +) class PermissionPolicy(RecordPermissionPolicy): @@ -24,10 +35,33 @@ class PermissionPolicy(RecordPermissionPolicy): can_read = [AnyUser(), SystemProcess()] can_update = [AnyUser(), SystemProcess()] can_delete = [AnyUser(), SystemProcess()] - can_create_files = [AnyUser(), SystemProcess()] - can_set_content_files = [AnyUserIfFileIsLocal(), SystemProcess()] - can_get_content_files = [AnyUserIfFileIsLocal(), SystemProcess()] - can_commit_files = [AnyUserIfFileIsLocal(), SystemProcess()] + can_set_content_files = [ + IfTransferType(LOCAL_TRANSFER_TYPE, AnyUser()), + IfTransferType(MULTIPART_TRANSFER_TYPE, AnyUser()), + SystemProcess(), + ] + can_create_files = [ + IfTransferType(LOCAL_TRANSFER_TYPE, AnyUser()), + IfTransferType(FETCH_TRANSFER_TYPE, AuthenticatedUser()), + IfTransferType(REMOTE_TRANSFER_TYPE, AuthenticatedUser()), + IfTransferType(MULTIPART_TRANSFER_TYPE, AnyUser()), + SystemProcess(), + ] + can_get_content_files = [ + IfTransferType(LOCAL_TRANSFER_TYPE, AnyUser()), + IfTransferType(REMOTE_TRANSFER_TYPE, AnyUser()), + SystemProcess(), + ] + can_commit_files = [ + IfTransferType(LOCAL_TRANSFER_TYPE, AnyUser()), + IfTransferType(FETCH_TRANSFER_TYPE, SystemProcess()), + 
IfTransferType(MULTIPART_TRANSFER_TYPE, AnyUser()), + SystemProcess(), + ] can_read_files = [AnyUser(), SystemProcess()] can_update_files = [AnyUser(), SystemProcess()] can_delete_files = [AnyUser(), SystemProcess()] + + # who can get/set transfer metadata (currently service-level only, not exposed via REST API) + can_get_file_transfer_metadata = [SystemProcess()] + can_update_file_transfer_metadata = [SystemProcess()] diff --git a/tests/records/test_systemfield_index.py b/tests/records/test_systemfield_index.py index 5510f378..83303312 100644 --- a/tests/records/test_systemfield_index.py +++ b/tests/records/test_systemfield_index.py @@ -12,8 +12,6 @@ from invenio_search.engine import dsl from mock_module.api import Record -from invenio_records_resources.records.systemfields import IndexField - def test_class_attribute_access(): """Test that field is returned.""" diff --git a/tests/records/test_systemfield_modelpid.py b/tests/records/test_systemfield_modelpid.py index 0862410f..941d1221 100644 --- a/tests/records/test_systemfield_modelpid.py +++ b/tests/records/test_systemfield_modelpid.py @@ -8,7 +8,6 @@ """ModelPIDField tests.""" -import pytest from invenio_records.systemfields import ModelField from mock_module.api import Record as RecordBase from mock_module.models import RecordMetadataWithPID diff --git a/tests/records/test_systemfield_pid.py b/tests/records/test_systemfield_pid.py index 1c25d37f..bfc04217 100644 --- a/tests/records/test_systemfield_pid.py +++ b/tests/records/test_systemfield_pid.py @@ -9,7 +9,6 @@ """PIDField tests.""" -from datetime import datetime from invenio_pidstore.providers.recordid_v2 import RecordIdProviderV2 from mock_module.api import Record diff --git a/tests/resources/test_files_resource.py b/tests/resources/test_files_resource.py index b6831a2d..476c419a 100644 --- a/tests/resources/test_files_resource.py +++ b/tests/resources/test_files_resource.py @@ -3,12 +3,14 @@ # Copyright (C) 2020-2024 CERN. 
# Copyright (C) 2021 Northwestern University. # Copyright (C) 2021 European Union. +# Copyright (C) 2025 CESNET. # # Invenio-Records-Resources is free software; you can redistribute it and/or # modify it under the terms of the MIT License; see LICENSE file for more # details. """Invenio Resources module to create REST APIs.""" + import zipfile from io import BytesIO from unittest.mock import patch @@ -127,7 +129,6 @@ def test_files_api_flow(client, search_clear, headers, input_data, location): assert res.json["key"] == "test.pdf" assert res.json["status"] == "completed" assert res.json["metadata"] == {"title": "Test file"} - file_size = str(res.json["size"]) assert isinstance(res.json["size"], int), "File size not integer" # Read a file's content @@ -309,6 +310,28 @@ def test_file_api_errors(client, search_clear, headers, input_data, location): # Pass an object instead of an array res = client.post(f"/mocks/{id_}/files", headers=headers, json={"key": "test.pdf"}) assert res.status_code == 400 + assert res.json == { + "errors": [{"field": "0._schema", "messages": ["Invalid input type."]}], + "message": "A validation error occurred.", + "status": 400, + } + + res = client.post( + f"/mocks/{id_}/files", + headers=headers, + json=[{"key": "test.pdf", "transfer": "not a dictionary"}], + ) + assert res.status_code == 400 + assert res.json == { + "errors": [ + { + "field": "transfer", + "messages": ["Transfer metadata must be a dictionary."], + } + ], + "message": "A validation error occurred.", + "status": 400, + } res = client.post( f"/mocks/{id_}/files", @@ -341,10 +364,14 @@ def test_file_api_errors(client, search_clear, headers, input_data, location): f"/mocks/{id_}/files", headers=headers, json=[ - {"key": "test.pdf", "title": "Test file"}, + {"key": "test.pdf", "metadata": {"title": "Test file"}}, ], ) assert res.status_code == 400 + assert res.json == { + "message": "File with key test.pdf already exists.", + "status": 400, + } def 
def test_files_multipart_api_flow(
    app, client, search_clear, headers, input_data, location
):
    """Test the multipart upload flow through the REST API.

    Creates a record, initiates a two-part upload, uploads both parts,
    commits the file and reads its metadata and content back.
    """
    # Create the record that will hold the file
    res = client.post("/mocks", headers=headers, json=input_data)
    assert res.status_code == 201
    id_ = res.json["id"]
    assert res.json["links"]["files"].endswith(f"/api/mocks/{id_}/files")

    # Initiate the multipart upload: 17 bytes in 2 parts of at most 10 bytes
    upload_request = {
        "key": "test.pdf",
        "metadata": {
            "title": "Test file",
        },
        "size": 17,
        "transfer": {
            "type": "M",
            "parts": 2,
            "part_size": 10,
        },
    }
    res = client.post(f"/mocks/{id_}/files", headers=headers, json=[upload_request])
    assert res.status_code == 201

    res_file = res.json["entries"][0]
    file_links = res_file["links"]
    assert res_file["key"] == "test.pdf"
    assert res_file["status"] == "pending"
    assert res_file["metadata"] == {"title": "Test file"}
    assert file_links["self"].endswith(f"/api/mocks/{id_}/files/test.pdf")
    assert "content" not in file_links
    assert file_links["commit"].endswith(f"/api/mocks/{id_}/files/test.pdf/commit")

    # Map part number -> url path relative to the API root
    parts_links = {}
    for part_link in file_links["parts"]:
        parts_links[part_link["part"]] = part_link["url"].split("/api", maxsplit=1)[1]
    assert len(parts_links) == 2

    # Upload both parts; the file stays pending until it is committed
    for part_number, data in ((1, b"1234567890"), (2, b"1234567")):
        res = client.put(
            parts_links[part_number],
            headers={
                "content-type": "application/octet-stream",
            },
            data=data,
        )
        assert res.status_code == 200
        assert res.json["status"] == "pending"
        assert res.json["transfer"]["type"] == "M"

    # Commit the uploaded file, which turns it into a local one
    res = client.post(f"/mocks/{id_}/files/test.pdf/commit", headers=headers)
    assert res.status_code == 200
    assert res.json["status"] == "completed"
    assert res.json["transfer"]["type"] == "L"

    # Get the file metadata
    res = client.get(f"/mocks/{id_}/files/test.pdf", headers=headers)
    assert res.status_code == 200
    assert res.json["key"] == "test.pdf"
    assert res.json["status"] == "completed"
    assert res.json["metadata"] == {"title": "Test file"}
    assert isinstance(res.json["size"], int), "File size not integer"

    # The content is both parts joined in order
    res = client.get(f"/mocks/{id_}/files/test.pdf/content", headers=headers)
    assert res.status_code == 200
    assert res.data == b"12345678901234567"
# # Invenio-Records-Resources is free software; you can redistribute it and/or # modify it under the terms of the MIT License; see LICENSE file for more @@ -12,6 +13,8 @@ from unittest.mock import patch import pytest +from flask_principal import Identity +from invenio_access import any_user from invenio_access.permissions import system_identity from invenio_files_rest.errors import FileSizeError from marshmallow import ValidationError @@ -35,6 +38,33 @@ class MockResponse: """Mock response.""" raw = BytesIO(b"test file content") + status_code = 200 + + class MockRequest: + """Mock request.""" + + def __enter__(self): + """Mock ctx manager.""" + return MockResponse() + + def __exit__(self, *args): + """Mock ctx manager.""" + pass + + return MockRequest() + + +@pytest.fixture(scope="module") +def mock_404_request(): + """Patch response raw.""" + + # Mock HTTP request + class MockResponse: + """Mock response.""" + + raw = BytesIO(b"not found") + status_code = 404 + text = "not found" class MockRequest: """Mock request.""" @@ -163,12 +193,12 @@ def test_init_files(file_service, location, example_file_record, identity_simple # -# External files +# External fetched files # @patch("invenio_records_resources.services.files.tasks.requests.get") -def test_external_file_simple_flow( +def test_fetch_file_simple_flow( p_response_raw, mock_request, file_service, @@ -194,8 +224,10 @@ def test_external_file_simple_flow( file_to_initialise = [ { "key": "article.txt", - "uri": "https://inveniordm.test/files/article.txt", - "storage_class": "F", + "transfer": { + "url": "https://inveniordm.test/files/article.txt", + "type": "F", + }, } ] @@ -213,7 +245,7 @@ def test_external_file_simple_flow( result = file_service.read_file_metadata(identity_simple, recid, "article.txt") result = result.to_dict() assert result["key"] == file_to_initialise[0]["key"] - assert result["storage_class"] == "L" # changed after commit + assert result["transfer"]["type"] == "L" # changed after commit assert 
@patch("invenio_records_resources.services.files.tasks.requests.get")
def test_fetch_unreadable_file(
    p_response_raw,
    mock_404_request,
    file_service,
    example_file_record,
    identity_simple,
    location,
):
    """Test that fetching a file whose URL answers with 404 is marked as failed."""
    # the patched requests.get hands out the mocked 404 response
    p_response_raw.return_value = mock_404_request

    recid = example_file_record["id"]
    missing_file = {
        "key": "article.txt",
        "transfer": {
            "url": "https://inveniordm.test/files/article-that-does-not-exist.txt",
            "type": "F",
        },
    }
    file_service.init_files(identity_simple, recid, [missing_file])

    # The listing exposes the failure and the error text of the response
    listing = file_service.list_files(identity_simple, recid)
    entry = listing.to_dict()["entries"][0]
    assert entry["status"] == "failed"
    assert entry["transfer"]["error"] == "not found"
+ }, } ] @@ -291,7 +359,7 @@ def test_content_and_commit_external_file( result = file_service.read_file_metadata(identity_simple, recid, "article.txt") result = result.to_dict() assert result["key"] == file_to_initialise[0]["key"] - assert result["storage_class"] == "F" + assert result["transfer"]["type"] == "F" # Set content as user content = BytesIO(b"test file content") @@ -314,7 +382,7 @@ def test_content_and_commit_external_file( ) result = result.to_dict() assert result["key"] == file_to_initialise[0]["key"] - assert result["storage_class"] == "F" # not commited yet + assert result["transfer"]["type"] == "F" # not commited yet assert "uri" not in result # Commit as user @@ -325,13 +393,13 @@ def test_content_and_commit_external_file( result = file_service.commit_file(system_identity, recid, "article.txt") result = result.to_dict() assert result["key"] == file_to_initialise[0]["key"] - assert result["storage_class"] == "L" + assert result["transfer"]["type"] == "L" assert "uri" not in result @patch("invenio_records_resources.services.files.tasks.requests.get") -@patch("invenio_records_resources.services.files.transfer.fetch_file") -def test_delete_not_committed_external_file( +@patch("invenio_records_resources.services.files.transfer.providers.fetch.fetch_file") +def test_delete_not_committed_fetched_file( p_fetch_file, p_response_raw, mock_request, @@ -353,8 +421,10 @@ def test_delete_not_committed_external_file( file_to_initialise = [ { "key": "article.txt", - "uri": "https://inveniordm.test/files/article.txt", - "storage_class": "F", + "transfer": { + "type": "F", + "url": "https://inveniordm.test/files/article.txt", + }, } ] @@ -366,7 +436,7 @@ def test_delete_not_committed_external_file( result = file_service.read_file_metadata(identity_simple, recid, "article.txt") result = result.to_dict() assert result["key"] == file_to_initialise[0]["key"] - assert result["storage_class"] == "F" + assert result["transfer"]["type"] == "F" # Delete file 
file_service.delete_file(identity_simple, recid, "article.txt") @@ -403,8 +473,8 @@ def test_delete_not_committed_external_file( @patch("invenio_records_resources.services.files.tasks.requests.get") -@patch("invenio_records_resources.services.files.transfer.fetch_file") -def test_read_not_committed_external_file( +@patch("invenio_records_resources.services.files.transfer.providers.fetch.fetch_file") +def test_read_not_committed_fetched_file( p_fetch_file, p_response_raw, mock_request, @@ -424,8 +494,10 @@ def test_read_not_committed_external_file( file_to_initialise = [ { "key": "article.txt", - "uri": "https://inveniordm.test/files/article.txt", - "storage_class": "F", + "transfer": { + "type": "F", + "url": "https://inveniordm.test/files/article.txt", + }, } ] # Initialize file saving @@ -436,7 +508,7 @@ def test_read_not_committed_external_file( result = file_service.read_file_metadata(identity_simple, recid, "article.txt") result = result.to_dict() assert result["key"] == file_to_initialise[0]["key"] - assert result["storage_class"] == "F" + assert result["transfer"]["type"] == "F" # List files result = file_service.list_files(identity_simple, recid) @@ -446,7 +518,7 @@ def test_read_not_committed_external_file( result = file_service.read_file_metadata(identity_simple, recid, "article.txt") result = result.to_dict() assert result["key"] == file_to_initialise[0]["key"] - assert result["storage_class"] == "F" # changed after commit + assert result["transfer"]["type"] == "F" # changed after commit # Retrieve file with pytest.raises(PermissionDeniedError): @@ -498,3 +570,139 @@ def test_empty_files( else: with pytest.raises(FileSizeError): result = file_service.commit_file(identity_simple, recid, "article.txt") + + +def test_multipart_file_upload_local_storage( + file_service, location, example_file_record, identity_simple +): + """Test the multipart upload to the local storage. 
+ + - Initialize file saving + - Save 1 files via multipart upload + - Commit the files + - List files of the record + - Read file metadata + - Retrieve a file + """ + recid = example_file_record["id"] + key = "article.txt" + file_to_initialise = [ + { + "key": key, + "checksum": "md5:c785060c866796cc2a1708c997154c8e", + "size": 17, # 2kB + "metadata": { + "description": "Published article PDF.", + }, + "transfer": { + "type": "M", + "parts": 2, + "part_size": 10, + }, + } + ] + # Initialize file saving + result = file_service.init_files(identity_simple, recid, file_to_initialise) + result = result.to_dict() + + assert result["entries"][0]["key"] == key + assert "parts" in result["entries"][0]["links"] + + def upload_part(part_no, part_content, part_size): + # for to_file in to_files: + return file_service.set_multipart_file_content( + identity_simple, + recid, + key, + part_no, + BytesIO(part_content), + part_size, + ) + + content = b"test file content" + result = upload_part(1, content[:10], 10) + assert result.to_dict()["key"] == key + + result = upload_part(2, content[10:], 7) + assert result.to_dict()["key"] == key + + result = file_service.commit_file(identity_simple, recid, "article.txt") + assert result.to_dict()["key"] == file_to_initialise[0]["key"] + + # List files + result = file_service.list_files(identity_simple, recid) + assert result.to_dict()["entries"][0]["key"] == file_to_initialise[0]["key"] + assert result.to_dict()["entries"][0]["storage_class"] == "L" + + # Read file metadata + result = file_service.read_file_metadata(identity_simple, recid, "article.txt") + assert result.to_dict()["key"] == file_to_initialise[0]["key"] + assert result.to_dict()["transfer"]["type"] == "L" + + # Retrieve file + result = file_service.get_file_content(identity_simple, recid, "article.txt") + assert result.file_id == "article.txt" + + +# +# External remote files +# + + +def test_remote_file( + file_service, + example_file_record, + identity_simple, + location, 
+): + """Test the lifecycle of an external remote file.""" + + recid = example_file_record["id"] + file_to_initialise = [ + { + "key": "article.txt", + "transfer": { + "url": "https://inveniordm.test/files/article.txt", + "type": "R", + }, + } + ] + + # Initialize file saving + result = file_service.init_files(identity_simple, recid, file_to_initialise) + file_result = result.to_dict()["entries"][0] + assert file_result["key"] == file_to_initialise[0]["key"] + + assert file_result["transfer"]["type"] == "R" + assert "url" not in file_result["transfer"] + + sent_file = file_service.get_file_content( + identity_simple, recid, "article.txt" + ).send_file() + assert sent_file.status_code == 302 + assert sent_file.headers["Location"] == "https://inveniordm.test/files/article.txt" + + +def test_remote_file_no_permissions( + file_service, + example_file_record, + location, +): + """Test the lifecycle of an external remote file.""" + + recid = example_file_record["id"] + file_to_initialise = [ + { + "key": "article.txt", + "transfer": { + "url": "https://inveniordm.test/files/article.txt", + "type": "R", + }, + } + ] + + i = Identity(None) + i.provides.add(any_user) + + with pytest.raises(PermissionDeniedError): + file_service.init_files(i, recid, file_to_initialise) diff --git a/tests/services/test_service.py b/tests/services/test_service.py index 6848142e..5b2c9669 100644 --- a/tests/services/test_service.py +++ b/tests/services/test_service.py @@ -16,10 +16,7 @@ """ import pytest -from invenio_cache import current_cache from invenio_pidstore.errors import PIDDeletedError -from invenio_search import current_search, current_search_client -from marshmallow import ValidationError from mock_module.api import Record