Skip to content

Commit

Permalink
Add RangeMissingHandler with completion listener
Browse files Browse the repository at this point in the history
  • Loading branch information
tlrx committed Apr 8, 2024
1 parent 12d819a commit 3ba8ade
Showing 1 changed file with 211 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
Expand Down Expand Up @@ -393,6 +394,14 @@ public static long calculateCacheSize(Settings settings, long totalFsSize) {
.getBytes();
}

public Executor getIOExecutor() {
return ioExecutor;
}

public Executor getBulkIOExecutor() {
return bulkIOExecutor;
}

public int getRangeSize() {
return rangeSize;
}
Expand Down Expand Up @@ -846,8 +855,8 @@ void populate(
if (hasGapsToFill) {
final var cacheFileRegion = CacheFileRegion.this;
for (SparseFileTracker.Gap gap : gaps) {
var fillGapRunnable = fillGapRunnable(cacheFileRegion, writer, gap);
executor.execute(ActionRunnable.run(refs.acquire(), fillGapRunnable::run));
var gapListener = gapListener(gap, refs.acquire());
executor.execute(new WriteGapRunnable(cacheFileRegion, writer, gap, gapListener));
}
}
}
Expand All @@ -866,7 +875,7 @@ void populateAndRead(
) {
Releasable resource = null;
try {
incRefEnsureOpen();
incRefEnsureOpen(); // incRef for read operation
resource = Releasables.releaseOnce(this::decRef);
final List<SparseFileTracker.Gap> gaps = tracker.waitForRange(
rangeToWrite,
Expand All @@ -890,46 +899,100 @@ void populateAndRead(
);

if (gaps.isEmpty() == false) {
incRefEnsureOpen(); // incRef for write operation
final var cacheFileRegion = CacheFileRegion.this;
for (SparseFileTracker.Gap gap : gaps) {
executor.execute(fillGapRunnable(cacheFileRegion, writer, gap));
try (RefCountingListener refs = new RefCountingListener(ActionListener.releasing(cacheFileRegion::decRef))) {
for (SparseFileTracker.Gap gap : gaps) {
var gapListener = gapListener(gap, refs.acquire());
executor.execute(new WriteGapRunnable(cacheFileRegion, writer, gap, gapListener));
}
}
}
} catch (Exception e) {
releaseAndFail(listener, resource, e);
}
}

private AbstractRunnable fillGapRunnable(CacheFileRegion cacheFileRegion, RangeMissingHandler writer, SparseFileTracker.Gap gap) {
return new AbstractRunnable() {
/**
* Creates a listener that completes (or fails) the provided {@link SparseFileTracker.Gap} before calling another listener.
*
* @param gap
* @param listener
* @return
*/
private ActionListener<Void> gapListener(SparseFileTracker.Gap gap, ActionListener<Void> listener) {
return ActionListener.assertOnce(new ActionListener<>() {

final AtomicBoolean completed = new AtomicBoolean();

@Override
protected void doRun() throws Exception {
if (cacheFileRegion.tryIncRefEnsureOpen() == false) {
throw new AlreadyClosedException("File chunk [" + cacheFileRegion.regionKey + "] has been released");
}
try {
final int start = Math.toIntExact(gap.start());
var ioRef = io;
assert regionOwners.get(ioRef) == cacheFileRegion;
writer.fillCacheRange(
ioRef,
start,
start,
Math.toIntExact(gap.end() - start),
progress -> gap.onProgress(start + progress)
);
writeCount.increment();
} finally {
cacheFileRegion.decRef();
public void onResponse(Void response) {
if (completed.compareAndSet(true, false)) {
try {
gap.onCompletion();
} catch (Exception e) {
assert false : e;
onFailure(e);
return;
} finally {
writeCount.increment();
}
}
gap.onCompletion();
listener.onResponse(response);
}

@Override
public void onFailure(Exception e) {
gap.onFailure(e);
public void onFailure(Exception exception) {
if (completed.compareAndSet(true, false)) {
try {
gap.onFailure(exception);
} catch (Exception e) {
if (exception != null && e != exception) {
e.addSuppressed(e);
}
assert false : e;
listener.onFailure(e);
return;
}
}
listener.onFailure(exception);
}
};
});
}

private class WriteGapRunnable extends ActionRunnable<Void> {

private final CacheFileRegion cacheFileRegion;
private final RangeMissingHandler writer;
private final SparseFileTracker.Gap gap;

public WriteGapRunnable(
CacheFileRegion cacheFileRegion,
RangeMissingHandler writer,
SparseFileTracker.Gap gap,
ActionListener<Void> listener
) {
super(listener);
this.cacheFileRegion = cacheFileRegion;
this.writer = writer;
this.gap = gap;
}

@Override
protected void doRun() throws Exception {
var ioRef = io;
assert regionOwners.get(ioRef) == cacheFileRegion;
assert cacheFileRegion.hasReferences() : cacheFileRegion.regionKey + " has been released";
final int start = Math.toIntExact(gap.start());
writer.fillCacheRange(
ioRef,
start,
start,
Math.toIntExact(gap.end() - start),
progress -> gap.onProgress(start + progress),
listener
);
}
}

private static void releaseAndFail(ActionListener<?> listener, Releasable decrementRef, Exception e) {
Expand Down Expand Up @@ -996,47 +1059,76 @@ public boolean tryRead(ByteBuffer buf, long offset) throws IOException {
return res;
}

public Executor getIOExecutor() {
return ioExecutor;
}

public int populateAndRead(
final ByteRange rangeToWrite,
final ByteRange rangeToRead,
final RangeAvailableHandler reader,
final RangeMissingHandler writer
) throws Exception {
return populateAndRead(rangeToWrite, rangeToRead, reader, writer, ioExecutor);
}

public int populateAndRead(
final ByteRange rangeToWrite,
final ByteRange rangeToRead,
final RangeAvailableHandler reader,
final RangeMissingHandler writer,
final Executor executor
) throws Exception {
// some cache files can grow after being created, so rangeToWrite can be larger than the initial {@code length}
assert rangeToWrite.start() >= 0 : rangeToWrite;
assert assertOffsetsWithinFileLength(rangeToRead.start(), rangeToRead.length(), length);
// We are interested in the total time that the system spends when fetching a result (including time spent queuing), so we start
// our measurement here.
final long startTime = threadPool.relativeTimeInNanos();
RangeMissingHandler writerInstrumentationDecorator = (
SharedBytes.IO channel,
int channelPos,
int relativePos,
int length,
IntConsumer progressUpdater) -> {
writer.fillCacheRange(channel, channelPos, relativePos, length, progressUpdater);
var elapsedTime = TimeUnit.NANOSECONDS.toMicros(threadPool.relativeTimeInNanos() - startTime);
SharedBlobCacheService.this.blobCacheMetrics.getCacheMissLoadTimes().record(elapsedTime);
SharedBlobCacheService.this.blobCacheMetrics.getCacheMissCounter().increment();
};
if (rangeToRead.isEmpty()) {
// nothing to read, skip
return 0;
}
// We are interested in the total time that the system spends when fetching a result (including time spent queuing), so we start
// our measurement here.
final long startTime = threadPool.relativeTimeInNanos();
RangeMissingHandler instrumentedWriter = new RangeMissingHandler() {
@Override
public void fillCacheRange(SharedBytes.IO channel, int channelPos, int relativePos, int length, IntConsumer progressUpdater)
throws IOException {
writer.fillCacheRange(channel, channelPos, relativePos, length, progressUpdater);
var elapsedTime = TimeUnit.NANOSECONDS.toMicros(threadPool.relativeTimeInNanos() - startTime);
SharedBlobCacheService.this.blobCacheMetrics.getCacheMissLoadTimes().record(elapsedTime);
SharedBlobCacheService.this.blobCacheMetrics.getCacheMissCounter().increment();
}

@Override
public void fillCacheRange(
SharedBytes.IO channel,
int channelPos,
int relativePos,
int length,
IntConsumer progressUpdater,
ActionListener<Void> listener
) {
writer.fillCacheRange(channel, channelPos, relativePos, length, progressUpdater, listener);
var elapsedTime = TimeUnit.NANOSECONDS.toMicros(threadPool.relativeTimeInNanos() - startTime);
SharedBlobCacheService.this.blobCacheMetrics.getCacheMissLoadTimes().record(elapsedTime);
SharedBlobCacheService.this.blobCacheMetrics.getCacheMissCounter().increment();
}
};
final int startRegion = getRegion(rangeToWrite.start());
final int endRegion = getEndingRegion(rangeToWrite.end());
if (startRegion == endRegion) {
return readSingleRegion(rangeToWrite, rangeToRead, reader, writerInstrumentationDecorator, startRegion);
return readSingleRegion(rangeToWrite, rangeToRead, reader, instrumentedWriter, startRegion, executor);
}
return readMultiRegions(rangeToWrite, rangeToRead, reader, writerInstrumentationDecorator, startRegion, endRegion);
return readMultiRegions(rangeToWrite, rangeToRead, reader, instrumentedWriter, startRegion, endRegion, executor);
}

private int readSingleRegion(
ByteRange rangeToWrite,
ByteRange rangeToRead,
RangeAvailableHandler reader,
RangeMissingHandler writer,
int region
int region,
Executor executor
) throws InterruptedException, ExecutionException {
final PlainActionFuture<Integer> readFuture = new PlainActionFuture<>();
final CacheFileRegion fileRegion = get(cacheKey, length, region);
Expand All @@ -1046,7 +1138,7 @@ private int readSingleRegion(
mapSubRangeToRegion(rangeToRead, region),
readerWithOffset(reader, fileRegion, Math.toIntExact(rangeToRead.start() - regionStart)),
writerWithOffset(writer, fileRegion, Math.toIntExact(rangeToWrite.start() - regionStart)),
ioExecutor,
executor,
readFuture
);
return readFuture.get();
Expand All @@ -1058,7 +1150,8 @@ private int readMultiRegions(
RangeAvailableHandler reader,
RangeMissingHandler writer,
int startRegion,
int endRegion
int endRegion,
Executor executor
) throws InterruptedException, ExecutionException {
final PlainActionFuture<Void> readsComplete = new PlainActionFuture<>();
final AtomicInteger bytesRead = new AtomicInteger();
Expand All @@ -1078,7 +1171,7 @@ private int readMultiRegions(
subRangeToRead,
readerWithOffset(reader, fileRegion, Math.toIntExact(rangeToRead.start() - regionStart)),
writerWithOffset(writer, fileRegion, Math.toIntExact(rangeToWrite.start() - regionStart)),
ioExecutor,
executor,
listener
);
} catch (Exception e) {
Expand All @@ -1097,20 +1190,63 @@ private RangeMissingHandler writerWithOffset(RangeMissingHandler writer, CacheFi
// no need to allocate a new capturing lambda if the offset isn't adjusted
adjustedWriter = writer;
} else {
adjustedWriter = (channel, channelPos, relativePos, len, progressUpdater) -> writer.fillCacheRange(
channel,
channelPos,
relativePos - writeOffset,
len,
progressUpdater
);
adjustedWriter = new RangeMissingHandler() {
@Override
public void fillCacheRange(
SharedBytes.IO channel,
int channelPos,
int relativePos,
int len,
IntConsumer progressUpdater
) throws IOException {
writer.fillCacheRange(channel, channelPos, relativePos - writeOffset, len, progressUpdater);
}

@Override
public void fillCacheRange(
SharedBytes.IO channel,
int channelPos,
int relativePos,
int len,
IntConsumer progressUpdater,
ActionListener<Void> listener
) {
writer.fillCacheRange(channel, channelPos, relativePos - writeOffset, len, progressUpdater, listener);
}
};
}
if (Assertions.ENABLED) {
return (channel, channelPos, relativePos, len, progressUpdater) -> {
assert assertValidRegionAndLength(fileRegion, channelPos, len);
adjustedWriter.fillCacheRange(channel, channelPos, relativePos, len, progressUpdater);
assert regionOwners.get(fileRegion.io) == fileRegion
: "File chunk [" + fileRegion.regionKey + "] no longer owns IO [" + fileRegion.io + "]";
return new RangeMissingHandler() {
@Override
public void fillCacheRange(SharedBytes.IO channel, int channelPos, int relativePos, int len, IntConsumer progressUpdater) throws IOException {
assert assertValidRegionAndLength(fileRegion, channelPos, len);
adjustedWriter.fillCacheRange(channel, channelPos, relativePos, len, progressUpdater);
assert regionOwners.get(fileRegion.io) == fileRegion
: "File chunk [" + fileRegion.regionKey + "] no longer owns IO [" + fileRegion.io + "]";
}

@Override
public void fillCacheRange(
SharedBytes.IO channel,
int channelPos,
int relativePos,
int len,
IntConsumer progressUpdater,
ActionListener<Void> listener
) {
assert assertValidRegionAndLength(fileRegion, channelPos, len);
adjustedWriter.fillCacheRange(
channel,
channelPos,
relativePos,
len,
progressUpdater,
ActionListener.runBefore(listener, () -> {
assert regionOwners.get(fileRegion.io) == fileRegion
: "File chunk [" + fileRegion.regionKey + "] no longer owns IO [" + fileRegion.io + "]";
})
);
}
};
}
return adjustedWriter;
Expand Down Expand Up @@ -1164,6 +1300,20 @@ public interface RangeAvailableHandler {
public interface RangeMissingHandler {
void fillCacheRange(SharedBytes.IO channel, int channelPos, int relativePos, int length, IntConsumer progressUpdater)
throws IOException;

default void fillCacheRange(
SharedBytes.IO channel,
int channelPos,
int relativePos,
int length,
IntConsumer progressUpdater,
ActionListener<Void> listener
) {
ActionListener.completeWith(listener, () -> {
fillCacheRange(channel, channelPos, relativePos, length, progressUpdater);
return null;
});
}
}

public record Stats(
Expand Down

0 comments on commit 3ba8ade

Please sign in to comment.