diff --git a/java/client/src/main/java/glide/api/RedisClusterClient.java b/java/client/src/main/java/glide/api/RedisClusterClient.java index 99f3a41ee6..800c57045e 100644 --- a/java/client/src/main/java/glide/api/RedisClusterClient.java +++ b/java/client/src/main/java/glide/api/RedisClusterClient.java @@ -50,6 +50,7 @@ import glide.api.commands.ScriptingAndFunctionsClusterCommands; import glide.api.commands.ServerManagementClusterCommands; import glide.api.commands.TransactionsClusterCommands; +import glide.api.logging.Logger; import glide.api.models.ClusterTransaction; import glide.api.models.ClusterValue; import glide.api.models.GlideString; @@ -57,9 +58,13 @@ import glide.api.models.commands.InfoOptions; import glide.api.models.commands.SortClusterOptions; import glide.api.models.commands.function.FunctionRestorePolicy; +import glide.api.models.commands.scan.ClusterScanCursor; +import glide.api.models.commands.scan.ScanOptions; import glide.api.models.configuration.RedisClusterClientConfiguration; import glide.api.models.configuration.RequestRoutingConfiguration.Route; import glide.api.models.configuration.RequestRoutingConfiguration.SingleNodeRoute; +import glide.ffi.resolvers.ClusterScanCursorResolver; +import glide.managers.CommandManager; import java.util.Arrays; import java.util.HashMap; import java.util.Map; @@ -908,6 +913,22 @@ public CompletableFuture randomKey() { RandomKey, new String[0], this::handleStringOrNullResponse); } + @Override + public CompletableFuture scan(ClusterScanCursor cursor) { + return commandManager + .submitClusterScan(cursor, ScanOptions.builder().build(), this::handleArrayResponse) + .thenApply( + result -> new Object[] {new NativeClusterScanCursor(result[0].toString()), result[1]}); + } + + @Override + public CompletableFuture scan(ClusterScanCursor cursor, ScanOptions options) { + return commandManager + .submitClusterScan(cursor, options, this::handleArrayResponse) + .thenApply( + result -> new Object[] {new NativeClusterScanCursor(result[0].toString()), result[1]}); + } + @Override public CompletableFuture spublish(@NonNull String channel, @NonNull String message) { return commandManager.submitNewCommand( @@ -980,4 +1001,61 @@ public CompletableFuture sortStore( new GlideString[] {key}, sortClusterOptions.toGlideStringArgs(), storeArguments); return commandManager.submitNewCommand(Sort, arguments, this::handleLongResponse); } + + /** A {@link ClusterScanCursor} implementation for interacting with the Rust layer. */ + private static final class NativeClusterScanCursor + implements CommandManager.ClusterScanCursorDetail { + + private String cursorHandle; + private boolean isFinished; + private boolean isClosed = false; + + // This is for internal use only. + public NativeClusterScanCursor(@NonNull String cursorHandle) { + this.cursorHandle = cursorHandle; + this.isFinished = ClusterScanCursorResolver.FINISHED_CURSOR_HANDLE.equals(cursorHandle); + } + + @Override + public String getCursorHandle() { + return cursorHandle; + } + + @Override + public boolean isFinished() { + return isFinished; + } + + @Override + public void releaseCursorHandle() { + internalClose(); + } + + @Override + protected void finalize() throws Throwable { + try { + // Release the native cursor + this.internalClose(); + } finally { + super.finalize(); + } + } + + private void internalClose() { + if (!isClosed) { + try { + ClusterScanCursorResolver.releaseNativeCursor(cursorHandle); + } catch (Exception ex) { + Logger.log( + Logger.Level.ERROR, + "ClusterScanCursor", + () -> "Error releasing cursor " + cursorHandle + ": " + ex.getMessage()); + Logger.log(Logger.Level.ERROR, "ClusterScanCursor", ex); + } finally { + // Mark the cursor as closed to avoid double-free (if close() gets called more than once). + isClosed = true; + } + } + } + } } diff --git a/java/client/src/main/java/glide/api/commands/GenericClusterCommands.java b/java/client/src/main/java/glide/api/commands/GenericClusterCommands.java index 3a121c2d5b..5815df16e5 100644 --- a/java/client/src/main/java/glide/api/commands/GenericClusterCommands.java +++ b/java/client/src/main/java/glide/api/commands/GenericClusterCommands.java @@ -6,6 +6,8 @@ import glide.api.models.GlideString; import glide.api.models.Transaction; import glide.api.models.commands.SortClusterOptions; +import glide.api.models.commands.scan.ClusterScanCursor; +import glide.api.models.commands.scan.ScanOptions; import glide.api.models.configuration.ReadFrom; import glide.api.models.configuration.RequestRoutingConfiguration.Route; import glide.api.models.configuration.RequestRoutingConfiguration.SingleNodeRoute; @@ -152,6 +154,129 @@ public interface GenericClusterCommands { */ CompletableFuture randomKey(); + /** + * Incrementally iterates over the keys in the Cluster. + * + *

This command is similar to the SCAN command, but it is designed to work in a + * Cluster environment. The main difference is that this command uses a {@link ClusterScanCursor} + * object to manage iterations. For more information about the Cluster Scan implementation, see Cluster + * Scan. + * + *

As with the SCAN command, this command is a cursor-based iterator. This means + * that at every call of the command, the server returns an updated cursor ({@link + * ClusterScanCursor}) that the user needs to re-send as the cursor argument in the + * next call. The iteration terminates when the returned cursor {@link + * ClusterScanCursor#isFinished()} returns true. + * + *

This method guarantees that all keyslots available when the first SCAN is called will be + * scanned before the cursor is finished. Any keys added after the initial scan request is made + * are not guaranteed to be scanned. + * + *

Note that the same key may be returned in multiple scan iterations. + * + *

How to use the {@link ClusterScanCursor}:
+ * For each iteration, the previous scan {@link ClusterScanCursor} object should be used to + * continue the SCAN by passing it in the cursor argument. Using the + * same cursor object for multiple iterations may result in the same keys returned or unexpected + * behavior. + * + *

When the cursor is no longer needed, call {@link ClusterScanCursor#releaseCursorHandle()} to + * immediately free resources tied to the cursor. Note that this makes the cursor unusable in + * subsequent calls to SCAN. + * + * @see ClusterScanCursor for more details about how to use the cursor. + * @see valkey.io for details. + * @param cursor The {@link ClusterScanCursor} object that wraps the scan state. To start a new + * scan, create a new empty ClusterScanCursor using {@link ClusterScanCursor#initalCursor()}. + * @return An Array with two elements. The first element is always the {@link + * ClusterScanCursor} for the next iteration of results. To see if there is more data on the + * given cursor, call {@link ClusterScanCursor#isFinished()}. To release resources for the + * current cursor immediately, call {@link ClusterScanCursor#releaseCursorHandle()} after + * using the cursor in a call to this method. The cursor cannot be used in a scan again after + * {@link ClusterScanCursor#releaseCursorHandle()} has been called. The second element is an + * + * Array of String elements each representing a key. + * @example + *

{@code
+     * // Assume key contains a set with 200 keys
+     * ClusterScanCursor cursor = ClusterScanCursor.initialCursor();
+     * Object[] result;
+     * while (!cursor.isFinished()) {
+     *   result = client.scan(cursor).get();
+     *   cursor.releaseCursorHandle();
+     *   cursor = (ClusterScanCursor) result[0];
+     *   Object[] stringResults = (Object[]) result[1];
+     *
+     *   System.out.println("\nSCAN iteration:");
+     *   Arrays.asList(stringResults).stream().forEach(i -> System.out.print(i + ", "));
+     * }
+     * }
+ */ + CompletableFuture scan(ClusterScanCursor cursor); + + /** + * Incrementally iterates over the keys in the Cluster. + * + *

This command is similar to the SCAN command, but it is designed to work in a + * Cluster environment. The main difference is that this command uses a {@link ClusterScanCursor} + * object to manage iterations. For more information about the Cluster Scan implementation, see Cluster + * Scan. + * + *

As with the SCAN command, this command is a cursor-based iterator. This means + * that at every call of the command, the server returns an updated cursor ({@link + * ClusterScanCursor}) that the user needs to re-send as the cursor argument in the + * next call. The iteration terminates when the returned cursor {@link + * ClusterScanCursor#isFinished()} returns true. + * + *

This method guarantees that all keyslots available when the first SCAN is called will be + * scanned before the cursor is finished. Any keys added after the initial scan request is made + * are not guaranteed to be scanned. + * + *

Note that the same key may be returned in multiple scan iterations. + * + *

How to use the {@link ClusterScanCursor}:
+ * For each iteration, the previous scan {@link ClusterScanCursor} object should be used to + * continue the SCAN by passing it in the cursor argument. Using the + * same cursor object for multiple iterations may result in the same keys returned or unexpected + * behavior. + * + *

When the cursor is no longer needed, call {@link ClusterScanCursor#releaseCursorHandle()} to + * immediately free resources tied to the cursor. Note that this makes the cursor unusable in + * subsequent calls to SCAN. + * + * @see ClusterScanCursor for more details about how to use the cursor. + * @see valkey.io for details. + * @param cursor The {@link ClusterScanCursor} object that wraps the scan state. To start a new + * scan, create a new empty ClusterScanCursor using {@link ClusterScanCursor#initalCursor()}. + * @param options The {@link ScanOptions}. + * @return An Array with two elements. The first element is always the {@link + * ClusterScanCursor} for the next iteration of results. To see if there is more data on the + * given cursor, call {@link ClusterScanCursor#isFinished()}. To release resources for the + * current cursor immediately, call {@link ClusterScanCursor#releaseCursorHandle()} after + * using the cursor in a call to this method. The cursor cannot be used in a scan again after + * {@link ClusterScanCursor#releaseCursorHandle()} has been called. The second element is an + * + * Array of String elements each representing a key. + * @example + *

{@code
+     * // Assume key contains a set with 200 keys
+     * ClusterScanCursor cursor = ClusterScanCursor.initialCursor();
+     * Object[] result;
+     * while (!cursor.isFinished()) {
+     *   result = client.scan(cursor).get();
+     *   cursor.releaseCursorHandle();
+     *   cursor = (ClusterScanCursor) result[0];
+     *   Object[] stringResults = (Object[]) result[1];
+     *
+     *   System.out.println("\nSCAN iteration:");
+     *   Arrays.asList(stringResults).stream().forEach(i -> System.out.print(i + ", "));
+     * }
+     * }
+ */ + CompletableFuture scan(ClusterScanCursor cursor, ScanOptions options); + /** * Sorts the elements in the list, set, or sorted set at key and returns the result. *
diff --git a/java/client/src/main/java/glide/api/commands/GenericCommands.java b/java/client/src/main/java/glide/api/commands/GenericCommands.java index 1fdc2464b7..8bbbbb5e97 100644 --- a/java/client/src/main/java/glide/api/commands/GenericCommands.java +++ b/java/client/src/main/java/glide/api/commands/GenericCommands.java @@ -184,7 +184,7 @@ CompletableFuture copy( /** * Returns a random key from currently selected database. * - * @see redis.io for details. + * @see valkey.io for details. * @return A random key from the database. * @example *
{@code
@@ -203,6 +203,7 @@ CompletableFuture copy(
      * apply transformations on sorted elements.
* To store the result into a new key, see {@link #sortStore(String, String, SortOptions)}. * + * @see valkey.io for details. * @param key The key of the list, set, or sorted set to be sorted. * @param sortOptions The {@link SortOptions}. * @return An Array of sorted elements. @@ -247,6 +248,7 @@ CompletableFuture copy( * This command is routed depending on the client's {@link ReadFrom} strategy. * * @since Redis 7.0 and above. + * @see valkey.io for details. * @param key The key of the list, set, or sorted set to be sorted. * @param sortOptions The {@link SortOptions}. * @return An Array of sorted elements. @@ -291,6 +293,7 @@ CompletableFuture copy( * key.
* To get the sort result without storing it into a key, see {@link #sort(String, SortOptions)}. * + * @see valkey.io for details. * @param key The key of the list, set, or sorted set to be sorted. * @param sortOptions The {@link SortOptions}. * @param destination The key where the sorted result will be stored. diff --git a/java/client/src/main/java/glide/api/logging/Logger.java b/java/client/src/main/java/glide/api/logging/Logger.java index 744c5c2ddc..2b711b8d68 100644 --- a/java/client/src/main/java/glide/api/logging/Logger.java +++ b/java/client/src/main/java/glide/api/logging/Logger.java @@ -4,6 +4,9 @@ import static glide.ffi.resolvers.LoggerResolver.initInternal; import static glide.ffi.resolvers.LoggerResolver.logInternal; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.PrintStream; import java.util.function.Supplier; import lombok.Getter; import lombok.NonNull; @@ -170,6 +173,31 @@ public static void log( logInternal(level.getLevel(), logIdentifier, message); } + /** + * Logs the provided exception or error if the provided log level is lower than the logger level. + * + * @param level The log level of the provided message. + * @param logIdentifier The log identifier should give the log a context. + * @param throwable The exception or error to log. + */ + public static void log( + @NonNull Level level, @NonNull String logIdentifier, @NonNull Throwable throwable) { + // TODO: Add the corresponding API to Python and Node.js. + log( + level, + logIdentifier, + () -> { + try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + PrintStream printStream = new PrintStream(outputStream)) { + throwable.printStackTrace(printStream); + return printStream.toString(); + } catch (IOException e) { + // This can't happen with a ByteArrayOutputStream. + return null; + } + }); + } + /** * Creates a new logger instance and configure it with the provided log level and file name. * diff --git a/java/client/src/main/java/glide/api/models/Script.java b/java/client/src/main/java/glide/api/models/Script.java index 0d06374e26..bec0ec6960 100644 --- a/java/client/src/main/java/glide/api/models/Script.java +++ b/java/client/src/main/java/glide/api/models/Script.java @@ -18,6 +18,8 @@ public class Script implements AutoCloseable { /** Hash string representing the code. */ @Getter private final String hash; + private boolean isDropped = false; + /** Indicatoin if script invocation output can return binary data. */ @Getter private final Boolean binarySafeOutput; @@ -35,7 +37,11 @@ public Script(T code, Boolean binarySafeOutput) { /** Drop the linked script from glide-rs code. */ @Override public void close() throws Exception { - dropScript(hash); + + if (!isDropped) { + dropScript(hash); + isDropped = true; + } } @Override diff --git a/java/client/src/main/java/glide/api/models/commands/scan/BaseScanOptions.java b/java/client/src/main/java/glide/api/models/commands/scan/BaseScanOptions.java index e97560fd3d..f85ba0fc9f 100644 --- a/java/client/src/main/java/glide/api/models/commands/scan/BaseScanOptions.java +++ b/java/client/src/main/java/glide/api/models/commands/scan/BaseScanOptions.java @@ -3,6 +3,7 @@ import java.util.ArrayList; import java.util.List; +import java.util.Objects; import lombok.experimental.SuperBuilder; /** @@ -26,7 +27,7 @@ public abstract class BaseScanOptions { * COUNT being 10 which indicates that it will only fetch and match * 10 items from the list. */ - private final String matchPattern; + protected final String matchPattern; /** * COUNT is a just a hint for the command for how many elements to fetch from the @@ -34,7 +35,15 @@ public abstract class BaseScanOptions { * enough for the SCAN commands to represent the results as compact single-allocation * packed encoding. */ - private final Long count; + protected final Long count; + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (!(o instanceof BaseScanOptions)) return false; + BaseScanOptions that = (BaseScanOptions) o; + return Objects.equals(matchPattern, that.matchPattern) && Objects.equals(count, that.count); + } /** * Creates the arguments to be used in SCAN commands. diff --git a/java/client/src/main/java/glide/api/models/commands/scan/ClusterScanCursor.java b/java/client/src/main/java/glide/api/models/commands/scan/ClusterScanCursor.java new file mode 100644 index 0000000000..bc3101ae71 --- /dev/null +++ b/java/client/src/main/java/glide/api/models/commands/scan/ClusterScanCursor.java @@ -0,0 +1,92 @@ +/** Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0 */ +package glide.api.models.commands.scan; + +import glide.api.commands.GenericClusterCommands; + +/** + * A cursor is used to iterate through data returned by cluster SCAN requests. + * + *

This interface is used in two ways: + * + *

    + *
  1. An {@link #initalCursor()} is passed to {@link GenericClusterCommands#scan} to start a + * cluster SCAN request. + *
  2. The result of the {@link GenericClusterCommands#scan} call returns a cursor at index + * 0 of the returned Object[]. This cursor can be supplied again to a call + * to {@link GenericClusterCommands#scan}, provided that {@link #isFinished()} returns + * false. + *
+ * + *

Note that cursors returned by {@link GenericClusterCommands#scan} may hold external resources. + * These resources can be released by calling {@link #releaseCursorHandle()}. However, doing so will + * invalidate the cursor from being used in another {@link GenericClusterCommands#scan}. + * + *

To do this safely, follow this procedure: + * + *

    + *
  1. Call {@link GenericClusterCommands#scan} with the cursor. + *
  2. Call {@link #releaseCursorHandle()} to destroy the cursor. + *
  3. Assign the new cursor returned by {@link GenericClusterCommands#scan}. + *
+ * + * @see GenericClusterCommands#scan + * @example + *
{@code
+ * ClusterScanCursor cursor = ClusterScanCursor.initialCursor();
+ * Object[] result;
+ * while (!cursor.isFinished()) {
+ *     result = client.scan(cursor).get();
+ *     cursor.releaseCursorHandle();
+ *     cursor = (ClusterScanCursor) result[0];
+ *     Object[] stringResults = (Object[]) result[1];
+ *
+ *     System.out.println("\nSCAN iteration:");
+ *     Arrays.asList(stringResults).stream().forEach(i -> System.out.print(i + ", "));
+ * }
+ * }
+ */ +public interface ClusterScanCursor { + /** + * Indicates if this cursor is the last set of data available. + * + *

If this method returns false, this cursor instance should get passed to {@link + * GenericClusterCommands#scan} to get next set of data. + * + * @return true if this cursor is the last set of data. False if there is potentially more data + * available. + */ + boolean isFinished(); + + /** + * Releases resources related to this cursor. + * + *

This method can be called to immediately release resources tied to this cursor instance. + * Note that if this is called, this cursor cannot be used in {@link GenericClusterCommands#scan}. + * Also, this method is optional for the caller. If it does not get called, cursor resources will + * be freed during garbage collection. + */ + void releaseCursorHandle(); + + /** + * The special cursor used to start the first in a series of {@link GenericClusterCommands#scan} + * calls. + */ + ClusterScanCursor INITIAL_CURSOR_INSTANCE = + new ClusterScanCursor() { + @Override + public boolean isFinished() { + // The initial cursor can always request more data. + return false; + } + + @Override + public void releaseCursorHandle() { + // Ignore. + } + }; + + /** Creates an empty cursor to be used in the initial {@link GenericClusterCommands#scan} call. */ + static ClusterScanCursor initalCursor() { + return INITIAL_CURSOR_INSTANCE; + } +} diff --git a/java/client/src/main/java/glide/api/models/commands/scan/ScanOptions.java b/java/client/src/main/java/glide/api/models/commands/scan/ScanOptions.java new file mode 100644 index 0000000000..153de3f1c4 --- /dev/null +++ b/java/client/src/main/java/glide/api/models/commands/scan/ScanOptions.java @@ -0,0 +1,88 @@ +/** Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0 */ +package glide.api.models.commands.scan; + +import glide.api.commands.GenericClusterCommands; +import glide.api.commands.GenericCommands; +import glide.ffi.resolvers.ObjectTypeResolver; +import glide.utils.ArrayTransformUtils; +import lombok.experimental.SuperBuilder; + +/** + * Optional arguments for {@link GenericCommands#scan} and {@link GenericClusterCommands#scan}. + * + * @see valkey.io + */ +@SuperBuilder +public class ScanOptions extends BaseScanOptions { + /** TYPE option string to include in the SCAN commands. */ + public static final String TYPE_OPTION_STRING = "TYPE"; + + /** + * Use this option to ask SCAN to only return objects that match a given type.
+ * The filter is applied after elements are retrieved from the database, so the option does not + * reduce the amount of work the server has to do to complete a full iteration. For rare types you + * may receive no elements in many iterations. + */ + private final ObjectType type; + + public enum ObjectType { + STRING(ObjectTypeResolver.OBJECT_TYPE_STRING_NATIVE_NAME), + LIST(ObjectTypeResolver.OBJECT_TYPE_LIST_NATIVE_NAME), + SET(ObjectTypeResolver.OBJECT_TYPE_SET_NATIVE_NAME), + ZSET(ObjectTypeResolver.OBJECT_TYPE_ZSET_NATIVE_NAME), + HASH(ObjectTypeResolver.OBJECT_TYPE_HASH_NATIVE_NAME), + STREAM(ObjectTypeResolver.OBJECT_TYPE_STREAM_NATIVE_NAME); + + /** + * @return the name of the enum when communicating with the native layer. + */ + public String getNativeName() { + return nativeName; + } + + private final String nativeName; + + ObjectType(String nativeName) { + this.nativeName = nativeName; + } + } + + @Override + public String[] toArgs() { + if (type != null) { + return ArrayTransformUtils.concatenateArrays( + super.toArgs(), new String[] {TYPE_OPTION_STRING, type.name()}); + } + return super.toArgs(); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (!(o instanceof ScanOptions)) return false; + if (!super.equals(o)) return false; + ScanOptions that = (ScanOptions) o; + return type == that.type; + } + + /** + * @return the pattern used for the MATCH filter. + */ + public String getMatchPattern() { + return matchPattern; + } + + /** + * @return the count used for the COUNT field. + */ + public Long getCount() { + return count; + } + + /** + * @return the type used for the TYPE filter. + */ + public ObjectType getType() { + return type; + } +} diff --git a/java/client/src/main/java/glide/ffi/resolvers/ClusterScanCursorResolver.java b/java/client/src/main/java/glide/ffi/resolvers/ClusterScanCursorResolver.java new file mode 100644 index 0000000000..8657bfa0f4 --- /dev/null +++ b/java/client/src/main/java/glide/ffi/resolvers/ClusterScanCursorResolver.java @@ -0,0 +1,22 @@ +/** Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0 */ +package glide.ffi.resolvers; + +import glide.managers.CommandManager; + +/** + * Helper class for invoking JNI resources for {@link CommandManager.ClusterScanCursorDetail} + * implementations. + */ +public final class ClusterScanCursorResolver { + public static final String FINISHED_CURSOR_HANDLE; + + // TODO: consider lazy loading the glide_rs library + static { + NativeUtils.loadGlideLib(); + FINISHED_CURSOR_HANDLE = getFinishedCursorHandleConstant(); + } + + public static native void releaseNativeCursor(String cursor); + + public static native String getFinishedCursorHandleConstant(); +} diff --git a/java/client/src/main/java/glide/ffi/resolvers/ObjectTypeResolver.java b/java/client/src/main/java/glide/ffi/resolvers/ObjectTypeResolver.java new file mode 100644 index 0000000000..deaaa54b68 --- /dev/null +++ b/java/client/src/main/java/glide/ffi/resolvers/ObjectTypeResolver.java @@ -0,0 +1,37 @@ +/** Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0 */ +package glide.ffi.resolvers; + +import glide.api.models.commands.scan.ScanOptions; + +/** Helper class for invoking JNI resources for the {@link ScanOptions.ObjectType} enum. */ +public class ObjectTypeResolver { + public static final String OBJECT_TYPE_STRING_NATIVE_NAME; + public static final String OBJECT_TYPE_LIST_NATIVE_NAME; + public static final String OBJECT_TYPE_SET_NATIVE_NAME; + public static final String OBJECT_TYPE_ZSET_NATIVE_NAME; + public static final String OBJECT_TYPE_HASH_NATIVE_NAME; + public static final String OBJECT_TYPE_STREAM_NATIVE_NAME; + + // TODO: consider lazy loading the glide_rs library + static { + NativeUtils.loadGlideLib(); + OBJECT_TYPE_STRING_NATIVE_NAME = getTypeStringConstant(); + OBJECT_TYPE_LIST_NATIVE_NAME = getTypeListConstant(); + OBJECT_TYPE_SET_NATIVE_NAME = getTypeSetConstant(); + OBJECT_TYPE_ZSET_NATIVE_NAME = getTypeZSetConstant(); + OBJECT_TYPE_HASH_NATIVE_NAME = getTypeHashConstant(); + OBJECT_TYPE_STREAM_NATIVE_NAME = getTypeStreamConstant(); + } + + public static native String getTypeStringConstant(); + + public static native String getTypeListConstant(); + + public static native String getTypeSetConstant(); + + public static native String getTypeZSetConstant(); + + public static native String getTypeHashConstant(); + + public static native String getTypeStreamConstant(); +} diff --git a/java/client/src/main/java/glide/managers/CommandManager.java b/java/client/src/main/java/glide/managers/CommandManager.java index ebbb9df024..77595c6af9 100644 --- a/java/client/src/main/java/glide/managers/CommandManager.java +++ b/java/client/src/main/java/glide/managers/CommandManager.java @@ -6,6 +6,8 @@ import glide.api.models.GlideString; import glide.api.models.Script; import glide.api.models.Transaction; +import glide.api.models.commands.scan.ClusterScanCursor; +import glide.api.models.commands.scan.ScanOptions; import glide.api.models.configuration.RequestRoutingConfiguration.ByAddressRoute; import glide.api.models.configuration.RequestRoutingConfiguration.Route; import glide.api.models.configuration.RequestRoutingConfiguration.SimpleMultiNodeRoute; @@ -22,6 +24,7 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.stream.Collectors; +import lombok.NonNull; import lombok.RequiredArgsConstructor; import redis_request.RedisRequestOuterClass; import redis_request.RedisRequestOuterClass.Command; @@ -44,6 +47,19 @@ public class CommandManager { /** UDS connection representation. */ private final ChannelHandler channel; + /** + * Internal interface for exposing implementation details about a ClusterScanCursor. This is an + * interface so that it can be mocked in tests. + */ + public interface ClusterScanCursorDetail extends ClusterScanCursor { + /** + * Returns the handle String representing the cursor. + * + * @return the handle String representing the cursor. + */ + String getCursorHandle(); + } + /** * Build a command and send. * @@ -166,6 +182,23 @@ public CompletableFuture submitNewTransaction( return submitCommandToChannel(command, responseHandler); } + /** + * Submits a scan request with cursor + * + * @param cursor Iteration cursor + * @param options {@link ScanOptions} + * @param responseHandler The handler for the response object + * @return A result promise of type T + */ + public CompletableFuture submitClusterScan( + ClusterScanCursor cursor, + @NonNull ScanOptions options, + RedisExceptionCheckedFunction responseHandler) { + + final RedisRequest.Builder command = prepareCursorRequest(cursor, options); + return submitCommandToChannel(command, responseHandler); + } + /** * Take a redis request and send to channel. * @@ -288,6 +321,43 @@ protected RedisRequest.Builder prepareRedisRequest( return route.isPresent() ? prepareRedisRequestRoute(builder, route.get()) : builder; } + /** + * Build a protobuf cursor scan request. + * + * @param cursor Iteration cursor + * @param options {@link ScanOptions} + * @return An uncompleted request. {@link CallbackDispatcher} is responsible to complete it by + * adding a callback id. + */ + protected RedisRequest.Builder prepareCursorRequest( + @NonNull ClusterScanCursor cursor, @NonNull ScanOptions options) { + + RedisRequestOuterClass.ClusterScan.Builder clusterScanBuilder = + RedisRequestOuterClass.ClusterScan.newBuilder(); + + if (cursor != ClusterScanCursor.INITIAL_CURSOR_INSTANCE) { + if (cursor instanceof ClusterScanCursorDetail) { + clusterScanBuilder.setCursor(((ClusterScanCursorDetail) cursor).getCursorHandle()); + } else { + throw new IllegalArgumentException("Illegal cursor submitted."); + } + } + + if (options.getMatchPattern() != null) { + clusterScanBuilder.setMatchPattern(options.getMatchPattern()); + } + + if (options.getCount() != null) { + clusterScanBuilder.setCount(options.getCount()); + } + + if (options.getType() != null) { + clusterScanBuilder.setObjectType(options.getType().getNativeName()); + } + + return RedisRequest.newBuilder().setClusterScan(clusterScanBuilder.build()); + } + /** * Build a protobuf command request object. * diff --git a/java/client/src/test/java/glide/api/RedisClusterClientTest.java b/java/client/src/test/java/glide/api/RedisClusterClientTest.java index dd049c3504..0cbc82cf38 100644 --- a/java/client/src/test/java/glide/api/RedisClusterClientTest.java +++ b/java/client/src/test/java/glide/api/RedisClusterClientTest.java @@ -10,9 +10,6 @@ import static glide.api.models.commands.SortBaseOptions.LIMIT_COMMAND_STRING; import static glide.api.models.commands.SortBaseOptions.OrderBy.DESC; import static glide.api.models.commands.SortBaseOptions.STORE_COMMAND_STRING; -import static glide.api.models.commands.SortOptions.ALPHA_COMMAND_STRING; -import static glide.api.models.commands.SortOptions.LIMIT_COMMAND_STRING; -import static glide.api.models.commands.SortOptions.STORE_COMMAND_STRING; import static glide.api.models.commands.function.FunctionListOptions.LIBRARY_NAME_REDIS_API; import static glide.api.models.commands.function.FunctionListOptions.WITH_CODE_REDIS_API; import static glide.api.models.configuration.RequestRoutingConfiguration.SimpleMultiNodeRoute.ALL_NODES; @@ -66,6 +63,8 @@ import glide.api.models.commands.SortClusterOptions; import glide.api.models.commands.function.FunctionLoadOptions; import glide.api.models.commands.function.FunctionRestorePolicy; +import glide.api.models.commands.scan.ClusterScanCursor; +import glide.api.models.commands.scan.ScanOptions; import glide.api.models.configuration.RequestRoutingConfiguration.Route; import glide.api.models.configuration.RequestRoutingConfiguration.SingleNodeRoute; import glide.managers.CommandManager; @@ -78,6 +77,7 @@ import lombok.SneakyThrows; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.mockito.Mockito; import redis_request.RedisRequestOuterClass.RedisRequest; import response.ResponseOuterClass.ConstantResponse; import response.ResponseOuterClass.Response; @@ -2872,4 +2872,120 @@ public void sortStore_with_options_binary_returns_success() { assertEquals(testResponse, response); assertEquals(result, payload); } + + @SneakyThrows + @Test + public void scan_new_cursor() { + CommandManager.ClusterScanCursorDetail mockCursor = + Mockito.mock(CommandManager.ClusterScanCursorDetail.class); + when(mockCursor.getCursorHandle()).thenReturn("1"); + + final Object[] result = new Object[] {mockCursor.getCursorHandle(), new Object[] {"foo"}}; + final CompletableFuture testResponse = CompletableFuture.completedFuture(result); + when(commandManager.submitClusterScan( + eq(ClusterScanCursor.INITIAL_CURSOR_INSTANCE), + eq(ScanOptions.builder().build()), + any())) + .thenReturn(testResponse); + + final CompletableFuture actualResponse = + service.scan(ClusterScanCursor.initalCursor()); + assertEquals( + mockCursor.getCursorHandle(), + ((CommandManager.ClusterScanCursorDetail) actualResponse.get()[0]).getCursorHandle()); + } + + @SneakyThrows + @Test + public void scan_existing_cursor() { + CommandManager.ClusterScanCursorDetail mockCursor = + Mockito.mock(CommandManager.ClusterScanCursorDetail.class); + when(mockCursor.getCursorHandle()).thenReturn("1"); + + CommandManager.ClusterScanCursorDetail mockResultCursor = + Mockito.mock(CommandManager.ClusterScanCursorDetail.class); + when(mockResultCursor.getCursorHandle()).thenReturn("2"); + + final Object[] result = new Object[] {mockResultCursor.getCursorHandle(), new Object[] {"foo"}}; + final CompletableFuture testResponse = CompletableFuture.completedFuture(result); + when(commandManager.submitClusterScan( + eq(mockCursor), eq(ScanOptions.builder().build()), any())) + .thenReturn(testResponse); + + CompletableFuture actualResponse = service.scan(mockCursor); + assertEquals( + mockResultCursor.getCursorHandle(), + ((CommandManager.ClusterScanCursorDetail) actualResponse.get()[0]).getCursorHandle()); + } + + @SneakyThrows + @Test + public void scan_new_cursor_options() { + CommandManager.ClusterScanCursorDetail mockCursor = + Mockito.mock(CommandManager.ClusterScanCursorDetail.class); + when(mockCursor.getCursorHandle()).thenReturn("1"); + + final Object[] result = new Object[] {mockCursor.getCursorHandle(), new Object[] {"foo"}}; + final CompletableFuture testResponse = CompletableFuture.completedFuture(result); + when(commandManager.submitClusterScan( + eq(ClusterScanCursor.INITIAL_CURSOR_INSTANCE), + eq( + ScanOptions.builder() + .matchPattern("key:*") + .count(10L) + .type(ScanOptions.ObjectType.STRING) + .build()), + any())) + .thenReturn(testResponse); + + final CompletableFuture actualResponse = + service.scan( + ClusterScanCursor.initalCursor(), + ScanOptions.builder() + .matchPattern("key:*") + .count(10L) + .type(ScanOptions.ObjectType.STRING) + .build()); + + assertEquals( + mockCursor.getCursorHandle(), + ((CommandManager.ClusterScanCursorDetail) actualResponse.get()[0]).getCursorHandle()); + } + + @SneakyThrows + @Test + public void scan_existing_cursor_options() { + CommandManager.ClusterScanCursorDetail mockCursor = + Mockito.mock(CommandManager.ClusterScanCursorDetail.class); + when(mockCursor.getCursorHandle()).thenReturn("1"); + + CommandManager.ClusterScanCursorDetail mockResultCursor = + Mockito.mock(CommandManager.ClusterScanCursorDetail.class); + when(mockResultCursor.getCursorHandle()).thenReturn("2"); + + final Object[] result = new Object[] {mockResultCursor.getCursorHandle(), new Object[] {"foo"}}; + final CompletableFuture testResponse = CompletableFuture.completedFuture(result); + when(commandManager.submitClusterScan( + eq(mockCursor), + eq( + ScanOptions.builder() + .matchPattern("key:*") + .count(10L) + .type(ScanOptions.ObjectType.STRING) + .build()), + any())) + .thenReturn(testResponse); + + CompletableFuture actualResponse = + service.scan( + mockCursor, + ScanOptions.builder() + .matchPattern("key:*") + .count(10L) + .type(ScanOptions.ObjectType.STRING) + .build()); + assertEquals( + mockResultCursor.getCursorHandle(), + ((CommandManager.ClusterScanCursorDetail) actualResponse.get()[0]).getCursorHandle()); + } } diff --git a/java/integTest/src/test/java/glide/cluster/ClusterClientTests.java b/java/integTest/src/test/java/glide/cluster/ClusterClientTests.java index 65a4f40572..063dbb3a8d 100644 --- a/java/integTest/src/test/java/glide/cluster/ClusterClientTests.java +++ b/java/integTest/src/test/java/glide/cluster/ClusterClientTests.java @@ -6,15 +6,28 @@ import static glide.TestUtilities.getRandomString; import static glide.api.BaseClient.OK; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assumptions.assumeTrue; import glide.api.RedisClusterClient; +import glide.api.models.commands.scan.ClusterScanCursor; +import glide.api.models.commands.scan.ScanOptions; import glide.api.models.configuration.RedisCredentials; import glide.api.models.exceptions.ClosingException; import glide.api.models.exceptions.RequestException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.Map; +import java.util.Set; +import java.util.UUID; import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; import lombok.SneakyThrows; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; @@ -158,4 +171,377 @@ public void closed_client_throws_ExecutionException_with_ClosingException_as_cau assertThrows(ExecutionException.class, () -> client.set("foo", "bar").get()); assertTrue(executionException.getCause() instanceof ClosingException); } + + @Test + @SneakyThrows + public void test_cluster_scan_simple() { + try (RedisClusterClient client = + RedisClusterClient.CreateClient(commonClusterClientConfig().build()).get()) { + assertEquals(OK, client.flushall().get()); + + String key = "key:test_cluster_scan_simple" + UUID.randomUUID(); + Map expectedData = new LinkedHashMap<>(); + for (int i = 0; i < 100; i++) { + expectedData.put(key + ":" + i, "value " + i); + } + + assertEquals(OK, client.mset(expectedData).get()); + + Set result = new LinkedHashSet<>(); + ClusterScanCursor cursor = ClusterScanCursor.initalCursor(); + while (!cursor.isFinished()) { + final Object[] response = client.scan(cursor).get(); + cursor.releaseCursorHandle(); + + cursor = (ClusterScanCursor) response[0]; + final Object[] data = (Object[]) response[1]; + for (Object datum : data) { + result.add(datum.toString()); + } + } + cursor.releaseCursorHandle(); + + assertEquals(expectedData.keySet(), result); + } + } + + @Test + @SneakyThrows + public void test_cluster_scan_with_object_type_and_pattern() { + try (RedisClusterClient client = + RedisClusterClient.CreateClient(commonClusterClientConfig().build()).get()) { + + assertEquals(OK, client.flushall().get()); + String key = "key:" + UUID.randomUUID(); + Map expectedData = new LinkedHashMap<>(); + final int baseNumberOfEntries = 100; + for (int i = 0; i < baseNumberOfEntries; i++) { + expectedData.put(key + ":" + i, "value " + i); + } + + assertEquals(OK, client.mset(expectedData).get()); + + ArrayList unexpectedTypeKeys = new ArrayList<>(); + for (int i = baseNumberOfEntries; i < baseNumberOfEntries + 100; i++) { + unexpectedTypeKeys.add(key + ":" + i); + } + + for (String keyStr : unexpectedTypeKeys) { + assertEquals(1L, client.sadd(keyStr, new String[] {"value"}).get()); + } + + Map unexpectedPatterns = new LinkedHashMap<>(); + for (int i = baseNumberOfEntries + 100; i < baseNumberOfEntries + 200; i++) { + unexpectedPatterns.put("foo:" + i, "value " + i); + } + assertEquals(OK, client.mset(unexpectedPatterns).get()); + + Set result = new LinkedHashSet<>(); + ClusterScanCursor cursor = ClusterScanCursor.initalCursor(); + while (!cursor.isFinished()) { + final Object[] response = + client + .scan( + cursor, + ScanOptions.builder() + .matchPattern("key:*") + .type(ScanOptions.ObjectType.STRING) + .build()) + .get(); + cursor.releaseCursorHandle(); + + cursor = (ClusterScanCursor) response[0]; + final Object[] data = (Object[]) response[1]; + for (Object datum : data) { + result.add(datum.toString()); + } + } + cursor.releaseCursorHandle(); + assertEquals(expectedData.keySet(), result); + + // Ensure that no unexpected types were in the result. + assertFalse(new LinkedHashSet<>(result).removeAll(new LinkedHashSet<>(unexpectedTypeKeys))); + assertFalse(new LinkedHashSet<>(result).removeAll(unexpectedPatterns.keySet())); + } + } + + @Test + @SneakyThrows + public void test_cluster_scan_with_count() { + try (RedisClusterClient client = + RedisClusterClient.CreateClient(commonClusterClientConfig().build()).get()) { + + assertEquals(OK, client.flushall().get()); + String key = "key:" + UUID.randomUUID(); + Map expectedData = new LinkedHashMap<>(); + final int baseNumberOfEntries = 2000; + for (int i = 0; i < baseNumberOfEntries; i++) { + expectedData.put(key + ":" + i, "value " + i); + } + + assertEquals(OK, client.mset(expectedData).get()); + + ClusterScanCursor cursor = ClusterScanCursor.initalCursor(); + Set keys = new LinkedHashSet<>(); + int successfulComparedScans = 0; + while (!cursor.isFinished()) { + Object[] resultOf1 = client.scan(cursor, ScanOptions.builder().count(1L).build()).get(); + cursor.releaseCursorHandle(); + cursor = (ClusterScanCursor) resultOf1[0]; + keys.addAll( + Arrays.stream((Object[]) resultOf1[1]) + .map(Object::toString) + .collect(Collectors.toList())); + if (cursor.isFinished()) { + break; + } + + Object[] resultOf100 = client.scan(cursor, ScanOptions.builder().count(100L).build()).get(); + cursor.releaseCursorHandle(); + cursor = (ClusterScanCursor) resultOf100[0]; + keys.addAll( + Arrays.stream((Object[]) resultOf100[1]) + .map(Object::toString) + .collect(Collectors.toList())); + + // Note: count is only an optimization hint. It does not have to return the size specified. + if (resultOf1.length <= resultOf100.length) { + successfulComparedScans++; + } + } + cursor.releaseCursorHandle(); + assertTrue(successfulComparedScans > 0); + assertEquals(expectedData.keySet(), keys); + } + } + + @Test + @SneakyThrows + public void test_cluster_scan_with_match() { + try (RedisClusterClient client = + RedisClusterClient.CreateClient(commonClusterClientConfig().build()).get()) { + + assertEquals(OK, client.flushall().get()); + String key = "key:" + UUID.randomUUID(); + Map expectedData = new LinkedHashMap<>(); + final int baseNumberOfEntries = 2000; + for (int i = 0; i < baseNumberOfEntries; i++) { + expectedData.put(key + ":" + i, "value " + i); + } + assertEquals(OK, client.mset(expectedData).get()); + + Map unexpectedPatterns = new LinkedHashMap<>(); + for (int i = baseNumberOfEntries + 100; i < baseNumberOfEntries + 200; i++) { + unexpectedPatterns.put("foo:" + i, "value " + i); + } + assertEquals(OK, client.mset(unexpectedPatterns).get()); + + ClusterScanCursor cursor = ClusterScanCursor.initalCursor(); + Set keys = new LinkedHashSet<>(); + while (!cursor.isFinished()) { + Object[] result = + client.scan(cursor, ScanOptions.builder().matchPattern("key:*").build()).get(); + cursor.releaseCursorHandle(); + cursor = (ClusterScanCursor) result[0]; + keys.addAll( + Arrays.stream((Object[]) result[1]).map(Object::toString).collect(Collectors.toList())); + } + cursor.releaseCursorHandle(); + assertEquals(expectedData.keySet(), keys); + assertFalse(new LinkedHashSet<>(keys).removeAll(unexpectedPatterns.keySet())); + } + } + + @Test + @SneakyThrows + public void test_cluster_scan_cleaning_cursor() { + // We test whether the cursor is cleaned up after it is deleted, which we expect to happen when + // th GC is called. + try (RedisClusterClient client = + RedisClusterClient.CreateClient(commonClusterClientConfig().build()).get()) { + assertEquals(OK, client.flushall().get()); + + String key = "key:" + UUID.randomUUID(); + Map expectedData = new LinkedHashMap<>(); + final int baseNumberOfEntries = 100; + for (int i = 0; i < baseNumberOfEntries; i++) { + expectedData.put(key + ":" + i, "value " + i); + } + assertEquals(OK, client.mset(expectedData).get()); + + ClusterScanCursor cursor = ClusterScanCursor.initalCursor(); + final Object[] response = client.scan(cursor).get(); + cursor = (ClusterScanCursor) (response[0]); + cursor.releaseCursorHandle(); + final ClusterScanCursor brokenCursor = cursor; + ExecutionException exception = + assertThrows(ExecutionException.class, () -> client.scan(brokenCursor).get()); + assertInstanceOf(RequestException.class, exception.getCause()); + assertTrue(exception.getCause().getMessage().contains("Invalid scan_state_cursor id")); + } + } + + @Test + @SneakyThrows + public void test_cluster_scan_all_types() { + try (RedisClusterClient client = + RedisClusterClient.CreateClient(commonClusterClientConfig().build()).get()) { + assertEquals(OK, client.flushall().get()); + + String key = "key:" + UUID.randomUUID(); + Map stringData = new LinkedHashMap<>(); + final int baseNumberOfEntries = 100; + for (int i = 0; i < baseNumberOfEntries; i++) { + stringData.put(key + ":" + i, "value " + i); + } + assertEquals(OK, client.mset(stringData).get()); + + String setKey = "setKey:" + UUID.randomUUID(); + Map setData = new LinkedHashMap<>(); + for (int i = 0; i < baseNumberOfEntries; i++) { + setData.put(setKey + ":" + i, "value " + i); + } + for (String k : setData.keySet()) { + assertEquals(1L, client.sadd(k, new String[] {"value" + k}).get()); + } + + String hashKey = "hashKey:" + UUID.randomUUID(); + Map hashData = new LinkedHashMap<>(); + for (int i = 0; i < baseNumberOfEntries; i++) { + hashData.put(hashKey + ":" + i, "value " + i); + } + for (String k : hashData.keySet()) { + assertEquals(1L, client.hset(k, Map.of("field" + k, "value" + k)).get()); + } + + String listKey = "listKey:" + UUID.randomUUID(); + Map listData = new LinkedHashMap<>(); + for (int i = 0; i < baseNumberOfEntries; i++) { + listData.put(listKey + ":" + i, "value " + i); + } + for (String k : listData.keySet()) { + assertEquals(1L, client.lpush(k, new String[] {"value" + k}).get()); + } + + String zSetKey = "zSetKey:" + UUID.randomUUID(); + Map zSetData = new LinkedHashMap<>(); + for (int i = 0; i < baseNumberOfEntries; i++) { + zSetData.put(zSetKey + ":" + i, "value " + i); + } + for (String k : zSetData.keySet()) { + assertEquals(1L, client.zadd(k, Map.of(k, 1.0)).get()); + } + + String streamKey = "streamKey:" + UUID.randomUUID(); + Map streamData = new LinkedHashMap<>(); + for (int i = 0; i < baseNumberOfEntries; i++) { + streamData.put(streamKey + ":" + i, "value " + i); + } + for (String k : streamData.keySet()) { + assertNotNull(client.xadd(k, Map.of(k, "value " + k)).get()); + } + + ClusterScanCursor cursor = ClusterScanCursor.initalCursor(); + Set results = new LinkedHashSet<>(); + while (!cursor.isFinished()) { + Object[] response = + client + .scan(cursor, ScanOptions.builder().type(ScanOptions.ObjectType.STRING).build()) + .get(); + cursor.releaseCursorHandle(); + cursor = (ClusterScanCursor) response[0]; + results.addAll( + Arrays.stream((Object[]) response[1]) + .map(Object::toString) + .collect(Collectors.toSet())); + } + cursor.releaseCursorHandle(); + assertEquals(stringData.keySet(), results); + + cursor = ClusterScanCursor.initalCursor(); + results.clear(); + while (!cursor.isFinished()) { + Object[] response = + client + .scan(cursor, ScanOptions.builder().type(ScanOptions.ObjectType.SET).build()) + .get(); + cursor.releaseCursorHandle(); + cursor = (ClusterScanCursor) response[0]; + results.addAll( + Arrays.stream((Object[]) response[1]) + .map(Object::toString) + .collect(Collectors.toSet())); + } + cursor.releaseCursorHandle(); + assertEquals(setData.keySet(), results); + + cursor = ClusterScanCursor.initalCursor(); + results.clear(); + while (!cursor.isFinished()) { + Object[] response = + client + .scan(cursor, ScanOptions.builder().type(ScanOptions.ObjectType.HASH).build()) + .get(); + cursor.releaseCursorHandle(); + cursor = (ClusterScanCursor) response[0]; + results.addAll( + Arrays.stream((Object[]) response[1]) + .map(Object::toString) + .collect(Collectors.toSet())); + } + cursor.releaseCursorHandle(); + assertEquals(hashData.keySet(), results); + + cursor = ClusterScanCursor.initalCursor(); + results.clear(); + while (!cursor.isFinished()) { + Object[] response = + client + .scan(cursor, ScanOptions.builder().type(ScanOptions.ObjectType.LIST).build()) + .get(); + cursor.releaseCursorHandle(); + cursor = (ClusterScanCursor) response[0]; + results.addAll( + Arrays.stream((Object[]) response[1]) + .map(Object::toString) + .collect(Collectors.toSet())); + } + cursor.releaseCursorHandle(); + assertEquals(listData.keySet(), results); + + cursor = ClusterScanCursor.initalCursor(); + results.clear(); + while (!cursor.isFinished()) { + Object[] response = + client + .scan(cursor, ScanOptions.builder().type(ScanOptions.ObjectType.ZSET).build()) + .get(); + cursor.releaseCursorHandle(); + cursor = (ClusterScanCursor) response[0]; + results.addAll( + Arrays.stream((Object[]) response[1]) + .map(Object::toString) + .collect(Collectors.toSet())); + } + cursor.releaseCursorHandle(); + assertEquals(zSetData.keySet(), results); + + cursor = ClusterScanCursor.initalCursor(); + results.clear(); + while (!cursor.isFinished()) { + Object[] response = + client + .scan(cursor, ScanOptions.builder().type(ScanOptions.ObjectType.STREAM).build()) + .get(); + cursor.releaseCursorHandle(); + cursor = (ClusterScanCursor) response[0]; + results.addAll( + Arrays.stream((Object[]) response[1]) + .map(Object::toString) + .collect(Collectors.toSet())); + } + cursor.releaseCursorHandle(); + assertEquals(streamData.keySet(), results); + } + } } diff --git a/java/src/lib.rs b/java/src/lib.rs index f8e6cda231..5a8282897d 100644 --- a/java/src/lib.rs +++ b/java/src/lib.rs @@ -2,7 +2,16 @@ * Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0 */ use glide_core::start_socket_listener as start_socket_listener_core; + +// Protocol constants to expose to Java. +use glide_core::client::FINISHED_SCAN_CURSOR; +use glide_core::HASH as TYPE_HASH; +use glide_core::LIST as TYPE_LIST; use glide_core::MAX_REQUEST_ARGS_LENGTH as MAX_REQUEST_ARGS_LENGTH_IN_BYTES; +use glide_core::SET as TYPE_SET; +use glide_core::STREAM as TYPE_STREAM; +use glide_core::STRING as TYPE_STRING; +use glide_core::ZSET as TYPE_ZSET; use bytes::Bytes; use jni::errors::Error as JniError; @@ -418,3 +427,170 @@ pub extern "system" fn Java_glide_ffi_resolvers_LoggerResolver_initInternal<'loc ) .unwrap_or(0) } + +/// Releases a ClusterScanCursor handle allocated in Rust. +/// +/// This function is meant to be invoked by Java using JNI. +/// +/// * `_env` - The JNI environment. Not used. +/// * `_class` - The class object. Not used. +/// * cursor - The cursor handle to release. +#[no_mangle] +pub extern "system" fn Java_glide_ffi_resolvers_ClusterScanCursorResolver_releaseNativeCursor< + 'local, +>( + mut env: JNIEnv<'local>, + _class: JClass<'local>, + cursor: JString<'local>, +) { + handle_panics( + move || { + fn release_native_cursor( + env: &mut JNIEnv<'_>, + cursor: JString<'_>, + ) -> Result<(), FFIError> { + let cursor_str: String = env.get_string(&cursor)?.into(); + glide_core::cluster_scan_container::remove_scan_state_cursor(cursor_str); + Ok(()) + } + let result = release_native_cursor(&mut env, cursor); + handle_errors(&mut env, result) + }, + "releaseNativeCursor", + ) + .unwrap_or(()) +} + +/// Returns the String representing a finished cursor handle. +/// +/// This function is meant to be invoked by Java using JNI. This is used to ensure +/// that this constant is consistent with the Rust client. +/// +/// * `env` - The JNI environment. +/// * `_class` - The class object. Not used. +#[no_mangle] +pub extern "system" fn Java_glide_ffi_resolvers_ClusterScanCursorResolver_getFinishedCursorHandleConstant< + 'local, +>( + env: JNIEnv<'local>, + _class: JClass<'local>, +) -> JString<'local> { + safe_create_jstring(env, FINISHED_SCAN_CURSOR, "getFinishedCursorHandleConstant") +} + +/// Returns the String representing the name of the ObjectType String. +/// +/// This function is meant to be invoked by Java using JNI. This is used to ensure +/// that this constant is consistent with the Rust client. +/// +/// * `env` - The JNI environment. +/// * `_class` - The class object. Not used. +#[no_mangle] +pub extern "system" fn Java_glide_ffi_resolvers_ObjectTypeResolver_getTypeStringConstant<'local>( + env: JNIEnv<'local>, + _class: JClass<'local>, +) -> JString<'local> { + safe_create_jstring(env, TYPE_STRING, "getTypeStringConstant") +} + +/// Returns the String representing the name of the ObjectType List. +/// +/// This function is meant to be invoked by Java using JNI. This is used to ensure +/// that this constant is consistent with the Rust client. +/// +/// * `env` - The JNI environment. +/// * `_class` - The class object. Not used. +#[no_mangle] +pub extern "system" fn Java_glide_ffi_resolvers_ObjectTypeResolver_getTypeListConstant<'local>( + env: JNIEnv<'local>, + _class: JClass<'local>, +) -> JString<'local> { + safe_create_jstring(env, TYPE_LIST, "getTypeListConstant") +} + +/// Returns the String representing the name of the ObjectType Set. +/// +/// This function is meant to be invoked by Java using JNI. This is used to ensure +/// that this constant is consistent with the Rust client. +/// +/// * `env` - The JNI environment. +/// * `_class` - The class object. Not used. +#[no_mangle] +pub extern "system" fn Java_glide_ffi_resolvers_ObjectTypeResolver_getTypeSetConstant<'local>( + env: JNIEnv<'local>, + _class: JClass<'local>, +) -> JString<'local> { + safe_create_jstring(env, TYPE_SET, "getTypeSetConstant") +} + +/// Returns the String representing the name of the ObjectType ZSet. +/// +/// This function is meant to be invoked by Java using JNI. This is used to ensure +/// that this constant is consistent with the Rust client. +/// +/// * `env` - The JNI environment. +/// * `_class` - The class object. Not used. +#[no_mangle] +pub extern "system" fn Java_glide_ffi_resolvers_ObjectTypeResolver_getTypeZSetConstant<'local>( + env: JNIEnv<'local>, + _class: JClass<'local>, +) -> JString<'local> { + safe_create_jstring(env, TYPE_ZSET, "getTypeZSetConstant") +} + +/// Returns the String representing the name of the ObjectType Hash. +/// +/// This function is meant to be invoked by Java using JNI. This is used to ensure +/// that this constant is consistent with the Rust client. +/// +/// * `env` - The JNI environment. +/// * `_class` - The class object. Not used. +#[no_mangle] +pub extern "system" fn Java_glide_ffi_resolvers_ObjectTypeResolver_getTypeHashConstant<'local>( + env: JNIEnv<'local>, + _class: JClass<'local>, +) -> JString<'local> { + safe_create_jstring(env, TYPE_HASH, "getTypeHashConstant") +} + +/// Returns the String representing the name of the ObjectType Set. +/// +/// This function is meant to be invoked by Java using JNI. This is used to ensure +/// that this constant is consistent with the Rust client. +/// +/// * `env` - The JNI environment. +/// * `_class` - The class object. Not used. +#[no_mangle] +pub extern "system" fn Java_glide_ffi_resolvers_ObjectTypeResolver_getTypeStreamConstant<'local>( + env: JNIEnv<'local>, + _class: JClass<'local>, +) -> JString<'local> { + safe_create_jstring(env, TYPE_STREAM, "getTypeStreamConstant") +} + +/// Convert a Rust string to a Java String and handle errors. +/// +/// * `env` - The JNI environment. +/// * `_class` - The class object. Not used. +/// * `input` - The String to convert. +/// * `functionName` - The name of the calling function. +fn safe_create_jstring<'local>( + mut env: JNIEnv<'local>, + input: &str, + function_name: &str, +) -> JString<'local> { + handle_panics( + move || { + fn create_jstring<'a>( + env: &mut JNIEnv<'a>, + input: &str, + ) -> Result, FFIError> { + Ok(env.new_string(input)?) + } + let result = create_jstring(&mut env, input); + handle_errors(&mut env, result) + }, + function_name, + ) + .unwrap_or(JString::<'_>::default()) +}