Skip to content

Commit

Permalink
JT-78303: Fix key eviction performance bottleneck
Browse files Browse the repository at this point in the history
  • Loading branch information
vladimir-zatsepin committed Jan 29, 2024
1 parent 7841ead commit 0a86ffd
Show file tree
Hide file tree
Showing 5 changed files with 86 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ private boolean doCommit() {
if (txn.commit()) {
store.unregisterTransaction(this);
flushNonTransactionalBlobs();
revertCaches();
flushCaches(true);
return true;
}
revert();
Expand Down Expand Up @@ -217,7 +217,7 @@ public boolean flush() {
boolean doFlush() {
if (txn.flush()) {
flushNonTransactionalBlobs();
revertCaches(false); // do not clear props & links caches
flushCaches(false); // do not clear props & links caches
return true;
}
revert();
Expand All @@ -229,9 +229,6 @@ public void revert() {
closeOpenedBlobStreams();
txn.revert();
revertCaches();

mutableCache = null;
mutatedInTxn = new ArrayList<>();
}

public PersistentStoreTransaction getSnapshot() {
Expand Down Expand Up @@ -936,24 +933,31 @@ void closeCaches() {
blobStringsCache.close();
}

public void revertCaches() {
revertCaches(true);
}

public void checkInvalidateBlobsFlag() {
checkInvalidateBlobsFlag = true;
}

private PersistentCacheClient cacheClient = null;

private void initCaches() {
revertCaches(false);
if (mutableCache != null) {
// Mutable cache might not be null in case of transaction being reverted
mutableCache.release();
}
resetCaches(false);
if (cacheClient == null) {
cacheClient = localCache.getCache().register();
cacheClient = localCache.registerClient();
}
}

public void flushCaches(final boolean clearPropsAndLinksCache) {
resetCaches(clearPropsAndLinksCache);
}

public void revertCaches() {
if (mutableCache != null) {
mutableCache.release();
}
resetCaches(true);
}

private void revertCaches(final boolean clearPropsAndLinksCache) {
private void resetCaches(final boolean clearPropsAndLinksCache) {
if (clearPropsAndLinksCache) {
propsCache.clear();
linksCache.clear();
Expand Down Expand Up @@ -1008,7 +1012,7 @@ void apply() {

txn.setCommitHook(() -> {
log.flushed();
final EntityIterableCacheAdapterMutable cache = PersistentStoreTransaction.this.mutableCache;
final EntityIterableCacheAdapterMutable cache = this.mutableCache;
if (cache != null) { // mutableCache can be null if only blobs are modified
applyAtomicCaches(cache);
}
Expand Down Expand Up @@ -1112,14 +1116,16 @@ private void flushBlobs(final BlobVault blobVault) throws Exception {
}
}

private void applyAtomicCaches(@NotNull EntityIterableCacheAdapterMutable cache) {
private void applyAtomicCaches(@NotNull EntityIterableCacheAdapterMutable mutableCache) {
final EntityIterableCache entityIterableCache = store.getEntityIterableCache();
for (final Updatable it : mutatedInTxn) {
it.endUpdate(PersistentStoreTransaction.this);
}
if (!entityIterableCache.compareAndSetCacheAdapter(localCache, cache.endWrite())) {
EntityIterableCacheAdapter oldCache = localCache;
if (!entityIterableCache.compareAndSetCacheAdapter(localCache, mutableCache.endWrite())) {
throw new EntityStoreException("This exception should never be thrown");
}
oldCache.release();
}

private void applyExclusiveTransactionCaches() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,14 @@ internal open class EntityIterableCacheAdapter(

open fun clear() = cache.clear()

fun registerClient(): PersistentCacheClient {
return cache.registerClient()
}

fun release() {
cache.release()
}

val halfFull: Boolean get() = cache.count() > cache.size() / 2

fun cloneToMutable(): EntityIterableCacheAdapterMutable {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ class CaffeinePersistentCache<K : Any, V> private constructor(
weigher { _: K, values: WeightedValueMap<Version, V> -> values.totalWeight }
}
}
.evictionListener(evictionSubject)
.removalListener(evictionSubject)
.build<K, WeightedValueMap<Version, V>>()
val version = 0L
val tracker = VersionTracker(version)
Expand All @@ -71,10 +71,13 @@ class CaffeinePersistentCache<K : Any, V> private constructor(

private val cacheMap = cache.asMap()

// The implementation assumes that there is at least a single the most actual cache client is registered for eviction,
// otherwise it might lead to memory leaks as key eviction is ignored and keys might retain in key index forever
private val evictionListener: EvictionListener<K> = { key: K? -> key?.let { keyVersionIndex.removeKey(key) } }

init {
val listener: EvictionListener<K> = { key: K? -> key?.let { keyVersionIndex.removeKey(key) } }
evictionSubject.addListener(listener)
cleaner.register(this) { evictionSubject.removeListener(listener) }
evictionSubject.addListener(evictionListener)
cleaner.register(this) { evictionSubject.removeListener(evictionListener) }
}

// Generic cache impl
Expand Down Expand Up @@ -162,7 +165,7 @@ class CaffeinePersistentCache<K : Any, V> private constructor(
)
}

override fun register(): PersistentCacheClient {
override fun registerClient(): PersistentCacheClient {
val client = object : PersistentCacheClient {

private var unregistered = false
Expand All @@ -177,6 +180,10 @@ class CaffeinePersistentCache<K : Any, V> private constructor(
return client
}

override fun release() {
evictionSubject.removeListener(evictionListener)
}

// For tests
fun localIndexSize(): Int {
return keyVersionIndex.current.size()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,42 @@ interface PersistentCache<K, V> : BasicCache<K, V> {
* Returns a client that should be used to unregister the client to enable entries associated with its version
* to be clean up later during the ordinary operations like get or put.
*
* Should be used when there several concurrent clients might use old versions of cache.
*
* Must be invoked once per client.
*
* Should be used in the way like:
* ```
* val client = cache.registerClient()
* val nextCacheVersion = cache.createNextVersion()
* try {
* // do something with the cache
* cache.get(key)
* cache.put(key, value)
* // ...
* } finally {
* // As soon as client is unregistered, all values stored with its version are considered as stale
* client.unregister()
* }
*/
fun registerClient(): PersistentCacheClient

/**
* Release resources associated with the cache when it is no longer needed.
*
* Should be used in the way like:
* ```
* val cache = PersistentCacheImpl(...)
* try {
* // do something with the cache
* cache.get(key)
* cache.put(key, value)
* // ...
* } finally {
* cache.release()
* }
*/
fun register(): PersistentCacheClient
fun release()
}

interface PersistentCacheClient {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ class CaffeinePersistentCacheTest {
// Given
val cache1 = givenSizedCache(2)
// Register client to prevent cache from being removed for it's version
cache1.register()
cache1.registerClient()
val cache2 = cache1.createNextVersion()

// When
Expand Down Expand Up @@ -154,17 +154,18 @@ class CaffeinePersistentCacheTest {
val cache1 = givenSizedCache(2)
val cache2 = cache1.createNextVersion()

// When
val registration = cache1.register()
// When register
val registration = cache1.registerClient()
cache1.put("key", "value1")
cache2.put("key", "value2")
assertEquals("value1", cache1.get("key"))

// When unregister
registration.unregister()
// Trigger removal of unused values
cache2.get("key")

// Then
assertEquals("value2", cache2.get("key")) // also triggers eviction of old value
assertEquals(null, cache1.get("key"))
assertEquals("value2", cache2.get("key"))
}

@Test
Expand Down

0 comments on commit 0a86ffd

Please sign in to comment.