From 6176df770890d1414d621ba921c0e4acdccdc595 Mon Sep 17 00:00:00 2001 From: Morgan Baron Date: Thu, 21 May 2026 14:55:50 -0400 Subject: [PATCH 1/4] feat(reconciler): add WithAutoApplyPolicy + ApplyChangeSetForLog for per-record auto-apply (OBS-2988) --- .../reconciler/ingestion_log_processor.go | 45 ++++- .../ingestion_log_processor_autoapply_test.go | 174 ++++++++++++++++++ .../reconciler/ingestion_processor.go | 4 + .../reconciler/mocks/ingestionprocessorops.go | 51 +++++ diode-server/reconciler/ops.go | 22 +++ 5 files changed, 294 insertions(+), 2 deletions(-) create mode 100644 diode-server/reconciler/ingestion_log_processor_autoapply_test.go diff --git a/diode-server/reconciler/ingestion_log_processor.go b/diode-server/reconciler/ingestion_log_processor.go index 806be37b..062358df 100644 --- a/diode-server/reconciler/ingestion_log_processor.go +++ b/diode-server/reconciler/ingestion_log_processor.go @@ -8,10 +8,33 @@ import ( "go.opentelemetry.io/otel/attribute" + "github.com/netboxlabs/diode/diode-server/gen/diode/v1/reconcilerpb" + "github.com/netboxlabs/diode/diode-server/reconciler/changeset" "github.com/netboxlabs/diode/diode-server/reconciler/ops" "github.com/netboxlabs/diode/diode-server/telemetry" ) +// AutoApplyPolicy decides per-record whether an already-planned change set +// should bypass the OPEN review queue and be applied immediately. Returns +// true to apply, false to leave the row in OPEN for manual review. +type AutoApplyPolicy func(log *reconcilerpb.IngestionLog, cs *changeset.ChangeSet) bool + +// IngestionLogProcessorOption is a functional option for IngestionLogProcessor. +type IngestionLogProcessorOption func(*IngestionLogProcessor) + +// WithAutoApplyPolicy attaches a per-record auto-apply policy. When set, +// matching records are applied immediately rather than left in OPEN. +// +// Layers on top of the cfg.AutoApplyChangesets default: it only widens +// auto-apply for matching records, never restricts. The boot-time decision +// between IngestionLogProcessor (plan-only) and AutoApplyProcessor (plan + +// apply) is unchanged. +func WithAutoApplyPolicy(policy AutoApplyPolicy) IngestionLogProcessorOption { + return func(p *IngestionLogProcessor) { + p.autoApplyPolicy = policy + } +} + const ( defaultIngestionLogPollInterval = 100 * time.Millisecond defaultIngestionLogIdleInterval = time.Second @@ -55,6 +78,8 @@ type IngestionLogProcessor struct { metrics Metrics backpressure BackpressureFunc + autoApplyPolicy AutoApplyPolicy + // mx protects the lifecycle fields below: workCancel, pollCancel, done. // Set by Start, read by Stop/shutdown/watchParent. mx sync.Mutex @@ -66,12 +91,12 @@ type IngestionLogProcessor struct { } // NewIngestionLogProcessor creates a new ingestion log processor. -func NewIngestionLogProcessor(logger *slog.Logger, cfg Config, repo Repository, ops IngestionProcessorOps, metrics Metrics, backpressure BackpressureFunc) *IngestionLogProcessor { +func NewIngestionLogProcessor(logger *slog.Logger, cfg Config, repo Repository, ops IngestionProcessorOps, metrics Metrics, backpressure BackpressureFunc, opts ...IngestionLogProcessorOption) *IngestionLogProcessor { batchSize := cfg.IngestionLogProcessorBatchSize if batchSize <= 0 { batchSize = defaultIngestionLogBatchSize } - return &IngestionLogProcessor{ + p := &IngestionLogProcessor{ config: cfg, logger: logger, ops: ops, @@ -80,6 +105,10 @@ func NewIngestionLogProcessor(logger *slog.Logger, cfg Config, repo Repository, backpressure: backpressure, batchSize: batchSize, } + for _, opt := range opts { + opt(p) + } + return p } // Name returns the name of the component. @@ -258,5 +287,17 @@ func (p *IngestionLogProcessor) processBatch(ctx context.Context, batch []ops.Qu if result.ChangeSet != nil { p.metrics.RecordChangeSetCreate(metricsCtx, true, int64(len(result.ChangeSet.Changes))) } + + if p.autoApplyPolicy != nil && result.ChangeSet != nil && + p.autoApplyPolicy(item.IngestionLog, result.ChangeSet) { + changes := int64(len(result.ChangeSet.Changes)) + if err := p.ops.ApplyChangeSetForLog(ctx, item, result.ChangeSet, branchID); err != nil { + p.logger.Error("auto-apply policy matched but apply failed", + "error", err, "ingestionLogID", item.ID) + p.metrics.RecordChangeSetApply(metricsCtx, false, 0) + } else { + p.metrics.RecordChangeSetApply(metricsCtx, true, changes) + } + } } } diff --git a/diode-server/reconciler/ingestion_log_processor_autoapply_test.go b/diode-server/reconciler/ingestion_log_processor_autoapply_test.go new file mode 100644 index 00000000..10d0ea63 --- /dev/null +++ b/diode-server/reconciler/ingestion_log_processor_autoapply_test.go @@ -0,0 +1,174 @@ +package reconciler_test + +import ( + "context" + "log/slog" + "os" + "testing" + "time" + + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + + "github.com/netboxlabs/diode/diode-server/gen/diode/v1/diodepb" + "github.com/netboxlabs/diode/diode-server/gen/diode/v1/reconcilerpb" + "github.com/netboxlabs/diode/diode-server/reconciler" + "github.com/netboxlabs/diode/diode-server/reconciler/changeset" + "github.com/netboxlabs/diode/diode-server/reconciler/mocks" + "github.com/netboxlabs/diode/diode-server/reconciler/ops" +) + +func newAutoApplyTestLog() *reconcilerpb.IngestionLog { + return &reconcilerpb.IngestionLog{ + DataType: "extras.customfield", + ObjectType: "extras.customfield", + State: reconcilerpb.State_QUEUED, + Entity: &diodepb.Entity{}, + SdkName: "test-sdk", + ProducerAppName: "orb-pro-credentials-bootstrap", + } +} + +func newAutoApplyTestLogger() *slog.Logger { + return slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug})) +} + +// TestIngestionLogProcessor_AutoApplyPolicyMatch_AppliesItem verifies that +// when WithAutoApplyPolicy is set and the policy returns true for a record, +// the processor calls ApplyChangeSetForLog and records a successful apply +// metric — bypassing the OPEN review queue. +func TestIngestionLogProcessor_AutoApplyPolicyMatch_AppliesItem(t *testing.T) { + repo := mocks.NewRepository(t) + mockOps := mocks.NewIngestionProcessorOps(t) + mockMetrics := mocks.NewMetrics(t) + + log := newAutoApplyTestLog() + batch := []ops.QueuedIngestionLog{{ID: 1, IngestionLog: log}} + cs := &changeset.ChangeSet{ + Changes: []changeset.Change{{ChangeType: changeset.ChangeTypeCreate, ObjectType: "extras.customfield"}}, + } + + claimed := make(chan struct{}, 1) + repo.On("ClaimQueuedIngestionLogs", mock.Anything, int32(100)). + Return(batch, nil).Once().Run(func(_ mock.Arguments) { + select { + case claimed <- struct{}{}: + default: + } + }) + repo.On("ClaimQueuedIngestionLogs", mock.Anything, int32(100)). + Return([]ops.QueuedIngestionLog{}, nil).Maybe() + + mockOps.On("DefaultBranch", mock.Anything).Return(nil, nil).Maybe() + mockOps.On("BulkPlan", mock.Anything, batch, "").Return([]ops.BulkGenerateChangeSetResult{ + {IngestionLogID: 1, ChangeSetID: int32Ptr(10), ChangeSet: cs}, + }).Once() + mockOps.On("ApplyChangeSetForLog", mock.Anything, batch[0], cs, "").Return(nil).Once() + + mockMetrics.On("RecordChangeSetCreate", mock.Anything, true, int64(1)).Once() + mockMetrics.On("RecordChangeSetApply", mock.Anything, true, int64(1)).Once() + + policy := reconciler.AutoApplyPolicy(func(_ *reconcilerpb.IngestionLog, _ *changeset.ChangeSet) bool { + return true + }) + + p := reconciler.NewIngestionLogProcessor( + newAutoApplyTestLogger(), + reconciler.Config{}, + repo, + mockOps, + mockMetrics, + nil, + reconciler.WithAutoApplyPolicy(policy), + ) + + ctx, cancel := context.WithCancel(context.Background()) + done := make(chan error, 1) + go func() { done <- p.Start(ctx) }() + + select { + case <-claimed: + case <-time.After(5 * time.Second): + t.Fatal("batch was never claimed") + } + + time.Sleep(200 * time.Millisecond) + cancel() + + select { + case err := <-done: + require.NoError(t, err) + case <-time.After(5 * time.Second): + t.Fatal("processor did not exit") + } +} + +// TestIngestionLogProcessor_AutoApplyPolicyNoMatch_DoesNotApply verifies that +// when the policy returns false for a record, the processor does NOT call +// ApplyChangeSetForLog — leaving the row in OPEN for review, as it would +// without any policy. +func TestIngestionLogProcessor_AutoApplyPolicyNoMatch_DoesNotApply(t *testing.T) { + repo := mocks.NewRepository(t) + mockOps := mocks.NewIngestionProcessorOps(t) + mockMetrics := mocks.NewMetrics(t) + + log := newAutoApplyTestLog() + batch := []ops.QueuedIngestionLog{{ID: 1, IngestionLog: log}} + cs := &changeset.ChangeSet{ + Changes: []changeset.Change{{ChangeType: changeset.ChangeTypeUpdate, ObjectType: "extras.customfield"}}, + } + + claimed := make(chan struct{}, 1) + repo.On("ClaimQueuedIngestionLogs", mock.Anything, int32(100)). + Return(batch, nil).Once().Run(func(_ mock.Arguments) { + select { + case claimed <- struct{}{}: + default: + } + }) + repo.On("ClaimQueuedIngestionLogs", mock.Anything, int32(100)). + Return([]ops.QueuedIngestionLog{}, nil).Maybe() + + mockOps.On("DefaultBranch", mock.Anything).Return(nil, nil).Maybe() + mockOps.On("BulkPlan", mock.Anything, batch, "").Return([]ops.BulkGenerateChangeSetResult{ + {IngestionLogID: 1, ChangeSetID: int32Ptr(10), ChangeSet: cs}, + }).Once() + + mockMetrics.On("RecordChangeSetCreate", mock.Anything, true, int64(1)).Once() + + policy := reconciler.AutoApplyPolicy(func(_ *reconcilerpb.IngestionLog, _ *changeset.ChangeSet) bool { + return false + }) + + p := reconciler.NewIngestionLogProcessor( + newAutoApplyTestLogger(), + reconciler.Config{}, + repo, + mockOps, + mockMetrics, + nil, + reconciler.WithAutoApplyPolicy(policy), + ) + + ctx, cancel := context.WithCancel(context.Background()) + done := make(chan error, 1) + go func() { done <- p.Start(ctx) }() + + select { + case <-claimed: + case <-time.After(5 * time.Second): + t.Fatal("batch was never claimed") + } + + time.Sleep(200 * time.Millisecond) + cancel() + + select { + case err := <-done: + require.NoError(t, err) + case <-time.After(5 * time.Second): + t.Fatal("processor did not exit") + } + + mockOps.AssertNotCalled(t, "ApplyChangeSetForLog") +} diff --git a/diode-server/reconciler/ingestion_processor.go b/diode-server/reconciler/ingestion_processor.go index d3df36a2..f1acdb5c 100644 --- a/diode-server/reconciler/ingestion_processor.go +++ b/diode-server/reconciler/ingestion_processor.go @@ -26,6 +26,7 @@ import ( "github.com/netboxlabs/diode/diode-server/gen/netbox" "github.com/netboxlabs/diode/diode-server/graph" "github.com/netboxlabs/diode/diode-server/netboxdiodeplugin" + "github.com/netboxlabs/diode/diode-server/reconciler/changeset" "github.com/netboxlabs/diode/diode-server/reconciler/ops" "github.com/netboxlabs/diode/diode-server/sentry" "github.com/netboxlabs/diode/diode-server/telemetry" @@ -81,6 +82,9 @@ type IngestionProcessorOps interface { BulkCreateIngestionLogs(ctx context.Context, ingestionLogs []*reconcilerpb.IngestionLog, sourceMetadata [][]byte, entityHashes []string) ([]*ops.CreateIngestionLogResult, error) BulkPlan(ctx context.Context, items []ops.QueuedIngestionLog, branchID string) []ops.BulkGenerateChangeSetResult BulkPlanApply(ctx context.Context, items []ops.QueuedIngestionLog, branchID string) []ops.BulkPlanApplyResult + // ApplyChangeSetForLog applies an already-planned change set for a single + // ingestion log, transitioning it through APPLYING -> APPLIED. + ApplyChangeSetForLog(ctx context.Context, item ops.QueuedIngestionLog, cs *changeset.ChangeSet, branchID string) error DefaultBranch(ctx context.Context) (*netboxdiodeplugin.Branch, error) RefreshDefaultBranch(ctx context.Context) (*netboxdiodeplugin.Branch, error) } diff --git a/diode-server/reconciler/mocks/ingestionprocessorops.go b/diode-server/reconciler/mocks/ingestionprocessorops.go index 9b1b613a..8fa5fbff 100644 --- a/diode-server/reconciler/mocks/ingestionprocessorops.go +++ b/diode-server/reconciler/mocks/ingestionprocessorops.go @@ -10,6 +10,8 @@ import ( ops "github.com/netboxlabs/diode/diode-server/reconciler/ops" + changeset "github.com/netboxlabs/diode/diode-server/reconciler/changeset" + reconcilerpb "github.com/netboxlabs/diode/diode-server/gen/diode/v1/reconcilerpb" ) @@ -363,6 +365,55 @@ func (_c *IngestionProcessorOps_RefreshDefaultBranch_Call) RunAndReturn(run func return _c } +// ApplyChangeSetForLog provides a mock function with given fields: ctx, item, cs, branchID +func (_m *IngestionProcessorOps) ApplyChangeSetForLog(ctx context.Context, item ops.QueuedIngestionLog, cs *changeset.ChangeSet, branchID string) error { + ret := _m.Called(ctx, item, cs, branchID) + + if len(ret) == 0 { + panic("no return value specified for ApplyChangeSetForLog") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, ops.QueuedIngestionLog, *changeset.ChangeSet, string) error); ok { + r0 = rf(ctx, item, cs, branchID) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// IngestionProcessorOps_ApplyChangeSetForLog_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ApplyChangeSetForLog' +type IngestionProcessorOps_ApplyChangeSetForLog_Call struct { + *mock.Call +} + +// ApplyChangeSetForLog is a helper method to define mock.On call +// - ctx context.Context +// - item ops.QueuedIngestionLog +// - cs *changeset.ChangeSet +// - branchID string +func (_e *IngestionProcessorOps_Expecter) ApplyChangeSetForLog(ctx interface{}, item interface{}, cs interface{}, branchID interface{}) *IngestionProcessorOps_ApplyChangeSetForLog_Call { + return &IngestionProcessorOps_ApplyChangeSetForLog_Call{Call: _e.mock.On("ApplyChangeSetForLog", ctx, item, cs, branchID)} +} + +func (_c *IngestionProcessorOps_ApplyChangeSetForLog_Call) Run(run func(ctx context.Context, item ops.QueuedIngestionLog, cs *changeset.ChangeSet, branchID string)) *IngestionProcessorOps_ApplyChangeSetForLog_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(ops.QueuedIngestionLog), args[2].(*changeset.ChangeSet), args[3].(string)) + }) + return _c +} + +func (_c *IngestionProcessorOps_ApplyChangeSetForLog_Call) Return(_a0 error) *IngestionProcessorOps_ApplyChangeSetForLog_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *IngestionProcessorOps_ApplyChangeSetForLog_Call) RunAndReturn(run func(context.Context, ops.QueuedIngestionLog, *changeset.ChangeSet, string) error) *IngestionProcessorOps_ApplyChangeSetForLog_Call { + _c.Call.Return(run) + return _c +} + // NewIngestionProcessorOps creates a new instance of IngestionProcessorOps. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. // The first argument is typically a *testing.T value. func NewIngestionProcessorOps(t interface { diff --git a/diode-server/reconciler/ops.go b/diode-server/reconciler/ops.go index 0f9c167a..d1faa4ae 100644 --- a/diode-server/reconciler/ops.go +++ b/diode-server/reconciler/ops.go @@ -593,6 +593,28 @@ func (o *Ops) BulkPlanApply(ctx context.Context, items []ops.QueuedIngestionLog, return results } +// ApplyChangeSetForLog applies a single already-planned ingestion log, +// transitioning it OPEN -> APPLIED. +// +// Delegates to BulkPlanApply with a single-item batch, which re-plans before +// applying — one wasted HTTP round-trip per matched record. Acceptable for +// current volume; a dedicated apply-only NetBox endpoint can replace the +// re-plan later. The cs parameter is ignored as a result. +func (o *Ops) ApplyChangeSetForLog(ctx context.Context, item ops.QueuedIngestionLog, _ *changeset.ChangeSet, branchID string) error { + results := o.BulkPlanApply(ctx, []ops.QueuedIngestionLog{item}, branchID) + if len(results) == 0 { + return fmt.Errorf("no result returned for ingestion log %d", item.ID) + } + r := results[0] + if r.PlanErr != nil { + return r.PlanErr + } + if r.ApplyErr != nil { + return r.ApplyErr + } + return nil +} + // persistPlanApplyFailurePlaceholder records a plan-phase failure as a // failure-placeholder change_set + ingestion log state=FAILED with the error // detail. Mirrors handleGenerateChangeSetFailure in the plan-only flow so From 54b49cabddb715a654cf3ea3a4b214b375acca58 Mon Sep 17 00:00:00 2001 From: Morgan Baron Date: Wed, 3 Jun 2026 12:07:38 -0400 Subject: [PATCH 2/4] address review feedback on PR #539 --- .../reconciler/ingestion_log_processor.go | 21 ++++++-- .../ingestion_log_processor_autoapply_test.go | 13 ++--- .../reconciler/ingestion_processor.go | 4 -- .../reconciler/mocks/ingestionprocessorops.go | 51 ------------------- diode-server/reconciler/ops.go | 22 -------- 5 files changed, 23 insertions(+), 88 deletions(-) diff --git a/diode-server/reconciler/ingestion_log_processor.go b/diode-server/reconciler/ingestion_log_processor.go index 062358df..3105f6d9 100644 --- a/diode-server/reconciler/ingestion_log_processor.go +++ b/diode-server/reconciler/ingestion_log_processor.go @@ -290,13 +290,24 @@ func (p *IngestionLogProcessor) processBatch(ctx context.Context, batch []ops.Qu if p.autoApplyPolicy != nil && result.ChangeSet != nil && p.autoApplyPolicy(item.IngestionLog, result.ChangeSet) { - changes := int64(len(result.ChangeSet.Changes)) - if err := p.ops.ApplyChangeSetForLog(ctx, item, result.ChangeSet, branchID); err != nil { + applyResults := p.ops.BulkPlanApply(ctx, []ops.QueuedIngestionLog{item}, branchID) + if len(applyResults) == 0 { + p.logger.Error("auto-apply policy matched but no result returned", "ingestionLogID", item.ID) + p.metrics.RecordChangeSetApply(metricsCtx, false, 0) + continue + } + applyResult := applyResults[0] + switch { + case applyResult.PlanErr != nil: + p.logger.Error("auto-apply policy matched but re-plan failed", + "error", applyResult.PlanErr, "ingestionLogID", item.ID) + p.metrics.RecordChangeSetApply(metricsCtx, false, 0) + case applyResult.ApplyErr != nil: p.logger.Error("auto-apply policy matched but apply failed", - "error", err, "ingestionLogID", item.ID) + "error", applyResult.ApplyErr, "ingestionLogID", item.ID) p.metrics.RecordChangeSetApply(metricsCtx, false, 0) - } else { - p.metrics.RecordChangeSetApply(metricsCtx, true, changes) + case applyResult.ChangeSet != nil && len(applyResult.ChangeSet.Changes) > 0: + p.metrics.RecordChangeSetApply(metricsCtx, true, int64(len(applyResult.ChangeSet.Changes))) } } } diff --git a/diode-server/reconciler/ingestion_log_processor_autoapply_test.go b/diode-server/reconciler/ingestion_log_processor_autoapply_test.go index 10d0ea63..44c9fb1f 100644 --- a/diode-server/reconciler/ingestion_log_processor_autoapply_test.go +++ b/diode-server/reconciler/ingestion_log_processor_autoapply_test.go @@ -35,8 +35,8 @@ func newAutoApplyTestLogger() *slog.Logger { // TestIngestionLogProcessor_AutoApplyPolicyMatch_AppliesItem verifies that // when WithAutoApplyPolicy is set and the policy returns true for a record, -// the processor calls ApplyChangeSetForLog and records a successful apply -// metric — bypassing the OPEN review queue. +// the processor calls BulkPlanApply for that item and records a successful +// apply metric — bypassing the OPEN review queue. func TestIngestionLogProcessor_AutoApplyPolicyMatch_AppliesItem(t *testing.T) { repo := mocks.NewRepository(t) mockOps := mocks.NewIngestionProcessorOps(t) @@ -63,7 +63,8 @@ func TestIngestionLogProcessor_AutoApplyPolicyMatch_AppliesItem(t *testing.T) { mockOps.On("BulkPlan", mock.Anything, batch, "").Return([]ops.BulkGenerateChangeSetResult{ {IngestionLogID: 1, ChangeSetID: int32Ptr(10), ChangeSet: cs}, }).Once() - mockOps.On("ApplyChangeSetForLog", mock.Anything, batch[0], cs, "").Return(nil).Once() + mockOps.On("BulkPlanApply", mock.Anything, []ops.QueuedIngestionLog{batch[0]}, ""). + Return([]ops.BulkPlanApplyResult{{IngestionLogID: 1, ChangeSet: cs}}).Once() mockMetrics.On("RecordChangeSetCreate", mock.Anything, true, int64(1)).Once() mockMetrics.On("RecordChangeSetApply", mock.Anything, true, int64(1)).Once() @@ -105,8 +106,8 @@ func TestIngestionLogProcessor_AutoApplyPolicyMatch_AppliesItem(t *testing.T) { // TestIngestionLogProcessor_AutoApplyPolicyNoMatch_DoesNotApply verifies that // when the policy returns false for a record, the processor does NOT call -// ApplyChangeSetForLog — leaving the row in OPEN for review, as it would -// without any policy. +// BulkPlanApply — leaving the row in OPEN for review, as it would without +// any policy. func TestIngestionLogProcessor_AutoApplyPolicyNoMatch_DoesNotApply(t *testing.T) { repo := mocks.NewRepository(t) mockOps := mocks.NewIngestionProcessorOps(t) @@ -170,5 +171,5 @@ func TestIngestionLogProcessor_AutoApplyPolicyNoMatch_DoesNotApply(t *testing.T) t.Fatal("processor did not exit") } - mockOps.AssertNotCalled(t, "ApplyChangeSetForLog") + mockOps.AssertNotCalled(t, "BulkPlanApply") } diff --git a/diode-server/reconciler/ingestion_processor.go b/diode-server/reconciler/ingestion_processor.go index f1acdb5c..d3df36a2 100644 --- a/diode-server/reconciler/ingestion_processor.go +++ b/diode-server/reconciler/ingestion_processor.go @@ -26,7 +26,6 @@ import ( "github.com/netboxlabs/diode/diode-server/gen/netbox" "github.com/netboxlabs/diode/diode-server/graph" "github.com/netboxlabs/diode/diode-server/netboxdiodeplugin" - "github.com/netboxlabs/diode/diode-server/reconciler/changeset" "github.com/netboxlabs/diode/diode-server/reconciler/ops" "github.com/netboxlabs/diode/diode-server/sentry" "github.com/netboxlabs/diode/diode-server/telemetry" @@ -82,9 +81,6 @@ type IngestionProcessorOps interface { BulkCreateIngestionLogs(ctx context.Context, ingestionLogs []*reconcilerpb.IngestionLog, sourceMetadata [][]byte, entityHashes []string) ([]*ops.CreateIngestionLogResult, error) BulkPlan(ctx context.Context, items []ops.QueuedIngestionLog, branchID string) []ops.BulkGenerateChangeSetResult BulkPlanApply(ctx context.Context, items []ops.QueuedIngestionLog, branchID string) []ops.BulkPlanApplyResult - // ApplyChangeSetForLog applies an already-planned change set for a single - // ingestion log, transitioning it through APPLYING -> APPLIED. - ApplyChangeSetForLog(ctx context.Context, item ops.QueuedIngestionLog, cs *changeset.ChangeSet, branchID string) error DefaultBranch(ctx context.Context) (*netboxdiodeplugin.Branch, error) RefreshDefaultBranch(ctx context.Context) (*netboxdiodeplugin.Branch, error) } diff --git a/diode-server/reconciler/mocks/ingestionprocessorops.go b/diode-server/reconciler/mocks/ingestionprocessorops.go index 8fa5fbff..9b1b613a 100644 --- a/diode-server/reconciler/mocks/ingestionprocessorops.go +++ b/diode-server/reconciler/mocks/ingestionprocessorops.go @@ -10,8 +10,6 @@ import ( ops "github.com/netboxlabs/diode/diode-server/reconciler/ops" - changeset "github.com/netboxlabs/diode/diode-server/reconciler/changeset" - reconcilerpb "github.com/netboxlabs/diode/diode-server/gen/diode/v1/reconcilerpb" ) @@ -365,55 +363,6 @@ func (_c *IngestionProcessorOps_RefreshDefaultBranch_Call) RunAndReturn(run func return _c } -// ApplyChangeSetForLog provides a mock function with given fields: ctx, item, cs, branchID -func (_m *IngestionProcessorOps) ApplyChangeSetForLog(ctx context.Context, item ops.QueuedIngestionLog, cs *changeset.ChangeSet, branchID string) error { - ret := _m.Called(ctx, item, cs, branchID) - - if len(ret) == 0 { - panic("no return value specified for ApplyChangeSetForLog") - } - - var r0 error - if rf, ok := ret.Get(0).(func(context.Context, ops.QueuedIngestionLog, *changeset.ChangeSet, string) error); ok { - r0 = rf(ctx, item, cs, branchID) - } else { - r0 = ret.Error(0) - } - - return r0 -} - -// IngestionProcessorOps_ApplyChangeSetForLog_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ApplyChangeSetForLog' -type IngestionProcessorOps_ApplyChangeSetForLog_Call struct { - *mock.Call -} - -// ApplyChangeSetForLog is a helper method to define mock.On call -// - ctx context.Context -// - item ops.QueuedIngestionLog -// - cs *changeset.ChangeSet -// - branchID string -func (_e *IngestionProcessorOps_Expecter) ApplyChangeSetForLog(ctx interface{}, item interface{}, cs interface{}, branchID interface{}) *IngestionProcessorOps_ApplyChangeSetForLog_Call { - return &IngestionProcessorOps_ApplyChangeSetForLog_Call{Call: _e.mock.On("ApplyChangeSetForLog", ctx, item, cs, branchID)} -} - -func (_c *IngestionProcessorOps_ApplyChangeSetForLog_Call) Run(run func(ctx context.Context, item ops.QueuedIngestionLog, cs *changeset.ChangeSet, branchID string)) *IngestionProcessorOps_ApplyChangeSetForLog_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(ops.QueuedIngestionLog), args[2].(*changeset.ChangeSet), args[3].(string)) - }) - return _c -} - -func (_c *IngestionProcessorOps_ApplyChangeSetForLog_Call) Return(_a0 error) *IngestionProcessorOps_ApplyChangeSetForLog_Call { - _c.Call.Return(_a0) - return _c -} - -func (_c *IngestionProcessorOps_ApplyChangeSetForLog_Call) RunAndReturn(run func(context.Context, ops.QueuedIngestionLog, *changeset.ChangeSet, string) error) *IngestionProcessorOps_ApplyChangeSetForLog_Call { - _c.Call.Return(run) - return _c -} - // NewIngestionProcessorOps creates a new instance of IngestionProcessorOps. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. // The first argument is typically a *testing.T value. func NewIngestionProcessorOps(t interface { diff --git a/diode-server/reconciler/ops.go b/diode-server/reconciler/ops.go index d1faa4ae..0f9c167a 100644 --- a/diode-server/reconciler/ops.go +++ b/diode-server/reconciler/ops.go @@ -593,28 +593,6 @@ func (o *Ops) BulkPlanApply(ctx context.Context, items []ops.QueuedIngestionLog, return results } -// ApplyChangeSetForLog applies a single already-planned ingestion log, -// transitioning it OPEN -> APPLIED. -// -// Delegates to BulkPlanApply with a single-item batch, which re-plans before -// applying — one wasted HTTP round-trip per matched record. Acceptable for -// current volume; a dedicated apply-only NetBox endpoint can replace the -// re-plan later. The cs parameter is ignored as a result. -func (o *Ops) ApplyChangeSetForLog(ctx context.Context, item ops.QueuedIngestionLog, _ *changeset.ChangeSet, branchID string) error { - results := o.BulkPlanApply(ctx, []ops.QueuedIngestionLog{item}, branchID) - if len(results) == 0 { - return fmt.Errorf("no result returned for ingestion log %d", item.ID) - } - r := results[0] - if r.PlanErr != nil { - return r.PlanErr - } - if r.ApplyErr != nil { - return r.ApplyErr - } - return nil -} - // persistPlanApplyFailurePlaceholder records a plan-phase failure as a // failure-placeholder change_set + ingestion log state=FAILED with the error // detail. Mirrors handleGenerateChangeSetFailure in the plan-only flow so From e7dfce250b0a7ede8a74c55d9738aa70b330e40a Mon Sep 17 00:00:00 2001 From: Morgan Baron Date: Wed, 3 Jun 2026 14:31:21 -0400 Subject: [PATCH 3/4] batch auto-apply BulkPlanApply calls and document re-plan divergence --- .../reconciler/ingestion_log_processor.go | 63 +++++++++++-------- 1 file changed, 38 insertions(+), 25 deletions(-) diff --git a/diode-server/reconciler/ingestion_log_processor.go b/diode-server/reconciler/ingestion_log_processor.go index 3105f6d9..c6fc3315 100644 --- a/diode-server/reconciler/ingestion_log_processor.go +++ b/diode-server/reconciler/ingestion_log_processor.go @@ -270,14 +270,10 @@ func (p *IngestionLogProcessor) processBatch(ctx context.Context, batch []ops.Qu results := p.ops.BulkPlan(ctx, batch, branchID) + var matched []ops.QueuedIngestionLog for i, result := range results { item := batch[i] - - attrs := []attribute.KeyValue{ - attribute.String(telemetry.AttributeSDKName, item.IngestionLog.GetSdkName()), - attribute.String(telemetry.AttributeProducerAppName, item.IngestionLog.GetProducerAppName()), - } - metricsCtx := telemetry.ContextWithMetricAttributes(ctx, attrs...) + metricsCtx := p.metricsContext(ctx, item) if result.Err != nil { p.logger.Error("error generating changeset", "error", result.Err, "ingestionLogID", item.ID) @@ -290,25 +286,42 @@ func (p *IngestionLogProcessor) processBatch(ctx context.Context, batch []ops.Qu if p.autoApplyPolicy != nil && result.ChangeSet != nil && p.autoApplyPolicy(item.IngestionLog, result.ChangeSet) { - applyResults := p.ops.BulkPlanApply(ctx, []ops.QueuedIngestionLog{item}, branchID) - if len(applyResults) == 0 { - p.logger.Error("auto-apply policy matched but no result returned", "ingestionLogID", item.ID) - p.metrics.RecordChangeSetApply(metricsCtx, false, 0) - continue - } - applyResult := applyResults[0] - switch { - case applyResult.PlanErr != nil: - p.logger.Error("auto-apply policy matched but re-plan failed", - "error", applyResult.PlanErr, "ingestionLogID", item.ID) - p.metrics.RecordChangeSetApply(metricsCtx, false, 0) - case applyResult.ApplyErr != nil: - p.logger.Error("auto-apply policy matched but apply failed", - "error", applyResult.ApplyErr, "ingestionLogID", item.ID) - p.metrics.RecordChangeSetApply(metricsCtx, false, 0) - case applyResult.ChangeSet != nil && len(applyResult.ChangeSet.Changes) > 0: - p.metrics.RecordChangeSetApply(metricsCtx, true, int64(len(applyResult.ChangeSet.Changes))) - } + matched = append(matched, item) } } + + if len(matched) == 0 { + return + } + + // Interim limitation: the policy above was evaluated against the BulkPlan + // change sets, but BulkPlanApply re-plans server-side and applies the + // fresh sets in one call — a divergent re-plan can apply changes the + // policy never approved. Accepted until an apply-only endpoint (no + // re-plan) exists in the netbox plugin. + applyResults := p.ops.BulkPlanApply(ctx, matched, branchID) + for i, applyResult := range applyResults { + item := matched[i] + metricsCtx := p.metricsContext(ctx, item) + switch { + case applyResult.PlanErr != nil: + p.logger.Error("auto-apply policy matched but re-plan failed", + "error", applyResult.PlanErr, "ingestionLogID", item.ID) + p.metrics.RecordChangeSetApply(metricsCtx, false, 0) + case applyResult.ApplyErr != nil: + p.logger.Error("auto-apply policy matched but apply failed", + "error", applyResult.ApplyErr, "ingestionLogID", item.ID) + p.metrics.RecordChangeSetApply(metricsCtx, false, 0) + case applyResult.ChangeSet != nil && len(applyResult.ChangeSet.Changes) > 0: + p.metrics.RecordChangeSetApply(metricsCtx, true, int64(len(applyResult.ChangeSet.Changes))) + } + } +} + +// metricsContext returns ctx annotated with the per-record metric attributes. +func (p *IngestionLogProcessor) metricsContext(ctx context.Context, item ops.QueuedIngestionLog) context.Context { + return telemetry.ContextWithMetricAttributes(ctx, + attribute.String(telemetry.AttributeSDKName, item.IngestionLog.GetSdkName()), + attribute.String(telemetry.AttributeProducerAppName, item.IngestionLog.GetProducerAppName()), + ) } From 39425ca373475e0fc920d3647a795e62b4ecbd9b Mon Sep 17 00:00:00 2001 From: Morgan Baron Date: Wed, 3 Jun 2026 14:56:08 -0400 Subject: [PATCH 4/4] replace sleep-based test sync with deterministic mock signals --- .../ingestion_log_processor_autoapply_test.go | 36 +++++++------------ 1 file changed, 12 insertions(+), 24 deletions(-) diff --git a/diode-server/reconciler/ingestion_log_processor_autoapply_test.go b/diode-server/reconciler/ingestion_log_processor_autoapply_test.go index 44c9fb1f..d26f6f03 100644 --- a/diode-server/reconciler/ingestion_log_processor_autoapply_test.go +++ b/diode-server/reconciler/ingestion_log_processor_autoapply_test.go @@ -48,14 +48,8 @@ func TestIngestionLogProcessor_AutoApplyPolicyMatch_AppliesItem(t *testing.T) { Changes: []changeset.Change{{ChangeType: changeset.ChangeTypeCreate, ObjectType: "extras.customfield"}}, } - claimed := make(chan struct{}, 1) repo.On("ClaimQueuedIngestionLogs", mock.Anything, int32(100)). - Return(batch, nil).Once().Run(func(_ mock.Arguments) { - select { - case claimed <- struct{}{}: - default: - } - }) + Return(batch, nil).Once() repo.On("ClaimQueuedIngestionLogs", mock.Anything, int32(100)). Return([]ops.QueuedIngestionLog{}, nil).Maybe() @@ -66,8 +60,10 @@ func TestIngestionLogProcessor_AutoApplyPolicyMatch_AppliesItem(t *testing.T) { mockOps.On("BulkPlanApply", mock.Anything, []ops.QueuedIngestionLog{batch[0]}, ""). Return([]ops.BulkPlanApplyResult{{IngestionLogID: 1, ChangeSet: cs}}).Once() + applied := make(chan struct{}) mockMetrics.On("RecordChangeSetCreate", mock.Anything, true, int64(1)).Once() - mockMetrics.On("RecordChangeSetApply", mock.Anything, true, int64(1)).Once() + mockMetrics.On("RecordChangeSetApply", mock.Anything, true, int64(1)).Once(). + Run(func(_ mock.Arguments) { close(applied) }) policy := reconciler.AutoApplyPolicy(func(_ *reconcilerpb.IngestionLog, _ *changeset.ChangeSet) bool { return true @@ -88,12 +84,10 @@ func TestIngestionLogProcessor_AutoApplyPolicyMatch_AppliesItem(t *testing.T) { go func() { done <- p.Start(ctx) }() select { - case <-claimed: + case <-applied: case <-time.After(5 * time.Second): - t.Fatal("batch was never claimed") + t.Fatal("apply metric was never recorded") } - - time.Sleep(200 * time.Millisecond) cancel() select { @@ -119,14 +113,8 @@ func TestIngestionLogProcessor_AutoApplyPolicyNoMatch_DoesNotApply(t *testing.T) Changes: []changeset.Change{{ChangeType: changeset.ChangeTypeUpdate, ObjectType: "extras.customfield"}}, } - claimed := make(chan struct{}, 1) repo.On("ClaimQueuedIngestionLogs", mock.Anything, int32(100)). - Return(batch, nil).Once().Run(func(_ mock.Arguments) { - select { - case claimed <- struct{}{}: - default: - } - }) + Return(batch, nil).Once() repo.On("ClaimQueuedIngestionLogs", mock.Anything, int32(100)). Return([]ops.QueuedIngestionLog{}, nil).Maybe() @@ -135,7 +123,9 @@ func TestIngestionLogProcessor_AutoApplyPolicyNoMatch_DoesNotApply(t *testing.T) {IngestionLogID: 1, ChangeSetID: int32Ptr(10), ChangeSet: cs}, }).Once() - mockMetrics.On("RecordChangeSetCreate", mock.Anything, true, int64(1)).Once() + planned := make(chan struct{}) + mockMetrics.On("RecordChangeSetCreate", mock.Anything, true, int64(1)).Once(). + Run(func(_ mock.Arguments) { close(planned) }) policy := reconciler.AutoApplyPolicy(func(_ *reconcilerpb.IngestionLog, _ *changeset.ChangeSet) bool { return false @@ -156,12 +146,10 @@ func TestIngestionLogProcessor_AutoApplyPolicyNoMatch_DoesNotApply(t *testing.T) go func() { done <- p.Start(ctx) }() select { - case <-claimed: + case <-planned: case <-time.After(5 * time.Second): - t.Fatal("batch was never claimed") + t.Fatal("create metric was never recorded") } - - time.Sleep(200 * time.Millisecond) cancel() select {