Skip to content

Commit

Permalink
Merge pull request #1 from adityajaroli/first-version_master
Browse files Browse the repository at this point in the history
First version to release which is tested on python 3.9/.10/.11 on macos
  • Loading branch information
adityajaroli authored Jan 7, 2024
2 parents 879107d + 3096fcd commit 5d94166
Show file tree
Hide file tree
Showing 25 changed files with 2,586 additions and 43 deletions.
43 changes: 1 addition & 42 deletions .github/workflows/codeql.yml
Original file line number Diff line number Diff line change
@@ -1,32 +1,14 @@
# For most projects, this workflow file will not need changing; you simply need
# to commit it to your repository.
#
# You may wish to alter this file to override the set of languages analyzed,
# or to provide custom queries or build logic.
#
# ******** NOTE ********
# We have attempted to detect the languages in your repository. Please check
# the `language` matrix defined below to confirm you have the correct set of
# supported CodeQL languages.
#
name: "CodeQL"

on:
push:
branches: [ "master" ]
pull_request:
branches: [ "master" ]
schedule:
- cron: '31 7 * * 5'

jobs:
analyze:
name: Analyze
# Runner size impacts CodeQL analysis time. To learn more, please see:
# - https://gh.io/recommended-hardware-resources-for-running-codeql
# - https://gh.io/supported-runners-and-hardware-resources
# - https://gh.io/using-larger-runners
# Consider using larger runners for possible analysis time improvements.
runs-on: ${{ (matrix.language == 'swift' && 'macos-latest') || 'ubuntu-latest' }}
timeout-minutes: ${{ (matrix.language == 'swift' && 120) || 360 }}
permissions:
Expand All @@ -37,11 +19,7 @@ jobs:
strategy:
fail-fast: false
matrix:
language: [ ]
# CodeQL supports [ 'c-cpp', 'csharp', 'go', 'java-kotlin', 'javascript-typescript', 'python', 'ruby', 'swift' ]
# Use only 'java-kotlin' to analyze code written in Java, Kotlin or both
# Use only 'javascript-typescript' to analyze code written in JavaScript, TypeScript or both
# Learn more about CodeQL language support at https://aka.ms/codeql-docs/language-support
language: [python]

steps:
- name: Checkout repository
Expand All @@ -52,29 +30,10 @@ jobs:
uses: github/codeql-action/init@v3
with:
languages: ${{ matrix.language }}
# If you wish to specify custom queries, you can do so here or in a config file.
# By default, queries listed here will override any specified in a config file.
# Prefix the list here with "+" to use these queries and those in the config file.

# For more details on CodeQL's query packs, refer to: https://docs.github.com/en/code-security/code-scanning/automatically-scanning-your-code-for-vulnerabilities-and-errors/configuring-code-scanning#using-queries-in-ql-packs
# queries: security-extended,security-and-quality


# Autobuild attempts to build any compiled languages (C/C++, C#, Go, Java, or Swift).
# If this step fails, then you should remove it and run the build manually (see below)
- name: Autobuild
uses: github/codeql-action/autobuild@v3

# ℹ️ Command-line programs to run using the OS shell.
# 📚 See https://docs.github.com/en/actions/using-workflows/workflow-syntax-for-github-actions#jobsjob_idstepsrun

# If the Autobuild fails above, remove it and uncomment the following three lines.
# modify them (or add more) to build your code if your project, please refer to the EXAMPLE below for guidance.

# - run: |
# echo "Run, Build Application using script"
# ./location_of_script_within_repo/buildscript.sh

- name: Perform CodeQL Analysis
uses: github/codeql-action/analyze@v3
with:
Expand Down
33 changes: 33 additions & 0 deletions .github/workflows/unit_test_execution.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
name: UnitTestExecution

on:
push:
branches: [ "master" ]
pull_request:
branches: [ "master" ]

workflow_dispatch:

jobs:
build:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: [ '3.9', '3.10', '3.11' ]
name: Testing on python version - ${{ matrix.python-version }}
steps:
- uses: actions/checkout@v3

- name: Setup Python
uses: actions/[email protected]
with:
python-version: ${{ matrix.python-version }}
architecture: x64

- name: Install dependencies
run: |
pip install -r requirements.txt
pip install -r test-requirements.txt
- name: Run unit test cases
run: coverage run --source=src.pg_bulk_loader --module pytest --verbose && coverage report --show-missing
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
__pycache__/
*.py[cod]
*$py.class
.idea/

