logger = logging.getLogger(__file__)


def _validate_timestamps(begin_ts, end_ts):
    """
    Checks that a time range is well-formed, logging the first violation that is found.
    :param begin_ts: Lower bound of the range. Must be non-negative.
    :param end_ts: Upper bound of the range, or None when no upper bound was given. When present
    it must be non-negative and not smaller than `begin_ts`.
    :return: True if the range is valid, False otherwise.
    """
    if begin_ts < 0:
        logger.error("begin-ts must be non-negative.")
        return False
    if end_ts is None:
        # No upper bound was supplied, so there is nothing else to check.
        return True
    if end_ts < 0:
        logger.error("end-ts must be non-negative.")
        return False
    if begin_ts > end_ts:
        logger.error("begin-ts must be <= end-ts.")
        return False
    return True


def _build_args_parser(default_config_file_path):
    """
    Builds the command-line parser for the `find` and `del {by-ids,by-filter}` subcommands.
    :param default_config_file_path: Path used as the default value of `--config`.
    :return: The configured `argparse.ArgumentParser`.
    """
    begin_ts_help = "Time-range lower-bound (inclusive) as milliseconds from the UNIX epoch."
    end_ts_help = "Time-range upper-bound (inclusive) as milliseconds from the UNIX epoch."

    # Top-level parser and options
    args_parser = argparse.ArgumentParser(
        description="View list of archive IDs or delete compressed archives."
    )
    args_parser.add_argument(
        "--config",
        "-c",
        default=str(default_config_file_path),
        help="CLP package configuration file.",
    )

    # Top-level commands
    subparsers = args_parser.add_subparsers(
        dest="subcommand",
        required=True,
    )
    find_parser = subparsers.add_parser(
        FIND_COMMAND,
        help="List IDs of archives.",
    )
    del_parser = subparsers.add_parser(
        DEL_COMMAND,
        help="Delete archives from the database and file system.",
    )

    # Options for find subcommand
    find_parser.add_argument(
        BEGIN_TS_ARG,
        dest="begin_ts",
        type=int,
        default=0,
        help=begin_ts_help,
    )
    find_parser.add_argument(
        END_TS_ARG,
        dest="end_ts",
        type=int,
        help=end_ts_help,
    )

    # Options for delete subcommand
    del_parser.add_argument(
        DRY_RUN_ARG,
        dest="dry_run",
        action="store_true",
        help="Only prints the archives to be deleted, without actually deleting them.",
    )

    # Subcommands for delete subcommand
    del_subparsers = del_parser.add_subparsers(
        dest="del_subcommand",
        required=True,
    )

    # Delete by ID subcommand
    del_id_parser = del_subparsers.add_parser(
        BY_IDS_COMMAND,
        help="Delete archives by ID.",
    )
    del_id_parser.add_argument(
        "ids",
        nargs="+",
        help="List of archive IDs to delete",
    )

    # Delete by filter subcommand
    del_filter_parser = del_subparsers.add_parser(
        BY_FILTER_COMMAND,
        help="Delete archives that fall within the specified time range.",
    )
    del_filter_parser.add_argument(
        BEGIN_TS_ARG,
        dest="begin_ts",
        type=int,
        default=0,
        help=begin_ts_help,
    )
    del_filter_parser.add_argument(
        END_TS_ARG,
        dest="end_ts",
        type=int,
        required=True,
        help=end_ts_help,
    )

    return args_parser


