Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: deprecate usage of cursor.execute statements in favor of the in class implementation of execute. #60748

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 41 additions & 20 deletions pandas/io/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -1651,10 +1651,18 @@ def run_transaction(self):

def execute(self, sql: str | Select | TextClause, params=None):
"""Simple passthrough to SQLAlchemy connectable"""
from sqlalchemy.exc import DBAPIError as SQLAlchemyDatabaseError

args = [] if params is None else [params]
if isinstance(sql, str):
return self.con.exec_driver_sql(sql, *args)
return self.con.execute(sql, *args)
execute_function = self.con.exec_driver_sql
else:
execute_function = self.con.execute

try:
return execute_function(sql, *args)
except SQLAlchemyDatabaseError as exc:
raise DatabaseError(f"Execution failed on sql '{sql}': {exc}") from exc

def read_table(
self,
Expand Down Expand Up @@ -2108,17 +2116,19 @@ def run_transaction(self):
self.con.commit()

def execute(self, sql: str | Select | TextClause, params=None):
from adbc_driver_manager import DatabaseError as ADBCDatabaseError

if not isinstance(sql, str):
raise TypeError("Query must be a string unless using sqlalchemy.")
args = [] if params is None else [params]
cur = self.con.cursor()
try:
cur.execute(sql, *args)
return cur
except Exception as exc:
except ADBCDatabaseError as exc:
try:
self.con.rollback()
except Exception as inner_exc: # pragma: no cover
except ADBCDatabaseError as inner_exc: # pragma: no cover
ex = DatabaseError(
f"Execution failed on sql: {sql}\n{exc}\nunable to rollback"
)
Expand Down Expand Up @@ -2207,8 +2217,7 @@ def read_table(
else:
stmt = f"SELECT {select_list} FROM {table_name}"

with self.con.cursor() as cur:
cur.execute(stmt)
with self.execute(stmt) as cur:
pa_table = cur.fetch_arrow_table()
df = arrow_table_to_pandas(pa_table, dtype_backend=dtype_backend)

Expand Down Expand Up @@ -2278,8 +2287,7 @@ def read_query(
if chunksize:
raise NotImplementedError("'chunksize' is not implemented for ADBC drivers")

with self.con.cursor() as cur:
cur.execute(sql)
with self.execute(sql) as cur:
pa_table = cur.fetch_arrow_table()
df = arrow_table_to_pandas(pa_table, dtype_backend=dtype_backend)

Expand Down Expand Up @@ -2335,6 +2343,9 @@ def to_sql(
engine : {'auto', 'sqlalchemy'}, default 'auto'
Raises NotImplementedError if not set to 'auto'
"""
from adbc_driver_manager import DatabaseError as ADBCDatabaseError
import pyarrow as pa

if index_label:
raise NotImplementedError(
"'index_label' is not implemented for ADBC drivers"
Expand Down Expand Up @@ -2364,22 +2375,25 @@ def to_sql(
if if_exists == "fail":
raise ValueError(f"Table '{table_name}' already exists.")
elif if_exists == "replace":
with self.con.cursor() as cur:
cur.execute(f"DROP TABLE {table_name}")
sql_statement = f"DROP TABLE {table_name}"
self.execute(sql_statement).close()
elif if_exists == "append":
mode = "append"

import pyarrow as pa

try:
tbl = pa.Table.from_pandas(frame, preserve_index=index)
except pa.ArrowNotImplementedError as exc:
raise ValueError("datatypes not supported") from exc

with self.con.cursor() as cur:
total_inserted = cur.adbc_ingest(
table_name=name, data=tbl, mode=mode, db_schema_name=schema
)
try:
total_inserted = cur.adbc_ingest(
table_name=name, data=tbl, mode=mode, db_schema_name=schema
)
except ADBCDatabaseError as exc:
raise DatabaseError(
f"Failed to insert records on table={name} with {mode=}"
) from exc

self.con.commit()
return total_inserted
Expand Down Expand Up @@ -2496,9 +2510,9 @@ def sql_schema(self) -> str:
return str(";\n".join(self.table))

def _execute_create(self) -> None:
with self.pd_sql.run_transaction() as conn:
with self.pd_sql.run_transaction():
for stmt in self.table:
conn.execute(stmt)
self.pd_sql.execute(stmt).close()

def insert_statement(self, *, num_rows: int) -> str:
names = list(map(str, self.frame.columns))
Expand All @@ -2520,8 +2534,13 @@ def insert_statement(self, *, num_rows: int) -> str:
return insert_statement

def _execute_insert(self, conn, keys, data_iter) -> int:
from sqlite3 import DatabaseError as SQLiteDatabaseError

data_list = list(data_iter)
conn.executemany(self.insert_statement(num_rows=1), data_list)
try:
conn.executemany(self.insert_statement(num_rows=1), data_list)
except SQLiteDatabaseError as exc:
raise DatabaseError("Execution failed") from exc
return conn.rowcount

def _execute_insert_multi(self, conn, keys, data_iter) -> int:
Expand Down Expand Up @@ -2643,17 +2662,19 @@ def run_transaction(self):
cur.close()

def execute(self, sql: str | Select | TextClause, params=None):
from sqlite3 import DatabaseError as SQLiteDatabaseError

if not isinstance(sql, str):
raise TypeError("Query must be a string unless using sqlalchemy.")
args = [] if params is None else [params]
cur = self.con.cursor()
try:
cur.execute(sql, *args)
return cur
except Exception as exc:
except SQLiteDatabaseError as exc:
try:
self.con.rollback()
except Exception as inner_exc: # pragma: no cover
except SQLiteDatabaseError as inner_exc: # pragma: no cover
ex = DatabaseError(
f"Execution failed on sql: {sql}\n{exc}\nunable to rollback"
)
Expand Down