diff --git a/diode-server/reconciler/ingestion_log_processor.go b/diode-server/reconciler/ingestion_log_processor.go index 806be37b..c6fc3315 100644 --- a/diode-server/reconciler/ingestion_log_processor.go +++ b/diode-server/reconciler/ingestion_log_processor.go @@ -8,10 +8,33 @@ import ( "go.opentelemetry.io/otel/attribute" + "github.com/netboxlabs/diode/diode-server/gen/diode/v1/reconcilerpb" + "github.com/netboxlabs/diode/diode-server/reconciler/changeset" "github.com/netboxlabs/diode/diode-server/reconciler/ops" "github.com/netboxlabs/diode/diode-server/telemetry" ) +// AutoApplyPolicy decides per-record whether an already-planned change set +// should bypass the OPEN review queue and be applied immediately. Returns +// true to apply, false to leave the row in OPEN for manual review. +type AutoApplyPolicy func(log *reconcilerpb.IngestionLog, cs *changeset.ChangeSet) bool + +// IngestionLogProcessorOption is a functional option for IngestionLogProcessor. +type IngestionLogProcessorOption func(*IngestionLogProcessor) + +// WithAutoApplyPolicy attaches a per-record auto-apply policy. When set, +// matching records are applied immediately rather than left in OPEN. +// +// Layers on top of the cfg.AutoApplyChangesets default: it only widens +// auto-apply for matching records, never restricts. The boot-time decision +// between IngestionLogProcessor (plan-only) and AutoApplyProcessor (plan + +// apply) is unchanged. +func WithAutoApplyPolicy(policy AutoApplyPolicy) IngestionLogProcessorOption { + return func(p *IngestionLogProcessor) { + p.autoApplyPolicy = policy + } +} + const ( defaultIngestionLogPollInterval = 100 * time.Millisecond defaultIngestionLogIdleInterval = time.Second @@ -55,6 +78,8 @@ type IngestionLogProcessor struct { metrics Metrics backpressure BackpressureFunc + autoApplyPolicy AutoApplyPolicy + // mx protects the lifecycle fields below: workCancel, pollCancel, done. // Set by Start, read by Stop/shutdown/watchParent. mx sync.Mutex @@ -66,12 +91,12 @@ type IngestionLogProcessor struct { } // NewIngestionLogProcessor creates a new ingestion log processor. -func NewIngestionLogProcessor(logger *slog.Logger, cfg Config, repo Repository, ops IngestionProcessorOps, metrics Metrics, backpressure BackpressureFunc) *IngestionLogProcessor { +func NewIngestionLogProcessor(logger *slog.Logger, cfg Config, repo Repository, ops IngestionProcessorOps, metrics Metrics, backpressure BackpressureFunc, opts ...IngestionLogProcessorOption) *IngestionLogProcessor { batchSize := cfg.IngestionLogProcessorBatchSize if batchSize <= 0 { batchSize = defaultIngestionLogBatchSize } - return &IngestionLogProcessor{ + p := &IngestionLogProcessor{ config: cfg, logger: logger, ops: ops, @@ -80,6 +105,10 @@ func NewIngestionLogProcessor(logger *slog.Logger, cfg Config, repo Repository, backpressure: backpressure, batchSize: batchSize, } + for _, opt := range opts { + opt(p) + } + return p } // Name returns the name of the component. @@ -241,14 +270,10 @@ func (p *IngestionLogProcessor) processBatch(ctx context.Context, batch []ops.Qu results := p.ops.BulkPlan(ctx, batch, branchID) + var matched []ops.QueuedIngestionLog for i, result := range results { item := batch[i] - - attrs := []attribute.KeyValue{ - attribute.String(telemetry.AttributeSDKName, item.IngestionLog.GetSdkName()), - attribute.String(telemetry.AttributeProducerAppName, item.IngestionLog.GetProducerAppName()), - } - metricsCtx := telemetry.ContextWithMetricAttributes(ctx, attrs...) + metricsCtx := p.metricsContext(ctx, item) if result.Err != nil { p.logger.Error("error generating changeset", "error", result.Err, "ingestionLogID", item.ID) @@ -258,5 +283,45 @@ func (p *IngestionLogProcessor) processBatch(ctx context.Context, batch []ops.Qu if result.ChangeSet != nil { p.metrics.RecordChangeSetCreate(metricsCtx, true, int64(len(result.ChangeSet.Changes))) } + + if p.autoApplyPolicy != nil && result.ChangeSet != nil && + p.autoApplyPolicy(item.IngestionLog, result.ChangeSet) { + matched = append(matched, item) + } + } + + if len(matched) == 0 { + return } + + // Interim limitation: the policy above was evaluated against the BulkPlan + // change sets, but BulkPlanApply re-plans server-side and applies the + // fresh sets in one call — a divergent re-plan can apply changes the + // policy never approved. Accepted until an apply-only endpoint (no + // re-plan) exists in the netbox plugin. + applyResults := p.ops.BulkPlanApply(ctx, matched, branchID) + for i, applyResult := range applyResults { + item := matched[i] + metricsCtx := p.metricsContext(ctx, item) + switch { + case applyResult.PlanErr != nil: + p.logger.Error("auto-apply policy matched but re-plan failed", + "error", applyResult.PlanErr, "ingestionLogID", item.ID) + p.metrics.RecordChangeSetApply(metricsCtx, false, 0) + case applyResult.ApplyErr != nil: + p.logger.Error("auto-apply policy matched but apply failed", + "error", applyResult.ApplyErr, "ingestionLogID", item.ID) + p.metrics.RecordChangeSetApply(metricsCtx, false, 0) + case applyResult.ChangeSet != nil && len(applyResult.ChangeSet.Changes) > 0: + p.metrics.RecordChangeSetApply(metricsCtx, true, int64(len(applyResult.ChangeSet.Changes))) + } + } +} + +// metricsContext returns ctx annotated with the per-record metric attributes. +func (p *IngestionLogProcessor) metricsContext(ctx context.Context, item ops.QueuedIngestionLog) context.Context { + return telemetry.ContextWithMetricAttributes(ctx, + attribute.String(telemetry.AttributeSDKName, item.IngestionLog.GetSdkName()), + attribute.String(telemetry.AttributeProducerAppName, item.IngestionLog.GetProducerAppName()), + ) } diff --git a/diode-server/reconciler/ingestion_log_processor_autoapply_test.go b/diode-server/reconciler/ingestion_log_processor_autoapply_test.go new file mode 100644 index 00000000..d26f6f03 --- /dev/null +++ b/diode-server/reconciler/ingestion_log_processor_autoapply_test.go @@ -0,0 +1,163 @@ +package reconciler_test + +import ( + "context" + "log/slog" + "os" + "testing" + "time" + + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + + "github.com/netboxlabs/diode/diode-server/gen/diode/v1/diodepb" + "github.com/netboxlabs/diode/diode-server/gen/diode/v1/reconcilerpb" + "github.com/netboxlabs/diode/diode-server/reconciler" + "github.com/netboxlabs/diode/diode-server/reconciler/changeset" + "github.com/netboxlabs/diode/diode-server/reconciler/mocks" + "github.com/netboxlabs/diode/diode-server/reconciler/ops" +) + +func newAutoApplyTestLog() *reconcilerpb.IngestionLog { + return &reconcilerpb.IngestionLog{ + DataType: "extras.customfield", + ObjectType: "extras.customfield", + State: reconcilerpb.State_QUEUED, + Entity: &diodepb.Entity{}, + SdkName: "test-sdk", + ProducerAppName: "orb-pro-credentials-bootstrap", + } +} + +func newAutoApplyTestLogger() *slog.Logger { + return slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug})) +} + +// TestIngestionLogProcessor_AutoApplyPolicyMatch_AppliesItem verifies that +// when WithAutoApplyPolicy is set and the policy returns true for a record, +// the processor calls BulkPlanApply for that item and records a successful +// apply metric — bypassing the OPEN review queue. +func TestIngestionLogProcessor_AutoApplyPolicyMatch_AppliesItem(t *testing.T) { + repo := mocks.NewRepository(t) + mockOps := mocks.NewIngestionProcessorOps(t) + mockMetrics := mocks.NewMetrics(t) + + log := newAutoApplyTestLog() + batch := []ops.QueuedIngestionLog{{ID: 1, IngestionLog: log}} + cs := &changeset.ChangeSet{ + Changes: []changeset.Change{{ChangeType: changeset.ChangeTypeCreate, ObjectType: "extras.customfield"}}, + } + + repo.On("ClaimQueuedIngestionLogs", mock.Anything, int32(100)). + Return(batch, nil).Once() + repo.On("ClaimQueuedIngestionLogs", mock.Anything, int32(100)). + Return([]ops.QueuedIngestionLog{}, nil).Maybe() + + mockOps.On("DefaultBranch", mock.Anything).Return(nil, nil).Maybe() + mockOps.On("BulkPlan", mock.Anything, batch, "").Return([]ops.BulkGenerateChangeSetResult{ + {IngestionLogID: 1, ChangeSetID: int32Ptr(10), ChangeSet: cs}, + }).Once() + mockOps.On("BulkPlanApply", mock.Anything, []ops.QueuedIngestionLog{batch[0]}, ""). + Return([]ops.BulkPlanApplyResult{{IngestionLogID: 1, ChangeSet: cs}}).Once() + + applied := make(chan struct{}) + mockMetrics.On("RecordChangeSetCreate", mock.Anything, true, int64(1)).Once() + mockMetrics.On("RecordChangeSetApply", mock.Anything, true, int64(1)).Once(). + Run(func(_ mock.Arguments) { close(applied) }) + + policy := reconciler.AutoApplyPolicy(func(_ *reconcilerpb.IngestionLog, _ *changeset.ChangeSet) bool { + return true + }) + + p := reconciler.NewIngestionLogProcessor( + newAutoApplyTestLogger(), + reconciler.Config{}, + repo, + mockOps, + mockMetrics, + nil, + reconciler.WithAutoApplyPolicy(policy), + ) + + ctx, cancel := context.WithCancel(context.Background()) + done := make(chan error, 1) + go func() { done <- p.Start(ctx) }() + + select { + case <-applied: + case <-time.After(5 * time.Second): + t.Fatal("apply metric was never recorded") + } + cancel() + + select { + case err := <-done: + require.NoError(t, err) + case <-time.After(5 * time.Second): + t.Fatal("processor did not exit") + } +} + +// TestIngestionLogProcessor_AutoApplyPolicyNoMatch_DoesNotApply verifies that +// when the policy returns false for a record, the processor does NOT call +// BulkPlanApply — leaving the row in OPEN for review, as it would without +// any policy. +func TestIngestionLogProcessor_AutoApplyPolicyNoMatch_DoesNotApply(t *testing.T) { + repo := mocks.NewRepository(t) + mockOps := mocks.NewIngestionProcessorOps(t) + mockMetrics := mocks.NewMetrics(t) + + log := newAutoApplyTestLog() + batch := []ops.QueuedIngestionLog{{ID: 1, IngestionLog: log}} + cs := &changeset.ChangeSet{ + Changes: []changeset.Change{{ChangeType: changeset.ChangeTypeUpdate, ObjectType: "extras.customfield"}}, + } + + repo.On("ClaimQueuedIngestionLogs", mock.Anything, int32(100)). + Return(batch, nil).Once() + repo.On("ClaimQueuedIngestionLogs", mock.Anything, int32(100)). + Return([]ops.QueuedIngestionLog{}, nil).Maybe() + + mockOps.On("DefaultBranch", mock.Anything).Return(nil, nil).Maybe() + mockOps.On("BulkPlan", mock.Anything, batch, "").Return([]ops.BulkGenerateChangeSetResult{ + {IngestionLogID: 1, ChangeSetID: int32Ptr(10), ChangeSet: cs}, + }).Once() + + planned := make(chan struct{}) + mockMetrics.On("RecordChangeSetCreate", mock.Anything, true, int64(1)).Once(). + Run(func(_ mock.Arguments) { close(planned) }) + + policy := reconciler.AutoApplyPolicy(func(_ *reconcilerpb.IngestionLog, _ *changeset.ChangeSet) bool { + return false + }) + + p := reconciler.NewIngestionLogProcessor( + newAutoApplyTestLogger(), + reconciler.Config{}, + repo, + mockOps, + mockMetrics, + nil, + reconciler.WithAutoApplyPolicy(policy), + ) + + ctx, cancel := context.WithCancel(context.Background()) + done := make(chan error, 1) + go func() { done <- p.Start(ctx) }() + + select { + case <-planned: + case <-time.After(5 * time.Second): + t.Fatal("create metric was never recorded") + } + cancel() + + select { + case err := <-done: + require.NoError(t, err) + case <-time.After(5 * time.Second): + t.Fatal("processor did not exit") + } + + mockOps.AssertNotCalled(t, "BulkPlanApply") +}