def main(argv):
    """
    Validates the command line and the CLP config, then runs the native archive manager module
    behind the container start command.
    :param argv: Full argument vector; `argv[0]` (the program name) is skipped.
    :return: 0 on success, -1 if the config can't be loaded, the storage type isn't `FS`, or the
    timestamps are invalid.
    :raises subprocess.CalledProcessError: If the launched command exits with a non-zero status.
    """
    clp_home = get_clp_home()
    default_config_file_path = clp_home / CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH

    parsed_args = _build_args_parser(default_config_file_path).parse_args(argv[1:])
    subcommand = parsed_args.subcommand

    # Validate and load config file
    try:
        config_file_path = Path(parsed_args.config)
        clp_config = load_config_file(config_file_path, default_config_file_path, clp_home)
        clp_config.validate_logs_dir()

        # Validate and load necessary credentials
        validate_and_load_db_credentials_file(clp_config, clp_home, False)
    except Exception:
        # `Exception` rather than a bare `except` so Ctrl-C / SystemExit still propagate.
        logger.exception("Failed to load config.")
        return -1

    storage_type = clp_config.archive_output.storage.type
    if StorageType.FS != storage_type:
        logger.error(f"Archive deletion is not supported for storage type: {storage_type}.")
        return -1

    # Only `find` and `del by-filter` carry a time range; `del by-ids` has no timestamps.
    uses_time_range = FIND_COMMAND == subcommand or (
        DEL_COMMAND == subcommand and BY_FILTER_COMMAND == parsed_args.del_subcommand
    )
    begin_timestamp = None
    end_timestamp = None
    if uses_time_range:
        begin_timestamp = parsed_args.begin_ts
        end_timestamp = parsed_args.end_ts
        if not _validate_timestamps(begin_timestamp, end_timestamp):
            return -1

    container_name = generate_container_name("archive-manager")

    container_clp_config, mounts = generate_container_config(clp_config, clp_home)
    generated_config_path_on_container, generated_config_path_on_host = dump_container_config(
        container_clp_config, clp_config, container_name
    )

    necessary_mounts = [mounts.clp_home, mounts.logs_dir, mounts.archives_output_dir]
    container_start_cmd = generate_container_start_cmd(
        container_name, necessary_mounts, clp_config.execution_container
    )

    # fmt: off
    archive_manager_cmd = [
        "python3",
        "-m", "clp_package_utils.scripts.native.archive_manager",
        "--config", str(generated_config_path_on_container),
        str(subcommand),
    ]
    # fmt: on

    # Add subcommand-specific arguments
    if DEL_COMMAND == subcommand:
        if parsed_args.dry_run:
            archive_manager_cmd.append(DRY_RUN_ARG)
        if BY_IDS_COMMAND == parsed_args.del_subcommand:
            archive_manager_cmd.append(BY_IDS_COMMAND)
            archive_manager_cmd.extend(parsed_args.ids)
        elif BY_FILTER_COMMAND == parsed_args.del_subcommand:
            # The native `by-filter` subcommand takes both timestamps positionally.
            archive_manager_cmd.extend(
                [BY_FILTER_COMMAND, str(begin_timestamp), str(end_timestamp)]
            )
    elif FIND_COMMAND == subcommand:
        archive_manager_cmd.extend([BEGIN_TS_ARG, str(begin_timestamp)])
        if end_timestamp is not None:
            archive_manager_cmd.extend([END_TS_ARG, str(end_timestamp)])

    cmd = container_start_cmd + archive_manager_cmd

    try:
        subprocess.run(cmd, check=True)
    finally:
        # Remove generated files even when the command fails, so no stale config is left behind.
        generated_config_path_on_host.unlink()

    return 0


if "__main__" == __name__:
    sys.exit(main(sys.argv))
- ) - args_parser.add_argument( - "--config", - "-c", - default=str(default_config_file_path), - help="CLP package configuration file.", - ) - args_parser.add_argument( - "--begin-ts", - type=int, - default=0, - help="Time-range lower-bound (inclusive) as milliseconds from the UNIX epoch.", - ) - args_parser.add_argument( - "--end-ts", - type=int, - required=True, - help="Time-range upper-bound (include) as milliseconds from the UNIX epoch.", - ) - parsed_args = args_parser.parse_args(argv[1:]) - - # Validate and load config file - try: - config_file_path = Path(parsed_args.config) - clp_config = load_config_file(config_file_path, default_config_file_path, clp_home) - clp_config.validate_logs_dir() - - # Validate and load necessary credentials - validate_and_load_db_credentials_file(clp_config, clp_home, False) - except: - logger.exception("Failed to load config.") - return -1 - - storage_type = clp_config.archive_output.storage.type - if StorageType.FS != storage_type: - logger.error(f"Archive deletion is not supported for storage type: {storage_type}.") - return -1 - - # Validate the input timestamp - begin_ts = parsed_args.begin_ts - end_ts = parsed_args.end_ts - if begin_ts > end_ts: - logger.error("begin-ts must be <= end-ts") - return -1 - if end_ts < 0 or begin_ts < 0: - logger.error("begin_ts and end_ts must be non-negative.") - return -1 - - container_name = generate_container_name("del-archives") - - container_clp_config, mounts = generate_container_config(clp_config, clp_home) - generated_config_path_on_container, generated_config_path_on_host = dump_container_config( - container_clp_config, clp_config, container_name - ) - - necessary_mounts = [mounts.clp_home, mounts.logs_dir, mounts.archives_output_dir] - container_start_cmd = generate_container_start_cmd( - container_name, necessary_mounts, clp_config.execution_container - ) - - # fmt: off - del_archive_cmd = [ - "python3", - "-m", "clp_package_utils.scripts.native.del_archives", - "--config", 
# Command/Argument Constants
FIND_COMMAND = "find"
DEL_COMMAND = "del"
BY_IDS_COMMAND = "by-ids"
BY_FILTER_COMMAND = "by-filter"
BEGIN_TS_ARG = "--begin-ts"
END_TS_ARG = "--end-ts"
DRY_RUN_ARG = "--dry-run"

