Skip to content

Commit

Permalink
[flink] Refactor the clone action to support the newly introduced external path (#4844)
Browse files Browse the repository at this point in the history
  • Loading branch information
neuyilan authored Feb 12, 2025
1 parent ee68fbb commit ecdf46f
Show file tree
Hide file tree
Showing 11 changed files with 1,219 additions and 483 deletions.
22 changes: 22 additions & 0 deletions paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,28 @@ public DataFileMeta copy(List<String> newExtraFiles) {
externalPath);
}

/**
 * Creates a copy of this {@code DataFileMeta} whose external path is replaced by
 * {@code newExternalPath}; every other field (file name, size, row counts, key/value
 * stats, sequence numbers, schema id, level, extra files, creation time, delete row
 * count, embedded index, file source, value stats columns) is carried over unchanged.
 *
 * <p>NOTE(review): arguments are positional — the order below must match the
 * {@code DataFileMeta} constructor exactly; verify against the constructor when editing.
 *
 * @param newExternalPath the external path recorded in the returned copy
 * @return a new {@code DataFileMeta} identical to this one except for the external path
 */
public DataFileMeta newExternalPath(String newExternalPath) {
return new DataFileMeta(
fileName,
fileSize,
rowCount,
minKey,
maxKey,
keyStats,
valueStats,
minSequenceNumber,
maxSequenceNumber,
schemaId,
level,
extraFiles,
creationTime,
deleteRowCount,
embeddedIndex,
fileSource,
valueStatsCols,
newExternalPath);
}

public DataFileMeta copy(byte[] newEmbeddedIndex) {
return new DataFileMeta(
fileName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,12 @@ public RollingFileWriter<ManifestEntry, ManifestFileMeta> createRollingWriter()
suggestedFileSize);
}

