Skip to content

Commit

Permalink
fix(cache): fix cache reuse for re-created topic (#1418)
Browse files Browse the repository at this point in the history
Signed-off-by: Robin Han <[email protected]>
  • Loading branch information
superhx authored Jun 14, 2024
1 parent 4168ef0 commit dc8ee74
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
public class ElasticTimeIndex extends TimeIndex {
private final File file;
private final FileCache cache;
private final Long cachePathId;
final ElasticStreamSlice stream;

private volatile CompletableFuture<?> lastAppend = CompletableFuture.completedFuture(null);
Expand All @@ -46,6 +47,7 @@ public ElasticTimeIndex(
super(file, baseOffset, maxIndexSize, true, true);
this.file = file;
this.cache = cache;
this.cachePathId = cache.newPathId();
this.stream = sliceSupplier.get();
setEntries((int) (stream.nextOffset() / ENTRY_SIZE));
if (entries() == 0) {
Expand Down Expand Up @@ -132,7 +134,7 @@ public void maybeAppend(long timestamp, long offset, boolean skipFullCheck) {
buffer.flip();
long position = stream.nextOffset();
lastAppend = stream.append(RawPayloadRecordBatch.of(buffer));
cache.put(file.getPath(), position, Unpooled.wrappedBuffer(buffer));
cache.put(cachePathId, position, Unpooled.wrappedBuffer(buffer));
incrementEntries();
lastEntry(new TimestampOffset(timestamp, offset));
}
Expand Down Expand Up @@ -250,7 +252,7 @@ protected TimestampOffset parseEntry(ByteBuffer buffer, int n) {
}

private TimestampOffset tryGetEntryFromCache(int n) {
Optional<ByteBuf> rst = cache.get(file.getPath(), (long) n * ENTRY_SIZE, ENTRY_SIZE);
Optional<ByteBuf> rst = cache.get(cachePathId, (long) n * ENTRY_SIZE, ENTRY_SIZE);
if (rst.isPresent()) {
ByteBuf buffer = rst.get();
return new TimestampOffset(buffer.readLong(), baseOffset() + buffer.readInt());
Expand Down Expand Up @@ -292,7 +294,7 @@ private TimestampOffset parseEntry0(int n) throws ExecutionException, Interrupte
}
ByteBuf buf = Unpooled.buffer(records.size() * ENTRY_SIZE);
records.forEach(record -> buf.writeBytes(record.rawPayload()));
cache.put(file.getPath(), startOffset, buf);
cache.put(cachePathId, startOffset, buf);
ByteBuf indexEntry = Unpooled.wrappedBuffer(records.get(0).rawPayload());
timestampOffset = new TimestampOffset(indexEntry.readLong(), baseOffset() + indexEntry.readInt());
rst.free();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ public class ElasticTransactionIndex extends TransactionIndex {
private final StreamSliceSupplier streamSupplier;
ElasticStreamSlice stream;
private final FileCache cache;
private final Long cachePathId;
private final String path;
private volatile LastAppend lastAppend;

Expand All @@ -47,6 +48,7 @@ public ElasticTransactionIndex(long startOffset, File file, StreamSliceSupplier
this.streamSupplier = streamSupplier;
this.stream = streamSupplier.get();
this.cache = cache;
this.cachePathId = cache.newPathId();
this.path = file.getPath();
lastAppend = new LastAppend(stream.nextOffset(), CompletableFuture.completedFuture(null));
}
Expand All @@ -70,7 +72,7 @@ public void append(AbortedTxn abortedTxn) {
long position = stream.nextOffset();
CompletableFuture<?> cf = stream.append(RawPayloadRecordBatch.of(abortedTxn.buffer().duplicate()));
lastAppend = new LastAppend(stream.nextOffset(), cf);
cache.put(path, position, Unpooled.wrappedBuffer(abortedTxn.buffer()));
cache.put(cachePathId, position, Unpooled.wrappedBuffer(abortedTxn.buffer()));
}

@Override
Expand Down Expand Up @@ -179,18 +181,18 @@ public AbortedTxnWithPosition next() {
+ AbortedTxn.CURRENT_VERSION);
return item;
}
int getLength = Math.min(position.value + AbortedTxn.TOTAL_SIZE * 128, endPosition);
Optional<ByteBuf> cacheDataOpt = cache.get(path, position.value, getLength);
int endOffset = Math.min(position.value + AbortedTxn.TOTAL_SIZE * 128, endPosition);
Optional<ByteBuf> cacheDataOpt = cache.get(cachePathId, position.value, endOffset - position.value);
ByteBuf buf;
if (cacheDataOpt.isPresent()) {
buf = cacheDataOpt.get();
} else {
FetchResult records = fetchStream(position.value, getLength, getLength);
FetchResult records = fetchStream(position.value, endOffset, endOffset - position.value);
ByteBuf txnListBuf = Unpooled.buffer(records.recordBatchList().size() * AbortedTxn.TOTAL_SIZE);
records.recordBatchList().forEach(r -> {
txnListBuf.writeBytes(r.rawPayload());
});
cache.put(path, position.value, txnListBuf);
cache.put(cachePathId, position.value, txnListBuf);
records.free();
buf = txnListBuf;
}
Expand Down
16 changes: 11 additions & 5 deletions core/src/main/scala/kafka/log/streamaspect/cache/FileCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.Objects;
import java.util.Optional;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
Expand All @@ -49,13 +50,14 @@ public class FileCache {
private final int blockSize;
private final BitSet freeBlocks;
private final LRUCache<Key, Value> lru = new LRUCache<>();
final Map<String, NavigableMap<Long, Value>> path2cache = new HashMap<>();
final Map<Long, NavigableMap<Long, Value>> path2cache = new HashMap<>();
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
private final ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();
private final ReentrantReadWriteLock.ReadLock readLock = lock.readLock();
int freeBlockCount;
private int freeCheckPoint = 0;
private final MappedByteBuffer cacheByteBuffer;
private final AtomicLong pathIdAlloc = new AtomicLong();

public FileCache(String path, int size, int blockSize) throws IOException {
this.blockSize = blockSize;
Expand Down Expand Up @@ -84,7 +86,11 @@ public FileCache(String path, int size) throws IOException {
this(path, size, BLOCK_SIZE);
}

public void put(String path, long position, ByteBuf data) {
public Long newPathId() {
return pathIdAlloc.incrementAndGet();
}

public void put(Long path, long position, ByteBuf data) {
writeLock.lock();
try {
int dataLength = data.readableBytes();
Expand Down Expand Up @@ -140,7 +146,7 @@ public void put(String path, long position, ByteBuf data) {
}
}

public Optional<ByteBuf> get(String filePath, long position, int length) {
public Optional<ByteBuf> get(Long filePath, long position, int length) {
ByteBuf buf = Unpooled.buffer(length);
readLock.lock();
try {
Expand Down Expand Up @@ -243,10 +249,10 @@ private int align(int size) {
}

static class Key implements Comparable<Key> {
String path;
Long path;
long position;

public Key(String path, long position) {
public Key(Long path, long position) {
this.path = path;
this.position = position;
}
Expand Down
81 changes: 44 additions & 37 deletions core/src/test/java/kafka/log/streamaspect/cache/FileCacheTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,99 +39,106 @@ public void test() throws IOException {
FileCache fileCache = new FileCache("/tmp/file_cache_test", 10 * 1024, 1024);

// occupy block 0,1
fileCache.put("test1", 10, genBuf((byte) 1, 2 * 1024));
Long pathId1 = fileCache.newPathId();
fileCache.put(pathId1, 10, genBuf((byte) 1, 2 * 1024));

ByteBuf rst = fileCache.get("test1", 10 + 1000, 1024).get();
ByteBuf rst = fileCache.get(pathId1, 10 + 1000, 1024).get();
assertEquals(1024, rst.readableBytes());
assertTrue(verify(rst, (byte) 1));

Assertions.assertFalse(fileCache.get("test1", 10 + 1000, 2048).isPresent());
Assertions.assertFalse(fileCache.get(pathId1, 10 + 1000, 2048).isPresent());

// occupy block 2,3
fileCache.put("test2", 233, genBuf((byte) 2, 1025));
Long pathId2 = fileCache.newPathId();
fileCache.put(pathId2, 233, genBuf((byte) 2, 1025));

// occupy block 4~8
fileCache.put("test2", 2048, genBuf((byte) 4, 1024 * 5));
fileCache.put(pathId2, 2048, genBuf((byte) 4, 1024 * 5));

// occupy block 9
fileCache.put("test2", 10000, genBuf((byte) 5, 1024));
fileCache.put(pathId2, 10000, genBuf((byte) 5, 1024));

// touch lru
assertEquals(1025, fileCache.get("test2", 233, 1025).get().readableBytes());
assertEquals(1024, fileCache.get("test2", 10000, 1024).get().readableBytes());
assertEquals(2048, fileCache.get("test1", 10, 2048).get().readableBytes());
assertEquals(1024 * 5, fileCache.get("test2", 2048, 1024 * 5).get().readableBytes());
assertEquals(1025, fileCache.get(pathId2, 233, 1025).get().readableBytes());
assertEquals(1024, fileCache.get(pathId2, 10000, 1024).get().readableBytes());
assertEquals(2048, fileCache.get(pathId1, 10, 2048).get().readableBytes());
assertEquals(1024 * 5, fileCache.get(pathId2, 2048, 1024 * 5).get().readableBytes());

// expect evict test2-233 and test2-10000
fileCache.put("test3", 123, genBuf((byte) 6, 2049));
Long pathId3 = fileCache.newPathId();
fileCache.put(pathId3, 123, genBuf((byte) 6, 2049));

FileCache.Value value = fileCache.path2cache.get("test3").get(123L);
FileCache.Value value = fileCache.path2cache.get(pathId3).get(123L);
assertEquals(2049, value.dataLength);
assertArrayEquals(new int[]{2, 3, 9}, value.blocks);


rst = fileCache.get("test3", 123, 2049).get();
rst = fileCache.get(pathId3, 123, 2049).get();
assertEquals(2049, rst.readableBytes());
assertTrue(verify(rst, (byte) 6));

// expect evict test1-10 and test2-2048
fileCache.put("test4", 123, genBuf((byte) 7, 2049));
value = fileCache.path2cache.get("test4").get(123L);
Long pathId4 = fileCache.newPathId();
fileCache.put(pathId4, 123, genBuf((byte) 7, 2049));
value = fileCache.path2cache.get(pathId4).get(123L);
assertArrayEquals(new int[]{0, 1, 4}, value.blocks);
rst = fileCache.get("test4", 123, 2049).get();
rst = fileCache.get(pathId4, 123, 2049).get();
assertTrue(verify(rst, (byte) 7));

assertEquals(4, fileCache.freeBlockCount);

// expect occupy free blocks 5,6,7
fileCache.put("test5", 123, genBuf((byte) 8, 2049));
value = fileCache.path2cache.get("test5").get(123L);
Long pathId5 = fileCache.newPathId();
fileCache.put(pathId5, 123, genBuf((byte) 8, 2049));
value = fileCache.path2cache.get(pathId5).get(123L);
assertArrayEquals(new int[]{5, 6, 7}, value.blocks);
rst = fileCache.get("test5", 123, 2049).get();
rst = fileCache.get(pathId5, 123, 2049).get();
assertTrue(verify(rst, (byte) 8));
assertEquals(1, fileCache.freeBlockCount);

fileCache.put("test6", 6666, genBuf((byte) 9, 3333));
rst = fileCache.get("test6", 6666L, 3333).get();
Long pathId6 = fileCache.newPathId();
fileCache.put(pathId6, 6666, genBuf((byte) 9, 3333));
rst = fileCache.get(pathId6, 6666L, 3333).get();
assertTrue(verify(rst, (byte) 9));

}

@Test
public void testMergePut() throws IOException {
FileCache fileCache = new FileCache("/tmp/file_cache_test", 10 * 1024, 1024);
Long pathId = fileCache.newPathId();
{
CompositeByteBuf buf = Unpooled.compositeBuffer();
buf.addComponent(true, genBuf((byte) 1, 500));
buf.addComponent(true, genBuf((byte) 2, 500));
buf.addComponent(true, genBuf((byte) 3, 500));
fileCache.put("test", 3333L, buf);
fileCache.put(pathId, 3333L, buf);
}
assertEquals(1, fileCache.path2cache.get("test").size());
assertEquals(1500, fileCache.path2cache.get("test").get(3333L).dataLength);
assertEquals(1, fileCache.path2cache.get(pathId).size());
assertEquals(1500, fileCache.path2cache.get(pathId).get(3333L).dataLength);
{
CompositeByteBuf buf = Unpooled.compositeBuffer();
buf.addComponent(true, genBuf((byte) 4, 500));
buf.addComponent(true, genBuf((byte) 5, 500));
buf.addComponent(true, genBuf((byte) 6, 500));
fileCache.put("test", 3333L + 1000, buf);
fileCache.put(pathId, 3333L + 1000, buf);
}
assertEquals(1, fileCache.path2cache.get("test").size());
assertEquals(2500, fileCache.path2cache.get("test").get(3333L).dataLength);
assertEquals(1, fileCache.path2cache.get(pathId).size());
assertEquals(2500, fileCache.path2cache.get(pathId).get(3333L).dataLength);
{
CompositeByteBuf buf = Unpooled.compositeBuffer();
buf.addComponent(true, genBuf((byte) 7, 500));
fileCache.put("test", 3333L + 1000 + 1500, buf);
fileCache.put(pathId, 3333L + 1000 + 1500, buf);
}
assertEquals(1, fileCache.path2cache.get("test").size());
assertEquals(3000, fileCache.path2cache.get("test").get(3333L).dataLength);

assertTrue(verify(fileCache.get("test", 3333L, 500).get(), (byte) 1));
assertTrue(verify(fileCache.get("test", 3333L + 500, 500).get(), (byte) 2));
assertTrue(verify(fileCache.get("test", 3333L + 1000, 500).get(), (byte) 4));
assertTrue(verify(fileCache.get("test", 3333L + 1500, 500).get(), (byte) 5));
assertTrue(verify(fileCache.get("test", 3333L + 2000, 500).get(), (byte) 6));
assertTrue(verify(fileCache.get("test", 3333L + 2500, 500).get(), (byte) 7));
assertEquals(1, fileCache.path2cache.get(pathId).size());
assertEquals(3000, fileCache.path2cache.get(pathId).get(3333L).dataLength);

assertTrue(verify(fileCache.get(pathId, 3333L, 500).get(), (byte) 1));
assertTrue(verify(fileCache.get(pathId, 3333L + 500, 500).get(), (byte) 2));
assertTrue(verify(fileCache.get(pathId, 3333L + 1000, 500).get(), (byte) 4));
assertTrue(verify(fileCache.get(pathId, 3333L + 1500, 500).get(), (byte) 5));
assertTrue(verify(fileCache.get(pathId, 3333L + 2000, 500).get(), (byte) 6));
assertTrue(verify(fileCache.get(pathId, 3333L + 2500, 500).get(), (byte) 7));
}

ByteBuf genBuf(byte data, int length) {
Expand Down

0 comments on commit dc8ee74

Please sign in to comment.