Skip to content

Commit

Permalink
JT-78303: Refactor persistent cache components; Add tests;
Browse files Browse the repository at this point in the history
  • Loading branch information
vladimir-zatsepin committed Jan 17, 2024
1 parent 3031050 commit f100dc6
Show file tree
Hide file tree
Showing 11 changed files with 241 additions and 101 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import jetbrains.exodus.bindings.IntegerBinding;
import jetbrains.exodus.bindings.LongBinding;
import jetbrains.exodus.core.dataStructures.*;
import jetbrains.exodus.core.cache.CacheClient;
import jetbrains.exodus.core.cache.persistent.PersistentCacheClient;
import jetbrains.exodus.core.dataStructures.hash.*;
import jetbrains.exodus.crypto.EncryptedBlobVault;
import jetbrains.exodus.entitystore.iterate.*;
Expand Down Expand Up @@ -944,7 +944,7 @@ public void checkInvalidateBlobsFlag() {
checkInvalidateBlobsFlag = true;
}

private CacheClient cacheClient = null;
private PersistentCacheClient cacheClient = null;

private void initCaches() {
revertCaches(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*/
package jetbrains.exodus.entitystore

import jetbrains.exodus.core.cache.*
import jetbrains.exodus.core.cache.persistent.*
import jetbrains.exodus.entitystore.iterate.CachedInstanceIterable
import java.time.Duration

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*/
package jetbrains.exodus.entitystore

import jetbrains.exodus.core.cache.PersistentCache
import jetbrains.exodus.core.cache.persistent.PersistentCache
import jetbrains.exodus.entitystore.PersistentStoreTransaction.HandleCheckerAdapter
import jetbrains.exodus.entitystore.iterate.CachedInstanceIterable
import jetbrains.exodus.entitystore.iterate.EntityIterableBase
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,106 +13,27 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package jetbrains.exodus.core.cache
package jetbrains.exodus.core.cache.persistent

import com.github.benmanes.caffeine.cache.Cache
import com.github.benmanes.caffeine.cache.Caffeine
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.atomic.AtomicLong
import java.util.function.BiConsumer


typealias Version = Long
typealias ValueWeigher<V> = (V) -> Int

/**
* This cache implementation is based on Caffeine cache. It versions each value stored.
* Put or remove values for the current version doesn't affect other versions.
* Put or remove values for the current version doesn't affect other existing versions.
*/
class CaffeinePersistentCache<K, V> private constructor(
private val cache: Cache<K, VersionedValues<V>>,
private val cache: Cache<K, WeightedValueMap<Version, V>>,
private val config: CaffeineCacheConfig<V>,
override val version: Long,
private val versionTracker: VersionTracker,
) : PersistentCache<K, V> {

// Thread-safe container for cached versioned values
private class VersionedValues<V>(private val weigher: ValueWeigher<V>) {
// Version -> Value map
private val map = ConcurrentHashMap<Version, V>()

// Total weight of all values collectively
private val totalWeightRef = AtomicInteger()

val totalWeight get() = totalWeightRef.get()
val size get() = map.size
val keys get() = map.keys

fun put(version: Version, value: V) {
map.compute(version) { _, prevValue ->
val toSubtract = prevValue?.let(weigher) ?: 0
totalWeightRef.updateAndGet { (it + weigher(value) - toSubtract).coerceAtLeast(0) }
value
}
}

fun remove(version: Version) {
map.computeIfPresent(version) { _, value ->
totalWeightRef.updateAndGet { (it - weigher(value)).coerceAtLeast(0) }
null
}
}

fun get(version: Version): V? {
return map[version]
}

fun orNullIfEmpty(): VersionedValues<V>? {
return if (map.isEmpty()) null else this
}
}

// Thread-safe class for tracking current version and clients registered for different versions
private class VersionTracker(initialVersion: Long = 0) {

data class ClientVersion(val client: CacheClient, val version: Long)

private val registeredClients = ConcurrentHashMap.newKeySet<ClientVersion>()
private val versionClientCount = ConcurrentHashMap<Long, Long>()
private val versionRef = AtomicLong(initialVersion)

fun next(): Long {
return versionRef.incrementAndGet()
}

fun register(client: CacheClient, version: Long) {
val wasAdded = registeredClients.add(ClientVersion(client, version))
if (wasAdded) {
versionClientCount.compute(version) { _, count -> if (count == null) 1 else count + 1 }
}
}

fun unregister(client: CacheClient, version: Long): Long {
val wasRemoved = registeredClients.remove(ClientVersion(client, version))
val clientsLeft = if (wasRemoved) {
versionClientCount.compute(version) { _, count ->
if (count == null || (count - 1) == 0L) {
null
} else {
count - 1
}
}
} else {
versionClientCount[version]
}
return clientsLeft ?: 0
}

fun hasNoClients(version: Version): Boolean {
return !versionClientCount.containsKey(version)
}
}

companion object {

fun <K, V> create(config: CaffeineCacheConfig<V>): CaffeinePersistentCache<K, V> {
Expand All @@ -126,10 +47,10 @@ class CaffeinePersistentCache<K, V> private constructor(
} else {
val eviction = config.sizeEviction as WeightedEviction
maximumWeight(eviction.maxWeight)
weigher { _: K, values: VersionedValues<V> -> values.totalWeight }
weigher { _: K, values: WeightedValueMap<Version, V> -> values.totalWeight }
}
}
.build<K, VersionedValues<V>>()
.build<K, WeightedValueMap<Version, V>>()
val version = 0L
val tracker = VersionTracker(version)

Expand Down Expand Up @@ -158,14 +79,20 @@ class CaffeinePersistentCache<K, V> private constructor(

override fun put(key: K, value: V) {
cache.asMap().compute(key) { _, map ->
val values = map ?: VersionedValues((config.sizeEviction as? WeightedEviction<V>)?.weigher ?: { 1 })
val values = map ?: createNewValueMap()
values.put(version, value)
values.removeUnusedVersions(version)
values
}
keyVersions[key] = version
}

private fun createNewValueMap(): WeightedValueMap<Version, V> {
// Weigher is used only for weighted eviction
val evictionConfig = config.sizeEviction as? WeightedEviction<V>
return WeightedValueMap(evictionConfig?.weigher ?: { 1 })
}

override fun remove(key: K) {
cache.asMap().computeIfPresent(key) { _, values ->
values.remove(version)
Expand Down Expand Up @@ -194,7 +121,7 @@ class CaffeinePersistentCache<K, V> private constructor(

// Persistent cache impl
override fun createNextVersion(entryConsumer: BiConsumer<K, V>?): PersistentCache<K, V> {
val nextVersion = versionTracker.next()
val nextVersion = versionTracker.nextVersion()
val newCache = CaffeinePersistentCache(cache, config, nextVersion, versionTracker)

// Copy key index for the next cache
Expand All @@ -211,8 +138,8 @@ class CaffeinePersistentCache<K, V> private constructor(
return newCache
}

override fun register(): CacheClient {
val client = object : CacheClient {
override fun register(): PersistentCacheClient {
val client = object : PersistentCacheClient {
override fun unregister() {
versionTracker.unregister(this, version)
}
Expand All @@ -221,12 +148,12 @@ class CaffeinePersistentCache<K, V> private constructor(
return client
}

private fun Cache<K, VersionedValues<V>>.getVersioned(key: K, version: Version): V? {
private fun Cache<K, WeightedValueMap<Version, V>>.getVersioned(key: K, version: Version): V? {
return this.getIfPresent(key)?.get(version)
}

// Returns true if values were changed
private fun VersionedValues<V>.removeUnusedVersions(currentVersion: Version): Boolean {
private fun WeightedValueMap<Version, V>.removeUnusedVersions(currentVersion: Version): Boolean {
if (this.size <= 1) {
return false
}
Expand All @@ -240,7 +167,7 @@ class CaffeinePersistentCache<K, V> private constructor(
return changed
}

private fun VersionedValues<V>.removeUnusedVersionsAndPutBack(key: K, currentVersion: Version) {
private fun WeightedValueMap<Version, V>.removeUnusedVersionsAndPutBack(key: K, currentVersion: Version) {
if (removeUnusedVersions(currentVersion)) {
cache.put(key, this)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package jetbrains.exodus.core.cache
package jetbrains.exodus.core.cache.persistent

import java.time.Duration

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package jetbrains.exodus.core.cache
package jetbrains.exodus.core.cache.persistent

import jetbrains.exodus.core.cache.BasicCache
import java.util.function.BiConsumer

/**
Expand All @@ -39,9 +40,9 @@ interface PersistentCache<K, V> : BasicCache<K, V> {
* Returns a client that should be used to unregister the client to enable entries associated with its version
* to be clean up later during the ordinary operations like get or put.
*/
fun register(): CacheClient
fun register(): PersistentCacheClient
}

interface CacheClient {
interface PersistentCacheClient {
fun unregister()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package jetbrains.exodus.core.cache.persistent

import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicLong

// Thread-safe class for tracking current version and clients registered for different versions
internal class VersionTracker(initialVersion: Long = 0) {

@JvmRecord
internal data class ClientVersion(val client: PersistentCacheClient, val version: Long)

private val versionRef = AtomicLong(initialVersion)

private val registeredClients = ConcurrentHashMap.newKeySet<ClientVersion>()
private val versionClientCount = ConcurrentHashMap<Long, Long>()

fun nextVersion(): Long {
return versionRef.incrementAndGet()
}

fun register(client: PersistentCacheClient, version: Long) {
val wasAdded = registeredClients.add(ClientVersion(client, version))
if (wasAdded) {
versionClientCount.compute(version) { _, count -> if (count == null) 1 else count + 1 }
}
}

fun unregister(client: PersistentCacheClient, version: Long): Long {
val wasRemoved = registeredClients.remove(ClientVersion(client, version))
val clientsLeft = if (wasRemoved) {
versionClientCount.compute(version) { _, count ->
if (count == null || (count - 1) == 0L) null else count - 1
}
} else {
versionClientCount[version]
}
return clientsLeft ?: 0
}

fun hasNoClients(version: Version): Boolean {
return !versionClientCount.containsKey(version)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package jetbrains.exodus.core.cache.persistent

import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger

typealias ValueWeigher<V> = (V) -> Int

// Thread-safe container for versioned cache entry value that holds total weight of all values collectively
internal class WeightedValueMap<K, V>(private val weigher: ValueWeigher<V>) {

private val map = ConcurrentHashMap<K, V>()

// Total weight of all values collectively
private val totalWeightRef = AtomicInteger()

val size get() = map.size
val keys get() = map.keys
val totalWeight get() = totalWeightRef.get()

fun put(key: K, value: V) {
map.compute(key) { _, prevValue ->
val toSubtract = prevValue?.let(weigher) ?: 0
// Coersing is needed to avoid negative values as weigher can return different values for the same value
totalWeightRef.updateAndGet { (it + weigher(value) - toSubtract).coerceAtLeast(0) }
value
}
}

fun remove(key: K) {
map.computeIfPresent(key) { _, value ->
totalWeightRef.updateAndGet { (it - weigher(value)).coerceAtLeast(0) }
null
}
}

fun get(key: K): V? {
return map[key]
}

fun orNullIfEmpty(): WeightedValueMap<K, V>? {
return if (map.isEmpty()) null else this
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package jetbrains.exodus.core.cache
package jetbrains.exodus.core.cache.persistent

import org.junit.Assert.assertEquals
import org.junit.Assert.assertNull
Expand Down
Loading

0 comments on commit f100dc6

Please sign in to comment.