Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature COMMANDLOG to record slow execution and large request/reply #1294

Open
wants to merge 26 commits into
base: unstable
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
3aba75c
change slowlog to commandlog and add heavytraffic
soloestoy Oct 21, 2024
91dbd11
add commandlog and its subcommands
soloestoy Nov 12, 2024
ef57a33
explain in valkey.conf
soloestoy Nov 12, 2024
b0806f2
add test case
soloestoy Nov 12, 2024
df4f9f3
change skip-slowlog to skip-commandlog
soloestoy Nov 12, 2024
e809abc
make clang-format happy
soloestoy Nov 12, 2024
8014aaa
Merge branch 'unstable' into feature-commandlog
soloestoy Nov 13, 2024
7c33199
change slowlog to commandlog for cmake
soloestoy Nov 13, 2024
6caadba
fix type order
soloestoy Nov 13, 2024
28bd4af
change version to 8.1.0
soloestoy Nov 14, 2024
03a6736
rename heavytraffic to large
soloestoy Nov 20, 2024
804bc60
Merge remote-tracking branch 'upstream/unstable' into feature-commandlog
soloestoy Nov 28, 2024
2fe4507
rename config
soloestoy Nov 28, 2024
b9dc3ff
Merge branch 'unstable' into feature-commandlog
soloestoy Jan 13, 2025
432d035
fix typo
soloestoy Jan 13, 2025
e09304d
Update src/commandlog.c
soloestoy Jan 13, 2025
23ce598
fix format
soloestoy Jan 13, 2025
0b8f651
chage large reply's default value to 1MB
soloestoy Jan 15, 2025
5aa87fd
change commandlog type to enum
soloestoy Jan 15, 2025
c86bc80
chage large request's default value to 1MB
soloestoy Jan 16, 2025
ac79536
Merge branch 'unstable' into feature-commandlog
soloestoy Jan 20, 2025
8df87ef
for comment
soloestoy Jan 21, 2025
885a11c
Apply suggestions from code review
soloestoy Jan 21, 2025
0912399
Update valkey.conf
soloestoy Jan 21, 2025
05f3d90
Update valkey.conf
soloestoy Jan 21, 2025
63d1d88
Update valkey.conf
soloestoy Jan 21, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmake/Modules/SourceFiles.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ set(VALKEY_SERVER_SRCS
${CMAKE_SOURCE_DIR}/src/cluster_slot_stats.c
${CMAKE_SOURCE_DIR}/src/crc16.c
${CMAKE_SOURCE_DIR}/src/endianconv.c
${CMAKE_SOURCE_DIR}/src/slowlog.c
${CMAKE_SOURCE_DIR}/src/commandlog.c
${CMAKE_SOURCE_DIR}/src/eval.c
${CMAKE_SOURCE_DIR}/src/bio.c
${CMAKE_SOURCE_DIR}/src/rio.c
Expand Down
2 changes: 1 addition & 1 deletion src/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,7 @@ endif
ENGINE_NAME=valkey
SERVER_NAME=$(ENGINE_NAME)-server$(PROG_SUFFIX)
ENGINE_SENTINEL_NAME=$(ENGINE_NAME)-sentinel$(PROG_SUFFIX)
ENGINE_SERVER_OBJ=threads_mngr.o adlist.o quicklist.o ae.o anet.o dict.o hashtable.o kvstore.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o memory_prefetch.o io_threads.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o cluster_legacy.o cluster_slot_stats.o crc16.o endianconv.o slowlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crccombine.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o valkey-check-rdb.o valkey-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o allocator_defrag.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script_lua.o script.o functions.o function_lua.o commands.o strl.o connection.o unix.o logreqres.o rdma.o scripting_engine.o
ENGINE_SERVER_OBJ=threads_mngr.o adlist.o quicklist.o ae.o anet.o dict.o hashtable.o kvstore.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o memory_prefetch.o io_threads.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o cluster_legacy.o cluster_slot_stats.o crc16.o endianconv.o commandlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crccombine.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o valkey-check-rdb.o valkey-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o allocator_defrag.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script_lua.o script.o functions.o function_lua.o commands.o strl.o connection.o unix.o logreqres.o rdma.o scripting_engine.o
ENGINE_CLI_NAME=$(ENGINE_NAME)-cli$(PROG_SUFFIX)
ENGINE_CLI_OBJ=anet.o adlist.o dict.o valkey-cli.o zmalloc.o release.o ae.o serverassert.o crcspeed.o crccombine.o crc64.o siphash.o crc16.o monotonic.o cli_common.o mt19937-64.o strl.o cli_commands.o
ENGINE_BENCHMARK_NAME=$(ENGINE_NAME)-benchmark$(PROG_SUFFIX)
Expand Down
16 changes: 8 additions & 8 deletions src/blocked.c
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@
*/

#include "server.h"
#include "slowlog.h"
#include "commandlog.h"
#include "latency.h"
#include "monotonic.h"
#include "cluster_slot_stats.h"
Expand Down Expand Up @@ -117,15 +117,15 @@ void blockClient(client *c, int btype) {
* he will attempt to reprocess the command which will update the statistics.
* However in case the client was timed out or in case of module blocked client is being unblocked
* the command will not be reprocessed and we need to make stats update.
* This function will make updates to the commandstats, slot-stats, slowlog and monitors.
* This function will make updates to the commandstats, slot-stats, commandlog and monitors.
* The failed_or_rejected parameter is an indication that the blocked command was either failed internally or
* rejected/aborted externally. In case the command was rejected the value ERROR_COMMAND_REJECTED should be passed.
* In case the command failed internally, ERROR_COMMAND_FAILED should be passed.
* A value of zero indicate no error was reported after the command was unblocked */
void updateStatsOnUnblock(client *c, long blocked_us, long reply_us, int failed_or_rejected) {
const ustime_t total_cmd_duration = c->duration + blocked_us + reply_us;
c->lastcmd->microseconds += total_cmd_duration;
clusterSlotStatsAddCpuDuration(c, total_cmd_duration);
c->duration += blocked_us + reply_us;
c->lastcmd->microseconds += c->duration;
clusterSlotStatsAddCpuDuration(c, c->duration);
c->lastcmd->calls++;
c->commands_processed++;
server.stat_numcommands++;
Expand All @@ -139,9 +139,9 @@ void updateStatsOnUnblock(client *c, long blocked_us, long reply_us, int failed_
debugServerAssertWithInfo(c, NULL, 0);
}
if (server.latency_tracking_enabled)
updateCommandLatencyHistogram(&(c->lastcmd->latency_histogram), total_cmd_duration * 1000);
/* Log the command into the Slow log if needed. */
slowlogPushCurrentCommand(c, c->lastcmd, total_cmd_duration);
updateCommandLatencyHistogram(&(c->lastcmd->latency_histogram), c->duration * 1000);
/* Log the command into the commandlog if needed. */
commandlogPushCurrentCommand(c, c->lastcmd);
c->duration = 0;
/* Log the reply duration event. */
latencyAddSampleIfNeeded("command-unblocking", reply_us / 1000);
Expand Down
242 changes: 242 additions & 0 deletions src/commandlog.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,242 @@
/* Commandlog implements a system that is able to remember the latest N
* queries that took more than M microseconds to execute, or consumed
* too much network bandwidth and memory for input/output buffers.
*
* The execution time to reach to be logged in the slow log is set
* using the 'commandlog-execution-slower-than' config directive, that is also
* readable and writable using the CONFIG SET/GET command.
*
* Other configurations such as `commandlog-request-larger-than` and
* `commandlog-reply-larger-than` can be found with more detailed
* explanations in the config file.
*
* The command log is actually not "logged" in the server log file
* but is accessible thanks to the COMMANDLOG command.
*
* ----------------------------------------------------------------------------
*
* Copyright Valkey Contributors.
* All rights reserved.
* SPDX-License-Identifier: BSD 3-Clause
*/

#include "commandlog.h"

/* Create a new commandlog entry.
* Incrementing the ref count of all the objects retained is up to
* this function. */
commandlogEntry *commandlogCreateEntry(client *c, robj **argv, int argc, long long value, int type) {
soloestoy marked this conversation as resolved.
Show resolved Hide resolved
commandlogEntry *ce = zmalloc(sizeof(*ce));
int j, slargc = argc;
soloestoy marked this conversation as resolved.
Show resolved Hide resolved

if (slargc > COMMANDLOG_ENTRY_MAX_ARGC) slargc = COMMANDLOG_ENTRY_MAX_ARGC;
ce->argc = slargc;
ce->argv = zmalloc(sizeof(robj *) * slargc);
for (j = 0; j < slargc; j++) {
/* Logging too many arguments is a useless memory waste, so we stop
* at COMMANDLOG_ENTRY_MAX_ARGC, but use the last argument to specify
* how many remaining arguments there were in the original command. */
if (slargc != argc && j == slargc - 1) {
ce->argv[j] =
createObject(OBJ_STRING, sdscatprintf(sdsempty(), "... (%d more arguments)", argc - slargc + 1));
} else {
/* Trim too long strings as well... */
if (argv[j]->type == OBJ_STRING && sdsEncodedObject(argv[j]) &&
sdslen(argv[j]->ptr) > COMMANDLOG_ENTRY_MAX_STRING) {
sds s = sdsnewlen(argv[j]->ptr, COMMANDLOG_ENTRY_MAX_STRING);

s = sdscatprintf(s, "... (%lu more bytes)",
(unsigned long)sdslen(argv[j]->ptr) - COMMANDLOG_ENTRY_MAX_STRING);
ce->argv[j] = createObject(OBJ_STRING, s);
} else if (argv[j]->refcount == OBJ_SHARED_REFCOUNT) {
ce->argv[j] = argv[j];
} else {
/* Here we need to duplicate the string objects composing the
* argument vector of the command, because those may otherwise
* end shared with string objects stored into keys. Having
* shared objects between any part of the server, and the data
* structure holding the data, is a problem: FLUSHALL ASYNC
* may release the shared string object and create a race. */
ce->argv[j] = dupStringObject(argv[j]);
}
}
}
ce->time = time(NULL);
ce->value = value;
ce->id = server.commandlog[type].entry_id++;
ce->peerid = sdsnew(getClientPeerId(c));
ce->cname = c->name ? sdsnew(c->name->ptr) : sdsempty();
return ce;
}

/* Free a command log entry. The argument is void so that the prototype of this
* function matches the one of the 'free' method of adlist.c.
*
* This function will take care to release all the retained object. */
void commandlogFreeEntry(void *ceptr) {
commandlogEntry *ce = ceptr;
int j;

for (j = 0; j < ce->argc; j++) decrRefCount(ce->argv[j]);
zfree(ce->argv);
sdsfree(ce->peerid);
sdsfree(ce->cname);
zfree(ce);
}

/* Initialize the command log. This function should be called a single time
* at server startup. */
void commandlogInit(void) {
for (int i = 0; i < COMMANDLOG_TYPE_NUM; i++) {
server.commandlog[i].entries = listCreate();
server.commandlog[i].entry_id = 0;
listSetFreeMethod(server.commandlog[i].entries, commandlogFreeEntry);
}
}

/* Push a new entry into the command log.
* This function will make sure to trim the command log accordingly to the
* configured max length. */
void commandlogPushEntryIfNeeded(client *c, robj **argv, int argc, long long value, int type) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

once commandlogPushCurrentCommand is moved into this file, I think it would be beneficial to inline commandlogPushEntryIfNeeded to help reduce the likelihood of branching since we are now testing 3 different thresholds.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should trust the compiler to do the inlining. As long we have o3 enabled it may choose to inline functions, we don't need to mark it explicitly.

if (server.commandlog[type].threshold < 0 || server.commandlog[type].max_len == 0) return; /* The corresponding commandlog disabled */
if (value >= server.commandlog[type].threshold)
listAddNodeHead(server.commandlog[type].entries, commandlogCreateEntry(c, argv, argc, value, type));

/* Remove old entries if needed. */
while (listLength(server.commandlog[type].entries) > server.commandlog[type].max_len) listDelNode(server.commandlog[type].entries, listLast(server.commandlog[type].entries));
}

/* Remove all the entries from the current command log of the specified type. */
void commandlogReset(int type) {
while (listLength(server.commandlog[type].entries) > 0) listDelNode(server.commandlog[type].entries, listLast(server.commandlog[type].entries));
}

/* Reply command logs to client. */
void commandlogGetReply(client *c, int type, long count) {
listIter li;
listNode *ln;
commandlogEntry *ce;

if (count > (long)listLength(server.commandlog[type].entries)) {
count = listLength(server.commandlog[type].entries);
}
addReplyArrayLen(c, count);
listRewind(server.commandlog[type].entries, &li);
while (count--) {
int j;

ln = listNext(&li);
ce = ln->value;
addReplyArrayLen(c, 6);
addReplyLongLong(c, ce->id);
addReplyLongLong(c, ce->time);
addReplyLongLong(c, ce->value);
addReplyArrayLen(c, ce->argc);
for (j = 0; j < ce->argc; j++) addReplyBulk(c, ce->argv[j]);
addReplyBulkCBuffer(c, ce->peerid, sdslen(ce->peerid));
addReplyBulkCBuffer(c, ce->cname, sdslen(ce->cname));
}
}

/* The SLOWLOG command. Implements all the subcommands needed to handle the
* slow log. */
void slowlogCommand(client *c) {
if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr, "help")) {
const char *help[] = {
"GET [<count>]",
" Return top <count> entries from the slowlog (default: 10, -1 mean all).",
" Entries are made of:",
" id, timestamp, time in microseconds, arguments array, client IP and port,",
" client name",
"LEN",
" Return the length of the slowlog.",
"RESET",
" Reset the slowlog.",
NULL,
};
addReplyHelp(c, help);
} else if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr, "reset")) {
commandlogReset(COMMANDLOG_TYPE_SLOW);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

only the SLOW list can be reset? Why are the others not supported?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

because this is only for the old slowlog

addReply(c, shared.ok);
} else if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr, "len")) {
addReplyLongLong(c, listLength(server.commandlog[COMMANDLOG_TYPE_SLOW].entries));
} else if ((c->argc == 2 || c->argc == 3) && !strcasecmp(c->argv[1]->ptr, "get")) {
long count = 10;

if (c->argc == 3) {
/* Consume count arg. */
if (getRangeLongFromObjectOrReply(c, c->argv[2], -1, LONG_MAX, &count,
"count should be greater than or equal to -1") != C_OK)
return;

if (count == -1) {
/* We treat -1 as a special value, which means to get all slow logs.
* Simply set count to the length of server.commandlog. */
count = listLength(server.commandlog[COMMANDLOG_TYPE_SLOW].entries);
}
}

commandlogGetReply(c, COMMANDLOG_TYPE_SLOW, count);
} else {
addReplySubcommandSyntaxError(c);
}
}

