Skip to content

feat(worker): ingest_callback contract for API-triggered sync (INT-312)#432

Draft
ldrozdz93 wants to merge 12 commits into
developfrom
int-312/worker-ingest-callback
Draft

feat(worker): ingest_callback contract for API-triggered sync (INT-312)#432
ldrozdz93 wants to merge 12 commits into
developfrom
int-312/worker-ingest-callback

Conversation

@ldrozdz93
Copy link
Copy Markdown
Contributor

@ldrozdz93 ldrozdz93 commented Jun 1, 2026

Summary

Worker-side support for the API-triggered-sync contract (INT-312). Backends expose their
identity via a describe() classmethod, and the PolicyRunner constructs each backend
with an ingest_callback so an integration can ship entities outside the scheduled
run() cycle (e.g. from an HTTP-triggered sync). A worker.exceptions hierarchy
classifies ingest failures.

Changes (relative to develop)

  • Backend.describe() — new classmethod returning the backend's Metadata without
    constructing an instance. Backends that only implement the legacy instance setup()
    keep working: the worker reads their metadata from a throwaway instance.
  • Backend.setup() is formally deprecated (fallback removal planned for v2.0), with a
    signal per audience: PEP 702 @deprecated for IDEs/type checkers, a class-definition
    DeprecationWarning when a subclass overrides setup() without describe(), and an
    operator-facing log line when the worker uses the fallback.
  • Backend.__init__ accepts ingest_callback and a **kwargs forward-compat door.
  • worker.exceptions: IngestError base, with IngestRejected (permanent — do not
    retry) and IngestUnavailable (transient — retry-friendly).
  • PolicyRunner:
    • reads the backend's metadata first (describe(), or the legacy setup() fallback),
      builds the Diode client and the ingest callback from it, then constructs the backend
      once with the callback — every dependency the callback uses is ready before the
      instance exists, so the callback is valid from construction;
    • routes both the scheduled run() and the ingest callback through a shared
      _execute_run(client, produce_entities, *, source) helper that creates a run,
      ingests via the chunking path, and records COMPLETED/FAILED. The entity producer
      is invoked lazily inside the run's try-block, so a crashing backend is still recorded
      as a FAILED run;
    • the callback validates exactly-one-of entities/error and maps unexpected
      exceptions to the base IngestError.
  • worker/pyproject.toml source project.version raised from 1.0.0 to
    1.99.0.dev0. Editable / pip install -e consumers (e.g. controller-integrations'
    e2e harness running with --use orb-agent --local-orb-worker) read this value
    directly and fail with ResolutionImpossible against downstream pins like
    netboxlabs-orb-worker>=1.3.0. The release workflow stamps the real version on top
    of project.version before python -m build, so the published wheel is unaffected.

Consumer

The downstream @with_trigger_api decorator (netboxlabs/controller-integrations,
trigger-api-util) calls self.ingest_callback(entities=...) from an HTTP-triggered run.

ldrozdz93 and others added 6 commits May 19, 2026 14:51
Introduces IngestError / IngestUnavailable / IngestRejected to be raised
by the worker-supplied ingest callback when pipeline writes fail. The
hierarchy is the contract used by integrations to differentiate transient
vs. permanent failures.

Refs INT-386, INT-312
See: https://github.com/netboxlabs/controller-integrations/blob/develop/docs/plans/worker-backend-ingest-callback.md

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Extends Backend.__init__ with keyword-only ingest_callback, policy, plus
open **kwargs (forward-compat door). PolicyRunner.setup now builds a
per-policy ingest callback closure and passes both to backend
construction. The closure chunks+ingests entities (mirroring run()),
tracks each off-schedule emission as a pseudo-run in RunStore, and
translates response/transport errors into the new IngestError subclasses.

run() is unchanged and remains single-shot per cron tick.

Refs INT-386, INT-312
See:
https://github.com/netboxlabs/controller-integrations/blob/develop/docs/plans/worker-backend-ingest-callback.md
https://github.com/netboxlabs/controller-integrations/blob/develop/docs/plans/policy-distribution-refinement.md

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
- Bind IngestError in `except ... as exc` instead of sys.exc_info() and
  drop the now-unused `import sys`.
