-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathnumpy_parser.pyx
185 lines (147 loc) · 5.72 KB
/
numpy_parser.pyx
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
# Copyright DataStax, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
This module provides an optional protocol parser that returns
NumPy arrays.
=============================================================================
This module should not be imported by any of the main python-driver modules,
as numpy is an optional dependency.
=============================================================================
"""
include "ioutils.pyx"
cimport cython
from libc.stdint cimport uint64_t, uint8_t
from cpython.ref cimport Py_INCREF, PyObject
from cassandra.bytesio cimport BytesIOReader
from cassandra.deserializers cimport Deserializer, from_binary
from cassandra.parsing cimport ParseDesc, ColumnParser, RowParser
from cassandra import cqltypes
from cassandra.util import is_little_endian
import numpy as np
cdef extern from "numpyFlags.h":
    # Include 'numpyFlags.h' into the generated C code to disable the
    # deprecated NumPy API
    pass
cdef extern from "Python.h":
    # An integer type large enough to hold a pointer.  ArrDesc (below) uses
    # it for the 'buf_ptr' and 'mask_ptr' fields, which carry raw addresses.
    ctypedef uint64_t Py_uintptr_t
# Simple array descriptor, useful to parse rows into a NumPy array.
# One descriptor exists per result column; make_arrays() fills it in and
# unpack_row() advances 'buf_ptr' and 'mask_ptr' after every row.
# The fields mirror the NumPy record dtype 'arrDescDtype' defined below.
ctypedef struct ArrDesc:
    Py_uintptr_t buf_ptr  # address of the element that will be written next
    int stride # should be large enough as we allocate contiguous arrays
    int is_object  # non-zero when the column array holds Python objects
    Py_uintptr_t mask_ptr  # address of the next mask byte, or 0 if no mask
# NumPy record layout matching the C struct ArrDesc field for field;
# align=True requests C-like padding between the fields.
arrDescDtype = np.dtype(
    [
        ('buf_ptr', np.uintp),
        ('stride', np.dtype('i')),
        ('is_object', np.dtype('i')),
        ('mask_ptr', np.uintp),
    ],
    align=True,
)
# CQL column types that are stored directly in a typed NumPy array.
# Every dtype is big-endian ('>'); make_native_byteorder() converts the
# filled arrays afterwards.  Column types that are not listed here are
# stored in object arrays instead (see make_array()).
_cqltype_to_numpy = {
    cqltypes.LongType: np.dtype('>i8'),
    cqltypes.CounterColumnType: np.dtype('>i8'),
    cqltypes.Int32Type: np.dtype('>i4'),
    cqltypes.ShortType: np.dtype('>i2'),
    cqltypes.FloatType: np.dtype('>f4'),
    cqltypes.DoubleType: np.dtype('>f8'),
}
# dtype of the arrays allocated for column types that have no entry in
# _cqltype_to_numpy
obj_dtype = np.dtype('O')
# Byte value that unpack_row() copies into a mask slot to flag that element
cdef uint8_t mask_true = 0x01
cdef class NumpyParser(ColumnParser):
    """Column parser that decodes a ResultMessage into NumPy arrays."""

    cpdef parse_rows(self, BytesIOReader reader, ParseDesc desc):
        """
        Parse all rows available in `reader` into one array per column.

        The row count is read from `reader` first, then one array per
        column is allocated and filled row by row.  Returns a dict that
        maps each name in `desc.colnames` to its array, after the array
        has been passed through make_native_byteorder().
        """
        cdef Py_ssize_t nrows
        cdef ArrDesc[::1] descriptors
        cdef ArrDesc *first_desc

        nrows = read_int(reader)
        descriptors, columns = make_arrays(desc, nrows)

        # The row parser works on a raw pointer to the descriptor records
        first_desc = &descriptors[0]
        _parse_rows(reader, desc, first_desc, nrows)

        return {
            name: make_native_byteorder(column)
            for name, column in zip(desc.colnames, columns)
        }
cdef _parse_rows(BytesIOReader reader, ParseDesc desc,
                 ArrDesc *arrs, Py_ssize_t rowcount):
    """Call unpack_row() once for each of the `rowcount` rows."""
    cdef Py_ssize_t remaining = rowcount
    while remaining > 0:
        unpack_row(reader, desc, arrs)
        remaining -= 1
### Helper functions to create NumPy arrays and array descriptors
def make_arrays(ParseDesc desc, array_size):
    """
    Allocate arrays for each result column.

    `desc` supplies the number of columns (`desc.rowsize`) and their types
    (`desc.coltypes`); `array_size` is the number of elements allocated for
    every column.

    returns a tuple of (array_descs, arrays), where
        'array_descs' is a NumPy array of `arrDescDtype` records, one per
            column, describing the arrays for the row parser and
        'arrays' is a list of the allocated arrays in column order
            (zip it with the column names to build a dict that
            can be fed into pandas.DataFrame)
    """
    array_descs = np.empty((desc.rowsize,), arrDescDtype)
    arrays = []

    for i, coltype in enumerate(desc.coltypes):
        arr = make_array(coltype, array_size)
        array_descs[i]['buf_ptr'] = arr.ctypes.data
        array_descs[i]['stride'] = arr.strides[0]
        # Compare by dtype kind (as make_native_byteorder does) rather than
        # by object identity, which only holds while NumPy hands out the
        # very same dtype instance.
        array_descs[i]['is_object'] = arr.dtype.kind == 'O'
        try:
            array_descs[i]['mask_ptr'] = arr.mask.ctypes.data
        except AttributeError:
            # Not a masked array (object column): there is no mask buffer
            array_descs[i]['mask_ptr'] = 0
        arrays.append(arr)

    return array_descs, arrays
def make_array(coltype, array_size):
    """
    Allocate a new NumPy array of the given column type and size.

    Column types listed in `_cqltype_to_numpy` get a masked array of the
    mapped dtype whose mask is an explicit all-False boolean array; every
    other column type gets a plain object array.
    """
    # Keep the try body down to the only lookup that may raise KeyError
    try:
        dtype = _cqltype_to_numpy[coltype]
    except KeyError:
        return np.empty((array_size,), dtype=obj_dtype)

    a = np.ma.empty((array_size,), dtype=dtype)
    # Use the builtin `bool`: the `np.bool` alias was deprecated in
    # NumPy 1.20 and removed in NumPy 1.24, where it raises AttributeError.
    a.mask = np.zeros((array_size,), dtype=bool)
    return a
#### Parse rows into NumPy arrays
@cython.boundscheck(False)
@cython.wraparound(False)
cdef inline int unpack_row(
        BytesIOReader reader, ParseDesc desc, ArrDesc *arrays) except -1:
    """
    Read one row from `reader` and store each column value in its array.

    For every column the value is written at the descriptor's current
    position, after which the descriptor is advanced to the next element.
    Returns 0; -1 is reserved for a propagated exception.
    """
    cdef Buffer buf
    cdef Py_ssize_t col
    cdef Py_ssize_t ncols = desc.rowsize
    cdef ArrDesc *slot
    cdef Deserializer deserializer

    for col in range(ncols):
        get_buf(reader, &buf)
        slot = &arrays[col]

        if slot.is_object:
            # Object column: deserialize and store a pointer to the result
            deserializer = desc.deserializers[col]
            val = from_binary(deserializer, &buf, desc.protocol_version)
            # The array slot keeps its own reference to the value
            Py_INCREF(val)
            (<PyObject **> slot.buf_ptr)[0] = <PyObject *> val
        elif buf.size < 0:
            # Negative size (presumably a null value): flag the element in
            # the mask and leave the data slot untouched
            memcpy(<char *> slot.mask_ptr, &mask_true, 1)
        else:
            # Typed column: copy the raw bytes straight into the array
            memcpy(<char *> slot.buf_ptr, buf.ptr, buf.size)

        # Move both write positions on to the next element of this column
        slot.buf_ptr += slot.stride
        slot.mask_ptr += 1

    return 0
def make_native_byteorder(arr):
    """
    Make sure all values have a native endian in the NumPy arrays.

    On little-endian hosts a non-object array is byte-swapped and returned
    with the opposite byte order in its dtype; object arrays, and every
    array on big-endian hosts, are returned unchanged.
    """
    if is_little_endian and arr.dtype.kind != 'O':
        # We have arrays in big-endian order. First swap the bytes
        # into little endian order, and then update the numpy dtype
        # accordingly (e.g. from '>i8' to '<i8')
        #
        # Ignore any object arrays of dtype('O')
        #
        # ndarray.newbyteorder() was removed in NumPy 2.0; viewing the
        # swapped data with the byte-swapped dtype is the documented
        # replacement and also works on older NumPy versions.
        return arr.byteswap().view(arr.dtype.newbyteorder())
    return arr