# C extensions
*.so
Expand Down
197 changes: 196 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,2 +1,197 @@
# pandas-to-postgres
A utility package to do bulk insertion faster from pandas dataframe to postgres table.

<h2>Overview</h2>

**pandas-to-postgres** is a utility package designed to facilitate faster bulk insertion from pandas DataFrame to a PostgreSQL table.

<h2>Purpose</h2>

This utility leverages the power of PostgreSQL in combination with Python to efficiently handle the bulk insertion of large datasets. The key features that contribute to its speed include:

1. Utilization of Postgres' copy command
2. Integration of Psycopg3's pipeline feature
3. Implementation of Python's coroutines
4. Harnessing the power of multiprocessing
5. Capability to drop indexes during insertion and recreate them in parallel

<h2>Usage</h2>

The utility provides the following useful functions and classes:

1. **batch_insert_to_postgres**
2. **batch_insert_to_postgres_with_multi_process**
3. **BatchInsert**


<h3>batch_insert_to_postgres() function</h3>

- `pg_conn_details`: Instance of the PgConnectionDetail class containing PostgreSQL server connection details.
- `table_name`: Name of the table for bulk insertion.
- `data_df`: Data in the form of a pandas DataFrame.
- `batch_size`: Number of records to insert and commit at a time.
- `min_conn_pool_size`, `max_conn_pool_size`: Determine the number of PostgreSQL connections in the connection pool.
- `drop_and_create_index`: Set to True if indexes need to be dropped during insert and re-created once insertion is complete.
- `use_multi_process_for_create_index`: Set to True if indexes need to be re-created in parallel; otherwise, they will be created sequentially.

<h3>batch_insert_to_postgres_with_multi_process() function</h3>

- `pg_conn_details`: Instance of the PgConnectionDetail class containing PostgreSQL server connection details.
- `table_name`: Name of the table for bulk insertion.
- `data_generator`: Python generator containing DataFrames.
- `batch_size`: Number of records to insert and commit at a time.
- `min_conn_pool_size`, `max_conn_pool_size`: Determine the number of PostgreSQL connections in the connection pool.
- `drop_and_create_index`: Set to True if indexes need to be dropped during insert and re-created once insertion is complete.
- `no_of_processes`: Specify the number of cores for multiprocessing.

<h3>BatchInsert class</h3>
This class serves as the core logic for the utility and is wrapped by the first two utility functions. Users may find it useful if additional logic needs to be developed around the functionality or if a custom sequential or parallel computation logic is required.

Properties to create an instance of BatchInsert class:
- `batch_size`:Number of records to insert and commit at a time.
- `table_name`: Name of the table for bulk insertion.
- `pg_conn_details`: Instance of the PgConnectionDetail class containing PostgreSQL server connection details.
- `min_conn`, `max_conn`: Determine the number of PostgreSQL connections in the connection pool.

<h3>Developer Notes:</h3>

- The `min_conn` or `min_conn_pool_size` can be either equal to or less than the result of `ceil(total_data_size / batch_size)`.
- The `max_conn` or `max_conn_pool_size` can be either equal to or greater than the result of `ceil(total_data_size / batch_size)`.
- The `no_of_processes` can be set to the number of available cores or left as None for the system to determine the optimal number based on resource availability.
- The ideal `batch_size`, as observed during testing, typically falls within the range of 100,000 to 250,000. However, this recommendation is contingent upon the characteristics of the data and table structure.
The multiprocessing function execution must start in the __main__ block.

<h2>Package installation:</h2>
`pip install pg-bulk-loader`

<h2>Examples:</h2>

1. Loading entire dataset once and sending for bulk insert in batches:

```python
import pandas as pd
import asyncio
from pg_bulk_loader import PgConnectionDetail, batch_insert_to_postgres


async def run():
# Read data. Let's suppose below DataFrame has 20M records
input_data_df = pd.DataFrame()

# Create Postgres Connection Details object. This will help in creating and managing the database connections
pg_conn_details = PgConnectionDetail(
user="<postgres username>",
password="<postgres password>",
database="<postgres database>",
host="<host address to postgres server>",
port="<port>",
schema="<schema name where table exist>"
)

# Data will be inserted and committed in the batch of 2,50,000
await batch_insert_to_postgres(
pg_conn_details=pg_conn_details,
table_name="<table_name>",
data_df=input_data_df,
batch_size=250000,
min_conn_pool_size=20,
max_conn_pool_size=25,
use_multi_process_for_create_index=True,
drop_and_create_index=True
)


if __name__ == '__main__':
asyncio.run(run())
```