- Log unexpected exceptions via `logger.exception` before translating
  them to IngestUnavailable, so a worker-side programming bug isn't
  silently masked as a transient failure.
- Annotate the closure signature (`error: Exception | None = None`,
  `-> None`) and rename `**kw` → `**kwargs` to match the docstring and
  Backend.__init__ contract. Comment the forward-compat door.
- Guard against an integration calling the callback before the diode
  client is initialised (e.g. from inside `Backend.setup()`): record a
  FAILED pseudo-run and raise IngestUnavailable with a clear message
  instead of letting an AttributeError surface as an unexpected
  exception.
- Extract `_send_entities` so `_build_ingest_callback` stays under the
  C901 complexity limit.
- Add a unit test for the new guard.

Refs INT-386, INT-312

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…k helper

Addresses 6 reviewer comments on PR #415:

- Init-order race (Copilot/Codex/jajeffries): the closure dereferences
  `self.run_store`, `self.metadata`, `self._diode_client` — none of
  which are attached until after `backend.setup()` returns. Replaces
  the partial `_diode_client is None` guard with a single
  `self._callback_ready` flag, flipped True at the end of `setup()`
  and back to False at the start of `stop()`. Single source of truth
  for "callback is safe to invoke".
- Pre-try work (Copilot): `list(entities)` and
  `apply_run_id_to_entities` now run INSIDE the try, so failures
  there reliably mark the pseudo-run FAILED instead of leaving it
  RUNNING. `entities_list = []` is pre-bound so `len()` is defined in
  the except handlers when `list(entities)` itself fails.
- Chunk-threshold constant (Copilot): extracts
  `MAX_INGEST_MESSAGE_BYTES = 3 * 1024 * 1024` at module scope;
  replaces both float literals in `_send_entities` and `run()`.
- Helper consistency (leoparente): `run()` now calls `_send_entities`
  instead of inlining the chunking branch. Helper returns the chunk
  count for the existing log line. Side effect: `run()` now raises
  `IngestRejected` (not `RuntimeError`) on response errors; outer
  `except Exception` catches it unchanged.

Backend.__init__ docstring documents the "do not invoke from
__init__/setup()" contract.

Adds 4 new tests, renames one existing test to match the new guard:
- test_ingest_callback_raises_when_not_ready (renamed)
- test_ingest_callback_records_failure_on_apply_run_id_error
- test_ingest_callback_records_failure_on_iterable_error
- test_setup_sets_callback_ready_flag
- test_stop_clears_callback_ready_flag

ruff clean, pytest 114 passed.

Refs INT-386, INT-312

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…spect on construction

- ADR-0008: Drop policy= kwarg from Backend.__init__ (no consumer in v1).
- ADR-0009: Add **kwargs forward-compat door to Backend.run.
- ADR-0007: PolicyRunner uses inspect.signature to detect whether the
  backend class accepts ingest_callback. Legacy backends with __init__(self)
  continue to construct zero-arg with no coordinated upgrade.

49 tests pass including 3 new parametrized introspection cases.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…setup callback, shared _execute_run, IngestError mapping)

- Delete _construct_backend signature introspection; construct backends
  directly (Backend.__init__ already accepts ingest_callback + **kwargs).
- Build + attach ingest_callback AFTER deps are assigned; drop the
  _callback_ready flag, its early-call guard, and the stop() reset.
- Extract _execute_run(client, produce_entities, *, source) shared by the
  scheduled run() and the ingest callback. The entity producer is invoked
  lazily INSIDE the run's try (after create_run), so a crashing eager
  backend is still recorded as a FAILED run rather than vanishing.
- Map the callback's unexpected-exception catch-all to base IngestError
  (not the retry-friendly IngestUnavailable). IngestUnavailable is retained
  in worker.exceptions (published contract consumed downstream).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@github-actions
Copy link
Copy Markdown

github-actions Bot commented Jun 1, 2026

Coverage