logger = logging.getLogger(__file__)


class DeleteHandler(ABC):
    """
    Strategy describing which archives a delete operation targets: it supplies the SQL `WHERE`
    criteria, the values bound to the criteria's placeholders, and the reporting hooks.
    """

    def __init__(self, params: List[str]):
        """
        :param params: Values bound, in order, to the `%s` placeholders of `get_criteria()`.
        """
        # NOTE(review): the by-filter path passes integer timestamps here, so `List[str]` is
        # narrower than the actual usage.
        self._params = params

    def get_params(self) -> List[str]:
        """
        :return: The values to bind to the placeholders of `get_criteria()`.
        """
        return self._params

    @abstractmethod
    def get_criteria(self) -> str:
        """
        :return: SQL condition (with `%s` placeholders) selecting the archives to delete.
        """
        ...

    @abstractmethod
    def get_not_found_message(self) -> str:
        """
        :return: Message to report when no archive matched the criteria.
        """
        ...

    @abstractmethod
    def validate_results(self, archive_ids: List[str]) -> None:
        """
        Inspects the IDs that were matched and reports any discrepancy with the request.
        :param archive_ids: IDs of the archives matched by the criteria.
        """
        ...


class FilterDeleteHandler(DeleteHandler):
    """
    Selects archives whose time range is fully contained in `[begin_ts, end_ts]`; `params` is
    `[begin_ts, end_ts]`.
    """

    def get_criteria(self) -> str:
        return "begin_timestamp >= %s AND end_timestamp <= %s"

    def get_not_found_message(self) -> str:
        return "No archives found within the specified time range."

    def validate_results(self, archive_ids: List[str]) -> None:
        # Any subset of archives is a valid outcome for a time-range filter.
        pass


class IdDeleteHandler(DeleteHandler):
    """
    Selects archives by explicit ID; `params` is the list of requested IDs.
    """

    def get_criteria(self) -> str:
        # One placeholder per requested ID.
        return f"id in ({','.join(['%s'] * len(self._params))})"

    def get_not_found_message(self) -> str:
        return "No archives found with matching IDs."

    def validate_results(self, archive_ids: List[str]) -> None:
        """
        Warns about requested IDs that didn't match any archive.
        :param archive_ids: IDs of the archives that were matched.
        """
        not_found_ids = set(self._params) - set(archive_ids)
        if not_found_ids:
            # Single-line message with sorted IDs so the log is stable and free of stray
            # indentation.
            logger.warning(
                f"Archives with the following IDs don't exist: {', '.join(sorted(not_found_ids))}"
            )


