Skip to content

Commit

Permalink
Merge branch 'unstable' into feature-commandlog
Browse files Browse the repository at this point in the history
  • Loading branch information
soloestoy committed Jan 20, 2025
2 parents c86bc80 + 3032ccd commit ac79536
Show file tree
Hide file tree
Showing 58 changed files with 1,932 additions and 1,255 deletions.
1 change: 1 addition & 0 deletions .github/workflows/clang-format.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
name: Clang Format Check

on:
push:
pull_request:
paths:
- 'src/**'
Expand Down
1 change: 1 addition & 0 deletions cmake/Modules/SourceFiles.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ set(VALKEY_SERVER_SRCS
${CMAKE_SOURCE_DIR}/src/script_lua.c
${CMAKE_SOURCE_DIR}/src/script.c
${CMAKE_SOURCE_DIR}/src/functions.c
${CMAKE_SOURCE_DIR}/src/scripting_engine.c
${CMAKE_SOURCE_DIR}/src/function_lua.c
${CMAKE_SOURCE_DIR}/src/commands.c
${CMAKE_SOURCE_DIR}/src/strl.c
Expand Down
4 changes: 2 additions & 2 deletions src/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,7 @@ else
endef
endif

# Determine install/uninstall Redis symlinks for compatibility when
# Determine install/uninstall Redis symlinks for compatibility when
# installing/uninstalling Valkey binaries (defaulting to `yes`)
USE_REDIS_SYMLINKS?=yes
ifeq ($(USE_REDIS_SYMLINKS),yes)
Expand Down Expand Up @@ -416,7 +416,7 @@ endif
ENGINE_NAME=valkey
SERVER_NAME=$(ENGINE_NAME)-server$(PROG_SUFFIX)
ENGINE_SENTINEL_NAME=$(ENGINE_NAME)-sentinel$(PROG_SUFFIX)
ENGINE_SERVER_OBJ=threads_mngr.o adlist.o quicklist.o ae.o anet.o dict.o hashtable.o kvstore.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o memory_prefetch.o io_threads.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o cluster_legacy.o cluster_slot_stats.o crc16.o endianconv.o commandlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crccombine.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o valkey-check-rdb.o valkey-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o allocator_defrag.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script_lua.o script.o functions.o function_lua.o commands.o strl.o connection.o unix.o logreqres.o rdma.o
ENGINE_SERVER_OBJ=threads_mngr.o adlist.o quicklist.o ae.o anet.o dict.o hashtable.o kvstore.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o memory_prefetch.o io_threads.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o cluster_legacy.o cluster_slot_stats.o crc16.o endianconv.o commandlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crccombine.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o valkey-check-rdb.o valkey-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o allocator_defrag.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script_lua.o script.o functions.o function_lua.o commands.o strl.o connection.o unix.o logreqres.o rdma.o scripting_engine.o
ENGINE_CLI_NAME=$(ENGINE_NAME)-cli$(PROG_SUFFIX)
ENGINE_CLI_OBJ=anet.o adlist.o dict.o valkey-cli.o zmalloc.o release.o ae.o serverassert.o crcspeed.o crccombine.o crc64.o siphash.o crc16.o monotonic.o cli_common.o mt19937-64.o strl.o cli_commands.o
ENGINE_BENCHMARK_NAME=$(ENGINE_NAME)-benchmark$(PROG_SUFFIX)
Expand Down
6 changes: 3 additions & 3 deletions src/aof.c
Original file line number Diff line number Diff line change
Expand Up @@ -1922,7 +1922,7 @@ int rewriteSortedSetObject(rio *r, robj *key, robj *o) {
/* Write either the key or the value of the currently selected item of a hash.
* The 'hi' argument passes a valid hash iterator.
* The 'what' filed specifies if to write a key or a value and can be
* either OBJ_HASH_KEY or OBJ_HASH_VALUE.
* either OBJ_HASH_FIELD or OBJ_HASH_VALUE.
*
* The function returns 0 on error, non-zero on success. */
static int rioWriteHashIteratorCursor(rio *r, hashTypeIterator *hi, int what) {
Expand All @@ -1936,7 +1936,7 @@ static int rioWriteHashIteratorCursor(rio *r, hashTypeIterator *hi, int what) {
return rioWriteBulkString(r, (char *)vstr, vlen);
else
return rioWriteBulkLongLong(r, vll);
} else if (hi->encoding == OBJ_ENCODING_HT) {
} else if (hi->encoding == OBJ_ENCODING_HASHTABLE) {
sds value = hashTypeCurrentFromHashTable(hi, what);
return rioWriteBulkString(r, value, sdslen(value));
}
Expand All @@ -1963,7 +1963,7 @@ int rewriteHashObject(rio *r, robj *key, robj *o) {
}
}

if (!rioWriteHashIteratorCursor(r, &hi, OBJ_HASH_KEY) || !rioWriteHashIteratorCursor(r, &hi, OBJ_HASH_VALUE)) {
if (!rioWriteHashIteratorCursor(r, &hi, OBJ_HASH_FIELD) || !rioWriteHashIteratorCursor(r, &hi, OBJ_HASH_VALUE)) {
hashTypeResetIterator(&hi);
return 0;
}
Expand Down
41 changes: 24 additions & 17 deletions src/cluster_legacy.c
Original file line number Diff line number Diff line change
Expand Up @@ -2936,6 +2936,10 @@ void clusterProcessPingExtensions(clusterMsg *hdr, clusterLink *link) {
if (n && n != myself && !(nodeIsReplica(myself) && myself->replicaof == n)) {
sds id = sdsnewlen(forgotten_node_ext->name, CLUSTER_NAMELEN);
dictEntry *de = dictAddOrFind(server.cluster->nodes_black_list, id);
if (dictGetKey(de) != id) {
/* The dict did not take ownership of the id string, so we need to free it. */
sdsfree(id);
}
uint64_t expire = server.unixtime + ntohu64(forgotten_node_ext->ttl);
dictSetUnsignedIntegerVal(de, expire);
clusterDelNode(n);
Expand Down Expand Up @@ -3278,8 +3282,8 @@ int clusterProcessPacket(clusterLink *link) {
/* Unable to retrieve the node's IP address from the connection. Without a
* valid IP, the node becomes unusable in the cluster. This failure might be
* due to the connection being closed. */
serverLog(LL_NOTICE, "Closing link even though we received a MEET packet on it, "
"because the connection has an error");
serverLog(LL_NOTICE, "Closing cluster link due to failure to retrieve IP from the connection, "
"possibly caused by a closed connection.");
freeClusterLink(link);
return 0;
}
Expand All @@ -3302,14 +3306,14 @@ int clusterProcessPacket(clusterLink *link) {
clusterAddNode(node);
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
} else {
/* A second MEET packet was received on an existing link during the handshake process.
* This happens when the other node detects no inbound link, and re-sends a MEET packet
* before this node can respond with a PING. This MEET is a no-op.
/* A second MEET packet was received on an existing link during the handshake
* process. This happens when the other node detects no inbound link, and
* re-sends a MEET packet before this node can respond with a PING.
* This MEET is a no-op.
*
* Note: Nodes in HANDSHAKE state are not fully "known" (random names), so the sender
* remains unidentified at this point. The MEET packet might be re-sent if the inbound
* connection is still unestablished by the next cron cycle.
*/
* Note: Nodes in HANDSHAKE state are not fully "known" (random names), so the
* sender remains unidentified at this point. The MEET packet might be re-sent
* if the inbound connection is still unestablished by the next cron cycle. */
debugServerAssert(link->inbound && nodeInHandshake(link->node));
}

Expand All @@ -3318,16 +3322,19 @@ int clusterProcessPacket(clusterLink *link) {
* of the message type. */
clusterProcessGossipSection(hdr, link);
} else if (sender->link && nodeExceedsHandshakeTimeout(sender, now)) {
/* The MEET packet is from a known node, after the handshake timeout, so the sender thinks that I do not
* know it.
* Free my outbound link to that node, triggering a reconnect and a PING over the new link.
* Once that node receives our PING, it should recognize the new connection as an inbound link from me.
* We should only free the outbound link if the node is known for more time than the handshake timeout,
* since during this time, the other side might still be trying to complete the handshake. */
/* The MEET packet is from a known node, after the handshake timeout, so the sender
* thinks that I do not know it.
* Free my outbound link to that node, triggering a reconnect and a PING over the
* new link.
* Once that node receives our PING, it should recognize the new connection as an
* inbound link from me. We should only free the outbound link if the node is known
* for more time than the handshake timeout, since during this time, the other side
* might still be trying to complete the handshake. */

/* We should always receive a MEET packet on an inbound link. */
serverAssert(link != sender->link);
serverLog(LL_NOTICE, "Freeing outbound link to node %.40s (%s) after receiving a MEET packet from this known node",
serverLog(LL_NOTICE, "Freeing outbound link to node %.40s (%s) after receiving a MEET packet "
"from this known node",
sender->name, sender->human_nodename);
freeClusterLink(sender->link);
}
Expand Down Expand Up @@ -3920,7 +3927,7 @@ void clusterSendMessage(clusterLink *link, clusterMsgSendBlock *msgblock) {
server.stat_cluster_links_memory += sizeof(listNode);

/* Populate sent messages stats. */
uint16_t type = ntohs(getMessageFromSendBlock(msgblock)->type);
uint16_t type = ntohs(getMessageFromSendBlock(msgblock)->type) & ~CLUSTERMSG_MODIFIER_MASK;
if (type < CLUSTERMSG_TYPE_COUNT) server.cluster->stats_bus_messages_sent[type]++;
}

Expand Down
Loading

0 comments on commit ac79536

Please sign in to comment.