Coverage Report
FileStmtsMissCoverMissing
worker
   entity_metadata.py13192%23
   main.py63494%206–207, 217, 223
   metrics.py52198%102
   package_finder.py891188%24–25, 48, 96–97, 132, 145–146, 159, 183–184
   server.py81495%34–36, 167, 170
   version.py7186%14
worker/policy
   manager.py49394%35–36, 52
   runner.py129199%136
TOTAL6142696% 

Tests Skipped Failures Errors Time
117 0 💤 0 ❌ 0 🔥 4.807s ⏱️

Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR adds worker-side support for an API-triggered ingest contract by introducing an ingest_callback hook on backends, an exception hierarchy for ingest failures, and refactoring PolicyRunner to reuse a shared run+ingest path for both scheduled runs and callback-triggered ingests.

Changes:

  • Add worker.exceptions (IngestError, IngestRejected, IngestUnavailable) and unit tests for the hierarchy.
  • Extend Backend to accept/store an optional ingest_callback plus forward-compat **kwargs (and allow run(**kwargs)).
  • Refactor PolicyRunner to (a) attach an ingest callback after setup dependencies exist and (b) centralize run creation, ingest chunking, and run status updates in _execute_run() / _send_entities(); add tests for the new callback behavior.

Reviewed changes

Copilot reviewed 6 out of 6 changed files in this pull request and generated 5 comments.

Show a summary per file
File Description
worker/worker/policy/runner.py Adds ingest callback construction and shared run execution/ingest helpers; refactors scheduled run() to use them.
worker/worker/exceptions.py Introduces ingestion-related exception hierarchy for consumers to classify failures.
worker/worker/backend.py Adds ingest_callback + **kwargs forward-compat “doors” on backend construction and run().
worker/tests/test_exceptions.py Adds tests validating exception hierarchy and messaging.
worker/tests/test_backend.py Adds tests for new Backend.__init__ and Backend.run(**kwargs) behavior.
worker/tests/policy/test_runner.py Adds regression + new tests for callback ingestion behavior and refactored run semantics.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread worker/worker/policy/runner.py
Comment thread worker/worker/policy/runner.py
Comment thread worker/worker/backend.py Outdated
Comment thread worker/worker/exceptions.py
Comment thread worker/tests/policy/test_runner.py Outdated
- _build_ingest_callback: drop the unused policy_name parameter (it was always
  self.name; internals already use self.name), removing the attribution
  inconsistency the reviewer flagged.
- backend.py: docstring now states the worker constructs with no args and
  assigns ingest_callback afterwards (matching PolicyRunner.setup).
- exceptions.py: broaden the module docstring — the hierarchy is raised by
  worker ingestion in general (scheduled run() path + ingest callback), not
  just the callback.
