diff --git a/deps/rabbit/src/rabbit_fifo_dlx_client.erl b/deps/rabbit/src/rabbit_fifo_dlx_client.erl index 54a03b79b843..d0a34c6fda71 100644 --- a/deps/rabbit/src/rabbit_fifo_dlx_client.erl +++ b/deps/rabbit/src/rabbit_fifo_dlx_client.erl @@ -8,8 +8,7 @@ -include_lib("kernel/include/logger.hrl"). - --export([checkout/3, settle/2, handle_ra_event/3, +-export([checkout/3, settle/2, handle_registration/1, handle_ra_event/3, overview/1]). -record(state,{ @@ -35,30 +34,25 @@ settle(MsgIds, #state{leader = Leader} = State) {ok, State}. -spec checkout(rabbit_amqqueue:name(), ra:server_id(), non_neg_integer()) -> - {ok, state()} | {error, non_local_leader | ra_command_failed}. + {reference(), state()}. checkout(QResource, Leader, NumUnsettled) -> Cmd = rabbit_fifo_dlx:make_checkout(self(), NumUnsettled), - State = #state{queue_resource = QResource, - leader = Leader, - last_msg_id = -1}, - process_command(Cmd, State, 5). + Corr = make_ref(), + ra:pipeline_command(Leader, Cmd, Corr, normal), + {Corr, #state{queue_resource = QResource, + leader = Leader, + last_msg_id = -1}}. -process_command(_Cmd, _State, 0) -> - {error, ra_command_failed}; -process_command(Cmd, #state{leader = Leader} = State, Tries) -> - case ra:process_command(Leader, Cmd, 60_000) of - {ok, ok, Leader} -> - {ok, State#state{leader = Leader}}; - {ok, ok, NonLocalLeader} -> - ?LOG_WARNING("Failed to process command ~tp on quorum queue leader ~tp because actual leader is ~tp.", - [Cmd, Leader, NonLocalLeader]), - {error, non_local_leader}; - Err -> - ?LOG_WARNING("Failed to process command ~tp on quorum queue leader ~tp: ~tp~n" - "Trying ~b more time(s)...", - [Cmd, Leader, Err, Tries]), - process_command(Cmd, State, Tries - 1) - end. +-spec handle_registration(term()) -> + {registered, reference()} | {not_leader, reference()} | ignore. +handle_registration({{_, _}, {applied, [{Corr, ok}]}}) + when is_reference(Corr) -> + {registered, Corr}; +handle_registration({{_, _}, {rejected, {not_leader, _Leader, Corr}}}) + when is_reference(Corr) -> + {not_leader, Corr}; +handle_registration(_Evt) -> + ignore. -spec handle_ra_event(pid(), term(), state()) -> {ok, state(), actions()}. diff --git a/deps/rabbit/src/rabbit_fifo_dlx_worker.erl b/deps/rabbit/src/rabbit_fifo_dlx_worker.erl index 5d5c273ce4ef..561d5bc34538 100644 --- a/deps/rabbit/src/rabbit_fifo_dlx_worker.erl +++ b/deps/rabbit/src/rabbit_fifo_dlx_worker.erl @@ -73,6 +73,9 @@ queue_ref :: rabbit_amqqueue:name(), %% monitors source queue monitor_ref :: reference(), + %% Set to a reference while waiting for the checkout command to be + %% applied by the leader. Cleared to 'undefined' once registered. + checkout_corr :: undefined | reference(), %% configured (x-)dead-letter-exchange of source queue exchange_ref :: rabbit_exchange:name() | undefined, %% configured (x-)dead-letter-routing-key of source queue @@ -116,20 +119,19 @@ handle_continue(QRef, undefined) -> dead_letter_worker_consumer_prefetch), {ok, SettleTimeout} = application:get_env(rabbit, dead_letter_worker_publisher_confirm_timeout), - {ok, Q} = rabbit_amqqueue:lookup(QRef), - {ClusterName, _MaybeOldLeaderNode} = amqqueue:get_pid(Q), - case rabbit_fifo_dlx_client:checkout(QRef, {ClusterName, node()}, Prefetch) of - {ok, ConsumerState} -> - {noreply, lookup_topology(#state{queue_ref = QRef, - queue_type_state = rabbit_queue_type:init(), - settle_timeout = SettleTimeout, - dlx_client_state = ConsumerState, - monitor_ref = erlang:monitor(process, ClusterName) - })}; - {error, non_local_leader = Reason} -> - {stop, {shutdown, Reason}, undefined}; - Error -> - {stop, Error, undefined} + case rabbit_amqqueue:lookup(QRef) of + {ok, Q} -> + {ClusterName, _MaybeOldLeaderNode} = amqqueue:get_pid(Q), + {Corr, ConsumerState} = rabbit_fifo_dlx_client:checkout( + QRef, {ClusterName, node()}, Prefetch), + {noreply, #state{queue_ref = QRef, + monitor_ref = erlang:monitor(process, ClusterName), + checkout_corr = Corr, + dlx_client_state = ConsumerState, + queue_type_state = rabbit_queue_type:init(), + settle_timeout = SettleTimeout}}; + {error, not_found} -> + {stop, {shutdown, queue_not_found}, undefined} end. terminate(_Reason, State) -> @@ -139,24 +141,37 @@ handle_call(Request, From, State) -> ?LOG_INFO("~ts received unhandled call from ~tp: ~tp", [?MODULE, From, Request]), {noreply, State}. -handle_cast({dlx_event, _LeaderPid, lookup_topology}, - #state{queue_ref = _} = State0) -> +%% While registering, only handle the checkout confirmation delivered +%% via the ra_event_formatter as a queue_event. +handle_cast({queue_event, _QRef, Evt}, + #state{checkout_corr = Corr} = State0) + when is_reference(Corr) -> + case rabbit_fifo_dlx_client:handle_registration(Evt) of + {registered, Corr} -> + {noreply, lookup_topology(State0#state{checkout_corr = undefined})}; + {not_leader, Corr} -> + {stop, {shutdown, not_leader}, State0}; + ignore -> + {noreply, State0} + end; +handle_cast(_Msg, #state{checkout_corr = Corr} = State) + when is_reference(Corr) -> + {noreply, State}; +handle_cast({dlx_event, _LeaderPid, lookup_topology}, State0) -> State = lookup_topology(State0), redeliver_and_ack(State); handle_cast({dlx_event, LeaderPid, Evt}, - #state{queue_ref = _QRef, - dlx_client_state = DlxState0} = State0) -> - %% received dead-letter message from source queue - {ok, DlxState, Actions} = rabbit_fifo_dlx_client:handle_ra_event(LeaderPid, Evt, DlxState0), - State1 = State0#state{dlx_client_state = DlxState}, - State = handle_queue_actions(Actions, State1), - {noreply, State}; + #state{dlx_client_state = DlxState0} = State0) -> + case rabbit_fifo_dlx_client:handle_ra_event(LeaderPid, Evt, DlxState0) of + {ok, DlxState, Actions} -> + State1 = State0#state{dlx_client_state = DlxState}, + State = handle_queue_actions(Actions, State1), + {noreply, State} + end; handle_cast({queue_event, QRef, Evt}, #state{queue_type_state = QTypeState0} = State0) -> - case rabbit_queue_type:handle_event(QRef, Evt, QTypeState0) of {ok, QTypeState1, Actions} -> - %% received e.g. confirm from target queue State1 = State0#state{queue_type_state = QTypeState1}, State = handle_queue_actions(Actions, State1), {noreply, State}; @@ -182,8 +197,6 @@ redeliver_and_ack(State0) -> handle_info({'DOWN', Ref, process, _, _}, #state{monitor_ref = Ref, queue_ref = QRef}) -> - %% Source quorum queue is down. Therefore, terminate ourself. - %% The new leader will re-create another dlx_worker. ?LOG_DEBUG("~ts terminating itself because leader of ~ts is down...", [?MODULE, rabbit_misc:rs(QRef)]), supervisor:terminate_child(rabbit_fifo_dlx_sup, self());