Skip to content

Commit

Permalink
Merge pull request #12214 from rabbitmq/md/simplify-vhost-exchange-de…
Browse files Browse the repository at this point in the history
…letion
  • Loading branch information
the-mikedavis authored Sep 5, 2024
2 parents c2ce905 + d98e0f2 commit 2f6a0ce
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 26 deletions.
28 changes: 11 additions & 17 deletions deps/rabbit/src/rabbit_db_binding.erl
Original file line number Diff line number Diff line change
Expand Up @@ -835,9 +835,8 @@ delete_for_source_in_khepri(#resource{virtual_host = VHost, name = Name}) ->
_Kind = ?KHEPRI_WILDCARD_STAR,
_DstName = ?KHEPRI_WILDCARD_STAR,
_RoutingKey = #if_has_data{}),
{ok, Bindings} = khepri_tx:get_many(Path),
ok = khepri_tx:delete_many(Path),
maps:fold(fun(_P, Set, Acc) ->
{ok, Bindings} = khepri_tx_adv:delete_many(Path),
maps:fold(fun(_P, #{data := Set}, Acc) ->
sets:to_list(Set) ++ Acc
end, [], Bindings).

Expand Down Expand Up @@ -881,25 +880,20 @@ delete_for_destination_in_mnesia(DstName, OnlyDurable, Fun) ->
OnlyDurable :: boolean(),
Deletions :: rabbit_binding:deletions().

delete_for_destination_in_khepri(DstName, OnlyDurable) ->
BindingsMap = match_destination_in_khepri(DstName),
maps:foreach(fun(K, _V) -> khepri_tx:delete(K) end, BindingsMap),
Bindings = maps:fold(fun(_, Set, Acc) ->
delete_for_destination_in_khepri(#resource{virtual_host = VHost, kind = Kind, name = Name}, OnlyDurable) ->
Pattern = khepri_route_path(
VHost,
_SrcName = ?KHEPRI_WILDCARD_STAR,
Kind,
Name,
_RoutingKey = ?KHEPRI_WILDCARD_STAR),
{ok, BindingsMap} = khepri_tx_adv:delete_many(Pattern),
Bindings = maps:fold(fun(_, #{data := Set}, Acc) ->
sets:to_list(Set) ++ Acc
end, [], BindingsMap),
rabbit_binding:group_bindings_fold(fun maybe_auto_delete_exchange_in_khepri/4,
lists:keysort(#binding.source, Bindings), OnlyDurable).

match_destination_in_khepri(#resource{virtual_host = VHost, kind = Kind, name = Name}) ->
Path = khepri_route_path(
VHost,
_SrcName = ?KHEPRI_WILDCARD_STAR,
Kind,
Name,
_RoutingKey = ?KHEPRI_WILDCARD_STAR),
{ok, Map} = khepri_tx:get_many(Path),
Map.

%% -------------------------------------------------------------------
%% delete_transient_for_destination_in_mnesia().
%% -------------------------------------------------------------------
Expand Down
64 changes: 64 additions & 0 deletions deps/rabbit/src/rabbit_db_exchange.erl
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
peek_serial/1,
next_serial/1,
delete/2,
delete_all/1,
delete_serial/1,
recover/1,
match/1,
Expand Down Expand Up @@ -657,6 +658,69 @@ delete_in_khepri(X = #exchange{name = XName}, OnlyDurable, RemoveBindingsForSour
ok = khepri_tx:delete(khepri_exchange_path(XName)),
rabbit_db_binding:delete_all_for_exchange_in_khepri(X, OnlyDurable, RemoveBindingsForSource).

%% -------------------------------------------------------------------
%% delete_all().
%% -------------------------------------------------------------------

-spec delete_all(VHostName) -> Ret when
VHostName :: vhost:name(),
Deletions :: rabbit_binding:deletions(),
Ret :: {ok, Deletions}.
%% @doc Deletes all exchanges for a given vhost.
%%
%% @returns an `{ok, Deletions}' tuple containing the {@link
%% rabbit_binding:deletions()} caused by deleting the exchanges under the given
%% vhost.
%%
%% @private

delete_all(VHostName) ->
rabbit_khepri:handle_fallback(
#{mnesia => fun() -> delete_all_in_mnesia(VHostName) end,
khepri => fun() -> delete_all_in_khepri(VHostName) end
}).

delete_all_in_mnesia(VHostName) ->
rabbit_mnesia:execute_mnesia_transaction(
fun() ->
delete_all_in_mnesia_tx(VHostName)
end).

delete_all_in_mnesia_tx(VHostName) ->
Match = #exchange{name = rabbit_misc:r(VHostName, exchange), _ = '_'},
Xs = mnesia:match_object(?MNESIA_TABLE, Match, write),
Deletions =
lists:foldl(
fun(X, Acc) ->
{deleted, #exchange{name = XName}, Bindings, XDeletions} =
unconditional_delete_in_mnesia( X, false),
XDeletions1 = rabbit_binding:add_deletion(
XName, {X, deleted, Bindings}, XDeletions),
rabbit_binding:combine_deletions(Acc, XDeletions1)
end, rabbit_binding:new_deletions(), Xs),
{ok, Deletions}.

delete_all_in_khepri(VHostName) ->
rabbit_khepri:transaction(
fun() ->
delete_all_in_khepri_tx(VHostName)
end, rw, #{timeout => infinity}).

delete_all_in_khepri_tx(VHostName) ->
Pattern = khepri_exchange_path(VHostName, ?KHEPRI_WILDCARD_STAR),
{ok, NodeProps} = khepri_tx_adv:delete_many(Pattern),
Deletions =
maps:fold(
fun(_Path, #{data := X}, Deletions) ->
{deleted, #exchange{name = XName}, Bindings, XDeletions} =
rabbit_db_binding:delete_all_for_exchange_in_khepri(
X, false, true),
Deletions1 = rabbit_binding:add_deletion(
XName, {X, deleted, Bindings}, XDeletions),
rabbit_binding:combine_deletions(Deletions, Deletions1)
end, rabbit_binding:new_deletions(), NodeProps),
{ok, Deletions}.

%% -------------------------------------------------------------------
%% delete_serial().
%% -------------------------------------------------------------------
Expand Down
13 changes: 12 additions & 1 deletion deps/rabbit/src/rabbit_exchange.erl
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
update_scratch/3, update_decorators/2, immutable/1,
info_keys/0, info/1, info/2, info_all/1, info_all/2, info_all/4,
route/2, route/3, delete/3, validate_binding/2, count/0,
ensure_deleted/3]).
ensure_deleted/3, delete_all/2]).
-export([list_names/0]).
-export([serialise_events/1]).
-export([serial/1, peek_serial/1]).
Expand Down Expand Up @@ -484,6 +484,17 @@ delete(XName, IfUnused, Username) ->
XName#resource.name, Username)
end.

-spec delete_all(VHostName, ActingUser) -> Ret when
VHostName :: vhost:name(),
ActingUser :: rabbit_types:username(),
Ret :: ok.

delete_all(VHostName, ActingUser) ->
{ok, Deletions} = rabbit_db_exchange:delete_all(VHostName),
Deletions1 = rabbit_binding:process_deletions(Deletions),
rabbit_binding:notify_deletions(Deletions1, ActingUser),
ok.

process_deletions({error, _} = E) ->
E;
process_deletions({deleted, #exchange{name = XName} = X, Bs, Deletions}) ->
Expand Down
3 changes: 1 addition & 2 deletions deps/rabbit/src/rabbit_vhost.erl
Original file line number Diff line number Diff line change
Expand Up @@ -299,8 +299,7 @@ delete(VHost, ActingUser) ->
assert_benign(rabbit_amqqueue:with(Name, QDelFun), ActingUser)
end || Q <- rabbit_amqqueue:list(VHost)],
rabbit_log:info("Deleting exchanges in vhost '~ts' because it's being deleted", [VHost]),
[ok = rabbit_exchange:ensure_deleted(Name, false, ActingUser) ||
#exchange{name = Name} <- rabbit_exchange:list(VHost)],
ok = rabbit_exchange:delete_all(VHost, ActingUser),
rabbit_log:info("Clearing policies and runtime parameters in vhost '~ts' because it's being deleted", [VHost]),
_ = rabbit_runtime_parameters:clear_vhost(VHost, ActingUser),
rabbit_log:debug("Removing vhost '~ts' from the metadata storage because it's being deleted", [VHost]),
Expand Down
18 changes: 12 additions & 6 deletions deps/rabbitmq_event_exchange/test/system_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,14 @@ all() ->
authentication,
audit_queue,
audit_exchange,
audit_exchange_internal_parameter,
audit_binding,
audit_vhost,
audit_vhost_deletion,
audit_channel,
audit_connection,
audit_direct_connection,
audit_consumer,
audit_vhost_internal_parameter,
audit_parameter,
audit_policy,
audit_vhost_limit,
Expand Down Expand Up @@ -272,13 +272,19 @@ audit_consumer(Config) ->
rabbit_ct_client_helpers:close_channel(Ch),
ok.

audit_vhost_internal_parameter(Config) ->
audit_exchange_internal_parameter(Config) ->
Ch = declare_event_queue(Config, <<"parameter.*">>),
User = <<"Bugs Bunny">>,
Vhost = <<"test-vhost">>,

rabbit_ct_broker_helpers:add_vhost(Config, 0, Vhost, User),
rabbit_ct_broker_helpers:delete_vhost(Config, 0, Vhost, User),
X = <<"exchange.audited-for-parameters">>,
#'exchange.declare_ok'{} =
amqp_channel:call(Ch, #'exchange.declare'{exchange = X,
type = <<"topic">>}),
#'exchange.delete_ok'{} =
amqp_channel:call(Ch, #'exchange.delete'{exchange = X}),

User = proplists:get_value(rmq_username, Config),
%% Exchange deletion sets and clears a runtime parameter which acts as a
%% kind of lock:
receive_user_in_event(<<"parameter.set">>, User),
receive_user_in_event(<<"parameter.cleared">>, User),

Expand Down

0 comments on commit 2f6a0ce

Please sign in to comment.