Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions diode-proto/diode/v1/reconciler.proto
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ enum State {
IGNORED = 6;
ERRORED = 7;
APPLYING = 8;
PENDING_RETRY = 9;
}

// Ingestion metrics
Expand Down
4 changes: 3 additions & 1 deletion diode-server/cmd/reconciler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ func main() {
os.Exit(1)
}

ops := reconciler.NewOps(repository, nbClient, s.Logger(), nil)
ops := reconciler.NewOps(repository, nbClient, s.Logger(), nil, reconciler.WithRetryPolicy(cfg.RetryPolicy()))
ops.Start(ctx)

// Build processor options
Expand Down Expand Up @@ -245,6 +245,8 @@ func main() {
if err := repository.ResetApplyingIngestionLogs(ctx); err != nil {
s.Logger().Error("failed to reset applying ingestion logs", "error", err)
}
// AutoApplyProcessor also re-drives due PENDING_RETRY rows (gated by
// ENABLE_FAILED_RETRY via the RetryPolicy on ops); inert when retry is off.
autoApplyProcessor := reconciler.NewAutoApplyProcessor(s.Logger(), cfg, repository, ops, metricRecorder, backpressure)
if err := s.RegisterComponent(autoApplyProcessor); err != nil {
s.Logger().Error("failed to register auto-apply processor", "error", err)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
-- +goose Up

-- Retry accounting for automatic retry of failed applies.
-- Both statements use IF NOT EXISTS, so re-running Up against a database that
-- already has the columns or the index changes nothing.
--   retry_count   - failed apply attempts for this row. NOT NULL DEFAULT 0, so
--                   rows that exist before the migration start at zero.
--   next_retry_at - earliest re-claim time. Nullable, and NULL means eligible now.
ALTER TABLE ingestion_logs
ADD COLUMN IF NOT EXISTS retry_count INTEGER NOT NULL DEFAULT 0,
ADD COLUMN IF NOT EXISTS next_retry_at TIMESTAMP WITH TIME ZONE;

-- Partial index for the due-retry claim predicate (PENDING_RETRY by due time).
-- The literal 9 is the numeric value of State.PENDING_RETRY and must be kept in
-- step with the State enum, because the index only covers rows in that state.
CREATE INDEX IF NOT EXISTS idx_ingestion_logs_pending_retry
ON ingestion_logs (next_retry_at) WHERE state = 9;

-- +goose Down

-- Reverse of Up: drop the index first, then the two columns it depends on.
-- NOTE(review): Down leaves the state column untouched, so any row still in
-- state 9 keeps that value after rollback - confirm that is acceptable for the
-- binary version being rolled back to.
DROP INDEX IF EXISTS idx_ingestion_logs_pending_retry;
ALTER TABLE ingestion_logs
DROP COLUMN IF EXISTS next_retry_at,
DROP COLUMN IF EXISTS retry_count;
38 changes: 34 additions & 4 deletions diode-server/dbstore/postgres/queries/ingestion_logs.sql
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ LEFT JOIN LATERAL (
) lcs ON true
WHERE il.entity_hash = sqlc.arg('entity_hash')
AND lcs.branch_id IS NOT DISTINCT FROM sqlc.narg('branch_id')::text
-- Exclude terminal ERRORED (7) so a re-ingest after the system gives up
-- re-queues instead of deduping against the dead row.
AND il.state IS DISTINCT FROM 7
ORDER BY il.created_at DESC
LIMIT 1;

Expand All @@ -78,6 +81,9 @@ CROSS JOIN LATERAL (
ORDER BY cs.id DESC
LIMIT 1
) IS NOT DISTINCT FROM sqlc.narg('branch_id')::text
-- Exclude terminal ERRORED (7): see FindPriorIngestionLogByEntityHash.
-- Re-ingest after the retrier gives up must re-queue, not dedupe.
AND il2.state IS DISTINCT FROM 7
ORDER BY il2.created_at DESC
LIMIT 1
) il;
Expand Down Expand Up @@ -120,15 +126,16 @@ WHERE id IN (
RETURNING *;

-- name: ClaimQueuedForAutoApply :many
-- Claim a batch of QUEUED ingestion logs for the AutoApplyProcessor (combined
-- plan+apply via /bulk-plan-apply). Transitions QUEUED (1) -> APPLYING (8).
-- A row stays in APPLYING for the duration of the NetBox round-trip and is
-- reset back to QUEUED on reconciler startup via ResetApplyingIngestionLogs.
-- Claim a batch for the AutoApplyProcessor: fresh QUEUED (1) plus retry-eligible
-- PENDING_RETRY (9) rows whose backoff has elapsed, transitioned to APPLYING (8).
-- Ordered by id (not fresh-first) so a due retry is processed in line rather than
-- starved behind fresh work when the queue never empties.
UPDATE ingestion_logs
SET state = 8
WHERE id IN (
SELECT id FROM ingestion_logs
WHERE state = 1
OR (state = 9 AND (next_retry_at IS NULL OR next_retry_at <= NOW()))
ORDER BY id
LIMIT sqlc.arg('batch_size')
FOR UPDATE SKIP LOCKED
Expand All @@ -141,3 +148,26 @@ RETURNING *;
UPDATE ingestion_logs
SET state = 1
WHERE state = 8;

-- name: MarkIngestionLogRetry :exec
-- Record a failed apply for one ingestion log: increment retry_count and either
-- re-arm the row as PENDING_RETRY (9) with a jittered exponential backoff, or
-- retire it to terminal ERRORED (7) once the retry budget is spent.
--
-- Arguments:
--   id                - row to update.
--   max_retries       - budget of failed attempts. The row goes terminal as soon
--                       as the incremented count reaches it
--                       (retry_count + 1 >= max_retries).
--   base_backoff_secs - delay before jitter for the first retry, doubled for
--                       every failure already recorded on the row.
--   max_backoff_secs  - cap applied to the delay before jitter.
--   error             - written to the error column as given, replacing any
--                       previous value.
--
-- Backoff: LEAST(base * 2^n, max) * random[0.5, 1), where n is the pre-update
-- retry_count. The jitter spreads retry herds. n is clamped to 30 before the
-- shift so the bigint multiplier stays bounded, and GREATEST floors the final
-- delay at 1 second. Terminal rows get next_retry_at = NULL.
--
-- Every SET expression reads the pre-update retry_count, which is why the CASE
-- arms test retry_count + 1 rather than retry_count.
UPDATE ingestion_logs
SET retry_count = retry_count + 1,
state = CASE
WHEN retry_count + 1 >= sqlc.arg('max_retries')::int THEN 7
ELSE 9
END,
next_retry_at = CASE
WHEN retry_count + 1 >= sqlc.arg('max_retries')::int THEN NULL
ELSE NOW() + make_interval(secs => GREATEST(1, (
LEAST(
sqlc.arg('base_backoff_secs')::bigint * (1::bigint << LEAST(retry_count, 30)),
sqlc.arg('max_backoff_secs')::bigint
)::double precision * (0.5 + random() * 0.5)
)::int))
END,
error = sqlc.arg('error')
WHERE id = sqlc.arg('id');
24 changes: 24 additions & 0 deletions diode-server/dbstore/postgres/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -785,3 +785,27 @@ func (r *Repository) ClaimQueuedForAutoApply(ctx context.Context, batchSize int3
func (r *Repository) ResetApplyingIngestionLogs(ctx context.Context) error {
// Thin pass-through to the generated query of the same name; its error is
// returned unchanged. Presumably that query moves APPLYING (8) rows back to
// QUEUED (1) - verify against the query definition in ingestion_logs.sql.
return r.queries.ResetApplyingIngestionLogs(ctx)
}

// MarkIngestionLogRetry records one failed bulk-plan-apply attempt against the
// ingestion log identified by id.
//
// The work is delegated to the MarkIngestionLogRetry query, which bumps
// retry_count and, in the same statement, either re-arms the row as
// PENDING_RETRY with a jittered exponential backoff or retires it to terminal
// ERRORED once maxRetries is reached. baseBackoffSecs and maxBackoffSecs are
// passed straight through as the backoff base and cap, in seconds.
//
// When err is non-nil it is JSON-encoded and sent as the row's error payload;
// when err is nil the Error parameter is left at its zero value. A failure to
// encode err is returned without touching the database.
//
// NOTE(review): err is encoded with json.Marshal, so an error value with no
// exported fields and no MarshalJSON method serializes as {} - confirm callers
// pass a JSON-serializable error type.
func (r *Repository) MarkIngestionLogRetry(ctx context.Context, id int32, maxRetries int32, baseBackoffSecs int64, maxBackoffSecs int64, err error) error {
	args := postgres.MarkIngestionLogRetryParams{
		ID:              id,
		MaxRetries:      maxRetries,
		BaseBackoffSecs: baseBackoffSecs,
		MaxBackoffSecs:  maxBackoffSecs,
	}

	// Nothing to serialize: issue the update with the Error field unset.
	if err == nil {
		return r.queries.MarkIngestionLogRetry(ctx, args)
	}

	payload, encodeErr := json.Marshal(err)
	if encodeErr != nil {
		return fmt.Errorf("failed to marshal error: %w", encodeErr)
	}
	args.Error = payload

	return r.queries.MarkIngestionLogRetry(ctx, args)
}
90 changes: 77 additions & 13 deletions diode-server/gen/dbstore/postgres/ingestion_logs.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions diode-server/gen/dbstore/postgres/types.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading