Skip to content

Commit

Permalink
Java: Cluster SCAN request (valkey-io#1737)
Browse files Browse the repository at this point in the history
* Added Scan logic to Glide-core

* added scan to python

* added changes to changelog

* fixed comments

* changed from hash to nanoid

* fixed typing

* fixed comments

* Stubbing for cluster scan

* Update ScanOptions

Signed-off-by: Andrew Carbonetto <[email protected]>

* ClusterScan implementation

Notes:
- ClusterScanCursor is an interface end users should work with. It is in a package exposed in the module.
- NativeClusterScanCursor is an implementation of the above that is in an internal package. It wraps Rust cursor strings.
- RedisClusterClient intercepts the result from Rust to wrap the cursor string in a NativeClusterScanCursor
- The caller is responsible for closing ClusterScanCursor. If they don't, it'll get cleaned up at GC-time, but this is non-deterministic.

* Make cursor termination behave as Python does

Look for the "finished" string

* Unrelated: avoid double-dropping of scripts

The same hash could potentially be reused.

* Better state management of cursors

* Validate that either the initial cursor is used, or an unfinished cursor.
* Test-by-reference for the initial cursor instead of evaluating its string.
* Clean-up unfinished cursors if they get re-used in scan.
* Throw errors if a finished cursor is passed to scan.

* Update design for an iterative cursor

- The cursor object manages updating cursor handles and internal state
- It auto-cleans at the end of the data
- The user can also dispose of the cursor early

* Change the API to match Python

* Unit tests

* Fix integration tests

When using the ScanOptions type, pass native names through to the rust layer when using cluster scans, and enum names when using standalone scans.

* Integration test for cluster scan options

* Spotless

* Add more integration tests

Also flushall before each test to avoid flakiness from leftover state

* Move population of the ScanOptions protobuf to CommandManager

* Rename clusterScan() method to scan()

For consistency with Python

* Add javadoc comments

* Spotless

* Abstract the cursor string from the public API

Move access to this to an internal detail interface.

The user shouldn't need this string themselves.

* Spotless

* Revert accidental changes

resolve conflicts

* Address review comments

* Get protocol string constants from Rust instead of hard-coding them

* Add integration testing for the Stream type filter

* Spotless

* Update scan documentation

Signed-off-by: Andrew Carbonetto <[email protected]>

---------

Signed-off-by: Andrew Carbonetto <[email protected]>
Co-authored-by: avifenesh <[email protected]>
Co-authored-by: Andrew Carbonetto <[email protected]>
  • Loading branch information
3 people authored and cyip10 committed Jul 16, 2024
1 parent b0a1d62 commit 8fc8a59
Show file tree
Hide file tree
Showing 14 changed files with 1,243 additions and 7 deletions.
78 changes: 78 additions & 0 deletions java/client/src/main/java/glide/api/RedisClusterClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,16 +50,21 @@
import glide.api.commands.ScriptingAndFunctionsClusterCommands;
import glide.api.commands.ServerManagementClusterCommands;
import glide.api.commands.TransactionsClusterCommands;
import glide.api.logging.Logger;
import glide.api.models.ClusterTransaction;
import glide.api.models.ClusterValue;
import glide.api.models.GlideString;
import glide.api.models.commands.FlushMode;
import glide.api.models.commands.InfoOptions;
import glide.api.models.commands.SortClusterOptions;
import glide.api.models.commands.function.FunctionRestorePolicy;
import glide.api.models.commands.scan.ClusterScanCursor;
import glide.api.models.commands.scan.ScanOptions;
import glide.api.models.configuration.RedisClusterClientConfiguration;
import glide.api.models.configuration.RequestRoutingConfiguration.Route;
import glide.api.models.configuration.RequestRoutingConfiguration.SingleNodeRoute;
import glide.ffi.resolvers.ClusterScanCursorResolver;
import glide.managers.CommandManager;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
Expand Down Expand Up @@ -908,6 +913,22 @@ public CompletableFuture<String> randomKey() {
RandomKey, new String[0], this::handleStringOrNullResponse);
}

@Override
public CompletableFuture<Object[]> scan(ClusterScanCursor cursor) {
    // A scan without explicit options is the two-argument scan with default-built options.
    return scan(cursor, ScanOptions.builder().build());
}