2. Loading dataset in chunks and sending for bulk insert in batches:

```python
import pandas as pd
import asyncio
from pg_bulk_loader import PgConnectionDetail, FastLoadHack, BatchInsert


async def run():
# Create Postgres Connection Details object. This will help in creating and managing the database connections
pg_conn_details = PgConnectionDetail(
user="<postgres username>",
password="<postgres password>",
database="<postgres database>",
host="<host address to postgres server>",
port="<port>",
schema="<schema name where table exist>"
)
batch_ = BatchInsert(
batch_size=250000,
table_name="<table_name>",
pg_conn_details=pg_conn_details,
min_conn=20,
max_conn=25
)

# If index needs to be dropped before insertion
fast_load_hack = FastLoadHack(pg_conn_details=pg_conn_details, table_name=table_name)
indexes: dict = fast_load_hack.get_indexes()
fast_load_hack.drop_indexes(list(indexes.keys()))

try:
# Open and create the connections in the connection pool
await batch_.open_connection_pool()

# Lets load only a chunk of 1M from the csv file of 20M
for input_df in pd.read_csv("file-name.csv", chunksize=1000000):
# This will partition the 1M data into 4 partitions of size 250000 each as the batch_size is 250000.
await batch_.execute(input_df)
finally:
# Close the connection pool
await batch_.close_connection_pool()
# Re-create indexes once insertion is done
fast_load_hack.create_indexes(list(indexes.values()), use_multi_process=True) # Set to True if indexes need to be created parallely


if __name__ == '__main__':
asyncio.run(run())

```

3. Parallel insertion using multiprocessing:

The below code uses 5 cores and processes 5M records parallely i.e. 1M on one core with 250000 records insertion at a time.

```python
import pandas as pd
import asyncio
from pg_bulk_loader import PgConnectionDetail, batch_insert_to_postgres_with_multi_process


async def run():
# Create Postgres Connection Details object. This will help in creating and managing the database connections
pg_conn_details = PgConnectionDetail(
user="<postgres username>",
password="<postgres password>",
database="<postgres database>",
host="<host address to postgres server>",
port="<port>",
schema="<schema name where table exist>"
)

df_generator = pd.read_csv("20M-file.csv", chunksize=1000000)

# Data will be inserted and committed in the batch of 2,50,000
await batch_insert_to_postgres_with_multi_process(
pg_conn_details=pg_conn_details,
table_name="<table_name>",
data_generator=df_generator,
batch_size=250000,
min_conn_pool_size=20,
max_conn_pool_size=25,
no_of_processes=5,
drop_and_create_index=True
)


# The multiprocessing execution must start in the __main__.
if __name__ == '__main__':
asyncio.run(run())
```
27 changes: 27 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
[project]
name = "pg_bulk_loader"
version = "1.0.0"
authors = [
{ name="Aditya Jaroli", email="[email protected]" },
]
description = "A utility package to do bulk insertion faster from pandas dataframe to postgres table."
readme = "README.md"
requires-python = ">=3.9"
classifiers = [
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
"License :: OSI Approved :: Apache Software License"
]

dependencies = [
"pandas",
"psycopg[binary]",
"asyncio",
"psycopg_pool",
"retry"
]

[project.urls]
Homepage = "https://github.com/adityajaroli/pandas-to-postgres.git"
Issues = "https://github.com/adityajaroli/pandas-to-postgres/issues"
5 changes: 5 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pandas
psycopg[binary]
asyncio
psycopg_pool
retry
6 changes: 6 additions & 0 deletions src/pg_bulk_loader/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from .batch.batch_insert import BatchInsert
from .batch.batch_insert_wrapper import batch_insert_to_postgres
from .batch.batch_insert_wrapper import batch_insert_to_postgres_with_multi_process
from .batch.batch_insert_wrapper import run_batch_task
from .batch.fast_load_hack import FastLoadHack
from .batch.pg_connection_detail import PgConnectionDetail
Empty file.
Loading

0 comments on commit 5d94166

Please sign in to comment.