Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions deps/rabbit/src/rabbit_fifo_dlx_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).

init([]) ->
rabbit_fifo_dlx_worker:init_ets(),
SupFlags = #{strategy => simple_one_for_one,
intensity => 100,
period => 1},
Expand Down
59 changes: 48 additions & 11 deletions deps/rabbit/src/rabbit_fifo_dlx_worker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,15 @@

-behaviour(gen_server).

-export([init_ets/0, block_reasons/1]).
-export([start_link/1]).
%% gen_server callbacks
-export([init/1, terminate/2, handle_continue/2,
handle_cast/2, handle_call/3, handle_info/2,
code_change/3, format_status/1]).

-define(HIBERNATE_AFTER, 4*60*1000).
-define(UNROUTABLE_TABLE, rabbit_fifo_dlx_unroutable).

-record(pending, {
%% consumed_msg_id is not to be confused with consumer delivery tag.
Expand Down Expand Up @@ -100,6 +102,21 @@

-type state() :: #state{}.

init_ets() ->
ets:new(?UNROUTABLE_TABLE, [named_table, public]),
ok.

block_reasons(QRef) ->
try ets:lookup(?UNROUTABLE_TABLE, QRef) of
[{QRef, Count}] ->
Count;
[] ->
0
catch
error:badarg ->
0
end.

start_link(QRef) ->
gen_server:start_link(?MODULE, QRef, [{hibernate_after, ?HIBERNATE_AFTER}]).

Expand Down Expand Up @@ -131,7 +148,8 @@ handle_continue(QRef, undefined) ->
{stop, Error, undefined}
end.

terminate(_Reason, State) ->
terminate(_Reason, #state{queue_ref = QRef} = State) ->
_ = catch ets:delete(?UNROUTABLE_TABLE, QRef),
cancel_timer(State).

handle_call(Request, From, State) ->
Expand Down Expand Up @@ -623,26 +641,30 @@ log_missing_dlx_once(#state{exchange_ref = SameDlx,
State;
log_missing_dlx_once(#state{exchange_ref = DlxResource,
queue_ref = QueueResource,
logged = Logged} = State) ->
logged = Logged} = State0) ->
?LOG_WARNING("Cannot forward any dead-letter messages from source ~ts because "
"its configured dead-letter-exchange ~ts does not exist. "
"Either create the configured dead-letter-exchange or re-configure "
"the dead-letter-exchange policy for the source queue to prevent "
"dead-lettered messages from piling up in the source queue. "
"This message will not be logged again.",
[rabbit_misc:rs(QueueResource), rabbit_misc:rs(DlxResource)]),
State#state{logged = maps:put(missing_dlx, DlxResource, Logged)}.
State = State0#state{logged = maps:put(missing_dlx, DlxResource, Logged)},
set_unroutable(State),
State.

clear_log_missing_dlx_once(#state{exchange_ref = DlxResource,
queue_ref = QueueResource,
pendings = Pendings,
logged = #{missing_dlx := MissingDlx} = Logged} = State) ->
logged = #{missing_dlx := MissingDlx} = Logged} = State0) ->
?LOG_INFO("Dead-letter-exchange ~ts found for ~ts. Forwarding was previously "
"blocked since the configured dead-letter-exchange ~ts could not be found. "
"Forwarding of ~b pending dead-letter messages will be attempted.",
[rabbit_misc:rs(DlxResource), rabbit_misc:rs(QueueResource),
rabbit_misc:rs(MissingDlx), maps:size(Pendings)]),
State#state{logged = maps:remove(missing_dlx, Logged)};
State = State0#state{logged = maps:remove(missing_dlx, Logged)},
set_unroutable(State),
State;
clear_log_missing_dlx_once(State) ->
State.

Expand All @@ -653,7 +675,7 @@ log_no_route_once(#state{exchange_ref = SameDlx,
log_no_route_once(#state{queue_ref = QueueResource,
exchange_ref = DlxResource,
routing_key = RoutingKey,
logged = Logged} = State) ->
logged = Logged} = State0) ->
?LOG_WARNING("Cannot forward any dead-letter messages from source ~ts "
"with configured dead-letter-exchange ~ts and configured "
"dead-letter-routing-key '~ts'. This can happen either if the dead-letter "
Expand All @@ -664,21 +686,25 @@ log_no_route_once(#state{queue_ref = QueueResource,
"in the source queue. "
"This message will not be logged again.",
[rabbit_misc:rs(QueueResource), rabbit_misc:rs(DlxResource), RoutingKey]),
State#state{logged = maps:put(no_route, {DlxResource, RoutingKey}, Logged)}.
State = State0#state{logged = maps:put(no_route, {DlxResource, RoutingKey}, Logged)},
set_unroutable(State),
State.

clear_log_no_route_once(#state{exchange_ref = DlxResource,
routing_key = RoutingKey,
queue_ref = QueueResource,
pendings = Pendings,
logged = #{no_route := {OldDlx, OldRoutingKey}} = Logged} = State) ->
logged = #{no_route := {OldDlx, OldRoutingKey}} = Logged} = State0) ->
?LOG_INFO("Discovered a route to forward dead-letter messages from ~ts on "
"configured dead-letter-exchange ~ts and dead-letter-routing-key '~ts'. "
"Previously dead-letter messages could not be forwarded on configured "
"dead-letter-exchange ~ts and dead-letter-routing-key '~ts'. "
"Forwarding of ~b pending dead-letter messages will be attempted.",
[rabbit_misc:rs(QueueResource), rabbit_misc:rs(DlxResource),
RoutingKey, rabbit_misc:rs(OldDlx), OldRoutingKey, maps:size(Pendings)]),
State#state{logged = maps:remove(no_route, Logged)};
State = State0#state{logged = maps:remove(no_route, Logged)},
set_unroutable(State),
State;
clear_log_no_route_once(State) ->
State.

Expand All @@ -690,10 +716,21 @@ log_cycle_once(Queues, _, #state{logged = Logged} = State)
State;
log_cycle_once(Queues, RoutingKeys, #state{exchange_ref = DlxResource,
queue_ref = QueueResource,
logged = Logged} = State) ->
logged = Logged} = State0) ->
?LOG_WARNING("Dead-letter queues cycle detected for source ~ts "
"with dead-letter exchange ~ts and routing keys ~tp: ~tp "
"This message will not be logged again.",
[rabbit_misc:rs(QueueResource), rabbit_misc:rs(DlxResource),
RoutingKeys, Queues]),
State#state{logged = maps:put({cycle, Queues}, true, Logged)}.
State = State0#state{logged = maps:put({cycle, Queues}, true, Logged)},
set_unroutable(State),
State.

