diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMergeSink.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMergeSink.java index 66c8134caef3..8db57ea3af86 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMergeSink.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMergeSink.java @@ -131,6 +131,7 @@ public class DeltaLakeMergeSink private final boolean deletionVectorEnabled; private final Map deletionVectors; private final int randomPrefixLength; + private final Optional shallowCloneSourceTableLocation; @Nullable private DeltaLakeCdfPageSink cdfPageSink; @@ -155,7 +156,8 @@ public DeltaLakeMergeSink( FileFormatDataSourceStats fileFormatDataSourceStats, boolean deletionVectorEnabled, Map deletionVectors, - int randomPrefixLength) + int randomPrefixLength, + Optional shallowCloneSourceTableLocation) { this.typeOperators = requireNonNull(typeOperators, "typeOperators is null"); this.session = requireNonNull(session, "session is null"); @@ -184,6 +186,8 @@ public DeltaLakeMergeSink( this.deletionVectorEnabled = deletionVectorEnabled; this.deletionVectors = ImmutableMap.copyOf(requireNonNull(deletionVectors, "deletionVectors is null")); this.randomPrefixLength = randomPrefixLength; + this.shallowCloneSourceTableLocation = requireNonNull(shallowCloneSourceTableLocation, "shallowCloneSourceTableLocation is null"); + dataColumnsIndices = new int[tableColumnCount]; dataAndRowIdColumnsIndices = new int[tableColumnCount + 1]; for (int i = 0; i < tableColumnCount; i++) { @@ -407,8 +411,8 @@ private Slice writeDeletionVector( long rowCount) { String tablePath = rootTableLocation.toString(); - String sourceRelativePath = relativePath(tablePath, sourcePath); - DeletionVectorEntry oldDeletionVector = deletionVectors.get(sourceRelativePath); + String sourceReferencePath = getReferencedPath(tablePath, sourcePath); + DeletionVectorEntry 
oldDeletionVector = deletionVectors.get(sourceReferencePath); DeletionVectorEntry deletionVectorEntry; try { @@ -420,14 +424,14 @@ private Slice writeDeletionVector( try { DataFileInfo newFileInfo = new DataFileInfo( - sourceRelativePath, + sourceReferencePath, length, lastModified.toEpochMilli(), DATA, deletion.partitionValues, readStatistics(parquetMetadata, dataColumns, rowCount), Optional.of(deletionVectorEntry)); - DeltaLakeMergeResult result = new DeltaLakeMergeResult(deletion.partitionValues, Optional.of(sourceRelativePath), Optional.ofNullable(oldDeletionVector), Optional.of(newFileInfo)); + DeltaLakeMergeResult result = new DeltaLakeMergeResult(deletion.partitionValues, Optional.of(sourceReferencePath), Optional.ofNullable(oldDeletionVector), Optional.of(newFileInfo)); return utf8Slice(mergeResultJsonCodec.toJson(result)); } catch (Throwable e) { @@ -445,9 +449,9 @@ private Slice writeDeletionVector( private Slice onlySourceFile(String sourcePath, FileDeletion deletion) { - String sourceRelativePath = relativePath(rootTableLocation.toString(), sourcePath); - DeletionVectorEntry deletionVector = deletionVectors.get(sourceRelativePath); - DeltaLakeMergeResult result = new DeltaLakeMergeResult(deletion.partitionValues(), Optional.of(sourceRelativePath), Optional.ofNullable(deletionVector), Optional.empty()); + String sourceReferencePath = getReferencedPath(rootTableLocation.toString(), sourcePath); + DeletionVectorEntry deletionVector = deletionVectors.get(sourceReferencePath); + DeltaLakeMergeResult result = new DeltaLakeMergeResult(deletion.partitionValues(), Optional.of(sourceReferencePath), Optional.ofNullable(deletionVector), Optional.empty()); return utf8Slice(mergeResultJsonCodec.toJson(result)); } @@ -457,9 +461,18 @@ private List rewriteFile(String sourcePath, FileDeletion deletion) try { String tablePath = rootTableLocation.toString(); Location sourceLocation = Location.of(sourcePath); - String sourceRelativePath = relativePath(tablePath, 
sourcePath); - - Location targetLocation = sourceLocation.sibling(session.getQueryId() + "_" + randomUUID()); + String sourceReferencePath = getReferencedPath(tablePath, sourcePath); + + // get the relative path for the cloned table if `sourcePath` is a source table file location + Optional sourceRelativePath = shallowCloneSourceTableLocation + .filter(sourcePath::startsWith) + .map(location -> relativePath(location, sourcePath)); + // build the target location by appending the source relative path after current table location if + // it's a cloned table and the sourcePath is a source table file location + Location targetLocation = sourceRelativePath.map(rootTableLocation::appendPath) + .orElse(sourceLocation) + .sibling(session.getQueryId() + "_" + randomUUID()); + // write under current table location, no matter the table is cloned or not String targetRelativePath = relativePath(tablePath, targetLocation.toString()); ParquetFileWriter fileWriter = createParquetFileWriter(targetLocation, dataColumns); @@ -474,7 +487,7 @@ private List rewriteFile(String sourcePath, FileDeletion deletion) Optional newFileInfo = rewriteParquetFile(sourceLocation, deletion, writer); - DeltaLakeMergeResult result = new DeltaLakeMergeResult(deletion.partitionValues(), Optional.of(sourceRelativePath), Optional.empty(), newFileInfo); + DeltaLakeMergeResult result = new DeltaLakeMergeResult(deletion.partitionValues(), Optional.of(sourceReferencePath), Optional.empty(), newFileInfo); return ImmutableList.of(utf8Slice(mergeResultJsonCodec.toJson(result))); } catch (IOException e) { @@ -524,8 +537,8 @@ private ParquetFileWriter createParquetFileWriter(Location path, List deletionVectors) + Map deletionVectors, + Optional shallowCloneSourceTableLocation) implements ConnectorMergeTableHandle { public DeltaLakeMergeTableHandle @@ -33,6 +35,7 @@ public record DeltaLakeMergeTableHandle( requireNonNull(tableHandle, "tableHandle is null"); requireNonNull(insertTableHandle, "insertTableHandle is 
null"); deletionVectors = ImmutableMap.copyOf(requireNonNull(deletionVectors, "deletionVectors is null")); + requireNonNull(shallowCloneSourceTableLocation, "shallowCloneSourceTableLocation is null"); } @Override diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java index ab729bea2793..24c61c9372db 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java @@ -69,14 +69,17 @@ import io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport; import io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.ColumnMappingMode; import io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.UnsupportedTypeException; +import io.trino.plugin.deltalake.transactionlog.DeltaLakeTransactionLogEntry; import io.trino.plugin.deltalake.transactionlog.MetadataEntry; import io.trino.plugin.deltalake.transactionlog.ProtocolEntry; import io.trino.plugin.deltalake.transactionlog.RemoveFileEntry; import io.trino.plugin.deltalake.transactionlog.TableSnapshot; +import io.trino.plugin.deltalake.transactionlog.Transaction; import io.trino.plugin.deltalake.transactionlog.TransactionLogAccess; import io.trino.plugin.deltalake.transactionlog.TransactionLogEntries; import io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointWriterManager; import io.trino.plugin.deltalake.transactionlog.checkpoint.MetadataAndProtocolEntries; +import io.trino.plugin.deltalake.transactionlog.checkpoint.TransactionLogTail; import io.trino.plugin.deltalake.transactionlog.statistics.DeltaLakeFileStatistics; import io.trino.plugin.deltalake.transactionlog.statistics.DeltaLakeJsonFileStatistics; import io.trino.plugin.deltalake.transactionlog.writer.TransactionConflictException; @@ -292,6 +295,7 @@ import static 
io.trino.plugin.deltalake.transactionlog.TransactionLogUtil.getTransactionLogDir; import static io.trino.plugin.deltalake.transactionlog.TransactionLogUtil.getTransactionLogJsonEntryPath; import static io.trino.plugin.deltalake.transactionlog.checkpoint.TransactionLogTail.getEntriesFromJson; +import static io.trino.plugin.deltalake.transactionlog.checkpoint.TransactionLogTail.loadNewTail; import static io.trino.plugin.hive.HiveMetadata.TRINO_QUERY_ID_NAME; import static io.trino.plugin.hive.TableType.EXTERNAL_TABLE; import static io.trino.plugin.hive.TableType.MANAGED_TABLE; @@ -2064,6 +2068,18 @@ private void appendTableEntries( private static void appendAddFileEntries(TransactionLogWriter transactionLogWriter, List dataFileInfos, List partitionColumnNames, List originalColumnNames, boolean dataChange) throws JsonProcessingException + { + appendAddFileEntries(transactionLogWriter, dataFileInfos, partitionColumnNames, originalColumnNames, dataChange, Optional.empty()); + } + + private static void appendAddFileEntries( + TransactionLogWriter transactionLogWriter, + List dataFileInfos, + List partitionColumnNames, + List originalColumnNames, + boolean dataChange, + Optional cloneSourceLocation) + throws JsonProcessingException { Map toOriginalColumnNames = originalColumnNames.stream() .collect(toImmutableMap(name -> name.toLowerCase(ENGLISH), identity())); @@ -2081,9 +2097,13 @@ private static void appendAddFileEntries(TransactionLogWriter transactionLogWrit partitionValues = unmodifiableMap(partitionValues); + String path = cloneSourceLocation.isPresent() && info.path().startsWith(cloneSourceLocation.get()) + ? 
info.path() + : toUriFormat(info.path()); // Paths are RFC 2396 URI encoded https://github.com/delta-io/delta/blob/master/PROTOCOL.md#add-file-and-remove-file + transactionLogWriter.appendAddFileEntry( new AddFileEntry( - toUriFormat(info.path()), // Paths are RFC 2396 URI encoded https://github.com/delta-io/delta/blob/master/PROTOCOL.md#add-file-and-remove-file + path, partitionValues, info.size(), info.creationTime(), @@ -2448,7 +2468,55 @@ public ConnectorMergeTableHandle beginMerge(ConnectorSession session, ConnectorT DeltaLakeInsertTableHandle insertHandle = createInsertHandle(retryMode, handle, inputColumns); Map deletionVectors = loadDeletionVectors(session, handle); - return new DeltaLakeMergeTableHandle(handle, insertHandle, deletionVectors); + return new DeltaLakeMergeTableHandle(handle, insertHandle, deletionVectors, shallowCloneSourceTableLocation(session, handle)); + } + + private Optional shallowCloneSourceTableLocation(ConnectorSession session, DeltaLakeTableHandle handle) + { + TrinoFileSystem fileSystem = fileSystemFactory.create(session); + String sourceTableName; + try { + // The clone commit is the first commit of the cloned table, so set the endVersion to 0 + TransactionLogTail transactionLogTail = loadNewTail(fileSystem, handle.getLocation(), Optional.empty(), Optional.of(0L), DataSize.ofBytes(0)); + List transactions = transactionLogTail.getTransactions(); + if (transactions.isEmpty()) { + return Optional.empty(); + } + + Optional cloneCommit = transactions.getFirst().transactionEntries().getEntries(fileSystem) + .map(DeltaLakeTransactionLogEntry::getCommitInfo) + .filter(Objects::nonNull) + .filter(commitInfoEntry -> commitInfoEntry.operation().equals("CLONE")) + .findFirst(); + if (cloneCommit.isEmpty()) { + return Optional.empty(); + } + + // It's the cloned table + Map operationParameters = cloneCommit.get().operationParameters(); + if (!operationParameters.containsKey("source")) { + return Optional.empty(); + } + + sourceTableName = 
operationParameters.get("source"); + } + catch (IOException e) { + throw new RuntimeException(e); + } + + checkArgument(sourceTableName != null && sourceTableName.contains(".") && sourceTableName.split("\\.").length == 3, "Unexpected source table in operation_parameters: %s", sourceTableName); + String[] names = sourceTableName.split("\\."); + DeltaLakeTableHandle tableHandle = (DeltaLakeTableHandle) getTableHandle(session, new SchemaTableName(names[1], names[2]), Optional.empty(), Optional.empty()); + if (tableHandle == null) { + return Optional.empty(); + } + + String tableLocation = tableHandle.getLocation(); + if (!tableLocation.endsWith("/")) { + tableLocation += "/"; + } + + return Optional.of(tableLocation); } private Map loadDeletionVectors(ConnectorSession session, DeltaLakeTableHandle handle) @@ -2577,14 +2645,22 @@ private long commitMergeOperation( appendCdcFilesInfos(transactionLogWriter, cdcFiles, partitionColumns); } + Optional cloneSourceTableLocation = mergeHandle.shallowCloneSourceTableLocation(); for (DeltaLakeMergeResult mergeResult : mergeResults) { if (mergeResult.oldFile().isEmpty()) { continue; } - transactionLogWriter.appendRemoveFileEntry(new RemoveFileEntry(toUriFormat(mergeResult.oldFile().get()), createPartitionValuesMap(partitionColumns, mergeResult.partitionValues()), writeTimestamp, true, mergeResult.oldDeletionVector())); + + String oldFile = mergeResult.oldFile().get(); + if (cloneSourceTableLocation.isPresent() && oldFile.startsWith(cloneSourceTableLocation.get())) { + transactionLogWriter.appendRemoveFileEntry(new RemoveFileEntry(oldFile, createPartitionValuesMap(partitionColumns, mergeResult.partitionValues()), writeTimestamp, true, mergeResult.oldDeletionVector())); + } + else { + transactionLogWriter.appendRemoveFileEntry(new RemoveFileEntry(toUriFormat(oldFile), createPartitionValuesMap(partitionColumns, mergeResult.partitionValues()), writeTimestamp, true, mergeResult.oldDeletionVector())); + } } - 
appendAddFileEntries(transactionLogWriter, newFiles, partitionColumns, getExactColumnNames(handle.getMetadataEntry()), true); + appendAddFileEntries(transactionLogWriter, newFiles, partitionColumns, getExactColumnNames(handle.getMetadataEntry()), true, cloneSourceTableLocation); transactionLogWriter.flush(); return commitVersion; diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakePageSinkProvider.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakePageSinkProvider.java index f84ee23de32f..42ee2c08f89a 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakePageSinkProvider.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakePageSinkProvider.java @@ -201,7 +201,8 @@ public ConnectorMergeSink createMergeSink(ConnectorTransactionHandle transaction fileFormatDataSourceStats, isDeletionVectorEnabled(tableHandle.metadataEntry(), tableHandle.protocolEntry()), merge.deletionVectors(), - getRandomPrefixLength(tableHandle.metadataEntry())); + getRandomPrefixLength(tableHandle.metadataEntry()), + merge.shallowCloneSourceTableLocation()); } private DeltaLakeCdfPageSink createCdfPageSink( diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeBasic.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeBasic.java index 0f0b29f84340..15473981e9bf 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeBasic.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeBasic.java @@ -135,7 +135,9 @@ public class TestDeltaLakeBasic new ResourceTable("variant", "databricks153/variant"), new ResourceTable("type_widening", "databricks153/type_widening"), new ResourceTable("type_widening_partition", "databricks153/type_widening_partition"), - new ResourceTable("type_widening_nested", "databricks153/type_widening_nested")); + new 
ResourceTable("type_widening_nested", "databricks153/type_widening_nested"), + new ResourceTable("clone_merge_source", "deltalake/clone_merge/clone_merge_source"), + new ResourceTable("clone_merge_cloned", "deltalake/clone_merge/clone_merge_cloned")); // The col-{uuid} pattern for delta.columnMapping.physicalName private static final Pattern PHYSICAL_COLUMN_NAME_PATTERN = Pattern.compile("^col-[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$"); @@ -2294,6 +2296,16 @@ public void testUnsupportedWriterVersion() "Cannot execute vacuum procedure with 8 writer version"); } + /** + * @see deltalake.clone_merge + */ + @Test + public void testMergeOnClonedTable() + { + assertQuery("SELECT * FROM clone_merge_source", "VALUES (1, 'A', TIMESTAMP '2024-01-01'), (2, 'B', TIMESTAMP '2024-01-01'), (3, 'C', TIMESTAMP '2024-02-02'), (4, 'D', TIMESTAMP '2024-02-02')"); + assertQuery("SELECT * FROM clone_merge_cloned", "VALUES (1, 'A', TIMESTAMP '2024-01-01'), (2, 'update1', TIMESTAMP '2024-01-01'), (3, 'C', TIMESTAMP '2024-02-02'), (4, 'update1', TIMESTAMP '2024-02-02')"); + } + private static MetadataEntry loadMetadataEntry(long entryNumber, Path tableLocation) throws IOException { diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeFileOperations.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeFileOperations.java index 98230e78aa5d..08c613b385aa 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeFileOperations.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeFileOperations.java @@ -631,12 +631,16 @@ public void testDeleteWithNonPartitionFilter() ImmutableMultiset.builder() .add(new FileOperation(LAST_CHECKPOINT, "_last_checkpoint", "InputFile.newStream")) .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000000.json", "InputFile.newStream")) + // One more newStream for check the table if is a 
cloned table + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000000.json", "InputFile.newStream")) .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", "InputFile.newStream")) .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000002.json", "InputFile.newStream")) .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", "InputFile.newStream")) .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000004.json", "InputFile.exists")) .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000004.json", "OutputFile.createOrOverwrite")) .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000000.json", "InputFile.length")) + // One more length for check the table if is a cloned table + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000000.json", "InputFile.length")) .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", "InputFile.length")) .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000002.json", "InputFile.length")) .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", "InputFile.length")) diff --git a/plugin/trino-delta-lake/src/test/resources/deltalake/clone_merge/clone_merge_cloned/README.md b/plugin/trino-delta-lake/src/test/resources/deltalake/clone_merge/clone_merge_cloned/README.md new file mode 100644 index 000000000000..e0c5835f7701 --- /dev/null +++ b/plugin/trino-delta-lake/src/test/resources/deltalake/clone_merge/clone_merge_cloned/README.md @@ -0,0 +1,14 @@ +Data generated using OSS Delta Lake 3.3.0 and Trino 469: + +```sql +-- Spark> +CREATE TABLE clone_merge_cloned SHALLOW CLONE clone_merge_source; + +-- Trino> +UPDATE clone_merge_cloned set v = 'update1' where id in (2,4); +``` + +Note that the `path` values in the add/remove entries of `_delta_log/00000000000000000000.json` and `_delta_log/00000000000000000001.json` were manually modified: +the prefix was changed to `../clone_merge_source` for test purposes, i.e., the
original `path` in add entry value like +`s3://bucket/warehouse/clone_merge_source/clone_merge_source/part=2024-01-01/part-00001-18fc4038-1ed9-44db-b1e8-893b2d3f02da.c000.snappy.parquet` +is changed to `../clone_merge_source/part=2024-01-01/part-00001-18fc4038-1ed9-44db-b1e8-893b2d3f02da.c000.snappy.parquet`. diff --git a/plugin/trino-delta-lake/src/test/resources/deltalake/clone_merge/clone_merge_cloned/_delta_log/00000000000000000000.json b/plugin/trino-delta-lake/src/test/resources/deltalake/clone_merge/clone_merge_cloned/_delta_log/00000000000000000000.json new file mode 100644 index 000000000000..d32d764df369 --- /dev/null +++ b/plugin/trino-delta-lake/src/test/resources/deltalake/clone_merge/clone_merge_cloned/_delta_log/00000000000000000000.json @@ -0,0 +1,7 @@ +{"commitInfo":{"timestamp":1737867567966,"operation":"CLONE","operationParameters":{"source":"spark_catalog.tiny.clone_merge_source","sourceVersion":1},"readVersion":-1,"isolationLevel":"Serializable","isBlindAppend":false,"operationMetrics":{"removedFilesSize":"0","numRemovedFiles":"0","sourceTableSize":"2628","numCopiedFiles":"0","copiedFilesSize":"0","sourceNumOfFiles":"4"},"engineInfo":"Apache-Spark/3.5.3 Delta-Lake/3.3.0","txnId":"fd54aed4-0055-4ecd-aa9e-7795b7fe839f"}} +{"metaData":{"id":"0da262b5-58dd-4393-a492-05eb485ec63f","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"v\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"part\",\"type\":\"date\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":["part"],"configuration":{},"createdTime":1737867528867}} 
+{"add":{"path":"../clone_merge_source/part=2024-01-01/part-00001-18fc4038-1ed9-44db-b1e8-893b2d3f02da.c000.snappy.parquet","partitionValues":{"part":"2024-01-01"},"size":657,"modificationTime":1737867545000,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":2,\"v\":\"B\"},\"maxValues\":{\"id\":2,\"v\":\"B\"},\"nullCount\":{\"id\":0,\"v\":0}}"}} +{"add":{"path":"../clone_merge_source/part=2024-02-02/part-00003-d752c44a-09fe-4576-9b1d-7dfa9ca215d4.c000.snappy.parquet","partitionValues":{"part":"2024-02-02"},"size":657,"modificationTime":1737867545000,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":4,\"v\":\"D\"},\"maxValues\":{\"id\":4,\"v\":\"D\"},\"nullCount\":{\"id\":0,\"v\":0}}"}} +{"add":{"path":"../clone_merge_source/part=2024-02-02/part-00002-55f327ce-5cba-41d7-ba87-9ec9b32afa2b.c000.snappy.parquet","partitionValues":{"part":"2024-02-02"},"size":657,"modificationTime":1737867545000,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":3,\"v\":\"C\"},\"maxValues\":{\"id\":3,\"v\":\"C\"},\"nullCount\":{\"id\":0,\"v\":0}}"}} +{"add":{"path":"../clone_merge_source/part=2024-01-01/part-00000-8216f105-6126-4fc9-be2a-00a94ea0331c.c000.snappy.parquet","partitionValues":{"part":"2024-01-01"},"size":657,"modificationTime":1737867545000,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":1,\"v\":\"A\"},\"maxValues\":{\"id\":1,\"v\":\"A\"},\"nullCount\":{\"id\":0,\"v\":0}}"}} +{"protocol":{"minReaderVersion":1,"minWriterVersion":2}} diff --git a/plugin/trino-delta-lake/src/test/resources/deltalake/clone_merge/clone_merge_cloned/_delta_log/00000000000000000001.json b/plugin/trino-delta-lake/src/test/resources/deltalake/clone_merge/clone_merge_cloned/_delta_log/00000000000000000001.json new file mode 100644 index 000000000000..040e1ebca69f --- /dev/null +++ b/plugin/trino-delta-lake/src/test/resources/deltalake/clone_merge/clone_merge_cloned/_delta_log/00000000000000000001.json @@ -0,0 +1,5 @@ 
+{"commitInfo":{"version":1,"timestamp":1737867604174,"operation":"MERGE","operationParameters":{"queryId":"20250126_045101_00007_jk3a6"},"clusterId":"trino-testversion-6af4d70d-56bb-4ce7-af2d-8f5d248b6470","readVersion":0,"isolationLevel":"WriteSerializable","isBlindAppend":false,"operationMetrics":{}}} +{"remove":{"path":"../clone_merge_source/part=2024-02-02/part-00003-d752c44a-09fe-4576-9b1d-7dfa9ca215d4.c000.snappy.parquet","partitionValues":{"part":"2024-02-02"},"deletionTimestamp":1737867604176,"dataChange":true}} +{"remove":{"path":"../clone_merge_source/part=2024-01-01/part-00001-18fc4038-1ed9-44db-b1e8-893b2d3f02da.c000.snappy.parquet","partitionValues":{"part":"2024-01-01"},"deletionTimestamp":1737867604176,"dataChange":true}} +{"add":{"path":"part=2024-01-01/20250126_045101_00007_jk3a6_6bec6efd-ae89-4774-a487-24e5d5d8a63f","partitionValues":{"part":"2024-01-01"},"size":345,"modificationTime":1737867603823,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":2,\"v\":\"update1\"},\"maxValues\":{\"id\":2,\"v\":\"update1\"},\"nullCount\":{\"id\":0,\"v\":0}}","tags":{}}} +{"add":{"path":"part=2024-02-02/20250126_045101_00007_jk3a6_d60a8bbf-7228-402e-aac8-d99922df12bc","partitionValues":{"part":"2024-02-02"},"size":345,"modificationTime":1737867603861,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":4,\"v\":\"update1\"},\"maxValues\":{\"id\":4,\"v\":\"update1\"},\"nullCount\":{\"id\":0,\"v\":0}}","tags":{}}} diff --git a/plugin/trino-delta-lake/src/test/resources/deltalake/clone_merge/clone_merge_cloned/part=2024-01-01/20250126_045101_00007_jk3a6_6bec6efd-ae89-4774-a487-24e5d5d8a63f b/plugin/trino-delta-lake/src/test/resources/deltalake/clone_merge/clone_merge_cloned/part=2024-01-01/20250126_045101_00007_jk3a6_6bec6efd-ae89-4774-a487-24e5d5d8a63f new file mode 100644 index 000000000000..783bf8e969d5 Binary files /dev/null and 
b/plugin/trino-delta-lake/src/test/resources/deltalake/clone_merge/clone_merge_cloned/part=2024-01-01/20250126_045101_00007_jk3a6_6bec6efd-ae89-4774-a487-24e5d5d8a63f differ diff --git a/plugin/trino-delta-lake/src/test/resources/deltalake/clone_merge/clone_merge_cloned/part=2024-02-02/20250126_045101_00007_jk3a6_d60a8bbf-7228-402e-aac8-d99922df12bc b/plugin/trino-delta-lake/src/test/resources/deltalake/clone_merge/clone_merge_cloned/part=2024-02-02/20250126_045101_00007_jk3a6_d60a8bbf-7228-402e-aac8-d99922df12bc new file mode 100644 index 000000000000..a84e29d48b47 Binary files /dev/null and b/plugin/trino-delta-lake/src/test/resources/deltalake/clone_merge/clone_merge_cloned/part=2024-02-02/20250126_045101_00007_jk3a6_d60a8bbf-7228-402e-aac8-d99922df12bc differ diff --git a/plugin/trino-delta-lake/src/test/resources/deltalake/clone_merge/clone_merge_source/README.md b/plugin/trino-delta-lake/src/test/resources/deltalake/clone_merge/clone_merge_source/README.md new file mode 100644 index 000000000000..8a755195cfe5 --- /dev/null +++ b/plugin/trino-delta-lake/src/test/resources/deltalake/clone_merge/clone_merge_source/README.md @@ -0,0 +1,16 @@ +Data generated using OSS Delta Lake 3.3.0: + +```sql +CREATE TABLE clone_merge_source +(id int, v string, part date) +USING DELTA +LOCATION ? 
+PARTITIONED BY (part); + +INSERT INTO clone_merge_source +VALUES + (1, 'A', TIMESTAMP '2024-01-01'), + (2, 'B', TIMESTAMP '2024-01-01'), + (3, 'C', TIMESTAMP '2024-02-02'), + (4, 'D', TIMESTAMP '2024-02-02'); +``` diff --git a/plugin/trino-delta-lake/src/test/resources/deltalake/clone_merge/clone_merge_source/_delta_log/00000000000000000000.crc b/plugin/trino-delta-lake/src/test/resources/deltalake/clone_merge/clone_merge_source/_delta_log/00000000000000000000.crc new file mode 100644 index 000000000000..e4d562f46a08 --- /dev/null +++ b/plugin/trino-delta-lake/src/test/resources/deltalake/clone_merge/clone_merge_source/_delta_log/00000000000000000000.crc @@ -0,0 +1 @@ +{"txnId":"ec2a940d-7ffe-45f3-824c-b50c61589e40","tableSizeBytes":0,"numFiles":0,"numMetadata":1,"numProtocol":1,"setTransactions":[],"domainMetadata":[],"metadata":{"id":"d336c764-d0d0-47e6-b2a5-cbcf55b822cb","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"v\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"part\",\"type\":\"date\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":["part"],"configuration":{},"createdTime":1737867528867},"protocol":{"minReaderVersion":1,"minWriterVersion":2},"allFiles":[]} diff --git a/plugin/trino-delta-lake/src/test/resources/deltalake/clone_merge/clone_merge_source/_delta_log/00000000000000000000.json b/plugin/trino-delta-lake/src/test/resources/deltalake/clone_merge/clone_merge_source/_delta_log/00000000000000000000.json new file mode 100644 index 000000000000..3b2427c6c466 --- /dev/null +++ b/plugin/trino-delta-lake/src/test/resources/deltalake/clone_merge/clone_merge_source/_delta_log/00000000000000000000.json @@ -0,0 +1,3 @@ +{"commitInfo":{"timestamp":1737867528972,"operation":"CREATE 
TABLE","operationParameters":{"partitionBy":"[\"part\"]","clusterBy":"[]","description":null,"isManaged":"true","properties":"{}"},"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{},"engineInfo":"Apache-Spark/3.5.3 Delta-Lake/3.3.0","txnId":"ec2a940d-7ffe-45f3-824c-b50c61589e40"}} +{"metaData":{"id":"d336c764-d0d0-47e6-b2a5-cbcf55b822cb","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"v\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"part\",\"type\":\"date\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":["part"],"configuration":{},"createdTime":1737867528867}} +{"protocol":{"minReaderVersion":1,"minWriterVersion":2}} diff --git a/plugin/trino-delta-lake/src/test/resources/deltalake/clone_merge/clone_merge_source/_delta_log/00000000000000000001.crc b/plugin/trino-delta-lake/src/test/resources/deltalake/clone_merge/clone_merge_source/_delta_log/00000000000000000001.crc new file mode 100644 index 000000000000..b4bc2af05f94 --- /dev/null +++ b/plugin/trino-delta-lake/src/test/resources/deltalake/clone_merge/clone_merge_source/_delta_log/00000000000000000001.crc @@ -0,0 +1 @@ 
+{"txnId":"63e86e4d-3ce1-4016-b3a3-11d7b9a2a431","tableSizeBytes":2628,"numFiles":4,"numMetadata":1,"numProtocol":1,"setTransactions":[],"domainMetadata":[],"metadata":{"id":"d336c764-d0d0-47e6-b2a5-cbcf55b822cb","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"v\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"part\",\"type\":\"date\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":["part"],"configuration":{},"createdTime":1737867528867},"protocol":{"minReaderVersion":1,"minWriterVersion":2},"allFiles":[{"path":"part=2024-01-01/part-00001-18fc4038-1ed9-44db-b1e8-893b2d3f02da.c000.snappy.parquet","partitionValues":{"part":"2024-01-01"},"size":657,"modificationTime":1737867545000,"dataChange":false,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":2,\"v\":\"B\"},\"maxValues\":{\"id\":2,\"v\":\"B\"},\"nullCount\":{\"id\":0,\"v\":0}}"},{"path":"part=2024-02-02/part-00003-d752c44a-09fe-4576-9b1d-7dfa9ca215d4.c000.snappy.parquet","partitionValues":{"part":"2024-02-02"},"size":657,"modificationTime":1737867545000,"dataChange":false,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":4,\"v\":\"D\"},\"maxValues\":{\"id\":4,\"v\":\"D\"},\"nullCount\":{\"id\":0,\"v\":0}}"},{"path":"part=2024-01-01/part-00000-8216f105-6126-4fc9-be2a-00a94ea0331c.c000.snappy.parquet","partitionValues":{"part":"2024-01-01"},"size":657,"modificationTime":1737867545000,"dataChange":false,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":1,\"v\":\"A\"},\"maxValues\":{\"id\":1,\"v\":\"A\"},\"nullCount\":{\"id\":0,\"v\":0}}"},{"path":"part=2024-02-02/part-00002-55f327ce-5cba-41d7-ba87-9ec9b32afa2b.c000.snappy.parquet","partitionValues":{"part":"2024-02-02"},"size":657,"modificationTime":1737867545000,"dataChange":false,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":3,\"v\":\"C\"},\"maxValues\":{\"id\":3,\"v\":\"C\"},\"nullCount\":{\"id\":0,\"v\":0
}}"}]} diff --git a/plugin/trino-delta-lake/src/test/resources/deltalake/clone_merge/clone_merge_source/_delta_log/00000000000000000001.json b/plugin/trino-delta-lake/src/test/resources/deltalake/clone_merge/clone_merge_source/_delta_log/00000000000000000001.json new file mode 100644 index 000000000000..ad3c3bad5294 --- /dev/null +++ b/plugin/trino-delta-lake/src/test/resources/deltalake/clone_merge/clone_merge_source/_delta_log/00000000000000000001.json @@ -0,0 +1,5 @@ +{"commitInfo":{"timestamp":1737867545214,"operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[]"},"readVersion":0,"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{"numFiles":"4","numOutputRows":"4","numOutputBytes":"2628"},"engineInfo":"Apache-Spark/3.5.3 Delta-Lake/3.3.0","txnId":"63e86e4d-3ce1-4016-b3a3-11d7b9a2a431"}} +{"add":{"path":"part=2024-01-01/part-00000-8216f105-6126-4fc9-be2a-00a94ea0331c.c000.snappy.parquet","partitionValues":{"part":"2024-01-01"},"size":657,"modificationTime":1737867545000,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":1,\"v\":\"A\"},\"maxValues\":{\"id\":1,\"v\":\"A\"},\"nullCount\":{\"id\":0,\"v\":0}}"}} +{"add":{"path":"part=2024-01-01/part-00001-18fc4038-1ed9-44db-b1e8-893b2d3f02da.c000.snappy.parquet","partitionValues":{"part":"2024-01-01"},"size":657,"modificationTime":1737867545000,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":2,\"v\":\"B\"},\"maxValues\":{\"id\":2,\"v\":\"B\"},\"nullCount\":{\"id\":0,\"v\":0}}"}} +{"add":{"path":"part=2024-02-02/part-00002-55f327ce-5cba-41d7-ba87-9ec9b32afa2b.c000.snappy.parquet","partitionValues":{"part":"2024-02-02"},"size":657,"modificationTime":1737867545000,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":3,\"v\":\"C\"},\"maxValues\":{\"id\":3,\"v\":\"C\"},\"nullCount\":{\"id\":0,\"v\":0}}"}} 
+{"add":{"path":"part=2024-02-02/part-00003-d752c44a-09fe-4576-9b1d-7dfa9ca215d4.c000.snappy.parquet","partitionValues":{"part":"2024-02-02"},"size":657,"modificationTime":1737867545000,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":4,\"v\":\"D\"},\"maxValues\":{\"id\":4,\"v\":\"D\"},\"nullCount\":{\"id\":0,\"v\":0}}"}} diff --git a/plugin/trino-delta-lake/src/test/resources/deltalake/clone_merge/clone_merge_source/part=2024-01-01/part-00000-8216f105-6126-4fc9-be2a-00a94ea0331c.c000.snappy.parquet b/plugin/trino-delta-lake/src/test/resources/deltalake/clone_merge/clone_merge_source/part=2024-01-01/part-00000-8216f105-6126-4fc9-be2a-00a94ea0331c.c000.snappy.parquet new file mode 100644 index 000000000000..ee7944a61754 Binary files /dev/null and b/plugin/trino-delta-lake/src/test/resources/deltalake/clone_merge/clone_merge_source/part=2024-01-01/part-00000-8216f105-6126-4fc9-be2a-00a94ea0331c.c000.snappy.parquet differ diff --git a/plugin/trino-delta-lake/src/test/resources/deltalake/clone_merge/clone_merge_source/part=2024-01-01/part-00001-18fc4038-1ed9-44db-b1e8-893b2d3f02da.c000.snappy.parquet b/plugin/trino-delta-lake/src/test/resources/deltalake/clone_merge/clone_merge_source/part=2024-01-01/part-00001-18fc4038-1ed9-44db-b1e8-893b2d3f02da.c000.snappy.parquet new file mode 100644 index 000000000000..3378143b8a18 Binary files /dev/null and b/plugin/trino-delta-lake/src/test/resources/deltalake/clone_merge/clone_merge_source/part=2024-01-01/part-00001-18fc4038-1ed9-44db-b1e8-893b2d3f02da.c000.snappy.parquet differ diff --git a/plugin/trino-delta-lake/src/test/resources/deltalake/clone_merge/clone_merge_source/part=2024-02-02/part-00002-55f327ce-5cba-41d7-ba87-9ec9b32afa2b.c000.snappy.parquet b/plugin/trino-delta-lake/src/test/resources/deltalake/clone_merge/clone_merge_source/part=2024-02-02/part-00002-55f327ce-5cba-41d7-ba87-9ec9b32afa2b.c000.snappy.parquet new file mode 100644 index 000000000000..e53d44a73a73 Binary files /dev/null and 
b/plugin/trino-delta-lake/src/test/resources/deltalake/clone_merge/clone_merge_source/part=2024-02-02/part-00002-55f327ce-5cba-41d7-ba87-9ec9b32afa2b.c000.snappy.parquet differ diff --git a/plugin/trino-delta-lake/src/test/resources/deltalake/clone_merge/clone_merge_source/part=2024-02-02/part-00003-d752c44a-09fe-4576-9b1d-7dfa9ca215d4.c000.snappy.parquet b/plugin/trino-delta-lake/src/test/resources/deltalake/clone_merge/clone_merge_source/part=2024-02-02/part-00003-d752c44a-09fe-4576-9b1d-7dfa9ca215d4.c000.snappy.parquet new file mode 100644 index 000000000000..7a549aa91ad2 Binary files /dev/null and b/plugin/trino-delta-lake/src/test/resources/deltalake/clone_merge/clone_merge_source/part=2024-02-02/part-00003-d752c44a-09fe-4576-9b1d-7dfa9ca215d4.c000.snappy.parquet differ
diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeCloneTableCompatibility.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeCloneTableCompatibility.java
index d773c66a6547..817d6cfc4a78 100644
--- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeCloneTableCompatibility.java
+++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeCloneTableCompatibility.java
@@ -22,6 +22,7 @@
 import io.trino.testng.services.Flaky;
 import org.testng.annotations.Test;
 
