Skip to content

feat(worker): ingest_callback contract on Backend (INT-386)#415

Draft
ldrozdz93 wants to merge 6 commits into
developfrom
int-386-worker-ingest-callback
Draft

feat(worker): ingest_callback contract on Backend (INT-386)#415
ldrozdz93 wants to merge 6 commits into
developfrom
int-386-worker-ingest-callback

Conversation

@ldrozdz93

@ldrozdz93 ldrozdz93 commented May 19, 2026

Copy link
Copy Markdown
Contributor

Context

The ingest_callback primitive added here is required for the
API-triggered sync HTTP endpoint
in controller-integrations to work — that endpoint pushes entities
outside the cron run() cycle and needs a callback to reach the
worker's Diode pipeline.

Summary

Extend worker.backend.Backend so that:

  • __init__ accepts an ingest_callback keyword argument (optional,
    keyword-only, plus open **kwargs forward-compat door).
  • A new worker.exceptions module exposes IngestError,
    IngestUnavailable, IngestRejected — raised by the ingest
    callback on pipeline failures.
  • PolicyRunner.setup builds a per-policy ingest-callback closure and
    passes it to the backend constructor. The closure chunks+ingests
    entities mirroring run(), records each off-schedule emission as a
    pseudo-run in RunStore, and translates response/transport errors
    into the new exception subclasses.

PolicyRunner.run() is unchanged and remains single-shot per cron tick.

This unblocks API-triggered sync (INT-312, INT-386) and any future
integration pattern that needs to emit entities outside of the
single-shot run() cycle.

Design

Design doc:
worker-backend-ingest-callback.md
in netboxlabs/controller-integrations.

Compat

  • Backwards-compatible. Older integrations (zero-arg __init__ or
    custom __init__ with **kwargs) continue working unchanged
    thanks to _construct_backend's inspect.signature introspection.
  • Two integrations in controller-integrations define
    __init__(self) -> None and need a mechanical **kwargs edit
    before this minor reaches production: nbl-msft-dhcp,
    nbl-proxmox-ve. Follow-up ticket in that repo tracks the bump
    — does NOT block this PR (tests here mock the backend class).

Test plan

  • worker/tests/test_exceptions.py — hierarchy (IngestError /
    IngestUnavailable / IngestRejected) catchable as base; messages
    preserved.
  • worker/tests/test_backend.pyingest_callback stored on the
    instance; zero-arg construction still works; unknown kwargs absorbed.
  • worker/tests/policy/test_runner.pysetup passes
    ingest_callback to backend; callback happy path; error= path;
    TypeError when both or neither supplied; transport errors →
    IngestUnavailable; response errors → IngestRejected; large
    payloads chunked; the existing run() path remains unaffected.

Locally: cd worker && pip install -e ".[test]" && pytest → 109
passed, 0 failures.

Judgement calls

  • _build_ingest_callback lives on PolicyRunner (not a standalone
    helper module) because the closure must capture self.run_store,
    self.metadata, and the diode client. Splitting it out adds
    plumbing without testability benefit — tests extract the closure
    via mock_backend_class.call_args.kwargs["ingest_callback"] and
    exercise it directly.
  • Closure reads self._diode_client lazily so it can be built BEFORE
    the diode client (which needs metadata from backend.setup(),
    which needs the constructed backend). Solves the chicken-and-egg
    between "construct backend with callback" and "callback needs
    client".
  • Error-mapping policy: response.errors non-empty → IngestRejected;
    any other transport exception → IngestUnavailable. Coarse but
    useful first cut; refinements remain additive.
  • run_id kwarg from the integration is NOT honoured in v1 — the
    closure always creates a fresh pseudo-run. Documented as future
    open-kwarg evolution in the design doc.

Refs INT-386, INT-312.

