diff --git a/src/defrag.c b/src/defrag.c index be7ff07510..8c1ad29de2 100644 --- a/src/defrag.c +++ b/src/defrag.c @@ -121,7 +121,7 @@ typedef doneStatus (*kvstoreHelperPreContinueFn)(monotime endtime, void *privdat // Private data for main dictionary keys typedef struct { kvstoreIterState kvstate; - serverDb *db; + int dbid; } defragKeysCtx; static_assert(offsetof(defragKeysCtx, kvstate) == 0, "defragStageKvstoreHelper requires this"); @@ -736,7 +736,7 @@ static void defragModule(serverDb *db, robj *obj) { /* for each key we scan in the main dict, this function will attempt to defrag * all the various pointers it has. */ static void defragKey(defragKeysCtx *ctx, robj **elemref) { - serverDb *db = ctx->db; + serverDb *db = &server.db[ctx->dbid]; int slot = ctx->kvstate.slot; robj *newob, *ob; unsigned char *newzl; @@ -920,7 +920,7 @@ static doneStatus defragLaterStep(monotime endtime, void *privdata) { robj *ob = found; long long key_defragged = server.stat_active_defrag_hits; - bool timeout = (defragLaterItem(ob, &defrag_later_cursor, endtime, ctx->db->id) == 1); + bool timeout = (defragLaterItem(ob, &defrag_later_cursor, endtime, ctx->dbid) == 1); if (key_defragged != server.stat_active_defrag_hits) { server.stat_active_defrag_key_hits++; } else { @@ -963,7 +963,10 @@ static doneStatus defragStageKvstoreHelper(monotime endtime, state.cursor = 0; return DEFRAG_NOT_DONE; } - serverAssert(kvs == state.kvs); // Shouldn't change during the stage + if (kvs != state.kvs) { + // There has been a change of the kvs (flushdb, swapdb, etc.). Just complete the stage. + return DEFRAG_DONE; + } unsigned int iterations = 0; unsigned long long prev_defragged = server.stat_active_defrag_hits; @@ -1013,26 +1016,30 @@ static doneStatus defragStageKvstoreHelper(monotime endtime, } -// Note: target is a DB, (not a KVS like most stages) +// Target is a DBID static doneStatus defragStageDbKeys(monotime endtime, void *target, void *privdata) { UNUSED(privdata); - serverDb *db = (serverDb *)target; + int dbid = (uintptr_t)target; + serverDb *db = &server.db[dbid]; static defragKeysCtx ctx; // STATIC - this persists if (endtime == 0) { - ctx.db = db; + ctx.dbid = dbid; // Don't return yet. Call the helper with endtime==0 below. } - serverAssert(ctx.db == db); + serverAssert(ctx.dbid == dbid); return defragStageKvstoreHelper(endtime, db->keys, dbKeysScanCallback, defragLaterStep, &ctx); } +// Target is a DBID static doneStatus defragStageExpiresKvstore(monotime endtime, void *target, void *privdata) { UNUSED(privdata); - return defragStageKvstoreHelper(endtime, (kvstore *)target, + int dbid = (uintptr_t)target; + serverDb *db = &server.db[dbid]; + return defragStageKvstoreHelper(endtime, db->expires, scanHashtableCallbackCountScanned, NULL, NULL); } @@ -1226,29 +1233,38 @@ static long long activeDefragTimeProc(struct aeEventLoop *eventLoop, long long i } monotime starttime = getMonotonicUs(); - monotime endtime = starttime + computeDefragCycleUs(); + int dutyCycleUs = computeDefragCycleUs(); + monotime endtime = starttime + dutyCycleUs; + bool haveMoreWork = true; mstime_t latency; latencyStartMonitor(latency); - if (!defrag.current_stage) { - defrag.current_stage = listNodeValue(listFirst(defrag.remaining_stages)); - listDelNode(defrag.remaining_stages, listFirst(defrag.remaining_stages)); - // Initialize the stage with endtime==0 - doneStatus status = defrag.current_stage->stage_fn(0, defrag.current_stage->target, defrag.current_stage->privdata); - serverAssert(status == DEFRAG_NOT_DONE); // Initialization should always return DEFRAG_NOT_DONE - } + do { + if (!defrag.current_stage) { + defrag.current_stage = listNodeValue(listFirst(defrag.remaining_stages)); + listDelNode(defrag.remaining_stages, listFirst(defrag.remaining_stages)); + // Initialize the stage with endtime==0 + doneStatus status = defrag.current_stage->stage_fn(0, defrag.current_stage->target, defrag.current_stage->privdata); + serverAssert(status == DEFRAG_NOT_DONE); // Initialization should always return DEFRAG_NOT_DONE + } - doneStatus status = defrag.current_stage->stage_fn(endtime, defrag.current_stage->target, defrag.current_stage->privdata); - if (status == DEFRAG_DONE) { - zfree(defrag.current_stage); - defrag.current_stage = NULL; - } + doneStatus status = defrag.current_stage->stage_fn(endtime, defrag.current_stage->target, defrag.current_stage->privdata); + if (status == DEFRAG_DONE) { + zfree(defrag.current_stage); + defrag.current_stage = NULL; + } + + haveMoreWork = (defrag.current_stage || listLength(defrag.remaining_stages) > 0); + /* If we've completed a stage early, and still have a standard time allotment remaining, + * we'll start another stage. This can happen when defrag is running infrequently, and + * starvation protection has increased the duty-cycle. */ + } while (haveMoreWork && getMonotonicUs() <= endtime - server.active_defrag_cycle_us); latencyEndMonitor(latency); latencyAddSampleIfNeeded("active-defrag-cycle", latency); - if (defrag.current_stage || listLength(defrag.remaining_stages) > 0) { + if (haveMoreWork) { return computeDelayMs(endtime); } else { endDefragCycle(true); @@ -1287,9 +1303,8 @@ static void beginDefragCycle(void) { defrag.remaining_stages = listCreate(); for (int dbid = 0; dbid < server.dbnum; dbid++) { - serverDb *db = &server.db[dbid]; - addDefragStage(defragStageDbKeys, db, NULL); - addDefragStage(defragStageExpiresKvstore, db->expires, NULL); + addDefragStage(defragStageDbKeys, (void *)(uintptr_t)dbid, NULL); + addDefragStage(defragStageExpiresKvstore, (void *)(uintptr_t)dbid, NULL); } static getClientChannelsFnWrapper getClientPubSubChannelsFn = {getClientPubSubChannels}; diff --git a/src/server.c b/src/server.c index 1e38b5ac69..8e65b1f5cd 100644 --- a/src/server.c +++ b/src/server.c @@ -1669,6 +1669,12 @@ void whileBlockedCron(void) { * latency monitor if this function is called too often. */ if (server.blocked_last_cron >= server.mstime) return; + /* Increment server.cronloops so that run_with_period works. */ + long hz_ms = 1000 / server.hz; + int cronloops = (server.mstime - server.blocked_last_cron + (hz_ms - 1)) / hz_ms; // rounding up + server.blocked_last_cron += cronloops * hz_ms; + server.cronloops += cronloops; + mstime_t latency; latencyStartMonitor(latency); diff --git a/src/server.h b/src/server.h index 14a16593b0..e9332233aa 100644 --- a/src/server.h +++ b/src/server.h @@ -1900,8 +1900,7 @@ struct valkeyServer { int sanitize_dump_payload; /* Enables deep sanitization for ziplist and listpack in RDB and RESTORE. */ int skip_checksum_validation; /* Disable checksum validation for RDB and RESTORE payload. */ int jemalloc_bg_thread; /* Enable jemalloc background thread */ - int active_defrag_configuration_changed; /* defrag configuration has been changed and need to reconsider - * active_defrag_running in computeDefragCycles. */ + int active_defrag_configuration_changed; /* Config changed; need to recompute active_defrag_cpu_percent. */ size_t active_defrag_ignore_bytes; /* minimum amount of fragmentation waste to start active defrag */ int active_defrag_threshold_lower; /* minimum percentage of fragmentation to start active defrag */ int active_defrag_threshold_upper; /* maximum percentage of fragmentation at which we use maximum effort */ diff --git a/tests/unit/memefficiency.tcl b/tests/unit/memefficiency.tcl index ce74b7c618..78a68a682d 100644 --- a/tests/unit/memefficiency.tcl +++ b/tests/unit/memefficiency.tcl @@ -172,10 +172,12 @@ run_solo {defrag} { # make sure the defragger did enough work to keep the fragmentation low during loading. # we cannot check that it went all the way down, since we don't wait for full defrag cycle to complete. assert {$frag < 1.4} - # since the AOF contains simple (fast) SET commands (and the cron during loading runs every 1024 commands), - # it'll still not block the loading for long periods of time. + # The AOF contains simple (fast) SET commands (and the cron during loading runs every 1024 commands). + # Even so, defrag can get starved for periods exceeding 100ms. Using 200ms for test stability, and + # a 75% CPU requirement (as set above), we should allow up to 600ms latency + # (as total time = 200 non duty + 600 duty = 800ms, and 75% of 800ms is 600ms). if {!$::no_latency} { - assert {$max_latency <= 40} + assert {$max_latency <= 600} } } } ;# Active defrag - AOF loading