From aee5ded203983b4e6e9559d41aa2f2da6f8515b0 Mon Sep 17 00:00:00 2001 From: chenzhuoyu Date: Tue, 19 Dec 2023 14:55:25 +0800 Subject: [PATCH 1/2] [flink] Support the creation of tags based on the latest snapshot --- docs/content/maintenance/manage-tags.md | 2 +- .../paimon/flink/action/CreateTagAction.java | 11 ++++- .../flink/action/CreateTagActionFactory.java | 13 ++++-- .../paimon/flink/action/TagActionITCase.java | 46 +++++++++++++++++++ 4 files changed, 67 insertions(+), 5 deletions(-) diff --git a/docs/content/maintenance/manage-tags.md b/docs/content/maintenance/manage-tags.md index 865e68b5e5b2..a631fc60f096 100644 --- a/docs/content/maintenance/manage-tags.md +++ b/docs/content/maintenance/manage-tags.md @@ -108,7 +108,7 @@ You can create a tag with given name (cannot be number) and snapshot ID. --database \ --table \ --tag_name \ - --snapshot \ + [--snapshot ] \ [--catalog_conf [--catalog_conf ...]] ``` diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateTagAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateTagAction.java index 948cec406324..9317c696f06a 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateTagAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateTagAction.java @@ -18,7 +18,11 @@ package org.apache.paimon.flink.action; +import org.apache.paimon.table.AbstractFileStoreTable; +import org.apache.paimon.utils.SnapshotManager; + import java.util.Map; +import java.util.Objects; /** Create tag action for Flink. */ public class CreateTagAction extends TableActionBase { @@ -40,6 +44,11 @@ public CreateTagAction( @Override public void run() throws Exception { - table.createTag(tagName, snapshotId); + SnapshotManager snapshotManager = ((AbstractFileStoreTable) table).snapshotManager(); + long idToUse = + (snapshotId >= 0) + ? snapshotId + : Objects.requireNonNull(snapshotManager.latestSnapshot()).id(); + table.createTag(tagName, idToUse); } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateTagActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateTagActionFactory.java index bfc9510d764c..c10fc5792e4c 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateTagActionFactory.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateTagActionFactory.java @@ -39,12 +39,19 @@ public String identifier() { @Override public Optional create(MultipleParameterToolAdapter params) { checkRequiredArgument(params, TAG_NAME); - checkRequiredArgument(params, SNAPSHOT); Tuple3 tablePath = getTablePath(params); Map catalogConfig = optionalConfigMap(params, CATALOG_CONF); String tagName = params.get(TAG_NAME); - long snapshot = Long.parseLong(params.get(SNAPSHOT)); + + long snapshot = -1; + if (params.has(SNAPSHOT)) { + try { + snapshot = Long.parseLong(params.get(SNAPSHOT)); + } catch (NumberFormatException e) { + System.out.println("Warning: Invalid snapshot ID provided. Using latest snapshot."); + } + } CreateTagAction action = new CreateTagAction( @@ -60,7 +67,7 @@ public void printHelp() { System.out.println("Syntax:"); System.out.println( " create_tag --warehouse --database " - + "--table --tag_name --snapshot "); + + "--table --tag_name [--snapshot ]"); System.out.println(); } } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/TagActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/TagActionITCase.java index b5c616707174..a01849ed94ca 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/TagActionITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/TagActionITCase.java @@ -111,4 +111,50 @@ public void testCreateAndDeleteTag() throws Exception { } assertThat(tagManager.tagExists("tag2")).isFalse(); } + + @Test + public void testCreateLatestTag() throws Exception { + init(warehouse); + + RowType rowType = + RowType.of( + new DataType[] {DataTypes.BIGINT(), DataTypes.STRING()}, + new String[] {"k", "v"}); + FileStoreTable table = + createFileStoreTable( + rowType, + Collections.emptyList(), + Collections.singletonList("k"), + Collections.emptyMap()); + + StreamWriteBuilder writeBuilder = table.newStreamWriteBuilder().withCommitUser(commitUser); + write = writeBuilder.newWrite(); + commit = writeBuilder.newCommit(); + + // 3 snapshots + writeData(rowData(1L, BinaryString.fromString("Hi"))); + writeData(rowData(2L, BinaryString.fromString("Hello"))); + writeData(rowData(3L, BinaryString.fromString("Paimon"))); + + TagManager tagManager = new TagManager(table.fileIO(), table.location()); + + if (ThreadLocalRandom.current().nextBoolean()) { + createAction( + CreateTagAction.class, + "create_tag", + "--warehouse", + warehouse, + "--database", + database, + "--table", + tableName, + "--tag_name", + "tag2") + .run(); + } else { + callProcedure( + String.format("CALL sys.create_tag('%s.%s', 'tag2', 2)", database, tableName)); + } + assertThat(tagManager.tagExists("tag2")).isTrue(); + } } From 31a901087b94e5ed961c38862dc2c02be79efabe Mon Sep 17 00:00:00 2001 From: chenzhuoyu Date: Wed, 20 Dec 2023 13:46:09 +0800 Subject: [PATCH 2/2] Fix the review content --- docs/content/maintenance/manage-tags.md | 7 +++++++ .../paimon/flink/action/CreateTagAction.java | 11 ++++------- .../flink/action/CreateTagActionFactory.java | 8 ++------ .../flink/procedure/CreateTagProcedure.java | 14 ++++++++++++++ .../sql/CreateAndDeleteTagProcedureTest.scala | 12 ++++++++++++ .../spark/procedure/CreateTagProcedure.java | 15 +++++++++++++-- .../CreateAndDeleteTagProcedureTest.scala | 12 ++++++++++++ 7 files changed, 64 insertions(+), 15 deletions(-) diff --git a/docs/content/maintenance/manage-tags.md b/docs/content/maintenance/manage-tags.md index a631fc60f096..cbc2d859ae2e 100644 --- a/docs/content/maintenance/manage-tags.md +++ b/docs/content/maintenance/manage-tags.md @@ -112,6 +112,8 @@ You can create a tag with given name (cannot be number) and snapshot ID. [--catalog_conf [--catalog_conf ...]] ``` +If `snapshot` unset, snapshot_id defaults to the latest. + {{< /tab >}} {{< tab "Java API" >}} @@ -136,6 +138,11 @@ Run the following sql: CALL create_tag(table => 'test.T', tag => 'test_tag', snapshot => 2); ``` +To create a tag based on the latest snapshot id, run the following sql: +```sql +CALL create_tag(table => 'test.T', tag => 'test_tag'); +``` + {{< /tab >}} {{< /tabs >}} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateTagAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateTagAction.java index 9317c696f06a..41b35c598aee 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateTagAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateTagAction.java @@ -28,7 +28,7 @@ public class CreateTagAction extends TableActionBase { private final String tagName; - private final long snapshotId; + private final Long snapshotId; public CreateTagAction( String warehouse, @@ -36,7 +36,7 @@ public CreateTagAction( String tableName, Map catalogConfig, String tagName, - long snapshotId) { + Long snapshotId) { super(warehouse, databaseName, tableName, catalogConfig); this.tagName = tagName; this.snapshotId = snapshotId; @@ -45,10 +45,7 @@ public CreateTagAction( @Override public void run() throws Exception { SnapshotManager snapshotManager = ((AbstractFileStoreTable) table).snapshotManager(); - long idToUse = - (snapshotId >= 0) - ? snapshotId - : Objects.requireNonNull(snapshotManager.latestSnapshot()).id(); - table.createTag(tagName, idToUse); + Long idToUse = snapshotId == null ? snapshotManager.latestSnapshotId() : snapshotId; + table.createTag(tagName, Objects.requireNonNull(idToUse)); } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateTagActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateTagActionFactory.java index c10fc5792e4c..01bbdba42abf 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateTagActionFactory.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateTagActionFactory.java @@ -44,13 +44,9 @@ public Optional create(MultipleParameterToolAdapter params) { Map catalogConfig = optionalConfigMap(params, CATALOG_CONF); String tagName = params.get(TAG_NAME); - long snapshot = -1; + Long snapshot = null; if (params.has(SNAPSHOT)) { - try { - snapshot = Long.parseLong(params.get(SNAPSHOT)); - } catch (NumberFormatException e) { - System.out.println("Warning: Invalid snapshot ID provided. Using latest snapshot."); - } + snapshot = Long.parseLong(params.get(SNAPSHOT)); } CreateTagAction action = diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateTagProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateTagProcedure.java index 9cd4067bcb94..9ebbccfa6106 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateTagProcedure.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateTagProcedure.java @@ -20,10 +20,14 @@ import org.apache.paimon.catalog.Catalog; import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.table.AbstractFileStoreTable; import org.apache.paimon.table.Table; +import org.apache.paimon.utils.SnapshotManager; import org.apache.flink.table.procedure.ProcedureContext; +import java.util.Objects; + /** * Create tag procedure. Usage: * @@ -44,6 +48,16 @@ public String[] call( return new String[] {"Success"}; } + public String[] call(ProcedureContext procedureContext, String tableId, String tagName) + throws Catalog.TableNotExistException { + Table table = catalog.getTable(Identifier.fromString(tableId)); + SnapshotManager snapshotManager = ((AbstractFileStoreTable) table).snapshotManager(); + long latestSnapshotId = Objects.requireNonNull(snapshotManager.latestSnapshotId()); + + table.createTag(tagName, latestSnapshotId); + return new String[] {"Success"}; + } + @Override public String identifier() { return IDENTIFIER; diff --git a/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/CreateAndDeleteTagProcedureTest.scala b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/CreateAndDeleteTagProcedureTest.scala index 30e112ab5507..b1c469b8c932 100644 --- a/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/CreateAndDeleteTagProcedureTest.scala +++ b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/CreateAndDeleteTagProcedureTest.scala @@ -78,6 +78,18 @@ class CreateAndDeleteTagProcedureTest extends PaimonSparkTestBase with StreamTes spark.sql("CALL paimon.sys.delete_tag(table => 'test.T', tag => 'test_tag')"), Row(true) :: Nil) checkAnswer(spark.sql("SELECT tag_name FROM paimon.test.`T$tags`"), Nil) + checkAnswer( + spark.sql( + "CALL paimon.sys.create_tag(table => 'test.T', tag => 'test_latestSnapshot_tag')"), + Row(true) :: Nil) + checkAnswer( + spark.sql("SELECT tag_name FROM paimon.test.`T$tags`"), + Row("test_latestSnapshot_tag") :: Nil) + checkAnswer( + spark.sql( + "CALL paimon.sys.delete_tag(table => 'test.T', tag => 'test_latestSnapshot_tag')"), + Row(true) :: Nil) + checkAnswer(spark.sql("SELECT tag_name FROM paimon.test.`T$tags`"), Nil) } finally { stream.stop() } diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CreateTagProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CreateTagProcedure.java index c0c63b9b0570..1389dbc01d50 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CreateTagProcedure.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CreateTagProcedure.java @@ -18,6 +18,8 @@ package org.apache.paimon.spark.procedure; +import org.apache.paimon.table.AbstractFileStoreTable; + import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.connector.catalog.Identifier; import org.apache.spark.sql.connector.catalog.TableCatalog; @@ -26,6 +28,8 @@ import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; +import java.util.Objects; + import static org.apache.spark.sql.types.DataTypes.LongType; import static org.apache.spark.sql.types.DataTypes.StringType; @@ -36,7 +40,7 @@ public class CreateTagProcedure extends BaseProcedure { new ProcedureParameter[] { ProcedureParameter.required("table", StringType), ProcedureParameter.required("tag", StringType), - ProcedureParameter.required("snapshot", LongType) + ProcedureParameter.optional("snapshot", LongType) }; private static final StructType OUTPUT_TYPE = @@ -68,7 +72,14 @@ public InternalRow[] call(InternalRow args) { return modifyPaimonTable( tableIdent, table -> { - table.createTag(tag, snapshot); + long snapshotToUse = + (snapshot == 0) + ? Objects.requireNonNull( + ((AbstractFileStoreTable) table) + .snapshotManager() + .latestSnapshotId()) + : snapshot; + table.createTag(tag, snapshotToUse); InternalRow outputRow = newInternalRow(true); return new InternalRow[] {outputRow}; }); diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/CreateAndDeleteTagProcedureTest.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/CreateAndDeleteTagProcedureTest.scala index f9f5592737c5..0e642979abfe 100644 --- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/CreateAndDeleteTagProcedureTest.scala +++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/CreateAndDeleteTagProcedureTest.scala @@ -78,6 +78,18 @@ class CreateAndDeleteTagProcedureTest extends PaimonSparkTestBase with StreamTes spark.sql("CALL paimon.sys.delete_tag(table => 'test.T', tag => 'test_tag')"), Row(true) :: Nil) checkAnswer(spark.sql("SELECT tag_name FROM paimon.test.`T$tags`"), Nil) + checkAnswer( + spark.sql( + "CALL paimon.sys.create_tag(table => 'test.T', tag => 'test_latestSnapshot_tag')"), + Row(true) :: Nil) + checkAnswer( + spark.sql("SELECT tag_name FROM paimon.test.`T$tags`"), + Row("test_latestSnapshot_tag") :: Nil) + checkAnswer( + spark.sql( + "CALL paimon.sys.delete_tag(table => 'test.T', tag => 'test_latestSnapshot_tag')"), + Row(true) :: Nil) + checkAnswer(spark.sql("SELECT tag_name FROM paimon.test.`T$tags`"), Nil) } finally { stream.stop() }