Skip to content

Commit 1ad5a75

Browse files
jensneuseclaude
andauthored
feat(cache): negative caching, goroutine arenas, global key prefix, cache op errors (#1435)
## Summary - **Negative caching**: cache null entity responses (`NegativeCacheTTL`) to avoid repeated subgraph lookups for non-existent entities - **Per-goroutine arenas**: fix thread safety for L2 cache allocations during Phase 2 parallel execution via `l2ArenaPool` - **Global cache key prefix**: support schema versioning by prepending a configurable prefix (`GlobalCacheKeyPrefix`) to all L2 cache keys - **Cache operation error tracking**: record Get/Set/Delete failures in analytics (`CacheOperationError`) for operator observability - **Circuit breaker**: protect cache operations with configurable failure thresholds - **Comprehensive tests**: negative cache, mutation cache impact, arena thread safety (bench + GC), circuit breaker, L2 cache key interceptor ## Test plan - [ ] `go test ./v2/pkg/engine/resolve/... -v` passes - [ ] `go test ./execution/engine/... -v` passes - [ ] New negative cache tests cover null sentinel storage and retrieval - [ ] Arena thread safety bench + GC tests validate no data races under parallel load - [ ] Circuit breaker tests verify open/close state transitions 🤖 Generated with [Claude Code](https://claude.com/claude-code) <!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit * **New Features** * Negative caching for null/not-found entities with configurable TTL * Circuit breaker protection for cache lookups * Global cache key prefix for schema-versioned keys * Cache operation TTL logging and enhanced error analytics * **Tests** * Negative caching, circuit breaker, mutation-impact, key-interceptor, L1/L2 e2e, and trigger tests * Arena thread-safety benchmarks and GC/concurrency tests * **Documentation** * Entity caching acceptance criteria document * Caching test sync/update guideline <!-- end of auto-generated comment: release notes by coderabbit.ai --> --------- Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 683d690 commit 1ad5a75

22 files changed

Lines changed: 3295 additions & 60 deletions

ENTITY_CACHING_ACCEPTANCE_CRITERIA.md

Lines changed: 693 additions & 0 deletions
Large diffs are not rendered by default.

v2/pkg/engine/plan/federation_metadata.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,12 @@ type EntityCacheConfiguration struct {
124124
// Instead, fresh data is always fetched from the subgraph and compared against the cached value
125125
// to detect staleness. L1 cache works normally (not affected by shadow mode).
126126
ShadowMode bool `json:"shadow_mode"`
127+
128+
// NegativeCacheTTL is the TTL for caching null entity results (entity not found).
129+
// When > 0, null responses (entity returned null without errors from _entities) are cached
130+
// as negative sentinels to avoid repeated subgraph lookups for non-existent entities.
131+
// When 0 (default), null entities are not cached and will be re-fetched on every request.
132+
NegativeCacheTTL time.Duration `json:"negative_cache_ttl,omitzero"`
127133
}
128134

129135
// EntityCacheConfigurations is a collection of entity cache configurations.

v2/pkg/engine/plan/visitor.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2368,6 +2368,7 @@ func (v *Visitor) configureFetchCaching(internal *objectFetchConfiguration, exte
23682368
HashAnalyticsKeys: cacheConfig.HashAnalyticsKeys,
23692369
KeyFields: keyFields,
23702370
ShadowMode: cacheConfig.ShadowMode,
2371+
NegativeCacheTTL: cacheConfig.NegativeCacheTTL,
23712372
}
23722373
}
23732374

