diff --git a/README.md b/README.md index 4dd5f3a..19d1bed 100644 --- a/README.md +++ b/README.md @@ -53,6 +53,22 @@ For periodic workers \(with [Worker.Every](<#Worker.Every>)\): the handler runs Returning nil from a non\-periodic handler stops the worker permanently, even with restart enabled. Use [ErrDoNotRestart](<#ErrDoNotRestart>) for explicit permanent completion from periodic handlers. +### Error Semantics for Periodic Handlers + +The return value from a periodic handler determines what happens next: + +``` +| Return value | Timer loop | Restart? | Use case | +|-----------------|------------|----------|--------------------------| +| nil | continues | n/a | Success, next tick fires | +| ErrSkipTick | continues | n/a | Transient failure, skip | +| ErrDoNotRestart | exits | no | Permanent completion | +| other error | exits | yes* | Failure, needs restart | +| ctx.Err() | exits | no | Graceful shutdown | +``` + +\*Only if [Worker.WithRestart](<#Worker.WithRestart>)\(true\) \(the default\). + ### Middleware Cross\-cutting concerns like tracing, logging, and panic recovery are implemented as [Middleware](<#Middleware>). The middleware chain follows the gRPC interceptor convention: a flat function that calls next to continue: @@ -68,6 +84,8 @@ func myMiddleware(ctx context.Context, info *workers.WorkerInfo, next workers.Cy Attach middleware per\-worker via [Worker.Interceptors](<#Worker.Interceptors>) or per\-run via [WithInterceptors](<#WithInterceptors>). Built\-in middleware is available in the middleware/ sub\-package. +Run\-level interceptors \([WithInterceptors](<#WithInterceptors>)\) wrap all workers and are best for cross\-cutting defaults \(tracing, logging, panic recovery\). Worker\-level interceptors \([Worker.Interceptors](<#Worker.Interceptors>)\) are best for worker\-specific concerns \(distributed locks with per\-worker TTL, rate limiting\). Children inherit run\-level interceptors but not the parent's worker\-level interceptors. 
+ ### Helpers Common patterns are provided as helpers: @@ -171,6 +189,106 @@ pool shut down

+
Example (Reconciler With Change Detection) +

+ +Demonstrates config\-driven reconciliation with change detection using the handler\-as\-metadata pattern. The handler struct carries a config version that the reconciler inspects via GetChild\(\).GetHandler\(\) type assertion, eliminating the need for a parallel tracking map. + +```go +package main + +import ( + "context" + "fmt" + "time" + + "github.com/go-coldbrew/workers" +) + +// solverHandler is used in Example_reconcilerWithChangeDetection to +// demonstrate the handler-as-metadata pattern. +type solverHandler struct { + version int +} + +func (h *solverHandler) RunCycle(ctx context.Context, _ *workers.WorkerInfo) error { + <-ctx.Done() + return ctx.Err() +} + +func (h *solverHandler) Close() error { return nil } + +func main() { + type solverConfig struct { + version int + } + + // Simulate config that changes over 3 ticks. + configs := []map[string]solverConfig{ + {"a": {version: 1}}, + {"a": {version: 1}, "b": {version: 1}}, + {"a": {version: 2}, "b": {version: 1}}, // a gets new version + } + + tick := 0 + manager := workers.NewWorker("reconciler").HandlerFunc(func(ctx context.Context, info *workers.WorkerInfo) error { + ticker := time.NewTicker(40 * time.Millisecond) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return ctx.Err() + case <-ticker.C: + if tick >= len(configs) { + continue + } + desired := configs[tick] + tick++ + + // Remove workers no longer desired. + for _, name := range info.GetChildren() { + if _, ok := desired[name]; !ok { + info.Remove(name) + } + } + + // Add new or replace changed workers. + for key, cfg := range desired { + child, exists := info.GetChild(key) + if exists { + // Check if config changed via handler type assertion. 
+ if h, ok := child.GetHandler().(*solverHandler); ok && h.version == cfg.version { + continue // unchanged, skip + } + info.Remove(key) // config changed, replace + time.Sleep(10 * time.Millisecond) // let old worker stop + } + info.Add(workers.NewWorker(key).Handler(&solverHandler{version: cfg.version})) + } + time.Sleep(10 * time.Millisecond) + fmt.Printf("tick %d: children=%v count=%d\n", tick, info.GetChildren(), len(info.GetChildren())) + } + } + }) + + ctx, cancel := context.WithTimeout(context.Background(), 250*time.Millisecond) + defer cancel() + + workers.Run(ctx, []*workers.Worker{manager}) +} +``` + +#### Output + +``` +tick 1: children=[a] count=1 +tick 2: children=[a b] count=2 +tick 3: children=[a b] count=2 +``` + +

+
+
Example (Standalone)

@@ -247,7 +365,11 @@ shutdown complete - [func \(w \*Worker\) AddInterceptors\(mw ...Middleware\) \*Worker](<#Worker.AddInterceptors>) - [func \(w \*Worker\) Every\(d time.Duration\) \*Worker](<#Worker.Every>) - [func \(w \*Worker\) GetHandler\(\) CycleHandler](<#Worker.GetHandler>) + - [func \(w \*Worker\) GetInitialDelay\(\) time.Duration](<#Worker.GetInitialDelay>) + - [func \(w \*Worker\) GetInterval\(\) time.Duration](<#Worker.GetInterval>) + - [func \(w \*Worker\) GetJitterPercent\(\) int](<#Worker.GetJitterPercent>) - [func \(w \*Worker\) GetName\(\) string](<#Worker.GetName>) + - [func \(w \*Worker\) GetRestartOnFail\(\) bool](<#Worker.GetRestartOnFail>) - [func \(w \*Worker\) Handler\(h CycleHandler\) \*Worker](<#Worker.Handler>) - [func \(w \*Worker\) HandlerFunc\(fn CycleFunc\) \*Worker](<#Worker.HandlerFunc>) - [func \(w \*Worker\) Interceptors\(mw ...Middleware\) \*Worker](<#Worker.Interceptors>) @@ -265,11 +387,14 @@ shutdown complete - [func \(info \*WorkerInfo\) Add\(w \*Worker\) bool](<#WorkerInfo.Add>) - [func \(info \*WorkerInfo\) GetAttempt\(\) int](<#WorkerInfo.GetAttempt>) - [func \(info \*WorkerInfo\) GetChild\(name string\) \(Worker, bool\)](<#WorkerInfo.GetChild>) + - [func \(info \*WorkerInfo\) GetChildCount\(\) int](<#WorkerInfo.GetChildCount>) - [func \(info \*WorkerInfo\) GetChildren\(\) \[\]string](<#WorkerInfo.GetChildren>) + - [func \(info \*WorkerInfo\) GetHandler\(\) CycleHandler](<#WorkerInfo.GetHandler>) - [func \(info \*WorkerInfo\) GetName\(\) string](<#WorkerInfo.GetName>) - [func \(info \*WorkerInfo\) Remove\(name string\)](<#WorkerInfo.Remove>) - [type WorkerInfoOption](<#WorkerInfoOption>) - [func WithTestChildren\(ctx context.Context\) WorkerInfoOption](<#WithTestChildren>) + - [func WithTestHandler\(h CycleHandler\) WorkerInfoOption](<#WithTestHandler>) ## Variables @@ -280,8 +405,14 @@ shutdown complete var ErrDoNotRestart = suture.ErrDoNotRestart ``` +ErrSkipTick can be returned from a periodic handler to skip the 
current tick without triggering restart. The timer continues and the next tick fires normally. Only meaningful for periodic workers \(with [Worker.Every](<#Worker.Every>)\). + +```go +var ErrSkipTick = errors.New("workers: skip tick") +``` + -## func [Run]() +## func [Run]() ```go func Run(ctx context.Context, workers []*Worker, opts ...RunOption) error @@ -337,7 +468,7 @@ all workers stopped

-## func [RunWorker]() +## func [RunWorker]() ```go func RunWorker(ctx context.Context, w *Worker, opts ...RunOption) @@ -470,7 +601,7 @@ func (BaseMetrics) WorkerStopped(string) -## type [CycleFunc]() +## type [CycleFunc]() CycleFunc adapts a plain function into a [CycleHandler](<#CycleHandler>). Close is a no\-op — use this for simple, stateless handlers. @@ -479,7 +610,7 @@ type CycleFunc func(ctx context.Context, info *WorkerInfo) error ``` -### func [BatchChannelWorker]() +### func [BatchChannelWorker]() ```go func BatchChannelWorker[T any](ch <-chan T, maxSize int, maxDelay time.Duration, fn func(ctx context.Context, info *WorkerInfo, batch []T) error) CycleFunc @@ -534,7 +665,7 @@ func main() { -### func [ChannelWorker]() +### func [ChannelWorker]() ```go func ChannelWorker[T any](ch <-chan T, fn func(ctx context.Context, info *WorkerInfo, item T) error) CycleFunc @@ -590,7 +721,7 @@ world -### func [EveryInterval]() +### func [EveryInterval]() ```go func EveryInterval(d time.Duration, fn CycleFunc) CycleFunc @@ -642,7 +773,7 @@ tick 2 -### func \(CycleFunc\) [Close]() +### func \(CycleFunc\) [Close]() ```go func (fn CycleFunc) Close() error @@ -651,7 +782,7 @@ func (fn CycleFunc) Close() error Close is a no\-op for CycleFunc. -### func \(CycleFunc\) [RunCycle]() +### func \(CycleFunc\) [RunCycle]() ```go func (fn CycleFunc) RunCycle(ctx context.Context, info *WorkerInfo) error @@ -660,7 +791,7 @@ func (fn CycleFunc) RunCycle(ctx context.Context, info *WorkerInfo) error -## type [CycleHandler]() +## type [CycleHandler]() CycleHandler handles worker execution cycles. For periodic workers, RunCycle is called once per tick. Close is called once when the worker stops, allowing cleanup of resources. @@ -701,7 +832,7 @@ func NewPrometheusMetrics(namespace string) Metrics NewPrometheusMetrics creates a Metrics implementation backed by Prometheus. The namespace is prepended to all metric names \(e.g., "myapp" → "myapp\_worker\_started\_total"\). 
Metrics are auto\-registered with the default Prometheus registry. Safe to call multiple times with the same namespace — returns the cached instance. The cache is process\-global; use a small number of static namespaces \(not per\-request/tenant values\). -## type [Middleware]() +## type [Middleware]() Middleware intercepts each execution cycle. Call next to continue the chain. Matches gRPC interceptor convention. @@ -710,7 +841,7 @@ type Middleware func(ctx context.Context, info *WorkerInfo, next CycleFunc) erro ``` -## type [RunOption]() +## type [RunOption]() RunOption configures the behavior of [Run](<#Run>). @@ -719,7 +850,7 @@ type RunOption func(*runConfig) ``` -### func [AddInterceptors]() +### func [AddInterceptors]() ```go func AddInterceptors(mw ...Middleware) RunOption @@ -728,7 +859,7 @@ func AddInterceptors(mw ...Middleware) RunOption AddInterceptors appends to the run\-level interceptor list. -### func [WithDefaultJitter]() +### func [WithDefaultJitter]() ```go func WithDefaultJitter(percent int) RunOption @@ -737,7 +868,7 @@ func WithDefaultJitter(percent int) RunOption WithDefaultJitter sets a run\-level default jitter percentage for all periodic workers. Worker\-level [Worker.WithJitter](<#Worker.WithJitter>) takes precedence. Setting Worker.WithJitter\(0\) disables jitter for a specific worker even when a run\-level default is set. -### func [WithInterceptors]() +### func [WithInterceptors]() ```go func WithInterceptors(mw ...Middleware) RunOption @@ -746,7 +877,7 @@ func WithInterceptors(mw ...Middleware) RunOption WithInterceptors replaces the run\-level interceptor list. Run\-level interceptors wrap outside worker\-level interceptors. -### func [WithMetrics]() +### func [WithMetrics]() ```go func WithMetrics(m Metrics) RunOption @@ -755,7 +886,7 @@ func WithMetrics(m Metrics) RunOption WithMetrics sets the metrics implementation for all workers started by [Run](<#Run>). 
Workers inherit this unless they override via [Worker.WithMetrics](<#Worker.WithMetrics>). If not set, [BaseMetrics](<#BaseMetrics>) is used. -## type [Worker]() +## type [Worker]() Worker represents a background goroutine managed by the framework. Create with [NewWorker](<#NewWorker>) and configure with builder methods. @@ -766,7 +897,7 @@ type Worker struct { ``` -### func [NewWorker]() +### func [NewWorker]() ```go func NewWorker(name string) *Worker @@ -814,7 +945,7 @@ worker "greeter" started (attempt 0) -### func \(\*Worker\) [AddInterceptors]() +### func \(\*Worker\) [AddInterceptors]() ```go func (w *Worker) AddInterceptors(mw ...Middleware) *Worker @@ -823,7 +954,7 @@ func (w *Worker) AddInterceptors(mw ...Middleware) *Worker AddInterceptors appends to the worker\-level interceptor list. -### func \(\*Worker\) [Every]() +### func \(\*Worker\) [Every]() ```go func (w *Worker) Every(d time.Duration) *Worker @@ -873,7 +1004,7 @@ tick 2 -### func \(\*Worker\) [GetHandler]() +### func \(\*Worker\) [GetHandler]() ```go func (w *Worker) GetHandler() CycleHandler @@ -881,8 +1012,35 @@ func (w *Worker) GetHandler() CycleHandler GetHandler returns the worker's [CycleHandler](<#CycleHandler>), or nil if not set. + +### func \(\*Worker\) [GetInitialDelay]() + +```go +func (w *Worker) GetInitialDelay() time.Duration +``` + +GetInitialDelay returns the initial delay before the first tick, or 0 if not set. + + +### func \(\*Worker\) [GetInterval]() + +```go +func (w *Worker) GetInterval() time.Duration +``` + +GetInterval returns the periodic interval, or 0 if this is not a periodic worker. + + +### func \(\*Worker\) [GetJitterPercent]() + +```go +func (w *Worker) GetJitterPercent() int +``` + +GetJitterPercent returns the jitter percentage. \-1 means inherit run\-level default, 0 means no jitter. 
+ -### func \(\*Worker\) [GetName]() +### func \(\*Worker\) [GetName]() ```go func (w *Worker) GetName() string @@ -890,8 +1048,17 @@ func (w *Worker) GetName() string GetName returns the worker's name. + +### func \(\*Worker\) [GetRestartOnFail]() + +```go +func (w *Worker) GetRestartOnFail() bool +``` + +GetRestartOnFail returns whether the worker restarts on failure. + -### func \(\*Worker\) [Handler]() +### func \(\*Worker\) [Handler]() ```go func (w *Worker) Handler(h CycleHandler) *Worker @@ -900,7 +1067,7 @@ func (w *Worker) Handler(h CycleHandler) *Worker Handler sets the worker's [CycleHandler](<#CycleHandler>). Use this for handlers that need cleanup via Close \(e.g., database connections, leases\). -### func \(\*Worker\) [HandlerFunc]() +### func \(\*Worker\) [HandlerFunc]() ```go func (w *Worker) HandlerFunc(fn CycleFunc) *Worker @@ -909,7 +1076,7 @@ func (w *Worker) HandlerFunc(fn CycleFunc) *Worker HandlerFunc sets the worker's handler from a plain function. This is the common case for simple, stateless workers. -### func \(\*Worker\) [Interceptors]() +### func \(\*Worker\) [Interceptors]() ```go func (w *Worker) Interceptors(mw ...Middleware) *Worker @@ -968,7 +1135,7 @@ func main() { -### func \(\*Worker\) [WithBackoffJitter]() +### func \(\*Worker\) [WithBackoffJitter]() ```go func (w *Worker) WithBackoffJitter(jitter func(time.Duration) time.Duration) *Worker @@ -977,7 +1144,7 @@ func (w *Worker) WithBackoffJitter(jitter func(time.Duration) time.Duration) *Wo WithBackoffJitter adds random jitter to the backoff duration to prevent thundering herd on coordinated restarts. The function receives the base backoff duration and returns a jittered duration. -### func \(\*Worker\) [WithFailureBackoff]() +### func \(\*Worker\) [WithFailureBackoff]() ```go func (w *Worker) WithFailureBackoff(d time.Duration) *Worker @@ -986,7 +1153,7 @@ func (w *Worker) WithFailureBackoff(d time.Duration) *Worker WithFailureBackoff sets the duration to wait between restarts. 
Suture default is 15 seconds. -### func \(\*Worker\) [WithFailureDecay]() +### func \(\*Worker\) [WithFailureDecay]() ```go func (w *Worker) WithFailureDecay(decay float64) *Worker @@ -995,7 +1162,7 @@ func (w *Worker) WithFailureDecay(decay float64) *Worker WithFailureDecay sets the rate at which failure count decays over time. A value of 1.0 means failures decay by one per second. Suture default is 1.0. -### func \(\*Worker\) [WithFailureThreshold]() +### func \(\*Worker\) [WithFailureThreshold]() ```go func (w *Worker) WithFailureThreshold(threshold float64) *Worker @@ -1004,7 +1171,7 @@ func (w *Worker) WithFailureThreshold(threshold float64) *Worker WithFailureThreshold sets the number of failures allowed before the supervisor gives up restarting. Suture default is 5. -### func \(\*Worker\) [WithInitialDelay]() +### func \(\*Worker\) [WithInitialDelay]() ```go func (w *Worker) WithInitialDelay(d time.Duration) *Worker @@ -1013,7 +1180,7 @@ func (w *Worker) WithInitialDelay(d time.Duration) *Worker WithInitialDelay delays the first tick to stagger startup. Requires [Worker.Every](<#Worker.Every>). -### func \(\*Worker\) [WithJitter]() +### func \(\*Worker\) [WithJitter]() ```go func (w *Worker) WithJitter(percent int) *Worker @@ -1022,7 +1189,7 @@ func (w *Worker) WithJitter(percent int) *Worker WithJitter sets per\-worker jitter as a percentage of the base interval. Each tick is randomized within ±percent of the base. Requires [Worker.Every](<#Worker.Every>). Setting WithJitter\(0\) explicitly disables jitter even when a run\-level default is set via [WithDefaultJitter](<#WithDefaultJitter>). -### func \(\*Worker\) [WithMetrics]() +### func \(\*Worker\) [WithMetrics]() ```go func (w *Worker) WithMetrics(m Metrics) *Worker @@ -1031,7 +1198,7 @@ func (w *Worker) WithMetrics(m Metrics) *Worker WithMetrics sets a per\-worker metrics implementation, overriding the metrics inherited from the parent [WorkerInfo](<#WorkerInfo>) or [Run](<#Run>) options. 
-### func \(\*Worker\) [WithRestart]() +### func \(\*Worker\) [WithRestart]() ```go func (w *Worker) WithRestart(restart bool) *Worker @@ -1039,6 +1206,8 @@ func (w *Worker) WithRestart(restart bool) *Worker WithRestart configures whether the worker should be restarted on failure. Default is true. Set to false for one\-shot workers that should exit after completion or failure. Note: a handler returning nil always stops the worker permanently, regardless of this setting. +Periodic workers \(with [Worker.Every](<#Worker.Every>)\) should generally keep the default \(restart enabled\). Use [ErrSkipTick](<#ErrSkipTick>) for transient failures and [ErrDoNotRestart](<#ErrDoNotRestart>) for permanent completion instead of disabling restart. +
Example

@@ -1080,7 +1249,7 @@ func main() {

-### func \(\*Worker\) [WithTimeout]() +### func \(\*Worker\) [WithTimeout]() ```go func (w *Worker) WithTimeout(d time.Duration) *Worker @@ -1089,7 +1258,7 @@ func (w *Worker) WithTimeout(d time.Duration) *Worker WithTimeout sets the maximum time to wait for the worker to stop during graceful shutdown. Suture default is 10 seconds. -## type [WorkerInfo]() +## type [WorkerInfo]() WorkerInfo carries worker metadata and child management. The framework always creates it — it is never nil. context.Context handles cancellation/deadlines/values; WorkerInfo handles everything worker\-specific. @@ -1100,7 +1269,7 @@ type WorkerInfo struct { ``` -### func [NewWorkerInfo]() +### func [NewWorkerInfo]() ```go func NewWorkerInfo(name string, attempt int, opts ...WorkerInfoOption) *WorkerInfo @@ -1111,7 +1280,7 @@ NewWorkerInfo creates a [WorkerInfo](<#WorkerInfo>) with the given name and atte Use [WithTestChildren](<#WithTestChildren>) to enable Add/Remove/GetChildren in tests. -### func \(\*WorkerInfo\) [Add]() +### func \(\*WorkerInfo\) [Add]() ```go func (info *WorkerInfo) Add(w *Worker) bool @@ -1123,6 +1292,8 @@ Note: Remove \+ Add is not atomic — there is a brief window where the worker i Children inherit run\-level interceptors, metrics \(unless overridden via [Worker.WithMetrics](<#Worker.WithMetrics>)\), and scoped lifecycle — when this worker stops, all its children stop too. +When a child permanently stops, it is automatically removed from the children map on the next call to [WorkerInfo.Add](<#WorkerInfo.Add>), [WorkerInfo.GetChildren](<#WorkerInfo.GetChildren>), [WorkerInfo.GetChild](<#WorkerInfo.GetChild>), or [WorkerInfo.GetChildCount](<#WorkerInfo.GetChildCount>). +
Example

@@ -1242,7 +1413,7 @@ processor v2

-### func \(\*WorkerInfo\) [GetAttempt]() +### func \(\*WorkerInfo\) [GetAttempt]() ```go func (info *WorkerInfo) GetAttempt() int @@ -1251,25 +1422,51 @@ func (info *WorkerInfo) GetAttempt() int GetAttempt returns the restart attempt number \(0 on first run\). -### func \(\*WorkerInfo\) [GetChild]() +### func \(\*WorkerInfo\) [GetChild]() ```go func (info *WorkerInfo) GetChild(name string) (Worker, bool) ``` -GetChild returns a copy of a running child worker and true, or the zero value and false if not found. The returned value is a snapshot — mutations have no effect on the running worker. +GetChild returns a copy of a running child worker and true, or the zero value and false if not found. The returned value is a snapshot — mutations to the Worker fields have no effect on the running worker. + +The [CycleHandler](<#CycleHandler>) \(accessible via [Worker.GetHandler](<#Worker.GetHandler>)\) is shared with the running worker, not copied. Use type assertion to inspect handler state \(e.g., config versions for reconciliation\). See the "Reconciler With Change Detection" example. + + +### func \(\*WorkerInfo\) [GetChildCount]() + +```go +func (info *WorkerInfo) GetChildCount() int +``` + +GetChildCount returns the number of currently running child workers. This is more efficient than calling [WorkerInfo.GetChildren](<#WorkerInfo.GetChildren>) and taking len, as it avoids allocating a sorted slice. Stopped children are lazily pruned. -### func \(\*WorkerInfo\) [GetChildren]() +### func \(\*WorkerInfo\) [GetChildren]() ```go func (info *WorkerInfo) GetChildren() []string ``` -GetChildren returns the names of currently running child workers. +GetChildren returns the names of currently running child workers. Stopped children are lazily pruned before building the list. + + +### func \(\*WorkerInfo\) [GetHandler]() + +```go +func (info *WorkerInfo) GetHandler() CycleHandler +``` + +GetHandler returns the worker's [CycleHandler](<#CycleHandler>), or nil if not set. 
Use type assertion to access handler\-specific state or interfaces: + +``` +if h, ok := info.GetHandler().(MyHandler); ok { + // access h.Config, h.Version, etc. +} +``` -### func \(\*WorkerInfo\) [GetName]() +### func \(\*WorkerInfo\) [GetName]() ```go func (info *WorkerInfo) GetName() string @@ -1278,7 +1475,7 @@ func (info *WorkerInfo) GetName() string GetName returns the worker's name as passed to [NewWorker](<#NewWorker>). -### func \(\*WorkerInfo\) [Remove]() +### func \(\*WorkerInfo\) [Remove]() ```go func (info *WorkerInfo) Remove(name string) @@ -1287,7 +1484,7 @@ func (info *WorkerInfo) Remove(name string) Remove stops a child worker by name. -## type [WorkerInfoOption]() +## type [WorkerInfoOption]() WorkerInfoOption configures a [WorkerInfo](<#WorkerInfo>) created by [NewWorkerInfo](<#NewWorkerInfo>). @@ -1296,7 +1493,7 @@ type WorkerInfoOption func(*WorkerInfo) ``` -### func [WithTestChildren]() +### func [WithTestChildren]() ```go func WithTestChildren(ctx context.Context) WorkerInfoOption @@ -1310,4 +1507,13 @@ defer cancel() info := workers.NewWorkerInfo("test", 0, workers.WithTestChildren(ctx)) ``` + +### func [WithTestHandler]() + +```go +func WithTestHandler(h CycleHandler) WorkerInfoOption +``` + +WithTestHandler sets the handler on a test [WorkerInfo](<#WorkerInfo>) so that [WorkerInfo.GetHandler](<#WorkerInfo.GetHandler>) works in unit tests. + Generated by [gomarkdoc]() diff --git a/example_test.go b/example_test.go index 6e74ad0..bd635bf 100644 --- a/example_test.go +++ b/example_test.go @@ -8,6 +8,19 @@ import ( "github.com/go-coldbrew/workers" ) +// solverHandler is used in Example_reconcilerWithChangeDetection to +// demonstrate the handler-as-metadata pattern. +type solverHandler struct { + version int +} + +func (h *solverHandler) RunCycle(ctx context.Context, _ *workers.WorkerInfo) error { + <-ctx.Done() + return ctx.Err() +} + +func (h *solverHandler) Close() error { return nil } + // A simple worker that runs until cancelled. 
func ExampleNewWorker() { w := workers.NewWorker("greeter").HandlerFunc(func(ctx context.Context, info *workers.WorkerInfo) error { @@ -338,6 +351,73 @@ func Example_dynamicWorkerPool() { // pool shut down } +// Demonstrates config-driven reconciliation with change detection using +// the handler-as-metadata pattern. The handler struct carries a config +// version that the reconciler inspects via GetChild().GetHandler() type +// assertion, eliminating the need for a parallel tracking map. +func Example_reconcilerWithChangeDetection() { + type solverConfig struct { + version int + } + + // Simulate config that changes over 3 ticks. + configs := []map[string]solverConfig{ + {"a": {version: 1}}, + {"a": {version: 1}, "b": {version: 1}}, + {"a": {version: 2}, "b": {version: 1}}, // a gets new version + } + + tick := 0 + manager := workers.NewWorker("reconciler").HandlerFunc(func(ctx context.Context, info *workers.WorkerInfo) error { + ticker := time.NewTicker(40 * time.Millisecond) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return ctx.Err() + case <-ticker.C: + if tick >= len(configs) { + continue + } + desired := configs[tick] + tick++ + + // Remove workers no longer desired. + for _, name := range info.GetChildren() { + if _, ok := desired[name]; !ok { + info.Remove(name) + } + } + + // Add new or replace changed workers. + for key, cfg := range desired { + child, exists := info.GetChild(key) + if exists { + // Check if config changed via handler type assertion. 
+ if h, ok := child.GetHandler().(*solverHandler); ok && h.version == cfg.version { + continue // unchanged, skip + } + info.Remove(key) // config changed, replace + time.Sleep(10 * time.Millisecond) // let old worker stop + } + info.Add(workers.NewWorker(key).Handler(&solverHandler{version: cfg.version})) + } + time.Sleep(10 * time.Millisecond) + fmt.Printf("tick %d: children=%v count=%d\n", tick, info.GetChildren(), len(info.GetChildren())) + } + } + }) + + ctx, cancel := context.WithTimeout(context.Background(), 250*time.Millisecond) + defer cancel() + + workers.Run(ctx, []*workers.Worker{manager}) + // Output: + // tick 1: children=[a] count=1 + // tick 2: children=[a b] count=2 + // tick 3: children=[a b] count=2 +} + // Per-worker middleware using the interceptor pattern. func ExampleWorker_Interceptors() { loggingMW := func(ctx context.Context, info *workers.WorkerInfo, next workers.CycleFunc) error { diff --git a/helpers.go b/helpers.go index 1cc6601..796b6a4 100644 --- a/helpers.go +++ b/helpers.go @@ -2,6 +2,7 @@ package workers import ( "context" + "errors" "math/rand/v2" "time" ) @@ -51,7 +52,9 @@ func everyIntervalWithJitter(base time.Duration, jitterPercent int, initialDelay return ctx.Err() case <-timer.C: if err := fn(ctx, info); err != nil { - return err + if !errors.Is(err, ErrSkipTick) { + return err + } } timer.Reset(computeInterval()) } diff --git a/helpers_test.go b/helpers_test.go index 2ac01f6..7ae2e60 100644 --- a/helpers_test.go +++ b/helpers_test.go @@ -3,6 +3,7 @@ package workers import ( "context" "errors" + "fmt" "math" "sync/atomic" "testing" @@ -136,6 +137,44 @@ func TestEveryInterval_Jitter_VariableIntervals(t *testing.T) { assert.Greater(t, stddev, 0.0, "intervals should vary with jitter enabled") } +func TestEveryIntervalWithJitter_ErrSkipTick(t *testing.T) { + var count atomic.Int32 + fn := everyIntervalWithJitter(10*time.Millisecond, 0, 0, func(_ context.Context, _ *WorkerInfo) error { + n := count.Add(1) + if n == 1 { + return 
ErrSkipTick // skip first tick + } + return nil + }) + + ctx, cancel := context.WithTimeout(context.Background(), 80*time.Millisecond) + defer cancel() + + info := &WorkerInfo{name: "skiptick", attempt: 0} + err := fn(ctx, info) + assert.ErrorIs(t, err, context.DeadlineExceeded, "should not exit from ErrSkipTick") + assert.GreaterOrEqual(t, int(count.Load()), 2, "should continue ticking after ErrSkipTick") +} + +func TestEveryIntervalWithJitter_ErrSkipTick_Wrapped(t *testing.T) { + var count atomic.Int32 + fn := everyIntervalWithJitter(10*time.Millisecond, 0, 0, func(_ context.Context, _ *WorkerInfo) error { + n := count.Add(1) + if n == 1 { + return fmt.Errorf("db timeout: %w", ErrSkipTick) + } + return nil + }) + + ctx, cancel := context.WithTimeout(context.Background(), 80*time.Millisecond) + defer cancel() + + info := &WorkerInfo{name: "skiptick-wrapped", attempt: 0} + err := fn(ctx, info) + assert.ErrorIs(t, err, context.DeadlineExceeded, "wrapped ErrSkipTick should also be caught") + assert.GreaterOrEqual(t, int(count.Load()), 2) +} + func TestChannelWorker(t *testing.T) { ch := make(chan string, 3) ch <- "a" diff --git a/middleware/README.md b/middleware/README.md index 902ba90..de377c2 100644 --- a/middleware/README.md +++ b/middleware/README.md @@ -38,6 +38,7 @@ Package middleware provides optional interceptors for [go\\\-coldbrew/workers](< - [type LockOption](<#LockOption>) - [func WithKeyFunc\(fn func\(name string\) string\) LockOption](<#WithKeyFunc>) - [func WithOnNotAcquired\(fn func\(ctx context.Context, name string\) error\) LockOption](<#WithOnNotAcquired>) + - [func WithSkipOnNotAcquired\(logFn func\(ctx context.Context, name string\)\) LockOption](<#WithSkipOnNotAcquired>) - [func WithTTLFunc\(fn func\(name string\) time.Duration\) LockOption](<#WithTTLFunc>) - [type Locker](<#Locker>) @@ -60,7 +61,7 @@ workers.Run(ctx, myWorkers, ``` -## func [DistributedLock]() +## func [DistributedLock]() ```go func DistributedLock(locker Locker, opts 
...LockOption) workers.Middleware @@ -114,7 +115,7 @@ func Timeout(d time.Duration) workers.Middleware Timeout enforces a per\-cycle deadline. Distinct from \[workers.Worker.WithTimeout\] which controls graceful shutdown. -## func [Tracing]() +## func [Tracing]() ```go func Tracing() workers.Middleware @@ -122,8 +123,12 @@ func Tracing() workers.Middleware Tracing creates an OTEL span per cycle via go\-coldbrew/tracing. The span is named "worker:\<name\>:cycle" and records errors. +Worker spans are always sampled regardless of the global TracerProvider's sampler. This prevents silent span drops when using ParentBased\(TraceIDRatioBased\(ratio\)\), where worker root spans \(which have no parent\) would otherwise be probabilistically dropped. + +The OTEL trace ID is injected into the log context as "trace" for correlation with the tracing backend. + -## type [LockOption]() +## type [LockOption]() LockOption configures [DistributedLock](<#DistributedLock>) behavior. @@ -132,7 +137,7 @@ type LockOption func(*lockConfig) ``` -### func [WithKeyFunc]() +### func [WithKeyFunc]() ```go func WithKeyFunc(fn func(name string) string) LockOption @@ -141,7 +146,7 @@ func WithKeyFunc(fn func(name string) string) LockOption WithKeyFunc sets a custom function to derive the lock key from the worker name. Default: "worker\-lock:\<name\>". -### func [WithOnNotAcquired]() +### func [WithOnNotAcquired]() ```go func WithOnNotAcquired(fn func(ctx context.Context, name string) error) LockOption @@ -149,8 +154,19 @@ func WithOnNotAcquired(fn func(ctx context.Context, name string) error) LockOpti WithOnNotAcquired sets a callback invoked when the lock is held by another instance. The cycle is skipped. Default: skip silently \(return nil\). +Caution: returning a non\-nil error from this callback triggers the framework's normal error handling — for periodic workers, this means restart with backoff.
If you want to log and skip, return nil from this callback or use [WithSkipOnNotAcquired](<#WithSkipOnNotAcquired>). + + +### func [WithSkipOnNotAcquired]() + +```go +func WithSkipOnNotAcquired(logFn func(ctx context.Context, name string)) LockOption +``` + +WithSkipOnNotAcquired is a convenience [LockOption](<#LockOption>) that calls logFn when the lock is held and skips the cycle \(returns nil, no restart\). If logFn is nil, the cycle is skipped silently \(same as the default but explicit in intent\). + -### func [WithTTLFunc]() +### func [WithTTLFunc]() ```go func WithTTLFunc(fn func(name string) time.Duration) LockOption @@ -159,9 +175,9 @@ func WithTTLFunc(fn func(name string) time.Duration) LockOption WithTTLFunc sets a custom function to derive the lock TTL from the worker name. Default: 30s. -## type [Locker]() +## type [Locker]() -Locker abstracts a distributed lock backend \(e.g., Redis, etcd, Consul\). +Locker abstracts a distributed lock backend \(e.g., Redis, etcd, Consul\). If your lock implementation already has Acquire\(ctx, key, ttl\) \(bool, error\) and Release\(ctx, key\) error methods, it satisfies this interface directly — no adapter needed. ```go type Locker interface { diff --git a/middleware/lock.go b/middleware/lock.go index 030e5bf..91b3d9e 100644 --- a/middleware/lock.go +++ b/middleware/lock.go @@ -10,6 +10,9 @@ import ( ) // Locker abstracts a distributed lock backend (e.g., Redis, etcd, Consul). +// If your lock implementation already has Acquire(ctx, key, ttl) (bool, error) +// and Release(ctx, key) error methods, it satisfies this interface directly — +// no adapter needed. type Locker interface { // Acquire attempts to acquire a lock for the given key with a TTL. // Returns true if the lock was acquired, false if held by another instance. @@ -41,10 +44,28 @@ func WithTTLFunc(fn func(name string) time.Duration) LockOption { // WithOnNotAcquired sets a callback invoked when the lock is held by another // instance. 
The cycle is skipped. Default: skip silently (return nil). +// +// Caution: returning a non-nil error from this callback triggers the +// framework's normal error handling — for periodic workers, this means +// restart with backoff. If you want to log and skip, return nil from +// this callback or use [WithSkipOnNotAcquired]. func WithOnNotAcquired(fn func(ctx context.Context, name string) error) LockOption { return func(c *lockConfig) { c.onNotAcquired = fn } } +// WithSkipOnNotAcquired is a convenience [LockOption] that calls logFn +// when the lock is held and skips the cycle (returns nil, no restart). +// If logFn is nil, the cycle is skipped silently (same as the default +// but explicit in intent). +func WithSkipOnNotAcquired(logFn func(ctx context.Context, name string)) LockOption { + return WithOnNotAcquired(func(ctx context.Context, name string) error { + if logFn != nil { + logFn(ctx, name) + } + return nil + }) +} + // DistributedLock acquires a distributed lock before each cycle. If the lock // is held by another instance, the cycle is skipped (or the onNotAcquired // callback is invoked). 
Release uses [context.WithoutCancel] so that context diff --git a/middleware/lock_test.go b/middleware/lock_test.go index bd274e3..45eb550 100644 --- a/middleware/lock_test.go +++ b/middleware/lock_test.go @@ -104,6 +104,37 @@ func TestDistributedLock_AcquireError(t *testing.T) { assert.EqualError(t, err, "redis down") } +func TestDistributedLock_WithSkipOnNotAcquired(t *testing.T) { + locker := &mockLocker{acquired: false} + var gotName string + mw := DistributedLock(locker, WithSkipOnNotAcquired(func(_ context.Context, name string) { + gotName = name + })) + + called := false + info := workers.NewWorkerInfo("skipped", 0) + err := mw(context.Background(), info, func(_ context.Context, _ *workers.WorkerInfo) error { + called = true + return nil + }) + + assert.NoError(t, err, "WithSkipOnNotAcquired should return nil") + assert.False(t, called, "next should not be called") + assert.Equal(t, "skipped", gotName, "logFn should receive worker name") +} + +func TestDistributedLock_WithSkipOnNotAcquired_NilLogFn(t *testing.T) { + locker := &mockLocker{acquired: false} + mw := DistributedLock(locker, WithSkipOnNotAcquired(nil)) + + info := workers.NewWorkerInfo("skipped", 0) + err := mw(context.Background(), info, func(_ context.Context, _ *workers.WorkerInfo) error { + return nil + }) + + assert.NoError(t, err, "nil logFn should still skip silently") +} + func TestDistributedLock_CustomKeyAndTTL(t *testing.T) { locker := &mockLocker{acquired: true} mw := DistributedLock(locker, diff --git a/middleware/tracing.go b/middleware/tracing.go index 55857be..e52084c 100644 --- a/middleware/tracing.go +++ b/middleware/tracing.go @@ -2,17 +2,35 @@ package middleware import ( "context" + "crypto/rand" + "github.com/go-coldbrew/log" "github.com/go-coldbrew/tracing" "github.com/go-coldbrew/workers" + oteltrace "go.opentelemetry.io/otel/trace" ) // Tracing creates an OTEL span per cycle via go-coldbrew/tracing. // The span is named "worker:<name>:cycle" and records errors.
+// +// Worker spans are always sampled regardless of the global +// TracerProvider's sampler. This prevents silent span drops when +// using ParentBased(TraceIDRatioBased(ratio)), where worker root +// spans (which have no parent) would otherwise be probabilistically +// dropped. +// +// The OTEL trace ID is injected into the log context as "trace" +// for correlation with the tracing backend. func Tracing() workers.Middleware { return func(ctx context.Context, info *workers.WorkerInfo, next workers.CycleFunc) error { + ctx = ensureSampled(ctx) span, ctx := tracing.NewInternalSpan(ctx, "worker:"+info.GetName()+":cycle") defer span.Finish() + + if spanCtx := oteltrace.SpanFromContext(ctx).SpanContext(); spanCtx.HasTraceID() { + ctx = log.AddToContext(ctx, "trace", spanCtx.TraceID().String()) + } + err := next(ctx, info) if err != nil { _ = span.SetError(err) @@ -20,3 +38,23 @@ func Tracing() workers.Middleware { return err } } + +// ensureSampled injects a sampled remote span context so that +// ParentBased samplers always sample the next span created from +// this context. If the context already has a sampled span, it is +// returned unchanged. 
+func ensureSampled(ctx context.Context) context.Context { + if oteltrace.SpanFromContext(ctx).SpanContext().IsSampled() { + return ctx + } + var traceID oteltrace.TraceID + var spanID oteltrace.SpanID + _, _ = rand.Read(traceID[:]) + _, _ = rand.Read(spanID[:]) + return oteltrace.ContextWithRemoteSpanContext(ctx, oteltrace.NewSpanContext(oteltrace.SpanContextConfig{ + TraceID: traceID, + SpanID: spanID, + TraceFlags: oteltrace.FlagsSampled, + Remote: true, + })) +} diff --git a/middleware/tracing_test.go b/middleware/tracing_test.go index e5a3319..287f048 100644 --- a/middleware/tracing_test.go +++ b/middleware/tracing_test.go @@ -6,11 +6,10 @@ import ( "github.com/go-coldbrew/workers" "github.com/stretchr/testify/assert" + oteltrace "go.opentelemetry.io/otel/trace" ) func TestTracing_NoPanic(t *testing.T) { - // Verify Tracing() doesn't panic and passes through correctly. - // Full span verification would require a tracing test harness. mw := Tracing() called := false @@ -34,3 +33,42 @@ func TestTracing_PropagatesError(t *testing.T) { assert.ErrorIs(t, err, assert.AnError) } + +func TestEnsureSampled(t *testing.T) { + // From a bare context (no span), ensureSampled should inject a + // sampled remote span context. + ctx := ensureSampled(context.Background()) + sc := oteltrace.SpanContextFromContext(ctx) + + assert.True(t, sc.IsSampled(), "should be sampled") + assert.True(t, sc.IsRemote(), "should be remote") + assert.True(t, sc.HasTraceID(), "should have a trace ID") + assert.True(t, sc.HasSpanID(), "should have a span ID") +} + +func TestEnsureSampled_AlreadySampled(t *testing.T) { + // If context already has a sampled span, ensureSampled is a no-op. 
+ existing := oteltrace.NewSpanContext(oteltrace.SpanContextConfig{ + TraceID: oteltrace.TraceID{1, 2, 3}, + SpanID: oteltrace.SpanID{4, 5, 6}, + TraceFlags: oteltrace.FlagsSampled, + Remote: true, + }) + ctx := oteltrace.ContextWithRemoteSpanContext(context.Background(), existing) + + ctx = ensureSampled(ctx) + sc := oteltrace.SpanContextFromContext(ctx) + + assert.Equal(t, existing.TraceID(), sc.TraceID(), "should keep existing trace ID") + assert.Equal(t, existing.SpanID(), sc.SpanID(), "should keep existing span ID") +} + +func TestEnsureSampled_UniqueIDs(t *testing.T) { + ctx1 := ensureSampled(context.Background()) + ctx2 := ensureSampled(context.Background()) + + sc1 := oteltrace.SpanContextFromContext(ctx1) + sc2 := oteltrace.SpanContextFromContext(ctx2) + + assert.NotEqual(t, sc1.TraceID(), sc2.TraceID(), "each call should generate unique trace IDs") +} diff --git a/run.go b/run.go index 73eb650..d18ea60 100644 --- a/run.go +++ b/run.go @@ -16,12 +16,17 @@ import ( // permanent completion (e.g., channel closed, work exhausted). var ErrDoNotRestart = suture.ErrDoNotRestart +// ErrSkipTick can be returned from a periodic handler to skip the current +// tick without triggering restart. The timer continues and the next tick +// fires normally. Only meaningful for periodic workers (with [Worker.Every]). +var ErrSkipTick = errors.New("workers: skip tick") + // RunOption configures the behavior of [Run]. 
type RunOption func(*runConfig) type runConfig struct { - metrics Metrics - interceptors []Middleware + metrics Metrics + interceptors []Middleware defaultJitter int // -1 = not set } @@ -81,12 +86,13 @@ func buildChain(middlewares []Middleware, handler CycleHandler) CycleFunc { type workerRunService struct { w *Worker runFn CycleFunc // fully resolved: chain + interval wrapping - closeFn func() // calls handler.Close() exactly once via shared sync.Once + closeFn func() // calls handler.Close() exactly once via shared sync.Once childSup *suture.Supervisor metrics Metrics active *atomic.Int32 cfg *runConfig attempt atomic.Int32 + done chan struct{} // closed on permanent stop, for lazy zombie detection } // Serve implements suture.Service. @@ -112,6 +118,7 @@ func (ws *workerRunService) Serve(ctx context.Context) error { info := &WorkerInfo{ name: ws.w.name, attempt: attempt, + handler: ws.w.handler, sup: ws.childSup, children: make(map[string]childEntry), cfg: ws.cfg, @@ -132,7 +139,8 @@ func (ws *workerRunService) Serve(ctx context.Context) error { err := ws.runFn(ctx, info) - if err != nil && ctx.Err() == nil && !errors.Is(err, suture.ErrDoNotRestart) { + if err != nil && ctx.Err() == nil && !errors.Is(err, suture.ErrDoNotRestart) && + (ws.w.interval <= 0 || !errors.Is(err, ErrSkipTick)) { m.WorkerFailed(ws.w.name, err) } @@ -141,6 +149,9 @@ func (ws *workerRunService) Serve(ctx context.Context) error { if permanentStop { ws.closeFn() + if ws.done != nil { + close(ws.done) + } return suture.ErrDoNotRestart } return err @@ -164,8 +175,9 @@ func resolveMetrics(w *Worker, parent Metrics) Metrics { // addWorkerToSupervisor creates a child supervisor for the worker, // builds the middleware chain, resolves jitter, and adds the worker -// to the parent supervisor. Returns the service token for removal. 
-func addWorkerToSupervisor(parent *suture.Supervisor, w *Worker, cfg *runConfig, active *atomic.Int32, parentMetrics Metrics) suture.ServiceToken { +// to the parent supervisor. Returns the service token for removal and +// a channel that is closed when the worker permanently stops. +func addWorkerToSupervisor(parent *suture.Supervisor, w *Worker, cfg *runConfig, active *atomic.Int32, parentMetrics Metrics) (suture.ServiceToken, <-chan struct{}) { m := resolveMetrics(w, parentMetrics) handler := w.handler @@ -206,12 +218,15 @@ func addWorkerToSupervisor(parent *suture.Supervisor, w *Worker, cfg *runConfig, }) } + done := make(chan struct{}) childSup := suture.New("worker:"+w.name, w.sutureSpec(makeEventHook(m))) childSup.Add(&workerRunService{ w: w, runFn: runFn, closeFn: closeFn, childSup: childSup, metrics: m, active: active, cfg: cfg, + done: done, }) - return parent.Add(&closingSupervisor{Supervisor: childSup, closeFn: closeFn}) + tok := parent.Add(&closingSupervisor{Supervisor: childSup, closeFn: closeFn}) + return tok, done } // Run starts all workers under a suture supervisor and blocks until ctx is @@ -231,7 +246,7 @@ func Run(ctx context.Context, workers []*Worker, opts ...RunOption) error { EventHook: makeEventHook(cfg.metrics), }) for _, w := range workers { - addWorkerToSupervisor(root, w, cfg, active, cfg.metrics) + _, _ = addWorkerToSupervisor(root, w, cfg, active, cfg.metrics) } err := root.Serve(ctx) if err != nil && ctx.Err() != nil { diff --git a/run_test.go b/run_test.go index 8e69f6c..2511a64 100644 --- a/run_test.go +++ b/run_test.go @@ -423,6 +423,58 @@ func TestRun_ClosingSupervisor_ClosesOnShutdown(t *testing.T) { assert.Equal(t, int32(1), closeCount.Load(), "Close should be called exactly once") } +func TestRun_ErrSkipTick_PeriodicWorker(t *testing.T) { + var count atomic.Int32 + w := NewWorker("skipper").HandlerFunc(func(_ context.Context, _ *WorkerInfo) error { + n := count.Add(1) + if n == 1 { + return ErrSkipTick + } + return nil + 
}).Every(10 * time.Millisecond) + + ctx, cancel := context.WithTimeout(context.Background(), 80*time.Millisecond) + defer cancel() + + err := Run(ctx, []*Worker{w}) + assert.NoError(t, err) + assert.GreaterOrEqual(t, int(count.Load()), 2, "should tick again after ErrSkipTick") +} + +func TestRun_ErrSkipTick_NotCountedAsFailure(t *testing.T) { + m := newMockMetrics() + w := NewWorker("skipper").HandlerFunc(func(_ context.Context, _ *WorkerInfo) error { + return ErrSkipTick + }).Every(10 * time.Millisecond) + + ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond) + defer cancel() + + Run(ctx, []*Worker{w}, WithMetrics(m)) + + m.mu.Lock() + defer m.mu.Unlock() + assert.Empty(t, m.failed, "ErrSkipTick should not be counted as failure") +} + +func TestRun_ErrSkipTick_NonPeriodic_CountedAsFailure(t *testing.T) { + m := newMockMetrics() + var attempts atomic.Int32 + w := NewWorker("non-periodic-skipper").HandlerFunc(func(_ context.Context, _ *WorkerInfo) error { + attempts.Add(1) + return ErrSkipTick // meaningless for non-periodic, should be treated as normal error + }).WithRestart(false) // stop after first attempt + + ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond) + defer cancel() + + Run(ctx, []*Worker{w}, WithMetrics(m)) + + m.mu.Lock() + defer m.mu.Unlock() + assert.NotEmpty(t, m.failed, "ErrSkipTick from non-periodic worker should be counted as failure") +} + func TestRun_ErrDoNotRestart_NotCountedAsFailure(t *testing.T) { m := newMockMetrics() w := NewWorker("completer").HandlerFunc(func(_ context.Context, _ *WorkerInfo) error { diff --git a/worker.go b/worker.go index 5a1e3ab..78f09b4 100644 --- a/worker.go +++ b/worker.go @@ -33,6 +33,20 @@ // even with restart enabled. Use [ErrDoNotRestart] for explicit permanent // completion from periodic handlers. 
// +// # Error Semantics for Periodic Handlers +// +// The return value from a periodic handler determines what happens next: +// +// | Return value | Timer loop | Restart? | Use case | +// |-----------------|------------|----------|--------------------------| +// | nil | continues | n/a | Success, next tick fires | +// | ErrSkipTick | continues | n/a | Transient failure, skip | +// | ErrDoNotRestart | exits | no | Permanent completion | +// | other error | exits | yes* | Failure, needs restart | +// | ctx.Err() | exits | no | Graceful shutdown | +// +// *Only if [Worker.WithRestart](true) (the default). +// // # Middleware // // Cross-cutting concerns like tracing, logging, and panic recovery are @@ -50,6 +64,13 @@ // [WithInterceptors]. Built-in middleware is available in the middleware/ // sub-package. // +// Run-level interceptors ([WithInterceptors]) wrap all workers and are +// best for cross-cutting defaults (tracing, logging, panic recovery). +// Worker-level interceptors ([Worker.Interceptors]) are best for +// worker-specific concerns (distributed locks with per-worker TTL, +// rate limiting). Children inherit run-level interceptors but not the +// parent's worker-level interceptors. +// // # Helpers // // Common patterns are provided as helpers: @@ -83,6 +104,7 @@ import ( type WorkerInfo struct { name string attempt int + handler CycleHandler // the worker's handler, set by framework // child management, set by framework sup *suture.Supervisor @@ -97,6 +119,7 @@ type WorkerInfo struct { type childEntry struct { token suture.ServiceToken worker *Worker + done <-chan struct{} // closed when the child permanently stops } // GetName returns the worker's name as passed to [NewWorker]. @@ -105,6 +128,14 @@ func (info *WorkerInfo) GetName() string { return info.name } // GetAttempt returns the restart attempt number (0 on first run). 
func (info *WorkerInfo) GetAttempt() int { return info.attempt } +// GetHandler returns the worker's [CycleHandler], or nil if not set. +// Use type assertion to access handler-specific state or interfaces: +// +// if h, ok := info.GetHandler().(MyHandler); ok { +// // access h.Config, h.Version, etc. +// } +func (info *WorkerInfo) GetHandler() CycleHandler { return info.handler } + // WorkerInfoOption configures a [WorkerInfo] created by [NewWorkerInfo]. type WorkerInfoOption func(*WorkerInfo) @@ -127,6 +158,12 @@ func WithTestChildren(ctx context.Context) WorkerInfoOption { } } +// WithTestHandler sets the handler on a test [WorkerInfo] so that +// [WorkerInfo.GetHandler] works in unit tests. +func WithTestHandler(h CycleHandler) WorkerInfoOption { + return func(info *WorkerInfo) { info.handler = h } +} + // NewWorkerInfo creates a [WorkerInfo] with the given name and attempt. // This is useful for testing middleware and handlers — the framework // creates fully populated instances internally. @@ -151,6 +188,11 @@ func NewWorkerInfo(name string, attempt int, opts ...WorkerInfoOption) *WorkerIn // Children inherit run-level interceptors, metrics (unless overridden via // [Worker.WithMetrics]), and scoped lifecycle — when this worker stops, // all its children stop too. +// +// When a child permanently stops, it is automatically removed from +// the children map on the next call to [WorkerInfo.Add], +// [WorkerInfo.GetChildren], [WorkerInfo.GetChild], or +// [WorkerInfo.GetChildCount]. 
func (info *WorkerInfo) Add(w *Worker) bool { if info.sup == nil { return false @@ -158,11 +200,12 @@ func (info *WorkerInfo) Add(w *Worker) bool { info.childrenMu.Lock() defer info.childrenMu.Unlock() + info.pruneStoppedLocked() if _, ok := info.children[w.name]; ok { return false } - tok := addWorkerToSupervisor(info.sup, w, info.cfg, info.active, info.metrics) - info.children[w.name] = childEntry{token: tok, worker: w} + tok, done := addWorkerToSupervisor(info.sup, w, info.cfg, info.active, info.metrics) + info.children[w.name] = childEntry{token: tok, worker: w, done: done} return true } @@ -177,6 +220,22 @@ func (info *WorkerInfo) Remove(name string) { info.removeLocked(name) } +// pruneStoppedLocked removes children whose done channel is closed. +// It also removes the underlying supervisor service to prevent +// orphaned goroutines. Caller must hold childrenMu. +func (info *WorkerInfo) pruneStoppedLocked() { + for name, entry := range info.children { + if entry.done == nil { + continue + } + select { + case <-entry.done: + info.removeLocked(name) + default: + } + } +} + // removeLocked stops and deletes a child by name. Caller must hold childrenMu. func (info *WorkerInfo) removeLocked(name string) { if entry, ok := info.children[name]; ok { @@ -186,10 +245,12 @@ func (info *WorkerInfo) removeLocked(name string) { } // GetChildren returns the names of currently running child workers. +// Stopped children are lazily pruned before building the list. func (info *WorkerInfo) GetChildren() []string { info.childrenMu.Lock() defer info.childrenMu.Unlock() + info.pruneStoppedLocked() names := make([]string, 0, len(info.children)) for name := range info.children { names = append(names, name) @@ -198,13 +259,30 @@ func (info *WorkerInfo) GetChildren() []string { return names } +// GetChildCount returns the number of currently running child workers. 
+// This is more efficient than calling [WorkerInfo.GetChildren] and +// taking len, as it avoids allocating a sorted slice. +// Stopped children are lazily pruned. +func (info *WorkerInfo) GetChildCount() int { + info.childrenMu.Lock() + defer info.childrenMu.Unlock() + info.pruneStoppedLocked() + return len(info.children) +} + // GetChild returns a copy of a running child worker and true, or the zero // value and false if not found. The returned value is a snapshot — -// mutations have no effect on the running worker. +// mutations to the Worker fields have no effect on the running worker. +// +// The [CycleHandler] (accessible via [Worker.GetHandler]) is shared with +// the running worker, not copied. Use type assertion to inspect handler +// state (e.g., config versions for reconciliation). See +// [Example_reconcilerWithChangeDetection]. func (info *WorkerInfo) GetChild(name string) (Worker, bool) { info.childrenMu.Lock() defer info.childrenMu.Unlock() + info.pruneStoppedLocked() if entry, ok := info.children[name]; ok { return *entry.worker, true } @@ -262,6 +340,21 @@ func (w *Worker) GetName() string { return w.name } // GetHandler returns the worker's [CycleHandler], or nil if not set. func (w *Worker) GetHandler() CycleHandler { return w.handler } +// GetInterval returns the periodic interval, or 0 if this is not a +// periodic worker. +func (w *Worker) GetInterval() time.Duration { return w.interval } + +// GetRestartOnFail returns whether the worker restarts on failure. +func (w *Worker) GetRestartOnFail() bool { return w.restartOnFail } + +// GetJitterPercent returns the jitter percentage. -1 means inherit +// run-level default, 0 means no jitter. +func (w *Worker) GetJitterPercent() int { return w.jitterPercent } + +// GetInitialDelay returns the initial delay before the first tick, +// or 0 if not set. +func (w *Worker) GetInitialDelay() time.Duration { return w.initialDelay } + // Handler sets the worker's [CycleHandler]. 
Use this for handlers that // need cleanup via Close (e.g., database connections, leases). func (w *Worker) Handler(h CycleHandler) *Worker { @@ -315,6 +408,10 @@ func (w *Worker) AddInterceptors(mw ...Middleware) *Worker { // Default is true. Set to false for one-shot workers that should exit after // completion or failure. Note: a handler returning nil always stops the // worker permanently, regardless of this setting. +// +// Periodic workers (with [Worker.Every]) should generally keep the default +// (restart enabled). Use [ErrSkipTick] for transient failures and +// [ErrDoNotRestart] for permanent completion instead of disabling restart. func (w *Worker) WithRestart(restart bool) *Worker { w.restartOnFail = restart return w diff --git a/worker_test.go b/worker_test.go index b0b804c..3d81fda 100644 --- a/worker_test.go +++ b/worker_test.go @@ -89,7 +89,7 @@ func TestWorkerInfo(t *testing.T) { func TestWorkerInfo_Children_Nil(t *testing.T) { info := &WorkerInfo{name: "test"} - assert.Empty(t, info.GetChildren(), "Children on nil map should return empty slice") + assert.Empty(t, info.GetChildren(), "Children on nil supervisor should return empty slice") } func TestCycleFunc_Close(t *testing.T) { @@ -141,6 +141,17 @@ func TestWorker_WithBackoffJitter(t *testing.T) { assert.NotNil(t, w.backoffJitter) } +func TestWorkerInfo_GetHandler(t *testing.T) { + fn := CycleFunc(func(_ context.Context, _ *WorkerInfo) error { return nil }) + info := NewWorkerInfo("test", 0, WithTestHandler(fn)) + assert.NotNil(t, info.GetHandler()) +} + +func TestWorkerInfo_GetHandler_Nil(t *testing.T) { + info := NewWorkerInfo("test", 0) + assert.Nil(t, info.GetHandler()) +} + func TestWorkerInfo_GetChild(t *testing.T) { info := NewWorkerInfo("parent", 0, WithTestChildren(t.Context())) @@ -225,6 +236,108 @@ func TestNewWorkerInfo_WithTestChildren(t *testing.T) { assert.Equal(t, []string{"b"}, info.GetChildren()) } +func TestWorkerInfo_ZombieChild_AutoCleanup(t *testing.T) { + info := 
NewWorkerInfo("parent", 0, WithTestChildren(t.Context())) + + // Add a child that returns nil immediately (no restart). + info.Add(NewWorker("child").HandlerFunc(func(_ context.Context, _ *WorkerInfo) error { + return nil + }).WithRestart(false)) + + time.Sleep(100 * time.Millisecond) // let child stop + + // Done channel is closed on permanent stop — lazy prune removes the child. + assert.Equal(t, 0, len(info.GetChildren())) + assert.Empty(t, info.GetChildren()) +} + +func TestWorkerInfo_ZombieChild_WithGrandchildren(t *testing.T) { + // A child that spawns grandchildren and then permanently stops + // should not remain visible to the parent. + info := NewWorkerInfo("parent", 0, WithTestChildren(t.Context())) + + info.Add(NewWorker("child").HandlerFunc(func(ctx context.Context, childInfo *WorkerInfo) error { + // Spawn a grandchild that stays alive until context is cancelled. + childInfo.Add(NewWorker("grandchild").HandlerFunc(func(ctx context.Context, _ *WorkerInfo) error { + <-ctx.Done() + return ctx.Err() + })) + time.Sleep(20 * time.Millisecond) // let grandchild start + return ErrDoNotRestart // child permanently stops + })) + + time.Sleep(200 * time.Millisecond) + + assert.Empty(t, info.GetChildren(), "stopped child should not appear even with live grandchildren") +} + +func TestWorkerInfo_ZombieChild_ErrDoNotRestart(t *testing.T) { + info := NewWorkerInfo("parent", 0, WithTestChildren(t.Context())) + + info.Add(NewWorker("child").HandlerFunc(func(_ context.Context, _ *WorkerInfo) error { + return ErrDoNotRestart + })) + + time.Sleep(100 * time.Millisecond) + + assert.Equal(t, 0, len(info.GetChildren())) + assert.Empty(t, info.GetChildren()) +} + +func TestWorkerInfo_ZombieChild_ReAdd(t *testing.T) { + info := NewWorkerInfo("parent", 0, WithTestChildren(t.Context())) + + // Add a child that stops immediately. 
+ info.Add(NewWorker("child").HandlerFunc(func(_ context.Context, _ *WorkerInfo) error { + return nil + }).WithRestart(false)) + + time.Sleep(100 * time.Millisecond) + + // Stopped child is pruned — Add with same name should succeed. + added := info.Add(NewWorker("child").HandlerFunc(func(ctx context.Context, _ *WorkerInfo) error { + <-ctx.Done() + return ctx.Err() + })) + assert.True(t, added, "re-Add after zombie prune should succeed") + time.Sleep(20 * time.Millisecond) + assert.Equal(t, 1, len(info.GetChildren())) +} + +func TestWorkerInfo_ZombieChild_ReAdd_NoRead(t *testing.T) { + info := NewWorkerInfo("parent", 0, WithTestChildren(t.Context())) + + // Add a child that stops immediately. + info.Add(NewWorker("child").HandlerFunc(func(_ context.Context, _ *WorkerInfo) error { + return nil + }).WithRestart(false)) + + time.Sleep(100 * time.Millisecond) + + // Re-Add directly — Add prunes the stale entry before checking. + added := info.Add(NewWorker("child").HandlerFunc(func(ctx context.Context, _ *WorkerInfo) error { + <-ctx.Done() + return ctx.Err() + })) + assert.True(t, added, "Add should succeed after stopped child is pruned") + time.Sleep(20 * time.Millisecond) + assert.Equal(t, 1, len(info.GetChildren())) +} + +func TestWorkerInfo_ZombieChild_GetChild(t *testing.T) { + info := NewWorkerInfo("parent", 0, WithTestChildren(t.Context())) + + info.Add(NewWorker("child").HandlerFunc(func(_ context.Context, _ *WorkerInfo) error { + return nil + }).WithRestart(false)) + + time.Sleep(100 * time.Millisecond) + + // GetChild prunes stopped child and returns false. + _, ok := info.GetChild("child") + assert.False(t, ok) +} + func TestNewWorkerInfo_Minimal(t *testing.T) { // Without options, Add/Remove/GetChildren are safe no-ops. info := NewWorkerInfo("test", 5) @@ -237,6 +350,29 @@ func TestNewWorkerInfo_Minimal(t *testing.T) { assert.Empty(t, info.GetChildren()) } +func TestWorker_ConfigGetters(t *testing.T) { + w := NewWorker("test"). 
+ HandlerFunc(func(_ context.Context, _ *WorkerInfo) error { return nil }). + Every(30 * time.Second). + WithJitter(15). + WithInitialDelay(5 * time.Second). + WithRestart(false) + + assert.Equal(t, 30*time.Second, w.GetInterval()) + assert.Equal(t, 15, w.GetJitterPercent()) + assert.Equal(t, 5*time.Second, w.GetInitialDelay()) + assert.False(t, w.GetRestartOnFail()) +} + +func TestWorker_ConfigGetters_Defaults(t *testing.T) { + w := NewWorker("test") + + assert.Equal(t, time.Duration(0), w.GetInterval()) + assert.Equal(t, -1, w.GetJitterPercent()) + assert.Equal(t, time.Duration(0), w.GetInitialDelay()) + assert.True(t, w.GetRestartOnFail()) +} + func TestWorker_InterceptorsCopiesSlice(t *testing.T) { mw := func(_ context.Context, _ *WorkerInfo, next CycleFunc) error { return next(nil, nil) } original := []Middleware{mw}