def main(argv):
    """
    Parses the command line, loads the CLP config, and dispatches to `_find_archives` or
    `_delete_archives`.
    :param argv: Full argument vector; `argv[0]` (the program name) is skipped.
    :return: The called operation's return code, or -1 if the config can't be loaded, the
    archives directory doesn't exist, or the subcommand isn't recognized.
    """
    clp_home = get_clp_home()
    default_config_file_path = clp_home / CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH

    begin_ts_help = "Time-range lower-bound (inclusive) as milliseconds from the UNIX epoch."
    end_ts_help = "Time-range upper-bound (inclusive) as milliseconds from the UNIX epoch."

    # Top-level parser and options
    args_parser = argparse.ArgumentParser(
        description="View list of archive IDs or delete compressed archives."
    )
    args_parser.add_argument(
        "--config",
        "-c",
        default=str(default_config_file_path),
        help="CLP configuration file.",
    )

    # Top-level commands
    subparsers = args_parser.add_subparsers(
        dest="subcommand",
        required=True,
    )
    find_parser = subparsers.add_parser(
        FIND_COMMAND,
        help="List IDs of archives.",
    )
    del_parser = subparsers.add_parser(
        DEL_COMMAND,
        help="Delete archives from the database and file system.",
    )

    # Options for find subcommand
    find_parser.add_argument(
        BEGIN_TS_ARG,
        dest="begin_ts",
        type=int,
        # Without a default, `None` would be bound into the `begin_timestamp >= %s` condition.
        default=0,
        help=begin_ts_help,
    )
    find_parser.add_argument(
        END_TS_ARG,
        dest="end_ts",
        type=int,
        help=end_ts_help,
    )

    # Options for delete subcommand
    del_parser.add_argument(
        DRY_RUN_ARG,
        dest="dry_run",
        action="store_true",
        help="Preview delete without making changes. Lists errors and files to be deleted.",
    )

    # Subcommands for delete subcommand
    del_subparsers = del_parser.add_subparsers(
        dest="del_subcommand",
        required=True,
    )

    # Delete by ID subcommand
    del_id_parser = del_subparsers.add_parser(
        BY_IDS_COMMAND,
        help="Delete archives by ID.",
    )
    del_id_parser.add_argument(
        "ids",
        nargs="+",
        help="List of archive IDs to delete",
    )

    # Delete by filter subcommand
    del_filter_parser = del_subparsers.add_parser(
        BY_FILTER_COMMAND,
        help="Deletes archives that fall within the specified time range.",
    )
    del_filter_parser.add_argument(
        "begin_ts",
        type=int,
        help=begin_ts_help,
    )
    del_filter_parser.add_argument(
        "end_ts",
        type=int,
        help=end_ts_help,
    )

    parsed_args = args_parser.parse_args(argv[1:])

    # Validate and load config file
    config_file_path = Path(parsed_args.config)
    try:
        clp_config = load_config_file(config_file_path, default_config_file_path, clp_home)
        clp_config.validate_logs_dir()
    except Exception:
        # `Exception` rather than a bare `except` so Ctrl-C / SystemExit still propagate.
        logger.exception("Failed to load config.")
        return -1

    database_config = clp_config.database
    archives_dir = clp_config.archive_output.get_directory()
    if not archives_dir.exists():
        logger.error("`archive_output.directory` doesn't exist.")
        return -1

    if FIND_COMMAND == parsed_args.subcommand:
        return _find_archives(
            archives_dir,
            database_config,
            parsed_args.begin_ts,
            parsed_args.end_ts,
        )

    if DEL_COMMAND == parsed_args.subcommand:
        delete_handler: DeleteHandler
        if BY_IDS_COMMAND == parsed_args.del_subcommand:
            delete_handler = IdDeleteHandler(parsed_args.ids)
        elif BY_FILTER_COMMAND == parsed_args.del_subcommand:
            delete_handler = FilterDeleteHandler([parsed_args.begin_ts, parsed_args.end_ts])
        else:
            logger.error(f"Unsupported delete subcommand: {parsed_args.del_subcommand}")
            return -1

        return _delete_archives(
            archives_dir,
            database_config,
            delete_handler,
            parsed_args.dry_run,
        )

    # argparse rejects unknown subcommands, so this is only a defensive fallback that keeps the
    # return value an int.
    logger.error(f"Unsupported subcommand: {parsed_args.subcommand}")
    return -1
