Skip to content

Commit

Permalink
Retry when an async slotmap update fails (#159)
Browse files Browse the repository at this point in the history
- Small fix to reattempt failing slotmap updates.
- Handle empty and NULL addresses in `CLUSTER SLOTS` responses.
  As described in the `CLUSTER SLOTS` docs:
  "Clients may treat the empty string in the same way as NULL,
   that is the same endpoint it used to send the current command to"
- clusterclient_async received a new flag --async-initial-update to
  enable usage of valkeyClusterAsyncConnect2, which avoids a blocking
  initial slotmap update.
- Use a short timeout when scheduling processing of the next command in
  clusterclient_async and allow the event engine to read from sockets
  before the next command, which improves predictability in tests.

Signed-off-by: Björn Svensson <[email protected]>
  • Loading branch information
bjosv authored Jan 23, 2025
1 parent e63166b commit fb8af4c
Show file tree
Hide file tree
Showing 7 changed files with 295 additions and 96 deletions.
96 changes: 41 additions & 55 deletions src/cluster.c
Original file line number Diff line number Diff line change
Expand Up @@ -389,37 +389,9 @@ static int authenticate(valkeyClusterContext *cc, valkeyContext *c) {
* Return a new node with the "cluster slots" command reply.
*/
static valkeyClusterNode *node_get_with_slots(valkeyClusterContext *cc,
valkeyReply *host_elem,
valkeyReply *port_elem,
char *host, int port,
uint8_t role) {
valkeyClusterNode *node = NULL;

if (host_elem == NULL || port_elem == NULL) {
return NULL;
}

if (host_elem->type != VALKEY_REPLY_STRING || host_elem->len <= 0) {
valkeyClusterSetError(cc, VALKEY_ERR_OTHER,
"Command(cluster slots) reply error: "
"node ip is not string.");
goto error;
}

if (port_elem->type != VALKEY_REPLY_INTEGER) {
valkeyClusterSetError(cc, VALKEY_ERR_OTHER,
"Command(cluster slots) reply error: "
"node port is not integer.");
goto error;
}

if (port_elem->integer < 1 || port_elem->integer > UINT16_MAX) {
valkeyClusterSetError(cc, VALKEY_ERR_OTHER,
"Command(cluster slots) reply error: "
"node port is not valid.");
goto error;
}

node = createValkeyClusterNode();
valkeyClusterNode *node = createValkeyClusterNode();
if (node == NULL) {
goto oom;
}
Expand All @@ -433,29 +405,26 @@ static valkeyClusterNode *node_get_with_slots(valkeyClusterContext *cc,
node->slots->free = listClusterSlotDestructor;
}

node->addr = sdsnewlen(host_elem->str, host_elem->len);
node->addr = sdsnew(host);
if (node->addr == NULL) {
goto oom;
}
node->addr = sdscatfmt(node->addr, ":%i", port_elem->integer);
node->addr = sdscatfmt(node->addr, ":%i", port);
if (node->addr == NULL) {
goto oom;
}
node->host = sdsnewlen(host_elem->str, host_elem->len);
node->host = sdsnew(host);
if (node->host == NULL) {
goto oom;
}
node->name = NULL;
node->port = (int)port_elem->integer;
node->port = port;
node->role = role;

return node;

oom:
valkeyClusterSetError(cc, VALKEY_ERR_OOM, "Out of memory");
// passthrough

error:
if (node != NULL) {
sdsfree(node->addr);
sdsfree(node->host);
Expand Down Expand Up @@ -510,7 +479,8 @@ static void cluster_nodes_swap_ctx(dict *nodes_f, dict *nodes_t) {
/**
* Parse the "cluster slots" command reply to nodes dict.
*/
static dict *parse_cluster_slots(valkeyClusterContext *cc, valkeyReply *reply) {
static dict *parse_cluster_slots(valkeyClusterContext *cc, valkeyContext *c,
valkeyReply *reply) {
int ret;
cluster_slot *slot = NULL;
dict *nodes = NULL;
Expand Down Expand Up @@ -594,22 +564,39 @@ static dict *parse_cluster_slots(valkeyClusterContext *cc, valkeyReply *reply) {
elem_ip = elem_nodes->element[0];
elem_port = elem_nodes->element[1];

if (elem_ip == NULL || elem_port == NULL ||
elem_ip->type != VALKEY_REPLY_STRING ||
elem_port->type != VALKEY_REPLY_INTEGER) {
valkeyClusterSetError(cc, VALKEY_ERR_OTHER,
"Command(cluster slots) reply error: "
"ip or port is not correct.");
/* Validate ip element. Accept a NULL value ip (NIL type) since
* we will handle the unknown endpoint special. */
if (elem_ip == NULL || (elem_ip->type != VALKEY_REPLY_STRING &&
elem_ip->type != VALKEY_REPLY_NIL)) {
valkeyClusterSetError(cc, VALKEY_ERR_OTHER, "Invalid node address");
goto error;
}

/* Validate port element. */
if (elem_port == NULL || elem_port->type != VALKEY_REPLY_INTEGER ||
(elem_port->integer < 1 || elem_port->integer > UINT16_MAX)) {
valkeyClusterSetError(cc, VALKEY_ERR_OTHER, "Invalid port");
goto error;
}

/* Get the received ip/host. According to the docs an unknown
* endpoint or an empty string can be treated as it means
* the same address as we sent this command to.
* An unknown endpoint has the type VALKEY_REPLY_NIL and its
* length is initiated to zero. */
char *host = (elem_ip->len > 0) ? elem_ip->str : c->tcp.host;
if (host == NULL) {
goto oom;
}
int port = elem_port->integer;

if (idx == 2) {
/* Parse a primary node. */
sds address = sdsnewlen(elem_ip->str, elem_ip->len);
sds address = sdsnew(host);
if (address == NULL) {
goto oom;
}
address = sdscatfmt(address, ":%i", elem_port->integer);
address = sdscatfmt(address, ":%i", port);
if (address == NULL) {
goto oom;
}
Expand All @@ -628,8 +615,7 @@ static dict *parse_cluster_slots(valkeyClusterContext *cc, valkeyReply *reply) {
break;
}

primary = node_get_with_slots(cc, elem_ip, elem_port,
VALKEY_ROLE_PRIMARY);
primary = node_get_with_slots(cc, host, port, VALKEY_ROLE_PRIMARY);
if (primary == NULL) {
goto error;
}
Expand All @@ -654,7 +640,7 @@ static dict *parse_cluster_slots(valkeyClusterContext *cc, valkeyReply *reply) {

slot = NULL;
} else if (cc->flags & VALKEYCLUSTER_FLAG_ADD_SLAVE) {
replica = node_get_with_slots(cc, elem_ip, elem_port,
replica = node_get_with_slots(cc, host, port,
VALKEY_ROLE_REPLICA);
if (replica == NULL) {
goto error;
Expand Down Expand Up @@ -1061,7 +1047,7 @@ static int clusterUpdateRouteHandleReply(valkeyClusterContext *cc,

dict *nodes;
if (cc->flags & VALKEYCLUSTER_FLAG_ROUTE_USE_SLOTS) {
nodes = parse_cluster_slots(cc, reply);
nodes = parse_cluster_slots(cc, c, reply);
} else {
nodes = parse_cluster_nodes(cc, c, reply);
}
Expand Down Expand Up @@ -2889,7 +2875,6 @@ int valkeyClusterAsyncSetDisconnectCallback(valkeyClusterAsyncContext *acc,
/* Reply callback function for CLUSTER SLOTS */
void clusterSlotsReplyCallback(valkeyAsyncContext *ac, void *r,
void *privdata) {
UNUSED(ac);
valkeyReply *reply = (valkeyReply *)r;
valkeyClusterAsyncContext *acc = (valkeyClusterAsyncContext *)privdata;
acc->lastSlotmapUpdateAttempt = vk_usec_now();
Expand All @@ -2901,16 +2886,16 @@ void clusterSlotsReplyCallback(valkeyAsyncContext *ac, void *r,
}

valkeyClusterContext *cc = acc->cc;
dict *nodes = parse_cluster_slots(cc, reply);
dict *nodes = parse_cluster_slots(cc, &ac->c, reply);
if (updateNodesAndSlotmap(cc, nodes) != VALKEY_OK) {
/* Ignore failures for now */
/* Retry using available nodes */
updateSlotMapAsync(acc, NULL);
}
}

/* Reply callback function for CLUSTER NODES */
void clusterNodesReplyCallback(valkeyAsyncContext *ac, void *r,
void *privdata) {
UNUSED(ac);
valkeyReply *reply = (valkeyReply *)r;
valkeyClusterAsyncContext *acc = (valkeyClusterAsyncContext *)privdata;
acc->lastSlotmapUpdateAttempt = vk_usec_now();
Expand All @@ -2924,7 +2909,8 @@ void clusterNodesReplyCallback(valkeyAsyncContext *ac, void *r,
valkeyClusterContext *cc = acc->cc;
dict *nodes = parse_cluster_nodes(cc, &ac->c, reply);
if (updateNodesAndSlotmap(cc, nodes) != VALKEY_OK) {
/* Ignore failures for now */
/* Retry using available nodes */
updateSlotMapAsync(acc, NULL);
}
}

Expand Down
8 changes: 8 additions & 0 deletions tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -279,4 +279,12 @@ if (LIBEVENT_LIBRARY)
COMMAND "${CMAKE_SOURCE_DIR}/tests/scripts/client-disconnect-without-slotmap-update-test.sh"
"$<TARGET_FILE:clusterclient_async>"
WORKING_DIRECTORY "${CMAKE_SOURCE_DIR}/tests/scripts/")
add_test(NAME connect-during-cluster-startup-test-async
COMMAND "${CMAKE_SOURCE_DIR}/tests/scripts/connect-during-cluster-startup-test.sh"
"$<TARGET_FILE:clusterclient_async>"
WORKING_DIRECTORY "${CMAKE_SOURCE_DIR}/tests/scripts/")
add_test(NAME connect-during-cluster-startup-using-cluster-nodes-test-async
COMMAND "${CMAKE_SOURCE_DIR}/tests/scripts/connect-during-cluster-startup-using-cluster-nodes-test.sh"
"$<TARGET_FILE:clusterclient_async>"
WORKING_DIRECTORY "${CMAKE_SOURCE_DIR}/tests/scripts/")
endif()
45 changes: 31 additions & 14 deletions tests/clusterclient_async.c
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ char cmd_history[HISTORY_DEPTH][CMD_SIZE];
int num_running = 0;
int resend_failed_cmd = 0;
int send_to_all = 0;
int show_events = 0;
int async_initial_update = 0;

void sendNextCommand(evutil_socket_t, short, void *);

Expand Down Expand Up @@ -100,8 +102,9 @@ void replyCallback(valkeyClusterAsyncContext *acc, void *r, void *privdata) {

if (--num_running == 0) {
/* Schedule a read from stdin and send next command */
struct timeval timeout = {0, 10};
struct event_base *base = acc->attach_data;
event_base_once(base, -1, EV_TIMEOUT, sendNextCommand, acc, NULL);
event_base_once(base, -1, EV_TIMEOUT, sendNextCommand, acc, &timeout);
}
}

Expand Down Expand Up @@ -172,8 +175,9 @@ void sendNextCommand(evutil_socket_t fd, short kind, void *arg) {
printf("error: %s\n", acc->errstr);

/* Schedule a read from stdin and handle next command. */
struct timeval timeout = {0, 10};
struct event_base *base = acc->attach_data;
event_base_once(base, -1, EV_TIMEOUT, sendNextCommand, acc, NULL);
event_base_once(base, -1, EV_TIMEOUT, sendNextCommand, acc, &timeout);
}
}

Expand All @@ -189,7 +193,17 @@ void sendNextCommand(evutil_socket_t fd, short kind, void *arg) {

void eventCallback(const valkeyClusterContext *cc, int event, void *privdata) {
(void)cc;
(void)privdata;
if (event == VALKEYCLUSTER_EVENT_READY) {
/* Schedule a read from stdin and send next command. */
valkeyClusterAsyncContext *acc = (valkeyClusterAsyncContext *)privdata;
struct timeval timeout = {0, 10};
struct event_base *base = acc->attach_data;
event_base_once(base, -1, EV_TIMEOUT, sendNextCommand, acc, &timeout);
}

if (!show_events)
return;

const char *e = NULL;
switch (event) {
case VALKEYCLUSTER_EVENT_SLOTMAP_UPDATED:
Expand Down Expand Up @@ -224,7 +238,6 @@ void disconnectCallback(const valkeyAsyncContext *ac, int status) {

int main(int argc, char **argv) {
int use_cluster_slots = 1; // Get topology via CLUSTER SLOTS
int show_events = 0;
int show_connection_events = 0;

int optind;
Expand All @@ -235,6 +248,8 @@ int main(int argc, char **argv) {
show_events = 1;
} else if (strcmp(argv[optind], "--connection-events") == 0) {
show_connection_events = 1;
} else if (strcmp(argv[optind], "--async-initial-update") == 0) {
async_initial_update = 1;
} else {
fprintf(stderr, "Unknown argument: '%s'\n", argv[optind]);
}
Expand All @@ -254,28 +269,30 @@ int main(int argc, char **argv) {
valkeyClusterSetOptionTimeout(acc->cc, timeout);
valkeyClusterSetOptionConnectTimeout(acc->cc, timeout);
valkeyClusterSetOptionMaxRetry(acc->cc, 1);
valkeyClusterSetEventCallback(acc->cc, eventCallback, acc);
if (use_cluster_slots) {
valkeyClusterSetOptionRouteUseSlots(acc->cc);
}
if (show_events) {
valkeyClusterSetEventCallback(acc->cc, eventCallback, NULL);
}
if (show_connection_events) {
valkeyClusterAsyncSetConnectCallback(acc, connectCallback);
valkeyClusterAsyncSetDisconnectCallback(acc, disconnectCallback);
}

if (valkeyClusterConnect2(acc->cc) != VALKEY_OK) {
printf("Connect error: %s\n", acc->cc->errstr);
exit(2);
}

struct event_base *base = event_base_new();
int status = valkeyClusterLibeventAttach(acc, base);
assert(status == VALKEY_OK);

/* Schedule a read from stdin and send next command */
event_base_once(base, -1, EV_TIMEOUT, sendNextCommand, acc, NULL);
if (async_initial_update) {
if (valkeyClusterAsyncConnect2(acc) != VALKEY_OK) {
printf("Connect error: %s\n", acc->errstr);
exit(2);
}
} else {
if (valkeyClusterConnect2(acc->cc) != VALKEY_OK) {
printf("Connect error: %s\n", acc->cc->errstr);
exit(2);
}
}

event_base_dispatch(base);

Expand Down
Loading

0 comments on commit fb8af4c

Please sign in to comment.