Skip to content

Commit

Permalink
Make struct valkeyClusterNodeIterator opaque
Browse files Browse the repository at this point in the history
Signed-off-by: Björn Svensson <[email protected]>
  • Loading branch information
bjosv committed Sep 27, 2024
1 parent 4ee3474 commit 585e913
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 16 deletions.
14 changes: 9 additions & 5 deletions include/valkey/cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
#define VALKEY_CLUSTER_H

#include "async.h"
#include "dict.h"
#include "valkey.h"

#define UNUSED(x) (void)(x)
Expand Down Expand Up @@ -66,6 +65,7 @@
extern "C" {
#endif

struct dict;
struct hilist;
struct valkeyClusterAsyncContext;
struct valkeyTLSContext;
Expand Down Expand Up @@ -148,11 +148,15 @@ typedef struct valkeyClusterAsyncContext {

} valkeyClusterAsyncContext;

#if UINTPTR_MAX == UINT64_MAX
#define VALKEY_NODE_ITERATOR_SIZE 56
#elif defined(__arm__)
#define VALKEY_NODE_ITERATOR_SIZE 40
#else
#define VALKEY_NODE_ITERATOR_SIZE 32
#endif
typedef struct valkeyClusterNodeIterator {
valkeyClusterContext *cc;
uint64_t route_version;
int retries_left;
dictIterator di;
char opaque_data[VALKEY_NODE_ITERATOR_SIZE];
} valkeyClusterNodeIterator;

/*
Expand Down
31 changes: 21 additions & 10 deletions src/cluster.c
Original file line number Diff line number Diff line change
Expand Up @@ -3611,33 +3611,44 @@ void valkeyClusterAsyncFree(valkeyClusterAsyncContext *acc) {
vk_free(acc);
}

struct nodeIterator {
valkeyClusterContext *cc;
uint64_t route_version;
int retries_left;
dictIterator di;
};
/* Make sure VALKEY_NODE_ITERATOR_SIZE is correct. */
vk_static_assert(sizeof(struct nodeIterator) == VALKEY_NODE_ITERATOR_SIZE);

/* Initiate an iterator for iterating over current cluster nodes */
void valkeyClusterInitNodeIterator(valkeyClusterNodeIterator *iter,
valkeyClusterContext *cc) {
iter->cc = cc;
iter->route_version = cc->route_version;
dictInitIterator(&iter->di, cc->nodes);
iter->retries_left = 1;
struct nodeIterator *ni = (struct nodeIterator *)iter;
ni->cc = cc;
ni->route_version = cc->route_version;
dictInitIterator(&ni->di, cc->nodes);
ni->retries_left = 1;
}

/* Get next node from the iterator
* The iterator will restart if the routing table is updated
* before all nodes have been iterated. */
valkeyClusterNode *valkeyClusterNodeNext(valkeyClusterNodeIterator *iter) {
if (iter->retries_left <= 0)
struct nodeIterator *ni = (struct nodeIterator *)iter;
if (ni->retries_left <= 0)
return NULL;

if (iter->route_version != iter->cc->route_version) {
if (ni->route_version != ni->cc->route_version) {
// The routing table has changed and current iterator
// is invalid. The nodes dict has been recreated in
// the cluster context. We need to re-init the dictIter.
dictInitIterator(&iter->di, iter->cc->nodes);
iter->route_version = iter->cc->route_version;
iter->retries_left--;
dictInitIterator(&ni->di, ni->cc->nodes);
ni->route_version = ni->cc->route_version;
ni->retries_left--;
}

dictEntry *de;
if ((de = dictNext(&iter->di)) != NULL)
if ((de = dictNext(&ni->di)) != NULL)
return dictGetEntryVal(de);
else
return NULL;
Expand Down
3 changes: 3 additions & 0 deletions src/vkutil.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@
#include <stdint.h>
#include <sys/types.h>

/* Static assert macro for C99. */
#define vk_static_assert(cond) extern char vk_static_assert[sizeof(char[1 - 2 * !(cond)])]

/*
* Wrapper to workaround well known, safe, implicit type conversion when
* invoking system calls.
Expand Down
3 changes: 2 additions & 1 deletion tests/clusterclient.c
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ int main(int argc, char **argv) {
if (send_to_all) {
valkeyClusterNodeIterator ni;
valkeyClusterInitNodeIterator(&ni, cc);
uint64_t route_version = cc->route_version;

valkeyClusterNode *node;
while ((node = valkeyClusterNodeNext(&ni)) != NULL) {
Expand All @@ -133,7 +134,7 @@ int main(int argc, char **argv) {
printReply(reply);
}
freeReplyObject(reply);
if (ni.route_version != cc->route_version) {
if (route_version != cc->route_version) {
/* Updated slotmap resets the iterator. Abort iteration. */
break;
}
Expand Down

0 comments on commit 585e913

Please sign in to comment.