From ac096a19efcfa038914fe3d6e2ee1dc542a8af14 Mon Sep 17 00:00:00 2001
From: uriyage <78144248+uriyage@users.noreply.github.com>
Date: Wed, 8 Jan 2025 10:28:54 +0200
Subject: [PATCH] client struct: lazy init components and optimize struct layout (#1405)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

# Refactor client structure to use modular data components

## Current State

The client structure allocates memory for replication / pubsub / multi-keys / module / blocked data for every client, even though these features are used by only a small subset of clients. In addition, the current field layout in the client struct is suboptimal, with poor alignment and unnecessary padding between fields, resulting in a larger-than-necessary memory footprint of 896 bytes per client. Furthermore, fields that are frequently accessed together during operations are scattered throughout the struct, resulting in poor cache locality.

## This PR's Change

1. Lazy initialization - **components are only allocated when first used** (see the sketch after the observations below):
   - PubSubData: created on the first SUBSCRIBE/PUBLISH operation
   - ReplicationData: initialized only for replica connections
   - ModuleData: allocated when module interaction begins
   - BlockingState: created when the first blocking command is issued
   - MultiState: initialized on the MULTI command
2. Memory layout optimization:
   - Grouped related fields for better locality
   - Moved rarely accessed fields (e.g., client->name) to the end of the struct
   - Optimized field alignment to eliminate padding
3. Additional changes:
   - Made watched_keys statically allocated inside the `mstate` struct
   - Relocated replication init logic to replication.c

### Key Benefits

- **Efficient memory usage:**
  - 45% smaller base client structure: basic clients now use 528 bytes (down from 896)
  - Better memory locality for related operations
- Performance improvement in high-throughput scenarios, with no regressions in other cases

### Performance Impact

Tested with 650 clients and 512-byte values.

#### Single Thread Performance

| Operation | Dataset | New (ops/sec) | Old (ops/sec) | Change % |
|-----------|---------|---------------|---------------|----------|
| SET       | 1 key   | 261,799       | 258,261       | +1.37%   |
| SET       | 3M keys | 209,134       | ~209,000      | ~0%      |
| GET       | 1 key   | 281,564       | 277,965       | +1.29%   |
| GET       | 3M keys | 231,158       | 228,410       | +1.20%   |

#### 8 IO Threads Performance

| Operation | Dataset | New (ops/sec) | Old (ops/sec) | Change % |
|-----------|---------|---------------|---------------|----------|
| SET       | 1 key   | 1,331,578     | 1,331,626     | -0.00%   |
| SET       | 3M keys | 1,254,441     | 1,152,645     | +8.83%   |
| GET       | 1 key   | 1,293,149     | 1,289,503     | +0.28%   |
| GET       | 3M keys | 1,152,898     | 1,101,791     | +4.64%   |

#### Pipeline Performance (3M keys)

| Operation | Pipeline Size | New (ops/sec) | Old (ops/sec) | Change % |
|-----------|---------------|---------------|---------------|----------|
| SET       | 10            | 548,964       | 538,498       | +1.94%   |
| SET       | 20            | 606,148       | 594,872       | +1.89%   |
| SET       | 30            | 631,122       | 616,606       | +2.35%   |
| GET       | 10            | 628,482       | 624,166       | +0.69%   |
| GET       | 20            | 687,371       | 681,659       | +0.84%   |
| GET       | 30            | 725,855       | 721,102       | +0.66%   |

### Observations

1. Single-threaded operations show consistent improvements (1-1.4%).
2. Multi-threaded runs show significant gains on large datasets:
   - SET with 3M keys: +8.83% improvement
   - GET with 3M keys: +4.64% improvement
3. Pipeline operations show consistent improvements:
   - SET operations: +1.89% to +2.35%
   - GET operations: +0.66% to +0.84%
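The allocate-on-first-use pattern at the heart of change 1 can be sketched as follows. This is a simplified, self-contained illustration with stand-in types (and plain calloc/free in place of the server's zmalloc/zfree); the real initializers and free functions live in blocked.c, pubsub.c, module.c, multi.c and replication.c in the diff below.

```c
#include <stdlib.h>

/* Hypothetical, minimal stand-ins for the real server types: just enough to
 * show the allocate-on-first-use pattern. The real blockingState (server.h)
 * holds the blocking type, timeout, blocked-keys dict, list nodes, etc. */
typedef struct blockingState {
    int btype; /* blocking type; a BLOCKED_NONE-style sentinel when idle */
} blockingState;

typedef struct client {
    /* Rarely used components start out NULL and are allocated on demand;
     * the same convention applies to pubsub_data, repl_data, module_data
     * and mstate in the patched struct. */
    blockingState *bstate;
} client;

/* Allocate the component the first time it is needed; a no-op afterwards. */
static void initClientBlockingState(client *c) {
    if (c->bstate) return;                        /* already initialized */
    c->bstate = calloc(1, sizeof(blockingState)); /* zmalloc/zcalloc in the server */
}

/* The free path tolerates clients that never used the feature. */
static void freeClientBlockingState(client *c) {
    if (!c->bstate) return; /* client never blocked: nothing to free */
    free(c->bstate);
    c->bstate = NULL;
}

int main(void) {
    client c = {0};              /* basic client: no blocking component */
    initClientBlockingState(&c); /* first blocking command allocates it */
    initClientBlockingState(&c); /* idempotent on subsequent calls */
    freeClientBlockingState(&c); /* safe even if bstate was never set */
    return 0;
}
```

Each call site guards with the init function before first use (e.g. `initClientBlockingState(c)` in blockClient()), and the corresponding free function is NULL-tolerant so freeClient() can call it unconditionally.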
No performance regressions observed in any test scenario Related issue:https://github.com/valkey-io/valkey/issues/761 --------- Signed-off-by: Uri Yagelnik Signed-off-by: uriyage <78144248+uriyage@users.noreply.github.com> Co-authored-by: Viktor Söderqvist --- src/acl.c | 6 +- src/aof.c | 3 +- src/blocked.c | 127 ++++++++------ src/cluster.c | 14 +- src/cluster_legacy.c | 2 +- src/module.c | 100 ++++++----- src/module.h | 1 + src/multi.c | 113 ++++++------ src/networking.c | 238 ++++++++----------------- src/pubsub.c | 66 +++++-- src/rdb.c | 8 +- src/replication.c | 405 +++++++++++++++++++++++++------------------ src/script.c | 1 + src/server.c | 65 +++---- src/server.h | 258 ++++++++++++++------------- src/timeout.c | 8 +- src/tracking.c | 23 +-- 17 files changed, 761 insertions(+), 677 deletions(-) diff --git a/src/acl.c b/src/acl.c index 0928c43914..184fa54116 100644 --- a/src/acl.c +++ b/src/acl.c @@ -1960,7 +1960,7 @@ int ACLShouldKillPubsubClient(client *c, list *upcoming) { if (getClientType(c) == CLIENT_TYPE_PUBSUB) { /* Check for pattern violations. */ - dictIterator *di = dictGetIterator(c->pubsub_patterns); + dictIterator *di = dictGetIterator(c->pubsub_data->pubsub_patterns); dictEntry *de; while (!kill && ((de = dictNext(di)) != NULL)) { o = dictGetKey(de); @@ -1972,7 +1972,7 @@ int ACLShouldKillPubsubClient(client *c, list *upcoming) { /* Check for channel violations. */ if (!kill) { /* Check for global channels violation. */ - di = dictGetIterator(c->pubsub_channels); + di = dictGetIterator(c->pubsub_data->pubsub_channels); while (!kill && ((de = dictNext(di)) != NULL)) { o = dictGetKey(de); @@ -1983,7 +1983,7 @@ int ACLShouldKillPubsubClient(client *c, list *upcoming) { } if (!kill) { /* Check for shard channels violation. */ - di = dictGetIterator(c->pubsubshard_channels); + di = dictGetIterator(c->pubsub_data->pubsubshard_channels); while (!kill && ((de = dictNext(di)) != NULL)) { o = dictGetKey(de); int res = ACLCheckChannelAgainstList(upcoming, o->ptr, sdslen(o->ptr), 0); diff --git a/src/aof.c b/src/aof.c index 8ac44f64c2..3629fa1acf 100644 --- a/src/aof.c +++ b/src/aof.c @@ -1382,7 +1382,8 @@ struct client *createAOFClient(void) { /* We set the fake client as a replica waiting for the synchronization * so that the server will not try to send replies to this client. 
*/ - c->repl_state = REPLICA_STATE_WAIT_BGSAVE_START; + initClientReplicationData(c); + c->repl_data->repl_state = REPLICA_STATE_WAIT_BGSAVE_START; return c; } diff --git a/src/blocked.c b/src/blocked.c index 39050932d9..d2d6a5d314 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -75,16 +75,25 @@ static void moduleUnblockClientOnKey(client *c, robj *key); static void releaseBlockedEntry(client *c, dictEntry *de, int remove_key); void initClientBlockingState(client *c) { - c->bstate.btype = BLOCKED_NONE; - c->bstate.timeout = 0; - c->bstate.unblock_on_nokey = 0; - c->bstate.keys = dictCreate(&objectKeyHeapPointerValueDictType); - c->bstate.numreplicas = 0; - c->bstate.numlocal = 0; - c->bstate.reploffset = 0; - c->bstate.generic_blocked_list_node = NULL; - c->bstate.module_blocked_handle = NULL; - c->bstate.async_rm_call_handle = NULL; + if (c->bstate) return; + c->bstate = zmalloc(sizeof(blockingState)); + c->bstate->btype = BLOCKED_NONE; + c->bstate->timeout = 0; + c->bstate->unblock_on_nokey = 0; + c->bstate->keys = dictCreate(&objectKeyHeapPointerValueDictType); + c->bstate->numreplicas = 0; + c->bstate->numlocal = 0; + c->bstate->reploffset = 0; + c->bstate->generic_blocked_list_node = NULL; + c->bstate->module_blocked_handle = NULL; + c->bstate->async_rm_call_handle = NULL; +} + +void freeClientBlockingState(client *c) { + if (!c->bstate) return; + dictRelease(c->bstate->keys); + zfree(c->bstate); + c->bstate = NULL; } /* Block a client for the specific operation type. Once the CLIENT_BLOCKED @@ -94,8 +103,10 @@ void blockClient(client *c, int btype) { /* Primary client should never be blocked unless pause or module */ serverAssert(!(c->flag.primary && btype != BLOCKED_MODULE && btype != BLOCKED_POSTPONE)); + initClientBlockingState(c); + c->flag.blocked = 1; - c->bstate.btype = btype; + c->bstate->btype = btype; if (!c->flag.module) server.blocked_clients++; /* We count blocked client stats on regular clients and not on module clients */ server.blocked_clients_by_type[btype]++; @@ -199,18 +210,18 @@ void queueClientForReprocessing(client *c) { /* Unblock a client calling the right function depending on the kind * of operation the client is blocking for. */ void unblockClient(client *c, int queue_for_reprocessing) { - if (c->bstate.btype == BLOCKED_LIST || c->bstate.btype == BLOCKED_ZSET || c->bstate.btype == BLOCKED_STREAM) { + if (c->bstate->btype == BLOCKED_LIST || c->bstate->btype == BLOCKED_ZSET || c->bstate->btype == BLOCKED_STREAM) { unblockClientWaitingData(c); - } else if (c->bstate.btype == BLOCKED_WAIT) { + } else if (c->bstate->btype == BLOCKED_WAIT) { unblockClientWaitingReplicas(c); - } else if (c->bstate.btype == BLOCKED_MODULE) { + } else if (c->bstate->btype == BLOCKED_MODULE) { if (moduleClientIsBlockedOnKeys(c)) unblockClientWaitingData(c); unblockClientFromModule(c); - } else if (c->bstate.btype == BLOCKED_POSTPONE) { - serverAssert(c->bstate.postponed_list_node); - listDelNode(server.postponed_clients, c->bstate.postponed_list_node); - c->bstate.postponed_list_node = NULL; - } else if (c->bstate.btype == BLOCKED_SHUTDOWN) { + } else if (c->bstate->btype == BLOCKED_POSTPONE) { + serverAssert(c->bstate->postponed_list_node); + listDelNode(server.postponed_clients, c->bstate->postponed_list_node); + c->bstate->postponed_list_node = NULL; + } else if (c->bstate->btype == BLOCKED_SHUTDOWN) { /* No special cleanup. 
*/ } else { serverPanic("Unknown btype in unblockClient()."); @@ -218,7 +229,7 @@ void unblockClient(client *c, int queue_for_reprocessing) { /* Reset the client for a new query, unless the client has pending command to process * or in case a shutdown operation was canceled and we are still in the processCommand sequence */ - if (!c->flag.pending_command && c->bstate.btype != BLOCKED_SHUTDOWN) { + if (!c->flag.pending_command && c->bstate->btype != BLOCKED_SHUTDOWN) { /* Clients that are not blocked on keys are not reprocessed so we must * call reqresAppendResponse here (for clients blocked on key, * unblockClientOnKey is called, which eventually calls processCommand, @@ -229,12 +240,12 @@ void unblockClient(client *c, int queue_for_reprocessing) { /* We count blocked client stats on regular clients and not on module clients */ if (!c->flag.module) server.blocked_clients--; - server.blocked_clients_by_type[c->bstate.btype]--; + server.blocked_clients_by_type[c->bstate->btype]--; /* Clear the flags, and put the client in the unblocked list so that * we'll process new commands in its query buffer ASAP. */ c->flag.blocked = 0; - c->bstate.btype = BLOCKED_NONE; - c->bstate.unblock_on_nokey = 0; + c->bstate->btype = BLOCKED_NONE; + c->bstate->unblock_on_nokey = 0; removeClientFromTimeoutTable(c); if (queue_for_reprocessing) queueClientForReprocessing(c); } @@ -243,22 +254,22 @@ void unblockClient(client *c, int queue_for_reprocessing) { * send it a reply of some kind. After this function is called, * unblockClient() will be called with the same client as argument. */ void replyToBlockedClientTimedOut(client *c) { - if (c->bstate.btype == BLOCKED_LIST || c->bstate.btype == BLOCKED_ZSET || c->bstate.btype == BLOCKED_STREAM) { + if (c->bstate->btype == BLOCKED_LIST || c->bstate->btype == BLOCKED_ZSET || c->bstate->btype == BLOCKED_STREAM) { addReplyNullArray(c); updateStatsOnUnblock(c, 0, 0, 0); - } else if (c->bstate.btype == BLOCKED_WAIT) { + } else if (c->bstate->btype == BLOCKED_WAIT) { if (c->cmd->proc == waitCommand) { - addReplyLongLong(c, replicationCountAcksByOffset(c->bstate.reploffset)); + addReplyLongLong(c, replicationCountAcksByOffset(c->bstate->reploffset)); } else if (c->cmd->proc == waitaofCommand) { addReplyArrayLen(c, 2); - addReplyLongLong(c, server.fsynced_reploff >= c->bstate.reploffset); - addReplyLongLong(c, replicationCountAOFAcksByOffset(c->bstate.reploffset)); + addReplyLongLong(c, server.fsynced_reploff >= c->bstate->reploffset); + addReplyLongLong(c, replicationCountAOFAcksByOffset(c->bstate->reploffset)); } else if (c->cmd->proc == clusterCommand) { addReplyErrorObject(c, shared.noreplicaserr); } else { serverPanic("Unknown wait command %s in replyToBlockedClientTimedOut().", c->cmd->declared_name); } - } else if (c->bstate.btype == BLOCKED_MODULE) { + } else if (c->bstate->btype == BLOCKED_MODULE) { moduleBlockedClientTimedOut(c, 0); } else { serverPanic("Unknown btype in replyToBlockedClientTimedOut()."); @@ -274,7 +285,7 @@ void replyToClientsBlockedOnShutdown(void) { listRewind(server.clients, &li); while ((ln = listNext(&li))) { client *c = listNodeValue(ln); - if (c->flag.blocked && c->bstate.btype == BLOCKED_SHUTDOWN) { + if (c->flag.blocked && c->bstate->btype == BLOCKED_SHUTDOWN) { addReplyError(c, "Errors trying to SHUTDOWN. Check logs."); unblockClient(c, 1); } @@ -301,7 +312,7 @@ void disconnectAllBlockedClients(void) { * command processing will start from scratch, and the command will * be either executed or rejected. 
(unlike LIST blocked clients for * which the command is already in progress in a way. */ - if (c->bstate.btype == BLOCKED_POSTPONE) continue; + if (c->bstate->btype == BLOCKED_POSTPONE) continue; unblockClientOnError(c, "-UNBLOCKED force unblock from blocking operation, " "instance state changed (master -> replica?)"); @@ -386,15 +397,17 @@ void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeo list *l; int j; + initClientBlockingState(c); + if (!c->flag.reprocessing_command) { /* If the client is re-processing the command, we do not set the timeout * because we need to retain the client's original timeout. */ - c->bstate.timeout = timeout; + c->bstate->timeout = timeout; } for (j = 0; j < numkeys; j++) { /* If the key already exists in the dictionary ignore it. */ - if (!(client_blocked_entry = dictAddRaw(c->bstate.keys, keys[j], NULL))) { + if (!(client_blocked_entry = dictAddRaw(c->bstate->keys, keys[j], NULL))) { continue; } incrRefCount(keys[j]); @@ -411,7 +424,7 @@ void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeo l = dictGetVal(db_blocked_existing_entry); } listAddNodeTail(l, c); - dictSetVal(c->bstate.keys, client_blocked_entry, listLast(l)); + dictSetVal(c->bstate->keys, client_blocked_entry, listLast(l)); /* We need to add the key to blocking_keys_unblock_on_nokey, if the client * wants to be awakened if key is deleted (like XREADGROUP) */ @@ -425,7 +438,7 @@ void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeo } } } - c->bstate.unblock_on_nokey = unblock_on_nokey; + c->bstate->unblock_on_nokey = unblock_on_nokey; /* Currently we assume key blocking will require reprocessing the command. * However in case of modules, they have a different way to handle the reprocessing * which does not require setting the pending command flag */ @@ -439,15 +452,15 @@ static void unblockClientWaitingData(client *c) { dictEntry *de; dictIterator *di; - if (dictSize(c->bstate.keys) == 0) return; + if (dictSize(c->bstate->keys) == 0) return; - di = dictGetIterator(c->bstate.keys); + di = dictGetIterator(c->bstate->keys); /* The client may wait for multiple keys, so unblock it for every key. */ while ((de = dictNext(di)) != NULL) { releaseBlockedEntry(c, de, 0); } dictReleaseIterator(di); - dictEmpty(c->bstate.keys, NULL); + dictEmpty(c->bstate->keys, NULL); } static blocking_type getBlockedTypeByType(int type) { @@ -546,7 +559,7 @@ static void releaseBlockedEntry(client *c, dictEntry *de, int remove_key) { if (listLength(l) == 0) { dictDelete(c->db->blocking_keys, key); dictDelete(c->db->blocking_keys_unblock_on_nokey, key); - } else if (c->bstate.unblock_on_nokey) { + } else if (c->bstate->unblock_on_nokey) { unblock_on_nokey_entry = dictFind(c->db->blocking_keys_unblock_on_nokey, key); /* it is not possible to have a client blocked on nokey with no matching entry */ serverAssertWithInfo(c, key, unblock_on_nokey_entry != NULL); @@ -555,7 +568,7 @@ static void releaseBlockedEntry(client *c, dictEntry *de, int remove_key) { dictDelete(c->db->blocking_keys_unblock_on_nokey, key); } } - if (remove_key) dictDelete(c->bstate.keys, key); + if (remove_key) dictDelete(c->bstate->keys, key); } void signalKeyAsReady(serverDb *db, robj *key, int type) { @@ -593,9 +606,9 @@ static void handleClientsBlockedOnKey(readyList *rl) { * module is trying to accomplish right now. * 3. In case of XREADGROUP call we will want to unblock on any change in object type * or in case the key was deleted, since the group is no longer valid. 
*/ - if ((o != NULL && (receiver->bstate.btype == getBlockedTypeByType(o->type))) || - (o != NULL && (receiver->bstate.btype == BLOCKED_MODULE)) || (receiver->bstate.unblock_on_nokey)) { - if (receiver->bstate.btype != BLOCKED_MODULE) + if ((o != NULL && (receiver->bstate->btype == getBlockedTypeByType(o->type))) || + (o != NULL && (receiver->bstate->btype == BLOCKED_MODULE)) || (receiver->bstate->unblock_on_nokey)) { + if (receiver->bstate->btype != BLOCKED_MODULE) unblockClientOnKey(receiver, rl->key); else moduleUnblockClientOnKey(receiver, rl->key); @@ -606,16 +619,17 @@ static void handleClientsBlockedOnKey(readyList *rl) { /* block a client for replica acknowledgement */ void blockClientForReplicaAck(client *c, mstime_t timeout, long long offset, long numreplicas, int numlocal) { - c->bstate.timeout = timeout; - c->bstate.reploffset = offset; - c->bstate.numreplicas = numreplicas; - c->bstate.numlocal = numlocal; + initClientBlockingState(c); + c->bstate->timeout = timeout; + c->bstate->reploffset = offset; + c->bstate->numreplicas = numreplicas; + c->bstate->numlocal = numlocal; listAddNodeHead(server.clients_waiting_acks, c); /* Note that we remember the linked list node where the client is stored, * this way removing the client in unblockClientWaitingReplicas() will not * require a linear scan, but just a constant time operation. */ - serverAssert(c->bstate.client_waiting_acks_list_node == NULL); - c->bstate.client_waiting_acks_list_node = listFirst(server.clients_waiting_acks); + serverAssert(c->bstate->client_waiting_acks_list_node == NULL); + c->bstate->client_waiting_acks_list_node = listFirst(server.clients_waiting_acks); blockClient(c, BLOCKED_WAIT); } @@ -623,11 +637,12 @@ void blockClientForReplicaAck(client *c, mstime_t timeout, long long offset, lon * requesting to avoid processing clients commands which will be processed later * when the it is ready to accept them. */ void blockPostponeClient(client *c) { - c->bstate.timeout = 0; + initClientBlockingState(c); + c->bstate->timeout = 0; blockClient(c, BLOCKED_POSTPONE); listAddNodeTail(server.postponed_clients, c); - serverAssert(c->bstate.postponed_list_node == NULL); - c->bstate.postponed_list_node = listLast(server.postponed_clients); + serverAssert(c->bstate->postponed_list_node == NULL); + c->bstate->postponed_list_node = listLast(server.postponed_clients); /* Mark this client to execute its command */ c->flag.pending_command = 1; } @@ -644,13 +659,13 @@ void blockClientShutdown(client *c) { static void unblockClientOnKey(client *c, robj *key) { dictEntry *de; - de = dictFind(c->bstate.keys, key); + de = dictFind(c->bstate->keys, key); releaseBlockedEntry(c, de, 1); /* Only in case of blocking API calls, we might be blocked on several keys. however we should force unblock the entire blocking keys */ - serverAssert(c->bstate.btype == BLOCKED_STREAM || c->bstate.btype == BLOCKED_LIST || - c->bstate.btype == BLOCKED_ZSET); + serverAssert(c->bstate->btype == BLOCKED_STREAM || c->bstate->btype == BLOCKED_LIST || + c->bstate->btype == BLOCKED_ZSET); /* We need to unblock the client before calling processCommandAndResetClient * because it checks the CLIENT_BLOCKED flag */ @@ -712,7 +727,7 @@ static void moduleUnblockClientOnKey(client *c, robj *key) { * command with timeout reply. */ void unblockClientOnTimeout(client *c) { /* The client has been unlocked (in the moduleUnblocked list), return ASAP. 
*/ - if (c->bstate.btype == BLOCKED_MODULE && isModuleClientUnblocked(c)) return; + if (c->bstate->btype == BLOCKED_MODULE && isModuleClientUnblocked(c)) return; replyToBlockedClientTimedOut(c); if (c->flag.pending_command) c->flag.pending_command = 0; diff --git a/src/cluster.c b/src/cluster.c index 39d9161b9c..309279e0be 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -1006,7 +1006,7 @@ getNodeByQuery(client *c, struct serverCommand *cmd, robj **argv, int argc, int /* If CLIENT_MULTI flag is not set EXEC is just going to return an * error. */ if (!c->flag.multi) return myself; - ms = &c->mstate; + ms = c->mstate; } else { /* In order to have a single codepath create a fake Multi State * structure if the client is not in MULTI/EXEC state, this way @@ -1023,7 +1023,7 @@ getNodeByQuery(client *c, struct serverCommand *cmd, robj **argv, int argc, int /* Only valid for sharded pubsub as regular pubsub can operate on any node and bypasses this layer. */ int pubsubshard_included = - (cmd_flags & CMD_PUBSUB) || (c->cmd->proc == execCommand && (c->mstate.cmd_flags & CMD_PUBSUB)); + (cmd_flags & CMD_PUBSUB) || (c->cmd->proc == execCommand && (c->mstate->cmd_flags & CMD_PUBSUB)); /* Check that all the keys are in the same hash slot, and obtain this * slot and the node associated. */ @@ -1176,7 +1176,7 @@ getNodeByQuery(client *c, struct serverCommand *cmd, robj **argv, int argc, int * node is a replica and the request is about a hash slot our primary * is serving, we can reply without redirection. */ int is_write_command = - (cmd_flags & CMD_WRITE) || (c->cmd->proc == execCommand && (c->mstate.cmd_flags & CMD_WRITE)); + (cmd_flags & CMD_WRITE) || (c->cmd->proc == execCommand && (c->mstate->cmd_flags & CMD_WRITE)); if ((c->flag.readonly || pubsubshard_included) && !is_write_command && clusterNodeIsReplica(myself) && clusterNodeGetPrimary(myself) == n) { return myself; @@ -1233,14 +1233,14 @@ void clusterRedirectClient(client *c, clusterNode *n, int hashslot, int error_co * returns 1. Otherwise 0 is returned and no operation is performed. */ int clusterRedirectBlockedClientIfNeeded(client *c) { clusterNode *myself = getMyClusterNode(); - if (c->flag.blocked && (c->bstate.btype == BLOCKED_LIST || c->bstate.btype == BLOCKED_ZSET || - c->bstate.btype == BLOCKED_STREAM || c->bstate.btype == BLOCKED_MODULE)) { + if (c->flag.blocked && (c->bstate->btype == BLOCKED_LIST || c->bstate->btype == BLOCKED_ZSET || + c->bstate->btype == BLOCKED_STREAM || c->bstate->btype == BLOCKED_MODULE)) { dictEntry *de; dictIterator *di; /* If the client is blocked on module, but not on a specific key, * don't unblock it. */ - if (c->bstate.btype == BLOCKED_MODULE && !moduleClientIsBlockedOnKeys(c)) return 0; + if (c->bstate->btype == BLOCKED_MODULE && !moduleClientIsBlockedOnKeys(c)) return 0; /* If the cluster is down, unblock the client with the right error. * If the cluster is configured to allow reads on cluster down, we @@ -1252,7 +1252,7 @@ int clusterRedirectBlockedClientIfNeeded(client *c) { } /* All keys must belong to the same slot, so check first key only. 
*/ - di = dictGetIterator(c->bstate.keys); + di = dictGetIterator(c->bstate->keys); if ((de = dictNext(di)) != NULL) { robj *key = dictGetKey(de); int slot = keyHashSlot((char *)key->ptr, sdslen(key->ptr)); diff --git a/src/cluster_legacy.c b/src/cluster_legacy.c index b59c30126a..0777d6d8c6 100644 --- a/src/cluster_legacy.c +++ b/src/cluster_legacy.c @@ -6574,7 +6574,7 @@ void clusterCommandSetSlot(client *c) { * replication, it would also unlikely win the election. * * And 0x702ff is 7.2.255, we only support new versions in this case. */ - if (r->repl_state == REPLICA_STATE_ONLINE && r->replica_version > 0x702ff) { + if (r->repl_data->repl_state == REPLICA_STATE_ONLINE && r->repl_data->replica_version > 0x702ff) { num_eligible_replicas++; } } diff --git a/src/module.c b/src/module.c index dabea59d49..7388dc6a20 100644 --- a/src/module.c +++ b/src/module.c @@ -651,6 +651,19 @@ void *VM_PoolAlloc(ValkeyModuleCtx *ctx, size_t bytes) { * Helpers for modules API implementation * -------------------------------------------------------------------------- */ +static void initClientModuleData(client *c) { + if (c->module_data) return; + c->module_data = zcalloc(sizeof(ClientModuleData)); +} + +void freeClientModuleData(client *c) { + if (!c->module_data) return; + /* Free the ValkeyModuleBlockedClient held onto for reprocessing if not already freed. */ + zfree(c->module_data->module_blocked_client); + zfree(c->module_data); + c->module_data = NULL; +} + void moduleEnqueueLoadModule(sds path, sds *argv, int argc) { int i; struct moduleLoadQueueEntry *loadmod; @@ -721,11 +734,11 @@ void moduleReleaseTempClient(client *c) { c->flag.fake = 1; c->user = NULL; /* Root user */ c->cmd = c->lastcmd = c->realcmd = c->io_parsed_cmd = NULL; - if (c->bstate.async_rm_call_handle) { - ValkeyModuleAsyncRMCallPromise *promise = c->bstate.async_rm_call_handle; + if (c->bstate && c->bstate->async_rm_call_handle) { + ValkeyModuleAsyncRMCallPromise *promise = c->bstate->async_rm_call_handle; promise->c = NULL; /* Remove the client from the promise so it will no longer be possible to abort it. */ freeValkeyModuleAsyncRMCallPromise(promise); - c->bstate.async_rm_call_handle = NULL; + c->bstate->async_rm_call_handle = NULL; } moduleTempClients[moduleTempClientCount++] = c; } @@ -897,7 +910,7 @@ static CallReply *moduleParseReply(client *c, ValkeyModuleCtx *ctx) { void moduleCallCommandUnblockedHandler(client *c) { ValkeyModuleCtx ctx; - ValkeyModuleAsyncRMCallPromise *promise = c->bstate.async_rm_call_handle; + ValkeyModuleAsyncRMCallPromise *promise = c->bstate->async_rm_call_handle; serverAssert(promise); ValkeyModule *module = promise->module; if (!promise->on_unblocked) { @@ -6569,7 +6582,7 @@ ValkeyModuleCallReply *VM_Call(ValkeyModuleCtx *ctx, const char *cmdname, const .ctx = (ctx->flags & VALKEYMODULE_CTX_AUTO_MEMORY) ? ctx : NULL, }; reply = callReplyCreatePromise(promise); - c->bstate.async_rm_call_handle = promise; + c->bstate->async_rm_call_handle = promise; if (!(call_flags & CMD_CALL_PROPAGATE_AOF)) { /* No need for AOF propagation, set the relevant flags of the client */ c->flag.module_prevent_aof_prop = 1; @@ -7679,7 +7692,7 @@ void VM_LatencyAddSample(const char *event, mstime_t latency) { /* Returns 1 if the client already in the moduleUnblocked list, 0 otherwise. 
*/ int isModuleClientUnblocked(client *c) { - ValkeyModuleBlockedClient *bc = c->bstate.module_blocked_handle; + ValkeyModuleBlockedClient *bc = c->bstate->module_blocked_handle; return bc->unblocked == 1; } @@ -7697,7 +7710,7 @@ int isModuleClientUnblocked(client *c) { * The structure ValkeyModuleBlockedClient will be always deallocated when * running the list of clients blocked by a module that need to be unblocked. */ void unblockClientFromModule(client *c) { - ValkeyModuleBlockedClient *bc = c->bstate.module_blocked_handle; + ValkeyModuleBlockedClient *bc = c->bstate->module_blocked_handle; /* Call the disconnection callback if any. Note that * bc->disconnect_callback is set to NULL if the client gets disconnected @@ -7765,9 +7778,10 @@ ValkeyModuleBlockedClient *moduleBlockClient(ValkeyModuleCtx *ctx, client *c = ctx->client; int islua = scriptIsRunning(); int ismulti = server.in_exec; + initClientBlockingState(c); - c->bstate.module_blocked_handle = zmalloc(sizeof(ValkeyModuleBlockedClient)); - ValkeyModuleBlockedClient *bc = c->bstate.module_blocked_handle; + c->bstate->module_blocked_handle = zmalloc(sizeof(ValkeyModuleBlockedClient)); + ValkeyModuleBlockedClient *bc = c->bstate->module_blocked_handle; ctx->module->blocked_clients++; /* We need to handle the invalid operation of calling modules blocking @@ -7795,7 +7809,7 @@ ValkeyModuleBlockedClient *moduleBlockClient(ValkeyModuleCtx *ctx, if (timeout_ms) { mstime_t now = mstime(); if (timeout_ms > LLONG_MAX - now) { - c->bstate.module_blocked_handle = NULL; + c->bstate->module_blocked_handle = NULL; addReplyError(c, "timeout is out of range"); /* 'timeout_ms+now' would overflow */ return bc; } @@ -7803,20 +7817,20 @@ ValkeyModuleBlockedClient *moduleBlockClient(ValkeyModuleCtx *ctx, } if (islua || ismulti) { - c->bstate.module_blocked_handle = NULL; + c->bstate->module_blocked_handle = NULL; addReplyError(c, islua ? "Blocking module command called from Lua script" : "Blocking module command called from transaction"); } else if (ctx->flags & VALKEYMODULE_CTX_BLOCKED_REPLY) { - c->bstate.module_blocked_handle = NULL; + c->bstate->module_blocked_handle = NULL; addReplyError(c, "Blocking module command called from a Reply callback context"); } else if (!auth_reply_callback && clientHasModuleAuthInProgress(c)) { - c->bstate.module_blocked_handle = NULL; + c->bstate->module_blocked_handle = NULL; addReplyError(c, "Clients undergoing module based authentication can only be blocked on auth"); } else { if (keys) { blockForKeys(c, BLOCKED_MODULE, keys, numkeys, timeout, flags & VALKEYMODULE_BLOCK_UNBLOCK_DELETED); } else { - c->bstate.timeout = timeout; + c->bstate->timeout = timeout; blockClient(c, BLOCKED_MODULE); } } @@ -7912,7 +7926,7 @@ void moduleUnregisterAuthCBs(ValkeyModule *module) { /* Search for & attempt next module auth callback after skipping the ones already attempted. * Returns the result of the module auth callback. */ int attemptNextAuthCb(client *c, robj *username, robj *password, robj **err) { - int handle_next_callback = c->module_auth_ctx == NULL; + int handle_next_callback = (!c->module_data || c->module_data->module_auth_ctx == NULL); ValkeyModuleAuthCtx *cur_auth_ctx = NULL; listNode *ln; listIter li; @@ -7922,7 +7936,7 @@ int attemptNextAuthCb(client *c, robj *username, robj *password, robj **err) { cur_auth_ctx = listNodeValue(ln); /* Skip over the previously attempted auth contexts. 
*/ if (!handle_next_callback) { - handle_next_callback = cur_auth_ctx == c->module_auth_ctx; + handle_next_callback = cur_auth_ctx == c->module_data->module_auth_ctx; continue; } /* Remove the module auth complete flag before we attempt the next cb. */ @@ -7931,7 +7945,8 @@ int attemptNextAuthCb(client *c, robj *username, robj *password, robj **err) { moduleCreateContext(&ctx, cur_auth_ctx->module, VALKEYMODULE_CTX_NONE); ctx.client = c; *err = NULL; - c->module_auth_ctx = cur_auth_ctx; + initClientModuleData(c); + c->module_data->module_auth_ctx = cur_auth_ctx; result = cur_auth_ctx->auth_cb(&ctx, username, password, err); moduleFreeContext(&ctx); if (result == VALKEYMODULE_AUTH_HANDLED) break; @@ -7947,8 +7962,8 @@ int attemptNextAuthCb(client *c, robj *username, robj *password, robj **err) { * return the result of the reply callback. */ int attemptBlockedAuthReplyCallback(client *c, robj *username, robj *password, robj **err) { int result = VALKEYMODULE_AUTH_NOT_HANDLED; - if (!c->module_blocked_client) return result; - ValkeyModuleBlockedClient *bc = (ValkeyModuleBlockedClient *)c->module_blocked_client; + if (!c->module_data || !c->module_data->module_blocked_client) return result; + ValkeyModuleBlockedClient *bc = (ValkeyModuleBlockedClient *)c->module_data->module_blocked_client; bc->client = c; if (bc->auth_reply_cb) { ValkeyModuleCtx ctx; @@ -7961,7 +7976,7 @@ int attemptBlockedAuthReplyCallback(client *c, robj *username, robj *password, r moduleFreeContext(&ctx); } moduleInvokeFreePrivDataCallback(c, bc); - c->module_blocked_client = NULL; + c->module_data->module_blocked_client = NULL; c->lastcmd->microseconds += bc->background_duration; bc->module->blocked_clients--; zfree(bc); @@ -7989,7 +8004,7 @@ int checkModuleAuthentication(client *c, robj *username, robj *password, robj ** serverAssert(result == VALKEYMODULE_AUTH_HANDLED); return AUTH_BLOCKED; } - c->module_auth_ctx = NULL; + if (c->module_data) c->module_data->module_auth_ctx = NULL; if (result == VALKEYMODULE_AUTH_NOT_HANDLED) { c->flag.module_auth_has_result = 0; return AUTH_NOT_HANDLED; @@ -8011,7 +8026,7 @@ int checkModuleAuthentication(client *c, robj *username, robj *password, robj ** * This function returns 1 if client was served (and should be unblocked) */ int moduleTryServeClientBlockedOnKey(client *c, robj *key) { int served = 0; - ValkeyModuleBlockedClient *bc = c->bstate.module_blocked_handle; + ValkeyModuleBlockedClient *bc = c->bstate->module_blocked_handle; /* Protect against re-processing: don't serve clients that are already * in the unblocking list for any reason (including VM_UnblockClient() @@ -8223,14 +8238,14 @@ int moduleUnblockClientByHandle(ValkeyModuleBlockedClient *bc, void *privdata) { /* This API is used by the server core to unblock a client that was blocked * by a module. */ void moduleUnblockClient(client *c) { - ValkeyModuleBlockedClient *bc = c->bstate.module_blocked_handle; + ValkeyModuleBlockedClient *bc = c->bstate->module_blocked_handle; moduleUnblockClientByHandle(bc, NULL); } /* Return true if the client 'c' was blocked by a module using * VM_BlockClientOnKeys(). */ int moduleClientIsBlockedOnKeys(client *c) { - ValkeyModuleBlockedClient *bc = c->bstate.module_blocked_handle; + ValkeyModuleBlockedClient *bc = c->bstate->module_blocked_handle; return bc->blocked_on_keys; } @@ -8340,7 +8355,7 @@ void moduleHandleBlockedClients(void) { /* Hold onto the blocked client if module auth is in progress. The reply callback is invoked * when the client is reprocessed. 
*/ if (c && clientHasModuleAuthInProgress(c)) { - c->module_blocked_client = bc; + c->module_data->module_blocked_client = bc; } else { /* Free privdata if any. */ moduleInvokeFreePrivDataCallback(c, bc); @@ -8402,9 +8417,9 @@ void moduleHandleBlockedClients(void) { * moduleBlockedClientTimedOut(). */ int moduleBlockedClientMayTimeout(client *c) { - if (c->bstate.btype != BLOCKED_MODULE) return 1; + if (c->bstate->btype != BLOCKED_MODULE) return 1; - ValkeyModuleBlockedClient *bc = c->bstate.module_blocked_handle; + ValkeyModuleBlockedClient *bc = c->bstate->module_blocked_handle; return (bc && bc->timeout_callback != NULL); } @@ -8420,7 +8435,7 @@ int moduleBlockedClientMayTimeout(client *c) { * of the client synchronously. This ensures that we can reply to the client before * resetClient() is called. */ void moduleBlockedClientTimedOut(client *c, int from_module) { - ValkeyModuleBlockedClient *bc = c->bstate.module_blocked_handle; + ValkeyModuleBlockedClient *bc = c->bstate->module_blocked_handle; /* Protect against re-processing: don't serve clients that are already * in the unblocking list for any reason (including VM_UnblockClient() @@ -9559,16 +9574,16 @@ static void eventLoopHandleOneShotEvents(void) { * A client's user can be changed through the AUTH command, module * authentication, and when a client is freed. */ void moduleNotifyUserChanged(client *c) { - if (c->auth_callback) { - c->auth_callback(c->id, c->auth_callback_privdata); + if (!c->module_data || !c->module_data->auth_callback) return; - /* The callback will fire exactly once, even if the user remains - * the same. It is expected to completely clean up the state - * so all references are cleared here. */ - c->auth_callback = NULL; - c->auth_callback_privdata = NULL; - c->auth_module = NULL; - } + c->module_data->auth_callback(c->id, c->module_data->auth_callback_privdata); + + /* The callback will fire exactly once, even if the user remains + * the same. It is expected to completely clean up the state + * so all references are cleared here. 
*/ + c->module_data->auth_callback = NULL; + c->module_data->auth_callback_privdata = NULL; + c->module_data->auth_module = NULL; } void revokeClientAuthentication(client *c) { @@ -9599,9 +9614,9 @@ static void moduleFreeAuthenticatedClients(ValkeyModule *module) { listRewind(server.clients, &li); while ((ln = listNext(&li)) != NULL) { client *c = listNodeValue(ln); - if (!c->auth_module) continue; + if (!c->module_data || !c->module_data->auth_module) continue; - ValkeyModule *auth_module = (ValkeyModule *)c->auth_module; + ValkeyModule *auth_module = (ValkeyModule *)c->module_data->auth_module; if (auth_module == module) { revokeClientAuthentication(c); } @@ -9909,9 +9924,10 @@ static int authenticateClientWithUser(ValkeyModuleCtx *ctx, } if (callback) { - ctx->client->auth_callback = callback; - ctx->client->auth_callback_privdata = privdata; - ctx->client->auth_module = ctx->module; + initClientModuleData(ctx->client); + ctx->client->module_data->auth_callback = callback; + ctx->client->module_data->auth_callback_privdata = privdata; + ctx->client->module_data->auth_module = ctx->module; } if (client_id) { diff --git a/src/module.h b/src/module.h index 78d9341ca9..f4e4de67eb 100644 --- a/src/module.h +++ b/src/module.h @@ -228,5 +228,6 @@ int moduleLateDefrag(robj *key, robj *value, unsigned long *cursor, monotime end void moduleDefragGlobals(void); void *moduleGetHandleByName(char *modulename); int moduleIsModuleCommand(void *module_handle, struct serverCommand *cmd); +void freeClientModuleData(client *c); #endif /* _MODULE_H_ */ diff --git a/src/multi.c b/src/multi.c index 9e1f019244..9e3aad9d3c 100644 --- a/src/multi.c +++ b/src/multi.c @@ -33,33 +33,42 @@ /* Client state initialization for MULTI/EXEC */ void initClientMultiState(client *c) { - c->mstate.commands = NULL; - c->mstate.count = 0; - c->mstate.cmd_flags = 0; - c->mstate.cmd_inv_flags = 0; - c->mstate.argv_len_sums = 0; - c->mstate.alloc_count = 0; + if (c->mstate) return; + c->mstate = zcalloc(sizeof(multiState)); } -/* Release all the resources associated with MULTI/EXEC state */ -void freeClientMultiState(client *c) { - int j; - - for (j = 0; j < c->mstate.count; j++) { +void freeClientMultiStateCmds(client *c) { + for (int j = 0; j < c->mstate->count; j++) { int i; - multiCmd *mc = c->mstate.commands + j; + multiCmd *mc = c->mstate->commands + j; for (i = 0; i < mc->argc; i++) decrRefCount(mc->argv[i]); zfree(mc->argv); } - zfree(c->mstate.commands); + + zfree(c->mstate->commands); + c->mstate->commands = NULL; +} + +/* Release all the resources associated with MULTI/EXEC state */ +void freeClientMultiState(client *c) { + if (!c->mstate) return; + + freeClientMultiStateCmds(c); + unwatchAllKeys(c); + zfree(c->mstate); + c->mstate = NULL; } void resetClientMultiState(client *c) { - if (c->mstate.commands) { - freeClientMultiState(c); - initClientMultiState(c); - } + if (!c->mstate || !c->mstate->commands) return; + + freeClientMultiStateCmds(c); + c->mstate->count = 0; + c->mstate->cmd_flags = 0; + c->mstate->cmd_inv_flags = 0; + c->mstate->argv_len_sums = 0; + c->mstate->alloc_count = 0; } /* Add a new command into the MULTI commands queue */ @@ -71,26 +80,27 @@ void queueMultiCommand(client *c, uint64_t cmd_flags) { * bother to read previous responses and didn't notice the multi was already * aborted. 
*/ if (c->flag.dirty_cas || c->flag.dirty_exec) return; - if (c->mstate.count == 0) { + if (!c->mstate) initClientMultiState(c); + if (c->mstate->count == 0) { /* If a client is using multi/exec, assuming it is used to execute at least * two commands. Hence, creating by default size of 2. */ - c->mstate.commands = zmalloc(sizeof(multiCmd) * 2); - c->mstate.alloc_count = 2; + c->mstate->commands = zmalloc(sizeof(multiCmd) * 2); + c->mstate->alloc_count = 2; } - if (c->mstate.count == c->mstate.alloc_count) { - c->mstate.alloc_count = c->mstate.alloc_count < INT_MAX / 2 ? c->mstate.alloc_count * 2 : INT_MAX; - c->mstate.commands = zrealloc(c->mstate.commands, sizeof(multiCmd) * (c->mstate.alloc_count)); + if (c->mstate->count == c->mstate->alloc_count) { + c->mstate->alloc_count = c->mstate->alloc_count < INT_MAX / 2 ? c->mstate->alloc_count * 2 : INT_MAX; + c->mstate->commands = zrealloc(c->mstate->commands, sizeof(multiCmd) * (c->mstate->alloc_count)); } - mc = c->mstate.commands + c->mstate.count; + mc = c->mstate->commands + c->mstate->count; mc->cmd = c->cmd; mc->argc = c->argc; mc->argv = c->argv; mc->argv_len = c->argv_len; - c->mstate.count++; - c->mstate.cmd_flags |= cmd_flags; - c->mstate.cmd_inv_flags |= ~cmd_flags; - c->mstate.argv_len_sums += c->argv_len_sum + sizeof(robj *) * c->argc; + c->mstate->count++; + c->mstate->cmd_flags |= cmd_flags; + c->mstate->cmd_inv_flags |= ~cmd_flags; + c->mstate->argv_len_sums += c->argv_len_sum + sizeof(robj *) * c->argc; /* Reset the client's args since we copied them into the mstate and shouldn't * reference them from c anymore. */ @@ -118,6 +128,7 @@ void flagTransaction(client *c) { } void multiCommand(client *c) { + if (!c->mstate) initClientMultiState(c); c->flag.multi = 1; addReply(c, shared.ok); } @@ -195,12 +206,12 @@ void execCommand(client *c) { orig_argv_len = c->argv_len; orig_argc = c->argc; orig_cmd = c->cmd; - addReplyArrayLen(c, c->mstate.count); - for (j = 0; j < c->mstate.count; j++) { - c->argc = c->mstate.commands[j].argc; - c->argv = c->mstate.commands[j].argv; - c->argv_len = c->mstate.commands[j].argv_len; - c->cmd = c->realcmd = c->mstate.commands[j].cmd; + addReplyArrayLen(c, c->mstate->count); + for (j = 0; j < c->mstate->count; j++) { + c->argc = c->mstate->commands[j].argc; + c->argv = c->mstate->commands[j].argv; + c->argv_len = c->mstate->commands[j].argv_len; + c->cmd = c->realcmd = c->mstate->commands[j].cmd; /* ACL permissions are also checked at the time of execution in case * they were changed after the commands were queued. */ @@ -234,10 +245,10 @@ void execCommand(client *c) { } /* Commands may alter argc/argv, restore mstate. */ - c->mstate.commands[j].argc = c->argc; - c->mstate.commands[j].argv = c->argv; - c->mstate.commands[j].argv_len = c->argv_len; - c->mstate.commands[j].cmd = c->cmd; + c->mstate->commands[j].argc = c->argc; + c->mstate->commands[j].argv = c->argv; + c->mstate->commands[j].argv_len = c->argv_len; + c->mstate->commands[j].cmd = c->cmd; /* The original argv has already been processed for slowlog and monitor, * so we can safely free it before proceeding to the next command. 
*/ @@ -304,10 +315,10 @@ void watchForKey(client *c, robj *key) { listNode *ln; watchedKey *wk; - if (listLength(c->watched_keys) == 0) server.watching_clients++; + if (listLength(&c->mstate->watched_keys) == 0) server.watching_clients++; /* Check if we are already watching for this key */ - listRewind(c->watched_keys, &li); + listRewind(&c->mstate->watched_keys, &li); while ((ln = listNext(&li))) { wk = listNodeValue(ln); if (wk->db == c->db && equalStringObjects(key, wk->key)) return; /* Key already watched */ @@ -326,7 +337,7 @@ void watchForKey(client *c, robj *key) { wk->db = c->db; wk->expired = keyIsExpired(c->db, key); incrRefCount(key); - listAddNodeTail(c->watched_keys, wk); + listAddNodeTail(&c->mstate->watched_keys, wk); watchedKeyLinkToClients(clients, wk); } @@ -336,8 +347,8 @@ void unwatchAllKeys(client *c) { listIter li; listNode *ln; - if (listLength(c->watched_keys) == 0) return; - listRewind(c->watched_keys, &li); + if (!c->mstate || listLength(&c->mstate->watched_keys) == 0) return; + listRewind(&c->mstate->watched_keys, &li); while ((ln = listNext(&li))) { list *clients; watchedKey *wk; @@ -350,7 +361,7 @@ void unwatchAllKeys(client *c) { /* Kill the entry at all if this was the only client */ if (listLength(clients) == 0) dictDelete(wk->db->watched_keys, wk->key); /* Remove this watched key from the client->watched list */ - listDelNode(c->watched_keys, ln); + listDelNode(&c->mstate->watched_keys, ln); decrRefCount(wk->key); zfree(wk); } @@ -363,8 +374,8 @@ int isWatchedKeyExpired(client *c) { listIter li; listNode *ln; watchedKey *wk; - if (listLength(c->watched_keys) == 0) return 0; - listRewind(c->watched_keys, &li); + if (!c->mstate || listLength(&c->mstate->watched_keys) == 0) return 0; + listRewind(&c->mstate->watched_keys, &li); while ((ln = listNext(&li))) { wk = listNodeValue(ln); if (wk->expired) continue; /* was expired when WATCH was called */ @@ -474,6 +485,9 @@ void watchCommand(client *c) { addReply(c, shared.ok); return; } + + if (!c->mstate) initClientMultiState(c); + for (j = 1; j < c->argc; j++) watchForKey(c, c->argv[j]); addReply(c, shared.ok); } @@ -485,11 +499,12 @@ void unwatchCommand(client *c) { } size_t multiStateMemOverhead(client *c) { - size_t mem = c->mstate.argv_len_sums; + if (!c->mstate) return 0; + size_t mem = c->mstate->argv_len_sums; /* Add watched keys overhead, Note: this doesn't take into account the watched keys themselves, because they aren't * managed per-client. */ - mem += listLength(c->watched_keys) * (sizeof(listNode) + sizeof(watchedKey)); + mem += listLength(&c->mstate->watched_keys) * (sizeof(listNode) + sizeof(c->mstate->watched_keys)); /* Reserved memory for queued multi commands. 
*/ - mem += c->mstate.alloc_count * sizeof(multiCmd); + mem += c->mstate->alloc_count * sizeof(multiCmd); return mem; } diff --git a/src/networking.c b/src/networking.c index 86f87deb8b..339cd304d4 100644 --- a/src/networking.c +++ b/src/networking.c @@ -119,7 +119,7 @@ int authRequired(client *c) { } static inline int isReplicaReadyForReplData(client *replica) { - return (replica->repl_state == REPLICA_STATE_ONLINE || replica->repl_state == REPLICA_STATE_BG_RDB_LOAD) && + return (replica->repl_data->repl_state == REPLICA_STATE_ONLINE || replica->repl_data->repl_state == REPLICA_STATE_BG_RDB_LOAD) && !(replica->flag.close_asap); } @@ -154,8 +154,6 @@ client *createClient(connection *conn) { c->bufpos = 0; c->buf_peak = c->buf_usable_size; c->buf_peak_last_reset_time = server.unixtime; - c->ref_repl_buf_node = NULL; - c->ref_block_pos = 0; c->qb_pos = 0; c->querybuf = NULL; c->querybuf_peak = 0; @@ -180,55 +178,31 @@ client *createClient(connection *conn) { c->ctime = c->last_interaction = server.unixtime; c->duration = 0; clientSetDefaultAuth(c); - c->repl_state = REPL_STATE_NONE; - c->repl_start_cmd_stream_on_ack = 0; - c->reploff = 0; - c->read_reploff = 0; - c->repl_applied = 0; - c->repl_ack_off = 0; - c->repl_ack_time = 0; - c->repl_aof_off = 0; - c->repl_last_partial_write = 0; - c->replica_listening_port = 0; - c->replica_addr = NULL; - c->replica_version = 0; - c->replica_capa = REPLICA_CAPA_NONE; - c->replica_req = REPLICA_REQ_NONE; - c->associated_rdb_client_id = 0; - c->rdb_client_disconnect_time = 0; c->reply = listCreate(); c->deferred_reply_errors = NULL; c->reply_bytes = 0; c->obuf_soft_limit_reached_time = 0; listSetFreeMethod(c->reply, freeClientReplyValue); listSetDupMethod(c->reply, dupClientReplyValue); - initClientBlockingState(c); + c->repl_data = NULL; + c->bstate = NULL; + c->pubsub_data = NULL; + c->module_data = NULL; + c->mstate = NULL; c->woff = 0; - c->watched_keys = listCreate(); - c->pubsub_channels = dictCreate(&objectKeyPointerValueDictType); - c->pubsub_patterns = dictCreate(&objectKeyPointerValueDictType); - c->pubsubshard_channels = dictCreate(&objectKeyPointerValueDictType); c->peerid = NULL; c->sockname = NULL; c->client_list_node = NULL; c->io_read_state = CLIENT_IDLE; c->io_write_state = CLIENT_IDLE; c->nwritten = 0; - c->client_tracking_redirection = 0; - c->client_tracking_prefixes = NULL; c->last_memory_usage = 0; c->last_memory_type = CLIENT_TYPE_NORMAL; - c->module_blocked_client = NULL; - c->module_auth_ctx = NULL; - c->auth_callback = NULL; - c->auth_callback_privdata = NULL; - c->auth_module = NULL; listInitNode(&c->clients_pending_write_node, c); listInitNode(&c->pending_read_list_node, c); c->mem_usage_bucket = NULL; c->mem_usage_bucket_node = NULL; if (conn) linkClient(c); - initClientMultiState(c); c->net_input_bytes = 0; c->net_input_bytes_curr_cmd = 0; c->net_output_bytes = 0; @@ -266,7 +240,9 @@ void putClientInPendingWriteQueue(client *c) { * if not already done and, for replicas, if the replica can actually receive * writes at this stage. */ if (!c->flag.pending_write && - (c->repl_state == REPL_STATE_NONE || (isReplicaReadyForReplData(c) && !c->repl_start_cmd_stream_on_ack))) { + (!c->repl_data || + c->repl_data->repl_state == REPL_STATE_NONE || + (isReplicaReadyForReplData(c) && !c->repl_data->repl_start_cmd_stream_on_ack))) { /* Here instead of installing the write handler, we just flag the * client and put it into a list of clients that have something * to write to the socket. 
This way before re-entering the event @@ -1340,10 +1316,10 @@ void deferredAfterErrorReply(client *c, list *errors) { void copyReplicaOutputBuffer(client *dst, client *src) { serverAssert(src->bufpos == 0 && listLength(src->reply) == 0); - if (src->ref_repl_buf_node == NULL) return; - dst->ref_repl_buf_node = src->ref_repl_buf_node; - dst->ref_block_pos = src->ref_block_pos; - ((replBufBlock *)listNodeValue(dst->ref_repl_buf_node))->refcount++; + if (src->repl_data->ref_repl_buf_node == NULL) return; + dst->repl_data->ref_repl_buf_node = src->repl_data->ref_repl_buf_node; + dst->repl_data->ref_block_pos = src->repl_data->ref_block_pos; + ((replBufBlock *)listNodeValue(dst->repl_data->ref_repl_buf_node))->refcount++; } /* Return true if the specified client has pending reply buffers to write to @@ -1353,13 +1329,13 @@ int clientHasPendingReplies(client *c) { /* Replicas use global shared replication buffer instead of * private output buffer. */ serverAssert(c->bufpos == 0 && listLength(c->reply) == 0); - if (c->ref_repl_buf_node == NULL) return 0; + if (c->repl_data->ref_repl_buf_node == NULL) return 0; /* If the last replication buffer block content is totally sent, * we have nothing to send. */ listNode *ln = listLast(server.repl_buffer_blocks); replBufBlock *tail = listNodeValue(ln); - if (ln == c->ref_repl_buf_node && c->ref_block_pos == tail->used) return 0; + if (ln == c->repl_data->ref_repl_buf_node && c->repl_data->ref_block_pos == tail->used) return 0; return 1; } else { @@ -1526,23 +1502,6 @@ void disconnectReplicas(void) { } } -/* Check if there is any other replica waiting dumping RDB finished expect me. - * This function is useful to judge current dumping RDB can be used for full - * synchronization or not. */ -int anyOtherReplicaWaitRdb(client *except_me) { - listIter li; - listNode *ln; - - listRewind(server.replicas, &li); - while ((ln = listNext(&li))) { - client *replica = ln->value; - if (replica != except_me && replica->repl_state == REPLICA_STATE_WAIT_BGSAVE_END) { - return 1; - } - } - return 0; -} - /* Remove the specified client from global lists where the client could * be referenced, not including the Pub/Sub channels. * This is used by freeClient() and replicationCachePrimary(). */ @@ -1567,7 +1526,7 @@ void unlinkClient(client *c) { /* Check if this is a replica waiting for diskless replication (rdb pipe), * in which case it needs to be cleaned from that list */ - if (c->flag.replica && c->repl_state == REPLICA_STATE_WAIT_BGSAVE_END && server.rdb_pipe_conns) { + if (c->repl_data && c->flag.replica && c->repl_data->repl_state == REPLICA_STATE_WAIT_BGSAVE_END && server.rdb_pipe_conns) { int i; int still_alive = 0; for (i = 0; i < server.rdb_pipe_numconns; i++) { @@ -1653,11 +1612,7 @@ void clearClientConnectionState(client *c) { clientSetDefaultAuth(c); moduleNotifyUserChanged(c); discardTransaction(c); - - pubsubUnsubscribeAllChannels(c, 0); - pubsubUnsubscribeShardAllChannels(c, 0); - pubsubUnsubscribeAllPatterns(c, 0); - unmarkClientAsPubSub(c); + freeClientPubSubData(c); if (c->name) { decrRefCount(c->name); @@ -1696,9 +1651,7 @@ void freeClient(client *c) { /* Notify module system that this client auth status changed. */ moduleNotifyUserChanged(c); - - /* Free the RedisModuleBlockedClient held onto for reprocessing if not already freed. */ - zfree(c->module_blocked_client); + freeClientModuleData(c); /* If this client was scheduled for async freeing we need to remove it * from the queue. 
Note that we need to do this here, because later @@ -1745,31 +1698,16 @@ void freeClient(client *c) { /* If there is any in-flight command, we don't record their duration. */ c->duration = 0; if (c->flag.blocked) unblockClient(c, 1); - dictRelease(c->bstate.keys); - - /* UNWATCH all the keys */ - unwatchAllKeys(c); - listRelease(c->watched_keys); - c->watched_keys = NULL; - - /* Unsubscribe from all the pubsub channels */ - pubsubUnsubscribeAllChannels(c, 0); - pubsubUnsubscribeShardAllChannels(c, 0); - pubsubUnsubscribeAllPatterns(c, 0); - unmarkClientAsPubSub(c); - dictRelease(c->pubsub_channels); - c->pubsub_channels = NULL; - dictRelease(c->pubsub_patterns); - c->pubsub_patterns = NULL; - dictRelease(c->pubsubshard_channels); - c->pubsubshard_channels = NULL; + + freeClientBlockingState(c); + freeClientPubSubData(c); /* Free data structures. */ listRelease(c->reply); c->reply = NULL; zfree_with_size(c->buf, c->buf_usable_size); c->buf = NULL; - freeReplicaReferencedReplBuffer(c); + freeClientArgv(c); freeClientOriginalArgv(c); if (c->deferred_reply_errors) listRelease(c->deferred_reply_errors); @@ -1787,45 +1725,7 @@ void freeClient(client *c) { * places where active clients may be referenced. */ unlinkClient(c); - /* Primary/replica cleanup Case 1: - * we lost the connection with a replica. */ - if (c->flag.replica) { - /* If there is no any other replica waiting dumping RDB finished, the - * current child process need not continue to dump RDB, then we kill it. - * So child process won't use more memory, and we also can fork a new - * child process asap to dump rdb for next full synchronization or bgsave. - * But we also need to check if users enable 'save' RDB, if enable, we - * should not remove directly since that means RDB is important for users - * to keep data safe and we may delay configured 'save' for full sync. */ - if (server.saveparamslen == 0 && c->repl_state == REPLICA_STATE_WAIT_BGSAVE_END && - server.child_type == CHILD_TYPE_RDB && server.rdb_child_type == RDB_CHILD_TYPE_DISK && - anyOtherReplicaWaitRdb(c) == 0) { - serverLog(LL_NOTICE, "Background saving, persistence disabled, last replica dropped, killing fork child."); - killRDBChild(); - } - if (c->repl_state == REPLICA_STATE_SEND_BULK) { - if (c->repldbfd != -1) close(c->repldbfd); - if (c->replpreamble) sdsfree(c->replpreamble); - } - list *l = (c->flag.monitor) ? server.monitors : server.replicas; - ln = listSearchKey(l, c); - serverAssert(ln != NULL); - listDelNode(l, ln); - /* We need to remember the time when we started to have zero - * attached replicas, as after some time we'll free the replication - * backlog. */ - if (getClientType(c) == CLIENT_TYPE_REPLICA && listLength(server.replicas) == 0) - server.repl_no_replicas_since = server.unixtime; - refreshGoodReplicasCount(); - /* Fire the replica change modules event. */ - if (c->repl_state == REPLICA_STATE_ONLINE) - moduleFireServerEvent(VALKEYMODULE_EVENT_REPLICA_CHANGE, VALKEYMODULE_SUBEVENT_REPLICA_CHANGE_OFFLINE, - NULL); - } - - /* Primary/replica cleanup Case 2: - * we lost the connection with the primary. 
*/ - if (c->flag.primary) replicationHandlePrimaryDisconnection(); + freeClientReplicationData(c); /* Remove client from memory usage buckets */ if (c->mem_usage_bucket) { @@ -1841,7 +1741,6 @@ void freeClient(client *c) { freeClientMultiState(c); sdsfree(c->peerid); sdsfree(c->sockname); - sdsfree(c->replica_addr); zfree(c); } @@ -1932,10 +1831,10 @@ void beforeNextClient(client *c) { * In these scenarios, qb_pos points to the part of the current command * or the beginning of next command, and the current command is not applied yet, * so the repl_applied is not equal to qb_pos. */ - if (c->repl_applied) { - sdsrange(c->querybuf, c->repl_applied, -1); - c->qb_pos -= c->repl_applied; - c->repl_applied = 0; + if (c->repl_data->repl_applied) { + sdsrange(c->querybuf, c->repl_data->repl_applied, -1); + c->qb_pos -= c->repl_data->repl_applied; + c->repl_data->repl_applied = 0; } } else { trimClientQueryBuffer(c); @@ -1974,18 +1873,18 @@ int freeClientsInAsyncFreeQueue(void) { * The primary gives a grace period before freeing this client because * it serves as a reference to the first required replication data block for * this replica */ - if (!c->rdb_client_disconnect_time) { + if (!c->repl_data->rdb_client_disconnect_time) { if (c->conn) connSetReadHandler(c->conn, NULL); - c->rdb_client_disconnect_time = server.unixtime; + c->repl_data->rdb_client_disconnect_time = server.unixtime; dualChannelServerLog(LL_VERBOSE, "Postpone RDB client id=%llu (%s) free for %d seconds", (unsigned long long)c->id, replicationGetReplicaName(c), server.wait_before_rdb_client_free); } - if (server.unixtime - c->rdb_client_disconnect_time <= server.wait_before_rdb_client_free) continue; + if (server.unixtime - c->repl_data->rdb_client_disconnect_time <= server.wait_before_rdb_client_free) continue; dualChannelServerLog( LL_NOTICE, "Replica main channel failed to establish PSYNC within the grace period (%ld seconds). " "Freeing RDB client %llu.", - (long int)(server.unixtime - c->rdb_client_disconnect_time), (unsigned long long)c->id); + (long int)(server.unixtime - c->repl_data->rdb_client_disconnect_time), (unsigned long long)c->id); c->flag.protected_rdb_channel = 0; } @@ -2015,27 +1914,27 @@ void writeToReplica(client *c) { int nwritten = 0; serverAssert(c->bufpos == 0 && listLength(c->reply) == 0); while (clientHasPendingReplies(c)) { - replBufBlock *o = listNodeValue(c->ref_repl_buf_node); - serverAssert(o->used >= c->ref_block_pos); + replBufBlock *o = listNodeValue(c->repl_data->ref_repl_buf_node); + serverAssert(o->used >= c->repl_data->ref_block_pos); /* Send current block if it is not fully sent. */ - if (o->used > c->ref_block_pos) { - nwritten = connWrite(c->conn, o->buf + c->ref_block_pos, o->used - c->ref_block_pos); + if (o->used > c->repl_data->ref_block_pos) { + nwritten = connWrite(c->conn, o->buf + c->repl_data->ref_block_pos, o->used - c->repl_data->ref_block_pos); if (nwritten <= 0) { c->write_flags |= WRITE_FLAGS_WRITE_ERROR; return; } c->nwritten += nwritten; - c->ref_block_pos += nwritten; + c->repl_data->ref_block_pos += nwritten; } /* If we fully sent the object on head, go to the next one. 
*/ - listNode *next = listNextNode(c->ref_repl_buf_node); - if (next && c->ref_block_pos == o->used) { + listNode *next = listNextNode(c->repl_data->ref_repl_buf_node); + if (next && c->repl_data->ref_block_pos == o->used) { o->refcount--; ((replBufBlock *)(listNodeValue(next)))->refcount++; - c->ref_repl_buf_node = next; - c->ref_block_pos = 0; + c->repl_data->ref_repl_buf_node = next; + c->repl_data->ref_block_pos = 0; incrementalTrimReplicationBacklog(REPL_BACKLOG_TRIM_BLOCKS_PER_CALL); } } @@ -2338,7 +2237,7 @@ int handleReadResult(client *c) { c->last_interaction = server.unixtime; c->net_input_bytes += c->nread; if (c->flag.primary) { - c->read_reploff += c->nread; + c->repl_data->read_reploff += c->nread; server.stat_net_repl_input_bytes += c->nread; } else { server.stat_net_input_bytes += c->nread; @@ -2409,7 +2308,7 @@ parseResult handleParseResults(client *c) { } if (c->read_flags & READ_FLAGS_INLINE_ZERO_QUERY_LEN && getClientType(c) == CLIENT_TYPE_REPLICA) { - c->repl_ack_time = server.unixtime; + c->repl_data->repl_ack_time = server.unixtime; } if (c->read_flags & READ_FLAGS_INLINE_ZERO_QUERY_LEN) { @@ -2993,10 +2892,12 @@ void commandProcessed(client *c) { clusterSlotStatsAddNetworkBytesInForUserClient(c); resetClient(c); - long long prev_offset = c->reploff; + if (!c->repl_data) return; + + long long prev_offset = c->repl_data->reploff; if (c->flag.primary && !c->flag.multi) { /* Update the applied replication offset of our primary. */ - c->reploff = c->read_reploff - sdslen(c->querybuf) + c->qb_pos; + c->repl_data->reploff = c->repl_data->read_reploff - sdslen(c->querybuf) + c->qb_pos; } /* If the client is a primary we need to compute the difference @@ -3006,10 +2907,10 @@ void commandProcessed(client *c) { * part of the replication stream, will be propagated to the * sub-replicas and to the replication backlog. */ if (c->flag.primary) { - long long applied = c->reploff - prev_offset; + long long applied = c->repl_data->reploff - prev_offset; if (applied) { - replicationFeedStreamFromPrimaryStream(c->querybuf + c->repl_applied, applied); - c->repl_applied += applied; + replicationFeedStreamFromPrimaryStream(c->querybuf + c->repl_data->repl_applied, applied); + c->repl_data->repl_applied += applied; } } } @@ -3241,7 +3142,7 @@ void readToQueryBuf(client *c) { * so they are also considered a part of the query buffer in a broader sense. * * For unauthenticated clients, the query buffer cannot exceed 1MB at most. */ - size_t qb_memory = sdslen(c->querybuf) + c->mstate.argv_len_sums; + size_t qb_memory = sdslen(c->querybuf) + (c->mstate ? 
c->mstate->argv_len_sums : 0); if (qb_memory > server.client_max_querybuf_len || (qb_memory > 1024 * 1024 && (c->read_flags & READ_FLAGS_AUTH_REQUIRED))) { c->read_flags |= READ_FLAGS_QB_LIMIT_REACHED; @@ -3369,9 +3270,9 @@ sds catClientInfoString(sds s, client *client, int hide_user_data) { size_t obufmem, total_mem = getClientMemoryUsage(client, &obufmem); size_t used_blocks_of_repl_buf = 0; - if (client->ref_repl_buf_node) { + if (client->repl_data && client->repl_data->ref_repl_buf_node) { replBufBlock *last = listNodeValue(listLast(server.repl_buffer_blocks)); - replBufBlock *cur = listNodeValue(client->ref_repl_buf_node); + replBufBlock *cur = listNodeValue(client->repl_data->ref_repl_buf_node); used_blocks_of_repl_buf = last->id - cur->id + 1; } sds ret = sdscatfmt( @@ -3386,15 +3287,15 @@ sds catClientInfoString(sds s, client *client, int hide_user_data) { " idle=%I", (long long)(server.unixtime - client->last_interaction), " flags=%s", flags, " db=%i", client->db->id, - " sub=%i", (int)dictSize(client->pubsub_channels), - " psub=%i", (int)dictSize(client->pubsub_patterns), - " ssub=%i", (int)dictSize(client->pubsubshard_channels), - " multi=%i", (client->flag.multi) ? client->mstate.count : -1, - " watch=%i", (int)listLength(client->watched_keys), + " sub=%i", client->pubsub_data ? (int)dictSize(client->pubsub_data->pubsub_channels) : 0, + " psub=%i", client->pubsub_data ? (int)dictSize(client->pubsub_data->pubsub_patterns) : 0, + " ssub=%i", client->pubsub_data ? (int)dictSize(client->pubsub_data->pubsubshard_channels) : 0, + " multi=%i", client->mstate ? client->mstate->count : -1, + " watch=%i", client->mstate ? (int)listLength(&client->mstate->watched_keys) : 0, " qbuf=%U", client->querybuf ? (unsigned long long)sdslen(client->querybuf) : 0, " qbuf-free=%U", client->querybuf ? (unsigned long long)sdsavail(client->querybuf) : 0, " argv-mem=%U", (unsigned long long)client->argv_len_sum, - " multi-mem=%U", (unsigned long long)client->mstate.argv_len_sums, + " multi-mem=%U", client->mstate ? (unsigned long long)client->mstate->argv_len_sums : 0, " rbs=%U", (unsigned long long)client->buf_usable_size, " rbp=%U", (unsigned long long)client->buf_peak, " obl=%U", (unsigned long long)client->bufpos, @@ -3404,7 +3305,7 @@ sds catClientInfoString(sds s, client *client, int hide_user_data) { " events=%s", events, " cmd=%s", client->lastcmd ? client->lastcmd->fullname : "NULL", " user=%s", hide_user_data ? "*redacted*" : (client->user ? client->user->name : "(superuser)"), - " redir=%I", (client->flag.tracking) ? (long long)client->client_tracking_redirection : -1, + " redir=%I", (client->flag.tracking) ? (long long)client->pubsub_data->client_tracking_redirection : -1, " resp=%i", client->resp, " lib-name=%s", client->lib_name ? (char *)client->lib_name->ptr : "", " lib-ver=%s", client->lib_ver ? (char *)client->lib_ver->ptr : "", @@ -3892,6 +3793,7 @@ void clientCommand(client *c) { struct ClientFlags options = {0}; robj **prefix = NULL; size_t numprefix = 0; + initClientPubSubData(c); /* Parse the options. 
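As an aside for reviewers, the CLIENT INFO and query-buffer hunks above all use the same guard-and-fallback shape: read through the optional component when it exists, otherwise report a neutral value. Below is a minimal, self-contained sketch of that shape; the struct and function names are illustrative stand-ins, not the server's actual types.

```c
#include <stdio.h>
#include <stddef.h>

/* Simplified stand-ins; the real client/mstate types live in server.h. */
typedef struct MultiState {
    int count;              /* queued MULTI commands */
    size_t argv_len_sums;   /* memory used by queued argv */
} MultiState;

typedef struct Client {
    MultiState *mstate;     /* NULL until the client issues MULTI */
} Client;

/* Reporting reads through the optional component, falling back to the same
 * neutral values the field reported before the refactor. */
static void printClientInfoSketch(const Client *c) {
    printf("multi=%d multi-mem=%zu\n",
           c->mstate ? c->mstate->count : -1,
           c->mstate ? c->mstate->argv_len_sums : (size_t)0);
}

int main(void) {
    MultiState ms = {.count = 2, .argv_len_sums = 128};
    Client plain = {.mstate = NULL};
    Client in_multi = {.mstate = &ms};
    printClientInfoSketch(&plain);    /* multi=-1 multi-mem=0 */
    printClientInfoSketch(&in_multi); /* multi=2 multi-mem=128 */
    return 0;
}
```

The fallbacks match what these fields reported before the refactor for a client that never used the feature (0 for the counters, -1 for multi).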
*/ for (int j = 3; j < c->argc; j++) { @@ -4031,7 +3933,7 @@ void clientCommand(client *c) { } else if (!strcasecmp(c->argv[1]->ptr, "getredir") && c->argc == 2) { /* CLIENT GETREDIR */ if (c->flag.tracking) { - addReplyLongLong(c, c->client_tracking_redirection); + addReplyLongLong(c, c->pubsub_data->client_tracking_redirection); } else { addReplyLongLong(c, -1); } @@ -4077,17 +3979,17 @@ void clientCommand(client *c) { /* Redirect */ addReplyBulkCString(c, "redirect"); if (c->flag.tracking) { - addReplyLongLong(c, c->client_tracking_redirection); + addReplyLongLong(c, c->pubsub_data->client_tracking_redirection); } else { addReplyLongLong(c, -1); } /* Prefixes */ addReplyBulkCString(c, "prefixes"); - if (c->client_tracking_prefixes) { - addReplyArrayLen(c, raxSize(c->client_tracking_prefixes)); + if (c->pubsub_data->client_tracking_prefixes) { + addReplyArrayLen(c, raxSize(c->pubsub_data->client_tracking_prefixes)); raxIterator ri; - raxStart(&ri, c->client_tracking_prefixes); + raxStart(&ri, c->pubsub_data->client_tracking_prefixes); raxSeek(&ri, "^", NULL, 0); while (raxNext(&ri)) { addReplyBulkCBuffer(c, ri.key, ri.key_len); @@ -4410,9 +4312,9 @@ size_t getClientOutputBufferMemoryUsage(client *c) { size_t repl_buf_size = 0; size_t repl_node_num = 0; size_t repl_node_size = sizeof(listNode) + sizeof(replBufBlock); - if (c->ref_repl_buf_node) { + if (c->repl_data->ref_repl_buf_node) { replBufBlock *last = listNodeValue(listLast(server.repl_buffer_blocks)); - replBufBlock *cur = listNodeValue(c->ref_repl_buf_node); + replBufBlock *cur = listNodeValue(c->repl_data->ref_repl_buf_node); repl_buf_size = last->repl_offset + last->size - cur->repl_offset; repl_node_num = last->id - cur->id + 1; } @@ -4445,8 +4347,8 @@ size_t getClientMemoryUsage(client *c, size_t *output_buffer_mem_usage) { /* Add memory overhead of the tracking prefixes, this is an underestimation so we don't need to traverse the entire * rax */ - if (c->client_tracking_prefixes) - mem += c->client_tracking_prefixes->numnodes * (sizeof(raxNode) * sizeof(raxNode *)); + if (c->pubsub_data && c->pubsub_data->client_tracking_prefixes) + mem += c->pubsub_data->client_tracking_prefixes->numnodes * (sizeof(raxNode) * sizeof(raxNode *)); return mem; } @@ -4612,7 +4514,7 @@ void flushReplicasOutputBuffers(void) { * 3. Obviously if the replica is not ONLINE. */ if (isReplicaReadyForReplData(replica) && !(replica->flag.close_asap) && can_receive_writes && - !replica->repl_start_cmd_stream_on_ack && clientHasPendingReplies(replica)) { + !replica->repl_data->repl_start_cmd_stream_on_ack && clientHasPendingReplies(replica)) { writeToClient(replica); } } diff --git a/src/pubsub.c b/src/pubsub.c index 3781fa39aa..27b5611788 100644 --- a/src/pubsub.c +++ b/src/pubsub.c @@ -219,20 +219,20 @@ int serverPubsubShardSubscriptionCount(void) { /* Return the number of channels + patterns a client is subscribed to. */ int clientSubscriptionsCount(client *c) { - return dictSize(c->pubsub_channels) + dictSize(c->pubsub_patterns); + return dictSize(c->pubsub_data->pubsub_channels) + dictSize(c->pubsub_data->pubsub_patterns); } /* Return the number of shard level channels a client is subscribed to. 
*/ int clientShardSubscriptionsCount(client *c) { - return dictSize(c->pubsubshard_channels); + return dictSize(c->pubsub_data->pubsubshard_channels); } dict *getClientPubSubChannels(client *c) { - return c->pubsub_channels; + return c->pubsub_data->pubsub_channels; } dict *getClientPubSubShardChannels(client *c) { - return c->pubsubshard_channels; + return c->pubsub_data->pubsubshard_channels; } /* Return the number of pubsub + pubsub shard level channels @@ -255,6 +255,36 @@ void unmarkClientAsPubSub(client *c) { } } +void initClientPubSubData(client *c) { + if (c->pubsub_data) return; + c->pubsub_data = zmalloc(sizeof(ClientPubSubData)); + c->pubsub_data->pubsub_channels = dictCreate(&objectKeyPointerValueDictType); + c->pubsub_data->pubsub_patterns = dictCreate(&objectKeyPointerValueDictType); + c->pubsub_data->pubsubshard_channels = dictCreate(&objectKeyPointerValueDictType); + c->pubsub_data->client_tracking_redirection = 0; + c->pubsub_data->client_tracking_prefixes = NULL; +} + +void freeClientPubSubData(client *c) { + if (!c->pubsub_data) return; + /* Unsubscribe from all the pubsub channels */ + pubsubUnsubscribeAllChannels(c, 0); + pubsubUnsubscribeShardAllChannels(c, 0); + pubsubUnsubscribeAllPatterns(c, 0); + unmarkClientAsPubSub(c); + dictRelease(c->pubsub_data->pubsub_channels); + c->pubsub_data->pubsub_channels = NULL; + dictRelease(c->pubsub_data->pubsub_patterns); + c->pubsub_data->pubsub_patterns = NULL; + dictRelease(c->pubsub_data->pubsubshard_channels); + c->pubsub_data->pubsubshard_channels = NULL; + if (c->pubsub_data->client_tracking_prefixes) { + disableTracking(c); + } + zfree(c->pubsub_data); + c->pubsub_data = NULL; +} + /* Subscribe a client to a channel. Returns 1 if the operation succeeded, or * 0 if the client was already subscribed to that channel. */ int pubsubSubscribeChannel(client *c, robj *channel, pubsubtype type) { @@ -262,6 +292,8 @@ int pubsubSubscribeChannel(client *c, robj *channel, pubsubtype type) { int retval = 0; unsigned int slot = 0; + if (!c->pubsub_data) initClientPubSubData(c); + /* Add the channel to the client -> channels hash table */ void *position = dictFindPositionForInsert(type.clientPubSubChannels(c), channel, NULL); if (position) { /* Not yet subscribed to this channel */ @@ -344,7 +376,7 @@ void pubsubShardUnsubscribeAllChannelsInSlot(unsigned int slot) { dictEntry *entry; while ((entry = dictNext(iter)) != NULL) { client *c = dictGetKey(entry); - int retval = dictDelete(c->pubsubshard_channels, channel); + int retval = dictDelete(c->pubsub_data->pubsubshard_channels, channel); serverAssertWithInfo(c, channel, retval == DICT_OK); addReplyPubsubUnsubscribed(c, channel, pubSubShardType); /* If the client has no other pubsub subscription, @@ -366,7 +398,9 @@ int pubsubSubscribePattern(client *c, robj *pattern) { dict *clients; int retval = 0; - if (dictAdd(c->pubsub_patterns, pattern, NULL) == DICT_OK) { + if (!c->pubsub_data) initClientPubSubData(c); + + if (dictAdd(c->pubsub_data->pubsub_patterns, pattern, NULL) == DICT_OK) { retval = 1; incrRefCount(pattern); /* Add the client to the pattern -> list of clients hash table */ @@ -392,8 +426,10 @@ int pubsubUnsubscribePattern(client *c, robj *pattern, int notify) { dict *clients; int retval = 0; + if (!c->pubsub_data) initClientPubSubData(c); + incrRefCount(pattern); /* Protect the object. 
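For context, the subscribe paths in this file rely on init-on-first-use: any entry point that needs pubsub state allocates the component before touching it, so clients that never subscribe pay nothing. A small self-contained illustration of that guard follows; the names are simplified stand-ins rather than the server's API.

```c
#include <stdio.h>
#include <stdlib.h>

/* Simplified stand-in for the lazily allocated pubsub component. */
typedef struct PubSubData {
    size_t channels;
    size_t patterns;
} PubSubData;

typedef struct Client {
    PubSubData *pubsub_data; /* NULL for clients that never use pubsub */
} Client;

/* Idempotent: returns early if the component already exists. */
static void initClientPubSubDataSketch(Client *c) {
    if (c->pubsub_data) return;
    c->pubsub_data = calloc(1, sizeof(PubSubData)); /* counters start at zero */
}

/* Every entry point that touches pubsub state allocates the component first. */
static void subscribeCommandSketch(Client *c, const char *channel) {
    (void)channel; /* channel bookkeeping elided in this sketch */
    if (!c->pubsub_data) initClientPubSubDataSketch(c);
    c->pubsub_data->channels++;
}

int main(void) {
    Client c = {.pubsub_data = NULL};
    subscribeCommandSketch(&c, "news");
    printf("channels=%zu\n", c.pubsub_data->channels);
    free(c.pubsub_data);
    return 0;
}
```

Because the init helper returns early when the pointer is already set, it is safe to call from every command path that might need the component.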
May be the same we remove */ - if (dictDelete(c->pubsub_patterns, pattern) == DICT_OK) { + if (dictDelete(c->pubsub_data->pubsub_patterns, pattern) == DICT_OK) { retval = 1; /* Remove the client from the pattern -> clients list hash table */ de = dictFind(server.pubsub_patterns, pattern); @@ -454,9 +490,10 @@ int pubsubUnsubscribeShardAllChannels(client *c, int notify) { * client was subscribed from. */ int pubsubUnsubscribeAllPatterns(client *c, int notify) { int count = 0; + if (!c->pubsub_data) initClientPubSubData(c); - if (dictSize(c->pubsub_patterns) > 0) { - dictIterator *di = dictGetSafeIterator(c->pubsub_patterns); + if (dictSize(c->pubsub_data->pubsub_patterns) > 0) { + dictIterator *di = dictGetSafeIterator(c->pubsub_data->pubsub_patterns); dictEntry *de; while ((de = dictNext(di)) != NULL) { @@ -560,6 +597,8 @@ void subscribeCommand(client *c) { /* UNSUBSCRIBE [channel ...] */ void unsubscribeCommand(client *c) { + if (!c->pubsub_data) initClientPubSubData(c); + if (c->argc == 1) { pubsubUnsubscribeAllChannels(c, 1); } else { @@ -732,6 +771,8 @@ void ssubscribeCommand(client *c) { /* SUNSUBSCRIBE [shardchannel [shardchannel ...]] */ void sunsubscribeCommand(client *c) { + if (!c->pubsub_data) initClientPubSubData(c); + if (c->argc == 1) { pubsubUnsubscribeShardAllChannels(c, 1); } else { @@ -745,12 +786,13 @@ void sunsubscribeCommand(client *c) { } size_t pubsubMemOverhead(client *c) { + if (!c->pubsub_data) return 0; /* PubSub patterns */ - size_t mem = dictMemUsage(c->pubsub_patterns); + size_t mem = dictMemUsage(c->pubsub_data->pubsub_patterns); /* Global PubSub channels */ - mem += dictMemUsage(c->pubsub_channels); + mem += dictMemUsage(c->pubsub_data->pubsub_channels); /* Sharded PubSub channels */ - mem += dictMemUsage(c->pubsubshard_channels); + mem += dictMemUsage(c->pubsub_data->pubsubshard_channels); return mem; } diff --git a/src/rdb.c b/src/rdb.c index 958eac5d4f..32c9021669 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -3573,9 +3573,9 @@ int rdbSaveToReplicasSockets(int req, rdbSaveInfo *rsi) { listRewind(server.replicas, &li); while ((ln = listNext(&li))) { client *replica = ln->value; - if (replica->repl_state == REPLICA_STATE_WAIT_BGSAVE_START) { + if (replica->repl_data->repl_state == REPLICA_STATE_WAIT_BGSAVE_START) { /* Check replica has the exact requirements */ - if (replica->replica_req != req) continue; + if (replica->repl_data->replica_req != req) continue; conns[connsnum++] = replica->conn; if (dual_channel) { @@ -3646,8 +3646,8 @@ int rdbSaveToReplicasSockets(int req, rdbSaveInfo *rsi) { listRewind(server.replicas, &li); while ((ln = listNext(&li))) { client *replica = ln->value; - if (replica->repl_state == REPLICA_STATE_WAIT_BGSAVE_END) { - replica->repl_state = REPLICA_STATE_WAIT_BGSAVE_START; + if (replica->repl_data->repl_state == REPLICA_STATE_WAIT_BGSAVE_END) { + replica->repl_data->repl_state = REPLICA_STATE_WAIT_BGSAVE_START; } } if (!dual_channel) { diff --git a/src/replication.c b/src/replication.c index c5611d5a5a..9913d64d65 100644 --- a/src/replication.c +++ b/src/replication.c @@ -82,10 +82,10 @@ char *replicationGetReplicaName(client *c) { ip[0] = '\0'; buf[0] = '\0'; - if (c->replica_addr || connAddrPeerName(c->conn, ip, sizeof(ip), NULL) != -1) { - char *addr = c->replica_addr ? c->replica_addr : ip; - if (c->replica_listening_port) - formatAddr(buf, sizeof(buf), addr, c->replica_listening_port); + if (c->repl_data->replica_addr || connAddrPeerName(c->conn, ip, sizeof(ip), NULL) != -1) { + char *addr = c->repl_data->replica_addr ? 
c->repl_data->replica_addr : ip; + if (c->repl_data->replica_listening_port) + formatAddr(buf, sizeof(buf), addr, c->repl_data->replica_listening_port); else snprintf(buf, sizeof(buf), "%s:", addr); } else { @@ -231,7 +231,7 @@ void addRdbReplicaToPsyncWait(client *replica_rdb_client) { dualChannelServerLog(LL_DEBUG, "Add rdb replica %s to waiting psync, with cid %llu, %s ", replicationGetReplicaName(replica_rdb_client), (unsigned long long)replica_rdb_client->id, tail ? "tracking repl-backlog tail" : "no repl-backlog to track"); - replica_rdb_client->ref_repl_buf_node = tail ? ln : NULL; + replica_rdb_client->repl_data->ref_repl_buf_node = tail ? ln : NULL; /* Prevent rdb client from being freed before psync is established. */ replica_rdb_client->flag.protected_rdb_channel = 1; uint64_t id = htonu64(replica_rdb_client->id); @@ -250,8 +250,8 @@ void backfillRdbReplicasToPsyncWait(void) { raxSeek(&iter, "^", NULL, 0); while (raxNext(&iter)) { client *replica_rdb_client = iter.data; - if (replica_rdb_client->ref_repl_buf_node) continue; - replica_rdb_client->ref_repl_buf_node = ln; + if (replica_rdb_client->repl_data->ref_repl_buf_node) continue; + replica_rdb_client->repl_data->ref_repl_buf_node = ln; head->refcount++; dualChannelServerLog(LL_DEBUG, "Attach replica rdb client %llu to repl buf block", (long long unsigned int)replica_rdb_client->id); @@ -263,18 +263,18 @@ void removeReplicaFromPsyncWait(client *replica_main_client) { listNode *ln; replBufBlock *o; /* Get replBufBlock pointed by this replica */ - client *replica_rdb_client = lookupRdbClientByID(replica_main_client->associated_rdb_client_id); - ln = replica_rdb_client->ref_repl_buf_node; + client *replica_rdb_client = lookupRdbClientByID(replica_main_client->repl_data->associated_rdb_client_id); + ln = replica_rdb_client->repl_data->ref_repl_buf_node; o = ln ? listNodeValue(ln) : NULL; if (o != NULL) { serverAssert(o->refcount > 0); o->refcount--; } - replica_rdb_client->ref_repl_buf_node = NULL; + replica_rdb_client->repl_data->ref_repl_buf_node = NULL; replica_rdb_client->flag.protected_rdb_channel = 0; dualChannelServerLog(LL_DEBUG, "Remove psync waiting replica %s with cid %llu, repl buffer block %s", replicationGetReplicaName(replica_main_client), - (long long unsigned int)replica_main_client->associated_rdb_client_id, + (long long unsigned int)replica_main_client->repl_data->associated_rdb_client_id, o ? "ref count decreased" : "doesn't exist"); uint64_t id = htonu64(replica_rdb_client->id); raxRemove(server.replicas_waiting_psync, (unsigned char *)&id, sizeof(id), NULL); @@ -291,7 +291,7 @@ int canFeedReplicaReplBuffer(client *replica) { if (replica->flag.repl_rdbonly) return 0; /* Don't feed replicas that are still waiting for BGSAVE to start. */ - if (replica->repl_state == REPLICA_STATE_WAIT_BGSAVE_START) return 0; + if (replica->repl_data->repl_state == REPLICA_STATE_WAIT_BGSAVE_START) return 0; return 1; } @@ -396,15 +396,15 @@ void freeReplicaReferencedReplBuffer(client *replica) { replicationGetReplicaName(replica), (long long unsigned int)replica->id); } } - if (replica->ref_repl_buf_node != NULL) { + if (replica->repl_data->ref_repl_buf_node != NULL) { /* Decrease the start buffer node reference count. 
*/ - replBufBlock *o = listNodeValue(replica->ref_repl_buf_node); + replBufBlock *o = listNodeValue(replica->repl_data->ref_repl_buf_node); serverAssert(o->refcount > 0); o->refcount--; incrementalTrimReplicationBacklog(REPL_BACKLOG_TRIM_BLOCKS_PER_CALL); } - replica->ref_repl_buf_node = NULL; - replica->ref_block_pos = 0; + replica->repl_data->ref_repl_buf_node = NULL; + replica->repl_data->ref_block_pos = 0; } /* Replication: Primary side. @@ -486,9 +486,9 @@ void feedReplicationBuffer(char *s, size_t len) { client *replica = ln->value; if (!canFeedReplicaReplBuffer(replica) && !(replica->flag.protected_rdb_channel)) continue; /* Update shared replication buffer start position. */ - if (replica->ref_repl_buf_node == NULL) { - replica->ref_repl_buf_node = start_node; - replica->ref_block_pos = start_pos; + if (replica->repl_data->ref_repl_buf_node == NULL) { + replica->repl_data->ref_repl_buf_node = start_node; + replica->repl_data->ref_block_pos = start_pos; /* Only increase the start block reference count. */ ((replBufBlock *)listNodeValue(start_node))->refcount++; } @@ -771,8 +771,8 @@ long long addReplyReplicationBacklog(client *c, long long offset) { /* Setting output buffer of the replica. */ replBufBlock *o = listNodeValue(node); o->refcount++; - c->ref_repl_buf_node = node; - c->ref_block_pos = offset - o->repl_offset; + c->repl_data->ref_repl_buf_node = node; + c->repl_data->ref_block_pos = offset - o->repl_offset; return server.repl_backlog->histlen - skip; } @@ -805,8 +805,8 @@ int replicationSetupReplicaForFullResync(client *replica, long long offset) { char buf[128]; int buflen; - replica->psync_initial_offset = offset; - replica->repl_state = REPLICA_STATE_WAIT_BGSAVE_END; + replica->repl_data->psync_initial_offset = offset; + replica->repl_data->repl_state = REPLICA_STATE_WAIT_BGSAVE_END; /* We are going to accumulate the incremental changes for this * replica as well. Set replicas_eldb to -1 in order to force to re-emit * a SELECT statement in the replication stream. */ @@ -889,19 +889,19 @@ int primaryTryPartialResynchronization(client *c, long long psync_offset) { * 4) Send the backlog data (from the offset to the end) to the replica. */ waitForClientIO(c); c->flag.replica = 1; - if (c->associated_rdb_client_id && lookupRdbClientByID(c->associated_rdb_client_id)) { - c->repl_state = REPLICA_STATE_BG_RDB_LOAD; + if (c->repl_data->associated_rdb_client_id && lookupRdbClientByID(c->repl_data->associated_rdb_client_id)) { + c->repl_data->repl_state = REPLICA_STATE_BG_RDB_LOAD; removeReplicaFromPsyncWait(c); } else { - c->repl_state = REPLICA_STATE_ONLINE; + c->repl_data->repl_state = REPLICA_STATE_ONLINE; } - c->repl_ack_time = server.unixtime; - c->repl_start_cmd_stream_on_ack = 0; + c->repl_data->repl_ack_time = server.unixtime; + c->repl_data->repl_start_cmd_stream_on_ack = 0; listAddNodeTail(server.replicas, c); /* We can't use the connection buffers since they are used to accumulate * new commands at this stage. But we are sure the socket send buffer is * empty so this write will never fail actually. 
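The ref_repl_buf_node / ref_block_pos bookkeeping above is easier to follow in a stripped-down model: replication buffer blocks are shared, each replica holds exactly one reference, on the block it is currently sending, and moving to the next block transfers that reference. Here is a self-contained sketch of the hand-over, with simplified types that only approximate replBufBlock.

```c
#include <stdio.h>

#define NBLOCKS 3

/* Simplified model of a shared replication buffer block. */
typedef struct Block {
    int refcount; /* number of replicas still reading from this block */
    size_t used;  /* bytes written into the block */
} Block;

/* Per-replica read position, mirroring ref_repl_buf_node / ref_block_pos. */
typedef struct Reader {
    int block_idx;
    size_t pos;
} Reader;

/* Once the reader has sent everything in its current block, hand its single
 * reference over to the next block: decrement the old refcount, increment
 * the new one, and reset the in-block position. */
static void advanceReader(Reader *r, Block *blocks) {
    Block *cur = &blocks[r->block_idx];
    if (r->pos == cur->used && r->block_idx + 1 < NBLOCKS) {
        cur->refcount--;
        blocks[r->block_idx + 1].refcount++;
        r->block_idx++;
        r->pos = 0;
    }
}

int main(void) {
    Block blocks[NBLOCKS] = {{.refcount = 1, .used = 5}, {.used = 7}, {.used = 3}};
    Reader replica = {.block_idx = 0, .pos = 5}; /* finished block 0 */
    advanceReader(&replica, blocks);
    printf("reader at block %d, refcounts: %d %d %d\n", replica.block_idx,
           blocks[0].refcount, blocks[1].refcount, blocks[2].refcount);
    return 0;
}
```

Once a block's refcount reaches zero and it sits at the head of the buffer list, it can be reclaimed, which is what the incremental backlog trimming calls in the hunks above rely on.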
*/ - if (c->replica_capa & REPLICA_CAPA_PSYNC2) { + if (c->repl_data->replica_capa & REPLICA_CAPA_PSYNC2) { buflen = snprintf(buf, sizeof(buf), "+CONTINUE %s\r\n", server.replid); } else { buflen = snprintf(buf, sizeof(buf), "+CONTINUE\r\n"); @@ -1003,8 +1003,8 @@ int startBgsaveForReplication(int mincapa, int req) { while ((ln = listNext(&li))) { client *replica = ln->value; - if (replica->repl_state == REPLICA_STATE_WAIT_BGSAVE_START) { - replica->repl_state = REPL_STATE_NONE; + if (replica->repl_data->repl_state == REPLICA_STATE_WAIT_BGSAVE_START) { + replica->repl_data->repl_state = REPL_STATE_NONE; replica->flag.replica = 0; listDelNode(server.replicas, ln); addReplyError(replica, "BGSAVE failed, replication can't continue"); @@ -1021,9 +1021,9 @@ int startBgsaveForReplication(int mincapa, int req) { while ((ln = listNext(&li))) { client *replica = ln->value; - if (replica->repl_state == REPLICA_STATE_WAIT_BGSAVE_START) { + if (replica->repl_data->repl_state == REPLICA_STATE_WAIT_BGSAVE_START) { /* Check replica has the exact requirements */ - if (replica->replica_req != req) continue; + if (replica->repl_data->replica_req != req) continue; replicationSetupReplicaForFullResync(replica, getPsyncInitialOffset()); } } @@ -1037,6 +1037,8 @@ void syncCommand(client *c) { /* ignore SYNC if already replica or in monitor mode */ if (c->flag.replica) return; + initClientReplicationData(c); + /* Wait for any IO pending operation to finish before changing the client state to replica */ waitForClientIO(c); @@ -1089,7 +1091,7 @@ void syncCommand(client *c) { /* Fail sync if replica doesn't support EOF capability but wants a filtered RDB. This is because we force filtered * RDB's to be generated over a socket and not through a file to avoid conflicts with the snapshot files. Forcing * use of a socket is handled, if needed, in `startBgsaveForReplication`. */ - if (c->replica_req & REPLICA_REQ_RDB_MASK && !(c->replica_capa & REPLICA_CAPA_EOF)) { + if (c->repl_data->replica_req & REPLICA_REQ_RDB_MASK && !(c->repl_data->replica_capa & REPLICA_CAPA_EOF)) { addReplyError(c, "Filtered replica requires EOF capability"); return; } @@ -1124,7 +1126,7 @@ void syncCommand(client *c) { * resync on purpose when they are not able to partially * resync. */ if (primary_replid[0] != '?') server.stat_sync_partial_err++; - if (c->replica_capa & REPLICA_CAPA_DUAL_CHANNEL) { + if (c->repl_data->replica_capa & REPLICA_CAPA_DUAL_CHANNEL) { dualChannelServerLog(LL_NOTICE, "Replica %s is capable of dual channel synchronization, and partial sync " "isn't possible. " @@ -1149,9 +1151,9 @@ void syncCommand(client *c) { /* Setup the replica as one waiting for BGSAVE to start. The following code * paths will change the state if we handle the replica differently. */ - c->repl_state = REPLICA_STATE_WAIT_BGSAVE_START; + c->repl_data->repl_state = REPLICA_STATE_WAIT_BGSAVE_START; if (server.repl_disable_tcp_nodelay) connDisableTcpNoDelay(c->conn); /* Non critical if it fails. */ - c->repldbfd = -1; + c->repl_data->repldbfd = -1; c->flag.replica = 1; listAddNodeTail(server.replicas, c); @@ -1183,20 +1185,20 @@ void syncCommand(client *c) { replica = ln->value; /* If the client needs a buffer of commands, we can't use * a replica without replication buffer. 
*/
-        if (replica->repl_state == REPLICA_STATE_WAIT_BGSAVE_END &&
+        if (replica->repl_data->repl_state == REPLICA_STATE_WAIT_BGSAVE_END &&
             (!(replica->flag.repl_rdbonly) || (c->flag.repl_rdbonly)))
             break;
     }
     /* To attach this replica, we check that it has at least all the
      * capabilities of the replica that triggered the current BGSAVE
      * and its exact requirements. */
-    if (ln && ((c->replica_capa & replica->replica_capa) == replica->replica_capa) &&
-        c->replica_req == replica->replica_req) {
+    if (ln && ((c->repl_data->replica_capa & replica->repl_data->replica_capa) == replica->repl_data->replica_capa) &&
+        c->repl_data->replica_req == replica->repl_data->replica_req) {
         /* Perfect, the server is already registering differences for
          * another replica. Set the right state, and copy the buffer.
          * We don't copy buffer if clients don't want. */
         if (!c->flag.repl_rdbonly) copyReplicaOutputBuffer(c, replica);
-        replicationSetupReplicaForFullResync(c, replica->psync_initial_offset);
+        replicationSetupReplicaForFullResync(c, replica->repl_data->psync_initial_offset);
         serverLog(LL_NOTICE, "Waiting for end of BGSAVE for SYNC");
     } else {
         /* No way, we need to wait for the next BGSAVE in order to
@@ -1213,7 +1215,7 @@ void syncCommand(client *c) {
 
     /* CASE 3: There is no BGSAVE is in progress. */
     } else {
-        if (server.repl_diskless_sync && (c->replica_capa & REPLICA_CAPA_EOF) && server.repl_diskless_sync_delay) {
+        if (server.repl_diskless_sync && (c->repl_data->replica_capa & REPLICA_CAPA_EOF) && server.repl_diskless_sync_delay) {
             /* Diskless replication RDB child is created inside
              * replicationCron() since we want to delay its start a
              * few seconds to wait for more replicas to arrive. */
@@ -1222,7 +1224,7 @@ void syncCommand(client *c) {
             /* We don't have a BGSAVE in progress, let's start one. Diskless
              * or disk-based mode is determined by replica's capacity. */
             if (!hasActiveChildProcess()) {
-                startBgsaveForReplication(c->replica_capa, c->replica_req);
+                startBgsaveForReplication(c->repl_data->replica_capa, c->repl_data->replica_req);
             } else {
                 serverLog(LL_NOTICE, "No BGSAVE in progress, but another BG operation is active. "
                                      "BGSAVE for replication delayed");
@@ -1232,6 +1234,72 @@ void syncCommand(client *c) {
     return;
 }
 
+/* Check whether any replica other than 'except_me' is still waiting for the
+ * RDB dump to finish. This is used to judge whether the RDB currently being
+ * dumped can still serve a full synchronization. */
+int anyOtherReplicaWaitRdb(client *except_me) {
+    listIter li;
+    listNode *ln;
+
+    listRewind(server.replicas, &li);
+    while ((ln = listNext(&li))) {
+        client *replica = ln->value;
+        if (replica != except_me && replica->repl_data->repl_state == REPLICA_STATE_WAIT_BGSAVE_END) {
+            return 1;
+        }
+    }
+    return 0;
+}
+
+void initClientReplicationData(client *c) {
+    if (c->repl_data) return;
+    c->repl_data = (ClientReplicationData *)zcalloc(sizeof(ClientReplicationData));
+}
+
+void freeClientReplicationData(client *c) {
+    if (!c->repl_data) return;
+    freeReplicaReferencedReplBuffer(c);
+    /* Primary/replica cleanup Case 1:
+     * we lost the connection with a replica. */
+    if (c->flag.replica) {
+        /* If no other replica is still waiting for the RDB dump to finish,
+         * the current child process does not need to keep dumping, so we kill
+         * it: the child then stops using memory, and we can fork a new child
+         * as soon as possible for the next full synchronization or bgsave.
+         * But we also need to check whether the user has enabled RDB 'save':
+         * if so, we should not kill the child directly, since that RDB matters
+         * for keeping data safe, and we may delay the configured 'save' for the full sync. */
+        if (server.saveparamslen == 0 && c->repl_data->repl_state == REPLICA_STATE_WAIT_BGSAVE_END &&
+            server.child_type == CHILD_TYPE_RDB && server.rdb_child_type == RDB_CHILD_TYPE_DISK &&
+            anyOtherReplicaWaitRdb(c) == 0) {
+            serverLog(LL_NOTICE, "Background saving, persistence disabled, last replica dropped, killing fork child.");
+            killRDBChild();
+        }
+        if (c->repl_data->repl_state == REPLICA_STATE_SEND_BULK) {
+            if (c->repl_data->repldbfd != -1) close(c->repl_data->repldbfd);
+            if (c->repl_data->replpreamble) sdsfree(c->repl_data->replpreamble);
+        }
+        list *l = (c->flag.monitor) ? server.monitors : server.replicas;
+        listNode *ln = listSearchKey(l, c);
+        serverAssert(ln != NULL);
+        listDelNode(l, ln);
+        /* We need to remember the time when we started to have zero
+         * attached replicas, as after some time we'll free the replication
+         * backlog. */
+        if (getClientType(c) == CLIENT_TYPE_REPLICA && listLength(server.replicas) == 0)
+            server.repl_no_replicas_since = server.unixtime;
+        refreshGoodReplicasCount();
+        /* Fire the replica change modules event. */
+        if (c->repl_data->repl_state == REPLICA_STATE_ONLINE)
+            moduleFireServerEvent(VALKEYMODULE_EVENT_REPLICA_CHANGE, VALKEYMODULE_SUBEVENT_REPLICA_CHANGE_OFFLINE,
+                                  NULL);
+    }
+    if (c->flag.primary) replicationHandlePrimaryDisconnection();
+    sdsfree(c->repl_data->replica_addr);
+    zfree(c->repl_data);
+    c->repl_data = NULL;
+}
+
 /* REPLCONF