diff --git a/deps/rabbit/src/rabbit_msg_store.erl b/deps/rabbit/src/rabbit_msg_store.erl
index 55fd0bd3b7c..6f48af367ab 100644
--- a/deps/rabbit/src/rabbit_msg_store.erl
+++ b/deps/rabbit/src/rabbit_msg_store.erl
@@ -10,6 +10,7 @@
 -behaviour(gen_server2).
 
 -export([start_link/5, successfully_recovered_state/1,
+         gc_pid/1,
          client_init/3, client_terminate/1, client_delete_and_terminate/1,
          client_pre_hibernate/1, client_ref/1,
          write/4, write_flow/4, read/2, read_many/2, contains/2, remove/2]).
@@ -399,6 +400,11 @@ start_link(VHost, Type, Dir, ClientRefs, StartupFunState) when is_atom(Type) ->
 successfully_recovered_state(Server) ->
     gen_server2:call(Server, successfully_recovered_state, infinity).
 
+-spec gc_pid(server()) -> pid().
+
+gc_pid(Server) ->
+    gen_server2:call(Server, gc_pid, infinity).
+
 -spec client_init(server(), client_ref(), maybe_msg_id_fun()) -> client_msstate().
 
 client_init(Server, Ref, MsgOnDiskFun) when is_pid(Server); is_atom(Server) ->
@@ -806,6 +812,7 @@ init([VHost, Type, BaseDir, ClientRefs, StartupFunState]) ->
 prioritise_call(Msg, _From, _Len, _State) ->
     case Msg of
         successfully_recovered_state          -> 7;
+        gc_pid                                -> 7;
         {new_client_state, _Ref, _Pid, _MODC} -> 7;
         _                                     -> 0
     end.
@@ -826,6 +833,9 @@ prioritise_info(Msg, _Len, _State) ->
 handle_call(successfully_recovered_state, _From, State) ->
     reply(State #msstate.successfully_recovered, State);
 
+handle_call(gc_pid, _From, State) ->
+    reply(State #msstate.gc_pid, State);
+
 handle_call({new_client_state, CRef, CPid, MsgOnDiskFun}, _From,
             State = #msstate { dir = Dir,
                                index_ets = IndexEts,
@@ -971,9 +981,13 @@ terminate(Reason, State = #msstate { index_ets = IndexEts,
                _ -> {" with reason ~0p", [Reason]}
            end,
     ?LOG_INFO("Stopping message store for directory '~ts'" ++ ExtraLog, [Dir|ExtraLogArgs]),
-    %% stop the gc first, otherwise it could be working and we pull
-    %% out the ets tables from under it.
-    ok = rabbit_msg_store_gc:stop(GCPid),
+    %% Terminate the GC first, otherwise it could still be running and we
+    %% pull the ETS tables out from under it. The GC does not trap exits,
+    %% so an exit signal terminates it immediately even if it is stuck
+    %% mid-callback on disk I/O. Bound the wait so terminate stays within
+    %% the msg_store child's own supervisor shutdown timeout, and fall
+    %% back to kill if the shutdown signal does not take effect in time.
+    stop_gc(GCPid, Dir),
     State3 = case CurHdl of
                  undefined -> State;
                  _         -> State2 = internal_sync(State),
@@ -1008,6 +1022,24 @@ code_change(_OldVsn, State, _Extra) ->
 format_message_queue(Opt, MQ) -> rabbit_misc:format_message_queue(Opt, MQ).
 
+stop_gc(GCPid, Dir) ->
+    ShutdownTimeout = rabbit_misc:get_env(
+                        rabbit, msg_store_shutdown_timeout, 600_000),
+    Timeout = max(ShutdownTimeout - 60_000, 5_000),
+    MRef = erlang:monitor(process, GCPid),
+    exit(GCPid, shutdown),
+    receive
+        {'DOWN', MRef, process, GCPid, _} -> ok
+    after Timeout ->
+        ?LOG_WARNING("Message store GC for directory '~ts' did not exit "
+                     "within ~bms of shutdown signal, killing it",
+                     [Dir, Timeout]),
+        exit(GCPid, kill),
+        receive
+            {'DOWN', MRef, process, GCPid, _} -> ok
+        end
+    end.
+
 %%----------------------------------------------------------------------------
 %% general helper functions
 %%----------------------------------------------------------------------------
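The fix hinges on standard Erlang exit-signal semantics: because the GC process never calls process_flag(trap_exit, true), exit(GCPid, shutdown) terminates it at once even while it is blocked inside a gen_server callback, without waiting for the callback to return. Below is a minimal, self-contained sketch of that behaviour; the module and atom names are illustrative and not part of this patch.

-module(msg_store_gc_exit_demo).
-behaviour(gen_server).
-export([run/0, init/1, handle_call/3, handle_cast/2]).

%% A gen_server that, like rabbit_msg_store_gc, does not trap exits.
init([]) -> {ok, undefined}.

%% Blocks forever inside the callback, standing in for a GC process
%% stuck on disk I/O in the middle of compacting a file.
handle_call(block, _From, State) ->
    receive after infinity -> ok end,
    {reply, ok, State}.

handle_cast(_Msg, State) -> {noreply, State}.

run() ->
    %% start/3, not start_link/3, so killing the server does not take
    %% the calling process down with it.
    {ok, Pid} = gen_server:start(?MODULE, [], []),
    %% Drive the server into the blocking callback. The caller's call
    %% times out after 100ms (caught here), but the server stays stuck
    %% inside handle_call/3.
    catch gen_server:call(Pid, block, 100),
    MRef = erlang:monitor(process, Pid),
    %% The same signal stop_gc/2 sends: it preempts the stuck callback
    %% because the server is not trapping exits.
    exit(Pid, shutdown),
    receive
        {'DOWN', MRef, process, Pid, shutdown} -> gc_terminated_immediately
    after 1000 ->
        error(exit_signal_did_not_preempt_callback)
    end.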
diff --git a/deps/rabbit/test/backing_queue_SUITE.erl b/deps/rabbit/test/backing_queue_SUITE.erl
index 28c989bab93..473c0cae0bd 100644
--- a/deps/rabbit/test/backing_queue_SUITE.erl
+++ b/deps/rabbit/test/backing_queue_SUITE.erl
@@ -64,6 +64,8 @@ groups() ->
           msg_store,
           msg_store_read_many_fanout,
           msg_store_file_scan,
+          msg_store_gc_stuck_suspended,
+          msg_store_gc_stuck_mid_callback,
           {backing_queue_v2, [], Common ++ V2Only}
         ]}
     ].
@@ -718,6 +720,146 @@ msg_store_file_scan1(Config) ->
 gen_id() ->
     rand:bytes(16).
 
+%% Test that when the GC process is unresponsive during shutdown,
+%% the msg_store recovers cleanly because terminate sends the GC an
+%% exit signal and proceeds to write recovery files.
+msg_store_gc_stuck_suspended(Config) ->
+    passed = rabbit_ct_broker_helpers:rpc(Config, 0,
+        ?MODULE, msg_store_gc_stuck_suspended1, [Config]).
+
+msg_store_gc_stuck_suspended1(_Config) ->
+    GenRef = fun() -> make_ref() end,
+    restart_msg_store_empty(),
+
+    %% Write some messages so the store has data to recover.
+    Ref = rabbit_guid:gen(),
+    MSCState = msg_store_client_init(?PERSISTENT_MSG_STORE, Ref),
+    MsgIds = [{GenRef(), msg_id_bin(M)} || M <- lists:seq(1, 50)],
+    ok = msg_store_write(MsgIds, MSCState),
+    ok = rabbit_msg_store:client_terminate(MSCState),
+
+    %% Get the msg_store pid and its GC pid.
+    StorePid = rabbit_vhost_msg_store:vhost_store_pid(
+                 ?VHOST, ?PERSISTENT_MSG_STORE),
+    GCPid = rabbit_msg_store:gc_pid(StorePid),
+    true = is_process_alive(GCPid),
+
+    %% Suspend the GC process so it cannot process messages.
+    ok = sys:suspend(GCPid),
+
+    %% Stop the transient store cleanly first.
+    rabbit_vhost_msg_store:stop(?VHOST, ?TRANSIENT_MSG_STORE),
+
+    %% Terminate the persistent store via the supervisor. The terminate
+    %% callback sends the GC an exit signal. The GC does not trap exits,
+    %% so it terminates immediately, and terminate proceeds to write
+    %% recovery files.
+    {ok, VHostSup} = rabbit_vhost_sup_sup:get_vhost_sup(?VHOST),
+    ok = supervisor:terminate_child(VHostSup, ?PERSISTENT_MSG_STORE),
+
+    %% Delete the child spec so we can restart.
+    ok = supervisor:delete_child(VHostSup, ?PERSISTENT_MSG_STORE),
+
+    %% Restart the msg_store and check recovery state.
+    ok = rabbit_variable_queue:start_msg_store(
+           ?VHOST, [Ref], {fun ([]) -> finished end, []}),
+
+    %% The store should report a clean recovery because the fix
+    %% terminates the unresponsive GC and proceeds to write recovery files.
+    true = rabbit_vhost_msg_store:successfully_recovered_state(
+             ?VHOST, ?PERSISTENT_MSG_STORE),
+
+    %% Verify all messages survived the unclean GC shutdown.
+    MSCState2 = msg_store_client_init(?PERSISTENT_MSG_STORE, Ref),
+    true = msg_store_contains(true, MsgIds, MSCState2),
+    ok = rabbit_msg_store:client_terminate(MSCState2),
+
+    %% Clean up.
+    restart_msg_store_empty(),
+    passed.
+
+%% Test that when the GC process is blocked mid-callback (simulating disk
+%% I/O), the msg_store recovers cleanly because terminate sends the GC an
+%% exit signal and proceeds to write recovery files.
+msg_store_gc_stuck_mid_callback(Config) ->
+    rabbit_ct_broker_helpers:setup_meck(Config),
+    passed = rabbit_ct_broker_helpers:rpc(Config, 0,
+        ?MODULE, msg_store_gc_stuck_mid_callback1, [Config]).
+
+msg_store_gc_stuck_mid_callback1(_Config) ->
+    GenRef = fun() -> make_ref() end,
+    restart_msg_store_empty(),
+
+    %% Write some messages so the store has data to recover.
+    Ref = rabbit_guid:gen(),
+    MSCState = msg_store_client_init(?PERSISTENT_MSG_STORE, Ref),
+    MsgIds = [{GenRef(), msg_id_bin(M)} || M <- lists:seq(1, 50)],
+    ok = msg_store_write(MsgIds, MSCState),
+    ok = rabbit_msg_store:client_terminate(MSCState),
+
+    %% Get the msg_store pid and its GC pid.
+    StorePid = rabbit_vhost_msg_store:vhost_store_pid(
+                 ?VHOST, ?PERSISTENT_MSG_STORE),
+    GCPid = rabbit_msg_store:gc_pid(StorePid),
+    true = is_process_alive(GCPid),
+
+    %% Mock compact_file to signal the test process on entry, then block
+    %% indefinitely, simulating a GC process stuck on disk I/O mid-callback.
+    TestPid = self(),
+    ok = meck:new(rabbit_msg_store, [no_link, passthrough]),
+    ok = meck:expect(rabbit_msg_store, compact_file,
+                     fun(_, _) ->
+                             TestPid ! gc_in_callback,
+                             %% Block forever with no CPU usage, simulating
+                             %% a process stuck waiting on disk I/O that
+                             %% never completes. The GC will be terminated
+                             %% by stop_gc/2.
+                             receive after infinity -> ok end
+                     end),
+
+    %% Send a compact cast directly to the GC. It will enter the mocked
+    %% compact_file, signal us, then block inside the handle_cast callback.
+    rabbit_msg_store_gc:compact(GCPid, 0),
+
+    %% Wait for the GC to confirm it has entered the blocking callback.
+    receive
+        gc_in_callback -> ok
+    after 5000 ->
+        error(gc_did_not_enter_callback)
+    end,
+
+    %% Stop the transient store cleanly first.
+    rabbit_vhost_msg_store:stop(?VHOST, ?TRANSIENT_MSG_STORE),
+
+    %% Terminate the persistent store via the supervisor. The GC is blocked
+    %% mid-callback, but the exit signal sent by terminate preempts the
+    %% callback because the GC does not trap exits, so terminate proceeds
+    %% to write recovery files.
+    {ok, VHostSup} = rabbit_vhost_sup_sup:get_vhost_sup(?VHOST),
+    ok = supervisor:terminate_child(VHostSup, ?PERSISTENT_MSG_STORE),
+
+    ok = meck:unload(rabbit_msg_store),
+
+    %% Delete the child spec so we can restart.
+    ok = supervisor:delete_child(VHostSup, ?PERSISTENT_MSG_STORE),
+
+    %% Restart the msg_store and check recovery state.
+    ok = rabbit_variable_queue:start_msg_store(
+           ?VHOST, [Ref], {fun ([]) -> finished end, []}),
+
+    %% The store should report a clean recovery because the fix terminates
+    %% the unresponsive GC and proceeds to write recovery files.
+    true = rabbit_vhost_msg_store:successfully_recovered_state(
+             ?VHOST, ?PERSISTENT_MSG_STORE),
+
+    %% Verify all messages survived the unclean GC shutdown.
+    MSCState2 = msg_store_client_init(?PERSISTENT_MSG_STORE, Ref),
+    true = msg_store_contains(true, MsgIds, MSCState2),
+    ok = rabbit_msg_store:client_terminate(MSCState2),
+
+    %% Clean up.
+    restart_msg_store_empty(),
+    passed.
+
 gen_msg() ->
     gen_msg(1024 * 1024).
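For operators, the bound stop_gc/2 uses is derived from the rabbit application's msg_store_shutdown_timeout key (600_000 ms by default, per the get_env call in the patch), minus a 60-second margin so the message store itself still exits within its own supervisor shutdown budget. A hedged sketch of raising it, assuming the deployment uses the classic advanced.config Erlang-term file:

%% advanced.config (sketch): raise the shutdown budget to 15 minutes.
%% With this value, stop_gc/2 waits up to 840s (900_000 - 60_000 ms) for
%% the GC to exit before escalating from exit(GCPid, shutdown) to
%% exit(GCPid, kill).
[
 {rabbit, [
     {msg_store_shutdown_timeout, 900000}
 ]}
].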