Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 17 additions & 23 deletions deps/rabbit/src/rabbit_fifo_dlx_client.erl
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,7 @@

-include_lib("kernel/include/logger.hrl").


-export([checkout/3, settle/2, handle_ra_event/3,
-export([checkout/3, settle/2, handle_registration/1, handle_ra_event/3,
overview/1]).

-record(state,{
Expand All @@ -35,30 +34,25 @@ settle(MsgIds, #state{leader = Leader} = State)
{ok, State}.

-spec checkout(rabbit_amqqueue:name(), ra:server_id(), non_neg_integer()) ->
{ok, state()} | {error, non_local_leader | ra_command_failed}.
{reference(), state()}.
checkout(QResource, Leader, NumUnsettled) ->
Cmd = rabbit_fifo_dlx:make_checkout(self(), NumUnsettled),
State = #state{queue_resource = QResource,
leader = Leader,
last_msg_id = -1},
process_command(Cmd, State, 5).
Corr = make_ref(),
ra:pipeline_command(Leader, Cmd, Corr, normal),
{Corr, #state{queue_resource = QResource,
leader = Leader,
last_msg_id = -1}}.

process_command(_Cmd, _State, 0) ->
{error, ra_command_failed};
process_command(Cmd, #state{leader = Leader} = State, Tries) ->
case ra:process_command(Leader, Cmd, 60_000) of
{ok, ok, Leader} ->
{ok, State#state{leader = Leader}};
{ok, ok, NonLocalLeader} ->
?LOG_WARNING("Failed to process command ~tp on quorum queue leader ~tp because actual leader is ~tp.",
[Cmd, Leader, NonLocalLeader]),
{error, non_local_leader};
Err ->
?LOG_WARNING("Failed to process command ~tp on quorum queue leader ~tp: ~tp~n"
"Trying ~b more time(s)...",
[Cmd, Leader, Err, Tries]),
process_command(Cmd, State, Tries - 1)
end.
-spec handle_registration(term()) ->
{registered, reference()} | {not_leader, reference()} | ignore.
handle_registration({{_, _}, {applied, [{Corr, ok}]}})
when is_reference(Corr) ->
{registered, Corr};
handle_registration({{_, _}, {rejected, {not_leader, _Leader, Corr}}})
when is_reference(Corr) ->
{not_leader, Corr};
handle_registration(_Evt) ->
ignore.

-spec handle_ra_event(pid(), term(), state()) ->
{ok, state(), actions()}.
Expand Down
67 changes: 40 additions & 27 deletions deps/rabbit/src/rabbit_fifo_dlx_worker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@
queue_ref :: rabbit_amqqueue:name(),
%% monitors source queue
monitor_ref :: reference(),
%% Set to a reference while waiting for the checkout command to be
%% applied by the leader. Cleared to 'undefined' once registered.
checkout_corr :: undefined | reference(),
%% configured (x-)dead-letter-exchange of source queue
exchange_ref :: rabbit_exchange:name() | undefined,
%% configured (x-)dead-letter-routing-key of source queue
Expand Down Expand Up @@ -116,20 +119,19 @@ handle_continue(QRef, undefined) ->
dead_letter_worker_consumer_prefetch),
{ok, SettleTimeout} = application:get_env(rabbit,
dead_letter_worker_publisher_confirm_timeout),
{ok, Q} = rabbit_amqqueue:lookup(QRef),
{ClusterName, _MaybeOldLeaderNode} = amqqueue:get_pid(Q),
case rabbit_fifo_dlx_client:checkout(QRef, {ClusterName, node()}, Prefetch) of
{ok, ConsumerState} ->
{noreply, lookup_topology(#state{queue_ref = QRef,
queue_type_state = rabbit_queue_type:init(),
settle_timeout = SettleTimeout,
dlx_client_state = ConsumerState,
monitor_ref = erlang:monitor(process, ClusterName)
})};
{error, non_local_leader = Reason} ->
{stop, {shutdown, Reason}, undefined};
Error ->
{stop, Error, undefined}
case rabbit_amqqueue:lookup(QRef) of
{ok, Q} ->
{ClusterName, _MaybeOldLeaderNode} = amqqueue:get_pid(Q),
{Corr, ConsumerState} = rabbit_fifo_dlx_client:checkout(
QRef, {ClusterName, node()}, Prefetch),
{noreply, #state{queue_ref = QRef,
monitor_ref = erlang:monitor(process, ClusterName),
checkout_corr = Corr,
dlx_client_state = ConsumerState,
queue_type_state = rabbit_queue_type:init(),
settle_timeout = SettleTimeout}};
{error, not_found} ->
{stop, {shutdown, queue_not_found}, undefined}
end.

terminate(_Reason, State) ->
Expand All @@ -139,24 +141,37 @@ handle_call(Request, From, State) ->
?LOG_INFO("~ts received unhandled call from ~tp: ~tp", [?MODULE, From, Request]),
{noreply, State}.

handle_cast({dlx_event, _LeaderPid, lookup_topology},
#state{queue_ref = _} = State0) ->
%% While registering, only handle the checkout confirmation delivered
%% via the ra_event_formatter as a queue_event.
handle_cast({queue_event, _QRef, Evt},
#state{checkout_corr = Corr} = State0)
when is_reference(Corr) ->
case rabbit_fifo_dlx_client:handle_registration(Evt) of
{registered, Corr} ->
{noreply, lookup_topology(State0#state{checkout_corr = undefined})};
{not_leader, Corr} ->
{stop, {shutdown, not_leader}, State0};
ignore ->
{noreply, State0}
end;
handle_cast(_Msg, #state{checkout_corr = Corr} = State)
when is_reference(Corr) ->
{noreply, State};
handle_cast({dlx_event, _LeaderPid, lookup_topology}, State0) ->
State = lookup_topology(State0),
redeliver_and_ack(State);
handle_cast({dlx_event, LeaderPid, Evt},
#state{queue_ref = _QRef,
dlx_client_state = DlxState0} = State0) ->
%% received dead-letter message from source queue
{ok, DlxState, Actions} = rabbit_fifo_dlx_client:handle_ra_event(LeaderPid, Evt, DlxState0),
State1 = State0#state{dlx_client_state = DlxState},
State = handle_queue_actions(Actions, State1),
{noreply, State};
#state{dlx_client_state = DlxState0} = State0) ->
case rabbit_fifo_dlx_client:handle_ra_event(LeaderPid, Evt, DlxState0) of
{ok, DlxState, Actions} ->
State1 = State0#state{dlx_client_state = DlxState},
State = handle_queue_actions(Actions, State1),
{noreply, State}
end;
handle_cast({queue_event, QRef, Evt},
#state{queue_type_state = QTypeState0} = State0) ->

case rabbit_queue_type:handle_event(QRef, Evt, QTypeState0) of
{ok, QTypeState1, Actions} ->
%% received e.g. confirm from target queue
State1 = State0#state{queue_type_state = QTypeState1},
State = handle_queue_actions(Actions, State1),
{noreply, State};
Expand All @@ -182,8 +197,6 @@ redeliver_and_ack(State0) ->
handle_info({'DOWN', Ref, process, _, _},
#state{monitor_ref = Ref,
queue_ref = QRef}) ->
%% Source quorum queue is down. Therefore, terminate ourself.
%% The new leader will re-create another dlx_worker.
?LOG_DEBUG("~ts terminating itself because leader of ~ts is down...",
[?MODULE, rabbit_misc:rs(QRef)]),
supervisor:terminate_child(rabbit_fifo_dlx_sup, self());
Expand Down
Loading