Skip to content

Commit

Permalink
Merge pull request #506 from rabbitmq/candidate-rpc-fix
Browse files Browse the repository at this point in the history
Force pipelined rpcs for noop command.
  • Loading branch information
kjnilsson authored Feb 3, 2025
2 parents 77230dd + 143414b commit 2926d39
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 27 deletions.
46 changes: 28 additions & 18 deletions src/ra_server.erl
Original file line number Diff line number Diff line change
Expand Up @@ -605,7 +605,16 @@ handle_leader({command, Cmd}, #{cfg := #cfg{id = Self,
Effects = append_error_reply(Cmd, Reason, Effects0),
{leader, State, Effects};
{ok, Idx, Term, State0, Effects00} ->
{State, _, Effects0} = make_pipelined_rpc_effects(State0, Effects00),
%% if the command is a noop command we should force it to
%% be pipelined to all followers
Force = case Cmd of
{noop, _, _} ->
true;
_ ->
false
end,
{State, _, Effects0} =
make_pipelined_rpc_effects(State0, Effects00, Force),
% check if a reply is required.
Effects = after_log_append_reply(Cmd, Idx, Term, Effects0),
{leader, State, Effects}
Expand Down Expand Up @@ -934,11 +943,9 @@ handle_candidate(#request_vote_result{term = Term, vote_granted = true},
[LogId, Term, NewVotes]),
case required_quorum(Nodes) of
NewVotes ->
{State1, Effects} = make_all_rpcs(initialise_peers(State0)),
State = State1#{leader_id => Id},
PostElectionEffects = post_election_effects(State),
{leader, maps:without([votes], State),
PostElectionEffects ++ Effects};
State = initialise_peers(State0#{leader_id => Id}),
Effects = post_election_effects(State),
{leader, maps:without([votes], State), Effects};
_ ->
{candidate, State0#{votes => NewVotes}, []}
end;
Expand Down Expand Up @@ -1982,14 +1989,17 @@ evaluate_commit_index_follower(State, Effects) ->
%% when no leader is known
{follower, State, Effects}.

make_pipelined_rpc_effects(State, Effects) ->
make_pipelined_rpc_effects(State, Effects, false).

make_pipelined_rpc_effects(#{cfg := #cfg{id = Id,
max_append_entries_rpc_batch_size =
MaxBatchSize,
max_pipeline_count = MaxPipelineCount},
commit_index := CommitIndex,
log := Log,
cluster := Cluster} = State0,
Effects0) ->
Effects0, Force) ->
NextLogIdx = ra_log:next_index(Log),
%% TODO: refactor this please, why does make_rpc_effect need to take the
%% full state
Expand All @@ -2006,7 +2016,8 @@ make_pipelined_rpc_effects(#{cfg := #cfg{id = Id,
% check if the match index isn't too far behind the
% next index
NumInFlight = NextIdx - MatchIdx - 1,
case NumInFlight < MaxPipelineCount of
case NumInFlight < MaxPipelineCount orelse
Force of
true ->
%% use the last list of entries as a cache
%% for the next to potentially avoid additional reads
Expand Down Expand Up @@ -2686,7 +2697,8 @@ append_self(Self, Nodes) ->
initialise_peers(State = #{log := Log, cluster := Cluster0}) ->
NextIdx = ra_log:next_index(Log),
Cluster = maps:map(fun (_, Peer0) ->
Peer1 = maps:with([voter_status], Peer0),
Peer1 = maps:with([voter_status,
machine_version], Peer0),
Peer2 = Peer1#{next_index => NextIdx},
new_peer_with(Peer2)
end, Cluster0),
Expand Down Expand Up @@ -3461,10 +3473,9 @@ after_log_append_reply(Cmd, Idx, Term, Effects0) ->
Effects0
end.

post_election_effects(
#{cfg := #cfg{effective_machine_version = EffectiveMacVer,
machine = Mac,
system_config = SystemConfig}} = State) ->
post_election_effects(#{cfg := #cfg{effective_machine_version = EffectiveMacVer,
machine = Mac,
system_config = SystemConfig}} = State) ->
Peers = peers(State),
PeerIds = maps:keys(Peers),

Expand Down Expand Up @@ -3510,8 +3521,8 @@ info_rpc_effects(#{cfg := #cfg{id = Id}, cluster := Cluster} = State) ->
end, [], Cluster),
InfoRpcEffects.

info_rpc_effects_for_peer(
#{cluster := Cluster, current_term := CurTerm} = State, PeerId) ->
info_rpc_effects_for_peer(#{cluster := Cluster,
current_term := CurTerm} = State, PeerId) ->
%% We determine if we need to ask (for the fist time or again) the info
%% from a peer.
SendRpc = case Cluster of
Expand Down Expand Up @@ -3571,9 +3582,8 @@ ra_server_info(State, Keys) ->
end, Info0),
Info1.

handle_info_reply(
#{cluster := Cluster} = State,
#info_reply{from = PeerId, keys = Keys, info = Info}) ->
handle_info_reply(#{cluster := Cluster} = State,
#info_reply{from = PeerId, keys = Keys, info = Info}) ->
PeerState0 = maps:get(PeerId, Cluster),
PeerState1 = lists:foldl(fun(Key, PS) ->
case Info of
Expand Down
2 changes: 1 addition & 1 deletion src/ra_server_proc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1331,7 +1331,7 @@ handle_effects(RaftState, Effects0, EvtType, State0, Actions0) ->
end, {State0, Actions0}, Effects0),
{State, lists:reverse(Actions)}.

handle_effect(leader, {send_rpc, To, Rpc}, _,
handle_effect(_RaftState, {send_rpc, To, Rpc}, _,
#state{conf = Conf} = State0, Actions) ->
% fully qualified use only so that we can mock it for testing
% TODO: review / refactor to remove the mod call here
Expand Down
21 changes: 13 additions & 8 deletions test/ra_server_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1873,19 +1873,24 @@ candidate_election(_Config) ->
{leader, #{cluster := #{N2 := PeerState,
N3 := PeerState,
N4 := PeerState,
N5 := PeerState}},
N5 := PeerState}} = State2,
[
{next_event, cast, {command, {noop, _, EffectiveMacVer}}},
{next_event, cast, {command, {noop, _, EffectiveMacVer}} = Noop},
{send_rpc, N2, {info_rpc, _, _, _}},
{send_rpc, N3, {info_rpc, _, _, _}},
{send_rpc, N4, {info_rpc, _, _, _}},
{send_rpc, N5, {info_rpc, _, _, _}},
{send_rpc, N5, {info_rpc, _, _, _}}

{send_rpc, _, _},
{send_rpc, _, _},
{send_rpc, _, _},
{send_rpc, _, _}
]} = ra_server:handle_candidate(Reply, State1).
]} = ra_server:handle_candidate(Reply, State1),

{leader, _,
[
{send_rpc, N5, #append_entries_rpc{}},
{send_rpc, N4, _},
{send_rpc, N3, _},
{send_rpc, N2, _}
]} = ra_server:handle_leader(Noop, State2),
ok.

pre_vote_election(_Config) ->
Token = make_ref(),
Expand Down

0 comments on commit 2926d39

Please sign in to comment.