Skip to content

Commit

Permalink
Fix the review content
Browse files Browse the repository at this point in the history
  • Loading branch information
chenzhuoyu committed Dec 20, 2023
1 parent aee5ded commit 051eb89
Show file tree
Hide file tree
Showing 7 changed files with 64 additions and 15 deletions.
7 changes: 7 additions & 0 deletions docs/content/maintenance/manage-tags.md
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,8 @@ You can create a tag with given name (cannot be number) and snapshot ID.
[--catalog_conf <paimon-catalog-conf> [--catalog_conf <paimon-catalog-conf> ...]]
```
If `snapshot` unset, snapshot_id defaults to the latest.
{{< /tab >}}
{{< tab "Java API" >}}
Expand All @@ -136,6 +138,11 @@ Run the following sql:
CALL create_tag(table => 'test.T', tag => 'test_tag', snapshot => 2);
```
To create a tag based on the latest snapshot id, run the following sql:
```sql
CALL create_tag(table => 'test.T', tag => 'test_tag');
```
{{< /tab >}}
{{< /tabs >}}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,15 @@
public class CreateTagAction extends TableActionBase {

private final String tagName;
private final long snapshotId;
private final Long snapshotId;

public CreateTagAction(
String warehouse,
String databaseName,
String tableName,
Map<String, String> catalogConfig,
String tagName,
long snapshotId) {
Long snapshotId) {
super(warehouse, databaseName, tableName, catalogConfig);
this.tagName = tagName;
this.snapshotId = snapshotId;
Expand All @@ -45,10 +45,7 @@ public CreateTagAction(
@Override
public void run() throws Exception {
SnapshotManager snapshotManager = ((AbstractFileStoreTable) table).snapshotManager();
long idToUse =
(snapshotId >= 0)
? snapshotId
: Objects.requireNonNull(snapshotManager.latestSnapshot()).id();
table.createTag(tagName, idToUse);
Long idToUse = snapshotId == null ? snapshotId : snapshotManager.latestSnapshotId();
table.createTag(tagName, Objects.requireNonNull(idToUse));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,9 @@ public Optional<Action> create(MultipleParameterToolAdapter params) {
Map<String, String> catalogConfig = optionalConfigMap(params, CATALOG_CONF);
String tagName = params.get(TAG_NAME);

long snapshot = -1;
Long snapshot = null;
if (params.has(SNAPSHOT)) {
try {
snapshot = Long.parseLong(params.get(SNAPSHOT));
} catch (NumberFormatException e) {
System.out.println("Warning: Invalid snapshot ID provided. Using latest snapshot.");
}
snapshot = Long.parseLong(params.get(SNAPSHOT));
}

CreateTagAction action =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,14 @@

import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.table.AbstractFileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.utils.SnapshotManager;

import org.apache.flink.table.procedure.ProcedureContext;

import java.util.Objects;

/**
* Create tag procedure. Usage:
*
Expand All @@ -44,6 +48,16 @@ public String[] call(
return new String[] {"Success"};
}

public String[] call(ProcedureContext procedureContext, String tableId, String tagName)
throws Catalog.TableNotExistException {
Table table = catalog.getTable(Identifier.fromString(tableId));
SnapshotManager snapshotManager = ((AbstractFileStoreTable) table).snapshotManager();
long latestSnapshotId = Objects.requireNonNull(snapshotManager.latestSnapshotId());

table.createTag(tagName, latestSnapshotId);
return new String[] {"Success"};
}

@Override
public String identifier() {
return IDENTIFIER;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,18 @@ class CreateAndDeleteTagProcedureTest extends PaimonSparkTestBase with StreamTes
spark.sql("CALL paimon.sys.delete_tag(table => 'test.T', tag => 'test_tag')"),
Row(true) :: Nil)
checkAnswer(spark.sql("SELECT tag_name FROM paimon.test.`T$tags`"), Nil)
checkAnswer(
spark.sql(
"CALL paimon.sys.create_tag(table => 'test.T', tag => 'test_latestSnapshot_tag')"),
Row(true) :: Nil)
checkAnswer(
spark.sql("SELECT tag_name FROM paimon.test.`T$tags`"),
Row("test_latestSnapshot_tag") :: Nil)
checkAnswer(
spark.sql(
"CALL paimon.sys.delete_tag(table => 'test.T', tag => 'test_latestSnapshot_tag')"),
Row(true) :: Nil)
checkAnswer(spark.sql("SELECT tag_name FROM paimon.test.`T$tags`"), Nil)
} finally {
stream.stop()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

package org.apache.paimon.spark.procedure;

import org.apache.paimon.table.AbstractFileStoreTable;

import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.TableCatalog;
Expand All @@ -26,6 +28,8 @@
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import java.util.Objects;

import static org.apache.spark.sql.types.DataTypes.LongType;
import static org.apache.spark.sql.types.DataTypes.StringType;

Expand All @@ -36,7 +40,7 @@ public class CreateTagProcedure extends BaseProcedure {
new ProcedureParameter[] {
ProcedureParameter.required("table", StringType),
ProcedureParameter.required("tag", StringType),
ProcedureParameter.required("snapshot", LongType)
ProcedureParameter.optional("snapshot", LongType)
};

private static final StructType OUTPUT_TYPE =
Expand Down Expand Up @@ -68,7 +72,14 @@ public InternalRow[] call(InternalRow args) {
return modifyPaimonTable(
tableIdent,
table -> {
table.createTag(tag, snapshot);
long snapshotToUse =
(snapshot == 0)
? Objects.requireNonNull(
((AbstractFileStoreTable) table)
.snapshotManager()
.latestSnapshotId())
: snapshot;
table.createTag(tag, snapshotToUse);
InternalRow outputRow = newInternalRow(true);
return new InternalRow[] {outputRow};
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,18 @@ class CreateAndDeleteTagProcedureTest extends PaimonSparkTestBase with StreamTes
spark.sql("CALL paimon.sys.delete_tag(table => 'test.T', tag => 'test_tag')"),
Row(true) :: Nil)
checkAnswer(spark.sql("SELECT tag_name FROM paimon.test.`T$tags`"), Nil)
checkAnswer(
spark.sql(
"CALL paimon.sys.create_tag(table => 'test.T', tag => 'test_latestSnapshot_tag')"),
Row(true) :: Nil)
checkAnswer(
spark.sql("SELECT tag_name FROM paimon.test.`T$tags`"),
Row("test_latestSnapshot_tag") :: Nil)
checkAnswer(
spark.sql(
"CALL paimon.sys.delete_tag(table => 'test.T', tag => 'test_latestSnapshot_tag')"),
Row(true) :: Nil)
checkAnswer(spark.sql("SELECT tag_name FROM paimon.test.`T$tags`"), Nil)
} finally {
stream.stop()
}
Expand Down

0 comments on commit 051eb89

Please sign in to comment.