diff --git a/CHANGELOG.md b/CHANGELOG.md index 0283eb2686..7d595d5328 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -79,7 +79,7 @@ * Python: Added DUMP and Restore commands ([#1733](https://github.com/aws/glide-for-redis/pull/1733)) * Java: Added SCAN command ([#1751](https://github.com/aws/glide-for-redis/pull/1751)) * Python: Type migration for entries_read ([#1768](https://github.com/aws/glide-for-redis/pull/1768)) - +* Python: Added FUNCTION DUMP and FUNCTION RESTORE commands ([#1769](https://github.com/aws/glide-for-redis/pull/1769)) ### Breaking Changes * Node: Update XREAD to return a Map of Map ([#1494](https://github.com/aws/glide-for-redis/pull/1494)) diff --git a/python/python/glide/__init__.py b/python/python/glide/__init__.py index 70d490a6b2..53564a5127 100644 --- a/python/python/glide/__init__.py +++ b/python/python/glide/__init__.py @@ -26,6 +26,7 @@ ExpiryType, ExpiryTypeGetEx, FlushMode, + FunctionRestorePolicy, InfoSection, InsertPosition, UpdateOptions, @@ -144,6 +145,7 @@ "ExpiryType", "ExpiryTypeGetEx", "FlushMode", + "FunctionRestorePolicy", "GeoSearchByBox", "GeoSearchByRadius", "GeoSearchCount", diff --git a/python/python/glide/async_commands/cluster_commands.py b/python/python/glide/async_commands/cluster_commands.py index e447ae59a6..98b83f21fb 100644 --- a/python/python/glide/async_commands/cluster_commands.py +++ b/python/python/glide/async_commands/cluster_commands.py @@ -8,6 +8,7 @@ from glide.async_commands.core import ( CoreCommands, FlushMode, + FunctionRestorePolicy, InfoSection, _build_sort_args, ) @@ -554,6 +555,76 @@ async def fcall_ro_route( await self._execute_command(RequestType.FCallReadOnly, args, route), ) + async def function_dump( + self, route: Optional[Route] = None + ) -> TClusterResponse[bytes]: + """ + Returns the serialized payload of all loaded libraries. + + See https://valkey.io/commands/function-dump/ for more details. + + Args: + route (Optional[Route]): The command will be routed to a random node, unless + `route` is provided, in which case the client will route the command to the + nodes defined by `route`. + + Returns: + TClusterResponse[bytes]: The serialized payload of all loaded libraries. + + Examples: + >>> payload = await client.function_dump() + # The serialized payload of all loaded libraries. This response can + # be used to restore loaded functions on any Valkey instance. + >>> await client.function_restore(payload) + "OK" # The serialized dump response was used to restore the libraries. + + Since: Redis 7.0.0. + """ + return cast( + TClusterResponse[bytes], + await self._execute_command(RequestType.FunctionDump, [], route), + ) + + async def function_restore( + self, + payload: TEncodable, + policy: Optional[FunctionRestorePolicy] = None, + route: Optional[Route] = None, + ) -> TOK: + """ + Restores libraries from the serialized payload returned by the `function_dump` command. + + See https://valkey.io/commands/function-restore/ for more details. + + Args: + payload (bytes): The serialized data from the `function_dump` command. + policy (Optional[FunctionRestorePolicy]): A policy for handling existing libraries. + route (Optional[Route]): The command will be sent to all primaries, unless + `route` is provided, in which case the client will route the command to the + nodes defined by `route`. + + Returns: + TOK: OK. + + Examples: + >>> payload = await client.function_dump() + # The serialized payload of all loaded libraries. This response can + # be used to restore loaded functions on any Valkey instance. + >>> await client.function_restore(payload, AllPrimaries()) + "OK" # The serialized dump response was used to restore the libraries with the specified route. + >>> await client.function_restore(payload, FunctionRestorePolicy.FLUSH, AllPrimaries()) + "OK" # The serialized dump response was used to restore the libraries with the specified route and policy. + + Since: Redis 7.0.0. + """ + args: List[TEncodable] = [payload] + if policy is not None: + args.append(policy.value) + + return cast( + TOK, await self._execute_command(RequestType.FunctionRestore, args, route) + ) + async def time( self, route: Optional[Route] = None ) -> TClusterResponse[List[bytes]]: diff --git a/python/python/glide/async_commands/core.py b/python/python/glide/async_commands/core.py index d02ffb2d5f..4169b5d4af 100644 --- a/python/python/glide/async_commands/core.py +++ b/python/python/glide/async_commands/core.py @@ -307,6 +307,22 @@ class FlushMode(Enum): SYNC = "SYNC" +class FunctionRestorePolicy(Enum): + """ + Options for the FUNCTION RESTORE command. + + - APPEND: Appends the restored libraries to the existing libraries and aborts on collision. This is the + default policy. + - FLUSH: Deletes all existing libraries before restoring the payload. + - REPLACE: Appends the restored libraries to the existing libraries, replacing any existing ones in case + of name collisions. Note that this policy doesn't prevent function name collisions, only libraries. + """ + + APPEND = "APPEND" + FLUSH = "FLUSH" + REPLACE = "REPLACE" + + def _build_sort_args( key: TEncodable, by_pattern: Optional[TEncodable] = None, diff --git a/python/python/glide/async_commands/standalone_commands.py b/python/python/glide/async_commands/standalone_commands.py index 925b0dd2a9..83823ca6b9 100644 --- a/python/python/glide/async_commands/standalone_commands.py +++ b/python/python/glide/async_commands/standalone_commands.py @@ -8,6 +8,7 @@ from glide.async_commands.core import ( CoreCommands, FlushMode, + FunctionRestorePolicy, InfoSection, _build_sort_args, ) @@ -361,6 +362,58 @@ async def function_delete(self, library_name: TEncodable) -> TOK: ), ) + async def function_dump(self) -> bytes: + """ + Returns the serialized payload of all loaded libraries. + + See https://valkey.io/docs/latest/commands/function-dump/ for more details. + + Returns: + bytes: The serialized payload of all loaded libraries. + + Examples: + >>> payload = await client.function_dump() + # The serialized payload of all loaded libraries. This response can + # be used to restore loaded functions on any Valkey instance. + >>> await client.function_restore(payload) + "OK" # The serialized dump response was used to restore the libraries. + + Since: Redis 7.0.0. + """ + return cast(bytes, await self._execute_command(RequestType.FunctionDump, [])) + + async def function_restore( + self, payload: TEncodable, policy: Optional[FunctionRestorePolicy] = None + ) -> TOK: + """ + Restores libraries from the serialized payload returned by the `function_dump` command. + + See https://valkey.io/docs/latest/commands/function-restore/ for more details. + + Args: + payload (TEncodable): The serialized data from the `function_dump` command. + policy (Optional[FunctionRestorePolicy]): A policy for handling existing libraries. + + Returns: + TOK: OK. + + Examples: + >>> payload = await client.function_dump() + # The serialized payload of all loaded libraries. This response can + # be used to restore loaded functions on any Valkey instance. + >>> await client.function_restore(payload) + "OK" # The serialized dump response was used to restore the libraries. + >>> await client.function_restore(payload, FunctionRestorePolicy.FLUSH) + "OK" # The serialized dump response was used to restore the libraries with the specified policy. + + Since: Redis 7.0.0. + """ + args: List[TEncodable] = [payload] + if policy is not None: + args.append(policy.value) + + return cast(TOK, await self._execute_command(RequestType.FunctionRestore, args)) + async def time(self) -> List[bytes]: """ Returns the server time. diff --git a/python/python/tests/test_async_client.py b/python/python/tests/test_async_client.py index 686639340c..b4a9106b02 100644 --- a/python/python/tests/test_async_client.py +++ b/python/python/tests/test_async_client.py @@ -34,6 +34,7 @@ ExpiryType, ExpiryTypeGetEx, FlushMode, + FunctionRestorePolicy, InfBound, InfoSection, InsertPosition, @@ -8047,6 +8048,171 @@ async def test_fcall_readonly_function(self, redis_client: GlideClusterClient): == 42 ) + @pytest.mark.parametrize("cluster_mode", [False]) + @pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3]) + async def test_function_dump_restore_standalone(self, redis_client: GlideClient): + min_version = "7.0.0" + if await check_if_server_version_lt(redis_client, min_version): + return pytest.mark.skip(reason=f"Redis version required >= {min_version}") + + assert await redis_client.function_flush(FlushMode.SYNC) is OK + + # Dump an empty lib + emptyDump = await redis_client.function_dump() + assert emptyDump is not None and len(emptyDump) > 0 + + name1 = f"Foster{get_random_string(5)}" + name2 = f"Dogster{get_random_string(5)}" + + # function name1 returns first argument; function name2 returns argument array len + code = generate_lua_lib_code( + name1, {name1: "return args[1]", name2: "return #args"}, False + ) + assert await redis_client.function_load(code, True) == name1.encode() + flist = await redis_client.function_list(with_code=True) + + dump = await redis_client.function_dump() + assert dump is not None + + # restore without cleaning the lib and/or overwrite option causes an error + with pytest.raises(RequestError) as e: + assert await redis_client.function_restore(dump) + assert "already exists" in str(e) + + # APPEND policy also fails for the same reason (name collision) + with pytest.raises(RequestError) as e: + assert await redis_client.function_restore( + dump, FunctionRestorePolicy.APPEND + ) + assert "already exists" in str(e) + + # REPLACE policy succeed + assert ( + await redis_client.function_restore(dump, FunctionRestorePolicy.REPLACE) + is OK + ) + + # but nothing changed - all code overwritten + assert await redis_client.function_list(with_code=True) == flist + + # create lib with another name, but with the same function names + assert await redis_client.function_flush(FlushMode.SYNC) is OK + code = generate_lua_lib_code( + name2, {name1: "return args[1]", name2: "return #args"}, False + ) + assert await redis_client.function_load(code, True) == name2.encode() + + # REPLACE policy now fails due to a name collision + with pytest.raises(RequestError) as e: + await redis_client.function_restore(dump, FunctionRestorePolicy.REPLACE) + assert "already exists" in str(e) + + # FLUSH policy succeeds, but deletes the second lib + assert ( + await redis_client.function_restore(dump, FunctionRestorePolicy.FLUSH) is OK + ) + assert await redis_client.function_list(with_code=True) == flist + + # call restored functions + assert ( + await redis_client.fcall(name1, arguments=["meow", "woem"]) + == "meow".encode() + ) + assert await redis_client.fcall(name2, arguments=["meow", "woem"]) == 2 + + @pytest.mark.parametrize("cluster_mode", [True]) + @pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3]) + async def test_function_dump_restore_cluster( + self, redis_client: GlideClusterClient + ): + min_version = "7.0.0" + if await check_if_server_version_lt(redis_client, min_version): + return pytest.mark.skip(reason=f"Redis version required >= {min_version}") + + assert await redis_client.function_flush(FlushMode.SYNC) is OK + + # Dump an empty lib + emptyDump = await redis_client.function_dump() + assert emptyDump is not None and len(emptyDump) > 0 + + name1 = f"Foster{get_random_string(5)}" + libname1 = f"FosterLib{get_random_string(5)}" + name2 = f"Dogster{get_random_string(5)}" + libname2 = f"DogsterLib{get_random_string(5)}" + + # function name1 returns first argument; function name2 returns argument array len + code = generate_lua_lib_code( + libname1, {name1: "return args[1]", name2: "return #args"}, True + ) + assert await redis_client.function_load(code, True) == libname1.encode() + flist = await redis_client.function_list(with_code=True) + dump = await redis_client.function_dump(RandomNode()) + assert dump is not None and isinstance(dump, bytes) + + # restore without cleaning the lib and/or overwrite option causes an error + with pytest.raises(RequestError) as e: + assert await redis_client.function_restore(dump) + assert "already exists" in str(e) + + # APPEND policy also fails for the same reason (name collision) + with pytest.raises(RequestError) as e: + assert await redis_client.function_restore( + dump, FunctionRestorePolicy.APPEND + ) + assert "already exists" in str(e) + + # REPLACE policy succeed + assert ( + await redis_client.function_restore( + dump, FunctionRestorePolicy.REPLACE, route=AllPrimaries() + ) + is OK + ) + + # but nothing changed - all code overwritten + restoredFunctionList = await redis_client.function_list(with_code=True) + assert restoredFunctionList is not None + assert isinstance(restoredFunctionList, List) and len(restoredFunctionList) == 1 + assert restoredFunctionList[0]["library_name".encode()] == libname1.encode() + + # Note that function ordering may differ across nodes so we can't do a deep equals + assert len(restoredFunctionList[0]["functions".encode()]) == 2 + + # create lib with another name, but with the same function names + assert await redis_client.function_flush(FlushMode.SYNC) is OK + code = generate_lua_lib_code( + libname2, {name1: "return args[1]", name2: "return #args"}, True + ) + assert await redis_client.function_load(code, True) == libname2.encode() + restoredFunctionList = await redis_client.function_list(with_code=True) + assert restoredFunctionList is not None + assert isinstance(restoredFunctionList, List) and len(restoredFunctionList) == 1 + assert restoredFunctionList[0]["library_name".encode()] == libname2.encode() + + # REPLACE policy now fails due to a name collision + with pytest.raises(RequestError) as e: + await redis_client.function_restore(dump, FunctionRestorePolicy.REPLACE) + assert "already exists" in str(e) + + # FLUSH policy succeeds, but deletes the second lib + assert ( + await redis_client.function_restore(dump, FunctionRestorePolicy.FLUSH) is OK + ) + restoredFunctionList = await redis_client.function_list(with_code=True) + assert restoredFunctionList is not None + assert isinstance(restoredFunctionList, List) and len(restoredFunctionList) == 1 + assert restoredFunctionList[0]["library_name".encode()] == libname1.encode() + + # Note that function ordering may differ across nodes so we can't do a deep equals + assert len(restoredFunctionList[0]["functions".encode()]) == 2 + + # call restored functions + assert ( + await redis_client.fcall_ro(name1, arguments=["meow", "woem"]) + == "meow".encode() + ) + assert await redis_client.fcall_ro(name2, arguments=["meow", "woem"]) == 2 + @pytest.mark.parametrize("cluster_mode", [True, False]) @pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3]) async def test_srandmember(self, redis_client: TGlideClient):