
Supports Force Committing Segments in Batches #14811

Merged 81 commits into apache:master on Feb 1, 2025

Conversation

@noob-se7en (Contributor) commented Jan 14, 2025

Problem Statement
The Force Commit API can cause high ingestion lag and slower queries because it triggers a simultaneous commit of all consuming segments. This happens because:

  1. If N is the number of partition groups a server is consuming from, the API will cause all N consuming segments to commit. Hence N consumer threads will rush to acquire the segment build semaphore. If the semaphore allows only M permits, only M consuming segments are in the segment build stage while the remaining (N - M) consumer threads wait on the semaphore. While those (N - M) consumer threads are waiting, the consumption lag can become substantial.
  2. Since M consuming segments are built in parallel, queries can become slower on poorly sized servers due to high memory consumption.

Solution
Adds an optional batchSize parameter to the forceCommit API (default = Integer.MAX_VALUE, i.e. commit as many segments as possible at once if no batchSize is provided).

Three new query parameters are added to the forceCommit API (see the example request after the list):

  • batchSize (integer, optional):
    Max number of consuming segments to commit at once.
    Default: Integer.MAX_VALUE
    Example: batchSize=100

  • batchStatusCheckIntervalSec (integer, optional):
    How often (in seconds) to check whether the current batch of segments has been successfully committed.
    Default: 5
    Example: batchStatusCheckIntervalSec=10

  • batchStatusCheckTimeoutSec (integer, optional):
    Timeout (in seconds) after which the controller stops checking the forceCommit status and throws an exception.
    Default: 180
    Example: batchStatusCheckTimeoutSec=300
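
For example, combining all three parameters (the controller host, port, and table name below are hypothetical), a batched force commit could be triggered with:

POST http://localhost:9000/tables/myTable_REALTIME/forceCommit?batchSize=100&batchStatusCheckIntervalSec=10&batchStatusCheckTimeoutSec=300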

@codecov-commenter commented Jan 14, 2025

Codecov Report

Attention: Patch coverage is 48.86364% with 45 lines in your changes missing coverage. Please review.

Project coverage is 63.72%. Comparing base (59551e4) to head (5b78c1b).
Report is 1659 commits behind head on master.

Files with missing lines Patch % Lines
.../core/realtime/PinotLLCRealtimeSegmentManager.java 40.67% 30 Missing and 5 partials ⚠️
...ller/api/resources/PinotRealtimeTableResource.java 0.00% 5 Missing ⚠️
...pinot/spi/utils/retry/AttemptFailureException.java 60.00% 4 Missing ⚠️
...ntroller/api/resources/ForceCommitBatchConfig.java 91.66% 0 Missing and 1 partial ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##             master   #14811      +/-   ##
============================================
+ Coverage     61.75%   63.72%   +1.97%     
- Complexity      207     1482    +1275     
============================================
  Files          2436     2713     +277     
  Lines        133233   152138   +18905     
  Branches      20636    23510    +2874     
============================================
+ Hits          82274    96949   +14675     
- Misses        44911    47902    +2991     
- Partials       6048     7287    +1239     
Flag Coverage Δ
custom-integration1 100.00% <ø> (+99.99%) ⬆️
integration 100.00% <ø> (+99.99%) ⬆️
integration1 100.00% <ø> (+99.99%) ⬆️
integration2 0.00% <ø> (ø)
java-11 63.65% <48.86%> (+1.94%) ⬆️
java-21 63.61% <48.86%> (+1.99%) ⬆️
skip-bytebuffers-false 63.67% <48.86%> (+1.92%) ⬆️
skip-bytebuffers-true 63.59% <48.86%> (+35.87%) ⬆️
temurin 63.72% <48.86%> (+1.97%) ⬆️
unittests 63.72% <48.86%> (+1.97%) ⬆️
unittests1 56.22% <66.66%> (+9.32%) ⬆️
unittests2 34.05% <47.72%> (+6.32%) ⬆️



@Jackie-Jiang (Contributor)

This solves #11950

@Jackie-Jiang (Contributor) left a comment

Seems you are pushing the batch throttling to the server side. What will happen if some replicas decide to commit, and others get throttled? Even worse, could this cause deadlock?

@siddharthteotia (Contributor)

Is it not possible to solve the problem on the controller / coordinate from the controller? Pushing this down to the individual servers will likely lead to error-prone situations.

@noob-se7en (Contributor, Author)

@Jackie-Jiang I don't quite get what is meant by

and others get throttled?

Regarding deadlock or any edge case: the server will use the same logic that /tables/forceCommitStatus/{jobId} uses to check the status of the batch, so there should be no deadlock.

@Jackie-Jiang (Contributor) left a comment

Mostly good

@@ -213,6 +222,7 @@ public PinotLLCRealtimeSegmentManager(PinotHelixResourceManager helixResourceMan
controllerConf.getDeepStoreRetryUploadParallelism()) : null;
_deepStoreUploadExecutorPendingSegments =
_isDeepStoreLLCSegmentUploadRetryEnabled ? ConcurrentHashMap.newKeySet() : null;
_forceCommitExecutorService = Executors.newFixedThreadPool(4);
Contributor:

Having a fixed-size pool could actually cause problems when there are multiple force commit requests. Since it is waiting most of the time, I'd actually suggest making a single-thread pool for each request, same as the last version. It is not on the query path, so the overhead should be fine.
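
A minimal sketch of that suggestion (hypothetical names, not the PR's actual code): create a dedicated single-thread executor per forceCommit request and release it once the status polling finishes.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ForceCommitExecutorSketch {
  // Sketch only: one single-thread executor per request instead of a shared fixed-size pool.
  public static void runStatusCheckAsync(Runnable statusPollingTask) {
    ExecutorService executor =
        Executors.newSingleThreadExecutor(r -> new Thread(r, "force-commit-status-check"));
    try {
      executor.submit(statusPollingTask);
    } finally {
      // shutdown() lets the already-submitted task run to completion, then frees the thread
      executor.shutdown();
    }
  }
}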


Map<String, Map<String, String>> segmentNameToInstanceToStateMap = idealState.getRecord().getMapFields();
for (String segmentName : segmentNameToInstanceToStateMap.keySet()) {
if (!targetConsumingSegments.contains(segmentName)) {
Contributor:

Let's loop over targetConsumingSegments instead of ideal state. Ideal state should always contain targetConsumingSegments because they are extracted from ideal state.

Comment on lines 1965 to 1971
instanceToConsumingSegments.compute(instance, (key, value) -> {
if (value == null) {
value = new LinkedList<>();
}
value.add(segmentName);
return value;
});
Contributor:

Suggested change
instanceToConsumingSegments.compute(instance, (key, value) -> {
if (value == null) {
value = new LinkedList<>();
}
value.add(segmentName);
return value;
});
instanceToConsumingSegments.computeIfAbsent(instance, k -> new LinkedList<>()).add(segmentName);

Comment on lines 1962 to 1963
for (String instance : instanceToStateMap.keySet()) {
String state = instanceToStateMap.get(instance);
Contributor:

Use entrySet() to reduce lookup


while (segmentsRemaining) {
segmentsRemaining = false;
// pick segments in round-robin fashion to parallelize
Contributor:

Smart!

String segmentName = queue.poll();
// there might be a segment replica hosted on
// another instance added before
if (segmentsAdded.contains(segmentName)) {
Contributor:

We can reduce a lookup by

Suggested change
if (segmentsAdded.contains(segmentName)) {
if (!segmentsAdded.add(segmentName)) {

// pick segments in round-robin fashion to parallelize
// forceCommit across max servers
for (Queue<String> queue : instanceToConsumingSegments.values()) {
if (!queue.isEmpty()) {
Contributor:

We can remove the queue when it is empty to avoid checking it again and again. You may use an iterator to remove the entry without an extra lookup.
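
Putting the suggestions above together, a minimal sketch of the round-robin batching (hypothetical method and variable names, not the PR's actual getSegmentBatchList) could look like:

import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;

public class SegmentBatchSketch {
  // Sketch only: round-robin over instances so each batch spreads segments across
  // as many servers as possible. Note: consumes (mutates) the input map.
  public static List<Set<String>> toBatches(Map<String, Queue<String>> instanceToConsumingSegments,
      int batchSize) {
    List<Set<String>> batches = new ArrayList<>();
    Set<String> segmentsAdded = new HashSet<>();  // a replica may be listed under several instances
    Set<String> currentBatch = new HashSet<>();
    while (!instanceToConsumingSegments.isEmpty()) {
      Iterator<Queue<String>> it = instanceToConsumingSegments.values().iterator();
      while (it.hasNext()) {
        Queue<String> queue = it.next();
        String segmentName = queue.poll();
        if (queue.isEmpty()) {
          it.remove();  // drop exhausted instances so they are not re-checked every round
        }
        if (segmentName == null || !segmentsAdded.add(segmentName)) {
          continue;  // empty queue, or segment already scheduled via another replica
        }
        currentBatch.add(segmentName);
        if (currentBatch.size() == batchSize) {
          batches.add(currentBatch);
          currentBatch = new HashSet<>();
        }
      }
    }
    if (!currentBatch.isEmpty()) {
      batches.add(currentBatch);
    }
    return batches;
  }
}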


try {
Thread.sleep(FORCE_COMMIT_STATUS_CHECK_INTERVAL_MS);
} catch (InterruptedException ignored) {
Contributor:

Ignoring the interrupt could be risky (holding a long-running thread). Let's wrap it in a RuntimeException and throw it. We may log an error when catching it.
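
A minimal sketch of the suggested handling (hypothetical helper, not the PR's exact code):

public class InterruptHandlingSketch {
  // Sketch only: re-assert the interrupt flag and surface the failure as a
  // RuntimeException instead of silently swallowing it in a long-running polling loop.
  static void sleepBetweenStatusChecks(long intervalMs) {
    try {
      Thread.sleep(intervalMs);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new RuntimeException("Interrupted while waiting for force-commit batch to complete", e);
    }
  }
}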

}

int attemptCount = 0;
final Set<String>[] segmentsYetToBeCommitted = new Set[]{new HashSet<>()};
Contributor:

Suggested change
final Set<String>[] segmentsYetToBeCommitted = new Set[]{new HashSet<>()};
final Set<String>[] segmentsYetToBeCommitted = new Set[1];

@@ -152,6 +157,9 @@ public class PinotLLCRealtimeSegmentManager {

// Max time to wait for all LLC segments to complete committing their metadata while stopping the controller.
private static final long MAX_LLC_SEGMENT_METADATA_COMMIT_TIME_MILLIS = 30_000L;
private static final int FORCE_COMMIT_STATUS_CHECK_INTERVAL_MS = 15000;
Contributor:

Let's take the check interval also from the REST API, because different use cases might want different intervals; we might also want to add a timeout and take that from the REST API as well. The retry count can be calculated from the timeout and the interval.
We can provide default values (e.g. 5s, 3min) in case they are not provided. IMO a 15s interval is too long, because it means we will wait at least 15s for each batch.

Contributor (Author):

Sure.
(The default case (5s interval, 3m timeout) might be a little expensive, as we are waiting for segment builds to complete and there will be at most 180s / 5s = 36 status calls to ZK per batch.)

@sajjad-moradi (Contributor) left a comment

How are we going to come up with a good value for the batch size param?
Since we only want M segment commits on each server, maybe the controller can decide which servers commit which partitions, so that for each batch one server commits at most M segments?

Comment on lines 1918 to 1919
LOGGER.error(errorMsg, e);
throw new RuntimeException(e);
Contributor:

If an exception is thrown, there's no need to log. Add the errorMsg to the runtime exception.

Contributor:

We do need to log the message. This is running in a thread, and we are not handling the exception

int maxAttempts = (batchStatusCheckTimeoutMs + batchStatusCheckIntervalMs - 1) / batchStatusCheckIntervalMs;
RetryPolicy retryPolicy =
RetryPolicies.fixedDelayRetryPolicy(maxAttempts, batchStatusCheckIntervalMs);
int attemptCount = 0;
Contributor:

No need for this variable. Both AttemptsExceededException and RetriableOperationException have a getAttempts() method.

@noob-se7en (Contributor, Author) commented Jan 30, 2025

How are we going to come up with a good value for batch size param?

Hmm... IMO it should be <= _serverConfig.getProperty(MAX_PARALLEL_SEGMENT_BUILDS) * num_of_server_instances. To begin with, we can start with this.

A more accurate value might be ~min(_numPartitions, _serverConfig.getProperty(MAX_PARALLEL_SEGMENT_BUILDS) * (num_of_server_instances - RF)).
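
For example, with hypothetical values MAX_PARALLEL_SEGMENT_BUILDS = 4, 10 server instances, a replication factor of 3, and 64 partitions, that works out to min(64, 4 * (10 - 3)) = 28.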

@Jackie-Jiang (Contributor) left a comment

LGTM otherwise

private final int _batchStatusCheckIntervalMs;
private final int _batchStatusCheckTimeoutMs;

private ForceCommitBatchConfig(Integer batchSize, Integer batchStatusCheckIntervalMs,
Contributor:

Add a @Nullable annotation for parameters that can be null. Same for other places.

Contributor (Author):

This method has int params only now

sendForceCommitMessageToServers(tableNameWithType, targetConsumingSegments);

List<Set<String>> segmentBatchList =
getSegmentBatchList(idealState, targetConsumingSegments, forceCommitBatchConfig.getBatchSize());
Contributor:

To reduce overhead, can we use the old way when batch size is non-positive?

Contributor:

Actually we can still keep a positive-only batch size, but first check whether batch size >= targetConsumingSegments.size() and fall back.

Comment on lines 173 to 175
@ApiParam(value = "Max number of consuming segments to commit at once (default = Integer.MAX_VALUE)")
@QueryParam("batchSize")
Integer batchSize,
Contributor:

We can directly put default in the API:

Suggested change
@ApiParam(value = "Max number of consuming segments to commit at once (default = Integer.MAX_VALUE)")
@QueryParam("batchSize")
Integer batchSize,
@ApiParam(value = "Max number of consuming segments to commit at once (default = Integer.MAX_VALUE)")
@QueryParam("batchSize") @DefaultValue(Integer.toString(Integer.MAX_VALUE))
int batchSize,

@noob-se7en (Contributor, Author) commented Jan 31, 2025

We can only put a compile-time constant here, hence this won't work.
Refactoring Integer to int and treating 0 as Integer.MAX_VALUE.
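
A minimal sketch of what that refactor could look like (hypothetical resource class and path, not necessarily the PR's exact code):

import javax.ws.rs.DefaultValue;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.QueryParam;

@Path("/tables")
public class ForceCommitResourceSketch {
  // Sketch only: @DefaultValue needs a compile-time constant string, so the param
  // defaults to "0" and 0 is then treated as Integer.MAX_VALUE (commit everything at once).
  @POST
  @Path("/{tableName}/forceCommit")
  public String forceCommit(@PathParam("tableName") String tableName,
      @QueryParam("batchSize") @DefaultValue("0") int batchSize) {
    int effectiveBatchSize = batchSize > 0 ? batchSize : Integer.MAX_VALUE;
    return "forceCommit scheduled with batchSize=" + effectiveBatchSize;
  }
}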

@noob-se7en (Contributor, Author)

The integration test seems to be flaky, and I am unable to reproduce it locally. We might have to revert that test in the future.

@Jackie-Jiang merged commit 6747ad0 into apache:master on Feb 1, 2025.
20 of 21 checks passed