Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce configuration options in the cluster API #137

Merged
merged 26 commits into from
Feb 3, 2025
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
554481a
Remove flags from cluster connect APIs
bjosv Sep 13, 2024
188e9ab
Add cluster connect with options
bjosv Oct 16, 2024
0403b39
Add async cluster connect with options
bjosv Oct 16, 2024
c582f07
Remove config functions that are replaced with options
bjosv Oct 20, 2024
b5ad0a9
New option: Blocking slotmap updates after initial async connect
bjosv Dec 4, 2024
b61a813
Update documentation
bjosv Jan 3, 2025
978e2e1
fixup: correcting ct_out_of_memory_handling
bjosv Jan 22, 2025
aaa4eca
Merge remote-tracking branch 'upstream/main' into clusterconfig
bjosv Jan 29, 2025
a1f704c
Use CLUSTER SLOTS during slotmap updates by default
bjosv Jan 29, 2025
3681d88
fixup: remove valkeyClusterOptionsSetEventCallback
bjosv Jan 29, 2025
8473801
fixup: remote valkeyClusterOptionsSetConnectCallback
bjosv Jan 29, 2025
a5ecc02
fixup: fix cluster.md review comments
bjosv Jan 29, 2025
bf83e65
fixup: remove helper valkeyClusterOptionsEnableTLS
bjosv Jan 29, 2025
6c88335
fixup: Add 'bitwise' to spellcheck wordlist in CI
bjosv Jan 29, 2025
97e02b6
fixup: fix leak warnings during OOM testing
bjosv Jan 29, 2025
28f790b
Update migration guide
bjosv Jan 30, 2025
03b2aaf
Update docs and option name `async_connect_callback`
bjosv Jan 30, 2025
fa7c35d
fixup: spelling correction
bjosv Jan 30, 2025
7a5bc1a
fixup: review comments
bjosv Jan 30, 2025
e145437
Add a TLS section in cluster.md as in standalone.md
bjosv Jan 30, 2025
f421006
fixup: correcting options
bjosv Jan 30, 2025
3cf42b1
Update async API readme
bjosv Feb 3, 2025
d57b28a
fixup: spelling
bjosv Feb 3, 2025
5270d93
fixup: correcting grammar
bjosv Feb 3, 2025
c29e0db
fixup: add markdown links in migration guide
bjosv Feb 3, 2025
8002264
fixup: link to heading
bjosv Feb 3, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/wordlist.txt
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ Autoloading
backend
backends
behaviour
bitwise
boolean
bugfix
CAS
Expand Down
1 change: 0 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,6 @@ IF(ENABLE_TLS)
ENDIF()
FIND_PACKAGE(OpenSSL REQUIRED)
SET(valkey_tls_sources
src/cluster_tls.c
src/tls.c)
ADD_LIBRARY(valkey_tls ${valkey_tls_sources})
ADD_LIBRARY(valkey::valkey_tls ALIAS valkey_tls)
Expand Down
7 changes: 3 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ INCLUDE_DIR = include/valkey
TEST_SRCS = $(TEST_DIR)/client_test.c $(TEST_DIR)/ut_parse_cmd.c $(TEST_DIR)/ut_slotmap_update.c
TEST_BINS = $(patsubst $(TEST_DIR)/%.c,$(TEST_DIR)/%,$(TEST_SRCS))

