diff --git a/big_tests/tests/rdbms_SUITE.erl b/big_tests/tests/rdbms_SUITE.erl index 1a6ee50aef4..bff75958852 100644 --- a/big_tests/tests/rdbms_SUITE.erl +++ b/big_tests/tests/rdbms_SUITE.erl @@ -17,6 +17,7 @@ -module(rdbms_SUITE). -compile([export_all, nowarn_export_all]). +-include_lib("common_test/include/ct.hrl"). -include_lib("eunit/include/eunit.hrl"). %% We need assert from it @@ -31,10 +32,16 @@ %%-------------------------------------------------------------------- all() -> - [{group, rdbms_queries}]. + [ + {group, global_rdbms_queries}, + {group, tagged_rdbms_queries} + ]. groups() -> - [{rdbms_queries, [], rdbms_queries_cases()}]. + [ + {global_rdbms_queries, [], rdbms_queries_cases()}, + {tagged_rdbms_queries, [], rdbms_queries_cases()} + ]. rdbms_queries_cases() -> [select_one_works_case, @@ -99,6 +106,18 @@ init_per_suite(Config) -> end_per_suite(Config) -> escalus:end_per_suite(Config). +init_per_group(tagged_rdbms_queries, Config) -> + ExtraConfig = stop_global_default_pool(), + start_local_host_type_pool(ExtraConfig), + ExtraConfig ++ Config; +init_per_group(global_rdbms_queries, Config) -> + [{tag, global} | Config]. + +end_per_group(tagged_rdbms_queries, Config) -> + restart_global_default_pool(Config); +end_per_group(global_rdbms_queries, Config) -> + Config. + init_per_testcase(test_incremental_upsert, Config) -> erase_inbox(Config), escalus:init_per_testcase(test_incremental_upsert, Config); @@ -325,7 +344,13 @@ read_prep_boolean_case(Config) -> select_current_timestamp_case(Config) -> ok = rpc(mim(), mongoose_rdbms_timestamp, prepare, []), - assert_is_integer(rpc(mim(), mongoose_rdbms_timestamp, select, [])). + Res = case ?config(tag, Config) of + global -> + rpc(mim(), mongoose_rdbms_timestamp, select, []); + Tag -> + rpc(mim(), mongoose_rdbms_timestamp, select, [host_type(), Tag]) + end, + assert_is_integer(Res). assert_is_integer(X) when is_integer(X) -> X. @@ -522,7 +547,8 @@ test_failed_wrapper_transaction(Config) -> end, % when - F = fun() -> sql_execute_wrapped_request(Config, insert_one, [<<"check1">>], WrapperFun) end, + ScopeAndTag = scope_and_tag(Config), + F = fun() -> sql_execute_wrapped_request(ScopeAndTag, insert_one, [<<"check1">>], WrapperFun) end, sql_transaction(Config, F), % then @@ -590,9 +616,18 @@ select_like_prep_case(Config) -> %%-------------------------------------------------------------------- %% Helpers %%-------------------------------------------------------------------- +tag() -> + extra_tag. + +scope_and_tag(Config) -> + case ?config(tag, Config) of + global -> [host_type()]; + Tag -> [host_type(), Tag] + end. -sql_query(_Config, Query) -> - slow_rpc(mongoose_rdbms, sql_query, [host_type(), Query]). +sql_query(Config, Query) -> + ScopeAndTag = scope_and_tag(Config), + slow_rpc(mongoose_rdbms, sql_query, ScopeAndTag ++ [Query]). sql_prepare(_Config, Name, Table, Fields, Query) -> escalus_ejabberd:rpc(mongoose_rdbms, prepare, [Name, Table, Fields, Query]). @@ -600,39 +635,52 @@ sql_prepare(_Config, Name, Table, Fields, Query) -> sql_prepare_upsert(_Config, Name, Table, Insert, Update, Unique, Incr) -> escalus_ejabberd:rpc(rdbms_queries, prepare_upsert, [host_type(), Name, Table, Insert, Update, Unique, Incr]). -sql_execute(_Config, Name, Parameters) -> - slow_rpc(mongoose_rdbms, execute, [host_type(), Name, Parameters]). +sql_execute(Config, Name, Parameters) -> + ScopeAndTag = scope_and_tag(Config), + slow_rpc(mongoose_rdbms, execute, ScopeAndTag ++ [Name, Parameters]). -sql_execute_cast(_Config, Name, Parameters) -> - slow_rpc(mongoose_rdbms, execute_cast, [host_type(), Name, Parameters]). +sql_execute_cast(Config, Name, Parameters) -> + ScopeAndTag = scope_and_tag(Config), + slow_rpc(mongoose_rdbms, execute_cast, ScopeAndTag ++ [Name, Parameters]). -sql_query_cast(_Config, Query) -> - slow_rpc(mongoose_rdbms, sql_query_cast, [host_type(), Query]). +sql_query_cast(Config, Query) -> + ScopeAndTag = scope_and_tag(Config), + slow_rpc(mongoose_rdbms, sql_query_cast, ScopeAndTag ++ [Query]). -sql_execute_request(_Config, Name, Parameters) -> - slow_rpc(mongoose_rdbms, execute_request, [host_type(), Name, Parameters]). +sql_execute_request(Config, Name, Parameters) -> + ScopeAndTag = scope_and_tag(Config), + slow_rpc(mongoose_rdbms, execute_request, ScopeAndTag ++ [Name, Parameters]). -sql_execute_wrapped_request(_Config, Name, Parameters, WrapperFun) -> - slow_rpc(mongoose_rdbms, execute_wrapped_request, [host_type(), Name, Parameters, WrapperFun]). +sql_execute_wrapped_request(ScopeAndTag, Name, Parameters, WrapperFun) -> + slow_rpc(mongoose_rdbms, execute_wrapped_request, ScopeAndTag ++ [Name, Parameters, WrapperFun]). -sql_execute_wrapped_request_and_wait_response(_Config, Name, Parameters, WrapperFun) -> - slow_rpc(?MODULE, execute_wrapped_request_and_wait_response, [host_type(), Name, Parameters, WrapperFun]). +sql_execute_wrapped_request_and_wait_response(Config, Name, Parameters, WrapperFun) -> + ScopeAndTag = scope_and_tag(Config), + slow_rpc(?MODULE, execute_wrapped_request_and_wait_response, ScopeAndTag ++ [Name, Parameters, WrapperFun]). + +execute_wrapped_request_and_wait_response(HostType, Tag, Name, Parameters, WrapperFun) -> + RequestId = mongoose_rdbms:execute_wrapped_request(HostType, Tag, Name, Parameters, WrapperFun), + gen_server:wait_response(RequestId, 100). execute_wrapped_request_and_wait_response(HostType, Name, Parameters, WrapperFun) -> RequestId = mongoose_rdbms:execute_wrapped_request(HostType, Name, Parameters, WrapperFun), gen_server:wait_response(RequestId, 100). -sql_execute_upsert(_Config, Name, Insert, Update, Unique) -> - slow_rpc(rdbms_queries, execute_upsert, [host_type(), Name, Insert, Update, Unique]). +sql_execute_upsert(Config, Name, Insert, Update, Unique) -> + ScopeAndTag = scope_and_tag(Config), + slow_rpc(rdbms_queries, execute_upsert, ScopeAndTag ++ [Name, Insert, Update, Unique]). -sql_query_request(_Config, Query) -> - slow_rpc(mongoose_rdbms, sql_query_request, [host_type(), Query]). +sql_query_request(Config, Query) -> + ScopeAndTag = scope_and_tag(Config), + slow_rpc(mongoose_rdbms, sql_query_request, ScopeAndTag ++ [Query]). -sql_transaction_request(_Config, Query) -> - slow_rpc(mongoose_rdbms, sql_transaction_request, [host_type(), Query]). +sql_transaction_request(Config, Query) -> + ScopeAndTag = scope_and_tag(Config), + slow_rpc(mongoose_rdbms, sql_transaction_request, ScopeAndTag ++ [Query]). -sql_transaction(_Config, F) -> - slow_rpc(mongoose_rdbms, sql_transaction, [host_type(), F]). +sql_transaction(Config, F) -> + ScopeAndTag = scope_and_tag(Config), + slow_rpc(mongoose_rdbms, sql_transaction, ScopeAndTag ++ [F]). escape_null(_Config) -> escalus_ejabberd:rpc(mongoose_rdbms, escape_null, []). @@ -1118,6 +1166,40 @@ is_pgsql() -> is_mysql() -> db_engine() == mysql. +stop_global_default_pool() -> + Pools = rpc(mim(), mongoose_config, get_opt, [outgoing_pools]), + [GlobalRdbmsPool] = [Pool || Pool = #{type := rdbms, scope := global, tag := default} <- Pools], + ok = rpc(mim(), mongoose_wpool, stop, [rdbms, global, default]), + Extra = maybe_stop_service_domain_db(), + [{tag, tag()}, {global_default_rdbms_pool, GlobalRdbmsPool} | Extra]. + +restart_global_default_pool(Config) -> + GlobalRdbmsPool = ?config(global_default_rdbms_pool, Config), + rpc(mim(), mongoose_wpool, start_configured_pools, [[GlobalRdbmsPool]]), + maybe_restart_service_domain_db(Config). + +maybe_stop_service_domain_db() -> + case rpc(mim(), erlang, whereis, [service_domain_db]) of + undefined -> + []; + ServiceDomainDB when is_pid(ServiceDomainDB) -> + ok = rpc(mim(), sys, suspend, [ServiceDomainDB]), + [{service_domain_db, ServiceDomainDB}] + end. + +maybe_restart_service_domain_db(Config) -> + case ?config(service_domain_db, Config) of + undefined -> + ok; + ServiceDomainDB -> + ok = rpc(mim(), sys, resume, [ServiceDomainDB]) + end. + +start_local_host_type_pool(Config) -> + GlobalRdbmsPool = ?config(global_default_rdbms_pool, Config), + LocalHostTypePool = GlobalRdbmsPool#{scope := host_type(), tag := tag()}, + rpc(mim(), mongoose_wpool, start_configured_pools, [[LocalHostTypePool], [host_type()]]). + escape_column(Name) -> case is_mysql() of true -> diff --git a/doc/configuration/outgoing-connections.md b/doc/configuration/outgoing-connections.md index ed99a563ffa..5e5bcc6d01d 100644 --- a/doc/configuration/outgoing-connections.md +++ b/doc/configuration/outgoing-connections.md @@ -30,6 +30,10 @@ This allows you to create multiple dedicated pools of the same type. * `host` - the pool will be started for each XMPP host or host type served by MongooseIM * `single_host` - the pool will be started for the selected host or host type only (you must provide the name). + !!! Note + A pool with scope `global` and tag `default` is used by services that are not configured by host_type, like `service_domain_db` or `service_mongoose_system_metrics`, or by modules that don't support dynamic domains, like `mod_pubsub`. + If a global default pool is not configured, these services will fail. + ## Worker pool options All pools are managed by the [inaka/worker_pool](https://github.com/inaka/worker_pool) library. diff --git a/src/rdbms/mongoose_rdbms.erl b/src/rdbms/mongoose_rdbms.erl index 19ba47ad29b..7fa9c0ba073 100644 --- a/src/rdbms/mongoose_rdbms.erl +++ b/src/rdbms/mongoose_rdbms.erl @@ -67,71 +67,62 @@ %% External exports -export([prepare/4, prepared/1, - execute/3, - execute_cast/3, - execute_request/3, - execute_wrapped_request/4, - execute_successfully/3, - sql_query/2, - sql_query_cast/2, - sql_query_request/2, + execute/3, execute/4, + execute_cast/3, execute_cast/4, + execute_request/3, execute_request/4, + execute_wrapped_request/4, execute_wrapped_request/5, + execute_successfully/3, execute_successfully/4, + sql_query/2, sql_query/3, + sql_query_cast/2, sql_query_cast/3, + sql_query_request/2, sql_query_request/3, + sql_transaction/2, sql_transaction/3, + sql_transaction_request/2, sql_transaction_request/3, + sql_dirty/2, sql_dirty/3, sql_query_t/1, - sql_transaction/2, - sql_transaction_request/2, transaction_with_delayed_retry/3, - sql_dirty/2, to_bool/1, db_engine/1, db_type/0, use_escaped/1]). %% Unicode escaping --export([escape_string/1, - use_escaped_string/1]). +-export([escape_string/1, use_escaped_string/1]). %% Integer escaping --export([escape_integer/1, - use_escaped_integer/1]). +-export([escape_integer/1, use_escaped_integer/1]). %% Boolean escaping --export([escape_boolean/1, - use_escaped_boolean/1]). +-export([escape_boolean/1, use_escaped_boolean/1]). %% LIKE escaping --export([escape_like/1, - escape_prepared_like/1, - escape_like_prefix/1, - use_escaped_like/1]). +-export([escape_like/1, escape_prepared_like/1, escape_like_prefix/1, use_escaped_like/1]). %% BLOB escaping --export([escape_binary/2, - unescape_binary/2, - use_escaped_binary/1]). +-export([escape_binary/2, unescape_binary/2, use_escaped_binary/1]). %% Null escaping %% (to keep uniform pattern of passing values) --export([escape_null/0, - use_escaped_null/1]). +-export([escape_null/0, use_escaped_null/1]). %% count / integra types decoding --export([result_to_integer/1, - selected_to_integer/1]). +-export([result_to_integer/1, selected_to_integer/1]). -export([character_to_integer/1]). %% gen_server callbacks --export([init/1, - handle_call/3, - handle_cast/2, - handle_info/2, - terminate/2, - code_change/3]). - --ignore_xref([sql_query_cast/2, sql_query_request/2, - execute_cast/3, execute_request/3, - execute_wrapped_request/4, - sql_transaction_request/2, - sql_query_t/1, use_escaped/1, +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). + +%% External exports +-ignore_xref([ + execute/4, execute_cast/3, execute_cast/4, + execute_request/3, execute_request/4, + execute_wrapped_request/4, execute_wrapped_request/5, + execute_successfully/4, + sql_query/3, sql_query_cast/2, sql_query_cast/3, + sql_query_request/2, sql_query_request/3, + sql_transaction/3, sql_transaction_request/2, sql_transaction_request/3, + sql_dirty/3, sql_query_t/1, + use_escaped/1, escape_like/1, escape_like_prefix/1, use_escaped_like/1, escape_binary/2, use_escaped_binary/1, escape_integer/1, use_escaped_integer/1, @@ -152,6 +143,7 @@ }). -type state() :: #state{}. +-define(DEFAULT_POOL_TAG, default). -define(STATE_KEY, mongoose_rdbms_state). -define(MAX_TRANSACTION_RESTARTS, 10). -define(TRANSACTION_TIMEOUT, 60000). % milliseconds @@ -160,7 +152,8 @@ %% the retry counter runs out. We just attempt to reduce log pollution. -define(CONNECT_RETRIES, 5). --type server() :: mongooseim:host_type() | global. +-type query_name() :: atom(). +-type query_params() :: [term()]. -type request_wrapper() :: fun((fun(() -> T)) -> T). -type rdbms_msg() :: {sql_query, _} | {sql_transaction, fun()} @@ -175,9 +168,7 @@ -type query_result() :: single_query_result() | [single_query_result()]. -type transaction_result() :: {aborted, _} | {atomic, _} | {error, _}. -type dirty_result() :: {ok, any()} | {error, any()}. --export_type([query_result/0, - transaction_result/0, - server/0]). +-export_type([query_name/0, query_result/0, transaction_result/0]). -type options() :: #{driver := pgsql | mysql | odbc, max_start_interval := pos_integer(), @@ -210,10 +201,9 @@ ensure_db_port(Opts = #{port := _}) -> Opts; ensure_db_port(Opts = #{driver := pgsql}) -> Opts#{port => 5432}; ensure_db_port(Opts = #{driver := mysql}) -> Opts#{port => 3306}. --spec prepare(Name, Table :: binary() | atom(), Fields :: [binary() | atom()], - Statement :: iodata()) -> - {ok, Name} | {error, already_exists} - when Name :: atom(). +-spec prepare( + query_name(), Table :: binary() | atom(), Fields :: [binary() | atom()], Statement :: iodata()) -> + {ok, query_name()} | {error, already_exists}. prepare(Name, Table, Fields, Statement) when is_atom(Table) -> prepare(Name, atom_to_binary(Table, utf8), Fields, Statement); prepare(Name, Table, [Field | _] = Fields, Statement) when is_atom(Field) -> @@ -230,35 +220,55 @@ prepare(Name, Table, Fields, Statement) when is_atom(Name), is_binary(Table) -> prepared(Name) -> ets:member(prepared_statements, Name). --spec execute(HostType :: server(), Name :: atom(), Parameters :: [term()]) -> - query_result(). -execute(HostType, Name, Parameters) when is_atom(Name), is_list(Parameters) -> - sql_call(HostType, {sql_execute, Name, Parameters}). +-spec execute(mongooseim:host_type_or_global(), query_name(), query_params()) -> query_result(). +execute(HostType, Name, Parameters) -> + execute(HostType, ?DEFAULT_POOL_TAG, Name, Parameters). + +-spec execute(mongooseim:host_type_or_global(), mongoose_wpool:tag(), query_name(), query_params()) -> + query_result(). +execute(HostType, PoolTag, Name, Parameters) when is_atom(PoolTag), is_atom(Name), is_list(Parameters) -> + sql_call(HostType, PoolTag, {sql_execute, Name, Parameters}). --spec execute_cast(HostType :: server(), Name :: atom(), Parameters :: [term()]) -> - query_result(). -execute_cast(HostType, Name, Parameters) when is_atom(Name), is_list(Parameters) -> - sql_cast(HostType, {sql_execute, Name, Parameters}). +-spec execute_cast(mongooseim:host_type_or_global(), query_name(), query_params()) -> query_result(). +execute_cast(HostType, Name, Parameters) -> + execute_cast(HostType, ?DEFAULT_POOL_TAG, Name, Parameters). --spec execute_request(HostType :: server(), Name :: atom(), Parameters :: [term()]) -> - request_id(). +-spec execute_cast(mongooseim:host_type_or_global(), mongoose_wpool:tag(), query_name(), query_params()) -> + query_result(). +execute_cast(HostType, PoolTag, Name, Parameters) when is_atom(PoolTag), is_atom(Name), is_list(Parameters) -> + sql_cast(HostType, PoolTag, {sql_execute, Name, Parameters}). + +-spec execute_request(mongooseim:host_type_or_global(), query_name(), query_params()) -> request_id(). execute_request(HostType, Name, Parameters) when is_atom(Name), is_list(Parameters) -> - sql_request(HostType, {sql_execute, Name, Parameters}). + execute_request(HostType, ?DEFAULT_POOL_TAG, Name, Parameters). + +-spec execute_request(mongooseim:host_type_or_global(), mongoose_wpool:tag(), query_name(), query_params()) -> + request_id(). +execute_request(HostType, PoolTag, Name, Parameters) when is_atom(PoolTag), is_atom(Name), is_list(Parameters) -> + sql_request(HostType, PoolTag, {sql_execute, Name, Parameters}). + +-spec execute_wrapped_request(mongooseim:host_type_or_global(), query_name(), query_params(), request_wrapper()) -> + request_id(). +execute_wrapped_request(HostType, Name, Parameters, Wrapper) -> + execute_wrapped_request(HostType, ?DEFAULT_POOL_TAG, Name, Parameters, Wrapper). -spec execute_wrapped_request( - HostType :: server(), - Name :: atom(), - Parameters :: [term()], - Wrapper :: request_wrapper()) -> request_id(). -execute_wrapped_request(HostType, Name, Parameters, Wrapper) - when is_atom(Name), is_list(Parameters), is_function(Wrapper) -> - sql_request(HostType, {sql_execute_wrapped, Name, Parameters, Wrapper}). + mongooseim:host_type_or_global(), mongoose_wpool:tag(), query_name(), query_params(), request_wrapper()) -> + request_id(). +execute_wrapped_request(HostType, PoolTag, Name, Parameters, Wrapper) + when is_atom(PoolTag), is_atom(Name), is_list(Parameters), is_function(Wrapper) -> + sql_request(HostType, PoolTag, {sql_execute_wrapped, Name, Parameters, Wrapper}). %% Same as execute/3, but would fail loudly on any error. --spec execute_successfully(HostType :: server(), Name :: atom(), Parameters :: [term()]) -> - query_result(). +-spec execute_successfully(mongooseim:host_type_or_global(), query_name(), query_params()) -> + query_result(). execute_successfully(HostType, Name, Parameters) -> - try execute(HostType, Name, Parameters) of + execute_successfully(HostType, ?DEFAULT_POOL_TAG, Name, Parameters). + +-spec execute_successfully(mongooseim:host_type_or_global(), mongoose_wpool:tag(), query_name(), query_params()) -> + query_result(). +execute_successfully(HostType, PoolTag, Name, Parameters) -> + try execute(HostType, PoolTag, Name, Parameters) of {selected, _} = Result -> Result; {updated, _} = Result -> @@ -286,38 +296,66 @@ query_name_to_string(Name) -> Statement end. --spec sql_query(HostType :: server(), Query :: any()) -> query_result(). +-spec sql_query(mongooseim:host_type_or_global(), Query :: any()) -> query_result(). sql_query(HostType, Query) -> - sql_call(HostType, {sql_query, Query}). + sql_query(HostType, ?DEFAULT_POOL_TAG, Query). + +-spec sql_query(mongooseim:host_type_or_global(), mongoose_wpool:tag(), Query :: any()) -> + query_result(). +sql_query(HostType, PoolTag, Query) -> + sql_call(HostType, PoolTag, {sql_query, Query}). --spec sql_query_request(HostType :: server(), Query :: any()) -> request_id(). +-spec sql_query_request(mongooseim:host_type_or_global(), Query :: any()) -> request_id(). sql_query_request(HostType, Query) -> - sql_request(HostType, {sql_query, Query}). + sql_query_request(HostType, ?DEFAULT_POOL_TAG, Query). + +-spec sql_query_request(mongooseim:host_type_or_global(), mongoose_wpool:tag(), Query :: any()) -> + request_id(). +sql_query_request(HostType, PoolTag, Query) -> + sql_request(HostType, PoolTag, {sql_query, Query}). --spec sql_query_cast(HostType :: server(), Query :: any()) -> query_result(). +-spec sql_query_cast(mongooseim:host_type_or_global(), Query :: any()) -> query_result(). sql_query_cast(HostType, Query) -> - sql_cast(HostType, {sql_query, Query}). + sql_query_cast(HostType, ?DEFAULT_POOL_TAG, Query). + +-spec sql_query_cast(mongooseim:host_type_or_global(), mongoose_wpool:tag(), Query :: any()) -> + query_result(). +sql_query_cast(HostType, PoolTag, Query) -> + sql_cast(HostType, PoolTag, {sql_query, Query}). %% @doc SQL transaction based on a list of queries --spec sql_transaction(server(), fun() | maybe_improper_list()) -> transaction_result(). -sql_transaction(HostType, Queries) when is_list(Queries) -> +-spec sql_transaction(mongooseim:host_type_or_global(), fun() | maybe_improper_list()) -> + transaction_result(). +sql_transaction(HostType, Msg) -> + sql_transaction(HostType, ?DEFAULT_POOL_TAG, Msg). + +-spec sql_transaction(mongooseim:host_type_or_global(), atom(), fun() | maybe_improper_list()) -> + transaction_result(). +sql_transaction(HostType, PoolTag, Queries) when is_atom(PoolTag), is_list(Queries) -> F = fun() -> lists:map(fun sql_query_t/1, Queries) end, - sql_transaction(HostType, F); + sql_transaction(HostType, PoolTag, F); %% SQL transaction, based on a erlang anonymous function (F = fun) -sql_transaction(HostType, F) when is_function(F) -> - sql_call(HostType, {sql_transaction, F}). +sql_transaction(HostType, PoolTag, F) when is_atom(PoolTag), is_function(F) -> + sql_call(HostType, PoolTag, {sql_transaction, F}). %% @doc SQL transaction based on a list of queries --spec sql_transaction_request(server(), fun() | maybe_improper_list()) -> request_id(). -sql_transaction_request(HostType, Queries) when is_list(Queries) -> +-spec sql_transaction_request(mongooseim:host_type_or_global(), fun() | maybe_improper_list()) -> + request_id(). +sql_transaction_request(HostType, Queries) -> + sql_transaction_request(HostType, ?DEFAULT_POOL_TAG, Queries). + +-spec sql_transaction_request(mongooseim:host_type_or_global(), atom(), fun() | maybe_improper_list()) -> + request_id(). +sql_transaction_request(HostType, PoolTag, Queries) when is_atom(PoolTag), is_list(Queries) -> F = fun() -> lists:map(fun sql_query_t/1, Queries) end, - sql_transaction_request(HostType, F); + sql_transaction_request(HostType, PoolTag, F); %% SQL transaction, based on a erlang anonymous function (F = fun) -sql_transaction_request(HostType, F) when is_function(F) -> - sql_request(HostType, {sql_transaction, F}). +sql_transaction_request(HostType, PoolTag, F) when is_atom(PoolTag), is_function(F) -> + sql_request(HostType, PoolTag, {sql_transaction, F}). %% This function allows to specify delay between retries. --spec transaction_with_delayed_retry(server(), fun() | maybe_improper_list(), map()) -> transaction_result(). +-spec transaction_with_delayed_retry(mongooseim:host_type_or_global(), fun() | maybe_improper_list(), map()) -> + transaction_result(). transaction_with_delayed_retry(HostType, F, Info) -> Retries = maps:get(retries, Info), Delay = maps:get(delay, Info), @@ -342,60 +380,64 @@ do_transaction_with_delayed_retry(HostType, F, Retries, Delay, Info) -> erlang:error(Err) end. --spec sql_dirty(server(), fun()) -> any() | no_return(). -sql_dirty(HostType, F) when is_function(F) -> - case sql_call(HostType, {sql_dirty, F}) of +-spec sql_dirty(mongooseim:host_type_or_global(), fun()) -> any() | no_return(). +sql_dirty(HostType, F) -> + sql_dirty(HostType, ?DEFAULT_POOL_TAG, F). + +-spec sql_dirty(mongooseim:host_type_or_global(), atom(), fun()) -> any() | no_return(). +sql_dirty(HostType, PoolTag, F) when is_function(F) -> + case sql_call(HostType, PoolTag, {sql_dirty, F}) of {ok, Result} -> Result; {error, Error} -> throw(Error) end. %% TODO: Better spec for RPC calls --spec sql_call(HostType :: server(), Msg :: rdbms_msg()) -> any(). -sql_call(HostType, Msg) -> +-spec sql_call(mongooseim:host_type_or_global(), mongoose_wpool:tag(), Msg :: rdbms_msg()) -> any(). +sql_call(HostType, PoolTag, Msg) -> case get_state() of - undefined -> sql_call0(HostType, Msg); + undefined -> sql_call0(HostType, PoolTag, Msg); State -> {Res, NewState} = nested_op(Msg, State), put_state(NewState), Res end. --spec sql_call0(HostType :: server(), Msg :: rdbms_msg()) -> any(). -sql_call0(HostType, Msg) -> +-spec sql_call0(mongooseim:host_type_or_global(), mongoose_wpool:tag(), Msg :: rdbms_msg()) -> any(). +sql_call0(HostType, PoolTag, Msg) -> Timestamp = erlang:monotonic_time(millisecond), - mongoose_wpool:call(rdbms, HostType, {sql_cmd, Msg, Timestamp}). + mongoose_wpool:call(rdbms, HostType, PoolTag, {sql_cmd, Msg, Timestamp}). --spec sql_request(HostType :: server(), Msg :: rdbms_msg()) -> any(). -sql_request(HostType, Msg) -> +-spec sql_request(mongooseim:host_type_or_global(), mongoose_wpool:tag(), Msg :: rdbms_msg()) -> any(). +sql_request(HostType, PoolTag, Msg) -> case get_state() of - undefined -> sql_request0(HostType, Msg); + undefined -> sql_request0(HostType, PoolTag, Msg); State -> {Res, NewState} = nested_op(Msg, State), put_state(NewState), Res end. --spec sql_request0(HostType :: server(), Msg :: rdbms_msg()) -> any(). -sql_request0(HostType, Msg) -> +-spec sql_request0(mongooseim:host_type_or_global(), mongoose_wpool:tag(), Msg :: rdbms_msg()) -> any(). +sql_request0(HostType, PoolTag, Msg) -> Timestamp = erlang:monotonic_time(millisecond), - mongoose_wpool:send_request(rdbms, HostType, {sql_cmd, Msg, Timestamp}). + mongoose_wpool:send_request(rdbms, HostType, PoolTag, {sql_cmd, Msg, Timestamp}). --spec sql_cast(HostType :: server(), Msg :: rdbms_msg()) -> any(). -sql_cast(HostType, Msg) -> +-spec sql_cast(mongooseim:host_type_or_global(), mongoose_wpool:tag(), Msg :: rdbms_msg()) -> any(). +sql_cast(HostType, PoolTag, Msg) -> case get_state() of - undefined -> sql_cast0(HostType, Msg); + undefined -> sql_cast0(HostType, PoolTag, Msg); State -> {Res, NewState} = nested_op(Msg, State), put_state(NewState), Res end. --spec sql_cast0(HostType :: server(), Msg :: rdbms_msg()) -> any(). -sql_cast0(HostType, Msg) -> +-spec sql_cast0(mongooseim:host_type_or_global(), mongoose_wpool:tag(), Msg :: rdbms_msg()) -> any(). +sql_cast0(HostType, PoolTag, Msg) -> Timestamp = erlang:monotonic_time(millisecond), - mongoose_wpool:cast(rdbms, HostType, {sql_cmd, Msg, Timestamp}). + mongoose_wpool:cast(rdbms, HostType, PoolTag, {sql_cmd, Msg, Timestamp}). --spec get_db_info(Target :: server() | pid()) -> +-spec get_db_info(Target :: mongooseim:host_type_or_global() | pid()) -> {ok, DbType :: atom(), DbRef :: term()} | {error, any()}. get_db_info(Pid) when is_pid(Pid) -> wpool_process:call(Pid, get_db_info, 5000); @@ -442,7 +484,7 @@ escape_like(S) -> escape_like_prefix(S) -> {escaped_like, [$', escape_like_internal(S), $%, $']}. --spec escape_binary(server(), binary()) -> escaped_binary(). +-spec escape_binary(mongooseim:host_type_or_global(), binary()) -> escaped_binary(). escape_binary(_HostType, Bin) when is_binary(Bin) -> {escaped_binary, mongoose_rdbms_backend:escape_binary(Bin)}. @@ -582,7 +624,7 @@ escape_character($\\) -> "\\\\"; escape_character(C) -> C. --spec unescape_binary(server(), binary()) -> binary(). +-spec unescape_binary(mongooseim:host_type_or_global(), binary()) -> binary(). unescape_binary(_HostType, Bin) when is_binary(Bin) -> mongoose_rdbms_backend:unescape_binary(Bin). @@ -816,7 +858,7 @@ sql_dirty_internal(F, State) -> end, {Result, erase_state()}. --spec sql_execute(Type :: atom(), Name :: atom(), Params :: [term()], state()) -> +-spec sql_execute(Type :: atom(), query_name(), query_params(), state()) -> {query_result(), state()}. sql_execute(Type, Name, Params, State = #state{db_ref = DBRef, query_timeout = QueryTimeout}) -> %% Postgres allows to prepare statement only once, so we should take care that NewState is updated @@ -844,7 +886,7 @@ check_execute_result(nested_op, Res, Name, Params) -> ok end. --spec prepare_statement(Name :: atom(), state()) -> {Ref :: term(), state()}. +-spec prepare_statement(query_name(), state()) -> {Ref :: term(), state()}. prepare_statement(Name, State = #state{db_ref = DBRef, prepared = Prepared}) -> case maps:get(Name, Prepared, undefined) of undefined -> @@ -870,7 +912,7 @@ abort_on_driver_error({error, "Failed sending data on socket" ++ _}) -> %% mysql abort_on_driver_error(_) -> continue. --spec db_engine(HostType :: server()) -> odbc | mysql | pgsql | undefined. +-spec db_engine(mongooseim:host_type_or_global()) -> odbc | mysql | pgsql | undefined. db_engine(_HostType) -> try mongoose_backend:get_backend_name(global, ?MODULE) catch error:badarg -> undefined end. diff --git a/src/rdbms/mongoose_rdbms_timestamp.erl b/src/rdbms/mongoose_rdbms_timestamp.erl index 9b57434b292..48da86826cf 100644 --- a/src/rdbms/mongoose_rdbms_timestamp.erl +++ b/src/rdbms/mongoose_rdbms_timestamp.erl @@ -1,6 +1,8 @@ -module(mongoose_rdbms_timestamp). -export([prepare/0, - select/0]). + select/0, + select/2]). +-ignore_xref([select/2]). -spec prepare() -> ok. prepare() -> @@ -19,7 +21,10 @@ select_query() -> error({prepare_timestamp_query_failed, Other}) end. --spec select() -> integer(). +-spec select(mongooseim:host_type_or_global(), mongoose_wpool:tag()) -> integer(). select() -> - Res = mongoose_rdbms:execute_successfully(global, mim_timestamp, []), + select(global, default). + +select(HostType, PoolTag) -> + Res = mongoose_rdbms:execute_successfully(HostType, PoolTag, mim_timestamp, []), mongoose_rdbms:selected_to_integer(Res). %% ensure it is an integer diff --git a/src/rdbms/rdbms_queries.erl b/src/rdbms/rdbms_queries.erl index fc1ca83bd77..22146793288 100644 --- a/src/rdbms/rdbms_queries.erl +++ b/src/rdbms/rdbms_queries.erl @@ -43,11 +43,12 @@ -export([join/2, prepare_upsert/6, prepare_upsert/7, - execute_upsert/5, + execute_upsert/5, execute_upsert/6, request_upsert/5]). -ignore_xref([ - count_records_where/3, get_db_specific_limits/1, get_db_specific_offset/2, get_db_type/0 + execute_upsert/6, count_records_where/3, + get_db_specific_limits/1, get_db_specific_offset/2, get_db_type/0 ]). %% We have only two compile time options for db queries: @@ -82,61 +83,70 @@ join([H|T], Sep) -> get_db_type() -> ?RDBMS_TYPE. --spec execute_upsert(Host :: mongoose_rdbms:server(), +-spec execute_upsert(HostType :: mongooseim:host_type_or_global(), Name :: atom(), InsertParams :: [any()], UpdateParams :: [any()], UniqueKeyValues :: [any()]) -> mongoose_rdbms:query_result(). -execute_upsert(Host, Name, InsertParams, UpdateParams, UniqueKeyValues) -> - case {mongoose_rdbms:db_engine(Host), mongoose_rdbms:db_type()} of +execute_upsert(HostType, Name, InsertParams, UpdateParams, UniqueKeyValues) -> + execute_upsert(HostType, default, Name, InsertParams, UpdateParams, UniqueKeyValues). + +-spec execute_upsert(HostType :: mongooseim:host_type_or_global(), + PoolTag :: mongoose_wpool:tag(), + Name :: atom(), + InsertParams :: [any()], + UpdateParams :: [any()], + UniqueKeyValues :: [any()]) -> mongoose_rdbms:query_result(). +execute_upsert(HostType, PoolTag, Name, InsertParams, UpdateParams, UniqueKeyValues) -> + case {mongoose_rdbms:db_engine(HostType), mongoose_rdbms:db_type()} of {mysql, _} -> - mongoose_rdbms:execute(Host, Name, InsertParams ++ UpdateParams); + mongoose_rdbms:execute(HostType, PoolTag, Name, InsertParams ++ UpdateParams); {pgsql, _} -> - mongoose_rdbms:execute(Host, Name, InsertParams ++ UpdateParams); + mongoose_rdbms:execute(HostType, PoolTag, Name, InsertParams ++ UpdateParams); {odbc, mssql} -> - mongoose_rdbms:execute(Host, Name, UniqueKeyValues ++ InsertParams ++ UpdateParams); + mongoose_rdbms:execute(HostType, PoolTag, Name, UniqueKeyValues ++ InsertParams ++ UpdateParams); NotSupported -> erlang:error({rdbms_not_supported, NotSupported}) end. --spec request_upsert(Host :: mongoose_rdbms:server(), +-spec request_upsert(HostType :: mongooseim:host_type_or_global(), Name :: atom(), InsertParams :: [any()], UpdateParams :: [any()], UniqueKeyValues :: [any()]) -> request_id(). -request_upsert(Host, Name, InsertParams, UpdateParams, UniqueKeyValues) -> - case {mongoose_rdbms:db_engine(Host), mongoose_rdbms:db_type()} of +request_upsert(HostType, Name, InsertParams, UpdateParams, UniqueKeyValues) -> + case {mongoose_rdbms:db_engine(HostType), mongoose_rdbms:db_type()} of {mysql, _} -> - mongoose_rdbms:execute_request(Host, Name, InsertParams ++ UpdateParams); + mongoose_rdbms:execute_request(HostType, Name, InsertParams ++ UpdateParams); {pgsql, _} -> - mongoose_rdbms:execute_request(Host, Name, InsertParams ++ UpdateParams); + mongoose_rdbms:execute_request(HostType, Name, InsertParams ++ UpdateParams); {odbc, mssql} -> - mongoose_rdbms:execute_request(Host, Name, UniqueKeyValues ++ InsertParams ++ UpdateParams); + mongoose_rdbms:execute_request(HostType, Name, UniqueKeyValues ++ InsertParams ++ UpdateParams); NotSupported -> erlang:error({rdbms_not_supported, NotSupported}) end. %% @doc %% This functions prepares query for inserting a row or updating it if already exists %% Updates can be either fields or literal expressions like "column = tab.column + 1" --spec prepare_upsert(Host :: mongoose_rdbms:server(), - QueryName :: atom(), +-spec prepare_upsert(HostType :: mongooseim:host_type_or_global(), + QueryName :: mongoose_rdbms:query_name(), TableName :: atom(), InsertFields :: [binary()], Updates :: [binary() | {assignment | expression, binary(), binary()}], UniqueKeyFields :: [binary()]) -> - {ok, QueryName :: atom()} | {error, already_exists}. -prepare_upsert(Host, Name, Table, InsertFields, Updates, UniqueKeyFields) -> - prepare_upsert(Host, Name, Table, InsertFields, Updates, UniqueKeyFields, none). + {ok, QueryName :: mongoose_rdbms:query_name()} | {error, already_exists}. +prepare_upsert(HostType, Name, Table, InsertFields, Updates, UniqueKeyFields) -> + prepare_upsert(HostType, Name, Table, InsertFields, Updates, UniqueKeyFields, none). --spec prepare_upsert(Host :: mongoose_rdbms:server(), - QueryName :: atom(), +-spec prepare_upsert(HostType :: mongooseim:host_type_or_global(), + QueryName :: mongoose_rdbms:query_name(), TableName :: atom(), InsertFields :: [ColumnName :: binary()], Updates :: [binary() | {assignment | expression, binary(), binary()}], UniqueKeyFields :: [binary()], IncrementalField :: none | binary()) -> - {ok, QueryName :: atom()} | {error, already_exists}. -prepare_upsert(Host, Name, Table, InsertFields, Updates, UniqueKeyFields, IncrementalField) -> - SQL = upsert_query(Host, Table, InsertFields, Updates, UniqueKeyFields, IncrementalField), + {ok, QueryName :: mongoose_rdbms:query_name()} | {error, already_exists}. +prepare_upsert(HostType, Name, Table, InsertFields, Updates, UniqueKeyFields, IncrementalField) -> + SQL = upsert_query(HostType, Table, InsertFields, Updates, UniqueKeyFields, IncrementalField), Query = iolist_to_binary(SQL), ?LOG_DEBUG(#{what => rdbms_upsert_query, name => Name, query => Query}), Fields = prepared_upsert_fields(InsertFields, Updates, UniqueKeyFields), @@ -150,8 +160,8 @@ prepared_upsert_fields(InsertFields, Updates, UniqueKeyFields) -> _ -> InsertFields ++ UpdateFields end. -upsert_query(Host, Table, InsertFields, Updates, UniqueKeyFields, IncrementalField) -> - case {mongoose_rdbms:db_engine(Host), mongoose_rdbms:db_type()} of +upsert_query(HostType, Table, InsertFields, Updates, UniqueKeyFields, IncrementalField) -> + case {mongoose_rdbms:db_engine(HostType), mongoose_rdbms:db_type()} of {mysql, _} -> upsert_mysql_query(Table, InsertFields, Updates, UniqueKeyFields, IncrementalField); {pgsql, _} -> diff --git a/src/wpool/mongoose_wpool.erl b/src/wpool/mongoose_wpool.erl index 52f5acf26b6..81397e16168 100644 --- a/src/wpool/mongoose_wpool.erl +++ b/src/wpool/mongoose_wpool.erl @@ -38,7 +38,7 @@ -export([expand_pools/2]). -ignore_xref([call/2, cast/2, cast/3, expand_pools/2, get_worker/2, - send_request/2, send_request/4, send_request/5, + send_request/2, send_request/3, send_request/4, send_request/5, is_configured/2, is_configured/1, is_configured/1, start/2, start/3, start/5, start_configured_pools/1, start_configured_pools/2, stats/3, stop/1, stop/2]).