diff --git a/chimedb/__init__.py b/chimedb/__init__.py new file mode 100644 index 0000000..8db66d3 --- /dev/null +++ b/chimedb/__init__.py @@ -0,0 +1 @@ +__path__ = __import__("pkgutil").extend_path(__path__, __name__) diff --git a/chimedb/cfm/api.py b/chimedb/cfm/api.py new file mode 100644 index 0000000..7f38cd1 --- /dev/null +++ b/chimedb/cfm/api.py @@ -0,0 +1,454 @@ +"""Nemo Public API""" +from __future__ import annotations +from typing import TYPE_CHECKING, Union, Tuple + +import enum +import pathlib +import peewee as pw +from tabulate import tabulate +from collections import defaultdict + +import chimedb.core as db +from chimedb.data_index import ( + ArchiveFile, + ArchiveAcq, + ArchiveFileCopy, + ArchiveFileCopyRequest, + StorageNode, + StorageGroup, +) + +from .tags import FileReservationTag, FileReservation + +if TYPE_CHECKING: + import pathlib + + # A file may be specified via (nicest first): + # * an ArchiveFile record + # * a 2-tuple with two strings containing an acquisition name and a + # file name + # * a pathlib.Path + # * a string with a path + FileSpec = Union[str, pathlib.Path, Tuple[str, str], ArchiveFile] + + +class CopyState(enum.Flag): + """State bitmask for managed file copies: + + Flags: + * PRESENT: file is present on the managed node + * AVAILABLE: file is present on the source node(s) + * RESERVED: reservation exists for current tag + * RECALLED: a request was made to recall the file + """ + + PRESENT = enum.auto() + AVAILABLE = enum.auto() + RESERVED = enum.auto() + RECALLED = enum.auto() + + +class FileManager: + """Base object for data management. + + In general, on cedar, you should use the pre-configured + `CedarManager` subclass, which is already set up for + you, rather than instantiating this class directly. + + Parameters + ---------- + node + The destination node. + + Raises + ------ + ValueError + `node.io_class` was not "Reserving". + + """ + + __slots__ = ["_node", "_sources", "_tag"] + + def __init__(self, node: StorageNode) -> None: + # if node.io_class != "Reserving": + # raise ValueError("non-Reserving node provided.") + + self._node = node + self._sources = list() + self._tag = None + + # Connect to the database + db.connect(read_write=True) + + def add_source(self, src: StorageNode | StorageGroup) -> None: + """Add `src` as a data source. + + If `src` is a group, all nodes in the group are added. + + Parameters + ---------- + src + The source. + + Raises + ------ + TypeError + src was neither a StorageNode neither a StorageGroup + """ + + if isinstance(src, StorageGroup): + for node in StorageNode.select().where(StorageNode.group == src): + self.add_source(node) + elif isinstance(src, StorageNode): + self._sources.append(src) + else: + raise TypeError("expected StorageNode or StorageGroup") + + def use_tag(self, tag: FileReservationTag | str) -> None: + """Use `tag` for future reservations.""" + + if not isinstance(tag, FileReservationTag): + tag = FileReservationTag.get(name=tag) + + self._tag = tag + + def _autotags(self, tags): + if tags: + return tags + + if self._tag is None: + raise ValueError("no tag in use") + + return [self._tag] + + # Reservation querying methods + # These need no sources to work + + def all_reserved_files( + self, tags: list(FileReservationTag) | None = None + ) -> list(ArchiveFile): + """List all files reserved by tag. + + Parameters: + ----------- + tags : optional + List files reserved by at least one of the given `tags`. + If this is omitted, or None, the currently in-use tag is used. + + Returns: + -------- + files + The ArchiveFiles reserved by the current tag on this node. + May be the empty list. + + Raises + ------ + ValueError: + No `tags` were given and no tag is in use. (See `use_tag`.) + """ + + # This raises ValueError if both tags and self._tag are None/empty. + tags = self._autotags(tags) + + # query + files = ( + ArchiveFile.select(ArchiveFile) + .join(FileReservation) + .where(FileReservation.node == self._node, FileReservation.tag << tags) + ) + + return list(files) + + def reserved_size(self, tags: list(FileReservationTag) | None = None) -> int: + """Total size of files reserved by the current tag. + + Parameters: + ----------- + tags : optional + List files reserved by at least one of the given `tags`. + If this is omitted, or None, the currently in-use tag is used. + + Returns + ------- + size + The total size in bytes. + + Raises + ------ + ValueError: + No `tags` were given and no tag is in use. (See `use_tag`.) + """ + + # This raises ValueError if both tags and self._tag are None/empty. + tags = self._autotags(tags) + + # query + size = ( + ArchiveFile(pw.SUM(ArchiveFile.size_b)) + .join(FileReservation) + .where(FileReservation.node == self._node, FileReservation.tag << tags) + ).scalar() + + return size + + def is_reserved(self, file: FileSpec) -> bool: + """Has the current tag reserved `file`?""" + return self.reserve(file, check_only=True) + + def _fixup_filespec(self, files: FileSpec | list(FileSpec)) -> list(ArchiveFile): + """Convert `files` into a list of `ArchiveFile` records. + + A single file may be specified one of three ways: + * a string containing a path + * a Pathlib.Path containing a path + * an ArchiveFile record itself + * a 2-tuple of strings: (acq-name, file-name) + The paths may be absolute or relative. If absolute, the _must_ include + the storage node root. If relative, they're assumed to be relative to that path. + + Parameters + ---------- + files: + Either a single file specification, as outlined above, or else a list + of the same (which is allowed to be empty). + + Returns + ------- + files: list of ArchiveFiles + this is always a list, even in the case of a single file spec. + + Raises + ------ + ValueError + At least one file spec was not relative to node.root (if an + absolute path), or did not refer to a valid file. + """ + files_out = list() + + # if we don't have a list, first step is to listify + if not isinstance(files, list): + files = [files] + + # Now loop over files and convert. + for file in files: + # We essentially do a type cascade here: + # str -> pathlib -> 2-tuple -> ArchiveFile + + # Convert str to pathlib, if necessary + if isinstance(file, str): + file = pathlib.Path(file) + + # Convert pathlib to 2-tuple: + if isinstance(file, pathlib.Path): + # Make relative + if file.is_absolute: + # Raises ValueError on failure + file = file.relative_to(self._node.root) + + file = file.parts + + # Look-up ArchiveFile from 2-tuple: + if isinstance(file, tuple): + if len(file) != 2: + raise ValueError(f"Expected two path elements, but got: {file}") + + # Find the file + try: + file = ( + ArchiveFile.select() + .join(ArchiveAcq) + .where( + ArchiveAcq.name == file[0], + ArchiveFile.name == file[1], + ) + .get() + ) + except pw.NotFoundError: + raise ValueError(f"No such file: {file[0]}/{file[1]}") + + # Finally we have an ArchiveFile, append it to the output list + files_out.append(file) + + # Return the converted list + return files_out + + def _filecopy_state(self, file: ArchiveFile) -> Tuple[CopyState, StorageNode]: + """What is the state of `file` for the current tag? + + Also returns the source node containing `file`, if any.""" + + state = CopyState(0) + source = None + + # Is it on the destination? + try: + ArchiveFileCopy.get( + ArchiveFileCopy.file == file, + ArchiveFileCopy.node == self._node, + ArchiveFileCopy.has_file == "Y", + ) + state |= CopyState.PRESENT + except pw.DoesNotExist: + pass + + # Is it on at least one of the sources? + # If so, remember which one + try: + copy = ArchiveFileCopy.get( + ArchiveFileCopy.file == file, + ArchiveFileCopy.node << self._sources, + ArchiveFileCopy.has_file == "Y", + ) + state |= CopyState.AVAILABLE + source = copy.node + except pw.DoesNotExist: + pass + + # Is it reserved? + try: + FileReservation.get( + FileReservation.file == file, + FileReservation.tag == self._tag, + FileReservation.node == self._node, + ) + state |= CopyState.RESERVED + except pw.DoesNotExist: + pass + + return state, source + + def reserve( + self, + files: FileSpec | list(FileSpec), + check_only: bool = False, + verbose: bool = False, + ) -> bool: + """Reserve `file` using the current tag. + + Parameters + ---------- + files: + The file or files to reserve. If specified as a path, + should be relative to the node root. + check_only: optional + If set to `True`, new copy requests will not be made if `files` + aren't already available on the node. In this case, the only + result of calling this method is its return value. + verbose: optional + if set to `True`, a summary of the current state of the `files` + is written to the terminal. + + Returns + ------- + present : bool + True if all reserved files are already on the storage node. + If this is False, and `check_only` wasn't set to True, then + this call will have created requests to have the missing files + transferred from the source nodes onto the storage node. + """ + files = self._fixup_filespec(files) + + result = True + + states = dict() + + for file in files: + # Get current state: + state, source = self._filecopy_state(file) + + # Update result + if result and CopyState.PRESENT not in state: + result = False + + # Skip all this if we're only checking + if not check_only: + # Reserve the file + if CopyState.RESERVED not in state: + FileReservation.create(file=file, node=self._node, tag=self._tag) + + # Recall the file + if ( + state & (CopyState.PRESENT | CopyState.AVAILABLE) + == CopyState.AVAILABLE + ): + _, created = ArchiveFileCopyRequest.get_or_create( + file=file, + group_to=self._node.group, + node_from=source, + nice=0, + cancelled=False, + completed=False, + n_requests=0, + timestamp=0, + ) + if created: + state |= CopyState.RECALLED + + # Remember + states[file] = state + + # Verbose report + if verbose: + # Descriptions of states + def _state_name(state, check_only): + # This is the reservation state at the _start_ + if CopyState.RESERVED in state: + name = "Previously reserved" + elif check_only: + name = "Unreserved" + else: + name = "Newly reserved" + + if CopyState.PRESENT in state: + return name + ", present" + + if CopyState.RECALLED in state: + return name + ", recalling" + + if CopyState.AVAILABLE in state: + return name + ", available" + + return name + ", missing" + + # Tot up things + file_totals = defaultdict(int) + byte_totals = defaultdict(int) + + for file, state in states.items(): + name = _state_name(state, check_only) + file_totals[name] += 1 + byte_totals[name] += file.size_b + + # tabulate + table_data = [ + [name, file_totals[name], byte_totals[name] / 1e9] + for name in file_totals + ] + + # Now print + print( + tabulate( + table_data, + headers=["State", "Files", "GB"], + intfmt=",", + floatfmt=",.3f", + ) + ) + + return result + + +class CedarManager(FileManager): + """A FileManager pre-configured for CHIME use on cedar. + + Parameters + ---------- + tag + The reservation tag to use. Can be changed later + via `use_tag`. + """ + + def __init__(self, tag: FileReservationTag | str) -> None: + super().__init__(StorageNode.get(name="cedar_online")) + self.add_source(StorageGroup.get(name="cedar_nearline")) + self.use_tag(tag) diff --git a/chimedb/cfm/cli.py b/chimedb/cfm/cli.py new file mode 100644 index 0000000..b9250e5 --- /dev/null +++ b/chimedb/cfm/cli.py @@ -0,0 +1,165 @@ +"""CHIME file manager CLI""" +from __future__ import annotations +from typing import TextIO + +import os +import time +import click +import shlex +import tempfile +from subprocess import Popen + +import chimedb.core as db + +from .api import CedarManager + + +@click.group +def cli(): + """This is the Cedar File Manager. It is not documented.""" + pass + + +@cli.group +def tag(): + """Manage reservation tags""" + + +def get_editor(context: click.Context) -> str: + """Figure out the user's preferred editor. + + Essentially the same as how git does it. + """ + + # This is not recommended, but if someone really + # wants a CFM-specific editor, we'll support it. + try: + return os.environ["CFM_EDITOR"] + except KeyError: + pass + + # Are we using a dumb terminal? + term = os.environ.get("TERM", "dumb") + terminal_is_dumb = "dumb" in term + + # the VISUAL editor only works on non-dumb terminals + if not terminal_is_dumb: + try: + return os.environ["VISUAL"] + except KeyError: + pass + + # the EDITOR works on all terminals + try: + return os.environ["EDITOR"] + except KeyError: + pass + + # Git only defines a fallback for non-dumb terminals + if not terminal_is_dumb: + # NB: if "vi" is vim, you really should be running + # with the -f flag (which standard vi does not support). + + # Be obnoxious + click.echo('No editor found. Falling back to "vi".', err=True) + time.sleep(1) + + return "vi" + + # No editor found + context.fail("No editor found. Specify one with environmental variable CFM_EDITOR or EDITOR.") + + + +@tag.command +@click.pass_context +@click.option( + "--description", + "-d", + metavar="DESC", + help="The description of the tag. If not set, an editor will be spawned to give you the opportunity to enter the description.", +) +@click.option( + "--user", + "-u", + metavar="NAME", + type=str, + help="The user creating the tag. If not set, your username is used.", +) +@click.argument("tag", type=str) +def new( + context: click.Context, + description: str | None, + user: str | None, + tag: str, +): + """Create a new tag called TAG.""" + + # If no user, use username + if user is None: + user = os.getlogin() + + if description is None: + editor = get_editor(context) + + # Create file + with tempfile.NamedTemporaryFile(prefix="cfm-", delete=False) as f: + f.write( + b'# Enter the description of the tag "' + tag.encode() + b'"\n' + b"# Lines beginning with the # will be ignored." + ) + f.close() + + # Now open in the external editor for the user + args = shlex.split(editor) + args.append(f.name) + proc = Popen(args) + proc.wait() + + +@cli.command +@click.option( + "--check", + "-c", + is_flag=True, + help="Check only: don't make any changes to the database.", +) +@click.option( + "--tag", "-t", metavar="TAG", type=str, help="Use reservation TAG to reserve files." +) +@click.option( + "--read-from", + "-f", + type=click.File(mode="r"), + metavar="TEXTFILE", + help='read FILEs from TEXTFILE. If TEXTFILE is "-", read from standard input.', +) +@click.argument("envtag", envvar="CHIMEFM_TAG", type=str) +@click.argument("file_", metavar="FILE", nargs=-1, type=str) +def reserve( + check: bool, + tag: str | None, + read_from: TextIO | None, + envtag: str | None, + file_: tuple | None, +): + """Reserve FILE(s).""" + + # If no tag is specified, look for an environmental tag + if tag is None: + if envtag is None: + click.UsageError("No tag specified.") + else: + tag = envtag + + # Create a list of files + files = list(file_) + + if read_from: + files.append(read_from.readlines()) + + # Create the manager + cm = CedarManager(tag) + + # Reserve all the things + cm.reserve(files, check_only=check, verbose=True) diff --git a/chimedb/cfm/tags.py b/chimedb/cfm/tags.py new file mode 100644 index 0000000..ac7f19e --- /dev/null +++ b/chimedb/cfm/tags.py @@ -0,0 +1,70 @@ +"""FileReservationTag and FileReservation tables +""" + +import peewee as pw + +from chimedb.core.orm import base_model +from chimedb.data_index import ArchiveFile, StorageNode + + +class FileReservationTag(base_model): + """The list of available tags for files. + + Attributes + ---------- + name : str + The tag name + creator : str + The username of the creator of this tag + creation_date : datetime + The time when this tag was created + description : str, optional + A description of the tag + """ + + name = pw.CharField(max_length=64, unique=True) + creator = pw.CharField(max_length=64) + creation_time = pw.DateTimeField() + description = pw.TextField(null=True) + + +class FileReservation(base_model): + """ArchiveFile reservation records. + + Attributes + ---------- + file : foreign key to ArchiveFile + The file being reserved + node : foreign key to StorageNode + The storage node on which the file is reserved + tag : foreign key to FileReservationTag + The tag reserving the file + """ + + file = pw.ForeignKeyField(ArchiveFile, backref="reservations") + node = pw.ForeignKeyField(StorageNode, backref="reservations") + tag = pw.ForeignKeyField(FileReservationTag, backref="reservations") + + def reserved_in_node(self, file: ArchiveFile, node: StorageNode): + """Is `file` reserved in `node`? + + Parameters + ---------- + file : ArchiveFile + The file to check the reservation of + node : StorageNode + The node to check the reservation of + + Returns + ------- + tags : list of FileReservationTag + If the file has reservations in `node`, this a the list + of `FileReservationTag` values reserving this file. + If the file has no reservations, this is the empty list. + """ + return [ + rec.tag + for rec in FileReservation.select(FileReservation.tag) + .where(FileReservation.file == file, FileReservation.node == StorageNode) + .execute() + ] diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..ac05948 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,34 @@ +[build-system] +requires = ["setuptools>=61.0.0.", "wheel", "setuptools-git-versioning"] +build-backed = "setuptools.build_meta" + +[project] +name = "chimedb.cfm" +authors = [ + {name = "D. V. Wiebe for the CHIME Collaboration", email = "dvw@phas.ubc.ca"} +] +description = "CHIME file manager" +requires-python = ">=3.10" +dynamic = ["readme", "version"] +license = {file = "LICENSE"} +dependencies = [ + "click", + "peewee", + "chimedb @ git+https://github.com/chime-experiment/chimedb.git", + "chimedb.data_index @ git+https://github.com/chime-experiment/chimedb_di.git", +] + +[project.scripts] +cfm = "chimedb.cfm.cli:cli" + + +[project.optional-dependencies] +test = [ + "pytest >= 7.0" +] + +[tool.setuptools.dynamic] +readme = {file = ["README.md"], content-type = "text/markdown"} + +[tool.setuptools-git-versioning] +enabled = true diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..3c76c8d --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,126 @@ +"""Common fixtures.""" + +import pytest +import chimedb.core as db +from chimefm.tags import FileReservation, FileReservationTag +from chimedb.data_index import ( + AcqType, + ArchiveAcq, + ArchiveFile, + ArchiveFileCopy, + ArchiveFileCopyRequest, + FileType, + StorageGroup, + StorageNode, +) + + +@pytest.fixture +def proxy(): + """Open a connection to the database. + + Returns the database proxy. + """ + db.test_enable() + db.connect(read_write=True) + yield db.proxy + + db.close() + + +@pytest.fixture +def tables(proxy): + """Ensure all the tables are created.""" + + proxy.create_tables( + [ + AcqType, + ArchiveAcq, + ArchiveFile, + ArchiveFileCopy, + ArchiveFileCopyRequest, + FileReservation, + FileReservationTag, + FileType, + StorageGroup, + StorageNode, + ] + ) + + +@pytest.fixture +def test_data(tables): + """Create some test data""" + + # The only acquisition + acq = ArchiveAcq.create(name="acqpath", type=AcqType.create(name="acqtype")) + + # The file type + ftype = FileType.create(name="filetype") + + # Make the test (managed) node + test_node = StorageNode.create( + name="cedar_online", + group=StorageGroup.create(name="test_group"), + root="/test", + active=True, + storage_type="F", + min_avail_gb=1, + ) + + # All the source nodes are in the same group + group = StorageGroup.create(name="cedar_nearline") + + # Make a few source nodes + sources = [ + StorageNode.create( + name=name, + group=group, + root="/" + name, + active=True, + storage_type="A", + min_avail_gb=1, + ) + for name in ["src1", "src2", "src3"] + ] + + def _create_file(name, size, nodes): + """Create a file called `name` with + size `size` and add it to the `nodes` listed. + + File type is `ftype` + + Returns the created ArchiveFile + """ + nonlocal acq, ftype + + file_ = ArchiveFile.create(acq=acq, name=name, size_b=size, type=ftype) + + for node in nodes: + ArchiveFileCopy.create( + file=file_, node=node, has_file="Y", wants_file="Y", size_b=size + ) + + return file_ + + # Already on test node + file0 = _create_file("file0", 123456789, [test_node]) + + # Only on test node + _create_file("file1", 123456789, [test_node, sources[0]]) + + # Not on test node + _create_file("file2", 123456789, [sources[0]]) + + # On multiple sources + _create_file("file3", 123456789, sources) + + # Missing + _create_file("file4", 123456789, []) + + # Create some tags + tag1 = FileReservationTag.create(name="tag1") + FileReservationTag.create(name="tag2") + + # Pre-emptively reserve a file + FileReservation(file=file0, node=test_node, tag=tag1) diff --git a/tests/test_api.py b/tests/test_api.py new file mode 100644 index 0000000..1c0270c --- /dev/null +++ b/tests/test_api.py @@ -0,0 +1,22 @@ +"""Tests for chimefm.api.""" + +import chimefm.api as cfm +import chimedb.data_index as di + +from chimefm.tags import FileReservation, FileReservationTag + + +def test_add(test_data): + """Tests adding a new reservation.""" + + tag = FileReservationTag.get(name="tag1") + cedar = cfm.CedarManager(tag=tag) + + file2 = di.ArchiveFile.get(name="file2") + + result = cedar.reserve(di.ArchiveFile.get(name="file2")) + assert result is False + + # Check for reservation + res = FileReservation.get(file=file2, tag=tag) + assert res.node.name == "cedar_online"