From c5a840a0a63c8eb8b2bf92e791e7e807686e853f Mon Sep 17 00:00:00 2001 From: Maciej Gardzinski Date: Mon, 30 Dec 2024 17:09:23 +0100 Subject: [PATCH 01/10] =?UTF-8?q?=F0=9F=90=9B=20Fix=20variable=20passed=20?= =?UTF-8?q?to=20`to=5Fdf`=20causing=20all=20media=20entries=20to=20be=20ch?= =?UTF-8?q?osen?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/viadot/orchestration/prefect/tasks/mediatool.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/viadot/orchestration/prefect/tasks/mediatool.py b/src/viadot/orchestration/prefect/tasks/mediatool.py index a2d0dbd6f..270a035fe 100644 --- a/src/viadot/orchestration/prefect/tasks/mediatool.py +++ b/src/viadot/orchestration/prefect/tasks/mediatool.py @@ -70,7 +70,7 @@ def mediatool_to_df( organization_id=organization_id, ) df_media_entries = mediatool.to_df( - data=media_entries_data, drop_columns=media_entries_columns + data=media_entries_data, columns=media_entries_columns ) unique_vehicle_ids = df_media_entries["vehicleId"].unique() unique_media_type_ids = df_media_entries["mediaTypeId"].unique() From 492d01e4bc1aa405a3b5538e63ed66aa338fb573 Mon Sep 17 00:00:00 2001 From: Maciej Gardzinski Date: Tue, 31 Dec 2024 14:37:33 +0100 Subject: [PATCH 02/10] =?UTF-8?q?=E2=99=BB=EF=B8=8F=20=20Refactor=20`api?= =?UTF-8?q?=5Fconnection`=20into=20`=5Fto=5Frecords`?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/viadot/sources/mediatool.py | 35 ++++++++++++++++++--------------- tests/unit/test_mediatool.py | 6 +++--- 2 files changed, 22 insertions(+), 19 deletions(-) diff --git a/src/viadot/sources/mediatool.py b/src/viadot/sources/mediatool.py index fb018deab..91817fbba 100644 --- a/src/viadot/sources/mediatool.py +++ b/src/viadot/sources/mediatool.py @@ -238,40 +238,43 @@ def _get_media_types( return list_media_types - def api_connection( + def _to_records( self, - get_data_from: str, + endpoint: Literal[ + "organizations", "media_entries", "vehicles", "campaigns", "media_types" + ], organization_id: str | None = None, vehicle_ids: list[str] | None = None, media_type_ids: list[str] | None = None, ) -> list[dict[str, str]]: - """General method to connect to Mediatool API and generate the response. + """Connects to the Mediatool API and retrieves data for the specified endpoint. Args: - get_data_from (str): Method to be used to extract data from. + endpoint (Literal["organizations", "media_entries", "vehicles", "campaigns", + "media_types"]): The API endpoint to fetch data from. organization_id (str, optional): Organization ID. Defaults to None. vehicle_ids (list[str]): List of organization IDs. Defaults to None. media_type_ids (list[str]): List of media type IDs. Defaults to None. Returns: - list[dict[str, str]]: Data from Mediatool API connection. + list[dict[str, str]]: A list of records containing the retrieved data. 
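+
+        Example (illustrative only; actual values depend on the Mediatool account):
+            records = mediatool._to_records(endpoint="organizations")
+            # -> [{"_id": "...", "name": "...", "abbreviation": "..."}, ...]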
""" - self.url_abbreviation = get_data_from + if endpoint == "organizations": + return self._get_organizations(self.user_id) - if self.url_abbreviation == "organizations": - returned_data = self._get_organizations(self.user_id) + if endpoint == "media_entries": + return self._get_media_entries(organization_id=organization_id) - elif self.url_abbreviation == "media_entries": - returned_data = self._get_media_entries(organization_id=organization_id) + if endpoint == "vehicles": + return self._get_vehicles(vehicle_ids=vehicle_ids) - elif self.url_abbreviation == "vehicles": - returned_data = self._get_vehicles(vehicle_ids=vehicle_ids) + if endpoint == "campaigns": + return self._get_campaigns(organization_id=organization_id) - elif self.url_abbreviation == "campaigns": - returned_data = self._get_campaigns(organization_id=organization_id) + if endpoint == "media_types": + return self._get_media_types(media_type_ids=media_type_ids) + return None - elif self.url_abbreviation == "media_types": - returned_data = self._get_media_types(media_type_ids=media_type_ids) return returned_data diff --git a/tests/unit/test_mediatool.py b/tests/unit/test_mediatool.py index f472e449e..15407191a 100644 --- a/tests/unit/test_mediatool.py +++ b/tests/unit/test_mediatool.py @@ -128,15 +128,15 @@ def test_rename_columns(): @pytest.mark.connect @patch("viadot.sources.mediatool.handle_api_response") -def test_api_connection(mock_handle_api_response): - """Test Mediatool `api_connection` method.""" +def test_to_records_connection(mock_handle_api_response): + """Test Mediatool `_to_records` method.""" mock_response = MagicMock() mock_response.text = json.dumps(variables["organizations"]) mock_handle_api_response.return_value = mock_response mediatool = Mediatool(credentials=variables["credentials"]) - result = mediatool.api_connection(get_data_from="organizations") + result = mediatool._to_records(endpoint="organizations") expected_result = [{"_id": "1", "name": "Org1", "abbreviation": "O1"}] assert result == expected_result From 1bea7c48e30ff87091fb730904dae38145de5162 Mon Sep 17 00:00:00 2001 From: Maciej Gardzinski Date: Tue, 31 Dec 2024 14:43:33 +0100 Subject: [PATCH 03/10] =?UTF-8?q?=F0=9F=94=A5=20Remove=20`=5Frename=5Fcolu?= =?UTF-8?q?mns`=20method?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/viadot/sources/mediatool.py | 29 ++++++----------------------- tests/unit/test_mediatool.py | 12 ------------ 2 files changed, 6 insertions(+), 35 deletions(-) diff --git a/src/viadot/sources/mediatool.py b/src/viadot/sources/mediatool.py index 91817fbba..44fbb512e 100644 --- a/src/viadot/sources/mediatool.py +++ b/src/viadot/sources/mediatool.py @@ -64,27 +64,6 @@ def __init__( self.url_abbreviation = None - def _rename_columns( - self, - df: pd.DataFrame, - column_suffix: str, - ) -> pd.DataFrame: - """Rename columns. - - Args: - df (pd.DataFrame): Incoming Data frame. - column_suffix (str): String to be added at the end of column name. - - Returns: - pd.DataFrame: Modified Data Frame. - """ - column_suffix = column_suffix.split("get_")[-1] - dict_mapped_names = { - column_name: f"{column_name}_{column_suffix}" for column_name in df.columns - } - - return df.rename(columns=dict_mapped_names) - def _get_organizations( self, user_id: str | None = None, @@ -310,8 +289,12 @@ def to_df( ) if column_suffix: - data_frame = self._rename_columns( - df=data_frame, column_suffix=column_suffix + # Endpoint name is added to the end of the column name to make it unique. 
+ data_frame = data_frame.rename( + columns={ + column_name: f"{column_name}_{column_suffix}" + for column_name in data_frame.columns + } ) if columns: diff --git a/tests/unit/test_mediatool.py b/tests/unit/test_mediatool.py index 15407191a..a604f9ce0 100644 --- a/tests/unit/test_mediatool.py +++ b/tests/unit/test_mediatool.py @@ -114,18 +114,6 @@ def test_get_media_types(mock_handle_api_response): assert result == expected_result -@pytest.mark.functions -def test_rename_columns(): - """Test Mediatool `_rename_columns` function.""" - df = pd.DataFrame({"col1": [1, 2], "col2": [3, 4]}) - - mediatool = Mediatool(credentials=variables["credentials"]) - - result = mediatool._rename_columns(df, column_suffix="test") - expected_result = pd.DataFrame({"col1_test": [1, 2], "col2_test": [3, 4]}) - pd.testing.assert_frame_equal(result, expected_result) - - @pytest.mark.connect @patch("viadot.sources.mediatool.handle_api_response") def test_to_records_connection(mock_handle_api_response): From aaa9c521f4d5cafcd69ce16dc3c06d551fa78cdd Mon Sep 17 00:00:00 2001 From: Maciej Gardzinski Date: Tue, 31 Dec 2024 14:56:03 +0100 Subject: [PATCH 04/10] =?UTF-8?q?=E2=99=BB=EF=B8=8F=20Refactor=20old=20met?= =?UTF-8?q?hod=20`to=5Fdf`=20into=20`fetch=5Fand=5Ftransform`?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/viadot/sources/mediatool.py | 34 ++++++++++++++++----------------- tests/unit/test_mediatool.py | 11 +++-------- 2 files changed, 19 insertions(+), 26 deletions(-) diff --git a/src/viadot/sources/mediatool.py b/src/viadot/sources/mediatool.py index 44fbb512e..5c31690e9 100644 --- a/src/viadot/sources/mediatool.py +++ b/src/viadot/sources/mediatool.py @@ -62,8 +62,6 @@ def __init__( self.header = {"Authorization": f"Bearer {credentials.get('token')}"} self.user_id = user_id or credentials.get("user_id") - self.url_abbreviation = None - def _get_organizations( self, user_id: str | None = None, @@ -254,14 +252,16 @@ def _to_records( return self._get_media_types(media_type_ids=media_type_ids) return None - - return returned_data - - @add_viadot_metadata_columns - def to_df( + def fetch_and_transform( self, + endpoint: Literal[ + "organizations", "media_entries", "vehicles", "campaigns", "media_types" + ], if_empty: str = "warn", - **kwargs, + organization_id: str | None = None, + vehicle_ids: list[str] | None = None, + media_type_ids: list[str] | None = None, + columns: list[str] | None = None, ) -> pd.DataFrame: """Pandas Data Frame with the data in the Response object and metadata. @@ -272,15 +272,13 @@ def to_df( Returns: pd.Dataframe: The response data as a Pandas Data Frame plus viadot metadata. """ - data = kwargs.get("data", False) - column_suffix = kwargs.get("column_suffix", None) - columns = kwargs.get("columns", None) - - super().to_df(if_empty=if_empty) + records = self._to_records( + endpoint, organization_id, vehicle_ids, media_type_ids + ) - data_frame = pd.DataFrame.from_dict(data) + data_frame = pd.DataFrame.from_dict(records) # type: ignore - if column_suffix == "campaigns": + if endpoint == "campaigns": data_frame.replace( to_replace=[r"\\t|\\n|\\r", "\t|\n|\r"], value=["", ""], @@ -288,11 +286,11 @@ def to_df( inplace=True, ) - if column_suffix: + if endpoint: # Endpoint name is added to the end of the column name to make it unique. 
data_frame = data_frame.rename( columns={ - column_name: f"{column_name}_{column_suffix}" + column_name: f"{column_name}_{endpoint}" for column_name in data_frame.columns } ) @@ -314,7 +312,7 @@ def to_df( else: self.logger.info( "Successfully downloaded data from " - + f"the Mediatool API ({self.url_abbreviation})." + + f"the Mediatool API ({endpoint})." ) return data_frame diff --git a/tests/unit/test_mediatool.py b/tests/unit/test_mediatool.py index a604f9ce0..ffd4249bf 100644 --- a/tests/unit/test_mediatool.py +++ b/tests/unit/test_mediatool.py @@ -131,7 +131,7 @@ def test_to_records_connection(mock_handle_api_response): @pytest.mark.functions @patch("viadot.sources.mediatool.handle_api_response") -def test_to_df(mock_handle_api_response): +def test_fetch_and_transform(mock_handle_api_response): """Test Mediatool `to_df` method.""" mock_response = MagicMock() mock_response.text = json.dumps({"mediaEntries": [{"_id": "1", "name": "Entry1"}]}) @@ -139,13 +139,8 @@ def test_to_df(mock_handle_api_response): mediatool = Mediatool(credentials=variables["credentials"]) - data = [{"_id": "1", "name": "Entry1"}] - result_df = mediatool.to_df(data=data, column_suffix="media_entries") - result_df.drop( - columns=["_viadot_source", "_viadot_downloaded_at_utc"], - inplace=True, - axis=1, - ) + result_df = mediatool.fetch_and_transform(endpoint="media_entries") + expected_result = pd.DataFrame( {"_id_media_entries": ["1"], "name_media_entries": ["Entry1"]} ) From 391bd169cc1713febdcd50c270282a8d38e1be4e Mon Sep 17 00:00:00 2001 From: Maciej Gardzinski Date: Tue, 31 Dec 2024 15:04:23 +0100 Subject: [PATCH 05/10] =?UTF-8?q?=E2=99=BB=EF=B8=8F=20=20Move=20`to=5Fdf`?= =?UTF-8?q?=20logic=20from=20Prefect=20task=20to=20the=20connector?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../orchestration/prefect/tasks/mediatool.py | 120 +--------------- src/viadot/sources/mediatool.py | 132 +++++++++++++++++- 2 files changed, 133 insertions(+), 119 deletions(-) diff --git a/src/viadot/orchestration/prefect/tasks/mediatool.py b/src/viadot/orchestration/prefect/tasks/mediatool.py index 270a035fe..aed69bd0d 100644 --- a/src/viadot/orchestration/prefect/tasks/mediatool.py +++ b/src/viadot/orchestration/prefect/tasks/mediatool.py @@ -1,12 +1,11 @@ """'mediatool.py'.""" import pandas as pd -from prefect import get_run_logger, task +from prefect import task from viadot.orchestration.prefect.exceptions import MissingSourceCredentialsError from viadot.orchestration.prefect.utils import get_credentials from viadot.sources import Mediatool -from viadot.utils import join_dfs @task(retries=3, log_prints=True, retry_delay_seconds=10, timeout_seconds=60 * 60) @@ -18,9 +17,6 @@ def mediatool_to_df( ) -> pd.DataFrame: """Task to download data from Mediatool API. - Data from different endpoints are retrieved and combined. A final object is created - containing data for all organizations from the list. - Args: config_key (str, optional): The key in the viadot config holding relevant credentials. Defaults to None. @@ -31,15 +27,9 @@ def mediatool_to_df( media_entries_columns (list[str], optional): Columns to get from media entries. Defaults to None. - Raises: - ValueError: 'organization_ids' argument is None, and one is mandatory. - ValueError: One 'organization_id' is not in organizations. - Returns: pd.DataFrame: The response data as a Pandas Data Frame. 
""" - logger = get_run_logger() - if not (azure_key_vault_secret or config_key): raise MissingSourceCredentialsError @@ -50,111 +40,5 @@ def mediatool_to_df( credentials=credentials, config_key=config_key, ) - # first method ORGANIZATIONS - method = "organizations" - organizations_data = mediatool.api_connection(get_data_from=method) - df_organizations = mediatool.to_df(data=organizations_data, column_suffix=method) - - if organization_ids is None: - message = "No organizations were defined." - raise ValueError(message) - - list_of_organizations_df = [] - for organization_id in organization_ids: - if organization_id in df_organizations["_id_organizations"].unique(): - logger.info(f"Downloading data for: {organization_id} ...") - - # extract media entries per organization - media_entries_data = mediatool.api_connection( - get_data_from="media_entries", - organization_id=organization_id, - ) - df_media_entries = mediatool.to_df( - data=media_entries_data, columns=media_entries_columns - ) - unique_vehicle_ids = df_media_entries["vehicleId"].unique() - unique_media_type_ids = df_media_entries["mediaTypeId"].unique() - - # extract vehicles - method = "vehicles" - vehicles_data = mediatool.api_connection( - get_data_from=method, - vehicle_ids=unique_vehicle_ids, - ) - df_vehicles = mediatool.to_df(data=vehicles_data, column_suffix=method) - - # extract campaigns - method = "campaigns" - campaigns_data = mediatool.api_connection( - get_data_from=method, - organization_id=organization_id, - ) - df_campaigns = mediatool.to_df(data=campaigns_data, column_suffix=method) - - # extract media types - method = "media_types" - media_types_data = mediatool.api_connection( - get_data_from=method, - media_type_ids=unique_media_type_ids, - ) - df_media_types = mediatool.to_df( - data=media_types_data, column_suffix=method - ) - - # join media entries & organizations - df_merged_entries_orgs = join_dfs( - df_left=df_media_entries, - df_right=df_organizations, - left_on="organizationId", - right_on="_id_organizations", - columns_from_right_df=[ - "_id_organizations", - "name_organizations", - "abbreviation_organizations", - ], - how="left", - ) - - # join the previous merge & campaigns - df_merged_campaigns = join_dfs( - df_left=df_merged_entries_orgs, - df_right=df_campaigns, - left_on="campaignId", - right_on="_id_campaigns", - columns_from_right_df=[ - "_id_campaigns", - "name_campaigns", - "conventionalName_campaigns", - ], - how="left", - ) - - # join the previous merge & vehicles - df_merged_vehicles = join_dfs( - df_left=df_merged_campaigns, - df_right=df_vehicles, - left_on="vehicleId", - right_on="_id_vehicles", - columns_from_right_df=["_id_vehicles", "name_vehicles"], - how="left", - ) - - # join the previous merge & media types - df_merged_media_types = join_dfs( - df_left=df_merged_vehicles, - df_right=df_media_types, - left_on="mediaTypeId", - right_on="_id_media_types", - columns_from_right_df=["_id_media_types", "name_media_types"], - how="left", - ) - - list_of_organizations_df.append(df_merged_media_types) - - else: - message = ( - f"Organization - {organization_id} not found in organizations list." 
- ) - raise ValueError(message) - return pd.concat(list_of_organizations_df) + return mediatool.to_df(organization_ids, media_entries_columns) diff --git a/src/viadot/sources/mediatool.py b/src/viadot/sources/mediatool.py index 5c31690e9..56d036e6e 100644 --- a/src/viadot/sources/mediatool.py +++ b/src/viadot/sources/mediatool.py @@ -1,6 +1,7 @@ """'mediatool.py'.""" import json +from typing import Literal import pandas as pd from pydantic import BaseModel @@ -8,7 +9,7 @@ from viadot.config import get_source_credentials from viadot.exceptions import APIError, CredentialError from viadot.sources.base import Source -from viadot.utils import add_viadot_metadata_columns, handle_api_response +from viadot.utils import add_viadot_metadata_columns, handle_api_response, join_dfs class MediatoolCredentials(BaseModel): @@ -316,3 +317,132 @@ def fetch_and_transform( ) return data_frame + + @add_viadot_metadata_columns + def to_df( + self, + organization_ids: list[str] | None = None, + media_entries_columns: list[str] | None = None, + if_empty: Literal["warn"] | Literal["skip"] | Literal["fail"] = "warn", + ) -> pd.DataFrame: + """Data from different endpoints of the Mediatool API are fetched, transformed + and combind.A final object is created containing data for all organizations + from the list + + Args: + organization_ids (list[str], optional): List of organization IDs. + Defaults to None. + media_entries_columns (list[str], optional): Columns to get from media entries. + Defaults to None. + if_empty (Literal[warn, skip, fail], optional): What to do if there is no + data. Defaults to "warn". + + Raises: + ValueError: Raised when no organizations are defined or an organization ID + is not found in the organizations list. + + Returns: + pd.DataFrame: DataFrame containing the combined data from the specified + endpoints. + """ + # first method ORGANIZATIONS + df_organizations = self.fetch_and_transform(endpoint="organizations") + + if organization_ids is None: + message = "No organizations were defined." 
+ raise ValueError(message) + + list_of_organizations_df = [] + for organization_id in organization_ids: + if organization_id in df_organizations["_id_organizations"].unique(): + self.logger.info(f"Downloading data for: {organization_id} ...") + + # extract media entries per organization + df_media_entries = self.fetch_and_transform( + endpoint="media_entries", + organization_id=organization_id, + columns=media_entries_columns, + ) + + unique_vehicle_ids = df_media_entries["vehicleId"].unique() + unique_media_type_ids = df_media_entries["mediaTypeId"].unique() + + # extract vehicles + df_vehicles = self.fetch_and_transform( + endpoint="vehicles", vehicle_ids=unique_vehicle_ids + ) + + # extract campaigns + df_campaigns = self.fetch_and_transform( + endpoint="campaigns", + organization_id=organization_id, + ) + + # extract media types + df_media_types = self.fetch_and_transform( + endpoint="media_types", + media_type_ids=unique_media_type_ids, + ) + + # join media entries & organizations + df_merged_entries_orgs = join_dfs( + df_left=df_media_entries, + df_right=df_organizations, + left_on="organizationId", + right_on="_id_organizations", + columns_from_right_df=[ + "_id_organizations", + "name_organizations", + "abbreviation_organizations", + ], + how="left", + ) + + # join the previous merge & campaigns + df_merged_campaigns = join_dfs( + df_left=df_merged_entries_orgs, + df_right=df_campaigns, + left_on="campaignId", + right_on="_id_campaigns", + columns_from_right_df=[ + "_id_campaigns", + "name_campaigns", + "conventionalName_campaigns", + ], + how="left", + ) + + # join the previous merge & vehicles + df_merged_vehicles = join_dfs( + df_left=df_merged_campaigns, + df_right=df_vehicles, + left_on="vehicleId", + right_on="_id_vehicles", + columns_from_right_df=["_id_vehicles", "name_vehicles"], + how="left", + ) + + # join the previous merge & media types + df_merged_media_types = join_dfs( + df_left=df_merged_vehicles, + df_right=df_media_types, + left_on="mediaTypeId", + right_on="_id_media_types", + columns_from_right_df=["_id_media_types", "name_media_types"], + how="left", + ) + + list_of_organizations_df.append(df_merged_media_types) + + else: + message = ( + f"Organization - {organization_id} not found in organizations list." + ) + raise ValueError(message) + + df_final = pd.concat(list_of_organizations_df) + + if df_final.empty: + self._handle_if_empty(if_empty) + + return df_final From 0fcfa52a58e3185e9ddcbda5fadff4bd8e68c8b0 Mon Sep 17 00:00:00 2001 From: Maciej Gardzinski Date: Tue, 31 Dec 2024 15:23:11 +0100 Subject: [PATCH 06/10] =?UTF-8?q?=F0=9F=8E=A8=20Fix=20typos,=20adj=20docs?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/viadot/sources/mediatool.py | 23 +++++++++++-------- .../prefect/flows/test_mediatool.py | 2 +- tests/unit/test_mediatool.py | 2 +- 3 files changed, 15 insertions(+), 12 deletions(-) diff --git a/src/viadot/sources/mediatool.py b/src/viadot/sources/mediatool.py index 56d036e6e..857deb76f 100644 --- a/src/viadot/sources/mediatool.py +++ b/src/viadot/sources/mediatool.py @@ -258,15 +258,20 @@ def fetch_and_transform( endpoint: Literal[ "organizations", "media_entries", "vehicles", "campaigns", "media_types" ], - if_empty: str = "warn", organization_id: str | None = None, vehicle_ids: list[str] | None = None, media_type_ids: list[str] | None = None, columns: list[str] | None = None, + if_empty: str = "warn", ) -> pd.DataFrame: """Pandas Data Frame with the data in the Response object and metadata. 
Args: + endpoint (Literal["organizations", "media_entries", "vehicles", "campaigns", + "media_types"]): The API endpoint to fetch data from. + organization_id (str, optional): Organization ID. Defaults to None. + vehicle_ids (list[str]): List of organization IDs. Defaults to None. + media_type_ids (list[str]): List of media type IDs. Defaults to None. if_empty (str, optional): What to do if a fetch produce no data. Defaults to "warn @@ -325,17 +330,15 @@ def to_df( media_entries_columns: list[str] | None = None, if_empty: Literal["warn"] | Literal["skip"] | Literal["fail"] = "warn", ) -> pd.DataFrame: - """Data from different endpoints of the Mediatool API are fetched, transformed - and combind.A final object is created containing data for all organizations - from the list + """Fetches, transforms, and combines data from Mediatool API endpoints. Args: - organization_ids (list[str], optional): List of organization IDs. - Defaults to None. - media_entries_columns (list[str], optional): Columns to get from media entries. - Defaults to None. - if_empty (Literal[warn, skip, fail], optional): What to do if there is no - data. Defaults to "warn". + organization_ids (list[str], optional): List of organization IDs. + Defaults to None. + media_entries_columns (list[str], optional): Columns to get from media + entries. Defaults to None. + if_empty (Literal[warn, skip, fail], optional): What to do if there is no + data. Defaults to "warn". Raises: ValueError: Raised when no organizations are defined or an organization ID diff --git a/tests/integration/orchestration/prefect/flows/test_mediatool.py b/tests/integration/orchestration/prefect/flows/test_mediatool.py index 2b3aff3b9..44be8b1f2 100644 --- a/tests/integration/orchestration/prefect/flows/test_mediatool.py +++ b/tests/integration/orchestration/prefect/flows/test_mediatool.py @@ -25,7 +25,7 @@ ] -def test_genesys_to_adls( +def test_mediatool_to_adls( VIADOT_TEST_MEDIATOOL_ADLS_AZURE_KEY_VAULT_SECRET, VIADOT_TEST_MEDIATOOL_ORG, VIADOT_TEST_MEDIATOOL_ADLS_PATH, diff --git a/tests/unit/test_mediatool.py b/tests/unit/test_mediatool.py index ffd4249bf..d95d0f5da 100644 --- a/tests/unit/test_mediatool.py +++ b/tests/unit/test_mediatool.py @@ -116,7 +116,7 @@ def test_get_media_types(mock_handle_api_response): @pytest.mark.connect @patch("viadot.sources.mediatool.handle_api_response") -def test_to_records_connection(mock_handle_api_response): +def test_to_records(mock_handle_api_response): """Test Mediatool `_to_records` method.""" mock_response = MagicMock() mock_response.text = json.dumps(variables["organizations"]) From 406928bb5e0fdfc1b6505c8a4138bb759ee4b9a5 Mon Sep 17 00:00:00 2001 From: Maciej Gardzinski Date: Fri, 3 Jan 2025 11:47:17 +0100 Subject: [PATCH 07/10] =?UTF-8?q?=E2=99=BB=EF=B8=8F=20=20Reintroduce=20opt?= =?UTF-8?q?ional=20column=20suffix=20with=20`add=5Fendpoint=5Fsuffix`?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/viadot/sources/mediatool.py | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/src/viadot/sources/mediatool.py b/src/viadot/sources/mediatool.py index 857deb76f..12be86637 100644 --- a/src/viadot/sources/mediatool.py +++ b/src/viadot/sources/mediatool.py @@ -263,6 +263,7 @@ def fetch_and_transform( media_type_ids: list[str] | None = None, columns: list[str] | None = None, if_empty: str = "warn", + add_endpoint_suffix: bool = True, ) -> pd.DataFrame: """Pandas Data Frame with the data in the Response object and metadata. 
@@ -274,6 +275,10 @@ def fetch_and_transform( media_type_ids (list[str]): List of media type IDs. Defaults to None. if_empty (str, optional): What to do if a fetch produce no data. Defaults to "warn + add_endpoint_suffix (bool, optional): If True, appends the endpoint name + to column names in the format {column_name}_{endpoint} to ensure + uniqueness in combined DataFrame from multiple endpoints. + Defaults to True. Returns: pd.Dataframe: The response data as a Pandas Data Frame plus viadot metadata. @@ -292,8 +297,7 @@ def fetch_and_transform( inplace=True, ) - if endpoint: - # Endpoint name is added to the end of the column name to make it unique. + if add_endpoint_suffix: data_frame = data_frame.rename( columns={ column_name: f"{column_name}_{endpoint}" @@ -348,13 +352,13 @@ def to_df( pd.DataFrame: DataFrame containing the combined data from the specified endpoints. """ - # first method ORGANIZATIONS - df_organizations = self.fetch_and_transform(endpoint="organizations") - if organization_ids is None: message = "No organizations were defined." raise ValueError(message) + # first method ORGANIZATIONS + df_organizations = self.fetch_and_transform(endpoint="organizations") + list_of_organizations_df = [] for organization_id in organization_ids: if organization_id in df_organizations["_id_organizations"].unique(): @@ -365,6 +369,7 @@ def to_df( endpoint="media_entries", organization_id=organization_id, columns=media_entries_columns, + add_endpoint_suffix=False, ) unique_vehicle_ids = df_media_entries["vehicleId"].unique() From 0d248c7602ebc2c8770894bf75fe2c63cf30da98 Mon Sep 17 00:00:00 2001 From: Maciej Gardzinski Date: Fri, 3 Jan 2025 12:30:10 +0100 Subject: [PATCH 08/10] =?UTF-8?q?=E2=9C=85=20Add=20tests=20for=20`to=5Fdf`?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- tests/unit/test_mediatool.py | 183 ++++++++++++++++++++++++++++++++++- 1 file changed, 179 insertions(+), 4 deletions(-) diff --git a/tests/unit/test_mediatool.py b/tests/unit/test_mediatool.py index d95d0f5da..d5277e6d8 100644 --- a/tests/unit/test_mediatool.py +++ b/tests/unit/test_mediatool.py @@ -16,13 +16,57 @@ "organizations": { "organizations": [{"_id": "1", "name": "Org1", "abbreviation": "O1"}] }, - "media_entries": {"mediaEntries": [{"_id": "1", "name": "Entry1"}]}, + "media_entries": { + "mediaEntries": [ + { + "_id": "1", + "campaignId": "1", + "organizationId": "1", + "vehicleId": "1", + "mediaTypeId": "1", + "name": "Entry1", + "customCol1": "CustomValue1", + "customCol2": "CustomValue2", + } + ] + }, "vehicle": {"vehicle": {"_id": "1", "name": "Vehicle1"}}, - "campaigns": {"campaigns": [{"_id": "1", "name": "Campaign1"}]}, + "campaigns": { + "campaigns": [ + {"_id": "1", "name": "Campaign1", "conventionalName": "conventionalName1"} + ] + }, "media_types": {"mediaType": {"_id": "1", "name": "Type1", "type": "Type"}}, } +@pytest.fixture +def mock_multiple_api_responses(): + """Fixture to mock multiple API responses for Mediatool.to_df() method.""" + mock_organizations_response = MagicMock() + mock_organizations_response.text = json.dumps(variables["organizations"]) + + mock_media_entries_response = MagicMock() + mock_media_entries_response.text = json.dumps(variables["media_entries"]) + + mock_vehicle_response = MagicMock() + mock_vehicle_response.text = json.dumps(variables["vehicle"]) + + mock_campaigns_response = MagicMock() + mock_campaigns_response.text = json.dumps(variables["campaigns"]) + + mock_media_types_response = MagicMock() + 
mock_media_types_response.text = json.dumps(variables["media_types"]) + + return [ + mock_organizations_response, + mock_media_entries_response, + mock_vehicle_response, + mock_campaigns_response, + mock_media_types_response, + ] + + @pytest.mark.basic def test_mediatool_credentials(): """Test Mediatool credentials.""" @@ -65,7 +109,18 @@ def test_get_media_entries(mock_handle_api_response): mediatool = Mediatool(credentials=variables["credentials"]) result = mediatool._get_media_entries(organization_id="org_id") - expected_result = [{"_id": "1", "name": "Entry1"}] + expected_result = [ + { + "_id": "1", + "campaignId": "1", + "organizationId": "1", + "vehicleId": "1", + "mediaTypeId": "1", + "name": "Entry1", + "customCol1": "CustomValue1", + "customCol2": "CustomValue2", + } + ] assert result == expected_result @@ -95,7 +150,9 @@ def test_get_campaigns(mock_handle_api_response): mediatool = Mediatool(credentials=variables["credentials"]) result = mediatool._get_campaigns(organization_id="org_id") - expected_result = [{"_id": "1", "name": "Campaign1"}] + expected_result = [ + {"_id": "1", "name": "Campaign1", "conventionalName": "conventionalName1"} + ] assert result == expected_result @@ -145,3 +202,121 @@ def test_fetch_and_transform(mock_handle_api_response): {"_id_media_entries": ["1"], "name_media_entries": ["Entry1"]} ) pd.testing.assert_frame_equal(result_df, expected_result) + + +@pytest.mark.functions +@patch("viadot.sources.mediatool.handle_api_response") +def test_to_df(mock_handle_api_response, mock_multiple_api_responses): + """Test Mediatool `to_df` method.""" + mock_handle_api_response.side_effect = mock_multiple_api_responses + + mediatool = Mediatool(credentials=variables["credentials"]) + + result_df = mediatool.to_df(organization_ids=["1"]) + result_df.drop( + columns=["_viadot_source", "_viadot_downloaded_at_utc"], + inplace=True, + axis=1, + ) + expected_result = pd.DataFrame( + { + "_id": ["1"], + "campaignId": ["1"], + "organizationId": ["1"], + "vehicleId": ["1"], + "mediaTypeId": ["1"], + "name": ["Entry1"], + "customCol1": "CustomValue1", + "customCol2": "CustomValue2", + "_id_organizations": ["1"], + "name_organizations": ["Org1"], + "abbreviation_organizations": ["O1"], + "_id_campaigns": ["1"], + "name_campaigns": ["Campaign1"], + "conventionalName_campaigns": ["conventionalName1"], + "_id_vehicles": ["1"], + "name_vehicles": ["Vehicle1"], + "_id_media_types": ["1"], + "name_media_types": ["Type1"], + } + ) + + pd.testing.assert_frame_equal(result_df, expected_result) + + +@pytest.mark.functions +@patch("viadot.sources.mediatool.handle_api_response") +def test_to_df_media_entries_columns( + mock_handle_api_response, mock_multiple_api_responses +): + """Test Mediatool `to_df` method, with optional media_entries_columns.""" + mock_handle_api_response.side_effect = mock_multiple_api_responses + + mediatool = Mediatool(credentials=variables["credentials"]) + + result_df = mediatool.to_df( + organization_ids=["1"], + media_entries_columns=[ + "_id", + "campaignId", + "organizationId", + "vehicleId", + "mediaTypeId", + "name", + ], + ) + result_df.drop( + columns=["_viadot_source", "_viadot_downloaded_at_utc"], + inplace=True, + axis=1, + ) + expected_result = pd.DataFrame( + { + "_id": "1", + "campaignId": "1", + "organizationId": "1", + "vehicleId": "1", + "mediaTypeId": "1", + "name": "Entry1", + "_id_organizations": ["1"], + "name_organizations": ["Org1"], + "abbreviation_organizations": ["O1"], + "_id_campaigns": ["1"], + "name_campaigns": ["Campaign1"], + 
"conventionalName_campaigns": ["conventionalName1"], + "_id_vehicles": ["1"], + "name_vehicles": ["Vehicle1"], + "_id_media_types": ["1"], + "name_media_types": ["Type1"], + } + ) + + pd.testing.assert_frame_equal(result_df, expected_result) + + +@pytest.mark.functions +@patch("viadot.sources.mediatool.handle_api_response") +def test_to_df_organization_ids_none(mock_handle_api_response): + """Test Mediatool `to_df` method when `organization_ids` is None.""" + mediatool = Mediatool(credentials=variables["credentials"]) + mock_organizations_response = MagicMock() + mock_organizations_response.text = json.dumps(variables["organizations"]) + mock_handle_api_response.return_value = mock_organizations_response + + with pytest.raises(ValueError, match="No organizations were defined."): + mediatool.to_df(organization_ids=None) + + +@pytest.mark.functions +@patch("viadot.sources.mediatool.handle_api_response") +def test_to_df_invalid_organization_id(mock_handle_api_response): + """Test Mediatool `to_df` method with an invalid `organization_id`.""" + mock_organizations_response = MagicMock() + mock_organizations_response.text = json.dumps(variables["organizations"]) + mediatool = Mediatool(credentials=variables["credentials"]) + mock_handle_api_response.return_value = mock_organizations_response + + with pytest.raises( + ValueError, match="Organization - 2 not found in organizations list." + ): + mediatool.to_df(organization_ids=["2"]) From ba557aaa0ffd0e06989e22164a04242191a6864f Mon Sep 17 00:00:00 2001 From: Maciej Gardzinski Date: Fri, 10 Jan 2025 15:29:32 +0100 Subject: [PATCH 09/10] =?UTF-8?q?=E2=99=BB=EF=B8=8F=20=20Make=20`organizat?= =?UTF-8?q?ion=5Fids`=20a=20required=20parameter=20that=20must=20be=20a=20?= =?UTF-8?q?non-empty=20list.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/viadot/sources/mediatool.py | 9 ++++----- tests/unit/test_mediatool.py | 10 ++++++---- 2 files changed, 10 insertions(+), 9 deletions(-) diff --git a/src/viadot/sources/mediatool.py b/src/viadot/sources/mediatool.py index 12be86637..73eea1dca 100644 --- a/src/viadot/sources/mediatool.py +++ b/src/viadot/sources/mediatool.py @@ -330,15 +330,14 @@ def fetch_and_transform( @add_viadot_metadata_columns def to_df( self, - organization_ids: list[str] | None = None, + organization_ids: list[str], media_entries_columns: list[str] | None = None, if_empty: Literal["warn"] | Literal["skip"] | Literal["fail"] = "warn", ) -> pd.DataFrame: """Fetches, transforms, and combines data from Mediatool API endpoints. Args: - organization_ids (list[str], optional): List of organization IDs. - Defaults to None. + organization_ids (list[str]): List of organization IDs. media_entries_columns (list[str], optional): Columns to get from media entries. Defaults to None. if_empty (Literal[warn, skip, fail], optional): What to do if there is no @@ -352,8 +351,8 @@ def to_df( pd.DataFrame: DataFrame containing the combined data from the specified endpoints. """ - if organization_ids is None: - message = "No organizations were defined." + if not organization_ids: + message = "'organization_ids' must be a non-empty list." 
raise ValueError(message) # first method ORGANIZATIONS diff --git a/tests/unit/test_mediatool.py b/tests/unit/test_mediatool.py index d5277e6d8..54b5dd633 100644 --- a/tests/unit/test_mediatool.py +++ b/tests/unit/test_mediatool.py @@ -296,15 +296,17 @@ def test_to_df_media_entries_columns( @pytest.mark.functions @patch("viadot.sources.mediatool.handle_api_response") -def test_to_df_organization_ids_none(mock_handle_api_response): - """Test Mediatool `to_df` method when `organization_ids` is None.""" +def test_to_df_organization_ids_empty_list(mock_handle_api_response): + """Test Mediatool `to_df` method when `organization_ids` is empty list.""" mediatool = Mediatool(credentials=variables["credentials"]) mock_organizations_response = MagicMock() mock_organizations_response.text = json.dumps(variables["organizations"]) mock_handle_api_response.return_value = mock_organizations_response - with pytest.raises(ValueError, match="No organizations were defined."): - mediatool.to_df(organization_ids=None) + with pytest.raises( + ValueError, match="'organization_ids' must be a non-empty list." + ): + mediatool.to_df(organization_ids=[]) @pytest.mark.functions From 9bbbbe547895a10b32b58a68ff30f6adfa6fef4b Mon Sep 17 00:00:00 2001 From: Maciej Gardzinski Date: Fri, 10 Jan 2025 15:34:55 +0100 Subject: [PATCH 10/10] =?UTF-8?q?=F0=9F=8E=A8=20Add=20missing=20`columns`?= =?UTF-8?q?=20docstring?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/viadot/sources/mediatool.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/viadot/sources/mediatool.py b/src/viadot/sources/mediatool.py index 73eea1dca..5a129f988 100644 --- a/src/viadot/sources/mediatool.py +++ b/src/viadot/sources/mediatool.py @@ -273,6 +273,9 @@ def fetch_and_transform( organization_id (str, optional): Organization ID. Defaults to None. vehicle_ids (list[str]): List of organization IDs. Defaults to None. media_type_ids (list[str]): List of media type IDs. Defaults to None. + columns (list[str], optional): If provided, a list of column names to + include in the DataFrame.By default, all columns will be included. + Defaults to None. if_empty (str, optional): What to do if a fetch produce no data. Defaults to "warn add_endpoint_suffix (bool, optional): If True, appends the endpoint name
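
A minimal usage sketch of the connector as refactored by this series. The config key, organization ID, and column list below are illustrative placeholders:

    from viadot.sources import Mediatool

    # Credentials are resolved from the viadot config under a placeholder key.
    mediatool = Mediatool(config_key="mediatool")

    # `to_df` now drives the full extraction: organizations, media entries,
    # vehicles, campaigns and media types are fetched per organization
    # and joined into a single DataFrame.
    df = mediatool.to_df(
        organization_ids=["<organization-id>"],  # required, must be non-empty
        media_entries_columns=[  # optional subset of media entry columns
            "_id",
            "campaignId",
            "organizationId",
            "vehicleId",
            "mediaTypeId",
            "name",
        ],
    )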