Skip to content

Commit

Permalink
refactor: deprecate usage of cursor.execute statements in favor of …
Browse files Browse the repository at this point in the history
…the in class implementation of `execute`.
  • Loading branch information
gmcrocetti committed Jan 22, 2025
1 parent 5efac82 commit ff41294
Showing 1 changed file with 42 additions and 20 deletions.
62 changes: 42 additions & 20 deletions pandas/io/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
)
from contextlib import (
ExitStack,
closing,
contextmanager,
)
from datetime import (
Expand Down Expand Up @@ -1651,10 +1652,18 @@ def run_transaction(self):

def execute(self, sql: str | Select | TextClause, params=None):
"""Simple passthrough to SQLAlchemy connectable"""
from sqlalchemy.exc import DBAPIError as SQLAlchemyDatabaseError

args = [] if params is None else [params]
if isinstance(sql, str):
return self.con.exec_driver_sql(sql, *args)
return self.con.execute(sql, *args)
execute_function = self.con.exec_driver_sql
else:
execute_function = self.con.execute

try:
return execute_function(sql, *args)
except SQLAlchemyDatabaseError as exc:
raise DatabaseError(f"Execution failed on sql '{sql}': {exc}") from exc

def read_table(
self,
Expand Down Expand Up @@ -2108,17 +2117,19 @@ def run_transaction(self):
self.con.commit()

def execute(self, sql: str | Select | TextClause, params=None):
from adbc_driver_manager import DatabaseError as ADBCDatabaseError

if not isinstance(sql, str):
raise TypeError("Query must be a string unless using sqlalchemy.")
args = [] if params is None else [params]
cur = self.con.cursor()
try:
cur.execute(sql, *args)
return cur
except Exception as exc:
except ADBCDatabaseError as exc:
try:
self.con.rollback()
except Exception as inner_exc: # pragma: no cover
except ADBCDatabaseError as inner_exc: # pragma: no cover
ex = DatabaseError(
f"Execution failed on sql: {sql}\n{exc}\nunable to rollback"
)
Expand Down Expand Up @@ -2207,8 +2218,7 @@ def read_table(
else:
stmt = f"SELECT {select_list} FROM {table_name}"

with self.con.cursor() as cur:
cur.execute(stmt)
with closing(self.execute(stmt)) as cur:
pa_table = cur.fetch_arrow_table()
df = arrow_table_to_pandas(pa_table, dtype_backend=dtype_backend)

Expand Down Expand Up @@ -2278,8 +2288,7 @@ def read_query(
if chunksize:
raise NotImplementedError("'chunksize' is not implemented for ADBC drivers")

with self.con.cursor() as cur:
cur.execute(sql)
with closing(self.execute(sql)) as cur:
pa_table = cur.fetch_arrow_table()
df = arrow_table_to_pandas(pa_table, dtype_backend=dtype_backend)

Expand Down Expand Up @@ -2335,6 +2344,9 @@ def to_sql(
engine : {'auto', 'sqlalchemy'}, default 'auto'
Raises NotImplementedError if not set to 'auto'
"""
from adbc_driver_manager import DatabaseError as ADBCDatabaseError
import pyarrow as pa

if index_label:
raise NotImplementedError(
"'index_label' is not implemented for ADBC drivers"
Expand Down Expand Up @@ -2364,22 +2376,25 @@ def to_sql(
if if_exists == "fail":
raise ValueError(f"Table '{table_name}' already exists.")
elif if_exists == "replace":
with self.con.cursor() as cur:
cur.execute(f"DROP TABLE {table_name}")
sql_statement = f"DROP TABLE {table_name}"
self.execute(sql_statement).close()
elif if_exists == "append":
mode = "append"

import pyarrow as pa

try:
tbl = pa.Table.from_pandas(frame, preserve_index=index)
except pa.ArrowNotImplementedError as exc:
raise ValueError("datatypes not supported") from exc

with self.con.cursor() as cur:
total_inserted = cur.adbc_ingest(
table_name=name, data=tbl, mode=mode, db_schema_name=schema
)
try:
total_inserted = cur.adbc_ingest(
table_name=name, data=tbl, mode=mode, db_schema_name=schema
)
except ADBCDatabaseError as exc:
raise DatabaseError(
f"Failed to insert records on table={name} with {mode=}"
) from exc

self.con.commit()
return total_inserted
Expand Down Expand Up @@ -2496,9 +2511,9 @@ def sql_schema(self) -> str:
return str(";\n".join(self.table))

def _execute_create(self) -> None:
with self.pd_sql.run_transaction() as conn:
with self.pd_sql.run_transaction():
for stmt in self.table:
conn.execute(stmt)
self.pd_sql.execute(stmt).close()

def insert_statement(self, *, num_rows: int) -> str:
names = list(map(str, self.frame.columns))
Expand All @@ -2520,8 +2535,13 @@ def insert_statement(self, *, num_rows: int) -> str:
return insert_statement

def _execute_insert(self, conn, keys, data_iter) -> int:
from sqlite3 import DatabaseError as SQLiteDatabaseError

data_list = list(data_iter)
conn.executemany(self.insert_statement(num_rows=1), data_list)
try:
conn.executemany(self.insert_statement(num_rows=1), data_list)
except SQLiteDatabaseError as exc:
raise DatabaseError("Execution failed") from exc
return conn.rowcount

def _execute_insert_multi(self, conn, keys, data_iter) -> int:
Expand Down Expand Up @@ -2643,17 +2663,19 @@ def run_transaction(self):
cur.close()

def execute(self, sql: str | Select | TextClause, params=None):
from sqlite3 import DatabaseError as SQLiteDatabaseError

if not isinstance(sql, str):
raise TypeError("Query must be a string unless using sqlalchemy.")
args = [] if params is None else [params]
cur = self.con.cursor()
try:
cur.execute(sql, *args)
return cur
except Exception as exc:
except SQLiteDatabaseError as exc:
try:
self.con.rollback()
except Exception as inner_exc: # pragma: no cover
except SQLiteDatabaseError as inner_exc: # pragma: no cover
ex = DatabaseError(
f"Execution failed on sql: {sql}\n{exc}\nunable to rollback"
)
Expand Down

0 comments on commit ff41294

Please sign in to comment.