diff --git a/CMakeLists.txt b/CMakeLists.txt index c1480cb6..f9af3054 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -23,6 +23,7 @@ OPTION(DISABLE_TESTS "If tests should be compiled or not" OFF) OPTION(ENABLE_EXAMPLES "Enable building valkey examples" OFF) option(ENABLE_IPV6_TESTS "Enable IPv6 tests requiring special prerequisites" OFF) OPTION(ENABLE_NUGET "Install NuGET packaging details" OFF) +OPTION(ENABLE_RDMA "Build valkey_rdma for RDMA support" OFF) # valkey requires C99 SET(CMAKE_C_STANDARD 99) @@ -33,6 +34,7 @@ SET(valkey_sources src/alloc.c src/async.c src/command.c + src/conn.c src/crc16.c src/dict.c src/net.c @@ -226,6 +228,53 @@ IF(ENABLE_SSL) DESTINATION ${CMAKE_CONF_INSTALL_DIR}) ENDIF() +if(ENABLE_RDMA) + find_library(RDMACM_LIBRARIES rdmacm) + find_library(IBVERBS_LIBRARIES ibverbs) + set(valkey_rdma_sources src/rdma.c) + add_library(valkey_rdma ${valkey_rdma_sources}) + add_library(valkey::valkey_rdma ALIAS valkey_rdma) + + target_link_libraries(valkey_rdma LINK_PRIVATE ${RDMACM_LIBRARIES} ${IBVERBS_LIBRARIES}) + target_include_directories(valkey_rdma + PRIVATE + $ + $ + ) + + set_target_properties(valkey_rdma + PROPERTIES + WINDOWS_EXPORT_ALL_SYMBOLS TRUE + VERSION "${LIBVALKEY_SONAME}") + configure_file(valkey_rdma.pc.in valkey_rdma.pc @ONLY) + + install(TARGETS valkey_rdma + EXPORT valkey_rdma-targets + RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR} + LIBRARY DESTINATION ${CMAKE_INSTALL_LIBDIR} + ARCHIVE DESTINATION ${CMAKE_INSTALL_LIBDIR}) + + install(FILES ${CMAKE_CURRENT_BINARY_DIR}/valkey_rdma.pc + DESTINATION ${CMAKE_INSTALL_LIBDIR}/pkgconfig) + + export(EXPORT valkey_rdma-targets + FILE "${CMAKE_CURRENT_BINARY_DIR}/valkey_rdma-targets.cmake" + NAMESPACE valkey::) + + set(CMAKE_CONF_INSTALL_DIR ${CMAKE_INSTALL_LIBDIR}/cmake/valkey_rdma) + configure_package_config_file(valkey_rdma-config.cmake.in ${CMAKE_CURRENT_BINARY_DIR}/valkey_rdma-config.cmake + INSTALL_DESTINATION ${CMAKE_CONF_INSTALL_DIR} + PATH_VARS INCLUDE_INSTALL_DIR) + + install(EXPORT valkey_rdma-targets + FILE valkey_rdma-targets.cmake + NAMESPACE valkey:: + DESTINATION ${CMAKE_CONF_INSTALL_DIR}) + + install(FILES ${CMAKE_CURRENT_BINARY_DIR}/valkey_rdma-config.cmake + DESTINATION ${CMAKE_CONF_INSTALL_DIR}) +endif() + # Add tests if(NOT DISABLE_TESTS) # Make sure ctest prints the output when a test fails. diff --git a/Makefile b/Makefile index 5d2cb278..3c1d6c14 100644 --- a/Makefile +++ b/Makefile @@ -15,8 +15,8 @@ INCLUDE_DIR = include/valkey TEST_SRCS = $(TEST_DIR)/client_test.c TEST_BINS = $(patsubst $(TEST_DIR)/%.c,$(TEST_DIR)/%,$(TEST_SRCS)) -SOURCES = $(filter-out $(wildcard $(SRC_DIR)/*ssl*.c), $(wildcard $(SRC_DIR)/*.c)) -HEADERS = $(filter-out $(INCLUDE_DIR)/valkey_ssl.h, $(wildcard $(INCLUDE_DIR)/*.h)) +SOURCES = $(filter-out $(wildcard $(SRC_DIR)/*ssl*.c, wildcard $(SRC_DIR)/*rdma*.c), $(wildcard $(SRC_DIR)/*.c)) +HEADERS = $(filter-out $(INCLUDE_DIR)/valkey_ssl.h $(INCLUDE_DIR)/valkey_rdma.h, $(wildcard $(INCLUDE_DIR)/*.h)) OBJS = $(patsubst $(SRC_DIR)/%.c,$(OBJ_DIR)/%.o,$(SOURCES)) @@ -25,6 +25,7 @@ PKGCONFNAME=$(LIB_DIR)/valkey.pc PKGCONF_TEMPLATE = valkey.pc.in SSL_PKGCONF_TEMPLATE = valkey_ssl.pc.in +RDMA_PKGCONF_TEMPLATE = valkey_rdma.pc.in LIBVALKEY_HEADER=$(INCLUDE_DIR)/valkey.h LIBVALKEY_VERSION=$(shell awk '/LIBVALKEY_(MAJOR|MINOR|PATCH|SONAME)/{print $$3}' $(LIBVALKEY_HEADER)) @@ -111,6 +112,38 @@ else endif ##################### SSL variables end ##################### +#################### RDMA variables start #################### +RDMA_LIBNAME=libvalkey_rdma +RDMA_PKGCONFNAME=$(LIB_DIR)/valkey_rdma.pc +RDMA_INSTALLNAME=install-rdma +RDMA_DYLIB_MINOR_NAME=$(RDMA_LIBNAME).$(DYLIBSUFFIX).$(LIBVALKEY_SONAME) +RDMA_DYLIB_MAJOR_NAME=$(RDMA_LIBNAME).$(DYLIBSUFFIX).$(LIBVALKEY_MAJOR) +RDMA_ROOT_DYLIB_NAME=$(RDMA_LIBNAME).$(DYLIBSUFFIX) +RDMA_DYLIBNAME=$(LIB_DIR)/$(RDMA_LIBNAME).$(DYLIBSUFFIX) +RDMA_STLIBNAME=$(LIB_DIR)/$(RDMA_LIBNAME).$(STLIBSUFFIX) +RDMA_DYLIB_MAKE_CMD=$(CC) $(OPTIMIZATION) $(PLATFORM_FLAGS) -shared -Wl,-soname,$(RDMA_DYLIB_MINOR_NAME) + +USE_RDMA?=0 + +ifeq ($(USE_RDMA),1) + RDMA_SOURCES = $(wildcard $(SRC_DIR)/*rdma*.c) + RDMA_OBJS = $(patsubst $(SRC_DIR)/%.c,$(OBJ_DIR)/%.o,$(RDMA_SOURCES)) + + RDMA_LDFLAGS+=-lrdmacm -libverbs + # This is required for test.c only + CFLAGS+=-DVALKEY_TEST_RDMA + RDMA_STLIB=$(RDMA_STLIBNAME) + RDMA_DYLIB=$(RDMA_DYLIBNAME) + RDMA_PKGCONF=$(RDMA_PKGCONFNAME) + RDMA_INSTALL=$(RDMA_INSTALLNAME) +else + RDMA_STLIB= + RDMA_DYLIB= + RDMA_PKGCONF= + RDMA_INSTALL= +endif +##################### RDMA variables end ##################### + # Platform-specific overrides uname_S := $(shell uname -s 2>/dev/null || echo not) @@ -174,6 +207,12 @@ $(SSL_DYLIBNAME): $(SSL_OBJS) $(SSL_STLIBNAME): $(SSL_OBJS) $(STLIB_MAKE_CMD) $(SSL_STLIBNAME) $(SSL_OBJS) +$(RDMA_DYLIBNAME): $(RDMA_OBJS) + $(RDMA_DYLIB_MAKE_CMD) $(DYLIB_PLUGIN) -o $(RDMA_DYLIBNAME) $(RDMA_OBJS) $(REAL_LDFLAGS) $(LDFLAGS) $(RDMA_LDFLAGS) + +$(RDMA_STLIBNAME): $(RDMA_OBJS) + $(STLIB_MAKE_CMD) $(RDMA_STLIBNAME) $(RDMA_OBJS) + $(OBJ_DIR)/%.o: $(SRC_DIR)/%.c | $(OBJ_DIR) $(CC) -std=c99 -pedantic $(REAL_CFLAGS) -I$(INCLUDE_DIR) -MMD -MP -c $< -o $@ @@ -181,7 +220,7 @@ $(OBJ_DIR)/%.o: $(TEST_DIR)/%.c | $(OBJ_DIR) $(CC) -std=c99 -pedantic $(REAL_CFLAGS) -I$(INCLUDE_DIR) -MMD -MP -c $< -o $@ $(TEST_DIR)/%: $(OBJ_DIR)/%.o $(STLIBNAME) - $(CC) -o $@ $< $(STLIBNAME) $(SSL_STLIB) $(LDFLAGS) $(TEST_LDFLAGS) + $(CC) -o $@ $< $(RDMA_STLIB) $(STLIBNAME) $(SSL_STLIB) $(LDFLAGS) $(TEST_LDFLAGS) $(OBJ_DIR): mkdir -p $(OBJ_DIR) @@ -189,15 +228,15 @@ $(OBJ_DIR): $(LIB_DIR): mkdir -p $(LIB_DIR) -dynamic: $(DYLIBNAME) $(SSL_DYLIB) +dynamic: $(DYLIBNAME) $(SSL_DYLIB) $(RDMA_DYLIB) -static: $(STLIBNAME) $(SSL_STLIB) +static: $(STLIBNAME) $(SSL_STLIB) $(RDMA_STLIB) -pkgconfig: $(PKGCONFNAME) $(SSL_PKGCONF) +pkgconfig: $(PKGCONFNAME) $(SSL_PKGCONF) $(RDMA_PKGCONF) -include $(OBJS:.o=.d) -TEST_LDFLAGS = $(SSL_LDFLAGS) +TEST_LDFLAGS = $(SSL_LDFLAGS) $(RDMA_LDFLAGS) ifeq ($(USE_SSL),1) TEST_LDFLAGS += -pthread endif @@ -232,6 +271,14 @@ $(SSL_PKGCONFNAME): $(SSL_PKGCONF_TEMPLATE) -e 's|@PROJECT_VERSION@|$(LIBVALKEY_SONAME)|g' \ $< > $@ +$(RDMA_PKGCONFNAME): $(RDMA_PKGCONF_TEMPLATE) + @echo "Generating $@ for pkgconfig..." + sed \ + -e 's|@CMAKE_INSTALL_PREFIX@|$(PREFIX)|g' \ + -e 's|@CMAKE_INSTALL_LIBDIR@|$(INSTALL_LIBRARY_PATH)|g' \ + -e 's|@PROJECT_VERSION@|$(LIBVALKEY_SONAME)|g' \ + $< > $@ + install: $(DYLIBNAME) $(STLIBNAME) $(PKGCONFNAME) $(SSL_INSTALL) mkdir -p $(INSTALL_INCLUDE_PATH)/adapters $(INSTALL_LIBRARY_PATH) $(INSTALL) $(HEADERS) $(INSTALL_INCLUDE_PATH) @@ -253,6 +300,16 @@ install-ssl: $(SSL_DYLIBNAME) $(SSL_STLIBNAME) $(SSL_PKGCONFNAME) mkdir -p $(INSTALL_PKGCONF_PATH) $(INSTALL) $(SSL_PKGCONFNAME) $(INSTALL_PKGCONF_PATH) +install-rdma: $(RDMA_DYLIBNAME) $(RDMA_STLIBNAME) $(RDMA_PKGCONFNAME) + mkdir -p $(INSTALL_INCLUDE_PATH) $(INSTALL_LIBRARY_PATH) + $(INSTALL) $(INCLUDE_DIR)/valkey_rdma.h $(INSTALL_INCLUDE_PATH) + $(INSTALL) $(RDMA_DYLIBNAME) $(INSTALL_LIBRARY_PATH)/$(RDMA_DYLIB_MINOR_NAME) + ln -sf $(RDMA_DYLIB_MINOR_NAME) $(INSTALL_LIBRARY_PATH)/$(RDMA_ROOT_DYLIB_NAME) + ln -sf $(RDMA_DYLIB_MINOR_NAME) $(INSTALL_LIBRARY_PATH)/$(RDMA_DYLIB_MAJOR_NAME) + $(INSTALL) $(RDMA_STLIBNAME) $(INSTALL_LIBRARY_PATH) + mkdir -p $(INSTALL_PKGCONF_PATH) + $(INSTALL) $(RDMA_PKGCONFNAME) $(INSTALL_PKGCONF_PATH) + 32bit: @echo "" @echo "WARNING: if this fails under Linux you probably need to install libc6-dev-i386" diff --git a/include/valkey/net.h b/include/valkey/net.h index 2ca73725..faeb09d4 100644 --- a/include/valkey/net.h +++ b/include/valkey/net.h @@ -38,16 +38,10 @@ #include "valkey.h" void valkeyNetClose(valkeyContext *c); -ssize_t valkeyNetRead(valkeyContext *c, char *buf, size_t bufcap); -ssize_t valkeyNetWrite(valkeyContext *c); int valkeyCheckSocketError(valkeyContext *c); -int valkeyContextSetTimeout(valkeyContext *c, const struct timeval tv); -int valkeyContextConnectTcp(valkeyContext *c, const char *addr, int port, const struct timeval *timeout); -int valkeyContextConnectBindTcp(valkeyContext *c, const char *addr, int port, - const struct timeval *timeout, - const char *source_addr); -int valkeyContextConnectUnix(valkeyContext *c, const char *path, const struct timeval *timeout); +int valkeyTcpSetTimeout(valkeyContext *c, const struct timeval tv); +int valkeyContextConnectTcp(valkeyContext *c, const valkeyOptions *options); int valkeyKeepAlive(valkeyContext *c, int interval); int valkeyCheckConnectDone(valkeyContext *c, int *completed); diff --git a/include/valkey/valkey.h b/include/valkey/valkey.h index 8d41bba5..0658a417 100644 --- a/include/valkey/valkey.h +++ b/include/valkey/valkey.h @@ -147,7 +147,10 @@ void valkeyFreeSdsCommand(sds cmd); enum valkeyConnectionType { VALKEY_CONN_TCP, VALKEY_CONN_UNIX, - VALKEY_CONN_USERFD + VALKEY_CONN_USERFD, + VALKEY_CONN_RDMA, /* experimental, may be removed in any version */ + + VALKEY_CONN_MAX }; struct valkeySsl; @@ -239,6 +242,7 @@ typedef struct { } while(0) typedef struct valkeyContextFuncs { + int (*connect)(struct valkeyContext *, const valkeyOptions *); void (*close)(struct valkeyContext *); void (*free_privctx)(void *); void (*async_read)(struct valkeyAsyncContext *); @@ -250,6 +254,7 @@ typedef struct valkeyContextFuncs { * recoverable error, they should return 0. */ ssize_t (*read)(struct valkeyContext *, char *, size_t); ssize_t (*write)(struct valkeyContext *); + int (*set_timeout)(struct valkeyContext *, const struct timeval); } valkeyContextFuncs; diff --git a/include/valkey/valkey_rdma.h b/include/valkey/valkey_rdma.h new file mode 100644 index 00000000..3e6fa220 --- /dev/null +++ b/include/valkey/valkey_rdma.h @@ -0,0 +1,60 @@ + +/* + * Copyright (c) 2021-2024 zhenwei pi + * + * Valkey Over RDMA has been supported as experimental feature since Valkey-8.0. + * It's also supported as an experimental feature by libvalkey, + * It may be removed or changed in any version. + * + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * * Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * * Neither the name of the copyright holder nor the names of its + * contributors may be used to endorse or promote products derived + * from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#ifndef VALKEY_RDMA_H +#define VALKEY_RDMA_H + +#ifdef __cplusplus +extern "C" { +#endif + +/** + * Helper macros to initialize options for RDMA. + * It's ok to reuse TCP options. + */ +#define VALKEY_OPTIONS_SET_RDMA(opts, ip_, port_) do { \ + (opts)->type = VALKEY_CONN_RDMA; \ + (opts)->endpoint.tcp.ip = ip_; \ + (opts)->endpoint.tcp.port = port_; \ + } while(0) + + +int valkeyInitiateRdma(void); + +#ifdef __cplusplus +} +#endif + +#endif /* VALKEY_RDMA_H */ diff --git a/src/conn.c b/src/conn.c new file mode 100644 index 00000000..5146b124 --- /dev/null +++ b/src/conn.c @@ -0,0 +1,59 @@ +/* + * Copyright (c) 2024, zhenwei pi + * + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * * Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * * Neither the name of the copyright holder nor the names of its + * contributors may be used to endorse or promote products derived + * from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#include "valkey_private.h" + +#include + +static valkeyContextFuncs *valkeyContextFuncsArray[VALKEY_CONN_MAX]; + +int valkeyContextRegisterFuncs(valkeyContextFuncs *funcs, enum valkeyConnectionType type) { + assert(type < VALKEY_CONN_MAX); + assert(!valkeyContextFuncsArray[type]); + + valkeyContextFuncsArray[type] = funcs; + return VALKEY_OK; +} + +void valkeyContextSetFuncs(valkeyContext *c) { + static int initialized; + + if (!initialized) { + initialized = 1; + valkeyContextRegisterTcpFuncs(); + valkeyContextRegisterUnixFuncs(); + valkeyContextRegisterUserfdFuncs(); + } + + assert(c->connection_type < VALKEY_CONN_MAX); + assert(!c->funcs); + c->funcs = valkeyContextFuncsArray[c->connection_type]; + assert(c->funcs != NULL); +} diff --git a/src/net.c b/src/net.c index d27acba5..9c0f9038 100644 --- a/src/net.c +++ b/src/net.c @@ -43,6 +43,7 @@ #include #include +#include "async.h" #include "net.h" #include "sds.h" #include "sockcompat.h" @@ -56,7 +57,7 @@ void valkeyNetClose(valkeyContext *c) { } } -ssize_t valkeyNetRead(valkeyContext *c, char *buf, size_t bufcap) { +static ssize_t valkeyNetRead(valkeyContext *c, char *buf, size_t bufcap) { ssize_t nread = recv(c->fd, buf, bufcap, 0); if (nread == -1) { if ((errno == EWOULDBLOCK && !(c->flags & VALKEY_BLOCK)) || (errno == EINTR)) { @@ -78,7 +79,7 @@ ssize_t valkeyNetRead(valkeyContext *c, char *buf, size_t bufcap) { } } -ssize_t valkeyNetWrite(valkeyContext *c) { +static ssize_t valkeyNetWrite(valkeyContext *c) { ssize_t nwritten; nwritten = send(c->fd, c->obuf, sdslen(c->obuf), 0); @@ -354,14 +355,10 @@ int valkeyCheckSocketError(valkeyContext *c) { return VALKEY_OK; } -int valkeyContextSetTimeout(valkeyContext *c, const struct timeval tv) { +int valkeyTcpSetTimeout(valkeyContext *c, const struct timeval tv) { const void *to_ptr = &tv; size_t to_sz = sizeof(tv); - if (valkeyContextUpdateCommandTimeout(c, &tv) != VALKEY_OK) { - valkeySetError(c, VALKEY_ERR_OOM, "Out of memory"); - return VALKEY_ERR; - } if (setsockopt(c->fd,SOL_SOCKET,SO_RCVTIMEO,to_ptr,to_sz) == -1) { valkeySetErrorFromErrno(c,VALKEY_ERR_IO,"setsockopt(SO_RCVTIMEO)"); return VALKEY_ERR; @@ -373,9 +370,11 @@ int valkeyContextSetTimeout(valkeyContext *c, const struct timeval tv) { return VALKEY_OK; } -static int _valkeyContextConnectTcp(valkeyContext *c, const char *addr, int port, - const struct timeval *timeout, - const char *source_addr) { +int valkeyContextConnectTcp(valkeyContext *c, const valkeyOptions *options) { + const struct timeval *timeout = options->connect_timeout; + const char *addr = options->endpoint.tcp.ip; + const char *source_addr = options->endpoint.tcp.source_addr; + int port = options->endpoint.tcp.port; valkeyFD s; int rv, n; char _port[6]; /* strlen("65535"); */ @@ -553,19 +552,10 @@ static int _valkeyContextConnectTcp(valkeyContext *c, const char *addr, int port return rv; // Need to return VALKEY_OK if alright } -int valkeyContextConnectTcp(valkeyContext *c, const char *addr, int port, - const struct timeval *timeout) { - return _valkeyContextConnectTcp(c, addr, port, timeout, NULL); -} - -int valkeyContextConnectBindTcp(valkeyContext *c, const char *addr, int port, - const struct timeval *timeout, - const char *source_addr) { - return _valkeyContextConnectTcp(c, addr, port, timeout, source_addr); -} - -int valkeyContextConnectUnix(valkeyContext *c, const char *path, const struct timeval *timeout) { +static int valkeyContextConnectUnix(valkeyContext *c, const valkeyOptions *options) { #ifndef _WIN32 + const struct timeval *timeout = options->connect_timeout; + const char *path = options->endpoint.unix_socket; int blocking = (c->flags & VALKEY_BLOCK); struct sockaddr_un *sa; long timeout_msec = -1; @@ -630,3 +620,55 @@ int valkeyContextConnectUnix(valkeyContext *c, const char *path, const struct ti valkeySetError(c, VALKEY_ERR_OOM, "Out of memory"); return VALKEY_ERR; } + +static valkeyContextFuncs valkeyContextTcpFuncs = { + .connect = valkeyContextConnectTcp, + .close = valkeyNetClose, + .free_privctx = NULL, + .async_read = valkeyAsyncRead, + .async_write = valkeyAsyncWrite, + .read = valkeyNetRead, + .write = valkeyNetWrite, + .set_timeout = valkeyTcpSetTimeout +}; + +void valkeyContextRegisterTcpFuncs(void) { + valkeyContextRegisterFuncs(&valkeyContextTcpFuncs, VALKEY_CONN_TCP); +} + +static valkeyContextFuncs valkeyContextUnixFuncs = { + .connect = valkeyContextConnectUnix, + .close = valkeyNetClose, + .free_privctx = NULL, + .async_read = valkeyAsyncRead, + .async_write = valkeyAsyncWrite, + .read = valkeyNetRead, + .write = valkeyNetWrite, + .set_timeout = valkeyTcpSetTimeout +}; + +void valkeyContextRegisterUnixFuncs(void) { + valkeyContextRegisterFuncs(&valkeyContextUnixFuncs, VALKEY_CONN_UNIX); +} + +static int valkeyContextConnectUserfd(valkeyContext *c, const valkeyOptions *options) { + c->fd = options->endpoint.fd; + c->flags |= VALKEY_CONNECTED; + + return VALKEY_OK; +} + +static valkeyContextFuncs valkeyContextUserfdFuncs = { + .connect = valkeyContextConnectUserfd, + .close = valkeyNetClose, + .free_privctx = NULL, + .async_read = valkeyAsyncRead, + .async_write = valkeyAsyncWrite, + .read = valkeyNetRead, + .write = valkeyNetWrite, + .set_timeout = valkeyTcpSetTimeout +}; + +void valkeyContextRegisterUserfdFuncs(void) { + valkeyContextRegisterFuncs(&valkeyContextUserfdFuncs, VALKEY_CONN_USERFD); +} diff --git a/src/rdma.c b/src/rdma.c new file mode 100644 index 00000000..9d3f5891 --- /dev/null +++ b/src/rdma.c @@ -0,0 +1,998 @@ +/* + * Copyright (c) 2024, zhenwei pi + * + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * * Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * * Neither the name of the copyright holder nor the names of its + * contributors may be used to endorse or promote products derived + * from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#ifdef __linux__ /* currently RDMA is only supported on Linux */ + +#define _GNU_SOURCE +#include "valkey.h" +#include "async.h" +#include "vkutil.h" +#include "valkey_rdma.h" +#include "valkey_private.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +static valkeyContextFuncs valkeyContextRdmaFuncs; + +typedef struct valkeyRdmaFeature { + /* defined as following Opcodes */ + uint16_t opcode; + /* select features */ + uint16_t select; + uint8_t rsvd[20]; + /* feature bits */ + uint64_t features; +} valkeyRdmaFeature; + +typedef struct valkeyRdmaKeepalive { + /* defined as following Opcodes */ + uint16_t opcode; + uint8_t rsvd[30]; +} valkeyRdmaKeepalive; + +typedef struct valkeyRdmaMemory { + /* defined as following Opcodes */ + uint16_t opcode; + uint8_t rsvd[14]; + /* address of a transfer buffer which is used to receive remote streaming data, + * aka 'RX buffer address'. The remote side should use this as 'TX buffer address' */ + uint64_t addr; + /* length of the 'RX buffer' */ + uint32_t length; + /* the RDMA remote key of 'RX buffer' */ + uint32_t key; +} valkeyRdmaMemory; + +typedef union valkeyRdmaCmd { + valkeyRdmaFeature feature; + valkeyRdmaKeepalive keepalive; + valkeyRdmaMemory memory; +} valkeyRdmaCmd; + +typedef enum valkeyRdmaOpcode { + GetServerFeature = 0, + SetClientFeature = 1, + Keepalive = 2, + RegisterXferMemory = 3, +} valkeyRdmaOpcode; + +#define VALKEY_RDMA_MAX_WQE 1024 +#define VALKEY_RDMA_DEFAULT_RX_LEN (1024*1024) +#define VALKEY_RDMA_INVALID_OPCODE 0xffff + +typedef struct RdmaContext { + struct rdma_cm_id *cm_id; + struct rdma_event_channel *cm_channel; + struct ibv_comp_channel *comp_channel; + struct ibv_cq *cq; + struct ibv_pd *pd; + + /* TX */ + char *tx_addr; + uint32_t tx_length; + uint32_t tx_offset; + uint32_t tx_key; + char *send_buf; + uint32_t send_length; + uint32_t send_ops; + struct ibv_mr *send_mr; + + /* RX */ + uint32_t rx_offset; + char *recv_buf; + unsigned int recv_length; + unsigned int recv_offset; + struct ibv_mr *recv_mr; + + /* CMD 0 ~ VALKEY_RDMA_MAX_WQE for recv buffer + * VALKEY_RDMA_MAX_WQE ~ 2 * VALKEY_RDMA_MAX_WQE -1 for send buffer */ + valkeyRdmaCmd *cmd_buf; + struct ibv_mr *cmd_mr; +} RdmaContext; + +static int valkeyRdmaCM(valkeyContext *c, long timeout); + +static int valkeyRdmaSetFdBlocking(valkeyContext *c, int fd, int blocking) { + int flags; + + if ((flags = fcntl(fd, F_GETFL)) == -1) { + valkeySetError(c, VALKEY_ERR_IO, "fcntl(F_GETFL)"); + return VALKEY_ERR; + } + + if (blocking) + flags &= ~O_NONBLOCK; + else + flags |= O_NONBLOCK; + + if (fcntl(fd, F_SETFL, flags) == -1) { + valkeySetError(c, VALKEY_ERR_IO, "fcntl(F_SETFL)"); + return VALKEY_ERR; + } + + return 0; +} + +static int rdmaPostRecv(RdmaContext *ctx, struct rdma_cm_id *cm_id, valkeyRdmaCmd *cmd) { + struct ibv_sge sge; + size_t length = sizeof(valkeyRdmaCmd); + struct ibv_recv_wr recv_wr, *bad_wr; + + + sge.addr = (uint64_t)(uintptr_t)cmd; + sge.length = length; + sge.lkey = ctx->cmd_mr->lkey; + + recv_wr.wr_id = (uint64_t)cmd; + recv_wr.sg_list = &sge; + recv_wr.num_sge = 1; + recv_wr.next = NULL; + + if (ibv_post_recv(cm_id->qp, &recv_wr, &bad_wr)) { + return VALKEY_ERR; + } + + return VALKEY_OK; +} + +static void rdmaDestroyIoBuf(RdmaContext *ctx) +{ + if (ctx->recv_mr) { + ibv_dereg_mr(ctx->recv_mr); + ctx->recv_mr = NULL; + } + + vk_free(ctx->recv_buf); + ctx->recv_buf = NULL; + + if (ctx->send_mr) { + ibv_dereg_mr(ctx->send_mr); + ctx->send_mr = NULL; + } + + vk_free(ctx->send_buf); + ctx->send_buf = NULL; + + if (ctx->cmd_mr) { + ibv_dereg_mr(ctx->cmd_mr); + ctx->cmd_mr = NULL; + } + + vk_free(ctx->cmd_buf); + ctx->cmd_buf = NULL; +} + +static int rdmaSetupIoBuf(valkeyContext *c, RdmaContext *ctx, struct rdma_cm_id *cm_id) { + int access = IBV_ACCESS_LOCAL_WRITE; + size_t length = sizeof(valkeyRdmaCmd) * VALKEY_RDMA_MAX_WQE * 2; + valkeyRdmaCmd *cmd; + int i; + + /* setup CMD buf & MR */ + ctx->cmd_buf = vk_calloc(length, 1); + ctx->cmd_mr = ibv_reg_mr(ctx->pd, ctx->cmd_buf, length, access); + if (!ctx->cmd_mr) { + valkeySetError(c, VALKEY_ERR_OTHER, "RDMA: reg recv mr failed"); + goto destroy_iobuf; + } + + for (i = 0; i < VALKEY_RDMA_MAX_WQE; i++) { + cmd = ctx->cmd_buf + i; + + if (rdmaPostRecv(ctx, cm_id, cmd) == VALKEY_ERR) { + valkeySetError(c, VALKEY_ERR_OTHER, "RDMA: post recv failed"); + goto destroy_iobuf; + } + } + + for (i = VALKEY_RDMA_MAX_WQE; i < VALKEY_RDMA_MAX_WQE * 2; i++) { + cmd = ctx->cmd_buf + i; + cmd->keepalive.opcode = VALKEY_RDMA_INVALID_OPCODE; + } + + /* setup recv buf & MR */ + access = IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_READ | IBV_ACCESS_REMOTE_WRITE; + length = VALKEY_RDMA_DEFAULT_RX_LEN; + ctx->recv_buf = vk_calloc(length, 1); + ctx->recv_length = length; + ctx->recv_mr = ibv_reg_mr(ctx->pd, ctx->recv_buf, length, access); + if (!ctx->recv_mr) { + valkeySetError(c, VALKEY_ERR_OTHER, "RDMA: reg send mr failed"); + goto destroy_iobuf; + } + + return VALKEY_OK; + +destroy_iobuf: + rdmaDestroyIoBuf(ctx); + return VALKEY_ERR; +} + +static int rdmaAdjustSendbuf(valkeyContext *c, RdmaContext *ctx, unsigned int length) { + int access = IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_READ | IBV_ACCESS_REMOTE_WRITE; + + if (length == ctx->send_length) { + return VALKEY_OK; + } + + /* try to free old MR & buffer */ + if (ctx->send_length) { + ibv_dereg_mr(ctx->send_mr); + vk_free(ctx->send_buf); + ctx->send_length = 0; + } + + /* create a new buffer & MR */ + ctx->send_buf = vk_calloc(length, 1); + ctx->send_length = length; + ctx->send_mr = ibv_reg_mr(ctx->pd, ctx->send_buf, length, access); + if (!ctx->send_mr) { + valkeySetError(c, VALKEY_ERR_OTHER, "RDMA: reg send buf mr failed"); + vk_free(ctx->send_buf); + ctx->send_buf = NULL; + ctx->send_length = 0; + return VALKEY_ERR; + } + + return VALKEY_OK; +} + + +static int rdmaSendCommand(valkeyContext *c, struct rdma_cm_id *cm_id, valkeyRdmaCmd *cmd) { + RdmaContext *ctx = c->privctx; + struct ibv_send_wr send_wr, *bad_wr; + struct ibv_sge sge; + valkeyRdmaCmd *_cmd; + int i; + int ret; + + /* find an unused cmd buffer */ + for (i = VALKEY_RDMA_MAX_WQE; i < 2 * VALKEY_RDMA_MAX_WQE; i++) { + _cmd = ctx->cmd_buf + i; + if (_cmd->keepalive.opcode == VALKEY_RDMA_INVALID_OPCODE) { + break; + } + } + + if (i >= 2 * VALKEY_RDMA_MAX_WQE) { + valkeySetError(c, VALKEY_ERR_OTHER, "RDMA: no empty command buffers"); + return VALKEY_ERR; + } + + memcpy(_cmd, cmd, sizeof(valkeyRdmaCmd)); + sge.addr = (uint64_t)(uintptr_t)_cmd; + sge.length = sizeof(valkeyRdmaCmd); + sge.lkey = ctx->cmd_mr->lkey; + + send_wr.sg_list = &sge; + send_wr.num_sge = 1; + send_wr.wr_id = (uint64_t)_cmd; + send_wr.opcode = IBV_WR_SEND; + send_wr.send_flags = IBV_SEND_SIGNALED; + send_wr.next = NULL; + ret = ibv_post_send(cm_id->qp, &send_wr, &bad_wr); + if (ret) { + valkeySetError(c, VALKEY_ERR_OTHER, "RDMA: failed to send command buffers"); + return VALKEY_ERR; + } + + return VALKEY_OK; +} + +static int connRdmaRegisterRx(valkeyContext *c, struct rdma_cm_id *cm_id) { + RdmaContext *ctx = c->privctx; + valkeyRdmaCmd cmd = { 0 }; + + cmd.memory.opcode = htons(RegisterXferMemory); + cmd.memory.addr = htobe64((uint64_t)ctx->recv_buf); + cmd.memory.length = htonl(ctx->recv_length); + cmd.memory.key = htonl(ctx->recv_mr->rkey); + + ctx->rx_offset = 0; + ctx->recv_offset = 0; + + return rdmaSendCommand(c, cm_id, &cmd); +} + +static int connRdmaHandleRecv(valkeyContext *c, RdmaContext *ctx, struct rdma_cm_id *cm_id, valkeyRdmaCmd *cmd, uint32_t byte_len) { + if (byte_len != sizeof(valkeyRdmaCmd)) { + valkeySetError(c, VALKEY_ERR_OTHER, "RDMA: FATAL error, recv corrupted cmd"); + return VALKEY_ERR; + } + + switch (ntohs(cmd->keepalive.opcode)) { + case RegisterXferMemory: + ctx->tx_addr = (char *)be64toh(cmd->memory.addr); + ctx->tx_length = ntohl(cmd->memory.length); + ctx->tx_key = ntohl(cmd->memory.key); + ctx->tx_offset = 0; + rdmaAdjustSendbuf(c, ctx, ctx->tx_length); + break; + + case Keepalive: + break; + + default: + valkeySetError(c, VALKEY_ERR_OTHER, "RDMA: FATAL error, unknown cmd"); + return VALKEY_ERR; + } + + return rdmaPostRecv(ctx, cm_id, cmd); +} + +static int connRdmaHandleRecvImm(RdmaContext *ctx, struct rdma_cm_id *cm_id, valkeyRdmaCmd *cmd, uint32_t byte_len) { + assert(byte_len + ctx->rx_offset <= ctx->recv_length); + ctx->rx_offset += byte_len; + + return rdmaPostRecv(ctx, cm_id, cmd); +} + +static int connRdmaHandleSend(valkeyRdmaCmd *cmd) { + /* mark this cmd has already sent */ + memset(cmd, 0x00, sizeof(*cmd)); + cmd->keepalive.opcode = VALKEY_RDMA_INVALID_OPCODE; + + return VALKEY_OK; +} + +static int connRdmaHandleWrite(VALKEY_UNUSED RdmaContext *ctx, uint32_t VALKEY_UNUSED byte_len) { + + return VALKEY_OK; +} + +static int connRdmaHandleCq(valkeyContext *c) { + RdmaContext *ctx = c->privctx; + struct rdma_cm_id *cm_id = ctx->cm_id; + struct ibv_cq *ev_cq = NULL; + void *ev_ctx = NULL; + struct ibv_wc wc = {0}; + valkeyRdmaCmd *cmd; + int ret; + + if (ibv_get_cq_event(ctx->comp_channel, &ev_cq, &ev_ctx) < 0) { + if (errno != EAGAIN) { + valkeySetError(c, VALKEY_ERR_OTHER, "RDMA: get cq event failed"); + return VALKEY_ERR; + } + } else if (ibv_req_notify_cq(ev_cq, 0)) { + valkeySetError(c, VALKEY_ERR_OTHER, "RDMA: notify cq failed"); + return VALKEY_ERR; + } + +pollcq: + ret = ibv_poll_cq(ctx->cq, 1, &wc); + if (ret < 0) { + valkeySetError(c, VALKEY_ERR_OTHER, "RDMA: poll cq failed"); + return VALKEY_ERR; + } else if (ret == 0) { + return VALKEY_OK; + } + + ibv_ack_cq_events(ctx->cq, 1); + + if (wc.status != IBV_WC_SUCCESS) { + valkeySetError(c, VALKEY_ERR_OTHER, "RDMA: send/recv failed"); + return VALKEY_ERR; + } + + switch (wc.opcode) { + case IBV_WC_RECV: + cmd = (valkeyRdmaCmd *)(uintptr_t)wc.wr_id; + if (connRdmaHandleRecv(c, ctx, cm_id, cmd, wc.byte_len) == VALKEY_ERR) { + return VALKEY_ERR; + } + + break; + + case IBV_WC_RECV_RDMA_WITH_IMM: + cmd = (valkeyRdmaCmd *)(uintptr_t)wc.wr_id; + if (connRdmaHandleRecvImm(ctx, cm_id, cmd, ntohl(wc.imm_data)) == VALKEY_ERR) { + return VALKEY_ERR; + } + + break; + case IBV_WC_RDMA_WRITE: + if (connRdmaHandleWrite(ctx, wc.byte_len) == VALKEY_ERR) { + return VALKEY_ERR; + } + + break; + case IBV_WC_SEND: + cmd = (valkeyRdmaCmd *)(uintptr_t)wc.wr_id; + if (connRdmaHandleSend(cmd) == VALKEY_ERR) { + return VALKEY_ERR; + } + + break; + default: + valkeySetError(c, VALKEY_ERR_OTHER, "RDMA: unexpected opcode"); + return VALKEY_ERR; + } + + goto pollcq; + + return VALKEY_OK; +} + +/* There are two FD(s) in use: + * - fd of CM channel: handle CM event. Return error on Disconnected. + * - fd of completion channel: handle CQ event. + * Return OK on CQ event ready, then CQ event should be handled outside. + */ +static int valkeyRdmaPollCqCm(valkeyContext *c, long timed) { +#define VALKEY_RDMA_POLLFD_CM 0 +#define VALKEY_RDMA_POLLFD_CQ 1 +#define VALKEY_RDMA_POLLFD_MAX 2 + struct pollfd pfd[VALKEY_RDMA_POLLFD_MAX]; + RdmaContext *ctx = c->privctx; + long now = vk_msec_now(); + int ret; + + if (now >= timed) { + valkeySetError(c, VALKEY_ERR_IO, "RDMA: IO timeout"); + return VALKEY_ERR; + } + + /* pfd[0] for CM event */ + pfd[VALKEY_RDMA_POLLFD_CM].fd = ctx->cm_channel->fd; + pfd[VALKEY_RDMA_POLLFD_CM].events = POLLIN; + pfd[VALKEY_RDMA_POLLFD_CM].revents = 0; + + /* pfd[1] for CQ event */ + pfd[VALKEY_RDMA_POLLFD_CQ].fd = ctx->comp_channel->fd; + pfd[VALKEY_RDMA_POLLFD_CQ].events = POLLIN; + pfd[VALKEY_RDMA_POLLFD_CQ].revents = 0; + ret = poll(pfd, VALKEY_RDMA_POLLFD_MAX, timed - now); + if (ret < 0) { + valkeySetError(c, VALKEY_ERR_IO, "RDMA: Poll CQ/CM failed"); + return VALKEY_ERR; + } else if (ret == 0) { + valkeySetError(c, VALKEY_ERR_IO, "RDMA: IO timeout"); + return VALKEY_ERR; + } + + if (pfd[VALKEY_RDMA_POLLFD_CM].revents & POLLIN) { + valkeyRdmaCM(c, 0); + if (!(c->flags & VALKEY_CONNECTED)) { + valkeySetError(c, VALKEY_ERR_EOF, "Server closed the connection"); + return VALKEY_ERR; + } + } + + return VALKEY_OK; +} + +static ssize_t valkeyRdmaRead(valkeyContext *c, char *buf, size_t bufcap) { + RdmaContext *ctx = c->privctx; + struct rdma_cm_id *cm_id = ctx->cm_id; + long timed, end; + uint32_t toread, remained; + + if (valkeyCommandTimeoutMsec(c, &timed)) { + return VALKEY_ERR; + } + + end = vk_msec_now() + timed; + +pollcq: + /* try to poll a CQ first */ + if (connRdmaHandleCq(c) == VALKEY_ERR) { + return VALKEY_ERR; + } + + if (ctx->recv_offset < ctx->rx_offset) { + remained = ctx->rx_offset - ctx->recv_offset; + toread = valkeyMin(remained, bufcap); + + memcpy(buf, ctx->recv_buf + ctx->recv_offset, toread); + ctx->recv_offset += toread; + + if (ctx->recv_offset == ctx->recv_length) { + connRdmaRegisterRx(c, cm_id); + } + + return toread; + } + + if (valkeyRdmaPollCqCm(c, end) == VALKEY_OK) { + goto pollcq; + } else { + return VALKEY_ERR; + } +} + + +static size_t connRdmaSend(RdmaContext *ctx, struct rdma_cm_id *cm_id, const void *data, size_t data_len) { + struct ibv_send_wr send_wr, *bad_wr; + struct ibv_sge sge; + uint32_t off = ctx->tx_offset; + char *addr = ctx->send_buf + off; + char *remote_addr = ctx->tx_addr + off; + int ret; + + assert(data_len <= ctx->tx_length); + memcpy(addr, data, data_len); + + sge.addr = (uint64_t)(uintptr_t)addr; + sge.lkey = ctx->send_mr->lkey; + sge.length = data_len; + + send_wr.sg_list = &sge; + send_wr.num_sge = 1; + send_wr.opcode = IBV_WR_RDMA_WRITE_WITH_IMM; + send_wr.send_flags = (++ctx->send_ops % VALKEY_RDMA_MAX_WQE) ? 0 : IBV_SEND_SIGNALED; + send_wr.imm_data = htonl(data_len); + send_wr.wr.rdma.remote_addr = (uint64_t)(uintptr_t)remote_addr; + send_wr.wr.rdma.rkey = ctx->tx_key; + send_wr.next = NULL; + ret = ibv_post_send(cm_id->qp, &send_wr, &bad_wr); + if (ret) { + return VALKEY_ERR; + } + + ctx->tx_offset += data_len; + + return data_len; +} + +static ssize_t valkeyRdmaWrite(valkeyContext *c) { + RdmaContext *ctx = c->privctx; + struct rdma_cm_id *cm_id = ctx->cm_id; + size_t data_len = sdslen(c->obuf); + long timed, end; + uint32_t towrite, wrote = 0; + size_t ret; + + if (valkeyCommandTimeoutMsec(c, &timed)) { + return VALKEY_ERR; + } + + end = vk_msec_now() + timed; + +pollcq: + if (connRdmaHandleCq(c) == VALKEY_ERR) { + return VALKEY_ERR; + } + + assert(ctx->tx_offset <= ctx->tx_length); + if (ctx->tx_offset == ctx->tx_length) { + /* wait a new TX buffer */ + goto waitcq; + } + + towrite = valkeyMin(ctx->tx_length - ctx->tx_offset, data_len - wrote); + ret = connRdmaSend(ctx, cm_id, c->obuf + wrote, towrite); + if (ret == (size_t)VALKEY_ERR) { + return VALKEY_ERR; + } + + wrote += ret; + if (wrote == data_len) { + return data_len; + } + +waitcq: + if (valkeyRdmaPollCqCm(c, end) == VALKEY_OK) { + goto pollcq; + } else { + return VALKEY_ERR; + } +} + +/* RDMA has no POLLOUT event supported, so it could't work well with valkey async mechanism */ +static void valkeyRdmaAsyncRead(VALKEY_UNUSED valkeyAsyncContext *ac) { + assert("valkey async mechanism can't work with RDMA" == NULL); +} + +static void valkeyRdmaAsyncWrite(VALKEY_UNUSED valkeyAsyncContext *ac) { + assert("valkey async mechanism can't work with RDMA" == NULL); +} + +static void valkeyRdmaClose(valkeyContext *c) { + RdmaContext *ctx = c->privctx; + struct rdma_cm_id *cm_id; + + if (!ctx) { + return; /* connect failed? */ + } + + cm_id = ctx->cm_id; + connRdmaHandleCq(c); + rdma_disconnect(cm_id); + ibv_destroy_cq(ctx->cq); + rdmaDestroyIoBuf(ctx); + ibv_destroy_qp(cm_id->qp); + ibv_destroy_comp_channel(ctx->comp_channel); + ibv_dealloc_pd(ctx->pd); + rdma_destroy_id(cm_id); + + rdma_destroy_event_channel(ctx->cm_channel); +} + +static void valkeyRdmaFree(void *privctx) { + if (!privctx) + return; + + vk_free(privctx); +} + +static int valkeyRdmaConnect(valkeyContext *c, struct rdma_cm_id *cm_id) { + RdmaContext *ctx = c->privctx; + struct ibv_comp_channel *comp_channel = NULL; + struct ibv_cq *cq = NULL; + struct ibv_pd *pd = NULL; + struct ibv_qp_init_attr init_attr = {0}; + struct rdma_conn_param conn_param = {0}; + + pd = ibv_alloc_pd(cm_id->verbs); + if (!pd) { + valkeySetError(c, VALKEY_ERR_OTHER, "RDMA: alloc pd failed"); + goto error; + } + + comp_channel = ibv_create_comp_channel(cm_id->verbs); + if (!comp_channel) { + valkeySetError(c, VALKEY_ERR_OTHER, "RDMA: alloc comp channel failed"); + goto error; + } + + if (valkeyRdmaSetFdBlocking(c, comp_channel->fd, 0) != VALKEY_OK) { + valkeySetError(c, VALKEY_ERR_OTHER, "RDMA: set recv comp channel fd non-block failed"); + goto error; + } + + cq = ibv_create_cq(cm_id->verbs, VALKEY_RDMA_MAX_WQE * 2, ctx, comp_channel, 0); + if (!cq) { + valkeySetError(c, VALKEY_ERR_OTHER, "RDMA: create send cq failed"); + goto error; + } + + if (ibv_req_notify_cq(cq, 0)) { + valkeySetError(c, VALKEY_ERR_OTHER, "RDMA: notify send cq failed"); + goto error; + } + + /* create qp with attr */ + init_attr.cap.max_send_wr = VALKEY_RDMA_MAX_WQE; + init_attr.cap.max_recv_wr = VALKEY_RDMA_MAX_WQE; + init_attr.cap.max_send_sge = 1; + init_attr.cap.max_recv_sge = 1; + init_attr.qp_type = IBV_QPT_RC; + init_attr.send_cq = cq; + init_attr.recv_cq = cq; + if (rdma_create_qp(cm_id, pd, &init_attr)) { + valkeySetError(c, VALKEY_ERR_OTHER, "RDMA: create qp failed"); + goto error; + } + + ctx->cm_id = cm_id; + ctx->comp_channel = comp_channel; + ctx->cq = cq; + ctx->pd = pd; + + if (rdmaSetupIoBuf(c, ctx, cm_id) != VALKEY_OK) + goto free_qp; + + /* rdma connect with param */ + conn_param.responder_resources = 1; + conn_param.initiator_depth = 1; + conn_param.retry_count = 7; + conn_param.rnr_retry_count = 7; + if (rdma_connect(cm_id, &conn_param)) { + valkeySetError(c, VALKEY_ERR_OTHER, "RDMA: connect failed"); + goto destroy_iobuf; + } + + return VALKEY_OK; + +destroy_iobuf: + rdmaDestroyIoBuf(ctx); +free_qp: + ibv_destroy_qp(cm_id->qp); +error: + if (cq) + ibv_destroy_cq(cq); + if (pd) + ibv_dealloc_pd(pd); + if (comp_channel) + ibv_destroy_comp_channel(comp_channel); + + return VALKEY_ERR; +} + +static int valkeyRdmaEstablished(valkeyContext *c, struct rdma_cm_id *cm_id) { + RdmaContext *ctx = c->privctx; + + /* it's time to tell redis we have already connected */ + c->flags |= VALKEY_CONNECTED; + c->funcs = &valkeyContextRdmaFuncs; + c->fd = ctx->comp_channel->fd; + + return connRdmaRegisterRx(c, cm_id); +} + +static int valkeyRdmaCM(valkeyContext *c, long timeout) { + RdmaContext *ctx = c->privctx; + struct rdma_cm_event *event; + char errorstr[128]; + int ret = VALKEY_ERR; + + while (rdma_get_cm_event(ctx->cm_channel, &event) == 0) { + switch (event->event) { + case RDMA_CM_EVENT_ADDR_RESOLVED: + if (timeout < 0 || timeout > 100) + timeout = 100; /* at most 100ms to resolve route */ + ret = rdma_resolve_route(event->id, timeout); + if (ret) { + valkeySetError(c, VALKEY_ERR_OTHER, "RDMA: route resolve failed on"); + } + break; + case RDMA_CM_EVENT_ROUTE_RESOLVED: + ret = valkeyRdmaConnect(c, event->id); + break; + case RDMA_CM_EVENT_ESTABLISHED: + ret = valkeyRdmaEstablished(c, event->id); + break; + case RDMA_CM_EVENT_TIMEWAIT_EXIT: + ret = VALKEY_ERR; + valkeySetError(c, VALKEY_ERR_TIMEOUT, "RDMA: connect timeout"); + break; + case RDMA_CM_EVENT_ADDR_ERROR: + case RDMA_CM_EVENT_ROUTE_ERROR: + case RDMA_CM_EVENT_CONNECT_ERROR: + case RDMA_CM_EVENT_UNREACHABLE: + case RDMA_CM_EVENT_REJECTED: + case RDMA_CM_EVENT_DISCONNECTED: + c->flags &= ~VALKEY_CONNECTED; + break; + case RDMA_CM_EVENT_ADDR_CHANGE: + default: + snprintf(errorstr, sizeof(errorstr), "RDMA: connect failed - %s", rdma_event_str(event->event)); + valkeySetError(c, VALKEY_ERR_OTHER, errorstr); + ret = VALKEY_ERR; + break; + } + + rdma_ack_cm_event(event); + } + + return ret; +} + +static int valkeyRdmaWaitConn(valkeyContext *c, long timeout) { + struct pollfd pfd; + long now, end; + RdmaContext *ctx = c->privctx; + + assert (timeout >= 0); + end = vk_msec_now() + timeout; + + while (1) { + now = vk_msec_now(); + if (now >= end) { + break; + } + + pfd.fd = ctx->cm_channel->fd; + pfd.events = POLLIN; + pfd.revents = 0; + if (poll(&pfd, 1, end - now) < 0) { + return VALKEY_ERR; + } + + if (valkeyRdmaCM(c, end - now) == VALKEY_ERR) { + return VALKEY_ERR; + } + + if (c->flags & VALKEY_CONNECTED) { + return VALKEY_OK; + } + } + + return VALKEY_ERR; +} + +static int valkeyContextConnectRdma(valkeyContext *c, const valkeyOptions *options) { + const struct timeval *timeout = options->connect_timeout; + const char *addr = options->endpoint.tcp.ip; + int port = options->endpoint.tcp.port; + int ret; + char _port[6]; /* strlen("65535"); */ + struct addrinfo hints, *servinfo, *p; + long timeout_msec = -1; + struct rdma_event_channel *cm_channel = NULL; + struct rdma_cm_id *cm_id = NULL; + RdmaContext *ctx = NULL; + struct sockaddr_storage saddr; + long start = vk_msec_now(), timed; + + servinfo = NULL; + c->connection_type = VALKEY_CONN_RDMA; + c->tcp.port = port; + c->flags &= ~VALKEY_CONNECTED; + + if (port < 0 || port > UINT16_MAX) { + valkeySetError(c, VALKEY_ERR_OTHER, "RDMA: Port number must be between 0-65535"); + return VALKEY_ERR; + } + + if (c->tcp.host != addr) { + vk_free(c->tcp.host); + + c->tcp.host = vk_strdup(addr); + if (c->tcp.host == NULL) { + valkeySetError(c, VALKEY_ERR_OOM, "RDMA: Out of memory"); + return VALKEY_ERR; + } + } + + if (timeout) { + if (valkeyContextUpdateConnectTimeout(c, timeout) == VALKEY_ERR) { + return VALKEY_ERR; + } + } else { + vk_free(c->connect_timeout); + c->connect_timeout = NULL; + } + + if (valkeyConnectTimeoutMsec(c, &timeout_msec) != VALKEY_OK) { + return VALKEY_ERR; + } else if (timeout_msec == -1) { + timeout_msec = INT_MAX; + } + + snprintf(_port, 6, "%d", port); + memset(&hints, 0, sizeof(hints)); + hints.ai_family = AF_INET; + hints.ai_socktype = SOCK_STREAM; + + if ((ret = getaddrinfo(c->tcp.host, _port, &hints, &servinfo)) != 0) { + hints.ai_family = AF_INET6; + if ((ret = getaddrinfo(addr, _port, &hints, &servinfo)) != 0) { + valkeySetError(c, VALKEY_ERR_OTHER, gai_strerror(ret)); + return VALKEY_ERR; + } + } + + ctx = vk_calloc(sizeof(RdmaContext), 1); + if (!ctx) { + valkeySetError(c, VALKEY_ERR_OOM, "Out of memory"); + goto error; + } + + c->privctx = ctx; + + cm_channel = rdma_create_event_channel(); + if (!cm_channel) { + valkeySetError(c, VALKEY_ERR_OTHER, "RDMA: create event channel failed"); + goto error; + } + + ctx->cm_channel = cm_channel; + + if (rdma_create_id(cm_channel, &cm_id, (void *)ctx, RDMA_PS_TCP)) { + valkeySetError(c, VALKEY_ERR_OTHER, "RDMA: create id failed"); + return VALKEY_ERR; + } + ctx->cm_id = cm_id; + + if ((valkeyRdmaSetFdBlocking(c, cm_channel->fd, 0) != VALKEY_OK)) { + valkeySetError(c, VALKEY_ERR_OTHER, "RDMA: set cm channel fd non-block failed"); + goto free_rdma; + } + + for (p = servinfo; p != NULL; p = p->ai_next) { + if (p->ai_family == PF_INET) { + memcpy(&saddr, p->ai_addr, sizeof(struct sockaddr_in)); + ((struct sockaddr_in *)&saddr)->sin_port = htons(port); + } else if (p->ai_family == PF_INET6) { + memcpy(&saddr, p->ai_addr, sizeof(struct sockaddr_in6)); + ((struct sockaddr_in6 *)&saddr)->sin6_port = htons(port); + } else { + valkeySetError(c, VALKEY_ERR_PROTOCOL, "RDMA: unsupported family"); + goto free_rdma; + } + + /* resolve addr as most 100ms */ + if (rdma_resolve_addr(cm_id, NULL, (struct sockaddr *)&saddr, 100)) { + continue; + } + + timed = timeout_msec - (vk_msec_now() - start); + if ((valkeyRdmaWaitConn(c, timed) == VALKEY_OK) && (c->flags & VALKEY_CONNECTED)) { + ret = VALKEY_OK; + goto end; + } + } + + if ((!c->err) && (p == NULL)) { + valkeySetError(c, VALKEY_ERR_OTHER, "RDMA: resolve failed"); + } + +free_rdma: + if (cm_id) { + rdma_destroy_id(cm_id); + } + if (cm_channel) { + rdma_destroy_event_channel(cm_channel); + } + +error: + ret = VALKEY_ERR; + if (ctx) { + vk_free(ctx); + } + +end: + if(servinfo) { + freeaddrinfo(servinfo); + } + + return ret; +} + +/* tv has already been updated into @c successfully, do nothing here */ +static int valkeyRdmaSetTimeout(VALKEY_UNUSED valkeyContext *c, VALKEY_UNUSED const struct timeval tv) { + return VALKEY_OK; +} + +static valkeyContextFuncs valkeyContextRdmaFuncs = { + .connect = valkeyContextConnectRdma, + .close = valkeyRdmaClose, + .free_privctx = valkeyRdmaFree, + .async_read = valkeyRdmaAsyncRead, + .async_write = valkeyRdmaAsyncWrite, + .read = valkeyRdmaRead, + .write = valkeyRdmaWrite, + .set_timeout = valkeyRdmaSetTimeout +}; + +int valkeyInitiateRdma(void) { + valkeyContextRegisterFuncs(&valkeyContextRdmaFuncs, VALKEY_CONN_RDMA); + + return VALKEY_OK; +} + +#else /* __linux__ */ + +#error "BUILD ERROR: RDMA is only supported on linux" + +#endif /* __linux__ */ diff --git a/src/ssl.c b/src/ssl.c index 31f82431..904c8f41 100644 --- a/src/ssl.c +++ b/src/ssl.c @@ -609,11 +609,13 @@ static void valkeySSLAsyncWrite(valkeyAsyncContext *ac) { } static valkeyContextFuncs valkeyContextSSLFuncs = { + .connect = valkeyContextConnectTcp, .close = valkeyNetClose, .free_privctx = valkeySSLFree, .async_read = valkeySSLAsyncRead, .async_write = valkeySSLAsyncWrite, .read = valkeySSLRead, - .write = valkeySSLWrite + .write = valkeySSLWrite, + .set_timeout = valkeyTcpSetTimeout }; diff --git a/src/valkey.c b/src/valkey.c index 0eb31dcf..2119bb5c 100644 --- a/src/valkey.c +++ b/src/valkey.c @@ -42,18 +42,8 @@ #include "valkey_private.h" #include "net.h" #include "sds.h" -#include "async.h" #include "win32.h" -static valkeyContextFuncs valkeyContextDefaultFuncs = { - .close = valkeyNetClose, - .free_privctx = NULL, - .async_read = valkeyAsyncRead, - .async_write = valkeyAsyncWrite, - .read = valkeyNetRead, - .write = valkeyNetWrite -}; - static valkeyReply *createReplyObject(int type); static void *createStringObject(const valkeyReadTask *task, char *str, size_t len); static void *createArrayObject(const valkeyReadTask *task, size_t elements); @@ -718,8 +708,6 @@ static valkeyContext *valkeyContextInit(void) { if (c == NULL) return NULL; - c->funcs = &valkeyContextDefaultFuncs; - c->obuf = sdsempty(); c->reader = valkeyReaderCreate(); c->fd = VALKEY_INVALID_FD; @@ -767,18 +755,20 @@ valkeyFD valkeyFreeKeepFd(valkeyContext *c) { } int valkeyReconnect(valkeyContext *c) { + valkeyOptions options = { .connect_timeout = c->connect_timeout }; + c->err = 0; memset(c->errstr, '\0', strlen(c->errstr)); + if (c->funcs && c->funcs->close) { + c->funcs->close(c); + } + if (c->privctx && c->funcs->free_privctx) { c->funcs->free_privctx(c->privctx); c->privctx = NULL; } - if (c->funcs && c->funcs->close) { - c->funcs->close(c); - } - sdsfree(c->obuf); valkeyReaderFree(c->reader); @@ -790,28 +780,43 @@ int valkeyReconnect(valkeyContext *c) { return VALKEY_ERR; } - int ret = VALKEY_ERR; - if (c->connection_type == VALKEY_CONN_TCP) { - ret = valkeyContextConnectBindTcp(c, c->tcp.host, c->tcp.port, - c->connect_timeout, c->tcp.source_addr); - } else if (c->connection_type == VALKEY_CONN_UNIX) { - ret = valkeyContextConnectUnix(c, c->unix_sock.path, c->connect_timeout); - } else { + switch (c->connection_type) { + case VALKEY_CONN_TCP: + options.endpoint.tcp.source_addr = c->tcp.source_addr; + /* FALLTHRU */ + case VALKEY_CONN_RDMA: + options.endpoint.tcp.ip = c->tcp.host; + options.endpoint.tcp.port = c->tcp.port; + break; + case VALKEY_CONN_UNIX: + options.endpoint.unix_socket = c->unix_sock.path; + break; + default: /* Something bad happened here and shouldn't have. There isn't enough information in the context to reconnect. */ valkeySetError(c,VALKEY_ERR_OTHER,"Not enough information to reconnect"); - ret = VALKEY_ERR; + return VALKEY_ERR; + } + + if (c->funcs->connect(c, &options) != VALKEY_OK) { + return VALKEY_ERR; } if (c->command_timeout != NULL && (c->flags & VALKEY_BLOCK) && c->fd != VALKEY_INVALID_FD) { - valkeyContextSetTimeout(c, *c->command_timeout); + c->funcs->set_timeout(c, *c->command_timeout); } - return ret; + return VALKEY_OK; } valkeyContext *valkeyConnectWithOptions(const valkeyOptions *options) { - valkeyContext *c = valkeyContextInit(); + valkeyContext *c; + + if (options->type >= VALKEY_CONN_MAX) { + return NULL; + } + + c = valkeyContextInit(); if (c == NULL) { return NULL; } @@ -850,25 +855,13 @@ valkeyContext *valkeyConnectWithOptions(const valkeyOptions *options) { return c; } - if (options->type == VALKEY_CONN_TCP) { - valkeyContextConnectBindTcp(c, options->endpoint.tcp.ip, - options->endpoint.tcp.port, options->connect_timeout, - options->endpoint.tcp.source_addr); - } else if (options->type == VALKEY_CONN_UNIX) { - valkeyContextConnectUnix(c, options->endpoint.unix_socket, - options->connect_timeout); - } else if (options->type == VALKEY_CONN_USERFD) { - c->fd = options->endpoint.fd; - c->flags |= VALKEY_CONNECTED; - } else { - valkeyFree(c); - return NULL; - } - + c->connection_type = options->type; + valkeyContextSetFuncs(c); + c->funcs->connect(c, options); if (c->err == 0 && c->fd != VALKEY_INVALID_FD && options->command_timeout != NULL && (c->flags & VALKEY_BLOCK)) { - valkeyContextSetTimeout(c, *options->command_timeout); + c->funcs->set_timeout(c, *options->command_timeout); } return c; @@ -944,9 +937,15 @@ valkeyContext *valkeyConnectFd(valkeyFD fd) { /* Set read/write timeout on a blocking socket. */ int valkeySetTimeout(valkeyContext *c, const struct timeval tv) { - if (c->flags & VALKEY_BLOCK) - return valkeyContextSetTimeout(c,tv); - return VALKEY_ERR; + if (!(c->flags & VALKEY_BLOCK)) + return VALKEY_ERR; + + if (valkeyContextUpdateCommandTimeout(c, &tv) != VALKEY_OK) { + valkeySetError(c, VALKEY_ERR_OOM, "Out of memory"); + return VALKEY_ERR; + } + + return c->funcs->set_timeout(c,tv); } int valkeyEnableKeepAliveWithInterval(valkeyContext *c, int interval) { diff --git a/src/valkey_private.h b/src/valkey_private.h index 078c5dea..cd493d83 100644 --- a/src/valkey_private.h +++ b/src/valkey_private.h @@ -122,4 +122,11 @@ static inline int valkeyContextUpdateCommandTimeout(valkeyContext *c, return VALKEY_OK; } +int valkeyContextRegisterFuncs(valkeyContextFuncs *funcs, enum valkeyConnectionType type); +void valkeyContextRegisterTcpFuncs(void); +void valkeyContextRegisterUnixFuncs(void); +void valkeyContextRegisterUserfdFuncs(void); + +void valkeyContextSetFuncs(valkeyContext *c); + #endif /* VALKEY_VK_PRIVATE_H */ diff --git a/src/vkutil.h b/src/vkutil.h index a2700dc3..fdaafe32 100644 --- a/src/vkutil.h +++ b/src/vkutil.h @@ -52,4 +52,8 @@ static inline int64_t vk_msec_now(void) { uint16_t crc16(const char *buf, int len); +static inline int valkeyMin(long long a, long long b) { + return (a < b) ? a : b; +} + #endif /* VALKEY_VKUTIL_H */ diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 8a747a40..64768035 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -80,6 +80,11 @@ endif() if(SSL_LIBRARY) set_property(TEST client_test PROPERTY ENVIRONMENT "TEST_SSL=1") endif() +if(ENABLE_RDMA) + target_compile_definitions(client_test PUBLIC VALKEY_TEST_RDMA=1) + target_link_libraries(client_test valkey_rdma) + set_property(TEST client_test PROPERTY ENVIRONMENT "TEST_RDMA=1") +endif() # Add cluster tests if we have libevent if (LIBEVENT_LIBRARY) diff --git a/tests/client_test.c b/tests/client_test.c index 1a69dd06..56788064 100644 --- a/tests/client_test.c +++ b/tests/client_test.c @@ -25,6 +25,9 @@ #ifdef VALKEY_TEST_SSL #include "valkey_ssl.h" #endif +#ifdef VALKEY_TEST_RDMA +#include "valkey_rdma.h" +#endif #ifdef VALKEY_TEST_ASYNC #include "adapters/libevent.h" #include @@ -34,7 +37,8 @@ enum connection_type { CONN_TCP, CONN_UNIX, CONN_FD, - CONN_SSL + CONN_SSL, + CONN_RDMA }; struct config { @@ -57,6 +61,11 @@ struct config { const char *cert; const char *key; } ssl; + + struct { + const char *host; + /* int port; use the same port as TCP */ + } rdma; }; struct privdata { @@ -234,6 +243,12 @@ static valkeyContext *do_connect(struct config config) { c = valkeyConnect(config.ssl.host, config.ssl.port); } else if (config.type == CONN_UNIX) { c = valkeyConnectUnix(config.unix_sock.path); +#ifdef VALKEY_TEST_RDMA + } else if (config.type == CONN_RDMA) { + valkeyOptions options = {0}; + VALKEY_OPTIONS_SET_RDMA(&options, config.rdma.host, config.tcp.port); + c = valkeyConnectWithOptions(&options); +#endif } else if (config.type == CONN_FD) { /* Create a dummy connection just to get an fd to inherit */ valkeyContext *dummy_ctx = valkeyConnectUnix(config.unix_sock.path); @@ -1424,6 +1439,13 @@ static void test_invalid_timeout_errors(struct config config) { c = valkeyConnectWithTimeout(config.tcp.host, config.tcp.port, config.connect_timeout); } else if(config.type == CONN_UNIX) { c = valkeyConnectUnixWithTimeout(config.unix_sock.path, config.connect_timeout); +#ifdef VALKEY_TEST_RDMA + } else if(config.type == CONN_RDMA) { + valkeyOptions options = {0}; + VALKEY_OPTIONS_SET_RDMA(&options, config.tcp.host, config.tcp.port); + options.connect_timeout = &config.connect_timeout; + c = valkeyConnectWithOptions(&options); +#endif } else { valkeyTestPanic("Unknown connection type!"); } @@ -1440,6 +1462,13 @@ static void test_invalid_timeout_errors(struct config config) { c = valkeyConnectWithTimeout(config.tcp.host, config.tcp.port, config.connect_timeout); } else if(config.type == CONN_UNIX) { c = valkeyConnectUnixWithTimeout(config.unix_sock.path, config.connect_timeout); +#ifdef VALKEY_TEST_RDMA + } else if(config.type == CONN_RDMA) { + valkeyOptions options = {0}; + VALKEY_OPTIONS_SET_RDMA(&options, config.tcp.host, config.tcp.port); + options.connect_timeout = &config.connect_timeout; + c = valkeyConnectWithOptions(&options); +#endif } else { valkeyTestPanic("Unknown connection type!"); } @@ -2268,6 +2297,11 @@ int main(int argc, char **argv) { } else if (argc >= 2 && !strcmp(argv[0],"--ssl-key")) { argv++; argc--; cfg.ssl.key = argv[0]; +#endif +#ifdef VALKEY_TEST_RDMA + } else if (argc >= 1 && !strcmp(argv[0],"--rdma-addr")) { + argv++; argc--; + cfg.rdma.host = argv[0]; #endif } else { fprintf(stderr, "Invalid argument: %s\n", argv[0]); @@ -2340,6 +2374,22 @@ int main(int argc, char **argv) { } #endif +#ifdef VALKEY_TEST_RDMA + if (cfg.rdma.host) { + + valkeyInitiateRdma(); + printf("\nTesting against RDMA connection (%s:%d):\n", cfg.rdma.host, cfg.tcp.port); + cfg.type = CONN_RDMA; + + test_blocking_connection(cfg); + test_blocking_connection_timeouts(cfg); + test_blocking_io_errors(cfg); + test_invalid_timeout_errors(cfg); + test_append_formatted_commands(cfg); + if (throughput) test_throughput(cfg); + } +#endif + #ifdef VALKEY_TEST_ASYNC cfg.type = CONN_TCP; printf("\nTesting asynchronous API against TCP connection (%s:%d):\n", cfg.tcp.host, cfg.tcp.port); diff --git a/tests/test.sh b/tests/test.sh index 8620b78b..e5021871 100755 --- a/tests/test.sh +++ b/tests/test.sh @@ -16,6 +16,8 @@ ENABLE_DEBUG_CMD= SSL_TEST_ARGS= SKIPS_ARG=${SKIPS_ARG:-} VALKEY_DOCKER=${VALKEY_DOCKER:-} +TEST_RDMA=${TEST_RDMA:-0} +RDMA_TEST_ARGS= check_executable "$VALKEY_SERVER" @@ -98,6 +100,15 @@ tls-key-file ${SSL_KEY} EOF fi +# if doing RDMA, add these +if [ "$TEST_RDMA" = "1" ]; then + cat >> ${CONF_FILE} <