Kafka source offset-based deduplication. (#33596)
tomstepp authored Feb 3, 2025
1 parent edf7c90 commit 8cbccc9
Showing 11 changed files with 200 additions and 32 deletions.
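For orientation, a minimal sketch of how a pipeline might opt into the new behavior. The broker address, topic, deserializers, and key count are placeholders, not values from this commit; the withRedistribute()/withAllowDuplicates(false)/withRedistributeNumKeys()/withOffsetDeduplication(true) calls are the ones this commit wires together, and the combination must satisfy checkRedistributeConfiguration() below (offset deduplication is also registered as legacy-read-only in KafkaIOReadImplementationCompatibility).

// Hypothetical usage sketch; broker, topic, and numKeys are illustrative placeholders.
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class OffsetDedupExample {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    PCollection<KafkaRecord<Long, String>> records =
        p.apply(
            KafkaIO.<Long, String>read()
                .withBootstrapServers("broker-1:9092") // placeholder
                .withTopic("events") // placeholder
                .withKeyDeserializer(LongDeserializer.class)
                .withValueDeserializer(StringDeserializer.class)
                // Offset-based deduplication requires redistribute without duplicates
                // (see checkRedistributeConfiguration() in this commit).
                .withRedistribute()
                .withAllowDuplicates(false)
                .withRedistributeNumKeys(32) // illustrative value
                .withOffsetDeduplication(true));

    p.run().waitUntilFinish();
  }
}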
@@ -17,6 +17,8 @@
*/
package org.apache.beam.sdk.io.kafka;

import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;

import java.io.Serializable;
import java.util.List;
import java.util.Optional;
@@ -42,6 +44,8 @@ public class KafkaCheckpointMark implements UnboundedSource.CheckpointMark {
@SuppressWarnings("initialization") // Avro will set the fields by breaking abstraction
private KafkaCheckpointMark() {} // for Avro

private static final long OFFSET_DEDUP_PARTITIONS_PER_SPLIT = 1;

public KafkaCheckpointMark(
List<PartitionMark> partitions, Optional<KafkaUnboundedReader<?, ?>> reader) {
this.partitions = partitions;
@@ -66,6 +70,23 @@ public String toString() {
return "KafkaCheckpointMark{partitions=" + Joiner.on(",").join(partitions) + '}';
}

@Override
public byte[] getOffsetLimit() {
if (!reader.isPresent()) {
throw new RuntimeException(
"KafkaCheckpointMark reader is not present while calling getOffsetLimit().");
}
if (!reader.get().offsetBasedDeduplicationSupported()) {
throw new RuntimeException(
"Unexpected getOffsetLimit() called while KafkaUnboundedReader not configured for offset deduplication.");
}

// KafkaUnboundedSource.split() must produce a 1:1 partition to split ratio.
checkState(partitions.size() == OFFSET_DEDUP_PARTITIONS_PER_SPLIT);
PartitionMark partition = partitions.get(/* index= */ 0);
return KafkaIOUtils.OffsetBasedDeduplication.encodeOffset(partition.getNextOffset());
}

/**
* A tuple to hold topic, partition, and offset that comprise the checkpoint for a single
* partition.
@@ -717,6 +717,9 @@ public abstract static class Read<K, V>
@Pure
public abstract int getRedistributeNumKeys();

@Pure
public abstract @Nullable Boolean getOffsetDeduplication();

@Pure
public abstract @Nullable Duration getWatchTopicPartitionDuration();

@@ -782,6 +785,8 @@ abstract Builder<K, V> setConsumerFactoryFn(

abstract Builder<K, V> setRedistributeNumKeys(int redistributeNumKeys);

abstract Builder<K, V> setOffsetDeduplication(Boolean offsetDeduplication);

abstract Builder<K, V> setTimestampPolicyFactory(
TimestampPolicyFactory<K, V> timestampPolicyFactory);

@@ -886,11 +891,16 @@ static <K, V> void setupExternalBuilder(
if (config.allowDuplicates != null) {
builder.setAllowDuplicates(config.allowDuplicates);
}

if (config.redistribute
&& (config.allowDuplicates == null || !config.allowDuplicates)
&& config.offsetDeduplication != null) {
builder.setOffsetDeduplication(config.offsetDeduplication);
}
} else {
builder.setRedistributed(false);
builder.setRedistributeNumKeys(0);
builder.setAllowDuplicates(false);
builder.setOffsetDeduplication(false);
}
}

@@ -959,6 +969,7 @@ public static class Configuration {
private Integer redistributeNumKeys;
private Boolean redistribute;
private Boolean allowDuplicates;
private Boolean offsetDeduplication;

public void setConsumerConfig(Map<String, String> consumerConfig) {
this.consumerConfig = consumerConfig;
@@ -1015,6 +1026,10 @@ public void setRedistribute(Boolean redistribute) {
public void setAllowDuplicates(Boolean allowDuplicates) {
this.allowDuplicates = allowDuplicates;
}

public void setOffsetDeduplication(Boolean offsetDeduplication) {
this.offsetDeduplication = offsetDeduplication;
}
}
}

@@ -1066,26 +1081,21 @@ public Read<K, V> withTopicPartitions(List<TopicPartition> topicPartitions) {
* Sets redistribute transform that hints to the runner to try to redistribute the work evenly.
*/
public Read<K, V> withRedistribute() {
if (getRedistributeNumKeys() == 0 && isRedistributed()) {
LOG.warn("This will create a key per record, which is sub-optimal for most use cases.");
}
return toBuilder().setRedistributed(true).build();
}

public Read<K, V> withAllowDuplicates(Boolean allowDuplicates) {
if (!isAllowDuplicates()) {
LOG.warn("Setting this value without setting withRedistribute() will have no effect.");
}
return toBuilder().setAllowDuplicates(allowDuplicates).build();
}

public Read<K, V> withRedistributeNumKeys(int redistributeNumKeys) {
checkState(
isRedistributed(),
"withRedistributeNumKeys is ignored if withRedistribute() is not enabled on the transform.");
return toBuilder().setRedistributeNumKeys(redistributeNumKeys).build();
}

public Read<K, V> withOffsetDeduplication(Boolean offsetDeduplication) {
return toBuilder().setOffsetDeduplication(offsetDeduplication).build();
}

/**
* Internally sets a {@link java.util.regex.Pattern} of topics to read from. All the partitions
* from each of the matching topics are read.
@@ -1541,6 +1551,9 @@ public PCollection<KafkaRecord<K, V>> expand(PBegin input) {
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
}
}

checkRedistributeConfiguration();

warnAboutUnsafeConfigurations(input);

// Infer key/value coders if not specified explicitly
@@ -1573,6 +1586,26 @@ && runnerPrefersLegacyRead(input.getPipeline().getOptions()))) {
return input.apply(new ReadFromKafkaViaSDF<>(this, keyCoder, valueCoder));
}

private void checkRedistributeConfiguration() {
if (getRedistributeNumKeys() == 0 && isRedistributed()) {
LOG.warn(
"withRedistribute without withRedistributeNumKeys will create a key per record, which is sub-optimal for most use cases.");
}
if (isAllowDuplicates()) {
LOG.warn("Setting this value without setting withRedistribute() will have no effect.");
}
if (getRedistributeNumKeys() > 0) {
checkState(
isRedistributed(),
"withRedistributeNumKeys is ignored if withRedistribute() is not enabled on the transform.");
}
if (getOffsetDeduplication() != null && getOffsetDeduplication()) {
checkState(
isRedistributed() && !isAllowDuplicates(),
"withOffsetDeduplication should only be used with withRedistribute and withAllowDuplicates(false).");
}
}

private void warnAboutUnsafeConfigurations(PBegin input) {
Long checkpointingInterval =
input
@@ -137,6 +137,7 @@ Object getDefaultValue() {
return false;
}
},
OFFSET_DEDUPLICATION(LEGACY),
;

private final @NonNull ImmutableSet<KafkaIOReadImplementation> supportedImplementations;
@@ -19,11 +19,13 @@

import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.primitives.Longs;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -142,4 +144,15 @@ void update(double quantity) {
return avg;
}
}

static final class OffsetBasedDeduplication {

static byte[] encodeOffset(long offset) {
return Longs.toByteArray(offset);
}

static byte[] getUniqueId(String topic, int partition, long offset) {
return (topic + "-" + partition + "-" + offset).getBytes(StandardCharsets.UTF_8);
}
}
}
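The two helpers above fully determine the wire format of the deduplication metadata: the offset limit is the 8-byte big-endian encoding of the offset (so lexicographic byte comparison preserves ordering for non-negative Kafka offsets), and the unique id is "<topic>-<partition>-<offset>" as UTF-8 bytes. A standalone sketch of the same logic (the helper class itself is package-private), with a hypothetical topic, partition, and offset:

// Standalone illustration of the OffsetBasedDeduplication helpers; values are placeholders.
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.primitives.Longs;

public class DedupIdDemo {
  public static void main(String[] args) {
    long offset = 42L;

    // encodeOffset: 8-byte big-endian encoding of the offset.
    byte[] limit = Longs.toByteArray(offset);
    System.out.println(Arrays.toString(limit)); // [0, 0, 0, 0, 0, 0, 0, 42]

    // getUniqueId: "<topic>-<partition>-<offset>" as UTF-8 bytes.
    byte[] id = ("events" + "-" + 3 + "-" + offset).getBytes(StandardCharsets.UTF_8);
    System.out.println(new String(id, StandardCharsets.UTF_8)); // events-3-42
  }
}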
@@ -66,6 +66,7 @@
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.Deserializer;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Duration;
import org.joda.time.Instant;
@@ -299,6 +300,30 @@ public Instant getCurrentTimestamp() throws NoSuchElementException {
return curTimestamp;
}

@Override
public byte[] getCurrentRecordId() throws NoSuchElementException {
if (!offsetBasedDeduplicationSupported()) {
// Defer result to super if offset deduplication is not supported.
return super.getCurrentRecordId();
}
if (curRecord == null) {
throw new NoSuchElementException("KafkaUnboundedReader's curRecord is null.");
}
return KafkaIOUtils.OffsetBasedDeduplication.getUniqueId(
curRecord.getTopic(), curRecord.getPartition(), curRecord.getOffset());
}

@Override
public byte[] getCurrentRecordOffset() throws NoSuchElementException {
if (!offsetBasedDeduplicationSupported()) {
throw new RuntimeException("UnboundedSource must enable offset-based deduplication.");
}
if (curRecord == null) {
throw new NoSuchElementException("KafkaUnboundedReader's curRecord is null.");
}
return KafkaIOUtils.OffsetBasedDeduplication.encodeOffset(curRecord.getOffset());
}

@Override
public long getSplitBacklogBytes() {
long backlogBytes = 0;
@@ -313,6 +338,10 @@ public long getSplitBacklogBytes() {
return backlogBytes;
}

public boolean offsetBasedDeduplicationSupported() {
return source.offsetBasedDeduplicationSupported();
}

////////////////////////////////////////////////////////////////////////////////////////////////

private static final Logger LOG = LoggerFactory.getLogger(KafkaUnboundedReader.class);
@@ -331,8 +360,8 @@ public long getSplitBacklogBytes() {
private final String name;
private @Nullable Consumer<byte[], byte[]> consumer = null;
private final List<PartitionState<K, V>> partitionStates;
private @Nullable KafkaRecord<K, V> curRecord = null;
private @Nullable Instant curTimestamp = null;
private @MonotonicNonNull KafkaRecord<K, V> curRecord = null;
private @MonotonicNonNull Instant curTimestamp = null;
private Iterator<PartitionState<K, V>> curBatch = Collections.emptyIterator();

private @Nullable Deserializer<K> keyDeserializerInstance = null;
@@ -113,10 +113,20 @@ public List<KafkaUnboundedSource<K, V>> split(int desiredNumSplits, PipelineOptions
partitions.size() > 0,
"Could not find any partitions. Please check Kafka configuration and topic names");

int numSplits = Math.min(desiredNumSplits, partitions.size());
// XXX make all splits have the same # of partitions
while (partitions.size() % numSplits > 0) {
++numSplits;
int numSplits;
if (offsetBasedDeduplicationSupported()) {
// Enforce 1:1 split to partition ratio for offset deduplication.
numSplits = partitions.size();
LOG.info(
"Offset-based deduplication is enabled for KafkaUnboundedSource. "
+ "Forcing the number of splits to equal the number of total partitions: {}.",
numSplits);
} else {
numSplits = Math.min(desiredNumSplits, partitions.size());
// Make all splits have the same # of partitions.
while (partitions.size() % numSplits > 0) {
++numSplits;
}
}
List<List<TopicPartition>> assignments = new ArrayList<>(numSplits);

@@ -177,6 +187,11 @@ public boolean requiresDeduping() {
return false;
}

@Override
public boolean offsetBasedDeduplicationSupported() {
return spec.getOffsetDeduplication() != null && spec.getOffsetDeduplication();
}

@Override
public Coder<KafkaRecord<K, V>> getOutputCoder() {
Coder<K> keyCoder = Preconditions.checkStateNotNull(spec.getKeyCoder());
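The split() change above is what the checkState in KafkaCheckpointMark.getOffsetLimit() depends on: with offset deduplication enabled, every split holds exactly one partition. A standalone paraphrase of the split-count arithmetic, with a hypothetical class name, for illustration only:

// Paraphrase of the numSplits logic in KafkaUnboundedSource.split(); not part of the commit.
public class SplitCountSketch {
  static int numSplits(int desiredNumSplits, int numPartitions, boolean offsetDedup) {
    if (offsetDedup) {
      // 1:1 split-to-partition ratio, so each checkpoint carries exactly one
      // partition's next offset (see KafkaCheckpointMark.getOffsetLimit()).
      return numPartitions;
    }
    // Otherwise: start from min(desired, partitions) and grow until every split
    // gets the same number of partitions.
    int numSplits = Math.min(desiredNumSplits, numPartitions);
    while (numPartitions % numSplits > 0) {
      ++numSplits;
    }
    return numSplits;
  }

  public static void main(String[] args) {
    System.out.println(numSplits(4, 10, false)); // 5 splits of 2 partitions each
    System.out.println(numSplits(4, 10, true));  // 10 splits, one partition each
  }
}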
@@ -145,7 +145,6 @@ public void testConstructKafkaRead() throws Exception {
expansionService.expand(request, observer);
ExpansionApi.ExpansionResponse result = observer.result;
RunnerApi.PTransform transform = result.getTransform();
System.out.println("xxx : " + result.toString());
assertThat(
transform.getSubtransformsList(),
Matchers.hasItem(MatchesPattern.matchesPattern(".*KafkaIO-Read.*")));
@@ -18,6 +18,7 @@
package org.apache.beam.sdk.io.kafka;

import static org.apache.beam.sdk.io.kafka.KafkaIOTest.mkKafkaReadTransform;
import static org.apache.beam.sdk.io.kafka.KafkaIOTest.mkKafkaReadTransformWithOffsetDedup;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.empty;
@@ -114,7 +115,8 @@ private PipelineResult testReadTransformCreationWithImplementationBoundProperties
new ValueAsTimestampFn(),
false, /*redistribute*/
false, /*allowDuplicates*/
0)));
0, /*numKeys*/
null /*offsetDeduplication*/)));
return p.run();
}

@@ -139,6 +141,17 @@ public void testReadTransformCreationWithLegacyImplementationBoundProperty() {
assertThat(Lineage.query(r.metrics(), Lineage.Type.SOURCE), containsInAnyOrder(expect));
}

@Test
public void testReadTransformCreationWithOffsetDeduplication() {
p.apply(mkKafkaReadTransformWithOffsetDedup(1000, new ValueAsTimestampFn()));
PipelineResult r = p.run();
String[] expect =
KafkaIOTest.mkKafkaTopics.stream()
.map(topic -> String.format("kafka:`%s`.%s", KafkaIOTest.mkKafkaServers, topic))
.toArray(String[]::new);
assertThat(Lineage.query(r.metrics(), Lineage.Type.SOURCE), containsInAnyOrder(expect));
}

@Test
public void testReadTransformCreationWithSdfImplementationBoundProperty() {
PipelineResult r =