Fix separated external source prefetch drain#6397

Open
JanuszL wants to merge 6 commits into NVIDIA:main from JanuszL:fix-separated-external-source-prefetch

@JanuszL JanuszL commented Jun 16, 2026

Contributor

Category:

Bug fix

Description:

This PR fixes a hang at the end of an epoch when a Python-managed
external_source is used with separated CPU/GPU prefetch queues.

The newer prefetch path fed all inputs before running the backend. For
separated execution this could leave CPU-prefetched external source batches
without scheduled Mixed/GPU work once the source reached end of epoch. The
consumer then waited indefinitely for output indexes in the separated queue
policy.

The fix keeps prefetching interleaved with backend runs, so input feeding and
backend scheduling stay aligned at epoch boundaries.
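
A toy model may make the difference concrete. The sketch below is not DALI code: it only counts how many batches were fed and how many backend runs were scheduled, with a plain Python iterator standing in for the external source. The function names `bulk_feed_then_prefetch` and `interleaved_prefetch` are hypothetical, and treating the bulk path as one fixed-size schedule is a simplifying assumption.

```python
from typing import Iterator, Tuple


def bulk_feed_then_prefetch(source: Iterator[int], depth: int) -> Tuple[int, int]:
    """Toy model of the removed path: feed first, then schedule in bulk.

    Returns (batches_fed, runs_scheduled).
    """
    fed = 0
    for _ in range(depth):
        try:
            next(source)  # stand-in for feeding one external source batch
        except StopIteration:
            break  # end of epoch reached while feeding
        fed += 1
    scheduled = depth  # stand-in for one bulk prefetch call sized by queue depth
    return fed, scheduled


def interleaved_prefetch(source: Iterator[int], depth: int) -> Tuple[int, int]:
    """Toy model of the interleaved path: one feed, then one run, per iteration."""
    fed = 0
    scheduled = 0
    for _ in range(depth):
        try:
            next(source)
        except StopIteration:
            break  # nothing was fed, so nothing more is scheduled
        fed += 1
        scheduled += 1  # stand-in for a single backend run
    return fed, scheduled


# A source with only 2 batches left and a prefetch depth of 3:
print(bulk_feed_then_prefetch(iter(range(2)), 3))  # (2, 3): counts diverge
print(interleaved_prefetch(iter(range(2)), 3))     # (2, 2): counts match
```

In this model the interleaved loop cannot let the two counters drift apart, because the run is only scheduled after a successful feed. The real executor tracks separate CPU and Mixed/GPU queues, which the model does not attempt to represent.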

Related to #5199

Additional information:

Affected modules and functionalities:

  • nvidia.dali.Pipeline prefetch scheduling in the legacy executor path.
  • Python regression coverage for external_source(batch=True, cycle="raise")
    with mixed image decoding and separated prefetch queues.

Key points relevant for the review:

Review whether routing prefetch through the interleaved path is acceptable for
legacy separated execution. This restores the behavior that avoids a CPU-only
tail when input callbacks reach end of epoch.

Tests:

  • Existing tests apply
  • New tests added
    • Python tests
      • test_pipeline.py:test_separated_queue_external_source_drains_prefetched_batches
    • GTests
    • Benchmark
    • Other
  • N/A

Checklist

Documentation

  • Existing documentation applies
  • Documentation updated
    • Docstring
    • Doxygen
    • RST
    • Jupyter
    • Other
  • N/A

DALI team only

Requirements

  • Implements new requirements
  • Affects existing requirements
  • N/A

REQ IDs: N/A

JIRA TASK: N/A

@JanuszL JanuszL force-pushed the fix-separated-external-source-prefetch branch from a17716a to d972238 June 16, 2026 15:33
@JanuszL

JanuszL commented Jun 16, 2026

Contributor Author

!build

@JanuszL JanuszL requested a review from mzient June 16, 2026 15:37
@greptile-apps

greptile-apps Bot commented Jun 16, 2026

Contributor

Greptile Summary

This PR fixes a hang at end-of-epoch for Python external_source pipelines with separated CPU/GPU prefetch queues. The root cause was that the old separated path fed all inputs first and then called _pipe.Prefetch() — if the source raised StopIteration before all batches were fed, some CPU-prefetched batches had no corresponding Mixed/GPU work scheduled, leaving the separated queue consumer waiting forever.

  • Python (pipeline.py): _prefetch() now unconditionally calls _legacy_interleaved_prefetch() for all execution modes. For separated execution the loop runs max(cpu_size, gpu_size) iterations, interleaving one input feed with one _pipe.Run() call so feeding and backend scheduling always stay in lockstep. The _prefetch_inputs() helper and the is_prefetch parameter on _run_input_callbacks() are removed.
  • C++ (async_separated_pipelined_executor.cc): Prefetch() is updated to run gpu_size full pipeline passes plus max(0, cpu_size − gpu_size) CPU-only passes (total = max(cpu_size, gpu_size)), and InputFeedCount() is updated to return max(cpu_size, gpu_size) to match — keeping the C++ method consistent even though Python no longer calls Prefetch() directly.
  • Test: New @params-driven test covers (2,2), (3,2), and (2,3) queue depth combinations, exercising both cpu_size > gpu_size and cpu_size < gpu_size, and verifies non-zero decoded RGB images plus StopIteration propagation at epoch end.
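
The queue-depth arithmetic in the bullets above can be checked with a few lines of Python. This is only a sketch of the counts as the summary states them; `prefetch_plan` is a hypothetical helper, not a function in DALI.

```python
from typing import Dict


def prefetch_plan(cpu_size: int, gpu_size: int) -> Dict[str, int]:
    """Split a separated prefetch into full passes and CPU-only passes."""
    full = gpu_size                         # passes that run every stage
    cpu_only = max(0, cpu_size - gpu_size)  # extra passes that fill the CPU queue
    return {"full": full, "cpu_only": cpu_only, "total": full + cpu_only}


# The three queue shapes covered by the new test.
for cpu_size, gpu_size in [(2, 2), (3, 2), (2, 3)]:
    plan = prefetch_plan(cpu_size, gpu_size)
    # The total always equals max(cpu_size, gpu_size), which is the value
    # the summary says InputFeedCount() now returns.
    assert plan["total"] == max(cpu_size, gpu_size)
    print((cpu_size, gpu_size), plan)
```

For (3, 2) this gives two full passes and one CPU-only pass; for (2, 3) it gives three full passes and no CPU-only pass.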

Confidence Score: 5/5

Safe to merge; the interleaved prefetch path restores correct epoch-boundary behaviour for separated queues without affecting the non-separated path.

The fix is narrow and well-reasoned: removing the bulk-feed-then-Prefetch() pattern for separated execution and replacing it with the existing interleaved loop eliminates the race between input feeding and backend scheduling at epoch end. The C++ Prefetch() and InputFeedCount() changes for the async executor are internally consistent. Non-separated pipelines are unaffected (same loop count as before). The only open question is whether the non-async SeparatedPipelinedExecutor base class needs a matching InputFeedCount update, which appears pre-existing and outside the Python-facing path.

dali/pipeline/executor/async_separated_pipelined_executor.cc — confirm whether SeparatedPipelinedExecutor (non-async) also needs InputFeedCount updated.

Important Files Changed

| Filename | Overview |
| --- | --- |
| dali/pipeline/executor/async_separated_pipelined_executor.cc | Prefetch() now runs gpu_size full pipelines + max(0, cpu_size-gpu_size) CPU-only passes, and InputFeedCount returns max(cpu_size, gpu_size), which is internally consistent; SeparatedPipelinedExecutor base class InputFeedCount (cpu_size+gpu_size) is not updated, a pre-existing divergence. |
| dali/python/nvidia/dali/pipeline.py | _prefetch() now unconditionally calls _legacy_interleaved_prefetch() for both separated and non-separated paths; the old _prefetch_inputs()/_run_input_callbacks(is_prefetch) machinery is cleanly removed; _run_input_callbacks simplified to always feed 1 batch; loop count in _legacy_interleaved_prefetch widened to max(cpu,gpu) for separated execution. |
| dali/test/python/test_pipeline.py | New parametric test covers (2,2), (3,2), (2,3) queue depth combinations and verifies decoded RGB images are non-zero; checks StopIteration after epoch ends; content validation is present (np.any check) but pixel-value assertions remain shallow. |
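
The drain contract described for the test (every fed batch comes out with data, then StopIteration is raised) can be pictured without DALI. The sketch below uses a hypothetical `FakePipeline` whose `run()` hands back prepared batches; it is not the test from this PR and does not exercise the executor.

```python
from typing import Callable, Iterable, List, Sequence


class FakePipeline:
    """Stand-in for a pipeline: run() returns batches, then raises StopIteration."""

    def __init__(self, batches: Iterable[Sequence[bytes]]) -> None:
        self._batches = iter(batches)

    def run(self) -> Sequence[bytes]:
        # next() raises StopIteration once the epoch is exhausted.
        return next(self._batches)


def drain_epoch(run: Callable[[], Sequence[bytes]]) -> int:
    """Call run() until StopIteration and return the number of batches seen."""
    count = 0
    while True:
        try:
            batch = run()
        except StopIteration:
            return count
        # Stand-in for the non-zero content check on decoded images.
        assert all(len(sample) > 0 for sample in batch)
        count += 1


batches: List[Sequence[bytes]] = [[b"a", b"b"], [b"c", b"d"], [b"e", b"f"]]
print(drain_epoch(FakePipeline(batches).run))  # 3
```

A real regression test would also need a timeout, since the bug described here shows up as a blocked call rather than a wrong count.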

Sequence Diagram

%%{init: {'theme': 'neutral'}}%%
sequenceDiagram
    participant Py as Python _prefetch()
    participant LIP as _legacy_interleaved_prefetch
    participant IC as _iter_setup / _run_input_callbacks
    participant ES as ExternalSource (Python)
    participant Run as _pipe.Run() [C++ async]

    Note over Py,Run: New unified path (separated & non-separated)
    Py->>LIP: always (was separated-only before)
    loop max(cpu_size, gpu_size) times [separated] or cpu_size [non-sep]
        LIP->>IC: _iter_setup()
        IC->>ES: feed 1 batch
        ES-->>IC: batch fed (or StopIteration → break)
        LIP->>Run: _pipe.Run()  [RunCPU+RunMixed+RunGPU async]
    end

    Note over Py,Run: Old separated path (removed)
    Note over Py,Run: _prefetch_inputs() fed ALL batches first,<br/>then _pipe.Prefetch() — caused hang when<br/>source reached EoE before GPU work was scheduled

Reviews (22): Last reviewed commit: "Use interleaved separated Python prefetc..."

Comment thread dali/test/python/test_pipeline.py Outdated
@dali-automaton

Collaborator

CI MESSAGE: [55012969]: BUILD STARTED

@dali-automaton

Collaborator

CI MESSAGE: [55012969]: BUILD PASSED

@JanuszL JanuszL force-pushed the fix-separated-external-source-prefetch branch from d972238 to 82c4432 June 17, 2026 08:37
@JanuszL

JanuszL commented Jun 17, 2026

Contributor Author

@greptile review

@JanuszL JanuszL force-pushed the fix-separated-external-source-prefetch branch from 82c4432 to fe2b252 June 17, 2026 08:53
@JanuszL

JanuszL commented Jun 17, 2026

Contributor Author

@greptile review

@JanuszL JanuszL force-pushed the fix-separated-external-source-prefetch branch from fe2b252 to 3c8286e June 17, 2026 09:12
@JanuszL

JanuszL commented Jun 17, 2026

Contributor Author

@greptile review

1 similar comment
@JanuszL

JanuszL commented Jun 17, 2026

Contributor Author

@greptile review

Keep pipeline prefetching interleaved with backend runs so separated execution does not leave CPU-prefetched external source batches without scheduled Mixed/GPU work at end of epoch. Prime separated execution for the maximum of CPU and GPU queue depths to avoid underfilling asymmetric queue configurations.

Add a regression that drains a batch external source through mixed image decoding with symmetric and asymmetric separated CPU/GPU prefetch queues.

Signed-off-by: Janusz Lisiecki <jlisiecki@nvidia.com>
@JanuszL JanuszL force-pushed the fix-separated-external-source-prefetch branch from 3c8286e to 1276420 June 17, 2026 09:20
@JanuszL

JanuszL commented Jun 17, 2026

Contributor Author

@greptile review

1 similar comment
@JanuszL

JanuszL commented Jun 17, 2026

Contributor Author

@greptile review

Signed-off-by: Janusz Lisiecki <jlisiecki@nvidia.com>
@JanuszL

JanuszL commented Jun 17, 2026

Contributor Author

@greptile review

Signed-off-by: Janusz Lisiecki <jlisiecki@nvidia.com>
@JanuszL

JanuszL commented Jun 17, 2026

Contributor Author

@greptile review

Signed-off-by: Janusz Lisiecki <jlisiecki@nvidia.com>
@JanuszL

JanuszL commented Jun 17, 2026

Contributor Author

!build

@dali-automaton

Collaborator

CI MESSAGE: [55097813]: BUILD STARTED

Comment thread dali/python/nvidia/dali/pipeline.py
@dali-automaton

Collaborator

CI MESSAGE: [55097813]: BUILD FAILED

@JanuszL

JanuszL commented Jun 17, 2026

Contributor Author

@greptile review

Make async separated executor prefetch and InputFeedCount use the same maximum queue-depth contract. Keep the Python separated prefetch path for drainable queue shapes, but use interleaved prefetch when the CPU queue is longer than the GPU queue so end-of-epoch Python sources do not leave CPU-only work without scheduled Mixed/GPU stages.

Signed-off-by: Janusz Lisiecki <jlisiecki@nvidia.com>
@JanuszL JanuszL force-pushed the fix-separated-external-source-prefetch branch from a0755d2 to 6fa12d5 June 17, 2026 13:26
@JanuszL

JanuszL commented Jun 17, 2026

Contributor Author

@greptile review

@JanuszL

JanuszL commented Jun 17, 2026

Contributor Author

!build

@dali-automaton

Collaborator

CI MESSAGE: [55110109]: BUILD STARTED

Comment thread dali/python/nvidia/dali/pipeline.py Outdated
Route separated Python prefetch through the interleaved path for all queue shapes. The bulk _prefetch_inputs path could call _run_input_callbacks with a stale argument and underfeed the backend Prefetch schedule for Python external sources, causing either a TypeError or a hang at epoch end.

Signed-off-by: Janusz Lisiecki <jlisiecki@nvidia.com>
@JanuszL

JanuszL commented Jun 17, 2026

Contributor Author

@greptile review

1 similar comment
@JanuszL

JanuszL commented Jun 17, 2026

Contributor Author

@greptile review

@JanuszL

JanuszL commented Jun 17, 2026

Contributor Author

!build

@dali-automaton

Collaborator

CI MESSAGE: [55116811]: BUILD STARTED

@dali-automaton

Collaborator

CI MESSAGE: [55110109]: BUILD FAILED

@dali-automaton

Collaborator

CI MESSAGE: [55116811]: BUILD FAILED


4 participants