Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions deps/rabbit/priv/schema/rabbit.schema
Original file line number Diff line number Diff line change
Expand Up @@ -2846,6 +2846,30 @@ end}.
end
}.

%%
%% Channel interceptors
%%

%% Priority levels for channel interceptor modules. Lower values run first, default is 0.
%%
%% e.g. channel_interceptor.priority.my_interceptor_module = 5
%%

{mapping, "channel_interceptor.priority.$name", "rabbit.channel_interceptor_priorities", [
{datatype, integer}]}.

{translation, "rabbit.channel_interceptor_priorities",
fun(Conf) ->
case cuttlefish_variable:filter_by_prefix("channel_interceptor.priority", Conf) of
[] ->
cuttlefish:unset();
L ->
[{list_to_atom(Name), Priority}
|| {["channel_interceptor", "priority", Name], Priority} <- L]
end
end
}.

%%
%% Message interceptors
%%
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
-include_lib("rabbit_common/include/rabbit_framing.hrl").


-compile(export_all).
-export([init/1, description/0, intercept/3, applies_to/0]).

init(_Ch) ->
undefined.
Expand Down
49 changes: 34 additions & 15 deletions deps/rabbit/src/rabbit_channel_interceptor.erl
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

-include_lib("rabbit_common/include/rabbit.hrl").

-export([init/1, intercept_in/3]).
-export([init/1, intercept_in/3, list/0]).

-behaviour(rabbit_registry_class).

Expand Down Expand Up @@ -37,29 +37,48 @@ added_to_rabbit_registry(_Type, _ModuleName) ->
removed_from_rabbit_registry(_Type) ->
rabbit_channel:refresh_interceptors().

list() ->
Mods = [M || {_, M} <- rabbit_registry:lookup_all(channel_interceptor)],
[[{name, Mod}, {applies_to, Mod:applies_to()}, {priority, priority(Mod)}] || Mod <- Mods].

init(Ch) ->
Mods = [M || {_, M} <- rabbit_registry:lookup_all(channel_interceptor)],
check_no_overlap(Mods),
[{Mod, Mod:init(Ch)} || Mod <- Mods].
Sorted = lists:sort(fun(A, B) -> priority(A) =< priority(B) end, Mods),
check_no_overlap(Sorted),
[{Mod, Mod:init(Ch)} || Mod <- Sorted].

%% Reject any two interceptors that share the same priority and handle the same
%% AMQP operation. Interceptors with different priorities may overlap freely.
check_no_overlap(Mods) ->
check_no_overlap1([sets:from_list(Mod:applies_to()) || Mod <- Mods]).
ByPriority = lists:foldl(fun(Mod, Acc) ->
P = priority(Mod),
maps:update_with(P, fun(Ms) -> [Mod | Ms] end, [Mod], Acc)
end, #{}, Mods),
maps:foreach(fun(_Priority, Group) ->
check_no_overlap1([sets:from_list(Mod:applies_to()) || Mod <- Group])
end, ByPriority).

%% Check no non-empty pairwise intersection in a list of sets
check_no_overlap1(Sets) ->
_ = lists:foldl(fun(Set, Union) ->
Is = sets:intersection(Set, Union),
case sets:size(Is) of
0 -> ok;
_ ->
internal_error("Interceptor: more than one module handles ~tp", [Is])
end,
sets:union(Set, Union)
end,
sets:new(),
Sets),
Is = sets:intersection(Set, Union),
case sets:size(Is) of
0 -> ok;
_ ->
internal_error("Interceptor: more than one module handles ~tp", [Is])
end,
sets:union(Set, Union)
end,
sets:new(),
Sets),
ok.

priority(Mod) ->
Priorities = application:get_env(rabbit, channel_interceptor_priorities, []),
case lists:keyfind(Mod, 1, Priorities) of
{Mod, P} -> P;
false -> 0
end.

intercept_in(M, C, Mods) ->
lists:foldl(fun({Mod, ModState}, {M1, C1}) ->
call_module(Mod, ModState, M1, C1)
Expand Down
97 changes: 96 additions & 1 deletion deps/rabbit/test/channel_interceptor_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,10 @@ groups() ->
register_interceptor,
register_interceptor_failing_with_amqp_error,
register_interceptor_crashing_with_amqp_error_exception,
register_failing_interceptors
register_failing_interceptors,
multiple_interceptors_ordered_by_priority,
reject_interceptors_with_same_priority_for_same_operation,
priority_overridden_by_config
]}
].

Expand Down Expand Up @@ -213,6 +216,98 @@ register_failing_interceptors(Config) ->
passed = rabbit_ct_broker_helpers:rpc(Config, 0,
?MODULE, register_interceptor1, [Config, failing_dummy_interceptor]).

multiple_interceptors_ordered_by_priority(Config) ->
passed = rabbit_ct_broker_helpers:rpc(Config, 0,
?MODULE, multiple_interceptors_ordered_by_priority1, [Config]).

multiple_interceptors_ordered_by_priority1(Config) ->
Ch = rabbit_ct_client_helpers:open_channel(Config, 0),
QName = <<"multiple-interceptors-q">>,
#'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName,
durable = true}),