ldrozdz93 and others added 2 commits May 19, 2026 14:51
Introduces IngestError / IngestUnavailable / IngestRejected to be raised
by the worker-supplied ingest callback when pipeline writes fail. The
hierarchy is the contract used by integrations to differentiate transient
vs. permanent failures.

Refs INT-386, INT-312
See: https://github.com/netboxlabs/controller-integrations/blob/develop/docs/plans/worker-backend-ingest-callback.md

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Extends Backend.__init__ with keyword-only ingest_callback, policy, plus
open **kwargs (forward-compat door). PolicyRunner.setup now builds a
per-policy ingest callback closure and passes both to backend
construction. The closure chunks+ingests entities (mirroring run()),
tracks each off-schedule emission as a pseudo-run in RunStore, and
translates response/transport errors into the new IngestError subclasses.

run() is unchanged and remains single-shot per cron tick.

Refs INT-386, INT-312
See:
https://github.com/netboxlabs/controller-integrations/blob/develop/docs/plans/worker-backend-ingest-callback.md
https://github.com/netboxlabs/controller-integrations/blob/develop/docs/plans/policy-distribution-refinement.md

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@github-actions

github-actions Bot commented May 19, 2026

Copy link
Copy Markdown

Coverage

Coverage Report
FileStmtsMissCoverMissing
worker
   entity_metadata.py13192%23
   main.py63494%206–207, 217, 223
   metrics.py52198%102
   package_finder.py891188%24–25, 48, 96–97, 132, 145–146, 159, 183–184
   server.py81495%34–36, 167, 170
   version.py7186%14
worker/policy
   manager.py49394%35–36, 52
   runner.py145199%168
TOTAL6262696% 

Tests Skipped Failures Errors Time
118 0 💤 0 ❌ 0 🔥 3.443s ⏱️

Copilot AI left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR extends the worker backend interface to support “off-schedule” entity ingestion via a construction-time ingest_callback, and introduces a dedicated exception hierarchy for integrations to signal ingest failures.

Changes:

  • Add ingest_callback and policy keyword-only parameters (plus forward-compatible **kwargs) to worker.backend.Backend.__init__.
  • Introduce worker.exceptions with IngestError, IngestUnavailable, and IngestRejected for callback-facing error signaling.
  • Update PolicyRunner.setup() to build and pass an ingest-callback closure that chunks/ingests entities, records pseudo-runs in RunStore, and maps transport/response failures into the new exceptions.

Reviewed changes

Copilot reviewed 6 out of 6 changed files in this pull request and generated 4 comments.

Show a summary per file
File Description
worker/worker/policy/runner.py Builds and passes an ingest-callback closure during setup; adds ingest callback implementation and error mapping.
worker/worker/exceptions.py Adds ingest-callback exception hierarchy for integrations.
worker/worker/backend.py Extends Backend constructor to accept ingest_callback, policy, and absorb future kwargs.
worker/tests/test_exceptions.py Adds unit tests for exception inheritance and message preservation.
worker/tests/test_backend.py Adds unit tests for new backend constructor behavior and backward compatibility.
worker/tests/policy/test_runner.py Adds tests verifying setup passes kwargs and callback behavior (happy path, error path, chunking, error translation).

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread worker/worker/policy/runner.py Outdated
Comment thread worker/worker/policy/runner.py Outdated
Comment thread worker/worker/policy/runner.py Outdated
Comment thread worker/worker/policy/runner.py Outdated
- Bind IngestError in `except ... as exc` instead of sys.exc_info() and
  drop the now-unused `import sys`.
- Log unexpected exceptions via `logger.exception` before translating
  them to IngestUnavailable, so a worker-side programming bug isn't
  silently masked as a transient failure.
- Annotate the closure signature (`error: Exception | None = None`,
  `-> None`) and rename `**kw` → `**kwargs` to match the docstring and
  Backend.__init__ contract. Comment the forward-compat door.
