From c9d22947880ece9f03bf1eb7678e4637f6e59150 Mon Sep 17 00:00:00 2001 From: NadavGigi Date: Thu, 16 Jan 2025 09:52:58 +0000 Subject: [PATCH] Prefetching values for optimized iteration --- src/acl.c | 8 +-- src/aof.c | 4 +- src/cluster.c | 2 +- src/cluster_legacy.c | 2 +- src/db.c | 4 +- src/debug.c | 4 +- src/hashtable.c | 114 +++++++++++++++++++++++++------------- src/hashtable.h | 13 +++-- src/kvstore.c | 22 +++----- src/kvstore.h | 5 +- src/latency.c | 4 +- src/module.c | 4 +- src/object.c | 8 +-- src/pubsub.c | 4 +- src/rdb.c | 6 +- src/server.c | 28 +++++++--- src/sort.c | 2 +- src/t_hash.c | 2 +- src/t_set.c | 4 +- src/t_zset.c | 6 +- src/unit/test_hashtable.c | 6 +- src/unit/test_kvstore.c | 8 +-- 22 files changed, 150 insertions(+), 110 deletions(-) diff --git a/src/acl.c b/src/acl.c index 184fa54116..807ef744d2 100644 --- a/src/acl.c +++ b/src/acl.c @@ -655,7 +655,7 @@ void ACLChangeSelectorPerm(aclSelector *selector, struct serverCommand *cmd, int ACLResetFirstArgsForCommand(selector, id); if (cmd->subcommands_ht) { hashtableIterator iter; - hashtableInitSafeIterator(&iter, cmd->subcommands_ht); + hashtableInitIterator(&iter, cmd->subcommands_ht, HASHTABLE_ITER_SAFE); void *next; while (hashtableNext(&iter, &next)) { struct serverCommand *sub = next; @@ -673,7 +673,7 @@ void ACLChangeSelectorPerm(aclSelector *selector, struct serverCommand *cmd, int * found and the operation was performed. */ void ACLSetSelectorCommandBitsForCategory(hashtable *commands, aclSelector *selector, uint64_t cflag, int value) { hashtableIterator iter; - hashtableInitIterator(&iter, commands); + hashtableInitIterator(&iter, commands, 0); void *next; while (hashtableNext(&iter, &next)) { struct serverCommand *cmd = next; @@ -741,7 +741,7 @@ void ACLCountCategoryBitsForCommands(hashtable *commands, unsigned long *off, uint64_t cflag) { hashtableIterator iter; - hashtableInitIterator(&iter, commands); + hashtableInitIterator(&iter, commands, 0); void *next; while (hashtableNext(&iter, &next)) { struct serverCommand *cmd = next; @@ -2765,7 +2765,7 @@ sds getAclErrorMessage(int acl_res, user *user, struct serverCommand *cmd, sds e /* ACL CAT category */ void aclCatWithFlags(client *c, hashtable *commands, uint64_t cflag, int *arraylen) { hashtableIterator iter; - hashtableInitIterator(&iter, commands); + hashtableInitIterator(&iter, commands, 0); void *next; while (hashtableNext(&iter, &next)) { struct serverCommand *cmd = next; diff --git a/src/aof.c b/src/aof.c index 024cdb2771..c6828d4b6e 100644 --- a/src/aof.c +++ b/src/aof.c @@ -1891,7 +1891,7 @@ int rewriteSortedSetObject(rio *r, robj *key, robj *o) { } else if (o->encoding == OBJ_ENCODING_SKIPLIST) { zset *zs = o->ptr; hashtableIterator iter; - hashtableInitIterator(&iter, zs->ht); + hashtableInitIterator(&iter, zs->ht, 0); void *next; while (hashtableNext(&iter, &next)) { zskiplistNode *node = next; @@ -2217,7 +2217,7 @@ int rewriteAppendOnlyFileRio(rio *aof) { if (rioWrite(aof, selectcmd, sizeof(selectcmd) - 1) == 0) goto werr; if (rioWriteBulkLongLong(aof, j) == 0) goto werr; - kvs_it = kvstoreIteratorInit(db->keys); + kvs_it = kvstoreIteratorInit(db->keys, HASHTABLE_ITER_SAFE | HASHTABLE_ITER_PREFETCH_VALUES); /* Iterate this DB writing every entry */ void *next; while (kvstoreIteratorNext(kvs_it, &next)) { diff --git a/src/cluster.c b/src/cluster.c index 309279e0be..cedcd9ecb1 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -910,7 +910,7 @@ void clusterCommand(client *c) { unsigned int numkeys = maxkeys > keys_in_slot ? keys_in_slot : maxkeys; addReplyArrayLen(c, numkeys); kvstoreHashtableIterator *kvs_di = NULL; - kvs_di = kvstoreGetHashtableIterator(server.db->keys, slot); + kvs_di = kvstoreGetHashtableIterator(server.db->keys, slot, 0); for (unsigned int i = 0; i < numkeys; i++) { void *next; serverAssert(kvstoreHashtableIteratorNext(kvs_di, &next)); diff --git a/src/cluster_legacy.c b/src/cluster_legacy.c index 94d3532dfc..5e976d3060 100644 --- a/src/cluster_legacy.c +++ b/src/cluster_legacy.c @@ -6347,7 +6347,7 @@ unsigned int delKeysInSlot(unsigned int hashslot) { kvstoreHashtableIterator *kvs_di = NULL; void *next; - kvs_di = kvstoreGetHashtableSafeIterator(server.db->keys, hashslot); + kvs_di = kvstoreGetHashtableIterator(server.db->keys, hashslot, HASHTABLE_ITER_SAFE); while (kvstoreHashtableIteratorNext(kvs_di, &next)) { robj *valkey = next; enterExecutionUnit(1, 0); diff --git a/src/db.c b/src/db.c index 535d493954..f2a000030b 100644 --- a/src/db.c +++ b/src/db.c @@ -895,9 +895,9 @@ void keysCommand(client *c) { kvstoreHashtableIterator *kvs_di = NULL; kvstoreIterator *kvs_it = NULL; if (pslot != -1) { - kvs_di = kvstoreGetHashtableSafeIterator(c->db->keys, pslot); + kvs_di = kvstoreGetHashtableIterator(c->db->keys, pslot, HASHTABLE_ITER_SAFE); } else { - kvs_it = kvstoreIteratorInit(c->db->keys); + kvs_it = kvstoreIteratorInit(c->db->keys, HASHTABLE_ITER_SAFE); } void *next; while (kvs_di ? kvstoreHashtableIteratorNext(kvs_di, &next) : kvstoreIteratorNext(kvs_it, &next)) { diff --git a/src/debug.c b/src/debug.c index 915e0c264d..b7f8df04fa 100644 --- a/src/debug.c +++ b/src/debug.c @@ -207,7 +207,7 @@ void xorObjectDigest(serverDb *db, robj *keyobj, unsigned char *digest, robj *o) } else if (o->encoding == OBJ_ENCODING_SKIPLIST) { zset *zs = o->ptr; hashtableIterator iter; - hashtableInitIterator(&iter, zs->ht); + hashtableInitIterator(&iter, zs->ht, 0); void *next; while (hashtableNext(&iter, &next)) { @@ -291,7 +291,7 @@ void computeDatasetDigest(unsigned char *final) { for (int j = 0; j < server.dbnum; j++) { serverDb *db = server.db + j; if (kvstoreSize(db->keys) == 0) continue; - kvstoreIterator *kvs_it = kvstoreIteratorInit(db->keys); + kvstoreIterator *kvs_it = kvstoreIteratorInit(db->keys, HASHTABLE_ITER_SAFE | HASHTABLE_ITER_PREFETCH_VALUES); /* hash the DB id, so the same dataset moved in a different DB will lead to a different digest */ aux = htonl(j); diff --git a/src/hashtable.c b/src/hashtable.c index 3f1eff19c1..5392dd0fbc 100644 --- a/src/hashtable.c +++ b/src/hashtable.c @@ -300,7 +300,7 @@ typedef struct { long index; uint16_t pos_in_bucket; uint8_t table; - uint8_t safe; + uint8_t flags; union { /* Unsafe iterator fingerprint for misuse detection. */ uint64_t fingerprint; @@ -379,6 +379,12 @@ static inline int compareKeys(hashtable *ht, const void *key1, const void *key2) } } +static inline void entryPrefetchValue(hashtable *ht, const void *entry) { + if (ht->type->entryPrefetchValue != NULL) { + ht->type->entryPrefetchValue(entry); + } +} + static inline const void *entryGetKey(hashtable *ht, const void *entry) { if (ht->type->entryGetKey != NULL) { return ht->type->entryGetKey(entry); @@ -936,6 +942,7 @@ static inline incrementalFind *incrementalFindFromOpaque(hashtableIncrementalFin /* Prefetches all filled entries in the given bucket to optimize future memory access. */ static void prefetchBucketEntries(bucket *b) { + if (!b->presence) return; for (int pos = 0; pos < numBucketPositions(b); pos++) { if (isPositionFilled(b, pos)) { valkey_prefetch(b->entries[pos]); @@ -979,6 +986,24 @@ static void prefetchNextBucketEntries(iter *iter, bucket *current_bucket) { } } +/* Prefetches the values associated with filled entries in the given bucket. */ +static void prefetchBucketValues(bucket *b, hashtable *ht) { + if (!b->presence) return; + for (int pos = 0; pos < numBucketPositions(b); pos++) { + if (isPositionFilled(b, pos)) { + entryPrefetchValue(ht, b->entries[pos]); + } + } +} + +static inline int isSafe(iter *iter) { + return (iter->flags & HASHTABLE_ITER_SAFE); +} + +static inline int isPrefetchingValues(iter *iter) { + return (iter->flags & HASHTABLE_ITER_PREFETCH_VALUES); +} + /* --- API functions --- */ /* Allocates and initializes a new hashtable specified by the given type. */ @@ -1792,31 +1817,32 @@ size_t hashtableScanDefrag(hashtable *ht, size_t cursor, hashtableScanFunction f /* --- Iterator --- */ -/* Initialize a iterator, that is not allowed to insert, delete or even lookup - * entries in the hashtable, because such operations can trigger incremental - * rehashing which moves entries around and confuses the iterator. Only - * hashtableNext is allowed. Each entry is returned exactly once. Call - * hashtableResetIterator when you are done. See also - * hashtableInitSafeIterator. */ -void hashtableInitIterator(hashtableIterator *iterator, hashtable *ht) { - iter *iter; - iter = iteratorFromOpaque(iterator); - iter->hashtable = ht; - iter->table = 0; - iter->index = -1; - iter->safe = 0; -} - -/* Initialize a safe iterator, which is allowed to modify the hash table while - * iterating. It pauses incremental rehashing to prevent entries from moving - * around. Call hashtableNext to fetch each entry. You must call - * hashtableResetIterator when you are done with a safe iterator. +/* Initialize an iterator for a hashtable. + * + * The 'flags' argument can be used to tweak the behaviour. It's a bitwise-or + * (zero means no flags) of the following: + * + * - HASHTABLE_ITER_SAFE: Use a safe iterator that can handle concurrent + * modifications to the hash table. + * - HASHTABLE_ITER_PREFETCH_VALUES: Enables prefetching of entries values, + * which can improve performance in some scenarios. Because the hashtable is generic and + * doesn't care which object we store, the callback entryPrefetchValue helps + * us prefetch necessary fields of specific object types stored in the hashtable. * - * It's allowed to insert and replace entries. Deleting entries is only allowed - * for the entry that was just returned by hashtableNext. Deleting other entries - * is possible, but doing so can cause internal fragmentation, so don't. + * For a non-safe iterator (default, when HASHTABLE_ITER_SAFE is not set): + * It is not allowed to insert, delete or even lookup entries in the hashtable, + * because such operations can trigger incremental rehashing which moves entries + * around and confuses the iterator. Only hashtableNext is allowed. Each entry + * is returned exactly once. * - * Guarantees: + * For a safe iterator (when HASHTABLE_ITER_SAFE is set): + * It is allowed to modify the hash table while iterating. It pauses incremental + * rehashing to prevent entries from moving around. It's allowed to insert and + * replace entries. Deleting entries is only allowed for the entry that was just + * returned by hashtableNext. Deleting other entries is possible, but doing so + * can cause internal fragmentation, so don't. + * + * Guarantees for safe iterators: * * - Entries that are in the hash table for the entire iteration are returned * exactly once. @@ -1829,18 +1855,31 @@ void hashtableInitIterator(hashtableIterator *iterator, hashtable *ht) { * * - Entries that are inserted during the iteration may or may not be returned * by the iterator. + * + * Call hashtableNext to fetch each entry. You must call hashtableResetIterator + * when you are done with the iterator. */ -void hashtableInitSafeIterator(hashtableIterator *iterator, hashtable *ht) { - hashtableInitIterator(iterator, ht); +void hashtableInitIterator(hashtableIterator *iterator, hashtable *ht, uint8_t flags) { + iter *iter; + iter = iteratorFromOpaque(iterator); + iter->hashtable = ht; + iter->table = 0; + iter->index = -1; + iter->flags = flags; +} + +/* Reinitializes the iterator for the provided hashtable while + * preserving the flags from its previous initialization. */ +void hashtableReinitIterator(hashtableIterator *iterator, hashtable *ht) { iter *iter = iteratorFromOpaque(iterator); - iter->safe = 1; + hashtableInitIterator(iterator, ht, iter->flags); } /* Resets a stack-allocated iterator. */ void hashtableResetIterator(hashtableIterator *iterator) { iter *iter = iteratorFromOpaque(iterator); if (!(iter->index == -1 && iter->table == 0)) { - if (iter->safe) { + if (isSafe(iter)) { hashtableResumeRehashing(iter->hashtable); assert(iter->hashtable->pause_rehash >= 0); } else { @@ -1850,21 +1889,13 @@ void hashtableResetIterator(hashtableIterator *iterator) { } /* Allocates and initializes an iterator. */ -hashtableIterator *hashtableCreateIterator(hashtable *ht) { +hashtableIterator *hashtableCreateIterator(hashtable *ht, uint8_t flags) { iter *iter = zmalloc(sizeof(*iter)); hashtableIterator *opaque = iteratorToOpaque(iter); - hashtableInitIterator(opaque, ht); + hashtableInitIterator(opaque, ht, flags); return opaque; } -/* Allocates and initializes a safe iterator. */ -hashtableIterator *hashtableCreateSafeIterator(hashtable *ht) { - hashtableIterator *iterator = hashtableCreateIterator(ht); - iter *iter = iteratorFromOpaque(iterator); - iter->safe = 1; - return iterator; -} - /* Resets and frees the memory of an allocated iterator, i.e. one created using * hashtableCreate(Safe)Iterator. */ void hashtableReleaseIterator(hashtableIterator *iterator) { @@ -1880,7 +1911,7 @@ int hashtableNext(hashtableIterator *iterator, void **elemptr) { while (1) { if (iter->index == -1 && iter->table == 0) { /* It's the first call to next. */ - if (iter->safe) { + if (isSafe(iter)) { hashtablePauseRehashing(iter->hashtable); iter->last_seen_size = iter->hashtable->used[iter->table]; } else { @@ -1907,7 +1938,7 @@ int hashtableNext(hashtableIterator *iterator, void **elemptr) { iter->bucket = getChildBucket(iter->bucket); } else if (iter->pos_in_bucket >= ENTRIES_PER_BUCKET) { /* Bucket index done. */ - if (iter->safe) { + if (isSafe(iter)) { /* If entries in this bucket chain have been deleted, * they've left empty spaces in the buckets. The chain is * not automatically compacted when rehashing is paused. If @@ -1936,6 +1967,9 @@ int hashtableNext(hashtableIterator *iterator, void **elemptr) { } bucket *b = iter->bucket; if (iter->pos_in_bucket == 0) { + if (isPrefetchingValues(iter)) { + prefetchBucketValues(b, iter->hashtable); + } prefetchNextBucketEntries(iter, b); } if (!isPositionFilled(b, iter->pos_in_bucket)) { diff --git a/src/hashtable.h b/src/hashtable.h index 4291cf5a5d..3e0c47a5c3 100644 --- a/src/hashtable.h +++ b/src/hashtable.h @@ -48,6 +48,8 @@ typedef uint64_t hashtableIncrementalFindState[5]; * optional. With all callbacks omitted, the hashtable is effectively a set of * pointer-sized integers. */ typedef struct { + /* Callback to prefetch the value associated with a hashtable entry. */ + void (*entryPrefetchValue)(const void *entry); /* If the type of an entry is not the same as the type of a key used for * lookup, this callback needs to return the key within an entry. */ const void *(*entryGetKey)(const void *entry); @@ -91,6 +93,10 @@ typedef void (*hashtableScanFunction)(void *privdata, void *entry); /* Scan flags */ #define HASHTABLE_SCAN_EMIT_REF (1 << 0) +/* Iterator flags */ +#define HASHTABLE_ITER_SAFE (1 << 0) +#define HASHTABLE_ITER_PREFETCH_VALUES (1 << 1) + /* --- Prototypes --- */ /* Hash function (global seed) */ @@ -144,11 +150,10 @@ int hashtableIncrementalFindGetResult(hashtableIncrementalFindState *state, void /* Iteration & scan */ size_t hashtableScan(hashtable *ht, size_t cursor, hashtableScanFunction fn, void *privdata); size_t hashtableScanDefrag(hashtable *ht, size_t cursor, hashtableScanFunction fn, void *privdata, void *(*defragfn)(void *), int flags); -void hashtableInitIterator(hashtableIterator *iter, hashtable *ht); -void hashtableInitSafeIterator(hashtableIterator *iter, hashtable *ht); +void hashtableInitIterator(hashtableIterator *iter, hashtable *ht, uint8_t flags); +void hashtableReinitIterator(hashtableIterator *iterator, hashtable *ht); void hashtableResetIterator(hashtableIterator *iter); -hashtableIterator *hashtableCreateIterator(hashtable *ht); -hashtableIterator *hashtableCreateSafeIterator(hashtable *ht); +hashtableIterator *hashtableCreateIterator(hashtable *ht, uint8_t flags); void hashtableReleaseIterator(hashtableIterator *iter); int hashtableNext(hashtableIterator *iter, void **elemptr); diff --git a/src/kvstore.c b/src/kvstore.c index d6db4d3fe1..76bfb35d98 100644 --- a/src/kvstore.c +++ b/src/kvstore.c @@ -467,7 +467,7 @@ void kvstoreGetStats(kvstore *kvs, char *buf, size_t bufsize, int full) { hashtableStats *mainHtStats = NULL; hashtableStats *rehashHtStats = NULL; hashtable *ht; - kvstoreIterator *kvs_it = kvstoreIteratorInit(kvs); + kvstoreIterator *kvs_it = kvstoreIteratorInit(kvs, HASHTABLE_ITER_SAFE); while ((ht = kvstoreIteratorNextHashtable(kvs_it))) { hashtableStats *stats = hashtableGetStatsHt(ht, 0, full); if (!mainHtStats) { @@ -576,12 +576,12 @@ int kvstoreNumHashtables(kvstore *kvs) { /* Returns kvstore iterator that can be used to iterate through sub-hash tables. * * The caller should free the resulting kvs_it with kvstoreIteratorRelease. */ -kvstoreIterator *kvstoreIteratorInit(kvstore *kvs) { +kvstoreIterator *kvstoreIteratorInit(kvstore *kvs, uint8_t flags) { kvstoreIterator *kvs_it = zmalloc(sizeof(*kvs_it)); kvs_it->kvs = kvs; kvs_it->didx = -1; kvs_it->next_didx = kvstoreGetFirstNonEmptyHashtableIndex(kvs_it->kvs); /* Finds first non-empty hashtable index. */ - hashtableInitSafeIterator(&kvs_it->di, NULL); + hashtableInitIterator(&kvs_it->di, NULL, flags); return kvs_it; } @@ -625,7 +625,7 @@ int kvstoreIteratorNext(kvstoreIterator *kvs_it, void **next) { /* No current hashtable or reached the end of the hash table. */ hashtable *ht = kvstoreIteratorNextHashtable(kvs_it); if (!ht) return 0; - hashtableInitSafeIterator(&kvs_it->di, ht); + hashtableReinitIterator(&kvs_it->di, ht); return hashtableNext(&kvs_it->di, next); } } @@ -691,23 +691,15 @@ unsigned long kvstoreHashtableSize(kvstore *kvs, int didx) { return hashtableSize(ht); } -kvstoreHashtableIterator *kvstoreGetHashtableIterator(kvstore *kvs, int didx) { +kvstoreHashtableIterator *kvstoreGetHashtableIterator(kvstore *kvs, int didx, uint8_t flags) { kvstoreHashtableIterator *kvs_di = zmalloc(sizeof(*kvs_di)); kvs_di->kvs = kvs; kvs_di->didx = didx; - hashtableInitIterator(&kvs_di->di, kvstoreGetHashtable(kvs, didx)); + hashtableInitIterator(&kvs_di->di, kvstoreGetHashtable(kvs, didx), flags); return kvs_di; } -kvstoreHashtableIterator *kvstoreGetHashtableSafeIterator(kvstore *kvs, int didx) { - kvstoreHashtableIterator *kvs_di = zmalloc(sizeof(*kvs_di)); - kvs_di->kvs = kvs; - kvs_di->didx = didx; - hashtableInitSafeIterator(&kvs_di->di, kvstoreGetHashtable(kvs, didx)); - return kvs_di; -} - -/* Free the kvs_di returned by kvstoreGetHashtableIterator and kvstoreGetHashtableSafeIterator. */ +/* Free the kvs_di returned by kvstoreGetHashtableIterator. */ void kvstoreReleaseHashtableIterator(kvstoreHashtableIterator *kvs_di) { /* The hashtable may be deleted during the iteration process, so here need to check for NULL. */ if (kvstoreGetHashtable(kvs_di->kvs, kvs_di->didx)) { diff --git a/src/kvstore.h b/src/kvstore.h index 1a8c74a6b9..d5db1a89aa 100644 --- a/src/kvstore.h +++ b/src/kvstore.h @@ -43,7 +43,7 @@ void kvstoreHashtableTrackMemUsage(hashtable *s, ssize_t delta); size_t kvstoreHashtableMetadataSize(void); /* kvstore iterator specific functions */ -kvstoreIterator *kvstoreIteratorInit(kvstore *kvs); +kvstoreIterator *kvstoreIteratorInit(kvstore *kvs, uint8_t flags); void kvstoreIteratorRelease(kvstoreIterator *kvs_it); int kvstoreIteratorGetCurrentHashtableIndex(kvstoreIterator *kvs_it); int kvstoreIteratorNext(kvstoreIterator *kvs_it, void **next); @@ -57,8 +57,7 @@ unsigned long kvstoreHashtableRehashingCount(kvstore *kvs); /* Specific hashtable access by hashtable-index */ unsigned long kvstoreHashtableSize(kvstore *kvs, int didx); -kvstoreHashtableIterator *kvstoreGetHashtableIterator(kvstore *kvs, int didx); -kvstoreHashtableIterator *kvstoreGetHashtableSafeIterator(kvstore *kvs, int didx); +kvstoreHashtableIterator *kvstoreGetHashtableIterator(kvstore *kvs, int didx, uint8_t flags); void kvstoreReleaseHashtableIterator(kvstoreHashtableIterator *kvs_id); int kvstoreHashtableIteratorNext(kvstoreHashtableIterator *kvs_di, void **next); int kvstoreHashtableRandomEntry(kvstore *kvs, int didx, void **found); diff --git a/src/latency.c b/src/latency.c index 2beb4859d1..fa448dac35 100644 --- a/src/latency.c +++ b/src/latency.c @@ -528,7 +528,7 @@ void fillCommandCDF(client *c, struct hdr_histogram *histogram) { * a per command cumulative distribution of latencies. */ void latencyAllCommandsFillCDF(client *c, hashtable *commands, int *command_with_data) { hashtableIterator iter; - hashtableInitSafeIterator(&iter, commands); + hashtableInitIterator(&iter, commands, HASHTABLE_ITER_SAFE); void *next; while (hashtableNext(&iter, &next)) { struct serverCommand *cmd = next; @@ -565,7 +565,7 @@ void latencySpecificCommandsFillCDF(client *c) { if (cmd->subcommands_ht) { hashtableIterator iter; - hashtableInitSafeIterator(&iter, cmd->subcommands_ht); + hashtableInitIterator(&iter, cmd->subcommands_ht, HASHTABLE_ITER_SAFE); void *next; while (hashtableNext(&iter, &next)) { struct serverCommand *sub = next; diff --git a/src/module.c b/src/module.c index 75dcd81cd6..17ac4ddf02 100644 --- a/src/module.c +++ b/src/module.c @@ -12162,7 +12162,7 @@ int moduleFreeCommand(struct ValkeyModule *module, struct serverCommand *cmd) { if (cmd->subcommands_ht) { hashtableIterator iter; void *next; - hashtableInitSafeIterator(&iter, cmd->subcommands_ht); + hashtableInitIterator(&iter, cmd->subcommands_ht, HASHTABLE_ITER_SAFE); while (hashtableNext(&iter, &next)) { struct serverCommand *sub = next; if (moduleFreeCommand(module, sub) != C_OK) continue; @@ -12185,7 +12185,7 @@ void moduleUnregisterCommands(struct ValkeyModule *module) { /* Unregister all the commands registered by this module. */ hashtableIterator iter; void *next; - hashtableInitSafeIterator(&iter, server.commands); + hashtableInitIterator(&iter, server.commands, HASHTABLE_ITER_SAFE); while (hashtableNext(&iter, &next)) { struct serverCommand *cmd = next; if (moduleFreeCommand(module, cmd) != C_OK) continue; diff --git a/src/object.c b/src/object.c index b8200dd815..94c2985edb 100644 --- a/src/object.c +++ b/src/object.c @@ -630,7 +630,7 @@ void dismissSetObject(robj *o, size_t size_hint) { * page size, and there's a high chance we'll actually dismiss something. */ if (size_hint / hashtableSize(ht) >= server.page_size) { hashtableIterator iter; - hashtableInitIterator(&iter, ht); + hashtableInitIterator(&iter, ht, 0); void *next; while (hashtableNext(&iter, &next)) { sds item = next; @@ -682,7 +682,7 @@ void dismissHashObject(robj *o, size_t size_hint) { * a page size, and there's a high chance we'll actually dismiss something. */ if (size_hint / hashtableSize(ht) >= server.page_size) { hashtableIterator iter; - hashtableInitIterator(&iter, ht); + hashtableInitIterator(&iter, ht, 0); void *next; while (hashtableNext(&iter, &next)) { dismissHashTypeEntry(next); @@ -1156,7 +1156,7 @@ size_t objectComputeSize(robj *key, robj *o, size_t sample_size, int dbid) { asize = sizeof(*o) + hashtableMemUsage(ht); hashtableIterator iter; - hashtableInitIterator(&iter, ht); + hashtableInitIterator(&iter, ht, 0); void *next; while (hashtableNext(&iter, &next) && samples < sample_size) { sds element = next; @@ -1197,7 +1197,7 @@ size_t objectComputeSize(robj *key, robj *o, size_t sample_size, int dbid) { } else if (o->encoding == OBJ_ENCODING_HASHTABLE) { hashtable *ht = o->ptr; hashtableIterator iter; - hashtableInitIterator(&iter, ht); + hashtableInitIterator(&iter, ht, 0); void *next; asize = sizeof(*o) + hashtableMemUsage(ht); diff --git a/src/pubsub.c b/src/pubsub.c index 27b5611788..be6c739e98 100644 --- a/src/pubsub.c +++ b/src/pubsub.c @@ -366,7 +366,7 @@ int pubsubUnsubscribeChannel(client *c, robj *channel, int notify, pubsubtype ty void pubsubShardUnsubscribeAllChannelsInSlot(unsigned int slot) { if (!kvstoreHashtableSize(server.pubsubshard_channels, slot)) return; - kvstoreHashtableIterator *kvs_di = kvstoreGetHashtableSafeIterator(server.pubsubshard_channels, slot); + kvstoreHashtableIterator *kvs_di = kvstoreGetHashtableIterator(server.pubsubshard_channels, slot, HASHTABLE_ITER_SAFE); void *element; while (kvstoreHashtableIteratorNext(kvs_di, &element)) { dict *clients = element; @@ -730,7 +730,7 @@ void channelList(client *c, sds pat, kvstore *pubsub_channels) { replylen = addReplyDeferredLen(c); for (unsigned int i = 0; i < slot_cnt; i++) { if (!kvstoreHashtableSize(pubsub_channels, i)) continue; - kvstoreHashtableIterator *kvs_di = kvstoreGetHashtableIterator(pubsub_channels, i); + kvstoreHashtableIterator *kvs_di = kvstoreGetHashtableIterator(pubsub_channels, i, 0); void *next; while (kvstoreHashtableIteratorNext(kvs_di, &next)) { dict *clients = next; diff --git a/src/rdb.c b/src/rdb.c index 0bb5d7d45d..6653e99c3a 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -887,7 +887,7 @@ ssize_t rdbSaveObject(rio *rdb, robj *o, robj *key, int dbid) { nwritten += n; hashtableIterator iterator; - hashtableInitIterator(&iterator, set); + hashtableInitIterator(&iterator, set, 0); void *next; while (hashtableNext(&iterator, &next)) { sds ele = next; @@ -959,7 +959,7 @@ ssize_t rdbSaveObject(rio *rdb, robj *o, robj *key, int dbid) { nwritten += n; hashtableIterator iter; - hashtableInitIterator(&iter, ht); + hashtableInitIterator(&iter, ht, 0); void *next; while (hashtableNext(&iter, &next)) { sds field = hashTypeEntryGetField(next); @@ -1349,7 +1349,7 @@ ssize_t rdbSaveDb(rio *rdb, int dbid, int rdbflags, long *key_counter) { if ((res = rdbSaveLen(rdb, expires_size)) < 0) goto werr; written += res; - kvs_it = kvstoreIteratorInit(db->keys); + kvs_it = kvstoreIteratorInit(db->keys, HASHTABLE_ITER_SAFE | HASHTABLE_ITER_PREFETCH_VALUES); int last_slot = -1; /* Iterate this DB writing every entry */ void *next; diff --git a/src/server.c b/src/server.c index 144841eff9..2799279618 100644 --- a/src/server.c +++ b/src/server.c @@ -577,6 +577,14 @@ const void *hashtableObjectGetKey(const void *entry) { return objectGetKey(entry); } +/* Prefetch the value if it's not embedded. */ +void hashtableObjectPrefetchValue(const void *entry) { + const robj *obj = entry; + if (obj->encoding != OBJ_ENCODING_EMBSTR) { + valkey_prefetch(obj->ptr); + } +} + int hashtableObjKeyCompare(const void *key1, const void *key2) { const robj *o1 = key1, *o2 = key2; return hashtableSdsKeyCompare(o1->ptr, o2->ptr); @@ -589,6 +597,7 @@ void hashtableObjectDestructor(void *val) { /* Kvstore->keys, keys are sds strings, vals are Objects. */ hashtableType kvstoreKeysHashtableType = { + .entryPrefetchValue = hashtableObjectPrefetchValue, .entryGetKey = hashtableObjectGetKey, .hashFunction = hashtableSdsHash, .keyCompare = hashtableSdsKeyCompare, @@ -602,6 +611,7 @@ hashtableType kvstoreKeysHashtableType = { /* Kvstore->expires */ hashtableType kvstoreExpiresHashtableType = { + .entryPrefetchValue = hashtableObjectPrefetchValue, .entryGetKey = hashtableObjectGetKey, .hashFunction = hashtableSdsHash, .keyCompare = hashtableSdsKeyCompare, @@ -3205,7 +3215,7 @@ void populateCommandTable(void) { void resetCommandTableStats(hashtable *commands) { hashtableIterator iter; void *next; - hashtableInitSafeIterator(&iter, commands); + hashtableInitIterator(&iter, commands, HASHTABLE_ITER_SAFE); while (hashtableNext(&iter, &next)) { struct serverCommand *c = next; c->microseconds = 0; @@ -4988,7 +4998,7 @@ void addReplyCommandSubCommands(client *c, void *next; hashtableIterator iter; - hashtableInitSafeIterator(&iter, cmd->subcommands_ht); + hashtableInitIterator(&iter, cmd->subcommands_ht, HASHTABLE_ITER_SAFE); while (hashtableNext(&iter, &next)) { struct serverCommand *sub = next; if (use_map) addReplyBulkCBuffer(c, sub->fullname, sdslen(sub->fullname)); @@ -5150,7 +5160,7 @@ void commandCommand(client *c) { hashtableIterator iter; void *next; addReplyArrayLen(c, hashtableSize(server.commands)); - hashtableInitIterator(&iter, server.commands); + hashtableInitIterator(&iter, server.commands, 0); while (hashtableNext(&iter, &next)) { struct serverCommand *cmd = next; addReplyCommandInfo(c, cmd); @@ -5209,7 +5219,7 @@ int shouldFilterFromCommandList(struct serverCommand *cmd, commandListFilter *fi void commandListWithFilter(client *c, hashtable *commands, commandListFilter filter, int *numcmds) { hashtableIterator iter; void *next; - hashtableInitIterator(&iter, commands); + hashtableInitIterator(&iter, commands, 0); while (hashtableNext(&iter, &next)) { struct serverCommand *cmd = next; if (!shouldFilterFromCommandList(cmd, &filter)) { @@ -5228,7 +5238,7 @@ void commandListWithFilter(client *c, hashtable *commands, commandListFilter fil void commandListWithoutFilter(client *c, hashtable *commands, int *numcmds) { hashtableIterator iter; void *next; - hashtableInitIterator(&iter, commands); + hashtableInitIterator(&iter, commands, 0); while (hashtableNext(&iter, &next)) { struct serverCommand *cmd = next; addReplyBulkCBuffer(c, cmd->fullname, sdslen(cmd->fullname)); @@ -5290,7 +5300,7 @@ void commandInfoCommand(client *c) { hashtableIterator iter; void *next; addReplyArrayLen(c, hashtableSize(server.commands)); - hashtableInitIterator(&iter, server.commands); + hashtableInitIterator(&iter, server.commands, 0); while (hashtableNext(&iter, &next)) { struct serverCommand *cmd = next; addReplyCommandInfo(c, cmd); @@ -5312,7 +5322,7 @@ void commandDocsCommand(client *c) { hashtableIterator iter; void *next; addReplyMapLen(c, hashtableSize(server.commands)); - hashtableInitIterator(&iter, server.commands); + hashtableInitIterator(&iter, server.commands, 0); while (hashtableNext(&iter, &next)) { struct serverCommand *cmd = next; addReplyBulkCBuffer(c, cmd->fullname, sdslen(cmd->fullname)); @@ -5441,7 +5451,7 @@ const char *getSafeInfoString(const char *s, size_t len, char **tmp) { sds genValkeyInfoStringCommandStats(sds info, hashtable *commands) { hashtableIterator iter; void *next; - hashtableInitSafeIterator(&iter, commands); + hashtableInitIterator(&iter, commands, HASHTABLE_ITER_SAFE); while (hashtableNext(&iter, &next)) { struct serverCommand *c = next; char *tmpsafe; @@ -5478,7 +5488,7 @@ sds genValkeyInfoStringACLStats(sds info) { sds genValkeyInfoStringLatencyStats(sds info, hashtable *commands) { hashtableIterator iter; void *next; - hashtableInitSafeIterator(&iter, commands); + hashtableInitIterator(&iter, commands, HASHTABLE_ITER_SAFE); while (hashtableNext(&iter, &next)) { struct serverCommand *c = next; char *tmpsafe; diff --git a/src/sort.c b/src/sort.c index 7af96141e8..754ebef4a2 100644 --- a/src/sort.c +++ b/src/sort.c @@ -447,7 +447,7 @@ void sortCommandGeneric(client *c, int readonly) { } else if (sortval->type == OBJ_ZSET) { hashtable *ht = ((zset *)sortval->ptr)->ht; hashtableIterator iter; - hashtableInitIterator(&iter, ht); + hashtableInitIterator(&iter, ht, 0); void *next; while (hashtableNext(&iter, &next)) { zskiplistNode *node = next; diff --git a/src/t_hash.c b/src/t_hash.c index b6e6457bb6..b347ecf31f 100644 --- a/src/t_hash.c +++ b/src/t_hash.c @@ -382,7 +382,7 @@ void hashTypeInitIterator(robj *subject, hashTypeIterator *hi) { hi->fptr = NULL; hi->vptr = NULL; } else if (hi->encoding == OBJ_ENCODING_HASHTABLE) { - hashtableInitIterator(&hi->iter, subject->ptr); + hashtableInitIterator(&hi->iter, subject->ptr, 0); } else { serverPanic("Unknown hash encoding"); } diff --git a/src/t_set.c b/src/t_set.c index 4279baf82f..a69345de4f 100644 --- a/src/t_set.c +++ b/src/t_set.c @@ -317,7 +317,7 @@ setTypeIterator *setTypeInitIterator(robj *subject) { si->subject = subject; si->encoding = subject->encoding; if (si->encoding == OBJ_ENCODING_HASHTABLE) { - si->hashtable_iterator = hashtableCreateIterator(subject->ptr); + si->hashtable_iterator = hashtableCreateIterator(subject->ptr, 0); } else if (si->encoding == OBJ_ENCODING_INTSET) { si->ii = 0; } else if (si->encoding == OBJ_ENCODING_LISTPACK) { @@ -1179,7 +1179,7 @@ void srandmemberWithCountCommand(client *c) { /* CASE 3 & 4: send the result to the user. */ { hashtableIterator iter; - hashtableInitIterator(&iter, ht); + hashtableInitIterator(&iter, ht, 0); addReplyArrayLen(c, count); serverAssert(count == hashtableSize(ht)); diff --git a/src/t_zset.c b/src/t_zset.c index 77c96613b7..2444f3ecd0 100644 --- a/src/t_zset.c +++ b/src/t_zset.c @@ -2092,7 +2092,7 @@ static void zuiInitIterator(zsetopsrc *op) { it->is.is = op->subject->ptr; it->is.ii = 0; } else if (op->encoding == OBJ_ENCODING_HASHTABLE) { - it->ht.iter = hashtableCreateIterator(op->subject->ptr); + it->ht.iter = hashtableCreateIterator(op->subject->ptr, 0); } else if (op->encoding == OBJ_ENCODING_LISTPACK) { it->lp.lp = op->subject->ptr; it->lp.p = lpFirst(it->lp.lp); @@ -2349,7 +2349,7 @@ static size_t zsetHashtableGetMaxElementLength(hashtable *ht, size_t *totallen) size_t maxelelen = 0; hashtableIterator iter; - hashtableInitIterator(&iter, ht); + hashtableInitIterator(&iter, ht, 0); void *next; while (hashtableNext(&iter, &next)) { zskiplistNode *node = next; @@ -2749,7 +2749,7 @@ static void zunionInterDiffGenericCommand(client *c, robj *dstkey, int numkeysIn /* Step 2: Create the skiplist using final score ordering */ hashtableIterator iter; - hashtableInitIterator(&iter, dstzset->ht); + hashtableInitIterator(&iter, dstzset->ht, 0); void *next; while (hashtableNext(&iter, &next)) { diff --git a/src/unit/test_hashtable.c b/src/unit/test_hashtable.c index 689440e43d..71a7dde841 100644 --- a/src/unit/test_hashtable.c +++ b/src/unit/test_hashtable.c @@ -547,7 +547,7 @@ int test_iterator(int argc, char **argv, int flags) { size_t num_returned = 0; hashtableIterator iter; void *next; - hashtableInitIterator(&iter, ht); + hashtableInitIterator(&iter, ht, 0); while (hashtableNext(&iter, &next)) { uint8_t *entry = next; num_returned++; @@ -592,7 +592,7 @@ int test_safe_iterator(int argc, char **argv, int flags) { size_t num_returned = 0; hashtableIterator iter; void *next; - hashtableInitSafeIterator(&iter, ht); + hashtableInitIterator(&iter, ht, HASHTABLE_ITER_SAFE); while (hashtableNext(&iter, &next)) { uint8_t *entry = next; size_t index = entry - entry_counts; @@ -657,7 +657,7 @@ int test_compact_bucket_chain(int argc, char **argv, int flags) { size_t num_chained_buckets = hashtableChainedBuckets(ht, 0); size_t num_returned = 0; hashtableIterator iter; - hashtableInitSafeIterator(&iter, ht); + hashtableInitIterator(&iter, ht, HASHTABLE_ITER_SAFE); void *entry; while (hashtableNext(&iter, &entry)) { /* As long as the iterator is still returning entries from the same diff --git a/src/unit/test_kvstore.c b/src/unit/test_kvstore.c index d4cc91d6d8..55b311c4ba 100644 --- a/src/unit/test_kvstore.c +++ b/src/unit/test_kvstore.c @@ -77,7 +77,7 @@ int test_kvstoreIteratorRemoveAllKeysNoDeleteEmptyHashtable(int argc, char **arg TEST_ASSERT(kvstoreHashtableAdd(kvs1, didx, stringFromInt(i))); } - kvs_it = kvstoreIteratorInit(kvs1); + kvs_it = kvstoreIteratorInit(kvs1, HASHTABLE_ITER_SAFE); while (kvstoreIteratorNext(kvs_it, &key)) { curr_slot = kvstoreIteratorGetCurrentHashtableIndex(kvs_it); TEST_ASSERT(kvstoreHashtableDelete(kvs1, curr_slot, key)); @@ -110,7 +110,7 @@ int test_kvstoreIteratorRemoveAllKeysDeleteEmptyHashtable(int argc, char **argv, TEST_ASSERT(kvstoreHashtableAdd(kvs2, didx, stringFromInt(i))); } - kvs_it = kvstoreIteratorInit(kvs2); + kvs_it = kvstoreIteratorInit(kvs2, HASHTABLE_ITER_SAFE); while (kvstoreIteratorNext(kvs_it, &key)) { curr_slot = kvstoreIteratorGetCurrentHashtableIndex(kvs_it); TEST_ASSERT(kvstoreHashtableDelete(kvs2, curr_slot, key)); @@ -146,7 +146,7 @@ int test_kvstoreHashtableIteratorRemoveAllKeysNoDeleteEmptyHashtable(int argc, c TEST_ASSERT(kvstoreHashtableAdd(kvs1, didx, stringFromInt(i))); } - kvs_di = kvstoreGetHashtableSafeIterator(kvs1, didx); + kvs_di = kvstoreGetHashtableIterator(kvs1, didx, HASHTABLE_ITER_SAFE); while (kvstoreHashtableIteratorNext(kvs_di, &key)) { TEST_ASSERT(kvstoreHashtableDelete(kvs1, didx, key)); } @@ -177,7 +177,7 @@ int test_kvstoreHashtableIteratorRemoveAllKeysDeleteEmptyHashtable(int argc, cha TEST_ASSERT(kvstoreHashtableAdd(kvs2, didx, stringFromInt(i))); } - kvs_di = kvstoreGetHashtableSafeIterator(kvs2, didx); + kvs_di = kvstoreGetHashtableIterator(kvs2, didx, HASHTABLE_ITER_SAFE); while (kvstoreHashtableIteratorNext(kvs_di, &key)) { TEST_ASSERT(kvstoreHashtableDelete(kvs2, didx, key)); }