Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] Improve exception handling in S3BlobContainer synchronous operations #17056

Merged
merged 1 commit into from
Jan 20, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -373,17 +374,31 @@
}

@Override
public DeleteResult delete() {
public DeleteResult delete() throws IOException {
PlainActionFuture<DeleteResult> future = new PlainActionFuture<>();
deleteAsync(future);
return future.actionGet();
return getFutureValue(future);

Check warning on line 380 in plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java

View check run for this annotation

Codecov / codecov/patch

plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java#L380

Added line #L380 was not covered by tests
}

@Override
public void deleteBlobsIgnoringIfNotExists(List<String> blobNames) {
public void deleteBlobsIgnoringIfNotExists(List<String> blobNames) throws IOException {
PlainActionFuture<Void> future = new PlainActionFuture<>();
deleteBlobsAsyncIgnoringIfNotExists(blobNames, future);
future.actionGet();
getFutureValue(future);
}

Check warning on line 388 in plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java

View check run for this annotation

Codecov / codecov/patch

plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java#L387-L388

Added lines #L387 - L388 were not covered by tests

private <T> T getFutureValue(PlainActionFuture<T> future) throws IOException {
try {
return future.get();

Check warning on line 392 in plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java

View check run for this annotation

Codecov / codecov/patch

plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java#L392

Added line #L392 was not covered by tests
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IllegalStateException("Future got interrupted", e);
} catch (ExecutionException e) {
if (e.getCause() instanceof IOException) {
throw (IOException) e.getCause();
}
throw new RuntimeException(e.getCause());

Check warning on line 400 in plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java

View check run for this annotation

Codecov / codecov/patch

plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java#L400

Added line #L400 was not covered by tests
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1947,6 +1947,116 @@ public void onFailure(Exception e) {
assertEquals(simulatedFailure, exceptionRef.get().getCause());
}

public void testDeleteWithInterruptedException() throws Exception {
final String bucketName = randomAlphaOfLengthBetween(1, 10);
final BlobPath blobPath = new BlobPath();
final S3BlobStore blobStore = mock(S3BlobStore.class);
when(blobStore.bucket()).thenReturn(bucketName);
when(blobStore.getStatsMetricPublisher()).thenReturn(new StatsMetricPublisher());

final S3AsyncClient s3AsyncClient = mock(S3AsyncClient.class);
final AmazonAsyncS3Reference asyncClientReference = mock(AmazonAsyncS3Reference.class);
when(blobStore.asyncClientReference()).thenReturn(asyncClientReference);
when(asyncClientReference.get()).thenReturn(AmazonAsyncS3WithCredentials.create(s3AsyncClient, s3AsyncClient, s3AsyncClient, null));

// Mock the list operation to block indefinitely
final ListObjectsV2Publisher listPublisher = mock(ListObjectsV2Publisher.class);
doAnswer(invocation -> {
Thread.currentThread().interrupt();
return null;
}).when(listPublisher).subscribe(ArgumentMatchers.<Subscriber<ListObjectsV2Response>>any());

when(s3AsyncClient.listObjectsV2Paginator(any(ListObjectsV2Request.class))).thenReturn(listPublisher);

final S3BlobContainer blobContainer = new S3BlobContainer(blobPath, blobStore);

IllegalStateException e = expectThrows(IllegalStateException.class, blobContainer::delete);
assertEquals("Future got interrupted", e.getMessage());
assertTrue(Thread.interrupted()); // Clear interrupted state
}

public void testDeleteWithExecutionException() throws Exception {
final String bucketName = randomAlphaOfLengthBetween(1, 10);
final BlobPath blobPath = new BlobPath();
final S3BlobStore blobStore = mock(S3BlobStore.class);
when(blobStore.bucket()).thenReturn(bucketName);
when(blobStore.getStatsMetricPublisher()).thenReturn(new StatsMetricPublisher());

final S3AsyncClient s3AsyncClient = mock(S3AsyncClient.class);
final AmazonAsyncS3Reference asyncClientReference = mock(AmazonAsyncS3Reference.class);
when(blobStore.asyncClientReference()).thenReturn(asyncClientReference);
when(asyncClientReference.get()).thenReturn(AmazonAsyncS3WithCredentials.create(s3AsyncClient, s3AsyncClient, s3AsyncClient, null));

RuntimeException simulatedError = new RuntimeException("Simulated error");
final ListObjectsV2Publisher listPublisher = mock(ListObjectsV2Publisher.class);
doAnswer(invocation -> {
Subscriber<? super ListObjectsV2Response> subscriber = invocation.getArgument(0);
subscriber.onError(simulatedError);
return null;
}).when(listPublisher).subscribe(ArgumentMatchers.<Subscriber<ListObjectsV2Response>>any());

when(s3AsyncClient.listObjectsV2Paginator(any(ListObjectsV2Request.class))).thenReturn(listPublisher);

final S3BlobContainer blobContainer = new S3BlobContainer(blobPath, blobStore);

IOException e = expectThrows(IOException.class, blobContainer::delete);
assertEquals("Failed to list objects for deletion", e.getMessage());
assertEquals(simulatedError, e.getCause());
}

public void testDeleteBlobsIgnoringIfNotExistsWithInterruptedException() throws Exception {
final String bucketName = randomAlphaOfLengthBetween(1, 10);
final BlobPath blobPath = new BlobPath();
final S3BlobStore blobStore = mock(S3BlobStore.class);
when(blobStore.bucket()).thenReturn(bucketName);
when(blobStore.getStatsMetricPublisher()).thenReturn(new StatsMetricPublisher());
when(blobStore.getBulkDeletesSize()).thenReturn(5);

final S3AsyncClient s3AsyncClient = mock(S3AsyncClient.class);
final AmazonAsyncS3Reference asyncClientReference = mock(AmazonAsyncS3Reference.class);
when(blobStore.asyncClientReference()).thenReturn(asyncClientReference);
when(asyncClientReference.get()).thenReturn(AmazonAsyncS3WithCredentials.create(s3AsyncClient, s3AsyncClient, s3AsyncClient, null));

// Mock deleteObjects to block indefinitely
when(s3AsyncClient.deleteObjects(any(DeleteObjectsRequest.class))).thenAnswer(invocation -> {
Thread.currentThread().interrupt();
return null;
});

final S3BlobContainer blobContainer = new S3BlobContainer(blobPath, blobStore);
List<String> blobNames = Arrays.asList("test1", "test2");

IllegalStateException e = expectThrows(IllegalStateException.class, () -> blobContainer.deleteBlobsIgnoringIfNotExists(blobNames));
assertEquals("Future got interrupted", e.getMessage());
assertTrue(Thread.interrupted()); // Clear interrupted state
}

public void testDeleteBlobsIgnoringIfNotExistsWithExecutionException() throws Exception {
final String bucketName = randomAlphaOfLengthBetween(1, 10);
final BlobPath blobPath = new BlobPath();
final S3BlobStore blobStore = mock(S3BlobStore.class);
when(blobStore.bucket()).thenReturn(bucketName);
when(blobStore.getStatsMetricPublisher()).thenReturn(new StatsMetricPublisher());
when(blobStore.getBulkDeletesSize()).thenReturn(5);

final S3AsyncClient s3AsyncClient = mock(S3AsyncClient.class);
final AmazonAsyncS3Reference asyncClientReference = mock(AmazonAsyncS3Reference.class);
when(blobStore.asyncClientReference()).thenReturn(asyncClientReference);
when(asyncClientReference.get()).thenReturn(AmazonAsyncS3WithCredentials.create(s3AsyncClient, s3AsyncClient, s3AsyncClient, null));

RuntimeException simulatedError = new RuntimeException("Simulated delete error");
CompletableFuture<DeleteObjectsResponse> failedFuture = new CompletableFuture<>();
failedFuture.completeExceptionally(simulatedError);
when(s3AsyncClient.deleteObjects(any(DeleteObjectsRequest.class))).thenReturn(failedFuture);

final S3BlobContainer blobContainer = new S3BlobContainer(blobPath, blobStore);
List<String> blobNames = Arrays.asList("test1", "test2");

IOException e = expectThrows(IOException.class, () -> blobContainer.deleteBlobsIgnoringIfNotExists(blobNames));
assertEquals("Failed to delete blobs " + blobNames, e.getMessage());
assertEquals(simulatedError, e.getCause().getCause());
}

private void mockObjectResponse(S3AsyncClient s3AsyncClient, String bucketName, String blobName, int objectSize) {

final InputStream inputStream = new ByteArrayInputStream(randomByteArrayOfLength(objectSize));
Expand Down
Loading