Supports Force Committing Segments in Batches #14811
ForceCommitBatchConfig.java (new file)
@@ -0,0 +1,72 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.pinot.controller.api.resources;


public class ForceCommitBatchConfig {

  private static final int DEFAULT_BATCH_SIZE = Integer.MAX_VALUE;
  private static final int DEFAULT_BATCH_STATUS_CHECK_INTERVAL_SEC = 5;
  private static final int DEFAULT_BATCH_STATUS_CHECK_TIMEOUT_SEC = 180;

  private final int _batchSize;
  private final int _batchStatusCheckIntervalMs;
  private final int _batchStatusCheckTimeoutMs;

  private ForceCommitBatchConfig(Integer batchSize, Integer batchStatusCheckIntervalMs,
      Integer batchStatusCheckTimeoutMs) {
    _batchSize = batchSize;
    _batchStatusCheckIntervalMs = batchStatusCheckIntervalMs;
    _batchStatusCheckTimeoutMs = batchStatusCheckTimeoutMs;
  }

  public static ForceCommitBatchConfig of(Integer batchSize, Integer batchStatusCheckIntervalSec,
      Integer batchStatusCheckTimeoutSec) {
    if (batchSize == null) {
      batchSize = DEFAULT_BATCH_SIZE;
    } else if (batchSize <= 0) {
      throw new IllegalArgumentException("Batch size should be greater than zero");
    }

    if (batchStatusCheckIntervalSec == null) {
      batchStatusCheckIntervalSec = DEFAULT_BATCH_STATUS_CHECK_INTERVAL_SEC;
    } else if (batchStatusCheckIntervalSec <= 0) {
      throw new IllegalArgumentException("Batch status check interval should be greater than zero");
    }

    if (batchStatusCheckTimeoutSec == null) {
      batchStatusCheckTimeoutSec = DEFAULT_BATCH_STATUS_CHECK_TIMEOUT_SEC;
    } else if (batchStatusCheckTimeoutSec <= 0) {
      throw new IllegalArgumentException("Batch status check timeout should be greater than zero");
    }

    return new ForceCommitBatchConfig(batchSize, batchStatusCheckIntervalSec * 1000, batchStatusCheckTimeoutSec * 1000);
  }

  public int getBatchSize() {
    return _batchSize;
  }

  public int getBatchStatusCheckIntervalMs() {
    return _batchStatusCheckIntervalMs;
  }

  public int getBatchStatusCheckTimeoutMs() {
    return _batchStatusCheckTimeoutMs;
  }
}
Controller REST resource defining the forceCommit endpoint
@@ -169,19 +169,40 @@ public Map<String, String> forceCommit(
@ApiParam(value = "Comma separated list of partition group IDs to be committed") @QueryParam("partitions") | ||||||||||||||
String partitionGroupIds, | ||||||||||||||
@ApiParam(value = "Comma separated list of consuming segments to be committed") @QueryParam("segments") | ||||||||||||||
String consumingSegments, @Context HttpHeaders headers) { | ||||||||||||||
String consumingSegments, | ||||||||||||||
@ApiParam(value = "Max number of consuming segments to commit at once (default = Integer.MAX_VALUE)") | ||||||||||||||
@QueryParam("batchSize") | ||||||||||||||
Integer batchSize, | ||||||||||||||

Review comment: We can directly put the default in the API (suggested change).
Reply: We can only put a compile-time constant here, hence this won't work.

@ApiParam(value = "How often to check whether the current batch of segments have been successfully committed or" | ||||||||||||||
+ " not (default = 5)") | ||||||||||||||
@QueryParam("batchStatusCheckIntervalSec") | ||||||||||||||
Integer batchStatusCheckIntervalSec, | ||||||||||||||
@ApiParam(value = "Timeout based on which the controller will stop checking the forceCommit status of the batch" | ||||||||||||||
+ " of segments and throw an exception. (default = 180)") | ||||||||||||||
@QueryParam("batchStatusCheckTimeoutSec") | ||||||||||||||
Integer batchStatusCheckTimeoutSec, | ||||||||||||||
@Context HttpHeaders headers) { | ||||||||||||||
tableName = DatabaseUtils.translateTableName(tableName, headers); | ||||||||||||||
if (partitionGroupIds != null && consumingSegments != null) { | ||||||||||||||
throw new ControllerApplicationException(LOGGER, "Cannot specify both partitions and segments to commit", | ||||||||||||||
Response.Status.BAD_REQUEST); | ||||||||||||||
} | ||||||||||||||
ForceCommitBatchConfig forceCommitBatchConfig; | ||||||||||||||
try { | ||||||||||||||
forceCommitBatchConfig = | ||||||||||||||
ForceCommitBatchConfig.of(batchSize, batchStatusCheckIntervalSec, batchStatusCheckTimeoutSec); | ||||||||||||||
} catch (Exception e) { | ||||||||||||||
throw new ControllerApplicationException(LOGGER, "Invalid batch config", | ||||||||||||||
Response.Status.BAD_REQUEST); | ||||||||||||||
} | ||||||||||||||
long startTimeMs = System.currentTimeMillis(); | ||||||||||||||
String tableNameWithType = TableNameBuilder.REALTIME.tableNameWithType(tableName); | ||||||||||||||
validateTable(tableNameWithType); | ||||||||||||||
Map<String, String> response = new HashMap<>(); | ||||||||||||||
try { | ||||||||||||||
Set<String> consumingSegmentsForceCommitted = | ||||||||||||||
_pinotLLCRealtimeSegmentManager.forceCommit(tableNameWithType, partitionGroupIds, consumingSegments); | ||||||||||||||
_pinotLLCRealtimeSegmentManager.forceCommit(tableNameWithType, partitionGroupIds, consumingSegments, | ||||||||||||||
forceCommitBatchConfig); | ||||||||||||||
response.put("forceCommitStatus", "SUCCESS"); | ||||||||||||||
try { | ||||||||||||||
String jobId = UUID.randomUUID().toString(); | ||||||||||||||
|
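
For context, here is a hypothetical client-side invocation of the extended endpoint. The controller host, table name, and parameter values are illustrative only, and it assumes the existing POST /tables/{tableName}/forceCommit path; this is a sketch, not part of the PR:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class ForceCommitBatchRequestExample {
  public static void main(String[] args) throws Exception {
    // Hypothetical controller and table; the three query parameters are the ones added in this PR.
    String url = "http://localhost:9000/tables/myTable/forceCommit"
        + "?batchSize=10&batchStatusCheckIntervalSec=10&batchStatusCheckTimeoutSec=300";
    HttpRequest request = HttpRequest.newBuilder(URI.create(url))
        .POST(HttpRequest.BodyPublishers.noBody())
        .build();
    HttpResponse<String> response = HttpClient.newHttpClient()
        .send(request, HttpResponse.BodyHandlers.ofString());
    // On success the handler puts "forceCommitStatus": "SUCCESS" (plus a job id) into the response body.
    System.out.println(response.statusCode() + " " + response.body());
  }
}
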
PinotLLCRealtimeSegmentManager.java
@@ -28,11 +28,15 @@
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Random;
import java.util.Set;
import java.util.TreeSet;

@@ -72,6 +76,7 @@
import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.controller.api.events.MetadataEventNotifierFactory;
import org.apache.pinot.controller.api.resources.Constants;
import org.apache.pinot.controller.api.resources.ForceCommitBatchConfig;
import org.apache.pinot.controller.api.resources.PauseStatusDetails;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.helix.core.PinotTableIdealStateBuilder;

@@ -117,7 +122,10 @@
import org.apache.pinot.spi.utils.StringUtil;
import org.apache.pinot.spi.utils.TimeUtils;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.pinot.spi.utils.retry.AttemptsExceededException;
import org.apache.pinot.spi.utils.retry.RetriableOperationException;
import org.apache.pinot.spi.utils.retry.RetryPolicies;
import org.apache.pinot.spi.utils.retry.RetryPolicy;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -189,6 +197,7 @@ public class PinotLLCRealtimeSegmentManager {
  private final AtomicInteger _numCompletingSegments = new AtomicInteger(0);
  private final ExecutorService _deepStoreUploadExecutor;
  private final Set<String> _deepStoreUploadExecutorPendingSegments;
  private final ExecutorService _forceCommitExecutorService;

  private volatile boolean _isStopping = false;

@@ -213,6 +222,7 @@ public PinotLLCRealtimeSegmentManager(PinotHelixResourceManager helixResourceMan
            controllerConf.getDeepStoreRetryUploadParallelism()) : null;
    _deepStoreUploadExecutorPendingSegments =
        _isDeepStoreLLCSegmentUploadRetryEnabled ? ConcurrentHashMap.newKeySet() : null;
    _forceCommitExecutorService = Executors.newCachedThreadPool();
  }

  public boolean isDeepStoreLLCSegmentUploadRetryEnabled() {

@@ -309,6 +319,8 @@ public void stop() {
        LOGGER.error("Failed to close fileUploadDownloadClient.");
      }
    }

    _forceCommitExecutorService.shutdown();
  }

  /**
@@ -1848,15 +1860,130 @@ private boolean isTmpAndCanDelete(String filePath, Set<String> downloadUrls, Pin
   * @return the set of consuming segments for which commit was initiated
   */
  public Set<String> forceCommit(String tableNameWithType, @Nullable String partitionGroupIdsToCommit,
-     @Nullable String segmentsToCommit) {
+     @Nullable String segmentsToCommit, ForceCommitBatchConfig forceCommitBatchConfig) {
    IdealState idealState = getIdealState(tableNameWithType);
    Set<String> allConsumingSegments = findConsumingSegments(idealState);
    Set<String> targetConsumingSegments = filterSegmentsToCommit(allConsumingSegments, partitionGroupIdsToCommit,
        segmentsToCommit);
-   sendForceCommitMessageToServers(tableNameWithType, targetConsumingSegments);

    List<Set<String>> segmentBatchList =
        getSegmentBatchList(idealState, targetConsumingSegments, forceCommitBatchConfig.getBatchSize());

Review comment: To reduce overhead, can we use the old way when batch size is non-positive?
Follow-up: Actually, we can still keep a positive-only batch size, but first check if batch size >= …

    _forceCommitExecutorService.submit(
        () -> processBatchesSequentially(segmentBatchList, tableNameWithType, forceCommitBatchConfig));

    return targetConsumingSegments;
  }
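
One possible reading of the reviewer's follow-up above, sketched as an early return near the top of forceCommit (the comparison target is an assumption, since the comment is cut off; the identifiers are the ones already used in the method, and this code is not part of the PR):

    // Hypothetical short-circuit: if one batch already covers every target segment,
    // keep the pre-batching behavior of a single force-commit message and skip the batching path.
    if (forceCommitBatchConfig.getBatchSize() >= targetConsumingSegments.size()) {
      sendForceCommitMessageToServers(tableNameWithType, targetConsumingSegments);
      return targetConsumingSegments;
    }
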

  private void processBatchesSequentially(List<Set<String>> segmentBatchList, String tableNameWithType,
      ForceCommitBatchConfig forceCommitBatchConfig) {
    Set<String> prevBatch = null;
    for (Set<String> segmentBatchToCommit : segmentBatchList) {
      if (prevBatch != null) {
        waitUntilPrevBatchIsComplete(tableNameWithType, prevBatch, forceCommitBatchConfig);
      }
      sendForceCommitMessageToServers(tableNameWithType, segmentBatchToCommit);
      prevBatch = segmentBatchToCommit;
    }
  }

  private void waitUntilPrevBatchIsComplete(String tableNameWithType, Set<String> segmentBatchToCommit,
      ForceCommitBatchConfig forceCommitBatchConfig) {
    int batchStatusCheckIntervalMs = forceCommitBatchConfig.getBatchStatusCheckIntervalMs();
    int batchStatusCheckTimeoutMs = forceCommitBatchConfig.getBatchStatusCheckTimeoutMs();

    try {
      Thread.sleep(batchStatusCheckIntervalMs);
    } catch (InterruptedException e) {
      LOGGER.error("Exception occurred while waiting for the forceCommit of segments: {}", segmentBatchToCommit, e);
      throw new RuntimeException(e);
    }

    int maxAttempts = (batchStatusCheckTimeoutMs + batchStatusCheckIntervalMs - 1) / batchStatusCheckIntervalMs;
    RetryPolicy retryPolicy = RetryPolicies.fixedDelayRetryPolicy(maxAttempts, batchStatusCheckIntervalMs);
    final Set<String>[] segmentsYetToBeCommitted = new Set[1];

    try {
      retryPolicy.attempt(() -> {
        segmentsYetToBeCommitted[0] = getSegmentsYetToBeCommitted(tableNameWithType, segmentBatchToCommit);
        return segmentsYetToBeCommitted[0].isEmpty();
      });
    } catch (AttemptsExceededException | RetriableOperationException e) {
      int attemptCount;
      if (e instanceof AttemptsExceededException) {
        attemptCount = ((AttemptsExceededException) e).getAttempts();
      } else {
        attemptCount = ((RetriableOperationException) e).getAttempts();
      }
      String errorMsg = String.format(
          "Exception occurred while waiting for the forceCommit of segments: %s, attempt count: %d, "
              + "segmentsYetToBeCommitted: %s",
          segmentBatchToCommit, attemptCount, segmentsYetToBeCommitted[0]);
      throw new RuntimeException(errorMsg, e);
    }
  }
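
As a quick sanity check on the ceiling division above: with the default 180-second timeout and 5-second interval, the retry policy gets 36 status-check attempts. A minimal standalone sketch of the same arithmetic:

public class MaxAttemptsExample {
  public static void main(String[] args) {
    int batchStatusCheckIntervalMs = 5_000;   // default batchStatusCheckIntervalSec = 5
    int batchStatusCheckTimeoutMs = 180_000;  // default batchStatusCheckTimeoutSec = 180
    // Ceiling division, mirroring the expression in waitUntilPrevBatchIsComplete
    int maxAttempts = (batchStatusCheckTimeoutMs + batchStatusCheckIntervalMs - 1) / batchStatusCheckIntervalMs;
    System.out.println(maxAttempts);  // prints 36
  }
}
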

  @VisibleForTesting
  List<Set<String>> getSegmentBatchList(IdealState idealState, Set<String> targetConsumingSegments,
      int batchSize) {
    Map<String, Queue<String>> instanceToConsumingSegments =
        getInstanceToConsumingSegments(idealState, targetConsumingSegments);

    List<Set<String>> segmentBatchList = new ArrayList<>();
    Set<String> currentBatch = new HashSet<>();
    Set<String> segmentsAdded = new HashSet<>();
    Collection<Queue<String>> instanceSegmentsCollection = instanceToConsumingSegments.values();

    while (!instanceSegmentsCollection.isEmpty()) {
      Iterator<Queue<String>> instanceCollectionIterator = instanceSegmentsCollection.iterator();
      // pick segments in round-robin fashion to parallelize forceCommit across max servers

Review comment: Smart!

      while (instanceCollectionIterator.hasNext()) {
        Queue<String> consumingSegments = instanceCollectionIterator.next();
        String segmentName = consumingSegments.poll();
        if (consumingSegments.isEmpty()) {
          instanceCollectionIterator.remove();
        }
        if (!segmentsAdded.add(segmentName)) {
          // there might be a segment replica hosted on another instance added before
          continue;
        }
        currentBatch.add(segmentName);
        if (currentBatch.size() == batchSize) {
          segmentBatchList.add(currentBatch);
          currentBatch = new HashSet<>();
        }
      }
    }

    if (!currentBatch.isEmpty()) {
      segmentBatchList.add(currentBatch);
    }
    return segmentBatchList;
  }
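
To make the round-robin batching concrete, here is a small self-contained sketch with hypothetical server and segment names, using plain collections instead of Pinot types: two servers, one replicated segment, and batchSize = 2. It mirrors the loop above rather than calling the actual method.

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;

public class RoundRobinBatchingExample {
  public static void main(String[] args) {
    // Hypothetical instance -> consuming-segment queues; seg2 is a replica hosted on both servers.
    Map<String, Queue<String>> instanceToConsumingSegments = new HashMap<>();
    instanceToConsumingSegments.put("Server_1", new LinkedList<>(List.of("seg0", "seg2", "seg4")));
    instanceToConsumingSegments.put("Server_2", new LinkedList<>(List.of("seg1", "seg2", "seg3")));

    int batchSize = 2;
    List<Set<String>> segmentBatchList = new ArrayList<>();
    Set<String> currentBatch = new HashSet<>();
    Set<String> segmentsAdded = new HashSet<>();
    Collection<Queue<String>> queues = instanceToConsumingSegments.values();

    // Same round-robin walk as getSegmentBatchList: one segment per server per pass,
    // skipping replicas that were already scheduled via another server.
    while (!queues.isEmpty()) {
      Iterator<Queue<String>> it = queues.iterator();
      while (it.hasNext()) {
        Queue<String> consumingSegments = it.next();
        String segmentName = consumingSegments.poll();
        if (consumingSegments.isEmpty()) {
          it.remove();
        }
        if (!segmentsAdded.add(segmentName)) {
          continue; // replica already added from another instance
        }
        currentBatch.add(segmentName);
        if (currentBatch.size() == batchSize) {
          segmentBatchList.add(currentBatch);
          currentBatch = new HashSet<>();
        }
      }
    }
    if (!currentBatch.isEmpty()) {
      segmentBatchList.add(currentBatch);
    }
    // Each batch mixes segments from both servers, e.g. [seg0, seg1], [seg2, seg4], [seg3]
    System.out.println(segmentBatchList);
  }
}
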

  @VisibleForTesting
  Map<String, Queue<String>> getInstanceToConsumingSegments(IdealState idealState,
      Set<String> targetConsumingSegments) {
    Map<String, Queue<String>> instanceToConsumingSegments = new HashMap<>();
    Map<String, Map<String, String>> segmentNameToInstanceToStateMap = idealState.getRecord().getMapFields();

    for (String segmentName : targetConsumingSegments) {
      Map<String, String> instanceToStateMap = segmentNameToInstanceToStateMap.get(segmentName);

      for (Map.Entry<String, String> instanceToState : instanceToStateMap.entrySet()) {
        String instance = instanceToState.getKey();
        String state = instanceToState.getValue();
        if (state.equals(SegmentStateModel.CONSUMING)) {
          instanceToConsumingSegments.computeIfAbsent(instance, k -> new LinkedList<>()).add(segmentName);
        }
      }
    }
    return instanceToConsumingSegments;
  }

  /**
   * Among all consuming segments, filter the ones that are in the given partitions or segments.
   */
ForceCommitBatchConfigTest.java (new file)
@@ -0,0 +1,48 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.pinot.controller.api.resources;

import org.testng.annotations.Test;

import static org.testng.Assert.assertThrows;


public class ForceCommitBatchConfigTest {

  @Test
  public void testForceCommitBatchConfig() {
    ForceCommitBatchConfig forceCommitBatchConfig = ForceCommitBatchConfig.of(null, null, null);
    assert Integer.MAX_VALUE == forceCommitBatchConfig.getBatchSize();
    assert 5000 == forceCommitBatchConfig.getBatchStatusCheckIntervalMs();
    assert 180000 == forceCommitBatchConfig.getBatchStatusCheckTimeoutMs();

    forceCommitBatchConfig = ForceCommitBatchConfig.of(1, null, null);
    assert 1 == forceCommitBatchConfig.getBatchSize();
    assert 5000 == forceCommitBatchConfig.getBatchStatusCheckIntervalMs();
    assert 180000 == forceCommitBatchConfig.getBatchStatusCheckTimeoutMs();

    forceCommitBatchConfig = ForceCommitBatchConfig.of(1, 23, 37);
    assert 1 == forceCommitBatchConfig.getBatchSize();
    assert 23000 == forceCommitBatchConfig.getBatchStatusCheckIntervalMs();
    assert 37000 == forceCommitBatchConfig.getBatchStatusCheckTimeoutMs();

    assertThrows(IllegalArgumentException.class, () -> ForceCommitBatchConfig.of(0, null, null));
    assertThrows(IllegalArgumentException.class, () -> ForceCommitBatchConfig.of(32, 0, null));
  }
}

Review comment: Add @Nullable annotation for parameters that can be null. Same for other places.
Reply: This method has int params only now.