38 changes: 35 additions & 3 deletions deps/rabbit/src/rabbit_msg_store.erl
@@ -10,6 +10,7 @@
-behaviour(gen_server2).

-export([start_link/5, successfully_recovered_state/1,
gc_pid/1,
client_init/3, client_terminate/1, client_delete_and_terminate/1,
client_pre_hibernate/1, client_ref/1,
write/4, write_flow/4, read/2, read_many/2, contains/2, remove/2]).
@@ -399,6 +400,11 @@ start_link(VHost, Type, Dir, ClientRefs, StartupFunState) when is_atom(Type) ->
successfully_recovered_state(Server) ->
gen_server2:call(Server, successfully_recovered_state, infinity).

-spec gc_pid(server()) -> pid().

gc_pid(Server) ->
gen_server2:call(Server, gc_pid, infinity).

-spec client_init(server(), client_ref(), maybe_msg_id_fun()) -> client_msstate().

client_init(Server, Ref, MsgOnDiskFun) when is_pid(Server); is_atom(Server) ->
@@ -806,6 +812,7 @@ init([VHost, Type, BaseDir, ClientRefs, StartupFunState]) ->
prioritise_call(Msg, _From, _Len, _State) ->
case Msg of
successfully_recovered_state -> 7;
gc_pid -> 7;
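%% Hedged note (editorial comment, not in the diff): gen_server2 answers
%% higher-priority requests first, so giving gc_pid the same priority as
%% successfully_recovered_state lets the call be served even when the
%% store has a deep backlog of queued writes.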
{new_client_state, _Ref, _Pid, _MODC} -> 7;
_ -> 0
end.
@@ -826,6 +833,9 @@ prioritise_info(Msg, _Len, _State) ->
handle_call(successfully_recovered_state, _From, State) ->
reply(State #msstate.successfully_recovered, State);

handle_call(gc_pid, _From, State) ->
reply(State #msstate.gc_pid, State);

handle_call({new_client_state, CRef, CPid, MsgOnDiskFun}, _From,
State = #msstate { dir = Dir,
index_ets = IndexEts,
@@ -971,9 +981,13 @@ terminate(Reason, State = #msstate { index_ets = IndexEts,
_ -> {" with reason ~0p", [Reason]}
end,
?LOG_INFO("Stopping message store for directory '~ts'" ++ ExtraLog, [Dir|ExtraLogArgs]),
-%% stop the gc first, otherwise it could be working and we pull
-%% out the ets tables from under it.
-ok = rabbit_msg_store_gc:stop(GCPid),
+%% Terminate the GC first, otherwise it could still be running and we
+%% pull the ETS tables out from under it. The GC does not trap exits,
+%% so an exit signal terminates it immediately even if it is stuck
+%% mid-callback on disk I/O. Bound the wait so terminate stays within
+%% the msg_store child's own supervisor shutdown timeout, and fall
+%% back to kill if the shutdown signal does not take effect in time.
+stop_gc(GCPid, Dir),
State3 = case CurHdl of
undefined -> State;
_ -> State2 = internal_sync(State),
@@ -1008,6 +1022,24 @@ code_change(_OldVsn, State, _Extra) ->

format_message_queue(Opt, MQ) -> rabbit_misc:format_message_queue(Opt, MQ).

stop_gc(GCPid, Dir) ->
ShutdownTimeout = rabbit_misc:get_env(
rabbit, msg_store_shutdown_timeout, 600_000),
Timeout = max(ShutdownTimeout - 60_000, 5_000),
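%% Editorial note: with the 600_000 ms default this waits 540_000 ms.
%% The 60_000 ms headroom leaves the rest of terminate/2 time to write
%% the recovery files before the supervisor's shutdown deadline, and
%% the 5_000 ms floor keeps the wait sane if a very small timeout is
%% configured.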
MRef = erlang:monitor(process, GCPid),
exit(GCPid, shutdown),
receive
{'DOWN', MRef, process, GCPid, _} -> ok
after Timeout ->
?LOG_WARNING("Message store GC for directory '~ts' did not exit "
"within ~bms of shutdown signal, killing it",
[Dir, Timeout]),
exit(GCPid, kill),
receive
{'DOWN', MRef, process, GCPid, _} -> ok
end
end.

%%----------------------------------------------------------------------------
%% general helper functions
%%----------------------------------------------------------------------------
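Since stop_gc/2 reads its budget from the rabbit application environment, the timeout is tunable per deployment. A minimal sketch of overriding it via advanced.config (the 300000 value is illustrative; only the 600_000 ms default above is established by this diff):

%% advanced.config (sketch): shorten the msg_store GC shutdown budget.
[
 {rabbit, [
  {msg_store_shutdown_timeout, 300000}
 ]}
].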
142 changes: 142 additions & 0 deletions deps/rabbit/test/backing_queue_SUITE.erl
@@ -64,6 +64,8 @@ groups() ->
msg_store,
msg_store_read_many_fanout,
msg_store_file_scan,
msg_store_gc_stuck_suspended,
msg_store_gc_stuck_mid_callback,
{backing_queue_v2, [], Common ++ V2Only}
]}
].
@@ -718,6 +720,146 @@ msg_store_file_scan1(Config) ->
gen_id() ->
rand:bytes(16).

%% Test that when the GC process is unresponsive during shutdown,
%% the msg_store recovers cleanly because terminate sends the GC an
%% exit signal and proceeds to write recovery files.
msg_store_gc_stuck_suspended(Config) ->
passed = rabbit_ct_broker_helpers:rpc(Config, 0,
?MODULE, msg_store_gc_stuck_suspended1, [Config]).

msg_store_gc_stuck_suspended1(_Config) ->
GenRef = fun() -> make_ref() end,
restart_msg_store_empty(),

%% Write some messages so the store has data to recover.
Ref = rabbit_guid:gen(),
MSCState = msg_store_client_init(?PERSISTENT_MSG_STORE, Ref),
MsgIds = [{GenRef(), msg_id_bin(M)} || M <- lists:seq(1, 50)],
ok = msg_store_write(MsgIds, MSCState),
ok = rabbit_msg_store:client_terminate(MSCState),

%% Get the msg_store pid and its GC pid.
StorePid = rabbit_vhost_msg_store:vhost_store_pid(
?VHOST, ?PERSISTENT_MSG_STORE),
GCPid = rabbit_msg_store:gc_pid(StorePid),
true = is_process_alive(GCPid),

%% Suspend the GC process so it cannot process messages.
ok = sys:suspend(GCPid),

%% Stop the transient store cleanly first.
rabbit_vhost_msg_store:stop(?VHOST, ?TRANSIENT_MSG_STORE),

%% Terminate the persistent store via the supervisor. The terminate
%% callback sends the GC an exit signal. The GC does not trap exits
%% so it terminates immediately, and terminate proceeds to write
%% recovery files.
{ok, VHostSup} = rabbit_vhost_sup_sup:get_vhost_sup(?VHOST),
ok = supervisor:terminate_child(VHostSup, ?PERSISTENT_MSG_STORE),

%% Delete the child spec so we can restart.
ok = supervisor:delete_child(VHostSup, ?PERSISTENT_MSG_STORE),

%% Restart the msg_store and check recovery state.
ok = rabbit_variable_queue:start_msg_store(
?VHOST, [Ref], {fun ([]) -> finished end, []}),

%% The store should report a clean recovery because the fix
%% terminates the unresponsive GC and proceeds to write recovery files.
true = rabbit_vhost_msg_store:successfully_recovered_state(
?VHOST, ?PERSISTENT_MSG_STORE),

%% Verify all messages survived the unclean GC shutdown.
MSCState2 = msg_store_client_init(?PERSISTENT_MSG_STORE, Ref),
true = msg_store_contains(true, MsgIds, MSCState2),
ok = rabbit_msg_store:client_terminate(MSCState2),

%% Clean up.
restart_msg_store_empty(),
passed.

%% Test that when the GC process is blocked mid-callback (simulating disk I/O),
%% the msg_store recovers cleanly because terminate sends the GC an exit
%% signal and proceeds to write recovery files.
msg_store_gc_stuck_mid_callback(Config) ->
rabbit_ct_broker_helpers:setup_meck(Config),
passed = rabbit_ct_broker_helpers:rpc(Config, 0,
?MODULE, msg_store_gc_stuck_mid_callback1, [Config]).

msg_store_gc_stuck_mid_callback1(_Config) ->
GenRef = fun() -> make_ref() end,
restart_msg_store_empty(),

%% Write some messages so the store has data to recover.
Ref = rabbit_guid:gen(),
MSCState = msg_store_client_init(?PERSISTENT_MSG_STORE, Ref),
MsgIds = [{GenRef(), msg_id_bin(M)} || M <- lists:seq(1, 50)],
ok = msg_store_write(MsgIds, MSCState),
ok = rabbit_msg_store:client_terminate(MSCState),

%% Get the msg_store pid and its GC pid.
StorePid = rabbit_vhost_msg_store:vhost_store_pid(
?VHOST, ?PERSISTENT_MSG_STORE),
GCPid = rabbit_msg_store:gc_pid(StorePid),
true = is_process_alive(GCPid),

%% Mock compact_file to signal the test process on entry, then block
%% indefinitely, simulating a GC process stuck on disk I/O mid-callback.
TestPid = self(),
ok = meck:new(rabbit_msg_store, [no_link, passthrough]),
ok = meck:expect(rabbit_msg_store, compact_file,
fun(_, _) ->
TestPid ! gc_in_callback,
%% Block forever with no CPU usage, simulating a
%% process stuck waiting on disk I/O that never
%% completes. The GC will be terminated by stop_gc/2.
receive after infinity -> ok end
end),

%% Send a compact cast directly to the GC. It will enter the mocked
%% compact_file, signal us, then block inside the handle_cast callback.
rabbit_msg_store_gc:compact(GCPid, 0),

%% Wait for the GC to confirm it has entered the blocking callback.
receive
gc_in_callback -> ok
after 5000 ->
error(gc_did_not_enter_callback)
end,

%% Stop the transient store cleanly first.
rabbit_vhost_msg_store:stop(?VHOST, ?TRANSIENT_MSG_STORE),

%% Terminate the persistent store via the supervisor. The GC is blocked
%% mid-callback but the exit signal sent by terminate preempts the
%% callback because the GC does not trap exits, so terminate proceeds
%% to write recovery files.
{ok, VHostSup} = rabbit_vhost_sup_sup:get_vhost_sup(?VHOST),
ok = supervisor:terminate_child(VHostSup, ?PERSISTENT_MSG_STORE),

ok = meck:unload(rabbit_msg_store),

%% Delete the child spec so we can restart.
ok = supervisor:delete_child(VHostSup, ?PERSISTENT_MSG_STORE),

%% Restart the msg_store and check recovery state.
ok = rabbit_variable_queue:start_msg_store(
?VHOST, [Ref], {fun ([]) -> finished end, []}),

%% The store should report a clean recovery because the fix terminates
%% the unresponsive GC and proceeds to write recovery files.
true = rabbit_vhost_msg_store:successfully_recovered_state(
?VHOST, ?PERSISTENT_MSG_STORE),

%% Verify all messages survived the unclean GC shutdown.
MSCState2 = msg_store_client_init(?PERSISTENT_MSG_STORE, Ref),
true = msg_store_contains(true, MsgIds, MSCState2),
ok = rabbit_msg_store:client_terminate(MSCState2),

%% Clean up.
restart_msg_store_empty(),
passed.
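Both tests rely on the same runtime behaviour, sketched below in isolation (a generic snippet, not part of the suite): an exit signal is not a mailbox message, so a process that does not trap exits dies promptly even while blocked in a bare receive or suspended via sys:suspend/1.

%% Generic sketch: 'shutdown' terminates a non-trapping process that
%% will never execute another reduction of its own code.
Pid = spawn(fun() -> receive after infinity -> ok end end),
MRef = erlang:monitor(process, Pid),
exit(Pid, shutdown),
receive {'DOWN', MRef, process, Pid, shutdown} -> ok end.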

gen_msg() ->
gen_msg(1024 * 1024).
