From 88acd44c524d8ff7659f593ae0c6712bbaed63f6 Mon Sep 17 00:00:00 2001 From: Eden Zhang Date: Mon, 13 Jan 2025 18:22:32 +0000 Subject: [PATCH 01/32] Rework del_archives into archive manager --- .../scripts/archive_manager.py | 194 +++++++++++++ .../clp_package_utils/scripts/del_archives.py | 110 ------- .../scripts/native/archive_manager.py | 271 ++++++++++++++++++ .../scripts/native/del_archives.py | 139 --------- .../{del-archives.sh => archive-manager.sh} | 0 5 files changed, 465 insertions(+), 249 deletions(-) create mode 100644 components/clp-package-utils/clp_package_utils/scripts/archive_manager.py delete mode 100644 components/clp-package-utils/clp_package_utils/scripts/del_archives.py create mode 100644 components/clp-package-utils/clp_package_utils/scripts/native/archive_manager.py delete mode 100644 components/clp-package-utils/clp_package_utils/scripts/native/del_archives.py rename components/package-template/src/sbin/admin-tools/{del-archives.sh => archive-manager.sh} (100%) diff --git a/components/clp-package-utils/clp_package_utils/scripts/archive_manager.py b/components/clp-package-utils/clp_package_utils/scripts/archive_manager.py new file mode 100644 index 000000000..f9ce7ba97 --- /dev/null +++ b/components/clp-package-utils/clp_package_utils/scripts/archive_manager.py @@ -0,0 +1,194 @@ +import argparse +import logging +import subprocess +import sys +from pathlib import Path + +from clp_py_utils.clp_config import StorageType + +from clp_package_utils.general import ( + CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH, + dump_container_config, + generate_container_config, + generate_container_name, + generate_container_start_cmd, + get_clp_home, + load_config_file, + validate_and_load_db_credentials_file, +) + +logger = logging.getLogger(__file__) + +def validate_timestamps(begin_ts, end_ts): + if begin_ts > end_ts: + logger.error("begin-ts must be <= end-ts") + return False + if end_ts < 0 or begin_ts < 0: + logger.error("begin_ts and end_ts must be non-negative.") + return False + return True + +def main(argv): + clp_home = get_clp_home() + default_config_file_path = clp_home / CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH + + # Top-level parser and options + args_parser = argparse.ArgumentParser( + description="View or delete archives." + ) + args_parser.add_argument( + "--config", + "-c", + default=str(default_config_file_path), + help="CLP package configuration file.", + ) + + # Top-level commands + subparsers = args_parser.add_subparsers( + dest="subcommand", + required=True, + ) + find_parser = subparsers.add_parser( + "find", + help="List IDs of archives.", + ) + del_parser = subparsers.add_parser( + "del", + help="Delete archives.", + ) + + # Find options + find_parser.add_argument( + "--begin-ts", + type=int, + default=0, + help="Time-range lower-bound (inclusive) as milliseconds from the UNIX epoch.", + ) + find_parser.add_argument( + "--end-ts", + type=int, + help="Time-range upper-bound (include) as milliseconds from the UNIX epoch.", + ) + + # Delete options + del_parser.add_argument( + "--dry-run", + dest="dry_run", + action="store_true", + help="Preview delete without making changes. Lists errors and files to be deleted.", + ) + + # Delete subcommands + del_subparsers = del_parser.add_subparsers( + dest="del_subcommand", + required=True, + ) + del_id_parser = del_subparsers.add_parser( + "by-ids", + help="Delete archives by ID.", + ) + del_filter_parser = del_subparsers.add_parser( + "by-filter", + help="Delete archives within time frame.", + ) + + # Delete by ID arguments + del_id_parser.add_argument( + "ids", + nargs='+', + help="List of archive IDs to delete", + ) + + # Delete by filter arguments + del_filter_parser.add_argument( + "--begin-ts", + type=int, + default=0, + help="Time-range lower-bound (inclusive) as milliseconds from the UNIX epoch.", + ) + del_filter_parser.add_argument( + "--end-ts", + type=int, + required=True, + help="Time-range upper-bound (include) as milliseconds from the UNIX epoch.", + ) + + parsed_args = args_parser.parse_args(argv[1:]) + + # Validate and load config file + try: + config_file_path = Path(parsed_args.config) + clp_config = load_config_file(config_file_path, default_config_file_path, clp_home) + clp_config.validate_logs_dir() + + # Validate and load necessary credentials + validate_and_load_db_credentials_file(clp_config, clp_home, False) + except: + logger.exception("Failed to load config.") + return -1 + + storage_type = clp_config.archive_output.storage.type + if StorageType.FS != storage_type: + logger.error(f"Archive deletion is not supported for storage type: {storage_type}.") + return -1 + + # Validate input depending on subcommands + if "del" == parsed_args.subcommand: + if "by-filter" == parsed_args.del_subcommand: + + # Validate the input timestamp + if not validate_timestamps(parsed_args.begin_ts, parsed_args.end_ts): + return -1 + + elif "find" == parsed_args.subcommand: + if hasattr(parsed_args, 'end_ts'): + + # Validate the input timestamp + if not validate_timestamps(parsed_args.begins_ts, parsed_args.end_ts): + return -1 + + container_name = generate_container_name("archive-manager") + + container_clp_config, mounts = generate_container_config(clp_config, clp_home) + generated_config_path_on_container, generated_config_path_on_host = dump_container_config( + container_clp_config, clp_config, container_name + ) + + necessary_mounts = [mounts.clp_home, mounts.logs_dir, mounts.archives_output_dir] + container_start_cmd = generate_container_start_cmd( + container_name, necessary_mounts, clp_config.execution_container + ) + + # fmt: off + del_archive_cmd = [ + "python3", + "-m", "clp_package_utils.scripts.native.archive_manager", + "--config", str(generated_config_path_on_container), + str(parsed_args.subcommand), + ] + + # Add subcommand-specific arguments + if "del" == parsed_args.subcommand: + if True == parsed_args.dry_run: + del_archive_cmd.extend("--dry-run") + if "by-ids" == parsed_args.del_subcommand: + del_archive_cmd.extend(parsed_args.ids) + elif "by-filter" == parsed_args.del_subcommand: + del_archive_cmd.extend([str(parsed_args.begin_ts). str(parsed_args.end_ts)]) + elif "find" == parsed_args.subcommand: + if hasattr(parsed_args, 'end_ts'): + del_archive_cmd.extend([str(parsed_args.begin_ts). str(parsed_args.end_ts)]) + + # fmt: on + + cmd = container_start_cmd + del_archive_cmd + subprocess.run(cmd, check=True) + + # Remove generated files + generated_config_path_on_host.unlink() + + return 0 + + +if "__main__" == __name__: + sys.exit(main(sys.argv)) diff --git a/components/clp-package-utils/clp_package_utils/scripts/del_archives.py b/components/clp-package-utils/clp_package_utils/scripts/del_archives.py deleted file mode 100644 index 5b9bc6d97..000000000 --- a/components/clp-package-utils/clp_package_utils/scripts/del_archives.py +++ /dev/null @@ -1,110 +0,0 @@ -import argparse -import logging -import subprocess -import sys -from pathlib import Path - -from clp_py_utils.clp_config import StorageType - -from clp_package_utils.general import ( - CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH, - dump_container_config, - generate_container_config, - generate_container_name, - generate_container_start_cmd, - get_clp_home, - load_config_file, - validate_and_load_db_credentials_file, -) - -logger = logging.getLogger(__file__) - - -def main(argv): - clp_home = get_clp_home() - default_config_file_path = clp_home / CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH - - args_parser = argparse.ArgumentParser( - description="Deletes archives that fall within the specified time range." - ) - args_parser.add_argument( - "--config", - "-c", - default=str(default_config_file_path), - help="CLP package configuration file.", - ) - args_parser.add_argument( - "--begin-ts", - type=int, - default=0, - help="Time-range lower-bound (inclusive) as milliseconds from the UNIX epoch.", - ) - args_parser.add_argument( - "--end-ts", - type=int, - required=True, - help="Time-range upper-bound (include) as milliseconds from the UNIX epoch.", - ) - parsed_args = args_parser.parse_args(argv[1:]) - - # Validate and load config file - try: - config_file_path = Path(parsed_args.config) - clp_config = load_config_file(config_file_path, default_config_file_path, clp_home) - clp_config.validate_logs_dir() - - # Validate and load necessary credentials - validate_and_load_db_credentials_file(clp_config, clp_home, False) - except: - logger.exception("Failed to load config.") - return -1 - - storage_type = clp_config.archive_output.storage.type - if StorageType.FS != storage_type: - logger.error(f"Archive deletion is not supported for storage type: {storage_type}.") - return -1 - - # Validate the input timestamp - begin_ts = parsed_args.begin_ts - end_ts = parsed_args.end_ts - if begin_ts > end_ts: - logger.error("begin-ts must be <= end-ts") - return -1 - if end_ts < 0 or begin_ts < 0: - logger.error("begin_ts and end_ts must be non-negative.") - return -1 - - container_name = generate_container_name("del-archives") - - container_clp_config, mounts = generate_container_config(clp_config, clp_home) - generated_config_path_on_container, generated_config_path_on_host = dump_container_config( - container_clp_config, clp_config, container_name - ) - - necessary_mounts = [mounts.clp_home, mounts.logs_dir, mounts.archives_output_dir] - container_start_cmd = generate_container_start_cmd( - container_name, necessary_mounts, clp_config.execution_container - ) - - # fmt: off - del_archive_cmd = [ - "python3", - "-m", "clp_package_utils.scripts.native.del_archives", - "--config", str(generated_config_path_on_container), - str(begin_ts), - str(end_ts) - - ] - # fmt: on - - cmd = container_start_cmd + del_archive_cmd - subprocess.run(cmd, check=True) - - # Remove generated files - generated_config_path_on_host.unlink() - - return 0 - - -if "__main__" == __name__: - sys.exit(main(sys.argv)) diff --git a/components/clp-package-utils/clp_package_utils/scripts/native/archive_manager.py b/components/clp-package-utils/clp_package_utils/scripts/native/archive_manager.py new file mode 100644 index 000000000..aeedc9248 --- /dev/null +++ b/components/clp-package-utils/clp_package_utils/scripts/native/archive_manager.py @@ -0,0 +1,271 @@ +import argparse +import logging +import shutil +import sys +from contextlib import closing +from pathlib import Path +from typing import List + +from clp_py_utils.clp_config import Database +from clp_py_utils.sql_adapter import SQL_Adapter + +from clp_package_utils.general import ( + CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH, + get_clp_home, + load_config_file, +) + +logger = logging.getLogger(__file__) + + +def main(argv): + clp_home = get_clp_home() + default_config_file_path = clp_home / CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH + + # Top-level parser and options + args_parser = argparse.ArgumentParser( + description="View or delete archives." + ) + args_parser.add_argument( + "--config", + "-c", + required=True, + default=str(default_config_file_path), + help="CLP configuration file.", + ) + + # Top-level commands + subparsers = args_parser.add_subparsers( + dest="subcommand", + required=True, + ) + find_parser = subparsers.add_parser( + "find", + help="List IDs of archives.", + ) + del_parser = subparsers.add_parser( + "del", + help="Delete archives.", + ) + + # Find options + find_parser.add_argument( + "begin_ts", + type=int, + help="Time-range lower-bound (inclusive) as milliseconds from the UNIX epoch.", + ) + find_parser.add_argument( + "end_ts", + type=int, + help="Time-range upper-bound (include) as milliseconds from the UNIX epoch.", + ) + + # Delete options + del_parser.add_argument( + "--dry-run", + dest="drun_run", + action="store_true", + help="Preview delete without making changes. Lists errors and files to be deleted.", + ) + + # Delete subcommands + del_subparsers = del_parser.add_subparsers( + dest="del_subcommand", + required=True, + ) + del_id_parser = del_subparsers.add_parser( + "by-ids", + help="Delete archives by ID.", + ) + del_filter_parser = del_subparsers.add_parser( + "by-filter", + help="delte archives within time frame.", + ) + + # Delete by ID arguments + del_id_parser.add_argument( + "ids", + nargs='+', + help="List of archive IDs to delete", + ) + + # Delete by filter arguments + del_filter_parser.add_argument( + "begin_ts", + type=int, + help="Time-range lower-bound (inclusive) as milliseconds from the UNIX epoch.", + ) + del_filter_parser.add_argument( + "end_ts", + type=int, + help="Time-range upper-bound (include) as milliseconds from the UNIX epoch.", + ) + + parsed_args = args_parser.parse_args(argv[1:]) + + # Validate and load config file + config_file_path = Path(parsed_args.config) + try: + clp_config = load_config_file(config_file_path, default_config_file_path, clp_home) + clp_config.validate_logs_dir() + except: + logger.exception("Failed to load config.") + return -1 + + database_config = clp_config.database + archives_dir = clp_config.archive_output.get_directory() + if not archives_dir.exists(): + logger.error("`archive_output.directory` doesn't exist.") + return -1 + + if "find" == parsed_args.subcommand: + # not done yet + return 0 + elif "del" == parsed_args.subcommand: + if "by-ids" == parsed_args.del_subcommand: + return _delete_archives_by_ids( + archives_dir, + database_config, + parsed_args.ids, + ) + elif "by-filter" == parsed_args.del_subcommand: + return _delete_archives_by_filter( + archives_dir, + database_config, + parsed_args.begin_ts, + parsed_args.end_ts, + ) + + +def _delete_archives( + archives_dir: Path, + database_config: Database, + query: str, + params: tuple, + criteria: str, +) -> int: + """ + Deletes all archives where `begin_ts <= archive.begin_timestamp` and + `archive.end_timestamp <= end_ts` from both the metadata database and disk. + :param archives_dir: + :param database_config: + :param query: SQL query to select archives to delete. + :param params: Tuple of parameters for SQL query. + :return: 0 on success, -1 otherwise. + """ + + archive_ids: List[str] + logger.info("Starting to delete archives from the database.") + try: + sql_adapter = SQL_Adapter(database_config) + clp_db_connection_params = database_config.get_clp_connection_params_and_type(True) + table_prefix = clp_db_connection_params["table_prefix"] + with closing(sql_adapter.create_connection(True)) as db_conn, closing( + db_conn.cursor(dictionary=True) + ) as db_cursor: + db_cursor.execute(query, params) + results = db_cursor.fetchall() + + if 0 == len(results): + if "filter" == criteria: + logger.info("No archives found within specified time frame.") + elif "ids" == criteria: + logger.info("No archives found with matching IDs.") + return 0 + + archive_ids = [result["id"] for result in results] + + if "ids" == criteria: + not_found_ids = set(params) - set(archive_ids) + if not_found_ids: + logger.warning(f"Failed to find archives with the following IDs: {', '.join(not_found_ids)}") + + db_cursor.execute( + f""" + DELETE FROM `{table_prefix}files` + WHERE archive_id in ({', '.join(['%s'] * len(archive_ids))}) + """, + archive_ids, + ) + + db_cursor.execute( + f""" + DELETE FROM `{table_prefix}archive_tags` + WHERE archive_id in ({', '.join(['%s'] * len(archive_ids))}) + """, + archive_ids, + ) + + db_conn.commit() + except Exception: + logger.exception("Failed to delete archives from the database. Aborting deletion.") + return -1 + + logger.info(f"Finished deleting archives from the database.") + + for archive_id in archive_ids: + archive_path = archives_dir / archive_id + if not archive_path.is_dir(): + logger.warning(f"Archive {archive_id} is not a directory. Skipping deletion.") + continue + + logger.info(f"Deleting archive {archive_id} from disk.") + shutil.rmtree(archive_path) + + logger.info(f"Finished deleting archives from disk.") + + return 0 + +def _delete_archives_by_filter( + archives_dir: Path, + database_config: Database, + begin_ts: int, + end_ts: int, +) -> int: + """ + Deletes all archives where `begin_ts <= archive.begin_timestamp` and + `archive.end_timestamp <= end_ts` from both the metadata database and disk. + :param archives_dir: + :param database_config: + :param params: Tuple of parameters for SQL query. + :return: 0 on success, -1 otherwise. + """ + + clp_db_connection_params = database_config.get_clp_connection_params_and_type(True) + table_prefix = clp_db_connection_params["table_prefix"] + + query = f""" + DELETE FROM `{table_prefix}archives` + WHERE begin_timestamp >= %s AND end_timestamp <= %s + RETURNING id + """ + + return _delete_archives(archives_dir, database_config, query, (begin_ts, end_ts), "filter") + +def _delete_archives_by_ids( + archives_dir: Path, + database_config: Database, + archive_ids: List[str], +) -> int: + """ + Deletes all archives with the specified IDs from both the metadata database and disk. + :param archives_dir: + :param database_config: + :param archive_ids: + :return: 0 on success, -1 otherwise. + """ + clp_db_connection_params = database_config.get_clp_connection_params_and_type(True) + table_prefix = clp_db_connection_params["table_prefix"] + + query = f""" + DELETE FROM `{table_prefix}archives` + WHERE id in ({', '.join(['%s'] * len(archive_ids))}) + RETURNING id + """ + + return _delete_archives(archives_dir, database_config, query, (archive_ids), "ids") + + + +if "__main__" == __name__: + sys.exit(main(sys.argv)) diff --git a/components/clp-package-utils/clp_package_utils/scripts/native/del_archives.py b/components/clp-package-utils/clp_package_utils/scripts/native/del_archives.py deleted file mode 100644 index c489c3806..000000000 --- a/components/clp-package-utils/clp_package_utils/scripts/native/del_archives.py +++ /dev/null @@ -1,139 +0,0 @@ -import argparse -import logging -import shutil -import sys -from contextlib import closing -from pathlib import Path -from typing import List - -from clp_py_utils.clp_config import Database -from clp_py_utils.sql_adapter import SQL_Adapter - -from clp_package_utils.general import ( - CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH, - get_clp_home, - load_config_file, -) - -logger = logging.getLogger(__file__) - - -def main(argv): - clp_home = get_clp_home() - default_config_file_path = clp_home / CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH - - args_parser = argparse.ArgumentParser( - description="Deletes archives that fall within the specified time range." - ) - args_parser.add_argument( - "--config", - "-c", - required=True, - default=str(default_config_file_path), - help="CLP configuration file.", - ) - args_parser.add_argument( - "begin_ts", - type=int, - help="Time-range lower-bound (inclusive) as milliseconds from the UNIX epoch.", - ) - args_parser.add_argument( - "end_ts", - type=int, - help="Time-range upper-bound (include) as milliseconds from the UNIX epoch.", - ) - parsed_args = args_parser.parse_args(argv[1:]) - - # Validate and load config file - config_file_path = Path(parsed_args.config) - try: - clp_config = load_config_file(config_file_path, default_config_file_path, clp_home) - clp_config.validate_logs_dir() - except: - logger.exception("Failed to load config.") - return -1 - - database_config = clp_config.database - archives_dir = clp_config.archive_output.get_directory() - if not archives_dir.exists(): - logger.error("`archive_output.directory` doesn't exist.") - return -1 - - return _delete_archives( - archives_dir, - database_config, - parsed_args.begin_ts, - parsed_args.end_ts, - ) - - -def _delete_archives( - archives_dir: Path, - database_config: Database, - begin_ts: int, - end_ts: int, -) -> int: - """ - Deletes all archives where `begin_ts <= archive.begin_timestamp` and - `archive.end_timestamp <= end_ts` from both the metadata database and disk. - :param archives_dir: - :param database_config: - :param begin_ts: - :param end_ts: - :return: 0 on success, -1 otherwise. - """ - - archive_ids: List[str] - logger.info("Starting to delete archives from the database.") - try: - sql_adapter = SQL_Adapter(database_config) - clp_db_connection_params = database_config.get_clp_connection_params_and_type(True) - table_prefix = clp_db_connection_params["table_prefix"] - with closing(sql_adapter.create_connection(True)) as db_conn, closing( - db_conn.cursor(dictionary=True) - ) as db_cursor: - db_cursor.execute( - f""" - DELETE FROM `{table_prefix}archives` - WHERE begin_timestamp >= %s AND end_timestamp <= %s - RETURNING id - """, - (begin_ts, end_ts), - ) - results = db_cursor.fetchall() - - if 0 == len(results): - logger.info("No archives (exclusively) within the specified time range.") - return 0 - - archive_ids = [result["id"] for result in results] - db_cursor.execute( - f""" - DELETE FROM `{table_prefix}files` - WHERE archive_id in ({', '.join(['%s'] * len(archive_ids))}) - """, - archive_ids, - ) - db_conn.commit() - except Exception: - logger.exception("Failed to delete archives from the database. Aborting deletion.") - return -1 - - logger.info(f"Finished deleting archives from the database.") - - for archive_id in archive_ids: - archive_path = archives_dir / archive_id - if not archive_path.is_dir(): - logger.warning(f"Archive {archive_id} is not a directory. Skipping deletion.") - continue - - logger.info(f"Deleting archive {archive_id} from disk.") - shutil.rmtree(archive_path) - - logger.info(f"Finished deleting archives from disk.") - - return 0 - - -if "__main__" == __name__: - sys.exit(main(sys.argv)) diff --git a/components/package-template/src/sbin/admin-tools/del-archives.sh b/components/package-template/src/sbin/admin-tools/archive-manager.sh similarity index 100% rename from components/package-template/src/sbin/admin-tools/del-archives.sh rename to components/package-template/src/sbin/admin-tools/archive-manager.sh From afd71d4fcf081543030907de583c3bfdd2d69f8a Mon Sep 17 00:00:00 2001 From: Eden Zhang Date: Mon, 13 Jan 2025 20:17:49 +0000 Subject: [PATCH 02/32] Implement find and dry-run --- .../scripts/archive_manager.py | 2 +- .../scripts/native/archive_manager.py | 74 ++++++++++++++++++- 2 files changed, 72 insertions(+), 4 deletions(-) diff --git a/components/clp-package-utils/clp_package_utils/scripts/archive_manager.py b/components/clp-package-utils/clp_package_utils/scripts/archive_manager.py index f9ce7ba97..f7579d8e5 100644 --- a/components/clp-package-utils/clp_package_utils/scripts/archive_manager.py +++ b/components/clp-package-utils/clp_package_utils/scripts/archive_manager.py @@ -144,7 +144,7 @@ def main(argv): if hasattr(parsed_args, 'end_ts'): # Validate the input timestamp - if not validate_timestamps(parsed_args.begins_ts, parsed_args.end_ts): + if not validate_timestamps(parsed_args.begin_ts, parsed_args.end_ts): return -1 container_name = generate_container_name("archive-manager") diff --git a/components/clp-package-utils/clp_package_utils/scripts/native/archive_manager.py b/components/clp-package-utils/clp_package_utils/scripts/native/archive_manager.py index aeedc9248..f2300cdcb 100644 --- a/components/clp-package-utils/clp_package_utils/scripts/native/archive_manager.py +++ b/components/clp-package-utils/clp_package_utils/scripts/native/archive_manager.py @@ -127,6 +127,7 @@ def main(argv): archives_dir, database_config, parsed_args.ids, + parsed_args.dry_run, ) elif "by-filter" == parsed_args.del_subcommand: return _delete_archives_by_filter( @@ -134,8 +135,60 @@ def main(argv): database_config, parsed_args.begin_ts, parsed_args.end_ts, + parsed_args.dry_run, ) +def _find_archives( + archives_dir: Path, + database_config: Database, + begin_ts: int = None, + end_ts: int = None, +) -> int: + """ + Lists all archive IDs, if begin_its and end_its are provided, + only lists archives where `begin_ts <= archive.begin_timestamp` and + `archive.end_timestamp <= end_ts`. + :param archives_dir: + :param database_config: + :param begin_ts: + :param end_ts: + """ + archive_ids: List[str] + logger.info("Starting to find archives from the database.") + try: + sql_adapter = SQL_Adapter(database_config) + clp_db_connection_params = database_config.get_clp_connection_params_and_type(True) + table_prefix = clp_db_connection_params["table_prefix"] + with closing(sql_adapter.create_connection(True)) as db_conn, closing( + db_conn.cursor(dictionary=True) + ) as db_cursor: + + params = () + query = f"SELECT id FROM `{table_prefix}archives`" + if begin_ts is not None and end_ts is not None: + query += " WHERE begin_timestamp >= %s AND end_timestamp <= %s" + params = (begin_ts, end_ts) + + db_cursor.execute(query, params) + results = db_cursor.fetchall() + + archive_ids = [result["id"] for result in results] + + if 0 == len(archive_ids): + logger.info("No archives found within specified time range.") + return 0 + + logger.info(f"Found {len(archive_ids)} archives within specified time range.") + for archive_id in archive_ids: + logger.info(archive_id) + + except Exception: + logger.exception("Failed to find archives from the database. Aborting deletion.") + return -1 + + logger.info(f"Finished finding archives from the database.") + + return 0 def _delete_archives( archives_dir: Path, @@ -143,6 +196,7 @@ def _delete_archives( query: str, params: tuple, criteria: str, + dry_run: bool = False, ) -> int: """ Deletes all archives where `begin_ts <= archive.begin_timestamp` and @@ -163,6 +217,10 @@ def _delete_archives( with closing(sql_adapter.create_connection(True)) as db_conn, closing( db_conn.cursor(dictionary=True) ) as db_cursor: + + if dry_run: + logger.info("Running in dry-run mode. No changes will be made to the database.") + db_cursor.execute(query, params) results = db_cursor.fetchall() @@ -196,6 +254,11 @@ def _delete_archives( archive_ids, ) + if dry_run: + logger.info("Dry-run finished.") + db_conn.rollback() + return 0 + db_conn.commit() except Exception: logger.exception("Failed to delete archives from the database. Aborting deletion.") @@ -221,13 +284,16 @@ def _delete_archives_by_filter( database_config: Database, begin_ts: int, end_ts: int, + dry_run: bool = False, ) -> int: """ Deletes all archives where `begin_ts <= archive.begin_timestamp` and `archive.end_timestamp <= end_ts` from both the metadata database and disk. :param archives_dir: :param database_config: - :param params: Tuple of parameters for SQL query. + :param begin_ts: + :param end_ts: + :param dry_run: :return: 0 on success, -1 otherwise. """ @@ -240,18 +306,20 @@ def _delete_archives_by_filter( RETURNING id """ - return _delete_archives(archives_dir, database_config, query, (begin_ts, end_ts), "filter") + return _delete_archives(archives_dir, database_config, query, (begin_ts, end_ts), "filter", dry_run) def _delete_archives_by_ids( archives_dir: Path, database_config: Database, archive_ids: List[str], + dry_run: bool = False, ) -> int: """ Deletes all archives with the specified IDs from both the metadata database and disk. :param archives_dir: :param database_config: :param archive_ids: + :param dry_run: :return: 0 on success, -1 otherwise. """ clp_db_connection_params = database_config.get_clp_connection_params_and_type(True) @@ -263,7 +331,7 @@ def _delete_archives_by_ids( RETURNING id """ - return _delete_archives(archives_dir, database_config, query, (archive_ids), "ids") + return _delete_archives(archives_dir, database_config, query, tuple(archive_ids), "ids", dry_run) From 370e78c6623c1341a0776db68aa9f1e19cddd4f4 Mon Sep 17 00:00:00 2001 From: Eden Zhang Date: Mon, 13 Jan 2025 21:04:13 +0000 Subject: [PATCH 03/32] Fix del_archives reference --- .../package-template/src/sbin/admin-tools/archive-manager.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/components/package-template/src/sbin/admin-tools/archive-manager.sh b/components/package-template/src/sbin/admin-tools/archive-manager.sh index 4d7ebc6b7..8dc9cdecb 100755 --- a/components/package-template/src/sbin/admin-tools/archive-manager.sh +++ b/components/package-template/src/sbin/admin-tools/archive-manager.sh @@ -5,5 +5,5 @@ package_root="$script_dir/../.." PYTHONPATH=$(readlink -f "$package_root/lib/python3/site-packages") \ python3 \ - -m clp_package_utils.scripts.del_archives \ + -m clp_package_utils.scripts.archive_manager \ "$@" From 209b88347f5344ec80c485affebb64078d7aa17a Mon Sep 17 00:00:00 2001 From: Eden Zhang Date: Mon, 13 Jan 2025 21:57:12 +0000 Subject: [PATCH 04/32] fix check for end_ts --- .../clp_package_utils/scripts/archive_manager.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/components/clp-package-utils/clp_package_utils/scripts/archive_manager.py b/components/clp-package-utils/clp_package_utils/scripts/archive_manager.py index f7579d8e5..e87ba2e95 100644 --- a/components/clp-package-utils/clp_package_utils/scripts/archive_manager.py +++ b/components/clp-package-utils/clp_package_utils/scripts/archive_manager.py @@ -141,7 +141,7 @@ def main(argv): return -1 elif "find" == parsed_args.subcommand: - if hasattr(parsed_args, 'end_ts'): + if hasattr(parsed_args, 'end_ts') and parsed_args.end_ts is not None: # Validate the input timestamp if not validate_timestamps(parsed_args.begin_ts, parsed_args.end_ts): @@ -176,7 +176,7 @@ def main(argv): elif "by-filter" == parsed_args.del_subcommand: del_archive_cmd.extend([str(parsed_args.begin_ts). str(parsed_args.end_ts)]) elif "find" == parsed_args.subcommand: - if hasattr(parsed_args, 'end_ts'): + if hasattr(parsed_args, 'end_ts') and parsed_args.end_ts is not None: del_archive_cmd.extend([str(parsed_args.begin_ts). str(parsed_args.end_ts)]) # fmt: on From 757d358ee02b02266571bfc08df3acc1c2082aef Mon Sep 17 00:00:00 2001 From: Eden Zhang Date: Tue, 14 Jan 2025 18:27:51 +0000 Subject: [PATCH 05/32] Fix argument passing between scripts --- .../scripts/archive_manager.py | 9 +++--- .../scripts/native/archive_manager.py | 28 +++++++++++++------ 2 files changed, 24 insertions(+), 13 deletions(-) diff --git a/components/clp-package-utils/clp_package_utils/scripts/archive_manager.py b/components/clp-package-utils/clp_package_utils/scripts/archive_manager.py index e87ba2e95..488f1c7f4 100644 --- a/components/clp-package-utils/clp_package_utils/scripts/archive_manager.py +++ b/components/clp-package-utils/clp_package_utils/scripts/archive_manager.py @@ -170,14 +170,15 @@ def main(argv): # Add subcommand-specific arguments if "del" == parsed_args.subcommand: if True == parsed_args.dry_run: - del_archive_cmd.extend("--dry-run") + del_archive_cmd.extend(["--dry-run"]) if "by-ids" == parsed_args.del_subcommand: + del_archive_cmd.extend(["by-ids"]) del_archive_cmd.extend(parsed_args.ids) elif "by-filter" == parsed_args.del_subcommand: - del_archive_cmd.extend([str(parsed_args.begin_ts). str(parsed_args.end_ts)]) + del_archive_cmd.extend(["by-filter", str(parsed_args.begin_ts), str(parsed_args.end_ts)]) elif "find" == parsed_args.subcommand: - if hasattr(parsed_args, 'end_ts') and parsed_args.end_ts is not None: - del_archive_cmd.extend([str(parsed_args.begin_ts). str(parsed_args.end_ts)]) + if hasattr(parsed_args, "end-ts") and parsed_args.end_ts is not None: + del_archive_cmd.extend(["--begin-ts", str(parsed_args.begin_ts), "--end-ts", str(parsed_args.end_ts)]) # fmt: on diff --git a/components/clp-package-utils/clp_package_utils/scripts/native/archive_manager.py b/components/clp-package-utils/clp_package_utils/scripts/native/archive_manager.py index f2300cdcb..ae2b0b933 100644 --- a/components/clp-package-utils/clp_package_utils/scripts/native/archive_manager.py +++ b/components/clp-package-utils/clp_package_utils/scripts/native/archive_manager.py @@ -50,12 +50,12 @@ def main(argv): # Find options find_parser.add_argument( - "begin_ts", + "--begin_ts", type=int, help="Time-range lower-bound (inclusive) as milliseconds from the UNIX epoch.", ) find_parser.add_argument( - "end_ts", + "--end_ts", type=int, help="Time-range upper-bound (include) as milliseconds from the UNIX epoch.", ) @@ -63,7 +63,7 @@ def main(argv): # Delete options del_parser.add_argument( "--dry-run", - dest="drun_run", + dest="dry_run", action="store_true", help="Preview delete without making changes. Lists errors and files to be deleted.", ) @@ -119,8 +119,15 @@ def main(argv): return -1 if "find" == parsed_args.subcommand: - # not done yet - return 0 + if parsed_args.begin_ts is None and parsed_args.end_ts is None: + return _find_archives(archives_dir, database_config) + else: + return _find_archives( + archives_dir, + database_config, + parsed_args.begin_ts, + parsed_args.end_ts, + ) elif "del" == parsed_args.subcommand: if "by-ids" == parsed_args.del_subcommand: return _delete_archives_by_ids( @@ -194,7 +201,7 @@ def _delete_archives( archives_dir: Path, database_config: Database, query: str, - params: tuple, + params: List[str], criteria: str, dry_run: bool = False, ) -> int: @@ -204,7 +211,7 @@ def _delete_archives( :param archives_dir: :param database_config: :param query: SQL query to select archives to delete. - :param params: Tuple of parameters for SQL query. + :param params: List of parameters for SQL query. :return: 0 on success, -1 otherwise. """ @@ -233,6 +240,9 @@ def _delete_archives( archive_ids = [result["id"] for result in results] + for archive_id in archive_ids: + logger.info(f"Deleted archive {archive_id} from the database.") + if "ids" == criteria: not_found_ids = set(params) - set(archive_ids) if not_found_ids: @@ -306,7 +316,7 @@ def _delete_archives_by_filter( RETURNING id """ - return _delete_archives(archives_dir, database_config, query, (begin_ts, end_ts), "filter", dry_run) + return _delete_archives(archives_dir, database_config, query, [begin_ts, end_ts], "filter", dry_run) def _delete_archives_by_ids( archives_dir: Path, @@ -331,7 +341,7 @@ def _delete_archives_by_ids( RETURNING id """ - return _delete_archives(archives_dir, database_config, query, tuple(archive_ids), "ids", dry_run) + return _delete_archives(archives_dir, database_config, query, archive_ids, "ids", dry_run) From 5d49d9521a2a3b48ba0f321f27e138d627baa2a7 Mon Sep 17 00:00:00 2001 From: Eden Zhang Date: Wed, 15 Jan 2025 19:48:07 +0000 Subject: [PATCH 06/32] Rename variable for consistency --- .../clp_package_utils/scripts/archive_manager.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/components/clp-package-utils/clp_package_utils/scripts/archive_manager.py b/components/clp-package-utils/clp_package_utils/scripts/archive_manager.py index 488f1c7f4..f67343b86 100644 --- a/components/clp-package-utils/clp_package_utils/scripts/archive_manager.py +++ b/components/clp-package-utils/clp_package_utils/scripts/archive_manager.py @@ -160,7 +160,7 @@ def main(argv): ) # fmt: off - del_archive_cmd = [ + archive_manager_cmd = [ "python3", "-m", "clp_package_utils.scripts.native.archive_manager", "--config", str(generated_config_path_on_container), @@ -170,19 +170,19 @@ def main(argv): # Add subcommand-specific arguments if "del" == parsed_args.subcommand: if True == parsed_args.dry_run: - del_archive_cmd.extend(["--dry-run"]) + archive_manager_cmd.extend(["--dry-run"]) if "by-ids" == parsed_args.del_subcommand: - del_archive_cmd.extend(["by-ids"]) - del_archive_cmd.extend(parsed_args.ids) + archive_manager_cmd.extend(["by-ids"]) + archive_manager_cmd.extend(parsed_args.ids) elif "by-filter" == parsed_args.del_subcommand: - del_archive_cmd.extend(["by-filter", str(parsed_args.begin_ts), str(parsed_args.end_ts)]) + archive_manager_cmd.extend(["by-filter", str(parsed_args.begin_ts), str(parsed_args.end_ts)]) elif "find" == parsed_args.subcommand: if hasattr(parsed_args, "end-ts") and parsed_args.end_ts is not None: - del_archive_cmd.extend(["--begin-ts", str(parsed_args.begin_ts), "--end-ts", str(parsed_args.end_ts)]) + archive_manager_cmd.extend(["--begin-ts", str(parsed_args.begin_ts), "--end-ts", str(parsed_args.end_ts)]) # fmt: on - cmd = container_start_cmd + del_archive_cmd + cmd = container_start_cmd + archive_manager_cmd subprocess.run(cmd, check=True) # Remove generated files From 497d2a49030d2566f098a52e5194e9b148d9b59f Mon Sep 17 00:00:00 2001 From: Eden Zhang Date: Wed, 15 Jan 2025 20:10:00 +0000 Subject: [PATCH 07/32] Fix find filter not working --- .../clp_package_utils/scripts/archive_manager.py | 7 +++++-- .../clp_package_utils/scripts/native/archive_manager.py | 6 ++++-- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/components/clp-package-utils/clp_package_utils/scripts/archive_manager.py b/components/clp-package-utils/clp_package_utils/scripts/archive_manager.py index f67343b86..5561d910a 100644 --- a/components/clp-package-utils/clp_package_utils/scripts/archive_manager.py +++ b/components/clp-package-utils/clp_package_utils/scripts/archive_manager.py @@ -24,7 +24,7 @@ def validate_timestamps(begin_ts, end_ts): logger.error("begin-ts must be <= end-ts") return False if end_ts < 0 or begin_ts < 0: - logger.error("begin_ts and end_ts must be non-negative.") + logger.error("begin-ts and end-ts must be non-negative.") return False return True @@ -60,12 +60,14 @@ def main(argv): # Find options find_parser.add_argument( "--begin-ts", + dest="begin_ts", type=int, default=0, help="Time-range lower-bound (inclusive) as milliseconds from the UNIX epoch.", ) find_parser.add_argument( "--end-ts", + dest="end_ts", type=int, help="Time-range upper-bound (include) as milliseconds from the UNIX epoch.", ) @@ -177,12 +179,13 @@ def main(argv): elif "by-filter" == parsed_args.del_subcommand: archive_manager_cmd.extend(["by-filter", str(parsed_args.begin_ts), str(parsed_args.end_ts)]) elif "find" == parsed_args.subcommand: - if hasattr(parsed_args, "end-ts") and parsed_args.end_ts is not None: + if hasattr(parsed_args, "end_ts") and parsed_args.end_ts is not None: archive_manager_cmd.extend(["--begin-ts", str(parsed_args.begin_ts), "--end-ts", str(parsed_args.end_ts)]) # fmt: on cmd = container_start_cmd + archive_manager_cmd + subprocess.run(cmd, check=True) # Remove generated files diff --git a/components/clp-package-utils/clp_package_utils/scripts/native/archive_manager.py b/components/clp-package-utils/clp_package_utils/scripts/native/archive_manager.py index ae2b0b933..7906f4015 100644 --- a/components/clp-package-utils/clp_package_utils/scripts/native/archive_manager.py +++ b/components/clp-package-utils/clp_package_utils/scripts/native/archive_manager.py @@ -50,12 +50,14 @@ def main(argv): # Find options find_parser.add_argument( - "--begin_ts", + "--begin-ts", + dest="begin_ts", type=int, help="Time-range lower-bound (inclusive) as milliseconds from the UNIX epoch.", ) find_parser.add_argument( - "--end_ts", + "--end-ts", + dest="end_ts", type=int, help="Time-range upper-bound (include) as milliseconds from the UNIX epoch.", ) From f48ea7c7f875de348679fa9181c924ff380ca29e Mon Sep 17 00:00:00 2001 From: Eden Zhang Date: Wed, 15 Jan 2025 20:28:51 +0000 Subject: [PATCH 08/32] Improve formatting --- .../clp_package_utils/scripts/native/archive_manager.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/components/clp-package-utils/clp_package_utils/scripts/native/archive_manager.py b/components/clp-package-utils/clp_package_utils/scripts/native/archive_manager.py index 7906f4015..232841217 100644 --- a/components/clp-package-utils/clp_package_utils/scripts/native/archive_manager.py +++ b/components/clp-package-utils/clp_package_utils/scripts/native/archive_manager.py @@ -248,7 +248,12 @@ def _delete_archives( if "ids" == criteria: not_found_ids = set(params) - set(archive_ids) if not_found_ids: - logger.warning(f"Failed to find archives with the following IDs: {', '.join(not_found_ids)}") + logger.warning( + f""" + Failed to find archives with the following IDs: + {', '.join(not_found_ids)} + """ + ) db_cursor.execute( f""" @@ -344,7 +349,6 @@ def _delete_archives_by_ids( """ return _delete_archives(archives_dir, database_config, query, archive_ids, "ids", dry_run) - if "__main__" == __name__: From 5f880af7789dc4822bc929be75cf3c9387280c8b Mon Sep 17 00:00:00 2001 From: Eden Zhang Date: Wed, 15 Jan 2025 20:53:27 +0000 Subject: [PATCH 09/32] Remove trailing whitespace --- .../clp_package_utils/scripts/archive_manager.py | 2 +- .../scripts/native/archive_manager.py | 12 ++++++------ 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/components/clp-package-utils/clp_package_utils/scripts/archive_manager.py b/components/clp-package-utils/clp_package_utils/scripts/archive_manager.py index 5561d910a..a7b802f12 100644 --- a/components/clp-package-utils/clp_package_utils/scripts/archive_manager.py +++ b/components/clp-package-utils/clp_package_utils/scripts/archive_manager.py @@ -141,7 +141,7 @@ def main(argv): # Validate the input timestamp if not validate_timestamps(parsed_args.begin_ts, parsed_args.end_ts): return -1 - + elif "find" == parsed_args.subcommand: if hasattr(parsed_args, 'end_ts') and parsed_args.end_ts is not None: diff --git a/components/clp-package-utils/clp_package_utils/scripts/native/archive_manager.py b/components/clp-package-utils/clp_package_utils/scripts/native/archive_manager.py index 232841217..a9f7055de 100644 --- a/components/clp-package-utils/clp_package_utils/scripts/native/archive_manager.py +++ b/components/clp-package-utils/clp_package_utils/scripts/native/archive_manager.py @@ -159,7 +159,7 @@ def _find_archives( `archive.end_timestamp <= end_ts`. :param archives_dir: :param database_config: - :param begin_ts: + :param begin_ts: :param end_ts: """ archive_ids: List[str] @@ -171,7 +171,7 @@ def _find_archives( with closing(sql_adapter.create_connection(True)) as db_conn, closing( db_conn.cursor(dictionary=True) ) as db_cursor: - + params = () query = f"SELECT id FROM `{table_prefix}archives`" if begin_ts is not None and end_ts is not None: @@ -186,7 +186,7 @@ def _find_archives( if 0 == len(archive_ids): logger.info("No archives found within specified time range.") return 0 - + logger.info(f"Found {len(archive_ids)} archives within specified time range.") for archive_id in archive_ids: logger.info(archive_id) @@ -226,7 +226,7 @@ def _delete_archives( with closing(sql_adapter.create_connection(True)) as db_conn, closing( db_conn.cursor(dictionary=True) ) as db_cursor: - + if dry_run: logger.info("Running in dry-run mode. No changes will be made to the database.") @@ -250,7 +250,7 @@ def _delete_archives( if not_found_ids: logger.warning( f""" - Failed to find archives with the following IDs: + Failed to find archives with the following IDs: {', '.join(not_found_ids)} """ ) @@ -275,7 +275,7 @@ def _delete_archives( logger.info("Dry-run finished.") db_conn.rollback() return 0 - + db_conn.commit() except Exception: logger.exception("Failed to delete archives from the database. Aborting deletion.") From c210992a7adbec633f8ce8223b6290e112f0e734 Mon Sep 17 00:00:00 2001 From: Eden Zhang Date: Wed, 15 Jan 2025 21:04:41 +0000 Subject: [PATCH 10/32] Linter reformats --- .../clp_package_utils/scripts/archive_manager.py | 10 +++++----- .../scripts/native/archive_manager.py | 14 +++++++++----- 2 files changed, 14 insertions(+), 10 deletions(-) diff --git a/components/clp-package-utils/clp_package_utils/scripts/archive_manager.py b/components/clp-package-utils/clp_package_utils/scripts/archive_manager.py index a7b802f12..c8cb8e621 100644 --- a/components/clp-package-utils/clp_package_utils/scripts/archive_manager.py +++ b/components/clp-package-utils/clp_package_utils/scripts/archive_manager.py @@ -19,6 +19,7 @@ logger = logging.getLogger(__file__) + def validate_timestamps(begin_ts, end_ts): if begin_ts > end_ts: logger.error("begin-ts must be <= end-ts") @@ -28,14 +29,13 @@ def validate_timestamps(begin_ts, end_ts): return False return True + def main(argv): clp_home = get_clp_home() default_config_file_path = clp_home / CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH # Top-level parser and options - args_parser = argparse.ArgumentParser( - description="View or delete archives." - ) + args_parser = argparse.ArgumentParser(description="View or delete archives.") args_parser.add_argument( "--config", "-c", @@ -97,7 +97,7 @@ def main(argv): # Delete by ID arguments del_id_parser.add_argument( "ids", - nargs='+', + nargs="+", help="List of archive IDs to delete", ) @@ -143,7 +143,7 @@ def main(argv): return -1 elif "find" == parsed_args.subcommand: - if hasattr(parsed_args, 'end_ts') and parsed_args.end_ts is not None: + if hasattr(parsed_args, "end_ts") and parsed_args.end_ts is not None: # Validate the input timestamp if not validate_timestamps(parsed_args.begin_ts, parsed_args.end_ts): diff --git a/components/clp-package-utils/clp_package_utils/scripts/native/archive_manager.py b/components/clp-package-utils/clp_package_utils/scripts/native/archive_manager.py index a9f7055de..a50a34bfd 100644 --- a/components/clp-package-utils/clp_package_utils/scripts/native/archive_manager.py +++ b/components/clp-package-utils/clp_package_utils/scripts/native/archive_manager.py @@ -23,9 +23,7 @@ def main(argv): default_config_file_path = clp_home / CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH # Top-level parser and options - args_parser = argparse.ArgumentParser( - description="View or delete archives." - ) + args_parser = argparse.ArgumentParser(description="View or delete archives.") args_parser.add_argument( "--config", "-c", @@ -87,7 +85,7 @@ def main(argv): # Delete by ID arguments del_id_parser.add_argument( "ids", - nargs='+', + nargs="+", help="List of archive IDs to delete", ) @@ -147,6 +145,7 @@ def main(argv): parsed_args.dry_run, ) + def _find_archives( archives_dir: Path, database_config: Database, @@ -199,6 +198,7 @@ def _find_archives( return 0 + def _delete_archives( archives_dir: Path, database_config: Database, @@ -296,6 +296,7 @@ def _delete_archives( return 0 + def _delete_archives_by_filter( archives_dir: Path, database_config: Database, @@ -323,7 +324,10 @@ def _delete_archives_by_filter( RETURNING id """ - return _delete_archives(archives_dir, database_config, query, [begin_ts, end_ts], "filter", dry_run) + return _delete_archives( + archives_dir, database_config, query, [begin_ts, end_ts], "filter", dry_run + ) + def _delete_archives_by_ids( archives_dir: Path, From 812ce456d30cbea5095db01465fb367c5c790110 Mon Sep 17 00:00:00 2001 From: Eden Zhang Date: Thu, 16 Jan 2025 19:20:54 +0000 Subject: [PATCH 11/32] Address review comments --- .../scripts/archive_manager.py | 64 +++++++++++-------- .../scripts/native/archive_manager.py | 58 +++++++++-------- 2 files changed, 68 insertions(+), 54 deletions(-) diff --git a/components/clp-package-utils/clp_package_utils/scripts/archive_manager.py b/components/clp-package-utils/clp_package_utils/scripts/archive_manager.py index c8cb8e621..97dd7e633 100644 --- a/components/clp-package-utils/clp_package_utils/scripts/archive_manager.py +++ b/components/clp-package-utils/clp_package_utils/scripts/archive_manager.py @@ -17,10 +17,19 @@ validate_and_load_db_credentials_file, ) +# Command/Argument Constants +FIND_COMMAND = "find" +DEL_COMMAND = "del" +BY_IDS_COMMAND = "by-ids" +BY_FILTER_COMMAND = "by-filter" +BEGIN_TS_ARG = "--begin-ts" +END_TS_ARG = "--end-ts" +DRY_RUN_ARG = "--dry-run" + logger = logging.getLogger(__file__) -def validate_timestamps(begin_ts, end_ts): +def _validate_timestamps(begin_ts, end_ts): if begin_ts > end_ts: logger.error("begin-ts must be <= end-ts") return False @@ -49,24 +58,24 @@ def main(argv): required=True, ) find_parser = subparsers.add_parser( - "find", + FIND_COMMAND, help="List IDs of archives.", ) del_parser = subparsers.add_parser( - "del", + DEL_COMMAND, help="Delete archives.", ) # Find options find_parser.add_argument( - "--begin-ts", + BEGIN_TS_ARG, dest="begin_ts", type=int, default=0, help="Time-range lower-bound (inclusive) as milliseconds from the UNIX epoch.", ) find_parser.add_argument( - "--end-ts", + END_TS_ARG, dest="end_ts", type=int, help="Time-range upper-bound (include) as milliseconds from the UNIX epoch.", @@ -74,7 +83,7 @@ def main(argv): # Delete options del_parser.add_argument( - "--dry-run", + DRY_RUN_ARG, dest="dry_run", action="store_true", help="Preview delete without making changes. Lists errors and files to be deleted.", @@ -86,11 +95,11 @@ def main(argv): required=True, ) del_id_parser = del_subparsers.add_parser( - "by-ids", + BY_IDS_COMMAND, help="Delete archives by ID.", ) del_filter_parser = del_subparsers.add_parser( - "by-filter", + BY_FILTER_COMMAND, help="Delete archives within time frame.", ) @@ -103,13 +112,13 @@ def main(argv): # Delete by filter arguments del_filter_parser.add_argument( - "--begin-ts", + BEGIN_TS_ARG, type=int, default=0, help="Time-range lower-bound (inclusive) as milliseconds from the UNIX epoch.", ) del_filter_parser.add_argument( - "--end-ts", + END_TS_ARG, type=int, required=True, help="Time-range upper-bound (include) as milliseconds from the UNIX epoch.", @@ -135,18 +144,18 @@ def main(argv): return -1 # Validate input depending on subcommands - if "del" == parsed_args.subcommand: - if "by-filter" == parsed_args.del_subcommand: + if DEL_COMMAND == parsed_args.subcommand: + if BY_FILTER_COMMAND == parsed_args.del_subcommand: # Validate the input timestamp - if not validate_timestamps(parsed_args.begin_ts, parsed_args.end_ts): + if not _validate_timestamps(parsed_args.begin_ts, parsed_args.end_ts): return -1 - elif "find" == parsed_args.subcommand: - if hasattr(parsed_args, "end_ts") and parsed_args.end_ts is not None: + elif FIND_COMMAND == parsed_args.subcommand: + if parsed_args.end_ts is not None: # Validate the input timestamp - if not validate_timestamps(parsed_args.begin_ts, parsed_args.end_ts): + if not _validate_timestamps(parsed_args.begin_ts, parsed_args.end_ts): return -1 container_name = generate_container_name("archive-manager") @@ -168,21 +177,20 @@ def main(argv): "--config", str(generated_config_path_on_container), str(parsed_args.subcommand), ] - + # fmt : on # Add subcommand-specific arguments - if "del" == parsed_args.subcommand: + if DEL_COMMAND == parsed_args.subcommand: if True == parsed_args.dry_run: - archive_manager_cmd.extend(["--dry-run"]) - if "by-ids" == parsed_args.del_subcommand: - archive_manager_cmd.extend(["by-ids"]) + archive_manager_cmd.append(DRY_RUN_ARG) + if BY_IDS_COMMAND == parsed_args.del_subcommand: + archive_manager_cmd.append(BY_IDS_COMMAND) archive_manager_cmd.extend(parsed_args.ids) - elif "by-filter" == parsed_args.del_subcommand: - archive_manager_cmd.extend(["by-filter", str(parsed_args.begin_ts), str(parsed_args.end_ts)]) - elif "find" == parsed_args.subcommand: - if hasattr(parsed_args, "end_ts") and parsed_args.end_ts is not None: - archive_manager_cmd.extend(["--begin-ts", str(parsed_args.begin_ts), "--end-ts", str(parsed_args.end_ts)]) - - # fmt: on + elif BY_FILTER_COMMAND == parsed_args.del_subcommand: + archive_manager_cmd.extend([BY_FILTER_COMMAND, str(parsed_args.begin_ts), str(parsed_args.end_ts)]) + elif FIND_COMMAND == parsed_args.subcommand: + archive_manager_cmd.extend([BEGIN_TS_ARG, str(parsed_args.begin_ts)]) + if parsed_args.end_ts is not None: + archive_manager_cmd.extend([END_TS_ARG, str(parsed_args.end_ts)]) cmd = container_start_cmd + archive_manager_cmd diff --git a/components/clp-package-utils/clp_package_utils/scripts/native/archive_manager.py b/components/clp-package-utils/clp_package_utils/scripts/native/archive_manager.py index a50a34bfd..d46cfe54a 100644 --- a/components/clp-package-utils/clp_package_utils/scripts/native/archive_manager.py +++ b/components/clp-package-utils/clp_package_utils/scripts/native/archive_manager.py @@ -15,6 +15,15 @@ load_config_file, ) +# Command/Argument Constants +FIND_COMMAND = "find" +DEL_COMMAND = "del" +BY_IDS_COMMAND = "by-ids" +BY_FILTER_COMMAND = "by-filter" +BEGIN_TS_ARG = "--begin-ts" +END_TS_ARG = "--end-ts" +DRY_RUN_ARG = "--dry-run" + logger = logging.getLogger(__file__) @@ -38,23 +47,23 @@ def main(argv): required=True, ) find_parser = subparsers.add_parser( - "find", + FIND_COMMAND, help="List IDs of archives.", ) del_parser = subparsers.add_parser( - "del", + DEL_COMMAND, help="Delete archives.", ) # Find options find_parser.add_argument( - "--begin-ts", + BEGIN_TS_ARG, dest="begin_ts", type=int, help="Time-range lower-bound (inclusive) as milliseconds from the UNIX epoch.", ) find_parser.add_argument( - "--end-ts", + END_TS_ARG, dest="end_ts", type=int, help="Time-range upper-bound (include) as milliseconds from the UNIX epoch.", @@ -62,7 +71,7 @@ def main(argv): # Delete options del_parser.add_argument( - "--dry-run", + DRY_RUN_ARG, dest="dry_run", action="store_true", help="Preview delete without making changes. Lists errors and files to be deleted.", @@ -74,11 +83,11 @@ def main(argv): required=True, ) del_id_parser = del_subparsers.add_parser( - "by-ids", + BY_IDS_COMMAND, help="Delete archives by ID.", ) del_filter_parser = del_subparsers.add_parser( - "by-filter", + BY_FILTER_COMMAND, help="delte archives within time frame.", ) @@ -118,25 +127,22 @@ def main(argv): logger.error("`archive_output.directory` doesn't exist.") return -1 - if "find" == parsed_args.subcommand: - if parsed_args.begin_ts is None and parsed_args.end_ts is None: - return _find_archives(archives_dir, database_config) - else: - return _find_archives( + if FIND_COMMAND == parsed_args.subcommand: + return _find_archives( archives_dir, database_config, parsed_args.begin_ts, parsed_args.end_ts, ) - elif "del" == parsed_args.subcommand: - if "by-ids" == parsed_args.del_subcommand: + elif DEL_COMMAND == parsed_args.subcommand: + if BY_IDS_COMMAND == parsed_args.del_subcommand: return _delete_archives_by_ids( archives_dir, database_config, parsed_args.ids, parsed_args.dry_run, ) - elif "by-filter" == parsed_args.del_subcommand: + elif BY_FILTER_COMMAND == parsed_args.del_subcommand: return _delete_archives_by_filter( archives_dir, database_config, @@ -149,7 +155,7 @@ def main(argv): def _find_archives( archives_dir: Path, database_config: Database, - begin_ts: int = None, + begin_ts: int, end_ts: int = None, ) -> int: """ @@ -171,10 +177,10 @@ def _find_archives( db_conn.cursor(dictionary=True) ) as db_cursor: - params = () - query = f"SELECT id FROM `{table_prefix}archives`" - if begin_ts is not None and end_ts is not None: - query += " WHERE begin_timestamp >= %s AND end_timestamp <= %s" + params = (begin_ts,) + query = f"SELECT id FROM `{table_prefix}archives` WHERE begin_timestamp >= %s" + if end_ts is not None: + query += " AND end_timestamp <= %s" params = (begin_ts, end_ts) db_cursor.execute(query, params) @@ -255,20 +261,20 @@ def _delete_archives( """ ) + id_string = ", ".join(f"'{archive_id}'" for archive_id in archive_ids) + db_cursor.execute( f""" DELETE FROM `{table_prefix}files` - WHERE archive_id in ({', '.join(['%s'] * len(archive_ids))}) - """, - archive_ids, + WHERE archive_id in ({id_string}) + """ ) db_cursor.execute( f""" DELETE FROM `{table_prefix}archive_tags` - WHERE archive_id in ({', '.join(['%s'] * len(archive_ids))}) - """, - archive_ids, + WHERE archive_id in ({id_string}) + """ ) if dry_run: From 08b8f80a704a55b3278445a082d8ef8659d8fbaf Mon Sep 17 00:00:00 2001 From: Eden Zhang Date: Thu, 16 Jan 2025 19:24:46 +0000 Subject: [PATCH 12/32] Fix lint --- .../scripts/native/archive_manager.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/components/clp-package-utils/clp_package_utils/scripts/native/archive_manager.py b/components/clp-package-utils/clp_package_utils/scripts/native/archive_manager.py index d46cfe54a..6c84cb1b1 100644 --- a/components/clp-package-utils/clp_package_utils/scripts/native/archive_manager.py +++ b/components/clp-package-utils/clp_package_utils/scripts/native/archive_manager.py @@ -129,11 +129,11 @@ def main(argv): if FIND_COMMAND == parsed_args.subcommand: return _find_archives( - archives_dir, - database_config, - parsed_args.begin_ts, - parsed_args.end_ts, - ) + archives_dir, + database_config, + parsed_args.begin_ts, + parsed_args.end_ts, + ) elif DEL_COMMAND == parsed_args.subcommand: if BY_IDS_COMMAND == parsed_args.del_subcommand: return _delete_archives_by_ids( From 59a910aed32ede6c55a5cd070cb085f679579e42 Mon Sep 17 00:00:00 2001 From: Eden Zhang Date: Thu, 16 Jan 2025 19:31:46 +0000 Subject: [PATCH 13/32] Fix Coderabbit suggestions --- .../clp_package_utils/scripts/archive_manager.py | 2 +- .../scripts/native/archive_manager.py | 16 ++++++++-------- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/components/clp-package-utils/clp_package_utils/scripts/archive_manager.py b/components/clp-package-utils/clp_package_utils/scripts/archive_manager.py index 97dd7e633..bdc512cc6 100644 --- a/components/clp-package-utils/clp_package_utils/scripts/archive_manager.py +++ b/components/clp-package-utils/clp_package_utils/scripts/archive_manager.py @@ -180,7 +180,7 @@ def main(argv): # fmt : on # Add subcommand-specific arguments if DEL_COMMAND == parsed_args.subcommand: - if True == parsed_args.dry_run: + if parsed_args.dry_run: archive_manager_cmd.append(DRY_RUN_ARG) if BY_IDS_COMMAND == parsed_args.del_subcommand: archive_manager_cmd.append(BY_IDS_COMMAND) diff --git a/components/clp-package-utils/clp_package_utils/scripts/native/archive_manager.py b/components/clp-package-utils/clp_package_utils/scripts/native/archive_manager.py index 6c84cb1b1..5aa4e1ddc 100644 --- a/components/clp-package-utils/clp_package_utils/scripts/native/archive_manager.py +++ b/components/clp-package-utils/clp_package_utils/scripts/native/archive_manager.py @@ -36,7 +36,6 @@ def main(argv): args_parser.add_argument( "--config", "-c", - required=True, default=str(default_config_file_path), help="CLP configuration file.", ) @@ -88,7 +87,7 @@ def main(argv): ) del_filter_parser = del_subparsers.add_parser( BY_FILTER_COMMAND, - help="delte archives within time frame.", + help="delete archives within time frame.", ) # Delete by ID arguments @@ -197,7 +196,7 @@ def _find_archives( logger.info(archive_id) except Exception: - logger.exception("Failed to find archives from the database. Aborting deletion.") + logger.exception("Failed to find archives from the database.") return -1 logger.info(f"Finished finding archives from the database.") @@ -214,12 +213,13 @@ def _delete_archives( dry_run: bool = False, ) -> int: """ - Deletes all archives where `begin_ts <= archive.begin_timestamp` and - `archive.end_timestamp <= end_ts` from both the metadata database and disk. + Deletes archives from both metadata database and disk based on provided SQL query. :param archives_dir: :param database_config: :param query: SQL query to select archives to delete. :param params: List of parameters for SQL query. + :param criteria: Type of deletion criteria. Either "filter" or "ids". + :param dry_run: If True, no changes will be made to the database or disk. :return: 0 on success, -1 otherwise. """ @@ -248,9 +248,6 @@ def _delete_archives( archive_ids = [result["id"] for result in results] - for archive_id in archive_ids: - logger.info(f"Deleted archive {archive_id} from the database.") - if "ids" == criteria: not_found_ids = set(params) - set(archive_ids) if not_found_ids: @@ -283,6 +280,9 @@ def _delete_archives( return 0 db_conn.commit() + for archive_id in archive_ids: + logger.info(f"Deleted archive {archive_id} from the database.") + except Exception: logger.exception("Failed to delete archives from the database. Aborting deletion.") return -1 From 2b841fae9820ab3fafb3d7b82da7bf7769fdb7d8 Mon Sep 17 00:00:00 2001 From: Eden Zhang Date: Wed, 22 Jan 2025 15:13:16 +0000 Subject: [PATCH 14/32] Implement review suggestions --- .../scripts/archive_manager.py | 89 +++++++---- .../scripts/native/archive_manager.py | 145 +++++++----------- 2 files changed, 108 insertions(+), 126 deletions(-) diff --git a/components/clp-package-utils/clp_package_utils/scripts/archive_manager.py b/components/clp-package-utils/clp_package_utils/scripts/archive_manager.py index bdc512cc6..e53b6b86a 100644 --- a/components/clp-package-utils/clp_package_utils/scripts/archive_manager.py +++ b/components/clp-package-utils/clp_package_utils/scripts/archive_manager.py @@ -18,23 +18,28 @@ ) # Command/Argument Constants -FIND_COMMAND = "find" -DEL_COMMAND = "del" -BY_IDS_COMMAND = "by-ids" -BY_FILTER_COMMAND = "by-filter" -BEGIN_TS_ARG = "--begin-ts" -END_TS_ARG = "--end-ts" -DRY_RUN_ARG = "--dry-run" +from clp_package_utils.scripts.native.archive_manager import ( + FIND_COMMAND, + DEL_COMMAND, + BY_IDS_COMMAND, + BY_FILTER_COMMAND, + BEGIN_TS_ARG, + END_TS_ARG, + DRY_RUN_ARG, +) logger = logging.getLogger(__file__) def _validate_timestamps(begin_ts, end_ts): - if begin_ts > end_ts: - logger.error("begin-ts must be <= end-ts") + if begin_ts is not None and begin_ts < 0: + logger.error("begin-ts must be non-negative.") return False - if end_ts < 0 or begin_ts < 0: - logger.error("begin-ts and end-ts must be non-negative.") + if end_ts is not None and end_ts < 0: + logger.error("end-ts must be non-negative.") + return False + if begin_ts is not None and end_ts is not None and begin_ts > end_ts: + logger.error("begin-ts must be <= end-ts") return False return True @@ -66,7 +71,7 @@ def main(argv): help="Delete archives.", ) - # Find options + # Options for find subcommand find_parser.add_argument( BEGIN_TS_ARG, dest="begin_ts", @@ -81,27 +86,25 @@ def main(argv): help="Time-range upper-bound (include) as milliseconds from the UNIX epoch.", ) - # Delete options + # Options for delete subcommand del_parser.add_argument( DRY_RUN_ARG, dest="dry_run", action="store_true", - help="Preview delete without making changes. Lists errors and files to be deleted.", + help="Only prints the archives to be deleted, without actually deleting them.", ) - # Delete subcommands + # Subcommands for delete subcommand del_subparsers = del_parser.add_subparsers( dest="del_subcommand", required=True, ) + + # Delete by ID subcommand del_id_parser = del_subparsers.add_parser( BY_IDS_COMMAND, help="Delete archives by ID.", ) - del_filter_parser = del_subparsers.add_parser( - BY_FILTER_COMMAND, - help="Delete archives within time frame.", - ) # Delete by ID arguments del_id_parser.add_argument( @@ -109,6 +112,12 @@ def main(argv): nargs="+", help="List of archive IDs to delete", ) + + # Delete by filter subcommand + del_filter_parser = del_subparsers.add_parser( + BY_FILTER_COMMAND, + help="Delete archives that fall within the specified time range.", + ) # Delete by filter arguments del_filter_parser.add_argument( @@ -126,6 +135,10 @@ def main(argv): parsed_args = args_parser.parse_args(argv[1:]) + begin_timestamp = None + end_timestamp = None + subcommand = parsed_args.subcommand + # Validate and load config file try: config_file_path = Path(parsed_args.config) @@ -144,18 +157,22 @@ def main(argv): return -1 # Validate input depending on subcommands - if DEL_COMMAND == parsed_args.subcommand: - if BY_FILTER_COMMAND == parsed_args.del_subcommand: + if DEL_COMMAND == subcommand and BY_FILTER_COMMAND == parsed_args.del_subcommand: + begin_timestamp = parsed_args.begin_ts + end_timestamp = parsed_args.end_ts - # Validate the input timestamp - if not _validate_timestamps(parsed_args.begin_ts, parsed_args.end_ts): - return -1 + # Validate the input timestamp + if not _validate_timestamps(begin_timestamp, end_timestamp): + return -1 + + elif FIND_COMMAND == subcommand: + begin_timestamp = parsed_args.begin_ts + end_timestamp = parsed_args.end_ts - elif FIND_COMMAND == parsed_args.subcommand: - if parsed_args.end_ts is not None: + if begin_timestamp is not None or end_timestamp is not None: # Validate the input timestamp - if not _validate_timestamps(parsed_args.begin_ts, parsed_args.end_ts): + if not _validate_timestamps(begin_timestamp, end_timestamp): return -1 container_name = generate_container_name("archive-manager") @@ -175,22 +192,26 @@ def main(argv): "python3", "-m", "clp_package_utils.scripts.native.archive_manager", "--config", str(generated_config_path_on_container), - str(parsed_args.subcommand), + str(subcommand), ] # fmt : on # Add subcommand-specific arguments - if DEL_COMMAND == parsed_args.subcommand: + if DEL_COMMAND == subcommand: if parsed_args.dry_run: archive_manager_cmd.append(DRY_RUN_ARG) if BY_IDS_COMMAND == parsed_args.del_subcommand: archive_manager_cmd.append(BY_IDS_COMMAND) archive_manager_cmd.extend(parsed_args.ids) elif BY_FILTER_COMMAND == parsed_args.del_subcommand: - archive_manager_cmd.extend([BY_FILTER_COMMAND, str(parsed_args.begin_ts), str(parsed_args.end_ts)]) - elif FIND_COMMAND == parsed_args.subcommand: - archive_manager_cmd.extend([BEGIN_TS_ARG, str(parsed_args.begin_ts)]) - if parsed_args.end_ts is not None: - archive_manager_cmd.extend([END_TS_ARG, str(parsed_args.end_ts)]) + archive_manager_cmd.extend([ + BY_FILTER_COMMAND, + str(begin_timestamp), + str(end_timestamp) + ]) + elif FIND_COMMAND == subcommand: + archive_manager_cmd.extend([BEGIN_TS_ARG, str(begin_timestamp)]) + if end_timestamp is not None: + archive_manager_cmd.extend([END_TS_ARG, str(end_timestamp)]) cmd = container_start_cmd + archive_manager_cmd diff --git a/components/clp-package-utils/clp_package_utils/scripts/native/archive_manager.py b/components/clp-package-utils/clp_package_utils/scripts/native/archive_manager.py index 5aa4e1ddc..315549644 100644 --- a/components/clp-package-utils/clp_package_utils/scripts/native/archive_manager.py +++ b/components/clp-package-utils/clp_package_utils/scripts/native/archive_manager.py @@ -32,7 +32,9 @@ def main(argv): default_config_file_path = clp_home / CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH # Top-level parser and options - args_parser = argparse.ArgumentParser(description="View or delete archives.") + args_parser = argparse.ArgumentParser( + description="View list of archive IDs or delete compressed archives." + ) args_parser.add_argument( "--config", "-c", @@ -51,10 +53,10 @@ def main(argv): ) del_parser = subparsers.add_parser( DEL_COMMAND, - help="Delete archives.", + help="Delete archives from the database and file system.", ) - # Find options + # Options for find subcommand find_parser.add_argument( BEGIN_TS_ARG, dest="begin_ts", @@ -68,7 +70,7 @@ def main(argv): help="Time-range upper-bound (include) as milliseconds from the UNIX epoch.", ) - # Delete options + # Options for delete subcommand del_parser.add_argument( DRY_RUN_ARG, dest="dry_run", @@ -76,19 +78,17 @@ def main(argv): help="Preview delete without making changes. Lists errors and files to be deleted.", ) - # Delete subcommands + # Subcommands for delete subcommands del_subparsers = del_parser.add_subparsers( dest="del_subcommand", required=True, ) + + # Delete by ID subcomand del_id_parser = del_subparsers.add_parser( BY_IDS_COMMAND, help="Delete archives by ID.", ) - del_filter_parser = del_subparsers.add_parser( - BY_FILTER_COMMAND, - help="delete archives within time frame.", - ) # Delete by ID arguments del_id_parser.add_argument( @@ -97,6 +97,12 @@ def main(argv): help="List of archive IDs to delete", ) + # Delete by filter subcommand + del_filter_parser = del_subparsers.add_parser( + BY_FILTER_COMMAND, + help="delete archives within time frame.", + ) + # Delete by filter arguments del_filter_parser.add_argument( "begin_ts", @@ -135,18 +141,19 @@ def main(argv): ) elif DEL_COMMAND == parsed_args.subcommand: if BY_IDS_COMMAND == parsed_args.del_subcommand: - return _delete_archives_by_ids( + return _delete_archives( archives_dir, database_config, parsed_args.ids, + BY_IDS_COMMAND, parsed_args.dry_run, ) elif BY_FILTER_COMMAND == parsed_args.del_subcommand: - return _delete_archives_by_filter( + return _delete_archives( archives_dir, database_config, - parsed_args.begin_ts, - parsed_args.end_ts, + [parsed_args.begin_ts, parsed_args.end_ts], + BY_FILTER_COMMAND, parsed_args.dry_run, ) @@ -175,23 +182,21 @@ def _find_archives( with closing(sql_adapter.create_connection(True)) as db_conn, closing( db_conn.cursor(dictionary=True) ) as db_cursor: - params = (begin_ts,) query = f"SELECT id FROM `{table_prefix}archives` WHERE begin_timestamp >= %s" if end_ts is not None: query += " AND end_timestamp <= %s" - params = (begin_ts, end_ts) + params = params + (end_ts,) db_cursor.execute(query, params) results = db_cursor.fetchall() archive_ids = [result["id"] for result in results] - if 0 == len(archive_ids): logger.info("No archives found within specified time range.") return 0 - logger.info(f"Found {len(archive_ids)} archives within specified time range.") + logger.info(f"Found {len(archive_ids)} archives within the specified time range.") for archive_id in archive_ids: logger.info(archive_id) @@ -201,15 +206,19 @@ def _find_archives( logger.info(f"Finished finding archives from the database.") + for archive_id in archive_ids: + archive_path = archives_dir / archive_id + if not archive_path.is_dir(): + logger.warning(f"Archive {archive_id} in database not found on disk.") + return 0 def _delete_archives( archives_dir: Path, database_config: Database, - query: str, params: List[str], - criteria: str, + command: str, dry_run: bool = False, ) -> int: """ @@ -218,7 +227,7 @@ def _delete_archives( :param database_config: :param query: SQL query to select archives to delete. :param params: List of parameters for SQL query. - :param criteria: Type of deletion criteria. Either "filter" or "ids". + :param command: Delete subcommand. Either "filter" or "ids". :param dry_run: If True, no changes will be made to the database or disk. :return: 0 on success, -1 otherwise. """ @@ -232,47 +241,59 @@ def _delete_archives( with closing(sql_adapter.create_connection(True)) as db_conn, closing( db_conn.cursor(dictionary=True) ) as db_cursor: - if dry_run: - logger.info("Running in dry-run mode. No changes will be made to the database.") - - db_cursor.execute(query, params) + logger.info("Running in dry-run mode.") + + criteria: str + if BY_FILTER_COMMAND == command: + criteria = "begin_timestamp >= %s AND end_timestamp <= %s" + elif BY_IDS_COMMAND == command: + criteria = "id in (%s)" % ','.join(['%s'] * len(params)) + + db_cursor.execute(f""" + DELETE FROM `{table_prefix}archives` + WHERE {criteria} + RETURNING id + """, + params + ) results = db_cursor.fetchall() if 0 == len(results): - if "filter" == criteria: + if BY_FILTER_COMMAND == command: logger.info("No archives found within specified time frame.") - elif "ids" == criteria: + elif BY_IDS_COMMAND == command: logger.info("No archives found with matching IDs.") return 0 archive_ids = [result["id"] for result in results] - - if "ids" == criteria: + if "ids" == command: not_found_ids = set(params) - set(archive_ids) if not_found_ids: logger.warning( f""" - Failed to find archives with the following IDs: + Archives with the following IDs don't exist: {', '.join(not_found_ids)} """ ) - id_string = ", ".join(f"'{archive_id}'" for archive_id in archive_ids) + ids_string = ", ".join(f"'{archive_id}'" for archive_id in archive_ids) db_cursor.execute( f""" DELETE FROM `{table_prefix}files` - WHERE archive_id in ({id_string}) + WHERE archive_id in ({ids_string}) """ ) db_cursor.execute( f""" DELETE FROM `{table_prefix}archive_tags` - WHERE archive_id in ({id_string}) + WHERE archive_id in ({ids_string}) """ ) + for archive_id in archive_ids: + logger.info(f"Deleted archive {archive_id} from the database.") if dry_run: logger.info("Dry-run finished.") @@ -280,8 +301,6 @@ def _delete_archives( return 0 db_conn.commit() - for archive_id in archive_ids: - logger.info(f"Deleted archive {archive_id} from the database.") except Exception: logger.exception("Failed to delete archives from the database. Aborting deletion.") @@ -303,63 +322,5 @@ def _delete_archives( return 0 -def _delete_archives_by_filter( - archives_dir: Path, - database_config: Database, - begin_ts: int, - end_ts: int, - dry_run: bool = False, -) -> int: - """ - Deletes all archives where `begin_ts <= archive.begin_timestamp` and - `archive.end_timestamp <= end_ts` from both the metadata database and disk. - :param archives_dir: - :param database_config: - :param begin_ts: - :param end_ts: - :param dry_run: - :return: 0 on success, -1 otherwise. - """ - - clp_db_connection_params = database_config.get_clp_connection_params_and_type(True) - table_prefix = clp_db_connection_params["table_prefix"] - - query = f""" - DELETE FROM `{table_prefix}archives` - WHERE begin_timestamp >= %s AND end_timestamp <= %s - RETURNING id - """ - - return _delete_archives( - archives_dir, database_config, query, [begin_ts, end_ts], "filter", dry_run - ) - - -def _delete_archives_by_ids( - archives_dir: Path, - database_config: Database, - archive_ids: List[str], - dry_run: bool = False, -) -> int: - """ - Deletes all archives with the specified IDs from both the metadata database and disk. - :param archives_dir: - :param database_config: - :param archive_ids: - :param dry_run: - :return: 0 on success, -1 otherwise. - """ - clp_db_connection_params = database_config.get_clp_connection_params_and_type(True) - table_prefix = clp_db_connection_params["table_prefix"] - - query = f""" - DELETE FROM `{table_prefix}archives` - WHERE id in ({', '.join(['%s'] * len(archive_ids))}) - RETURNING id - """ - - return _delete_archives(archives_dir, database_config, query, archive_ids, "ids", dry_run) - - if "__main__" == __name__: sys.exit(main(sys.argv)) From 89299caadb4feb799aaa389f88df5bb7d52d4444 Mon Sep 17 00:00:00 2001 From: Eden Zhang Date: Wed, 22 Jan 2025 18:09:42 +0000 Subject: [PATCH 15/32] Lint fixes --- .../clp_package_utils/scripts/archive_manager.py | 12 ++++++------ .../scripts/native/archive_manager.py | 7 ++++--- 2 files changed, 10 insertions(+), 9 deletions(-) diff --git a/components/clp-package-utils/clp_package_utils/scripts/archive_manager.py b/components/clp-package-utils/clp_package_utils/scripts/archive_manager.py index e53b6b86a..d6a1cf368 100644 --- a/components/clp-package-utils/clp_package_utils/scripts/archive_manager.py +++ b/components/clp-package-utils/clp_package_utils/scripts/archive_manager.py @@ -19,13 +19,13 @@ # Command/Argument Constants from clp_package_utils.scripts.native.archive_manager import ( - FIND_COMMAND, - DEL_COMMAND, - BY_IDS_COMMAND, - BY_FILTER_COMMAND, BEGIN_TS_ARG, - END_TS_ARG, + BY_FILTER_COMMAND, + BY_IDS_COMMAND, + DEL_COMMAND, DRY_RUN_ARG, + END_TS_ARG, + FIND_COMMAND, ) logger = logging.getLogger(__file__) @@ -112,7 +112,7 @@ def main(argv): nargs="+", help="List of archive IDs to delete", ) - + # Delete by filter subcommand del_filter_parser = del_subparsers.add_parser( BY_FILTER_COMMAND, diff --git a/components/clp-package-utils/clp_package_utils/scripts/native/archive_manager.py b/components/clp-package-utils/clp_package_utils/scripts/native/archive_manager.py index 315549644..13e681ee5 100644 --- a/components/clp-package-utils/clp_package_utils/scripts/native/archive_manager.py +++ b/components/clp-package-utils/clp_package_utils/scripts/native/archive_manager.py @@ -248,14 +248,15 @@ def _delete_archives( if BY_FILTER_COMMAND == command: criteria = "begin_timestamp >= %s AND end_timestamp <= %s" elif BY_IDS_COMMAND == command: - criteria = "id in (%s)" % ','.join(['%s'] * len(params)) + criteria = "id in (%s)" % ",".join(["%s"] * len(params)) - db_cursor.execute(f""" + db_cursor.execute( + f""" DELETE FROM `{table_prefix}archives` WHERE {criteria} RETURNING id """, - params + params, ) results = db_cursor.fetchall() From f2b55425511c5656804ea31c21d6cc18f5b9c301 Mon Sep 17 00:00:00 2001 From: Eden Zhang Date: Wed, 22 Jan 2025 18:14:08 +0000 Subject: [PATCH 16/32] Replace "ids" with BY_IDS_COMMAND --- .../clp_package_utils/scripts/native/archive_manager.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/components/clp-package-utils/clp_package_utils/scripts/native/archive_manager.py b/components/clp-package-utils/clp_package_utils/scripts/native/archive_manager.py index 13e681ee5..51ba3f900 100644 --- a/components/clp-package-utils/clp_package_utils/scripts/native/archive_manager.py +++ b/components/clp-package-utils/clp_package_utils/scripts/native/archive_manager.py @@ -268,7 +268,7 @@ def _delete_archives( return 0 archive_ids = [result["id"] for result in results] - if "ids" == command: + if BY_IDS_COMMAND == command: not_found_ids = set(params) - set(archive_ids) if not_found_ids: logger.warning( From f9f4f8e659c5a439ec6ef35525ecf67f1c5e5cac Mon Sep 17 00:00:00 2001 From: Eden Zhang Date: Thu, 23 Jan 2025 18:26:36 +0000 Subject: [PATCH 17/32] Add delete handler class --- .../scripts/archive_manager.py | 24 ++--- .../scripts/native/archive_manager.py | 96 ++++++++++++------- 2 files changed, 74 insertions(+), 46 deletions(-) diff --git a/components/clp-package-utils/clp_package_utils/scripts/archive_manager.py b/components/clp-package-utils/clp_package_utils/scripts/archive_manager.py index d6a1cf368..53b803eae 100644 --- a/components/clp-package-utils/clp_package_utils/scripts/archive_manager.py +++ b/components/clp-package-utils/clp_package_utils/scripts/archive_manager.py @@ -32,14 +32,14 @@ def _validate_timestamps(begin_ts, end_ts): - if begin_ts is not None and begin_ts < 0: + if begin_ts < 0: logger.error("begin-ts must be non-negative.") return False if end_ts is not None and end_ts < 0: logger.error("end-ts must be non-negative.") return False - if begin_ts is not None and end_ts is not None and begin_ts > end_ts: - logger.error("begin-ts must be <= end-ts") + if end_ts is not None and begin_ts > end_ts: + logger.error("begin-ts must be <= end-ts.") return False return True @@ -49,7 +49,9 @@ def main(argv): default_config_file_path = clp_home / CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH # Top-level parser and options - args_parser = argparse.ArgumentParser(description="View or delete archives.") + args_parser = argparse.ArgumentParser( + description="View list of archive IDs or delete compressed archives." + ) args_parser.add_argument( "--config", "-c", @@ -68,7 +70,7 @@ def main(argv): ) del_parser = subparsers.add_parser( DEL_COMMAND, - help="Delete archives.", + help="Delete archives from the database and file system.", ) # Options for find subcommand @@ -135,8 +137,8 @@ def main(argv): parsed_args = args_parser.parse_args(argv[1:]) - begin_timestamp = None - end_timestamp = None + begin_timestamp: int + end_timestamp: int subcommand = parsed_args.subcommand # Validate and load config file @@ -169,11 +171,9 @@ def main(argv): begin_timestamp = parsed_args.begin_ts end_timestamp = parsed_args.end_ts - if begin_timestamp is not None or end_timestamp is not None: - - # Validate the input timestamp - if not _validate_timestamps(begin_timestamp, end_timestamp): - return -1 + # Validate the input timestamp + if not _validate_timestamps(begin_timestamp, end_timestamp): + return -1 container_name = generate_container_name("archive-manager") diff --git a/components/clp-package-utils/clp_package_utils/scripts/native/archive_manager.py b/components/clp-package-utils/clp_package_utils/scripts/native/archive_manager.py index 51ba3f900..31e956d9a 100644 --- a/components/clp-package-utils/clp_package_utils/scripts/native/archive_manager.py +++ b/components/clp-package-utils/clp_package_utils/scripts/native/archive_manager.py @@ -2,6 +2,7 @@ import logging import shutil import sys +from abc import ABC, abstractmethod from contextlib import closing from pathlib import Path from typing import List @@ -27,6 +28,52 @@ logger = logging.getLogger(__file__) +class DeleteHandler(ABC): + def __init__(self, params: List[str]): + self._params = params + + def get_params(self) -> List[str]: + return self._params + + @abstractmethod + def get_criteria(self) -> str: ... + + @abstractmethod + def get_not_found_message(self) -> str: ... + + @abstractmethod + def validate_results(self, archive_ids: List[str]) -> None: ... + + +class FilterDeleteHandler(DeleteHandler): + def get_criteria(self) -> str: + return "begin_timestamp >= %s AND end_timestamp <= %s" + + def get_not_found_message(self) -> str: + return "No archives found within the specified time range." + + def validate_results(self, archive_ids: List[str]) -> None: + pass + + +class IdDeleteHandler(DeleteHandler): + def get_criteria(self) -> str: + return f"id in ({','.join(['%s'] * len(self._params))})" + + def get_not_found_message(self) -> str: + return "No archives found with matching IDs." + + def validate_results(self, archive_ids: List[str]) -> None: + not_found_ids = set(self._params) - set(archive_ids) + if not_found_ids: + logger.warning( + f""" + Archives with the following IDs don't exist: + {', '.join(not_found_ids)} + """ + ) + + def main(argv): clp_home = get_clp_home() default_config_file_path = clp_home / CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH @@ -100,7 +147,7 @@ def main(argv): # Delete by filter subcommand del_filter_parser = del_subparsers.add_parser( BY_FILTER_COMMAND, - help="delete archives within time frame.", + help="Deletes archives that fall within the specified time range.", ) # Delete by filter arguments @@ -140,20 +187,21 @@ def main(argv): parsed_args.end_ts, ) elif DEL_COMMAND == parsed_args.subcommand: + delete_handler: DeleteHandler if BY_IDS_COMMAND == parsed_args.del_subcommand: + delete_handler = IdDeleteHandler(parsed_args.ids) return _delete_archives( archives_dir, database_config, - parsed_args.ids, - BY_IDS_COMMAND, + delete_handler, parsed_args.dry_run, ) elif BY_FILTER_COMMAND == parsed_args.del_subcommand: + delete_handler = FilterDeleteHandler([parsed_args.begin_ts, parsed_args.end_ts]) return _delete_archives( archives_dir, database_config, - [parsed_args.begin_ts, parsed_args.end_ts], - BY_FILTER_COMMAND, + delete_handler, parsed_args.dry_run, ) @@ -199,35 +247,29 @@ def _find_archives( logger.info(f"Found {len(archive_ids)} archives within the specified time range.") for archive_id in archive_ids: logger.info(archive_id) + archive_path = archives_dir / archive_id + if not archive_path.is_dir(): + logger.warning(f"Archive {archive_id} in database not found on disk.") except Exception: logger.exception("Failed to find archives from the database.") return -1 logger.info(f"Finished finding archives from the database.") - - for archive_id in archive_ids: - archive_path = archives_dir / archive_id - if not archive_path.is_dir(): - logger.warning(f"Archive {archive_id} in database not found on disk.") - return 0 def _delete_archives( archives_dir: Path, database_config: Database, - params: List[str], - command: str, + delete_handler: DeleteHandler, dry_run: bool = False, ) -> int: """ Deletes archives from both metadata database and disk based on provided SQL query. :param archives_dir: :param database_config: - :param query: SQL query to select archives to delete. - :param params: List of parameters for SQL query. - :param command: Delete subcommand. Either "filter" or "ids". + :param delete_handler: Object to handle differences between by-filter and by-ids delete types. :param dry_run: If True, no changes will be made to the database or disk. :return: 0 on success, -1 otherwise. """ @@ -244,11 +286,8 @@ def _delete_archives( if dry_run: logger.info("Running in dry-run mode.") - criteria: str - if BY_FILTER_COMMAND == command: - criteria = "begin_timestamp >= %s AND end_timestamp <= %s" - elif BY_IDS_COMMAND == command: - criteria = "id in (%s)" % ",".join(["%s"] * len(params)) + criteria = delete_handler.get_criteria() + params = delete_handler.get_params() db_cursor.execute( f""" @@ -261,22 +300,11 @@ def _delete_archives( results = db_cursor.fetchall() if 0 == len(results): - if BY_FILTER_COMMAND == command: - logger.info("No archives found within specified time frame.") - elif BY_IDS_COMMAND == command: - logger.info("No archives found with matching IDs.") + logger.info(delete_handler.get_not_found_message) return 0 archive_ids = [result["id"] for result in results] - if BY_IDS_COMMAND == command: - not_found_ids = set(params) - set(archive_ids) - if not_found_ids: - logger.warning( - f""" - Archives with the following IDs don't exist: - {', '.join(not_found_ids)} - """ - ) + delete_handler.validate_results(archive_ids) ids_string = ", ".join(f"'{archive_id}'" for archive_id in archive_ids) From 63775f577e9b04b3643ef7bc9e72f3daf80e9667 Mon Sep 17 00:00:00 2001 From: Eden Zhang Date: Thu, 23 Jan 2025 18:31:37 +0000 Subject: [PATCH 18/32] Fix typo --- .../clp_package_utils/scripts/native/archive_manager.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/components/clp-package-utils/clp_package_utils/scripts/native/archive_manager.py b/components/clp-package-utils/clp_package_utils/scripts/native/archive_manager.py index 31e956d9a..980f2dce0 100644 --- a/components/clp-package-utils/clp_package_utils/scripts/native/archive_manager.py +++ b/components/clp-package-utils/clp_package_utils/scripts/native/archive_manager.py @@ -213,7 +213,7 @@ def _find_archives( end_ts: int = None, ) -> int: """ - Lists all archive IDs, if begin_its and end_its are provided, + Lists all archive IDs, if begin_ts and end_ts are provided, only lists archives where `begin_ts <= archive.begin_timestamp` and `archive.end_timestamp <= end_ts`. :param archives_dir: From 1a5b0d29ba6fd563774519be8eaed7134aae0e2b Mon Sep 17 00:00:00 2001 From: Eden Zhang Date: Fri, 24 Jan 2025 22:39:12 +0000 Subject: [PATCH 19/32] PR review suggestions --- .../scripts/archive_manager.py | 35 +++++++++-------- .../scripts/native/archive_manager.py | 38 +++++++++++-------- 2 files changed, 39 insertions(+), 34 deletions(-) diff --git a/components/clp-package-utils/clp_package_utils/scripts/archive_manager.py b/components/clp-package-utils/clp_package_utils/scripts/archive_manager.py index 53b803eae..7b0862c18 100644 --- a/components/clp-package-utils/clp_package_utils/scripts/archive_manager.py +++ b/components/clp-package-utils/clp_package_utils/scripts/archive_manager.py @@ -20,8 +20,8 @@ # Command/Argument Constants from clp_package_utils.scripts.native.archive_manager import ( BEGIN_TS_ARG, - BY_FILTER_COMMAND, BY_IDS_COMMAND, + DEL_BY_FILTER_SUBCOMMAND, DEL_COMMAND, DRY_RUN_ARG, END_TS_ARG, @@ -50,7 +50,7 @@ def main(argv): # Top-level parser and options args_parser = argparse.ArgumentParser( - description="View list of archive IDs or delete compressed archives." + description="Views the list of archive IDs or deletes compressed archives." ) args_parser.add_argument( "--config", @@ -66,11 +66,11 @@ def main(argv): ) find_parser = subparsers.add_parser( FIND_COMMAND, - help="List IDs of archives.", + help="Lists IDs of compressed archives.", ) del_parser = subparsers.add_parser( DEL_COMMAND, - help="Delete archives from the database and file system.", + help="Deletes compressed archives from the database and file system.", ) # Options for find subcommand @@ -105,7 +105,7 @@ def main(argv): # Delete by ID subcommand del_id_parser = del_subparsers.add_parser( BY_IDS_COMMAND, - help="Delete archives by ID.", + help="Deletes archives by ID.", ) # Delete by ID arguments @@ -117,8 +117,8 @@ def main(argv): # Delete by filter subcommand del_filter_parser = del_subparsers.add_parser( - BY_FILTER_COMMAND, - help="Delete archives that fall within the specified time range.", + DEL_BY_FILTER_SUBCOMMAND, + help="Deletes compressed archives that fall within the specified time range.", ) # Delete by filter arguments @@ -159,15 +159,9 @@ def main(argv): return -1 # Validate input depending on subcommands - if DEL_COMMAND == subcommand and BY_FILTER_COMMAND == parsed_args.del_subcommand: - begin_timestamp = parsed_args.begin_ts - end_timestamp = parsed_args.end_ts - - # Validate the input timestamp - if not _validate_timestamps(begin_timestamp, end_timestamp): - return -1 - - elif FIND_COMMAND == subcommand: + if (DEL_COMMAND == subcommand and DEL_BY_FILTER_SUBCOMMAND == parsed_args.del_subcommand) or ( + FIND_COMMAND == subcommand + ): begin_timestamp = parsed_args.begin_ts end_timestamp = parsed_args.end_ts @@ -202,16 +196,21 @@ def main(argv): if BY_IDS_COMMAND == parsed_args.del_subcommand: archive_manager_cmd.append(BY_IDS_COMMAND) archive_manager_cmd.extend(parsed_args.ids) - elif BY_FILTER_COMMAND == parsed_args.del_subcommand: + elif DEL_BY_FILTER_SUBCOMMAND == parsed_args.del_subcommand: archive_manager_cmd.extend([ - BY_FILTER_COMMAND, + DEL_BY_FILTER_SUBCOMMAND, str(begin_timestamp), str(end_timestamp) ]) + else: + logger.error(f"Unsupported subcommand: `{parsed_args.del_subcommand}`.") elif FIND_COMMAND == subcommand: archive_manager_cmd.extend([BEGIN_TS_ARG, str(begin_timestamp)]) if end_timestamp is not None: archive_manager_cmd.extend([END_TS_ARG, str(end_timestamp)]) + else: + logger.error(f"Unsupported subcommand: `{subcommand}`.") + cmd = container_start_cmd + archive_manager_cmd diff --git a/components/clp-package-utils/clp_package_utils/scripts/native/archive_manager.py b/components/clp-package-utils/clp_package_utils/scripts/native/archive_manager.py index 980f2dce0..6e9aabc66 100644 --- a/components/clp-package-utils/clp_package_utils/scripts/native/archive_manager.py +++ b/components/clp-package-utils/clp_package_utils/scripts/native/archive_manager.py @@ -20,7 +20,7 @@ FIND_COMMAND = "find" DEL_COMMAND = "del" BY_IDS_COMMAND = "by-ids" -BY_FILTER_COMMAND = "by-filter" +DEL_BY_FILTER_SUBCOMMAND = "by-filter" BEGIN_TS_ARG = "--begin-ts" END_TS_ARG = "--end-ts" DRY_RUN_ARG = "--dry-run" @@ -29,8 +29,8 @@ class DeleteHandler(ABC): - def __init__(self, params: List[str]): - self._params = params + def __init__(self, query_params: List[str]): + self._params = query_params def get_params(self) -> List[str]: return self._params @@ -58,7 +58,8 @@ def validate_results(self, archive_ids: List[str]) -> None: class IdDeleteHandler(DeleteHandler): def get_criteria(self) -> str: - return f"id in ({','.join(['%s'] * len(self._params))})" + placeholders = ",".join(["%s"] * len(self._params)) + return f"id in ({placeholders})" def get_not_found_message(self) -> str: return "No archives found with matching IDs." @@ -146,7 +147,7 @@ def main(argv): # Delete by filter subcommand del_filter_parser = del_subparsers.add_parser( - BY_FILTER_COMMAND, + DEL_BY_FILTER_SUBCOMMAND, help="Deletes archives that fall within the specified time range.", ) @@ -196,7 +197,7 @@ def main(argv): delete_handler, parsed_args.dry_run, ) - elif BY_FILTER_COMMAND == parsed_args.del_subcommand: + elif DEL_BY_FILTER_SUBCOMMAND == parsed_args.del_subcommand: delete_handler = FilterDeleteHandler([parsed_args.begin_ts, parsed_args.end_ts]) return _delete_archives( archives_dir, @@ -204,6 +205,11 @@ def main(argv): delete_handler, parsed_args.dry_run, ) + else: + logger.error(f"Unsupported subcommand: `{parsed_args.del_subcommand}`.") + return -1 + else: + logger.error(f"Unsupported subcommand: `{parsed_args.subcommand}`.") def _find_archives( @@ -230,13 +236,13 @@ def _find_archives( with closing(sql_adapter.create_connection(True)) as db_conn, closing( db_conn.cursor(dictionary=True) ) as db_cursor: - params = (begin_ts,) + query_params = (begin_ts,) query = f"SELECT id FROM `{table_prefix}archives` WHERE begin_timestamp >= %s" if end_ts is not None: query += " AND end_timestamp <= %s" - params = params + (end_ts,) + query_params = query_params + (end_ts,) - db_cursor.execute(query, params) + db_cursor.execute(query, query_params) results = db_cursor.fetchall() archive_ids = [result["id"] for result in results] @@ -286,16 +292,16 @@ def _delete_archives( if dry_run: logger.info("Running in dry-run mode.") - criteria = delete_handler.get_criteria() - params = delete_handler.get_params() + query_criteria = delete_handler.get_criteria() + query_params = delete_handler.get_params() db_cursor.execute( f""" DELETE FROM `{table_prefix}archives` - WHERE {criteria} + WHERE {query_criteria} RETURNING id """, - params, + query_params, ) results = db_cursor.fetchall() @@ -306,19 +312,19 @@ def _delete_archives( archive_ids = [result["id"] for result in results] delete_handler.validate_results(archive_ids) - ids_string = ", ".join(f"'{archive_id}'" for archive_id in archive_ids) + ids_list_string = ",".join(["%s"] * len(archive_ids)) db_cursor.execute( f""" DELETE FROM `{table_prefix}files` - WHERE archive_id in ({ids_string}) + WHERE archive_id in ({ids_list_string}) """ ) db_cursor.execute( f""" DELETE FROM `{table_prefix}archive_tags` - WHERE archive_id in ({ids_string}) + WHERE archive_id in ({ids_list_string}) """ ) for archive_id in archive_ids: From 7d60b99546fa28fe7165ac870efdf13255e699ba Mon Sep 17 00:00:00 2001 From: Eden Zhang Date: Mon, 27 Jan 2025 17:18:05 +0000 Subject: [PATCH 20/32] Replace BY_IDS_COMMAND with DEL_BY_IDS_SUBCOMMAND --- .../clp_package_utils/scripts/archive_manager.py | 9 ++++----- .../clp_package_utils/scripts/native/archive_manager.py | 6 +++--- 2 files changed, 7 insertions(+), 8 deletions(-) diff --git a/components/clp-package-utils/clp_package_utils/scripts/archive_manager.py b/components/clp-package-utils/clp_package_utils/scripts/archive_manager.py index 7b0862c18..7522e4b8e 100644 --- a/components/clp-package-utils/clp_package_utils/scripts/archive_manager.py +++ b/components/clp-package-utils/clp_package_utils/scripts/archive_manager.py @@ -20,8 +20,8 @@ # Command/Argument Constants from clp_package_utils.scripts.native.archive_manager import ( BEGIN_TS_ARG, - BY_IDS_COMMAND, DEL_BY_FILTER_SUBCOMMAND, + DEL_BY_IDS_SUBCOMMAND, DEL_COMMAND, DRY_RUN_ARG, END_TS_ARG, @@ -104,7 +104,7 @@ def main(argv): # Delete by ID subcommand del_id_parser = del_subparsers.add_parser( - BY_IDS_COMMAND, + DEL_BY_IDS_SUBCOMMAND, help="Deletes archives by ID.", ) @@ -193,8 +193,8 @@ def main(argv): if DEL_COMMAND == subcommand: if parsed_args.dry_run: archive_manager_cmd.append(DRY_RUN_ARG) - if BY_IDS_COMMAND == parsed_args.del_subcommand: - archive_manager_cmd.append(BY_IDS_COMMAND) + if DEL_BY_IDS_SUBCOMMAND == parsed_args.del_subcommand: + archive_manager_cmd.append(DEL_BY_IDS_SUBCOMMAND) archive_manager_cmd.extend(parsed_args.ids) elif DEL_BY_FILTER_SUBCOMMAND == parsed_args.del_subcommand: archive_manager_cmd.extend([ @@ -210,7 +210,6 @@ def main(argv): archive_manager_cmd.extend([END_TS_ARG, str(end_timestamp)]) else: logger.error(f"Unsupported subcommand: `{subcommand}`.") - cmd = container_start_cmd + archive_manager_cmd diff --git a/components/clp-package-utils/clp_package_utils/scripts/native/archive_manager.py b/components/clp-package-utils/clp_package_utils/scripts/native/archive_manager.py index 6e9aabc66..e0f17cb9a 100644 --- a/components/clp-package-utils/clp_package_utils/scripts/native/archive_manager.py +++ b/components/clp-package-utils/clp_package_utils/scripts/native/archive_manager.py @@ -19,7 +19,7 @@ # Command/Argument Constants FIND_COMMAND = "find" DEL_COMMAND = "del" -BY_IDS_COMMAND = "by-ids" +DEL_BY_IDS_SUBCOMMAND = "by-ids" DEL_BY_FILTER_SUBCOMMAND = "by-filter" BEGIN_TS_ARG = "--begin-ts" END_TS_ARG = "--end-ts" @@ -134,7 +134,7 @@ def main(argv): # Delete by ID subcomand del_id_parser = del_subparsers.add_parser( - BY_IDS_COMMAND, + DEL_BY_IDS_SUBCOMMAND, help="Delete archives by ID.", ) @@ -189,7 +189,7 @@ def main(argv): ) elif DEL_COMMAND == parsed_args.subcommand: delete_handler: DeleteHandler - if BY_IDS_COMMAND == parsed_args.del_subcommand: + if DEL_BY_IDS_SUBCOMMAND == parsed_args.del_subcommand: delete_handler = IdDeleteHandler(parsed_args.ids) return _delete_archives( archives_dir, From a1781daee575f5b303f3a9966044421ad18893ef Mon Sep 17 00:00:00 2001 From: Eden Zhang Date: Mon, 27 Jan 2025 18:09:30 +0000 Subject: [PATCH 21/32] Add line after fmt --- .../clp_package_utils/scripts/archive_manager.py | 1 + 1 file changed, 1 insertion(+) diff --git a/components/clp-package-utils/clp_package_utils/scripts/archive_manager.py b/components/clp-package-utils/clp_package_utils/scripts/archive_manager.py index 7522e4b8e..88f75e753 100644 --- a/components/clp-package-utils/clp_package_utils/scripts/archive_manager.py +++ b/components/clp-package-utils/clp_package_utils/scripts/archive_manager.py @@ -189,6 +189,7 @@ def main(argv): str(subcommand), ] # fmt : on + # Add subcommand-specific arguments if DEL_COMMAND == subcommand: if parsed_args.dry_run: From 4591b1b968d60e038c125ac78ed06e23a1ba2efe Mon Sep 17 00:00:00 2001 From: Eden Zhang Date: Mon, 27 Jan 2025 20:47:32 +0000 Subject: [PATCH 22/32] Add type annotation to variables --- .../scripts/archive_manager.py | 56 +++++++----- .../scripts/native/archive_manager.py | 91 ++++++++++--------- 2 files changed, 82 insertions(+), 65 deletions(-) diff --git a/components/clp-package-utils/clp_package_utils/scripts/archive_manager.py b/components/clp-package-utils/clp_package_utils/scripts/archive_manager.py index 88f75e753..f48dc6247 100644 --- a/components/clp-package-utils/clp_package_utils/scripts/archive_manager.py +++ b/components/clp-package-utils/clp_package_utils/scripts/archive_manager.py @@ -8,6 +8,8 @@ from clp_package_utils.general import ( CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH, + CLPConfig, + CLPDockerMounts, dump_container_config, generate_container_config, generate_container_name, @@ -28,10 +30,10 @@ FIND_COMMAND, ) -logger = logging.getLogger(__file__) +logger: logging.Logger = logging.getLogger(__file__) -def _validate_timestamps(begin_ts, end_ts): +def _validate_timestamps(begin_ts: int, end_ts: int): if begin_ts < 0: logger.error("begin-ts must be non-negative.") return False @@ -45,11 +47,11 @@ def _validate_timestamps(begin_ts, end_ts): def main(argv): - clp_home = get_clp_home() - default_config_file_path = clp_home / CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH + clp_home: Path = get_clp_home() + default_config_file_path: Path = clp_home / CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH # Top-level parser and options - args_parser = argparse.ArgumentParser( + args_parser: argparse.ArgumentParser = argparse.ArgumentParser( description="Views the list of archive IDs or deletes compressed archives." ) args_parser.add_argument( @@ -60,15 +62,15 @@ def main(argv): ) # Top-level commands - subparsers = args_parser.add_subparsers( + subparsers: argparse._SubParsersAction[argparse.ArgumentParser] = args_parser.add_subparsers( dest="subcommand", required=True, ) - find_parser = subparsers.add_parser( + find_parser: argparse.ArgumentParser = subparsers.add_parser( FIND_COMMAND, help="Lists IDs of compressed archives.", ) - del_parser = subparsers.add_parser( + del_parser: argparse.ArgumentParser = subparsers.add_parser( DEL_COMMAND, help="Deletes compressed archives from the database and file system.", ) @@ -97,13 +99,13 @@ def main(argv): ) # Subcommands for delete subcommand - del_subparsers = del_parser.add_subparsers( + del_subparsers: argparse._SubParsersAction[argparse.ArgumentParser] = del_parser.add_subparsers( dest="del_subcommand", required=True, ) # Delete by ID subcommand - del_id_parser = del_subparsers.add_parser( + del_id_parser: argparse.ArgumentParser = del_subparsers.add_parser( DEL_BY_IDS_SUBCOMMAND, help="Deletes archives by ID.", ) @@ -116,7 +118,7 @@ def main(argv): ) # Delete by filter subcommand - del_filter_parser = del_subparsers.add_parser( + del_filter_parser: argparse.ArgumentParser = del_subparsers.add_parser( DEL_BY_FILTER_SUBCOMMAND, help="Deletes compressed archives that fall within the specified time range.", ) @@ -135,16 +137,18 @@ def main(argv): help="Time-range upper-bound (include) as milliseconds from the UNIX epoch.", ) - parsed_args = args_parser.parse_args(argv[1:]) + parsed_args: argparse.Namespace = args_parser.parse_args(argv[1:]) begin_timestamp: int end_timestamp: int - subcommand = parsed_args.subcommand + subcommand: str = parsed_args.subcommand # Validate and load config file try: - config_file_path = Path(parsed_args.config) - clp_config = load_config_file(config_file_path, default_config_file_path, clp_home) + config_file_path: Path = Path(parsed_args.config) + clp_config: CLPConfig = load_config_file( + config_file_path, default_config_file_path, clp_home + ) clp_config.validate_logs_dir() # Validate and load necessary credentials @@ -153,7 +157,7 @@ def main(argv): logger.exception("Failed to load config.") return -1 - storage_type = clp_config.archive_output.storage.type + storage_type: StorageType = clp_config.archive_output.storage.type if StorageType.FS != storage_type: logger.error(f"Archive deletion is not supported for storage type: {storage_type}.") return -1 @@ -162,34 +166,38 @@ def main(argv): if (DEL_COMMAND == subcommand and DEL_BY_FILTER_SUBCOMMAND == parsed_args.del_subcommand) or ( FIND_COMMAND == subcommand ): - begin_timestamp = parsed_args.begin_ts - end_timestamp = parsed_args.end_ts + begin_timestamp: int = parsed_args.begin_ts + end_timestamp: int = parsed_args.end_ts # Validate the input timestamp if not _validate_timestamps(begin_timestamp, end_timestamp): return -1 - container_name = generate_container_name("archive-manager") + container_name: str = generate_container_name("archive-manager") container_clp_config, mounts = generate_container_config(clp_config, clp_home) generated_config_path_on_container, generated_config_path_on_host = dump_container_config( container_clp_config, clp_config, container_name ) - necessary_mounts = [mounts.clp_home, mounts.logs_dir, mounts.archives_output_dir] - container_start_cmd = generate_container_start_cmd( + necessary_mounts: list[CLPDockerMounts] = [ + mounts.clp_home, + mounts.logs_dir, + mounts.archives_output_dir, + ] + container_start_cmd: list[str] = generate_container_start_cmd( container_name, necessary_mounts, clp_config.execution_container ) # fmt: off - archive_manager_cmd = [ + archive_manager_cmd: list[str] = [ "python3", "-m", "clp_package_utils.scripts.native.archive_manager", "--config", str(generated_config_path_on_container), str(subcommand), ] # fmt : on - + # Add subcommand-specific arguments if DEL_COMMAND == subcommand: if parsed_args.dry_run: @@ -212,7 +220,7 @@ def main(argv): else: logger.error(f"Unsupported subcommand: `{subcommand}`.") - cmd = container_start_cmd + archive_manager_cmd + cmd: list[str] = container_start_cmd + archive_manager_cmd subprocess.run(cmd, check=True) diff --git a/components/clp-package-utils/clp_package_utils/scripts/native/archive_manager.py b/components/clp-package-utils/clp_package_utils/scripts/native/archive_manager.py index e0f17cb9a..1d4c5fcc4 100644 --- a/components/clp-package-utils/clp_package_utils/scripts/native/archive_manager.py +++ b/components/clp-package-utils/clp_package_utils/scripts/native/archive_manager.py @@ -12,25 +12,26 @@ from clp_package_utils.general import ( CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH, + CLPConfig, get_clp_home, load_config_file, ) # Command/Argument Constants -FIND_COMMAND = "find" -DEL_COMMAND = "del" -DEL_BY_IDS_SUBCOMMAND = "by-ids" -DEL_BY_FILTER_SUBCOMMAND = "by-filter" -BEGIN_TS_ARG = "--begin-ts" -END_TS_ARG = "--end-ts" -DRY_RUN_ARG = "--dry-run" +FIND_COMMAND: str = "find" +DEL_COMMAND: str = "del" +DEL_BY_IDS_SUBCOMMAND: str = "by-ids" +DEL_BY_FILTER_SUBCOMMAND: str = "by-filter" +BEGIN_TS_ARG: str = "--begin-ts" +END_TS_ARG: str = "--end-ts" +DRY_RUN_ARG: str = "--dry-run" -logger = logging.getLogger(__file__) +logger: logging.Logger = logging.getLogger(__file__) class DeleteHandler(ABC): def __init__(self, query_params: List[str]): - self._params = query_params + self._params: List[str] = query_params def get_params(self) -> List[str]: return self._params @@ -58,14 +59,14 @@ def validate_results(self, archive_ids: List[str]) -> None: class IdDeleteHandler(DeleteHandler): def get_criteria(self) -> str: - placeholders = ",".join(["%s"] * len(self._params)) + placeholders: str = ",".join(["%s"] * len(self._params)) return f"id in ({placeholders})" def get_not_found_message(self) -> str: return "No archives found with matching IDs." def validate_results(self, archive_ids: List[str]) -> None: - not_found_ids = set(self._params) - set(archive_ids) + not_found_ids: set[str] = set(self._params) - set(archive_ids) if not_found_ids: logger.warning( f""" @@ -76,11 +77,11 @@ def validate_results(self, archive_ids: List[str]) -> None: def main(argv): - clp_home = get_clp_home() - default_config_file_path = clp_home / CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH + clp_home: Path = get_clp_home() + default_config_file_path: Path = clp_home / CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH # Top-level parser and options - args_parser = argparse.ArgumentParser( + args_parser: argparse.ArgumentParser = argparse.ArgumentParser( description="View list of archive IDs or delete compressed archives." ) args_parser.add_argument( @@ -91,15 +92,15 @@ def main(argv): ) # Top-level commands - subparsers = args_parser.add_subparsers( + subparsers: argparse._SubParsersAction[argparse.ArgumentParser] = args_parser.add_subparsers( dest="subcommand", required=True, ) - find_parser = subparsers.add_parser( + find_parser: argparse.ArgumentParser = subparsers.add_parser( FIND_COMMAND, help="List IDs of archives.", ) - del_parser = subparsers.add_parser( + del_parser: argparse.ArgumentParser = subparsers.add_parser( DEL_COMMAND, help="Delete archives from the database and file system.", ) @@ -127,13 +128,13 @@ def main(argv): ) # Subcommands for delete subcommands - del_subparsers = del_parser.add_subparsers( + del_subparsers: argparse._SubParsersAction[argparse.ArgumentParser] = del_parser.add_subparsers( dest="del_subcommand", required=True, ) # Delete by ID subcomand - del_id_parser = del_subparsers.add_parser( + del_id_parser: argparse.ArgumentParser = del_subparsers.add_parser( DEL_BY_IDS_SUBCOMMAND, help="Delete archives by ID.", ) @@ -146,7 +147,7 @@ def main(argv): ) # Delete by filter subcommand - del_filter_parser = del_subparsers.add_parser( + del_filter_parser: argparse.ArgumentParser = del_subparsers.add_parser( DEL_BY_FILTER_SUBCOMMAND, help="Deletes archives that fall within the specified time range.", ) @@ -163,19 +164,21 @@ def main(argv): help="Time-range upper-bound (include) as milliseconds from the UNIX epoch.", ) - parsed_args = args_parser.parse_args(argv[1:]) + parsed_args: argparse.Namespace = args_parser.parse_args(argv[1:]) # Validate and load config file - config_file_path = Path(parsed_args.config) + config_file_path: Path = Path(parsed_args.config) try: - clp_config = load_config_file(config_file_path, default_config_file_path, clp_home) + clp_config: CLPConfig = load_config_file( + config_file_path, default_config_file_path, clp_home + ) clp_config.validate_logs_dir() except: logger.exception("Failed to load config.") return -1 - database_config = clp_config.database - archives_dir = clp_config.archive_output.get_directory() + database_config: Database = clp_config.database + archives_dir: Path = clp_config.archive_output.get_directory() if not archives_dir.exists(): logger.error("`archive_output.directory` doesn't exist.") return -1 @@ -190,7 +193,7 @@ def main(argv): elif DEL_COMMAND == parsed_args.subcommand: delete_handler: DeleteHandler if DEL_BY_IDS_SUBCOMMAND == parsed_args.del_subcommand: - delete_handler = IdDeleteHandler(parsed_args.ids) + delete_handler: IdDeleteHandler = IdDeleteHandler(parsed_args.ids) return _delete_archives( archives_dir, database_config, @@ -198,7 +201,9 @@ def main(argv): parsed_args.dry_run, ) elif DEL_BY_FILTER_SUBCOMMAND == parsed_args.del_subcommand: - delete_handler = FilterDeleteHandler([parsed_args.begin_ts, parsed_args.end_ts]) + delete_handler: FilterDeleteHandler = FilterDeleteHandler( + [parsed_args.begin_ts, parsed_args.end_ts] + ) return _delete_archives( archives_dir, database_config, @@ -230,22 +235,24 @@ def _find_archives( archive_ids: List[str] logger.info("Starting to find archives from the database.") try: - sql_adapter = SQL_Adapter(database_config) - clp_db_connection_params = database_config.get_clp_connection_params_and_type(True) + sql_adapter: SQL_Adapter = SQL_Adapter(database_config) + clp_db_connection_params: dict[str, any] = ( + database_config.get_clp_connection_params_and_type(True) + ) table_prefix = clp_db_connection_params["table_prefix"] with closing(sql_adapter.create_connection(True)) as db_conn, closing( db_conn.cursor(dictionary=True) ) as db_cursor: - query_params = (begin_ts,) - query = f"SELECT id FROM `{table_prefix}archives` WHERE begin_timestamp >= %s" + query_params: tuple[int] = (begin_ts,) + query: str = f"SELECT id FROM `{table_prefix}archives` WHERE begin_timestamp >= %s" if end_ts is not None: query += " AND end_timestamp <= %s" - query_params = query_params + (end_ts,) + query_params: tuple[int] = query_params + (end_ts,) db_cursor.execute(query, query_params) results = db_cursor.fetchall() - archive_ids = [result["id"] for result in results] + archive_ids: list[str] = [result["id"] for result in results] if 0 == len(archive_ids): logger.info("No archives found within specified time range.") return 0 @@ -253,7 +260,7 @@ def _find_archives( logger.info(f"Found {len(archive_ids)} archives within the specified time range.") for archive_id in archive_ids: logger.info(archive_id) - archive_path = archives_dir / archive_id + archive_path: Path = archives_dir / archive_id if not archive_path.is_dir(): logger.warning(f"Archive {archive_id} in database not found on disk.") @@ -283,8 +290,10 @@ def _delete_archives( archive_ids: List[str] logger.info("Starting to delete archives from the database.") try: - sql_adapter = SQL_Adapter(database_config) - clp_db_connection_params = database_config.get_clp_connection_params_and_type(True) + sql_adapter: SQL_Adapter = SQL_Adapter(database_config) + clp_db_connection_params: dict[str, any] = ( + database_config.get_clp_connection_params_and_type(True) + ) table_prefix = clp_db_connection_params["table_prefix"] with closing(sql_adapter.create_connection(True)) as db_conn, closing( db_conn.cursor(dictionary=True) @@ -292,8 +301,8 @@ def _delete_archives( if dry_run: logger.info("Running in dry-run mode.") - query_criteria = delete_handler.get_criteria() - query_params = delete_handler.get_params() + query_criteria: str = delete_handler.get_criteria() + query_params: str = delete_handler.get_params() db_cursor.execute( f""" @@ -309,10 +318,10 @@ def _delete_archives( logger.info(delete_handler.get_not_found_message) return 0 - archive_ids = [result["id"] for result in results] + archive_ids: list[str] = [result["id"] for result in results] delete_handler.validate_results(archive_ids) - ids_list_string = ",".join(["%s"] * len(archive_ids)) + ids_list_string: str = ",".join(["%s"] * len(archive_ids)) db_cursor.execute( f""" @@ -344,7 +353,7 @@ def _delete_archives( logger.info(f"Finished deleting archives from the database.") for archive_id in archive_ids: - archive_path = archives_dir / archive_id + archive_path: Path = archives_dir / archive_id if not archive_path.is_dir(): logger.warning(f"Archive {archive_id} is not a directory. Skipping deletion.") continue From 4f9e16581e8225b83df06fed0c3531cf22e94b7f Mon Sep 17 00:00:00 2001 From: Eden Zhang Date: Mon, 27 Jan 2025 20:54:45 +0000 Subject: [PATCH 23/32] Add comments on unclear types --- .../clp_package_utils/scripts/archive_manager.py | 2 +- .../scripts/native/archive_manager.py | 10 +++++----- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/components/clp-package-utils/clp_package_utils/scripts/archive_manager.py b/components/clp-package-utils/clp_package_utils/scripts/archive_manager.py index f48dc6247..ee37095d5 100644 --- a/components/clp-package-utils/clp_package_utils/scripts/archive_manager.py +++ b/components/clp-package-utils/clp_package_utils/scripts/archive_manager.py @@ -157,7 +157,7 @@ def main(argv): logger.exception("Failed to load config.") return -1 - storage_type: StorageType = clp_config.archive_output.storage.type + storage_type: StorageType = clp_config.archive_output.storage.type # Unsure about if StorageType.FS != storage_type: logger.error(f"Archive deletion is not supported for storage type: {storage_type}.") return -1 diff --git a/components/clp-package-utils/clp_package_utils/scripts/native/archive_manager.py b/components/clp-package-utils/clp_package_utils/scripts/native/archive_manager.py index 1d4c5fcc4..1913ad527 100644 --- a/components/clp-package-utils/clp_package_utils/scripts/native/archive_manager.py +++ b/components/clp-package-utils/clp_package_utils/scripts/native/archive_manager.py @@ -177,7 +177,7 @@ def main(argv): logger.exception("Failed to load config.") return -1 - database_config: Database = clp_config.database + database_config: Database = clp_config.database # Unsure about type archives_dir: Path = clp_config.archive_output.get_directory() if not archives_dir.exists(): logger.error("`archive_output.directory` doesn't exist.") @@ -193,7 +193,7 @@ def main(argv): elif DEL_COMMAND == parsed_args.subcommand: delete_handler: DeleteHandler if DEL_BY_IDS_SUBCOMMAND == parsed_args.del_subcommand: - delete_handler: IdDeleteHandler = IdDeleteHandler(parsed_args.ids) + delete_handler: IdDeleteHandler = IdDeleteHandler(parsed_args.ids) # Not sure this is needed return _delete_archives( archives_dir, database_config, @@ -203,7 +203,7 @@ def main(argv): elif DEL_BY_FILTER_SUBCOMMAND == parsed_args.del_subcommand: delete_handler: FilterDeleteHandler = FilterDeleteHandler( [parsed_args.begin_ts, parsed_args.end_ts] - ) + ) # Also not sure is needed return _delete_archives( archives_dir, database_config, @@ -238,7 +238,7 @@ def _find_archives( sql_adapter: SQL_Adapter = SQL_Adapter(database_config) clp_db_connection_params: dict[str, any] = ( database_config.get_clp_connection_params_and_type(True) - ) + ) # Unsure about type table_prefix = clp_db_connection_params["table_prefix"] with closing(sql_adapter.create_connection(True)) as db_conn, closing( db_conn.cursor(dictionary=True) @@ -293,7 +293,7 @@ def _delete_archives( sql_adapter: SQL_Adapter = SQL_Adapter(database_config) clp_db_connection_params: dict[str, any] = ( database_config.get_clp_connection_params_and_type(True) - ) + ) # Unsure about type table_prefix = clp_db_connection_params["table_prefix"] with closing(sql_adapter.create_connection(True)) as db_conn, closing( db_conn.cursor(dictionary=True) From 1b925cec4d13ef9b6759eadfec017cbf4dd3a492 Mon Sep 17 00:00:00 2001 From: Eden Zhang Date: Mon, 27 Jan 2025 20:58:23 +0000 Subject: [PATCH 24/32] Fix lint --- .../clp_package_utils/scripts/archive_manager.py | 2 +- .../scripts/native/archive_manager.py | 12 +++++++----- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/components/clp-package-utils/clp_package_utils/scripts/archive_manager.py b/components/clp-package-utils/clp_package_utils/scripts/archive_manager.py index ee37095d5..1392d96fb 100644 --- a/components/clp-package-utils/clp_package_utils/scripts/archive_manager.py +++ b/components/clp-package-utils/clp_package_utils/scripts/archive_manager.py @@ -157,7 +157,7 @@ def main(argv): logger.exception("Failed to load config.") return -1 - storage_type: StorageType = clp_config.archive_output.storage.type # Unsure about + storage_type: StorageType = clp_config.archive_output.storage.type # Unsure about type if StorageType.FS != storage_type: logger.error(f"Archive deletion is not supported for storage type: {storage_type}.") return -1 diff --git a/components/clp-package-utils/clp_package_utils/scripts/native/archive_manager.py b/components/clp-package-utils/clp_package_utils/scripts/native/archive_manager.py index 1913ad527..b9ac64cd0 100644 --- a/components/clp-package-utils/clp_package_utils/scripts/native/archive_manager.py +++ b/components/clp-package-utils/clp_package_utils/scripts/native/archive_manager.py @@ -177,7 +177,7 @@ def main(argv): logger.exception("Failed to load config.") return -1 - database_config: Database = clp_config.database # Unsure about type + database_config: Database = clp_config.database # Unsure about type archives_dir: Path = clp_config.archive_output.get_directory() if not archives_dir.exists(): logger.error("`archive_output.directory` doesn't exist.") @@ -193,7 +193,9 @@ def main(argv): elif DEL_COMMAND == parsed_args.subcommand: delete_handler: DeleteHandler if DEL_BY_IDS_SUBCOMMAND == parsed_args.del_subcommand: - delete_handler: IdDeleteHandler = IdDeleteHandler(parsed_args.ids) # Not sure this is needed + delete_handler: IdDeleteHandler = IdDeleteHandler( + parsed_args.ids + ) # Not sure this is needed return _delete_archives( archives_dir, database_config, @@ -203,7 +205,7 @@ def main(argv): elif DEL_BY_FILTER_SUBCOMMAND == parsed_args.del_subcommand: delete_handler: FilterDeleteHandler = FilterDeleteHandler( [parsed_args.begin_ts, parsed_args.end_ts] - ) # Also not sure is needed + ) # Also not sure is needed return _delete_archives( archives_dir, database_config, @@ -238,7 +240,7 @@ def _find_archives( sql_adapter: SQL_Adapter = SQL_Adapter(database_config) clp_db_connection_params: dict[str, any] = ( database_config.get_clp_connection_params_and_type(True) - ) # Unsure about type + ) # Unsure about type table_prefix = clp_db_connection_params["table_prefix"] with closing(sql_adapter.create_connection(True)) as db_conn, closing( db_conn.cursor(dictionary=True) @@ -293,7 +295,7 @@ def _delete_archives( sql_adapter: SQL_Adapter = SQL_Adapter(database_config) clp_db_connection_params: dict[str, any] = ( database_config.get_clp_connection_params_and_type(True) - ) # Unsure about type + ) # Unsure about type table_prefix = clp_db_connection_params["table_prefix"] with closing(sql_adapter.create_connection(True)) as db_conn, closing( db_conn.cursor(dictionary=True) From a9e12af1c88da91dcaa3e6a4ed4174e7c498777d Mon Sep 17 00:00:00 2001 From: Eden Zhang Date: Mon, 27 Jan 2025 21:20:55 +0000 Subject: [PATCH 25/32] Remove unsure type comments --- .../clp_package_utils/scripts/archive_manager.py | 2 +- .../scripts/native/archive_manager.py | 12 +++++------- 2 files changed, 6 insertions(+), 8 deletions(-) diff --git a/components/clp-package-utils/clp_package_utils/scripts/archive_manager.py b/components/clp-package-utils/clp_package_utils/scripts/archive_manager.py index 1392d96fb..f48dc6247 100644 --- a/components/clp-package-utils/clp_package_utils/scripts/archive_manager.py +++ b/components/clp-package-utils/clp_package_utils/scripts/archive_manager.py @@ -157,7 +157,7 @@ def main(argv): logger.exception("Failed to load config.") return -1 - storage_type: StorageType = clp_config.archive_output.storage.type # Unsure about type + storage_type: StorageType = clp_config.archive_output.storage.type if StorageType.FS != storage_type: logger.error(f"Archive deletion is not supported for storage type: {storage_type}.") return -1 diff --git a/components/clp-package-utils/clp_package_utils/scripts/native/archive_manager.py b/components/clp-package-utils/clp_package_utils/scripts/native/archive_manager.py index b9ac64cd0..1d4c5fcc4 100644 --- a/components/clp-package-utils/clp_package_utils/scripts/native/archive_manager.py +++ b/components/clp-package-utils/clp_package_utils/scripts/native/archive_manager.py @@ -177,7 +177,7 @@ def main(argv): logger.exception("Failed to load config.") return -1 - database_config: Database = clp_config.database # Unsure about type + database_config: Database = clp_config.database archives_dir: Path = clp_config.archive_output.get_directory() if not archives_dir.exists(): logger.error("`archive_output.directory` doesn't exist.") @@ -193,9 +193,7 @@ def main(argv): elif DEL_COMMAND == parsed_args.subcommand: delete_handler: DeleteHandler if DEL_BY_IDS_SUBCOMMAND == parsed_args.del_subcommand: - delete_handler: IdDeleteHandler = IdDeleteHandler( - parsed_args.ids - ) # Not sure this is needed + delete_handler: IdDeleteHandler = IdDeleteHandler(parsed_args.ids) return _delete_archives( archives_dir, database_config, @@ -205,7 +203,7 @@ def main(argv): elif DEL_BY_FILTER_SUBCOMMAND == parsed_args.del_subcommand: delete_handler: FilterDeleteHandler = FilterDeleteHandler( [parsed_args.begin_ts, parsed_args.end_ts] - ) # Also not sure is needed + ) return _delete_archives( archives_dir, database_config, @@ -240,7 +238,7 @@ def _find_archives( sql_adapter: SQL_Adapter = SQL_Adapter(database_config) clp_db_connection_params: dict[str, any] = ( database_config.get_clp_connection_params_and_type(True) - ) # Unsure about type + ) table_prefix = clp_db_connection_params["table_prefix"] with closing(sql_adapter.create_connection(True)) as db_conn, closing( db_conn.cursor(dictionary=True) @@ -295,7 +293,7 @@ def _delete_archives( sql_adapter: SQL_Adapter = SQL_Adapter(database_config) clp_db_connection_params: dict[str, any] = ( database_config.get_clp_connection_params_and_type(True) - ) # Unsure about type + ) table_prefix = clp_db_connection_params["table_prefix"] with closing(sql_adapter.create_connection(True)) as db_conn, closing( db_conn.cursor(dictionary=True) From 86801263608c0cc9405506c48d3dc86eb7e884f1 Mon Sep 17 00:00:00 2001 From: Eden Zhang Date: Wed, 29 Jan 2025 16:20:41 +0000 Subject: [PATCH 26/32] Change list annotation to typing.List --- .../clp_package_utils/scripts/archive_manager.py | 9 +++++---- .../clp_package_utils/scripts/native/archive_manager.py | 6 +++--- 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/components/clp-package-utils/clp_package_utils/scripts/archive_manager.py b/components/clp-package-utils/clp_package_utils/scripts/archive_manager.py index f48dc6247..f78c06daa 100644 --- a/components/clp-package-utils/clp_package_utils/scripts/archive_manager.py +++ b/components/clp-package-utils/clp_package_utils/scripts/archive_manager.py @@ -2,6 +2,7 @@ import logging import subprocess import sys +import typing from pathlib import Path from clp_py_utils.clp_config import StorageType @@ -180,17 +181,17 @@ def main(argv): container_clp_config, clp_config, container_name ) - necessary_mounts: list[CLPDockerMounts] = [ + necessary_mounts: typing.List[CLPDockerMounts] = [ mounts.clp_home, mounts.logs_dir, mounts.archives_output_dir, ] - container_start_cmd: list[str] = generate_container_start_cmd( + container_start_cmd: typing.List[str] = generate_container_start_cmd( container_name, necessary_mounts, clp_config.execution_container ) # fmt: off - archive_manager_cmd: list[str] = [ + archive_manager_cmd: typing.List[str] = [ "python3", "-m", "clp_package_utils.scripts.native.archive_manager", "--config", str(generated_config_path_on_container), @@ -220,7 +221,7 @@ def main(argv): else: logger.error(f"Unsupported subcommand: `{subcommand}`.") - cmd: list[str] = container_start_cmd + archive_manager_cmd + cmd: typing.List[str] = container_start_cmd + archive_manager_cmd subprocess.run(cmd, check=True) diff --git a/components/clp-package-utils/clp_package_utils/scripts/native/archive_manager.py b/components/clp-package-utils/clp_package_utils/scripts/native/archive_manager.py index 1d4c5fcc4..4bfc2ab84 100644 --- a/components/clp-package-utils/clp_package_utils/scripts/native/archive_manager.py +++ b/components/clp-package-utils/clp_package_utils/scripts/native/archive_manager.py @@ -2,10 +2,10 @@ import logging import shutil import sys +import typing from abc import ABC, abstractmethod from contextlib import closing from pathlib import Path -from typing import List from clp_py_utils.clp_config import Database from clp_py_utils.sql_adapter import SQL_Adapter @@ -252,7 +252,7 @@ def _find_archives( db_cursor.execute(query, query_params) results = db_cursor.fetchall() - archive_ids: list[str] = [result["id"] for result in results] + archive_ids: typing.List[str] = [result["id"] for result in results] if 0 == len(archive_ids): logger.info("No archives found within specified time range.") return 0 @@ -318,7 +318,7 @@ def _delete_archives( logger.info(delete_handler.get_not_found_message) return 0 - archive_ids: list[str] = [result["id"] for result in results] + archive_ids: typing.List[str] = [result["id"] for result in results] delete_handler.validate_results(archive_ids) ids_list_string: str = ",".join(["%s"] * len(archive_ids)) From 7edbf77daaedbb5e4424af675bc46dd1c198e6b6 Mon Sep 17 00:00:00 2001 From: Eden Zhang Date: Wed, 29 Jan 2025 16:27:32 +0000 Subject: [PATCH 27/32] Fix list annotation --- .../scripts/native/archive_manager.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/components/clp-package-utils/clp_package_utils/scripts/native/archive_manager.py b/components/clp-package-utils/clp_package_utils/scripts/native/archive_manager.py index 4bfc2ab84..a14575907 100644 --- a/components/clp-package-utils/clp_package_utils/scripts/native/archive_manager.py +++ b/components/clp-package-utils/clp_package_utils/scripts/native/archive_manager.py @@ -30,10 +30,10 @@ class DeleteHandler(ABC): - def __init__(self, query_params: List[str]): - self._params: List[str] = query_params + def __init__(self, query_params: typing.List[str]): + self._params: typing.List[str] = query_params - def get_params(self) -> List[str]: + def get_params(self) -> typing.List[str]: return self._params @abstractmethod @@ -43,7 +43,7 @@ def get_criteria(self) -> str: ... def get_not_found_message(self) -> str: ... @abstractmethod - def validate_results(self, archive_ids: List[str]) -> None: ... + def validate_results(self, archive_ids: typing.List[str]) -> None: ... class FilterDeleteHandler(DeleteHandler): @@ -53,7 +53,7 @@ def get_criteria(self) -> str: def get_not_found_message(self) -> str: return "No archives found within the specified time range." - def validate_results(self, archive_ids: List[str]) -> None: + def validate_results(self, archive_ids: typing.List[str]) -> None: pass @@ -65,7 +65,7 @@ def get_criteria(self) -> str: def get_not_found_message(self) -> str: return "No archives found with matching IDs." - def validate_results(self, archive_ids: List[str]) -> None: + def validate_results(self, archive_ids: typing.List[str]) -> None: not_found_ids: set[str] = set(self._params) - set(archive_ids) if not_found_ids: logger.warning( @@ -232,7 +232,7 @@ def _find_archives( :param begin_ts: :param end_ts: """ - archive_ids: List[str] + archive_ids: typing.List[str] logger.info("Starting to find archives from the database.") try: sql_adapter: SQL_Adapter = SQL_Adapter(database_config) @@ -287,7 +287,7 @@ def _delete_archives( :return: 0 on success, -1 otherwise. """ - archive_ids: List[str] + archive_ids: typing.List[str] logger.info("Starting to delete archives from the database.") try: sql_adapter: SQL_Adapter = SQL_Adapter(database_config) From a0eb7e29a4593621261c82cc13d32a8d51ceb22c Mon Sep 17 00:00:00 2001 From: Eden Zhang Date: Wed, 29 Jan 2025 16:31:29 +0000 Subject: [PATCH 28/32] Fix method call --- .../clp_package_utils/scripts/native/archive_manager.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/components/clp-package-utils/clp_package_utils/scripts/native/archive_manager.py b/components/clp-package-utils/clp_package_utils/scripts/native/archive_manager.py index a14575907..c0176928d 100644 --- a/components/clp-package-utils/clp_package_utils/scripts/native/archive_manager.py +++ b/components/clp-package-utils/clp_package_utils/scripts/native/archive_manager.py @@ -315,7 +315,7 @@ def _delete_archives( results = db_cursor.fetchall() if 0 == len(results): - logger.info(delete_handler.get_not_found_message) + logger.info(delete_handler.get_not_found_message()) return 0 archive_ids: typing.List[str] = [result["id"] for result in results] From 3241f615da6743487fc3733cdb65b76a1404de1e Mon Sep 17 00:00:00 2001 From: Eden Zhang Date: Thu, 30 Jan 2025 17:55:35 +0000 Subject: [PATCH 29/32] Add asserts for begin_timestamp, tweak typing --- .../scripts/archive_manager.py | 18 +++++++------- .../scripts/native/archive_manager.py | 24 ++++++++++--------- 2 files changed, 23 insertions(+), 19 deletions(-) diff --git a/components/clp-package-utils/clp_package_utils/scripts/archive_manager.py b/components/clp-package-utils/clp_package_utils/scripts/archive_manager.py index f78c06daa..8eb462929 100644 --- a/components/clp-package-utils/clp_package_utils/scripts/archive_manager.py +++ b/components/clp-package-utils/clp_package_utils/scripts/archive_manager.py @@ -34,7 +34,7 @@ logger: logging.Logger = logging.getLogger(__file__) -def _validate_timestamps(begin_ts: int, end_ts: int): +def _validate_timestamps(begin_ts: int, end_ts: typing.Optional[int]) -> bool: if begin_ts < 0: logger.error("begin-ts must be non-negative.") return False @@ -47,7 +47,7 @@ def _validate_timestamps(begin_ts: int, end_ts: int): return True -def main(argv): +def main(argv: typing.List[str]) -> int: clp_home: Path = get_clp_home() default_config_file_path: Path = clp_home / CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH @@ -88,7 +88,7 @@ def main(argv): END_TS_ARG, dest="end_ts", type=int, - help="Time-range upper-bound (include) as milliseconds from the UNIX epoch.", + help="Time-range upper-bound (inclusive) as milliseconds from the UNIX epoch.", ) # Options for delete subcommand @@ -135,13 +135,13 @@ def main(argv): END_TS_ARG, type=int, required=True, - help="Time-range upper-bound (include) as milliseconds from the UNIX epoch.", + help="Time-range upper-bound (inclusive) as milliseconds from the UNIX epoch.", ) parsed_args: argparse.Namespace = args_parser.parse_args(argv[1:]) - begin_timestamp: int - end_timestamp: int + begin_timestamp: typing.Optional[int] + end_timestamp: typing.Optional[int] subcommand: str = parsed_args.subcommand # Validate and load config file @@ -167,10 +167,11 @@ def main(argv): if (DEL_COMMAND == subcommand and DEL_BY_FILTER_SUBCOMMAND == parsed_args.del_subcommand) or ( FIND_COMMAND == subcommand ): - begin_timestamp: int = parsed_args.begin_ts - end_timestamp: int = parsed_args.end_ts + begin_timestamp = parsed_args.begin_ts + end_timestamp = parsed_args.end_ts # Validate the input timestamp + assert begin_timestamp is not None, "begin_timestamp is None." if not _validate_timestamps(begin_timestamp, end_timestamp): return -1 @@ -215,6 +216,7 @@ def main(argv): else: logger.error(f"Unsupported subcommand: `{parsed_args.del_subcommand}`.") elif FIND_COMMAND == subcommand: + assert begin_timestamp is not None, "begin_timestamp is None." archive_manager_cmd.extend([BEGIN_TS_ARG, str(begin_timestamp)]) if end_timestamp is not None: archive_manager_cmd.extend([END_TS_ARG, str(end_timestamp)]) diff --git a/components/clp-package-utils/clp_package_utils/scripts/native/archive_manager.py b/components/clp-package-utils/clp_package_utils/scripts/native/archive_manager.py index c0176928d..39563f631 100644 --- a/components/clp-package-utils/clp_package_utils/scripts/native/archive_manager.py +++ b/components/clp-package-utils/clp_package_utils/scripts/native/archive_manager.py @@ -76,7 +76,7 @@ def validate_results(self, archive_ids: typing.List[str]) -> None: ) -def main(argv): +def main(argv: typing.List[str]) -> int: clp_home: Path = get_clp_home() default_config_file_path: Path = clp_home / CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH @@ -116,7 +116,7 @@ def main(argv): END_TS_ARG, dest="end_ts", type=int, - help="Time-range upper-bound (include) as milliseconds from the UNIX epoch.", + help="Time-range upper-bound (inclusive) as milliseconds from the UNIX epoch.", ) # Options for delete subcommand @@ -161,7 +161,7 @@ def main(argv): del_filter_parser.add_argument( "end_ts", type=int, - help="Time-range upper-bound (include) as milliseconds from the UNIX epoch.", + help="Time-range upper-bound (inclusive) as milliseconds from the UNIX epoch.", ) parsed_args: argparse.Namespace = args_parser.parse_args(argv[1:]) @@ -215,22 +215,23 @@ def main(argv): return -1 else: logger.error(f"Unsupported subcommand: `{parsed_args.subcommand}`.") + return -1 def _find_archives( archives_dir: Path, database_config: Database, begin_ts: int, - end_ts: int = None, + end_ts: int = typing.Optional[int], ) -> int: """ - Lists all archive IDs, if begin_ts and end_ts are provided, - only lists archives where `begin_ts <= archive.begin_timestamp` and - `archive.end_timestamp <= end_ts`. + Lists all archive IDs, if begin_ts and end_ts are provided, only lists archives where + `begin_ts <= archive.begin_timestamp` and `archive.end_timestamp <= end_ts`. :param archives_dir: :param database_config: :param begin_ts: :param end_ts: + :return: 0 on success, 1 on failure. """ archive_ids: typing.List[str] logger.info("Starting to find archives from the database.") @@ -239,15 +240,15 @@ def _find_archives( clp_db_connection_params: dict[str, any] = ( database_config.get_clp_connection_params_and_type(True) ) - table_prefix = clp_db_connection_params["table_prefix"] + table_prefix: str = clp_db_connection_params["table_prefix"] with closing(sql_adapter.create_connection(True)) as db_conn, closing( db_conn.cursor(dictionary=True) ) as db_cursor: - query_params: tuple[int] = (begin_ts,) + query_params: typing.List[int] = [begin_ts] query: str = f"SELECT id FROM `{table_prefix}archives` WHERE begin_timestamp >= %s" if end_ts is not None: query += " AND end_timestamp <= %s" - query_params: tuple[int] = query_params + (end_ts,) + query_params.append(end_ts) db_cursor.execute(query, query_params) results = db_cursor.fetchall() @@ -280,6 +281,7 @@ def _delete_archives( ) -> int: """ Deletes archives from both metadata database and disk based on provided SQL query. + :param archives_dir: :param database_config: :param delete_handler: Object to handle differences between by-filter and by-ids delete types. @@ -302,7 +304,7 @@ def _delete_archives( logger.info("Running in dry-run mode.") query_criteria: str = delete_handler.get_criteria() - query_params: str = delete_handler.get_params() + query_params: typing.List[str] = delete_handler.get_params() db_cursor.execute( f""" From d69684a75e6fd5c6d0e25463369082f41da4a6a9 Mon Sep 17 00:00:00 2001 From: Eden Zhang Date: Thu, 30 Jan 2025 19:01:21 +0000 Subject: [PATCH 30/32] Fix ids_list_string --- .../clp_package_utils/scripts/native/archive_manager.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/components/clp-package-utils/clp_package_utils/scripts/native/archive_manager.py b/components/clp-package-utils/clp_package_utils/scripts/native/archive_manager.py index 39563f631..323f725e2 100644 --- a/components/clp-package-utils/clp_package_utils/scripts/native/archive_manager.py +++ b/components/clp-package-utils/clp_package_utils/scripts/native/archive_manager.py @@ -323,7 +323,7 @@ def _delete_archives( archive_ids: typing.List[str] = [result["id"] for result in results] delete_handler.validate_results(archive_ids) - ids_list_string: str = ",".join(["%s"] * len(archive_ids)) + ids_list_string: str = ", ".join(["'%s'"] * len(archive_ids)) db_cursor.execute( f""" From 325778f3d2dd1a5c1800a0d5a53f35a8989cb854 Mon Sep 17 00:00:00 2001 From: Eden Zhang Date: Thu, 30 Jan 2025 20:00:04 +0000 Subject: [PATCH 31/32] Add executable permissions --- .../clp_package_utils/scripts/archive_manager.py | 0 .../clp_package_utils/scripts/native/archive_manager.py | 0 2 files changed, 0 insertions(+), 0 deletions(-) mode change 100644 => 100755 components/clp-package-utils/clp_package_utils/scripts/archive_manager.py mode change 100644 => 100755 components/clp-package-utils/clp_package_utils/scripts/native/archive_manager.py diff --git a/components/clp-package-utils/clp_package_utils/scripts/archive_manager.py b/components/clp-package-utils/clp_package_utils/scripts/archive_manager.py old mode 100644 new mode 100755 diff --git a/components/clp-package-utils/clp_package_utils/scripts/native/archive_manager.py b/components/clp-package-utils/clp_package_utils/scripts/native/archive_manager.py old mode 100644 new mode 100755 From c09bfbe07d14f4bb6b17d7a7c03a0bacd7c2bb97 Mon Sep 17 00:00:00 2001 From: Eden Zhang Date: Thu, 30 Jan 2025 23:08:56 +0000 Subject: [PATCH 32/32] Add return --- .../clp_package_utils/scripts/archive_manager.py | 1 + 1 file changed, 1 insertion(+) diff --git a/components/clp-package-utils/clp_package_utils/scripts/archive_manager.py b/components/clp-package-utils/clp_package_utils/scripts/archive_manager.py index 8eb462929..33f71a9e7 100755 --- a/components/clp-package-utils/clp_package_utils/scripts/archive_manager.py +++ b/components/clp-package-utils/clp_package_utils/scripts/archive_manager.py @@ -215,6 +215,7 @@ def main(argv: typing.List[str]) -> int: ]) else: logger.error(f"Unsupported subcommand: `{parsed_args.del_subcommand}`.") + return -1 elif FIND_COMMAND == subcommand: assert begin_timestamp is not None, "begin_timestamp is None." archive_manager_cmd.extend([BEGIN_TS_ARG, str(begin_timestamp)])