Support MERGE on cloned table in Delta Lake #24756

Status: Open. Wants to merge 1 commit into base: master.
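In brief: a shallow-cloned Delta table initially references the clone source's data files by absolute path, while the connector's MERGE machinery assumed every data file lives under the table's own location. This change detects the clone relationship from the table's first commit and adjusts path handling accordingly. A rough summary of the rule, with hypothetical locations (inferred from the diff below rather than from a design document):

```java
// Cloned table root:  s3://bucket/warehouse/cloned/
// Clone source root:  s3://bucket/warehouse/source/   (read from the CLONE commit)
//
// During MERGE on the cloned table:
//  - a data file still under s3://bucket/warehouse/source/ is referenced by its
//    absolute path, matching how the clone's transaction log refers to it;
//  - files owned by the cloned table, and all newly written files, are referenced
//    by paths relative to s3://bucket/warehouse/cloned/.
```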
DeltaLakeMergeSink.java

@@ -131,6 +131,7 @@ public class DeltaLakeMergeSink
private final boolean deletionVectorEnabled;
private final Map<String, DeletionVectorEntry> deletionVectors;
private final int randomPrefixLength;
+ private final Optional<String> shallowCloneSourceTableLocation;

@Nullable
private DeltaLakeCdfPageSink cdfPageSink;
@@ -155,7 +156,8 @@ public DeltaLakeMergeSink(
FileFormatDataSourceStats fileFormatDataSourceStats,
boolean deletionVectorEnabled,
Map<String, DeletionVectorEntry> deletionVectors,
- int randomPrefixLength)
+ int randomPrefixLength,
+ Optional<String> shallowCloneSourceTableLocation)
{
this.typeOperators = requireNonNull(typeOperators, "typeOperators is null");
this.session = requireNonNull(session, "session is null");
@@ -184,6 +186,8 @@ public DeltaLakeMergeSink(
this.deletionVectorEnabled = deletionVectorEnabled;
this.deletionVectors = ImmutableMap.copyOf(requireNonNull(deletionVectors, "deletionVectors is null"));
this.randomPrefixLength = randomPrefixLength;
+ this.shallowCloneSourceTableLocation = requireNonNull(shallowCloneSourceTableLocation, "shallowCloneSourceTableLocation is null");

dataColumnsIndices = new int[tableColumnCount];
dataAndRowIdColumnsIndices = new int[tableColumnCount + 1];
for (int i = 0; i < tableColumnCount; i++) {
@@ -407,8 +411,8 @@ private Slice writeDeletionVector(
long rowCount)
{
String tablePath = rootTableLocation.toString();
- String sourceRelativePath = relativePath(tablePath, sourcePath);
- DeletionVectorEntry oldDeletionVector = deletionVectors.get(sourceRelativePath);
+ String sourceReferencePath = getReferencedPath(tablePath, sourcePath);
+ DeletionVectorEntry oldDeletionVector = deletionVectors.get(sourceReferencePath);

DeletionVectorEntry deletionVectorEntry;
try {
@@ -420,14 +424,14 @@

try {
DataFileInfo newFileInfo = new DataFileInfo(
- sourceRelativePath,
+ sourceReferencePath,
length,
lastModified.toEpochMilli(),
DATA,
deletion.partitionValues,
readStatistics(parquetMetadata, dataColumns, rowCount),
Optional.of(deletionVectorEntry));
- DeltaLakeMergeResult result = new DeltaLakeMergeResult(deletion.partitionValues, Optional.of(sourceRelativePath), Optional.ofNullable(oldDeletionVector), Optional.of(newFileInfo));
+ DeltaLakeMergeResult result = new DeltaLakeMergeResult(deletion.partitionValues, Optional.of(sourceReferencePath), Optional.ofNullable(oldDeletionVector), Optional.of(newFileInfo));
return utf8Slice(mergeResultJsonCodec.toJson(result));
}
catch (Throwable e) {
@@ -445,9 +449,9 @@

private Slice onlySourceFile(String sourcePath, FileDeletion deletion)
{
- String sourceRelativePath = relativePath(rootTableLocation.toString(), sourcePath);
- DeletionVectorEntry deletionVector = deletionVectors.get(sourceRelativePath);
- DeltaLakeMergeResult result = new DeltaLakeMergeResult(deletion.partitionValues(), Optional.of(sourceRelativePath), Optional.ofNullable(deletionVector), Optional.empty());
+ String sourceReferencePath = getReferencedPath(rootTableLocation.toString(), sourcePath);
+ DeletionVectorEntry deletionVector = deletionVectors.get(sourceReferencePath);
+ DeltaLakeMergeResult result = new DeltaLakeMergeResult(deletion.partitionValues(), Optional.of(sourceReferencePath), Optional.ofNullable(deletionVector), Optional.empty());
return utf8Slice(mergeResultJsonCodec.toJson(result));
}

@@ -457,9 +461,18 @@ private List<Slice> rewriteFile(String sourcePath, FileDeletion deletion)
try {
String tablePath = rootTableLocation.toString();
Location sourceLocation = Location.of(sourcePath);
- String sourceRelativePath = relativePath(tablePath, sourcePath);
+ String sourceReferencePath = getReferencedPath(tablePath, sourcePath);

- Location targetLocation = sourceLocation.sibling(session.getQueryId() + "_" + randomUUID());
+ // get the relative path for the cloned table if `sourcePath` is a source table file location
+ Optional<String> sourceRelativePath = shallowCloneSourceTableLocation
+         .filter(sourcePath::startsWith)
+         .map(location -> relativePath(location, sourcePath));
+ // build the target location by appending the source relative path to the current table
+ // location when this is a cloned table and sourcePath points into the source table
+ Location targetLocation = sourceRelativePath.map(rootTableLocation::appendPath)
+         .orElse(sourceLocation)
+         .sibling(session.getQueryId() + "_" + randomUUID());
+ // always write under the current table location, whether or not the table is cloned
String targetRelativePath = relativePath(tablePath, targetLocation.toString());
ParquetFileWriter fileWriter = createParquetFileWriter(targetLocation, dataColumns);

@@ -474,7 +487,7 @@ private List<Slice> rewriteFile(String sourcePath, FileDeletion deletion)

Optional<DataFileInfo> newFileInfo = rewriteParquetFile(sourceLocation, deletion, writer);

- DeltaLakeMergeResult result = new DeltaLakeMergeResult(deletion.partitionValues(), Optional.of(sourceRelativePath), Optional.empty(), newFileInfo);
+ DeltaLakeMergeResult result = new DeltaLakeMergeResult(deletion.partitionValues(), Optional.of(sourceReferencePath), Optional.empty(), newFileInfo);
return ImmutableList.of(utf8Slice(mergeResultJsonCodec.toJson(result)));
}
catch (IOException e) {
@@ -524,8 +537,8 @@ private ParquetFileWriter createParquetFileWriter(Location path, List<DeltaLakeC

private RoaringBitmapArray loadDeletionVector(Location path)
{
- String relativePath = relativePath(rootTableLocation.toString(), path.toString());
- DeletionVectorEntry deletionVector = deletionVectors.get(relativePath);
+ String referencedPath = getReferencedPath(rootTableLocation.toString(), path.toString());
+ DeletionVectorEntry deletionVector = deletionVectors.get(referencedPath);
if (deletionVector == null) {
return new RoaringBitmapArray();
}
@@ -677,6 +690,16 @@ private ReaderPageSource createParquetPageSource(Location path)
OptionalLong.of(fileSize));
}

+ private String getReferencedPath(String basePath, String sourcePath)
+ {
+     if (shallowCloneSourceTableLocation.isPresent() && sourcePath.startsWith(shallowCloneSourceTableLocation.get())) {
+         // absolute path of a file inherited from the shallow clone source table
+         return sourcePath;
+     }
+
+     return relativePath(basePath, sourcePath);
+ }

@Override
public void abort()
{
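A minimal, self-contained sketch of how the new getReferencedPath helper is expected to behave. The table locations are made up, and relativePath below is a simplified stand-in for the connector's helper of the same name, so treat this as an illustration rather than the actual implementation:

```java
import java.util.Optional;

// Standalone illustration of DeltaLakeMergeSink.getReferencedPath (hypothetical paths).
public class ReferencedPathSketch
{
    static String getReferencedPath(Optional<String> shallowCloneSourceTableLocation, String basePath, String sourcePath)
    {
        if (shallowCloneSourceTableLocation.isPresent() && sourcePath.startsWith(shallowCloneSourceTableLocation.get())) {
            // file inherited from the shallow clone source: keep the absolute path
            return sourcePath;
        }
        return relativePath(basePath, sourcePath);
    }

    // Simplified stand-in for the connector's relativePath helper
    static String relativePath(String basePath, String path)
    {
        String base = basePath.endsWith("/") ? basePath : basePath + "/";
        return path.startsWith(base) ? path.substring(base.length()) : path;
    }

    public static void main(String[] args)
    {
        Optional<String> cloneSource = Optional.of("s3://bucket/warehouse/source/");
        String clonedRoot = "s3://bucket/warehouse/cloned";

        // file written by the cloned table itself -> relative path: "part-0.parquet"
        System.out.println(getReferencedPath(cloneSource, clonedRoot, "s3://bucket/warehouse/cloned/part-0.parquet"));

        // file inherited from the clone source -> absolute path preserved:
        // "s3://bucket/warehouse/source/part-1.parquet"
        System.out.println(getReferencedPath(cloneSource, clonedRoot, "s3://bucket/warehouse/source/part-1.parquet"));
    }
}
```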
DeltaLakeMergeTableHandle.java

@@ -19,20 +19,23 @@
import io.trino.spi.connector.ConnectorTableHandle;

import java.util.Map;
+ import java.util.Optional;

import static java.util.Objects.requireNonNull;

public record DeltaLakeMergeTableHandle(
DeltaLakeTableHandle tableHandle,
DeltaLakeInsertTableHandle insertTableHandle,
- Map<String, DeletionVectorEntry> deletionVectors)
+ Map<String, DeletionVectorEntry> deletionVectors,
+ Optional<String> shallowCloneSourceTableLocation)
implements ConnectorMergeTableHandle
{
public DeltaLakeMergeTableHandle
{
requireNonNull(tableHandle, "tableHandle is null");
requireNonNull(insertTableHandle, "insertTableHandle is null");
deletionVectors = ImmutableMap.copyOf(requireNonNull(deletionVectors, "deletionVectors is null"));
+ requireNonNull(shallowCloneSourceTableLocation, "shallowCloneSourceTableLocation is null");
}

@Override
DeltaLakeMetadata.java

@@ -69,14 +69,17 @@
import io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport;
import io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.ColumnMappingMode;
import io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.UnsupportedTypeException;
+ import io.trino.plugin.deltalake.transactionlog.DeltaLakeTransactionLogEntry;
import io.trino.plugin.deltalake.transactionlog.MetadataEntry;
import io.trino.plugin.deltalake.transactionlog.ProtocolEntry;
import io.trino.plugin.deltalake.transactionlog.RemoveFileEntry;
import io.trino.plugin.deltalake.transactionlog.TableSnapshot;
import io.trino.plugin.deltalake.transactionlog.Transaction;
import io.trino.plugin.deltalake.transactionlog.TransactionLogAccess;
+ import io.trino.plugin.deltalake.transactionlog.TransactionLogEntries;
import io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointWriterManager;
import io.trino.plugin.deltalake.transactionlog.checkpoint.MetadataAndProtocolEntries;
+ import io.trino.plugin.deltalake.transactionlog.checkpoint.TransactionLogTail;
import io.trino.plugin.deltalake.transactionlog.statistics.DeltaLakeFileStatistics;
import io.trino.plugin.deltalake.transactionlog.statistics.DeltaLakeJsonFileStatistics;
import io.trino.plugin.deltalake.transactionlog.writer.TransactionConflictException;
@@ -292,6 +295,7 @@
import static io.trino.plugin.deltalake.transactionlog.TransactionLogUtil.getTransactionLogDir;
import static io.trino.plugin.deltalake.transactionlog.TransactionLogUtil.getTransactionLogJsonEntryPath;
import static io.trino.plugin.deltalake.transactionlog.checkpoint.TransactionLogTail.getEntriesFromJson;
+ import static io.trino.plugin.deltalake.transactionlog.checkpoint.TransactionLogTail.loadNewTail;
import static io.trino.plugin.hive.HiveMetadata.TRINO_QUERY_ID_NAME;
import static io.trino.plugin.hive.TableType.EXTERNAL_TABLE;
import static io.trino.plugin.hive.TableType.MANAGED_TABLE;
@@ -2064,6 +2068,18 @@ private void appendTableEntries(

private static void appendAddFileEntries(TransactionLogWriter transactionLogWriter, List<DataFileInfo> dataFileInfos, List<String> partitionColumnNames, List<String> originalColumnNames, boolean dataChange)
throws JsonProcessingException
+ {
+     appendAddFileEntries(transactionLogWriter, dataFileInfos, partitionColumnNames, originalColumnNames, dataChange, Optional.empty());
+ }
+
+ private static void appendAddFileEntries(
+         TransactionLogWriter transactionLogWriter,
+         List<DataFileInfo> dataFileInfos,
+         List<String> partitionColumnNames,
+         List<String> originalColumnNames,
+         boolean dataChange,
+         Optional<String> cloneSourceLocation)
+         throws JsonProcessingException
{
Map<String, String> toOriginalColumnNames = originalColumnNames.stream()
.collect(toImmutableMap(name -> name.toLowerCase(ENGLISH), identity()));
@@ -2081,9 +2097,13 @@ private static void appendAddFileEntries(TransactionLogWrit

partitionValues = unmodifiableMap(partitionValues);

+ String path = cloneSourceLocation.isPresent() && info.path().startsWith(cloneSourceLocation.get())
+         ? info.path()
+         : toUriFormat(info.path()); // Paths are RFC 2396 URI encoded https://github.com/delta-io/delta/blob/master/PROTOCOL.md#add-file-and-remove-file
+
transactionLogWriter.appendAddFileEntry(
        new AddFileEntry(
-                 toUriFormat(info.path()), // Paths are RFC 2396 URI encoded https://github.com/delta-io/delta/blob/master/PROTOCOL.md#add-file-and-remove-file
+                 path,
partitionValues,
info.size(),
info.creationTime(),
@@ -2448,7 +2468,55 @@ public ConnectorMergeTableHandle beginMerge(ConnectorSession session, ConnectorT
DeltaLakeInsertTableHandle insertHandle = createInsertHandle(retryMode, handle, inputColumns);

Map<String, DeletionVectorEntry> deletionVectors = loadDeletionVectors(session, handle);
- return new DeltaLakeMergeTableHandle(handle, insertHandle, deletionVectors);
+ return new DeltaLakeMergeTableHandle(handle, insertHandle, deletionVectors, shallowCloneSourceTableLocation(session, handle));
}

+ private Optional<String> shallowCloneSourceTableLocation(ConnectorSession session, DeltaLakeTableHandle handle)
+ {
+     TrinoFileSystem fileSystem = fileSystemFactory.create(session);
+     String sourceTableName;
+     try {
+         // The clone commit is the first commit of the cloned table, so set the endVersion to 0
+         TransactionLogTail transactionLogTail = loadNewTail(fileSystem, handle.getLocation(), Optional.empty(), Optional.of(0L), DataSize.ofBytes(0));
+         List<Transaction> transactions = transactionLogTail.getTransactions();
+         if (transactions.isEmpty()) {
+             return Optional.empty();
+         }
+
+         Optional<CommitInfoEntry> cloneCommit = transactions.getFirst().transactionEntries().getEntries(fileSystem)
+                 .map(DeltaLakeTransactionLogEntry::getCommitInfo)
+                 .filter(Objects::nonNull)
+                 .filter(commitInfoEntry -> commitInfoEntry.operation().equals("CLONE"))
+                 .findFirst();
+         if (cloneCommit.isEmpty()) {
+             return Optional.empty();
+         }
+
+         // This is a cloned table
+         Map<String, String> operationParameters = cloneCommit.get().operationParameters();
+         if (!operationParameters.containsKey("source")) {
+             return Optional.empty();
+         }
+
+         sourceTableName = operationParameters.get("source");
+     }
+     catch (IOException e) {
+         throw new RuntimeException(e);
+     }
+
+     checkArgument(sourceTableName != null && sourceTableName.contains(".") && sourceTableName.split("\\.").length == 3, "Unexpected source table in operation_parameters: %s", sourceTableName);
Inline review comments on this method:

chenjian2664 (Contributor, Author) commented on Jan 26, 2025:

    A question about the path in the remove entry in the _delta_log/xxx.json file:
    since a cloned table can reference a source table file by absolute path (e.g. in S3),
    does that path have to be rewritten to make the test work before uploading the files?
    For example, if the path in the RemoveFileEntry is s3://aaa/bbb/ccc.parquet and the
    test creates two directories (source, cloned), the path in the cloned table may need
    to become ../source/aaa/bbb/ccc.parquet.

chenjian2664 (Contributor, Author) commented:

    Added the test testMergeOnClonedTable in TestDeltaLakeBasic, PTAL.
    The absolute paths of source table files referenced by the cloned table were changed
    to relative paths using the logic mentioned above.
+     String[] names = sourceTableName.split("\\.");
+     DeltaLakeTableHandle tableHandle = (DeltaLakeTableHandle) getTableHandle(session, new SchemaTableName(names[1], names[2]), Optional.empty(), Optional.empty());
+     if (tableHandle == null) {
+         return Optional.empty();
+     }
+
+     String tableLocation = tableHandle.getLocation();
+     if (!tableLocation.endsWith("/")) {
+         tableLocation += "/";
+     }
+
+     return Optional.of(tableLocation);
+ }

private Map<String, DeletionVectorEntry> loadDeletionVectors(ConnectorSession session, DeltaLakeTableHandle handle)
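For orientation, this is roughly the transaction-log entry shape that shallowCloneSourceTableLocation looks for. The JSON is abridged and hypothetical (the exact fields depend on the engine that performed the clone); only the operation name and the source operation parameter matter here:

```java
// Hypothetical first entry of a cloned table's _delta_log/00000000000000000000.json
// (abridged):
//
//   {"commitInfo": {
//       "operation": "CLONE",
//       "operationParameters": {"source": "delta.default.clone_merge_source", ...}
//   }}
//
// The method checks operation == "CLONE", then reads operationParameters["source"],
// which is expected to be a three-part catalog.schema.table name; the table is
// resolved via getTableHandle and its location (with a trailing slash) is returned.
```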
@@ -2577,14 +2645,22 @@ private long commitMergeOperation(
appendCdcFilesInfos(transactionLogWriter, cdcFiles, partitionColumns);
}

+ Optional<String> cloneSourceTableLocation = mergeHandle.shallowCloneSourceTableLocation();
for (DeltaLakeMergeResult mergeResult : mergeResults) {
if (mergeResult.oldFile().isEmpty()) {
continue;
}
-     transactionLogWriter.appendRemoveFileEntry(new RemoveFileEntry(toUriFormat(mergeResult.oldFile().get()), createPartitionValuesMap(partitionColumns, mergeResult.partitionValues()), writeTimestamp, true, mergeResult.oldDeletionVector()));
+
+     String oldFile = mergeResult.oldFile().get();
+     if (cloneSourceTableLocation.isPresent() && oldFile.startsWith(cloneSourceTableLocation.get())) {
+         transactionLogWriter.appendRemoveFileEntry(new RemoveFileEntry(oldFile, createPartitionValuesMap(partitionColumns, mergeResult.partitionValues()), writeTimestamp, true, mergeResult.oldDeletionVector()));
+     }
+     else {
+         transactionLogWriter.appendRemoveFileEntry(new RemoveFileEntry(toUriFormat(oldFile), createPartitionValuesMap(partitionColumns, mergeResult.partitionValues()), writeTimestamp, true, mergeResult.oldDeletionVector()));
+     }
}

- appendAddFileEntries(transactionLogWriter, newFiles, partitionColumns, getExactColumnNames(handle.getMetadataEntry()), true);
+ appendAddFileEntries(transactionLogWriter, newFiles, partitionColumns, getExactColumnNames(handle.getMetadataEntry()), true, cloneSourceTableLocation);

transactionLogWriter.flush();
return commitVersion;
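One subtlety in commitMergeOperation and appendAddFileEntries above: URI encoding is applied only to table-relative paths. In the Delta log, a remove entry cancels an add entry only when the paths match exactly, so a file inherited from the clone source is written back with the same absolute path the clone commit recorded. A short illustration with made-up paths:

```java
// What gets written to the transaction log, per the branches above:
//
// file owned by the cloned table -> relative, RFC 2396 URI-encoded:
//     toUriFormat("part a.parquet")                  // e.g. "part%20a.parquet"
//
// file inherited from the clone source -> absolute path, verbatim:
//     "s3://bucket/warehouse/source/part-1.parquet"
```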
DeltaLakePageSinkProvider.java

@@ -201,7 +201,8 @@ public ConnectorMergeSink createMergeSink(ConnectorTransactionHandle transaction
fileFormatDataSourceStats,
isDeletionVectorEnabled(tableHandle.metadataEntry(), tableHandle.protocolEntry()),
merge.deletionVectors(),
- getRandomPrefixLength(tableHandle.metadataEntry()));
+ getRandomPrefixLength(tableHandle.metadataEntry()),
+ merge.shallowCloneSourceTableLocation());
}

private DeltaLakeCdfPageSink createCdfPageSink(
TestDeltaLakeBasic.java

@@ -135,7 +135,9 @@ public class TestDeltaLakeBasic
new ResourceTable("variant", "databricks153/variant"),
new ResourceTable("type_widening", "databricks153/type_widening"),
new ResourceTable("type_widening_partition", "databricks153/type_widening_partition"),
new ResourceTable("type_widening_nested", "databricks153/type_widening_nested"));
new ResourceTable("type_widening_nested", "databricks153/type_widening_nested"),
new ResourceTable("clone_merge_source", "deltalake/clone_merge/clone_merge_source"),
new ResourceTable("clone_merge_cloned", "deltalake/clone_merge/clone_merge_cloned"));

// The col-{uuid} pattern for delta.columnMapping.physicalName
private static final Pattern PHYSICAL_COLUMN_NAME_PATTERN = Pattern.compile("^col-[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$");
@@ -2294,6 +2296,16 @@ public void testUnsupportedWriterVersion()
"Cannot execute vacuum procedure with 8 writer version");
}

+ /**
+  * @see deltalake.clone_merge
+  */
+ @Test
+ public void testMergeOnClonedTable()
+ {
+     assertQuery("SELECT * FROM clone_merge_source", "VALUES (1, 'A', TIMESTAMP '2024-01-01'), (2, 'B', TIMESTAMP '2024-01-01'), (3, 'C', TIMESTAMP '2024-02-02'), (4, 'D', TIMESTAMP '2024-02-02')");
+     assertQuery("SELECT * FROM clone_merge_cloned", "VALUES (1, 'A', TIMESTAMP '2024-01-01'), (2, 'update1', TIMESTAMP '2024-01-01'), (3, 'C', TIMESTAMP '2024-02-02'), (4, 'update1', TIMESTAMP '2024-02-02')");
+ }

private static MetadataEntry loadMetadataEntry(long entryNumber, Path tableLocation)
throws IOException
{
Expand Down
Loading
Loading
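The two resource tables are pre-built fixtures, so the MERGE that produced the cloned table's state ran when the resources were generated. For reference, a hypothetical statement of the kind the fixture encodes; the column names are invented, and only the resulting values come from the assertions above:

```java
// Rows 2 and 4 of clone_merge_cloned end up with 'update1' (column names invented):
assertUpdate(
        "MERGE INTO clone_merge_cloned t USING (VALUES 2, 4) AS s(id) " +
                "ON t.id = s.id " +
                "WHEN MATCHED THEN UPDATE SET v = 'update1'",
        2);
```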