Skip to content

Commit

Permalink
JT-78303: Performance optimizations
Browse files Browse the repository at this point in the history
  • Loading branch information
vladimir-zatsepin committed Jan 19, 2024
1 parent daff243 commit f045711
Show file tree
Hide file tree
Showing 6 changed files with 146 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ internal class EntityIterableCacheAdapterMutable private constructor(
fun cloneFrom(cacheAdapter: EntityIterableCacheAdapter): EntityIterableCacheAdapterMutable {
val oldCache = cacheAdapter.cache
val handleDistribution = HandleDistribution(oldCache.count().toInt())
val newCache = oldCache.createNextVersion { key, _ ->
val newCache = oldCache.createNextVersion { key ->
handleDistribution.addHandle(key)
}
val stickyObjects = HashMap(cacheAdapter.stickyObjects)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
*/
package jetbrains.exodus.entitystore

import jetbrains.exodus.entitystore.PersistentEntityStoreConfig.ENTITY_ITERABLE_CACHE_MEMORY_PERCENTAGE
import mu.KLogger
import mu.KLogging
import java.util.concurrent.CountDownLatch
Expand All @@ -38,7 +37,7 @@ class EntityIterableCacheTest : EntityStoreTestBase() {
init {
// Use for local experiments to change cache params
//System.setProperty(ENTITY_ITERABLE_CACHE_SIZE, "8096")
System.setProperty(ENTITY_ITERABLE_CACHE_MEMORY_PERCENTAGE, "50")
//System.setProperty(ENTITY_ITERABLE_CACHE_MEMORY_PERCENTAGE, "50")
}
}

Expand Down Expand Up @@ -68,7 +67,7 @@ class EntityIterableCacheTest : EntityStoreTestBase() {
assertEquals(1, store.entityIterableCache.stats.totalHits)
}

fun testHitRate() {
fun testStressReadPerformance() {
// Given
val projectCount = 2
val userCount = 20
Expand All @@ -77,8 +76,8 @@ class EntityIterableCacheTest : EntityStoreTestBase() {
val queryConcurrencyLevel = 10
val queryDelayMillis = 1L

val updateConcurrently = false
val updateDelayMillis = 10L
val updateConcurrently = true
val updateDelayMillis = 100L

val store = entityStore
val projects = Project.createMany(projectCount, store)
Expand All @@ -89,14 +88,14 @@ class EntityIterableCacheTest : EntityStoreTestBase() {
// When
logger.info("Running $queryCount queries...")
val finishedRef = AtomicBoolean(false)
val updateProcess = thread {
while (!finishedRef.get()) {
if (updateConcurrently) {
val updateProcess = if(updateConcurrently) {
thread {
while (!finishedRef.get()) {
test.changeIssueAssignee()
Thread.sleep(updateDelayMillis)
}
Thread.sleep(updateDelayMillis)
}
}
} else null
val executor = Executors.newFixedThreadPool(queryConcurrencyLevel)
repeat(queryCount) {
executor.submit {
Expand All @@ -108,17 +107,57 @@ class EntityIterableCacheTest : EntityStoreTestBase() {
executor.shutdown()
executor.awaitTermination(10, MINUTES)
finishedRef.set(true)
updateProcess.join()
updateProcess?.join()

// Then
reportInLogEntityIterableCacheStats()
val actualHitRate = store.entityIterableCache.stats.hitRate
val expectedHitRate = 0.5
println("Actual hitRate: $actualHitRate")
assertTrue(
"hitRate should be more or equal to $expectedHitRate, but was $actualHitRate",
actualHitRate >= expectedHitRate
)
assertHitRateToBeNotLessThan(0.5)
}

fun testStressWritePerformance() {
// Given
// Use these params to experiment with cache locally
val projectCount = 2
val userCount = 20
val issueCount = 1000
val writeCount = 10000

val queryConcurrently = true
val queryDelayMillis = 100L

val store = entityStore
val projects = Project.createMany(projectCount, store)
val users = User.createMany(userCount, store)
val issues = Issue.createMany(issueCount, projects, users, store)
val test = TestCase(store, projects, users, issues)

// When
logger.info("Running $writeCount writes...")
val finishedRef = AtomicBoolean(false)
val queryThread = if(queryConcurrently) {
thread {
while (!finishedRef.get()) {
test.queryComplexList()
test.queryAssignedIssues()
Thread.sleep(queryDelayMillis)
}
}
} else null
repeat(writeCount) {
test.changeIssueAssignee()
test.changeIssueTitle()
test.queryComplexList()
if (it % 1000 == 0) {
println("Progress: $it/$writeCount")
}
}
finishedRef.set(true)
queryThread?.join()

// Then
reportInLogEntityIterableCacheStats()
// Expected hit rate is low because of intensive concurrent writes
assertHitRateToBeNotLessThan(0.3)
}

fun testCacheTransactionIsolation() {
Expand Down Expand Up @@ -188,6 +227,16 @@ class EntityIterableCacheTest : EntityStoreTestBase() {
assertEquals(4, writeCount2)

reportInLogEntityIterableCacheStats()
assertHitRateToBeNotLessThan(0.5)
}

private fun assertHitRateToBeNotLessThan(expectedHitRate: Double) {
val actualHitRate = entityStore.entityIterableCache.stats.hitRate
println("Actual hitRate: $actualHitRate")
assertTrue(
"hitRate should be more or equal to $expectedHitRate, but was $actualHitRate",
actualHitRate >= expectedHitRate
)
}

data class TestCase(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright ${inceptionYear} - ${year} ${owner}
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package jetbrains.exodus.core.cache

import com.github.benmanes.caffeine.cache.RemovalCause
import com.github.benmanes.caffeine.cache.RemovalListener
import java.util.*
import java.util.Collections.synchronizedMap

interface EvictionListener<K, V> {
fun onEvict(key: K?, value: V?)
}

internal class CaffeineEvictionSubject<K, V> : RemovalListener<K, V> {

private val listeners: MutableMap<EvictionListener<K, V>, Any> = synchronizedMap(WeakHashMap())

override fun onRemoval(key: K?, value: V?, cause: RemovalCause?) {
val keys = listeners.keys
synchronized(listeners) {
keys.forEach { it.onEvict(key, value) }
}
}

fun addListener(listener: EvictionListener<K, V>) {
listeners[listener] = listener
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,11 @@ package jetbrains.exodus.core.cache.persistent

import com.github.benmanes.caffeine.cache.Cache
import com.github.benmanes.caffeine.cache.Caffeine
import jetbrains.exodus.core.cache.CaffeineEvictionSubject
import jetbrains.exodus.core.cache.EvictionListener
import java.util.concurrent.ConcurrentHashMap
import java.util.function.BiConsumer
import java.util.function.Consumer


typealias Version = Long
Expand All @@ -32,11 +35,16 @@ class CaffeinePersistentCache<K, V> private constructor(
private val config: CaffeineCacheConfig<V>,
override val version: Long,
private val versionTracker: VersionTracker,
) : PersistentCache<K, V> {
private val evictionSubject: CaffeineEvictionSubject<K, V>,
// Local index as map of keys to corresponding versions available for the current version of cache
// This map is eventually consistent with the cache and not intended to be in full sync with it due to performance reasons
private val keyVersions: ConcurrentHashMap<K, Version> = ConcurrentHashMap()
) : PersistentCache<K, V>, EvictionListener<K, V> {

companion object {

fun <K, V> create(config: CaffeineCacheConfig<V>): CaffeinePersistentCache<K, V> {
val evictionSubject = CaffeineEvictionSubject<K, V>()
val cache = Caffeine.newBuilder()
.apply { if (config.expireAfterAccess != null) expireAfterAccess(config.expireAfterAccess) }
.apply { if (config.useSoftValues) softValues() }
Expand All @@ -50,16 +58,23 @@ class CaffeinePersistentCache<K, V> private constructor(
weigher { _: K, values: WeightedValueMap<Version, V> -> values.totalWeight }
}
}
.apply { evictionListener(evictionSubject) }
.build<K, WeightedValueMap<Version, V>>()
val version = 0L
val tracker = VersionTracker(version)

return CaffeinePersistentCache(cache, config, version, tracker)
return CaffeinePersistentCache(cache, config, version, tracker, evictionSubject)
}
}
init {
evictionSubject.addListener(this)
}

// Local index as map of keys to corresponding versions available for the current version of cache
private val keyVersions = ConcurrentHashMap<K, Long>()
private val cacheMap = cache.asMap()

override fun onEvict(key: K?, value: V?) {
key?.let { keyVersions.remove(key) }
}

// Generic cache impl
override fun size(): Long {
Expand All @@ -73,12 +88,12 @@ class CaffeinePersistentCache<K, V> private constructor(
override fun get(key: K): V? {
val valueVersion = keyVersions[key] ?: return null
val values = cache.getIfPresent(key) ?: return null
values.removeUnusedVersionsAndPutBack(key, valueVersion)
values.removeUnusedVersionsAndUpdateCache(key, valueVersion)
return values.get(valueVersion)
}

override fun put(key: K, value: V) {
cache.asMap().compute(key) { _, map ->
cacheMap.compute(key) { _, map ->
val values = map ?: createNewValueMap()
values.put(version, value)
values.removeUnusedVersions(version)
Expand All @@ -94,7 +109,7 @@ class CaffeinePersistentCache<K, V> private constructor(
}

override fun remove(key: K) {
cache.asMap().computeIfPresent(key) { _, values ->
cacheMap.computeIfPresent(key) { _, values ->
values.remove(version)
values.orNullIfEmpty()
}
Expand All @@ -120,22 +135,24 @@ class CaffeinePersistentCache<K, V> private constructor(
}

// Persistent cache impl
override fun createNextVersion(entryConsumer: BiConsumer<K, V>?): PersistentCache<K, V> {
override fun createNextVersion(keyConsumer: Consumer<K>?): PersistentCache<K, V> {
val nextVersion = versionTracker.nextVersion()
val newCache = CaffeinePersistentCache(cache, config, nextVersion, versionTracker)

// Copy key index for the next cache
// It effectively prohibits new version from seeing new values cached for previous versions
// as they might be stale, e.g. due to values already changed by another transaction
val keyVersionsCopy = ConcurrentHashMap<K, Version>(keyVersions.size)
keyVersions.forEach { (key, version) ->
val value = cache.getVersioned(key, version)
if (value != null) {
newCache.keyVersions[key] = version
entryConsumer?.accept(key, value)
}
keyVersionsCopy[key] = version
keyConsumer?.accept(key)
}

return newCache
return CaffeinePersistentCache(
cache, config,
nextVersion,
versionTracker,
evictionSubject,
keyVersionsCopy
)
}

override fun register(): PersistentCacheClient {
Expand Down Expand Up @@ -167,7 +184,7 @@ class CaffeinePersistentCache<K, V> private constructor(
return changed
}

private fun WeightedValueMap<Version, V>.removeUnusedVersionsAndPutBack(key: K, currentVersion: Version) {
private fun WeightedValueMap<Version, V>.removeUnusedVersionsAndUpdateCache(key: K, currentVersion: Version) {
if (removeUnusedVersions(currentVersion)) {
cache.put(key, this)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
package jetbrains.exodus.core.cache.persistent

import jetbrains.exodus.core.cache.BasicCache
import java.util.function.BiConsumer
import java.util.function.Consumer

/**
* This interface represents a cache that can store its previous versions,
Expand All @@ -33,7 +33,7 @@ interface PersistentCache<K, V> : BasicCache<K, V> {
/**
* Creates new version of the cache with the same configuration.
*/
fun createNextVersion(entryConsumer: BiConsumer<K, V>? = null): PersistentCache<K, V>
fun createNextVersion(keyConsumer: Consumer<K>? = null): PersistentCache<K, V>

/**
* Register a client for the current version of the cache.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ internal class WeightedValueMap<K, V>(private val weigher: ValueWeigher<V>) {
}

fun remove(key: K) {
map.computeIfPresent(key) { _, value ->
map.computeIfPresent(key) { _, _ ->
val weight = weights.remove(key) ?: 0
totalWeightRef.updateAndGet { it - weight }
null
Expand Down

0 comments on commit f045711

Please sign in to comment.