-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[flink] Remove state from append only unaware bucket writer #4219
Merged
Merged
Changes from 3 commits
Commits
Show all changes
4 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -52,6 +52,7 @@ | |
import java.util.Map; | ||
import java.util.concurrent.ExecutorService; | ||
import java.util.concurrent.Executors; | ||
import java.util.function.Function; | ||
|
||
import static org.apache.paimon.io.DataFileMeta.getMaxSequenceNumber; | ||
|
||
|
@@ -64,7 +65,6 @@ public abstract class AbstractFileStoreWrite<T> implements FileStoreWrite<T> { | |
|
||
private static final Logger LOG = LoggerFactory.getLogger(AbstractFileStoreWrite.class); | ||
|
||
private final String commitUser; | ||
protected final SnapshotManager snapshotManager; | ||
private final FileStoreScan scan; | ||
private final int writerNumberMax; | ||
|
@@ -85,14 +85,12 @@ public abstract class AbstractFileStoreWrite<T> implements FileStoreWrite<T> { | |
private boolean isInsertOnly; | ||
|
||
protected AbstractFileStoreWrite( | ||
String commitUser, | ||
SnapshotManager snapshotManager, | ||
FileStoreScan scan, | ||
@Nullable IndexMaintainer.Factory<T> indexFactory, | ||
@Nullable DeletionVectorsMaintainer.Factory dvMaintainerFactory, | ||
String tableName, | ||
int writerNumberMax) { | ||
this.commitUser = commitUser; | ||
this.snapshotManager = snapshotManager; | ||
this.scan = scan; | ||
this.indexFactory = indexFactory; | ||
|
@@ -169,28 +167,18 @@ public void notifyNewFiles( | |
@Override | ||
public List<CommitMessage> prepareCommit(boolean waitCompaction, long commitIdentifier) | ||
throws Exception { | ||
long latestCommittedIdentifier; | ||
Function<WriterContainer<T>, Boolean> writerCleanChecker; | ||
if (writers.values().stream() | ||
.map(Map::values) | ||
.flatMap(Collection::stream) | ||
.mapToLong(w -> w.lastModifiedCommitIdentifier) | ||
.max() | ||
.orElse(Long.MIN_VALUE) | ||
== Long.MIN_VALUE) { | ||
// Optimization for the first commit. | ||
// | ||
// If this is the first commit, no writer has previous modified commit, so the value of | ||
// `latestCommittedIdentifier` does not matter. | ||
// | ||
// Without this optimization, we may need to scan through all snapshots only to find | ||
// that there is no previous snapshot by this user, which is very inefficient. | ||
latestCommittedIdentifier = Long.MIN_VALUE; | ||
// If this is the first commit, no writer should be cleaned. | ||
writerCleanChecker = writerContainer -> false; | ||
} else { | ||
latestCommittedIdentifier = | ||
snapshotManager | ||
.latestSnapshotOfUser(commitUser) | ||
.map(Snapshot::commitIdentifier) | ||
.orElse(Long.MIN_VALUE); | ||
writerCleanChecker = createWriterCleanChecker(); | ||
} | ||
|
||
List<CommitMessage> result = new ArrayList<>(); | ||
|
@@ -226,14 +214,7 @@ public List<CommitMessage> prepareCommit(boolean waitCompaction, long commitIden | |
result.add(committable); | ||
|
||
if (committable.isEmpty()) { | ||
// Condition 1: There is no more record waiting to be committed. Note that the | ||
// condition is < (instead of <=), because each commit identifier may have | ||
// multiple snapshots. We must make sure all snapshots of this identifier are | ||
// committed. | ||
// Condition 2: No compaction is in progress. That is, no more changelog will be | ||
// produced. | ||
if (writerContainer.lastModifiedCommitIdentifier < latestCommittedIdentifier | ||
&& !writerContainer.writer.isCompacting()) { | ||
if (writerCleanChecker.apply(writerContainer)) { | ||
// Clear writer if no update, and if its latest modification has committed. | ||
// | ||
// We need a mechanism to clear writers, otherwise there will be more and | ||
|
@@ -242,12 +223,10 @@ public List<CommitMessage> prepareCommit(boolean waitCompaction, long commitIden | |
LOG.debug( | ||
"Closing writer for partition {}, bucket {}. " | ||
+ "Writer's last modified identifier is {}, " | ||
+ "while latest committed identifier is {}, " | ||
+ "current commit identifier is {}.", | ||
+ "while current commit identifier is {}.", | ||
partition, | ||
bucket, | ||
writerContainer.lastModifiedCommitIdentifier, | ||
latestCommittedIdentifier, | ||
commitIdentifier); | ||
} | ||
writerContainer.writer.close(); | ||
|
@@ -266,6 +245,41 @@ public List<CommitMessage> prepareCommit(boolean waitCompaction, long commitIden | |
return result; | ||
} | ||
|
||
// This abstract function returns a whole function (instead of just a boolean value), | ||
// because we do not want to introduce `commitUser` into this base class. | ||
// | ||
// For writers with no conflicts, `commitUser` might be some random value. | ||
protected abstract Function<WriterContainer<T>, Boolean> createWriterCleanChecker(); | ||
|
||
protected static <T> | ||
Function<WriterContainer<T>, Boolean> createConflictAwareWriterCleanChecker( | ||
String commitUser, SnapshotManager snapshotManager) { | ||
long latestCommittedIdentifier = | ||
snapshotManager | ||
.latestSnapshotOfUser(commitUser) | ||
.map(Snapshot::commitIdentifier) | ||
.orElse(Long.MIN_VALUE); | ||
if (LOG.isDebugEnabled()) { | ||
LOG.debug("Latest committed identifier is {}", latestCommittedIdentifier); | ||
} | ||
|
||
// Condition 1: There is no more record waiting to be committed. Note that the | ||
// condition is < (instead of <=), because each commit identifier may have | ||
// multiple snapshots. We must make sure all snapshots of this identifier are | ||
// committed. | ||
// | ||
// Condition 2: No compaction is in progress. That is, no more changelog will be | ||
// produced. | ||
return writerContainer -> | ||
writerContainer.lastModifiedCommitIdentifier < latestCommittedIdentifier | ||
&& !writerContainer.writer.isCompacting(); | ||
} | ||
|
||
protected static <T> | ||
Function<WriterContainer<T>, Boolean> createNoConflictAwareWriterCleanChecker() { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Just inline this method in |
||
return writerContainer -> true; | ||
} | ||
|
||
@Override | ||
public void close() throws Exception { | ||
for (Map<Integer, WriterContainer<T>> bucketWriters : writers.values()) { | ||
|
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we do this abstraction? Or just introducing a
boolean checkWriterClean
?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This abstract function returns a whole function (instead of just a boolean value), because we do not want to introduce
commitUser
into this base class.For writers with no conflicts,
commitUser
might be some random value.