Skip to content

Commit

Permalink
Rework aiopg.sa.result module with Cython.
Browse files Browse the repository at this point in the history
  • Loading branch information
AndreiPashkin committed Mar 20, 2020
1 parent c83e6a5 commit c46a973
Show file tree
Hide file tree
Showing 3 changed files with 138 additions and 119 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ jobs:
env:

install:
- pip install -U setuptools
- pip install -U setuptools cython
- python setup.py install
- pip install -Ur requirements.txt
- pip install codecov
Expand Down
239 changes: 123 additions & 116 deletions aiopg/sa/result.py → aiopg/sa/result.pyx
Original file line number Diff line number Diff line change
@@ -1,69 +1,82 @@
import weakref
from collections.abc import Mapping, Sequence

#cython: language_level=3, boundscheck=False, wraparound=False, nonecheck=False
from cpython cimport Py_XINCREF, PyObject
from cpython.tuple cimport PyTuple_SET_ITEM, PyTuple_GET_ITEM, PyTuple_GetItem
from cpython.dict cimport PyDict_GetItem
from cpython.list cimport PyList_New, PyList_SET_ITEM
from sqlalchemy.sql import expression, sqltypes

from . import exc


class RowProxy(Mapping):
__slots__ = ('_result_proxy', '_row', '_processors', '_keymap')
cdef class RowProxy:
cdef ResultMetaData _result_metadata
cdef tuple _row
cdef dict _key_to_index

def __init__(self, result_proxy, row, processors, keymap):
def __cinit__(self, result_metadata, row, key_to_index):
"""RowProxy objects are constructed by ResultProxy objects."""
self._result_proxy = result_proxy
self._result_metadata = result_metadata
self._row = row
self._processors = processors
self._keymap = keymap
self._key_to_index = key_to_index

def __iter__(self):
return iter(self._result_proxy.keys)
return iter(self._result_metadata.keys)

def __len__(self):
return len(self._row)

def __getitem__(self, key):
try:
processor, obj, index = self._keymap[key]
except KeyError:
processor, obj, index = self._result_proxy._key_fallback(key)
# Do we need slicing at all? RowProxy now is Mapping not Sequence
# except TypeError:
# if isinstance(key, slice):
# l = []
# for processor, value in zip(self._processors[key],
# self._row[key]):
# if processor is None:
# l.append(value)
# else:
# l.append(processor(value))
# return tuple(l)
# else:
# raise
cdef object _getitem(self, key):
cdef PyObject* item
cdef int index
cdef tuple data

item = PyDict_GetItem(self._key_to_index, key)
if item is NULL:
fallback_key = self._result_metadata._key_fallback(key)
if fallback_key is None:
raise KeyError(key)
item = PyDict_GetItem(self._key_to_index, fallback_key)
if item is NULL:
raise KeyError(key)
index = <object>item
if index is None:
raise exc.InvalidRequestError(
"Ambiguous column name '%s' in result set! "
"try 'use_labels' option on select statement." % key)
if processor is not None:
return processor(self._row[index])
else:
return self._row[index]
"Ambiguous column name %s in result set! "
"try 'use_labels' option on select statement." % repr(key)
)
item = PyTuple_GetItem(self._row, index)
if item is NULL:
return None
return <object>item

def __getitem__(self, item):
cdef PyObject* result

if isinstance(item, int):
result = PyTuple_GetItem(self._row, item)
if result is NULL:
raise KeyError(item)
return <object>result
return self._getitem(item)

def __getattr__(self, name):
cdef object _getattr(self, str name):
try:
return self[name]
return self._getitem(name)
except KeyError as e:
raise AttributeError(e.args[0])

def __getattr__(self, str item):
return self._getattr(item)

def __contains__(self, key):
return self._result_proxy._has_key(self._row, key)
return self._result_metadata._has_key(self._row, key)

__hash__ = None

def __eq__(self, other):
if isinstance(other, RowProxy):
if hasattr(other, 'as_tuple'):
return self.as_tuple() == other.as_tuple()
elif isinstance(other, Sequence):
elif hasattr(other, 'index') and hasattr(other, 'count'):
return self.as_tuple() == other
else:
return NotImplemented
Expand All @@ -78,19 +91,23 @@ def __repr__(self):
return repr(self.as_tuple())


class ResultMetaData(object):
cdef class ResultMetaData:
"""Handle cursor.description, applying additional info from an execution
context."""
cdef public list keys
cdef public dict _keymap
cdef public list _processors
cdef public dict _key_to_index

def __init__(self, result_proxy, cursor_description):
self._processors = processors = []

def __init__(self, ResultProxy result_proxy, cursor_description):
map_type, map_column_name = self.result_map(result_proxy._result_map)

# We do not strictly need to store the processor in the key mapping,
# though it is faster in the Python version (probably because of the
# saved attribute lookup self._processors)
self._keymap = keymap = {}
self._processors = processors = []
self._key_to_index = key_to_index = {}
self.keys = []
dialect = result_proxy.dialect

Expand All @@ -102,12 +119,11 @@ def __init__(self, result_proxy, cursor_description):
assert dialect.case_sensitive, \
"Doesn't support case insensitive database connection"

# high precedence key values.
primary_keymap = {}

assert not dialect.description_encoding, \
"psycopg in py3k should not use this"

ambiguous = []

for i, rec in enumerate(cursor_description):
colname = rec[0]
coltype = rec[1]
Expand All @@ -116,43 +132,29 @@ def __init__(self, result_proxy, cursor_description):
# if dialect.requires_name_normalize:
# colname = dialect.normalize_name(colname)

