Skip to content

Commit

Permalink
Merge pull request valkey-io#2027 from Bit-Quill/node/integ_cyip10_bl…
Browse files Browse the repository at this point in the history
…move

Node: add BLMOVE
  • Loading branch information
cyip10 authored Jul 30, 2024
2 parents 9a7df38 + d24f0ad commit 59aef04
Show file tree
Hide file tree
Showing 8 changed files with 286 additions and 5 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#### Changes
* Node: Added BLMOVE command ([#2027](https://github.com/valkey-io/valkey-glide/pull/2027))
* Node: Exported client configuration types ([#2023](https://github.com/valkey-io/valkey-glide/pull/2023))
* Java, Python: Update docs for GEOSEARCH command ([#2017](https://github.com/valkey-io/valkey-glide/pull/2017))
* Python: Update docs for BITFIELD and BITFIELD_RO commands ([#2048](https://github.com/valkey-io/valkey-glide/pull/2048))
Expand Down
48 changes: 48 additions & 0 deletions node/src/BaseClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ import {
createLInsert,
createLLen,
createLMove,
createBLMove,
createLPop,
createLPos,
createLPush,
Expand Down Expand Up @@ -1695,6 +1696,53 @@ export class BaseClient {
);
}

/**
* Blocks the connection until it pops atomically and removes the left/right-most element to the
* list stored at `source` depending on `whereFrom`, and pushes the element at the first/last element
* of the list stored at `destination` depending on `whereTo`.
* `BLMOVE` is the blocking variant of {@link lmove}.
*
* @remarks
* 1. When in cluster mode, both `source` and `destination` must map to the same hash slot.
* 2. `BLMOVE` is a client blocking command, see https://github.com/aws/glide-for-redis/wiki/General-Concepts#blocking-commands for more details and best practices.
*
* See https://valkey.io/commands/blmove/ for details.
*
* @param source - The key to the source list.
* @param destination - The key to the destination list.
* @param whereFrom - The {@link ListDirection} to remove the element from.
* @param whereTo - The {@link ListDirection} to add the element to.
* @param timeout - The number of seconds to wait for a blocking operation to complete. A value of `0` will block indefinitely.
* @returns The popped element, or `null` if `source` does not exist or if the operation timed-out.
*
* since Valkey version 6.2.0.
*
* @example
* ```typescript
* await client.lpush("testKey1", ["two", "one"]);
* await client.lpush("testKey2", ["four", "three"]);
* const result = await client.blmove("testKey1", "testKey2", ListDirection.LEFT, ListDirection.LEFT, 0.1);
* console.log(result); // Output: "one"
*
* const result2 = await client.lrange("testKey1", 0, -1);
* console.log(result2); // Output: "two"
*
* const updated_array2 = await client.lrange("testKey2", 0, -1);
* console.log(updated_array2); // Output: ["one", "three", "four"]
* ```
*/
public async blmove(
source: string,
destination: string,
whereFrom: ListDirection,
whereTo: ListDirection,
timeout: number,
): Promise<string | null> {
return this.createWritePromise(
createBLMove(source, destination, whereFrom, whereTo, timeout),
);
}

/**
* Sets the list element at `index` to `element`.
* The index is zero-based, so `0` means the first element, `1` the second element and so on.
Expand Down
19 changes: 19 additions & 0 deletions node/src/Commands.ts
Original file line number Diff line number Diff line change
Expand Up @@ -909,6 +909,25 @@ export function createLMove(
]);
}

/**
* @internal
*/
export function createBLMove(
source: string,
destination: string,
whereFrom: ListDirection,
whereTo: ListDirection,
timeout: number,
): command_request.Command {
return createCommand(RequestType.BLMove, [
source,
destination,
whereFrom,
whereTo,
timeout.toString(),
]);
}

/**
* @internal
*/
Expand Down
36 changes: 36 additions & 0 deletions node/src/Transaction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ import {
createLInsert,
createLLen,
createLMove,
createBLMove,
createLPop,
createLPos,
createLPush,
Expand Down Expand Up @@ -873,6 +874,41 @@ export class BaseTransaction<T extends BaseTransaction<T>> {
);
}

/**
*
* Blocks the connection until it pops atomically and removes the left/right-most element to the
* list stored at `source` depending on `whereFrom`, and pushes the element at the first/last element
* of the list stored at `destination` depending on `whereTo`.
* `BLMOVE` is the blocking variant of {@link lmove}.
*
* @remarks
* 1. When in cluster mode, both `source` and `destination` must map to the same hash slot.
* 2. `BLMOVE` is a client blocking command, see https://github.com/aws/glide-for-redis/wiki/General-Concepts#blocking-commands for more details and best practices.
*
* See https://valkey.io/commands/blmove/ for details.
*
* @param source - The key to the source list.
* @param destination - The key to the destination list.
* @param whereFrom - The {@link ListDirection} to remove the element from.
* @param whereTo - The {@link ListDirection} to add the element to.
* @param timeout - The number of seconds to wait for a blocking operation to complete. A value of `0` will block indefinitely.
*
* Command Response - The popped element, or `null` if `source` does not exist or if the operation timed-out.
*
* since Valkey version 6.2.0.
*/
public blmove(
source: string,
destination: string,
whereFrom: ListDirection,
whereTo: ListDirection,
timeout: number,
): T {
return this.addAndReturn(
createBLMove(source, destination, whereFrom, whereTo, timeout),
);
}

/**
* Sets the list element at `index` to `element`.
* The index is zero-based, so `0` means the first element, `1` the second element and so on.
Expand Down
33 changes: 33 additions & 0 deletions node/tests/RedisClient.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import {
transactionTest,
validateTransactionResponse,
} from "./TestUtilities";
import { ListDirection } from "..";

/* eslint-disable @typescript-eslint/no-var-requires */

Expand Down Expand Up @@ -126,6 +127,38 @@ describe("GlideClient", () => {
},
);

it.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])(
"check that blocking commands returns never timeout_%p",
async (protocol) => {
client = await GlideClient.createClient(
getClientConfigurationOption(
cluster.getAddresses(),
protocol,
300,
),
);

const blmovePromise = client.blmove(
"source",
"destination",
ListDirection.LEFT,
ListDirection.LEFT,
0.1,
);
const timeoutPromise = new Promise((resolve) => {
setTimeout(resolve, 500);
});

try {
await Promise.race([blmovePromise, timeoutPromise]);
} finally {
Promise.resolve(blmovePromise);
client.close();
}
},
5000,
);

it.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])(
"select dbsize flushdb test %p",
async (protocol) => {
Expand Down
8 changes: 8 additions & 0 deletions node/tests/RedisClusterClient.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import {
FunctionListResponse,
GlideClusterClient,
InfoOptions,
ListDirection,
ProtocolVersion,
Routes,
ScoreFilter,
Expand Down Expand Up @@ -324,6 +325,13 @@ describe("GlideClusterClient", () => {

if (gte(cluster.getVersion(), "6.2.0")) {
promises.push(
client.blmove(
"abc",
"def",
ListDirection.LEFT,
ListDirection.LEFT,
0.2,
),
client.zdiff(["abc", "zxy", "lkn"]),
client.zdiffWithScores(["abc", "zxy", "lkn"]),
client.zdiffstore("abc", ["zxy", "lkn"]),
Expand Down
127 changes: 127 additions & 0 deletions node/tests/SharedTests.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1649,6 +1649,133 @@ export function runBaseTests<Context>(config: {
config.timeout,
);

it.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])(
`blmove list_%p`,
async (protocol) => {
await runTest(async (client: BaseClient, cluster) => {
if (cluster.checkIfServerVersionLessThan("6.2.0")) {
return;
}

const key1 = "{key}-1" + uuidv4();
const key2 = "{key}-2" + uuidv4();
const lpushArgs1 = ["2", "1"];
const lpushArgs2 = ["4", "3"];

// Initialize the tests
expect(await client.lpush(key1, lpushArgs1)).toEqual(2);
expect(await client.lpush(key2, lpushArgs2)).toEqual(2);

// Move from LEFT to LEFT with blocking
checkSimple(
await client.blmove(
key1,
key2,
ListDirection.LEFT,
ListDirection.LEFT,
0.1,
),
).toEqual("1");

// Move from LEFT to RIGHT with blocking
checkSimple(
await client.blmove(
key1,
key2,
ListDirection.LEFT,
ListDirection.RIGHT,
0.1,
),
).toEqual("2");

checkSimple(await client.lrange(key2, 0, -1)).toEqual([
"1",
"3",
"4",
"2",
]);
checkSimple(await client.lrange(key1, 0, -1)).toEqual([]);

// Move from RIGHT to LEFT non-existing destination with blocking
checkSimple(
await client.blmove(
key2,
key1,
ListDirection.RIGHT,
ListDirection.LEFT,
0.1,
),
).toEqual("2");

checkSimple(await client.lrange(key2, 0, -1)).toEqual([
"1",
"3",
"4",
]);
checkSimple(await client.lrange(key1, 0, -1)).toEqual(["2"]);

// Move from RIGHT to RIGHT with blocking
checkSimple(
await client.blmove(
key2,
key1,
ListDirection.RIGHT,
ListDirection.RIGHT,
0.1,
),
).toEqual("4");

checkSimple(await client.lrange(key2, 0, -1)).toEqual([
"1",
"3",
]);
checkSimple(await client.lrange(key1, 0, -1)).toEqual([
"2",
"4",
]);

// Non-existing source key with blocking
expect(
await client.blmove(
"{key}-non_existing_key" + uuidv4(),
key1,
ListDirection.LEFT,
ListDirection.LEFT,
0.1,
),
).toEqual(null);

// Non-list source key with blocking
const key3 = "{key}-3" + uuidv4();
checkSimple(await client.set(key3, "value")).toEqual("OK");
await expect(
client.blmove(
key3,
key1,
ListDirection.LEFT,
ListDirection.LEFT,
0.1,
),
).rejects.toThrow(RequestError);

// Non-list destination key
await expect(
client.blmove(
key1,
key3,
ListDirection.LEFT,
ListDirection.LEFT,
0.1,
),
).rejects.toThrow(RequestError);

// TODO: add test case with 0 timeout (no timeout) should never time out,
// but we wrap the test with timeout to avoid test failing or stuck forever
}, protocol);
},
config.timeout,
);

it.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])(
`lset test_%p`,
async (protocol) => {
Expand Down
19 changes: 14 additions & 5 deletions node/tests/TestUtilities.ts
Original file line number Diff line number Diff line change
Expand Up @@ -549,13 +549,22 @@ export async function transactionTest(
field + "3",
]);

baseTransaction.lpopCount(key5, 2);
responseData.push(["lpopCount(key5, 2)", [field + "2"]]);
} else {
baseTransaction.lpopCount(key5, 2);
responseData.push(["lpopCount(key5, 2)", [field + "3", field + "2"]]);
baseTransaction.blmove(
key20,
key5,
ListDirection.LEFT,
ListDirection.LEFT,
3,
);
responseData.push([
"blmove(key20, key5, ListDirection.LEFT, ListDirection.LEFT, 3)",
field + "3",
]);
}

baseTransaction.lpopCount(key5, 2);
responseData.push(["lpopCount(key5, 2)", [field + "3", field + "2"]]);

baseTransaction.linsert(
key5,
InsertPosition.Before,
Expand Down

0 comments on commit 59aef04

Please sign in to comment.