Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Java: Cluster SCAN request #1737

Merged
merged 32 commits into from
Jul 3, 2024
Merged
Show file tree
Hide file tree
Changes from 29 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
4385ced
Added Scan logic to Glide-core
avifenesh Jun 27, 2024
1620543
added scan to python
avifenesh Jun 27, 2024
164d6bb
added changes to changelog
avifenesh Jun 27, 2024
2a52e77
fixed comments
avifenesh Jun 28, 2024
9bfadcd
changed from hash to nanoid
avifenesh Jun 28, 2024
e6e3cf6
fixed typing
avifenesh Jun 28, 2024
f0c7be1
fixed comments
avifenesh Jun 30, 2024
396aefa
Stubbing for cluster scan
jduo Jun 30, 2024
398dc82
Update ScanOptions
acarbonetto Jun 30, 2024
d65bedd
ClusterScan implementation
jduo Jun 30, 2024
d3767ea
Make cursor termination behave as Python does
jduo Jul 1, 2024
81df50a
Unrelated: avoid double-dropping of scripts
jduo Jul 1, 2024
2d9e7be
Better state management of cursors
jduo Jul 1, 2024
24d94f9
Update design for an iterative cursor
jduo Jul 1, 2024
6cd05a7
Change the API to match Python
jduo Jul 1, 2024
7c80cd4
Unit tests
jduo Jul 1, 2024
8fc7d62
Fix integration tests
jduo Jul 1, 2024
7c6e05d
Integration test for cluster scan options
jduo Jul 1, 2024
693363c
Spotless
jduo Jul 1, 2024
92d7f60
Add more integration tests
jduo Jul 1, 2024
4bb2cc7
Move population of the ScanOptions protobuf to CommandManager
jduo Jul 1, 2024
277e57e
Rename clusterScan() method to scan()
jduo Jul 1, 2024
da2631c
Add javadoc comments
jduo Jul 2, 2024
d18f07d
Spotless
jduo Jul 2, 2024
9b890ed
Abstract the cursor string from the public API
jduo Jul 2, 2024
3894f71
Spotless
jduo Jul 2, 2024
ef3f3d6
Revert accidental changes
jduo Jul 2, 2024
c0af505
Address review comments
jduo Jul 2, 2024
7948b57
Get protocol string constants from Rust instead of hard-coding them
jduo Jul 2, 2024
180a509
Add integration testing for the Stream type filter
jduo Jul 2, 2024
0d001e8
Spotless
jduo Jul 2, 2024
4110295
Update scan documentation
acarbonetto Jul 3, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 78 additions & 0 deletions java/client/src/main/java/glide/api/RedisClusterClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,16 +50,21 @@
import glide.api.commands.ScriptingAndFunctionsClusterCommands;
import glide.api.commands.ServerManagementClusterCommands;
import glide.api.commands.TransactionsClusterCommands;
import glide.api.logging.Logger;
import glide.api.models.ClusterTransaction;
import glide.api.models.ClusterValue;
import glide.api.models.GlideString;
import glide.api.models.commands.FlushMode;
import glide.api.models.commands.InfoOptions;
import glide.api.models.commands.SortClusterOptions;
import glide.api.models.commands.function.FunctionRestorePolicy;
import glide.api.models.commands.scan.ClusterScanCursor;
import glide.api.models.commands.scan.ScanOptions;
import glide.api.models.configuration.RedisClusterClientConfiguration;
import glide.api.models.configuration.RequestRoutingConfiguration.Route;
import glide.api.models.configuration.RequestRoutingConfiguration.SingleNodeRoute;
import glide.ffi.resolvers.ClusterScanCursorResolver;
import glide.managers.CommandManager;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
Expand Down Expand Up @@ -908,6 +913,22 @@ public CompletableFuture<String> randomKey() {
RandomKey, new String[0], this::handleStringOrNullResponse);
}

@Override
public CompletableFuture<Object[]> scan(ClusterScanCursor cursor) {
return commandManager
.submitClusterScan(cursor, ScanOptions.builder().build(), this::handleArrayResponse)
.thenApply(
result -> new Object[] {new NativeClusterScanCursor(result[0].toString()), result[1]});
}

@Override
public CompletableFuture<Object[]> scan(ClusterScanCursor cursor, ScanOptions options) {
return commandManager
.submitClusterScan(cursor, options, this::handleArrayResponse)
.thenApply(
result -> new Object[] {new NativeClusterScanCursor(result[0].toString()), result[1]});
}

