Skip to content

Commit

Permalink
Refine and clarify operations in asynchronous caching implementation.
Browse files Browse the repository at this point in the history
Uses more descriptive names for operations, especially Reactive operations, by either calling a local, private method or introducing a (functional) variable with strongly typed parameters.

Edits and refines Javadoc.

Original Pull Request: #2717

Closes #2743
  • Loading branch information
jxblum committed Oct 18, 2023
1 parent 9a81e6c commit 2b5586f
Showing 1 changed file with 111 additions and 75 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,13 @@
*/
package org.springframework.data.redis.cache;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Function;

import org.springframework.dao.PessimisticLockingFailureException;
Expand All @@ -38,7 +36,10 @@
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
import org.springframework.util.ObjectUtils;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SignalType;

/**
* {@link RedisCacheWriter} implementation capable of reading/writing binary data from/to Redis in {@literal standalone}
Expand All @@ -47,11 +48,11 @@
* <p>
* {@link DefaultRedisCacheWriter} can be used in
* {@link RedisCacheWriter#lockingRedisCacheWriter(RedisConnectionFactory) locking} or
* {@link RedisCacheWriter#nonLockingRedisCacheWriter(RedisConnectionFactory) non-locking} mode. While
* {@literal non-locking} aims for maximum performance it may result in overlapping, non-atomic, command execution for
* operations spanning multiple Redis interactions like {@code putIfAbsent}. The {@literal locking} counterpart prevents
* command overlap by setting an explicit lock key and checking against presence of this key which leads to additional
* requests and potential command wait times.
* {@link RedisCacheWriter#nonLockingRedisCacheWriter(RedisConnectionFactory) non-locking} mode. While {@literal non-locking}
* aims for maximum performance it may result in overlapping, non-atomic, command execution for operations spanning
* multiple Redis interactions like {@code putIfAbsent}. The {@literal locking} counterpart prevents command overlap
* by setting an explicit lock key and checking against presence of this key which leads to additional requests
* and potential command wait times.
*
* @author Christoph Strobl
* @author Mark Paluch
Expand All @@ -61,8 +62,12 @@
*/
class DefaultRedisCacheWriter implements RedisCacheWriter {

private static final boolean REACTIVE_REDIS_CONNECTION_FACTORY_PRESENT = ClassUtils
.isPresent("org.springframework.data.redis.connection.ReactiveRedisConnectionFactory", null);
public static final boolean FLUX_PRESENT = ClassUtils.isPresent("reactor.core.publisher.Flux", null);

private static final boolean REACTIVE_REDIS_CONNECTION_FACTORY_PRESENT =
ClassUtils.isPresent("org.springframework.data.redis.connection.ReactiveRedisConnectionFactory", null);

private final AsyncCacheWriter asyncCacheWriter;

private final BatchStrategy batchStrategy;

Expand All @@ -74,8 +79,6 @@ class DefaultRedisCacheWriter implements RedisCacheWriter {

private final TtlFunction lockTtl;

private final AsyncCacheWriter asyncCacheWriter;

/**
* @param connectionFactory must not be {@literal null}.
* @param batchStrategy must not be {@literal null}.
Expand All @@ -86,8 +89,8 @@ class DefaultRedisCacheWriter implements RedisCacheWriter {

/**
* @param connectionFactory must not be {@literal null}.
* @param sleepTime sleep time between lock request attempts. Must not be {@literal null}. Use {@link Duration#ZERO}
* to disable locking.
* @param sleepTime sleep time between lock request attempts. Must not be {@literal null}.
* Use {@link Duration#ZERO} to disable locking.
* @param batchStrategy must not be {@literal null}.
*/
DefaultRedisCacheWriter(RedisConnectionFactory connectionFactory, Duration sleepTime, BatchStrategy batchStrategy) {
Expand All @@ -96,8 +99,8 @@ class DefaultRedisCacheWriter implements RedisCacheWriter {

/**
* @param connectionFactory must not be {@literal null}.
* @param sleepTime sleep time between lock request attempts. Must not be {@literal null}. Use {@link Duration#ZERO}
* to disable locking.
* @param sleepTime sleep time between lock request attempts. Must not be {@literal null}.
* Use {@link Duration#ZERO} to disable locking.
* @param lockTtl Lock TTL function must not be {@literal null}.
* @param cacheStatisticsCollector must not be {@literal null}.
* @param batchStrategy must not be {@literal null}.
Expand All @@ -116,12 +119,13 @@ class DefaultRedisCacheWriter implements RedisCacheWriter {
this.lockTtl = lockTtl;
this.statistics = cacheStatisticsCollector;
this.batchStrategy = batchStrategy;
this.asyncCacheWriter = isAsyncCacheSupportEnabled() ? new AsynchronousCacheWriterDelegate()
: UnsupportedAsyncCacheWriter.INSTANCE;
}

if (REACTIVE_REDIS_CONNECTION_FACTORY_PRESENT && this.connectionFactory instanceof ReactiveRedisConnectionFactory) {
asyncCacheWriter = new AsynchronousCacheWriterDelegate();
} else {
asyncCacheWriter = UnsupportedAsyncCacheWriter.INSTANCE;
}
private boolean isAsyncCacheSupportEnabled() {
return REACTIVE_REDIS_CONNECTION_FACTORY_PRESENT && FLUX_PRESENT
&& this.connectionFactory instanceof ReactiveRedisConnectionFactory;
}

@Override
Expand Down Expand Up @@ -168,7 +172,8 @@ public CompletableFuture<byte[]> retrieve(String name, byte[] key, @Nullable Dur

if (cachedValue != null) {
statistics.incHits(name);
} else {
}
else {
statistics.incMisses(name);
}

Expand All @@ -186,8 +191,7 @@ public void put(String name, byte[] key, byte[] value, @Nullable Duration ttl) {
execute(name, connection -> {

if (shouldExpireWithin(ttl)) {
connection.stringCommands().set(key, value, Expiration.from(ttl.toMillis(), TimeUnit.MILLISECONDS),
SetOption.upsert());
connection.stringCommands().set(key, value, toExpiration(ttl), SetOption.upsert());
} else {
connection.stringCommands().set(key, value);
}
Expand Down Expand Up @@ -224,16 +228,11 @@ public byte[] putIfAbsent(String name, byte[] key, byte[] value, @Nullable Durat

try {

boolean put;

if (shouldExpireWithin(ttl)) {
put = ObjectUtils.nullSafeEquals(
connection.stringCommands().set(key, value, Expiration.from(ttl), SetOption.ifAbsent()), true);
} else {
put = ObjectUtils.nullSafeEquals(connection.stringCommands().setNX(key, value), true);
}
Boolean wasSet = shouldExpireWithin(ttl)
? connection.stringCommands().set(key, value, Expiration.from(ttl), SetOption.ifAbsent())
: connection.stringCommands().setNX(key, value);

if (put) {
if (Boolean.TRUE.equals(wasSet)) {
statistics.incPuts(name);
return null;
}
Expand Down Expand Up @@ -322,9 +321,11 @@ void lock(String name) {
private Boolean doLock(String name, Object contextualKey, @Nullable Object contextualValue,
RedisConnection connection) {

Expiration expiration = Expiration.from(this.lockTtl.getTimeToLive(contextualKey, contextualValue));
byte[] cacheLockKey = createCacheLockKey(name);

return connection.stringCommands().set(createCacheLockKey(name), new byte[0], expiration, SetOption.SET_IF_ABSENT);
Expiration expiration = toExpiration(contextualKey, contextualValue);

return connection.stringCommands().set(cacheLockKey, new byte[0], expiration, SetOption.SET_IF_ABSENT);
}

/**
Expand Down Expand Up @@ -378,29 +379,40 @@ private void checkAndPotentiallyWaitUntilUnlocked(String name, RedisConnection c
Thread.sleep(this.sleepTime.toMillis());
}
} catch (InterruptedException cause) {

// Re-interrupt current Thread to allow other participants to react.
Thread.currentThread().interrupt();

throw new PessimisticLockingFailureException(String.format("Interrupted while waiting to unlock cache %s", name),
cause);
String message = "Interrupted while waiting to unlock cache %s".formatted(name);
throw new PessimisticLockingFailureException(message, cause);
} finally {
this.statistics.incLockTime(name, System.nanoTime() - lockWaitTimeNs);
}
}

boolean doCheckLock(String name, RedisConnection connection) {
return ObjectUtils.nullSafeEquals(connection.keyCommands().exists(createCacheLockKey(name)), true);
Boolean cacheLockExists = connection.keyCommands().exists(createCacheLockKey(name));
return Boolean.TRUE.equals(cacheLockExists);
}

byte[] createCacheLockKey(String name) {
return (name + "~lock").getBytes(StandardCharsets.UTF_8);
}

private ReactiveRedisConnectionFactory getReactiveConnectionFactory() {
return (ReactiveRedisConnectionFactory) this.connectionFactory;
}

private static boolean shouldExpireWithin(@Nullable Duration ttl) {
return ttl != null && !ttl.isZero() && !ttl.isNegative();
}

private Expiration toExpiration(Duration ttl) {
return Expiration.from(ttl.toMillis(), TimeUnit.MILLISECONDS);
}

private Expiration toExpiration(Object key, @Nullable Object value) {
return Expiration.from(this.lockTtl.getTimeToLive(key, value));
}

/**
* Interface for asynchronous cache retrieval.
*
Expand All @@ -419,8 +431,8 @@ interface AsyncCacheWriter {
* @param name the cache name from which to retrieve the cache entry.
* @param key the cache entry key.
* @param ttl optional TTL to set for Time-to-Idle eviction.
* @return a future that completes either with a value if the value exists or completing with {@code null} if the
* cache does not contain an entry.
* @return a future that completes either with a value if the value exists or completing with {@code null}
* if the cache does not contain an entry.
*/
CompletableFuture<byte[]> retrieve(String name, byte[] key, @Nullable Duration ttl);

Expand Down Expand Up @@ -463,8 +475,8 @@ public CompletableFuture<Void> store(String name, byte[] key, byte[] value, @Nul
}

/**
* Delegate implementing {@link AsyncCacheWriter} to provide asynchronous cache retrieval and storage operations using
* {@link ReactiveRedisConnectionFactory}.
* Delegate implementing {@link AsyncCacheWriter} to provide asynchronous cache retrieval and storage operations
* using {@link ReactiveRedisConnectionFactory}.
*
* @since 3.2
*/
Expand All @@ -481,11 +493,13 @@ public CompletableFuture<byte[]> retrieve(String name, byte[] key, @Nullable Dur
return doWithConnection(connection -> {

ByteBuffer wrappedKey = ByteBuffer.wrap(key);

Mono<?> cacheLockCheck = isLockingCacheWriter() ? waitForLock(connection, name) : Mono.empty();

ReactiveStringCommands stringCommands = connection.stringCommands();

Mono<ByteBuffer> get = shouldExpireWithin(ttl)
? stringCommands.getEx(wrappedKey, Expiration.from(ttl))
? stringCommands.getEx(wrappedKey, toExpiration(ttl))
: stringCommands.get(wrappedKey);

return cacheLockCheck.then(get).map(ByteUtils::getBytes).toFuture();
Expand All @@ -498,75 +512,97 @@ public CompletableFuture<Void> store(String name, byte[] key, byte[] value, @Nul
return doWithConnection(connection -> {

Mono<?> mono = isLockingCacheWriter()
? doStoreWithLocking(name, key, value, ttl, connection)
? doLockStoreUnlock(name, key, value, ttl, connection)
: doStore(key, value, ttl, connection);

return mono.then().toFuture();
});
}

private Mono<Boolean> doStoreWithLocking(String name, byte[] key, byte[] value, @Nullable Duration ttl,
ReactiveRedisConnection connection) {

return Mono.usingWhen(doLock(name, key, value, connection), unused -> doStore(key, value, ttl, connection),
unused -> doUnlock(name, connection));
}

private Mono<Boolean> doStore(byte[] cacheKey, byte[] value, @Nullable Duration ttl,
ReactiveRedisConnection connection) {

ByteBuffer wrappedKey = ByteBuffer.wrap(cacheKey);
ByteBuffer wrappedValue = ByteBuffer.wrap(value);

if (shouldExpireWithin(ttl)) {
return connection.stringCommands().set(wrappedKey, wrappedValue,
Expiration.from(ttl.toMillis(), TimeUnit.MILLISECONDS), SetOption.upsert());
} else {
return connection.stringCommands().set(wrappedKey, wrappedValue);
}
ReactiveStringCommands stringCommands = connection.stringCommands();

return shouldExpireWithin(ttl)
? stringCommands.set(wrappedKey, wrappedValue, toExpiration(ttl), SetOption.upsert())
: stringCommands.set(wrappedKey, wrappedValue);
}

private Mono<Boolean> doLockStoreUnlock(String name, byte[] key, byte[] value, @Nullable Duration ttl,
ReactiveRedisConnection connection) {

Mono<Object> lock = doLock(name, key, value, connection);

Function<Object, Mono<Boolean>> store = unused -> doStore(key, value, ttl, connection);
Function<Object, Mono<Void>> unlock = unused -> doUnlock(name, connection);

return Mono.usingWhen(lock, store, unlock);
}

private Mono<Object> doLock(String name, Object contextualKey, @Nullable Object contextualValue,
ReactiveRedisConnection connection) {

ByteBuffer key = ByteBuffer.wrap(createCacheLockKey(name));
ByteBuffer key = toCacheLockKey(name);
ByteBuffer value = ByteBuffer.wrap(new byte[0]);
Expiration expiration = Expiration.from(lockTtl.getTimeToLive(contextualKey, contextualValue));

Expiration expiration = toExpiration(contextualKey, contextualValue);

return connection.stringCommands().set(key, value, expiration, SetOption.SET_IF_ABSENT) //
// Ensure we emit an object, otherwise, the Mono.usingWhen operator doesn't run the inner resource function.
.thenReturn(Boolean.TRUE);
}

private Mono<Void> doUnlock(String name, ReactiveRedisConnection connection) {
return connection.keyCommands().del(ByteBuffer.wrap(createCacheLockKey(name))).then();
return connection.keyCommands().del(toCacheLockKey(name)).then();
}

private Mono<Void> waitForLock(ReactiveRedisConnection connection, String cacheName) {

AtomicLong lockWaitTimeNs = new AtomicLong();
byte[] cacheLockKey = createCacheLockKey(cacheName);
AtomicLong lockWaitNanoTime = new AtomicLong();

Flux<Long> wait = Flux.interval(Duration.ZERO, sleepTime);
Mono<Boolean> exists = connection.keyCommands().exists(ByteBuffer.wrap(cacheLockKey)).filter(it -> !it);
Consumer<org.reactivestreams.Subscription> setNanoTimeOnLockWait = subscription ->
lockWaitNanoTime.set(System.nanoTime());

return wait.doOnSubscribe(subscription -> lockWaitTimeNs.set(System.nanoTime())) //
.flatMap(it -> exists) //
.doFinally(signalType -> statistics.incLockTime(cacheName, System.nanoTime() - lockWaitTimeNs.get())) //
Consumer<SignalType> recordStatistics = signalType ->
statistics.incLockTime(cacheName, System.nanoTime() - lockWaitNanoTime.get());

Function<Long, Mono<Boolean>> doWhileCacheLockExists = lockWaitTime -> connection.keyCommands()
.exists(toCacheLockKey(cacheName)).filter(cacheLockKeyExists -> !cacheLockKeyExists);

return waitInterval(sleepTime) //
.doOnSubscribe(setNanoTimeOnLockWait) //
.flatMap(doWhileCacheLockExists) //
.doFinally(recordStatistics) //
.next() //
.then();
}

private Flux<Long> waitInterval(Duration period) {
return Flux.interval(Duration.ZERO, period);
}

private ByteBuffer toCacheLockKey(String cacheName) {
return ByteBuffer.wrap(createCacheLockKey(cacheName));
}

private <T> CompletableFuture<T> doWithConnection(
Function<ReactiveRedisConnection, CompletableFuture<T>> callback) {

ReactiveRedisConnectionFactory cf = (ReactiveRedisConnectionFactory) connectionFactory;
Mono<ReactiveRedisConnection> reactiveConnection =
Mono.fromSupplier(getReactiveConnectionFactory()::getReactiveConnection);

Function<ReactiveRedisConnection, Mono<T>> commandExecution = connection ->
Mono.fromCompletionStage(callback.apply(connection));

Function<ReactiveRedisConnection, Mono<Void>> connectionClose = ReactiveRedisConnection::closeLater;

Mono<T> result = Mono.usingWhen(reactiveConnection, commandExecution, connectionClose);

return Mono.usingWhen(Mono.fromSupplier(cf::getReactiveConnection), //
it -> Mono.fromCompletionStage(callback.apply(it)), //
ReactiveRedisConnection::closeLater) //
.toFuture();
return result.toFuture();
}
}
}

0 comments on commit 2b5586f

Please sign in to comment.