private class ManifestEntryWriter extends SingleFileWriter<ManifestEntry, ManifestFileMeta> {
/**
 * Creates a {@link ManifestEntryWriter} that writes manifest entries to the given path,
 * using this factory's writer factory and compression settings.
 *
 * @param manifestPath destination path for the manifest file
 * @return a new writer bound to {@code manifestPath}
 */
public ManifestEntryWriter createManifestEntryWriter(Path manifestPath) {
    final ManifestEntryWriter entryWriter =
            new ManifestEntryWriter(writerFactory, manifestPath, compression);
    return entryWriter;
}

/** Writer for {@link ManifestEntry}. */
public class ManifestEntryWriter extends SingleFileWriter<ManifestEntry, ManifestFileMeta> {

private final SimpleStatsCollector partitionStatsCollector;
private final SimpleStatsConverter partitionStatsSerializer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@

import org.apache.paimon.flink.clone.CloneFileInfo;
import org.apache.paimon.flink.clone.CloneSourceBuilder;
import org.apache.paimon.flink.clone.CopyFileOperator;
import org.apache.paimon.flink.clone.PickFilesForCloneOperator;
import org.apache.paimon.flink.clone.CopyDataFileOperator;
import org.apache.paimon.flink.clone.CopyManifestFileOperator;
import org.apache.paimon.flink.clone.CopyMetaFilesForCloneOperator;
import org.apache.paimon.flink.clone.SnapshotHintChannelComputer;
import org.apache.paimon.flink.clone.SnapshotHintOperator;
import org.apache.paimon.flink.sink.FlinkStreamPartitioner;
Expand Down Expand Up @@ -105,27 +106,50 @@ private void buildCloneFlinkJob(StreamExecutionEnvironment env) throws Exception
targetTableName)
.build();

SingleOutputStreamOperator<CloneFileInfo> pickFilesForClone =
SingleOutputStreamOperator<Void> copyMetaFiles =
cloneSource
.forward()
.process(
new CopyMetaFilesForCloneOperator(
sourceCatalogConfig, targetCatalogConfig))
.name("Side Output")
.setParallelism(1);

DataStream<CloneFileInfo> indexFilesStream =
copyMetaFiles.getSideOutput(CopyMetaFilesForCloneOperator.INDEX_FILES_TAG);
DataStream<CloneFileInfo> dataManifestFilesStream =
copyMetaFiles.getSideOutput(CopyMetaFilesForCloneOperator.DATA_MANIFEST_FILES_TAG);

SingleOutputStreamOperator<CloneFileInfo> copyIndexFiles =
indexFilesStream
.transform(
"Pick Files",
"Copy Index Files",
TypeInformation.of(CloneFileInfo.class),
new PickFilesForCloneOperator(
new CopyDataFileOperator(sourceCatalogConfig, targetCatalogConfig))
.setParallelism(parallelism);

SingleOutputStreamOperator<CloneFileInfo> copyDataManifestFiles =
dataManifestFilesStream
.transform(
"Copy Data Manifest Files",
TypeInformation.of(CloneFileInfo.class),
new CopyManifestFileOperator(
sourceCatalogConfig, targetCatalogConfig))
.forceNonParallel();
.setParallelism(parallelism);

SingleOutputStreamOperator<CloneFileInfo> copyFiles =
pickFilesForClone
.rebalance()
SingleOutputStreamOperator<CloneFileInfo> copyDataFile =
copyDataManifestFiles
.transform(
"Copy Files",
"Copy Data Files",
TypeInformation.of(CloneFileInfo.class),
new CopyFileOperator(sourceCatalogConfig, targetCatalogConfig))
new CopyDataFileOperator(sourceCatalogConfig, targetCatalogConfig))
.setParallelism(parallelism);

DataStream<CloneFileInfo> combinedStream = copyDataFile.union(copyIndexFiles);

SingleOutputStreamOperator<CloneFileInfo> snapshotHintOperator =
FlinkStreamPartitioner.partition(
copyFiles, new SnapshotHintChannelComputer(), parallelism)
combinedStream, new SnapshotHintChannelComputer(), parallelism)
.transform(
"Recreate Snapshot Hint",
TypeInformation.of(CloneFileInfo.class),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,18 @@

package org.apache.paimon.flink.clone;

import javax.annotation.Nullable;

/** The information of copy file. */
public class CloneFileInfo {

private final String sourceFilePath;
private final String filePathExcludeTableRoot;
@Nullable private final String sourceFilePath;
@Nullable private final String filePathExcludeTableRoot;
private final String sourceIdentifier;
private final String targetIdentifier;

public CloneFileInfo(
String sourceFilePath,
String filePathExcludeTableRoot,
@Nullable String sourceFilePath,
@Nullable String filePathExcludeTableRoot,
String sourceIdentifier,
String targetIdentifier) {
this.sourceFilePath = sourceFilePath;
Expand All @@ -37,10 +38,12 @@ public CloneFileInfo(
this.targetIdentifier = targetIdentifier;
}

/**
 * Returns the source file path, or {@code null} when no concrete file is associated
 * with this record (presumably for meta-only entries — confirm with producers).
 */
@Nullable
public String getSourceFilePath() {
return sourceFilePath;
}

/**
 * Returns the file path relative to the table root, or {@code null} when no concrete
 * file is associated with this record (semantics set by the producing operator —
 * verify against callers).
 */
@Nullable
public String getFilePathExcludeTableRoot() {
return filePathExcludeTableRoot;
}
Expand All @@ -56,7 +59,7 @@ public String getTargetIdentifier() {
/** Human-readable form of this record; fields rendered as {@code null} when absent. */
@Override
public String toString() {
    // Keep the braces symmetric: "{ " at the start pairs with " }" at the end.
    return String.format(
            "{ sourceFilePath: %s, filePathExcludeTableRoot: %s, sourceIdentifier: %s, targetIdentifier: %s }",
            sourceFilePath, filePathExcludeTableRoot, sourceIdentifier, targetIdentifier);
}
}
Loading

0 comments on commit ecdf46f

Please sign in to comment.