Skip to content

Commit

Permalink
Add setting to shared cache service to always use full regions
Browse files Browse the repository at this point in the history
  • Loading branch information
tlrx committed Feb 15, 2024
1 parent 949a294 commit 91d8bb5
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ public List<Setting<?>> getSettings() {
SharedBlobCacheService.SHARED_CACHE_DECAY_INTERVAL_SETTING,
SharedBlobCacheService.SHARED_CACHE_MIN_TIME_DELTA_SETTING,
SharedBlobCacheService.SHARED_CACHE_MMAP,
SharedBlobCacheService.SHARED_CACHE_COUNT_READS
SharedBlobCacheService.SHARED_CACHE_COUNT_READS,
SharedBlobCacheService.USE_FULL_REGION_SIZE
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

import org.apache.lucene.store.IndexInput;
import org.elasticsearch.blobcache.common.ByteRange;
import org.elasticsearch.blobcache.shared.SharedBytes;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.core.Streams;

Expand All @@ -31,6 +32,17 @@ public static int toIntBytes(long l) {
return ByteSizeUnit.BYTES.toIntBytes(l);
}

/**
* Rounds the length up so that it is aligned on the next page size (defined by SharedBytes.PAGE_SIZE). For example
*/
public static long toPageAlignedBytes(long length) {
int remainder = (int) length % SharedBytes.PAGE_SIZE;
if (remainder > 0L) {
return length + (SharedBytes.PAGE_SIZE - remainder);
}
return length;
}

public static void throwEOF(long channelPos, long len) throws EOFException {
throw new EOFException(format("unexpected EOF reading [%d-%d]", channelPos, channelPos + len));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,14 @@ public void validate(ByteSizeValue value, Map<Setting<?>, Object> settings, bool
Setting.Property.NodeScope
);

public static final Setting<Boolean> USE_FULL_REGION_SIZE = Setting.boolSetting(
SHARED_CACHE_SETTINGS_PREFIX + "use_full_region_size",
false,
Setting.Property.NodeScope
);

private final Boolean useFullRegionSize;

// used in tests
void computeDecay() {
if (cache instanceof LFUCache lfuCache) {
Expand Down Expand Up @@ -382,6 +390,7 @@ public SharedBlobCacheService(

this.blobCacheMetrics = blobCacheMetrics;
this.evictIncrementer = blobCacheMetrics.getEvictedCountNonZeroFrequency()::increment;
this.useFullRegionSize = USE_FULL_REGION_SIZE.get(settings);
}

public static long calculateCacheSize(Settings settings, long totalFsSize) {
Expand Down Expand Up @@ -435,6 +444,15 @@ private ByteRange mapSubRangeToRegion(ByteRange range, int region) {
);
}

// package private for tests
int computeCacheFileRegionSize(long fileLength, int region) {
if (useFullRegionSize) {
return getRegionSize();
}
// the size of the region is computed from the file length
return getRegionSize(fileLength, region);
}

private int getRegionSize(long fileLength, int region) {
assert fileLength > 0;
final int maxRegion = getEndingRegion(fileLength);
Expand Down Expand Up @@ -1209,7 +1227,7 @@ public LFUCacheEntry get(KeyType cacheKey, long fileLength, int region) {
// if we did not find an entry
var entry = keyMapping.get(regionKey);
if (entry == null) {
final int effectiveRegionSize = getRegionSize(fileLength, region);
final int effectiveRegionSize = computeCacheFileRegionSize(fileLength, region);
entry = keyMapping.computeIfAbsent(regionKey, key -> new LFUCacheEntry(new CacheFileRegion(key, effectiveRegionSize), now));
}
// io is volatile, double locking is fine, as long as we assign it last.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.elasticsearch.action.support.GroupedActionListener;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.blobcache.BlobCacheMetrics;
import org.elasticsearch.blobcache.BlobCacheUtils;
import org.elasticsearch.blobcache.common.ByteRange;
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
import org.elasticsearch.common.settings.Setting;
Expand Down Expand Up @@ -1052,7 +1053,6 @@ public void testPopulate() throws Exception {
.put("path.home", createTempDir())
.build();

final AtomicLong relativeTimeInMillis = new AtomicLong(0L);
final DeterministicTaskQueue taskQueue = new DeterministicTaskQueue();
try (
NodeEnvironment environment = new NodeEnvironment(settings, TestEnvironment.newEnvironment(settings));
Expand Down Expand Up @@ -1136,4 +1136,42 @@ public void testNonPositiveRecoveryRangeSizeRejected() {
assertThatNonPositiveRecoveryRangeSizeRejected(SharedBlobCacheService.SHARED_CACHE_RECOVERY_RANGE_SIZE_SETTING);
}

public void testUseFullRegionSize() throws IOException {
final long regionSize = size(randomIntBetween(1, 100));
final long cacheSize = regionSize * randomIntBetween(1, 10);

Settings settings = Settings.builder()
.put(NODE_NAME_SETTING.getKey(), "node")
.put(SharedBlobCacheService.SHARED_CACHE_REGION_SIZE_SETTING.getKey(), ByteSizeValue.ofBytes(regionSize).getStringRep())
.put(SharedBlobCacheService.SHARED_CACHE_SIZE_SETTING.getKey(), ByteSizeValue.ofBytes(cacheSize).getStringRep())
.put(SharedBlobCacheService.USE_FULL_REGION_SIZE.getKey(), true)
.put("path.home", createTempDir())
.build();
final DeterministicTaskQueue taskQueue = new DeterministicTaskQueue();
try (
NodeEnvironment environment = new NodeEnvironment(settings, TestEnvironment.newEnvironment(settings));
var cacheService = new SharedBlobCacheService<>(
environment,
settings,
taskQueue.getThreadPool(),
ThreadPool.Names.GENERIC,
ThreadPool.Names.GENERIC,
BlobCacheMetrics.NOOP
)
) {
final var cacheKey = generateCacheKey();
final var blobLength = randomLongBetween(1L, cacheSize);

int regions = Math.toIntExact(blobLength / regionSize);
regions += (blobLength % regionSize == 0L ? 0L : 1L);
assertThat(
cacheService.computeCacheFileRegionSize(blobLength, randomFrom(regions)),
equalTo(BlobCacheUtils.toIntBytes(regionSize))
);
for (int region = 0; region < regions; region++) {
var cacheFileRegion = cacheService.get(cacheKey, blobLength, region);
assertThat(cacheFileRegion.tracker.getLength(), equalTo(regionSize));
}
}
}
}

0 comments on commit 91d8bb5

Please sign in to comment.