From 98dc3585b20be5c9c0f13bc98a12a9c00cbe259f Mon Sep 17 00:00:00 2001 From: Dan Allan Date: Sun, 15 Oct 2023 11:18:31 -0400 Subject: [PATCH 01/38] Add columns to Asset <-> DataSource Assoc. Tests fail. --- tiled/catalog/orm.py | 84 ++++++++++++++++++++++++++++++++------------ 1 file changed, 62 insertions(+), 22 deletions(-) diff --git a/tiled/catalog/orm.py b/tiled/catalog/orm.py index 6822dedc7..48f94edbb 100644 --- a/tiled/catalog/orm.py +++ b/tiled/catalog/orm.py @@ -1,3 +1,5 @@ +from typing import List + from sqlalchemy import ( JSON, Boolean, @@ -7,11 +9,10 @@ ForeignKey, Index, Integer, - Table, Unicode, ) from sqlalchemy.dialects.postgresql import JSONB -from sqlalchemy.orm import relationship +from sqlalchemy.orm import Mapped, mapped_column, relationship from sqlalchemy.schema import UniqueConstraint from sqlalchemy.sql import func @@ -97,20 +98,49 @@ class Node(Timestamped, Base): ) -data_source_asset_association_table = Table( - "data_source_asset_association", - Base.metadata, - Column( - "data_source_id", - Integer, +class DataSourceAssetAssociation(Base): + """ + This describes which Assets are used by which DataSources, and how. + + The 'parameter' describes which argument to pass the asset to in when + constructing the Adapter. If 'parameter' is NULL then the asset is an + indirect dependency, such as a HDF5 data file backing an HDF5 'master' + file. + + If 'num' is NULL, the asset is passed as a scalar value, and there must be + only one for the given 'parameter'. If 'num' is not NULL, all the assets + sharing the same 'parameter' (there may be one or more) will be passed in + as a list, ordered in ascending order of 'num'. + """ + + __tablename__ = "data_source_asset_association" + + data_source_id: Mapped[int] = mapped_column( ForeignKey("data_sources.id", ondelete="CASCADE"), - ), - Column( - "asset_id", - Integer, + primary_key=True, + ) + asset_id: Mapped[int] = mapped_column( ForeignKey("assets.id", ondelete="CASCADE"), - ), -) + primary_key=True, + ) + parameter = Column(Unicode(255), nullable=True) + num = Column(Integer, nullable=True) + + data_source: Mapped["DataSource"] = relationship( + back_populates="asset_associations" + ) + asset: Mapped["Asset"] = relationship(back_populates="data_source_associations") + + # TODO We should additionally ensure that, if there is a row with some + # parameter P and num NULL, that there can be no rows with parameter P and + # num . This may be possible with a trigger. + __table_args__ = ( + UniqueConstraint( + "parameter", + "num", + name="parameter_num_unique_constraint", + ), + ) class DataSource(Timestamped, Base): @@ -129,7 +159,7 @@ class DataSource(Timestamped, Base): __tablename__ = "data_sources" __mapper_args__ = {"eager_defaults": True} - id = Column(Integer, primary_key=True, index=True, autoincrement=True) + id: Mapped[int] = mapped_column(primary_key=True, index=True, autoincrement=True) node_id = Column( Integer, ForeignKey("nodes.id", ondelete="CASCADE"), nullable=False ) @@ -142,10 +172,15 @@ class DataSource(Timestamped, Base): # This relates to the mutability of the data. 
management = Column(Enum(Management), nullable=False) - assets = relationship( - "Asset", - secondary=data_source_asset_association_table, + # many-to-many relationship to DataSource, bypassing the `Association` class + assets: Mapped[List["Asset"]] = relationship( + secondary="data_source_asset_association", back_populates="data_sources", + viewonly=True, + ) + # association between Asset -> Association -> DataSource + asset_associations: Mapped[List["DataSourceAssetAssociation"]] = relationship( + back_populates="data_source", cascade="all, delete", lazy="selectin", ) @@ -159,7 +194,7 @@ class Asset(Timestamped, Base): __tablename__ = "assets" __mapper_args__ = {"eager_defaults": True} - id = Column(Integer, primary_key=True, index=True, autoincrement=True) + id: Mapped[int] = mapped_column(primary_key=True, index=True, autoincrement=True) data_uri = Column(Unicode(1023), index=True, unique=True) is_directory = Column(Boolean, nullable=False) @@ -167,10 +202,15 @@ class Asset(Timestamped, Base): hash_content = Column(Unicode(1023), nullable=True) size = Column(Integer, nullable=True) - data_sources = relationship( - "DataSource", - secondary=data_source_asset_association_table, + # # many-to-many relationship to Asset, bypassing the `Association` class + data_sources: Mapped[List["DataSource"]] = relationship( + secondary="data_source_asset_association", back_populates="assets", + viewonly=True, + ) + # association between DataSource -> Association -> Asset + data_source_associations: Mapped[List["DataSourceAssetAssociation"]] = relationship( + back_populates="asset", passive_deletes=True, ) From a4bd9af2d7f5f9f3f43a49cb90f98670c394603a Mon Sep 17 00:00:00 2001 From: Dan Allan Date: Sun, 15 Oct 2023 11:58:08 -0400 Subject: [PATCH 02/38] Tweak construction --- tiled/catalog/orm.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/tiled/catalog/orm.py b/tiled/catalog/orm.py index 48f94edbb..e3b18a7e8 100644 --- a/tiled/catalog/orm.py +++ b/tiled/catalog/orm.py @@ -176,13 +176,13 @@ class DataSource(Timestamped, Base): assets: Mapped[List["Asset"]] = relationship( secondary="data_source_asset_association", back_populates="data_sources", + cascade="all, delete", + lazy="selectin", viewonly=True, ) # association between Asset -> Association -> DataSource asset_associations: Mapped[List["DataSourceAssetAssociation"]] = relationship( back_populates="data_source", - cascade="all, delete", - lazy="selectin", ) @@ -211,7 +211,6 @@ class Asset(Timestamped, Base): # association between DataSource -> Association -> Asset data_source_associations: Mapped[List["DataSourceAssetAssociation"]] = relationship( back_populates="asset", - passive_deletes=True, ) From 5fd277c008a0c9b7a558a76393c88fb2742a3a27 Mon Sep 17 00:00:00 2001 From: Dan Allan Date: Wed, 18 Oct 2023 07:05:21 -0400 Subject: [PATCH 03/38] WIP --- tiled/_tests/test_catalog.py | 12 ++++++++++-- tiled/adapters/csv.py | 4 ++-- tiled/adapters/tiff.py | 6 +++--- tiled/catalog/register.py | 12 ++++++++---- tiled/server/schemas.py | 21 +++++++++++++++++++-- 5 files changed, 42 insertions(+), 13 deletions(-) diff --git a/tiled/_tests/test_catalog.py b/tiled/_tests/test_catalog.py index d1d9136a5..29d8c1d1a 100644 --- a/tiled/_tests/test_catalog.py +++ b/tiled/_tests/test_catalog.py @@ -22,7 +22,7 @@ from ..client.xarray import write_xarray_dataset from ..queries import Eq, Key from ..server.app import build_app, build_app_from_config -from ..server.schemas import Asset, DataSource +from ..server.schemas import Asset, DataSource, 
DataSourceAssetAssociation from ..structures.core import StructureFamily from .utils import enter_password @@ -211,7 +211,15 @@ async def test_write_array_external(a, tmpdir): structure=structure, parameters={}, management="external", - assets=[Asset(data_uri=str(ensure_uri(filepath)), is_directory=False)], + assets=[ + DataSourceAssetAssociation( + parameter="filepath", + num=None, + asset=Asset( + data_uri=str(ensure_uri(filepath)), is_directory=False + ), + ) + ], ) ], ) diff --git a/tiled/adapters/csv.py b/tiled/adapters/csv.py index 717a2858b..d517e98ea 100644 --- a/tiled/adapters/csv.py +++ b/tiled/adapters/csv.py @@ -5,7 +5,7 @@ def read_csv( - *args, + filepath, structure=None, metadata=None, specs=None, @@ -25,7 +25,7 @@ def read_csv( >>> read_csv("myfiles.*.csv") >>> read_csv("s3://bucket/myfiles.*.csv") """ - ddf = dask.dataframe.read_csv(*args, **kwargs) + ddf = dask.dataframe.read_csv(filepath, **kwargs) # If an instance has previously been created using the same parameters, # then we are here because the caller wants a *fresh* view on this data. # Therefore, we should clear any cached data. diff --git a/tiled/adapters/tiff.py b/tiled/adapters/tiff.py index f78e8a2da..812c189ab 100644 --- a/tiled/adapters/tiff.py +++ b/tiled/adapters/tiff.py @@ -22,15 +22,15 @@ class TiffAdapter: def __init__( self, - path, + filepath, *, structure=None, metadata=None, specs=None, access_policy=None, ): - self._file = tifffile.TiffFile(path) - self._cache_key = (type(self).__module__, type(self).__qualname__, path) + self._file = tifffile.TiffFile(filepath) + self._cache_key = (type(self).__module__, type(self).__qualname__, filepath) self.specs = specs or [] self._provided_metadata = metadata or {} self.access_policy = access_policy diff --git a/tiled/catalog/register.py b/tiled/catalog/register.py index 3baacbad8..3f525111d 100644 --- a/tiled/catalog/register.py +++ b/tiled/catalog/register.py @@ -309,10 +309,14 @@ async def register_single_item( parameters={}, management=Management.external, assets=[ - Asset( - data_uri=str(ensure_uri(str(item.absolute()))), - is_directory=is_directory, - ) + { + "parameter": "filepath", + "num": None, + "asset": Asset( + data_uri=str(ensure_uri(str(item.absolute()))), + is_directory=is_directory, + ), + } ], ) ], diff --git a/tiled/server/schemas.py b/tiled/server/schemas.py index 40258fa5f..d1b342057 100644 --- a/tiled/server/schemas.py +++ b/tiled/server/schemas.py @@ -120,6 +120,20 @@ def from_orm(cls, orm): ) +class DataSourceAssetAssociation(pydantic.BaseModel): + parameter: str + num: Optional[int] + asset: Asset + + @classmethod + def from_orm(cls, orm): + return cls( + parameter=orm.parameter, + num=orm.num, + asset=Asset.from_orm(orm.asset), + ) + + class DataSource(pydantic.BaseModel): id: Optional[int] = None structure: Optional[ @@ -133,7 +147,7 @@ class DataSource(pydantic.BaseModel): ] = None mimetype: Optional[str] = None parameters: dict = {} - assets: List[Asset] = [] + assets: List[DataSourceAssetAssociation] = [] management: Management = Management.writable @classmethod @@ -143,7 +157,10 @@ def from_orm(cls, orm): structure=orm.structure, mimetype=orm.mimetype, parameters=orm.parameters, - assets=[Asset.from_orm(asset) for asset in orm.assets], + assets=[ + DataSourceAssetAssociation.from_orm(assoc) + for assoc in orm.assets_associations + ], management=orm.management, ) From 1fd1f98be855ed03f342754f18311eac6e6dd730 Mon Sep 17 00:00:00 2001 From: Dan Allan Date: Wed, 18 Oct 2023 12:10:42 -0400 Subject: [PATCH 04/38] Mostly works. 
Parameter names need adjustment. --- tiled/_tests/test_catalog.py | 20 ++++++++++----- tiled/adapters/awkward_buffers.py | 2 +- tiled/adapters/excel.py | 22 +++------------- tiled/adapters/parquet.py | 6 +++-- tiled/adapters/sparse_blocks_parquet.py | 6 +++-- tiled/adapters/tiff.py | 4 +-- tiled/adapters/zarr.py | 1 + tiled/catalog/adapter.py | 34 ++++++++++++++++--------- tiled/catalog/orm.py | 7 ++++- tiled/catalog/register.py | 19 +++++++------- tiled/server/schemas.py | 31 ++++++++-------------- 11 files changed, 76 insertions(+), 76 deletions(-) diff --git a/tiled/_tests/test_catalog.py b/tiled/_tests/test_catalog.py index 29d8c1d1a..5bdff209d 100644 --- a/tiled/_tests/test_catalog.py +++ b/tiled/_tests/test_catalog.py @@ -21,8 +21,8 @@ from ..client import Context, from_context from ..client.xarray import write_xarray_dataset from ..queries import Eq, Key -from ..server.app import build_app, build_app_from_config -from ..server.schemas import Asset, DataSource, DataSourceAssetAssociation +from ..server.app import build_app +from ..server.schemas import Asset, DataSource from ..structures.core import StructureFamily from .utils import enter_password @@ -212,12 +212,11 @@ async def test_write_array_external(a, tmpdir): parameters={}, management="external", assets=[ - DataSourceAssetAssociation( + Asset( parameter="filepath", num=None, - asset=Asset( - data_uri=str(ensure_uri(filepath)), is_directory=False - ), + data_uri=str(ensure_uri(filepath)), + is_directory=False, ) ], ) @@ -244,7 +243,14 @@ async def test_write_dataframe_external_direct(a, tmpdir): structure=structure, parameters={}, management="external", - assets=[Asset(data_uri=str(ensure_uri(filepath)), is_directory=False)], + assets=[ + Asset( + parameter="filepath", + num=None, + data_uri=str(ensure_uri(filepath)), + is_directory=False, + ) + ], ) ], ) diff --git a/tiled/adapters/awkward_buffers.py b/tiled/adapters/awkward_buffers.py index 06f7e2936..6104716fa 100644 --- a/tiled/adapters/awkward_buffers.py +++ b/tiled/adapters/awkward_buffers.py @@ -42,7 +42,7 @@ def init_storage(cls, directory, structure): directory.mkdir(parents=True, exist_ok=True) data_uri = parse.urlunparse(("file", "localhost", str(directory), "", "", None)) - return [Asset(data_uri=data_uri, is_directory=True)] + return [Asset(data_uri=data_uri, is_directory=True, parameter="directory")] @classmethod def from_directory( diff --git a/tiled/adapters/excel.py b/tiled/adapters/excel.py index 9ca17b7ee..b5f194a60 100644 --- a/tiled/adapters/excel.py +++ b/tiled/adapters/excel.py @@ -8,7 +8,7 @@ class ExcelAdapter(MapAdapter): @classmethod - def from_file(cls, file, **kwargs): + def from_file(cls, filepath, **kwargs): """ Read the sheets in an Excel file. @@ -18,32 +18,16 @@ def from_file(cls, file, **kwargs): Examples -------- - Given a file path - >>> ExcelAdapter.from_file("path/to/excel_file.xlsx") - - Given a file object - - >>> file = open("path/to/excel_file.xlsx") - >>> ExcelAdapter.from_file(file) - - Given a pandas.ExcelFile object - - >>> import pandas - >>> ef = pandas.ExcelFile(file) - >>> ExcelAdapter.from_file(ef) """ - if isinstance(file, pandas.ExcelFile): - excel_file = file - else: - excel_file = pandas.ExcelFile(file) + excel_file = pandas.ExcelFile(filepath) # If an instance has previously been created using the same parameters, # then we are here because the caller wants a *fresh* view on this data. # Therefore, we should clear any cached data. 
cache = get_object_cache() mapping = {} for sheet_name in excel_file.sheet_names: - cache_key = (cls.__module__, cls.__qualname__, file, sheet_name) + cache_key = (cls.__module__, cls.__qualname__, filepath, sheet_name) ddf = dask.dataframe.from_pandas( with_object_cache(cache_key, excel_file.parse, sheet_name), npartitions=1, # TODO Be smarter about this. diff --git a/tiled/adapters/parquet.py b/tiled/adapters/parquet.py index 3171a7ae7..25d871a35 100644 --- a/tiled/adapters/parquet.py +++ b/tiled/adapters/parquet.py @@ -12,13 +12,13 @@ class ParquetDatasetAdapter: def __init__( self, - *partition_paths, + uris, structure, metadata=None, specs=None, access_policy=None, ): - self.partition_paths = sorted(partition_paths) + self.partition_paths = uris self._metadata = metadata or {} self._structure = structure self.specs = list(specs or []) @@ -48,6 +48,8 @@ def init_storage(cls, directory, structure): Asset( data_uri=f"{data_uri}/partition-{i}.parquet", is_directory=False, + parameter="uris", + num=i, ) for i in range(structure.npartitions) ] diff --git a/tiled/adapters/sparse_blocks_parquet.py b/tiled/adapters/sparse_blocks_parquet.py index f7fc121fc..671493a48 100644 --- a/tiled/adapters/sparse_blocks_parquet.py +++ b/tiled/adapters/sparse_blocks_parquet.py @@ -23,7 +23,7 @@ class SparseBlocksParquetAdapter: def __init__( self, - *block_uris, + block_uris, structure, metadata=None, specs=None, @@ -58,8 +58,10 @@ def init_storage( Asset( data_uri=uri, is_directory=False, + parameter="block_uris", + num=i, ) - for uri in block_uris + for i, uri in enumerate(block_uris) ] return assets diff --git a/tiled/adapters/tiff.py b/tiled/adapters/tiff.py index 812c189ab..8eede8c46 100644 --- a/tiled/adapters/tiff.py +++ b/tiled/adapters/tiff.py @@ -85,13 +85,13 @@ class TiffSequenceAdapter: @classmethod def from_files( cls, - *files, + filepaths, structure=None, metadata=None, specs=None, access_policy=None, ): - seq = tifffile.TiffSequence(sorted(files)) + seq = tifffile.TiffSequence(filepaths) return cls( seq, structure=structure, diff --git a/tiled/adapters/zarr.py b/tiled/adapters/zarr.py index 73ba58046..e5c367110 100644 --- a/tiled/adapters/zarr.py +++ b/tiled/adapters/zarr.py @@ -48,6 +48,7 @@ def init_storage(cls, directory, structure): Asset( data_uri=data_uri, is_directory=True, + parameter="filepath", ) ] diff --git a/tiled/catalog/adapter.py b/tiled/catalog/adapter.py index 81b16996d..0c53911fd 100644 --- a/tiled/catalog/adapter.py +++ b/tiled/catalog/adapter.py @@ -390,8 +390,13 @@ async def get_adapter(self): raise RuntimeError( f"Server configuration has no adapter for mimetype {data_source.mimetype!r}" ) - data_uris = [httpx.URL(asset.data_uri) for asset in data_source.assets] - for data_uri in data_uris: + parameters = collections.defaultdict(list) + for asset in data_source.assets: + data_uri = httpx.URL(asset.data_uri) + if data_uri.scheme != "file": + raise NotImplementedError( + f"Only 'file://...' scheme URLs are currently supported, not {data_uri!r}" + ) if data_uri.scheme == "file": # Protect against misbehaving clients reading from unintended # parts of the filesystem. @@ -407,20 +412,20 @@ async def get_adapter(self): f"Refusing to serve {data_uri} because it is outside " "the readable storage area for this server." ) - paths = [] - for data_uri in data_uris: - if data_uri.scheme != "file": - raise NotImplementedError( - f"Only 'file://...' 
scheme URLs are currently supported, not {data_uri!r}" - ) - paths.append(safe_path(data_uri)) - adapter_kwargs = dict(data_source.parameters) + path = safe_path(data_uri) + if asset.num is None: + parameters[asset.parameter] = path + else: + # TODO Order these in SQL. + parameters[asset.parameter].append(path) + adapter_kwargs = dict(parameters) + adapter_kwargs.update(data_source.parameters) adapter_kwargs["specs"] = self.node.specs adapter_kwargs["metadata"] = self.node.metadata_ adapter_kwargs["structure"] = data_source.structure adapter_kwargs["access_policy"] = self.access_policy adapter = await anyio.to_thread.run_sync( - partial(adapter_factory, *paths, **adapter_kwargs) + partial(adapter_factory, **adapter_kwargs) ) for query in self.queries: adapter = adapter.search(query) @@ -581,7 +586,12 @@ async def create_node( data_uri=asset.data_uri, is_directory=asset.is_directory, ) - data_source_orm.assets.append(asset_orm) + assoc_orm = orm.DataSourceAssetAssociation( + asset=asset_orm, + parameter=asset.parameter, + num=asset.num, + ) + data_source_orm.asset_associations.append(assoc_orm) db.add(node) await db.commit() await db.refresh(node) diff --git a/tiled/catalog/orm.py b/tiled/catalog/orm.py index e3b18a7e8..e78e5d6d9 100644 --- a/tiled/catalog/orm.py +++ b/tiled/catalog/orm.py @@ -129,13 +129,17 @@ class DataSourceAssetAssociation(Base): data_source: Mapped["DataSource"] = relationship( back_populates="asset_associations" ) - asset: Mapped["Asset"] = relationship(back_populates="data_source_associations") + asset: Mapped["Asset"] = relationship( + back_populates="data_source_associations", lazy="selectin" + ) # TODO We should additionally ensure that, if there is a row with some # parameter P and num NULL, that there can be no rows with parameter P and # num . This may be possible with a trigger. 
__table_args__ = ( UniqueConstraint( + "data_source_id", + "asset_id", "parameter", "num", name="parameter_num_unique_constraint", @@ -183,6 +187,7 @@ class DataSource(Timestamped, Base): # association between Asset -> Association -> DataSource asset_associations: Mapped[List["DataSourceAssetAssociation"]] = relationship( back_populates="data_source", + lazy="selectin", ) diff --git a/tiled/catalog/register.py b/tiled/catalog/register.py index 3f525111d..0f2315331 100644 --- a/tiled/catalog/register.py +++ b/tiled/catalog/register.py @@ -309,14 +309,11 @@ async def register_single_item( parameters={}, management=Management.external, assets=[ - { - "parameter": "filepath", - "num": None, - "asset": Asset( - data_uri=str(ensure_uri(str(item.absolute()))), - is_directory=is_directory, - ), - } + Asset( + data_uri=str(ensure_uri(str(item.absolute()))), + is_directory=is_directory, + parameter="filepath", + ) ], ) ], @@ -365,7 +362,7 @@ async def tiff_sequence( adapter_class = settings.adapters_by_mimetype[mimetype] key = settings.key_from_filename(name) try: - adapter = adapter_class(*sequence) + adapter = adapter_class(sequence) except Exception: logger.exception(" SKIPPED: Error constructing adapter for '%s'", name) return @@ -385,8 +382,10 @@ async def tiff_sequence( Asset( data_uri=str(ensure_uri(str(item.absolute()))), is_directory=False, + parameter="filepaths", + num=i, ) - for item in sorted(sequence) + for i, item in enumerate(sorted(sequence)) ], ) ], diff --git a/tiled/server/schemas.py b/tiled/server/schemas.py index d1b342057..b40da5682 100644 --- a/tiled/server/schemas.py +++ b/tiled/server/schemas.py @@ -88,11 +88,19 @@ class Spec(pydantic.BaseModel, extra=pydantic.Extra.forbid, frozen=True): class Asset(pydantic.BaseModel): data_uri: str is_directory: bool + parameter: Optional[str] + num: Optional[int] = None id: Optional[int] = None @classmethod def from_orm(cls, orm): - return cls(id=orm.id, data_uri=orm.data_uri, is_directory=orm.is_directory) + return cls( + data_uri=orm.asset.data_uri, + is_directory=orm.asset.is_directory, + parameter=orm.parameter, + num=orm.num, + id=orm.asset.id, + ) class Management(str, enum.Enum): @@ -120,20 +128,6 @@ def from_orm(cls, orm): ) -class DataSourceAssetAssociation(pydantic.BaseModel): - parameter: str - num: Optional[int] - asset: Asset - - @classmethod - def from_orm(cls, orm): - return cls( - parameter=orm.parameter, - num=orm.num, - asset=Asset.from_orm(orm.asset), - ) - - class DataSource(pydantic.BaseModel): id: Optional[int] = None structure: Optional[ @@ -147,7 +141,7 @@ class DataSource(pydantic.BaseModel): ] = None mimetype: Optional[str] = None parameters: dict = {} - assets: List[DataSourceAssetAssociation] = [] + assets: List[Asset] = [] management: Management = Management.writable @classmethod @@ -157,10 +151,7 @@ def from_orm(cls, orm): structure=orm.structure, mimetype=orm.mimetype, parameters=orm.parameters, - assets=[ - DataSourceAssetAssociation.from_orm(assoc) - for assoc in orm.assets_associations - ], + assets=[Asset.from_orm(assoc) for assoc in orm.asset_associations], management=orm.management, ) From cc49944de522047ee52a284c6cdec1e6e6554c6f Mon Sep 17 00:00:00 2001 From: Dan Allan Date: Wed, 18 Oct 2023 17:53:08 -0400 Subject: [PATCH 05/38] Add dedicated filepath-based constructors. 
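
The catalog can only hand an adapter the path of a registered file, so
adapters that previously accepted open file handles gain a dedicated
filepath-based constructor, and the mimetype table points at it. Roughly,
the intended call pattern looks like this (file paths here are
illustrative):

    import pandas
    from tiled.adapters.excel import ExcelAdapter
    from tiled.adapters.hdf5 import HDF5Adapter

    # Interactive use can still pass an open pandas.ExcelFile...
    ef = pandas.ExcelFile("example.xlsx")
    ExcelAdapter.from_file(ef)

    # ...while the catalog, which knows only the registered path,
    # uses the new filepath-based entry points.
    ExcelAdapter.from_filepath("example.xlsx")
    HDF5Adapter.from_filepath("example.h5")
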
--- tiled/adapters/excel.py | 38 ++++++++++++++++++++++++++++++++++---- tiled/adapters/hdf5.py | 19 ++++++++++++++++--- tiled/adapters/tiff.py | 2 +- tiled/catalog/mimetypes.py | 6 +++--- 4 files changed, 54 insertions(+), 11 deletions(-) diff --git a/tiled/adapters/excel.py b/tiled/adapters/excel.py index b5f194a60..24adc81ae 100644 --- a/tiled/adapters/excel.py +++ b/tiled/adapters/excel.py @@ -8,7 +8,7 @@ class ExcelAdapter(MapAdapter): @classmethod - def from_file(cls, filepath, **kwargs): + def from_file(cls, file, **kwargs): """ Read the sheets in an Excel file. @@ -18,16 +18,28 @@ def from_file(cls, filepath, **kwargs): Examples -------- - >>> ExcelAdapter.from_file("path/to/excel_file.xlsx") + Given a file object + + >>> file = open("path/to/excel_file.xlsx") + >>> ExcelAdapter.from_file(file) + + Given a pandas.ExcelFile object + + >>> import pandas + >>> ef = pandas.ExcelFile(filepath) + >>> ExcelAdapter.from_file(ef) """ - excel_file = pandas.ExcelFile(filepath) + if isinstance(file, pandas.ExcelFile): + excel_file = file + else: + excel_file = pandas.ExcelFile(file) # If an instance has previously been created using the same parameters, # then we are here because the caller wants a *fresh* view on this data. # Therefore, we should clear any cached data. cache = get_object_cache() mapping = {} for sheet_name in excel_file.sheet_names: - cache_key = (cls.__module__, cls.__qualname__, filepath, sheet_name) + cache_key = (cls.__module__, cls.__qualname__, file, sheet_name) ddf = dask.dataframe.from_pandas( with_object_cache(cache_key, excel_file.parse, sheet_name), npartitions=1, # TODO Be smarter about this. @@ -37,3 +49,21 @@ def from_file(cls, filepath, **kwargs): cache.discard_dask(ddf.__dask_keys__()) # dask tasks mapping[sheet_name] = DataFrameAdapter.from_dask_dataframe(ddf) return cls(mapping, **kwargs) + + @classmethod + def from_filepath(cls, filepath, **kwargs): + """ + Read the sheets in an Excel file. + + This maps the Excel file, which may contain one of more spreadsheets, + onto a tree of tabular structures. 
+ + Examples + -------- + + Given a file path + + >>> ExcelAdapter.from_file("path/to/excel_file.xlsx") + """ + file = pandas.ExcelFile(filepath) + return cls.from_file(file) diff --git a/tiled/adapters/hdf5.py b/tiled/adapters/hdf5.py index c767cbe5b..311226e3e 100644 --- a/tiled/adapters/hdf5.py +++ b/tiled/adapters/hdf5.py @@ -31,7 +31,7 @@ class HDF5Adapter(collections.abc.Mapping, IndexersMixin): From the root node of a file given a filepath >>> import h5py - >>> HDF5Adapter.from_file("path/to/file.h5") + >>> HDF5Adapter.from_filepath("path/to/file.h5") From the root node of a file given an h5py.File object @@ -70,10 +70,23 @@ def from_file( specs=None, access_policy=None, ): - if not isinstance(file, h5py.File): - file = h5py.File(file, "r", swmr=swmr, libver=libver) return cls(file, metadata=metadata, specs=specs, access_policy=access_policy) + @classmethod + def from_filepath( + cls, + filepath, + *, + structure=None, + metadata=None, + swmr=SWMR_DEFAULT, + libver="latest", + specs=None, + access_policy=None, + ): + file = h5py.File(filepath, "r", swmr=swmr, libver=libver) + return cls.from_file(file) + def __repr__(self): return node_repr(self, list(self)) diff --git a/tiled/adapters/tiff.py b/tiled/adapters/tiff.py index 8eede8c46..40fd058b5 100644 --- a/tiled/adapters/tiff.py +++ b/tiled/adapters/tiff.py @@ -83,7 +83,7 @@ class TiffSequenceAdapter: structure_family = "array" @classmethod - def from_files( + def from_filepaths( cls, filepaths, structure=None, diff --git a/tiled/catalog/mimetypes.py b/tiled/catalog/mimetypes.py index 0d99a7660..12f00a32d 100644 --- a/tiled/catalog/mimetypes.py +++ b/tiled/catalog/mimetypes.py @@ -17,16 +17,16 @@ ).TiffAdapter, "multipart/related;type=image/tiff": lambda: importlib.import_module( "...adapters.tiff", __name__ - ).TiffSequenceAdapter.from_files, + ).TiffSequenceAdapter.from_filepaths, "text/csv": lambda: importlib.import_module( "...adapters.csv", __name__ ).read_csv, XLSX_MIME_TYPE: lambda: importlib.import_module( "...adapters.excel", __name__ - ).ExcelAdapter.from_file, + ).ExcelAdapter.from_filepath, "application/x-hdf5": lambda: importlib.import_module( "...adapters.hdf5", __name__ - ).HDF5Adapter.from_file, + ).HDF5Adapter.from_filepath, "application/x-netcdf": lambda: importlib.import_module( "...adapters.netcdf", __name__ ).read_netcdf, From 4df4af9b4bd62d959bbcb6b895c833d70ed4aa22 Mon Sep 17 00:00:00 2001 From: Dan Allan Date: Wed, 18 Oct 2023 18:22:48 -0400 Subject: [PATCH 06/38] Sort assets by (parameter, num). --- tiled/catalog/orm.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tiled/catalog/orm.py b/tiled/catalog/orm.py index e78e5d6d9..b3ac91419 100644 --- a/tiled/catalog/orm.py +++ b/tiled/catalog/orm.py @@ -188,6 +188,7 @@ class DataSource(Timestamped, Base): asset_associations: Mapped[List["DataSourceAssetAssociation"]] = relationship( back_populates="data_source", lazy="selectin", + order_by=[DataSourceAssetAssociation.parameter, DataSourceAssetAssociation.num], ) From 40571c8b2f64f58aa10e6cb7cad700b73a890a8f Mon Sep 17 00:00:00 2001 From: Dan Allan Date: Thu, 19 Oct 2023 12:32:06 -0400 Subject: [PATCH 07/38] Test registering TIFF seq out of alphanumeric order. 
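
Ordering of a sequence is now carried by the association table's 'num'
column rather than by sorting paths inside the adapter, so a sequence can
be registered in an explicit, non-alphanumeric order and read back in that
same order. A minimal sketch of how a walker declares an ordered sequence
(frame paths are illustrative; Asset and ensure_uri are used as elsewhere
in this series):

    from tiled.catalog.utils import ensure_uri
    from tiled.server.schemas import Asset

    # Desired frame order, deliberately not alphanumeric.
    frames = ["frame_00002.tif", "frame_00000.tif", "frame_00001.tif"]
    assets = [
        Asset(
            data_uri=str(ensure_uri(path)),
            is_directory=False,
            parameter="filepaths",
            num=i,  # assets sharing a parameter are passed as a list ordered by num
        )
        for i, path in enumerate(frames)
    ]
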
--- tiled/_tests/test_directory_walker.py | 124 ++++++++++++++++++++++---- tiled/catalog/adapter.py | 3 + tiled/catalog/register.py | 78 ++++++++-------- 3 files changed, 153 insertions(+), 52 deletions(-) diff --git a/tiled/_tests/test_directory_walker.py b/tiled/_tests/test_directory_walker.py index f6799741c..ed48b4741 100644 --- a/tiled/_tests/test_directory_walker.py +++ b/tiled/_tests/test_directory_walker.py @@ -1,20 +1,30 @@ +import dataclasses import platform +import random from pathlib import Path +import numpy import pytest import tifffile +import yaml +from ..adapters.tiff import TiffAdapter from ..catalog import in_memory from ..catalog.register import ( + Settings, + create_node_safe, + group_tiff_sequences, identity, register, + register_tiff_sequence, skip_all, strip_suffixes, - tiff_sequence, ) +from ..catalog.utils import ensure_uri from ..client import Context, from_context from ..examples.generate_files import data, df1, generate_files from ..server.app import build_app +from ..server.schemas import Asset, DataSource, Management @pytest.fixture @@ -41,9 +51,9 @@ async def test_collision(example_data_dir, tmpdir): p = Path(example_data_dir, "a.tiff") tifffile.imwrite(str(p), data) - tree = in_memory() - with Context.from_app(build_app(tree)) as context: - await register(tree, example_data_dir) + catalog = in_memory(writable_storage=tmpdir) + with Context.from_app(build_app(catalog)) as context: + await register(catalog, example_data_dir) client = from_context(context) @@ -54,7 +64,7 @@ async def test_collision(example_data_dir, tmpdir): p.unlink() # Re-run registration; entry should be there now. - await register(tree, example_data_dir) + await register(catalog, example_data_dir) assert "a" in client @@ -73,9 +83,9 @@ async def test_same_filename_separate_directory(tmpdir): Path(tmpdir, "two").mkdir() df1.to_csv(Path(tmpdir, "one", "a.csv")) df1.to_csv(Path(tmpdir, "two", "a.csv")) - tree = in_memory() - with Context.from_app(build_app(tree)) as context: - await register(tree, tmpdir) + catalog = in_memory(writable_storage=tmpdir) + with Context.from_app(build_app(catalog)) as context: + await register(catalog, tmpdir) client = from_context(context) assert "a" in client["one"] assert "a" in client["two"] @@ -108,10 +118,10 @@ def detect_mimetype(path, mimetype): return "text/csv" return mimetype - tree = in_memory() - with Context.from_app(build_app(tree)) as context: + catalog = in_memory(writable_storage=tmpdir) + with Context.from_app(build_app(catalog)) as context: await register( - tree, + catalog, tmpdir, mimetype_detection_hook=detect_mimetype, key_from_filename=identity, @@ -130,18 +140,100 @@ async def test_skip_all_in_combination(tmpdir): for i in range(2): tifffile.imwrite(Path(tmpdir, "one", f"image{i:05}.tif"), data) - tree = in_memory() + catalog = in_memory(writable_storage=tmpdir) # By default, both file and tiff sequence are registered. 
- with Context.from_app(build_app(tree)) as context: - await register(tree, tmpdir) + with Context.from_app(build_app(catalog)) as context: + await register(catalog, tmpdir) client = from_context(context) assert "a" in client assert "a" in client["one"] assert "image" in client["one"] # With skip_all, directories and tiff sequence are registered, but individual files are not - with Context.from_app(build_app(tree)) as context: - await register(tree, tmpdir, walkers=[tiff_sequence, skip_all]) + with Context.from_app(build_app(catalog)) as context: + await register(catalog, tmpdir, walkers=[group_tiff_sequences, skip_all]) client = from_context(context) assert list(client) == ["one"] assert "image" in client["one"] + + +@pytest.mark.asyncio +async def test_tiff_seq_custom_sorting(tmpdir): + "Register TIFFs that are not in alphanumeric order." + N = 10 + ordering = list(range(N)) + random.Random(0).shuffle(ordering) + files = [] + for i in ordering: + file = Path(tmpdir, f"image{i:05}.tif") + files.append(file) + tifffile.imwrite(file, i * data) + + settings = Settings.init() + catalog = in_memory(writable_storage=tmpdir) + with Context.from_app(build_app(catalog)) as context: + await register_tiff_sequence( + catalog, + "image", + files, + settings, + ) + client = from_context(context) + actual = list(client["image"][:, 0, 0]) + assert actual == ordering + + +@pytest.mark.asyncio +async def test_image_file_with_sidecar_metadata_file(tmpdir): + "Create one Node from two different types of files." + MIMETYPE = "multipart/related;type=application/x-tiff-with-yaml" + image_filepath = Path(tmpdir, "image.tif") + tifffile.imwrite(image_filepath, data) + metadata_filepath = Path(tmpdir, "metadata.yml") + metadata = {"test_key": 3.0} + with open(metadata_filepath, "w") as file: + yaml.dump(metadata, file) + + def read_tiff_with_yaml_metadata( + image_filepath, metadata_filepath, metadata=None, **kwargs + ): + with open(metadata_filepath) as file: + metadata = yaml.safe_load(file) + return TiffAdapter(image_filepath, metadata=metadata, **kwargs) + + catalog = in_memory( + writable_storage=tmpdir, + adapters_by_mimetype={MIMETYPE: read_tiff_with_yaml_metadata}, + ) + with Context.from_app(build_app(catalog)) as context: + adapter = read_tiff_with_yaml_metadata(image_filepath, metadata_filepath) + await create_node_safe( + catalog, + key="image", + structure_family=adapter.structure_family, + metadata=dict(adapter.metadata()), + specs=adapter.specs, + data_sources=[ + DataSource( + mimetype=MIMETYPE, + structure=dataclasses.asdict(adapter.structure()), + parameters={}, + management=Management.external, + assets=[ + Asset( + data_uri=str(ensure_uri(str(metadata_filepath))), + is_directory=False, + parameter="metadata_filepath", + ), + Asset( + data_uri=str(ensure_uri(str(image_filepath))), + is_directory=False, + parameter="image_filepath", + ), + ], + ) + ], + ) + client = from_context(context) + assert numpy.array_equal(data, client["image"][:]) + assert client["image"].metadata["test_key"] == 3.0 diff --git a/tiled/catalog/adapter.py b/tiled/catalog/adapter.py index 0c53911fd..cc444703d 100644 --- a/tiled/catalog/adapter.py +++ b/tiled/catalog/adapter.py @@ -1069,12 +1069,14 @@ def structure_family(query, tree): def in_memory( + *, metadata=None, specs=None, access_policy=None, writable_storage=None, readable_storage=None, echo=DEFAULT_ECHO, + adapters_by_mimetype=None, ): uri = "sqlite+aiosqlite:///:memory:" return from_uri( @@ -1085,6 +1087,7 @@ def in_memory( writable_storage=writable_storage, 
readable_storage=readable_storage, echo=echo, + adapters_by_mimetype=adapters_by_mimetype, ) diff --git a/tiled/catalog/register.py b/tiled/catalog/register.py index 0f2315331..d66c06889 100644 --- a/tiled/catalog/register.py +++ b/tiled/catalog/register.py @@ -326,7 +326,7 @@ async def register_single_item( TIFF_SEQUENCE_EMPTY_NAME_ROOT = "_unnamed" -async def tiff_sequence( +async def group_tiff_sequences( catalog, path, files, @@ -356,43 +356,49 @@ async def tiff_sequence( sequences[sequence_name].append(file) continue unhandled_files.append(file) - mimetype = "multipart/related;type=image/tiff" - for name, sequence in sorted(sequences.items()): - logger.info(" Grouped %d TIFFs into a sequence '%s'", len(sequence), name) - adapter_class = settings.adapters_by_mimetype[mimetype] - key = settings.key_from_filename(name) - try: - adapter = adapter_class(sequence) - except Exception: - logger.exception(" SKIPPED: Error constructing adapter for '%s'", name) - return - await create_node_safe( - catalog, - key=key, - structure_family=adapter.structure_family, - metadata=dict(adapter.metadata()), - specs=adapter.specs, - data_sources=[ - DataSource( - mimetype=mimetype, - structure=dict_or_none(adapter.structure()), - parameters={}, - management=Management.external, - assets=[ - Asset( - data_uri=str(ensure_uri(str(item.absolute()))), - is_directory=False, - parameter="filepaths", - num=i, - ) - for i, item in enumerate(sorted(sequence)) - ], - ) - ], - ) + for name, sequence in sequences.items(): + await register_tiff_sequence(catalog, name, sorted(sequence), settings) return unhandled_files, unhandled_directories +TIFF_SEQ_MIMETYPE = "multipart/related;type=image/tiff" + + +async def register_tiff_sequence(catalog, name, sequence, settings): + logger.info(" Grouped %d TIFFs into a sequence '%s'", len(sequence), name) + adapter_class = settings.adapters_by_mimetype[TIFF_SEQ_MIMETYPE] + key = settings.key_from_filename(name) + try: + adapter = adapter_class(sequence) + except Exception: + logger.exception(" SKIPPED: Error constructing adapter for '%s'", name) + return + await create_node_safe( + catalog, + key=key, + structure_family=adapter.structure_family, + metadata=dict(adapter.metadata()), + specs=adapter.specs, + data_sources=[ + DataSource( + mimetype=TIFF_SEQ_MIMETYPE, + structure=dict_or_none(adapter.structure()), + parameters={}, + management=Management.external, + assets=[ + Asset( + data_uri=str(ensure_uri(str(item.absolute()))), + is_directory=False, + parameter="filepaths", + num=i, + ) + for i, item in enumerate(sequence) + ], + ) + ], + ) + + async def skip_all( catalog, path, @@ -410,7 +416,7 @@ async def skip_all( return [], directories -DEFAULT_WALKERS = [tiff_sequence, one_node_per_item] +DEFAULT_WALKERS = [group_tiff_sequences, one_node_per_item] async def watch( From 88b5671ad387827f258e9373b770c82b2580fdbd Mon Sep 17 00:00:00 2001 From: Dan Allan Date: Thu, 19 Oct 2023 16:14:57 -0400 Subject: [PATCH 08/38] Test HDF5 virtual dataset. 
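
An HDF5 virtual dataset is backed by data files that the Adapter never
opens directly but that still belong to the DataSource and must be
tracked. Such assets are registered with parameter=None, marking them as
indirect dependencies: the catalog records them but does not pass them to
the Adapter. A minimal sketch of the asset list for a virtual dataset
(URIs are illustrative):

    from tiled.server.schemas import Asset

    assets = [
        # Backing data files: tracked, but not passed to the Adapter.
        Asset(data_uri="file://localhost/data/1.h5", is_directory=False, parameter=None),
        Asset(data_uri="file://localhost/data/2.h5", is_directory=False, parameter=None),
        # The 'master' file that the Adapter actually opens.
        Asset(data_uri="file://localhost/data/VDS.h5", is_directory=False, parameter="filepath"),
    ]
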
--- tiled/_tests/test_directory_walker.py | 56 +++++++++++++++++++++++++++ tiled/catalog/adapter.py | 2 + 2 files changed, 58 insertions(+) diff --git a/tiled/_tests/test_directory_walker.py b/tiled/_tests/test_directory_walker.py index ed48b4741..88cbefb8e 100644 --- a/tiled/_tests/test_directory_walker.py +++ b/tiled/_tests/test_directory_walker.py @@ -3,11 +3,13 @@ import random from pathlib import Path +import h5py import numpy import pytest import tifffile import yaml +from ..adapters.hdf5 import HDF5Adapter from ..adapters.tiff import TiffAdapter from ..catalog import in_memory from ..catalog.register import ( @@ -237,3 +239,57 @@ def read_tiff_with_yaml_metadata( client = from_context(context) assert numpy.array_equal(data, client["image"][:]) assert client["image"].metadata["test_key"] == 3.0 + + +@pytest.mark.asyncio +async def test_hdf5_virtual_datasets(tmpdir): + layout = h5py.VirtualLayout(shape=(4, 100), dtype="i4") + + data_filepaths = [] + for n in range(1, 5): + filepath = Path(tmpdir, f"{n}.h5") + data_filepaths.append(filepath) + vsource = h5py.VirtualSource(filepath, "data", shape=(100,)) + layout[n - 1] = vsource + + # Add virtual dataset to output file + filepath = Path(tmpdir, "VDS.h5") + with h5py.File(filepath, "w", libver="latest") as file: + file.create_virtual_dataset("data", layout, fillvalue=-5) + + assets = [ + Asset( + data_uri=str(ensure_uri(str(fp))), + is_directory=False, + parameter=None, # an indirect dependency + ) + for fp in data_filepaths + ] + assets.append( + Asset( + data_uri=str(ensure_uri(str(filepath))), + is_directory=False, + parameter="filepath", + ) + ) + catalog = in_memory(writable_storage=tmpdir) + with Context.from_app(build_app(catalog)) as context: + adapter = HDF5Adapter.from_filepath(filepath) + await create_node_safe( + catalog, + key="VDS", + structure_family=adapter.structure_family, + metadata=dict(adapter.metadata()), + specs=adapter.specs, + data_sources=[ + DataSource( + mimetype="application/x-hdf5", + structure=None, + parameters={}, + management=Management.external, + assets=assets, + ) + ], + ) + client = from_context(context) + client["VDS"]["data"][:] diff --git a/tiled/catalog/adapter.py b/tiled/catalog/adapter.py index cc444703d..02fba74ac 100644 --- a/tiled/catalog/adapter.py +++ b/tiled/catalog/adapter.py @@ -392,6 +392,8 @@ async def get_adapter(self): ) parameters = collections.defaultdict(list) for asset in data_source.assets: + if asset.parameter is None: + continue data_uri = httpx.URL(asset.data_uri) if data_uri.scheme != "file": raise NotImplementedError( From b49fd3c86b6af641384f4a9da3c64ff21dab1fcc Mon Sep 17 00:00:00 2001 From: Dan Allan Date: Tue, 24 Oct 2023 14:29:43 -0400 Subject: [PATCH 09/38] copyedit docstring Co-authored-by: Padraic Shafer <76011594+padraic-shafer@users.noreply.github.com> --- tiled/adapters/excel.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tiled/adapters/excel.py b/tiled/adapters/excel.py index 24adc81ae..b93b8ff2e 100644 --- a/tiled/adapters/excel.py +++ b/tiled/adapters/excel.py @@ -26,6 +26,7 @@ def from_file(cls, file, **kwargs): Given a pandas.ExcelFile object >>> import pandas + >>> filepath = "path/to/excel_file.xlsx" >>> ef = pandas.ExcelFile(filepath) >>> ExcelAdapter.from_file(ef) """ From d4b0de10b4f703bb8ec082ae5ff7dd30a02ce523 Mon Sep 17 00:00:00 2001 From: Dan Allan Date: Tue, 24 Oct 2023 14:31:18 -0400 Subject: [PATCH 10/38] Copy edit docstring Co-authored-by: Padraic Shafer <76011594+padraic-shafer@users.noreply.github.com> --- tiled/catalog/orm.py | 2 +- 1 
file changed, 1 insertion(+), 1 deletion(-) diff --git a/tiled/catalog/orm.py b/tiled/catalog/orm.py index b3ac91419..f175ef00b 100644 --- a/tiled/catalog/orm.py +++ b/tiled/catalog/orm.py @@ -102,7 +102,7 @@ class DataSourceAssetAssociation(Base): """ This describes which Assets are used by which DataSources, and how. - The 'parameter' describes which argument to pass the asset to in when + The 'parameter' describes which argument to pass the asset into when constructing the Adapter. If 'parameter' is NULL then the asset is an indirect dependency, such as a HDF5 data file backing an HDF5 'master' file. From 63d4ca6ffd09bab6fd0e32fdac013db71d4c8f2a Mon Sep 17 00:00:00 2001 From: Dan Allan Date: Tue, 7 Nov 2023 15:55:35 -0500 Subject: [PATCH 11/38] Test for parameter, num uniqueness constraints. --- tiled/_tests/test_catalog.py | 80 +++++++++++++++++++++++++++++++++++- tiled/catalog/orm.py | 1 - 2 files changed, 78 insertions(+), 3 deletions(-) diff --git a/tiled/_tests/test_catalog.py b/tiled/_tests/test_catalog.py index 5bdff209d..18d94fe6b 100644 --- a/tiled/_tests/test_catalog.py +++ b/tiled/_tests/test_catalog.py @@ -8,6 +8,7 @@ import pandas.testing import pytest import pytest_asyncio +import sqlalchemy.exc import tifffile import xarray @@ -17,12 +18,13 @@ from ..catalog import in_memory from ..catalog.adapter import WouldDeleteData from ..catalog.explain import record_explanations +from ..catalog.register import create_node_safe from ..catalog.utils import ensure_uri from ..client import Context, from_context from ..client.xarray import write_xarray_dataset from ..queries import Eq, Key -from ..server.app import build_app -from ..server.schemas import Asset, DataSource +from ..server.app import build_app, build_app_from_config +from ..server.schemas import Asset, DataSource, Management from ..structures.core import StructureFamily from .utils import enter_password @@ -425,3 +427,77 @@ async def test_access_control(tmpdir): public_client["outer_z"]["inner"].read() with pytest.raises(KeyError): public_client["outer_x"] + + +@pytest.mark.parametrize( + "assets", + [ + [ + Asset( + data_uri="file://localhost/test1", + is_directory=False, + parameter="filepath", + num=None, + ), + Asset( + data_uri="file://localhost/test2", + is_directory=False, + parameter="filepath", + num=1, + ), + ], + [ + Asset( + data_uri="file://localhost/test1", + is_directory=False, + parameter="filepath", + num=None, + ), + Asset( + data_uri="file://localhost/test2", + is_directory=False, + parameter="filepath", + num=None, + ), + ], + [ + Asset( + data_uri="file://localhost/test1", + is_directory=False, + parameter="filepath", + num=1, + ), + Asset( + data_uri="file://localhost/test2", + is_directory=False, + parameter="filepath", + num=1, + ), + ], + ], + ids=[ + "mix-null-and-int", + "duplicate-null", + "duplicate-int", + ], +) +async def test_constraints_on_parameter_and_num(a, assets): + "Test constraints enforced by database on 'parameter' and 'num'." 
+ arr_adapter = ArrayAdapter.from_array([1, 2, 3]) + with pytest.raises(sqlalchemy.exc.IntegrityError): + await create_node_safe( + a, + key="test", + structure_family=arr_adapter.structure_family, + metadata=dict(arr_adapter.metadata()), + specs=arr_adapter.specs, + data_sources=[ + DataSource( + mimetype="application/x-test", + structure=asdict(arr_adapter.structure()), + parameters={}, + management=Management.external, + assets=assets, + ) + ], + ) diff --git a/tiled/catalog/orm.py b/tiled/catalog/orm.py index f175ef00b..ad3047e59 100644 --- a/tiled/catalog/orm.py +++ b/tiled/catalog/orm.py @@ -139,7 +139,6 @@ class DataSourceAssetAssociation(Base): __table_args__ = ( UniqueConstraint( "data_source_id", - "asset_id", "parameter", "num", name="parameter_num_unique_constraint", From ebb187a73303369f499197fb4be1ae77e5f7cdda Mon Sep 17 00:00:00 2001 From: Dan Allan Date: Tue, 7 Nov 2023 16:20:00 -0500 Subject: [PATCH 12/38] Create trigger. Caught one case but not the other. --- tiled/catalog/orm.py | 27 +++++++++++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/tiled/catalog/orm.py b/tiled/catalog/orm.py index ad3047e59..68d39247e 100644 --- a/tiled/catalog/orm.py +++ b/tiled/catalog/orm.py @@ -10,6 +10,8 @@ Index, Integer, Unicode, + event, + text, ) from sqlalchemy.dialects.postgresql import JSONB from sqlalchemy.orm import Mapped, mapped_column, relationship @@ -146,6 +148,31 @@ class DataSourceAssetAssociation(Base): ) +@event.listens_for(DataSourceAssetAssociation.__table__, "after_create") +def create_trigger_unique_parameter_num_null_check(target, connection, **kw): + connection.execute( + text( + """ +CREATE TRIGGER unique_parameter_num_null_check +BEFORE INSERT ON data_source_asset_association +WHEN NEW.num IS NULL +BEGIN + SELECT RAISE(ABORT, 'Can only insert num=NULL if no other row exists for the same parameter') + WHERE EXISTS + ( + SELECT 1 + FROM data_source_asset_association + WHERE parameter = NEW.parameter + AND data_source_id = NEW.data_source_id + ); +END""" + ) + ) + # Additionally ensure that we cannot mix NULL and INTEGER values of num for + # a given data_source_id and parameter, and that there cannot be multiple + # instances of NULL. + + class DataSource(Timestamped, Base): """ The describes how to open one or more file/blobs to extract data for a Node. From cd1ae5597213905d5bd2c880e945e0b4527eb6bb Mon Sep 17 00:00:00 2001 From: Dan Allan Date: Fri, 10 Nov 2023 16:08:06 -0500 Subject: [PATCH 13/38] Trigger works for all cases, on SQLite. --- tiled/catalog/orm.py | 38 +++++++++++++++++++++++++++++--------- 1 file changed, 29 insertions(+), 9 deletions(-) diff --git a/tiled/catalog/orm.py b/tiled/catalog/orm.py index 68d39247e..ac794fda0 100644 --- a/tiled/catalog/orm.py +++ b/tiled/catalog/orm.py @@ -149,11 +149,15 @@ class DataSourceAssetAssociation(Base): @event.listens_for(DataSourceAssetAssociation.__table__, "after_create") -def create_trigger_unique_parameter_num_null_check(target, connection, **kw): - connection.execute( - text( - """ -CREATE TRIGGER unique_parameter_num_null_check +def unique_parameter_num_null_check(target, connection, **kw): + # Ensure that we cannot mix NULL and INTEGER values of num for + # a given data_source_id and parameter, and that there cannot be multiple + # instances of NULL. 
+ if connection.engine.dialect.name == "sqlite": + connection.execute( + text( + """ +CREATE TRIGGER cannot_insert_num_null_if_num_int_exists BEFORE INSERT ON data_source_asset_association WHEN NEW.num IS NULL BEGIN @@ -166,11 +170,27 @@ def create_trigger_unique_parameter_num_null_check(target, connection, **kw): AND data_source_id = NEW.data_source_id ); END""" + ) + ) + connection.execute( + text( + """ +CREATE TRIGGER cannot_insert_num_int_if_num_null_exists +BEFORE INSERT ON data_source_asset_association +WHEN NEW.num IS NOT NULL +BEGIN + SELECT RAISE(ABORT, 'Can only insert INTEGER num if no NULL row exists for the same parameter') + WHERE EXISTS + ( + SELECT 1 + FROM data_source_asset_association + WHERE parameter = NEW.parameter + AND num IS NULL + AND data_source_id = NEW.data_source_id + ); +END""" + ) ) - ) - # Additionally ensure that we cannot mix NULL and INTEGER values of num for - # a given data_source_id and parameter, and that there cannot be multiple - # instances of NULL. class DataSource(Timestamped, Base): From 34b5d16f764e9d3eec675779b8f8f5501dc5fee4 Mon Sep 17 00:00:00 2001 From: Dan Allan Date: Tue, 21 Nov 2023 16:49:43 -0500 Subject: [PATCH 14/38] Tests pass on PostgreSQL also --- tiled/_tests/test_catalog.py | 25 +++++++++++++++++++++++-- tiled/catalog/orm.py | 29 +++++++++++++++++++++++++++++ 2 files changed, 52 insertions(+), 2 deletions(-) diff --git a/tiled/_tests/test_catalog.py b/tiled/_tests/test_catalog.py index 18d94fe6b..ccf7bdba5 100644 --- a/tiled/_tests/test_catalog.py +++ b/tiled/_tests/test_catalog.py @@ -8,6 +8,7 @@ import pandas.testing import pytest import pytest_asyncio +import sqlalchemy.dialects.postgresql.asyncpg import sqlalchemy.exc import tifffile import xarray @@ -446,6 +447,20 @@ async def test_access_control(tmpdir): num=1, ), ], + [ + Asset( + data_uri="file://localhost/test1", + is_directory=False, + parameter="filepath", + num=1, + ), + Asset( + data_uri="file://localhost/test2", + is_directory=False, + parameter="filepath", + num=None, + ), + ], [ Asset( data_uri="file://localhost/test1", @@ -476,7 +491,8 @@ async def test_access_control(tmpdir): ], ], ids=[ - "mix-null-and-int", + "null-then-int", + "int-then-null", "duplicate-null", "duplicate-int", ], @@ -484,7 +500,12 @@ async def test_access_control(tmpdir): async def test_constraints_on_parameter_and_num(a, assets): "Test constraints enforced by database on 'parameter' and 'num'." 
arr_adapter = ArrayAdapter.from_array([1, 2, 3]) - with pytest.raises(sqlalchemy.exc.IntegrityError): + with pytest.raises( + ( + sqlalchemy.exc.IntegrityError, # SQLite + sqlalchemy.exc.DBAPIError, # PostgreSQL + ) + ): await create_node_safe( a, key="test", diff --git a/tiled/catalog/orm.py b/tiled/catalog/orm.py index ac794fda0..0174ee70f 100644 --- a/tiled/catalog/orm.py +++ b/tiled/catalog/orm.py @@ -191,6 +191,35 @@ def unique_parameter_num_null_check(target, connection, **kw): END""" ) ) + elif connection.engine.dialect.name == "postgresql": + connection.execute( + text( + """ +CREATE OR REPLACE FUNCTION test_parameter_exists() +RETURNS TRIGGER AS $$ +BEGIN + IF EXISTS ( + SELECT 1 + FROM data_source_asset_association + WHERE parameter = NEW.parameter + AND data_source_id = NEW.data_source_id + ) THEN + RAISE EXCEPTION 'Can only insert num=NULL if no other row exists for the same parameter'; + END IF; + RETURN NEW; +END; +$$ LANGUAGE plpgsql;""" + ) + ) + connection.execute( + text( + """ +CREATE TRIGGER cannot_insert_num_null_if_num_int_exists +BEFORE INSERT ON data_source_asset_association +FOR EACH ROW +EXECUTE FUNCTION test_parameter_exists();""" + ) + ) class DataSource(Timestamped, Base): From b78f7a11f58da02c98adb77e4cbed6c9a0a25fd9 Mon Sep 17 00:00:00 2001 From: Dan Allan Date: Sat, 20 Jan 2024 15:38:14 -0500 Subject: [PATCH 15/38] Adapters expect URIs --- tiled/_tests/test_catalog.py | 20 +++++++------- tiled/adapters/awkward_buffers.py | 11 +++++--- tiled/adapters/csv.py | 4 ++- tiled/adapters/excel.py | 4 +-- tiled/adapters/hdf5.py | 7 ++--- tiled/adapters/parquet.py | 17 ++++++------ tiled/adapters/sparse_blocks_parquet.py | 20 ++++++-------- tiled/adapters/table.py | 4 +-- tiled/adapters/tiff.py | 7 +++-- tiled/adapters/zarr.py | 35 +++++-------------------- tiled/catalog/adapter.py | 31 +++++++++++----------- tiled/catalog/utils.py | 35 ++++++------------------- tiled/utils.py | 15 +++++++++++ 13 files changed, 96 insertions(+), 114 deletions(-) diff --git a/tiled/_tests/test_catalog.py b/tiled/_tests/test_catalog.py index ccf7bdba5..516c8b254 100644 --- a/tiled/_tests/test_catalog.py +++ b/tiled/_tests/test_catalog.py @@ -200,9 +200,10 @@ async def test_metadata_index_is_used(example_data_adapter): @pytest.mark.asyncio async def test_write_array_external(a, tmpdir): arr = numpy.ones((5, 3)) - filepath = tmpdir / "file.tiff" - tifffile.imwrite(str(filepath), arr) - ad = TiffAdapter(str(filepath)) + filepath = str(tmpdir / "file.tiff") + data_uri = ensure_uri(filepath) + tifffile.imwrite(filepath, arr) + ad = TiffAdapter(data_uri) structure = asdict(ad.structure()) await a.create_node( key="x", @@ -216,9 +217,9 @@ async def test_write_array_external(a, tmpdir): management="external", assets=[ Asset( - parameter="filepath", + parameter="data_uri", num=None, - data_uri=str(ensure_uri(filepath)), + data_uri=str(data_uri), is_directory=False, ) ], @@ -232,9 +233,10 @@ async def test_write_array_external(a, tmpdir): @pytest.mark.asyncio async def test_write_dataframe_external_direct(a, tmpdir): df = pandas.DataFrame(numpy.ones((5, 3)), columns=list("abc")) - filepath = tmpdir / "file.csv" + filepath = str(tmpdir / "file.csv") + data_uri = ensure_uri(filepath) df.to_csv(filepath, index=False) - dfa = read_csv(filepath) + dfa = read_csv(data_uri) structure = asdict(dfa.structure()) await a.create_node( key="x", @@ -248,9 +250,9 @@ async def test_write_dataframe_external_direct(a, tmpdir): management="external", assets=[ Asset( - parameter="filepath", + parameter="data_uri", 
num=None, - data_uri=str(ensure_uri(filepath)), + data_uri=data_uri, is_directory=False, ) ], diff --git a/tiled/adapters/awkward_buffers.py b/tiled/adapters/awkward_buffers.py index 6104716fa..2c084cedd 100644 --- a/tiled/adapters/awkward_buffers.py +++ b/tiled/adapters/awkward_buffers.py @@ -2,11 +2,11 @@ A directory containing awkward buffers, one file per form key. """ import collections.abc -from urllib import parse import awkward.forms from ..structures.core import StructureFamily +from ..utils import path_from_uri from .awkward import AwkwardAdapter @@ -37,23 +37,26 @@ class AwkwardBuffersAdapter(AwkwardAdapter): structure_family = StructureFamily.awkward @classmethod - def init_storage(cls, directory, structure): + def init_storage(cls, data_uri, structure): from ..server.schemas import Asset + directory = path_from_uri(data_uri) directory.mkdir(parents=True, exist_ok=True) - data_uri = parse.urlunparse(("file", "localhost", str(directory), "", "", None)) return [Asset(data_uri=data_uri, is_directory=True, parameter="directory")] @classmethod def from_directory( cls, - directory, + data_uri, structure, metadata=None, specs=None, access_policy=None, ): form = awkward.forms.from_dict(structure.form) + directory = path_from_uri(data_uri) + if not directory.is_dir(): + raise ValueError(f"Not a directory: {directory}") container = DirectoryContainer(directory, form) return cls( container, diff --git a/tiled/adapters/csv.py b/tiled/adapters/csv.py index d517e98ea..eda87889a 100644 --- a/tiled/adapters/csv.py +++ b/tiled/adapters/csv.py @@ -1,11 +1,12 @@ import dask.dataframe from ..server.object_cache import NO_CACHE, get_object_cache +from ..utils import path_from_uri from .dataframe import DataFrameAdapter def read_csv( - filepath, + data_uri, structure=None, metadata=None, specs=None, @@ -25,6 +26,7 @@ def read_csv( >>> read_csv("myfiles.*.csv") >>> read_csv("s3://bucket/myfiles.*.csv") """ + filepath = path_from_uri(data_uri) ddf = dask.dataframe.read_csv(filepath, **kwargs) # If an instance has previously been created using the same parameters, # then we are here because the caller wants a *fresh* view on this data. diff --git a/tiled/adapters/excel.py b/tiled/adapters/excel.py index b93b8ff2e..c60fee471 100644 --- a/tiled/adapters/excel.py +++ b/tiled/adapters/excel.py @@ -52,7 +52,7 @@ def from_file(cls, file, **kwargs): return cls(mapping, **kwargs) @classmethod - def from_filepath(cls, filepath, **kwargs): + def from_uri(cls, data_uri, **kwargs): """ Read the sheets in an Excel file. 
@@ -66,5 +66,5 @@ def from_filepath(cls, filepath, **kwargs): >>> ExcelAdapter.from_file("path/to/excel_file.xlsx") """ - file = pandas.ExcelFile(filepath) + file = pandas.ExcelFile(data_uri) return cls.from_file(file) diff --git a/tiled/adapters/hdf5.py b/tiled/adapters/hdf5.py index 311226e3e..3c1b4da31 100644 --- a/tiled/adapters/hdf5.py +++ b/tiled/adapters/hdf5.py @@ -8,7 +8,7 @@ from ..adapters.utils import IndexersMixin from ..iterviews import ItemsView, KeysView, ValuesView from ..structures.core import StructureFamily -from ..utils import node_repr +from ..utils import node_repr, path_from_uri from .array import ArrayAdapter SWMR_DEFAULT = bool(int(os.getenv("TILED_HDF5_SWMR_DEFAULT", "0"))) @@ -73,9 +73,9 @@ def from_file( return cls(file, metadata=metadata, specs=specs, access_policy=access_policy) @classmethod - def from_filepath( + def from_uri( cls, - filepath, + data_uri, *, structure=None, metadata=None, @@ -84,6 +84,7 @@ def from_filepath( specs=None, access_policy=None, ): + filepath = path_from_uri(data_uri) file = h5py.File(filepath, "r", swmr=swmr, libver=libver) return cls.from_file(file) diff --git a/tiled/adapters/parquet.py b/tiled/adapters/parquet.py index 25d871a35..7f491c779 100644 --- a/tiled/adapters/parquet.py +++ b/tiled/adapters/parquet.py @@ -1,9 +1,9 @@ from pathlib import Path -from urllib import parse import dask.dataframe from ..structures.core import StructureFamily +from ..utils import path_from_uri from .dataframe import DataFrameAdapter @@ -12,13 +12,14 @@ class ParquetDatasetAdapter: def __init__( self, - uris, + data_uris, structure, metadata=None, specs=None, access_policy=None, ): - self.partition_paths = uris + # TODO Store data_uris instead and generalize to non-file schemes. + self._partition_paths = [path_from_uri(uri) for uri in data_uris] self._metadata = metadata or {} self._structure = structure self.specs = list(specs or []) @@ -30,7 +31,7 @@ def metadata(self): @property def dataframe_adapter(self): partitions = [] - for path in self.partition_paths: + for path in self._partition_paths: if not Path(path).exists(): partition = None else: @@ -39,11 +40,11 @@ def dataframe_adapter(self): return DataFrameAdapter(partitions, self._structure) @classmethod - def init_storage(cls, directory, structure): + def init_storage(cls, data_uri, structure): from ..server.schemas import Asset + directory = path_from_uri(data_uri) directory.mkdir(parents=True, exist_ok=True) - data_uri = parse.urlunparse(("file", "localhost", str(directory), "", "", None)) assets = [ Asset( data_uri=f"{data_uri}/partition-{i}.parquet", @@ -56,13 +57,13 @@ def init_storage(cls, directory, structure): return assets def write_partition(self, data, partition): - uri = self.partition_paths[partition] + uri = self._partition_paths[partition] data.to_parquet(uri) def write(self, data): if self.structure().npartitions != 1: raise NotImplementedError - uri = self.partition_paths[0] + uri = self._partition_paths[0] data.to_parquet(uri) def read(self, *args, **kwargs): diff --git a/tiled/adapters/sparse_blocks_parquet.py b/tiled/adapters/sparse_blocks_parquet.py index 671493a48..e27dd4296 100644 --- a/tiled/adapters/sparse_blocks_parquet.py +++ b/tiled/adapters/sparse_blocks_parquet.py @@ -1,5 +1,4 @@ import itertools -from urllib import parse import numpy import pandas @@ -7,6 +6,7 @@ from ..adapters.array import slice_and_shape_from_block_and_chunks from ..structures.core import StructureFamily +from ..utils import path_from_uri def load_block(uri): @@ -23,7 +23,7 @@ class 
SparseBlocksParquetAdapter: def __init__( self, - block_uris, + data_uris, structure, metadata=None, specs=None, @@ -31,7 +31,7 @@ def __init__( ): num_blocks = (range(len(n)) for n in structure.chunks) self.blocks = {} - for block, uri in zip(itertools.product(*num_blocks), sorted(block_uris)): + for block, uri in zip(itertools.product(*num_blocks), data_uris): self.blocks[block] = uri self._structure = structure self._metadata = metadata or {} @@ -41,27 +41,23 @@ def __init__( @classmethod def init_storage( cls, - directory, + data_uri, structure, ): from ..server.schemas import Asset + directory = path_from_uri(data_uri) directory.mkdir(parents=True, exist_ok=True) num_blocks = (range(len(n)) for n in structure.chunks) - block_uris = [] - for block in itertools.product(*num_blocks): - filepath = directory / f"block-{'.'.join(map(str, block))}.parquet" - uri = parse.urlunparse(("file", "localhost", str(filepath), "", "", None)) - block_uris.append(uri) assets = [ Asset( - data_uri=uri, + data_uri=f"{data_uri}/block-{'.'.join(map(str, block))}.parquet", is_directory=False, - parameter="block_uris", + parameter="data_uris", num=i, ) - for i, uri in enumerate(block_uris) + for i, block in enumerate(itertools.product(*num_blocks)) ] return assets diff --git a/tiled/adapters/table.py b/tiled/adapters/table.py index 41b3ce742..d44bf4989 100644 --- a/tiled/adapters/table.py +++ b/tiled/adapters/table.py @@ -25,14 +25,14 @@ class TableAdapter: @classmethod def from_pandas( cls, - *args, + data_uri, metadata=None, specs=None, access_policy=None, npartitions=1, **kwargs, ): - ddf = dask.dataframe.from_pandas(*args, npartitions=npartitions, **kwargs) + ddf = dask.dataframe.from_pandas(data_uri, npartitions=npartitions, **kwargs) if specs is None: specs = [Spec("dataframe")] return cls.from_dask_dataframe( diff --git a/tiled/adapters/tiff.py b/tiled/adapters/tiff.py index 40fd058b5..7dde2b70c 100644 --- a/tiled/adapters/tiff.py +++ b/tiled/adapters/tiff.py @@ -6,6 +6,7 @@ from ..server.object_cache import with_object_cache from ..structures.array import ArrayStructure, BuiltinDtype from ..structures.core import StructureFamily +from ..utils import path_from_uri class TiffAdapter: @@ -22,13 +23,14 @@ class TiffAdapter: def __init__( self, - filepath, + data_uri, *, structure=None, metadata=None, specs=None, access_policy=None, ): + filepath = path_from_uri(data_uri) self._file = tifffile.TiffFile(filepath) self._cache_key = (type(self).__module__, type(self).__qualname__, filepath) self.specs = specs or [] @@ -85,12 +87,13 @@ class TiffSequenceAdapter: @classmethod def from_filepaths( cls, - filepaths, + data_uris, structure=None, metadata=None, specs=None, access_policy=None, ): + filepaths = [path_from_uri(data_uri) for data_uri in data_uris] seq = tifffile.TiffSequence(filepaths) return cls( seq, diff --git a/tiled/adapters/zarr.py b/tiled/adapters/zarr.py index e5c367110..020f0e102 100644 --- a/tiled/adapters/zarr.py +++ b/tiled/adapters/zarr.py @@ -9,32 +9,32 @@ from ..adapters.utils import IndexersMixin from ..iterviews import ItemsView, KeysView, ValuesView -from ..structures.array import ArrayStructure from ..structures.core import StructureFamily -from ..utils import node_repr +from ..utils import node_repr, path_from_uri from .array import ArrayAdapter, slice_and_shape_from_block_and_chunks INLINED_DEPTH = int(os.getenv("TILED_HDF5_INLINED_CONTENTS_MAX_DEPTH", "7")) def read_zarr(filepath, **kwargs): - file = zarr.open(filepath) - if isinstance(file, zarr.hierarchy.Group): - adapter = 
ZarrGroupAdapter.from_directory(file, **kwargs) + zarr_obj = zarr.open(filepath) # Group or Array + if isinstance(zarr_obj, zarr.hierarchy.Group): + adapter = ZarrGroupAdapter(zarr_obj, **kwargs) else: - adapter = ZarrArrayAdapter.from_directory(file, **kwargs) + adapter = ZarrArrayAdapter(zarr_obj, **kwargs) return adapter class ZarrArrayAdapter(ArrayAdapter): @classmethod - def init_storage(cls, directory, structure): + def init_storage(cls, data_uri, structure): from ..server.schemas import Asset # Zarr requires evenly-sized chunks within each dimension. # Use the first chunk along each dimension. zarr_chunks = tuple(dim[0] for dim in structure.chunks) shape = tuple(dim[0] * len(dim) for dim in structure.chunks) + directory = path_from_uri(data_uri) directory.mkdir(parents=True, exist_ok=True) storage = zarr.storage.DirectoryStore(str(directory)) zarr.storage.init_array( @@ -52,21 +52,6 @@ def init_storage(cls, directory, structure): ) ] - @classmethod - def from_directory( - cls, - directory, - structure=None, - **kwargs, - ): - if not isinstance(directory, zarr.core.Array): - array = zarr.open_array(str(directory), "r+") - else: - array = directory - if structure is None: - structure = ArrayStructure.from_array(array) - return cls(array, structure, **kwargs) - def _stencil(self): "Trims overflow because Zarr always has equal-sized chunks." return tuple(builtins.slice(0, dim) for dim in self.structure().shape) @@ -112,12 +97,6 @@ def __init__( self._provided_metadata = metadata or {} super().__init__() - @classmethod - def from_directory(cls, directory, **kwargs): - if not isinstance(directory, zarr.hierarchy.Group): - directory = zarr.open_group(directory, "r") - return cls(directory, **kwargs) - def __repr__(self): return node_repr(self, list(self)) diff --git a/tiled/catalog/adapter.py b/tiled/catalog/adapter.py index 02fba74ac..a289682a9 100644 --- a/tiled/catalog/adapter.py +++ b/tiled/catalog/adapter.py @@ -11,7 +11,6 @@ from urllib.parse import quote_plus, urlparse import anyio -import httpx from fastapi import HTTPException from sqlalchemy import delete, event, func, not_, or_, select, text, type_coerce, update from sqlalchemy.exc import IntegrityError @@ -40,6 +39,7 @@ UnsupportedQueryType, ensure_awaitable, import_object, + path_from_uri, safe_json_dump, ) from . import orm @@ -52,7 +52,7 @@ ZARR_MIMETYPE, ZIP_MIMETYPE, ) -from .utils import SCHEME_PATTERN, ensure_uri, safe_path +from .utils import SCHEME_PATTERN, ensure_uri DEFAULT_ECHO = bool(int(os.getenv("TILED_ECHO_SQL", "0") or "0")) INDEX_PATTERN = re.compile(r"^[A-Za-z0-9_-]+$") @@ -117,7 +117,7 @@ def __init__( raise ValueError("readable_storage should be a list of URIs or paths") if writable_storage: writable_storage = ensure_uri(str(writable_storage)) - if not writable_storage.scheme == "file": + if not urlparse(writable_storage).scheme == "file": raise NotImplementedError( "Only file://... writable storage is currently supported." ) @@ -394,32 +394,31 @@ async def get_adapter(self): for asset in data_source.assets: if asset.parameter is None: continue - data_uri = httpx.URL(asset.data_uri) - if data_uri.scheme != "file": + scheme = urlparse(asset.data_uri).scheme + if scheme != "file": raise NotImplementedError( - f"Only 'file://...' scheme URLs are currently supported, not {data_uri!r}" + f"Only 'file://...' scheme URLs are currently supported, not {asset.data_uri}" ) - if data_uri.scheme == "file": + if scheme == "file": # Protect against misbehaving clients reading from unintended # parts of the filesystem. 
+ asset_path = path_from_uri(asset.data_uri) for readable_storage in self.context.readable_storage: if Path( os.path.commonpath( - [safe_path(readable_storage), safe_path(data_uri)] + [path_from_uri(readable_storage), asset_path] ) - ) == safe_path(readable_storage): + ) == path_from_uri(readable_storage): break else: raise RuntimeError( - f"Refusing to serve {data_uri} because it is outside " + f"Refusing to serve {asset.data_uri} because it is outside " "the readable storage area for this server." ) - path = safe_path(data_uri) if asset.num is None: - parameters[asset.parameter] = path + parameters[asset.parameter] = asset.data_uri else: - # TODO Order these in SQL. - parameters[asset.parameter].append(path) + parameters[asset.parameter].append(asset.data_uri) adapter_kwargs = dict(parameters) adapter_kwargs.update(data_source.parameters) adapter_kwargs["specs"] = self.node.specs @@ -570,7 +569,7 @@ async def create_node( ) init_storage = CREATE_ADAPTER_BY_MIMETYPE[data_source.mimetype] assets = await ensure_awaitable( - init_storage, safe_path(data_uri), data_source.structure + init_storage, data_uri, data_source.structure ) data_source.assets.extend(assets) data_source_orm = orm.DataSource( @@ -888,7 +887,7 @@ async def write_partition(self, *args, **kwargs): def delete_asset(data_uri, is_directory): url = urlparse(data_uri) if url.scheme == "file": - path = safe_path(data_uri) + path = path_from_uri(data_uri) if is_directory: shutil.rmtree(path) else: diff --git a/tiled/catalog/utils.py b/tiled/catalog/utils.py index 1e52883cb..f02b86bf2 100644 --- a/tiled/catalog/utils.py +++ b/tiled/catalog/utils.py @@ -1,43 +1,24 @@ import re -import sys from pathlib import Path -from urllib import parse - -import httpx +from urllib.parse import urlparse, urlunparse SCHEME_PATTERN = re.compile(r"^[a-z0-9+]+:\/\/.*$") -def safe_path(uri): - """ - Acceess the path of a URI and return it as a Path object. - - Ideally we could just do uri.path, but Windows paths confuse - HTTP URI parsers because of the drive (e.g. C:) and return - something like /C:/x/y/z with an extraneous leading slash. - """ - raw_path = httpx.URL(uri).path - if sys.platform == "win32" and raw_path[0] == "/": - path = raw_path[1:] - else: - path = raw_path - return Path(path) - - def ensure_uri(uri_or_path): "Accept a URI or file path (Windows- or POSIX-style) and return a URI." if not SCHEME_PATTERN.match(str(uri_or_path)): # Interpret this as a filepath. path = uri_or_path - uri_str = parse.urlunparse( + uri_str = urlunparse( ("file", "localhost", str(Path(path).absolute()), "", "", None) ) else: # Interpret this as a URI. uri_str = uri_or_path - uri = httpx.URL(uri_str) - # Ensure that, if the scheme is file, it meets the techincal standard for - # file URIs, like file://localhost/..., not the shorthand file:///... 
- if uri.scheme == "file": - uri = uri.copy_with(host="localhost") - return uri + parsed = urlparse(uri_str) + if parsed.netloc == "": + mutable = list(parsed) + mutable[1] = "localhost" + uri_str = urlunparse(mutable) + return uri_str diff --git a/tiled/utils.py b/tiled/utils.py index bb91cb94c..8668d1750 100644 --- a/tiled/utils.py +++ b/tiled/utils.py @@ -8,9 +8,12 @@ import inspect import operator import os +import platform import sys import threading +from pathlib import Path from typing import Any, Callable +from urllib.parse import urlparse import anyio @@ -654,3 +657,15 @@ async def ensure_awaitable(func, *args, **kwargs): return await func(*args, **kwargs) else: return await anyio.to_thread.run_sync(func, *args, **kwargs) + + +def path_from_uri(uri): + parsed = urlparse(uri) + if parsed.scheme != "file": + raise ValueError(f"Only 'file' URIs are supported. URI: {uri}") + if platform.system() == "Windows": + # We slice because we need "C:/..." not "/C:/...". + path = Path(parsed.path[1:]) + else: + path = Path(parsed.path) + return path From 2636720f5b1830636bdcaece45ffb8ae508cba1d Mon Sep 17 00:00:00 2001 From: Dan Allan Date: Sat, 20 Jan 2024 15:54:18 -0500 Subject: [PATCH 16/38] Catalog and writing tests pass. --- tiled/_tests/test_catalog.py | 1 + tiled/adapters/awkward_buffers.py | 2 +- tiled/adapters/parquet.py | 2 +- tiled/adapters/sparse_blocks_parquet.py | 6 +++--- tiled/adapters/table.py | 4 +++- tiled/adapters/zarr.py | 7 +++---- tiled/catalog/mimetypes.py | 2 +- 7 files changed, 13 insertions(+), 11 deletions(-) diff --git a/tiled/_tests/test_catalog.py b/tiled/_tests/test_catalog.py index 516c8b254..8eab51b3b 100644 --- a/tiled/_tests/test_catalog.py +++ b/tiled/_tests/test_catalog.py @@ -499,6 +499,7 @@ async def test_access_control(tmpdir): "duplicate-int", ], ) +@pytest.mark.asyncio async def test_constraints_on_parameter_and_num(a, assets): "Test constraints enforced by database on 'parameter' and 'num'." arr_adapter = ArrayAdapter.from_array([1, 2, 3]) diff --git a/tiled/adapters/awkward_buffers.py b/tiled/adapters/awkward_buffers.py index 2c084cedd..a12b92234 100644 --- a/tiled/adapters/awkward_buffers.py +++ b/tiled/adapters/awkward_buffers.py @@ -42,7 +42,7 @@ def init_storage(cls, data_uri, structure): directory = path_from_uri(data_uri) directory.mkdir(parents=True, exist_ok=True) - return [Asset(data_uri=data_uri, is_directory=True, parameter="directory")] + return [Asset(data_uri=data_uri, is_directory=True, parameter="data_uri")] @classmethod def from_directory( diff --git a/tiled/adapters/parquet.py b/tiled/adapters/parquet.py index 7f491c779..9c6903bff 100644 --- a/tiled/adapters/parquet.py +++ b/tiled/adapters/parquet.py @@ -49,7 +49,7 @@ def init_storage(cls, data_uri, structure): Asset( data_uri=f"{data_uri}/partition-{i}.parquet", is_directory=False, - parameter="uris", + parameter="data_uris", num=i, ) for i in range(structure.npartitions) diff --git a/tiled/adapters/sparse_blocks_parquet.py b/tiled/adapters/sparse_blocks_parquet.py index e27dd4296..da479ce73 100644 --- a/tiled/adapters/sparse_blocks_parquet.py +++ b/tiled/adapters/sparse_blocks_parquet.py @@ -12,7 +12,7 @@ def load_block(uri): # TODO This can be done without pandas. # Better to use a plain I/O library. 
- df = pandas.read_parquet(uri) + df = pandas.read_parquet(path_from_uri(uri)) coords = df[df.columns[:-1]].values.T data = df["data"].values return coords, data @@ -66,13 +66,13 @@ def metadata(self): def write_block(self, data, block): uri = self.blocks[block] - data.to_parquet(uri) + data.to_parquet(path_from_uri(uri)) def write(self, data): if len(self.blocks) > 1: raise NotImplementedError uri = self.blocks[(0,) * len(self._structure.shape)] - data.to_parquet(uri) + data.to_parquet(path_from_uri(uri)) def read(self, slice=...): all_coords = [] diff --git a/tiled/adapters/table.py b/tiled/adapters/table.py index d44bf4989..ac1e04daf 100644 --- a/tiled/adapters/table.py +++ b/tiled/adapters/table.py @@ -5,6 +5,7 @@ from ..server.object_cache import get_object_cache from ..structures.core import Spec, StructureFamily from ..structures.table import TableStructure +from ..utils import path_from_uri from .array import ArrayAdapter @@ -32,7 +33,8 @@ def from_pandas( npartitions=1, **kwargs, ): - ddf = dask.dataframe.from_pandas(data_uri, npartitions=npartitions, **kwargs) + filepath = path_from_uri(data_uri) + ddf = dask.dataframe.from_pandas(filepath, npartitions=npartitions, **kwargs) if specs is None: specs = [Spec("dataframe")] return cls.from_dask_dataframe( diff --git a/tiled/adapters/zarr.py b/tiled/adapters/zarr.py index 020f0e102..263b3744e 100644 --- a/tiled/adapters/zarr.py +++ b/tiled/adapters/zarr.py @@ -1,7 +1,6 @@ import builtins import collections.abc import os -from urllib import parse import zarr.core import zarr.hierarchy @@ -16,7 +15,8 @@ INLINED_DEPTH = int(os.getenv("TILED_HDF5_INLINED_CONTENTS_MAX_DEPTH", "7")) -def read_zarr(filepath, **kwargs): +def read_zarr(data_uri, **kwargs): + filepath = path_from_uri(data_uri) zarr_obj = zarr.open(filepath) # Group or Array if isinstance(zarr_obj, zarr.hierarchy.Group): adapter = ZarrGroupAdapter(zarr_obj, **kwargs) @@ -43,12 +43,11 @@ def init_storage(cls, data_uri, structure): chunks=zarr_chunks, dtype=structure.data_type.to_numpy_dtype(), ) - data_uri = parse.urlunparse(("file", "localhost", str(directory), "", "", None)) return [ Asset( data_uri=data_uri, is_directory=True, - parameter="filepath", + parameter="data_uri", ) ] diff --git a/tiled/catalog/mimetypes.py b/tiled/catalog/mimetypes.py index 12f00a32d..c540076c2 100644 --- a/tiled/catalog/mimetypes.py +++ b/tiled/catalog/mimetypes.py @@ -23,7 +23,7 @@ ).read_csv, XLSX_MIME_TYPE: lambda: importlib.import_module( "...adapters.excel", __name__ - ).ExcelAdapter.from_filepath, + ).ExcelAdapter.from_uri, "application/x-hdf5": lambda: importlib.import_module( "...adapters.hdf5", __name__ ).HDF5Adapter.from_filepath, From 6abc1a861e90aec34ae51037514b94ad1713ea68 Mon Sep 17 00:00:00 2001 From: Dan Allan Date: Sat, 20 Jan 2024 15:56:21 -0500 Subject: [PATCH 17/38] Add docstring with examples --- tiled/utils.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/tiled/utils.py b/tiled/utils.py index 8668d1750..6fcd736fb 100644 --- a/tiled/utils.py +++ b/tiled/utils.py @@ -660,6 +660,16 @@ async def ensure_awaitable(func, *args, **kwargs): def path_from_uri(uri): + """ + Give a URI, return a Path. + + If the URI has a scheme other than 'file', raise ValueError. + + >>> path_from_uri('file://localhost/a/b/c') # POSIX-style + '/a/b/c' + >>> path_from_uri('file://localhost/C:/a/b/c') # Windows-style + 'C:/a/b/c' + """ parsed = urlparse(uri) if parsed.scheme != "file": raise ValueError(f"Only 'file' URIs are supported. 
URI: {uri}") From 2ae5c068c6382274e8695be6961fd455941be81d Mon Sep 17 00:00:00 2001 From: Dan Allan Date: Sat, 20 Jan 2024 16:33:09 -0500 Subject: [PATCH 18/38] Directory walker tests pass --- tiled/_tests/test_directory_walker.py | 27 ++++++++++++++------------- tiled/adapters/tiff.py | 4 +++- tiled/catalog/mimetypes.py | 4 ++-- tiled/catalog/register.py | 12 ++++++------ tiled/catalog/utils.py | 2 +- 5 files changed, 26 insertions(+), 23 deletions(-) diff --git a/tiled/_tests/test_directory_walker.py b/tiled/_tests/test_directory_walker.py index 88cbefb8e..742255f61 100644 --- a/tiled/_tests/test_directory_walker.py +++ b/tiled/_tests/test_directory_walker.py @@ -27,6 +27,7 @@ from ..examples.generate_files import data, df1, generate_files from ..server.app import build_app from ..server.schemas import Asset, DataSource, Management +from ..utils import path_from_uri @pytest.fixture @@ -196,19 +197,19 @@ async def test_image_file_with_sidecar_metadata_file(tmpdir): with open(metadata_filepath, "w") as file: yaml.dump(metadata, file) - def read_tiff_with_yaml_metadata( - image_filepath, metadata_filepath, metadata=None, **kwargs - ): - with open(metadata_filepath) as file: + def read_tiff_with_yaml_metadata(image_uri, metadata_uri, metadata=None, **kwargs): + with open(path_from_uri(metadata_uri)) as file: metadata = yaml.safe_load(file) - return TiffAdapter(image_filepath, metadata=metadata, **kwargs) + return TiffAdapter(image_uri, metadata=metadata, **kwargs) catalog = in_memory( writable_storage=tmpdir, adapters_by_mimetype={MIMETYPE: read_tiff_with_yaml_metadata}, ) with Context.from_app(build_app(catalog)) as context: - adapter = read_tiff_with_yaml_metadata(image_filepath, metadata_filepath) + adapter = read_tiff_with_yaml_metadata( + ensure_uri(image_filepath), ensure_uri(metadata_filepath) + ) await create_node_safe( catalog, key="image", @@ -223,14 +224,14 @@ def read_tiff_with_yaml_metadata( management=Management.external, assets=[ Asset( - data_uri=str(ensure_uri(str(metadata_filepath))), + data_uri=ensure_uri(metadata_filepath), is_directory=False, - parameter="metadata_filepath", + parameter="metadata_uri", ), Asset( - data_uri=str(ensure_uri(str(image_filepath))), + data_uri=ensure_uri(image_filepath), is_directory=False, - parameter="image_filepath", + parameter="image_uri", ), ], ) @@ -267,14 +268,14 @@ async def test_hdf5_virtual_datasets(tmpdir): ] assets.append( Asset( - data_uri=str(ensure_uri(str(filepath))), + data_uri=ensure_uri(filepath), is_directory=False, - parameter="filepath", + parameter="data_uri", ) ) catalog = in_memory(writable_storage=tmpdir) with Context.from_app(build_app(catalog)) as context: - adapter = HDF5Adapter.from_filepath(filepath) + adapter = HDF5Adapter.from_uri(ensure_uri(filepath)) await create_node_safe( catalog, key="VDS", diff --git a/tiled/adapters/tiff.py b/tiled/adapters/tiff.py index 7dde2b70c..794371701 100644 --- a/tiled/adapters/tiff.py +++ b/tiled/adapters/tiff.py @@ -30,6 +30,8 @@ def __init__( specs=None, access_policy=None, ): + if not isinstance(data_uri, str): + raise Exception filepath = path_from_uri(data_uri) self._file = tifffile.TiffFile(filepath) self._cache_key = (type(self).__module__, type(self).__qualname__, filepath) @@ -85,7 +87,7 @@ class TiffSequenceAdapter: structure_family = "array" @classmethod - def from_filepaths( + def from_uris( cls, data_uris, structure=None, diff --git a/tiled/catalog/mimetypes.py b/tiled/catalog/mimetypes.py index c540076c2..2f44a41f2 100644 --- a/tiled/catalog/mimetypes.py +++ 
b/tiled/catalog/mimetypes.py @@ -17,7 +17,7 @@ ).TiffAdapter, "multipart/related;type=image/tiff": lambda: importlib.import_module( "...adapters.tiff", __name__ - ).TiffSequenceAdapter.from_filepaths, + ).TiffSequenceAdapter.from_uris, "text/csv": lambda: importlib.import_module( "...adapters.csv", __name__ ).read_csv, @@ -26,7 +26,7 @@ ).ExcelAdapter.from_uri, "application/x-hdf5": lambda: importlib.import_module( "...adapters.hdf5", __name__ - ).HDF5Adapter.from_filepath, + ).HDF5Adapter.from_uri, "application/x-netcdf": lambda: importlib.import_module( "...adapters.netcdf", __name__ ).read_netcdf, diff --git a/tiled/catalog/register.py b/tiled/catalog/register.py index d66c06889..1031705cc 100644 --- a/tiled/catalog/register.py +++ b/tiled/catalog/register.py @@ -291,7 +291,7 @@ async def register_single_item( adapter_factory = settings.adapters_by_mimetype[mimetype] logger.info(" Resolved mimetype '%s' with adapter for '%s'", mimetype, item) try: - adapter = await anyio.to_thread.run_sync(adapter_factory, item) + adapter = await anyio.to_thread.run_sync(adapter_factory, ensure_uri(item)) except Exception: logger.exception(" SKIPPED: Error constructing adapter for '%s':", item) return @@ -310,9 +310,9 @@ async def register_single_item( management=Management.external, assets=[ Asset( - data_uri=str(ensure_uri(str(item.absolute()))), + data_uri=ensure_uri(item), is_directory=is_directory, - parameter="filepath", + parameter="data_uri", ) ], ) @@ -369,7 +369,7 @@ async def register_tiff_sequence(catalog, name, sequence, settings): adapter_class = settings.adapters_by_mimetype[TIFF_SEQ_MIMETYPE] key = settings.key_from_filename(name) try: - adapter = adapter_class(sequence) + adapter = adapter_class([ensure_uri(filepath) for filepath in sequence]) except Exception: logger.exception(" SKIPPED: Error constructing adapter for '%s'", name) return @@ -387,9 +387,9 @@ async def register_tiff_sequence(catalog, name, sequence, settings): management=Management.external, assets=[ Asset( - data_uri=str(ensure_uri(str(item.absolute()))), + data_uri=ensure_uri(item), is_directory=False, - parameter="filepaths", + parameter="data_uris", num=i, ) for i, item in enumerate(sequence) diff --git a/tiled/catalog/utils.py b/tiled/catalog/utils.py index f02b86bf2..0be02d1d8 100644 --- a/tiled/catalog/utils.py +++ b/tiled/catalog/utils.py @@ -21,4 +21,4 @@ def ensure_uri(uri_or_path): mutable = list(parsed) mutable[1] = "localhost" uri_str = urlunparse(mutable) - return uri_str + return str(uri_str) From cff9c6849a2342cac6c79bd6aac157de8551e51a Mon Sep 17 00:00:00 2001 From: Dan Allan Date: Sat, 20 Jan 2024 16:35:09 -0500 Subject: [PATCH 19/38] Revert confused change --- tiled/adapters/table.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/tiled/adapters/table.py b/tiled/adapters/table.py index ac1e04daf..41b3ce742 100644 --- a/tiled/adapters/table.py +++ b/tiled/adapters/table.py @@ -5,7 +5,6 @@ from ..server.object_cache import get_object_cache from ..structures.core import Spec, StructureFamily from ..structures.table import TableStructure -from ..utils import path_from_uri from .array import ArrayAdapter @@ -26,15 +25,14 @@ class TableAdapter: @classmethod def from_pandas( cls, - data_uri, + *args, metadata=None, specs=None, access_policy=None, npartitions=1, **kwargs, ): - filepath = path_from_uri(data_uri) - ddf = dask.dataframe.from_pandas(filepath, npartitions=npartitions, **kwargs) + ddf = dask.dataframe.from_pandas(*args, npartitions=npartitions, **kwargs) if specs is None: 
specs = [Spec("dataframe")] return cls.from_dask_dataframe( From 5dfff6562f360e5c2fcbeb4ee13ffa0d2920b490 Mon Sep 17 00:00:00 2001 From: Dan Allan Date: Sat, 20 Jan 2024 17:15:54 -0500 Subject: [PATCH 20/38] TIFF tests pass --- tiled/_tests/test_tiff.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/tiled/_tests/test_tiff.py b/tiled/_tests/test_tiff.py index 2fefc6914..7f8dce667 100644 --- a/tiled/_tests/test_tiff.py +++ b/tiled/_tests/test_tiff.py @@ -8,6 +8,7 @@ from ..adapters.tiff import TiffAdapter, TiffSequenceAdapter from ..catalog import in_memory from ..catalog.register import TIFF_SEQUENCE_EMPTY_NAME_ROOT, register +from ..catalog.utils import ensure_uri from ..client import Context, from_context from ..server.app import build_app @@ -18,18 +19,21 @@ def client(tmpdir_module): sequence_directory = Path(tmpdir_module, "sequence") sequence_directory.mkdir() + filepaths = [] for i in range(3): data = numpy.random.random((5, 7)) - tf.imwrite(sequence_directory / f"temp{i:05}.tif", data) + filepath = sequence_directory / f"temp{i:05}.tif" + tf.imwrite(filepath, data) + filepaths.append(filepath) color_data = numpy.random.randint(0, 255, COLOR_SHAPE, dtype="uint8") path = Path(tmpdir_module, "color.tif") tf.imwrite(path, color_data) tree = MapAdapter( { - "color": TiffAdapter(str(path)), - "sequence": TiffSequenceAdapter( - tf.TiffSequence(str(sequence_directory / "*.tif")) + "color": TiffAdapter(ensure_uri(path)), + "sequence": TiffSequenceAdapter.from_uris( + [ensure_uri(filepath) for filepath in filepaths] ), } ) From 5f7bcf6c1747571e6d53ddd23dd6bb17421ca7e1 Mon Sep 17 00:00:00 2001 From: Dan Allan Date: Sat, 20 Jan 2024 17:21:32 -0500 Subject: [PATCH 21/38] Zarr handles init (no structure) and access (structure). --- tiled/adapters/zarr.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/tiled/adapters/zarr.py b/tiled/adapters/zarr.py index 263b3744e..3ecd436bb 100644 --- a/tiled/adapters/zarr.py +++ b/tiled/adapters/zarr.py @@ -15,13 +15,16 @@ INLINED_DEPTH = int(os.getenv("TILED_HDF5_INLINED_CONTENTS_MAX_DEPTH", "7")) -def read_zarr(data_uri, **kwargs): +def read_zarr(data_uri, structure=None, **kwargs): filepath = path_from_uri(data_uri) zarr_obj = zarr.open(filepath) # Group or Array if isinstance(zarr_obj, zarr.hierarchy.Group): adapter = ZarrGroupAdapter(zarr_obj, **kwargs) else: - adapter = ZarrArrayAdapter(zarr_obj, **kwargs) + if structure is None: + adapter = ZarrArrayAdapter.from_array(zarr_obj, **kwargs) + else: + adapter = ZarrArrayAdapter(zarr_obj, structure=structure, **kwargs) return adapter From c470aaaeea4fccb0eb3f3d173b470c177c679fa6 Mon Sep 17 00:00:00 2001 From: Dan Allan Date: Sat, 20 Jan 2024 17:21:50 -0500 Subject: [PATCH 22/38] Update XDI to use URI. --- tiled/examples/xdi.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tiled/examples/xdi.py b/tiled/examples/xdi.py index d1bff35a9..828ad2c52 100644 --- a/tiled/examples/xdi.py +++ b/tiled/examples/xdi.py @@ -13,10 +13,12 @@ from tiled.adapters.dataframe import DataFrameAdapter from tiled.structures.core import Spec +from tiled.utils import path_from_uri -def read_xdi(filepath, structure=None, metadata=None, specs=None, access_policy=None): +def read_xdi(data_uri, structure=None, metadata=None, specs=None, access_policy=None): "Read XDI-formatted file." 
+ filepath = path_from_uri(data_uri) with open(filepath, "r") as file: metadata = {} fields = collections.defaultdict(dict) From 32469924af178a34142b6eb8811a1ef25ed188f1 Mon Sep 17 00:00:00 2001 From: Dan Allan Date: Sun, 21 Jan 2024 15:10:57 -0500 Subject: [PATCH 23/38] Change some custom mimetypes and refactor default creation. --- tiled/catalog/adapter.py | 18 ++++++++++-------- tiled/catalog/mimetypes.py | 6 +++--- 2 files changed, 13 insertions(+), 11 deletions(-) diff --git a/tiled/catalog/adapter.py b/tiled/catalog/adapter.py index a289682a9..60421d18a 100644 --- a/tiled/catalog/adapter.py +++ b/tiled/catalog/adapter.py @@ -46,35 +46,37 @@ from .core import check_catalog_database, initialize_database from .explain import ExplainAsyncSession from .mimetypes import ( + AWKWARD_BUFFERS_MIMETYPE, DEFAULT_ADAPTERS_BY_MIMETYPE, PARQUET_MIMETYPE, SPARSE_BLOCKS_PARQUET_MIMETYPE, ZARR_MIMETYPE, - ZIP_MIMETYPE, ) from .utils import SCHEME_PATTERN, ensure_uri DEFAULT_ECHO = bool(int(os.getenv("TILED_ECHO_SQL", "0") or "0")) INDEX_PATTERN = re.compile(r"^[A-Za-z0-9_-]+$") +# When data is uploaded, how is it saved? +# TODO: Make this configurable at Catalog construction time. DEFAULT_CREATION_MIMETYPE = { StructureFamily.array: ZARR_MIMETYPE, - StructureFamily.awkward: ZIP_MIMETYPE, + StructureFamily.awkward: AWKWARD_BUFFERS_MIMETYPE, StructureFamily.table: PARQUET_MIMETYPE, StructureFamily.sparse: SPARSE_BLOCKS_PARQUET_MIMETYPE, } -CREATE_ADAPTER_BY_MIMETYPE = OneShotCachedMap( +DEFAULT_INIT_STORAGE = OneShotCachedMap( { - ZARR_MIMETYPE: lambda: importlib.import_module( + StructureFamily.array: lambda: importlib.import_module( "...adapters.zarr", __name__ ).ZarrArrayAdapter.init_storage, - ZIP_MIMETYPE: lambda: importlib.import_module( + StructureFamily.awkward: lambda: importlib.import_module( "...adapters.awkward_buffers", __name__ ).AwkwardBuffersAdapter.init_storage, - PARQUET_MIMETYPE: lambda: importlib.import_module( + StructureFamily.table: lambda: importlib.import_module( "...adapters.parquet", __name__ ).ParquetDatasetAdapter.init_storage, - SPARSE_BLOCKS_PARQUET_MIMETYPE: lambda: importlib.import_module( + StructureFamily.sparse: lambda: importlib.import_module( "...adapters.sparse_blocks_parquet", __name__ ).SparseBlocksParquetAdapter.init_storage, } @@ -567,7 +569,7 @@ async def create_node( data_uri = str(self.context.writable_storage) + "".join( f"/{quote_plus(segment)}" for segment in (self.segments + [key]) ) - init_storage = CREATE_ADAPTER_BY_MIMETYPE[data_source.mimetype] + init_storage = DEFAULT_INIT_STORAGE[structure_family] assets = await ensure_awaitable( init_storage, data_uri, data_source.structure ) diff --git a/tiled/catalog/mimetypes.py b/tiled/catalog/mimetypes.py index 2f44a41f2..d6f2405cc 100644 --- a/tiled/catalog/mimetypes.py +++ b/tiled/catalog/mimetypes.py @@ -7,9 +7,9 @@ # OneShotCachedMap is used to defer imports. We don't want to pay up front # for importing Readers that we will not actually use. PARQUET_MIMETYPE = "application/x-parquet" -SPARSE_BLOCKS_PARQUET_MIMETYPE = "application/x-parquet-sparse" # HACK! 
-ZIP_MIMETYPE = "application/zip" +SPARSE_BLOCKS_PARQUET_MIMETYPE = "application/x-parquet;structure=sparse" ZARR_MIMETYPE = "application/x-zarr" +AWKWARD_BUFFERS_MIMETYPE = "application/x-awkward-buffers" DEFAULT_ADAPTERS_BY_MIMETYPE = OneShotCachedMap( { "image/tiff": lambda: importlib.import_module( @@ -39,7 +39,7 @@ ZARR_MIMETYPE: lambda: importlib.import_module( "...adapters.zarr", __name__ ).read_zarr, - ZIP_MIMETYPE: lambda: importlib.import_module( + AWKWARD_BUFFERS_MIMETYPE: lambda: importlib.import_module( "...adapters.awkward_buffers", __name__ ).AwkwardBuffersAdapter.from_directory, } From 27d9cfa9998d647edfb87a4921fa54f740edf52e Mon Sep 17 00:00:00 2001 From: Dan Allan Date: Sun, 21 Jan 2024 16:16:38 -0500 Subject: [PATCH 24/38] WIP: Write migration --- tiled/catalog/core.py | 13 +- ...cab_enrich_datasource_asset_association.py | 122 ++++++++++++++++++ tiled/catalog/orm.py | 5 +- 3 files changed, 132 insertions(+), 8 deletions(-) create mode 100644 tiled/catalog/migrations/versions/a66028395cab_enrich_datasource_asset_association.py diff --git a/tiled/catalog/core.py b/tiled/catalog/core.py index 3cf202510..992c8a1da 100644 --- a/tiled/catalog/core.py +++ b/tiled/catalog/core.py @@ -3,12 +3,15 @@ from ..alembic_utils import DatabaseUpgradeNeeded, UninitializedDatabase, check_database from .base import Base -# This is the alembic revision ID of the database revision -# required by this version of Tiled. -REQUIRED_REVISION = "3db11ff95b6c" - # This is list of all valid revisions (from current to oldest). -ALL_REVISIONS = ["3db11ff95b6c", "0b033e7fbe30", "83889e049ddc", "6825c778aa3c"] +ALL_REVISIONS = [ + "a66028395cab", + "3db11ff95b6c", + "0b033e7fbe30", + "83889e049ddc", + "6825c778aa3c", +] +REQUIRED_REVISION = ALL_REVISIONS[0] async def initialize_database(engine): diff --git a/tiled/catalog/migrations/versions/a66028395cab_enrich_datasource_asset_association.py b/tiled/catalog/migrations/versions/a66028395cab_enrich_datasource_asset_association.py new file mode 100644 index 000000000..65170b0f8 --- /dev/null +++ b/tiled/catalog/migrations/versions/a66028395cab_enrich_datasource_asset_association.py @@ -0,0 +1,122 @@ +"""Enrich DataSource-Asset association. + +Revision ID: a66028395cab +Revises: 3db11ff95b6c +Create Date: 2024-01-21 15:17:20.571763 + +""" +import sqlalchemy as sa +from alembic import op + +from tiled.authn_database.orm import ( # unique_parameter_num_null_check, + DataSourceAssetAssociation, + JSONVariant, +) + +# revision identifiers, used by Alembic. +revision = "a66028395cab" +down_revision = "3db11ff95b6c" +branch_labels = None +depends_on = None + + +def upgrade(): + connection = op.get_bind() + data_sources = sa.Table( + "data_sources", + sa.MetaData(), + sa.Column("id", sa.Integer), + sa.Column("node_id", sa.Integer), + sa.Column("mimetype", sa.Unicode), + sa.Column("structure", JSONVariant), + ) + data_source_asset_association = sa.Table( + DataSourceAssetAssociation.__tablename__, + sa.MetaData(), + sa.Column("asset_id", sa.Integer), + sa.Column("data_source_id", sa.Integer), + sa.Column("parameter", sa.Unicode(255)), + sa.Column("num", sa.Integer), + ) + + # Rename some MIME types. + + # While Awkward data is typically _transmitted_ in a ZIP archive, + # it is stored as directory of buffers, with no ZIP involved. + # Thus using 'application/zip' in the database was a mistake. + connection.execute( + data_sources.update() + .where(mimetype="application/zip") + .values(mimtype="application/x-awkward-buffers") + ) + # The format is standard parquet. 
We will use a MIME type + # parameter to let Tiled know to use the Adapter for sparse + # data, as opposed to the Adapter for tabular data. + connection.execute( + data_sources.update() + .where(mimetype="application/x-parquet-sparse") + .values(mimtype="application/x-parquet;structure=sparse") + ) + + # Add columns 'parameter' and 'num' to association table. + op.add_column( + DataSourceAssetAssociation.__tablename__, + sa.Column("parameter", sa.Unicode(255), nullable=True), + ) + op.add_column( + DataSourceAssetAssociation.__tablename__, + sa.Column("num", sa.Integer, nullable=True), + ) + + # First populate the columns to bring them into compliance with + # constraints. Then, apply constraints. + + connection.execute( + data_source_asset_association.update() + .where( + sa.not_( + sa._or( + data_sources.mimetype == "multipart/related;type=image/tiff", + data_sources.mimetype == "application/x-parquet", + data_sources.mimetype == "application/x-parquet;structure=sparse", + ) + ) + ) + .values(parameter="data_uri") + ) + connection.execute( + data_source_asset_association.update() + .where( + sa._or( + data_sources.mimetype == "multipart/related;type=image/tiff", + data_sources.mimetype == "application/x-parquet", + data_sources.mimetype == "application/x-parquet;structure=sparse", + ) + ) + .values(parameter="data_uris") # plural + ) + # results = connection.execute( + # sa.select( + # data_sources.c.id, + # data_sources.c.structure, + # nodes.c.structure_family, + # ).select_from(joined) + # ).fetchall() + + # Create unique constraint and triggers. + # op.create_unique_constraint( + # constraint_name="parameter_num_unique_constraint", + # table_name=DataSourceAssetAssociation.__tablename__, + # columns=[ + # "data_source_id", + # "parameter", + # "num", + # ], + # ) + # unique_parameter_num_null_check(data_source_asset_association, connection) + + +def downgrade(): + # This _could_ be implemented but we will wait for a need since we are + # still in alpha releases. + raise NotImplementedError diff --git a/tiled/catalog/orm.py b/tiled/catalog/orm.py index 0174ee70f..12f88990e 100644 --- a/tiled/catalog/orm.py +++ b/tiled/catalog/orm.py @@ -135,9 +135,6 @@ class DataSourceAssetAssociation(Base): back_populates="data_source_associations", lazy="selectin" ) - # TODO We should additionally ensure that, if there is a row with some - # parameter P and num NULL, that there can be no rows with parameter P and - # num . This may be possible with a trigger. __table_args__ = ( UniqueConstraint( "data_source_id", @@ -145,6 +142,8 @@ class DataSourceAssetAssociation(Base): "num", name="parameter_num_unique_constraint", ), + # Below, in unique_parameter_num_null_check, additional constraints + # are applied, via triggers. ) From c73492fa5ae897cc880ea3575df8c9497b6c215e Mon Sep 17 00:00:00 2001 From: Dan Allan Date: Sun, 21 Jan 2024 16:37:17 -0500 Subject: [PATCH 25/38] Generate TIFF sequences in example data. 
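
Add two short TIFF series, A0001-A0003 and B0001-B0002, alongside the other
files under more/ so that the example tree also exercises TIFF-sequence
grouping, not just single TIFF files. As a minimal sketch of how such a
series can be produced and then opened with the sequence adapter from this
branch (the directory name, image shape, and pixel values below are
illustrative, not what generate_files.py actually writes):

    from pathlib import Path

    import numpy
    import tifffile

    from tiled.adapters.tiff import TiffSequenceAdapter
    from tiled.catalog.utils import ensure_uri

    directory = Path("more")  # hypothetical scratch directory
    directory.mkdir(exist_ok=True)
    paths = [directory / f"A{i:04}.tif" for i in range(1, 4)]
    for path in paths:
        # Write a small random image; the real example data may differ.
        tifffile.imwrite(path, numpy.random.random((5, 7)))
    adapter = TiffSequenceAdapter.from_uris([ensure_uri(p) for p in paths])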
--- tiled/examples/generate_files.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/tiled/examples/generate_files.py b/tiled/examples/generate_files.py index f4790d7f0..45915fe3a 100644 --- a/tiled/examples/generate_files.py +++ b/tiled/examples/generate_files.py @@ -10,7 +10,11 @@ ("a.tif",), ("b.tif",), ("c.tif",), - ("more", "d.tif"), + ("more", "A0001.tif"), + ("more", "A0002.tif"), + ("more", "A0003.tif"), + ("more", "B0001.tif"), + ("more", "B0002.tif"), ("more", "even_more", "e.tif"), ("more", "even_more", "f.tif"), ] From e1e248633fbe7e91fd060abd9b00b56dab2fc461 Mon Sep 17 00:00:00 2001 From: Dan Allan Date: Sun, 21 Jan 2024 16:37:32 -0500 Subject: [PATCH 26/38] fixes --- ...cab_enrich_datasource_asset_association.py | 39 ++++++++++++------- 1 file changed, 25 insertions(+), 14 deletions(-) diff --git a/tiled/catalog/migrations/versions/a66028395cab_enrich_datasource_asset_association.py b/tiled/catalog/migrations/versions/a66028395cab_enrich_datasource_asset_association.py index 65170b0f8..fc4e8d329 100644 --- a/tiled/catalog/migrations/versions/a66028395cab_enrich_datasource_asset_association.py +++ b/tiled/catalog/migrations/versions/a66028395cab_enrich_datasource_asset_association.py @@ -8,7 +8,7 @@ import sqlalchemy as sa from alembic import op -from tiled.authn_database.orm import ( # unique_parameter_num_null_check, +from tiled.catalog.orm import ( # unique_parameter_num_null_check, DataSourceAssetAssociation, JSONVariant, ) @@ -46,16 +46,16 @@ def upgrade(): # Thus using 'application/zip' in the database was a mistake. connection.execute( data_sources.update() - .where(mimetype="application/zip") - .values(mimtype="application/x-awkward-buffers") + .where(data_sources.c.mimetype == "application/zip") + .values(mimetype="application/x-awkward-buffers") ) # The format is standard parquet. We will use a MIME type # parameter to let Tiled know to use the Adapter for sparse # data, as opposed to the Adapter for tabular data. connection.execute( data_sources.update() - .where(mimetype="application/x-parquet-sparse") - .values(mimtype="application/x-parquet;structure=sparse") + .where(data_sources.c.mimetype == "application/x-parquet-sparse") + .values(mimetype="application/x-parquet;structure=sparse") ) # Add columns 'parameter' and 'num' to association table. 
@@ -74,24 +74,35 @@ def upgrade(): connection.execute( data_source_asset_association.update() .where( - sa.not_( - sa._or( - data_sources.mimetype == "multipart/related;type=image/tiff", - data_sources.mimetype == "application/x-parquet", - data_sources.mimetype == "application/x-parquet;structure=sparse", + data_source_asset_association.c.data_source_id + == sa.select(data_sources.c.id) + .where( + sa.not_( + sa.or_( + data_sources.c.mimetype == "multipart/related;type=image/tiff", + data_sources.c.mimetype == "application/x-parquet", + data_sources.c.mimetype + == "application/x-parquet;structure=sparse", + ) ) ) + .as_scalar() ) .values(parameter="data_uri") ) connection.execute( data_source_asset_association.update() .where( - sa._or( - data_sources.mimetype == "multipart/related;type=image/tiff", - data_sources.mimetype == "application/x-parquet", - data_sources.mimetype == "application/x-parquet;structure=sparse", + data_source_asset_association.c.data_source_id + == sa.select(data_sources.c.id) + .where( + sa.or_( + data_sources.c.mimetype == "multipart/related;type=image/tiff", + data_sources.c.mimetype == "application/x-parquet", + data_sources.c.mimetype == "application/x-parquet;structure=sparse", + ) ) + .as_scalar() ) .values(parameter="data_uris") # plural ) From 89bb6edd1344c987b184f6af963b1aeb090a1b9c Mon Sep 17 00:00:00 2001 From: Dan Allan Date: Sun, 21 Jan 2024 18:32:37 -0500 Subject: [PATCH 27/38] Migration is tested on SQLite --- ...cab_enrich_datasource_asset_association.py | 187 +++++++++++++----- 1 file changed, 141 insertions(+), 46 deletions(-) diff --git a/tiled/catalog/migrations/versions/a66028395cab_enrich_datasource_asset_association.py b/tiled/catalog/migrations/versions/a66028395cab_enrich_datasource_asset_association.py index fc4e8d329..edbecd170 100644 --- a/tiled/catalog/migrations/versions/a66028395cab_enrich_datasource_asset_association.py +++ b/tiled/catalog/migrations/versions/a66028395cab_enrich_datasource_asset_association.py @@ -8,9 +8,10 @@ import sqlalchemy as sa from alembic import op -from tiled.catalog.orm import ( # unique_parameter_num_null_check, +from tiled.catalog.orm import ( DataSourceAssetAssociation, JSONVariant, + unique_parameter_num_null_check, ) # revision identifiers, used by Alembic. @@ -38,6 +39,12 @@ def upgrade(): sa.Column("parameter", sa.Unicode(255)), sa.Column("num", sa.Integer), ) + assets = sa.Table( + "assets", + sa.MetaData(), + sa.Column("id", sa.Integer), + sa.Column("data_uri", sa.Unicode), + ) # Rename some MIME types. @@ -71,60 +78,148 @@ def upgrade(): # First populate the columns to bring them into compliance with # constraints. Then, apply constraints. 
- connection.execute( - data_source_asset_association.update() + results = connection.execute( + sa.select(data_sources.c.id) .where( - data_source_asset_association.c.data_source_id - == sa.select(data_sources.c.id) - .where( - sa.not_( - sa.or_( - data_sources.c.mimetype == "multipart/related;type=image/tiff", - data_sources.c.mimetype == "application/x-parquet", - data_sources.c.mimetype - == "application/x-parquet;structure=sparse", - ) + sa.not_( + data_sources.c.mimetype.in_( + [ + "multipart/related;type=image/tiff", + "application/x-parquet", + "application/x-parquet;structure=sparse", + ] ) ) - .as_scalar() ) - .values(parameter="data_uri") - ) - connection.execute( - data_source_asset_association.update() + .select_from(data_sources) + .join( + data_source_asset_association, + data_sources.c.id == data_source_asset_association.c.data_source_id, + ) + .join( + assets, + data_source_asset_association.c.asset_id == assets.c.id, + ) + .distinct() + ).fetchall() + for (data_source_id,) in results: + connection.execute( + data_source_asset_association.update() + .where(data_source_asset_association.c.data_source_id == data_source_id) + .values(parameter="data_uri") + ) + results = connection.execute( + sa.select(data_sources.c.id) .where( - data_source_asset_association.c.data_source_id - == sa.select(data_sources.c.id) - .where( - sa.or_( - data_sources.c.mimetype == "multipart/related;type=image/tiff", - data_sources.c.mimetype == "application/x-parquet", - data_sources.c.mimetype == "application/x-parquet;structure=sparse", - ) + data_sources.c.mimetype.in_( + [ + "multipart/related;type=image/tiff", + "application/x-parquet", + "application/x-parquet;structure=sparse", + ] ) - .as_scalar() ) - .values(parameter="data_uris") # plural - ) - # results = connection.execute( - # sa.select( - # data_sources.c.id, - # data_sources.c.structure, - # nodes.c.structure_family, - # ).select_from(joined) - # ).fetchall() + .select_from(data_sources) + .join( + data_source_asset_association, + data_sources.c.id == data_source_asset_association.c.data_source_id, + ) + .join( + assets, + data_source_asset_association.c.asset_id == assets.c.id, + ) + .distinct() + ).fetchall() + for (data_source_id,) in results: + connection.execute( + data_source_asset_association.update() + .where(data_source_asset_association.c.data_source_id == data_source_id) + .values(parameter="data_uris") # plural + ) + sorted_assoc = connection.execute( + sa.select(data_source_asset_association.c.data_source_id, assets.c.id) + .where(data_source_asset_association.c.data_source_id == data_source_id) + .order_by(assets.c.data_uri) + .select_from(data_sources) + .join( + data_source_asset_association, + data_sources.c.id == data_source_asset_association.c.data_source_id, + ) + .join( + assets, + data_source_asset_association.c.asset_id == assets.c.id, + ) + ).fetchall() + for num, (data_source_id, asset_id) in enumerate(sorted_assoc, start=1): + connection.execute( + data_source_asset_association.update() + .where(data_source_asset_association.c.data_source_id == data_source_id) + .where(data_source_asset_association.c.asset_id == asset_id) + .values(num=num) + ) # Create unique constraint and triggers. 
- # op.create_unique_constraint( - # constraint_name="parameter_num_unique_constraint", - # table_name=DataSourceAssetAssociation.__tablename__, - # columns=[ - # "data_source_id", - # "parameter", - # "num", - # ], - # ) - # unique_parameter_num_null_check(data_source_asset_association, connection) + if connection.engine.dialect.name == "sqlite": + # SQLite does not supported adding constraints to an existing table. + # We invoke its 'copy and move' functionality. + with op.batch_alter_table(DataSourceAssetAssociation.__tablename__) as batch_op: + batch_op.create_unique_constraint( + "parameter_num_unique_constraint", + [ + "data_source_id", + "parameter", + "num", + ], + ) + with op.get_context().autocommit_block(): + connection.execute( + sa.text( + """ + CREATE TRIGGER cannot_insert_num_null_if_num_int_exists + BEFORE INSERT ON data_source_asset_association + WHEN NEW.num IS NULL + BEGIN + SELECT RAISE(ABORT, 'Can only insert num=NULL if no other row exists for the same parameter') + WHERE EXISTS + ( + SELECT 1 + FROM data_source_asset_association + WHERE parameter = NEW.parameter + AND data_source_id = NEW.data_source_id + ); + END""" + ) + ) + connection.execute( + sa.text( + """ + CREATE TRIGGER cannot_insert_num_int_if_num_null_exists + BEFORE INSERT ON data_source_asset_association + WHEN NEW.num IS NOT NULL + BEGIN + SELECT RAISE(ABORT, 'Can only insert INTEGER num if no NULL row exists for the same parameter') + WHERE EXISTS + ( + SELECT 1 + FROM data_source_asset_association + WHERE parameter = NEW.parameter + AND num IS NULL + AND data_source_id = NEW.data_source_id + ); + END""" + ) + ) + else: + # PostgreSQL + op.create_unique_constraint( + "parameter_num_unique_constraint", + [ + "data_source_id", + "parameter", + "num", + ], + ) + unique_parameter_num_null_check(data_source_asset_association, connection) def downgrade(): From cd6ee56673363bc4760ca2103bd23e9347d96bd9 Mon Sep 17 00:00:00 2001 From: Dan Allan Date: Sun, 21 Jan 2024 18:40:37 -0500 Subject: [PATCH 28/38] Fix usage --- .../a66028395cab_enrich_datasource_asset_association.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tiled/catalog/migrations/versions/a66028395cab_enrich_datasource_asset_association.py b/tiled/catalog/migrations/versions/a66028395cab_enrich_datasource_asset_association.py index edbecd170..f2fbae9cf 100644 --- a/tiled/catalog/migrations/versions/a66028395cab_enrich_datasource_asset_association.py +++ b/tiled/catalog/migrations/versions/a66028395cab_enrich_datasource_asset_association.py @@ -163,6 +163,7 @@ def upgrade(): # SQLite does not supported adding constraints to an existing table. # We invoke its 'copy and move' functionality. with op.batch_alter_table(DataSourceAssetAssociation.__tablename__) as batch_op: + # Gotcha: This does not take table_name because it is bound into batch_op. 
batch_op.create_unique_constraint( "parameter_num_unique_constraint", [ @@ -213,6 +214,7 @@ def upgrade(): # PostgreSQL op.create_unique_constraint( "parameter_num_unique_constraint", + DataSourceAssetAssociation.__tablename__, [ "data_source_id", "parameter", From cbde2b74daeb0b555c6cd964814e269e159c393d Mon Sep 17 00:00:00 2001 From: Dan Allan Date: Sun, 21 Jan 2024 18:54:40 -0500 Subject: [PATCH 29/38] Make PG trigger conditional same as SQLite --- tiled/catalog/orm.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tiled/catalog/orm.py b/tiled/catalog/orm.py index 12f88990e..83ad8fd6c 100644 --- a/tiled/catalog/orm.py +++ b/tiled/catalog/orm.py @@ -216,6 +216,7 @@ def unique_parameter_num_null_check(target, connection, **kw): CREATE TRIGGER cannot_insert_num_null_if_num_int_exists BEFORE INSERT ON data_source_asset_association FOR EACH ROW +WHEN (NEW.num IS NULL) EXECUTE FUNCTION test_parameter_exists();""" ) ) From d58f8c6fd2038ba3b9e1d3585cb02cbd48ed711f Mon Sep 17 00:00:00 2001 From: Dan Allan Date: Sun, 21 Jan 2024 20:34:40 -0500 Subject: [PATCH 30/38] Add missing constraint for PG. --- tiled/catalog/orm.py | 30 ++++++++++++++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/tiled/catalog/orm.py b/tiled/catalog/orm.py index 83ad8fd6c..a6e70f308 100644 --- a/tiled/catalog/orm.py +++ b/tiled/catalog/orm.py @@ -220,6 +220,36 @@ def unique_parameter_num_null_check(target, connection, **kw): EXECUTE FUNCTION test_parameter_exists();""" ) ) + connection.execute( + text( + """ +CREATE OR REPLACE FUNCTION test_not_null_parameter_exists() +RETURNS TRIGGER AS $$ +BEGIN + IF EXISTS ( + SELECT 1 + FROM data_source_asset_association + WHERE parameter = NEW.parameter + AND data_source_id = NEW.data_source_id + AND num IS NULL + ) THEN + RAISE EXCEPTION 'Can only insert INTEGER num if no NULL row exists for the same parameter'; + END IF; + RETURN NEW; +END; +$$ LANGUAGE plpgsql;""" + ) + ) + connection.execute( + text( + """ +CREATE TRIGGER cannot_insert_num_int_if_num_null_exists +BEFORE INSERT ON data_source_asset_association +FOR EACH ROW +WHEN (NEW.num IS NOT NULL) +EXECUTE FUNCTION test_not_null_parameter_exists();""" + ) + ) class DataSource(Timestamped, Base): From af229a06c977887df9e1c4561effc9c54594f510 Mon Sep 17 00:00:00 2001 From: Dan Allan Date: Sun, 21 Jan 2024 20:42:23 -0500 Subject: [PATCH 31/38] Include SQLite test data in CI. --- .github/workflows/ci.yml | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index cc75e63bc..293ecab2e 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -55,6 +55,10 @@ jobs: shell: bash -l {0} run: source continuous_integration/scripts/start_LDAP.sh + - name: Download SQLite example data. + shell: bash -l {0} + run: source continuous_integration/scripts/download_sqlite_data.sh + - name: Start PostgreSQL service in container. shell: bash -l {0} run: source continuous_integration/scripts/start_postgres.sh From db960010ad853985d2740df9827f56ee1caaade0 Mon Sep 17 00:00:00 2001 From: Dan Allan Date: Sun, 21 Jan 2024 20:42:55 -0500 Subject: [PATCH 32/38] Run database migrations against example data in CI. 
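
The example catalogs used in CI (the downloaded SQLite file and the
'tiled-example-data' PostgreSQL database) were written with the released
schema, which predates the new Alembic revision (a66028395cab) added in this
branch. Upgrading them here with 'tiled catalog upgrade-database
<database-uri>' exercises the same path that existing deployments will take
when they adopt this change.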
--- .github/workflows/ci.yml | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 293ecab2e..66d68cf76 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -63,6 +63,17 @@ jobs: shell: bash -l {0} run: source continuous_integration/scripts/start_postgres.sh + + - name: Ensure example data is migrated to current catalog database schema. + # The example data is expected to be kept up to date to the latest Tiled + # release, but this CI run may include some unreleased schema changes, + # so we run a migration here. + shell: bash -l {0} + run: | + set -vxeuo pipefail + tiled catalog upgrade-database sqlite+aiosqlite:///tiled_test_db_sqlite.db + tiled catalog upgrade-database postgresql+asyncpg://postgres:secret@localhost:5432 + - name: Test with pytest shell: bash -l {0} run: | From 588eb28da94c2db33ad181bd034ef22df8abb49d Mon Sep 17 00:00:00 2001 From: Dan Allan Date: Sun, 21 Jan 2024 20:51:09 -0500 Subject: [PATCH 33/38] Target correct database for upgrade --- .github/workflows/ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 66d68cf76..131c5efb8 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -72,7 +72,7 @@ jobs: run: | set -vxeuo pipefail tiled catalog upgrade-database sqlite+aiosqlite:///tiled_test_db_sqlite.db - tiled catalog upgrade-database postgresql+asyncpg://postgres:secret@localhost:5432 + tiled catalog upgrade-database postgresql+asyncpg://postgres:secret@localhost:5432/tiled-example-data - name: Test with pytest shell: bash -l {0} From 9f2a2a368f924b294e948ddb85d4e6b6877a4bbb Mon Sep 17 00:00:00 2001 From: Dan Allan Date: Sun, 21 Jan 2024 20:59:51 -0500 Subject: [PATCH 34/38] Rename SQL function --- tiled/catalog/orm.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tiled/catalog/orm.py b/tiled/catalog/orm.py index a6e70f308..b48326317 100644 --- a/tiled/catalog/orm.py +++ b/tiled/catalog/orm.py @@ -194,7 +194,7 @@ def unique_parameter_num_null_check(target, connection, **kw): connection.execute( text( """ -CREATE OR REPLACE FUNCTION test_parameter_exists() +CREATE OR REPLACE FUNCTION raise_if_parameter_exists() RETURNS TRIGGER AS $$ BEGIN IF EXISTS ( @@ -217,13 +217,13 @@ def unique_parameter_num_null_check(target, connection, **kw): BEFORE INSERT ON data_source_asset_association FOR EACH ROW WHEN (NEW.num IS NULL) -EXECUTE FUNCTION test_parameter_exists();""" +EXECUTE FUNCTION raise_if_parameter_exists();""" ) ) connection.execute( text( """ -CREATE OR REPLACE FUNCTION test_not_null_parameter_exists() +CREATE OR REPLACE FUNCTION raise_if_null_parameter_exists() RETURNS TRIGGER AS $$ BEGIN IF EXISTS ( @@ -247,7 +247,7 @@ def unique_parameter_num_null_check(target, connection, **kw): BEFORE INSERT ON data_source_asset_association FOR EACH ROW WHEN (NEW.num IS NOT NULL) -EXECUTE FUNCTION test_not_null_parameter_exists();""" +EXECUTE FUNCTION raise_if_null_parameter_exists();""" ) ) From ac0f8f260b961cea0c5dd43a72b17684129990b6 Mon Sep 17 00:00:00 2001 From: Dan Allan Date: Mon, 22 Jan 2024 19:46:33 -0500 Subject: [PATCH 35/38] Explain overly clever test. 
--- tiled/_tests/test_directory_walker.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/tiled/_tests/test_directory_walker.py b/tiled/_tests/test_directory_walker.py index 742255f61..7ca556066 100644 --- a/tiled/_tests/test_directory_walker.py +++ b/tiled/_tests/test_directory_walker.py @@ -170,6 +170,7 @@ async def test_tiff_seq_custom_sorting(tmpdir): for i in ordering: file = Path(tmpdir, f"image{i:05}.tif") files.append(file) + # data is a block of ones tifffile.imwrite(file, i * data) settings = Settings.init() @@ -182,6 +183,11 @@ async def test_tiff_seq_custom_sorting(tmpdir): settings, ) client = from_context(context) + # We are being a bit clever here. + # Each image in this image series has pixels with a constant value, and + # that value matches the image's position in the sequence enumerated by + # `ordering`. We pick out one pixel and check that its value matches + # the corresponding value in `ordering`. actual = list(client["image"][:, 0, 0]) assert actual == ordering From 3852c268338f9609cda923280bc13264f1ba5cf2 Mon Sep 17 00:00:00 2001 From: Dan Allan Date: Mon, 22 Jan 2024 19:46:42 -0500 Subject: [PATCH 36/38] Update docstring for API change. --- tiled/adapters/hdf5.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tiled/adapters/hdf5.py b/tiled/adapters/hdf5.py index 3c1b4da31..8aa9d70b2 100644 --- a/tiled/adapters/hdf5.py +++ b/tiled/adapters/hdf5.py @@ -31,7 +31,7 @@ class HDF5Adapter(collections.abc.Mapping, IndexersMixin): From the root node of a file given a filepath >>> import h5py - >>> HDF5Adapter.from_filepath("path/to/file.h5") + >>> HDF5Adapter.from_uri("file://localhost/path/to/file.h5") From the root node of a file given an h5py.File object From d8997e810d65b39e0548d02b0f73238acd34087d Mon Sep 17 00:00:00 2001 From: Dan Allan Date: Mon, 22 Jan 2024 19:46:53 -0500 Subject: [PATCH 37/38] Give trigger better name. --- tiled/catalog/orm.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tiled/catalog/orm.py b/tiled/catalog/orm.py index b48326317..bda9d0309 100644 --- a/tiled/catalog/orm.py +++ b/tiled/catalog/orm.py @@ -156,7 +156,7 @@ def unique_parameter_num_null_check(target, connection, **kw): connection.execute( text( """ -CREATE TRIGGER cannot_insert_num_null_if_num_int_exists +CREATE TRIGGER cannot_insert_num_null_if_num_exists BEFORE INSERT ON data_source_asset_association WHEN NEW.num IS NULL BEGIN @@ -213,7 +213,7 @@ def unique_parameter_num_null_check(target, connection, **kw): connection.execute( text( """ -CREATE TRIGGER cannot_insert_num_null_if_num_int_exists +CREATE TRIGGER cannot_insert_num_null_if_num_exists BEFORE INSERT ON data_source_asset_association FOR EACH ROW WHEN (NEW.num IS NULL) From 2ddf62099b4f85687f1a822bfd1a96a255dcd0f6 Mon Sep 17 00:00:00 2001 From: Dan Allan Date: Mon, 22 Jan 2024 19:49:36 -0500 Subject: [PATCH 38/38] Comment on handling of HDF5 virtual data sets. --- tiled/_tests/test_directory_walker.py | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/tiled/_tests/test_directory_walker.py b/tiled/_tests/test_directory_walker.py index 7ca556066..b3e974a0d 100644 --- a/tiled/_tests/test_directory_walker.py +++ b/tiled/_tests/test_directory_walker.py @@ -250,6 +250,20 @@ def read_tiff_with_yaml_metadata(image_uri, metadata_uri, metadata=None, **kwarg @pytest.mark.asyncio async def test_hdf5_virtual_datasets(tmpdir): + # A virtual database comprises one master file and N data files. The master + # file must be handed to the Adapter for opening. 
The data files are not
+    # handled directly by the Adapter but they still ought to be tracked as
+    # Assets for purposes of data movement, accounting for data size, etc.
+    # This is why they are Assets with parameter=NULL/None, Assets not used
+    # directly by the Adapter.
+
+    # One could do one-dataset-per-directory. But like TIFF series in practice
+    # they are often mixed, so we address that general case and track them at
+    # the per-file level.
+
+    # Contrast this to Zarr, where the files involved are always bundled by
+    # directory. We track Zarr at the directory level.
+
     layout = h5py.VirtualLayout(shape=(4, 100), dtype="i4")
     data_filepaths = []