int commandlogGetTypeOrReply(client *c, robj *o) {
if (!strcasecmp(o->ptr, "slow")) return COMMANDLOG_TYPE_SLOW;
if (!strcasecmp(o->ptr, "large-request")) return COMMANDLOG_TYPE_LARGE_REQUEST;
if (!strcasecmp(o->ptr, "large-reply")) return COMMANDLOG_TYPE_LARGE_REPLY;
addReplyError(c, "type should be one of the following: slow, large-request, large-reply");
return -1;
}

/* The COMMANDLOG command. Implements all the subcommands needed to handle the
* command log. */
void commandlogCommand(client *c) {
int type;
if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr, "help")) {
const char *help[] = {
"GET <count> <type>",
" Return top <count> entries of the specified <type> from the commandlog (-1 mean all).",
" Entries are made of:",
" id, timestamp,",
" time in microseconds for type of slow,",
" or size in bytes for type of large-request,",
" or size in bytes for type of large-reply",
" arguments array, client IP and port,",
" client name",
"LEN <type>",
" Return the length of the specified type of commandlog.",
"RESET <type>",
" Reset the specified type of commandlog.",
NULL,
};
addReplyHelp(c, help);
} else if (c->argc == 3 && !strcasecmp(c->argv[1]->ptr, "reset")) {
if ((type = commandlogGetTypeOrReply(c, c->argv[2])) == -1) return;
commandlogReset(type);
addReply(c, shared.ok);
} else if (c->argc == 3 && !strcasecmp(c->argv[1]->ptr, "len")) {
if ((type = commandlogGetTypeOrReply(c, c->argv[2])) == -1) return;
addReplyLongLong(c, listLength(server.commandlog[type].entries));
} else if (c->argc == 4 && !strcasecmp(c->argv[1]->ptr, "get")) {
long count;

/* Consume count arg. */
if (getRangeLongFromObjectOrReply(c, c->argv[2], -1, LONG_MAX, &count,
"count should be greater than or equal to -1") != C_OK)
return;

if ((type = commandlogGetTypeOrReply(c, c->argv[3])) == -1) return;

if (count == -1) {
/* We treat -1 as a special value, which means to get all command logs.
* Simply set count to the length of server.commandlog. */
count = listLength(server.commandlog[type].entries);
}

commandlogGetReply(c, type, count);
} else {
addReplySubcommandSyntaxError(c);
}
}
30 changes: 15 additions & 15 deletions src/slowlog.h → src/commandlog.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,27 +27,27 @@
* POSSIBILITY OF SUCH DAMAGE.
*/