v2/pkg/engine/resolve/CLAUDE.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -554,6 +554,13 @@ L2Reads: []resolve.CacheKeyEvent{
554554

555555
Every `defaultCache.ClearLog()` MUST be followed by `defaultCache.GetLog()` with full assertions BEFORE the next `ClearLog()` or end of test. Never clear a log without verifying its contents.
556556

557+
### Caching Test / AC Sync Rule
558+
559+
**When modifying or adding caching-related tests**, you MUST also update `ENTITY_CACHING_ACCEPTANCE_CRITERIA.md` (in the repo root). Every AC must link to its covering tests with relative paths, line numbers, and test names. This applies to:
560+
- New caching tests (add test links to the relevant AC)
561+
- Changes to existing caching tests that affect which ACs are covered
562+
- New ACs (must have at least one test link)
563+
557564
### Run Tests
558565
```bash
559566
go test -run "TestL1Cache" ./v2/pkg/engine/resolve/... -v
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
package resolve
2+
3+
import (
4+
"strconv"
5+
"sync"
6+
"testing"
7+
8+
"github.com/wundergraph/astjson"
9+
"github.com/wundergraph/go-arena"
10+
)
11+
12+
// cacheLoadAllocs simulates the allocation pattern of tryL2CacheLoad:
13+
// parse cached JSON bytes, create wrapper objects, allocate slices.
14+
func cacheLoadAllocs(a arena.Arena) {
15+
// 1. extractCacheKeysStrings: allocate slice + string bytes
16+
keys := arena.AllocateSlice[string](a, 0, 4)
17+
for range 4 {
18+
buf := arena.AllocateSlice[byte](a, 0, 64)
19+
buf = arena.SliceAppend(a, buf, []byte("cache:entity:Product:id:prod-1234")...)
20+
keys = arena.SliceAppend(a, keys, string(buf))
21+
}
22+
_ = keys
23+
24+
// 2. populateFromCache: parse JSON bytes
25+
v, _ := astjson.ParseBytesWithArena(a, []byte(`{"__typename":"Product","id":"prod-1234","name":"Test Product","price":29.99}`))
26+
27+
// 3. EntityMergePath wrapping: create wrapper objects
28+
obj := astjson.ObjectValue(a)
29+
obj.Set(a, "product", v)
30+
outer := astjson.ObjectValue(a)
31+
outer.Set(a, "data", obj)
32+
33+
// 4. denormalizeFromCache: create new object tree
34+
result := astjson.ObjectValue(a)
35+
result.Set(a, "productName", v.Get("name"))
36+
result.Set(a, "productPrice", v.Get("price"))
37+
}
38+
39+
// BenchmarkConcurrentArena measures Option A: single arena wrapped with NewConcurrentArena.
40+
// All goroutines allocate from the same mutex-protected arena.
41+
func BenchmarkConcurrentArena(b *testing.B) {
42+
for _, goroutines := range []int{1, 4, 8, 16} {
43+
b.Run(goroutineName(goroutines), func(b *testing.B) {
44+
a := arena.NewConcurrentArena(arena.NewMonotonicArena(arena.WithMinBufferSize(64 * 1024)))
45+
b.ResetTimer()
46+
for b.Loop() {
47+
var wg sync.WaitGroup
48+
for range goroutines {
49+
wg.Go(func() {
50+
cacheLoadAllocs(a)
51+
})
52+
}
53+
wg.Wait()
54+
a.Reset()
55+
}
56+
})
57+
}
58+
}
59+
60+
// BenchmarkPerGoroutineArena measures Option B: each goroutine gets its own arena from sync.Pool.
61+
// Zero lock contention on allocations.
62+
func BenchmarkPerGoroutineArena(b *testing.B) {
63+
pool := sync.Pool{
64+
New: func() any {
65+
return arena.NewMonotonicArena(arena.WithMinBufferSize(4096))
66+
},
67+
}
68+
69+
for _, goroutines := range []int{1, 4, 8, 16} {
70+
b.Run(goroutineName(goroutines), func(b *testing.B) {
71+
b.ResetTimer()
72+
for b.Loop() {
73+
arenas := make([]arena.Arena, goroutines)
74+
var wg sync.WaitGroup
75+
for i := range goroutines {
76+
ga := pool.Get().(arena.Arena)
77+
arenas[i] = ga
78+
wg.Go(func() {
79+
cacheLoadAllocs(ga)
80+
})
81+
}
82+
wg.Wait()
83+
for _, ga := range arenas {
84+
ga.Reset()
85+
pool.Put(ga)
86+
}
87+
}
88+
})
89+
}
90+
}
91+
92+
func goroutineName(n int) string {
93+
return "goroutines=" + stringFromInt(n)
94+
}
95+
96+
func stringFromInt(n int) string {
97+
return strconv.Itoa(n)
98+
}
Lines changed: 178 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,178 @@
1+
package resolve
2+
3+
import (
4+
"runtime"
5+
"runtime/debug"
6+
"testing"
7+
8+
"github.com/stretchr/testify/assert"
9+
"github.com/stretchr/testify/require"
10+
11+
"github.com/wundergraph/astjson"
12+
"github.com/wundergraph/go-arena"
13+
)
14+
15+
// TestCrossArenaMergeValuesCreatesShallowReferences proves that MergeValues
16+
// links *Value pointers from the source arena into the target arena's tree
17+
// without deep-copying. Resetting the source arena makes the merged values stale.
18+
//
19+
// This is the foundational invariant for AC-THREAD-04: goroutine arenas that
20+
// hold FromCache values must NOT be released before the response is fully rendered.
21+
func TestCrossArenaMergeValuesCreatesShallowReferences(t *testing.T) {
22+
old := debug.SetGCPercent(1)
23+
defer debug.SetGCPercent(old)
24+
25+
mainArena := arena.NewMonotonicArena(arena.WithMinBufferSize(4096))
26+
goroutineArena := arena.NewMonotonicArena(arena.WithMinBufferSize(4096))
27+
28+
// Parse entity data on the "goroutine" arena (simulates populateFromCache)
29+
fromCache, err := astjson.ParseBytesWithArena(goroutineArena, []byte(`{"id":"prod-1","name":"Widget"}`))
30+
require.NoError(t, err)
31+
32+
// Parse the target item on the main arena (simulates the response tree)
33+
item, err := astjson.ParseBytesWithArena(mainArena, []byte(`{"id":"prod-1"}`))
34+
require.NoError(t, err)
35+
36+
// Merge: this splices FromCache nodes into item's object tree
37+
merged, _, err := astjson.MergeValues(mainArena, item, fromCache)
38+
require.NoError(t, err)
39+
40+
// Verify merged result contains data from both arenas
41+
mergedJSON := string(merged.MarshalTo(nil))
42+
assert.Contains(t, mergedJSON, `"name":"Widget"`)
43+
assert.Contains(t, mergedJSON, `"id":"prod-1"`)
44+
45+
// Force GC to stress-test pointer validity — goroutine arena is still alive
46+
runtime.GC()
47+
runtime.GC()
48+
49+
// Values should still be valid since goroutine arena hasn't been reset
50+
postGCJSON := string(merged.MarshalTo(nil))
51+
assert.Equal(t, mergedJSON, postGCJSON,
52+
"merged values should survive GC when goroutine arena is still alive")
53+
54+
// Now reset the goroutine arena — simulates premature release
55+
goroutineArena.Reset()
56+
57+
// Overwrite the freed memory with different data
58+
_, _ = astjson.ParseBytesWithArena(goroutineArena, []byte(`{"id":"STALE","name":"CORRUPTED"}`))
59+
60+
// The merged tree still holds pointers into the (now overwritten) goroutine arena.
61+
// This proves MergeValues is shallow — accessing the stale data may panic or
62+
// return corrupted values.
63+
staleOrPanicked := func() (result string, panicked bool) {
64+
defer func() {
65+
if r := recover(); r != nil {
66+
panicked = true
67+
}
68+
}()
69+
return string(merged.MarshalTo(nil)), false
70+
}
71+
staleJSON, panicked := staleOrPanicked()
72+
assert.True(t, panicked || staleJSON != mergedJSON,
73+
"merged values should be stale or inaccessible after goroutine arena reset — "+
74+
"this proves MergeValues creates cross-arena shallow references")
75+
76+
runtime.KeepAlive(mainArena)
77+
runtime.KeepAlive(goroutineArena)
78+
}
79+
80+
// TestGoroutineArenaLifetimeWithDeferredRelease verifies the correct pattern:
81+
// goroutine arenas survive through the full resolve lifecycle and are only
82+
// released in Free(). This matches the Loader.goroutineArenas design.
83+
func TestGoroutineArenaLifetimeWithDeferredRelease(t *testing.T) {
84+
old := debug.SetGCPercent(1)
85+
defer debug.SetGCPercent(old)
86+
87+
mainArena := arena.NewMonotonicArena(arena.WithMinBufferSize(4096))
88+
89+
// Simulate multiple goroutines, each with their own arena
90+
const numGoroutines = 4
91+
goroutineArenas := make([]arena.Arena, numGoroutines)
92+
fromCacheValues := make([]*astjson.Value, numGoroutines)
93+
94+
for i := range numGoroutines {
95+
goroutineArenas[i] = arena.NewMonotonicArena(arena.WithMinBufferSize(4096))
96+
var err error
97+
fromCacheValues[i], err = astjson.ParseBytesWithArena(
98+
goroutineArenas[i],
99+
[]byte(`{"id":"prod-`+stringFromInt(i+1)+`","name":"Product `+stringFromInt(i+1)+`"}`),
100+
)
101+
require.NoError(t, err)
102+
}
103+
104+
// Phase 4: merge all FromCache values into main arena tree
105+
items := make([]*astjson.Value, numGoroutines)
106+
for i := range numGoroutines {
107+
items[i], _ = astjson.ParseBytesWithArena(mainArena, []byte(`{"id":"prod-`+stringFromInt(i+1)+`"}`))
108+
merged, _, err := astjson.MergeValues(mainArena, items[i], fromCacheValues[i])
109+
require.NoError(t, err)
110+
items[i] = merged
111+
}
112+
113+
// GC pressure — all arenas still alive
114+
runtime.GC()
115+
runtime.GC()
116+
117+
// Verify all merged values are still valid (simulates response rendering)
118+
for i := range numGoroutines {
119+
json := string(items[i].MarshalTo(nil))
120+
assert.Contains(t, json, `"name":"Product `+stringFromInt(i+1)+`"`,
121+
"merged value %d should be readable with goroutine arenas alive", i)
122+
}
123+
124+
// Now release goroutine arenas (simulates Loader.Free())
125+
for _, a := range goroutineArenas {
126+
a.Reset()
127+
}
128+
129+
runtime.KeepAlive(mainArena)
130+
runtime.KeepAlive(goroutineArenas)
131+
}
132+
133+
// Benchmark_CrossArenaGCSafety exercises the goroutine arena pattern under GC
134+
// pressure. Each iteration creates goroutine arenas, merges values, renders the
135+
// result, then releases. runtime.GC() between iterations maximizes pressure on
136+
// any dangling pointers.
137+
func Benchmark_CrossArenaGCSafety(b *testing.B) {
138+
old := debug.SetGCPercent(1)
139+
defer debug.SetGCPercent(old)
140+
141+
entityJSON := []byte(`{"__typename":"Product","id":"prod-1","name":"Widget","price":9.99}`)
142+
itemJSON := []byte(`{"__typename":"Product","id":"prod-1"}`)
143+
144+
b.ResetTimer()
145+
for b.Loop() {
146+
mainArena := arena.NewMonotonicArena(arena.WithMinBufferSize(4096))
147+
goroutineArena := arena.NewMonotonicArena(arena.WithMinBufferSize(4096))
148+
149+
// Simulate goroutine: parse cached entity
150+
fromCache, err := astjson.ParseBytesWithArena(goroutineArena, entityJSON)
151+
if err != nil {
152+
b.Fatal(err)
153+
}
154+
155+
// Simulate Phase 4: merge into response tree
156+
item, err := astjson.ParseBytesWithArena(mainArena, itemJSON)
157+
if err != nil {
158+
b.Fatal(err)
159+
}
160+
merged, _, err := astjson.MergeValues(mainArena, item, fromCache)
161+
if err != nil {
162+
b.Fatal(err)
163+
}
164+
165+
// Simulate response rendering
166+
buf := merged.MarshalTo(nil)
167+
if len(buf) == 0 {
168+
b.Fatal("empty output")
169+
}
170+
171+
// Release (correct order: goroutine arena after rendering)
172+
goroutineArena.Reset()
173+
mainArena.Reset()
174+
175+
// GC pressure between iterations
176+
runtime.GC()
177+
}
178+
}

v2/pkg/engine/resolve/cache_analytics.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,18 @@ type MutationEvent struct {
140140
FreshBytes int
141141
}
142142

143+
// CacheOperationError records a cache operation (Get/Set/Delete) that returned an error.
144+
// Cache errors are non-fatal (the engine falls back to subgraph fetch), but tracking them
145+
// in analytics allows operators to detect cache infrastructure issues.
146+
type CacheOperationError struct {
147+
Operation string // "get", "set", or "delete"
148+
CacheName string // named cache instance
149+
EntityType string // entity type (empty for root fetches)
150+
DataSource string // subgraph name
151+
Message string // error message (truncated for safety)
152+
ItemCount int // number of keys involved in the failed operation
153+
}
154+
143155
// HeaderImpactEvent records a fresh fetch that wrote to L2 cache with header-prefixed keys.
144156
// A cross-request consumer can aggregate these events: when the same BaseKey appears with
145157
// different HeaderHash values but identical ResponseHash values, the forwarded headers
@@ -170,6 +182,8 @@ type CacheAnalyticsCollector struct {
170182
shadowComparisons []ShadowComparisonEvent // shadow mode staleness comparison events
171183
mutationEvents []MutationEvent // mutation entity impact events
172184
headerImpactEvents []HeaderImpactEvent // header impact events for L2 writes with header prefix
185+
cacheOpErrors []CacheOperationError // cache operation errors (main thread)
186+
l2CacheOpErrors []CacheOperationError // accumulated in goroutines, merged on main thread
173187
xxh *xxhash.Digest
174188
}
175189

@@ -322,6 +336,17 @@ func (c *CacheAnalyticsCollector) RecordHeaderImpactEvent(event HeaderImpactEven
322336
c.headerImpactEvents = append(c.headerImpactEvents, event)
323337
}
324338

339+
// RecordCacheOperationError records a cache operation error. Main thread only.
340+
func (c *CacheAnalyticsCollector) RecordCacheOperationError(event CacheOperationError) {
341+
c.cacheOpErrors = append(c.cacheOpErrors, event)
342+
}
343+
344+
// MergeL2CacheOpErrors merges cache operation errors collected in goroutines into the collector.
345+
// Must be called on the main thread.
346+
func (c *CacheAnalyticsCollector) MergeL2CacheOpErrors(events []CacheOperationError) {
347+
c.cacheOpErrors = append(c.cacheOpErrors, events...)
348+
}
349+
325350
// EntitySource returns the source for a given entity instance.
326351
// Returns FieldSourceSubgraph if no record is found (the default).
327352
func (c *CacheAnalyticsCollector) EntitySource(entityType, keyJSON string) FieldSource {
@@ -347,6 +372,7 @@ func (c *CacheAnalyticsCollector) Snapshot() CacheAnalyticsSnapshot {
347372
ShadowComparisons: deduplicateShadowComparisons(c.shadowComparisons),
348373
MutationEvents: c.mutationEvents,
349374
HeaderImpactEvents: deduplicateHeaderImpactEvents(c.headerImpactEvents),
375+
CacheOpErrors: c.cacheOpErrors,
350376
}
351377

352378
// Split write events into L1 and L2, then deduplicate each
@@ -487,6 +513,9 @@ type CacheAnalyticsSnapshot struct {
487513

488514
// Header impact events (L2 writes with header-prefixed keys)
489515
HeaderImpactEvents []HeaderImpactEvent
516+
517+
// Cache operation errors (Get/Set/Delete failures)
518+
CacheOpErrors []CacheOperationError
490519
}
491520

492521
// L1HitRate returns the L1 cache hit rate as a float64 in [0, 1].

0 commit comments

Comments
 (0)