Skip to content

Commit

Permalink
fix: don't query current/previous VDF session directly. Add to queue.
Browse files Browse the repository at this point in the history
When the node is unable to validate a block due to a gap in the vdf
steps it triggers a request to query the current and previous vdf
sessions from its vdf peers. This operation can be slow. Previously
the node would keep queueing up these requests and it could overrun
the process.

Now we set a flag and the session queries are handled during the next
`pull` iteration. This flag can be set dozens of times between pull
iterations and it will still only trigger a single set of requests.
  • Loading branch information
JamesPiechota committed Feb 23, 2025
1 parent bb4d23d commit 74a3670
Showing 1 changed file with 53 additions and 41 deletions.
94 changes: 53 additions & 41 deletions apps/arweave/src/ar_nonce_limiter_client.erl
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,11 @@

-record(state, {
remote_servers,
latest_session_keys = #{}
latest_session_keys = #{},
%% request_sessions is set to true when the node is unable to validate a block due
%% to a gap in its cached step numbers. When true, the node will query the full
%% session and previous session from a VDF server.
request_sessions = false
}).

-define(PULL_FREQUENCY_MS, 800).
Expand Down Expand Up @@ -82,11 +86,15 @@ handle_cast(pull, State) ->
session = #vdf_session{
step_number = SessionStepNumber } } = Update,
State2 = update_latest_session_key(Peer, SessionKey, State),
case ar_nonce_limiter:apply_external_update(Update, Peer) of
ok ->
ar_util:cast_after(?PULL_FREQUENCY_MS, ?MODULE, pull),
{noreply, State2};
#nonce_limiter_update_response{ session_found = false } ->
UpdateResponse =
ar_nonce_limiter:apply_external_update(Update, Peer),
RequestSessions = (
State2#state.request_sessions == true orelse
UpdateResponse#nonce_limiter_update_response.session_found == false
),

case RequestSessions of
true ->
case fetch_and_apply_session_and_previous_session(Peer) of
{error, _} ->
gen_server:cast(?MODULE, pull),
Expand All @@ -95,36 +103,42 @@ handle_cast(pull, State) ->
_ ->
ar_util:cast_after(?PULL_FREQUENCY_MS,
?MODULE, pull),
{noreply, State2}
{noreply, State2#state{ request_sessions = false }}
end;
#nonce_limiter_update_response{ step_number = StepNumber }
when StepNumber > SessionStepNumber ->
%% We are ahead of the server - may be, it is not
%% the fastest server in the list so try another one,
%% if there are more servers in the configuration
%% and they are not on timeout.
gen_server:cast(?MODULE, pull),
{noreply, State2#state{
remote_servers = RotatedServers }};
#nonce_limiter_update_response{ step_number = StepNumber }
when StepNumber == SessionStepNumber ->
%% We are in sync with the server. Re-try soon.
ar_util:cast_after(?NO_UPDATE_PULL_FREQUENCY_MS,
?MODULE, pull),
{noreply, State2};
_ ->
%% We have received a partial session, but there's a gap
%% in the step numbers, e.g., the update we received is at
%% step 100, but our last seen step was 90.
case fetch_and_apply_session(Peer) of
{error, _} ->
false ->
case UpdateResponse of
ok ->
ar_util:cast_after(?PULL_FREQUENCY_MS, ?MODULE, pull),
{noreply, State2};
#nonce_limiter_update_response{ step_number = StepNumber }
when StepNumber > SessionStepNumber ->
%% We are ahead of the server - may be, it is not
%% the fastest server in the list so try another one,
%% if there are more servers in the configuration
%% and they are not on timeout.
gen_server:cast(?MODULE, pull),
{noreply, State2#state{
remote_servers = RotatedServers }};
_ ->
ar_util:cast_after(?PULL_FREQUENCY_MS,
#nonce_limiter_update_response{ step_number = StepNumber }
when StepNumber == SessionStepNumber ->
%% We are in sync with the server. Re-try soon.
ar_util:cast_after(?NO_UPDATE_PULL_FREQUENCY_MS,
?MODULE, pull),
{noreply, State2}
{noreply, State2};
_ ->
%% We have received a partial session, but there's a gap
%% in the step numbers, e.g., the update we received is at
%% step 100, but our last seen step was 90.
case fetch_and_apply_session(Peer) of
{error, _} ->
gen_server:cast(?MODULE, pull),
{noreply, State2#state{
remote_servers = RotatedServers }};
_ ->
ar_util:cast_after(?PULL_FREQUENCY_MS,
?MODULE, pull),
{noreply, State2}
end
end
end;
{error, not_found} ->
Expand All @@ -150,23 +164,21 @@ handle_cast({maybe_request_sessions, SessionKey}, State) ->
#state{ remote_servers = Q } = State,
{{value, {RawPeer, _Timestamp}}, Q2} = queue:out(Q),
Now = erlang:system_time(millisecond),
State2 = State#state{ remote_servers = queue:in({RawPeer, Now}, Q2) },
RotatedServers = queue:in({RawPeer, Now}, Q2),
case ar_peers:resolve_and_cache_peer(RawPeer, vdf_server_peer) of
{error, _} ->
%% Push the peer to the back of the queue.
{noreply, State2};
%% Push the peer to the back of the queue. We'll also wait and see if another
%% `maybe_request_sessions` message comes in before we fetch the full session.
{noreply, State#state{ remote_servers = RotatedServers }};
{ok, Peer} ->
case get_latest_session_key(Peer, State) of
SessionKey ->
%% No reason to make extra requests.
%% No reason to make extra requests. And don't rotate the peers.
{noreply, State};
_ ->
case fetch_and_apply_session_and_previous_session(Peer) of
{error, _} ->
{noreply, State2};
_ ->
{noreply, State}
end
%% Ensure the current and previous sessions are fetched and applied on
%% the next `pull` message.
{noreply, State#state{ request_sessions = true }}
end
end;

Expand Down

0 comments on commit 74a3670

Please sign in to comment.