Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feature] sentinel use info sentinel command to run faster #1511

Open
wants to merge 1 commit into
base: unstable
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/config.c
Original file line number Diff line number Diff line change
Expand Up @@ -3373,6 +3373,9 @@ standardConfig static_configs[] = {
createSpecialConfig("replicaof", "slaveof", IMMUTABLE_CONFIG | MULTI_ARG_CONFIG, setConfigReplicaOfOption, getConfigReplicaOfOption, rewriteConfigReplicaOfOption, NULL),
createSpecialConfig("latency-tracking-info-percentiles", NULL, MODIFIABLE_CONFIG | MULTI_ARG_CONFIG, setConfigLatencyTrackingInfoPercentilesOutputOption, getConfigLatencyTrackingInfoPercentilesOutputOption, rewriteConfigLatencyTrackingInfoPercentilesOutputOption, NULL),

/* Capabalities */
createBoolConfig("info-simple-for-sentinel", NULL, IMMUTABLE_CONFIG, server.info_simple_for_sentinel, 1, NULL, NULL),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this config only for sentinel? If yes, I think it should be moved to sentinel.c, we have some specific config parameters for sentinel node.

Copy link
Contributor Author

@kukey kukey Jan 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes,this config only for sentinel, but it's mean valkey has a certain ability that sentinel could send diff command to collect instance stats.
It's a server capability. I think it can't move to sentinel.c.


/* NULL Terminator, this is dropped when we convert to the runtime array. */
{NULL},
};
Expand Down
22 changes: 19 additions & 3 deletions src/sentinel.c
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,8 @@ typedef struct sentinelValkeyInstance {
* are set to NULL no script is executed. */
char *notification_script;
char *client_reconfig_script;
sds info; /* cached INFO output */
sds info; /* cached INFO output */
int info_simple; /* Instance support info simple for sentinel. */
} sentinelValkeyInstance;

