Updated README file
Aditya Jaroli authored and Aditya Jaroli committed Jan 6, 2024
1 parent 926a51a commit 10541b2
Showing 2 changed files with 153 additions and 1 deletion.
152 changes: 151 additions & 1 deletion README.md
@@ -1,2 +1,152 @@
# pandas-to-postgres
A utility package for faster bulk insertion from a pandas DataFrame into a PostgreSQL table.

<h2>Overview</h2>
**pandas-to-postgres** is a utility package designed to facilitate faster bulk insertion from a pandas DataFrame into a PostgreSQL table.

<h2>Purpose</h2>
This utility leverages the power of PostgreSQL in combination with Python to efficiently handle the bulk insertion of large datasets. The key features that contribute to its speed include:

1. Utilization of PostgreSQL's COPY command (see the sketch after this list)
2. Integration of Psycopg3's pipeline feature
3. Implementation of Python's coroutines
4. Harnessing the power of multiprocessing
5. Capability to drop indexes during insertion and recreate them in parallel
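
For context, the sketch below shows the COPY-based path from feature 1, assuming psycopg 3's `Cursor.copy()` API. The connection string, table name, and helper function are illustrative placeholders, not part of this package's public API:

```python
import pandas as pd
import psycopg  # psycopg 3

def copy_dataframe(df: pd.DataFrame, table: str, conninfo: str) -> None:
    """Stream a DataFrame into a table via COPY ... FROM STDIN (illustrative)."""
    columns = ", ".join(df.columns)
    # The connection context manager commits on clean exit.
    with psycopg.connect(conninfo) as conn:
        with conn.cursor() as cur:
            # COPY streams rows over the wire, which is far faster than
            # issuing one INSERT per row.
            with cur.copy(f"COPY {table} ({columns}) FROM STDIN") as copy:
                for row in df.itertuples(index=False, name=None):
                    copy.write_row(row)
```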

<h2>Usage</h2>
The utility provides the following useful functions and classes:
1. **batch_insert_to_postgres**
2. **batch_insert_to_postgres_with_multi_process**
3. **BatchInsert**

<h3>_**batch_insert_to_postgres()**_ function</h3>

- `pg_conn_details`: Instance of the PgConnectionDetail class containing PostgreSQL server connection details.
- `table_name`: Name of the table for bulk insertion.
- `data_df`: Data in the form of a pandas DataFrame.
- `batch_size`: Number of records to insert and commit at a time.
- `min_conn_pool_size`, `max_conn_pool_size`: Determine the number of PostgreSQL connections in the connection pool.
- `drop_and_create_index`: Set to True if indexes need to be dropped during insert and re-created once insertion is complete.
- `use_multi_process_for_create_index`: Set to True if indexes need to be re-created in parallel; otherwise, they will be created sequentially.

<h3>_**batch_insert_to_postgres_with_multi_process()**_ function</h3>

- `pg_conn_details`: Instance of the PgConnectionDetail class containing PostgreSQL server connection details.
- `table_name`: Name of the table for bulk insertion.
- `data_generator`: Python generator yielding pandas DataFrames (see the sketch after this list for building one from an in-memory DataFrame).
- `batch_size`: Number of records to insert and commit at a time.
- `min_conn_pool_size`, `max_conn_pool_size`: Determine the number of PostgreSQL connections in the connection pool.
- `drop_and_create_index`: Set to True if indexes need to be dropped during insert and re-created once insertion is complete.
- `no_of_processes`: Specify the number of cores for multiprocessing; set to None for auto-assignment.
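
If the data is already in memory rather than in a file, a suitable `data_generator` can be built by slicing the DataFrame. A minimal sketch; the helper name and chunk size are assumptions for illustration:

```python
import pandas as pd

def dataframe_chunks(df: pd.DataFrame, chunk_size: int = 1_000_000):
    # Yield consecutive slices of the DataFrame so that each worker
    # process receives roughly chunk_size rows.
    for start in range(0, len(df), chunk_size):
        yield df.iloc[start:start + chunk_size]
```

Each yielded chunk is handed to a process, which then inserts it in batches of `batch_size`.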

<h3>BatchInsert class</h3>
This class contains the core logic of the utility and is wrapped by the two functions above. It is useful when additional logic needs to be built around the insertion, or when custom sequential or parallel execution logic is required.

Properties to create an instance of the BatchInsert class:
- `batch_size`: Number of records to insert and commit at a time.
- `table_name`: Name of the table for bulk insertion.
- `pg_conn_details`: Instance of the PgConnectionDetail class containing PostgreSQL server connection details.
- `min_conn`, `max_conn`: Determine the number of PostgreSQL connections in the connection pool.

<h2>Examples:</h2>

1. Sequential insertion with a specified batch size:

```python
import pandas as pd
from src.batch.batch_insert_wrapper import batch_insert_to_postgres
from src.batch.pg_connection_detail import PgConnectionDetail

# Read data. Let's suppose the DataFrame below has 20M records
input_data_df = pd.DataFrame()

# Create Postgres Connection Details object. This will help in creating and managing the database connections
pg_conn_details = PgConnectionDetail(
    user="<postgres username>",
    password="<postgres password>",
    database="<postgres database>",
    host="<host address to postgres server>",
    port="<port>",
    schema="<schema name where table exists>"
)

# Data will be inserted and committed in batches of 250,000
await batch_insert_to_postgres(
    pg_conn_details=pg_conn_details,
    table_name="<table_name>",
    data_df=input_data_df,
    batch_size=250000,
    min_conn_pool_size=20,
    max_conn_pool_size=25,
    use_multi_process_for_create_index=True,
    drop_and_create_index=True
)
```
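
Note that `batch_insert_to_postgres` is a coroutine, so the `await` above assumes an async context. A minimal driver sketch, reusing the objects defined in the example, for when no event loop is already running:

```python
import asyncio

async def main():
    # Same call as in the example above, driven by an explicit event loop.
    await batch_insert_to_postgres(
        pg_conn_details=pg_conn_details,
        table_name="<table_name>",
        data_df=input_data_df,
        batch_size=250000,
        min_conn_pool_size=20,
        max_conn_pool_size=25,
        use_multi_process_for_create_index=True,
        drop_and_create_index=True
    )

asyncio.run(main())
```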

2. Sequential insertion without loading the entire dataset into memory:
```python
import pandas as pd
from src.batch.batch_insert import BatchInsert
from src.batch.pg_connection_detail import PgConnectionDetail

# Create Postgres Connection Details object. This will help in creating and managing the database connections
pg_conn_details = PgConnectionDetail(
    user="<postgres username>",
    password="<postgres password>",
    database="<postgres database>",
    host="<host address to postgres server>",
    port="<port>",
    schema="<schema name where table exists>"
)
batch_ = BatchInsert(
    batch_size=250000,
    table_name="<table_name>",
    pg_conn_details=pg_conn_details,
    min_conn=20,
    max_conn=25
)
try:
    # Open and create the connections in the connection pool
    await batch_.open_connection_pool()

    # Let's load only 1M rows at a time from the 20M-row CSV file
    for input_df in pd.read_csv("file-name.csv", chunksize=1000000):
        # Each 1M chunk is partitioned into 4 batches of 250,000, since batch_size is 250000.
        await batch_.execute(input_df)
finally:
    # Close the connection pool
    await batch_.close_connection_pool()
```

3. Parallel insertion using multiprocessing:

The code below uses 5 cores, processing 5M records in parallel at a time, i.e. 1M records per core, inserted and committed in batches of 250,000.

```python
import pandas as pd
from src.batch.batch_insert_wrapper import batch_insert_to_postgres_with_multi_process
from src.batch.pg_connection_detail import PgConnectionDetail

# Create Postgres Connection Details object. This will help in creating and managing the database connections
pg_conn_details = PgConnectionDetail(
    user="<postgres username>",
    password="<postgres password>",
    database="<postgres database>",
    host="<host address to postgres server>",
    port="<port>",
    schema="<schema name where table exists>"
)

df_generator = pd.read_csv("20M-file.csv", chunksize=1000000)

# Data will be inserted and committed in batches of 250,000
await batch_insert_to_postgres_with_multi_process(
    pg_conn_details=pg_conn_details,
    table_name="<table_name>",
    data_generator=df_generator,
    batch_size=250000,
    min_conn_pool_size=20,
    max_conn_pool_size=25,
    no_of_processes=5,
    drop_and_create_index=True
)
```
2 changes: 2 additions & 0 deletions src/batch/batch_insert.py
@@ -32,9 +32,11 @@ def __init__(
        self.data_df = None
        self.pool = self.pg_conn_details.create_connection_pool(min_size=self.min_conn, max_size=self.max_conn)

    @retry(Exception, tries=3, delay=2, backoff=1)
    async def open_connection_pool(self):
        await self.pool.open(wait=True)

    @retry(Exception, tries=3, delay=2, backoff=1)
    async def close_connection_pool(self):
        await self.pool.close()

