diff --git a/glide-core/src/client/mod.rs b/glide-core/src/client/mod.rs index 9961e3cf1c..527c38ddd3 100644 --- a/glide-core/src/client/mod.rs +++ b/glide-core/src/client/mod.rs @@ -204,6 +204,7 @@ fn get_request_timeout(cmd: &Cmd, default_timeout: Duration) -> RedisResult get_timeout_from_cmd_arg(cmd, 2, TimeUnit::Milliseconds), _ => Ok(RequestTimeoutOption::ClientConfig), }?; @@ -736,6 +737,17 @@ mod tests { 0.857 + BLOCKING_CMD_TIMEOUT_EXTENSION )) ); + + let mut cmd = Cmd::new(); + cmd.arg("WAIT").arg(1).arg("500"); + let result = get_request_timeout(&cmd, Duration::from_millis(500)); + assert!(result.is_ok()); + assert_eq!( + result.unwrap(), + Some(Duration::from_secs_f64( + 0.5 + BLOCKING_CMD_TIMEOUT_EXTENSION + )) + ); } #[test] diff --git a/glide-core/src/protobuf/redis_request.proto b/glide-core/src/protobuf/redis_request.proto index 0c09cf9650..dc1f48735e 100644 --- a/glide-core/src/protobuf/redis_request.proto +++ b/glide-core/src/protobuf/redis_request.proto @@ -241,6 +241,7 @@ enum RequestType { SScan = 200; ZScan = 201; HScan = 202; + Wait = 208; } message Command { diff --git a/glide-core/src/request_type.rs b/glide-core/src/request_type.rs index ba6285f8ff..ef4a7f1f80 100644 --- a/glide-core/src/request_type.rs +++ b/glide-core/src/request_type.rs @@ -211,6 +211,7 @@ pub enum RequestType { SScan = 200, ZScan = 201, HScan = 202, + Wait = 208, } fn get_two_word_command(first: &str, second: &str) -> Cmd { @@ -425,6 +426,7 @@ impl From<::protobuf::EnumOrUnknown> for RequestType { ProtobufRequestType::SScan => RequestType::SScan, ProtobufRequestType::ZScan => RequestType::ZScan, ProtobufRequestType::HScan => RequestType::HScan, + ProtobufRequestType::Wait => RequestType::Wait, } } } @@ -637,6 +639,7 @@ impl RequestType { RequestType::SScan => Some(cmd("SSCAN")), RequestType::ZScan => Some(cmd("ZSCAN")), RequestType::HScan => Some(cmd("HSCAN")), + RequestType::Wait => Some(cmd("WAIT")), } } } diff --git a/java/client/src/main/java/glide/api/BaseClient.java b/java/client/src/main/java/glide/api/BaseClient.java index ea4cb995c4..188576579b 100644 --- a/java/client/src/main/java/glide/api/BaseClient.java +++ b/java/client/src/main/java/glide/api/BaseClient.java @@ -131,6 +131,7 @@ import static redis_request.RedisRequestOuterClass.RequestType.Touch; import static redis_request.RedisRequestOuterClass.RequestType.Type; import static redis_request.RedisRequestOuterClass.RequestType.Unlink; +import static redis_request.RedisRequestOuterClass.RequestType.Wait; import static redis_request.RedisRequestOuterClass.RequestType.Watch; import static redis_request.RedisRequestOuterClass.RequestType.XAck; import static redis_request.RedisRequestOuterClass.RequestType.XAdd; @@ -2950,4 +2951,12 @@ public CompletableFuture hscan( String[] arguments = concatenateArrays(new String[] {key, cursor}, hScanOptions.toArgs()); return commandManager.submitNewCommand(HScan, arguments, this::handleArrayResponse); } + + @Override + public CompletableFuture wait(long numreplicas, long timeout) { + return commandManager.submitNewCommand( + Wait, + new String[] {Long.toString(numreplicas), Long.toString(timeout)}, + this::handleLongResponse); + } } diff --git a/java/client/src/main/java/glide/api/commands/GenericBaseCommands.java b/java/client/src/main/java/glide/api/commands/GenericBaseCommands.java index e440b969d7..9621f5a33c 100644 --- a/java/client/src/main/java/glide/api/commands/GenericBaseCommands.java +++ b/java/client/src/main/java/glide/api/commands/GenericBaseCommands.java @@ -1207,4 +1207,22 @@ CompletableFuture restore( * } */ CompletableFuture sortStore(String key, String destination); + + /** + * Blocks the current client until all the previous write commands are successfully transferred + * and acknowledged by at least numreplicas of replicas. If timeout is + * reached, the command returns even if the specified number of replicas were not yet reached. + * + * @param numreplicas The number of replicas to reach. + * @param timeout The timeout value specified in milliseconds. A value of 0 will + * block indefinitely. + * @return The number of replicas reached by all the writes performed in the context of the + * current connection. + * @example + *
{@code
+     * client.set("key", "value).get();
+     * assert client.wait(1L, 1000L).get() == 1L;
+     * }
+ */ + CompletableFuture wait(long numreplicas, long timeout); } diff --git a/java/client/src/main/java/glide/api/models/BaseTransaction.java b/java/client/src/main/java/glide/api/models/BaseTransaction.java index 33738ed7d0..32cbc482d3 100644 --- a/java/client/src/main/java/glide/api/models/BaseTransaction.java +++ b/java/client/src/main/java/glide/api/models/BaseTransaction.java @@ -159,6 +159,7 @@ import static redis_request.RedisRequestOuterClass.RequestType.Touch; import static redis_request.RedisRequestOuterClass.RequestType.Type; import static redis_request.RedisRequestOuterClass.RequestType.Unlink; +import static redis_request.RedisRequestOuterClass.RequestType.Wait; import static redis_request.RedisRequestOuterClass.RequestType.XAck; import static redis_request.RedisRequestOuterClass.RequestType.XAdd; import static redis_request.RedisRequestOuterClass.RequestType.XDel; @@ -5623,6 +5624,22 @@ public T hscan(@NonNull String key, @NonNull String cursor, @NonNull HScanOption return getThis(); } + /** + * Returns the number of replicas that acknowledged the write commands sent by the current client + * before this command, both in the case where the specified number of replicas are reached, or + * when the timeout is reached. + * + * @param numreplicas The number of replicas to reach. + * @param timeout The timeout value specified in milliseconds. + * @return Command Response - The number of replicas reached by all the writes performed in the + * context of the current connection. + */ + public T wait(long numreplicas, long timeout) { + String[] args = buildArgs(Long.toString(numreplicas), Long.toString(timeout)); + protobufTransaction.addCommands(buildCommand(Wait, args)); + return getThis(); + } + /** Build protobuf {@link Command} object for given command and arguments. */ protected Command buildCommand(RequestType requestType) { // An empty args array is still needed for parameter-less commands. diff --git a/java/client/src/test/java/glide/api/RedisClientTest.java b/java/client/src/test/java/glide/api/RedisClientTest.java index 03a6a0f181..ea21fde707 100644 --- a/java/client/src/test/java/glide/api/RedisClientTest.java +++ b/java/client/src/test/java/glide/api/RedisClientTest.java @@ -216,6 +216,7 @@ import static redis_request.RedisRequestOuterClass.RequestType.Type; import static redis_request.RedisRequestOuterClass.RequestType.UnWatch; import static redis_request.RedisRequestOuterClass.RequestType.Unlink; +import static redis_request.RedisRequestOuterClass.RequestType.Wait; import static redis_request.RedisRequestOuterClass.RequestType.Watch; import static redis_request.RedisRequestOuterClass.RequestType.XAck; import static redis_request.RedisRequestOuterClass.RequestType.XAdd; @@ -9002,6 +9003,30 @@ public void sortStore_with_options_returns_success() { assertEquals(result, payload); } + @SneakyThrows + @Test + public void wait_returns_success() { + // setup + long numreplicas = 1L; + long timeout = 1000L; + Long result = 5L; + String[] args = new String[] {"1", "1000"}; + + CompletableFuture testResponse = new CompletableFuture<>(); + testResponse.complete(result); + + // match on protobuf request + when(commandManager.submitNewCommand(eq(Wait), eq(args), any())).thenReturn(testResponse); + + // exercise + CompletableFuture response = service.wait(numreplicas, timeout); + Long payload = response.get(); + + // verify + assertEquals(testResponse, response); + assertEquals(result, payload); + } + @SneakyThrows @Test public void sscan_with_options_returns_success() { diff --git a/java/client/src/test/java/glide/api/models/TransactionTests.java b/java/client/src/test/java/glide/api/models/TransactionTests.java index 7ff892095b..121615eeae 100644 --- a/java/client/src/test/java/glide/api/models/TransactionTests.java +++ b/java/client/src/test/java/glide/api/models/TransactionTests.java @@ -180,6 +180,7 @@ import static redis_request.RedisRequestOuterClass.RequestType.Touch; import static redis_request.RedisRequestOuterClass.RequestType.Type; import static redis_request.RedisRequestOuterClass.RequestType.Unlink; +import static redis_request.RedisRequestOuterClass.RequestType.Wait; import static redis_request.RedisRequestOuterClass.RequestType.XAck; import static redis_request.RedisRequestOuterClass.RequestType.XAdd; import static redis_request.RedisRequestOuterClass.RequestType.XDel; @@ -1387,6 +1388,9 @@ InfScoreBound.NEGATIVE_INFINITY, new ScoreBoundary(3, false), new Limit(1, 2)), HScanOptions.COUNT_OPTION_STRING, "10"))); + transaction.wait(1L, 1000L); + results.add(Pair.of(Wait, buildArgs("1", "1000"))); + var protobufTransaction = transaction.getProtobufTransaction().build(); for (int idx = 0; idx < protobufTransaction.getCommandsCount(); idx++) { diff --git a/java/integTest/src/test/java/glide/SharedCommandTests.java b/java/integTest/src/test/java/glide/SharedCommandTests.java index 78db33e715..4a3ce815a2 100644 --- a/java/integTest/src/test/java/glide/SharedCommandTests.java +++ b/java/integTest/src/test/java/glide/SharedCommandTests.java @@ -8054,4 +8054,47 @@ public void hscan(BaseClient client) { () -> client.hscan(key1, "-1", HScanOptions.builder().count(-1L).build()).get()); assertInstanceOf(RequestException.class, executionException.getCause()); } + + @SneakyThrows + @ParameterizedTest(autoCloseArguments = false) + @MethodSource("getClients") + public void waitTest(BaseClient client) { + // setup + String key = UUID.randomUUID().toString(); + long numreplicas = 1L; + long timeout = 1000L; + + // assert that wait returns 0 under standalone and 1 under cluster mode. + assertEquals(OK, client.set(key, "value").get()); + assertTrue(client.wait(numreplicas, timeout).get() >= (client instanceof RedisClient ? 0 : 1)); + + // command should fail on a negative timeout value + ExecutionException executionException = + assertThrows(ExecutionException.class, () -> client.wait(1L, -1L).get()); + assertInstanceOf(RequestException.class, executionException.getCause()); + } + + @SneakyThrows + @ParameterizedTest(autoCloseArguments = false) + @MethodSource("getClients") + public void wait_timeout_check(BaseClient client) { + String key = UUID.randomUUID().toString(); + // create new client with default request timeout (250 millis) + try (var testClient = + client instanceof RedisClient + ? RedisClient.CreateClient(commonClientConfig().build()).get() + : RedisClusterClient.CreateClient(commonClusterClientConfig().build()).get()) { + + // ensure that commands do not time out, even if timeout > request timeout + assertEquals(OK, testClient.set(key, "value").get()); + assertEquals((client instanceof RedisClient ? 0 : 1), testClient.wait(1L, 1000L).get()); + + // with 0 timeout (no timeout) wait should block indefinitely, + // but we wrap the test with timeout to avoid test failing or being stuck forever + assertEquals(OK, testClient.set(key, "value2").get()); + assertThrows( + TimeoutException.class, // <- future timeout, not command timeout + () -> testClient.wait(100L, 0L).get(1000, TimeUnit.MILLISECONDS)); + } + } } diff --git a/java/integTest/src/test/java/glide/cluster/ClusterTransactionTests.java b/java/integTest/src/test/java/glide/cluster/ClusterTransactionTests.java index 8ab06923af..4f24b6b327 100644 --- a/java/integTest/src/test/java/glide/cluster/ClusterTransactionTests.java +++ b/java/integTest/src/test/java/glide/cluster/ClusterTransactionTests.java @@ -308,4 +308,24 @@ public void sort() { assertDeepEquals(expectedResult, results); } + + @SneakyThrows + @Test + public void waitTest() { + // setup + String key = UUID.randomUUID().toString(); + long numreplicas = 1L; + long timeout = 1000L; + ClusterTransaction transaction = new ClusterTransaction(); + + transaction.set(key, "value").wait(numreplicas, timeout); + Object[] results = clusterClient.exec(transaction).get(); + Object[] expectedResult = + new Object[] { + OK, // set(key, "value") + 0L, // wait(numreplicas, timeout) + }; + assertEquals(expectedResult[0], results[0]); + assertTrue((Long) expectedResult[1] <= (Long) results[1]); + } } diff --git a/java/integTest/src/test/java/glide/standalone/TransactionTests.java b/java/integTest/src/test/java/glide/standalone/TransactionTests.java index 811f89a091..3b4e4ca186 100644 --- a/java/integTest/src/test/java/glide/standalone/TransactionTests.java +++ b/java/integTest/src/test/java/glide/standalone/TransactionTests.java @@ -492,4 +492,26 @@ public void sort_and_sortReadOnly() { assertArrayEquals(expectedResults, client.exec(transaction2).get()); } } + + @SneakyThrows + @Test + public void waitTest() { + // setup + String key = UUID.randomUUID().toString(); + long numreplicas = 1L; + long timeout = 1000L; + Transaction transaction = new Transaction(); + + transaction.set(key, "value"); + transaction.wait(numreplicas, timeout); + + Object[] results = client.exec(transaction).get(); + Object[] expectedResult = + new Object[] { + OK, // set(key, "value") + 0L, // wait(numreplicas, timeout) + }; + assertEquals(expectedResult[0], results[0]); + assertTrue((long) expectedResult[1] <= (long) results[1]); + } }