diff --git a/big_tests/tests/cets_disco_SUITE.erl b/big_tests/tests/cets_disco_SUITE.erl index bb4e36754ef..8a7acf69074 100644 --- a/big_tests/tests/cets_disco_SUITE.erl +++ b/big_tests/tests/cets_disco_SUITE.erl @@ -3,6 +3,8 @@ -import(distributed_helper, [mim/0, mim2/0, rpc/4]). -include_lib("common_test/include/ct.hrl"). +-include_lib("eunit/include/eunit.hrl"). + %%-------------------------------------------------------------------- %% Suite configuration @@ -20,19 +22,16 @@ file_cases() -> rdbms_cases() -> [rdbms_backend, - rdbms_backend_supports_auto_cleaning]. + rdbms_backend_supports_auto_cleaning, + rdbms_backend_node_doesnt_remove_itself, + rdbms_backend_db_queries]. suite() -> - distributed_helper:require_rpc_nodes([mim, mim2]) ++ escalus:suite(). + distributed_helper:require_rpc_nodes([mim, mim2]). %%-------------------------------------------------------------------- %% Init & teardown %%-------------------------------------------------------------------- -init_per_suite(Config) -> - escalus:init_per_suite(Config). - -end_per_suite(Config) -> - escalus:end_per_suite(Config). init_per_group(rdbms, Config) -> case not ct_helper:is_ct_running() @@ -46,17 +45,11 @@ init_per_group(_, Config) -> end_per_group(_, Config) -> Config. -init_per_testcase(rdbms_backend_supports_auto_cleaning = CaseName, Config) -> - mock_timestamp(mim(), month_ago()) ++ - escalus:init_per_testcase(CaseName, Config); -init_per_testcase(CaseName, Config) -> - escalus:init_per_testcase(CaseName, Config). +init_per_testcase(_CaseName, Config) -> Config. -end_per_testcase(rdbms_backend_supports_auto_cleaning = CaseName, Config) -> - unmock_timestamp(mim()), - escalus:end_per_testcase(CaseName, Config); -end_per_testcase(CaseName, Config) -> - escalus:end_per_testcase(CaseName, Config). +end_per_testcase(_CaseName, _Config) -> + unmock(mim()), + unmock(mim2()). 
%%-------------------------------------------------------------------- %% Test cases @@ -67,51 +60,139 @@ file_backend(Config) -> Opts = #{disco_file => Path}, State = rpc(mim(), cets_discovery_file, init, [Opts]), {{ok, Nodes}, _} = rpc(mim(), cets_discovery_file, get_nodes, [State]), - ['node1@localhost', 'node2@otherhost'] = lists:sort(Nodes). + ?assertEqual(lists:sort(['node1@localhost', 'node2@otherhost']), lists:sort(Nodes)). rdbms_backend(_Config) -> - CN = <<"big_test">>, + CN = random_cluster_name(?FUNCTION_NAME), Opts1 = #{cluster_name => CN, node_name_to_insert => <<"test1">>}, Opts2 = #{cluster_name => CN, node_name_to_insert => <<"test2">>}, + State1 = disco_init(mim(), Opts1), - disco_get_nodes(mim(), State1), + {{ok, Nodes1_2}, State1_2} = disco_get_nodes(mim(), State1), + ?assertMatch(#{last_query_info := #{already_registered := false}}, State1_2), + ?assertEqual([], Nodes1_2), + + %% "test2" node can see "test1" on initial registration State2 = disco_init(mim2(), Opts2), - {{ok, Nodes}, State2_2} = disco_get_nodes(mim2(), State2), - %% "test2" node can see "test1" - true = lists:member(test1, Nodes), - {{ok, _}, State2_3} = disco_get_nodes(mim2(), State2_2), - %% Check that we follow the right code branch - #{last_query_info := #{already_registered := true}} = State2_3. - -rdbms_backend_supports_auto_cleaning(Config) -> - ensure_mocked(Config), - CN = <<"big_test2">>, + {{ok, Nodes2_2}, State2_2} = disco_get_nodes(mim2(), State2), + ?assertMatch(#{last_query_info := #{already_registered := false}}, State2_2), + ?assertEqual([test1], Nodes2_2), + + %% "test2" node can see "test1" on update + {{ok, Nodes2_3}, State2_3} = disco_get_nodes(mim2(), State2_2), + ?assertEqual(lists:sort([test1, test2]), lists:sort(Nodes2_3)), + ?assertMatch(#{last_query_info := #{already_registered := true}}, State2_3). 
+ +rdbms_backend_supports_auto_cleaning(_Config) -> + Timestamp = month_ago(), + mock_timestamp(mim(), Timestamp), + CN = random_cluster_name(?FUNCTION_NAME), Opts1 = #{cluster_name => CN, node_name_to_insert => <<"test1">>}, Opts2 = #{cluster_name => CN, node_name_to_insert => <<"test2">>}, + %% test1 row is written with an old (mocked) timestamp State1 = disco_init(mim(), Opts1), - {_, State1_2} = disco_get_nodes(mim(), State1), - {{ok, Nodes1}, State1_3} = disco_get_nodes(mim(), State1_2), - Timestamp = proplists:get_value(mocked_timestamp, Config), - #{last_query_info := #{timestamp := Timestamp}} = State1_3, - %% It is in DB - true = lists:member(test1, Nodes1), + {{ok, Nodes1_2}, State1_2} = disco_get_nodes(mim(), State1), + {{ok, Nodes1_3}, State1_3} = disco_get_nodes(mim(), State1_2), + ?assertEqual([], Nodes1_2), + ?assertEqual([test1], Nodes1_3), + ?assertMatch(#{last_query_info := #{timestamp := Timestamp}}, State1_2), + ?assertMatch(#{last_query_info := #{timestamp := Timestamp}}, State1_3), + %% test2 would clean test1 registration %% We don't mock on mim2 node, so timestamps would differ State2 = disco_init(mim2(), Opts2), - {{ok, Nodes2}, State2_2} = disco_get_nodes(mim2(), State2), - false = lists:member(test1, Nodes2), - #{last_query_info := #{run_cleaning_result := {removed, [test1]}}} = State2_2. + {{ok, Nodes2_2}, State2_2} = disco_get_nodes(mim2(), State2), + ?assertEqual([], Nodes2_2), + ?assertMatch(#{last_query_info := #{run_cleaning_result := {removed, [<<"test1">>]}}}, + State2_2), + {{ok, Nodes2_3}, State2_3} = disco_get_nodes(mim2(), State2), + ?assertEqual([test2], Nodes2_3), + #{last_query_info := #{last_rows := SelectedRows}} = State2_3, + ?assertMatch(1, length(SelectedRows)). 
+ +rdbms_backend_node_doesnt_remove_itself(_Config) -> + Timestamp = month_ago(), + mock_timestamp(mim(), Timestamp), + CN = random_cluster_name(?FUNCTION_NAME), + Opts1 = #{cluster_name => CN, node_name_to_insert => <<"test1">>}, + Opts2 = #{cluster_name => CN, node_name_to_insert => <<"test2">>}, + + %% test1 row is written with an old (mocked) timestamp + State1 = disco_init(mim(), Opts1), + {{ok, Nodes1_2}, State1_2} = disco_get_nodes(mim(), State1), + ?assertEqual([], Nodes1_2), + ?assertMatch(#{last_query_info := #{timestamp := Timestamp}}, State1_2), + + unmock_timestamp(mim()), + %% test1 row is not removed and timestamp is updated + {{ok, Nodes1_3}, State1_3} = disco_get_nodes(mim(), State1_2), + ?assertNotMatch(#{last_query_info := #{timestamp := Timestamp}}, State1_3), + ?assertMatch(#{last_query_info := #{run_cleaning_result := {removed, []}}}, + State1_3), + ?assertEqual([test1], Nodes1_3), + + State2 = disco_init(mim2(), Opts2), + {{ok, Nodes2_2}, State2_2} = disco_get_nodes(mim2(), State2), + ?assertEqual([test1], Nodes2_2), + ?assertMatch(#{last_query_info := #{run_cleaning_result := {removed, []}}}, + State2_2). 
+ +rdbms_backend_db_queries(_Config) -> + CN = random_cluster_name(?FUNCTION_NAME), + TS = rpc(mim(), mongoose_rdbms_timestamp, select, []), + TS2 = TS + 100, + + %% insertion fails if node name or node num is already added for the cluster + ?assertEqual({updated, 1}, insert_new(CN, <<"test1">>, TS, 1)), + ?assertMatch({error, _}, insert_new(CN, <<"test1">>, TS, 1)), + ?assertMatch({error, _}, insert_new(CN, <<"test1">>, TS, 2)), + ?assertMatch({error, _}, insert_new(CN, <<"test2">>, TS, 1)), + ?assertEqual({updated, 1}, insert_new(CN, <<"test2">>, TS, 2)), + + %% update of the timestamp works correctly + {selected, SelectedNodes1} = select(CN), + ?assertEqual(lists:sort([{<<"test1">>, 1, TS}, {<<"test2">>, 2, TS}]), + lists:sort(SelectedNodes1)), + ?assertEqual({updated, 1}, update_existing(CN, <<"test1">>, TS2)), + {selected, SelectedNodes2} = select(CN), + ?assertEqual(lists:sort([{<<"test1">>, 1, TS2}, {<<"test2">>, 2, TS}]), + lists:sort(SelectedNodes2)), + + %% node removal works correctly + ?assertEqual({updated, 1}, delete_node_from_db(CN, <<"test1">>)), + ?assertEqual({selected, [{<<"test2">>, 2, TS}]}, select(CN)). %%-------------------------------------------------------------------- %% Helpers %%-------------------------------------------------------------------- disco_init(Node, Opts) -> - rpc(Node, mongoose_cets_discovery_rdbms, init, [Opts]). + State = rpc(Node, mongoose_cets_discovery_rdbms, init, [Opts]), + log_disco_request(?FUNCTION_NAME, Node, Opts, State), + State. disco_get_nodes(Node, State) -> - rpc(Node, mongoose_cets_discovery_rdbms, get_nodes, [State]). + NewState = rpc(Node, mongoose_cets_discovery_rdbms, get_nodes, [State]), + log_disco_request(?FUNCTION_NAME, Node, State, NewState), + NewState. 
+ +log_disco_request(disco_init, Node, #{cluster_name := CN} = Opts, State) -> + ct:log("[0] disco_init(~p,~n" ++ + "        ~p) =~n" ++ + "        ~p", + [Node, Opts, State]), + erlang:put({disco, Node, CN}, 1); +log_disco_request(disco_get_nodes, Node, #{cluster_name := CN} = OldState, NewState) -> + N = case erlang:get({disco, Node, CN}) of + undefined -> 1; + Int when is_integer(Int) -> Int + end, + ct:log("[~p] disco_get_nodes(~p,~n" ++ + "        ~p) =~n" ++ + "        ~p", + [N, Node, OldState, NewState]), + erlang:put({disco, Node, CN}, N+1). timestamp() -> os:system_time(second). @@ -123,15 +204,34 @@ mock_timestamp(Node, Timestamp) -> ok = rpc(Node, meck, new, [mongoose_rdbms_timestamp, [passthrough, no_link]]), ok = rpc(Node, meck, expect, [mongoose_rdbms_timestamp, select, 0, Timestamp]), %% Ensure that we mock - EnsureMocked = fun() -> - Timestamp = rpc(Node, mongoose_rdbms_timestamp, select, []) - end, - EnsureMocked(), - [{ensure_mocked, EnsureMocked}, {mocked_timestamp, Timestamp}]. - -ensure_mocked(Config) -> - EnsureMocked = proplists:get_value(ensure_mocked, Config), - EnsureMocked(). + Timestamp = rpc(Node, mongoose_rdbms_timestamp, select, []). unmock_timestamp(Node) -> ok = rpc(Node, meck, unload, [mongoose_rdbms_timestamp]). + +unmock(Node) -> + rpc(Node, meck, unload, []). + +random_cluster_name(CaseName) -> + Rand = rpc(mim(), mongoose_bin, gen_from_crypto, []), + <<"big_test_", (atom_to_binary(CaseName))/binary, "_", Rand/binary>>. + +insert_new(CN, BinNode, TS, NodeNum) -> + Ret = rpc(mim(), mongoose_cets_discovery_rdbms, insert_new, [CN, BinNode, TS, NodeNum]), + ct:log("insert_new(~p, ~p, ~p, ~p) = ~p", [CN, BinNode, TS, NodeNum, Ret]), + Ret. + +select(CN) -> + Ret = rpc(mim(), mongoose_cets_discovery_rdbms, select, [CN]), + ct:log("select(~p) = ~p", [CN, Ret]), + Ret. + +update_existing(CN, BinNode, TS) -> + Ret = rpc(mim(), mongoose_cets_discovery_rdbms, update_existing, [CN, BinNode, TS]), + ct:log("update_existing(~p, ~p, ~p) = ~p", [CN, BinNode, TS, Ret]), + Ret. 
+ +delete_node_from_db(CN, BinNode) -> + Ret = rpc(mim(), mongoose_cets_discovery_rdbms, delete_node_from_db, [CN, BinNode]), + ct:log("delete_node_from_db(~p, ~p) = ~p", [CN, BinNode, Ret]), + Ret. diff --git a/src/mongoose_cets_discovery_rdbms.erl b/src/mongoose_cets_discovery_rdbms.erl index 0c51fb04e76..6708f91277d 100644 --- a/src/mongoose_cets_discovery_rdbms.erl +++ b/src/mongoose_cets_discovery_rdbms.erl @@ -3,19 +3,27 @@ -behaviour(cets_discovery). -export([init/1, get_nodes/1]). +%% these functions are exported for testing purposes only. +-export([select/1, insert_new/4, update_existing/3, delete_node_from_db/2]). +-ignore_xref([select/1, insert_new/4, update_existing/3, delete_node_from_db/2]). + -include("mongoose_logger.hrl"). -ifdef(TEST). -include_lib("eunit/include/eunit.hrl"). -endif. --type opts() :: #{cluster_name => binary(), node_name_to_insert => binary(), last_query_info => map(), - expire_time := non_neg_integer()}. --type state() :: opts(). +-type opts() :: #{cluster_name := binary(), node_name_to_insert := binary(), + last_query_info => map(), expire_time => non_neg_integer(), + any() => any()}. + +-type state() :: #{cluster_name := binary(), node_name_to_insert := binary(), + last_query_info := map(), expire_time := non_neg_integer()}. -spec init(opts()) -> state(). init(Opts = #{cluster_name := _, node_name_to_insert := _}) -> - maps:merge(defaults(), Opts). + Keys = [cluster_name, node_name_to_insert, last_query_info, expire_time], + maps:with(Keys, maps:merge(defaults(), Opts)). defaults() -> #{expire_time => 60 * 60 * 1, %% 1 hour in seconds @@ -23,23 +31,20 @@ defaults() -> -spec get_nodes(state()) -> {cets_discovery:get_nodes_result(), state()}. 
get_nodes(State = #{cluster_name := ClusterName, node_name_to_insert := Node}) -> - try - case is_rdbms_running() of - true -> - try_register(ClusterName, Node, State); - false -> - skip - end - of - {Num, Nodes, Info} -> - mongoose_node_num:set_node_num(Num), - {{ok, Nodes}, State#{last_query_info => Info}}; - skip -> + case is_rdbms_running() of + true -> + try try_register(ClusterName, Node, State) of + {Num, Nodes, Info} -> + mongoose_node_num:set_node_num(Num), + {{ok, [binary_to_atom(N) || N <- Nodes]}, + State#{last_query_info => Info}} + catch Class:Reason:Stacktrace -> + ?LOG_ERROR(#{what => discovery_failed_select, class => Class, + reason => Reason, stacktrace => Stacktrace}), + {{error, Reason}, State} + end; + false -> {{error, rdbms_not_running}, State} - catch Class:Reason:Stacktrace -> - ?LOG_ERROR(#{what => discovery_failed_select, class => Class, - reason => Reason, stacktrace => Stacktrace}), - {{error, Reason}, State} end. is_rdbms_running() -> @@ -50,60 +55,56 @@ is_rdbms_running() -> false end. -try_register(ClusterName, NodeBin, State) when is_binary(NodeBin), is_binary(ClusterName) -> +try_register(ClusterName, Node, State) when is_binary(Node), is_binary(ClusterName) -> prepare(), Timestamp = timestamp(), - Node = binary_to_atom(NodeBin), {selected, Rows} = select(ClusterName), - Zipped = [{binary_to_atom(DbNodeBin), Num, TS} || {DbNodeBin, Num, TS} <- Rows], - {Nodes, Nums, _Timestamps} = lists:unzip3(Zipped), + {Nodes, Nums, _Timestamps} = lists:unzip3(Rows), AlreadyRegistered = lists:member(Node, Nodes), NodeNum = case AlreadyRegistered of true -> - update_existing(ClusterName, NodeBin, Timestamp), - {value, {_, Num, _TS}} = lists:keysearch(Node, 1, Zipped), + update_existing(ClusterName, Node, Timestamp), + {value, {_, Num, _TS}} = lists:keysearch(Node, 1, Rows), Num; false -> - Num = next_free_num(lists:usort(Nums)), + Num = first_free_num(lists:usort(Nums)), %% Could fail with duplicate node_num reason. 
%% In this case just wait for the next get_nodes call. - insert_new(ClusterName, NodeBin, Timestamp, Num), - Num + case insert_new(ClusterName, Node, Timestamp, Num) of + {error, _} -> 0; %% return default node num + {updated, 1} -> Num + end end, RunCleaningResult = run_cleaning(ClusterName, Timestamp, Rows, State), %% This could be used for debugging Info = #{already_registered => AlreadyRegistered, timestamp => Timestamp, node_num => Num, last_rows => Rows, run_cleaning_result => RunCleaningResult}, - Nodes2 = skip_expired_nodes(Nodes, RunCleaningResult), - {NodeNum, Nodes2, Info}. + {NodeNum, skip_expired_nodes(Nodes, RunCleaningResult), Info}. skip_expired_nodes(Nodes, {removed, ExpiredNodes}) -> - Nodes -- ExpiredNodes; -skip_expired_nodes(Nodes, {skip, _}) -> - Nodes. + (Nodes -- ExpiredNodes). run_cleaning(ClusterName, Timestamp, Rows, State) -> - Expired = [{DbNodeBin, Num, DbTS} || {DbNodeBin, Num, DbTS} <- Rows, - is_expired(DbTS, Timestamp, State)], - ExpiredNodes = [binary_to_atom(DbNodeBin) || {DbNodeBin, _Num, _TS} <- Expired], - case Expired of - [] -> - {skip, nothing_expired}; - _ -> - [delete_node_from_db(ClusterName, DbNodeBin) || {DbNodeBin, _Num, _TS} <- Expired], + #{expire_time := ExpireTime, node_name_to_insert := CurrentNode} = State, + ExpiredNodes = [DbNode || {DbNode, _Num, DbTS} <- Rows, + is_expired(DbTS, Timestamp, ExpireTime), + DbNode =/= CurrentNode], + [delete_node_from_db(ClusterName, DbNode) || DbNode <- ExpiredNodes], + case ExpiredNodes of + [] -> ok; + [_ | _] -> ?LOG_WARNING(#{what => cets_expired_nodes, text => <<"Expired nodes are detected in discovery_nodes table">>, - expired_nodes => ExpiredNodes}), - {removed, ExpiredNodes} - end. + expired_nodes => ExpiredNodes}) + end, + {removed, ExpiredNodes}. 
-is_expired(DbTS, Timestamp, #{expire_time := ExpireTime}) when is_integer(DbTS) -> +is_expired(DbTS, Timestamp, ExpireTime) when is_integer(Timestamp), + is_integer(ExpireTime), + is_integer(DbTS) -> (Timestamp - DbTS) > ExpireTime. %% compare seconds -delete_node_from_db(ClusterName, Node) -> - mongoose_rdbms:execute_successfully(global, cets_delete_node_from_db, [ClusterName, Node]). - prepare() -> T = discovery_nodes, mongoose_rdbms_timestamp:prepare(), @@ -131,11 +132,14 @@ insert_new(ClusterName, Node, Timestamp, Num) -> update_existing() -> <<"UPDATE discovery_nodes SET updated_timestamp = ? WHERE cluster_name = ? AND node_name = ?">>. +update_existing(ClusterName, Node, Timestamp) -> + mongoose_rdbms:execute(global, cets_disco_update_existing, [Timestamp, ClusterName, Node]). + delete_node_from_db() -> <<"DELETE FROM discovery_nodes WHERE cluster_name = ? AND node_name = ?">>. -update_existing(ClusterName, Node, Timestamp) -> - mongoose_rdbms:execute(global, cets_disco_update_existing, [Timestamp, ClusterName, Node]). +delete_node_from_db(ClusterName, Node) -> + mongoose_rdbms:execute_successfully(global, cets_delete_node_from_db, [ClusterName, Node]). %% in seconds timestamp() -> @@ -144,19 +148,17 @@ timestamp() -> mongoose_rdbms_timestamp:select(). %% Returns a next free node id based on the currently registered ids -next_free_num([]) -> - 0; -next_free_num([H | T = [E | _]]) when ((H + 1) =:= E) -> - %% Sequential, ignore H - next_free_num(T); -next_free_num([H | _]) -> - H + 1. +first_free_num(Nums) -> + %% 0 is default node_num, so lets start from 1 + [FirstFreeNum | _] = lists:seq(1, length(Nums)+1) -- Nums, + FirstFreeNum. -ifdef(TEST). jid_to_opt_binary_test_() -> - [?_assertEqual(0, next_free_num([])), - ?_assertEqual(3, next_free_num([1, 2, 5])), - ?_assertEqual(3, next_free_num([1, 2]))]. 
+ [?_assertEqual(1, first_free_num([])), + ?_assertEqual(3, first_free_num([1, 2, 5])), + ?_assertEqual(1, first_free_num([2, 5])), + ?_assertEqual(3, first_free_num([1, 2]))]. -endif.