Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 73 additions & 8 deletions diode-server/reconciler/ingestion_log_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,33 @@ import (

"go.opentelemetry.io/otel/attribute"

"github.com/netboxlabs/diode/diode-server/gen/diode/v1/reconcilerpb"
"github.com/netboxlabs/diode/diode-server/reconciler/changeset"
"github.com/netboxlabs/diode/diode-server/reconciler/ops"
"github.com/netboxlabs/diode/diode-server/telemetry"
)

// AutoApplyPolicy decides per-record whether an already-planned change set
// should bypass the OPEN review queue and be applied immediately. Returns
// true to apply, false to leave the row in OPEN for manual review.
type AutoApplyPolicy func(log *reconcilerpb.IngestionLog, cs *changeset.ChangeSet) bool

// IngestionLogProcessorOption is a functional option for IngestionLogProcessor.
type IngestionLogProcessorOption func(*IngestionLogProcessor)

// WithAutoApplyPolicy attaches a per-record auto-apply policy. When set,
// matching records are applied immediately rather than left in OPEN.
//
// Layers on top of the cfg.AutoApplyChangesets default: it only widens
// auto-apply for matching records, never restricts. The boot-time decision
// between IngestionLogProcessor (plan-only) and AutoApplyProcessor (plan +
// apply) is unchanged.
func WithAutoApplyPolicy(policy AutoApplyPolicy) IngestionLogProcessorOption {
return func(p *IngestionLogProcessor) {
p.autoApplyPolicy = policy
}
}

