Merge remote-tracking branch 'upstream/unstable' into engine-api-1261
rjd15372 committed Dec 16, 2024
2 parents a0eefee + e024b4b commit bdc7e64
Showing 23 changed files with 390 additions and 406 deletions.
4 changes: 3 additions & 1 deletion .github/workflows/build-release-packages.yml
@@ -7,7 +7,7 @@ on:
paths:
- '.github/workflows/build-release-packages.yml'
- '.github/workflows/call-build-linux-arm-packages.yml'
- '.github/workflows/call-build-linux-x86_64-packages.yml'
- '.github/workflows/call-build-linux-x86-packages.yml'
- 'utils/releasetools/build-config.json'
workflow_dispatch:
inputs:
@@ -23,6 +23,7 @@ jobs:
# This job provides the version metadata from the tag for the other jobs to use.
release-build-get-meta:
name: Get metadata to build
if: github.repository == 'valkey-io/valkey'
runs-on: ubuntu-latest
outputs:
version: ${{ steps.get_version.outputs.VERSION }}
@@ -66,6 +67,7 @@ jobs:

generate-build-matrix:
name: Generating build matrix
if: github.repository == 'valkey-io/valkey'
runs-on: ubuntu-latest
outputs:
x86_64-build-matrix: ${{ steps.set-matrix.outputs.x86_64-build-matrix }}
49 changes: 26 additions & 23 deletions src/cluster_legacy.c
@@ -1505,7 +1505,6 @@ clusterNode *createClusterNode(char *nodename, int flags) {
node->cport = 0;
node->tls_port = 0;
node->fail_reports = listCreate();
node->voted_time = 0;
node->orphaned_time = 0;
node->repl_offset_time = 0;
node->repl_offset = 0;
@@ -3004,7 +3003,8 @@ int clusterIsValidPacket(clusterLink *link) {
}

if (type == server.cluster_drop_packet_filter || server.cluster_drop_packet_filter == -2) {
serverLog(LL_WARNING, "Dropping packet that matches debug drop filter");
serverLog(LL_WARNING, "Dropping packet of type %s that matches debug drop filter",
clusterGetMessageTypeString(type));
return 0;
}

@@ -3095,7 +3095,7 @@ int clusterProcessPacket(clusterLink *link) {
if (server.debug_cluster_close_link_on_packet_drop &&
(type == server.cluster_drop_packet_filter || server.cluster_drop_packet_filter == -2)) {
freeClusterLink(link);
serverLog(LL_WARNING, "Closing link for matching packet type %hu", type);
serverLog(LL_WARNING, "Closing link for matching packet type %s", clusterGetMessageTypeString(type));
return 0;
}
return 1;
@@ -3111,8 +3111,8 @@ int clusterProcessPacket(clusterLink *link) {
freeClusterLink(link);
serverLog(
LL_NOTICE,
"Closing link for node that sent a lightweight message of type %hu as its first message on the link",
type);
"Closing link for node that sent a lightweight message of type %s as its first message on the link",
clusterGetMessageTypeString(type));
return 0;
}
clusterNode *sender = link->node;
@@ -3121,6 +3121,27 @@ int clusterProcessPacket(clusterLink *link) {
return 1;
}

if (type == CLUSTERMSG_TYPE_MEET && link->node && nodeInHandshake(link->node)) {
/* If the link is bound to a node and the node is in the handshake state, and we receive
* a MEET packet, the sender may have sent multiple MEET packets, so here we drop the MEET
* to avoid the assert in setClusterNodeToInboundClusterLink. The assert would trigger as
* follows: the other node sends a MEET because it detects that there is no inbound link,
* this node creates a new node in HANDSHAKE state (with a random node name) and responds
* with a PONG. The other node receives the PONG and removes the CLUSTER_NODE_MEET flag.
* This node is supposed to open an outbound connection to the other node in the next
* cron cycle, but before that happens, the other node re-sends a MEET on the same link
* because it still detects no inbound connection. The MEET re-send logic was improved in
* #1441, so a MEET packet is now re-sent only once per handshake timeout period.
*
* Note that in getNodeFromLinkAndMsg, a node in the handshake state has a random name and
* is not truly "known", so we don't know the sender. Dropping the MEET packet prevents us
* from creating a random node, avoids an incorrect link binding, and avoids a duplicate
* MEET packet eliminating the handshake state. */
serverLog(LL_NOTICE, "Dropping MEET packet from node %.40s because the node is already in handshake state",
link->node->name);
return 1;
}

uint16_t flags = ntohs(hdr->flags);
uint64_t sender_claimed_current_epoch = 0, sender_claimed_config_epoch = 0;
clusterNode *sender = getNodeFromLinkAndMsg(link, hdr);
@@ -4396,23 +4417,6 @@ void clusterSendFailoverAuthIfNeeded(clusterNode *node, clusterMsg *request) {
return;
}

/* We did not voted for a replica about this primary for two
* times the node timeout. This is not strictly needed for correctness
* of the algorithm but makes the base case more linear.
*
* This limitation does not restrict manual failover. If a user initiates
* a manual failover, we need to allow it to vote, otherwise the manual
* failover may time out. */
if (!force_ack && mstime() - node->replicaof->voted_time < server.cluster_node_timeout * 2) {
serverLog(LL_WARNING,
"Failover auth denied to %.40s (%s): "
"can't vote for any replica of %.40s (%s) within %lld milliseconds",
node->name, node->human_nodename,
node->replicaof->name, node->replicaof->human_nodename,
(long long)((server.cluster_node_timeout * 2) - (mstime() - node->replicaof->voted_time)));
return;
}

/* The replica requesting the vote must have a configEpoch for the claimed
* slots that is >= the one of the primaries currently serving the same
* slots in the current configuration. */
@@ -4434,7 +4438,6 @@ void clusterSendFailoverAuthIfNeeded(clusterNode *node, clusterMsg *request) {

/* We can vote for this replica. */
server.cluster->lastVoteEpoch = server.cluster->currentEpoch;
if (!force_ack) node->replicaof->voted_time = mstime();
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG | CLUSTER_TODO_FSYNC_CONFIG);
clusterSendFailoverAuth(node);
serverLog(LL_NOTICE, "Failover auth granted to %.40s (%s) for epoch %llu", node->name, node->human_nodename,
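
The new MEET guard in clusterProcessPacket is easier to see in isolation. Below is a minimal standalone sketch (not Valkey source); node_t, link_t, and should_drop_meet are invented stand-ins for clusterNode, clusterLink, and the inline check above. The idea: a MEET that arrives on a link already bound to a node in handshake state is treated as a re-sent MEET and dropped, everything else proceeds normally.

#include <stdio.h>
#include <string.h>

#define MSG_PING 0
#define MSG_MEET 2

typedef struct {
    char name[41];
    int in_handshake;   /* nonzero while the node still has a random handshake name */
} node_t;

typedef struct {
    node_t *node;       /* node this link is bound to, or NULL for a fresh inbound link */
} link_t;

/* Return nonzero if the packet should be dropped before normal processing:
 * a MEET arriving on a link already bound to a node in handshake state. */
static int should_drop_meet(const link_t *link, int type) {
    return type == MSG_MEET && link->node != NULL && link->node->in_handshake;
}

int main(void) {
    node_t handshake_node = {.in_handshake = 1};
    strncpy(handshake_node.name, "1a2b3c", sizeof(handshake_node.name) - 1);
    link_t bound_link = {.node = &handshake_node};
    link_t fresh_link = {.node = NULL};

    printf("MEET on bound handshake link: drop=%d\n", should_drop_meet(&bound_link, MSG_MEET));
    printf("MEET on fresh inbound link:   drop=%d\n", should_drop_meet(&fresh_link, MSG_MEET));
    printf("PING on bound handshake link: drop=%d\n", should_drop_meet(&bound_link, MSG_PING));
    return 0;
}
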
2 changes: 0 additions & 2 deletions src/cluster_legacy.h
@@ -341,8 +341,6 @@ struct _clusterNode {
mstime_t pong_received; /* Unix time we received the pong */
mstime_t data_received; /* Unix time we received any data */
mstime_t fail_time; /* Unix time when FAIL flag was set */
mstime_t voted_time; /* Last time we voted for a replica of this primary in non manual
* failover scenarios. */
mstime_t repl_offset_time; /* Unix time we received offset for this node */
mstime_t orphaned_time; /* Starting time of orphaned primary condition */
mstime_t inbound_link_freed_time; /* Last time we freed the inbound link for this node.
72 changes: 46 additions & 26 deletions src/db.c
@@ -978,7 +978,7 @@ void keysScanCallback(void *privdata, void *entry) {

/* This callback is used by scanGenericCommand in order to collect elements
* returned by the dictionary iterator into a list. */
void scanCallback(void *privdata, const dictEntry *de) {
void dictScanCallback(void *privdata, const dictEntry *de) {
scanData *data = (scanData *)privdata;
list *keys = data->keys;
robj *o = data->o;
@@ -998,9 +998,7 @@ void scanCallback(void *privdata, const dictEntry *de) {
}
}

if (o->type == OBJ_SET) {
key = keysds;
} else if (o->type == OBJ_HASH) {
if (o->type == OBJ_HASH) {
key = keysds;
if (!data->only_keys) {
val = dictGetVal(de);
@@ -1013,13 +1011,33 @@ void scanCallback(void *privdata, const dictEntry *de) {
val = sdsnewlen(buf, len);
}
} else {
serverPanic("Type not handled in SCAN callback.");
serverPanic("Type not handled in dict SCAN callback.");
}

listAddNodeTail(keys, key);
if (val) listAddNodeTail(keys, val);
}

void hashtableScanCallback(void *privdata, void *entry) {
scanData *data = (scanData *)privdata;
robj *o = data->o;
list *keys = data->keys;
data->sampled++;

/* currently only implemented for SET scan */
serverAssert(o && o->type == OBJ_SET && o->encoding == OBJ_ENCODING_HASHTABLE);
sds key = (sds)entry; /* Specific for OBJ_SET */

/* Filter element if it does not match the pattern. */
if (data->pattern) {
if (!stringmatchlen(data->pattern, sdslen(data->pattern), key, sdslen(key), 0)) {
return;
}
}

listAddNodeTail(keys, key);
}

/* Try to parse a SCAN cursor stored at object 'o':
* if the cursor is valid, store it as unsigned integer into *cursor and
* returns C_OK. Otherwise return C_ERR and send an error to the
@@ -1083,7 +1101,6 @@ void scanGenericCommand(client *c, robj *o, unsigned long long cursor) {
sds typename = NULL;
long long type = LLONG_MAX;
int patlen = 0, use_pattern = 0, only_keys = 0;
dict *ht;

/* Object must be NULL (to iterate keys names), or the type of the object
* must be Set, Sorted Set, or Hash. */
@@ -1152,34 +1169,35 @@ void scanGenericCommand(client *c, robj *o, unsigned long long cursor) {
* just return everything inside the object in a single call, setting the
* cursor to zero to signal the end of the iteration. */

/* Handle the case of a hash table. */
ht = NULL;
/* Handle the case of kvstore, dict or hashtable. */
dict *dict_table = NULL;
hashtable *hashtable_table = NULL;
int shallow_copied_list_items = 0;
if (o == NULL) {
ht = NULL;
} else if (o->type == OBJ_SET && o->encoding == OBJ_ENCODING_HT) {
ht = o->ptr;
shallow_copied_list_items = 1;
} else if (o->type == OBJ_SET && o->encoding == OBJ_ENCODING_HASHTABLE) {
hashtable_table = o->ptr;
shallow_copied_list_items = 1;
} else if (o->type == OBJ_HASH && o->encoding == OBJ_ENCODING_HT) {
ht = o->ptr;
dict_table = o->ptr;
shallow_copied_list_items = 1;
} else if (o->type == OBJ_ZSET && o->encoding == OBJ_ENCODING_SKIPLIST) {
zset *zs = o->ptr;
ht = zs->dict;
dict_table = zs->dict;
/* scanning ZSET allocates temporary strings even though it's a dict */
shallow_copied_list_items = 0;
}

list *keys = listCreate();
/* Set a free callback for the contents of the collected keys list.
* For the main keyspace dict, and when we scan a key that's dict encoded
* (we have 'ht'), we don't need to define free method because the strings
* in the list are just a shallow copy from the pointer in the dictEntry.
* When scanning a key with other encodings (e.g. listpack), we need to
* free the temporary strings we add to that list.
* The exception to the above is ZSET, where we do allocate temporary
* strings even when scanning a dict. */
if (o && (!ht || o->type == OBJ_ZSET)) {
/* Set a free callback for the contents of the collected keys list if they
* are deep copied temporary strings. We must not free them if they are just
* a shallow copy - a pointer to the actual data in the data structure */
if (!shallow_copied_list_items) {
listSetFreeMethod(keys, (void (*)(void *))sdsfree);
}

/* For main dictionary scan or data structure using hashtable. */
if (!o || ht) {
/* For main hash table scan or scannable data structure. */
if (!o || dict_table || hashtable_table) {
/* We set the max number of iterations to ten times the specified
* COUNT, so if the hash table is in a pathological state (very
* sparsely populated) we avoid to block too much time at the cost
@@ -1188,7 +1206,7 @@ void scanGenericCommand(client *c, robj *o, unsigned long long cursor) {

/* We pass scanData, which has three pointers, to the callback:
* 1. data.keys: the list to which it will add new elements;
* 2. data.o: the object containing the dictionary so that
* 2. data.o: the object containing the hash table so that
* it is possible to fetch more data in a type-dependent way;
* 3. data.type: the specified type scan in the db, LLONG_MAX means
* type matching is not needed;
@@ -1219,8 +1237,10 @@ void scanGenericCommand(client *c, robj *o, unsigned long long cursor) {
* If cursor is empty, we should try exploring next non-empty slot. */
if (o == NULL) {
cursor = kvstoreScan(c->db->keys, cursor, onlydidx, keysScanCallback, NULL, &data);
} else if (dict_table) {
cursor = dictScan(dict_table, cursor, dictScanCallback, &data);
} else {
cursor = dictScan(ht, cursor, scanCallback, &data);
cursor = hashtableScan(hashtable_table, cursor, hashtableScanCallback, &data);
}
} while (cursor && maxiterations-- && data.sampled < count);
} else if (o->type == OBJ_SET) {
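
The db.c changes above split the old scanCallback into dictScanCallback (hash and zset dict encodings) and hashtableScanCallback (hashtable-encoded sets). A minimal standalone sketch (not Valkey source) of the difference in callback shape; fake_dict_entry and scan_data are invented stand-ins for dictEntry and scanData.

#include <stdio.h>

typedef struct { const char *key; const char *val; } fake_dict_entry;
typedef struct { int sampled; } scan_data;

/* Dict-backed encodings store field/value pairs, so the callback receives an
 * entry and unpacks one or both sides of it. */
static void dict_scan_cb(void *privdata, const fake_dict_entry *de) {
    scan_data *data = privdata;
    data->sampled++;
    printf("dict entry: %s -> %s\n", de->key, de->val);
}

/* Hashtable-encoded sets store the member itself as the element, so the
 * callback receives the element directly, with no entry wrapper to unpack. */
static void hashtable_scan_cb(void *privdata, void *entry) {
    scan_data *data = privdata;
    data->sampled++;
    printf("hashtable element: %s\n", (const char *)entry);
}

int main(void) {
    scan_data data = {0};
    fake_dict_entry de = {"field", "value"};
    char member[] = "member";

    dict_scan_cb(&data, &de);
    hashtable_scan_cb(&data, member);
    printf("sampled=%d\n", data.sampled);
    return 0;
}

In the real callbacks, a MATCH pattern filter and the hash NOVALUES option are also applied before appending to the keys list; those details are omitted from this sketch.
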
29 changes: 17 additions & 12 deletions src/debug.c
@@ -916,30 +916,35 @@ void debugCommand(client *c) {
addReplyVerbatim(c, stats, sdslen(stats), "txt");
sdsfree(stats);
} else if (!strcasecmp(c->argv[1]->ptr, "htstats-key") && c->argc >= 3) {
robj *o;
dict *ht = NULL;
int full = 0;

if (c->argc >= 4 && !strcasecmp(c->argv[3]->ptr, "full")) full = 1;

if ((o = objectCommandLookupOrReply(c, c->argv[2], shared.nokeyerr)) == NULL) return;
robj *o = objectCommandLookupOrReply(c, c->argv[2], shared.nokeyerr);
if (o == NULL) return;

/* Get the hash table reference from the object, if possible. */
/* Get the dict reference from the object, if possible. */
dict *d = NULL;
hashtable *ht = NULL;
switch (o->encoding) {
case OBJ_ENCODING_SKIPLIST: {
zset *zs = o->ptr;
ht = zs->dict;
d = zs->dict;
} break;
case OBJ_ENCODING_HT: ht = o->ptr; break;
case OBJ_ENCODING_HT: d = o->ptr; break;
case OBJ_ENCODING_HASHTABLE: ht = o->ptr; break;
}

if (ht == NULL) {
addReplyError(c, "The value stored at the specified key is not "
"represented using an hash table");
} else {
if (d != NULL) {
char buf[4096];
dictGetStats(buf, sizeof(buf), ht, full);
dictGetStats(buf, sizeof(buf), d, full);
addReplyVerbatim(c, buf, strlen(buf), "txt");
} else if (ht != NULL) {
char buf[4096];
hashtableGetStats(buf, sizeof(buf), ht, full);
addReplyVerbatim(c, buf, strlen(buf), "txt");
} else {
addReplyError(c, "The value stored at the specified key is not "
"represented using an hash table");
}
} else if (!strcasecmp(c->argv[1]->ptr, "change-repl-id") && c->argc == 2) {
serverLog(LL_NOTICE, "Changing replication IDs after receiving DEBUG change-repl-id");
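
The DEBUG HTSTATS-KEY branch above now keeps two pointers, only one of which is set for a given encoding, and picks the matching stats routine. A standalone sketch (not Valkey source) of that dispatch; fake_dict, fake_hashtable, and the stats helpers are invented stand-ins for dict/hashtable and dictGetStats/hashtableGetStats.

#include <stdio.h>

typedef struct { int used; } fake_dict;
typedef struct { int used; } fake_hashtable;

static void fake_dict_stats(char *buf, size_t len, const fake_dict *d) {
    snprintf(buf, len, "dict stats: %d entries", d->used);
}

static void fake_hashtable_stats(char *buf, size_t len, const fake_hashtable *ht) {
    snprintf(buf, len, "hashtable stats: %d entries", ht->used);
}

/* Only one of the two pointers is non-NULL, depending on the encoding; pick
 * the matching stats routine, otherwise report that the value is not backed
 * by a hash table at all. */
static void report(const fake_dict *d, const fake_hashtable *ht) {
    char buf[128];
    if (d != NULL) {
        fake_dict_stats(buf, sizeof(buf), d);
    } else if (ht != NULL) {
        fake_hashtable_stats(buf, sizeof(buf), ht);
    } else {
        snprintf(buf, sizeof(buf), "value is not represented using a hash table");
    }
    puts(buf);
}

int main(void) {
    fake_dict d = {3};
    fake_hashtable ht = {7};
    report(&d, NULL);    /* OBJ_ENCODING_HT or OBJ_ENCODING_SKIPLIST case */
    report(NULL, &ht);   /* OBJ_ENCODING_HASHTABLE case */
    report(NULL, NULL);  /* neither: error reply */
    return 0;
}
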
42 changes: 28 additions & 14 deletions src/defrag.c
@@ -34,6 +34,7 @@
*/

#include "server.h"
#include "hashtable.h"
#include "script.h"
#include <stddef.h>

Expand Down Expand Up @@ -379,6 +380,20 @@ static void activeDefragSdsDict(dict *d, int val_type) {
} while (cursor != 0);
}

void activeDefragSdsHashtableCallback(void *privdata, void *entry_ref) {
UNUSED(privdata);
sds *sds_ref = (sds *)entry_ref;
sds new_sds = activeDefragSds(*sds_ref);
if (new_sds != NULL) *sds_ref = new_sds;
}

void activeDefragSdsHashtable(hashtable *ht) {
unsigned long cursor = 0;
do {
cursor = hashtableScanDefrag(ht, cursor, activeDefragSdsHashtableCallback, NULL, activeDefragAlloc, HASHTABLE_SCAN_EMIT_REF);
} while (cursor != 0);
}

/* Defrag a list of ptr, sds or robj string values */
static void activeDefragQuickListNode(quicklist *ql, quicklistNode **node_ref) {
quicklistNode *newnode, *node = *node_ref;
@@ -497,11 +512,9 @@ static void scanCallbackCountScanned(void *privdata, const dictEntry *de) {
}

static void scanLaterSet(robj *ob, unsigned long *cursor) {
if (ob->type != OBJ_SET || ob->encoding != OBJ_ENCODING_HT) return;
dict *d = ob->ptr;
dictDefragFunctions defragfns = {.defragAlloc = activeDefragAlloc,
.defragKey = (dictDefragAllocFunction *)activeDefragSds};
*cursor = dictScanDefrag(d, *cursor, scanCallbackCountScanned, &defragfns, NULL);
if (ob->type != OBJ_SET || ob->encoding != OBJ_ENCODING_HASHTABLE) return;
hashtable *ht = ob->ptr;
*cursor = hashtableScanDefrag(ht, *cursor, activeDefragSdsHashtableCallback, NULL, activeDefragAlloc, HASHTABLE_SCAN_EMIT_REF);
}

static void scanLaterHash(robj *ob, unsigned long *cursor) {
@@ -560,15 +573,16 @@ static void defragHash(robj *ob) {
}

static void defragSet(robj *ob) {
dict *d, *newd;
serverAssert(ob->type == OBJ_SET && ob->encoding == OBJ_ENCODING_HT);
d = ob->ptr;
if (dictSize(d) > server.active_defrag_max_scan_fields)
serverAssert(ob->type == OBJ_SET && ob->encoding == OBJ_ENCODING_HASHTABLE);
hashtable *ht = ob->ptr;
if (hashtableSize(ht) > server.active_defrag_max_scan_fields) {
defragLater(ob);
else
activeDefragSdsDict(d, DEFRAG_SDS_DICT_NO_VAL);
/* defrag the dict struct and tables */
if ((newd = dictDefragTables(ob->ptr))) ob->ptr = newd;
} else {
activeDefragSdsHashtable(ht);
}
/* defrag the hashtable struct and tables */
hashtable *newHashtable = hashtableDefragTables(ht, activeDefragAlloc);
if (newHashtable) ob->ptr = newHashtable;
}

/* Defrag callback for radix tree iterator, called for each node,
Expand Down Expand Up @@ -766,7 +780,7 @@ static void defragKey(defragKeysCtx *ctx, robj **elemref) {
serverPanic("Unknown list encoding");
}
} else if (ob->type == OBJ_SET) {
if (ob->encoding == OBJ_ENCODING_HT) {
if (ob->encoding == OBJ_ENCODING_HASHTABLE) {
defragSet(ob);
} else if (ob->encoding == OBJ_ENCODING_INTSET || ob->encoding == OBJ_ENCODING_LISTPACK) {
void *newptr, *ptr = ob->ptr;
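
The new set-defrag path above scans the hashtable and lets the callback rewrite each element through a reference, which is what passing HASHTABLE_SCAN_EMIT_REF to hashtableScanDefrag provides to activeDefragSdsHashtableCallback. A standalone sketch (not Valkey source) of that pattern; maybe_move and defrag_element_cb are invented stand-ins for the defrag allocator and the scan callback.

#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/* Stand-in for the defrag allocator: sometimes returns a new allocation that
 * should replace the old one, sometimes NULL meaning "leave it where it is". */
static char *maybe_move(char *p) {
    if (strlen(p) % 2 == 0) return NULL;   /* arbitrary condition for the demo */
    char *moved = malloc(strlen(p) + 1);
    if (moved == NULL) return NULL;
    strcpy(moved, p);
    free(p);
    return moved;
}

/* The callback receives a reference to the element slot so it can swap the
 * stored pointer in place when the allocator decides to move it. */
static void defrag_element_cb(void *privdata, void *entry_ref) {
    (void)privdata;
    char **slot = entry_ref;
    char *moved = maybe_move(*slot);
    if (moved != NULL) *slot = moved;
}

int main(void) {
    char *elements[3];
    elements[0] = malloc(2); strcpy(elements[0], "a");
    elements[1] = malloc(3); strcpy(elements[1], "bb");
    elements[2] = malloc(4); strcpy(elements[2], "ccc");

    /* Stand-in for the cursor loop around hashtableScanDefrag(). */
    for (size_t cursor = 0; cursor < 3; cursor++) defrag_element_cb(NULL, &elements[cursor]);

    for (size_t i = 0; i < 3; i++) {
        puts(elements[i]);
        free(elements[i]);
    }
    return 0;
}
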
(Diffs for the remaining 17 changed files are not shown.)