- test_runner.py: fix the multi-chunk test comment to match the scenario
  (both chunks sent; the second chunk's response carries errors).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Comment thread worker/tests/policy/test_runner.py
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hey claude,
design alternative for the new tests - lots of them share the same setup - can that test preparation be shared across the tests as a common step? Design and /notify-me

Comment thread worker/worker/policy/runner.py Outdated
Comment thread worker/worker/policy/runner.py Outdated
Comment on lines +77 to +119
@@ -101,6 +110,13 @@ def setup(
self.metadata = metadata
self.policy = policy
self.run_store = run_store
self._diode_client = client

# Every dependency the closure reads (run_store, metadata, _diode_client)
# is now assigned, so the callback is safe to build and attach. The
# consumer reads `backend.ingest_callback` lazily at trigger time, so
# post-construction assignment is correct.
backend.ingest_callback = self._build_ingest_callback()
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Question to you @leoparente. It's claude-formatted, but the core is mine:

Question: what is Backend.setup() actually for?

While wiring ingest_callback into the worker, setup() started to feel like the wrong
shape. Before we build more on top of it, I'd like your view.

What it does today

setup() doesn't set anything up. The worker calls it once, only to read the integration's
metadata (name, app_name, app_version) — which it needs to name the Diode client and
tag runs. The real work is all in run(). Every integration's setup() just does
return Metadata(...) with constant values.

So setup() is really "tell me who you are," not "initialise yourself." And because it's an
instance method, the worker has to construct the backend before it can ask — which is the
only reason the callback ordering got awkward (build the backend, then attach
ingest_callback afterwards)
.

The real need

The worker needs the integration's identity (name/app_name/app_version), ideally
before it constructs the instance. The instance only matters for run() (and now for
holding ingest_callback). Since __init__ already gives us the instance, there isn't much
left for a separate "setup" step to do — the identity is constant, so it's really class
data, not per-instance work.

Constraint

Assume we'd rather add a new method and ship a new worker/package version than change
setup()'s signature or turn it into a static/class method — changing the existing contract
ripples through every integration. So: keep setup() working, add a cleaner path beside it.

Proposed approach (additive)

Add a class-level metadata accessor — e.g. @classmethod def describe(cls) -> Metadata.
The worker reads it off the class, with no instance. Integrations that only have setup()
keep working via a fallback.

With this, the worker can read the metadata, build the client, build the callback, and then
construct the backend once with the callback already in hand. The
attach-after-construction step (and any future readiness concern) disappears for new
integrations; the old path stays only as a fallback.

Legacy fallback note: for an integration that only has setup(), the worker would
create the instance twice:

  • first a throwaway just to read metadata, like Backend().setup();
  • then the actual scheduled instance, built with the callback.

The extra throwaway construction is what lets us keep the natural construct-once-with-callback
lifecycle and ordering for ingest_callback on every path.

What I'd like from you

  • Is setup() meant to do real per-instance initialisation that I'm missing, or is it only
    ever a metadata accessor?
  • OK for the worker to support both paths (new accessor + legacy setup()) for backwards compatibility?

I've prototyped the worker side on a spike branch: it adds describe() to Backend, reads metadata via describe()
(falling back to a throwaway setup() instance for legacy backends), and constructs the
backend once with the callback — no post-construction attach:

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree and I'm good with that change.

ldrozdz93 and others added 5 commits June 2, 2026 15:40
…kend once with ingest_callback

Backends can expose their identity via a `describe()` classmethod, so the
worker reads metadata (name/app_name/app_version) WITHOUT constructing an
instance, builds the ingest callback, and constructs the backend once with
the callback already in hand — removing the post-construction attach step.

Backends that only implement the instance `setup()` are still supported: the
worker falls back to a throwaway instance to read their metadata, then builds
the real callback-bearing instance.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
…struction

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
…GE_BYTES

_send_entities now always calls create_message_chunks, which owns the gRPC
message-size threshold (max_chunk_size_mb=3.0, a safe margin below the 4 MB
ceiling) and returns a single chunk when the payload already fits. This
removes the worker's duplicate of that constant and the
estimate_message_size pre-check.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
- @deprecated (PEP 702, typing_extensions) on Backend.setup() so IDEs and
  type checkers flag uses; calling the base method also warns at runtime.
- Backend.__init_subclass__ emits a DeprecationWarning at class-definition
  time when a subclass overrides setup() without implementing describe() —
  the signal integration authors see in their own test runs.
- PolicyRunner._backend_metadata logs an operator-facing warning when the
  legacy throwaway-setup() fallback is used, naming the backend class.
- typing-extensions declared as a runtime dependency.

The fallback (and these warnings) are scheduled for removal in worker v2.0.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
…resolve

Source `project.version` was `1.0.0`. CI overwrites it at release time
(worker-release.yaml runs `toml set --toml-path pyproject.toml
project.version` and stamps `worker/version.py` separately), so the
published wheel is unaffected.

The placeholder only matters for **editable / local installs from
source** — e.g. controller-integrations' `e2e_tests --use orb-agent
--local-orb-worker`, which `pip install -e`s this checkout. pip reads
`project.version` from source (`1.0.0`) and resolves it against
backend-util's pin `netboxlabs-orb-worker>=1.3.0`; `1.0.0 < 1.3.0`
triggers `ResolutionImpossible` and the install fails.

Bumping the placeholder to `1.99.0.dev0` satisfies any reasonable
`>=X.Y` floor (PEP 440 ordering: `1.99.0.dev0 >= 1.3.0` is true) while
remaining an obvious not-a-real-release marker. The build still stamps
the true release version on top of it, so nothing downstream of a real
build changes.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants