[cdc] Unaware bucket cdc sink should not chain (#4203)
JingsongLi authored Sep 19, 2024
1 parent f1da943 commit c55887c
Showing 3 changed files with 23 additions and 6 deletions.
@@ -139,6 +139,7 @@ private DataStreamSink<?> buildForFixedBucket(DataStream<CdcRecord> parsed) {

     private DataStreamSink<?> buildForUnawareBucket(DataStream<CdcRecord> parsed) {
         FileStoreTable dataTable = (FileStoreTable) table;
-        return new CdcUnawareBucketSink(dataTable, parallelism).sinkFrom(parsed);
+        // rebalance it to make sure schema change work to avoid infinite loop
+        return new CdcUnawareBucketSink(dataTable, parallelism).sinkFrom(parsed.rebalance());
     }
 }
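
For context on the commit title: `rebalance()` redistributes records round-robin across downstream subtasks. Because that edge is no longer a forward connection, Flink will not chain the sink's write operator to the upstream CDC parser; per the in-diff comment, that is what lets schema changes take effect and avoids an infinite loop. A minimal, self-contained sketch of the chaining effect (class and operator names are illustrative stand-ins, not Paimon code):

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class RebalanceBreaksChaining {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1); // uniform parallelism, so chaining is limited only by partitioners

            // Stand-in for the parsed CDC stream.
            DataStream<String> parsed = env.fromElements("a", "b", "c");

            // Without rebalance(), the map below would be chained with the source
            // into a single task (forward edge, equal parallelism). rebalance()
            // uses a round-robin partitioner instead of a forward one, and Flink
            // never chains across such an edge, which is the effect this commit relies on.
            parsed.rebalance()
                    .map(new MapFunction<String, String>() {
                        @Override
                        public String map(String record) {
                            return record; // pass-through stand-in for the write operator
                        }
                    })
                    .print();

            // The JSON plan shows the map starting its own chain, separate from the source.
            System.out.println(env.getExecutionPlan());
            env.execute("rebalance-chaining-demo");
        }
    }

Running it prints an execution plan in which the pass-through map (and the print sink chained to it) sits in a separate chain from the source.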
@@ -19,22 +19,37 @@
 package org.apache.paimon.flink.sink.cdc;
 
 import org.apache.paimon.flink.sink.Committable;
+import org.apache.paimon.flink.sink.FlinkWriteSink;
 import org.apache.paimon.flink.sink.StoreSinkWrite;
-import org.apache.paimon.flink.sink.UnawareBucketSink;
 import org.apache.paimon.table.FileStoreTable;
 
+import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 
-/** CDC Sink for unaware bucket table. */
-public class CdcUnawareBucketSink extends UnawareBucketSink<CdcRecord> {
+import javax.annotation.Nullable;
+
+/**
+ * CDC Sink for unaware bucket table. It should not add compaction node, because the compaction may
+ * have old schema.
+ */
+public class CdcUnawareBucketSink extends FlinkWriteSink<CdcRecord> {
+
+    private final Integer parallelism;
 
     public CdcUnawareBucketSink(FileStoreTable table, Integer parallelism) {
-        super(table, null, null, parallelism);
+        super(table, null);
+        this.parallelism = parallelism;
     }
 
     @Override
     protected OneInputStreamOperator<CdcRecord, Committable> createWriteOperator(
             StoreSinkWrite.Provider writeProvider, String commitUser) {
         return new CdcUnawareBucketWriteOperator(table, writeProvider, commitUser);
     }
+
+    @Override
+    public DataStream<Committable> doWrite(
+            DataStream<CdcRecord> input, String initialCommitUser, @Nullable Integer parallelism) {
+        return super.doWrite(input, initialCommitUser, this.parallelism);
+    }
 }
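
The superclass switch is the heart of this file's change. As the new javadoc states, the old parent `UnawareBucketSink` adds a compaction node to the sink topology, and that compaction work may run with an old schema once CDC schema evolution happens; extending `FlinkWriteSink` directly leaves the compaction node out. A schematic contrast of the two `doWrite` shapes (plain-Java stand-ins, not Paimon's actual classes):

    import java.util.function.UnaryOperator;

    // Illustrative stand-ins, not Paimon's classes: each "topology" maps a
    // description of the upstream stream to a description of the sink stages.
    public class SinkTopologySketch {

        // Shape of the FlinkWriteSink path: the write operator only.
        static UnaryOperator<String> writeOnly() {
            return upstream -> upstream + " -> writer";
        }

        // Shape of the UnawareBucketSink path: write operator plus a compaction
        // node. The compaction node is built against the table schema known when
        // the job was assembled, the "old schema" problem the javadoc names.
        static UnaryOperator<String> writeThenCompact() {
            return upstream -> writeOnly().apply(upstream) + " -> compaction";
        }

        public static void main(String[] args) {
            System.out.println(writeOnly().apply("cdc-records"));
            // cdc-records -> writer
            System.out.println(writeThenCompact().apply("cdc-records"));
            // cdc-records -> writer -> compaction
        }
    }

The `doWrite` override in the new class serves a smaller purpose: it ignores the parallelism argument supplied by the caller and always forwards the constructor-supplied value, so the writer keeps the parallelism the sink was built with.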
@@ -180,7 +180,8 @@ private void buildForFixedBucket(FileStoreTable table, DataStream<CdcRecord> parsed) {
     }
 
     private void buildForUnawareBucket(FileStoreTable table, DataStream<CdcRecord> parsed) {
-        new CdcUnawareBucketSink(table, parallelism).sinkFrom(parsed);
+        // rebalance it to make sure schema change work to avoid infinite loop
+        new CdcUnawareBucketSink(table, parallelism).sinkFrom(parsed.rebalance());
     }
 
     private void buildDividedCdcSink() {
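
This hunk applies the same one-line rebalance fix in a second sink builder (the divided CDC sink path); the chaining sketch after the first hunk applies here unchanged.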
