From 7c2f75e1e2428e84881780d4a8a5f3a5af550f5d Mon Sep 17 00:00:00 2001 From: Raymond Li Date: Tue, 12 May 2026 15:30:10 -0400 Subject: [PATCH 1/2] fix(actor): use training_done event under Fast-LLM, not sample-counting heuristic MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The actor decides when training is done via `is_trainer_finished()`. On the legacy HF/DeepSpeed trainer path, the check is `samples_processed >= samples_target`, where `samples_target = max_train_steps * train_batch_size * gradient_accumulation_passes`. This is correct for that path because `gradient_accumulation_passes` counts the microbatches per optimizer step, so `train_batch_size * gradient_accumulation_passes` is the number of samples per step. Under Fast-LLM (`use_fast_llm=True`), the trainer uses its own `schedule.docs_per_step` knob instead, and `gradient_accumulation_passes` becomes vestigial — it is not propagated to the Fast-LLM trainer. Worse, Fast-LLM's `_prefetch_to_doc_target` always overshoots `docs_per_step` by a few documents per step (the loop runs `while total_docs < target` and stops just after crossing it). The accumulated overshoot, combined with the unrelated `gradient_accumulation_passes` value (which auto-adjusts to be divisible by the trainer-rank count, e.g. 1024 -> 1026 for finetune_fraction=6), leaves `samples_target` hundreds of documents short of what the trainer actually consumes over 400 steps. Concrete reproduction on a single 8-GPU node with the math gspo recipe (`max_train_steps=400`, `gradient_accumulation_passes=1026`, `docs_per_step=1024`, `train_iters=400`): the actor declares completion at samples_processed=410,400 while the Fast-LLM trainer is only at step ~393/400, and stops feeding redis; the trainer's `RedisStreamingDataset` then times out after 600s with `No document received`, vLLM logs `[FastLLM] training_finished was not received; forcing stop`, and the job dies with `EngineDeadError`. 
Fast-LLM already publishes an explicit `{"type": "training_finished"}` event to the `fast_llm_events` redis stream at the natural end of training (`fast_llm/engine/training/streaming.py:train_end`), and PipelineRL already listens for it and sets `TrainerState.training_done = True` (`pipelinerl/state.py:112-115`). This commit makes `is_trainer_finished()` — and its inline mirror in `ActorLoop.run()` at line ~614 — consult that flag when `use_fast_llm=True`, while preserving the sample-counting heuristic for the HF/DeepSpeed path. No race, no overshoot accounting, no implicit dependence on `gradient_accumulation_passes`. Co-Authored-By: Claude Opus 4.7 (1M context) --- pipelinerl/actor.py | 30 +++++++++++++++++++++++++++--- 1 file changed, 27 insertions(+), 3 deletions(-) diff --git a/pipelinerl/actor.py b/pipelinerl/actor.py index 1a41020a..d077b641 100644 --- a/pipelinerl/actor.py +++ b/pipelinerl/actor.py @@ -168,6 +168,21 @@ async def schedule_rollouts( retry_max_delay_s = float(getattr(cfg.actor, "rollout_retry_max_delay_s", 30.0)) def is_trainer_finished() -> bool: + # For the Fast-LLM trainer path, the explicit `training_finished` event the + # trainer publishes to the `fast_llm_events` redis stream (tracked here as + # `trainer_state.training_done`) is the canonical completion signal. + # The legacy `max_train_steps × train_batch_size × gradient_accumulation_passes` + # formula below is for the HF/DeepSpeed path, where `gradient_accumulation_passes` + # counts microbatches per optimizer step. Under Fast-LLM, the trainer uses + # `schedule.docs_per_step` instead, and `_prefetch_to_doc_target` always + # overshoots that target by a few documents per step (the loop runs while + # `total_docs < target` and stops just after crossing it). 
The sample-counting + # heuristic therefore fires several optimizer steps early — e.g., on a single + # 8-GPU node with max_train_steps=400, the actor declares completion at + # samples_processed=410,400 while the trainer is only at step ~393/400, stops + # feeding redis, and the trainer eventually times out waiting for documents. + if cfg.use_fast_llm: + return trainer_state.training_done return ( trainer_state.samples_processed is not None and trainer_state.samples_processed >= samples_target @@ -609,9 +624,18 @@ def _run(self, dataset: list[tuple[str, dict]]): # the user function must do next(...) to run each iteration yield - final_steps = calculate_train_steps(self.cfg.finetune, self.cfg.finetune.interrupt_train_steps) - samples_target = final_steps * self.cfg.finetune.train_batch_size * self.cfg.finetune.gradient_accumulation_passes - if self.trainer_state.samples_processed is not None and self.trainer_state.samples_processed >= samples_target: + # Mirror `is_trainer_finished` (above): use the explicit training_done + # event under Fast-LLM; fall back to sample counting for HF/DeepSpeed. + if self.cfg.use_fast_llm: + trainer_finished = self.trainer_state.training_done + else: + final_steps = calculate_train_steps(self.cfg.finetune, self.cfg.finetune.interrupt_train_steps) + samples_target = final_steps * self.cfg.finetune.train_batch_size * self.cfg.finetune.gradient_accumulation_passes + trainer_finished = ( + self.trainer_state.samples_processed is not None + and self.trainer_state.samples_processed >= samples_target + ) + if trainer_finished: logger.info("Trainer signalled completion; stopping actor loop") break From 9ab0e53b78e9ef857c455de8fc0bc1c90c1cc2a7 Mon Sep 17 00:00:00 2001 From: Raymond Li Date: Thu, 14 May 2026 10:42:28 -0400 Subject: [PATCH 2/2] review: trim verbose comment on is_trainer_finished The 14-line block duplicated the PR description. 
Keep the non-obvious why (Fast-LLM ignores gradient_accumulation_passes and overshoots docs_per_step); the symptom numbers live in the PR description. Co-Authored-By: Claude Opus 4.7 (1M context) --- pipelinerl/actor.py | 16 +++------------- 1 file changed, 3 insertions(+), 13 deletions(-) diff --git a/pipelinerl/actor.py b/pipelinerl/actor.py index d077b641..1fec56f5 100644 --- a/pipelinerl/actor.py +++ b/pipelinerl/actor.py @@ -168,19 +168,9 @@ async def schedule_rollouts( retry_max_delay_s = float(getattr(cfg.actor, "rollout_retry_max_delay_s", 30.0)) def is_trainer_finished() -> bool: - # For the Fast-LLM trainer path, the explicit `training_finished` event the - # trainer publishes to the `fast_llm_events` redis stream (tracked here as - # `trainer_state.training_done`) is the canonical completion signal. - # The legacy `max_train_steps × train_batch_size × gradient_accumulation_passes` - # formula below is for the HF/DeepSpeed path, where `gradient_accumulation_passes` - # counts microbatches per optimizer step. Under Fast-LLM, the trainer uses - # `schedule.docs_per_step` instead, and `_prefetch_to_doc_target` always - # overshoots that target by a few documents per step (the loop runs while - # `total_docs < target` and stops just after crossing it). The sample-counting - # heuristic therefore fires several optimizer steps early — e.g., on a single - # 8-GPU node with max_train_steps=400, the actor declares completion at - # samples_processed=410,400 while the trainer is only at step ~393/400, stops - # feeding redis, and the trainer eventually times out waiting for documents. + # Fast-LLM ignores `gradient_accumulation_passes` and overshoots `docs_per_step` + # by a few docs per step, so the sample-counting formula below fires several + # optimizer steps early. Use the explicit `training_finished` event instead. if cfg.use_fast_llm: return trainer_state.training_done return (