From 91d8bb5aaeb312a6f2432f8c50707ed3d3df8ac8 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Thu, 15 Feb 2024 18:27:22 +0100 Subject: [PATCH] Add setting to shared cache service to always use full regions --- .../blobcache/BlobCachePlugin.java | 3 +- .../blobcache/BlobCacheUtils.java | 12 ++++++ .../shared/SharedBlobCacheService.java | 20 +++++++++- .../shared/SharedBlobCacheServiceTests.java | 40 ++++++++++++++++++- 4 files changed, 72 insertions(+), 3 deletions(-) diff --git a/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/BlobCachePlugin.java b/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/BlobCachePlugin.java index 4f9ac3eb99348..cb440520acbb9 100644 --- a/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/BlobCachePlugin.java +++ b/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/BlobCachePlugin.java @@ -27,7 +27,8 @@ public List> getSettings() { SharedBlobCacheService.SHARED_CACHE_DECAY_INTERVAL_SETTING, SharedBlobCacheService.SHARED_CACHE_MIN_TIME_DELTA_SETTING, SharedBlobCacheService.SHARED_CACHE_MMAP, - SharedBlobCacheService.SHARED_CACHE_COUNT_READS + SharedBlobCacheService.SHARED_CACHE_COUNT_READS, + SharedBlobCacheService.USE_FULL_REGION_SIZE ); } } diff --git a/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/BlobCacheUtils.java b/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/BlobCacheUtils.java index c4dff2cb4457b..8b1e71d87fc6a 100644 --- a/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/BlobCacheUtils.java +++ b/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/BlobCacheUtils.java @@ -9,6 +9,7 @@ import org.apache.lucene.store.IndexInput; import org.elasticsearch.blobcache.common.ByteRange; +import org.elasticsearch.blobcache.shared.SharedBytes; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.core.Streams; @@ -31,6 +32,17 @@ public static int toIntBytes(long l) { return ByteSizeUnit.BYTES.toIntBytes(l); } + /** + * Rounds the length up so that it is aligned on the next page size (defined by SharedBytes.PAGE_SIZE). For example + */ + public static long toPageAlignedBytes(long length) { + int remainder = (int) length % SharedBytes.PAGE_SIZE; + if (remainder > 0L) { + return length + (SharedBytes.PAGE_SIZE - remainder); + } + return length; + } + public static void throwEOF(long channelPos, long len) throws EOFException { throw new EOFException(format("unexpected EOF reading [%d-%d]", channelPos, channelPos + len)); } diff --git a/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java b/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java index 2c5997e479209..82fa218a7056d 100644 --- a/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java +++ b/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java @@ -253,6 +253,14 @@ public void validate(ByteSizeValue value, Map, Object> settings, bool Setting.Property.NodeScope ); + public static final Setting USE_FULL_REGION_SIZE = Setting.boolSetting( + SHARED_CACHE_SETTINGS_PREFIX + "use_full_region_size", + false, + Setting.Property.NodeScope + ); + + private final Boolean useFullRegionSize; + // used in tests void computeDecay() { if (cache instanceof LFUCache lfuCache) { @@ -382,6 +390,7 @@ public SharedBlobCacheService( this.blobCacheMetrics = blobCacheMetrics; this.evictIncrementer = blobCacheMetrics.getEvictedCountNonZeroFrequency()::increment; + this.useFullRegionSize = USE_FULL_REGION_SIZE.get(settings); } public static long calculateCacheSize(Settings settings, long totalFsSize) { @@ -435,6 +444,15 @@ private ByteRange mapSubRangeToRegion(ByteRange range, int region) { ); } + // package private for tests + int computeCacheFileRegionSize(long fileLength, int region) { + if (useFullRegionSize) { + return getRegionSize(); + } + // the size of the region is computed from the file length + return getRegionSize(fileLength, region); + } + private int getRegionSize(long fileLength, int region) { assert fileLength > 0; final int maxRegion = getEndingRegion(fileLength); @@ -1209,7 +1227,7 @@ public LFUCacheEntry get(KeyType cacheKey, long fileLength, int region) { // if we did not find an entry var entry = keyMapping.get(regionKey); if (entry == null) { - final int effectiveRegionSize = getRegionSize(fileLength, region); + final int effectiveRegionSize = computeCacheFileRegionSize(fileLength, region); entry = keyMapping.computeIfAbsent(regionKey, key -> new LFUCacheEntry(new CacheFileRegion(key, effectiveRegionSize), now)); } // io is volatile, double locking is fine, as long as we assign it last. diff --git a/x-pack/plugin/blob-cache/src/test/java/org/elasticsearch/blobcache/shared/SharedBlobCacheServiceTests.java b/x-pack/plugin/blob-cache/src/test/java/org/elasticsearch/blobcache/shared/SharedBlobCacheServiceTests.java index 049197edd97df..9d44443467129 100644 --- a/x-pack/plugin/blob-cache/src/test/java/org/elasticsearch/blobcache/shared/SharedBlobCacheServiceTests.java +++ b/x-pack/plugin/blob-cache/src/test/java/org/elasticsearch/blobcache/shared/SharedBlobCacheServiceTests.java @@ -12,6 +12,7 @@ import org.elasticsearch.action.support.GroupedActionListener; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.blobcache.BlobCacheMetrics; +import org.elasticsearch.blobcache.BlobCacheUtils; import org.elasticsearch.blobcache.common.ByteRange; import org.elasticsearch.cluster.node.DiscoveryNodeRole; import org.elasticsearch.common.settings.Setting; @@ -1052,7 +1053,6 @@ public void testPopulate() throws Exception { .put("path.home", createTempDir()) .build(); - final AtomicLong relativeTimeInMillis = new AtomicLong(0L); final DeterministicTaskQueue taskQueue = new DeterministicTaskQueue(); try ( NodeEnvironment environment = new NodeEnvironment(settings, TestEnvironment.newEnvironment(settings)); @@ -1136,4 +1136,42 @@ public void testNonPositiveRecoveryRangeSizeRejected() { assertThatNonPositiveRecoveryRangeSizeRejected(SharedBlobCacheService.SHARED_CACHE_RECOVERY_RANGE_SIZE_SETTING); } + public void testUseFullRegionSize() throws IOException { + final long regionSize = size(randomIntBetween(1, 100)); + final long cacheSize = regionSize * randomIntBetween(1, 10); + + Settings settings = Settings.builder() + .put(NODE_NAME_SETTING.getKey(), "node") + .put(SharedBlobCacheService.SHARED_CACHE_REGION_SIZE_SETTING.getKey(), ByteSizeValue.ofBytes(regionSize).getStringRep()) + .put(SharedBlobCacheService.SHARED_CACHE_SIZE_SETTING.getKey(), ByteSizeValue.ofBytes(cacheSize).getStringRep()) + .put(SharedBlobCacheService.USE_FULL_REGION_SIZE.getKey(), true) + .put("path.home", createTempDir()) + .build(); + final DeterministicTaskQueue taskQueue = new DeterministicTaskQueue(); + try ( + NodeEnvironment environment = new NodeEnvironment(settings, TestEnvironment.newEnvironment(settings)); + var cacheService = new SharedBlobCacheService<>( + environment, + settings, + taskQueue.getThreadPool(), + ThreadPool.Names.GENERIC, + ThreadPool.Names.GENERIC, + BlobCacheMetrics.NOOP + ) + ) { + final var cacheKey = generateCacheKey(); + final var blobLength = randomLongBetween(1L, cacheSize); + + int regions = Math.toIntExact(blobLength / regionSize); + regions += (blobLength % regionSize == 0L ? 0L : 1L); + assertThat( + cacheService.computeCacheFileRegionSize(blobLength, randomFrom(regions)), + equalTo(BlobCacheUtils.toIntBytes(regionSize)) + ); + for (int region = 0; region < regions; region++) { + var cacheFileRegion = cacheService.get(cacheKey, blobLength, region); + assertThat(cacheFileRegion.tracker.getLength(), equalTo(regionSize)); + } + } + } }