def _find_archives(
    archives_dir: Path,
    database_config: Database,
    begin_ts: int,
    end_ts: int = None,
) -> int:
    """
    Lists archive IDs from the metadata database. Each bound that is provided narrows the
    listing to archives where `begin_ts <= archive.begin_timestamp` and/or
    `archive.end_timestamp <= end_ts`. Archives that are listed in the database but missing on
    disk are reported with a warning.
    :param archives_dir: Directory expected to contain one sub-directory per archive ID.
    :param database_config:
    :param begin_ts: Lower bound (inclusive), or None for no lower bound.
    :param end_ts: Upper bound (inclusive), or None for no upper bound.
    :return: 0 on success, -1 otherwise.
    """
    archive_ids: List[str]
    logger.info("Starting to find archives from the database.")
    try:
        sql_adapter = SQL_Adapter(database_config)
        clp_db_connection_params = database_config.get_clp_connection_params_and_type(True)
        table_prefix = clp_db_connection_params["table_prefix"]
        with closing(sql_adapter.create_connection(True)) as db_conn, closing(
            db_conn.cursor(dictionary=True)
        ) as db_cursor:
            # Only bind the bounds that were actually given; binding `None` would make the
            # comparison match nothing.
            conditions = []
            params = []
            if begin_ts is not None:
                conditions.append("begin_timestamp >= %s")
                params.append(begin_ts)
            if end_ts is not None:
                conditions.append("end_timestamp <= %s")
                params.append(end_ts)

            query = f"SELECT id FROM `{table_prefix}archives`"
            if conditions:
                query += " WHERE " + " AND ".join(conditions)

            db_cursor.execute(query, tuple(params))
            results = db_cursor.fetchall()

            archive_ids = [result["id"] for result in results]
            if 0 == len(archive_ids):
                logger.info("No archives found within specified time range.")
                return 0

            logger.info(f"Found {len(archive_ids)} archives within the specified time range.")
            for archive_id in archive_ids:
                logger.info(archive_id)
                archive_path = archives_dir / archive_id
                if not archive_path.is_dir():
                    logger.warning(f"Archive {archive_id} in database not found on disk.")

    except Exception:
        logger.exception("Failed to find archives from the database.")
        return -1

    logger.info("Finished finding archives from the database.")
    return 0


def _delete_archives(
    archives_dir: Path,
    database_config: Database,
    delete_handler: DeleteHandler,
    dry_run: bool = False,
) -> int:
    """
    Deletes the archives selected by `delete_handler` from the metadata database (the
    `archives`, `files` and `archive_tags` tables) and then from disk.
    :param archives_dir: Directory expected to contain one sub-directory per archive ID.
    :param database_config:
    :param delete_handler: Object to handle differences between by-filter and by-ids delete types.
    :param dry_run: If True, no changes will be made to the database or disk.
    :return: 0 on success, -1 otherwise.
    """

    archive_ids: List[str]
    logger.info("Starting to delete archives from the database.")
    try:
        sql_adapter = SQL_Adapter(database_config)
        clp_db_connection_params = database_config.get_clp_connection_params_and_type(True)
        table_prefix = clp_db_connection_params["table_prefix"]
        with closing(sql_adapter.create_connection(True)) as db_conn, closing(
            db_conn.cursor(dictionary=True)
        ) as db_cursor:
            if dry_run:
                logger.info("Running in dry-run mode.")

            criteria = delete_handler.get_criteria()
            params = delete_handler.get_params()

            # `RETURNING id` hands back the IDs of the rows that were just deleted, which drive
            # the follow-up deletions below.
            db_cursor.execute(
                f"""
                DELETE FROM `{table_prefix}archives`
                WHERE {criteria}
                RETURNING id
                """,
                params,
            )
            results = db_cursor.fetchall()

            if 0 == len(results):
                # Call the method; passing it uncalled would log the bound-method repr.
                logger.info(delete_handler.get_not_found_message())
                return 0

            archive_ids = [result["id"] for result in results]
            delete_handler.validate_results(archive_ids)

            # Bind the IDs as parameters instead of quoting them into the SQL text.
            id_placeholders = ", ".join(["%s"] * len(archive_ids))

            db_cursor.execute(
                f"""
                DELETE FROM `{table_prefix}files`
                WHERE archive_id in ({id_placeholders})
                """,
                archive_ids,
            )

            db_cursor.execute(
                f"""
                DELETE FROM `{table_prefix}archive_tags`
                WHERE archive_id in ({id_placeholders})
                """,
                archive_ids,
            )
            for archive_id in archive_ids:
                logger.info(f"Deleted archive {archive_id} from the database.")

            if dry_run:
                # Undo everything done in this transaction and leave the disk untouched.
                logger.info("Dry-run finished.")
                db_conn.rollback()
                return 0

            db_conn.commit()

    except Exception:
        logger.exception("Failed to delete archives from the database. Aborting deletion.")
        return -1

    logger.info("Finished deleting archives from the database.")

    # The database changes are committed; now remove the matching directories from disk.
    for archive_id in archive_ids:
        archive_path = archives_dir / archive_id
        if not archive_path.is_dir():
            logger.warning(f"Archive {archive_id} is not a directory. Skipping deletion.")
            continue

        logger.info(f"Deleting archive {archive_id} from disk.")
        shutil.rmtree(archive_path)

    logger.info("Finished deleting archives from disk.")

    return 0


