Skip to content

Commit

Permalink
remove CLUSTER SLOT-STATS command
Browse files Browse the repository at this point in the history
  • Loading branch information
hwware committed Jan 13, 2025
1 parent a47bb89 commit 511fb75
Show file tree
Hide file tree
Showing 5 changed files with 0 additions and 1,274 deletions.
187 changes: 0 additions & 187 deletions src/cluster_slot_stats.c
Original file line number Diff line number Diff line change
Expand Up @@ -27,111 +27,6 @@ typedef struct {
uint64_t stat;
} slotStatForSort;

static int doesSlotBelongToMyShard(int slot) {
clusterNode *myself = getMyClusterNode();
clusterNode *primary = clusterNodeGetPrimary(myself);

return clusterNodeCoversSlot(primary, slot);
}

static int markSlotsAssignedToMyShard(unsigned char *assigned_slots, int start_slot, int end_slot) {
int assigned_slots_count = 0;
for (int slot = start_slot; slot <= end_slot; slot++) {
if (doesSlotBelongToMyShard(slot)) {
assigned_slots[slot]++;
assigned_slots_count++;
}
}
return assigned_slots_count;
}

static uint64_t getSlotStat(int slot, slotStatType stat_type) {
uint64_t slot_stat = 0;
switch (stat_type) {
case KEY_COUNT: slot_stat = countKeysInSlot(slot); break;
case CPU_USEC: slot_stat = server.cluster->slot_stats[slot].cpu_usec; break;
case NETWORK_BYTES_IN: slot_stat = server.cluster->slot_stats[slot].network_bytes_in; break;
case NETWORK_BYTES_OUT: slot_stat = server.cluster->slot_stats[slot].network_bytes_out; break;
case SLOT_STAT_COUNT:
case INVALID: serverPanic("Invalid slot stat type %d was found.", stat_type);
}
return slot_stat;
}

/* Compare by stat in ascending order. If stat is the same, compare by slot in ascending order. */
static int slotStatForSortAscCmp(const void *a, const void *b) {
slotStatForSort entry_a = *((slotStatForSort *)a);
slotStatForSort entry_b = *((slotStatForSort *)b);
if (entry_a.stat == entry_b.stat) {
return entry_a.slot - entry_b.slot;
}
return entry_a.stat - entry_b.stat;
}

/* Compare by stat in descending order. If stat is the same, compare by slot in ascending order. */
static int slotStatForSortDescCmp(const void *a, const void *b) {
slotStatForSort entry_a = *((slotStatForSort *)a);
slotStatForSort entry_b = *((slotStatForSort *)b);
if (entry_b.stat == entry_a.stat) {
return entry_a.slot - entry_b.slot;
}
return entry_b.stat - entry_a.stat;
}

static void collectAndSortSlotStats(slotStatForSort slot_stats[], slotStatType order_by, int desc) {
int i = 0;

for (int slot = 0; slot < CLUSTER_SLOTS; slot++) {
if (doesSlotBelongToMyShard(slot)) {
slot_stats[i].slot = slot;
slot_stats[i].stat = getSlotStat(slot, order_by);
i++;
}
}
qsort(slot_stats, i, sizeof(slotStatForSort), (desc) ? slotStatForSortDescCmp : slotStatForSortAscCmp);
}

static void addReplySlotStat(client *c, int slot) {
addReplyArrayLen(c, 2); /* Array of size 2, where 0th index represents (int) slot,
* and 1st index represents (map) usage statistics. */
addReplyLongLong(c, slot);
addReplyMapLen(c, (server.cluster_slot_stats_enabled) ? SLOT_STAT_COUNT
: 1); /* Nested map representing slot usage statistics. */
addReplyBulkCString(c, "key-count");
addReplyLongLong(c, countKeysInSlot(slot));

/* Any additional metrics aside from key-count come with a performance trade-off,
* and are aggregated and returned based on its server config. */
if (server.cluster_slot_stats_enabled) {
addReplyBulkCString(c, "cpu-usec");
addReplyLongLong(c, server.cluster->slot_stats[slot].cpu_usec);
addReplyBulkCString(c, "network-bytes-in");
addReplyLongLong(c, server.cluster->slot_stats[slot].network_bytes_in);
addReplyBulkCString(c, "network-bytes-out");
addReplyLongLong(c, server.cluster->slot_stats[slot].network_bytes_out);
}
}

