Skip to content

Commit

Permalink
small refractor
Browse files Browse the repository at this point in the history
  • Loading branch information
Zihan Li committed Nov 15, 2023
1 parent 76696ca commit 807027f
Showing 1 changed file with 1 addition and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,6 @@ public class KafkaStreamingExtractor<S> extends FlushingExtractor<S, DecodeableK
public static final Long DEFAULT_MAX_KAFKA_BUFFER_SIZE_IN_BYTES = Long.valueOf(50 * 1024 * 1024);
// Max number of records to be pulled in single polling.
private static final String KAFKA_MAX_POLL_RECORDS_KEY = "kafka.consumer.maxPollRecords";
// Changed from Integer.MAX_VALUE to 1 to workaround and issue with lack of error handling support in LiKafka10
private static final int DEFAULT_MAX_POLL_RECORDS = 100;
private static final Long MAX_LOG_ERRORS = 100L;

Expand Down Expand Up @@ -232,7 +231,7 @@ public KafkaStreamingExtractor(WorkUnitState state) {
} else {
// As there is no avg record size available, using lower number to make sure we don't hit OOM issue
state.setProp(KAFKA_MAX_POLL_RECORDS_KEY, DEFAULT_MAX_POLL_RECORDS);
log.info("set max.poll.records to be 100");
log.info("set max.poll.records to be {}", DEFAULT_MAX_POLL_RECORDS);
}
this.kafkaConsumerClientResolver =
new ClassAliasResolver<>(GobblinKafkaConsumerClient.GobblinKafkaConsumerClientFactory.class);
Expand Down

0 comments on commit 807027f

Please sign in to comment.