Skip to content

Commit

Permalink
Prefetching values for optimized iteration
Browse files Browse the repository at this point in the history
  • Loading branch information
NadavGigi committed Jan 20, 2025
1 parent dd92d07 commit 38c8eec
Show file tree
Hide file tree
Showing 22 changed files with 150 additions and 110 deletions.
8 changes: 4 additions & 4 deletions src/acl.c
Original file line number Diff line number Diff line change
Expand Up @@ -655,7 +655,7 @@ void ACLChangeSelectorPerm(aclSelector *selector, struct serverCommand *cmd, int
ACLResetFirstArgsForCommand(selector, id);
if (cmd->subcommands_ht) {
hashtableIterator iter;
hashtableInitSafeIterator(&iter, cmd->subcommands_ht);
hashtableInitIterator(&iter, cmd->subcommands_ht, HASHTABLE_ITER_SAFE);
void *next;
while (hashtableNext(&iter, &next)) {
struct serverCommand *sub = next;
Expand All @@ -673,7 +673,7 @@ void ACLChangeSelectorPerm(aclSelector *selector, struct serverCommand *cmd, int
* found and the operation was performed. */
void ACLSetSelectorCommandBitsForCategory(hashtable *commands, aclSelector *selector, uint64_t cflag, int value) {
hashtableIterator iter;
hashtableInitIterator(&iter, commands);
hashtableInitIterator(&iter, commands, 0);
void *next;
while (hashtableNext(&iter, &next)) {
struct serverCommand *cmd = next;
Expand Down Expand Up @@ -741,7 +741,7 @@ void ACLCountCategoryBitsForCommands(hashtable *commands,
unsigned long *off,
uint64_t cflag) {
hashtableIterator iter;
hashtableInitIterator(&iter, commands);
hashtableInitIterator(&iter, commands, 0);
void *next;
while (hashtableNext(&iter, &next)) {
struct serverCommand *cmd = next;
Expand Down Expand Up @@ -2765,7 +2765,7 @@ sds getAclErrorMessage(int acl_res, user *user, struct serverCommand *cmd, sds e
/* ACL CAT category */
void aclCatWithFlags(client *c, hashtable *commands, uint64_t cflag, int *arraylen) {
hashtableIterator iter;
hashtableInitIterator(&iter, commands);
hashtableInitIterator(&iter, commands, 0);
void *next;
while (hashtableNext(&iter, &next)) {
struct serverCommand *cmd = next;
Expand Down
4 changes: 2 additions & 2 deletions src/aof.c
Original file line number Diff line number Diff line change
Expand Up @@ -1891,7 +1891,7 @@ int rewriteSortedSetObject(rio *r, robj *key, robj *o) {
} else if (o->encoding == OBJ_ENCODING_SKIPLIST) {
zset *zs = o->ptr;
hashtableIterator iter;
hashtableInitIterator(&iter, zs->ht);
hashtableInitIterator(&iter, zs->ht, 0);
void *next;
while (hashtableNext(&iter, &next)) {
zskiplistNode *node = next;
Expand Down Expand Up @@ -2217,7 +2217,7 @@ int rewriteAppendOnlyFileRio(rio *aof) {
if (rioWrite(aof, selectcmd, sizeof(selectcmd) - 1) == 0) goto werr;
if (rioWriteBulkLongLong(aof, j) == 0) goto werr;

kvs_it = kvstoreIteratorInit(db->keys);
kvs_it = kvstoreIteratorInit(db->keys, HASHTABLE_ITER_SAFE | HASHTABLE_ITER_PREFETCH_VALUES);
/* Iterate this DB writing every entry */
void *next;
while (kvstoreIteratorNext(kvs_it, &next)) {
Expand Down
2 changes: 1 addition & 1 deletion src/cluster.c
Original file line number Diff line number Diff line change
Expand Up @@ -910,7 +910,7 @@ void clusterCommand(client *c) {
unsigned int numkeys = maxkeys > keys_in_slot ? keys_in_slot : maxkeys;
addReplyArrayLen(c, numkeys);
kvstoreHashtableIterator *kvs_di = NULL;
kvs_di = kvstoreGetHashtableIterator(server.db->keys, slot);
kvs_di = kvstoreGetHashtableIterator(server.db->keys, slot, 0);
for (unsigned int i = 0; i < numkeys; i++) {
void *next;
serverAssert(kvstoreHashtableIteratorNext(kvs_di, &next));
Expand Down
2 changes: 1 addition & 1 deletion src/cluster_legacy.c
Original file line number Diff line number Diff line change
Expand Up @@ -6347,7 +6347,7 @@ unsigned int delKeysInSlot(unsigned int hashslot) {

kvstoreHashtableIterator *kvs_di = NULL;
void *next;
kvs_di = kvstoreGetHashtableSafeIterator(server.db->keys, hashslot);
kvs_di = kvstoreGetHashtableIterator(server.db->keys, hashslot, HASHTABLE_ITER_SAFE);
while (kvstoreHashtableIteratorNext(kvs_di, &next)) {
robj *valkey = next;
enterExecutionUnit(1, 0);
Expand Down
4 changes: 2 additions & 2 deletions src/db.c
Original file line number Diff line number Diff line change
Expand Up @@ -895,9 +895,9 @@ void keysCommand(client *c) {
kvstoreHashtableIterator *kvs_di = NULL;
kvstoreIterator *kvs_it = NULL;
if (pslot != -1) {
kvs_di = kvstoreGetHashtableSafeIterator(c->db->keys, pslot);
kvs_di = kvstoreGetHashtableIterator(c->db->keys, pslot, HASHTABLE_ITER_SAFE);
} else {
kvs_it = kvstoreIteratorInit(c->db->keys);
kvs_it = kvstoreIteratorInit(c->db->keys, HASHTABLE_ITER_SAFE);
}
void *next;
while (kvs_di ? kvstoreHashtableIteratorNext(kvs_di, &next) : kvstoreIteratorNext(kvs_it, &next)) {
Expand Down
4 changes: 2 additions & 2 deletions src/debug.c
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ void xorObjectDigest(serverDb *db, robj *keyobj, unsigned char *digest, robj *o)
} else if (o->encoding == OBJ_ENCODING_SKIPLIST) {
zset *zs = o->ptr;
hashtableIterator iter;
hashtableInitIterator(&iter, zs->ht);
hashtableInitIterator(&iter, zs->ht, 0);

void *next;
while (hashtableNext(&iter, &next)) {
Expand Down Expand Up @@ -291,7 +291,7 @@ void computeDatasetDigest(unsigned char *final) {
for (int j = 0; j < server.dbnum; j++) {
serverDb *db = server.db + j;
if (kvstoreSize(db->keys) == 0) continue;
kvstoreIterator *kvs_it = kvstoreIteratorInit(db->keys);
kvstoreIterator *kvs_it = kvstoreIteratorInit(db->keys, HASHTABLE_ITER_SAFE | HASHTABLE_ITER_PREFETCH_VALUES);

/* hash the DB id, so the same dataset moved in a different DB will lead to a different digest */
aux = htonl(j);
Expand Down
114 changes: 74 additions & 40 deletions src/hashtable.c
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ typedef struct {
long index;
uint16_t pos_in_bucket;
uint8_t table;
uint8_t safe;
uint8_t flags;
union {
/* Unsafe iterator fingerprint for misuse detection. */
uint64_t fingerprint;
Expand Down Expand Up @@ -379,6 +379,12 @@ static inline int compareKeys(hashtable *ht, const void *key1, const void *key2)
}
}

static inline void entryPrefetchValue(hashtable *ht, const void *entry) {
if (ht->type->entryPrefetchValue != NULL) {
ht->type->entryPrefetchValue(entry);
}
}

static inline const void *entryGetKey(hashtable *ht, const void *entry) {
if (ht->type->entryGetKey != NULL) {
return ht->type->entryGetKey(entry);
Expand Down Expand Up @@ -936,6 +942,7 @@ static inline incrementalFind *incrementalFindFromOpaque(hashtableIncrementalFin

/* Prefetches all filled entries in the given bucket to optimize future memory access. */
static void prefetchBucketEntries(bucket *b) {
if (!b->presence) return;
for (int pos = 0; pos < numBucketPositions(b); pos++) {
if (isPositionFilled(b, pos)) {
valkey_prefetch(b->entries[pos]);
Expand Down Expand Up @@ -979,6 +986,24 @@ static void prefetchNextBucketEntries(iter *iter, bucket *current_bucket) {
}
}

/* Prefetches the values associated with filled entries in the given bucket. */
static void prefetchBucketValues(bucket *b, hashtable *ht) {
if (!b->presence) return;
for (int pos = 0; pos < numBucketPositions(b); pos++) {
if (isPositionFilled(b, pos)) {
entryPrefetchValue(ht, b->entries[pos]);
}
}
}

static inline int isSafe(iter *iter) {
return (iter->flags & HASHTABLE_ITER_SAFE);
}

static inline int isPrefetchingValues(iter *iter) {
return (iter->flags & HASHTABLE_ITER_PREFETCH_VALUES);
}

/* --- API functions --- */

/* Allocates and initializes a new hashtable specified by the given type. */
Expand Down Expand Up @@ -1792,31 +1817,32 @@ size_t hashtableScanDefrag(hashtable *ht, size_t cursor, hashtableScanFunction f

/* --- Iterator --- */

/* Initialize a iterator, that is not allowed to insert, delete or even lookup
* entries in the hashtable, because such operations can trigger incremental
* rehashing which moves entries around and confuses the iterator. Only
* hashtableNext is allowed. Each entry is returned exactly once. Call
* hashtableResetIterator when you are done. See also
* hashtableInitSafeIterator. */
void hashtableInitIterator(hashtableIterator *iterator, hashtable *ht) {
iter *iter;
iter = iteratorFromOpaque(iterator);
iter->hashtable = ht;
iter->table = 0;
iter->index = -1;
iter->safe = 0;
}

/* Initialize a safe iterator, which is allowed to modify the hash table while
* iterating. It pauses incremental rehashing to prevent entries from moving
* around. Call hashtableNext to fetch each entry. You must call
* hashtableResetIterator when you are done with a safe iterator.
/* Initialize an iterator for a hashtable.
*
* The 'flags' argument can be used to tweak the behaviour. It's a bitwise-or
* (zero means no flags) of the following:
*
* - HASHTABLE_ITER_SAFE: Use a safe iterator that can handle concurrent
* modifications to the hash table.
* - HASHTABLE_ITER_PREFETCH_VALUES: Enables prefetching of entries values,
* which can improve performance in some scenarios. Because the hashtable is generic and
* doesn't care which object we store, the callback entryPrefetchValue helps
* us prefetch necessary fields of specific object types stored in the hashtable.
*
* It's allowed to insert and replace entries. Deleting entries is only allowed
* for the entry that was just returned by hashtableNext. Deleting other entries
* is possible, but doing so can cause internal fragmentation, so don't.
* For a non-safe iterator (default, when HASHTABLE_ITER_SAFE is not set):
* It is not allowed to insert, delete or even lookup entries in the hashtable,
* because such operations can trigger incremental rehashing which moves entries
* around and confuses the iterator. Only hashtableNext is allowed. Each entry
* is returned exactly once.
*
* Guarantees:
* For a safe iterator (when HASHTABLE_ITER_SAFE is set):
* It is allowed to modify the hash table while iterating. It pauses incremental
* rehashing to prevent entries from moving around. It's allowed to insert and
* replace entries. Deleting entries is only allowed for the entry that was just
* returned by hashtableNext. Deleting other entries is possible, but doing so
* can cause internal fragmentation, so don't.
*
* Guarantees for safe iterators:
*
* - Entries that are in the hash table for the entire iteration are returned
* exactly once.
Expand All @@ -1829,18 +1855,31 @@ void hashtableInitIterator(hashtableIterator *iterator, hashtable *ht) {
*
* - Entries that are inserted during the iteration may or may not be returned
* by the iterator.
*
* Call hashtableNext to fetch each entry. You must call hashtableResetIterator
* when you are done with the iterator.
*/
void hashtableInitSafeIterator(hashtableIterator *iterator, hashtable *ht) {
hashtableInitIterator(iterator, ht);
void hashtableInitIterator(hashtableIterator *iterator, hashtable *ht, uint8_t flags) {
iter *iter;
iter = iteratorFromOpaque(iterator);
iter->hashtable = ht;
iter->table = 0;
iter->index = -1;
iter->flags = flags;
}

/* Reinitializes the iterator for the provided hashtable while
* preserving the flags from its previous initialization. */
void hashtableReinitIterator(hashtableIterator *iterator, hashtable *ht) {
iter *iter = iteratorFromOpaque(iterator);
iter->safe = 1;
hashtableInitIterator(iterator, ht, iter->flags);
}

/* Resets a stack-allocated iterator. */
void hashtableResetIterator(hashtableIterator *iterator) {
iter *iter = iteratorFromOpaque(iterator);
if (!(iter->index == -1 && iter->table == 0)) {
if (iter->safe) {
if (isSafe(iter)) {
hashtableResumeRehashing(iter->hashtable);
assert(iter->hashtable->pause_rehash >= 0);
} else {
Expand All @@ -1850,21 +1889,13 @@ void hashtableResetIterator(hashtableIterator *iterator) {
}

/* Allocates and initializes an iterator. */
hashtableIterator *hashtableCreateIterator(hashtable *ht) {
hashtableIterator *hashtableCreateIterator(hashtable *ht, uint8_t flags) {
iter *iter = zmalloc(sizeof(*iter));
hashtableIterator *opaque = iteratorToOpaque(iter);
hashtableInitIterator(opaque, ht);
hashtableInitIterator(opaque, ht, flags);
return opaque;
}

/* Allocates and initializes a safe iterator. */
hashtableIterator *hashtableCreateSafeIterator(hashtable *ht) {
hashtableIterator *iterator = hashtableCreateIterator(ht);
iter *iter = iteratorFromOpaque(iterator);
iter->safe = 1;
return iterator;
}

/* Resets and frees the memory of an allocated iterator, i.e. one created using
* hashtableCreate(Safe)Iterator. */
void hashtableReleaseIterator(hashtableIterator *iterator) {
Expand All @@ -1880,7 +1911,7 @@ int hashtableNext(hashtableIterator *iterator, void **elemptr) {
while (1) {
if (iter->index == -1 && iter->table == 0) {
/* It's the first call to next. */
if (iter->safe) {
if (isSafe(iter)) {
hashtablePauseRehashing(iter->hashtable);
iter->last_seen_size = iter->hashtable->used[iter->table];
} else {
Expand All @@ -1907,7 +1938,7 @@ int hashtableNext(hashtableIterator *iterator, void **elemptr) {
iter->bucket = getChildBucket(iter->bucket);
} else if (iter->pos_in_bucket >= ENTRIES_PER_BUCKET) {
/* Bucket index done. */
if (iter->safe) {
if (isSafe(iter)) {
/* If entries in this bucket chain have been deleted,
* they've left empty spaces in the buckets. The chain is
* not automatically compacted when rehashing is paused. If
Expand Down Expand Up @@ -1936,6 +1967,9 @@ int hashtableNext(hashtableIterator *iterator, void **elemptr) {
}
bucket *b = iter->bucket;
if (iter->pos_in_bucket == 0) {
if (isPrefetchingValues(iter)) {
prefetchBucketValues(b, iter->hashtable);
}
prefetchNextBucketEntries(iter, b);
}
if (!isPositionFilled(b, iter->pos_in_bucket)) {
Expand Down
13 changes: 9 additions & 4 deletions src/hashtable.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ typedef uint64_t hashtableIncrementalFindState[5];
* optional. With all callbacks omitted, the hashtable is effectively a set of
* pointer-sized integers. */
typedef struct {
/* Callback to prefetch the value associated with a hashtable entry. */
void (*entryPrefetchValue)(const void *entry);
/* If the type of an entry is not the same as the type of a key used for
* lookup, this callback needs to return the key within an entry. */
const void *(*entryGetKey)(const void *entry);
Expand Down Expand Up @@ -91,6 +93,10 @@ typedef void (*hashtableScanFunction)(void *privdata, void *entry);
/* Scan flags */
#define HASHTABLE_SCAN_EMIT_REF (1 << 0)

/* Iterator flags */
#define HASHTABLE_ITER_SAFE (1 << 0)
#define HASHTABLE_ITER_PREFETCH_VALUES (1 << 1)

/* --- Prototypes --- */

/* Hash function (global seed) */
Expand Down Expand Up @@ -144,11 +150,10 @@ int hashtableIncrementalFindGetResult(hashtableIncrementalFindState *state, void
/* Iteration & scan */
size_t hashtableScan(hashtable *ht, size_t cursor, hashtableScanFunction fn, void *privdata);
size_t hashtableScanDefrag(hashtable *ht, size_t cursor, hashtableScanFunction fn, void *privdata, void *(*defragfn)(void *), int flags);
void hashtableInitIterator(hashtableIterator *iter, hashtable *ht);
void hashtableInitSafeIterator(hashtableIterator *iter, hashtable *ht);
void hashtableInitIterator(hashtableIterator *iter, hashtable *ht, uint8_t flags);
void hashtableReinitIterator(hashtableIterator *iterator, hashtable *ht);
void hashtableResetIterator(hashtableIterator *iter);
hashtableIterator *hashtableCreateIterator(hashtable *ht);
hashtableIterator *hashtableCreateSafeIterator(hashtable *ht);
hashtableIterator *hashtableCreateIterator(hashtable *ht, uint8_t flags);
void hashtableReleaseIterator(hashtableIterator *iter);
int hashtableNext(hashtableIterator *iter, void **elemptr);

Expand Down
22 changes: 7 additions & 15 deletions src/kvstore.c
Original file line number Diff line number Diff line change
Expand Up @@ -467,7 +467,7 @@ void kvstoreGetStats(kvstore *kvs, char *buf, size_t bufsize, int full) {
hashtableStats *mainHtStats = NULL;
hashtableStats *rehashHtStats = NULL;
hashtable *ht;
kvstoreIterator *kvs_it = kvstoreIteratorInit(kvs);
kvstoreIterator *kvs_it = kvstoreIteratorInit(kvs, HASHTABLE_ITER_SAFE);
while ((ht = kvstoreIteratorNextHashtable(kvs_it))) {
hashtableStats *stats = hashtableGetStatsHt(ht, 0, full);
if (!mainHtStats) {
Expand Down Expand Up @@ -576,12 +576,12 @@ int kvstoreNumHashtables(kvstore *kvs) {
/* Returns kvstore iterator that can be used to iterate through sub-hash tables.
*
* The caller should free the resulting kvs_it with kvstoreIteratorRelease. */
kvstoreIterator *kvstoreIteratorInit(kvstore *kvs) {
kvstoreIterator *kvstoreIteratorInit(kvstore *kvs, int flags) {
kvstoreIterator *kvs_it = zmalloc(sizeof(*kvs_it));
kvs_it->kvs = kvs;
kvs_it->didx = -1;
kvs_it->next_didx = kvstoreGetFirstNonEmptyHashtableIndex(kvs_it->kvs); /* Finds first non-empty hashtable index. */
hashtableInitSafeIterator(&kvs_it->di, NULL);
hashtableInitIterator(&kvs_it->di, NULL, flags);
return kvs_it;
}

Expand Down Expand Up @@ -625,7 +625,7 @@ int kvstoreIteratorNext(kvstoreIterator *kvs_it, void **next) {
/* No current hashtable or reached the end of the hash table. */
hashtable *ht = kvstoreIteratorNextHashtable(kvs_it);
if (!ht) return 0;
hashtableInitSafeIterator(&kvs_it->di, ht);
hashtableReinitIterator(&kvs_it->di, ht);
return hashtableNext(&kvs_it->di, next);
}
}
Expand Down Expand Up @@ -691,23 +691,15 @@ unsigned long kvstoreHashtableSize(kvstore *kvs, int didx) {
return hashtableSize(ht);
}

kvstoreHashtableIterator *kvstoreGetHashtableIterator(kvstore *kvs, int didx) {
kvstoreHashtableIterator *kvstoreGetHashtableIterator(kvstore *kvs, int didx, int flags) {
kvstoreHashtableIterator *kvs_di = zmalloc(sizeof(*kvs_di));
kvs_di->kvs = kvs;
kvs_di->didx = didx;
hashtableInitIterator(&kvs_di->di, kvstoreGetHashtable(kvs, didx));
hashtableInitIterator(&kvs_di->di, kvstoreGetHashtable(kvs, didx), flags);
return kvs_di;
}

kvstoreHashtableIterator *kvstoreGetHashtableSafeIterator(kvstore *kvs, int didx) {
kvstoreHashtableIterator *kvs_di = zmalloc(sizeof(*kvs_di));
kvs_di->kvs = kvs;
kvs_di->didx = didx;
hashtableInitSafeIterator(&kvs_di->di, kvstoreGetHashtable(kvs, didx));
return kvs_di;
}

/* Free the kvs_di returned by kvstoreGetHashtableIterator and kvstoreGetHashtableSafeIterator. */
/* Free the kvs_di returned by kvstoreGetHashtableIterator. */
void kvstoreReleaseHashtableIterator(kvstoreHashtableIterator *kvs_di) {
/* The hashtable may be deleted during the iteration process, so here need to check for NULL. */
if (kvstoreGetHashtable(kvs_di->kvs, kvs_di->didx)) {
Expand Down
Loading

0 comments on commit 38c8eec

Please sign in to comment.