Skip to content

Commit

Permalink
changed from hash to nanoid
Browse files Browse the repository at this point in the history
  • Loading branch information
avifenesh committed Jun 28, 2024
1 parent f8333c7 commit 20aea16
Show file tree
Hide file tree
Showing 8 changed files with 35 additions and 32 deletions.
1 change: 1 addition & 0 deletions glide-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ directories = { version = "4.0", optional = true }
once_cell = "1.18.0"
arcstr = "1.1.5"
sha1_smol = "1.0.0"
nanoid = "0.4.0"

[features]
socket-layer = ["directories", "integer-encoding", "num_cpus", "protobuf", "tokio-util", "bytes"]
Expand Down
13 changes: 8 additions & 5 deletions glide-core/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,10 +227,10 @@ impl Client {
// to continue the scan called ScanState.
// In order to avoid passing Rust GC to clean the ScanState when the cursor (ref) is passed to the warp layer,
// which mean that Rust layer is not aware of the cursor anymore, we need to keep the ScanState alive.
// We do that by storing the ScanState in a global container, and return a hash of the cursor to the warp layer.
// We do that by storing the ScanState in a global container, and return a cursor-id of the cursor to the warp layer.
//
// The warp layer create an object contain the hash with a drop function that will remove the cursor from the container.
// When the ref is removed from the hash map, there's no more references to the ScanState, and the GC will clean it.
// The warp layer create an object contain the cursor-id with a drop function that will remove the cursor from the container.
// When the ref is removed from the hash-map, there's no more references to the ScanState, and the GC will clean it.
pub async fn cluster_scan<'a>(
&'a mut self,
scan_state_cursor: &'a ScanStateRC,
Expand All @@ -252,13 +252,16 @@ impl Client {
)
.await?;

let cluster_hash = if cursor.is_finished() {
let cluster_cursor_id = if cursor.is_finished() {
"finished".to_string()
} else {
insert_cluster_scan_cursor(cursor)
};
convert_to_expected_type(
Value::Array(vec![Value::SimpleString(cluster_hash), Value::Array(keys)]),
Value::Array(vec![
Value::SimpleString(cluster_cursor_id),
Value::Array(keys),
]),
Some(ExpectedReturnType::ClusterScanReturnType {
cursor: &ExpectedReturnType::BulkString,
keys: &ExpectedReturnType::ArrayOfStrings,
Expand Down
16 changes: 8 additions & 8 deletions glide-core/src/client/value_conversion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1016,17 +1016,17 @@ mod tests {
// Test conversion of cluster scan return type
#[test]
fn convert_cluster_scan_return_type() {
let cluster_hash = "hash".to_string();
let cluster_cursor_id = "id".to_string();
let keys = Value::Array(vec![
Value::BulkString("key:340".to_string().into_bytes()),
Value::BulkString("value:341".to_string().into_bytes()),
]);
let value = Value::Array(vec![
Value::SimpleString(cluster_hash.clone()),
Value::SimpleString(cluster_cursor_id.clone()),
keys.clone(),
]);
let expected_result = Value::Array(vec![
Value::BulkString(cluster_hash.clone().into_bytes()),
Value::BulkString(cluster_cursor_id.clone().into_bytes()),
keys.clone(),
]);
assert_eq!(
Expand All @@ -1043,14 +1043,14 @@ mod tests {
}
#[test]
fn convert_cluster_scan_return_finished() {
let cluster_hash = "finished".to_string();
let cluster_cursor_id = "finished".to_string();
let keys = Value::Array(vec![]);
let value = Value::Array(vec![
Value::SimpleString(cluster_hash.clone()),
Value::SimpleString(cluster_cursor_id.clone()),
keys.clone(),
]);
let expected_result = Value::Array(vec![
Value::BulkString(cluster_hash.clone().into_bytes()),
Value::BulkString(cluster_cursor_id.clone().into_bytes()),
keys.clone(),
]);
assert_eq!(
Expand All @@ -1068,8 +1068,8 @@ mod tests {

#[test]
fn convert_cluster_scan_fail_on_bad_return() {
let cluster_hash = "hash".to_string();
let value = Value::Array(vec![Value::SimpleString(cluster_hash.clone())]);
let cluster_cursor_id = "id".to_string();
let value = Value::Array(vec![Value::SimpleString(cluster_cursor_id.clone())]);
assert!(convert_to_expected_type(
value,
Some(ExpectedReturnType::ClusterScanReturnType {
Expand Down
27 changes: 13 additions & 14 deletions glide-core/src/cluster_scan_container.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,44 +2,43 @@
* Copyright GLIDE-for-Redis Project Contributors - SPDX Identifier: Apache-2.0
*/
use logger_core::log_info;
use nanoid::nanoid;
use once_cell::sync::Lazy;
use redis::{RedisResult, ScanStateRC};
use sha1_smol::Sha1;
use std::{collections::HashMap, sync::Mutex};

// This is a container for storing the cursor of a cluster scan.
// The cursor for a cluster scan is a ref to the actual ScanState struct in redis-rs.
// In order to avoid dropping it when it is passed between layers of the application,
// we store it in this container and only pass the hash of the cursor.
// The cursor is stored in the container and can be retrieved using the hash.
// In wrapper layer we wrap the hash in an object, which, when dropped, trigger the removal of the cursor from the container.
// we store it in this container and only pass the id of the cursor.
// The cursor is stored in the container and can be retrieved using the id.
// In wrapper layer we wrap the id in an object, which, when dropped, trigger the removal of the cursor from the container.
// When the ref is removed from the container, the actual ScanState struct is dropped by Rust GC.

static CONTAINER: Lazy<Mutex<HashMap<String, ScanStateRC>>> =
Lazy::new(|| Mutex::new(HashMap::new()));

pub fn insert_cluster_scan_cursor(scan_state: ScanStateRC) -> String {
let hash = Sha1::new();
let hash = hash.digest().to_string();
CONTAINER.lock().unwrap().insert(hash.clone(), scan_state);
hash
let id = nanoid!();
CONTAINER.lock().unwrap().insert(id.clone(), scan_state);
id
}

pub fn get_cluster_scan_cursor(hash: String) -> RedisResult<ScanStateRC> {
let scan_state_rc = CONTAINER.lock().unwrap().get(&hash).cloned();
pub fn get_cluster_scan_cursor(id: String) -> RedisResult<ScanStateRC> {
let scan_state_rc = CONTAINER.lock().unwrap().get(&id).cloned();
match scan_state_rc {
Some(scan_state_rc) => Ok(scan_state_rc),
None => Err(redis::RedisError::from((
redis::ErrorKind::ResponseError,
"Invalid scan_state_cursor hash",
"Invalid scan_state_cursor id",
))),
}
}

pub fn remove_scan_state_cursor(hash: String) {
pub fn remove_scan_state_cursor(id: String) {
log_info(
"scan_state_cursor lifetime",
format!("Removed scan_state_cursor with hash: `{hash}`"),
format!("Removed scan_state_cursor with id: `{id}`"),
);
CONTAINER.lock().unwrap().remove(&hash);
CONTAINER.lock().unwrap().remove(&id);
}
2 changes: 1 addition & 1 deletion glide-core/src/socket_listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ async fn send_command(
async fn cluster_scan(cluster_scan: ClusterScan, mut client: Client) -> ClientUsageResult<Value> {
// Since we don't send the cluster scan as a usual command, but throw a special function in redis-rs library,
// we need to handle the command separately.
// Especially, we need to handle the cursor, which is not the cursor of the ValKey command, but the a hash of the ref
// Especially, we need to handle the cursor, which is not the cursor of the ValKey command, but the a id of the ref
// to the ScanState in redis-rs stored in the cluster scan container.
// We need to get the ref from the table or create a new one if the cursor is empty.
let cursor = cluster_scan.cursor.into();
Expand Down
2 changes: 1 addition & 1 deletion python/python/glide/glide_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -531,7 +531,7 @@ async def _cluster_scan(
)
request = RedisRequest()
request.callback_idx = self._get_callback_index()
# Take out the hash string from the wrapping object
# Take out the id string from the wrapping object
cursor_str = cursor.get_cursor()
if cursor_str is not None:
request.cluster_scan.cursor = cursor_str
Expand Down
2 changes: 1 addition & 1 deletion python/python/tests/test_async_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -6770,7 +6770,7 @@ async def test_cluster_scan_cleaning_cursor(self, redis_client: GlideClusterClie
new_cursor_with_same_id = ClusterScanCursor(cursor_string)
with pytest.raises(RequestError) as e_info:
await redis_client.scan(new_cursor_with_same_id)
assert "Invalid scan_state_cursor hash" in str(e_info.value)
assert "Invalid scan_state_cursor id" in str(e_info.value)

@pytest.mark.parametrize("cluster_mode", [True])
@pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3])
Expand Down
4 changes: 2 additions & 2 deletions python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ impl Level {

/// This struct is used to keep track of the cursor of a cluster scan.
/// We want to avoid passing the cursor between layers of the application,
/// So we keep the state in the container and only pass the hash of the cursor.
/// The cursor is stored in the container and can be retrieved using the hash.
/// So we keep the state in the container and only pass the id of the cursor.
/// The cursor is stored in the container and can be retrieved using the id.
/// The cursor is removed from the container when the object is deleted (dropped).
#[pyclass]
pub struct ClusterScanCursor {
Expand Down

0 comments on commit 20aea16

Please sign in to comment.