Skip to content

Commit

Permalink
addressed PR comments
Browse files Browse the repository at this point in the history
Signed-off-by: Alexander Shabanov <[email protected]>
  • Loading branch information
alexander-shabanov committed Jan 16, 2025
1 parent d840115 commit cb5e97f
Show file tree
Hide file tree
Showing 18 changed files with 243 additions and 230 deletions.
21 changes: 7 additions & 14 deletions src/cluster_slot_stats.c
Original file line number Diff line number Diff line change
Expand Up @@ -132,13 +132,9 @@ static void addReplySortedSlotStats(client *c, slotStatForSort slot_stats[], lon
}
}

static int canAddNetworkBytesOut(int slot) {
return clusterSlotStatsEnabled() && slot != -1;
}

/* Accumulates egress bytes for the slot. */
void clusterSlotStatsAddNetworkBytesOutForSlot(int slot, unsigned long long net_bytes_out) {
if (!canAddNetworkBytesOut(slot)) return;
if (!clusterSlotStatsEnabled(slot)) return;

serverAssert(slot >= 0 && slot < CLUSTER_SLOTS);
server.cluster->slot_stats[slot].network_bytes_out += net_bytes_out;
Expand All @@ -152,7 +148,7 @@ void clusterSlotStatsAddNetworkBytesOutForUserClient(client *c) {
/* Accumulates egress bytes upon sending replication stream. This only applies for primary nodes. */
static void clusterSlotStatsUpdateNetworkBytesOutForReplication(long long len) {
client *c = server.current_client;
if (c == NULL || !canAddNetworkBytesOut(c->slot)) return;
if (c == NULL || !clusterSlotStatsEnabled(c->slot)) return;

serverAssert(c->slot >= 0 && c->slot < CLUSTER_SLOTS);
serverAssert(nodeIsPrimary(server.cluster->myself));
Expand All @@ -179,7 +175,7 @@ void clusterSlotStatsDecrNetworkBytesOutForReplication(long long len) {
* This type is not aggregated, to stay consistent with server.stat_net_output_bytes aggregation.
* This function covers the internal propagation component. */
void clusterSlotStatsAddNetworkBytesOutForShardedPubSubInternalPropagation(client *c, int slot) {
if (!canAddNetworkBytesOut(slot)) return;
if (!clusterSlotStatsEnabled(slot)) return;

serverAssert(slot >= 0 && slot < CLUSTER_SLOTS);
server.cluster->slot_stats[slot].network_bytes_out += c->net_output_bytes_curr_cmd;
Expand Down Expand Up @@ -214,8 +210,7 @@ void clusterSlotStatResetAll(void) {
* would equate to repeating the same calculation twice.
*/
static int canAddCpuDuration(client *c) {
return clusterSlotStatsEnabled() &&
c->slot != -1 && /* Command should be slot specific. */
return clusterSlotStatsEnabled(c->slot) &&
(!server.execution_nesting || /* Either; */
(server.execution_nesting && /* 1) Command should not be nested, or */
c->realcmd->flags & CMD_BLOCKING)); /* 2) If command is nested, it must be due to unblocking. */
Expand All @@ -242,8 +237,7 @@ static int canAddNetworkBytesIn(client *c) {
* Third, blocked client is not aggregated, to avoid duplicate aggregation upon unblocking.
* Fourth, the server is not under a MULTI/EXEC transaction, to avoid duplicate aggregation of
* EXEC's 14 bytes RESP upon nested call()'s afterCommand(). */
return clusterSlotStatsEnabled() && c->slot != -1 && !(c->flag.blocked) &&
!server.in_exec;
return clusterSlotStatsEnabled(c->slot) && !(c->flag.blocked) && !server.in_exec;
}

/* Adds network ingress bytes of the current command in execution,
Expand Down Expand Up @@ -338,7 +332,6 @@ void clusterSlotStatsCommand(client *c) {
}
}

int clusterSlotStatsEnabled(void) {
return server.cluster_slot_stats_enabled && /* Config should be enabled. */
server.cluster_enabled; /* Cluster mode should be enabled. */
int clusterSlotStatsEnabled(int slot) {
return server.cluster_slot_stats_enabled && server.cluster_enabled && slot != -1;
}
2 changes: 1 addition & 1 deletion src/cluster_slot_stats.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
/* General use-cases. */
void clusterSlotStatReset(int slot);
void clusterSlotStatResetAll(void);
int clusterSlotStatsEnabled(void);
int clusterSlotStatsEnabled(int slot);

/* cpu-usec metric. */
void clusterSlotStatsAddCpuDuration(client *c, ustime_t duration);
Expand Down
3 changes: 2 additions & 1 deletion src/config.c
Original file line number Diff line number Diff line change
Expand Up @@ -3188,7 +3188,6 @@ standardConfig static_configs[] = {
createBoolConfig("cluster-slot-stats-enabled", NULL, MODIFIABLE_CONFIG, server.cluster_slot_stats_enabled, 0, NULL, NULL),
createBoolConfig("hide-user-data-from-log", NULL, MODIFIABLE_CONFIG, server.hide_user_data_from_log, 1, NULL, NULL),
createBoolConfig("import-mode", NULL, DEBUG_CONFIG | MODIFIABLE_CONFIG, server.import_mode, 0, NULL, NULL),
createBoolConfig("reply-offload", NULL, MODIFIABLE_CONFIG, server.reply_offload_enabled, 0, NULL, NULL),

/* String Configs */
createStringConfig("aclfile", NULL, IMMUTABLE_CONFIG, ALLOW_EMPTY_STRING, server.acl_filename, "", NULL, NULL),
Expand Down Expand Up @@ -3251,6 +3250,8 @@ standardConfig static_configs[] = {
createIntConfig("port", NULL, MODIFIABLE_CONFIG, 0, 65535, server.port, 6379, INTEGER_CONFIG, NULL, updatePort), /* TCP port. */
createIntConfig("io-threads", NULL, DEBUG_CONFIG | IMMUTABLE_CONFIG, 1, IO_THREADS_MAX_NUM, server.io_threads_num, 1, INTEGER_CONFIG, NULL, NULL), /* Single threaded by default */
createIntConfig("events-per-io-thread", NULL, MODIFIABLE_CONFIG | HIDDEN_CONFIG, 0, INT_MAX, server.events_per_io_thread, 2, INTEGER_CONFIG, NULL, NULL),
createIntConfig("min-io-threads-reply-offload-on", NULL, MODIFIABLE_CONFIG | HIDDEN_CONFIG, 0, INT_MAX, server.min_io_threads_for_reply_offload, 7, INTEGER_CONFIG, NULL, NULL),
createIntConfig("min-io-threads-value-prefetch-off", NULL, MODIFIABLE_CONFIG | HIDDEN_CONFIG, 0, INT_MAX, server.min_io_threads_value_prefetch_off, 10, INTEGER_CONFIG, NULL, NULL),
createIntConfig("prefetch-batch-max-size", NULL, MODIFIABLE_CONFIG, 0, 128, server.prefetch_batch_max_size, 16, INTEGER_CONFIG, NULL, NULL),
createIntConfig("auto-aof-rewrite-percentage", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.aof_rewrite_perc, 100, INTEGER_CONFIG, NULL, NULL),
createIntConfig("cluster-replica-validity-factor", "cluster-slave-validity-factor", MODIFIABLE_CONFIG, 0, INT_MAX, server.cluster_replica_validity_factor, 10, INTEGER_CONFIG, NULL, NULL), /* replica max data age factor. */
Expand Down
10 changes: 10 additions & 0 deletions src/io_threads.c
Original file line number Diff line number Diff line change
Expand Up @@ -618,3 +618,13 @@ int trySendAcceptToIOThreads(connection *conn) {

return C_OK;
}

int isReplyOffloadIndicatedByIOThreads(void) {
/* Starting min_io_threads_for_reply_offload I/O threads reply offload should be beneficial for any string size */
return server.min_io_threads_for_reply_offload && server.io_threads_num >= server.min_io_threads_for_reply_offload;
}

int isValuePrefetchIndicatedByIOThreads(void) {
/* Starting min_io_threads_value_prefetch_off I/O threads reply offload should be more efficient without value prefetch */
return server.io_threads_num < server.min_io_threads_value_prefetch_off;
}
2 changes: 2 additions & 0 deletions src/io_threads.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,7 @@ void adjustIOThreadsByEventLoad(int numevents, int increase_only);
void drainIOThreadsQueue(void);
void trySendPollJobToIOThreads(void);
int trySendAcceptToIOThreads(connection *conn);
int isReplyOffloadIndicatedByIOThreads(void);
int isValuePrefetchIndicatedByIOThreads(void);

#endif /* IO_THREADS_H */
6 changes: 3 additions & 3 deletions src/memory_prefetch.c
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

#include "memory_prefetch.h"
#include "server.h"
#include "io_threads.h"

typedef enum {
PREFETCH_ENTRY, /* Initial state, prefetch entries associated with the given key's hash */
Expand Down Expand Up @@ -119,9 +120,8 @@ static void prefetchEntry(KeyPrefetchInfo *info) {
if (hashtableIncrementalFindStep(&info->hashtab_state) == 1) {
/* Not done yet */
moveToNextKey();
/* If reply offload enabled no need to prefetch value because main thread will not access it */
} else if (server.reply_offload_enabled) {
markKeyAsdone(info);
} else if (!isValuePrefetchIndicatedByIOThreads()) {
markKeyAsdone(info);
} else {
info->state = PREFETCH_VALUE;
}
Expand Down
Loading

0 comments on commit cb5e97f

Please sign in to comment.