const (
defaultIngestionLogPollInterval = 100 * time.Millisecond
defaultIngestionLogIdleInterval = time.Second
Expand Down Expand Up @@ -55,6 +78,8 @@ type IngestionLogProcessor struct {
metrics Metrics
backpressure BackpressureFunc

autoApplyPolicy AutoApplyPolicy

// mx protects the lifecycle fields below: workCancel, pollCancel, done.
// Set by Start, read by Stop/shutdown/watchParent.
mx sync.Mutex
Expand All @@ -66,12 +91,12 @@ type IngestionLogProcessor struct {
}

// NewIngestionLogProcessor creates a new ingestion log processor.
func NewIngestionLogProcessor(logger *slog.Logger, cfg Config, repo Repository, ops IngestionProcessorOps, metrics Metrics, backpressure BackpressureFunc) *IngestionLogProcessor {
func NewIngestionLogProcessor(logger *slog.Logger, cfg Config, repo Repository, ops IngestionProcessorOps, metrics Metrics, backpressure BackpressureFunc, opts ...IngestionLogProcessorOption) *IngestionLogProcessor {
batchSize := cfg.IngestionLogProcessorBatchSize
if batchSize <= 0 {
batchSize = defaultIngestionLogBatchSize
}
return &IngestionLogProcessor{
p := &IngestionLogProcessor{
config: cfg,
logger: logger,
ops: ops,
Expand All @@ -80,6 +105,10 @@ func NewIngestionLogProcessor(logger *slog.Logger, cfg Config, repo Repository,
backpressure: backpressure,
batchSize: batchSize,
}
for _, opt := range opts {
opt(p)
}
return p
}

// Name returns the name of the component.
Expand Down Expand Up @@ -241,14 +270,10 @@ func (p *IngestionLogProcessor) processBatch(ctx context.Context, batch []ops.Qu

results := p.ops.BulkPlan(ctx, batch, branchID)

var matched []ops.QueuedIngestionLog
for i, result := range results {
item := batch[i]

attrs := []attribute.KeyValue{
attribute.String(telemetry.AttributeSDKName, item.IngestionLog.GetSdkName()),
attribute.String(telemetry.AttributeProducerAppName, item.IngestionLog.GetProducerAppName()),
}
metricsCtx := telemetry.ContextWithMetricAttributes(ctx, attrs...)
metricsCtx := p.metricsContext(ctx, item)

if result.Err != nil {
p.logger.Error("error generating changeset", "error", result.Err, "ingestionLogID", item.ID)
Expand All @@ -258,5 +283,45 @@ func (p *IngestionLogProcessor) processBatch(ctx context.Context, batch []ops.Qu
if result.ChangeSet != nil {
p.metrics.RecordChangeSetCreate(metricsCtx, true, int64(len(result.ChangeSet.Changes)))
}

if p.autoApplyPolicy != nil && result.ChangeSet != nil &&
p.autoApplyPolicy(item.IngestionLog, result.ChangeSet) {
matched = append(matched, item)
}
}

if len(matched) == 0 {
return
}

// Interim limitation: the policy above was evaluated against the BulkPlan
// change sets, but BulkPlanApply re-plans server-side and applies the
// fresh sets in one call — a divergent re-plan can apply changes the
// policy never approved. Accepted until an apply-only endpoint (no
// re-plan) exists in the netbox plugin.
applyResults := p.ops.BulkPlanApply(ctx, matched, branchID)
for i, applyResult := range applyResults {
item := matched[i]
metricsCtx := p.metricsContext(ctx, item)
switch {
case applyResult.PlanErr != nil:
p.logger.Error("auto-apply policy matched but re-plan failed",
"error", applyResult.PlanErr, "ingestionLogID", item.ID)
p.metrics.RecordChangeSetApply(metricsCtx, false, 0)
case applyResult.ApplyErr != nil:
p.logger.Error("auto-apply policy matched but apply failed",
"error", applyResult.ApplyErr, "ingestionLogID", item.ID)
p.metrics.RecordChangeSetApply(metricsCtx, false, 0)
case applyResult.ChangeSet != nil && len(applyResult.ChangeSet.Changes) > 0:
p.metrics.RecordChangeSetApply(metricsCtx, true, int64(len(applyResult.ChangeSet.Changes)))
}
}
}

// metricsContext returns ctx annotated with the per-record metric attributes.
func (p *IngestionLogProcessor) metricsContext(ctx context.Context, item ops.QueuedIngestionLog) context.Context {
return telemetry.ContextWithMetricAttributes(ctx,
attribute.String(telemetry.AttributeSDKName, item.IngestionLog.GetSdkName()),
attribute.String(telemetry.AttributeProducerAppName, item.IngestionLog.GetProducerAppName()),
)
}
163 changes: 163 additions & 0 deletions diode-server/reconciler/ingestion_log_processor_autoapply_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
package reconciler_test

import (
"context"
"log/slog"
"os"
"testing"
"time"

"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"

"github.com/netboxlabs/diode/diode-server/gen/diode/v1/diodepb"
"github.com/netboxlabs/diode/diode-server/gen/diode/v1/reconcilerpb"
"github.com/netboxlabs/diode/diode-server/reconciler"
"github.com/netboxlabs/diode/diode-server/reconciler/changeset"
"github.com/netboxlabs/diode/diode-server/reconciler/mocks"
"github.com/netboxlabs/diode/diode-server/reconciler/ops"
)

func newAutoApplyTestLog() *reconcilerpb.IngestionLog {
return &reconcilerpb.IngestionLog{
DataType: "extras.customfield",
ObjectType: "extras.customfield",
State: reconcilerpb.State_QUEUED,
Entity: &diodepb.Entity{},
SdkName: "test-sdk",
ProducerAppName: "orb-pro-credentials-bootstrap",
}
}

func newAutoApplyTestLogger() *slog.Logger {
return slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug}))
}

// TestIngestionLogProcessor_AutoApplyPolicyMatch_AppliesItem verifies that
// when WithAutoApplyPolicy is set and the policy returns true for a record,
// the processor calls BulkPlanApply for that item and records a successful
// apply metric — bypassing the OPEN review queue.
func TestIngestionLogProcessor_AutoApplyPolicyMatch_AppliesItem(t *testing.T) {
repo := mocks.NewRepository(t)
mockOps := mocks.NewIngestionProcessorOps(t)
mockMetrics := mocks.NewMetrics(t)

log := newAutoApplyTestLog()
batch := []ops.QueuedIngestionLog{{ID: 1, IngestionLog: log}}
cs := &changeset.ChangeSet{
Changes: []changeset.Change{{ChangeType: changeset.ChangeTypeCreate, ObjectType: "extras.customfield"}},
}

repo.On("ClaimQueuedIngestionLogs", mock.Anything, int32(100)).
Return(batch, nil).Once()
repo.On("ClaimQueuedIngestionLogs", mock.Anything, int32(100)).
Return([]ops.QueuedIngestionLog{}, nil).Maybe()

mockOps.On("DefaultBranch", mock.Anything).Return(nil, nil).Maybe()
mockOps.On("BulkPlan", mock.Anything, batch, "").Return([]ops.BulkGenerateChangeSetResult{
{IngestionLogID: 1, ChangeSetID: int32Ptr(10), ChangeSet: cs},
}).Once()
mockOps.On("BulkPlanApply", mock.Anything, []ops.QueuedIngestionLog{batch[0]}, "").
Return([]ops.BulkPlanApplyResult{{IngestionLogID: 1, ChangeSet: cs}}).Once()

applied := make(chan struct{})
mockMetrics.On("RecordChangeSetCreate", mock.Anything, true, int64(1)).Once()
mockMetrics.On("RecordChangeSetApply", mock.Anything, true, int64(1)).Once().
Run(func(_ mock.Arguments) { close(applied) })

policy := reconciler.AutoApplyPolicy(func(_ *reconcilerpb.IngestionLog, _ *changeset.ChangeSet) bool {
return true
})

p := reconciler.NewIngestionLogProcessor(
newAutoApplyTestLogger(),
reconciler.Config{},
repo,
mockOps,
mockMetrics,
nil,
reconciler.WithAutoApplyPolicy(policy),
)

ctx, cancel := context.WithCancel(context.Background())
done := make(chan error, 1)
go func() { done <- p.Start(ctx) }()

select {
case <-applied:
case <-time.After(5 * time.Second):
t.Fatal("apply metric was never recorded")
}
cancel()

select {
case err := <-done:
require.NoError(t, err)
case <-time.After(5 * time.Second):
t.Fatal("processor did not exit")
}
}

// TestIngestionLogProcessor_AutoApplyPolicyNoMatch_DoesNotApply verifies that
// when the policy returns false for a record, the processor does NOT call
// BulkPlanApply — leaving the row in OPEN for review, as it would without
// any policy.
func TestIngestionLogProcessor_AutoApplyPolicyNoMatch_DoesNotApply(t *testing.T) {
repo := mocks.NewRepository(t)
mockOps := mocks.NewIngestionProcessorOps(t)
mockMetrics := mocks.NewMetrics(t)

log := newAutoApplyTestLog()
batch := []ops.QueuedIngestionLog{{ID: 1, IngestionLog: log}}
cs := &changeset.ChangeSet{
Changes: []changeset.Change{{ChangeType: changeset.ChangeTypeUpdate, ObjectType: "extras.customfield"}},
}

repo.On("ClaimQueuedIngestionLogs", mock.Anything, int32(100)).
Return(batch, nil).Once()
repo.On("ClaimQueuedIngestionLogs", mock.Anything, int32(100)).
Return([]ops.QueuedIngestionLog{}, nil).Maybe()

mockOps.On("DefaultBranch", mock.Anything).Return(nil, nil).Maybe()
mockOps.On("BulkPlan", mock.Anything, batch, "").Return([]ops.BulkGenerateChangeSetResult{
{IngestionLogID: 1, ChangeSetID: int32Ptr(10), ChangeSet: cs},
}).Once()

planned := make(chan struct{})
mockMetrics.On("RecordChangeSetCreate", mock.Anything, true, int64(1)).Once().
Run(func(_ mock.Arguments) { close(planned) })

policy := reconciler.AutoApplyPolicy(func(_ *reconcilerpb.IngestionLog, _ *changeset.ChangeSet) bool {
return false
})

p := reconciler.NewIngestionLogProcessor(
newAutoApplyTestLogger(),
reconciler.Config{},
repo,
mockOps,
mockMetrics,
nil,
reconciler.WithAutoApplyPolicy(policy),
)

ctx, cancel := context.WithCancel(context.Background())
done := make(chan error, 1)
go func() { done <- p.Start(ctx) }()

select {
case <-planned:
case <-time.After(5 * time.Second):
t.Fatal("create metric was never recorded")
}
cancel()

select {
case err := <-done:
require.NoError(t, err)
case <-time.After(5 * time.Second):
t.Fatal("processor did not exit")
}

mockOps.AssertNotCalled(t, "BulkPlanApply")
}
Loading