From a8176a10d3421f097251f7d46053b233164954a3 Mon Sep 17 00:00:00 2001 From: Nitai Caro Date: Mon, 16 Dec 2024 13:03:30 +0000 Subject: [PATCH 01/11] Refactor syncWithPrimary - on error sdsfree in `error` label Signed-off-by: Nitai Caro --- src/replication.c | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/replication.c b/src/replication.c index b5ce77f5e0..67989e5a97 100644 --- a/src/replication.c +++ b/src/replication.c @@ -3464,7 +3464,6 @@ void syncWithPrimary(connection *conn) { if (err[0] != '+' && strncmp(err, "-NOAUTH", 7) != 0 && strncmp(err, "-NOPERM", 7) != 0 && strncmp(err, "-ERR operation not permitted", 28) != 0) { serverLog(LL_WARNING, "Error reply to PING from primary: '%s'", err); - sdsfree(err); goto error; } else { serverLog(LL_NOTICE, "Primary replied to PING, replication can continue..."); @@ -3537,7 +3536,6 @@ void syncWithPrimary(connection *conn) { if (err == NULL) goto no_response_error; if (err[0] == '-') { serverLog(LL_WARNING, "Unable to AUTH to PRIMARY: %s", err); - sdsfree(err); goto error; } sdsfree(err); @@ -3759,11 +3757,11 @@ void syncWithPrimary(connection *conn) { server.repl_transfer_tmpfile = NULL; server.repl_transfer_fd = -1; server.repl_state = REPL_STATE_CONNECT; + if (err) sdsfree(err); return; write_error: /* Handle sendCommand() errors. */ serverLog(LL_WARNING, "Sending command to primary in replication handshake: %s", err); - sdsfree(err); goto error; } From f33d06bb8133f3d8a6f22a971edb288771652cf2 Mon Sep 17 00:00:00 2001 From: Nitai Caro Date: Sun, 22 Dec 2024 10:50:21 +0000 Subject: [PATCH 02/11] Refactor syncWithPrimary success cases to `ok` label Signed-off-by: Nitai Caro --- src/replication.c | 32 ++++++++++++++------------------ 1 file changed, 14 insertions(+), 18 deletions(-) diff --git a/src/replication.c b/src/replication.c index 67989e5a97..c490e2718e 100644 --- a/src/replication.c +++ b/src/replication.c @@ -3446,7 +3446,7 @@ void syncWithPrimary(connection *conn) { * that will take care about this. */ err = sendCommand(conn, "PING", NULL); if (err) goto write_error; - return; + goto ok; } /* Receive the PONG command. */ @@ -3524,7 +3524,7 @@ void syncWithPrimary(connection *conn) { if (err) goto write_error; server.repl_state = REPL_STATE_RECEIVE_AUTH_REPLY; - return; + goto ok; } if (server.repl_state == REPL_STATE_RECEIVE_AUTH_REPLY && !server.primary_auth) @@ -3538,10 +3538,8 @@ void syncWithPrimary(connection *conn) { serverLog(LL_WARNING, "Unable to AUTH to PRIMARY: %s", err); goto error; } - sdsfree(err); - err = NULL; server.repl_state = REPL_STATE_RECEIVE_PORT_REPLY; - return; + goto ok; } /* Receive REPLCONF listening-port reply. */ @@ -3556,9 +3554,8 @@ void syncWithPrimary(connection *conn) { "REPLCONF listening-port: %s", err); } - sdsfree(err); server.repl_state = REPL_STATE_RECEIVE_IP_REPLY; - return; + goto ok; } if (server.repl_state == REPL_STATE_RECEIVE_IP_REPLY && !server.replica_announce_ip) @@ -3576,9 +3573,8 @@ void syncWithPrimary(connection *conn) { "REPLCONF ip-address: %s", err); } - sdsfree(err); server.repl_state = REPL_STATE_RECEIVE_CAPA_REPLY; - return; + goto ok; } /* Receive CAPA reply. */ @@ -3593,10 +3589,8 @@ void syncWithPrimary(connection *conn) { "REPLCONF capa: %s", err); } - sdsfree(err); - err = NULL; server.repl_state = REPL_STATE_RECEIVE_VERSION_REPLY; - return; + goto ok; } /* Receive VERSION reply. */ @@ -3610,8 +3604,6 @@ void syncWithPrimary(connection *conn) { "REPLCONF VERSION: %s", err); } - sdsfree(err); - err = NULL; server.repl_state = REPL_STATE_SEND_PSYNC; } @@ -3627,7 +3619,7 @@ void syncWithPrimary(connection *conn) { goto write_error; } server.repl_state = REPL_STATE_RECEIVE_PSYNC_REPLY; - return; + goto ok; } /* If reached this point, we should be in REPL_STATE_RECEIVE_PSYNC_REPLY. */ @@ -3669,7 +3661,7 @@ void syncWithPrimary(connection *conn) { serverCommunicateSystemd("STATUS=PRIMARY <-> REPLICA sync: Partial Resynchronization accepted. Ready to " "accept connections in read-write mode.\n"); } - return; + goto ok; } /* Fall back to SYNC if needed. Otherwise psync_result == PSYNC_FULLRESYNC @@ -3724,7 +3716,7 @@ void syncWithPrimary(connection *conn) { goto error; } server.repl_rdb_channel_state = REPL_DUAL_CHANNEL_SEND_HANDSHAKE; - return; + goto ok; } /* Setup the non blocking download of the bulk file. */ if (connSetReadHandler(conn, readSyncBulkPayload) == C_ERR) { @@ -3739,7 +3731,7 @@ void syncWithPrimary(connection *conn) { server.repl_transfer_read = 0; server.repl_transfer_last_fsync_off = 0; server.repl_transfer_lastio = server.unixtime; - return; + goto ok; no_response_error: /* Handle receiveSynchronousResponse() error when primary has no reply */ serverLog(LL_WARNING, "Primary did not respond to command during SYNC handshake"); @@ -3763,6 +3755,10 @@ void syncWithPrimary(connection *conn) { write_error: /* Handle sendCommand() errors. */ serverLog(LL_WARNING, "Sending command to primary in replication handshake: %s", err); goto error; + +ok: + if (err) sdsfree(err); + return; } int connectWithPrimary(void) { From fcf2b359a47c53285497ae2e74d77343468c3c98 Mon Sep 17 00:00:00 2001 From: Nitai Caro Date: Sun, 22 Dec 2024 11:36:04 +0000 Subject: [PATCH 03/11] Refactor syncWithPrimary if-elses of the replication state to switch-case Signed-off-by: Nitai Caro --- src/replication.c | 383 ++++++++++++++++++++++------------------------ 1 file changed, 184 insertions(+), 199 deletions(-) diff --git a/src/replication.c b/src/replication.c index c490e2718e..ccd678757e 100644 --- a/src/replication.c +++ b/src/replication.c @@ -3433,210 +3433,195 @@ void syncWithPrimary(connection *conn) { serverLog(LL_WARNING, "Error condition on socket for SYNC: %s", connGetLastError(conn)); goto error; } - - /* Send a PING to check the primary is able to reply without errors. */ - if (server.repl_state == REPL_STATE_CONNECTING) { - serverLog(LL_NOTICE, "Non blocking connect for SYNC fired the event."); - /* Delete the writable event so that the readable event remains - * registered and we can wait for the PONG reply. */ - connSetReadHandler(conn, syncWithPrimary); - connSetWriteHandler(conn, NULL); - server.repl_state = REPL_STATE_RECEIVE_PING_REPLY; - /* Send the PING, don't check for errors at all, we have the timeout - * that will take care about this. */ - err = sendCommand(conn, "PING", NULL); - if (err) goto write_error; - goto ok; - } - - /* Receive the PONG command. */ - if (server.repl_state == REPL_STATE_RECEIVE_PING_REPLY) { - err = receiveSynchronousResponse(conn); - - /* The primary did not reply */ - if (err == NULL) goto no_response_error; + switch (server.repl_state) { + /* Send a PING to check the primary is able to reply without errors. */ + case REPL_STATE_CONNECTING: + serverLog(LL_NOTICE, "Non blocking connect for SYNC fired the event."); + /* Delete the writable event so that the readable event remains + * registered and we can wait for the PONG reply. */ + connSetReadHandler(conn, syncWithPrimary); + connSetWriteHandler(conn, NULL); + server.repl_state = REPL_STATE_RECEIVE_PING_REPLY; + /* Send the PING, don't check for errors at all, we have the timeout + * that will take care about this. */ + err = sendCommand(conn, "PING", NULL); + if (err) goto write_error; + goto ok; + /* Receive the PONG command. */ + case REPL_STATE_RECEIVE_PING_REPLY: + err = receiveSynchronousResponse(conn); + + /* The primary did not reply */ + if (err == NULL) goto no_response_error; /* We accept only two replies as valid, a positive +PONG reply * (we just check for "+") or an authentication error. * Note that older versions of Redis OSS replied with "operation not * permitted" instead of using a proper error code, so we test * both. */ - if (err[0] != '+' && strncmp(err, "-NOAUTH", 7) != 0 && strncmp(err, "-NOPERM", 7) != 0 && - strncmp(err, "-ERR operation not permitted", 28) != 0) { - serverLog(LL_WARNING, "Error reply to PING from primary: '%s'", err); - goto error; - } else { - serverLog(LL_NOTICE, "Primary replied to PING, replication can continue..."); - } - sdsfree(err); - err = NULL; - server.repl_state = REPL_STATE_SEND_HANDSHAKE; - } - - if (server.repl_state == REPL_STATE_SEND_HANDSHAKE) { - /* AUTH with the primary if required. */ - if (server.primary_auth) { - char *args[3] = {"AUTH", NULL, NULL}; - size_t lens[3] = {4, 0, 0}; - int argc = 1; - if (server.primary_user) { - args[argc] = server.primary_user; - lens[argc] = strlen(server.primary_user); + if (err[0] != '+' && strncmp(err, "-NOAUTH", 7) != 0 && strncmp(err, "-NOPERM", 7) != 0 && + strncmp(err, "-ERR operation not permitted", 28) != 0) { + serverLog(LL_WARNING, "Error reply to PING from primary: '%s'", err); + goto error; + } else { + serverLog(LL_NOTICE, "Primary replied to PING, replication can continue..."); + } + sdsfree(err); + err = NULL; + server.repl_state = REPL_STATE_SEND_HANDSHAKE; + case REPL_STATE_SEND_HANDSHAKE: + /* AUTH with the primary if required. */ + if (server.primary_auth) { + char *args[3] = {"AUTH", NULL, NULL}; + size_t lens[3] = {4, 0, 0}; + int argc = 1; + if (server.primary_user) { + args[argc] = server.primary_user; + lens[argc] = strlen(server.primary_user); + argc++; + } + args[argc] = server.primary_auth; + lens[argc] = sdslen(server.primary_auth); argc++; + err = sendCommandArgv(conn, argc, args, lens); + if (err) goto write_error; } - args[argc] = server.primary_auth; - lens[argc] = sdslen(server.primary_auth); - argc++; - err = sendCommandArgv(conn, argc, args, lens); - if (err) goto write_error; - } - - /* Set the replica port, so that primary's INFO command can list the - * replica listening port correctly. */ - { - sds portstr = getReplicaPortString(); - err = sendCommand(conn, "REPLCONF", "listening-port", portstr, NULL); - sdsfree(portstr); - if (err) goto write_error; - } - - /* Set the replica ip, so that primary's INFO command can list the - * replica IP address port correctly in case of port forwarding or NAT. - * Skip REPLCONF ip-address if there is no replica-announce-ip option set. */ - if (server.replica_announce_ip) { - err = sendCommand(conn, "REPLCONF", "ip-address", server.replica_announce_ip, NULL); - if (err) goto write_error; - } - - /* Inform the primary of our (replica) capabilities. - * - * EOF: supports EOF-style RDB transfer for diskless replication. - * PSYNC2: supports PSYNC v2, so understands +CONTINUE . - * - * The primary will ignore capabilities it does not understand. */ - err = sendCommand(conn, "REPLCONF", "capa", "eof", "capa", "psync2", - server.dual_channel_replication ? "capa" : NULL, - server.dual_channel_replication ? "dual-channel" : NULL, NULL); - if (err) goto write_error; - - /* Inform the primary of our (replica) version. */ - err = sendCommand(conn, "REPLCONF", "version", VALKEY_VERSION, NULL); - if (err) goto write_error; - - server.repl_state = REPL_STATE_RECEIVE_AUTH_REPLY; - goto ok; - } - - if (server.repl_state == REPL_STATE_RECEIVE_AUTH_REPLY && !server.primary_auth) - server.repl_state = REPL_STATE_RECEIVE_PORT_REPLY; - - /* Receive AUTH reply. */ - if (server.repl_state == REPL_STATE_RECEIVE_AUTH_REPLY) { - err = receiveSynchronousResponse(conn); - if (err == NULL) goto no_response_error; - if (err[0] == '-') { - serverLog(LL_WARNING, "Unable to AUTH to PRIMARY: %s", err); - goto error; - } - server.repl_state = REPL_STATE_RECEIVE_PORT_REPLY; - goto ok; - } - /* Receive REPLCONF listening-port reply. */ - if (server.repl_state == REPL_STATE_RECEIVE_PORT_REPLY) { - err = receiveSynchronousResponse(conn); - if (err == NULL) goto no_response_error; - /* Ignore the error if any, not all the Redis OSS versions support - * REPLCONF listening-port. */ - if (err[0] == '-') { - serverLog(LL_NOTICE, - "(Non critical) Primary does not understand " - "REPLCONF listening-port: %s", - err); - } - server.repl_state = REPL_STATE_RECEIVE_IP_REPLY; - goto ok; - } - - if (server.repl_state == REPL_STATE_RECEIVE_IP_REPLY && !server.replica_announce_ip) - server.repl_state = REPL_STATE_RECEIVE_CAPA_REPLY; + /* Set the replica port, so that primary's INFO command can list the + * replica listening port correctly. */ + { + sds portstr = getReplicaPortString(); + err = sendCommand(conn, "REPLCONF", "listening-port", portstr, NULL); + sdsfree(portstr); + if (err) goto write_error; + } - /* Receive REPLCONF ip-address reply. */ - if (server.repl_state == REPL_STATE_RECEIVE_IP_REPLY) { - err = receiveSynchronousResponse(conn); - if (err == NULL) goto no_response_error; - /* Ignore the error if any, not all the Redis OSS versions support - * REPLCONF ip-address. */ - if (err[0] == '-') { - serverLog(LL_NOTICE, - "(Non critical) Primary does not understand " - "REPLCONF ip-address: %s", - err); - } - server.repl_state = REPL_STATE_RECEIVE_CAPA_REPLY; - goto ok; - } + /* Set the replica ip, so that primary's INFO command can list the + * replica IP address port correctly in case of port forwarding or NAT. + * Skip REPLCONF ip-address if there is no replica-announce-ip option set. */ + if (server.replica_announce_ip) { + err = sendCommand(conn, "REPLCONF", "ip-address", server.replica_announce_ip, NULL); + if (err) goto write_error; + } - /* Receive CAPA reply. */ - if (server.repl_state == REPL_STATE_RECEIVE_CAPA_REPLY) { - err = receiveSynchronousResponse(conn); - if (err == NULL) goto no_response_error; - /* Ignore the error if any, not all the Redis OSS versions support - * REPLCONF capa. */ - if (err[0] == '-') { - serverLog(LL_NOTICE, - "(Non critical) Primary does not understand " - "REPLCONF capa: %s", - err); - } - server.repl_state = REPL_STATE_RECEIVE_VERSION_REPLY; - goto ok; - } + /* Inform the primary of our (replica) capabilities. + * + * EOF: supports EOF-style RDB transfer for diskless replication. + * PSYNC2: supports PSYNC v2, so understands +CONTINUE . + * + * The primary will ignore capabilities it does not understand. */ + err = sendCommand(conn, "REPLCONF", "capa", "eof", "capa", "psync2", + server.dual_channel_replication ? "capa" : NULL, + server.dual_channel_replication ? "dual-channel" : NULL, NULL); + if (err) goto write_error; - /* Receive VERSION reply. */ - if (server.repl_state == REPL_STATE_RECEIVE_VERSION_REPLY) { - err = receiveSynchronousResponse(conn); - if (err == NULL) goto no_response_error; - /* Ignore the error if any. Valkey >= 8 supports REPLCONF VERSION. */ - if (err[0] == '-') { - serverLog(LL_NOTICE, - "(Non critical) Primary does not understand " - "REPLCONF VERSION: %s", - err); - } - server.repl_state = REPL_STATE_SEND_PSYNC; - } - - /* Try a partial resynchronization. If we don't have a cached primary - * replicaTryPartialResynchronization() will at least try to use PSYNC - * to start a full resynchronization so that we get the primary replid - * and the global offset, to try a partial resync at the next - * reconnection attempt. */ - if (server.repl_state == REPL_STATE_SEND_PSYNC) { - if (replicaTryPartialResynchronization(conn, 0) == PSYNC_WRITE_ERROR) { - err = sdsnew("Write error sending the PSYNC command."); - abortFailover("Write error to failover target"); - goto write_error; - } - server.repl_state = REPL_STATE_RECEIVE_PSYNC_REPLY; - goto ok; - } + /* Inform the primary of our (replica) version. */ + err = sendCommand(conn, "REPLCONF", "version", VALKEY_VERSION, NULL); + if (err) goto write_error; - /* If reached this point, we should be in REPL_STATE_RECEIVE_PSYNC_REPLY. */ - if (server.repl_state != REPL_STATE_RECEIVE_PSYNC_REPLY) { - serverLog(LL_WARNING, - "syncWithPrimary(): state machine error, " - "state should be RECEIVE_PSYNC but is %d", - server.repl_state); - goto error; + server.repl_state = REPL_STATE_RECEIVE_AUTH_REPLY; + goto ok; + /* Receive AUTH reply. */ + case REPL_STATE_RECEIVE_AUTH_REPLY: + if (server.primary_auth) { + err = receiveSynchronousResponse(conn); + if (err == NULL) goto no_response_error; + if (err[0] == '-') { + serverLog(LL_WARNING, "Unable to AUTH to PRIMARY: %s", err); + goto error; + } + server.repl_state = REPL_STATE_RECEIVE_PORT_REPLY; + goto ok; + } + server.repl_state = REPL_STATE_RECEIVE_PORT_REPLY; + /* Receive REPLCONF listening-port reply. */ + case REPL_STATE_RECEIVE_PORT_REPLY: + err = receiveSynchronousResponse(conn); + if (err == NULL) goto no_response_error; + /* Ignore the error if any, not all the Redis OSS versions support + * REPLCONF listening-port. */ + if (err[0] == '-') { + serverLog(LL_NOTICE, + "(Non critical) Primary does not understand " + "REPLCONF listening-port: %s", + err); + } + server.repl_state = REPL_STATE_RECEIVE_IP_REPLY; + goto ok; + /* Receive REPLCONF ip-address reply. */ + case REPL_STATE_RECEIVE_IP_REPLY: + if (server.replica_announce_ip) { + err = receiveSynchronousResponse(conn); + if (err == NULL) goto no_response_error; + /* Ignore the error if any, not all the Redis OSS versions support + * REPLCONF ip-address. */ + if (err[0] == '-') { + serverLog(LL_NOTICE, + "(Non critical) Primary does not understand " + "REPLCONF ip-address: %s", + err); + } + server.repl_state = REPL_STATE_RECEIVE_CAPA_REPLY; + goto ok; + } + server.repl_state = REPL_STATE_RECEIVE_CAPA_REPLY; + /* Receive CAPA reply. */ + case REPL_STATE_RECEIVE_CAPA_REPLY: + err = receiveSynchronousResponse(conn); + if (err == NULL) goto no_response_error; + /* Ignore the error if any, not all the Redis OSS versions support + * REPLCONF capa. */ + if (err[0] == '-') { + serverLog(LL_NOTICE, + "(Non critical) Primary does not understand " + "REPLCONF capa: %s", + err); + } + server.repl_state = REPL_STATE_RECEIVE_VERSION_REPLY; + goto ok; + /* Receive VERSION reply. */ + case REPL_STATE_RECEIVE_VERSION_REPLY: + err = receiveSynchronousResponse(conn); + if (err == NULL) goto no_response_error; + /* Ignore the error if any. Valkey >= 8 supports REPLCONF VERSION. */ + if (err[0] == '-') { + serverLog(LL_NOTICE, + "(Non critical) Primary does not understand " + "REPLCONF VERSION: %s", + err); + } + server.repl_state = REPL_STATE_SEND_PSYNC; + /* Try a partial resynchronization. If we don't have a cached primary + * replicaTryPartialResynchronization() will at least try to use PSYNC + * to start a full resynchronization so that we get the primary replid + * and the global offset, to try a partial resync at the next + * reconnection attempt. */ + case REPL_STATE_SEND_PSYNC: + if (replicaTryPartialResynchronization(conn, 0) == PSYNC_WRITE_ERROR) { + err = sdsnew("Write error sending the PSYNC command."); + abortFailover("Write error to failover target"); + goto write_error; + } + server.repl_state = REPL_STATE_RECEIVE_PSYNC_REPLY; + goto ok; + + /* If reached this point, we should be in REPL_STATE_RECEIVE_PSYNC_REPLY. */ + default: + if (server.repl_state != REPL_STATE_RECEIVE_PSYNC_REPLY) { + serverLog(LL_WARNING, + "syncWithPrimary(): state machine error, " + "state should be RECEIVE_PSYNC but is %d", + server.repl_state); + goto error; + } } - + psync_result = replicaTryPartialResynchronization(conn, 1); if (psync_result == PSYNC_WAIT_REPLY) return; /* Try again later... */ /* Check the status of the planned failover. We expect PSYNC_CONTINUE, - * but there is nothing technically wrong with a full resync which - * could happen in edge cases. */ + * but there is nothing technically wrong with a full resync which + * could happen in edge cases. */ if (server.failover_state == FAILOVER_IN_PROGRESS) { if (psync_result == PSYNC_CONTINUE || psync_result == PSYNC_FULLRESYNC) { clearFailoverState(); @@ -3647,26 +3632,26 @@ void syncWithPrimary(connection *conn) { } /* If the primary is in an transient error, we should try to PSYNC - * from scratch later, so go to the error path. This happens when - * the server is loading the dataset or is not connected with its - * primary and so forth. */ + * from scratch later, so go to the error path. This happens when + * the server is loading the dataset or is not connected with its + * primary and so forth. */ if (psync_result == PSYNC_TRY_LATER) goto error; - + /* Note: if PSYNC does not return WAIT_REPLY, it will take care of - * uninstalling the read handler from the file descriptor. */ + * uninstalling the read handler from the file descriptor. */ if (psync_result == PSYNC_CONTINUE) { serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Primary accepted a Partial Resynchronization."); if (server.supervised_mode == SUPERVISED_SYSTEMD) { serverCommunicateSystemd("STATUS=PRIMARY <-> REPLICA sync: Partial Resynchronization accepted. Ready to " - "accept connections in read-write mode.\n"); + "accept connections in read-write mode.\n"); } goto ok; } /* Fall back to SYNC if needed. Otherwise psync_result == PSYNC_FULLRESYNC - * and the server.primary_replid and primary_initial_offset are - * already populated. */ + * and the server.primary_replid and primary_initial_offset are + * already populated. */ if (psync_result == PSYNC_NOT_SUPPORTED) { serverLog(LL_NOTICE, "Retrying with SYNC..."); if (connSyncWrite(conn, "SYNC\r\n", 6, server.repl_syncio_timeout * 1000) == -1) { @@ -3683,14 +3668,14 @@ void syncWithPrimary(connection *conn) { dfd = open(tmpfile, O_CREAT | O_WRONLY | O_EXCL, 0644); if (dfd != -1) break; /* We save the errno of open to prevent some systems from modifying it after - * the sleep call. For example, sleep in Mac will change errno to ETIMEDOUT. */ + * the sleep call. For example, sleep in Mac will change errno to ETIMEDOUT. */ int saved_errno = errno; sleep(1); errno = saved_errno; } if (dfd == -1) { serverLog(LL_WARNING, "Opening the temp file needed for PRIMARY <-> REPLICA synchronization: %s", - strerror(errno)); + strerror(errno)); goto error; } server.repl_transfer_tmpfile = zstrdup(tmpfile); @@ -3698,7 +3683,7 @@ void syncWithPrimary(connection *conn) { } /* Using dual-channel-replication, the primary responded +DUALCHANNELSYNC. We need to - * initialize the RDB channel. */ + * initialize the RDB channel. */ if (psync_result == PSYNC_FULLRESYNC_DUAL_CHANNEL) { /* Create RDB connection */ server.repl_rdb_transfer_s = connCreate(connTypeOfReplication()); @@ -3712,7 +3697,7 @@ void syncWithPrimary(connection *conn) { if (connSetReadHandler(conn, NULL) == C_ERR) { char conninfo[CONN_INFO_LEN]; dualChannelServerLog(LL_WARNING, "Can't clear main connection handler: %s (%s)", strerror(errno), - connGetInfo(conn, conninfo, sizeof(conninfo))); + connGetInfo(conn, conninfo, sizeof(conninfo))); goto error; } server.repl_rdb_channel_state = REPL_DUAL_CHANNEL_SEND_HANDSHAKE; @@ -3722,7 +3707,7 @@ void syncWithPrimary(connection *conn) { if (connSetReadHandler(conn, readSyncBulkPayload) == C_ERR) { char conninfo[CONN_INFO_LEN]; serverLog(LL_WARNING, "Can't create readable event for SYNC: %s (%s)", strerror(errno), - connGetInfo(conn, conninfo, sizeof(conninfo))); + connGetInfo(conn, conninfo, sizeof(conninfo))); goto error; } From 2ff8a00bf5b00c8e2e6c1c91bec845ba52258514 Mon Sep 17 00:00:00 2001 From: Nitai Caro Date: Sun, 22 Dec 2024 12:53:01 +0000 Subject: [PATCH 04/11] Refactor syncWithPrimary - add fall through comments Signed-off-by: Nitai Caro --- src/replication.c | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/replication.c b/src/replication.c index ccd678757e..58c176bafc 100644 --- a/src/replication.c +++ b/src/replication.c @@ -3469,6 +3469,7 @@ void syncWithPrimary(connection *conn) { sdsfree(err); err = NULL; server.repl_state = REPL_STATE_SEND_HANDSHAKE; + /* fall through */ case REPL_STATE_SEND_HANDSHAKE: /* AUTH with the primary if required. */ if (server.primary_auth) { @@ -3534,6 +3535,7 @@ void syncWithPrimary(connection *conn) { goto ok; } server.repl_state = REPL_STATE_RECEIVE_PORT_REPLY; + /* fall through */ /* Receive REPLCONF listening-port reply. */ case REPL_STATE_RECEIVE_PORT_REPLY: err = receiveSynchronousResponse(conn); @@ -3565,6 +3567,7 @@ void syncWithPrimary(connection *conn) { goto ok; } server.repl_state = REPL_STATE_RECEIVE_CAPA_REPLY; + /* fall through */ /* Receive CAPA reply. */ case REPL_STATE_RECEIVE_CAPA_REPLY: err = receiveSynchronousResponse(conn); @@ -3591,6 +3594,7 @@ void syncWithPrimary(connection *conn) { err); } server.repl_state = REPL_STATE_SEND_PSYNC; + /* fall through */ /* Try a partial resynchronization. If we don't have a cached primary * replicaTryPartialResynchronization() will at least try to use PSYNC * to start a full resynchronization so that we get the primary replid From 3b04925f667a015445dfc31f5e58da396db6e034 Mon Sep 17 00:00:00 2001 From: Nitai Caro Date: Sun, 22 Dec 2024 12:10:51 +0000 Subject: [PATCH 05/11] Refactor replicaTryPartialResynchronization into replicaSendPsyncCommand and replicaProcessPsyncReply Signed-off-by: Nitai Caro --- src/replication.c | 93 ++++++++++++++++++++++++----------------------- 1 file changed, 47 insertions(+), 46 deletions(-) diff --git a/src/replication.c b/src/replication.c index 58c176bafc..9f771e88d0 100644 --- a/src/replication.c +++ b/src/replication.c @@ -3051,54 +3051,55 @@ void dualChannelSyncHandleRdbLoadCompletion(void) { #define PSYNC_NOT_SUPPORTED 4 #define PSYNC_TRY_LATER 5 #define PSYNC_FULLRESYNC_DUAL_CHANNEL 6 -int replicaTryPartialResynchronization(connection *conn, int read_reply) { +int replicaSendPsyncCommand(connection *conn) { char *psync_replid; char psync_offset[32]; sds reply; - /* Writing half */ - if (!read_reply) { - /* Initially set primary_initial_offset to -1 to mark the current - * primary replid and offset as not valid. Later if we'll be able to do - * a FULL resync using the PSYNC command we'll set the offset at the - * right value, so that this information will be propagated to the - * client structure representing the primary into server.primary. */ - server.primary_initial_offset = -1; + /* Initially set primary_initial_offset to -1 to mark the current + * primary replid and offset as not valid. Later if we'll be able to do + * a FULL resync using the PSYNC command we'll set the offset at the + * right value, so that this information will be propagated to the + * client structure representing the primary into server.primary. */ + server.primary_initial_offset = -1; - if (server.repl_rdb_channel_state != REPL_DUAL_CHANNEL_STATE_NONE) { - /* While in dual channel replication, we should use our prepared repl id and offset. */ - psync_replid = server.repl_provisional_primary.replid; - snprintf(psync_offset, sizeof(psync_offset), "%lld", server.repl_provisional_primary.reploff + 1); - dualChannelServerLog(LL_NOTICE, - "Trying a partial resynchronization using main channel (request %s:%s).", - psync_replid, psync_offset); - } else if (server.cached_primary) { - psync_replid = server.cached_primary->replid; - snprintf(psync_offset, sizeof(psync_offset), "%lld", server.cached_primary->reploff + 1); - serverLog(LL_NOTICE, "Trying a partial resynchronization (request %s:%s).", psync_replid, psync_offset); - } else { - serverLog(LL_NOTICE, "Partial resynchronization not possible (no cached primary)"); - psync_replid = "?"; - memcpy(psync_offset, "-1", 3); - } + if (server.repl_rdb_channel_state != REPL_DUAL_CHANNEL_STATE_NONE) { + /* While in dual channel replication, we should use our prepared repl id and offset. */ + psync_replid = server.repl_provisional_primary.replid; + snprintf(psync_offset, sizeof(psync_offset), "%lld", server.repl_provisional_primary.reploff + 1); + dualChannelServerLog(LL_NOTICE, + "Trying a partial resynchronization using main channel (request %s:%s).", + psync_replid, psync_offset); + } else if (server.cached_primary) { + psync_replid = server.cached_primary->replid; + snprintf(psync_offset, sizeof(psync_offset), "%lld", server.cached_primary->reploff + 1); + serverLog(LL_NOTICE, "Trying a partial resynchronization (request %s:%s).", psync_replid, psync_offset); + } else { + serverLog(LL_NOTICE, "Partial resynchronization not possible (no cached primary)"); + psync_replid = "?"; + memcpy(psync_offset, "-1", 3); + } - /* Issue the PSYNC command, if this is a primary with a failover in - * progress then send the failover argument to the replica to cause it - * to become a primary */ - if (server.failover_state == FAILOVER_IN_PROGRESS) { - reply = sendCommand(conn, "PSYNC", psync_replid, psync_offset, "FAILOVER", NULL); - } else { - reply = sendCommand(conn, "PSYNC", psync_replid, psync_offset, NULL); - } + /* Issue the PSYNC command, if this is a primary with a failover in + * progress then send the failover argument to the replica to cause it + * to become a primary */ + if (server.failover_state == FAILOVER_IN_PROGRESS) { + reply = sendCommand(conn, "PSYNC", psync_replid, psync_offset, "FAILOVER", NULL); + } else { + reply = sendCommand(conn, "PSYNC", psync_replid, psync_offset, NULL); + } - if (reply != NULL) { - serverLog(LL_WARNING, "Unable to send PSYNC to primary: %s", reply); - sdsfree(reply); - connSetReadHandler(conn, NULL); - return PSYNC_WRITE_ERROR; - } - return PSYNC_WAIT_REPLY; + if (reply != NULL) { + serverLog(LL_WARNING, "Unable to send PSYNC to primary: %s", reply); + sdsfree(reply); + connSetReadHandler(conn, NULL); + return PSYNC_WRITE_ERROR; } + return PSYNC_WAIT_REPLY; +} + +int replicaProcessPsyncReply(connection *conn) { + sds reply; /* Reading half */ reply = receiveSynchronousResponse(conn); @@ -3270,7 +3271,7 @@ int dualChannelReplMainConnRecvCapaReply(connection *conn, sds *err) { int dualChannelReplMainConnSendPsync(connection *conn, sds *err) { if (server.debug_pause_after_fork) debugPauseProcess(); - if (replicaTryPartialResynchronization(conn, 0) == PSYNC_WRITE_ERROR) { + if (replicaSendPsyncCommand(conn) == PSYNC_WRITE_ERROR) { dualChannelServerLog(LL_WARNING, "Aborting dual channel sync. Write error."); *err = sdsnew(connGetLastError(conn)); return C_ERR; @@ -3279,7 +3280,7 @@ int dualChannelReplMainConnSendPsync(connection *conn, sds *err) { } int dualChannelReplMainConnRecvPsyncReply(connection *conn, sds *err) { - int psync_result = replicaTryPartialResynchronization(conn, 1); + int psync_result = replicaProcessPsyncReply(conn); if (psync_result == PSYNC_WAIT_REPLY) return C_OK; /* Try again later... */ if (psync_result == PSYNC_CONTINUE) { @@ -3596,12 +3597,12 @@ void syncWithPrimary(connection *conn) { server.repl_state = REPL_STATE_SEND_PSYNC; /* fall through */ /* Try a partial resynchronization. If we don't have a cached primary - * replicaTryPartialResynchronization() will at least try to use PSYNC + * replicaSendPsyncCommand() will at least try to use PSYNC * to start a full resynchronization so that we get the primary replid * and the global offset, to try a partial resync at the next * reconnection attempt. */ - case REPL_STATE_SEND_PSYNC: - if (replicaTryPartialResynchronization(conn, 0) == PSYNC_WRITE_ERROR) { + case REPL_STATE_SEND_PSYNC: + if (replicaSendPsyncCommand(conn) == PSYNC_WRITE_ERROR) { err = sdsnew("Write error sending the PSYNC command."); abortFailover("Write error to failover target"); goto write_error; @@ -3620,7 +3621,7 @@ void syncWithPrimary(connection *conn) { } } - psync_result = replicaTryPartialResynchronization(conn, 1); + psync_result = replicaProcessPsyncReply(conn); if (psync_result == PSYNC_WAIT_REPLY) return; /* Try again later... */ /* Check the status of the planned failover. We expect PSYNC_CONTINUE, From 4f0cb2724f7b623de28c04c626255b666281506a Mon Sep 17 00:00:00 2001 From: Nitai Caro Date: Mon, 23 Dec 2024 09:44:49 +0000 Subject: [PATCH 06/11] Refactor syncWithPrimary - move state machine logic into helper functions Signed-off-by: Nitai Caro --- src/replication.c | 322 +++++++++++++++++++++++++++------------------- 1 file changed, 190 insertions(+), 132 deletions(-) diff --git a/src/replication.c b/src/replication.c index 9f771e88d0..b52e098bec 100644 --- a/src/replication.c +++ b/src/replication.c @@ -56,6 +56,7 @@ void replicationSteadyStateInit(void); void dualChannelSetupMainConnForPsync(connection *conn); void dualChannelSyncHandleRdbLoadCompletion(void); static void dualChannelFullSyncWithPrimary(connection *conn); +void syncWithPrimary(connection *conn); /* We take a global flag to remember if this instance generated an RDB * because of replication, so that we can remove the RDB file in case @@ -3339,6 +3340,171 @@ void dualChannelSetupMainConnForPsync(connection *conn) { sdsfree(err); } + +int syncWithPrimaryHandleConnectingState(connection *conn, sds *err) { + serverLog(LL_NOTICE, "Non blocking connect for SYNC fired the event."); + /* Delete the writable event so that the readable event remains + * registered and we can wait for the PONG reply. */ + connSetReadHandler(conn, syncWithPrimary); + connSetWriteHandler(conn, NULL); + /* Send the PING, don't check for errors at all, we have the timeout + * that will take care about this. */ + *err = sendCommand(conn, "PING", NULL); + if (*err) return C_ERR; + return C_OK; +} + +int syncWithPrimaryHandleReceivePingReplyState(connection *conn, sds *err) { + *err = receiveSynchronousResponse(conn); + + /* The primary did not reply */ + if (*err == NULL) return C_ERR; + + /* We accept only two replies as valid, a positive +PONG reply + * (we just check for "+") or an authentication error. + * Note that older versions of Redis OSS replied with "operation not + * permitted" instead of using a proper error code, so we test + * both. */ + if (*err[0] != '+' && strncmp(*err, "-NOAUTH", 7) != 0 && strncmp(*err, "-NOPERM", 7) != 0 && + strncmp(*err, "-ERR operation not permitted", 28) != 0) { + serverLog(LL_WARNING, "Error reply to PING from primary: '%s'", *err); + return C_ERR; + } else { + serverLog(LL_NOTICE, "Primary replied to PING, replication can continue..."); + } + return C_OK; +} + +int syncWithPrimaryHandleSendHandshakeState(connection *conn, sds *err) { + /* AUTH with the primary if required. */ + if (server.primary_auth) { + char *args[3] = {"AUTH", NULL, NULL}; + size_t lens[3] = {4, 0, 0}; + int argc = 1; + if (server.primary_user) { + args[argc] = server.primary_user; + lens[argc] = strlen(server.primary_user); + argc++; + } + args[argc] = server.primary_auth; + lens[argc] = sdslen(server.primary_auth); + argc++; + *err = sendCommandArgv(conn, argc, args, lens); + if (*err) return C_ERR; + } + + /* Set the replica port, so that primary's INFO command can list the + * replica listening port correctly. */ + { + sds portstr = getReplicaPortString(); + *err = sendCommand(conn, "REPLCONF", "listening-port", portstr, NULL); + sdsfree(portstr); + if (*err) return C_ERR; + } + + /* Set the replica ip, so that primary's INFO command can list the + * replica IP address port correctly in case of port forwarding or NAT. + * Skip REPLCONF ip-address if there is no replica-announce-ip option set. */ + if (server.replica_announce_ip) { + *err = sendCommand(conn, "REPLCONF", "ip-address", server.replica_announce_ip, NULL); + if (*err) return C_ERR; + } + + /* Inform the primary of our (replica) capabilities. + * + * EOF: supports EOF-style RDB transfer for diskless replication. + * PSYNC2: supports PSYNC v2, so understands +CONTINUE . + * + * The primary will ignore capabilities it does not understand. */ + *err = sendCommand(conn, "REPLCONF", "capa", "eof", "capa", "psync2", + server.dual_channel_replication ? "capa" : NULL, + server.dual_channel_replication ? "dual-channel" : NULL, NULL); + if (*err) return C_ERR; + + /* Inform the primary of our (replica) version. */ + *err = sendCommand(conn, "REPLCONF", "version", VALKEY_VERSION, NULL); + if (*err) return C_ERR; + return C_OK; +} + +int syncWithPrimaryHandleReceiveAuthReplyState(connection *conn, sds *err) { + if (server.primary_auth) { + *err = receiveSynchronousResponse(conn); + if (*err == NULL) return C_ERR; + if (*err[0] == '-') { + serverLog(LL_WARNING, "Unable to AUTH to PRIMARY: %s", *err); + return C_ERR; + } + } + return C_OK; +} + +int syncWithPrimaryHandleReceivePortReplyState(connection *conn, sds *err) { + *err = receiveSynchronousResponse(conn); + if (*err == NULL) return C_ERR; + /* Ignore the error if any, not all the Redis OSS versions support + * REPLCONF listening-port. */ + if (*err[0] == '-') { + serverLog(LL_NOTICE, + "(Non critical) Primary does not understand " + "REPLCONF listening-port: %s", + *err); + } + return C_OK; +} + +int syncWithPrimaryHandleReceiveIPReplyState(connection *conn, sds *err) { + if (server.replica_announce_ip) { + *err = receiveSynchronousResponse(conn); + if (*err == NULL) return C_ERR; + /* Ignore the error if any, not all the Redis OSS versions support + * REPLCONF ip-address. */ + if (*err[0] == '-') { + serverLog(LL_NOTICE, + "(Non critical) Primary does not understand " + "REPLCONF ip-address: %s", + *err); + } + } + return C_OK; +} + +int syncWithPrimaryHandleReceiveCapaReplyState(connection *conn, sds *err) { + *err = receiveSynchronousResponse(conn); + if (*err == NULL) return C_ERR; + /* Ignore the error if any, not all the Redis OSS versions support + * REPLCONF capa. */ + if (*err[0] == '-') { + serverLog(LL_NOTICE, + "(Non critical) Primary does not understand " + "REPLCONF capa: %s", + *err); + } + return C_OK; +} + +int syncWithPrimaryHandleReceiveVersionReplyState(connection *conn, sds *err) { + *err = receiveSynchronousResponse(conn); + if (*err == NULL) return C_ERR; + /* Ignore the error if any. Valkey >= 8 supports REPLCONF VERSION. */ + if (*err[0] == '-') { + serverLog(LL_NOTICE, + "(Non critical) Primary does not understand " + "REPLCONF VERSION: %s", + *err); + } + return C_OK; +} + +int syncWithPrimaryHandleSendPsyncState(connection *conn, sds *err) { + if (replicaSendPsyncCommand(conn) == PSYNC_WRITE_ERROR) { + *err = sdsnew("Write error sending the PSYNC command."); + abortFailover("Write error to failover target"); + return C_ERR; + } + return C_OK; +} + /* * Dual channel for full sync * @@ -3419,7 +3585,7 @@ void dualChannelSetupMainConnForPsync(connection *conn) { * establish a connection with the primary. */ void syncWithPrimary(connection *conn) { char tmpfile[256], *err = NULL; - int psync_result; + int psync_result, ret; /* If this event fired after the user turned the instance into a primary * with REPLICAOF NO ONE we must just return ASAP. */ @@ -3437,163 +3603,59 @@ void syncWithPrimary(connection *conn) { switch (server.repl_state) { /* Send a PING to check the primary is able to reply without errors. */ case REPL_STATE_CONNECTING: - serverLog(LL_NOTICE, "Non blocking connect for SYNC fired the event."); - /* Delete the writable event so that the readable event remains - * registered and we can wait for the PONG reply. */ - connSetReadHandler(conn, syncWithPrimary); - connSetWriteHandler(conn, NULL); + ret = syncWithPrimaryHandleConnectingState(conn, &err); + if (ret == C_ERR) goto write_error; server.repl_state = REPL_STATE_RECEIVE_PING_REPLY; - /* Send the PING, don't check for errors at all, we have the timeout - * that will take care about this. */ - err = sendCommand(conn, "PING", NULL); - if (err) goto write_error; goto ok; /* Receive the PONG command. */ case REPL_STATE_RECEIVE_PING_REPLY: - err = receiveSynchronousResponse(conn); - - /* The primary did not reply */ - if (err == NULL) goto no_response_error; - - /* We accept only two replies as valid, a positive +PONG reply - * (we just check for "+") or an authentication error. - * Note that older versions of Redis OSS replied with "operation not - * permitted" instead of using a proper error code, so we test - * both. */ - if (err[0] != '+' && strncmp(err, "-NOAUTH", 7) != 0 && strncmp(err, "-NOPERM", 7) != 0 && - strncmp(err, "-ERR operation not permitted", 28) != 0) { - serverLog(LL_WARNING, "Error reply to PING from primary: '%s'", err); + ret = syncWithPrimaryHandleReceivePingReplyState(conn, &err); + if (ret == C_ERR) { + if (err == NULL) goto no_response_error; goto error; - } else { - serverLog(LL_NOTICE, "Primary replied to PING, replication can continue..."); } - sdsfree(err); + if (err) sdsfree(err); err = NULL; server.repl_state = REPL_STATE_SEND_HANDSHAKE; /* fall through */ case REPL_STATE_SEND_HANDSHAKE: - /* AUTH with the primary if required. */ - if (server.primary_auth) { - char *args[3] = {"AUTH", NULL, NULL}; - size_t lens[3] = {4, 0, 0}; - int argc = 1; - if (server.primary_user) { - args[argc] = server.primary_user; - lens[argc] = strlen(server.primary_user); - argc++; - } - args[argc] = server.primary_auth; - lens[argc] = sdslen(server.primary_auth); - argc++; - err = sendCommandArgv(conn, argc, args, lens); - if (err) goto write_error; - } - - /* Set the replica port, so that primary's INFO command can list the - * replica listening port correctly. */ - { - sds portstr = getReplicaPortString(); - err = sendCommand(conn, "REPLCONF", "listening-port", portstr, NULL); - sdsfree(portstr); - if (err) goto write_error; - } - - /* Set the replica ip, so that primary's INFO command can list the - * replica IP address port correctly in case of port forwarding or NAT. - * Skip REPLCONF ip-address if there is no replica-announce-ip option set. */ - if (server.replica_announce_ip) { - err = sendCommand(conn, "REPLCONF", "ip-address", server.replica_announce_ip, NULL); - if (err) goto write_error; - } - - /* Inform the primary of our (replica) capabilities. - * - * EOF: supports EOF-style RDB transfer for diskless replication. - * PSYNC2: supports PSYNC v2, so understands +CONTINUE . - * - * The primary will ignore capabilities it does not understand. */ - err = sendCommand(conn, "REPLCONF", "capa", "eof", "capa", "psync2", - server.dual_channel_replication ? "capa" : NULL, - server.dual_channel_replication ? "dual-channel" : NULL, NULL); - if (err) goto write_error; - - /* Inform the primary of our (replica) version. */ - err = sendCommand(conn, "REPLCONF", "version", VALKEY_VERSION, NULL); - if (err) goto write_error; - + ret = syncWithPrimaryHandleSendHandshakeState(conn, &err); + if (ret == C_ERR) goto write_error; server.repl_state = REPL_STATE_RECEIVE_AUTH_REPLY; goto ok; /* Receive AUTH reply. */ case REPL_STATE_RECEIVE_AUTH_REPLY: - if (server.primary_auth) { - err = receiveSynchronousResponse(conn); + ret = syncWithPrimaryHandleReceiveAuthReplyState(conn, &err); + if (ret == C_ERR) { if (err == NULL) goto no_response_error; - if (err[0] == '-') { - serverLog(LL_WARNING, "Unable to AUTH to PRIMARY: %s", err); - goto error; - } - server.repl_state = REPL_STATE_RECEIVE_PORT_REPLY; - goto ok; + goto error; } server.repl_state = REPL_STATE_RECEIVE_PORT_REPLY; + if (server.primary_auth) goto ok; /* fall through */ /* Receive REPLCONF listening-port reply. */ case REPL_STATE_RECEIVE_PORT_REPLY: - err = receiveSynchronousResponse(conn); - if (err == NULL) goto no_response_error; - /* Ignore the error if any, not all the Redis OSS versions support - * REPLCONF listening-port. */ - if (err[0] == '-') { - serverLog(LL_NOTICE, - "(Non critical) Primary does not understand " - "REPLCONF listening-port: %s", - err); - } + ret = syncWithPrimaryHandleReceivePortReplyState(conn, &err); + if (ret == C_ERR) goto no_response_error; server.repl_state = REPL_STATE_RECEIVE_IP_REPLY; goto ok; /* Receive REPLCONF ip-address reply. */ case REPL_STATE_RECEIVE_IP_REPLY: - if (server.replica_announce_ip) { - err = receiveSynchronousResponse(conn); - if (err == NULL) goto no_response_error; - /* Ignore the error if any, not all the Redis OSS versions support - * REPLCONF ip-address. */ - if (err[0] == '-') { - serverLog(LL_NOTICE, - "(Non critical) Primary does not understand " - "REPLCONF ip-address: %s", - err); - } - server.repl_state = REPL_STATE_RECEIVE_CAPA_REPLY; - goto ok; - } + ret = syncWithPrimaryHandleReceiveIPReplyState(conn, &err); + if (ret == C_ERR) goto no_response_error; server.repl_state = REPL_STATE_RECEIVE_CAPA_REPLY; + if (server.replica_announce_ip) goto ok; /* fall through */ /* Receive CAPA reply. */ case REPL_STATE_RECEIVE_CAPA_REPLY: - err = receiveSynchronousResponse(conn); - if (err == NULL) goto no_response_error; - /* Ignore the error if any, not all the Redis OSS versions support - * REPLCONF capa. */ - if (err[0] == '-') { - serverLog(LL_NOTICE, - "(Non critical) Primary does not understand " - "REPLCONF capa: %s", - err); - } + ret = syncWithPrimaryHandleReceiveCapaReplyState(conn, &err); + if (ret == C_ERR) goto no_response_error; server.repl_state = REPL_STATE_RECEIVE_VERSION_REPLY; goto ok; /* Receive VERSION reply. */ case REPL_STATE_RECEIVE_VERSION_REPLY: - err = receiveSynchronousResponse(conn); - if (err == NULL) goto no_response_error; - /* Ignore the error if any. Valkey >= 8 supports REPLCONF VERSION. */ - if (err[0] == '-') { - serverLog(LL_NOTICE, - "(Non critical) Primary does not understand " - "REPLCONF VERSION: %s", - err); - } + ret = syncWithPrimaryHandleReceiveVersionReplyState(conn, &err); + if (ret == C_ERR) goto no_response_error; server.repl_state = REPL_STATE_SEND_PSYNC; /* fall through */ /* Try a partial resynchronization. If we don't have a cached primary @@ -3602,14 +3664,10 @@ void syncWithPrimary(connection *conn) { * and the global offset, to try a partial resync at the next * reconnection attempt. */ case REPL_STATE_SEND_PSYNC: - if (replicaSendPsyncCommand(conn) == PSYNC_WRITE_ERROR) { - err = sdsnew("Write error sending the PSYNC command."); - abortFailover("Write error to failover target"); - goto write_error; - } + ret = syncWithPrimaryHandleSendPsyncState(conn, &err); + if (ret == C_ERR) goto write_error; server.repl_state = REPL_STATE_RECEIVE_PSYNC_REPLY; goto ok; - /* If reached this point, we should be in REPL_STATE_RECEIVE_PSYNC_REPLY. */ default: if (server.repl_state != REPL_STATE_RECEIVE_PSYNC_REPLY) { From 6df1d3ccd3ebaa8b0c26d439994d34dc00537503 Mon Sep 17 00:00:00 2001 From: Nitai Caro Date: Mon, 23 Dec 2024 11:29:43 +0000 Subject: [PATCH 07/11] Run formatter --- src/replication.c | 258 +++++++++++++++++++++++----------------------- 1 file changed, 129 insertions(+), 129 deletions(-) diff --git a/src/replication.c b/src/replication.c index b52e098bec..c3970c8945 100644 --- a/src/replication.c +++ b/src/replication.c @@ -3058,10 +3058,10 @@ int replicaSendPsyncCommand(connection *conn) { sds reply; /* Initially set primary_initial_offset to -1 to mark the current - * primary replid and offset as not valid. Later if we'll be able to do - * a FULL resync using the PSYNC command we'll set the offset at the - * right value, so that this information will be propagated to the - * client structure representing the primary into server.primary. */ + * primary replid and offset as not valid. Later if we'll be able to do + * a FULL resync using the PSYNC command we'll set the offset at the + * right value, so that this information will be propagated to the + * client structure representing the primary into server.primary. */ server.primary_initial_offset = -1; if (server.repl_rdb_channel_state != REPL_DUAL_CHANNEL_STATE_NONE) { @@ -3082,8 +3082,8 @@ int replicaSendPsyncCommand(connection *conn) { } /* Issue the PSYNC command, if this is a primary with a failover in - * progress then send the failover argument to the replica to cause it - * to become a primary */ + * progress then send the failover argument to the replica to cause it + * to become a primary */ if (server.failover_state == FAILOVER_IN_PROGRESS) { reply = sendCommand(conn, "PSYNC", psync_replid, psync_offset, "FAILOVER", NULL); } else { @@ -3344,11 +3344,11 @@ void dualChannelSetupMainConnForPsync(connection *conn) { int syncWithPrimaryHandleConnectingState(connection *conn, sds *err) { serverLog(LL_NOTICE, "Non blocking connect for SYNC fired the event."); /* Delete the writable event so that the readable event remains - * registered and we can wait for the PONG reply. */ + * registered and we can wait for the PONG reply. */ connSetReadHandler(conn, syncWithPrimary); connSetWriteHandler(conn, NULL); /* Send the PING, don't check for errors at all, we have the timeout - * that will take care about this. */ + * that will take care about this. */ *err = sendCommand(conn, "PING", NULL); if (*err) return C_ERR; return C_OK; @@ -3361,10 +3361,10 @@ int syncWithPrimaryHandleReceivePingReplyState(connection *conn, sds *err) { if (*err == NULL) return C_ERR; /* We accept only two replies as valid, a positive +PONG reply - * (we just check for "+") or an authentication error. - * Note that older versions of Redis OSS replied with "operation not - * permitted" instead of using a proper error code, so we test - * both. */ + * (we just check for "+") or an authentication error. + * Note that older versions of Redis OSS replied with "operation not + * permitted" instead of using a proper error code, so we test + * both. */ if (*err[0] != '+' && strncmp(*err, "-NOAUTH", 7) != 0 && strncmp(*err, "-NOPERM", 7) != 0 && strncmp(*err, "-ERR operation not permitted", 28) != 0) { serverLog(LL_WARNING, "Error reply to PING from primary: '%s'", *err); @@ -3394,7 +3394,7 @@ int syncWithPrimaryHandleSendHandshakeState(connection *conn, sds *err) { } /* Set the replica port, so that primary's INFO command can list the - * replica listening port correctly. */ + * replica listening port correctly. */ { sds portstr = getReplicaPortString(); *err = sendCommand(conn, "REPLCONF", "listening-port", portstr, NULL); @@ -3403,22 +3403,22 @@ int syncWithPrimaryHandleSendHandshakeState(connection *conn, sds *err) { } /* Set the replica ip, so that primary's INFO command can list the - * replica IP address port correctly in case of port forwarding or NAT. - * Skip REPLCONF ip-address if there is no replica-announce-ip option set. */ + * replica IP address port correctly in case of port forwarding or NAT. + * Skip REPLCONF ip-address if there is no replica-announce-ip option set. */ if (server.replica_announce_ip) { *err = sendCommand(conn, "REPLCONF", "ip-address", server.replica_announce_ip, NULL); if (*err) return C_ERR; } /* Inform the primary of our (replica) capabilities. - * - * EOF: supports EOF-style RDB transfer for diskless replication. - * PSYNC2: supports PSYNC v2, so understands +CONTINUE . - * - * The primary will ignore capabilities it does not understand. */ + * + * EOF: supports EOF-style RDB transfer for diskless replication. + * PSYNC2: supports PSYNC v2, so understands +CONTINUE . + * + * The primary will ignore capabilities it does not understand. */ *err = sendCommand(conn, "REPLCONF", "capa", "eof", "capa", "psync2", - server.dual_channel_replication ? "capa" : NULL, - server.dual_channel_replication ? "dual-channel" : NULL, NULL); + server.dual_channel_replication ? "capa" : NULL, + server.dual_channel_replication ? "dual-channel" : NULL, NULL); if (*err) return C_ERR; /* Inform the primary of our (replica) version. */ @@ -3443,12 +3443,12 @@ int syncWithPrimaryHandleReceivePortReplyState(connection *conn, sds *err) { *err = receiveSynchronousResponse(conn); if (*err == NULL) return C_ERR; /* Ignore the error if any, not all the Redis OSS versions support - * REPLCONF listening-port. */ + * REPLCONF listening-port. */ if (*err[0] == '-') { serverLog(LL_NOTICE, - "(Non critical) Primary does not understand " - "REPLCONF listening-port: %s", - *err); + "(Non critical) Primary does not understand " + "REPLCONF listening-port: %s", + *err); } return C_OK; } @@ -3458,12 +3458,12 @@ int syncWithPrimaryHandleReceiveIPReplyState(connection *conn, sds *err) { *err = receiveSynchronousResponse(conn); if (*err == NULL) return C_ERR; /* Ignore the error if any, not all the Redis OSS versions support - * REPLCONF ip-address. */ + * REPLCONF ip-address. */ if (*err[0] == '-') { serverLog(LL_NOTICE, - "(Non critical) Primary does not understand " - "REPLCONF ip-address: %s", - *err); + "(Non critical) Primary does not understand " + "REPLCONF ip-address: %s", + *err); } } return C_OK; @@ -3473,12 +3473,12 @@ int syncWithPrimaryHandleReceiveCapaReplyState(connection *conn, sds *err) { *err = receiveSynchronousResponse(conn); if (*err == NULL) return C_ERR; /* Ignore the error if any, not all the Redis OSS versions support - * REPLCONF capa. */ + * REPLCONF capa. */ if (*err[0] == '-') { serverLog(LL_NOTICE, - "(Non critical) Primary does not understand " - "REPLCONF capa: %s", - *err); + "(Non critical) Primary does not understand " + "REPLCONF capa: %s", + *err); } return C_OK; } @@ -3489,9 +3489,9 @@ int syncWithPrimaryHandleReceiveVersionReplyState(connection *conn, sds *err) { /* Ignore the error if any. Valkey >= 8 supports REPLCONF VERSION. */ if (*err[0] == '-') { serverLog(LL_NOTICE, - "(Non critical) Primary does not understand " - "REPLCONF VERSION: %s", - *err); + "(Non critical) Primary does not understand " + "REPLCONF VERSION: %s", + *err); } return C_OK; } @@ -3601,90 +3601,90 @@ void syncWithPrimary(connection *conn) { goto error; } switch (server.repl_state) { - /* Send a PING to check the primary is able to reply without errors. */ - case REPL_STATE_CONNECTING: - ret = syncWithPrimaryHandleConnectingState(conn, &err); - if (ret == C_ERR) goto write_error; - server.repl_state = REPL_STATE_RECEIVE_PING_REPLY; - goto ok; - /* Receive the PONG command. */ - case REPL_STATE_RECEIVE_PING_REPLY: - ret = syncWithPrimaryHandleReceivePingReplyState(conn, &err); - if (ret == C_ERR) { - if (err == NULL) goto no_response_error; - goto error; - } - if (err) sdsfree(err); - err = NULL; - server.repl_state = REPL_STATE_SEND_HANDSHAKE; - /* fall through */ - case REPL_STATE_SEND_HANDSHAKE: - ret = syncWithPrimaryHandleSendHandshakeState(conn, &err); - if (ret == C_ERR) goto write_error; - server.repl_state = REPL_STATE_RECEIVE_AUTH_REPLY; - goto ok; - /* Receive AUTH reply. */ - case REPL_STATE_RECEIVE_AUTH_REPLY: - ret = syncWithPrimaryHandleReceiveAuthReplyState(conn, &err); - if (ret == C_ERR) { - if (err == NULL) goto no_response_error; - goto error; - } - server.repl_state = REPL_STATE_RECEIVE_PORT_REPLY; - if (server.primary_auth) goto ok; - /* fall through */ - /* Receive REPLCONF listening-port reply. */ - case REPL_STATE_RECEIVE_PORT_REPLY: - ret = syncWithPrimaryHandleReceivePortReplyState(conn, &err); - if (ret == C_ERR) goto no_response_error; - server.repl_state = REPL_STATE_RECEIVE_IP_REPLY; - goto ok; - /* Receive REPLCONF ip-address reply. */ - case REPL_STATE_RECEIVE_IP_REPLY: - ret = syncWithPrimaryHandleReceiveIPReplyState(conn, &err); - if (ret == C_ERR) goto no_response_error; - server.repl_state = REPL_STATE_RECEIVE_CAPA_REPLY; - if (server.replica_announce_ip) goto ok; - /* fall through */ - /* Receive CAPA reply. */ - case REPL_STATE_RECEIVE_CAPA_REPLY: - ret = syncWithPrimaryHandleReceiveCapaReplyState(conn, &err); - if (ret == C_ERR) goto no_response_error; - server.repl_state = REPL_STATE_RECEIVE_VERSION_REPLY; - goto ok; - /* Receive VERSION reply. */ - case REPL_STATE_RECEIVE_VERSION_REPLY: - ret = syncWithPrimaryHandleReceiveVersionReplyState(conn, &err); - if (ret == C_ERR) goto no_response_error; - server.repl_state = REPL_STATE_SEND_PSYNC; - /* fall through */ - /* Try a partial resynchronization. If we don't have a cached primary - * replicaSendPsyncCommand() will at least try to use PSYNC - * to start a full resynchronization so that we get the primary replid - * and the global offset, to try a partial resync at the next - * reconnection attempt. */ - case REPL_STATE_SEND_PSYNC: - ret = syncWithPrimaryHandleSendPsyncState(conn, &err); - if (ret == C_ERR) goto write_error; - server.repl_state = REPL_STATE_RECEIVE_PSYNC_REPLY; - goto ok; - /* If reached this point, we should be in REPL_STATE_RECEIVE_PSYNC_REPLY. */ - default: - if (server.repl_state != REPL_STATE_RECEIVE_PSYNC_REPLY) { - serverLog(LL_WARNING, - "syncWithPrimary(): state machine error, " - "state should be RECEIVE_PSYNC but is %d", - server.repl_state); - goto error; - } + /* Send a PING to check the primary is able to reply without errors. */ + case REPL_STATE_CONNECTING: + ret = syncWithPrimaryHandleConnectingState(conn, &err); + if (ret == C_ERR) goto write_error; + server.repl_state = REPL_STATE_RECEIVE_PING_REPLY; + goto ok; + /* Receive the PONG command. */ + case REPL_STATE_RECEIVE_PING_REPLY: + ret = syncWithPrimaryHandleReceivePingReplyState(conn, &err); + if (ret == C_ERR) { + if (err == NULL) goto no_response_error; + goto error; + } + if (err) sdsfree(err); + err = NULL; + server.repl_state = REPL_STATE_SEND_HANDSHAKE; + /* fall through */ + case REPL_STATE_SEND_HANDSHAKE: + ret = syncWithPrimaryHandleSendHandshakeState(conn, &err); + if (ret == C_ERR) goto write_error; + server.repl_state = REPL_STATE_RECEIVE_AUTH_REPLY; + goto ok; + /* Receive AUTH reply. */ + case REPL_STATE_RECEIVE_AUTH_REPLY: + ret = syncWithPrimaryHandleReceiveAuthReplyState(conn, &err); + if (ret == C_ERR) { + if (err == NULL) goto no_response_error; + goto error; + } + server.repl_state = REPL_STATE_RECEIVE_PORT_REPLY; + if (server.primary_auth) goto ok; + /* fall through */ + /* Receive REPLCONF listening-port reply. */ + case REPL_STATE_RECEIVE_PORT_REPLY: + ret = syncWithPrimaryHandleReceivePortReplyState(conn, &err); + if (ret == C_ERR) goto no_response_error; + server.repl_state = REPL_STATE_RECEIVE_IP_REPLY; + goto ok; + /* Receive REPLCONF ip-address reply. */ + case REPL_STATE_RECEIVE_IP_REPLY: + ret = syncWithPrimaryHandleReceiveIPReplyState(conn, &err); + if (ret == C_ERR) goto no_response_error; + server.repl_state = REPL_STATE_RECEIVE_CAPA_REPLY; + if (server.replica_announce_ip) goto ok; + /* fall through */ + /* Receive CAPA reply. */ + case REPL_STATE_RECEIVE_CAPA_REPLY: + ret = syncWithPrimaryHandleReceiveCapaReplyState(conn, &err); + if (ret == C_ERR) goto no_response_error; + server.repl_state = REPL_STATE_RECEIVE_VERSION_REPLY; + goto ok; + /* Receive VERSION reply. */ + case REPL_STATE_RECEIVE_VERSION_REPLY: + ret = syncWithPrimaryHandleReceiveVersionReplyState(conn, &err); + if (ret == C_ERR) goto no_response_error; + server.repl_state = REPL_STATE_SEND_PSYNC; + /* fall through */ + /* Try a partial resynchronization. If we don't have a cached primary + * replicaSendPsyncCommand() will at least try to use PSYNC + * to start a full resynchronization so that we get the primary replid + * and the global offset, to try a partial resync at the next + * reconnection attempt. */ + case REPL_STATE_SEND_PSYNC: + ret = syncWithPrimaryHandleSendPsyncState(conn, &err); + if (ret == C_ERR) goto write_error; + server.repl_state = REPL_STATE_RECEIVE_PSYNC_REPLY; + goto ok; + /* If reached this point, we should be in REPL_STATE_RECEIVE_PSYNC_REPLY. */ + default: + if (server.repl_state != REPL_STATE_RECEIVE_PSYNC_REPLY) { + serverLog(LL_WARNING, + "syncWithPrimary(): state machine error, " + "state should be RECEIVE_PSYNC but is %d", + server.repl_state); + goto error; + } } - + psync_result = replicaProcessPsyncReply(conn); if (psync_result == PSYNC_WAIT_REPLY) return; /* Try again later... */ /* Check the status of the planned failover. We expect PSYNC_CONTINUE, - * but there is nothing technically wrong with a full resync which - * could happen in edge cases. */ + * but there is nothing technically wrong with a full resync which + * could happen in edge cases. */ if (server.failover_state == FAILOVER_IN_PROGRESS) { if (psync_result == PSYNC_CONTINUE || psync_result == PSYNC_FULLRESYNC) { clearFailoverState(); @@ -3695,26 +3695,26 @@ void syncWithPrimary(connection *conn) { } /* If the primary is in an transient error, we should try to PSYNC - * from scratch later, so go to the error path. This happens when - * the server is loading the dataset or is not connected with its - * primary and so forth. */ + * from scratch later, so go to the error path. This happens when + * the server is loading the dataset or is not connected with its + * primary and so forth. */ if (psync_result == PSYNC_TRY_LATER) goto error; - + /* Note: if PSYNC does not return WAIT_REPLY, it will take care of - * uninstalling the read handler from the file descriptor. */ + * uninstalling the read handler from the file descriptor. */ if (psync_result == PSYNC_CONTINUE) { serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Primary accepted a Partial Resynchronization."); if (server.supervised_mode == SUPERVISED_SYSTEMD) { serverCommunicateSystemd("STATUS=PRIMARY <-> REPLICA sync: Partial Resynchronization accepted. Ready to " - "accept connections in read-write mode.\n"); + "accept connections in read-write mode.\n"); } goto ok; } /* Fall back to SYNC if needed. Otherwise psync_result == PSYNC_FULLRESYNC - * and the server.primary_replid and primary_initial_offset are - * already populated. */ + * and the server.primary_replid and primary_initial_offset are + * already populated. */ if (psync_result == PSYNC_NOT_SUPPORTED) { serverLog(LL_NOTICE, "Retrying with SYNC..."); if (connSyncWrite(conn, "SYNC\r\n", 6, server.repl_syncio_timeout * 1000) == -1) { @@ -3731,14 +3731,14 @@ void syncWithPrimary(connection *conn) { dfd = open(tmpfile, O_CREAT | O_WRONLY | O_EXCL, 0644); if (dfd != -1) break; /* We save the errno of open to prevent some systems from modifying it after - * the sleep call. For example, sleep in Mac will change errno to ETIMEDOUT. */ + * the sleep call. For example, sleep in Mac will change errno to ETIMEDOUT. */ int saved_errno = errno; sleep(1); errno = saved_errno; } if (dfd == -1) { serverLog(LL_WARNING, "Opening the temp file needed for PRIMARY <-> REPLICA synchronization: %s", - strerror(errno)); + strerror(errno)); goto error; } server.repl_transfer_tmpfile = zstrdup(tmpfile); @@ -3746,7 +3746,7 @@ void syncWithPrimary(connection *conn) { } /* Using dual-channel-replication, the primary responded +DUALCHANNELSYNC. We need to - * initialize the RDB channel. */ + * initialize the RDB channel. */ if (psync_result == PSYNC_FULLRESYNC_DUAL_CHANNEL) { /* Create RDB connection */ server.repl_rdb_transfer_s = connCreate(connTypeOfReplication()); @@ -3760,7 +3760,7 @@ void syncWithPrimary(connection *conn) { if (connSetReadHandler(conn, NULL) == C_ERR) { char conninfo[CONN_INFO_LEN]; dualChannelServerLog(LL_WARNING, "Can't clear main connection handler: %s (%s)", strerror(errno), - connGetInfo(conn, conninfo, sizeof(conninfo))); + connGetInfo(conn, conninfo, sizeof(conninfo))); goto error; } server.repl_rdb_channel_state = REPL_DUAL_CHANNEL_SEND_HANDSHAKE; @@ -3770,7 +3770,7 @@ void syncWithPrimary(connection *conn) { if (connSetReadHandler(conn, readSyncBulkPayload) == C_ERR) { char conninfo[CONN_INFO_LEN]; serverLog(LL_WARNING, "Can't create readable event for SYNC: %s (%s)", strerror(errno), - connGetInfo(conn, conninfo, sizeof(conninfo))); + connGetInfo(conn, conninfo, sizeof(conninfo))); goto error; } From d3c08592bd7a0fa0e10e8689ad2c058661485ac2 Mon Sep 17 00:00:00 2001 From: Nitai Caro Date: Mon, 23 Dec 2024 13:33:14 +0000 Subject: [PATCH 08/11] Update descriptions of replicaProcessPsyncReply and replicaSendPsyncCommand --- src/replication.c | 79 ++++++++++++++++++++--------------------------- 1 file changed, 33 insertions(+), 46 deletions(-) diff --git a/src/replication.c b/src/replication.c index c3970c8945..b6c73ffbc4 100644 --- a/src/replication.c +++ b/src/replication.c @@ -2997,54 +2997,19 @@ void dualChannelSyncHandleRdbLoadCompletion(void) { return; } -/* Try a partial resynchronization with the primary if we are about to reconnect. - * If there is no cached primary structure, at least try to issue a - * "PSYNC ? -1" command in order to trigger a full resync using the PSYNC - * command in order to obtain the primary replid and the primary replication - * global offset. +/** + * Handles the initial step of the partial resynchronization process by + * preparing and sending a PSYNC command to the primary server. + * This function determines the appropriate replication ID (replid) + * and replication offset based on the server's state. If successful, + * the function signals readiness for the reply processing phase, which is + * processed in replicaProcessPsyncReply(). * - * This function is designed to be called from syncWithPrimary(), so the - * following assumptions are made: - * - * 1) We pass the function an already connected socket "fd". - * 2) This function does not close the file descriptor "fd". However in case - * of successful partial resynchronization, the function will reuse - * 'fd' as file descriptor of the server.primary client structure. - * - * The function is split in two halves: if read_reply is 0, the function - * writes the PSYNC command on the socket, and a new function call is - * needed, with read_reply set to 1, in order to read the reply of the - * command. This is useful in order to support non blocking operations, so - * that we write, return into the event loop, and read when there are data. - * - * When read_reply is 0 the function returns PSYNC_WRITE_ERR if there - * was a write error, or PSYNC_WAIT_REPLY to signal we need another call - * with read_reply set to 1. However even when read_reply is set to 1 - * the function may return PSYNC_WAIT_REPLY again to signal there were - * insufficient data to read to complete its work. We should re-enter - * into the event loop and wait in such a case. - * - * The function returns: - * - * PSYNC_CONTINUE: If the PSYNC command succeeded and we can continue. - * PSYNC_FULLRESYNC: If PSYNC is supported but a full resync is needed. - * In this case the primary replid and global replication - * offset is saved. - * PSYNC_NOT_SUPPORTED: If the server does not understand PSYNC at all and - * the caller should fall back to SYNC. - * PSYNC_WRITE_ERROR: There was an error writing the command to the socket. - * PSYNC_WAIT_REPLY: Call again the function with read_reply set to 1. - * PSYNC_TRY_LATER: Primary is currently in a transient error condition. - * - * Notable side effects: - * - * 1) As a side effect of the function call the function removes the readable - * event handler from "fd", unless the return value is PSYNC_WAIT_REPLY. - * 2) server.primary_initial_offset is set to the right value according - * to the primary reply. This will be used to populate the 'server.primary' - * structure replication offset. + * Return Values: + * - PSYNC_WRITE_ERROR: There was an error writing the command to the socket. + * - PSYNC_WAIT_REPLY: PSYNC was successfully sent, awaiting a reply. The next + * step is to call replicaProcessPsyncReply(). */ - #define PSYNC_WRITE_ERROR 0 #define PSYNC_WAIT_REPLY 1 #define PSYNC_CONTINUE 2 @@ -3099,6 +3064,28 @@ int replicaSendPsyncCommand(connection *conn) { return PSYNC_WAIT_REPLY; } + +/** + * Processes the reply from the primary server following a PSYNC command that + * was sent with replicaSendPsyncCommand(). + * This function interprets the reply to determine if partial resynchronization + * can proceed or if a full synchronization is required. + * + * Return Values: + * - PSYNC_TRY_LATER: Primary is currently in a transient error condition. + * - PSYNC_FULLRESYNC: If PSYNC is supported but a full resync is needed. + * In this case the primary replid and global replication + * offset is saved. + * - PSYNC_CONTINUE: If the PSYNC command succeeded and we can continue + * - PSYNC_WAIT_REPLY: Still awaiting a reply, call this function again. + * - PSYNC_NOT_SUPPORTED: If the server does not understand PSYNC at all and + * the caller should fall back to SYNC. + * - PSYNC_FULLRESYNC_DUAL_CHANNEL: If partial synchronization is not possible + * but the primary supports full synchronization using + * a dedicated RDB channel. In this case, the RDB channel + * is initialized, and full synchronization will continue + * via the dual-channel approach. + */ int replicaProcessPsyncReply(connection *conn) { sds reply; From fc489751fbafccd67fd10043a293405787c848f5 Mon Sep 17 00:00:00 2001 From: Nitai Caro Date: Sun, 12 Jan 2025 14:40:30 +0000 Subject: [PATCH 09/11] Fix usage of /* in comments and double space Signed-off-by: Nitai Caro --- src/replication.c | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/src/replication.c b/src/replication.c index 51c26592b0..0ccf44a30e 100644 --- a/src/replication.c +++ b/src/replication.c @@ -3067,8 +3067,7 @@ void dualChannelSyncHandleRdbLoadCompletion(void) { return; } -/** - * Handles the initial step of the partial resynchronization process by +/* Handles the initial step of the partial resynchronization process by * preparing and sending a PSYNC command to the primary server. * This function determines the appropriate replication ID (replid) * and replication offset based on the server's state. If successful, @@ -3078,8 +3077,7 @@ void dualChannelSyncHandleRdbLoadCompletion(void) { * Return Values: * - PSYNC_WRITE_ERROR: There was an error writing the command to the socket. * - PSYNC_WAIT_REPLY: PSYNC was successfully sent, awaiting a reply. The next - * step is to call replicaProcessPsyncReply(). - */ + * step is to call replicaProcessPsyncReply(). */ #define PSYNC_WRITE_ERROR 0 #define PSYNC_WAIT_REPLY 1 #define PSYNC_CONTINUE 2 @@ -3134,9 +3132,7 @@ int replicaSendPsyncCommand(connection *conn) { return PSYNC_WAIT_REPLY; } - -/** - * Processes the reply from the primary server following a PSYNC command that +/* Processes the reply from the primary server following a PSYNC command that * was sent with replicaSendPsyncCommand(). * This function interprets the reply to determine if partial resynchronization * can proceed or if a full synchronization is required. @@ -3154,8 +3150,7 @@ int replicaSendPsyncCommand(connection *conn) { * but the primary supports full synchronization using * a dedicated RDB channel. In this case, the RDB channel * is initialized, and full synchronization will continue - * via the dual-channel approach. - */ + * via the dual-channel approach. */ int replicaProcessPsyncReply(connection *conn) { sds reply; From 761fc9b60c3f5d4c8f57cc0fbcc637e1864711da Mon Sep 17 00:00:00 2001 From: Nitai Caro Date: Sun, 12 Jan 2025 16:39:45 +0000 Subject: [PATCH 10/11] Refactor error labels in syncWithPrimary into a function Signed-off-by: Nitai Caro --- src/replication.c | 170 ++++++++++++++++++++++++++-------------------- 1 file changed, 98 insertions(+), 72 deletions(-) diff --git a/src/replication.c b/src/replication.c index 0ccf44a30e..4c45cd5f54 100644 --- a/src/replication.c +++ b/src/replication.c @@ -3557,6 +3557,32 @@ int syncWithPrimaryHandleSendPsyncState(connection *conn, sds *err) { return C_OK; } +void syncWithPrimaryHandleError(connection *conn, sds *err) { + connClose(conn); + server.repl_transfer_s = NULL; + if (server.repl_rdb_transfer_s) { + connClose(server.repl_rdb_transfer_s); + server.repl_rdb_transfer_s = NULL; + } + if (server.repl_transfer_fd != -1) close(server.repl_transfer_fd); + if (server.repl_transfer_tmpfile) zfree(server.repl_transfer_tmpfile); + server.repl_transfer_tmpfile = NULL; + server.repl_transfer_fd = -1; + server.repl_state = REPL_STATE_CONNECT; + if (*err) sdsfree(*err); +} + +void syncWithPrimaryHandleWriteError(connection *conn, sds *err) { + serverLog(LL_WARNING, "Sending command to primary in replication handshake: %s", *err); + syncWithPrimaryHandleError(conn, err); +} + +void syncWithPrimaryHandleNoResponseError(connection *conn, sds *err) { + serverLog(LL_WARNING, "Primary did not respond to command during SYNC handshake"); + syncWithPrimaryHandleError(conn, err); +} + + /* * Dual channel for full sync * @@ -3637,7 +3663,7 @@ int syncWithPrimaryHandleSendPsyncState(connection *conn, sds *err) { * establish a connection with the primary. */ void syncWithPrimary(connection *conn) { char tmpfile[256], *err = NULL; - int psync_result, ret; + int psync_result; /* If this event fired after the user turned the instance into a primary * with REPLICAOF NO ONE we must just return ASAP. */ @@ -3650,76 +3676,95 @@ void syncWithPrimary(connection *conn) { * may find that the socket is in error state. */ if (connGetState(conn) != CONN_STATE_CONNECTED) { serverLog(LL_WARNING, "Error condition on socket for SYNC: %s", connGetLastError(conn)); - goto error; + syncWithPrimaryHandleError(conn, &err); + return; } switch (server.repl_state) { /* Send a PING to check the primary is able to reply without errors. */ case REPL_STATE_CONNECTING: - ret = syncWithPrimaryHandleConnectingState(conn, &err); - if (ret == C_ERR) goto write_error; + if (syncWithPrimaryHandleConnectingState(conn, &err) == C_ERR) { + syncWithPrimaryHandleWriteError(conn, &err); + return; + } server.repl_state = REPL_STATE_RECEIVE_PING_REPLY; - goto ok; + return; /* Receive the PONG command. */ case REPL_STATE_RECEIVE_PING_REPLY: - ret = syncWithPrimaryHandleReceivePingReplyState(conn, &err); - if (ret == C_ERR) { - if (err == NULL) goto no_response_error; - goto error; + if (syncWithPrimaryHandleReceivePingReplyState(conn, &err) == C_ERR) { + if (err == NULL) + syncWithPrimaryHandleNoResponseError(conn, &err); + else + syncWithPrimaryHandleError(conn, &err); + return; } if (err) sdsfree(err); err = NULL; server.repl_state = REPL_STATE_SEND_HANDSHAKE; /* fall through */ case REPL_STATE_SEND_HANDSHAKE: - ret = syncWithPrimaryHandleSendHandshakeState(conn, &err); - if (ret == C_ERR) goto write_error; + if (syncWithPrimaryHandleSendHandshakeState(conn, &err) == C_ERR) { + syncWithPrimaryHandleWriteError(conn, &err); + return; + } server.repl_state = REPL_STATE_RECEIVE_AUTH_REPLY; - goto ok; + return; /* Receive AUTH reply. */ case REPL_STATE_RECEIVE_AUTH_REPLY: - ret = syncWithPrimaryHandleReceiveAuthReplyState(conn, &err); - if (ret == C_ERR) { - if (err == NULL) goto no_response_error; - goto error; + if (syncWithPrimaryHandleReceiveAuthReplyState(conn, &err) == C_ERR) { + if (err == NULL) + syncWithPrimaryHandleNoResponseError(conn, &err); + else + syncWithPrimaryHandleError(conn, &err); + return; } server.repl_state = REPL_STATE_RECEIVE_PORT_REPLY; - if (server.primary_auth) goto ok; + if (server.primary_auth) return; /* fall through */ /* Receive REPLCONF listening-port reply. */ case REPL_STATE_RECEIVE_PORT_REPLY: - ret = syncWithPrimaryHandleReceivePortReplyState(conn, &err); - if (ret == C_ERR) goto no_response_error; + if (syncWithPrimaryHandleReceivePortReplyState(conn, &err) == C_ERR) { + syncWithPrimaryHandleNoResponseError(conn, &err); + return; + } server.repl_state = REPL_STATE_RECEIVE_IP_REPLY; - goto ok; + return; /* Receive REPLCONF ip-address reply. */ case REPL_STATE_RECEIVE_IP_REPLY: - ret = syncWithPrimaryHandleReceiveIPReplyState(conn, &err); - if (ret == C_ERR) goto no_response_error; + if (syncWithPrimaryHandleReceiveIPReplyState(conn, &err) == C_ERR) { + syncWithPrimaryHandleNoResponseError(conn, &err); + return; + } server.repl_state = REPL_STATE_RECEIVE_CAPA_REPLY; - if (server.replica_announce_ip) goto ok; + if (server.replica_announce_ip) return; /* fall through */ /* Receive CAPA reply. */ case REPL_STATE_RECEIVE_CAPA_REPLY: - ret = syncWithPrimaryHandleReceiveCapaReplyState(conn, &err); - if (ret == C_ERR) goto no_response_error; + if (syncWithPrimaryHandleReceiveCapaReplyState(conn, &err) == C_ERR) { + syncWithPrimaryHandleNoResponseError(conn, &err); + return; + } server.repl_state = REPL_STATE_RECEIVE_VERSION_REPLY; - goto ok; + return; /* Receive VERSION reply. */ case REPL_STATE_RECEIVE_VERSION_REPLY: - ret = syncWithPrimaryHandleReceiveVersionReplyState(conn, &err); - if (ret == C_ERR) goto no_response_error; + if (syncWithPrimaryHandleReceiveVersionReplyState(conn, &err) == C_ERR) { + syncWithPrimaryHandleNoResponseError(conn, &err); + return; + } server.repl_state = REPL_STATE_SEND_PSYNC; /* fall through */ /* Try a partial resynchronization. If we don't have a cached primary - * replicaSendPsyncCommand() will at least try to use PSYNC - * to start a full resynchronization so that we get the primary replid - * and the global offset, to try a partial resync at the next - * reconnection attempt. */ + * replicaSendPsyncCommand() will at least try to use PSYNC + * to start a full resynchronization so that we get the primary replid + * and the global offset, to try a partial resync at the next + * reconnection attempt. */ case REPL_STATE_SEND_PSYNC: - ret = syncWithPrimaryHandleSendPsyncState(conn, &err); - if (ret == C_ERR) goto write_error; + if (syncWithPrimaryHandleSendPsyncState(conn, &err) == C_ERR) { + syncWithPrimaryHandleWriteError(conn, &err); + return; + } server.repl_state = REPL_STATE_RECEIVE_PSYNC_REPLY; - goto ok; + return; /* If reached this point, we should be in REPL_STATE_RECEIVE_PSYNC_REPLY. */ default: if (server.repl_state != REPL_STATE_RECEIVE_PSYNC_REPLY) { @@ -3727,7 +3772,8 @@ void syncWithPrimary(connection *conn) { "syncWithPrimary(): state machine error, " "state should be RECEIVE_PSYNC but is %d", server.repl_state); - goto error; + syncWithPrimaryHandleError(conn, &err); + return; } } @@ -3750,7 +3796,10 @@ void syncWithPrimary(connection *conn) { * from scratch later, so go to the error path. This happens when * the server is loading the dataset or is not connected with its * primary and so forth. */ - if (psync_result == PSYNC_TRY_LATER) goto error; + if (psync_result == PSYNC_TRY_LATER) { + syncWithPrimaryHandleError(conn, &err); + return; + } /* Note: if PSYNC does not return WAIT_REPLY, it will take care of * uninstalling the read handler from the file descriptor. */ @@ -3761,7 +3810,7 @@ void syncWithPrimary(connection *conn) { serverCommunicateSystemd("STATUS=PRIMARY <-> REPLICA sync: Partial Resynchronization accepted. Ready to " "accept connections in read-write mode.\n"); } - goto ok; + return; } /* Fall back to SYNC if needed. Otherwise psync_result == PSYNC_FULLRESYNC @@ -3771,7 +3820,8 @@ void syncWithPrimary(connection *conn) { serverLog(LL_NOTICE, "Retrying with SYNC..."); if (connSyncWrite(conn, "SYNC\r\n", 6, server.repl_syncio_timeout * 1000) == -1) { serverLog(LL_WARNING, "I/O error writing to PRIMARY: %s", connGetLastError(conn)); - goto error; + syncWithPrimaryHandleError(conn, &err); + return; } } @@ -3791,7 +3841,8 @@ void syncWithPrimary(connection *conn) { if (dfd == -1) { serverLog(LL_WARNING, "Opening the temp file needed for PRIMARY <-> REPLICA synchronization: %s", strerror(errno)); - goto error; + syncWithPrimaryHandleError(conn, &err); + return; } server.repl_transfer_tmpfile = zstrdup(tmpfile); server.repl_transfer_fd = dfd; @@ -3807,23 +3858,26 @@ void syncWithPrimary(connection *conn) { serverLog(LL_WARNING, "Unable to connect to Primary: %s", connGetLastError(server.repl_transfer_s)); connClose(server.repl_rdb_transfer_s); server.repl_rdb_transfer_s = NULL; - goto error; + syncWithPrimaryHandleError(conn, &err); + return; } if (connSetReadHandler(conn, NULL) == C_ERR) { char conninfo[CONN_INFO_LEN]; dualChannelServerLog(LL_WARNING, "Can't clear main connection handler: %s (%s)", strerror(errno), connGetInfo(conn, conninfo, sizeof(conninfo))); - goto error; + syncWithPrimaryHandleError(conn, &err); + return; } server.repl_rdb_channel_state = REPL_DUAL_CHANNEL_SEND_HANDSHAKE; - goto ok; + return; } /* Setup the non blocking download of the bulk file. */ if (connSetReadHandler(conn, readSyncBulkPayload) == C_ERR) { char conninfo[CONN_INFO_LEN]; serverLog(LL_WARNING, "Can't create readable event for SYNC: %s (%s)", strerror(errno), connGetInfo(conn, conninfo, sizeof(conninfo))); - goto error; + syncWithPrimaryHandleError(conn, &err); + return; } server.repl_state = REPL_STATE_TRANSFER; @@ -3831,34 +3885,6 @@ void syncWithPrimary(connection *conn) { server.repl_transfer_read = 0; server.repl_transfer_last_fsync_off = 0; server.repl_transfer_lastio = server.unixtime; - goto ok; - -no_response_error: /* Handle receiveSynchronousResponse() error when primary has no reply */ - serverLog(LL_WARNING, "Primary did not respond to command during SYNC handshake"); - /* Fall through to regular error handling */ - -error: - connClose(conn); - server.repl_transfer_s = NULL; - if (server.repl_rdb_transfer_s) { - connClose(server.repl_rdb_transfer_s); - server.repl_rdb_transfer_s = NULL; - } - if (server.repl_transfer_fd != -1) close(server.repl_transfer_fd); - if (server.repl_transfer_tmpfile) zfree(server.repl_transfer_tmpfile); - server.repl_transfer_tmpfile = NULL; - server.repl_transfer_fd = -1; - server.repl_state = REPL_STATE_CONNECT; - if (err) sdsfree(err); - return; - -write_error: /* Handle sendCommand() errors. */ - serverLog(LL_WARNING, "Sending command to primary in replication handshake: %s", err); - goto error; - -ok: - if (err) sdsfree(err); - return; } int connectWithPrimary(void) { From 36fe717305f5a020f0c450c301a0b62b80083cc2 Mon Sep 17 00:00:00 2001 From: Nitai Caro Date: Sun, 12 Jan 2025 17:07:45 +0000 Subject: [PATCH 11/11] Fix comment formatting --- src/replication.c | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/replication.c b/src/replication.c index 4c45cd5f54..c13bb3208c 100644 --- a/src/replication.c +++ b/src/replication.c @@ -3754,10 +3754,10 @@ void syncWithPrimary(connection *conn) { server.repl_state = REPL_STATE_SEND_PSYNC; /* fall through */ /* Try a partial resynchronization. If we don't have a cached primary - * replicaSendPsyncCommand() will at least try to use PSYNC - * to start a full resynchronization so that we get the primary replid - * and the global offset, to try a partial resync at the next - * reconnection attempt. */ + * replicaSendPsyncCommand() will at least try to use PSYNC + * to start a full resynchronization so that we get the primary replid + * and the global offset, to try a partial resync at the next + * reconnection attempt. */ case REPL_STATE_SEND_PSYNC: if (syncWithPrimaryHandleSendPsyncState(conn, &err) == C_ERR) { syncWithPrimaryHandleWriteError(conn, &err);