Skip to content

Commit

Permalink
Python: add BZMPOP command (#1412)
Browse files Browse the repository at this point in the history
* Python: add BZMPOP command (#273)

* Update CHANGELOG with PR link

* Apply relevant PR suggestions from other PRs

* PR suggestions

* Use Mapping instead of Dict

* Minor documentation update

* PR suggestions

* Fix mypy errors

* Fix tests

* Minor doc change

* Align docs and cross-slot tests with recent changes

* Minor doc fix

* Only run cross-slot test for bzmpop if redis version is >= 7.0.0

* Fix formatting

* Fix mypy

* Fix crossslot test
  • Loading branch information
aaron-congo authored May 17, 2024
1 parent 9712df2 commit 642be9c
Show file tree
Hide file tree
Showing 7 changed files with 216 additions and 20 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
* Python: Added BZPOPMIN and BZPOPMAX commands ([#1399](https://github.com/aws/glide-for-redis/pull/1399))
* Python: Added ZUNIONSTORE, ZINTERSTORE commands ([#1388](https://github.com/aws/glide-for-redis/pull/1388))
* Python: Added ZRANDMEMBER command ([#1413](https://github.com/aws/glide-for-redis/pull/1413))
* Python: Added BZMPOP command ([#1412](https://github.com/aws/glide-for-redis/pull/1412))


#### Fixes
Expand Down
2 changes: 2 additions & 0 deletions python/python/glide/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
RangeByLex,
RangeByScore,
ScoreBoundary,
ScoreFilter,
)
from glide.async_commands.transaction import ClusterTransaction, Transaction
from glide.config import (
Expand Down Expand Up @@ -100,6 +101,7 @@
"RangeByIndex",
"RangeByLex",
"RangeByScore",
"ScoreFilter",
"StreamAddOptions",
"StreamTrimOptions",
"TrimByMaxLen",
Expand Down
58 changes: 56 additions & 2 deletions python/python/glide/async_commands/core.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
# Copyright GLIDE-for-Redis Project Contributors - SPDX Identifier: Apache-2.0
from abc import ABC, abstractmethod
from collections.abc import Mapping
from datetime import datetime, timedelta
from enum import Enum
from typing import (
Dict,
List,
Mapping,
Optional,
Protocol,
Set,
Expand All @@ -24,6 +24,7 @@
RangeByLex,
RangeByScore,
ScoreBoundary,
ScoreFilter,
_create_z_cmd_store_args,
_create_zrange_args,
_create_zrangestore_args,
Expand Down Expand Up @@ -2841,7 +2842,7 @@ async def zdiff_withscores(self, keys: List[str]) -> Mapping[str, float]:
keys (List[str]): The keys of the sorted sets.
Returns:
Mapping[str, float]: A dictionary of elements and their scores representing the difference between the sorted
Mapping[str, float]: A mapping of elements and their scores representing the difference between the sorted
sets.
If the first `key` does not exist, it is treated as an empty sorted set, and the command returns an
empty list.
Expand Down Expand Up @@ -3067,6 +3068,59 @@ async def zrandmember_withscores(
),
)

async def bzmpop(
self,
keys: List[str],
modifier: ScoreFilter,
timeout: float,
count: Optional[int] = None,
) -> Optional[List[Union[str, Mapping[str, float]]]]:
"""
Pops a member-score pair from the first non-empty sorted set, with the given keys being checked in the order
that they are given. Blocks the connection when there are no members to pop from any of the given sorted sets.
The optional `count` argument can be used to specify the number of elements to pop, and is set to 1 by default.
The number of popped elements is the minimum from the sorted set's cardinality and `count`.
`BZMPOP` is the blocking variant of `ZMPOP`.
See https://valkey.io/commands/bzmpop for more details.
Notes:
1. When in cluster mode, all `keys` must map to the same hash slot.
2. `BZMPOP` is a client blocking command, see https://github.com/aws/glide-for-redis/wiki/General-Concepts#blocking-commands for more details and best practices.
Args:
keys (List[str]): The keys of the sorted sets.
modifier (ScoreFilter): The element pop criteria - either ScoreFilter.MIN or ScoreFilter.MAX to pop
members with the lowest/highest scores accordingly.
timeout (float): The number of seconds to wait for a blocking operation to complete. A value of 0 will
block indefinitely.
count (Optional[int]): The number of elements to pop.
Returns:
Optional[List[Union[str, Mapping[str, float]]]]: A two-element list containing the key name of the set from
which elements were popped, and a member-score mapping of the popped elements. If no members could be
popped and the timeout expired, returns None.
Examples:
>>> await client.zadd("zSet1", {"one": 1.0, "two": 2.0, "three": 3.0})
>>> await client.zadd("zSet2", {"four": 4.0})
>>> await client.bzmpop(["zSet1", "zSet2"], ScoreFilter.MAX, 0.5, 2)
['zSet1', {'three': 3.0, 'two': 2.0}] # "three" with score 3.0 and "two" with score 2.0 were popped from "zSet1".
Since: Redis version 7.0.0.
"""
args = [str(timeout), str(len(keys))] + keys + [modifier.value]
if count is not None:
args = args + ["COUNT", str(count)]

return cast(
Optional[List[Union[str, Mapping[str, float]]]],
await self._execute_command(RequestType.BZMPop, args),
)

async def invoke_script(
self,
script: Script,
Expand Down
18 changes: 18 additions & 0 deletions python/python/glide/async_commands/sorted_set.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,24 @@ class AggregationType(Enum):
"""


class ScoreFilter(Enum):
"""
Defines which elements to pop from a sorted set.
ScoreFilter is a mandatory option for ZMPOP (https://valkey.io/commands/zmpop)
and BZMPOP (https://valkey.io/commands/bzmpop).
"""

MIN = "MIN"
"""
Pop elements with the lowest scores.
"""
MAX = "MAX"
"""
Pop elements with the highest scores.
"""


class ScoreBoundary:
"""
Represents a specific numeric score boundary in a sorted set.
Expand Down
46 changes: 45 additions & 1 deletion python/python/glide/async_commands/transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
RangeByLex,
RangeByScore,
ScoreBoundary,
ScoreFilter,
_create_z_cmd_store_args,
_create_zrange_args,
_create_zrangestore_args,
Expand Down Expand Up @@ -2020,7 +2021,7 @@ def zdiff_withscores(self: TTransaction, keys: List[str]) -> TTransaction:
keys (List[str]): The keys of the sorted sets.
Command response:
Mapping[str, float]: A dictionary of elements and their scores representing the difference between the sorted sets.
Mapping[str, float]: A mapping of elements and their scores representing the difference between the sorted sets.
If the first `key` does not exist, it is treated as an empty sorted set, and the command returns an
empty list.
"""
Expand Down Expand Up @@ -2162,6 +2163,49 @@ def zrandmember_withscores(
RequestType.ZRandMember, [key, str(count), "WITHSCORES"]
)

def bzmpop(
self: TTransaction,
keys: List[str],
modifier: ScoreFilter,
timeout: float,
count: Optional[int] = None,
) -> TTransaction:
"""
Pops a member-score pair from the first non-empty sorted set, with the given keys being checked in the order
that they are given. Blocks the connection when there are no members to pop from any of the given sorted sets.
The optional `count` argument can be used to specify the number of elements to pop, and is set to 1 by default.
The number of popped elements is the minimum from the sorted set's cardinality and `count`.
`BZMPOP` is the blocking variant of `ZMPOP`.
See https://valkey.io/commands/bzmpop for more details.
Note:
`BZMPOP` is a client blocking command, see https://github.com/aws/glide-for-redis/wiki/General-Concepts#blocking-commands for more details and best practices.
Args:
keys (List[str]): The keys of the sorted sets.
modifier (ScoreFilter): The element pop criteria - either ScoreFilter.MIN or ScoreFilter.MAX to pop
members with the lowest/highest scores accordingly.
timeout (float): The number of seconds to wait for a blocking operation to complete. A value of 0 will
block indefinitely.
count (Optional[int]): The number of elements to pop.
Command response:
Optional[List[Union[str, Mapping[str, float]]]]: A two-element list containing the key name of the set from
which elements were popped, and a member-score mapping. If no members could be popped and the timeout
expired, returns None.
Since: Redis version 7.0.0.
"""
args = [str(timeout), str(len(keys))] + keys + [modifier.value]
if count is not None:
args = args + ["COUNT", str(count)]

return self.append_command(RequestType.BZMPop, args)

def dbsize(self: TTransaction) -> TTransaction:
"""
Returns the number of keys in the currently selected database.
Expand Down
99 changes: 82 additions & 17 deletions python/python/tests/test_async_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@
import asyncio
import math
import time
from collections.abc import Mapping
from datetime import datetime, timedelta, timezone
from typing import Dict, Union, cast
from typing import Any, Dict, Union, cast

import pytest
from glide import ClosingError, RequestError, Script
Expand Down Expand Up @@ -34,6 +35,7 @@
RangeByLex,
RangeByScore,
ScoreBoundary,
ScoreFilter,
)
from glide.config import ProtocolVersion, RedisCredentials
from glide.constants import OK, TResult
Expand Down Expand Up @@ -884,9 +886,8 @@ async def test_blpop(self, redis_client: TRedisClient):
value_list = [value1, value2]

assert await redis_client.lpush(key1, value_list) == 2
# ensure that command doesn't time out even if timeout > request timeout (250ms by default)
assert await redis_client.blpop([key1, key2], 0.5) == [key1, value2]

# ensure that command doesn't time out even if timeout > request timeout (250ms by default)
assert await redis_client.blpop(["non_existent_key"], 0.5) is None

# key exists, but not a list
Expand Down Expand Up @@ -1882,12 +1883,6 @@ async def test_zinterstore(self, redis_client: TRedisClient):
await redis_client.zinterstore("{xyz}", [])
assert "wrong number of arguments" in str(e)

# Cross slot query
if isinstance(redis_client, RedisClusterClient):
with pytest.raises(RequestError) as e:
await redis_client.zinterstore("{xyz}", ["{abc}", "{def}"])
assert "CrossSlot" in str(e)

@pytest.mark.parametrize("cluster_mode", [True, False])
@pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3])
async def test_zunionstore(self, redis_client: TRedisClient):
Expand Down Expand Up @@ -1971,12 +1966,6 @@ async def test_zunionstore(self, redis_client: TRedisClient):
await redis_client.zunionstore("{xyz}", [])
assert "wrong number of arguments" in str(e)

# Cross slot query
if isinstance(redis_client, RedisClusterClient):
with pytest.raises(RequestError) as e:
await redis_client.zunionstore("{xyz}", ["{abc}", "{def}"])
assert "CrossSlot" in str(e)

@pytest.mark.parametrize("cluster_mode", [True, False])
@pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3])
async def test_zpopmin(self, redis_client: TRedisClient):
Expand Down Expand Up @@ -2651,6 +2640,73 @@ async def test_zdiffstore(self, redis_client: TRedisClient):
with pytest.raises(RequestError):
await redis_client.zdiffstore(key4, [string_key, key1])

@pytest.mark.parametrize("cluster_mode", [True, False])
@pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3])
async def test_bzmpop(self, redis_client: TRedisClient):
min_version = "7.0.0"
if await check_if_server_version_lt(redis_client, min_version):
return pytest.mark.skip(reason=f"Redis version required >= {min_version}")

key1 = f"{{test}}-1-f{get_random_string(10)}"
key2 = f"{{test}}-2-f{get_random_string(10)}"
non_existing_key = f"{{test}}-non_existing_key"
string_key = f"{{test}}-3-f{get_random_string(10)}"

assert await redis_client.zadd(key1, {"a1": 1, "b1": 2}) == 2
assert await redis_client.zadd(key2, {"a2": 0.1, "b2": 0.2}) == 2

assert await redis_client.bzmpop([key1, key2], ScoreFilter.MAX, 0.1) == [
key1,
{"b1": 2},
]
assert await redis_client.bzmpop([key2, key1], ScoreFilter.MAX, 0.1, 10) == [
key2,
{"b2": 0.2, "a2": 0.1},
]

# ensure that command doesn't time out even if timeout > request timeout (250ms by default)
assert (
await redis_client.bzmpop([non_existing_key], ScoreFilter.MIN, 0.5) is None
)
assert (
await redis_client.bzmpop([non_existing_key], ScoreFilter.MIN, 0.55, 1)
is None
)

# key exists, but it is not a sorted set
assert await redis_client.set(string_key, "value") == OK
with pytest.raises(RequestError):
await redis_client.bzmpop([string_key], ScoreFilter.MAX, 0.1)
with pytest.raises(RequestError):
await redis_client.bzmpop([string_key], ScoreFilter.MAX, 0.1, 1)

# incorrect argument: key list should not be empty
with pytest.raises(RequestError):
assert await redis_client.bzmpop([], ScoreFilter.MAX, 0.1, 1)

# incorrect argument: count should be greater than 0
with pytest.raises(RequestError):
assert await redis_client.bzmpop([key1], ScoreFilter.MAX, 0.1, 0)

# check that order of entries in the response is preserved
entries = {}
for i in range(0, 10):
entries.update({f"a{i}": float(i)})

assert await redis_client.zadd(key2, entries) == 10
result = await redis_client.bzmpop([key2], ScoreFilter.MIN, 0.1, 10)
assert result is not None
result_map = cast(Mapping[str, float], result[1])
assert compare_maps(entries, result_map) is True

async def endless_bzmpop_call():
await redis_client.bzmpop(["non_existent_key"], ScoreFilter.MAX, 0)

# bzmpop is called against a non-existing key with no timeout, but we wrap the call in an asyncio timeout to
# avoid having the test block forever
with pytest.raises(asyncio.TimeoutError):
await asyncio.wait_for(endless_bzmpop_call(), timeout=0.5)

@pytest.mark.parametrize("cluster_mode", [True, False])
@pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3])
async def test_zrandmember(self, redis_client: TRedisClient):
Expand Down Expand Up @@ -2878,17 +2934,26 @@ class TestMultiKeyCommandCrossSlot:
async def test_multi_key_command_returns_cross_slot_error(
self, redis_client: RedisClusterClient
):
for promise in [
promises: list[Any] = [
redis_client.blpop(["abc", "zxy", "lkn"], 0.1),
redis_client.brpop(["abc", "zxy", "lkn"], 0.1),
redis_client.rename("abc", "zxy"),
redis_client.zdiffstore("abc", ["zxy", "lkn"]),
redis_client.zdiff(["abc", "zxy", "lkn"]),
redis_client.zdiff_withscores(["abc", "zxy", "lkn"]),
redis_client.zrangestore("abc", "zxy", RangeByIndex(0, -1)),
redis_client.zinterstore("{xyz}", ["{abc}", "{def}"]),
redis_client.zunionstore("{xyz}", ["{abc}", "{def}"]),
redis_client.bzpopmin(["abc", "zxy", "lkn"], 0.5),
redis_client.bzpopmax(["abc", "zxy", "lkn"], 0.5),
]:
]

if not check_if_server_version_lt(redis_client, "7.0.0"):
promises.extend(
[redis_client.bzmpop(["abc", "zxy", "lkn"], ScoreFilter.MAX, 0.1)]
)

for promise in promises:
with pytest.raises(RequestError) as e:
await promise
assert "crossslot" in str(e).lower()
Expand Down
12 changes: 12 additions & 0 deletions python/python/tests/test_transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
LexBoundary,
RangeByIndex,
ScoreBoundary,
ScoreFilter,
)
from glide.async_commands.transaction import (
BaseTransaction,
Expand Down Expand Up @@ -50,6 +51,7 @@ async def transaction_test(
key13 = "{{{}}}:{}".format(keyslot, get_random_string(3)) # sorted set
key14 = "{{{}}}:{}".format(keyslot, get_random_string(3)) # sorted set
key15 = "{{{}}}:{}".format(keyslot, get_random_string(3)) # sorted set
key16 = "{{{}}}:{}".format(keyslot, get_random_string(3)) # sorted set

value = datetime.now(timezone.utc).strftime("%m/%d/%Y, %H:%M:%S")
value2 = get_random_string(5)
Expand Down Expand Up @@ -300,6 +302,16 @@ async def transaction_test(
args.append("0-2")
transaction.xtrim(key11, TrimByMinId(threshold="0-2", exact=True))
args.append(1)

min_version = "7.0.0"
if not await check_if_server_version_lt(redis_client, min_version):
transaction.zadd(key16, {"a": 1, "b": 2, "c": 3, "d": 4})
args.append(4)
transaction.bzmpop([key16], ScoreFilter.MAX, 0.1)
args.append([key16, {"d": 4.0}])
transaction.bzmpop([key16], ScoreFilter.MIN, 0.1, 2)
args.append([key16, {"a": 1.0, "b": 2.0}])

return args


Expand Down

0 comments on commit 642be9c

Please sign in to comment.