SOURCES = $(filter-out $(wildcard $(SRC_DIR)/*tls.c) $(SRC_DIR)/rdma.c, $(wildcard $(SRC_DIR)/*.c))
HEADERS = $(filter-out $(wildcard $(INCLUDE_DIR)/*tls.h) $(INCLUDE_DIR)/rdma.h, $(wildcard $(INCLUDE_DIR)/*.h))
SOURCES = $(filter-out $(SRC_DIR)/tls.c $(SRC_DIR)/rdma.c, $(wildcard $(SRC_DIR)/*.c))
HEADERS = $(filter-out $(INCLUDE_DIR)/tls.h $(INCLUDE_DIR)/rdma.h, $(wildcard $(INCLUDE_DIR)/*.h))

OBJS = $(patsubst $(SRC_DIR)/%.c,$(OBJ_DIR)/%.o,$(SOURCES))

Expand Down Expand Up @@ -95,7 +95,7 @@ TLS_DYLIB_MAKE_CMD=$(CC) $(OPTIMIZATION) $(PLATFORM_FLAGS) -shared -Wl,-soname,$
USE_TLS?=0

ifeq ($(USE_TLS),1)
TLS_SOURCES = $(wildcard $(SRC_DIR)/*tls.c)
TLS_SOURCES = $(SRC_DIR)/tls.c
TLS_OBJS = $(patsubst $(SRC_DIR)/%.c,$(OBJ_DIR)/%.o,$(TLS_SOURCES))

# This is required for test.c only
Expand Down Expand Up @@ -293,7 +293,6 @@ install: $(DYLIBNAME) $(STLIBNAME) $(PKGCONFNAME) $(TLS_INSTALL)
install-tls: $(TLS_DYLIBNAME) $(TLS_STLIBNAME) $(TLS_PKGCONFNAME)
mkdir -p $(INSTALL_INCLUDE_PATH) $(INSTALL_LIBRARY_PATH)
$(INSTALL) $(INCLUDE_DIR)/tls.h $(INSTALL_INCLUDE_PATH)
$(INSTALL) $(INCLUDE_DIR)/cluster_tls.h $(INSTALL_INCLUDE_PATH)
$(INSTALL) $(TLS_DYLIBNAME) $(INSTALL_LIBRARY_PATH)/$(TLS_DYLIB_MINOR_NAME)
ln -sf $(TLS_DYLIB_MINOR_NAME) $(INSTALL_LIBRARY_PATH)/$(TLS_ROOT_DYLIB_NAME)
ln -sf $(TLS_DYLIB_MINOR_NAME) $(INSTALL_LIBRARY_PATH)/$(TLS_DYLIB_MAJOR_NAME)
Expand Down
151 changes: 87 additions & 64 deletions docs/cluster.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,15 @@ It is not intended as a complete reference. For that it's always best to refer t

- [Synchronous API](#synchronous-api)
- [Connecting](#connecting)
- [Connection options](#connection-options)
- [Executing commands](#executing-commands)
- [Executing commands on a specific node](#executing-commands-on-a-specific-node)
- [Disconnecting/cleanup](#disconnecting-cleanup)
- [Pipelining](#pipelining)
- [Events](#events)
- [Asynchronous API](#asynchronous-api)
- [Connecting](#connecting-1)
- [Connection options](#connection-options-1)
- [Executing commands](#executing-commands-1)
- [Executing commands on a specific node](#executing-commands-on-a-specific-node-1)
- [Disconnecting/cleanup](#disconnecting-cleanup-1)
Expand All @@ -27,37 +29,61 @@ It is not intended as a complete reference. For that it's always best to refer t

### Connecting

There are a variety of ways to setup and connect to a cluster.
The most basic alternative lacks many options, but can be enough for some use cases.
There are a few alternative ways to setup and connect to a cluster.
The basic alternatives lacks most options, but can be enough for some use cases.

```c
valkeyClusterContext *cc = valkeyClusterConnect("127.0.0.1:6379", VALKEYCLUSTER_FLAG_NULL);
valkeyClusterContext *valkeyClusterConnect(const char *addrs);
valkeyClusterContext *valkeyClusterConnectWithTimeout(const char *addrs,
const struct timeval tv);
```

There is also a convenience struct to specify various options.

```c
valkeyClusterContext *valkeyClusterConnectWithOptions(const valkeyClusterOptions *options);
```

When connecting to a cluster, `NULL` is returned when the context can't be allocated, or `err` and `errstr` are set in the returned allocated context when there are issues.
So when connecting it's simple to handle error states.

```c
valkeyClusterContext *cc = valkeyClusterConnect("127.0.0.1:6379,127.0.0.1:6380");
if (cc == NULL || cc->err) {
fprintf(stderr, "Error: %s\n", cc ? cc->errstr : "OOM");
}
```

When connecting to a cluster, libvalkey will return `NULL` in the event that we can't allocate the context, and set the `err` member if we can connect but there are issues.
So when connecting it's simple to handle error states.

To be able to set options before any connection attempt is performed a `valkeyClusterContext` can be created using `valkeyClusterContextInit`.
The context is where the known cluster state and the options are kept, which can be configured using functions like
`valkeyClusterSetOptionAddNodes` to add one or many cluster node addresses,
or `valkeyClusterSetOptionUsername` together with `valkeyClusterSetOptionPassword` to configure authentication and so on.
See [`include/valkey/cluster.h`](../include/valkey/cluster.h) for more details.
### Connection options

The function `valkeyClusterConnect2` is provided to connect using the prepared `valkeyClusterContext`.
There are a variety of options you can specify using the `valkeyClusterOptions` struct when connecting to a cluster.
This includes information about how to connect to the cluster and defining optional callbacks and other options.
See [include/valkey/cluster.h](../include/valkey/cluster.h) for more details.

```c
valkeyClusterContext *cc = valkeyClusterContextInit();
valkeyClusterSetOptionAddNodes(cc, "127.0.0.1:6379,127.0.0.1:6380");
int status = valkeyClusterConnect2(cc);
if (status == VALKEY_ERR) {
printf("Error: %s\n", cc->errstr);
// handle error
valkeyClusterOptions opt = {0};
opt.initial_nodes = "127.0.0.1:6379,127.0.0.1:6380"; // Addresses to initially connect to.
opt.options = VALKEY_OPT_USE_CLUSTER_NODES; // See available flags below.
zuiderkwast marked this conversation as resolved.
Show resolved Hide resolved
opt.password = "password" // Authentication; libvalkey sends the `AUTH` command.

// Enable TLS using a prepared `valkeyTLSContext` when connecting.
options.tls = tlsCtx;
bjosv marked this conversation as resolved.
Show resolved Hide resolved
options.tls_init_fn = &valkeyInitiateTLSWithContext;

valkeyClusterContext *cc = valkeyClusterConnectWithOptions(&opt);
if (cc == NULL || cc->err) {
fprintf(stderr, "Error: %s\n", cc ? cc->errstr : "OOM");
}
```

There are also several flags you can specify in `valkeyClusterOptions.flags`. It's a bitwise OR of the following flags:

| Flag | Description |
| --- | --- |
| VALKEY\_OPT\_USE\_CLUSTER\_NODES | Tells libvalkey to use the command `CLUSTER NODES` when updating its slot map (cluster topology).<br>Libvalkey uses `CLUSTER SLOTS` by default. |
bjosv marked this conversation as resolved.
Show resolved Hide resolved
| VALKEY\_OPT\_USE\_REPLICAS| Tells libvalkey to keep parsed information of replica nodes. |
| VALKEY\_OPT\_BLOCKING\_INITIAL\_UPDATE | **ASYNC**: Tells libvalkey to perform the initial slot map update in a blocking fashion. The function call will wait for a slot map update before returning so that the returned context is immediately ready to accept commands. |

### Executing commands

The primary command interface is a `printf`-like function that takes a format string along with a variable number of arguments.
Expand Down Expand Up @@ -121,22 +147,23 @@ if (valkeyClusterAppendCommand(cc, "SET foo bar") != VALKEY_OK) {
fprintf(stderr, "Error appending command: %s\n", cc->errstr);
exit(1);
}

if (valkeyClusterAppendCommand(cc, "GET foo") != VALKEY_OK) {
fprintf(stderr, "Error appending command: %s\n", cc->errstr);
exit(1);
}

valkeyReply *reply;
if (valkeyClusterGetReply(cc,&reply) != VALKEY_OK) {
fprintf(stderr, "Error reading reply %zu: %s\n", i, c->errstr);
exit(1);
fprintf(stderr, "Error reading reply %zu: %s\n", i, c->errstr);
exit(1);
}
// Handle the reply for SET here.
freeReplyObject(reply);

if (valkeyClusterGetReply(cc,&reply) != VALKEY_OK) {
fprintf(stderr, "Error reading reply %zu: %s\n", i, c->errstr);
exit(1);
fprintf(stderr, "Error reading reply %zu: %s\n", i, c->errstr);
exit(1);
}
// Handle the reply for GET here.
freeReplyObject(reply);
Expand All @@ -149,18 +176,26 @@ freeReplyObject(reply);
There is a hook to get notified when certain events occur.

```c
int valkeyClusterSetEventCallback(valkeyClusterContext *cc,
void(fn)(const valkeyClusterContext *cc, int event,
void *privdata),
void *privdata);
/* Function to be called when events occur. */
void callbackFn(const valkeyClusterContext *cc, int event, void *privdata) {
switch (event) {
// Handle event
}
}

valkeyClusterOptions opt = {0};
opt.event_callback = callbackFn;
opt.event_privdata = my_privdata; // User defined data can be provided to the callback.
// Set additional options...
valkeyClusterContext *cc = valkeyClusterConnectWithOptions(&opt);
```

The callback is called with `event` set to one of the following values:

* `VALKEYCLUSTER_EVENT_SLOTMAP_UPDATED` when the slot mapping has been updated;
* `VALKEYCLUSTER_EVENT_READY` when the slot mapping has been fetched for the first
time and the client is ready to accept commands, useful when initiating the
client with `valkeyClusterAsyncConnect2()` where a client is not immediately
client with `valkeyClusterAsyncConnect()` where a client is not immediately
ready after a successful call;
* `VALKEYCLUSTER_EVENT_FREE_CONTEXT` when the cluster context is being freed, so
that the user can free the event `privdata`.
Expand All @@ -169,11 +204,17 @@ The callback is called with `event` set to one of the following values:

There is a hook to get notified about connect and reconnect attempts.
This is useful for applying socket options or access endpoint information for a connection to a particular node.
The callback is registered using the following function:
The callback is registered using an option:

```c
int valkeyClusterSetConnectCallback(valkeyClusterContext *cc,
void(fn)(const valkeyContext *c, int status));
void connect_callback(const valkeyContext *c, int status) {
// Perform desired action
}

valkeyClusterOptions opt = {0};
opt.connect_callback = connect_callback;
// Set additional options...
valkeyClusterContext *cc = valkeyClusterConnectWithOptions(&opt);
```

The callback is called just after connect, before TLS handshake and authentication.
Expand All @@ -194,18 +235,7 @@ The first alternative is to use the function `valkeyClusterAsyncConnect`, which
Any command sent by the user thereafter will create a new non-blocking connection, unless a non-blocking connection already exists to the destination.
zuiderkwast marked this conversation as resolved.
Show resolved Hide resolved
The function returns a pointer to a newly created `valkeyClusterAsyncContext` struct and its `err` field should be checked to make sure the initial slot map update was successful.

```c
// Insufficient error handling for brevity.
valkeyClusterAsyncContext *acc = valkeyClusterAsyncConnect("127.0.0.1:6379", VALKEYCLUSTER_FLAG_NULL);
if (acc->err) {
printf("error: %s\n", acc->errstr);
exit(1);
}

// Attach an event engine. In this example we use libevent.
struct event_base *base = event_base_new();
valkeyClusterLibeventAttach(acc, base);
```

The second alternative is to use `valkeyClusterAsyncContextInit` and `valkeyClusterAsyncConnect2` which avoids the initial blocking connect.
This connection alternative requires an attached event engine when `valkeyClusterAsyncConnect2` is called, but the connect and the initial slot map update is done in a non-blocking fashion.
Expand All @@ -214,22 +244,12 @@ This means that commands sent directly after `valkeyClusterAsyncConnect2` may fa
You may use the [`eventCallback`](#events-per-cluster-context-1) to be notified when the slot map is updated and the client is ready to accept commands.
A crude example of using the `eventCallback` can be found in [this test case](../tests/ct_async.c).

```c
// Insufficient error handling for brevity.
valkeyClusterAsyncContext *acc = valkeyClusterAsyncContextInit();

// Add a cluster node address for the initial connect.
valkeyClusterSetOptionAddNodes(acc->cc, "127.0.0.1:6379");

// Attach an event engine. In this example we use libevent.
struct event_base *base = event_base_new();
valkeyClusterLibeventAttach(acc, base);

if (valkeyClusterAsyncConnect2(acc) != VALKEY_OK) {
printf("error: %s\n", acc->errstr);
exit(1);
}
```

### Connection options


### Executing commands

Expand Down Expand Up @@ -284,18 +304,20 @@ After this, the disconnection callback is executed with the `VALKEY_OK` status a

#### Events per cluster context

Use [`valkeyClusterSetEventCallback`](#events-per-cluster-context) with `acc->cc` as the context to get notified when certain events occur.
Use [`event_callback` in `valkeyClusterOptions`](#events-per-cluster-context) to get notified when certain events occur.
Alternatively `valkeyClusterAsyncSetEventCallback` can be used when the `valkeyClusterAsyncContext` is required to be provided in `privdata`.

#### Events per connection

Because the connections that will be created are non-blocking, the kernel is not able to instantly return if the specified host and port is able to accept a connection.
Instead, use a connect callback to be notified when a connection is established or failed.
Similarly, a disconnect callback can be used to be notified about a disconnected connection (either because of an error or per user request).
The callbacks are installed using the following functions:
The callbacks can be enabled using the following options when calling `valkeyClusterAsyncConnectWithOptions`:

```c
status = valkeyClusterAsyncSetConnectCallback(acc, callbackFn);
status = valkeyClusterAsyncSetDisconnectCallback(acc, callbackFn);
valkeyClusterOptions opt = {0};
opt.async_connect_callback = callbackFn;
opt.async_disconnect_callback = callbackFn;
```

The connect callback function should have the following prototype, aliased to `valkeyConnectCallback`:
Expand All @@ -305,14 +327,15 @@ void(valkeyAsyncContext *ac, int status);

On a connection attempt, the `status` argument is set to `VALKEY_OK` when the connection was successful.
The file description of the connection socket can be retrieved from a `valkeyAsyncContext` as `ac->c->fd`.
On a disconnect, the `status` argument is set to `VALKEY_OK` when disconnection was initiated by the user, or `VALKEY_ERR` when the disconnection was caused by an error.
When it is `VALKEY_ERR`, the `err` field in the context can be accessed to find out the cause of the error.

You don't need to reconnect in the disconnect callback.
libvalkey will reconnect by itself when the next command is handled.
The disconnect callback function should have the following prototype, aliased to `valkeyDisconnectCallback`:
```c
void(const valkeyAsyncContext *ac, int status);
```

Setting the connect and disconnect callbacks can only be done once per context.
For subsequent calls it will return `VALKEY_ERR`.
On a disconnection the `status` argument is set to `VALKEY_OK` if it was initiated by the user, or to `VALKEY_ERR` when it was caused by an error.
When caused by an error the `err` field in the context can be accessed to get the error cause.
You don't need to reconnect in the disconnect callback since libvalkey will reconnect by itself when the next command is handled.

## Miscellaneous

Expand Down
34 changes: 29 additions & 5 deletions docs/migration-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,17 @@ The type `sds` is removed from the public API.

## Migrating from `hiredis-cluster` 0.14.0

* The cluster client initiation procedure is changed and `valkeyClusterOptions`
should be used to specify options when creating a context.
The `examples` directory in this repository contains some common client
initiation examples that might be helpful.
bjosv marked this conversation as resolved.
Show resolved Hide resolved
* The command used to update the internal slot map is changed to `CLUSTER SLOTS`.
`CLUSTER NODES` can be re-enabled through options using `VALKEY_OPT_USE_CLUSTER_NODES`.

### Renamed API functions

* `ctx_get_by_node` is renamed to `valkeyClusterGetValkeyContext`.
* `actx_get_by_node` is renamed to `valkeyClusterGetValkeyAsyncContext`.
* `redisClusterAsyncSetConnectCallbackNC` is renamed to `valkeyClusterAsyncSetConnectCallback`.

### Renamed API defines

Expand All @@ -41,14 +47,32 @@ The type `sds` is removed from the public API.

### Removed API functions

* `redisClusterSetMaxRedirect` removed and replaced with `valkeyClusterSetOptionMaxRetry`.
* `redisClusterSetOptionAddNode` removed and replaced with `valkeyClusterSetOptionAddNodes`.
(Note the "s" in the end of the function name.)
* `redisClusterConnect2` removed, use `valkeyClusterConnectWithOptions`.
* `redisClusterContextInit` removed, use `valkeyClusterConnectWithOptions`.
* `redisClusterSetConnectCallback` removed, use `valkeyClusterOptions.connect_callback`.
* `redisClusterSetEventCallback` removed, use `valkeyClusterOptions.event_callback`.
* `redisClusterSetMaxRedirect` removed, use `valkeyClusterOptions.max_retry`.
* `redisClusterSetOptionAddNode` removed, use `valkeyClusterOptions.initial_nodes`.
* `redisClusterSetOptionAddNodes` removed, use `valkeyClusterOptions.initial_nodes`.
* `redisClusterSetOptionConnectBlock` removed since it was deprecated.
* `redisClusterSetOptionConnectNonBlock` removed since it was deprecated.
* `redisClusterSetOptionConnectTimeout` removed, use `valkeyClusterOptions.connect_timeout`.
* `redisClusterSetOptionMaxRetry` removed, use `valkeyClusterOptions.max_retry`.
* `redisClusterSetOptionParseSlaves` removed, use `valkeyClusterOptions.flags` and `VALKEY_OPT_USE_REPLICAS`.
* `redisClusterSetOptionPassword` removed, use `valkeyClusterOptions.password`.
* `redisClusterSetOptionRouteUseSlots` removed, the use of `CLUSTER SLOTS` is enabled by default.
* `redisClusterSetOptionUsername` removed, use `valkeyClusterOptions.username`.
* `redisClusterAsyncSetConnectCallback` removed, but `valkeyClusterOptions.async_connect_callback` can be used which accepts a non-const callback function prototype.
* `redisClusterAsyncSetConnectCallbackNC` removed, use `valkeyClusterOptions.async_connect_callback`.
* `redisClusterAsyncSetDisconnectCallback` removed, use `valkeyClusterOptions.async_disconnect_callback`.
* `parse_cluster_nodes` removed from API, for internal use only.
* `parse_cluster_slots` removed from API, for internal use only.
* `redisClusterAsyncSetConnectCallback` is removed, but can be replaced with `valkeyClusterAsyncSetConnectCallback` which accepts the non-const callback function prototype.

### Removed API defines

* `HIRCLUSTER_FLAG_NULL` removed.
* `HIRCLUSTER_FLAG_ADD_SLAVE` removed, flag can be replaced with an option, see `VALKEY_OPT_USE_REPLICAS`.
* `HIRCLUSTER_FLAG_ROUTE_USE_SLOTS` removed, the use of `CLUSTER SLOTS` is enabled by default.

### Removed support for splitting multi-key commands per slot

Expand Down
Loading