diff --git a/x-pack/plugin/blob-cache/src/test/java/org/elasticsearch/blobcache/shared/SharedBlobCacheServiceTests.java b/x-pack/plugin/blob-cache/src/test/java/org/elasticsearch/blobcache/shared/SharedBlobCacheServiceTests.java index e55e1cf0c79d2..edeed9a16034a 100644 --- a/x-pack/plugin/blob-cache/src/test/java/org/elasticsearch/blobcache/shared/SharedBlobCacheServiceTests.java +++ b/x-pack/plugin/blob-cache/src/test/java/org/elasticsearch/blobcache/shared/SharedBlobCacheServiceTests.java @@ -50,6 +50,7 @@ import static org.elasticsearch.node.Node.NODE_NAME_SETTING; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.notNullValue; @@ -1017,6 +1018,165 @@ public void execute(Runnable command) { threadPool.shutdown(); } + public void testMaybeFetchRange() throws Exception { + final long cacheSize = size(500L); + final long regionSize = size(100L); + Settings settings = Settings.builder() + .put(NODE_NAME_SETTING.getKey(), "node") + .put(SharedBlobCacheService.SHARED_CACHE_SIZE_SETTING.getKey(), ByteSizeValue.ofBytes(cacheSize).getStringRep()) + .put(SharedBlobCacheService.SHARED_CACHE_REGION_SIZE_SETTING.getKey(), ByteSizeValue.ofBytes(regionSize).getStringRep()) + .put("path.home", createTempDir()) + .build(); + + final var bulkTaskCount = new AtomicInteger(0); + final var threadPool = new TestThreadPool("test"); + final var bulkExecutor = new StoppableExecutorServiceWrapper(threadPool.generic()) { + @Override + public void execute(Runnable command) { + super.execute(command); + bulkTaskCount.incrementAndGet(); + } + }; + + try ( + NodeEnvironment environment = new NodeEnvironment(settings, TestEnvironment.newEnvironment(settings)); + var cacheService = new SharedBlobCacheService<>( + environment, + settings, + threadPool, + ThreadPool.Names.GENERIC, + BlobCacheMetrics.NOOP + ) + ) { + { + // fetch a random range in a random region of the blob + final var cacheKey = generateCacheKey(); + assertEquals(5, cacheService.freeRegionCount()); + + // blobLength is 1024000 bytes and requires 3 regions + final long blobLength = size(250); + final var regions = List.of( + // region 0: 0-409600 + ByteRange.of(cacheService.getRegionStart(0), cacheService.getRegionEnd(0)), + // region 1: 409600-819200 + ByteRange.of(cacheService.getRegionStart(1), cacheService.getRegionEnd(1)), + // region 2: 819200-1228800 + ByteRange.of(cacheService.getRegionStart(2), cacheService.getRegionEnd(2)) + ); + + long pos = randomLongBetween(0, blobLength - 1L); + long len = randomLongBetween(1, blobLength - pos); + var range = ByteRange.of(pos, pos + len); + var region = between(0, regions.size() - 1); + var regionRange = cacheService.mapSubRangeToRegion(range, region); + + var bytesCopied = new AtomicLong(0L); + var future = new PlainActionFuture(); + cacheService.maybeFetchRange( + cacheKey, + region, + range, + blobLength, + (channel, channelPos, relativePos, length, progressUpdater) -> { + assertThat(range.start() + relativePos, equalTo(cacheService.getRegionStart(region) + regionRange.start())); + assertThat(channelPos, equalTo(Math.toIntExact(regionRange.start()))); + assertThat(length, equalTo(Math.toIntExact(regionRange.length()))); + bytesCopied.addAndGet(length); + }, + bulkExecutor, + future + ); + var fetched = future.get(10, TimeUnit.SECONDS); + + assertThat(regionRange.length(), equalTo(bytesCopied.get())); + if (regionRange.isEmpty()) { + assertThat(fetched, is(false)); + assertEquals(5, cacheService.freeRegionCount()); + assertEquals(0, bulkTaskCount.get()); + } else { + assertThat(fetched, is(true)); + assertEquals(4, cacheService.freeRegionCount()); + assertEquals(1, bulkTaskCount.get()); + } + } + { + // fetch multiple ranges to use all the cache + final int remainingFreeRegions = cacheService.freeRegionCount(); + assertThat(remainingFreeRegions, greaterThanOrEqualTo(4)); + bulkTaskCount.set(0); + + final var cacheKey = generateCacheKey(); + final long blobLength = regionSize * remainingFreeRegions; + AtomicLong bytesCopied = new AtomicLong(0L); + + final PlainActionFuture> future = new PlainActionFuture<>(); + final var listener = new GroupedActionListener<>(remainingFreeRegions, future); + for (int region = 0; region < remainingFreeRegions; region++) { + cacheService.maybeFetchRange( + cacheKey, + region, + ByteRange.of(0L, blobLength), + blobLength, + (channel, channelPos, relativePos, length, progressUpdater) -> bytesCopied.addAndGet(length), + bulkExecutor, + listener + ); + } + + var results = future.get(10, TimeUnit.SECONDS); + assertThat(results.stream().allMatch(result -> result), is(true)); + assertEquals(blobLength, bytesCopied.get()); + assertEquals(0, cacheService.freeRegionCount()); + assertEquals(remainingFreeRegions, bulkTaskCount.get()); + } + { + // cache fully used, no entry old enough to be evicted + assertEquals(0, cacheService.freeRegionCount()); + final var cacheKey = generateCacheKey(); + final var blobLength = randomLongBetween(1L, regionSize); + final PlainActionFuture future = new PlainActionFuture<>(); + cacheService.maybeFetchRange( + cacheKey, + randomIntBetween(0, 10), + ByteRange.of(0L, blobLength), + blobLength, + (channel, channelPos, relativePos, length, progressUpdater) -> { + throw new AssertionError("should not be executed"); + }, + bulkExecutor, + future + ); + assertThat("Listener is immediately completed", future.isDone(), is(true)); + assertThat("Region already exists in cache", future.get(), is(false)); + } + { + cacheService.computeDecay(); + + // fetch one more range should evict an old cache entry + final var cacheKey = generateCacheKey(); + assertEquals(0, cacheService.freeRegionCount()); + long blobLength = randomLongBetween(1L, regionSize); + AtomicLong bytesCopied = new AtomicLong(0L); + final PlainActionFuture future = new PlainActionFuture<>(); + cacheService.maybeFetchRange( + cacheKey, + 0, + ByteRange.of(0L, blobLength), + blobLength, + (channel, channelPos, relativePos, length, progressUpdater) -> bytesCopied.addAndGet(length), + bulkExecutor, + future + ); + + var fetched = future.get(10, TimeUnit.SECONDS); + assertThat("Region has been fetched", fetched, is(true)); + assertEquals(blobLength, bytesCopied.get()); + assertEquals(0, cacheService.freeRegionCount()); + } + } + threadPool.shutdown(); + } + public void testPopulate() throws Exception { final long regionSize = size(1L); Settings settings = Settings.builder()