From 10541b2196a178142c19c3995a90b5bdc2ee5959 Mon Sep 17 00:00:00 2001
From: Aditya Jaroli
Date: Sun, 7 Jan 2024 03:55:00 +0530
Subject: [PATCH] Updated README file

---
 README.md                 | 152 +++++++++++++++++++++++++++++++++++++-
 src/batch/batch_insert.py |   2 +
 2 files changed, 153 insertions(+), 1 deletion(-)

diff --git a/README.md b/README.md
index d7754c6..694e6ed 100644
--- a/README.md
+++ b/README.md
@@ -1,2 +1,152 @@
 # pandas-to-postgres
-A utility package to do bulk insertion faster from pandas dataframe to postgres table.

## Overview

**pandas-to-postgres** is a utility package designed to facilitate faster bulk insertion from a pandas DataFrame into a PostgreSQL table.

## Purpose

This utility leverages the power of PostgreSQL in combination with Python to efficiently handle bulk insertion of large datasets. The key features that contribute to its speed are:

1. Use of PostgreSQL's `COPY` command (a brief sketch follows this list)
2. Integration of Psycopg3's pipeline feature
3. Use of Python's coroutines
4. Use of multiprocessing
5. The ability to drop indexes during insertion and recreate them in parallel afterwards
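To give a sense of where the speed comes from, the snippet below is a minimal, self-contained sketch of the first idea: streaming rows into a table with `COPY ... FROM STDIN` through Psycopg3's async API instead of issuing row-by-row `INSERT` statements. The connection string, table name, and column names are placeholders, and this is not the package's actual implementation.

```python
import asyncio

import pandas as pd
import psycopg


async def copy_dataframe(df: pd.DataFrame, conninfo: str) -> None:
    # Stream the DataFrame into the table via COPY instead of row-by-row INSERTs.
    async with await psycopg.AsyncConnection.connect(conninfo) as conn:
        async with conn.cursor() as cur:
            # Placeholder table/column names; adjust to your schema.
            async with cur.copy("COPY my_table (col_a, col_b) FROM STDIN") as copy:
                for row in df.itertuples(index=False, name=None):
                    await copy.write_row(row)
        await conn.commit()


if __name__ == "__main__":
    df = pd.DataFrame({"col_a": [1, 2], "col_b": ["x", "y"]})
    asyncio.run(copy_dataframe(df, "postgresql://user:password@localhost:5432/mydb"))
```

On top of this basic mechanism, the package adds batching, connection pooling, coroutines/multiprocessing, and the optional drop-and-recreate handling of indexes.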

## Usage

The utility provides the following functions and classes:

1. **batch_insert_to_postgres**
2. **batch_insert_to_postgres_with_multi_process**
3. **BatchInsert**
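Assuming the module layout used in the examples later in this README, the three entry points can be imported as follows:

```python
from src.batch.batch_insert_wrapper import (
    batch_insert_to_postgres,
    batch_insert_to_postgres_with_multi_process,
)
from src.batch.batch_insert import BatchInsert
from src.batch.pg_connection_detail import PgConnectionDetail
```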

### _**batch_insert_to_postgres()**_ function

Parameters:

- `pg_conn_details`: Instance of the `PgConnectionDetail` class containing the PostgreSQL server connection details.
- `table_name`: Name of the table for bulk insertion.
- `data_df`: Data in the form of a pandas DataFrame.
- `batch_size`: Number of records to insert and commit at a time.
- `min_conn_pool_size`, `max_conn_pool_size`: Minimum and maximum number of PostgreSQL connections in the connection pool.
- `drop_and_create_index`: Set to `True` if indexes should be dropped during the insert and re-created once insertion is complete.
- `use_multi_process_for_create_index`: Set to `True` if indexes should be re-created in parallel; otherwise they are created sequentially.

See Example 1 below for usage.

### _**batch_insert_to_postgres_with_multi_process()**_ function

Parameters:

- `pg_conn_details`: Instance of the `PgConnectionDetail` class containing the PostgreSQL server connection details.
- `table_name`: Name of the table for bulk insertion.
- `data_generator`: Python generator yielding DataFrames.
- `batch_size`: Number of records to insert and commit at a time.
- `min_conn_pool_size`, `max_conn_pool_size`: Minimum and maximum number of PostgreSQL connections in the connection pool.
- `drop_and_create_index`: Set to `True` if indexes should be dropped during the insert and re-created once insertion is complete.
- `no_of_processes`: Number of cores to use for multiprocessing; set to `None` to auto-assign.

See Example 3 below for usage.

### BatchInsert class

This class contains the core logic of the utility and is wrapped by the two functions above. It is useful when additional logic needs to be built around the functionality, or when custom sequential or parallel computation logic is required.

Properties to create an instance of the BatchInsert class:

- `batch_size`: Number of records to insert and commit at a time.
- `table_name`: Name of the table for bulk insertion.
- `pg_conn_details`: Instance of the `PgConnectionDetail` class containing the PostgreSQL server connection details.
- `min_conn`, `max_conn`: Minimum and maximum number of PostgreSQL connections in the connection pool.

See Example 2 below for usage.

## Examples

1. Sequential insertion with a specified batch size:

```python
import pandas as pd
from src.batch.batch_insert_wrapper import batch_insert_to_postgres
from src.batch.pg_connection_detail import PgConnectionDetail

# Read data. Suppose the DataFrame below holds 20M records.
input_data_df = pd.DataFrame()

# Create the Postgres connection details object. This helps in creating and managing the database connections.
pg_conn_details = PgConnectionDetail(
    user="",
    password="",
    database="",
    host="",
    port="",
    schema=""
)

# Data will be inserted and committed in batches of 250,000 records.
await batch_insert_to_postgres(
    pg_conn_details=pg_conn_details,
    table_name="",
    data_df=input_data_df,
    batch_size=250000,
    min_conn_pool_size=20,
    max_conn_pool_size=25,
    use_multi_process_for_create_index=True,
    drop_and_create_index=True
)
```

2. Sequential insertion without loading the entire dataset into memory:

```python
import pandas as pd
from src.batch.batch_insert import BatchInsert
from src.batch.pg_connection_detail import PgConnectionDetail

# Create the Postgres connection details object. This helps in creating and managing the database connections.
pg_conn_details = PgConnectionDetail(
    user="",
    password="",
    database="",
    host="",
    port="",
    schema=""
)
batch_ = BatchInsert(
    batch_size=250000,
    table_name="",
    pg_conn_details=pg_conn_details,
    min_conn=20,
    max_conn=25
)
try:
    # Open and create the connections in the connection pool.
    await batch_.open_connection_pool()

    # Load the 20M-record CSV file in chunks of 1M records at a time.
    for input_df in pd.read_csv("file-name.csv", chunksize=1000000):
        # Each 1M chunk is partitioned into 4 batches of 250,000 records, since batch_size is 250000.
        await batch_.execute(input_df)
finally:
    # Close the connection pool.
    await batch_.close_connection_pool()
```

3. Parallel insertion using multiprocessing:

The code below uses 5 cores and processes 5M records in parallel, i.e. 1M records per core, inserting 250,000 records at a time.

```python
import pandas as pd
from src.batch.batch_insert_wrapper import batch_insert_to_postgres_with_multi_process
from src.batch.pg_connection_detail import PgConnectionDetail

# Create the Postgres connection details object. This helps in creating and managing the database connections.
pg_conn_details = PgConnectionDetail(
    user="",
    password="",
    database="",
    host="",
    port="",
    schema=""
)

df_generator = pd.read_csv("20M-file.csv", chunksize=1000000)

# Data will be inserted and committed in batches of 250,000 records.
await batch_insert_to_postgres_with_multi_process(
    pg_conn_details=pg_conn_details,
    table_name="",
    data_generator=df_generator,
    batch_size=250000,
    min_conn_pool_size=20,
    max_conn_pool_size=25,
    no_of_processes=5,
    drop_and_create_index=True
)
```

diff --git a/src/batch/batch_insert.py b/src/batch/batch_insert.py
index c6e0d1f..999ea86 100644
--- a/src/batch/batch_insert.py
+++ b/src/batch/batch_insert.py
@@ -32,9 +32,11 @@ def __init__(
         self.data_df = None
         self.pool = self.pg_conn_details.create_connection_pool(min_size=self.min_conn, max_size=self.max_conn)
 
+    @retry(Exception, tries=3, delay=2, backoff=1)
     async def open_connection_pool(self):
         await self.pool.open(wait=True)
 
+    @retry(Exception, tries=3, delay=2, backoff=1)
     async def close_connection_pool(self):
         await self.pool.close()
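The `src/batch/batch_insert.py` change adds a retry decorator around opening and closing the connection pool. The import of `retry` sits outside the hunk, so the exact library is not visible here; assuming the conventional meaning of the arguments, `tries=3, delay=2, backoff=1` means up to three attempts with a constant two-second pause between them. The following is only an illustrative, self-contained sketch of that behaviour for async callables, with a hypothetical `open_pool()` stand-in; it is not the package's actual decorator.

```python
import asyncio
import functools


def retry(exceptions=Exception, tries=3, delay=2, backoff=1):
    """Illustrative retry decorator for async callables (not the package's actual helper)."""
    def decorator(func):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            wait = delay
            for attempt in range(1, tries + 1):
                try:
                    return await func(*args, **kwargs)
                except exceptions:
                    if attempt == tries:
                        raise  # Out of attempts: re-raise the last error.
                    await asyncio.sleep(wait)
                    wait *= backoff  # backoff=1 keeps the delay constant between attempts.
        return wrapper
    return decorator


@retry(Exception, tries=3, delay=2, backoff=1)
async def open_pool():
    # Simulated flaky operation standing in for pool.open(wait=True).
    raise ConnectionError("could not reach the database")


if __name__ == "__main__":
    try:
        asyncio.run(open_pool())
    except ConnectionError as exc:
        print(f"gave up after 3 attempts: {exc}")
```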