/* Adds reply for the SLOTSRANGE variant.
* Response is ordered in ascending slot number. */
static void addReplySlotsRange(client *c, unsigned char *assigned_slots, int startslot, int endslot, int len) {
addReplyArrayLen(c, len); /* Top level RESP reply format is defined as an array, due to ordering invariance. */

for (int slot = startslot; slot <= endslot; slot++) {
if (assigned_slots[slot]) addReplySlotStat(c, slot);
}
}

static void addReplySortedSlotStats(client *c, slotStatForSort slot_stats[], long limit) {
int num_slots_assigned = getMyShardSlotCount();
int len = min(limit, num_slots_assigned);
addReplyArrayLen(c, len); /* Top level RESP reply format is defined as an array, due to ordering invariance. */

for (int i = 0; i < len; i++) {
addReplySlotStat(c, slot_stats[i].slot);
}
}

static int canAddNetworkBytesOut(client *c) {
return server.cluster_slot_stats_enabled && server.cluster_enabled && c->slot != -1;
}
Expand Down Expand Up @@ -194,14 +89,6 @@ void clusterSlotStatsAddNetworkBytesOutForShardedPubSubInternalPropagation(clien
c->slot = _slot;
}

/* Adds reply for the ORDERBY variant.
* Response is ordered based on the sort result. */
static void addReplyOrderBy(client *c, slotStatType order_by, long limit, int desc) {
slotStatForSort slot_stats[CLUSTER_SLOTS];
collectAndSortSlotStats(slot_stats, order_by, desc);
addReplySortedSlotStats(c, slot_stats, limit);
}

