QQ DLX: use a pipeline command for checkout #15548
Conversation
Relevant logs of the flake:
```erlang
handle_cast({queue_event, QRef, Evt},
            #state{dlx_client_state = DlxState0,
                   queue_type_state = QTypeState0} = State0) ->
    case maybe_handle_dlx_event(Evt, DlxState0) of
        {ok, DlxState, Actions} ->
            State1 = State0#state{dlx_client_state = DlxState},
            State = handle_queue_actions(Actions, State1),
            {noreply, State};
        {reject, _DlxState} ->
            {stop, {shutdown, not_leader}, State0};
        ignore ->
            case rabbit_queue_type:handle_event(QRef, Evt, QTypeState0) of
                {ok, QTypeState1, Actions} ->
                    State1 = State0#state{queue_type_state = QTypeState1},
                    State = handle_queue_actions(Actions, State1),
                    {noreply, State};
                {eol, Actions} ->
                    State = handle_queue_actions(Actions, State0),
                    remove_queue(QRef, State);
                {protocol_error, _Type, _Reason, _Args} ->
                    {noreply, State0}
            end
    end;
```
Can we simplify this?
Conceptually after the worker sent its initial {dlx, #checkout{}} message with a correlation, it's in a separate state registering. The only message it should receive in this state is a reply with that given correlation. Maybe we can make this more apparent by introducing a rabbit_fifo_dlx_client:handle_registration function? If successful, the state changes to registered. Only thereafter will the worker handle other messages including dlx deliveries from its source queue and publisher confirms from target queues.
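The suggested `registering` → `registered` phase could be sketched roughly like this (a minimal illustration only: the record fields, the reply shapes, and the exact function signature are hypothetical, not the actual `rabbit_fifo_dlx_client` API):

```erlang
%% Hypothetical sketch of the suggested registration phase.
%% Field names and reply shapes are illustrative.
-record(state, {phase = registering :: registering | registered,
                correlation :: term()}).

%% While registering, only a reply carrying the pending correlation is
%% accepted; it moves the client into the registered phase.
handle_registration(Correlation, ok,
                    #state{phase = registering,
                           correlation = Correlation} = State) ->
    {ok, State#state{phase = registered}};
handle_registration(_Correlation, _Reply, State) ->
    %% Anything else (stale correlation, unexpected reply) is ignored
    %% until registration completes.
    {ignore, State}.
```

Only after `handle_registration` returns `ok` would the worker start accepting dlx deliveries and publisher confirms.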
I've refactored the code. Please take a look and see if that's what you had in mind.
Thank you. The code is much simpler now.
> The only message it should receive in this state is a reply with that given correlation.
@kjnilsson gave me a hint that I was wrong on this one. Apparently, the applied notifications in Ra (including that the dlx checkout command was applied) are delivered after Ra effects (which may already include dlx deliveries). So, we have 3 options I suppose:
- Stash any early (pre-registration) dlx deliveries that the dlx worker proc receives in its state, or
- Send the dlx worker checkout command for pre-evaluation to the aux machine (similar to what is done in rabbitmq-server/deps/rabbit/src/rabbit_fifo_client.erl, lines 981 to 982 at 5388ef9). The leader will then only append this Ra command to its log if the command was sent from a local dlx worker, or
- Modify Ra to have an option so that ra:process_command() won't redirect to the leader.
What do you think? Which approach is the simplest and/or safest?
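The first option (stashing) could look roughly like this; all names here (`phase`, `stashed`, `handle_dlx_event/3`, the `dlx_delivery` event shape) are hypothetical placeholders, not the shipped code:

```erlang
%% Sketch of option 1: buffer dlx deliveries that arrive before the
%% checkout reply, then replay them once the worker is registered.
%% Record fields and helper names are illustrative.
handle_cast({queue_event, QRef, {dlx_delivery, _} = Evt},
            #state{phase = registering, stashed = Stashed} = State) ->
    %% Too early: remember the event and handle it after registration.
    {noreply, State#state{stashed = [{QRef, Evt} | Stashed]}};
handle_cast({registered, _Correlation},
            #state{phase = registering, stashed = Stashed} = State0) ->
    State1 = State0#state{phase = registered, stashed = []},
    %% Replay the stashed deliveries in arrival order.
    State = lists:foldl(fun({QRef, Evt}, Acc) ->
                                handle_dlx_event(QRef, Evt, Acc)
                        end, State1, lists:reverse(Stashed)),
    {noreply, State};
```

The cost of this option is extra state and the risk of stashing forever if the registration reply never arrives, which is presumably why the discussion also considers the aux-machine and Ra-option alternatives.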
Before this change, with two leader elections in short succession, we could end up in a situation where no DLX worker is running.

The single_dlx_worker test appeared to be flaky, but it actually triggered a real race condition: a node that is briefly the leader starts a DLX worker, which registers itself as a consumer via ra:process_command. When that node loses leadership a moment later, process_command on the now-follower Ra server redirects the checkout command to the actual leader, where it gets committed to the Raft log. This overwrites the real leader's worker registration, terminating that worker.

The result is that the leader has no DLX worker: the old leader's worker is stopped to allow the new leader's worker to take over, but the new leader's worker exits due to the stale checkout.
Closing in favor of #16203
This commit supersedes #15548.

## What?

Fix the following genuine CI flake:

```
make -C deps/rabbit ct-rabbit_fifo_dlx_integration t=cluster_size_3:single_dlx_worker
```

Sometimes this test case failed with the logs showing the following:

```text
2026-02-24 09:06:19.413770+00:00 [debug] <0.2377.0> queue 'single_dlx_worker_source' in vhost '/': vote granted for term 3 votes 2
2026-02-24 09:06:19.414048+00:00 [debug] <0.2377.0> started rabbit_fifo_dlx_worker <0.2600.0> for queue 'single_dlx_worker_source' in vhost '/'
2026-02-24 09:06:19.414096+00:00 [notice] <0.2377.0> queue 'single_dlx_worker_source' in vhost '/': candidate -> leader in term: 3 machine version: 7, last applied 5
2026-02-24 09:06:19.414388+00:00 [debug] <0.2602.0> queue 'single_dlx_worker_source' in vhost '/': updating leader record to current node rmq-ct-cluster_size_3-1-28000@localhost
2026-02-24 09:06:19.414279+00:00 [info] <0.2377.0> queue 'single_dlx_worker_source' in vhost '/': leader saw request_vote_rpc from {'%2F_single_dlx_worker_source','rmq-ct-cluster_size_3-3-28144@localhost'} for term 4 abdicates term: 3!
2026-02-24 09:06:19.417479+00:00 [notice] <0.2377.0> queue 'single_dlx_worker_source' in vhost '/': leader -> follower in term: 4 machine version: 7, last applied 5
2026-02-24 09:06:19.417533+00:00 [debug] <0.2377.0> queue 'single_dlx_worker_source' in vhost '/': is not new, setting election timeout.
2026-02-24 09:06:19.417740+00:00 [info] <0.2377.0> queue 'single_dlx_worker_source' in vhost '/': declining vote for {'%2F_single_dlx_worker_source','rmq-ct-cluster_size_3-3-28144@localhost'} for term 4, candidate last log {index, term} was: {5,2} last log entry {index, term} is: {{6,3}}
2026-02-24 09:06:19.417824+00:00 [debug] <0.2377.0> queue 'single_dlx_worker_source' in vhost '/': leader call - leader not known. Command will be forwarded once leader is known.
2026-02-24 09:06:19.418190+00:00 [debug] <0.2377.0> queue 'single_dlx_worker_source' in vhost '/' declining pre-vote to {'%2F_single_dlx_worker_source','rmq-ct-cluster_size_3-2-28072@localhost'} for term 3, current term 4
2026-02-24 09:06:19.428043+00:00 [debug] <0.2377.0> queue 'single_dlx_worker_source' in vhost '/': resetting last index to 5 from 6 in term 4
2026-02-24 09:06:19.428157+00:00 [info] <0.2377.0> queue 'single_dlx_worker_source' in vhost '/': detected a new leader {'%2F_single_dlx_worker_source','rmq-ct-cluster_size_3-3-28144@localhost'} in term 4
2026-02-24 09:06:19.428280+00:00 [debug] <0.2377.0> queue 'single_dlx_worker_source' in vhost '/': mem table overwriting detected whilst staging entries, opening new mem table
2026-02-24 09:06:19.436299+00:00 [debug] <0.2377.0> queue 'single_dlx_worker_source' in vhost '/': enabling ra cluster changes in 4, index 6
2026-02-24 09:06:19.436411+00:00 [debug] <0.2377.0> Terminating <31028.1894.0> since <31122.2516.0> becomes active rabbit_fifo_dlx_worker
2026-02-24 09:06:19.437003+00:00 [debug] <0.2377.0> Terminating <31122.2516.0> since <0.2600.0> becomes active rabbit_fifo_dlx_worker
2026-02-24 09:06:19.437107+00:00 [warning] <0.2600.0> Failed to process command {dlx,{checkout,<0.2600.0>,2}} on quorum queue leader {'%2F_single_dlx_worker_source',
2026-02-24 09:06:19.437107+00:00 [warning] <0.2600.0> 'rmq-ct-cluster_size_3-1-28000@localhost'} because actual leader is {'%2F_single_dlx_worker_source',
2026-02-24 09:06:19.437107+00:00 [warning] <0.2600.0> 'rmq-ct-cluster_size_3-3-28144@localhost'}.
```

## How?

In this commit, we use `ra:pipeline_command/4` with a selective receive instead of `ra:process_command/3` for the dlx checkout command. This prevents Ra from automatically redirecting the checkout command to a new leader if a failover happens while the command is being processed.
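A minimal sketch of the mechanism described above: `ra:pipeline_command/4` and the `ra_event` message shapes are Ra's API, while the function name, command payload, reply value, and timeout here are illustrative assumptions, not the actual `rabbit_fifo_dlx_client` code:

```erlang
%% Illustrative sketch: register the dlx consumer via a pipelined
%% command. Unlike ra:process_command/3, a pipelined command sent to a
%% server that has lost leadership comes back as a local rejection
%% rather than being redirected to the new leader.
register_consumer(ServerId) ->
    Correlation = make_ref(),
    Cmd = {dlx, {checkout, self(), 2}},  % payload shape is illustrative
    ok = ra:pipeline_command(ServerId, Cmd, Correlation, normal),
    receive
        {ra_event, _From, {applied, [{Correlation, _Reply}]}} ->
            registered;
        {ra_event, _From, {rejected, {not_leader, _Leader, Correlation}}} ->
            %% The local server is no longer the leader; do NOT retry
            %% against the new leader -- let this worker stop so the
            %% new leader's worker can register instead.
            {error, not_leader}
    after 5000 ->  % timeout value chosen arbitrarily for the sketch
            {error, timeout}
    end.
```

Matching on the unique `Correlation` in the selective receive ensures a stale reply from an earlier registration attempt cannot be mistaken for the current one.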
(cherry picked from commit 7768f99)

(cherry picked from commit 4495252)

# Conflicts:
#	deps/rabbit/src/rabbit_fifo_dlx_client.erl
This PR should address this failure:
https://github.com/rabbitmq/rabbitmq-server/actions/runs/22343794026/job/64653112641?pr=15502