if "__main__" == __name__:
    sys.exit(main(sys.argv))
- ) - args_parser.add_argument( - "--config", - "-c", - required=True, - default=str(default_config_file_path), - help="CLP configuration file.", - ) - args_parser.add_argument( - "begin_ts", - type=int, - help="Time-range lower-bound (inclusive) as milliseconds from the UNIX epoch.", - ) - args_parser.add_argument( - "end_ts", - type=int, - help="Time-range upper-bound (include) as milliseconds from the UNIX epoch.", - ) - parsed_args = args_parser.parse_args(argv[1:]) - - # Validate and load config file - config_file_path = Path(parsed_args.config) - try: - clp_config = load_config_file(config_file_path, default_config_file_path, clp_home) - clp_config.validate_logs_dir() - except: - logger.exception("Failed to load config.") - return -1 - - database_config = clp_config.database - archives_dir = clp_config.archive_output.get_directory() - if not archives_dir.exists(): - logger.error("`archive_output.directory` doesn't exist.") - return -1 - - return _delete_archives( - archives_dir, - database_config, - parsed_args.begin_ts, - parsed_args.end_ts, - ) - - -def _delete_archives( - archives_dir: Path, - database_config: Database, - begin_ts: int, - end_ts: int, -) -> int: - """ - Deletes all archives where `begin_ts <= archive.begin_timestamp` and - `archive.end_timestamp <= end_ts` from both the metadata database and disk. - :param archives_dir: - :param database_config: - :param begin_ts: - :param end_ts: - :return: 0 on success, -1 otherwise. 
- """ - - archive_ids: List[str] - logger.info("Starting to delete archives from the database.") - try: - sql_adapter = SQL_Adapter(database_config) - clp_db_connection_params = database_config.get_clp_connection_params_and_type(True) - table_prefix = clp_db_connection_params["table_prefix"] - with closing(sql_adapter.create_connection(True)) as db_conn, closing( - db_conn.cursor(dictionary=True) - ) as db_cursor: - db_cursor.execute( - f""" - DELETE FROM `{table_prefix}archives` - WHERE begin_timestamp >= %s AND end_timestamp <= %s - RETURNING id - """, - (begin_ts, end_ts), - ) - results = db_cursor.fetchall() - - if 0 == len(results): - logger.info("No archives (exclusively) within the specified time range.") - return 0 - - archive_ids = [result["id"] for result in results] - db_cursor.execute( - f""" - DELETE FROM `{table_prefix}files` - WHERE archive_id in ({', '.join(['%s'] * len(archive_ids))}) - """, - archive_ids, - ) - db_conn.commit() - except Exception: - logger.exception("Failed to delete archives from the database. Aborting deletion.") - return -1 - - logger.info(f"Finished deleting archives from the database.") - - for archive_id in archive_ids: - archive_path = archives_dir / archive_id - if not archive_path.is_dir(): - logger.warning(f"Archive {archive_id} is not a directory. 
Skipping deletion.") - continue - - logger.info(f"Deleting archive {archive_id} from disk.") - shutil.rmtree(archive_path) - - logger.info(f"Finished deleting archives from disk.") - - return 0 - - -if "__main__" == __name__: - sys.exit(main(sys.argv)) diff --git a/components/package-template/src/sbin/admin-tools/del-archives.sh b/components/package-template/src/sbin/admin-tools/archive-manager.sh similarity index 81% rename from components/package-template/src/sbin/admin-tools/del-archives.sh rename to components/package-template/src/sbin/admin-tools/archive-manager.sh index 4d7ebc6b7..8dc9cdecb 100755 --- a/components/package-template/src/sbin/admin-tools/del-archives.sh +++ b/components/package-template/src/sbin/admin-tools/archive-manager.sh @@ -5,5 +5,5 @@ package_root="$script_dir/../.." PYTHONPATH=$(readlink -f "$package_root/lib/python3/site-packages") \ python3 \ - -m clp_package_utils.scripts.del_archives \ + -m clp_package_utils.scripts.archive_manager \ "$@"