diff --git a/deps/rabbit/priv/schema/rabbit.schema b/deps/rabbit/priv/schema/rabbit.schema index 0a3d0b02e67..30b8c425cab 100644 --- a/deps/rabbit/priv/schema/rabbit.schema +++ b/deps/rabbit/priv/schema/rabbit.schema @@ -2846,6 +2846,30 @@ end}. end }. +%% +%% Channel interceptors +%% + +%% Priority levels for channel interceptor modules. Lower values run first, default is 0. +%% +%% e.g. channel_interceptor.priority.my_interceptor_module = 5 +%% + +{mapping, "channel_interceptor.priority.$name", "rabbit.channel_interceptor_priorities", [ + {datatype, integer}]}. + +{translation, "rabbit.channel_interceptor_priorities", + fun(Conf) -> + case cuttlefish_variable:filter_by_prefix("channel_interceptor.priority", Conf) of + [] -> + cuttlefish:unset(); + L -> + [{list_to_atom(Name), Priority} + || {["channel_interceptor", "priority", Name], Priority} <- L] + end + end +}. + %% %% Message interceptors %% diff --git a/deps/rabbit/test/dummy_interceptor.erl b/deps/rabbit/src/dummy_interceptor.erl similarity index 94% rename from deps/rabbit/test/dummy_interceptor.erl rename to deps/rabbit/src/dummy_interceptor.erl index 836ea744962..48ecc9ca3f1 100644 --- a/deps/rabbit/test/dummy_interceptor.erl +++ b/deps/rabbit/src/dummy_interceptor.erl @@ -6,7 +6,7 @@ -include_lib("rabbit_common/include/rabbit_framing.hrl"). --compile(export_all). +-export([init/1, description/0, intercept/3, applies_to/0]). init(_Ch) -> undefined. diff --git a/deps/rabbit/src/rabbit_channel_interceptor.erl b/deps/rabbit/src/rabbit_channel_interceptor.erl index c4b1b1c94e8..b999df6bfc5 100644 --- a/deps/rabbit/src/rabbit_channel_interceptor.erl +++ b/deps/rabbit/src/rabbit_channel_interceptor.erl @@ -9,7 +9,7 @@ -include_lib("rabbit_common/include/rabbit.hrl"). --export([init/1, intercept_in/3]). +-export([init/1, intercept_in/3, list/0]). -behaviour(rabbit_registry_class). @@ -37,29 +37,48 @@ added_to_rabbit_registry(_Type, _ModuleName) -> removed_from_rabbit_registry(_Type) -> rabbit_channel:refresh_interceptors(). +list() -> + Mods = [M || {_, M} <- rabbit_registry:lookup_all(channel_interceptor)], + [[{name, Mod}, {applies_to, Mod:applies_to()}, {priority, priority(Mod)}] || Mod <- Mods]. + init(Ch) -> Mods = [M || {_, M} <- rabbit_registry:lookup_all(channel_interceptor)], - check_no_overlap(Mods), - [{Mod, Mod:init(Ch)} || Mod <- Mods]. + Sorted = lists:sort(fun(A, B) -> priority(A) =< priority(B) end, Mods), + check_no_overlap(Sorted), + [{Mod, Mod:init(Ch)} || Mod <- Sorted]. +%% Reject any two interceptors that share the same priority and handle the same +%% AMQP operation. Interceptors with different priorities may overlap freely. check_no_overlap(Mods) -> - check_no_overlap1([sets:from_list(Mod:applies_to()) || Mod <- Mods]). + ByPriority = lists:foldl(fun(Mod, Acc) -> + P = priority(Mod), + maps:update_with(P, fun(Ms) -> [Mod | Ms] end, [Mod], Acc) + end, #{}, Mods), + maps:foreach(fun(_Priority, Group) -> + check_no_overlap1([sets:from_list(Mod:applies_to()) || Mod <- Group]) + end, ByPriority). -%% Check no non-empty pairwise intersection in a list of sets check_no_overlap1(Sets) -> _ = lists:foldl(fun(Set, Union) -> - Is = sets:intersection(Set, Union), - case sets:size(Is) of - 0 -> ok; - _ -> - internal_error("Interceptor: more than one module handles ~tp", [Is]) - end, - sets:union(Set, Union) - end, - sets:new(), - Sets), + Is = sets:intersection(Set, Union), + case sets:size(Is) of + 0 -> ok; + _ -> + internal_error("Interceptor: more than one module handles ~tp", [Is]) + end, + sets:union(Set, Union) + end, + sets:new(), + Sets), ok. +priority(Mod) -> + Priorities = application:get_env(rabbit, channel_interceptor_priorities, []), + case lists:keyfind(Mod, 1, Priorities) of + {Mod, P} -> P; + false -> 0 + end. + intercept_in(M, C, Mods) -> lists:foldl(fun({Mod, ModState}, {M1, C1}) -> call_module(Mod, ModState, M1, C1) diff --git a/deps/rabbit/test/channel_interceptor_SUITE.erl b/deps/rabbit/test/channel_interceptor_SUITE.erl index b1c6685c7fa..b551c69ffb6 100644 --- a/deps/rabbit/test/channel_interceptor_SUITE.erl +++ b/deps/rabbit/test/channel_interceptor_SUITE.erl @@ -24,7 +24,10 @@ groups() -> register_interceptor, register_interceptor_failing_with_amqp_error, register_interceptor_crashing_with_amqp_error_exception, - register_failing_interceptors + register_failing_interceptors, + multiple_interceptors_ordered_by_priority, + reject_interceptors_with_same_priority_for_same_operation, + priority_overridden_by_config ]} ]. @@ -213,6 +216,98 @@ register_failing_interceptors(Config) -> passed = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, register_interceptor1, [Config, failing_dummy_interceptor]). +multiple_interceptors_ordered_by_priority(Config) -> + passed = rabbit_ct_broker_helpers:rpc(Config, 0, + ?MODULE, multiple_interceptors_ordered_by_priority1, [Config]). + +multiple_interceptors_ordered_by_priority1(Config) -> + Ch = rabbit_ct_client_helpers:open_channel(Config, 0), + QName = <<"multiple-interceptors-q">>, + #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, + durable = true}), + + ok = application:set_env(rabbit, channel_interceptor_priorities, + [{dummy_interceptor_priority_1, 1}, + {dummy_interceptor_priority_2, 2}, + {dummy_interceptor_priority_3, 3}]), + + ok = rabbit_registry:register(channel_interceptor, + <<"dummy interceptor priority 3">>, dummy_interceptor_priority_3), + ok = rabbit_registry:register(channel_interceptor, + <<"dummy interceptor priority 2">>, dummy_interceptor_priority_2), + ok = rabbit_registry:register(channel_interceptor, + <<"dummy interceptor priority 1">>, dummy_interceptor_priority_1), + + %% Interceptors run in ascending priority order regardless of registration order, + %% so the payload becomes <<"foo1">>, then <<"foo12">>, then <<"foo123">>. + check_send_receive(Ch, QName, <<"foo">>, <<"foo123">>), + + ok = rabbit_registry:unregister(channel_interceptor, <<"dummy interceptor priority 1">>), + ok = rabbit_registry:unregister(channel_interceptor, <<"dummy interceptor priority 2">>), + ok = rabbit_registry:unregister(channel_interceptor, <<"dummy interceptor priority 3">>), + ok = application:unset_env(rabbit, channel_interceptor_priorities), + + #'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}), + passed. + +reject_interceptors_with_same_priority_for_same_operation(Config) -> + passed = rabbit_ct_broker_helpers:rpc(Config, 0, + ?MODULE, reject_interceptors_with_same_priority_for_same_operation1, [Config]). + +reject_interceptors_with_same_priority_for_same_operation1(_Config) -> + ok = rabbit_registry:register(channel_interceptor, + <<"dummy interceptor priority 1">>, + dummy_interceptor_priority_1), + ok = rabbit_registry:register(channel_interceptor, + <<"dummy interceptor priority 1 conflict">>, + dummy_interceptor_priority_1_conflict), + try + %% Initialising interceptors must fail: two interceptors with the same + %% priority handle the same AMQP operation. + rabbit_channel_interceptor:init(self()) + catch + exit:{amqp_error, internal_error, _, _} -> ok + after + rabbit_registry:unregister(channel_interceptor, <<"dummy interceptor priority 1">>), + rabbit_registry:unregister(channel_interceptor, <<"dummy interceptor priority 1 conflict">>) + end, + passed. + +priority_overridden_by_config(Config) -> + passed = rabbit_ct_broker_helpers:rpc(Config, 0, + ?MODULE, priority_overridden_by_config1, [Config]). + +priority_overridden_by_config1(Config) -> + Ch = rabbit_ct_client_helpers:open_channel(Config, 0), + QName = <<"priority-override-q">>, + #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, + durable = true}), + + %% priority_1 (config priority=1) runs before priority_3 (config priority=3), + %% so the result is <<"foo13">>. + ok = application:set_env(rabbit, channel_interceptor_priorities, + [{dummy_interceptor_priority_1, 1}, + {dummy_interceptor_priority_3, 3}]), + ok = rabbit_registry:register(channel_interceptor, + <<"dummy interceptor priority 1">>, dummy_interceptor_priority_1), + ok = rabbit_registry:register(channel_interceptor, + <<"dummy interceptor priority 3">>, dummy_interceptor_priority_3), + check_send_receive(Ch, QName, <<"foo">>, <<"foo13">>), + + %% Reconfigure priority_3 to run first (priority=0). Now the result is <<"foo31">>. + ok = application:set_env(rabbit, channel_interceptor_priorities, + [{dummy_interceptor_priority_1, 1}, + {dummy_interceptor_priority_3, 0}]), + rabbit_channel:refresh_interceptors(), + check_send_receive(Ch, QName, <<"foo">>, <<"foo31">>), + + ok = application:unset_env(rabbit, channel_interceptor_priorities), + ok = rabbit_registry:unregister(channel_interceptor, <<"dummy interceptor priority 1">>), + ok = rabbit_registry:unregister(channel_interceptor, <<"dummy interceptor priority 3">>), + + #'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}), + passed. + check_send_receive(Ch1, QName, Send, Receive) -> amqp_channel:call(Ch1, #'basic.publish'{routing_key = QName}, diff --git a/deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets b/deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets index 6da29b5651e..80cef7ae941 100644 --- a/deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets +++ b/deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets @@ -1771,5 +1771,21 @@ credential_validator.regexp = ^abc\\d+", {certfile,"test/config_schema_SUITE_data/certs/server_certificate.pem"}, {keyfile,"test/config_schema_SUITE_data/certs/server_key.pem"}, {psk_identity, "my_identity"}]}]}], + []}, + {channel_interceptor_single_priority, + "channel_interceptor.priority.my_interceptor_module_1 = 5", + [{rabbit, + [{channel_interceptor_priorities, + [{my_interceptor_module_1, 5}]}]}], + []}, + {channel_interceptor_multiple_priorities, + "channel_interceptor.priority.my_interceptor_module_1 = 5 + channel_interceptor.priority.my_interceptor_module_2 = 10 + channel_interceptor.priority.my_interceptor_module_3 = 15", + [{rabbit, + [{channel_interceptor_priorities, + [{my_interceptor_module_1, 5}, + {my_interceptor_module_2, 10}, + {my_interceptor_module_3, 15}]}]}], []} ]. diff --git a/deps/rabbit/test/dummy_interceptor_priority_1.erl b/deps/rabbit/test/dummy_interceptor_priority_1.erl new file mode 100644 index 00000000000..edbbef70910 --- /dev/null +++ b/deps/rabbit/test/dummy_interceptor_priority_1.erl @@ -0,0 +1,27 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2026 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. +%% + +-module(dummy_interceptor_priority_1). + +-behaviour(rabbit_channel_interceptor). + +-include_lib("rabbit_common/include/rabbit.hrl"). +-include_lib("rabbit_common/include/rabbit_framing.hrl"). + +-compile(export_all). + +init(_Ch) -> undefined. + +description() -> [{description, <<"Appends '1' to basic.publish payload">>}]. + +intercept(#'basic.publish'{} = Method, Content, _State) -> + Payload = iolist_to_binary(lists:reverse(Content#content.payload_fragments_rev)), + {Method, Content#content{payload_fragments_rev = [<>]}}; +intercept(Method, Content, _State) -> + {Method, Content}. + +applies_to() -> ['basic.publish']. diff --git a/deps/rabbit/test/dummy_interceptor_priority_1_conflict.erl b/deps/rabbit/test/dummy_interceptor_priority_1_conflict.erl new file mode 100644 index 00000000000..65e7f026069 --- /dev/null +++ b/deps/rabbit/test/dummy_interceptor_priority_1_conflict.erl @@ -0,0 +1,24 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2026 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. +%% + +-module(dummy_interceptor_priority_1_conflict). + +-behaviour(rabbit_channel_interceptor). + +-include_lib("rabbit_common/include/rabbit.hrl"). +-include_lib("rabbit_common/include/rabbit_framing.hrl"). + +-compile(export_all). + +init(_Ch) -> undefined. + +description() -> [{description, <<"Conflicts with dummy_interceptor_priority_1">>}]. + +intercept(Method, Content, _State) -> + {Method, Content}. + +applies_to() -> ['basic.publish']. diff --git a/deps/rabbit/test/dummy_interceptor_priority_2.erl b/deps/rabbit/test/dummy_interceptor_priority_2.erl new file mode 100644 index 00000000000..a325f7d57cc --- /dev/null +++ b/deps/rabbit/test/dummy_interceptor_priority_2.erl @@ -0,0 +1,27 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2026 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. +%% + +-module(dummy_interceptor_priority_2). + +-behaviour(rabbit_channel_interceptor). + +-include_lib("rabbit_common/include/rabbit.hrl"). +-include_lib("rabbit_common/include/rabbit_framing.hrl"). + +-compile(export_all). + +init(_Ch) -> undefined. + +description() -> [{description, <<"Appends '2' to basic.publish payload">>}]. + +intercept(#'basic.publish'{} = Method, Content, _State) -> + Payload = iolist_to_binary(lists:reverse(Content#content.payload_fragments_rev)), + {Method, Content#content{payload_fragments_rev = [<>]}}; +intercept(Method, Content, _State) -> + {Method, Content}. + +applies_to() -> ['basic.publish']. diff --git a/deps/rabbit/test/dummy_interceptor_priority_3.erl b/deps/rabbit/test/dummy_interceptor_priority_3.erl new file mode 100644 index 00000000000..12d609f6147 --- /dev/null +++ b/deps/rabbit/test/dummy_interceptor_priority_3.erl @@ -0,0 +1,27 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2026 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. +%% + +-module(dummy_interceptor_priority_3). + +-behaviour(rabbit_channel_interceptor). + +-include_lib("rabbit_common/include/rabbit.hrl"). +-include_lib("rabbit_common/include/rabbit_framing.hrl"). + +-compile(export_all). + +init(_Ch) -> undefined. + +description() -> [{description, <<"Appends '3' to basic.publish payload">>}]. + +intercept(#'basic.publish'{} = Method, Content, _State) -> + Payload = iolist_to_binary(lists:reverse(Content#content.payload_fragments_rev)), + {Method, Content#content{payload_fragments_rev = [<>]}}; +intercept(Method, Content, _State) -> + {Method, Content}. + +applies_to() -> ['basic.publish']. diff --git a/deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/commands/list_channel_interceptors_command.ex b/deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/commands/list_channel_interceptors_command.ex new file mode 100644 index 00000000000..c8e1151e005 --- /dev/null +++ b/deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/commands/list_channel_interceptors_command.ex @@ -0,0 +1,44 @@ +## This Source Code Form is subject to the terms of the Mozilla Public +## License, v. 2.0. If a copy of the MPL was not distributed with this +## file, You can obtain one at https://mozilla.org/MPL/2.0/. +## +## Copyright (c) 2007-2026 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. + +defmodule RabbitMQ.CLI.Ctl.Commands.ListChannelInterceptorsCommand do + @behaviour RabbitMQ.CLI.CommandBehaviour + + def scopes(), do: [:ctl, :diagnostics] + + use RabbitMQ.CLI.Core.AcceptsDefaultSwitchesAndTimeout + use RabbitMQ.CLI.Core.AcceptsNoPositionalArguments + use RabbitMQ.CLI.Core.RequiresRabbitAppRunning + + def merge_defaults(args, opts) do + {args, Map.merge(%{table_headers: true}, opts)} + end + + def run([], %{node: node_name, timeout: timeout}) do + case :rabbit_misc.rpc_call(node_name, :rabbit_channel_interceptor, :list, [], timeout) do + {:badrpc, _} = err -> + err + + interceptors -> + Enum.map(interceptors, fn info -> + Keyword.update!(info, :applies_to, &Enum.join(&1, ", ")) + end) + end + end + + use RabbitMQ.CLI.DefaultOutput + + def formatter(), do: RabbitMQ.CLI.Formatters.Table + + def usage, do: "list_channel_interceptors [--no-table-headers]" + + def help_section(), do: :observability_and_health_checks + + def description(), + do: "Lists registered channel interceptors with their name, intercepted AMQP operations, and priority" + + def banner(_, _), do: "Listing channel interceptors ..." +end diff --git a/deps/rabbitmq_cli/test/ctl/list_channel_interceptors_command_test.exs b/deps/rabbitmq_cli/test/ctl/list_channel_interceptors_command_test.exs new file mode 100644 index 00000000000..24b94965717 --- /dev/null +++ b/deps/rabbitmq_cli/test/ctl/list_channel_interceptors_command_test.exs @@ -0,0 +1,87 @@ +## This Source Code Form is subject to the terms of the Mozilla Public +## License, v. 2.0. If a copy of the MPL was not distributed with this +## file, You can obtain one at https://mozilla.org/MPL/2.0/. +## +## Copyright (c) 2007-2026 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. + +defmodule ListChannelInterceptorsCommandTest do + use ExUnit.Case, async: false + import TestHelper + + @command RabbitMQ.CLI.Ctl.Commands.ListChannelInterceptorsCommand + + setup_all do + RabbitMQ.CLI.Core.Distribution.start() + + :ok + end + + setup context do + { + :ok, + opts: %{ + node: get_rabbit_hostname(), + timeout: context[:test_timeout] || :infinity + } + } + end + + test "merge_defaults: adds table_headers true to opts", context do + {args, opts} = @command.merge_defaults([], context[:opts]) + assert args == [] + assert opts[:table_headers] == true + end + + test "validate: accepts no positional arguments", context do + assert @command.validate([], context[:opts]) == :ok + end + + test "validate: rejects any positional arguments", context do + assert @command.validate(["extra"], context[:opts]) == + {:validation_failure, :too_many_args} + end + + test "run: on a bad RabbitMQ node, returns a badrpc" do + opts = %{node: :jake@thedog, timeout: 200} + assert match?({:badrpc, _}, @command.run([], opts)) + end + + @tag test_timeout: :infinity + test "run: with no interceptors registered, returns an empty list", context do + result = @command.run([], context[:opts]) + assert result == [] + end + + @tag test_timeout: :infinity + test "run: registered interceptors are listed with applies_to joined as a string", context do + node = get_rabbit_hostname() + + :ok = + :rabbit_misc.rpc_call( + node, + :rabbit_registry, + :register, + [:channel_interceptor, <<"test interceptor">>, :dummy_interceptor] + ) + + try do + result = @command.run([], context[:opts]) + assert is_list(result) + interceptor = Enum.find(result, fn info -> info[:name] == :dummy_interceptor end) + assert interceptor != nil + assert is_binary(interceptor[:applies_to]) + assert is_integer(interceptor[:priority]) + after + :rabbit_misc.rpc_call( + node, + :rabbit_registry, + :unregister, + [:channel_interceptor, <<"test interceptor">>] + ) + end + end + + test "banner: returns the expected string", context do + assert @command.banner([], context[:opts]) == "Listing channel interceptors ..." + end +end