From 6cbf3459950cc1bf08d6532826a736845344ef76 Mon Sep 17 00:00:00 2001 From: Lukasz Drozdz Date: Tue, 19 May 2026 14:51:17 +0200 Subject: [PATCH 01/12] feat(worker): add worker.exceptions hierarchy for ingest callback Introduces IngestError / IngestUnavailable / IngestRejected to be raised by the worker-supplied ingest callback when pipeline writes fail. The hierarchy is the contract used by integrations to differentiate transient vs. permanent failures. Refs INT-386, INT-312 See: https://github.com/netboxlabs/controller-integrations/blob/develop/docs/plans/worker-backend-ingest-callback.md Co-Authored-By: Claude Opus 4.7 (1M context) --- worker/tests/test_exceptions.py | 39 +++++++++++++++++++++++++++++++++ worker/worker/exceptions.py | 29 ++++++++++++++++++++++++ 2 files changed, 68 insertions(+) create mode 100644 worker/tests/test_exceptions.py create mode 100644 worker/worker/exceptions.py diff --git a/worker/tests/test_exceptions.py b/worker/tests/test_exceptions.py new file mode 100644 index 00000000..d66d071d --- /dev/null +++ b/worker/tests/test_exceptions.py @@ -0,0 +1,39 @@ +#!/usr/bin/env python +# Copyright 2026 NetBox Labs Inc +"""NetBox Labs - worker.exceptions hierarchy tests.""" + +import pytest + +from worker.exceptions import IngestError, IngestRejected, IngestUnavailable + + +def test_unavailable_is_ingest_error(): + """IngestUnavailable is a subclass of IngestError.""" + assert issubclass(IngestUnavailable, IngestError) + + +def test_rejected_is_ingest_error(): + """IngestRejected is a subclass of IngestError.""" + assert issubclass(IngestRejected, IngestError) + + +def test_ingest_error_chain_catchable_as_base(): + """Subclasses can be caught as the base IngestError.""" + with pytest.raises(IngestError): + raise IngestUnavailable("transient") + with pytest.raises(IngestError): + raise IngestRejected("bad payload") + + +@pytest.mark.parametrize( + "exc_cls,msg", + [ + pytest.param(IngestError, "base", id="base"), + 
pytest.param(IngestUnavailable, "transient", id="unavailable"), + pytest.param(IngestRejected, "permanent", id="rejected"), + ], +) +def test_exceptions_carry_message(exc_cls, msg): + """Each exception carries its constructor message via str().""" + exc = exc_cls(msg) + assert str(exc) == msg diff --git a/worker/worker/exceptions.py b/worker/worker/exceptions.py new file mode 100644 index 00000000..71bb4eb9 --- /dev/null +++ b/worker/worker/exceptions.py @@ -0,0 +1,29 @@ +#!/usr/bin/env python +# Copyright 2026 NetBox Labs Inc +"""Exception hierarchy raised by the worker ingest callback.""" + + +class IngestError(Exception): + """ + Base for pipeline-side ingestion failures. + + Integrations should catch this when they want uniform handling. New + subclasses are added under this base in future minor releases. + """ + + +class IngestUnavailable(IngestError): + """ + Transient pipeline failure — Diode unreachable, queue full, rate-limited. + + Retry-friendly. The integration MAY retry with backoff. + """ + + +class IngestRejected(IngestError): + """ + Permanent pipeline rejection for this call. + + Reasons include bad payload, instance retired, or policy removed. + The integration should NOT retry; the call will fail again. + """ From 13bd6022598fbe815d7619a658916beb30b47689 Mon Sep 17 00:00:00 2001 From: Lukasz Drozdz Date: Tue, 19 May 2026 14:51:37 +0200 Subject: [PATCH 02/12] feat(worker): pass ingest_callback and policy to Backend.__init__ Extends Backend.__init__ with keyword-only ingest_callback, policy, plus open **kwargs (forward-compat door). PolicyRunner.setup now builds a per-policy ingest callback closure and passes both to backend construction. The closure chunks+ingests entities (mirroring run()), tracks each off-schedule emission as a pseudo-run in RunStore, and translates response/transport errors into the new IngestError subclasses. run() is unchanged and remains single-shot per cron tick. 
Refs INT-386, INT-312 See: https://github.com/netboxlabs/controller-integrations/blob/develop/docs/plans/worker-backend-ingest-callback.md https://github.com/netboxlabs/controller-integrations/blob/develop/docs/plans/policy-distribution-refinement.md Co-Authored-By: Claude Opus 4.7 (1M context) --- worker/tests/policy/test_runner.py | 255 ++++++++++++++++++++++++++++- worker/tests/test_backend.py | 24 +++ worker/worker/backend.py | 32 ++++ worker/worker/policy/runner.py | 101 +++++++++++- 4 files changed, 410 insertions(+), 2 deletions(-) diff --git a/worker/tests/policy/test_runner.py b/worker/tests/policy/test_runner.py index 7365c2e0..25f51ef5 100644 --- a/worker/tests/policy/test_runner.py +++ b/worker/tests/policy/test_runner.py @@ -9,8 +9,9 @@ from netboxlabs.diode.sdk.diode.v1 import ingester_pb2 from worker.backend import Backend +from worker.exceptions import IngestRejected, IngestUnavailable from worker.models import Config, DiodeConfig, Metadata, Policy, Status -from worker.policy.run import RunStore +from worker.policy.run import RunStatus, RunStore from worker.policy.runner import PolicyRunner @@ -116,6 +117,11 @@ def mock_backend(): return backend +def _extract_callback(mock_backend_class): + """Recover the ingest_callback closure that PolicyRunner.setup passed to backend_class().""" + return mock_backend_class.call_args.kwargs["ingest_callback"] + + def test_initial_status(policy_runner): """Test initial status of PolicyRunner.""" assert policy_runner.status == Status.NEW @@ -592,3 +598,250 @@ def test_run_chunk_ingestion_error( # Should log the error assert "Chunk ingestion failed" in caplog.text + + +# --------------------------------------------------------------------------- +# New tests: _build_ingest_callback +# --------------------------------------------------------------------------- + + +def test_setup_passes_kwargs_to_backend( + policy_runner, + sample_policy, + sample_diode_config, + mock_load_class, + mock_diode_client, + mock_run_store, 
+): + """setup() constructs the backend with ingest_callback= and policy= kwargs.""" + with patch.object(policy_runner.scheduler, "start"), patch.object( + policy_runner.scheduler, "add_job" + ): + policy_runner.setup("policy1", sample_diode_config, sample_policy, mock_run_store) + + mock_backend_class = mock_load_class.return_value + call_kwargs = mock_backend_class.call_args.kwargs + assert "ingest_callback" in call_kwargs + assert callable(call_kwargs["ingest_callback"]) + assert call_kwargs["policy"] == sample_policy + + +def test_ingest_callback_entities_happy_path( + policy_runner, + sample_policy, + sample_diode_config, + mock_load_class, + mock_diode_client, + mock_run_store, +): + """Callback with entities= ingests them and records COMPLETED run.""" + with patch.object(policy_runner.scheduler, "start"), patch.object( + policy_runner.scheduler, "add_job" + ): + policy_runner.setup("policy1", sample_diode_config, sample_policy, mock_run_store) + + callback = _extract_callback(mock_load_class.return_value) + + client_instance = mock_diode_client.return_value + client_instance.ingest.return_value.errors = [] + + entity1 = ingester_pb2.Entity() + entity1.device.name = "dev1" + entity2 = ingester_pb2.Entity() + entity2.device.name = "dev2" + + with patch("worker.policy.runner.apply_run_id_to_entities"), patch( + "worker.policy.runner.estimate_message_size", return_value=1024 + ), patch("worker.policy.runner.create_message_chunks"): + result = callback(entities=[entity1, entity2]) + + assert result is None + mock_run_store.create_run.assert_called_once() + client_instance.ingest.assert_called_once() + call_kwargs = client_instance.ingest.call_args.kwargs + assert len(call_kwargs["entities"]) == 2 + mock_run_store.update_run.assert_called_once() + update_kwargs = mock_run_store.update_run.call_args.kwargs + assert update_kwargs["status"] == RunStatus.COMPLETED + + +def test_ingest_callback_error_path( + policy_runner, + sample_policy, + sample_diode_config, + 
mock_load_class, + mock_diode_client, + mock_run_store, +): + """Callback with error= records FAILED run and skips client.ingest.""" + with patch.object(policy_runner.scheduler, "start"), patch.object( + policy_runner.scheduler, "add_job" + ): + policy_runner.setup("policy1", sample_diode_config, sample_policy, mock_run_store) + + callback = _extract_callback(mock_load_class.return_value) + client_instance = mock_diode_client.return_value + + err = Exception("vendor unreachable") + result = callback(error=err) + + assert result is None + client_instance.ingest.assert_not_called() + mock_run_store.update_run.assert_called_once() + update_kwargs = mock_run_store.update_run.call_args.kwargs + assert update_kwargs["status"] == RunStatus.FAILED + assert update_kwargs["error"] is err + + +@pytest.mark.parametrize( + "kwargs", + [ + pytest.param({}, id="neither"), + pytest.param({"entities": [], "error": Exception("x")}, id="both"), + ], +) +def test_ingest_callback_requires_exactly_one_of_entities_or_error( + kwargs, + policy_runner, + sample_policy, + sample_diode_config, + mock_load_class, + mock_diode_client, + mock_run_store, +): + """Callback raises TypeError when neither or both of entities/error are given.""" + with patch.object(policy_runner.scheduler, "start"), patch.object( + policy_runner.scheduler, "add_job" + ): + policy_runner.setup("policy1", sample_diode_config, sample_policy, mock_run_store) + + callback = _extract_callback(mock_load_class.return_value) + + with pytest.raises(TypeError): + callback(**kwargs) + + +def test_ingest_callback_translates_transport_errors_to_unavailable( + policy_runner, + sample_policy, + sample_diode_config, + mock_load_class, + mock_diode_client, + mock_run_store, +): + """Non-IngestError transport exceptions are wrapped as IngestUnavailable.""" + with patch.object(policy_runner.scheduler, "start"), patch.object( + policy_runner.scheduler, "add_job" + ): + policy_runner.setup("policy1", sample_diode_config, sample_policy, 
mock_run_store) + + callback = _extract_callback(mock_load_class.return_value) + client_instance = mock_diode_client.return_value + client_instance.ingest.side_effect = RuntimeError("connection refused") + + entity = ingester_pb2.Entity() + entity.device.name = "dev1" + + with patch("worker.policy.runner.estimate_message_size", return_value=1024), patch( + "worker.policy.runner.apply_run_id_to_entities" + ): + with pytest.raises(IngestUnavailable): + callback(entities=[entity]) + + mock_run_store.update_run.assert_called_once() + update_kwargs = mock_run_store.update_run.call_args.kwargs + assert update_kwargs["status"] == RunStatus.FAILED + + +def test_ingest_callback_translates_response_errors_to_rejected( + policy_runner, + sample_policy, + sample_diode_config, + mock_load_class, + mock_diode_client, + mock_run_store, +): + """Response errors (non-empty errors list) are raised as IngestRejected.""" + with patch.object(policy_runner.scheduler, "start"), patch.object( + policy_runner.scheduler, "add_job" + ): + policy_runner.setup("policy1", sample_diode_config, sample_policy, mock_run_store) + + callback = _extract_callback(mock_load_class.return_value) + client_instance = mock_diode_client.return_value + client_instance.ingest.return_value.errors = ["bad payload"] + + entity = ingester_pb2.Entity() + entity.device.name = "dev1" + + with patch("worker.policy.runner.estimate_message_size", return_value=1024), patch( + "worker.policy.runner.apply_run_id_to_entities" + ): + with pytest.raises(IngestRejected): + callback(entities=[entity]) + + mock_run_store.update_run.assert_called_once() + update_kwargs = mock_run_store.update_run.call_args.kwargs + assert update_kwargs["status"] == RunStatus.FAILED + + +def test_ingest_callback_chunks_large_payloads( + policy_runner, + sample_policy, + sample_diode_config, + mock_load_class, + mock_diode_client, + mock_run_store, +): + """Callback splits large payloads into chunks and ingests each separately.""" + with 
patch.object(policy_runner.scheduler, "start"), patch.object( + policy_runner.scheduler, "add_job" + ): + policy_runner.setup("policy1", sample_diode_config, sample_policy, mock_run_store) + + callback = _extract_callback(mock_load_class.return_value) + client_instance = mock_diode_client.return_value + client_instance.ingest.return_value.errors = [] + + entity1 = ingester_pb2.Entity() + entity1.device.name = "dev1" + entity2 = ingester_pb2.Entity() + entity2.device.name = "dev2" + chunk_a = [entity1] + chunk_b = [entity2] + + with patch( + "worker.policy.runner.estimate_message_size", return_value=4 * 1024 * 1024 + ), patch( + "worker.policy.runner.create_message_chunks", return_value=[chunk_a, chunk_b] + ), patch( + "worker.policy.runner.apply_run_id_to_entities" + ): + callback(entities=[entity1, entity2]) + + assert client_instance.ingest.call_count == 2 + mock_run_store.update_run.assert_called_once() + update_kwargs = mock_run_store.update_run.call_args.kwargs + assert update_kwargs["status"] == RunStatus.COMPLETED + + +def test_run_unaffected_by_callback( + policy_runner, sample_policy, mock_diode_client, mock_backend, mock_run_store +): + """PolicyRunner.run() is unaffected by the new callback mechanism.""" + policy_runner.name = "test_policy" + policy_runner.run_store = mock_run_store + + entity = ingester_pb2.Entity() + entity.device.name = "device-x" + mock_backend.run.return_value = [entity] + mock_diode_client.ingest.return_value.errors = [] + + with patch("worker.policy.runner.estimate_message_size", return_value=512): + policy_runner.run(mock_diode_client, mock_backend, sample_policy) + + mock_backend.run.assert_called_once_with("test_policy", sample_policy) + mock_diode_client.ingest.assert_called_once() + mock_run_store.update_run.assert_called_once() + update_kwargs = mock_run_store.update_run.call_args.kwargs + assert update_kwargs["status"] == RunStatus.COMPLETED diff --git a/worker/tests/test_backend.py b/worker/tests/test_backend.py index 
8c2a18b6..179fd602 100644 --- a/worker/tests/test_backend.py +++ b/worker/tests/test_backend.py @@ -88,3 +88,27 @@ def test_load_class_attribute_error(mock_import_module): match=f"Failed to load a class inheriting from 'Backend' in module '{mock_module_name}': Attribute error", ): load_class(mock_module_name) + + +def test_backend_init_default_no_args(): + """Zero-arg construction still works (back-compat with older worker).""" + b = Backend() + assert b.ingest_callback is None + assert b.policy is None + + +def test_backend_init_stores_kwargs(): + """ingest_callback and policy are stored on the instance.""" + + def cb(**_): + return None + + pol = MagicMock(spec=Policy) + b = Backend(ingest_callback=cb, policy=pol) + assert b.ingest_callback is cb + assert b.policy is pol + + +def test_backend_init_absorbs_unknown_kwargs(): + """Forward-compat: unknown kwargs don't raise.""" + Backend(unknown_future_resource="x", another_one=42) # must not raise diff --git a/worker/worker/backend.py b/worker/worker/backend.py index 3571c204..17a4f938 100644 --- a/worker/worker/backend.py +++ b/worker/worker/backend.py @@ -14,6 +14,38 @@ class Backend: """Backend Class.""" + def __init__( + self, + *, + ingest_callback=None, + policy: Policy | None = None, + **kwargs, + ) -> None: + """ + Construct the Backend. + + Worker passes ``ingest_callback`` and ``policy`` at construction + starting with the minor release this docstring ships in. Older + worker versions construct ``Backend()`` with zero args; integrations + that override ``__init__`` should accept ``**kwargs`` so both paths + keep working. + + Args: + ---- + ingest_callback: Optional callable that ingests entities or + reports errors outside of the ``run()`` cycle. See + ``worker.exceptions`` for the exception hierarchy it may + raise. + policy: Optional construction-time policy. If supplied, the + integration may use credentials / scope without waiting for + the first scheduled ``run()``. 
+ **kwargs: Forward-compat door for additional resources worker + may pass in future versions; silently ignored by default. + + """ + self.ingest_callback = ingest_callback + self.policy = policy + def setup(self) -> Metadata: """ Set up the backend. diff --git a/worker/worker/policy/runner.py b/worker/worker/policy/runner.py index 0f5baa57..da8936c6 100644 --- a/worker/worker/policy/runner.py +++ b/worker/worker/policy/runner.py @@ -3,6 +3,7 @@ """Orb Worker Policy Runner.""" import logging +import sys import time from datetime import datetime, timedelta @@ -19,6 +20,7 @@ from worker.backend import Backend, load_class from worker.entity_metadata import apply_run_id_to_entities +from worker.exceptions import IngestError, IngestRejected, IngestUnavailable from worker.metrics import get_metric from worker.models import DiodeConfig, Policy, Status from worker.package_finder import maybe_evict @@ -38,6 +40,7 @@ def __init__(self): self.status = Status.NEW self.scheduler = BackgroundScheduler() self.run_store = None + self._diode_client = None def setup( self, name: str, diode_config: DiodeConfig, policy: Policy, run_store: RunStore @@ -65,7 +68,12 @@ def setup( # Debug logging for backend loading logger.debug(f"Loading backend class: {policy.config.package}") backend_class = load_class(policy.config.package) - backend = backend_class() + + # Build the ingest callback closure. It captures `self` and reads + # `self._diode_client` lazily, so it is safe to construct before the + # client is assigned below. 
+ ingest_callback = self._build_ingest_callback(self.name) + backend = backend_class(ingest_callback=ingest_callback, policy=policy) logger.debug(f"Backend class loaded successfully: {backend_class.__name__}") metadata = backend.setup() @@ -101,6 +109,7 @@ def setup( self.metadata = metadata self.policy = policy self.run_store = run_store + self._diode_client = client self.scheduler.start() @@ -127,6 +136,96 @@ def setup( if active_policies: active_policies.add(1, {"policy": self.name}) + def _build_ingest_callback(self, policy_name: str): + """ + Build a closure used to ingest entities outside the scheduled run() cycle. + + The returned callable signature: + cb(entities=None, *, error=None, **kwargs) -> None + + Exactly one of ``entities`` / ``error`` must be supplied. + On the ``entities`` path: a pseudo-run is created in the RunStore, + entities are chunked and ingested via the same path run() uses, and + response/transport errors are translated into IngestRejected / + IngestUnavailable. On the ``error`` path: a failed pseudo-run is + recorded; no client.ingest call is made; returns None. 
+ """ + + def ingest_callback(entities=None, *, error=None, **kw): + if (entities is None) == (error is None): + raise TypeError( + "ingest_callback requires exactly one of 'entities' or 'error'" + ) + run = self.run_store.create_run( + policy_name=policy_name, + metadata={ + "name": self.metadata.name, + "app_name": self.metadata.app_name, + "app_version": self.metadata.app_version, + "source": "ingest_callback", + }, + ) + if error is not None: + self.run_store.update_run( + policy_name=policy_name, + run_id=run.id, + status=RunStatus.FAILED, + error=error, + entity_count=0, + ) + return + entities_list = list(entities) + apply_run_id_to_entities(entities_list, run.id) + metadata = { + "policy_name": policy_name, + "worker_backend": self.metadata.name, + "run_id": run.id, + } + client = self._diode_client + try: + size_bytes = estimate_message_size(entities_list) + if size_bytes > (3.0 * 1024 * 1024): + chunks = create_message_chunks(entities_list) + for chunk in chunks: + response = client.ingest(entities=chunk, metadata=metadata) + if response.errors: + raise IngestRejected( + f"Chunk ingestion failed: {response.errors}" + ) + else: + response = client.ingest(entities=entities_list, metadata=metadata) + if response.errors: + raise IngestRejected( + f"Entities ingestion failed: {response.errors}" + ) + except IngestError: + self.run_store.update_run( + policy_name=policy_name, + run_id=run.id, + status=RunStatus.FAILED, + error=sys.exc_info()[1], + entity_count=len(entities_list), + ) + raise + except Exception as exc: + self.run_store.update_run( + policy_name=policy_name, + run_id=run.id, + status=RunStatus.FAILED, + error=exc, + entity_count=len(entities_list), + ) + raise IngestUnavailable(str(exc)) from exc + self.run_store.update_run( + policy_name=policy_name, + run_id=run.id, + status=RunStatus.COMPLETED, + error=None, + entity_count=len(entities_list), + ) + + return ingest_callback + def run( self, client: DiodeClient | DiodeDryRunClient | 
DiodeOTLPClient, From 7e962a32648edca5980bf0df94f7ecb4df0320eb Mon Sep 17 00:00:00 2001 From: Lukasz Drozdz Date: Tue, 19 May 2026 15:04:49 +0200 Subject: [PATCH 03/12] fix(worker): pr-code-reviewer feedback on ingest callback MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Bind IngestError in `except ... as exc` instead of sys.exc_info() and drop the now-unused `import sys`. - Log unexpected exceptions via `logger.exception` before translating them to IngestUnavailable, so a worker-side programming bug isn't silently masked as a transient failure. - Annotate the closure signature (`error: Exception | None = None`, `-> None`) and rename `**kw` → `**kwargs` to match the docstring and Backend.__init__ contract. Comment the forward-compat door. - Guard against an integration calling the callback before the diode client is initialised (e.g. from inside `Backend.setup()`): record a FAILED pseudo-run and raise IngestUnavailable with a clear message instead of letting an AttributeError surface as an unexpected exception. - Extract `_send_entities` so `_build_ingest_callback` stays under the C901 complexity limit. - Add a unit test for the new guard. 
Refs INT-386, INT-312 Co-Authored-By: Claude Opus 4.7 (1M context) --- worker/tests/policy/test_runner.py | 30 +++++++++++++++ worker/worker/policy/runner.py | 60 ++++++++++++++++++++---------- 2 files changed, 71 insertions(+), 19 deletions(-) diff --git a/worker/tests/policy/test_runner.py b/worker/tests/policy/test_runner.py index 25f51ef5..a9d073ee 100644 --- a/worker/tests/policy/test_runner.py +++ b/worker/tests/policy/test_runner.py @@ -845,3 +845,33 @@ def test_run_unaffected_by_callback( mock_run_store.update_run.assert_called_once() update_kwargs = mock_run_store.update_run.call_args.kwargs assert update_kwargs["status"] == RunStatus.COMPLETED + + +def test_ingest_callback_raises_when_client_not_initialised( + policy_runner, + sample_policy, + sample_diode_config, + mock_run_store, + mock_load_class, + mock_diode_client, +): + """Calling the callback before _diode_client is assigned raises IngestUnavailable.""" + with patch.object(policy_runner.scheduler, "start"), patch.object( + policy_runner.scheduler, "add_job" + ): + policy_runner.setup("policy-x", sample_diode_config, sample_policy, mock_run_store) + + callback = _extract_callback(mock_load_class.return_value) + + # Simulate "called before client init" — clear the attribute. + policy_runner._diode_client = None + + entity = MagicMock() + with patch("worker.policy.runner.apply_run_id_to_entities"): + with pytest.raises(IngestUnavailable, match="diode client was initialised"): + callback(entities=[entity]) + + # Pseudo-run was still recorded as FAILED. 
+ mock_run_store.update_run.assert_called() + final_call = mock_run_store.update_run.call_args + assert final_call.kwargs["status"] == RunStatus.FAILED diff --git a/worker/worker/policy/runner.py b/worker/worker/policy/runner.py index da8936c6..625dcdeb 100644 --- a/worker/worker/policy/runner.py +++ b/worker/worker/policy/runner.py @@ -3,7 +3,6 @@ """Orb Worker Policy Runner.""" import logging -import sys import time from datetime import datetime, timedelta @@ -151,7 +150,13 @@ def _build_ingest_callback(self, policy_name: str): recorded; no client.ingest call is made; returns None. """ - def ingest_callback(entities=None, *, error=None, **kw): + def ingest_callback( + entities=None, + *, + error: Exception | None = None, + **kwargs, + ) -> None: + # kwargs is reserved for forward-compat (run_id, source, etc.); currently ignored. if (entities is None) == (error is None): raise TypeError( "ingest_callback requires exactly one of 'entities' or 'error'" @@ -182,32 +187,35 @@ def ingest_callback(entities=None, *, error=None, **kw): "run_id": run.id, } client = self._diode_client + if client is None: + guard_error = IngestUnavailable( + "ingest_callback invoked before the diode client was initialised " + "(likely called from Backend.setup() — defer until after setup completes)" + ) + self.run_store.update_run( + policy_name=policy_name, + run_id=run.id, + status=RunStatus.FAILED, + error=guard_error, + entity_count=len(entities_list), + ) + raise guard_error try: - size_bytes = estimate_message_size(entities_list) - if size_bytes > (3.0 * 1024 * 1024): - chunks = create_message_chunks(entities_list) - for chunk in chunks: - response = client.ingest(entities=chunk, metadata=metadata) - if response.errors: - raise IngestRejected( - f"Chunk ingestion failed: {response.errors}" - ) - else: - response = client.ingest(entities=entities_list, metadata=metadata) - if response.errors: - raise IngestRejected( - f"Entities ingestion failed: {response.errors}" - ) - except 
IngestError: + self._send_entities(client, entities_list, metadata) + except IngestError as exc: self.run_store.update_run( policy_name=policy_name, run_id=run.id, status=RunStatus.FAILED, - error=sys.exc_info()[1], + error=exc, entity_count=len(entities_list), ) raise except Exception as exc: + logger.exception( + "Unexpected exception in ingest_callback; " + "translating to IngestUnavailable" + ) self.run_store.update_run( policy_name=policy_name, run_id=run.id, @@ -226,6 +234,20 @@ def ingest_callback(entities=None, *, error=None, **kw): return ingest_callback + def _send_entities(self, client, entities_list: list, metadata: dict) -> None: + """Send entities to the Diode client, chunking if the payload exceeds 3 MB.""" + size_bytes = estimate_message_size(entities_list) + if size_bytes > (3.0 * 1024 * 1024): + chunks = create_message_chunks(entities_list) + for chunk in chunks: + response = client.ingest(entities=chunk, metadata=metadata) + if response.errors: + raise IngestRejected(f"Chunk ingestion failed: {response.errors}") + else: + response = client.ingest(entities=entities_list, metadata=metadata) + if response.errors: + raise IngestRejected(f"Entities ingestion failed: {response.errors}") + def run( self, client: DiodeClient | DiodeDryRunClient | DiodeOTLPClient, From 2dbccf8b4ed9d36bb15e190e142fcab41481873d Mon Sep 17 00:00:00 2001 From: Lukasz Drozdz Date: Wed, 20 May 2026 10:06:05 +0200 Subject: [PATCH 04/12] fix(worker): explicit callback-ready flag, full try-scope, share chunk helper MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Addresses 6 reviewer comments on PR #415: - Init-order race (Copilot/Codex/jajeffries): the closure dereferences `self.run_store`, `self.metadata`, `self._diode_client` — none of which are attached until after `backend.setup()` returns. 
Replaces the partial `_diode_client is None` guard with a single `self._callback_ready` flag, flipped True at the end of `setup()` and back to False at the start of `stop()`. Single source of truth for "callback is safe to invoke". - Pre-try work (Copilot): `list(entities)` and `apply_run_id_to_entities` now run INSIDE the try, so failures there reliably mark the pseudo-run FAILED instead of leaving it RUNNING. `entities_list = []` is pre-bound so `len()` is defined in the except handlers when `list(entities)` itself fails. - Chunk-threshold constant (Copilot): extracts `MAX_INGEST_MESSAGE_BYTES = 3 * 1024 * 1024` at module scope; replaces both float literals in `_send_entities` and `run()`. - Helper consistency (leoparente): `run()` now calls `_send_entities` instead of inlining the chunking branch. Helper returns the chunk count for the existing log line. Side effect: `run()` now raises `IngestRejected` (not `RuntimeError`) on response errors; outer `except Exception` catches it unchanged. Backend.__init__ docstring documents the "do not invoke from __init__/setup()" contract. Adds 4 new tests, renames one existing test to match the new guard: - test_ingest_callback_raises_when_not_ready (renamed) - test_ingest_callback_records_failure_on_apply_run_id_error - test_ingest_callback_records_failure_on_iterable_error - test_setup_sets_callback_ready_flag - test_stop_clears_callback_ready_flag ruff clean, pytest 114 passed. 
Refs INT-386, INT-312 Co-Authored-By: Claude Opus 4.7 (1M context) --- worker/tests/policy/test_runner.py | 124 ++++++++++++++++++++++++++--- worker/worker/backend.py | 6 +- worker/worker/policy/runner.py | 82 +++++++++---------- 3 files changed, 157 insertions(+), 55 deletions(-) diff --git a/worker/tests/policy/test_runner.py b/worker/tests/policy/test_runner.py index a9d073ee..0c4cbf5f 100644 --- a/worker/tests/policy/test_runner.py +++ b/worker/tests/policy/test_runner.py @@ -593,7 +593,7 @@ def test_run_chunk_ingestion_error( with caplog.at_level("ERROR"): policy_runner.run(mock_diode_client, mock_backend, sample_policy) - # Should call ingest once and fail on first chunk error (it raises RuntimeError immediately) + # Should ingest per chunk and fail on first chunk error (it raises IngestRejected immediately) assert mock_diode_client.ingest.call_count == 2 # Should log the error @@ -847,7 +847,7 @@ def test_run_unaffected_by_callback( assert update_kwargs["status"] == RunStatus.COMPLETED -def test_ingest_callback_raises_when_client_not_initialised( +def test_ingest_callback_raises_when_not_ready( policy_runner, sample_policy, sample_diode_config, @@ -855,7 +855,7 @@ def test_ingest_callback_raises_when_client_not_initialised( mock_load_class, mock_diode_client, ): - """Calling the callback before _diode_client is assigned raises IngestUnavailable.""" + """Calling the callback before _callback_ready is True raises IngestUnavailable.""" with patch.object(policy_runner.scheduler, "start"), patch.object( policy_runner.scheduler, "add_job" ): @@ -863,15 +863,117 @@ def test_ingest_callback_raises_when_client_not_initialised( callback = _extract_callback(mock_load_class.return_value) - # Simulate "called before client init" — clear the attribute. - policy_runner._diode_client = None + # Simulate "called before worker finished constructing" — clear the readiness flag.
+ policy_runner._callback_ready = False entity = MagicMock() - with patch("worker.policy.runner.apply_run_id_to_entities"): - with pytest.raises(IngestUnavailable, match="diode client was initialised"): + with pytest.raises(IngestUnavailable, match="before the worker finished constructing"): + callback(entities=[entity]) + + # No pseudo-run must have been created (guard fires before create_run). + mock_run_store.create_run.assert_not_called() + + +def test_ingest_callback_records_failure_on_apply_run_id_error( + policy_runner, + sample_policy, + sample_diode_config, + mock_load_class, + mock_diode_client, + mock_run_store, +): + """A failure in apply_run_id_to_entities records a FAILED run and raises IngestUnavailable.""" + with patch.object(policy_runner.scheduler, "start"), patch.object( + policy_runner.scheduler, "add_job" + ): + policy_runner.setup("policy1", sample_diode_config, sample_policy, mock_run_store) + + callback = _extract_callback(mock_load_class.return_value) + + entity = ingester_pb2.Entity() + entity.device.name = "dev1" + + with patch( + "worker.policy.runner.apply_run_id_to_entities", + side_effect=RuntimeError("entity corrupt"), + ), patch("worker.policy.runner.estimate_message_size", return_value=1024): + with pytest.raises(IngestUnavailable): callback(entities=[entity]) - # Pseudo-run was still recorded as FAILED. - mock_run_store.update_run.assert_called() - final_call = mock_run_store.update_run.call_args - assert final_call.kwargs["status"] == RunStatus.FAILED + mock_run_store.update_run.assert_called_once() + update_kwargs = mock_run_store.update_run.call_args.kwargs + assert update_kwargs["status"] == RunStatus.FAILED + # The entity was materialised before apply_run_id_to_entities raised.
+ assert update_kwargs["entity_count"] == 1 + + +def test_ingest_callback_records_failure_on_iterable_error( + policy_runner, + sample_policy, + sample_diode_config, + mock_load_class, + mock_diode_client, + mock_run_store, +): + """An iterable that raises on first next() records FAILED run with entity_count=0.""" + with patch.object(policy_runner.scheduler, "start"), patch.object( + policy_runner.scheduler, "add_job" + ): + policy_runner.setup("policy1", sample_diode_config, sample_policy, mock_run_store) + + callback = _extract_callback(mock_load_class.return_value) + + bad_iterable = MagicMock() + bad_iterable.__iter__ = MagicMock(side_effect=ValueError("bad")) + + with pytest.raises(IngestUnavailable): + callback(entities=bad_iterable) + + mock_run_store.update_run.assert_called_once() + update_kwargs = mock_run_store.update_run.call_args.kwargs + assert update_kwargs["status"] == RunStatus.FAILED + # Iterable failed before any entity was materialised. + assert update_kwargs["entity_count"] == 0 + + +def test_setup_sets_callback_ready_flag( + policy_runner, + sample_policy, + sample_diode_config, + mock_load_class, + mock_diode_client, + mock_run_store, +): + """After setup() returns, _callback_ready is True.""" + with patch.object(policy_runner.scheduler, "start"), patch.object( + policy_runner.scheduler, "add_job" + ): + policy_runner.setup("policy1", sample_diode_config, sample_policy, mock_run_store) + + assert policy_runner._callback_ready is True + + +def test_stop_clears_callback_ready_flag( + policy_runner, + sample_policy, + sample_diode_config, + mock_load_class, + mock_diode_client, + mock_run_store, +): + """After stop(), _callback_ready is False and the callback raises IngestUnavailable.""" + with patch.object(policy_runner.scheduler, "start"), patch.object( + policy_runner.scheduler, "add_job" + ): + policy_runner.setup("policy1", sample_diode_config, sample_policy, mock_run_store) + + callback = _extract_callback(mock_load_class.return_value) + + 
with patch.object(policy_runner.scheduler, "shutdown"): + policy_runner.stop() + + assert policy_runner._callback_ready is False + + entity = MagicMock() + with pytest.raises(IngestUnavailable, match="before the worker finished constructing"): + callback(entities=[entity]) diff --git a/worker/worker/backend.py b/worker/worker/backend.py index 17a4f938..ccc01046 100644 --- a/worker/worker/backend.py +++ b/worker/worker/backend.py @@ -33,7 +33,11 @@ def __init__( Args: ---- ingest_callback: Optional callable that ingests entities or - reports errors outside of the ``run()`` cycle. See + reports errors outside of the ``run()`` cycle. **Do not + invoke from ``__init__`` or ``setup()`` — the callback is + only usable starting after the worker finishes constructing + the Backend (i.e. after ``setup()`` returns); calling it + earlier raises ``IngestUnavailable``.** See ``worker.exceptions`` for the exception hierarchy it may raise. policy: Optional construction-time policy. If supplied, the diff --git a/worker/worker/policy/runner.py b/worker/worker/policy/runner.py index 625dcdeb..df55dae3 100644 --- a/worker/worker/policy/runner.py +++ b/worker/worker/policy/runner.py @@ -25,6 +25,10 @@ from worker.package_finder import maybe_evict from worker.policy.run import RunStatus, RunStore +# Diode message-size cap (per chunk). Stays in sync with the reconciler's +# 4 MiB gRPC ceiling minus a safety margin. +MAX_INGEST_MESSAGE_BYTES = 3 * 1024 * 1024 + logger = logging.getLogger(__name__) @@ -40,6 +44,7 @@ def __init__(self): self.scheduler = BackgroundScheduler() self.run_store = None self._diode_client = None + self._callback_ready = False def setup( self, name: str, diode_config: DiodeConfig, policy: Policy, run_store: RunStore @@ -131,6 +136,11 @@ def setup( self.status = Status.RUNNING + # Callback is now safe to invoke — every dependency the closure reads + # (run_store, metadata, _diode_client) is attached. 
Integrations may push + # entities via ingest_callback from this point onward. + self._callback_ready = True + active_policies = get_metric("active_policies") if active_policies: active_policies.add(1, {"policy": self.name}) @@ -161,6 +171,12 @@ def ingest_callback( raise TypeError( "ingest_callback requires exactly one of 'entities' or 'error'" ) + if not self._callback_ready: + raise IngestUnavailable( + "ingest_callback invoked before the worker finished constructing " + "this Backend (likely called from Backend.__init__ or " + "Backend.setup() — defer until after setup() returns)" + ) run = self.run_store.create_run( policy_name=policy_name, metadata={ @@ -179,29 +195,16 @@ def ingest_callback( entity_count=0, ) return - entities_list = list(entities) - apply_run_id_to_entities(entities_list, run.id) - metadata = { - "policy_name": policy_name, - "worker_backend": self.metadata.name, - "run_id": run.id, - } - client = self._diode_client - if client is None: - guard_error = IngestUnavailable( - "ingest_callback invoked before the diode client was initialised " - "(likely called from Backend.setup() — defer until after setup completes)" - ) - self.run_store.update_run( - policy_name=policy_name, - run_id=run.id, - status=RunStatus.FAILED, - error=guard_error, - entity_count=len(entities_list), - ) - raise guard_error + entities_list: list = [] try: - self._send_entities(client, entities_list, metadata) + entities_list = list(entities) + apply_run_id_to_entities(entities_list, run.id) + metadata = { + "policy_name": policy_name, + "worker_backend": self.metadata.name, + "run_id": run.id, + } + self._send_entities(self._diode_client, entities_list, metadata) except IngestError as exc: self.run_store.update_run( policy_name=policy_name, @@ -234,19 +237,24 @@ def ingest_callback( return ingest_callback - def _send_entities(self, client, entities_list: list, metadata: dict) -> None: - """Send entities to the Diode client, chunking if the payload exceeds 3 MB.""" + def 
_send_entities(self, client, entities_list: list, metadata: dict) -> int: + """ + Send entities to the Diode client, chunking if the payload exceeds MAX_INGEST_MESSAGE_BYTES. + + Returns the number of chunks actually sent (1 if not chunked). + """ size_bytes = estimate_message_size(entities_list) - if size_bytes > (3.0 * 1024 * 1024): + if size_bytes > MAX_INGEST_MESSAGE_BYTES: chunks = create_message_chunks(entities_list) for chunk in chunks: response = client.ingest(entities=chunk, metadata=metadata) if response.errors: raise IngestRejected(f"Chunk ingestion failed: {response.errors}") - else: - response = client.ingest(entities=entities_list, metadata=metadata) - if response.errors: - raise IngestRejected(f"Entities ingestion failed: {response.errors}") + return len(chunks) + response = client.ingest(entities=entities_list, metadata=metadata) + if response.errors: + raise IngestRejected(f"Entities ingestion failed: {response.errors}") + return 1 def run( self, @@ -295,20 +303,7 @@ def run( "worker_backend": self.metadata.name, "run_id": run.id, } - chunk_num = 1 - size_bytes = estimate_message_size(entities) - - if size_bytes > (3.0 * 1024 * 1024): - chunks = create_message_chunks(entities) - chunk_num = len(chunks) - for chunk in chunks: - response = client.ingest(entities=chunk, metadata=metadata) - if response.errors: - raise RuntimeError(f"Chunk ingestion failed: {response.errors}") - else: - response = client.ingest(entities=entities, metadata=metadata) - if response.errors: - raise RuntimeError(f"Entities ingestion failed: {response.errors}") + chunk_num = self._send_entities(client, entities, metadata) logger.info( f"Policy {self.name}: Successfully ingested {entity_count} entities in {chunk_num} chunks" ) @@ -372,6 +367,7 @@ def run( def stop(self): """Stop the policy runner.""" + self._callback_ready = False self.scheduler.shutdown(wait=False) self.status = Status.FINISHED active_policies = get_metric("active_policies") From 
9df7605fa7c680bbf11f3bacb521f72586663702 Mon Sep 17 00:00:00 2001 From: Lukasz Drozdz Date: Wed, 20 May 2026 22:15:28 +0200 Subject: [PATCH 05/12] fix(worker): drop policy= kwarg, add Backend.run **kwargs door, introspect on construction - ADR-0008: Drop policy= kwarg from Backend.__init__ (no consumer in v1). - ADR-0009: Add **kwargs forward-compat door to Backend.run. - ADR-0007: PolicyRunner uses inspect.signature to detect whether the backend class accepts ingest_callback. Legacy backends with __init__(self) continue to construct zero-arg with no coordinated upgrade. 49 tests pass including 3 new parametrized introspection cases. Co-Authored-By: Claude Opus 4.7 (1M context) --- worker/tests/policy/test_runner.py | 43 ++++++++++++++++++++++++++++-- worker/tests/test_backend.py | 20 +++++++++----- worker/worker/backend.py | 29 +++++++++++--------- worker/worker/policy/runner.py | 24 ++++++++++++++++- 4 files changed, 95 insertions(+), 21 deletions(-) diff --git a/worker/tests/policy/test_runner.py b/worker/tests/policy/test_runner.py index 0c4cbf5f..774555cd 100644 --- a/worker/tests/policy/test_runner.py +++ b/worker/tests/policy/test_runner.py @@ -613,7 +613,7 @@ def test_setup_passes_kwargs_to_backend( mock_diode_client, mock_run_store, ): - """setup() constructs the backend with ingest_callback= and policy= kwargs.""" + """setup() constructs the backend with ingest_callback= kwarg (no policy= per ADR-0008).""" with patch.object(policy_runner.scheduler, "start"), patch.object( policy_runner.scheduler, "add_job" ): @@ -623,7 +623,46 @@ def test_setup_passes_kwargs_to_backend( call_kwargs = mock_backend_class.call_args.kwargs assert "ingest_callback" in call_kwargs assert callable(call_kwargs["ingest_callback"]) - assert call_kwargs["policy"] == sample_policy + assert "policy" not in call_kwargs + + +@pytest.mark.parametrize( + "init_signature, expects_ingest_callback", + [ + pytest.param( + "def __init__(self): self.ingest_callback = None", + False, + 
id="legacy-zero-arg", + ), + pytest.param( + "def __init__(self, **kwargs): self.ingest_callback = kwargs.get('ingest_callback')", + True, + id="kwargs-absorber", + ), + pytest.param( + "def __init__(self, *, ingest_callback=None, **kwargs): self.ingest_callback = ingest_callback", + True, + id="named-kwarg", + ), + ], +) +def test_construct_backend_introspection(init_signature, expects_ingest_callback): + """_construct_backend only passes ingest_callback when the class accepts it.""" + from worker.policy.runner import _construct_backend + + namespace: dict = {} + exec( # noqa: S102 — synthesizing tiny class fixture under test + "class _Stub:\n" + " " + init_signature + "\n", + namespace, + ) + stub_class = namespace["_Stub"] + sentinel = object() + instance = _construct_backend(stub_class, ingest_callback=sentinel) + if expects_ingest_callback: + assert instance.ingest_callback is sentinel + else: + assert instance.ingest_callback is None def test_ingest_callback_entities_happy_path( diff --git a/worker/tests/test_backend.py b/worker/tests/test_backend.py index 179fd602..827a2801 100644 --- a/worker/tests/test_backend.py +++ b/worker/tests/test_backend.py @@ -94,21 +94,29 @@ def test_backend_init_default_no_args(): """Zero-arg construction still works (back-compat with older worker).""" b = Backend() assert b.ingest_callback is None - assert b.policy is None -def test_backend_init_stores_kwargs(): - """ingest_callback and policy are stored on the instance.""" +def test_backend_init_stores_ingest_callback(): + """ingest_callback is stored on the instance.""" def cb(**_): return None - pol = MagicMock(spec=Policy) - b = Backend(ingest_callback=cb, policy=pol) + b = Backend(ingest_callback=cb) assert b.ingest_callback is cb - assert b.policy is pol def test_backend_init_absorbs_unknown_kwargs(): """Forward-compat: unknown kwargs don't raise.""" Backend(unknown_future_resource="x", another_one=42) # must not raise + + +def test_backend_run_accepts_kwargs(): + """run() 
signature absorbs **kwargs (passive forward-compat door).""" + b = Backend() + # The base implementation raises NotImplementedError; the point is + # that calling with kwargs reaches the body without a TypeError. + try: + b.run("policy", MagicMock(spec=Policy), future_kwarg="x") + except NotImplementedError: + pass diff --git a/worker/worker/backend.py b/worker/worker/backend.py index ccc01046..f6eec1fa 100644 --- a/worker/worker/backend.py +++ b/worker/worker/backend.py @@ -18,17 +18,15 @@ def __init__( self, *, ingest_callback=None, - policy: Policy | None = None, **kwargs, ) -> None: """ Construct the Backend. - Worker passes ``ingest_callback`` and ``policy`` at construction - starting with the minor release this docstring ships in. Older - worker versions construct ``Backend()`` with zero args; integrations - that override ``__init__`` should accept ``**kwargs`` so both paths - keep working. + Worker passes ``ingest_callback`` at construction starting with the + minor release this docstring ships in. Older worker versions + construct ``Backend()`` with zero args; integrations that override + ``__init__`` should accept ``**kwargs`` so both paths keep working. Args: ---- @@ -40,15 +38,11 @@ def __init__( earlier raises ``IngestUnavailable``.** See ``worker.exceptions`` for the exception hierarchy it may raise. - policy: Optional construction-time policy. If supplied, the - integration may use credentials / scope without waiting for - the first scheduled ``run()``. **kwargs: Forward-compat door for additional resources worker may pass in future versions; silently ignored by default. 
""" self.ingest_callback = ingest_callback - self.policy = policy def setup(self) -> Metadata: """ @@ -61,7 +55,12 @@ def setup(self) -> Metadata: """ raise NotImplementedError("The 'setup' method must be implemented.") - def run(self, policy_name: str, policy: Policy) -> Iterable[Entity]: + def run( + self, + policy_name: str, + policy: Policy, + **kwargs, + ) -> Iterable[Entity]: """ Run the backend. @@ -69,10 +68,16 @@ def run(self, policy_name: str, policy: Policy) -> Iterable[Entity]: ---- policy_name (str): The name of the policy. policy (Policy): The policy to run. + **kwargs: Passive forward-compat door. The worker passes nothing + through it in v1; future minor releases may add per-tick + context (e.g. ``source="scheduled"|"trigger"``, ``run_id``). + Concrete backends are encouraged to declare ``**kwargs`` so + additive kwargs ride into the contract without a coordinated + upgrade. Returns: ------- - Iterable[Entity]: The entities produced by the backend + Iterable[Entity]: The entities produced by the backend. """ raise NotImplementedError("The 'run' method must be implemented.") diff --git a/worker/worker/policy/runner.py b/worker/worker/policy/runner.py index df55dae3..7083c389 100644 --- a/worker/worker/policy/runner.py +++ b/worker/worker/policy/runner.py @@ -2,6 +2,7 @@ # Copyright 2025 NetBox Labs Inc """Orb Worker Policy Runner.""" +import inspect import logging import time from datetime import datetime, timedelta @@ -32,6 +33,27 @@ logger = logging.getLogger(__name__) +def _construct_backend(backend_class, *, ingest_callback): + """ + Construct a Backend, passing ``ingest_callback`` only when accepted. + + Uses ``inspect.signature`` to detect whether ``backend_class.__init__`` + has a matching named parameter or ``VAR_KEYWORD`` absorber. Legacy + backends with ``__init__(self)`` get constructed zero-arg and never + see the kwarg, so the worker can ship this contract bump without a + coordinated upgrade across every integration package. 
+ """ + sig = inspect.signature(backend_class) + params = sig.parameters + accepts_var_kw = any( + p.kind is inspect.Parameter.VAR_KEYWORD for p in params.values() + ) + init_kwargs: dict[str, object] = {} + if accepts_var_kw or "ingest_callback" in params: + init_kwargs["ingest_callback"] = ingest_callback + return backend_class(**init_kwargs) + + class PolicyRunner: """Policy Runner class.""" @@ -77,7 +99,7 @@ def setup( # `self._diode_client` lazily, so it is safe to construct before the # client is assigned below. ingest_callback = self._build_ingest_callback(self.name) - backend = backend_class(ingest_callback=ingest_callback, policy=policy) + backend = _construct_backend(backend_class, ingest_callback=ingest_callback) logger.debug(f"Backend class loaded successfully: {backend_class.__name__}") metadata = backend.setup() From dd482d06fdd8859189a6368b52273e30fd793d10 Mon Sep 17 00:00:00 2001 From: Lukasz Drozdz Date: Mon, 1 Jun 2026 10:14:56 +0200 Subject: [PATCH 06/12] refactor(worker): apply code-review fixes (direct construction, post-setup callback, shared _execute_run, IngestError mapping) - Delete _construct_backend signature introspection; construct backends directly (Backend.__init__ already accepts ingest_callback + **kwargs). - Build + attach ingest_callback AFTER deps are assigned; drop the _callback_ready flag, its early-call guard, and the stop() reset. - Extract _execute_run(client, produce_entities, *, source) shared by the scheduled run() and the ingest callback. The entity producer is invoked lazily INSIDE the run's try (after create_run), so a crashing eager backend is still recorded as a FAILED run rather than vanishing. - Map the callback's unexpected-exception catch-all to base IngestError (not the retry-friendly IngestUnavailable). IngestUnavailable is retained in worker.exceptions (published contract consumed downstream). 
Co-Authored-By: Claude Opus 4.8 --- worker/tests/policy/test_runner.py | 151 ++++------------------ worker/worker/backend.py | 3 +- worker/worker/policy/runner.py | 199 +++++++++++------------------ 3 files changed, 100 insertions(+), 253 deletions(-) diff --git a/worker/tests/policy/test_runner.py b/worker/tests/policy/test_runner.py index 774555cd..6af988dd 100644 --- a/worker/tests/policy/test_runner.py +++ b/worker/tests/policy/test_runner.py @@ -9,7 +9,7 @@ from netboxlabs.diode.sdk.diode.v1 import ingester_pb2 from worker.backend import Backend -from worker.exceptions import IngestRejected, IngestUnavailable +from worker.exceptions import IngestError, IngestRejected from worker.models import Config, DiodeConfig, Metadata, Policy, Status from worker.policy.run import RunStatus, RunStore from worker.policy.runner import PolicyRunner @@ -118,8 +118,8 @@ def mock_backend(): def _extract_callback(mock_backend_class): - """Recover the ingest_callback closure that PolicyRunner.setup passed to backend_class().""" - return mock_backend_class.call_args.kwargs["ingest_callback"] + """Recover the ingest_callback closure that PolicyRunner.setup attached to the backend.""" + return mock_backend_class.return_value.ingest_callback def test_initial_status(policy_runner): @@ -353,6 +353,16 @@ def test_run_backend_exception( mock_diode_client.ingest.assert_not_called() # Client ingestion should not be called assert "Policy test_policy: Backend error" in caplog.text + # Regression guard: a crashing scheduled backend must still be recorded as a + # FAILED run — the run is created before the backend executes. 
+ mock_run_store.create_run.assert_called_once() + failed_updates = [ + c + for c in mock_run_store.update_run.call_args_list + if c.kwargs.get("status") == RunStatus.FAILED + ] + assert failed_updates, "backend crash must record a FAILED run" + def test_stop_policy_runner(policy_runner): """Test stopping the PolicyRunner.""" @@ -551,7 +561,7 @@ def test_run_with_multiple_chunks( assert mock_diode_client.ingest.call_count == 2 # Verify log messages for successful ingestion - assert "Successfully ingested 10 entities in 2 chunks" in caplog.text + assert "Successfully ingested 10 entities" in caplog.text def test_run_chunk_ingestion_error( @@ -605,7 +615,7 @@ def test_run_chunk_ingestion_error( # --------------------------------------------------------------------------- -def test_setup_passes_kwargs_to_backend( +def test_setup_constructs_backend_directly_and_attaches_callback( policy_runner, sample_policy, sample_diode_config, @@ -613,56 +623,17 @@ def test_setup_passes_kwargs_to_backend( mock_diode_client, mock_run_store, ): - """setup() constructs the backend with ingest_callback= kwarg (no policy= per ADR-0008).""" + """setup() constructs the backend with no args and attaches ingest_callback after deps are ready.""" with patch.object(policy_runner.scheduler, "start"), patch.object( policy_runner.scheduler, "add_job" ): policy_runner.setup("policy1", sample_diode_config, sample_policy, mock_run_store) mock_backend_class = mock_load_class.return_value - call_kwargs = mock_backend_class.call_args.kwargs - assert "ingest_callback" in call_kwargs - assert callable(call_kwargs["ingest_callback"]) - assert "policy" not in call_kwargs - - -@pytest.mark.parametrize( - "init_signature, expects_ingest_callback", - [ - pytest.param( - "def __init__(self): self.ingest_callback = None", - False, - id="legacy-zero-arg", - ), - pytest.param( - "def __init__(self, **kwargs): self.ingest_callback = kwargs.get('ingest_callback')", - True, - id="kwargs-absorber", - ), - 
pytest.param( - "def __init__(self, *, ingest_callback=None, **kwargs): self.ingest_callback = ingest_callback", - True, - id="named-kwarg", - ), - ], -) -def test_construct_backend_introspection(init_signature, expects_ingest_callback): - """_construct_backend only passes ingest_callback when the class accepts it.""" - from worker.policy.runner import _construct_backend - - namespace: dict = {} - exec( # noqa: S102 — synthesizing tiny class fixture under test - "class _Stub:\n" - " " + init_signature + "\n", - namespace, - ) - stub_class = namespace["_Stub"] - sentinel = object() - instance = _construct_backend(stub_class, ingest_callback=sentinel) - if expects_ingest_callback: - assert instance.ingest_callback is sentinel - else: - assert instance.ingest_callback is None + assert mock_backend_class.call_args.args == () + assert mock_backend_class.call_args.kwargs == {} + backend = mock_backend_class.return_value + assert callable(backend.ingest_callback) def test_ingest_callback_entities_happy_path( @@ -760,7 +731,7 @@ def test_ingest_callback_requires_exactly_one_of_entities_or_error( callback(**kwargs) -def test_ingest_callback_translates_transport_errors_to_unavailable( +def test_ingest_callback_translates_transport_errors_to_ingest_error( policy_runner, sample_policy, sample_diode_config, @@ -768,7 +739,7 @@ def test_ingest_callback_translates_transport_errors_to_unavailable( mock_diode_client, mock_run_store, ): - """Non-IngestError transport exceptions are wrapped as IngestUnavailable.""" + """Non-IngestError transport exceptions are wrapped as the base IngestError.""" with patch.object(policy_runner.scheduler, "start"), patch.object( policy_runner.scheduler, "add_job" ): @@ -784,7 +755,7 @@ def test_ingest_callback_translates_transport_errors_to_unavailable( with patch("worker.policy.runner.estimate_message_size", return_value=1024), patch( "worker.policy.runner.apply_run_id_to_entities" ): - with pytest.raises(IngestUnavailable): + with 
pytest.raises(IngestError): callback(entities=[entity]) mock_run_store.update_run.assert_called_once() @@ -886,33 +857,6 @@ def test_run_unaffected_by_callback( assert update_kwargs["status"] == RunStatus.COMPLETED -def test_ingest_callback_raises_when_not_ready( - policy_runner, - sample_policy, - sample_diode_config, - mock_run_store, - mock_load_class, - mock_diode_client, -): - """Calling the callback before _callback_ready is True raises IngestUnavailable.""" - with patch.object(policy_runner.scheduler, "start"), patch.object( - policy_runner.scheduler, "add_job" - ): - policy_runner.setup("policy-x", sample_diode_config, sample_policy, mock_run_store) - - callback = _extract_callback(mock_load_class.return_value) - - # Simulate "called before worker finished constructing" — clear the readiness flag. - policy_runner._callback_ready = False - - entity = MagicMock() - with pytest.raises(IngestUnavailable, match="before the worker finished constructing"): - callback(entities=[entity]) - - # No pseudo-run must have been created (guard fires before create_run). 
- mock_run_store.create_run.assert_not_called() - - def test_ingest_callback_records_failure_on_apply_run_id_error( policy_runner, sample_policy, @@ -921,7 +865,7 @@ def test_ingest_callback_records_failure_on_apply_run_id_error( mock_diode_client, mock_run_store, ): - """apply_run_id_to_entities failure inside try: records FAILED run as IngestUnavailable.""" + """apply_run_id_to_entities failure inside try: records FAILED run as IngestError.""" with patch.object(policy_runner.scheduler, "start"), patch.object( policy_runner.scheduler, "add_job" ): @@ -936,7 +880,7 @@ def test_ingest_callback_records_failure_on_apply_run_id_error( "worker.policy.runner.apply_run_id_to_entities", side_effect=RuntimeError("entity corrupt"), ), patch("worker.policy.runner.estimate_message_size", return_value=1024): - with pytest.raises(IngestUnavailable): + with pytest.raises(IngestError): callback(entities=[entity]) mock_run_store.update_run.assert_called_once() @@ -965,7 +909,7 @@ def test_ingest_callback_records_failure_on_iterable_error( bad_iterable = MagicMock() bad_iterable.__iter__ = MagicMock(side_effect=ValueError("bad")) - with pytest.raises(IngestUnavailable): + with pytest.raises(IngestError): callback(entities=bad_iterable) mock_run_store.update_run.assert_called_once() @@ -973,46 +917,3 @@ def test_ingest_callback_records_failure_on_iterable_error( assert update_kwargs["status"] == RunStatus.FAILED # Iterable failed before any entity was materialised. 
assert update_kwargs["entity_count"] == 0 - - -def test_setup_sets_callback_ready_flag( - policy_runner, - sample_policy, - sample_diode_config, - mock_load_class, - mock_diode_client, - mock_run_store, -): - """After setup() returns, _callback_ready is True.""" - with patch.object(policy_runner.scheduler, "start"), patch.object( - policy_runner.scheduler, "add_job" - ): - policy_runner.setup("policy1", sample_diode_config, sample_policy, mock_run_store) - - assert policy_runner._callback_ready is True - - -def test_stop_clears_callback_ready_flag( - policy_runner, - sample_policy, - sample_diode_config, - mock_load_class, - mock_diode_client, - mock_run_store, -): - """After stop(), _callback_ready is False and the callback raises IngestUnavailable.""" - with patch.object(policy_runner.scheduler, "start"), patch.object( - policy_runner.scheduler, "add_job" - ): - policy_runner.setup("policy1", sample_diode_config, sample_policy, mock_run_store) - - callback = _extract_callback(mock_load_class.return_value) - - with patch.object(policy_runner.scheduler, "shutdown"): - policy_runner.stop() - - assert policy_runner._callback_ready is False - - entity = MagicMock() - with pytest.raises(IngestUnavailable, match="before the worker finished constructing"): - callback(entities=[entity]) diff --git a/worker/worker/backend.py b/worker/worker/backend.py index f6eec1fa..02a0ccfe 100644 --- a/worker/worker/backend.py +++ b/worker/worker/backend.py @@ -34,8 +34,7 @@ def __init__( reports errors outside of the ``run()`` cycle. **Do not invoke from ``__init__`` or ``setup()`` — the callback is only usable starting after the worker finishes constructing - the Backend (i.e. after ``setup()`` returns); calling it - earlier raises ``IngestUnavailable``.** See + the Backend (i.e. after ``setup()`` returns).** See ``worker.exceptions`` for the exception hierarchy it may raise. 
**kwargs: Forward-compat door for additional resources worker diff --git a/worker/worker/policy/runner.py b/worker/worker/policy/runner.py index 7083c389..fd591da0 100644 --- a/worker/worker/policy/runner.py +++ b/worker/worker/policy/runner.py @@ -2,7 +2,6 @@ # Copyright 2025 NetBox Labs Inc """Orb Worker Policy Runner.""" -import inspect import logging import time from datetime import datetime, timedelta @@ -20,7 +19,7 @@ from worker.backend import Backend, load_class from worker.entity_metadata import apply_run_id_to_entities -from worker.exceptions import IngestError, IngestRejected, IngestUnavailable +from worker.exceptions import IngestError, IngestRejected from worker.metrics import get_metric from worker.models import DiodeConfig, Policy, Status from worker.package_finder import maybe_evict @@ -33,27 +32,6 @@ logger = logging.getLogger(__name__) -def _construct_backend(backend_class, *, ingest_callback): - """ - Construct a Backend, passing ``ingest_callback`` only when accepted. - - Uses ``inspect.signature`` to detect whether ``backend_class.__init__`` - has a matching named parameter or ``VAR_KEYWORD`` absorber. Legacy - backends with ``__init__(self)`` get constructed zero-arg and never - see the kwarg, so the worker can ship this contract bump without a - coordinated upgrade across every integration package. 
- """ - sig = inspect.signature(backend_class) - params = sig.parameters - accepts_var_kw = any( - p.kind is inspect.Parameter.VAR_KEYWORD for p in params.values() - ) - init_kwargs: dict[str, object] = {} - if accepts_var_kw or "ingest_callback" in params: - init_kwargs["ingest_callback"] = ingest_callback - return backend_class(**init_kwargs) - - class PolicyRunner: """Policy Runner class.""" @@ -66,7 +44,6 @@ def __init__(self): self.scheduler = BackgroundScheduler() self.run_store = None self._diode_client = None - self._callback_ready = False def setup( self, name: str, diode_config: DiodeConfig, policy: Policy, run_store: RunStore @@ -95,11 +72,9 @@ def setup( logger.debug(f"Loading backend class: {policy.config.package}") backend_class = load_class(policy.config.package) - # Build the ingest callback closure. It captures `self` and reads - # `self._diode_client` lazily, so it is safe to construct before the - # client is assigned below. - ingest_callback = self._build_ingest_callback(self.name) - backend = _construct_backend(backend_class, ingest_callback=ingest_callback) + # Construct with no ingest_callback; it is attached below once every + # dependency the closure reads has been assigned. + backend = backend_class() logger.debug(f"Backend class loaded successfully: {backend_class.__name__}") metadata = backend.setup() @@ -137,6 +112,12 @@ def setup( self.run_store = run_store self._diode_client = client + # Every dependency the closure reads (run_store, metadata, _diode_client) + # is now assigned, so the callback is safe to build and attach. The + # consumer reads `backend.ingest_callback` lazily at trigger time, so + # post-construction assignment is correct. 
+ backend.ingest_callback = self._build_ingest_callback(self.name) + self.scheduler.start() if self.policy.config.schedule is not None: @@ -158,11 +139,6 @@ def setup( self.status = Status.RUNNING - # Callback is now safe to invoke — every dependency the closure reads - # (run_store, metadata, _diode_client) is attached. Integrations may push - # entities via ingest_callback from this point onward. - self._callback_ready = True - active_policies = get_metric("active_policies") if active_policies: active_policies.add(1, {"policy": self.name}) @@ -178,7 +154,7 @@ def _build_ingest_callback(self, policy_name: str): On the ``entities`` path: a pseudo-run is created in the RunStore, entities are chunked and ingested via the same path run() uses, and response/transport errors are translated into IngestRejected / - IngestUnavailable. On the ``error`` path: a failed pseudo-run is + IngestError. On the ``error`` path: a failed pseudo-run is recorded; no client.ingest call is made; returns None. """ @@ -193,71 +169,81 @@ def ingest_callback( raise TypeError( "ingest_callback requires exactly one of 'entities' or 'error'" ) - if not self._callback_ready: - raise IngestUnavailable( - "ingest_callback invoked before the worker finished constructing " - "this Backend (likely called from Backend.__init__ or " - "Backend.setup() — defer until after setup() returns)" - ) - run = self.run_store.create_run( - policy_name=policy_name, - metadata={ - "name": self.metadata.name, - "app_name": self.metadata.app_name, - "app_version": self.metadata.app_version, - "source": "ingest_callback", - }, - ) if error is not None: + run = self.run_store.create_run( + policy_name=self.name, + metadata={ + "name": self.metadata.name, + "app_name": self.metadata.app_name, + "app_version": self.metadata.app_version, + "source": "ingest_callback", + }, + ) self.run_store.update_run( - policy_name=policy_name, + policy_name=self.name, run_id=run.id, status=RunStatus.FAILED, error=error, entity_count=0, 
) return - entities_list: list = [] try: - entities_list = list(entities) - apply_run_id_to_entities(entities_list, run.id) - metadata = { - "policy_name": policy_name, - "worker_backend": self.metadata.name, - "run_id": run.id, - } - self._send_entities(self._diode_client, entities_list, metadata) - except IngestError as exc: - self.run_store.update_run( - policy_name=policy_name, - run_id=run.id, - status=RunStatus.FAILED, - error=exc, - entity_count=len(entities_list), + self._execute_run( + self._diode_client, lambda: entities, source="ingest_callback" ) + except IngestError: raise except Exception as exc: - logger.exception( - "Unexpected exception in ingest_callback; " - "translating to IngestUnavailable" - ) - self.run_store.update_run( - policy_name=policy_name, - run_id=run.id, - status=RunStatus.FAILED, - error=exc, - entity_count=len(entities_list), - ) - raise IngestUnavailable(str(exc)) from exc + raise IngestError(str(exc)) from exc + + return ingest_callback + + def _execute_run(self, client, produce_entities, *, source: str | None = None) -> int: + """ + Create a run, produce + ingest entities through it, record COMPLETED/FAILED. + + ``produce_entities`` is a zero-arg callable invoked INSIDE the run's + try-block — after ``create_run`` — so a failure while producing the + entities (e.g. the backend's ``run()`` raising) is still recorded as a + FAILED run rather than vanishing before the run is created. Re-raises on + failure. 
+ """ + run_metadata = { + "name": self.metadata.name, + "app_name": self.metadata.app_name, + "app_version": self.metadata.app_version, + } + if source is not None: + run_metadata["source"] = source + run = self.run_store.create_run(policy_name=self.name, metadata=run_metadata) + entity_count = 0 + try: + entities_list = list(produce_entities()) + entity_count = len(entities_list) + apply_run_id_to_entities(entities_list, run.id) + metadata = { + "policy_name": self.name, + "worker_backend": self.metadata.name, + "run_id": run.id, + } + self._send_entities(client, entities_list, metadata) self.run_store.update_run( - policy_name=policy_name, + policy_name=self.name, run_id=run.id, status=RunStatus.COMPLETED, error=None, - entity_count=len(entities_list), + entity_count=entity_count, ) - - return ingest_callback + return entity_count + except Exception as exc: + self.run_store.update_run( + policy_name=self.name, + run_id=run.id, + status=RunStatus.FAILED, + error=exc, + entity_count=entity_count, + ) + raise def _send_entities(self, client, entities_list: list, metadata: dict) -> int: """ @@ -298,45 +284,16 @@ def run( if policy_executions: policy_executions.add(1, {"policy": self.name}) - # CREATE RUN AT START with metadata from backend setup - run_metadata = { - "name": self.metadata.name, - "app_name": self.metadata.app_name, - "app_version": self.metadata.app_version, - } - run = self.run_store.create_run( - policy_name=self.name, - metadata=run_metadata, - ) - exec_start_time = time.perf_counter() - entity_count = 0 try: logger.debug(f"Policy {self.name}: Starting backend execution") - entities = list(backend.run(self.name, policy)) + entity_count = self._execute_run( + client, lambda: backend.run(self.name, policy), source=None + ) elapsed = time.perf_counter() - exec_start_time logger.debug(f"Policy {self.name}: Backend execution completed in {elapsed:.3f} seconds") - entity_count = len(entities) - - apply_run_id_to_entities(entities, run.id) - - metadata = 
{ - "policy_name": self.name, - "worker_backend": self.metadata.name, - "run_id": run.id, - } - chunk_num = self._send_entities(client, entities, metadata) logger.info( - f"Policy {self.name}: Successfully ingested {entity_count} entities in {chunk_num} chunks" - ) - - # UPDATE RUN ON SUCCESS - self.run_store.update_run( - policy_name=self.name, - run_id=run.id, - status=RunStatus.COMPLETED, - error=None, - entity_count=entity_count, + f"Policy {self.name}: Successfully ingested {entity_count} entities" ) run_success = get_metric("backend_execution_success") @@ -353,15 +310,6 @@ def run( except Exception as e: logger.error(f"Policy {self.name}: {e}") - # UPDATE RUN ON FAILURE - self.run_store.update_run( - policy_name=self.name, - run_id=run.id, - status=RunStatus.FAILED, - error=e, - entity_count=entity_count, - ) - run_failure = get_metric("backend_execution_failure") if run_failure: run_failure.add( @@ -389,7 +337,6 @@ def run( def stop(self): """Stop the policy runner.""" - self._callback_ready = False self.scheduler.shutdown(wait=False) self.status = Status.FINISHED active_policies = get_metric("active_policies") From 9c673f0109af40f4ab298f8f424ae43022e28755 Mon Sep 17 00:00:00 2001 From: Lukasz Drozdz Date: Mon, 1 Jun 2026 11:27:14 +0200 Subject: [PATCH 07/12] fix(worker): address Copilot review on #432 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - _build_ingest_callback: drop the unused policy_name parameter (it was always self.name; internals already use self.name), removing the attribution inconsistency the reviewer flagged. - backend.py: docstring now states the worker constructs with no args and assigns ingest_callback afterwards (matching PolicyRunner.setup). - exceptions.py: broaden the module docstring — the hierarchy is raised by worker ingestion in general (scheduled run() path + ingest callback), not just the callback. 
- test_runner.py: fix the multi-chunk test comment to match the scenario (both chunks sent; the second chunk's response carries errors). Co-Authored-By: Claude Opus 4.8 --- worker/tests/policy/test_runner.py | 3 ++- worker/worker/backend.py | 9 +++++---- worker/worker/exceptions.py | 7 ++++++- worker/worker/policy/runner.py | 4 ++-- 4 files changed, 15 insertions(+), 8 deletions(-) diff --git a/worker/tests/policy/test_runner.py b/worker/tests/policy/test_runner.py index 6af988dd..f3ec86be 100644 --- a/worker/tests/policy/test_runner.py +++ b/worker/tests/policy/test_runner.py @@ -603,7 +603,8 @@ def test_run_chunk_ingestion_error( with caplog.at_level("ERROR"): policy_runner.run(mock_diode_client, mock_backend, sample_policy) - # Should call ingest once and fail on first chunk error (it raises IngestRejected immediately) + # Both chunks are sent; the second chunk's response carries errors, raising + # IngestRejected — so ingest is called twice. assert mock_diode_client.ingest.call_count == 2 # Should log the error diff --git a/worker/worker/backend.py b/worker/worker/backend.py index 02a0ccfe..96b61c1f 100644 --- a/worker/worker/backend.py +++ b/worker/worker/backend.py @@ -23,10 +23,11 @@ def __init__( """ Construct the Backend. - Worker passes ``ingest_callback`` at construction starting with the - minor release this docstring ships in. Older worker versions - construct ``Backend()`` with zero args; integrations that override - ``__init__`` should accept ``**kwargs`` so both paths keep working. + The worker constructs the Backend with no arguments and assigns + ``ingest_callback`` afterwards, once its dependencies are ready — so + ``__init__`` does not receive it from the current worker. The + parameter and ``**kwargs`` stay accepted for forward-compatibility + and for integrations that pass them directly. 
Args: ---- diff --git a/worker/worker/exceptions.py b/worker/worker/exceptions.py index 71bb4eb9..0e6525ed 100644 --- a/worker/worker/exceptions.py +++ b/worker/worker/exceptions.py @@ -1,6 +1,11 @@ #!/usr/bin/env python # Copyright 2026 NetBox Labs Inc -"""Exception hierarchy raised by the worker ingest callback.""" +""" +Exception hierarchy raised by worker ingestion. + +Covers both the scheduled run() path (via PolicyRunner._send_entities) +and the ingest callback. +""" class IngestError(Exception): diff --git a/worker/worker/policy/runner.py b/worker/worker/policy/runner.py index fd591da0..a2b8079a 100644 --- a/worker/worker/policy/runner.py +++ b/worker/worker/policy/runner.py @@ -116,7 +116,7 @@ def setup( # is now assigned, so the callback is safe to build and attach. The # consumer reads `backend.ingest_callback` lazily at trigger time, so # post-construction assignment is correct. - backend.ingest_callback = self._build_ingest_callback(self.name) + backend.ingest_callback = self._build_ingest_callback() self.scheduler.start() @@ -143,7 +143,7 @@ def setup( if active_policies: active_policies.add(1, {"policy": self.name}) - def _build_ingest_callback(self, policy_name: str): + def _build_ingest_callback(self): """ Build a closure used to ingest entities outside the scheduled run() cycle. From 50d3493840918bb98863998732795817dfc39b80 Mon Sep 17 00:00:00 2001 From: Lukasz Drozdz Date: Tue, 2 Jun 2026 12:57:24 +0200 Subject: [PATCH 08/12] feat(worker): add Backend.describe() metadata accessor; construct backend once with ingest_callback MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Backends can expose their identity via a `describe()` classmethod, so the worker reads metadata (name/app_name/app_version) WITHOUT constructing an instance, builds the ingest callback, and constructs the backend once with the callback already in hand — removing the post-construction attach step. 
Backends that only implement the instance `setup()` are still supported: the worker falls back to a throwaway instance to read their metadata, then builds the real callback-bearing instance. Co-Authored-By: Claude Opus 4.8 --- worker/tests/nbl-custom/nbl_custom/impl.py | 5 ++ worker/tests/policy/test_runner.py | 53 +++++++++++++++++++--- worker/tests/test_backend.py | 8 ++++ worker/worker/backend.py | 13 ++++++ worker/worker/policy/runner.py | 31 +++++++++---- 5 files changed, 93 insertions(+), 17 deletions(-) diff --git a/worker/tests/nbl-custom/nbl_custom/impl.py b/worker/tests/nbl-custom/nbl_custom/impl.py index 5ff919c2..ccb6621b 100644 --- a/worker/tests/nbl-custom/nbl_custom/impl.py +++ b/worker/tests/nbl-custom/nbl_custom/impl.py @@ -40,6 +40,11 @@ class ScopeMap(BaseModel): class MockBackend(Backend): """Mock backend class.""" + @classmethod + def describe(cls) -> Metadata: + """Mock describe method (no-instance metadata accessor).""" + return Metadata(name="mock_custom", app_name="mock_app", app_version="1.0.0") + def setup(self) -> Metadata: """Mock setup method.""" return Metadata(name="mock_custom", app_name="mock_app", app_version="1.0.0") diff --git a/worker/tests/policy/test_runner.py b/worker/tests/policy/test_runner.py index f3ec86be..653858ef 100644 --- a/worker/tests/policy/test_runner.py +++ b/worker/tests/policy/test_runner.py @@ -78,6 +78,11 @@ def mock_load_class(): with patch("worker.policy.runner.load_class") as mock_load: mock_backend_class = MagicMock(spec=Backend) mock_backend_class.__name__ = "MockBackend" + # New path: the runner reads metadata via the class-level describe() + # before constructing the backend, so it must return real Metadata. 
+ mock_backend_class.describe.return_value = Metadata( + name="mock_backend", app_name="mock_app", app_version="1.0.0" + ) mock_load.return_value = mock_backend_class yield mock_load @@ -118,8 +123,8 @@ def mock_backend(): def _extract_callback(mock_backend_class): - """Recover the ingest_callback closure that PolicyRunner.setup attached to the backend.""" - return mock_backend_class.return_value.ingest_callback + """Recover the ingest_callback closure PolicyRunner.setup passed at construction.""" + return mock_backend_class.call_args.kwargs["ingest_callback"] def test_initial_status(policy_runner): @@ -616,7 +621,7 @@ def test_run_chunk_ingestion_error( # --------------------------------------------------------------------------- -def test_setup_constructs_backend_directly_and_attaches_callback( +def test_setup_reads_metadata_via_describe_and_constructs_once_with_callback( policy_runner, sample_policy, sample_diode_config, @@ -624,17 +629,51 @@ def test_setup_constructs_backend_directly_and_attaches_callback( mock_diode_client, mock_run_store, ): - """setup() constructs the backend with no args and attaches ingest_callback after deps are ready.""" + """setup() reads metadata via describe() then constructs the backend ONCE with ingest_callback.""" with patch.object(policy_runner.scheduler, "start"), patch.object( policy_runner.scheduler, "add_job" ): policy_runner.setup("policy1", sample_diode_config, sample_policy, mock_run_store) mock_backend_class = mock_load_class.return_value + # New path: metadata read off the class, not a constructed instance. + mock_backend_class.describe.assert_called_once_with() + mock_backend_class.setup.assert_not_called() + # Constructed exactly once, with the prebuilt ingest_callback passed in. 
+ mock_backend_class.assert_called_once() assert mock_backend_class.call_args.args == () - assert mock_backend_class.call_args.kwargs == {} - backend = mock_backend_class.return_value - assert callable(backend.ingest_callback) + assert set(mock_backend_class.call_args.kwargs) == {"ingest_callback"} + assert callable(mock_backend_class.call_args.kwargs["ingest_callback"]) + + +def test_setup_falls_back_to_setup_when_describe_not_implemented( + policy_runner, + sample_policy, + sample_diode_config, + mock_load_class, + mock_diode_client, + mock_run_store, +): + """Legacy backend (describe() raises) → metadata read via a throwaway setup() instance.""" + mock_backend_class = mock_load_class.return_value + # A bare MagicMock.describe() returns a Mock and would not raise, so we must + # force the fallback explicitly. + mock_backend_class.describe.side_effect = NotImplementedError + mock_backend_class.return_value.setup.return_value = Metadata( + name="legacy_backend", app_name="legacy_app", app_version="2.0.0" + ) + + with patch.object(policy_runner.scheduler, "start"), patch.object( + policy_runner.scheduler, "add_job" + ): + policy_runner.setup("policy1", sample_diode_config, sample_policy, mock_run_store) + + # Throwaway instance's setup() was used to read metadata; identity flows through. + mock_backend_class.return_value.setup.assert_called_once_with() + assert policy_runner.metadata.name == "legacy_backend" + # The real, callback-bearing instance is still constructed with the callback. 
+ assert set(mock_backend_class.call_args.kwargs) == {"ingest_callback"} + assert callable(mock_backend_class.call_args.kwargs["ingest_callback"]) def test_ingest_callback_entities_happy_path( diff --git a/worker/tests/test_backend.py b/worker/tests/test_backend.py index 827a2801..467a7045 100644 --- a/worker/tests/test_backend.py +++ b/worker/tests/test_backend.py @@ -17,6 +17,14 @@ def mock_import_module(): yield mock_import +def test_backend_describe_not_implemented(): + """Test that Backend.describe raises NotImplementedError.""" + with pytest.raises( + NotImplementedError, match="The 'describe' classmethod must be implemented." + ): + Backend.describe() + + def test_backend_setup_not_implemented(): """Test that Backend.setup raises NotImplementedError.""" backend = Backend() diff --git a/worker/worker/backend.py b/worker/worker/backend.py index 96b61c1f..4655b951 100644 --- a/worker/worker/backend.py +++ b/worker/worker/backend.py @@ -44,6 +44,19 @@ def __init__( """ self.ingest_callback = ingest_callback + @classmethod + def describe(cls) -> Metadata: + """ + Return the backend's metadata without constructing an instance. + + Preferred over setup(): lets the worker read the backend's identity + (name/app_name/app_version) before constructing it, so the ingest + callback can be built and passed at construction time. Integrations + that only implement the instance setup() are still supported — the + worker falls back to a throwaway instance to read their metadata. + """ + raise NotImplementedError("The 'describe' classmethod must be implemented.") + def setup(self) -> Metadata: """ Set up the backend. 
diff --git a/worker/worker/policy/runner.py b/worker/worker/policy/runner.py index a2b8079a..c65fc596 100644 --- a/worker/worker/policy/runner.py +++ b/worker/worker/policy/runner.py @@ -21,7 +21,7 @@ from worker.entity_metadata import apply_run_id_to_entities from worker.exceptions import IngestError, IngestRejected from worker.metrics import get_metric -from worker.models import DiodeConfig, Policy, Status +from worker.models import DiodeConfig, Metadata, Policy, Status from worker.package_finder import maybe_evict from worker.policy.run import RunStatus, RunStore @@ -71,13 +71,11 @@ def setup( # Debug logging for backend loading logger.debug(f"Loading backend class: {policy.config.package}") backend_class = load_class(policy.config.package) - - # Construct with no ingest_callback; it is attached below once every - # dependency the closure reads has been assigned. - backend = backend_class() logger.debug(f"Backend class loaded successfully: {backend_class.__name__}") - metadata = backend.setup() + # Read the backend's identity WITHOUT committing to an instance, so the + # ingest callback can be built and passed at construction time below. + metadata = self._backend_metadata(backend_class) app_name = ( f"{diode_config.prefix}/{metadata.app_name}" if diode_config.prefix @@ -113,10 +111,9 @@ def setup( self._diode_client = client # Every dependency the closure reads (run_store, metadata, _diode_client) - # is now assigned, so the callback is safe to build and attach. The - # consumer reads `backend.ingest_callback` lazily at trigger time, so - # post-construction assignment is correct. - backend.ingest_callback = self._build_ingest_callback() + # is now assigned, so the callback is built first and the backend is + # constructed ONCE with it — no post-construction attach step. 
+ backend = backend_class(ingest_callback=self._build_ingest_callback()) self.scheduler.start() @@ -143,6 +140,20 @@ def setup( if active_policies: active_policies.add(1, {"policy": self.name}) + def _backend_metadata(self, backend_class) -> Metadata: + """ + Read the backend's metadata without committing to an instance. + + Prefer the class-level describe(); fall back to a throwaway instance's + setup() for integrations that only implement the legacy instance method. + """ + try: + return backend_class.describe() + except NotImplementedError: + # Legacy backend: construct a throwaway just to read its metadata. The + # real, callback-bearing instance is constructed below. + return backend_class().setup() + def _build_ingest_callback(self): """ Build a closure used to ingest entities outside the scheduled run() cycle. From a23e70bcc1314efb3344b826f01a70c72802e448 Mon Sep 17 00:00:00 2001 From: Lukasz Drozdz Date: Wed, 3 Jun 2026 16:22:30 +0200 Subject: [PATCH 09/12] docs(worker): Backend.__init__ docstring matches describe()-first construction Co-Authored-By: Claude Opus 4.8 --- worker/worker/backend.py | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/worker/worker/backend.py b/worker/worker/backend.py index 4655b951..eaac6eb1 100644 --- a/worker/worker/backend.py +++ b/worker/worker/backend.py @@ -23,19 +23,17 @@ def __init__( """ Construct the Backend. - The worker constructs the Backend with no arguments and assigns - ``ingest_callback`` afterwards, once its dependencies are ready — so - ``__init__`` does not receive it from the current worker. The - parameter and ``**kwargs`` stay accepted for forward-compatibility - and for integrations that pass them directly. + The worker reads the backend's metadata first (via the ``describe()`` + classmethod, or a throwaway legacy ``setup()`` instance that receives + no callback), then constructs the instance it will run with + ``ingest_callback`` passed here. 
Every dependency the callback uses + is ready by that point, so the callback is usable as soon as the + instance exists — just do not invoke it from ``__init__`` itself. Args: ---- ingest_callback: Optional callable that ingests entities or - reports errors outside of the ``run()`` cycle. **Do not - invoke from ``__init__`` or ``setup()`` — the callback is - only usable starting after the worker finishes constructing - the Backend (i.e. after ``setup()`` returns).** See + reports errors outside of the ``run()`` cycle. See ``worker.exceptions`` for the exception hierarchy it may raise. **kwargs: Forward-compat door for additional resources worker From f8c3d4e54c9792011b84ae98aa5eb20d834fe59d Mon Sep 17 00:00:00 2001 From: Lukasz Drozdz Date: Wed, 3 Jun 2026 16:55:23 +0200 Subject: [PATCH 10/12] refactor(worker): delegate chunking to the SDK; drop MAX_INGEST_MESSAGE_BYTES _send_entities now always calls create_message_chunks, which owns the gRPC message-size threshold (max_chunk_size_mb=3.0, a safe margin below the 4 MB ceiling) and returns a single chunk when the payload already fits. This removes the worker's duplicate of that constant and the estimate_message_size pre-check. 
Co-Authored-By: Claude Opus 4.8 --- worker/tests/policy/test_runner.py | 43 +++++++++--------------------- worker/worker/policy/runner.py | 29 ++++++++------------ 2 files changed, 23 insertions(+), 49 deletions(-) diff --git a/worker/tests/policy/test_runner.py b/worker/tests/policy/test_runner.py index 653858ef..a67342e7 100644 --- a/worker/tests/policy/test_runner.py +++ b/worker/tests/policy/test_runner.py @@ -319,17 +319,14 @@ def test_run_ingestion_errors( # Simulate ingestion errors mock_diode_client.ingest.return_value.errors = ["error1", "error2"] - # Mock estimate_message_size to return small size (no chunking) - with patch("worker.policy.runner.estimate_message_size", return_value=1024 * 1024): - # Call the run method - with caplog.at_level("ERROR"): - policy_runner.run(mock_diode_client, mock_backend, sample_policy) + with caplog.at_level("ERROR"): + policy_runner.run(mock_diode_client, mock_backend, sample_policy) # Assertions mock_backend.run.assert_called_once_with(policy_runner.name, sample_policy) mock_diode_client.ingest.assert_called_once() assert ( - "Policy test_policy: Entities ingestion failed: ['error1', 'error2']" + "Policy test_policy: Chunk ingestion failed: ['error1', 'error2']" in caplog.text ) @@ -512,11 +509,8 @@ def test_run_with_small_entities_no_chunking( mock_backend.run.return_value = entities mock_diode_client.ingest.return_value.errors = [] - # Mock estimate_message_size to return small size (under 3.0 MB) - with patch( - "worker.policy.runner.estimate_message_size", return_value=1024 * 1024 - ): # 1MB - policy_runner.run(mock_diode_client, mock_backend, sample_policy) + # Small real entities fit in a single chunk, so the SDK returns one chunk. 
+ policy_runner.run(mock_diode_client, mock_backend, sample_policy) # Should call ingest once (no chunking) mock_diode_client.ingest.assert_called_once() @@ -548,10 +542,8 @@ def test_run_with_multiple_chunks( mock_backend.run.return_value = entities mock_diode_client.ingest.return_value.errors = [] - # Mock estimate_message_size to return large size (over 3.0 MB) and create_message_chunks + # Force the SDK to split into two chunks. with patch( - "worker.policy.runner.estimate_message_size", return_value=5 * 1024 * 1024 - ), patch( "worker.policy.runner.create_message_chunks", return_value=[entities[:5], entities[5:]], ) as mock_chunks: @@ -597,10 +589,8 @@ def test_run_chunk_ingestion_error( mock_diode_client.ingest.side_effect = responses - # Mock large size to trigger chunking and create_message_chunks + # Force two chunks; the second one fails. with patch( - "worker.policy.runner.estimate_message_size", return_value=5 * 1024 * 1024 - ), patch( "worker.policy.runner.create_message_chunks", return_value=[entities[:3], entities[3:]], ): @@ -700,9 +690,7 @@ def test_ingest_callback_entities_happy_path( entity2 = ingester_pb2.Entity() entity2.device.name = "dev2" - with patch("worker.policy.runner.apply_run_id_to_entities"), patch( - "worker.policy.runner.estimate_message_size", return_value=1024 - ), patch("worker.policy.runner.create_message_chunks"): + with patch("worker.policy.runner.apply_run_id_to_entities"): result = callback(entities=[entity1, entity2]) assert result is None @@ -792,9 +780,7 @@ def test_ingest_callback_translates_transport_errors_to_ingest_error( entity = ingester_pb2.Entity() entity.device.name = "dev1" - with patch("worker.policy.runner.estimate_message_size", return_value=1024), patch( - "worker.policy.runner.apply_run_id_to_entities" - ): + with patch("worker.policy.runner.apply_run_id_to_entities"): with pytest.raises(IngestError): callback(entities=[entity]) @@ -824,9 +810,7 @@ def 
test_ingest_callback_translates_response_errors_to_rejected( entity = ingester_pb2.Entity() entity.device.name = "dev1" - with patch("worker.policy.runner.estimate_message_size", return_value=1024), patch( - "worker.policy.runner.apply_run_id_to_entities" - ): + with patch("worker.policy.runner.apply_run_id_to_entities"): with pytest.raises(IngestRejected): callback(entities=[entity]) @@ -861,8 +845,6 @@ def test_ingest_callback_chunks_large_payloads( chunk_b = [entity2] with patch( - "worker.policy.runner.estimate_message_size", return_value=4 * 1024 * 1024 - ), patch( "worker.policy.runner.create_message_chunks", return_value=[chunk_a, chunk_b] ), patch( "worker.policy.runner.apply_run_id_to_entities" @@ -887,8 +869,7 @@ def test_run_unaffected_by_callback( mock_backend.run.return_value = [entity] mock_diode_client.ingest.return_value.errors = [] - with patch("worker.policy.runner.estimate_message_size", return_value=512): - policy_runner.run(mock_diode_client, mock_backend, sample_policy) + policy_runner.run(mock_diode_client, mock_backend, sample_policy) mock_backend.run.assert_called_once_with("test_policy", sample_policy) mock_diode_client.ingest.assert_called_once() @@ -919,7 +900,7 @@ def test_ingest_callback_records_failure_on_apply_run_id_error( with patch( "worker.policy.runner.apply_run_id_to_entities", side_effect=RuntimeError("entity corrupt"), - ), patch("worker.policy.runner.estimate_message_size", return_value=1024): + ): with pytest.raises(IngestError): callback(entities=[entity]) diff --git a/worker/worker/policy/runner.py b/worker/worker/policy/runner.py index c65fc596..d01688f6 100644 --- a/worker/worker/policy/runner.py +++ b/worker/worker/policy/runner.py @@ -14,7 +14,6 @@ DiodeDryRunClient, DiodeOTLPClient, create_message_chunks, - estimate_message_size, ) from worker.backend import Backend, load_class @@ -25,10 +24,6 @@ from worker.package_finder import maybe_evict from worker.policy.run import RunStatus, RunStore -# Diode message-size cap 
(per chunk). Stays in sync with the reconciler's -# 4 MiB gRPC ceiling minus a safety margin. -MAX_INGEST_MESSAGE_BYTES = 3 * 1024 * 1024 - logger = logging.getLogger(__name__) @@ -258,22 +253,20 @@ def _execute_run(self, client, produce_entities, *, source: str | None = None) - def _send_entities(self, client, entities_list: list, metadata: dict) -> int: """ - Send entities to the Diode client, chunking if the payload exceeds MAX_INGEST_MESSAGE_BYTES. + Send entities to the Diode client. + + Delegates chunking to the SDK's ``create_message_chunks``, which owns + the gRPC message-size threshold (3 MB default, a safe margin below the + 4 MB ceiling) and returns a single chunk when the payload already fits. Returns the number of chunks actually sent (1 if not chunked). """ - size_bytes = estimate_message_size(entities_list) - if size_bytes > MAX_INGEST_MESSAGE_BYTES: - chunks = create_message_chunks(entities_list) - for chunk in chunks: - response = client.ingest(entities=chunk, metadata=metadata) - if response.errors: - raise IngestRejected(f"Chunk ingestion failed: {response.errors}") - return len(chunks) - response = client.ingest(entities=entities_list, metadata=metadata) - if response.errors: - raise IngestRejected(f"Entities ingestion failed: {response.errors}") - return 1 + chunks = create_message_chunks(entities_list) + for chunk in chunks: + response = client.ingest(entities=chunk, metadata=metadata) + if response.errors: + raise IngestRejected(f"Chunk ingestion failed: {response.errors}") + return len(chunks) def run( self, From 0328934e0feff640af95752ca610671f5cee681d Mon Sep 17 00:00:00 2001 From: Lukasz Drozdz Date: Wed, 3 Jun 2026 20:23:32 +0200 Subject: [PATCH 11/12] feat(worker): formally deprecate Backend.setup() in favour of describe() MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - @deprecated (PEP 702, typing_extensions) on Backend.setup() so IDEs and type checkers flag uses; calling the base method also 
warns at runtime. - Backend.__init_subclass__ emits a DeprecationWarning at class-definition time when a subclass overrides setup() without implementing describe() — the signal integration authors see in their own test runs. - PolicyRunner._backend_metadata logs an operator-facing warning when the legacy throwaway-setup() fallback is used, naming the backend class. - typing-extensions declared as a runtime dependency. The fallback (and these warnings) are scheduled for removal in worker v2.0. Co-Authored-By: Claude Opus 4.8 --- worker/pyproject.toml | 1 + worker/tests/policy/test_runner.py | 5 ++++- worker/tests/test_backend.py | 32 +++++++++++++++++++++++++++++- worker/worker/backend.py | 28 ++++++++++++++++++++++++++ worker/worker/policy/runner.py | 6 ++++++ 5 files changed, 70 insertions(+), 2 deletions(-) diff --git a/worker/pyproject.toml b/worker/pyproject.toml index d1f68de8..b1b63829 100644 --- a/worker/pyproject.toml +++ b/worker/pyproject.toml @@ -34,6 +34,7 @@ dependencies = [ "pydantic~=2.9", "uvicorn~=0.32", "PyYAML~=6.0", + "typing-extensions~=4.5", "opentelemetry-api~=1.32", "opentelemetry-sdk~=1.32", "opentelemetry-exporter-otlp~=1.32", diff --git a/worker/tests/policy/test_runner.py b/worker/tests/policy/test_runner.py index a67342e7..6dee264c 100644 --- a/worker/tests/policy/test_runner.py +++ b/worker/tests/policy/test_runner.py @@ -643,6 +643,7 @@ def test_setup_falls_back_to_setup_when_describe_not_implemented( mock_load_class, mock_diode_client, mock_run_store, + caplog, ): """Legacy backend (describe() raises) → metadata read via a throwaway setup() instance.""" mock_backend_class = mock_load_class.return_value @@ -655,9 +656,11 @@ def test_setup_falls_back_to_setup_when_describe_not_implemented( with patch.object(policy_runner.scheduler, "start"), patch.object( policy_runner.scheduler, "add_job" - ): + ), caplog.at_level("WARNING"): policy_runner.setup("policy1", sample_diode_config, sample_policy, mock_run_store) + # The deprecation is 
surfaced to operators at the fallback site. + assert "deprecated setup() fallback" in caplog.text # Throwaway instance's setup() was used to read metadata; identity flows through. mock_backend_class.return_value.setup.assert_called_once_with() assert policy_runner.metadata.name == "legacy_backend" diff --git a/worker/tests/test_backend.py b/worker/tests/test_backend.py index 467a7045..8f0f6f27 100644 --- a/worker/tests/test_backend.py +++ b/worker/tests/test_backend.py @@ -2,12 +2,13 @@ # Copyright 2025 NetBox Labs Inc """NetBox Labs - Backend Unit Tests.""" +import warnings from unittest.mock import MagicMock, patch import pytest from worker.backend import Backend, load_class -from worker.models import Policy +from worker.models import Metadata, Policy @pytest.fixture @@ -128,3 +129,32 @@ def test_backend_run_accepts_kwargs(): b.run("policy", MagicMock(spec=Policy), future_kwarg="x") except NotImplementedError: pass + + +def test_subclass_overriding_setup_without_describe_warns_deprecated(): + """Defining a setup()-only subclass emits a DeprecationWarning at class creation.""" + with pytest.warns(DeprecationWarning, match="implement the describe"): + + class LegacySetupBackend(Backend): + def setup(self) -> Metadata: + return Metadata(name="legacy", app_name="legacy", app_version="0.0.0") + + +def test_subclass_with_describe_defines_without_warning(): + """A describe()-implementing subclass is clean, even if it also keeps setup().""" + with warnings.catch_warnings(): + warnings.simplefilter("error", DeprecationWarning) + + class ModernBackend(Backend): + @classmethod + def describe(cls) -> Metadata: + return Metadata(name="modern", app_name="modern", app_version="0.0.0") + + def setup(self) -> Metadata: + return self.describe() + + +def test_base_setup_call_emits_runtime_deprecation(): + """Calling the base setup() directly warns (PEP 702 runtime) and still raises.""" + with pytest.warns(DeprecationWarning), pytest.raises(NotImplementedError): + Backend().setup() 
diff --git a/worker/worker/backend.py b/worker/worker/backend.py index eaac6eb1..aee2128b 100644 --- a/worker/worker/backend.py +++ b/worker/worker/backend.py @@ -4,9 +4,11 @@ import importlib import inspect +import warnings from collections.abc import Iterable from netboxlabs.diode.sdk.ingester import Entity +from typing_extensions import deprecated from worker.models import Metadata, Policy @@ -42,6 +44,22 @@ def __init__( """ self.ingest_callback = ingest_callback + def __init_subclass__(cls, **kwargs) -> None: + """Warn once, at class-definition time, when a subclass still relies on setup().""" + super().__init_subclass__(**kwargs) + overrides_setup = any( + "setup" in klass.__dict__ for klass in cls.__mro__[:-1] if klass is not Backend + ) + has_describe = cls.describe.__func__ is not Backend.describe.__func__ + if overrides_setup and not has_describe: + warnings.warn( + f"{cls.__module__}.{cls.__qualname__} overrides Backend.setup(), which is " + "deprecated — implement the describe() classmethod instead " + "(the setup() fallback will be removed in worker v2.0).", + DeprecationWarning, + stacklevel=2, + ) + @classmethod def describe(cls) -> Metadata: """ @@ -55,10 +73,20 @@ def describe(cls) -> Metadata: """ raise NotImplementedError("The 'describe' classmethod must be implemented.") + @deprecated( + "Implement the describe() classmethod instead; " + "the setup() fallback will be removed in worker v2.0." + ) def setup(self) -> Metadata: """ Set up the backend. + .. deprecated:: + Implement the :meth:`describe` classmethod instead. The worker reads + metadata via ``describe()`` and only falls back to a throwaway + instance's ``setup()`` for legacy backends; that fallback is + scheduled for removal in worker v2.0. + Returns ------- Metadata: The metadata for the backend. 
diff --git a/worker/worker/policy/runner.py b/worker/worker/policy/runner.py index d01688f6..e3ff4cdf 100644 --- a/worker/worker/policy/runner.py +++ b/worker/worker/policy/runner.py @@ -147,6 +147,12 @@ def _backend_metadata(self, backend_class) -> Metadata: except NotImplementedError: # Legacy backend: construct a throwaway just to read its metadata. The # real, callback-bearing instance is constructed below. + logger.warning( + "%s does not implement describe(); reading metadata via the " + "deprecated setup() fallback (scheduled for removal in worker " + "v2.0) — implement the describe() classmethod.", + backend_class.__name__, + ) return backend_class().setup() def _build_ingest_callback(self): From 51a3d79909cc06a2528c9b6c970246b78bfee577 Mon Sep 17 00:00:00 2001 From: Lukasz Drozdz Date: Mon, 8 Jun 2026 09:43:55 +0200 Subject: [PATCH 12/12] chore(worker): raise source version placeholder so editable installs resolve MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Source `project.version` was `1.0.0`. CI overwrites it at release time (worker-release.yaml runs `toml set --toml-path pyproject.toml project.version` and stamps `worker/version.py` separately), so the published wheel is unaffected. The placeholder only matters for **editable / local installs from source** — e.g. controller-integrations' `e2e_tests --use orb-agent --local-orb-worker`, which `pip install -e`s this checkout. pip reads `project.version` from source (`1.0.0`) and resolves it against backend-util's pin `netboxlabs-orb-worker>=1.3.0`; `1.0.0 < 1.3.0` triggers `ResolutionImpossible` and the install fails. Bumping the placeholder to `1.99.0.dev0` satisfies any reasonable `>=X.Y` floor (PEP 440 ordering: `1.99.0.dev0 >= 1.3.0` is true) while remaining an obvious not-a-real-release marker. The build still stamps the true release version on top of it, so nothing downstream of a real build changes. 
--- worker/pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/worker/pyproject.toml b/worker/pyproject.toml index b1b63829..870a2be4 100644 --- a/worker/pyproject.toml +++ b/worker/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "netboxlabs-orb-worker" -version = "1.0.0" # Overwritten during the build process +version = "1.99.0.dev0" # Placeholder, overwritten by CI before build (worker-release.yaml: `toml set project.version`). Kept above all sibling pins so editable / pip-install-from-source flows resolve. description = "NetBox Labs, Worker backend for Orb Agent" readme = "README.md" requires-python = ">=3.10"