Skip to content

Commit

Permalink
test
Browse files Browse the repository at this point in the history
  • Loading branch information
tlrx committed Apr 30, 2024
1 parent 5935a8a commit 38bc83e
Showing 1 changed file with 160 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@

import static org.elasticsearch.node.Node.NODE_NAME_SETTING;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
Expand Down Expand Up @@ -1017,6 +1018,165 @@ public void execute(Runnable command) {
threadPool.shutdown();
}

public void testMaybeFetchRange() throws Exception {
final long cacheSize = size(500L);
final long regionSize = size(100L);
Settings settings = Settings.builder()
.put(NODE_NAME_SETTING.getKey(), "node")
.put(SharedBlobCacheService.SHARED_CACHE_SIZE_SETTING.getKey(), ByteSizeValue.ofBytes(cacheSize).getStringRep())
.put(SharedBlobCacheService.SHARED_CACHE_REGION_SIZE_SETTING.getKey(), ByteSizeValue.ofBytes(regionSize).getStringRep())
.put("path.home", createTempDir())
.build();

final var bulkTaskCount = new AtomicInteger(0);
final var threadPool = new TestThreadPool("test");
final var bulkExecutor = new StoppableExecutorServiceWrapper(threadPool.generic()) {
@Override
public void execute(Runnable command) {
super.execute(command);
bulkTaskCount.incrementAndGet();
}
};

try (
NodeEnvironment environment = new NodeEnvironment(settings, TestEnvironment.newEnvironment(settings));
var cacheService = new SharedBlobCacheService<>(
environment,
settings,
threadPool,
ThreadPool.Names.GENERIC,
BlobCacheMetrics.NOOP
)
) {
{
// fetch a random range in a random region of the blob
final var cacheKey = generateCacheKey();
assertEquals(5, cacheService.freeRegionCount());

// blobLength is 1024000 bytes and requires 3 regions
final long blobLength = size(250);
final var regions = List.of(
// region 0: 0-409600
ByteRange.of(cacheService.getRegionStart(0), cacheService.getRegionEnd(0)),
// region 1: 409600-819200
ByteRange.of(cacheService.getRegionStart(1), cacheService.getRegionEnd(1)),
// region 2: 819200-1228800
ByteRange.of(cacheService.getRegionStart(2), cacheService.getRegionEnd(2))
);

long pos = randomLongBetween(0, blobLength - 1L);
long len = randomLongBetween(1, blobLength - pos);
var range = ByteRange.of(pos, pos + len);
var region = between(0, regions.size() - 1);
var regionRange = cacheService.mapSubRangeToRegion(range, region);

var bytesCopied = new AtomicLong(0L);
var future = new PlainActionFuture<Boolean>();
cacheService.maybeFetchRange(
cacheKey,
region,
range,
blobLength,
(channel, channelPos, relativePos, length, progressUpdater) -> {
assertThat(range.start() + relativePos, equalTo(cacheService.getRegionStart(region) + regionRange.start()));
assertThat(channelPos, equalTo(Math.toIntExact(regionRange.start())));
assertThat(length, equalTo(Math.toIntExact(regionRange.length())));
bytesCopied.addAndGet(length);
},
bulkExecutor,
future
);
var fetched = future.get(10, TimeUnit.SECONDS);

assertThat(regionRange.length(), equalTo(bytesCopied.get()));
if (regionRange.isEmpty()) {
assertThat(fetched, is(false));
assertEquals(5, cacheService.freeRegionCount());
assertEquals(0, bulkTaskCount.get());
} else {
assertThat(fetched, is(true));
assertEquals(4, cacheService.freeRegionCount());
assertEquals(1, bulkTaskCount.get());
}
}
{
// fetch multiple ranges to use all the cache
final int remainingFreeRegions = cacheService.freeRegionCount();
assertThat(remainingFreeRegions, greaterThanOrEqualTo(4));
bulkTaskCount.set(0);

final var cacheKey = generateCacheKey();
final long blobLength = regionSize * remainingFreeRegions;
AtomicLong bytesCopied = new AtomicLong(0L);

final PlainActionFuture<Collection<Boolean>> future = new PlainActionFuture<>();
final var listener = new GroupedActionListener<>(remainingFreeRegions, future);
for (int region = 0; region < remainingFreeRegions; region++) {
cacheService.maybeFetchRange(
cacheKey,
region,
ByteRange.of(0L, blobLength),
blobLength,
(channel, channelPos, relativePos, length, progressUpdater) -> bytesCopied.addAndGet(length),
bulkExecutor,
listener
);
}

var results = future.get(10, TimeUnit.SECONDS);
assertThat(results.stream().allMatch(result -> result), is(true));
assertEquals(blobLength, bytesCopied.get());
assertEquals(0, cacheService.freeRegionCount());
assertEquals(remainingFreeRegions, bulkTaskCount.get());
}
{
// cache fully used, no entry old enough to be evicted
assertEquals(0, cacheService.freeRegionCount());
final var cacheKey = generateCacheKey();
final var blobLength = randomLongBetween(1L, regionSize);
final PlainActionFuture<Boolean> future = new PlainActionFuture<>();
cacheService.maybeFetchRange(
cacheKey,
randomIntBetween(0, 10),
ByteRange.of(0L, blobLength),
blobLength,
(channel, channelPos, relativePos, length, progressUpdater) -> {
throw new AssertionError("should not be executed");
},
bulkExecutor,
future
);
assertThat("Listener is immediately completed", future.isDone(), is(true));
assertThat("Region already exists in cache", future.get(), is(false));
}
{
cacheService.computeDecay();

// fetch one more range should evict an old cache entry
final var cacheKey = generateCacheKey();
assertEquals(0, cacheService.freeRegionCount());
long blobLength = randomLongBetween(1L, regionSize);
AtomicLong bytesCopied = new AtomicLong(0L);
final PlainActionFuture<Boolean> future = new PlainActionFuture<>();
cacheService.maybeFetchRange(
cacheKey,
0,
ByteRange.of(0L, blobLength),
blobLength,
(channel, channelPos, relativePos, length, progressUpdater) -> bytesCopied.addAndGet(length),
bulkExecutor,
future
);

var fetched = future.get(10, TimeUnit.SECONDS);
assertThat("Region has been fetched", fetched, is(true));
assertEquals(blobLength, bytesCopied.get());
assertEquals(0, cacheService.freeRegionCount());
}
}
threadPool.shutdown();
}

public void testPopulate() throws Exception {
final long regionSize = size(1L);
Settings settings = Settings.builder()
Expand Down

0 comments on commit 38bc83e

Please sign in to comment.