diff --git a/deps/rabbit/src/rabbit_fifo_dlx_client.erl b/deps/rabbit/src/rabbit_fifo_dlx_client.erl index eaca2644d5c..1181bae5b13 100644 --- a/deps/rabbit/src/rabbit_fifo_dlx_client.erl +++ b/deps/rabbit/src/rabbit_fifo_dlx_client.erl @@ -41,23 +41,38 @@ checkout(QResource, Leader, NumUnsettled) -> State = #state{queue_resource = QResource, leader = Leader, last_msg_id = -1}, - process_command(Cmd, State, 5). + checkout0(Cmd, State, 5). -process_command(_Cmd, _State, 0) -> +checkout0(_Cmd, _State, 0) -> {error, ra_command_failed}; -process_command(Cmd, #state{leader = Leader} = State, Tries) -> - case ra:process_command(Leader, Cmd, 60_000) of - {ok, ok, Leader} -> - {ok, State#state{leader = Leader}}; - {ok, ok, NonLocalLeader} -> - ?LOG_WARNING("Failed to process command ~tp on queue leader ~tp because actual leader is ~tp.", - [Cmd, Leader, NonLocalLeader]), - {error, non_local_leader}; - Err -> - ?LOG_WARNING("Failed to process command ~tp on queue leader ~tp: ~tp~n" - "Trying ~b more time(s)...", - [Cmd, Leader, Err, Tries]), - process_command(Cmd, State, Tries - 1) +checkout0(Cmd, #state{leader = Leader} = State, Tries) -> + Correlation = make_ref(), + %% We use ra:pipeline_command/4 instead of ra:process_command/3 because the + %% latter internally redirects to the new leader which we don't want. + ra:pipeline_command(Leader, Cmd, Correlation, normal), + receive_applied(Cmd, Correlation, State, Tries). + +receive_applied(Cmd, Corr, #state{queue_resource = QName, + leader = Leader} = State, Tries) -> + receive + {'$gen_cast', {queue_event, QName, {Leader, {applied, Results}}}} -> + case lists:member({Corr, ok}, Results) of + true -> + {ok, State}; + false -> + receive_applied(Cmd, Corr, State, Tries) + end; + {'$gen_cast', {queue_event, QName, + {_From, {rejected, {not_leader, NonLocalLeader, Corr}}}}} -> + ?LOG_WARNING("failed to apply command ~tp on leader ~tp " + "because actual leader is ~tp", + [Cmd, Leader, NonLocalLeader]), + {error, non_local_leader} + after 60_000 -> + ?LOG_WARNING("timed out applying command ~tp on leader ~tp; " + "trying ~b more time(s)...", + [Cmd, Leader, Tries - 1]), + checkout0(Cmd, State, Tries - 1) end. -spec handle_ra_event(pid(), term(), state()) -> diff --git a/deps/rabbit/test/rabbit_fifo_dlx_integration_SUITE.erl b/deps/rabbit/test/rabbit_fifo_dlx_integration_SUITE.erl index fbf1652f8c9..a4320fb5d73 100644 --- a/deps/rabbit/test/rabbit_fifo_dlx_integration_SUITE.erl +++ b/deps/rabbit/test/rabbit_fifo_dlx_integration_SUITE.erl @@ -38,35 +38,37 @@ all() -> [ - {group, single_node}, + {group, cluster_size_1}, {group, cluster_size_3} ]. groups() -> [ - {single_node, [shuffle], [ - expired, - rejected, - delivery_limit, - target_queue_not_bound, - target_queue_deleted, - dlx_missing, - cycle, - stats, - drop_head_falls_back_to_at_most_once, - switch_strategy, - reject_publish_source_queue_max_length, - reject_publish_source_queue_max_length_bytes, - reject_publish_target_classic_queue, - reject_publish_max_length_target_quorum_queue, - target_quorum_queue_delete_create - ]}, - {cluster_size_3, [], [ - reject_publish_max_length_target_quorum_queue, - reject_publish_down_target_quorum_queue, - many_target_queues, - single_dlx_worker - ]} + {cluster_size_1, [shuffle], + [ + expired, + rejected, + delivery_limit, + target_queue_not_bound, + target_queue_deleted, + dlx_missing, + cycle, + stats, + drop_head_falls_back_to_at_most_once, + switch_strategy, + reject_publish_source_queue_max_length, + reject_publish_source_queue_max_length_bytes, + reject_publish_target_classic_queue, + reject_publish_max_length_target_quorum_queue, + target_quorum_queue_delete_create + ]}, + {cluster_size_3, [], + [ + reject_publish_max_length_target_quorum_queue, + reject_publish_down_target_quorum_queue, + many_target_queues, + single_dlx_worker + ]} ]. init_per_suite(Config0) -> @@ -86,7 +88,7 @@ init_per_suite(Config0) -> end_per_suite(Config) -> rabbit_ct_helpers:run_teardown_steps(Config). -init_per_group(single_node = Group, Config) -> +init_per_group(cluster_size_1 = Group, Config) -> init_per_group(Group, Config, 1); init_per_group(cluster_size_3 = Group, Config) -> init_per_group(Group, Config, 3). @@ -954,7 +956,7 @@ single_dlx_worker(Config) -> ok = rabbit_ct_broker_helpers:kill_node(Config, Leader0), {ok, _, {_, Leader1}} = ?awaitMatch({ok, _, _}, ra:members({RaName, Follower0}), - 30000), + 30_000), ?assertNotEqual(Leader0, Leader1), [Follower1] = [Server1, Follower0] -- [Leader1], assert_active_dlx_workers(0, Config, Follower1), @@ -962,7 +964,9 @@ single_dlx_worker(Config) -> ok = rabbit_ct_broker_helpers:start_node(Config, Leader0). assert_active_dlx_workers(N, Config, Server) -> - ?awaitMatch(N, length(rpc(Config, Server, supervisor, which_children, [rabbit_fifo_dlx_sup], 2000)), 60000). + ?awaitMatch(N, + length(rpc(Config, Server, supervisor, which_children, [rabbit_fifo_dlx_sup], 5000)), + 60_000). declare_queue(Channel, Queue, Args) -> #'queue.declare_ok'{} = amqp_channel:call(Channel, #'queue.declare'{