Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CRC removal during diskless full sync with TLS enabled #1478

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/config.c
Original file line number Diff line number Diff line change
Expand Up @@ -3156,6 +3156,7 @@ static int applyClientMaxMemoryUsage(const char **err) {
standardConfig static_configs[] = {
/* Bool configs */
createBoolConfig("rdbchecksum", NULL, IMMUTABLE_CONFIG, server.rdb_checksum, 1, NULL, NULL),
createBoolConfig("disable-sync-crc", NULL, MODIFIABLE_CONFIG, server.disable_sync_crc, 0, NULL, NULL),
createBoolConfig("daemonize", NULL, IMMUTABLE_CONFIG, server.daemonize, 0, NULL, NULL),
createBoolConfig("always-show-logo", NULL, IMMUTABLE_CONFIG, server.always_show_logo, 0, NULL, NULL),
createBoolConfig("protected-mode", NULL, MODIFIABLE_CONFIG, server.protected_mode, 1, NULL, NULL),
Expand Down
22 changes: 20 additions & 2 deletions src/rdb.c
Original file line number Diff line number Diff line change
Expand Up @@ -3010,6 +3010,7 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin
int error;
long long empty_keys_skipped = 0;

if (rdb->flags & RIO_FLAG_DISABLE_CRC) server.stat_total_crc_disabled_syncs_stated++;
rdb->update_cksum = rdbLoadProgressCallback;
rdb->max_processing_chunk = server.loading_process_events_interval_bytes;
if (rioRead(rdb, buf, 9) == 0) goto eoferr;
Expand Down Expand Up @@ -3354,7 +3355,7 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin
if (rioRead(rdb, &cksum, 8) == 0) goto eoferr;
if (server.rdb_checksum && !server.skip_checksum_validation) {
memrev64ifbe(&cksum);
if (cksum == 0) {
if (cksum == 0 || (rdb->flags & RIO_FLAG_DISABLE_CRC) != 0) {
serverLog(LL_NOTICE, "RDB file was saved with checksum disabled: no check performed.");
} else if (cksum != expected) {
serverLog(LL_WARNING,
Expand Down Expand Up @@ -3545,8 +3546,13 @@ int rdbSaveToReplicasSockets(int req, rdbSaveInfo *rsi) {
safe_to_exit_pipe = pipefds[0]; /* read end */
server.rdb_child_exit_pipe = pipefds[1]; /* write end */
}
/*
* For replicas with repl_state == REPLICA_STATE_WAIT_BGSAVE_END and replica_req == req:
* Check replica capabilities, if every replica supports disabled CRC, run with CRC disabled, otherwise, use CRC.
*/
int disable_sync_crc_capa = server.disable_sync_crc;
/* Collect the connections of the replicas we want to transfer
* the RDB to, which are i WAIT_BGSAVE_START state. */
* the RDB to, which are in WAIT_BGSAVE_START state. */
int connsnum = 0;
connection **conns = zmalloc(sizeof(connection *) * listLength(server.replicas));
server.rdb_pipe_conns = NULL;
Expand Down Expand Up @@ -3578,6 +3584,11 @@ int rdbSaveToReplicasSockets(int req, rdbSaveInfo *rsi) {
}
replicationSetupReplicaForFullResync(replica, getPsyncInitialOffset());
}

// do not disable CRC on the primary if TLS is disabled or if the replica doesn't support it
if (!connIsTLS(replica->conn) || (replica->replica_capa & REPLICA_CAPA_DISABLE_SYNC_CRC) == 0)
disable_sync_crc_capa = 0;

}

/* Create the child process. */
Expand All @@ -3601,6 +3612,12 @@ int rdbSaveToReplicasSockets(int req, rdbSaveInfo *rsi) {
}
serverSetCpuAffinity(server.bgsave_cpulist);

if (disable_sync_crc_capa == 1) {
serverLog(LL_NOTICE, "CRC checksum is disabled for this RDB transfer");
// mark rdb object to skip CRC checksum calculations
rdb.flags |= RIO_FLAG_DISABLE_CRC;
}

retval = rdbSaveRioWithEOFMark(req, &rdb, NULL, rsi);
if (retval == C_OK && rioFlush(&rdb) == 0) retval = C_ERR;

Expand Down Expand Up @@ -3666,6 +3683,7 @@ int rdbSaveToReplicasSockets(int req, rdbSaveInfo *rsi) {
}
}
if (!dual_channel) close(safe_to_exit_pipe);
if (disable_sync_crc_capa) server.stat_total_crc_disabled_syncs_stated++;
return (childpid == -1) ? C_ERR : C_OK;
}
return C_OK; /* Unreached. */
Expand Down
28 changes: 23 additions & 5 deletions src/replication.c
Original file line number Diff line number Diff line change
Expand Up @@ -1244,11 +1244,12 @@ void syncCommand(client *c) {
* the primary can accurately lists replicas and their listening ports in the
* INFO output.
*
* - capa <eof|psync2|dual-channel>
* - capa <eof|psync2|dual-channel|disable_sync_crc>
* What are the capabilities of this instance.
* eof: supports EOF-style RDB transfer for diskless replication.
* psync2: supports PSYNC v2, so understands +CONTINUE <new repl ID>.
* dual-channel: supports full sync using rdb channel.
* disable_sync_crc: supports disabling CRC during TLS enabled diskless sync.
*
* - ack <offset> [fack <aofofs>]
* Replica informs the primary the amount of replication stream that it
Expand Down Expand Up @@ -1314,7 +1315,8 @@ void replconfCommand(client *c) {
/* If dual-channel is disable on this primary, treat this command as unrecognized
* replconf option. */
c->replica_capa |= REPLICA_CAPA_DUAL_CHANNEL;
}
} else if (!strcasecmp(c->argv[j + 1]->ptr, REPLICA_CAPA_DISABLE_SYNC_CRC_STR))
c->replica_capa |= REPLICA_CAPA_DISABLE_SYNC_CRC;
} else if (!strcasecmp(c->argv[j]->ptr, "ack")) {
/* REPLCONF ACK is used by replica to inform the primary the amount
* of replication stream that it processed so far. It is an
Expand Down Expand Up @@ -2084,6 +2086,12 @@ void readSyncBulkPayload(connection *conn) {
serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: receiving %lld bytes from primary %s",
(long long)server.repl_transfer_size, use_diskless_load ? "to parser" : "to disk");
}

// Set a flag to determine later whether or not the replica will skip CRC calculations for this sync -
// Disable CRC on replica if: (1) TLS is enabled; (2) replica disable_sync_crc is enabled; (3) diskless sync enabled on both replica and primary.
// Otherwise, CRC should be enabled/disabled as per server.rdb_checksum
if (connIsTLS(conn) && server.disable_sync_crc && use_diskless_load && usemark)
server.repl_meet_disable_crc_cond = 1;
return;
}

Expand Down Expand Up @@ -2251,6 +2259,7 @@ void readSyncBulkPayload(connection *conn) {

serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Loading DB in memory");
startLoading(server.repl_transfer_size, RDBFLAGS_REPLICATION, asyncLoading);
if (server.repl_meet_disable_crc_cond == 1) rdb.flags |= RIO_FLAG_DISABLE_CRC;

int loadingFailed = 0;
rdbLoadingCtx loadingCtx = {.dbarray = dbarray, .functions_lib_ctx = functions_lib_ctx};
Expand Down Expand Up @@ -2493,6 +2502,7 @@ char *sendCommand(connection *conn, ...) {
while (1) {
arg = va_arg(ap, char *);
if (arg == NULL) break;
if (strcmp(arg, "") == 0) continue;
cmdargs = sdscatprintf(cmdargs, "$%zu\r\n%s\r\n", strlen(arg), arg);
argslen++;
}
Expand Down Expand Up @@ -3513,11 +3523,19 @@ void syncWithPrimary(connection *conn) {
*
* EOF: supports EOF-style RDB transfer for diskless replication.
* PSYNC2: supports PSYNC v2, so understands +CONTINUE <new repl ID>.
*
* DISABLE-SYNC-CRC: supports disabling CRC calculations during full sync.
* Inform the primary of this capa only during diskless sync with TLS enabled.
* In disk-based sync, or non-TLS, there is more concern for data corruption
* so we keep this extra layer of detection.
*
* The primary will ignore capabilities it does not understand. */
server.repl_meet_disable_crc_cond = 0; // reset this value before sync starts
int send_disable_crc_capa = (connIsTLS(conn) && server.disable_sync_crc && useDisklessLoad());
err = sendCommand(conn, "REPLCONF", "capa", "eof", "capa", "psync2",
server.dual_channel_replication ? "capa" : NULL,
server.dual_channel_replication ? "dual-channel" : NULL, NULL);
send_disable_crc_capa ? "capa" : "",
send_disable_crc_capa ? REPLICA_CAPA_DISABLE_SYNC_CRC_STR : "",
server.dual_channel_replication ? "capa" : "",
server.dual_channel_replication ? "dual-channel" : "", NULL);
if (err) goto write_error;

/* Inform the primary of our (replica) version. */
Expand Down
1 change: 1 addition & 0 deletions src/rio.c
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,7 @@ void rioFreeFd(rio *r) {
/* Checksum hook that can be installed on both memory and file streams.
 * Folds the `len` bytes at `buf` into the running CRC64 kept in r->cksum.
 * When the stream carries RIO_FLAG_DISABLE_CRC the checksum is left
 * untouched, so the CRC64 computation is skipped entirely. */
void rioGenericUpdateChecksum(rio *r, const void *buf, size_t len) {
    if (!(r->flags & RIO_FLAG_DISABLE_CRC)) {
        r->cksum = crc64(r->cksum, buf, len);
    }
}

Expand Down
1 change: 1 addition & 0 deletions src/rio.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@

#define RIO_FLAG_READ_ERROR (1 << 0)
#define RIO_FLAG_WRITE_ERROR (1 << 1)
#define RIO_FLAG_DISABLE_CRC (1 << 2)

#define RIO_TYPE_FILE (1 << 0)
#define RIO_TYPE_BUFFER (1 << 1)
Expand Down
3 changes: 3 additions & 0 deletions src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -2218,6 +2218,7 @@ void initServerConfig(void) {
server.fsynced_reploff_pending = 0;
server.rdb_client_id = -1;
server.loading_process_events_interval_ms = LOADING_PROCESS_EVENTS_INTERVAL_DEFAULT;
server.repl_meet_disable_crc_cond = 0;

/* Replication partial resync backlog */
server.repl_backlog = NULL;
Expand Down Expand Up @@ -2638,6 +2639,7 @@ void resetServerStats(void) {
server.stat_fork_rate = 0;
server.stat_total_forks = 0;
server.stat_rejected_conn = 0;
server.stat_total_crc_disabled_syncs_stated = 0;
server.stat_sync_full = 0;
server.stat_sync_partial_ok = 0;
server.stat_sync_partial_err = 0;
Expand Down Expand Up @@ -5878,6 +5880,7 @@ sds genValkeyInfoString(dict *section_dict, int all_sections, int everything) {
"instantaneous_input_repl_kbps:%.2f\r\n", (float)getInstantaneousMetric(STATS_METRIC_NET_INPUT_REPLICATION) / 1024,
"instantaneous_output_repl_kbps:%.2f\r\n", (float)getInstantaneousMetric(STATS_METRIC_NET_OUTPUT_REPLICATION) / 1024,
"rejected_connections:%lld\r\n", server.stat_rejected_conn,
"total_crc_disabled_syncs_stated:%ld\r\n", server.stat_total_crc_disabled_syncs_stated,
"sync_full:%lld\r\n", server.stat_sync_full,
"sync_partial_ok:%lld\r\n", server.stat_sync_partial_ok,
"sync_partial_err:%lld\r\n", server.stat_sync_partial_err,
Expand Down
16 changes: 12 additions & 4 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -433,11 +433,15 @@ typedef enum {
* a replica that only wants RDB without replication buffer */
#define REPLICA_STATE_BG_RDB_LOAD 11 /* Main channel of a replica which uses dual channel replication. */

/* Replica capabilities. */
/* Replica capability flags */
#define REPLICA_CAPA_NONE 0
#define REPLICA_CAPA_EOF (1 << 0) /* Can parse the RDB EOF streaming format. */
#define REPLICA_CAPA_PSYNC2 (1 << 1) /* Supports PSYNC2 protocol. */
#define REPLICA_CAPA_DUAL_CHANNEL (1 << 2) /* Supports dual channel replication sync */
#define REPLICA_CAPA_EOF (1 << 0) /* Can parse the RDB EOF streaming format. */
#define REPLICA_CAPA_PSYNC2 (1 << 1) /* Supports PSYNC2 protocol. */
#define REPLICA_CAPA_DUAL_CHANNEL (1 << 2) /* Supports dual channel replication sync */
#define REPLICA_CAPA_DISABLE_SYNC_CRC (1 << 3) /* Disable CRC checks for sync requests. */

/* Replica capability strings */
#define REPLICA_CAPA_DISABLE_SYNC_CRC_STR "disable-sync-crc" /* Disable CRC calculations during full sync */

/* Replica requirements */
#define REPLICA_REQ_NONE 0
Expand Down Expand Up @@ -1838,6 +1842,7 @@ struct valkeyServer {
double stat_fork_rate; /* Fork rate in GB/sec. */
long long stat_total_forks; /* Total count of fork. */
long long stat_rejected_conn; /* Clients rejected because of maxclients */
size_t stat_total_crc_disabled_syncs_stated; /* Total number of full syncs stated with CRC checksum disabled */ // AMZN
long long stat_sync_full; /* Number of full resyncs with replicas. */
long long stat_sync_partial_ok; /* Number of accepted PSYNC requests. */
long long stat_sync_partial_err; /* Number of unaccepted PSYNC requests. */
Expand Down Expand Up @@ -1986,6 +1991,7 @@ struct valkeyServer {
char *rdb_filename; /* Name of RDB file */
int rdb_compression; /* Use compression in RDB? */
int rdb_checksum; /* Use RDB checksum? */
int disable_sync_crc; /* Use RDB checksum during sync? Applicable only for TLS enabled diskless sync */
int rdb_del_sync_files; /* Remove RDB files used only for SYNC if
the instance does not use persistence. */
time_t lastsave; /* Unix time of last successful save */
Expand Down Expand Up @@ -2112,6 +2118,8 @@ struct valkeyServer {
* when it receives an error on the replication stream */
int repl_ignore_disk_write_error; /* Configures whether replicas panic when unable to
* persist writes to AOF. */
int repl_meet_disable_crc_cond; /* Set to true only when replica meets all conditions for disabling CRC */

/* The following two fields is where we store primary PSYNC replid/offset
* while the PSYNC is in progress. At the end we'll copy the fields into
* the server->primary client structure. */
Expand Down
45 changes: 45 additions & 0 deletions tests/integration/disable-sync-crc.tcl
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
# Integration test for the disable-sync-crc option.
#
# For every combination of primary/replica disable-sync-crc, primary
# repl-diskless-sync and replica repl-diskless-load, attach a fresh replica to
# a shared primary and check, through the INFO stats counter, whether each
# side ran the full sync with CRC disabled.
#
# Expected behaviour:
#   - replica skips CRC only when: TLS is on, its own disable-sync-crc is on,
#     it uses diskless load and the primary uses diskless sync.
#   - primary skips CRC only when all of the above hold AND its own
#     disable-sync-crc is on.
start_server {tags {"repl tls"} overrides {save {}}} {
    set master [srv 0 client]
    set master_host [srv 0 host]
    set master_port [srv 0 port]
    foreach master_disable_crc {yes no} {
        foreach replica_disable_crc {yes no} {
            foreach mds {no yes} {
                foreach sdl {disabled on-empty-db swapdb flush-before-load} {
                    test "CRC disabled sync - master:$master_disable_crc, replica:$replica_disable_crc, tls:$::tls, repl_diskless_sync:$mds, repl_diskless_load:$sdl" {
                        $master config set disable-sync-crc $master_disable_crc
                        $master config set repl-diskless-sync $mds
                        # The primary is shared by every iteration, so its
                        # counter must be cleared or the ":1" match below
                        # would reflect syncs from earlier iterations.
                        $master config resetstat
                        start_server {overrides {save {}}} {
                            set replica [srv 0 client]

                            $replica config set disable-sync-crc $replica_disable_crc
                            $replica config set repl-diskless-load $sdl

                            $replica replicaof $master_host $master_port

                            wait_for_condition 50 100 {
                                [string match {*master_link_status:up*} [$replica info replication]]
                            } else {
                                fail "Replication not started"
                            }
                            # The field name must match what the server emits
                            # in INFO stats ("..._syncs_stated").
                            set is_master_crc_disabled [string match {*total_crc_disabled_syncs_stated:1*} [$master info stats]]
                            set is_replica_crc_disabled [string match {*total_crc_disabled_syncs_stated:1*} [$replica info stats]]

                            if {$replica_disable_crc eq "no" || $sdl eq "disabled" || $mds eq "no" || !$::tls} {
                                assert_equal 0 $is_master_crc_disabled "Master should not have CRC disabled"
                                assert_equal 0 $is_replica_crc_disabled "Replica should not have CRC disabled"
                            } else {
                                # The primary only drops CRC when its own
                                # config allows it; the replica decides
                                # independently of the primary's config.
                                if {$master_disable_crc eq "no"} {
                                    assert_equal 0 $is_master_crc_disabled "Master should not have CRC disabled"
                                } else {
                                    assert_equal 1 $is_master_crc_disabled "Master should have CRC disabled"
                                }
                                assert_equal 1 $is_replica_crc_disabled "Replica should have CRC disabled"
                            }
                        }
                    }
                }
            }
        }
    }
}