/* Main state. */
Expand Down Expand Up @@ -1346,6 +1347,9 @@ sentinelValkeyInstance *createSentinelValkeyInstance(char *name,
ri->role_reported_time = mstime();
ri->replica_conf_change_time = mstime();

/* capability */
ri->info_simple = 0;

/* Add into the right table. */
dictAdd(table, ri->name, ri);
return ri;
Expand Down Expand Up @@ -2358,6 +2362,8 @@ void sentinelReconnectInstance(sentinelValkeyInstance *ri) {

/* Send a PING ASAP when reconnecting. */
sentinelSendPing(ri);

ri->info_simple = 0;
}
}
/* Pub / Sub */
Expand Down Expand Up @@ -2429,6 +2435,11 @@ void sentinelRefreshInstanceInfo(sentinelValkeyInstance *ri, const char *info) {
sentinelValkeyInstance *replica;
sds l = lines[j];

/* capability for info simple */
if (sdslen(l) >= 26 && !memcmp(l, "info_simple_for_sentinel:", 25)) {
ri->info_simple = atoi(l + 25);
}

/* run_id:<40 hex chars>*/
if (sdslen(l) >= 47 && !memcmp(l, "run_id:", 7)) {
if (ri->runid == NULL) {
Expand Down Expand Up @@ -2997,8 +3008,13 @@ void sentinelSendPeriodicCommands(sentinelValkeyInstance *ri) {

/* Send INFO to primaries and replicas, not sentinels. */
if ((ri->flags & SRI_SENTINEL) == 0 && (ri->info_refresh == 0 || (now - ri->info_refresh) > info_period)) {
retval = redisAsyncCommand(ri->link->cc, sentinelInfoReplyCallback, ri, "%s",
sentinelInstanceMapCommand(ri, "INFO"));
if (ri->info_simple) {
retval = redisAsyncCommand(ri->link->cc, sentinelInfoReplyCallback, ri, "%s %s",
sentinelInstanceMapCommand(ri, "INFO"), "SENTINEL");
} else {
retval = redisAsyncCommand(ri->link->cc, sentinelInfoReplyCallback, ri, "%s",
sentinelInstanceMapCommand(ri, "INFO"));
}
if (retval == C_OK) ri->link->pending_commands++;
}

Expand Down
99 changes: 64 additions & 35 deletions src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -5557,6 +5557,41 @@ void totalNumberOfStatefulKeys(unsigned long *blocking_keys,
if (watched_keys) *watched_keys = wkeys;
}

static sds genValKeyReplicasInfo(sds info) {
if (listLength(server.replicas)) {
int replica_id = 0;
listNode *ln;
listIter li;

listRewind(server.replicas, &li);
while ((ln = listNext(&li))) {
client *replica = listNodeValue(ln);
char ip[NET_IP_STR_LEN], *replica_ip = replica->replica_addr;
int port;
long lag = 0;

if (!replica_ip) {
if (connAddrPeerName(replica->conn, ip, sizeof(ip), &port) == -1) continue;
replica_ip = ip;
}
const char *state = replstateToString(replica->repl_state);
if (state[0] == '\0') continue;
if (replica->repl_state == REPLICA_STATE_ONLINE) lag = time(NULL) - replica->repl_ack_time;

info = sdscatprintf(info,
"slave%d:ip=%s,port=%d,state=%s,"
"offset=%lld,lag=%ld,type=%s\r\n",
replica_id, replica_ip, replica->replica_listening_port, state,
replica->repl_ack_off, lag,
replica->flag.repl_rdb_channel ? "rdb-channel"
: replica->repl_state == REPLICA_STATE_BG_RDB_LOAD ? "main-channel"
: "replica");
replica_id++;
}
}
return info;
}

/* Create the string returned by the INFO command. This is decoupled
* by the INFO command itself as we need to report the same information
* on memory corruption problems. */
Expand All @@ -5567,6 +5602,29 @@ sds genValkeyInfoString(dict *section_dict, int all_sections, int everything) {
int sections = 0;
if (everything) all_sections = 1;

if (dictFind(section_dict, "sentinel") != NULL) {
info = sdscatprintf(info, FMTARGS(
"run_id:%s\r\n", server.runid,
"role:%s\r\n", server.primary_host == NULL ? "master" : "slave"));
info = genValKeyReplicasInfo(info);
if (server.primary_host) {
long long replica_repl_offset = 1;
if (server.primary) {
replica_repl_offset = server.primary->reploff;
} else if (server.cached_primary) {
replica_repl_offset = server.cached_primary->reploff;
}

info = sdscatprintf(info, FMTARGS(
"master_host:%s\r\n", server.primary_host,
"master_port:%d\r\n", server.primary_port,
"master_link_status:%s\r\n", (server.repl_state == REPL_STATE_CONNECTED) ? "up" : "down",
"slave_priority:%d\r\n", server.replica_priority,
"slave_repl_offset:%lld\r\n", replica_repl_offset,
"replica_announced:%d\r\n", server.replica_announced));
}
}

/* Server */
if (all_sections || (dictFind(section_dict, "server") != NULL)) {
static int call_uname = 1;
Expand Down Expand Up @@ -5631,7 +5689,8 @@ sds genValkeyInfoString(dict *section_dict, int all_sections, int everything) {
"executable:%s\r\n", server.executable ? server.executable : "",
"config_file:%s\r\n", server.configfile ? server.configfile : "",
"io_threads_active:%i\r\n", server.active_io_threads_num > 1,
"availability_zone:%s\r\n", server.availability_zone));
"availability_zone:%s\r\n", server.availability_zone,
"info_simple_for_sentinel:%i\r\n", server.info_simple_for_sentinel));

/* Conditional properties */
if (isShutdownInitiated()) {
Expand Down Expand Up @@ -6001,37 +6060,7 @@ sds genValkeyInfoString(dict *section_dict, int all_sections, int everything) {
info = sdscatprintf(info, "min_slaves_good_slaves:%d\r\n", server.repl_good_replicas_count);
}

if (listLength(server.replicas)) {
int replica_id = 0;
listNode *ln;
listIter li;

listRewind(server.replicas, &li);
while ((ln = listNext(&li))) {
client *replica = listNodeValue(ln);
char ip[NET_IP_STR_LEN], *replica_ip = replica->replica_addr;
int port;
long lag = 0;

if (!replica_ip) {
if (connAddrPeerName(replica->conn, ip, sizeof(ip), &port) == -1) continue;
replica_ip = ip;
}
const char *state = replstateToString(replica->repl_state);
if (state[0] == '\0') continue;
if (replica->repl_state == REPLICA_STATE_ONLINE) lag = time(NULL) - replica->repl_ack_time;

info = sdscatprintf(info,
"slave%d:ip=%s,port=%d,state=%s,"
"offset=%lld,lag=%ld,type=%s\r\n",
replica_id, replica_ip, replica->replica_listening_port, state,
replica->repl_ack_off, lag,
replica->flag.repl_rdb_channel ? "rdb-channel"
: replica->repl_state == REPLICA_STATE_BG_RDB_LOAD ? "main-channel"
: "replica");
replica_id++;
}
}
info = genValKeyReplicasInfo(info);
info = sdscatprintf(
info,
FMTARGS(
Expand Down Expand Up @@ -6562,8 +6591,8 @@ void dismissMemoryInChild(void) {
/* madvise(MADV_DONTNEED) may not work if Transparent Huge Pages is enabled. */
if (server.thp_enabled) return;

/* Currently we use zmadvise_dontneed only when we use jemalloc with Linux.
* so we avoid these pointless loops when they're not going to do anything. */
/* Currently we use zmadvise_dontneed only when we use jemalloc with Linux.
* so we avoid these pointless loops when they're not going to do anything. */
#if defined(USE_JEMALLOC) && defined(__linux__)
listIter li;
listNode *ln;
Expand Down Expand Up @@ -7008,7 +7037,7 @@ __attribute__((weak)) int main(int argc, char **argv) {
}
if (server.sentinel_mode) sentinelCheckConfigFile();

/* Do system checks */
/* Do system checks */
#ifdef __linux__
linuxMemoryWarnings();
sds err_msg = NULL;
Expand Down
3 changes: 3 additions & 0 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -2115,6 +2115,9 @@ struct valkeyServer {
/* Local environment */
char *locale_collate;
char *debug_context; /* A free-form string that has no impact on server except being included in a crash report. */

/* capabilities */
int info_simple_for_sentinel; /* server support simple info for sentinel. */
};

#define MAX_KEYS_BUFFER 256
Expand Down
1 change: 1 addition & 0 deletions tests/unit/introspection.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -588,6 +588,7 @@ start_server {tags {"introspection"}} {
rdma-rx-size
rdma-bind
rdma-port
info-simple-for-sentinel
}

if {!$::tls} {
Expand Down
Loading