Skip to content

Commit

Permalink
Fix the wrong woff when execute WAIT / WAITAOF in script (valkey-io#776)
Browse files Browse the repository at this point in the history
When executing the script, the client passed in is a fake
client, and its woff is always 0.

This results in woff always being 0 when executing wait/waitaof
in the script, and the command returns a wrong number.

---------

Signed-off-by: Binbin <[email protected]>
  • Loading branch information
enjoy-binbin authored Jul 22, 2024
1 parent 6eb19cf commit 14e09e9
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 6 deletions.
24 changes: 19 additions & 5 deletions src/replication.c
Original file line number Diff line number Diff line change
Expand Up @@ -4209,6 +4209,18 @@ void replicationRequestAckFromReplicas(void) {
server.get_ack_from_replicas = 1;
}

/* This function return client woff. If the script is currently running,
* returns the actual client woff */
long long getClientWriteOffset(client *c) {
if (scriptIsRunning()) {
/* If a script is currently running, the client passed in is a fake
* client, and its woff is always 0. */
serverAssert(scriptGetClient() == c);
c = scriptGetCaller();
}
return c->woff;
}

/* Return the number of replicas that already acknowledged the specified
* replication offset. */
int replicationCountAcksByOffset(long long offset) {
Expand Down Expand Up @@ -4248,7 +4260,7 @@ int replicationCountAOFAcksByOffset(long long offset) {
void waitCommand(client *c) {
mstime_t timeout;
long numreplicas, ackreplicas;
long long offset = c->woff;
long long offset = getClientWriteOffset(c);

if (server.primary_host) {
addReplyError(
Expand All @@ -4262,7 +4274,7 @@ void waitCommand(client *c) {
if (getTimeoutFromObjectOrReply(c, c->argv[2], &timeout, UNIT_MILLISECONDS) != C_OK) return;

/* First try without blocking at all. */
ackreplicas = replicationCountAcksByOffset(c->woff);
ackreplicas = replicationCountAcksByOffset(offset);
if (ackreplicas >= numreplicas || c->flag.deny_blocking) {
addReplyLongLong(c, ackreplicas);
return;
Expand Down Expand Up @@ -4298,9 +4310,11 @@ void waitaofCommand(client *c) {
return;
}

long long offset = getClientWriteOffset(c);

/* First try without blocking at all. */
ackreplicas = replicationCountAOFAcksByOffset(c->woff);
acklocal = server.fsynced_reploff >= c->woff;
ackreplicas = replicationCountAOFAcksByOffset(offset);
acklocal = server.fsynced_reploff >= offset;
if ((ackreplicas >= numreplicas && acklocal >= numlocal) || c->flag.deny_blocking) {
addReplyArrayLen(c, 2);
addReplyLongLong(c, acklocal);
Expand All @@ -4310,7 +4324,7 @@ void waitaofCommand(client *c) {

/* Otherwise block the client and put it into our list of clients
* waiting for ack from replicas. */
blockClientForReplicaAck(c, timeout, c->woff, numreplicas, numlocal);
blockClientForReplicaAck(c, timeout, offset, numreplicas, numlocal);

/* Make sure that the server will send an ACK request to all the replicas
* before returning to the event loop. */
Expand Down
2 changes: 1 addition & 1 deletion tests/unit/scripting.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ start_server {tags {"scripting"}} {
} {0}

test {EVAL - Scripts do not block on waitaof} {
run_script {redis.call('incr', 'x') return redis.pcall('waitaof','0','1','0')} 0
run_script {return redis.pcall('waitaof','0','1','0')} 0
} {* 0}

test {EVAL - Scripts do not block on XREAD with BLOCK option} {
Expand Down
29 changes: 29 additions & 0 deletions tests/unit/wait.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,35 @@ start_server {} {
$rd close
$rd2 close
}

start_server {} {
test {Setup a new replica} {
r replicaof $master_host $master_port
wait_for_ofs_sync $master r
wait_for_ofs_sync $master $slave
}

test {WAIT in script will work} {
# Pause the old replica so it can not catch up the offset.
pause_process $slave_pid

# Primary set a new key and wait the new replica catch up the offset.
$master set foo bar
wait_for_ofs_sync $master r

# Wait for the new replica to report the acked offset to the primary.
# Because the old replica is paused, so the WAIT can only return 1.
# In an earlier version it returned 2, because the fake client's woff
# is always 0 so WAIT counted all the replicas.
wait_for_condition 50 100 {
[$master eval "return server.call('wait', '2', '0')" 0] eq 1
} else {
fail "WAIT in script does not work as expected."
}

resume_process $slave_pid
}
}
}}


Expand Down

0 comments on commit 14e09e9

Please sign in to comment.