[core] Support upper bound in dynamic bucket mode #4974

Open · wants to merge 4 commits into base: master
6 changes: 6 additions & 0 deletions docs/layouts/shortcodes/generated/core_configuration.html
@@ -290,6 +290,12 @@
<td>Integer</td>
<td>Initial buckets for a partition in assigner operator for dynamic bucket mode.</td>
</tr>
<tr>
<td><h5>dynamic-bucket.max-buckets-per-assigner</h5></td>
<td style="word-wrap: break-word;">-1</td>
<td>Integer</td>
<td>Max buckets per assigner operator for a partition in dynamic bucket mode. It should either be -1 (unlimited) or greater than 0 (a fixed upper bound).</td>
</tr>
<tr>
<td><h5>dynamic-bucket.target-row-num</h5></td>
<td style="word-wrap: break-word;">2000000</td>
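Note: a minimal sketch of how this option could be set through the Java API when defining a dynamic-bucket table. The table layout below is hypothetical; only the two option keys ('bucket' = '-1' for dynamic bucket mode, plus the new upper bound) come from this PR.

import org.apache.paimon.schema.Schema;
import org.apache.paimon.types.DataTypes;

// Hypothetical primary-key table in dynamic bucket mode ('bucket' = '-1'),
// capped at 8 buckets per assigner per partition by the new option.
public class BoundedDynamicBucketSchema {
    static Schema schema() {
        return Schema.newBuilder()
                .column("id", DataTypes.INT())
                .column("dt", DataTypes.STRING())
                .primaryKey("id", "dt")
                .partitionKeys("dt")
                .option("bucket", "-1")
                .option("dynamic-bucket.max-buckets-per-assigner", "8")
                .build();
    }
}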
12 changes: 12 additions & 0 deletions paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -1076,6 +1076,14 @@ public class CoreOptions implements Serializable {
.withDescription(
"Initial buckets for a partition in assigner operator for dynamic bucket mode.");

public static final ConfigOption<Integer> DYNAMIC_BUCKET_MAX_BUCKETS_PER_ASSIGNER =
key("dynamic-bucket.max-buckets-per-assigner")
.intType()
.defaultValue(-1)
.withDescription(
"Max buckets per assigner operator for a partition in dynamic bucket mode, It should "
+ "either be equal to -1 (unlimited), or it must be greater than 0 (fixed upper bound).");

public static final ConfigOption<Integer> DYNAMIC_BUCKET_ASSIGNER_PARALLELISM =
key("dynamic-bucket.assigner-parallelism")
.intType()
@@ -2219,6 +2227,10 @@ public Integer dynamicBucketInitialBuckets() {
return options.get(DYNAMIC_BUCKET_INITIAL_BUCKETS);
}

public Integer dynamicBucketMaxBucketsPerAssigner() {
return options.get(DYNAMIC_BUCKET_MAX_BUCKETS_PER_ASSIGNER);
}

public Integer dynamicBucketAssignerParallelism() {
return options.get(DYNAMIC_BUCKET_ASSIGNER_PARALLELISM);
}
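Note: a minimal sketch of setting and reading the new option, assuming only the public CoreOptions(Options) constructor and the accessor added above.

import org.apache.paimon.CoreOptions;
import org.apache.paimon.options.Options;

public class MaxBucketsOptionDemo {
    public static void main(String[] args) {
        Options options = new Options();
        options.set(CoreOptions.DYNAMIC_BUCKET_MAX_BUCKETS_PER_ASSIGNER, 8);
        CoreOptions coreOptions = new CoreOptions(options);
        // Prints 8; the default -1 means "unlimited".
        System.out.println(coreOptions.dynamicBucketMaxBucketsPerAssigner());
    }
}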
@@ -45,6 +45,7 @@ public class HashBucketAssigner implements BucketAssigner {
private final int numAssigners;
private final int assignId;
private final long targetBucketRowNumber;
private final int maxBucketsNum;

private final Map<BinaryRow, PartitionIndex> partitionIndex;

@@ -55,7 +56,8 @@ public HashBucketAssigner(
int numChannels,
int numAssigners,
int assignId,
long targetBucketRowNumber) {
long targetBucketRowNumber,
int maxBucketsNum) {
this.snapshotManager = snapshotManager;
this.commitUser = commitUser;
this.indexFileHandler = indexFileHandler;
@@ -64,6 +66,7 @@
this.assignId = assignId;
this.targetBucketRowNumber = targetBucketRowNumber;
this.partitionIndex = new HashMap<>();
this.maxBucketsNum = maxBucketsNum;
}

/** Assign a bucket for key hash of a record. */
@@ -84,7 +87,7 @@ public int assign(BinaryRow partition, int hash) {
this.partitionIndex.put(partition, index);
}

int assigned = index.assign(hash, this::isMyBucket);
int assigned = index.assign(hash, this::isMyBucket, maxBucketsNum);
if (LOG.isDebugEnabled()) {
LOG.debug(
"Assign " + assigned + " to the partition " + partition + " key hash " + hash);
@@ -20,15 +20,16 @@

import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.manifest.IndexManifestEntry;
import org.apache.paimon.table.sink.KeyAndBucketExtractor;
import org.apache.paimon.utils.Int2ShortHashMap;
import org.apache.paimon.utils.IntIterator;

import java.io.EOFException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -57,13 +58,13 @@ public PartitionIndex(
long targetBucketRowNumber) {
this.hash2Bucket = hash2Bucket;
this.nonFullBucketInformation = bucketInformation;
this.totalBucket = new HashSet<>(bucketInformation.keySet());
this.totalBucket = new LinkedHashSet<>(bucketInformation.keySet());
this.targetBucketRowNumber = targetBucketRowNumber;
this.lastAccessedCommitIdentifier = Long.MIN_VALUE;
this.accessed = true;
}

public int assign(int hash, IntPredicate bucketFilter) {
public int assign(int hash, IntPredicate bucketFilter, int maxBucketsNum) {
accessed = true;

// 1. is it a key that has appeared before
@@ -80,29 +81,39 @@ public int assign(int hash, IntPredicate bucketFilter) {
Long number = entry.getValue();
if (number < targetBucketRowNumber) {
entry.setValue(number + 1);
hash2Bucket.put(hash, bucket.shortValue());
return bucket;
return cacheBucketAndGet(hash, bucket);
} else {
iterator.remove();
if (-1 != maxBucketsNum && totalBucket.size() == maxBucketsNum) {
return cacheBucketAndGet(
hash,
KeyAndBucketExtractor.bucketWithUpperBound(
totalBucket, hash, maxBucketsNum));
}
}
}

// 3. create a new bucket
for (int i = 0; i < Short.MAX_VALUE; i++) {
if (bucketFilter.test(i) && !totalBucket.contains(i)) {
hash2Bucket.put(hash, (short) i);
nonFullBucketInformation.put(i, 1L);
totalBucket.add(i);
return i;
if (-1 == maxBucketsNum || totalBucket.size() < maxBucketsNum) {
Review comment (Contributor): I think the check here should be: the max bucket should be smaller than maxBucketsNum. If it is a job restarted from scratch, each task keeps increasing its bucket count, and the previous equality judgment may be problematic.

Reply (Contributor Author): Wise consideration! done :)

// 3. create a new bucket
for (int i = 0; i < Short.MAX_VALUE; i++) {
if (bucketFilter.test(i) && !totalBucket.contains(i)) {
nonFullBucketInformation.put(i, 1L);
totalBucket.add(i);
return cacheBucketAndGet(hash, i);
}
}
}

@SuppressWarnings("OptionalGetWithoutIsPresent")
int maxBucket = totalBucket.stream().mapToInt(Integer::intValue).max().getAsInt();
throw new RuntimeException(
String.format(
"Too more bucket %s, you should increase target bucket row number %s.",
maxBucket, targetBucketRowNumber));
@SuppressWarnings("OptionalGetWithoutIsPresent")
int maxBucket = totalBucket.stream().mapToInt(Integer::intValue).max().getAsInt();
throw new RuntimeException(
String.format(
"Too more bucket %s, you should increase target bucket row number %s.",
maxBucket, targetBucketRowNumber));
} else {
return cacheBucketAndGet(
hash,
KeyAndBucketExtractor.bucketWithUpperBound(totalBucket, hash, maxBucketsNum));
}
}
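Note: the control flow above has four outcomes. Below is a simplified, self-contained model of the same logic — an illustration, not the PR's class: bucket numbering is sequential and the cross-assigner bucketFilter is omitted.

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;

// Simplified model: cached hashes keep their bucket; otherwise fill a
// non-full bucket, create a new one while below the cap, or fall back
// to a random existing bucket once maxBuckets is reached.
class BoundedBucketModel {
    private final Map<Integer, Short> hashToBucket = new LinkedHashMap<>();
    private final Map<Integer, Long> bucketRows = new LinkedHashMap<>();
    private final long targetRows;
    private final int maxBuckets; // -1 = unlimited

    BoundedBucketModel(long targetRows, int maxBuckets) {
        this.targetRows = targetRows;
        this.maxBuckets = maxBuckets;
    }

    int assign(int hash) {
        Short cached = hashToBucket.get(hash);
        if (cached != null) {
            return cached; // 1. a key that has appeared before
        }
        for (Map.Entry<Integer, Long> e : bucketRows.entrySet()) {
            if (e.getValue() < targetRows) { // 2. fill a non-full bucket
                e.setValue(e.getValue() + 1);
                return cache(hash, e.getKey());
            }
        }
        if (maxBuckets == -1 || bucketRows.size() < maxBuckets) {
            int next = bucketRows.size(); // 3. create a new bucket
            bucketRows.put(next, 1L);
            return cache(hash, next);
        }
        // 4. upper bound reached: reuse a random existing bucket
        int skip = ThreadLocalRandom.current().nextInt(bucketRows.size());
        int bucket = bucketRows.keySet().stream().skip(skip).findFirst().orElse(0);
        return cache(hash, bucket);
    }

    private int cache(int hash, int bucket) {
        hashToBucket.put(hash, (short) bucket);
        return bucket;
    }
}

The switch from HashSet to LinkedHashSet in this file presumably serves the same purpose as the LinkedHashMap here: a deterministic iteration order, so that position-based selection over the bucket set is well defined.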

public static PartitionIndex loadIndex(
@@ -137,4 +148,9 @@ public static PartitionIndex loadIndex(
}
return new PartitionIndex(mapBuilder.build(), buckets, targetBucketRowNumber);
}

private int cacheBucketAndGet(int hash, int bucket) {
hash2Bucket.put(hash, (short) bucket);
return bucket;
}
}
@@ -20,9 +20,11 @@

import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.table.sink.KeyAndBucketExtractor;
import org.apache.paimon.utils.Int2ShortHashMap;

import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

@@ -32,14 +34,17 @@ public class SimpleHashBucketAssigner implements BucketAssigner {
private final int numAssigners;
private final int assignId;
private final long targetBucketRowNumber;
private final int maxBucketsNum;

private final Map<BinaryRow, SimplePartitionIndex> partitionIndex;

public SimpleHashBucketAssigner(int numAssigners, int assignId, long targetBucketRowNumber) {
public SimpleHashBucketAssigner(
int numAssigners, int assignId, long targetBucketRowNumber, int maxBucketsNum) {
this.numAssigners = numAssigners;
this.assignId = assignId;
this.targetBucketRowNumber = targetBucketRowNumber;
this.partitionIndex = new HashMap<>();
this.maxBucketsNum = maxBucketsNum;
}

@Override
@@ -71,7 +76,7 @@ private class SimplePartitionIndex {
private int currentBucket;

private SimplePartitionIndex() {
bucketInformation = new HashMap<>();
bucketInformation = new LinkedHashMap<>();
loadNewBucket();
}

@@ -83,7 +88,15 @@ public int assign(int hash) {

Long num = bucketInformation.computeIfAbsent(currentBucket, i -> 0L);
if (num >= targetBucketRowNumber) {
loadNewBucket();
if (-1 != maxBucketsNum && bucketInformation.size() >= maxBucketsNum) {
int bucket =
KeyAndBucketExtractor.bucketWithUpperBound(
bucketInformation.keySet(), hash, maxBucketsNum);
hash2Bucket.put(hash, (short) bucket);
return bucket;
} else {
loadNewBucket();
}
}
bucketInformation.compute(currentBucket, (i, l) -> l == null ? 1L : l + 1);
hash2Bucket.put(hash, (short) currentBucket);
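Note: a usage sketch for the simplified assigner, using the 4-argument constructor from this hunk; the values are illustrative (one assigner, 5 rows per bucket, at most 2 buckets).

import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.index.SimpleHashBucketAssigner;

public class SimpleAssignerDemo {
    static void demo(BinaryRow partition) {
        SimpleHashBucketAssigner assigner = new SimpleHashBucketAssigner(1, 0, 5L, 2);
        // The first 5 distinct hashes fill bucket 0, the next 5 fill bucket 1;
        // after that, new hashes are routed back into one of the 2 existing buckets.
        for (int hash = 0; hash < 20; hash++) {
            int bucket = assigner.assign(partition, hash); // always 0 or 1 here
        }
    }
}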
@@ -21,6 +21,13 @@
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.types.RowKind;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;

import static org.apache.paimon.CoreOptions.DYNAMIC_BUCKET_MAX_BUCKETS_PER_ASSIGNER;
import static org.apache.paimon.utils.Preconditions.checkArgument;

/**
@@ -31,6 +38,7 @@
* @param <T> type of record
*/
public interface KeyAndBucketExtractor<T> {
Logger LOG = LoggerFactory.getLogger(KeyAndBucketExtractor.class);

void setRecord(T record);

@@ -51,4 +59,17 @@ static int bucket(int hashcode, int numBuckets) {
checkArgument(numBuckets > 0, "Num bucket is illegal: " + numBuckets);
return Math.abs(hashcode % numBuckets);
}

static int bucketWithUpperBound(Set<Integer> bucketsSet, int hashcode, int maxBucketsNum) {
checkArgument(maxBucketsNum > 0, "Num max-buckets is illegal: " + maxBucketsNum);
LOG.debug(
"Assign record (hashcode '{}') to new bucket exceed upper bound '{}' defined in '{}', Stop creating new buckets.",
hashcode,
maxBucketsNum,
DYNAMIC_BUCKET_MAX_BUCKETS_PER_ASSIGNER.key());
return bucketsSet.stream()
.skip(ThreadLocalRandom.current().nextInt(maxBucketsNum))
.findFirst()
.orElse(0);
}
}
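Note: the fallback picks a pseudo-random member of the existing bucket set, and callers cache the result per key hash (cacheBucketAndGet / hash2Bucket), so a given hash stays sticky within an assigner. A self-contained illustration of the selection pattern follows; it assumes the set holds exactly maxBucketsNum elements and iterates deterministically, which is what the LinkedHashSet/LinkedHashMap changes in this PR provide.

import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;

public class UpperBoundFallbackDemo {
    public static void main(String[] args) {
        // Buckets already created for a partition, e.g. {0, 2} for assigner 0 of 2.
        Set<Integer> buckets = new LinkedHashSet<>(List.of(0, 2));
        int maxBucketsNum = 2;
        // Mirrors bucketWithUpperBound: skip a random prefix, take the next element.
        int chosen =
                buckets.stream()
                        .skip(ThreadLocalRandom.current().nextInt(maxBucketsNum))
                        .findFirst()
                        .orElse(0);
        System.out.println(chosen); // prints 0 or 2, chosen uniformly at random
    }
}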
@@ -27,9 +27,12 @@
import org.apache.paimon.table.sink.CommitMessageImpl;
import org.apache.paimon.table.sink.StreamTableCommit;

import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

import java.util.Arrays;
import java.util.Collections;
@@ -63,7 +66,21 @@ private HashBucketAssigner createAssigner(int numChannels, int numAssigners, int
numChannels,
numAssigners,
assignId,
5);
5,
-1);
}

private HashBucketAssigner createAssigner(
int numChannels, int numAssigners, int assignId, int maxBucketsNum) {
return new HashBucketAssigner(
table.snapshotManager(),
commitUser,
fileHandler,
numChannels,
numAssigners,
assignId,
5,
maxBucketsNum);
}

@Test
@@ -92,8 +109,52 @@ public void testAssign() {
}

@Test
public void testPartitionCopy() {
HashBucketAssigner assigner = createAssigner(1, 1, 0);
public void testAssignWithUpperBound() {
HashBucketAssigner assigner = createAssigner(2, 2, 0, 2);

// assign
assertThat(assigner.assign(row(1), 0)).isEqualTo(0);
assertThat(assigner.assign(row(1), 2)).isEqualTo(0);
assertThat(assigner.assign(row(1), 4)).isEqualTo(0);
assertThat(assigner.assign(row(1), 6)).isEqualTo(0);
assertThat(assigner.assign(row(1), 8)).isEqualTo(0);

// full
assertThat(assigner.assign(row(1), 10)).isEqualTo(2);
assertThat(assigner.assign(row(1), 12)).isEqualTo(2);
assertThat(assigner.assign(row(1), 14)).isEqualTo(2);
assertThat(assigner.assign(row(1), 16)).isEqualTo(2);
assertThat(assigner.assign(row(1), 18)).isEqualTo(2);

// another partition
assertThat(assigner.assign(row(2), 12)).isEqualTo(0);

// read assigned
assertThat(assigner.assign(row(1), 6)).isEqualTo(0);

// not mine
assertThatThrownBy(() -> assigner.assign(row(1), 1))
.hasMessageContaining("This is a bug, record assign id");

// exceed upper bound
// partition 1
int hash = 18;
for (int i = 0; i < 200; i++) {
int bucket = assigner.assign(row(1), hash += 2);
Assertions.assertThat(bucket).isIn(0, 2);
}
// partition 2
hash = 12;
for (int i = 0; i < 200; i++) {
int bucket = assigner.assign(row(2), hash += 2);
Assertions.assertThat(bucket).isIn(0, 2);
}
}

@ParameterizedTest(name = "maxBucket: {0}")
@ValueSource(ints = {-1, 1, 2})
public void testPartitionCopy(int maxBucketsNum) {
HashBucketAssigner assigner = createAssigner(1, 1, 0, maxBucketsNum);

BinaryRow partition = row(1);
assertThat(assigner.assign(partition, 0)).isEqualTo(0);
@@ -144,6 +205,34 @@ public void testAssignRestore() {
assertThat(assigner0.assign(row(1), 17)).isEqualTo(3);
}

@Test
public void testAssignRestoreWithUpperBound() {
IndexFileMeta bucket0 = fileHandler.writeHashIndex(new int[] {2, 5});
IndexFileMeta bucket2 = fileHandler.writeHashIndex(new int[] {4, 7});
commit.commit(
0,
Arrays.asList(
createCommitMessage(row(1), 0, bucket0),
createCommitMessage(row(1), 2, bucket2)));

HashBucketAssigner assigner0 = createAssigner(3, 3, 0, 1);
HashBucketAssigner assigner2 = createAssigner(3, 3, 2, 1);

// read assigned
assertThat(assigner0.assign(row(1), 2)).isEqualTo(0);
assertThat(assigner2.assign(row(1), 4)).isEqualTo(2);
assertThat(assigner0.assign(row(1), 5)).isEqualTo(0);
assertThat(assigner2.assign(row(1), 7)).isEqualTo(2);

// new assign
assertThat(assigner0.assign(row(1), 8)).isEqualTo(0);
assertThat(assigner0.assign(row(1), 11)).isEqualTo(0);
assertThat(assigner0.assign(row(1), 14)).isEqualTo(0);
assertThat(assigner2.assign(row(1), 16)).isEqualTo(2);
// exceed buckets limits
assertThat(assigner0.assign(row(1), 17)).isEqualTo(0);
}

@Test
public void testAssignDecoupled() {
HashBucketAssigner assigner1 = createAssigner(3, 2, 1);