/* Resets applicable slot statistics. */
void clusterSlotStatReset(int slot) {
/* key-count is exempt, as it is queried separately through `countKeysInSlot()`. */
Expand Down Expand Up @@ -269,77 +156,3 @@ void clusterSlotStatsAddNetworkBytesInForUserClient(client *c) {
server.cluster->slot_stats[c->slot].network_bytes_in += c->net_input_bytes_curr_cmd;
}

void clusterSlotStatsCommand(client *c) {
if (!server.cluster_enabled) {
addReplyError(c, "This instance has cluster support disabled");
return;
}

/* Parse additional arguments. */
if (c->argc == 5 && !strcasecmp(c->argv[2]->ptr, "slotsrange")) {
/* CLUSTER SLOT-STATS SLOTSRANGE start-slot end-slot */
int startslot, endslot;
if ((startslot = getSlotOrReply(c, c->argv[3])) == -1 ||
(endslot = getSlotOrReply(c, c->argv[4])) == -1) {
return;
}
if (startslot > endslot) {
addReplyErrorFormat(c, "Start slot number %d is greater than end slot number %d", startslot, endslot);
return;
}
/* Initialize slot assignment array. */
unsigned char assigned_slots[CLUSTER_SLOTS] = {UNASSIGNED_SLOT};
int assigned_slots_count = markSlotsAssignedToMyShard(assigned_slots, startslot, endslot);
addReplySlotsRange(c, assigned_slots, startslot, endslot, assigned_slots_count);

} else if (c->argc >= 4 && !strcasecmp(c->argv[2]->ptr, "orderby")) {
/* CLUSTER SLOT-STATS ORDERBY metric [LIMIT limit] [ASC | DESC] */
int desc = 1;
slotStatType order_by = INVALID;
if (!strcasecmp(c->argv[3]->ptr, "key-count")) {
order_by = KEY_COUNT;
} else if (!strcasecmp(c->argv[3]->ptr, "cpu-usec") && server.cluster_slot_stats_enabled) {
order_by = CPU_USEC;
} else if (!strcasecmp(c->argv[3]->ptr, "network-bytes-in") && server.cluster_slot_stats_enabled) {
order_by = NETWORK_BYTES_IN;
} else if (!strcasecmp(c->argv[3]->ptr, "network-bytes-out") && server.cluster_slot_stats_enabled) {
order_by = NETWORK_BYTES_OUT;
} else {
addReplyError(c, "Unrecognized sort metric for ORDERBY.");
return;
}
int i = 4; /* Next argument index, following ORDERBY */
int limit_counter = 0, asc_desc_counter = 0;
long limit = CLUSTER_SLOTS;
while (i < c->argc) {
int moreargs = c->argc > i + 1;
if (!strcasecmp(c->argv[i]->ptr, "limit") && moreargs) {
if (getRangeLongFromObjectOrReply(
c, c->argv[i + 1], 1, CLUSTER_SLOTS, &limit,
"Limit has to lie in between 1 and 16384 (maximum number of slots).") != C_OK) {
return;
}
i++;
limit_counter++;
} else if (!strcasecmp(c->argv[i]->ptr, "asc")) {
desc = 0;
asc_desc_counter++;
} else if (!strcasecmp(c->argv[i]->ptr, "desc")) {
desc = 1;
asc_desc_counter++;
} else {
addReplyErrorObject(c, shared.syntaxerr);
return;
}
if (limit_counter > 1 || asc_desc_counter > 1) {
addReplyError(c, "Multiple filters of the same type are disallowed.");
return;
}
i++;
}
addReplyOrderBy(c, order_by, limit, desc);

} else {
addReplySubcommandSyntaxError(c);
}
}
1 change: 0 additions & 1 deletion src/commands.def
Original file line number Diff line number Diff line change
Expand Up @@ -1031,7 +1031,6 @@ struct COMMAND_STRUCT CLUSTER_Subcommands[] = {
{MAKE_CMD("setslot","Binds a hash slot to a node.","O(1)","3.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_SETSLOT_History,1,CLUSTER_SETSLOT_Tips,0,clusterCommand,-4,CMD_NO_ASYNC_LOADING|CMD_ADMIN|CMD_STALE|CMD_MAY_REPLICATE,0,CLUSTER_SETSLOT_Keyspecs,0,NULL,3),.args=CLUSTER_SETSLOT_Args},
{MAKE_CMD("shards","Returns the mapping of cluster slots to shards.","O(N) where N is the total number of cluster nodes","7.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_SHARDS_History,0,CLUSTER_SHARDS_Tips,1,clusterCommand,2,CMD_LOADING|CMD_STALE,0,CLUSTER_SHARDS_Keyspecs,0,NULL,0)},
{MAKE_CMD("slaves","Lists the replica nodes of a primary node.","O(N) where N is the number of replicas.","3.0.0",CMD_DOC_DEPRECATED,"`CLUSTER REPLICAS`","5.0.0","cluster",COMMAND_GROUP_CLUSTER,CLUSTER_SLAVES_History,0,CLUSTER_SLAVES_Tips,1,clusterCommand,3,CMD_ADMIN|CMD_STALE,0,CLUSTER_SLAVES_Keyspecs,0,NULL,1),.args=CLUSTER_SLAVES_Args},
{MAKE_CMD("slot-stats","Return an array of slot usage statistics for slots assigned to the current node.","O(N) where N is the total number of slots based on arguments. O(N*log(N)) with ORDERBY subcommand.","8.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_SLOT_STATS_History,0,CLUSTER_SLOT_STATS_Tips,2,clusterSlotStatsCommand,-4,CMD_STALE|CMD_LOADING,0,CLUSTER_SLOT_STATS_Keyspecs,0,NULL,1),.args=CLUSTER_SLOT_STATS_Args},
{MAKE_CMD("slots","Returns the mapping of cluster slots to nodes.","O(N) where N is the total number of Cluster nodes","3.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_SLOTS_History,2,CLUSTER_SLOTS_Tips,1,clusterCommand,2,CMD_LOADING|CMD_STALE,0,CLUSTER_SLOTS_Keyspecs,0,NULL,0)},
{0}
};
Expand Down
111 changes: 0 additions & 111 deletions src/commands/cluster-slot-stats.json

This file was deleted.

1 change: 0 additions & 1 deletion src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -3770,7 +3770,6 @@ void sunsubscribeCommand(client *c);
void watchCommand(client *c);
void unwatchCommand(client *c);
void clusterCommand(client *c);
void clusterSlotStatsCommand(client *c);
void restoreCommand(client *c);
void migrateCommand(client *c);
void askingCommand(client *c);
Expand Down
Loading

0 comments on commit 511fb75

Please sign in to comment.