Skip to content

Commit

Permalink
Merge pull request #23 from Ackuq/fix/keys-zip
Browse files Browse the repository at this point in the history
Fix zipping of keys
  • Loading branch information
Ackuq authored May 4, 2022
2 parents e8c6964 + d62dbe2 commit be77747
Showing 1 changed file with 35 additions and 34 deletions.
69 changes: 35 additions & 34 deletions scala/src/main/scala/execution/PITJoinExec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -269,10 +269,11 @@ protected[pit] case class PITJoinExec(

private def copyKeys(
ctx: CodegenContext,
vars: Seq[ExprCode]
vars: Seq[ExprCode],
keys: Seq[Expression]
): Seq[ExprCode] = {
vars.zipWithIndex.map { case (ev, i) =>
ctx.addBufferedState(leftKeys(i).dataType, "value", ev.value)
ctx.addBufferedState(keys(i).dataType, "value", ev.value)
}
}

Expand All @@ -298,7 +299,7 @@ protected[pit] case class PITJoinExec(
val toleranceCheck =
leftPIT.zip(rightPIT).zipWithIndex.map { case ((l, r), i) =>
s"""
| (${l.value} - ${r.value} > ${tolerance})
| (${l.value} - ${r.value} > $tolerance)
|""".stripMargin
}
toleranceCheck.mkString(" && ")
Expand Down Expand Up @@ -399,7 +400,7 @@ protected[pit] case class PITJoinExec(
): Seq[ExprCode] = {
ctx.INPUT_ROW = rightRow
right.output.zipWithIndex.map { case (a, i) =>
val ev = BoundReference(i, a.dataType, a.nullable).genCode(ctx);
val ev = BoundReference(i, a.dataType, a.nullable).genCode(ctx)
if (returnNulls) {
val isNull = ctx.freshName("isNull")
val value = ctx.freshName("value")
Expand Down Expand Up @@ -453,14 +454,14 @@ protected[pit] case class PITJoinExec(
.mkString(" || ")

// Copy the right key as class members so they could be used in next function call.
val rightPITKeyVars = copyKeys(ctx, rightPITKeyTmpVars)
val rightEquiKeyVars = copyKeys(ctx, rightEquiKeyTmpVars)
val rightPITKeyVars = copyKeys(ctx, rightPITKeyTmpVars, leftPitKeys)
val rightEquiKeyVars = copyKeys(ctx, rightEquiKeyTmpVars, leftEquiKeys)

val matched =
ctx.addMutableState("InternalRow", "matched", forceInline = true)

val matchedPITKeyVars = copyKeys(ctx, leftPITKeyVars)
val matchedEquiKeyVars = copyKeys(ctx, leftEquiKeyVars)
val matchedPITKeyVars = copyKeys(ctx, leftPITKeyVars, leftPitKeys)
val matchedEquiKeyVars = copyKeys(ctx, leftEquiKeyVars, leftEquiKeys)

ctx.addNewFunction(
"findNextInnerJoinRows",
Expand Down Expand Up @@ -623,32 +624,32 @@ protected[pit] case class PITJoinExec(

val thisPlan = ctx.addReferenceObj("plan", this)
val eagerCleanup = s"$thisPlan.cleanupResources();"
returnNulls match {
case false => s"""
|while (findNextInnerJoinRows($leftInput, $rightInput)) {
| ${leftVarDecl.mkString("\n")}
| ${beforeLoop.trim}
| InternalRow $rightRow = (InternalRow) $matched;
| ${condCheck.trim}
| $numOutput.add(1);
| ${consume(ctx, leftVars ++ rightVars)}
| if (shouldStop()) return;
|}
|$eagerCleanup
|""".stripMargin
case true =>
s"""
|while($leftInput.hasNext()) {
| findNextInnerJoinRows($leftInput, $rightInput);
| ${leftVarDecl.mkString("\n")}
| ${beforeLoop.trim}
| InternalRow $rightRow = (InternalRow) $matched;
| ${condCheck.trim}
| $numOutput.add(1);
| ${consume(ctx, leftVars ++ rightVars)};
| if (shouldStop()) return;
|}
|""".stripMargin
if (returnNulls) {
s"""
|while($leftInput.hasNext()) {
| findNextInnerJoinRows($leftInput, $rightInput);
| ${leftVarDecl.mkString("\n")}
| ${beforeLoop.trim}
| InternalRow $rightRow = (InternalRow) $matched;
| ${condCheck.trim}
| $numOutput.add(1);
| ${consume(ctx, leftVars ++ rightVars)};
| if (shouldStop()) return;
|}
|""".stripMargin
} else {
s"""
|while (findNextInnerJoinRows($leftInput, $rightInput)) {
| ${leftVarDecl.mkString("\n")}
| ${beforeLoop.trim}
| InternalRow $rightRow = (InternalRow) $matched;
| ${condCheck.trim}
| $numOutput.add(1);
| ${consume(ctx, leftVars ++ rightVars)}
| if (shouldStop()) return;
|}
|$eagerCleanup
|""".stripMargin
}
}
}
Expand Down

0 comments on commit be77747

Please sign in to comment.