diff --git a/src/rdb.c b/src/rdb.c index 5fb77a2897..a4eb2823fb 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -64,6 +64,7 @@ char *rdbFileBeingLoaded = NULL; /* used for rdb checking on read error */ extern int rdbCheckMode; void rdbCheckError(const char *fmt, ...); void rdbCheckSetError(const char *fmt, ...); +int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadingCtx *rdb_loading_ctx); #ifdef __GNUC__ void rdbReportError(int corruption_error, int linenum, char *reason, ...) __attribute__((format(printf, 3, 4))); @@ -2991,7 +2992,19 @@ int rdbFunctionLoad(rio *rdb, int ver, functionsLibCtx *lib_ctx, int rdbflags, s int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) { functionsLibCtx *functions_lib_ctx = functionsLibCtxGetCurrent(); rdbLoadingCtx loading_ctx = {.dbarray = server.db, .functions_lib_ctx = functions_lib_ctx}; - int retval = rdbLoadRioWithLoadingCtx(rdb, rdbflags, rsi, &loading_ctx); + int retval = rdbLoadRioWithLoadingCtxScopedRdb(rdb, rdbflags, rsi, &loading_ctx); + return retval; +} + +/* Wrapper for rdbLoadRioWithLoadingCtx that manages a scoped RDB context. + * This method wraps the rdbLoadRioWithLoadingCtx function, providing temporary + * RDB context management. It sets a new current loading RDB, calls the wrapped + * function, and then restores the previous loading RDB context. */ +int rdbLoadRioWithLoadingCtxScopedRdb(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadingCtx *rdb_loading_ctx) { + rio *prev_rio = server.loading_rio; + server.loading_rio = rdb; + int retval = rdbLoadRioWithLoadingCtx(rdb, rdbflags, rsi, rdb_loading_ctx); + server.loading_rio = prev_rio; return retval; } diff --git a/src/rdb.h b/src/rdb.h index e9d53fa398..7342a926b5 100644 --- a/src/rdb.h +++ b/src/rdb.h @@ -172,7 +172,7 @@ int rdbLoadBinaryDoubleValue(rio *rdb, double *val); int rdbSaveBinaryFloatValue(rio *rdb, float val); int rdbLoadBinaryFloatValue(rio *rdb, float *val); int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi); -int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadingCtx *rdb_loading_ctx); +int rdbLoadRioWithLoadingCtxScopedRdb(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadingCtx *rdb_loading_ctx); int rdbFunctionLoad(rio *rdb, int ver, functionsLibCtx *lib_ctx, int rdbflags, sds *err); int rdbSaveRio(int req, rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi); ssize_t rdbSaveFunctions(rio *rdb); diff --git a/src/replication.c b/src/replication.c index 3a207a1d0f..f907771e71 100644 --- a/src/replication.c +++ b/src/replication.c @@ -2254,7 +2254,7 @@ void readSyncBulkPayload(connection *conn) { int loadingFailed = 0; rdbLoadingCtx loadingCtx = {.dbarray = dbarray, .functions_lib_ctx = functions_lib_ctx}; - if (rdbLoadRioWithLoadingCtx(&rdb, RDBFLAGS_REPLICATION, &rsi, &loadingCtx) != C_OK) { + if (rdbLoadRioWithLoadingCtxScopedRdb(&rdb, RDBFLAGS_REPLICATION, &rsi, &loadingCtx) != C_OK) { /* RDB loading failed. */ serverLog(LL_WARNING, "Failed trying to load the PRIMARY synchronization DB " "from socket, check server logs."); @@ -2831,18 +2831,15 @@ typedef struct replDataBufBlock { * Reads replication data from primary into specified repl buffer block */ int readIntoReplDataBlock(connection *conn, replDataBufBlock *data_block, size_t read) { int nread = connRead(conn, data_block->buf + data_block->used, read); - if (nread == -1) { - if (connGetState(conn) != CONN_STATE_CONNECTED) { - dualChannelServerLog(LL_NOTICE, "Error reading from primary: %s", connGetLastError(conn)); + if (nread <= 0) { + if (nread == 0 || connGetState(conn) != CONN_STATE_CONNECTED) { + dualChannelServerLog(LL_WARNING, "Provisional primary closed connection"); + /* Signal ongoing RDB load to terminate gracefully */ + if (server.loading_rio) rioCloseASAP(server.loading_rio); cancelReplicationHandshake(1); } return C_ERR; } - if (nread == 0) { - dualChannelServerLog(LL_VERBOSE, "Provisional primary closed connection"); - cancelReplicationHandshake(1); - return C_ERR; - } data_block->used += nread; server.stat_total_reads_processed++; return read - nread; diff --git a/src/rio.h b/src/rio.h index ee0f27aa7e..d5c3263e79 100644 --- a/src/rio.h +++ b/src/rio.h @@ -39,6 +39,7 @@ #define RIO_FLAG_READ_ERROR (1 << 0) #define RIO_FLAG_WRITE_ERROR (1 << 1) +#define RIO_FLAG_CLOSE_ASAP (1 << 2) /* Rio was closed asynchronously during the current rio operation. */ #define RIO_TYPE_FILE (1 << 0) #define RIO_TYPE_BUFFER (1 << 1) @@ -115,7 +116,7 @@ typedef struct _rio rio; * if needed. */ static inline size_t rioWrite(rio *r, const void *buf, size_t len) { - if (r->flags & RIO_FLAG_WRITE_ERROR) return 0; + if (r->flags & RIO_FLAG_WRITE_ERROR || r->flags & RIO_FLAG_CLOSE_ASAP) return 0; while (len) { size_t bytes_to_write = (r->max_processing_chunk && r->max_processing_chunk < len) ? r->max_processing_chunk : len; @@ -132,7 +133,7 @@ static inline size_t rioWrite(rio *r, const void *buf, size_t len) { } static inline size_t rioRead(rio *r, void *buf, size_t len) { - if (r->flags & RIO_FLAG_READ_ERROR) return 0; + if (r->flags & RIO_FLAG_READ_ERROR || r->flags & RIO_FLAG_CLOSE_ASAP) return 0; while (len) { size_t bytes_to_read = (r->max_processing_chunk && r->max_processing_chunk < len) ? r->max_processing_chunk : len; @@ -156,6 +157,10 @@ static inline int rioFlush(rio *r) { return r->flush(r); } +static inline void rioCloseASAP(rio *r) { + r->flags |= RIO_FLAG_CLOSE_ASAP; +} + /* This function allows to know if there was a read error in any past * operation, since the rio stream was created or since the last call * to rioClearError(). */ @@ -168,8 +173,13 @@ static inline int rioGetWriteError(rio *r) { return (r->flags & RIO_FLAG_WRITE_ERROR) != 0; } +/* Like rioGetReadError() but for async close errors. */ +static inline int rioGetAsyncCloseError(rio *r) { + return (r->flags & RIO_FLAG_CLOSE_ASAP) != 0; +} + static inline void rioClearErrors(rio *r) { - r->flags &= ~(RIO_FLAG_READ_ERROR | RIO_FLAG_WRITE_ERROR); + r->flags &= ~(RIO_FLAG_READ_ERROR | RIO_FLAG_WRITE_ERROR | RIO_FLAG_CLOSE_ASAP); } void rioInitWithFile(rio *r, FILE *fp); diff --git a/src/server.c b/src/server.c index 3cdec9fa9b..8f2ddf75df 100644 --- a/src/server.c +++ b/src/server.c @@ -2218,6 +2218,7 @@ void initServerConfig(void) { server.fsynced_reploff_pending = 0; server.rdb_client_id = -1; server.loading_process_events_interval_ms = LOADING_PROCESS_EVENTS_INTERVAL_DEFAULT; + server.loading_rio = NULL; /* Replication partial resync backlog */ server.repl_backlog = NULL; diff --git a/src/server.h b/src/server.h index 841db70614..2f8b917267 100644 --- a/src/server.h +++ b/src/server.h @@ -2088,6 +2088,7 @@ struct valkeyServer { int dbid; } repl_provisional_primary; client *cached_primary; /* Cached primary to be reused for PSYNC. */ + rio *loading_rio; /* Pointer to the rio object currently used for loading data. */ int repl_syncio_timeout; /* Timeout for synchronous I/O calls */ int repl_state; /* Replication status if the instance is a replica */ int repl_rdb_channel_state; /* State of the replica's rdb channel during dual-channel-replication */ diff --git a/tests/integration/dual-channel-replication.tcl b/tests/integration/dual-channel-replication.tcl index b4b9286d68..3adf9ce9fd 100644 --- a/tests/integration/dual-channel-replication.tcl +++ b/tests/integration/dual-channel-replication.tcl @@ -1158,8 +1158,8 @@ start_server {tags {"dual-channel-replication external:skip"}} { $primary config set repl-diskless-sync-delay 0 # Generating RDB will cost 100 sec to generate - $primary debug populate 10000 primary 1 - $primary config set rdb-key-save-delay 10000 + $primary debug populate 100000 primary 1 + $primary config set rdb-key-save-delay 1000 start_server {} { set replica [srv 0 client] @@ -1222,7 +1222,7 @@ start_server {tags {"dual-channel-replication external:skip"}} { fail "replica didn't start sync session in time" } $primary debug log "killing replica main connection" - set replica_main_conn_id [get_client_id_by_last_cmd $primary "sync"] + set replica_main_conn_id [get_client_id_by_last_cmd $primary "psync"] assert {$replica_main_conn_id != ""} set loglines [count_log_lines -1] $primary config set repl-diskless-sync-delay 5; # allow catch failed sync before retry @@ -1247,3 +1247,59 @@ start_server {tags {"dual-channel-replication external:skip"}} { stop_write_load $load_handle } } + + +start_server {tags {"dual-channel-replication external:skip"}} { + set primary [srv 0 client] + set primary_host [srv 0 host] + set primary_port [srv 0 port] + + $primary config set repl-diskless-sync yes + $primary config set dual-channel-replication-enabled yes + $primary config set repl-diskless-sync-delay 5; # allow catch failed sync before retry + + # Generating RDB will take 100 sec to generate + $primary debug populate 1000000 primary 1 + $primary config set rdb-key-save-delay -10 + + start_server {} { + set replica [srv 0 client] + set replica_host [srv 0 host] + set replica_port [srv 0 port] + set replica_log [srv 0 stdout] + + $replica config set dual-channel-replication-enabled yes + $replica config set loglevel debug + $replica config set repl-timeout 10 + $replica config set repl-diskless-load flush-before-load + + test "Replica notice main-connection killed during rdb load callback" {; # https://github.com/valkey-io/valkey/issues/1152 + set loglines [count_log_lines 0] + $replica replicaof $primary_host $primary_port + # Wait for sync session to start + wait_for_condition 500 1000 { + [string match "*slave*,state=wait_bgsave*,type=rdb-channel*" [$primary info replication]] && + [string match "*slave*,state=bg_transfer*,type=main-channel*" [$primary info replication]] && + [s -1 rdb_bgsave_in_progress] eq 1 + } else { + fail "replica didn't start sync session in time" + } + wait_for_log_messages 0 {"*Loading RDB produced by Valkey version*"} $loglines 1000 10 + $primary set key val + set replica_main_conn_id [get_client_id_by_last_cmd $primary "psync"] + $primary debug log "killing replica main connection $replica_main_conn_id" + assert {$replica_main_conn_id != ""} + set loglines [count_log_lines 0] + $primary config set rdb-key-save-delay 0; # disable delay to allow next sync to succeed + $primary client kill id $replica_main_conn_id + # Wait for primary to abort the sync + wait_for_condition 50 1000 { + [string match {*replicas_waiting_psync:0*} [$primary info replication]] + } else { + fail "Primary did not free repl buf block after sync failure" + } + wait_for_log_messages 0 {"*Failed trying to load the PRIMARY synchronization DB from socket*"} $loglines 1000 10 + verify_replica_online $primary 0 500 + } + } +}