diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/ForceCommitBatchConfig.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/ForceCommitBatchConfig.java new file mode 100644 index 000000000000..b34ec382f6a6 --- /dev/null +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/ForceCommitBatchConfig.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.api.resources; + +import com.google.common.base.Preconditions; + + +public class ForceCommitBatchConfig { + private final int _batchSize; + private final int _batchStatusCheckIntervalMs; + private final int _batchStatusCheckTimeoutMs; + + private ForceCommitBatchConfig(int batchSize, int batchStatusCheckIntervalMs, int batchStatusCheckTimeoutMs) { + _batchSize = batchSize; + _batchStatusCheckIntervalMs = batchStatusCheckIntervalMs; + _batchStatusCheckTimeoutMs = batchStatusCheckTimeoutMs; + } + + public static ForceCommitBatchConfig of(int batchSize, int batchStatusCheckIntervalSec, + int batchStatusCheckTimeoutSec) { + Preconditions.checkArgument(batchSize > 0, "Batch size should be greater than zero"); + Preconditions.checkArgument(batchStatusCheckIntervalSec > 0, + "Batch status check interval should be greater than zero"); + Preconditions.checkArgument(batchStatusCheckTimeoutSec > 0, + "Batch status check timeout should be greater than zero"); + return new ForceCommitBatchConfig(batchSize, batchStatusCheckIntervalSec * 1000, batchStatusCheckTimeoutSec * 1000); + } + + public int getBatchSize() { + return _batchSize; + } + + public int getBatchStatusCheckIntervalMs() { + return _batchStatusCheckIntervalMs; + } + + public int getBatchStatusCheckTimeoutMs() { + return _batchStatusCheckTimeoutMs; + } +} diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java index f4a0e633a010..e69de66acba5 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java @@ -34,6 +34,7 @@ import java.util.UUID; import java.util.concurrent.Executor; import javax.inject.Inject; +import javax.ws.rs.DefaultValue; import javax.ws.rs.GET; import javax.ws.rs.POST; import javax.ws.rs.Path; @@ -169,19 +170,35 @@ public Map forceCommit( @ApiParam(value = "Comma separated list of partition group IDs to be committed") @QueryParam("partitions") String partitionGroupIds, @ApiParam(value = "Comma separated list of consuming segments to be committed") @QueryParam("segments") - String consumingSegments, @Context HttpHeaders headers) { + String consumingSegments, + @ApiParam(value = "Max number of consuming segments to commit at once (default = Integer.MAX_VALUE)") + @QueryParam("batchSize") @DefaultValue(Integer.MAX_VALUE + "") int batchSize, + @ApiParam(value = "How often to check whether the current batch of segments have been successfully committed or" + + " not (default = 5)") + @QueryParam("batchStatusCheckIntervalSec") @DefaultValue("5") int batchStatusCheckIntervalSec, + @ApiParam(value = "Timeout based on which the controller will stop checking the forceCommit status of the batch" + + " of segments and throw an exception. (default = 180)") + @QueryParam("batchStatusCheckTimeoutSec") @DefaultValue("180") int batchStatusCheckTimeoutSec, + @Context HttpHeaders headers) { tableName = DatabaseUtils.translateTableName(tableName, headers); if (partitionGroupIds != null && consumingSegments != null) { throw new ControllerApplicationException(LOGGER, "Cannot specify both partitions and segments to commit", Response.Status.BAD_REQUEST); } + ForceCommitBatchConfig batchConfig; + try { + batchConfig = ForceCommitBatchConfig.of(batchSize, batchStatusCheckIntervalSec, batchStatusCheckTimeoutSec); + } catch (Exception e) { + throw new ControllerApplicationException(LOGGER, "Invalid batch config", Response.Status.BAD_REQUEST, e); + } long startTimeMs = System.currentTimeMillis(); String tableNameWithType = TableNameBuilder.REALTIME.tableNameWithType(tableName); validateTable(tableNameWithType); Map response = new HashMap<>(); try { Set consumingSegmentsForceCommitted = - _pinotLLCRealtimeSegmentManager.forceCommit(tableNameWithType, partitionGroupIds, consumingSegments); + _pinotLLCRealtimeSegmentManager.forceCommit(tableNameWithType, partitionGroupIds, consumingSegments, + batchConfig); response.put("forceCommitStatus", "SUCCESS"); try { String jobId = UUID.randomUUID().toString(); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java index 2b9cf8f954ef..fc3172e09361 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java @@ -28,11 +28,15 @@ import java.sql.Timestamp; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Queue; import java.util.Random; import java.util.Set; import java.util.TreeSet; @@ -72,6 +76,7 @@ import org.apache.pinot.controller.ControllerConf; import org.apache.pinot.controller.api.events.MetadataEventNotifierFactory; import org.apache.pinot.controller.api.resources.Constants; +import org.apache.pinot.controller.api.resources.ForceCommitBatchConfig; import org.apache.pinot.controller.api.resources.PauseStatusDetails; import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; import org.apache.pinot.controller.helix.core.PinotTableIdealStateBuilder; @@ -117,7 +122,9 @@ import org.apache.pinot.spi.utils.StringUtil; import org.apache.pinot.spi.utils.TimeUtils; import org.apache.pinot.spi.utils.builder.TableNameBuilder; +import org.apache.pinot.spi.utils.retry.AttemptFailureException; import org.apache.pinot.spi.utils.retry.RetryPolicies; +import org.apache.pinot.spi.utils.retry.RetryPolicy; import org.apache.zookeeper.data.Stat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -1848,15 +1855,126 @@ private boolean isTmpAndCanDelete(String filePath, Set downloadUrls, Pin * @return the set of consuming segments for which commit was initiated */ public Set forceCommit(String tableNameWithType, @Nullable String partitionGroupIdsToCommit, - @Nullable String segmentsToCommit) { + @Nullable String segmentsToCommit, ForceCommitBatchConfig batchConfig) { IdealState idealState = getIdealState(tableNameWithType); Set allConsumingSegments = findConsumingSegments(idealState); Set targetConsumingSegments = filterSegmentsToCommit(allConsumingSegments, partitionGroupIdsToCommit, segmentsToCommit); - sendForceCommitMessageToServers(tableNameWithType, targetConsumingSegments); + int batchSize = batchConfig.getBatchSize(); + if (batchSize >= targetConsumingSegments.size()) { + // No need to divide segments in batches. + sendForceCommitMessageToServers(tableNameWithType, targetConsumingSegments); + } else { + List> segmentBatchList = getSegmentBatchList(idealState, targetConsumingSegments, batchSize); + ExecutorService executor = Executors.newSingleThreadExecutor(); + executor.submit(() -> processBatchesSequentially(segmentBatchList, tableNameWithType, batchConfig)); + executor.shutdown(); + } return targetConsumingSegments; } + private void processBatchesSequentially(List> segmentBatchList, String tableNameWithType, + ForceCommitBatchConfig forceCommitBatchConfig) { + Set prevBatch = null; + for (Set segmentBatchToCommit : segmentBatchList) { + if (prevBatch != null) { + waitUntilPrevBatchIsComplete(tableNameWithType, prevBatch, forceCommitBatchConfig); + } + sendForceCommitMessageToServers(tableNameWithType, segmentBatchToCommit); + prevBatch = segmentBatchToCommit; + } + } + + private void waitUntilPrevBatchIsComplete(String tableNameWithType, Set segmentBatchToCommit, + ForceCommitBatchConfig forceCommitBatchConfig) { + int batchStatusCheckIntervalMs = forceCommitBatchConfig.getBatchStatusCheckIntervalMs(); + int batchStatusCheckTimeoutMs = forceCommitBatchConfig.getBatchStatusCheckTimeoutMs(); + + try { + Thread.sleep(batchStatusCheckIntervalMs); + } catch (InterruptedException e) { + LOGGER.error("Exception occurred while waiting for the forceCommit of segments: {}", segmentBatchToCommit, e); + throw new RuntimeException(e); + } + + int maxAttempts = (batchStatusCheckTimeoutMs + batchStatusCheckIntervalMs - 1) / batchStatusCheckIntervalMs; + RetryPolicy retryPolicy = RetryPolicies.fixedDelayRetryPolicy(maxAttempts, batchStatusCheckIntervalMs); + Set[] segmentsYetToBeCommitted = new Set[1]; + try { + retryPolicy.attempt(() -> { + segmentsYetToBeCommitted[0] = getSegmentsYetToBeCommitted(tableNameWithType, segmentBatchToCommit); + return segmentsYetToBeCommitted[0].isEmpty(); + }); + } catch (AttemptFailureException e) { + String errorMsg = String.format( + "Exception occurred while waiting for the forceCommit of segments: %s, attempt count: %d, " + + "segmentsYetToBeCommitted: %s", segmentBatchToCommit, e.getAttempts(), segmentsYetToBeCommitted[0]); + LOGGER.error(errorMsg, e); + throw new RuntimeException(errorMsg, e); + } + + LOGGER.info("segmentBatch: {} successfully force committed", segmentBatchToCommit); + } + + @VisibleForTesting + List> getSegmentBatchList(IdealState idealState, Set targetConsumingSegments, int batchSize) { + int numSegments = targetConsumingSegments.size(); + List> segmentBatchList = new ArrayList<>((numSegments + batchSize - 1) / batchSize); + + Map> instanceToConsumingSegments = + getInstanceToConsumingSegments(idealState, targetConsumingSegments); + + Set segmentsAdded = Sets.newHashSetWithExpectedSize(numSegments); + Set currentBatch = Sets.newHashSetWithExpectedSize(batchSize); + Collection> instanceSegmentsCollection = instanceToConsumingSegments.values(); + + while (!instanceSegmentsCollection.isEmpty()) { + Iterator> instanceCollectionIterator = instanceSegmentsCollection.iterator(); + // Pick segments in round-robin fashion to parallelize forceCommit across max servers + while (instanceCollectionIterator.hasNext()) { + Queue consumingSegments = instanceCollectionIterator.next(); + String segmentName = consumingSegments.poll(); + if (consumingSegments.isEmpty()) { + instanceCollectionIterator.remove(); + } + if (!segmentsAdded.add(segmentName)) { + // There might be a segment replica hosted on another instance added before + continue; + } + currentBatch.add(segmentName); + if (currentBatch.size() == batchSize) { + segmentBatchList.add(currentBatch); + currentBatch = Sets.newHashSetWithExpectedSize(batchSize); + } + } + } + + if (!currentBatch.isEmpty()) { + segmentBatchList.add(currentBatch); + } + return segmentBatchList; + } + + @VisibleForTesting + Map> getInstanceToConsumingSegments(IdealState idealState, + Set targetConsumingSegments) { + Map> instanceToConsumingSegments = new HashMap<>(); + Map> segmentNameToInstanceToStateMap = idealState.getRecord().getMapFields(); + + for (String segmentName: targetConsumingSegments) { + Map instanceToStateMap = segmentNameToInstanceToStateMap.get(segmentName); + + for (Map.Entry instanceToState : instanceToStateMap.entrySet()) { + String instance = instanceToState.getKey(); + String state = instanceToState.getValue(); + if (state.equals(SegmentStateModel.CONSUMING)) { + instanceToConsumingSegments.computeIfAbsent(instance, k -> new LinkedList<>()).add(segmentName); + } + } + } + return instanceToConsumingSegments; + } + /** * Among all consuming segments, filter the ones that are in the given partitions or segments. */ diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/ForceCommitBatchConfigTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/ForceCommitBatchConfigTest.java new file mode 100644 index 000000000000..862f19218f90 --- /dev/null +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/ForceCommitBatchConfigTest.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.api.resources; + +import org.testng.annotations.Test; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertThrows; + + +public class ForceCommitBatchConfigTest { + + @Test + public void testForceCommitBatchConfig() { + ForceCommitBatchConfig forceCommitBatchConfig = ForceCommitBatchConfig.of(Integer.MAX_VALUE, 5, 180); + assertEquals(forceCommitBatchConfig.getBatchSize(), Integer.MAX_VALUE); + assertEquals(forceCommitBatchConfig.getBatchStatusCheckIntervalMs(), 5000); + assertEquals(forceCommitBatchConfig.getBatchStatusCheckTimeoutMs(), 180000); + + forceCommitBatchConfig = ForceCommitBatchConfig.of(1, 5, 180); + assertEquals(forceCommitBatchConfig.getBatchSize(), 1); + assertEquals(forceCommitBatchConfig.getBatchStatusCheckIntervalMs(), 5000); + assertEquals(forceCommitBatchConfig.getBatchStatusCheckTimeoutMs(), 180000); + + forceCommitBatchConfig = ForceCommitBatchConfig.of(1, 23, 37); + assertEquals(forceCommitBatchConfig.getBatchSize(), 1); + assertEquals(forceCommitBatchConfig.getBatchStatusCheckIntervalMs(), 23000); + assertEquals(forceCommitBatchConfig.getBatchStatusCheckTimeoutMs(), 37000); + + assertThrows(IllegalArgumentException.class, () -> ForceCommitBatchConfig.of(0, 5, 180)); + assertThrows(IllegalArgumentException.class, () -> ForceCommitBatchConfig.of(32, 0, 0)); + } +} diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java index d5969e611f91..abcd75a2004a 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java @@ -29,11 +29,14 @@ import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Queue; import java.util.Random; import java.util.Set; import java.util.TreeMap; +import java.util.TreeSet; import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -1248,6 +1251,76 @@ public void testGetPartitionIds() Assert.assertEquals(partitionIds.size(), 2); } + @Test + public void testGetInstanceToConsumingSegments() { + PinotHelixResourceManager mockHelixResourceManager = mock(PinotHelixResourceManager.class); + FakePinotLLCRealtimeSegmentManager realtimeSegmentManager = + new FakePinotLLCRealtimeSegmentManager(mockHelixResourceManager); + IdealState idealState = mock(IdealState.class); + Map> map = Map.of( + "seg0", Map.of("i1", "CONSUMING", "i4", "ONLINE"), + "seg1", Map.of("i2", "CONSUMING"), + "seg2", Map.of("i3", "CONSUMING", "i2", "OFFLINE"), + "seg3", Map.of("i4", "CONSUMING", "i2", "CONSUMING", "i3", "CONSUMING"), + "seg4", Map.of("i5", "CONSUMING", "i1", "CONSUMING", "i3", "CONSUMING") + ); + + ZNRecord znRecord = mock(ZNRecord.class); + when(znRecord.getMapFields()).thenReturn(map); + when(idealState.getRecord()).thenReturn(znRecord); + // Use TreeSet to ensure ordering + Set targetConsumingSegment = new TreeSet<>(map.keySet()); + + Map> instanceToConsumingSegments = + realtimeSegmentManager.getInstanceToConsumingSegments(idealState, targetConsumingSegment); + assertEquals(instanceToConsumingSegments, Map.of( + "i1", new LinkedList<>(List.of("seg0", "seg4")), + "i2", new LinkedList<>(List.of("seg1", "seg3")), + "i3", new LinkedList<>(List.of("seg2", "seg3", "seg4")), + "i4", new LinkedList<>(List.of("seg3")), + "i5", new LinkedList<>(List.of("seg4")) + )); + } + + @Test + public void getSegmentBatchList() { + PinotHelixResourceManager mockHelixResourceManager = mock(PinotHelixResourceManager.class); + FakePinotLLCRealtimeSegmentManager realtimeSegmentManager = + new FakePinotLLCRealtimeSegmentManager(mockHelixResourceManager); + IdealState idealState = mock(IdealState.class); + + Map> map = Map.of( + "seg0", Map.of("i1", "CONSUMING", "i4", "ONLINE"), + "seg1", Map.of("i2", "CONSUMING"), + "seg2", Map.of("i3", "CONSUMING", "i2", "OFFLINE"), + "seg3", Map.of("i4", "CONSUMING", "i2", "CONSUMING", "i3", "CONSUMING"), + "seg4", Map.of("i5", "CONSUMING", "i1", "CONSUMING", "i3", "CONSUMING"), + "seg5", Map.of("i6", "CONSUMING", "i1", "CONSUMING", "i3", "CONSUMING"), + "seg6", Map.of("i7", "CONSUMING", "i1", "CONSUMING", "i3", "CONSUMING") + ); + + ZNRecord znRecord = mock(ZNRecord.class); + when(znRecord.getMapFields()).thenReturn(map); + when(idealState.getRecord()).thenReturn(znRecord); + // Use TreeSet to ensure ordering + Set targetConsumingSegment = new TreeSet<>(map.keySet()); + + List> segmentBatchList = + realtimeSegmentManager.getSegmentBatchList(idealState, targetConsumingSegment, 2); + assertEquals(segmentBatchList, List.of( + Set.of("seg0", "seg1"), + Set.of("seg2", "seg3"), + Set.of("seg4", "seg5"), + Set.of("seg6") + )); + + segmentBatchList = realtimeSegmentManager.getSegmentBatchList(idealState, targetConsumingSegment, 4); + assertEquals(segmentBatchList, List.of( + Set.of("seg0", "seg1", "seg2", "seg3"), + Set.of("seg4", "seg5", "seg6") + )); + } + @Test public void getSegmentsYetToBeCommitted() { PinotHelixResourceManager mockHelixResourceManager = mock(PinotHelixResourceManager.class); diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java index 17655506411b..02f8d1f659f6 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java @@ -428,10 +428,22 @@ public void testForceCommit() throws Exception { Set consumingSegments = getConsumingSegmentsFromIdealState(getTableName() + "_REALTIME"); String jobId = forceCommit(getTableName()); + testForceCommitInternal(jobId, consumingSegments, 60000L); + } + + @Test + public void testForceCommitInBatches() + throws Exception { + Set consumingSegments = getConsumingSegmentsFromIdealState(getTableName() + "_REALTIME"); + String jobId = forceCommit(getTableName(), 1, 5, 210); + testForceCommitInternal(jobId, consumingSegments, 240000L); + } + + private void testForceCommitInternal(String jobId, Set consumingSegments, long timeoutMs) { Map jobMetadata = _helixResourceManager.getControllerJobZKMetadata(jobId, ControllerJobType.FORCE_COMMIT); assert jobMetadata != null; - assert jobMetadata.get("segmentsForceCommitted") != null; + assert jobMetadata.get(CommonConstants.ControllerJob.CONSUMING_SEGMENTS_FORCE_COMMITTED_LIST) != null; TestUtils.waitForCondition(aVoid -> { try { @@ -446,7 +458,7 @@ public void testForceCommit() } catch (Exception e) { return false; } - }, 60000L, "Error verifying force commit operation on table!"); + }, timeoutMs, "Error verifying force commit operation on table!"); } public Set getConsumingSegmentsFromIdealState(String tableNameWithType) { @@ -492,6 +504,15 @@ private String forceCommit(String tableName) return JsonUtils.stringToJsonNode(response).get("forceCommitJobId").asText(); } + private String forceCommit(String tableName, int batchSize, int batchIntervalSec, int batchTimeoutSec) + throws Exception { + String response = + sendPostRequest(_controllerRequestURLBuilder.forTableForceCommit(tableName) + "?batchSize=" + batchSize + + "&batchStatusCheckIntervalSec=" + batchIntervalSec + "&batchStatusCheckTimeoutSec=" + batchTimeoutSec, + null); + return JsonUtils.stringToJsonNode(response).get("forceCommitJobId").asText(); + } + @Test @Override public void testHardcodedServerPartitionedSqlQueries() diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/retry/AttemptFailureException.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/retry/AttemptFailureException.java index 2f7904edc8fa..8ed1b5c2fa95 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/retry/AttemptFailureException.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/retry/AttemptFailureException.java @@ -26,12 +26,29 @@ * either operation throwing an exception or running out of attempts. */ public class AttemptFailureException extends Exception { + private final int _attempts; public AttemptFailureException(String message) { super(message); + _attempts = 0; + } + + public AttemptFailureException(String message, int attempts) { + super(message); + _attempts = attempts; } public AttemptFailureException(Throwable cause) { super(cause); + _attempts = 0; + } + + public AttemptFailureException(Throwable cause, int attempts) { + super(cause); + _attempts = attempts; + } + + public int getAttempts() { + return _attempts; } } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/retry/AttemptsExceededException.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/retry/AttemptsExceededException.java index c710aa1e72c3..7e4521b517c9 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/retry/AttemptsExceededException.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/retry/AttemptsExceededException.java @@ -24,18 +24,11 @@ */ public class AttemptsExceededException extends AttemptFailureException { - private int _attempts = 0; - public AttemptsExceededException(String message) { super(message); } public AttemptsExceededException(String message, int attempts) { - super(message); - _attempts = attempts; - } - - public int getAttempts() { - return _attempts; + super(message, attempts); } } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/retry/RetriableOperationException.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/retry/RetriableOperationException.java index 23385e85bb2f..380bed123ffc 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/retry/RetriableOperationException.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/retry/RetriableOperationException.java @@ -23,18 +23,11 @@ */ public class RetriableOperationException extends AttemptFailureException { - private int _attempts = 0; - - public int getAttempts() { - return _attempts; - } - public RetriableOperationException(Throwable cause) { super(cause); } public RetriableOperationException(Throwable cause, int attempts) { - super(cause); - _attempts = attempts; + super(cause, attempts); } }