Skip to content

Commit

Permalink
Merge branch 'unstable' into cluster_module_support_light_msg
Browse files Browse the repository at this point in the history
Signed-off-by: Harkrishn Patro <[email protected]>
  • Loading branch information
hpatro committed Feb 1, 2025
2 parents 1583bef + 78bcc0a commit df720b2
Show file tree
Hide file tree
Showing 19 changed files with 230 additions and 68 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/daily.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1262,7 +1262,7 @@ jobs:
notify-about-job-results:
runs-on: ubuntu-latest
if: always() && github.event_name == 'schedule' && github.repository == 'valkey-io/valkey'
needs: [test-ubuntu-jemalloc, test-ubuntu-jemalloc-fortify, test-ubuntu-libc-malloc, test-ubuntu-no-malloc-usable-size, test-ubuntu-32bit, test-ubuntu-tls, test-ubuntu-tls-no-tls, test-ubuntu-io-threads, test-ubuntu-reclaim-cache, test-valgrind-test, test-valgrind-misc, test-valgrind-no-malloc-usable-size-test, test-valgrind-no-malloc-usable-size-misc, test-sanitizer-address, test-sanitizer-undefined, test-rpm-distros-jemalloc, test-rpm-distros-tls-module, test-rpm-distros-tls-module-no-tls, test-macos-latest, test-macos-latest-sentinel, test-macos-latest-cluster, build-macos, test-freebsd, test-alpine-jemalloc, test-alpine-libc-malloc, reply-schemas-validator]
needs: [test-ubuntu-jemalloc, test-ubuntu-jemalloc-fortify, test-ubuntu-libc-malloc, test-ubuntu-no-malloc-usable-size, test-ubuntu-32bit, test-ubuntu-tls, test-ubuntu-tls-no-tls, test-ubuntu-io-threads, test-ubuntu-tls-io-threads, test-ubuntu-reclaim-cache, test-valgrind-test, test-valgrind-misc, test-valgrind-no-malloc-usable-size-test, test-valgrind-no-malloc-usable-size-misc, test-sanitizer-address, test-sanitizer-undefined, test-sanitizer-force-defrag, test-rpm-distros-jemalloc, test-rpm-distros-tls-module, test-rpm-distros-tls-module-no-tls, test-macos-latest, test-macos-latest-sentinel, test-macos-latest-cluster, build-macos, test-freebsd, test-alpine-jemalloc, test-alpine-libc-malloc, reply-schemas-validator]
steps:
- name: Collect job status
run: |
Expand Down
10 changes: 5 additions & 5 deletions src/aof.c
Original file line number Diff line number Diff line change
Expand Up @@ -1532,10 +1532,11 @@ int loadSingleAppendOnlyFile(char *filename) {
}

