Skip to content

Commit

Permalink
[Spark] Allow setting the same active Delta transaction (#3660)
Browse files Browse the repository at this point in the history
#### Which Delta project/connector is this regarding?

- [X] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

OptimisticTransaction.setActive() and OptimisticTransaction.withActive()
methods will fail if the active transaction is already set, even if the
caller tries to set the same transaction. This commit fixes this issue
and allows setting the same transaction instance.

## How was this patch tested?

New and existing tests.

## Does this PR introduce _any_ user-facing changes?

No
  • Loading branch information
cstavr authored Sep 10, 2024
1 parent b7ef01c commit 559ca40
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -221,9 +221,9 @@ class DeltaLog private(
catalogTableOpt: Option[CatalogTable],
snapshotOpt: Option[Snapshot] = None)(
thunk: OptimisticTransaction => T): T = {
val txn = startTransaction(catalogTableOpt, snapshotOpt)
OptimisticTransaction.setActive(txn)
try {
val txn = startTransaction(catalogTableOpt, snapshotOpt)
OptimisticTransaction.setActive(txn)
thunk(txn)
} finally {
OptimisticTransaction.clearActive()
Expand All @@ -233,9 +233,9 @@ class DeltaLog private(
/** Legacy/compat overload that does not require catalog table information. Avoid prod use. */
@deprecated("Please use the CatalogTable overload instead", "3.0")
def withNewTransaction[T](thunk: OptimisticTransaction => T): T = {
val txn = startTransaction()
OptimisticTransaction.setActive(txn)
try {
val txn = startTransaction()
OptimisticTransaction.setActive(txn)
thunk(txn)
} finally {
OptimisticTransaction.clearActive()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,10 +216,14 @@ object OptimisticTransaction {
* `OptimisticTransaction.withNewTransaction`. Use that to create and set active txns.
*/
private[delta] def setActive(txn: OptimisticTransaction): Unit = {
if (active.get != null) {
throw DeltaErrors.activeTransactionAlreadySet()
getActive() match {
case Some(activeTxn) =>
if (!(activeTxn eq txn)) {
throw DeltaErrors.activeTransactionAlreadySet()
}
case _ =>
active.set(txn)
}
active.set(txn)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -263,16 +263,38 @@ trait DeltaWithNewTransactionSuiteBase extends QueryTest
withTempDir { dir =>
val log = DeltaLog.forTable(spark, dir.getCanonicalPath)
log.withNewTransaction { txn =>

require(OptimisticTransaction.getActive().nonEmpty)
assert(OptimisticTransaction.getActive() === Some(txn))
intercept[IllegalStateException] {
OptimisticTransaction.setActive(txn)
log.withNewTransaction { txn2 => }
}
assert(OptimisticTransaction.getActive() === Some(txn))
}
assert(OptimisticTransaction.getActive().isEmpty)
}
}

test("withActiveTxn idempotency") {
withTempDir { dir =>
val log = DeltaLog.forTable(spark, dir.getCanonicalPath)
val txn = log.startTransaction()
assert(OptimisticTransaction.getActive().isEmpty)
OptimisticTransaction.withActive(txn) {
assert(OptimisticTransaction.getActive() === Some(txn))
OptimisticTransaction.withActive(txn) {
assert(OptimisticTransaction.getActive() === Some(txn))
}
assert(OptimisticTransaction.getActive() === Some(txn))

val txn2 = log.startTransaction()
intercept[IllegalStateException] {
log.withNewTransaction { txn2 => }
OptimisticTransaction.withActive(txn2) { }
}
intercept[IllegalStateException] {
OptimisticTransaction.setActive(txn2)
}
assert(OptimisticTransaction.getActive() === Some(txn))
}
assert(OptimisticTransaction.getActive().isEmpty)
}
}

Expand Down

0 comments on commit 559ca40

Please sign in to comment.