@Override
public CompletableFuture<String> spublish(@NonNull String channel, @NonNull String message) {
return commandManager.submitNewCommand(
Expand Down Expand Up @@ -980,4 +1001,61 @@ public CompletableFuture<Long> sortStore(
new GlideString[] {key}, sortClusterOptions.toGlideStringArgs(), storeArguments);
return commandManager.submitNewCommand(Sort, arguments, this::handleLongResponse);
}

/** A {@link ClusterScanCursor} implementation for interacting with the Rust layer. */
private static final class NativeClusterScanCursor
implements CommandManager.ClusterScanCursorDetail {

private String cursorHandle;
private boolean isFinished;
private boolean isClosed = false;

// This is for internal use only.
public NativeClusterScanCursor(@NonNull String cursorHandle) {
this.cursorHandle = cursorHandle;
this.isFinished = ClusterScanCursorResolver.FINISHED_CURSOR_HANDLE.equals(cursorHandle);
}

@Override
public String getCursorHandle() {
return cursorHandle;
}

@Override
public boolean isFinished() {
return isFinished;
}

@Override
public void releaseCursorHandle() {
internalClose();
}

@Override
protected void finalize() throws Throwable {
try {
// Release the native cursor
this.internalClose();
} finally {
super.finalize();
}
}

private void internalClose() {
if (!isClosed) {
try {
ClusterScanCursorResolver.releaseNativeCursor(cursorHandle);
} catch (Exception ex) {
Logger.log(
Logger.Level.ERROR,
"ClusterScanCursor",
() -> "Error releasing cursor " + cursorHandle + ": " + ex.getMessage());
Logger.log(Logger.Level.ERROR, "ClusterScanCursor", ex);
} finally {
// Mark the cursor as closed to avoid double-free (if close() gets called more than once).
isClosed = true;
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
import glide.api.models.GlideString;
import glide.api.models.Transaction;
import glide.api.models.commands.SortClusterOptions;
import glide.api.models.commands.scan.ClusterScanCursor;
import glide.api.models.commands.scan.ScanOptions;
import glide.api.models.configuration.ReadFrom;
import glide.api.models.configuration.RequestRoutingConfiguration.Route;
import glide.api.models.configuration.RequestRoutingConfiguration.SingleNodeRoute;
Expand Down Expand Up @@ -152,6 +154,109 @@ public interface GenericClusterCommands {
*/
CompletableFuture<String> randomKey();

/**
* Incrementally iterates over the keys in the Cluster.
*
acarbonetto marked this conversation as resolved.
Show resolved Hide resolved
* <p>This command is similar to the SCAN command, but it is designed to work in a Cluster
* environment. For each iteration the new cursor object should be used to continue the scan.
* Using the same cursor object for multiple iterations will result in the same keys or unexpected
* behavior. For more information about the Cluster Scan implementation, see <a
* href="https://github.com/aws/glide-for-redis/wiki/General-Concepts#cluster-scan">Cluster
* Scan</a>.
*
* <p>As with the SCAN command, the method can be used to iterate over the keys in the database,
* to return all keys that were in the database from the time the scan started until the scan
* finishes (that is, {@link ClusterScanCursor#isFinished()} returns true). When the cursor is not
* needed, call {@link ClusterScanCursor#releaseCursorHandle()} to immediately free resources tied
* to the cursor. Note that this makes the cursor unusable in subsequent calls to scan.
*
* <p>This method guarantees that all keyslots available when the first SCAN is called will be
* scanned before the cursor is finished. Any keys added after the initial scan request is made
* are not guaranteed to be scanned.
*
* <p>The same key can be returned in multiple scans iteration.
*
* @see ClusterScanCursor for more details about how to use the cursor.
* @see <a href="https://valkey.io/commands/scan">valkey.io</a> for details.
* @param cursor The {@link ClusterScanCursor} object that wraps the scan state. To start a new
* scan, create a new empty ClusterScanCursor using {@link ClusterScanCursor#initalCursor()}.
* @return An <code>Array</code> of <code>Objects</code>. The first element is always the {@link
acarbonetto marked this conversation as resolved.
Show resolved Hide resolved
* ClusterScanCursor} for the next iteration of results. To see if there is more data on the
* given cursor, call {@link ClusterScanCursor#isFinished()}. To release resources for the
* current chunk immediately, call {@link ClusterScanCursor#releaseCursorHandle()} after using
acarbonetto marked this conversation as resolved.
Show resolved Hide resolved
acarbonetto marked this conversation as resolved.
Show resolved Hide resolved
* the cursor in a call to this method. The cursor cannot be used in a scan again after {@link
* ClusterScanCursor#releaseCursorHandle()} has been called. The second element is an <code>
* Array</code> of Objects where each entry is a <code>String</code> representing a key.
acarbonetto marked this conversation as resolved.
Show resolved Hide resolved
* @example
* <pre>{@code
* // Assume key contains a set with 200 keys
* ClusterScanCursor cursor = ClusterScanCursor.initialCursor();
* Object[] result;
* while (!cursor.isFinished()) {
* result = client.scan(cursor).get();
* cursor.releaseCursorHandle();
* cursor = (ClusterScanCursor) result[0];
* Object[] stringResults = (Object[]) result[1];
*
* System.out.println("\nSCAN iteration:");
* Arrays.asList(stringResults).stream().forEach(i -> System.out.print(i + ", "));
* }
* }</pre>
*/
CompletableFuture<Object[]> scan(ClusterScanCursor cursor);

/**
* Incrementally iterates over the keys in the Cluster.
*
* <p>This command is similar to the SCAN command, but it is designed to work in a Cluster
* environment. For each iteration the new cursor object should be used to continue the scan.
* Using the same cursor object for multiple iterations will result in the same keys or unexpected
* behavior. For more information about the Cluster Scan implementation, see <a
* href="https://github.com/aws/glide-for-redis/wiki/General-Concepts#cluster-scan">Cluster
* Scan</a>.
*
* <p>As with the SCAN command, the method can be used to iterate over the keys in the database,
* to return all keys that were in the database from the time the scan started until the scan
* finishes (that is, {@link ClusterScanCursor#isFinished()} returns true). When the cursor is not
* needed, call {@link ClusterScanCursor#releaseCursorHandle()} to immediately free resources tied
* to the cursor. Note that this makes the cursor unusable in subsequent calls to scan.
*
* <p>This method guarantees that all keyslots available when the first SCAN is called will be
* scanned before the cursor is finished. Any keys added after the initial scan request is made
* are not guaranteed to be scanned.
*
* <p>The same key can be returned in multiple scans iteration.
*
* @see ClusterScanCursor for more details about how to use the cursor.
* @see <a href="https://valkey.io/commands/scan">valkey.io</a> for details.
* @param cursor The {@link ClusterScanCursor} object that wraps the scan state. To start a new
* scan, create a new empty ClusterScanCursor using {@link ClusterScanCursor#initalCursor()}.
* @param options The {@link ScanOptions}.
* @return An <code>Array</code> of <code>Objects</code>. The first element is always the {@link
* ClusterScanCursor} for the next iteration of results. To see if there is more data on the
* given cursor, call {@link ClusterScanCursor#isFinished()}. To release resources for the
* current chunk immediately, call {@link ClusterScanCursor#releaseCursorHandle()} after using
* the cursor in a call to this method. The cursor cannot be used in a scan again after {@link
* ClusterScanCursor#releaseCursorHandle()} has been called. The second element is an <code>
* Array</code> of Objects where each entry is a <code>String</code> representing a key.
* @example
* <pre>{@code
* // Assume key contains a set with 200 keys
* ClusterScanCursor cursor = ClusterScanCursor.initialCursor();
* Object[] result;
* while (!cursor.isFinished()) {
* result = client.scan(cursor).get();
* cursor.releaseCursorHandle();
* cursor = (ClusterScanCursor) result[0];
* Object[] stringResults = (Object[]) result[1];
*
* System.out.println("\nSCAN iteration:");
* Arrays.asList(stringResults).stream().forEach(i -> System.out.print(i + ", "));
* }
* }</pre>
*/
CompletableFuture<Object[]> scan(ClusterScanCursor cursor, ScanOptions options);

/**
* Sorts the elements in the list, set, or sorted set at <code>key</code> and returns the result.
* <br>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ CompletableFuture<Boolean> copy(
/**
* Returns a random key from currently selected database.
*
* @see <a href="https://redis.io/docs/latest/commands/randomkey/">redis.io</a> for details.
* @see <a href="https://valkey.io/commands/randomkey/">valkey.io</a> for details.
acarbonetto marked this conversation as resolved.
Show resolved Hide resolved
* @return A random <code>key</code> from the database.
* @example
* <pre>{@code
Expand All @@ -203,6 +203,7 @@ CompletableFuture<Boolean> copy(
* apply transformations on sorted elements.<br>
* To store the result into a new key, see {@link #sortStore(String, String, SortOptions)}.
*
* @see <a href="https://valkey.io/commands/sort/">valkey.io</a> for details.
* @param key The key of the list, set, or sorted set to be sorted.
* @param sortOptions The {@link SortOptions}.
* @return An <code>Array</code> of sorted elements.
Expand Down Expand Up @@ -247,6 +248,7 @@ CompletableFuture<Boolean> copy(
* This command is routed depending on the client's {@link ReadFrom} strategy.
*
* @since Redis 7.0 and above.
* @see <a href="https://valkey.io/commands/sort/">valkey.io</a> for details.
* @param key The key of the list, set, or sorted set to be sorted.
* @param sortOptions The {@link SortOptions}.
* @return An <code>Array</code> of sorted elements.
Expand Down Expand Up @@ -291,6 +293,7 @@ CompletableFuture<Boolean> copy(
* key.<br>
* To get the sort result without storing it into a key, see {@link #sort(String, SortOptions)}.
*
* @see <a href="https://valkey.io/commands/sort/">valkey.io</a> for details.
* @param key The key of the list, set, or sorted set to be sorted.
* @param sortOptions The {@link SortOptions}.
* @param destination The key where the sorted result will be stored.
Expand Down
28 changes: 28 additions & 0 deletions java/client/src/main/java/glide/api/logging/Logger.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@
import static glide.ffi.resolvers.LoggerResolver.initInternal;
import static glide.ffi.resolvers.LoggerResolver.logInternal;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.PrintStream;
import java.util.function.Supplier;
import lombok.Getter;
import lombok.NonNull;
Expand Down Expand Up @@ -170,6 +173,31 @@ public static void log(
logInternal(level.getLevel(), logIdentifier, message);
}

/**
* Logs the provided exception or error if the provided log level is lower than the logger level.
*
* @param level The log level of the provided message.
* @param logIdentifier The log identifier should give the log a context.
* @param throwable The exception or error to log.
*/
public static void log(
acarbonetto marked this conversation as resolved.
Show resolved Hide resolved
@NonNull Level level, @NonNull String logIdentifier, @NonNull Throwable throwable) {
// TODO: Add the corresponding API to Python and Node.js.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is better to post such TODO as a GH task

log(
level,
logIdentifier,
() -> {
try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
PrintStream printStream = new PrintStream(outputStream)) {
throwable.printStackTrace(printStream);
return printStream.toString();
} catch (IOException e) {
// This can't happen with a ByteArrayOutputStream.
return null;
}
});
}

/**
* Creates a new logger instance and configure it with the provided log level and file name.
*
Expand Down
8 changes: 7 additions & 1 deletion java/client/src/main/java/glide/api/models/Script.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ public class Script implements AutoCloseable {
/** Hash string representing the code. */
@Getter private final String hash;

private boolean isDropped = false;

/** Indicatoin if script invocation output can return binary data. */
@Getter private final Boolean binarySafeOutput;

Expand All @@ -35,7 +37,11 @@ public <T> Script(T code, Boolean binarySafeOutput) {
/** Drop the linked script from glide-rs <code>code</code>. */
@Override
public void close() throws Exception {
dropScript(hash);

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to synchronise this?

if (!isDropped) {
dropScript(hash);
isDropped = true;
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import lombok.experimental.SuperBuilder;

/**
Expand All @@ -26,15 +27,23 @@ public abstract class BaseScanOptions {
* <code>COUNT</code> being <code>10</code> which indicates that it will only fetch and match
* <code>10</code> items from the list.
*/
private final String matchPattern;
protected final String matchPattern;

/**
* <code>COUNT</code> is a just a hint for the command for how many elements to fetch from the
* set, hash, or list. <code>COUNT</code> could be ignored until the set, hash, or list is large
* enough for the <code>SCAN</code> commands to represent the results as compact single-allocation
* packed encoding.
*/
private final Long count;
protected final Long count;

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (!(o instanceof BaseScanOptions)) return false;
BaseScanOptions that = (BaseScanOptions) o;
return Objects.equals(matchPattern, that.matchPattern) && Objects.equals(count, that.count);
}

/**
* Creates the arguments to be used in <code>SCAN</code> commands.
Expand Down
Loading
Loading