Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Lua binary support #1729

Merged
merged 16 commits into from
Jul 2, 2024
4 changes: 2 additions & 2 deletions glide-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ authors = ["Amazon Web Services"]
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
bytes = { version = "^1.3", optional = true }
bytes = "1"
futures = "^0.3"
redis = { path = "../submodules/redis-rs/redis", features = ["aio", "tokio-comp", "tokio-rustls-comp", "connection-manager","cluster", "cluster-async"] }
tokio = { version = "1", features = ["macros", "time"] }
Expand All @@ -28,7 +28,7 @@ arcstr = "1.1.5"
sha1_smol = "1.0.0"

[features]
socket-layer = ["directories", "integer-encoding", "num_cpus", "protobuf", "tokio-util", "bytes"]
socket-layer = ["directories", "integer-encoding", "num_cpus", "protobuf", "tokio-util"]

[dev-dependencies]
rsevents = "0.3.1"
Expand Down
17 changes: 8 additions & 9 deletions glide-core/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ use redis::{Cmd, ErrorKind, PushInfo, Value};
use redis::{RedisError, RedisResult};
pub use standalone_client::StandaloneClient;
use std::io;
use std::ops::Deref;
use std::time::Duration;
pub use types::*;

Expand Down Expand Up @@ -327,11 +326,11 @@ impl Client {
.boxed()
}