/* Command lookup */
cmd = lookupCommand(argv, argc);
if (!cmd) {
serverLog(LL_WARNING, "Unknown command '%s' reading the append only file %s", (char *)argv[0]->ptr,
filename);
sds err = NULL;
fakeClient->cmd = fakeClient->lastcmd = cmd = lookupCommand(argv, argc);
if ((!cmd && !commandCheckExistence(fakeClient, &err)) || (cmd && !commandCheckArity(cmd, argc, &err))) {
serverLog(LL_WARNING, "Error reading the append only file %s, error: %s", filename, err);
sdsfree(err);
freeClientArgv(fakeClient);
ret = AOF_FAILED;
goto cleanup;
Expand All @@ -1544,7 +1545,6 @@ int loadSingleAppendOnlyFile(char *filename) {
if (cmd->proc == multiCommand) valid_before_multi = valid_up_to;

/* Run the command in the context of a fake client */
fakeClient->cmd = fakeClient->lastcmd = cmd;
if (fakeClient->flag.multi && fakeClient->cmd->proc != execCommand) {
/* Note: we don't have to attempt calling evalGetCommandFlags,
* since this is AOF, the checks in processCommand are not made
Expand Down
5 changes: 4 additions & 1 deletion src/cluster.c
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,10 @@ int verifyDumpPayload(unsigned char *p, size_t len, uint16_t *rdbver_ptr) {
if (rdbver_ptr) {
*rdbver_ptr = rdbver;
}
if (rdbver > RDB_VERSION) return C_ERR;
if ((rdbver >= RDB_FOREIGN_VERSION_MIN && rdbver <= RDB_FOREIGN_VERSION_MAX) ||
(rdbver > RDB_VERSION && server.rdb_version_check == RDB_VERSION_CHECK_STRICT)) {
return C_ERR;
}

if (server.skip_checksum_validation) return C_OK;

Expand Down
11 changes: 7 additions & 4 deletions src/cluster_legacy.c
Original file line number Diff line number Diff line change
Expand Up @@ -1148,6 +1148,7 @@ void clusterInit(void) {
server.cluster->failover_auth_time = 0;
server.cluster->failover_auth_count = 0;
server.cluster->failover_auth_rank = 0;
server.cluster->failover_auth_sent = 0;
server.cluster->failover_failed_primary_rank = 0;
server.cluster->failover_auth_epoch = 0;
server.cluster->cant_failover_reason = CLUSTER_CANT_FAILOVER_NONE;
Expand Down Expand Up @@ -1738,14 +1739,16 @@ void freeClusterNode(clusterNode *n) {
nodename = sdsnewlen(n->name, CLUSTER_NAMELEN);
serverAssert(dictDelete(server.cluster->nodes, nodename) == DICT_OK);
sdsfree(nodename);
sdsfree(n->hostname);
sdsfree(n->human_nodename);
sdsfree(n->announce_client_ipv4);
sdsfree(n->announce_client_ipv6);

/* Release links and associated data structures. */
if (n->link) freeClusterLink(n->link);
if (n->inbound_link) freeClusterLink(n->inbound_link);

/* Free these members after links are freed, as freeClusterLink may access them. */
sdsfree(n->hostname);
sdsfree(n->human_nodename);
sdsfree(n->announce_client_ipv4);
sdsfree(n->announce_client_ipv6);
listRelease(n->fail_reports);
zfree(n->replicas);
zfree(n);
Expand Down
5 changes: 5 additions & 0 deletions src/config.c
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,10 @@ configEnum log_timestamp_format_enum[] = {{"legacy", LOG_TIMESTAMP_LEGACY},
{"milliseconds", LOG_TIMESTAMP_MILLISECONDS},
{NULL, 0}};

/* Name -> value table for the "rdb-version-check" enum config: each entry pairs the
 * textual setting with its RDB_VERSION_CHECK_* constant. The final {NULL, 0} entry
 * closes the table, as in the other configEnum tables in this file. */
configEnum rdb_version_check_enum[] = {
    {"strict", RDB_VERSION_CHECK_STRICT},
    {"relaxed", RDB_VERSION_CHECK_RELAXED},
    {NULL, 0},
};

/* Output buffer limits presets. */
clientBufferLimitsConfig clientBufferLimitsDefaults[CLIENT_TYPE_OBUF_COUNT] = {
{0, 0, 0}, /* normal */
Expand Down Expand Up @@ -3244,6 +3248,7 @@ standardConfig static_configs[] = {
createEnumConfig("shutdown-on-sigterm", NULL, MODIFIABLE_CONFIG | MULTI_ARG_CONFIG, shutdown_on_sig_enum, server.shutdown_on_sigterm, 0, isValidShutdownOnSigFlags, NULL),
createEnumConfig("log-format", NULL, MODIFIABLE_CONFIG, log_format_enum, server.log_format, LOG_FORMAT_LEGACY, NULL, NULL),
createEnumConfig("log-timestamp-format", NULL, MODIFIABLE_CONFIG, log_timestamp_format_enum, server.log_timestamp_format, LOG_TIMESTAMP_LEGACY, NULL, NULL),
createEnumConfig("rdb-version-check", NULL, MODIFIABLE_CONFIG, rdb_version_check_enum, server.rdb_version_check, RDB_VERSION_CHECK_STRICT, NULL, NULL),

/* Integer configs */
createIntConfig("databases", NULL, IMMUTABLE_CONFIG, 1, INT_MAX, server.dbnum, 16, INTEGER_CONFIG, NULL, NULL),
Expand Down
15 changes: 12 additions & 3 deletions src/rdb.c
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
#include <math.h>
#include <fcntl.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <sys/types.h>
#include <sys/time.h>
#include <sys/resource.h>
Expand Down Expand Up @@ -1418,6 +1419,7 @@ int rdbSaveRio(int req, rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi) {
int j;

if (server.rdb_checksum) rdb->update_cksum = rioGenericUpdateChecksum;
/* TODO: Change this to "VALKEY%03d" next time we bump the RDB version. */
snprintf(magic, sizeof(magic), "REDIS%04d", RDB_VERSION);
if (rdbWriteRaw(rdb, magic, 9) == -1) goto werr;
if (rdbSaveInfoAuxFields(rdb, rdbflags, rsi) == -1) goto werr;
Expand Down Expand Up @@ -3023,17 +3025,24 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin
char buf[1024];
int error;
long long empty_keys_skipped = 0;
bool is_valkey_magic;

rdb->update_cksum = rdbLoadProgressCallback;
rdb->max_processing_chunk = server.loading_process_events_interval_bytes;
if (rioRead(rdb, buf, 9) == 0) goto eoferr;
buf[9] = '\0';
if (memcmp(buf, "REDIS", 5) != 0) {
if (memcmp(buf, "REDIS0", 6) == 0) {
is_valkey_magic = false;
} else if (memcmp(buf, "VALKEY", 6) == 0) {
is_valkey_magic = true;
} else {
serverLog(LL_WARNING, "Wrong signature trying to load DB from file");
return C_ERR;
}
rdbver = atoi(buf + 5);
if (rdbver < 1 || rdbver > RDB_VERSION) {
rdbver = atoi(buf + 6);
if (rdbver < 1 ||
(rdbver >= RDB_FOREIGN_VERSION_MIN && !is_valkey_magic) ||
(rdbver > RDB_VERSION && server.rdb_version_check == RDB_VERSION_CHECK_STRICT)) {
serverLog(LL_WARNING, "Can't handle RDB format version %d", rdbver);
return C_ERR;
}
Expand Down
20 changes: 19 additions & 1 deletion src/rdb.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,27 @@
#include "server.h"

/* The current RDB version. When the format changes in a way that is no longer
* backward compatible this number gets incremented. */
* backward compatible this number gets incremented.
*
* RDB 11 is the last open-source Redis RDB version, used by Valkey 7.x and 8.x.
*
* RDB 12+ are non-open-source Redis formats.
*
* Next time we bump the Valkey RDB version, use a much higher version to avoid
* collisions with non-OSS Redis RDB versions. For example, we could use RDB
* version 90 for Valkey 9.0.
*
* In an RDB file/stream, we also check the magic string REDIS or VALKEY but in
* the DUMP/RESTORE format, there is only the RDB version number and no magic
* string. */
#define RDB_VERSION 11

/* Reserved range for foreign (unsupported, non-OSS) RDB format.
 * Both bounds are inclusive: the dump-payload check rejects any version v with
 * RDB_FOREIGN_VERSION_MIN <= v <= RDB_FOREIGN_VERSION_MAX. */
#define RDB_FOREIGN_VERSION_MIN 12
#define RDB_FOREIGN_VERSION_MAX 79
/* Compile-time guard: the build fails if RDB_VERSION is ever set to a value that
 * falls inside the reserved foreign range above. */
static_assert(RDB_VERSION < RDB_FOREIGN_VERSION_MIN || RDB_VERSION > RDB_FOREIGN_VERSION_MAX,
"RDB version in foreign version range");

/* Defines related to the dump file format. To store 32 bits lengths for short
* keys requires a lot of space, so we check the most significant 2 bits of
* the first byte to interpreter the length:
Expand Down
73 changes: 51 additions & 22 deletions src/rdma.c
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <sys/types.h>
#include <sys/socket.h>
#include <netdb.h>
#include <sys/mman.h>

#define CONN_TYPE_RDMA "rdma"

Expand Down Expand Up @@ -134,6 +135,8 @@ static list *pending_list;
static rdma_listener *rdma_listeners;
static serverRdmaContextConfig *rdma_config;

static size_t page_size;

static ConnectionType CT_RDMA;

static void serverRdmaError(char *err, const char *fmt, ...) {
Expand Down Expand Up @@ -191,31 +194,56 @@ static int rdmaPostRecv(RdmaContext *ctx, struct rdma_cm_id *cm_id, ValkeyRdmaCm
return C_OK;
}

/* To make Valkey forkable, buffer which is registered as RDMA
* memory region should be aligned to page size. And the length
* also need be aligned to page size.
/* To make Valkey forkable, a buffer which is registered as an RDMA memory region should be
* aligned to the page size, and its length also needs to be aligned to the page size.
* An example of a random segmentation fault:
* 0x7f2764ac5000 - 0x7f2764ac7000
* |ptr0 128| ... |ptr1 4096| ... |ptr2 512|
*
* After ibv_reg_mr(pd, ptr1, 4096, access), the full range of 8K
* becomes DONTFORK. And the child process will hit a segment fault
* during access ptr0/ptr2.
* Note that the memory can be freed by libc free only.
* TODO: move it to zmalloc.c if necessary
* After ibv_reg_mr(pd, ptr1, 4096, access), the full 8K range becomes DONTFORK, and
* the child process will hit a segmentation fault when accessing ptr0/ptr2.
*
* The portable posix_memalign(&tmp, page_size, aligned_size) would be fine too. However,
* RDMA is supported on Linux only, so relying on Linux-specific calls does not break anything.
* A raw mmap syscall is used to allocate a separate virtual memory area (VMA), which is also
* protected by two guard pages (a top one and a bottom one).
*/
static void *page_aligned_zalloc(size_t size) {
void *tmp;
size_t aligned_size, page_size = sysconf(_SC_PAGESIZE);
/* Allocate a page-aligned buffer of at least `size` bytes for use as an RDMA memory
 * region.
 *
 * `size` is rounded up to a multiple of page_size and the buffer is placed in its own
 * anonymous private mapping, with one extra PROT_NONE guard page on each side:
 *
 *   | guard page | aligned_size usable bytes | guard page |
 *   ^ptr         ^returned pointer
 *
 * Anonymous mappings are zero-filled by the kernel, so no explicit memset is needed.
 * Panics if the mapping cannot be created. The buffer must be released with
 * rdmaMemoryFree() using the same `size`, because the free side recomputes the mapping
 * bounds from it. */
static void *rdmaMemoryAlloc(size_t size) {
    size_t aligned_size = (size + page_size - 1) & (~(page_size - 1));
    size_t real_size = aligned_size + 2 * page_size;
    uint8_t *ptr;

    ptr = mmap(NULL, real_size, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
    if (ptr == MAP_FAILED) {
        serverPanic("failed to allocate memory for RDMA region");
    }

    madvise(ptr, real_size, MADV_DONTDUMP); /* no need to dump this VMA on coredump */
    mprotect(ptr, page_size, PROT_NONE);    /* top page of this VMA */

    /* The bottom guard page starts right after the page-aligned usable area. Offsetting
     * by the raw `size` instead would hand mprotect() an unaligned address whenever
     * `size` is not a multiple of the page size; the call would then fail and the
     * bottom guard page would silently be missing. */
    mprotect(ptr + page_size + aligned_size, page_size, PROT_NONE); /* bottom page of this VMA */

    return ptr + page_size;
}

/* Release a buffer obtained from rdmaMemoryAlloc().
 *
 * `ptr` is the pointer the allocator returned; NULL is accepted and ignored.
 * `size` must be the size that was requested at allocation time: it is rounded up to a
 * multiple of page_size and extended by the two guard pages to recover the bounds of
 * the underlying mapping, which starts one page before `ptr`.
 *
 * Panics if `ptr` is not page-aligned or if the mapping cannot be unmapped. */
static void rdmaMemoryFree(void *ptr, size_t size) {
    uint8_t *real_ptr;
    size_t real_size, aligned_size;

    if (!ptr) {
        return;
    }

    /* uintptr_t is the integer type intended for pointer round-trips. */
    if ((uintptr_t)ptr & (page_size - 1)) {
        serverPanic("unaligned memory in use for RDMA region");
    }

    aligned_size = (size + page_size - 1) & (~(page_size - 1));
    real_size = aligned_size + 2 * page_size;
    real_ptr = (uint8_t *)ptr - page_size; /* step back over the top guard page */

    if (munmap(real_ptr, real_size)) {
        serverPanic("failed to free memory for RDMA region");
    }
}

static void rdmaDestroyIoBuf(RdmaContext *ctx) {
Expand All @@ -224,23 +252,23 @@ static void rdmaDestroyIoBuf(RdmaContext *ctx) {
ctx->rx.mr = NULL;
}

zlibc_free(ctx->rx.addr);
rdmaMemoryFree(ctx->rx.addr, ctx->rx.length);
ctx->rx.addr = NULL;

if (ctx->tx.mr) {
ibv_dereg_mr(ctx->tx.mr);
ctx->tx.mr = NULL;
}

zlibc_free(ctx->tx.addr);
rdmaMemoryFree(ctx->tx.addr, ctx->tx.length);
ctx->tx.addr = NULL;

if (ctx->cmd_mr) {
ibv_dereg_mr(ctx->cmd_mr);
ctx->cmd_mr = NULL;
}

zlibc_free(ctx->cmd_buf);
rdmaMemoryFree(ctx->cmd_buf, sizeof(ValkeyRdmaCmd) * VALKEY_RDMA_MAX_WQE * 2);
ctx->cmd_buf = NULL;
}

Expand All @@ -251,7 +279,7 @@ static int rdmaSetupIoBuf(RdmaContext *ctx, struct rdma_cm_id *cm_id) {
int i;

/* setup CMD buf & MR */
ctx->cmd_buf = page_aligned_zalloc(length);
ctx->cmd_buf = rdmaMemoryAlloc(length);
ctx->cmd_mr = ibv_reg_mr(ctx->pd, ctx->cmd_buf, length, access);
if (!ctx->cmd_mr) {
serverLog(LL_WARNING, "RDMA: reg mr for CMD failed");
Expand All @@ -275,7 +303,7 @@ static int rdmaSetupIoBuf(RdmaContext *ctx, struct rdma_cm_id *cm_id) {
/* setup recv buf & MR */
access = IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_READ | IBV_ACCESS_REMOTE_WRITE;
length = rdma_config->rx_size;
ctx->rx.addr = page_aligned_zalloc(length);
ctx->rx.addr = rdmaMemoryAlloc(length);
ctx->rx.length = length;
ctx->rx.mr = ibv_reg_mr(ctx->pd, ctx->rx.addr, length, access);
if (!ctx->rx.mr) {
Expand Down Expand Up @@ -387,7 +415,7 @@ static int rdmaAdjustSendbuf(RdmaContext *ctx, unsigned int length) {
}

/* create a new buffer & MR */
ctx->tx.addr = page_aligned_zalloc(length);
ctx->tx.addr = rdmaMemoryAlloc(length);
ctx->tx_length = length;
ctx->tx.mr = ibv_reg_mr(ctx->pd, ctx->tx.addr, length, access);
if (!ctx->tx.mr) {
Expand Down Expand Up @@ -1705,6 +1733,7 @@ static int connRdmaAddr(connection *conn, char *ip, size_t ip_len, int *port, in

static void rdmaInit(void) {
pending_list = listCreate();
page_size = sysconf(_SC_PAGESIZE);

VALKEY_BUILD_BUG_ON(sizeof(ValkeyRdmaFeature) != 32);
VALKEY_BUILD_BUG_ON(sizeof(ValkeyRdmaKeepalive) != 32);
Expand Down
3 changes: 1 addition & 2 deletions src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -3911,7 +3911,7 @@ void afterCommand(client *c) {
int commandCheckExistence(client *c, sds *err) {
if (c->cmd) return 1;
if (!err) return 0;
if (isContainerCommandBySds(c->argv[0]->ptr)) {
if (isContainerCommandBySds(c->argv[0]->ptr) && c->argc >= 2) {
/* If we can't find the command but argv[0] by itself is a command
* it means we're dealing with an invalid subcommand. Print Help. */
sds cmd = sdsnew((char *)c->argv[0]->ptr);
Expand Down Expand Up @@ -4025,7 +4025,6 @@ int processCommand(client *c) {
return C_OK;
}


/* Check if the command is marked as protected and the relevant configuration allows it */
if (c->cmd->flags & CMD_PROTECTED) {
if ((c->cmd->proc == debugCommand && !allowProtectedAction(server.enable_debug_cmd, c)) ||
Expand Down
4 changes: 4 additions & 0 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -600,6 +600,9 @@ typedef enum { LOG_TIMESTAMP_LEGACY = 0,
LOG_TIMESTAMP_ISO8601,
LOG_TIMESTAMP_MILLISECONDS } log_timestamp_type;

/* How to treat an RDB version greater than RDB_VERSION.
 * STRICT:  such a version is rejected by the version checks on RDB load and on dump
 *          payload verification.
 * RELAXED: that particular check is not applied. */
typedef enum {
    RDB_VERSION_CHECK_STRICT = 0,
    RDB_VERSION_CHECK_RELAXED = 1,
} rdb_version_check_type;

/* common sets of actions to pause/unpause */
#define PAUSE_ACTIONS_CLIENT_WRITE_SET \
(PAUSE_ACTION_CLIENT_WRITE | PAUSE_ACTION_EXPIRE | PAUSE_ACTION_EVICT | PAUSE_ACTION_REPLICA)
Expand Down Expand Up @@ -1768,6 +1771,7 @@ struct valkeyServer {
int active_defrag_enabled;
int sanitize_dump_payload; /* Enables deep sanitization for ziplist and listpack in RDB and RESTORE. */
int skip_checksum_validation; /* Disable checksum validation for RDB and RESTORE payload. */
int rdb_version_check; /* Try to load RDB produced by a future version. */
int jemalloc_bg_thread; /* Enable jemalloc background thread */
int active_defrag_configuration_changed; /* Config changed; need to recompute active_defrag_cpu_percent. */
size_t active_defrag_ignore_bytes; /* minimum amount of fragmentation waste to start active defrag */
Expand Down
Binary file added tests/assets/encodings-rdb987.rdb
Binary file not shown.
Loading

0 comments on commit df720b2

Please sign in to comment.