-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdbcon_multi.py
201 lines (181 loc) · 7.07 KB
/
dbcon_multi.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
#!/usr/bin/env python
"""
Create and manage database connections, run SQL queries and extract results.
Can use either cx_Oracle or pyodbc to make the connection
v0.1 initial version
"""
from __future__ import print_function
import time
# Handling for either/or import situaiton as we
# don't necessarily need both pyodbc and cx_Oracle
# Also exception names we later want to handle depend on success
# of these imports
FAILED_IMPORTS = []
DB_EXCEPTIONS = []
try:
import pyodbc
except (ImportError, ModuleNotFoundError) as err:
FAILED_IMPORTS.append("pyodbc")
else:
DB_EXCEPTIONS.append(pyodbc.Error)
try:
import cx_Oracle
from cx_Oracle import DatabaseError
except (ImportError, ModuleNotFoundError) as err:
FAILED_IMPORTS.append("cx_Oracle")
else:
DB_EXCEPTIONS.append(DatabaseError)
if FAILED_IMPORTS == ["pyodbc", "cx_Oracle"]:
print("Critical Failure. Failed to import db module.")
class DbCon(object):
def __init__(self, username, password, database,
odbc_driver="", do_nothing=False):
"""
Create database connection using either pyodbc or cx_Oracle
(depending on odbc_driver param).
Has method for exectuing SQL query.
Result of query stored in self.results
Errors held in self.errors
Args:
username (str) - username
password (str) - password
database (str) - database name
odbc_driver (str) - Optional odbc driver name
e.g. "Oracle in Oraclient11g_home"
or "Oracle in Instantclient11_1"
If None/empty cx_Oracle connection will be
used instead of ODBC.
do_nothing (bool) - don't automatically make connection if true
"""
#Connection type:
if odbc_driver:
self.db_module = pyodbc
else:
self.db_module = cx_Oracle
#Holds error messages
self.errors = []
#Holds results
self.results = []
#Holds column headings
self.headings = []
#Execution time for query as date/time string (updated by self.runsql()
self.execution_time = ""
self.database = database
# Construct connection string
# ODBC connection string (depends on tnsnames.ora)
if odbc_driver:
self.constring = "Driver={%s};Dbq=%s;Uid=%s;Pwd=%s" % (odbc_driver, database, username, password)
##print(">>>", odbc_driver)
#cx_Oracle connection string
else:
# Direct connection - expects !<database name>,sid
# e.g. "!lh10xwbgmxq6h2j.cptix4mlxjrs.eu-west-2.rds.amazonaws.com,hub"
if database.startswith("!") and "," in database:
parts = database.split(",")
host = parts[0][1:].strip()
sid = parts[1].strip()
# Construct direct connection string
self.constring = username + "/" + password + "@" + host + ":1521/" + sid
# tns type connection string
# note don't really need these for simple connections like this as
# could use - self.cnxn = cx_Oracle.connect(username, password, database)
else:
self.constring = username + "/" + password + "@" + database
#Sometimes we might not want to automatically open the connection
if not do_nothing:
self.open()
def open(self):
"""Open and test database connection"""
#Try to make database connection using connection string
try:
self.cnxn = self.db_module.connect(self.constring)
# DatabaseError - cx_Oracle, pyodbc.Error - pyodbc
except (DatabaseError, pyodbc.Error) as err:
self.cnxn = None
self.errors.append(str(err))
def close(self):
"""If connection exists, close it"""
if self.cnxn:
self.cnxn.close()
def runsql(self, sql, params=()):
"""Execute SQL using current connection, retrieve results and
store in self.results but only if there's a current connection
Args:
sql (str) - sql to be executed
params - optional container of sql substitution parameters
"""
self.execution_time = time.strftime("%d-%b-%Y %H:%M:%S")
#Don't run if there's no connection
if not self.cnxn:
self.errors.append("Can't execute SQL because no connection.")
else:
self.results, self.headings, self.errors = self.execute(sql, params)
def execute(self, sql, params=()):
"""Execute sql using current connection and retrieve results
Args:
sql - sql to execute
params - optional subsitution parameters if format valid for cx_Oracle
Returns:
SQL query result (list of tuples)
Column headings (list of strings)
Error messages (list of strings)
"""
#Create cursor and execute SQL;
local_errors = []
headings = []
rows = []
cursor = self.cnxn.cursor()
try:
if not params:
cursor.execute(sql)
else:
cursor.execute(sql, params)
except Exception as err:
local_errors.append("Error on execution:" + str(err))
else:
#SQL exececution successful - retrieve results
try:
rows = cursor.fetchall()
except Exception as err:
local_errors.append("Error on fetching results:" + str(err))
else:
#Also capture column headings
headings = [d[0] for d in cursor.description]
return rows, headings, local_errors
def db_info(self):
"""Get some info from v$database
Return details if found
"""
if self.cnxn:
db_info_sql = "select name, db_unique_name, dbid, created from v$database"
rows, headings, errors = self.execute(db_info_sql)
if not errors:
result = {h:str(r) for h, r in zip(headings, rows[0])}
else:
result = errors
else:
result = "Can't retrieve details as no connection"
return result
# Example/test connection
if __name__ == "__main__":
database = "!lh10xwbgmxq6h2j.cptix4mlxjrs.eu-west-2.rds.amazonaws.com,hub"
username = input("Hub username:")
password = input("Hub password (will show!):")
# Query with no paramters
test_con = DbCon(username, password, database)
test_con.db_info()
print(test_con.results)
test_con.runsql("select sysdate from dual")
test_con.close()
print("Results:", test_con.results)
print("Errors:", test_con.errors)
print("finished")
# Query with parameter
test_con = DbCon(username, password, database)
test_con.db_info()
print(test_con.results)
test_con.runsql("select * from hub_processes where process_code = :pc", ["BEW01"])
test_con.close()
print("Results:", test_con.results)
print("Errors:", test_con.errors)
print("finished")