Simplified the input param #6

Merged · 1 commit · Jan 12, 2024
2 changes: 1 addition & 1 deletion pyproject.toml

@@ -1,6 +1,6 @@
 [project]
 name = "pg_bulk_loader"
-version = "1.0.1"
+version = "1.0.0"
 authors = [
   { name="Aditya Jaroli", email="[email protected]" },
 ]
4 changes: 1 addition & 3 deletions src/pg_bulk_loader/batch/batch_insert.py

@@ -2,7 +2,7 @@
 import pandas as pd
 import asyncio
 from .pg_connection_detail import PgConnectionDetail
-from ..utils.dataframe_utils import get_ranges
+from ..utils.common_utils import get_ranges
 from ..utils.time_it_decorator import time_it
 from retry import retry

@@ -40,12 +40,10 @@ async def open_connection_pool(self):
     async def close_connection_pool(self):
         await self.pool.close()

-    @time_it
     async def execute(self, data_df: pd.DataFrame, col_names: list = None):
         """
         :param data_df: Data to be inserted
         :param col_names: column(s) to be considered for insert from the data_df
-        :return: Boolean - indicating whether the insertion was successful or not
         """
         try:
             partition_ranges = get_ranges(data_df.shape[0], self.batch_size)
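For orientation: `execute` asks `get_ranges` for row ranges before inserting, and this PR only moves that helper from `dataframe_utils` to `common_utils`. The implementation itself is not part of the diff, so the following is a minimal sketch of what a partitioner with this call shape plausibly looks like (signature and return shape are assumptions, not the library's confirmed API):

def get_ranges(total_size: int, batch_size: int) -> list[tuple[int, int]]:
    # Assumed behavior: split total_size rows into consecutive half-open
    # (start, end) ranges of at most batch_size rows each.
    return [
        (start, min(start + batch_size, total_size))
        for start in range(0, total_size, batch_size)
    ]

# Example: 5 rows in batches of 2 -> [(0, 2), (2, 4), (4, 5)]
print(get_ranges(5, 2))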
19 changes: 9 additions & 10 deletions src/pg_bulk_loader/batch/batch_insert_wrapper.py

@@ -6,6 +6,7 @@
 import asyncio
 from concurrent.futures import ProcessPoolExecutor
 import math
+import types


 def __optimize_connection_pool_size(min_conn, total_data_size, batch_size):
@@ -62,20 +63,18 @@ async def run_with_generator(data_generator, batch_size, pg_conn_details, table_
 @time_it
 async def batch_insert_to_postgres(
         pg_conn_details: PgConnectionDetail,
+        input_data,
         table_name: str,
         batch_size: int,
         min_conn_pool_size: int = 5,
         max_conn_pool_size: int = 10,
         use_multi_process_for_create_index: bool = True,
-        drop_and_create_index: bool = True,
-        data_df: pd.DataFrame = None,
-        data_generator = None
+        drop_and_create_index: bool = True
 ):
     """
     :param pg_conn_details: Instance of PgConnectionDetail class which contains postgres connection details
     :param table_name: Name of the table
-    :param data_df: Data to be inserted
-    :param data_generator: Generator which generates pandas DataFrame
+    :param input_data: Data can be a pd.DataFrame | DataFrame Generator
     :param batch_size: Number of records to insert at a time
     :param min_conn_pool_size: Min PG connections created and saved in connection pool
     :param max_conn_pool_size: Max PG connections created and saved in connection pool
@@ -84,7 +83,7 @@ async def batch_insert_to_postgres(
         Note: Only non-pk indexes are dropped and re-created.
     :return:
     """
-    if data_df is None and data_generator is None:
+    if input_data is None:
         raise Exception("Data input cannot be empty!")

     fast_load_hack = FastLoadHack(pg_conn_details=pg_conn_details, table_name=table_name)
@@ -95,11 +94,11 @@ async def batch_insert_to_postgres(
         fast_load_hack.drop_indexes(list(indexes.keys()))

     try:
-        if isinstance(data_df, pd.DataFrame) and not data_df.empty:
-            await run(data_df, batch_size, pg_conn_details, table_name, min_conn_pool_size, max_conn_pool_size)
-        else:
+        if isinstance(input_data, pd.DataFrame) and not input_data.empty:
+            await run(input_data, batch_size, pg_conn_details, table_name, min_conn_pool_size, max_conn_pool_size)
+        elif isinstance(input_data, types.GeneratorType):
             await run_with_generator(
-                data_generator, batch_size, pg_conn_details, table_name, min_conn_pool_size, max_conn_pool_size
+                input_data, batch_size, pg_conn_details, table_name, min_conn_pool_size, max_conn_pool_size
             )
     except Exception as e:
         raise e
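The net effect of this file is the API change the PR title describes: callers pass a single `input_data` argument instead of choosing between `data_df` and `data_generator`, and the wrapper dispatches on the runtime type. A sketch of both call paths against the new signature (import paths and the `PgConnectionDetail` constructor arguments are assumptions, since neither appears in this diff):

import asyncio
import pandas as pd

from pg_bulk_loader.batch.pg_connection_detail import PgConnectionDetail
from pg_bulk_loader.batch.batch_insert_wrapper import batch_insert_to_postgres

pg_conn = PgConnectionDetail(...)  # hypothetical: constructor args are not shown in this diff


async def main():
    # 1) A non-empty DataFrame takes the run(...) branch.
    df = pd.DataFrame({"id": [1, 2, 3]})
    await batch_insert_to_postgres(
        pg_conn_details=pg_conn,
        input_data=df,
        table_name="aop_dummy",  # table name borrowed from the tests below
        batch_size=200,
    )

    # 2) A generator of DataFrames takes the run_with_generator(...) branch.
    def chunks():
        for start in range(0, 1000, 200):
            yield pd.DataFrame({"id": range(start, start + 200)})

    await batch_insert_to_postgres(
        pg_conn_details=pg_conn,
        input_data=chunks(),
        table_name="aop_dummy",
        batch_size=200,
    )


asyncio.run(main())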
19 changes: 9 additions & 10 deletions tests/unit/test_batch_insert_wrapper.py

@@ -19,8 +19,7 @@ async def test_batch_insert_when_input_is_null(self):
         await batch_insert_to_postgres(
             pg_conn_details=self.pg_connection,
             table_name="aop_dummy",
-            data_df=None,
-            data_generator=None,
+            input_data=None,
             batch_size=200,
             min_conn_pool_size=5,
             max_conn_pool_size=7,
@@ -55,7 +54,7 @@ async def test_batch_insert_when_exception_is_thrown(self, mock_run):
         await batch_insert_to_postgres(
             pg_conn_details=self.pg_connection,
             table_name="aop_dummy",
-            data_df=input_df,
+            input_data=input_df,
             batch_size=200,
             min_conn_pool_size=5,
             max_conn_pool_size=7,
@@ -74,7 +73,7 @@ async def test_batch_insert_when_table_does_not_have_indexes_and_drop_and_create
         await batch_insert_to_postgres(
             pg_conn_details=self.pg_connection,
             table_name="aop_dummy",
-            data_df=input_df,
+            input_data=input_df,
             batch_size=200,
             min_conn_pool_size=5,
             max_conn_pool_size=7,
@@ -94,7 +93,7 @@ async def test_batch_insert_when_table_does_not_have_indexes_and_drop_and_create
         await batch_insert_to_postgres(
             pg_conn_details=self.pg_connection,
             table_name="aop_dummy",
-            data_df=input_df,
+            input_data=input_df,
             batch_size=200,
             min_conn_pool_size=5,
             max_conn_pool_size=7,
@@ -115,7 +114,7 @@ async def test_batch_insert_when_table_have_indexes_and_drop_and_create_index_is
         await batch_insert_to_postgres(
             pg_conn_details=self.pg_connection,
             table_name="aop_dummy",
-            data_df=input_df,
+            input_data=input_df,
             batch_size=200,
             min_conn_pool_size=5,
             max_conn_pool_size=7,
@@ -136,7 +135,7 @@ async def test_batch_insert_when_table_have_indexes_and_drop_and_create_index_is
         await batch_insert_to_postgres(
             pg_conn_details=self.pg_connection,
             table_name="aop_dummy",
-            data_df=input_df,
+            input_data=input_df,
             batch_size=200,
             min_conn_pool_size=5,
             max_conn_pool_size=7,
@@ -157,7 +156,7 @@ async def test_batch_insert_when_table_have_indexes_and_drop_and_create_index_ha
         await batch_insert_to_postgres(
             pg_conn_details=self.pg_connection,
             table_name="aop_dummy",
-            data_df=input_df,
+            input_data=input_df,
             batch_size=200,
             min_conn_pool_size=5,
             max_conn_pool_size=7,
@@ -185,7 +184,7 @@ async def test_batch_insert_when_conn_pool_has_less_connection_than_total_batche
         await batch_insert_to_postgres(
             pg_conn_details=self.pg_connection,
             table_name="aop_dummy",
-            data_df=input_df,
+            input_data=input_df,
             batch_size=200,
             min_conn_pool_size=2,
             max_conn_pool_size=3,
@@ -205,7 +204,7 @@ async def test_batch_insert_when_input_is_given_as_data_generator(self):
         await batch_insert_to_postgres(
             pg_conn_details=self.pg_connection,
             table_name="aop_dummy",
-            data_generator=input_df_generator,
+            input_data=input_df_generator,
             batch_size=200,
             min_conn_pool_size=2,
             max_conn_pool_size=3,
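One caveat worth noting about the generator test above: the new dispatch checks `isinstance(input_data, types.GeneratorType)`, which accepts generator-function results and generator expressions but not arbitrary iterables. In particular, pandas' chunked CSV reader is an iterator rather than a generator, so, as far as the branches in this diff show, it would match neither case and no insert would run. A small illustration (the file name is hypothetical):

import types

import pandas as pd

# pd.read_csv(..., chunksize=...) returns a TextFileReader: an iterator of
# DataFrames, but not a types.GeneratorType instance.
reader = pd.read_csv("data.csv", chunksize=200)  # hypothetical input file
print(isinstance(reader, types.GeneratorType))   # False

# Wrapping it in a generator expression satisfies the isinstance check.
chunk_gen = (chunk for chunk in reader)
print(isinstance(chunk_gen, types.GeneratorType))  # True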
2 changes: 1 addition & 1 deletion tests/unit/test_dataframe_utils.py

@@ -1,7 +1,7 @@
 import unittest
 import pytest
 import pandas as pd
-from src.pg_bulk_loader.utils.dataframe_utils import partition_df, get_ranges
+from src.pg_bulk_loader.utils.common_utils import partition_df, get_ranges


 class TestDataFrameUtils(unittest.TestCase):