Skip to content

Commit

Permalink
pyarrow backend scanning for pandas
Browse files Browse the repository at this point in the history
formatting checks

remove disabled tests

more clang format fixes...

py lint check

clang tidy

more clang tidy and py lint checks

more and more clang tidy

explicit pyarrow scan ctor

possibly fixed tests not running?

CI fixes

fix pytest

non portable type resolution solution

apple clang test fix?

add some requested changes

apply backend switching

remove fixed list
  • Loading branch information
mxwli committed Mar 21, 2024
1 parent 752871d commit ab267a1
Show file tree
Hide file tree
Showing 22 changed files with 1,472 additions and 19 deletions.
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,9 @@ nodejs:
python:
$(call run-cmake-release, -DBUILD_PYTHON=TRUE)

python-debug:
$(call run-cmake-debug, -DBUILD_PYTHON=TRUE)

rust:
ifeq ($(OS),Windows_NT)
set KUZU_TESTING=1
Expand Down
5 changes: 4 additions & 1 deletion src/common/arrow/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
add_library(kuzu_common_arrow
OBJECT
arrow_array_scan.cpp
arrow_converter.cpp
arrow_null_mask_tree.cpp
arrow_row_batch.cpp
arrow_converter.cpp)
arrow_type.cpp)

set(ALL_OBJECT_FILES
${ALL_OBJECT_FILES} $<TARGET_OBJECTS:kuzu_common_arrow>
Expand Down
544 changes: 544 additions & 0 deletions src/common/arrow/arrow_array_scan.cpp

Large diffs are not rendered by default.

214 changes: 214 additions & 0 deletions src/common/arrow/arrow_null_mask_tree.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,214 @@
#include <vector>

#include "common/arrow/arrow.h"
#include "common/arrow/arrow_nullmask_tree.h"
#include "common/null_mask.h"

namespace kuzu {
namespace common {

// scans are based on data specification found here
// https://arrow.apache.org/docs/format/Columnar.html

// all offsets are measured by value, not physical size

void ArrowNullMaskTree::copyToValueVector(ValueVector* vec, int64_t dstOffset, int64_t cnt) {
vec->setNullFromBits(mask->getData(), offset, dstOffset, cnt);
}

ArrowNullMaskTree ArrowNullMaskTree::operator+(int64_t offset) {
// this operation is mostly a special case for dictionary/run-end encoding
ArrowNullMaskTree ret(*this);
ret.offset += offset;
return ret;
}

bool ArrowNullMaskTree::copyFromBuffer(const void* buffer, int64_t srcOffset, int64_t cnt) {
if (buffer == nullptr) {
mask->setAllNonNull();
return false;
}
mask->copyFromNullBits((const uint64_t*)buffer, srcOffset, 0, cnt, true);
return true;
}

bool ArrowNullMaskTree::applyParentBitmap(const NullMask* parent, int64_t cnt) {
if (parent == nullptr) {
return false;
}
const uint64_t* buffer = parent->data;
if (buffer != nullptr) {
for (int64_t i = 0; i < (cnt >> NullMask::NUM_BITS_PER_NULL_ENTRY_LOG2); i++) {
mask->buffer[i] |= buffer[i];
}
return true;
}
return false;
}

template<typename offsetsT>
void ArrowNullMaskTree::scanListPushDown(
const ArrowSchema* schema, const ArrowArray* array, int64_t srcOffset, int64_t cnt) {
const offsetsT* offsets = ((const offsetsT*)array->buffers[1]) + srcOffset;
offsetsT auxiliaryLength = offsets[cnt] - offsets[0];
NullMask pushDownMask((auxiliaryLength + NullMask::NUM_BITS_PER_NULL_ENTRY - 1) >>
NullMask::NUM_BITS_PER_NULL_ENTRY_LOG2);
for (int64_t i = 0; i < cnt; i++) {
pushDownMask.setNullFromRange(offsets[i], offsets[i + 1] - offsets[i], isNull(i));
}
children->push_back(ArrowNullMaskTree(
schema->children[0], array->children[0], offsets[0], auxiliaryLength, &pushDownMask));
}

void ArrowNullMaskTree::scanStructPushDown(
const ArrowSchema* schema, const ArrowArray* array, int64_t srcOffset, int64_t cnt) {
for (int64_t i = 0; i < array->n_children; i++) {
children->push_back(
ArrowNullMaskTree(schema->children[i], array->children[i], srcOffset, cnt, mask.get()));
}
}

ArrowNullMaskTree::ArrowNullMaskTree(const ArrowSchema* schema, const ArrowArray* array,
int64_t srcOffset, int64_t cnt, const NullMask* parentBitmap)
: offset{0}, mask{std::make_shared<common::NullMask>(
(cnt + NullMask::NUM_BITS_PER_NULL_ENTRY - 1) >>
NullMask::NUM_BITS_PER_NULL_ENTRY_LOG2)},
children(std::make_shared<std::vector<ArrowNullMaskTree>>()) {
if (schema->dictionary != nullptr) {
copyFromBuffer(array->buffers[0], srcOffset, cnt);
applyParentBitmap(parentBitmap, cnt);
dictionary = std::make_shared<ArrowNullMaskTree>(schema->dictionary, array->dictionary,
array->dictionary->offset, array->dictionary->length);
return;
}
const char* arrowType = schema->format;
std::vector<common::StructField> structFields;
switch (arrowType[0]) {
case 'n':
mask->setAllNull();
break;
case 'b':
case 'c':
case 'C':
case 's':
case 'S':
case 'i':
case 'I':
case 'l':
case 'L':
case 'f':
case 'g':
copyFromBuffer(array->buffers[0], srcOffset, cnt);
break;
case 'z':
case 'Z':
case 'u':
case 'U':
case 'v':
case 'w':
case 't':
copyFromBuffer(array->buffers[0], srcOffset, cnt);
applyParentBitmap(parentBitmap, cnt);
break;
case '+':
switch (arrowType[1]) {
case 'l':
copyFromBuffer(array->buffers[0], srcOffset, cnt);
applyParentBitmap(parentBitmap, cnt);
scanListPushDown<int32_t>(schema, array, srcOffset, cnt);
break;
case 'L':
copyFromBuffer(array->buffers[0], srcOffset, cnt);
applyParentBitmap(parentBitmap, cnt);
scanListPushDown<int64_t>(schema, array, srcOffset, cnt);
break;
case 'w':
// TODO manh: array null resolution
KU_UNREACHABLE;
copyFromBuffer(array->buffers[0], srcOffset, cnt);
applyParentBitmap(parentBitmap, cnt);
break;
case 's':
copyFromBuffer(array->buffers[0], srcOffset, cnt);
applyParentBitmap(parentBitmap, cnt);
scanStructPushDown(schema, array, srcOffset, cnt);
break;
case 'm':
// TODO maxwell bind map types
KU_UNREACHABLE;
case 'u': {
const int8_t* types = (const int8_t*)array->buffers[0];
if (schema->format[2] == 'd') {
const int32_t* offsets = (const int32_t*)array->buffers[1];
std::vector<int32_t> count(array->n_children), lowestOffsets(array->n_children);
std::vector<int32_t> highestOffsets(array->n_children);
for (int64_t i = srcOffset; i < srcOffset + cnt; i++) {
int32_t curOffset = offsets[i];
int32_t curType = types[i];
if (count[curType] == 0) {
lowestOffsets[curType] = curOffset;
}
highestOffsets[curType] = curOffset;
count[curType]++;
}
for (int64_t i = 0; i < array->n_children; i++) {
children->push_back(ArrowNullMaskTree(schema->children[i], array->children[i],
lowestOffsets[i], highestOffsets[i] - lowestOffsets[i]));
}
for (int64_t i = srcOffset; i < srcOffset + cnt; i++) {
int32_t curOffset = offsets[i];
int8_t curType = types[i];
mask->setNull(i, children->operator[](curType).isNull(curOffset));
}
} else {
for (int64_t i = 0; i < array->n_children; i++) {
children->push_back(
ArrowNullMaskTree(schema->children[i], array->children[i], srcOffset, cnt));
}
for (int64_t i = srcOffset; i < srcOffset + cnt; i++) {
int8_t curType = types[i];
mask->setNull(i, children->operator[](curType).isNull(i));
// this isn't specified in the arrow specification, but is it valid to
// compute this using a bitwise OR?
}
}
if (parentBitmap != nullptr) {
for (int64_t i = 0; i < cnt >> NullMask::NUM_BITS_PER_NULL_ENTRY_LOG2; i++) {
mask->buffer[i] |= parentBitmap->buffer[i];
}
}
} break;
case 'v':
// list views *suck*, especially when trying to write code that can support
// parallelization for this, we generate child NullMaskTrees on the fly, rather than
// attempt any precomputation
if (array->buffers[0] == nullptr) {
mask->setAllNonNull();
} else {
mask->copyFromNullBits((const uint64_t*)array->buffers[0], srcOffset, 0, cnt, true);
}
if (parentBitmap != nullptr) {
for (int64_t i = 0; i < cnt >> NullMask::NUM_BITS_PER_NULL_ENTRY_LOG2; i++) {
mask->buffer[i] |= parentBitmap->buffer[i];
}
}
break;
case 'r':
// it's better to resolve validity during the actual scanning for run-end encoded arrays
// so for this, let's just resolve child validities and move on
for (int64_t i = 0; i < array->n_children; i++) {
children->push_back(ArrowNullMaskTree(schema->children[i], array->children[i],
array->children[i]->offset, array->children[i]->length));
}
break;
default:
KU_UNREACHABLE;
}
break;
default:
KU_UNREACHABLE;
}
}

} // namespace common
} // namespace kuzu
150 changes: 150 additions & 0 deletions src/common/arrow/arrow_type.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
#include "common/arrow/arrow_converter.h"

#include "common/exception/not_implemented.h"
#include "common/exception/runtime.h"

namespace kuzu {
namespace common {

// pyarrow format string specifications can be found here
// https://arrow.apache.org/docs/format/CDataInterface.html#data-type-description-format-strings

LogicalType ArrowConverter::fromArrowSchema(const ArrowSchema* schema) {
const char* arrowType = schema->format;
std::vector<StructField> structFields;
// if we have a dictionary, then the logical type of the column is dependent upon the
// logical type of the dict
if (schema->dictionary != nullptr) {
return fromArrowSchema(schema->dictionary);
}
switch (arrowType[0]) {
case 'n':
return LogicalType(LogicalTypeID::ANY);
case 'b':
return LogicalType(LogicalTypeID::BOOL);
case 'c':
return LogicalType(LogicalTypeID::INT8);
case 'C':
return LogicalType(LogicalTypeID::UINT8);
case 's':
return LogicalType(LogicalTypeID::INT16);
case 'S':
return LogicalType(LogicalTypeID::UINT16);
case 'i':
return LogicalType(LogicalTypeID::INT32);
case 'I':
return LogicalType(LogicalTypeID::UINT32);
case 'l':
return LogicalType(LogicalTypeID::INT64);
case 'L':
return LogicalType(LogicalTypeID::UINT64);
case 'f':
return LogicalType(LogicalTypeID::FLOAT);
case 'g':
return LogicalType(LogicalTypeID::DOUBLE);
case 'z':
case 'Z':
return LogicalType(LogicalTypeID::BLOB);
case 'u':
case 'U':
return LogicalType(LogicalTypeID::STRING);
case 'v':
switch (arrowType[1]) {
case 'z':
return LogicalType(LogicalTypeID::BLOB);
case 'u':
return LogicalType(LogicalTypeID::STRING);
default:
KU_UNREACHABLE;
}

case 'd':
throw NotImplementedException("custom bitwidth decimals are not supported");
case 'w':
return LogicalType(LogicalTypeID::BLOB); // fixed width binary
case 't':
switch (arrowType[1]) {
case 'd':
if (arrowType[2] == 'D') {
return LogicalType(LogicalTypeID::DATE);
} else {
return LogicalType(LogicalTypeID::TIMESTAMP_MS);
}
case 't':
// TODO implement pure time type
throw NotImplementedException("Pure time types are not supported");
case 's':
// TODO maxwell: timezone support
switch (arrowType[2]) {
case 's':
return LogicalType(LogicalTypeID::TIMESTAMP_SEC);
case 'm':
return LogicalType(LogicalTypeID::TIMESTAMP_MS);
case 'u':
return LogicalType(LogicalTypeID::TIMESTAMP);
case 'n':
return LogicalType(LogicalTypeID::TIMESTAMP_NS);
default:
KU_UNREACHABLE;
}
case 'D':
// duration
case 'i':
// interval
return LogicalType(LogicalTypeID::INTERVAL);
default:
KU_UNREACHABLE;
}
case '+':
KU_ASSERT(schema->n_children > 0);
switch (arrowType[1]) {
// complex types need a complementary ExtraTypeInfo object
case 'l':
case 'L':
return *LogicalType::VAR_LIST(
std::make_unique<LogicalType>(fromArrowSchema(schema->children[0])));
case 'w':
throw RuntimeException("Fixed list is currently WIP.");
// TODO Manh: Array Binding
// return *LogicalType::FIXED_LIST(
// std::make_unique<LogicalType>(fromArrowSchema(schema->children[0])),
// std::stoi(arrowType+3));
case 's':
for (int64_t i = 0; i < schema->n_children; i++) {
structFields.emplace_back(std::string(schema->children[i]->name),
std::make_unique<LogicalType>(fromArrowSchema(schema->children[i])));
}
return *LogicalType::STRUCT(std::move(structFields));
case 'm':
// TODO maxwell bind map types
throw NotImplementedException("Scanning Arrow Map types is not supported");
case 'u':
throw RuntimeException("Unions are currently WIP.");
for (int64_t i = 0; i < schema->n_children; i++) {
structFields.emplace_back(std::string(schema->children[i]->name),
std::make_unique<LogicalType>(fromArrowSchema(schema->children[i])));
}
return *LogicalType::UNION(std::move(structFields));
case 'v':
switch (arrowType[2]) {
case 'l':
case 'L':
return *LogicalType::VAR_LIST(
std::make_unique<LogicalType>(fromArrowSchema(schema->children[0])));
default:
KU_UNREACHABLE;
}
case 'r':
// logical type corresponds to second child
return fromArrowSchema(schema->children[1]);
default:
KU_UNREACHABLE;
}
default:
KU_UNREACHABLE;
}
// refer to arrow_converted.cpp:65
}

} // namespace common
} // namespace kuzu
4 changes: 2 additions & 2 deletions src/common/null_mask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,8 @@ void NullMask::resize(uint64_t capacity) {
}

bool NullMask::copyFromNullBits(const uint64_t* srcNullEntries, uint64_t srcOffset,
uint64_t dstOffset, uint64_t numBitsToCopy) {
if (copyNullMask(srcNullEntries, srcOffset, this->data, dstOffset, numBitsToCopy)) {
uint64_t dstOffset, uint64_t numBitsToCopy, bool invert) {
if (copyNullMask(srcNullEntries, srcOffset, this->data, dstOffset, numBitsToCopy, invert)) {
this->mayContainNulls = true;
return true;
}
Expand Down
Loading

0 comments on commit ab267a1

Please sign in to comment.