name, obj, type_ = (
map_column_name.get(colname, colname),
None,
map_type.get(colname, typemap.get(coltype, sqltypes.NULLTYPE))
name = str(map_column_name.get(colname, colname))
type_ = map_type.get(
colname,
typemap.get(coltype, sqltypes.NULLTYPE)
)

processor = type_._cached_result_processor(dialect, coltype)

processors.append(processor)
rec = (processor, obj, i)

# indexes as keys. This is only needed for the Python version of
# RowProxy (the C version uses a faster path for integer indexes).
primary_keymap[i] = rec
rec = (processor, i)

# populate primary keymap, looking for conflicts.
if primary_keymap.setdefault(name, rec) is not rec:
# place a record that doesn't have the "index" - this
# is interpreted later as an AmbiguousColumnError,
# but only when actually accessed. Columns
# colliding by name is not a problem if those names
# aren't used; integer access is always
# unambiguous.
primary_keymap[name] = rec = (None, obj, None)
if name in keymap:
ambiguous.append(name)
keymap[name] = rec
key_to_index[name] = i

self.keys.append(name)
if obj:
for o in obj:
keymap[o] = rec
# technically we should be doing this but we
# are saving on callcounts by not doing so.
# if keymap.setdefault(o, rec) is not rec:
# keymap[o] = (None, obj, None)

# overwrite keymap values with those of the
# high precedence keymap.
keymap.update(primary_keymap)

for name in ambiguous:
keymap[name] = (None, None)
key_to_index[name] = None

for processor, i in keymap.values():
if processor is not None:
processors.append((processor, i))

def result_map(self, data_map):
data_map = data_map or {}
Expand All @@ -166,50 +168,30 @@ def result_map(self, data_map):

return map_type, map_column_name

def _key_fallback(self, key, raiseerr=True):
def _key_fallback(self, key):
map = self._keymap
result = None
if isinstance(key, str):
result = map.get(key)
# fallback for targeting a ColumnElement to a textual expression
# this is a rare use case which only occurs when matching text()
# or colummn('name') constructs to ColumnElements, or after a
# pickle/unpickle roundtrip
elif isinstance(key, expression.ColumnElement):
if isinstance(key, expression.ColumnElement):
if (key._label and key._label in map):
result = map[key._label]
result = key._label
elif (hasattr(key, 'key') and key.key in map):
# match is only on name.
result = map[key.key]
# search extra hard to make sure this
# isn't a column/label name overlap.
# this check isn't currently available if the row
# was unpickled.
if result is not None and result[1] is not None:
for obj in result[1]:
if key._compare_name_for_result(obj):
break
else:
result = None
if result is None:
if raiseerr:
raise exc.NoSuchColumnError(
"Could not locate column in row for column '%s'" %
expression._string_or_unprintable(key))
else:
return None
else:
map[key] = result
result = key.key

return result

def _has_key(self, row, key):
if key in self._keymap:
return True
else:
return self._key_fallback(key, False) is not None
return self._key_fallback(key) in self._keymap


class ResultProxy:
cdef class ResultProxy:
"""Wraps a DB-API cursor object to provide easier access to row columns.
Individual columns may be accessed by their integer position,
Expand All @@ -228,15 +210,20 @@ class ResultProxy:
data using sqlalchemy TypeEngine objects, which are referenced from
the originating SQL statement that produced this result set.
"""

def __init__(self, connection, cursor, dialect, result_map=None):
cdef object _dialect
cdef public object _result_map
cdef object _cursor
cdef object _connection
cdef object _rowcount
cdef object _metadata

def __cinit__(self, connection, cursor, dialect, result_map=None):
self._dialect = dialect
self._result_map = result_map
self._cursor = cursor
self._connection = connection
self._rowcount = cursor.rowcount
self._metadata = None
self._weak = None
self._init_metadata()

@property
Expand Down Expand Up @@ -290,10 +277,8 @@ def _init_metadata(self):
cursor_description = self.cursor.description
if cursor_description is not None:
self._metadata = ResultMetaData(self, cursor_description)
self._weak = weakref.ref(self, lambda wr: self.cursor.close())
else:
self.close()
self._weak = None

@property
def returns_rows(self):
Expand Down Expand Up @@ -333,7 +318,9 @@ def close(self):
self.cursor.close()
# allow consistent errors
self._cursor = None
self._weak = None

def __del__(self):
self.close()

def __aiter__(self):
return self
Expand All @@ -353,13 +340,33 @@ def _non_result(self):
else:
raise exc.ResourceClosedError("This result object is closed.")

def _process_rows(self, rows):
process_row = RowProxy
cdef list _process_rows(self, list rows):
cdef int i
cdef list results

results = PyList_New(len(rows))
metadata = self._metadata
keymap = metadata._keymap
processors = metadata._processors
return [process_row(metadata, row, processors, keymap)
for row in rows]
key_to_index = metadata._key_to_index
processors = self._metadata._processors
i = 0
for row in rows:
for processor, index in processors:
value = <object>PyTuple_GET_ITEM(row, index)
if value is not None:
value = processor(value)
Py_XINCREF(<PyObject*>value)

PyTuple_SET_ITEM(row, index, value)
row_proxy = RowProxy(
metadata,
row,
key_to_index
)
Py_XINCREF(<PyObject*>row_proxy)
check = PyList_SET_ITEM(results, i, row_proxy)
i += 1

return results

async def fetchall(self):
"""Fetch all rows, just like DB-API cursor.fetchall()."""
Expand Down
Loading

0 comments on commit c46a973

Please sign in to comment.