Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[flink] Support the creation of tags based on the latest snapshot #2533

Merged
merged 2 commits into from
Dec 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion docs/content/maintenance/manage-tags.md
Original file line number Diff line number Diff line change
Expand Up @@ -108,10 +108,12 @@ You can create a tag with given name (cannot be number) and snapshot ID.
--database <database-name> \
--table <table-name> \
--tag_name <tag-name> \
--snapshot <snapshot-id> \
[--snapshot <snapshot_id>] \
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You need to modify Flink and Spark procedure too.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done.

[--catalog_conf <paimon-catalog-conf> [--catalog_conf <paimon-catalog-conf> ...]]
```

If `snapshot` unset, snapshot_id defaults to the latest.

{{< /tab >}}

{{< tab "Java API" >}}
Expand All @@ -136,6 +138,11 @@ Run the following sql:
CALL create_tag(table => 'test.T', tag => 'test_tag', snapshot => 2);
```

To create a tag based on the latest snapshot id, run the following sql:
```sql
CALL create_tag(table => 'test.T', tag => 'test_tag');
```

{{< /tab >}}

{{< /tabs >}}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,28 +18,34 @@

package org.apache.paimon.flink.action;

import org.apache.paimon.table.AbstractFileStoreTable;
import org.apache.paimon.utils.SnapshotManager;

import java.util.Map;
import java.util.Objects;

