Skip to content

Commit

Permalink
Switch from preallocating all databases to lazy allocation, reducing overhead and improving scalability for large database counts.
Browse files Browse the repository at this point in the history
  • Loading branch information
xbasel committed Jan 27, 2025
1 parent 3f21705 commit 33da0ea
Show file tree
Hide file tree
Showing 13 changed files with 165 additions and 105 deletions.
4 changes: 2 additions & 2 deletions src/aof.c
Original file line number Diff line number Diff line change
Expand Up @@ -2210,8 +2210,8 @@ int rewriteAppendOnlyFileRio(rio *aof) {

for (j = 0; j < server.dbnum; j++) {
char selectcmd[] = "*2\r\n$6\r\nSELECT\r\n";
serverDb *db = server.db + j;
if (kvstoreSize(db->keys) == 0) continue;
if (databaseEmpty(j)) continue;
serverDb *db = server.db[j];

/* SELECT the new DB */
if (rioWrite(aof, selectcmd, sizeof(selectcmd) - 1) == 0) goto werr;
Expand Down
6 changes: 3 additions & 3 deletions src/cluster.c
Original file line number Diff line number Diff line change
Expand Up @@ -812,7 +812,7 @@ static int shouldReturnTlsInfo(void) {
}

unsigned int countKeysInSlot(unsigned int slot) {
return kvstoreHashtableSize(server.db->keys, slot);
return kvstoreHashtableSize(server.db[0]->keys, slot);
}

void clusterCommandHelp(client *c) {
Expand Down Expand Up @@ -910,7 +910,7 @@ void clusterCommand(client *c) {
unsigned int numkeys = maxkeys > keys_in_slot ? keys_in_slot : maxkeys;
addReplyArrayLen(c, numkeys);
kvstoreHashtableIterator *kvs_di = NULL;
kvs_di = kvstoreGetHashtableIterator(server.db->keys, slot, 0);
kvs_di = kvstoreGetHashtableIterator(server.db[0]->keys, slot, 0);
for (unsigned int i = 0; i < numkeys; i++) {
void *next;
serverAssert(kvstoreHashtableIteratorNext(kvs_di, &next));
Expand Down Expand Up @@ -1099,7 +1099,7 @@ getNodeByQuery(client *c, struct serverCommand *cmd, robj **argv, int argc, int
* NODE <node-id>. */
int flags = LOOKUP_NOTOUCH | LOOKUP_NOSTATS | LOOKUP_NONOTIFY | LOOKUP_NOEXPIRE;
if ((migrating_slot || importing_slot) && !pubsubshard_included) {
if (lookupKeyReadWithFlags(&server.db[0], thiskey, flags) == NULL)
if (lookupKeyReadWithFlags(server.db[0], thiskey, flags) == NULL)
missing_keys++;
else
existing_keys++;
Expand Down
17 changes: 9 additions & 8 deletions src/cluster_legacy.c
Original file line number Diff line number Diff line change
Expand Up @@ -5711,7 +5711,7 @@ int verifyClusterConfigWithData(void) {

/* Make sure we only have keys in DB0. */
for (j = 1; j < server.dbnum; j++) {
if (kvstoreSize(server.db[j].keys)) return C_ERR;
if (!databaseEmpty(j)) return C_ERR;
}

/* Check that all the slots we see populated memory have a corresponding
Expand Down Expand Up @@ -6347,19 +6347,20 @@ unsigned int delKeysInSlot(unsigned int hashslot) {

kvstoreHashtableIterator *kvs_di = NULL;
void *next;
kvs_di = kvstoreGetHashtableIterator(server.db->keys, hashslot, HASHTABLE_ITER_SAFE);
serverDb *db = server.db[0];
kvs_di = kvstoreGetHashtableIterator(db->keys, hashslot, HASHTABLE_ITER_SAFE);
while (kvstoreHashtableIteratorNext(kvs_di, &next)) {
robj *valkey = next;
enterExecutionUnit(1, 0);
sds sdskey = objectGetKey(valkey);
robj *key = createStringObject(sdskey, sdslen(sdskey));
dbDelete(&server.db[0], key);
propagateDeletion(&server.db[0], key, server.lazyfree_lazy_server_del);
signalModifiedKey(NULL, &server.db[0], key);
dbDelete(db, key);
propagateDeletion(db, key, server.lazyfree_lazy_server_del);
signalModifiedKey(NULL, db, key);
/* The keys are not actually logically deleted from the database, just moved to another node.
* Modules need to know that these keys are no longer available locally, so just send the
* keyspace notification to the modules, but not to clients. */
moduleNotifyKeyspaceEvent(NOTIFY_GENERIC, "del", key, server.db[0].id);
moduleNotifyKeyspaceEvent(NOTIFY_GENERIC, "del", key, db->id);
exitExecutionUnit();
postExecutionUnitOperations();
decrRefCount(key);
Expand Down Expand Up @@ -6827,7 +6828,7 @@ int clusterCommandSpecial(client *c) {
}
} else if (!strcasecmp(c->argv[1]->ptr, "flushslots") && c->argc == 2) {
/* CLUSTER FLUSHSLOTS */
if (kvstoreSize(server.db[0].keys) != 0) {
if (!databaseEmpty(0)) {
addReplyError(c, "DB must be empty to perform CLUSTER FLUSHSLOTS.");
return 1;
}
Expand Down Expand Up @@ -6968,7 +6969,7 @@ int clusterCommandSpecial(client *c) {
/* If the instance is currently a primary, it must have no assigned
* slots and no keys before it can start replicating another node.
* Replicas can switch to another primary without issues. */
if (clusterNodeIsPrimary(myself) && (myself->numslots != 0 || kvstoreSize(server.db[0].keys) != 0)) {
if (clusterNodeIsPrimary(myself) && (myself->numslots != 0 || !databaseEmpty(0))) {
addReplyError(c, "To set a master the node must be empty and "
"without assigned slots.");
return 1;
Expand Down
50 changes: 30 additions & 20 deletions src/db.c
Original file line number Diff line number Diff line change
Expand Up @@ -571,7 +571,7 @@ robj *dbUnshareStringValue(serverDb *db, robj *key, robj *o) {
* The dbnum can be -1 if all the DBs should be emptied, or the specified
* DB index if we want to empty only a single database.
* The function returns the number of keys removed from the database(s). */
long long emptyDbStructure(serverDb *dbarray, int dbnum, int async, void(callback)(hashtable *)) {
long long emptyDbStructure(serverDb **dbarray, int dbnum, int async, void(callback)(hashtable *)) {
long long removed = 0;
int startdb, enddb;

Expand All @@ -583,16 +583,17 @@ long long emptyDbStructure(serverDb *dbarray, int dbnum, int async, void(callbac
}

for (int j = startdb; j <= enddb; j++) {
removed += kvstoreSize(dbarray[j].keys);
if (dbarray[j] == NULL) continue;
removed += kvstoreSize(dbarray[j]->keys);
if (async) {
emptyDbAsync(&dbarray[j]);
emptyDbAsync(dbarray[j]);
} else {
kvstoreEmpty(dbarray[j].keys, callback);
kvstoreEmpty(dbarray[j].expires, callback);
kvstoreEmpty(dbarray[j]->keys, callback);
kvstoreEmpty(dbarray[j]->expires, callback);
}
/* Because all keys of database are removed, reset average ttl. */
dbarray[j].avg_ttl = 0;
dbarray[j].expires_cursor = 0;
dbarray[j]->avg_ttl = 0;
dbarray[j]->expires_cursor = 0;
}

return removed;
Expand Down Expand Up @@ -669,28 +670,31 @@ serverDb *initTempDb(void) {
}

/* Discard tempDb, it's always async. */
void discardTempDb(serverDb *tempDb) {
void discardTempDb(serverDb **tempDb) {
/* Release temp DBs. */
emptyDbStructure(tempDb, -1, 1, NULL);
for (int i = 0; i < server.dbnum; i++) {
kvstoreRelease(tempDb[i].keys);
kvstoreRelease(tempDb[i].expires);
if (tempDb[i]) {
kvstoreRelease(tempDb[i]->keys);
kvstoreRelease(tempDb[i]->expires);
}
}

zfree(tempDb);
}

int selectDb(client *c, int id) {
if (id < 0 || id >= server.dbnum) return C_ERR;
c->db = &server.db[id];
initDatabase(id);
c->db = server.db[id];
return C_OK;
}

long long dbTotalServerKeyCount(void) {
long long total = 0;
int j;
for (j = 0; j < server.dbnum; j++) {
total += kvstoreSize(server.db[j].keys);
if (databaseEmpty(j)) continue;
total += kvstoreSize(server.db[j]->keys);
}
return total;
}
Expand Down Expand Up @@ -721,8 +725,9 @@ void signalFlushedDb(int dbid, int async) {
}

for (int j = startdb; j <= enddb; j++) {
scanDatabaseForDeletedKeys(&server.db[j], NULL);
touchAllWatchedKeysInDb(&server.db[j], NULL);
if (server.db[j] == NULL) continue;
scanDatabaseForDeletedKeys(server.db[j], NULL);
touchAllWatchedKeysInDb(server.db[j], NULL);
}

trackingInvalidateKeysOnFlush(async);
Expand Down Expand Up @@ -1641,8 +1646,10 @@ void scanDatabaseForDeletedKeys(serverDb *emptied, serverDb *replaced_with) {
int dbSwapDatabases(int id1, int id2) {
if (id1 < 0 || id1 >= server.dbnum || id2 < 0 || id2 >= server.dbnum) return C_ERR;
if (id1 == id2) return C_OK;
serverDb aux = server.db[id1];
serverDb *db1 = &server.db[id1], *db2 = &server.db[id2];
initDatabase(id1);
initDatabase(id2);
serverDb aux = *server.db[id1];
serverDb *db1 = server.db[id1], *db2 = server.db[id2];

/* Swapdb should make transaction fail if there is any
* client watching keys */
Expand Down Expand Up @@ -1683,10 +1690,13 @@ int dbSwapDatabases(int id1, int id2) {
/* Logically, this discards (flushes) the old main database and applies the newly loaded
* database (temp) as the main (active) database. The actual freeing of the old database
* (which will now be placed in the temp one) is done later. */
void swapMainDbWithTempDb(serverDb *tempDb) {
void swapMainDbWithTempDb(serverDb **tempDb) {
for (int i = 0; i < server.dbnum; i++) {
serverDb aux = server.db[i];
serverDb *activedb = &server.db[i], *newdb = &tempDb[i];
if (tempDb[i] == NULL && server.db[i] == NULL) continue;
if (tempDb[i] == NULL) tempDb[i] = createDatabase(i);
if (server.db[i] == NULL) server.db[i] = createDatabase(i);
serverDb aux = *server.db[i];
serverDb *activedb = server.db[i], *newdb = tempDb[i];

/* Swapping databases should make transaction fail if there is any
* client watching keys. */
Expand Down
18 changes: 12 additions & 6 deletions src/debug.c
Original file line number Diff line number Diff line change
Expand Up @@ -289,8 +289,8 @@ void computeDatasetDigest(unsigned char *final) {
memset(final, 0, 20); /* Start with a clean result */

for (int j = 0; j < server.dbnum; j++) {
serverDb *db = server.db + j;
if (kvstoreSize(db->keys) == 0) continue;
serverDb *db = server.db[j];
if (db == NULL || kvstoreSize(db->keys) == 0) continue;
kvstoreIterator *kvs_it = kvstoreIteratorInit(db->keys, HASHTABLE_ITER_SAFE | HASHTABLE_ITER_PREFETCH_VALUES);

/* hash the DB id, so the same dataset moved in a different DB will lead to a different digest */
Expand Down Expand Up @@ -907,14 +907,20 @@ void debugCommand(client *c) {
if (c->argc >= 4 && !strcasecmp(c->argv[3]->ptr, "full")) full = 1;

stats = sdscatprintf(stats, "[Dictionary HT]\n");
kvstoreGetStats(server.db[dbid].keys, buf, sizeof(buf), full);
stats = sdscat(stats, buf);
serverDb *db = server.db[dbid];
if (db) {
kvstoreGetStats(db->keys, buf, sizeof(buf), full);
stats = sdscat(stats, buf);
}

stats = sdscatprintf(stats, "[Expires HT]\n");
kvstoreGetStats(server.db[dbid].expires, buf, sizeof(buf), full);
stats = sdscat(stats, buf);
if (db) {
kvstoreGetStats(db->expires, buf, sizeof(buf), full);
stats = sdscat(stats, buf);
}

addReplyVerbatim(c, stats, sdslen(stats), "txt");

sdsfree(stats);
} else if (!strcasecmp(c->argv[1]->ptr, "htstats-key") && c->argc >= 3) {
int full = 0;
Expand Down
7 changes: 4 additions & 3 deletions src/defrag.c
Original file line number Diff line number Diff line change
Expand Up @@ -726,7 +726,7 @@ static void defragModule(serverDb *db, robj *obj) {
/* for each key we scan in the main dict, this function will attempt to defrag
* all the various pointers it has. */
static void defragKey(defragKeysCtx *ctx, robj **elemref) {
serverDb *db = &server.db[ctx->dbid];
serverDb *db = server.db[ctx->dbid];
int slot = ctx->kvstate.slot;
robj *newob, *ob;
unsigned char *newzl;
Expand Down Expand Up @@ -987,7 +987,7 @@ static doneStatus defragStageKvstoreHelper(monotime endtime,
static doneStatus defragStageDbKeys(monotime endtime, void *target, void *privdata) {
UNUSED(privdata);
int dbid = (uintptr_t)target;
serverDb *db = &server.db[dbid];
serverDb *db = server.db[dbid];

static defragKeysCtx ctx; // STATIC - this persists
if (endtime == 0) {
Expand All @@ -1005,7 +1005,7 @@ static doneStatus defragStageDbKeys(monotime endtime, void *target, void *privda
static doneStatus defragStageExpiresKvstore(monotime endtime, void *target, void *privdata) {
UNUSED(privdata);
int dbid = (uintptr_t)target;
serverDb *db = &server.db[dbid];
serverDb *db = server.db[dbid];
return defragStageKvstoreHelper(endtime, db->expires,
scanHashtableCallbackCountScanned, NULL, NULL);
}
Expand Down Expand Up @@ -1273,6 +1273,7 @@ static void beginDefragCycle(void) {
defrag.remaining_stages = listCreate();

for (int dbid = 0; dbid < server.dbnum; dbid++) {
if (databaseEmpty(dbid)) continue;
addDefragStage(defragStageDbKeys, (void *)(uintptr_t)dbid, NULL);
addDefragStage(defragStageExpiresKvstore, (void *)(uintptr_t)dbid, NULL);
}
Expand Down
12 changes: 7 additions & 5 deletions src/evict.c
Original file line number Diff line number Diff line change
Expand Up @@ -568,7 +568,8 @@ int performEvictions(void) {
* so to start populate the eviction pool sampling keys from
* every DB. */
for (i = 0; i < server.dbnum; i++) {
db = server.db + i;
db = server.db[i];
if (db == NULL) continue;;
kvstore *kvs;
if (server.maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS) {
kvs = db->keys;
Expand Down Expand Up @@ -601,9 +602,9 @@ int performEvictions(void) {

kvstore *kvs;
if (server.maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS) {
kvs = server.db[bestdbid].keys;
kvs = server.db[bestdbid]->keys;
} else {
kvs = server.db[bestdbid].expires;
kvs = server.db[bestdbid]->expires;
}
void *entry = NULL;
int found = kvstoreHashtableFind(kvs, pool[k].slot, pool[k].key, &entry);
Expand Down Expand Up @@ -634,7 +635,8 @@ int performEvictions(void) {
* incrementally visit all DBs. */
for (i = 0; i < server.dbnum; i++) {
j = (++next_db) % server.dbnum;
db = server.db + j;
db = server.db[j];
if (db == NULL) continue;;
kvstore *kvs;
if (server.maxmemory_policy == MAXMEMORY_ALLKEYS_RANDOM) {
kvs = db->keys;
Expand All @@ -653,7 +655,7 @@ int performEvictions(void) {

/* Finally remove the selected key. */
if (bestkey) {
db = server.db + bestdbid;
db = server.db[bestdbid];
robj *keyobj = createStringObject(bestkey, sdslen(bestkey));
/* We compute the amount of memory freed by db*Delete() alone.
* It is possible that actually the memory needed to propagate
Expand Down
14 changes: 9 additions & 5 deletions src/expire.c
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ void activeExpireCycle(int type) {
data.ttl_sum = 0;
data.ttl_samples = 0;

serverDb *db = server.db + (current_db % server.dbnum);
serverDb *db = server.db[(current_db % server.dbnum)];
data.db = db;

int db_done = 0; /* The scan of the current DB is done? */
Expand All @@ -245,13 +245,17 @@ void activeExpireCycle(int type) {
* distribute the time evenly across DBs. */
current_db++;

if (kvstoreSize(db->expires)) dbs_performed++;
if (db && kvstoreSize(db->expires)) dbs_performed++;

/* Continue to expire if at the end of the cycle there is still
* a big percentage of keys to expire, compared to the number of keys
* we scanned. The percentage, stored in config_cycle_acceptable_stale,
* is not fixed, but depends on the configured "expire effort". */
do {
if (db == NULL) {
break; /* DB not allocated since it was never used */
}

unsigned long num;
iteration++;

Expand Down Expand Up @@ -421,11 +425,11 @@ void expireReplicaKeys(void) {
int dbid = 0;
while (dbids && dbid < server.dbnum) {
if ((dbids & 1) != 0) {
serverDb *db = server.db + dbid;
robj *expire = dbFindExpires(db, keyname);
serverDb *db = server.db[dbid];
robj *expire = db == NULL ? NULL : dbFindExpires(db, keyname);
int expired = 0;

if (expire && activeExpireCycleTryExpire(server.db + dbid, expire, start)) {
if (expire && activeExpireCycleTryExpire(db, expire, start)) {
expired = 1;
/* Propagate the DEL (writable replicas do not propagate anything to other replicas,
* but they might propagate to AOF) and trigger module hooks. */
Expand Down
4 changes: 2 additions & 2 deletions src/object.c
Original file line number Diff line number Diff line change
Expand Up @@ -1357,8 +1357,8 @@ struct serverMemOverhead *getMemoryOverheadData(void) {
mem_total += mh->functions_caches;

for (j = 0; j < server.dbnum; j++) {
serverDb *db = server.db + j;
if (!kvstoreNumAllocatedHashtables(db->keys)) continue;
serverDb *db = server.db[j];
if (db == NULL || !kvstoreNumAllocatedHashtables(db->keys)) continue;

unsigned long long keyscount = kvstoreSize(db->keys);

Expand Down
Loading

0 comments on commit 33da0ea

Please sign in to comment.