diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 76b1121e4..afaa8aeb2 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -221,7 +221,6 @@ jobs: ar_block, ar_block_cache, ar_chain_stats, - ar_chunk_cache, ar_chunk_copy, ar_chunk_storage, ar_data_sync_worker_master, @@ -236,6 +235,7 @@ jobs: ar_join, ar_kv, ar_merkle, + ar_mining_cache, ar_mining_server, ar_mining_stats, ar_node, diff --git a/apps/arweave/include/ar_chunk_cache.hrl b/apps/arweave/include/ar_chunk_cache.hrl deleted file mode 100644 index 2ef37f1b9..000000000 --- a/apps/arweave/include/ar_chunk_cache.hrl +++ /dev/null @@ -1,12 +0,0 @@ --record(ar_chunk_cache_group, { - chunk_cache = #{} :: #{term() => binary()}, - chunk_cache_size_bytes = 0 :: non_neg_integer(), - reserved_chunk_cache_bytes = 0 :: non_neg_integer() -}). - - --record(ar_chunk_cache, { - chunk_cache_groups = #{} :: #{term() => #ar_chunk_cache_group{}}, - chunk_cache_groups_queue = queue:new() :: queue:queue(), - chunk_cache_limit_bytes = 0 :: non_neg_integer() -}). diff --git a/apps/arweave/include/ar_mining_cache.hrl b/apps/arweave/include/ar_mining_cache.hrl new file mode 100644 index 000000000..0142d16c8 --- /dev/null +++ b/apps/arweave/include/ar_mining_cache.hrl @@ -0,0 +1,25 @@ +-ifndef(AR_MINING_CACHE_HRL). +-define(AR_MINING_CACHE_HRL, true). + +-record(ar_mining_cache_value, { + chunk1 :: binary() | undefined, + chunk2 :: binary() | undefined, + chunk2_missing :: boolean(), + h1 :: binary() | undefined, + h2 :: binary() | undefined +}). + +-record(ar_mining_cache_session, { + mining_cache = #{} :: #{term() => #ar_mining_cache_value{}}, + mining_cache_size_bytes = 0 :: non_neg_integer(), + reserved_mining_cache_bytes = 0 :: non_neg_integer() +}). + + +-record(ar_mining_cache, { + mining_cache_sessions = #{} :: #{term() => #ar_mining_cache_session{}}, + mining_cache_sessions_queue = queue:new() :: queue:queue(), + mining_cache_limit_bytes = 0 :: non_neg_integer() +}). + +-endif. diff --git a/apps/arweave/src/ar_chunk_cache.erl b/apps/arweave/src/ar_chunk_cache.erl deleted file mode 100644 index 2d067b3dc..000000000 --- a/apps/arweave/src/ar_chunk_cache.erl +++ /dev/null @@ -1,495 +0,0 @@ --module(ar_chunk_cache). --include_lib("arweave/include/ar_chunk_cache.hrl"). - --export([ - new/0, new/1, set_limit/2, cache_size/1, available_size/1, reserved_size/2, - add_group/2, reserve/3, drop_group/2, group_exists/2, get_groups/1, - add_chunk_to_existing_group/4, add_chunk/4, take_chunk/3, - drop_chunk/3, chunk_exists/3 -]). - - - --define(CACHE_GROUPS_LIMIT, 4). - - - --define(CACHE_VALUE(Chunk, Meta), {Chunk, Meta}). - - - -%%%=================================================================== -%%% Public API. -%%%=================================================================== - - - --spec new() -> - Cache :: #ar_chunk_cache{}. - -new() -> #ar_chunk_cache{chunk_cache_limit_bytes = 0}. - - - --spec new(Limit :: pos_integer()) -> - Cache :: #ar_chunk_cache{}. - -new(Limit) -> #ar_chunk_cache{chunk_cache_limit_bytes = Limit}. - - - --spec set_limit(Limit :: pos_integer(), Cache :: #ar_chunk_cache{}) -> - Cache :: #ar_chunk_cache{}. - -set_limit(Limit, Cache) -> - Cache#ar_chunk_cache{chunk_cache_limit_bytes = Limit}. - - - - --spec cache_size(Cache :: #ar_chunk_cache{}) -> - Size :: non_neg_integer(). 
- -cache_size(Cache) -> - maps:fold( - fun(_, #ar_chunk_cache_group{chunk_cache_size_bytes = Size, reserved_chunk_cache_bytes = ReservedSize}, Acc) -> - Acc + Size + ReservedSize - end, - 0, - Cache#ar_chunk_cache.chunk_cache_groups - ). - - - --spec available_size(Cache :: #ar_chunk_cache{}) -> - Size :: non_neg_integer(). - -available_size(Cache) -> - Cache#ar_chunk_cache.chunk_cache_limit_bytes - cache_size(Cache). - - - --spec reserved_size(GroupId :: term(), Cache0 :: #ar_chunk_cache{}) -> - {ok, Size :: non_neg_integer()} | {error, Reason :: term()}. - -reserved_size(GroupId, Cache0) -> - with_chunk_cache_group(GroupId, fun(#ar_chunk_cache_group{reserved_chunk_cache_bytes = ReservedSize}) -> {ok, ReservedSize} end, Cache0). - - - --spec add_group(GroupId :: term(), Cache0 :: #ar_chunk_cache{}) -> - Cache1 :: #ar_chunk_cache{}. -add_group(GroupId, Cache0) -> - case maps:is_key(GroupId, Cache0#ar_chunk_cache.chunk_cache_groups) of - true -> Cache0; - false -> add_chunk_cache_group(GroupId, #ar_chunk_cache_group{}, Cache0) - end. - - - --spec reserve(GroupId :: term(), Size :: non_neg_integer(), Cache0 :: #ar_chunk_cache{}) -> - {ok, Cache1 :: #ar_chunk_cache{}} | {error, Reason :: term()}. - -reserve(GroupId, Size, Cache0) -> - case available_size(Cache0) < Size of - true -> {error, cache_limit_exceeded}; - false -> - map_chunk_cache_group(GroupId, fun(#ar_chunk_cache_group{reserved_chunk_cache_bytes = ReservedSize}) -> - {ok, #ar_chunk_cache_group{reserved_chunk_cache_bytes = ReservedSize + Size}} - end, Cache0) - end. - - - --spec drop_group(GroupId :: term(), Cache0 :: #ar_chunk_cache{}) -> - Cache1 :: #ar_chunk_cache{}. - -drop_group(GroupId, Cache0) -> - Cache0#ar_chunk_cache{ - chunk_cache_groups = maps:remove(GroupId, Cache0#ar_chunk_cache.chunk_cache_groups), - chunk_cache_groups_queue = queue:filter( - fun(GroupId0) -> GroupId0 =/= GroupId end, - Cache0#ar_chunk_cache.chunk_cache_groups_queue - ) - }. - - - --spec group_exists(GroupId :: term(), Cache0 :: #ar_chunk_cache{}) -> - Exists :: boolean(). - -group_exists(GroupId, Cache0) -> - maps:is_key(GroupId, Cache0#ar_chunk_cache.chunk_cache_groups). - - - --spec get_groups(Cache0 :: #ar_chunk_cache{}) -> - Groups :: [term()]. - -get_groups(Cache0) -> - queue:to_list(Cache0#ar_chunk_cache.chunk_cache_groups_queue). - - - --spec add_chunk_to_existing_group( - GroupId :: term(), - ChunkId :: term(), - Chunk :: binary() | ?CACHE_VALUE(binary(), #{}), - Cache0 :: #ar_chunk_cache{} -) -> - {ok, Cache1 :: #ar_chunk_cache{}} | {error, Reason :: term()}. - -add_chunk_to_existing_group(GroupId, ChunkId, ?CACHE_VALUE(Chunk, ChunkMeta), Cache0) -> - case (byte_size(Chunk) + cache_size(Cache0)) > Cache0#ar_chunk_cache.chunk_cache_limit_bytes of - true when Cache0#ar_chunk_cache.chunk_cache_limit_bytes =/= 0 -> {error, cache_limit_exceeded}; - _ -> map_chunk_cache_group(GroupId, add_chunk_map_fun(ChunkId, Chunk, ChunkMeta), Cache0) - end; - -add_chunk_to_existing_group(GroupId, ChunkId, Chunk, Cache0) -> - add_chunk_to_existing_group(GroupId, ChunkId, ?CACHE_VALUE(Chunk, #{}), Cache0). - - - --spec add_chunk( - GroupId :: term(), - ChunkId :: term(), - Chunk :: binary() | {binary(), #{}}, - Cache0 :: #ar_chunk_cache{} -) -> - {ok, Cache1 :: #ar_chunk_cache{}} | {error, Reason :: term()}. 
- -add_chunk(GroupId, ChunkId, ?CACHE_VALUE(Chunk, ChunkMeta), Cache0) -> - case (byte_size(Chunk) + cache_size(Cache0)) > Cache0#ar_chunk_cache.chunk_cache_limit_bytes of - true when Cache0#ar_chunk_cache.chunk_cache_limit_bytes =/= 0 -> {error, cache_limit_exceeded}; - _ -> map_chunk_cache_group(GroupId, add_chunk_map_fun(ChunkId, Chunk, ChunkMeta), Cache0, true) - end; - -add_chunk(GroupId, ChunkId, Chunk, Cache0) -> - add_chunk(GroupId, ChunkId, ?CACHE_VALUE(Chunk, #{}), Cache0). - - - --spec take_chunk(GroupId :: term(), ChunkId :: term(), Cache0 :: #ar_chunk_cache{}) -> - {ok, Chunk :: binary(), Cache1 :: #ar_chunk_cache{}} | {error, Reason :: term()}. - -take_chunk(GroupId, ChunkId, Cache0) -> - map_chunk_cache_group(GroupId, fun(#ar_chunk_cache_group{ - chunk_cache = ChunkCache0, - chunk_cache_size_bytes = ChunkCacheSize0 - } = Group0) -> - case maps:take(ChunkId, ChunkCache0) of - {?CACHE_VALUE(Chunk, _Meta) = RetVal, ChunkCache1} -> {ok, RetVal, Group0#ar_chunk_cache_group{ - chunk_cache = ChunkCache1, - chunk_cache_size_bytes = ChunkCacheSize0 - byte_size(Chunk) - }}; - error -> {error, chunk_not_found} - end - end, Cache0). - - - --spec drop_chunk(GroupId :: term(), ChunkId :: term(), Cache0 :: #ar_chunk_cache{}) -> - {ok, Cache1 :: #ar_chunk_cache{}} | {error, Reason :: term()}. - -drop_chunk(GroupId, ChunkId, Cache0) -> - case take_chunk(GroupId, ChunkId, Cache0) of - {ok, _, Cache1} -> {ok, Cache1}; - {error, chunk_not_found} -> {ok, Cache0}; - {error, Reason} -> {error, Reason} - end. - - --spec chunk_exists(GroupId :: term(), ChunkId :: term(), Cache0 :: #ar_chunk_cache{}) -> - {ok, boolean()} | {error, Reason :: term()}. - -chunk_exists(GroupId, ChunkId, Cache0) -> - with_chunk_cache_group(GroupId, fun(Group) -> - {ok, maps:is_key(ChunkId, Group#ar_chunk_cache_group.chunk_cache)} - end, Cache0). - - - -%%%=================================================================== -%%% Private functions. -%%%=================================================================== - - - -%% Generates a closure that captures the `ChunkId` and `Chunk` and returns a function -%% that can be used to add a chunk to the chunk cache group. - -add_chunk_map_fun(ChunkId, Chunk, ChunkMeta) -> - fun(#ar_chunk_cache_group{ - chunk_cache = ChunkCache0, - chunk_cache_size_bytes = ChunkCacheSize0, - reserved_chunk_cache_bytes = ReservedChunkCacheBytes0 - } = Group0) -> - case maps:find(ChunkId, ChunkCache0) of - {ok, ?CACHE_VALUE(Chunk0, _Meta)} -> - {ok, Group0#ar_chunk_cache_group{ - chunk_cache = maps:put(ChunkId, {Chunk, ChunkMeta}, ChunkCache0), - chunk_cache_size_bytes = ChunkCacheSize0 + byte_size(Chunk) - byte_size(Chunk0), - reserved_chunk_cache_bytes = max(0, ReservedChunkCacheBytes0 - byte_size(Chunk) + byte_size(Chunk0)) - }}; - error when ReservedChunkCacheBytes0 > 0 -> - {ok, Group0#ar_chunk_cache_group{ - chunk_cache = maps:put(ChunkId, {Chunk, ChunkMeta}, ChunkCache0), - chunk_cache_size_bytes = ChunkCacheSize0 + byte_size(Chunk), - reserved_chunk_cache_bytes = max(0, ReservedChunkCacheBytes0 - byte_size(Chunk)) - }}; - error -> - {ok, Group0#ar_chunk_cache_group{ - chunk_cache = maps:put(ChunkId, {Chunk, ChunkMeta}, ChunkCache0), - chunk_cache_size_bytes = ChunkCacheSize0 + byte_size(Chunk), - reserved_chunk_cache_bytes = 0 - }} - end - end. - - - -%% Executes the `Fun` function with the chunk cache group as argument. -%% If the group does not exist, it returns an error without executing the `Fun`. 
- -with_chunk_cache_group(GroupId, Fun, Cache0) -> - case maps:is_key(GroupId, Cache0#ar_chunk_cache.chunk_cache_groups) of - true -> - Fun(maps:get(GroupId, Cache0#ar_chunk_cache.chunk_cache_groups)); - false -> - {error, group_not_found} - end. - - - -%% Executes the `Fun` function with the chunk cache group as argument. -%% If the group does not exist, it returns an error without executing the `Fun`, -%% unless `InsertIfNotFound` is true (false by default). -%% The `Fun` function should return either: -%% - a new chunk cache group `{ok, Group}`, which will be used to replace the old one. -%% - a new chunk cache group with return value `{ok, Return, Group}`, which will -%% be used to replace the old cache group and return a value to the caller. -%% - an error `{error, Reason}` to report back to the caller. - -map_chunk_cache_group(GroupId, Fun, Cache0) -> - map_chunk_cache_group(GroupId, Fun, Cache0, false). - -map_chunk_cache_group(GroupId, Fun, Cache0, InsertIfNotFound) -> - case maps:find(GroupId, Cache0#ar_chunk_cache.chunk_cache_groups) of - {ok, Group0} -> - case Fun(Group0) of - {ok, Group1} -> - Cache1 = Cache0#ar_chunk_cache{ - chunk_cache_groups = maps:put(GroupId, Group1, Cache0#ar_chunk_cache.chunk_cache_groups) - }, - {ok, Cache1}; - {ok, RetVal, Group1} -> - Cache1 = Cache0#ar_chunk_cache{ - chunk_cache_groups = maps:put(GroupId, Group1, Cache0#ar_chunk_cache.chunk_cache_groups) - }, - {ok, RetVal, Cache1}; - {error, Reason} -> {error, Reason} - end; - error when InsertIfNotFound -> - case Fun(#ar_chunk_cache_group{}) of - {ok, Group1} -> - Cache1 = add_chunk_cache_group(GroupId, Group1, Cache0), - {ok, Cache1}; - {error, Reason} -> {error, Reason} - end; - error -> - {error, group_not_found} - end. - - - -add_chunk_cache_group(GroupId, Group, Cache0) -> - Cache1 = Cache0#ar_chunk_cache{ - chunk_cache_groups = maps:put(GroupId, Group, Cache0#ar_chunk_cache.chunk_cache_groups), - chunk_cache_groups_queue = queue:in(GroupId, Cache0#ar_chunk_cache.chunk_cache_groups_queue) - }, - case queue:len(Cache1#ar_chunk_cache.chunk_cache_groups_queue) > ?CACHE_GROUPS_LIMIT of - true -> - {{value, LastGroupId}, Queue1} = queue:out(Cache1#ar_chunk_cache.chunk_cache_groups_queue), - Cache2 = drop_group(LastGroupId, Cache1), - Cache2#ar_chunk_cache{chunk_cache_groups_queue = Queue1}; - false -> - Cache1 - end. - - - -%%%=================================================================== -%%% Tests. -%%%=================================================================== - - - --include_lib("eunit/include/eunit.hrl"). - - - -cache_size_test() -> - Cache = new(1024), - ?assertEqual(0, cache_size(Cache)). - - - -add_group_test() -> - Cache0 = new(1024), - GroupId0 = session0, - Cache1 = add_group(GroupId0, Cache0), - ?assert(group_exists(GroupId0, Cache1)), - ?assertEqual(0, cache_size(Cache1)), - Cache1 = add_group(GroupId0, Cache1), - ?assertEqual([GroupId0], get_groups(Cache1)). 
- - - -add_group_limit_test() -> - Cache0 = new(1024), - Data = <<"chunk_data">>, - {ok, Cache1} = add_chunk(session0, chunk0, Data, Cache0), - {ok, Cache2} = add_chunk(session1, chunk0, Data, Cache1), - {ok, Cache3} = add_chunk(session2, chunk0, Data, Cache2), - {ok, Cache4} = add_chunk(session3, chunk0, Data, Cache3), - ?assertEqual([session0, session1, session2, session3], get_groups(Cache4)), - ?assertEqual(4 * byte_size(Data), cache_size(Cache4)), - {ok, Cache5} = add_chunk(session4, chunk0, Data, Cache4), - ?assertEqual([session1, session2, session3, session4], get_groups(Cache5)), - ?assertEqual(4 * byte_size(Data), cache_size(Cache5)). - - - -reserve_test() -> - Cache0 = new(1024), - GroupId0 = session0, - ChunkId = chunk0, - Data = <<"chunk_data">>, - ReservedSize = 100, - - Cache1 = add_group(GroupId0, Cache0), - - {ok, Cache2} = reserve(GroupId0, ReservedSize, Cache1), - ?assertEqual(ReservedSize, cache_size(Cache2)), - ?assertMatch({ok, ReservedSize}, reserved_size(GroupId0, Cache2)), - - {ok, Cache3} = add_chunk(GroupId0, ChunkId, Data, Cache2), - ?assertEqual(ReservedSize, cache_size(Cache3)), - ExpectedReservedSize = ReservedSize - byte_size(Data), - ?assertMatch({ok, ExpectedReservedSize}, reserved_size(GroupId0, Cache3)), - - ?assertMatch({error, cache_limit_exceeded}, reserve(GroupId0, 1024 + ReservedSize, Cache3)), - - Cache4 = drop_group(GroupId0, Cache3), - ?assertEqual(0, cache_size(Cache4)). - - - -add_chunk_to_existing_group_test() -> - Cache0 = new(1024), - ChunkId = chunk0, - Data = <<"chunk_data">>, - - GroupId0 = session0, - Cache1 = add_group(GroupId0, Cache0), - {ok, Cache2} = add_chunk_to_existing_group(GroupId0, ChunkId, Data, Cache1), - ?assertEqual({ok, true}, chunk_exists(GroupId0, ChunkId, Cache2)), - ?assertEqual(byte_size(Data), cache_size(Cache2)), - {ok, Cache3} = add_chunk_to_existing_group(GroupId0, ChunkId, Data, Cache2), - ?assertEqual(Cache3, Cache2), - - GroupId1 = session1, - {error, group_not_found} = add_chunk_to_existing_group(GroupId1, ChunkId, Data, Cache2). - - - -add_chunk_test() -> - Cache0 = new(1024), - ChunkId = chunk0, - Data = <<"chunk_data">>, - - GroupId0 = session0, - {ok, Cache1} = add_chunk(GroupId0, ChunkId, Data, Cache0), - ?assertEqual({ok, true}, chunk_exists(GroupId0, ChunkId, Cache1)), - ?assertEqual(byte_size(Data), cache_size(Cache1)), - - GroupId1 = session1, - {ok, Cache2} = add_chunk(GroupId1, ChunkId, Data, Cache1), - ?assertEqual({ok, true}, chunk_exists(GroupId1, ChunkId, Cache2)), - ?assertEqual(byte_size(Data) * 2, cache_size(Cache2)). - - - -add_chunk_meta_test() -> - Cache0 = new(1024), - ChunkId = chunk0, - Data = <<"chunk_data">>, - Meta = #{foo => bar}, - - GroupId0 = session0, - {ok, Cache1} = add_chunk(GroupId0, ChunkId, {Data, Meta}, Cache0), - ?assertEqual({ok, true}, chunk_exists(GroupId0, ChunkId, Cache1)), - ?assertEqual(byte_size(Data), cache_size(Cache1)), - - GroupId1 = session1, - {ok, Cache2} = add_chunk(GroupId1, ChunkId, {Data, Meta}, Cache1), - ?assertEqual({ok, true}, chunk_exists(GroupId1, ChunkId, Cache2)), - ?assertEqual(byte_size(Data) * 2, cache_size(Cache2)). - - - -take_chunk_test() -> - Cache0 = new(1024), - GroupId0 = session0, - ChunkId0 = chunk0, - Data = <<"chunk_data">>, - {ok, Cache1} = add_chunk(GroupId0, ChunkId0, Data, Cache0), - ?assertEqual(byte_size(Data), cache_size(Cache1)), - {ok, {Data, #{}}, Cache2} = take_chunk(GroupId0, ChunkId0, Cache1), - ?assertEqual(0, cache_size(Cache2)). 
- - - -set_limit_test() -> - Cache0 = new(), - Data = <<"chunk_data">>, - GroupId0 = session0, - - ChunkId0 = chunk0, - {ok, Cache1} = add_chunk(GroupId0, ChunkId0, Data, Cache0), - ?assertEqual(byte_size(Data), cache_size(Cache1)), - - ChunkId1 = chunk1, - Cache2 = set_limit(5, Cache1), - {error, cache_limit_exceeded} = add_chunk(GroupId0, ChunkId1, Data, Cache2), - ?assertEqual(byte_size(Data), cache_size(Cache2)). - - -drop_chunk_test() -> - Cache0 = new(1024), - ChunkId = chunk0, - Data = <<"chunk_data">>, - - GroupId0 = session0, - {ok, Cache1} = add_chunk(GroupId0, ChunkId, Data, Cache0), - ?assertEqual(byte_size(Data), cache_size(Cache1)), - - {ok, Cache2} = drop_chunk(GroupId0, ChunkId, Cache1), - ?assertMatch({ok, false}, chunk_exists(GroupId0, ChunkId, Cache2)), - ?assertEqual(0, cache_size(Cache2)). - - - -drop_group_test() -> - Cache0 = new(1024), - ChunkId = chunk0, - Data = <<"chunk_data">>, - - GroupId0 = session0, - {ok, Cache1} = add_chunk(GroupId0, ChunkId, Data, Cache0), - ?assertEqual({ok, true}, chunk_exists(GroupId0, ChunkId, Cache1)), - ?assertEqual(byte_size(Data), cache_size(Cache1)), - - Cache2 = drop_group(GroupId0, Cache1), - ?assertNot(group_exists(GroupId0, Cache2)), - ?assertEqual(0, cache_size(Cache2)). diff --git a/apps/arweave/src/ar_mining_cache.erl b/apps/arweave/src/ar_mining_cache.erl new file mode 100644 index 000000000..b18ca64b9 --- /dev/null +++ b/apps/arweave/src/ar_mining_cache.erl @@ -0,0 +1,366 @@ +-module(ar_mining_cache). +-include_lib("arweave/include/ar_mining_cache.hrl"). + +-export([ + new/0, new/1, set_limit/2, + cache_size/1, available_size/1, reserved_size/1, reserved_size/2, + add_session/2, reserve_for_session/3, release_for_session/3, drop_session/2, + session_exists/2, get_sessions/1 +]). + +-define(CACHE_SESSIONS_LIMIT, 4). + +%%%=================================================================== +%%% Public API. +%%%=================================================================== + +%% @doc Creates a new mining cache with a default limit of 0. +-spec new() -> + Cache :: #ar_mining_cache{}. +new() -> #ar_mining_cache{}. + +%% @doc Creates a new mining cache with a given limit. +-spec new(Limit :: pos_integer()) -> + Cache :: #ar_mining_cache{}. +new(Limit) -> #ar_mining_cache{mining_cache_limit_bytes = Limit}. + +%% @doc Sets the limit for the mining cache. +-spec set_limit(Limit :: pos_integer(), Cache :: #ar_mining_cache{}) -> + Cache :: #ar_mining_cache{}. +set_limit(Limit, Cache) -> + Cache#ar_mining_cache{mining_cache_limit_bytes = Limit}. + +%% @doc Returns the size of the cached data in bytes. +%% Note, that cache size includes both the cached data and the reserved space for sessions. +-spec cache_size(Cache :: #ar_mining_cache{}) -> + Size :: non_neg_integer(). +cache_size(Cache) -> + maps:fold( + fun(_, #ar_mining_cache_session{mining_cache_size_bytes = Size, reserved_mining_cache_bytes = ReservedSize}, Acc) -> + Acc + Size + ReservedSize + end, + 0, + Cache#ar_mining_cache.mining_cache_sessions + ). + +%% @doc Returns the available size for the mining cache. +%% Note, that this value does not take into account the reserved space for sessions, +%% as this space is considered already used. +%% @see reserved_size/1,2 +-spec available_size(Cache :: #ar_mining_cache{}) -> + Size :: non_neg_integer(). +available_size(Cache) -> + Cache#ar_mining_cache.mining_cache_limit_bytes - cache_size(Cache). + +%% @doc Returns the reserved size for a cache. 
+-spec reserved_size(Cache0 :: #ar_mining_cache{}) ->
+	Size :: non_neg_integer().
+reserved_size(Cache0) ->
+	lists:sum([
+		begin
+			{ok, Size} = reserved_size(SessionId, Cache0),
+			Size
+		end || SessionId <- get_sessions(Cache0)
+	]).
+
+%% @doc Returns the reserved size for a session.
+-spec reserved_size(SessionId :: term(), Cache0 :: #ar_mining_cache{}) ->
+	{ok, Size :: non_neg_integer()} | {error, Reason :: term()}.
+reserved_size(SessionId, Cache0) ->
+	with_mining_cache_session(SessionId, fun(Session) ->
+		{ok, Session#ar_mining_cache_session.reserved_mining_cache_bytes, Session}
+	end, Cache0).
+
+%% @doc Adds a new mining cache session to the cache.
+%% If the number of sessions exceeds ?CACHE_SESSIONS_LIMIT, the oldest session is dropped.
+-spec add_session(SessionId :: term(), Cache0 :: #ar_mining_cache{}) ->
+	Cache1 :: #ar_mining_cache{}.
+add_session(SessionId, Cache0) ->
+	case maps:is_key(SessionId, Cache0#ar_mining_cache.mining_cache_sessions) of
+		true -> Cache0;
+		false ->
+			Cache1 = Cache0#ar_mining_cache{
+				mining_cache_sessions = maps:put(SessionId, #ar_mining_cache_session{}, Cache0#ar_mining_cache.mining_cache_sessions),
+				mining_cache_sessions_queue = queue:in(SessionId, Cache0#ar_mining_cache.mining_cache_sessions_queue)
+			},
+			case queue:len(Cache1#ar_mining_cache.mining_cache_sessions_queue) > ?CACHE_SESSIONS_LIMIT of
+				true ->
+					{{value, LastSessionId}, Queue1} = queue:out(Cache1#ar_mining_cache.mining_cache_sessions_queue),
+					Cache2 = drop_session(LastSessionId, Cache1),
+					Cache2#ar_mining_cache{mining_cache_sessions_queue = Queue1};
+				false ->
+					Cache1
+			end
+	end.
+
+%% @doc Reserves a certain amount of space for a session.
+%% Note that if the session already has reserved space, the new amount is added
+%% to the existing reservation.
+-spec reserve_for_session(SessionId :: term(), Size :: non_neg_integer(), Cache0 :: #ar_mining_cache{}) ->
+	{ok, Cache1 :: #ar_mining_cache{}} | {error, Reason :: term()}.
+reserve_for_session(SessionId, Size, Cache0) ->
+	case available_size(Cache0) < Size of
+		true -> {error, cache_limit_exceeded};
+		false ->
+			with_mining_cache_session(SessionId, fun(#ar_mining_cache_session{reserved_mining_cache_bytes = ReservedSize} = Session) ->
+				{ok, Session#ar_mining_cache_session{reserved_mining_cache_bytes = ReservedSize + Size}}
+			end, Cache0)
+	end.
+
+%% @doc Releases reserved space for a session.
+%% If the released size exceeds the reserved space, the reserved space is set to 0.
+-spec release_for_session(SessionId :: term(), Size :: non_neg_integer(), Cache0 :: #ar_mining_cache{}) ->
+	{ok, Cache1 :: #ar_mining_cache{}} | {error, Reason :: term()}.
+release_for_session(SessionId, Size, Cache0) ->
+	with_mining_cache_session(SessionId, fun(#ar_mining_cache_session{reserved_mining_cache_bytes = ReservedSize} = Session) ->
+		{ok, Session#ar_mining_cache_session{reserved_mining_cache_bytes = max(0, ReservedSize - Size)}}
+	end, Cache0).
+
+%% @doc Drops a mining cache session from the cache.
+-spec drop_session(SessionId :: term(), Cache0 :: #ar_mining_cache{}) ->
+	Cache1 :: #ar_mining_cache{}.
+drop_session(SessionId, Cache0) ->
+	Cache0#ar_mining_cache{
+		mining_cache_sessions = maps:remove(SessionId, Cache0#ar_mining_cache.mining_cache_sessions),
+		mining_cache_sessions_queue = queue:filter(
+			fun(SessionId0) -> SessionId0 =/= SessionId end,
+			Cache0#ar_mining_cache.mining_cache_sessions_queue
+		)
+	}.
+
+%% @doc Checks if a session exists in the cache.
+-spec session_exists(SessionId :: term(), Cache0 :: #ar_mining_cache{}) -> + Exists :: boolean(). +session_exists(SessionId, Cache0) -> + maps:is_key(SessionId, Cache0#ar_mining_cache.mining_cache_sessions). + +%% @doc Returns the list of sessions in the cache. +%% Note, that this list is not sorted by the chronological order. +-spec get_sessions(Cache0 :: #ar_mining_cache{}) -> + Sessions :: [term()]. +get_sessions(Cache0) -> + queue:to_list(Cache0#ar_mining_cache.mining_cache_sessions_queue). + +%% @doc Maps a cached value for a session into a new value. +%% +%% This function will take care of the cache size and reserved space for the session. +%% If the session does not contain a cached value for the given key, it will be generated, +%% e.g. the very first event for the `Key` is a genesis event. +%% +%% The `Fun` must return one of the following: +%% - `{ok, drop}`: drops the cached value +%% - `{ok, Value1}`: replaces the cached value +%% - `{error, Reason}`: returns an error +%% +%% If the returned value equals to the argument passed into the `Fun`, the cache +%% will not be changed. This implies that cache will not store the empty value. +-spec with_cached_value( + Key :: term(), + SessionId :: term(), + Cache0 :: #ar_mining_cache{}, + Fun :: fun( + (Value :: #ar_mining_cache_value{}) -> + {ok, drop} | + {ok, Value1 :: #ar_mining_cache_value{}} | + {error, Reason :: term()} + ) +) -> + Result :: term(). +with_cached_value(Key, SessionId, Cache0, Fun) -> + with_mining_cache_session(SessionId, fun(Session) -> + Value0 = maps:get(Key, Session#ar_mining_cache_session.mining_cache, #ar_mining_cache_value{}), + case Fun(Value0) of + {ok, Value0} -> + {ok, Session}; + {ok, Value1} -> + SizeDiff = cached_size(Value1) - cached_size(Value0), + SessionAvailableSize = available_size(Cache0) + reserved_size(SessionId, Cache0), + case SizeDiff > SessionAvailableSize of + true -> {error, cache_limit_exceeded}; + false -> + {ok, Session#ar_mining_cache_session{ + mining_cache = maps:put(Key, Value1, Session#ar_mining_cache_session.mining_cache), + reserved_mining_cache_bytes = max(0, Session#ar_mining_cache_session.reserved_mining_cache_bytes + SizeDiff) + }} + end; + {ok, drop} -> + {ok, Session#ar_mining_cache_session{ + mining_cache = maps:remove(Key, Session#ar_mining_cache_session.mining_cache) + }}; + {error, Reason} -> {error, Reason} + end + end, Cache0). + +%%%=================================================================== +%%% Private functions. +%%%=================================================================== + +%% Returns the size of the cached data in bytes. +cached_size(#ar_mining_cache_value{chunk1 = Chunk1, chunk2 = Chunk2}) -> + MaybeBinarySize = fun + (undefined) -> 0; + (Binary) -> byte_size(Binary) + end, + MaybeBinarySize(Chunk1) + MaybeBinarySize(Chunk2). + +%% Executes the `Fun` function with the chunk cache session as argument. +%% If the session does not exist, it returns an error without executing the `Fun`. +%% The `Fun` function should return either: +%% - a new chunk cache session `{ok, Session}`, which will be used to replace the old one. +%% - a new chunk cache session with return value `{ok, Return, Session}`, which will +%% be used to replace the old cache session and return a value to the caller. +%% - an error `{error, Reason}` to report back to the caller. 
+with_mining_cache_session(SessionId, Fun, Cache0) -> + case maps:is_key(SessionId, Cache0#ar_mining_cache.mining_cache_sessions) of + true -> + case Fun(maps:get(SessionId, Cache0#ar_mining_cache.mining_cache_sessions)) of + {ok, Return, Session1} -> {ok, Return, Session1}; + {ok, Session1} -> {ok, Session1}; + {error, Reason} -> {error, Reason} + end; + false -> + {error, session_not_found} + end. + +%%%=================================================================== +%%% Tests. +%%%=================================================================== + +-include_lib("eunit/include/eunit.hrl"). + +cache_size_test() -> + Cache = new(), + ?assertEqual(0, cache_size(Cache)). + +add_session_test() -> + Cache0 = new(), + SessionId0 = session0, + Cache1 = add_session(SessionId0, Cache0), + ?assert(session_exists(SessionId0, Cache1)), + ?assertEqual(0, cache_size(Cache1)), + Cache1 = add_session(SessionId0, Cache1), + ?assertEqual([SessionId0], get_sessions(Cache1)). + +add_session_limit_test() -> + Cache0 = new(), + Cache1 = add_session(session0, Cache0), + Cache2 = add_session(session1, Cache1), + Cache3 = add_session(session2, Cache2), + Cache4 = add_session(session3, Cache3), + ?assertEqual([session0, session1, session2, session3], get_sessions(Cache4)), + ?assertEqual(0, cache_size(Cache4)), + Cache5 = add_session(session4, Cache4), + ?assertEqual([session1, session2, session3, session4], get_sessions(Cache5)), + ?assertEqual(0, cache_size(Cache5)). + +reserve_test() -> + Cache0 = new(1024), + SessionId0 = session0, + ChunkId = chunk0, + Data = <<"chunk_data">>, + ReservedSize = 100, + %% Add session + Cache1 = add_session(SessionId0, Cache0), + %% Reserve space + {ok, Cache2} = reserve_for_session(SessionId0, ReservedSize, Cache1), + ?assertEqual(ReservedSize, cache_size(Cache2)), + ?assertMatch({ok, ReservedSize}, reserved_size(SessionId0, Cache2)), + %% Add chunk1 + {ok, Cache3} = with_cached_value(ChunkId, SessionId0, Cache2, fun(Value) -> + {ok, Value#ar_mining_cache_value{chunk1 = Data}} + end), + ?assertEqual(ReservedSize, cache_size(Cache3)), + ExpectedReservedSize = ReservedSize - byte_size(Data), + ?assertMatch({ok, ExpectedReservedSize}, reserved_size(SessionId0, Cache3)), + %% Reserve more space + ?assertMatch({error, cache_limit_exceeded}, reserve_for_session(SessionId0, 1024 + ReservedSize, Cache3)), + %% Drop session + Cache4 = drop_session(SessionId0, Cache3), + ?assertEqual(0, cache_size(Cache4)). + +with_cached_value_add_chunk_test() -> + Cache0 = new(1024), + ChunkId = chunk0, + Data = <<"chunk_data">>, + SessionId0 = session0, + %% Add session + Cache1 = add_session(SessionId0, Cache0), + %% Add chunk1 + {ok, Cache2} = with_cached_value(ChunkId, SessionId0, Cache1, fun(Value) -> + {ok, Value#ar_mining_cache_value{chunk1 = Data}} + end), + ?assertEqual(byte_size(Data), cache_size(Cache2)), + %% Add chunk2 + {ok, Cache3} = with_cached_value(ChunkId, SessionId0, Cache2, fun(Value) -> + {ok, Value#ar_mining_cache_value{chunk2 = Data}} + end), + ?assertEqual(byte_size(Data) * 2, cache_size(Cache3)). 
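+
+%% A minimal usage sketch of the reserve/release accounting: reserved bytes
+%% count towards cache_size/1 until they are either consumed by cached data or
+%% explicitly released, and releasing more than is currently reserved clamps
+%% the reservation at zero. Illustrative only; the test name and the sizes used
+%% here are arbitrary.
+reserve_release_accounting_test() ->
+	Cache0 = add_session(session0, new(1024)),
+	{ok, Cache1} = reserve_for_session(session0, 100, Cache0),
+	?assertEqual(100, cache_size(Cache1)),
+	{ok, Cache2} = release_for_session(session0, 40, Cache1),
+	?assertEqual(60, cache_size(Cache2)),
+	%% Releasing more than the remaining reservation clamps it to zero.
+	{ok, Cache3} = release_for_session(session0, 1000, Cache2),
+	?assertEqual(0, cache_size(Cache3)).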
+ +with_cached_value_add_hash_test() -> + Cache0 = new(), + ChunkId = chunk0, + Hash = <<"hash">>, + SessionId0 = session0, + %% Add session + Cache1 = add_session(SessionId0, Cache0), + %% Add h1 + {ok, Cache2} = with_cached_value(ChunkId, SessionId0, Cache1, fun(Value) -> + {ok, Value#ar_mining_cache_value{h1 = Hash}} + end), + ?assertEqual(0, cache_size(Cache2)), + %% Add h2 + {ok, Cache3} = with_cached_value(ChunkId, SessionId0, Cache2, fun(Value) -> + {ok, Value#ar_mining_cache_value{h2 = Hash}} + end), + ?assertEqual(0, cache_size(Cache3)). + +with_cached_value_drop_test() -> + Cache0 = new(1024), + ChunkId = chunk0, + Data = <<"chunk_data">>, + SessionId0 = session0, + %% Add session + Cache1 = add_session(SessionId0, Cache0), + %% Add chunk1 + {ok, Cache2} = with_cached_value(ChunkId, SessionId0, Cache1, fun(Value) -> + {ok, Value#ar_mining_cache_value{chunk1 = Data}} + end), + ?assertEqual(byte_size(Data), cache_size(Cache2)), + %% Drop + {ok, Cache3} = with_cached_value(ChunkId, SessionId0, Cache2, fun(_Value) -> + {ok, drop} + end), + ?assertEqual(0, cache_size(Cache3)). + +set_limit_test() -> + Cache0 = new(), + Data = <<"chunk_data">>, + SessionId0 = session0, + + ChunkId0 = chunk0, + {ok, Cache1} = with_cached_value(ChunkId0, SessionId0, Cache0, fun(Value) -> + {ok, Value#ar_mining_cache_value{chunk1 = Data}} + end), + ?assertEqual(byte_size(Data), cache_size(Cache1)), + + ChunkId1 = chunk1, + Cache2 = set_limit(5, Cache1), + {error, cache_limit_exceeded} = with_cached_value(ChunkId1, SessionId0, Cache2, fun(Value) -> + {ok, Value#ar_mining_cache_value{chunk1 = Data}} + end), + ?assertEqual(byte_size(Data), cache_size(Cache2)). + +drop_session_test() -> + Cache0 = new(1024), + ChunkId = chunk0, + Data = <<"chunk_data">>, + + SessionId0 = session0, + {ok, Cache1} = with_cached_value(ChunkId, SessionId0, Cache0, fun(Value) -> + {ok, Value#ar_mining_cache_value{chunk1 = Data}} + end), + ?assertEqual(byte_size(Data), cache_size(Cache1)), + + Cache2 = drop_session(SessionId0, Cache1), + ?assertNot(session_exists(SessionId0, Cache2)), + ?assertEqual(0, cache_size(Cache2)). diff --git a/apps/arweave/src/ar_mining_io.erl b/apps/arweave/src/ar_mining_io.erl index 6d7746e9e..d7ed3d954 100644 --- a/apps/arweave/src/ar_mining_io.erl +++ b/apps/arweave/src/ar_mining_io.erl @@ -2,9 +2,10 @@ -behaviour(gen_server). --export([start_link/0, start_link/1, set_largest_seen_upper_bound/1, +-export([start_link/0, start_link/1, set_largest_seen_upper_bound/1, get_packing/0, get_partitions/0, get_partitions/1, read_recall_range/4, - garbage_collect/0, get_replica_format_from_packing_difficulty/1]). + is_recall_range_readable/2, garbage_collect/0, + get_replica_format_from_packing_difficulty/1]). -export([init/1, handle_cast/2, handle_call/3, handle_info/2, terminate/2]). @@ -46,6 +47,10 @@ read_recall_range(WhichChunk, Worker, Candidate, RecallRangeStart) -> gen_server:call(?MODULE, {read_recall_range, WhichChunk, Worker, Candidate, RecallRangeStart}, 60000). +is_recall_range_readable(Candidate, RecallRangeStart) -> + gen_server:call(?MODULE, + {is_recall_range_readable, Candidate, RecallRangeStart}, 60000). 
+ get_packing() -> {ok, Config} = application:get_env(arweave, config), %% ar_config:validate_storage_modules/1 ensures that we only mine against a single @@ -63,7 +68,7 @@ get_partitions(PartitionUpperBound) -> AllPartitions = lists:foldl( fun (Module, Acc) -> Addr = ar_storage_module:module_address(Module), - PackingDifficulty = + PackingDifficulty = ar_storage_module:module_packing_difficulty(Module), {Start, End} = ar_storage_module:module_range(Module, 0), Partitions = get_store_id_partitions({Start, End}, []), @@ -122,6 +127,15 @@ handle_call({read_recall_range, WhichChunk, Worker, Candidate, RecallRangeStart} end, {reply, ThreadFound, State}; +handle_call({is_recall_range_readable, Candidate, RecallRangeStart}, _From, State) -> + #mining_candidate{ packing_difficulty = PackingDifficulty } = Candidate, + RangeEnd = RecallRangeStart + ar_block:get_recall_range_size(PackingDifficulty), + ThreadFound = case find_thread(RecallRangeStart, RangeEnd, State) of + not_found -> false; + {_, _} -> true + end, + {reply, ThreadFound, State}; + handle_call(Request, _From, State) -> ?LOG_WARNING([{event, unhandled_call}, {module, ?MODULE}, {request, Request}]), {reply, ok, State}. @@ -210,7 +224,7 @@ start_io_threads(State) -> partition_to_store_ids = PartitionToStoreIDs } = StateAcc, StoreIDs2 = sets:to_list(StoreIDs), - + Thread = start_io_thread(Mode, StoreIDs2), ThreadRef = monitor(process, Thread), @@ -288,7 +302,7 @@ handle_io_thread_down(Ref, Reason, State) -> StoreIDs = maps:get(Device, DeviceToStoreIDs, sets:new()), Thread = start_io_thread(Mode, sets:to_list(StoreIDs)), ThreadRef = monitor(process, Thread), - State#state{ io_threads = maps:put(Device, Thread, Threads2), + State#state{ io_threads = maps:put(Device, Thread, Threads2), io_thread_monitor_refs = maps:put(ThreadRef, Device, Refs2) }. io_thread(Mode, Cache, LastClearTime) -> @@ -355,7 +369,7 @@ maybe_clear_cached_chunks(Cache, LastClearTime) -> %% @doc When we're reading a range for a CM peer we'll cache it temporarily in case %% that peer has broken up the batch of H1s into multiple requests. The temporary cache %% prevents us from reading the same range from disk multiple times. -%% +%% %% However if the request is from our local miner there's no need to cache since the H1 %% batch is always handled all at once. 
get_chunks(Mode, WhichChunk, Candidate, RangeStart, StoreID, Cache) -> @@ -370,7 +384,7 @@ get_chunks(Mode, WhichChunk, Candidate, RangeStart, StoreID, Cache) -> cached_read_range(Mode, WhichChunk, Candidate, RangeStart, StoreID, Cache) -> Now = os:system_time(millisecond), case maps:get(RangeStart, Cache, not_found) of - not_found -> + not_found -> ChunkOffsets = read_range(Mode, WhichChunk, Candidate, RangeStart, StoreID), Cache2 = maps:put(RangeStart, {Now, ChunkOffsets}, Cache), {ChunkOffsets, Cache2}; @@ -416,7 +430,7 @@ log_read_range(standalone, _Candidate, _WhichChunk, _FoundChunks, _StartTime) -> log_read_range(_Mode, Candidate, WhichChunk, FoundChunks, StartTime) -> EndTime = erlang:monotonic_time(), ElapsedTime = erlang:convert_time_unit(EndTime-StartTime, native, millisecond), - ReadRate = case ElapsedTime > 0 of + ReadRate = case ElapsedTime > 0 of true -> (FoundChunks * 1000 div 4) div ElapsedTime; %% MiB per second false -> 0 end, diff --git a/apps/arweave/src/ar_mining_server.erl b/apps/arweave/src/ar_mining_server.erl index 51c85bda4..1906192fb 100644 --- a/apps/arweave/src/ar_mining_server.erl +++ b/apps/arweave/src/ar_mining_server.erl @@ -204,7 +204,7 @@ handle_cast({start_mining, Args}, State) -> maps:foreach( fun(_Partition, Worker) -> - ar_mining_worker:reset_difficulty(Worker, DiffPair) + ar_mining_worker:reset_mining_session(Worker, DiffPair) end, State#state.workers ), diff --git a/apps/arweave/src/ar_mining_worker.erl b/apps/arweave/src/ar_mining_worker.erl index 54a0252fa..7a057babf 100644 --- a/apps/arweave/src/ar_mining_worker.erl +++ b/apps/arweave/src/ar_mining_worker.erl @@ -2,7 +2,7 @@ -behaviour(gen_server). --export([start_link/2, name/2, reset_difficulty/2, set_sessions/2, chunks_read/5, computed_hash/5, +-export([start_link/2, name/2, reset_mining_session/2, set_sessions/2, chunks_read/5, computed_hash/5, set_difficulty/2, set_cache_limits/3, add_task/3, garbage_collect/1]). -export([init/1, handle_cast/2, handle_call/3, handle_info/2, terminate/2]). @@ -10,6 +10,7 @@ -include_lib("arweave/include/ar.hrl"). -include_lib("arweave/include/ar_config.hrl"). -include_lib("arweave/include/ar_mining.hrl"). +-include_lib("arweave/include/ar_mining_cache.hrl"). -include_lib("eunit/include/eunit.hrl"). -record(state, { @@ -18,10 +19,6 @@ diff_pair = not_set, packing_difficulty = 0, task_queue = gb_sets:new(), - %% The sub_chunk_cache stores either the first or second sub-chunk for a given nonce. This - %% is because we process both the first and second recall ranges in parallel and don't know - %% which data will be available first. For spora_2_6 packing (aka difficulty 0), sub-chunks - %% and chunks are the same size (256KiB), for replica packing each sub-chunk is 8KiB. chunk_cache = undefined, vdf_queue_limit = 0, latest_vdf_step_number = 0, @@ -33,17 +30,19 @@ -define(TASK_CHECK_FREQUENCY_MS, 200). -define(STATUS_CHECK_FREQUENCY_MS, 5000). +-define(VDF_STEP_CACHE_KEY(CacheRef, Nonce), {CacheRef, Nonce}). + %%%=================================================================== %%% Messages %%%=================================================================== --define(MSG_RESET_DIFFICULTY(DiffPair), {reset_difficulty, DiffPair}). +-define(MSG_RESET_MINING_SESSION(DiffPair), {reset_mining_session, DiffPair}). -define(MSG_SET_SESSIONS(ActiveSessions), {set_sessions, ActiveSessions}). -define(MSG_ADD_TASK(Task), {add_task, Task}). 
-define(MSG_CHUNKS_READ(WhichChunk, Candidate, RangeStart, ChunkOffsets), {chunks_read, {WhichChunk, Candidate, RangeStart, ChunkOffsets}}). -define(MSG_SET_DIFFICULTY(DiffPair), {set_difficulty, DiffPair}). --define(MSG_SET_CACHE_LIMITS(SubChunkCacheLimitBytes, VDFQueueLimit), {set_cache_limits, SubChunkCacheLimitBytes, VDFQueueLimit}). --define(MSG_REMOVE_SUB_CHUNKS_FROM_CACHE(SubChunkCount, Candidate), {remove_sub_chunks_from_cache, SubChunkCount, Candidate}). +-define(MSG_SET_CACHE_LIMITS(SubchunkCacheLimitBytes, VDFQueueLimit), {set_cache_limits, SubchunkCacheLimitBytes, VDFQueueLimit}). +-define(MSG_REMOVE_SUB_CHUNKS_FROM_CACHE(SubchunkCount, Candidate), {remove_subchunks_from_cache, SubchunkCount, Candidate}). -define(MSG_CHECK_WORKER_STATUS, {check_worker_status}). -define(MSG_HANDLE_TASK, {handle_task}). -define(MSG_GARBAGE_COLLECT(StartTime, GCResult), {garbage_collect, StartTime, GCResult}). @@ -63,9 +62,9 @@ start_link(Partition, PackingDifficulty) -> name(Partition, PackingDifficulty) -> list_to_atom(lists:flatten(["ar_mining_worker_", integer_to_list(Partition), "_", integer_to_list(PackingDifficulty)])). --spec reset_difficulty(Worker :: pid(), DiffPair :: {non_neg_integer(), non_neg_integer()}) -> ok. -reset_difficulty(Worker, DiffPair) -> - gen_server:cast(Worker, ?MSG_RESET_DIFFICULTY(DiffPair)). +-spec reset_mining_session(Worker :: pid(), DiffPair :: {non_neg_integer(), non_neg_integer()}) -> ok. +reset_mining_session(Worker, DiffPair) -> + gen_server:cast(Worker, ?MSG_RESET_MINING_SESSION(DiffPair)). -spec set_sessions(Worker :: pid(), ActiveSessions :: [ar_nonce_limiter:session_key()]) -> ok. set_sessions(Worker, ActiveSessions) -> @@ -122,9 +121,9 @@ computed_hash(Worker, computed_h2, H2, Preimage, Candidate) -> set_difficulty(Worker, DiffPair) -> gen_server:cast(Worker, ?MSG_SET_DIFFICULTY(DiffPair)). --spec set_cache_limits(Worker :: pid(), SubChunkCacheLimitBytes :: non_neg_integer(), VDFQueueLimit :: non_neg_integer()) -> ok. -set_cache_limits(Worker, SubChunkCacheLimitBytes, VDFQueueLimit) -> - gen_server:cast(Worker, ?MSG_SET_CACHE_LIMITS(SubChunkCacheLimitBytes, VDFQueueLimit)). +-spec set_cache_limits(Worker :: pid(), SubchunkCacheLimitBytes :: non_neg_integer(), VDFQueueLimit :: non_neg_integer()) -> ok. +set_cache_limits(Worker, SubchunkCacheLimitBytes, VDFQueueLimit) -> + gen_server:cast(Worker, ?MSG_SET_CACHE_LIMITS(SubchunkCacheLimitBytes, VDFQueueLimit)). -spec garbage_collect(Worker :: pid()) -> ok. garbage_collect(Worker) -> @@ -138,7 +137,7 @@ init({Partition, PackingDifficulty}) -> Name = name(Partition, PackingDifficulty), ?LOG_DEBUG([{event, mining_debug_worker_started}, {worker, Name}, {pid, self()}, {partition, Partition}]), - ChuckCache = ar_chunk_cache:new(), + ChuckCache = ar_mining_cache:new(), State0 = #state{ name = Name, chunk_cache = ChuckCache, @@ -146,26 +145,25 @@ init({Partition, PackingDifficulty}) -> is_pool_client = ar_pool:is_client(), packing_difficulty = PackingDifficulty }, - report_chunk_cache_metrics(State0), gen_server:cast(self(), ?MSG_HANDLE_TASK), gen_server:cast(self(), ?MSG_CHECK_WORKER_STATUS), - {ok, State0}. + {ok, report_chunk_cache_metrics(State0)}. handle_call(Request, _From, State) -> ?LOG_WARNING([{event, unhandled_call}, {module, ?MODULE}, {request, Request}]), {reply, ok, State}. 
-handle_cast(?MSG_SET_CACHE_LIMITS(SubChunkCacheLimitBytes, VDFQueueLimit), State) -> +handle_cast(?MSG_SET_CACHE_LIMITS(SubchunkCacheLimitBytes, VDFQueueLimit), State) -> {noreply, State#state{ %% TODO: Convert to bytes - chunk_cache = ar_chunk_cache:set_limit(SubChunkCacheLimitBytes, State#state.chunk_cache), + chunk_cache = ar_mining_cache:set_limit(SubchunkCacheLimitBytes, State#state.chunk_cache), vdf_queue_limit = VDFQueueLimit }}; handle_cast(?MSG_SET_DIFFICULTY(DiffPair), State) -> {noreply, State#state{ diff_pair = DiffPair }}; -handle_cast(?MSG_RESET_DIFFICULTY(DiffPair), State) -> +handle_cast(?MSG_RESET_MINING_SESSION(DiffPair), State) -> State2 = update_sessions([], State), {noreply, State2#state{ diff_pair = DiffPair }}; @@ -181,7 +179,7 @@ handle_cast(?MSG_CHUNKS_READ(WhichChunk, Candidate, RangeStart, ChunkOffsets), S ?LOG_DEBUG([{event, mining_debug_add_stale_chunks}, {worker, State#state.name}, {active_sessions, - ar_mining_server:encode_sessions(ar_chunk_cache:get_groups(State#state.chunk_cache))}, + ar_mining_server:encode_sessions(ar_mining_cache:get_sessions(State#state.chunk_cache))}, {candidate_session, ar_nonce_limiter:encode_session_key(Candidate#mining_candidate.session_key)}, {partition_number, Candidate#mining_candidate.partition_number}, @@ -198,7 +196,7 @@ handle_cast(?MSG_ADD_TASK({TaskType, Candidate, _ExtraArgs} = Task), State) -> {worker, State#state.name}, {task, TaskType}, {active_sessions, - ar_mining_server:encode_sessions(ar_chunk_cache:get_groups(State#state.chunk_cache))}, + ar_mining_server:encode_sessions(ar_mining_cache:get_sessions(State#state.chunk_cache))}, {candidate_session, ar_nonce_limiter:encode_session_key(Candidate#mining_candidate.session_key)}, {partition_number, Candidate#mining_candidate.partition_number}, @@ -213,10 +211,9 @@ handle_cast(?MSG_HANDLE_TASK, #state{ task_queue = Q } = State) -> ar_util:cast_after(?TASK_CHECK_FREQUENCY_MS, self(), ?MSG_HANDLE_TASK), {noreply, State}; _ -> - {{_Priority, _ID, Task}, Q2} = gb_sets:take_smallest(Q), - {TaskType, Candidate, _ExtraArgs} = Task, - prometheus_gauge:dec(mining_server_task_queue_len, [TaskType]), gen_server:cast(self(), ?MSG_HANDLE_TASK), + {{_Priority, _ID, {TaskType, Candidate, _ExtraArgs} = Task}, Q2} = gb_sets:take_smallest(Q), + prometheus_gauge:dec(mining_server_task_queue_len, [TaskType]), case is_session_valid(State, Candidate) of true -> handle_task(Task, State#state{ task_queue = Q2 }); @@ -225,7 +222,7 @@ handle_cast(?MSG_HANDLE_TASK, #state{ task_queue = Q } = State) -> {worker, State#state.name}, {task, TaskType}, {active_sessions, - ar_mining_server:encode_sessions(ar_chunk_cache:get_groups(State#state.chunk_cache))}, + ar_mining_server:encode_sessions(ar_mining_cache:get_sessions(State#state.chunk_cache))}, {candidate_session, ar_nonce_limiter:encode_session_key( Candidate#mining_candidate.session_key)}, {partition_number, Candidate#mining_candidate.partition_number}, @@ -235,8 +232,8 @@ handle_cast(?MSG_HANDLE_TASK, #state{ task_queue = Q } = State) -> end end; -handle_cast(?MSG_REMOVE_SUB_CHUNKS_FROM_CACHE(SubChunkCount, Candidate), State) -> - {noreply, remove_sub_chunks_from_cache(Candidate, SubChunkCount, State)}; +handle_cast(?MSG_REMOVE_SUB_CHUNKS_FROM_CACHE(SubchunkCount, Candidate), State) -> + {noreply, remove_subchunks_from_cache(Candidate, SubchunkCount, State)}; handle_cast(?MSG_CHECK_WORKER_STATUS, State) -> maybe_warn_about_lag(State#state.task_queue, State#state.name), @@ -280,6 +277,7 @@ terminate(_Reason, _State) -> 
%%%=================================================================== %%% Mining tasks. %%%=================================================================== + add_task({TaskType, Candidate, _ExtraArgs} = Task, State) -> #state{ task_queue = Q } = State, StepNumber = Candidate#mining_candidate.step_number, @@ -287,275 +285,173 @@ add_task({TaskType, Candidate, _ExtraArgs} = Task, State) -> prometheus_gauge:inc(mining_server_task_queue_len, [TaskType]), State#state{ task_queue = Q2 }. -process_chunks(WhichChunk, Candidate, RangeStart, ChunkOffsets, State) -> - PackingDifficulty = Candidate#mining_candidate.packing_difficulty, - MaxNonce = ar_block:get_max_nonce(PackingDifficulty), - SubChunkSize = ar_block:get_sub_chunk_size(PackingDifficulty), - NoncesPerChunk = ar_block:get_nonces_per_chunk(PackingDifficulty), - process_chunks(WhichChunk, Candidate, RangeStart, 0, NoncesPerChunk, MaxNonce, - ChunkOffsets, SubChunkSize, 0, State). - -process_chunks(WhichChunk, Candidate, _RangeStart, Nonce, _NoncesPerChunk, NonceMax, - _ChunkOffsets, _SubChunkSize, Count, State) when Nonce > NonceMax -> - Partition = case WhichChunk of - chunk1 -> - Candidate#mining_candidate.partition_number; - chunk2 -> - Candidate#mining_candidate.partition_number2 - end, - ar_mining_stats:chunks_read(Partition, Count), - State; -process_chunks(WhichChunk, Candidate, RangeStart, Nonce, NoncesPerChunk, NonceMax, - [], SubChunkSize, Count, State) -> - %% No more ChunkOffsets means no more chunks have been read. Iterate through all the - %% remaining nonces and remove the full chunks from the cache. - gen_server:cast( - self(), - ?MSG_REMOVE_SUB_CHUNKS_FROM_CACHE(NoncesPerChunk, Candidate#mining_candidate{ nonce = Nonce }) - ), - process_chunks(WhichChunk, Candidate, RangeStart, Nonce + NoncesPerChunk, NoncesPerChunk, - NonceMax, [], SubChunkSize, Count, State); -process_chunks(WhichChunk, Candidate, RangeStart, Nonce, NoncesPerChunk, NonceMax, - [{EndOffset, Chunk} | ChunkOffsets], SubChunkSize, Count, State) - when RangeStart + Nonce * SubChunkSize < EndOffset - ?DATA_CHUNK_SIZE -> - %% Nonce falls in a chunk which wasn't read from disk (e.g. because there are holes - %% in the recall range). - gen_server:cast( - self(), - ?MSG_REMOVE_SUB_CHUNKS_FROM_CACHE(NoncesPerChunk, Candidate#mining_candidate{ nonce = Nonce }) - ), - process_chunks(WhichChunk, Candidate, RangeStart, Nonce + NoncesPerChunk, NoncesPerChunk, - NonceMax, [{EndOffset, Chunk} | ChunkOffsets], SubChunkSize, Count, State); -process_chunks(WhichChunk, Candidate, RangeStart, Nonce, NoncesPerChunk, NonceMax, - [{EndOffset, _Chunk} | ChunkOffsets], SubChunkSize, Count, State) - when RangeStart + Nonce * SubChunkSize >= EndOffset -> - %% Nonce falls in a chunk beyond the current chunk offset. Move ahead to the next - %% chunk offset. - process_chunks(WhichChunk, Candidate, RangeStart, Nonce, NoncesPerChunk, NonceMax, - ChunkOffsets, SubChunkSize, Count, State); -process_chunks(WhichChunk, Candidate, RangeStart, Nonce, NoncesPerChunk, NonceMax, - [{_EndOffset, Chunk} | ChunkOffsets], SubChunkSize, Count, State) -> - %% Process all sub-chunks in Chunk, and then advance to the next chunk. - State2 = process_all_sub_chunks(WhichChunk, Chunk, Candidate, Nonce, State), - process_chunks( - WhichChunk, Candidate, RangeStart, Nonce + NoncesPerChunk, NoncesPerChunk, NonceMax, - ChunkOffsets, SubChunkSize, Count + 1, State2). 
- -process_all_sub_chunks(_WhichChunk, <<>>, _Candidate, _Nonce, State) -> - State; -process_all_sub_chunks(WhichChunk, Chunk, Candidate, Nonce, State) -> - {SubChunk, Rest} = extract_sub_chunk(Chunk, Candidate), - Candidate2 = Candidate#mining_candidate{ nonce = Nonce }, - State2 = process_sub_chunk(WhichChunk, Candidate2, SubChunk, State), - process_all_sub_chunks(WhichChunk, Rest, Candidate2, Nonce + 1, State2); -process_all_sub_chunks(WhichChunk, Rest, _Candidate, Nonce, State) -> - ?LOG_ERROR([{event, failed_to_split_chunk_into_sub_chunks}, - {remaining_size, byte_size(Rest)}, - {nonce, Nonce}, - {chunk, WhichChunk}]), - State. +-spec handle_task( + Task :: { + EventType :: compute_h0 | computed_h0 | chunk1 | chunk2 | computed_h1 | computed_h2 | compute_h2_for_peer, + Candidate :: #mining_candidate{}, + ExtraArgs :: term() + }, + State :: #state{} +) -> {noreply, State :: #state{}}. -%% @doc Return the sub-chunk and the remaining bytes in the chunk. For spora_2_6 packing -%% (aka difficulty 0), each sub-chunk is the size of a chunk so there are no remaining -%% bytes. -extract_sub_chunk(Chunk, #mining_candidate{ packing_difficulty = 0 }) -> - {Chunk, <<>>}; -extract_sub_chunk(Chunk, _Candidate) -> - << SubChunk:?COMPOSITE_PACKING_SUB_CHUNK_SIZE/binary, Rest/binary >> = Chunk, - {SubChunk, Rest}. - -process_sub_chunk(chunk1, Candidate, SubChunk, State) -> - ar_mining_hash:compute_h1(self(), Candidate#mining_candidate{ chunk1 = SubChunk }), - State; -process_sub_chunk(chunk2, Candidate, SubChunk, State) -> - #mining_candidate{ session_key = SessionKey } = Candidate, - Candidate2 = Candidate#mining_candidate{ chunk2 = SubChunk }, - case cycle_sub_chunk_cache(Candidate2, SubChunk, #{chunk2 => true}, State) of - {{<<>>, #{chunk1 := true, h1 := H1}}, State2} -> - ar_mining_hash:compute_h2(self(), Candidate2#mining_candidate{ h1 = H1 }), - %% Decrement 1 for chunk2: - %% we're computing h2 for a peer so chunk1 was not previously read or cached - %% on this node - State2; - {{Chunk, #{chunk1 := true, h1 := H1}}, State2} -> - ar_mining_hash:compute_h2( - self(), Candidate2#mining_candidate{ chunk1 = Chunk, h1 = H1 }), - %% Decrement 2 for chunk1 and chunk2: - %% 1. chunk1 was previously read and cached - %% 2. chunk2 that was just read and will shortly be used to compute h2 - State2; - {{<<>>, _ChunkMeta}, State2} -> - %% Decrement 1 for chunk2 - %% do_not_cache indicates chunk1 was not and will not be read or cached - State2; - {cached, State2} -> - case Candidate2#mining_candidate.cm_lead_peer of - not_set -> - ok; - _ -> - ?LOG_ERROR([{event, cm_chunk2_cached_before_chunk1}, - {worker, State#state.name}, - {partition_number, Candidate2#mining_candidate.partition_number}, - {partition_number2, Candidate2#mining_candidate.partition_number2}, - {cm_peer, ar_util:format_peer(Candidate2#mining_candidate.cm_lead_peer)}, - {cache_ref, Candidate2#mining_candidate.cache_ref}, - {nonce, Candidate2#mining_candidate.nonce}, - {session, ar_nonce_limiter:encode_session_key(SessionKey)}]) - end, - State2; - {error, Reason} -> - ?LOG_ERROR([{event, mining_worker_failed_to_cycle_chunk_cache}, - {worker, State#state.name}, {partition, State#state.partition_number}, - {session_key, ar_nonce_limiter:encode_session_key(SessionKey)}, - {reason, Reason}]), - State - end. +%% @doc Handle the `compute_h0` task. +%% Indicates that the VDF step has been computed. 
+handle_task({compute_h0, Candidate, _ExtraArgs}, State) -> + #state{ + latest_vdf_step_number = LatestVDFStepNumber, + vdf_queue_limit = VDFQueueLimit + } = State, + #mining_candidate{ step_number = StepNumber } = Candidate, + State2 = report_and_reset_hashes(State), + % Check if we need to compute h0 early + case StepNumber >= LatestVDFStepNumber - VDFQueueLimit of + true -> + ar_mining_hash:compute_h0(self(), Candidate), + {noreply, State2#state{ latest_vdf_step_number = max(StepNumber, LatestVDFStepNumber) }}; + false -> + {noreply, State2} + end; -priority(computed_h2, StepNumber) -> - {1, -StepNumber}; -priority(computed_h1, StepNumber) -> - {2, -StepNumber}; -priority(compute_h2_for_peer, StepNumber) -> - {2, -StepNumber}; -priority(chunk2, StepNumber) -> - {3, -StepNumber}; -priority(chunk1, StepNumber) -> - {4, -StepNumber}; -priority(computed_h0, StepNumber) -> - {5, -StepNumber}; -priority(compute_h0, StepNumber) -> - {6, -StepNumber}. +%% @doc Handle the `computed_h0` task. +%% Indicates that the hash for the VDF step has been computed. +handle_task({computed_h0, Candidate, _ExtraArgs}, State) -> + #mining_candidate{ + h0 = H0, partition_number = Partition1, partition_upper_bound = PartitionUpperBound + } = Candidate, + {RecallRange1Start, RecallRange2Start} = ar_block:get_recall_range(H0, Partition1, PartitionUpperBound), + Partition2 = ar_node:get_partition_number(RecallRange2Start), + Candidate2 = generate_cache_ref(Candidate#mining_candidate{ partition_number2 = Partition2 }), + %% Check if the recall ranges are readable to avoid reserving cache space for non-existent data. + Range1Exists = ar_mining_io:is_recall_range_readable(Candidate2, RecallRange1Start), + Range2Exists = ar_mining_io:is_recall_range_readable(Candidate2, RecallRange2Start), + + case {Range1Exists, Range2Exists} of + {true, true} -> + %% Both recall ranges are readable, so we need to reserve cache space for both. + case try_to_reserve_cache_space(2, Candidate2#mining_candidate.session_key, State) of + {true, State1} -> + %% Read the recall ranges; the result of the read will be reported by the `chunk1` and `chunk2` tasks. + ar_mining_io:read_recall_range(chunk1, self(), Candidate2, RecallRange1Start), + ar_mining_io:read_recall_range(chunk2, self(), Candidate2, RecallRange2Start), + {noreply, State1}; + false -> + %% We don't have enough cache space to read the recall ranges, so we'll try again later. + add_delayed_task(self(), computed_h0, Candidate), + {noreply, State} + end; + {true, false} -> + %% Only the first recall range is readable, so we need to reserve cache space for it. + case try_to_reserve_cache_space(1, Candidate2#mining_candidate.session_key, State) of + {true, State1} -> + %% Read the recall range; the result of the read will be reported by the `chunk1` task. + ar_mining_io:read_recall_range(chunk1, self(), Candidate2, RecallRange1Start), + %% Mark chunk2 as missing, not to wait for it to arrive. + State2 = mark_chunk2_missing(Candidate2, State1), + {noreply, State2}; + false -> + %% We don't have enough cache space to read the recall range, so we'll try again later. + add_delayed_task(self(), computed_h0, Candidate), + {noreply, State} + end; + {false, _} -> + {noreply,State} + end; +%% @doc Handle the `chunk1` task. +%% Indicates that the first recall range has been read. handle_task({chunk1, Candidate, [RangeStart, ChunkOffsets]}, State) -> State2 = process_chunks(chunk1, Candidate, RangeStart, ChunkOffsets, State), {noreply, State2}; +%% @doc Handle the `chunk2` task. 
+%% Indicates that the second recall range has been read. handle_task({chunk2, Candidate, [RangeStart, ChunkOffsets]}, State) -> State2 = process_chunks(chunk2, Candidate, RangeStart, ChunkOffsets, State), {noreply, State2}; -handle_task({compute_h0, Candidate, _ExtraArgs}, State) -> - #state{ latest_vdf_step_number = LatestVDFStepNumber, - vdf_queue_limit = VDFQueueLimit } = State, - #mining_candidate{ session_key = SessionKey, step_number = StepNumber } = Candidate, - State2 = report_hashes(State), - State4 = case try_to_reserve_cache_space(SessionKey, State2) of - {true, State3} -> - ar_mining_hash:compute_h0(self(), Candidate), - case StepNumber > LatestVDFStepNumber of - true -> - State3#state{ latest_vdf_step_number = StepNumber }; - false -> - State3 - end; - false -> - case StepNumber >= LatestVDFStepNumber - VDFQueueLimit of - true -> - %% Wait a bit, and then re-add the task. - add_delayed_task(self(), compute_h0, Candidate); - false -> - ok - end, - State2 - end, - {noreply, State4}; - -handle_task({computed_h0, Candidate, _ExtraArgs}, State) -> - #mining_candidate{ h0 = H0, partition_number = Partition1, - partition_upper_bound = PartitionUpperBound } = Candidate, - {RecallRange1Start, RecallRange2Start} = ar_block:get_recall_range(H0, - Partition1, PartitionUpperBound), - Partition2 = ar_node:get_partition_number(RecallRange2Start), - Candidate2 = Candidate#mining_candidate{ partition_number2 = Partition2 }, - Candidate3 = generate_cache_ref(Candidate2), - Range1Exists = ar_mining_io:read_recall_range( - chunk1, self(), Candidate3, RecallRange1Start), - State3 = case Range1Exists of - true -> - Range2Exists = ar_mining_io:read_recall_range( - chunk2, self(), Candidate3, RecallRange2Start), - case Range2Exists of - true -> - State; - false -> - %% Release just the Range2 cache space we reserved with - %% try_to_reserve_cache_space/2 - do_not_cache(Candidate3, State) - end; - false -> - %% Release the Range1 *and* Range2 cache space we reserved with - %% try_to_reserve_cache_space/2 - State - end, - {noreply, State3}; - +%% @doc Handle the `computed_h1` task. +%% Indicates that the single hash for the first recall range has been computed. handle_task({computed_h1, Candidate, _ExtraArgs}, State) -> - #mining_candidate{ h1 = H1, chunk1 = Chunk1 } = Candidate, + #mining_candidate{ h1 = H1 } = Candidate, State2 = hash_computed(h1, Candidate, State), - case h1_passes_diff_checks(H1, Candidate, State2) of + State3 = case h1_passes_diff_checks(H1, Candidate, State2) of true -> + %% h1 solution found, report it. ?LOG_INFO([{event, found_h1_solution}, {step, Candidate#mining_candidate.step_number}, {worker, State2#state.name}, {h1, ar_util:encode(H1)}, {p1, Candidate#mining_candidate.partition_number}, {difficulty, get_difficulty(State2, Candidate)}]), - ar_mining_stats:h1_solution(), - %% Decrement 1 for chunk1: - %% Since we found a solution we won't need chunk2 (and it will be evicted if - %% necessary below) - State3 = remove_sub_chunks_from_cache(Candidate, 1, State2), ar_mining_server:prepare_and_post_solution(Candidate), - {noreply, State3}; - Result -> - case Result of - partial -> - ar_mining_server:prepare_and_post_solution(Candidate); - _ -> - ok - end, - {ok, Config} = application:get_env(arweave, config), - case cycle_sub_chunk_cache(Candidate, Chunk1, #{chunk1 => true, h1 => H1}, State2) of - {cached, State3} -> - %% Chunk2 hasn't been read yet, so we cache Chunk1 and wait for - %% Chunk2 to be read. 
-			{noreply, State3};
-		{{<<>>, _Meta}, State3} ->
-			%% This node does not store Chunk2. If we're part of a coordinated
-			%% mining set, we can try one of our peers, otherwise we're done.
-			case Config#config.coordinated_mining of
-				false ->
-					ok;
-				true ->
-					DiffPair =
-						case get_partial_difficulty(State3, Candidate) of
-							not_set ->
-								get_difficulty(State3, Candidate);
-							PartialDiffPair ->
-								PartialDiffPair
-						end,
-					ar_coordination:computed_h1(Candidate, DiffPair)
-			end,
-			%% Decrement 1 for chunk1:
-			%% do_not_cache indicates chunk2 was not and will not be read or cached
-			{noreply, State3};
-		{{Chunk2, #{chunk2 := true}}, State3} ->
-			%% Chunk2 has already been read, so we can compute H2 now.
-			ar_mining_hash:compute_h2(self(), Candidate#mining_candidate{ chunk2 = Chunk2 }),
-			%% Decrement 2 for chunk1 and chunk2:
-			%% 1. chunk2 was previously read and cached
-			%% 2. chunk1 that was just read and used to compute H1
-			{noreply, State3}
-	end
+			ar_mining_stats:h1_solution(),
+			%% Update the cache to store h1.
+			case ar_mining_cache:with_cached_value(
+				?VDF_STEP_CACHE_KEY(Candidate#mining_candidate.cache_ref, Candidate#mining_candidate.nonce),
+				Candidate#mining_candidate.session_key,
+				State2#state.chunk_cache,
+				fun(CachedValue) -> {ok, CachedValue#ar_mining_cache_value{h1 = H1}} end
+			) of
+				{ok, ChunkCache1} -> State2#state{ chunk_cache = ChunkCache1 };
+				{error, Reason1} ->
+					?LOG_ERROR([{event, mining_worker_failed_to_process_h1},
+						{worker, State2#state.name}, {partition, State2#state.partition_number},
+						{nonce, Candidate#mining_candidate.nonce},
+						{session_key, ar_nonce_limiter:encode_session_key(Candidate#mining_candidate.session_key)},
+						{reason, Reason1}]),
+					State2
+			end;
+		partial -> ar_mining_server:prepare_and_post_solution(Candidate), State2;
+		_ -> State2
+	end,
+	%% Check if we need to compute h2.
+	{ok, Config} = application:get_env(arweave, config),
+	case ar_mining_cache:with_cached_value(
+		?VDF_STEP_CACHE_KEY(Candidate#mining_candidate.cache_ref, Candidate#mining_candidate.nonce),
+		Candidate#mining_candidate.session_key,
+		State3#state.chunk_cache,
+		fun
+			(#ar_mining_cache_value{chunk2_missing = true}) ->
+				%% This node does not store chunk2. If we're part of a coordinated
+				%% mining set, we can try one of our peers, otherwise we're done.
+				case Config#config.coordinated_mining of
+					false -> ok;
+					true ->
+						DiffPair = case get_partial_difficulty(State3, Candidate) of
+							not_set -> get_difficulty(State3, Candidate);
+							PartialDiffPair -> PartialDiffPair
+						end,
+						ar_coordination:computed_h1(Candidate, DiffPair)
+				end,
+				%% Remove the cached value from the cache.
+				{ok, drop};
+			(#ar_mining_cache_value{chunk2 = undefined} = CachedValue) ->
+				%% chunk2 hasn't been read yet, so we store h1 and wait for chunk2 to arrive.
+				{ok, CachedValue#ar_mining_cache_value{h1 = H1}};
+			(#ar_mining_cache_value{chunk2 = Chunk2} = CachedValue) ->
+				%% chunk2 has already been read, so we can compute H2 now.
+				ar_mining_hash:compute_h2(self(), Candidate#mining_candidate{ chunk2 = Chunk2 }),
+				{ok, CachedValue#ar_mining_cache_value{h1 = H1}}
+		end
+	) of
+		{ok, ChunkCache2} -> {noreply, State3#state{ chunk_cache = ChunkCache2 }};
+		{error, Reason2} ->
+			?LOG_ERROR([{event, mining_worker_failed_to_process_h1},
+				{worker, State3#state.name}, {partition, State3#state.partition_number},
+				{nonce, Candidate#mining_candidate.nonce},
+				{session_key, ar_nonce_limiter:encode_session_key(Candidate#mining_candidate.session_key)},
+				{reason, Reason2}]),
+			{noreply, State3}
 	end;
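Every per-nonce cache update in the handlers above and below goes through `ar_mining_cache:with_cached_value/4`. Its implementation lives in `ar_mining_cache.erl` and is not shown in this diff, so the following is an assumption-level sketch of the contract as these call sites use it: the fun receives the currently cached `#ar_mining_cache_value{}` for the key (or a fresh one if nothing is cached yet) and returns `{ok, NewValue}` to store an update, `{ok, drop}` to evict the entry and release its bytes, or `{error, Reason}` to abort. `?VDF_STEP_CACHE_KEY/2` is likewise assumed to build a per-nonce key from the candidate's `cache_ref` and nonce.

%% Illustrative usage only; not part of the patch.
-include_lib("arweave/include/ar_mining_cache.hrl").

%% Attach chunk2 to the entry for Key, keeping whatever is already cached for it.
store_chunk2_example(Key, SessionKey, Cache0, Chunk2) ->
	ar_mining_cache:with_cached_value(Key, SessionKey, Cache0,
		fun(#ar_mining_cache_value{} = Value) ->
			{ok, Value#ar_mining_cache_value{chunk2 = Chunk2}}
		end).

%% Evict the entry for Key once the nonce is fully processed.
drop_entry_example(Key, SessionKey, Cache0) ->
	ar_mining_cache:with_cached_value(Key, SessionKey, Cache0,
		fun(_Value) -> {ok, drop} end).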
+%% @doc Handle the `computed_h2` task.
+%% Indicates that the single hash for the second recall range has been computed.
 handle_task({computed_h2, Candidate, _ExtraArgs}, State) ->
 	#mining_candidate{ chunk2 = Chunk2, h2 = H2, cm_lead_peer = Peer } = Candidate,
 	State2 = hash_computed(h2, Candidate, State),
 	PassesDiffChecks = h2_passes_diff_checks(H2, Candidate, State2),
 	case PassesDiffChecks of
-		false ->
-			ok;
+		false -> ok;
 		true ->
 			?LOG_INFO([{event, found_h2_solution},
 				{worker, State#state.name},
@@ -577,22 +473,47 @@ handle_task({computed_h2, Candidate, _ExtraArgs}, State) ->
 	end,
 	case {PassesDiffChecks, Peer} of
 		{false, _} ->
+			%% h2 does not pass the difficulty checks; do nothing.
 			ok;
-		{_, not_set} ->
+		{Check, not_set} when partial == Check orelse true == Check ->
+			%% This branch only handles the case where we're not part of a coordinated mining set.
+			%% This covers both the solo mining and pool mining setups.
+			%% In the case of solo mining, `Check` will always be `true`.
+			%% In the case of pool mining, `Check` will be `partial` or `true`.
+			%% In either case, we prepare and post the solution.
 			ar_mining_server:prepare_and_post_solution(Candidate);
-		_ ->
+		{Check, _} when partial == Check orelse true == Check ->
+			%% This branch only handles the case where we're part of a coordinated mining set.
+			%% In this case, we prepare the PoA2 and send it to the lead peer.
 			PoA2 = case ar_mining_server:prepare_poa(poa2, Candidate, #poa{}) of
-				{ok, PoA} ->
-					PoA;
+				{ok, PoA} -> PoA;
 				{error, _Error} ->
 					%% Fallback. This will probably fail later, but prepare_poa/3 should
 					%% have already printed several errors so we'll continue just in case.
 					#poa{ chunk = Chunk2 }
 			end,
 			ar_coordination:computed_h2_for_peer(Candidate#mining_candidate{ poa2 = PoA2 })
 	end,
-	{noreply, State2};
+	%% Remove the cached value from the cache.
+	case ar_mining_cache:with_cached_value(
+		?VDF_STEP_CACHE_KEY(Candidate#mining_candidate.cache_ref, Candidate#mining_candidate.nonce),
+		Candidate#mining_candidate.session_key,
+		State2#state.chunk_cache,
+		fun(_) -> {ok, drop} end
+	) of
+		{ok, ChunkCache2} -> {noreply, State2#state{ chunk_cache = ChunkCache2 }};
+		{error, Reason} ->
+			?LOG_ERROR([{event, mining_worker_failed_to_process_computed_h2},
+				{worker, State2#state.name}, {partition, State2#state.partition_number},
+				{nonce, Candidate#mining_candidate.nonce},
+				{session_key, ar_nonce_limiter:encode_session_key(Candidate#mining_candidate.session_key)},
+				{reason, Reason}]),
+			{noreply, State2}
+	end;
+%% @doc Handle the `compute_h2_for_peer` task.
+%% Indicates that we got a request to compute h2 for a peer.
 handle_task({compute_h2_for_peer, Candidate, _ExtraArgs}, State) ->
 	#mining_candidate{
 		h0 = H0,
@@ -601,26 +522,16 @@ handle_task({compute_h2_for_peer, Candidate, _ExtraArgs}, State) ->
 		cm_h1_list = H1List,
 		cm_lead_peer = Peer
 	} = Candidate,
-
-	{_RecallRange1Start, RecallRange2Start} = ar_block:get_recall_range(H0,
-		Partition1, PartitionUpperBound),
+	{_, RecallRange2Start} = ar_block:get_recall_range(H0, Partition1, PartitionUpperBound),
 	Candidate2 = generate_cache_ref(Candidate),
-	%% Clear the list so we aren't copying it around all over the place
+	%% Clear the list so we aren't copying it around all over the place.
 	Candidate3 = Candidate2#mining_candidate{ cm_h1_list = [] },
 	Range2Exists = ar_mining_io:read_recall_range(chunk2, self(), Candidate3, RecallRange2Start),
 	case Range2Exists of
 		true ->
 			ar_mining_stats:h1_received_from_peer(Peer, length(H1List)),
-			%% Note: when processing CM requests we always reserve the cache space and proceed
-			%% *even if* this puts us over the chunk cache limit. This may have to be
-			%% revisited later if we find that this causes unacceptable memory bloat.
-			State2 = State,
-			%% First flag all nonces in the range as do_not_cache, then cache the specific
-			%% nonces included in the H1 list. This will make sure we don't cache the chunk2s
-			%% that are read for the missing nonces.
-			State3 = do_not_cache(Candidate3, State2),
-			{noreply, cache_h1_list(Candidate3, H1List, State3)};
+			State2 = mark_chunk2_missing(Candidate3, State),
+			{noreply, cache_h1_list(Candidate3, H1List, State2)};
 		false ->
 			%% This can happen for two reasons:
 			%% 1. (most common) Remote peer has requested a range we don't have from a
@@ -634,13 +545,143 @@
 %%% Private functions.
 %%%===================================================================
 
+process_chunks(WhichChunk, Candidate, RangeStart, ChunkOffsets, State) ->
+	PackingDifficulty = Candidate#mining_candidate.packing_difficulty,
+	SubchunksPerRecallRange = ar_block:get_max_nonce(PackingDifficulty),
+	SubchunksPerChunk = ar_block:get_nonces_per_chunk(PackingDifficulty),
+	SubchunkSize = ar_block:get_sub_chunk_size(PackingDifficulty),
+	process_chunks(
+		WhichChunk, Candidate, RangeStart, 0, SubchunksPerChunk,
+		SubchunksPerRecallRange, ChunkOffsets, SubchunkSize, 0, State
+	).
+
+process_chunks(
+	WhichChunk, Candidate, _RangeStart, Nonce, _SubchunksPerChunk,
+	SubchunksPerRecallRange, _ChunkOffsets, _SubchunkSize, Count, State
+) when Nonce > SubchunksPerRecallRange ->
+	%% We've processed all the subchunks in the recall range.
+	ar_mining_stats:chunks_read(case WhichChunk of
+		chunk1 -> Candidate#mining_candidate.partition_number;
+		chunk2 -> Candidate#mining_candidate.partition_number2
+	end, Count),
+	State;
+process_chunks(
+	WhichChunk, Candidate, RangeStart, Nonce, SubchunksPerChunk,
+	SubchunksPerRecallRange, [], SubchunkSize, Count, State
+) ->
+	%% No more ChunkOffsets means no more chunks have been read. Iterate through all the
+	%% remaining nonces and remove the full chunks from the cache.
+	State2 = remove_subchunks_from_cache(Candidate#mining_candidate{ nonce = Nonce }, SubchunksPerChunk, State),
+	%% Process the next chunk.
+	process_chunks(
+		WhichChunk, Candidate, RangeStart, Nonce + SubchunksPerChunk,
+		SubchunksPerChunk, SubchunksPerRecallRange, [], SubchunkSize, Count, State2
+	);
+process_chunks(
+	WhichChunk, Candidate, RangeStart, Nonce, SubchunksPerChunk,
+	SubchunksPerRecallRange, [{ChunkEndOffset, Chunk} | ChunkOffsets], SubchunkSize, Count, State
+) ->
+	SubchunkStartOffset = RangeStart + Nonce * SubchunkSize,
+	ChunkStartOffset = ChunkEndOffset - ?DATA_CHUNK_SIZE,
+	case {SubchunkStartOffset < ChunkStartOffset, SubchunkStartOffset >= ChunkEndOffset} of
+		{true, false} ->
+			%% Skip this nonce.
+			%% The nonce falls in a chunk which wasn't read from disk (for example, because there
+			%% are holes in the recall range), i.e. the nonce is in the middle of a non-existent chunk.
+			%% Remove chunk2 from the cache if it is already there; we don't need it (when WhichChunk == chunk1).
+			%% Remove chunk1 from the cache if it is already there; we have already processed it (when WhichChunk == chunk2).
+			State2 = remove_subchunks_from_cache(Candidate#mining_candidate{ nonce = Nonce }, SubchunksPerChunk, State),
+			process_chunks(
+				WhichChunk, Candidate, RangeStart, Nonce + SubchunksPerChunk, SubchunksPerChunk,
+				SubchunksPerRecallRange, [{ChunkEndOffset, Chunk} | ChunkOffsets], SubchunkSize, Count, State2
+			);
+		{false, true} ->
+			%% Skip this chunk.
+			%% The nonce falls in a chunk beyond the current chunk offset (for example, because we
+			%% read an extra chunk at the beginning of the recall range). Move ahead to the next
+			%% chunk offset.
+			%% No need to remove anything from the cache, as the nonce is still in the recall range.
+			process_chunks(
+				WhichChunk, Candidate, RangeStart, Nonce, SubchunksPerChunk,
+				SubchunksPerRecallRange, ChunkOffsets, SubchunkSize, Count, State
+			);
+		{false, false} ->
+			%% Process all sub-chunks in Chunk, and then advance to the next chunk.
+			State2 = process_all_subchunks(WhichChunk, Chunk, Candidate, Nonce, State),
+			process_chunks(
+				WhichChunk, Candidate, RangeStart, Nonce + SubchunksPerChunk, SubchunksPerChunk,
+				SubchunksPerRecallRange, ChunkOffsets, SubchunkSize, Count + 1, State2
+			)
+	end.
+
+process_all_subchunks(_WhichChunk, <<>>, _Candidate, _Nonce, State) -> State;
+process_all_subchunks(WhichChunk, Chunk, Candidate, Nonce, State)
+when Candidate#mining_candidate.packing_difficulty == 0 ->
+	%% Spora 2.6 packing (aka difficulty 0).
+	Candidate2 = Candidate#mining_candidate{ nonce = Nonce },
+	process_subchunk(WhichChunk, Candidate2, Chunk, State);
+process_all_subchunks(
+	WhichChunk,
+	<< Subchunk:?COMPOSITE_PACKING_SUB_CHUNK_SIZE/binary, Rest/binary >>,
+	Candidate, Nonce, State
+) ->
+	%% Composite packing / replica packing (aka difficulty 1+).
+	Candidate2 = Candidate#mining_candidate{ nonce = Nonce },
+	State2 = process_subchunk(WhichChunk, Candidate2, Subchunk, State),
+	process_all_subchunks(WhichChunk, Rest, Candidate2, Nonce + 1, State2);
+process_all_subchunks(WhichChunk, Rest, _Candidate, Nonce, State) ->
+	%% The chunk is not a multiple of the subchunk size.
+	?LOG_ERROR([{event, failed_to_split_chunk_into_subchunks},
+		{remaining_size, byte_size(Rest)},
+		{nonce, Nonce},
+		{chunk, WhichChunk}]),
+	State.
+
+process_subchunk(chunk1, Candidate, Subchunk, State) ->
+	ar_mining_hash:compute_h1(self(), Candidate#mining_candidate{ chunk1 = Subchunk }),
+	State;
+process_subchunk(chunk2, Candidate, Subchunk, State) ->
+	#mining_candidate{ session_key = SessionKey } = Candidate,
+	Candidate2 = Candidate#mining_candidate{ chunk2 = Subchunk },
+	case ar_mining_cache:with_cached_value(
+		?VDF_STEP_CACHE_KEY(Candidate2#mining_candidate.cache_ref, Candidate2#mining_candidate.nonce),
+		SessionKey,
+		State#state.chunk_cache,
+		fun
+			(#ar_mining_cache_value{h1 = undefined} = CachedValue) ->
+				%% H1 has not been computed yet, so cache chunk2 for this nonce.
+				{ok, CachedValue#ar_mining_cache_value{chunk2 = Subchunk}};
+			(#ar_mining_cache_value{h1 = H1} = CachedValue) ->
+				%% H1 has already been computed, so compute h2 and cache chunk2 for this nonce.
+ ar_mining_hash:compute_h2(self(), Candidate2#mining_candidate{ h1 = H1 }), + {ok, CachedValue#ar_mining_cache_value{chunk2 = Subchunk}} + end + ) of + {ok, ChunkCache2} -> State#state{ chunk_cache = ChunkCache2 }; + {error, Reason} -> + ?LOG_ERROR([{event, mining_worker_failed_to_process_chunk2}, + {worker, State#state.name}, {partition, State#state.partition_number}, + {nonce, Candidate2#mining_candidate.nonce}, + {session_key, ar_nonce_limiter:encode_session_key(SessionKey)}, + {reason, Reason}]), + State + end. + +priority(computed_h2, StepNumber) -> {1, -StepNumber}; +priority(computed_h1, StepNumber) -> {2, -StepNumber}; +priority(compute_h2_for_peer, StepNumber) -> {2, -StepNumber}; +priority(chunk2, StepNumber) -> {3, -StepNumber}; +priority(chunk1, StepNumber) -> {4, -StepNumber}; +priority(computed_h0, StepNumber) -> {5, -StepNumber}; +priority(compute_h0, StepNumber) -> {6, -StepNumber}. + %% @doc Returns true if the mining candidate belongs to a valid mining session. Always assume -%% that a coordinated mining candidate is valid (its cm_lead_peer is set) +%% that a coordinated mining candidate is valid (its cm_lead_peer is set). is_session_valid(_State, #mining_candidate{ cm_lead_peer = Peer }) when Peer /= not_set -> true; is_session_valid(State, #mining_candidate{ session_key = SessionKey }) -> - ar_chunk_cache:group_exists(SessionKey, State#state.chunk_cache). + ar_mining_cache:session_exists(SessionKey, State#state.chunk_cache). h1_passes_diff_checks(H1, Candidate, State) -> passes_diff_checks(H1, true, Candidate, State). @@ -652,27 +693,23 @@ passes_diff_checks(SolutionHash, IsPoA1, Candidate, State) -> DiffPair = get_difficulty(State, Candidate), #mining_candidate{ packing_difficulty = PackingDifficulty } = Candidate, case ar_node_utils:passes_diff_check(SolutionHash, IsPoA1, DiffPair, PackingDifficulty) of - true -> - true; + true -> true; false -> case get_partial_difficulty(State, Candidate) of - not_set -> - false; + not_set -> false; PartialDiffPair -> - case ar_node_utils:passes_diff_check(SolutionHash, IsPoA1, - PartialDiffPair, PackingDifficulty) of - true -> - partial; - false -> - false + case ar_node_utils:passes_diff_check( + SolutionHash, IsPoA1, PartialDiffPair, PackingDifficulty + ) of + true -> partial; + false -> false end end end. maybe_warn_about_lag(Q, Name) -> case gb_sets:is_empty(Q) of - true -> - ok; + true -> ok; false -> case gb_sets:take_smallest(Q) of {{_Priority, _ID, {compute_h0, _}}, Q3} -> @@ -686,42 +723,40 @@ maybe_warn_about_lag(Q, Name) -> %% the threshold in the future. N = count_h0_tasks(Q3) + 1, case N > 2 of + false -> ok; true -> ?LOG_WARNING([ {event, mining_worker_lags_behind_the_nonce_limiter}, {worker, Name}, - {step_count, N}]); - false -> - ok + {step_count, N}]) end; - _ -> - ok + _ -> ok end end. count_h0_tasks(Q) -> case gb_sets:is_empty(Q) of - true -> - 0; + true -> 0; false -> case gb_sets:take_smallest(Q) of {{_Priority, _ID, {compute_h0, _Args}}, Q2} -> 1 + count_h0_tasks(Q2); - _ -> - 0 + _ -> 0 end end. update_sessions(ActiveSessions, State) -> - CurrentSessions = ar_chunk_cache:get_groups(State#state.chunk_cache), + CurrentSessions = ar_mining_cache:get_sessions(State#state.chunk_cache), AddedSessions = lists:subtract(ActiveSessions, CurrentSessions), RemovedSessions = lists:subtract(CurrentSessions, ActiveSessions), - add_sessions(AddedSessions, remove_sessions(RemovedSessions, State)). 
+ State1 = add_sessions(AddedSessions, remove_sessions(RemovedSessions, State)), + %% Adding/removing sessions will cause the chunk cache size to change, + %% so we need to update the metrics. + report_chunk_cache_metrics(State1). -add_sessions([], State) -> - State; +add_sessions([], State) -> State; add_sessions([SessionKey | AddedSessions], State) -> - ChunkCache = ar_chunk_cache:add_group(SessionKey, State#state.chunk_cache), + ChunkCache = ar_mining_cache:add_session(SessionKey, State#state.chunk_cache), ?LOG_DEBUG([{event, mining_debug_add_session}, {worker, State#state.name}, {partition, State#state.partition_number}, {session_key, ar_nonce_limiter:encode_session_key(SessionKey)}]), @@ -729,17 +764,16 @@ add_sessions([SessionKey | AddedSessions], State) -> chunk_cache = ChunkCache }). -remove_sessions([], State) -> - State; +remove_sessions([], State) -> State; remove_sessions([SessionKey | RemovedSessions], State) -> - ChuckCache = ar_chunk_cache:drop_group(SessionKey, State#state.chunk_cache), + ChunkCache = ar_mining_cache:drop_session(SessionKey, State#state.chunk_cache), TaskQueue = remove_tasks(SessionKey, State#state.task_queue), ?LOG_DEBUG([{event, mining_debug_remove_session}, {worker, State#state.name}, {partition, State#state.partition_number}, {session, ar_nonce_limiter:encode_session_key(SessionKey)}]), remove_sessions(RemovedSessions, State#state{ task_queue = TaskQueue, - chunk_cache = ChuckCache + chunk_cache = ChunkCache }). remove_tasks(SessionKey, TaskQueue) -> @@ -748,22 +782,25 @@ remove_tasks(SessionKey, TaskQueue) -> case Candidate#mining_candidate.session_key == SessionKey of true -> prometheus_gauge:dec(mining_server_task_queue_len, [TaskType]), - %% remove the task false; false -> - %% keep the task true end end, TaskQueue ). -try_to_reserve_cache_space(SessionKey, #state{ +try_to_reserve_cache_space(Multiplier, SessionKey, #state{ packing_difficulty = PackingDifficulty, chunk_cache = ChunkCache0 } = State) -> - case ar_chunk_cache:reserve(SessionKey, ar_block:get_recall_range_size(PackingDifficulty), ChunkCache0) of - {ok, ChunkCache1} -> {true, State#state{ chunk_cache = ChunkCache1 }}; + case ar_mining_cache:reserve_for_session( + SessionKey, Multiplier * ar_block:get_recall_range_size(PackingDifficulty), ChunkCache0 + ) of + {ok, ChunkCache1} -> + State1 = State#state{ chunk_cache = ChunkCache1 }, + report_chunk_cache_metrics(State1), + {true, State1}; {error, Reason} -> ?LOG_WARNING([{event, mining_worker_failed_to_reserve_cache_space}, {worker, State#state.name}, {partition, State#state.partition_number}, @@ -772,97 +809,71 @@ try_to_reserve_cache_space(SessionKey, #state{ false end. -do_not_cache(Candidate, State) -> +mark_chunk2_missing(Candidate, State) -> #mining_candidate{ packing_difficulty = PackingDifficulty } = Candidate, - do_not_cache(0, ar_block:get_max_nonce(PackingDifficulty), Candidate, State). + mark_chunk2_missing(0, ar_block:get_max_nonce(PackingDifficulty), Candidate, State). -do_not_cache(Nonce, NonceMax, _Candidate, State) - when Nonce > NonceMax -> +mark_chunk2_missing(Nonce, SubchunksPerRecallRange, _Candidate, State) +when Nonce > SubchunksPerRecallRange -> State; -do_not_cache(Nonce, NonceMax, Candidate, State) -> - State2 = cache_chunk(<<>>, Candidate#mining_candidate{ nonce = Nonce }, State), - do_not_cache(Nonce + 1, NonceMax, Candidate, State2). - -%% @doc The sub_chunk_cache stores either the first or second chunk for a given nonce. 
This is -%% because we process both the first and second recall ranges in parallel and don't know -%% which data will be available first. The function manages that shared cache slot by either -%% caching data if its the first to arrive, or "popping" data that was previously cached. The -%% caller is responsible for taking the appropriate action based on the return value. -cycle_sub_chunk_cache(#mining_candidate{ cache_ref = CacheRef } = Candidate, Chunk, ChunkMeta, State) - when CacheRef /= not_set -> - #mining_candidate{ nonce = Nonce, session_key = SessionKey } = Candidate, - case ar_chunk_cache:take_chunk(SessionKey, {CacheRef, Nonce}, State#state.chunk_cache) of - {ok, {<<>>, Meta}, ChunkCache1} -> - case ar_chunk_cache:add_chunk(SessionKey, {CacheRef, Nonce}, {Chunk, ChunkMeta}, ChunkCache1) of - {ok, ChunkCache2} -> - {{<<>>, Meta}, State#state{ chunk_cache = ChunkCache2 }}; - {error, Reason} -> - ?LOG_ERROR([{event, mining_worker_failed_to_add_chunk_to_cache}, - {worker, State#state.name}, {partition, State#state.partition_number}, - {session_key, ar_nonce_limiter:encode_session_key(SessionKey)}, - {sessions, length(ar_chunk_cache:get_groups(State#state.chunk_cache))}, - {cache_size, ar_chunk_cache:cache_size(State#state.chunk_cache)}, - {chunk_size, byte_size(Chunk)}, - {nonce, Nonce}, {reason, Reason}]), - {error, Reason} - end; - {error, chunk_not_found} -> - case ar_chunk_cache:add_chunk(SessionKey, {CacheRef, Nonce}, {Chunk, ChunkMeta}, State#state.chunk_cache) of - {ok, ChunkCache2} -> - {cached, State#state{ chunk_cache = ChunkCache2 }}; - {error, Reason} -> - ?LOG_ERROR([{event, mining_worker_failed_to_add_chunk_to_cache}, - {worker, State#state.name}, {partition, State#state.partition_number}, - {session_key, ar_nonce_limiter:encode_session_key(SessionKey)}, - {sessions, length(ar_chunk_cache:get_groups(State#state.chunk_cache))}, - {cache_size, ar_chunk_cache:cache_size(State#state.chunk_cache)}, - {chunk_size, byte_size(Chunk)}, - {nonce, Nonce}, {reason, Reason}]), - {error, Reason} - end; - {ok, RetVal, ChunkCache1} -> - {RetVal, State#state{ chunk_cache = ChunkCache1 }} +mark_chunk2_missing(Nonce, SubchunksPerRecallRange, Candidate, State) -> + case ar_mining_cache:with_cached_value( + ?VDF_STEP_CACHE_KEY(Candidate#mining_candidate.cache_ref, Nonce), + Candidate#mining_candidate.session_key, + State#state.chunk_cache, + fun(CachedValue) -> {ok, CachedValue#ar_mining_cache_value{chunk2_missing = true}} end + ) of + {ok, ChunkCache1} -> + mark_chunk2_missing(Nonce + 1, SubchunksPerRecallRange, Candidate, State#state{ chunk_cache = ChunkCache1 }); + {error, Reason} -> + %% NB: this clause may cause a memory leak, because mining worker will wait for + %% chunk2 to arrive. + ?LOG_ERROR([{event, mining_worker_failed_to_add_chunk_to_cache}, {reason, Reason}]), + mark_chunk2_missing(Nonce + 1, SubchunksPerRecallRange, Candidate, State) end. -%% @doc Remove SubChunkCount sub-chunks from the cache starting at +%% @doc Remove SubchunkCount sub-chunks from the cache starting at %% Candidate#mining_candidate.nonce. 
-remove_sub_chunks_from_cache(_Candidate, 0, State) -> +remove_subchunks_from_cache(_Candidate, 0, State) -> State; -remove_sub_chunks_from_cache(#mining_candidate{ cache_ref = CacheRef } = Candidate, - SubChunkCount, State) when CacheRef /= not_set -> +remove_subchunks_from_cache(#mining_candidate{ cache_ref = CacheRef } = Candidate, + SubchunkCount, State) when CacheRef /= not_set -> #mining_candidate{ nonce = Nonce, session_key = SessionKey } = Candidate, - %% Decrement the cache size by 1 for each sub-chunk being removed. - %% We may decrement the cache size further depending on what's already cached. - State2 = State, - State3 = case ar_chunk_cache:take_chunk(SessionKey, {CacheRef, Nonce}, State#state.chunk_cache) of - {ok, {<<>>, _}, ChunkCache1} -> - State2#state{ chunk_cache = ChunkCache1 }; - {error, chunk_not_found} -> - cache_chunk(<<>>, Candidate, State2); - {ok, {_Chunk, #{chunk1 := true, h1 := _H1}}, ChunkCache1} -> - %% If we find data from a CM peer, discard it but don't decrement the cache size - State2#state{ chunk_cache = ChunkCache1 }; - {ok, _, ChunkCache1} -> - %% if we find any cached data, discard it and decrement the cache size - State2#state{ chunk_cache = ChunkCache1 } + State2 = case ar_mining_cache:with_cached_value( + ?VDF_STEP_CACHE_KEY(Candidate#mining_candidate.cache_ref, Candidate#mining_candidate.nonce), + SessionKey, + State#state.chunk_cache, + fun(_) -> {ok, drop} end + ) of + {ok, ChunkCache1} -> + report_chunk_cache_metrics(State#state{ chunk_cache = ChunkCache1 }); + {error, Reason} -> + ?LOG_ERROR([{event, mining_worker_failed_to_remove_subchunks_from_cache}, + {worker, State#state.name}, {partition, State#state.partition_number}, + {nonce, Nonce}, {session_key, ar_nonce_limiter:encode_session_key(SessionKey)}, + {reason, Reason}]), + State end, - remove_sub_chunks_from_cache(Candidate#mining_candidate{ nonce = Nonce + 1 }, - SubChunkCount - 1, State3). - -cache_chunk(Data, Candidate, State) -> - #mining_candidate{ cache_ref = CacheRef, nonce = Nonce, session_key = SessionKey } = Candidate, - {ok, Cache1} = ar_chunk_cache:add_chunk(SessionKey, {CacheRef, Nonce}, Data, State#state.chunk_cache), - State#state{ - chunk_cache = Cache1 - }. - -cache_h1_list(_Candidate, [], State) -> - State; -cache_h1_list( - #mining_candidate{ cache_ref = CacheRef } = Candidate, - [ {H1, Nonce} | H1List ], State) when CacheRef /= not_set -> - State2 = cache_chunk({<<>>, #{chunk1 => true, h1 => H1}}, Candidate#mining_candidate{ nonce = Nonce }, State), - cache_h1_list(Candidate, H1List, State2). + remove_subchunks_from_cache(Candidate#mining_candidate{ nonce = Nonce + 1 }, SubchunkCount - 1, State2). 
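For reference, the branch selection in `process_chunks/10` above boils down to comparing the nonce's subchunk offset against the current chunk's byte range. Below is a small, self-contained sketch with a worked example; `classify_nonce/4` is not part of the patch, and it assumes `?DATA_CHUNK_SIZE` is 262144 bytes and the composite-packing subchunk size is 8192 bytes.

%% Mirrors the {SubchunkStartOffset < ChunkStartOffset, SubchunkStartOffset >= ChunkEndOffset}
%% decision in process_chunks/10. Illustrative only.
-define(DATA_CHUNK_SIZE, 262144).

classify_nonce(RangeStart, Nonce, SubchunkSize, ChunkEndOffset) ->
	SubchunkStartOffset = RangeStart + Nonce * SubchunkSize,
	ChunkStartOffset = ChunkEndOffset - ?DATA_CHUNK_SIZE,
	case {SubchunkStartOffset < ChunkStartOffset, SubchunkStartOffset >= ChunkEndOffset} of
		{true, false} -> skip_nonce;        %% the nonce's chunk was not read; drop its cache entry
		{false, true} -> next_chunk_offset; %% the nonce lies past this chunk; advance the offset list
		{false, false} -> process_chunk     %% the nonce lies inside this chunk; hash its subchunks
	end.

%% Worked example with RangeStart = 0 and an 8 KiB subchunk size:
%%   classify_nonce(0, 0, 8192, 524288)  -> skip_nonce         (offset 0 is below chunk start 262144)
%%   classify_nonce(0, 32, 8192, 524288) -> process_chunk      (offset 262144 falls inside the chunk)
%%   classify_nonce(0, 32, 8192, 262144) -> next_chunk_offset  (offset 262144 is at or past chunk end)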
+
+cache_h1_list(_Candidate, [], State) -> State;
+cache_h1_list(#mining_candidate{ cache_ref = not_set } = _Candidate, _H1List, State) -> State;
+cache_h1_list(Candidate, [ {H1, Nonce} | H1List ], State) ->
+	case ar_mining_cache:with_cached_value(
+		?VDF_STEP_CACHE_KEY(Candidate#mining_candidate.cache_ref, Nonce),
+		Candidate#mining_candidate.session_key,
+		State#state.chunk_cache,
+		fun(CachedValue) -> {ok, CachedValue#ar_mining_cache_value{h1 = H1}} end
+	) of
+		{ok, ChunkCache1} ->
+			cache_h1_list(Candidate, H1List, State#state{ chunk_cache = ChunkCache1 });
+		{error, Reason} ->
+			?LOG_ERROR([{event, mining_worker_failed_to_cache_h1},
+				{worker, State#state.name}, {partition, State#state.partition_number},
+				{nonce, Nonce}, {session_key, ar_nonce_limiter:encode_session_key(Candidate#mining_candidate.session_key)},
+				{reason, Reason}]),
+			cache_h1_list(Candidate, H1List, State)
+	end.
 
 get_difficulty(State, #mining_candidate{ cm_diff = not_set }) ->
 	State#state.diff_pair;
@@ -893,7 +904,7 @@ hash_computed(WhichHash, Candidate, State) ->
 			State#state{ h2_hashes = maps:put(PartitionNumber, Hashes+1, State#state.h2_hashes) }
 	end.
 
-report_hashes(State) ->
+report_and_reset_hashes(State) ->
 	maps:foreach(
 		fun(Key, Value) ->
 			ar_mining_stats:h1_computed(Key, Value)
@@ -908,10 +919,9 @@ report_hashes(State) ->
 	),
 	State#state{ h1_hashes = #{}, h2_hashes = #{} }.
 
-report_chunk_cache_metrics(State0) ->
-	ChunkCache = State0#state.chunk_cache,
-	Partition = State0#state.partition_number,
-	prometheus_gauge:set(mining_server_chunk_cache_size, [Partition], ar_chunk_cache:cache_size(ChunkCache)).
+report_chunk_cache_metrics(#state{chunk_cache = ChunkCache, partition_number = Partition} = State0) ->
+	prometheus_gauge:set(mining_server_chunk_cache_size, [Partition], ar_mining_cache:cache_size(ChunkCache)),
+	State0.
 
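The task queue that feeds `handle_task/2` is ordered by the `priority/2` tuples restored above; `maybe_warn_about_lag/2` shows it as a `gb_sets` ordered set of `{Priority, Ref, Task}` entries popped with `take_smallest/1`. Below is a small, self-contained illustration of the resulting ordering (not part of the patch).

%% Smaller tuples are popped first, so lower type priorities win, and within a type the
%% newer VDF step wins because the step number is negated. Illustrative only.
priority_order_example() ->
	Q0 = gb_sets:from_list([
		{{4, -10}, make_ref(), {chunk1, step_10}},       %% priority(chunk1, 10)
		{{1, -9},  make_ref(), {computed_h2, step_9}},   %% priority(computed_h2, 9)
		{{4, -11}, make_ref(), {chunk1, step_11}}        %% priority(chunk1, 11)
	]),
	{{_, _, First}, Q1} = gb_sets:take_smallest(Q0),
	{{_, _, Second}, _Q2} = gb_sets:take_smallest(Q1),
	%% First  = {computed_h2, step_9}: computed_h2 outranks chunk1 regardless of step.
	%% Second = {chunk1, step_11}: the newer step (11) comes before step 10.
	{First, Second}.

 %%%===================================================================
 %%% Public Test interface.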