From 74e25e3a284463526e3f8ea12b3a723b8a323b49 Mon Sep 17 00:00:00 2001 From: Shivam Malhotra Date: Thu, 13 Jun 2024 15:02:26 -0500 Subject: [PATCH 01/10] Removed local cache in S3 context --- .../extensions/s3/S3ChannelContext.java | 77 +++++++------------ .../extensions/s3/S3Instructions.java | 27 +------ .../io/deephaven/extensions/s3/S3Request.java | 6 +- .../extensions/s3/S3RequestCache.java | 22 ++++++ .../extensions/s3/S3InstructionsTest.java | 25 ------ py/server/deephaven/experimental/s3.py | 6 -- 6 files changed, 54 insertions(+), 109 deletions(-) diff --git a/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3ChannelContext.java b/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3ChannelContext.java index 15bbed956c9..1aecb356347 100644 --- a/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3ChannelContext.java +++ b/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3ChannelContext.java @@ -48,12 +48,6 @@ final class S3ChannelContext extends BaseSeekableChannelContext implements Seeka */ final S3RequestCache sharedCache; - /** - * Used to cache recently fetched fragments as well as the ownership token for the request. This cache is local to - * the context and is used to keep the requests alive as long as the context is alive. - */ - private final S3Request.AcquiredRequest[] localCache; - /** * The size of the object in bytes, stored in context to avoid fetching multiple times */ @@ -72,7 +66,6 @@ final class S3ChannelContext extends BaseSeekableChannelContext implements Seeka this.provider = Objects.requireNonNull(provider); this.client = Objects.requireNonNull(client); this.instructions = Objects.requireNonNull(instructions); - this.localCache = new S3Request.AcquiredRequest[instructions.maxCacheSize()]; this.sharedCache = sharedCache; if (sharedCache.getFragmentSize() != instructions.fragmentSize()) { throw new IllegalArgumentException("Fragment size mismatch between shared cache and instructions, " @@ -121,22 +114,27 @@ int fill(final long position, final ByteBuffer dest) throws IOException { final int impliedReadAhead = (int) (lastFragmentIx - firstFragmentIx); final int desiredReadAhead = instructions.readAheadCount(); final long totalRemainingFragments = numFragments - firstFragmentIx - 1; - final int maxReadAhead = instructions.maxCacheSize() - 1; - readAhead = Math.min( - Math.max(impliedReadAhead, desiredReadAhead), - (int) Math.min(maxReadAhead, totalRemainingFragments)); + readAhead = Math.min(Math.max(impliedReadAhead, desiredReadAhead), totalRemainingFragments); } - final S3Request firstRequest = getOrCreateRequest(firstFragmentIx); + // Hold a reference to the first request to ensure it is not evicted from the cache + S3Request.AcquiredRequest acquiredFirstRequest = getOrCreateRequest(firstFragmentIx); for (int i = 0; i < readAhead; ++i) { + // Do not hold references to the read ahead requests getOrCreateRequest(firstFragmentIx + i + 1); } // blocking - int filled = firstRequest.fill(position, dest); + int filled = acquiredFirstRequest.request.fill(position, dest); + acquiredFirstRequest.release(); + acquiredFirstRequest = null; + for (int i = 0; dest.hasRemaining(); ++i) { - // Since we have already created requests for read ahead fragments, we can retrieve them from the local - // cache - final S3Request request = getRequestFromLocalCache(firstFragmentIx + i + 1); - if (request == null || !request.isDone()) { + final S3Request.AcquiredRequest acquiredReadAheadRequest = + getRequestFromSharedCache(firstFragmentIx + i + 1); + if (acquiredReadAheadRequest == null) { + break; + } + final S3Request request = acquiredReadAheadRequest.request; + if (!request.isDone()) { break; } // non-blocking since we know isDone @@ -146,7 +144,6 @@ int fill(final long position, final ByteBuffer dest) throws IOException { } private void reset() { - releaseOutstanding(); // Reset the internal state uri = null; size = UNINITIALIZED_SIZE; @@ -162,49 +159,27 @@ public void close() { if (log.isDebugEnabled()) { log.debug().append("Closing context: ").append(ctxStr()).endl(); } - releaseOutstanding(); - } - - /** - * Release all outstanding requests associated with this context. Eventually, the request will be canceled when the - * objects are garbage collected. - */ - private void releaseOutstanding() { - Arrays.fill(localCache, null); } // -------------------------------------------------------------------------------------------------- @Nullable - private S3Request getRequestFromLocalCache(final long fragmentIndex) { - return getRequestFromLocalCache(fragmentIndex, cacheIndex(fragmentIndex)); - } - - @Nullable - private S3Request getRequestFromLocalCache(final long fragmentIndex, final int cacheIdx) { - if (localCache[cacheIdx] != null && localCache[cacheIdx].request.isFragment(fragmentIndex)) { - return localCache[cacheIdx].request; + private S3Request.AcquiredRequest getRequestFromSharedCache(final long fragmentIndex) { + final S3Request.AcquiredRequest cachedRequest = sharedCache.getRequest(uri, fragmentIndex); + if (cachedRequest == null) { + return null; } - return null; + // Send the request, if not sent already. The following method is idempotent, so we always call it. + cachedRequest.request.sendRequest(); + return cachedRequest; } @NotNull - private S3Request getOrCreateRequest(final long fragmentIndex) { - final int cacheIdx = cacheIndex(fragmentIndex); - final S3Request locallyCached = getRequestFromLocalCache(fragmentIndex, cacheIdx); - if (locallyCached != null) { - return locallyCached; - } - final S3Request.AcquiredRequest sharedCacheRequest = sharedCache.getOrCreateRequest(uri, fragmentIndex, this); - // Cache the request and the ownership token locally - localCache[cacheIdx] = sharedCacheRequest; + private S3Request.AcquiredRequest getOrCreateRequest(final long fragmentIndex) { + final S3Request.AcquiredRequest cachedRequest = sharedCache.getOrCreateRequest(uri, fragmentIndex, this); // Send the request, if not sent already. The following method is idempotent, so we always call it. - sharedCacheRequest.request.sendRequest(); - return sharedCacheRequest.request; - } - - private int cacheIndex(final long fragmentIndex) { - return (int) (fragmentIndex % instructions.maxCacheSize()); + cachedRequest.request.sendRequest(); + return cachedRequest; } private long fragmentIndex(final long pos) { diff --git a/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3Instructions.java b/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3Instructions.java index 7560e9377df..27d313a235c 100644 --- a/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3Instructions.java +++ b/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3Instructions.java @@ -28,7 +28,6 @@ public abstract class S3Instructions implements LogOutputAppendable { private final static int DEFAULT_READ_AHEAD_COUNT = 32; private final static int DEFAULT_FRAGMENT_SIZE = 1 << 16; // 64 KiB private final static int MIN_FRAGMENT_SIZE = 8 << 10; // 8 KiB - private final static int DEFAULT_MAX_CACHE_SIZE = 256; private final static Duration DEFAULT_CONNECTION_TIMEOUT = Duration.ofSeconds(2); private final static Duration DEFAULT_READ_TIMEOUT = Duration.ofSeconds(2); @@ -73,17 +72,6 @@ public int fragmentSize() { return DEFAULT_FRAGMENT_SIZE; } - /** - * The maximum number of fragments to cache in memory, defaults to - * {@code Math.max(1 + readAheadCount(), DEFAULT_MAX_CACHE_SIZE)}, which is at least - * {@value #DEFAULT_MAX_CACHE_SIZE}. This caching is done at the deephaven layer for faster access to recently read - * fragments. Must be greater than or equal to {@code 1 + readAheadCount()}. - */ - @Default - public int maxCacheSize() { - return Math.max(1 + readAheadCount(), DEFAULT_MAX_CACHE_SIZE); - } - /** * The amount of time to wait when initially establishing a connection before giving up and timing out, defaults to * 2 seconds. @@ -133,8 +121,6 @@ public interface Builder { Builder fragmentSize(int fragmentSize); - Builder maxCacheSize(int maxCacheSize); - Builder connectionTimeout(Duration connectionTimeout); Builder readTimeout(Duration connectionTimeout); @@ -152,13 +138,10 @@ default Builder endpointOverride(String endpointOverride) { abstract S3Instructions withReadAheadCount(int readAheadCount); - abstract S3Instructions withMaxCacheSize(int maxCacheSize); - @Lazy S3Instructions singleUse() { final int readAheadCount = Math.min(DEFAULT_READ_AHEAD_COUNT, readAheadCount()); - return withReadAheadCount(readAheadCount) - .withMaxCacheSize(readAheadCount + 1); + return withReadAheadCount(readAheadCount); } @Check @@ -183,14 +166,6 @@ final void boundsCheckMinFragmentSize() { } } - @Check - final void boundsCheckMaxCacheSize() { - if (maxCacheSize() < readAheadCount() + 1) { - throw new IllegalArgumentException("maxCacheSize(=" + maxCacheSize() + ") must be >= 1 + " + - "readAheadCount(=" + readAheadCount() + ")"); - } - } - @Check final void awsSdkV2Credentials() { if (!(credentials() instanceof AwsSdkV2Credentials)) { diff --git a/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3Request.java b/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3Request.java index d031b5c8c11..cc8a0ec94c6 100644 --- a/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3Request.java +++ b/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3Request.java @@ -49,12 +49,16 @@ static class AcquiredRequest { * The ownership token keeps the request alive. When the ownership token is GC'd, the request is no longer * usable and will be cleaned up. */ - final Object ownershipToken; + Object ownershipToken; AcquiredRequest(final S3Request request, final Object ownershipToken) { this.request = request; this.ownershipToken = ownershipToken; } + + void release() { + ownershipToken = null; + } } /** diff --git a/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3RequestCache.java b/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3RequestCache.java index 9f3fddd33a8..9f768d4e9ca 100644 --- a/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3RequestCache.java +++ b/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3RequestCache.java @@ -8,6 +8,7 @@ import io.deephaven.internal.log.LoggerFactory; import io.deephaven.io.logger.Logger; import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; import software.amazon.awssdk.services.s3.S3Uri; /** @@ -46,6 +47,27 @@ int getFragmentSize() { return fragmentSize; } + /** + * Acquire a request for the given URI and fragment index if it already exists in the cache. + * + * @param uri the URI + * @param fragmentIndex the fragment index + * @return the request + */ + @Nullable + S3Request.AcquiredRequest getRequest(@NotNull final S3Uri uri, final long fragmentIndex) { + final S3Request.ID key = new S3Request.ID(uri, fragmentIndex); + final S3Request existingRequest = requests.get(key); + if (existingRequest != null) { + final S3Request.AcquiredRequest acquired = existingRequest.tryAcquire(); + if (acquired != null) { + return acquired; + } + remove(existingRequest); + } + return null; + } + /** * Acquire a request for the given URI and fragment index, creating and sending a new request it if it does not * exist in the cache. diff --git a/extensions/s3/src/test/java/io/deephaven/extensions/s3/S3InstructionsTest.java b/extensions/s3/src/test/java/io/deephaven/extensions/s3/S3InstructionsTest.java index ef9e70400a0..521bc02f6be 100644 --- a/extensions/s3/src/test/java/io/deephaven/extensions/s3/S3InstructionsTest.java +++ b/extensions/s3/src/test/java/io/deephaven/extensions/s3/S3InstructionsTest.java @@ -19,7 +19,6 @@ void defaults() { assertThat(instructions.maxConcurrentRequests()).isEqualTo(256); assertThat(instructions.readAheadCount()).isEqualTo(32); assertThat(instructions.fragmentSize()).isEqualTo(65536); - assertThat(instructions.maxCacheSize()).isEqualTo(256); assertThat(instructions.connectionTimeout()).isEqualTo(Duration.ofSeconds(2)); assertThat(instructions.readTimeout()).isEqualTo(Duration.ofSeconds(2)); assertThat(instructions.credentials()).isEqualTo(Credentials.defaultCredentials()); @@ -102,30 +101,6 @@ void tooSmallFragmentSize() { } } - @Test - void minMaxCacheSize() { - assertThat(S3Instructions.builder() - .regionName("some-region") - .readAheadCount(99) - .maxCacheSize(100) - .build() - .maxCacheSize()) - .isEqualTo(100); - } - - @Test - void tooSmallCacheSize() { - try { - S3Instructions.builder() - .regionName("some-region") - .readAheadCount(99) - .maxCacheSize(99) - .build(); - } catch (IllegalArgumentException e) { - assertThat(e).hasMessageContaining("maxCacheSize"); - } - } - @Test void basicCredentials() { assertThat(S3Instructions.builder() diff --git a/py/server/deephaven/experimental/s3.py b/py/server/deephaven/experimental/s3.py index 7065d519f5c..c19a381b4d0 100644 --- a/py/server/deephaven/experimental/s3.py +++ b/py/server/deephaven/experimental/s3.py @@ -38,7 +38,6 @@ def __init__(self, max_concurrent_requests: Optional[int] = None, read_ahead_count: Optional[int] = None, fragment_size: Optional[int] = None, - max_cache_size: Optional[int] = None, connection_timeout: Union[ Duration, int, str, datetime.timedelta, np.timedelta64, pd.Timedelta, None] = None, read_timeout: Union[ @@ -61,8 +60,6 @@ def __init__(self, fragment. Defaults to 32, which means fetch the next 32 fragments in advance when reading the current fragment. fragment_size (int): the maximum size of each fragment to read, defaults to 64 KiB. If there are fewer bytes remaining in the file, the fetched fragment can be smaller. - max_cache_size (int): the maximum number of fragments to cache in memory while reading, defaults to 256. This - caching is done at the Deephaven layer for faster access to recently read fragments. connection_timeout (Union[Duration, int, str, datetime.timedelta, np.timedelta64, pd.Timedelta]): the amount of time to wait when initially establishing a connection before giving up and timing out, can be expressed as an integer in nanoseconds, a time interval string, e.g. "PT00:00:00.001" or "PT1s", or @@ -103,9 +100,6 @@ def __init__(self, if fragment_size is not None: builder.fragmentSize(fragment_size) - if max_cache_size is not None: - builder.maxCacheSize(max_cache_size) - if connection_timeout is not None: builder.connectionTimeout(time.to_j_duration(connection_timeout)) From 43e7315c34036c1175e53178fd677c853a232116 Mon Sep 17 00:00:00 2001 From: Shivam Malhotra Date: Thu, 13 Jun 2024 17:43:32 -0500 Subject: [PATCH 02/10] Improved holding/releasing acquired requests --- .../deephaven/extensions/s3/S3ChannelContext.java | 15 +++++++++------ .../io/deephaven/extensions/s3/S3Request.java | 6 +----- 2 files changed, 10 insertions(+), 11 deletions(-) diff --git a/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3ChannelContext.java b/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3ChannelContext.java index 1aecb356347..d3dcf372bb6 100644 --- a/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3ChannelContext.java +++ b/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3ChannelContext.java @@ -16,7 +16,6 @@ import java.io.IOException; import java.nio.ByteBuffer; -import java.util.Arrays; import java.util.Objects; import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; @@ -36,6 +35,11 @@ final class S3ChannelContext extends BaseSeekableChannelContext implements Seeka final S3AsyncClient client; final S3Instructions instructions; + /** + * Used to temporarily hold a request to prevent it from being GC'd. + */ + S3Request.AcquiredRequest acquiredRequest; + /** * The URI associated with this context. A single context object can only be associated with a single URI at a time. * But it can be re-associated with a different URI after {@link #reset() resetting}. @@ -117,15 +121,14 @@ int fill(final long position, final ByteBuffer dest) throws IOException { readAhead = Math.min(Math.max(impliedReadAhead, desiredReadAhead), totalRemainingFragments); } // Hold a reference to the first request to ensure it is not evicted from the cache - S3Request.AcquiredRequest acquiredFirstRequest = getOrCreateRequest(firstFragmentIx); + acquiredRequest = getOrCreateRequest(firstFragmentIx); for (int i = 0; i < readAhead; ++i) { - // Do not hold references to the read ahead requests + // Do not hold references to the read-ahead requests getOrCreateRequest(firstFragmentIx + i + 1); } // blocking - int filled = acquiredFirstRequest.request.fill(position, dest); - acquiredFirstRequest.release(); - acquiredFirstRequest = null; + int filled = acquiredRequest.request.fill(position, dest); + acquiredRequest = null; for (int i = 0; dest.hasRemaining(); ++i) { final S3Request.AcquiredRequest acquiredReadAheadRequest = diff --git a/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3Request.java b/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3Request.java index cc8a0ec94c6..d031b5c8c11 100644 --- a/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3Request.java +++ b/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3Request.java @@ -49,16 +49,12 @@ static class AcquiredRequest { * The ownership token keeps the request alive. When the ownership token is GC'd, the request is no longer * usable and will be cleaned up. */ - Object ownershipToken; + final Object ownershipToken; AcquiredRequest(final S3Request request, final Object ownershipToken) { this.request = request; this.ownershipToken = ownershipToken; } - - void release() { - ownershipToken = null; - } } /** From da88de951e67a35b59e41fed4feb2f74203f1d0c Mon Sep 17 00:00:00 2001 From: Shivam Malhotra Date: Fri, 14 Jun 2024 09:40:06 -0500 Subject: [PATCH 03/10] Resolving comments --- .../extensions/s3/S3ChannelContext.java | 25 +++++----- .../io/deephaven/extensions/s3/S3Request.java | 46 ++++++++++++++----- 2 files changed, 46 insertions(+), 25 deletions(-) diff --git a/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3ChannelContext.java b/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3ChannelContext.java index d3dcf372bb6..f8266a77077 100644 --- a/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3ChannelContext.java +++ b/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3ChannelContext.java @@ -31,14 +31,17 @@ final class S3ChannelContext extends BaseSeekableChannelContext implements Seeka static final long UNINITIALIZED_SIZE = -1; private static final long UNINITIALIZED_NUM_FRAGMENTS = -1; - final S3SeekableChannelProvider provider; + private final S3SeekableChannelProvider provider; final S3AsyncClient client; final S3Instructions instructions; /** * Used to temporarily hold a request to prevent it from being GC'd. + *

+ * This field should NOT be converted to a local variable, so that it is not optimized away by the compiler, + * and the request is not prematurely GC'd. */ - S3Request.AcquiredRequest acquiredRequest; + private S3Request.AcquiredRequest acquiredRequest; /** * The URI associated with this context. A single context object can only be associated with a single URI at a time. @@ -127,21 +130,17 @@ int fill(final long position, final ByteBuffer dest) throws IOException { getOrCreateRequest(firstFragmentIx + i + 1); } // blocking - int filled = acquiredRequest.request.fill(position, dest); - acquiredRequest = null; + int filled = acquiredRequest.fill(position, dest); + acquiredRequest = null; // release the reference for (int i = 0; dest.hasRemaining(); ++i) { - final S3Request.AcquiredRequest acquiredReadAheadRequest = + final S3Request.AcquiredRequest readAheadRequest = getRequestFromSharedCache(firstFragmentIx + i + 1); - if (acquiredReadAheadRequest == null) { - break; - } - final S3Request request = acquiredReadAheadRequest.request; - if (!request.isDone()) { + if (readAheadRequest == null || !readAheadRequest.isDone()) { break; } // non-blocking since we know isDone - filled += request.fill(position + filled, dest); + filled += readAheadRequest.fill(position + filled, dest); } return filled; } @@ -173,7 +172,7 @@ private S3Request.AcquiredRequest getRequestFromSharedCache(final long fragmentI return null; } // Send the request, if not sent already. The following method is idempotent, so we always call it. - cachedRequest.request.sendRequest(); + cachedRequest.sendRequest(); return cachedRequest; } @@ -181,7 +180,7 @@ private S3Request.AcquiredRequest getRequestFromSharedCache(final long fragmentI private S3Request.AcquiredRequest getOrCreateRequest(final long fragmentIndex) { final S3Request.AcquiredRequest cachedRequest = sharedCache.getOrCreateRequest(uri, fragmentIndex, this); // Send the request, if not sent already. The following method is idempotent, so we always call it. - cachedRequest.request.sendRequest(); + cachedRequest.sendRequest(); return cachedRequest; } diff --git a/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3Request.java b/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3Request.java index d031b5c8c11..aa2df307f3c 100644 --- a/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3Request.java +++ b/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3Request.java @@ -36,14 +36,14 @@ * * @implNote This class extends from a {@link SoftReference < ByteBuffer >} and implements {@link CleanupReference} to * allow for cancelling the request once all references to the buffer have been released. Users should not - * access the buffer directly, but instead use the {@link #fill(long, ByteBuffer)} method. Also, users should + * access the buffer directly, but instead use the {@link AcquiredRequest#fill} method. Also, users should * hold instances of {@link AcquiredRequest} to keep the requests alive. */ final class S3Request extends SoftReference implements AsyncResponseTransformer, BiConsumer, CleanupReference { - static class AcquiredRequest { + static final class AcquiredRequest { final S3Request request; /** * The ownership token keeps the request alive. When the ownership token is GC'd, the request is no longer @@ -55,6 +55,28 @@ static class AcquiredRequest { this.request = request; this.ownershipToken = ownershipToken; } + + /** + * Send the request to the S3 service. This method is idempotent and can be called multiple times. + */ + void sendRequest() { + request.sendRequestImpl(); + } + + /** + * Fill the provided buffer with data from this request, starting at the given local position. Returns the + * number of bytes filled. + */ + int fill(long localPosition, ByteBuffer dest) throws IOException { + return request.fillImpl(localPosition, dest); + } + + /** + * Are we done fetching the data for this request? + */ + boolean isDone() { + return request.isDone(); + } } /** @@ -159,9 +181,11 @@ AcquiredRequest tryAcquire() { } /** - * Send the request to the S3 service. This method is idempotent and can be called multiple times. + * Send the request to the S3 service. This method is idempotent and can be called multiple times. Note that the + * request must be acquired before calling this method. Therefore, this method should only be called from inside the + * {@link AcquiredRequest#sendRequest()} method. */ - void sendRequest() { + private void sendRequestImpl() { if (consumerFuture == null) { synchronized (this) { if (consumerFuture == null) { @@ -176,15 +200,16 @@ void sendRequest() { } } - boolean isDone() { + private boolean isDone() { return consumerFuture.isDone(); } /** * Fill the provided buffer with data from this request, starting at the given local position. Returns the number of - * bytes filled. Note that the request must be acquired before calling this method. + * bytes filled. Note that the request must be acquired before calling this method. Therefore, this method should + * only be called from inside the {@link AcquiredRequest#fill(long, ByteBuffer)} method. */ - int fill(long localPosition, ByteBuffer dest) throws IOException { + private int fillImpl(long localPosition, ByteBuffer dest) throws IOException { if (get() == null) { throw new IllegalStateException(String.format("Trying to fill data after release, %s", requestStr())); } @@ -274,7 +299,8 @@ private ByteBuffer getFullFragment() throws ExecutionException, InterruptedExcep final long readNanos = instructions.readTimeout().plusMillis(100).toNanos(); final Boolean isComplete = consumerFuture.get(readNanos, TimeUnit.NANOSECONDS); if (!Boolean.TRUE.equals(isComplete)) { - throw new IllegalStateException(String.format("Failed to complete request %s", requestStr())); + throw new IllegalStateException(String.format("Failed to complete request %s, probably because the " + + "underlying buffer got freed while completing the request", requestStr())); } final ByteBuffer result = get(); if (result == null) { @@ -289,10 +315,6 @@ private ByteBuffer getFullFragment() throws ExecutionException, InterruptedExcep return result; } - boolean isFragment(final long fragmentIndex) { - return this.fragmentIndex == fragmentIndex; - } - private int requestLength() { return (int) (to - from + 1); } From 9af2bf145bcd69a35b321b69b4c778b1f4e62a1d Mon Sep 17 00:00:00 2001 From: Shivam Malhotra Date: Fri, 14 Jun 2024 10:40:15 -0500 Subject: [PATCH 04/10] Test commit causing Cancellation Exception --- .../deephaven/extensions/s3/S3ChannelContext.java | 14 +++----------- 1 file changed, 3 insertions(+), 11 deletions(-) diff --git a/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3ChannelContext.java b/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3ChannelContext.java index f8266a77077..76aff6f322e 100644 --- a/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3ChannelContext.java +++ b/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3ChannelContext.java @@ -35,14 +35,6 @@ final class S3ChannelContext extends BaseSeekableChannelContext implements Seeka final S3AsyncClient client; final S3Instructions instructions; - /** - * Used to temporarily hold a request to prevent it from being GC'd. - *

- * This field should NOT be converted to a local variable, so that it is not optimized away by the compiler, - * and the request is not prematurely GC'd. - */ - private S3Request.AcquiredRequest acquiredRequest; - /** * The URI associated with this context. A single context object can only be associated with a single URI at a time. * But it can be re-associated with a different URI after {@link #reset() resetting}. @@ -124,14 +116,14 @@ int fill(final long position, final ByteBuffer dest) throws IOException { readAhead = Math.min(Math.max(impliedReadAhead, desiredReadAhead), totalRemainingFragments); } // Hold a reference to the first request to ensure it is not evicted from the cache - acquiredRequest = getOrCreateRequest(firstFragmentIx); + S3Request.AcquiredRequest firstRequest = getOrCreateRequest(firstFragmentIx); for (int i = 0; i < readAhead; ++i) { // Do not hold references to the read-ahead requests getOrCreateRequest(firstFragmentIx + i + 1); } // blocking - int filled = acquiredRequest.fill(position, dest); - acquiredRequest = null; // release the reference + int filled = firstRequest.fill(position, dest); + firstRequest = null; // release the reference for (int i = 0; dest.hasRemaining(); ++i) { final S3Request.AcquiredRequest readAheadRequest = From 3d6ec5d290363ac2d0cf4fce7f417c4cc9884b84 Mon Sep 17 00:00:00 2001 From: Shivam Malhotra Date: Fri, 14 Jun 2024 10:40:36 -0500 Subject: [PATCH 05/10] Revert "Test commit causing Cancellation Exception" This reverts commit 9af2bf145bcd69a35b321b69b4c778b1f4e62a1d. --- .../deephaven/extensions/s3/S3ChannelContext.java | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3ChannelContext.java b/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3ChannelContext.java index 76aff6f322e..f8266a77077 100644 --- a/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3ChannelContext.java +++ b/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3ChannelContext.java @@ -35,6 +35,14 @@ final class S3ChannelContext extends BaseSeekableChannelContext implements Seeka final S3AsyncClient client; final S3Instructions instructions; + /** + * Used to temporarily hold a request to prevent it from being GC'd. + *

+ * This field should NOT be converted to a local variable, so that it is not optimized away by the compiler, + * and the request is not prematurely GC'd. + */ + private S3Request.AcquiredRequest acquiredRequest; + /** * The URI associated with this context. A single context object can only be associated with a single URI at a time. * But it can be re-associated with a different URI after {@link #reset() resetting}. @@ -116,14 +124,14 @@ int fill(final long position, final ByteBuffer dest) throws IOException { readAhead = Math.min(Math.max(impliedReadAhead, desiredReadAhead), totalRemainingFragments); } // Hold a reference to the first request to ensure it is not evicted from the cache - S3Request.AcquiredRequest firstRequest = getOrCreateRequest(firstFragmentIx); + acquiredRequest = getOrCreateRequest(firstFragmentIx); for (int i = 0; i < readAhead; ++i) { // Do not hold references to the read-ahead requests getOrCreateRequest(firstFragmentIx + i + 1); } // blocking - int filled = firstRequest.fill(position, dest); - firstRequest = null; // release the reference + int filled = acquiredRequest.fill(position, dest); + acquiredRequest = null; // release the reference for (int i = 0; dest.hasRemaining(); ++i) { final S3Request.AcquiredRequest readAheadRequest = From bb0d0d77368bb1c5499321157bc1227ecd5da59a Mon Sep 17 00:00:00 2001 From: Shivam Malhotra Date: Fri, 14 Jun 2024 11:12:19 -0500 Subject: [PATCH 06/10] Minor optimizations to readahead --- .../deephaven/extensions/s3/S3ChannelContext.java | 13 +------------ .../java/io/deephaven/extensions/s3/S3Request.java | 2 +- 2 files changed, 2 insertions(+), 13 deletions(-) diff --git a/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3ChannelContext.java b/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3ChannelContext.java index f8266a77077..0f69f356c04 100644 --- a/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3ChannelContext.java +++ b/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3ChannelContext.java @@ -135,7 +135,7 @@ int fill(final long position, final ByteBuffer dest) throws IOException { for (int i = 0; dest.hasRemaining(); ++i) { final S3Request.AcquiredRequest readAheadRequest = - getRequestFromSharedCache(firstFragmentIx + i + 1); + sharedCache.getRequest(uri, firstFragmentIx + i + 1); if (readAheadRequest == null || !readAheadRequest.isDone()) { break; } @@ -165,17 +165,6 @@ public void close() { // -------------------------------------------------------------------------------------------------- - @Nullable - private S3Request.AcquiredRequest getRequestFromSharedCache(final long fragmentIndex) { - final S3Request.AcquiredRequest cachedRequest = sharedCache.getRequest(uri, fragmentIndex); - if (cachedRequest == null) { - return null; - } - // Send the request, if not sent already. The following method is idempotent, so we always call it. - cachedRequest.sendRequest(); - return cachedRequest; - } - @NotNull private S3Request.AcquiredRequest getOrCreateRequest(final long fragmentIndex) { final S3Request.AcquiredRequest cachedRequest = sharedCache.getOrCreateRequest(uri, fragmentIndex, this); diff --git a/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3Request.java b/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3Request.java index aa2df307f3c..d8db4a54dbb 100644 --- a/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3Request.java +++ b/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3Request.java @@ -201,7 +201,7 @@ private void sendRequestImpl() { } private boolean isDone() { - return consumerFuture.isDone(); + return consumerFuture != null && consumerFuture.isDone(); } /** From c841c516eb150cb0f2c23b377dacafcfa41454c5 Mon Sep 17 00:00:00 2001 From: Shivam Malhotra Date: Thu, 20 Jun 2024 16:04:14 -0500 Subject: [PATCH 07/10] Merged with Devin's patch --- .../extensions/s3/S3ChannelContext.java | 27 +-- .../io/deephaven/extensions/s3/S3Request.java | 180 ++++++++---------- .../extensions/s3/S3RequestCache.java | 5 +- 3 files changed, 97 insertions(+), 115 deletions(-) diff --git a/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3ChannelContext.java b/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3ChannelContext.java index 0f69f356c04..1e5ab3b4b42 100644 --- a/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3ChannelContext.java +++ b/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3ChannelContext.java @@ -35,14 +35,6 @@ final class S3ChannelContext extends BaseSeekableChannelContext implements Seeka final S3AsyncClient client; final S3Instructions instructions; - /** - * Used to temporarily hold a request to prevent it from being GC'd. - *

- * This field should NOT be converted to a local variable, so that it is not optimized away by the compiler, - * and the request is not prematurely GC'd. - */ - private S3Request.AcquiredRequest acquiredRequest; - /** * The URI associated with this context. A single context object can only be associated with a single URI at a time. * But it can be re-associated with a different URI after {@link #reset() resetting}. @@ -123,16 +115,17 @@ int fill(final long position, final ByteBuffer dest) throws IOException { final long totalRemainingFragments = numFragments - firstFragmentIx - 1; readAhead = Math.min(Math.max(impliedReadAhead, desiredReadAhead), totalRemainingFragments); } - // Hold a reference to the first request to ensure it is not evicted from the cache - acquiredRequest = getOrCreateRequest(firstFragmentIx); - for (int i = 0; i < readAhead; ++i) { - // Do not hold references to the read-ahead requests - getOrCreateRequest(firstFragmentIx + i + 1); + int filled; + { + // Hold a reference to the first request to ensure it is not evicted from the cache + final S3Request.AcquiredRequest acquiredRequest = getOrCreateRequest(firstFragmentIx); + for (int i = 0; i < readAhead; ++i) { + // Do not hold references to the read-ahead requests + getOrCreateRequest(firstFragmentIx + i + 1); + } + // blocking + filled = acquiredRequest.fill(position, dest); } - // blocking - int filled = acquiredRequest.fill(position, dest); - acquiredRequest = null; // release the reference - for (int i = 0; dest.hasRemaining(); ++i) { final S3Request.AcquiredRequest readAheadRequest = sharedCache.getRequest(uri, firstFragmentIx + i + 1); diff --git a/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3Request.java b/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3Request.java index d8db4a54dbb..1338b21dc8e 100644 --- a/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3Request.java +++ b/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3Request.java @@ -43,42 +43,6 @@ final class S3Request extends SoftReference implements AsyncResponseTransformer, BiConsumer, CleanupReference { - static final class AcquiredRequest { - final S3Request request; - /** - * The ownership token keeps the request alive. When the ownership token is GC'd, the request is no longer - * usable and will be cleaned up. - */ - final Object ownershipToken; - - AcquiredRequest(final S3Request request, final Object ownershipToken) { - this.request = request; - this.ownershipToken = ownershipToken; - } - - /** - * Send the request to the S3 service. This method is idempotent and can be called multiple times. - */ - void sendRequest() { - request.sendRequestImpl(); - } - - /** - * Fill the provided buffer with data from this request, starting at the given local position. Returns the - * number of bytes filled. - */ - int fill(long localPosition, ByteBuffer dest) throws IOException { - return request.fillImpl(localPosition, dest); - } - - /** - * Are we done fetching the data for this request? - */ - boolean isDone() { - return request.isDone(); - } - } - /** * A unique identifier for a request, consisting of the URI and fragment index. */ @@ -130,8 +94,8 @@ public boolean equals(Object obj) { /** * Create a new request for the given fragment index using the provided context object. * - * @return A new {@link AcquiredRequest} object containing newly created request and an ownership token. The request - * will stay alive as long as the ownership token is held. + * @return A new {@link AcquiredRequest} object containing newly created request and hard reference to the + * underlying buffer. The request will stay alive as long as the buffer is held. * * @implNote This method does not cache the context because contexts are short-lived while a request may be cached. */ @@ -142,7 +106,7 @@ static AcquiredRequest createAndAcquire(final long fragmentIndex, @NotNull final final long requestLength = to - from + 1; final ByteBuffer buffer = ByteBuffer.allocate((int) requestLength); final S3Request request = new S3Request(fragmentIndex, context, buffer, from, to); - return new AcquiredRequest(request, buffer); + return request.acquire(buffer); } private S3Request(final long fragmentIndex, @NotNull final S3ChannelContext context, @@ -168,16 +132,20 @@ ID getId() { } /** - * Try to acquire a reference to this request and ownership token. Returns {@code null} if the token is already + * Try to acquire a reference to this request and underlying buffer. Returns {@code null} if the buffer is already * released. */ @Nullable AcquiredRequest tryAcquire() { - final Object token = get(); - if (token == null) { + final ByteBuffer acquiredBuffer = get(); + if (acquiredBuffer == null) { return null; } - return new AcquiredRequest(this, token); + return acquire(acquiredBuffer); + } + + private AcquiredRequest acquire(final ByteBuffer buffer) { + return new AcquiredRequest(buffer); } /** @@ -185,7 +153,7 @@ AcquiredRequest tryAcquire() { * request must be acquired before calling this method. Therefore, this method should only be called from inside the * {@link AcquiredRequest#sendRequest()} method. */ - private void sendRequestImpl() { + private void sendRequest() { if (consumerFuture == null) { synchronized (this) { if (consumerFuture == null) { @@ -200,40 +168,82 @@ private void sendRequestImpl() { } } - private boolean isDone() { - return consumerFuture != null && consumerFuture.isDone(); - } + class AcquiredRequest { + /** + * This instance keeps a hard reference to the buffer, which is needed to keep the request alive. When the + * buffer is GC'd, the request is no longer usable and will be available for {@link #cleanup()}. + */ + final ByteBuffer acquiredBuffer; - /** - * Fill the provided buffer with data from this request, starting at the given local position. Returns the number of - * bytes filled. Note that the request must be acquired before calling this method. Therefore, this method should - * only be called from inside the {@link AcquiredRequest#fill(long, ByteBuffer)} method. - */ - private int fillImpl(long localPosition, ByteBuffer dest) throws IOException { - if (get() == null) { - throw new IllegalStateException(String.format("Trying to fill data after release, %s", requestStr())); + AcquiredRequest(final ByteBuffer buffer) { + this.acquiredBuffer = buffer; + } + + /** + * Are we done fetching the data for this request? + */ + boolean isDone() { + return consumerFuture != null && consumerFuture.isDone(); } - final int resultOffset = (int) (localPosition - from); - final int resultLength = Math.min((int) (to - localPosition + 1), dest.remaining()); - final ByteBuffer fullFragment; - try { - fullFragment = getFullFragment().asReadOnlyBuffer(); - } catch (final InterruptedException | ExecutionException | TimeoutException | CancellationException e) { - throw S3ChannelContext.handleS3Exception(e, String.format("fetching fragment %s", requestStr()), - instructions); + + /** + * Send the request to the S3 service. This method is idempotent and can be called multiple times. + */ + void sendRequest() { + S3Request.this.sendRequest(); } - // fullFragment has limit == capacity. This lets us have safety around math and the ability to simply - // clear to reset. - fullFragment.limit(resultOffset + resultLength); - fullFragment.position(resultOffset); - try { - dest.put(fullFragment); - } finally { - fullFragment.clear(); + + final S3Request getRequest() { + return S3Request.this; + } + + /** + * Fill the provided buffer with data from this request, starting at the given local position. Returns the + * number of bytes filled. + */ + int fill(long localPosition, ByteBuffer dest) throws IOException { + final int resultOffset = (int) (localPosition - from); + final int resultLength = Math.min((int) (to - localPosition + 1), dest.remaining()); + final ByteBuffer fullFragment = getFullFragment(); + // fullFragment has limit == capacity. This lets us have safety around math and the ability to simply + // clear to reset. + fullFragment.limit(resultOffset + resultLength); + fullFragment.position(resultOffset); + try { + dest.put(fullFragment); + } finally { + fullFragment.clear(); + } + ++fillCount; + fillBytes += resultLength; + return resultLength; + } + + private ByteBuffer getFullFragment() throws IOException { + // Giving our own get() a bit of overhead - the clients should already be constructed with appropriate + // apiCallTimeout. + final long readNanos = instructions.readTimeout().plusMillis(100).toNanos(); + final Boolean isComplete; + try { + isComplete = consumerFuture.get(readNanos, TimeUnit.NANOSECONDS); + } catch (final InterruptedException | ExecutionException | TimeoutException | CancellationException e) { + throw S3ChannelContext.handleS3Exception(e, String.format("fetching fragment %s", requestStr()), + instructions); + } + if (!Boolean.TRUE.equals(isComplete)) { + throw new IllegalStateException(String.format("Failed to complete request %s, probably because the " + + "underlying buffer got freed while completing the request", requestStr())); + } + final ByteBuffer fullFragment = acquiredBuffer.asReadOnlyBuffer(); + if (fullFragment.position() != 0 || fullFragment.limit() != fullFragment.capacity() + || fullFragment.limit() != requestLength()) { + throw new IllegalStateException(String.format( + "Expected: pos=0, limit=%d, capacity=%d. Actual: pos=%d, limit=%d, capacity=%d", + requestLength(), requestLength(), fullFragment.position(), fullFragment.limit(), + fullFragment.capacity())); + } + return fullFragment; } - ++fillCount; - fillBytes += resultLength; - return resultLength; } @Override @@ -293,28 +303,6 @@ public void exceptionOccurred(Throwable error) { // -------------------------------------------------------------------------------------------------- - private ByteBuffer getFullFragment() throws ExecutionException, InterruptedException, TimeoutException { - // Giving our own get() a bit of overhead - the clients should already be constructed with appropriate - // apiCallTimeout. - final long readNanos = instructions.readTimeout().plusMillis(100).toNanos(); - final Boolean isComplete = consumerFuture.get(readNanos, TimeUnit.NANOSECONDS); - if (!Boolean.TRUE.equals(isComplete)) { - throw new IllegalStateException(String.format("Failed to complete request %s, probably because the " + - "underlying buffer got freed while completing the request", requestStr())); - } - final ByteBuffer result = get(); - if (result == null) { - throw new IllegalStateException( - String.format("Tried to read from no-longer-acquired Request, %s", requestStr())); - } - if (result.position() != 0 || result.limit() != result.capacity() || result.limit() != requestLength()) { - throw new IllegalStateException(String.format( - "Expected: pos=0, limit=%d, capacity=%d. Actual: pos=%d, limit=%d, capacity=%d", - requestLength(), requestLength(), result.position(), result.limit(), result.capacity())); - } - return result; - } - private int requestLength() { return (int) (to - from + 1); } diff --git a/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3RequestCache.java b/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3RequestCache.java index 9f768d4e9ca..51b28bdff44 100644 --- a/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3RequestCache.java +++ b/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3RequestCache.java @@ -95,10 +95,11 @@ S3Request.AcquiredRequest getOrCreateRequest(@NotNull final S3Uri uri, final lon if (newAcquiredRequest == null) { newAcquiredRequest = S3Request.createAndAcquire(fragmentIndex, context); } - if ((existingRequest = requests.putIfAbsent(key, newAcquiredRequest.request)) == null) { + if ((existingRequest = requests.putIfAbsent(key, newAcquiredRequest.getRequest())) == null) { if (log.isDebugEnabled()) { log.debug().append("Added new request to cache: ").append(String.format("ctx=%d ", - System.identityHashCode(context))).append(newAcquiredRequest.request.requestStr()).endl(); + System.identityHashCode(context))).append(newAcquiredRequest.getRequest().requestStr()) + .endl(); } return newAcquiredRequest; } From 6a0da89e7b82e204d4e9fbe096a3b0ac12f5993e Mon Sep 17 00:00:00 2001 From: Shivam Malhotra Date: Fri, 21 Jun 2024 10:03:24 -0500 Subject: [PATCH 08/10] Cancel subscription when buffer gets freed --- .../s3/src/main/java/io/deephaven/extensions/s3/S3Request.java | 1 + 1 file changed, 1 insertion(+) diff --git a/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3Request.java b/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3Request.java index 1338b21dc8e..e6e84b68b77 100644 --- a/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3Request.java +++ b/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3Request.java @@ -368,6 +368,7 @@ public void onNext(final ByteBuffer dataBuffer) { final ByteBuffer resultBuffer = S3Request.this.get(); if (resultBuffer == null) { localProducer.complete(false); + subscription.cancel(); return; } final int numBytes = dataBuffer.remaining(); From 1c1212090b4e560983753a4643b269fbef24bc96 Mon Sep 17 00:00:00 2001 From: Shivam Malhotra Date: Fri, 21 Jun 2024 10:29:55 -0500 Subject: [PATCH 09/10] Removing some extra includes --- .../main/java/io/deephaven/extensions/s3/S3ChannelContext.java | 1 - 1 file changed, 1 deletion(-) diff --git a/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3ChannelContext.java b/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3ChannelContext.java index 1e5ab3b4b42..e9335be34c3 100644 --- a/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3ChannelContext.java +++ b/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3ChannelContext.java @@ -8,7 +8,6 @@ import io.deephaven.util.channel.SeekableChannelContext; import io.deephaven.util.channel.BaseSeekableChannelContext; import org.jetbrains.annotations.NotNull; -import org.jetbrains.annotations.Nullable; import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.S3Uri; import software.amazon.awssdk.services.s3.model.HeadObjectRequest; From 72c08f8155b16793d3885f1c0306e56448643b4f Mon Sep 17 00:00:00 2001 From: Shivam Malhotra Date: Fri, 21 Jun 2024 15:04:15 -0500 Subject: [PATCH 10/10] Renaming variables based on review --- .../extensions/s3/S3ChannelContext.java | 11 ++-- .../io/deephaven/extensions/s3/S3Request.java | 58 +++++++++---------- .../extensions/s3/S3RequestCache.java | 22 +++---- 3 files changed, 45 insertions(+), 46 deletions(-) diff --git a/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3ChannelContext.java b/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3ChannelContext.java index e9335be34c3..fb58ee43632 100644 --- a/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3ChannelContext.java +++ b/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3ChannelContext.java @@ -117,7 +117,7 @@ int fill(final long position, final ByteBuffer dest) throws IOException { int filled; { // Hold a reference to the first request to ensure it is not evicted from the cache - final S3Request.AcquiredRequest acquiredRequest = getOrCreateRequest(firstFragmentIx); + final S3Request.Acquired acquiredRequest = getOrCreateRequest(firstFragmentIx); for (int i = 0; i < readAhead; ++i) { // Do not hold references to the read-ahead requests getOrCreateRequest(firstFragmentIx + i + 1); @@ -126,8 +126,7 @@ int fill(final long position, final ByteBuffer dest) throws IOException { filled = acquiredRequest.fill(position, dest); } for (int i = 0; dest.hasRemaining(); ++i) { - final S3Request.AcquiredRequest readAheadRequest = - sharedCache.getRequest(uri, firstFragmentIx + i + 1); + final S3Request.Acquired readAheadRequest = sharedCache.getRequest(uri, firstFragmentIx + i + 1); if (readAheadRequest == null || !readAheadRequest.isDone()) { break; } @@ -158,10 +157,10 @@ public void close() { // -------------------------------------------------------------------------------------------------- @NotNull - private S3Request.AcquiredRequest getOrCreateRequest(final long fragmentIndex) { - final S3Request.AcquiredRequest cachedRequest = sharedCache.getOrCreateRequest(uri, fragmentIndex, this); + private S3Request.Acquired getOrCreateRequest(final long fragmentIndex) { + final S3Request.Acquired cachedRequest = sharedCache.getOrCreateRequest(uri, fragmentIndex, this); // Send the request, if not sent already. The following method is idempotent, so we always call it. - cachedRequest.sendRequest(); + cachedRequest.send(); return cachedRequest; } diff --git a/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3Request.java b/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3Request.java index e6e84b68b77..c5c2727f296 100644 --- a/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3Request.java +++ b/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3Request.java @@ -36,8 +36,8 @@ * * @implNote This class extends from a {@link SoftReference < ByteBuffer >} and implements {@link CleanupReference} to * allow for cancelling the request once all references to the buffer have been released. Users should not - * access the buffer directly, but instead use the {@link AcquiredRequest#fill} method. Also, users should - * hold instances of {@link AcquiredRequest} to keep the requests alive. + * access the buffer directly, but instead use the {@link Acquired#fill} method. Also, users should hold + * instances of {@link Acquired} to keep the requests alive. */ final class S3Request extends SoftReference implements AsyncResponseTransformer, BiConsumer, @@ -94,13 +94,13 @@ public boolean equals(Object obj) { /** * Create a new request for the given fragment index using the provided context object. * - * @return A new {@link AcquiredRequest} object containing newly created request and hard reference to the - * underlying buffer. The request will stay alive as long as the buffer is held. + * @return A new {@link Acquired} object containing newly created request and hard reference to the underlying + * buffer. The request will stay alive as long as the buffer is held. * * @implNote This method does not cache the context because contexts are short-lived while a request may be cached. */ @NotNull - static AcquiredRequest createAndAcquire(final long fragmentIndex, @NotNull final S3ChannelContext context) { + static Acquired createAndAcquire(final long fragmentIndex, @NotNull final S3ChannelContext context) { final long from = fragmentIndex * context.instructions.fragmentSize(); final long to = Math.min(from + context.instructions.fragmentSize(), context.size) - 1; final long requestLength = to - from + 1; @@ -136,7 +136,7 @@ ID getId() { * released. */ @Nullable - AcquiredRequest tryAcquire() { + Acquired tryAcquire() { final ByteBuffer acquiredBuffer = get(); if (acquiredBuffer == null) { return null; @@ -144,16 +144,16 @@ AcquiredRequest tryAcquire() { return acquire(acquiredBuffer); } - private AcquiredRequest acquire(final ByteBuffer buffer) { - return new AcquiredRequest(buffer); + private Acquired acquire(final ByteBuffer buffer) { + return new Acquired(buffer); } /** * Send the request to the S3 service. This method is idempotent and can be called multiple times. Note that the * request must be acquired before calling this method. Therefore, this method should only be called from inside the - * {@link AcquiredRequest#sendRequest()} method. + * {@link Acquired#send()} method. */ - private void sendRequest() { + private void sendImpl() { if (consumerFuture == null) { synchronized (this) { if (consumerFuture == null) { @@ -168,14 +168,14 @@ private void sendRequest() { } } - class AcquiredRequest { + class Acquired { /** * This instance keeps a hard reference to the buffer, which is needed to keep the request alive. When the * buffer is GC'd, the request is no longer usable and will be available for {@link #cleanup()}. */ - final ByteBuffer acquiredBuffer; + private final ByteBuffer acquiredBuffer; - AcquiredRequest(final ByteBuffer buffer) { + private Acquired(final ByteBuffer buffer) { this.acquiredBuffer = buffer; } @@ -189,11 +189,11 @@ boolean isDone() { /** * Send the request to the S3 service. This method is idempotent and can be called multiple times. */ - void sendRequest() { - S3Request.this.sendRequest(); + void send() { + sendImpl(); } - final S3Request getRequest() { + final S3Request request() { return S3Request.this; } @@ -204,22 +204,22 @@ final S3Request getRequest() { int fill(long localPosition, ByteBuffer dest) throws IOException { final int resultOffset = (int) (localPosition - from); final int resultLength = Math.min((int) (to - localPosition + 1), dest.remaining()); - final ByteBuffer fullFragment = getFullFragment(); - // fullFragment has limit == capacity. This lets us have safety around math and the ability to simply + final ByteBuffer filledBuffer = getFilledBuffer(); + // filledBuffer has limit == capacity. This lets us have safety around math and the ability to simply // clear to reset. - fullFragment.limit(resultOffset + resultLength); - fullFragment.position(resultOffset); + filledBuffer.limit(resultOffset + resultLength); + filledBuffer.position(resultOffset); try { - dest.put(fullFragment); + dest.put(filledBuffer); } finally { - fullFragment.clear(); + filledBuffer.clear(); } ++fillCount; fillBytes += resultLength; return resultLength; } - private ByteBuffer getFullFragment() throws IOException { + private ByteBuffer getFilledBuffer() throws IOException { // Giving our own get() a bit of overhead - the clients should already be constructed with appropriate // apiCallTimeout. final long readNanos = instructions.readTimeout().plusMillis(100).toNanos(); @@ -234,15 +234,15 @@ private ByteBuffer getFullFragment() throws IOException { throw new IllegalStateException(String.format("Failed to complete request %s, probably because the " + "underlying buffer got freed while completing the request", requestStr())); } - final ByteBuffer fullFragment = acquiredBuffer.asReadOnlyBuffer(); - if (fullFragment.position() != 0 || fullFragment.limit() != fullFragment.capacity() - || fullFragment.limit() != requestLength()) { + final ByteBuffer filledBuffer = acquiredBuffer.asReadOnlyBuffer(); + if (filledBuffer.position() != 0 || filledBuffer.limit() != filledBuffer.capacity() + || filledBuffer.limit() != requestLength()) { throw new IllegalStateException(String.format( "Expected: pos=0, limit=%d, capacity=%d. Actual: pos=%d, limit=%d, capacity=%d", - requestLength(), requestLength(), fullFragment.position(), fullFragment.limit(), - fullFragment.capacity())); + requestLength(), requestLength(), filledBuffer.position(), filledBuffer.limit(), + filledBuffer.capacity())); } - return fullFragment; + return filledBuffer; } } diff --git a/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3RequestCache.java b/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3RequestCache.java index 51b28bdff44..81a757f83f3 100644 --- a/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3RequestCache.java +++ b/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3RequestCache.java @@ -52,14 +52,14 @@ int getFragmentSize() { * * @param uri the URI * @param fragmentIndex the fragment index - * @return the request + * @return the request if we could acquire it from the cache, or null */ @Nullable - S3Request.AcquiredRequest getRequest(@NotNull final S3Uri uri, final long fragmentIndex) { + S3Request.Acquired getRequest(@NotNull final S3Uri uri, final long fragmentIndex) { final S3Request.ID key = new S3Request.ID(uri, fragmentIndex); final S3Request existingRequest = requests.get(key); if (existingRequest != null) { - final S3Request.AcquiredRequest acquired = existingRequest.tryAcquire(); + final S3Request.Acquired acquired = existingRequest.tryAcquire(); if (acquired != null) { return acquired; } @@ -78,30 +78,30 @@ S3Request.AcquiredRequest getRequest(@NotNull final S3Uri uri, final long fragme * @return the request */ @NotNull - S3Request.AcquiredRequest getOrCreateRequest(@NotNull final S3Uri uri, final long fragmentIndex, + S3Request.Acquired getOrCreateRequest(@NotNull final S3Uri uri, final long fragmentIndex, @NotNull final S3ChannelContext context) { final S3Request.ID key = new S3Request.ID(uri, fragmentIndex); - S3Request.AcquiredRequest newAcquiredRequest = null; + S3Request.Acquired newAcquired = null; S3Request existingRequest = requests.get(key); while (true) { if (existingRequest != null) { - final S3Request.AcquiredRequest acquired = existingRequest.tryAcquire(); + final S3Request.Acquired acquired = existingRequest.tryAcquire(); if (acquired != null) { return acquired; } else { remove(existingRequest); } } - if (newAcquiredRequest == null) { - newAcquiredRequest = S3Request.createAndAcquire(fragmentIndex, context); + if (newAcquired == null) { + newAcquired = S3Request.createAndAcquire(fragmentIndex, context); } - if ((existingRequest = requests.putIfAbsent(key, newAcquiredRequest.getRequest())) == null) { + if ((existingRequest = requests.putIfAbsent(key, newAcquired.request())) == null) { if (log.isDebugEnabled()) { log.debug().append("Added new request to cache: ").append(String.format("ctx=%d ", - System.identityHashCode(context))).append(newAcquiredRequest.getRequest().requestStr()) + System.identityHashCode(context))).append(newAcquired.request().requestStr()) .endl(); } - return newAcquiredRequest; + return newAcquired; } // TODO(deephaven-core#5486): Instead of remove + putIfAbsent pattern, we could have used replace + get // pattern, but KeyedObjectHashMap potentially has a bug in replace method.