From 6df1d3ccd3ebaa8b0c26d439994d34dc00537503 Mon Sep 17 00:00:00 2001 From: Nitai Caro Date: Mon, 23 Dec 2024 11:29:43 +0000 Subject: [PATCH] Run formatter --- src/replication.c | 258 +++++++++++++++++++++++----------------------- 1 file changed, 129 insertions(+), 129 deletions(-) diff --git a/src/replication.c b/src/replication.c index b52e098bec..c3970c8945 100644 --- a/src/replication.c +++ b/src/replication.c @@ -3058,10 +3058,10 @@ int replicaSendPsyncCommand(connection *conn) { sds reply; /* Initially set primary_initial_offset to -1 to mark the current - * primary replid and offset as not valid. Later if we'll be able to do - * a FULL resync using the PSYNC command we'll set the offset at the - * right value, so that this information will be propagated to the - * client structure representing the primary into server.primary. */ + * primary replid and offset as not valid. Later if we'll be able to do + * a FULL resync using the PSYNC command we'll set the offset at the + * right value, so that this information will be propagated to the + * client structure representing the primary into server.primary. */ server.primary_initial_offset = -1; if (server.repl_rdb_channel_state != REPL_DUAL_CHANNEL_STATE_NONE) { @@ -3082,8 +3082,8 @@ int replicaSendPsyncCommand(connection *conn) { } /* Issue the PSYNC command, if this is a primary with a failover in - * progress then send the failover argument to the replica to cause it - * to become a primary */ + * progress then send the failover argument to the replica to cause it + * to become a primary */ if (server.failover_state == FAILOVER_IN_PROGRESS) { reply = sendCommand(conn, "PSYNC", psync_replid, psync_offset, "FAILOVER", NULL); } else { @@ -3344,11 +3344,11 @@ void dualChannelSetupMainConnForPsync(connection *conn) { int syncWithPrimaryHandleConnectingState(connection *conn, sds *err) { serverLog(LL_NOTICE, "Non blocking connect for SYNC fired the event."); /* Delete the writable event so that the readable event remains - * registered and we can wait for the PONG reply. */ + * registered and we can wait for the PONG reply. */ connSetReadHandler(conn, syncWithPrimary); connSetWriteHandler(conn, NULL); /* Send the PING, don't check for errors at all, we have the timeout - * that will take care about this. */ + * that will take care about this. */ *err = sendCommand(conn, "PING", NULL); if (*err) return C_ERR; return C_OK; @@ -3361,10 +3361,10 @@ int syncWithPrimaryHandleReceivePingReplyState(connection *conn, sds *err) { if (*err == NULL) return C_ERR; /* We accept only two replies as valid, a positive +PONG reply - * (we just check for "+") or an authentication error. - * Note that older versions of Redis OSS replied with "operation not - * permitted" instead of using a proper error code, so we test - * both. */ + * (we just check for "+") or an authentication error. + * Note that older versions of Redis OSS replied with "operation not + * permitted" instead of using a proper error code, so we test + * both. */ if (*err[0] != '+' && strncmp(*err, "-NOAUTH", 7) != 0 && strncmp(*err, "-NOPERM", 7) != 0 && strncmp(*err, "-ERR operation not permitted", 28) != 0) { serverLog(LL_WARNING, "Error reply to PING from primary: '%s'", *err); @@ -3394,7 +3394,7 @@ int syncWithPrimaryHandleSendHandshakeState(connection *conn, sds *err) { } /* Set the replica port, so that primary's INFO command can list the - * replica listening port correctly. */ + * replica listening port correctly. */ { sds portstr = getReplicaPortString(); *err = sendCommand(conn, "REPLCONF", "listening-port", portstr, NULL); @@ -3403,22 +3403,22 @@ int syncWithPrimaryHandleSendHandshakeState(connection *conn, sds *err) { } /* Set the replica ip, so that primary's INFO command can list the - * replica IP address port correctly in case of port forwarding or NAT. - * Skip REPLCONF ip-address if there is no replica-announce-ip option set. */ + * replica IP address port correctly in case of port forwarding or NAT. + * Skip REPLCONF ip-address if there is no replica-announce-ip option set. */ if (server.replica_announce_ip) { *err = sendCommand(conn, "REPLCONF", "ip-address", server.replica_announce_ip, NULL); if (*err) return C_ERR; } /* Inform the primary of our (replica) capabilities. - * - * EOF: supports EOF-style RDB transfer for diskless replication. - * PSYNC2: supports PSYNC v2, so understands +CONTINUE . - * - * The primary will ignore capabilities it does not understand. */ + * + * EOF: supports EOF-style RDB transfer for diskless replication. + * PSYNC2: supports PSYNC v2, so understands +CONTINUE . + * + * The primary will ignore capabilities it does not understand. */ *err = sendCommand(conn, "REPLCONF", "capa", "eof", "capa", "psync2", - server.dual_channel_replication ? "capa" : NULL, - server.dual_channel_replication ? "dual-channel" : NULL, NULL); + server.dual_channel_replication ? "capa" : NULL, + server.dual_channel_replication ? "dual-channel" : NULL, NULL); if (*err) return C_ERR; /* Inform the primary of our (replica) version. */ @@ -3443,12 +3443,12 @@ int syncWithPrimaryHandleReceivePortReplyState(connection *conn, sds *err) { *err = receiveSynchronousResponse(conn); if (*err == NULL) return C_ERR; /* Ignore the error if any, not all the Redis OSS versions support - * REPLCONF listening-port. */ + * REPLCONF listening-port. */ if (*err[0] == '-') { serverLog(LL_NOTICE, - "(Non critical) Primary does not understand " - "REPLCONF listening-port: %s", - *err); + "(Non critical) Primary does not understand " + "REPLCONF listening-port: %s", + *err); } return C_OK; } @@ -3458,12 +3458,12 @@ int syncWithPrimaryHandleReceiveIPReplyState(connection *conn, sds *err) { *err = receiveSynchronousResponse(conn); if (*err == NULL) return C_ERR; /* Ignore the error if any, not all the Redis OSS versions support - * REPLCONF ip-address. */ + * REPLCONF ip-address. */ if (*err[0] == '-') { serverLog(LL_NOTICE, - "(Non critical) Primary does not understand " - "REPLCONF ip-address: %s", - *err); + "(Non critical) Primary does not understand " + "REPLCONF ip-address: %s", + *err); } } return C_OK; @@ -3473,12 +3473,12 @@ int syncWithPrimaryHandleReceiveCapaReplyState(connection *conn, sds *err) { *err = receiveSynchronousResponse(conn); if (*err == NULL) return C_ERR; /* Ignore the error if any, not all the Redis OSS versions support - * REPLCONF capa. */ + * REPLCONF capa. */ if (*err[0] == '-') { serverLog(LL_NOTICE, - "(Non critical) Primary does not understand " - "REPLCONF capa: %s", - *err); + "(Non critical) Primary does not understand " + "REPLCONF capa: %s", + *err); } return C_OK; } @@ -3489,9 +3489,9 @@ int syncWithPrimaryHandleReceiveVersionReplyState(connection *conn, sds *err) { /* Ignore the error if any. Valkey >= 8 supports REPLCONF VERSION. */ if (*err[0] == '-') { serverLog(LL_NOTICE, - "(Non critical) Primary does not understand " - "REPLCONF VERSION: %s", - *err); + "(Non critical) Primary does not understand " + "REPLCONF VERSION: %s", + *err); } return C_OK; } @@ -3601,90 +3601,90 @@ void syncWithPrimary(connection *conn) { goto error; } switch (server.repl_state) { - /* Send a PING to check the primary is able to reply without errors. */ - case REPL_STATE_CONNECTING: - ret = syncWithPrimaryHandleConnectingState(conn, &err); - if (ret == C_ERR) goto write_error; - server.repl_state = REPL_STATE_RECEIVE_PING_REPLY; - goto ok; - /* Receive the PONG command. */ - case REPL_STATE_RECEIVE_PING_REPLY: - ret = syncWithPrimaryHandleReceivePingReplyState(conn, &err); - if (ret == C_ERR) { - if (err == NULL) goto no_response_error; - goto error; - } - if (err) sdsfree(err); - err = NULL; - server.repl_state = REPL_STATE_SEND_HANDSHAKE; - /* fall through */ - case REPL_STATE_SEND_HANDSHAKE: - ret = syncWithPrimaryHandleSendHandshakeState(conn, &err); - if (ret == C_ERR) goto write_error; - server.repl_state = REPL_STATE_RECEIVE_AUTH_REPLY; - goto ok; - /* Receive AUTH reply. */ - case REPL_STATE_RECEIVE_AUTH_REPLY: - ret = syncWithPrimaryHandleReceiveAuthReplyState(conn, &err); - if (ret == C_ERR) { - if (err == NULL) goto no_response_error; - goto error; - } - server.repl_state = REPL_STATE_RECEIVE_PORT_REPLY; - if (server.primary_auth) goto ok; - /* fall through */ - /* Receive REPLCONF listening-port reply. */ - case REPL_STATE_RECEIVE_PORT_REPLY: - ret = syncWithPrimaryHandleReceivePortReplyState(conn, &err); - if (ret == C_ERR) goto no_response_error; - server.repl_state = REPL_STATE_RECEIVE_IP_REPLY; - goto ok; - /* Receive REPLCONF ip-address reply. */ - case REPL_STATE_RECEIVE_IP_REPLY: - ret = syncWithPrimaryHandleReceiveIPReplyState(conn, &err); - if (ret == C_ERR) goto no_response_error; - server.repl_state = REPL_STATE_RECEIVE_CAPA_REPLY; - if (server.replica_announce_ip) goto ok; - /* fall through */ - /* Receive CAPA reply. */ - case REPL_STATE_RECEIVE_CAPA_REPLY: - ret = syncWithPrimaryHandleReceiveCapaReplyState(conn, &err); - if (ret == C_ERR) goto no_response_error; - server.repl_state = REPL_STATE_RECEIVE_VERSION_REPLY; - goto ok; - /* Receive VERSION reply. */ - case REPL_STATE_RECEIVE_VERSION_REPLY: - ret = syncWithPrimaryHandleReceiveVersionReplyState(conn, &err); - if (ret == C_ERR) goto no_response_error; - server.repl_state = REPL_STATE_SEND_PSYNC; - /* fall through */ - /* Try a partial resynchronization. If we don't have a cached primary - * replicaSendPsyncCommand() will at least try to use PSYNC - * to start a full resynchronization so that we get the primary replid - * and the global offset, to try a partial resync at the next - * reconnection attempt. */ - case REPL_STATE_SEND_PSYNC: - ret = syncWithPrimaryHandleSendPsyncState(conn, &err); - if (ret == C_ERR) goto write_error; - server.repl_state = REPL_STATE_RECEIVE_PSYNC_REPLY; - goto ok; - /* If reached this point, we should be in REPL_STATE_RECEIVE_PSYNC_REPLY. */ - default: - if (server.repl_state != REPL_STATE_RECEIVE_PSYNC_REPLY) { - serverLog(LL_WARNING, - "syncWithPrimary(): state machine error, " - "state should be RECEIVE_PSYNC but is %d", - server.repl_state); - goto error; - } + /* Send a PING to check the primary is able to reply without errors. */ + case REPL_STATE_CONNECTING: + ret = syncWithPrimaryHandleConnectingState(conn, &err); + if (ret == C_ERR) goto write_error; + server.repl_state = REPL_STATE_RECEIVE_PING_REPLY; + goto ok; + /* Receive the PONG command. */ + case REPL_STATE_RECEIVE_PING_REPLY: + ret = syncWithPrimaryHandleReceivePingReplyState(conn, &err); + if (ret == C_ERR) { + if (err == NULL) goto no_response_error; + goto error; + } + if (err) sdsfree(err); + err = NULL; + server.repl_state = REPL_STATE_SEND_HANDSHAKE; + /* fall through */ + case REPL_STATE_SEND_HANDSHAKE: + ret = syncWithPrimaryHandleSendHandshakeState(conn, &err); + if (ret == C_ERR) goto write_error; + server.repl_state = REPL_STATE_RECEIVE_AUTH_REPLY; + goto ok; + /* Receive AUTH reply. */ + case REPL_STATE_RECEIVE_AUTH_REPLY: + ret = syncWithPrimaryHandleReceiveAuthReplyState(conn, &err); + if (ret == C_ERR) { + if (err == NULL) goto no_response_error; + goto error; + } + server.repl_state = REPL_STATE_RECEIVE_PORT_REPLY; + if (server.primary_auth) goto ok; + /* fall through */ + /* Receive REPLCONF listening-port reply. */ + case REPL_STATE_RECEIVE_PORT_REPLY: + ret = syncWithPrimaryHandleReceivePortReplyState(conn, &err); + if (ret == C_ERR) goto no_response_error; + server.repl_state = REPL_STATE_RECEIVE_IP_REPLY; + goto ok; + /* Receive REPLCONF ip-address reply. */ + case REPL_STATE_RECEIVE_IP_REPLY: + ret = syncWithPrimaryHandleReceiveIPReplyState(conn, &err); + if (ret == C_ERR) goto no_response_error; + server.repl_state = REPL_STATE_RECEIVE_CAPA_REPLY; + if (server.replica_announce_ip) goto ok; + /* fall through */ + /* Receive CAPA reply. */ + case REPL_STATE_RECEIVE_CAPA_REPLY: + ret = syncWithPrimaryHandleReceiveCapaReplyState(conn, &err); + if (ret == C_ERR) goto no_response_error; + server.repl_state = REPL_STATE_RECEIVE_VERSION_REPLY; + goto ok; + /* Receive VERSION reply. */ + case REPL_STATE_RECEIVE_VERSION_REPLY: + ret = syncWithPrimaryHandleReceiveVersionReplyState(conn, &err); + if (ret == C_ERR) goto no_response_error; + server.repl_state = REPL_STATE_SEND_PSYNC; + /* fall through */ + /* Try a partial resynchronization. If we don't have a cached primary + * replicaSendPsyncCommand() will at least try to use PSYNC + * to start a full resynchronization so that we get the primary replid + * and the global offset, to try a partial resync at the next + * reconnection attempt. */ + case REPL_STATE_SEND_PSYNC: + ret = syncWithPrimaryHandleSendPsyncState(conn, &err); + if (ret == C_ERR) goto write_error; + server.repl_state = REPL_STATE_RECEIVE_PSYNC_REPLY; + goto ok; + /* If reached this point, we should be in REPL_STATE_RECEIVE_PSYNC_REPLY. */ + default: + if (server.repl_state != REPL_STATE_RECEIVE_PSYNC_REPLY) { + serverLog(LL_WARNING, + "syncWithPrimary(): state machine error, " + "state should be RECEIVE_PSYNC but is %d", + server.repl_state); + goto error; + } } - + psync_result = replicaProcessPsyncReply(conn); if (psync_result == PSYNC_WAIT_REPLY) return; /* Try again later... */ /* Check the status of the planned failover. We expect PSYNC_CONTINUE, - * but there is nothing technically wrong with a full resync which - * could happen in edge cases. */ + * but there is nothing technically wrong with a full resync which + * could happen in edge cases. */ if (server.failover_state == FAILOVER_IN_PROGRESS) { if (psync_result == PSYNC_CONTINUE || psync_result == PSYNC_FULLRESYNC) { clearFailoverState(); @@ -3695,26 +3695,26 @@ void syncWithPrimary(connection *conn) { } /* If the primary is in an transient error, we should try to PSYNC - * from scratch later, so go to the error path. This happens when - * the server is loading the dataset or is not connected with its - * primary and so forth. */ + * from scratch later, so go to the error path. This happens when + * the server is loading the dataset or is not connected with its + * primary and so forth. */ if (psync_result == PSYNC_TRY_LATER) goto error; - + /* Note: if PSYNC does not return WAIT_REPLY, it will take care of - * uninstalling the read handler from the file descriptor. */ + * uninstalling the read handler from the file descriptor. */ if (psync_result == PSYNC_CONTINUE) { serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Primary accepted a Partial Resynchronization."); if (server.supervised_mode == SUPERVISED_SYSTEMD) { serverCommunicateSystemd("STATUS=PRIMARY <-> REPLICA sync: Partial Resynchronization accepted. Ready to " - "accept connections in read-write mode.\n"); + "accept connections in read-write mode.\n"); } goto ok; } /* Fall back to SYNC if needed. Otherwise psync_result == PSYNC_FULLRESYNC - * and the server.primary_replid and primary_initial_offset are - * already populated. */ + * and the server.primary_replid and primary_initial_offset are + * already populated. */ if (psync_result == PSYNC_NOT_SUPPORTED) { serverLog(LL_NOTICE, "Retrying with SYNC..."); if (connSyncWrite(conn, "SYNC\r\n", 6, server.repl_syncio_timeout * 1000) == -1) { @@ -3731,14 +3731,14 @@ void syncWithPrimary(connection *conn) { dfd = open(tmpfile, O_CREAT | O_WRONLY | O_EXCL, 0644); if (dfd != -1) break; /* We save the errno of open to prevent some systems from modifying it after - * the sleep call. For example, sleep in Mac will change errno to ETIMEDOUT. */ + * the sleep call. For example, sleep in Mac will change errno to ETIMEDOUT. */ int saved_errno = errno; sleep(1); errno = saved_errno; } if (dfd == -1) { serverLog(LL_WARNING, "Opening the temp file needed for PRIMARY <-> REPLICA synchronization: %s", - strerror(errno)); + strerror(errno)); goto error; } server.repl_transfer_tmpfile = zstrdup(tmpfile); @@ -3746,7 +3746,7 @@ void syncWithPrimary(connection *conn) { } /* Using dual-channel-replication, the primary responded +DUALCHANNELSYNC. We need to - * initialize the RDB channel. */ + * initialize the RDB channel. */ if (psync_result == PSYNC_FULLRESYNC_DUAL_CHANNEL) { /* Create RDB connection */ server.repl_rdb_transfer_s = connCreate(connTypeOfReplication()); @@ -3760,7 +3760,7 @@ void syncWithPrimary(connection *conn) { if (connSetReadHandler(conn, NULL) == C_ERR) { char conninfo[CONN_INFO_LEN]; dualChannelServerLog(LL_WARNING, "Can't clear main connection handler: %s (%s)", strerror(errno), - connGetInfo(conn, conninfo, sizeof(conninfo))); + connGetInfo(conn, conninfo, sizeof(conninfo))); goto error; } server.repl_rdb_channel_state = REPL_DUAL_CHANNEL_SEND_HANDSHAKE; @@ -3770,7 +3770,7 @@ void syncWithPrimary(connection *conn) { if (connSetReadHandler(conn, readSyncBulkPayload) == C_ERR) { char conninfo[CONN_INFO_LEN]; serverLog(LL_WARNING, "Can't create readable event for SYNC: %s (%s)", strerror(errno), - connGetInfo(conn, conninfo, sizeof(conninfo))); + connGetInfo(conn, conninfo, sizeof(conninfo))); goto error; }