Address PR comments
Signed-off-by: Uri Yagelnik <[email protected]>
uriyage committed Aug 4, 2024
1 parent f821c86 commit da8106b
Showing 3 changed files with 180 additions and 104 deletions.
217 changes: 141 additions & 76 deletions src/dict.c
@@ -1541,16 +1541,110 @@ dictScanDefrag(dict *d, unsigned long v, dictScanFunction *fn, dictDefragFunctio
return v;
}

typedef enum { PrefetchStart, PrefetchBucket, PrefetchEntry, PrefetchValue, PrefetchDone } PrefetchState;

/* -------------------------- Dict Prefetching ------------------------------ */

typedef enum {
PrefetchBucket, /* Initial state, determines which hash table to use, and prefetches the table's bucket */
PrefetchEntry, /* Prefetches entries associated with the given key's hash */
PrefetchValue, /* Prefetches the value object of the entry found in the previous step */
PrefetchValueData, /* Prefetches the value object's data (if applicable) */
PrefetchDone /* Indicates that prefetching for this key is complete */
} PrefetchState;

/************************************ State machine diagram for the prefetch operation. ********************************
start
┌────────▼─────────┐
┌─────────►│ PrefetchBucket ├────►────────┐
│ └────────┬─────────┘ no more tables -> done
| bucket|found |
│ | │
entry not found - goto next table ┌────────▼────────┐ │
└────◄─────┤ PrefetchEntry | ▼
┌────────────►└────────┬────────┘ │
| Entry│found │
│ | │
value not found - goto next entry ┌───────▼────────┐ |
└───────◄──────┤ PrefetchValue | ▼
└───────┬────────┘ │
Value│found │
| |
┌───────────▼──────────────┐ │
│ PrefetchValueData │ ▼
└───────────┬──────────────┘ │
| │
┌───────-─▼─────────────┐ │
│ PrefetchDone │◄────────┘
└───────────────────────┘
**********************************************************************************************************************/
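The diagram describes a batch of small per-key state machines that are advanced round-robin: each visit issues a single __builtin_prefetch and then yields to the next key, so the memory fetch overlaps with work on other keys instead of stalling the CPU. The following stand-alone C sketch (illustrative only, not part of this commit; all types and names here are hypothetical) shows the pattern in its simplest two-step form:

    #include <stddef.h>

    typedef enum { FetchBucket, FetchEntry, Finished } ExampleStep;

    typedef struct {
        ExampleStep step;
        void *bucket; /* address of the hash-table bucket for this key */
        void *entry;  /* first entry in that bucket, discovered in step two */
    } ExampleSlot;

    /* Advance each key's state machine one step per visit. By the time the
     * loop returns to a slot, the cache line requested on the previous visit
     * has likely arrived, so dereferencing it no longer stalls. */
    static void example_prefetch_batch(ExampleSlot slots[], size_t n) {
        size_t done = 0, i = 0;
        while (done < n) {
            ExampleSlot *s = &slots[i];
            switch (s->step) {
            case FetchBucket:
                __builtin_prefetch(s->bucket);  /* request the bucket's cache line */
                s->step = FetchEntry;
                break;
            case FetchEntry:
                /* The bucket is assumed to start with a pointer to its first entry. */
                s->entry = *(void **)s->bucket;
                if (s->entry) __builtin_prefetch(s->entry);
                s->step = Finished;
                done++;
                break;
            case Finished:
                break;
            }
            i = (i + 1) % n; /* move on to the next key */
        }
    }

dict.c generalizes this idea to the five states above, falling back to PrefetchBucket when an entry is not found and the dict is rehashing into a second table.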

typedef struct {
PrefetchState state;
int ht_idx;
uint64_t idx;
uint64_t key_hash;
dictEntry *current_entry;
PrefetchState state; /* Current state of the prefetch operation */
int ht_idx; /* Index of the current hash table (0 or 1 for rehashing) */
uint64_t bucket_idx; /* Index of the bucket in the current hash table */
uint64_t key_hash; /* Hash value of the key being prefetched */
dictEntry *current_entry; /* Pointer to the current entry being processed */
} PrefetchInfo;

typedef struct {
PrefetchInfo prefetch_info[DictMaxPrefetchSize];
size_t current_batch_size; /* Number of keys in the current batch */
size_t cur_idx; /* Index of the current key being prefetched */
size_t keys_done; /* Number of keys that have been processed */
} PrefetchBatch;

static PrefetchBatch prefetchBatch; /* Global prefetch batch - holds the current batch of keys being prefetched */

static void incrCurIdx(void) {
prefetchBatch.cur_idx++;
if (prefetchBatch.cur_idx >= prefetchBatch.current_batch_size) {
prefetchBatch.cur_idx %= prefetchBatch.current_batch_size;
}
}

/* Prefetches the given pointer and moves to the next key in the batch */
static void prefetch(void *ptr) {
__builtin_prefetch(ptr);
/* while the prefetch is in progress, we can continue to the next key */
incrCurIdx();
}

static void markDone(PrefetchInfo *info) {
info->state = PrefetchDone;
prefetchBatch.keys_done++;
}

static PrefetchInfo *getNextPrefetchInfo(void) {
while (prefetchBatch.prefetch_info[prefetchBatch.cur_idx].state == PrefetchDone) {
incrCurIdx();
}
return &prefetchBatch.prefetch_info[prefetchBatch.cur_idx];
}

static void initBatch(dict **keys_dicts, size_t num_keys, const void **keys) {
assert(num_keys <= DictMaxPrefetchSize);

prefetchBatch.current_batch_size = num_keys;
prefetchBatch.cur_idx = 0;
prefetchBatch.keys_done = 0;

/* Initialize the prefetch info */
for (size_t i = 0; i < prefetchBatch.current_batch_size; i++) {
PrefetchInfo *info = &prefetchBatch.prefetch_info[i];
if (!keys_dicts[i] || dictSize(keys_dicts[i]) == 0) {
info->state = PrefetchDone;
prefetchBatch.keys_done++;
continue;
}
info->ht_idx = -1;
info->current_entry = NULL;
info->state = PrefetchBucket;
info->key_hash = dictHashKey(keys_dicts[i], keys[i]);
}
}

/* dictPrefetch - Prefetches dictionary data for an array of keys
*
* This function takes an array of dictionaries and keys, attempting to bring
@@ -1571,109 +1665,80 @@ typedef struct {
* dictPrefetch can be invoked with a callback function, get_val_data_func,
* to bring the key's value data closer to the L1 cache as well. */
void dictPrefetch(dict **keys_dicts, size_t num_keys, const void **keys, void *(*get_val_data_func)(const void *val)) {
PrefetchInfo prefetchInfo[DictMaxPrefetchSize];
size_t done = 0;

assert(num_keys <= DictMaxPrefetchSize);

/* Initialize the prefetch info */
for (size_t i = 0; i < num_keys; i++) {
PrefetchInfo *info = &prefetchInfo[i];
if (!keys_dicts[i] || dictSize(keys_dicts[i]) == 0) {
info->state = PrefetchDone;
done++;
continue;
}
info->ht_idx = -1;
info->current_entry = NULL;
info->state = PrefetchStart;
info->key_hash = dictHashKey(keys_dicts[i], keys[i]);
}
initBatch(keys_dicts, num_keys, keys);

for (size_t j = 0; done < num_keys; j++) {
size_t i = j % num_keys;
PrefetchInfo *info = &prefetchInfo[i];
while (prefetchBatch.keys_done < prefetchBatch.current_batch_size) {
PrefetchInfo *info = getNextPrefetchInfo();
size_t i = prefetchBatch.cur_idx;
switch (info->state) {
case PrefetchDone:
/* Skip already processed keys */
break;

case PrefetchStart:
case PrefetchBucket:
/* Determine which hash table to use */
if (info->ht_idx == -1) {
info->ht_idx = 0;
} else if (info->ht_idx == 0 && dictIsRehashing(keys_dicts[i])) {
info->ht_idx = 1;
} else {
done++;
info->state = PrefetchDone;
/* No more tables left - mark as done. */
markDone(info);
break;
}

/* Prefetch the bucket */
info->idx = info->key_hash & DICTHT_SIZE_MASK(keys_dicts[i]->ht_size_exp[info->ht_idx]);
__builtin_prefetch(&keys_dicts[i]->ht_table[info->ht_idx][info->idx]);
info->state = PrefetchBucket;
info->bucket_idx = info->key_hash & DICTHT_SIZE_MASK(keys_dicts[i]->ht_size_exp[info->ht_idx]);
prefetch(&keys_dicts[i]->ht_table[info->ht_idx][info->bucket_idx]);
info->current_entry = NULL;
info->state = PrefetchEntry;
break;

case PrefetchBucket:
case PrefetchEntry:
/* Prefetch the first entry in the bucket */
info->current_entry = keys_dicts[i]->ht_table[info->ht_idx][info->idx];
if (info->current_entry) {
__builtin_prefetch(info->current_entry);
info->state = PrefetchEntry;
/* We already found an entry in the bucket - move to the next entry */
info->current_entry = dictGetNext(info->current_entry);
} else {
/* Find the first entry in the bucket */
info->current_entry = keys_dicts[i]->ht_table[info->ht_idx][info->bucket_idx];
}

if (info->current_entry) {
prefetch(info->current_entry);
info->state = PrefetchValue;
} else {
/* No entry found in the bucket - try the next table */
info->state = PrefetchStart;
/* No entry found in the bucket - try the bucket in the next table */
info->state = PrefetchBucket;
}
break;

case PrefetchEntry: {
case PrefetchValue: {
/* Prefetch the entry's value. */
void *value = get_val_data_func ? dictGetVal(info->current_entry) : NULL;
void *value = dictGetVal(info->current_entry);

if (info->current_entry->next == NULL && !dictIsRehashing(keys_dicts[i])) {
if (dictGetNext(info->current_entry) == NULL && !dictIsRehashing(keys_dicts[i])) {
/* If this is the last element, we assume a hit and don't compare the keys */
if (value) {
__builtin_prefetch(value);
info->state = PrefetchValue;
} else {
done++;
info->state = PrefetchDone;
}
prefetch(value);
info->state = PrefetchValueData;
break;
}

if (value) {
void *current_entry_key = dictGetKey(info->current_entry);
if (keys[i] == current_entry_key || dictCompareKeys(keys_dicts[i], keys[i], current_entry_key)) {
/* If the key is found, prefetch the value */
__builtin_prefetch(value);
info->state = PrefetchValue;
break;
}
}

/* Move to next entry or start over */
info->current_entry = dictGetNext(info->current_entry);
if (info->current_entry) {
__builtin_prefetch(info->current_entry);
info->state = PrefetchEntry;
void *current_entry_key = dictGetKey(info->current_entry);
if (keys[i] == current_entry_key || dictCompareKeys(keys_dicts[i], keys[i], current_entry_key)) {
/* If the key is found, prefetch the value */
prefetch(value);
info->state = PrefetchValueData;
} else {
info->state = PrefetchStart;
/* Move to next entry */
info->state = PrefetchEntry;
}

break;
}

case PrefetchValue: {
case PrefetchValueData: {
/* Prefetch value data if available */
void *value_data = get_val_data_func(dictGetVal(info->current_entry));
if (value_data) {
__builtin_prefetch(value_data);
if (get_val_data_func) {
void *value_data = get_val_data_func(dictGetVal(info->current_entry));
if (value_data) prefetch(value_data);
}
done++;
info->state = PrefetchDone;
markDone(info);
break;
}

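For reference, a minimal caller of the reworked API could look like the sketch below. It is illustrative only and not part of this diff: the callback mirrors the getValData function that networking.c passes through kvstoreDictPrefetch, and all names here are hypothetical.

    /* Hypothetical value-data callback: for robj values, the data worth
     * prefetching is the object's underlying ptr payload. */
    static void *exampleGetValData(const void *val) {
        return ((robj *)val)->ptr;
    }

    /* Warm the caches for a batch of keys before the commands that will look
     * them up are executed. num_keys must not exceed DictMaxPrefetchSize. */
    static void examplePrefetchKeys(dict **dicts, const void **keys, size_t num_keys) {
        if (num_keys > 1) /* a single key leaves nothing to overlap with */
            dictPrefetch(dicts, num_keys, keys, exampleGetValData);
    }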
4 changes: 3 additions & 1 deletion src/networking.c
@@ -4665,8 +4665,10 @@ void processBatchClientsCommands(void) {
batch.keys[i] = ((robj *)batch.keys[i])->ptr;
}

/* Prefetch keys for all commands */
/* Prefetch keys for all commands; prefetching is beneficial only if there is more than one key */
if (batch.keys_count > 1) {
server.stat_total_prefetch_batches++;
server.stat_total_prefetch_entries += batch.keys_count;
/* Keys */
kvstoreDictPrefetch(batch.keys_kvs, batch.slots, (const void **)batch.keys, batch.keys_count, getValData);
/* Expires - with expires, no value prefetch is required. */
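The two counters added here feed the prefetch statistics asserted on by the test below. Assuming (this part is not shown in the diff) that the INFO field io_threaded_average_prefetch_batch_size is simply their ratio, it would be computed roughly as:

    /* Sketch only - the INFO reporting code is outside this diff. Average
     * number of prefetched keys per batch, guarding against division by zero. */
    double avg_batch_size = server.stat_total_prefetch_batches == 0 ? 0.0 :
        (double)server.stat_total_prefetch_entries /
        (double)server.stat_total_prefetch_batches;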
63 changes: 36 additions & 27 deletions tests/unit/networking.tcl
@@ -172,38 +172,47 @@ start_server {config "minimal.conf" tags {"external:skip"}} {
}

start_server {config "minimal.conf" tags {"external:skip"} overrides {enable-debug-command {yes}}} {
test {prefetch works as expected when killing a client from the middle of prefetch commands batch} {
# Create 17 clients (prefetch batch size + 1)
for {set i 0} {$i < 17} {incr i} {
set rd$i [valkey_deferring_client]
}

# Get the client ID of rd4
$rd4 client id
set rd4_id [$rd4 read]
# Skip if not in io-threads mode, as this test is relevant only for io-threads mode
if {[r config get io-threads] ne "io-threads 1"} {
test {prefetch works as expected when killing a client from the middle of prefetch commands batch} {
# Create 17 clients (prefetch batch size + 1)
for {set i 0} {$i < 17} {incr i} {
set rd$i [valkey_deferring_client]
}

# Create a batch of commands by making sure the server sleeps for a while
# before responding to the first command
$rd0 debug sleep 2
after 200 ; # wait a bit to make sure the server is sleeping.
# Get the client ID of rd4
$rd4 client id
set rd4_id [$rd4 read]

# The first client will kill the fourth client
$rd1 client kill id $rd4_id
# Create a batch of commands by making sure the server sleeps for a while
# before responding to the first command
$rd0 debug sleep 2
after 200 ; # wait a bit to make sure the server is sleeping.

# Send set commands for all clients except the first
for {set i 1} {$i < 17} {incr i} {
[set rd$i] set a $i
[set rd$i] flush
}
# The first client will kill the fourth client
$rd1 client kill id $rd4_id

# Read the results
assert_equal {1} [$rd1 read]
catch {$rd4 read} err
assert_match {I/O error reading reply} $err
# Send set commands for all clients except the first
for {set i 1} {$i < 17} {incr i} {
[set rd$i] set a $i
[set rd$i] flush
}

# Verify the final state
$rd16 get a
assert_equal {OK} [$rd16 read]
assert_equal {16} [$rd16 read]
# Read the results
assert_equal {1} [$rd1 read]
catch {$rd4 read} err
assert_match {I/O error reading reply} $err

# Verify the prefetch stats are as expected
set info [r info stats]
set prefetch_stats [getInfoProperty $info io_threaded_average_prefetch_batch_size]
assert_range [expr $prefetch_stats] 2 15 ; # we expect at most 15, as the kill command doesn't have any keys.

# Verify the final state
$rd16 get a
assert_equal {OK} [$rd16 read]
assert_equal {16} [$rd16 read]
}
}
}
