From d8e8f69d6d1a626005089868d8de429d223c93c6 Mon Sep 17 00:00:00 2001 From: Benjamin Winger Date: Tue, 21 Jan 2025 10:34:38 -0500 Subject: [PATCH] Review fixes --- .../operator/aggregate/hash_aggregate.h | 16 ++++------------ .../operator/aggregate/hash_aggregate.cpp | 14 +++++++------- test/test_files/agg/hash_large.test | 7 ++++--- 3 files changed, 15 insertions(+), 22 deletions(-) diff --git a/src/include/processor/operator/aggregate/hash_aggregate.h b/src/include/processor/operator/aggregate/hash_aggregate.h index f062cc14ea5..a3c90c4ec15 100644 --- a/src/include/processor/operator/aggregate/hash_aggregate.h +++ b/src/include/processor/operator/aggregate/hash_aggregate.h @@ -37,14 +37,9 @@ struct HashAggregateInfo { class HashAggregateSharedState final : public BaseAggregateSharedState { public: - explicit HashAggregateSharedState(main::ClientContext* context, HashAggregateInfo hashInfo, + explicit HashAggregateSharedState(main::ClientContext* context, HashAggregateInfo aggInfo, const std::vector& aggregateFunctions); - void initPartitions(main::ClientContext* context, - const std::vector& keyDataTypes, - const std::vector& payloadDataTypes, - const std::vector& types); - ~HashAggregateSharedState(); void appendTuple(std::span tuple, common::hash_t hash); @@ -64,9 +59,6 @@ class HashAggregateSharedState final : public BaseAggregateSharedState { uint64_t getCurrentOffset() const { return currentOffset; } - // return whether limitNumber is exceeded - bool increaseAndCheckLimitCount(uint64_t num); - void setLimitNumber(uint64_t num) { limitNumber = num; } uint64_t getLimitNumber() const { return limitNumber; } @@ -81,14 +73,14 @@ class HashAggregateSharedState final : public BaseAggregateSharedState { void assertFinalized() const; - const HashAggregateInfo& getInfo() const { return hashInfo; } + const HashAggregateInfo& getAggregateInfo() const { return aggInfo; } protected: std::tuple getPartitionForOffset( common::offset_t offset) const; public: - HashAggregateInfo hashInfo; + HashAggregateInfo aggInfo; common::MPSCQueue> overflow; struct Partition { std::unique_ptr hashTable; @@ -103,7 +95,7 @@ class HashAggregateSharedState final : public BaseAggregateSharedState { table.resize(table.getNumTuplesPerBlock()); } // numTuplesReserved may be greater than the capacity of the factorizedTable - // if threads try to write to it it while a new block is being allocated + // if threads try to write to it while a new block is being allocated // So it should not be relied on for anything other than reserving tuples std::atomic numTuplesReserved; // Set after the tuple has been written to the block. diff --git a/src/processor/operator/aggregate/hash_aggregate.cpp b/src/processor/operator/aggregate/hash_aggregate.cpp index 5b0f64c2b88..76f169cad58 100644 --- a/src/processor/operator/aggregate/hash_aggregate.cpp +++ b/src/processor/operator/aggregate/hash_aggregate.cpp @@ -32,20 +32,20 @@ std::string HashAggregatePrintInfo::toString() const { return result; } HashAggregateSharedState::HashAggregateSharedState(main::ClientContext* context, - HashAggregateInfo hashInfo, const std::vector& aggregateFunctions) - : BaseAggregateSharedState{aggregateFunctions}, hashInfo{std::move(hashInfo)}, + HashAggregateInfo aggInfo, const std::vector& aggregateFunctions) + : BaseAggregateSharedState{aggregateFunctions}, aggInfo{std::move(aggInfo)}, globalPartitions{static_cast(context->getMaxNumThreadForExec())}, limitNumber{common::INVALID_LIMIT}, numThreads{0}, memoryManager{context->getMemoryManager()} { // When copying directly into factorizedTables the table's schema's internal mayContainNulls // won't be updated and it's probably less work to just always check nulls - for (size_t i = 0; i < this->hashInfo.tableSchema.getNumColumns(); i++) { - this->hashInfo.tableSchema.setMayContainsNullsToTrue(i); + for (size_t i = 0; i < this->aggInfo.tableSchema.getNumColumns(); i++) { + this->aggInfo.tableSchema.setMayContainsNullsToTrue(i); } for (auto& partition : globalPartitions) { partition.headBlock = new Partition::TupleBlock(context->getMemoryManager(), - this->hashInfo.tableSchema.copy()); + this->aggInfo.tableSchema.copy()); } } @@ -88,7 +88,7 @@ HashAggregateInfo::HashAggregateInfo(const HashAggregateInfo& other) void HashAggregateLocalState::init(HashAggregateSharedState* sharedState, ResultSet& resultSet, main::ClientContext* context, std::vector& aggregateFunctions, std::vector types) { - auto& info = sharedState->getInfo(); + auto& info = sharedState->getAggregateInfo(); std::vector keyDataTypes; for (auto& pos : info.flatKeysPos) { auto vector = resultSet.getValueVector(pos).get(); @@ -173,7 +173,7 @@ void HashAggregateSharedState::appendTuple(std::span tuple, common::has return; } else { // No more space in the block, allocate and replace it - auto* newBlock = new Partition::TupleBlock(memoryManager, hashInfo.tableSchema.copy()); + auto* newBlock = new Partition::TupleBlock(memoryManager, aggInfo.tableSchema.copy()); if (partition.headBlock.compare_exchange_strong(block, newBlock)) { // TODO(bmwinger): if the queuedTuples has at least a certain size (benchmark to see // if there's a benefit to waiting for multiple blocks) then cycle through the queue diff --git a/test/test_files/agg/hash_large.test b/test/test_files/agg/hash_large.test index ccfb5691fb7..1d8e4a1b37a 100644 --- a/test/test_files/agg/hash_large.test +++ b/test/test_files/agg/hash_large.test @@ -10,13 +10,14 @@ ---- 1 3373 -STATEMENT MATCH (a:account)-[]->(b:account) WHERE a.ID > 1000000 RETURN a.ID, COUNT(b) as n ORDER BY n DESC LIMIT 5; -# TODO(bmwinger): Ordering is wrong here +-CHECK_ORDER ---- 5 -18776017|2272 3359851|3373 -5442012|2204 59804598|2467 7860742|2458 +18776017|2272 +5442012|2204 -STATEMENT MATCH (a:account)-[]->(b:account) WHERE a.ID > 1000000 RETURN a.ID, COUNT(b.ID) as n ORDER BY n, a.ID; +-CHECK_ORDER ---- 68949 :hash_large.csv