From b8fe59ae60b3bb92c9ca2ff5a6c1485ee73aeb63 Mon Sep 17 00:00:00 2001 From: blublinsky Date: Sat, 18 Jan 2025 19:45:41 +0000 Subject: [PATCH 01/12] Aligned with current code --- .../runtime/ray/ray_utils.py | 54 ++++++++++++++----- 1 file changed, 42 insertions(+), 12 deletions(-) diff --git a/data-processing-lib/ray/src/data_processing_ray/runtime/ray/ray_utils.py b/data-processing-lib/ray/src/data_processing_ray/runtime/ray/ray_utils.py index ff6e53892..eaaff0f48 100644 --- a/data-processing-lib/ray/src/data_processing_ray/runtime/ray/ray_utils.py +++ b/data-processing-lib/ray/src/data_processing_ray/runtime/ray/ray_utils.py @@ -15,7 +15,7 @@ from typing import Any import ray -from data_processing.utils import GB, UnrecoverableException +from data_processing.utils import GB, UnrecoverableException, get_logger from ray.actor import ActorHandle from ray.exceptions import RayError from ray.experimental.state.api import list_actors @@ -94,6 +94,8 @@ def get_available_nodes(available_nodes_gauge: Gauge = None) -> int: for node in nodes: if node["Alive"]: nnodes += 1 + if available_nodes_gauge is not None: + available_nodes_gauge.set(nnodes) return nnodes @staticmethod @@ -115,18 +117,46 @@ def operator() -> ActorHandle: time.sleep(creation_delay) return clazz.options(**actor_options).remote(params) + logger = get_logger(__name__) + # Get class name cls_name = clazz.__class__.__name__.replace("ActorClass(", "").replace(")", "") - actors = [operator() for _ in range(n_actors)] - for i in range(120): - time.sleep(1) - alive = list_actors( - filters=[("class_name", "=", cls_name), ("state", "=", "ALIVE")], limit=RAY_MAX_ACTOR_LIMIT + # Get currently existing actors of type + current = list_actors( + filters=[("class_name", "=", cls_name), ("state", "=", "ALIVE")], limit=RAY_MAX_ACTOR_LIMIT + ) + c_len = len(current) + # compute desired number of actors + overall = c_len + n_actors + # RAY_MAX_ACTOR_LIMIT is the hard limit on the amount of actors returned by list_actors function + if overall < RAY_MAX_ACTOR_LIMIT: + # create actors + actors = [operator() for _ in range(n_actors)] + n_list = min(overall + 10, RAY_MAX_ACTOR_LIMIT) + # waiting for all actors to become ready + alive = [] + for i in range(120): + time.sleep(1) + alive = list_actors(filters=[("class_name", "=", cls_name), ("state", "=", "ALIVE")], limit=n_list) + if len(alive) >= n_actors + c_len: + return actors + # failed + if len(alive) >= n_actors / 2 + c_len: + # At least half of the actors were created + logger.info(f"created {n_actors}, alive {len(alive)} Running with less actors") + created_ids = [item.actor_id for item in alive if item not in current] + return [ + actor + for actor in actors + if (str(actor._ray_actor_id).replace("ActorID(", "").replace(")", "") in created_ids) + ] + else: + # too few actors created + raise UnrecoverableException(f"Created {n_actors}, alive {len(alive)}. Too few actors were created") + else: + raise UnrecoverableException( + f"Overall number of actors of class {cls_name} is {overall}. " + f"Too many actors to create, greater then {RAY_MAX_ACTOR_LIMIT}" ) - if len(actors) == len(alive): - return actors - # failed - raise an exception - print(f"created {actors}, alive {alive}") - raise UnrecoverableException(f"out of {len(actors)} created actors only {len(alive)} alive") @staticmethod def process_files( @@ -176,7 +206,7 @@ def process_files( while True: # we can have several workers fail here try: - res = executors.get_next_unordered() + _ = executors.get_next_unordered() break except Exception as e: if isinstance(e, RayError): From 4b4a916efa49a0f195e212e46e499404bb4a64a1 Mon Sep 17 00:00:00 2001 From: blublinsky Date: Sun, 19 Jan 2025 09:36:43 +0000 Subject: [PATCH 02/12] Updated to current version --- .../ray/src/data_processing_ray/runtime/ray/ray_utils.py | 1 + 1 file changed, 1 insertion(+) diff --git a/data-processing-lib/ray/src/data_processing_ray/runtime/ray/ray_utils.py b/data-processing-lib/ray/src/data_processing_ray/runtime/ray/ray_utils.py index eaaff0f48..0b3640110 100644 --- a/data-processing-lib/ray/src/data_processing_ray/runtime/ray/ray_utils.py +++ b/data-processing-lib/ray/src/data_processing_ray/runtime/ray/ray_utils.py @@ -131,6 +131,7 @@ def operator() -> ActorHandle: if overall < RAY_MAX_ACTOR_LIMIT: # create actors actors = [operator() for _ in range(n_actors)] + # get the amount of actors to list n_list = min(overall + 10, RAY_MAX_ACTOR_LIMIT) # waiting for all actors to become ready alive = [] From 4abf263fcdc5b416c1a3a11c043e3e2c5b221c0d Mon Sep 17 00:00:00 2001 From: blublinsky Date: Sun, 19 Jan 2025 18:21:11 +0000 Subject: [PATCH 03/12] hugging face data access --- .../data_access/data_access_hf.py | 238 ++++++++++++++++++ 1 file changed, 238 insertions(+) create mode 100644 data-processing-lib/python/src/data_processing/data_access/data_access_hf.py diff --git a/data-processing-lib/python/src/data_processing/data_access/data_access_hf.py b/data-processing-lib/python/src/data_processing/data_access/data_access_hf.py new file mode 100644 index 000000000..0d7630860 --- /dev/null +++ b/data-processing-lib/python/src/data_processing/data_access/data_access_hf.py @@ -0,0 +1,238 @@ +# (C) Copyright IBM Corp. 2024. +# Licensed under the Apache License, Version 2.0 (the “License”); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an “AS IS” BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +import gzip +import json +import os +from pathlib import Path +from typing import Any + +import pyarrow as pa +from huggingface_hub import HfFileSystem +from data_processing.data_access import DataAccess +from data_processing.utils import get_logger + + +logger = get_logger(__name__) + + +class DataAccessHF(DataAccess): + """ + Implementation of the Base Data access class for local folder data access. + """ + + def __init__( + self, + hf_config: dict[str, str] = None, + hf_token: str = None, + d_sets: list[str] = None, + checkpoint: bool = False, + m_files: int = -1, + n_samples: int = -1, + files_to_use: list[str] = [".parquet"], + files_to_checkpoint: list[str] = [".parquet"], + ): + """ + Create data access class for folder based configuration + :param hf_config: dictionary of path info + :param hf_token: hugging face token + :param d_sets list of the data sets to use + :param checkpoint: flag to return only files that do not exist in the output directory + :param m_files: max amount of files to return + :param n_samples: amount of files to randomly sample + :param files_to_use: files extensions of files to include + :param files_to_checkpoint: files extensions of files to use for checkpointing + """ + super().__init__(d_sets=d_sets, checkpoint=checkpoint, m_files=m_files, n_samples=n_samples, + files_to_use=files_to_use, files_to_checkpoint=files_to_checkpoint) + if hf_config is None: + self.input_folder = None + self.output_folder = None + else: + self.input_folder = hf_config["input_folder"] + if self.input_folder[-1] == "/": + self.input_folder = self.input_folder[:-1] + self.output_folder = hf_config["output_folder"] + if self.output_folder[-1] == "/": + self.output_folder = self.output_folder[:-1] + self.fs = HfFileSystem(token=hf_token) + + logger.debug(f"hf input folder: {self.input_folder}") + logger.debug(f"hf output folder: {self.output_folder}") + logger.debug(f"hf data sets: {self.d_sets}") + logger.debug(f"hf checkpoint: {self.checkpoint}") + logger.debug(f"hf m_files: {self.m_files}") + logger.debug(f"hf n_samples: {self.n_samples}") + logger.debug(f"hf files_to_use: {self.files_to_use}") + logger.debug(f"hf files_to_checkpoint: {self.files_to_checkpoint}") + + def get_output_folder(self) -> str: + """ + Get output folder as a string + :return: output_folder + """ + return self.output_folder + + def get_input_folder(self) -> str: + """ + Get input folder as a string + :return: input_folder + """ + return self.input_folder + + def _get_file_size(self, path: str) -> int: + """ + Get file size in bytes + :param path: file path + :return: file size in bytes + """ + return self.fs.info(path=path)['size'] + + def _list_files_folder(self, path: str) -> tuple[list[dict[str, Any]], int]: + """ + Get files for a given folder and all sub folders + :param path: path + :return: List of files + """ + files = sorted(self.fs.glob(path=f"{path}/**/*.*")) + res = [{"name": file, "size": self._get_file_size(file)} for file in files] + return res, 0 + + def _get_folders_to_use(self) -> tuple[list[str], int]: + """ + convert data sets to a list of folders to use + :return: list of folders and retries + """ + folders_to_use = [] + files = self.fs.ls(path=self.input_folder) + dirs = [f['name'] for f in files if f['type'] == 'directory'] + + for file in files: + for s_name in self.d_sets: + if file.endswith(s_name): + folders_to_use.append(folder) + break + return folders_to_use, 0 + + def get_table(self, path: str) -> tuple[pa.table, int]: + """ + Attempts to read a PyArrow table from the given path. + + Args: + path (str): Path to the file containing the table. + + Returns: + pyarrow.Table: PyArrow table if read successfully, None otherwise. + """ + + try: + data = self.get_file(path=path) + return TransformUtils.convert_binary_to_arrow(data=data), 0 + except Exception as e: + logger.error(f"Error reading table from {path}: {e}") + return None, 0 + + def save_table(self, path: str, table: pa.Table) -> tuple[int, dict[str, Any], int]: + """ + Saves a pyarrow table to a file and returns information about the operation. + + Args: + table (pyarrow.Table): The pyarrow table to save. + path (str): The path to the output file. + + Returns: + tuple: A tuple containing: + - size_in_memory (int): The size of the table in memory (bytes). + - file_info (dict or None): A dictionary containing: + - name (str): The name of the file. + - size (int): The size of the file (bytes). + If saving fails, file_info will be None. + """ + # Get table size in memory + try: + # Write the table to parquet format + data = TransformUtils.convert_arrow_to_binary(table=table) + self.save_file(path=path, data=data) + # Get file size and create file_info + file_info = {"name": os.path.basename(path), "size": self._get_file_size(path)} + return size_in_memory, file_info, 0 + + except Exception as e: + logger.error(f"Error saving table to {path}: {e}") + return -1, None, 0 + + def save_job_metadata(self, metadata: dict[str, Any]) -> tuple[dict[str, Any], int]: + """ + Save metadata + :param metadata: a dictionary, containing the following keys: + "pipeline", + "job details", + "code", + "job_input_params", + "execution_stats", + "job_output_stats" + two additional elements: + "source" + "target" + are filled bu implementation + :return: a dictionary as + defined https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3/client/put_object.html + in the case of failure dict is None + """ + if self.output_folder is None: + logger.error("hf configuration is not defined, can't save metadata") + return None, 0 + metadata["source"] = {"name": self.input_folder, "type": "path"} + metadata["target"] = {"name": self.output_folder, "type": "path"} + return self.save_file( + path=f"{self.output_folder}/metadata.json", + data=json.dumps(metadata, indent=2).encode(), + ) + + def get_file(self, path: str) -> tuple[bytes, int]: + """ + Gets the contents of a file as a byte array, decompressing gz files if needed. + + Args: + path (str): The path to the file. + + Returns: + bytes: The contents of the file as a byte array, or None if an error occurs. + """ + + try: + with self.fs.open(path=files[0], mode="rb") as f: + return f.read(), 0 + except Exception as e: + logger.error(f"Error reading file {path}: {e}") + raise e + + def save_file(self, path: str, data: bytes) -> tuple[dict[str, Any], int]: + """ + Saves bytes to a file and returns a dictionary with file information. + + Args: + data (bytes): The bytes data to save. + path (str): The full name of the file to save. + + Returns: + dict or None: A dictionary with "name" and "size" keys if successful, + or None if saving fails. + """ + try: + with self.fs.open(path=files[0], mode="wb") as f: + f.write(data) + file_info = {"name": file, "size": self.fs.info(file)['size']} + return file_info, 0 + except Exception as e: + logger.error(f"Error saving bytes to file {path}: {e}") + return None, 0 From b4b4c9705abb58a30f3ca49284add9dfef5eb137 Mon Sep 17 00:00:00 2001 From: blublinsky Date: Mon, 20 Jan 2025 17:33:10 +0000 Subject: [PATCH 04/12] hugging face data access --- .../data_processing/data_access/__init__.py | 1 + .../data_access/data_access_factory.py | 72 ++++++++++++++----- .../data_access/data_access_factory_base.py | 36 +++++++++- 3 files changed, 91 insertions(+), 18 deletions(-) diff --git a/data-processing-lib/python/src/data_processing/data_access/__init__.py b/data-processing-lib/python/src/data_processing/data_access/__init__.py index 1f1d77928..dee751f0c 100644 --- a/data-processing-lib/python/src/data_processing/data_access/__init__.py +++ b/data-processing-lib/python/src/data_processing/data_access/__init__.py @@ -2,6 +2,7 @@ from data_processing.data_access.data_access import DataAccess from data_processing.data_access.data_access_local import DataAccessLocal from data_processing.data_access.data_access_s3 import DataAccessS3 +from data_processing.data_access.data_access_hf import DataAccessHF from data_processing.data_access.data_access_factory_base import DataAccessFactoryBase from data_processing.data_access.data_access_factory import DataAccessFactory from data_processing.data_access.snapshot_utils import SnapshotUtils diff --git a/data-processing-lib/python/src/data_processing/data_access/data_access_factory.py b/data-processing-lib/python/src/data_processing/data_access/data_access_factory.py index 2172e3ed0..732e55f66 100644 --- a/data-processing-lib/python/src/data_processing/data_access/data_access_factory.py +++ b/data-processing-lib/python/src/data_processing/data_access/data_access_factory.py @@ -19,6 +19,7 @@ DataAccessFactoryBase, DataAccessLocal, DataAccessS3, + DataAccessHF, ) from data_processing.utils import ParamsUtils, str2bool @@ -46,6 +47,7 @@ def __init__(self, cli_arg_prefix: str = "data_", enable_data_navigation: bool = super().__init__(cli_arg_prefix=cli_arg_prefix) self.s3_config = None self.local_config = None + self.hf_config = None self.enable_data_navigation = enable_data_navigation def add_input_params(self, parser: argparse.ArgumentParser) -> None: @@ -77,6 +79,7 @@ def add_input_params(self, parser: argparse.ArgumentParser) -> None: self.__add_data_navigation_params(parser) def __add_data_navigation_params(self, parser): + # s3 config help_example_dict = { "input_folder": [ "s3-path/your-input-bucket", @@ -93,6 +96,7 @@ def __add_data_navigation_params(self, parser): default=None, help="AST string containing input/output paths.\n" + ParamsUtils.get_ast_help_text(help_example_dict), ) + # local config help_example_dict = { "input_folder": ["./input", "Path to input folder of files to be processed"], "output_folder": ["/tmp/output", "Path to output folder of processed files"], @@ -104,6 +108,19 @@ def __add_data_navigation_params(self, parser): help="ast string containing input/output folders using local fs.\n" + ParamsUtils.get_ast_help_text(help_example_dict), ) + # hf config + help_example_dict = { + "hf_token": ["./input", "HF token required for write operation"], + "input_folder": ["./input", "Path to input folder of files to be processed"], + "output_folder": ["/tmp/output", "Path to output folder of processed files"], + } + parser.add_argument( + f"--{self.cli_arg_prefix}hf_config", + type=ast.literal_eval, + default=None, + help="ast string containing hf_token/input/output folders using hf fs.\n" + + ParamsUtils.get_ast_help_text(help_example_dict), + ) parser.add_argument( f"--{self.cli_arg_prefix}max_files", type=int, default=-1, help="Max amount of files to process" ) @@ -154,6 +171,7 @@ def apply_input_params(self, args: Union[dict, argparse.Namespace]) -> bool: s3_cred = arg_dict.get(f"{self.cli_arg_prefix}s3_cred", None) s3_config = arg_dict.get(f"{self.cli_arg_prefix}s3_config", None) local_config = arg_dict.get(f"{self.cli_arg_prefix}local_config", None) + hf_config = arg_dict.get(f"{self.cli_arg_prefix}hf_config", None) checkpointing = arg_dict.get(f"{self.cli_arg_prefix}checkpointing", False) max_files = arg_dict.get(f"{self.cli_arg_prefix}max_files", -1) data_sets = arg_dict.get(f"{self.cli_arg_prefix}data_sets", None) @@ -163,18 +181,20 @@ def apply_input_params(self, args: Union[dict, argparse.Namespace]) -> bool: # check which configuration (S3 or Local) is specified s3_config_specified = 1 if s3_config is not None else 0 local_config_specified = 1 if local_config is not None else 0 + hf_config_specified = 1 if hf_config is not None else 0 # check that only one (S3 or Local) configuration is specified - if s3_config_specified + local_config_specified > 1: + if s3_config_specified + local_config_specified + hf_config_specified > 1: self.logger.error( f"data factory {self.cli_arg_prefix} " f"{'S3, ' if s3_config_specified == 1 else ''}" f"{'Local ' if local_config_specified == 1 else ''}" + f"{'hf ' if hf_config_specified == 1 else ''}" "configurations specified, but only one configuration expected" ) return False - # further validate the specified configuration (S3 or Local) + # further validate the specified configuration (S3, hf or Local) if s3_config_specified == 1: if not self._validate_s3_config(s3_config=s3_config): return False @@ -188,6 +208,21 @@ def apply_input_params(self, args: Union[dict, argparse.Namespace]) -> bool: f'input path - {self.s3_config["input_folder"]}, ' f'output path - {self.s3_config["output_folder"]}' ) + elif hf_config_specified == 1: + if not self._validate_hf_config(local_config=hf_config): + return False + self.hf_config = hf_config + self.logger.info( + f"data factory {self.cli_arg_prefix} is using HF data access: " + f"hf_token - {self.hf_config['hf_token']} " + f"input_folder - {self.hf_config['input_folder']} " + f"output_folder - {self.hf_config['output_folder']}" + ) + elif s3_cred is not None: + if not self._validate_s3_cred(s3_credentials=s3_cred): + return False + self.s3_cred = s3_cred + self.logger.info(f"data factory {self.cli_arg_prefix} is using s3 configuration without input/output path") elif local_config_specified == 1: if not self._validate_local_config(local_config=local_config): return False @@ -197,11 +232,6 @@ def apply_input_params(self, args: Union[dict, argparse.Namespace]) -> bool: f"input_folder - {self.local_config['input_folder']} " f"output_folder - {self.local_config['output_folder']}" ) - elif s3_cred is not None: - if not self._validate_s3_cred(s3_credentials=s3_cred): - return False - self.s3_cred = s3_cred - self.logger.info(f"data factory {self.cli_arg_prefix} is using s3 configuration without input/output path") else: self.logger.info( f"data factory {self.cli_arg_prefix} " f"is using local configuration without input/output path" @@ -240,11 +270,10 @@ def create_data_access(self) -> DataAccess: Create data access based on the parameters :return: corresponding data access class """ - if self.s3_config is not None or self.s3_cred is not None: - # If S3 config or S3 credential are specified, its S3 - return DataAccessS3( - s3_credentials=self.s3_cred, - s3_config=self.s3_config, + if self.hf_config is not None: + # hf-config is specified, its hf + return DataAccessHF( + hf_config=self.hf_config, d_sets=self.dsets, checkpoint=self.checkpointing, m_files=self.max_files, @@ -252,10 +281,11 @@ def create_data_access(self) -> DataAccess: files_to_use=self.files_to_use, files_to_checkpoint=self.files_to_checkpoint, ) - else: - # anything else is local data - return DataAccessLocal( - local_config=self.local_config, + if self.s3_config is not None or self.s3_cred is not None: + # If S3 config or S3 credential are specified, its S3 + return DataAccessS3( + s3_credentials=self.s3_cred, + s3_config=self.s3_config, d_sets=self.dsets, checkpoint=self.checkpointing, m_files=self.max_files, @@ -263,3 +293,13 @@ def create_data_access(self) -> DataAccess: files_to_use=self.files_to_use, files_to_checkpoint=self.files_to_checkpoint, ) + # anything else is local data + return DataAccessLocal( + local_config=self.local_config, + d_sets=self.dsets, + checkpoint=self.checkpointing, + m_files=self.max_files, + n_samples=self.n_samples, + files_to_use=self.files_to_use, + files_to_checkpoint=self.files_to_checkpoint, + ) diff --git a/data-processing-lib/python/src/data_processing/data_access/data_access_factory_base.py b/data-processing-lib/python/src/data_processing/data_access/data_access_factory_base.py index cef7c9657..8e011a494 100644 --- a/data-processing-lib/python/src/data_processing/data_access/data_access_factory_base.py +++ b/data-processing-lib/python/src/data_processing/data_access/data_access_factory_base.py @@ -130,8 +130,8 @@ def _validate_local_config(self, local_config: dict[str, str]) -> bool: def _validate_s3_config(self, s3_config: dict[str, str]) -> bool: """ Validate that - :param s3_config: dictionary of local config - :return: True if s3l config is valid, False otherwise + :param s3_config: dictionary of s3 config + :return: True if s3 config is valid, False otherwise """ valid_config = True if s3_config.get("input_folder", "") == "": @@ -141,3 +141,35 @@ def _validate_s3_config(self, s3_config: dict[str, str]) -> bool: valid_config = False self.logger.error(f"data access factory {self.cli_arg_prefix}: Could not find output folder in s3 config") return valid_config + + def _validate_hf_config(self, hf_config: dict[str, str]) -> bool: + """ + Validate that + :param s3_config: dictionary of hf config + :return: True if hf config is valid, False otherwise + """ + valid_config = True + if hf_config.get("hf_token", "") == "": + self.logger.warning(f"data access factory {self.cli_arg_prefix}: " + f"HF token is not defined, write operation may fail") + input_folder = hf_config.get("input_folder", "") + if input_folder == "": + valid_config = False + self.logger.error(f"data access factory {self.cli_arg_prefix}: Could not find input folder in HF config") + else: + if not input_folder.startswith("datasets/"): + valid_config = False + self.logger.error(f"data access factory {self.cli_arg_prefix}: " + f"Input folder in HF config has to start from datasets/") + + output_folder = hf_config.get("output_folder", "") + if output_folder == "": + valid_config = False + self.logger.error(f"data access factory {self.cli_arg_prefix}: Could not find output folder in HF config") + else: + if not output_folder.startswith("datasets/"): + valid_config = False + self.logger.error(f"data access factory {self.cli_arg_prefix}: " + f"Output folder in HF config has to start from datasets/") + + return valid_config From 4b563593d1f551e45a6734b1df749007f9491ca5 Mon Sep 17 00:00:00 2001 From: blublinsky Date: Mon, 20 Jan 2025 19:27:28 +0000 Subject: [PATCH 05/12] Add dependency --- data-processing-lib/python/requirements.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/data-processing-lib/python/requirements.txt b/data-processing-lib/python/requirements.txt index 318d715d5..39e26a68b 100644 --- a/data-processing-lib/python/requirements.txt +++ b/data-processing-lib/python/requirements.txt @@ -5,3 +5,4 @@ mmh3 psutil polars>=1.9.0 + huggingface-hub>=0.25.2 From c0ae85ceba21bbe15b69aa26bb81d0b275b8c48c Mon Sep 17 00:00:00 2001 From: blublinsky Date: Tue, 21 Jan 2025 09:34:34 +0000 Subject: [PATCH 06/12] restored Ray utils --- .../runtime/ray/ray_utils.py | 87 ++++++------------- 1 file changed, 28 insertions(+), 59 deletions(-) diff --git a/data-processing-lib/ray/src/data_processing_ray/runtime/ray/ray_utils.py b/data-processing-lib/ray/src/data_processing_ray/runtime/ray/ray_utils.py index 0b3640110..9b2503cbb 100644 --- a/data-processing-lib/ray/src/data_processing_ray/runtime/ray/ray_utils.py +++ b/data-processing-lib/ray/src/data_processing_ray/runtime/ray/ray_utils.py @@ -15,7 +15,7 @@ from typing import Any import ray -from data_processing.utils import GB, UnrecoverableException, get_logger +from data_processing.utils import GB, UnrecoverableException from ray.actor import ActorHandle from ray.exceptions import RayError from ray.experimental.state.api import list_actors @@ -37,10 +37,10 @@ class RayUtils: @staticmethod def get_available_resources( - available_cpus_gauge: Gauge = None, - available_gpus_gauge: Gauge = None, - available_memory_gauge: Gauge = None, - object_memory_gauge: Gauge = None, + available_cpus_gauge: Gauge = None, + available_gpus_gauge: Gauge = None, + available_memory_gauge: Gauge = None, + object_memory_gauge: Gauge = None, ) -> dict[str, Any]: """ Get currently available cluster resources @@ -94,13 +94,11 @@ def get_available_nodes(available_nodes_gauge: Gauge = None) -> int: for node in nodes: if node["Alive"]: nnodes += 1 - if available_nodes_gauge is not None: - available_nodes_gauge.set(nnodes) return nnodes @staticmethod def create_actors( - clazz: type, params: dict[str, Any], actor_options: dict[str, Any], n_actors: int, creation_delay: int = 0 + clazz: type, params: dict[str, Any], actor_options: dict[str, Any], n_actors: int, creation_delay: int = 0 ) -> list[ActorHandle]: """ Create a set of actors @@ -117,60 +115,31 @@ def operator() -> ActorHandle: time.sleep(creation_delay) return clazz.options(**actor_options).remote(params) - logger = get_logger(__name__) - # Get class name cls_name = clazz.__class__.__name__.replace("ActorClass(", "").replace(")", "") - # Get currently existing actors of type - current = list_actors( - filters=[("class_name", "=", cls_name), ("state", "=", "ALIVE")], limit=RAY_MAX_ACTOR_LIMIT - ) - c_len = len(current) - # compute desired number of actors - overall = c_len + n_actors - # RAY_MAX_ACTOR_LIMIT is the hard limit on the amount of actors returned by list_actors function - if overall < RAY_MAX_ACTOR_LIMIT: - # create actors - actors = [operator() for _ in range(n_actors)] - # get the amount of actors to list - n_list = min(overall + 10, RAY_MAX_ACTOR_LIMIT) - # waiting for all actors to become ready - alive = [] - for i in range(120): - time.sleep(1) - alive = list_actors(filters=[("class_name", "=", cls_name), ("state", "=", "ALIVE")], limit=n_list) - if len(alive) >= n_actors + c_len: - return actors - # failed - if len(alive) >= n_actors / 2 + c_len: - # At least half of the actors were created - logger.info(f"created {n_actors}, alive {len(alive)} Running with less actors") - created_ids = [item.actor_id for item in alive if item not in current] - return [ - actor - for actor in actors - if (str(actor._ray_actor_id).replace("ActorID(", "").replace(")", "") in created_ids) - ] - else: - # too few actors created - raise UnrecoverableException(f"Created {n_actors}, alive {len(alive)}. Too few actors were created") - else: - raise UnrecoverableException( - f"Overall number of actors of class {cls_name} is {overall}. " - f"Too many actors to create, greater then {RAY_MAX_ACTOR_LIMIT}" + actors = [operator() for _ in range(n_actors)] + for i in range(120): + time.sleep(1) + alive = list_actors( + filters=[("class_name", "=", cls_name), ("state", "=", "ALIVE")], limit=RAY_MAX_ACTOR_LIMIT ) + if len(actors) == len(alive): + return actors + # failed - raise an exception + print(f"created {actors}, alive {alive}") + raise UnrecoverableException(f"out of {len(actors)} created actors only {len(alive)} alive") @staticmethod def process_files( - executors: ActorPool, - files: list[str], - print_interval: int, - files_in_progress_gauge: Gauge, - files_completed_gauge: Gauge, - available_cpus_gauge: Gauge, - available_gpus_gauge: Gauge, - available_memory_gauge: Gauge, - object_memory_gauge: Gauge, - logger: logging.Logger, + executors: ActorPool, + files: list[str], + print_interval: int, + files_in_progress_gauge: Gauge, + files_completed_gauge: Gauge, + available_cpus_gauge: Gauge, + available_gpus_gauge: Gauge, + available_memory_gauge: Gauge, + object_memory_gauge: Gauge, + logger: logging.Logger, ) -> int: """ Process files @@ -207,7 +176,7 @@ def process_files( while True: # we can have several workers fail here try: - _ = executors.get_next_unordered() + res = executors.get_next_unordered() break except Exception as e: if isinstance(e, RayError): @@ -282,4 +251,4 @@ def wait_for_execution_completion(logger: logging.Logger, replies: list[ray.Obje actor_failures += 1 not_ready = replies - 1 replies = not_ready - return actor_failures + return actor_failures \ No newline at end of file From cb773440765325f68746d6b748724778dabb100f Mon Sep 17 00:00:00 2001 From: blublinsky Date: Tue, 21 Jan 2025 19:00:52 +0000 Subject: [PATCH 07/12] add unit tests --- .../data_access/data_access_factory.py | 5 +- .../data_access/data_access_hf.py | 8 +-- .../hf_data_access_factory_test.py | 60 +++++++++++++++++++ .../data_access/hf_data_access_test.py | 50 ++++++++++++++++ 4 files changed, 115 insertions(+), 8 deletions(-) create mode 100644 data-processing-lib/python/test/data_processing_tests/data_access/hf_data_access_factory_test.py create mode 100644 data-processing-lib/python/test/data_processing_tests/data_access/hf_data_access_test.py diff --git a/data-processing-lib/python/src/data_processing/data_access/data_access_factory.py b/data-processing-lib/python/src/data_processing/data_access/data_access_factory.py index 732e55f66..16ed5231b 100644 --- a/data-processing-lib/python/src/data_processing/data_access/data_access_factory.py +++ b/data-processing-lib/python/src/data_processing/data_access/data_access_factory.py @@ -209,12 +209,11 @@ def apply_input_params(self, args: Union[dict, argparse.Namespace]) -> bool: f'output path - {self.s3_config["output_folder"]}' ) elif hf_config_specified == 1: - if not self._validate_hf_config(local_config=hf_config): + if not self._validate_hf_config(hf_config=hf_config): return False self.hf_config = hf_config self.logger.info( - f"data factory {self.cli_arg_prefix} is using HF data access: " - f"hf_token - {self.hf_config['hf_token']} " + f"data factory {self.cli_arg_prefix} is using HF data access: " f"input_folder - {self.hf_config['input_folder']} " f"output_folder - {self.hf_config['output_folder']}" ) diff --git a/data-processing-lib/python/src/data_processing/data_access/data_access_hf.py b/data-processing-lib/python/src/data_processing/data_access/data_access_hf.py index 0d7630860..e5424e567 100644 --- a/data-processing-lib/python/src/data_processing/data_access/data_access_hf.py +++ b/data-processing-lib/python/src/data_processing/data_access/data_access_hf.py @@ -33,7 +33,6 @@ class DataAccessHF(DataAccess): def __init__( self, hf_config: dict[str, str] = None, - hf_token: str = None, d_sets: list[str] = None, checkpoint: bool = False, m_files: int = -1, @@ -44,7 +43,6 @@ def __init__( """ Create data access class for folder based configuration :param hf_config: dictionary of path info - :param hf_token: hugging face token :param d_sets list of the data sets to use :param checkpoint: flag to return only files that do not exist in the output directory :param m_files: max amount of files to return @@ -64,7 +62,7 @@ def __init__( self.output_folder = hf_config["output_folder"] if self.output_folder[-1] == "/": self.output_folder = self.output_folder[:-1] - self.fs = HfFileSystem(token=hf_token) + self.fs = HfFileSystem(token=hf_config["hf_token"]) logger.debug(f"hf input folder: {self.input_folder}") logger.debug(f"hf output folder: {self.output_folder}") @@ -116,10 +114,10 @@ def _get_folders_to_use(self) -> tuple[list[str], int]: files = self.fs.ls(path=self.input_folder) dirs = [f['name'] for f in files if f['type'] == 'directory'] - for file in files: + for file in dirs: for s_name in self.d_sets: if file.endswith(s_name): - folders_to_use.append(folder) + folders_to_use.append(file) break return folders_to_use, 0 diff --git a/data-processing-lib/python/test/data_processing_tests/data_access/hf_data_access_factory_test.py b/data-processing-lib/python/test/data_processing_tests/data_access/hf_data_access_factory_test.py new file mode 100644 index 000000000..66713a704 --- /dev/null +++ b/data-processing-lib/python/test/data_processing_tests/data_access/hf_data_access_factory_test.py @@ -0,0 +1,60 @@ +# (C) Copyright IBM Corp. 2024. +# Licensed under the Apache License, Version 2.0 (the “License”); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an “AS IS” BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +import sys +from argparse import ArgumentParser + +from data_processing.data_access import DataAccessFactory, DataAccessHF +from data_processing.utils import ParamsUtils + + +def test_creating_hf_data_access(): + """ + Testing creation of HF data access + :return: None + """ + input_folder = "datasets/blublinsky/test/data" + output_folder = "datasets/blublinsky/test/temp" + hf_token = "token" + hf_conf = { + "hf_token": hf_token, + "input_folder": input_folder, + "output_folder": output_folder, + } + params = {} + params["data_hf_config"] = hf_conf + params["data_num_samples"] = 3 + params["data_data_sets"] = ["ds1"] + params["data_files_to_use"] = [".nothere"] + + # Set the simulated command line args + sys.argv = ParamsUtils.dict_to_req(params) + daf = DataAccessFactory() + parser = ArgumentParser() + daf.add_input_params(parser) + args = parser.parse_args() + daf.apply_input_params(args) + + # create data_access + data_access = daf.create_data_access() + + # validate created data access + assert isinstance(data_access, DataAccessHF) + assert data_access.input_folder == input_folder + assert data_access.output_folder == output_folder + assert data_access.fs.token == hf_token + + assert data_access.n_samples == 3 + assert data_access.d_sets == ["ds1"] + assert data_access.files_to_use == [".nothere"] + + diff --git a/data-processing-lib/python/test/data_processing_tests/data_access/hf_data_access_test.py b/data-processing-lib/python/test/data_processing_tests/data_access/hf_data_access_test.py new file mode 100644 index 000000000..0c8af949c --- /dev/null +++ b/data-processing-lib/python/test/data_processing_tests/data_access/hf_data_access_test.py @@ -0,0 +1,50 @@ +# (C) Copyright IBM Corp. 2024. +# Licensed under the Apache License, Version 2.0 (the “License”); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an “AS IS” BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + + +from data_processing.data_access import DataAccessHF + +hf_conf = { + "hf_token": None, + "input_folder": 'datasets/blublinsky/test/data', + "output_folder": 'datasets/blublinsky/test/temp/', +} + + +def test_hf_data_access(): + """ + Testing data access of HF data access + :return: None + """ + data_access = DataAccessHF(hf_config=hf_conf) + # get files to process + files, profile, retries = data_access.get_files_to_process() + assert len(files) == 50 + assert profile['max_file_size'] >= 135.730997085571 + assert profile['min_file_size'] >= 0.00743961334228515 + assert profile['total_file_size'] >= 269.3791465759277 + + random = data_access.get_random_file_set(n_samples=5, files=files) + assert len(random) == 5 + +def test_hf_data_access_sets(): + """ + Testing data access of HF data access with data sets + :return: None + """ + data_access = DataAccessHF(hf_config=hf_conf, d_sets=["aai_Latn", "aba_Latn"]) + # get files to process + files, profile, retries = data_access.get_files_to_process() + assert len(files) == 4 + assert profile['max_file_size'] >= 0.501188278198242 + assert profile['min_file_size'] >= 0.00965785980224609 + assert profile['total_file_size'] >= 0.620627403259277 From bca8c48e84b873a4c7b855e4a50b8a62e8d88b6a Mon Sep 17 00:00:00 2001 From: blublinsky Date: Wed, 22 Jan 2025 10:12:57 +0000 Subject: [PATCH 08/12] more testing --- .../data_access/data_access_hf.py | 18 ++++++++---------- ..._test.py => data_access_factory_hf_test.py} | 0 ...a_access_test.py => data_access_hf_test.py} | 15 +++++++++++++++ 3 files changed, 23 insertions(+), 10 deletions(-) rename data-processing-lib/python/test/data_processing_tests/data_access/{hf_data_access_factory_test.py => data_access_factory_hf_test.py} (100%) rename data-processing-lib/python/test/data_processing_tests/data_access/{hf_data_access_test.py => data_access_hf_test.py} (79%) diff --git a/data-processing-lib/python/src/data_processing/data_access/data_access_hf.py b/data-processing-lib/python/src/data_processing/data_access/data_access_hf.py index e5424e567..2eb269e44 100644 --- a/data-processing-lib/python/src/data_processing/data_access/data_access_hf.py +++ b/data-processing-lib/python/src/data_processing/data_access/data_access_hf.py @@ -19,7 +19,7 @@ import pyarrow as pa from huggingface_hub import HfFileSystem from data_processing.data_access import DataAccess -from data_processing.utils import get_logger +from data_processing.utils import TransformUtils, get_logger logger = get_logger(__name__) @@ -133,8 +133,8 @@ def get_table(self, path: str) -> tuple[pa.table, int]: """ try: - data = self.get_file(path=path) - return TransformUtils.convert_binary_to_arrow(data=data), 0 + data, retries = self.get_file(path=path) + return TransformUtils.convert_binary_to_arrow(data=data), retries except Exception as e: logger.error(f"Error reading table from {path}: {e}") return None, 0 @@ -159,10 +159,8 @@ def save_table(self, path: str, table: pa.Table) -> tuple[int, dict[str, Any], i try: # Write the table to parquet format data = TransformUtils.convert_arrow_to_binary(table=table) - self.save_file(path=path, data=data) - # Get file size and create file_info - file_info = {"name": os.path.basename(path), "size": self._get_file_size(path)} - return size_in_memory, file_info, 0 + finfo, retries = self.save_file(path=path, data=data) + return len(data), finfo, retries except Exception as e: logger.error(f"Error saving table to {path}: {e}") @@ -208,7 +206,7 @@ def get_file(self, path: str) -> tuple[bytes, int]: """ try: - with self.fs.open(path=files[0], mode="rb") as f: + with self.fs.open(path=path, mode="rb") as f: return f.read(), 0 except Exception as e: logger.error(f"Error reading file {path}: {e}") @@ -227,9 +225,9 @@ def save_file(self, path: str, data: bytes) -> tuple[dict[str, Any], int]: or None if saving fails. """ try: - with self.fs.open(path=files[0], mode="wb") as f: + with self.fs.open(path=path, mode="wb") as f: f.write(data) - file_info = {"name": file, "size": self.fs.info(file)['size']} + file_info = {"name": path, "size": self.fs.info(path=path)['size']} return file_info, 0 except Exception as e: logger.error(f"Error saving bytes to file {path}: {e}") diff --git a/data-processing-lib/python/test/data_processing_tests/data_access/hf_data_access_factory_test.py b/data-processing-lib/python/test/data_processing_tests/data_access/data_access_factory_hf_test.py similarity index 100% rename from data-processing-lib/python/test/data_processing_tests/data_access/hf_data_access_factory_test.py rename to data-processing-lib/python/test/data_processing_tests/data_access/data_access_factory_hf_test.py diff --git a/data-processing-lib/python/test/data_processing_tests/data_access/hf_data_access_test.py b/data-processing-lib/python/test/data_processing_tests/data_access/data_access_hf_test.py similarity index 79% rename from data-processing-lib/python/test/data_processing_tests/data_access/hf_data_access_test.py rename to data-processing-lib/python/test/data_processing_tests/data_access/data_access_hf_test.py index 0c8af949c..4f42ace5d 100644 --- a/data-processing-lib/python/test/data_processing_tests/data_access/hf_data_access_test.py +++ b/data-processing-lib/python/test/data_processing_tests/data_access/data_access_hf_test.py @@ -33,9 +33,24 @@ def test_hf_data_access(): assert profile['min_file_size'] >= 0.00743961334228515 assert profile['total_file_size'] >= 269.3791465759277 + # read tables + t_stats = [ + {"n_rows": 8, "n_columns": 11}, + {"n_rows": 424, "n_columns": 11}, + {"n_rows": 9336, "n_columns": 12}, + {"n_rows": 7, "n_columns": 11}, + {"n_rows": 1353, "n_columns": 11}, + ] + for i in range(5): + table, retries = data_access.get_table(path=files[i]) + assert table.num_rows == t_stats[i]["n_rows"] + assert table.num_columns == t_stats[i]["n_columns"] + + # get random set of files random = data_access.get_random_file_set(n_samples=5, files=files) assert len(random) == 5 + def test_hf_data_access_sets(): """ Testing data access of HF data access with data sets From abd3edec2dce8b02e07fa3e0c033116575cdc5d1 Mon Sep 17 00:00:00 2001 From: blublinsky Date: Wed, 22 Jan 2025 13:30:34 +0000 Subject: [PATCH 09/12] add support for data set card --- .../data_access/data_access_hf.py | 18 +++++++++++++++++- .../data_access/data_access_hf_test.py | 10 ++++++++++ 2 files changed, 27 insertions(+), 1 deletion(-) diff --git a/data-processing-lib/python/src/data_processing/data_access/data_access_hf.py b/data-processing-lib/python/src/data_processing/data_access/data_access_hf.py index 2eb269e44..995e577bd 100644 --- a/data-processing-lib/python/src/data_processing/data_access/data_access_hf.py +++ b/data-processing-lib/python/src/data_processing/data_access/data_access_hf.py @@ -17,7 +17,7 @@ from typing import Any import pyarrow as pa -from huggingface_hub import HfFileSystem +from huggingface_hub import HfFileSystem, RepoCard from data_processing.data_access import DataAccess from data_processing.utils import TransformUtils, get_logger @@ -232,3 +232,19 @@ def save_file(self, path: str, data: bytes) -> tuple[dict[str, Any], int]: except Exception as e: logger.error(f"Error saving bytes to file {path}: {e}") return None, 0 + + def get_dataset_card(self, ds_name: str) -> RepoCard: + """ + Get the Repo card for the data set + :param ds_name: data set name in the format owner/ds_name + :return: DS card object + """ + # get file location + if ds_name[-1] == "/": + path = f"datasets/{ds_name[:-1]}/README.md" + else: + path = f"datasets/{ds_name}/README.md" + # read README file + with self.fs.open(path=path, mode="r", newline="", encoding="utf-8") as f: + data = f.read() + return RepoCard(content=data) \ No newline at end of file diff --git a/data-processing-lib/python/test/data_processing_tests/data_access/data_access_hf_test.py b/data-processing-lib/python/test/data_processing_tests/data_access/data_access_hf_test.py index 4f42ace5d..ce4b91bea 100644 --- a/data-processing-lib/python/test/data_processing_tests/data_access/data_access_hf_test.py +++ b/data-processing-lib/python/test/data_processing_tests/data_access/data_access_hf_test.py @@ -63,3 +63,13 @@ def test_hf_data_access_sets(): assert profile['max_file_size'] >= 0.501188278198242 assert profile['min_file_size'] >= 0.00965785980224609 assert profile['total_file_size'] >= 0.620627403259277 + + +def test_data_set_card(): + """ + Testing data set card access + :return: None + """ + data_access = DataAccessHF(hf_config=hf_conf) + card = data_access.get_dataset_card(ds_name="blublinsky/test") + assert card.data.license == 'apache-2.0' From 2a5b58d4223a740ee6759f3058d9203792152f78 Mon Sep 17 00:00:00 2001 From: blublinsky Date: Wed, 22 Jan 2025 18:42:08 +0000 Subject: [PATCH 10/12] add support for data set card --- .../data_access/data_access_hf.py | 32 ++++++++++++++++++- .../data_access/data_access_hf_test.py | 15 +++++++++ 2 files changed, 46 insertions(+), 1 deletion(-) diff --git a/data-processing-lib/python/src/data_processing/data_access/data_access_hf.py b/data-processing-lib/python/src/data_processing/data_access/data_access_hf.py index 995e577bd..7733ae0ac 100644 --- a/data-processing-lib/python/src/data_processing/data_access/data_access_hf.py +++ b/data-processing-lib/python/src/data_processing/data_access/data_access_hf.py @@ -18,6 +18,7 @@ import pyarrow as pa from huggingface_hub import HfFileSystem, RepoCard +from huggingface_hub.errors import EntryNotFoundError from data_processing.data_access import DataAccess from data_processing.utils import TransformUtils, get_logger @@ -62,6 +63,7 @@ def __init__( self.output_folder = hf_config["output_folder"] if self.output_folder[-1] == "/": self.output_folder = self.output_folder[:-1] + self.hf_config = hf_config self.fs = HfFileSystem(token=hf_config["hf_token"]) logger.debug(f"hf input folder: {self.input_folder}") @@ -247,4 +249,32 @@ def get_dataset_card(self, ds_name: str) -> RepoCard: # read README file with self.fs.open(path=path, mode="r", newline="", encoding="utf-8") as f: data = f.read() - return RepoCard(content=data) \ No newline at end of file + return RepoCard(content=data) + + def update_data_set_card(self, ds_name: str, content: str) -> None: + """ + Update Repo card + :param ds_name: data set name in the format owner/ds_name + :param content: new readme content + :return: None + """ + # make sure that token is defined + if self.hf_config["hf_token"] is None: + raise Exception("Update data set card is only supported when HF_TOKEN is defined") + # get file location + if ds_name[-1] == "/": + path = f"datasets/{ds_name[:-1]}/README.md" + else: + path = f"datasets/{ds_name}/README.md" + # delete current Readme file + try: + self.fs.rm(path=path) + except EntryNotFoundError: + logger.warning(f"Data set {ds_name} does not have README file") + except Exception as e: + logger.warning(f"Failted to delete README file {e}") + raise e + # write new Readme file + with self.fs.open(path=path, mode="w", newline="", encoding="utf-8") as f: + f.write(content) + diff --git a/data-processing-lib/python/test/data_processing_tests/data_access/data_access_hf_test.py b/data-processing-lib/python/test/data_processing_tests/data_access/data_access_hf_test.py index ce4b91bea..488729eca 100644 --- a/data-processing-lib/python/test/data_processing_tests/data_access/data_access_hf_test.py +++ b/data-processing-lib/python/test/data_processing_tests/data_access/data_access_hf_test.py @@ -12,6 +12,7 @@ from data_processing.data_access import DataAccessHF +from huggingface_hub import CardData hf_conf = { "hf_token": None, @@ -70,6 +71,20 @@ def test_data_set_card(): Testing data set card access :return: None """ + # read the card data_access = DataAccessHF(hf_config=hf_conf) card = data_access.get_dataset_card(ds_name="blublinsky/test") assert card.data.license == 'apache-2.0' + # update it + data = card.data.to_dict() + data["extension"] = "my_extension" + card.data = CardData(**data) + content = card.content + # save a new card (readme) + try: + data_access.update_data_set_card(ds_name="blublinsky/test", content=content) + # read it back + card = data_access.get_dataset_card(ds_name="blublinsky/test") + assert card.data.extension == "my_extension" + except Exception as e: + print(f"Exception updating card {e}. Did you specify hf_token?") From 57f34db38b1b15f5dd5ca2d909d8a273ad22e3b7 Mon Sep 17 00:00:00 2001 From: blublinsky Date: Thu, 23 Jan 2025 10:23:30 +0000 Subject: [PATCH 11/12] add write file testing --- .../src/data_processing/data_access/data_access_hf.py | 4 ++++ .../data_access/data_access_hf_test.py | 6 ++++++ 2 files changed, 10 insertions(+) diff --git a/data-processing-lib/python/src/data_processing/data_access/data_access_hf.py b/data-processing-lib/python/src/data_processing/data_access/data_access_hf.py index 7733ae0ac..fa52a359c 100644 --- a/data-processing-lib/python/src/data_processing/data_access/data_access_hf.py +++ b/data-processing-lib/python/src/data_processing/data_access/data_access_hf.py @@ -226,6 +226,10 @@ def save_file(self, path: str, data: bytes) -> tuple[dict[str, Any], int]: dict or None: A dictionary with "name" and "size" keys if successful, or None if saving fails. """ + # make sure that token is defined + if self.hf_config["hf_token"] is None: + logger.warning("Writing file is only supported when HF_TOKEN is defined") + return None, 0 try: with self.fs.open(path=path, mode="wb") as f: f.write(data) diff --git a/data-processing-lib/python/test/data_processing_tests/data_access/data_access_hf_test.py b/data-processing-lib/python/test/data_processing_tests/data_access/data_access_hf_test.py index 488729eca..b8a3a7ea3 100644 --- a/data-processing-lib/python/test/data_processing_tests/data_access/data_access_hf_test.py +++ b/data-processing-lib/python/test/data_processing_tests/data_access/data_access_hf_test.py @@ -46,6 +46,12 @@ def test_hf_data_access(): table, retries = data_access.get_table(path=files[i]) assert table.num_rows == t_stats[i]["n_rows"] assert table.num_columns == t_stats[i]["n_columns"] + if i == 0: + data, _ = data_access.get_file(path=files[i]) + # write to data set + output_file = data_access.get_output_location(files[i]) + res, _ = data_access.save_file(path=output_file, data=data) + assert res is None # get random set of files random = data_access.get_random_file_set(n_samples=5, files=files) From 558f9003be1191dff4f8424ad0043fd4e32037fb Mon Sep 17 00:00:00 2001 From: blublinsky Date: Thu, 23 Jan 2025 10:31:51 +0000 Subject: [PATCH 12/12] add write file testing --- .../data_access/data_access_hf.py | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/data-processing-lib/python/src/data_processing/data_access/data_access_hf.py b/data-processing-lib/python/src/data_processing/data_access/data_access_hf.py index fa52a359c..395f08e9d 100644 --- a/data-processing-lib/python/src/data_processing/data_access/data_access_hf.py +++ b/data-processing-lib/python/src/data_processing/data_access/data_access_hf.py @@ -251,8 +251,13 @@ def get_dataset_card(self, ds_name: str) -> RepoCard: else: path = f"datasets/{ds_name}/README.md" # read README file - with self.fs.open(path=path, mode="r", newline="", encoding="utf-8") as f: - data = f.read() + try: + with self.fs.open(path=path, mode="r", newline="", encoding="utf-8") as f: + data = f.read() + except Exception as e: + logger.warning(f"Failted to read README file {e}") + return None + # convert README to Repo card return RepoCard(content=data) def update_data_set_card(self, ds_name: str, content: str) -> None: @@ -274,11 +279,16 @@ def update_data_set_card(self, ds_name: str, content: str) -> None: try: self.fs.rm(path=path) except EntryNotFoundError: + # If the README file does not exist, ignore logger.warning(f"Data set {ds_name} does not have README file") except Exception as e: logger.warning(f"Failted to delete README file {e}") raise e # write new Readme file - with self.fs.open(path=path, mode="w", newline="", encoding="utf-8") as f: - f.write(content) + try: + with self.fs.open(path=path, mode="w", newline="", encoding="utf-8") as f: + f.write(content) + except Exception as e: + logger.warning(f"Failted to save README file {e}") + raise e