/** Create tag action for Flink. */
public class CreateTagAction extends TableActionBase {

private final String tagName;
private final long snapshotId;
private final Long snapshotId;

public CreateTagAction(
String warehouse,
String databaseName,
String tableName,
Map<String, String> catalogConfig,
String tagName,
long snapshotId) {
Long snapshotId) {
super(warehouse, databaseName, tableName, catalogConfig);
this.tagName = tagName;
this.snapshotId = snapshotId;
}

@Override
public void run() throws Exception {
table.createTag(tagName, snapshotId);
SnapshotManager snapshotManager = ((AbstractFileStoreTable) table).snapshotManager();
Long idToUse = snapshotId == null ? snapshotManager.latestSnapshotId() : snapshotId;
table.createTag(tagName, Objects.requireNonNull(idToUse));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,15 @@ public String identifier() {
@Override
public Optional<Action> create(MultipleParameterToolAdapter params) {
checkRequiredArgument(params, TAG_NAME);
checkRequiredArgument(params, SNAPSHOT);

Tuple3<String, String, String> tablePath = getTablePath(params);
Map<String, String> catalogConfig = optionalConfigMap(params, CATALOG_CONF);
String tagName = params.get(TAG_NAME);
long snapshot = Long.parseLong(params.get(SNAPSHOT));

Long snapshot = null;
if (params.has(SNAPSHOT)) {
snapshot = Long.parseLong(params.get(SNAPSHOT));
}

CreateTagAction action =
new CreateTagAction(
Expand All @@ -60,7 +63,7 @@ public void printHelp() {
System.out.println("Syntax:");
System.out.println(
" create_tag --warehouse <warehouse_path> --database <database_name> "
+ "--table <table_name> --tag_name <tag_name> --snapshot <snapshot_id>");
+ "--table <table_name> --tag_name <tag_name> [--snapshot <snapshot_id>]");
System.out.println();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,14 @@

import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.table.AbstractFileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.utils.SnapshotManager;

import org.apache.flink.table.procedure.ProcedureContext;

import java.util.Objects;

/**
* Create tag procedure. Usage:
*
Expand All @@ -44,6 +48,16 @@ public String[] call(
return new String[] {"Success"};
}

public String[] call(ProcedureContext procedureContext, String tableId, String tagName)
throws Catalog.TableNotExistException {
Table table = catalog.getTable(Identifier.fromString(tableId));
SnapshotManager snapshotManager = ((AbstractFileStoreTable) table).snapshotManager();
long latestSnapshotId = Objects.requireNonNull(snapshotManager.latestSnapshotId());

table.createTag(tagName, latestSnapshotId);
return new String[] {"Success"};
}

@Override
public String identifier() {
return IDENTIFIER;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,4 +111,50 @@ public void testCreateAndDeleteTag() throws Exception {
}
assertThat(tagManager.tagExists("tag2")).isFalse();
}

@Test
public void testCreateLatestTag() throws Exception {
init(warehouse);

RowType rowType =
RowType.of(
new DataType[] {DataTypes.BIGINT(), DataTypes.STRING()},
new String[] {"k", "v"});
FileStoreTable table =
createFileStoreTable(
rowType,
Collections.emptyList(),
Collections.singletonList("k"),
Collections.emptyMap());

StreamWriteBuilder writeBuilder = table.newStreamWriteBuilder().withCommitUser(commitUser);
write = writeBuilder.newWrite();
commit = writeBuilder.newCommit();

// 3 snapshots
writeData(rowData(1L, BinaryString.fromString("Hi")));
writeData(rowData(2L, BinaryString.fromString("Hello")));
writeData(rowData(3L, BinaryString.fromString("Paimon")));

TagManager tagManager = new TagManager(table.fileIO(), table.location());

if (ThreadLocalRandom.current().nextBoolean()) {
createAction(
CreateTagAction.class,
"create_tag",
"--warehouse",
warehouse,
"--database",
database,
"--table",
tableName,
"--tag_name",
"tag2")
.run();
} else {
callProcedure(
String.format("CALL sys.create_tag('%s.%s', 'tag2', 2)", database, tableName));
}
assertThat(tagManager.tagExists("tag2")).isTrue();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,18 @@ class CreateAndDeleteTagProcedureTest extends PaimonSparkTestBase with StreamTes
spark.sql("CALL paimon.sys.delete_tag(table => 'test.T', tag => 'test_tag')"),
Row(true) :: Nil)
checkAnswer(spark.sql("SELECT tag_name FROM paimon.test.`T$tags`"), Nil)
checkAnswer(
spark.sql(
"CALL paimon.sys.create_tag(table => 'test.T', tag => 'test_latestSnapshot_tag')"),
Row(true) :: Nil)
checkAnswer(
spark.sql("SELECT tag_name FROM paimon.test.`T$tags`"),
Row("test_latestSnapshot_tag") :: Nil)
checkAnswer(
spark.sql(
"CALL paimon.sys.delete_tag(table => 'test.T', tag => 'test_latestSnapshot_tag')"),
Row(true) :: Nil)
checkAnswer(spark.sql("SELECT tag_name FROM paimon.test.`T$tags`"), Nil)
} finally {
stream.stop()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

package org.apache.paimon.spark.procedure;

import org.apache.paimon.table.AbstractFileStoreTable;

import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.TableCatalog;
Expand All @@ -26,6 +28,8 @@
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import java.util.Objects;

import static org.apache.spark.sql.types.DataTypes.LongType;
import static org.apache.spark.sql.types.DataTypes.StringType;

Expand All @@ -36,7 +40,7 @@ public class CreateTagProcedure extends BaseProcedure {
new ProcedureParameter[] {
ProcedureParameter.required("table", StringType),
ProcedureParameter.required("tag", StringType),
ProcedureParameter.required("snapshot", LongType)
ProcedureParameter.optional("snapshot", LongType)
};

private static final StructType OUTPUT_TYPE =
Expand Down Expand Up @@ -68,7 +72,14 @@ public InternalRow[] call(InternalRow args) {
return modifyPaimonTable(
tableIdent,
table -> {
table.createTag(tag, snapshot);
long snapshotToUse =
(snapshot == 0)
? Objects.requireNonNull(
((AbstractFileStoreTable) table)
.snapshotManager()
.latestSnapshotId())
: snapshot;
table.createTag(tag, snapshotToUse);
InternalRow outputRow = newInternalRow(true);
return new InternalRow[] {outputRow};
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,18 @@ class CreateAndDeleteTagProcedureTest extends PaimonSparkTestBase with StreamTes
spark.sql("CALL paimon.sys.delete_tag(table => 'test.T', tag => 'test_tag')"),
Row(true) :: Nil)
checkAnswer(spark.sql("SELECT tag_name FROM paimon.test.`T$tags`"), Nil)
checkAnswer(
spark.sql(
"CALL paimon.sys.create_tag(table => 'test.T', tag => 'test_latestSnapshot_tag')"),
Row(true) :: Nil)
checkAnswer(
spark.sql("SELECT tag_name FROM paimon.test.`T$tags`"),
Row("test_latestSnapshot_tag") :: Nil)
checkAnswer(
spark.sql(
"CALL paimon.sys.delete_tag(table => 'test.T', tag => 'test_latestSnapshot_tag')"),
Row(true) :: Nil)
checkAnswer(spark.sql("SELECT tag_name FROM paimon.test.`T$tags`"), Nil)
} finally {
stream.stop()
}
Expand Down
Loading