moving buffer creation from ETL scripts to Postgres utility script
MarkhamLee committed Feb 26, 2024
1 parent 12ba677 commit 927b84d
Showing 1 changed file with 47 additions and 2 deletions.
49 changes: 47 additions & 2 deletions etl_pipelines/etl_library/postgres_client.py
@@ -4,6 +4,7 @@
# utilities for writing data to PostgreSQL

import psycopg2
from io import StringIO
from etl_library.logging_util import logger # noqa: E402


@@ -39,12 +40,33 @@ def clear_table(connection: object, table: str):
            cursor.execute(delete_string)
            connection.commit()
            logger.info('Postgres Table cleared successfully')
            return 0

        except (Exception, psycopg2.DatabaseError) as error:
            logger.debug(f'Table clearing operation failed with error: {error}')  # noqa: E501
            return error

    # strict enforcement of which columns are used ensures data quality and
    # avoids issues where tab delimiting can create erroneous empty columns
    # in the data frame
    @staticmethod
    def prepare_payload(payload: object) -> object:

        # get dataframe columns for managing data quality
        columns = list(payload.columns)

        buffer = StringIO()

        # explicit column definitions + tab as the delimiter allow us to
        # ingest text data with punctuation without a comma in a sentence
        # being treated as a new column or causing a blank column to be
        # created
        payload.to_csv(buffer, index=False, sep='\t', columns=columns,
                       header=False)
        buffer.seek(0)

        return buffer
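A minimal usage sketch of the new prepare_payload helper, assuming pandas is installed and the class is importable as etl_library.postgres_client.PostgresUtilities; the sample data frame and its column names are hypothetical.

import pandas as pd

from etl_library.postgres_client import PostgresUtilities  # assumed import path

# sample frame with free text; the commas inside the comment column stay in
# one column because the buffer is tab-delimited with explicit columns
sample = pd.DataFrame({
    'id': [1, 2],
    'comment': ['looks good', 'commas, inside, text stay in one column'],
})

buffer = PostgresUtilities.prepare_payload(sample)
print(buffer.getvalue())  # tab-delimited rows, no header, no index

The buffer comes back rewound via seek(0), so it can be passed straight to a COPY-based write.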

    # this method is for instances where the buffer has already been prepared
    # and the data is ready to be written to PostgreSQL
    @staticmethod
    def write_data(connection: object, buffer: object, table: str):

@@ -62,3 +84,26 @@ def write_data(connection: object, buffer: object, table: str):
            cursor.close()
            logger.debug(f'PostgresDB write failed with error: {error}')
            return error

    # for sending over a data frame or csv - the buffer object still needs
    # to be created
    @staticmethod
    def write_data_raw(connection: object, data: object, table: str):

        # count rows
        row_count = len(data)

        # prepare payload
        buffer = PostgresUtilities.prepare_payload(data)

        cursor = connection.cursor()

        try:
            cursor.copy_from(buffer, table, sep="\t")
            connection.commit()
            cursor.close()
            logger.info(f"{row_count} rows successfully written to Postgres")
            return 0

        except (Exception, psycopg2.DatabaseError) as error:
            connection.rollback()
            cursor.close()
            logger.debug(f'PostgresDB write failed with error: {error}')
            return error

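An end-to-end sketch of write_data_raw, assuming hypothetical connection details and a target table whose columns match the data frame's column order.

import pandas as pd
import psycopg2

from etl_library.postgres_client import PostgresUtilities  # assumed import path

# hypothetical connection details and table name
connection = psycopg2.connect(host='localhost', dbname='etl',
                              user='etl_user', password='change_me')

frame = pd.DataFrame({'id': [1], 'comment': ['hello, world']})

# builds the tab-delimited buffer via prepare_payload, then bulk-loads it
# with COPY; commits on success, rolls back on failure
PostgresUtilities.write_data_raw(connection, frame, 'sample_table')
connection.close()

If the buffer has already been built elsewhere, the same rows can go through write_data instead; either way a single COPY is considerably faster than row-by-row inserts for larger frames.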