Skip to content

Commit

Permalink
Store partitions globally instead of per-thread to reduce memory usage
Browse files Browse the repository at this point in the history
  • Loading branch information
benjaminwinger committed Jan 6, 2025
1 parent 85edfff commit ba7e260
Show file tree
Hide file tree
Showing 10 changed files with 257 additions and 156 deletions.
57 changes: 31 additions & 26 deletions src/include/processor/operator/aggregate/aggregate_hash_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
#include <cstdint>

#include "aggregate_input.h"
#include "common/copy_constructors.h"
#include "common/types/types.h"
#include "common/vector/value_vector.h"
#include "function/aggregate_function.h"
#include "processor/result/base_hash_table.h"
Expand Down Expand Up @@ -93,6 +95,10 @@ class AggregateHashTable : public BaseHashTable {

void resize(uint64_t newSize);

AggregateHashTable createEmptyCopy() const { return AggregateHashTable(*this); }

DEFAULT_BOTH_MOVE(AggregateHashTable);

protected:
virtual uint64_t matchFTEntries(const std::vector<common::ValueVector*>& flatKeyVectors,
const std::vector<common::ValueVector*>& unFlatKeyVectors, uint64_t numMayMatches,
Expand Down Expand Up @@ -210,6 +216,29 @@ class AggregateHashTable : public BaseHashTable {
return *factorizedTable;
}

static std::vector<common::LogicalType> getDistinctAggKeyTypes(
const AggregateHashTable& hashTable) {
std::vector<common::LogicalType> distinctAggKeyTypes(hashTable.distinctHashTables.size());
std::transform(hashTable.distinctHashTables.begin(), hashTable.distinctHashTables.end(),
distinctAggKeyTypes.begin(), [&](const auto& distinctHashTable) {
if (distinctHashTable) {
return distinctHashTable->keyTypes.back().copy();
} else {
return common::LogicalType();
}
});
return distinctAggKeyTypes;
}

private:
// Does not copy the contents of the hash table and is provided as a convenient way of
// constructing more hash tables without having to hold on to or expose the construction
// arguments via createEmptyCopy
AggregateHashTable(const AggregateHashTable& other)
: AggregateHashTable(*other.memoryManager, common::LogicalType::copy(other.keyTypes),
common::LogicalType::copy(other.payloadTypes), other.aggregateFunctions,
getDistinctAggKeyTypes(other), 0, other.factorizedTable->getTableSchema()->copy()) {}

protected:
uint32_t hashColIdxInFT{};
std::unique_ptr<uint64_t[]> mayMatchIdxes;
Expand Down Expand Up @@ -244,22 +273,15 @@ struct AggregateHashTableUtils {
class HashAggregateSharedState;
class PartitioningAggregateHashTable : public AggregateHashTable {
public:
static constexpr size_t NUM_PARTITIONS = 128;

PartitioningAggregateHashTable(HashAggregateSharedState* sharedState,
storage::MemoryManager& memoryManager, std::vector<common::LogicalType> keyTypes,
std::vector<common::LogicalType> payloadTypes,
const std::vector<function::AggregateFunction>& aggregateFunctions,
const std::vector<common::LogicalType>& distinctAggKeyTypes,
FactorizedTableSchema tableSchema)
: AggregateHashTable(memoryManager, std::move(keyTypes), std::move(payloadTypes),
aggregateFunctions, distinctAggKeyTypes, NUM_PARTITIONS * 1024, tableSchema.copy()),
partitions{}, sharedState{sharedState}, tableSchema{std::move(tableSchema)},
memoryManager{memoryManager} {
std::generate(partitions.begin(), partitions.end(), [&]() {
return std::make_unique<FactorizedTable>(&memoryManager, this->tableSchema.copy());
});
}
aggregateFunctions, distinctAggKeyTypes, 0 /*minimum size*/, tableSchema.copy()),
sharedState{sharedState}, tableSchema{std::move(tableSchema)} {}

uint64_t append(const std::vector<common::ValueVector*>& flatKeyVectors,
const std::vector<common::ValueVector*>& unFlatKeyVectors,
Expand All @@ -268,27 +290,10 @@ class PartitioningAggregateHashTable : public AggregateHashTable {
uint64_t resultSetMultiplicity);

void mergeAll();
uint64_t getNumEntries() const {
uint64_t numEntries = 0;
for (const auto& partition : partitions) {
numEntries += partition->getNumTuples();
}
return numEntries;
}

protected:
FactorizedTable& getFactorizedTable(common::hash_t entryHash) override {
return *partitions[entryHash % NUM_PARTITIONS];
}

private:
uint8_t* appendEmptyTuple(uint8_t partitionIdx);

private:
std::array<std::unique_ptr<FactorizedTable>, NUM_PARTITIONS> partitions;
HashAggregateSharedState* sharedState;
FactorizedTableSchema tableSchema;
storage::MemoryManager& memoryManager;
};

} // namespace processor
Expand Down
92 changes: 62 additions & 30 deletions src/include/processor/operator/aggregate/hash_aggregate.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,35 +6,51 @@
#include <memory>

#include "aggregate_hash_table.h"
#include "common/concurrent_vector.h"
#include "common/copy_constructors.h"
#include "common/in_mem_overflow_buffer.h"
#include "common/mpsc_queue.h"
#include "common/types/types.h"
#include "common/vector/value_vector.h"
#include "main/client_context.h"
#include "processor/operator/aggregate/base_aggregate.h"
#include "processor/result/factorized_table.h"
#include "processor/result/factorized_table_schema.h"

namespace kuzu {
namespace processor {

struct HashAggregateInfo;
struct HashAggregateInfo {
std::vector<DataPos> flatKeysPos;
std::vector<DataPos> unFlatKeysPos;
std::vector<DataPos> dependentKeysPos;
FactorizedTableSchema tableSchema;

HashAggregateInfo(std::vector<DataPos> flatKeysPos, std::vector<DataPos> unFlatKeysPos,
std::vector<DataPos> dependentKeysPos, FactorizedTableSchema tableSchema);
HashAggregateInfo(const HashAggregateInfo& other);
};

// NOLINTNEXTLINE(cppcoreguidelines-virtual-class-destructor): This is a final class.
class HashAggregateSharedState final : public BaseAggregateSharedState {

public:
explicit HashAggregateSharedState(
explicit HashAggregateSharedState(main::ClientContext* context, HashAggregateInfo hashInfo,
const std::vector<function::AggregateFunction>& aggregateFunctions);

void initPartitions(main::ClientContext* context, std::vector<common::LogicalType> keyDataTypes,
std::vector<common::LogicalType> payloadDataTypes, HashAggregateInfo& info,
std::vector<common::LogicalType> types);
void initPartitions(main::ClientContext* context,
const std::vector<common::LogicalType>& keyDataTypes,
const std::vector<common::LogicalType>& payloadDataTypes,
const std::vector<common::LogicalType>& types);

// Will return either the original factorized table for reuse, or a nullptr if the
std::unique_ptr<FactorizedTable> mergeTable(uint8_t partitionIdx,
std::unique_ptr<FactorizedTable> aggregateHashTable);
~HashAggregateSharedState();

void tryMergeQueue();
void finalizeAggregateHashTable();
void appendTuple(std::span<uint8_t> tuple, common::hash_t hash);
void appendOverflow(common::InMemOverflowBuffer* overflowBuffer) const {
overflow->merge(*overflowBuffer);
}

void finalizeAggregateHashTable(const AggregateHashTable& localHashTable);

std::pair<uint64_t, uint64_t> getNextRangeToRead() override;

Expand Down Expand Up @@ -62,33 +78,51 @@ class HashAggregateSharedState final : public BaseAggregateSharedState {

void assertFinalized() const;

const HashAggregateInfo& getInfo() const { return hashInfo; }

protected:
std::tuple<const FactorizedTable*, common::offset_t> getPartitionForOffset(
common::offset_t offset) const;

private:
public:
HashAggregateInfo hashInfo;
std::unique_ptr<common::InMemOverflowBuffer> overflow;
struct Partition {
std::unique_ptr<AggregateHashTable> hashTable;
std::mutex mtx;
common::MPSCQueue<std::unique_ptr<FactorizedTable>> queuedTuples;
struct TupleBlock {
TupleBlock(storage::MemoryManager* memoryManager, FactorizedTableSchema tableSchama)
: numTuplesReserved{0}, numTuplesWritten{0},
factorizedTable{memoryManager, std::move(tableSchama)} {
// Start at a fixed capacity of one full block (so that concurrent writes are safe).
// If it is not filled, we resize it to the actual capacity before writing it to the
// hashTable
factorizedTable.resize(factorizedTable.getNumTuplesPerBlock());
}
// numTuplesReserved may be greater than the capacity of the factorizedTable
// if threads try to write to it it while a new block is being allocated
// So it should not be relied on for anything other than reserving tuples
std::atomic<uint64_t> numTuplesReserved;
// Set after the tuple has been written to the block.
// Once numTuplesWritten == factorizedTable.getNumTuplesPerBlock() all writes have
// finished
std::atomic<uint64_t> numTuplesWritten;
FactorizedTable factorizedTable;
};
common::MPSCQueue<TupleBlock*> queuedTuples;
// When queueing tuples, they are always added to the headBlock until the headBlock is full
// (numTuplesReserved >= factorizedTable.getNumTuplesPerBlock()), then pushed into the
// queuedTuples (at which point, the numTuplesReserved may not be equal to the
// numTuplesWritten)
std::atomic<TupleBlock*> headBlock;
bool finalized = false;
};
std::array<Partition, PartitioningAggregateHashTable::NUM_PARTITIONS> globalPartitions;
std::vector<Partition> globalPartitions;
std::atomic_uint64_t limitCounter;
uint64_t limitNumber;
std::atomic<size_t> numThreadsFinishedProducing;
std::atomic<size_t> numThreads;
};

struct HashAggregateInfo {
std::vector<DataPos> flatKeysPos;
std::vector<DataPos> unFlatKeysPos;
std::vector<DataPos> dependentKeysPos;
FactorizedTableSchema tableSchema;

HashAggregateInfo(std::vector<DataPos> flatKeysPos, std::vector<DataPos> unFlatKeysPos,
std::vector<DataPos> dependentKeysPos, FactorizedTableSchema tableSchema);
HashAggregateInfo(const HashAggregateInfo& other);
storage::MemoryManager* memoryManager;
};

struct HashAggregateLocalState {
Expand All @@ -99,8 +133,7 @@ struct HashAggregateLocalState {
std::unique_ptr<PartitioningAggregateHashTable> aggregateHashTable;

void init(HashAggregateSharedState* sharedState, ResultSet& resultSet,
main::ClientContext* context, HashAggregateInfo& info,
std::vector<function::AggregateFunction>& aggregateFunctions,
main::ClientContext* context, std::vector<function::AggregateFunction>& aggregateFunctions,
std::vector<common::LogicalType> types);
uint64_t append(const std::vector<AggregateInput>& aggregateInputs,
uint64_t multiplicity) const;
Expand Down Expand Up @@ -129,13 +162,13 @@ struct HashAggregatePrintInfo final : OPPrintInfo {
class HashAggregate : public BaseAggregate {
public:
HashAggregate(std::unique_ptr<ResultSetDescriptor> resultSetDescriptor,
std::shared_ptr<HashAggregateSharedState> sharedState, HashAggregateInfo hashInfo,
std::shared_ptr<HashAggregateSharedState> sharedState,
std::vector<function::AggregateFunction> aggregateFunctions,
std::vector<AggregateInfo> aggInfos, std::unique_ptr<PhysicalOperator> child, uint32_t id,
std::unique_ptr<OPPrintInfo> printInfo)
: BaseAggregate{std::move(resultSetDescriptor), std::move(aggregateFunctions),
std::move(aggInfos), std::move(child), id, std::move(printInfo)},
hashInfo{std::move(hashInfo)}, sharedState{std::move(sharedState)} {}
sharedState{std::move(sharedState)} {}

void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override;

Expand All @@ -144,15 +177,14 @@ class HashAggregate : public BaseAggregate {
void finalizeInternal(ExecutionContext* context) override;

std::unique_ptr<PhysicalOperator> clone() override {
return make_unique<HashAggregate>(resultSetDescriptor->copy(), sharedState, hashInfo,
return make_unique<HashAggregate>(resultSetDescriptor->copy(), sharedState,
copyVector(aggregateFunctions), copyVector(aggInfos), children[0]->clone(), id,
printInfo->copy());
}

std::shared_ptr<HashAggregateSharedState> getSharedState() const { return sharedState; }

private:
HashAggregateInfo hashInfo;
HashAggregateLocalState localState;
std::shared_ptr<HashAggregateSharedState> sharedState;
};
Expand Down
5 changes: 4 additions & 1 deletion src/include/processor/result/base_hash_table.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#pragma once

#include "common/copy_constructors.h"
#include "common/types/types.h"
#include "processor/result/factorized_table.h"
#include "storage/buffer_manager/memory_manager.h"
Expand All @@ -15,6 +16,8 @@ class BaseHashTable {

virtual ~BaseHashTable() = default;

DELETE_COPY_DEFAULT_MOVE(BaseHashTable);

protected:
static constexpr uint64_t HASH_BLOCK_SIZE = common::TEMP_PAGE_SIZE;

Expand All @@ -38,7 +41,7 @@ class BaseHashTable {
uint64_t numSlotsPerBlockLog2;
uint64_t slotIdxInBlockMask;
std::vector<std::unique_ptr<DataBlock>> hashSlotsBlocks;
storage::MemoryManager& memoryManager;
storage::MemoryManager* memoryManager;
std::unique_ptr<FactorizedTable> factorizedTable;
std::vector<compare_function_t> compareEntryFuncs;
std::vector<common::LogicalType> keyTypes;
Expand Down
4 changes: 4 additions & 0 deletions src/include/processor/result/factorized_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,10 @@ class KUZU_API FactorizedTable {
void setNonOverflowColNull(uint8_t* nullBuffer, ft_col_idx_t colIdx);
void clear();

storage::MemoryManager* getMemoryManager() { return memoryManager; }

void resize(uint64_t numTuples);

private:
void setOverflowColNull(uint8_t* nullBuffer, ft_col_idx_t colIdx, ft_tuple_idx_t tupleIdx);

Expand Down
11 changes: 6 additions & 5 deletions src/processor/map/map_aggregate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -141,17 +141,18 @@ std::unique_ptr<PhysicalOperator> PlanMapper::createHashAggregate(const expressi
allKeys.insert(allKeys.end(), keys.begin(), keys.end());
allKeys.insert(allKeys.end(), payloads.begin(), payloads.end());
auto aggregateInputInfos = getAggregateInputInfos(allKeys, aggregates, *inSchema);
auto sharedState = std::make_shared<HashAggregateSharedState>(aggFunctions);
auto flatKeys = getKeyExpressions(keys, *inSchema, true /* isFlat */);
auto unFlatKeys = getKeyExpressions(keys, *inSchema, false /* isFlat */);
auto tableSchema = getFactorizedTableSchema(flatKeys, unFlatKeys, payloads, aggFunctions);
HashAggregateInfo aggregateInfo{getDataPos(flatKeys, *inSchema),
getDataPos(unFlatKeys, *inSchema), getDataPos(payloads, *inSchema), std::move(tableSchema)};

auto sharedState = std::make_shared<HashAggregateSharedState>(clientContext,
std::move(aggregateInfo), aggFunctions);
auto printInfo = std::make_unique<HashAggregatePrintInfo>(allKeys, aggregates);
auto aggregate =
make_unique<HashAggregate>(std::make_unique<ResultSetDescriptor>(inSchema), sharedState,
std::move(aggregateInfo), std::move(aggFunctions), std::move(aggregateInputInfos),
std::move(prevOperator), getOperatorID(), printInfo->copy());
auto aggregate = make_unique<HashAggregate>(std::make_unique<ResultSetDescriptor>(inSchema),
sharedState, std::move(aggFunctions), std::move(aggregateInputInfos),
std::move(prevOperator), getOperatorID(), printInfo->copy());
// Create AggScan.
expression_vector outputExpressions;
outputExpressions.insert(outputExpressions.end(), flatKeys.begin(), flatKeys.end());
Expand Down
Loading

0 comments on commit ba7e260

Please sign in to comment.