#ifndef __SLOWLOG_H__
#define __SLOWLOG_H__
#ifndef __COMMANDLOG_H__
#define __COMMANDLOG_H__

#include "server.h"

#define SLOWLOG_ENTRY_MAX_ARGC 32
#define SLOWLOG_ENTRY_MAX_STRING 128
#define COMMANDLOG_ENTRY_MAX_ARGC 32
#define COMMANDLOG_ENTRY_MAX_STRING 128

/* This structure defines an entry inside the slow log list */
typedef struct slowlogEntry {
/* This structure defines an entry inside the command log list */
typedef struct commandlogEntry {
robj **argv;
int argc;
long long id; /* Unique entry identifier. */
long long duration; /* Time spent by the query, in microseconds. */
time_t time; /* Unix time at which the query was executed. */
sds cname; /* Client name. */
sds peerid; /* Client network address. */
} slowlogEntry;
long long id; /* Unique entry identifier. */
long long value; /* The meaning is determined by the type of command log. */
time_t time; /* Unix time at which the query was executed. */
sds cname; /* Client name. */
sds peerid; /* Client network address. */
} commandlogEntry;

/* Exported API */
void slowlogInit(void);
void slowlogPushEntryIfNeeded(client *c, robj **argv, int argc, long long duration);
void commandlogInit(void);
void commandlogPushEntryIfNeeded(client *c, robj **argv, int argc, long long value, int type);

#endif /* __SLOWLOG_H__ */
#endif /* __COMMANDLOG_H__ */
Loading
Loading