Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Skip the checksum for diskless replication #1181

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions src/rdb.c
Original file line number Diff line number Diff line change
Expand Up @@ -1407,7 +1407,8 @@ int rdbSaveRio(int req, rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi) {
long key_counter = 0;
int j;

if (server.rdb_checksum) rdb->update_cksum = rioGenericUpdateChecksum;
if (server.rdb_checksum && !(rdbflags & RDBFLAGS_CKSUM_SKIP))
rdb->update_cksum = rioGenericUpdateChecksum;
snprintf(magic, sizeof(magic), "REDIS%04d", RDB_VERSION);
if (rdbWriteRaw(rdb, magic, 9) == -1) goto werr;
if (rdbSaveInfoAuxFields(rdb, rdbflags, rsi) == -1) goto werr;
Expand Down Expand Up @@ -1451,13 +1452,16 @@ int rdbSaveRio(int req, rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi) {
* without doing any processing of the content. */
int rdbSaveRioWithEOFMark(int req, rio *rdb, int *error, rdbSaveInfo *rsi) {
char eofmark[RDB_EOF_MARK_SIZE];

int skip_cksum_repl = RDBFLAGS_REPLICATION;
startSaving(RDBFLAGS_REPLICATION);
getRandomHexChars(eofmark, RDB_EOF_MARK_SIZE);
if (error) *error = 0;
if (rioWrite(rdb, "$EOF:", 5) == 0) goto werr;
if (rioWrite(rdb, eofmark, RDB_EOF_MARK_SIZE) == 0) goto werr;
if (rioWrite(rdb, "\r\n", 2) == 0) goto werr;
if (server.repl_diskless_sync && req & REPLICA_REQ_CHKSUM_SKIP)
skip_cksum_repl |= RDBFLAGS_CKSUM_SKIP;
if (rdbSaveRio(req, rdb, error, skip_cksum_repl, rsi) == C_ERR) goto werr;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should probably keep only one rdbSaveRio :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for noticing this. I am aware of this and removed in my local branch, this has had happened because I had merge conflict in the branch so created this by copying the code from original branch. it was missed during the copy paste

if (rdbSaveRio(req, rdb, error, RDBFLAGS_REPLICATION, rsi) == C_ERR) goto werr;
if (rioWrite(rdb, eofmark, RDB_EOF_MARK_SIZE) == 0) goto werr;
stopSaving(1);
Expand Down
1 change: 1 addition & 0 deletions src/rdb.h
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@
#define RDBFLAGS_ALLOW_DUP (1 << 2) /* Allow duplicated keys when loading.*/
#define RDBFLAGS_FEED_REPL (1 << 3) /* Feed replication stream when loading.*/
#define RDBFLAGS_KEEP_CACHE (1 << 4) /* Don't reclaim cache after rdb file is generated */
#define RDBFLAGS_CKSUM_SKIP (1 << 5) /* Skip checksum for diskless sync. */

/* When rdbLoadObject() returns NULL, the err flag is
* set to hold the type of error that occurred */
Expand Down
18 changes: 17 additions & 1 deletion src/replication.c
Original file line number Diff line number Diff line change
Expand Up @@ -1272,6 +1272,9 @@ void syncCommand(client *c) {
* - rdb-channel <1|0>
* Used to identify the client as a replica's rdb connection in an dual channel
* sync session.
*
* - repl-diskless-load <1|0>
* Replica is capable of load data from replication stream, request to skip checksum.
* */
void replconfCommand(client *c) {
int j;
Expand Down Expand Up @@ -1302,6 +1305,13 @@ void replconfCommand(client *c) {
sdslen(addr));
return;
}
} else if (!strcasecmp(c->argv[j]->ptr, "repl-diskless-load")) {
/* REPLCONF repl-diskless-load is used to identify the client is capable of
* load directly without creating rdb file */
long rdb_diskless_load = 0;
if (getRangeLongFromObjectOrReply(c, c->argv[j + 1], 0, 1, &rdb_diskless_load, NULL) != C_OK) return;
if (rdb_diskless_load == 1)
c->replica_req |= REPLICA_REQ_CHKSUM_SKIP;
Copy link
Member

@ranshid ranshid Oct 27, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think at this point the skip CRC is more "capability" than "requirement" on the replica side. So I would suggest adding a replconf capa indication instead.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Initially my plan also was to add in capabilities, However the capabilies were not passed many function as requirements being passed. Since requirements are so flexible in the available functions I added in the requirements. But I agree with you this should be part of the capa. PR is still in-progress I will try to make this in Capabilities instead of requirements. Thank you!

} else if (!strcasecmp(c->argv[j]->ptr, "capa")) {
/* Ignore capabilities not understood by this primary. */
if (!strcasecmp(c->argv[j + 1]->ptr, "eof"))
Expand Down Expand Up @@ -2613,7 +2623,7 @@ static int dualChannelReplHandleHandshake(connection *conn, sds *err) {
/* Send replica listening port to primary for clarification */
sds portstr = getReplicaPortString();
*err = sendCommand(conn, "REPLCONF", "capa", "eof", "rdb-only", "1", "rdb-channel", "1", "listening-port", portstr,
NULL);
"repl-diskless-load", useDisklessLoad() ? "1" : "0", NULL);
sdsfree(portstr);
if (*err) {
serverLog(LL_WARNING, "Sending command to primary in dual channel replication handshake: %s", *err);
Expand Down Expand Up @@ -3506,6 +3516,12 @@ void syncWithPrimary(connection *conn) {
server.dual_channel_replication ? "dual-channel" : NULL, NULL);
if (err) goto write_error;

/* Inform the primary of replicas repl-diskless-load config. */
if (useDisklessLoad()) {
err = sendCommand(conn, "REPLCONF", "repl-diskless-load", "1", NULL);
if (err) goto write_error;
}

/* Inform the primary of our (replica) version. */
err = sendCommand(conn, "REPLCONF", "version", VALKEY_VERSION, NULL);
if (err) goto write_error;
Expand Down
1 change: 1 addition & 0 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,7 @@ typedef enum {
#define REPLICA_REQ_RDB_EXCLUDE_DATA (1 << 0) /* Exclude data from RDB */
#define REPLICA_REQ_RDB_EXCLUDE_FUNCTIONS (1 << 1) /* Exclude functions from RDB */
#define REPLICA_REQ_RDB_CHANNEL (1 << 2) /* Use dual-channel-replication */
#define REPLICA_REQ_CHKSUM_SKIP (1 << 3) /* Exclude checksum from RDB */
/* Mask of all bits in the replica requirements bitfield that represent non-standard (filtered) RDB requirements */
#define REPLICA_REQ_RDB_MASK (REPLICA_REQ_RDB_EXCLUDE_DATA | REPLICA_REQ_RDB_EXCLUDE_FUNCTIONS)

Expand Down
Loading