diff --git a/scripts/create_registry_db.py b/scripts/create_registry_db.py index 126701c7..35c431a1 100644 --- a/scripts/create_registry_db.py +++ b/scripts/create_registry_db.py @@ -240,9 +240,9 @@ def _Dependency(schema, has_production): # The following should be adjusted whenever there is a change to the structure # of the database tables. _DB_VERSION_MAJOR = 2 -_DB_VERSION_MINOR = 0 +_DB_VERSION_MINOR = 1 _DB_VERSION_PATCH = 0 -_DB_VERSION_COMMENT = "Added production dependencies" +_DB_VERSION_COMMENT = "Add dataset status" # Parse command line arguments parser = argparse.ArgumentParser( diff --git a/src/cli/register.py b/src/cli/register.py index f5f002f2..3d67bd0f 100644 --- a/src/cli/register.py +++ b/src/cli/register.py @@ -37,7 +37,7 @@ def register_dataset(args): ) # Register new dataset. - new_id = datareg.Registrar.register_dataset( + new_id = datareg.Registrar.dataset.register( args.relative_path, args.version, name=args.name, diff --git a/src/dataregistry/DataRegistry.py b/src/dataregistry/DataRegistry.py index 1ab0b5eb..d4bef9b5 100644 --- a/src/dataregistry/DataRegistry.py +++ b/src/dataregistry/DataRegistry.py @@ -22,16 +22,17 @@ def __init__( """ Primary data registry wrapper class. - Class links to both the Registrar class, to registry new dataset, and - the Query class, to query existing datasets. + The DataRegistry class links to both the Registrar class, to + register/modify/delete datasets, and the Query class, to query existing + datasets. Links to the database is done automatically using the: - the users config file (if None defaults are used) - - the passed schema (if None default is used) + - the passed schema (if None the default schema is used) The `root_dir` is the location the data is copied to. This can be manually passed, or alternately a predefined `site` can be chosen. If - nether are chosen, the NERSC site will be selected. + neither is chosen, the NERSC site will be selected as the default. 
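For orientation, a minimal sketch of how the reorganized registrar API introduced in this patch is called; the connection arguments, paths and names below are illustrative, not taken from the diff.

from dataregistry import DataRegistry

# Connect to the registry; config file, schema and site/root_dir fall back to
# defaults when not supplied (this root_dir is a made-up example path).
datareg = DataRegistry(root_dir="/example/root_dir")

# Entries are now registered through per-table wrappers on Registrar instead
# of the old register_* methods.
dataset_id, execution_id = datareg.Registrar.dataset.register(
    "my_project/my_dataset",             # relative_path
    "1.0.0",                             # version
    old_location="/example/local/data",  # data copied into the registry root
)
alias_id = datareg.Registrar.dataset_alias.register("my_alias", dataset_id)
exec_id = datareg.Registrar.execution.register(
    "my_execution", input_datasets=[dataset_id]
)
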
Parameters ---------- @@ -59,18 +60,13 @@ def __init__( self.db_connection = DbConnection(config_file, schema=schema, verbose=verbose) # Work out the location of the root directory - root_dir = self._get_root_dir(root_dir, site) + self.root_dir = self._get_root_dir(root_dir, site) # Create registrar object - self.Registrar = Registrar( - self.db_connection, - root_dir, - owner=owner, - owner_type=owner_type, - ) + self.Registrar = Registrar(self.db_connection, self.root_dir, owner, owner_type) # Create query object - self.Query = Query(self.db_connection, root_dir) + self.Query = Query(self.db_connection, self.root_dir) def _get_root_dir(self, root_dir, site): """ diff --git a/src/dataregistry/__init__.py b/src/dataregistry/__init__.py index b940e1f5..b14f76e2 100644 --- a/src/dataregistry/__init__.py +++ b/src/dataregistry/__init__.py @@ -1,7 +1,6 @@ from ._version import __version__ from .db_basic import * from .registrar import * -from .registrar_util import * from .query import * from .git_util import * from .DataRegistry import DataRegistry diff --git a/src/dataregistry/query.py b/src/dataregistry/query.py index b855909d..4463824a 100644 --- a/src/dataregistry/query.py +++ b/src/dataregistry/query.py @@ -2,7 +2,7 @@ from sqlalchemy import text, select import sqlalchemy.sql.sqltypes as sqltypes import pandas as pd -from dataregistry.registrar_util import _form_dataset_path +from dataregistry.registrar.registrar_util import _form_dataset_path from dataregistry.exceptions import DataRegistryNYI, DataRegistryException import os diff --git a/src/dataregistry/registrar/__init__.py b/src/dataregistry/registrar/__init__.py new file mode 100644 index 00000000..f0d44449 --- /dev/null +++ b/src/dataregistry/registrar/__init__.py @@ -0,0 +1 @@ +from .registrar import Registrar diff --git a/src/dataregistry/registrar/base_table_class.py b/src/dataregistry/registrar/base_table_class.py new file mode 100644 index 00000000..0d08131c --- /dev/null +++ b/src/dataregistry/registrar/base_table_class.py @@ -0,0 +1,137 @@ +import os + +from dataregistry.db_basic import TableMetadata +from sqlalchemy import select, update +from datetime import datetime + +from .registrar_util import ( + _bump_version, + _copy_data, + _form_dataset_path, + _name_from_relpath, + _parse_version_string, + _read_configuration_file, + get_directory_info, +) +from .dataset_util import set_dataset_status, get_dataset_status + +# Allowed owner types +_OWNER_TYPES = {"user", "project", "group", "production"} + +# Default maximum allowed length of configuration file allowed to be ingested +_DEFAULT_MAX_CONFIG = 10000 + + +class BaseTable: + def __init__(self, db_connection, root_dir, owner, owner_type): + """ + Base class to register/modify/delete entries in the database tables. + + Each table subclass (e.g., DatasetTable) will inherit this class. + + Functions universal to all tables, such as delete and modify are + written here, the register function, and other unique functions for the + tables, are in their respective subclasses. + + Parameters + ---------- + db_connection : DbConnection object + Encompasses sqlalchemy engine, dialect (database backend) + and schema version + root_dir : str + Root directory of the dataregistry on disk + owner : str + To set the default owner for all registered datasets in this + instance. + owner_type : str + To set the default owner_type for all registered datasets in this + instance. 
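To illustrate the inheritance pattern this docstring describes, here is a simplified, hypothetical subclass; the `keyword` table name is invented for illustration, while the real wrappers (DatasetTable, ExecutionTable, DatasetAliasTable) appear later in this diff.

class KeywordTable(BaseTable):  # hypothetical table wrapper, illustration only
    def __init__(self, db_connection, root_dir, owner, owner_type):
        super().__init__(db_connection, root_dir, owner, owner_type)
        self.which_table = "keyword"  # name of the underlying database table

    def register(self, name):
        # Each subclass supplies its own register(); shared behaviour such as
        # find_entry() (and the delete()/modify() stubs) lives in BaseTable.
        raise NotImplementedError
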
+ """ + + # Root directory on disk for data registry files + self._root_dir = root_dir + + # Database engine and dialect. + self._engine = db_connection.engine + self._schema = db_connection.schema + + # Link to Table Metadata. + self._metadata_getter = TableMetadata(db_connection) + + # Store user id + self._uid = os.getenv("USER") + + # Default owner and owner_type's + self._owner = owner + self._owner_type = owner_type + + # Allowed owner types + self._OWNER_TYPES = _OWNER_TYPES + + # Max configuration file length allowed + self._DEFAULT_MAX_CONFIG = _DEFAULT_MAX_CONFIG + + def _get_table_metadata(self, tbl): + return self._metadata_getter.get(tbl) + + def delete(self, entry_id): + """ + Delete an entry from the DESC data registry. + + Parameters + ---------- + entry_id : int + Entry we want to delete from the registry + """ + + raise NotImplementedError + + def modify(self, entry_id, modify_fields): + """ + Modify an entry in the DESC data registry. + + Parameters + ---------- + entry_id : int + The dataset/execution/etc ID we wish to delete from the database + modify_fields : dict + Dict where key is the column to modify (must be allowed to modify) + and value is the desired new value for the entry + """ + + raise NotImplementedError + + def find_entry(self, entry_id): + """ + Find an entry in the database. + + Parameters + ---------- + entry_id : int + Unique identifier for table entry + e.g., dataset_id for the dataset table + + Returns + ------- + r : CursorResult object + Found entry (None if no entry found) + """ + + # Search for dataset in the registry. + my_table = self._get_table_metadata(self.which_table) + + if self.which_table == "dataset": + stmt = select(my_table).where(my_table.c.dataset_id == entry_id) + else: + raise ValueError("Can only perform `find_entry` on dataset table for now") + + with self._engine.connect() as conn: + result = conn.execute(stmt) + conn.commit() + + # Pull out the single result + for r in result: + return r + + # No results found + return None diff --git a/src/dataregistry/registrar.py b/src/dataregistry/registrar/dataset.py similarity index 61% rename from src/dataregistry/registrar.py rename to src/dataregistry/registrar/dataset.py index baee35c1..65c8f79c 100644 --- a/src/dataregistry/registrar.py +++ b/src/dataregistry/registrar/dataset.py @@ -1,304 +1,32 @@ -import time import os +import time from datetime import datetime +import shutil -# from sqlalchemy import MetaData, Table, Column, insert, text, -from sqlalchemy import update, select - -# from sqlalchemy.exc import DBAPIError, IntegrityError from dataregistry.db_basic import add_table_row -from dataregistry.registrar_util import _form_dataset_path, get_directory_info -from dataregistry.registrar_util import _parse_version_string, _bump_version -from dataregistry.registrar_util import ( +from sqlalchemy import select, update + +from .base_table_class import BaseTable +from .registrar_util import ( + _bump_version, + _copy_data, + _form_dataset_path, _name_from_relpath, + _parse_version_string, _read_configuration_file, - _copy_data, + get_directory_info, ) -from dataregistry.db_basic import TableMetadata - -# from dataregistry.exceptions import * - -__all__ = ["Registrar"] - -# Default maximum allowed length of configuration file allowed to be ingested -_DEFAULT_MAX_CONFIG = 10000 - -# Allowed owner types -_OWNER_TYPES = {"user", "project", "group", "production"} - - -class Registrar: - def __init__( - self, - db_connection, - root_dir, - owner=None, - owner_type=None, - ): - """ - 
Class to register new datasets, executions and alias names. - - Parameters - ---------- - db_connection : DbConnection object - Encompasses sqlalchemy engine, dialect (database backend) - and schema version - root_dir : str - Root directory of the dataregistry on disk - owner : str - To set the default owner for all registered datasets in this - instance. - owner_type : str - To set the default owner_type for all registered datasets in this - instance. - """ - - # Root directory on disk for data registry files - self._root_dir = root_dir - - # Database engine and dialect. - self._engine = db_connection.engine - self._schema = db_connection.schema - - # Link to Table Metadata. - self._metadata_getter = TableMetadata(db_connection) - - # Store user id - self._uid = os.getenv("USER") - - # Default owner and owner_type's - self._owner = owner - self._owner_type = owner_type - - def get_owner_types(self): - """ - Returns a list of allowed owner_types that can be registered within the - data registry. - - Returns - ------- - - : set - Set of owner_types - """ - - return _OWNER_TYPES - - def _get_table_metadata(self, tbl): - return self._metadata_getter.get(tbl) - - def _find_previous(self, relative_path, dataset_table, owner, owner_type): - """ - Check to see if a dataset exists already in the registry, and if we are - allowed to overwrite it. - - Parameters - ---------- - relative_path : str - Relative path to dataset - dataset_table : SQLAlchemy Table object - Link to the dataset table - owner : str - Owner of the dataset - owner_type : str - - Returns - ------- - previous : list - List of dataset IDs that are overwritable - """ - - # Search for dataset in the registry. - stmt = ( - select(dataset_table.c.dataset_id, dataset_table.c.is_overwritable) - .where( - dataset_table.c.relative_path == relative_path, - dataset_table.c.owner == owner, - dataset_table.c.owner_type == owner_type, - ) - .order_by(dataset_table.c.dataset_id.desc()) - ) - - with self._engine.connect() as conn: - result = conn.execute(stmt) - conn.commit() - - # If the datasets are overwritable, log their ID, else return None - previous = [] - for r in result: - if not r.is_overwritable: - return None - else: - previous.append(r.dataset_id) - - return previous - - def _handle_data(self, relative_path, old_location, owner, owner_type, verbose): - """ - Find characteristics of dataset (i.e., is it a file or directory, how - many files and total disk space of the dataset). - - If old_location is not None, copy the dataset files and directories - into the data registry. - - Parameters - ---------- - relative_path : str - Relative path of dataset in the data registry - old_location : str - Location of data (if not already in the data registry root) - Data will be copied from this location - owner : str - Owner of the dataset - owner_type : str - Owner type of the dataset - verbose : bool - True for extra output - - Returns - ------- - dataset_organization : str - "file", "directory", or "dummy" - num_files : int - Total number of files making up dataset - total_size : float - Total disk space of dataset in bytes - ds_creation_date : datetime - When file or directory was created - success : bool - True if data copy was successful, else False - """ - - # Get destination directory in data registry. - dest = _form_dataset_path( - owner_type, - owner, - relative_path, - schema=self._schema, - root_dir=self._root_dir, - ) - - # Is the data already on location, or coming from somewhere new? 
- if old_location: - loc = old_location - else: - loc = dest - - # Get metadata on dataset. - if os.path.isfile(loc): - dataset_organization = "file" - elif os.path.isdir(loc): - dataset_organization = "directory" - else: - raise FileNotFoundError(f"Dataset {loc} not found") - - if verbose: - tic = time.time() - print("Collecting metadata...", end="") - - ds_creation_date = datetime.fromtimestamp(os.path.getctime(loc)) - - if dataset_organization == "directory": - num_files, total_size = get_directory_info(loc) - else: - num_files = 1 - total_size = os.path.getsize(loc) - if verbose: - print(f"took {time.time()-tic:.2f}s") - - # Copy data into data registry - if old_location: - if verbose: - tic = time.time() - print( - f"Copying {num_files} files ({total_size/1024/1024:.2f} Mb)...", - end="", - ) - _copy_data(dataset_organization, old_location, dest) - if verbose: - print(f"took {time.time()-tic:.2f}") - else: - success = True - - return dataset_organization, num_files, total_size, ds_creation_date - - def register_execution( - self, - name, - description=None, - execution_start=None, - locale=None, - configuration=None, - input_datasets=[], - input_production_datasets=[], - max_config_length=_DEFAULT_MAX_CONFIG, - ): - """ - Register a new execution in the DESC data registry. - - Any args marked with '**' share their name with the associated column - in the registry schema. Descriptions of what these columns are can be - found in `schema.yaml` or the documentation. +from .dataset_util import set_dataset_status, get_dataset_status - Parameters - ---------- - name** : str - description** : str, optional - execution_start** : datetime, optional - locale** : str, optional - configuration** : str, optional - input_datasets** : list, optional - input_production_datasets** : list, optional - max_config_length : int, optional - Maxiumum number of lines to read from a configuration file - Returns - ------- - my_id : int - The execution ID of the new row relating to this entry - """ +class DatasetTable(BaseTable): + def __init__(self, db_connection, root_dir, owner, owner_type, execution_table): + super().__init__(db_connection, root_dir, owner, owner_type) - # Put the execution information together - values = {"name": name} - if locale: - values["locale"] = locale - if execution_start: - values["execution_start"] = execution_start - if description: - values["description"] = description - values["register_date"] = datetime.now() - values["creator_uid"] = self._uid + self.execution_table = execution_table + self.which_table = "dataset" - exec_table = self._get_table_metadata("execution") - dependency_table = self._get_table_metadata("dependency") - - # Read configuration file. Enter contents as a raw string. 
- if configuration: - values["configuration"] = _read_configuration_file( - configuration, max_config_length - ) - - # Enter row into data registry database - with self._engine.connect() as conn: - my_id = add_table_row(conn, exec_table, values, commit=False) - - # handle dependencies - for d in input_datasets: - values["register_date"] = datetime.now() - values["input_id"] = d - values["execution_id"] = my_id - add_table_row(conn, dependency_table, values, commit=False) - - # handle production dependencies - for d in input_production_datasets: - values["register_date"] = datetime.now() - values["input_production_id"] = d - values["execution_id"] = my_id - add_table_row(conn, dependency_table, values, commit=False) - - conn.commit() - return my_id - - def register_dataset( + def register( self, relative_path, version, @@ -323,10 +51,10 @@ def register_dataset( execution_configuration=None, input_datasets=[], input_production_datasets=[], - max_config_length=_DEFAULT_MAX_CONFIG, + max_config_length=None, ): """ - Register a new dataset in the DESC data registry. + Create a new dataset entry in the DESC data registry. Any args marked with '**' share their name with the associated column in the registry schema. Descriptions of what these columns are can be @@ -334,7 +62,8 @@ def register_dataset( First, the dataset entry is created in the database. If success, the data is then copied (if `old_location` was provided). Only if both - steps are successful will there be `is_valid=True` entry in the registry. + steps are successful will there be "valid" status entry in the + registry. Parameters ---------- @@ -383,13 +112,17 @@ def register_dataset( The execution ID associated with the dataset """ + # Set max configuration file length + if max_config_length is None: + max_config_length = self._DEFAULT_MAX_CONFIG + # Make sure the owner_type is legal if owner_type is None: if self._owner_type is not None: owner_type = self._owner_type else: owner_type = "user" - if owner_type not in _OWNER_TYPES: + if owner_type not in self._OWNER_TYPES: raise ValueError(f"{owner_type} is not a valid owner_type") # Establish the dataset owner @@ -417,17 +150,17 @@ def register_dataset( "Only the production schema can handle owner_type='production'" ) - # If name not passed, automatically generate a name from the relative path + # If `name` not passed, automatically generate a name from the relative path if name is None: name = _name_from_relpath(relative_path) # Look for previous entries. 
Fail if not overwritable dataset_table = self._get_table_metadata("dataset") - previous = self._find_previous(relative_path, dataset_table, owner, owner_type) + previous = self._find_previous(relative_path, owner, owner_type) if previous is None: print(f"Dataset {relative_path} exists, and is not overwritable") - return None + return None, None # Deal with version string (non-special case) if version not in ["major", "minor", "patch"]: @@ -451,7 +184,7 @@ def register_dataset( execution_name = f"{execution_name}-{version_suffix}" if execution_description is None: execution_description = "Fabricated execution for dataset" - execution_id = self.register_execution( + execution_id = self.execution_table.register( execution_name, description=execution_description, execution_start=execution_start, @@ -483,7 +216,6 @@ def register_dataset( values["is_overwritten"] = False values["is_external_link"] = False values["is_archived"] = False - values["is_valid"] = True values["register_date"] = datetime.now() values["owner_type"] = owner_type values["owner"] = owner @@ -491,8 +223,8 @@ def register_dataset( values["register_root_dir"] = self._root_dir # We tentatively start with an "invalid" dataset in the database. This - # will be upgraded to True if the data copying (if any) was successful. - values["is_valid"] = False + # will be upgraded to valid if the data copying (if any) was successful. + values["status"] = 0 # Create a new row in the data registry database. with self._engine.connect() as conn: @@ -518,11 +250,13 @@ def register_dataset( ) = self._handle_data( relative_path, old_location, owner, owner_type, verbose ) + valid_status = 1 else: dataset_organization = "dummy" num_files = 0 total_size = 0 ds_creation_date = None + valid_status = 0 # Case where use is overwriting the dateset `creation_date` if creation_date: @@ -538,7 +272,7 @@ def register_dataset( nfiles=num_files, total_disk_space=total_size / 1024 / 1024, creation_date=ds_creation_date, - is_valid=True, + status=set_dataset_status(values["status"], valid=True), ) ) conn.execute(update_stmt) @@ -546,44 +280,202 @@ def register_dataset( return prim_key, execution_id - def register_dataset_alias(self, aliasname, dataset_id): + def _handle_data(self, relative_path, old_location, owner, owner_type, verbose): """ - Register a new dataset alias in the DESC data registry. + Find characteristics of dataset (i.e., is it a file or directory, how + many files and total disk space of the dataset). - Any args marked with '**' share their name with the associated column - in the registry schema. Descriptions of what these columns are can be - found in `schema.yaml` or the documentation. + If old_location is not None, copy the dataset files and directories + into the data registry. 
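For context on where `_handle_data` places copied files, the destination appears to follow the root_dir layout asserted in `test_get_dataset_absolute_path` later in this diff; the concrete values below are illustrative.

import os

root_dir = "/example/root_dir"       # illustrative registry root
schema = "example_schema"            # illustrative schema name
owner_type, owner = "user", "a_user"
relative_path = "my_project/my_dataset"

# Destination of the copied data under the registry root:
dest = os.path.join(root_dir, schema, owner_type, owner, relative_path)
# -> /example/root_dir/example_schema/user/a_user/my_project/my_dataset
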
Parameters ---------- - aliasname** : str - dataset_id** : int + relative_path : str + Relative path of dataset in the data registry + old_location : str + Location of data (if not already in the data registry root) + Data will be copied from this location + owner : str + Owner of the dataset + owner_type : str + Owner type of the dataset + verbose : bool + True for extra output Returns ------- - prim_key : int - The dataset_alias ID of the new row relating to this entry + dataset_organization : str + "file", "directory", or "dummy" + num_files : int + Total number of files making up dataset + total_size : float + Total disk space of dataset in bytes + ds_creation_date : datetime + When file or directory was created """ - now = datetime.now() - values = {"alias": aliasname} - values["dataset_id"] = dataset_id - values["register_date"] = now - values["creator_uid"] = self._uid + # Get destination directory in data registry. + dest = _form_dataset_path( + owner_type, + owner, + relative_path, + schema=self._schema, + root_dir=self._root_dir, + ) - alias_table = self._get_table_metadata("dataset_alias") + # Is the data already on location, or coming from somewhere new? + if old_location: + loc = old_location + else: + loc = dest + + # Get metadata on dataset. + if os.path.isfile(loc): + dataset_organization = "file" + elif os.path.isdir(loc): + dataset_organization = "directory" + else: + raise FileNotFoundError(f"Dataset {loc} not found") + + if verbose: + tic = time.time() + print("Collecting metadata...", end="") + + ds_creation_date = datetime.fromtimestamp(os.path.getctime(loc)) + + if dataset_organization == "directory": + num_files, total_size = get_directory_info(loc) + else: + num_files = 1 + total_size = os.path.getsize(loc) + if verbose: + print(f"took {time.time()-tic:.2f}s") + + # Copy data into data registry + if old_location: + if verbose: + tic = time.time() + print( + f"Copying {num_files} files ({total_size/1024/1024:.2f} Mb)...", + end="", + ) + _copy_data(dataset_organization, old_location, dest) + if verbose: + print(f"took {time.time()-tic:.2f}") + + return dataset_organization, num_files, total_size, ds_creation_date + + def _find_previous(self, relative_path, owner, owner_type): + """ + Find each dataset with combination of `relative_path`, `owner`, + `owner_type`. + + We want to know, of those datasets, which are overwritable but have not + yet been marked as overwritten. + + If any dataset with the same path has `is_overwritable=False`, the + routine returns None, indicating the dataset is not allowed to be + overwritten. + + Parameters + ---------- + relative_path : str + Relative path to dataset + owner : str + Owner of the dataset + owner_type : str + Owner type of the dataset + + Returns + ------- + dataset_id_list : list + List of dataset IDs that have the desired path combination that are + overwritable, but have not already previously been overwritten. + """ + + # Search for dataset in the registry. 
+ dataset_table = self._get_table_metadata("dataset") + stmt = select( + dataset_table.c.dataset_id, + dataset_table.c.is_overwritable, + dataset_table.c.is_overwritten, + ) + + stmt = stmt.where( + dataset_table.c.relative_path == relative_path, + dataset_table.c.owner == owner, + dataset_table.c.owner_type == owner_type, + ) + + with self._engine.connect() as conn: + result = conn.execute(stmt) + conn.commit() + + # Pull out the single result + dataset_id_list = [] + for r in result: + if not r.is_overwritable: + return None + + if not r.is_overwritten: + dataset_id_list.append(r.dataset_id) + + return dataset_id_list + + def delete(self, dataset_id): + """ + Delete an dataset entry from the DESC data registry. + + This will also remove the raw data from the root dir, but the dataset + entry remains in the registry (now with an updated `status` field). + + Parameters + ---------- + dataset_id : int + Dataset we want to delete from the registry + """ + + # First make sure the given dataset id is in the registry + dataset_table = self._get_table_metadata(self.which_table) + previous_dataset = self.find_entry(dataset_id) + + # Check dataset exists + if previous_dataset is None: + raise ValueError(f"Dataset ID {dataset_id} does not exist") + # Check dataset is valid + if not get_dataset_status(previous_dataset.status, "valid"): + raise ValueError(f"Dataset ID {dataset_id} does not have a valid status") + # Check dataset has not already been deleted + if get_dataset_status(previous_dataset.status, "deleted"): + raise ValueError(f"Dataset ID {dataset_id} does not have a valid status") + + # Update the status of the dataset to deleted with self._engine.connect() as conn: - prim_key = add_table_row(conn, alias_table, values) - - # Update any other alias rows which have been superseded - stmt = ( - update(alias_table) - .where( - alias_table.c.alias == aliasname, - alias_table.c.dataset_alias_id != prim_key, + update_stmt = ( + update(dataset_table) + .where(dataset_table.c.dataset_id == dataset_id) + .values( + status=set_dataset_status(previous_dataset.status, deleted=True), + delete_date=datetime.now(), + delete_uid=self._uid, ) - .values(supersede_date=now) ) - conn.execute(stmt) + conn.execute(update_stmt) conn.commit() - return prim_key + + # Delete the physical data in the root_dir + if previous_dataset.data_org != "dummy": + data_path = _form_dataset_path( + previous_dataset.owner_type, + previous_dataset.owner, + previous_dataset.relative_path, + schema=self._schema, + root_dir=self._root_dir, + ) + print(f"Deleting data {data_path}") + if os.path.isfile(data_path): + os.remove(data_path) + else: + shutil.rmtree(data_path) + + print(f"Deleted {dataset_id} from data registry") diff --git a/src/dataregistry/registrar/dataset_alias.py b/src/dataregistry/registrar/dataset_alias.py new file mode 100644 index 00000000..8d925f05 --- /dev/null +++ b/src/dataregistry/registrar/dataset_alias.py @@ -0,0 +1,55 @@ +from datetime import datetime + +from dataregistry.db_basic import add_table_row +from sqlalchemy import update + +from .base_table_class import BaseTable + + +class DatasetAliasTable(BaseTable): + def __init__(self, db_connection, root_dir, owner, owner_type): + super().__init__(db_connection, root_dir, owner, owner_type) + + self.which_table = "dataset_alias" + + def register(self, aliasname, dataset_id): + """ + Create a new `dataset_alias` entry in the DESC data registry. + + Any args marked with '**' share their name with the associated column + in the registry schema. 
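A short sketch of how the `delete()` method defined above is used; the IDs and paths are illustrative. The raw data is removed from `root_dir`, but the database row is kept with its `deleted` status bit set.

from dataregistry import DataRegistry
from dataregistry.registrar.dataset_util import get_dataset_status

datareg = DataRegistry(root_dir="/example/root_dir")  # illustrative root_dir
dataset_id, _ = datareg.Registrar.dataset.register(
    "my_project/old_data", "0.0.1", old_location="/example/local/data"
)

# Deletes the copied files but keeps the registry entry, setting the
# "deleted" status bit and recording delete_date / delete_uid.
datareg.Registrar.dataset.delete(dataset_id)

entry = datareg.Registrar.dataset.find_entry(dataset_id)
assert get_dataset_status(entry.status, "deleted")
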
Descriptions of what these columns are can be + found in `schema.yaml` or the documentation. + + Parameters + ---------- + aliasname** : str + dataset_id** : int + + Returns + ------- + prim_key : int + The dataset_alias ID of the new row relating to this entry + """ + + now = datetime.now() + values = {"alias": aliasname} + values["dataset_id"] = dataset_id + values["register_date"] = now + values["creator_uid"] = self._uid + + alias_table = self._get_table_metadata("dataset_alias") + with self._engine.connect() as conn: + prim_key = add_table_row(conn, alias_table, values) + + # Update any other alias rows which have been superseded + stmt = ( + update(alias_table) + .where( + alias_table.c.alias == aliasname, + alias_table.c.dataset_alias_id != prim_key, + ) + .values(supersede_date=now) + ) + conn.execute(stmt) + conn.commit() + return prim_key diff --git a/src/dataregistry/registrar/dataset_util.py b/src/dataregistry/registrar/dataset_util.py new file mode 100644 index 00000000..e9f8887b --- /dev/null +++ b/src/dataregistry/registrar/dataset_util.py @@ -0,0 +1,74 @@ +# Define constants for dataset's "status" bit position +VALID_STATUS_BITS = { + # Is a valid dataset or not. "Invalid" means the dataset entry was created in + # the database, but there was an issue copying the physical data. + "valid": 0, + # Has the data of this dataset been deleted from the `root_dir`? + "deleted": 1, + # Has the data for this dataset been archived? + "archived": 2, +} + + +def set_dataset_status(current_valid_flag, valid=None, deleted=None, archived=None): + """ + Update a value of a dataset's status bit poistion. + + These properties are not mutually exclusive, e.g., a dataset can be both + archived and deleted. + + Properties + ---------- + current_valid_flag : int + The current bitwise representation of the dataset's status + valid : bool, optional + True to set the dataset as valid, False for invalid + deleted : bool, optional + True to set the dataset as deleted + archived : bool, optional + True to set the dataset as archived + + Returns + ------- + valid_flag : int + The datasets new bitwise representation + """ + + if valid is not None: + current_valid_flag &= ~(1 << VALID_STATUS_BITS["valid"]) + current_valid_flag |= valid << VALID_STATUS_BITS["valid"] + + if deleted is not None: + current_valid_flag &= ~(1 << VALID_STATUS_BITS["deleted"]) + current_valid_flag |= deleted << VALID_STATUS_BITS["deleted"] + + if archived is not None: + current_valid_flag &= ~(1 << VALID_STATUS_BITS["archived"]) + current_valid_flag |= archived << VALID_STATUS_BITS["archived"] + + return current_valid_flag + + +def get_dataset_status(current_valid_flag, which_bit): + """ + Return the status of a dataset for a given bit index. + + Properties + ---------- + current_flag_value : int + The current bitwise representation of the dataset's status + which_bit : str + One of VALID_STATUS_BITS keys() + + Returns + ------- + - : bool + True if `which_bit` is 1. e.g., If a dataset is deleted + `get_dataset_status(, "deleted") will return True. + """ + + # Make sure `which_bit` is valid. 
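# Worked example of the status bitmask defined in dataset_util.py above
# (an illustrative sketch; the values are examples):
from dataregistry.registrar.dataset_util import set_dataset_status, get_dataset_status

status = 0                                         # fresh entry, all bits clear
status = set_dataset_status(status, valid=True)    # 0b001 -> 1
status = set_dataset_status(status, deleted=True)  # 0b011 -> 3

assert get_dataset_status(status, "valid")
assert get_dataset_status(status, "deleted")
assert not get_dataset_status(status, "archived")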
+ if which_bit not in VALID_STATUS_BITS.keys(): + raise ValueError(f"{which_bit} is not a valid dataset status") + + return (current_valid_flag & (1 << VALID_STATUS_BITS[which_bit])) != 0 diff --git a/src/dataregistry/registrar/execution.py b/src/dataregistry/registrar/execution.py new file mode 100644 index 00000000..dec012d4 --- /dev/null +++ b/src/dataregistry/registrar/execution.py @@ -0,0 +1,94 @@ +from datetime import datetime + +from dataregistry.db_basic import add_table_row + +from .base_table_class import BaseTable +from .registrar_util import _read_configuration_file + + +class ExecutionTable(BaseTable): + def __init__(self, db_connection, root_dir, owner, owner_type): + super().__init__(db_connection, root_dir, owner, owner_type) + + self.which_table = "execution" + + def register( + self, + name, + description=None, + execution_start=None, + locale=None, + configuration=None, + input_datasets=[], + input_production_datasets=[], + max_config_length=None, + ): + """ + Create a new execution entry in the DESC data registry. + + Any args marked with '**' share their name with the associated column + in the registry schema. Descriptions of what these columns are can be + found in `schema.yaml` or the documentation. + + Parameters + ---------- + name** : str + description** : str, optional + execution_start** : datetime, optional + locale** : str, optional + configuration** : str, optional + input_datasets** : list, optional + input_production_datasets** : list, optional + max_config_length : int, optional + Maxiumum number of lines to read from a configuration file + + Returns + ------- + my_id : int + The execution ID of the new row relating to this entry + """ + + # Set max configuration file length + if max_config_length is None: + max_config_length = self._DEFAULT_MAX_CONFIG + + # Put the execution information together + values = {"name": name} + if locale: + values["locale"] = locale + if execution_start: + values["execution_start"] = execution_start + if description: + values["description"] = description + values["register_date"] = datetime.now() + values["creator_uid"] = self._uid + + exec_table = self._get_table_metadata("execution") + dependency_table = self._get_table_metadata("dependency") + + # Read configuration file. Enter contents as a raw string. 
+ if configuration: + values["configuration"] = _read_configuration_file( + configuration, max_config_length + ) + + # Enter row into data registry database + with self._engine.connect() as conn: + my_id = add_table_row(conn, exec_table, values, commit=False) + + # handle dependencies + for d in input_datasets: + values["register_date"] = datetime.now() + values["input_id"] = d + values["execution_id"] = my_id + add_table_row(conn, dependency_table, values, commit=False) + + # handle production dependencies + for d in input_production_datasets: + values["register_date"] = datetime.now() + values["input_production_id"] = d + values["execution_id"] = my_id + add_table_row(conn, dependency_table, values, commit=False) + + conn.commit() + return my_id diff --git a/src/dataregistry/registrar/registrar.py b/src/dataregistry/registrar/registrar.py new file mode 100644 index 00000000..b7e846d2 --- /dev/null +++ b/src/dataregistry/registrar/registrar.py @@ -0,0 +1,52 @@ +from .base_table_class import _OWNER_TYPES +from .dataset import DatasetTable +from .dataset_alias import DatasetAliasTable +from .execution import ExecutionTable + +__all__ = ["Registrar"] + + +class Registrar: + def __init__(self, db_connection, root_dir, owner, owner_type): + """ + The registrar class is a wrapper for each table subclass (dataset, + execution and dataset_alias). Each table subclass can + register/modify/delete entries in those tables. + + Parameters + ---------- + db_connection : DbConnection object + Encompasses sqlalchemy engine, dialect (database backend) + and schema version + root_dir : str + Root directory of the dataregistry on disk + owner : str + To set the default owner for all registered datasets in this + instance. + owner_type : str + To set the default owner_type for all registered datasets in this + instance. + """ + + # Class wrappers which are used to register/modify/delete entries in + # their respective tables in the database + self.execution = ExecutionTable(db_connection, root_dir, owner, owner_type) + self.dataset_alias = DatasetAliasTable( + db_connection, root_dir, owner, owner_type + ) + self.dataset = DatasetTable( + db_connection, root_dir, owner, owner_type, self.execution + ) + + def get_owner_types(self): + """ + Returns a list of allowed `owner_types` that can be registered within the + data registry. + + Returns + ------- + - : set + Set of owner_types + """ + + return _OWNER_TYPES diff --git a/src/dataregistry/registrar_util.py b/src/dataregistry/registrar/registrar_util.py similarity index 98% rename from src/dataregistry/registrar_util.py rename to src/dataregistry/registrar/registrar_util.py index 17d8adee..2ca21e6e 100644 --- a/src/dataregistry/registrar_util.py +++ b/src/dataregistry/registrar/registrar_util.py @@ -2,9 +2,10 @@ import os import re import warnings -from sqlalchemy import MetaData, Table, Column, text, select from shutil import copyfile, copytree, rmtree +from sqlalchemy import select + __all__ = [ "_parse_version_string", "_bump_version", @@ -329,7 +330,7 @@ def _compute_checksum(file_path): os.rename(temp_dest, dest) print( - f"Something went wrong during data copying, aborting." + "Something went wrong during data copying, aborting." 
"Note an entry in the registry database will still have" "been created" ) diff --git a/src/dataregistry/schema/schema.yaml b/src/dataregistry/schema/schema.yaml index 0e5896da..30035fd6 100644 --- a/src/dataregistry/schema/schema.yaml +++ b/src/dataregistry/schema/schema.yaml @@ -248,10 +248,6 @@ dataset: type: "String" description: "User provided human-readable description of the dataset" cli_optional: True - is_valid: - type: "Boolean" - nullable: False - description: "False if, e.g., copy failed" execution_id: type: "Integer" foreign_key: True @@ -273,7 +269,22 @@ dataset: type: "Boolean" nullable: False description: "True if an external link" - is_archived: - type: "Boolean" + status: + type: "Integer" nullable: False - description: "True if dataset is archived" + description: "What is the status of the dataset? This is a bitmask description of multiple states. Bit number 0=valid (1 if dataset is valid, 0 if copy data failed during creation), 1=deleted (1 if dataset is deleted and no longer on disk, 0 if data is still on disk, database entry is always kept) 3=archived (1 if data has been archived). For example '0b011` would be valid=1, deleted=1 and archived=0." + archive_date: + type: "DateTime" + description: "Dataset archive date" + archive_path: + type: "String" + description: "Path the dataset was archived to" + delete_date: + type: "DateTime" + description: "Date the dataset was deleted" + delete_uid: + type: "String" + description: "User ID of person who deleted the dataset" + move_date: + type: "DateTime" + description: "Date the dataset was last moved" diff --git a/tests/end_to_end_tests/test_database.py b/tests/end_to_end_tests/test_database.py index 24f4b22a..0aa27954 100644 --- a/tests/end_to_end_tests/test_database.py +++ b/tests/end_to_end_tests/test_database.py @@ -52,5 +52,5 @@ def test_db_version(): """ actual_major, actual_minor, actual_patch = datareg.Query.get_db_versioning() assert actual_major == 2, "db major version doesn't match expected" - assert actual_minor == 0, "db minor version doesn't match expected" + assert actual_minor == 1, "db minor version doesn't match expected" assert actual_patch == 0, "db patch version doesn't match expected" diff --git a/tests/end_to_end_tests/test_end_to_end.py b/tests/end_to_end_tests/test_end_to_end.py index 3943131c..e6fb0c8f 100644 --- a/tests/end_to_end_tests/test_end_to_end.py +++ b/tests/end_to_end_tests/test_end_to_end.py @@ -4,14 +4,25 @@ from dataregistry import DataRegistry from dataregistry.db_basic import SCHEMA_VERSION -from dataregistry.registrar import _OWNER_TYPES import pytest +from dataregistry.registrar.registrar_util import _form_dataset_path +from dataregistry.registrar.dataset_util import set_dataset_status, get_dataset_status + @pytest.fixture def dummy_file(tmp_path): """ - Create some dummy (temporary) files and directories + Create some dummy (temporary) files and directories: + + | - + | - + | - file1.txt + | - file2.txt + | - + | - file2.txt + | + | - Parameters ---------- @@ -30,8 +41,9 @@ def dummy_file(tmp_path): tmp_src_dir = tmp_path / "source" tmp_src_dir.mkdir() - f = tmp_src_dir / "file1.txt" - f.write_text("i am a dummy file") + for i in range(2): + f = tmp_src_dir / f"file{i+1}.txt" + f.write_text("i am a dummy file") p = tmp_src_dir / "directory1" p.mkdir() @@ -82,7 +94,7 @@ def _insert_alias_entry(datareg, name, dataset_id): The alias ID for this new entry """ - new_id = datareg.Registrar.register_dataset_alias(name, dataset_id) + new_id = datareg.Registrar.dataset_alias.register(name, 
dataset_id) assert new_id is not None, "Trying to create a dataset alias that already exists" print(f"Created dataset alias entry with id {new_id}") @@ -113,7 +125,7 @@ def _insert_execution_entry( The execution ID for this new entry """ - new_id = datareg.Registrar.register_execution( + new_id = datareg.Registrar.execution.register( name, description=description, input_datasets=input_datasets, @@ -201,7 +213,7 @@ def _insert_dataset_entry( make_sym_link = False # Add new entry. - dataset_id, execution_id = datareg.Registrar.register_dataset( + dataset_id, execution_id = datareg.Registrar.dataset.register( relpath, version, version_suffix=version_suffix, @@ -431,9 +443,11 @@ def test_copy_data(dummy_file, data_org): "data_org,data_path,v_str,overwritable", [ ("file", "file1.txt", "0.0.1", True), - ("file", "file1.txt", "0.0.2", False), + ("file", "file1.txt", "0.0.2", True), + ("file", "file1.txt", "0.0.3", False), ("directory", "dummy_dir", "0.0.1", True), - ("directory", "dummy_dir", "0.0.2", False), + ("directory", "dummy_dir", "0.0.2", True), + ("directory", "dummy_dir", "0.0.3", False), ], ) def test_on_location_data(dummy_file, data_org, data_path, v_str, overwritable): @@ -441,9 +455,9 @@ def test_on_location_data(dummy_file, data_org, data_path, v_str, overwritable): Test ingesting real data into the registry (already on location). Also tests overwriting datasets. - Does twice for each file, the first is a normal entry with - `is_overwritable=True`. The second tests overwriting the previous data with - a new version. + Does three times for each file, the first is a normal entry with + `is_overwritable=True`. The second and third tests overwriting the previous + data with a new version. """ # Establish connection to database @@ -488,14 +502,22 @@ def test_on_location_data(dummy_file, data_org, data_path, v_str, overwritable): else: assert getattr(r, "dataset.is_overwritable") == True assert getattr(r, "dataset.is_overwritten") == True - else: - if num_results == 1: - assert getattr(r, "dataset.is_overwritable") == False + elif getattr(r, "version_string") == "0.0.2": + assert num_results >= 2 + if num_results == 2: + assert getattr(r, "dataset.is_overwritable") == True + assert getattr(r, "dataset.is_overwritten") == False + elif num_results == 3: + assert getattr(r, "dataset.is_overwritable") == True assert getattr(r, "dataset.is_overwritten") == True - else: + elif getattr(r, "version_string") == "0.0.3": + assert num_results >= 3 + if num_results == 3: assert getattr(r, "dataset.is_overwritable") == False assert getattr(r, "dataset.is_overwritten") == False - assert i < 2 + else: + assert getattr(r, "dataset.is_overwritable") == True + assert getattr(r, "dataset.is_overwritten") == True def test_dataset_alias(dummy_file): @@ -845,3 +867,91 @@ def test_get_dataset_absolute_path(dummy_file): assert v == os.path.join( str(tmp_root_dir), SCHEMA_VERSION, dset_ownertype, dset_owner, dset_relpath ) + + +@pytest.mark.parametrize( + "is_dummy,dataset_name", + [ + (True, "dummy_dataset_to_delete"), + (False, "real_dataset_to_delete"), + (False, "real_directory_to_delete"), + ], +) +def test_delete_entry(dummy_file, is_dummy, dataset_name): + """ + Make a simple entry, then delete it, then check it was deleted. + + Does this for a dummy dataset and a real one. 
+ """ + + # Establish connection to database + tmp_src_dir, tmp_root_dir = dummy_file + datareg = DataRegistry(root_dir=str(tmp_root_dir), schema=SCHEMA_VERSION) + + # Make sure we raise an exception trying to delete a dataset that doesn't exist + with pytest.raises(ValueError, match="does not exist"): + datareg.Registrar.dataset.delete(10000) + + # Where is the real data? + if is_dummy: + data_path = None + else: + if dataset_name == "real_dataset_to_delete": + data_path = str(tmp_src_dir / "file2.txt") + assert os.path.isfile(data_path) + else: + data_path = str(tmp_src_dir / "directory1") + assert os.path.isdir(data_path) + + # Add entry + d_id = _insert_dataset_entry( + datareg, + f"DESC/datasets/{dataset_name}", + "0.0.1", + "user", + None, + "A dataset to delete", + is_dummy=is_dummy, + old_location=data_path, + ) + + # Now delete that entry + datareg.Registrar.dataset.delete(d_id) + + # Check the entry was deleted + f = datareg.Query.gen_filter("dataset.dataset_id", "==", d_id) + results = datareg.Query.find_datasets( + [ + "dataset.status", + "dataset.delete_date", + "dataset.delete_uid", + "dataset.owner_type", + "dataset.owner", + "dataset.relative_path", + ], + [f], + return_format="cursorresult", + ) + + for r in results: + assert get_dataset_status(getattr(r, "dataset.status"), "deleted") + assert getattr(r, "dataset.delete_date") is not None + assert getattr(r, "dataset.delete_uid") is not None + + if not is_dummy: + # Make sure the file in the root_dir has gone + data_path = _form_dataset_path( + getattr(r, "dataset.owner_type"), + getattr(r, "dataset.owner"), + getattr(r, "dataset.relative_path"), + schema=SCHEMA_VERSION, + root_dir=str(tmp_root_dir), + ) + if dataset_name == "real_dataset_to_delete": + assert not os.path.isfile(data_path) + else: + assert not os.path.isdir(data_path) + + # Make sure we can not delete an already deleted entry. 
+ with pytest.raises(ValueError, match="not have a valid status"): + datareg.Registrar.dataset.delete(d_id) diff --git a/tests/end_to_end_tests/test_query_cli_entries.py b/tests/end_to_end_tests/test_query_cli_entries.py index 209a8588..154ed768 100644 --- a/tests/end_to_end_tests/test_query_cli_entries.py +++ b/tests/end_to_end_tests/test_query_cli_entries.py @@ -6,7 +6,7 @@ from dataregistry.db_basic import SCHEMA_VERSION # Establish connection to database (default schema) -datareg = DataRegistry(root_dir="temp") +datareg = DataRegistry(root_dir="temp_root_dir") def test_cli_basic_dataset(): @@ -44,7 +44,7 @@ def test_cli_production_entry(): if datareg.Query._dialect != "sqlite": # Establish connection to database (production schema) - datareg_prod = DataRegistry(schema="production") + datareg_prod = DataRegistry(schema="production", root_dir="temp_root_dir") f = datareg_prod.Query.gen_filter( "dataset.name", "==", "my_production_cli_dataset" diff --git a/tests/unit_tests/test_dataset_status.py b/tests/unit_tests/test_dataset_status.py new file mode 100644 index 00000000..2342cf7a --- /dev/null +++ b/tests/unit_tests/test_dataset_status.py @@ -0,0 +1,51 @@ +from dataregistry.registrar.dataset_util import set_dataset_status, get_dataset_status +import pytest + + +@pytest.mark.parametrize( + "start_status,valid,deleted,archived,end_status", + [ + (0, True, False, False, "0b1"), + (0, True, True, True, "0b111"), + (0, True, False, True, "0b101"), + (5, None, True, None, "0b111"), + ], +) +def test_set_dataset_status(start_status, valid, deleted, archived, end_status): + """ + Make sure dataset bitwise valid flags get set correctly + + Starts from a value and adds a flag, e.g., `deleted`, then + checks the combined bitmask is correct. + """ + + assert ( + bin( + set_dataset_status( + start_status, valid=valid, deleted=deleted, archived=archived + ) + ) + == end_status + ) + + +@pytest.mark.parametrize( + "bin_status,is_valid,is_deleted,is_archived", + [ + ("0b1", True, False, False), + ("0b111", True, True, True), + ("0b101", True, False, True), + ("0b011", True, True, False), + ], +) +def test_get_dataset_status(bin_status, is_valid, is_deleted, is_archived): + """ + Make sure dataset bitwise valid flags get checked correctly. + + For a given `bin_status` (binary status), check that it pulls out the + individual flags correctly. 
+ """ + + assert get_dataset_status(int(bin_status, 2), "valid") == is_valid + assert get_dataset_status(int(bin_status, 2), "deleted") == is_deleted + assert get_dataset_status(int(bin_status, 2), "archived") == is_archived diff --git a/tests/unit_tests/test_registrar_util.py b/tests/unit_tests/test_registrar_util.py index ebad041c..247a0805 100644 --- a/tests/unit_tests/test_registrar_util.py +++ b/tests/unit_tests/test_registrar_util.py @@ -1,4 +1,4 @@ -from dataregistry.registrar_util import ( +from dataregistry.registrar.registrar_util import ( _parse_version_string, _name_from_relpath, _form_dataset_path, diff --git a/tests/unit_tests/test_root_dir.py b/tests/unit_tests/test_root_dir.py index 44d1841f..fe0e56ec 100644 --- a/tests/unit_tests/test_root_dir.py +++ b/tests/unit_tests/test_root_dir.py @@ -39,5 +39,4 @@ def test_root_dir_manual(root_dir, site, set_env_var, ans): if reg.db_connection.dialect != "sqlite": assert reg.db_connection.schema is not None - assert reg.Registrar._root_dir == ans - assert reg.Query._root_dir == ans + assert reg.root_dir == ans diff --git a/tests/unit_tests/test_rutil_copy_data.py b/tests/unit_tests/test_rutil_copy_data.py index 91a20c38..4f6c4ced 100644 --- a/tests/unit_tests/test_rutil_copy_data.py +++ b/tests/unit_tests/test_rutil_copy_data.py @@ -1,6 +1,6 @@ import pytest import os -from dataregistry.registrar_util import _copy_data +from dataregistry.registrar.registrar_util import _copy_data @pytest.fixture
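Finally, a sketch of reading the new `status` field back through the query interface, mirroring the filter and column syntax used in the end-to-end tests above; the dataset name and root_dir are illustrative.

from dataregistry import DataRegistry
from dataregistry.registrar.dataset_util import get_dataset_status

datareg = DataRegistry(root_dir="/example/root_dir")  # illustrative
f = datareg.Query.gen_filter("dataset.name", "==", "my_dataset")
results = datareg.Query.find_datasets(
    ["dataset.dataset_id", "dataset.status"], [f], return_format="cursorresult"
)
for r in results:
    deleted = get_dataset_status(getattr(r, "dataset.status"), "deleted")
    print(getattr(r, "dataset.dataset_id"), "deleted" if deleted else "on disk")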