diff --git a/.github/workflows/clang-format.yml b/.github/workflows/clang-format.yml index efc63a1f6f..ab4f7a040d 100644 --- a/.github/workflows/clang-format.yml +++ b/.github/workflows/clang-format.yml @@ -1,6 +1,7 @@ name: Clang Format Check on: + push: pull_request: paths: - 'src/**' diff --git a/cmake/Modules/SourceFiles.cmake b/cmake/Modules/SourceFiles.cmake index 126ecb0146..e51f9b7600 100644 --- a/cmake/Modules/SourceFiles.cmake +++ b/cmake/Modules/SourceFiles.cmake @@ -100,6 +100,7 @@ set(VALKEY_SERVER_SRCS ${CMAKE_SOURCE_DIR}/src/script_lua.c ${CMAKE_SOURCE_DIR}/src/script.c ${CMAKE_SOURCE_DIR}/src/functions.c + ${CMAKE_SOURCE_DIR}/src/scripting_engine.c ${CMAKE_SOURCE_DIR}/src/function_lua.c ${CMAKE_SOURCE_DIR}/src/commands.c ${CMAKE_SOURCE_DIR}/src/strl.c diff --git a/src/Makefile b/src/Makefile index 5cd20bb0ee..7a951193e4 100644 --- a/src/Makefile +++ b/src/Makefile @@ -374,7 +374,7 @@ else endef endif -# Determine install/uninstall Redis symlinks for compatibility when +# Determine install/uninstall Redis symlinks for compatibility when # installing/uninstalling Valkey binaries (defaulting to `yes`) USE_REDIS_SYMLINKS?=yes ifeq ($(USE_REDIS_SYMLINKS),yes) @@ -416,7 +416,7 @@ endif ENGINE_NAME=valkey SERVER_NAME=$(ENGINE_NAME)-server$(PROG_SUFFIX) ENGINE_SENTINEL_NAME=$(ENGINE_NAME)-sentinel$(PROG_SUFFIX) -ENGINE_SERVER_OBJ=threads_mngr.o adlist.o quicklist.o ae.o anet.o dict.o hashtable.o kvstore.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o memory_prefetch.o io_threads.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o cluster_legacy.o cluster_slot_stats.o crc16.o endianconv.o commandlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crccombine.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o valkey-check-rdb.o valkey-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o allocator_defrag.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script_lua.o script.o functions.o function_lua.o commands.o strl.o connection.o unix.o logreqres.o rdma.o +ENGINE_SERVER_OBJ=threads_mngr.o adlist.o quicklist.o ae.o anet.o dict.o hashtable.o kvstore.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o memory_prefetch.o io_threads.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o cluster_legacy.o cluster_slot_stats.o crc16.o endianconv.o commandlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crccombine.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o valkey-check-rdb.o valkey-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o allocator_defrag.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script_lua.o script.o functions.o function_lua.o commands.o strl.o connection.o unix.o logreqres.o rdma.o scripting_engine.o ENGINE_CLI_NAME=$(ENGINE_NAME)-cli$(PROG_SUFFIX) ENGINE_CLI_OBJ=anet.o adlist.o dict.o valkey-cli.o zmalloc.o release.o ae.o serverassert.o crcspeed.o crccombine.o crc64.o siphash.o crc16.o monotonic.o cli_common.o mt19937-64.o strl.o cli_commands.o ENGINE_BENCHMARK_NAME=$(ENGINE_NAME)-benchmark$(PROG_SUFFIX) diff --git a/src/aof.c b/src/aof.c index 5dc12db61e..024cdb2771 100644 --- a/src/aof.c +++ b/src/aof.c @@ -1922,7 +1922,7 @@ int rewriteSortedSetObject(rio *r, robj *key, robj *o) { /* Write either the key or the value of the currently selected item of a hash. * The 'hi' argument passes a valid hash iterator. * The 'what' filed specifies if to write a key or a value and can be - * either OBJ_HASH_KEY or OBJ_HASH_VALUE. + * either OBJ_HASH_FIELD or OBJ_HASH_VALUE. * * The function returns 0 on error, non-zero on success. */ static int rioWriteHashIteratorCursor(rio *r, hashTypeIterator *hi, int what) { @@ -1936,7 +1936,7 @@ static int rioWriteHashIteratorCursor(rio *r, hashTypeIterator *hi, int what) { return rioWriteBulkString(r, (char *)vstr, vlen); else return rioWriteBulkLongLong(r, vll); - } else if (hi->encoding == OBJ_ENCODING_HT) { + } else if (hi->encoding == OBJ_ENCODING_HASHTABLE) { sds value = hashTypeCurrentFromHashTable(hi, what); return rioWriteBulkString(r, value, sdslen(value)); } @@ -1963,7 +1963,7 @@ int rewriteHashObject(rio *r, robj *key, robj *o) { } } - if (!rioWriteHashIteratorCursor(r, &hi, OBJ_HASH_KEY) || !rioWriteHashIteratorCursor(r, &hi, OBJ_HASH_VALUE)) { + if (!rioWriteHashIteratorCursor(r, &hi, OBJ_HASH_FIELD) || !rioWriteHashIteratorCursor(r, &hi, OBJ_HASH_VALUE)) { hashTypeResetIterator(&hi); return 0; } diff --git a/src/cluster_legacy.c b/src/cluster_legacy.c index 5c4bb65aae..94d3532dfc 100644 --- a/src/cluster_legacy.c +++ b/src/cluster_legacy.c @@ -2936,6 +2936,10 @@ void clusterProcessPingExtensions(clusterMsg *hdr, clusterLink *link) { if (n && n != myself && !(nodeIsReplica(myself) && myself->replicaof == n)) { sds id = sdsnewlen(forgotten_node_ext->name, CLUSTER_NAMELEN); dictEntry *de = dictAddOrFind(server.cluster->nodes_black_list, id); + if (dictGetKey(de) != id) { + /* The dict did not take ownership of the id string, so we need to free it. */ + sdsfree(id); + } uint64_t expire = server.unixtime + ntohu64(forgotten_node_ext->ttl); dictSetUnsignedIntegerVal(de, expire); clusterDelNode(n); @@ -3278,8 +3282,8 @@ int clusterProcessPacket(clusterLink *link) { /* Unable to retrieve the node's IP address from the connection. Without a * valid IP, the node becomes unusable in the cluster. This failure might be * due to the connection being closed. */ - serverLog(LL_NOTICE, "Closing link even though we received a MEET packet on it, " - "because the connection has an error"); + serverLog(LL_NOTICE, "Closing cluster link due to failure to retrieve IP from the connection, " + "possibly caused by a closed connection."); freeClusterLink(link); return 0; } @@ -3302,14 +3306,14 @@ int clusterProcessPacket(clusterLink *link) { clusterAddNode(node); clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG); } else { - /* A second MEET packet was received on an existing link during the handshake process. - * This happens when the other node detects no inbound link, and re-sends a MEET packet - * before this node can respond with a PING. This MEET is a no-op. + /* A second MEET packet was received on an existing link during the handshake + * process. This happens when the other node detects no inbound link, and + * re-sends a MEET packet before this node can respond with a PING. + * This MEET is a no-op. * - * Note: Nodes in HANDSHAKE state are not fully "known" (random names), so the sender - * remains unidentified at this point. The MEET packet might be re-sent if the inbound - * connection is still unestablished by the next cron cycle. - */ + * Note: Nodes in HANDSHAKE state are not fully "known" (random names), so the + * sender remains unidentified at this point. The MEET packet might be re-sent + * if the inbound connection is still unestablished by the next cron cycle. */ debugServerAssert(link->inbound && nodeInHandshake(link->node)); } @@ -3318,16 +3322,19 @@ int clusterProcessPacket(clusterLink *link) { * of the message type. */ clusterProcessGossipSection(hdr, link); } else if (sender->link && nodeExceedsHandshakeTimeout(sender, now)) { - /* The MEET packet is from a known node, after the handshake timeout, so the sender thinks that I do not - * know it. - * Free my outbound link to that node, triggering a reconnect and a PING over the new link. - * Once that node receives our PING, it should recognize the new connection as an inbound link from me. - * We should only free the outbound link if the node is known for more time than the handshake timeout, - * since during this time, the other side might still be trying to complete the handshake. */ + /* The MEET packet is from a known node, after the handshake timeout, so the sender + * thinks that I do not know it. + * Free my outbound link to that node, triggering a reconnect and a PING over the + * new link. + * Once that node receives our PING, it should recognize the new connection as an + * inbound link from me. We should only free the outbound link if the node is known + * for more time than the handshake timeout, since during this time, the other side + * might still be trying to complete the handshake. */ /* We should always receive a MEET packet on an inbound link. */ serverAssert(link != sender->link); - serverLog(LL_NOTICE, "Freeing outbound link to node %.40s (%s) after receiving a MEET packet from this known node", + serverLog(LL_NOTICE, "Freeing outbound link to node %.40s (%s) after receiving a MEET packet " + "from this known node", sender->name, sender->human_nodename); freeClusterLink(sender->link); } @@ -3920,7 +3927,7 @@ void clusterSendMessage(clusterLink *link, clusterMsgSendBlock *msgblock) { server.stat_cluster_links_memory += sizeof(listNode); /* Populate sent messages stats. */ - uint16_t type = ntohs(getMessageFromSendBlock(msgblock)->type); + uint16_t type = ntohs(getMessageFromSendBlock(msgblock)->type) & ~CLUSTERMSG_MODIFIER_MASK; if (type < CLUSTERMSG_TYPE_COUNT) server.cluster->stats_bus_messages_sent[type]++; } diff --git a/src/commands.def b/src/commands.def index faa599346b..613eb16c9b 100644 --- a/src/commands.def +++ b/src/commands.def @@ -1289,6 +1289,7 @@ commandHistory CLIENT_KILL_History[] = { {"6.2.0","`LADDR` option."}, {"8.0.0","`MAXAGE` option."}, {"8.0.0","Replaced `master` `TYPE` with `primary`. `master` still supported for backward compatibility."}, +{"8.1.0","`ID` option accepts multiple IDs."}, }; #endif @@ -1320,7 +1321,7 @@ struct COMMAND_ARG CLIENT_KILL_filter_new_format_skipme_Subargs[] = { /* CLIENT KILL filter new_format argument table */ struct COMMAND_ARG CLIENT_KILL_filter_new_format_Subargs[] = { -{MAKE_ARG("client-id",ARG_TYPE_INTEGER,-1,"ID",NULL,"2.8.12",CMD_ARG_OPTIONAL,0,NULL)}, +{MAKE_ARG("client-id",ARG_TYPE_INTEGER,-1,"ID",NULL,"2.8.12",CMD_ARG_OPTIONAL|CMD_ARG_MULTIPLE,0,NULL)}, {MAKE_ARG("client-type",ARG_TYPE_ONEOF,-1,"TYPE",NULL,"2.8.12",CMD_ARG_OPTIONAL,6,NULL),.subargs=CLIENT_KILL_filter_new_format_client_type_Subargs}, {MAKE_ARG("username",ARG_TYPE_STRING,-1,"USER",NULL,NULL,CMD_ARG_OPTIONAL,0,NULL)}, {MAKE_ARG("addr",ARG_TYPE_STRING,-1,"ADDR",NULL,NULL,CMD_ARG_OPTIONAL,0,NULL),.display_text="ip:port"}, @@ -1352,6 +1353,7 @@ commandHistory CLIENT_LIST_History[] = { {"7.0.0","Added `resp`, `multi-mem`, `rbs` and `rbp` fields."}, {"7.0.3","Added `ssub` field."}, {"8.0.0","Replaced `master` `TYPE` with `primary`. `master` still supported for backward compatibility."}, +{"8.1.0","Added filters USER, ADDR, LADDR, SKIPME, and MAXAGE"}, }; #endif @@ -1375,10 +1377,21 @@ struct COMMAND_ARG CLIENT_LIST_client_type_Subargs[] = { {MAKE_ARG("pubsub",ARG_TYPE_PURE_TOKEN,-1,"PUBSUB",NULL,NULL,CMD_ARG_NONE,0,NULL)}, }; +/* CLIENT LIST skipme argument table */ +struct COMMAND_ARG CLIENT_LIST_skipme_Subargs[] = { +{MAKE_ARG("yes",ARG_TYPE_PURE_TOKEN,-1,"YES",NULL,NULL,CMD_ARG_NONE,0,NULL)}, +{MAKE_ARG("no",ARG_TYPE_PURE_TOKEN,-1,"NO",NULL,NULL,CMD_ARG_NONE,0,NULL)}, +}; + /* CLIENT LIST argument table */ struct COMMAND_ARG CLIENT_LIST_Args[] = { {MAKE_ARG("client-type",ARG_TYPE_ONEOF,-1,"TYPE",NULL,"5.0.0",CMD_ARG_OPTIONAL,4,NULL),.subargs=CLIENT_LIST_client_type_Subargs}, {MAKE_ARG("client-id",ARG_TYPE_INTEGER,-1,"ID",NULL,"6.2.0",CMD_ARG_OPTIONAL|CMD_ARG_MULTIPLE,0,NULL)}, +{MAKE_ARG("username",ARG_TYPE_STRING,-1,"USER",NULL,"8.1.0",CMD_ARG_OPTIONAL,0,NULL)}, +{MAKE_ARG("addr",ARG_TYPE_STRING,-1,"ADDR",NULL,"8.1.0",CMD_ARG_OPTIONAL,0,NULL),.display_text="ip:port"}, +{MAKE_ARG("laddr",ARG_TYPE_STRING,-1,"LADDR",NULL,"8.1.0",CMD_ARG_OPTIONAL,0,NULL),.display_text="ip:port"}, +{MAKE_ARG("skipme",ARG_TYPE_ONEOF,-1,"SKIPME",NULL,"8.1.0",CMD_ARG_OPTIONAL,2,NULL),.subargs=CLIENT_LIST_skipme_Subargs}, +{MAKE_ARG("maxage",ARG_TYPE_INTEGER,-1,"MAXAGE",NULL,"8.1.0",CMD_ARG_OPTIONAL,0,NULL)}, }; /********** CLIENT NO_EVICT ********************/ @@ -1652,26 +1665,26 @@ struct COMMAND_ARG CLIENT_UNBLOCK_Args[] = { /* CLIENT command table */ struct COMMAND_STRUCT CLIENT_Subcommands[] = { -{MAKE_CMD("caching","Instructs the server whether to track the keys in the next request.","O(1)","6.0.0",CMD_DOC_NONE,NULL,NULL,"connection",COMMAND_GROUP_CONNECTION,CLIENT_CACHING_History,0,CLIENT_CACHING_Tips,0,clientCommand,3,CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_SENTINEL,ACL_CATEGORY_CONNECTION,CLIENT_CACHING_Keyspecs,0,NULL,1),.args=CLIENT_CACHING_Args}, -{MAKE_CMD("capa","A client claims its capability.","O(1)","8.0.0",CMD_DOC_NONE,NULL,NULL,"connection",COMMAND_GROUP_CONNECTION,CLIENT_CAPA_History,0,CLIENT_CAPA_Tips,0,clientCommand,-3,CMD_NOSCRIPT|CMD_LOADING|CMD_STALE,ACL_CATEGORY_CONNECTION,CLIENT_CAPA_Keyspecs,0,NULL,1),.args=CLIENT_CAPA_Args}, -{MAKE_CMD("getname","Returns the name of the connection.","O(1)","2.6.9",CMD_DOC_NONE,NULL,NULL,"connection",COMMAND_GROUP_CONNECTION,CLIENT_GETNAME_History,0,CLIENT_GETNAME_Tips,0,clientCommand,2,CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_SENTINEL,ACL_CATEGORY_CONNECTION,CLIENT_GETNAME_Keyspecs,0,NULL,0)}, -{MAKE_CMD("getredir","Returns the client ID to which the connection's tracking notifications are redirected.","O(1)","6.0.0",CMD_DOC_NONE,NULL,NULL,"connection",COMMAND_GROUP_CONNECTION,CLIENT_GETREDIR_History,0,CLIENT_GETREDIR_Tips,0,clientCommand,2,CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_SENTINEL,ACL_CATEGORY_CONNECTION,CLIENT_GETREDIR_Keyspecs,0,NULL,0)}, -{MAKE_CMD("help","Returns helpful text about the different subcommands.","O(1)","5.0.0",CMD_DOC_NONE,NULL,NULL,"connection",COMMAND_GROUP_CONNECTION,CLIENT_HELP_History,0,CLIENT_HELP_Tips,0,clientCommand,2,CMD_LOADING|CMD_STALE|CMD_SENTINEL,ACL_CATEGORY_CONNECTION,CLIENT_HELP_Keyspecs,0,NULL,0)}, -{MAKE_CMD("id","Returns the unique client ID of the connection.","O(1)","5.0.0",CMD_DOC_NONE,NULL,NULL,"connection",COMMAND_GROUP_CONNECTION,CLIENT_ID_History,0,CLIENT_ID_Tips,0,clientCommand,2,CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_SENTINEL,ACL_CATEGORY_CONNECTION,CLIENT_ID_Keyspecs,0,NULL,0)}, -{MAKE_CMD("import-source","Mark this client as an import source when server is in import mode.","O(1)","8.1.0",CMD_DOC_NONE,NULL,NULL,"connection",COMMAND_GROUP_CONNECTION,CLIENT_IMPORT_SOURCE_History,0,CLIENT_IMPORT_SOURCE_Tips,0,clientCommand,3,CMD_NOSCRIPT|CMD_LOADING|CMD_STALE,ACL_CATEGORY_CONNECTION,CLIENT_IMPORT_SOURCE_Keyspecs,0,NULL,1),.args=CLIENT_IMPORT_SOURCE_Args}, -{MAKE_CMD("info","Returns information about the connection.","O(1)","6.2.0",CMD_DOC_NONE,NULL,NULL,"connection",COMMAND_GROUP_CONNECTION,CLIENT_INFO_History,0,CLIENT_INFO_Tips,1,clientCommand,2,CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_SENTINEL,ACL_CATEGORY_CONNECTION,CLIENT_INFO_Keyspecs,0,NULL,0)}, -{MAKE_CMD("kill","Terminates open connections.","O(N) where N is the number of client connections","2.4.0",CMD_DOC_NONE,NULL,NULL,"connection",COMMAND_GROUP_CONNECTION,CLIENT_KILL_History,7,CLIENT_KILL_Tips,0,clientCommand,-3,CMD_ADMIN|CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_SENTINEL,ACL_CATEGORY_CONNECTION,CLIENT_KILL_Keyspecs,0,NULL,1),.args=CLIENT_KILL_Args}, -{MAKE_CMD("list","Lists open connections.","O(N) where N is the number of client connections","2.4.0",CMD_DOC_NONE,NULL,NULL,"connection",COMMAND_GROUP_CONNECTION,CLIENT_LIST_History,7,CLIENT_LIST_Tips,1,clientCommand,-2,CMD_ADMIN|CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_SENTINEL,ACL_CATEGORY_CONNECTION,CLIENT_LIST_Keyspecs,0,NULL,2),.args=CLIENT_LIST_Args}, -{MAKE_CMD("no-evict","Sets the client eviction mode of the connection.","O(1)","7.0.0",CMD_DOC_NONE,NULL,NULL,"connection",COMMAND_GROUP_CONNECTION,CLIENT_NO_EVICT_History,0,CLIENT_NO_EVICT_Tips,0,clientCommand,3,CMD_ADMIN|CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_SENTINEL,ACL_CATEGORY_CONNECTION,CLIENT_NO_EVICT_Keyspecs,0,NULL,1),.args=CLIENT_NO_EVICT_Args}, -{MAKE_CMD("no-touch","Controls whether commands sent by the client affect the LRU/LFU of accessed keys.","O(1)","7.2.0",CMD_DOC_NONE,NULL,NULL,"connection",COMMAND_GROUP_CONNECTION,CLIENT_NO_TOUCH_History,0,CLIENT_NO_TOUCH_Tips,0,clientCommand,3,CMD_NOSCRIPT|CMD_LOADING|CMD_STALE,ACL_CATEGORY_CONNECTION,CLIENT_NO_TOUCH_Keyspecs,0,NULL,1),.args=CLIENT_NO_TOUCH_Args}, -{MAKE_CMD("pause","Suspends commands processing.","O(1)","3.0.0",CMD_DOC_NONE,NULL,NULL,"connection",COMMAND_GROUP_CONNECTION,CLIENT_PAUSE_History,1,CLIENT_PAUSE_Tips,0,clientCommand,-3,CMD_ADMIN|CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_SENTINEL,ACL_CATEGORY_CONNECTION,CLIENT_PAUSE_Keyspecs,0,NULL,2),.args=CLIENT_PAUSE_Args}, -{MAKE_CMD("reply","Instructs the server whether to reply to commands.","O(1)","3.2.0",CMD_DOC_NONE,NULL,NULL,"connection",COMMAND_GROUP_CONNECTION,CLIENT_REPLY_History,0,CLIENT_REPLY_Tips,0,clientCommand,3,CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_SENTINEL,ACL_CATEGORY_CONNECTION,CLIENT_REPLY_Keyspecs,0,NULL,1),.args=CLIENT_REPLY_Args}, +{MAKE_CMD("caching","Instructs the server whether to track the keys in the next request.","O(1)","6.0.0",CMD_DOC_NONE,NULL,NULL,"connection",COMMAND_GROUP_CONNECTION,CLIENT_CACHING_History,0,CLIENT_CACHING_Tips,0,clientCachingCommand,3,CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_SENTINEL,ACL_CATEGORY_CONNECTION,CLIENT_CACHING_Keyspecs,0,NULL,1),.args=CLIENT_CACHING_Args}, +{MAKE_CMD("capa","A client claims its capability.","O(1)","8.0.0",CMD_DOC_NONE,NULL,NULL,"connection",COMMAND_GROUP_CONNECTION,CLIENT_CAPA_History,0,CLIENT_CAPA_Tips,0,clientCapaCommand,-3,CMD_NOSCRIPT|CMD_LOADING|CMD_STALE,ACL_CATEGORY_CONNECTION,CLIENT_CAPA_Keyspecs,0,NULL,1),.args=CLIENT_CAPA_Args}, +{MAKE_CMD("getname","Returns the name of the connection.","O(1)","2.6.9",CMD_DOC_NONE,NULL,NULL,"connection",COMMAND_GROUP_CONNECTION,CLIENT_GETNAME_History,0,CLIENT_GETNAME_Tips,0,clientGetNameCommand,2,CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_SENTINEL,ACL_CATEGORY_CONNECTION,CLIENT_GETNAME_Keyspecs,0,NULL,0)}, +{MAKE_CMD("getredir","Returns the client ID to which the connection's tracking notifications are redirected.","O(1)","6.0.0",CMD_DOC_NONE,NULL,NULL,"connection",COMMAND_GROUP_CONNECTION,CLIENT_GETREDIR_History,0,CLIENT_GETREDIR_Tips,0,clientGetredirCommand,2,CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_SENTINEL,ACL_CATEGORY_CONNECTION,CLIENT_GETREDIR_Keyspecs,0,NULL,0)}, +{MAKE_CMD("help","Returns helpful text about the different subcommands.","O(1)","5.0.0",CMD_DOC_NONE,NULL,NULL,"connection",COMMAND_GROUP_CONNECTION,CLIENT_HELP_History,0,CLIENT_HELP_Tips,0,clientHelpCommand,2,CMD_LOADING|CMD_STALE|CMD_SENTINEL,ACL_CATEGORY_CONNECTION,CLIENT_HELP_Keyspecs,0,NULL,0)}, +{MAKE_CMD("id","Returns the unique client ID of the connection.","O(1)","5.0.0",CMD_DOC_NONE,NULL,NULL,"connection",COMMAND_GROUP_CONNECTION,CLIENT_ID_History,0,CLIENT_ID_Tips,0,clientIDCommand,2,CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_SENTINEL,ACL_CATEGORY_CONNECTION,CLIENT_ID_Keyspecs,0,NULL,0)}, +{MAKE_CMD("import-source","Mark this client as an import source when server is in import mode.","O(1)","8.1.0",CMD_DOC_NONE,NULL,NULL,"connection",COMMAND_GROUP_CONNECTION,CLIENT_IMPORT_SOURCE_History,0,CLIENT_IMPORT_SOURCE_Tips,0,clientImportSourceCommand,3,CMD_NOSCRIPT|CMD_LOADING|CMD_STALE,ACL_CATEGORY_CONNECTION,CLIENT_IMPORT_SOURCE_Keyspecs,0,NULL,1),.args=CLIENT_IMPORT_SOURCE_Args}, +{MAKE_CMD("info","Returns information about the connection.","O(1)","6.2.0",CMD_DOC_NONE,NULL,NULL,"connection",COMMAND_GROUP_CONNECTION,CLIENT_INFO_History,0,CLIENT_INFO_Tips,1,clientInfoCommand,2,CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_SENTINEL,ACL_CATEGORY_CONNECTION,CLIENT_INFO_Keyspecs,0,NULL,0)}, +{MAKE_CMD("kill","Terminates open connections.","O(N) where N is the number of client connections","2.4.0",CMD_DOC_NONE,NULL,NULL,"connection",COMMAND_GROUP_CONNECTION,CLIENT_KILL_History,8,CLIENT_KILL_Tips,0,clientKillCommand,-3,CMD_ADMIN|CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_SENTINEL,ACL_CATEGORY_CONNECTION,CLIENT_KILL_Keyspecs,0,NULL,1),.args=CLIENT_KILL_Args}, +{MAKE_CMD("list","Lists open connections.","O(N) where N is the number of client connections","2.4.0",CMD_DOC_NONE,NULL,NULL,"connection",COMMAND_GROUP_CONNECTION,CLIENT_LIST_History,8,CLIENT_LIST_Tips,1,clientListCommand,-2,CMD_ADMIN|CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_SENTINEL,ACL_CATEGORY_CONNECTION,CLIENT_LIST_Keyspecs,0,NULL,7),.args=CLIENT_LIST_Args}, +{MAKE_CMD("no-evict","Sets the client eviction mode of the connection.","O(1)","7.0.0",CMD_DOC_NONE,NULL,NULL,"connection",COMMAND_GROUP_CONNECTION,CLIENT_NO_EVICT_History,0,CLIENT_NO_EVICT_Tips,0,clientNoEvictCommand,3,CMD_ADMIN|CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_SENTINEL,ACL_CATEGORY_CONNECTION,CLIENT_NO_EVICT_Keyspecs,0,NULL,1),.args=CLIENT_NO_EVICT_Args}, +{MAKE_CMD("no-touch","Controls whether commands sent by the client affect the LRU/LFU of accessed keys.","O(1)","7.2.0",CMD_DOC_NONE,NULL,NULL,"connection",COMMAND_GROUP_CONNECTION,CLIENT_NO_TOUCH_History,0,CLIENT_NO_TOUCH_Tips,0,clientNoTouchCommand,3,CMD_NOSCRIPT|CMD_LOADING|CMD_STALE,ACL_CATEGORY_CONNECTION,CLIENT_NO_TOUCH_Keyspecs,0,NULL,1),.args=CLIENT_NO_TOUCH_Args}, +{MAKE_CMD("pause","Suspends commands processing.","O(1)","3.0.0",CMD_DOC_NONE,NULL,NULL,"connection",COMMAND_GROUP_CONNECTION,CLIENT_PAUSE_History,1,CLIENT_PAUSE_Tips,0,clientPauseCommand,-3,CMD_ADMIN|CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_SENTINEL,ACL_CATEGORY_CONNECTION,CLIENT_PAUSE_Keyspecs,0,NULL,2),.args=CLIENT_PAUSE_Args}, +{MAKE_CMD("reply","Instructs the server whether to reply to commands.","O(1)","3.2.0",CMD_DOC_NONE,NULL,NULL,"connection",COMMAND_GROUP_CONNECTION,CLIENT_REPLY_History,0,CLIENT_REPLY_Tips,0,clientReplyCommand,3,CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_SENTINEL,ACL_CATEGORY_CONNECTION,CLIENT_REPLY_Keyspecs,0,NULL,1),.args=CLIENT_REPLY_Args}, {MAKE_CMD("setinfo","Sets information specific to the client or connection.","O(1)","7.2.0",CMD_DOC_NONE,NULL,NULL,"connection",COMMAND_GROUP_CONNECTION,CLIENT_SETINFO_History,0,CLIENT_SETINFO_Tips,2,clientSetinfoCommand,4,CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_SENTINEL,ACL_CATEGORY_CONNECTION,CLIENT_SETINFO_Keyspecs,0,NULL,1),.args=CLIENT_SETINFO_Args}, -{MAKE_CMD("setname","Sets the connection name.","O(1)","2.6.9",CMD_DOC_NONE,NULL,NULL,"connection",COMMAND_GROUP_CONNECTION,CLIENT_SETNAME_History,0,CLIENT_SETNAME_Tips,2,clientCommand,3,CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_SENTINEL,ACL_CATEGORY_CONNECTION,CLIENT_SETNAME_Keyspecs,0,NULL,1),.args=CLIENT_SETNAME_Args}, -{MAKE_CMD("tracking","Controls server-assisted client-side caching for the connection.","O(1). Some options may introduce additional complexity.","6.0.0",CMD_DOC_NONE,NULL,NULL,"connection",COMMAND_GROUP_CONNECTION,CLIENT_TRACKING_History,0,CLIENT_TRACKING_Tips,0,clientCommand,-3,CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_SENTINEL,ACL_CATEGORY_CONNECTION,CLIENT_TRACKING_Keyspecs,0,NULL,7),.args=CLIENT_TRACKING_Args}, -{MAKE_CMD("trackinginfo","Returns information about server-assisted client-side caching for the connection.","O(1)","6.2.0",CMD_DOC_NONE,NULL,NULL,"connection",COMMAND_GROUP_CONNECTION,CLIENT_TRACKINGINFO_History,0,CLIENT_TRACKINGINFO_Tips,0,clientCommand,2,CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_SENTINEL,ACL_CATEGORY_CONNECTION,CLIENT_TRACKINGINFO_Keyspecs,0,NULL,0)}, -{MAKE_CMD("unblock","Unblocks a client blocked by a blocking command from a different connection.","O(log N) where N is the number of client connections","5.0.0",CMD_DOC_NONE,NULL,NULL,"connection",COMMAND_GROUP_CONNECTION,CLIENT_UNBLOCK_History,0,CLIENT_UNBLOCK_Tips,0,clientCommand,-3,CMD_ADMIN|CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_SENTINEL,ACL_CATEGORY_CONNECTION,CLIENT_UNBLOCK_Keyspecs,0,NULL,2),.args=CLIENT_UNBLOCK_Args}, -{MAKE_CMD("unpause","Resumes processing commands from paused clients.","O(N) Where N is the number of paused clients","6.2.0",CMD_DOC_NONE,NULL,NULL,"connection",COMMAND_GROUP_CONNECTION,CLIENT_UNPAUSE_History,0,CLIENT_UNPAUSE_Tips,0,clientCommand,2,CMD_ADMIN|CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_SENTINEL,ACL_CATEGORY_CONNECTION,CLIENT_UNPAUSE_Keyspecs,0,NULL,0)}, +{MAKE_CMD("setname","Sets the connection name.","O(1)","2.6.9",CMD_DOC_NONE,NULL,NULL,"connection",COMMAND_GROUP_CONNECTION,CLIENT_SETNAME_History,0,CLIENT_SETNAME_Tips,2,clientSetNameCommand,3,CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_SENTINEL,ACL_CATEGORY_CONNECTION,CLIENT_SETNAME_Keyspecs,0,NULL,1),.args=CLIENT_SETNAME_Args}, +{MAKE_CMD("tracking","Controls server-assisted client-side caching for the connection.","O(1). Some options may introduce additional complexity.","6.0.0",CMD_DOC_NONE,NULL,NULL,"connection",COMMAND_GROUP_CONNECTION,CLIENT_TRACKING_History,0,CLIENT_TRACKING_Tips,0,clientTrackingCommand,-3,CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_SENTINEL,ACL_CATEGORY_CONNECTION,CLIENT_TRACKING_Keyspecs,0,NULL,7),.args=CLIENT_TRACKING_Args}, +{MAKE_CMD("trackinginfo","Returns information about server-assisted client-side caching for the connection.","O(1)","6.2.0",CMD_DOC_NONE,NULL,NULL,"connection",COMMAND_GROUP_CONNECTION,CLIENT_TRACKINGINFO_History,0,CLIENT_TRACKINGINFO_Tips,0,clientTrackingInfoCommand,2,CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_SENTINEL,ACL_CATEGORY_CONNECTION,CLIENT_TRACKINGINFO_Keyspecs,0,NULL,0)}, +{MAKE_CMD("unblock","Unblocks a client blocked by a blocking command from a different connection.","O(log N) where N is the number of client connections","5.0.0",CMD_DOC_NONE,NULL,NULL,"connection",COMMAND_GROUP_CONNECTION,CLIENT_UNBLOCK_History,0,CLIENT_UNBLOCK_Tips,0,clientUnblockCommand,-3,CMD_ADMIN|CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_SENTINEL,ACL_CATEGORY_CONNECTION,CLIENT_UNBLOCK_Keyspecs,0,NULL,2),.args=CLIENT_UNBLOCK_Args}, +{MAKE_CMD("unpause","Resumes processing commands from paused clients.","O(N) Where N is the number of paused clients","6.2.0",CMD_DOC_NONE,NULL,NULL,"connection",COMMAND_GROUP_CONNECTION,CLIENT_UNPAUSE_History,0,CLIENT_UNPAUSE_Tips,0,clientUnpauseCommand,2,CMD_ADMIN|CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_SENTINEL,ACL_CATEGORY_CONNECTION,CLIENT_UNPAUSE_Keyspecs,0,NULL,0)}, {0} }; @@ -11051,7 +11064,7 @@ struct COMMAND_STRUCT serverCommandTable[] = { {MAKE_CMD("readwrite","Enables read-write queries for a connection to a Valkey replica node.","O(1)","3.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,READWRITE_History,0,READWRITE_Tips,0,readwriteCommand,1,CMD_FAST|CMD_LOADING|CMD_STALE,ACL_CATEGORY_CONNECTION,READWRITE_Keyspecs,0,NULL,0)}, /* connection */ {MAKE_CMD("auth","Authenticates the connection.","O(N) where N is the number of passwords defined for the user","1.0.0",CMD_DOC_NONE,NULL,NULL,"connection",COMMAND_GROUP_CONNECTION,AUTH_History,1,AUTH_Tips,0,authCommand,-2,CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_FAST|CMD_NO_AUTH|CMD_SENTINEL|CMD_ALLOW_BUSY,ACL_CATEGORY_CONNECTION,AUTH_Keyspecs,0,NULL,2),.args=AUTH_Args}, -{MAKE_CMD("client","A container for client connection commands.","Depends on subcommand.","2.4.0",CMD_DOC_NONE,NULL,NULL,"connection",COMMAND_GROUP_CONNECTION,CLIENT_History,0,CLIENT_Tips,0,NULL,-2,CMD_SENTINEL,0,CLIENT_Keyspecs,0,NULL,0),.subcommands=CLIENT_Subcommands}, +{MAKE_CMD("client","A container for client connection commands.","Depends on subcommand.","2.4.0",CMD_DOC_NONE,NULL,NULL,"connection",COMMAND_GROUP_CONNECTION,CLIENT_History,0,CLIENT_Tips,0,clientCommand,-2,CMD_SENTINEL,0,CLIENT_Keyspecs,0,NULL,0),.subcommands=CLIENT_Subcommands}, {MAKE_CMD("echo","Returns the given string.","O(1)","1.0.0",CMD_DOC_NONE,NULL,NULL,"connection",COMMAND_GROUP_CONNECTION,ECHO_History,0,ECHO_Tips,0,echoCommand,2,CMD_LOADING|CMD_STALE|CMD_FAST,ACL_CATEGORY_CONNECTION,ECHO_Keyspecs,0,NULL,1),.args=ECHO_Args}, {MAKE_CMD("hello","Handshakes with the server.","O(1)","6.0.0",CMD_DOC_NONE,NULL,NULL,"connection",COMMAND_GROUP_CONNECTION,HELLO_History,2,HELLO_Tips,0,helloCommand,-1,CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_FAST|CMD_NO_AUTH|CMD_SENTINEL|CMD_ALLOW_BUSY,ACL_CATEGORY_CONNECTION,HELLO_Keyspecs,0,NULL,1),.args=HELLO_Args}, {MAKE_CMD("ping","Returns the server's liveliness response.","O(1)","1.0.0",CMD_DOC_NONE,NULL,NULL,"connection",COMMAND_GROUP_CONNECTION,PING_History,0,PING_Tips,2,pingCommand,-1,CMD_FAST|CMD_SENTINEL,ACL_CATEGORY_CONNECTION,PING_Keyspecs,0,NULL,1),.args=PING_Args}, diff --git a/src/commands/client-caching.json b/src/commands/client-caching.json index 2a4ae891db..d661492f45 100644 --- a/src/commands/client-caching.json +++ b/src/commands/client-caching.json @@ -6,7 +6,7 @@ "since": "6.0.0", "arity": 3, "container": "CLIENT", - "function": "clientCommand", + "function": "clientCachingCommand", "command_flags": [ "NOSCRIPT", "LOADING", diff --git a/src/commands/client-capa.json b/src/commands/client-capa.json index 3c16cd44f9..0d0f577f94 100644 --- a/src/commands/client-capa.json +++ b/src/commands/client-capa.json @@ -6,7 +6,7 @@ "since": "8.0.0", "arity": -3, "container": "CLIENT", - "function": "clientCommand", + "function": "clientCapaCommand", "command_flags": [ "NOSCRIPT", "LOADING", diff --git a/src/commands/client-getname.json b/src/commands/client-getname.json index 9e237af849..e13db064b7 100644 --- a/src/commands/client-getname.json +++ b/src/commands/client-getname.json @@ -6,7 +6,7 @@ "since": "2.6.9", "arity": 2, "container": "CLIENT", - "function": "clientCommand", + "function": "clientGetNameCommand", "command_flags": [ "NOSCRIPT", "LOADING", diff --git a/src/commands/client-getredir.json b/src/commands/client-getredir.json index 6fdb002dc8..3df1df6b6f 100644 --- a/src/commands/client-getredir.json +++ b/src/commands/client-getredir.json @@ -6,7 +6,7 @@ "since": "6.0.0", "arity": 2, "container": "CLIENT", - "function": "clientCommand", + "function": "clientGetredirCommand", "command_flags": [ "NOSCRIPT", "LOADING", diff --git a/src/commands/client-help.json b/src/commands/client-help.json index b49294c9ee..ae771d52ae 100644 --- a/src/commands/client-help.json +++ b/src/commands/client-help.json @@ -6,7 +6,7 @@ "since": "5.0.0", "arity": 2, "container": "CLIENT", - "function": "clientCommand", + "function": "clientHelpCommand", "command_flags": [ "LOADING", "STALE", diff --git a/src/commands/client-id.json b/src/commands/client-id.json index 7c2bf08200..f6131250dd 100644 --- a/src/commands/client-id.json +++ b/src/commands/client-id.json @@ -6,7 +6,7 @@ "since": "5.0.0", "arity": 2, "container": "CLIENT", - "function": "clientCommand", + "function": "clientIDCommand", "command_flags": [ "NOSCRIPT", "LOADING", diff --git a/src/commands/client-import-source.json b/src/commands/client-import-source.json index 113c07d70a..dd5ef65e77 100644 --- a/src/commands/client-import-source.json +++ b/src/commands/client-import-source.json @@ -6,7 +6,7 @@ "since": "8.1.0", "arity": 3, "container": "CLIENT", - "function": "clientCommand", + "function": "clientImportSourceCommand", "command_flags": [ "NOSCRIPT", "LOADING", diff --git a/src/commands/client-info.json b/src/commands/client-info.json index f974da437b..afda2ca967 100644 --- a/src/commands/client-info.json +++ b/src/commands/client-info.json @@ -6,7 +6,7 @@ "since": "6.2.0", "arity": 2, "container": "CLIENT", - "function": "clientCommand", + "function": "clientInfoCommand", "command_flags": [ "NOSCRIPT", "LOADING", diff --git a/src/commands/client-kill.json b/src/commands/client-kill.json index 97fa932cd8..0ae3579534 100644 --- a/src/commands/client-kill.json +++ b/src/commands/client-kill.json @@ -6,7 +6,7 @@ "since": "2.4.0", "arity": -3, "container": "CLIENT", - "function": "clientCommand", + "function": "clientKillCommand", "history": [ [ "2.8.12", @@ -35,6 +35,10 @@ [ "8.0.0", "Replaced `master` `TYPE` with `primary`. `master` still supported for backward compatibility." + ], + [ + "8.1.0", + "`ID` option accepts multiple IDs." ] ], "command_flags": [ @@ -68,6 +72,7 @@ "name": "client-id", "type": "integer", "optional": true, + "multiple": true, "since": "2.8.12" }, { diff --git a/src/commands/client-list.json b/src/commands/client-list.json index d9c0054e60..05e4de2419 100644 --- a/src/commands/client-list.json +++ b/src/commands/client-list.json @@ -6,7 +6,7 @@ "since": "2.4.0", "arity": -2, "container": "CLIENT", - "function": "clientCommand", + "function": "clientListCommand", "history": [ [ "2.8.12", @@ -35,6 +35,10 @@ [ "8.0.0", "Replaced `master` `TYPE` with `primary`. `master` still supported for backward compatibility." + ], + [ + "8.1.0", + "Added filters USER, ADDR, LADDR, SKIPME, and MAXAGE" ] ], "command_flags": [ @@ -91,6 +95,55 @@ "optional": true, "multiple": true, "since": "6.2.0" + }, + { + "token": "USER", + "name": "username", + "type": "string", + "optional": true, + "since": "8.1.0" + }, + { + "token": "ADDR", + "name": "addr", + "display": "ip:port", + "type": "string", + "optional": true, + "since": "8.1.0" + }, + { + "token": "LADDR", + "name": "laddr", + "display": "ip:port", + "type": "string", + "optional": true, + "since": "8.1.0" + }, + { + "token": "SKIPME", + "name": "skipme", + "type": "oneof", + "optional": true, + "since": "8.1.0", + "arguments": [ + { + "name": "yes", + "type": "pure-token", + "token": "YES" + }, + { + "name": "no", + "type": "pure-token", + "token": "NO" + } + ] + }, + { + "token": "MAXAGE", + "name": "maxage", + "type": "integer", + "optional": true, + "since": "8.1.0" } ] } diff --git a/src/commands/client-no-evict.json b/src/commands/client-no-evict.json index 9ed6718405..710f8a97f9 100644 --- a/src/commands/client-no-evict.json +++ b/src/commands/client-no-evict.json @@ -6,7 +6,7 @@ "since": "7.0.0", "arity": 3, "container": "CLIENT", - "function": "clientCommand", + "function": "clientNoEvictCommand", "command_flags": [ "ADMIN", "NOSCRIPT", diff --git a/src/commands/client-no-touch.json b/src/commands/client-no-touch.json index 4cf7b72416..4196770a2e 100644 --- a/src/commands/client-no-touch.json +++ b/src/commands/client-no-touch.json @@ -6,7 +6,7 @@ "since": "7.2.0", "arity": 3, "container": "CLIENT", - "function": "clientCommand", + "function": "clientNoTouchCommand", "command_flags": [ "NOSCRIPT", "LOADING", diff --git a/src/commands/client-pause.json b/src/commands/client-pause.json index b1dd7bc478..54faf796c2 100644 --- a/src/commands/client-pause.json +++ b/src/commands/client-pause.json @@ -6,7 +6,7 @@ "since": "3.0.0", "arity": -3, "container": "CLIENT", - "function": "clientCommand", + "function": "clientPauseCommand", "history": [ [ "6.2.0", diff --git a/src/commands/client-reply.json b/src/commands/client-reply.json index 9406de85cf..8d2b713a69 100644 --- a/src/commands/client-reply.json +++ b/src/commands/client-reply.json @@ -6,7 +6,7 @@ "since": "3.2.0", "arity": 3, "container": "CLIENT", - "function": "clientCommand", + "function": "clientReplyCommand", "command_flags": [ "NOSCRIPT", "LOADING", diff --git a/src/commands/client-setname.json b/src/commands/client-setname.json index b071bd18ff..f544dc6a0f 100644 --- a/src/commands/client-setname.json +++ b/src/commands/client-setname.json @@ -6,7 +6,7 @@ "since": "2.6.9", "arity": 3, "container": "CLIENT", - "function": "clientCommand", + "function": "clientSetNameCommand", "command_flags": [ "NOSCRIPT", "LOADING", diff --git a/src/commands/client-tracking.json b/src/commands/client-tracking.json index 2c3768c2fb..1acf84fafc 100644 --- a/src/commands/client-tracking.json +++ b/src/commands/client-tracking.json @@ -6,7 +6,7 @@ "since": "6.0.0", "arity": -3, "container": "CLIENT", - "function": "clientCommand", + "function": "clientTrackingCommand", "command_flags": [ "NOSCRIPT", "LOADING", diff --git a/src/commands/client-trackinginfo.json b/src/commands/client-trackinginfo.json index 270a3d5e6e..78ba8201d7 100644 --- a/src/commands/client-trackinginfo.json +++ b/src/commands/client-trackinginfo.json @@ -6,7 +6,7 @@ "since": "6.2.0", "arity": 2, "container": "CLIENT", - "function": "clientCommand", + "function": "clientTrackingInfoCommand", "command_flags": [ "NOSCRIPT", "LOADING", diff --git a/src/commands/client-unblock.json b/src/commands/client-unblock.json index d391ede9e9..2173173f40 100644 --- a/src/commands/client-unblock.json +++ b/src/commands/client-unblock.json @@ -6,7 +6,7 @@ "since": "5.0.0", "arity": -3, "container": "CLIENT", - "function": "clientCommand", + "function": "clientUnblockCommand", "command_flags": [ "ADMIN", "NOSCRIPT", diff --git a/src/commands/client-unpause.json b/src/commands/client-unpause.json index 6c55210d2a..bb78fb848b 100644 --- a/src/commands/client-unpause.json +++ b/src/commands/client-unpause.json @@ -6,7 +6,7 @@ "since": "6.2.0", "arity": 2, "container": "CLIENT", - "function": "clientCommand", + "function": "clientUnpauseCommand", "command_flags": [ "ADMIN", "NOSCRIPT", diff --git a/src/commands/client.json b/src/commands/client.json index b50996128e..116fb4d4a2 100644 --- a/src/commands/client.json +++ b/src/commands/client.json @@ -4,6 +4,7 @@ "complexity": "Depends on subcommand.", "group": "connection", "since": "2.4.0", + "function": "clientCommand", "arity": -2, "command_flags": [ "SENTINEL" diff --git a/src/db.c b/src/db.c index 55ffe5da5a..535d493954 100644 --- a/src/db.c +++ b/src/db.c @@ -979,39 +979,6 @@ void keysScanCallback(void *privdata, void *entry) { /* This callback is used by scanGenericCommand in order to collect elements * returned by the dictionary iterator into a list. */ -void dictScanCallback(void *privdata, const dictEntry *de) { - scanData *data = (scanData *)privdata; - list *keys = data->keys; - robj *o = data->o; - sds val = NULL; - sds key = NULL; - data->sampled++; - - /* This callback is only used for scanning elements within a key (hash - * fields, set elements, etc.) so o must be set here. */ - serverAssert(o != NULL); - - /* Filter element if it does not match the pattern. */ - sds keysds = dictGetKey(de); - if (data->pattern) { - if (!stringmatchlen(data->pattern, sdslen(data->pattern), keysds, sdslen(keysds), 0)) { - return; - } - } - - if (o->type == OBJ_HASH) { - key = keysds; - if (!data->only_keys) { - val = dictGetVal(de); - } - } else { - serverPanic("Type not handled in dict SCAN callback."); - } - - listAddNodeTail(keys, key); - if (val) listAddNodeTail(keys, val); -} - void hashtableScanCallback(void *privdata, void *entry) { scanData *data = (scanData *)privdata; sds val = NULL; @@ -1025,14 +992,20 @@ void hashtableScanCallback(void *privdata, void *entry) { * fields, set elements, etc.) so o must be set here. */ serverAssert(o != NULL); - /* get key */ + /* get key, value */ if (o->type == OBJ_SET) { key = (sds)entry; } else if (o->type == OBJ_ZSET) { zskiplistNode *node = (zskiplistNode *)entry; key = node->ele; + /* zset data is copied after filtering by key */ + } else if (o->type == OBJ_HASH) { + key = hashTypeEntryGetField(entry); + if (!data->only_keys) { + val = hashTypeEntryGetValue(entry); + } } else { - serverPanic("Type not handled in hashset SCAN callback."); + serverPanic("Type not handled in hashtable SCAN callback."); } /* Filter element if it does not match the pattern. */ @@ -1042,9 +1015,9 @@ void hashtableScanCallback(void *privdata, void *entry) { } } - if (o->type == OBJ_SET) { - /* no value, key used by reference */ - } else if (o->type == OBJ_ZSET) { + /* zset data must be copied. Do this after filtering to avoid unneeded + * allocations. */ + if (o->type == OBJ_ZSET) { /* zset data is copied */ zskiplistNode *node = (zskiplistNode *)entry; key = sdsdup(node->ele); @@ -1053,8 +1026,6 @@ void hashtableScanCallback(void *privdata, void *entry) { int len = ld2string(buf, sizeof(buf), node->score, LD_STR_AUTO); val = sdsnewlen(buf, len); } - } else { - serverPanic("Type not handled in hashset SCAN callback."); } listAddNodeTail(keys, key); @@ -1193,20 +1164,19 @@ void scanGenericCommand(client *c, robj *o, unsigned long long cursor) { * cursor to zero to signal the end of the iteration. */ /* Handle the case of kvstore, dict or hashtable. */ - dict *dict_table = NULL; - hashtable *hashtable_table = NULL; + hashtable *ht = NULL; int shallow_copied_list_items = 0; if (o == NULL) { shallow_copied_list_items = 1; } else if (o->type == OBJ_SET && o->encoding == OBJ_ENCODING_HASHTABLE) { - hashtable_table = o->ptr; + ht = o->ptr; shallow_copied_list_items = 1; - } else if (o->type == OBJ_HASH && o->encoding == OBJ_ENCODING_HT) { - dict_table = o->ptr; + } else if (o->type == OBJ_HASH && o->encoding == OBJ_ENCODING_HASHTABLE) { + ht = o->ptr; shallow_copied_list_items = 1; } else if (o->type == OBJ_ZSET && o->encoding == OBJ_ENCODING_SKIPLIST) { zset *zs = o->ptr; - hashtable_table = zs->ht; + ht = zs->ht; /* scanning ZSET allocates temporary strings even though it's a dict */ shallow_copied_list_items = 0; } @@ -1220,7 +1190,7 @@ void scanGenericCommand(client *c, robj *o, unsigned long long cursor) { } /* For main hash table scan or scannable data structure. */ - if (!o || dict_table || hashtable_table) { + if (!o || ht) { /* We set the max number of iterations to ten times the specified * COUNT, so if the hash table is in a pathological state (very * sparsely populated) we avoid to block too much time at the cost @@ -1260,10 +1230,8 @@ void scanGenericCommand(client *c, robj *o, unsigned long long cursor) { * If cursor is empty, we should try exploring next non-empty slot. */ if (o == NULL) { cursor = kvstoreScan(c->db->keys, cursor, onlydidx, keysScanCallback, NULL, &data); - } else if (dict_table) { - cursor = dictScan(dict_table, cursor, dictScanCallback, &data); } else { - cursor = hashtableScan(hashtable_table, cursor, hashtableScanCallback, &data); + cursor = hashtableScan(ht, cursor, hashtableScanCallback, &data); } } while (cursor && maxiterations-- && data.sampled < count); } else if (o->type == OBJ_SET) { @@ -1882,7 +1850,8 @@ void deleteExpiredKeyFromOverwriteAndPropagate(client *c, robj *keyobj) { robj *aux = server.lazyfree_lazy_expire ? shared.unlink : shared.del; rewriteClientCommandVector(c, 2, aux, keyobj); signalModifiedKey(c, c->db, keyobj); - notifyKeyspaceEvent(NOTIFY_GENERIC, "del", keyobj, c->db->id); + notifyKeyspaceEvent(NOTIFY_EXPIRED, "expired", keyobj, c->db->id); + server.stat_expiredkeys++; } /* Propagate an implicit key deletion into replicas and the AOF file. diff --git a/src/debug.c b/src/debug.c index c80ff5af39..915e0c264d 100644 --- a/src/debug.c +++ b/src/debug.c @@ -231,7 +231,7 @@ void xorObjectDigest(serverDb *db, robj *keyobj, unsigned char *digest, robj *o) sds sdsele; memset(eledigest, 0, 20); - sdsele = hashTypeCurrentObjectNewSds(&hi, OBJ_HASH_KEY); + sdsele = hashTypeCurrentObjectNewSds(&hi, OBJ_HASH_FIELD); mixDigest(eledigest, sdsele, sdslen(sdsele)); sdsfree(sdsele); sdsele = hashTypeCurrentObjectNewSds(&hi, OBJ_HASH_VALUE); @@ -923,23 +923,17 @@ void debugCommand(client *c) { robj *o = objectCommandLookupOrReply(c, c->argv[2], shared.nokeyerr); if (o == NULL) return; - /* Get the dict reference from the object, if possible. */ - dict *d = NULL; + /* Get the hashtable reference from the object, if possible. */ hashtable *ht = NULL; switch (o->encoding) { case OBJ_ENCODING_SKIPLIST: { zset *zs = o->ptr; ht = zs->ht; } break; - case OBJ_ENCODING_HT: d = o->ptr; break; case OBJ_ENCODING_HASHTABLE: ht = o->ptr; break; } - if (d != NULL) { - char buf[4096]; - dictGetStats(buf, sizeof(buf), d, full); - addReplyVerbatim(c, buf, strlen(buf), "txt"); - } else if (ht != NULL) { + if (ht != NULL) { char buf[4096]; hashtableGetStats(buf, sizeof(buf), ht, full); addReplyVerbatim(c, buf, strlen(buf), "txt"); diff --git a/src/defrag.c b/src/defrag.c index 103730ee14..fb98da96c7 100644 --- a/src/defrag.c +++ b/src/defrag.c @@ -373,13 +373,6 @@ void activeDefragSdsHashtableCallback(void *privdata, void *entry_ref) { if (new_sds != NULL) *sds_ref = new_sds; } -void activeDefragSdsHashtable(hashtable *ht) { - unsigned long cursor = 0; - do { - cursor = hashtableScanDefrag(ht, cursor, activeDefragSdsHashtableCallback, NULL, activeDefragAlloc, HASHTABLE_SCAN_EMIT_REF); - } while (cursor != 0); -} - /* Defrag a list of ptr, sds or robj string values */ static void activeDefragQuickListNode(quicklist *ql, quicklistNode **node_ref) { quicklistNode *newnode, *node = *node_ref; @@ -481,26 +474,25 @@ static void scanHashtableCallbackCountScanned(void *privdata, void *elemref) { server.stat_active_defrag_scanned++; } -/* Used as dict scan callback when all the work is done in the dictDefragFunctions. */ -static void scanCallbackCountScanned(void *privdata, const dictEntry *de) { - UNUSED(privdata); - UNUSED(de); - server.stat_active_defrag_scanned++; -} - static void scanLaterSet(robj *ob, unsigned long *cursor) { if (ob->type != OBJ_SET || ob->encoding != OBJ_ENCODING_HASHTABLE) return; hashtable *ht = ob->ptr; *cursor = hashtableScanDefrag(ht, *cursor, activeDefragSdsHashtableCallback, NULL, activeDefragAlloc, HASHTABLE_SCAN_EMIT_REF); } +/* Hashtable scan callback for hash datatype */ +static void activeDefragHashTypeEntry(void *privdata, void *element_ref) { + UNUSED(privdata); + hashTypeEntry **entry_ref = (hashTypeEntry **)element_ref; + + hashTypeEntry *new_entry = hashTypeEntryDefrag(*entry_ref, activeDefragAlloc, activeDefragSds); + if (new_entry) *entry_ref = new_entry; +} + static void scanLaterHash(robj *ob, unsigned long *cursor) { - if (ob->type != OBJ_HASH || ob->encoding != OBJ_ENCODING_HT) return; - dict *d = ob->ptr; - dictDefragFunctions defragfns = {.defragAlloc = activeDefragAlloc, - .defragKey = (dictDefragAllocFunction *)activeDefragSds, - .defragVal = (dictDefragAllocFunction *)activeDefragSds}; - *cursor = dictScanDefrag(d, *cursor, scanCallbackCountScanned, &defragfns, NULL); + if (ob->type != OBJ_HASH || ob->encoding != OBJ_ENCODING_HASHTABLE) return; + hashtable *ht = ob->ptr; + *cursor = hashtableScanDefrag(ht, *cursor, activeDefragHashTypeEntry, NULL, activeDefragAlloc, HASHTABLE_SCAN_EMIT_REF); } static void defragQuicklist(robj *ob) { @@ -538,15 +530,19 @@ static void defragZsetSkiplist(robj *ob) { } static void defragHash(robj *ob) { - dict *d, *newd; - serverAssert(ob->type == OBJ_HASH && ob->encoding == OBJ_ENCODING_HT); - d = ob->ptr; - if (dictSize(d) > server.active_defrag_max_scan_fields) + serverAssert(ob->type == OBJ_HASH && ob->encoding == OBJ_ENCODING_HASHTABLE); + hashtable *ht = ob->ptr; + if (hashtableSize(ht) > server.active_defrag_max_scan_fields) { defragLater(ob); - else - activeDefragSdsDict(d, DEFRAG_SDS_DICT_VAL_IS_SDS); - /* defrag the dict struct and tables */ - if ((newd = dictDefragTables(ob->ptr))) ob->ptr = newd; + } else { + unsigned long cursor = 0; + do { + cursor = hashtableScanDefrag(ht, cursor, activeDefragHashTypeEntry, NULL, activeDefragAlloc, HASHTABLE_SCAN_EMIT_REF); + } while (cursor != 0); + } + /* defrag the hashtable struct and tables */ + hashtable *new_hashtable = hashtableDefragTables(ht, activeDefragAlloc); + if (new_hashtable) ob->ptr = new_hashtable; } static void defragSet(robj *ob) { @@ -555,11 +551,14 @@ static void defragSet(robj *ob) { if (hashtableSize(ht) > server.active_defrag_max_scan_fields) { defragLater(ob); } else { - activeDefragSdsHashtable(ht); + unsigned long cursor = 0; + do { + cursor = hashtableScanDefrag(ht, cursor, activeDefragSdsHashtableCallback, NULL, activeDefragAlloc, HASHTABLE_SCAN_EMIT_REF); + } while (cursor != 0); } /* defrag the hashtable struct and tables */ - hashtable *newHashtable = hashtableDefragTables(ht, activeDefragAlloc); - if (newHashtable) ob->ptr = newHashtable; + hashtable *new_hashtable = hashtableDefragTables(ht, activeDefragAlloc); + if (new_hashtable) ob->ptr = new_hashtable; } /* Defrag callback for radix tree iterator, called for each node, @@ -776,7 +775,7 @@ static void defragKey(defragKeysCtx *ctx, robj **elemref) { } else if (ob->type == OBJ_HASH) { if (ob->encoding == OBJ_ENCODING_LISTPACK) { if ((newzl = activeDefragAlloc(ob->ptr))) ob->ptr = newzl; - } else if (ob->encoding == OBJ_ENCODING_HT) { + } else if (ob->encoding == OBJ_ENCODING_HASHTABLE) { defragHash(ob); } else { serverPanic("Unknown hash encoding"); diff --git a/src/function_lua.c b/src/function_lua.c index b535528906..59c16eae54 100644 --- a/src/function_lua.c +++ b/src/function_lua.c @@ -39,6 +39,7 @@ * Uses script_lua.c to run the Lua code. */ +#include "scripting_engine.h" #include "functions.h" #include "script_lua.h" #include @@ -121,7 +122,7 @@ static compiledFunction **luaEngineCreate(ValkeyModuleCtx *module_ctx, const char *code, size_t timeout, size_t *out_num_compiled_functions, - char **err) { + robj **err) { /* The lua engine is implemented in the core, and not in a Valkey Module */ serverAssert(module_ctx == NULL); @@ -139,7 +140,8 @@ static compiledFunction **luaEngineCreate(ValkeyModuleCtx *module_ctx, /* compile the code */ if (luaL_loadbuffer(lua, code, strlen(code), "@user_function")) { - *err = valkey_asprintf("Error compiling function: %s", lua_tostring(lua, -1)); + sds error = sdscatfmt(sdsempty(), "Error compiling function: %s", lua_tostring(lua, -1)); + *err = createObject(OBJ_STRING, error); lua_pop(lua, 1); /* pops the error */ goto done; } @@ -157,7 +159,8 @@ static compiledFunction **luaEngineCreate(ValkeyModuleCtx *module_ctx, if (lua_pcall(lua, 0, 0, 0)) { errorInfo err_info = {0}; luaExtractErrorInformation(lua, &err_info); - *err = valkey_asprintf("Error registering functions: %s", err_info.msg); + sds error = sdscatfmt(sdsempty(), "Error registering functions: %s", err_info.msg); + *err = createObject(OBJ_STRING, error); lua_pop(lua, 1); /* pops the error */ luaErrorInformationDiscard(&err_info); listIter *iter = listGetIterator(load_ctx.functions, AL_START_HEAD); @@ -557,8 +560,8 @@ int luaEngineInitEngine(void) { .get_memory_info = luaEngineGetMemoryInfo, }; - return functionsRegisterEngine(LUA_ENGINE_NAME, - NULL, - lua_engine_ctx, - &lua_engine_methods); + return scriptingEngineManagerRegister(LUA_ENGINE_NAME, + NULL, + lua_engine_ctx, + &lua_engine_methods); } diff --git a/src/functions.c b/src/functions.c index 0d003f7fac..14d8c5296e 100644 --- a/src/functions.c +++ b/src/functions.c @@ -31,7 +31,6 @@ #include "sds.h" #include "dict.h" #include "adlist.h" -#include "module.h" #define LOAD_TIMEOUT_MS 500 @@ -41,8 +40,6 @@ typedef enum { restorePolicy_Replace } restorePolicy; -static size_t engine_cache_memory = 0; - /* Forward declaration */ static void engineFunctionDispose(void *obj); static void engineStatsDispose(void *obj); @@ -67,15 +64,6 @@ typedef struct functionsLibMetaData { sds code; } functionsLibMetaData; -dictType engineDictType = { - dictSdsCaseHash, /* hash function */ - dictSdsDup, /* key dup */ - dictSdsKeyCaseCompare, /* key compare */ - dictSdsDestructor, /* key destructor */ - NULL, /* val destructor */ - NULL /* allow to expand */ -}; - dictType functionDictType = { dictSdsCaseHash, /* hash function */ dictSdsDup, /* key dup */ @@ -112,34 +100,14 @@ dictType librariesDictType = { NULL /* allow to expand */ }; -/* Dictionary of engines */ -static dict *engines = NULL; - /* Libraries Ctx. */ static functionsLibCtx *curr_functions_lib_ctx = NULL; -static void setupEngineModuleCtx(engineInfo *ei, client *c) { - if (ei->engineModule != NULL) { - serverAssert(ei->module_ctx != NULL); - moduleScriptingEngineInitContext(ei->module_ctx, ei->engineModule, c); - } -} - -static void teardownEngineModuleCtx(engineInfo *ei) { - if (ei->engineModule != NULL) { - serverAssert(ei->module_ctx != NULL); - moduleFreeContext(ei->module_ctx); - } -} - static size_t functionMallocSize(functionInfo *fi) { - setupEngineModuleCtx(fi->li->ei, NULL); - size_t size = zmalloc_size(fi) + - sdsAllocSize(fi->name) + - (fi->desc ? sdsAllocSize(fi->desc) : 0) + - fi->li->ei->engine->get_function_memory_overhead(fi->li->ei->module_ctx, fi->function); - teardownEngineModuleCtx(fi->li->ei); - return size; + return zmalloc_size(fi) + + sdsAllocSize(fi->name) + + (fi->desc ? sdsAllocSize(fi->desc) : 0) + + scriptingEngineCallGetFunctionMemoryOverhead(fi->li->engine, fi->function); } static size_t libraryMallocSize(functionLibInfo *li) { @@ -161,12 +129,8 @@ static void engineFunctionDispose(void *obj) { if (fi->desc) { sdsfree(fi->desc); } - setupEngineModuleCtx(fi->li->ei, NULL); - engine *engine = fi->li->ei->engine; - engine->free_function(fi->li->ei->module_ctx, - engine->engine_ctx, - fi->function); - teardownEngineModuleCtx(fi->li->ei); + + scriptingEngineCallFreeFunction(fi->li->engine, fi->function); zfree(fi); } @@ -239,30 +203,30 @@ functionsLibCtx *functionsLibCtxGetCurrent(void) { return curr_functions_lib_ctx; } +static void initializeFunctionsLibEngineStats(scriptingEngine *engine, + void *context) { + functionsLibCtx *lib_ctx = (functionsLibCtx *)context; + functionsLibEngineStats *stats = zcalloc(sizeof(*stats)); + dictAdd(lib_ctx->engines_stats, scriptingEngineGetName(engine), stats); +} + /* Create a new functions ctx */ functionsLibCtx *functionsLibCtxCreate(void) { functionsLibCtx *ret = zmalloc(sizeof(functionsLibCtx)); ret->libraries = dictCreate(&librariesDictType); ret->functions = dictCreate(&functionDictType); ret->engines_stats = dictCreate(&engineStatsDictType); - dictIterator *iter = dictGetIterator(engines); - dictEntry *entry = NULL; - while ((entry = dictNext(iter))) { - engineInfo *ei = dictGetVal(entry); - functionsLibEngineStats *stats = zcalloc(sizeof(*stats)); - dictAdd(ret->engines_stats, ei->name, stats); - } - dictReleaseIterator(iter); + scriptingEngineManagerForEachEngine(initializeFunctionsLibEngineStats, ret); ret->cache_memory = 0; return ret; } -void functionsAddEngineStats(engineInfo *ei) { +void functionsAddEngineStats(sds engine_name) { serverAssert(curr_functions_lib_ctx != NULL); - dictEntry *entry = dictFind(curr_functions_lib_ctx->engines_stats, ei->name); + dictEntry *entry = dictFind(curr_functions_lib_ctx->engines_stats, engine_name); if (entry == NULL) { functionsLibEngineStats *stats = zcalloc(sizeof(*stats)); - dictAdd(curr_functions_lib_ctx->engines_stats, ei->name, stats); + dictAdd(curr_functions_lib_ctx->engines_stats, engine_name, stats); } } @@ -312,12 +276,12 @@ static int functionLibCreateFunction(robj *name, return C_OK; } -static functionLibInfo *engineLibraryCreate(sds name, engineInfo *ei, sds code) { +static functionLibInfo *engineLibraryCreate(sds name, scriptingEngine *e, sds code) { functionLibInfo *li = zmalloc(sizeof(*li)); *li = (functionLibInfo){ .name = sdsdup(name), .functions = dictCreate(&libraryFunctionDictType), - .ei = ei, + .engine = e, .code = sdsdup(code), }; return li; @@ -339,7 +303,7 @@ static void libraryUnlink(functionsLibCtx *lib_ctx, functionLibInfo *li) { lib_ctx->cache_memory -= libraryMallocSize(li); /* update stats */ - functionsLibEngineStats *stats = dictFetchValue(lib_ctx->engines_stats, li->ei->name); + functionsLibEngineStats *stats = dictFetchValue(lib_ctx->engines_stats, scriptingEngineGetName(li->engine)); serverAssert(stats); stats->n_lib--; stats->n_functions -= dictSize(li->functions); @@ -359,7 +323,7 @@ static void libraryLink(functionsLibCtx *lib_ctx, functionLibInfo *li) { lib_ctx->cache_memory += libraryMallocSize(li); /* update stats */ - functionsLibEngineStats *stats = dictFetchValue(lib_ctx->engines_stats, li->ei->name); + functionsLibEngineStats *stats = dictFetchValue(lib_ctx->engines_stats, scriptingEngineGetName(li->engine)); serverAssert(stats); stats->n_lib++; stats->n_functions += dictSize(li->functions); @@ -446,107 +410,29 @@ libraryJoin(functionsLibCtx *functions_lib_ctx_dst, functionsLibCtx *functions_l return ret; } -/* Register an engine, should be called once by the engine on startup and give - * the following: - * - * - engine_name - name of the engine to register - * - * - engine_module - the valkey module that implements this engine - * - * - engine_ctx - the engine ctx that should be used by the server to interact - * with the engine. - * - * - engine_methods - the struct with the scripting engine callback functions - * pointers. - * - */ -int functionsRegisterEngine(const char *engine_name, - ValkeyModule *engine_module, - engineCtx *engine_ctx, - engineMethods *engine_methods) { - sds engine_name_sds = sdsnew(engine_name); - if (dictFetchValue(engines, engine_name_sds)) { - serverLog(LL_WARNING, "Same engine was registered twice"); - sdsfree(engine_name_sds); - return C_ERR; - } - - engine *eng = zmalloc(sizeof(engine)); - *eng = (engine){ - .engine_ctx = engine_ctx, - .create = engine_methods->create_functions_library, - .call = engine_methods->call_function, - .get_function_memory_overhead = engine_methods->get_function_memory_overhead, - .free_function = engine_methods->free_function, - .get_memory_info = engine_methods->get_memory_info, - }; - - client *c = createClient(NULL); - c->flag.deny_blocking = 1; - c->flag.script = 1; - c->flag.fake = 1; - engineInfo *ei = zmalloc(sizeof(*ei)); - *ei = (engineInfo){ - .name = engine_name_sds, - .engineModule = engine_module, - .module_ctx = engine_module ? moduleAllocateContext() : NULL, - .engine = eng, - .c = c, - }; - - dictAdd(engines, engine_name_sds, ei); - - functionsAddEngineStats(ei); - - setupEngineModuleCtx(ei, NULL); - engineMemoryInfo mem_info = eng->get_memory_info(ei->module_ctx, - eng->engine_ctx); - engine_cache_memory += zmalloc_size(ei) + - sdsAllocSize(ei->name) + - zmalloc_size(eng) + - mem_info.engine_memory_overhead; - - teardownEngineModuleCtx(ei); - - return C_OK; +static void replyEngineStats(scriptingEngine *engine, void *context) { + client *c = (client *)context; + addReplyBulkCString(c, scriptingEngineGetName(engine)); + addReplyMapLen(c, 2); + functionsLibEngineStats *e_stats = + dictFetchValue(curr_functions_lib_ctx->engines_stats, scriptingEngineGetName(engine)); + addReplyBulkCString(c, "libraries_count"); + addReplyLongLong(c, e_stats ? e_stats->n_lib : 0); + addReplyBulkCString(c, "functions_count"); + addReplyLongLong(c, e_stats ? e_stats->n_functions : 0); } -/* Removes a scripting engine from the server. - * - * - engine_name - name of the engine to remove - */ -int functionsUnregisterEngine(const char *engine_name) { - sds engine_name_sds = sdsnew(engine_name); - dictEntry *entry = dictFind(engines, engine_name_sds); - if (entry == NULL) { - serverLog(LL_WARNING, "There's no engine registered with name %s", engine_name); - sdsfree(engine_name_sds); - return C_ERR; - } - - engineInfo *ei = dictGetVal(entry); - +void functionsRemoveLibFromEngine(scriptingEngine *engine) { dictIterator *iter = dictGetSafeIterator(curr_functions_lib_ctx->libraries); + dictEntry *entry = NULL; while ((entry = dictNext(iter))) { functionLibInfo *li = dictGetVal(entry); - if (li->ei == ei) { + if (li->engine == engine) { libraryUnlink(curr_functions_lib_ctx, li); engineLibraryFree(li); } } dictReleaseIterator(iter); - - zfree(ei->engine); - sdsfree(ei->name); - freeClient(ei->c); - if (ei->engineModule != NULL) { - serverAssert(ei->module_ctx != NULL); - zfree(ei->module_ctx); - } - zfree(ei); - - sdsfree(engine_name_sds); - return C_OK; } /* @@ -578,20 +464,8 @@ void functionStatsCommand(client *c) { } addReplyBulkCString(c, "engines"); - addReplyMapLen(c, dictSize(engines)); - dictIterator *iter = dictGetIterator(engines); - dictEntry *entry = NULL; - while ((entry = dictNext(iter))) { - engineInfo *ei = dictGetVal(entry); - addReplyBulkCString(c, ei->name); - addReplyMapLen(c, 2); - functionsLibEngineStats *e_stats = dictFetchValue(curr_functions_lib_ctx->engines_stats, ei->name); - addReplyBulkCString(c, "libraries_count"); - addReplyLongLong(c, e_stats->n_lib); - addReplyBulkCString(c, "functions_count"); - addReplyLongLong(c, e_stats->n_functions); - } - dictReleaseIterator(iter); + addReplyMapLen(c, scriptingEngineManagerGetNumEngines()); + scriptingEngineManagerForEachEngine(replyEngineStats, c); } static void functionListReplyFlags(client *c, functionInfo *fi) { @@ -667,7 +541,8 @@ void functionListCommand(client *c) { addReplyBulkCString(c, "library_name"); addReplyBulkCBuffer(c, li->name, sdslen(li->name)); addReplyBulkCString(c, "engine"); - addReplyBulkCBuffer(c, li->ei->name, sdslen(li->ei->name)); + sds engine_name = scriptingEngineGetName(li->engine); + addReplyBulkCBuffer(c, engine_name, sdslen(engine_name)); addReplyBulkCString(c, "functions"); addReplyArrayLen(c, dictSize(li->functions)); @@ -747,7 +622,7 @@ static void fcallCommandGeneric(client *c, int ro) { return; } functionInfo *fi = dictGetVal(de); - engine *engine = fi->li->ei->engine; + scriptingEngine *engine = fi->li->engine; long long numkeys; /* Get the number of arguments that are keys */ @@ -764,19 +639,16 @@ static void fcallCommandGeneric(client *c, int ro) { } scriptRunCtx run_ctx; - if (scriptPrepareForRun(&run_ctx, fi->li->ei->c, c, fi->name, fi->f_flags, ro) != C_OK) return; - setupEngineModuleCtx(fi->li->ei, run_ctx.original_client); - - engine->call(fi->li->ei->module_ctx, - engine->engine_ctx, - &run_ctx, - fi->function, - c->argv + 3, - numkeys, - c->argv + 3 + numkeys, - c->argc - 3 - numkeys); - - teardownEngineModuleCtx(fi->li->ei); + if (scriptPrepareForRun(&run_ctx, scriptingEngineGetClient(engine), c, fi->name, fi->f_flags, ro) != C_OK) return; + + scriptingEngineCallFunction(engine, + &run_ctx, + run_ctx.original_client, + fi->function, + c->argv + 3, + numkeys, + c->argv + 3 + numkeys, + c->argc - 3 - numkeys); scriptResetRun(&run_ctx); } @@ -1076,12 +948,10 @@ void functionFreeLibMetaData(functionsLibMetaData *md) { if (md->engine) sdsfree(md->engine); } -static void freeCompiledFunctions(engineInfo *ei, +static void freeCompiledFunctions(scriptingEngine *engine, compiledFunction **compiled_functions, size_t num_compiled_functions, size_t free_function_from_idx) { - setupEngineModuleCtx(ei, NULL); - for (size_t i = 0; i < num_compiled_functions; i++) { compiledFunction *func = compiled_functions[i]; decrRefCount(func->name); @@ -1089,16 +959,12 @@ static void freeCompiledFunctions(engineInfo *ei, decrRefCount(func->desc); } if (i >= free_function_from_idx) { - ei->engine->free_function(ei->module_ctx, - ei->engine->engine_ctx, - func->function); + scriptingEngineCallFreeFunction(engine, func->function); } zfree(func); } zfree(compiled_functions); - - teardownEngineModuleCtx(ei); } /* Compile and save the given library, return the loaded library name on success @@ -1120,12 +986,13 @@ sds functionsCreateWithLibraryCtx(sds code, int replace, sds *err, functionsLibC goto error; } - engineInfo *ei = dictFetchValue(engines, md.engine); - if (!ei) { + scriptingEngine *engine = scriptingEngineManagerFind(md.engine); + if (!engine) { *err = sdscatfmt(sdsempty(), "Engine '%S' not found", md.engine); goto error; } - engine *engine = ei->engine; + + functionsAddEngineStats(md.engine); old_li = dictFetchValue(lib_ctx->libraries, md.name); if (old_li && !replace) { @@ -1138,26 +1005,25 @@ sds functionsCreateWithLibraryCtx(sds code, int replace, sds *err, functionsLibC libraryUnlink(lib_ctx, old_li); } - new_li = engineLibraryCreate(md.name, ei, code); + new_li = engineLibraryCreate(md.name, engine, code); size_t num_compiled_functions = 0; - char *compile_error = NULL; - setupEngineModuleCtx(ei, NULL); + robj *compile_error = NULL; compiledFunction **compiled_functions = - engine->create(ei->module_ctx, - engine->engine_ctx, - md.code, - timeout, - &num_compiled_functions, - &compile_error); - teardownEngineModuleCtx(ei); + scriptingEngineCallCreateFunctionsLibrary(engine, + md.code, + timeout, + &num_compiled_functions, + &compile_error); if (compiled_functions == NULL) { serverAssert(num_compiled_functions == 0); serverAssert(compile_error != NULL); - *err = sdsnew(compile_error); - zfree(compile_error); + *err = sdsdup(compile_error->ptr); + decrRefCount(compile_error); goto error; } + serverAssert(compile_error == NULL); + for (size_t i = 0; i < num_compiled_functions; i++) { compiledFunction *func = compiled_functions[i]; int ret = functionLibCreateFunction(func->name, @@ -1167,7 +1033,7 @@ sds functionsCreateWithLibraryCtx(sds code, int replace, sds *err, functionsLibC func->f_flags, err); if (ret == C_ERR) { - freeCompiledFunctions(ei, + freeCompiledFunctions(engine, compiled_functions, num_compiled_functions, i); @@ -1175,7 +1041,7 @@ sds functionsCreateWithLibraryCtx(sds code, int replace, sds *err, functionsLibC } } - freeCompiledFunctions(ei, + freeCompiledFunctions(engine, compiled_functions, num_compiled_functions, num_compiled_functions); @@ -1259,32 +1125,26 @@ void functionLoadCommand(client *c) { addReplyBulkSds(c, library_name); } +static void getEngineUsedMemory(scriptingEngine *engine, void *context) { + size_t *engines_memory = (size_t *)context; + engineMemoryInfo mem_info = scriptingEngineCallGetMemoryInfo(engine); + *engines_memory += mem_info.used_memory; +} + /* Return memory usage of all the engines combine */ unsigned long functionsMemory(void) { - dictIterator *iter = dictGetIterator(engines); - dictEntry *entry = NULL; size_t engines_memory = 0; - while ((entry = dictNext(iter))) { - engineInfo *ei = dictGetVal(entry); - engine *engine = ei->engine; - setupEngineModuleCtx(ei, NULL); - engineMemoryInfo mem_info = engine->get_memory_info(ei->module_ctx, - engine->engine_ctx); - engines_memory += mem_info.used_memory; - teardownEngineModuleCtx(ei); - } - dictReleaseIterator(iter); - + scriptingEngineManagerForEachEngine(getEngineUsedMemory, &engines_memory); return engines_memory; } /* Return memory overhead of all the engines combine */ unsigned long functionsMemoryOverhead(void) { - size_t memory_overhead = dictMemUsage(engines); + size_t memory_overhead = scriptingEngineManagerGetMemoryUsage(); memory_overhead += dictMemUsage(curr_functions_lib_ctx->functions); memory_overhead += sizeof(functionsLibCtx); memory_overhead += curr_functions_lib_ctx->cache_memory; - memory_overhead += engine_cache_memory; + memory_overhead += scriptingEngineManagerGetTotalMemoryOverhead(); return memory_overhead; } @@ -1309,8 +1169,6 @@ size_t functionsLibCtxFunctionsLen(functionsLibCtx *functions_ctx) { /* Initialize engine data structures. * Should be called once on server initialization */ int functionsInit(void) { - engines = dictCreate(&engineDictType); - curr_functions_lib_ctx = functionsLibCtxCreate(); if (luaEngineInitEngine() != C_OK) { diff --git a/src/functions.h b/src/functions.h index a48ff1b8db..7f6d144365 100644 --- a/src/functions.h +++ b/src/functions.h @@ -49,73 +49,19 @@ */ #include "server.h" +#include "scripting_engine.h" #include "script.h" #include "valkeymodule.h" typedef struct functionLibInfo functionLibInfo; -/* ValkeyModule type aliases for scripting engine structs and types. */ -typedef struct ValkeyModule ValkeyModule; -typedef ValkeyModuleScriptingEngineCtx engineCtx; -typedef ValkeyModuleScriptingEngineFunctionCtx functionCtx; -typedef ValkeyModuleScriptingEngineCompiledFunction compiledFunction; -typedef ValkeyModuleScriptingEngineMemoryInfo engineMemoryInfo; -typedef ValkeyModuleScriptingEngineMethods engineMethods; - -typedef struct engine { - /* engine specific context */ - engineCtx *engine_ctx; - - /* Compiles the script code and returns an array of compiled functions - * registered in the script./ - * - * Returns NULL on error and set err to be the error message */ - compiledFunction **(*create)( - ValkeyModuleCtx *module_ctx, - engineCtx *engine_ctx, - const char *code, - size_t timeout, - size_t *out_num_compiled_functions, - char **err); - - /* Invoking a function, func_ctx is an opaque object (from engine POV). - * The func_ctx should be used by the engine to interaction with the server, - * such interaction could be running commands, set resp, or set - * replication mode - */ - void (*call)(ValkeyModuleCtx *module_ctx, - engineCtx *engine_ctx, - functionCtx *func_ctx, - void *compiled_function, - robj **keys, - size_t nkeys, - robj **args, - size_t nargs); - - /* free the given function */ - void (*free_function)(ValkeyModuleCtx *module_ctx, - engineCtx *engine_ctx, - void *compiled_function); - - /* Return memory overhead for a given function, - * such memory is not counted as engine memory but as general - * structs memory that hold different information */ - size_t (*get_function_memory_overhead)(ValkeyModuleCtx *module_ctx, - void *compiled_function); - - /* Get the current used memory by the engine */ - engineMemoryInfo (*get_memory_info)(ValkeyModuleCtx *module_ctx, - engineCtx *engine_ctx); - -} engine; - /* Hold information about an engine. * Used on rdb.c so it must be declared here. */ typedef struct engineInfo { sds name; /* Name of the engine */ ValkeyModule *engineModule; /* the module that implements the scripting engine */ ValkeyModuleCtx *module_ctx; /* Scripting engine module context */ - engine *engine; /* engine callbacks that allows to interact with the engine */ + scriptingEngine *engine; /* engine callbacks that allows to interact with the engine */ client *c; /* Client that is used to run commands */ } engineInfo; @@ -133,18 +79,12 @@ typedef struct functionInfo { /* Hold information about the specific library. * Used on rdb.c so it must be declared here. */ struct functionLibInfo { - sds name; /* Library name */ - dict *functions; /* Functions dictionary */ - engineInfo *ei; /* Pointer to the function engine */ - sds code; /* Library code */ + sds name; /* Library name */ + dict *functions; /* Functions dictionary */ + scriptingEngine *engine; /* Pointer to the scripting engine */ + sds code; /* Library code */ }; -int functionsRegisterEngine(const char *engine_name, - ValkeyModule *engine_module, - void *engine_ctx, - engineMethods *engine_methods); -int functionsUnregisterEngine(const char *engine_name); - sds functionsCreateWithLibraryCtx(sds code, int replace, sds *err, functionsLibCtx *lib_ctx, size_t timeout); unsigned long functionsMemory(void); unsigned long functionsMemoryOverhead(void); @@ -159,6 +99,8 @@ void functionsLibCtxFree(functionsLibCtx *functions_lib_ctx); void functionsLibCtxClear(functionsLibCtx *lib_ctx, void(callback)(dict *)); void functionsLibCtxSwapWithCurrent(functionsLibCtx *new_lib_ctx, int async); +void functionsRemoveLibFromEngine(scriptingEngine *engine); + int luaEngineInitEngine(void); int functionsInit(void); diff --git a/src/lazyfree.c b/src/lazyfree.c index c22d3da964..3b061ccd84 100644 --- a/src/lazyfree.c +++ b/src/lazyfree.c @@ -123,9 +123,9 @@ size_t lazyfreeGetFreeEffort(robj *key, robj *obj, int dbid) { } else if (obj->type == OBJ_ZSET && obj->encoding == OBJ_ENCODING_SKIPLIST) { zset *zs = obj->ptr; return zs->zsl->length; - } else if (obj->type == OBJ_HASH && obj->encoding == OBJ_ENCODING_HT) { - dict *ht = obj->ptr; - return dictSize(ht); + } else if (obj->type == OBJ_HASH && obj->encoding == OBJ_ENCODING_HASHTABLE) { + hashtable *ht = obj->ptr; + return hashtableSize(ht); } else if (obj->type == OBJ_STREAM) { size_t effort = 0; stream *s = obj->ptr; diff --git a/src/module.c b/src/module.c index 853eae091d..d03be51b87 100644 --- a/src/module.c +++ b/src/module.c @@ -62,8 +62,8 @@ #include "crc16_slottable.h" #include "valkeymodule.h" #include "io_threads.h" -#include "functions.h" #include "module.h" +#include "scripting_engine.h" #include #include #include @@ -11092,25 +11092,6 @@ typedef struct { ValkeyModuleScanKeyCB fn; } ScanKeyCBData; -static void moduleScanKeyDictCallback(void *privdata, const dictEntry *de) { - ScanKeyCBData *data = privdata; - sds key = dictGetKey(de); - robj *o = data->key->value; - robj *field = createStringObject(key, sdslen(key)); - robj *value = NULL; - - if (o->type == OBJ_HASH) { - sds val = dictGetVal(de); - value = createStringObject(val, sdslen(val)); - } else { - serverPanic("unexpected object type"); - } - - data->fn(data->key, field, value, data->user_data); - decrRefCount(field); - if (value) decrRefCount(value); -} - static void moduleScanKeyHashtableCallback(void *privdata, void *entry) { ScanKeyCBData *data = privdata; robj *o = data->key->value; @@ -11124,6 +11105,10 @@ static void moduleScanKeyHashtableCallback(void *privdata, void *entry) { zskiplistNode *node = (zskiplistNode *)entry; key = node->ele; value = createStringObjectFromLongDouble(node->score, 0); + } else if (o->type == OBJ_HASH) { + key = hashTypeEntryGetField(entry); + sds val = hashTypeEntryGetValue(entry); + value = createStringObject(val, sdslen(val)); } else { serverPanic("unexpected object type"); } @@ -11187,13 +11172,12 @@ int VM_ScanKey(ValkeyModuleKey *key, ValkeyModuleScanCursor *cursor, ValkeyModul errno = EINVAL; return 0; } - dict *d = NULL; hashtable *ht = NULL; robj *o = key->value; if (o->type == OBJ_SET) { if (o->encoding == OBJ_ENCODING_HASHTABLE) ht = o->ptr; } else if (o->type == OBJ_HASH) { - if (o->encoding == OBJ_ENCODING_HT) d = o->ptr; + if (o->encoding == OBJ_ENCODING_HASHTABLE) ht = o->ptr; } else if (o->type == OBJ_ZSET) { if (o->encoding == OBJ_ENCODING_SKIPLIST) ht = ((zset *)o->ptr)->ht; } else { @@ -11205,14 +11189,7 @@ int VM_ScanKey(ValkeyModuleKey *key, ValkeyModuleScanCursor *cursor, ValkeyModul return 0; } int ret = 1; - if (d) { - ScanKeyCBData data = {key, privdata, fn}; - cursor->cursor = dictScan(d, cursor->cursor, moduleScanKeyDictCallback, &data); - if (cursor->cursor == 0) { - cursor->done = 1; - ret = 0; - } - } else if (ht) { + if (ht) { ScanKeyCBData data = {key, privdata, fn}; cursor->cursor = hashtableScan(ht, cursor->cursor, moduleScanKeyHashtableCallback, &data); if (cursor->cursor == 0) { @@ -13190,10 +13167,10 @@ int VM_RegisterScriptingEngine(ValkeyModuleCtx *module_ctx, return VALKEYMODULE_ERR; } - if (functionsRegisterEngine(engine_name, - module_ctx->module, - engine_ctx, - engine_methods) != C_OK) { + if (scriptingEngineManagerRegister(engine_name, + module_ctx->module, + engine_ctx, + engine_methods) != C_OK) { return VALKEYMODULE_ERR; } @@ -13209,7 +13186,9 @@ int VM_RegisterScriptingEngine(ValkeyModuleCtx *module_ctx, */ int VM_UnregisterScriptingEngine(ValkeyModuleCtx *ctx, const char *engine_name) { UNUSED(ctx); - functionsUnregisterEngine(engine_name); + if (scriptingEngineManagerUnregister(engine_name) != C_OK) { + return VALKEYMODULE_ERR; + } return VALKEYMODULE_OK; } diff --git a/src/networking.c b/src/networking.c index 69db342ea8..093d579ef4 100644 --- a/src/networking.c +++ b/src/networking.c @@ -31,6 +31,7 @@ #include "cluster.h" #include "cluster_slot_stats.h" #include "script.h" +#include "intset.h" #include "sds.h" #include "fpconv_dtoa.h" #include "fmtargs.h" @@ -43,10 +44,35 @@ #include #include +/* This struct is used to encapsulate filtering criteria for operations on clients + * such as identifying specific clients to kill or retrieve. Each field in the struct + * represents a filter that can be applied based on specific attributes of a client. */ +typedef struct { + /* A set of client IDs to filter. If NULL, no ID filtering is applied. */ + intset *ids; + /* Maximum age (in seconds) of a client connection for filtering. + * Connections younger than this value will not match. + * A value of 0 means no age filtering. */ + long long max_age; + /* Address/port of the client. If NULL, no address filtering is applied. */ + char *addr; + /* Remote address/port of the client. If NULL, no address filtering is applied. */ + char *laddr; + /* Filtering clients by authentication user. If NULL, no user-based filtering is applied. */ + user *user; + /* Client type to filter. If set to -1, no type filtering is applied. */ + int type; + /* Boolean flag to determine if the current client (`me`) should be filtered. 1 means "skip me", 0 means otherwise. */ + int skipme; +} clientFilter; + static void setProtocolError(const char *errstr, client *c); static void pauseClientsByClient(mstime_t end, int isPauseClientAll); int postponeClientRead(client *c); char *getClientSockname(client *c); +static int parseClientFiltersOrReply(client *c, int index, clientFilter *filter); +static int clientMatchesFilter(client *client, clientFilter client_filter); +static sds getAllFilteredClientsInfoString(clientFilter *client_filter, int hide_user_data); int ProcessingEventsWhileBlocked = 0; /* See processEventsWhileBlocked(). */ __thread sds thread_shared_qb = NULL; @@ -2451,6 +2477,7 @@ int handleClientsWithPendingWrites(void) { /* resetClient prepare the client to process the next command */ void resetClient(client *c) { serverCommandProc *prevcmd = c->cmd ? c->cmd->proc : NULL; + serverCommandProc *prevParentCmd = c->cmd && c->cmd->parent ? c->cmd->parent->proc : NULL; freeClientArgv(c); freeClientOriginalArgv(c); @@ -2480,7 +2507,7 @@ void resetClient(client *c) { /* We do the same for the CACHING command as well. It also affects * the next command or transaction executed, in a way very similar * to ASKING. */ - if (!c->flag.multi && prevcmd != clientCommand) c->flag.tracking_caching = 0; + if (!c->flag.multi && prevParentCmd != clientCommand) c->flag.tracking_caching = 0; /* Remove the CLIENT_REPLY_SKIP flag if any so that the reply * to the next command will be sent, but set the flag if the command @@ -3354,6 +3381,22 @@ sds getAllClientsInfoString(int type, int hide_user_data) { return o; } +static sds getAllFilteredClientsInfoString(clientFilter *client_filter, int hide_user_data) { + listNode *ln; + listIter li; + client *client; + sds o = sdsempty(); + sdsclear(o); + listRewind(server.clients, &li); + while ((ln = listNext(&li)) != NULL) { + client = listNodeValue(ln); + if (!clientMatchesFilter(client, *client_filter)) continue; + o = catClientInfoString(o, client, hide_user_data); + o = sdscatlen(o, "\n", 1); + } + return o; +} + /* Check validity of an attribute that's gonna be shown in CLIENT LIST. */ int validateClientAttr(const char *val) { /* Check if the charset is ok. We need to do this otherwise @@ -3473,570 +3516,648 @@ void quitCommand(client *c) { c->flag.close_after_reply = 1; } -void clientCommand(client *c) { - listNode *ln; - listIter li; +static int parseClientFiltersOrReply(client *c, int index, clientFilter *filter) { + while (index < c->argc) { + int moreargs = c->argc > index + 1; - if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr, "help")) { - const char *help[] = { - "CACHING (YES|NO)", - " Enable/disable tracking of the keys for next command in OPTIN/OPTOUT modes.", - "CAPA