From 01881cf292af414b077a0e0029589c26836f6567 Mon Sep 17 00:00:00 2001 From: Micah Parks <66095735+MicahParks@users.noreply.github.com> Date: Thu, 28 May 2026 18:18:08 -0400 Subject: [PATCH 1/2] feat(reconciler): automatically retry failed ingestion logs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When bulk-plan-apply failed, the ingestion log was left in FAILED and was terminal: nothing re-claimed it, and re-ingesting the same entity deduped against it, so the data was never applied and no error surfaced. Auto-apply tenants had no recovery path. With ENABLE_FAILED_RETRY on, a failed apply is parked in PENDING_RETRY with a jittered exponential backoff and re-claimed by the AutoApplyProcessor — in the same pipeline as fresh QUEUED work, FIFO by id so retries are processed in line rather than starved or contending for NetBox throughput — until it applies or exhausts its budget and is retired to terminal ERRORED. Off by default; when off, failures stay terminal FAILED exactly as before. - migration: add retry_count, next_retry_at + partial index on PENDING_RETRY - proto: add PENDING_RETRY state - ClaimQueuedForAutoApply claims QUEUED + due PENDING_RETRY in one FIFO batch - MarkIngestionLogRetry: increment, jittered exponential backoff, or retire to ERRORED - dedup excludes terminal ERRORED so a re-ingest after give-up re-queues - retry gated by RetryPolicy.Enabled, wired from ENABLE_FAILED_RETRY Pro inherits the new repository method via struct embedding, so it compiles on pin-bump with no changes. Co-Authored-By: Claude Opus 4.8 (1M context) --- diode-proto/diode/v1/reconciler.proto | 1 + diode-server/cmd/reconciler/main.go | 4 +- ...20260528000002_add_ingestion_log_retry.sql | 19 +++ .../postgres/queries/ingestion_logs.sql | 38 ++++- diode-server/dbstore/postgres/repository.go | 24 +++ .../dbstore/postgres/ingestion_logs.sql.go | 90 +++++++++-- diode-server/gen/dbstore/postgres/types.go | 8 + .../diode/v1/reconcilerpb/reconciler.pb.go | 8 +- diode-server/reconciler/config.go | 18 +++ diode-server/reconciler/mocks/repository.go | 51 +++++++ diode-server/reconciler/ops.go | 87 ++++++++--- diode-server/reconciler/ops_retry_test.go | 143 ++++++++++++++++++ diode-server/reconciler/repository.go | 4 + docs/diode-proto.md | 1 + 14 files changed, 457 insertions(+), 39 deletions(-) create mode 100644 diode-server/dbstore/postgres/migrations/20260528000002_add_ingestion_log_retry.sql create mode 100644 diode-server/reconciler/ops_retry_test.go diff --git a/diode-proto/diode/v1/reconciler.proto b/diode-proto/diode/v1/reconciler.proto index 585c0ba0..5eed8d61 100644 --- a/diode-proto/diode/v1/reconciler.proto +++ b/diode-proto/diode/v1/reconciler.proto @@ -17,6 +17,7 @@ enum State { IGNORED = 6; ERRORED = 7; APPLYING = 8; + PENDING_RETRY = 9; // failed but eligible for automatic retry; reclaimed by the auto-apply processor once backoff elapses } // Ingestion metrics diff --git a/diode-server/cmd/reconciler/main.go b/diode-server/cmd/reconciler/main.go index aa643e6a..d8c8add0 100644 --- a/diode-server/cmd/reconciler/main.go +++ b/diode-server/cmd/reconciler/main.go @@ -203,7 +203,7 @@ func main() { os.Exit(1) } - ops := reconciler.NewOps(repository, nbClient, s.Logger(), nil) + ops := reconciler.NewOps(repository, nbClient, s.Logger(), nil, reconciler.WithRetryPolicy(cfg.RetryPolicy())) ops.Start(ctx) // Build processor options @@ -245,6 +245,8 @@ func main() { if err := repository.ResetApplyingIngestionLogs(ctx); err != nil { s.Logger().Error("failed to reset applying ingestion logs", "error", err) } + // AutoApplyProcessor also re-drives due PENDING_RETRY rows (gated by + // ENABLE_FAILED_RETRY via the RetryPolicy on ops); inert when retry is off. autoApplyProcessor := reconciler.NewAutoApplyProcessor(s.Logger(), cfg, repository, ops, metricRecorder, backpressure) if err := s.RegisterComponent(autoApplyProcessor); err != nil { s.Logger().Error("failed to register auto-apply processor", "error", err) diff --git a/diode-server/dbstore/postgres/migrations/20260528000002_add_ingestion_log_retry.sql b/diode-server/dbstore/postgres/migrations/20260528000002_add_ingestion_log_retry.sql new file mode 100644 index 00000000..47ecc50f --- /dev/null +++ b/diode-server/dbstore/postgres/migrations/20260528000002_add_ingestion_log_retry.sql @@ -0,0 +1,19 @@ +-- +goose Up + +-- Retry accounting for automatic retry of failed applies. +-- retry_count - failed apply attempts for this row. +-- next_retry_at - earliest re-claim time; NULL means eligible now. +ALTER TABLE ingestion_logs + ADD COLUMN IF NOT EXISTS retry_count INTEGER NOT NULL DEFAULT 0, + ADD COLUMN IF NOT EXISTS next_retry_at TIMESTAMP WITH TIME ZONE; + +-- Partial index for the due-retry claim predicate (PENDING_RETRY by due time). +CREATE INDEX IF NOT EXISTS idx_ingestion_logs_pending_retry + ON ingestion_logs (next_retry_at) WHERE state = 9; + +-- +goose Down + +DROP INDEX IF EXISTS idx_ingestion_logs_pending_retry; +ALTER TABLE ingestion_logs + DROP COLUMN IF EXISTS next_retry_at, + DROP COLUMN IF EXISTS retry_count; diff --git a/diode-server/dbstore/postgres/queries/ingestion_logs.sql b/diode-server/dbstore/postgres/queries/ingestion_logs.sql index 3aa7588e..4795219c 100644 --- a/diode-server/dbstore/postgres/queries/ingestion_logs.sql +++ b/diode-server/dbstore/postgres/queries/ingestion_logs.sql @@ -55,6 +55,9 @@ LEFT JOIN LATERAL ( ) lcs ON true WHERE il.entity_hash = sqlc.arg('entity_hash') AND lcs.branch_id IS NOT DISTINCT FROM sqlc.narg('branch_id')::text + -- Exclude terminal ERRORED (7) so a re-ingest after the system gives up + -- re-queues instead of deduping against the dead row. + AND il.state IS DISTINCT FROM 7 ORDER BY il.created_at DESC LIMIT 1; @@ -78,6 +81,9 @@ CROSS JOIN LATERAL ( ORDER BY cs.id DESC LIMIT 1 ) IS NOT DISTINCT FROM sqlc.narg('branch_id')::text + -- Exclude terminal ERRORED (7): see FindPriorIngestionLogByEntityHash. + -- Re-ingest after the retrier gives up must re-queue, not dedupe. + AND il2.state IS DISTINCT FROM 7 ORDER BY il2.created_at DESC LIMIT 1 ) il; @@ -120,15 +126,16 @@ WHERE id IN ( RETURNING *; -- name: ClaimQueuedForAutoApply :many --- Claim a batch of QUEUED ingestion logs for the AutoApplyProcessor (combined --- plan+apply via /bulk-plan-apply). Transitions QUEUED (1) -> APPLYING (8). --- A row stays in APPLYING for the duration of the NetBox round-trip and is --- reset back to QUEUED on reconciler startup via ResetApplyingIngestionLogs. +-- Claim a batch for the AutoApplyProcessor: fresh QUEUED (1) plus retry-eligible +-- PENDING_RETRY (9) rows whose backoff has elapsed, transitioned to APPLYING (8). +-- Ordered by id (not fresh-first) so a due retry is processed in line rather than +-- starved behind fresh work when the queue never empties. UPDATE ingestion_logs SET state = 8 WHERE id IN ( SELECT id FROM ingestion_logs WHERE state = 1 + OR (state = 9 AND (next_retry_at IS NULL OR next_retry_at <= NOW())) ORDER BY id LIMIT sqlc.arg('batch_size') FOR UPDATE SKIP LOCKED @@ -141,3 +148,26 @@ RETURNING *; UPDATE ingestion_logs SET state = 1 WHERE state = 8; + +-- name: MarkIngestionLogRetry :exec +-- Record a failed apply: increment retry_count and either re-arm as PENDING_RETRY +-- (9) with a jittered exponential backoff (base*2^n capped at max, ×random[0.5,1) +-- to spread retry herds), or retire to terminal ERRORED (7) once the budget is +-- spent. SET expressions read the pre-update retry_count. +UPDATE ingestion_logs +SET retry_count = retry_count + 1, + state = CASE + WHEN retry_count + 1 >= sqlc.arg('max_retries')::int THEN 7 + ELSE 9 + END, + next_retry_at = CASE + WHEN retry_count + 1 >= sqlc.arg('max_retries')::int THEN NULL + ELSE NOW() + make_interval(secs => GREATEST(1, ( + LEAST( + sqlc.arg('base_backoff_secs')::bigint * (1::bigint << LEAST(retry_count, 30)), + sqlc.arg('max_backoff_secs')::bigint + )::double precision * (0.5 + random() * 0.5) + )::int)) + END, + error = sqlc.arg('error') +WHERE id = sqlc.arg('id'); diff --git a/diode-server/dbstore/postgres/repository.go b/diode-server/dbstore/postgres/repository.go index 22fabb4b..76e6353e 100644 --- a/diode-server/dbstore/postgres/repository.go +++ b/diode-server/dbstore/postgres/repository.go @@ -785,3 +785,27 @@ func (r *Repository) ClaimQueuedForAutoApply(ctx context.Context, batchSize int3 func (r *Repository) ResetApplyingIngestionLogs(ctx context.Context) error { return r.queries.ResetApplyingIngestionLogs(ctx) } + +// MarkIngestionLogRetry records a failed bulk-plan-apply attempt for retry +// accounting. It increments retry_count and either re-arms the row as +// PENDING_RETRY with a jittered exponential backoff, or — once the budget is +// spent — retires it to terminal ERRORED. The terminal-vs-retry decision and the +// backoff schedule are computed atomically in SQL (see the MarkIngestionLogRetry +// query). +func (r *Repository) MarkIngestionLogRetry(ctx context.Context, id int32, maxRetries int32, baseBackoffSecs int64, maxBackoffSecs int64, err error) error { + params := postgres.MarkIngestionLogRetryParams{ + ID: id, + MaxRetries: maxRetries, + BaseBackoffSecs: baseBackoffSecs, + MaxBackoffSecs: maxBackoffSecs, + } + + if err != nil { + errJSON, marshalErr := json.Marshal(err) + if marshalErr != nil { + return fmt.Errorf("failed to marshal error: %w", marshalErr) + } + params.Error = errJSON + } + return r.queries.MarkIngestionLogRetry(ctx, params) +} diff --git a/diode-server/gen/dbstore/postgres/ingestion_logs.sql.go b/diode-server/gen/dbstore/postgres/ingestion_logs.sql.go index 3130d9da..824bec7d 100644 --- a/diode-server/gen/dbstore/postgres/ingestion_logs.sql.go +++ b/diode-server/gen/dbstore/postgres/ingestion_logs.sql.go @@ -68,17 +68,18 @@ SET state = 8 WHERE id IN ( SELECT id FROM ingestion_logs WHERE state = 1 + OR (state = 9 AND (next_retry_at IS NULL OR next_retry_at <= NOW())) ORDER BY id LIMIT $1 FOR UPDATE SKIP LOCKED ) -RETURNING id, external_id, object_type, state, request_id, ingestion_ts, source_ts, producer_app_name, producer_app_version, sdk_name, sdk_version, entity, error, source_metadata, created_at, updated_at, entity_hash, last_seen, duplicate_count +RETURNING id, external_id, object_type, state, request_id, ingestion_ts, source_ts, producer_app_name, producer_app_version, sdk_name, sdk_version, entity, error, source_metadata, created_at, updated_at, entity_hash, last_seen, duplicate_count, retry_count, next_retry_at ` -// Claim a batch of QUEUED ingestion logs for the AutoApplyProcessor (combined -// plan+apply via /bulk-plan-apply). Transitions QUEUED (1) -> APPLYING (8). -// A row stays in APPLYING for the duration of the NetBox round-trip and is -// reset back to QUEUED on reconciler startup via ResetApplyingIngestionLogs. +// Claim a batch for the AutoApplyProcessor: fresh QUEUED (1) plus retry-eligible +// PENDING_RETRY (9) rows whose backoff has elapsed, transitioned to APPLYING (8). +// Ordered by id (not fresh-first) so a due retry is processed in line rather than +// starved behind fresh work when the queue never empties. func (q *Queries) ClaimQueuedForAutoApply(ctx context.Context, batchSize int32) ([]IngestionLog, error) { rows, err := q.db.Query(ctx, claimQueuedForAutoApply, batchSize) if err != nil { @@ -108,6 +109,8 @@ func (q *Queries) ClaimQueuedForAutoApply(ctx context.Context, batchSize int32) &i.EntityHash, &i.LastSeen, &i.DuplicateCount, + &i.RetryCount, + &i.NextRetryAt, ); err != nil { return nil, err } @@ -129,7 +132,7 @@ WHERE id IN ( LIMIT $1 FOR UPDATE SKIP LOCKED ) -RETURNING id, external_id, object_type, state, request_id, ingestion_ts, source_ts, producer_app_name, producer_app_version, sdk_name, sdk_version, entity, error, source_metadata, created_at, updated_at, entity_hash, last_seen, duplicate_count +RETURNING id, external_id, object_type, state, request_id, ingestion_ts, source_ts, producer_app_name, producer_app_version, sdk_name, sdk_version, entity, error, source_metadata, created_at, updated_at, entity_hash, last_seen, duplicate_count, retry_count, next_retry_at ` func (q *Queries) ClaimQueuedIngestionLogs(ctx context.Context, batchSize int32) ([]IngestionLog, error) { @@ -161,6 +164,8 @@ func (q *Queries) ClaimQueuedIngestionLogs(ctx context.Context, batchSize int32) &i.EntityHash, &i.LastSeen, &i.DuplicateCount, + &i.RetryCount, + &i.NextRetryAt, ); err != nil { return nil, err } @@ -207,7 +212,7 @@ const createIngestionLog = `-- name: CreateIngestionLog :one INSERT INTO ingestion_logs (external_id, object_type, state, request_id, ingestion_ts, source_ts, producer_app_name, producer_app_version, sdk_name, sdk_version, entity, source_metadata, entity_hash) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13) -RETURNING id, external_id, object_type, state, request_id, ingestion_ts, source_ts, producer_app_name, producer_app_version, sdk_name, sdk_version, entity, error, source_metadata, created_at, updated_at, entity_hash, last_seen, duplicate_count +RETURNING id, external_id, object_type, state, request_id, ingestion_ts, source_ts, producer_app_name, producer_app_version, sdk_name, sdk_version, entity, error, source_metadata, created_at, updated_at, entity_hash, last_seen, duplicate_count, retry_count, next_retry_at ` type CreateIngestionLogParams struct { @@ -263,6 +268,8 @@ func (q *Queries) CreateIngestionLog(ctx context.Context, arg CreateIngestionLog &i.EntityHash, &i.LastSeen, &i.DuplicateCount, + &i.RetryCount, + &i.NextRetryAt, ) return i, err } @@ -297,7 +304,7 @@ func (q *Queries) FindIngestionLogIDsByExternalIDs(ctx context.Context, external } const findPriorIngestionLogByEntityHash = `-- name: FindPriorIngestionLogByEntityHash :one -SELECT il.id, il.external_id, il.object_type, il.state, il.request_id, il.ingestion_ts, il.source_ts, il.producer_app_name, il.producer_app_version, il.sdk_name, il.sdk_version, il.entity, il.error, il.source_metadata, il.created_at, il.updated_at, il.entity_hash, il.last_seen, il.duplicate_count +SELECT il.id, il.external_id, il.object_type, il.state, il.request_id, il.ingestion_ts, il.source_ts, il.producer_app_name, il.producer_app_version, il.sdk_name, il.sdk_version, il.entity, il.error, il.source_metadata, il.created_at, il.updated_at, il.entity_hash, il.last_seen, il.duplicate_count, il.retry_count, il.next_retry_at FROM ingestion_logs il LEFT JOIN LATERAL ( SELECT branch_id @@ -308,6 +315,9 @@ LEFT JOIN LATERAL ( ) lcs ON true WHERE il.entity_hash = $1 AND lcs.branch_id IS NOT DISTINCT FROM $2::text + -- Exclude terminal ERRORED (7) so a re-ingest after the system gives up + -- re-queues instead of deduping against the dead row. + AND il.state IS DISTINCT FROM 7 ORDER BY il.created_at DESC LIMIT 1 ` @@ -340,15 +350,17 @@ func (q *Queries) FindPriorIngestionLogByEntityHash(ctx context.Context, arg Fin &i.EntityHash, &i.LastSeen, &i.DuplicateCount, + &i.RetryCount, + &i.NextRetryAt, ) return i, err } const findPriorIngestionLogsByEntityHashes = `-- name: FindPriorIngestionLogsByEntityHashes :many -SELECT il.id, il.external_id, il.object_type, il.state, il.request_id, il.ingestion_ts, il.source_ts, il.producer_app_name, il.producer_app_version, il.sdk_name, il.sdk_version, il.entity, il.error, il.source_metadata, il.created_at, il.updated_at, il.entity_hash, il.last_seen, il.duplicate_count +SELECT il.id, il.external_id, il.object_type, il.state, il.request_id, il.ingestion_ts, il.source_ts, il.producer_app_name, il.producer_app_version, il.sdk_name, il.sdk_version, il.entity, il.error, il.source_metadata, il.created_at, il.updated_at, il.entity_hash, il.last_seen, il.duplicate_count, il.retry_count, il.next_retry_at FROM unnest($1::text[]) AS h(entity_hash) CROSS JOIN LATERAL ( - SELECT il2.id, il2.external_id, il2.object_type, il2.state, il2.request_id, il2.ingestion_ts, il2.source_ts, il2.producer_app_name, il2.producer_app_version, il2.sdk_name, il2.sdk_version, il2.entity, il2.error, il2.source_metadata, il2.created_at, il2.updated_at, il2.entity_hash, il2.last_seen, il2.duplicate_count + SELECT il2.id, il2.external_id, il2.object_type, il2.state, il2.request_id, il2.ingestion_ts, il2.source_ts, il2.producer_app_name, il2.producer_app_version, il2.sdk_name, il2.sdk_version, il2.entity, il2.error, il2.source_metadata, il2.created_at, il2.updated_at, il2.entity_hash, il2.last_seen, il2.duplicate_count, il2.retry_count, il2.next_retry_at FROM ingestion_logs il2 WHERE il2.entity_hash = h.entity_hash AND ( @@ -358,6 +370,9 @@ CROSS JOIN LATERAL ( ORDER BY cs.id DESC LIMIT 1 ) IS NOT DISTINCT FROM $2::text + -- Exclude terminal ERRORED (7): see FindPriorIngestionLogByEntityHash. + -- Re-ingest after the retrier gives up must re-queue, not dedupe. + AND il2.state IS DISTINCT FROM 7 ORDER BY il2.created_at DESC LIMIT 1 ) il @@ -397,6 +412,8 @@ func (q *Queries) FindPriorIngestionLogsByEntityHashes(ctx context.Context, arg &i.EntityHash, &i.LastSeen, &i.DuplicateCount, + &i.RetryCount, + &i.NextRetryAt, ); err != nil { return nil, err } @@ -420,6 +437,49 @@ func (q *Queries) IncrementDuplicateCount(ctx context.Context, id int32) error { return err } +const markIngestionLogRetry = `-- name: MarkIngestionLogRetry :exec +UPDATE ingestion_logs +SET retry_count = retry_count + 1, + state = CASE + WHEN retry_count + 1 >= $1::int THEN 7 + ELSE 9 + END, + next_retry_at = CASE + WHEN retry_count + 1 >= $1::int THEN NULL + ELSE NOW() + make_interval(secs => GREATEST(1, ( + LEAST( + $2::bigint * (1::bigint << LEAST(retry_count, 30)), + $3::bigint + )::double precision * (0.5 + random() * 0.5) + )::int)) + END, + error = $4 +WHERE id = $5 +` + +type MarkIngestionLogRetryParams struct { + MaxRetries int32 `json:"max_retries"` + BaseBackoffSecs int64 `json:"base_backoff_secs"` + MaxBackoffSecs int64 `json:"max_backoff_secs"` + Error json.RawMessage `json:"error"` + ID int32 `json:"id"` +} + +// Record a failed apply: increment retry_count and either re-arm as PENDING_RETRY +// (9) with a jittered exponential backoff (base*2^n capped at max, ×random[0.5,1) +// to spread retry herds), or retire to terminal ERRORED (7) once the budget is +// spent. SET expressions read the pre-update retry_count. +func (q *Queries) MarkIngestionLogRetry(ctx context.Context, arg MarkIngestionLogRetryParams) error { + _, err := q.db.Exec(ctx, markIngestionLogRetry, + arg.MaxRetries, + arg.BaseBackoffSecs, + arg.MaxBackoffSecs, + arg.Error, + arg.ID, + ) + return err +} + const resetApplyingIngestionLogs = `-- name: ResetApplyingIngestionLogs :exec UPDATE ingestion_logs SET state = 1 @@ -434,7 +494,7 @@ func (q *Queries) ResetApplyingIngestionLogs(ctx context.Context) error { } const retrieveIngestionLogByExternalID = `-- name: RetrieveIngestionLogByExternalID :one -SELECT id, external_id, object_type, state, request_id, ingestion_ts, source_ts, producer_app_name, producer_app_version, sdk_name, sdk_version, entity, error, source_metadata, created_at, updated_at, entity_hash, last_seen, duplicate_count +SELECT id, external_id, object_type, state, request_id, ingestion_ts, source_ts, producer_app_name, producer_app_version, sdk_name, sdk_version, entity, error, source_metadata, created_at, updated_at, entity_hash, last_seen, duplicate_count, retry_count, next_retry_at FROM ingestion_logs WHERE external_id = $1 ` @@ -462,12 +522,14 @@ func (q *Queries) RetrieveIngestionLogByExternalID(ctx context.Context, external &i.EntityHash, &i.LastSeen, &i.DuplicateCount, + &i.RetryCount, + &i.NextRetryAt, ) return i, err } const retrieveIngestionLogs = `-- name: RetrieveIngestionLogs :many -SELECT id, external_id, object_type, state, request_id, ingestion_ts, source_ts, producer_app_name, producer_app_version, sdk_name, sdk_version, entity, error, source_metadata, created_at, updated_at, entity_hash, last_seen, duplicate_count +SELECT id, external_id, object_type, state, request_id, ingestion_ts, source_ts, producer_app_name, producer_app_version, sdk_name, sdk_version, entity, error, source_metadata, created_at, updated_at, entity_hash, last_seen, duplicate_count, retry_count, next_retry_at FROM ingestion_logs WHERE (state = $1 OR $1 IS NULL) AND (object_type = $2 OR $2 IS NULL) @@ -522,6 +584,8 @@ func (q *Queries) RetrieveIngestionLogs(ctx context.Context, arg RetrieveIngesti &i.EntityHash, &i.LastSeen, &i.DuplicateCount, + &i.RetryCount, + &i.NextRetryAt, ); err != nil { return nil, err } @@ -606,7 +670,7 @@ UPDATE ingestion_logs SET state = $2, error = $3 WHERE id = $1 -RETURNING id, external_id, object_type, state, request_id, ingestion_ts, source_ts, producer_app_name, producer_app_version, sdk_name, sdk_version, entity, error, source_metadata, created_at, updated_at, entity_hash, last_seen, duplicate_count +RETURNING id, external_id, object_type, state, request_id, ingestion_ts, source_ts, producer_app_name, producer_app_version, sdk_name, sdk_version, entity, error, source_metadata, created_at, updated_at, entity_hash, last_seen, duplicate_count, retry_count, next_retry_at ` type UpdateIngestionLogStateWithErrorParams struct { diff --git a/diode-server/gen/dbstore/postgres/types.go b/diode-server/gen/dbstore/postgres/types.go index bc02935d..72d1906c 100644 --- a/diode-server/gen/dbstore/postgres/types.go +++ b/diode-server/gen/dbstore/postgres/types.go @@ -109,6 +109,14 @@ type IngestionLog struct { EntityHash pgtype.Text `json:"entity_hash"` LastSeen pgtype.Timestamptz `json:"last_seen"` DuplicateCount int32 `json:"duplicate_count"` + RetryCount int32 `json:"retry_count"` + NextRetryAt pgtype.Timestamptz `json:"next_retry_at"` +} + +type IngestionLogStateObjectTypeCount struct { + State int32 `json:"state"` + ObjectType string `json:"object_type"` + N int64 `json:"n"` } type VDeviation struct { diff --git a/diode-server/gen/diode/v1/reconcilerpb/reconciler.pb.go b/diode-server/gen/diode/v1/reconcilerpb/reconciler.pb.go index a84a6254..62da8e64 100644 --- a/diode-server/gen/diode/v1/reconcilerpb/reconciler.pb.go +++ b/diode-server/gen/diode/v1/reconcilerpb/reconciler.pb.go @@ -35,6 +35,7 @@ const ( State_IGNORED State = 6 State_ERRORED State = 7 State_APPLYING State = 8 + State_PENDING_RETRY State = 9 // failed but eligible for automatic retry; reclaimed by the auto-apply processor once backoff elapses ) // Enum value maps for State. @@ -49,6 +50,7 @@ var ( 6: "IGNORED", 7: "ERRORED", 8: "APPLYING", + 9: "PENDING_RETRY", } State_value = map[string]int32{ "STATE_UNSPECIFIED": 0, @@ -60,6 +62,7 @@ var ( "IGNORED": 6, "ERRORED": 7, "APPLYING": 8, + "PENDING_RETRY": 9, } ) @@ -1186,7 +1189,7 @@ const file_diode_v1_reconciler_proto_rawDesc = "" + "\x1cRetrieveDeviationByIDRequest\x12\x0e\n" + "\x02id\x18\x01 \x01(\tR\x02id\"R\n" + "\x1dRetrieveDeviationByIDResponse\x121\n" + - "\tdeviation\x18\x01 \x01(\v2\x13.diode.v1.DeviationR\tdeviation*\x85\x01\n" + + "\tdeviation\x18\x01 \x01(\v2\x13.diode.v1.DeviationR\tdeviation*\x98\x01\n" + "\x05State\x12\x15\n" + "\x11STATE_UNSPECIFIED\x10\x00\x12\n" + "\n" + @@ -1199,7 +1202,8 @@ const file_diode_v1_reconciler_proto_rawDesc = "" + "NO_CHANGES\x10\x05\x12\v\n" + "\aIGNORED\x10\x06\x12\v\n" + "\aERRORED\x10\a\x12\f\n" + - "\bAPPLYING\x10\b2\xcd\x02\n" + + "\bAPPLYING\x10\b\x12\x11\n" + + "\rPENDING_RETRY\x10\t2\xcd\x02\n" + "\x11ReconcilerService\x12m\n" + "\x15RetrieveIngestionLogs\x12&.diode.v1.RetrieveIngestionLogsRequest\x1a'.diode.v1.RetrieveIngestionLogsResponse\"\x03\x88\x02\x01\x12_\n" + "\x12RetrieveDeviations\x12#.diode.v1.RetrieveDeviationsRequest\x1a$.diode.v1.RetrieveDeviationsResponse\x12h\n" + diff --git a/diode-server/reconciler/config.go b/diode-server/reconciler/config.go index 87fd8056..80138ac9 100644 --- a/diode-server/reconciler/config.go +++ b/diode-server/reconciler/config.go @@ -1,6 +1,8 @@ package reconciler import ( + "time" + "github.com/netboxlabs/diode/diode-server/telemetry" "github.com/netboxlabs/diode/diode-server/tls" ) @@ -42,6 +44,12 @@ type Config struct { AutoApplyProcessorBatchSize int32 `envconfig:"AUTO_APPLY_PROCESSOR_BATCH_SIZE" default:"100"` AutoApplyProcessorConcurrency int `envconfig:"AUTO_APPLY_PROCESSOR_CONCURRENCY" default:"1"` + // Automatic retry of failed applies (auto-apply mode only); see RetryPolicy. + EnableFailedRetry bool `envconfig:"ENABLE_FAILED_RETRY" default:"false"` + FailedRetryMaxRetries int32 `envconfig:"FAILED_RETRY_MAX_RETRIES" default:"5"` + FailedRetryBaseBackoffSeconds int `envconfig:"FAILED_RETRY_BASE_BACKOFF_SECONDS" default:"30"` + FailedRetryMaxBackoffSeconds int `envconfig:"FAILED_RETRY_MAX_BACKOFF_SECONDS" default:"3600"` + // Experimental EnableGraphDB bool `envconfig:"ENABLE_GRAPH_DB" default:"false"` @@ -49,3 +57,13 @@ type Config struct { // If empty, default matching rules are used EntityMatchingConfigPath string `envconfig:"ENTITY_MATCHING_CONFIG_PATH" default:""` } + +// RetryPolicy builds the retry policy from config; it is wired into Ops. +func (c Config) RetryPolicy() RetryPolicy { + return RetryPolicy{ + Enabled: c.EnableFailedRetry, + MaxRetries: c.FailedRetryMaxRetries, + BaseBackoff: time.Duration(c.FailedRetryBaseBackoffSeconds) * time.Second, + MaxBackoff: time.Duration(c.FailedRetryMaxBackoffSeconds) * time.Second, + } +} diff --git a/diode-server/reconciler/mocks/repository.go b/diode-server/reconciler/mocks/repository.go index 7ac0a26c..203a0baf 100644 --- a/diode-server/reconciler/mocks/repository.go +++ b/diode-server/reconciler/mocks/repository.go @@ -668,6 +668,57 @@ func (_c *Repository_IncrementDuplicateCount_Call) RunAndReturn(run func(context return _c } +// MarkIngestionLogRetry provides a mock function with given fields: ctx, id, maxRetries, baseBackoffSecs, maxBackoffSecs, err +func (_m *Repository) MarkIngestionLogRetry(ctx context.Context, id int32, maxRetries int32, baseBackoffSecs int64, maxBackoffSecs int64, err error) error { + ret := _m.Called(ctx, id, maxRetries, baseBackoffSecs, maxBackoffSecs, err) + + if len(ret) == 0 { + panic("no return value specified for MarkIngestionLogRetry") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, int32, int32, int64, int64, error) error); ok { + r0 = rf(ctx, id, maxRetries, baseBackoffSecs, maxBackoffSecs, err) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// Repository_MarkIngestionLogRetry_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'MarkIngestionLogRetry' +type Repository_MarkIngestionLogRetry_Call struct { + *mock.Call +} + +// MarkIngestionLogRetry is a helper method to define mock.On call +// - ctx context.Context +// - id int32 +// - maxRetries int32 +// - baseBackoffSecs int64 +// - maxBackoffSecs int64 +// - err error +func (_e *Repository_Expecter) MarkIngestionLogRetry(ctx interface{}, id interface{}, maxRetries interface{}, baseBackoffSecs interface{}, maxBackoffSecs interface{}, err interface{}) *Repository_MarkIngestionLogRetry_Call { + return &Repository_MarkIngestionLogRetry_Call{Call: _e.mock.On("MarkIngestionLogRetry", ctx, id, maxRetries, baseBackoffSecs, maxBackoffSecs, err)} +} + +func (_c *Repository_MarkIngestionLogRetry_Call) Run(run func(ctx context.Context, id int32, maxRetries int32, baseBackoffSecs int64, maxBackoffSecs int64, err error)) *Repository_MarkIngestionLogRetry_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(int32), args[2].(int32), args[3].(int64), args[4].(int64), args[5].(error)) + }) + return _c +} + +func (_c *Repository_MarkIngestionLogRetry_Call) Return(_a0 error) *Repository_MarkIngestionLogRetry_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *Repository_MarkIngestionLogRetry_Call) RunAndReturn(run func(context.Context, int32, int32, int64, int64, error) error) *Repository_MarkIngestionLogRetry_Call { + _c.Call.Return(run) + return _c +} + // ResetApplyingIngestionLogs provides a mock function with given fields: ctx func (_m *Repository) ResetApplyingIngestionLogs(ctx context.Context) error { ret := _m.Called(ctx) diff --git a/diode-server/reconciler/ops.go b/diode-server/reconciler/ops.go index 0f9c167a..bf560462 100644 --- a/diode-server/reconciler/ops.go +++ b/diode-server/reconciler/ops.go @@ -40,6 +40,35 @@ func (l *DefaultLimits) MaxChangeSetsPerIngestionLog() int32 { return 5 } +// RetryPolicy governs automatic retry of failed applies. When Enabled, a failure +// is parked in PENDING_RETRY and re-claimed by the AutoApplyProcessor after a +// jittered exponential backoff (BaseBackoff*2^attempt, capped at MaxBackoff), up +// to MaxRetries attempts before retiring to terminal ERRORED. When disabled, +// failures stay terminal FAILED. +type RetryPolicy struct { + Enabled bool + MaxRetries int32 + BaseBackoff time.Duration + MaxBackoff time.Duration +} + +// DefaultRetryPolicy applies when WithRetryPolicy is not supplied. Retry is off +// by default; the reconciler enables it via ENABLE_FAILED_RETRY. +var DefaultRetryPolicy = RetryPolicy{ + Enabled: false, + MaxRetries: 5, + BaseBackoff: 30 * time.Second, + MaxBackoff: 60 * time.Minute, +} + +// OpsOption configures an Ops. +type OpsOption func(*Ops) + +// WithRetryPolicy overrides the default failed-apply retry policy. +func WithRetryPolicy(p RetryPolicy) OpsOption { + return func(o *Ops) { o.retryPolicy = p } +} + // Ops high level operations performed during ingestion processing. // // DefaultBranch lookups are served exclusively from an in-memory cache that @@ -47,10 +76,11 @@ func (l *DefaultLimits) MaxChangeSetsPerIngestionLog() int32 { // loop, AutoApply) never do synchronous NetBox HTTP via DefaultBranch — so // a NetBox/Hydra outage cannot block Redis→inbox draining. type Ops struct { - repository Repository - nbClient netboxdiodeplugin.NetBoxAPI - logger *slog.Logger - limits Limits + repository Repository + nbClient netboxdiodeplugin.NetBoxAPI + logger *slog.Logger + limits Limits + retryPolicy RetryPolicy // Default-branch state. Updated only by the background refresher. branchMu sync.RWMutex @@ -63,18 +93,23 @@ type Ops struct { // NewOps creates a new Ops. The background DefaultBranch refresher is NOT // started until Start(ctx) is called; until then DefaultBranch() returns // (nil, nil) and callers degrade to "no branch context". -func NewOps(repository Repository, nbClient netboxdiodeplugin.NetBoxAPI, logger *slog.Logger, limits Limits) *Ops { +func NewOps(repository Repository, nbClient netboxdiodeplugin.NetBoxAPI, logger *slog.Logger, limits Limits, opts ...OpsOption) *Ops { if limits == nil { limits = &DefaultLimits{} } - return &Ops{ - repository: repository, - nbClient: nbClient, - logger: logger, - limits: limits, - refreshSig: make(chan struct{}, 1), + o := &Ops{ + repository: repository, + nbClient: nbClient, + logger: logger, + limits: limits, + retryPolicy: DefaultRetryPolicy, + refreshSig: make(chan struct{}, 1), + } + for _, opt := range opts { + opt(o) } + return o } // Start launches the background default-branch refresher. It fetches once @@ -578,14 +613,12 @@ func (o *Ops) BulkPlanApply(ctx context.Context, items []ops.QueuedIngestionLog, } } - // For entities whose apply phase failed, attach the apply error message to the - // ingestion log row. BulkPersistChangeSets clears the error column when it sets - // the state, so this second pass annotates the FAILED rows with their reason. + // Apply failed: BulkPersistChangeSets cleared the error column when it set + // the state, so re-record the reason and advance retry accounting. for _, idx := range persistIndex { if results[idx].ApplyErr != nil { - changeSetErr := handleChangeSetError(results[idx].ApplyErr) - if err := o.repository.UpdateIngestionLogStateWithError(ctx, results[idx].IngestionLogID, reconcilerpb.State_FAILED, changeSetErr); err != nil { - o.logger.Warn("failed to annotate apply failure", "ingestionLogID", results[idx].IngestionLogID, "error", err) + if err := o.markForRetry(ctx, results[idx].IngestionLogID, results[idx].ApplyErr); err != nil { + o.logger.Warn("failed to record apply failure for retry", "ingestionLogID", results[idx].IngestionLogID, "error", err) } } } @@ -610,8 +643,7 @@ func (o *Ops) persistPlanApplyFailurePlaceholder(ctx context.Context, item ops.Q contextMap := map[string]any{"request_id": ingestEntity.RequestID, "object_type": ingestEntity.ObjectType} sentry.CaptureError(planErr, tags, "BulkPlanApply", contextMap) - changeSetErr := handleChangeSetError(planErr) - if err2 := o.repository.UpdateIngestionLogStateWithError(ctx, item.ID, reconcilerpb.State_FAILED, changeSetErr); err2 != nil { + if err2 := o.markForRetry(ctx, item.ID, planErr); err2 != nil { planErr = errors.Join(planErr, err2) } @@ -625,6 +657,23 @@ func (o *Ops) persistPlanApplyFailurePlaceholder(ctx context.Context, item ops.Q return ops.BulkPlanApplyResult{IngestionLogID: item.ID, ChangeSetID: id, ChangeSet: cs, PlanErr: planErr} } +// markForRetry records a failed apply per the RetryPolicy: PENDING_RETRY with +// backoff when enabled, else terminal FAILED. All failure paths route through it. +func (o *Ops) markForRetry(ctx context.Context, id int32, cause error) error { + changeSetErr := handleChangeSetError(cause) + if !o.retryPolicy.Enabled { + return o.repository.UpdateIngestionLogStateWithError(ctx, id, reconcilerpb.State_FAILED, changeSetErr) + } + return o.repository.MarkIngestionLogRetry( + ctx, + id, + o.retryPolicy.MaxRetries, + int64(o.retryPolicy.BaseBackoff.Seconds()), + int64(o.retryPolicy.MaxBackoff.Seconds()), + changeSetErr, + ) +} + func handleChangeSetError(err error) error { var changeSetErr *changeset.Error if errors.As(err, &changeSetErr) { diff --git a/diode-server/reconciler/ops_retry_test.go b/diode-server/reconciler/ops_retry_test.go new file mode 100644 index 00000000..6dabd422 --- /dev/null +++ b/diode-server/reconciler/ops_retry_test.go @@ -0,0 +1,143 @@ +package reconciler_test + +import ( + "context" + "encoding/json" + "fmt" + "log/slog" + "os" + "testing" + "time" + + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + + "github.com/netboxlabs/diode/diode-server/gen/diode/v1/diodepb" + "github.com/netboxlabs/diode/diode-server/gen/diode/v1/reconcilerpb" + "github.com/netboxlabs/diode/diode-server/netboxdiodeplugin" + pluginmocks "github.com/netboxlabs/diode/diode-server/netboxdiodeplugin/mocks" + "github.com/netboxlabs/diode/diode-server/reconciler" + "github.com/netboxlabs/diode/diode-server/reconciler/mocks" + "github.com/netboxlabs/diode/diode-server/reconciler/ops" +) + +func retryTestLogger() *slog.Logger { + return slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelError})) +} + +func retryTestItems() []ops.QueuedIngestionLog { + return []ops.QueuedIngestionLog{ + {ID: 1, IngestionLog: &reconcilerpb.IngestionLog{Entity: &diodepb.Entity{}, ObjectType: "dcim.device"}}, + } +} + +// TestBulkPlanApplyTransportErrorMarksForRetry verifies that a whole-batch +// transport failure routes every entity through MarkIngestionLogRetry with the +// configured policy (seconds derived from the RetryPolicy durations) rather than +// a bare FAILED write, and still persists a failure-placeholder change set. +func TestBulkPlanApplyTransportErrorMarksForRetry(t *testing.T) { + mockRepository := mocks.NewRepository(t) + mockNetBoxClient := pluginmocks.NewNetBoxAPI(t) + policy := reconciler.RetryPolicy{Enabled: true, MaxRetries: 7, BaseBackoff: 45 * time.Second, MaxBackoff: 10 * time.Minute} + opsInstance := reconciler.NewOps(mockRepository, mockNetBoxClient, retryTestLogger(), nil, reconciler.WithRetryPolicy(policy)) + + mockNetBoxClient.EXPECT().BulkPlanApply(mock.Anything, mock.Anything). + Return(nil, fmt.Errorf("netbox unavailable")) + + // 45s base, 600s (10m) cap -> passed to MarkIngestionLogRetry as seconds. + mockRepository.EXPECT(). + MarkIngestionLogRetry(mock.Anything, int32(1), int32(7), int64(45), int64(600), mock.Anything). + Return(nil).Once() + mockRepository.EXPECT(). + CreateChangeSet(mock.Anything, mock.Anything, int32(1)). + Return(int32Ptr(99), nil).Once() + + results := opsInstance.BulkPlanApply(context.Background(), retryTestItems(), "") + + require.Len(t, results, 1) + require.Error(t, results[0].PlanErr) +} + +// TestBulkPlanApplyApplyErrorMarksForRetry verifies that an apply-phase failure +// (plan ok, apply failed) persists the change set and then advances retry +// accounting via MarkIngestionLogRetry. +func TestBulkPlanApplyApplyErrorMarksForRetry(t *testing.T) { + mockRepository := mocks.NewRepository(t) + mockNetBoxClient := pluginmocks.NewNetBoxAPI(t) + policy := reconciler.RetryPolicy{Enabled: true, MaxRetries: 3, BaseBackoff: 30 * time.Second, MaxBackoff: time.Hour} + opsInstance := reconciler.NewOps(mockRepository, mockNetBoxClient, retryTestLogger(), nil, reconciler.WithRetryPolicy(policy)) + + resp := &netboxdiodeplugin.BulkPlanApplyResponse{ + Results: []netboxdiodeplugin.BulkPlanApplyResult{ + { + ID: "1", + ChangeSet: &netboxdiodeplugin.ChangeSet{ + ID: "cs-1", + Changes: []netboxdiodeplugin.Change{{ID: "c1", ChangeType: "update", ObjectType: "dcim.device"}}, + }, + Errors: &netboxdiodeplugin.BulkPlanApplyErrors{Apply: json.RawMessage(`"constraint violation"`)}, + }, + }, + } + mockNetBoxClient.EXPECT().BulkPlanApply(mock.Anything, mock.Anything).Return(resp, nil) + + // DefaultLimits.MaxChangeSetsPerIngestionLog() == 5. + mockRepository.EXPECT(). + BulkPersistChangeSets(mock.Anything, mock.Anything, int32(5)). + Return([]ops.BulkPersistResult{{IngestionLogID: 1, ChangeSetID: int32Ptr(5)}}, nil).Once() + mockRepository.EXPECT(). + MarkIngestionLogRetry(mock.Anything, int32(1), int32(3), int64(30), int64(3600), mock.Anything). + Return(nil).Once() + + results := opsInstance.BulkPlanApply(context.Background(), retryTestItems(), "") + + require.Len(t, results, 1) + require.NoError(t, results[0].PlanErr) + require.Error(t, results[0].ApplyErr) +} + +// TestBulkPlanApplyRetryDisabledWritesFailed verifies that with retry disabled +// (the default), a failed apply is written as terminal FAILED via +// UpdateIngestionLogStateWithError — exactly as before the feature — and +// MarkIngestionLogRetry is never called. +func TestBulkPlanApplyRetryDisabledWritesFailed(t *testing.T) { + mockRepository := mocks.NewRepository(t) + mockNetBoxClient := pluginmocks.NewNetBoxAPI(t) + // Enabled defaults to false. + policy := reconciler.RetryPolicy{MaxRetries: 5, BaseBackoff: 30 * time.Second, MaxBackoff: time.Hour} + opsInstance := reconciler.NewOps(mockRepository, mockNetBoxClient, retryTestLogger(), nil, reconciler.WithRetryPolicy(policy)) + + mockNetBoxClient.EXPECT().BulkPlanApply(mock.Anything, mock.Anything). + Return(nil, fmt.Errorf("netbox unavailable")) + + mockRepository.EXPECT(). + UpdateIngestionLogStateWithError(mock.Anything, int32(1), reconcilerpb.State_FAILED, mock.Anything). + Return(nil).Once() + mockRepository.EXPECT(). + CreateChangeSet(mock.Anything, mock.Anything, int32(1)). + Return(int32Ptr(99), nil).Once() + + results := opsInstance.BulkPlanApply(context.Background(), retryTestItems(), "") + + require.Len(t, results, 1) + require.Error(t, results[0].PlanErr) + mockRepository.AssertNotCalled(t, "MarkIngestionLogRetry") +} + +// TestConfigRetryPolicy verifies the config->RetryPolicy mapping, including the +// Enabled gate (off unless ENABLE_FAILED_RETRY is set) and seconds->durations. +func TestConfigRetryPolicy(t *testing.T) { + cfg := reconciler.Config{ + EnableFailedRetry: true, + FailedRetryMaxRetries: 9, + FailedRetryBaseBackoffSeconds: 15, + FailedRetryMaxBackoffSeconds: 1800, + } + p := cfg.RetryPolicy() + require.True(t, p.Enabled) + require.Equal(t, int32(9), p.MaxRetries) + require.Equal(t, 15*time.Second, p.BaseBackoff) + require.Equal(t, 30*time.Minute, p.MaxBackoff) + + require.False(t, reconciler.Config{}.RetryPolicy().Enabled) +} diff --git a/diode-server/reconciler/repository.go b/diode-server/reconciler/repository.go index 3b04067b..539c992b 100644 --- a/diode-server/reconciler/repository.go +++ b/diode-server/reconciler/repository.go @@ -35,4 +35,8 @@ type Repository interface { ClaimQueuedIngestionLogs(ctx context.Context, batchSize int32) ([]ops.QueuedIngestionLog, error) ClaimQueuedForAutoApply(ctx context.Context, batchSize int32) ([]ops.QueuedIngestionLog, error) ResetApplyingIngestionLogs(ctx context.Context) error + + // MarkIngestionLogRetry records a failed apply (PENDING_RETRY + backoff, or + // ERRORED once the budget is exhausted); see reconciler.RetryPolicy. + MarkIngestionLogRetry(ctx context.Context, id int32, maxRetries int32, baseBackoffSecs int64, maxBackoffSecs int64, err error) error } diff --git a/docs/diode-proto.md b/docs/diode-proto.md index cfd2a997..33050bb3 100644 --- a/docs/diode-proto.md +++ b/docs/diode-proto.md @@ -5364,6 +5364,7 @@ The response from the retrieve ingestion logs request | IGNORED | 6 | | | ERRORED | 7 | | | APPLYING | 8 | | +| PENDING_RETRY | 9 | failed but eligible for automatic retry; reclaimed by the auto-apply processor once backoff elapses | From 80dae054d7c9bd0ef08ff70309824963e72c5419 Mon Sep 17 00:00:00 2001 From: Micah Parks <66095735+MicahParks@users.noreply.github.com> Date: Thu, 28 May 2026 18:28:11 -0400 Subject: [PATCH 2/2] Remove comment --- diode-proto/diode/v1/reconciler.proto | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/diode-proto/diode/v1/reconciler.proto b/diode-proto/diode/v1/reconciler.proto index 5eed8d61..a2d9d4fd 100644 --- a/diode-proto/diode/v1/reconciler.proto +++ b/diode-proto/diode/v1/reconciler.proto @@ -17,7 +17,7 @@ enum State { IGNORED = 6; ERRORED = 7; APPLYING = 8; - PENDING_RETRY = 9; // failed but eligible for automatic retry; reclaimed by the auto-apply processor once backoff elapses + PENDING_RETRY = 9; } // Ingestion metrics