Skip to content

Commit

Permalink
[flink] Fix that schema validation fails when using 'scan.x' options …
Browse files Browse the repository at this point in the history
…to do time travel on schema changed tables (#2543)
  • Loading branch information
yuzelin authored Dec 21, 2023
1 parent 5bd47e4 commit aadfb6d
Show file tree
Hide file tree
Showing 6 changed files with 93 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,7 @@ private static int parseFraction(String v) {
// Format
// --------------------------------------------------------------------------------------------

private static String formatTimestamp(Timestamp ts, int precision) {
public static String formatTimestamp(Timestamp ts, int precision) {
LocalDateTime ldt = ts.toLocalDateTime();

String fraction = pad(9, ldt.getNano());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,22 @@ public InnerStreamTableScan newStreamScan() {

@Override
public FileStoreTable copy(Map<String, String> dynamicOptions) {
checkImmutability(dynamicOptions);
return copyInternal(dynamicOptions, true);
}

@Override
public FileStoreTable copyWithoutTimeTravel(Map<String, String> dynamicOptions) {
checkImmutability(dynamicOptions);
return copyInternal(dynamicOptions, false);
}

@Override
public FileStoreTable internalCopyWithoutCheck(Map<String, String> dynamicOptions) {
return copyInternal(dynamicOptions, true);
}

private void checkImmutability(Map<String, String> dynamicOptions) {
Map<String, String> options = tableSchema.options();
// check option is not immutable
dynamicOptions.forEach(
Expand All @@ -171,12 +187,9 @@ public FileStoreTable copy(Map<String, String> dynamicOptions) {
SchemaManager.checkAlterTableOption(k);
}
});

return internalCopyWithoutCheck(dynamicOptions);
}

@Override
public FileStoreTable internalCopyWithoutCheck(Map<String, String> dynamicOptions) {
private FileStoreTable copyInternal(Map<String, String> dynamicOptions, boolean tryTimeTravel) {
Map<String, String> options = new HashMap<>(tableSchema.options());

// merge non-null dynamic options into schema.options
Expand All @@ -203,8 +216,10 @@ public FileStoreTable internalCopyWithoutCheck(Map<String, String> dynamicOption
// validate schema with new options
SchemaValidation.validateTableSchema(newTableSchema);

// see if merged options contain time travel option
newTableSchema = tryTimeTravel(newOptions).orElse(newTableSchema);
if (tryTimeTravel) {
// see if merged options contain time travel option
newTableSchema = tryTimeTravel(newOptions).orElse(newTableSchema);
}

return copy(newTableSchema);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,9 @@ default Optional<String> comment() {
@Override
FileStoreTable copy(Map<String, String> dynamicOptions);

/** Doesn't change table schema even when there exists time travel scan options. */
FileStoreTable copyWithoutTimeTravel(Map<String, String> dynamicOptions);

/** Sometimes we have to change some Immutable options to implement features. */
FileStoreTable internalCopyWithoutCheck(Map<String, String> dynamicOptions);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,10 +248,14 @@ static Table buildPaimonTable(DynamicTableFactory.Context context) {
newOptions.putAll(origin.getOptions());
newOptions.putAll(dynamicOptions);

// notice that the Paimon table schema must be the same with the Flink's
if (origin instanceof DataCatalogTable) {
table = ((DataCatalogTable) origin).table().copy(newOptions);
FileStoreTable fileStoreTable = (FileStoreTable) ((DataCatalogTable) origin).table();
table = fileStoreTable.copyWithoutTimeTravel(newOptions);
} else {
table = FileStoreTableFactory.create(createCatalogContext(context)).copy(newOptions);
table =
FileStoreTableFactory.create(createCatalogContext(context))
.copyWithoutTimeTravel(newOptions);
}

Schema schema = FlinkCatalog.fromCatalogTable(context.getCatalogTable());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.paimon.CoreOptions;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.utils.BlockingIterator;
import org.apache.paimon.utils.DateTimeUtils;

import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
Expand Down Expand Up @@ -386,4 +387,33 @@ public void testIgnoreDelete() throws Exception {
Row.ofKind(RowKind.INSERT, 1, "B"), Row.ofKind(RowKind.INSERT, 2, "B"));
iterator.close();
}

@Test
public void testScanFromOldSchema() throws InterruptedException {
sql("CREATE TABLE select_old (f0 INT PRIMARY KEY NOT ENFORCED, f1 STRING)");

sql("INSERT INTO select_old VALUES (1, 'a'), (2, 'b')");

Thread.sleep(1_000);
long timestamp = System.currentTimeMillis();

sql("ALTER TABLE select_old ADD f2 STRING");
sql("INSERT INTO select_old VALUES (3, 'c', 'C')");

// this way will initialize source with the latest schema
assertThat(
sql(
"SELECT * FROM select_old /*+ OPTIONS('scan.timestamp-millis'='%s') */",
timestamp))
// old schema doesn't have column f2
.containsExactlyInAnyOrder(Row.of(1, "a", null), Row.of(2, "b", null));

// this way will initialize source with time-travelled schema
assertThat(
sql(
"SELECT * FROM select_old FOR SYSTEM_TIME AS OF TIMESTAMP '%s'",
DateTimeUtils.formatTimestamp(
DateTimeUtils.toInternal(timestamp, 0), 0)))
.containsExactlyInAnyOrder(Row.of(1, "a"), Row.of(2, "b"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.paimon.Snapshot;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.utils.BlockingIterator;
import org.apache.paimon.utils.DateTimeUtils;
import org.apache.paimon.utils.SnapshotManager;

import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -521,4 +522,35 @@ public void testIgnoreDelete() {
sql("INSERT INTO ignore_delete VALUES (1, 'B')");
assertThat(sql("SELECT * FROM ignore_delete")).containsExactly(Row.of(1, "B"));
}

@Test
public void testScanFromOldSchema() throws Exception {
sql("CREATE TABLE select_old (f0 INT PRIMARY KEY NOT ENFORCED, f1 STRING)");

sql("INSERT INTO select_old VALUES (1, 'a'), (2, 'b')");

Thread.sleep(1_000);
long timestamp = System.currentTimeMillis();

sql("ALTER TABLE select_old ADD f2 STRING");
sql("INSERT INTO select_old VALUES (3, 'c', 'C')");

// this way will initialize source with the latest schema
BlockingIterator<Row, Row> iterator =
BlockingIterator.of(
streamSqlIter(
"SELECT * FROM select_old /*+ OPTIONS('scan.timestamp-millis'='%s') */",
timestamp));
assertThat(iterator.collect(1)).containsExactlyInAnyOrder(Row.of(3, "c", "C"));
iterator.close();

// this way will initialize source with time-travelled schema
iterator =
BlockingIterator.of(
streamSqlIter(
"SELECT * FROM select_old FOR SYSTEM_TIME AS OF TIMESTAMP '%s'",
DateTimeUtils.formatTimestamp(
DateTimeUtils.toInternal(timestamp, 0), 0)));
assertThat(iterator.collect(1)).containsExactlyInAnyOrder(Row.of(3, "c"));
}
}

0 comments on commit aadfb6d

Please sign in to comment.