- Guard against an integration calling the callback before the diode
  client is initialised (e.g. from inside `Backend.setup()`): record a
  FAILED pseudo-run and raise IngestUnavailable with a clear message
  instead of letting an AttributeError surface as an unexpected
  exception.
- Extract `_send_entities` so `_build_ingest_callback` stays under the
  C901 complexity limit.
- Add a unit test for the new guard.

Refs INT-386, INT-312

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@leoparente

Copy link
Copy Markdown
Contributor

@codex review

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 7e962a3264

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".

Comment thread worker/worker/policy/runner.py
Comment thread worker/worker/policy/runner.py Outdated
@ldrozdz93 ldrozdz93 marked this pull request as draft May 19, 2026 13:23
@ldrozdz93

Copy link
Copy Markdown
Contributor Author

@leoparente @jajeffries sorry for the noise - I undrafted by mistake. Will let you know once the PR is proper quality

ldrozdz93 and others added 3 commits May 20, 2026 10:06
…k helper

Addresses 6 reviewer comments on PR #415:

- Init-order race (Copilot/Codex/jajeffries): the closure dereferences
  `self.run_store`, `self.metadata`, `self._diode_client` — none of
  which are attached until after `backend.setup()` returns. Replaces
  the partial `_diode_client is None` guard with a single
  `self._callback_ready` flag, flipped True at the end of `setup()`
  and back to False at the start of `stop()`. Single source of truth
  for "callback is safe to invoke".
- Pre-try work (Copilot): `list(entities)` and
  `apply_run_id_to_entities` now run INSIDE the try, so failures
  there reliably mark the pseudo-run FAILED instead of leaving it
  RUNNING. `entities_list = []` is pre-bound so `len()` is defined in
  the except handlers when `list(entities)` itself fails.
- Chunk-threshold constant (Copilot): extracts
  `MAX_INGEST_MESSAGE_BYTES = 3 * 1024 * 1024` at module scope;
  replaces both float literals in `_send_entities` and `run()`.
- Helper consistency (leoparente): `run()` now calls `_send_entities`
  instead of inlining the chunking branch. Helper returns the chunk
  count for the existing log line. Side effect: `run()` now raises
  `IngestRejected` (not `RuntimeError`) on response errors; outer
  `except Exception` catches it unchanged.

Backend.__init__ docstring documents the "do not invoke from
__init__/setup()" contract.

Adds 4 new tests, renames one existing test to match the new guard:
- test_ingest_callback_raises_when_not_ready (renamed)
- test_ingest_callback_records_failure_on_apply_run_id_error
- test_ingest_callback_records_failure_on_iterable_error
- test_setup_sets_callback_ready_flag
- test_stop_clears_callback_ready_flag

ruff clean, pytest 114 passed.

Refs INT-386, INT-312

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…spect on construction

- ADR-0008: Drop policy= kwarg from Backend.__init__ (no consumer in v1).
- ADR-0009: Add **kwargs forward-compat door to Backend.run.
- ADR-0007: PolicyRunner uses inspect.signature to detect whether the
  backend class accepts ingest_callback. Legacy backends with __init__(self)
  continue to construct zero-arg with no coordinated upgrade.

49 tests pass including 3 new parametrized introspection cases.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Represents the contract bump shipped in this branch (ingest_callback
keyword on Backend.__init__, **kwargs door on Backend.run, exceptions
module, introspection in PolicyRunner.setup). Downstream consumers like
nbl-infoblox-nios floor >=1.3.0, so the pre-bump 1.0.0 placeholder was
causing pip resolver conflicts when both packages get installed as
editable via INSTALL_DRIVERS_PATH.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

Copilot AI left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 7 out of 7 changed files in this pull request and generated 1 comment.

Comment thread worker/worker/policy/runner.py
@ldrozdz93 ldrozdz93 changed the title feat(worker): ingest_callback + construction-time policy on Backend (INT-386) feat(worker): ingest_callback contract on Backend (INT-386) May 25, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants