diff --git a/utils/src/main/kotlin/jetbrains/exodus/core/cache/CaffeineCacheBuilder.kt b/utils/src/main/kotlin/jetbrains/exodus/core/cache/CaffeineCacheBuilder.kt index ee3be5e13..2410683ce 100644 --- a/utils/src/main/kotlin/jetbrains/exodus/core/cache/CaffeineCacheBuilder.kt +++ b/utils/src/main/kotlin/jetbrains/exodus/core/cache/CaffeineCacheBuilder.kt @@ -17,6 +17,7 @@ package jetbrains.exodus.core.cache import com.github.benmanes.caffeine.cache.Cache import com.github.benmanes.caffeine.cache.Caffeine +import com.github.benmanes.caffeine.cache.RemovalListener internal object CaffeineCacheBuilder { @@ -25,18 +26,23 @@ internal object CaffeineCacheBuilder { */ fun build( config: CaffeineCacheConfig, - keyTransformer: ((K) -> ConfigK) + keyTransformer: ((K) -> ConfigK), + evictionListener: RemovalListener? = null ): Cache { - return doBuild(config, keyTransformer) + return doBuild(config, keyTransformer, evictionListener) } - fun build(config: CaffeineCacheConfig): Cache { - return doBuild(config) + fun build( + config: CaffeineCacheConfig, + evictionListener: RemovalListener? = null + ): Cache { + return doBuild(config, null, evictionListener) } private fun doBuild( config: CaffeineCacheConfig, - keyTransformer: ((K) -> ConfigK)? = null + keyTransformer: ((K) -> ConfigK)? = null, + evictionListener: RemovalListener? = null ): Cache { return Caffeine.newBuilder() // Size eviction @@ -50,7 +56,7 @@ internal object CaffeineCacheBuilder { is WeightSizeEviction -> { maximumWeight(sizeEviction.maxWeight) weigher { key: K, value: V -> - @Suppress("UNCHECKED_CAST") + @Suppress("UNCHECKED_CAST", "NAME_SHADOWING") val key = keyTransformer?.invoke(key) ?: (key as ConfigK) sizeEviction.weigher(key, value) } @@ -62,6 +68,7 @@ internal object CaffeineCacheBuilder { // Reference eviction .apply { if (config.useSoftValues) softValues() } .apply { if (config.directExecution) executor(Runnable::run) } + .apply { if (evictionListener != null) evictionListener(evictionListener) } .build() } } \ No newline at end of file diff --git a/utils/src/main/kotlin/jetbrains/exodus/core/cache/CaffeinePersistentCache.kt b/utils/src/main/kotlin/jetbrains/exodus/core/cache/CaffeinePersistentCache.kt index 3c01245b3..563330681 100644 --- a/utils/src/main/kotlin/jetbrains/exodus/core/cache/CaffeinePersistentCache.kt +++ b/utils/src/main/kotlin/jetbrains/exodus/core/cache/CaffeinePersistentCache.kt @@ -16,9 +16,11 @@ package jetbrains.exodus.core.cache import com.github.benmanes.caffeine.cache.Cache +import com.github.benmanes.caffeine.cache.RemovalListener import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.atomic.AtomicLong import java.util.function.BiConsumer +import kotlin.math.max /** @@ -31,6 +33,7 @@ class CaffeinePersistentCache private constructor( private val config: CaffeineCacheConfig, override val version: Long, private val versionTracker: VersionTracker, + private val latestKeyVersions: MutableMap ) : PersistentCache { private data class VersionedKey(val key: K, val version: Long) @@ -75,15 +78,27 @@ class CaffeinePersistentCache private constructor( companion object { fun create(config: CaffeineCacheConfig): CaffeinePersistentCache { - val cache = CaffeineCacheBuilder.build(config, VersionedKey::key) + val latestVersionKeys = ConcurrentHashMap() + val evictionListener = RemovalListener, V> { versionedKey, _, _ -> + if (versionedKey == null) { + return@RemovalListener + } + val (key, version) = versionedKey + latestVersionKeys.compute(key) { _, latestVersion -> + if (version == latestVersion) null else latestVersion + } + } + + val cache = CaffeineCacheBuilder.build(config, VersionedKey::key, evictionListener) val version = 0L val tracker = VersionTracker(version) - return CaffeinePersistentCache(cache, config, version, tracker) + return CaffeinePersistentCache(cache, config, version, tracker, latestVersionKeys) } } - private val currentVersionKeys = ConcurrentHashMap.newKeySet() + // Map of keys with their corresponding version available for the current version of cache + private val currentKeys = ConcurrentHashMap() // Generic cache impl override fun size(): Long { @@ -95,25 +110,35 @@ class CaffeinePersistentCache private constructor( } override fun get(key: K): V? { - val versionedKey = VersionedKey(key, version) + val keyVersion = currentKeys[key] ?: return null + val versionedKey = VersionedKey(key, keyVersion) return cache.getIfPresent(versionedKey) } override fun put(key: K, value: V) { - val versionedKey = VersionedKey(key, version) - cache.put(versionedKey, value) - currentVersionKeys.add(key) + val currentVersion = version + currentKeys.compute(key) { _, _ -> + val versionedKey = VersionedKey(key, currentVersion) + cache.put(versionedKey, value) + + currentVersion + } + latestKeyVersions.compute(key) { _, latestKeyVersion -> + max(currentVersion, latestKeyVersion ?: -1) + } } override fun remove(key: K) { - val versionedKey = VersionedKey(key, version) - cache.invalidate(versionedKey) - currentVersionKeys.remove(key) + currentKeys.computeIfPresent(key) { _, keyVersion -> + val versionedKey = VersionedKey(key, keyVersion) + cache.invalidate(versionedKey) + null + } } override fun clear() { cache.invalidateAll() - currentVersionKeys.clear() + currentKeys.clear() } override fun forceEviction() { @@ -121,16 +146,23 @@ class CaffeinePersistentCache private constructor( } override fun forEachEntry(consumer: BiConsumer) { - currentVersionKeys.forEachCacheEntry(consumer::accept) + currentKeys.forEachCacheEntry(consumer::accept) } // Persistent cache impl override fun createNextVersion(entryConsumer: BiConsumer?): PersistentCache { val nextVersion = versionTracker.next() - val newCache = CaffeinePersistentCache(cache, config, nextVersion, versionTracker) - currentVersionKeys.forEachCacheEntry { key, value -> - newCache.put(key, value) - entryConsumer?.accept(key, value) + val newCache = CaffeinePersistentCache(cache, config, nextVersion, versionTracker, latestKeyVersions) + + // Copy keys available for the next cache + // It effectively prohibits new version from seeing updates for previous versions + currentKeys.forEach { (key, version) -> + val versionedKey = VersionedKey(key, version) + val value = cache.getIfPresent(versionedKey) + if (value != null) { + newCache.currentKeys[key] = version + entryConsumer?.accept(key, value) + } } return newCache } @@ -148,21 +180,17 @@ class CaffeinePersistentCache private constructor( private fun unregisterAndCleanUp(client: CacheClient) { val leftClients = versionTracker.unregister(client, version) if (leftClients == 0L && version < versionTracker.currentVersion) { - currentVersionKeys.forEachVersionedKey { - cache.invalidate(it) + currentKeys.forEach { (key, version) -> + val latestVersion = latestKeyVersions[key] ?: Long.MAX_VALUE + if (version <= latestVersion) { + cache.invalidate(VersionedKey(key, version)) + } } } } - private fun Set.forEachVersionedKey(consumer: (VersionedKey) -> Unit) { - forEach { key -> - val versionedKey = VersionedKey(key, version) - consumer(versionedKey) - } - } - - private fun Set.forEachCacheEntry(consumer: (K, V) -> Unit) { - forEach { key -> + private fun Map.forEachCacheEntry(consumer: (K, V) -> Unit) { + forEach { (key, version) -> val versionedKey = VersionedKey(key, version) val value = cache.getIfPresent(versionedKey) if (value != null) { diff --git a/utils/src/test/kotlin/jetbrains/exodus/core/cache/CaffeinePersistentCacheTest.kt b/utils/src/test/kotlin/jetbrains/exodus/core/cache/CaffeinePersistentCacheTest.kt index 24d862cba..7f84afecb 100644 --- a/utils/src/test/kotlin/jetbrains/exodus/core/cache/CaffeinePersistentCacheTest.kt +++ b/utils/src/test/kotlin/jetbrains/exodus/core/cache/CaffeinePersistentCacheTest.kt @@ -13,12 +13,8 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package jetbrains.exodus.core.dataStructures.cache +package jetbrains.exodus.core.cache -import jetbrains.exodus.core.cache.CaffeineCacheConfig -import jetbrains.exodus.core.cache.CaffeinePersistentCache -import jetbrains.exodus.core.cache.FixedSizeEviction -import jetbrains.exodus.core.cache.WeightSizeEviction import org.junit.Assert.assertEquals import org.junit.Assert.assertNull import org.junit.Assert.assertTrue @@ -114,24 +110,6 @@ class CaffeinePersistentCacheTest { assertNull(value2) } - @Test - fun `should copy entries to new version`() { - // Given - val cache1 = givenSizedCache(4) // 2 entries for each version - cache1.put("key1", "value1") - cache1.put("key2", "value2") - val cache2 = cache1.createNextVersion() - - // When - val value1 = cache2.get("key1") - val value2 = cache2.get("key2") - - // Then - assertEquals(4, cache2.count()) - assertEquals("value1", value1) - assertEquals("value2", value2) - - } @Test fun `should evict when client unregisters`() {