Skip to content

Commit

Permalink
cache file
Browse files Browse the repository at this point in the history
  • Loading branch information
tlrx committed Feb 23, 2024
1 parent 08ab2c4 commit 6097b49
Showing 1 changed file with 12 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -410,7 +410,7 @@ protected long getRegionStart(int region) {
return (long) region * regionSize;
}

private long getRegionEnd(int region) {
protected long getRegionEnd(int region) {
return (long) (region + 1) * regionSize;
}

Expand Down Expand Up @@ -683,18 +683,17 @@ public final boolean isEvicted() {
}
}

protected boolean assertOffsetsWithinFileLength(long offset, long length, long fileLength, int region, long regionLength) {
protected boolean assertOffsetsWithinFileLength(long offset, long length, long fileLength) {
assert offset >= 0L;
assert length > 0L;
assert fileLength > 0L;
assert regionLength > 0L;
assert offset + length <= fileLength
: "accessing ["
+ length
+ "] bytes at offset ["
+ offset
+ "] in region ["
+ region
+ "] in cache file ["
+ this
+ "] would be beyond file length ["
+ fileLength
+ ']';
Expand All @@ -710,16 +709,13 @@ class CacheFileRegion extends EvictableRefCounted {

final RegionKey<KeyType> regionKey;
final SparseFileTracker tracker;
final int regionLength;
final long fileLength; // used in assertions only
// io can be null when not init'ed or after evict/take
volatile SharedBytes.IO io = null;

CacheFileRegion(RegionKey<KeyType> regionKey, long fileLength) {
CacheFileRegion(RegionKey<KeyType> regionKey, int regionSize) {
this.regionKey = regionKey;
this.regionLength = computeCacheFileRegionSize(fileLength, regionKey.region);
this.tracker = new SparseFileTracker("file", regionLength);
this.fileLength = Assertions.ENABLED ? fileLength : -1L;
assert regionSize > 0;
tracker = new SparseFileTracker("file", regionSize);
}

public long physicalStartOffset() {
Expand Down Expand Up @@ -805,7 +801,6 @@ private static void throwAlreadyEvicted() {
boolean tryRead(ByteBuffer buf, long offset) throws IOException {
SharedBytes.IO ioRef = this.io;
if (ioRef != null) {
assert assertOffsetsWithinFileLength(offset, buf.remaining(), fileLength, regionKey.region, regionLength);
int readBytes = ioRef.read(buf, getRegionRelativePosition(offset));
if (isEvicted()) {
buf.position(buf.position() - readBytes);
Expand Down Expand Up @@ -835,7 +830,6 @@ void populate(
final Executor executor,
final ActionListener<Boolean> listener
) {
assert assertOffsetsWithinFileLength(rangeToWrite.start(), rangeToWrite.length(), fileLength, regionKey.region, regionLength);
Releasable resource = null;
try {
incRefEnsureOpen();
Expand Down Expand Up @@ -870,8 +864,6 @@ void populateAndRead(
final Executor executor,
final ActionListener<Integer> listener
) {
assert assertOffsetsWithinFileLength(rangeToWrite.start(), rangeToWrite.length(), fileLength, regionKey.region, regionLength);
assert assertOffsetsWithinFileLength(rangeToRead.start(), rangeToRead.length(), fileLength, regionKey.region, regionLength);
Releasable resource = null;
try {
incRefEnsureOpen();
Expand Down Expand Up @@ -980,6 +972,7 @@ public KeyType getCacheKey() {
}

public boolean tryRead(ByteBuffer buf, long offset) throws IOException {
assert assertOffsetsWithinFileLength(offset, buf.remaining(), length);
final int startRegion = getRegion(offset);
final long end = offset + buf.remaining();
final int endRegion = getEndingRegion(end);
Expand Down Expand Up @@ -1009,6 +1002,8 @@ public int populateAndRead(
final RangeAvailableHandler reader,
final RangeMissingHandler writer
) throws Exception {
assert assertOffsetsWithinFileLength(rangeToWrite.start(), rangeToWrite.length(), length);
assert assertOffsetsWithinFileLength(rangeToRead.start(), rangeToRead.length(), length);
// We are interested in the total time that the system spends when fetching a result (including time spent queuing), so we start
// our measurement here.
final long startTime = threadPool.relativeTimeInNanos();
Expand Down Expand Up @@ -1241,7 +1236,8 @@ public LFUCacheEntry get(KeyType cacheKey, long fileLength, int region) {
// if we did not find an entry
var entry = keyMapping.get(regionKey);
if (entry == null) {
entry = keyMapping.computeIfAbsent(regionKey, key -> new LFUCacheEntry(new CacheFileRegion(key, fileLength), now));
final int effectiveRegionSize = computeCacheFileRegionSize(fileLength, region);
entry = keyMapping.computeIfAbsent(regionKey, key -> new LFUCacheEntry(new CacheFileRegion(key, effectiveRegionSize), now));
}
// io is volatile, double locking is fine, as long as we assign it last.
if (entry.chunk.io == null) {
Expand Down

0 comments on commit 6097b49

Please sign in to comment.