Hash aggregate finalization parallelization #4655
Conversation
Codecov Report
Attention: Patch coverage is
Additional details and impacted files
@@ Coverage Diff @@
## master #4655 +/- ##
==========================================
+ Coverage 86.29% 86.35% +0.05%
==========================================
Files 1396 1397 +1
Lines 59848 60082 +234
Branches 7372 7387 +15
==========================================
+ Hits 51645 51881 +236
+ Misses 8036 8035 -1
+ Partials 167 166 -1
☔ View full report in Codecov by Sentry.
Benchmarks adapted from https://github.com/ClickHouse/ClickBench/, run on a 128 thread runner (2x AMD EPYC 7551).
Results are Cold/Hot, where hot is on subsequent queries in the same process. OS VM caches are dropped after each set of queries.
Thanks Ben! I have some comments which we've discussed, and you can collapse any of them that you're already working on locally.
computeVectorHashes(flatKeyVectors, unFlatKeyVectors);

auto startingNumTuples = getNumEntries();
if (startingNumTuples + numFlatTuples > maxNumHashSlots ||
I would extract the if check as a separate function, e.g., requireResize(), and refactor it together with resizeHashTableIfNecessary.
Actually, I have a question about why we choose to check numTuples here. Ideally we should only check the number of distinct groups, which should happen inside findHashSlots.
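Roughly what I have in mind is something like the sketch below (requireResize is just a placeholder name, the struct is a stand-in for the hash table, and the condition only covers the part of the check visible in the quoted diff):

```cpp
#include <cstdint>

// Hypothetical sketch of the suggested refactor; member names mirror the quoted diff,
// and this struct is only a stand-in for AggregateHashTable.
struct HashTableSketch {
    uint64_t numEntries = 0;
    uint64_t maxNumHashSlots = 16384;

    bool requireResize(uint64_t numTuplesToAppend) const {
        // The quoted condition continues past what is shown above ("|| ..."),
        // so a real version would fold those extra clauses in as well.
        return numEntries + numTuplesToAppend > maxNumHashSlots;
    }

    void resizeHashTableIfNecessary(uint64_t numTuplesToAppend) {
        if (requireResize(numTuplesToAppend)) {
            maxNumHashSlots *= 2; // stand-in for the actual resize logic
        }
    }
};
```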
findHashSlots currently also inserts into the table, which makes things a little tricky, but I think that could be changed.
But the bigger issue is that while we could do it after figuring out the number of distinct groups, we'd have to re-run findHashSlots if we empty the hash table (even ignoring handling duplicates, the position they are inserted into may not be the same as the position they initially hashed to with linear probing, so if the original position is cleared it would break future lookups of that key).
On the other hand, with the current fixed capacity of 16384 entries (256KB with 16B entries) and always inserting <=2048 tuples at a time, it should always be the load factor check which causes it to be emptied, so what we could do is assert that we have enough space to hold all of them, and then resize afterwards if we've exceeded the load factor after the insertions. But really all that does is make it resize once we've exceeded the load factor, instead of never exceeding the load factor, and I don't really know which would be preferable.
Given we insert up to 2048 at a time into a table holding up to 16384 entries, I think it would mean a load factor of 0.66-0.79 instead of a load factor of 0.54-0.66, noting that in the code we're defining the load factor as 1.5, which I'm fairly sure is inverted and really should be 0.66.
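For concreteness, here is the back-of-the-envelope arithmetic behind those ranges, using the 16384-entry capacity and 2048-tuple batches mentioned above (a sanity-check sketch, not code from the PR):

```cpp
#include <cstdio>

int main() {
    constexpr double capacity = 16384; // fixed hash table capacity discussed above
    constexpr double batch = 2048;     // maximum tuples inserted per call
    constexpr double target = 1 / 1.5; // ~0.66, i.e. the inverted "1.5" load factor

    // Resize/flush *before* a batch that would exceed the target: occupancy stays
    // between roughly (target - batch/capacity) and target after each batch.
    std::printf("resize before insert: %.2f - %.2f\n", target - batch / capacity, target);

    // Resize/flush only *after* the target has been exceeded: occupancy can end up
    // between roughly target and (target + batch/capacity).
    std::printf("resize after insert:  %.2f - %.2f\n", target, target + batch / capacity);
    return 0;
}
```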
const std::vector<common::LogicalType>& distinctAggKeyTypes,
    FactorizedTableSchema tableSchema)
    : AggregateHashTable(memoryManager, std::move(keyTypes), std::move(payloadTypes),
          aggregateFunctions, distinctAggKeyTypes, NUM_PARTITIONS * 1024, tableSchema.copy()),
I think we should follow a heuristic rule, which is to keep the thread-local HT small enough to fit in cache. Thus, there shouldn't be a fixed HT capacity; instead, we should dynamically calculate it based on the aggregation keys and payloads, and the cache size of the machine. (For the cache size of the machine, one option is to access it through a library, such as libcpuid; the other option is to use a conservative constant. The constant should probably be fine in many cases.)
For aggregation with lots of keys and payloads, where row_width is large, we may end up with a very small capacity by calculation. To avoid this, we should have a lower bound such as 2048.
I should probably try and find a good benchmark for comparing performance with different row widths, but for now I've got it set to a minimum capacity of 2048 (or whatever fits in one 256KB block, if larger).
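In effect the sizing rule looks something like the following sketch, assuming a 256KB block and the 2048-entry floor mentioned above (the constant and function names are illustrative, not the exact ones in the code):

```cpp
#include <algorithm>
#include <cstdint>

// Illustrative capacity heuristic: keep the thread-local table within one block so it
// stays cache-friendly, but never drop below a floor of 2048 entries for wide rows.
static constexpr uint64_t kBlockSize = 256 * 1024; // bytes, assumed block size
static constexpr uint64_t kMinCapacity = 2048;

uint64_t initialHashTableCapacity(uint64_t bytesPerTuple) {
    return std::max(kMinCapacity, kBlockSize / bytesPerTuple);
}
```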
    std::this_thread::sleep_for(std::chrono::microseconds(500));
    sharedState->tryMergeQueue();
}
sharedState->finalizeAggregateHashTable();
Can you also think a bit about merging the aggregate scan into aggregate? Ideally, we shouldn't need to wait until the next pipeline to start scanning out from the aggregate result.
Working on it, though I wonder if this would be better as a separate PR so that the rest of this can be merged first.
Updated performance
For the query on the msmarco dataset in the PR description the runtime is now 44 seconds with a peak memory usage of 21GB. ClickHouse benchmarks have more modest improvements (compare with #4655 (comment)), and I encountered a segfault that I'm going to look into.
Benchmarks adapted from https://github.com/ClickHouse/ClickBench/, run on a 128 thread runner (2x AMD EPYC 7551).
Results are Cold/Hot, where hot is on subsequent queries in the same process. OS VM caches are dropped after each set of queries.
I wonder if we should also benchmark a query that has a wide row in the aggregation table (more group keys and payloads)?
Looks great, Ben. Have some minor comments.
Also, can you update the numbers here with the latest? I don't think we have the seg fault now, right? #4655 (comment)
auto sourcePos = sourceStartOffset + idx;
memcpy(slot.entry, sourceTable.getTuple(sourcePos),
    getTableSchema()->getNumBytesPerTuple());
// TODO: Ideally we should actually copy the overflow so that the original overflow data can
What's preventing this TODO? Are you going to address it separately, or are you not planning to address it any time soon?
It might have a significant impact on performance, so I wasn't rushing to complete it, but it probably should be addressed.
I think this would only work if we can get the overflow to support concurrent appends. At this point in the code that wouldn't matter, but we still wouldn't be able to free the overflow unless we also copy the overflow when partitioning.
Concurrent appends should be possible, but I was seeing a reasonably large difference in runtime (as big as 0.5s->0.9s on one query with long strings), so maybe it's best to just leave it for now.
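For reference, the concurrency requirement would look something like an atomically bumped offset per overflow block. The sketch below is purely illustrative and is not the existing overflow buffer API:

```cpp
#include <atomic>
#include <cstdint>
#include <cstring>
#include <vector>

// Illustrative sketch only: a fixed-size overflow block that supports concurrent
// appends by atomically bumping an offset. A real implementation would chain a new
// block when this one fills up instead of failing.
class ConcurrentOverflowBlock {
public:
    explicit ConcurrentOverflowBlock(size_t capacity) : data(capacity) {}

    // Copies `len` bytes into the block and returns a stable pointer to them,
    // or nullptr if the block is out of space.
    const uint8_t* append(const uint8_t* src, size_t len) {
        size_t offset = nextOffset.fetch_add(len, std::memory_order_relaxed);
        if (offset + len > data.size()) {
            return nullptr;
        }
        std::memcpy(data.data() + offset, src, len);
        return data.data() + offset;
    }

private:
    std::vector<uint8_t> data;
    std::atomic<size_t> nextOffset{0};
};
```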
Edit: Missed one thing when rebasing (one of the changes from #4709 needed to be applied to a function newly added in this PR), so these benchmarks aren't fully up to date; however, I'm not convinced that the change is significant. There was a large improvement the first time I ran the criterion benchmark again, and then it regressed in the other direction to be more or less the same as the initial benchmarks when I ran it again without changes.

Updated performance (again)
For the query on the msmarco dataset in the PR description the runtime is now 41 seconds with a peak memory usage of 21GB. ClickHouse benchmarks have more modest improvements (compare with #4655 (comment)).
Benchmarks adapted from https://github.com/ClickHouse/ClickBench/, run on a 128 thread runner (2x AMD EPYC 7551).
Results are Cold/Hot, where hot is on subsequent queries in the same process. OS VM caches are dropped after each set of queries.

Performance scaling
Below are some benchmarks showing scaling. Violin plots show the distribution; the queries were run 10 times with some warmups. The results show some discrepancies from the results above, which might be a measuring issue, but I think they do a good job of showing how well the work scales. The ClickBench queries were run through the Python API and the scaling benchmarks below through the Rust API, using criterion.rs to produce the plots.
Fixes kuzudb/internal#10 and #4547.
I switched back to having the partitioning hash table use linear probing, insert entries until it reaches a fixed capacity, and then empty itself and flush to the global partitions. The aggregation update code only works on the data within the vectors being inserted and requires that they are all available in the hash table at once, so kicking elements out of the hash table would require some complicated changes. I increased the default size of the hash table significantly to compensate, as performance was poor with hash tables of the default size.
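As a condensed sketch of that flow (the struct and method names below are placeholders, not the actual classes/methods in the PR):

```cpp
#include <cstdint>
#include <functional>

// Stand-in sketch for the partitioning hash table behaviour described above:
// batches are aggregated in a fixed-capacity, linear-probing local table, and when
// the next batch might no longer fit, the whole table is flushed to the global
// partitions and reused empty. Nothing is evicted mid-batch, so the aggregate
// update code can assume every group it touches stays resident for the whole batch.
struct PartitioningTableSketch {
    uint64_t numEntries = 0;
    uint64_t capacity = 0;
    std::function<void()> flushToGlobalPartitions; // placeholder; set by the owner

    void appendBatch(uint64_t maxNewGroupsInBatch) {
        if (numEntries + maxNewGroupsInBatch > capacity) {
            flushToGlobalPartitions(); // hand current contents to the shared partitions
            numEntries = 0;            // table is empty again; capacity stays fixed
        }
        // ... findHashSlots + aggregate updates for the batch would happen here ...
        numEntries += maxNewGroupsInBatch; // upper bound; duplicates reuse slots
    }
};
```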
It took a while to hunt down the bug in BaseAggregateScan::writeAggregateResultToVector. I've added an extra test to try to get slightly better coverage, but I think we really need some larger tests (the issue was caused by vector re-use, which previously we would only have encountered with results that have more than DEFAULT_VECTOR_CAPACITY values, some of which must be null).

The performance isn't scaling well with the number of partitions at the moment. On workloads which don't need very many threads it's much faster if built with fewer partitions: e.g. on the query in #4547 I found that the total runtime was about 2x faster (3x for just the HashAggregate code) with 16 partitions on a machine with 12 threads compared to 256 partitions (that query/dataset also seems to be getting a maximum of ~14 worker threads, presumably due to the way the input is being divided up). I've reduced the number of partitions to 128 as a compromise, given that's the number of threads on our largest testing machine, but I'll work on improving this next; it should be possible to do the partitioning logically, without physically partitioning the data, to improve cache locality.
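As an illustration of what the logical partitioning could look like (the bit layout and names are assumptions for illustration, not a planned implementation): derive the partition from bits of the hash each tuple already carries, so tuples are only routed to a partition when the local table is flushed rather than being physically copied into per-partition tables up front.

```cpp
#include <cstdint>

constexpr uint64_t NUM_PARTITIONS = 128; // the compromise mentioned above
static_assert(NUM_PARTITIONS == 1u << 7, "partition count assumed to be a power of two");

// Use the top bits for the partition so the choice is independent of the low bits
// a local table would typically use for slot selection.
inline uint64_t partitionIdx(uint64_t hash) {
    return hash >> (64 - 7); // top 7 bits -> 0 .. NUM_PARTITIONS - 1
}
```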
I've increased the default buffer pool size, since anything involving aggregation was requiring a bunch of extra memory for constructing the PartitioningAggregateHashTables, but that should be able to be reverted with the optimization mentioned above, as it would also reduce the minimum memory requirements.

Performance
On the query from #4547 (msmarco v2.1, 1st segment) with the query:
MATCH (b:doc) WITH tokenize(b.segment) AS tk, OFFSET(ID(b)) AS id UNWIND tk AS t RETURN STEM(t, 'porter'), id, count(*);
Run on a 128 thread machine (but see the earlier note about not scaling past 14 threads).
(as an example of the performance improvements we might expect to achieve with better cache locality)
Note that memory usage has improved significantly: previously, the per-thread AggregateHashTables would be filled first and exist in memory simultaneously with the final merged global AggregateHashTable; now the per-thread tables get merged into the global ones in small chunks.
I'd run some benchmarks using queries from ClickBench and seen an improvement of about 2-5x (I'll update with more details later).