diff --git a/HerdingCats/data_loader/data_loader.py b/HerdingCats/data_loader/data_loader.py
index 99f1632..04a9285 100644
--- a/HerdingCats/data_loader/data_loader.py
+++ b/HerdingCats/data_loader/data_loader.py
@@ -23,11 +23,12 @@
 # LOAD CKAN DATA RESOURCES INTO STORAGE
 class CkanCatResourceLoader:
     """A class to load data resources into various formats and storage systems."""
- 
+
     SUPPORTED_FORMATS = {
         "spreadsheet": ["xlsx", "xls"],
         "csv": ["csv"],
-        "json": ["json"]
+        "json": ["json"],
+        "parquet": ["parquet"]
     }

     def __init__(self):
@@ -48,20 +49,67 @@ def _validate_dependencies(self):

     @staticmethod
     def validate_inputs(func):
-        """Decorator to validate common input parameters."""
+        """
+        Decorator to validate common input parameters.
+        Handles both single resource lists and lists of resource lists.
+
+        The input can arrive in either of two shapes:
+
+        Format 1: Single List
+        ┌─────────── Single Resource List ───────────┐
+        │ [0]: "Homicide Accused.csv"                │
+        │ [1]: "2024-09-20T13:21:02.610Z"            │
+        │ [2]: "csv"          ◄─── Format            │
+        │ [3]: "https://..." ◄─── URL                │
+        └────────────────────────────────────────────┘
+
+        Format 2: List of Lists
+        ┌──────────────────────── Outer List ────────────────────────┐
+        │ ┌─────────── Inner List 1 ───────────┐  ┌─── List 2 ───┐   │
+        │ │ [0]: "Homicide Accused.csv"        │  │     ...      │   │
+        │ │ [1]: "2024-09-20T13:21:02.610Z"    │  │     ...      │   │
+        │ │ [2]: "csv"          ◄─── Format    │  │     ...      │   │
+        │ │ [3]: "https://..." ◄─── URL        │  │     ...      │   │
+        │ └────────────────────────────────────┘  └──────────────┘   │
+        └─────────────────────────────────────────────────────────────┘
+
+        In either case we only use the first resource list, and from it
+        we only need the format and the URL.
+        """
         @wraps(func)
         def wrapper(self, resource_data: Optional[List], *args, **kwargs):
-            if not resource_data or len(resource_data) < 2:
-                logger.error("Invalid or insufficient resource data provided")
-                raise ValueError("Resource data must be a list with at least 2 elements")
-
-            url = resource_data[1]
-
+            # First validate we have a non-empty list
+            if not isinstance(resource_data, list) or not resource_data:
+                logger.error("Invalid resource data: must be a non-empty list")
+                raise ValueError("Resource data must be a non-empty list")
+
+            # Determine whether we have a single resource or multiple resources:
+            # if the first element is itself a list, we have a list of lists
+            target_resource = (resource_data[0]
+                               if isinstance(resource_data[0], list)
+                               else resource_data)
+
+            # Validate the resource has all required elements
+            if len(target_resource) < 4:
+                logger.error("Invalid resource format: resource must have at least 4 elements")
+                raise ValueError("Resource must contain at least 4 elements")
+
+            # Extract the format and URL from their fixed positions
+            format_type = target_resource[2].lower()
+            url = target_resource[3]
+            # Validate URL format
             if not url.startswith(('http://', 'https://')):
+                logger.error(f"Invalid URL format: {url}")
                 raise ValueError("Invalid URL format")
-
-            return func(self, resource_data, *args, **kwargs)
+
+            # Pass the resource on in the [format, url] shape the loaders expect
+            modified_resource = [format_type, url]
+            logger.info(f"You're currently working with this resource: {modified_resource}")
+
+            # Log which input shape we processed
+            logger.debug(f"Processed {'multiple' if isinstance(resource_data[0], list) else 'single'} resource format")
+
+            return func(self, modified_resource, *args, **kwargs)
         return wrapper

     def _fetch_data(self, url: str) -> BytesIO:
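A quick sketch of the two input shapes the reworked decorator accepts and what each normalises to (values are illustrative only, not from a live catalogue):

    # Both shapes below normalise to ["csv", "https://example.com/data.csv"]
    # before the wrapped loader ever sees them.
    single = [
        "Homicide Accused.csv",          # [0] resource name
        "2024-09-20T13:21:02.610Z",      # [1] created timestamp
        "CSV",                           # [2] format (lower-cased by the wrapper)
        "https://example.com/data.csv",  # [3] download URL
    ]
    nested = [
        single,
        ["Other.xlsx", "2024-01-01T00:00:00Z", "xlsx", "https://example.com/other.xlsx"],
    ]
    # isinstance(nested[0], list) is True, so only the first inner list is used;
    # the second resource list is ignored by design.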
@@ -104,16 +152,16 @@ def _load_dataframe(
     ) -> Union[PandasDataFrame, PolarsDataFrame]:
         """
         Common method to load data into either pandas or polars DataFrame.
- 
+
         Args:
             binary_data: BytesIO object containing the file data
             file_format: Format of the file (e.g., 'csv', 'xlsx')
             sheet_name: Name of the sheet for Excel files
             loader_type: Which DataFrame implementation to use ('pandas' or 'polars')
- 
+
         Returns:
             Either a pandas or polars DataFrame depending on loader_type
- 
+
         Raises:
             ValueError: If file format is unsupported
             Exception: If loading fails for any other reason
@@ -121,23 +169,29 @@ def _load_dataframe(
         try:
             match (file_format, loader_type):
                 case ("spreadsheet" | "xlsx", "pandas"):
-                    return (pd.read_excel(binary_data, sheet_name=sheet_name) 
+                    return (pd.read_excel(binary_data, sheet_name=sheet_name)
                             if sheet_name else pd.read_excel(binary_data))
- 
+
                 case ("spreadsheet" | "xlsx", "polars"):
                     return (pl.read_excel(binary_data, sheet_name=sheet_name)
                             if sheet_name else pl.read_excel(binary_data))
- 
+
                 case ("csv", "pandas"):
                     return pd.read_csv(binary_data)
- 
+
                 case ("csv", "polars"):
                     return pl.read_csv(binary_data)
- 
+
+                case ("parquet", "pandas"):
+                    return pd.read_parquet(binary_data)
+
+                case ("parquet", "polars"):
+                    return pl.read_parquet(binary_data)
+
                 case _:
                     logger.error(f"Unsupported format: {file_format}")
                     raise ValueError(f"Unsupported file format: {file_format}")
- 
+
         except Exception as e:
             logger.error(f"Failed to load {loader_type} DataFrame: {str(e)}")
             raise
@@ -148,9 +202,9 @@ def polars_data_loader(self, resource_data: List, sheet_name: Optional[str] = No
         """Load a resource into a Polars DataFrame."""
         binary_data = self._fetch_data(resource_data[1])
         return self._load_dataframe(
-            binary_data, 
-            resource_data[0].lower(), 
-            sheet_name=sheet_name, 
+            binary_data,
+            resource_data[0].lower(),
+            sheet_name=sheet_name,
             loader_type="polars"
         )

@@ -159,9 +213,9 @@ def pandas_data_loader(self, resource_data: List, sheet_name: Optional[str] = No
         """Load a resource into a Pandas DataFrame."""
         binary_data = self._fetch_data(resource_data[1])
         return self._load_dataframe(
-            binary_data, 
-            resource_data[0].lower(), 
-            sheet_name=sheet_name, 
+            binary_data,
+            resource_data[0].lower(),
+            sheet_name=sheet_name,
             loader_type="pandas"
         )

@@ -170,40 +224,43 @@ def _create_duckdb_table(self, conn: duckdb.DuckDBPyConnection, df: pd.DataFrame
         try:
             # Convert pandas DataFrame directly to DuckDB table
             conn.register(f'temp_{table_name}', df)
- 
+
             # Create permanent table from temporary registration
             sql_command = f"""
-                CREATE TABLE {table_name} AS 
+                CREATE TABLE {table_name} AS
                 SELECT * FROM temp_{table_name}
             """
             conn.execute(sql_command)
- 
+
             # Verify the table
             result = conn.execute(f"SELECT * FROM {table_name} LIMIT 5").fetch_df()
             print(result)
             if len(result) == 0:
                 raise duckdb.Error("No data was loaded into the table")
- 
+
             logger.info(f"Successfully created table '{table_name}'")
- 
+
         except Exception as e:
             logger.error(f"Failed to create DuckDB table: {str(e)}")
             raise

     @validate_inputs
-    def duckdb_data_loader(self, resource_data: List, sheet_name: str, table_name: str) -> duckdb.DuckDBPyConnection:
+    def duckdb_data_loader(self, resource_data: List, table_name: str, sheet_name: Optional[str] = None) -> duckdb.DuckDBPyConnection:
         """Load resource data into an in-memory DuckDB database via pandas."""
+        if resource_data is None:
+            raise ValueError("Resource data must not be None")
+
         if not isinstance(table_name, str) or not table_name.strip():
             raise ValueError("Table name must be a non-empty string")

         try:
             # First load data into pandas DataFrame
             df = self.pandas_data_loader(resource_data, sheet_name=sheet_name)
- 
+
             # Then create DuckDB connection and load the DataFrame
             conn = duckdb.connect(":memory:")
             self._create_duckdb_table(conn, df, table_name)
- 
+
             logger.info(f"Data successfully loaded into in-memory table '{table_name}'")
             return conn
         except Exception as e:
@@ -224,7 +281,7 @@ def motherduck_data_loader(self, resource_data: List, token: str,
         try:
             # First load data into pandas DataFrame
             df = self.pandas_data_loader(resource_data)
- 
+
             # Then connect to MotherDuck and load the DataFrame
             with duckdb.connect(connection_string) as conn:
                 logger.info("MotherDuck Connection Established")
@@ -283,15 +340,15 @@ def aws_s3_data_loader(self, resource_data: List, bucket_name: str,
                 case "raw":
                     filename = f"{custom_name}-{uuid.uuid4()}.{file_format}"
                     s3_client.upload_fileobj(binary_data, bucket_name, filename)
- 
+
                 case "parquet":
                     parquet_buffer = self._convert_to_parquet(binary_data, file_format)
                     filename = f"{custom_name}-{uuid.uuid4()}.parquet"
                     s3_client.upload_fileobj(parquet_buffer, bucket_name, filename)
- 
+
             logger.info(f"File uploaded successfully to S3 as {filename}")
             return filename
- 
+
         except Exception as e:
             logger.error(f"AWS S3 upload error: {e}")
             raise
@@ -329,16 +386,16 @@ def validate_inputs(func):
         """Decorator to validate resource data containing download URLs and formats."""
         @wraps(func)
         def wrapper(self, resource_data: Optional[List[Dict]], *args, **kwargs):
-            # Check if resource data exists and is non-empty 
+            # Check if resource data exists and is non-empty
             if not resource_data or not isinstance(resource_data, list):
                 logger.error("Resource data must be a list")
                 raise ValueError("Resource data must be a list of dictionaries")
             return func(self, resource_data, *args, **kwargs)
         return wrapper
- 
+
     def _validate_resource_data(
-        self, 
-        resource_data: Optional[List[Dict[str, str]]], 
+        self,
+        resource_data: Optional[List[Dict[str, str]]],
         format_type: str
     ) -> str:
         """Validate resource data and extract download URL."""
@@ -347,10 +404,10 @@ def _validate_resource_data(

         # Get all supported formats
         all_formats = [fmt for formats in self.SUPPORTED_FORMATS.values()
                        for fmt in formats]
- 
+
         # If the provided format_type is a category, get its format
-        valid_formats = (self.SUPPORTED_FORMATS.get(format_type, []) 
-                        if format_type in self.SUPPORTED_FORMATS 
+        valid_formats = (self.SUPPORTED_FORMATS.get(format_type, [])
+                        if format_type in self.SUPPORTED_FORMATS
                         else [format_type])

         # Validate format type
@@ -362,11 +419,11 @@ def _validate_resource_data(

         # Find matching resource
         url = next(
-            (r.get('download_url') for r in resource_data 
+            (r.get('download_url') for r in resource_data
             if r.get('format', '').lower() in valid_formats),
             None
         )
- 
+
         # If format provided does not have a url provide the formats that do
         if not url:
             available_formats = [r['format'] for r in resource_data]
@@ -374,7 +431,7 @@ def _validate_resource_data(
                 f"No resource found with format: {format_type}. "
" f"Available formats: {', '.join(available_formats)}" ) - + return url def _fetch_data(self, url: str, api_key: Optional[str] = None) -> BytesIO: @@ -383,7 +440,7 @@ def _fetch_data(self, url: str, api_key: Optional[str] = None) -> BytesIO: # Add API key to URL if provided if api_key: url = f"{url}?apikey={api_key}" - + response = requests.get(url) response.raise_for_status() return BytesIO(response.content) @@ -495,15 +552,15 @@ def duckdb_data_loader( ) -> duckdb.DuckDBPyConnection: """Load data from a resource URL directly into DuckDB.""" url = self._validate_resource_data(resource_data, format_type) - + if api_key: url = f"{url}?apikey={api_key}" - + con = duckdb.connect(':memory:') con.execute("SET force_download=true") con.execute("INSTALL spatial") con.execute("LOAD spatial") - + try: # Use match statement for format handling match format_type: @@ -518,16 +575,16 @@ def duckdb_data_loader( con.execute("CREATE TABLE data AS SELECT * FROM st_read(?)", [url]) case _: raise ValueError(f"Unsupported format type: {format_type}") - + # Verify data was loaded sample_data = con.execute("SELECT * FROM data LIMIT 10").fetchall() if not sample_data and not api_key: raise OpenDataSoftExplorerError( "Received empty dataset. This likely means an API key is required." ) - + return con - + except duckdb.Error as e: raise OpenDataSoftExplorerError(f"Failed to load {format_type} resource into DuckDB", e) @@ -565,7 +622,7 @@ def _convert_to_parquet(self, binary_data: BytesIO, format_type: str) -> BytesIO return parquet_buffer except Exception as e: raise OpenDataSoftExplorerError(f"Failed to convert to parquet: {str(e)}", e) - + @validate_inputs def aws_s3_data_loader( self, @@ -590,17 +647,17 @@ def aws_s3_data_loader( Returns: str: Name of the uploaded file """ - + # Validate inputs if not all(isinstance(x, str) and x.strip() for x in [bucket_name, custom_name]): raise ValueError("Bucket name and custom name must be non-empty strings") - + # Get URL for specified format url = self._validate_resource_data(resource_data, format_type) - + # Fetch data binary_data = self._fetch_data(url, api_key) - + # Setup S3 s3_client = boto3.client("s3") self._verify_s3_bucket(s3_client, bucket_name) @@ -614,7 +671,7 @@ def aws_s3_data_loader( parquet_buffer = self._convert_to_parquet(binary_data, format_type) filename = f"{custom_name}-{uuid.uuid4()}.parquet" s3_client.upload_fileobj(parquet_buffer, bucket_name, filename) - + logger.success(f"File uploaded successfully to S3 as {filename}") return filename except Exception as e: @@ -649,13 +706,13 @@ def _validate_dependencies(self): missing = [name for name, module in required_modules.items() if module is None] if missing: raise ImportError(f"Missing required dependencies: {', '.join(missing)}") - + @staticmethod def validate_inputs(func): """Decorator to validate resource data containing download URLs and formats.""" @wraps(func) def wrapper(self, resource_data: Optional[List[Dict]], *args, **kwargs): - # Check if resource data exists and is non-empty + # Check if resource data exists and is non-empty if not resource_data or not isinstance(resource_data, list): logger.error("Resource data must be a list") raise ValueError("Resource data must be a list of dictionaries") @@ -663,8 +720,8 @@ def wrapper(self, resource_data: Optional[List[Dict]], *args, **kwargs): return wrapper def _validate_resource_data( - self, - resource_data: Optional[List[Dict[str, str]]], + self, + resource_data: Optional[List[Dict[str, str]]], format_type: str ) -> str: """Validate resource 
@@ -673,12 +730,12 @@ def _validate_resource_data(

         # Get all supported formats
         all_formats = [fmt for formats in self.SUPPORTED_FORMATS.values()
                        for fmt in formats]
- 
+
         # If the provided format_type is a category, get its format
-        valid_formats = (self.SUPPORTED_FORMATS.get(format_type, []) 
-                        if format_type in self.SUPPORTED_FORMATS 
+        valid_formats = (self.SUPPORTED_FORMATS.get(format_type, [])
+                        if format_type in self.SUPPORTED_FORMATS
                         else [format_type])
- 
+
         # Validate format type
         if format_type not in self.SUPPORTED_FORMATS and format_type not in all_formats:
             raise FrenchCatDataLoaderError(
@@ -688,11 +745,11 @@ def _validate_resource_data(

         # Find matching resource
         url = next(
-            (r.get('resource_url') for r in resource_data 
+            (r.get('resource_url') for r in resource_data
             if r.get('resource_format', '').lower() in valid_formats),
             None
         )
- 
+
         # If format provided does not have a url provide the formats that do
         if not url:
             available_formats = [r['resource_url'] for r in resource_data]
@@ -700,9 +757,9 @@ def _validate_resource_data(
                 f"No resource found with format: {format_type}. "
                 f"Available formats: {', '.join(available_formats)}"
             )
- 
+
         return url
- 
+
     @validate_inputs
     def duckdb_data_loader(
         self,
@@ -713,15 +770,15 @@ def duckdb_data_loader(
     ) -> duckdb.DuckDBPyConnection:
         """Load data from a resource URL directly into DuckDB."""
         url = self._validate_resource_data(resource_data, format_type)
- 
+
         if api_key:
             url = f"{url}?apikey={api_key}"
- 
+
         con = duckdb.connect(':memory:')
         con.execute("SET force_download=true")
         con.execute("INSTALL spatial")
         con.execute("LOAD spatial")
- 
+
         try:
             # Use match statement for format handling
             match format_type:
@@ -736,15 +793,15 @@ def duckdb_data_loader(
                     con.execute("CREATE TABLE data AS SELECT * FROM st_read(?)", [url])
                 case _:
                     raise ValueError(f"Unsupported format type: {format_type}")
- 
+
             # Verify data was loaded
             sample_data = con.execute("SELECT * FROM data LIMIT 10").fetchall()
             if not sample_data and not api_key:
                 raise FrenchCatDataLoaderError(
                     "Received empty dataset. This likely means an API key is required."
                 )
- 
+
             return con
- 
+
         except duckdb.Error as e:
-            raise FrenchCatDataLoaderError(f"Failed to load {format_type} resource into DuckDB", e)
\ No newline at end of file
+            raise FrenchCatDataLoaderError(f"Failed to load {format_type} resource into DuckDB", e)
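Before the explorer changes, a minimal end-to-end sketch of how the reworked CKAN loader is meant to be driven (the hc aliases follow the docstring example in the next file; exposing CkanCatResourceLoader at the package root is an assumption):

    import HerdingCats as hc

    def main():
        with hc.CatSession(hc.CkanDataCatalogues.LONDON_DATA_STORE) as session:
            explore = hc.CkanCatExplorer(session)
            loader = hc.CkanCatResourceLoader()  # assumed package-level export
            data = explore.show_package_info("mps-homicide-dashboard-data")
            resources = explore.extract_resource_url(data)
            # validate_inputs narrows the list of resource lists down to
            # [format, url] for the first resource before loading
            df = loader.pandas_data_loader(resources)
            conn = loader.duckdb_data_loader(resources, table_name="homicide")

    if __name__ == "__main__":
        main()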
diff --git a/HerdingCats/explorer/cat_explore.py b/HerdingCats/explorer/cat_explore.py
index 109efe5..1e1357f 100644
--- a/HerdingCats/explorer/cat_explore.py
+++ b/HerdingCats/explorer/cat_explore.py
@@ -2,7 +2,6 @@
 import pandas as pd
 import polars as pl
 import duckdb
-import time

 from typing import Any, Dict, Optional, Union, Literal, List, Tuple
 from loguru import logger
@@ -816,47 +815,47 @@ def package_search_condense_dataframe_unpack(
     # Extract specific data from results
     # OR flatten nested data structures
     # ----------------------------
-    def extract_resource_url(
-        self, package_info: List[Dict], resource_name: str
-    ) -> List[str] | None:
+    def extract_resource_url(self, package_info: List[Dict]) -> List[List[str]]:
         """
-        Extracts the URL and format of a specific resource from a package.
+        Extracts the download information for every resource in a package.

-        Specify the name of the resource you want to use.
+        Pass in the package info list.

         Returns:
-            List[format, url]: The format of the resource and the URL.
-            [
-                'spreadsheet',
-                'https://data.london.gov.uk/download/violence-reduction-unit/1ef840d0-2c02-499c-ae40-382005b2a0c7/VRU%2520Dataset%2520Q1%2520April-Nov%25202023.xlsx'
-            ]
+            A list of [resource_name, resource_created, format, url] entries.

-        Example:
-            if __name__ == "__main__":
-                with CkanCatSession("data.london.gov.uk") as session:
-                    explore = CkanCatExplorer(session)
-                    all_packages = explore.package_list_dictionary()
-                    data = all_packages.get("violence-reduction-unit")
-                    info = explore.package_show_info_json(data)
-                    dl_link = explore.extract_resource_url(info, "VRU Q1 2023-24 Dataset")
-                    print(dl_link)
+        Example:
+            import HerdingCats as hc
+            from pprint import pprint
+
+            def main():
+                with hc.CatSession(hc.CkanDataCatalogues.LONDON_DATA_STORE) as session:
+                    explore = hc.CkanCatExplorer(session)
+                    dataset = explore.show_package_info("mps-homicide-dashboard-data")
+                    urls = explore.extract_resource_url(dataset)
+                    pprint(urls)
+
+            if __name__ == "__main__":
+                main()
         """
+        results = []
         for item in package_info:
-            if item.get("resource_name") == resource_name:
+            resource_name = item.get("resource_name")
+            created = item.get("resource_created")
             url = item.get("resource_url")
             format = item.get("resource_format")
-            if url and format:
+            if all([resource_name, created, format, url]):
                 logger.success(
                     f"Found URL for resource '{resource_name}'. Format is: {format}"
                 )
-                return [format, url]
+                results.append([resource_name, created, format, url])
             else:
                 logger.warning(
-                    f"Resource '{resource_name}' found in package, but no URL available"
+                    f"Resource '{resource_name}' is missing required fields - skipping"
                 )
-                return None
+                continue
+        return results

     @staticmethod
     def _extract_condensed_package_data(
@@ -1504,7 +1503,7 @@ def main():
         try:
             result = self._extract_resource_data(data)
             return result
         except Exception as e:
-            logger.error("Error fetching resource: {str(e)}")
+            logger.error(f"Error fetching resource: {e}")

     @staticmethod
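For the DuckDB paths above, usage is expected to look roughly like this (the loader class name and resource dict are illustrative assumptions; the method name, dict keys, and the data table come from this diff, and whether a given format loads depends on the match arms elided from the hunks):

    # Hypothetical wiring - OpenDataSoftResourceLoader is an assumed class name.
    loader = OpenDataSoftResourceLoader()
    resource_data = [{"format": "csv", "download_url": "https://example.com/data.csv"}]
    con = loader.duckdb_data_loader(resource_data, format_type="csv")
    print(con.execute("SELECT COUNT(*) FROM data").fetchone())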