Skip to content

Commit

Permalink
JT-78303: Prototype implementation with only keys coping for next ver…
Browse files Browse the repository at this point in the history
…sion without touching cache
  • Loading branch information
vladimir-zatsepin committed Jan 12, 2024
1 parent 946eba3 commit 49ed229
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package jetbrains.exodus.core.cache

import com.github.benmanes.caffeine.cache.Cache
import com.github.benmanes.caffeine.cache.Caffeine
import com.github.benmanes.caffeine.cache.RemovalListener

internal object CaffeineCacheBuilder {

Expand All @@ -25,18 +26,23 @@ internal object CaffeineCacheBuilder {
*/
fun <K, V, ConfigK> build(
config: CaffeineCacheConfig<ConfigK, V>,
keyTransformer: ((K) -> ConfigK)
keyTransformer: ((K) -> ConfigK),
evictionListener: RemovalListener<K, V>? = null
): Cache<K, V> {
return doBuild(config, keyTransformer)
return doBuild(config, keyTransformer, evictionListener)
}

fun <K, V> build(config: CaffeineCacheConfig<K, V>): Cache<K, V> {
return doBuild(config)
fun <K, V> build(
config: CaffeineCacheConfig<K, V>,
evictionListener: RemovalListener<K, V>? = null
): Cache<K, V> {
return doBuild(config, null, evictionListener)
}

private fun <K, V, ConfigK> doBuild(
config: CaffeineCacheConfig<ConfigK, V>,
keyTransformer: ((K) -> ConfigK)? = null
keyTransformer: ((K) -> ConfigK)? = null,
evictionListener: RemovalListener<K, V>? = null
): Cache<K, V> {
return Caffeine.newBuilder()
// Size eviction
Expand All @@ -50,7 +56,7 @@ internal object CaffeineCacheBuilder {
is WeightSizeEviction -> {
maximumWeight(sizeEviction.maxWeight)
weigher { key: K, value: V ->
@Suppress("UNCHECKED_CAST")
@Suppress("UNCHECKED_CAST", "NAME_SHADOWING")
val key = keyTransformer?.invoke(key) ?: (key as ConfigK)
sizeEviction.weigher(key, value)
}
Expand All @@ -62,6 +68,7 @@ internal object CaffeineCacheBuilder {
// Reference eviction
.apply { if (config.useSoftValues) softValues() }
.apply { if (config.directExecution) executor(Runnable::run) }
.apply { if (evictionListener != null) evictionListener(evictionListener) }
.build<K, V>()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@
package jetbrains.exodus.core.cache

import com.github.benmanes.caffeine.cache.Cache
import com.github.benmanes.caffeine.cache.RemovalListener
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicLong
import java.util.function.BiConsumer
import kotlin.math.max


/**
Expand All @@ -31,6 +33,7 @@ class CaffeinePersistentCache<K, V> private constructor(
private val config: CaffeineCacheConfig<K, V>,
override val version: Long,
private val versionTracker: VersionTracker,
private val latestKeyVersions: MutableMap<K, Long>
) : PersistentCache<K, V> {

private data class VersionedKey<K>(val key: K, val version: Long)
Expand Down Expand Up @@ -75,15 +78,27 @@ class CaffeinePersistentCache<K, V> private constructor(
companion object {

fun <K, V> create(config: CaffeineCacheConfig<K, V>): CaffeinePersistentCache<K, V> {
val cache = CaffeineCacheBuilder.build(config, VersionedKey<K>::key)
val latestVersionKeys = ConcurrentHashMap<K, Long>()
val evictionListener = RemovalListener<VersionedKey<K>, V> { versionedKey, _, _ ->
if (versionedKey == null) {
return@RemovalListener
}
val (key, version) = versionedKey
latestVersionKeys.compute(key) { _, latestVersion ->
if (version == latestVersion) null else latestVersion
}
}

val cache = CaffeineCacheBuilder.build(config, VersionedKey<K>::key, evictionListener)
val version = 0L
val tracker = VersionTracker(version)

return CaffeinePersistentCache(cache, config, version, tracker)
return CaffeinePersistentCache(cache, config, version, tracker, latestVersionKeys)
}
}

private val currentVersionKeys = ConcurrentHashMap.newKeySet<K>()
// Map of keys with their corresponding version available for the current version of cache
private val currentKeys = ConcurrentHashMap<K, Long>()

// Generic cache impl
override fun size(): Long {
Expand All @@ -95,42 +110,59 @@ class CaffeinePersistentCache<K, V> private constructor(
}

override fun get(key: K): V? {
val versionedKey = VersionedKey(key, version)
val keyVersion = currentKeys[key] ?: return null
val versionedKey = VersionedKey(key, keyVersion)
return cache.getIfPresent(versionedKey)
}

override fun put(key: K, value: V) {
val versionedKey = VersionedKey(key, version)
cache.put(versionedKey, value)
currentVersionKeys.add(key)
val currentVersion = version
currentKeys.compute(key) { _, _ ->
val versionedKey = VersionedKey(key, currentVersion)
cache.put(versionedKey, value)

currentVersion
}
latestKeyVersions.compute(key) { _, latestKeyVersion ->
max(currentVersion, latestKeyVersion ?: -1)
}
}

override fun remove(key: K) {
val versionedKey = VersionedKey(key, version)
cache.invalidate(versionedKey)
currentVersionKeys.remove(key)
currentKeys.computeIfPresent(key) { _, keyVersion ->
val versionedKey = VersionedKey(key, keyVersion)
cache.invalidate(versionedKey)
null
}
}

override fun clear() {
cache.invalidateAll()
currentVersionKeys.clear()
currentKeys.clear()
}

override fun forceEviction() {
cache.cleanUp()
}

override fun forEachEntry(consumer: BiConsumer<K, V>) {
currentVersionKeys.forEachCacheEntry(consumer::accept)
currentKeys.forEachCacheEntry(consumer::accept)
}

// Persistent cache impl
override fun createNextVersion(entryConsumer: BiConsumer<K, V>?): PersistentCache<K, V> {
val nextVersion = versionTracker.next()
val newCache = CaffeinePersistentCache(cache, config, nextVersion, versionTracker)
currentVersionKeys.forEachCacheEntry { key, value ->
newCache.put(key, value)
entryConsumer?.accept(key, value)
val newCache = CaffeinePersistentCache(cache, config, nextVersion, versionTracker, latestKeyVersions)

// Copy keys available for the next cache
// It effectively prohibits new version from seeing updates for previous versions
currentKeys.forEach { (key, version) ->
val versionedKey = VersionedKey(key, version)
val value = cache.getIfPresent(versionedKey)
if (value != null) {
newCache.currentKeys[key] = version
entryConsumer?.accept(key, value)
}
}
return newCache
}
Expand All @@ -148,21 +180,17 @@ class CaffeinePersistentCache<K, V> private constructor(
private fun unregisterAndCleanUp(client: CacheClient) {
val leftClients = versionTracker.unregister(client, version)
if (leftClients == 0L && version < versionTracker.currentVersion) {
currentVersionKeys.forEachVersionedKey {
cache.invalidate(it)
currentKeys.forEach { (key, version) ->
val latestVersion = latestKeyVersions[key] ?: Long.MAX_VALUE
if (version <= latestVersion) {
cache.invalidate(VersionedKey(key, version))
}
}
}
}

private fun Set<K>.forEachVersionedKey(consumer: (VersionedKey<K>) -> Unit) {
forEach { key ->
val versionedKey = VersionedKey(key, version)
consumer(versionedKey)
}
}

private fun Set<K>.forEachCacheEntry(consumer: (K, V) -> Unit) {
forEach { key ->
private fun Map<K, Long>.forEachCacheEntry(consumer: (K, V) -> Unit) {
forEach { (key, version) ->
val versionedKey = VersionedKey(key, version)
val value = cache.getIfPresent(versionedKey)
if (value != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package jetbrains.exodus.core.dataStructures.cache
package jetbrains.exodus.core.cache

import jetbrains.exodus.core.cache.CaffeineCacheConfig
import jetbrains.exodus.core.cache.CaffeinePersistentCache
import jetbrains.exodus.core.cache.FixedSizeEviction
import jetbrains.exodus.core.cache.WeightSizeEviction
import org.junit.Assert.assertEquals
import org.junit.Assert.assertNull
import org.junit.Assert.assertTrue
Expand Down Expand Up @@ -114,24 +110,6 @@ class CaffeinePersistentCacheTest {
assertNull(value2)
}

@Test
fun `should copy entries to new version`() {
// Given
val cache1 = givenSizedCache(4) // 2 entries for each version
cache1.put("key1", "value1")
cache1.put("key2", "value2")
val cache2 = cache1.createNextVersion()

// When
val value1 = cache2.get("key1")
val value2 = cache2.get("key2")

// Then
assertEquals(4, cache2.count())
assertEquals("value1", value1)
assertEquals("value2", value2)

}

@Test
fun `should evict when client unregisters`() {
Expand Down

0 comments on commit 49ed229

Please sign in to comment.