@Override
public CompletableFuture<Object[]> scan(ClusterScanCursor cursor, ScanOptions options) {
    return commandManager
        .submitClusterScan(cursor, options, this::handleArrayResponse)
        .thenApply(
            response -> {
                // Element 0 is the next cursor handle; wrap it so callers never see the raw string.
                Object nextHandle = response[0];
                NativeClusterScanCursor nextCursor =
                    new NativeClusterScanCursor(nextHandle.toString());
                // Element 1 is passed through untouched.
                Object keys = response[1];
                return new Object[] {nextCursor, keys};
            });
}

@Override
public CompletableFuture<String> spublish(@NonNull String channel, @NonNull String message) {
return commandManager.submitNewCommand(
Expand Down Expand Up @@ -980,4 +1001,61 @@ public CompletableFuture<Long> sortStore(
new GlideString[] {key}, sortClusterOptions.toGlideStringArgs(), storeArguments);
return commandManager.submitNewCommand(Sort, arguments, this::handleLongResponse);
}

/**
 * A {@link ClusterScanCursor} implementation for interacting with the Rust layer.
 *
 * <p>Wraps the cursor handle string produced by a cluster scan and releases it through {@link
 * ClusterScanCursorResolver#releaseNativeCursor}. The handle is released at most once, either
 * explicitly via {@link #releaseCursorHandle()} or, as a fallback, from {@link #finalize()}.
 */
private static final class NativeClusterScanCursor
    implements CommandManager.ClusterScanCursorDetail {

    /** Handle string identifying the cursor; fixed for the lifetime of this object. */
    private final String cursorHandle;

    /** Whether the handle equals the resolver's "finished" marker; computed once at construction. */
    private final boolean isFinished;

    /** Set once a release has been attempted; guarded by this object's monitor. */
    private boolean isClosed = false;

    // This is for internal use only.
    public NativeClusterScanCursor(@NonNull String cursorHandle) {
        this.cursorHandle = cursorHandle;
        this.isFinished = ClusterScanCursorResolver.FINISHED_CURSOR_HANDLE.equals(cursorHandle);
    }

    /** Returns the wrapped cursor handle string. */
    @Override
    public String getCursorHandle() {
        return cursorHandle;
    }

    /** Returns <code>true</code> when the wrapped handle is the "finished" marker. */
    @Override
    public boolean isFinished() {
        return isFinished;
    }

    /** Releases the cursor handle. Calls after the first one are no-ops. */
    @Override
    public void releaseCursorHandle() {
        internalClose();
    }

    // NOTE(review): Object.finalize() is deprecated since Java 9; kept here as the existing
    // safety net for cursors the caller never released explicitly.
    @Override
    protected void finalize() throws Throwable {
        try {
            // Release the native cursor
            this.internalClose();
        } finally {
            super.finalize();
        }
    }

    /**
     * Releases the handle if that has not been attempted yet. Failures are logged rather than
     * propagated, and the cursor is marked closed either way.
     *
     * <p>Synchronized because this may be reached both from a caller's thread and from the
     * finalizer thread; the check and the flag update must be seen as one step to avoid
     * releasing the same handle twice.
     */
    private synchronized void internalClose() {
        if (isClosed) {
            return;
        }
        try {
            ClusterScanCursorResolver.releaseNativeCursor(cursorHandle);
        } catch (Exception ex) {
            Logger.log(
                Logger.Level.ERROR,
                "ClusterScanCursor",
                () -> "Error releasing cursor " + cursorHandle + ": " + ex.getMessage());
            Logger.log(Logger.Level.ERROR, "ClusterScanCursor", ex);
        } finally {
            // Mark the cursor as closed to avoid double-free (if close() gets called more than once).
            isClosed = true;
        }
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
import glide.api.models.GlideString;
import glide.api.models.Transaction;
import glide.api.models.commands.SortClusterOptions;
import glide.api.models.commands.scan.ClusterScanCursor;
import glide.api.models.commands.scan.ScanOptions;
import glide.api.models.configuration.ReadFrom;
import glide.api.models.configuration.RequestRoutingConfiguration.Route;
import glide.api.models.configuration.RequestRoutingConfiguration.SingleNodeRoute;
Expand Down Expand Up @@ -152,6 +154,129 @@ public interface GenericClusterCommands {
*/
CompletableFuture<String> randomKey();

/**
 * Incrementally iterates over the keys in the Cluster.
 *
 * <p>This command is similar to the <code>SCAN</code> command, but it is designed to work in a
 * Cluster environment. The main difference is that this command uses a {@link ClusterScanCursor}
 * object to manage iterations. For more information about the Cluster Scan implementation, see <a
 * href="https://github.com/aws/glide-for-redis/wiki/General-Concepts#cluster-scan">Cluster
 * Scan</a>.
 *
 * <p>As with the <code>SCAN</code> command, this command is a cursor-based iterator. This means
 * that at every call of the command, the server returns an updated cursor ({@link
 * ClusterScanCursor}) that the user needs to re-send as the <code>cursor</code> argument in the
 * next call. The iteration terminates when {@link ClusterScanCursor#isFinished()} returns <code>
 * true</code> for the returned cursor.
 *
 * <p>This method guarantees that all keyslots available when the first SCAN is called will be
 * scanned before the cursor is finished. Any keys added after the initial scan request is made
 * are not guaranteed to be scanned.
 *
 * <p>Note that the same key may be returned in multiple scan iterations.
 *
 * <p>How to use the {@link ClusterScanCursor}: <br>
 * For each iteration, the {@link ClusterScanCursor} returned by the previous scan should be
 * passed in the <code>cursor</code> argument to continue the <code>SCAN</code>. Using the same
 * cursor object for multiple iterations may result in the same keys being returned or in
 * unexpected behavior.
 *
 * <p>When the cursor is no longer needed, call {@link ClusterScanCursor#releaseCursorHandle()} to
 * immediately free resources tied to the cursor. Note that this makes the cursor unusable in
 * subsequent calls to <code>SCAN</code>.
 *
 * @see ClusterScanCursor for more details about how to use the cursor.
 * @see <a href="https://valkey.io/commands/scan">valkey.io</a> for details.
 * @param cursor The {@link ClusterScanCursor} object that wraps the scan state. To start a new
 *     scan, create a new empty ClusterScanCursor using {@link ClusterScanCursor#initalCursor()}.
 * @return An <code>Array</code> with two elements. The first element is always the {@link
 *     ClusterScanCursor} for the next iteration of results. To see if there is more data on the
 *     given cursor, call {@link ClusterScanCursor#isFinished()}. To release resources for the
 *     current cursor immediately, call {@link ClusterScanCursor#releaseCursorHandle()} after
 *     using the cursor in a call to this method. The cursor cannot be used in a scan again after
 *     {@link ClusterScanCursor#releaseCursorHandle()} has been called. The second element is an
 *     <code>Array</code> of <code>String</code> elements, each representing a key.
 * @example
 *     <pre>{@code
 * // Walk the keys in the cluster, one batch per call
 * ClusterScanCursor cursor = ClusterScanCursor.initialCursor();
 * Object[] result;
 * while (!cursor.isFinished()) {
 *   result = client.scan(cursor).get();
 *   // The cursor that was just sent is no longer needed; free it before moving on
 *   cursor.releaseCursorHandle();
 *   cursor = (ClusterScanCursor) result[0];
 *   Object[] stringResults = (Object[]) result[1];
 *
 *   System.out.println("\nSCAN iteration:");
 *   Arrays.asList(stringResults).stream().forEach(i -> System.out.print(i + ", "));
 * }
 * }</pre>
 */
CompletableFuture<Object[]> scan(ClusterScanCursor cursor);

/**
 * Incrementally iterates over the keys in the Cluster, applying the given {@link ScanOptions}.
 *
 * <p>This command is similar to the <code>SCAN</code> command, but it is designed to work in a
 * Cluster environment. The main difference is that this command uses a {@link ClusterScanCursor}
 * object to manage iterations. For more information about the Cluster Scan implementation, see <a
 * href="https://github.com/aws/glide-for-redis/wiki/General-Concepts#cluster-scan">Cluster
 * Scan</a>.
 *
 * <p>As with the <code>SCAN</code> command, this command is a cursor-based iterator. This means
 * that at every call of the command, the server returns an updated cursor ({@link
 * ClusterScanCursor}) that the user needs to re-send as the <code>cursor</code> argument in the
 * next call. The iteration terminates when {@link ClusterScanCursor#isFinished()} returns <code>
 * true</code> for the returned cursor.
 *
 * <p>This method guarantees that all keyslots available when the first SCAN is called will be
 * scanned before the cursor is finished. Any keys added after the initial scan request is made
 * are not guaranteed to be scanned.
 *
 * <p>Note that the same key may be returned in multiple scan iterations.
 *
 * <p>How to use the {@link ClusterScanCursor}: <br>
 * For each iteration, the {@link ClusterScanCursor} returned by the previous scan should be
 * passed in the <code>cursor</code> argument to continue the <code>SCAN</code>. Using the same
 * cursor object for multiple iterations may result in the same keys being returned or in
 * unexpected behavior.
 *
 * <p>When the cursor is no longer needed, call {@link ClusterScanCursor#releaseCursorHandle()} to
 * immediately free resources tied to the cursor. Note that this makes the cursor unusable in
 * subsequent calls to <code>SCAN</code>.
 *
 * @see ClusterScanCursor for more details about how to use the cursor.
 * @see <a href="https://valkey.io/commands/scan">valkey.io</a> for details.
 * @param cursor The {@link ClusterScanCursor} object that wraps the scan state. To start a new
 *     scan, create a new empty ClusterScanCursor using {@link ClusterScanCursor#initalCursor()}.
 * @param options The {@link ScanOptions}.
 * @return An <code>Array</code> with two elements. The first element is always the {@link
 *     ClusterScanCursor} for the next iteration of results. To see if there is more data on the
 *     given cursor, call {@link ClusterScanCursor#isFinished()}. To release resources for the
 *     current cursor immediately, call {@link ClusterScanCursor#releaseCursorHandle()} after
 *     using the cursor in a call to this method. The cursor cannot be used in a scan again after
 *     {@link ClusterScanCursor#releaseCursorHandle()} has been called. The second element is an
 *     <code>Array</code> of <code>String</code> elements, each representing a key.
 * @example
 *     <pre>{@code
 * // "options" is a previously built ScanOptions; pass the same options on every iteration
 * ClusterScanCursor cursor = ClusterScanCursor.initialCursor();
 * Object[] result;
 * while (!cursor.isFinished()) {
 *   result = client.scan(cursor, options).get();
 *   // The cursor that was just sent is no longer needed; free it before moving on
 *   cursor.releaseCursorHandle();
 *   cursor = (ClusterScanCursor) result[0];
 *   Object[] stringResults = (Object[]) result[1];
 *
 *   System.out.println("\nSCAN iteration:");
 *   Arrays.asList(stringResults).stream().forEach(i -> System.out.print(i + ", "));
 * }
 * }</pre>
 */
CompletableFuture<Object[]> scan(ClusterScanCursor cursor, ScanOptions options);

/**
* Sorts the elements in the list, set, or sorted set at <code>key</code> and returns the result.
* <br>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ CompletableFuture<Boolean> copy(
/**
* Returns a random key from currently selected database.
*
* @see <a href="https://redis.io/docs/latest/commands/randomkey/">redis.io</a> for details.
* @see <a href="https://valkey.io/commands/randomkey/">valkey.io</a> for details.
* @return A random <code>key</code> from the database.
* @example
* <pre>{@code
Expand All @@ -203,6 +203,7 @@ CompletableFuture<Boolean> copy(
* apply transformations on sorted elements.<br>
* To store the result into a new key, see {@link #sortStore(String, String, SortOptions)}.
*
* @see <a href="https://valkey.io/commands/sort/">valkey.io</a> for details.
* @param key The key of the list, set, or sorted set to be sorted.
* @param sortOptions The {@link SortOptions}.
* @return An <code>Array</code> of sorted elements.
Expand Down Expand Up @@ -247,6 +248,7 @@ CompletableFuture<Boolean> copy(
* This command is routed depending on the client's {@link ReadFrom} strategy.
*
* @since Redis 7.0 and above.
* @see <a href="https://valkey.io/commands/sort/">valkey.io</a> for details.
* @param key The key of the list, set, or sorted set to be sorted.
* @param sortOptions The {@link SortOptions}.
* @return An <code>Array</code> of sorted elements.
Expand Down Expand Up @@ -291,6 +293,7 @@ CompletableFuture<Boolean> copy(
* key.<br>
* To get the sort result without storing it into a key, see {@link #sort(String, SortOptions)}.
*
* @see <a href="https://valkey.io/commands/sort/">valkey.io</a> for details.
* @param key The key of the list, set, or sorted set to be sorted.
* @param sortOptions The {@link SortOptions}.
* @param destination The key where the sorted result will be stored.
Expand Down
28 changes: 28 additions & 0 deletions java/client/src/main/java/glide/api/logging/Logger.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@
import static glide.ffi.resolvers.LoggerResolver.initInternal;
import static glide.ffi.resolvers.LoggerResolver.logInternal;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.PrintStream;
import java.util.function.Supplier;
import lombok.Getter;
import lombok.NonNull;
Expand Down Expand Up @@ -170,6 +173,31 @@ public static void log(
logInternal(level.getLevel(), logIdentifier, message);
}

/**
 * Logs the stack trace of the provided exception or error if the provided log level is lower than
 * the logger level.
 *
 * <p>The stack trace is rendered lazily, inside the message supplier passed to the
 * supplier-based <code>log</code> overload.
 *
 * @param level The log level of the provided message.
 * @param logIdentifier The log identifier should give the log a context.
 * @param throwable The exception or error to log.
 */
public static void log(
    @NonNull Level level, @NonNull String logIdentifier, @NonNull Throwable throwable) {
    // TODO: Add the corresponding API to Python and Node.js.
    log(
        level,
        logIdentifier,
        () -> {
            try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
                PrintStream printStream = new PrintStream(outputStream)) {
                throwable.printStackTrace(printStream);
                // Make sure everything written so far has reached the byte buffer.
                printStream.flush();
                // Read the text from the backing buffer: PrintStream does not override toString(),
                // so calling it on the stream would only yield the object's identity string.
                return outputStream.toString();
            } catch (IOException e) {
                // This can't happen with a ByteArrayOutputStream; fall back to the throwable's
                // own summary rather than handing a null message to the logger.
                return throwable.toString();
            }
        });
}

/**
* Creates a new logger instance and configure it with the provided log level and file name.
*
Expand Down
8 changes: 7 additions & 1 deletion java/client/src/main/java/glide/api/models/Script.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ public class Script implements AutoCloseable {
/** Hash string representing the code. */
@Getter private final String hash;

private boolean isDropped = false;

/** Indication if script invocation output can return binary data. */
@Getter private final Boolean binarySafeOutput;

Expand All @@ -35,7 +37,11 @@ public <T> Script(T code, Boolean binarySafeOutput) {
/**
 * Drop the linked script from glide-rs <code>code</code>.
 *
 * <p>The drop is performed only while the script has not already been dropped through this
 * object, so calling <code>close()</code> again after a successful call does nothing.
 */
@Override
public void close() throws Exception {
    // Guarded so the same hash is never dropped twice through this instance.
    if (!isDropped) {
        dropScript(hash);
        isDropped = true;
    }
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import lombok.experimental.SuperBuilder;

/**
Expand All @@ -26,15 +27,23 @@ public abstract class BaseScanOptions {
* <code>COUNT</code> being <code>10</code> which indicates that it will only fetch and match
* <code>10</code> items from the list.
*/
private final String matchPattern;
protected final String matchPattern;

/**
* <code>COUNT</code> is just a hint for the command for how many elements to fetch from the
* set, hash, or list. <code>COUNT</code> could be ignored until the set, hash, or list is large
* enough for the <code>SCAN</code> commands to represent the results as compact single-allocation
* packed encoding.
*/
private final Long count;
protected final Long count;

/**
 * Two scan options are equal when both are {@link BaseScanOptions} instances with equal match
 * pattern and equal count (either of which may be <code>null</code>).
 */
// NOTE(review): no hashCode() override is visible next to this method - confirm one exists
// elsewhere in the class so that equal instances also hash equally.
@Override
public boolean equals(Object o) {
    if (o == this) {
        return true;
    }
    if (o instanceof BaseScanOptions) {
        BaseScanOptions other = (BaseScanOptions) o;
        boolean samePattern = Objects.equals(matchPattern, other.matchPattern);
        return samePattern && Objects.equals(count, other.count);
    }
    return false;
}

/**
* Creates the arguments to be used in <code>SCAN</code> commands.
Expand Down
Loading

0 comments on commit 8fc8a59

Please sign in to comment.