From 70436d801be3dc90d4cbb5782a703111af38a01a Mon Sep 17 00:00:00 2001
From: Aditya Jaroli <adityajaroli@gmail.com>
Date: Sat, 13 Jan 2024 00:54:51 +0530
Subject: [PATCH] Simplified the input param

---
 pyproject.toml                                |  2 +-
 src/pg_bulk_loader/batch/batch_insert.py      |  4 +---
 .../batch/batch_insert_wrapper.py             | 19 +++++++++----------
 .../{dataframe_utils.py => common_utils.py}   |  0
 tests/unit/test_batch_insert_wrapper.py       | 19 +++++++++----------
 tests/unit/test_dataframe_utils.py            |  2 +-
 6 files changed, 21 insertions(+), 25 deletions(-)
 rename src/pg_bulk_loader/utils/{dataframe_utils.py => common_utils.py} (100%)

diff --git a/pyproject.toml b/pyproject.toml
index aff72ee..ac929a3 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,6 +1,6 @@
 [project]
 name = "pg_bulk_loader"
-version = "1.0.1"
+version = "1.0.0"
 authors = [
   { name="Aditya Jaroli", email="adityajaroli@gmail.com" },
 ]
diff --git a/src/pg_bulk_loader/batch/batch_insert.py b/src/pg_bulk_loader/batch/batch_insert.py
index 70db041..772f27f 100644
--- a/src/pg_bulk_loader/batch/batch_insert.py
+++ b/src/pg_bulk_loader/batch/batch_insert.py
@@ -2,7 +2,7 @@
 import pandas as pd
 import asyncio
 
 from .pg_connection_detail import PgConnectionDetail
-from ..utils.dataframe_utils import get_ranges
+from ..utils.common_utils import get_ranges
 from ..utils.time_it_decorator import time_it
 from retry import retry
@@ -40,12 +40,10 @@ async def open_connection_pool(self):
     async def close_connection_pool(self):
         await self.pool.close()
 
-    @time_it
     async def execute(self, data_df: pd.DataFrame, col_names: list = None):
         """
         :param data_df: Data to be inserted
         :param col_names: column(s) to be considered for insert from the data_df
-        :return: Boolean - indicating whether the insertion was successful or not
         """
         try:
             partition_ranges = get_ranges(data_df.shape[0], self.batch_size)
diff --git a/src/pg_bulk_loader/batch/batch_insert_wrapper.py b/src/pg_bulk_loader/batch/batch_insert_wrapper.py
index 1b5c36b..326421d 100644
--- a/src/pg_bulk_loader/batch/batch_insert_wrapper.py
+++ b/src/pg_bulk_loader/batch/batch_insert_wrapper.py
@@ -6,6 +6,7 @@
 import asyncio
 from concurrent.futures import ProcessPoolExecutor
 import math
+import types
 
 
 def __optimize_connection_pool_size(min_conn, total_data_size, batch_size):
@@ -62,20 +63,18 @@ async def run_with_generator(data_generator, batch_size, pg_conn_details, table_
 @time_it
 async def batch_insert_to_postgres(
         pg_conn_details: PgConnectionDetail,
+        input_data,
         table_name: str,
         batch_size: int,
         min_conn_pool_size: int = 5,
         max_conn_pool_size: int = 10,
         use_multi_process_for_create_index: bool = True,
-        drop_and_create_index: bool = True,
-        data_df: pd.DataFrame = None,
-        data_generator = None
+        drop_and_create_index: bool = True
 ):
     """
     :param pg_conn_details: Instance of PgConnectionDetail class which contains postgres connection details
     :param table_name: Name of the table
-    :param data_df: Data to be inserted
-    :param data_generator: Generator which generates pandas DataFrame
+    :param input_data: Data to insert, given either as a pd.DataFrame or as a generator of pd.DataFrame chunks
     :param batch_size: Number of records to insert at a time
     :param min_conn_pool_size: Min PG connections created and saved in connection pool
     :param max_conn_pool_size: Max PG connections created and saved in connection pool
@@ -84,7 +83,7 @@ async def batch_insert_to_postgres(
     Note: Only non-pk indexes are dropped and re-created.
     :return:
     """
-    if data_df is None and data_generator is None:
+    if input_data is None:
         raise Exception("Data input cannot be empty!")
 
     fast_load_hack = FastLoadHack(pg_conn_details=pg_conn_details, table_name=table_name)
@@ -95,11 +94,11 @@ async def batch_insert_to_postgres(
         fast_load_hack.drop_indexes(list(indexes.keys()))
 
     try:
-        if isinstance(data_df, pd.DataFrame) and not data_df.empty:
-            await run(data_df, batch_size, pg_conn_details, table_name, min_conn_pool_size, max_conn_pool_size)
-        else:
+        if isinstance(input_data, pd.DataFrame) and not input_data.empty:
+            await run(input_data, batch_size, pg_conn_details, table_name, min_conn_pool_size, max_conn_pool_size)
+        elif isinstance(input_data, types.GeneratorType):
             await run_with_generator(
-                data_generator, batch_size, pg_conn_details, table_name, min_conn_pool_size, max_conn_pool_size
+                input_data, batch_size, pg_conn_details, table_name, min_conn_pool_size, max_conn_pool_size
             )
     except Exception as e:
         raise e
diff --git a/src/pg_bulk_loader/utils/dataframe_utils.py b/src/pg_bulk_loader/utils/common_utils.py
similarity index 100%
rename from src/pg_bulk_loader/utils/dataframe_utils.py
rename to src/pg_bulk_loader/utils/common_utils.py
diff --git a/tests/unit/test_batch_insert_wrapper.py b/tests/unit/test_batch_insert_wrapper.py
index 56ee9a8..d280079 100644
--- a/tests/unit/test_batch_insert_wrapper.py
+++ b/tests/unit/test_batch_insert_wrapper.py
@@ -19,8 +19,7 @@ async def test_batch_insert_when_input_is_null(self):
             await batch_insert_to_postgres(
                 pg_conn_details=self.pg_connection,
                 table_name="aop_dummy",
-                data_df=None,
-                data_generator=None,
+                input_data=None,
                 batch_size=200,
                 min_conn_pool_size=5,
                 max_conn_pool_size=7,
@@ -55,7 +54,7 @@ async def test_batch_insert_when_exception_is_thrown(self, mock_run):
         await batch_insert_to_postgres(
             pg_conn_details=self.pg_connection,
             table_name="aop_dummy",
-            data_df=input_df,
+            input_data=input_df,
             batch_size=200,
             min_conn_pool_size=5,
             max_conn_pool_size=7,
@@ -74,7 +73,7 @@ async def test_batch_insert_when_table_does_not_have_indexes_and_drop_and_create
         await batch_insert_to_postgres(
             pg_conn_details=self.pg_connection,
             table_name="aop_dummy",
-            data_df=input_df,
+            input_data=input_df,
             batch_size=200,
             min_conn_pool_size=5,
             max_conn_pool_size=7,
@@ -94,7 +93,7 @@ async def test_batch_insert_when_table_does_not_have_indexes_and_drop_and_create
         await batch_insert_to_postgres(
             pg_conn_details=self.pg_connection,
             table_name="aop_dummy",
-            data_df=input_df,
+            input_data=input_df,
             batch_size=200,
             min_conn_pool_size=5,
             max_conn_pool_size=7,
@@ -115,7 +114,7 @@ async def test_batch_insert_when_table_have_indexes_and_drop_and_create_index_is
         await batch_insert_to_postgres(
             pg_conn_details=self.pg_connection,
             table_name="aop_dummy",
-            data_df=input_df,
+            input_data=input_df,
             batch_size=200,
             min_conn_pool_size=5,
             max_conn_pool_size=7,
@@ -136,7 +135,7 @@ async def test_batch_insert_when_table_have_indexes_and_drop_and_create_index_is
         await batch_insert_to_postgres(
             pg_conn_details=self.pg_connection,
             table_name="aop_dummy",
-            data_df=input_df,
+            input_data=input_df,
             batch_size=200,
             min_conn_pool_size=5,
             max_conn_pool_size=7,
@@ -157,7 +156,7 @@ async def test_batch_insert_when_table_have_indexes_and_drop_and_create_index_ha
         await batch_insert_to_postgres(
             pg_conn_details=self.pg_connection,
             table_name="aop_dummy",
-            data_df=input_df,
+            input_data=input_df,
             batch_size=200,
             min_conn_pool_size=5,
             max_conn_pool_size=7,
@@ -185,7 +184,7 @@ async def test_batch_insert_when_conn_pool_has_less_connection_than_total_batche
         await batch_insert_to_postgres(
             pg_conn_details=self.pg_connection,
             table_name="aop_dummy",
-            data_df=input_df,
+            input_data=input_df,
             batch_size=200,
             min_conn_pool_size=2,
             max_conn_pool_size=3,
@@ -205,7 +204,7 @@ async def test_batch_insert_when_input_is_given_as_data_generator(self):
         await batch_insert_to_postgres(
             pg_conn_details=self.pg_connection,
             table_name="aop_dummy",
-            data_generator=input_df_generator,
+            input_data=input_df_generator,
             batch_size=200,
             min_conn_pool_size=2,
             max_conn_pool_size=3,
diff --git a/tests/unit/test_dataframe_utils.py b/tests/unit/test_dataframe_utils.py
index 1f7f12b..68a7193 100644
--- a/tests/unit/test_dataframe_utils.py
+++ b/tests/unit/test_dataframe_utils.py
@@ -1,7 +1,7 @@
 import unittest
 import pytest
 import pandas as pd
-from src.pg_bulk_loader.utils.dataframe_utils import partition_df, get_ranges
+from src.pg_bulk_loader.utils.common_utils import partition_df, get_ranges
 
 
 class TestDataFrameUtils(unittest.TestCase):
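Usage sketch for the unified parameter (not part of the patch): the import paths are assumed from the src layout shown above, the PgConnectionDetail arguments are hypothetical since its constructor is not shown in this diff, and "my_table" is a placeholder target table.

    import asyncio
    import pandas as pd

    # Assumed import paths; adjust to the installed package's public API.
    from pg_bulk_loader.batch.pg_connection_detail import PgConnectionDetail
    from pg_bulk_loader.batch.batch_insert_wrapper import batch_insert_to_postgres


    def df_chunks(df: pd.DataFrame, chunk_size: int):
        # A plain generator function: calling it yields a generator object,
        # which is what the new isinstance(input_data, types.GeneratorType)
        # branch dispatches on.
        for start in range(0, df.shape[0], chunk_size):
            yield df.iloc[start:start + chunk_size]


    async def main():
        # Hypothetical connection arguments for illustration only.
        pg_conn = PgConnectionDetail(
            user="postgres", password="postgres", database="postgres",
            host="localhost", port=5432, schema="public",
        )
        df = pd.DataFrame({"id": range(1000), "value": range(1000)})

        # Before this patch: data_df=df or data_generator=gen.
        # After: one input_data argument. A DataFrame takes the run(...) path.
        await batch_insert_to_postgres(
            pg_conn_details=pg_conn,
            input_data=df,
            table_name="my_table",
            batch_size=200,
        )

        # A DataFrame generator takes the run_with_generator(...) path.
        await batch_insert_to_postgres(
            pg_conn_details=pg_conn,
            input_data=df_chunks(df, 200),
            table_name="my_table",
            batch_size=200,
        )


    asyncio.run(main())

One caveat implied by the new dispatch: because it relies on isinstance(input_data, types.GeneratorType), only true generator objects take the streaming path; other iterables (a list of DataFrames, a map object) and an empty DataFrame match neither branch and are silently skipped.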