From 55b91da4d63e333bb98a70d917f70b2d196ce1f5 Mon Sep 17 00:00:00 2001 From: Karsten Schnitter Date: Wed, 20 Mar 2024 10:51:44 +0100 Subject: [PATCH] WIP Add RequestInfo Delay Calculator Implementation of exponential backoff. Idea is to start with a minimum delay on the first time-out or circuit breaker activation. If the next such event happens within twice the last delay after the previous event, double the delay until a maximum delay is reached. Use the maximum delay from then on, until a sufficiently long period (maximum delay) without an event happens. Then the delay is reset to the minimum. TODO: Make minimum and maximum delay configurable. Signed-off-by: Karsten Schnitter --- .../GrpcRequestExceptionHandler.java | 13 ++-- .../dataprepper/GrpcRetryInfoCalculator.java | 48 +++++++++++++++ .../GrpcRequestExceptionHandlerTest.java | 23 +++---- .../GrpcRetryInfoCalculatorTest.java | 60 +++++++++++++++++++ 4 files changed, 119 insertions(+), 25 deletions(-) create mode 100644 data-prepper-plugins/armeria-common/src/main/java/org/opensearch/dataprepper/GrpcRetryInfoCalculator.java create mode 100644 data-prepper-plugins/armeria-common/src/test/java/org/opensearch/dataprepper/GrpcRetryInfoCalculatorTest.java diff --git a/data-prepper-plugins/armeria-common/src/main/java/org/opensearch/dataprepper/GrpcRequestExceptionHandler.java b/data-prepper-plugins/armeria-common/src/main/java/org/opensearch/dataprepper/GrpcRequestExceptionHandler.java index 7e79b5c1dc..426fbddaf5 100644 --- a/data-prepper-plugins/armeria-common/src/main/java/org/opensearch/dataprepper/GrpcRequestExceptionHandler.java +++ b/data-prepper-plugins/armeria-common/src/main/java/org/opensearch/dataprepper/GrpcRequestExceptionHandler.java @@ -6,8 +6,6 @@ package org.opensearch.dataprepper; import com.google.protobuf.Any; -import com.google.protobuf.Duration; -import com.google.rpc.RetryInfo; import com.linecorp.armeria.common.RequestContext; import com.linecorp.armeria.common.annotation.Nullable; import 
com.linecorp.armeria.common.grpc.GoogleGrpcExceptionHandlerFunction; @@ -23,6 +21,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.time.Duration; import java.util.concurrent.TimeoutException; public class GrpcRequestExceptionHandler implements GoogleGrpcExceptionHandlerFunction { @@ -38,12 +37,14 @@ public class GrpcRequestExceptionHandler implements GoogleGrpcExceptionHandlerFu private final Counter badRequestsCounter; private final Counter requestsTooLargeCounter; private final Counter internalServerErrorCounter; + private final GrpcRetryInfoCalculator retryInfoCalculator; public GrpcRequestExceptionHandler(final PluginMetrics pluginMetrics) { requestTimeoutsCounter = pluginMetrics.counter(REQUEST_TIMEOUTS); badRequestsCounter = pluginMetrics.counter(BAD_REQUESTS); requestsTooLargeCounter = pluginMetrics.counter(REQUESTS_TOO_LARGE); internalServerErrorCounter = pluginMetrics.counter(INTERNAL_SERVER_ERROR); + retryInfoCalculator = new GrpcRetryInfoCalculator(Duration.ofMillis(100), Duration.ofSeconds(2)); } @Override @@ -81,14 +82,8 @@ private com.google.rpc.Status createStatus(final Throwable e, final Status.Code builder.setMessage(e.getMessage() == null ? 
code.name() :e.getMessage()); } if (code == Status.Code.RESOURCE_EXHAUSTED) { - builder.addDetails(Any.pack(createRetryInfo())); + builder.addDetails(Any.pack(retryInfoCalculator.createRetryInfo())); } return builder.build(); } - - // TODO: Implement logic for the responsse retry delay to be sent with the retry info - private RetryInfo createRetryInfo() { - Duration.Builder duration = Duration.newBuilder().setSeconds(0L).setNanos(100_000_000); - return RetryInfo.newBuilder().setRetryDelay(duration).build(); - } } diff --git a/data-prepper-plugins/armeria-common/src/main/java/org/opensearch/dataprepper/GrpcRetryInfoCalculator.java b/data-prepper-plugins/armeria-common/src/main/java/org/opensearch/dataprepper/GrpcRetryInfoCalculator.java new file mode 100644 index 0000000000..1211e5e8ba --- /dev/null +++ b/data-prepper-plugins/armeria-common/src/main/java/org/opensearch/dataprepper/GrpcRetryInfoCalculator.java @@ -0,0 +1,48 @@ +package org.opensearch.dataprepper; + +import com.google.rpc.RetryInfo; + +import java.time.Duration; +import java.time.Instant; +import java.util.concurrent.atomic.AtomicReference; + +class GrpcRetryInfoCalculator { + + private final Duration minimumDelay; + private final Duration maximumDelay; + + private final AtomicReference lastTimeCalled; + private final AtomicReference nextDelay; + + GrpcRetryInfoCalculator(Duration minimumDelay, Duration maximumDelay) { + this.minimumDelay = minimumDelay; + this.maximumDelay = maximumDelay; + this.lastTimeCalled = new AtomicReference<>(Instant.now()); + this.nextDelay = new AtomicReference<>(minimumDelay); + } + + private static RetryInfo createProtoResult(Duration delay) { + return RetryInfo.newBuilder().setRetryDelay(mapDuration(delay)).build(); + } + + private static Duration minDuration(Duration left, Duration right) { + return left.compareTo(right) <= 0 ? 
left : right; + } + + private static com.google.protobuf.Duration.Builder mapDuration(Duration duration) { + return com.google.protobuf.Duration.newBuilder().setSeconds(duration.getSeconds()).setNanos(duration.getNano()); + } + + RetryInfo createRetryInfo() { + Instant now = Instant.now(); + // Is the last time we got called longer ago than the next delay? + if (lastTimeCalled.getAndSet(now).isBefore(now.minus(nextDelay.get()))) { + // Use minimum delay and reset the saved delay + nextDelay.set(minimumDelay); + return createProtoResult(minimumDelay); + } + Duration delay = nextDelay.getAndUpdate(d -> minDuration(maximumDelay, d.multipliedBy(2))); + return createProtoResult(delay); + } + +} diff --git a/data-prepper-plugins/armeria-common/src/test/java/org/opensearch/dataprepper/GrpcRequestExceptionHandlerTest.java b/data-prepper-plugins/armeria-common/src/test/java/org/opensearch/dataprepper/GrpcRequestExceptionHandlerTest.java index a23c8f3a20..2dc3310196 100644 --- a/data-prepper-plugins/armeria-common/src/test/java/org/opensearch/dataprepper/GrpcRequestExceptionHandlerTest.java +++ b/data-prepper-plugins/armeria-common/src/test/java/org/opensearch/dataprepper/GrpcRequestExceptionHandlerTest.java @@ -5,7 +5,7 @@ package org.opensearch.dataprepper; -import com.google.protobuf.InvalidProtocolBufferException; +import com.google.protobuf.Any; import com.google.rpc.RetryInfo; import com.linecorp.armeria.common.RequestContext; import com.linecorp.armeria.server.RequestTimeoutException; @@ -27,13 +27,14 @@ import org.opensearch.dataprepper.model.buffer.SizeOverflowException; import java.io.IOException; -import java.util.List; +import java.util.Optional; import java.util.UUID; import java.util.concurrent.TimeoutException; import static com.linecorp.armeria.internal.common.grpc.MetadataUtil.GRPC_STATUS_DETAILS_BIN_KEY; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; +import static 
org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -110,21 +111,11 @@ public void testHandleTimeoutException() { verify(requestTimeoutsCounter, times(2)).increment(); - // TODO: Adjust to retry delay logic verify(metadata, times(2)).put(ArgumentMatchers.eq(GRPC_STATUS_DETAILS_BIN_KEY), status.capture()); - assertThat(status.getValue().getDetailsCount(), equalTo(1)); - - status.getAllValues().stream().map(com.google.rpc.Status::getDetailsList).flatMap(List::stream).map(e -> { - try { - return e.unpack( - RetryInfo.class); - } catch (InvalidProtocolBufferException ex) { - throw new AssertionError("unxepected status detail item",ex); - } - }).forEach(info -> { - assertThat(info.getRetryDelay().getSeconds(), equalTo(0L)); - assertThat(info.getRetryDelay().getNanos(), equalTo(100_000_000)); - }); + for (com.google.rpc.Status currentStatus: status.getAllValues()) { + Optional retryInfo = currentStatus.getDetailsList().stream().filter(d -> d.is(RetryInfo.class)).findFirst(); + assertTrue(retryInfo.isPresent(), "No RetryInfo at status:\n" + currentStatus.toString()); + } } @Test diff --git a/data-prepper-plugins/armeria-common/src/test/java/org/opensearch/dataprepper/GrpcRetryInfoCalculatorTest.java b/data-prepper-plugins/armeria-common/src/test/java/org/opensearch/dataprepper/GrpcRetryInfoCalculatorTest.java new file mode 100644 index 0000000000..5848330f08 --- /dev/null +++ b/data-prepper-plugins/armeria-common/src/test/java/org/opensearch/dataprepper/GrpcRetryInfoCalculatorTest.java @@ -0,0 +1,60 @@ +package org.opensearch.dataprepper; + +import com.google.rpc.RetryInfo; +import org.junit.jupiter.api.Test; + +import java.time.Duration; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; + +public class GrpcRetryInfoCalculatorTest { + + @Test + public void testMinimumDelayOnFirstCall() { + RetryInfo 
retryInfo = new GrpcRetryInfoCalculator(Duration.ofMillis(100), Duration.ofSeconds(1)).createRetryInfo(); + + assertThat(retryInfo.getRetryDelay().getNanos(), equalTo(100_000_000)); + assertThat(retryInfo.getRetryDelay().getSeconds(), equalTo(0L)); + } + + @Test + public void testExponentialBackoff() { + GrpcRetryInfoCalculator calculator = + new GrpcRetryInfoCalculator(Duration.ofSeconds(1), Duration.ofSeconds(10)); + RetryInfo retryInfo1 = calculator.createRetryInfo(); + RetryInfo retryInfo2 = calculator.createRetryInfo(); + RetryInfo retryInfo3 = calculator.createRetryInfo(); + + assertThat(retryInfo1.getRetryDelay().getSeconds(), equalTo(1L)); + assertThat(retryInfo2.getRetryDelay().getSeconds(), equalTo(2L)); + assertThat(retryInfo3.getRetryDelay().getSeconds(), equalTo(4L)); + } + + @Test + public void testUsesMaximumAsLongestDelay() { + GrpcRetryInfoCalculator calculator = + new GrpcRetryInfoCalculator(Duration.ofSeconds(1), Duration.ofSeconds(2)); + RetryInfo retryInfo1 = calculator.createRetryInfo(); + RetryInfo retryInfo2 = calculator.createRetryInfo(); + RetryInfo retryInfo3 = calculator.createRetryInfo(); + + assertThat(retryInfo1.getRetryDelay().getSeconds(), equalTo(1L)); + assertThat(retryInfo2.getRetryDelay().getSeconds(), equalTo(2L)); + assertThat(retryInfo3.getRetryDelay().getSeconds(), equalTo(2L)); + } + + @Test + public void testResetAfterDelayWearsOff() throws InterruptedException { + GrpcRetryInfoCalculator calculator = + new GrpcRetryInfoCalculator(Duration.ofNanos(1_000_000), Duration.ofSeconds(1)); + RetryInfo retryInfo1 = calculator.createRetryInfo(); + RetryInfo retryInfo2 = calculator.createRetryInfo(); + Thread.sleep(5); + RetryInfo retryInfo3 = calculator.createRetryInfo(); + + assertThat(retryInfo1.getRetryDelay().getNanos(), equalTo(1_000_000)); + assertThat(retryInfo2.getRetryDelay().getNanos(), equalTo(2_000_000)); + assertThat(retryInfo3.getRetryDelay().getNanos(), equalTo(1_000_000)); + } +}