diff --git a/src/replication.c b/src/replication.c index 9913d64d65..26465057c7 100644 --- a/src/replication.c +++ b/src/replication.c @@ -57,6 +57,7 @@ void replicationSteadyStateInit(void); void dualChannelSetupMainConnForPsync(connection *conn); void dualChannelSyncHandleRdbLoadCompletion(void); static void dualChannelFullSyncWithPrimary(connection *conn); +void syncWithPrimary(connection *conn); /* We take a global flag to remember if this instance generated an RDB * because of replication, so that we can remove the RDB file in case @@ -3066,54 +3067,17 @@ void dualChannelSyncHandleRdbLoadCompletion(void) { return; } -/* Try a partial resynchronization with the primary if we are about to reconnect. - * If there is no cached primary structure, at least try to issue a - * "PSYNC ? -1" command in order to trigger a full resync using the PSYNC - * command in order to obtain the primary replid and the primary replication - * global offset. +/* Handles the initial step of the partial resynchronization process by + * preparing and sending a PSYNC command to the primary server. + * This function determines the appropriate replication ID (replid) + * and replication offset based on the server's state. If successful, + * the function signals readiness for the reply processing phase, which is + * processed in replicaProcessPsyncReply(). * - * This function is designed to be called from syncWithPrimary(), so the - * following assumptions are made: - * - * 1) We pass the function an already connected socket "fd". - * 2) This function does not close the file descriptor "fd". However in case - * of successful partial resynchronization, the function will reuse - * 'fd' as file descriptor of the server.primary client structure. - * - * The function is split in two halves: if read_reply is 0, the function - * writes the PSYNC command on the socket, and a new function call is - * needed, with read_reply set to 1, in order to read the reply of the - * command. This is useful in order to support non blocking operations, so - * that we write, return into the event loop, and read when there are data. - * - * When read_reply is 0 the function returns PSYNC_WRITE_ERR if there - * was a write error, or PSYNC_WAIT_REPLY to signal we need another call - * with read_reply set to 1. However even when read_reply is set to 1 - * the function may return PSYNC_WAIT_REPLY again to signal there were - * insufficient data to read to complete its work. We should re-enter - * into the event loop and wait in such a case. - * - * The function returns: - * - * PSYNC_CONTINUE: If the PSYNC command succeeded and we can continue. - * PSYNC_FULLRESYNC: If PSYNC is supported but a full resync is needed. - * In this case the primary replid and global replication - * offset is saved. - * PSYNC_NOT_SUPPORTED: If the server does not understand PSYNC at all and - * the caller should fall back to SYNC. - * PSYNC_WRITE_ERROR: There was an error writing the command to the socket. - * PSYNC_WAIT_REPLY: Call again the function with read_reply set to 1. - * PSYNC_TRY_LATER: Primary is currently in a transient error condition. - * - * Notable side effects: - * - * 1) As a side effect of the function call the function removes the readable - * event handler from "fd", unless the return value is PSYNC_WAIT_REPLY. - * 2) server.primary_initial_offset is set to the right value according - * to the primary reply. This will be used to populate the 'server.primary' - * structure replication offset. - */ - + * Return Values: + * - PSYNC_WRITE_ERROR: There was an error writing the command to the socket. + * - PSYNC_WAIT_REPLY: PSYNC was successfully sent, awaiting a reply. The next + * step is to call replicaProcessPsyncReply(). */ #define PSYNC_WRITE_ERROR 0 #define PSYNC_WAIT_REPLY 1 #define PSYNC_CONTINUE 2 @@ -3121,54 +3085,74 @@ void dualChannelSyncHandleRdbLoadCompletion(void) { #define PSYNC_NOT_SUPPORTED 4 #define PSYNC_TRY_LATER 5 #define PSYNC_FULLRESYNC_DUAL_CHANNEL 6 -int replicaTryPartialResynchronization(connection *conn, int read_reply) { +int replicaSendPsyncCommand(connection *conn) { char *psync_replid; char psync_offset[32]; sds reply; - /* Writing half */ - if (!read_reply) { - /* Initially set primary_initial_offset to -1 to mark the current - * primary replid and offset as not valid. Later if we'll be able to do - * a FULL resync using the PSYNC command we'll set the offset at the - * right value, so that this information will be propagated to the - * client structure representing the primary into server.primary. */ - server.primary_initial_offset = -1; + /* Initially set primary_initial_offset to -1 to mark the current + * primary replid and offset as not valid. Later if we'll be able to do + * a FULL resync using the PSYNC command we'll set the offset at the + * right value, so that this information will be propagated to the + * client structure representing the primary into server.primary. */ + server.primary_initial_offset = -1; - if (server.repl_rdb_channel_state != REPL_DUAL_CHANNEL_STATE_NONE) { - /* While in dual channel replication, we should use our prepared repl id and offset. */ - psync_replid = server.repl_provisional_primary.replid; - snprintf(psync_offset, sizeof(psync_offset), "%lld", server.repl_provisional_primary.reploff + 1); - dualChannelServerLog(LL_NOTICE, - "Trying a partial resynchronization using main channel (request %s:%s).", - psync_replid, psync_offset); - } else if (server.cached_primary) { - psync_replid = server.cached_primary->repl_data->replid; - snprintf(psync_offset, sizeof(psync_offset), "%lld", server.cached_primary->repl_data->reploff + 1); - serverLog(LL_NOTICE, "Trying a partial resynchronization (request %s:%s).", psync_replid, psync_offset); - } else { - serverLog(LL_NOTICE, "Partial resynchronization not possible (no cached primary)"); - psync_replid = "?"; - memcpy(psync_offset, "-1", 3); - } + if (server.repl_rdb_channel_state != REPL_DUAL_CHANNEL_STATE_NONE) { + /* While in dual channel replication, we should use our prepared repl id and offset. */ + psync_replid = server.repl_provisional_primary.replid; + snprintf(psync_offset, sizeof(psync_offset), "%lld", server.repl_provisional_primary.reploff + 1); + dualChannelServerLog(LL_NOTICE, + "Trying a partial resynchronization using main channel (request %s:%s).", + psync_replid, psync_offset); + } else if (server.cached_primary) { + psync_replid = server.cached_primary->repl_data->replid; + snprintf(psync_offset, sizeof(psync_offset), "%lld", server.cached_primary->repl_data->reploff + 1); + serverLog(LL_NOTICE, "Trying a partial resynchronization (request %s:%s).", psync_replid, psync_offset); + } else { + serverLog(LL_NOTICE, "Partial resynchronization not possible (no cached primary)"); + psync_replid = "?"; + memcpy(psync_offset, "-1", 3); + } - /* Issue the PSYNC command, if this is a primary with a failover in - * progress then send the failover argument to the replica to cause it - * to become a primary */ - if (server.failover_state == FAILOVER_IN_PROGRESS) { - reply = sendCommand(conn, "PSYNC", psync_replid, psync_offset, "FAILOVER", NULL); - } else { - reply = sendCommand(conn, "PSYNC", psync_replid, psync_offset, NULL); - } + /* Issue the PSYNC command, if this is a primary with a failover in + * progress then send the failover argument to the replica to cause it + * to become a primary */ + if (server.failover_state == FAILOVER_IN_PROGRESS) { + reply = sendCommand(conn, "PSYNC", psync_replid, psync_offset, "FAILOVER", NULL); + } else { + reply = sendCommand(conn, "PSYNC", psync_replid, psync_offset, NULL); + } - if (reply != NULL) { - serverLog(LL_WARNING, "Unable to send PSYNC to primary: %s", reply); - sdsfree(reply); - connSetReadHandler(conn, NULL); - return PSYNC_WRITE_ERROR; - } - return PSYNC_WAIT_REPLY; + if (reply != NULL) { + serverLog(LL_WARNING, "Unable to send PSYNC to primary: %s", reply); + sdsfree(reply); + connSetReadHandler(conn, NULL); + return PSYNC_WRITE_ERROR; } + return PSYNC_WAIT_REPLY; +} + +/* Processes the reply from the primary server following a PSYNC command that + * was sent with replicaSendPsyncCommand(). + * This function interprets the reply to determine if partial resynchronization + * can proceed or if a full synchronization is required. + * + * Return Values: + * - PSYNC_TRY_LATER: Primary is currently in a transient error condition. + * - PSYNC_FULLRESYNC: If PSYNC is supported but a full resync is needed. + * In this case the primary replid and global replication + * offset is saved. + * - PSYNC_CONTINUE: If the PSYNC command succeeded and we can continue + * - PSYNC_WAIT_REPLY: Still awaiting a reply, call this function again. + * - PSYNC_NOT_SUPPORTED: If the server does not understand PSYNC at all and + * the caller should fall back to SYNC. + * - PSYNC_FULLRESYNC_DUAL_CHANNEL: If partial synchronization is not possible + * but the primary supports full synchronization using + * a dedicated RDB channel. In this case, the RDB channel + * is initialized, and full synchronization will continue + * via the dual-channel approach. */ +int replicaProcessPsyncReply(connection *conn) { + sds reply; /* Reading half */ reply = receiveSynchronousResponse(conn); @@ -3340,7 +3324,7 @@ int dualChannelReplMainConnRecvCapaReply(connection *conn, sds *err) { int dualChannelReplMainConnSendPsync(connection *conn, sds *err) { if (server.debug_pause_after_fork) debugPauseProcess(); - if (replicaTryPartialResynchronization(conn, 0) == PSYNC_WRITE_ERROR) { + if (replicaSendPsyncCommand(conn) == PSYNC_WRITE_ERROR) { dualChannelServerLog(LL_WARNING, "Aborting dual channel sync. Write error."); *err = sdsnew(connGetLastError(conn)); return C_ERR; @@ -3349,7 +3333,7 @@ int dualChannelReplMainConnSendPsync(connection *conn, sds *err) { } int dualChannelReplMainConnRecvPsyncReply(connection *conn, sds *err) { - int psync_result = replicaTryPartialResynchronization(conn, 1); + int psync_result = replicaProcessPsyncReply(conn); if (psync_result == PSYNC_WAIT_REPLY) return C_OK; /* Try again later... */ if (psync_result == PSYNC_CONTINUE) { @@ -3408,6 +3392,209 @@ void dualChannelSetupMainConnForPsync(connection *conn) { sdsfree(err); } + +int syncWithPrimaryHandleConnectingState(connection *conn, sds *err) { + serverLog(LL_NOTICE, "Non blocking connect for SYNC fired the event."); + /* Delete the writable event so that the readable event remains + * registered and we can wait for the PONG reply. */ + connSetReadHandler(conn, syncWithPrimary); + connSetWriteHandler(conn, NULL); + /* Send the PING, don't check for errors at all, we have the timeout + * that will take care about this. */ + *err = sendCommand(conn, "PING", NULL); + if (*err) return C_ERR; + return C_OK; +} + +int syncWithPrimaryHandleReceivePingReplyState(connection *conn, sds *err) { + *err = receiveSynchronousResponse(conn); + + /* The primary did not reply */ + if (*err == NULL) return C_ERR; + + /* We accept only two replies as valid, a positive +PONG reply + * (we just check for "+") or an authentication error. + * Note that older versions of Redis OSS replied with "operation not + * permitted" instead of using a proper error code, so we test + * both. */ + if (*err[0] != '+' && strncmp(*err, "-NOAUTH", 7) != 0 && strncmp(*err, "-NOPERM", 7) != 0 && + strncmp(*err, "-ERR operation not permitted", 28) != 0) { + serverLog(LL_WARNING, "Error reply to PING from primary: '%s'", *err); + sdsfree(*err); + return C_ERR; + } else { + serverLog(LL_NOTICE, "Primary replied to PING, replication can continue..."); + } + sdsfree(*err); + *err = NULL; + return C_OK; +} + +int syncWithPrimaryHandleSendHandshakeState(connection *conn, sds *err) { + /* AUTH with the primary if required. */ + if (server.primary_auth) { + char *args[3] = {"AUTH", NULL, NULL}; + size_t lens[3] = {4, 0, 0}; + int argc = 1; + if (server.primary_user) { + args[argc] = server.primary_user; + lens[argc] = strlen(server.primary_user); + argc++; + } + args[argc] = server.primary_auth; + lens[argc] = sdslen(server.primary_auth); + argc++; + *err = sendCommandArgv(conn, argc, args, lens); + if (*err) return C_ERR; + } + + /* Set the replica port, so that primary's INFO command can list the + * replica listening port correctly. */ + { + sds portstr = getReplicaPortString(); + *err = sendCommand(conn, "REPLCONF", "listening-port", portstr, NULL); + sdsfree(portstr); + if (*err) return C_ERR; + } + + /* Set the replica ip, so that primary's INFO command can list the + * replica IP address port correctly in case of port forwarding or NAT. + * Skip REPLCONF ip-address if there is no replica-announce-ip option set. */ + if (server.replica_announce_ip) { + *err = sendCommand(conn, "REPLCONF", "ip-address", server.replica_announce_ip, NULL); + if (*err) return C_ERR; + } + + /* Inform the primary of our (replica) capabilities. + * + * EOF: supports EOF-style RDB transfer for diskless replication. + * PSYNC2: supports PSYNC v2, so understands +CONTINUE . + * + * The primary will ignore capabilities it does not understand. */ + *err = sendCommand(conn, "REPLCONF", "capa", "eof", "capa", "psync2", + server.dual_channel_replication ? "capa" : NULL, + server.dual_channel_replication ? "dual-channel" : NULL, NULL); + if (*err) return C_ERR; + + /* Inform the primary of our (replica) version. */ + *err = sendCommand(conn, "REPLCONF", "version", VALKEY_VERSION, NULL); + if (*err) return C_ERR; + return C_OK; +} + +int syncWithPrimaryHandleReceiveAuthReplyState(connection *conn, sds *err) { + if (server.primary_auth) { + *err = receiveSynchronousResponse(conn); + if (*err == NULL) return C_ERR; + if (*err[0] == '-') { + serverLog(LL_WARNING, "Unable to AUTH to PRIMARY: %s", *err); + sdsfree(*err); + return C_ERR; + } + sdsfree(*err); + *err = NULL; + } + return C_OK; +} + +int syncWithPrimaryHandleReceivePortReplyState(connection *conn, sds *err) { + *err = receiveSynchronousResponse(conn); + if (*err == NULL) return C_ERR; + /* Ignore the error if any, not all the Redis OSS versions support + * REPLCONF listening-port. */ + if (*err[0] == '-') { + serverLog(LL_NOTICE, + "(Non critical) Primary does not understand " + "REPLCONF listening-port: %s", + *err); + } + sdsfree(*err); + return C_OK; +} + +int syncWithPrimaryHandleReceiveIPReplyState(connection *conn, sds *err) { + if (server.replica_announce_ip) { + *err = receiveSynchronousResponse(conn); + if (*err == NULL) return C_ERR; + /* Ignore the error if any, not all the Redis OSS versions support + * REPLCONF ip-address. */ + if (*err[0] == '-') { + serverLog(LL_NOTICE, + "(Non critical) Primary does not understand " + "REPLCONF ip-address: %s", + *err); + } + sdsfree(*err); + } + return C_OK; +} + +int syncWithPrimaryHandleReceiveCapaReplyState(connection *conn, sds *err) { + *err = receiveSynchronousResponse(conn); + if (*err == NULL) return C_ERR; + /* Ignore the error if any, not all the Redis OSS versions support + * REPLCONF capa. */ + if (*err[0] == '-') { + serverLog(LL_NOTICE, + "(Non critical) Primary does not understand " + "REPLCONF capa: %s", + *err); + } + sdsfree(*err); + *err = NULL; + return C_OK; +} + +int syncWithPrimaryHandleReceiveVersionReplyState(connection *conn, sds *err) { + *err = receiveSynchronousResponse(conn); + if (*err == NULL) return C_ERR; + /* Ignore the error if any. Valkey >= 8 supports REPLCONF VERSION. */ + if (*err[0] == '-') { + serverLog(LL_NOTICE, + "(Non critical) Primary does not understand " + "REPLCONF VERSION: %s", + *err); + } + sdsfree(*err); + *err = NULL; + return C_OK; +} + +int syncWithPrimaryHandleSendPsyncState(connection *conn, sds *err) { + if (replicaSendPsyncCommand(conn) == PSYNC_WRITE_ERROR) { + *err = sdsnew("Write error sending the PSYNC command."); + abortFailover("Write error to failover target"); + return C_ERR; + } + return C_OK; +} + +void syncWithPrimaryHandleError(connection *conn) { + connClose(conn); + server.repl_transfer_s = NULL; + if (server.repl_rdb_transfer_s) { + connClose(server.repl_rdb_transfer_s); + server.repl_rdb_transfer_s = NULL; + } + if (server.repl_transfer_fd != -1) close(server.repl_transfer_fd); + if (server.repl_transfer_tmpfile) zfree(server.repl_transfer_tmpfile); + server.repl_transfer_tmpfile = NULL; + server.repl_transfer_fd = -1; + server.repl_state = REPL_STATE_CONNECT; +} + +void syncWithPrimaryHandleWriteError(connection *conn, sds *err) { + serverLog(LL_WARNING, "Sending command to primary in replication handshake: %s", *err); + sdsfree(*err); + syncWithPrimaryHandleError(conn); +} + +void syncWithPrimaryHandleNoResponseError(connection *conn) { + serverLog(LL_WARNING, "Primary did not respond to command during SYNC handshake"); + syncWithPrimaryHandleError(conn); +} + + /* * Dual channel for full sync * @@ -3501,217 +3688,106 @@ void syncWithPrimary(connection *conn) { * may find that the socket is in error state. */ if (connGetState(conn) != CONN_STATE_CONNECTED) { serverLog(LL_WARNING, "Error condition on socket for SYNC: %s", connGetLastError(conn)); - goto error; + syncWithPrimaryHandleError(conn); + return; } - + switch (server.repl_state) { /* Send a PING to check the primary is able to reply without errors. */ - if (server.repl_state == REPL_STATE_CONNECTING) { - serverLog(LL_NOTICE, "Non blocking connect for SYNC fired the event."); - /* Delete the writable event so that the readable event remains - * registered and we can wait for the PONG reply. */ - connSetReadHandler(conn, syncWithPrimary); - connSetWriteHandler(conn, NULL); + case REPL_STATE_CONNECTING: + if (syncWithPrimaryHandleConnectingState(conn, &err) == C_ERR) { + syncWithPrimaryHandleWriteError(conn, &err); + return; + } server.repl_state = REPL_STATE_RECEIVE_PING_REPLY; - /* Send the PING, don't check for errors at all, we have the timeout - * that will take care about this. */ - err = sendCommand(conn, "PING", NULL); - if (err) goto write_error; return; - } - /* Receive the PONG command. */ - if (server.repl_state == REPL_STATE_RECEIVE_PING_REPLY) { - err = receiveSynchronousResponse(conn); - - /* The primary did not reply */ - if (err == NULL) goto no_response_error; - - /* We accept only two replies as valid, a positive +PONG reply - * (we just check for "+") or an authentication error. - * Note that older versions of Redis OSS replied with "operation not - * permitted" instead of using a proper error code, so we test - * both. */ - if (err[0] != '+' && strncmp(err, "-NOAUTH", 7) != 0 && strncmp(err, "-NOPERM", 7) != 0 && - strncmp(err, "-ERR operation not permitted", 28) != 0) { - serverLog(LL_WARNING, "Error reply to PING from primary: '%s'", err); - sdsfree(err); - goto error; - } else { - serverLog(LL_NOTICE, "Primary replied to PING, replication can continue..."); + case REPL_STATE_RECEIVE_PING_REPLY: + if (syncWithPrimaryHandleReceivePingReplyState(conn, &err) == C_ERR) { + if (err == NULL) + syncWithPrimaryHandleNoResponseError(conn); + else + syncWithPrimaryHandleError(conn); + return; } - sdsfree(err); - err = NULL; server.repl_state = REPL_STATE_SEND_HANDSHAKE; - } - - if (server.repl_state == REPL_STATE_SEND_HANDSHAKE) { - /* AUTH with the primary if required. */ - if (server.primary_auth) { - char *args[3] = {"AUTH", NULL, NULL}; - size_t lens[3] = {4, 0, 0}; - int argc = 1; - if (server.primary_user) { - args[argc] = server.primary_user; - lens[argc] = strlen(server.primary_user); - argc++; - } - args[argc] = server.primary_auth; - lens[argc] = sdslen(server.primary_auth); - argc++; - err = sendCommandArgv(conn, argc, args, lens); - if (err) goto write_error; - } - - /* Set the replica port, so that primary's INFO command can list the - * replica listening port correctly. */ - { - sds portstr = getReplicaPortString(); - err = sendCommand(conn, "REPLCONF", "listening-port", portstr, NULL); - sdsfree(portstr); - if (err) goto write_error; - } - - /* Set the replica ip, so that primary's INFO command can list the - * replica IP address port correctly in case of port forwarding or NAT. - * Skip REPLCONF ip-address if there is no replica-announce-ip option set. */ - if (server.replica_announce_ip) { - err = sendCommand(conn, "REPLCONF", "ip-address", server.replica_announce_ip, NULL); - if (err) goto write_error; + /* fall through */ + case REPL_STATE_SEND_HANDSHAKE: + if (syncWithPrimaryHandleSendHandshakeState(conn, &err) == C_ERR) { + syncWithPrimaryHandleWriteError(conn, &err); + return; } - - /* Inform the primary of our (replica) capabilities. - * - * EOF: supports EOF-style RDB transfer for diskless replication. - * PSYNC2: supports PSYNC v2, so understands +CONTINUE . - * - * The primary will ignore capabilities it does not understand. */ - err = sendCommand(conn, "REPLCONF", "capa", "eof", "capa", "psync2", - server.dual_channel_replication ? "capa" : NULL, - server.dual_channel_replication ? "dual-channel" : NULL, NULL); - if (err) goto write_error; - - /* Inform the primary of our (replica) version. */ - err = sendCommand(conn, "REPLCONF", "version", VALKEY_VERSION, NULL); - if (err) goto write_error; - server.repl_state = REPL_STATE_RECEIVE_AUTH_REPLY; return; - } - - if (server.repl_state == REPL_STATE_RECEIVE_AUTH_REPLY && !server.primary_auth) - server.repl_state = REPL_STATE_RECEIVE_PORT_REPLY; - /* Receive AUTH reply. */ - if (server.repl_state == REPL_STATE_RECEIVE_AUTH_REPLY) { - err = receiveSynchronousResponse(conn); - if (err == NULL) goto no_response_error; - if (err[0] == '-') { - serverLog(LL_WARNING, "Unable to AUTH to PRIMARY: %s", err); - sdsfree(err); - goto error; + case REPL_STATE_RECEIVE_AUTH_REPLY: + if (syncWithPrimaryHandleReceiveAuthReplyState(conn, &err) == C_ERR) { + if (err == NULL) + syncWithPrimaryHandleNoResponseError(conn); + else + syncWithPrimaryHandleError(conn); + return; } - sdsfree(err); - err = NULL; server.repl_state = REPL_STATE_RECEIVE_PORT_REPLY; - return; - } - + if (server.primary_auth) return; + /* fall through */ /* Receive REPLCONF listening-port reply. */ - if (server.repl_state == REPL_STATE_RECEIVE_PORT_REPLY) { - err = receiveSynchronousResponse(conn); - if (err == NULL) goto no_response_error; - /* Ignore the error if any, not all the Redis OSS versions support - * REPLCONF listening-port. */ - if (err[0] == '-') { - serverLog(LL_NOTICE, - "(Non critical) Primary does not understand " - "REPLCONF listening-port: %s", - err); + case REPL_STATE_RECEIVE_PORT_REPLY: + if (syncWithPrimaryHandleReceivePortReplyState(conn, &err) == C_ERR) { + syncWithPrimaryHandleNoResponseError(conn); + return; } - sdsfree(err); server.repl_state = REPL_STATE_RECEIVE_IP_REPLY; return; - } - - if (server.repl_state == REPL_STATE_RECEIVE_IP_REPLY && !server.replica_announce_ip) - server.repl_state = REPL_STATE_RECEIVE_CAPA_REPLY; - /* Receive REPLCONF ip-address reply. */ - if (server.repl_state == REPL_STATE_RECEIVE_IP_REPLY) { - err = receiveSynchronousResponse(conn); - if (err == NULL) goto no_response_error; - /* Ignore the error if any, not all the Redis OSS versions support - * REPLCONF ip-address. */ - if (err[0] == '-') { - serverLog(LL_NOTICE, - "(Non critical) Primary does not understand " - "REPLCONF ip-address: %s", - err); + case REPL_STATE_RECEIVE_IP_REPLY: + if (syncWithPrimaryHandleReceiveIPReplyState(conn, &err) == C_ERR) { + syncWithPrimaryHandleNoResponseError(conn); + return; } - sdsfree(err); server.repl_state = REPL_STATE_RECEIVE_CAPA_REPLY; - return; - } - + if (server.replica_announce_ip) return; + /* fall through */ /* Receive CAPA reply. */ - if (server.repl_state == REPL_STATE_RECEIVE_CAPA_REPLY) { - err = receiveSynchronousResponse(conn); - if (err == NULL) goto no_response_error; - /* Ignore the error if any, not all the Redis OSS versions support - * REPLCONF capa. */ - if (err[0] == '-') { - serverLog(LL_NOTICE, - "(Non critical) Primary does not understand " - "REPLCONF capa: %s", - err); + case REPL_STATE_RECEIVE_CAPA_REPLY: + if (syncWithPrimaryHandleReceiveCapaReplyState(conn, &err) == C_ERR) { + syncWithPrimaryHandleNoResponseError(conn); + return; } - sdsfree(err); - err = NULL; server.repl_state = REPL_STATE_RECEIVE_VERSION_REPLY; return; - } - /* Receive VERSION reply. */ - if (server.repl_state == REPL_STATE_RECEIVE_VERSION_REPLY) { - err = receiveSynchronousResponse(conn); - if (err == NULL) goto no_response_error; - /* Ignore the error if any. Valkey >= 8 supports REPLCONF VERSION. */ - if (err[0] == '-') { - serverLog(LL_NOTICE, - "(Non critical) Primary does not understand " - "REPLCONF VERSION: %s", - err); + case REPL_STATE_RECEIVE_VERSION_REPLY: + if (syncWithPrimaryHandleReceiveVersionReplyState(conn, &err) == C_ERR) { + syncWithPrimaryHandleNoResponseError(conn); + return; } - sdsfree(err); - err = NULL; server.repl_state = REPL_STATE_SEND_PSYNC; - } - + /* fall through */ /* Try a partial resynchronization. If we don't have a cached primary - * replicaTryPartialResynchronization() will at least try to use PSYNC + * replicaSendPsyncCommand() will at least try to use PSYNC * to start a full resynchronization so that we get the primary replid * and the global offset, to try a partial resync at the next * reconnection attempt. */ - if (server.repl_state == REPL_STATE_SEND_PSYNC) { - if (replicaTryPartialResynchronization(conn, 0) == PSYNC_WRITE_ERROR) { - err = sdsnew("Write error sending the PSYNC command."); - abortFailover("Write error to failover target"); - goto write_error; + case REPL_STATE_SEND_PSYNC: + if (syncWithPrimaryHandleSendPsyncState(conn, &err) == C_ERR) { + syncWithPrimaryHandleWriteError(conn, &err); + return; } server.repl_state = REPL_STATE_RECEIVE_PSYNC_REPLY; return; - } - /* If reached this point, we should be in REPL_STATE_RECEIVE_PSYNC_REPLY. */ - if (server.repl_state != REPL_STATE_RECEIVE_PSYNC_REPLY) { - serverLog(LL_WARNING, - "syncWithPrimary(): state machine error, " - "state should be RECEIVE_PSYNC but is %d", - server.repl_state); - goto error; + default: + if (server.repl_state != REPL_STATE_RECEIVE_PSYNC_REPLY) { + serverLog(LL_WARNING, + "syncWithPrimary(): state machine error, " + "state should be RECEIVE_PSYNC but is %d", + server.repl_state); + syncWithPrimaryHandleError(conn); + return; + } } - psync_result = replicaTryPartialResynchronization(conn, 1); + psync_result = replicaProcessPsyncReply(conn); if (psync_result == PSYNC_WAIT_REPLY) return; /* Try again later... */ /* Check the status of the planned failover. We expect PSYNC_CONTINUE, @@ -3730,7 +3806,10 @@ void syncWithPrimary(connection *conn) { * from scratch later, so go to the error path. This happens when * the server is loading the dataset or is not connected with its * primary and so forth. */ - if (psync_result == PSYNC_TRY_LATER) goto error; + if (psync_result == PSYNC_TRY_LATER) { + syncWithPrimaryHandleError(conn); + return; + } /* Note: if PSYNC does not return WAIT_REPLY, it will take care of * uninstalling the read handler from the file descriptor. */ @@ -3751,7 +3830,8 @@ void syncWithPrimary(connection *conn) { serverLog(LL_NOTICE, "Retrying with SYNC..."); if (connSyncWrite(conn, "SYNC\r\n", 6, server.repl_syncio_timeout * 1000) == -1) { serverLog(LL_WARNING, "I/O error writing to PRIMARY: %s", connGetLastError(conn)); - goto error; + syncWithPrimaryHandleError(conn); + return; } } @@ -3771,7 +3851,8 @@ void syncWithPrimary(connection *conn) { if (dfd == -1) { serverLog(LL_WARNING, "Opening the temp file needed for PRIMARY <-> REPLICA synchronization: %s", strerror(errno)); - goto error; + syncWithPrimaryHandleError(conn); + return; } server.repl_transfer_tmpfile = zstrdup(tmpfile); server.repl_transfer_fd = dfd; @@ -3787,13 +3868,15 @@ void syncWithPrimary(connection *conn) { serverLog(LL_WARNING, "Unable to connect to Primary: %s", connGetLastError(server.repl_transfer_s)); connClose(server.repl_rdb_transfer_s); server.repl_rdb_transfer_s = NULL; - goto error; + syncWithPrimaryHandleError(conn); + return; } if (connSetReadHandler(conn, NULL) == C_ERR) { char conninfo[CONN_INFO_LEN]; dualChannelServerLog(LL_WARNING, "Can't clear main connection handler: %s (%s)", strerror(errno), connGetInfo(conn, conninfo, sizeof(conninfo))); - goto error; + syncWithPrimaryHandleError(conn); + return; } server.repl_rdb_channel_state = REPL_DUAL_CHANNEL_SEND_HANDSHAKE; return; @@ -3803,7 +3886,8 @@ void syncWithPrimary(connection *conn) { char conninfo[CONN_INFO_LEN]; serverLog(LL_WARNING, "Can't create readable event for SYNC: %s (%s)", strerror(errno), connGetInfo(conn, conninfo, sizeof(conninfo))); - goto error; + syncWithPrimaryHandleError(conn); + return; } server.repl_state = REPL_STATE_TRANSFER; @@ -3811,30 +3895,6 @@ void syncWithPrimary(connection *conn) { server.repl_transfer_read = 0; server.repl_transfer_last_fsync_off = 0; server.repl_transfer_lastio = server.unixtime; - return; - -no_response_error: /* Handle receiveSynchronousResponse() error when primary has no reply */ - serverLog(LL_WARNING, "Primary did not respond to command during SYNC handshake"); - /* Fall through to regular error handling */ - -error: - connClose(conn); - server.repl_transfer_s = NULL; - if (server.repl_rdb_transfer_s) { - connClose(server.repl_rdb_transfer_s); - server.repl_rdb_transfer_s = NULL; - } - if (server.repl_transfer_fd != -1) close(server.repl_transfer_fd); - if (server.repl_transfer_tmpfile) zfree(server.repl_transfer_tmpfile); - server.repl_transfer_tmpfile = NULL; - server.repl_transfer_fd = -1; - server.repl_state = REPL_STATE_CONNECT; - return; - -write_error: /* Handle sendCommand() errors. */ - serverLog(LL_WARNING, "Sending command to primary in replication handshake: %s", err); - sdsfree(err); - goto error; } int connectWithPrimary(void) {