From 8ff93836bcbd2018e9034fb2202ba17b8e40cf5e Mon Sep 17 00:00:00 2001
From: Ankur Shrivastava
Date: Sat, 25 Apr 2026 22:34:35 +0800
Subject: [PATCH 1/9] =?UTF-8?q?feat:=20address=20user=20feedback=20?=
=?UTF-8?q?=E2=80=94=20zombie=20cleanup,=20ErrSkipTick,=20config=20getters?=
=?UTF-8?q?,=20docs?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
- Zombie children: lazily prune stopped children via done channel on
GetChildren/GetChild/GetChildCount — no manual cleanup needed
- ErrSkipTick: sentinel for periodic handlers to skip a tick without
triggering restart (transient failures like DB timeouts)
- WorkerInfo.GetHandler(): expose handler for middleware type assertion,
enabling the handler-as-metadata pattern
- Worker config getters: GetInterval, GetRestartOnFail, GetJitterPercent,
GetInitialDelay for reconciler introspection
- GetChildCount(): efficient child count without allocating a sorted slice
- WithSkipOnNotAcquired: convenience LockOption for log-and-skip pattern
- Docs: error semantics decision table, interceptor level guidance,
handler-as-metadata reconciler example, Locker compatibility note
---
README.md | 294 ++++++++++++++++++++++++++++++++++------
example_test.go | 80 +++++++++++
helpers.go | 5 +-
helpers_test.go | 39 ++++++
middleware/README.md | 26 +++-
middleware/lock.go | 21 +++
middleware/lock_test.go | 31 +++++
run.go | 30 ++--
run_test.go | 34 +++++
worker.go | 95 ++++++++++++-
worker_test.go | 124 +++++++++++++++++
11 files changed, 715 insertions(+), 64 deletions(-)
diff --git a/README.md b/README.md
index 4dd5f3a..2197038 100644
--- a/README.md
+++ b/README.md
@@ -53,6 +53,22 @@ For periodic workers \(with [Worker.Every](<#Worker.Every>)\): the handler runs
Returning nil from a non\-periodic handler stops the worker permanently, even with restart enabled. Use [ErrDoNotRestart](<#ErrDoNotRestart>) for explicit permanent completion from periodic handlers.
+### Error Semantics for Periodic Handlers
+
+The return value from a periodic handler determines what happens next:
+
+```
+| Return value | Timer loop | Restart? | Use case |
+|-----------------|------------|----------|--------------------------|
+| nil | continues | n/a | Success, next tick fires |
+| ErrSkipTick | continues | n/a | Transient failure, skip |
+| ErrDoNotRestart | exits | no | Permanent completion |
+| other error | exits | yes* | Failure, needs restart |
+| ctx.Err() | exits | no | Graceful shutdown |
+```
+
+\*Only if [Worker.WithRestart](<#Worker.WithRestart>)\(true\) \(the default\).
+
### Middleware
Cross\-cutting concerns like tracing, logging, and panic recovery are implemented as [Middleware](<#Middleware>). The middleware chain follows the gRPC interceptor convention: a flat function that calls next to continue:
@@ -68,6 +84,8 @@ func myMiddleware(ctx context.Context, info *workers.WorkerInfo, next workers.Cy
Attach middleware per\-worker via [Worker.Interceptors](<#Worker.Interceptors>) or per\-run via [WithInterceptors](<#WithInterceptors>). Built\-in middleware is available in the middleware/ sub\-package.
+Run\-level interceptors \([WithInterceptors](<#WithInterceptors>)\) wrap all workers and are best for cross\-cutting defaults \(tracing, logging, panic recovery\). Worker\-level interceptors \([Worker.Interceptors](<#Worker.Interceptors>)\) are best for worker\-specific concerns \(distributed locks with per\-worker TTL, rate limiting\). Children inherit run\-level interceptors but not the parent's worker\-level interceptors.
+
### Helpers
Common patterns are provided as helpers:
@@ -171,6 +189,106 @@ pool shut down
+Example (Reconciler With Change Detection)
+
+
+Demonstrates config\-driven reconciliation with change detection using the handler\-as\-metadata pattern. The handler struct carries a config version that the reconciler inspects via GetChild\(\).GetHandler\(\) type assertion, eliminating the need for a parallel tracking map.
+
+```go
+package main
+
+import (
+ "context"
+ "fmt"
+ "time"
+
+ "github.com/go-coldbrew/workers"
+)
+
+// solverHandler is used in Example_reconcilerWithChangeDetection to
+// demonstrate the handler-as-metadata pattern.
+type solverHandler struct {
+ version int
+}
+
+func (h *solverHandler) RunCycle(ctx context.Context, _ *workers.WorkerInfo) error {
+ <-ctx.Done()
+ return ctx.Err()
+}
+
+func (h *solverHandler) Close() error { return nil }
+
+func main() {
+ type solverConfig struct {
+ version int
+ name string
+ }
+
+ // Simulate config that changes over 3 ticks.
+ configs := []map[string]solverConfig{
+ {"a": {version: 1, name: "a"}},
+ {"a": {version: 1, name: "a"}, "b": {version: 1, name: "b"}},
+ {"a": {version: 2, name: "a"}, "b": {version: 1, name: "b"}}, // a gets new version
+ }
+
+ tick := 0
+ manager := workers.NewWorker("reconciler").HandlerFunc(func(ctx context.Context, info *workers.WorkerInfo) error {
+ ticker := time.NewTicker(40 * time.Millisecond)
+ defer ticker.Stop()
+ for {
+ select {
+ case <-ctx.Done():
+ return ctx.Err()
+ case <-ticker.C:
+ if tick >= len(configs) {
+ continue
+ }
+ desired := configs[tick]
+ tick++
+
+ // Remove workers no longer desired.
+ for _, name := range info.GetChildren() {
+ if _, ok := desired[name]; !ok {
+ info.Remove(name)
+ }
+ }
+
+ // Add new or replace changed workers.
+ for key, cfg := range desired {
+ child, exists := info.GetChild(key)
+ if exists {
+ // Check if config changed via handler type assertion.
+ if h, ok := child.GetHandler().(*solverHandler); ok && h.version == cfg.version {
+ continue // unchanged, skip
+ }
+ info.Remove(key) // config changed, replace
+ }
+ info.Add(workers.NewWorker(key).Handler(&solverHandler{version: cfg.version}))
+ }
+ time.Sleep(10 * time.Millisecond)
+ fmt.Printf("tick %d: children=%v count=%d\n", tick, info.GetChildren(), info.GetChildCount())
+ }
+ }
+ })
+
+ ctx, cancel := context.WithTimeout(context.Background(), 250*time.Millisecond)
+ defer cancel()
+
+ workers.Run(ctx, []*workers.Worker{manager})
+}
+```
+
+#### Output
+
+```
+tick 1: children=[a] count=1
+tick 2: children=[a b] count=2
+tick 3: children=[a b] count=2
+```
+
+
+
+
Example (Standalone)
@@ -247,7 +365,11 @@ shutdown complete
- [func \(w \*Worker\) AddInterceptors\(mw ...Middleware\) \*Worker](<#Worker.AddInterceptors>)
- [func \(w \*Worker\) Every\(d time.Duration\) \*Worker](<#Worker.Every>)
- [func \(w \*Worker\) GetHandler\(\) CycleHandler](<#Worker.GetHandler>)
+ - [func \(w \*Worker\) GetInitialDelay\(\) time.Duration](<#Worker.GetInitialDelay>)
+ - [func \(w \*Worker\) GetInterval\(\) time.Duration](<#Worker.GetInterval>)
+ - [func \(w \*Worker\) GetJitterPercent\(\) int](<#Worker.GetJitterPercent>)
- [func \(w \*Worker\) GetName\(\) string](<#Worker.GetName>)
+ - [func \(w \*Worker\) GetRestartOnFail\(\) bool](<#Worker.GetRestartOnFail>)
- [func \(w \*Worker\) Handler\(h CycleHandler\) \*Worker](<#Worker.Handler>)
- [func \(w \*Worker\) HandlerFunc\(fn CycleFunc\) \*Worker](<#Worker.HandlerFunc>)
- [func \(w \*Worker\) Interceptors\(mw ...Middleware\) \*Worker](<#Worker.Interceptors>)
@@ -265,11 +387,14 @@ shutdown complete
- [func \(info \*WorkerInfo\) Add\(w \*Worker\) bool](<#WorkerInfo.Add>)
- [func \(info \*WorkerInfo\) GetAttempt\(\) int](<#WorkerInfo.GetAttempt>)
- [func \(info \*WorkerInfo\) GetChild\(name string\) \(Worker, bool\)](<#WorkerInfo.GetChild>)
+ - [func \(info \*WorkerInfo\) GetChildCount\(\) int](<#WorkerInfo.GetChildCount>)
- [func \(info \*WorkerInfo\) GetChildren\(\) \[\]string](<#WorkerInfo.GetChildren>)
+ - [func \(info \*WorkerInfo\) GetHandler\(\) CycleHandler](<#WorkerInfo.GetHandler>)
- [func \(info \*WorkerInfo\) GetName\(\) string](<#WorkerInfo.GetName>)
- [func \(info \*WorkerInfo\) Remove\(name string\)](<#WorkerInfo.Remove>)
- [type WorkerInfoOption](<#WorkerInfoOption>)
- [func WithTestChildren\(ctx context.Context\) WorkerInfoOption](<#WithTestChildren>)
+ - [func WithTestHandler\(h CycleHandler\) WorkerInfoOption](<#WithTestHandler>)
## Variables
@@ -280,8 +405,14 @@ shutdown complete
var ErrDoNotRestart = suture.ErrDoNotRestart
```
+ErrSkipTick can be returned from a periodic handler to skip the current tick without triggering a restart. The timer continues and the next tick fires normally. Errors that wrap ErrSkipTick \(e.g. via fmt.Errorf with %w\) are treated the same way. Only meaningful for periodic workers \(with [Worker.Every](<#Worker.Every>)\).
+
+```go
+var ErrSkipTick = errors.New("workers: skip tick")
+```
+
-## func [Run]()
+## func [Run]()
```go
func Run(ctx context.Context, workers []*Worker, opts ...RunOption) error
@@ -337,7 +468,7 @@ all workers stopped
-## func [RunWorker]()
+## func [RunWorker]()
```go
func RunWorker(ctx context.Context, w *Worker, opts ...RunOption)
@@ -470,7 +601,7 @@ func (BaseMetrics) WorkerStopped(string)
-## type [CycleFunc]()
+## type [CycleFunc]()
CycleFunc adapts a plain function into a [CycleHandler](<#CycleHandler>). Close is a no\-op — use this for simple, stateless handlers.
@@ -479,7 +610,7 @@ type CycleFunc func(ctx context.Context, info *WorkerInfo) error
```
-### func [BatchChannelWorker]()
+### func [BatchChannelWorker]()
```go
func BatchChannelWorker[T any](ch <-chan T, maxSize int, maxDelay time.Duration, fn func(ctx context.Context, info *WorkerInfo, batch []T) error) CycleFunc
@@ -534,7 +665,7 @@ func main() {
-### func [ChannelWorker]()
+### func [ChannelWorker]()
```go
func ChannelWorker[T any](ch <-chan T, fn func(ctx context.Context, info *WorkerInfo, item T) error) CycleFunc
@@ -590,7 +721,7 @@ world
-### func [EveryInterval]()
+### func [EveryInterval]()
```go
func EveryInterval(d time.Duration, fn CycleFunc) CycleFunc
@@ -642,7 +773,7 @@ tick 2
-### func \(CycleFunc\) [Close]()
+### func \(CycleFunc\) [Close]()
```go
func (fn CycleFunc) Close() error
@@ -651,7 +782,7 @@ func (fn CycleFunc) Close() error
Close is a no\-op for CycleFunc.
-### func \(CycleFunc\) [RunCycle]()
+### func \(CycleFunc\) [RunCycle]()
```go
func (fn CycleFunc) RunCycle(ctx context.Context, info *WorkerInfo) error
@@ -660,7 +791,7 @@ func (fn CycleFunc) RunCycle(ctx context.Context, info *WorkerInfo) error
-## type [CycleHandler]()
+## type [CycleHandler]()
CycleHandler handles worker execution cycles. For periodic workers, RunCycle is called once per tick. Close is called once when the worker stops, allowing cleanup of resources.
@@ -701,7 +832,7 @@ func NewPrometheusMetrics(namespace string) Metrics
NewPrometheusMetrics creates a Metrics implementation backed by Prometheus. The namespace is prepended to all metric names \(e.g., "myapp" → "myapp\_worker\_started\_total"\). Metrics are auto\-registered with the default Prometheus registry. Safe to call multiple times with the same namespace — returns the cached instance. The cache is process\-global; use a small number of static namespaces \(not per\-request/tenant values\).
-## type [Middleware]()
+## type [Middleware]()
Middleware intercepts each execution cycle. Call next to continue the chain. Matches gRPC interceptor convention.
@@ -710,7 +841,7 @@ type Middleware func(ctx context.Context, info *WorkerInfo, next CycleFunc) erro
```
-## type [RunOption]()
+## type [RunOption]()
RunOption configures the behavior of [Run](<#Run>).
@@ -719,7 +850,7 @@ type RunOption func(*runConfig)
```
-### func [AddInterceptors]()
+### func [AddInterceptors]()
```go
func AddInterceptors(mw ...Middleware) RunOption
@@ -728,7 +859,7 @@ func AddInterceptors(mw ...Middleware) RunOption
AddInterceptors appends to the run\-level interceptor list.
-### func [WithDefaultJitter]()
+### func [WithDefaultJitter]()
```go
func WithDefaultJitter(percent int) RunOption
@@ -737,7 +868,7 @@ func WithDefaultJitter(percent int) RunOption
WithDefaultJitter sets a run\-level default jitter percentage for all periodic workers. Worker\-level [Worker.WithJitter](<#Worker.WithJitter>) takes precedence. Setting Worker.WithJitter\(0\) disables jitter for a specific worker even when a run\-level default is set.
-### func [WithInterceptors]()
+### func [WithInterceptors]()
```go
func WithInterceptors(mw ...Middleware) RunOption
@@ -746,7 +877,7 @@ func WithInterceptors(mw ...Middleware) RunOption
WithInterceptors replaces the run\-level interceptor list. Run\-level interceptors wrap outside worker\-level interceptors.
-### func [WithMetrics]()
+### func [WithMetrics]()
```go
func WithMetrics(m Metrics) RunOption
@@ -755,7 +886,7 @@ func WithMetrics(m Metrics) RunOption
WithMetrics sets the metrics implementation for all workers started by [Run](<#Run>). Workers inherit this unless they override via [Worker.WithMetrics](<#Worker.WithMetrics>). If not set, [BaseMetrics](<#BaseMetrics>) is used.
-## type [Worker]()
+## type [Worker]()
Worker represents a background goroutine managed by the framework. Create with [NewWorker](<#NewWorker>) and configure with builder methods.
@@ -766,7 +897,7 @@ type Worker struct {
```
-### func [NewWorker]()
+### func [NewWorker]()
```go
func NewWorker(name string) *Worker
@@ -814,7 +945,7 @@ worker "greeter" started (attempt 0)
-### func \(\*Worker\) [AddInterceptors]()
+### func \(\*Worker\) [AddInterceptors]()
```go
func (w *Worker) AddInterceptors(mw ...Middleware) *Worker
@@ -823,7 +954,7 @@ func (w *Worker) AddInterceptors(mw ...Middleware) *Worker
AddInterceptors appends to the worker\-level interceptor list.
-### func \(\*Worker\) [Every]()
+### func \(\*Worker\) [Every]()
```go
func (w *Worker) Every(d time.Duration) *Worker
@@ -873,7 +1004,7 @@ tick 2
-### func \(\*Worker\) [GetHandler]()
+### func \(\*Worker\) [GetHandler]()
```go
func (w *Worker) GetHandler() CycleHandler
@@ -881,8 +1012,35 @@ func (w *Worker) GetHandler() CycleHandler
GetHandler returns the worker's [CycleHandler](<#CycleHandler>), or nil if not set.
+
+### func \(\*Worker\) [GetInitialDelay]()
+
+```go
+func (w *Worker) GetInitialDelay() time.Duration
+```
+
+GetInitialDelay returns the initial delay before the first tick, or 0 if not set.
+
+
+### func \(\*Worker\) [GetInterval]()
+
+```go
+func (w *Worker) GetInterval() time.Duration
+```
+
+GetInterval returns the periodic interval, or 0 if this is not a periodic worker.
+
+
+### func \(\*Worker\) [GetJitterPercent]()
+
+```go
+func (w *Worker) GetJitterPercent() int
+```
+
+GetJitterPercent returns the jitter percentage. \-1 means inherit run\-level default, 0 means no jitter.
+
-### func \(\*Worker\) [GetName]()
+### func \(\*Worker\) [GetName]()
```go
func (w *Worker) GetName() string
@@ -890,8 +1048,17 @@ func (w *Worker) GetName() string
GetName returns the worker's name.
+
+### func \(\*Worker\) [GetRestartOnFail]()
+
+```go
+func (w *Worker) GetRestartOnFail() bool
+```
+
+GetRestartOnFail returns whether the worker restarts on failure.
+
-### func \(\*Worker\) [Handler]()
+### func \(\*Worker\) [Handler]()
```go
func (w *Worker) Handler(h CycleHandler) *Worker
@@ -900,7 +1067,7 @@ func (w *Worker) Handler(h CycleHandler) *Worker
Handler sets the worker's [CycleHandler](<#CycleHandler>). Use this for handlers that need cleanup via Close \(e.g., database connections, leases\).
-### func \(\*Worker\) [HandlerFunc]()
+### func \(\*Worker\) [HandlerFunc]()
```go
func (w *Worker) HandlerFunc(fn CycleFunc) *Worker
@@ -909,7 +1076,7 @@ func (w *Worker) HandlerFunc(fn CycleFunc) *Worker
HandlerFunc sets the worker's handler from a plain function. This is the common case for simple, stateless workers.
-### func \(\*Worker\) [Interceptors]()
+### func \(\*Worker\) [Interceptors]()
```go
func (w *Worker) Interceptors(mw ...Middleware) *Worker
@@ -968,7 +1135,7 @@ func main() {
-### func \(\*Worker\) [WithBackoffJitter]()
+### func \(\*Worker\) [WithBackoffJitter]()
```go
func (w *Worker) WithBackoffJitter(jitter func(time.Duration) time.Duration) *Worker
@@ -977,7 +1144,7 @@ func (w *Worker) WithBackoffJitter(jitter func(time.Duration) time.Duration) *Wo
WithBackoffJitter adds random jitter to the backoff duration to prevent thundering herd on coordinated restarts. The function receives the base backoff duration and returns a jittered duration.
-### func \(\*Worker\) [WithFailureBackoff]()
+### func \(\*Worker\) [WithFailureBackoff]()
```go
func (w *Worker) WithFailureBackoff(d time.Duration) *Worker
@@ -986,7 +1153,7 @@ func (w *Worker) WithFailureBackoff(d time.Duration) *Worker
WithFailureBackoff sets the duration to wait between restarts. Suture default is 15 seconds.
-### func \(\*Worker\) [WithFailureDecay]()
+### func \(\*Worker\) [WithFailureDecay]()
```go
func (w *Worker) WithFailureDecay(decay float64) *Worker
@@ -995,7 +1162,7 @@ func (w *Worker) WithFailureDecay(decay float64) *Worker
WithFailureDecay sets the rate at which failure count decays over time. A value of 1.0 means failures decay by one per second. Suture default is 1.0.
-### func \(\*Worker\) [WithFailureThreshold]()
+### func \(\*Worker\) [WithFailureThreshold]()
```go
func (w *Worker) WithFailureThreshold(threshold float64) *Worker
@@ -1004,7 +1171,7 @@ func (w *Worker) WithFailureThreshold(threshold float64) *Worker
WithFailureThreshold sets the number of failures allowed before the supervisor gives up restarting. Suture default is 5.
-### func \(\*Worker\) [WithInitialDelay]()
+### func \(\*Worker\) [WithInitialDelay]()
```go
func (w *Worker) WithInitialDelay(d time.Duration) *Worker
@@ -1013,7 +1180,7 @@ func (w *Worker) WithInitialDelay(d time.Duration) *Worker
WithInitialDelay delays the first tick to stagger startup. Requires [Worker.Every](<#Worker.Every>).
-### func \(\*Worker\) [WithJitter]()
+### func \(\*Worker\) [WithJitter]()
```go
func (w *Worker) WithJitter(percent int) *Worker
@@ -1022,7 +1189,7 @@ func (w *Worker) WithJitter(percent int) *Worker
WithJitter sets per\-worker jitter as a percentage of the base interval. Each tick is randomized within ±percent of the base. Requires [Worker.Every](<#Worker.Every>). Setting WithJitter\(0\) explicitly disables jitter even when a run\-level default is set via [WithDefaultJitter](<#WithDefaultJitter>).
-### func \(\*Worker\) [WithMetrics]()
+### func \(\*Worker\) [WithMetrics]()
```go
func (w *Worker) WithMetrics(m Metrics) *Worker
@@ -1031,7 +1198,7 @@ func (w *Worker) WithMetrics(m Metrics) *Worker
WithMetrics sets a per\-worker metrics implementation, overriding the metrics inherited from the parent [WorkerInfo](<#WorkerInfo>) or [Run](<#Run>) options.
-### func \(\*Worker\) [WithRestart]()
+### func \(\*Worker\) [WithRestart]()
```go
func (w *Worker) WithRestart(restart bool) *Worker
@@ -1080,7 +1247,7 @@ func main() {
-### func \(\*Worker\) [WithTimeout]()
+### func \(\*Worker\) [WithTimeout]()
```go
func (w *Worker) WithTimeout(d time.Duration) *Worker
@@ -1089,7 +1256,7 @@ func (w *Worker) WithTimeout(d time.Duration) *Worker
WithTimeout sets the maximum time to wait for the worker to stop during graceful shutdown. Suture default is 10 seconds.
-## type [WorkerInfo]()
+## type [WorkerInfo]()
WorkerInfo carries worker metadata and child management. The framework always creates it — it is never nil. context.Context handles cancellation/deadlines/values; WorkerInfo handles everything worker\-specific.
@@ -1100,7 +1267,7 @@ type WorkerInfo struct {
```
-### func [NewWorkerInfo]()
+### func [NewWorkerInfo]()
```go
func NewWorkerInfo(name string, attempt int, opts ...WorkerInfoOption) *WorkerInfo
@@ -1111,7 +1278,7 @@ NewWorkerInfo creates a [WorkerInfo](<#WorkerInfo>) with the given name and atte
Use [WithTestChildren](<#WithTestChildren>) to enable Add/Remove/GetChildren in tests.
-### func \(\*WorkerInfo\) [Add]()
+### func \(\*WorkerInfo\) [Add]()
```go
func (info *WorkerInfo) Add(w *Worker) bool
@@ -1123,6 +1290,8 @@ Note: Remove \+ Add is not atomic — there is a brief window where the worker i
Children inherit run\-level interceptors, metrics \(unless overridden via [Worker.WithMetrics](<#Worker.WithMetrics>)\), and scoped lifecycle — when this worker stops, all its children stop too.
+When a child permanently stops, it is automatically removed from the children map on the next call to [WorkerInfo.GetChildren](<#WorkerInfo.GetChildren>), [WorkerInfo.GetChild](<#WorkerInfo.GetChild>), or [WorkerInfo.GetChildCount](<#WorkerInfo.GetChildCount>).
+
Example
@@ -1242,7 +1411,7 @@ processor v2
-### func \(\*WorkerInfo\) [GetAttempt]()
+### func \(\*WorkerInfo\) [GetAttempt]()
```go
func (info *WorkerInfo) GetAttempt() int
@@ -1251,25 +1420,51 @@ func (info *WorkerInfo) GetAttempt() int
GetAttempt returns the restart attempt number \(0 on first run\).
-### func \(\*WorkerInfo\) [GetChild]()
+### func \(\*WorkerInfo\) [GetChild]()
```go
func (info *WorkerInfo) GetChild(name string) (Worker, bool)
```
-GetChild returns a copy of a running child worker and true, or the zero value and false if not found. The returned value is a snapshot — mutations have no effect on the running worker.
+GetChild returns a copy of a running child worker and true, or the zero value and false if not found. The returned value is a snapshot — mutations to the Worker fields have no effect on the running worker.
+
+The [CycleHandler](<#CycleHandler>) \(accessible via [Worker.GetHandler](<#Worker.GetHandler>)\) is shared with the running worker, not copied. Use a type assertion to inspect handler state \(e.g., config versions for reconciliation\). See the "Reconciler With Change Detection" example above.
+
+
+### func \(\*WorkerInfo\) [GetChildCount]()
+
+```go
+func (info *WorkerInfo) GetChildCount() int
+```
+
+GetChildCount returns the number of currently running child workers. This is more efficient than len\([WorkerInfo.GetChildren](<#WorkerInfo.GetChildren>)\) as it avoids allocating a sorted slice. Stopped children are lazily pruned.
-### func \(\*WorkerInfo\) [GetChildren]()
+### func \(\*WorkerInfo\) [GetChildren]()
```go
func (info *WorkerInfo) GetChildren() []string
```
-GetChildren returns the names of currently running child workers.
+GetChildren returns the names of currently running child workers. Stopped children are lazily pruned before building the list.
+
+
+### func \(\*WorkerInfo\) [GetHandler]()
+
+```go
+func (info *WorkerInfo) GetHandler() CycleHandler
+```
+
+GetHandler returns the worker's [CycleHandler](<#CycleHandler>), or nil if not set. Use type assertion to access handler\-specific state or interfaces:
+
+```
+if h, ok := info.GetHandler().(MyHandler); ok {
+ // access h.Config, h.Version, etc.
+}
+```
-### func \(\*WorkerInfo\) [GetName]()
+### func \(\*WorkerInfo\) [GetName]()
```go
func (info *WorkerInfo) GetName() string
@@ -1278,7 +1473,7 @@ func (info *WorkerInfo) GetName() string
GetName returns the worker's name as passed to [NewWorker](<#NewWorker>).
-### func \(\*WorkerInfo\) [Remove]()
+### func \(\*WorkerInfo\) [Remove]()
```go
func (info *WorkerInfo) Remove(name string)
@@ -1287,7 +1482,7 @@ func (info *WorkerInfo) Remove(name string)
Remove stops a child worker by name.
-## type [WorkerInfoOption]()
+## type [WorkerInfoOption]()
WorkerInfoOption configures a [WorkerInfo](<#WorkerInfo>) created by [NewWorkerInfo](<#NewWorkerInfo>).
@@ -1296,7 +1491,7 @@ type WorkerInfoOption func(*WorkerInfo)
```
-### func [WithTestChildren]()
+### func [WithTestChildren]()
```go
func WithTestChildren(ctx context.Context) WorkerInfoOption
@@ -1310,4 +1505,13 @@ defer cancel()
info := workers.NewWorkerInfo("test", 0, workers.WithTestChildren(ctx))
```
+
+### func [WithTestHandler]()
+
+```go
+func WithTestHandler(h CycleHandler) WorkerInfoOption
+```
+
+WithTestHandler sets the handler on a test [WorkerInfo](<#WorkerInfo>) so that [WorkerInfo.GetHandler](<#WorkerInfo.GetHandler>) works in unit tests.
+
Generated by [gomarkdoc]()
diff --git a/example_test.go b/example_test.go
index 6e74ad0..7a3419e 100644
--- a/example_test.go
+++ b/example_test.go
@@ -8,6 +8,19 @@ import (
"github.com/go-coldbrew/workers"
)
+// solverHandler is used in Example_reconcilerWithChangeDetection to
+// demonstrate the handler-as-metadata pattern.
+type solverHandler struct {
+ version int // config version this handler was created for; the reconciler compares it against the desired version
+}
+
+func (h *solverHandler) RunCycle(ctx context.Context, _ *workers.WorkerInfo) error { // does no work of its own
+ <-ctx.Done() // block until the worker's context is cancelled
+ return ctx.Err()
+}
+
+func (h *solverHandler) Close() error { return nil } // no resources to release
+
// A simple worker that runs until cancelled.
func ExampleNewWorker() {
w := workers.NewWorker("greeter").HandlerFunc(func(ctx context.Context, info *workers.WorkerInfo) error {
@@ -338,6 +351,73 @@ func Example_dynamicWorkerPool() {
// pool shut down
}
+// Demonstrates config-driven reconciliation with change detection using
+// the handler-as-metadata pattern. The handler struct carries a config
+// version that the reconciler inspects via GetChild().GetHandler() type
+// assertion, eliminating the need for a parallel tracking map.
+func Example_reconcilerWithChangeDetection() {
+ type solverConfig struct {
+ version int // compared against the running child's solverHandler.version
+ name string // mirrors the map key; not read by the reconciler below
+ }
+
+ // Simulate config that changes over 3 ticks.
+ configs := []map[string]solverConfig{
+ {"a": {version: 1, name: "a"}},
+ {"a": {version: 1, name: "a"}, "b": {version: 1, name: "b"}},
+ {"a": {version: 2, name: "a"}, "b": {version: 1, name: "b"}}, // a gets new version
+ }
+
+ tick := 0 // index of the next configs entry to apply; captured by the handler closure
+ manager := workers.NewWorker("reconciler").HandlerFunc(func(ctx context.Context, info *workers.WorkerInfo) error {
+ ticker := time.NewTicker(40 * time.Millisecond) // one reconcile pass per ticker fire
+ defer ticker.Stop()
+ for {
+ select {
+ case <-ctx.Done():
+ return ctx.Err()
+ case <-ticker.C:
+ if tick >= len(configs) { // every simulated config has been applied
+ continue // keep idling until ctx is cancelled
+ }
+ desired := configs[tick]
+ tick++ // from here on tick is the 1-based pass number printed below
+
+ // Remove workers no longer desired.
+ for _, name := range info.GetChildren() {
+ if _, ok := desired[name]; !ok {
+ info.Remove(name)
+ }
+ }
+
+ // Add new or replace changed workers.
+ for key, cfg := range desired {
+ child, exists := info.GetChild(key)
+ if exists {
+ // Check if config changed via handler type assertion.
+ if h, ok := child.GetHandler().(*solverHandler); ok && h.version == cfg.version {
+ continue // unchanged, skip
+ }
+ info.Remove(key) // config changed, replace
+ }
+ info.Add(workers.NewWorker(key).Handler(&solverHandler{version: cfg.version})) // new handler records the version it was built from
+ }
+ time.Sleep(10 * time.Millisecond) // presumably lets Add/Remove settle before the snapshot is printed; TODO confirm
+ fmt.Printf("tick %d: children=%v count=%d\n", tick, info.GetChildren(), info.GetChildCount())
+ }
+ }
+ })
+
+ ctx, cancel := context.WithTimeout(context.Background(), 250*time.Millisecond) // bounds the whole example run
+ defer cancel()
+
+ workers.Run(ctx, []*workers.Worker{manager}) // returned error is intentionally ignored in this example
+ // Output:
+ // tick 1: children=[a] count=1
+ // tick 2: children=[a b] count=2
+ // tick 3: children=[a b] count=2
+}
+
// Per-worker middleware using the interceptor pattern.
func ExampleWorker_Interceptors() {
loggingMW := func(ctx context.Context, info *workers.WorkerInfo, next workers.CycleFunc) error {
diff --git a/helpers.go b/helpers.go
index 1cc6601..796b6a4 100644
--- a/helpers.go
+++ b/helpers.go
@@ -2,6 +2,7 @@ package workers
import (
"context"
+ "errors"
"math/rand/v2"
"time"
)
@@ -51,7 +52,9 @@ func everyIntervalWithJitter(base time.Duration, jitterPercent int, initialDelay
return ctx.Err()
case <-timer.C:
if err := fn(ctx, info); err != nil {
- return err
+ if !errors.Is(err, ErrSkipTick) {
+ return err
+ }
}
timer.Reset(computeInterval())
}
diff --git a/helpers_test.go b/helpers_test.go
index 2ac01f6..7ae2e60 100644
--- a/helpers_test.go
+++ b/helpers_test.go
@@ -3,6 +3,7 @@ package workers
import (
"context"
"errors"
+ "fmt"
"math"
"sync/atomic"
"testing"
@@ -136,6 +137,44 @@ func TestEveryInterval_Jitter_VariableIntervals(t *testing.T) {
assert.Greater(t, stddev, 0.0, "intervals should vary with jitter enabled")
}
+func TestEveryIntervalWithJitter_ErrSkipTick(t *testing.T) { // a bare ErrSkipTick must not end the timer loop
+ var count atomic.Int32 // number of times the handler has been invoked
+ fn := everyIntervalWithJitter(10*time.Millisecond, 0, 0, func(_ context.Context, _ *WorkerInfo) error { // 10ms base, no jitter, no initial delay
+ n := count.Add(1)
+ if n == 1 {
+ return ErrSkipTick // skip first tick
+ }
+ return nil // every later tick succeeds
+ })
+
+ ctx, cancel := context.WithTimeout(context.Background(), 80*time.Millisecond) // deadline is the only expected way out of fn
+ defer cancel()
+
+ info := &WorkerInfo{name: "skiptick", attempt: 0}
+ err := fn(ctx, info) // blocks until the context deadline expires
+ assert.ErrorIs(t, err, context.DeadlineExceeded, "should not exit from ErrSkipTick")
+ assert.GreaterOrEqual(t, int(count.Load()), 2, "should continue ticking after ErrSkipTick")
+}
+
+func TestEveryIntervalWithJitter_ErrSkipTick_Wrapped(t *testing.T) { // a %w-wrapped ErrSkipTick must be treated like the bare sentinel
+ var count atomic.Int32 // number of times the handler has been invoked
+ fn := everyIntervalWithJitter(10*time.Millisecond, 0, 0, func(_ context.Context, _ *WorkerInfo) error { // 10ms base, no jitter, no initial delay
+ n := count.Add(1)
+ if n == 1 {
+ return fmt.Errorf("db timeout: %w", ErrSkipTick) // first tick: sentinel wrapped with context
+ }
+ return nil // every later tick succeeds
+ })
+
+ ctx, cancel := context.WithTimeout(context.Background(), 80*time.Millisecond) // deadline is the only expected way out of fn
+ defer cancel()
+
+ info := &WorkerInfo{name: "skiptick-wrapped", attempt: 0}
+ err := fn(ctx, info) // blocks until the context deadline expires
+ assert.ErrorIs(t, err, context.DeadlineExceeded, "wrapped ErrSkipTick should also be caught")
+ assert.GreaterOrEqual(t, int(count.Load()), 2) // the handler ran again after the skipped tick
+}
+
func TestChannelWorker(t *testing.T) {
ch := make(chan string, 3)
ch <- "a"
diff --git a/middleware/README.md b/middleware/README.md
index 902ba90..766ba4d 100644
--- a/middleware/README.md
+++ b/middleware/README.md
@@ -38,6 +38,7 @@ Package middleware provides optional interceptors for [go\\\-coldbrew/workers](<
- [type LockOption](<#LockOption>)
- [func WithKeyFunc\(fn func\(name string\) string\) LockOption](<#WithKeyFunc>)
- [func WithOnNotAcquired\(fn func\(ctx context.Context, name string\) error\) LockOption](<#WithOnNotAcquired>)
+ - [func WithSkipOnNotAcquired\(logFn func\(ctx context.Context, name string\)\) LockOption](<#WithSkipOnNotAcquired>)
- [func WithTTLFunc\(fn func\(name string\) time.Duration\) LockOption](<#WithTTLFunc>)
- [type Locker](<#Locker>)
@@ -60,7 +61,7 @@ workers.Run(ctx, myWorkers,
```
-## func [DistributedLock]()
+## func [DistributedLock]()
```go
func DistributedLock(locker Locker, opts ...LockOption) workers.Middleware
@@ -123,7 +124,7 @@ func Tracing() workers.Middleware
Tracing creates an OTEL span per cycle via go\-coldbrew/tracing. The span is named "worker:\:cycle" and records errors.
-## type [LockOption]()
+## type [LockOption]()
LockOption configures [DistributedLock](<#DistributedLock>) behavior.
@@ -132,7 +133,7 @@ type LockOption func(*lockConfig)
```
-### func [WithKeyFunc]()
+### func [WithKeyFunc]()
```go
func WithKeyFunc(fn func(name string) string) LockOption
@@ -141,7 +142,7 @@ func WithKeyFunc(fn func(name string) string) LockOption
WithKeyFunc sets a custom function to derive the lock key from the worker name. Default: "worker\-lock:\".
-### func [WithOnNotAcquired]()
+### func [WithOnNotAcquired]()
```go
func WithOnNotAcquired(fn func(ctx context.Context, name string) error) LockOption
@@ -149,8 +150,19 @@ func WithOnNotAcquired(fn func(ctx context.Context, name string) error) LockOpti
WithOnNotAcquired sets a callback invoked when the lock is held by another instance. The cycle is skipped. Default: skip silently \(return nil\).
+Caution: returning a non\-nil error from this callback triggers the framework's normal error handling — for periodic workers, this means restart with backoff. If you want to log and skip, return nil from this callback or use [WithSkipOnNotAcquired](<#WithSkipOnNotAcquired>).
+
+
+### func [WithSkipOnNotAcquired]()
+
+```go
+func WithSkipOnNotAcquired(logFn func(ctx context.Context, name string)) LockOption
+```
+
+WithSkipOnNotAcquired is a convenience [LockOption](<#LockOption>) that calls logFn when the lock is held and skips the cycle \(returns nil, no restart\). If logFn is nil, the cycle is skipped silently \(same as the default but explicit in intent\).
+
-### func [WithTTLFunc]()
+### func [WithTTLFunc]()
```go
func WithTTLFunc(fn func(name string) time.Duration) LockOption
@@ -159,9 +171,9 @@ func WithTTLFunc(fn func(name string) time.Duration) LockOption
WithTTLFunc sets a custom function to derive the lock TTL from the worker name. Default: 30s.
-## type [Locker]()
+## type [Locker]()
-Locker abstracts a distributed lock backend \(e.g., Redis, etcd, Consul\).
+Locker abstracts a distributed lock backend \(e.g., Redis, etcd, Consul\). If your lock implementation already has Acquire\(ctx, key, ttl\) \(bool, error\) and Release\(ctx, key\) error methods, it satisfies this interface directly — no adapter needed.
```go
type Locker interface {
diff --git a/middleware/lock.go b/middleware/lock.go
index 030e5bf..91b3d9e 100644
--- a/middleware/lock.go
+++ b/middleware/lock.go
@@ -10,6 +10,9 @@ import (
)
// Locker abstracts a distributed lock backend (e.g., Redis, etcd, Consul).
+// If your lock implementation already has Acquire(ctx, key, ttl) (bool, error)
+// and Release(ctx, key) error methods, it satisfies this interface directly —
+// no adapter needed.
type Locker interface {
// Acquire attempts to acquire a lock for the given key with a TTL.
// Returns true if the lock was acquired, false if held by another instance.
@@ -41,10 +44,28 @@ func WithTTLFunc(fn func(name string) time.Duration) LockOption {
// WithOnNotAcquired sets a callback invoked when the lock is held by another
// instance. The cycle is skipped. Default: skip silently (return nil).
+//
+// Caution: returning a non-nil error from this callback triggers the
+// framework's normal error handling — for periodic workers, this means
+// restart with backoff. If you want to log and skip, return nil from
+// this callback or use [WithSkipOnNotAcquired].
func WithOnNotAcquired(fn func(ctx context.Context, name string) error) LockOption {
return func(c *lockConfig) { c.onNotAcquired = fn }
}
+// WithSkipOnNotAcquired is a convenience [LockOption] that calls logFn
+// when the lock is held and skips the cycle (returns nil, no restart).
+// If logFn is nil, the cycle is skipped silently (same as the default
+// but explicit in intent).
+func WithSkipOnNotAcquired(logFn func(ctx context.Context, name string)) LockOption {
+ return WithOnNotAcquired(func(ctx context.Context, name string) error {
+ if logFn != nil {
+ logFn(ctx, name)
+ }
+ return nil
+ })
+}
+
// DistributedLock acquires a distributed lock before each cycle. If the lock
// is held by another instance, the cycle is skipped (or the onNotAcquired
// callback is invoked). Release uses [context.WithoutCancel] so that context
diff --git a/middleware/lock_test.go b/middleware/lock_test.go
index bd274e3..45eb550 100644
--- a/middleware/lock_test.go
+++ b/middleware/lock_test.go
@@ -104,6 +104,37 @@ func TestDistributedLock_AcquireError(t *testing.T) {
assert.EqualError(t, err, "redis down")
}
+func TestDistributedLock_WithSkipOnNotAcquired(t *testing.T) {
+ locker := &mockLocker{acquired: false}
+ var gotName string
+ mw := DistributedLock(locker, WithSkipOnNotAcquired(func(_ context.Context, name string) {
+ gotName = name
+ }))
+
+ called := false
+ info := workers.NewWorkerInfo("skipped", 0)
+ err := mw(context.Background(), info, func(_ context.Context, _ *workers.WorkerInfo) error {
+ called = true
+ return nil
+ })
+
+ assert.NoError(t, err, "WithSkipOnNotAcquired should return nil")
+ assert.False(t, called, "next should not be called")
+ assert.Equal(t, "skipped", gotName, "logFn should receive worker name")
+}
+
+func TestDistributedLock_WithSkipOnNotAcquired_NilLogFn(t *testing.T) {
+ locker := &mockLocker{acquired: false}
+ mw := DistributedLock(locker, WithSkipOnNotAcquired(nil))
+
+ info := workers.NewWorkerInfo("skipped", 0)
+ err := mw(context.Background(), info, func(_ context.Context, _ *workers.WorkerInfo) error {
+ return nil
+ })
+
+ assert.NoError(t, err, "nil logFn should still skip silently")
+}
+
func TestDistributedLock_CustomKeyAndTTL(t *testing.T) {
locker := &mockLocker{acquired: true}
mw := DistributedLock(locker,
diff --git a/run.go b/run.go
index 73eb650..62d5443 100644
--- a/run.go
+++ b/run.go
@@ -16,12 +16,17 @@ import (
// permanent completion (e.g., channel closed, work exhausted).
var ErrDoNotRestart = suture.ErrDoNotRestart
+// ErrSkipTick can be returned from a periodic handler to skip the current
+// tick without triggering restart. The timer continues and the next tick
+// fires normally. Only meaningful for periodic workers (with [Worker.Every]).
+var ErrSkipTick = errors.New("workers: skip tick")
+
// RunOption configures the behavior of [Run].
type RunOption func(*runConfig)
type runConfig struct {
- metrics Metrics
- interceptors []Middleware
+ metrics Metrics
+ interceptors []Middleware
defaultJitter int // -1 = not set
}
@@ -81,12 +86,13 @@ func buildChain(middlewares []Middleware, handler CycleHandler) CycleFunc {
type workerRunService struct {
w *Worker
runFn CycleFunc // fully resolved: chain + interval wrapping
- closeFn func() // calls handler.Close() exactly once via shared sync.Once
+ closeFn func() // calls handler.Close() exactly once via shared sync.Once
childSup *suture.Supervisor
metrics Metrics
active *atomic.Int32
cfg *runConfig
attempt atomic.Int32
+ done chan struct{} // closed on permanent stop, for lazy zombie detection
}
// Serve implements suture.Service.
@@ -112,6 +118,7 @@ func (ws *workerRunService) Serve(ctx context.Context) error {
info := &WorkerInfo{
name: ws.w.name,
attempt: attempt,
+ handler: ws.w.handler,
sup: ws.childSup,
children: make(map[string]childEntry),
cfg: ws.cfg,
@@ -132,7 +139,7 @@ func (ws *workerRunService) Serve(ctx context.Context) error {
err := ws.runFn(ctx, info)
- if err != nil && ctx.Err() == nil && !errors.Is(err, suture.ErrDoNotRestart) {
+ if err != nil && ctx.Err() == nil && !errors.Is(err, suture.ErrDoNotRestart) && !errors.Is(err, ErrSkipTick) {
m.WorkerFailed(ws.w.name, err)
}
@@ -141,6 +148,9 @@ func (ws *workerRunService) Serve(ctx context.Context) error {
if permanentStop {
ws.closeFn()
+ if ws.done != nil {
+ close(ws.done)
+ }
return suture.ErrDoNotRestart
}
return err
@@ -164,8 +174,9 @@ func resolveMetrics(w *Worker, parent Metrics) Metrics {
// addWorkerToSupervisor creates a child supervisor for the worker,
// builds the middleware chain, resolves jitter, and adds the worker
-// to the parent supervisor. Returns the service token for removal.
-func addWorkerToSupervisor(parent *suture.Supervisor, w *Worker, cfg *runConfig, active *atomic.Int32, parentMetrics Metrics) suture.ServiceToken {
+// to the parent supervisor. Returns the service token for removal and
+// a channel that is closed when the worker permanently stops.
+func addWorkerToSupervisor(parent *suture.Supervisor, w *Worker, cfg *runConfig, active *atomic.Int32, parentMetrics Metrics) (suture.ServiceToken, <-chan struct{}) {
m := resolveMetrics(w, parentMetrics)
handler := w.handler
@@ -206,12 +217,15 @@ func addWorkerToSupervisor(parent *suture.Supervisor, w *Worker, cfg *runConfig,
})
}
+ done := make(chan struct{})
childSup := suture.New("worker:"+w.name, w.sutureSpec(makeEventHook(m)))
childSup.Add(&workerRunService{
w: w, runFn: runFn, closeFn: closeFn,
childSup: childSup, metrics: m, active: active, cfg: cfg,
+ done: done,
})
- return parent.Add(&closingSupervisor{Supervisor: childSup, closeFn: closeFn})
+ tok := parent.Add(&closingSupervisor{Supervisor: childSup, closeFn: closeFn})
+ return tok, done
}
// Run starts all workers under a suture supervisor and blocks until ctx is
@@ -231,7 +245,7 @@ func Run(ctx context.Context, workers []*Worker, opts ...RunOption) error {
EventHook: makeEventHook(cfg.metrics),
})
for _, w := range workers {
- addWorkerToSupervisor(root, w, cfg, active, cfg.metrics)
+ _, _ = addWorkerToSupervisor(root, w, cfg, active, cfg.metrics)
}
err := root.Serve(ctx)
if err != nil && ctx.Err() != nil {
diff --git a/run_test.go b/run_test.go
index 8e69f6c..1f8ff46 100644
--- a/run_test.go
+++ b/run_test.go
@@ -423,6 +423,40 @@ func TestRun_ClosingSupervisor_ClosesOnShutdown(t *testing.T) {
assert.Equal(t, int32(1), closeCount.Load(), "Close should be called exactly once")
}
+func TestRun_ErrSkipTick_PeriodicWorker(t *testing.T) {
+ var count atomic.Int32
+ w := NewWorker("skipper").HandlerFunc(func(_ context.Context, _ *WorkerInfo) error {
+ n := count.Add(1)
+ if n == 1 {
+ return ErrSkipTick
+ }
+ return nil
+ }).Every(10 * time.Millisecond)
+
+ ctx, cancel := context.WithTimeout(context.Background(), 80*time.Millisecond)
+ defer cancel()
+
+ err := Run(ctx, []*Worker{w})
+ assert.NoError(t, err)
+ assert.GreaterOrEqual(t, int(count.Load()), 2, "should tick again after ErrSkipTick")
+}
+
+func TestRun_ErrSkipTick_NotCountedAsFailure(t *testing.T) {
+ m := newMockMetrics()
+ w := NewWorker("skipper").HandlerFunc(func(_ context.Context, _ *WorkerInfo) error {
+ return ErrSkipTick
+ }).Every(10 * time.Millisecond)
+
+ ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
+ defer cancel()
+
+ Run(ctx, []*Worker{w}, WithMetrics(m))
+
+ m.mu.Lock()
+ defer m.mu.Unlock()
+ assert.Empty(t, m.failed, "ErrSkipTick should not be counted as failure")
+}
+
func TestRun_ErrDoNotRestart_NotCountedAsFailure(t *testing.T) {
m := newMockMetrics()
w := NewWorker("completer").HandlerFunc(func(_ context.Context, _ *WorkerInfo) error {
diff --git a/worker.go b/worker.go
index 5a1e3ab..8cba2dc 100644
--- a/worker.go
+++ b/worker.go
@@ -33,6 +33,20 @@
// even with restart enabled. Use [ErrDoNotRestart] for explicit permanent
// completion from periodic handlers.
//
+// # Error Semantics for Periodic Handlers
+//
+// The return value from a periodic handler determines what happens next:
+//
+// | Return value | Timer loop | Restart? | Use case |
+// |-----------------|------------|----------|--------------------------|
+// | nil | continues | n/a | Success, next tick fires |
+// | ErrSkipTick | continues | n/a | Transient failure, skip |
+// | ErrDoNotRestart | exits | no | Permanent completion |
+// | other error | exits | yes* | Failure, needs restart |
+// | ctx.Err() | exits | no | Graceful shutdown |
+//
+// *Only if [Worker.WithRestart](true) (the default).
+//
// # Middleware
//
// Cross-cutting concerns like tracing, logging, and panic recovery are
@@ -50,6 +64,13 @@
// [WithInterceptors]. Built-in middleware is available in the middleware/
// sub-package.
//
+// Run-level interceptors ([WithInterceptors]) wrap all workers and are
+// best for cross-cutting defaults (tracing, logging, panic recovery).
+// Worker-level interceptors ([Worker.Interceptors]) are best for
+// worker-specific concerns (distributed locks with per-worker TTL,
+// rate limiting). Children inherit run-level interceptors but not the
+// parent's worker-level interceptors.
+//
// # Helpers
//
// Common patterns are provided as helpers:
@@ -83,6 +104,7 @@ import (
type WorkerInfo struct {
name string
attempt int
+ handler CycleHandler // the worker's handler, set by framework
// child management, set by framework
sup *suture.Supervisor
@@ -97,6 +119,7 @@ type WorkerInfo struct {
type childEntry struct {
token suture.ServiceToken
worker *Worker
+ done <-chan struct{} // closed when the child permanently stops
}
// GetName returns the worker's name as passed to [NewWorker].
@@ -105,6 +128,14 @@ func (info *WorkerInfo) GetName() string { return info.name }
// GetAttempt returns the restart attempt number (0 on first run).
func (info *WorkerInfo) GetAttempt() int { return info.attempt }
+// GetHandler returns the worker's [CycleHandler], or nil if not set.
+// Use type assertion to access handler-specific state or interfaces:
+//
+// if h, ok := info.GetHandler().(MyHandler); ok {
+// // access h.Config, h.Version, etc.
+// }
+func (info *WorkerInfo) GetHandler() CycleHandler { return info.handler }
+
// WorkerInfoOption configures a [WorkerInfo] created by [NewWorkerInfo].
type WorkerInfoOption func(*WorkerInfo)
@@ -127,6 +158,12 @@ func WithTestChildren(ctx context.Context) WorkerInfoOption {
}
}
+// WithTestHandler sets the handler on a test [WorkerInfo] so that
+// [WorkerInfo.GetHandler] works in unit tests.
+func WithTestHandler(h CycleHandler) WorkerInfoOption {
+ return func(info *WorkerInfo) { info.handler = h }
+}
+
// NewWorkerInfo creates a [WorkerInfo] with the given name and attempt.
// This is useful for testing middleware and handlers — the framework
// creates fully populated instances internally.
@@ -151,6 +188,10 @@ func NewWorkerInfo(name string, attempt int, opts ...WorkerInfoOption) *WorkerIn
// Children inherit run-level interceptors, metrics (unless overridden via
// [Worker.WithMetrics]), and scoped lifecycle — when this worker stops,
// all its children stop too.
+//
+// When a child permanently stops, it is automatically removed from
+// the children map on the next call to [WorkerInfo.GetChildren],
+// [WorkerInfo.GetChild], or [WorkerInfo.GetChildCount].
func (info *WorkerInfo) Add(w *Worker) bool {
if info.sup == nil {
return false
@@ -161,8 +202,8 @@ func (info *WorkerInfo) Add(w *Worker) bool {
if _, ok := info.children[w.name]; ok {
return false
}
- tok := addWorkerToSupervisor(info.sup, w, info.cfg, info.active, info.metrics)
- info.children[w.name] = childEntry{token: tok, worker: w}
+ tok, done := addWorkerToSupervisor(info.sup, w, info.cfg, info.active, info.metrics)
+ info.children[w.name] = childEntry{token: tok, worker: w, done: done}
return true
}
@@ -177,6 +218,21 @@ func (info *WorkerInfo) Remove(name string) {
info.removeLocked(name)
}
+// pruneStoppedLocked removes children whose done channel is closed.
+// Caller must hold childrenMu.
+func (info *WorkerInfo) pruneStoppedLocked() {
+ for name, entry := range info.children {
+ if entry.done == nil {
+ continue
+ }
+ select {
+ case <-entry.done:
+ delete(info.children, name)
+ default:
+ }
+ }
+}
+
// removeLocked stops and deletes a child by name. Caller must hold childrenMu.
func (info *WorkerInfo) removeLocked(name string) {
if entry, ok := info.children[name]; ok {
@@ -186,10 +242,12 @@ func (info *WorkerInfo) removeLocked(name string) {
}
// GetChildren returns the names of currently running child workers.
+// Stopped children are lazily pruned before building the list.
func (info *WorkerInfo) GetChildren() []string {
info.childrenMu.Lock()
defer info.childrenMu.Unlock()
+ info.pruneStoppedLocked()
names := make([]string, 0, len(info.children))
for name := range info.children {
names = append(names, name)
@@ -198,13 +256,29 @@ func (info *WorkerInfo) GetChildren() []string {
return names
}
+// GetChildCount returns the number of currently running child workers.
+// This is more efficient than len([WorkerInfo.GetChildren]) as it avoids
+// allocating a sorted slice. Stopped children are lazily pruned.
+func (info *WorkerInfo) GetChildCount() int {
+ info.childrenMu.Lock()
+ defer info.childrenMu.Unlock()
+ info.pruneStoppedLocked()
+ return len(info.children)
+}
+
// GetChild returns a copy of a running child worker and true, or the zero
// value and false if not found. The returned value is a snapshot —
-// mutations have no effect on the running worker.
+// mutations to the Worker fields have no effect on the running worker.
+//
+// The [CycleHandler] (accessible via [Worker.GetHandler]) is shared with
+// the running worker, not copied. Use type assertion to inspect handler
+// state (e.g., config versions for reconciliation). See
+// [Example_reconcilerWithChangeDetection].
func (info *WorkerInfo) GetChild(name string) (Worker, bool) {
info.childrenMu.Lock()
defer info.childrenMu.Unlock()
+ info.pruneStoppedLocked()
if entry, ok := info.children[name]; ok {
return *entry.worker, true
}
@@ -262,6 +336,21 @@ func (w *Worker) GetName() string { return w.name }
// GetHandler returns the worker's [CycleHandler], or nil if not set.
func (w *Worker) GetHandler() CycleHandler { return w.handler }
+// GetInterval returns the periodic interval, or 0 if this is not a
+// periodic worker.
+func (w *Worker) GetInterval() time.Duration { return w.interval }
+
+// GetRestartOnFail returns whether the worker restarts on failure.
+func (w *Worker) GetRestartOnFail() bool { return w.restartOnFail }
+
+// GetJitterPercent returns the jitter percentage. -1 means inherit
+// run-level default, 0 means no jitter.
+func (w *Worker) GetJitterPercent() int { return w.jitterPercent }
+
+// GetInitialDelay returns the initial delay before the first tick,
+// or 0 if not set.
+func (w *Worker) GetInitialDelay() time.Duration { return w.initialDelay }
+
// Handler sets the worker's [CycleHandler]. Use this for handlers that
// need cleanup via Close (e.g., database connections, leases).
func (w *Worker) Handler(h CycleHandler) *Worker {
diff --git a/worker_test.go b/worker_test.go
index b0b804c..0f7a560 100644
--- a/worker_test.go
+++ b/worker_test.go
@@ -141,6 +141,44 @@ func TestWorker_WithBackoffJitter(t *testing.T) {
assert.NotNil(t, w.backoffJitter)
}
+func TestWorkerInfo_GetHandler(t *testing.T) {
+ fn := CycleFunc(func(_ context.Context, _ *WorkerInfo) error { return nil })
+ info := NewWorkerInfo("test", 0, WithTestHandler(fn))
+ assert.NotNil(t, info.GetHandler())
+}
+
+func TestWorkerInfo_GetHandler_Nil(t *testing.T) {
+ info := NewWorkerInfo("test", 0)
+ assert.Nil(t, info.GetHandler())
+}
+
+func TestWorkerInfo_GetChildCount(t *testing.T) {
+ info := NewWorkerInfo("parent", 0, WithTestChildren(t.Context()))
+
+ assert.Equal(t, 0, info.GetChildCount())
+
+ childFn := CycleFunc(func(ctx context.Context, _ *WorkerInfo) error {
+ <-ctx.Done()
+ return ctx.Err()
+ })
+
+ info.Add(NewWorker("a").HandlerFunc(childFn))
+ info.Add(NewWorker("b").HandlerFunc(childFn))
+ time.Sleep(20 * time.Millisecond)
+
+ assert.Equal(t, 2, info.GetChildCount())
+
+ info.Remove("a")
+ time.Sleep(20 * time.Millisecond)
+
+ assert.Equal(t, 1, info.GetChildCount())
+}
+
+func TestWorkerInfo_GetChildCount_Nil(t *testing.T) {
+ info := &WorkerInfo{name: "test"}
+ assert.Equal(t, 0, info.GetChildCount())
+}
+
func TestWorkerInfo_GetChild(t *testing.T) {
info := NewWorkerInfo("parent", 0, WithTestChildren(t.Context()))
@@ -225,6 +263,69 @@ func TestNewWorkerInfo_WithTestChildren(t *testing.T) {
assert.Equal(t, []string{"b"}, info.GetChildren())
}
+func TestWorkerInfo_ZombieChild_AutoCleanup(t *testing.T) {
+ info := NewWorkerInfo("parent", 0, WithTestChildren(t.Context()))
+
+ // Add a child that returns nil immediately (no restart).
+ info.Add(NewWorker("child").HandlerFunc(func(_ context.Context, _ *WorkerInfo) error {
+ return nil
+ }).WithRestart(false))
+
+ time.Sleep(100 * time.Millisecond) // let child stop
+
+ // Lazy prune should remove the stopped child.
+ assert.Equal(t, 0, info.GetChildCount())
+ assert.Empty(t, info.GetChildren())
+}
+
+func TestWorkerInfo_ZombieChild_ErrDoNotRestart(t *testing.T) {
+ info := NewWorkerInfo("parent", 0, WithTestChildren(t.Context()))
+
+ info.Add(NewWorker("child").HandlerFunc(func(_ context.Context, _ *WorkerInfo) error {
+ return ErrDoNotRestart
+ }))
+
+ time.Sleep(100 * time.Millisecond)
+
+ assert.Equal(t, 0, info.GetChildCount())
+ assert.Empty(t, info.GetChildren())
+}
+
+func TestWorkerInfo_ZombieChild_ReAdd(t *testing.T) {
+ info := NewWorkerInfo("parent", 0, WithTestChildren(t.Context()))
+
+ // Add a child that stops immediately.
+ info.Add(NewWorker("child").HandlerFunc(func(_ context.Context, _ *WorkerInfo) error {
+ return nil
+ }).WithRestart(false))
+
+ time.Sleep(100 * time.Millisecond)
+
+ // After prune, Add with same name should succeed.
+ assert.Equal(t, 0, info.GetChildCount())
+ added := info.Add(NewWorker("child").HandlerFunc(func(ctx context.Context, _ *WorkerInfo) error {
+ <-ctx.Done()
+ return ctx.Err()
+ }))
+ assert.True(t, added, "re-Add after zombie prune should succeed")
+ time.Sleep(20 * time.Millisecond)
+ assert.Equal(t, 1, info.GetChildCount())
+}
+
+func TestWorkerInfo_ZombieChild_GetChild(t *testing.T) {
+ info := NewWorkerInfo("parent", 0, WithTestChildren(t.Context()))
+
+ info.Add(NewWorker("child").HandlerFunc(func(_ context.Context, _ *WorkerInfo) error {
+ return nil
+ }).WithRestart(false))
+
+ time.Sleep(100 * time.Millisecond)
+
+ // GetChild should also prune and return false.
+ _, ok := info.GetChild("child")
+ assert.False(t, ok)
+}
+
func TestNewWorkerInfo_Minimal(t *testing.T) {
// Without options, Add/Remove/GetChildren are safe no-ops.
info := NewWorkerInfo("test", 5)
@@ -237,6 +338,29 @@ func TestNewWorkerInfo_Minimal(t *testing.T) {
assert.Empty(t, info.GetChildren())
}
+func TestWorker_ConfigGetters(t *testing.T) {
+ w := NewWorker("test").
+ HandlerFunc(func(_ context.Context, _ *WorkerInfo) error { return nil }).
+ Every(30 * time.Second).
+ WithJitter(15).
+ WithInitialDelay(5 * time.Second).
+ WithRestart(false)
+
+ assert.Equal(t, 30*time.Second, w.GetInterval())
+ assert.Equal(t, 15, w.GetJitterPercent())
+ assert.Equal(t, 5*time.Second, w.GetInitialDelay())
+ assert.False(t, w.GetRestartOnFail())
+}
+
+func TestWorker_ConfigGetters_Defaults(t *testing.T) {
+ w := NewWorker("test")
+
+ assert.Equal(t, time.Duration(0), w.GetInterval())
+ assert.Equal(t, -1, w.GetJitterPercent())
+ assert.Equal(t, time.Duration(0), w.GetInitialDelay())
+ assert.True(t, w.GetRestartOnFail())
+}
+
func TestWorker_InterceptorsCopiesSlice(t *testing.T) {
mw := func(_ context.Context, _ *WorkerInfo, next CycleFunc) error { return next(nil, nil) }
original := []Middleware{mw}
From 87ad33ae74cbf728cda943c4a7924377a5426cca Mon Sep 17 00:00:00 2001
From: Ankur Shrivastava
Date: Sat, 25 Apr 2026 22:41:27 +0800
Subject: [PATCH 2/9] fix: prune stopped children in Add before name-exists
check
Add() held the lock but didn't call pruneStoppedLocked(), so a child
that permanently stopped could block re-Add until the caller happened
to call GetChildren/GetChild/GetChildCount. Now Add prunes stale
entries before checking for name conflicts.
---
README.md | 66 +++++++++++++++++++++++++-------------------------
worker.go | 6 +++--
worker_test.go | 21 ++++++++++++++++
3 files changed, 58 insertions(+), 35 deletions(-)
diff --git a/README.md b/README.md
index 2197038..792600c 100644
--- a/README.md
+++ b/README.md
@@ -601,7 +601,7 @@ func (BaseMetrics) WorkerStopped(string)
-## type [CycleFunc]()
+## type [CycleFunc]()
CycleFunc adapts a plain function into a [CycleHandler](<#CycleHandler>). Close is a no\-op — use this for simple, stateless handlers.
@@ -773,7 +773,7 @@ tick 2
-### func \(CycleFunc\) [Close]()
+### func \(CycleFunc\) [Close]()
```go
func (fn CycleFunc) Close() error
@@ -782,7 +782,7 @@ func (fn CycleFunc) Close() error
Close is a no\-op for CycleFunc.
-### func \(CycleFunc\) [RunCycle]()
+### func \(CycleFunc\) [RunCycle]()
```go
func (fn CycleFunc) RunCycle(ctx context.Context, info *WorkerInfo) error
@@ -791,7 +791,7 @@ func (fn CycleFunc) RunCycle(ctx context.Context, info *WorkerInfo) error
-## type [CycleHandler]()
+## type [CycleHandler]()
CycleHandler handles worker execution cycles. For periodic workers, RunCycle is called once per tick. Close is called once when the worker stops, allowing cleanup of resources.
@@ -832,7 +832,7 @@ func NewPrometheusMetrics(namespace string) Metrics
NewPrometheusMetrics creates a Metrics implementation backed by Prometheus. The namespace is prepended to all metric names \(e.g., "myapp" → "myapp\_worker\_started\_total"\). Metrics are auto\-registered with the default Prometheus registry. Safe to call multiple times with the same namespace — returns the cached instance. The cache is process\-global; use a small number of static namespaces \(not per\-request/tenant values\).
-## type [Middleware]()
+## type [Middleware]()
Middleware intercepts each execution cycle. Call next to continue the chain. Matches gRPC interceptor convention.
@@ -886,7 +886,7 @@ func WithMetrics(m Metrics) RunOption
WithMetrics sets the metrics implementation for all workers started by [Run](<#Run>). Workers inherit this unless they override via [Worker.WithMetrics](<#Worker.WithMetrics>). If not set, [BaseMetrics](<#BaseMetrics>) is used.
-## type [Worker]()
+## type [Worker]()
Worker represents a background goroutine managed by the framework. Create with [NewWorker](<#NewWorker>) and configure with builder methods.
@@ -897,7 +897,7 @@ type Worker struct {
```
-### func [NewWorker]()
+### func [NewWorker]()
```go
func NewWorker(name string) *Worker
@@ -945,7 +945,7 @@ worker "greeter" started (attempt 0)
-### func \(\*Worker\) [AddInterceptors]()
+### func \(\*Worker\) [AddInterceptors]()
```go
func (w *Worker) AddInterceptors(mw ...Middleware) *Worker
@@ -954,7 +954,7 @@ func (w *Worker) AddInterceptors(mw ...Middleware) *Worker
AddInterceptors appends to the worker\-level interceptor list.
-### func \(\*Worker\) [Every]()
+### func \(\*Worker\) [Every]()
```go
func (w *Worker) Every(d time.Duration) *Worker
@@ -1004,7 +1004,7 @@ tick 2
-### func \(\*Worker\) [GetHandler]()
+### func \(\*Worker\) [GetHandler]()
```go
func (w *Worker) GetHandler() CycleHandler
@@ -1013,7 +1013,7 @@ func (w *Worker) GetHandler() CycleHandler
GetHandler returns the worker's [CycleHandler](<#CycleHandler>), or nil if not set.
-### func \(\*Worker\) [GetInitialDelay]()
+### func \(\*Worker\) [GetInitialDelay]()
```go
func (w *Worker) GetInitialDelay() time.Duration
@@ -1022,7 +1022,7 @@ func (w *Worker) GetInitialDelay() time.Duration
GetInitialDelay returns the initial delay before the first tick, or 0 if not set.
-### func \(\*Worker\) [GetInterval]()
+### func \(\*Worker\) [GetInterval]()
```go
func (w *Worker) GetInterval() time.Duration
@@ -1031,7 +1031,7 @@ func (w *Worker) GetInterval() time.Duration
GetInterval returns the periodic interval, or 0 if this is not a periodic worker.
-### func \(\*Worker\) [GetJitterPercent]()
+### func \(\*Worker\) [GetJitterPercent]()
```go
func (w *Worker) GetJitterPercent() int
@@ -1040,7 +1040,7 @@ func (w *Worker) GetJitterPercent() int
GetJitterPercent returns the jitter percentage. \-1 means inherit run\-level default, 0 means no jitter.
-### func \(\*Worker\) [GetName]()
+### func \(\*Worker\) [GetName]()
```go
func (w *Worker) GetName() string
@@ -1049,7 +1049,7 @@ func (w *Worker) GetName() string
GetName returns the worker's name.
-### func \(\*Worker\) [GetRestartOnFail]()
+### func \(\*Worker\) [GetRestartOnFail]()
```go
func (w *Worker) GetRestartOnFail() bool
@@ -1058,7 +1058,7 @@ func (w *Worker) GetRestartOnFail() bool
GetRestartOnFail returns whether the worker restarts on failure.
-### func \(\*Worker\) [Handler]()
+### func \(\*Worker\) [Handler]()
```go
func (w *Worker) Handler(h CycleHandler) *Worker
@@ -1067,7 +1067,7 @@ func (w *Worker) Handler(h CycleHandler) *Worker
Handler sets the worker's [CycleHandler](<#CycleHandler>). Use this for handlers that need cleanup via Close \(e.g., database connections, leases\).
-### func \(\*Worker\) [HandlerFunc]()
+### func \(\*Worker\) [HandlerFunc]()
```go
func (w *Worker) HandlerFunc(fn CycleFunc) *Worker
@@ -1076,7 +1076,7 @@ func (w *Worker) HandlerFunc(fn CycleFunc) *Worker
HandlerFunc sets the worker's handler from a plain function. This is the common case for simple, stateless workers.
-### func \(\*Worker\) [Interceptors]()
+### func \(\*Worker\) [Interceptors]()
```go
func (w *Worker) Interceptors(mw ...Middleware) *Worker
@@ -1135,7 +1135,7 @@ func main() {
-### func \(\*Worker\) [WithBackoffJitter]()
+### func \(\*Worker\) [WithBackoffJitter]()
```go
func (w *Worker) WithBackoffJitter(jitter func(time.Duration) time.Duration) *Worker
@@ -1144,7 +1144,7 @@ func (w *Worker) WithBackoffJitter(jitter func(time.Duration) time.Duration) *Wo
WithBackoffJitter adds random jitter to the backoff duration to prevent thundering herd on coordinated restarts. The function receives the base backoff duration and returns a jittered duration.
-### func \(\*Worker\) [WithFailureBackoff]()
+### func \(\*Worker\) [WithFailureBackoff]()
```go
func (w *Worker) WithFailureBackoff(d time.Duration) *Worker
@@ -1153,7 +1153,7 @@ func (w *Worker) WithFailureBackoff(d time.Duration) *Worker
WithFailureBackoff sets the duration to wait between restarts. Suture default is 15 seconds.
-### func \(\*Worker\) [WithFailureDecay]()
+### func \(\*Worker\) [WithFailureDecay]()
```go
func (w *Worker) WithFailureDecay(decay float64) *Worker
@@ -1162,7 +1162,7 @@ func (w *Worker) WithFailureDecay(decay float64) *Worker
WithFailureDecay sets the rate at which failure count decays over time. A value of 1.0 means failures decay by one per second. Suture default is 1.0.
-### func \(\*Worker\) [WithFailureThreshold]()
+### func \(\*Worker\) [WithFailureThreshold]()
```go
func (w *Worker) WithFailureThreshold(threshold float64) *Worker
@@ -1171,7 +1171,7 @@ func (w *Worker) WithFailureThreshold(threshold float64) *Worker
WithFailureThreshold sets the number of failures allowed before the supervisor gives up restarting. Suture default is 5.
-### func \(\*Worker\) [WithInitialDelay]()
+### func \(\*Worker\) [WithInitialDelay]()
```go
func (w *Worker) WithInitialDelay(d time.Duration) *Worker
@@ -1180,7 +1180,7 @@ func (w *Worker) WithInitialDelay(d time.Duration) *Worker
WithInitialDelay delays the first tick to stagger startup. Requires [Worker.Every](<#Worker.Every>).
-### func \(\*Worker\) [WithJitter]()
+### func \(\*Worker\) [WithJitter]()
```go
func (w *Worker) WithJitter(percent int) *Worker
@@ -1189,7 +1189,7 @@ func (w *Worker) WithJitter(percent int) *Worker
WithJitter sets per\-worker jitter as a percentage of the base interval. Each tick is randomized within ±percent of the base. Requires [Worker.Every](<#Worker.Every>). Setting WithJitter\(0\) explicitly disables jitter even when a run\-level default is set via [WithDefaultJitter](<#WithDefaultJitter>).
-### func \(\*Worker\) [WithMetrics]()
+### func \(\*Worker\) [WithMetrics]()
```go
func (w *Worker) WithMetrics(m Metrics) *Worker
@@ -1198,7 +1198,7 @@ func (w *Worker) WithMetrics(m Metrics) *Worker
WithMetrics sets a per\-worker metrics implementation, overriding the metrics inherited from the parent [WorkerInfo](<#WorkerInfo>) or [Run](<#Run>) options.
-### func \(\*Worker\) [WithRestart]()
+### func \(\*Worker\) [WithRestart]()
```go
func (w *Worker) WithRestart(restart bool) *Worker
@@ -1247,7 +1247,7 @@ func main() {
-### func \(\*Worker\) [WithTimeout]()
+### func \(\*Worker\) [WithTimeout]()
```go
func (w *Worker) WithTimeout(d time.Duration) *Worker
@@ -1278,7 +1278,7 @@ NewWorkerInfo creates a [WorkerInfo](<#WorkerInfo>) with the given name and atte
Use [WithTestChildren](<#WithTestChildren>) to enable Add/Remove/GetChildren in tests.
-### func \(\*WorkerInfo\) [Add]()
+### func \(\*WorkerInfo\) [Add]()
```go
func (info *WorkerInfo) Add(w *Worker) bool
@@ -1290,7 +1290,7 @@ Note: Remove \+ Add is not atomic — there is a brief window where the worker i
Children inherit run\-level interceptors, metrics \(unless overridden via [Worker.WithMetrics](<#Worker.WithMetrics>)\), and scoped lifecycle — when this worker stops, all its children stop too.
-When a child permanently stops, it is automatically removed from the children map on the next call to [WorkerInfo.GetChildren](<#WorkerInfo.GetChildren>), [WorkerInfo.GetChild](<#WorkerInfo.GetChild>), or [WorkerInfo.GetChildCount](<#WorkerInfo.GetChildCount>).
+When a child permanently stops, it is automatically removed from the children map on the next call to [WorkerInfo.Add](<#WorkerInfo.Add>), [WorkerInfo.GetChildren](<#WorkerInfo.GetChildren>), [WorkerInfo.GetChild](<#WorkerInfo.GetChild>), or [WorkerInfo.GetChildCount](<#WorkerInfo.GetChildCount>).
Example
@@ -1420,7 +1420,7 @@ func (info *WorkerInfo) GetAttempt() int
GetAttempt returns the restart attempt number \(0 on first run\).
-### func \(\*WorkerInfo\) [GetChild]()
+### func \(\*WorkerInfo\) [GetChild]()
```go
func (info *WorkerInfo) GetChild(name string) (Worker, bool)
@@ -1431,7 +1431,7 @@ GetChild returns a copy of a running child worker and true, or the zero value an
The [CycleHandler](<#CycleHandler>) \(accessible via [Worker.GetHandler](<#Worker.GetHandler>)\) is shared with the running worker, not copied. Use type assertion to inspect handler state \(e.g., config versions for reconciliation\). See \[Example\_reconcilerWithChangeDetection\].
-### func \(\*WorkerInfo\) [GetChildCount]()
+### func \(\*WorkerInfo\) [GetChildCount]()
```go
func (info *WorkerInfo) GetChildCount() int
@@ -1440,7 +1440,7 @@ func (info *WorkerInfo) GetChildCount() int
GetChildCount returns the number of currently running child workers. This is more efficient than len\([WorkerInfo.GetChildren](<#WorkerInfo.GetChildren>)\) as it avoids allocating a sorted slice. Stopped children are lazily pruned.
-### func \(\*WorkerInfo\) [GetChildren]()
+### func \(\*WorkerInfo\) [GetChildren]()
```go
func (info *WorkerInfo) GetChildren() []string
@@ -1473,7 +1473,7 @@ func (info *WorkerInfo) GetName() string
GetName returns the worker's name as passed to [NewWorker](<#NewWorker>).
-### func \(\*WorkerInfo\) [Remove]()
+### func \(\*WorkerInfo\) [Remove]()
```go
func (info *WorkerInfo) Remove(name string)
diff --git a/worker.go b/worker.go
index 8cba2dc..fb678ab 100644
--- a/worker.go
+++ b/worker.go
@@ -190,8 +190,9 @@ func NewWorkerInfo(name string, attempt int, opts ...WorkerInfoOption) *WorkerIn
// all its children stop too.
//
// When a child permanently stops, it is automatically removed from
-// the children map on the next call to [WorkerInfo.GetChildren],
-// [WorkerInfo.GetChild], or [WorkerInfo.GetChildCount].
+// the children map on the next call to [WorkerInfo.Add],
+// [WorkerInfo.GetChildren], [WorkerInfo.GetChild], or
+// [WorkerInfo.GetChildCount].
func (info *WorkerInfo) Add(w *Worker) bool {
if info.sup == nil {
return false
@@ -199,6 +200,7 @@ func (info *WorkerInfo) Add(w *Worker) bool {
info.childrenMu.Lock()
defer info.childrenMu.Unlock()
+ info.pruneStoppedLocked()
if _, ok := info.children[w.name]; ok {
return false
}
diff --git a/worker_test.go b/worker_test.go
index 0f7a560..b28c4ce 100644
--- a/worker_test.go
+++ b/worker_test.go
@@ -312,6 +312,27 @@ func TestWorkerInfo_ZombieChild_ReAdd(t *testing.T) {
assert.Equal(t, 1, info.GetChildCount())
}
+func TestWorkerInfo_ZombieChild_ReAdd_NoRead(t *testing.T) {
+ info := NewWorkerInfo("parent", 0, WithTestChildren(t.Context()))
+
+ // Add a child that stops immediately.
+ info.Add(NewWorker("child").HandlerFunc(func(_ context.Context, _ *WorkerInfo) error {
+ return nil
+ }).WithRestart(false))
+
+ time.Sleep(100 * time.Millisecond)
+
+ // Re-Add directly — no GetChildren/GetChild/GetChildCount call in between.
+ // Add must prune the stale entry itself.
+ added := info.Add(NewWorker("child").HandlerFunc(func(ctx context.Context, _ *WorkerInfo) error {
+ <-ctx.Done()
+ return ctx.Err()
+ }))
+ assert.True(t, added, "Add should prune stopped child and allow re-Add")
+ time.Sleep(20 * time.Millisecond)
+ assert.Equal(t, 1, info.GetChildCount())
+}
+
func TestWorkerInfo_ZombieChild_GetChild(t *testing.T) {
info := NewWorkerInfo("parent", 0, WithTestChildren(t.Context()))
From 2751368ac30d6bc45e14ced2078c491cb08d07d4 Mon Sep 17 00:00:00 2001
From: Ankur Shrivastava
Date: Sat, 25 Apr 2026 22:50:37 +0800
Subject: [PATCH 3/9] fix: only suppress ErrSkipTick failure metric for
periodic workers
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
A non-periodic worker returning ErrSkipTick would silently skip the
WorkerFailed metric while still restarting — a silent failure loop.
Now ErrSkipTick is only suppressed from failure metrics when the
worker has an interval (periodic), matching the semantics: ErrSkipTick
is only meaningful for periodic workers where the timer loop handles it.
---
README.md | 4 ++--
run.go | 3 ++-
run_test.go | 18 ++++++++++++++++++
3 files changed, 22 insertions(+), 3 deletions(-)
diff --git a/README.md b/README.md
index 792600c..e9f70de 100644
--- a/README.md
+++ b/README.md
@@ -412,7 +412,7 @@ var ErrSkipTick = errors.New("workers: skip tick")
```
-## func [Run]()
+## func [Run]()
```go
func Run(ctx context.Context, workers []*Worker, opts ...RunOption) error
@@ -468,7 +468,7 @@ all workers stopped
-## func [RunWorker]()
+## func [RunWorker]()
```go
func RunWorker(ctx context.Context, w *Worker, opts ...RunOption)
diff --git a/run.go b/run.go
index 62d5443..d18ea60 100644
--- a/run.go
+++ b/run.go
@@ -139,7 +139,8 @@ func (ws *workerRunService) Serve(ctx context.Context) error {
err := ws.runFn(ctx, info)
- if err != nil && ctx.Err() == nil && !errors.Is(err, suture.ErrDoNotRestart) && !errors.Is(err, ErrSkipTick) {
+ if err != nil && ctx.Err() == nil && !errors.Is(err, suture.ErrDoNotRestart) &&
+ (ws.w.interval <= 0 || !errors.Is(err, ErrSkipTick)) {
m.WorkerFailed(ws.w.name, err)
}
diff --git a/run_test.go b/run_test.go
index 1f8ff46..2511a64 100644
--- a/run_test.go
+++ b/run_test.go
@@ -457,6 +457,24 @@ func TestRun_ErrSkipTick_NotCountedAsFailure(t *testing.T) {
assert.Empty(t, m.failed, "ErrSkipTick should not be counted as failure")
}
+func TestRun_ErrSkipTick_NonPeriodic_CountedAsFailure(t *testing.T) {
+ m := newMockMetrics()
+ var attempts atomic.Int32
+ w := NewWorker("non-periodic-skipper").HandlerFunc(func(_ context.Context, _ *WorkerInfo) error {
+ attempts.Add(1)
+ return ErrSkipTick // meaningless for non-periodic, should be treated as normal error
+ }).WithRestart(false) // stop after first attempt
+
+ ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
+ defer cancel()
+
+ Run(ctx, []*Worker{w}, WithMetrics(m))
+
+ m.mu.Lock()
+ defer m.mu.Unlock()
+ assert.NotEmpty(t, m.failed, "ErrSkipTick from non-periodic worker should be counted as failure")
+}
+
func TestRun_ErrDoNotRestart_NotCountedAsFailure(t *testing.T) {
m := newMockMetrics()
w := NewWorker("completer").HandlerFunc(func(_ context.Context, _ *WorkerInfo) error {
From af2bcf109da6bcf40c5135ed04b36a87f0e84932 Mon Sep 17 00:00:00 2001
From: Ankur Shrivastava
Date: Sat, 25 Apr 2026 23:29:23 +0800
Subject: [PATCH 4/9] refactor: use suture Services() as source of truth for
child liveness
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
Replace the children map + done channel with suture's Services() API.
closingSupervisor now implements childService interface, exposing
worker name, Worker config, and token. All child enumeration
(Add/Remove/GetChildren/GetChild) queries suture directly.
- Remove childEntry type, children map, childrenMu, pruneStoppedLocked,
removeLocked, done channel
- Add childService interface with isActive() check via inner supervisor
- Remove GetChildCount (no longer cheaper than len(GetChildren()))
- Fix GetChildCount doc nit (len([WorkerInfo.GetChildren]) → valid Go)
Zombie children are eliminated by design — suture only returns active
services, so stopped children are never visible to the parent.
---
README.md | 100 ++++++++++++++++++-----------------------
example_test.go | 2 +-
run.go | 77 ++++++++++++++++++--------------
worker.go | 115 +++++++++++++++++-------------------------------
worker_test.go | 50 +++++----------------
5 files changed, 142 insertions(+), 202 deletions(-)
diff --git a/README.md b/README.md
index e9f70de..bee2eef 100644
--- a/README.md
+++ b/README.md
@@ -266,7 +266,7 @@ func main() {
info.Add(workers.NewWorker(key).Handler(&solverHandler{version: cfg.version}))
}
time.Sleep(10 * time.Millisecond)
- fmt.Printf("tick %d: children=%v count=%d\n", tick, info.GetChildren(), info.GetChildCount())
+ fmt.Printf("tick %d: children=%v count=%d\n", tick, info.GetChildren(), len(info.GetChildren()))
}
}
})
@@ -387,7 +387,6 @@ shutdown complete
- [func \(info \*WorkerInfo\) Add\(w \*Worker\) bool](<#WorkerInfo.Add>)
- [func \(info \*WorkerInfo\) GetAttempt\(\) int](<#WorkerInfo.GetAttempt>)
- [func \(info \*WorkerInfo\) GetChild\(name string\) \(Worker, bool\)](<#WorkerInfo.GetChild>)
- - [func \(info \*WorkerInfo\) GetChildCount\(\) int](<#WorkerInfo.GetChildCount>)
- [func \(info \*WorkerInfo\) GetChildren\(\) \[\]string](<#WorkerInfo.GetChildren>)
- [func \(info \*WorkerInfo\) GetHandler\(\) CycleHandler](<#WorkerInfo.GetHandler>)
- [func \(info \*WorkerInfo\) GetName\(\) string](<#WorkerInfo.GetName>)
@@ -412,7 +411,7 @@ var ErrSkipTick = errors.New("workers: skip tick")
```
-## func [Run]()
+## func [Run]()
```go
func Run(ctx context.Context, workers []*Worker, opts ...RunOption) error
@@ -468,7 +467,7 @@ all workers stopped
-## func [RunWorker]()
+## func [RunWorker]()
```go
func RunWorker(ctx context.Context, w *Worker, opts ...RunOption)
@@ -601,7 +600,7 @@ func (BaseMetrics) WorkerStopped(string)
-## type [CycleFunc]()
+## type [CycleFunc]()
CycleFunc adapts a plain function into a [CycleHandler](<#CycleHandler>). Close is a no\-op — use this for simple, stateless handlers.
@@ -773,7 +772,7 @@ tick 2
-### func \(CycleFunc\) [Close]()
+### func \(CycleFunc\) [Close]()
```go
func (fn CycleFunc) Close() error
@@ -782,7 +781,7 @@ func (fn CycleFunc) Close() error
Close is a no\-op for CycleFunc.
-### func \(CycleFunc\) [RunCycle]()
+### func \(CycleFunc\) [RunCycle]()
```go
func (fn CycleFunc) RunCycle(ctx context.Context, info *WorkerInfo) error
@@ -791,7 +790,7 @@ func (fn CycleFunc) RunCycle(ctx context.Context, info *WorkerInfo) error
-## type [CycleHandler]()
+## type [CycleHandler]()
CycleHandler handles worker execution cycles. For periodic workers, RunCycle is called once per tick. Close is called once when the worker stops, allowing cleanup of resources.
@@ -832,7 +831,7 @@ func NewPrometheusMetrics(namespace string) Metrics
NewPrometheusMetrics creates a Metrics implementation backed by Prometheus. The namespace is prepended to all metric names \(e.g., "myapp" → "myapp\_worker\_started\_total"\). Metrics are auto\-registered with the default Prometheus registry. Safe to call multiple times with the same namespace — returns the cached instance. The cache is process\-global; use a small number of static namespaces \(not per\-request/tenant values\).
-## type [Middleware]()
+## type [Middleware]()
Middleware intercepts each execution cycle. Call next to continue the chain. Matches gRPC interceptor convention.
@@ -886,7 +885,7 @@ func WithMetrics(m Metrics) RunOption
WithMetrics sets the metrics implementation for all workers started by [Run](<#Run>). Workers inherit this unless they override via [Worker.WithMetrics](<#Worker.WithMetrics>). If not set, [BaseMetrics](<#BaseMetrics>) is used.
-## type [Worker]()
+## type [Worker]()
Worker represents a background goroutine managed by the framework. Create with [NewWorker](<#NewWorker>) and configure with builder methods.
@@ -897,7 +896,7 @@ type Worker struct {
```
-### func [NewWorker]()
+### func [NewWorker]()
```go
func NewWorker(name string) *Worker
@@ -945,7 +944,7 @@ worker "greeter" started (attempt 0)
-### func \(\*Worker\) [AddInterceptors]()
+### func \(\*Worker\) [AddInterceptors]()
```go
func (w *Worker) AddInterceptors(mw ...Middleware) *Worker
@@ -954,7 +953,7 @@ func (w *Worker) AddInterceptors(mw ...Middleware) *Worker
AddInterceptors appends to the worker\-level interceptor list.
-### func \(\*Worker\) [Every]()
+### func \(\*Worker\) [Every]()
```go
func (w *Worker) Every(d time.Duration) *Worker
@@ -1004,7 +1003,7 @@ tick 2
-### func \(\*Worker\) [GetHandler]()
+### func \(\*Worker\) [GetHandler]()
```go
func (w *Worker) GetHandler() CycleHandler
@@ -1013,7 +1012,7 @@ func (w *Worker) GetHandler() CycleHandler
GetHandler returns the worker's [CycleHandler](<#CycleHandler>), or nil if not set.
-### func \(\*Worker\) [GetInitialDelay]()
+### func \(\*Worker\) [GetInitialDelay]()
```go
func (w *Worker) GetInitialDelay() time.Duration
@@ -1022,7 +1021,7 @@ func (w *Worker) GetInitialDelay() time.Duration
GetInitialDelay returns the initial delay before the first tick, or 0 if not set.
-### func \(\*Worker\) [GetInterval]()
+### func \(\*Worker\) [GetInterval]()
```go
func (w *Worker) GetInterval() time.Duration
@@ -1031,7 +1030,7 @@ func (w *Worker) GetInterval() time.Duration
GetInterval returns the periodic interval, or 0 if this is not a periodic worker.
-### func \(\*Worker\) [GetJitterPercent]()
+### func \(\*Worker\) [GetJitterPercent]()
```go
func (w *Worker) GetJitterPercent() int
@@ -1040,7 +1039,7 @@ func (w *Worker) GetJitterPercent() int
GetJitterPercent returns the jitter percentage. \-1 means inherit run\-level default, 0 means no jitter.
-### func \(\*Worker\) [GetName]()
+### func \(\*Worker\) [GetName]()
```go
func (w *Worker) GetName() string
@@ -1049,7 +1048,7 @@ func (w *Worker) GetName() string
GetName returns the worker's name.
-### func \(\*Worker\) [GetRestartOnFail]()
+### func \(\*Worker\) [GetRestartOnFail]()
```go
func (w *Worker) GetRestartOnFail() bool
@@ -1058,7 +1057,7 @@ func (w *Worker) GetRestartOnFail() bool
GetRestartOnFail returns whether the worker restarts on failure.
-### func \(\*Worker\) [Handler]()
+### func \(\*Worker\) [Handler]()
```go
func (w *Worker) Handler(h CycleHandler) *Worker
@@ -1067,7 +1066,7 @@ func (w *Worker) Handler(h CycleHandler) *Worker
Handler sets the worker's [CycleHandler](<#CycleHandler>). Use this for handlers that need cleanup via Close \(e.g., database connections, leases\).
-### func \(\*Worker\) [HandlerFunc]()
+### func \(\*Worker\) [HandlerFunc]()
```go
func (w *Worker) HandlerFunc(fn CycleFunc) *Worker
@@ -1076,7 +1075,7 @@ func (w *Worker) HandlerFunc(fn CycleFunc) *Worker
HandlerFunc sets the worker's handler from a plain function. This is the common case for simple, stateless workers.
-### func \(\*Worker\) [Interceptors]()
+### func \(\*Worker\) [Interceptors]()
```go
func (w *Worker) Interceptors(mw ...Middleware) *Worker
@@ -1135,7 +1134,7 @@ func main() {
-### func \(\*Worker\) [WithBackoffJitter]()
+### func \(\*Worker\) [WithBackoffJitter]()
```go
func (w *Worker) WithBackoffJitter(jitter func(time.Duration) time.Duration) *Worker
@@ -1144,7 +1143,7 @@ func (w *Worker) WithBackoffJitter(jitter func(time.Duration) time.Duration) *Wo
WithBackoffJitter adds random jitter to the backoff duration to prevent thundering herd on coordinated restarts. The function receives the base backoff duration and returns a jittered duration.
-### func \(\*Worker\) [WithFailureBackoff]()
+### func \(\*Worker\) [WithFailureBackoff]()
```go
func (w *Worker) WithFailureBackoff(d time.Duration) *Worker
@@ -1153,7 +1152,7 @@ func (w *Worker) WithFailureBackoff(d time.Duration) *Worker
WithFailureBackoff sets the duration to wait between restarts. Suture default is 15 seconds.
-### func \(\*Worker\) [WithFailureDecay]()
+### func \(\*Worker\) [WithFailureDecay]()
```go
func (w *Worker) WithFailureDecay(decay float64) *Worker
@@ -1162,7 +1161,7 @@ func (w *Worker) WithFailureDecay(decay float64) *Worker
WithFailureDecay sets the rate at which failure count decays over time. A value of 1.0 means failures decay by one per second. Suture default is 1.0.
-### func \(\*Worker\) [WithFailureThreshold]()
+### func \(\*Worker\) [WithFailureThreshold]()
```go
func (w *Worker) WithFailureThreshold(threshold float64) *Worker
@@ -1171,7 +1170,7 @@ func (w *Worker) WithFailureThreshold(threshold float64) *Worker
WithFailureThreshold sets the number of failures allowed before the supervisor gives up restarting. Suture default is 5.
-### func \(\*Worker\) [WithInitialDelay]()
+### func \(\*Worker\) [WithInitialDelay]()
```go
func (w *Worker) WithInitialDelay(d time.Duration) *Worker
@@ -1180,7 +1179,7 @@ func (w *Worker) WithInitialDelay(d time.Duration) *Worker
WithInitialDelay delays the first tick to stagger startup. Requires [Worker.Every](<#Worker.Every>).
-### func \(\*Worker\) [WithJitter]()
+### func \(\*Worker\) [WithJitter]()
```go
func (w *Worker) WithJitter(percent int) *Worker
@@ -1189,7 +1188,7 @@ func (w *Worker) WithJitter(percent int) *Worker
WithJitter sets per\-worker jitter as a percentage of the base interval. Each tick is randomized within ±percent of the base. Requires [Worker.Every](<#Worker.Every>). Setting WithJitter\(0\) explicitly disables jitter even when a run\-level default is set via [WithDefaultJitter](<#WithDefaultJitter>).
-### func \(\*Worker\) [WithMetrics]()
+### func \(\*Worker\) [WithMetrics]()
```go
func (w *Worker) WithMetrics(m Metrics) *Worker
@@ -1198,7 +1197,7 @@ func (w *Worker) WithMetrics(m Metrics) *Worker
WithMetrics sets a per\-worker metrics implementation, overriding the metrics inherited from the parent [WorkerInfo](<#WorkerInfo>) or [Run](<#Run>) options.
-### func \(\*Worker\) [WithRestart]()
+### func \(\*Worker\) [WithRestart]()
```go
func (w *Worker) WithRestart(restart bool) *Worker
@@ -1247,7 +1246,7 @@ func main() {
-### func \(\*Worker\) [WithTimeout]()
+### func \(\*Worker\) [WithTimeout]()
```go
func (w *Worker) WithTimeout(d time.Duration) *Worker
@@ -1256,7 +1255,7 @@ func (w *Worker) WithTimeout(d time.Duration) *Worker
WithTimeout sets the maximum time to wait for the worker to stop during graceful shutdown. Suture default is 10 seconds.
-## type [WorkerInfo]()
+## type [WorkerInfo]()
WorkerInfo carries worker metadata and child management. The framework always creates it — it is never nil. context.Context handles cancellation/deadlines/values; WorkerInfo handles everything worker\-specific.
@@ -1267,7 +1266,7 @@ type WorkerInfo struct {
```
-### func [NewWorkerInfo]()
+### func [NewWorkerInfo]()
```go
func NewWorkerInfo(name string, attempt int, opts ...WorkerInfoOption) *WorkerInfo
@@ -1278,7 +1277,7 @@ NewWorkerInfo creates a [WorkerInfo](<#WorkerInfo>) with the given name and atte
Use [WithTestChildren](<#WithTestChildren>) to enable Add/Remove/GetChildren in tests.
-### func \(\*WorkerInfo\) [Add]()
+### func \(\*WorkerInfo\) [Add]()
```go
func (info *WorkerInfo) Add(w *Worker) bool
@@ -1288,9 +1287,7 @@ Add starts a child worker under this worker's supervisor subtree. Returns true i
Note: Remove \+ Add is not atomic — there is a brief window where the worker is not running. For most reconciliation patterns this is fine.
-Children inherit run\-level interceptors, metrics \(unless overridden via [Worker.WithMetrics](<#Worker.WithMetrics>)\), and scoped lifecycle — when this worker stops, all its children stop too.
-
-When a child permanently stops, it is automatically removed from the children map on the next call to [WorkerInfo.Add](<#WorkerInfo.Add>), [WorkerInfo.GetChildren](<#WorkerInfo.GetChildren>), [WorkerInfo.GetChild](<#WorkerInfo.GetChild>), or [WorkerInfo.GetChildCount](<#WorkerInfo.GetChildCount>).
+Children inherit run\-level interceptors, metrics \(unless overridden via [Worker.WithMetrics](<#Worker.WithMetrics>)\), and scoped lifecycle — when this worker stops, all its children stop too. Stopped children are automatically excluded from the results of [WorkerInfo.GetChildren](<#WorkerInfo.GetChildren>) and [WorkerInfo.GetChild](<#WorkerInfo.GetChild>).
Example
@@ -1411,7 +1408,7 @@ processor v2
-### func \(\*WorkerInfo\) [GetAttempt]()
+### func \(\*WorkerInfo\) [GetAttempt]()
```go
func (info *WorkerInfo) GetAttempt() int
@@ -1420,7 +1417,7 @@ func (info *WorkerInfo) GetAttempt() int
GetAttempt returns the restart attempt number \(0 on first run\).
-### func \(\*WorkerInfo\) [GetChild]()
+### func \(\*WorkerInfo\) [GetChild]()
```go
func (info *WorkerInfo) GetChild(name string) (Worker, bool)
@@ -1430,26 +1427,17 @@ GetChild returns a copy of a running child worker and true, or the zero value an
The [CycleHandler](<#CycleHandler>) \(accessible via [Worker.GetHandler](<#Worker.GetHandler>)\) is shared with the running worker, not copied. Use type assertion to inspect handler state \(e.g., config versions for reconciliation\). See \[Example\_reconcilerWithChangeDetection\].
-
-### func \(\*WorkerInfo\) [GetChildCount]()
-
-```go
-func (info *WorkerInfo) GetChildCount() int
-```
-
-GetChildCount returns the number of currently running child workers. This is more efficient than len\([WorkerInfo.GetChildren](<#WorkerInfo.GetChildren>)\) as it avoids allocating a sorted slice. Stopped children are lazily pruned.
-
-### func \(\*WorkerInfo\) [GetChildren]()
+### func \(\*WorkerInfo\) [GetChildren]()
```go
func (info *WorkerInfo) GetChildren() []string
```
-GetChildren returns the names of currently running child workers. Stopped children are lazily pruned before building the list.
+GetChildren returns the names of currently running child workers. Suture is the source of truth — stopped children are never returned.
-### func \(\*WorkerInfo\) [GetHandler]()
+### func \(\*WorkerInfo\) [GetHandler]()
```go
func (info *WorkerInfo) GetHandler() CycleHandler
@@ -1464,7 +1452,7 @@ if h, ok := info.GetHandler().(MyHandler); ok {
```
-### func \(\*WorkerInfo\) [GetName]()
+### func \(\*WorkerInfo\) [GetName]()
```go
func (info *WorkerInfo) GetName() string
@@ -1473,7 +1461,7 @@ func (info *WorkerInfo) GetName() string
GetName returns the worker's name as passed to [NewWorker](<#NewWorker>).
-### func \(\*WorkerInfo\) [Remove]()
+### func \(\*WorkerInfo\) [Remove]()
```go
func (info *WorkerInfo) Remove(name string)
@@ -1482,7 +1470,7 @@ func (info *WorkerInfo) Remove(name string)
Remove stops a child worker by name.
-## type [WorkerInfoOption]()
+## type [WorkerInfoOption]()
WorkerInfoOption configures a [WorkerInfo](<#WorkerInfo>) created by [NewWorkerInfo](<#NewWorkerInfo>).
@@ -1491,7 +1479,7 @@ type WorkerInfoOption func(*WorkerInfo)
```
-### func [WithTestChildren]()
+### func [WithTestChildren]()
```go
func WithTestChildren(ctx context.Context) WorkerInfoOption
@@ -1506,7 +1494,7 @@ info := workers.NewWorkerInfo("test", 0, workers.WithTestChildren(ctx))
```
-### func [WithTestHandler]()
+### func [WithTestHandler]()
```go
func WithTestHandler(h CycleHandler) WorkerInfoOption
diff --git a/example_test.go b/example_test.go
index 7a3419e..60cda0d 100644
--- a/example_test.go
+++ b/example_test.go
@@ -403,7 +403,7 @@ func Example_reconcilerWithChangeDetection() {
info.Add(workers.NewWorker(key).Handler(&solverHandler{version: cfg.version}))
}
time.Sleep(10 * time.Millisecond)
- fmt.Printf("tick %d: children=%v count=%d\n", tick, info.GetChildren(), info.GetChildCount())
+ fmt.Printf("tick %d: children=%v count=%d\n", tick, info.GetChildren(), len(info.GetChildren()))
}
}
})
diff --git a/run.go b/run.go
index d18ea60..8fe5cf7 100644
--- a/run.go
+++ b/run.go
@@ -92,7 +92,6 @@ type workerRunService struct {
active *atomic.Int32
cfg *runConfig
attempt atomic.Int32
- done chan struct{} // closed on permanent stop, for lazy zombie detection
}
// Serve implements suture.Service.
@@ -116,27 +115,15 @@ func (ws *workerRunService) Serve(ctx context.Context) error {
}
info := &WorkerInfo{
- name: ws.w.name,
- attempt: attempt,
- handler: ws.w.handler,
- sup: ws.childSup,
- children: make(map[string]childEntry),
- cfg: ws.cfg,
- active: ws.active,
- metrics: m,
+ name: ws.w.name,
+ attempt: attempt,
+ handler: ws.w.handler,
+ sup: ws.childSup,
+ cfg: ws.cfg,
+ active: ws.active,
+ metrics: m,
}
- // Remove all children spawned during this attempt so they don't
- // leak across restarts (each attempt gets a fresh children map,
- // but children are attached to the long-lived childSup).
- defer func() {
- info.childrenMu.Lock()
- for name := range info.children {
- info.removeLocked(name)
- }
- info.childrenMu.Unlock()
- }()
-
err := ws.runFn(ctx, info)
if err != nil && ctx.Err() == nil && !errors.Is(err, suture.ErrDoNotRestart) &&
@@ -149,9 +136,6 @@ func (ws *workerRunService) Serve(ctx context.Context) error {
if permanentStop {
ws.closeFn()
- if ws.done != nil {
- close(ws.done)
- }
return suture.ErrDoNotRestart
}
return err
@@ -175,9 +159,8 @@ func resolveMetrics(w *Worker, parent Metrics) Metrics {
// addWorkerToSupervisor creates a child supervisor for the worker,
// builds the middleware chain, resolves jitter, and adds the worker
-// to the parent supervisor. Returns the service token for removal and
-// a channel that is closed when the worker permanently stops.
-func addWorkerToSupervisor(parent *suture.Supervisor, w *Worker, cfg *runConfig, active *atomic.Int32, parentMetrics Metrics) (suture.ServiceToken, <-chan struct{}) {
+// to the parent supervisor.
+func addWorkerToSupervisor(parent *suture.Supervisor, w *Worker, cfg *runConfig, active *atomic.Int32, parentMetrics Metrics) {
m := resolveMetrics(w, parentMetrics)
handler := w.handler
@@ -218,15 +201,18 @@ func addWorkerToSupervisor(parent *suture.Supervisor, w *Worker, cfg *runConfig,
})
}
- done := make(chan struct{})
childSup := suture.New("worker:"+w.name, w.sutureSpec(makeEventHook(m)))
childSup.Add(&workerRunService{
w: w, runFn: runFn, closeFn: closeFn,
childSup: childSup, metrics: m, active: active, cfg: cfg,
- done: done,
})
- tok := parent.Add(&closingSupervisor{Supervisor: childSup, closeFn: closeFn})
- return tok, done
+ cs := &closingSupervisor{
+ Supervisor: childSup,
+ closeFn: closeFn,
+ childName: w.name,
+ worker: w,
+ }
+ cs.token = parent.Add(cs)
}
// Run starts all workers under a suture supervisor and blocks until ctx is
@@ -246,7 +232,7 @@ func Run(ctx context.Context, workers []*Worker, opts ...RunOption) error {
EventHook: makeEventHook(cfg.metrics),
})
for _, w := range workers {
- _, _ = addWorkerToSupervisor(root, w, cfg, active, cfg.metrics)
+ addWorkerToSupervisor(root, w, cfg, active, cfg.metrics)
}
err := root.Serve(ctx)
if err != nil && ctx.Err() != nil {
@@ -262,13 +248,29 @@ func RunWorker(ctx context.Context, w *Worker, opts ...RunOption) {
_ = Run(ctx, []*Worker{w}, opts...)
}
+// childService is implemented by [closingSupervisor] to expose child
+// metadata when iterating suture's [suture.Supervisor.Services] list.
+type childService interface {
+ getChildName() string
+ getWorker() *Worker
+ getToken() suture.ServiceToken
+ isActive() bool
+}
+
// closingSupervisor wraps a child supervisor and calls closeFn exactly
// once after Supervisor.Serve returns. This guarantees handler.Close()
// fires when the supervisor tree is torn down, even if Serve() panics
// before reaching the permanentStop check.
+//
+// It also implements [childService] so parent workers can enumerate
+// children via [suture.Supervisor.Services] without maintaining a
+// separate map.
type closingSupervisor struct {
*suture.Supervisor
- closeFn func()
+ closeFn func()
+ childName string // the child worker's name
+ worker *Worker // the original Worker config
+ token suture.ServiceToken // set after parent.Add()
}
func (cs *closingSupervisor) Serve(ctx context.Context) error {
@@ -277,6 +279,17 @@ func (cs *closingSupervisor) Serve(ctx context.Context) error {
return err
}
+func (cs *closingSupervisor) getChildName() string { return cs.childName }
+func (cs *closingSupervisor) getWorker() *Worker { return cs.worker }
+func (cs *closingSupervisor) getToken() suture.ServiceToken { return cs.token }
+
+// isActive reports whether the inner supervisor still lists any service.
+// After the workerRunService returns ErrDoNotRestart suture drops it, so
+// Services() is empty. NOTE(review): likely also empty before Serve starts — confirm.
+func (cs *closingSupervisor) isActive() bool {
+ return len(cs.Services()) > 0
+}
+
// makeEventHook returns a suture event hook that logs events and records
// panic metrics.
func makeEventHook(m Metrics) suture.EventHook {
diff --git a/worker.go b/worker.go
index fb678ab..657c9ad 100644
--- a/worker.go
+++ b/worker.go
@@ -107,19 +107,11 @@ type WorkerInfo struct {
handler CycleHandler // the worker's handler, set by framework
// child management, set by framework
- sup *suture.Supervisor
- childrenMu sync.Mutex
- children map[string]childEntry
- cfg *runConfig
- active *atomic.Int32
- metrics Metrics
-}
-
-// childEntry tracks a child worker and its supervisor token.
-type childEntry struct {
- token suture.ServiceToken
- worker *Worker
- done <-chan struct{} // closed when the child permanently stops
+ sup *suture.Supervisor
+ mu sync.Mutex // serializes Add/Remove
+ cfg *runConfig
+ active *atomic.Int32
+ metrics Metrics
}
// GetName returns the worker's name as passed to [NewWorker].
@@ -151,7 +143,6 @@ func WithTestChildren(ctx context.Context) WorkerInfoOption {
sup := suture.New("test:"+info.name, suture.Spec{})
go sup.Serve(ctx) //nolint:errcheck
info.sup = sup
- info.children = make(map[string]childEntry)
info.cfg = &runConfig{metrics: BaseMetrics{}, defaultJitter: -1}
info.active = &atomic.Int32{}
info.metrics = BaseMetrics{}
@@ -177,6 +168,21 @@ func NewWorkerInfo(name string, attempt int, opts ...WorkerInfoOption) *WorkerIn
return info
}
+// findChild returns the active [childService] named name and true, or
+// (nil, false) if info.sup is nil or no active child matches. It scans
+// [suture.Supervisor.Services]; suture is the source of truth for liveness.
+func (info *WorkerInfo) findChild(name string) (childService, bool) {
+ if info.sup == nil {
+ return nil, false
+ }
+ for _, svc := range info.sup.Services() {
+ if cs, ok := svc.(childService); ok && cs.getChildName() == name && cs.isActive() {
+ return cs, true
+ }
+ }
+ return nil, false
+}
+
// Add starts a child worker under this worker's supervisor subtree.
// Returns true if the worker was added, false if a worker with the same
// name is already running (no-op). To replace a running worker, call
@@ -187,25 +193,20 @@ func NewWorkerInfo(name string, attempt int, opts ...WorkerInfoOption) *WorkerIn
//
// Children inherit run-level interceptors, metrics (unless overridden via
// [Worker.WithMetrics]), and scoped lifecycle — when this worker stops,
-// all its children stop too.
-//
-// When a child permanently stops, it is automatically removed from
-// the children map on the next call to [WorkerInfo.Add],
-// [WorkerInfo.GetChildren], [WorkerInfo.GetChild], or
-// [WorkerInfo.GetChildCount].
+// all its children stop too. Stopped children are automatically
+// excluded from the results of [WorkerInfo.GetChildren] and
+// [WorkerInfo.GetChild].
func (info *WorkerInfo) Add(w *Worker) bool {
if info.sup == nil {
return false
}
- info.childrenMu.Lock()
- defer info.childrenMu.Unlock()
+ info.mu.Lock()
+ defer info.mu.Unlock()
- info.pruneStoppedLocked()
- if _, ok := info.children[w.name]; ok {
+ if _, exists := info.findChild(w.name); exists {
return false
}
- tok, done := addWorkerToSupervisor(info.sup, w, info.cfg, info.active, info.metrics)
- info.children[w.name] = childEntry{token: tok, worker: w, done: done}
+ addWorkerToSupervisor(info.sup, w, info.cfg, info.active, info.metrics)
return true
}
@@ -214,60 +215,30 @@ func (info *WorkerInfo) Remove(name string) {
if info.sup == nil {
return
}
- info.childrenMu.Lock()
- defer info.childrenMu.Unlock()
+ info.mu.Lock()
+ defer info.mu.Unlock()
- info.removeLocked(name)
-}
-
-// pruneStoppedLocked removes children whose done channel is closed.
-// Caller must hold childrenMu.
-func (info *WorkerInfo) pruneStoppedLocked() {
- for name, entry := range info.children {
- if entry.done == nil {
- continue
- }
- select {
- case <-entry.done:
- delete(info.children, name)
- default:
- }
- }
-}
-
-// removeLocked stops and deletes a child by name. Caller must hold childrenMu.
-func (info *WorkerInfo) removeLocked(name string) {
- if entry, ok := info.children[name]; ok {
- _ = info.sup.Remove(entry.token)
- delete(info.children, name)
+ if cs, ok := info.findChild(name); ok {
+ _ = info.sup.Remove(cs.getToken())
}
}
// GetChildren returns the names of currently running child workers.
-// Stopped children are lazily pruned before building the list.
+// Suture is the source of truth — stopped children are never returned.
func (info *WorkerInfo) GetChildren() []string {
- info.childrenMu.Lock()
- defer info.childrenMu.Unlock()
-
- info.pruneStoppedLocked()
- names := make([]string, 0, len(info.children))
- for name := range info.children {
- names = append(names, name)
+ if info.sup == nil {
+ return nil
+ }
+ var names []string
+ for _, svc := range info.sup.Services() {
+ if cs, ok := svc.(childService); ok && cs.isActive() {
+ names = append(names, cs.getChildName())
+ }
}
slices.Sort(names)
return names
}
-// GetChildCount returns the number of currently running child workers.
-// This is more efficient than len([WorkerInfo.GetChildren]) as it avoids
-// allocating a sorted slice. Stopped children are lazily pruned.
-func (info *WorkerInfo) GetChildCount() int {
- info.childrenMu.Lock()
- defer info.childrenMu.Unlock()
- info.pruneStoppedLocked()
- return len(info.children)
-}
-
// GetChild returns a copy of a running child worker and true, or the zero
// value and false if not found. The returned value is a snapshot —
// mutations to the Worker fields have no effect on the running worker.
@@ -277,12 +248,8 @@ func (info *WorkerInfo) GetChildCount() int {
// state (e.g., config versions for reconciliation). See
// [Example_reconcilerWithChangeDetection].
func (info *WorkerInfo) GetChild(name string) (Worker, bool) {
- info.childrenMu.Lock()
- defer info.childrenMu.Unlock()
-
- info.pruneStoppedLocked()
- if entry, ok := info.children[name]; ok {
- return *entry.worker, true
+ if cs, ok := info.findChild(name); ok {
+ return *cs.getWorker(), true
}
return Worker{}, false
}
diff --git a/worker_test.go b/worker_test.go
index b28c4ce..5ebe044 100644
--- a/worker_test.go
+++ b/worker_test.go
@@ -89,7 +89,7 @@ func TestWorkerInfo(t *testing.T) {
func TestWorkerInfo_Children_Nil(t *testing.T) {
info := &WorkerInfo{name: "test"}
- assert.Empty(t, info.GetChildren(), "Children on nil map should return empty slice")
+ assert.Empty(t, info.GetChildren(), "Children on nil sup should return empty slice")
}
func TestCycleFunc_Close(t *testing.T) {
@@ -152,33 +152,6 @@ func TestWorkerInfo_GetHandler_Nil(t *testing.T) {
assert.Nil(t, info.GetHandler())
}
-func TestWorkerInfo_GetChildCount(t *testing.T) {
- info := NewWorkerInfo("parent", 0, WithTestChildren(t.Context()))
-
- assert.Equal(t, 0, info.GetChildCount())
-
- childFn := CycleFunc(func(ctx context.Context, _ *WorkerInfo) error {
- <-ctx.Done()
- return ctx.Err()
- })
-
- info.Add(NewWorker("a").HandlerFunc(childFn))
- info.Add(NewWorker("b").HandlerFunc(childFn))
- time.Sleep(20 * time.Millisecond)
-
- assert.Equal(t, 2, info.GetChildCount())
-
- info.Remove("a")
- time.Sleep(20 * time.Millisecond)
-
- assert.Equal(t, 1, info.GetChildCount())
-}
-
-func TestWorkerInfo_GetChildCount_Nil(t *testing.T) {
- info := &WorkerInfo{name: "test"}
- assert.Equal(t, 0, info.GetChildCount())
-}
-
func TestWorkerInfo_GetChild(t *testing.T) {
info := NewWorkerInfo("parent", 0, WithTestChildren(t.Context()))
@@ -273,8 +246,8 @@ func TestWorkerInfo_ZombieChild_AutoCleanup(t *testing.T) {
time.Sleep(100 * time.Millisecond) // let child stop
- // Lazy prune should remove the stopped child.
- assert.Equal(t, 0, info.GetChildCount())
+ // Suture removes the stopped service — GetChildCount reflects this.
+ assert.Equal(t, 0, len(info.GetChildren()))
assert.Empty(t, info.GetChildren())
}
@@ -287,7 +260,7 @@ func TestWorkerInfo_ZombieChild_ErrDoNotRestart(t *testing.T) {
time.Sleep(100 * time.Millisecond)
- assert.Equal(t, 0, info.GetChildCount())
+ assert.Equal(t, 0, len(info.GetChildren()))
assert.Empty(t, info.GetChildren())
}
@@ -301,15 +274,14 @@ func TestWorkerInfo_ZombieChild_ReAdd(t *testing.T) {
time.Sleep(100 * time.Millisecond)
- // After prune, Add with same name should succeed.
- assert.Equal(t, 0, info.GetChildCount())
+ // Suture removed the stopped child — Add with same name should succeed.
added := info.Add(NewWorker("child").HandlerFunc(func(ctx context.Context, _ *WorkerInfo) error {
<-ctx.Done()
return ctx.Err()
}))
assert.True(t, added, "re-Add after zombie prune should succeed")
time.Sleep(20 * time.Millisecond)
- assert.Equal(t, 1, info.GetChildCount())
+ assert.Equal(t, 1, len(info.GetChildren()))
}
func TestWorkerInfo_ZombieChild_ReAdd_NoRead(t *testing.T) {
@@ -322,15 +294,15 @@ func TestWorkerInfo_ZombieChild_ReAdd_NoRead(t *testing.T) {
time.Sleep(100 * time.Millisecond)
- // Re-Add directly — no GetChildren/GetChild/GetChildCount call in between.
- // Add must prune the stale entry itself.
+ // Re-Add directly — suture already removed the stopped service,
+ // so Add sees no conflict via Services().
added := info.Add(NewWorker("child").HandlerFunc(func(ctx context.Context, _ *WorkerInfo) error {
<-ctx.Done()
return ctx.Err()
}))
- assert.True(t, added, "Add should prune stopped child and allow re-Add")
+ assert.True(t, added, "Add should succeed after stopped child is gone from suture")
time.Sleep(20 * time.Millisecond)
- assert.Equal(t, 1, info.GetChildCount())
+ assert.Equal(t, 1, len(info.GetChildren()))
}
func TestWorkerInfo_ZombieChild_GetChild(t *testing.T) {
@@ -342,7 +314,7 @@ func TestWorkerInfo_ZombieChild_GetChild(t *testing.T) {
time.Sleep(100 * time.Millisecond)
- // GetChild should also prune and return false.
+ // GetChild queries suture — stopped child is not found.
_, ok := info.GetChild("child")
assert.False(t, ok)
}
From 074c7b4f1eab282eaf163c6374ffeed21c8365c7 Mon Sep 17 00:00:00 2001
From: Ankur Shrivastava
Date: Sun, 26 Apr 2026 16:41:57 +0800
Subject: [PATCH 5/9] revert: use done channel for zombie detection, fix review
nits
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
Revert the Services()-based child management back to the done channel
approach. The Services() approach introduced an edge case (grandchildren
fooling isActive) and performance overhead without solving a real problem
— the done channel handles all permanent stop cases correctly.
The done channel is closed on permanentStop in workerRunService.Serve,
which covers: nil+WithRestart(false), ErrDoNotRestart, and ctx cancel.
Reaching the failure threshold does NOT close done (correct — suture keeps
restarting after backoff decay). The defer in Serve cleans up
grandchildren before done is closed.
Also fixes:
- GetChildCount doc: invalid Go syntax in comment
- Test message: "nil sup" → "nil supervisor"
- Test comments: remove stale GetChildCount references
- Add grandchildren zombie test validating the done channel approach
---
README.md | 98 +++++++++++++++++++++++------------------
run.go | 77 ++++++++++++++------------------
worker.go | 116 ++++++++++++++++++++++++++++++++-----------------
worker_test.go | 33 +++++++++++---
4 files changed, 188 insertions(+), 136 deletions(-)
diff --git a/README.md b/README.md
index bee2eef..0255c39 100644
--- a/README.md
+++ b/README.md
@@ -387,6 +387,7 @@ shutdown complete
- [func \(info \*WorkerInfo\) Add\(w \*Worker\) bool](<#WorkerInfo.Add>)
- [func \(info \*WorkerInfo\) GetAttempt\(\) int](<#WorkerInfo.GetAttempt>)
- [func \(info \*WorkerInfo\) GetChild\(name string\) \(Worker, bool\)](<#WorkerInfo.GetChild>)
+ - [func \(info \*WorkerInfo\) GetChildCount\(\) int](<#WorkerInfo.GetChildCount>)
- [func \(info \*WorkerInfo\) GetChildren\(\) \[\]string](<#WorkerInfo.GetChildren>)
- [func \(info \*WorkerInfo\) GetHandler\(\) CycleHandler](<#WorkerInfo.GetHandler>)
- [func \(info \*WorkerInfo\) GetName\(\) string](<#WorkerInfo.GetName>)
@@ -411,7 +412,7 @@ var ErrSkipTick = errors.New("workers: skip tick")
```
-## func [Run]()
+## func [Run]()
```go
func Run(ctx context.Context, workers []*Worker, opts ...RunOption) error
@@ -467,7 +468,7 @@ all workers stopped