Skip to content

Commit

Permalink
Python: add FCALL command (#1739)
Browse files Browse the repository at this point in the history
* Start implementing fcall

* Implement cluster command for fcall

* Adjust documentation for cluster command implementation of fcall

* Implement standalone command for fcall

* Rework API for fcall to avoid overloads

* Add transaction FCALL implementation

* Adjust API for FCALL and also add tests

* Align API with FCALL_RO implementation

* Move fcall command implementations to appropriate locations

* Fix indentation issue

* Try to fix tests

* Run black

* Fix intepreter errors in tests

* Add cross-slot notice
  • Loading branch information
jonathanl-bq authored Jul 1, 2024
1 parent ad81545 commit 4f17b69
Show file tree
Hide file tree
Showing 5 changed files with 155 additions and 19 deletions.
37 changes: 37 additions & 0 deletions python/python/glide/async_commands/cluster_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,43 @@ async def function_delete(
),
)

async def fcall_route(
self,
function: str,
arguments: Optional[List[str]] = None,
route: Optional[Route] = None,
) -> TClusterResponse[TResult]:
"""
Invokes a previously loaded function.
See https://redis.io/commands/fcall/ for more details.
Args:
function (str): The function name.
arguments (Optional[List[str]]): A list of `function` arguments. `Arguments`
should not represent names of keys.
route (Optional[Route]): The command will be routed to a random primay node, unless `route` is provided, in which
case the client will route the command to the nodes defined by `route`. Defaults to None.
Returns:
TClusterResponse[TResult]:
If a single node route is requested, returns a Optional[TResult] representing the function's return value.
Otherwise, returns a dict of [str , Optional[TResult]] where each key contains the address of
the queried node and the value contains the function's return value.
Example:
>>> await client.fcall("Deep_Thought", ["Answer", "to", "the", "Ultimate", "Question", "of", "Life,", "the", "Universe,", "and", "Everything"], RandomNode())
'new_value' # Returns the function's return value.
Since: Redis version 7.0.0.
"""
args = [function, "0"]
if arguments is not None:
args.extend(arguments)
return cast(
TClusterResponse[TResult],
await self._execute_command(RequestType.FCall, args, route),
)

async def fcall_ro_route(
self,
function: str,
Expand Down
42 changes: 42 additions & 0 deletions python/python/glide/async_commands/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -5700,6 +5700,48 @@ async def hscan(
await self._execute_command(RequestType.HScan, args),
)

async def fcall(
self,
function: str,
keys: Optional[List[str]] = None,
arguments: Optional[List[str]] = None,
) -> TResult:
"""
Invokes a previously loaded function.
See https://redis.io/commands/fcall/ for more details.
When in cluster mode, all keys in `keys` must map to the same hash slot.
Args:
function (str): The function name.
keys (Optional[List[str]]): A list of keys accessed by the function. To ensure the correct
execution of functions, both in standalone and clustered deployments, all names of keys
that a function accesses must be explicitly provided as `keys`.
arguments (Optional[List[str]]): A list of `function` arguments. `Arguments`
should not represent names of keys.
Returns:
TResult:
The invoked function's return value.
Example:
>>> await client.fcall("Deep_Thought")
'new_value' # Returns the function's return value.
Since: Redis version 7.0.0.
"""
args = []
if keys is not None:
args.extend([function, str(len(keys))] + keys)
else:
args.extend([function, str(0)])
if arguments is not None:
args.extend(arguments)
return cast(
TResult,
await self._execute_command(RequestType.FCall, args),
)

async def fcall_ro(
self,
function: str,
Expand Down
33 changes: 33 additions & 0 deletions python/python/glide/async_commands/transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -1841,6 +1841,39 @@ def function_delete(self: TTransaction, library_name: str) -> TTransaction:
[library_name],
)

def fcall(
self: TTransaction,
function: str,
keys: Optional[List[str]] = None,
arguments: Optional[List[str]] = None,
) -> TTransaction:
"""
Invokes a previously loaded function.
See https://redis.io/commands/fcall/ for more details.
Args:
function (str): The function name.
keys (Optional[List[str]]): A list of keys accessed by the function. To ensure the correct
execution of functions, both in standalone and clustered deployments, all names of keys
that a function accesses must be explicitly provided as `keys`.
arguments (Optional[List[str]]): A list of `function` arguments. `Arguments`
should not represent names of keys.
Command Response:
TResult:
The invoked function's return value.
Since: Redis version 7.0.0.
"""
args = []
if keys is not None:
args.extend([function, str(len(keys))] + keys)
else:
args.extend([function, str(0)])
if arguments is not None:
args.extend(arguments)
return self.append_command(RequestType.FCall, args)

def fcall_ro(
self: TTransaction,
function: str,
Expand Down
58 changes: 39 additions & 19 deletions python/python/tests/test_async_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -7041,7 +7041,6 @@ async def test_object_refcount(self, redis_client: TGlideClient):
@pytest.mark.parametrize("cluster_mode", [True, False])
@pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3])
async def test_function_load(self, redis_client: TGlideClient):
# TODO: Test function with FCALL
# TODO: Test with FUNCTION LIST
min_version = "7.0.0"
if await check_if_server_version_lt(redis_client, min_version):
Expand All @@ -7053,11 +7052,7 @@ async def test_function_load(self, redis_client: TGlideClient):

assert await redis_client.function_load(code) == lib_name.encode()