ok = application:set_env(rabbit, channel_interceptor_priorities,
[{dummy_interceptor_priority_1, 1},
{dummy_interceptor_priority_2, 2},
{dummy_interceptor_priority_3, 3}]),

ok = rabbit_registry:register(channel_interceptor,
<<"dummy interceptor priority 3">>, dummy_interceptor_priority_3),
ok = rabbit_registry:register(channel_interceptor,
<<"dummy interceptor priority 2">>, dummy_interceptor_priority_2),
ok = rabbit_registry:register(channel_interceptor,
<<"dummy interceptor priority 1">>, dummy_interceptor_priority_1),

%% Interceptors run in ascending priority order regardless of registration order,
%% so the payload becomes <<"foo1">>, then <<"foo12">>, then <<"foo123">>.
check_send_receive(Ch, QName, <<"foo">>, <<"foo123">>),

ok = rabbit_registry:unregister(channel_interceptor, <<"dummy interceptor priority 1">>),
ok = rabbit_registry:unregister(channel_interceptor, <<"dummy interceptor priority 2">>),
ok = rabbit_registry:unregister(channel_interceptor, <<"dummy interceptor priority 3">>),
ok = application:unset_env(rabbit, channel_interceptor_priorities),

#'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}),
passed.

reject_interceptors_with_same_priority_for_same_operation(Config) ->
passed = rabbit_ct_broker_helpers:rpc(Config, 0,
?MODULE, reject_interceptors_with_same_priority_for_same_operation1, [Config]).

reject_interceptors_with_same_priority_for_same_operation1(_Config) ->
ok = rabbit_registry:register(channel_interceptor,
<<"dummy interceptor priority 1">>,
dummy_interceptor_priority_1),
ok = rabbit_registry:register(channel_interceptor,
<<"dummy interceptor priority 1 conflict">>,
dummy_interceptor_priority_1_conflict),
try
%% Initialising interceptors must fail: two interceptors with the same
%% priority handle the same AMQP operation.
rabbit_channel_interceptor:init(self())
catch
exit:{amqp_error, internal_error, _, _} -> ok
after
rabbit_registry:unregister(channel_interceptor, <<"dummy interceptor priority 1">>),
rabbit_registry:unregister(channel_interceptor, <<"dummy interceptor priority 1 conflict">>)
end,
passed.

priority_overridden_by_config(Config) ->
passed = rabbit_ct_broker_helpers:rpc(Config, 0,
?MODULE, priority_overridden_by_config1, [Config]).

priority_overridden_by_config1(Config) ->
Ch = rabbit_ct_client_helpers:open_channel(Config, 0),
QName = <<"priority-override-q">>,
#'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName,
durable = true}),

%% priority_1 (config priority=1) runs before priority_3 (config priority=3),
%% so the result is <<"foo13">>.
ok = application:set_env(rabbit, channel_interceptor_priorities,
[{dummy_interceptor_priority_1, 1},
{dummy_interceptor_priority_3, 3}]),
ok = rabbit_registry:register(channel_interceptor,
<<"dummy interceptor priority 1">>, dummy_interceptor_priority_1),
ok = rabbit_registry:register(channel_interceptor,
<<"dummy interceptor priority 3">>, dummy_interceptor_priority_3),
check_send_receive(Ch, QName, <<"foo">>, <<"foo13">>),

%% Reconfigure priority_3 to run first (priority=0). Now the result is <<"foo31">>.
ok = application:set_env(rabbit, channel_interceptor_priorities,
[{dummy_interceptor_priority_1, 1},
{dummy_interceptor_priority_3, 0}]),
rabbit_channel:refresh_interceptors(),
check_send_receive(Ch, QName, <<"foo">>, <<"foo31">>),

ok = application:unset_env(rabbit, channel_interceptor_priorities),
ok = rabbit_registry:unregister(channel_interceptor, <<"dummy interceptor priority 1">>),
ok = rabbit_registry:unregister(channel_interceptor, <<"dummy interceptor priority 3">>),

#'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}),
passed.

check_send_receive(Ch1, QName, Send, Receive) ->
amqp_channel:call(Ch1,
#'basic.publish'{routing_key = QName},
Expand Down
16 changes: 16 additions & 0 deletions deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets
Original file line number Diff line number Diff line change
Expand Up @@ -1771,5 +1771,21 @@ credential_validator.regexp = ^abc\\d+",
{certfile,"test/config_schema_SUITE_data/certs/server_certificate.pem"},
{keyfile,"test/config_schema_SUITE_data/certs/server_key.pem"},
{psk_identity, "my_identity"}]}]}],
[]},
{channel_interceptor_single_priority,
"channel_interceptor.priority.my_interceptor_module_1 = 5",
[{rabbit,
[{channel_interceptor_priorities,
[{my_interceptor_module_1, 5}]}]}],
[]},
{channel_interceptor_multiple_priorities,
"channel_interceptor.priority.my_interceptor_module_1 = 5
channel_interceptor.priority.my_interceptor_module_2 = 10
channel_interceptor.priority.my_interceptor_module_3 = 15",
[{rabbit,
[{channel_interceptor_priorities,
[{my_interceptor_module_1, 5},
{my_interceptor_module_2, 10},
{my_interceptor_module_3, 15}]}]}],
[]}
].
27 changes: 27 additions & 0 deletions deps/rabbit/test/dummy_interceptor_priority_1.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2026 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
%%