+import java.sql.Date;
 import java.util.List;
 
 import static com.google.common.collect.ImmutableList.toImmutableList;
@@ -250,6 +251,106 @@ public void testReadFromSchemaChangedDeepCloneTable()
         testReadSchemaChangedCloneTable("DEEP", false);
     }
 
+    @Test(groups = {DELTA_LAKE_OSS, PROFILE_SPECIFIC_TESTS})
+    public void testShallowCloneTableMerge()
+    {
+        testShallowCloneTableMerge(false);
+        testShallowCloneTableMerge(true);
+    }
+
+    /**
+     * Creates a base table and a shallow clone of it through Delta, then runs UPDATE and MERGE on the
+     * clone through Trino and checks from both engines that only the clone changes.
+     *
+     * @param partitioned whether the base table is created with {@code PARTITIONED BY (part)}
+     */
+    private void testShallowCloneTableMerge(boolean partitioned)
+    {
+        String baseTable = "test_dl_base_table_" + randomNameSuffix();
+        String clonedTable = "test_dl_clone_tableV1_" + randomNameSuffix();
+        String directoryName = "databricks-merge-clone-compatibility-test-";
+        try {
+            onDelta().executeQuery("CREATE TABLE default." + baseTable +
+                    " (id INT, v STRING, part DATE) USING delta " +
+                    (partitioned ? "PARTITIONED BY (part) " : "") +
+                    "LOCATION 's3://" + bucketName + "/" + directoryName + baseTable + "'");
+
+            onDelta().executeQuery("INSERT INTO default." + baseTable + " " +
+                    "VALUES (1, 'A', TIMESTAMP '2024-01-01'), " +
+                    "(2, 'B', TIMESTAMP '2024-01-01'), " +
+                    "(3, 'C', TIMESTAMP '2024-02-02'), " +
+                    "(4, 'D', TIMESTAMP '2024-02-02')");
+
+            onDelta().executeQuery("CREATE TABLE default." + clonedTable +
+                    " SHALLOW CLONE default." + baseTable +
+                    " LOCATION 's3://" + bucketName + "/" + directoryName + clonedTable + "'");
+
+            List expectedRows = ImmutableList.of(
+                    row(1, "A", Date.valueOf("2024-01-01")),
+                    row(2, "B", Date.valueOf("2024-01-01")),
+                    row(3, "C", Date.valueOf("2024-02-02")),
+                    row(4, "D", Date.valueOf("2024-02-02")));
+            assertThat(onDelta().executeQuery("SELECT * FROM default." + baseTable))
+                    .containsOnly(expectedRows);
+            assertThat(onTrino().executeQuery("SELECT * FROM delta.default." + baseTable))
+                    .containsOnly(expectedRows);
+            // the freshly created clone must expose the same rows before it is modified
+            assertThat(onTrino().executeQuery("SELECT * FROM delta.default." + clonedTable))
+                    .containsOnly(expectedRows);
+
+            // update on cloned table
+            onTrino().executeQuery("UPDATE delta.default." + clonedTable + " SET v = 'xxx' WHERE id in (1,3)");
+            List expectedRowsAfterUpdate = ImmutableList.of(
+                    row(1, "xxx", Date.valueOf("2024-01-01")),
+                    row(2, "B", Date.valueOf("2024-01-01")),
+                    row(3, "xxx", Date.valueOf("2024-02-02")),
+                    row(4, "D", Date.valueOf("2024-02-02")));
+            // the update must only be visible in the clone; the source table must not change
+            assertThat(onTrino().executeQuery("SELECT * FROM delta.default." + baseTable))
+                    .containsOnly(expectedRows);
+            assertThat(onTrino().executeQuery("SELECT * FROM delta.default." + clonedTable))
+                    .containsOnly(expectedRowsAfterUpdate);
+            assertThat(onDelta().executeQuery("SELECT * FROM default." + baseTable))
+                    .containsOnly(expectedRows);
+            assertThat(onDelta().executeQuery("SELECT * FROM default." + clonedTable))
+                    .containsOnly(expectedRowsAfterUpdate);
+
+            // merge on cloned table
+            String mergeSql = format("""
+                    MERGE INTO %s t
+                    USING (VALUES (3, 'yyy', TIMESTAMP '2025-01-01'), (4, 'zzz', TIMESTAMP '2025-02-02'), (5, 'kkk', TIMESTAMP '2025-03-03')) AS s(id, v, part)
+                    ON (t.id = s.id)
+                    WHEN MATCHED AND s.v = 'zzz' THEN DELETE
+                    WHEN MATCHED THEN UPDATE SET v = s.v
+                    WHEN NOT MATCHED THEN INSERT (id, v, part) VALUES(s.id, s.v, s.part)
+                    """, "delta.default." + clonedTable);
+            onTrino().executeQuery(mergeSql);
+
+            List expectedRowsAfterMerge = ImmutableList.of(
+                    row(1, "xxx", Date.valueOf("2024-01-01")),
+                    row(2, "B", Date.valueOf("2024-01-01")),
+                    row(3, "yyy", Date.valueOf("2024-02-02")),
+                    row(5, "kkk", Date.valueOf("2025-03-03")));
+            assertThat(onTrino().executeQuery("SELECT * FROM delta.default." + baseTable))
+                    .containsOnly(expectedRows);
+            assertThat(onTrino().executeQuery("SELECT * FROM delta.default." + clonedTable))
+                    .containsOnly(expectedRowsAfterMerge);
+            assertThat(onDelta().executeQuery("SELECT * FROM default." + baseTable))
+                    .containsOnly(expectedRows);
+            assertThat(onDelta().executeQuery("SELECT * FROM default." + clonedTable))
+                    .containsOnly(expectedRowsAfterMerge);
+
+            // the base table must remain readable after the cloned table is dropped
+            onTrino().executeQuery("DROP TABLE delta.default." + clonedTable);
+            assertThat(onTrino().executeQuery("SELECT * FROM delta.default." + baseTable))
+                    .containsOnly(expectedRows);
+        }
+        finally {
+            onTrino().executeQuery("DROP TABLE IF EXISTS delta.default." + baseTable);
+            onTrino().executeQuery("DROP TABLE IF EXISTS delta.default." + clonedTable);
+        }
+    }
+
     private void testReadSchemaChangedCloneTable(String cloneType, boolean partitioned)
     {
         String directoryName = "/databricks-compatibility-test-";