diff --git a/CMakeLists.txt b/CMakeLists.txt index 79a6044af36..a23a4b08af6 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -196,7 +196,7 @@ endif () if(${BUILD_KUZU}) add_definitions(-DKUZU_ROOT_DIRECTORY="${PROJECT_SOURCE_DIR}") add_definitions(-DKUZU_CMAKE_VERSION="${CMAKE_PROJECT_VERSION}") -add_definitions(-DKUZU_EXTENSION_VERSION="0.2.0") +add_definitions(-DKUZU_EXTENSION_VERSION="0.2.2") include_directories(src/include) include_directories(third_party/antlr4_cypher/include) diff --git a/src/binder/bind/bind_copy.cpp b/src/binder/bind/bind_copy.cpp index 53816a96f50..3f605f5804b 100644 --- a/src/binder/bind/bind_copy.cpp +++ b/src/binder/bind/bind_copy.cpp @@ -6,7 +6,6 @@ #include "catalog/catalog_entry/rel_table_catalog_entry.h" #include "common/enums/table_type.h" #include "common/exception/binder.h" -#include "common/exception/message.h" #include "common/string_format.h" #include "function/table/bind_input.h" #include "main/client_context.h" diff --git a/src/binder/bind_expression/bind_variable_expression.cpp b/src/binder/bind_expression/bind_variable_expression.cpp index d80db3a5e35..bfb3914634e 100644 --- a/src/binder/bind_expression/bind_variable_expression.cpp +++ b/src/binder/bind_expression/bind_variable_expression.cpp @@ -1,5 +1,4 @@ #include "binder/binder.h" -#include "binder/expression/literal_expression.h" #include "binder/expression/variable_expression.h" #include "binder/expression_binder.h" #include "common/exception/binder.h" diff --git a/src/common/arrow/CMakeLists.txt b/src/common/arrow/CMakeLists.txt index c3281117ebf..34f81286f20 100644 --- a/src/common/arrow/CMakeLists.txt +++ b/src/common/arrow/CMakeLists.txt @@ -1,7 +1,10 @@ add_library(kuzu_common_arrow OBJECT + arrow_array_scan.cpp + arrow_converter.cpp + arrow_null_mask_tree.cpp arrow_row_batch.cpp - arrow_converter.cpp) + arrow_type.cpp) set(ALL_OBJECT_FILES ${ALL_OBJECT_FILES} $ diff --git a/src/common/arrow/arrow_array_scan.cpp b/src/common/arrow/arrow_array_scan.cpp new file mode 100644 index 00000000000..556cf2a24c2 --- /dev/null +++ b/src/common/arrow/arrow_array_scan.cpp @@ -0,0 +1,549 @@ +#include "common/arrow/arrow_converter.h" +#include "common/types/interval_t.h" +#include "common/types/types.h" +#include "common/vector/value_vector.h" + +namespace kuzu { +namespace common { + +// scans are based on data specification found here +// https://arrow.apache.org/docs/format/Columnar.html + +// all offsets are measured by value, not physical size + +template +static void scanArrowArrayFixedSizePrimitive(const ArrowArray* array, ValueVector& outputVector, + ArrowNullMaskTree* mask, uint64_t srcOffset, uint64_t dstOffset, uint64_t count) { + auto arrayBuffer = (const uint8_t*)array->buffers[1]; + mask->copyToValueVector(&outputVector, dstOffset, count); + memcpy(outputVector.getData() + dstOffset * outputVector.getNumBytesPerValue(), + arrayBuffer + srcOffset * sizeof(T), count * sizeof(T)); +} + +template<> +void scanArrowArrayFixedSizePrimitive(const ArrowArray* array, ValueVector& outputVector, + ArrowNullMaskTree* mask, uint64_t srcOffset, uint64_t dstOffset, uint64_t count) { + auto arrayBuffer = (const uint8_t*)array->buffers[1]; + mask->copyToValueVector(&outputVector, dstOffset, count); + for (int64_t i = 0; i < count; i++) { + outputVector.setValue( + i + dstOffset, NullMask::isNull((const uint64_t*)arrayBuffer, i + srcOffset)); + } +} + +static void scanArrowArrayDurationScaledUp(const ArrowArray* array, ValueVector& outputVector, + ArrowNullMaskTree* mask, int64_t scaleFactor, uint64_t 
srcOffset, uint64_t dstOffset,
+    uint64_t count) {
+    auto arrayBuffer = ((const int64_t*)array->buffers[1]) + srcOffset;
+    mask->copyToValueVector(&outputVector, dstOffset, count);
+    for (int64_t i = 0; i < count; i++) {
+        if (!mask->isNull(i)) {
+            auto curValue = arrayBuffer[i];
+            outputVector.setValue<interval_t>(
+                i + dstOffset, interval_t(0, 0, curValue * scaleFactor));
+        }
+    }
+}
+
+static void scanArrowArrayDurationScaledDown(const ArrowArray* array, ValueVector& outputVector,
+    ArrowNullMaskTree* mask, int64_t scaleFactor, uint64_t srcOffset, uint64_t dstOffset,
+    uint64_t count) {
+    auto arrayBuffer = ((const int64_t*)array->buffers[1]) + srcOffset;
+    mask->copyToValueVector(&outputVector, dstOffset, count);
+    for (int64_t i = 0; i < count; i++) {
+        if (!mask->isNull(i)) {
+            auto curValue = arrayBuffer[i];
+            outputVector.setValue<interval_t>(
+                i + dstOffset, interval_t(0, 0, curValue / scaleFactor));
+        }
+    }
+}
+
+static void scanArrowArrayMonthInterval(const ArrowArray* array, ValueVector& outputVector,
+    ArrowNullMaskTree* mask, uint64_t srcOffset, uint64_t dstOffset, uint64_t count) {
+    auto arrayBuffer = ((const int32_t*)array->buffers[1]) + srcOffset;
+    mask->copyToValueVector(&outputVector, dstOffset, count);
+    for (int64_t i = 0; i < count; i++) {
+        if (!mask->isNull(i)) {
+            auto curValue = arrayBuffer[i];
+            outputVector.setValue<interval_t>(i + dstOffset, interval_t(curValue, 0, 0));
+        }
+    }
+}
+
+static void scanArrowArrayDayTimeInterval(const ArrowArray* array, ValueVector& outputVector,
+    ArrowNullMaskTree* mask, uint64_t srcOffset, uint64_t dstOffset, uint64_t count) {
+    auto arrayBuffer = ((const int64_t*)array->buffers[1]) + srcOffset;
+    mask->copyToValueVector(&outputVector, dstOffset, count);
+    for (int64_t i = 0; i < count; i++) {
+        if (!mask->isNull(i)) {
+            int64_t curValue = arrayBuffer[i];
+            int32_t day = curValue;
+            int64_t micros = (curValue >> (4 * sizeof(int64_t))) * 1000;
+            // arrow stores ms, while we store us
+            outputVector.setValue<interval_t>(i + dstOffset, interval_t(0, day, micros));
+        }
+    }
+}
+
+static void scanArrowArrayMonthDayNanoInterval(const ArrowArray* array, ValueVector& outputVector,
+    ArrowNullMaskTree* mask, uint64_t srcOffset, uint64_t dstOffset, uint64_t count) {
+    auto arrayBuffer =
+        (const int64_t*)((const uint8_t*)array->buffers[1] + srcOffset * 16); // 16 bytes per value
+    mask->copyToValueVector(&outputVector, dstOffset, count);
+    for (int64_t i = 0; i < count; i++) {
+        if (!mask->isNull(i)) {
+            int64_t curValue = arrayBuffer[2 * i];
+            int32_t month = curValue;
+            int32_t day = curValue >> (4 * sizeof(int64_t));
+            int64_t micros = arrayBuffer[2 * i + 1] / 1000;
+            outputVector.setValue<interval_t>(i + dstOffset, interval_t(month, day, micros));
+        }
+    }
+}
+
+template<typename offsetsT>
+static void scanArrowArrayBLOB(const ArrowArray* array, ValueVector& outputVector,
+    ArrowNullMaskTree* mask, uint64_t srcOffset, uint64_t dstOffset, uint64_t count) {
+    auto offsets = ((const offsetsT*)array->buffers[1]) + srcOffset;
+    auto arrayBuffer = (const uint8_t*)array->buffers[2];
+    mask->copyToValueVector(&outputVector, dstOffset, count);
+    for (int64_t i = 0; i < count; i++) {
+        if (!mask->isNull(i)) {
+            auto curOffset = offsets[i], nextOffset = offsets[i + 1];
+            const uint8_t* data = arrayBuffer + curOffset;
+            auto length = nextOffset - curOffset;
+            BlobVector::addBlob(&outputVector, i + dstOffset, data, length);
+        }
+    }
+}
+
+static void scanArrowArrayBLOBView(const ArrowArray* array, ValueVector& outputVector,
+    ArrowNullMaskTree* mask, uint64_t srcOffset, uint64_t dstOffset, uint64_t count) {
+    auto arrayBuffer
= (const uint8_t*)(array->buffers[1]); + auto valueBuffs = (const uint8_t**)(array->buffers + 2); + // BLOB value buffers begin from index 2 onwards + mask->copyToValueVector(&outputVector, dstOffset, count); + for (int64_t i = 0; i < count; i++) { + if (!mask->isNull(i)) { + auto curView = (const int32_t*)(arrayBuffer + (i + srcOffset) * 16); + // view structures are 16 bytes long + auto viewLength = curView[0]; + if (viewLength <= 12) { + BlobVector::addBlob( + &outputVector, i + dstOffset, (uint8_t*)(curView + 1), viewLength); + } else { + auto bufIndex = curView[2]; + auto offset = curView[3]; + BlobVector::addBlob( + &outputVector, i + dstOffset, valueBuffs[bufIndex] + offset, viewLength); + } + } + } +} + +static void scanArrowArrayFixedBLOB(const ArrowArray* array, ValueVector& outputVector, + ArrowNullMaskTree* mask, int64_t BLOBsize, uint64_t srcOffset, uint64_t dstOffset, + uint64_t count) { + auto arrayBuffer = ((const uint8_t*)array->buffers[1]) + srcOffset * BLOBsize; + mask->copyToValueVector(&outputVector, dstOffset, count); + for (int64_t i = 0; i < count; i++) { + if (!mask->isNull(i)) { + BlobVector::addBlob(&outputVector, i + dstOffset, arrayBuffer + i * BLOBsize, BLOBsize); + } + } +} + +template +static void scanArrowArrayVarList(const ArrowSchema* schema, const ArrowArray* array, + ValueVector& outputVector, ArrowNullMaskTree* mask, uint64_t srcOffset, uint64_t dstOffset, + uint64_t count) { + auto offsets = ((const offsetsT*)array->buffers[1]) + srcOffset; + mask->copyToValueVector(&outputVector, dstOffset, count); + ValueVector* auxiliaryBuffer = ListVector::getDataVector(&outputVector); + for (int64_t i = 0; i < count; i++) { + auto curOffset = offsets[i], nextOffset = offsets[i + 1]; + if (!mask->isNull(i)) { + auto newEntry = ListVector::addList(&outputVector, nextOffset - curOffset); + outputVector.setValue(i + dstOffset, newEntry); + } + } + ArrowConverter::fromArrowArray(schema->children[0], array->children[0], *auxiliaryBuffer, + mask->getChild(0), offsets[0], 0u, offsets[count] - offsets[0]); +} + +template +static void scanArrowArrayVarListView(const ArrowSchema* schema, const ArrowArray* array, + ValueVector& outputVector, ArrowNullMaskTree* mask, uint64_t srcOffset, uint64_t dstOffset, + uint64_t count) { + auto offsets = ((const offsetsT*)array->buffers[1]) + srcOffset; + auto sizes = ((const offsetsT*)array->buffers[2]) + srcOffset; + mask->copyToValueVector(&outputVector, dstOffset, count); + ValueVector* auxiliaryBuffer = ListVector::getDataVector(&outputVector); + for (int64_t i = 0; i < count; i++) { + if (!mask->isNull(i)) { + auto curOffset = offsets[i], size = sizes[i]; + auto newEntry = ListVector::addList(&outputVector, size); + outputVector.setValue(i + dstOffset, newEntry); + ArrowNullMaskTree childTree(schema->children[0], array->children[0], srcOffset, count); + // make our own child here. 
precomputing through the mask tree is too complicated + ArrowConverter::fromArrowArray(schema->children[0], array->children[0], + *auxiliaryBuffer, &childTree, curOffset, newEntry.offset, newEntry.size); + } + } +} +/* +TODO manh: scan arrow fixed list +static void scanArrowArrayFixedList(const ArrowSchema* schema, const ArrowArray* array, + ValueVector& outputVector, ArrowNullMaskTree* mask, uint64_t srcOffset, uint64_t dstOffset, + uint64_t count) { + mask->copyToValueVector(&outputVector, dstOffset, count); + int64_t numValuesInList = FixedListType::getNumValuesInList(&outputVector.dataType); + ArrowConverter::fromArrowArray(schema->children[0], array->children[0], outputVector, + mask->getChild(0), srcOffset * numValuesInList, dstOffset * numValuesInList, + count * numValuesInList); +} +*/ + +static void scanArrowArrayStruct(const ArrowSchema* schema, const ArrowArray* array, + ValueVector& outputVector, ArrowNullMaskTree* mask, uint64_t srcOffset, uint64_t dstOffset, + uint64_t count) { + mask->copyToValueVector(&outputVector, dstOffset, count); + for (int64_t i = 0; i < count; i++) { + if (!mask->isNull(i)) { + outputVector.setValue(i + dstOffset, + i + dstOffset); // struct_entry_t doesn't work for some reason + } + } + for (int64_t j = 0; j < schema->n_children; j++) { + ArrowConverter::fromArrowArray(schema->children[j], array->children[j], + *StructVector::getFieldVector(&outputVector, j).get(), mask->getChild(j), srcOffset, + dstOffset, count); + } +} + +static void scanArrowArrayDenseUnion(const ArrowSchema* schema, const ArrowArray* array, + ValueVector& outputVector, ArrowNullMaskTree* mask, uint64_t srcOffset, uint64_t dstOffset, + uint64_t count) { + auto types = ((const int8_t*)array->buffers[0]) + srcOffset; + auto offsets = ((const int32_t*)array->buffers[1]) + srcOffset; + mask->copyToValueVector(&outputVector, dstOffset, count); + for (int64_t i = 0; i < count; i++) { + if (!mask->isNull(i)) { + auto curType = types[i]; + auto curOffset = offsets[i]; + UnionVector::setTagField(&outputVector, curType); + ArrowConverter::fromArrowArray(schema->children[curType], array->children[curType], + *StructVector::getFieldVector(&outputVector, curType).get(), + mask->getChild(curType), curOffset + srcOffset, i + dstOffset, 1); + // may be inefficient, since we're only scanning a single value + // should probably ask if we support dense unions (ie. is it okay to pack them) + } + } +} + +static void scanArrowArraySparseUnion(const ArrowSchema* schema, const ArrowArray* array, + ValueVector& outputVector, ArrowNullMaskTree* mask, uint64_t srcOffset, uint64_t dstOffset, + uint64_t count) { + auto types = ((const int8_t*)array->buffers[0]) + srcOffset; + mask->copyToValueVector(&outputVector, dstOffset, count); + for (int64_t i = 0; i < count; i++) { + if (!mask->isNull(i)) { + auto curType = types[i]; + UnionVector::setTagField(&outputVector, curType); + } + } + // it is specified that values that aren't selected in the type buffer + // must also be semantically correct. this is why this scanning works. + // however, there is possibly room for optimization here. + // eg. 
nulling out unselected children + for (int8_t i = 0; i < array->n_children; i++) { + ArrowConverter::fromArrowArray(schema->children[i], array->children[i], + *UnionVector::getValVector(&outputVector, i), mask->getChild(i), srcOffset, dstOffset, + count); + } +} + +template +static void scanArrowArrayDictionaryEncoded(const ArrowSchema* schema, const ArrowArray* array, + ValueVector& outputVector, ArrowNullMaskTree* mask, uint64_t srcOffset, uint64_t dstOffset, + uint64_t count) { + + auto values = ((const offsetsT*)array->buffers[1]) + srcOffset; + mask->copyToValueVector(&outputVector, dstOffset, count); + for (int64_t i = 0; i < count; i++) { + if (!mask->isNull(i)) { + auto dictOffseted = (*mask->getDictionary()) + values[i]; + ArrowConverter::fromArrowArray(schema->dictionary, array->dictionary, outputVector, + &dictOffseted, values[i] + array->dictionary->offset, i + dstOffset, + 1); // possibly inefficient? + } + } +} + +static void scanArrowArrayRunEndEncoded(const ArrowSchema* schema, const ArrowArray* array, + ValueVector& outputVector, ArrowNullMaskTree* mask, uint64_t srcOffset, uint64_t dstOffset, + uint64_t count) { + + const ArrowArray* runEndArray = array->children[0]; + auto runEndBuffer = (const int32_t*)runEndArray->buffers[1]; + + // binary search run end corresponding to srcOffset + auto runEndIdx = runEndArray->offset; + { + auto L = runEndArray->offset, H = L + runEndArray->length; + while (H >= L) { + auto M = (H + L) >> 1; + if (runEndBuffer[M] < srcOffset) { + runEndIdx = M; + H = M - 1; + } else { + L = M + 1; + } + } + } + + for (int64_t i = 0; i < count; i++) { + while (i + srcOffset >= runEndBuffer[runEndIdx + 1]) { + runEndIdx++; + } + auto valuesOffseted = (*mask->getChild(1)) + runEndIdx; + ArrowConverter::fromArrowArray(schema->children[1], array->children[1], outputVector, + &valuesOffseted, runEndIdx, i + dstOffset, + 1); // there is optimization to be made here... 
+ } +} + +void ArrowConverter::fromArrowArray(const ArrowSchema* schema, const ArrowArray* array, + ValueVector& outputVector, ArrowNullMaskTree* mask, uint64_t srcOffset, uint64_t dstOffset, + uint64_t count) { + const auto arrowType = schema->format; + if (array->dictionary != nullptr) { + switch (arrowType[0]) { + case 'c': + return scanArrowArrayDictionaryEncoded( + schema, array, outputVector, mask, srcOffset, dstOffset, count); + case 'C': + return scanArrowArrayDictionaryEncoded( + schema, array, outputVector, mask, srcOffset, dstOffset, count); + case 's': + return scanArrowArrayDictionaryEncoded( + schema, array, outputVector, mask, srcOffset, dstOffset, count); + case 'S': + return scanArrowArrayDictionaryEncoded( + schema, array, outputVector, mask, srcOffset, dstOffset, count); + case 'i': + return scanArrowArrayDictionaryEncoded( + schema, array, outputVector, mask, srcOffset, dstOffset, count); + case 'I': + return scanArrowArrayDictionaryEncoded( + schema, array, outputVector, mask, srcOffset, dstOffset, count); + case 'l': + return scanArrowArrayDictionaryEncoded( + schema, array, outputVector, mask, srcOffset, dstOffset, count); + case 'L': + return scanArrowArrayDictionaryEncoded( + schema, array, outputVector, mask, srcOffset, dstOffset, count); + default: + throw RuntimeException("Invalid Index Type: " + std::string(arrowType)); + } + } + switch (arrowType[0]) { + case 'n': + // NULL + outputVector.setAllNull(); + return; + case 'b': + // BOOL + return scanArrowArrayFixedSizePrimitive( + array, outputVector, mask, srcOffset, dstOffset, count); + case 'c': + // INT8 + return scanArrowArrayFixedSizePrimitive( + array, outputVector, mask, srcOffset, dstOffset, count); + case 'C': + // UINT8 + return scanArrowArrayFixedSizePrimitive( + array, outputVector, mask, srcOffset, dstOffset, count); + case 's': + // INT16 + return scanArrowArrayFixedSizePrimitive( + array, outputVector, mask, srcOffset, dstOffset, count); + case 'S': + // UINT16 + return scanArrowArrayFixedSizePrimitive( + array, outputVector, mask, srcOffset, dstOffset, count); + case 'i': + // INT32 + return scanArrowArrayFixedSizePrimitive( + array, outputVector, mask, srcOffset, dstOffset, count); + case 'I': + // UINT32 + return scanArrowArrayFixedSizePrimitive( + array, outputVector, mask, srcOffset, dstOffset, count); + case 'l': + // INT64 + return scanArrowArrayFixedSizePrimitive( + array, outputVector, mask, srcOffset, dstOffset, count); + case 'L': + // UINT64 + return scanArrowArrayFixedSizePrimitive( + array, outputVector, mask, srcOffset, dstOffset, count); + case 'f': + // FLOAT + return scanArrowArrayFixedSizePrimitive( + array, outputVector, mask, srcOffset, dstOffset, count); + case 'g': + // DOUBLE + return scanArrowArrayFixedSizePrimitive( + array, outputVector, mask, srcOffset, dstOffset, count); + case 'z': + // BLOB + return scanArrowArrayBLOB(array, outputVector, mask, srcOffset, dstOffset, count); + case 'Z': + // LONG BLOB + return scanArrowArrayBLOB(array, outputVector, mask, srcOffset, dstOffset, count); + case 'u': + // STRING + return scanArrowArrayBLOB(array, outputVector, mask, srcOffset, dstOffset, count); + case 'U': + // LONG STRING + return scanArrowArrayBLOB(array, outputVector, mask, srcOffset, dstOffset, count); + case 'v': + switch (arrowType[1]) { + case 'z': + // BINARY VIEW + case 'u': + // STRING VIEW + return scanArrowArrayBLOBView(array, outputVector, mask, srcOffset, dstOffset, count); + default: + KU_UNREACHABLE; + } + case 'w': + // FIXED BLOB + return 
scanArrowArrayFixedBLOB( + array, outputVector, mask, std::stoi(arrowType + 2), srcOffset, dstOffset, count); + case 't': + switch (arrowType[1]) { + case 'd': + // DATE + if (arrowType[2] == 'D') { + // days since unix epoch + return scanArrowArrayFixedSizePrimitive( + array, outputVector, mask, srcOffset, dstOffset, count); + } else { + // ms since unix epoch + return scanArrowArrayFixedSizePrimitive( + array, outputVector, mask, srcOffset, dstOffset, count); + } + case 't': + // TODO pure time type + KU_UNREACHABLE; + case 's': + // TIMESTAMP + return scanArrowArrayFixedSizePrimitive( + array, outputVector, mask, srcOffset, dstOffset, count); + case 'D': + // DURATION (KUZU INTERVAL) + switch (arrowType[2]) { + case 's': + // consider implement overflow checking here? + return scanArrowArrayDurationScaledUp( + array, outputVector, mask, 1000000, srcOffset, dstOffset, count); + case 'm': + return scanArrowArrayDurationScaledUp( + array, outputVector, mask, 1000, srcOffset, dstOffset, count); + case 'u': + return scanArrowArrayDurationScaledUp( + array, outputVector, mask, 1, srcOffset, dstOffset, count); + case 'n': + return scanArrowArrayDurationScaledDown( + array, outputVector, mask, 1000, srcOffset, dstOffset, count); + default: + KU_UNREACHABLE; + } + case 'i': + // INTERVAL + switch (arrowType[2]) { + case 'M': + return scanArrowArrayMonthInterval( + array, outputVector, mask, srcOffset, dstOffset, count); + case 'D': + return scanArrowArrayDayTimeInterval( + array, outputVector, mask, srcOffset, dstOffset, count); + case 'n': + return scanArrowArrayMonthDayNanoInterval( + array, outputVector, mask, srcOffset, dstOffset, count); + default: + KU_UNREACHABLE; + } + default: + KU_UNREACHABLE; + } + case '+': + switch (arrowType[1]) { + case 'r': + // RUN END ENCODED + return scanArrowArrayRunEndEncoded( + schema, array, outputVector, mask, srcOffset, dstOffset, count); + case 'l': + // VAR_LIST + return scanArrowArrayVarList( + schema, array, outputVector, mask, srcOffset, dstOffset, count); + case 'L': + // LONG VAR_LIST + return scanArrowArrayVarList( + schema, array, outputVector, mask, srcOffset, dstOffset, count); + case 'w': + // FIXED_LIST + // TODO Manh: Array Scanning + KU_UNREACHABLE; + // return scanArrowArrayFixedList( + // schema, array, outputVector, mask, srcOffset, dstOffset, count); + case 's': + // STRUCT + return scanArrowArrayStruct( + schema, array, outputVector, mask, srcOffset, dstOffset, count); + case 'm': + // MAP + // TODO maxwell; + // for some reason the columnar format specification doesn't mention maps at all + KU_UNREACHABLE; + case 'u': + if (arrowType[2] == 'd') { + // DENSE UNION + return scanArrowArrayDenseUnion( + schema, array, outputVector, mask, srcOffset, dstOffset, count); + } else { + // SPARSE UNION + return scanArrowArraySparseUnion( + schema, array, outputVector, mask, srcOffset, dstOffset, count); + } + case 'v': + switch (arrowType[2]) { + case 'l': + return scanArrowArrayVarListView( + schema, array, outputVector, mask, srcOffset, dstOffset, count); + case 'L': + return scanArrowArrayVarListView( + schema, array, outputVector, mask, srcOffset, dstOffset, count); + // LONG VAR_LIST VIEW + default: + KU_UNREACHABLE; + } + default: + KU_UNREACHABLE; + } + default: + KU_UNREACHABLE; + } +} + +void ArrowConverter::fromArrowArray( + const ArrowSchema* schema, const ArrowArray* array, ValueVector& outputVector) { + ArrowNullMaskTree mask(schema, array, array->offset, array->length); + return fromArrowArray(schema, array, outputVector, &mask, 
array->offset, 0, array->length); +} + +} // namespace common +} // namespace kuzu diff --git a/src/common/arrow/arrow_null_mask_tree.cpp b/src/common/arrow/arrow_null_mask_tree.cpp new file mode 100644 index 00000000000..0544b253502 --- /dev/null +++ b/src/common/arrow/arrow_null_mask_tree.cpp @@ -0,0 +1,214 @@ +#include + +#include "common/arrow/arrow.h" +#include "common/arrow/arrow_nullmask_tree.h" + +namespace kuzu { +namespace common { + +// scans are based on data specification found here +// https://arrow.apache.org/docs/format/Columnar.html + +// all offsets are measured by value, not physical size + +void ArrowNullMaskTree::copyToValueVector(ValueVector* vec, uint64_t dstOffset, uint64_t count) { + vec->setNullFromBits(mask->getData(), offset, dstOffset, count); +} + +ArrowNullMaskTree ArrowNullMaskTree::operator+(int64_t offset) { + // this operation is mostly a special case for dictionary/run-end encoding + ArrowNullMaskTree ret(*this); + ret.offset += offset; + return ret; +} + +bool ArrowNullMaskTree::copyFromBuffer(const void* buffer, uint64_t srcOffset, uint64_t count) { + if (buffer == nullptr) { + mask->setAllNonNull(); + return false; + } + mask->copyFromNullBits((const uint64_t*)buffer, srcOffset, 0, count, true); + return true; +} + +bool ArrowNullMaskTree::applyParentBitmap(const NullMask* parent, uint64_t count) { + if (parent == nullptr) { + return false; + } + const uint64_t* buffer = parent->data; + if (buffer != nullptr) { + for (int64_t i = 0; i < (count >> NullMask::NUM_BITS_PER_NULL_ENTRY_LOG2); i++) { + mask->buffer[i] |= buffer[i]; + } + return true; + } + return false; +} + +template +void ArrowNullMaskTree::scanListPushDown( + const ArrowSchema* schema, const ArrowArray* array, uint64_t srcOffset, uint64_t count) { + const offsetsT* offsets = ((const offsetsT*)array->buffers[1]) + srcOffset; + offsetsT auxiliaryLength = offsets[count] - offsets[0]; + NullMask pushDownMask((auxiliaryLength + NullMask::NUM_BITS_PER_NULL_ENTRY - 1) >> + NullMask::NUM_BITS_PER_NULL_ENTRY_LOG2); + for (int64_t i = 0; i < count; i++) { + pushDownMask.setNullFromRange(offsets[i], offsets[i + 1] - offsets[i], isNull(i)); + } + children->push_back(ArrowNullMaskTree( + schema->children[0], array->children[0], offsets[0], auxiliaryLength, &pushDownMask)); +} + +void ArrowNullMaskTree::scanStructPushDown( + const ArrowSchema* schema, const ArrowArray* array, uint64_t srcOffset, uint64_t count) { + for (int64_t i = 0; i < array->n_children; i++) { + children->push_back(ArrowNullMaskTree( + schema->children[i], array->children[i], srcOffset, count, mask.get())); + } +} + +ArrowNullMaskTree::ArrowNullMaskTree(const ArrowSchema* schema, const ArrowArray* array, + uint64_t srcOffset, uint64_t count, const NullMask* parentBitmap) + : offset{0}, mask{std::make_shared( + (count + NullMask::NUM_BITS_PER_NULL_ENTRY - 1) >> + NullMask::NUM_BITS_PER_NULL_ENTRY_LOG2)}, + children(std::make_shared>()) { + if (schema->dictionary != nullptr) { + copyFromBuffer(array->buffers[0], srcOffset, count); + applyParentBitmap(parentBitmap, count); + dictionary = std::make_shared(schema->dictionary, array->dictionary, + array->dictionary->offset, array->dictionary->length); + return; + } + const char* arrowType = schema->format; + std::vector structFields; + switch (arrowType[0]) { + case 'n': + mask->setAllNull(); + break; + case 'b': + case 'c': + case 'C': + case 's': + case 'S': + case 'i': + case 'I': + case 'l': + case 'L': + case 'f': + case 'g': + copyFromBuffer(array->buffers[0], srcOffset, count); + 
break; + case 'z': + case 'Z': + case 'u': + case 'U': + case 'v': + case 'w': + case 't': + copyFromBuffer(array->buffers[0], srcOffset, count); + applyParentBitmap(parentBitmap, count); + break; + case '+': + switch (arrowType[1]) { + case 'l': + copyFromBuffer(array->buffers[0], srcOffset, count); + applyParentBitmap(parentBitmap, count); + scanListPushDown(schema, array, srcOffset, count); + break; + case 'L': + copyFromBuffer(array->buffers[0], srcOffset, count); + applyParentBitmap(parentBitmap, count); + scanListPushDown(schema, array, srcOffset, count); + break; + case 'w': + // TODO manh: array null resolution + KU_UNREACHABLE; + copyFromBuffer(array->buffers[0], srcOffset, count); + applyParentBitmap(parentBitmap, count); + break; + case 's': + copyFromBuffer(array->buffers[0], srcOffset, count); + applyParentBitmap(parentBitmap, count); + scanStructPushDown(schema, array, srcOffset, count); + break; + case 'm': + // TODO maxwell bind map types + KU_UNREACHABLE; + case 'u': { + const int8_t* types = (const int8_t*)array->buffers[0]; + if (schema->format[2] == 'd') { + const int32_t* offsets = (const int32_t*)array->buffers[1]; + std::vector countChildren(array->n_children), + lowestOffsets(array->n_children); + std::vector highestOffsets(array->n_children); + for (int64_t i = srcOffset; i < srcOffset + count; i++) { + int32_t curOffset = offsets[i]; + int32_t curType = types[i]; + if (countChildren[curType] == 0) { + lowestOffsets[curType] = curOffset; + } + highestOffsets[curType] = curOffset; + countChildren[curType]++; + } + for (int64_t i = 0; i < array->n_children; i++) { + children->push_back(ArrowNullMaskTree(schema->children[i], array->children[i], + lowestOffsets[i], highestOffsets[i] - lowestOffsets[i])); + } + for (int64_t i = srcOffset; i < srcOffset + count; i++) { + int32_t curOffset = offsets[i]; + int8_t curType = types[i]; + mask->setNull(i, children->operator[](curType).isNull(curOffset)); + } + } else { + for (int64_t i = 0; i < array->n_children; i++) { + children->push_back( + ArrowNullMaskTree(schema->children[i], array->children[i], srcOffset, count)); + } + for (int64_t i = srcOffset; i < srcOffset + count; i++) { + int8_t curType = types[i]; + mask->setNull(i, children->operator[](curType).isNull(i)); + // this isn't specified in the arrow specification, but is it valid to + // compute this using a bitwise OR? 
+ } + } + if (parentBitmap != nullptr) { + for (int64_t i = 0; i < count >> NullMask::NUM_BITS_PER_NULL_ENTRY_LOG2; i++) { + mask->buffer[i] |= parentBitmap->buffer[i]; + } + } + } break; + case 'v': + // list views *suck*, especially when trying to write code that can support + // parallelization for this, we generate child NullMaskTrees on the fly, rather than + // attempt any precomputation + if (array->buffers[0] == nullptr) { + mask->setAllNonNull(); + } else { + mask->copyFromNullBits((const uint64_t*)array->buffers[0], srcOffset, 0, count, true); + } + if (parentBitmap != nullptr) { + for (int64_t i = 0; i < count >> NullMask::NUM_BITS_PER_NULL_ENTRY_LOG2; i++) { + mask->buffer[i] |= parentBitmap->buffer[i]; + } + } + break; + case 'r': + // it's better to resolve validity during the actual scanning for run-end encoded arrays + // so for this, let's just resolve child validities and move on + for (int64_t i = 0; i < array->n_children; i++) { + children->push_back(ArrowNullMaskTree(schema->children[i], array->children[i], + array->children[i]->offset, array->children[i]->length)); + } + break; + default: + KU_UNREACHABLE; + } + break; + default: + KU_UNREACHABLE; + } +} + +} // namespace common +} // namespace kuzu diff --git a/src/common/arrow/arrow_type.cpp b/src/common/arrow/arrow_type.cpp new file mode 100644 index 00000000000..dec7b047500 --- /dev/null +++ b/src/common/arrow/arrow_type.cpp @@ -0,0 +1,149 @@ +#include "common/arrow/arrow_converter.h" +#include "common/exception/not_implemented.h" +#include "common/exception/runtime.h" + +namespace kuzu { +namespace common { + +// pyarrow format string specifications can be found here +// https://arrow.apache.org/docs/format/CDataInterface.html#data-type-description-format-strings + +LogicalType ArrowConverter::fromArrowSchema(const ArrowSchema* schema) { + const char* arrowType = schema->format; + std::vector structFields; + // if we have a dictionary, then the logical type of the column is dependent upon the + // logical type of the dict + if (schema->dictionary != nullptr) { + return fromArrowSchema(schema->dictionary); + } + switch (arrowType[0]) { + case 'n': + return LogicalType(LogicalTypeID::ANY); + case 'b': + return LogicalType(LogicalTypeID::BOOL); + case 'c': + return LogicalType(LogicalTypeID::INT8); + case 'C': + return LogicalType(LogicalTypeID::UINT8); + case 's': + return LogicalType(LogicalTypeID::INT16); + case 'S': + return LogicalType(LogicalTypeID::UINT16); + case 'i': + return LogicalType(LogicalTypeID::INT32); + case 'I': + return LogicalType(LogicalTypeID::UINT32); + case 'l': + return LogicalType(LogicalTypeID::INT64); + case 'L': + return LogicalType(LogicalTypeID::UINT64); + case 'f': + return LogicalType(LogicalTypeID::FLOAT); + case 'g': + return LogicalType(LogicalTypeID::DOUBLE); + case 'z': + case 'Z': + return LogicalType(LogicalTypeID::BLOB); + case 'u': + case 'U': + return LogicalType(LogicalTypeID::STRING); + case 'v': + switch (arrowType[1]) { + case 'z': + return LogicalType(LogicalTypeID::BLOB); + case 'u': + return LogicalType(LogicalTypeID::STRING); + default: + KU_UNREACHABLE; + } + + case 'd': + throw NotImplementedException("custom bitwidth decimals are not supported"); + case 'w': + return LogicalType(LogicalTypeID::BLOB); // fixed width binary + case 't': + switch (arrowType[1]) { + case 'd': + if (arrowType[2] == 'D') { + return LogicalType(LogicalTypeID::DATE); + } else { + return LogicalType(LogicalTypeID::TIMESTAMP_MS); + } + case 't': + // TODO implement pure time type + throw 
NotImplementedException("Pure time types are not supported"); + case 's': + // TODO maxwell: timezone support + switch (arrowType[2]) { + case 's': + return LogicalType(LogicalTypeID::TIMESTAMP_SEC); + case 'm': + return LogicalType(LogicalTypeID::TIMESTAMP_MS); + case 'u': + return LogicalType(LogicalTypeID::TIMESTAMP); + case 'n': + return LogicalType(LogicalTypeID::TIMESTAMP_NS); + default: + KU_UNREACHABLE; + } + case 'D': + // duration + case 'i': + // interval + return LogicalType(LogicalTypeID::INTERVAL); + default: + KU_UNREACHABLE; + } + case '+': + KU_ASSERT(schema->n_children > 0); + switch (arrowType[1]) { + // complex types need a complementary ExtraTypeInfo object + case 'l': + case 'L': + return *LogicalType::VAR_LIST( + std::make_unique(fromArrowSchema(schema->children[0]))); + case 'w': + throw RuntimeException("Fixed list is currently WIP."); + // TODO Manh: Array Binding + // return *LogicalType::FIXED_LIST( + // std::make_unique(fromArrowSchema(schema->children[0])), + // std::stoi(arrowType+3)); + case 's': + for (int64_t i = 0; i < schema->n_children; i++) { + structFields.emplace_back(std::string(schema->children[i]->name), + std::make_unique(fromArrowSchema(schema->children[i]))); + } + return *LogicalType::STRUCT(std::move(structFields)); + case 'm': + // TODO maxwell bind map types + throw NotImplementedException("Scanning Arrow Map types is not supported"); + case 'u': + throw RuntimeException("Unions are currently WIP."); + for (int64_t i = 0; i < schema->n_children; i++) { + structFields.emplace_back(std::string(schema->children[i]->name), + std::make_unique(fromArrowSchema(schema->children[i]))); + } + return *LogicalType::UNION(std::move(structFields)); + case 'v': + switch (arrowType[2]) { + case 'l': + case 'L': + return *LogicalType::VAR_LIST( + std::make_unique(fromArrowSchema(schema->children[0]))); + default: + KU_UNREACHABLE; + } + case 'r': + // logical type corresponds to second child + return fromArrowSchema(schema->children[1]); + default: + KU_UNREACHABLE; + } + default: + KU_UNREACHABLE; + } + // refer to arrow_converted.cpp:65 +} + +} // namespace common +} // namespace kuzu diff --git a/src/common/null_mask.cpp b/src/common/null_mask.cpp index d611c6d3a6d..4a9d82ea3d8 100644 --- a/src/common/null_mask.cpp +++ b/src/common/null_mask.cpp @@ -88,8 +88,8 @@ void NullMask::resize(uint64_t capacity) { } bool NullMask::copyFromNullBits(const uint64_t* srcNullEntries, uint64_t srcOffset, - uint64_t dstOffset, uint64_t numBitsToCopy) { - if (copyNullMask(srcNullEntries, srcOffset, this->data, dstOffset, numBitsToCopy)) { + uint64_t dstOffset, uint64_t numBitsToCopy, bool invert) { + if (copyNullMask(srcNullEntries, srcOffset, this->data, dstOffset, numBitsToCopy, invert)) { this->mayContainNulls = true; return true; } diff --git a/src/common/vector/value_vector.cpp b/src/common/vector/value_vector.cpp index 8593a19f2af..6ec432c9c40 100644 --- a/src/common/vector/value_vector.cpp +++ b/src/common/vector/value_vector.cpp @@ -59,8 +59,8 @@ bool ValueVector::discardNull(ValueVector& vector) { } bool ValueVector::setNullFromBits(const uint64_t* srcNullEntries, uint64_t srcOffset, - uint64_t dstOffset, uint64_t numBitsToCopy) { - return nullMask->copyFromNullBits(srcNullEntries, srcOffset, dstOffset, numBitsToCopy); + uint64_t dstOffset, uint64_t numBitsToCopy, bool invert) { + return nullMask->copyFromNullBits(srcNullEntries, srcOffset, dstOffset, numBitsToCopy, invert); } template diff --git a/src/include/catalog/catalog.h b/src/include/catalog/catalog.h 
index f65178f529f..27f0cd6a08e 100644 --- a/src/include/catalog/catalog.h +++ b/src/include/catalog/catalog.h @@ -4,7 +4,6 @@ #include "catalog/catalog_entry/table_catalog_entry.h" #include "catalog_content.h" -#include "function/table_functions.h" namespace kuzu { namespace storage { diff --git a/src/include/common/arrow/arrow.h b/src/include/common/arrow/arrow.h index ba480455bb2..160cfd099bd 100644 --- a/src/include/common/arrow/arrow.h +++ b/src/include/common/arrow/arrow.h @@ -54,3 +54,21 @@ struct ArrowArray { #ifdef __cplusplus } #endif + +struct ArrowSchemaWrapper : public ArrowSchema { + ArrowSchemaWrapper() { release = nullptr; } + ~ArrowSchemaWrapper() { + if (release) { + release(this); + } + } +}; + +struct ArrowArrayWrapper : public ArrowArray { + ArrowArrayWrapper() { release = nullptr; } + ~ArrowArrayWrapper() { + if (release) { + release(this); + } + } +}; diff --git a/src/include/common/arrow/arrow_converter.h b/src/include/common/arrow/arrow_converter.h index 6ec50ee25d5..2d68d245b52 100644 --- a/src/include/common/arrow/arrow_converter.h +++ b/src/include/common/arrow/arrow_converter.h @@ -4,6 +4,7 @@ #include #include "common/arrow/arrow.h" +#include "common/arrow/arrow_nullmask_tree.h" #include "main/query_result.h" struct ArrowSchema; @@ -26,6 +27,13 @@ struct ArrowConverter { static void toArrowArray( main::QueryResult& queryResult, ArrowArray* out_array, std::int64_t chunkSize); + static common::LogicalType fromArrowSchema(const ArrowSchema* schema); + static void fromArrowArray(const ArrowSchema* schema, const ArrowArray* array, + ValueVector& outputVector, ArrowNullMaskTree* mask, uint64_t srcOffset, uint64_t dstOffset, + uint64_t count); + static void fromArrowArray( + const ArrowSchema* schema, const ArrowArray* array, ValueVector& outputVector); + private: static void initializeChild(ArrowSchema& child, const std::string& name = ""); static void setArrowFormatForStruct( diff --git a/src/include/common/arrow/arrow_nullmask_tree.h b/src/include/common/arrow/arrow_nullmask_tree.h new file mode 100644 index 00000000000..b3494b9ca80 --- /dev/null +++ b/src/include/common/arrow/arrow_nullmask_tree.h @@ -0,0 +1,39 @@ +#pragma once + +#include "common/arrow/arrow.h" +#include "common/null_mask.h" +#include "common/vector/value_vector.h" + +namespace kuzu { +namespace common { + +class ArrowNullMaskTree { +public: + ArrowNullMaskTree(const ArrowSchema* schema, const ArrowArray* array, uint64_t srcOffset, + uint64_t count, const NullMask* parentMask = nullptr); + + void copyToValueVector(ValueVector* vec, uint64_t dstOffset, uint64_t count); + bool isNull(int64_t idx) { return mask->isNull(idx + offset); } + ArrowNullMaskTree* getChild(int idx) { return &(*children)[idx]; } + ArrowNullMaskTree* getDictionary() { return dictionary.get(); } + ArrowNullMaskTree operator+(int64_t offset); + +private: + bool copyFromBuffer(const void* buffer, uint64_t srcOffset, uint64_t count); + bool applyParentBitmap(const NullMask* buffer, uint64_t count); + + template + void scanListPushDown( + const ArrowSchema* schema, const ArrowArray* array, uint64_t srcOffset, uint64_t count); + + void scanStructPushDown( + const ArrowSchema* schema, const ArrowArray* array, uint64_t srcOffset, uint64_t count); + + int64_t offset; + std::shared_ptr mask; + std::shared_ptr> children; + std::shared_ptr dictionary; +}; + +} // namespace common +} // namespace kuzu diff --git a/src/include/common/enums/expression_type.h b/src/include/common/enums/expression_type.h index 376afffa9a2..db597623829 100644 
--- a/src/include/common/enums/expression_type.h +++ b/src/include/common/enums/expression_type.h @@ -184,6 +184,7 @@ const char* const IN_MEM_READ_RDF_LITERAL_FUNC_NAME = "IN_MEM_READ_RDF_LITERAL"; const char* const IN_MEM_READ_RDF_RESOURCE_TRIPLE_FUNC_NAME = "IN_MEM_READ_RDF_RESOURCE_TRIPLE"; const char* const IN_MEM_READ_RDF_LITERAL_TRIPLE_FUNC_NAME = "IN_MEM_READ_RDF_LITERAL_TRIPLE"; const char* const READ_PANDAS_FUNC_NAME = "READ_PANDAS"; +const char* const READ_PYARROW_FUNC_NAME = "READ_PYARROW"; const char* const READ_FTABLE_FUNC_NAME = "READ_FTABLE"; enum class ExpressionType : uint8_t { diff --git a/src/include/common/null_mask.h b/src/include/common/null_mask.h index 0f6c31e1826..9246c7cfddf 100644 --- a/src/include/common/null_mask.h +++ b/src/include/common/null_mask.h @@ -9,6 +9,8 @@ namespace kuzu { namespace common { +class ArrowNullMaskTree; + constexpr uint64_t NULL_BITMASKS_WITH_SINGLE_ONE[64] = {0x1, 0x2, 0x4, 0x8, 0x10, 0x20, 0x40, 0x80, 0x100, 0x200, 0x400, 0x800, 0x1000, 0x2000, 0x4000, 0x8000, 0x10000, 0x20000, 0x40000, 0x80000, 0x100000, 0x200000, 0x400000, 0x800000, 0x1000000, 0x2000000, 0x4000000, 0x8000000, 0x10000000, @@ -66,6 +68,7 @@ const uint64_t NULL_HIGH_MASKS[65] = {0x0, 0x8000000000000000, 0xc00000000000000 0xfffffffffffffffe, 0xffffffffffffffff}; class NullMask { + friend class ArrowNullMaskTree; public: static constexpr uint64_t NO_NULL_ENTRY = 0; @@ -133,7 +136,7 @@ class NullMask { uint64_t* dstNullEntries, uint64_t dstOffset, uint64_t numBitsToCopy, bool invert = false); bool copyFromNullBits(const uint64_t* srcNullEntries, uint64_t srcOffset, uint64_t dstOffset, - uint64_t numBitsToCopy); + uint64_t numBitsToCopy, bool invert = false); // Sets the given number of bits to null (if isNull is true) or non-null (if isNull is false), // starting at the offset diff --git a/src/include/common/vector/value_vector.h b/src/include/common/vector/value_vector.h index d780a2cd72e..a18e4eba843 100644 --- a/src/include/common/vector/value_vector.h +++ b/src/include/common/vector/value_vector.h @@ -51,13 +51,17 @@ class KUZU_API ValueVector { } bool setNullFromBits(const uint64_t* srcNullEntries, uint64_t srcOffset, uint64_t dstOffset, - uint64_t numBitsToCopy); + uint64_t numBitsToCopy, bool invert = false); uint32_t getNumBytesPerValue() const { return numBytesPerValue; } // TODO(Guodong): Rename this to getValueRef template - inline T& getValue(uint32_t pos) const { + const T& getValue(uint32_t pos) const { + return ((T*)valueBuffer.get())[pos]; + } + template + T& getValue(uint32_t pos) { return ((T*)valueBuffer.get())[pos]; } template diff --git a/src/include/function/table/scan_replacement.h b/src/include/function/table/scan_replacement.h index 63bb4e3c5ca..075a7731822 100644 --- a/src/include/function/table/scan_replacement.h +++ b/src/include/function/table/scan_replacement.h @@ -14,7 +14,7 @@ struct ScanReplacementData { using scan_replace_func_t = std::function(const std::string&)>; struct ScanReplacement { - ScanReplacement(scan_replace_func_t replaceFunc) : replaceFunc{replaceFunc} {} + explicit ScanReplacement(scan_replace_func_t replaceFunc) : replaceFunc{replaceFunc} {} scan_replace_func_t replaceFunc; }; diff --git a/src/include/main/client_context.h b/src/include/main/client_context.h index fdbaf812fb4..f21fc699ce2 100644 --- a/src/include/main/client_context.h +++ b/src/include/main/client_context.h @@ -2,7 +2,6 @@ #include #include -#include #include #include @@ -11,7 +10,6 @@ #include "common/timer.h" #include "common/types/value/value.h" 
#include "function/function.h" -#include "function/scalar_function.h" #include "function/table/scan_replacement.h" #include "main/kuzu_fwd.h" #include "parser/statement.h" diff --git a/tools/python_api/CMakeLists.txt b/tools/python_api/CMakeLists.txt index 1319a8c43cb..aa0fb52fcc3 100644 --- a/tools/python_api/CMakeLists.txt +++ b/tools/python_api/CMakeLists.txt @@ -17,6 +17,8 @@ pybind11_add_module(_kuzu src_cpp/py_query_result.cpp src_cpp/py_query_result_converter.cpp src_cpp/py_conversion.cpp + src_cpp/pyarrow/pyarrow_bind.cpp + src_cpp/pyarrow/pyarrow_scan.cpp src_cpp/pandas/pandas_bind.cpp src_cpp/pandas/pandas_scan.cpp src_cpp/pandas/pandas_analyzer.cpp diff --git a/tools/python_api/src_cpp/include/cached_import/py_cached_modules.h b/tools/python_api/src_cpp/include/cached_import/py_cached_modules.h index 6bc374df4c4..28146a0cba6 100644 --- a/tools/python_api/src_cpp/include/cached_import/py_cached_modules.h +++ b/tools/python_api/src_cpp/include/cached_import/py_cached_modules.h @@ -66,9 +66,10 @@ class PandasCachedItem : public PythonCachedItem { }; public: - PandasCachedItem() : PythonCachedItem("pandas"), core(this), DataFrame(this), NA("NA", this), - NaT("NaT", this) {} + PandasCachedItem() : PythonCachedItem("pandas"), ArrowDtype("ArrowDtype", this), core(this), DataFrame(this), + NA("NA", this), NaT("NaT", this) {} + PythonCachedItem ArrowDtype; CoreCachedItem core; DataFrameCachedItem DataFrame; PythonCachedItem NA; @@ -96,9 +97,10 @@ class PyarrowCachedItem : public PythonCachedItem { class TableCachedItem : public PythonCachedItem { public: explicit TableCachedItem(PythonCachedItem* parent): PythonCachedItem("Table", parent), - from_batches("from_batches", this) {} + from_batches("from_batches", this), from_pandas("from_pandas", this) {} PythonCachedItem from_batches; + PythonCachedItem from_pandas; }; class LibCachedItem : public PythonCachedItem { diff --git a/tools/python_api/src_cpp/include/pyarrow/pyarrow_bind.h b/tools/python_api/src_cpp/include/pyarrow/pyarrow_bind.h new file mode 100644 index 00000000000..98f64e240a0 --- /dev/null +++ b/tools/python_api/src_cpp/include/pyarrow/pyarrow_bind.h @@ -0,0 +1,18 @@ +#pragma once + +#include "common/arrow/arrow_converter.h" +#include "py_object_container.h" +#include "pybind_include.h" + +namespace kuzu { + +namespace main { +class ClientContext; +} + +struct Pyarrow { + static std::shared_ptr bind(py::handle tableToBind, + std::vector& returnTypes, std::vector& names); +}; + +} // namespace kuzu diff --git a/tools/python_api/src_cpp/include/pyarrow/pyarrow_scan.h b/tools/python_api/src_cpp/include/pyarrow/pyarrow_scan.h new file mode 100644 index 00000000000..237c8c38259 --- /dev/null +++ b/tools/python_api/src_cpp/include/pyarrow/pyarrow_scan.h @@ -0,0 +1,61 @@ +#pragma once + +#include "common/arrow/arrow.h" +#include "function/scalar_function.h" +#include "function/table/bind_data.h" +#include "function/table/scan_functions.h" +#include "function/table_functions.h" +#include "pyarrow_bind.h" +#include "pybind_include.h" + +namespace kuzu { + +struct PyArrowTableScanLocalState final : public function::TableFuncLocalState { + ArrowArrayWrapper* arrowArray; + + explicit PyArrowTableScanLocalState(ArrowArrayWrapper* arrowArray) : arrowArray{arrowArray} {} +}; + +struct PyArrowTableScanSharedState final : public function::BaseScanSharedState { + std::vector> chunks; + uint64_t currentChunk; + std::mutex lock; + + PyArrowTableScanSharedState( + uint64_t numRows, std::vector>&& chunks) + : BaseScanSharedState{numRows}, 
chunks{std::move(chunks)}, currentChunk{0} {} + + ArrowArrayWrapper* getNextChunk(); +}; + +struct PyArrowTableScanFunctionData final : public function::TableFuncBindData { + std::shared_ptr schema; + std::unique_ptr table; + uint64_t numRows; + + PyArrowTableScanFunctionData(std::vector columnTypes, + std::shared_ptr schema, std::vector columnNames, + py::object table, uint64_t numRows) + : TableFuncBindData{std::move(columnTypes), std::move(columnNames)}, + schema{std::move(schema)}, table{std::make_unique(table)}, numRows{numRows} {} + + ~PyArrowTableScanFunctionData() override { + py::gil_scoped_acquire acquire; + table.reset(); + } + + std::unique_ptr copy() const override { + py::gil_scoped_acquire acquire; + // the schema is considered immutable so copying it by copying the shared_ptr is fine. + return std::make_unique( + columnTypes, schema, columnNames, *table, numRows); + } +}; + +struct PyArrowTableScanFunction { + static function::function_set getFunctionSet(); + + static function::TableFunction getFunction(); +}; + +} // namespace kuzu diff --git a/tools/python_api/src_cpp/pandas/pandas_scan.cpp b/tools/python_api/src_cpp/pandas/pandas_scan.cpp index 67607187a85..f432ab2093a 100644 --- a/tools/python_api/src_cpp/pandas/pandas_scan.cpp +++ b/tools/python_api/src_cpp/pandas/pandas_scan.cpp @@ -1,5 +1,6 @@ #include "pandas/pandas_scan.h" +#include "pyarrow/pyarrow_scan.h" #include "function/table/bind_input.h" #include "cached_import/py_cached_import.h" #include "numpy/numpy_scan.h" @@ -13,31 +14,7 @@ using namespace kuzu::catalog; namespace kuzu { -static offset_t tableFunc(TableFuncInput&, TableFuncOutput&); -static std::unique_ptr bindFunc(main::ClientContext*, - TableFuncBindInput*); -static std::unique_ptr initSharedState( - TableFunctionInitInput&); -static std::unique_ptr initLocalState( - TableFunctionInitInput&, TableFuncSharedState*, - storage::MemoryManager*); -static bool sharedStateNext(const TableFuncBindData*, - PandasScanLocalState*, TableFuncSharedState*); -static void pandasBackendScanSwitch(PandasColumnBindData*, uint64_t, - uint64_t, ValueVector*); - -static TableFunction getFunction() { - return TableFunction(READ_PANDAS_FUNC_NAME, tableFunc, bindFunc, initSharedState, - initLocalState, std::vector{LogicalTypeID::POINTER}); -} - -function_set PandasScanFunction::getFunctionSet() { - function_set functionSet; - functionSet.push_back(getFunction().copy()); - return functionSet; -} - -std::unique_ptr bindFunc( +std::unique_ptr bindFunc( main::ClientContext* /*context*/, TableFuncBindInput* input) { py::gil_scoped_acquire acquire; py::handle df(reinterpret_cast(input->inputs[0].getValue())); @@ -71,16 +48,16 @@ bool sharedStateNext(const TableFuncBindData* /*bindData*/, return true; } -std::unique_ptr initLocalState( - TableFunctionInitInput& input, TableFuncSharedState* sharedState, - storage::MemoryManager*) { +std::unique_ptr initLocalState( + function::TableFunctionInitInput& input, function::TableFuncSharedState* sharedState, + storage::MemoryManager* /*mm*/) { auto localState = std::make_unique(0 /* start */, 0 /* end */); sharedStateNext(input.bindData, localState.get(), sharedState); return localState; } -std::unique_ptr initSharedState( - TableFunctionInitInput& input) { +std::unique_ptr initSharedState( + function::TableFunctionInitInput& input) { // LCOV_EXCL_START if (PyGILState_Check()) { throw RuntimeException("PandasScan called but GIL was already held!"); @@ -132,6 +109,32 @@ std::vector> PandasScanFunctionData::copyC return result; } +static 
TableFunction getFunction() { + return TableFunction(READ_PANDAS_FUNC_NAME, tableFunc, bindFunc, initSharedState, + initLocalState, std::vector{LogicalTypeID::POINTER}); +} + +function_set PandasScanFunction::getFunctionSet() { + function_set functionSet; + functionSet.push_back(getFunction().copy()); + return functionSet; +} + +static bool isPyArrowBacked(const py::handle &df) { + py::list dtypes = df.attr("dtypes"); + if (dtypes.empty()) { + return false; + } + + auto arrow_dtype = importCache->pandas.ArrowDtype(); + for (auto &dtype : dtypes) { + if (py::isinstance(dtype, arrow_dtype)) { + return true; + } + } + return false; +} + static std::unique_ptr tryReplacePD(py::dict& dict, py::str& objectName) { if (!dict.contains(objectName)) { return nullptr; @@ -139,7 +142,11 @@ static std::unique_ptr tryReplacePD(py::dict& dict, py::str auto entry = dict[objectName]; if (PyConnection::isPandasDataframe(entry)) { auto scanReplacementData = std::make_unique(); - scanReplacementData->func = getFunction(); + if (isPyArrowBacked(entry)) { + scanReplacementData->func = PyArrowTableScanFunction::getFunction(); + } else { + scanReplacementData->func = getFunction(); + } auto bindInput = TableFuncBindInput(); bindInput.inputs.push_back(Value::createValue(reinterpret_cast(entry.ptr()))); scanReplacementData->bindInput = std::move(bindInput); diff --git a/tools/python_api/src_cpp/py_database.cpp b/tools/python_api/src_cpp/py_database.cpp index b2ed26c506f..d7ecc9acedb 100644 --- a/tools/python_api/src_cpp/py_database.cpp +++ b/tools/python_api/src_cpp/py_database.cpp @@ -2,6 +2,7 @@ #include "include/cached_import/py_cached_import.h" #include "pandas/pandas_scan.h" +#include "pyarrow/pyarrow_scan.h" #include diff --git a/tools/python_api/src_cpp/pyarrow/pyarrow_bind.cpp b/tools/python_api/src_cpp/pyarrow/pyarrow_bind.cpp new file mode 100644 index 00000000000..9bcaf40e06d --- /dev/null +++ b/tools/python_api/src_cpp/pyarrow/pyarrow_bind.cpp @@ -0,0 +1,26 @@ +#include "pyarrow/pyarrow_bind.h" + +#include "cached_import/py_cached_import.h" +#include "common/arrow/arrow.h" +#include "common/arrow/arrow_converter.h" + +namespace kuzu { + +std::shared_ptr Pyarrow::bind(py::handle tableToBind, + std::vector& returnTypes, std::vector& names) { + + std::shared_ptr schema = std::make_shared(); + auto pyschema = tableToBind.attr("schema"); + auto exportSchemaToC = pyschema.attr("_export_to_c"); + exportSchemaToC(reinterpret_cast(schema.get())); + + for (int64_t i = 0; i < schema->n_children; i++) { + ArrowSchema* child = schema->children[i]; + names.emplace_back(child->name); + returnTypes.push_back(common::ArrowConverter::fromArrowSchema(child)); + } + + return schema; +} + +} // namespace kuzu diff --git a/tools/python_api/src_cpp/pyarrow/pyarrow_scan.cpp b/tools/python_api/src_cpp/pyarrow/pyarrow_scan.cpp new file mode 100644 index 00000000000..e9739ea65aa --- /dev/null +++ b/tools/python_api/src_cpp/pyarrow/pyarrow_scan.cpp @@ -0,0 +1,102 @@ +#include "pyarrow/pyarrow_scan.h" + +#include "cached_import/py_cached_import.h" +#include "common/arrow/arrow_converter.h" +#include "function/table/bind_input.h" +#include "py_connection.h" +#include "pybind11/pytypes.h" + +using namespace kuzu::function; +using namespace kuzu::common; +using namespace kuzu::catalog; + +namespace kuzu { + +static std::unique_ptr bindFunc( + main::ClientContext* /*context*/, TableFuncBindInput* input) { + + py::gil_scoped_acquire acquire; + py::object table(py::reinterpret_steal( + reinterpret_cast(input->inputs[0].getValue()))); + if 
(py::isinstance(table, importCache->pandas.DataFrame())) { + table = importCache->pyarrow.lib.Table.from_pandas()(table); + } + std::vector returnTypes; + std::vector names; + if (py::isinstance(table)) { + KU_UNREACHABLE; + } + auto numRows = py::len(table); + auto schema = Pyarrow::bind(table, returnTypes, names); + return std::make_unique( + std::move(returnTypes), std::move(schema), std::move(names), table, numRows); +} + +ArrowArrayWrapper* PyArrowTableScanSharedState::getNextChunk() { + std::lock_guard lck{lock}; + if (currentChunk == chunks.size()) { + return nullptr; + } + return chunks[currentChunk++].get(); +} + +static std::unique_ptr initSharedState( + function::TableFunctionInitInput& input) { + + py::gil_scoped_acquire acquire; + PyArrowTableScanFunctionData* bindData = + dynamic_cast(input.bindData); + py::list batches = bindData->table->attr("to_batches")(DEFAULT_VECTOR_CAPACITY); + std::vector> arrowArrayBatches; + + for (auto& i : batches) { + arrowArrayBatches.push_back(std::make_shared()); + i.attr("_export_to_c")(reinterpret_cast(arrowArrayBatches.back().get())); + } + + return std::make_unique( + bindData->numRows, std::move(arrowArrayBatches)); +} + +static std::unique_ptr initLocalState( + function::TableFunctionInitInput& /*input*/, function::TableFuncSharedState* sharedState, + storage::MemoryManager* /*mm*/) { + + PyArrowTableScanSharedState* pyArrowShared = + dynamic_cast(sharedState); + return std::make_unique(pyArrowShared->getNextChunk()); +} + +static common::offset_t tableFunc( + function::TableFuncInput& input, function::TableFuncOutput& output) { + + auto arrowScanData = dynamic_cast(input.bindData); + auto arrowLocalState = dynamic_cast(input.localState); + auto arrowSharedState = dynamic_cast(input.sharedState); + if (arrowLocalState->arrowArray == nullptr) { + return 0; + } + for (auto i = 0u; i < arrowScanData->columnTypes.size(); i++) { + common::ArrowConverter::fromArrowArray(arrowScanData->schema->children[i], + arrowLocalState->arrowArray->children[i], *output.dataChunk.getValueVector(i)); + } + auto len = arrowLocalState->arrowArray->length; + arrowLocalState->arrowArray = arrowSharedState->getNextChunk(); + return len; +} + +function::function_set PyArrowTableScanFunction::getFunctionSet() { + + function_set functionSet; + functionSet.push_back( + std::make_unique(READ_PYARROW_FUNC_NAME, tableFunc, bindFunc, + initSharedState, initLocalState, std::vector{LogicalTypeID::POINTER})); + return functionSet; +} + +TableFunction PyArrowTableScanFunction::getFunction() { + return TableFunction(READ_PYARROW_FUNC_NAME, tableFunc, bindFunc, initSharedState, + initLocalState, std::vector{LogicalTypeID::POINTER}); +} + +} // namespace kuzu diff --git a/tools/python_api/test/test_df_pyarrow.py b/tools/python_api/test/test_df_pyarrow.py new file mode 100644 index 00000000000..b88db4616b1 --- /dev/null +++ b/tools/python_api/test/test_df_pyarrow.py @@ -0,0 +1,234 @@ +import pandas as pd, pyarrow as pa +import pytz +import struct, random, math +from datetime import datetime, timedelta +from pandas.arrays import ArrowExtensionArray as arrowtopd +import pytest +import kuzu +from type_aliases import ConnDB +from pathlib import Path + +def generate_primitive(dtype): + if (random.randrange(0, 5) == 0): + return None + if (dtype.startswith("bool")): + return random.randrange(0, 1) == 1 + if (dtype.startswith("int32")): + return random.randrange(-2147483648, 2147483648) + if (dtype.startswith("int64")): + return random.randrange(-9223372036854775808, 
9223372036854775808) + if (dtype.startswith("uint64")): + return random.randrange(0, 18446744073709551616) + if (dtype.startswith("float32")): + random_bits = random.getrandbits(32) + random_bytes = struct.pack(' None: + db = kuzu.Database(tmp_path) + conn = kuzu.Connection(db) + establish_connection = (conn, db) + # stress tests primitive reading + sfs = [100, 2048, 4000, 9000, 16000] + threads = [1, 2, 5, 10] + for sf in sfs: + for thread in threads: + pyarrow_test_helper(establish_connection, sf, thread) + +def test_pyarrow_time(conn_db_readonly : ConnDB) -> None: + conn, db = conn_db_readonly + col1 = pa.array([1000123, 2000123, 3000123], type=pa.duration('s')) + col2 = pa.array([1000123000000, 2000123000000, 3000123000000], type=pa.duration('us')) + col3 = pa.array([1000123000000000, 2000123000000000, 3000123000000000], type=pa.duration('ns')) + col4 = pa.array([datetime(2012, 1, 20), datetime(2000, 12, 2), datetime(1987, 5, 27)], + type=pa.timestamp('s')) + col5 = pa.array([datetime(2012, 1, 20), datetime(2000, 12, 2), datetime(1987, 5, 27)], + type=pa.timestamp('s')) + col6 = pa.array([datetime(2012, 1, 20), datetime(2000, 12, 2), datetime(1987, 5, 27)], + type=pa.timestamp('ms')) + col7 = pa.array([datetime(2012, 1, 20), datetime(2000, 12, 2), datetime(1987, 5, 27)], + type=pa.timestamp('us')) + col8 = pa.array([datetime(2012, 1, 20), datetime(2000, 12, 2), datetime(1987, 5, 27)], + type=pa.timestamp('ns')) + col9 = pa.array([datetime(2012, 1, 20), datetime(2000, 12, 2), datetime(1987, 5, 27)], + type=pa.date32()) + col10 = pa.array([datetime(2012, 1, 20), datetime(2000, 12, 2), datetime(1987, 5, 27)], + type=pa.date64()) + # not implemented by pandas + # col11 = pa.array([(1, 2, 3), (4, 5, -6), (100, 200, 1000000000)], type=pa.month_day_nano_interval()) + # for some reason, pyarrow doesnt support the direct creation of pure month or pure datetime + # intervals, so that will remain untested for now + df = pd.DataFrame({ + 'col1': arrowtopd(col1), + 'col2': arrowtopd(col2), + 'col3': arrowtopd(col3), + 'col4': arrowtopd(col4), + 'col5': arrowtopd(col5), + 'col6': arrowtopd(col6), + 'col7': arrowtopd(col7), + 'col8': arrowtopd(col8), + 'col9': arrowtopd(col9), + 'col10': arrowtopd(col10) + #'col11': arrowtopd(col11) + }) + result = conn.execute('CALL READ_PANDAS(df) RETURN *').get_as_df() + for colname in ['col1', 'col2', 'col3']: + for expected, actual in zip(df[colname], result[colname]): + tmp1 = expected if type(expected) is timedelta else expected.to_pytimedelta() + tmp2 = actual if type(actual) is timedelta else actual.to_pytimedelta() + assert tmp1 == tmp2 + for colname in ['col4', 'col5', 'col6', 'col7', 'col8']: + for expected, actual in zip(df[colname], result[colname]): + tmp1 = expected if type(expected) is datetime else expected.to_pydatetime() + tmp2 = actual if type(actual) is datetime else actual.to_pydatetime() + assert tmp1 == tmp2 + for colname in ['col9', 'col10']: + for expected, actual in zip(df[colname], result[colname]): + assert datetime.combine(expected, datetime.min.time()) == actual.to_pydatetime() + +def generate_blob(length): + if (random.randint(0, 5) == 0): + return None + return random.getrandbits(8*length).to_bytes(length, 'little') + +def test_pyarrow_blob(conn_db_readonly : ConnDB) -> None: + conn, db = conn_db_readonly + # blobs, blob views, and fixed size blobs + random.seed(100) + index = pa.array(range(16000), type=pa.int64()) + col1 = pa.array([generate_blob(random.randint(10, 100)) for i in range(16000)], + type=pa.binary()) + col2 = 
pa.array([generate_blob(random.randint(10, 100)) for i in range(16000)],
+        type=pa.large_binary())
+    col3 = pa.array([generate_blob(32) for i in range(16000)],
+        type=pa.binary(32))
+    col4 = col1.view(pa.binary())
+    df = pd.DataFrame({
+        'index': arrowtopd(index),
+        'col1' : arrowtopd(col1),
+        'col2' : arrowtopd(col2),
+        'col3' : arrowtopd(col3),
+        'col4' : arrowtopd(col4)
+    }).sort_values(by=['index'])
+    result = conn.execute('CALL READ_PANDAS(df) RETURN * ORDER BY index').get_as_df()
+    for colname in ['col1', 'col2', 'col3', 'col4']:
+        for expected, actual in zip(df[colname], result[colname]):
+            if is_null(expected) or is_null(actual):
+                assert is_null(expected)
+                assert is_null(actual)
+            else:
+                if bytes(expected) != bytes(actual):
+                    print(expected)
+                    print(actual)
+                    print(df[colname])
+                    print(result[colname])
+                    print(colname)
+                assert bytes(expected) == bytes(actual)
+
+def generate_string(length):
+    if (random.randint(0, 5) == 0):
+        return None
+    return ''.join([random.choice('1234567890-=qwertyuiop[]\\asdfghjkl;\'zxcvbnm,./')
+        for i in range(length)])
+
+def test_pyarrow_string(conn_db_readonly : ConnDB) -> None:
+    conn, db = conn_db_readonly
+    # strings, large strings, and string views
+    random.seed(100)
+    index = pa.array(range(16000), type=pa.int64())
+    col1 = pa.array([generate_string(random.randint(10, 100)) for i in range(16000)],
+        type=pa.string())
+    col2 = pa.array([generate_string(random.randint(10, 100)) for i in range(16000)],
+        type=pa.large_string())
+    col3 = col1.view(pa.string())
+    df = pd.DataFrame({
+        'index': arrowtopd(index),
+        'col1' : arrowtopd(col1),
+        'col2' : arrowtopd(col2),
+        'col3' : arrowtopd(col3),
+    }).sort_values(by=['index'])
+    result = conn.execute('CALL READ_PANDAS(df) RETURN * ORDER BY index').get_as_df()
+    for colname in ['col1', 'col2', 'col3']:
+        for expected, actual in zip(df[colname], result[colname]):
+            if is_null(expected) or is_null(actual):
+                assert is_null(expected)
+                assert is_null(actual)
+            else:
+                assert str(expected) == str(actual)
+
+def test_pyarrow_dict(conn_db_readonly : ConnDB) -> None:
+    conn, db = conn_db_readonly
+    random.seed(100)
+    index = pa.array(range(2000), type=pa.int64())
+    col1 = pa.array([random.randint(0, 1) for i in range(2000)], type=pa.int32()).dictionary_encode()
+    col2 = pa.array([random.randint(-20, 20) / 10 for i in range(2000)], type=pa.float64()).dictionary_encode()
+    # it seems arrow hasn't implemented dictionary encoding for nested types
+    #col3 = pa.array([
+    #    [generate_string(random.randint(10, 100)) for x in range(random.randint(10, 100))]
+    #    for i in range(3000)
+    #], type=pa.list_(pa.string())).dictionary_encode()
+    df = pd.DataFrame({
+        'index': arrowtopd(index),
+        'col1' : arrowtopd(col1),
+        'col2' : arrowtopd(col2)
+    })
+    result = conn.execute('CALL READ_PANDAS(df) RETURN * ORDER BY index').get_as_df()
+    for colname in ['col1', 'col2']:
+        for expected, actual in zip(df[colname], result[colname]):
+            assert expected == actual
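
For reference, a minimal usage sketch of the scan path this patch adds: a pandas DataFrame whose columns use pandas.ArrowDtype is detected by isPyArrowBacked() in pandas_scan.cpp and routed to the new READ_PYARROW table function instead of the NumPy-backed pandas scan. The database path, column names, and values below are illustrative assumptions and not part of the patch; it also assumes pandas 2.x for pd.ArrowDtype, matching what the cached-import changes expect.

import pandas as pd
import pyarrow as pa
import kuzu

db = kuzu.Database("example_db")  # hypothetical on-disk database path
conn = kuzu.Connection(db)

# ArrowDtype-backed columns make isPyArrowBacked() return true, so the scan
# replacement binds PyArrowTableScanFunction rather than the NumPy-based scan.
df = pd.DataFrame({
    "id": pd.array([1, 2, 3], dtype=pd.ArrowDtype(pa.int64())),
    "name": pd.array(["a", "b", None], dtype=pd.ArrowDtype(pa.string())),
})

# The replacement mechanism resolves `df` by name, as in the tests above.
result = conn.execute("CALL READ_PANDAS(df) RETURN *").get_as_df()
print(result)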