diff --git a/kernel/examples/kernel-examples/src/main/java/io/delta/kernel/examples/CreateTableAndInsertData.java b/kernel/examples/kernel-examples/src/main/java/io/delta/kernel/examples/CreateTableAndInsertData.java index 8328900056a..c6cce250d4b 100644 --- a/kernel/examples/kernel-examples/src/main/java/io/delta/kernel/examples/CreateTableAndInsertData.java +++ b/kernel/examples/kernel-examples/src/main/java/io/delta/kernel/examples/CreateTableAndInsertData.java @@ -17,6 +17,7 @@ import java.io.IOException; import java.util.*; +import java.util.concurrent.CompletableFuture; import org.apache.commons.cli.Options; @@ -24,6 +25,8 @@ import io.delta.kernel.data.FilteredColumnarBatch; import io.delta.kernel.data.Row; import io.delta.kernel.expressions.Literal; +import io.delta.kernel.hook.PostCommitHook; +import io.delta.kernel.hook.PostCommitHook.PostCommitHookType; import io.delta.kernel.utils.*; import static io.delta.kernel.examples.utils.Utils.parseArgs; @@ -409,11 +412,17 @@ public void insertWithOptionalCheckpoint(String tablePath) throws IOException { // for every 10 versions. for (int i = 0; i < 12; i++) { TransactionCommitResult commitResult = insertDataIntoUnpartitionedTable(tablePath); - if (commitResult.isReadyForCheckpoint()) { + for(PostCommitHook hook: commitResult.getPostCommitHooks()) // Checkpoint the table - Table.forPath(engine, tablePath).checkpoint(engine, commitResult.getVersion()); - didCheckpoint = true; - } + didCheckpoint = didCheckpoint || CompletableFuture.supplyAsync(() -> { + // run the code async + try{ + hook.threadSafeInvoke(engine); + } catch (IOException e) { + return false; + } + return hook.getType().equals(PostCommitHookType.CHECKPOINT); + }).join(); // wait async finish. } if (!didCheckpoint) { diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/TransactionCommitResult.java b/kernel/kernel-api/src/main/java/io/delta/kernel/TransactionCommitResult.java index 831fa18245c..f95f5b3a537 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/TransactionCommitResult.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/TransactionCommitResult.java @@ -17,7 +17,9 @@ import io.delta.kernel.annotation.Evolving; import io.delta.kernel.engine.Engine; +import io.delta.kernel.hook.PostCommitHook; import io.delta.kernel.utils.CloseableIterable; +import java.util.List; /** * Contains the result of a successful transaction commit. Returned by {@link @@ -28,11 +30,11 @@ @Evolving public class TransactionCommitResult { private final long version; - private final boolean isReadyForCheckpoint; + private final List postCommitHooks; - public TransactionCommitResult(long version, boolean isReadyForCheckpoint) { + public TransactionCommitResult(long version, List postCommitHooks) { this.version = version; - this.isReadyForCheckpoint = isReadyForCheckpoint; + this.postCommitHooks = postCommitHooks; } /** @@ -45,13 +47,19 @@ public long getVersion() { } /** - * Is the table ready for checkpoint (i.e. there are enough commits since the last checkpoint)? If - * yes the connector can choose to checkpoint as the version the transaction is committed as using - * {@link Table#checkpoint(Engine, long)} + * Operations for connector to trigger post-commit. * - * @return Is the table ready for checkpointing? + *

Usage: + * + *

+ * + * @return list of post-commit operations */ - public boolean isReadyForCheckpoint() { - return isReadyForCheckpoint; + public List getPostCommitHooks() { + return postCommitHooks; } } diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/hook/PostCommitHook.java b/kernel/kernel-api/src/main/java/io/delta/kernel/hook/PostCommitHook.java new file mode 100644 index 00000000000..173168ff1a0 --- /dev/null +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/hook/PostCommitHook.java @@ -0,0 +1,43 @@ +/* + * Copyright (2025) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.delta.kernel.hook; + +import io.delta.kernel.engine.Engine; +import java.io.IOException; + +/** + * A hook for executing operation after a transaction commit. Hooks are added in the Transaction and + * engine need to invoke the hook explicitly for executing the operation. Supported operations are + * listed in {@link PostCommitHookType}. + */ +public interface PostCommitHook { + + enum PostCommitHookType { + /** + * Writes a new checkpoint at the version committed by the transaction. This hook is present + * when the table is ready for checkpoint according to its configured checkpoint interval. To + * perform this operation, reading previous checkpoint + logs is required to construct a new + * checkpoint, with latency scaling based on log size (typically seconds to minutes). + */ + CHECKPOINT + } + + /** Invokes the post commit operation whose implementation must be thread safe. */ + void threadSafeInvoke(Engine engine) throws IOException; + + PostCommitHookType getType(); +} diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionImpl.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionImpl.java index 44b0b49ccd9..b2bef46757a 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionImpl.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionImpl.java @@ -27,9 +27,11 @@ import io.delta.kernel.engine.Engine; import io.delta.kernel.exceptions.ConcurrentWriteException; import io.delta.kernel.expressions.Column; +import io.delta.kernel.hook.PostCommitHook; import io.delta.kernel.internal.actions.*; import io.delta.kernel.internal.data.TransactionStateRow; import io.delta.kernel.internal.fs.Path; +import io.delta.kernel.internal.hook.CheckpointHook; import io.delta.kernel.internal.metrics.TransactionMetrics; import io.delta.kernel.internal.metrics.TransactionReportImpl; import io.delta.kernel.internal.replay.ConflictChecker; @@ -354,7 +356,12 @@ private TransactionCommitResult doCommit( "Write file actions to JSON log file `%s`", FileNames.deltaFile(logPath, commitAsVersion)); - return new TransactionCommitResult(commitAsVersion, isReadyForCheckpoint(commitAsVersion)); + List postCommitHooks = new ArrayList<>(); + if (isReadyForCheckpoint(commitAsVersion)) { + postCommitHooks.add(new CheckpointHook(dataPath, commitAsVersion)); + } + + return new TransactionCommitResult(commitAsVersion, postCommitHooks); } catch (FileAlreadyExistsException e) { throw e; } catch (IOException ioe) { diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/hook/CheckpointHook.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/hook/CheckpointHook.java new file mode 100644 index 00000000000..bb291422956 --- /dev/null +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/hook/CheckpointHook.java @@ -0,0 +1,44 @@ +/* + * Copyright (2025) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.delta.kernel.internal.hook; + +import io.delta.kernel.Table; +import io.delta.kernel.engine.Engine; +import io.delta.kernel.hook.PostCommitHook; +import io.delta.kernel.internal.fs.Path; +import java.io.IOException; + +/** Write a new checkpoint at the version committed by the txn. */ +public class CheckpointHook implements PostCommitHook { + + private final Path tablePath; + private final long checkpointVersion; + + public CheckpointHook(Path tablePath, long checkpointVersion) { + this.tablePath = tablePath; + this.checkpointVersion = checkpointVersion; + } + + @Override + public void threadSafeInvoke(Engine engine) throws IOException { + Table.forPath(engine, tablePath.toString()).checkpoint(engine, checkpointVersion); + } + + @Override + public PostCommitHookType getType() { + return PostCommitHookType.CHECKPOINT; + } +} diff --git a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableWriteSuiteBase.scala b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableWriteSuiteBase.scala index 6abef53db27..6f24cca7aa1 100644 --- a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableWriteSuiteBase.scala +++ b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableWriteSuiteBase.scala @@ -26,8 +26,15 @@ import io.delta.kernel.internal.util.FileNames import io.delta.kernel.internal.util.Utils.singletonCloseableIterator import io.delta.kernel.internal.{SnapshotImpl, TableConfig, TableImpl} import io.delta.kernel.utils.FileStatus -import io.delta.kernel.{Meta, Operation, Table, Transaction, TransactionBuilder, TransactionCommitResult} -import io.delta.kernel.data.{ColumnarBatch, ColumnVector, FilteredColumnarBatch, Row} +import io.delta.kernel.{ + Meta, + Operation, + Table, + Transaction, + TransactionBuilder, + TransactionCommitResult +} +import io.delta.kernel.data.{ColumnVector, ColumnarBatch, FilteredColumnarBatch, Row} import io.delta.kernel.defaults.internal.data.DefaultColumnarBatch import io.delta.kernel.expressions.Literal import io.delta.kernel.expressions.Literal.ofInt @@ -39,6 +46,7 @@ import io.delta.kernel.types.StructType import io.delta.kernel.utils.CloseableIterable.{emptyIterable, inMemoryIterable} import io.delta.kernel.utils.CloseableIterator import io.delta.kernel.Operation.CREATE_TABLE +import io.delta.kernel.hook.PostCommitHook.PostCommitHookType import org.apache.commons.io.FileUtils import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path @@ -139,10 +147,14 @@ trait DeltaTableWriteSuiteBase extends AnyFunSuite with TestUtils { tablePath: String, result: TransactionCommitResult, expSize: Long): Unit = { - if (result.isReadyForCheckpoint) { - Table.forPath(engine, tablePath).checkpoint(engine, result.getVersion) - verifyLastCheckpointMetadata(tablePath, checkpointAt = result.getVersion, expSize) - } + result.getPostCommitHooks.forEach( + hook => { + if (hook.getType == PostCommitHookType.CHECKPOINT) { + hook.threadSafeInvoke(engine) + verifyLastCheckpointMetadata(tablePath, checkpointAt = result.getVersion, expSize) + } + } + ) } /** @@ -399,7 +411,7 @@ trait DeltaTableWriteSuiteBase extends AnyFunSuite with TestUtils { expVersion: Long, expIsReadyForCheckpoint: Boolean): Unit = { assert(result.getVersion === expVersion) - assert(result.isReadyForCheckpoint === expIsReadyForCheckpoint) + assertCheckpointReadiness(result, expIsReadyForCheckpoint) } def verifyTableProperties( @@ -421,4 +433,16 @@ trait DeltaTableWriteSuiteBase extends AnyFunSuite with TestUtils { builder.append("]") checkAnswer(resultProperties, Seq(builder.toString()).map(TestRow(_))) } + + def assertCheckpointReadiness( + txnResult: TransactionCommitResult, + isReadyForCheckpoint: Boolean): Unit = { + assert( + txnResult.getPostCommitHooks + .stream() + .anyMatch( + hook => hook.getType == PostCommitHookType.CHECKPOINT + ) === isReadyForCheckpoint + ) + } } diff --git a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableWritesSuite.scala b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableWritesSuite.scala index 83b77ef208b..22f3d07fcd0 100644 --- a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableWritesSuite.scala +++ b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableWritesSuite.scala @@ -25,6 +25,7 @@ import io.delta.kernel.engine.Engine import io.delta.kernel.exceptions._ import io.delta.kernel.expressions.Literal import io.delta.kernel.expressions.Literal._ +import io.delta.kernel.hook.PostCommitHook.PostCommitHookType import io.delta.kernel.internal.checkpoints.CheckpointerSuite.selectSingleElement import io.delta.kernel.internal.util.SchemaUtils.casePreservingPartitionColNames import io.delta.kernel.internal.{SnapshotImpl, TableConfig} @@ -131,7 +132,7 @@ class DeltaTableWritesSuite extends DeltaTableWriteSuiteBase with ParquetSuiteBa val txnResult = txn.commit(engine, emptyIterable()) assert(txnResult.getVersion === 0) - assert(!txnResult.isReadyForCheckpoint) + assertCheckpointReadiness(txnResult, isReadyForCheckpoint = false) verifyCommitInfo(tablePath = tablePath, version = 0) verifyWrittenContent(tablePath, testSchema, Seq.empty) @@ -350,7 +351,7 @@ class DeltaTableWritesSuite extends DeltaTableWriteSuiteBase with ParquetSuiteBa val txnResult = txn.commit(engine, emptyIterable()) assert(txnResult.getVersion === 0) - assert(!txnResult.isReadyForCheckpoint) + assertCheckpointReadiness(txnResult, isReadyForCheckpoint = false) verifyCommitInfo(tablePath, version = 0, Seq("Part1", "part2")) verifyWrittenContent(tablePath, schema, Seq.empty) @@ -368,7 +369,7 @@ class DeltaTableWritesSuite extends DeltaTableWriteSuiteBase with ParquetSuiteBa val txnResult = txn.commit(engine, emptyIterable()) assert(txnResult.getVersion === 0) - assert(!txnResult.isReadyForCheckpoint) + assertCheckpointReadiness(txnResult, isReadyForCheckpoint = false) verifyCommitInfo(tablePath, version = 0) verifyWrittenContent(tablePath, schema, Seq.empty)