-module(dummy_interceptor_priority_1).

-behaviour(rabbit_channel_interceptor).

-include_lib("rabbit_common/include/rabbit.hrl").
-include_lib("rabbit_common/include/rabbit_framing.hrl").

-compile(export_all).

init(_Ch) -> undefined.

description() -> [{description, <<"Appends '1' to basic.publish payload">>}].

intercept(#'basic.publish'{} = Method, Content, _State) ->
Payload = iolist_to_binary(lists:reverse(Content#content.payload_fragments_rev)),
{Method, Content#content{payload_fragments_rev = [<<Payload/binary, "1">>]}};
intercept(Method, Content, _State) ->
{Method, Content}.

applies_to() -> ['basic.publish'].
24 changes: 24 additions & 0 deletions deps/rabbit/test/dummy_interceptor_priority_1_conflict.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2026 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
%%

-module(dummy_interceptor_priority_1_conflict).

-behaviour(rabbit_channel_interceptor).

-include_lib("rabbit_common/include/rabbit.hrl").
-include_lib("rabbit_common/include/rabbit_framing.hrl").

-compile(export_all).

init(_Ch) -> undefined.

description() -> [{description, <<"Conflicts with dummy_interceptor_priority_1">>}].

intercept(Method, Content, _State) ->
{Method, Content}.

applies_to() -> ['basic.publish'].
27 changes: 27 additions & 0 deletions deps/rabbit/test/dummy_interceptor_priority_2.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2026 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
%%

-module(dummy_interceptor_priority_2).

-behaviour(rabbit_channel_interceptor).

-include_lib("rabbit_common/include/rabbit.hrl").
-include_lib("rabbit_common/include/rabbit_framing.hrl").

-compile(export_all).

init(_Ch) -> undefined.

description() -> [{description, <<"Appends '2' to basic.publish payload">>}].

intercept(#'basic.publish'{} = Method, Content, _State) ->
Payload = iolist_to_binary(lists:reverse(Content#content.payload_fragments_rev)),
{Method, Content#content{payload_fragments_rev = [<<Payload/binary, "2">>]}};
intercept(Method, Content, _State) ->
{Method, Content}.

applies_to() -> ['basic.publish'].
27 changes: 27 additions & 0 deletions deps/rabbit/test/dummy_interceptor_priority_3.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2026 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
%%

-module(dummy_interceptor_priority_3).

-behaviour(rabbit_channel_interceptor).

-include_lib("rabbit_common/include/rabbit.hrl").
-include_lib("rabbit_common/include/rabbit_framing.hrl").

-compile(export_all).

init(_Ch) -> undefined.

description() -> [{description, <<"Appends '3' to basic.publish payload">>}].

intercept(#'basic.publish'{} = Method, Content, _State) ->
Payload = iolist_to_binary(lists:reverse(Content#content.payload_fragments_rev)),
{Method, Content#content{payload_fragments_rev = [<<Payload/binary, "3">>]}};
intercept(Method, Content, _State) ->
{Method, Content}.

applies_to() -> ['basic.publish'].
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
## This Source Code Form is subject to the terms of the Mozilla Public
## License, v. 2.0. If a copy of the MPL was not distributed with this
## file, You can obtain one at https://mozilla.org/MPL/2.0/.
##
## Copyright (c) 2007-2026 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.

defmodule RabbitMQ.CLI.Ctl.Commands.ListChannelInterceptorsCommand do
@behaviour RabbitMQ.CLI.CommandBehaviour

def scopes(), do: [:ctl, :diagnostics]

use RabbitMQ.CLI.Core.AcceptsDefaultSwitchesAndTimeout
use RabbitMQ.CLI.Core.AcceptsNoPositionalArguments
use RabbitMQ.CLI.Core.RequiresRabbitAppRunning

def merge_defaults(args, opts) do
{args, Map.merge(%{table_headers: true}, opts)}
end

def run([], %{node: node_name, timeout: timeout}) do
case :rabbit_misc.rpc_call(node_name, :rabbit_channel_interceptor, :list, [], timeout) do
{:badrpc, _} = err ->
err

interceptors ->
Enum.map(interceptors, fn info ->
Keyword.update!(info, :applies_to, &Enum.join(&1, ", "))
end)
end
end

use RabbitMQ.CLI.DefaultOutput

def formatter(), do: RabbitMQ.CLI.Formatters.Table

def usage, do: "list_channel_interceptors [--no-table-headers]"

def help_section(), do: :observability_and_health_checks

def description(),
do: "Lists registered channel interceptors with their name, intercepted AMQP operations, and priority"

def banner(_, _), do: "Listing channel interceptors ..."
end
Loading
Loading