set_unroutable(#state{queue_ref = QRef, logged = Logged}) ->
Count = map_size(Logged),
try
ets:update_element(?UNROUTABLE_TABLE, QRef, {2, Count}, {QRef, Count})
catch
error:badarg ->
ok
end.
7 changes: 7 additions & 0 deletions deps/rabbit/src/rabbit_quorum_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -651,6 +651,12 @@ handle_tick(QName,
(_, _, Acc) ->
Acc
end, info(Q, Keys), Overview),
Blocks = case Cfg of
#{dead_letter_handler := at_least_once} ->
rabbit_fifo_dlx_worker:block_reasons(QName);
_ ->
undefined
end,
MsgBytesDiscarded = DiscardBytes + DiscardCheckoutBytes,
MsgBytes = EnqueueBytes + CheckoutBytes + MsgBytesDiscarded,
Infos = [{consumers, NumConsumers},
Expand All @@ -664,6 +670,7 @@ handle_tick(QName,
{messages_persistent, NumMessages},
{messages_dlx, NumDiscarded + NumDiscardedCheckedOut},
{message_bytes_dlx, MsgBytesDiscarded},
{at_least_once_dlx_block_reasons, Blocks},
{single_active_consumer_tag, SacTag},
{single_active_consumer_pid, SacPid},
{leader, node()},
Expand Down
Loading