diff --git a/entity-store/src/main/java/jetbrains/exodus/entitystore/PersistentStoreTransaction.java b/entity-store/src/main/java/jetbrains/exodus/entitystore/PersistentStoreTransaction.java index d6eedfb40..2ffad7298 100644 --- a/entity-store/src/main/java/jetbrains/exodus/entitystore/PersistentStoreTransaction.java +++ b/entity-store/src/main/java/jetbrains/exodus/entitystore/PersistentStoreTransaction.java @@ -21,7 +21,7 @@ import jetbrains.exodus.bindings.IntegerBinding; import jetbrains.exodus.bindings.LongBinding; import jetbrains.exodus.core.dataStructures.*; -import jetbrains.exodus.core.cache.CacheClient; +import jetbrains.exodus.core.cache.persistent.PersistentCacheClient; import jetbrains.exodus.core.dataStructures.hash.*; import jetbrains.exodus.crypto.EncryptedBlobVault; import jetbrains.exodus.entitystore.iterate.*; @@ -944,7 +944,7 @@ public void checkInvalidateBlobsFlag() { checkInvalidateBlobsFlag = true; } - private CacheClient cacheClient = null; + private PersistentCacheClient cacheClient = null; private void initCaches() { revertCaches(false); diff --git a/entity-store/src/main/kotlin/jetbrains/exodus/entitystore/EntityIterableCacheAdapter.kt b/entity-store/src/main/kotlin/jetbrains/exodus/entitystore/EntityIterableCacheAdapter.kt index be658e781..be4586c88 100644 --- a/entity-store/src/main/kotlin/jetbrains/exodus/entitystore/EntityIterableCacheAdapter.kt +++ b/entity-store/src/main/kotlin/jetbrains/exodus/entitystore/EntityIterableCacheAdapter.kt @@ -15,7 +15,7 @@ */ package jetbrains.exodus.entitystore -import jetbrains.exodus.core.cache.* +import jetbrains.exodus.core.cache.persistent.* import jetbrains.exodus.entitystore.iterate.CachedInstanceIterable import java.time.Duration diff --git a/entity-store/src/main/kotlin/jetbrains/exodus/entitystore/EntityIterableCacheAdapterMutable.kt b/entity-store/src/main/kotlin/jetbrains/exodus/entitystore/EntityIterableCacheAdapterMutable.kt index 7761f5823..fdeeae146 100644 --- a/entity-store/src/main/kotlin/jetbrains/exodus/entitystore/EntityIterableCacheAdapterMutable.kt +++ b/entity-store/src/main/kotlin/jetbrains/exodus/entitystore/EntityIterableCacheAdapterMutable.kt @@ -15,7 +15,7 @@ */ package jetbrains.exodus.entitystore -import jetbrains.exodus.core.cache.PersistentCache +import jetbrains.exodus.core.cache.persistent.PersistentCache import jetbrains.exodus.entitystore.PersistentStoreTransaction.HandleCheckerAdapter import jetbrains.exodus.entitystore.iterate.CachedInstanceIterable import jetbrains.exodus.entitystore.iterate.EntityIterableBase diff --git a/utils/src/main/kotlin/jetbrains/exodus/core/cache/CaffeinePersistentCache.kt b/utils/src/main/kotlin/jetbrains/exodus/core/cache/persistent/CaffeinePersistentCache.kt similarity index 58% rename from utils/src/main/kotlin/jetbrains/exodus/core/cache/CaffeinePersistentCache.kt rename to utils/src/main/kotlin/jetbrains/exodus/core/cache/persistent/CaffeinePersistentCache.kt index 4bfbe81e6..d725b9d4b 100644 --- a/utils/src/main/kotlin/jetbrains/exodus/core/cache/CaffeinePersistentCache.kt +++ b/utils/src/main/kotlin/jetbrains/exodus/core/cache/persistent/CaffeinePersistentCache.kt @@ -13,106 +13,27 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package jetbrains.exodus.core.cache +package jetbrains.exodus.core.cache.persistent import com.github.benmanes.caffeine.cache.Cache import com.github.benmanes.caffeine.cache.Caffeine import java.util.concurrent.ConcurrentHashMap -import java.util.concurrent.atomic.AtomicInteger -import java.util.concurrent.atomic.AtomicLong import java.util.function.BiConsumer + typealias Version = Long -typealias ValueWeigher = (V) -> Int /** * This cache implementation is based on Caffeine cache. It versions each value stored. - * Put or remove values for the current version doesn't affect other versions. + * Put or remove values for the current version doesn't affect other existing versions. */ class CaffeinePersistentCache private constructor( - private val cache: Cache>, + private val cache: Cache>, private val config: CaffeineCacheConfig, override val version: Long, private val versionTracker: VersionTracker, ) : PersistentCache { - // Thread-safe container for cached versioned values - private class VersionedValues(private val weigher: ValueWeigher) { - // Version -> Value map - private val map = ConcurrentHashMap() - - // Total weight of all values collectively - private val totalWeightRef = AtomicInteger() - - val totalWeight get() = totalWeightRef.get() - val size get() = map.size - val keys get() = map.keys - - fun put(version: Version, value: V) { - map.compute(version) { _, prevValue -> - val toSubtract = prevValue?.let(weigher) ?: 0 - totalWeightRef.updateAndGet { (it + weigher(value) - toSubtract).coerceAtLeast(0) } - value - } - } - - fun remove(version: Version) { - map.computeIfPresent(version) { _, value -> - totalWeightRef.updateAndGet { (it - weigher(value)).coerceAtLeast(0) } - null - } - } - - fun get(version: Version): V? { - return map[version] - } - - fun orNullIfEmpty(): VersionedValues? { - return if (map.isEmpty()) null else this - } - } - - // Thread-safe class for tracking current version and clients registered for different versions - private class VersionTracker(initialVersion: Long = 0) { - - data class ClientVersion(val client: CacheClient, val version: Long) - - private val registeredClients = ConcurrentHashMap.newKeySet() - private val versionClientCount = ConcurrentHashMap() - private val versionRef = AtomicLong(initialVersion) - - fun next(): Long { - return versionRef.incrementAndGet() - } - - fun register(client: CacheClient, version: Long) { - val wasAdded = registeredClients.add(ClientVersion(client, version)) - if (wasAdded) { - versionClientCount.compute(version) { _, count -> if (count == null) 1 else count + 1 } - } - } - - fun unregister(client: CacheClient, version: Long): Long { - val wasRemoved = registeredClients.remove(ClientVersion(client, version)) - val clientsLeft = if (wasRemoved) { - versionClientCount.compute(version) { _, count -> - if (count == null || (count - 1) == 0L) { - null - } else { - count - 1 - } - } - } else { - versionClientCount[version] - } - return clientsLeft ?: 0 - } - - fun hasNoClients(version: Version): Boolean { - return !versionClientCount.containsKey(version) - } - } - companion object { fun create(config: CaffeineCacheConfig): CaffeinePersistentCache { @@ -126,10 +47,10 @@ class CaffeinePersistentCache private constructor( } else { val eviction = config.sizeEviction as WeightedEviction maximumWeight(eviction.maxWeight) - weigher { _: K, values: VersionedValues -> values.totalWeight } + weigher { _: K, values: WeightedValueMap -> values.totalWeight } } } - .build>() + .build>() val version = 0L val tracker = VersionTracker(version) @@ -158,7 +79,7 @@ class CaffeinePersistentCache private constructor( override fun put(key: K, value: V) { cache.asMap().compute(key) { _, map -> - val values = map ?: VersionedValues((config.sizeEviction as? WeightedEviction)?.weigher ?: { 1 }) + val values = map ?: createNewValueMap() values.put(version, value) values.removeUnusedVersions(version) values @@ -166,6 +87,12 @@ class CaffeinePersistentCache private constructor( keyVersions[key] = version } + private fun createNewValueMap(): WeightedValueMap { + // Weigher is used only for weighted eviction + val evictionConfig = config.sizeEviction as? WeightedEviction + return WeightedValueMap(evictionConfig?.weigher ?: { 1 }) + } + override fun remove(key: K) { cache.asMap().computeIfPresent(key) { _, values -> values.remove(version) @@ -194,7 +121,7 @@ class CaffeinePersistentCache private constructor( // Persistent cache impl override fun createNextVersion(entryConsumer: BiConsumer?): PersistentCache { - val nextVersion = versionTracker.next() + val nextVersion = versionTracker.nextVersion() val newCache = CaffeinePersistentCache(cache, config, nextVersion, versionTracker) // Copy key index for the next cache @@ -211,8 +138,8 @@ class CaffeinePersistentCache private constructor( return newCache } - override fun register(): CacheClient { - val client = object : CacheClient { + override fun register(): PersistentCacheClient { + val client = object : PersistentCacheClient { override fun unregister() { versionTracker.unregister(this, version) } @@ -221,12 +148,12 @@ class CaffeinePersistentCache private constructor( return client } - private fun Cache>.getVersioned(key: K, version: Version): V? { + private fun Cache>.getVersioned(key: K, version: Version): V? { return this.getIfPresent(key)?.get(version) } // Returns true if values were changed - private fun VersionedValues.removeUnusedVersions(currentVersion: Version): Boolean { + private fun WeightedValueMap.removeUnusedVersions(currentVersion: Version): Boolean { if (this.size <= 1) { return false } @@ -240,7 +167,7 @@ class CaffeinePersistentCache private constructor( return changed } - private fun VersionedValues.removeUnusedVersionsAndPutBack(key: K, currentVersion: Version) { + private fun WeightedValueMap.removeUnusedVersionsAndPutBack(key: K, currentVersion: Version) { if (removeUnusedVersions(currentVersion)) { cache.put(key, this) } diff --git a/utils/src/main/kotlin/jetbrains/exodus/core/cache/CaffeinePersistentCacheConfig.kt b/utils/src/main/kotlin/jetbrains/exodus/core/cache/persistent/CaffeinePersistentCacheConfig.kt similarity index 96% rename from utils/src/main/kotlin/jetbrains/exodus/core/cache/CaffeinePersistentCacheConfig.kt rename to utils/src/main/kotlin/jetbrains/exodus/core/cache/persistent/CaffeinePersistentCacheConfig.kt index 244693f42..cfcd80392 100644 --- a/utils/src/main/kotlin/jetbrains/exodus/core/cache/CaffeinePersistentCacheConfig.kt +++ b/utils/src/main/kotlin/jetbrains/exodus/core/cache/persistent/CaffeinePersistentCacheConfig.kt @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package jetbrains.exodus.core.cache +package jetbrains.exodus.core.cache.persistent import java.time.Duration diff --git a/utils/src/main/kotlin/jetbrains/exodus/core/cache/PersistentCache.kt b/utils/src/main/kotlin/jetbrains/exodus/core/cache/persistent/PersistentCache.kt similarity index 89% rename from utils/src/main/kotlin/jetbrains/exodus/core/cache/PersistentCache.kt rename to utils/src/main/kotlin/jetbrains/exodus/core/cache/persistent/PersistentCache.kt index 21213fe84..e710454fd 100644 --- a/utils/src/main/kotlin/jetbrains/exodus/core/cache/PersistentCache.kt +++ b/utils/src/main/kotlin/jetbrains/exodus/core/cache/persistent/PersistentCache.kt @@ -13,8 +13,9 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package jetbrains.exodus.core.cache +package jetbrains.exodus.core.cache.persistent +import jetbrains.exodus.core.cache.BasicCache import java.util.function.BiConsumer /** @@ -39,9 +40,9 @@ interface PersistentCache : BasicCache { * Returns a client that should be used to unregister the client to enable entries associated with its version * to be clean up later during the ordinary operations like get or put. */ - fun register(): CacheClient + fun register(): PersistentCacheClient } -interface CacheClient { +interface PersistentCacheClient { fun unregister() } diff --git a/utils/src/main/kotlin/jetbrains/exodus/core/cache/persistent/VersionTracker.kt b/utils/src/main/kotlin/jetbrains/exodus/core/cache/persistent/VersionTracker.kt new file mode 100644 index 000000000..976053c16 --- /dev/null +++ b/utils/src/main/kotlin/jetbrains/exodus/core/cache/persistent/VersionTracker.kt @@ -0,0 +1,43 @@ +package jetbrains.exodus.core.cache.persistent + +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.atomic.AtomicLong + +// Thread-safe class for tracking current version and clients registered for different versions +internal class VersionTracker(initialVersion: Long = 0) { + + @JvmRecord + internal data class ClientVersion(val client: PersistentCacheClient, val version: Long) + + private val versionRef = AtomicLong(initialVersion) + + private val registeredClients = ConcurrentHashMap.newKeySet() + private val versionClientCount = ConcurrentHashMap() + + fun nextVersion(): Long { + return versionRef.incrementAndGet() + } + + fun register(client: PersistentCacheClient, version: Long) { + val wasAdded = registeredClients.add(ClientVersion(client, version)) + if (wasAdded) { + versionClientCount.compute(version) { _, count -> if (count == null) 1 else count + 1 } + } + } + + fun unregister(client: PersistentCacheClient, version: Long): Long { + val wasRemoved = registeredClients.remove(ClientVersion(client, version)) + val clientsLeft = if (wasRemoved) { + versionClientCount.compute(version) { _, count -> + if (count == null || (count - 1) == 0L) null else count - 1 + } + } else { + versionClientCount[version] + } + return clientsLeft ?: 0 + } + + fun hasNoClients(version: Version): Boolean { + return !versionClientCount.containsKey(version) + } +} \ No newline at end of file diff --git a/utils/src/main/kotlin/jetbrains/exodus/core/cache/persistent/WeightedValueMap.kt b/utils/src/main/kotlin/jetbrains/exodus/core/cache/persistent/WeightedValueMap.kt new file mode 100644 index 000000000..cb06dc3bf --- /dev/null +++ b/utils/src/main/kotlin/jetbrains/exodus/core/cache/persistent/WeightedValueMap.kt @@ -0,0 +1,43 @@ +package jetbrains.exodus.core.cache.persistent + +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.atomic.AtomicInteger + +typealias ValueWeigher = (V) -> Int + +// Thread-safe container for versioned cache entry value that holds total weight of all values collectively +internal class WeightedValueMap(private val weigher: ValueWeigher) { + + private val map = ConcurrentHashMap() + + // Total weight of all values collectively + private val totalWeightRef = AtomicInteger() + + val size get() = map.size + val keys get() = map.keys + val totalWeight get() = totalWeightRef.get() + + fun put(key: K, value: V) { + map.compute(key) { _, prevValue -> + val toSubtract = prevValue?.let(weigher) ?: 0 + // Coersing is needed to avoid negative values as weigher can return different values for the same value + totalWeightRef.updateAndGet { (it + weigher(value) - toSubtract).coerceAtLeast(0) } + value + } + } + + fun remove(key: K) { + map.computeIfPresent(key) { _, value -> + totalWeightRef.updateAndGet { (it - weigher(value)).coerceAtLeast(0) } + null + } + } + + fun get(key: K): V? { + return map[key] + } + + fun orNullIfEmpty(): WeightedValueMap? { + return if (map.isEmpty()) null else this + } +} \ No newline at end of file diff --git a/utils/src/test/kotlin/jetbrains/exodus/core/cache/CaffeinePersistentCacheTest.kt b/utils/src/test/kotlin/jetbrains/exodus/core/cache/persistent/CaffeinePersistentCacheTest.kt similarity index 99% rename from utils/src/test/kotlin/jetbrains/exodus/core/cache/CaffeinePersistentCacheTest.kt rename to utils/src/test/kotlin/jetbrains/exodus/core/cache/persistent/CaffeinePersistentCacheTest.kt index 7aece6ca5..d0c31e8c7 100644 --- a/utils/src/test/kotlin/jetbrains/exodus/core/cache/CaffeinePersistentCacheTest.kt +++ b/utils/src/test/kotlin/jetbrains/exodus/core/cache/persistent/CaffeinePersistentCacheTest.kt @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package jetbrains.exodus.core.cache +package jetbrains.exodus.core.cache.persistent import org.junit.Assert.assertEquals import org.junit.Assert.assertNull diff --git a/utils/src/test/kotlin/jetbrains/exodus/core/cache/persistent/VersionTrackerTest.kt b/utils/src/test/kotlin/jetbrains/exodus/core/cache/persistent/VersionTrackerTest.kt new file mode 100644 index 000000000..9f285e867 --- /dev/null +++ b/utils/src/test/kotlin/jetbrains/exodus/core/cache/persistent/VersionTrackerTest.kt @@ -0,0 +1,49 @@ +package jetbrains.exodus.core.cache.persistent + +import org.junit.Assert.assertEquals +import org.junit.Test + + +class VersionTrackerTest { + + private class TestClient(val tracker: VersionTracker, val version: Version) : PersistentCacheClient { + override fun unregister() { + tracker.unregister(this, version) + } + + } + + @Test + fun `should register clients`() { + // Given + val versionTracker = VersionTracker() + val client1 = TestClient(versionTracker, 1) + val client2 = TestClient(versionTracker, 2) + + versionTracker.register(client1, 1) + versionTracker.register(client2, 2) + + // Then + assertEquals(false, versionTracker.hasNoClients(1)) + assertEquals(false, versionTracker.hasNoClients(2)) + assertEquals(true, versionTracker.hasNoClients(3)) + } + + @Test + fun `should unregister clients`() { + // Given + val versionTracker = VersionTracker() + val client1 = TestClient(versionTracker, 1) + val client2 = TestClient(versionTracker, 2) + + versionTracker.register(client1, 1) + versionTracker.register(client2, 2) + + client1.unregister() + client2.unregister() + + // Then + assertEquals(true, versionTracker.hasNoClients(1)) + assertEquals(true, versionTracker.hasNoClients(2)) + } +} \ No newline at end of file diff --git a/utils/src/test/kotlin/jetbrains/exodus/core/cache/persistent/WeightedValueMapTest.kt b/utils/src/test/kotlin/jetbrains/exodus/core/cache/persistent/WeightedValueMapTest.kt new file mode 100644 index 000000000..f007476a2 --- /dev/null +++ b/utils/src/test/kotlin/jetbrains/exodus/core/cache/persistent/WeightedValueMapTest.kt @@ -0,0 +1,77 @@ +package jetbrains.exodus.core.cache.persistent + +import org.junit.Assert.assertEquals +import org.junit.Test +import java.util.concurrent.CyclicBarrier +import kotlin.concurrent.thread +import kotlin.random.Random + + +class WeightedValueMapTest { + + @Test + fun `should calculate total weight when put`() { + // Given + val map = givenWeightedValueMap() + + // When + map.put(1, "value1") + map.put(2, "value2") + map.put(3, "value3") + // Replace value1 + map.put(1, "value4") + + // Then + assertEquals(18, map.totalWeight) + } + + @Test + fun `should calculate total weight when remove`() { + // Given + val map = givenWeightedValueMap() + + // When + map.put(1, "value1") + map.put(2, "value2") + map.put(3, "value3") + map.remove(1) + + // Then + assertEquals(12, map.totalWeight) + } + + @Test + fun `should be thread-safe`() { + // Given + val map = givenWeightedValueMap() + val value = "value" // weight is 5 + val putTask = { i: Long -> map.put(i, "value") } + val removeTask = { i: Long -> map.remove(i) } + val rnd = Random(0) + val n = 100 + + // When + val barrier = CyclicBarrier(n + 1) + val threads = (1..n).map { i -> + thread { + barrier.await() + putTask(i.toLong()) + Thread.sleep(rnd.nextLong(1, 10)) + if (i % 2 == 0) { + // Remove only even keys + removeTask(i.toLong()) + } + } + } + barrier.await() + threads.forEach { it.join() } + + // Then + assertEquals(n * value.length / 2, map.totalWeight) + } + + + private fun givenWeightedValueMap(): WeightedValueMap { + return WeightedValueMap { value: String -> value.length } + } +} \ No newline at end of file