Skip to content

Commit

Permalink
Replace dict with new hashtable: hash datatype (valkey-io#1502)
Browse files Browse the repository at this point in the history
This PR replaces dict with the new hashtable data structure in the HASH
datatype. There is a new struct for hashtable items which contains a
pointer to value sds string and the embedded key sds string. These
values were previously stored in dictEntry. This structure is kept
opaque so we can easily add small value embedding or other optimizations
in the future.

closes valkey-io#1095

---------

Signed-off-by: Rain Valentine <[email protected]>
  • Loading branch information
SoftlyRaining authored Jan 13, 2025
1 parent dc9ca1b commit d13aad4
Show file tree
Hide file tree
Showing 11 changed files with 350 additions and 333 deletions.
6 changes: 3 additions & 3 deletions src/aof.c
Original file line number Diff line number Diff line change
Expand Up @@ -1922,7 +1922,7 @@ int rewriteSortedSetObject(rio *r, robj *key, robj *o) {
/* Write either the key or the value of the currently selected item of a hash.
* The 'hi' argument passes a valid hash iterator.
* The 'what' filed specifies if to write a key or a value and can be
* either OBJ_HASH_KEY or OBJ_HASH_VALUE.
* either OBJ_HASH_FIELD or OBJ_HASH_VALUE.
*
* The function returns 0 on error, non-zero on success. */
static int rioWriteHashIteratorCursor(rio *r, hashTypeIterator *hi, int what) {
Expand All @@ -1936,7 +1936,7 @@ static int rioWriteHashIteratorCursor(rio *r, hashTypeIterator *hi, int what) {
return rioWriteBulkString(r, (char *)vstr, vlen);
else
return rioWriteBulkLongLong(r, vll);
} else if (hi->encoding == OBJ_ENCODING_HT) {
} else if (hi->encoding == OBJ_ENCODING_HASHTABLE) {
sds value = hashTypeCurrentFromHashTable(hi, what);
return rioWriteBulkString(r, value, sdslen(value));
}
Expand All @@ -1963,7 +1963,7 @@ int rewriteHashObject(rio *r, robj *key, robj *o) {
}
}

if (!rioWriteHashIteratorCursor(r, &hi, OBJ_HASH_KEY) || !rioWriteHashIteratorCursor(r, &hi, OBJ_HASH_VALUE)) {
if (!rioWriteHashIteratorCursor(r, &hi, OBJ_HASH_FIELD) || !rioWriteHashIteratorCursor(r, &hi, OBJ_HASH_VALUE)) {
hashTypeResetIterator(&hi);
return 0;
}
Expand Down
68 changes: 18 additions & 50 deletions src/db.c
Original file line number Diff line number Diff line change
Expand Up @@ -979,39 +979,6 @@ void keysScanCallback(void *privdata, void *entry) {

/* This callback is used by scanGenericCommand in order to collect elements
* returned by the dictionary iterator into a list. */
void dictScanCallback(void *privdata, const dictEntry *de) {
scanData *data = (scanData *)privdata;
list *keys = data->keys;
robj *o = data->o;
sds val = NULL;
sds key = NULL;
data->sampled++;

/* This callback is only used for scanning elements within a key (hash
* fields, set elements, etc.) so o must be set here. */
serverAssert(o != NULL);

/* Filter element if it does not match the pattern. */
sds keysds = dictGetKey(de);
if (data->pattern) {
if (!stringmatchlen(data->pattern, sdslen(data->pattern), keysds, sdslen(keysds), 0)) {
return;
}
}

if (o->type == OBJ_HASH) {
key = keysds;
if (!data->only_keys) {
val = dictGetVal(de);
}
} else {
serverPanic("Type not handled in dict SCAN callback.");
}

listAddNodeTail(keys, key);
if (val) listAddNodeTail(keys, val);
}

void hashtableScanCallback(void *privdata, void *entry) {
scanData *data = (scanData *)privdata;
sds val = NULL;
Expand All @@ -1025,14 +992,20 @@ void hashtableScanCallback(void *privdata, void *entry) {
* fields, set elements, etc.) so o must be set here. */
serverAssert(o != NULL);

/* get key */
/* get key, value */
if (o->type == OBJ_SET) {
key = (sds)entry;
} else if (o->type == OBJ_ZSET) {
zskiplistNode *node = (zskiplistNode *)entry;
key = node->ele;
/* zset data is copied after filtering by key */
} else if (o->type == OBJ_HASH) {
key = hashTypeEntryGetField(entry);
if (!data->only_keys) {
val = hashTypeEntryGetValue(entry);
}
} else {
serverPanic("Type not handled in hashset SCAN callback.");
serverPanic("Type not handled in hashtable SCAN callback.");
}

/* Filter element if it does not match the pattern. */
Expand All @@ -1042,9 +1015,9 @@ void hashtableScanCallback(void *privdata, void *entry) {
}
}

if (o->type == OBJ_SET) {
/* no value, key used by reference */
} else if (o->type == OBJ_ZSET) {
/* zset data must be copied. Do this after filtering to avoid unneeded
* allocations. */
if (o->type == OBJ_ZSET) {
/* zset data is copied */
zskiplistNode *node = (zskiplistNode *)entry;
key = sdsdup(node->ele);
Expand All @@ -1053,8 +1026,6 @@ void hashtableScanCallback(void *privdata, void *entry) {
int len = ld2string(buf, sizeof(buf), node->score, LD_STR_AUTO);
val = sdsnewlen(buf, len);
}
} else {
serverPanic("Type not handled in hashset SCAN callback.");
}

listAddNodeTail(keys, key);
Expand Down Expand Up @@ -1193,20 +1164,19 @@ void scanGenericCommand(client *c, robj *o, unsigned long long cursor) {
* cursor to zero to signal the end of the iteration. */

/* Handle the case of kvstore, dict or hashtable. */
dict *dict_table = NULL;
hashtable *hashtable_table = NULL;
hashtable *ht = NULL;
int shallow_copied_list_items = 0;
if (o == NULL) {
shallow_copied_list_items = 1;
} else if (o->type == OBJ_SET && o->encoding == OBJ_ENCODING_HASHTABLE) {
hashtable_table = o->ptr;
ht = o->ptr;
shallow_copied_list_items = 1;
} else if (o->type == OBJ_HASH && o->encoding == OBJ_ENCODING_HT) {
dict_table = o->ptr;
} else if (o->type == OBJ_HASH && o->encoding == OBJ_ENCODING_HASHTABLE) {
ht = o->ptr;
shallow_copied_list_items = 1;
} else if (o->type == OBJ_ZSET && o->encoding == OBJ_ENCODING_SKIPLIST) {
zset *zs = o->ptr;
hashtable_table = zs->ht;
ht = zs->ht;
/* scanning ZSET allocates temporary strings even though it's a dict */
shallow_copied_list_items = 0;
}
Expand All @@ -1220,7 +1190,7 @@ void scanGenericCommand(client *c, robj *o, unsigned long long cursor) {
}

/* For main hash table scan or scannable data structure. */
if (!o || dict_table || hashtable_table) {
if (!o || ht) {
/* We set the max number of iterations to ten times the specified
* COUNT, so if the hash table is in a pathological state (very
* sparsely populated) we avoid to block too much time at the cost
Expand Down Expand Up @@ -1260,10 +1230,8 @@ void scanGenericCommand(client *c, robj *o, unsigned long long cursor) {
* If cursor is empty, we should try exploring next non-empty slot. */
if (o == NULL) {
cursor = kvstoreScan(c->db->keys, cursor, onlydidx, keysScanCallback, NULL, &data);
} else if (dict_table) {
cursor = dictScan(dict_table, cursor, dictScanCallback, &data);
} else {
cursor = hashtableScan(hashtable_table, cursor, hashtableScanCallback, &data);
cursor = hashtableScan(ht, cursor, hashtableScanCallback, &data);
}
} while (cursor && maxiterations-- && data.sampled < count);
} else if (o->type == OBJ_SET) {
Expand Down
12 changes: 3 additions & 9 deletions src/debug.c
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ void xorObjectDigest(serverDb *db, robj *keyobj, unsigned char *digest, robj *o)
sds sdsele;

memset(eledigest, 0, 20);
sdsele = hashTypeCurrentObjectNewSds(&hi, OBJ_HASH_KEY);
sdsele = hashTypeCurrentObjectNewSds(&hi, OBJ_HASH_FIELD);
mixDigest(eledigest, sdsele, sdslen(sdsele));
sdsfree(sdsele);
sdsele = hashTypeCurrentObjectNewSds(&hi, OBJ_HASH_VALUE);
Expand Down Expand Up @@ -923,23 +923,17 @@ void debugCommand(client *c) {
robj *o = objectCommandLookupOrReply(c, c->argv[2], shared.nokeyerr);
if (o == NULL) return;

/* Get the dict reference from the object, if possible. */
dict *d = NULL;
/* Get the hashtable reference from the object, if possible. */
hashtable *ht = NULL;
switch (o->encoding) {
case OBJ_ENCODING_SKIPLIST: {
zset *zs = o->ptr;
ht = zs->ht;
} break;
case OBJ_ENCODING_HT: d = o->ptr; break;
case OBJ_ENCODING_HASHTABLE: ht = o->ptr; break;
}

if (d != NULL) {
char buf[4096];
dictGetStats(buf, sizeof(buf), d, full);
addReplyVerbatim(c, buf, strlen(buf), "txt");
} else if (ht != NULL) {
if (ht != NULL) {
char buf[4096];
hashtableGetStats(buf, sizeof(buf), ht, full);
addReplyVerbatim(c, buf, strlen(buf), "txt");
Expand Down
63 changes: 31 additions & 32 deletions src/defrag.c
Original file line number Diff line number Diff line change
Expand Up @@ -373,13 +373,6 @@ void activeDefragSdsHashtableCallback(void *privdata, void *entry_ref) {
if (new_sds != NULL) *sds_ref = new_sds;
}

void activeDefragSdsHashtable(hashtable *ht) {
unsigned long cursor = 0;
do {
cursor = hashtableScanDefrag(ht, cursor, activeDefragSdsHashtableCallback, NULL, activeDefragAlloc, HASHTABLE_SCAN_EMIT_REF);
} while (cursor != 0);
}

/* Defrag a list of ptr, sds or robj string values */
static void activeDefragQuickListNode(quicklist *ql, quicklistNode **node_ref) {
quicklistNode *newnode, *node = *node_ref;
Expand Down Expand Up @@ -481,26 +474,25 @@ static void scanHashtableCallbackCountScanned(void *privdata, void *elemref) {
server.stat_active_defrag_scanned++;
}

/* Used as dict scan callback when all the work is done in the dictDefragFunctions. */
static void scanCallbackCountScanned(void *privdata, const dictEntry *de) {
UNUSED(privdata);
UNUSED(de);
server.stat_active_defrag_scanned++;
}

static void scanLaterSet(robj *ob, unsigned long *cursor) {
if (ob->type != OBJ_SET || ob->encoding != OBJ_ENCODING_HASHTABLE) return;
hashtable *ht = ob->ptr;
*cursor = hashtableScanDefrag(ht, *cursor, activeDefragSdsHashtableCallback, NULL, activeDefragAlloc, HASHTABLE_SCAN_EMIT_REF);
}

/* Hashtable scan callback for hash datatype */
static void activeDefragHashTypeEntry(void *privdata, void *element_ref) {
UNUSED(privdata);
hashTypeEntry **entry_ref = (hashTypeEntry **)element_ref;

hashTypeEntry *new_entry = hashTypeEntryDefrag(*entry_ref, activeDefragAlloc, activeDefragSds);
if (new_entry) *entry_ref = new_entry;
}

static void scanLaterHash(robj *ob, unsigned long *cursor) {
if (ob->type != OBJ_HASH || ob->encoding != OBJ_ENCODING_HT) return;
dict *d = ob->ptr;
dictDefragFunctions defragfns = {.defragAlloc = activeDefragAlloc,
.defragKey = (dictDefragAllocFunction *)activeDefragSds,
.defragVal = (dictDefragAllocFunction *)activeDefragSds};
*cursor = dictScanDefrag(d, *cursor, scanCallbackCountScanned, &defragfns, NULL);
if (ob->type != OBJ_HASH || ob->encoding != OBJ_ENCODING_HASHTABLE) return;
hashtable *ht = ob->ptr;
*cursor = hashtableScanDefrag(ht, *cursor, activeDefragHashTypeEntry, NULL, activeDefragAlloc, HASHTABLE_SCAN_EMIT_REF);
}

static void defragQuicklist(robj *ob) {
Expand Down Expand Up @@ -538,15 +530,19 @@ static void defragZsetSkiplist(robj *ob) {
}

static void defragHash(robj *ob) {
dict *d, *newd;
serverAssert(ob->type == OBJ_HASH && ob->encoding == OBJ_ENCODING_HT);
d = ob->ptr;
if (dictSize(d) > server.active_defrag_max_scan_fields)
serverAssert(ob->type == OBJ_HASH && ob->encoding == OBJ_ENCODING_HASHTABLE);
hashtable *ht = ob->ptr;
if (hashtableSize(ht) > server.active_defrag_max_scan_fields) {
defragLater(ob);
else
activeDefragSdsDict(d, DEFRAG_SDS_DICT_VAL_IS_SDS);
/* defrag the dict struct and tables */
if ((newd = dictDefragTables(ob->ptr))) ob->ptr = newd;
} else {
unsigned long cursor = 0;
do {
cursor = hashtableScanDefrag(ht, cursor, activeDefragHashTypeEntry, NULL, activeDefragAlloc, HASHTABLE_SCAN_EMIT_REF);
} while (cursor != 0);
}
/* defrag the hashtable struct and tables */
hashtable *new_hashtable = hashtableDefragTables(ht, activeDefragAlloc);
if (new_hashtable) ob->ptr = new_hashtable;
}

static void defragSet(robj *ob) {
Expand All @@ -555,11 +551,14 @@ static void defragSet(robj *ob) {
if (hashtableSize(ht) > server.active_defrag_max_scan_fields) {
defragLater(ob);
} else {
activeDefragSdsHashtable(ht);
unsigned long cursor = 0;
do {
cursor = hashtableScanDefrag(ht, cursor, activeDefragSdsHashtableCallback, NULL, activeDefragAlloc, HASHTABLE_SCAN_EMIT_REF);
} while (cursor != 0);
}
/* defrag the hashtable struct and tables */
hashtable *newHashtable = hashtableDefragTables(ht, activeDefragAlloc);
if (newHashtable) ob->ptr = newHashtable;
hashtable *new_hashtable = hashtableDefragTables(ht, activeDefragAlloc);
if (new_hashtable) ob->ptr = new_hashtable;
}

/* Defrag callback for radix tree iterator, called for each node,
Expand Down Expand Up @@ -776,7 +775,7 @@ static void defragKey(defragKeysCtx *ctx, robj **elemref) {
} else if (ob->type == OBJ_HASH) {
if (ob->encoding == OBJ_ENCODING_LISTPACK) {
if ((newzl = activeDefragAlloc(ob->ptr))) ob->ptr = newzl;
} else if (ob->encoding == OBJ_ENCODING_HT) {
} else if (ob->encoding == OBJ_ENCODING_HASHTABLE) {
defragHash(ob);
} else {
serverPanic("Unknown hash encoding");
Expand Down
6 changes: 3 additions & 3 deletions src/lazyfree.c
Original file line number Diff line number Diff line change
Expand Up @@ -123,9 +123,9 @@ size_t lazyfreeGetFreeEffort(robj *key, robj *obj, int dbid) {
} else if (obj->type == OBJ_ZSET && obj->encoding == OBJ_ENCODING_SKIPLIST) {
zset *zs = obj->ptr;
return zs->zsl->length;
} else if (obj->type == OBJ_HASH && obj->encoding == OBJ_ENCODING_HT) {
dict *ht = obj->ptr;
return dictSize(ht);
} else if (obj->type == OBJ_HASH && obj->encoding == OBJ_ENCODING_HASHTABLE) {
hashtable *ht = obj->ptr;
return hashtableSize(ht);
} else if (obj->type == OBJ_STREAM) {
size_t effort = 0;
stream *s = obj->ptr;
Expand Down
35 changes: 6 additions & 29 deletions src/module.c
Original file line number Diff line number Diff line change
Expand Up @@ -11090,25 +11090,6 @@ typedef struct {
ValkeyModuleScanKeyCB fn;
} ScanKeyCBData;

static void moduleScanKeyDictCallback(void *privdata, const dictEntry *de) {
ScanKeyCBData *data = privdata;
sds key = dictGetKey(de);
robj *o = data->key->value;
robj *field = createStringObject(key, sdslen(key));
robj *value = NULL;

if (o->type == OBJ_HASH) {
sds val = dictGetVal(de);
value = createStringObject(val, sdslen(val));
} else {
serverPanic("unexpected object type");
}

data->fn(data->key, field, value, data->user_data);
decrRefCount(field);
if (value) decrRefCount(value);
}

static void moduleScanKeyHashtableCallback(void *privdata, void *entry) {
ScanKeyCBData *data = privdata;
robj *o = data->key->value;
Expand All @@ -11122,6 +11103,10 @@ static void moduleScanKeyHashtableCallback(void *privdata, void *entry) {
zskiplistNode *node = (zskiplistNode *)entry;
key = node->ele;
value = createStringObjectFromLongDouble(node->score, 0);
} else if (o->type == OBJ_HASH) {
key = hashTypeEntryGetField(entry);
sds val = hashTypeEntryGetValue(entry);
value = createStringObject(val, sdslen(val));
} else {
serverPanic("unexpected object type");
}
Expand Down Expand Up @@ -11185,13 +11170,12 @@ int VM_ScanKey(ValkeyModuleKey *key, ValkeyModuleScanCursor *cursor, ValkeyModul
errno = EINVAL;
return 0;
}
dict *d = NULL;
hashtable *ht = NULL;
robj *o = key->value;
if (o->type == OBJ_SET) {
if (o->encoding == OBJ_ENCODING_HASHTABLE) ht = o->ptr;
} else if (o->type == OBJ_HASH) {
if (o->encoding == OBJ_ENCODING_HT) d = o->ptr;
if (o->encoding == OBJ_ENCODING_HASHTABLE) ht = o->ptr;
} else if (o->type == OBJ_ZSET) {
if (o->encoding == OBJ_ENCODING_SKIPLIST) ht = ((zset *)o->ptr)->ht;
} else {
Expand All @@ -11203,14 +11187,7 @@ int VM_ScanKey(ValkeyModuleKey *key, ValkeyModuleScanCursor *cursor, ValkeyModul
return 0;
}
int ret = 1;
if (d) {
ScanKeyCBData data = {key, privdata, fn};
cursor->cursor = dictScan(d, cursor->cursor, moduleScanKeyDictCallback, &data);
if (cursor->cursor == 0) {
cursor->done = 1;
ret = 0;
}
} else if (ht) {
if (ht) {
ScanKeyCBData data = {key, privdata, fn};
cursor->cursor = hashtableScan(ht, cursor->cursor, moduleScanKeyHashtableCallback, &data);
if (cursor->cursor == 0) {
Expand Down
Loading

0 comments on commit d13aad4

Please sign in to comment.