Skip to content

Commit

Permalink
Support MERGE on cloned table in Delta Lake
Browse files Browse the repository at this point in the history
  • Loading branch information
chenjian2664 committed Jan 21, 2025
1 parent d949877 commit 841ac9a
Show file tree
Hide file tree
Showing 5 changed files with 180 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ public class DeltaLakeMergeSink
private final boolean deletionVectorEnabled;
private final Map<String, DeletionVectorEntry> deletionVectors;
private final int randomPrefixLength;
private final Optional<String> shallowCloneSourceTableLocation;

@Nullable
private DeltaLakeCdfPageSink cdfPageSink;
Expand All @@ -155,7 +156,8 @@ public DeltaLakeMergeSink(
FileFormatDataSourceStats fileFormatDataSourceStats,
boolean deletionVectorEnabled,
Map<String, DeletionVectorEntry> deletionVectors,
int randomPrefixLength)
int randomPrefixLength,
Optional<String> shallowCloneSourceTableLocation)
{
this.typeOperators = requireNonNull(typeOperators, "typeOperators is null");
this.session = requireNonNull(session, "session is null");
Expand Down Expand Up @@ -184,6 +186,8 @@ public DeltaLakeMergeSink(
this.deletionVectorEnabled = deletionVectorEnabled;
this.deletionVectors = ImmutableMap.copyOf(requireNonNull(deletionVectors, "deletionVectors is null"));
this.randomPrefixLength = randomPrefixLength;
this.shallowCloneSourceTableLocation = requireNonNull(shallowCloneSourceTableLocation, "shallowCloneSourceTableLocation is null");

dataColumnsIndices = new int[tableColumnCount];
dataAndRowIdColumnsIndices = new int[tableColumnCount + 1];
for (int i = 0; i < tableColumnCount; i++) {
Expand Down Expand Up @@ -407,8 +411,8 @@ private Slice writeDeletionVector(
long rowCount)
{
String tablePath = rootTableLocation.toString();
String sourceRelativePath = relativePath(tablePath, sourcePath);
DeletionVectorEntry oldDeletionVector = deletionVectors.get(sourceRelativePath);
String sourceReferencePath = getReferencedPath(tablePath, sourcePath);
DeletionVectorEntry oldDeletionVector = deletionVectors.get(sourceReferencePath);

DeletionVectorEntry deletionVectorEntry;
try {
Expand All @@ -420,14 +424,14 @@ private Slice writeDeletionVector(

try {
DataFileInfo newFileInfo = new DataFileInfo(
sourceRelativePath,
sourceReferencePath,
length,
lastModified.toEpochMilli(),
DATA,
deletion.partitionValues,
readStatistics(parquetMetadata, dataColumns, rowCount),
Optional.of(deletionVectorEntry));
DeltaLakeMergeResult result = new DeltaLakeMergeResult(deletion.partitionValues, Optional.of(sourceRelativePath), Optional.ofNullable(oldDeletionVector), Optional.of(newFileInfo));
DeltaLakeMergeResult result = new DeltaLakeMergeResult(deletion.partitionValues, Optional.of(sourceReferencePath), Optional.ofNullable(oldDeletionVector), Optional.of(newFileInfo));
return utf8Slice(mergeResultJsonCodec.toJson(result));
}
catch (Throwable e) {
Expand All @@ -445,9 +449,9 @@ private Slice writeDeletionVector(

private Slice onlySourceFile(String sourcePath, FileDeletion deletion)
{
String sourceRelativePath = relativePath(rootTableLocation.toString(), sourcePath);
DeletionVectorEntry deletionVector = deletionVectors.get(sourceRelativePath);
DeltaLakeMergeResult result = new DeltaLakeMergeResult(deletion.partitionValues(), Optional.of(sourceRelativePath), Optional.ofNullable(deletionVector), Optional.empty());
String sourceReferencePath = getReferencedPath(rootTableLocation.toString(), sourcePath);
DeletionVectorEntry deletionVector = deletionVectors.get(sourceReferencePath);
DeltaLakeMergeResult result = new DeltaLakeMergeResult(deletion.partitionValues(), Optional.of(sourceReferencePath), Optional.ofNullable(deletionVector), Optional.empty());
return utf8Slice(mergeResultJsonCodec.toJson(result));
}

Expand All @@ -457,24 +461,24 @@ private List<Slice> rewriteFile(String sourcePath, FileDeletion deletion)
try {
String tablePath = rootTableLocation.toString();
Location sourceLocation = Location.of(sourcePath);
String sourceRelativePath = relativePath(tablePath, sourcePath);
String sourceReferencePath = getReferencedPath(tablePath, sourcePath);

Location targetLocation = sourceLocation.sibling(session.getQueryId() + "_" + randomUUID());
String targetRelativePath = relativePath(tablePath, targetLocation.toString());
String targetReferencePath = getReferencedPath(tablePath, targetLocation.toString());
ParquetFileWriter fileWriter = createParquetFileWriter(targetLocation, dataColumns);

DeltaLakeWriter writer = new DeltaLakeWriter(
fileWriter,
rootTableLocation,
targetRelativePath,
targetReferencePath,
deletion.partitionValues(),
writerStats,
dataColumns,
DATA);

Optional<DataFileInfo> newFileInfo = rewriteParquetFile(sourceLocation, deletion, writer);

DeltaLakeMergeResult result = new DeltaLakeMergeResult(deletion.partitionValues(), Optional.of(sourceRelativePath), Optional.empty(), newFileInfo);
DeltaLakeMergeResult result = new DeltaLakeMergeResult(deletion.partitionValues(), Optional.of(sourceReferencePath), Optional.empty(), newFileInfo);
return ImmutableList.of(utf8Slice(mergeResultJsonCodec.toJson(result)));
}
catch (IOException e) {
Expand Down Expand Up @@ -524,8 +528,8 @@ private ParquetFileWriter createParquetFileWriter(Location path, List<DeltaLakeC

private RoaringBitmapArray loadDeletionVector(Location path)
{
String relativePath = relativePath(rootTableLocation.toString(), path.toString());
DeletionVectorEntry deletionVector = deletionVectors.get(relativePath);
String referencedPath = getReferencedPath(rootTableLocation.toString(), path.toString());
DeletionVectorEntry deletionVector = deletionVectors.get(referencedPath);
if (deletionVector == null) {
return new RoaringBitmapArray();
}
Expand Down Expand Up @@ -677,6 +681,16 @@ private ReaderPageSource createParquetPageSource(Location path)
OptionalLong.of(fileSize));
}

private String getReferencedPath(String basePath, String sourcePath)
{
if (shallowCloneSourceTableLocation.isPresent() && sourcePath.startsWith(shallowCloneSourceTableLocation.get())) {
// It's the absolute path of shallow clone source table file
return sourcePath;
}

return relativePath(basePath, sourcePath);
}

@Override
public void abort()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,23 @@
import io.trino.spi.connector.ConnectorTableHandle;

import java.util.Map;
import java.util.Optional;

import static java.util.Objects.requireNonNull;

public record DeltaLakeMergeTableHandle(
DeltaLakeTableHandle tableHandle,
DeltaLakeInsertTableHandle insertTableHandle,
Map<String, DeletionVectorEntry> deletionVectors)
Map<String, DeletionVectorEntry> deletionVectors,
Optional<String> shallowCloneSourceTableLocation)
implements ConnectorMergeTableHandle
{
public DeltaLakeMergeTableHandle
{
requireNonNull(tableHandle, "tableHandle is null");
requireNonNull(insertTableHandle, "insertTableHandle is null");
deletionVectors = ImmutableMap.copyOf(requireNonNull(deletionVectors, "deletionVectors is null"));
requireNonNull(shallowCloneSourceTableLocation, "shallowCloneSourceTableLocation is null");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,14 +69,17 @@
import io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport;
import io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.ColumnMappingMode;
import io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.UnsupportedTypeException;
import io.trino.plugin.deltalake.transactionlog.DeltaLakeTransactionLogEntry;
import io.trino.plugin.deltalake.transactionlog.MetadataEntry;
import io.trino.plugin.deltalake.transactionlog.ProtocolEntry;
import io.trino.plugin.deltalake.transactionlog.RemoveFileEntry;
import io.trino.plugin.deltalake.transactionlog.TableSnapshot;
import io.trino.plugin.deltalake.transactionlog.Transaction;
import io.trino.plugin.deltalake.transactionlog.TransactionLogAccess;
import io.trino.plugin.deltalake.transactionlog.TransactionLogEntries;
import io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointWriterManager;
import io.trino.plugin.deltalake.transactionlog.checkpoint.MetadataAndProtocolEntries;
import io.trino.plugin.deltalake.transactionlog.checkpoint.TransactionLogTail;
import io.trino.plugin.deltalake.transactionlog.statistics.DeltaLakeFileStatistics;
import io.trino.plugin.deltalake.transactionlog.statistics.DeltaLakeJsonFileStatistics;
import io.trino.plugin.deltalake.transactionlog.writer.TransactionConflictException;
Expand Down Expand Up @@ -292,6 +295,7 @@
import static io.trino.plugin.deltalake.transactionlog.TransactionLogUtil.getTransactionLogDir;
import static io.trino.plugin.deltalake.transactionlog.TransactionLogUtil.getTransactionLogJsonEntryPath;
import static io.trino.plugin.deltalake.transactionlog.checkpoint.TransactionLogTail.getEntriesFromJson;
import static io.trino.plugin.deltalake.transactionlog.checkpoint.TransactionLogTail.loadNewTail;
import static io.trino.plugin.hive.HiveMetadata.TRINO_QUERY_ID_NAME;
import static io.trino.plugin.hive.TableType.EXTERNAL_TABLE;
import static io.trino.plugin.hive.TableType.MANAGED_TABLE;
Expand Down Expand Up @@ -2064,6 +2068,18 @@ private void appendTableEntries(

private static void appendAddFileEntries(TransactionLogWriter transactionLogWriter, List<DataFileInfo> dataFileInfos, List<String> partitionColumnNames, List<String> originalColumnNames, boolean dataChange)
throws JsonProcessingException
{
appendAddFileEntries(transactionLogWriter, dataFileInfos, partitionColumnNames, originalColumnNames, dataChange, Optional.empty());
}

private static void appendAddFileEntries(
TransactionLogWriter transactionLogWriter,
List<DataFileInfo> dataFileInfos,
List<String> partitionColumnNames,
List<String> originalColumnNames,
boolean dataChange,
Optional<String> cloneSourceLocation)
throws JsonProcessingException
{
Map<String, String> toOriginalColumnNames = originalColumnNames.stream()
.collect(toImmutableMap(name -> name.toLowerCase(ENGLISH), identity()));
Expand All @@ -2081,9 +2097,13 @@ private static void appendAddFileEntries(TransactionLogWriter transactionLogWrit

partitionValues = unmodifiableMap(partitionValues);

String path = cloneSourceLocation.isPresent() && info.path().startsWith(cloneSourceLocation.get())
? info.path()
: toUriFormat(info.path()); // Paths are RFC 2396 URI encoded https://github.com/delta-io/delta/blob/master/PROTOCOL.md#add-file-and-remove-file

transactionLogWriter.appendAddFileEntry(
new AddFileEntry(
toUriFormat(info.path()), // Paths are RFC 2396 URI encoded https://github.com/delta-io/delta/blob/master/PROTOCOL.md#add-file-and-remove-file
path,
partitionValues,
info.size(),
info.creationTime(),
Expand Down Expand Up @@ -2448,7 +2468,52 @@ public ConnectorMergeTableHandle beginMerge(ConnectorSession session, ConnectorT
DeltaLakeInsertTableHandle insertHandle = createInsertHandle(retryMode, handle, inputColumns);

Map<String, DeletionVectorEntry> deletionVectors = loadDeletionVectors(session, handle);
return new DeltaLakeMergeTableHandle(handle, insertHandle, deletionVectors);
return new DeltaLakeMergeTableHandle(handle, insertHandle, deletionVectors, shallowCloneSourceTableLocation(session, handle));
}

private Optional<String> shallowCloneSourceTableLocation(ConnectorSession session, DeltaLakeTableHandle handle)
{
TrinoFileSystem fileSystem = fileSystemFactory.create(session);
String sourceTableName = "";
try {
TransactionLogTail transactionLogTail = loadNewTail(fileSystem, handle.getLocation(), Optional.empty(), Optional.of(0L), DataSize.ofBytes(0));
List<Transaction> transactions = transactionLogTail.getTransactions();
for (Transaction transaction : transactions) {
Optional<CommitInfoEntry> cloneCommit = transaction.transactionEntries().getEntries(fileSystem)
.map(DeltaLakeTransactionLogEntry::getCommitInfo)
.filter(Objects::nonNull)
.filter(commitInfoEntry -> commitInfoEntry.operation().equals("CLONE"))
.findFirst();
if (cloneCommit.isEmpty()) {
return Optional.empty();
}

// It's the cloned table
Map<String, String> operationParameters = cloneCommit.get().operationParameters();
if (!operationParameters.containsKey("source") || !operationParameters.containsKey("sourceVersion")) {
return Optional.empty();
}

sourceTableName = operationParameters.get("source");
checkArgument(sourceTableName != null && sourceTableName.contains(".") && sourceTableName.split("\\.").length == 3, "Unexpected source table in operation_parameters: %s", sourceTableName);
}
}
catch (IOException e) {
throw new RuntimeException(e);
}

String[] names = sourceTableName.split("\\.");
DeltaLakeTableHandle tableHandle = (DeltaLakeTableHandle) getTableHandle(session, new SchemaTableName(names[1], names[2]), Optional.empty(), Optional.empty());
if (tableHandle == null) {
return Optional.empty();
}

String tableLocation = tableHandle.getLocation();
if (!tableLocation.endsWith("/")) {
tableLocation += "/";
}

return Optional.of(tableLocation);
}

private Map<String, DeletionVectorEntry> loadDeletionVectors(ConnectorSession session, DeltaLakeTableHandle handle)
Expand Down Expand Up @@ -2577,14 +2642,22 @@ private long commitMergeOperation(
appendCdcFilesInfos(transactionLogWriter, cdcFiles, partitionColumns);
}

Optional<String> cloneSourceTableLocation = mergeHandle.shallowCloneSourceTableLocation();
for (DeltaLakeMergeResult mergeResult : mergeResults) {
if (mergeResult.oldFile().isEmpty()) {
continue;
}
transactionLogWriter.appendRemoveFileEntry(new RemoveFileEntry(toUriFormat(mergeResult.oldFile().get()), createPartitionValuesMap(partitionColumns, mergeResult.partitionValues()), writeTimestamp, true, mergeResult.oldDeletionVector()));

String oldFile = mergeResult.oldFile().get();
if (cloneSourceTableLocation.isPresent() && oldFile.startsWith(cloneSourceTableLocation.get())) {
transactionLogWriter.appendRemoveFileEntry(new RemoveFileEntry(oldFile, createPartitionValuesMap(partitionColumns, mergeResult.partitionValues()), writeTimestamp, true, mergeResult.oldDeletionVector()));
}
else {
transactionLogWriter.appendRemoveFileEntry(new RemoveFileEntry(toUriFormat(oldFile), createPartitionValuesMap(partitionColumns, mergeResult.partitionValues()), writeTimestamp, true, mergeResult.oldDeletionVector()));
}
}

appendAddFileEntries(transactionLogWriter, newFiles, partitionColumns, getExactColumnNames(handle.getMetadataEntry()), true);
appendAddFileEntries(transactionLogWriter, newFiles, partitionColumns, getExactColumnNames(handle.getMetadataEntry()), true, cloneSourceTableLocation);

transactionLogWriter.flush();
return commitVersion;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,8 @@ public ConnectorMergeSink createMergeSink(ConnectorTransactionHandle transaction
fileFormatDataSourceStats,
isDeletionVectorEnabled(tableHandle.metadataEntry(), tableHandle.protocolEntry()),
merge.deletionVectors(),
getRandomPrefixLength(tableHandle.metadataEntry()));
getRandomPrefixLength(tableHandle.metadataEntry()),
merge.shallowCloneSourceTableLocation());
}

private DeltaLakeCdfPageSink createCdfPageSink(
Expand Down
Loading

0 comments on commit 841ac9a

Please sign in to comment.