From 051eb8943d6e991e1d3daed49f44e9729252c16a Mon Sep 17 00:00:00 2001 From: chenzhuoyu Date: Wed, 20 Dec 2023 13:46:09 +0800 Subject: [PATCH] Fix the review content --- docs/content/maintenance/manage-tags.md | 7 +++++++ .../paimon/flink/action/CreateTagAction.java | 11 ++++------- .../flink/action/CreateTagActionFactory.java | 8 ++------ .../flink/procedure/CreateTagProcedure.java | 14 ++++++++++++++ .../sql/CreateAndDeleteTagProcedureTest.scala | 12 ++++++++++++ .../spark/procedure/CreateTagProcedure.java | 15 +++++++++++++-- .../CreateAndDeleteTagProcedureTest.scala | 12 ++++++++++++ 7 files changed, 64 insertions(+), 15 deletions(-) diff --git a/docs/content/maintenance/manage-tags.md b/docs/content/maintenance/manage-tags.md index a631fc60f0963..cbc2d859ae2ec 100644 --- a/docs/content/maintenance/manage-tags.md +++ b/docs/content/maintenance/manage-tags.md @@ -112,6 +112,8 @@ You can create a tag with given name (cannot be number) and snapshot ID. [--catalog_conf [--catalog_conf ...]] ``` +If `snapshot` unset, snapshot_id defaults to the latest. + {{< /tab >}} {{< tab "Java API" >}} @@ -136,6 +138,11 @@ Run the following sql: CALL create_tag(table => 'test.T', tag => 'test_tag', snapshot => 2); ``` +To create a tag based on the latest snapshot id, run the following sql: +```sql +CALL create_tag(table => 'test.T', tag => 'test_tag'); +``` + {{< /tab >}} {{< /tabs >}} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateTagAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateTagAction.java index 9317c696f06a1..dd3e7a4862057 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateTagAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateTagAction.java @@ -28,7 +28,7 @@ public class CreateTagAction extends TableActionBase { private final String tagName; - private final long snapshotId; + private final Long snapshotId; public CreateTagAction( String warehouse, @@ -36,7 +36,7 @@ public CreateTagAction( String tableName, Map catalogConfig, String tagName, - long snapshotId) { + Long snapshotId) { super(warehouse, databaseName, tableName, catalogConfig); this.tagName = tagName; this.snapshotId = snapshotId; @@ -45,10 +45,7 @@ public CreateTagAction( @Override public void run() throws Exception { SnapshotManager snapshotManager = ((AbstractFileStoreTable) table).snapshotManager(); - long idToUse = - (snapshotId >= 0) - ? snapshotId - : Objects.requireNonNull(snapshotManager.latestSnapshot()).id(); - table.createTag(tagName, idToUse); + Long idToUse = snapshotId == null ? snapshotId : snapshotManager.latestSnapshotId(); + table.createTag(tagName, Objects.requireNonNull(idToUse)); } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateTagActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateTagActionFactory.java index c10fc5792e4c2..01bbdba42abff 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateTagActionFactory.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateTagActionFactory.java @@ -44,13 +44,9 @@ public Optional create(MultipleParameterToolAdapter params) { Map catalogConfig = optionalConfigMap(params, CATALOG_CONF); String tagName = params.get(TAG_NAME); - long snapshot = -1; + Long snapshot = null; if (params.has(SNAPSHOT)) { - try { - snapshot = Long.parseLong(params.get(SNAPSHOT)); - } catch (NumberFormatException e) { - System.out.println("Warning: Invalid snapshot ID provided. Using latest snapshot."); - } + snapshot = Long.parseLong(params.get(SNAPSHOT)); } CreateTagAction action = diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateTagProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateTagProcedure.java index 9cd4067bcb941..9ebbccfa6106d 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateTagProcedure.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateTagProcedure.java @@ -20,10 +20,14 @@ import org.apache.paimon.catalog.Catalog; import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.table.AbstractFileStoreTable; import org.apache.paimon.table.Table; +import org.apache.paimon.utils.SnapshotManager; import org.apache.flink.table.procedure.ProcedureContext; +import java.util.Objects; + /** * Create tag procedure. Usage: * @@ -44,6 +48,16 @@ public String[] call( return new String[] {"Success"}; } + public String[] call(ProcedureContext procedureContext, String tableId, String tagName) + throws Catalog.TableNotExistException { + Table table = catalog.getTable(Identifier.fromString(tableId)); + SnapshotManager snapshotManager = ((AbstractFileStoreTable) table).snapshotManager(); + long latestSnapshotId = Objects.requireNonNull(snapshotManager.latestSnapshotId()); + + table.createTag(tagName, latestSnapshotId); + return new String[] {"Success"}; + } + @Override public String identifier() { return IDENTIFIER; diff --git a/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/CreateAndDeleteTagProcedureTest.scala b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/CreateAndDeleteTagProcedureTest.scala index 30e112ab55077..b1c469b8c932d 100644 --- a/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/CreateAndDeleteTagProcedureTest.scala +++ b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/CreateAndDeleteTagProcedureTest.scala @@ -78,6 +78,18 @@ class CreateAndDeleteTagProcedureTest extends PaimonSparkTestBase with StreamTes spark.sql("CALL paimon.sys.delete_tag(table => 'test.T', tag => 'test_tag')"), Row(true) :: Nil) checkAnswer(spark.sql("SELECT tag_name FROM paimon.test.`T$tags`"), Nil) + checkAnswer( + spark.sql( + "CALL paimon.sys.create_tag(table => 'test.T', tag => 'test_latestSnapshot_tag')"), + Row(true) :: Nil) + checkAnswer( + spark.sql("SELECT tag_name FROM paimon.test.`T$tags`"), + Row("test_latestSnapshot_tag") :: Nil) + checkAnswer( + spark.sql( + "CALL paimon.sys.delete_tag(table => 'test.T', tag => 'test_latestSnapshot_tag')"), + Row(true) :: Nil) + checkAnswer(spark.sql("SELECT tag_name FROM paimon.test.`T$tags`"), Nil) } finally { stream.stop() } diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CreateTagProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CreateTagProcedure.java index c0c63b9b0570f..1389dbc01d502 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CreateTagProcedure.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CreateTagProcedure.java @@ -18,6 +18,8 @@ package org.apache.paimon.spark.procedure; +import org.apache.paimon.table.AbstractFileStoreTable; + import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.connector.catalog.Identifier; import org.apache.spark.sql.connector.catalog.TableCatalog; @@ -26,6 +28,8 @@ import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; +import java.util.Objects; + import static org.apache.spark.sql.types.DataTypes.LongType; import static org.apache.spark.sql.types.DataTypes.StringType; @@ -36,7 +40,7 @@ public class CreateTagProcedure extends BaseProcedure { new ProcedureParameter[] { ProcedureParameter.required("table", StringType), ProcedureParameter.required("tag", StringType), - ProcedureParameter.required("snapshot", LongType) + ProcedureParameter.optional("snapshot", LongType) }; private static final StructType OUTPUT_TYPE = @@ -68,7 +72,14 @@ public InternalRow[] call(InternalRow args) { return modifyPaimonTable( tableIdent, table -> { - table.createTag(tag, snapshot); + long snapshotToUse = + (snapshot == 0) + ? Objects.requireNonNull( + ((AbstractFileStoreTable) table) + .snapshotManager() + .latestSnapshotId()) + : snapshot; + table.createTag(tag, snapshotToUse); InternalRow outputRow = newInternalRow(true); return new InternalRow[] {outputRow}; }); diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/CreateAndDeleteTagProcedureTest.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/CreateAndDeleteTagProcedureTest.scala index f9f5592737c5e..0e642979abfe1 100644 --- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/CreateAndDeleteTagProcedureTest.scala +++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/CreateAndDeleteTagProcedureTest.scala @@ -78,6 +78,18 @@ class CreateAndDeleteTagProcedureTest extends PaimonSparkTestBase with StreamTes spark.sql("CALL paimon.sys.delete_tag(table => 'test.T', tag => 'test_tag')"), Row(true) :: Nil) checkAnswer(spark.sql("SELECT tag_name FROM paimon.test.`T$tags`"), Nil) + checkAnswer( + spark.sql( + "CALL paimon.sys.create_tag(table => 'test.T', tag => 'test_latestSnapshot_tag')"), + Row(true) :: Nil) + checkAnswer( + spark.sql("SELECT tag_name FROM paimon.test.`T$tags`"), + Row("test_latestSnapshot_tag") :: Nil) + checkAnswer( + spark.sql( + "CALL paimon.sys.delete_tag(table => 'test.T', tag => 'test_latestSnapshot_tag')"), + Row(true) :: Nil) + checkAnswer(spark.sql("SELECT tag_name FROM paimon.test.`T$tags`"), Nil) } finally { stream.stop() }