From ff41294c8a6595837a02c222cac6f17bf31e6674 Mon Sep 17 00:00:00 2001 From: Guilherme Martins Crocetti <24530683+gmcrocetti@users.noreply.github.com> Date: Tue, 21 Jan 2025 14:23:35 -0300 Subject: [PATCH] refactor: deprecate usage of `cursor.execute` statements in favor of the in class implementation of `execute`. --- pandas/io/sql.py | 62 ++++++++++++++++++++++++++++++++---------------- 1 file changed, 42 insertions(+), 20 deletions(-) diff --git a/pandas/io/sql.py b/pandas/io/sql.py index 5652d7fab0c7c..8bddd637444d8 100644 --- a/pandas/io/sql.py +++ b/pandas/io/sql.py @@ -11,6 +11,7 @@ ) from contextlib import ( ExitStack, + closing, contextmanager, ) from datetime import ( @@ -1651,10 +1652,18 @@ def run_transaction(self): def execute(self, sql: str | Select | TextClause, params=None): """Simple passthrough to SQLAlchemy connectable""" + from sqlalchemy.exc import DBAPIError as SQLAlchemyDatabaseError + args = [] if params is None else [params] if isinstance(sql, str): - return self.con.exec_driver_sql(sql, *args) - return self.con.execute(sql, *args) + execute_function = self.con.exec_driver_sql + else: + execute_function = self.con.execute + + try: + return execute_function(sql, *args) + except SQLAlchemyDatabaseError as exc: + raise DatabaseError(f"Execution failed on sql '{sql}': {exc}") from exc def read_table( self, @@ -2108,6 +2117,8 @@ def run_transaction(self): self.con.commit() def execute(self, sql: str | Select | TextClause, params=None): + from adbc_driver_manager import DatabaseError as ADBCDatabaseError + if not isinstance(sql, str): raise TypeError("Query must be a string unless using sqlalchemy.") args = [] if params is None else [params] @@ -2115,10 +2126,10 @@ def execute(self, sql: str | Select | TextClause, params=None): try: cur.execute(sql, *args) return cur - except Exception as exc: + except ADBCDatabaseError as exc: try: self.con.rollback() - except Exception as inner_exc: # pragma: no cover + except ADBCDatabaseError as inner_exc: # pragma: no cover ex = DatabaseError( f"Execution failed on sql: {sql}\n{exc}\nunable to rollback" ) @@ -2207,8 +2218,7 @@ def read_table( else: stmt = f"SELECT {select_list} FROM {table_name}" - with self.con.cursor() as cur: - cur.execute(stmt) + with closing(self.execute(stmt)) as cur: pa_table = cur.fetch_arrow_table() df = arrow_table_to_pandas(pa_table, dtype_backend=dtype_backend) @@ -2278,8 +2288,7 @@ def read_query( if chunksize: raise NotImplementedError("'chunksize' is not implemented for ADBC drivers") - with self.con.cursor() as cur: - cur.execute(sql) + with closing(self.execute(sql)) as cur: pa_table = cur.fetch_arrow_table() df = arrow_table_to_pandas(pa_table, dtype_backend=dtype_backend) @@ -2335,6 +2344,9 @@ def to_sql( engine : {'auto', 'sqlalchemy'}, default 'auto' Raises NotImplementedError if not set to 'auto' """ + from adbc_driver_manager import DatabaseError as ADBCDatabaseError + import pyarrow as pa + if index_label: raise NotImplementedError( "'index_label' is not implemented for ADBC drivers" @@ -2364,22 +2376,25 @@ def to_sql( if if_exists == "fail": raise ValueError(f"Table '{table_name}' already exists.") elif if_exists == "replace": - with self.con.cursor() as cur: - cur.execute(f"DROP TABLE {table_name}") + sql_statement = f"DROP TABLE {table_name}" + self.execute(sql_statement).close() elif if_exists == "append": mode = "append" - import pyarrow as pa - try: tbl = pa.Table.from_pandas(frame, preserve_index=index) except pa.ArrowNotImplementedError as exc: raise ValueError("datatypes not supported") from exc with self.con.cursor() as cur: - total_inserted = cur.adbc_ingest( - table_name=name, data=tbl, mode=mode, db_schema_name=schema - ) + try: + total_inserted = cur.adbc_ingest( + table_name=name, data=tbl, mode=mode, db_schema_name=schema + ) + except ADBCDatabaseError as exc: + raise DatabaseError( + f"Failed to insert records on table={name} with {mode=}" + ) from exc self.con.commit() return total_inserted @@ -2496,9 +2511,9 @@ def sql_schema(self) -> str: return str(";\n".join(self.table)) def _execute_create(self) -> None: - with self.pd_sql.run_transaction() as conn: + with self.pd_sql.run_transaction(): for stmt in self.table: - conn.execute(stmt) + self.pd_sql.execute(stmt).close() def insert_statement(self, *, num_rows: int) -> str: names = list(map(str, self.frame.columns)) @@ -2520,8 +2535,13 @@ def insert_statement(self, *, num_rows: int) -> str: return insert_statement def _execute_insert(self, conn, keys, data_iter) -> int: + from sqlite3 import DatabaseError as SQLiteDatabaseError + data_list = list(data_iter) - conn.executemany(self.insert_statement(num_rows=1), data_list) + try: + conn.executemany(self.insert_statement(num_rows=1), data_list) + except SQLiteDatabaseError as exc: + raise DatabaseError("Execution failed") from exc return conn.rowcount def _execute_insert_multi(self, conn, keys, data_iter) -> int: @@ -2643,6 +2663,8 @@ def run_transaction(self): cur.close() def execute(self, sql: str | Select | TextClause, params=None): + from sqlite3 import DatabaseError as SQLiteDatabaseError + if not isinstance(sql, str): raise TypeError("Query must be a string unless using sqlalchemy.") args = [] if params is None else [params] @@ -2650,10 +2672,10 @@ def execute(self, sql: str | Select | TextClause, params=None): try: cur.execute(sql, *args) return cur - except Exception as exc: + except SQLiteDatabaseError as exc: try: self.con.rollback() - except Exception as inner_exc: # pragma: no cover + except SQLiteDatabaseError as inner_exc: # pragma: no cover ex = DatabaseError( f"Execution failed on sql: {sql}\n{exc}\nunable to rollback" )