pub async fn invoke_script<'a, T: Deref<Target = str>>(
pub async fn invoke_script<'a>(
&'a mut self,
hash: &'a str,
keys: Vec<T>,
args: Vec<T>,
keys: &Vec<&[u8]>,
args: &Vec<&[u8]>,
routing: Option<RoutingInfo>,
) -> redis::RedisResult<Value> {
let eval = eval_cmd(hash, keys, args);
Expand All @@ -343,7 +342,7 @@ impl Client {
let Some(code) = get_script(hash) else {
return Err(err);
};
let load = load_cmd(code.as_str());
let load = load_cmd(&code);
self.send_command(&load, None).await?;
self.send_command(&eval, routing).await
} else {
Expand All @@ -352,20 +351,20 @@ impl Client {
}
}

fn load_cmd(code: &str) -> Cmd {
fn load_cmd(code: &[u8]) -> Cmd {
let mut cmd = redis::cmd("SCRIPT");
cmd.arg("LOAD").arg(code);
cmd
}

fn eval_cmd<T: Deref<Target = str>>(hash: &str, keys: Vec<T>, args: Vec<T>) -> Cmd {
fn eval_cmd(hash: &str, keys: &Vec<&[u8]>, args: &Vec<&[u8]>) -> Cmd {
let mut cmd = redis::cmd("EVALSHA");
cmd.arg(hash).arg(keys.len());
for key in keys {
cmd.arg(&*key);
cmd.arg(key);
}
for arg in args {
cmd.arg(&*arg);
cmd.arg(arg);
}
cmd
}
Expand Down
4 changes: 2 additions & 2 deletions glide-core/src/protobuf/redis_request.proto
Original file line number Diff line number Diff line change
Expand Up @@ -258,8 +258,8 @@ message Command {

message ScriptInvocation {
string hash = 1;
repeated string keys = 2;
repeated string args = 3;
repeated bytes keys = 2;
repeated bytes args = 3;
}

message Transaction {
Expand Down
16 changes: 9 additions & 7 deletions glide-core/src/scripts_container.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@
use bytes::BytesMut;
/**
* Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0
*/
use arcstr::ArcStr;
use logger_core::log_info;
use once_cell::sync::Lazy;
use sha1_smol::Sha1;
use std::{collections::HashMap, sync::Mutex};
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

static CONTAINER: Lazy<Mutex<HashMap<String, ArcStr>>> = Lazy::new(|| Mutex::new(HashMap::new()));
static CONTAINER: Lazy<Mutex<HashMap<String, Arc<BytesMut>>>> =
Lazy::new(|| Mutex::new(HashMap::new()));

pub fn add_script(script: &str) -> String {
pub fn add_script(script: &[u8]) -> String {
let mut hash = Sha1::new();
hash.update(script.as_bytes());
hash.update(script);
let hash = hash.digest().to_string();
log_info(
"script lifetime",
Expand All @@ -20,11 +22,11 @@ pub fn add_script(script: &str) -> String {
CONTAINER
.lock()
.unwrap()
.insert(hash.clone(), script.into());
.insert(hash.clone(), Arc::new(script.into()));
hash
}

pub fn get_script(hash: &str) -> Option<ArcStr> {
pub fn get_script(hash: &str) -> Option<Arc<BytesMut>> {
CONTAINER.lock().unwrap().get(hash).cloned()
}

Expand Down
5 changes: 4 additions & 1 deletion glide-core/src/socket_listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -316,8 +316,11 @@ async fn invoke_script(
mut client: Client,
routing: Option<RoutingInfo>,
) -> ClientUsageResult<Value> {
// convert Vec<bytes> to vec<[u8]>
let keys: Vec<&[u8]> = script.keys.iter().map(|e| e.as_ref()).collect();
let args: Vec<&[u8]> = script.args.iter().map(|e| e.as_ref()).collect();
client
.invoke_script(&script.hash, script.keys, script.args, routing)
.invoke_script(&script.hash, &keys, &args, routing)
.await
.map_err(|err| err.into())
}
Expand Down
2 changes: 1 addition & 1 deletion glide-core/tests/test_socket_listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1211,7 +1211,7 @@ mod socket_listener {
let key = generate_random_string(KEY_LENGTH);
let value = generate_random_string(VALUE_LENGTH);
let script = r#"redis.call("SET", KEYS[1], ARGV[1]); return redis.call("GET", KEYS[1])"#;
let hash = add_script(script);
let hash = add_script(script.as_bytes());

let approx_message_length = hash.len() + value.len() + key.len() + APPROX_RESP_HEADER_LEN;
let mut buffer = Vec::with_capacity(approx_message_length);
Expand Down
41 changes: 37 additions & 4 deletions java/client/src/main/java/glide/api/BaseClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@
import glide.api.models.commands.RestoreOptions;
import glide.api.models.commands.ScoreFilter;
import glide.api.models.commands.ScriptOptions;
import glide.api.models.commands.ScriptOptionsGlideString;
import glide.api.models.commands.SetOptions;
import glide.api.models.commands.WeightAggregateOptions.Aggregate;
import glide.api.models.commands.WeightAggregateOptions.KeyArray;
Expand Down Expand Up @@ -392,6 +393,10 @@ protected Object handleObjectOrNullResponse(Response response) throws RedisExcep
Object.class, EnumSet.of(ResponseFlags.IS_NULLABLE, ResponseFlags.ENCODING_UTF8), response);
}

protected Object handleBinaryObjectOrNullResponse(Response response) throws RedisException {
return handleRedisResponse(Object.class, EnumSet.of(ResponseFlags.IS_NULLABLE), response);
}

protected String handleStringResponse(Response response) throws RedisException {
return handleRedisResponse(String.class, EnumSet.of(ResponseFlags.ENCODING_UTF8), response);
}
Expand Down Expand Up @@ -1439,15 +1444,43 @@ public CompletableFuture<Long> pexpiretime(@NonNull GlideString key) {

@Override
public CompletableFuture<Object> invokeScript(@NonNull Script script) {
return commandManager.submitScript(
script, List.of(), List.of(), this::handleObjectOrNullResponse);
if (script.getBinarySafeOutput()) {
return commandManager.submitScript(
script, List.of(), List.of(), this::handleObjectOrNullResponse);
} else {
return commandManager.submitScript(
script, List.of(), List.of(), this::handleBinaryObjectOrNullResponse);
eifrah-aws marked this conversation as resolved.
Show resolved Hide resolved
}
}

@Override
public CompletableFuture<Object> invokeScript(
@NonNull Script script, @NonNull ScriptOptions options) {
return commandManager.submitScript(
script, options.getKeys(), options.getArgs(), this::handleObjectOrNullResponse);
if (script.getBinarySafeOutput()) {
return commandManager.submitScript(
script,
options.getKeys().stream().map(GlideString::gs).collect(Collectors.toList()),
options.getArgs().stream().map(GlideString::gs).collect(Collectors.toList()),
this::handleObjectOrNullResponse);
eifrah-aws marked this conversation as resolved.
Show resolved Hide resolved
} else {
return commandManager.submitScript(
script,
options.getKeys().stream().map(GlideString::gs).collect(Collectors.toList()),
options.getArgs().stream().map(GlideString::gs).collect(Collectors.toList()),
this::handleBinaryObjectOrNullResponse);
eifrah-aws marked this conversation as resolved.
Show resolved Hide resolved
}
}

@Override
public CompletableFuture<Object> invokeScript(
@NonNull Script script, @NonNull ScriptOptionsGlideString options) {
if (script.getBinarySafeOutput()) {
return commandManager.submitScript(
script, options.getKeys(), options.getArgs(), this::handleObjectOrNullResponse);
eifrah-aws marked this conversation as resolved.
Show resolved Hide resolved
} else {
return commandManager.submitScript(
script, options.getKeys(), options.getArgs(), this::handleBinaryObjectOrNullResponse);
eifrah-aws marked this conversation as resolved.
Show resolved Hide resolved
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import glide.api.models.commands.ExpireOptions;
import glide.api.models.commands.RestoreOptions;
import glide.api.models.commands.ScriptOptions;
import glide.api.models.commands.ScriptOptionsGlideString;
import glide.api.models.configuration.ReadFrom;
import java.util.concurrent.CompletableFuture;

Expand Down Expand Up @@ -619,6 +620,7 @@ CompletableFuture<Boolean> pexpireAt(
* @example
* <pre>{@code
* try(Script luaScript = new Script("return 'Hello'")) {
* luaScript.setBinarySafeOutput(true);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
* luaScript.setBinarySafeOutput(true);

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

* String result = (String) client.invokeScript(luaScript).get();
* assert result.equals("Hello");
* }
Expand All @@ -642,6 +644,7 @@ CompletableFuture<Boolean> pexpireAt(
* @example
* <pre>{@code
* try(Script luaScript = new Script("return { KEYS[1], ARGV[1] }")) {
* luaScript.setBinarySafeOutput(true);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
* luaScript.setBinarySafeOutput(true);

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

* ScriptOptions scriptOptions = ScriptOptions.builder().key("foo").arg("bar").build();
* Object[] result = (Object[]) client.invokeScript(luaScript, scriptOptions).get();
* assert result[0].equals("foo");
Expand All @@ -651,6 +654,31 @@ CompletableFuture<Boolean> pexpireAt(
*/
CompletableFuture<Object> invokeScript(Script script, ScriptOptions options);

/**
* Invokes a Lua script with its keys and arguments.<br>
* This method simplifies the process of invoking scripts on a Redis server by using an object
* that represents a Lua script. The script loading, argument preparation, and execution will all
* be handled internally. If the script has not already been loaded, it will be loaded
* automatically using the Redis <code>SCRIPT LOAD</code> command. After that, it will be invoked
* using the Redis <code>EVALSHA</code> command.
*
* @see <a href="https://redis.io/commands/script-load/">SCRIPT LOAD</a> and <a
* href="https://redis.io/commands/evalsha/">EVALSHA</a> for details.
* @param script The Lua script to execute.
* @param options The script option that contains keys and arguments for the script.
* @return a value that depends on the script that was executed.
* @example
* <pre>{@code
* try(Script luaScript = new Script(gs("return { KEYS[1], ARGV[1] }"))) {
* ScriptOptionsGlideString scriptOptions = ScriptOptionsGlideString.builder().key(gs("foo")).arg(gs("bar")).build();
* Object[] result = (Object[]) client.invokeScript(luaScript, scriptOptions).get();
* assert result[0].equals(gs("foo"));
* assert result[1].equals(gs("bar"));
* }
* }</pre>
*/
CompletableFuture<Object> invokeScript(Script script, ScriptOptionsGlideString options);
eifrah-aws marked this conversation as resolved.
Show resolved Hide resolved

/**
* Returns the remaining time to live of <code>key</code> that has a timeout, in milliseconds.
*
Expand Down
16 changes: 15 additions & 1 deletion java/client/src/main/java/glide/api/models/Script.java
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
/** Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0 */
package glide.api.models;

import static glide.api.models.GlideString.gs;
import static glide.ffi.resolvers.ScriptResolver.dropScript;
import static glide.ffi.resolvers.ScriptResolver.storeScript;

import glide.api.commands.GenericBaseCommands;
import lombok.Getter;
import lombok.Setter;

/**
* A wrapper for a Script object for {@link GenericBaseCommands#invokeScript(Script)} As long as
Expand All @@ -18,13 +20,25 @@ public class Script implements AutoCloseable {
/** Hash string representing the code. */
@Getter private final String hash;

/** Indicatoin if script invocation output can return binary data. */
@Getter @Setter private Boolean binarySafeOutput = false;
Yury-Fridlyand marked this conversation as resolved.
Show resolved Hide resolved

/**
* Wraps around creating a Script object from <code>code</code>.
*
* @param code To execute with a ScriptInvoke call.
*/
public Script(String code) {
hash = storeScript(code);
hash = storeScript(gs(code).getBytes());
}

/**
* Wraps around creating a Script object from <code>code</code>.
*
* @param code To execute with a ScriptInvoke call.
*/
public Script(GlideString code) {
hash = storeScript(code.getBytes());
}

/** Drop the linked script from glide-rs <code>code</code>. */
Expand Down
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Replace existing options without introducing new classes

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not fully understand

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Patch existing class to store GlideString instead of String

Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/** Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0 */
package glide.api.models.commands;

import glide.api.commands.GenericBaseCommands;
import glide.api.models.GlideString;
import glide.api.models.Script;
import java.util.List;
import lombok.Builder;
import lombok.Getter;
import lombok.Singular;

/**
* Optional arguments for {@link GenericBaseCommands#invokeScript(Script, ScriptOptionsGlideString)}
* command.
*
* @see <a href="https://redis.io/commands/evalsha/">redis.io</a>
*/
@Builder
public class ScriptOptionsGlideString {

/** The keys that are used in the script. */
@Singular @Getter private final List<GlideString> keys;

/** The arguments for the script. */
@Singular @Getter private final List<GlideString> args;
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ public class ScriptResolver {
* @param code The Lua script
* @return String representing the saved hash
*/
public static native String storeScript(String code);
public static native String storeScript(byte[] code);

/**
* Unload or drop the stored Lua script from the script cache.
Expand Down
18 changes: 13 additions & 5 deletions java/client/src/main/java/glide/managers/CommandManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,8 @@ public <T> CompletableFuture<T> submitNewTransaction(
*/
public <T> CompletableFuture<T> submitScript(
Script script,
List<String> keys,
List<String> args,
List<GlideString> keys,
List<GlideString> args,
RedisExceptionCheckedFunction<Response, T> responseHandler) {

RedisRequest.Builder command = prepareRedisRequest(script, keys, args);
Expand Down Expand Up @@ -254,13 +254,21 @@ protected RedisRequest.Builder prepareRedisRequest(Transaction transaction) {
* adding a callback id.
*/
protected RedisRequest.Builder prepareRedisRequest(
Script script, List<String> keys, List<String> args) {
Script script, List<GlideString> keys, List<GlideString> args) {
return RedisRequest.newBuilder()
.setScriptInvocation(
ScriptInvocation.newBuilder()
.setHash(script.getHash())
.addAllKeys(keys)
.addAllArgs(args)
.addAllKeys(
keys.stream()
.map(GlideString::getBytes)
.map(ByteString::copyFrom)
.collect(Collectors.toList()))
.addAllArgs(
args.stream()
.map(GlideString::getBytes)
.map(ByteString::copyFrom)
.collect(Collectors.toList()))
.build());
}

Expand Down
Loading
Loading