Skip to content

Commit

Permalink
fix(hashjoin): Add extra output capacity for left semi join (facebook…
Browse files Browse the repository at this point in the history
…incubator#12108)

Summary:
We allocate an extra row in the output for left join, full join, and anti join
to handle cases where the output includes a row carried over from the Tracker
or a row added by the `finish` function at the end of the input batch. This
scenario can also occur in the context of a left semi join with a filter when input
batch size > `outputBatchSize_`. So we need allocate a extra row for left semi joins
also.
Fix the issue below, repro UT is added also.
```
E0117 13:19:37.833635 1938231 JoinFuzzer.cpp:1562] ==============================> Started iteration 700 (seed: 3326651276)
E0117 13:19:37.836191 1938231 JoinFuzzer.cpp:539] Executing query plan with UNGROUPED strategy[0 groups]:
-- HashJoin[2][LEFT SEMI (PROJECT) t0=u0 AND t1=u1, filter: eq(ROW["tp2"],ROW["bp2"])] -> tp3:REAL, match:BOOLEAN
  -- Values[0][10240 rows in 10 vectors] -> t0:DOUBLE, t1:DATE, tp2:BOOLEAN, tp3:REAL, tp4:SMALLINT, tp5:INTEGER
  -- Values[1][1290 rows in 10 vectors] -> u0:DOUBLE, u1:DATE, bp2:BOOLEAN
E0117 13:19:37.874596 1938231 JoinFuzzer.cpp:539] Executing query plan with UNGROUPED strategy[0 groups]:
-- HashJoin[2][LEFT SEMI (PROJECT) t0=u0 AND t1=u1, filter: eq(ROW["tp2"],ROW["bp2"])] -> tp3:REAL, match:BOOLEAN
  -- Values[0][10240 rows in 10 vectors] -> t0:DOUBLE, t1:DATE, tp2:BOOLEAN, tp3:REAL, tp4:SMALLINT, tp5:INTEGER
  -- Values[1][1290 rows in 10 vectors] -> u0:DOUBLE, u1:DATE, bp2:BOOLEAN
E0117 13:19:37.875802 1938231 JoinFuzzer.cpp:539] Executing query plan with UNGROUPED strategy[0 groups] and spilling injection:
-- HashJoin[2][LEFT SEMI (PROJECT) t0=u0 AND t1=u1, filter: eq(ROW["tp2"],ROW["bp2"])] -> tp3:REAL, match:BOOLEAN
  -- Values[0][10240 rows in 10 vectors] -> t0:DOUBLE, t1:DATE, tp2:BOOLEAN, tp3:REAL, tp4:SMALLINT, tp5:INTEGER
  -- Values[1][1290 rows in 10 vectors] -> u0:DOUBLE, u1:DATE, bp2:BOOLEAN

E0117 13:19:37.878841 2044573 Exceptions.h:66] Line: /var/git/velox/velox/exec/HashProbe.cpp:1622, Function:evalFilter, Expression: numPassed <= outputTableRowsCapacity_ (1025 vs. 1024), Source: RUNTIME, ErrorCode: INVALID_STATE
terminate called after throwing an instance of 'facebook::velox::VeloxRuntimeError'
  what():  Exception: VeloxRuntimeError
Error Source: RUNTIME
Error Code: INVALID_STATE
Reason: (1025 vs. 1024)
Retriable: False
Expression: numPassed <= outputTableRowsCapacity_
Context: Operator: HashProbe[2] 1
Function: evalFilter
File: /var/git/velox/velox/exec/HashProbe.cpp
Line: 1622
Stack trace:
# 0  facebook::velox::VeloxException::VeloxException(char const*, unsigned long, char const*, std::basic_string_view<char, std::char_traits<char> >, std::basic_string_view<char, std::char_traits<char> >, std::basic_string_view<char, std::char_traits<char> >, std::basic_string_view<char, std::char_traits<char> >, bool, facebook::velox::VeloxException::Type, std::basic_string_view<char, std::char_traits<char> >)
# 1  void facebook::velox::detail::veloxCheckFail<facebook::velox::VeloxRuntimeError, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&>(facebook::velox::detail::VeloxCheckFailArgs const&, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&)
# 2  facebook::velox::exec::HashProbe::evalFilter(int, int)
# 3  facebook::velox::exec::HashProbe::getOutputInternal(bool)
# 4  facebook::velox::exec::HashProbe::getOutput()
# 5  facebook::velox::exec::Driver::runInternal(std::shared_ptr<facebook::velox::exec::Driver>&, std::shared_ptr<facebook::velox::exec::BlockingState>&, std::shared_ptr<facebook::velox::RowVector>&)::{lambda()https://github.com/facebookincubator/velox/issues/5}::operator()() const
# 6  facebook::velox::exec::Driver::runInternal(std::shared_ptr<facebook::velox::exec::Driver>&, std::shared_ptr<facebook::velox::exec::BlockingState>&, std::shared_ptr<facebook::velox::RowVector>&)
# 7  facebook::velox::exec::Driver::run(std::shared_ptr<facebook::velox::exec::Driver>)
# 8  void folly::detail::function::call_<facebook::velox::exec::Driver::enqueue(std::shared_ptr<facebook::velox::exec::Driver>)::{lambda()facebookincubator#1}, true, false, void>(, folly::detail::function::Data&)
# 9  folly::ThreadPoolExecutor::runTask(std::shared_ptr<folly::ThreadPoolExecutor::Thread> const&, folly::ThreadPoolExecutor::Task&&)
# 10 folly::CPUThreadPoolExecutor::threadRun(std::shared_ptr<folly::ThreadPoolExecutor::Thread>)
# 11 void folly::detail::function::call_<std::_Bind<void (folly::ThreadPoolExecutor::*(folly::ThreadPoolExecutor*, std::shared_ptr<folly::ThreadPoolExecutor::Thread>))(std::shared_ptr<folly::ThreadPoolExecutor::Thread>)>, true, false, void>(, folly::detail::function::Data&)
# 12 0x00000000000dc252
# 13 0x0000000000094ac2
# 14 0x000000000012684f
```

Pull Request resolved: facebookincubator#12108

Reviewed By: bikramSingh91

Differential Revision: D68544370

Pulled By: xiaoxmeng

fbshipit-source-id: 09eb7f9e42411daf50be8944e06aac056eca9511
  • Loading branch information
zhli1142015 authored and facebook-github-bot committed Jan 24, 2025
1 parent 419de77 commit edd7e8f
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 1 deletion.
3 changes: 2 additions & 1 deletion velox/exec/HashProbe.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1065,7 +1065,8 @@ RowVectorPtr HashProbe::getOutputInternal(bool toSpillOutput) {
outputTableRowsCapacity_ = outputBatchSize;
if (filter_ &&
(isLeftJoin(joinType_) || isFullJoin(joinType_) ||
isAntiJoin(joinType_))) {
isAntiJoin(joinType_) || isLeftSemiFilterJoin(joinType_) ||
isLeftSemiProjectJoin(joinType_))) {
// If we need non-matching probe side row, there is a possibility that such
// row exists at end of an input batch and being carried over in the next
// output batch, so we need to make extra room of one row in output.
Expand Down
75 changes: 75 additions & 0 deletions velox/exec/tests/HashJoinTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3858,6 +3858,81 @@ TEST_F(HashJoinTest, nullAwareRightSemiProjectWithFilterNotAllowed) {
"Null-aware right semi project join doesn't support extra filter");
}

TEST_F(HashJoinTest, leftSemiJoinWithExtraOutputCapacity) {
std::vector<RowVectorPtr> probeVectors;
std::vector<RowVectorPtr> buildVectors;
probeVectors.push_back(makeRowVector(
{"t0", "t1"},
{
makeFlatVector<int32_t>({1, 2, 3, 4, 5, 6}),
makeFlatVector<int64_t>({10, 10, 10, 10, 10, 10}),
}));

buildVectors.push_back(makeRowVector(
{"u0", "u1"},
{
makeFlatVector<int32_t>({1, 1, 1, 1, 1}),
makeFlatVector<int64_t>({10, 10, 10, 10, 10}),
}));
buildVectors.push_back(makeRowVector(
{"u0", "u1"},
{
makeFlatVector<int32_t>({2, 3, 4, 5, 6}),
makeFlatVector<int64_t>({10, 10, 10, 10, 10}),
}));

createDuckDbTable("t", probeVectors);
createDuckDbTable("u", buildVectors);
auto runQuery = [&](const std::string& query,
const std::string& filter,
core::JoinType joinType) {
auto planNodeIdGenerator = std::make_shared<core::PlanNodeIdGenerator>();
std::vector<std::string> outputLayout = {"t0", "t1"};
if (joinType == core::JoinType::kLeftSemiProject) {
outputLayout.push_back("match");
}
auto plan = PlanBuilder(planNodeIdGenerator)
.values(probeVectors)
.hashJoin(
{"t0"},
{"u0"},
PlanBuilder(planNodeIdGenerator)
.values(buildVectors)
.planNode(),
filter,
outputLayout,
joinType,
false)
.planNode();
HashJoinBuilder(*pool_, duckDbQueryRunner_, driverExecutor_.get())
.planNode(plan)
.config(core::QueryConfig::kPreferredOutputBatchRows, "5")
.referenceQuery(query)
.injectSpill(false)
.run();
};
{
SCOPED_TRACE("left semi filter join");
std::string filter = "t1 = u1";
runQuery(
fmt::format(
"SELECT t0, t1 FROM t WHERE EXISTS (SELECT u0 FROM u WHERE t0 = u0 AND {})",
filter),
filter,
core::JoinType::kLeftSemiFilter);
}

{
SCOPED_TRACE("left semi project join");
std::string filter = "t1 <> u1";
runQuery(
fmt::format(
"SELECT t0, t1, t0 IN (SELECT u0 FROM u WHERE {}) FROM t", filter),
filter,
core::JoinType::kLeftSemiProject);
}
}

TEST_F(HashJoinTest, nullAwareMultiKeyNotAllowed) {
auto probe = makeRowVector(
ROW({"t0", "t1", "t2"}, {INTEGER(), BIGINT(), VARCHAR()}), 10);
Expand Down

0 comments on commit edd7e8f

Please sign in to comment.