# TODO: change when FCALL is implemented
assert (
await redis_client.custom_command(["FCALL", func_name, "0", "one", "two"])
== b"one"
)
assert await redis_client.fcall(func_name, arguments=["one", "two"]) == b"one"
assert (
await redis_client.fcall_ro(func_name, arguments=["one", "two"]) == b"one"
)
Expand All @@ -7080,7 +7075,7 @@ async def test_function_load(self, redis_client: TGlideClient):

assert await redis_client.function_load(new_code, True) == lib_name.encode()

# TODO: add when FCALL is implemented
assert await redis_client.fcall(func2_name, arguments=["one", "two"]) == 2
assert await redis_client.fcall_ro(func2_name, arguments=["one", "two"]) == 2

assert await redis_client.function_flush(FlushMode.SYNC) is OK
Expand All @@ -7091,7 +7086,6 @@ async def test_function_load(self, redis_client: TGlideClient):
async def test_function_load_cluster_with_route(
self, redis_client: GlideClusterClient, single_route: bool
):
# TODO: Test function with FCALL
# TODO: Test with FUNCTION LIST
min_version = "7.0.0"
if await check_if_server_version_lt(redis_client, min_version):
Expand All @@ -7104,14 +7098,17 @@ async def test_function_load_cluster_with_route(

assert await redis_client.function_load(code, False, route) == lib_name.encode()

# TODO: change when FCALL is implemented.
assert (
await redis_client.custom_command(
["FCALL", func_name, "0", "one", "two"],
SlotKeyRoute(SlotType.PRIMARY, "1"),
)
== b"one"
result = await redis_client.fcall_route(
func_name, arguments=["one", "two"], route=route
)

if single_route:
assert result == b"one"
else:
assert isinstance(result, dict)
for nodeResponse in result.values():
assert nodeResponse == b"one"

result = await redis_client.fcall_ro_route(
func_name, arguments=["one", "two"], route=route
)
Expand Down Expand Up @@ -7143,7 +7140,17 @@ async def test_function_load_cluster_with_route(
await redis_client.function_load(new_code, True, route) == lib_name.encode()
)

# TODO: add when FCALL is implemented.
result = await redis_client.fcall_route(
func2_name, arguments=["one", "two"], route=route
)

if single_route:
assert result == 2
else:
assert isinstance(result, dict)
for nodeResponse in result.values():
assert nodeResponse == 2

result = await redis_client.fcall_ro_route(
func2_name, arguments=["one", "two"], route=route
)
Expand Down Expand Up @@ -7291,25 +7298,32 @@ async def test_fcall_with_key(self, redis_client: GlideClusterClient):
assert await redis_client.function_flush(FlushMode.SYNC, route) is OK
assert await redis_client.function_load(code, False, route) == lib_name.encode()

# TODO: add when FCALL is implemented.
assert (
await redis_client.fcall(func_name, keys=keys, arguments=[])
== key1.encode()
)

assert (
await redis_client.fcall_ro(func_name, keys=keys, arguments=[])
== key1.encode()
)

transaction = ClusterTransaction()
# TODO: add when FCALL is implemented.

transaction.fcall(func_name, keys=keys, arguments=[])
transaction.fcall_ro(func_name, keys=keys, arguments=[])

# check response from a routed transaction request
result = await redis_client.exec(transaction, route)
assert result is not None
assert result[0] == key1.encode()
assert result[1] == key1.encode()

# if no route given, GLIDE should detect it automatically
result = await redis_client.exec(transaction)
assert result is not None
assert result[0] == key1.encode()
assert result[1] == key1.encode()

assert await redis_client.function_flush(FlushMode.SYNC, route) is OK

Expand All @@ -7332,7 +7346,12 @@ async def test_fcall_readonly_function(self, redis_client: GlideClusterClient):
assert await redis_client.function_load(code, False) == lib_name.encode()

# On a replica node should fail, because a function isn't guaranteed to be RO
# TODO: add when FCALL is implemented.
with pytest.raises(RequestError) as e:
assert await redis_client.fcall_route(
func_name, arguments=[], route=replicaRoute
)
assert "You can't write against a read only replica." in str(e)

with pytest.raises(RequestError) as e:
assert await redis_client.fcall_ro_route(
func_name, arguments=[], route=replicaRoute
Expand Down Expand Up @@ -8020,6 +8039,7 @@ async def test_multi_key_command_returns_cross_slot_error(
redis_client.lcs("abc", "def"),
redis_client.lcs_len("abc", "def"),
redis_client.lcs_idx("abc", "def"),
redis_client.fcall("func", ["abc", "zxy", "lkn"], []),
redis_client.fcall_ro("func", ["abc", "zxy", "lkn"], []),
]
)
Expand Down
4 changes: 4 additions & 0 deletions python/python/tests/test_transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,10 @@ async def transaction_test(
args.append(lib_name.encode())
transaction.function_load(code, True)
args.append(lib_name.encode())
transaction.fcall(func_name, [], arguments=["one", "two"])
args.append(b"one")
transaction.fcall(func_name, [key], arguments=["one", "two"])
args.append(b"one")
transaction.fcall_ro(func_name, [], arguments=["one", "two"])
args.append(b"one")
transaction.fcall_ro(func_name, [key], arguments=["one", "two"])
Expand Down

0 comments on commit 4f17b69

Please sign in to comment.