Skip to content

Commit

Permalink
Address misc review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
royi-luo committed Aug 7, 2024
1 parent 82989eb commit 4010033
Show file tree
Hide file tree
Showing 17 changed files with 70 additions and 59 deletions.
2 changes: 1 addition & 1 deletion src/common/types/types.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
#include "common/types/ku_list.h"
#include "common/types/ku_string.h"
#include "function/built_in_function_utils.h"
#include "storage/compression/compression_float.h"
#include "storage/compression/float_compression.h"

using kuzu::function::BuiltInFunctionsUtils;

Expand Down
2 changes: 1 addition & 1 deletion src/include/common/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ constexpr T ceilDiv(T a, T b) {
}

// Converts an integral value to another integral type. KU_ASSERT checks that the
// converted value still compares equal to the original before it is returned.
// NOTE(review): the == comparison applies the usual arithmetic conversions, so some
// value-changing conversions still compare equal (e.g. int64_t{-1} converted to
// uint64_t) and would not trip the assert -- consider std::in_range<To>(val); confirm.
template<std::integral To, std::integral From>
constexpr To narrowingConversion(From val) {
constexpr To safeIntegerConversion(From val) {
KU_ASSERT(static_cast<To>(val) == val);
return val;
}
Expand Down
16 changes: 11 additions & 5 deletions src/include/storage/compression/compression.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ enum class CompressionType : uint8_t {
INTEGER_BITPACKING = 1,
BOOLEAN_BITPACKING = 2,
CONSTANT = 3,
FLOAT = 4,
ALP = 4,
};

// used only for compressing floats/doubles
Expand Down Expand Up @@ -138,12 +138,12 @@ struct CompressionMetadata {

CompressionType compression;

std::variant<ALPMetadata> additionalMetadata;
std::optional<ALPMetadata> extraMetadata;

std::vector<CompressionMetadata> children;

// constructor for metadata with no ALP state supplied: stores min, max and the
// compression type, and value-initializes the extra-metadata member
CompressionMetadata(StorageValue min, StorageValue max, CompressionType compression)
: min(min), max(max), compression(compression), additionalMetadata() {}
: min(min), max(max), compression(compression), extraMetadata() {}

// constructor for float metadata
CompressionMetadata(StorageValue min, StorageValue max, CompressionType compression,
Expand All @@ -156,8 +156,14 @@ struct CompressionMetadata {
const CompressionMetadata& getChild(common::offset_t idx) const;

// accessors for extraMetadata (the optional ALP metadata)
inline const ALPMetadata& floatMetadata() const { return std::get<0>(additionalMetadata); }
inline ALPMetadata& floatMetadata() { return std::get<0>(additionalMetadata); }
// Read-only access to the ALP metadata stored in extraMetadata.
// KU_ASSERT checks that the optional is populated first; std::optional::value()
// itself throws std::bad_optional_access if it is empty.
inline const ALPMetadata& floatMetadata() const {
KU_ASSERT(extraMetadata.has_value());
return extraMetadata.value();
}
// Mutable access to the ALP metadata stored in extraMetadata.
// KU_ASSERT checks that the optional is populated first; std::optional::value()
// itself throws std::bad_optional_access if it is empty.
inline ALPMetadata& floatMetadata() {
KU_ASSERT(extraMetadata.has_value());
return extraMetadata.value();
}

void serialize(common::Serializer& serializer) const;
static CompressionMetadata deserialize(common::Deserializer& deserializer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class FloatCompression final : public CompressionAlg {
const std::optional<common::NullMask>& nullMask = std::nullopt,
uint64_t nullMaskOffset = 0);

CompressionType getCompressionType() const override { return CompressionType::FLOAT; }
CompressionType getCompressionType() const override { return CompressionType::ALP; }

static BitpackInfo<EncodedType> getBitpackInfo(const CompressionMetadata& metadata);

Expand Down
2 changes: 1 addition & 1 deletion src/include/storage/store/column_chunk_flush.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

#include "storage/buffer_manager/bm_file_handle.h"
#include "storage/compression/compression.h"
#include "storage/compression/compression_float.h"
#include "storage/compression/float_compression.h"
#include "storage/store/column_chunk_metadata.h"

namespace kuzu::storage {
Expand Down
2 changes: 1 addition & 1 deletion src/include/storage/store/column_read_writer.h
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#pragma once

#include "storage/buffer_manager/buffer_manager.h"
#include "storage/compression/compression_float.h"
#include "storage/compression/float_compression.h"
#include "storage/db_file_id.h"

namespace kuzu {
Expand Down
2 changes: 1 addition & 1 deletion src/storage/compression/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
add_library(kuzu_storage_compression
OBJECT
compression.cpp
compression_float.cpp
float_compression.cpp
bitpacking_int128.cpp
bitpacking_utils.cpp)

Expand Down
28 changes: 14 additions & 14 deletions src/storage/compression/compression.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
#include "fastpfor/bitpackinghelpers.h"
#include "storage/compression/bitpacking_int128.h"
#include "storage/compression/bitpacking_utils.h"
#include "storage/compression/compression_float.h"
#include "storage/compression/float_compression.h"
#include "storage/compression/sign_extend.h"
#include "storage/storage_utils.h"
#include "storage/store/column_chunk_data.h"
Expand Down Expand Up @@ -93,8 +93,8 @@ CompressionMetadata::CompressionMetadata(StorageValue min, StorageValue max,
CompressionType compression, const alp::state& state, StorageValue minEncoded,
StorageValue maxEncoded, common::PhysicalTypeID physicalType)
: min(min), max(max), compression(compression),
additionalMetadata(ALPMetadata{state, physicalType}) {
if (compression == CompressionType::FLOAT) {
extraMetadata(ALPMetadata{state, physicalType}) {
if (compression == CompressionType::ALP) {
children.emplace_back(minEncoded, maxEncoded,
minEncoded == maxEncoded ? CompressionType::CONSTANT :
CompressionType::INTEGER_BITPACKING);
Expand All @@ -111,7 +111,7 @@ void CompressionMetadata::serialize(Serializer& serializer) const {
serializer.write(max);
serializer.write(compression);

if (compression == CompressionType::FLOAT) {
if (compression == CompressionType::ALP) {
serializer.write(floatMetadata());
}

Expand All @@ -131,10 +131,10 @@ CompressionMetadata CompressionMetadata::deserialize(common::Deserializer& deser

CompressionMetadata ret(min, max, compressionType);

if (compressionType == CompressionType::FLOAT) {
if (compressionType == CompressionType::ALP) {
ALPMetadata alpMetadata;
deserializer.deserializeValue(alpMetadata);
ret.additionalMetadata = alpMetadata;
ret.extraMetadata = alpMetadata;
}

for (size_t i = 0; i < getChildCount(compressionType); ++i) {
Expand All @@ -151,7 +151,7 @@ bool CompressionMetadata::canAlwaysUpdateInPlace() const {
return true;
}
case CompressionType::CONSTANT:
case CompressionType::FLOAT:
case CompressionType::ALP:
case CompressionType::INTEGER_BITPACKING: {
return false;
}
Expand Down Expand Up @@ -202,7 +202,7 @@ bool CompressionMetadata::canUpdateInPlace(const uint8_t* data, uint32_t pos, ui
case CompressionType::UNCOMPRESSED: {
return true;
}
case CompressionType::FLOAT: {
case CompressionType::ALP: {
return TypeUtils::visit(
physicalType,
[&]<std::floating_point T>(T) {
Expand Down Expand Up @@ -289,7 +289,7 @@ uint64_t CompressionMetadata::numValues(uint64_t pageSize, common::PhysicalTypeI
}
}
}
case CompressionType::FLOAT: {
case CompressionType::ALP: {
switch (dataType) {
case PhysicalTypeID::DOUBLE: {
return FloatCompression<double>::numValues(pageSize, *this);
Expand Down Expand Up @@ -318,7 +318,7 @@ uint64_t CompressionMetadata::numValues(uint64_t pageSize, common::PhysicalTypeI

size_t CompressionMetadata::getChildCount(CompressionType compressionType) {
switch (compressionType) {
case CompressionType::FLOAT: {
case CompressionType::ALP: {
return 1;
}
default: {
Expand Down Expand Up @@ -397,7 +397,7 @@ std::string CompressionMetadata::toString(const PhysicalTypeID physicalType) con
case CompressionType::UNCOMPRESSED: {
return "UNCOMPRESSED";
}
case CompressionType::FLOAT: {
case CompressionType::ALP: {
uint8_t bitWidth = TypeUtils::visit(
physicalType,
[&]<std::floating_point T>(T) {
Expand Down Expand Up @@ -849,7 +849,7 @@ void ReadCompressedValuesFromPageToVector::operator()(const uint8_t* frame, Page
case CompressionType::UNCOMPRESSED:
return uncompressed.decompressFromPage(frame, pageCursor.elemPosInPage,
resultVector->getData(), posInVector, numValuesToRead, metadata);
case CompressionType::FLOAT: {
case CompressionType::ALP: {
switch (physicalType) {
case PhysicalTypeID::DOUBLE: {
return FloatCompression<double>().decompressFromPage(frame, pageCursor.elemPosInPage,
Expand Down Expand Up @@ -928,7 +928,7 @@ void ReadCompressedValuesFromPage::operator()(const uint8_t* frame, PageCursor&
case CompressionType::UNCOMPRESSED:
return uncompressed.decompressFromPage(frame, pageCursor.elemPosInPage, result,
startPosInResult, numValuesToRead, metadata);
case CompressionType::FLOAT: {
case CompressionType::ALP: {
switch (physicalType) {
case PhysicalTypeID::DOUBLE: {
return FloatCompression<double>().decompressFromPage(frame, pageCursor.elemPosInPage,
Expand Down Expand Up @@ -1025,7 +1025,7 @@ void WriteCompressedValuesToPage::operator()(uint8_t* frame, uint16_t posInFrame
}
});
}
case CompressionType::FLOAT: {
case CompressionType::ALP: {
return TypeUtils::visit(physicalType, [&]<typename T>(T) {
if constexpr (std::is_floating_point_v<T>) {
FloatCompression<T>().setValuesFromUncompressed(data, dataOffset, frame, posInFrame,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#include "storage/compression/compression_float.h"
#include "storage/compression/float_compression.h"

#include "alp/encode.hpp"
#include "common/constants.h"
Expand Down Expand Up @@ -63,7 +63,7 @@ uint64_t FloatCompression<T>::compressNextPageWithExceptions(const uint8_t*& src
uint64_t srcOffset, uint64_t numValuesRemaining, uint8_t* dstBuffer, uint64_t dstBufferSize,
uint8_t* exceptionBuffer, [[maybe_unused]] uint64_t exceptionBufferSize,
uint64_t& exceptionCount, const struct CompressionMetadata& metadata) const {
KU_ASSERT(metadata.compression == CompressionType::FLOAT);
KU_ASSERT(metadata.compression == CompressionType::ALP);

const size_t numValuesToCompress =
std::min(numValuesRemaining, numValues(dstBufferSize, metadata));
Expand All @@ -83,7 +83,7 @@ uint64_t FloatCompression<T>::compressNextPageWithExceptions(const uint8_t*& src
auto* exceptionBufferEntry = reinterpret_cast<std::byte*>(
exceptionBuffer + exceptionCount * EncodeException<T>::sizeBytes());
EncodeExceptionView<T>{exceptionBufferEntry}.setValue({.value = floatValue,
.posInChunk = common::narrowingConversion<uint32_t>(srcOffset + posInPage)});
.posInChunk = common::safeIntegerConversion<uint32_t>(srcOffset + posInPage)});

// We don't need to replace with 1st successful encode as the integer bitpacking
// metadata is already populated
Expand Down
7 changes: 3 additions & 4 deletions src/storage/store/column.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
#include "common/types/types.h"
#include "storage/buffer_manager/buffer_manager.h"
#include "storage/compression/compression.h"
#include "storage/compression/compression_float.h"
#include "storage/compression/float_compression.h"
#include "storage/storage_structure/db_file_utils.h"
#include "storage/storage_utils.h"
#include "storage/store/column_chunk.h"
Expand Down Expand Up @@ -287,7 +287,7 @@ void Column::lookupInternal(Transaction* transaction, const ChunkState& state, o

[[maybe_unused]] static bool sanityCheckForWrites(const ColumnChunkMetadata& metadata,
const LogicalType& dataType) {
if (metadata.compMeta.compression == CompressionType::FLOAT) {
if (metadata.compMeta.compression == CompressionType::ALP) {
if (metadata.compMeta.children.size() == 0) {
return false;
}
Expand Down Expand Up @@ -453,8 +453,7 @@ void Column::initializeScanState(ChunkState& state) {
// TODO: Not a good way to initialize column for chunkState here.
state.column = this;

Transaction transaction{TransactionType::WRITE};
// TODO: update
Transaction& transaction = DUMMY_CHECKPOINT_TRANSACTION;
if (dataType.getPhysicalType() == common::PhysicalTypeID::DOUBLE) {
state.alpExceptionChunk = std::make_unique<InMemoryExceptionChunk<double>>(
columnReadWriter.get(), &transaction, state);
Expand Down
2 changes: 1 addition & 1 deletion src/storage/store/column_chunk_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
#include "expression_evaluator/expression_evaluator.h"
#include "storage/buffer_manager/bm_file_handle.h"
#include "storage/compression/compression.h"
#include "storage/compression/compression_float.h"
#include "storage/compression/float_compression.h"
#include "storage/store/column_chunk_flush.h"
#include "storage/store/column_chunk_metadata.h"
#include "storage/store/list_chunk_data.h"
Expand Down
2 changes: 1 addition & 1 deletion src/storage/store/column_chunk_flush.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ ColumnChunkMetadata CompressedFloatFlushBuffer<T>::operator()(const uint8_t* buf
buffer, bufferSize, dataFH, startPageIdx, metadata);
}
// FlushBuffer should not be called with constant compression
KU_ASSERT(metadata.compMeta.compression == CompressionType::FLOAT);
KU_ASSERT(metadata.compMeta.compression == CompressionType::ALP);

auto [exceptionBuffer, exceptionBufferSize] =
flushCompressedFloats(*alg, dataType, buffer, bufferSize, dataFH, startPageIdx, metadata);
Expand Down
2 changes: 1 addition & 1 deletion src/storage/store/column_chunk_metadata.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
#include "common/serializer/serializer.h"
#include "common/type_utils.h"
#include "common/utils.h"
#include "storage/compression/compression_float.h"
#include "storage/compression/float_compression.h"
#include "storage/store/column_chunk_data.h"

namespace kuzu::storage {
Expand Down
16 changes: 8 additions & 8 deletions src/storage/store/column_read_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

#include "alp/encode.hpp"
#include "common/utils.h"
#include "storage/compression/compression_float.h"
#include "storage/compression/float_compression.h"
#include "storage/storage_structure/db_file_utils.h"
#include "storage/storage_utils.h"
#include "transaction/transaction.h"
Expand Down Expand Up @@ -241,7 +241,7 @@ class FloatColumnReadWriter : public ColumnReadWriter {
void writeValueToPageFromVector(ChunkState& state, common::offset_t offsetInChunk,
common::ValueVector* vectorToWriteFrom, uint32_t posInVectorToWriteFrom,
const write_values_from_vector_func_t& writeFromVectorFunc) override {
if (state.metadata.compMeta.compression != CompressionType::FLOAT) {
if (state.metadata.compMeta.compression != CompressionType::ALP) {
return defaultReader->writeValueToPageFromVector(state, offsetInChunk,
vectorToWriteFrom, posInVectorToWriteFrom, writeFromVectorFunc);
}
Expand All @@ -253,7 +253,7 @@ class FloatColumnReadWriter : public ColumnReadWriter {
void writeValuesToPageFromBuffer(ChunkState& state, common::offset_t dstOffset,
const uint8_t* data, const common::NullMask* nullChunkData, common::offset_t srcOffset,
common::offset_t numValues, const write_values_func_t& writeFunc) override {
if (state.metadata.compMeta.compression != CompressionType::FLOAT) {
if (state.metadata.compMeta.compression != CompressionType::ALP) {
return defaultReader->writeValuesToPageFromBuffer(state, dstOffset, data, nullChunkData,
srcOffset, numValues, writeFunc);
}
Expand Down Expand Up @@ -296,7 +296,7 @@ class FloatColumnReadWriter : public ColumnReadWriter {
common::offset_t offsetInChunk, OutputType result, uint32_t offsetInResult,
const read_value_from_page_func_t<OutputType>& readFunc) {
RUNTIME_CHECK(const ColumnChunkMetadata& metadata = state.metadata);
KU_ASSERT(metadata.compMeta.compression == CompressionType::FLOAT ||
KU_ASSERT(metadata.compMeta.compression == CompressionType::ALP ||
metadata.compMeta.compression == CompressionType::CONSTANT ||
metadata.compMeta.compression == CompressionType::UNCOMPRESSED);
std::optional<filter_func_t> filterFunc{};
Expand All @@ -310,15 +310,15 @@ class FloatColumnReadWriter : public ColumnReadWriter {
uint64_t endNodeOffset, const read_values_from_page_func_t<OutputType>& readFunc,
const std::optional<filter_func_t>& filterFunc) {
const ColumnChunkMetadata& metadata = state.metadata;
KU_ASSERT(metadata.compMeta.compression == CompressionType::FLOAT ||
KU_ASSERT(metadata.compMeta.compression == CompressionType::ALP ||
metadata.compMeta.compression == CompressionType::CONSTANT ||
metadata.compMeta.compression == CompressionType::UNCOMPRESSED);

const uint64_t numValuesToScan = endNodeOffset - startNodeOffset;
const uint64_t numValuesScanned = defaultReader->readCompressedValues(transaction, state,
result, startOffsetInResult, startNodeOffset, endNodeOffset, readFunc, filterFunc);

if (metadata.compMeta.compression == CompressionType::FLOAT && numValuesScanned > 0) {
if (metadata.compMeta.compression == CompressionType::ALP && numValuesScanned > 0) {
patchFloatExceptions(state, startNodeOffset, numValuesToScan, result,
startOffsetInResult, filterFunc);
}
Expand Down Expand Up @@ -364,15 +364,15 @@ class FloatColumnReadWriter : public ColumnReadWriter {
exceptionChunk->getExceptionAt(curExceptionIdx).posInChunk == writeOffset) {
if (newValueIsException) {
exceptionChunk->writeException(
EncodeException<T>{newValue, narrowingConversion<uint32_t>(writeOffset)},
EncodeException<T>{newValue, safeIntegerConversion<uint32_t>(writeOffset)},
curExceptionIdx);
} else {
exceptionChunk->removeExceptionAt(curExceptionIdx);
}
++curExceptionIdx;
} else if (newValueIsException) {
exceptionChunk->addException(
EncodeException<T>{newValue, narrowingConversion<uint32_t>(writeOffset)});
EncodeException<T>{newValue, safeIntegerConversion<uint32_t>(writeOffset)});
}
}

Expand Down
10 changes: 5 additions & 5 deletions test/storage/column_chunk_metadata_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@ bool operator==(const CompressionMetadata& a, const CompressionMetadata& b) {
return false;
if (a.max != b.max)
return false;
if (a.additionalMetadata.index() != b.additionalMetadata.index())
if (a.extraMetadata.has_value() != b.extraMetadata.has_value())
return false;
if (std::holds_alternative<ALPMetadata>(a.additionalMetadata) &&
std::get<ALPMetadata>(a.additionalMetadata) !=
std::get<ALPMetadata>(b.additionalMetadata)) {}
if (a.extraMetadata.has_value() && a.extraMetadata.value() != b.extraMetadata.value()) {
return false;
}
if (a.children.size() != b.children.size())
return false;
for (size_t i = 0; i < a.children.size(); ++i) {
Expand Down Expand Up @@ -76,7 +76,7 @@ TEST(CompressionTests, DoubleMetadataSerializeThenDeserialize) {
alpState.fac = 5;

const CompressionMetadata origCompMeta{StorageValue{-1.01}, StorageValue{1.01},
CompressionType::FLOAT, alpState, StorageValue{0}, StorageValue{1}, PhysicalTypeID::DOUBLE};
CompressionType::ALP, alpState, StorageValue{0}, StorageValue{1}, PhysicalTypeID::DOUBLE};
const ColumnChunkMetadata orig{1, 2, 3, origCompMeta};

testSerializeThenDeserialize(orig);
Expand Down
Loading

0 comments on commit 4010033

Please sign in to comment.