From 3ba8ade188824e8580f404a18e4dba2b3d510f21 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Mon, 8 Apr 2024 15:56:55 +0200 Subject: [PATCH] Add RangeMissingHandler with completion listener --- .../shared/SharedBlobCacheService.java | 272 ++++++++++++++---- 1 file changed, 211 insertions(+), 61 deletions(-) diff --git a/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java b/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java index 0d51a4271e85b..17b412837fff3 100644 --- a/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java +++ b/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java @@ -55,6 +55,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.LongAdder; @@ -393,6 +394,14 @@ public static long calculateCacheSize(Settings settings, long totalFsSize) { .getBytes(); } + public Executor getIOExecutor() { + return ioExecutor; + } + + public Executor getBulkIOExecutor() { + return bulkIOExecutor; + } + public int getRangeSize() { return rangeSize; } @@ -846,8 +855,8 @@ void populate( if (hasGapsToFill) { final var cacheFileRegion = CacheFileRegion.this; for (SparseFileTracker.Gap gap : gaps) { - var fillGapRunnable = fillGapRunnable(cacheFileRegion, writer, gap); - executor.execute(ActionRunnable.run(refs.acquire(), fillGapRunnable::run)); + var gapListener = gapListener(gap, refs.acquire()); + executor.execute(new WriteGapRunnable(cacheFileRegion, writer, gap, gapListener)); } } } @@ -866,7 +875,7 @@ void populateAndRead( ) { Releasable resource = null; try { - incRefEnsureOpen(); + incRefEnsureOpen(); // incRef for read operation resource = Releasables.releaseOnce(this::decRef); final List gaps = tracker.waitForRange( rangeToWrite, @@ -890,9 +899,13 @@ void populateAndRead( ); if (gaps.isEmpty() == false) { + incRefEnsureOpen(); // incRef for write operation final var cacheFileRegion = CacheFileRegion.this; - for (SparseFileTracker.Gap gap : gaps) { - executor.execute(fillGapRunnable(cacheFileRegion, writer, gap)); + try (RefCountingListener refs = new RefCountingListener(ActionListener.releasing(cacheFileRegion::decRef))) { + for (SparseFileTracker.Gap gap : gaps) { + var gapListener = gapListener(gap, refs.acquire()); + executor.execute(new WriteGapRunnable(cacheFileRegion, writer, gap, gapListener)); + } } } } catch (Exception e) { @@ -900,36 +913,86 @@ void populateAndRead( } } - private AbstractRunnable fillGapRunnable(CacheFileRegion cacheFileRegion, RangeMissingHandler writer, SparseFileTracker.Gap gap) { - return new AbstractRunnable() { + /** + * Creates a listener that completes (or fails) the provided {@link SparseFileTracker.Gap} before calling another listener. + * + * @param gap + * @param listener + * @return + */ + private ActionListener gapListener(SparseFileTracker.Gap gap, ActionListener listener) { + return ActionListener.assertOnce(new ActionListener<>() { + + final AtomicBoolean completed = new AtomicBoolean(); + @Override - protected void doRun() throws Exception { - if (cacheFileRegion.tryIncRefEnsureOpen() == false) { - throw new AlreadyClosedException("File chunk [" + cacheFileRegion.regionKey + "] has been released"); - } - try { - final int start = Math.toIntExact(gap.start()); - var ioRef = io; - assert regionOwners.get(ioRef) == cacheFileRegion; - writer.fillCacheRange( - ioRef, - start, - start, - Math.toIntExact(gap.end() - start), - progress -> gap.onProgress(start + progress) - ); - writeCount.increment(); - } finally { - cacheFileRegion.decRef(); + public void onResponse(Void response) { + if (completed.compareAndSet(true, false)) { + try { + gap.onCompletion(); + } catch (Exception e) { + assert false : e; + onFailure(e); + return; + } finally { + writeCount.increment(); + } } - gap.onCompletion(); + listener.onResponse(response); } @Override - public void onFailure(Exception e) { - gap.onFailure(e); + public void onFailure(Exception exception) { + if (completed.compareAndSet(true, false)) { + try { + gap.onFailure(exception); + } catch (Exception e) { + if (exception != null && e != exception) { + e.addSuppressed(e); + } + assert false : e; + listener.onFailure(e); + return; + } + } + listener.onFailure(exception); } - }; + }); + } + + private class WriteGapRunnable extends ActionRunnable { + + private final CacheFileRegion cacheFileRegion; + private final RangeMissingHandler writer; + private final SparseFileTracker.Gap gap; + + public WriteGapRunnable( + CacheFileRegion cacheFileRegion, + RangeMissingHandler writer, + SparseFileTracker.Gap gap, + ActionListener listener + ) { + super(listener); + this.cacheFileRegion = cacheFileRegion; + this.writer = writer; + this.gap = gap; + } + + @Override + protected void doRun() throws Exception { + var ioRef = io; + assert regionOwners.get(ioRef) == cacheFileRegion; + assert cacheFileRegion.hasReferences() : cacheFileRegion.regionKey + " has been released"; + final int start = Math.toIntExact(gap.start()); + writer.fillCacheRange( + ioRef, + start, + start, + Math.toIntExact(gap.end() - start), + progress -> gap.onProgress(start + progress), + listener + ); + } } private static void releaseAndFail(ActionListener listener, Releasable decrementRef, Exception e) { @@ -996,39 +1059,67 @@ public boolean tryRead(ByteBuffer buf, long offset) throws IOException { return res; } + public Executor getIOExecutor() { + return ioExecutor; + } + public int populateAndRead( final ByteRange rangeToWrite, final ByteRange rangeToRead, final RangeAvailableHandler reader, final RangeMissingHandler writer + ) throws Exception { + return populateAndRead(rangeToWrite, rangeToRead, reader, writer, ioExecutor); + } + + public int populateAndRead( + final ByteRange rangeToWrite, + final ByteRange rangeToRead, + final RangeAvailableHandler reader, + final RangeMissingHandler writer, + final Executor executor ) throws Exception { // some cache files can grow after being created, so rangeToWrite can be larger than the initial {@code length} assert rangeToWrite.start() >= 0 : rangeToWrite; assert assertOffsetsWithinFileLength(rangeToRead.start(), rangeToRead.length(), length); - // We are interested in the total time that the system spends when fetching a result (including time spent queuing), so we start - // our measurement here. - final long startTime = threadPool.relativeTimeInNanos(); - RangeMissingHandler writerInstrumentationDecorator = ( - SharedBytes.IO channel, - int channelPos, - int relativePos, - int length, - IntConsumer progressUpdater) -> { - writer.fillCacheRange(channel, channelPos, relativePos, length, progressUpdater); - var elapsedTime = TimeUnit.NANOSECONDS.toMicros(threadPool.relativeTimeInNanos() - startTime); - SharedBlobCacheService.this.blobCacheMetrics.getCacheMissLoadTimes().record(elapsedTime); - SharedBlobCacheService.this.blobCacheMetrics.getCacheMissCounter().increment(); - }; if (rangeToRead.isEmpty()) { // nothing to read, skip return 0; } + // We are interested in the total time that the system spends when fetching a result (including time spent queuing), so we start + // our measurement here. + final long startTime = threadPool.relativeTimeInNanos(); + RangeMissingHandler instrumentedWriter = new RangeMissingHandler() { + @Override + public void fillCacheRange(SharedBytes.IO channel, int channelPos, int relativePos, int length, IntConsumer progressUpdater) + throws IOException { + writer.fillCacheRange(channel, channelPos, relativePos, length, progressUpdater); + var elapsedTime = TimeUnit.NANOSECONDS.toMicros(threadPool.relativeTimeInNanos() - startTime); + SharedBlobCacheService.this.blobCacheMetrics.getCacheMissLoadTimes().record(elapsedTime); + SharedBlobCacheService.this.blobCacheMetrics.getCacheMissCounter().increment(); + } + + @Override + public void fillCacheRange( + SharedBytes.IO channel, + int channelPos, + int relativePos, + int length, + IntConsumer progressUpdater, + ActionListener listener + ) { + writer.fillCacheRange(channel, channelPos, relativePos, length, progressUpdater, listener); + var elapsedTime = TimeUnit.NANOSECONDS.toMicros(threadPool.relativeTimeInNanos() - startTime); + SharedBlobCacheService.this.blobCacheMetrics.getCacheMissLoadTimes().record(elapsedTime); + SharedBlobCacheService.this.blobCacheMetrics.getCacheMissCounter().increment(); + } + }; final int startRegion = getRegion(rangeToWrite.start()); final int endRegion = getEndingRegion(rangeToWrite.end()); if (startRegion == endRegion) { - return readSingleRegion(rangeToWrite, rangeToRead, reader, writerInstrumentationDecorator, startRegion); + return readSingleRegion(rangeToWrite, rangeToRead, reader, instrumentedWriter, startRegion, executor); } - return readMultiRegions(rangeToWrite, rangeToRead, reader, writerInstrumentationDecorator, startRegion, endRegion); + return readMultiRegions(rangeToWrite, rangeToRead, reader, instrumentedWriter, startRegion, endRegion, executor); } private int readSingleRegion( @@ -1036,7 +1127,8 @@ private int readSingleRegion( ByteRange rangeToRead, RangeAvailableHandler reader, RangeMissingHandler writer, - int region + int region, + Executor executor ) throws InterruptedException, ExecutionException { final PlainActionFuture readFuture = new PlainActionFuture<>(); final CacheFileRegion fileRegion = get(cacheKey, length, region); @@ -1046,7 +1138,7 @@ private int readSingleRegion( mapSubRangeToRegion(rangeToRead, region), readerWithOffset(reader, fileRegion, Math.toIntExact(rangeToRead.start() - regionStart)), writerWithOffset(writer, fileRegion, Math.toIntExact(rangeToWrite.start() - regionStart)), - ioExecutor, + executor, readFuture ); return readFuture.get(); @@ -1058,7 +1150,8 @@ private int readMultiRegions( RangeAvailableHandler reader, RangeMissingHandler writer, int startRegion, - int endRegion + int endRegion, + Executor executor ) throws InterruptedException, ExecutionException { final PlainActionFuture readsComplete = new PlainActionFuture<>(); final AtomicInteger bytesRead = new AtomicInteger(); @@ -1078,7 +1171,7 @@ private int readMultiRegions( subRangeToRead, readerWithOffset(reader, fileRegion, Math.toIntExact(rangeToRead.start() - regionStart)), writerWithOffset(writer, fileRegion, Math.toIntExact(rangeToWrite.start() - regionStart)), - ioExecutor, + executor, listener ); } catch (Exception e) { @@ -1097,20 +1190,63 @@ private RangeMissingHandler writerWithOffset(RangeMissingHandler writer, CacheFi // no need to allocate a new capturing lambda if the offset isn't adjusted adjustedWriter = writer; } else { - adjustedWriter = (channel, channelPos, relativePos, len, progressUpdater) -> writer.fillCacheRange( - channel, - channelPos, - relativePos - writeOffset, - len, - progressUpdater - ); + adjustedWriter = new RangeMissingHandler() { + @Override + public void fillCacheRange( + SharedBytes.IO channel, + int channelPos, + int relativePos, + int len, + IntConsumer progressUpdater + ) throws IOException { + writer.fillCacheRange(channel, channelPos, relativePos - writeOffset, len, progressUpdater); + } + + @Override + public void fillCacheRange( + SharedBytes.IO channel, + int channelPos, + int relativePos, + int len, + IntConsumer progressUpdater, + ActionListener listener + ) { + writer.fillCacheRange(channel, channelPos, relativePos - writeOffset, len, progressUpdater, listener); + } + }; } if (Assertions.ENABLED) { - return (channel, channelPos, relativePos, len, progressUpdater) -> { - assert assertValidRegionAndLength(fileRegion, channelPos, len); - adjustedWriter.fillCacheRange(channel, channelPos, relativePos, len, progressUpdater); - assert regionOwners.get(fileRegion.io) == fileRegion - : "File chunk [" + fileRegion.regionKey + "] no longer owns IO [" + fileRegion.io + "]"; + return new RangeMissingHandler() { + @Override + public void fillCacheRange(SharedBytes.IO channel, int channelPos, int relativePos, int len, IntConsumer progressUpdater) throws IOException { + assert assertValidRegionAndLength(fileRegion, channelPos, len); + adjustedWriter.fillCacheRange(channel, channelPos, relativePos, len, progressUpdater); + assert regionOwners.get(fileRegion.io) == fileRegion + : "File chunk [" + fileRegion.regionKey + "] no longer owns IO [" + fileRegion.io + "]"; + } + + @Override + public void fillCacheRange( + SharedBytes.IO channel, + int channelPos, + int relativePos, + int len, + IntConsumer progressUpdater, + ActionListener listener + ) { + assert assertValidRegionAndLength(fileRegion, channelPos, len); + adjustedWriter.fillCacheRange( + channel, + channelPos, + relativePos, + len, + progressUpdater, + ActionListener.runBefore(listener, () -> { + assert regionOwners.get(fileRegion.io) == fileRegion + : "File chunk [" + fileRegion.regionKey + "] no longer owns IO [" + fileRegion.io + "]"; + }) + ); + } }; } return adjustedWriter; @@ -1164,6 +1300,20 @@ public interface RangeAvailableHandler { public interface RangeMissingHandler { void fillCacheRange(SharedBytes.IO channel, int channelPos, int relativePos, int length, IntConsumer progressUpdater) throws IOException; + + default void fillCacheRange( + SharedBytes.IO channel, + int channelPos, + int relativePos, + int length, + IntConsumer progressUpdater, + ActionListener listener + ) { + ActionListener.completeWith(listener, () -> { + fillCacheRange(channel, channelPos, relativePos, length, progressUpdater); + return null; + }); + } } public record Stats(