diff --git a/ENTITY_CACHING_ACCEPTANCE_CRITERIA.md b/ENTITY_CACHING_ACCEPTANCE_CRITERIA.md new file mode 100644 index 0000000000..f4507b6b22 --- /dev/null +++ b/ENTITY_CACHING_ACCEPTANCE_CRITERIA.md @@ -0,0 +1,693 @@ +# Entity Caching Acceptance Criteria + +Two-level entity caching system for GraphQL federation: L1 (per-request, in-memory) eliminates +redundant entity fetches within a single request; L2 (cross-request, external) shares cached +entities across requests via external stores like Redis. + +## L1 Cache (Per-Request, In-Memory) + +### AC-L1-01: Request-scoped isolation +Each GraphQL request gets its own L1 cache instance (a fresh `sync.Map` on the Loader). +No data leaks between requests. The cache is discarded when the request completes. + +Tests: +- `v2/pkg/engine/resolve/l1_cache_test.go:24` — `TestL1Cache / "L1 hit - same entity fetched twice in same request"` + +### AC-L1-02: Entity fetches only +L1 caches entity fetch results (fetches with `@key`-based representations), not root field +query results. Root fields never _read_ from L1 — they use L2 for cross-request caching. +However, root fields that return entities can _populate_ L1 (see AC-L1-08), so that a +subsequent entity fetch within the same request can hit L1. + +Tests: +- `execution/engine/federation_caching_l1_test.go:56` — `TestL1CacheReducesHTTPCalls / "L1 enabled - entity fetches use L1 cache"` + +### AC-L1-03: Cache keys use only @key fields +L1 cache keys are derived exclusively from the entity's `@key` directive fields +(see AC-KEY-01 for canonical format). `@requires` fields are never included because +they vary per consuming subgraph and would fragment the cache. 
+ +Tests: +- `v2/pkg/engine/resolve/cache_key_test.go:632` — `TestCachingRenderEntityQueryCacheKeyTemplate / "single entity with typename and id"` + +### AC-L1-04: Main-thread L1 check; full hit skips goroutine +L1 lookup happens in Phase 1 (`prepareCacheKeys` + `tryL1CacheLoad`) on the main thread, +before any goroutine is spawned. When every entity in a fetch batch is found in L1, the +fetch sets `cacheSkipFetch=true` and no goroutine is spawned for that fetch. The cached +values are used directly, saving both the goroutine allocation and the network call. + +Tests: +- `v2/pkg/engine/resolve/l1_l2_cache_e2e_test.go:899` — `TestL1CacheSkipsParallelFetch` +- `execution/engine/federation_caching_l1_test.go:449` — `TestL1CacheSelfReferentialEntity / "L1 enabled - sameUserReviewers fetch entirely skipped via L1 cache"` + +### AC-L1-06: Disabled by default +L1 caching must be explicitly enabled per-request via +`ctx.ExecutionOptions.Caching.EnableL1Cache = true`. When disabled, every entity fetch +goes through the normal L2/subgraph path. + +Tests: +- `execution/engine/federation_caching_l1_test.go:93` — `TestL1CacheReducesHTTPCalls / "L1 disabled - more accounts calls without cache"` + +### AC-L1-07: Shallow copy on L1 read +Every L1 cache read returns a shallow copy of the cached value (via `shallowCopyProvidedFields`), +not a direct pointer. This prevents pointer aliasing that would cause stack overflow during +JSON merge when an entity type references itself (e.g., `User.friends` returns `[User]`). +The copy is unconditional — it always happens, even for non-self-referential entities — +because the overhead is minimal and the safety guarantee is universal. The copy includes +only the fields specified in `ProvidesData`, not the entire entity. + +_Future optimization_: for entities known to never self-reference, the copy could be skipped. 
+ +Tests: +- `execution/engine/federation_caching_l1_test.go:344` — `TestL1CacheSelfReferentialEntity` +- `v2/pkg/engine/resolve/l1_cache_test.go:1993` — `TestShallowCopyWithAliases` (reads original name, writes alias) + +### AC-L1-08: Root field entity population +When a root field query (e.g., `topProducts`) returns entities, those entities are +extracted and stored in L1 using their `@key`-based cache keys. This means a subsequent +entity fetch for the same entity within the same request can hit L1 instead of making +another subgraph call. Requires `RootFieldL1EntityCacheKeyTemplates` to be configured. + +If the client's query doesn't select the `@key` fields (e.g., omits `id`), the cache key +is produced with an empty key object (`{"__typename":"Product","key":{}}`) and the entity +is silently stored under this degraded key. It will never match a real entity fetch, so the +behavior is benign but wasteful. + +Tests: +- `execution/engine/federation_caching_l1_test.go:667` — `TestL1CacheRootFieldEntityListPopulation` +- `v2/pkg/engine/resolve/l1_cache_test.go:1813` — `TestPopulateL1CacheForRootFieldEntities_MissingKeyFields` + +### AC-L1-09: Argument-variant coexistence via field merging +When the same entity is fetched with different field arguments (e.g., `friends(first:5)` +and `friends(first:20)`), each variant gets a unique suffixed field name, where the +suffix is the xxhash of the canonicalized arguments (see AC-KEY-05) — e.g., +`friends_<hashOf(first:5)>` and `friends_<hashOf(first:20)>`. When a second fetch for the same entity +arrives, L1 merges the new fields into the existing cached entity using first-writer-wins +semantics, so all arg variants coexist in a single cached entity. + +L2 also performs arg-variant merging during `updateL2Cache`: before writing a new entity, +existing cached fields from other arg variants are merged in via `MergeValues` so they +are not lost (see AC-L2-08).
+ +Tests: +- `execution/engine/federation_caching_entity_field_args_test.go:129` — `TestEntityFieldArgsCaching` +- `v2/pkg/engine/resolve/l1_cache_test.go:2609` — `TestMergeEntityFields` (6 subtests: new field added, existing preserved, nil dst/src, non-object, multiple fields coexist) + +## L1/L2 Interaction Ordering + +### AC-L1L2-01: L1 checked before L2; L1 hit skips L2 entirely +Within a single request, L1 is always checked first (Phase 1, main thread). When L1 has +a hit, L2 is never consulted and no subgraph call is made. This holds regardless of L2 +TTL state — even if the L2 entry is expired, stale, or missing, an L1 hit is authoritative. + +L1 is always fresh within a request because it is populated from the current request's own +subgraph fetches (or root field entity extraction), not from L2. L1 and L2 are independent +caches with different scopes: +- L1: per-request, in-memory, populated by fetches within the current request +- L2: cross-request, external, populated after successful subgraph calls + +Tests: +- `v2/pkg/engine/resolve/l1_l2_cache_e2e_test.go:496` — `TestL1L2CacheEndToEnd / "L1+L2 - L1 hit prevents L2 lookup"` (two sequential entity fetches: first populates L1+L2, second hits L1 with zero L2 operations) +- `v2/pkg/engine/resolve/l1_l2_cache_e2e_test.go:605` — `TestL1L2CacheEndToEnd / "L1+L2 - L1 miss, L2 hit provides data"` (L1 miss falls through to L2) +- `v2/pkg/engine/resolve/l1_l2_cache_e2e_test.go:698` — `TestL1L2CacheEndToEnd / "L1+L2 - cross-request: L1 isolated, L2 shared"` (new request has empty L1, uses L2) +- `v2/pkg/engine/resolve/l1_l2_cache_e2e_test.go:899` — `TestL1CacheSkipsParallelFetch` (L1 hit prevents goroutine spawn for parallel fetch) + +## L2 Cache (Cross-Request, External) + +### AC-L2-01: External cache via LoaderCache interface +L2 caching delegates to user-provided implementations of the `LoaderCache` interface +(`Get`/`Set`/`Delete`). Typical backends: Redis, Memcached. 
Multiple named cache instances +are supported (e.g., different Redis clusters for different entity types). + +Tests: +- `execution/engine/federation_caching_l2_test.go:20` — `TestL2CacheOnly / "L2 enabled - miss then hit across requests"` + +### AC-L2-02: L2 operations run in goroutines +L2 `Get` (cache read) and the fallback subgraph HTTP call happen in parallel goroutines +during Phase 2. This means `LoaderCache` implementations must be safe for concurrent +access from multiple goroutines. + +Tests: +- `v2/pkg/engine/resolve/cache_load_test.go:828` — `TestCacheLoadSequential / "two sequential calls - miss then hit"` + +### AC-L2-03: Configurable TTL per entity type +Each entity type (or root field) can have its own TTL configured via +`EntityCacheConfiguration.TTL`. The TTL is passed to `LoaderCache.Set()`. If the cache +backend supports TTL introspection, it returns `RemainingTTL` on `Get` for analytics. + +Tests: +- `execution/engine/federation_caching_test.go:1386` — `TestFederationCaching / "TTL expiry"` + +### AC-L2-04: L2 keys follow canonical format with optional prefix +L2 cache keys use the canonical entity key format (see AC-KEY-01) or root field key +format (see AC-KEY-02), with an optional header hash prefix (AC-KEY-03) and optional +global prefix (AC-KEY-07) prepended for cache isolation. + +Tests: +- `v2/pkg/engine/resolve/cache_key_test.go:632` — `TestCachingRenderEntityQueryCacheKeyTemplate` +- `v2/pkg/engine/resolve/cache_key_test.go:13` — `TestCachingRenderRootQueryCacheKeyTemplate` + +### AC-L2-05: Disabled by default +L2 caching must be explicitly enabled per-request via +`ctx.ExecutionOptions.Caching.EnableL2Cache = true` AND configured per-subgraph with +entity/root field cache configurations. 
+ +Tests: +- `execution/engine/federation_caching_l2_test.go:191` — `TestL2CacheOnly / "L2 disabled - no external cache operations"` + +### AC-L2-06: Normalization before storage +Before writing to L2, field names are normalized: aliases are replaced with original +schema field names, and fields with arguments get an xxhash suffix appended. This +ensures cached data is query-independent and can be reused across different GraphQL +operations that request the same entity. + +Tests: +- `v2/pkg/engine/resolve/l1_cache_test.go:1535` — `TestNormalizeForCache` (7 subtests: fast path, aliases, mixed, nested, __typename, CacheArgs suffix, alias+CacheArgs) +- `v2/pkg/engine/resolve/l1_cache_test.go:1693` — `TestNormalizeDenormalizeRoundTrip` (7 subtests: round-trip with CacheArgs, alias+CacheArgs, nested, arrays, __typename preservation) +- `v2/pkg/engine/resolve/l1_cache_test.go:1858` — `TestDenormalizeFromCache` (4 subtests: fast path, aliases, CacheArgs suffixed lookup, alias+CacheArgs) + +### AC-L2-07: Validation before serving cached data +When reading from L2, the cached entity is validated against the `ProvidesData` schema +(the set of fields the current fetch expects). Every required field must be present; if +any are missing, the cached entry is treated as a miss and the entity is refetched from +the subgraph. 
+ +Tests: +- `execution/engine/federation_caching_l2_test.go:504` — `TestPartialEntityCaching / "only configured entities are cached"` +- `v2/pkg/engine/resolve/l1_cache_test.go:2159` — `TestValidateItemHasRequiredData` (22 subtests: nil, scalars, nullable/non-nullable, nested objects, arrays, CacheArgs suffixed lookup, empty arrays) +- `v2/pkg/engine/resolve/l1_cache_test.go:1953` — `TestValidateFieldDataWithAliases` (validates using original name on normalized cache data) + +### AC-L2-08: Failed validation preserves old entity for field merging +When L2 validation fails (cached entity is missing fields the current query needs), the +old cached entity is preserved in `FromCache`. After the subgraph returns fresh data, the +old and new entities are merged so that previously-cached fields from other arg variants +are not lost. The merged result is then written back to L2. + +Tests: +- `v2/pkg/engine/resolve/cache_load_test.go:605` — `TestCacheLoadSequential / "single entity fetch with cache miss"` + +## Negative Caching + +### AC-NEG-01: Null entity responses cached as negative sentinels +When a subgraph returns `null` for an entity in `_entities` (entity not found, no errors), +and `NegativeCacheTTL > 0` is configured for that entity type, the null result is stored in +L2 as a sentinel value (`"null"` bytes). On subsequent requests, the sentinel is recognized +as a negative cache hit and served without calling the subgraph. + +This prevents repeated subgraph lookups for non-existent entities (e.g., a deleted product +that is still referenced by other entities). + +Tests: +- `v2/pkg/engine/resolve/negative_cache_test.go:60` — `TestNegativeCaching / "null entity stored as negative sentinel and served on second request"` + +### AC-NEG-02: Disabled by default (NegativeCacheTTL = 0) +When `NegativeCacheTTL` is 0 (default), null entity responses are NOT cached. Each request +re-fetches from the subgraph, preserving the pre-negative-caching behavior. 
+ +Tests: +- `v2/pkg/engine/resolve/negative_cache_test.go:229` — `TestNegativeCaching / "negative caching disabled when NegativeCacheTTL is 0"` (subgraph called twice, no sentinel stored) + +### AC-NEG-03: Separate TTL for negative sentinels +Negative cache entries use `NegativeCacheTTL` (not the regular entity `TTL`) when calling +`LoaderCache.Set()`. This allows shorter TTLs for negative entries (e.g., 5s) compared to +regular entity data (e.g., 60s), so deleted entities are re-checked sooner. + +Tests: +- `v2/pkg/engine/resolve/negative_cache_test.go:353` — `TestNegativeCaching / "negative cache sentinel uses NegativeCacheTTL not regular TTL"` + +### AC-NEG-04: Per-entity-type opt-in +Negative caching is configured per entity type via `EntityCacheConfiguration.NegativeCacheTTL`. +Different entity types can have different negative cache TTLs, or have it disabled entirely +(TTL = 0). + +## Cache Key Construction + +### AC-KEY-01: Entity key format +Entity cache keys use the canonical format `{"__typename":"T","key":{...}}` where the +key object contains only the fields declared in the entity's `@key` directive. Composite +keys (multiple fields) and nested keys are supported. + +Tests: +- `v2/pkg/engine/resolve/cache_key_test.go:632` — `TestCachingRenderEntityQueryCacheKeyTemplate` + +### AC-KEY-02: Root field key format +Root field cache keys use `{"__typename":"Query","field":"fieldName","args":{...}}`. +Arguments are included when present. Root field keys can optionally map to entity keys +via `EntityKeyMappings` so that a root field query and an entity query share the same +cache entry. + +Tests: +- `v2/pkg/engine/resolve/cache_key_test.go:13` — `TestCachingRenderRootQueryCacheKeyTemplate` + +### AC-KEY-03: Subgraph header hash prefix +When `IncludeSubgraphHeaderPrefix` is enabled, the L2 cache key is prefixed with a hash +of the forwarded subgraph headers (e.g., auth tokens). Format: `{hash}:{json_key}`. 
This +ensures different auth contexts get separate cache entries, preventing data leakage +between tenants or users. + +Tests: +- `execution/engine/federation_caching_test.go:418` — `TestFederationCaching / "two subgraphs - with subgraph header prefix"` + +### AC-KEY-04: L2CacheKeyInterceptor transform +After the header prefix is applied, the key passes through an optional user-provided +`L2CacheKeyInterceptor` function. This allows custom transformations like adding tenant +prefixes or routing to different cache namespaces. The interceptor receives the subgraph +name and cache name as context. + +Tests: +- `v2/pkg/engine/resolve/l2_cache_key_interceptor_test.go:80` — `TestL2CacheKeyInterceptor` + +### AC-KEY-05: Field argument suffix for entity fields +When an entity field has arguments (e.g., `friends(first:5)`), the _field name in the +cached entity data_ gets an `_<16-hex-digit-xxhash>` suffix computed from the sorted, +canonicalized argument values. This ensures `friends(first:5)` and `friends(first:20)` +produce different field names _within_ the cached entity and don't overwrite each other. + +Note: the suffix applies to field names in the stored JSON, not to the entity's L1 or L2 +cache key. Cache keys are always derived from `@key` fields only (see AC-KEY-01). +Both L1 and L2 use the `cacheFieldName()` function to apply these suffixes during +normalization before storage and during denormalization on read. + +Tests: +- `v2/pkg/engine/resolve/l1_cache_test.go:2502` — `TestComputeArgSuffix` (8 subtests: deterministic suffix, different values, null handling, sorted args, RemapVariables, object arg canonical JSON) + +### AC-KEY-06: Canonical JSON for deterministic hashing +Argument values are serialized as canonical JSON (object keys sorted alphabetically, +arrays in order, scalars as-is) before hashing. This guarantees the same logical arguments +always produce the same hash, regardless of the JSON key order sent by the client. 
+ +Tests: +- `v2/pkg/engine/resolve/cache_load_test.go:1979` — `TestWriteCanonicalJSON` + +### AC-KEY-07: Global cache key prefix for schema versioning +When `CachingOptions.GlobalCacheKeyPrefix` is set, the prefix is prepended to all L2 cache +keys (both entity and root field). Format: `{prefix}:{rest_of_key}`. This allows the +router to separate cache entries by schema version — when the schema changes, a new prefix +automatically invalidates all old cache entries without explicit cache flushing. + +The global prefix is applied as the outermost prefix, before the header hash prefix. When +both are active: `{global}:{header_hash}:{json_key}`. When only global prefix: +`{global}:{json_key}`. + +The global prefix is applied consistently across all cache operations: L2 reads, L2 writes, +extension-based invalidation, mutation invalidation, and subscription populate/invalidate. + +Tests: +- `v2/pkg/engine/resolve/l2_cache_key_interceptor_test.go:504` — `TestL2CacheKeyInterceptor / "global prefix is prepended to L2 keys"` +- `v2/pkg/engine/resolve/l2_cache_key_interceptor_test.go:597` — `TestL2CacheKeyInterceptor / "global prefix combined with interceptor"` + +## Partial Cache Loading + +### AC-PARTIAL-01: Default behavior (full refetch on any miss) +When `EnablePartialCacheLoad` is false (default), if any entity in a batch has a cache +miss, ALL entities in that batch are refetched from the subgraph. This keeps the cache +maximally fresh because every entity gets a new value on every batch that includes a miss. + +Tests: +- `execution/engine/partial_cache_test.go:233` — `TestPartialCacheLoading / "L2 partial cache loading disabled - all entities fetched even with partial cache hit"` + +### AC-PARTIAL-02: Partial loading fetches only missing entities +When `EnablePartialCacheLoad` is true, only entities with cache misses are included in the +subgraph fetch request. Cached entities are served directly from cache within their TTL. 
+The subgraph receives a smaller representations list containing only the missed entities. + +Tests: +- `execution/engine/partial_cache_test.go:85` — `TestPartialCacheLoading / "L2 partial cache loading enabled - only missing entities fetched"` + +### AC-PARTIAL-03: Freshness vs load tradeoff +Partial loading reduces subgraph load (fewer entities per request) at the cost of +potentially serving slightly stale data for the cached entities. Full refetch (default) +ensures maximum freshness but increases subgraph load. + +Tests: +- `v2/pkg/engine/resolve/l1_cache_test.go:555` — `TestL1CachePartialLoading / "partial cache loading with L2 - only missing entities fetched"` + +## Mutations and Cache Coherency + +### AC-MUT-01: Mutations never read from L2 +When the operation type is Mutation, the L2 cache is never consulted for reads. Mutations +always go to the subgraph to ensure they execute against live data. This prevents serving +stale cached data during write operations. + +Tests: +- `execution/engine/federation_caching_test.go:2165` — `TestFederationCaching_MutationSkipsL2Read` +- `v2/pkg/engine/resolve/cache_load_test.go:2225` — `TestMutationSkipsL2Read` (unit test: mutation skips L2 read and always fetches from subgraph) + +### AC-MUT-02: Mutations skip L2 writes by default +Mutation responses are not written to L2 cache by default. This is because mutation +responses often contain partial entity data that could overwrite a more complete cached +entity. + +Tests: +- `execution/engine/federation_caching_test.go:2447` — `TestFederationCaching / "mutation skips L2 write by default without EnableEntityL2CachePopulation"` + +### AC-MUT-03: Opt-in mutation L2 population +When `EnableMutationL2CachePopulation` is set to true for a specific mutation field, that +mutation's response IS written to L2. This is useful when a mutation returns a complete, +canonical entity representation that should update the cache. 
+ +Tests: +- `execution/engine/federation_caching_l2_test.go:1115` — `TestMutationCacheInvalidationE2E` + +### AC-MUT-04: Mutation-triggered L2 invalidation +When `MutationCacheInvalidationConfiguration` is configured for a mutation, and the +mutation response contains an entity with `@key` fields, the corresponding L2 cache entry +is deleted. The cache key is constructed using the same pipeline as storage (typename + +key fields + header prefix + interceptor). Supports both single-entity responses (object) +and list responses (array) — each entity in the array is individually invalidated. + +Tests: +- `execution/engine/federation_caching_l2_test.go:1115` — `TestMutationCacheInvalidationE2E` +- `v2/pkg/engine/resolve/mutation_cache_impact_test.go:21` — `TestNavigateProvidesDataToField` (4 subtests: valid field, missing field, nil providesData, non-Object field) +- `v2/pkg/engine/resolve/mutation_cache_impact_test.go:71` — `TestBuildEntityKeyValue` (4 subtests: simple key, composite key, nested key, missing field) +- `v2/pkg/engine/resolve/mutation_cache_impact_test.go:128` — `TestBuildMutationEntityCacheKey` (3 subtests: basic key, with header prefix, with interceptor) +- `v2/pkg/engine/resolve/mutation_cache_impact_test.go:249` — `TestDetectMutationEntityImpact` (includes array response invalidation and non-object item skipping) + +### AC-MUT-05: Pre-delete cache read for analytics +When both cache invalidation and analytics are enabled, the cached value is read BEFORE +the delete operation. This allows the analytics system to compare the stale cached value +against the fresh mutation response to measure staleness. + +_Known limitation_: `LoaderCache.Delete()` returns only an error, not a success/existence +indicator. The analytics system cannot distinguish "key did not exist" from "key was +successfully deleted". This would require extending the `LoaderCache` interface. 
+ +Tests: +- `v2/pkg/engine/resolve/mutation_cache_impact_test.go:378` — `TestDetectMutationEntityImpact / "analytics enabled, no cached value records MutationEvent with HadCachedValue=false"` + +### AC-MUT-06: Staleness detection via hash comparison +Mutation impact analytics computes xxhash of both the cached entity (pre-delete) and the +fresh mutation response (both filtered to `ProvidesData` fields only). If hashes differ, +the entity is marked as stale. This measures how often mutations actually change cached +data. + +_Note_: This mechanism (xxhash of `ProvidesData`-filtered fields) is shared with +shadow mode staleness detection (AC-SHADOW-03). The trigger differs (mutation response +vs shadow mode) but the comparison logic is identical. + +Tests: +- `v2/pkg/engine/resolve/mutation_cache_impact_test.go:416` — `TestDetectMutationEntityImpact / "analytics enabled, stale cached value records MutationEvent with IsStale=true"` + +## Extension-Based Invalidation + +### AC-EXT-01: Subgraph-driven invalidation signals +Subgraphs can include cache invalidation keys in their response extensions: +`{"extensions":{"cacheInvalidation":{"keys":[{"typename":"User","key":{"id":"1"}}]}}}`. +The engine processes these keys and deletes the corresponding L2 cache entries. + +Tests: +- `execution/engine/federation_caching_ext_invalidation_test.go:14` — `TestFederationCaching_ExtensionsInvalidation / "mutation with extensions invalidation clears L2 cache"` + +### AC-EXT-02: Key format matches storage format +Invalidation keys use the same `typename` + `key` structure as stored cache keys, ensuring +the correct entry is targeted for deletion. 
+ +Tests: +- `execution/engine/federation_caching_ext_invalidation_test.go:90` — `TestFederationCaching_ExtensionsInvalidation / "multiple entities invalidated in single response"` + +### AC-EXT-03: Full key construction pipeline for deletion +The invalidation key goes through the same transformation pipeline as storage keys: +build JSON → apply header hash prefix → apply `L2CacheKeyInterceptor` → call +`cache.Delete()`. This ensures tenant-isolated keys are correctly invalidated. + +Tests: +- `execution/engine/federation_caching_ext_invalidation_test.go:214` — `TestFederationCaching_ExtensionsInvalidation / "with subgraph header prefix"` + +### AC-EXT-04: Works for queries and mutations +Extension-based invalidation is not restricted to mutation responses. A query response can +also include invalidation keys (e.g., when a subgraph detects data has changed since the +last cache write). + +Tests: +- `execution/engine/federation_caching_ext_invalidation_test.go:178` — `TestFederationCaching_ExtensionsInvalidation / "query response triggers invalidation"` + +### AC-EXT-05: Skip redundant delete-before-set +If the same entity key appears in both the invalidation keys and the cache write set of +the same fetch, the delete is skipped because the entry will be overwritten with fresh +data anyway. This avoids an unnecessary cache round-trip. + +Tests: +- `v2/pkg/engine/resolve/extensions_cache_invalidation_test.go:11` — `TestExtensionsCacheInvalidation` + +### AC-EXT-06: Prerequisites for extension invalidation +Extension-based invalidation requires: (1) L2 caching enabled, (2) `entityCacheConfigs` +present for the subgraph (to determine which named cache to delete from and whether header +prefix is needed), and (3) the `caches` map populated. 
+ +Tests: +- `execution/engine/federation_caching_ext_invalidation_test.go:121` — `TestFederationCaching_ExtensionsInvalidation / "mutation without extensions does not delete"` + +## Subscription Caching + +### AC-SUB-01: Populate mode writes entities to L2 +In populate mode, each subscription event that returns entity data writes it to the L2 +cache. This keeps the cache warm with real-time data, so subsequent queries can serve +the latest state without hitting the subgraph. + +Tests: +- `execution/engine/federation_subscription_caching_test.go:330` — `TestFederationSubscriptionCaching / "subscription entity populates L2 - verified via cache"` + +### AC-SUB-02: Invalidate mode deletes L2 entries +In invalidate mode, each subscription event triggers L2 cache deletion for the received +entity (identified by `@key` fields). This is used when the subscription delivers only +key fields (not full entity data), signaling that the cached version is stale. + +Tests: +- `execution/engine/federation_subscription_caching_test.go:714` — `TestFederationSubscriptionCaching / "key-only subscription invalidates L2 cache"` + +### AC-SUB-03: Base key pipeline for subscription cache operations +Subscription cache operations (both populate and invalidate) apply the cache key +pipeline: template rendering → global prefix → header hash prefix → `L2CacheKeyInterceptor`. +The base path (template rendering, populate, invalidate) is covered by existing tests. +Global prefix and `L2CacheKeyInterceptor` integration within subscriptions is verified +by the code path (shared with `prepareCacheKeys`) but not yet exercised by dedicated +trigger-level tests. 
+ +Tests: +- `v2/pkg/engine/resolve/trigger_cache_test.go:51` — `TestHandleTriggerEntityCache / "populate single entity"` (verifies base key pipeline for populate) +- `v2/pkg/engine/resolve/trigger_cache_test.go:224` — `TestHandleTriggerEntityCache / "invalidate mode deletes cache entry"` (verifies base key pipeline for invalidate) + +## Shadow Mode + +### AC-SHADOW-01: Never serves cached data; always fetches from subgraph +When shadow mode is enabled for an entity type, the subgraph is always called regardless +of cache state. L2 cached data is never used in the actual response — the client always +receives fresh data from the subgraph, even on a cache hit. + +Tests: +- `v2/pkg/engine/resolve/cache_load_test.go:1324` — `TestShadowMode_L2_AlwaysFetches` + +### AC-SHADOW-02: Cache operations proceed normally +Despite not serving cached data, L2 reads and writes happen as usual. The cache stays +warm and populated. This allows measuring cache effectiveness without affecting +production traffic. + +Tests: +- `v2/pkg/engine/resolve/cache_load_test.go:1504` — `TestShadowMode_StalenessDetection` + +### AC-SHADOW-03: Staleness detection via hash comparison +After both cached and fresh values are available, they are compared using xxhash. The +comparison uses only `ProvidesData` fields (the fields the fetch actually needs). Results +are recorded as `ShadowComparisonEvent` with `IsFresh` indicating whether cached data +matched. + +_Note_: This mechanism (xxhash of `ProvidesData`-filtered fields) is shared with +mutation staleness detection (AC-MUT-06). The trigger differs (shadow mode vs mutation +response) but the comparison logic is identical. 
+ +Tests: +- `v2/pkg/engine/resolve/cache_load_test.go:1504` — `TestShadowMode_StalenessDetection` + +### AC-SHADOW-04: Per-field hash comparison +In addition to the whole-entity comparison (AC-SHADOW-03), shadow mode records individual +xxhash values for each non-key field of the cached entity (tagged as `FieldSourceShadowCached`). +During response rendering, the same fields from fresh subgraph data are hashed (tagged as +`FieldSourceSubgraph`). By comparing per-field hashes across these two sources, consumers +can identify exactly which fields went stale, enabling field-level staleness analysis. + +Implementation: `loader_cache.go` iterates `ProvidesData` fields, computing xxhash per +field via `HashFieldValue`. The hashes appear in `CacheAnalyticsSnapshot.FieldHashes`. + +Tests: +- `execution/engine/federation_caching_analytics_test.go:679` — `TestCacheAnalyticsE2E / "shadow all entities - always fetches"` +- `v2/pkg/engine/resolve/l1_cache_test.go:2017` — `TestComputeHasAliases` (4 subtests: no aliases, direct alias, nested alias, alias in array item) + +### AC-SHADOW-05: L1 cache unaffected +Shadow mode only affects L2 behavior. L1 cache operates normally — it still caches and +serves entities within the same request, since L1 is always fresh (populated from the +current request's fetches). + +Tests: +- `v2/pkg/engine/resolve/cache_load_test.go:1687` — `TestShadowMode_L1_WorksNormally` + +## Thread Safety + +### AC-THREAD-01: L1 on main thread with sync.Map +L1 cache reads (`Load`) and writes (`Store`) use `sync.Map` and occur on the main thread +only. The `sync.Map` provides safety for the concurrent `LoadOrStore` pattern used during +root field entity population. 
+ +Tests: +- `v2/pkg/engine/resolve/l1_cache_test.go:24` — `TestL1Cache / "L1 hit - same entity fetched twice in same request"` + +### AC-THREAD-02: L2 implementations must be goroutine-safe +L2 `LoaderCache.Get()`, `Set()`, and `Delete()` are called from goroutines during Phase 2 +parallel execution. Implementers must ensure thread-safe access (e.g., connection pooling +for Redis). + +Tests: +- `execution/engine/federation_caching_test.go:1435` — `TestFederationCaching / "concurrency with different IDs"` + +### AC-THREAD-03: Per-result analytics accumulation +During Phase 2, each goroutine accumulates analytics events (L2 key events, fetch timings, +errors) on its own per-result slice. After all goroutines complete (`g.Wait()`), the main +thread merges all per-result events into the single analytics collector via +`MergeL2Events`/`MergeL2FetchTimings`/`MergeL2Errors`. + +Tests: +- `v2/pkg/engine/resolve/cache_analytics_test.go:65` — `TestCacheAnalyticsCollector_MergeL2Events` + +### AC-THREAD-04: Per-goroutine arenas for thread-safe allocation +The JSON arena (`jsonArena`) uses a `MonotonicArena` which is NOT thread-safe. Phase 2 +goroutines that run `tryL2CacheLoad` allocate JSON values (in `extractCacheKeysStrings`, +`populateFromCache`, `EntityMergePath` wrapping, and `denormalizeFromCache`). + +To avoid data races, each Phase 2 goroutine receives its own arena from `l2ArenaPool` +(a `sync.Pool` of `MonotonicArena` instances). The per-goroutine arenas are stored in +`Loader.goroutineArenas` and released in `Loader.Free()` — NOT inside the goroutine — +because `astjson.MergeValues` is shallow (it links `*Value` pointers from the source into +the target tree without deep-copying). After merge, the response tree holds cross-arena +references into the goroutine arenas, which must remain valid until response rendering +completes. 
+ +Tests: +- `v2/pkg/engine/resolve/arena_thread_safety_gc_test.go:21` — `TestCrossArenaMergeValuesCreatesShallowReferences` +- `v2/pkg/engine/resolve/arena_thread_safety_gc_test.go:83` — `TestGoroutineArenaLifetimeWithDeferredRelease` +- `v2/pkg/engine/resolve/arena_thread_safety_gc_test.go:137` — `Benchmark_CrossArenaGCSafety` +- `v2/pkg/engine/resolve/arena_thread_safety_bench_test.go:40` — `BenchmarkConcurrentArena` +- `v2/pkg/engine/resolve/arena_thread_safety_bench_test.go:61` — `BenchmarkPerGoroutineArena` +- `v2/pkg/engine/resolve/loader_arena_gc_test.go:102` — `Benchmark_ArenaGCSafety` + +## Error Handling + +### AC-ERR-01: Cache errors are non-fatal +All cache operations (`Get`, `Set`, `Delete`) are non-fatal. A cache failure never causes +the GraphQL request to fail — the engine falls back to fetching from the subgraph. +When analytics is enabled, cache operation errors are recorded as `CacheOperationError` +events (see AC-ANA-06) so that infrastructure issues are visible to operators. + +Tests: +- `execution/engine/federation_caching_l2_test.go:788` — `TestCacheNotPopulatedOnErrors` +- `v2/pkg/engine/resolve/cache_load_test.go:2077` — `TestL2CacheErrorResilience` (Get error falls through to fetch, Set error still returns correct response) + +### AC-ERR-02: Subgraph errors prevent cache population +When a subgraph returns an error response, the result is NOT written to L2 cache. This +prevents caching error responses that would be served to subsequent requests. + +Tests: +- `execution/engine/federation_caching_l2_test.go:788` — `TestCacheNotPopulatedOnErrors` + +### AC-ERR-03: Graceful degradation on validation failure +When L2 returns a cached entity that fails `ProvidesData` validation (missing required +fields), the system gracefully refetches from the subgraph rather than serving incomplete +data. The old cached entity is preserved for field merging (AC-L2-08). 
+ +Tests: +- `execution/engine/federation_caching_l2_test.go:504` — `TestPartialEntityCaching / "only configured entities are cached"` + +## L2 Circuit Breaker + +### AC-CB-01: Configurable per-cache circuit breaker +Each named L2 cache can have a circuit breaker via `ResolverOptions.CacheCircuitBreakers`. +The breaker wraps the `LoaderCache` interface transparently — callers (loader, resolver) +don't need any changes. + +Configuration: +- `FailureThreshold`: consecutive failures to trip open (default: 5) +- `CooldownPeriod`: duration in open state before half-open probe (default: 10s) + +Tests: +- `v2/pkg/engine/resolve/circuit_breaker_test.go:44` — `TestCircuitBreaker` (7 subtests: stays closed below threshold, opens after N failures, open skips cache, half-open probe success/failure, concurrent safety, success resets count) + +### AC-CB-02: Three-state lifecycle +The circuit breaker follows the standard Closed → Open → Half-Open pattern: +- **Closed**: all operations pass through to the underlying cache +- **Open**: `Get` returns `(nil, nil)` (all-miss), `Set`/`Delete` return `nil` (no-op) +- **Half-Open**: after `CooldownPeriod`, the next operation is allowed through as a probe; + success closes the breaker, failure re-opens it + +Tests: +- `v2/pkg/engine/resolve/circuit_breaker_test.go:44` — covers all three states and transitions + +### AC-CB-03: Non-blocking failure isolation +When open, the breaker returns immediately without contacting the cache backend. This +prevents cascading failures when the cache is down (e.g., Redis timeout) from affecting +GraphQL request latency. The engine falls back to subgraph fetches transparently. + +## Analytics + +### AC-ANA-01: Event-level tracking +Every L1 and L2 read/write operation records a structured event containing: cache level +(L1/L2), entity type, cache key, data source name, byte size, and TTL. Events are +collected per-request in the `CacheAnalyticsCollector`. 
+ +Tests: +- `execution/engine/federation_caching_analytics_test.go:106` — `TestCacheAnalyticsE2E / "L2 miss then hit with analytics"` + +### AC-ANA-02: Fetch timing instrumentation +Each subgraph HTTP call records: request duration, HTTP status code, time-to-first-byte, +and response body size. These timings are available in the snapshot for correlating cache +performance with fetch latency. + +Tests: +- `execution/engine/federation_caching_analytics_test.go:505` — `TestCacheAnalyticsE2E / "subgraph fetch records HTTPStatusCode and ResponseBytes"` + +### AC-ANA-03: Aggregate convenience methods +The `CacheAnalyticsSnapshot` provides pre-computed metrics: `L1HitRate()`, `L2HitRate()`, +`CachedBytesServed()`, `SubgraphCallsAvoided()`, `AvgCacheAgeMs()`, etc. These are +derived from the raw events at snapshot time. + +Tests: +- `v2/pkg/engine/resolve/cache_analytics_test.go:239` — `TestCacheAnalyticsCollector_SnapshotDerivedMetrics` + +### AC-ANA-04: Event deduplication in snapshots +When `Snapshot()` is called, duplicate events (same CacheKey + Kind combination) are +removed to prevent double-counting from retry or re-merge scenarios. + +Tests: +- `v2/pkg/engine/resolve/cache_analytics_test.go:1679` — `TestSnapshotDeduplication` + +### AC-ANA-05: Header impact analytics +When `IncludeSubgraphHeaderPrefix` is active, the system records `HeaderImpactEvent`s +containing the base key (without header hash) and the response hash. By comparing response +hashes across different header hash values, consumers can detect whether the header prefix +is actually necessary — if all responses are identical regardless of headers, the prefix +adds cache fragmentation without benefit. 
+
+Tests:
+- `execution/engine/federation_caching_analytics_test.go:1791` — `TestCacheAnalyticsE2E / "shadow mode with header prefix - same response different headers"`
+- `v2/pkg/engine/resolve/mutation_cache_impact_test.go:216` — `TestBuildMutationEntityDisplayKey` (display key always without prefix)
+
+### AC-ANA-06: Cache operation error tracking
+When analytics is enabled, L2 cache operation errors (`Get`, `Set`, `Delete`) are recorded
+as `CacheOperationError` events in the analytics snapshot. Each event contains the operation
+type, cache name, entity type, data source, error message (truncated to 256 chars), and
+the number of keys involved. This allows operators to detect cache infrastructure issues
+(e.g., Redis timeouts, connection failures) without requiring a logger on the Loader.
+
+Tests:
+- `v2/pkg/engine/resolve/mutation_cache_impact_test.go:625` — `TestDetectMutationEntityImpact / "array response invalidates all entities in the list"` (NOTE(review): this test covers mutation entity impact, not `CacheOperationError` events — per the AC/test sync rule, point this AC at a test that asserts `CacheAnalyticsSnapshot.CacheOpErrors`)
+
+## Future Improvements
+
+The following features are not yet implemented but are planned or under consideration:
+
+- **Stale-While-Revalidate (SWR)**: Serve stale cached data immediately while revalidating
+  asynchronously in the background. Would reduce tail latency for cache-miss scenarios
+  by serving slightly stale data rather than waiting for the subgraph.
+
+- **Tag-based invalidation**: Associate cache entries with tags (e.g., `team:123`) and
+  invalidate all entries with a given tag in a single operation. Would simplify bulk
+  invalidation for related entities.
+
+- **Cache entry compression**: Compress cached entity data (e.g., with zstd or gzip) to
+  reduce memory and network usage for large entities in external cache stores. 
diff --git a/v2/pkg/engine/plan/federation_metadata.go b/v2/pkg/engine/plan/federation_metadata.go index eb8efb6d23..413e92a245 100644 --- a/v2/pkg/engine/plan/federation_metadata.go +++ b/v2/pkg/engine/plan/federation_metadata.go @@ -124,6 +124,12 @@ type EntityCacheConfiguration struct { // Instead, fresh data is always fetched from the subgraph and compared against the cached value // to detect staleness. L1 cache works normally (not affected by shadow mode). ShadowMode bool `json:"shadow_mode"` + + // NegativeCacheTTL is the TTL for caching null entity results (entity not found). + // When > 0, null responses (entity returned null without errors from _entities) are cached + // as negative sentinels to avoid repeated subgraph lookups for non-existent entities. + // When 0 (default), null entities are not cached and will be re-fetched on every request. + NegativeCacheTTL time.Duration `json:"negative_cache_ttl,omitzero"` } // EntityCacheConfigurations is a collection of entity cache configurations. diff --git a/v2/pkg/engine/plan/visitor.go b/v2/pkg/engine/plan/visitor.go index d3984b89e8..824354bfe5 100644 --- a/v2/pkg/engine/plan/visitor.go +++ b/v2/pkg/engine/plan/visitor.go @@ -2368,6 +2368,7 @@ func (v *Visitor) configureFetchCaching(internal *objectFetchConfiguration, exte HashAnalyticsKeys: cacheConfig.HashAnalyticsKeys, KeyFields: keyFields, ShadowMode: cacheConfig.ShadowMode, + NegativeCacheTTL: cacheConfig.NegativeCacheTTL, } } diff --git a/v2/pkg/engine/resolve/CLAUDE.md b/v2/pkg/engine/resolve/CLAUDE.md index 67e4b156f8..e730028744 100644 --- a/v2/pkg/engine/resolve/CLAUDE.md +++ b/v2/pkg/engine/resolve/CLAUDE.md @@ -554,6 +554,13 @@ L2Reads: []resolve.CacheKeyEvent{ Every `defaultCache.ClearLog()` MUST be followed by `defaultCache.GetLog()` with full assertions BEFORE the next `ClearLog()` or end of test. Never clear a log without verifying its contents. 
+### Caching Test / AC Sync Rule + +**When modifying or adding caching-related tests**, you MUST also update `ENTITY_CACHING_ACCEPTANCE_CRITERIA.md` (in the repo root). Every AC must link to its covering tests with relative paths, line numbers, and test names. This applies to: +- New caching tests (add test links to the relevant AC) +- Changes to existing caching tests that affect which ACs are covered +- New ACs (must have at least one test link) + ### Run Tests ```bash go test -run "TestL1Cache" ./v2/pkg/engine/resolve/... -v diff --git a/v2/pkg/engine/resolve/arena_thread_safety_bench_test.go b/v2/pkg/engine/resolve/arena_thread_safety_bench_test.go new file mode 100644 index 0000000000..887b741b0b --- /dev/null +++ b/v2/pkg/engine/resolve/arena_thread_safety_bench_test.go @@ -0,0 +1,98 @@ +package resolve + +import ( + "strconv" + "sync" + "testing" + + "github.com/wundergraph/astjson" + "github.com/wundergraph/go-arena" +) + +// cacheLoadAllocs simulates the allocation pattern of tryL2CacheLoad: +// parse cached JSON bytes, create wrapper objects, allocate slices. +func cacheLoadAllocs(a arena.Arena) { + // 1. extractCacheKeysStrings: allocate slice + string bytes + keys := arena.AllocateSlice[string](a, 0, 4) + for range 4 { + buf := arena.AllocateSlice[byte](a, 0, 64) + buf = arena.SliceAppend(a, buf, []byte("cache:entity:Product:id:prod-1234")...) + keys = arena.SliceAppend(a, keys, string(buf)) + } + _ = keys + + // 2. populateFromCache: parse JSON bytes + v, _ := astjson.ParseBytesWithArena(a, []byte(`{"__typename":"Product","id":"prod-1234","name":"Test Product","price":29.99}`)) + + // 3. EntityMergePath wrapping: create wrapper objects + obj := astjson.ObjectValue(a) + obj.Set(a, "product", v) + outer := astjson.ObjectValue(a) + outer.Set(a, "data", obj) + + // 4. 
denormalizeFromCache: create new object tree + result := astjson.ObjectValue(a) + result.Set(a, "productName", v.Get("name")) + result.Set(a, "productPrice", v.Get("price")) +} + +// BenchmarkConcurrentArena measures Option A: single arena wrapped with NewConcurrentArena. +// All goroutines allocate from the same mutex-protected arena. +func BenchmarkConcurrentArena(b *testing.B) { + for _, goroutines := range []int{1, 4, 8, 16} { + b.Run(goroutineName(goroutines), func(b *testing.B) { + a := arena.NewConcurrentArena(arena.NewMonotonicArena(arena.WithMinBufferSize(64 * 1024))) + b.ResetTimer() + for b.Loop() { + var wg sync.WaitGroup + for range goroutines { + wg.Go(func() { + cacheLoadAllocs(a) + }) + } + wg.Wait() + a.Reset() + } + }) + } +} + +// BenchmarkPerGoroutineArena measures Option B: each goroutine gets its own arena from sync.Pool. +// Zero lock contention on allocations. +func BenchmarkPerGoroutineArena(b *testing.B) { + pool := sync.Pool{ + New: func() any { + return arena.NewMonotonicArena(arena.WithMinBufferSize(4096)) + }, + } + + for _, goroutines := range []int{1, 4, 8, 16} { + b.Run(goroutineName(goroutines), func(b *testing.B) { + b.ResetTimer() + for b.Loop() { + arenas := make([]arena.Arena, goroutines) + var wg sync.WaitGroup + for i := range goroutines { + ga := pool.Get().(arena.Arena) + arenas[i] = ga + wg.Go(func() { + cacheLoadAllocs(ga) + }) + } + wg.Wait() + for _, ga := range arenas { + ga.Reset() + pool.Put(ga) + } + } + }) + } +} + +func goroutineName(n int) string { + return "goroutines=" + stringFromInt(n) +} + +func stringFromInt(n int) string { + return strconv.Itoa(n) +} diff --git a/v2/pkg/engine/resolve/arena_thread_safety_gc_test.go b/v2/pkg/engine/resolve/arena_thread_safety_gc_test.go new file mode 100644 index 0000000000..b3c880bfea --- /dev/null +++ b/v2/pkg/engine/resolve/arena_thread_safety_gc_test.go @@ -0,0 +1,178 @@ +package resolve + +import ( + "runtime" + "runtime/debug" + "testing" + + 
"github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/wundergraph/astjson" + "github.com/wundergraph/go-arena" +) + +// TestCrossArenaMergeValuesCreatesShallowReferences proves that MergeValues +// links *Value pointers from the source arena into the target arena's tree +// without deep-copying. Resetting the source arena makes the merged values stale. +// +// This is the foundational invariant for AC-THREAD-04: goroutine arenas that +// hold FromCache values must NOT be released before the response is fully rendered. +func TestCrossArenaMergeValuesCreatesShallowReferences(t *testing.T) { + old := debug.SetGCPercent(1) + defer debug.SetGCPercent(old) + + mainArena := arena.NewMonotonicArena(arena.WithMinBufferSize(4096)) + goroutineArena := arena.NewMonotonicArena(arena.WithMinBufferSize(4096)) + + // Parse entity data on the "goroutine" arena (simulates populateFromCache) + fromCache, err := astjson.ParseBytesWithArena(goroutineArena, []byte(`{"id":"prod-1","name":"Widget"}`)) + require.NoError(t, err) + + // Parse the target item on the main arena (simulates the response tree) + item, err := astjson.ParseBytesWithArena(mainArena, []byte(`{"id":"prod-1"}`)) + require.NoError(t, err) + + // Merge: this splices FromCache nodes into item's object tree + merged, _, err := astjson.MergeValues(mainArena, item, fromCache) + require.NoError(t, err) + + // Verify merged result contains data from both arenas + mergedJSON := string(merged.MarshalTo(nil)) + assert.Contains(t, mergedJSON, `"name":"Widget"`) + assert.Contains(t, mergedJSON, `"id":"prod-1"`) + + // Force GC to stress-test pointer validity — goroutine arena is still alive + runtime.GC() + runtime.GC() + + // Values should still be valid since goroutine arena hasn't been reset + postGCJSON := string(merged.MarshalTo(nil)) + assert.Equal(t, mergedJSON, postGCJSON, + "merged values should survive GC when goroutine arena is still alive") + + // Now reset the goroutine arena — 
simulates premature release + goroutineArena.Reset() + + // Overwrite the freed memory with different data + _, _ = astjson.ParseBytesWithArena(goroutineArena, []byte(`{"id":"STALE","name":"CORRUPTED"}`)) + + // The merged tree still holds pointers into the (now overwritten) goroutine arena. + // This proves MergeValues is shallow — accessing the stale data may panic or + // return corrupted values. + staleOrPanicked := func() (result string, panicked bool) { + defer func() { + if r := recover(); r != nil { + panicked = true + } + }() + return string(merged.MarshalTo(nil)), false + } + staleJSON, panicked := staleOrPanicked() + assert.True(t, panicked || staleJSON != mergedJSON, + "merged values should be stale or inaccessible after goroutine arena reset — "+ + "this proves MergeValues creates cross-arena shallow references") + + runtime.KeepAlive(mainArena) + runtime.KeepAlive(goroutineArena) +} + +// TestGoroutineArenaLifetimeWithDeferredRelease verifies the correct pattern: +// goroutine arenas survive through the full resolve lifecycle and are only +// released in Free(). This matches the Loader.goroutineArenas design. 
+func TestGoroutineArenaLifetimeWithDeferredRelease(t *testing.T) { + old := debug.SetGCPercent(1) + defer debug.SetGCPercent(old) + + mainArena := arena.NewMonotonicArena(arena.WithMinBufferSize(4096)) + + // Simulate multiple goroutines, each with their own arena + const numGoroutines = 4 + goroutineArenas := make([]arena.Arena, numGoroutines) + fromCacheValues := make([]*astjson.Value, numGoroutines) + + for i := range numGoroutines { + goroutineArenas[i] = arena.NewMonotonicArena(arena.WithMinBufferSize(4096)) + var err error + fromCacheValues[i], err = astjson.ParseBytesWithArena( + goroutineArenas[i], + []byte(`{"id":"prod-`+stringFromInt(i+1)+`","name":"Product `+stringFromInt(i+1)+`"}`), + ) + require.NoError(t, err) + } + + // Phase 4: merge all FromCache values into main arena tree + items := make([]*astjson.Value, numGoroutines) + for i := range numGoroutines { + items[i], _ = astjson.ParseBytesWithArena(mainArena, []byte(`{"id":"prod-`+stringFromInt(i+1)+`"}`)) + merged, _, err := astjson.MergeValues(mainArena, items[i], fromCacheValues[i]) + require.NoError(t, err) + items[i] = merged + } + + // GC pressure — all arenas still alive + runtime.GC() + runtime.GC() + + // Verify all merged values are still valid (simulates response rendering) + for i := range numGoroutines { + json := string(items[i].MarshalTo(nil)) + assert.Contains(t, json, `"name":"Product `+stringFromInt(i+1)+`"`, + "merged value %d should be readable with goroutine arenas alive", i) + } + + // Now release goroutine arenas (simulates Loader.Free()) + for _, a := range goroutineArenas { + a.Reset() + } + + runtime.KeepAlive(mainArena) + runtime.KeepAlive(goroutineArenas) +} + +// Benchmark_CrossArenaGCSafety exercises the goroutine arena pattern under GC +// pressure. Each iteration creates goroutine arenas, merges values, renders the +// result, then releases. runtime.GC() between iterations maximizes pressure on +// any dangling pointers. 
+func Benchmark_CrossArenaGCSafety(b *testing.B) { + old := debug.SetGCPercent(1) + defer debug.SetGCPercent(old) + + entityJSON := []byte(`{"__typename":"Product","id":"prod-1","name":"Widget","price":9.99}`) + itemJSON := []byte(`{"__typename":"Product","id":"prod-1"}`) + + b.ResetTimer() + for b.Loop() { + mainArena := arena.NewMonotonicArena(arena.WithMinBufferSize(4096)) + goroutineArena := arena.NewMonotonicArena(arena.WithMinBufferSize(4096)) + + // Simulate goroutine: parse cached entity + fromCache, err := astjson.ParseBytesWithArena(goroutineArena, entityJSON) + if err != nil { + b.Fatal(err) + } + + // Simulate Phase 4: merge into response tree + item, err := astjson.ParseBytesWithArena(mainArena, itemJSON) + if err != nil { + b.Fatal(err) + } + merged, _, err := astjson.MergeValues(mainArena, item, fromCache) + if err != nil { + b.Fatal(err) + } + + // Simulate response rendering + buf := merged.MarshalTo(nil) + if len(buf) == 0 { + b.Fatal("empty output") + } + + // Release (correct order: goroutine arena after rendering) + goroutineArena.Reset() + mainArena.Reset() + + // GC pressure between iterations + runtime.GC() + } +} diff --git a/v2/pkg/engine/resolve/cache_analytics.go b/v2/pkg/engine/resolve/cache_analytics.go index b4836c56f0..f45a9129e6 100644 --- a/v2/pkg/engine/resolve/cache_analytics.go +++ b/v2/pkg/engine/resolve/cache_analytics.go @@ -140,6 +140,18 @@ type MutationEvent struct { FreshBytes int } +// CacheOperationError records a cache operation (Get/Set/Delete) that returned an error. +// Cache errors are non-fatal (the engine falls back to subgraph fetch), but tracking them +// in analytics allows operators to detect cache infrastructure issues. 
+type CacheOperationError struct { + Operation string // "get", "set", or "delete" + CacheName string // named cache instance + EntityType string // entity type (empty for root fetches) + DataSource string // subgraph name + Message string // error message (truncated for safety) + ItemCount int // number of keys involved in the failed operation +} + // HeaderImpactEvent records a fresh fetch that wrote to L2 cache with header-prefixed keys. // A cross-request consumer can aggregate these events: when the same BaseKey appears with // different HeaderHash values but identical ResponseHash values, the forwarded headers @@ -170,6 +182,8 @@ type CacheAnalyticsCollector struct { shadowComparisons []ShadowComparisonEvent // shadow mode staleness comparison events mutationEvents []MutationEvent // mutation entity impact events headerImpactEvents []HeaderImpactEvent // header impact events for L2 writes with header prefix + cacheOpErrors []CacheOperationError // cache operation errors (main thread) + l2CacheOpErrors []CacheOperationError // accumulated in goroutines, merged on main thread xxh *xxhash.Digest } @@ -322,6 +336,17 @@ func (c *CacheAnalyticsCollector) RecordHeaderImpactEvent(event HeaderImpactEven c.headerImpactEvents = append(c.headerImpactEvents, event) } +// RecordCacheOperationError records a cache operation error. Main thread only. +func (c *CacheAnalyticsCollector) RecordCacheOperationError(event CacheOperationError) { + c.cacheOpErrors = append(c.cacheOpErrors, event) +} + +// MergeL2CacheOpErrors merges cache operation errors collected in goroutines into the collector. +// Must be called on the main thread. +func (c *CacheAnalyticsCollector) MergeL2CacheOpErrors(events []CacheOperationError) { + c.cacheOpErrors = append(c.cacheOpErrors, events...) +} + // EntitySource returns the source for a given entity instance. // Returns FieldSourceSubgraph if no record is found (the default). 
func (c *CacheAnalyticsCollector) EntitySource(entityType, keyJSON string) FieldSource { @@ -347,6 +372,7 @@ func (c *CacheAnalyticsCollector) Snapshot() CacheAnalyticsSnapshot { ShadowComparisons: deduplicateShadowComparisons(c.shadowComparisons), MutationEvents: c.mutationEvents, HeaderImpactEvents: deduplicateHeaderImpactEvents(c.headerImpactEvents), + CacheOpErrors: c.cacheOpErrors, } // Split write events into L1 and L2, then deduplicate each @@ -487,6 +513,9 @@ type CacheAnalyticsSnapshot struct { // Header impact events (L2 writes with header-prefixed keys) HeaderImpactEvents []HeaderImpactEvent + + // Cache operation errors (Get/Set/Delete failures) + CacheOpErrors []CacheOperationError } // L1HitRate returns the L1 cache hit rate as a float64 in [0, 1]. diff --git a/v2/pkg/engine/resolve/cache_load_test.go b/v2/pkg/engine/resolve/cache_load_test.go index 4ec9a6016d..d9a28cee34 100644 --- a/v2/pkg/engine/resolve/cache_load_test.go +++ b/v2/pkg/engine/resolve/cache_load_test.go @@ -1098,9 +1098,10 @@ func TestCacheLoadSequential(t *testing.T) { // CacheLogEntry tracks a cache operation for testing type CacheLogEntry struct { - Operation string // "get", "set", "delete" - Keys []string // Keys involved in the operation - Hits []bool // For Get: whether each key was a hit (true) or miss (false) + Operation string // "get", "set", "delete" + Keys []string // Keys involved in the operation + Hits []bool // For Get: whether each key was a hit (true) or miss (false) + TTL time.Duration // For Set: the TTL passed to the operation } type cacheEntry struct { @@ -1211,6 +1212,7 @@ func (f *FakeLoaderCache) Set(ctx context.Context, entries []*CacheEntry, ttl ti Operation: "set", Keys: keys, Hits: nil, // Set operations don't have hits/misses + TTL: ttl, }) return nil @@ -1253,6 +1255,18 @@ func (f *FakeLoaderCache) ClearLog() { f.log = make([]CacheLogEntry, 0) } +// GetValue returns the raw cached value for a key, or nil if not found. 
+func (f *FakeLoaderCache) GetValue(key string) []byte { + f.mu.RLock() + defer f.mu.RUnlock() + if entry, exists := f.storage[key]; exists { + dataCopy := make([]byte, len(entry.data)) + copy(dataCopy, entry.data) + return dataCopy + } + return nil +} + // Clear removes all entries from the cache func (f *FakeLoaderCache) Clear() { f.mu.Lock() diff --git a/v2/pkg/engine/resolve/caching.go b/v2/pkg/engine/resolve/caching.go index d8ae11fb8a..572821d665 100644 --- a/v2/pkg/engine/resolve/caching.go +++ b/v2/pkg/engine/resolve/caching.go @@ -21,6 +21,9 @@ type CacheKey struct { // On STORE: extracts entity-level data at this path (e.g., ["user"] extracts from {"user":{...}}). // On LOAD: wraps cached entity-level data back at this path (e.g., wraps {...} into {"user":{...}}). EntityMergePath []string + // NegativeCacheHit is set during mergeResult when the subgraph returned null for this entity. + // Used by updateL2Cache to store a null sentinel with NegativeCacheTTL instead of regular TTL. + NegativeCacheHit bool } type RootQueryCacheKeyTemplate struct { diff --git a/v2/pkg/engine/resolve/circuit_breaker.go b/v2/pkg/engine/resolve/circuit_breaker.go new file mode 100644 index 0000000000..bffa49cfb8 --- /dev/null +++ b/v2/pkg/engine/resolve/circuit_breaker.go @@ -0,0 +1,170 @@ +package resolve + +import ( + "context" + "sync/atomic" + "time" +) + +// CircuitBreakerConfig configures the L2 cache circuit breaker for a named cache instance. +// When the circuit is open, all L2 operations (Get/Set/Delete) are skipped and the engine +// falls back to subgraph fetches. This prevents cascading latency when the cache backend +// (e.g., Redis) is slow or unavailable. +type CircuitBreakerConfig struct { + // Enabled activates the circuit breaker for this cache instance. + Enabled bool + + // FailureThreshold is the number of consecutive failures that trips the breaker. 
+ // Default: 5 + FailureThreshold int + + // CooldownPeriod is how long the breaker stays open before allowing a probe request. + // Default: 10s + CooldownPeriod time.Duration +} + +// circuitBreakerState tracks the state of one circuit breaker instance. +// All fields use atomic operations for goroutine safety (L2 operations run in Phase 2 goroutines). +// +// States: +// - Closed: openedAt == 0. All operations pass through. +// - Open: openedAt != 0 && now < openedAt + cooldown. All operations are skipped. +// - Half-Open: openedAt != 0 && now >= openedAt + cooldown. One probe request allowed. +type circuitBreakerState struct { + consecutiveFailures atomic.Int64 + openedAt atomic.Int64 // unix nano timestamp, 0 = closed + probeInFlight atomic.Bool + config CircuitBreakerConfig +} + +func newCircuitBreakerState(config CircuitBreakerConfig) *circuitBreakerState { + return &circuitBreakerState{config: config} +} + +// shouldAllow returns true if the operation should proceed. +// In half-open state, uses CAS to allow exactly one probe without clearing the +// open state — openedAt and consecutiveFailures are only reset on probe success. +func (cb *circuitBreakerState) shouldAllow() bool { + openedAt := cb.openedAt.Load() + if openedAt == 0 { + return true // closed + } + + elapsed := time.Since(time.Unix(0, openedAt)) + if elapsed < cb.config.CooldownPeriod { + return false // open, cooldown not elapsed + } + + // Half-open: allow exactly one probe, but don't mark the breaker closed + // until that probe succeeds. + return cb.probeInFlight.CompareAndSwap(false, true) +} + +// recordSuccess resets the breaker to closed state. +func (cb *circuitBreakerState) recordSuccess() { + cb.consecutiveFailures.Store(0) + cb.openedAt.Store(0) + cb.probeInFlight.Store(false) +} + +// recordFailure increments the failure counter and trips the breaker if threshold is reached. 
+func (cb *circuitBreakerState) recordFailure() { + if cb.probeInFlight.Swap(false) { + // Half-open probe failed — reopen immediately. + cb.openedAt.Store(time.Now().UnixNano()) + return + } + failures := cb.consecutiveFailures.Add(1) + if failures >= int64(cb.config.FailureThreshold) { + cb.openedAt.Store(time.Now().UnixNano()) + } +} + +// isOpen returns true if the breaker is currently open (not allowing operations). +func (cb *circuitBreakerState) isOpen() bool { + openedAt := cb.openedAt.Load() + if openedAt == 0 { + return false + } + elapsed := time.Since(time.Unix(0, openedAt)) + return elapsed < cb.config.CooldownPeriod +} + +// circuitBreakerCache wraps a LoaderCache with circuit breaker protection. +// When the breaker is open: +// - Get returns (nil, nil) — treated as all cache misses by existing code +// - Set returns nil — same as current non-fatal error handling +// - Delete returns nil — same as current non-fatal error handling +type circuitBreakerCache struct { + inner LoaderCache + state *circuitBreakerState +} + +func (c *circuitBreakerCache) Get(ctx context.Context, keys []string) ([]*CacheEntry, error) { + if !c.state.shouldAllow() { + return nil, nil + } + entries, err := c.inner.Get(ctx, keys) + if err != nil { + c.state.recordFailure() + return nil, err + } + c.state.recordSuccess() + return entries, nil +} + +func (c *circuitBreakerCache) Set(ctx context.Context, entries []*CacheEntry, ttl time.Duration) error { + if !c.state.shouldAllow() { + return nil + } + err := c.inner.Set(ctx, entries, ttl) + if err != nil { + c.state.recordFailure() + return err + } + c.state.recordSuccess() + return nil +} + +func (c *circuitBreakerCache) Delete(ctx context.Context, keys []string) error { + if !c.state.shouldAllow() { + return nil + } + err := c.inner.Delete(ctx, keys) + if err != nil { + c.state.recordFailure() + return err + } + c.state.recordSuccess() + return nil +} + +// wrapCachesWithCircuitBreakers returns a shallow copy of caches with 
circuit breaker +// wrappers applied where configured. The original map is not mutated. +// Called once during Resolver.New(). +func wrapCachesWithCircuitBreakers(caches map[string]LoaderCache, configs map[string]CircuitBreakerConfig) map[string]LoaderCache { + if caches == nil || configs == nil { + return caches + } + wrapped := make(map[string]LoaderCache, len(caches)) + for name, cache := range caches { + wrapped[name] = cache + } + for name, cbConfig := range configs { + cache, ok := wrapped[name] + if !ok || !cbConfig.Enabled { + continue + } + if cbConfig.FailureThreshold <= 0 { + cbConfig.FailureThreshold = 5 + } + if cbConfig.CooldownPeriod <= 0 { + cbConfig.CooldownPeriod = 10 * time.Second + } + wrapped[name] = &circuitBreakerCache{ + inner: cache, + state: newCircuitBreakerState(cbConfig), + } + } + return wrapped +} diff --git a/v2/pkg/engine/resolve/circuit_breaker_test.go b/v2/pkg/engine/resolve/circuit_breaker_test.go new file mode 100644 index 0000000000..2c40b63448 --- /dev/null +++ b/v2/pkg/engine/resolve/circuit_breaker_test.go @@ -0,0 +1,277 @@ +package resolve + +import ( + "context" + "errors" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// failingCache is a test LoaderCache that fails on demand. +// Uses atomic counters for goroutine safety in concurrent tests. 
+type failingCache struct { + getErr error + setErr error + deleteErr error + getCalls atomic.Int64 + setCalls atomic.Int64 + delCalls atomic.Int64 +} + +func (c *failingCache) Get(_ context.Context, _ []string) ([]*CacheEntry, error) { + c.getCalls.Add(1) + if c.getErr != nil { + return nil, c.getErr + } + return []*CacheEntry{{Key: "k", Value: []byte("v")}}, nil +} + +func (c *failingCache) Set(_ context.Context, _ []*CacheEntry, _ time.Duration) error { + c.setCalls.Add(1) + return c.setErr +} + +func (c *failingCache) Delete(_ context.Context, _ []string) error { + c.delCalls.Add(1) + return c.deleteErr +} + +func TestCircuitBreaker(t *testing.T) { + cacheErr := errors.New("redis: connection refused") + + t.Run("closed - passes through on success", func(t *testing.T) { + inner := &failingCache{} + cb := &circuitBreakerCache{ + inner: inner, + state: newCircuitBreakerState(CircuitBreakerConfig{ + Enabled: true, + FailureThreshold: 3, + CooldownPeriod: time.Second, + }), + } + + ctx := t.Context() + entries, err := cb.Get(ctx, []string{"k1"}) + require.NoError(t, err) + assert.Len(t, entries, 1) + assert.Equal(t, int64(1), inner.getCalls.Load()) + + err = cb.Set(ctx, []*CacheEntry{{Key: "k1"}}, time.Minute) + require.NoError(t, err) + assert.Equal(t, int64(1), inner.setCalls.Load()) + + err = cb.Delete(ctx, []string{"k1"}) + require.NoError(t, err) + assert.Equal(t, int64(1), inner.delCalls.Load()) + }) + + t.Run("stays closed below threshold", func(t *testing.T) { + inner := &failingCache{getErr: cacheErr} + cb := &circuitBreakerCache{ + inner: inner, + state: newCircuitBreakerState(CircuitBreakerConfig{ + Enabled: true, + FailureThreshold: 3, + CooldownPeriod: time.Second, + }), + } + + ctx := t.Context() + // Two failures — below threshold of 3 + _, _ = cb.Get(ctx, []string{"k1"}) + _, _ = cb.Get(ctx, []string{"k1"}) + + assert.Equal(t, int64(2), inner.getCalls.Load(), "both calls should pass through") + assert.False(t, cb.state.isOpen(), "breaker should 
remain closed") + + // Third call still passes through (threshold is reached ON this call) + _, _ = cb.Get(ctx, []string{"k1"}) + assert.Equal(t, int64(3), inner.getCalls.Load(), "threshold call should pass through") + assert.True(t, cb.state.isOpen(), "breaker should be open after reaching threshold") + }) + + t.Run("opens after consecutive failures reach threshold", func(t *testing.T) { + inner := &failingCache{getErr: cacheErr} + cb := &circuitBreakerCache{ + inner: inner, + state: newCircuitBreakerState(CircuitBreakerConfig{ + Enabled: true, + FailureThreshold: 2, + CooldownPeriod: time.Second, + }), + } + + ctx := t.Context() + _, _ = cb.Get(ctx, []string{"k1"}) + _, _ = cb.Get(ctx, []string{"k1"}) + assert.True(t, cb.state.isOpen()) + + // While open, Get returns nil/nil, inner is not called + entries, err := cb.Get(ctx, []string{"k1"}) + assert.NoError(t, err, "open breaker returns nil error") + assert.Nil(t, entries, "open breaker returns nil entries (all-miss)") + assert.Equal(t, int64(2), inner.getCalls.Load(), "inner should not be called when open") + }) + + t.Run("open breaker skips Set and Delete", func(t *testing.T) { + inner := &failingCache{setErr: cacheErr, deleteErr: cacheErr} + state := newCircuitBreakerState(CircuitBreakerConfig{ + Enabled: true, + FailureThreshold: 1, + CooldownPeriod: time.Second, + }) + // Force open + state.openedAt.Store(time.Now().UnixNano()) + + cb := &circuitBreakerCache{inner: inner, state: state} + + ctx := t.Context() + err := cb.Set(ctx, []*CacheEntry{{Key: "k1"}}, time.Minute) + assert.NoError(t, err, "open breaker Set returns nil") + assert.Equal(t, int64(0), inner.setCalls.Load(), "inner Set not called when open") + + err = cb.Delete(ctx, []string{"k1"}) + assert.NoError(t, err, "open breaker Delete returns nil") + assert.Equal(t, int64(0), inner.delCalls.Load(), "inner Delete not called when open") + }) + + t.Run("half-open probe success closes breaker", func(t *testing.T) { + inner := &failingCache{} // no 
errors — probe succeeds + state := newCircuitBreakerState(CircuitBreakerConfig{ + Enabled: true, + FailureThreshold: 2, + CooldownPeriod: 10 * time.Millisecond, + }) + // Open the breaker in the past so cooldown has elapsed + state.openedAt.Store(time.Now().Add(-50 * time.Millisecond).UnixNano()) + state.consecutiveFailures.Store(2) + + cb := &circuitBreakerCache{inner: inner, state: state} + + ctx := t.Context() + entries, err := cb.Get(ctx, []string{"k1"}) + require.NoError(t, err) + assert.Len(t, entries, 1, "probe should return data") + assert.Equal(t, int64(1), inner.getCalls.Load(), "probe should call inner") + assert.False(t, cb.state.isOpen(), "breaker should be closed after successful probe") + assert.Equal(t, int64(0), cb.state.consecutiveFailures.Load(), "failures should be reset") + }) + + t.Run("half-open probe failure re-opens breaker", func(t *testing.T) { + inner := &failingCache{getErr: cacheErr} + state := newCircuitBreakerState(CircuitBreakerConfig{ + Enabled: true, + FailureThreshold: 1, + CooldownPeriod: 10 * time.Millisecond, + }) + // Open the breaker in the past so cooldown has elapsed + state.openedAt.Store(time.Now().Add(-50 * time.Millisecond).UnixNano()) + + cb := &circuitBreakerCache{inner: inner, state: state} + + ctx := t.Context() + _, err := cb.Get(ctx, []string{"k1"}) + assert.Error(t, err, "probe failure should return error") + assert.Equal(t, int64(1), inner.getCalls.Load(), "probe should call inner") + assert.True(t, cb.state.isOpen(), "breaker should re-open after failed probe") + }) + + t.Run("success resets consecutive failure count", func(t *testing.T) { + inner := &failingCache{} + state := newCircuitBreakerState(CircuitBreakerConfig{ + Enabled: true, + FailureThreshold: 3, + CooldownPeriod: time.Second, + }) + + cb := &circuitBreakerCache{inner: inner, state: state} + + ctx := t.Context() + + // Two failures + inner.getErr = cacheErr + _, _ = cb.Get(ctx, []string{"k1"}) + _, _ = cb.Get(ctx, []string{"k1"}) + 
assert.Equal(t, int64(2), state.consecutiveFailures.Load()) + + // One success resets count + inner.getErr = nil + _, err := cb.Get(ctx, []string{"k1"}) + require.NoError(t, err) + assert.Equal(t, int64(0), state.consecutiveFailures.Load(), "success should reset failures") + assert.False(t, state.isOpen()) + }) + + t.Run("concurrent access safety", func(t *testing.T) { + inner := &failingCache{getErr: cacheErr} + cb := &circuitBreakerCache{ + inner: inner, + state: newCircuitBreakerState(CircuitBreakerConfig{ + Enabled: true, + FailureThreshold: 100, // high threshold so we can count + CooldownPeriod: time.Second, + }), + } + + ctx := t.Context() + var wg sync.WaitGroup + for range 50 { + wg.Go(func() { + _, _ = cb.Get(ctx, []string{"k1"}) + }) + } + wg.Wait() + + // No panics, no data races. Exact failure count may vary due to + // concurrency but should be <= 50. + assert.LessOrEqual(t, cb.state.consecutiveFailures.Load(), int64(50)) + }) + + t.Run("wrapCachesWithCircuitBreakers applies defaults", func(t *testing.T) { + inner := &failingCache{} + caches := map[string]LoaderCache{"default": inner} + configs := map[string]CircuitBreakerConfig{ + "default": {Enabled: true}, // no threshold or cooldown set + } + + result := wrapCachesWithCircuitBreakers(caches, configs) + + wrapped, ok := result["default"].(*circuitBreakerCache) + require.True(t, ok, "cache should be wrapped") + assert.Equal(t, 5, wrapped.state.config.FailureThreshold, "default threshold should be 5") + assert.Equal(t, 10*time.Second, wrapped.state.config.CooldownPeriod, "default cooldown should be 10s") + // Original map should not be mutated + _, originalWrapped := caches["default"].(*circuitBreakerCache) + assert.False(t, originalWrapped, "original map should not be mutated") + }) + + t.Run("wrapCachesWithCircuitBreakers skips disabled", func(t *testing.T) { + inner := &failingCache{} + caches := map[string]LoaderCache{"default": inner} + configs := map[string]CircuitBreakerConfig{ + "default": 
{Enabled: false}, + } + + result := wrapCachesWithCircuitBreakers(caches, configs) + + _, ok := result["default"].(*circuitBreakerCache) + assert.False(t, ok, "disabled breaker should not wrap the cache") + }) + + t.Run("wrapCachesWithCircuitBreakers ignores missing cache names", func(t *testing.T) { + caches := map[string]LoaderCache{"default": &failingCache{}} + configs := map[string]CircuitBreakerConfig{ + "nonexistent": {Enabled: true}, + } + + result := wrapCachesWithCircuitBreakers(caches, configs) + + _, ok := result["default"].(*circuitBreakerCache) + assert.False(t, ok, "unrelated cache should not be wrapped") + }) +} diff --git a/v2/pkg/engine/resolve/context.go b/v2/pkg/engine/resolve/context.go index 6a355ffa89..8e1722878c 100644 --- a/v2/pkg/engine/resolve/context.go +++ b/v2/pkg/engine/resolve/context.go @@ -199,6 +199,12 @@ type CachingOptions struct { // graphql-go-tools internals. Does not affect L1 cache keys. // Default: nil (no transformation) L2CacheKeyInterceptor L2CacheKeyInterceptor + // GlobalCacheKeyPrefix is prepended to all L2 cache keys (before header hash prefix). + // Use this for schema versioning: set to a schema hash so that schema changes + // automatically separate cache entries without requiring a cache flush. + // Format: "{prefix}:{rest_of_key}". Empty string means no prefix. + // Applied in order: global prefix → header hash prefix → interceptor. + GlobalCacheKeyPrefix string } type FieldValue struct { diff --git a/v2/pkg/engine/resolve/fetch.go b/v2/pkg/engine/resolve/fetch.go index 46d56e6070..3aaa772364 100644 --- a/v2/pkg/engine/resolve/fetch.go +++ b/v2/pkg/engine/resolve/fetch.go @@ -358,6 +358,12 @@ type FetchCacheConfiguration struct { // to the L2 cache. Propagated from MutationFieldCacheConfiguration. // By default, mutations do NOT populate L2. EnableMutationL2CachePopulation bool + + // NegativeCacheTTL is the TTL for caching null entity results (entity not found). 
+ // When > 0, null responses (entity returned null without errors) are cached to avoid + // repeated subgraph lookups for non-existent entities. + // When 0 (default), null entities are not cached. + NegativeCacheTTL time.Duration } // MutationEntityImpactConfig holds information for detecting entity cache changes from mutations. diff --git a/v2/pkg/engine/resolve/l1_cache_test.go b/v2/pkg/engine/resolve/l1_cache_test.go index ad304d0e9a..fc2a1f931c 100644 --- a/v2/pkg/engine/resolve/l1_cache_test.go +++ b/v2/pkg/engine/resolve/l1_cache_test.go @@ -2,6 +2,7 @@ package resolve import ( "context" + "sync" "testing" "time" @@ -1708,7 +1709,7 @@ func TestNormalizeDenormalizeRoundTrip(t *testing.T) { original := mustParseJSON(ar, `{"friends":"value"}`) normalized := loader.normalizeForCache(original, obj) - denormalized := loader.denormalizeFromCache(normalized, obj) + denormalized := loader.denormalizeFromCache(ar, normalized, obj) assert.Equal(t, `{"friends":"value"}`, string(denormalized.MarshalTo(nil))) }) @@ -1732,7 +1733,7 @@ func TestNormalizeDenormalizeRoundTrip(t *testing.T) { original := mustParseJSON(ar, `{"myFriends":"value"}`) normalized := loader.normalizeForCache(original, obj) - denormalized := loader.denormalizeFromCache(normalized, obj) + denormalized := loader.denormalizeFromCache(ar, normalized, obj) assert.Equal(t, `{"myFriends":"value"}`, string(denormalized.MarshalTo(nil))) }) @@ -1762,7 +1763,7 @@ func TestNormalizeDenormalizeRoundTrip(t *testing.T) { original := mustParseJSON(ar, `{"myFriends":{"n":"Alice"}}`) normalized := loader.normalizeForCache(original, obj) - denormalized := loader.denormalizeFromCache(normalized, obj) + denormalized := loader.denormalizeFromCache(ar, normalized, obj) assert.Equal(t, `{"myFriends":{"n":"Alice"}}`, string(denormalized.MarshalTo(nil))) }) @@ -1793,7 +1794,7 @@ func TestNormalizeDenormalizeRoundTrip(t *testing.T) { original := mustParseJSON(ar, `{"myFriends":[{"n":"Alice"},{"n":"Bob"}]}`) normalized := 
loader.normalizeForCache(original, obj) - denormalized := loader.denormalizeFromCache(normalized, obj) + denormalized := loader.denormalizeFromCache(ar, normalized, obj) assert.Equal(t, `{"myFriends":[{"n":"Alice"},{"n":"Bob"}]}`, string(denormalized.MarshalTo(nil))) }) @@ -1817,7 +1818,7 @@ func TestNormalizeDenormalizeRoundTrip(t *testing.T) { original := mustParseJSON(ar, `{"__typename":"User","myFriends":"value"}`) normalized := loader.normalizeForCache(original, obj) - denormalized := loader.denormalizeFromCache(normalized, obj) + denormalized := loader.denormalizeFromCache(ar, normalized, obj) // After round-trip, __typename should be preserved and field alias restored result := denormalized @@ -1847,7 +1848,7 @@ func TestNormalizeDenormalizeRoundTrip(t *testing.T) { original := mustParseJSON(ar, `{"friends":"Alice","id":"1"}`) normalized := loader.normalizeForCache(original, obj) - denormalized := loader.denormalizeFromCache(normalized, obj) + denormalized := loader.denormalizeFromCache(ar, normalized, obj) assert.Equal(t, `"Alice"`, string(denormalized.Get("friends").MarshalTo(nil))) assert.Equal(t, `"1"`, string(denormalized.Get("id").MarshalTo(nil))) @@ -1869,7 +1870,7 @@ func TestDenormalizeFromCache(t *testing.T) { } item := mustParseJSON(ar, `{"username":"Alice"}`) - result := loader.denormalizeFromCache(item, obj) + result := loader.denormalizeFromCache(ar, item, obj) assert.Equal(t, item, result, "should return same pointer when no aliases") }) @@ -1889,7 +1890,7 @@ func TestDenormalizeFromCache(t *testing.T) { // Cache stores normalized data with original name "username" item := mustParseJSON(ar, `{"username":"Alice"}`) - result := loader.denormalizeFromCache(item, obj) + result := loader.denormalizeFromCache(ar, item, obj) resultJSON := string(result.MarshalTo(nil)) assert.Equal(t, `{"userName":"Alice"}`, resultJSON, "should convert original name to alias") @@ -1916,7 +1917,7 @@ func TestDenormalizeFromCache(t *testing.T) { cacheJSON := `{"friends` 
+ suffix + `":"value"}` cacheItem := mustParseJSON(ar, cacheJSON) - result := loader.denormalizeFromCache(cacheItem, obj) + result := loader.denormalizeFromCache(ar, cacheItem, obj) resultJSON := string(result.MarshalTo(nil)) assert.Equal(t, `{"friends":"value"}`, resultJSON, "should map suffixed cache key back to query name") }) @@ -1943,7 +1944,7 @@ func TestDenormalizeFromCache(t *testing.T) { cacheJSON := `{"friends` + suffix + `":"value"}` cacheItem := mustParseJSON(ar, cacheJSON) - result := loader.denormalizeFromCache(cacheItem, obj) + result := loader.denormalizeFromCache(ar, cacheItem, obj) resultJSON := string(result.MarshalTo(nil)) assert.Equal(t, `{"myFriends":"value"}`, resultJSON, "should map suffixed original name back to alias") }) @@ -2071,6 +2072,79 @@ func TestComputeHasAliases(t *testing.T) { }) } +// TestPopulateL1CacheForRootFieldEntities_MissingKeyFields verifies that root field +// entity population skips entities that are missing @key fields. +// When the client's query doesn't select the @key fields (e.g., "id"), RenderCacheKeys +// produces a key with empty key object (e.g., {"__typename":"Product","key":{}}). +// These degraded keys would collide for all entities of the same type, so we skip storage. 
+func TestPopulateL1CacheForRootFieldEntities_MissingKeyFields(t *testing.T) { + ar := arena.NewMonotonicArena(arena.WithMinBufferSize(4096)) + ctx := NewContext(context.Background()) + ctx.ExecutionOptions.Caching.EnableL1Cache = true + ctx.Variables = astjson.MustParse(`{}`) + + resolvable := NewResolvable(ar, ResolvableOptions{}) + err := resolvable.Init(ctx, nil, ast.OperationTypeQuery) + require.NoError(t, err) + + // Set response data: entity with __typename but missing @key field "id" + resolvable.data, err = astjson.ParseBytesWithArena(ar, []byte(`{"topProducts":[{"__typename":"Product","name":"Widget"}]}`)) + require.NoError(t, err) + + l1Cache := &sync.Map{} + + l := &Loader{ + jsonArena: ar, + ctx: ctx, + resolvable: resolvable, + l1Cache: l1Cache, + } + + // Template expects @key field "id" which is NOT in the entity data. + // Path points to where entities live in the response. + entityTemplate := &EntityQueryCacheKeyTemplate{ + Keys: NewResolvableObjectVariable(&Object{ + Path: []string{"topProducts"}, + Fields: []*Field{ + {Name: []byte("__typename"), Value: &String{Path: []string{"__typename"}}}, + {Name: []byte("id"), Value: &String{Path: []string{"id"}}}, + }, + }), + } + + fetchItem := &FetchItem{ + Fetch: &SingleFetch{ + FetchConfiguration: FetchConfiguration{ + Caching: FetchCacheConfiguration{ + Enabled: true, + UseL1Cache: true, + RootFieldL1EntityCacheKeyTemplates: map[string]CacheKeyTemplate{ + "Product": entityTemplate, + }, + }, + }, + Info: &FetchInfo{ + RootFields: []GraphCoordinate{ + {TypeName: "Query", FieldName: "topProducts"}, + }, + }, + }, + } + + l.populateL1CacheForRootFieldEntities(fetchItem) + + // Entity should NOT be stored because key fields are missing. + // A degraded key like {"__typename":"Product","key":{}} would collide for all + // Product entities, so populateL1CacheForRootFieldEntities skips storage. 
+ degradedKey := `{"__typename":"Product","key":{}}` + _, loaded := l1Cache.Load(degradedKey) + assert.False(t, loaded, "entity with missing @key fields should not be stored in L1 cache") + + // A proper entity cache key won't find anything either + _, loaded = l1Cache.Load(`{"__typename":"Product","key":{"id":"123"}}`) + assert.False(t, loaded, "proper entity key should not find the entity with missing @key fields") +} + func mustParseJSON(a arena.Arena, jsonStr string) *astjson.Value { v, err := astjson.ParseBytesWithArena(a, []byte(jsonStr)) if err != nil { diff --git a/v2/pkg/engine/resolve/l1_l2_cache_e2e_test.go b/v2/pkg/engine/resolve/l1_l2_cache_e2e_test.go index e563481177..e610ea3573 100644 --- a/v2/pkg/engine/resolve/l1_l2_cache_e2e_test.go +++ b/v2/pkg/engine/resolve/l1_l2_cache_e2e_test.go @@ -376,7 +376,7 @@ func TestL1L2CacheEndToEnd(t *testing.T) { // _entities(Product) — L2 miss, product not yet cached {Operation: "get", Keys: productKey, Hits: []bool{false}}, // _entities(Product) — store fetched product data in L2 - {Operation: "set", Keys: productKey}, + {Operation: "set", Keys: productKey, TTL: time.Minute}, } assert.Equal(t, wantFirstLog, log, "First request: L2 miss then set") @@ -596,7 +596,7 @@ func TestL1L2CacheEndToEnd(t *testing.T) { // 1st _entities(Product) — L1 miss, L2 miss {Operation: "get", Keys: productKey, Hits: []bool{false}}, // 1st _entities(Product) — store fetched data in L2 (L1 also populated in-memory) - {Operation: "set", Keys: productKey}, + {Operation: "set", Keys: productKey, TTL: time.Minute}, // 2nd _entities(Product) — no L2 operations: L1 hit short-circuits } assert.Equal(t, wantLog, log, "L1 hit should prevent second L2 lookup") diff --git a/v2/pkg/engine/resolve/l2_cache_key_interceptor_test.go b/v2/pkg/engine/resolve/l2_cache_key_interceptor_test.go index 0b65246470..40d34c381d 100644 --- a/v2/pkg/engine/resolve/l2_cache_key_interceptor_test.go +++ b/v2/pkg/engine/resolve/l2_cache_key_interceptor_test.go @@ 
-501,6 +501,196 @@ func TestL2CacheKeyInterceptor(t *testing.T) { }, capturedInfos[0]) }) + t.Run("global prefix is prepended to L2 keys", func(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + cache := NewFakeLoaderCache() + + rootDS := NewMockDataSource(ctrl) + rootDS.EXPECT(). + Load(gomock.Any(), gomock.Any(), gomock.Any()). + DoAndReturn(func(ctx context.Context, headers any, input []byte) ([]byte, error) { + return []byte(`{"data":{"product":{"__typename":"Product","id":"prod-1"}}}`), nil + }).Times(1) + + entityDS := NewMockDataSource(ctrl) + entityDS.EXPECT(). + Load(gomock.Any(), gomock.Any(), gomock.Any()). + DoAndReturn(func(ctx context.Context, headers any, input []byte) ([]byte, error) { + return []byte(`{"data":{"_entities":[{"__typename":"Product","id":"prod-1","name":"Product One"}]}}`), nil + }).Times(1) + + response := &GraphQLResponse{ + Info: &GraphQLResponseInfo{OperationType: ast.OperationTypeQuery}, + Fetches: Sequence( + SingleWithPath(&SingleFetch{ + FetchConfiguration: FetchConfiguration{ + DataSource: rootDS, + PostProcessing: PostProcessingConfiguration{ + SelectResponseDataPath: []string{"data"}, + }, + }, + InputTemplate: InputTemplate{ + Segments: []TemplateSegment{ + {Data: []byte(`{"method":"POST","url":"http://root.service","body":{"query":"{product {__typename id}}"}}`), SegmentType: StaticSegmentType}, + }, + }, + DataSourceIdentifier: []byte("graphql_datasource.Source"), + }, "query"), + SingleWithPath(&SingleFetch{ + FetchConfiguration: FetchConfiguration{ + DataSource: entityDS, + PostProcessing: PostProcessingConfiguration{ + SelectResponseDataPath: []string{"data", "_entities", "0"}, + }, + Caching: FetchCacheConfiguration{ + Enabled: true, + CacheName: "default", + TTL: 30 * time.Second, + CacheKeyTemplate: newProductCacheKeyTemplate(), + UseL1Cache: true, + }, + }, + InputTemplate: InputTemplate{Segments: newEntityFetchSegments()}, + Info: &FetchInfo{ + DataSourceID: "products", + DataSourceName: 
"products", + OperationType: ast.OperationTypeQuery, + ProvidesData: newProductProvidesData(), + }, + DataSourceIdentifier: []byte("graphql_datasource.Source"), + }, "query.product", ObjectPath("product")), + ), + Data: newProductResponseData(), + } + + loader := &Loader{ + caches: map[string]LoaderCache{"default": cache}, + } + + ctx := NewContext(context.Background()) + ctx.ExecutionOptions.DisableSubgraphRequestDeduplication = true + ctx.ExecutionOptions.Caching.EnableL2Cache = true + ctx.ExecutionOptions.Caching.GlobalCacheKeyPrefix = "schema-v42" + + ar := arena.NewMonotonicArena(arena.WithMinBufferSize(1024)) + resolvable := NewResolvable(ar, ResolvableOptions{}) + err := resolvable.Init(ctx, nil, ast.OperationTypeQuery) + require.NoError(t, err) + + err = loader.LoadGraphQLResponseData(ctx, response, resolvable) + require.NoError(t, err) + + cacheLog := cache.GetLog() + var setKeys []string + for _, entry := range cacheLog { + if entry.Operation == "set" { + setKeys = append(setKeys, entry.Keys...) + } + } + require.Equal(t, 1, len(setKeys)) + assert.Equal(t, `schema-v42:{"__typename":"Product","key":{"id":"prod-1"}}`, setKeys[0], + "L2 key should have global prefix prepended") + }) + + t.Run("global prefix combined with interceptor", func(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + cache := NewFakeLoaderCache() + + rootDS := NewMockDataSource(ctrl) + rootDS.EXPECT(). + Load(gomock.Any(), gomock.Any(), gomock.Any()). + DoAndReturn(func(ctx context.Context, headers any, input []byte) ([]byte, error) { + return []byte(`{"data":{"product":{"__typename":"Product","id":"prod-1"}}}`), nil + }).Times(1) + + entityDS := NewMockDataSource(ctrl) + entityDS.EXPECT(). + Load(gomock.Any(), gomock.Any(), gomock.Any()). 
+ DoAndReturn(func(ctx context.Context, headers any, input []byte) ([]byte, error) { + return []byte(`{"data":{"_entities":[{"__typename":"Product","id":"prod-1","name":"Product One"}]}}`), nil + }).Times(1) + + response := &GraphQLResponse{ + Info: &GraphQLResponseInfo{OperationType: ast.OperationTypeQuery}, + Fetches: Sequence( + SingleWithPath(&SingleFetch{ + FetchConfiguration: FetchConfiguration{ + DataSource: rootDS, + PostProcessing: PostProcessingConfiguration{ + SelectResponseDataPath: []string{"data"}, + }, + }, + InputTemplate: InputTemplate{ + Segments: []TemplateSegment{ + {Data: []byte(`{"method":"POST","url":"http://root.service","body":{"query":"{product {__typename id}}"}}`), SegmentType: StaticSegmentType}, + }, + }, + DataSourceIdentifier: []byte("graphql_datasource.Source"), + }, "query"), + SingleWithPath(&SingleFetch{ + FetchConfiguration: FetchConfiguration{ + DataSource: entityDS, + PostProcessing: PostProcessingConfiguration{ + SelectResponseDataPath: []string{"data", "_entities", "0"}, + }, + Caching: FetchCacheConfiguration{ + Enabled: true, + CacheName: "default", + TTL: 30 * time.Second, + CacheKeyTemplate: newProductCacheKeyTemplate(), + UseL1Cache: true, + }, + }, + InputTemplate: InputTemplate{Segments: newEntityFetchSegments()}, + Info: &FetchInfo{ + DataSourceID: "products", + DataSourceName: "products", + OperationType: ast.OperationTypeQuery, + ProvidesData: newProductProvidesData(), + }, + DataSourceIdentifier: []byte("graphql_datasource.Source"), + }, "query.product", ObjectPath("product")), + ), + Data: newProductResponseData(), + } + + loader := &Loader{ + caches: map[string]LoaderCache{"default": cache}, + } + + ctx := NewContext(context.Background()) + ctx.ExecutionOptions.DisableSubgraphRequestDeduplication = true + ctx.ExecutionOptions.Caching.EnableL2Cache = true + ctx.ExecutionOptions.Caching.GlobalCacheKeyPrefix = "schema-v42" + ctx.ExecutionOptions.Caching.L2CacheKeyInterceptor = func(_ context.Context, key string, _ 
L2CacheKeyInterceptorInfo) string { + return "tenant-abc:" + key + } + + ar := arena.NewMonotonicArena(arena.WithMinBufferSize(1024)) + resolvable := NewResolvable(ar, ResolvableOptions{}) + err := resolvable.Init(ctx, nil, ast.OperationTypeQuery) + require.NoError(t, err) + + err = loader.LoadGraphQLResponseData(ctx, response, resolvable) + require.NoError(t, err) + + cacheLog := cache.GetLog() + var setKeys []string + for _, entry := range cacheLog { + if entry.Operation == "set" { + setKeys = append(setKeys, entry.Keys...) + } + } + require.Equal(t, 1, len(setKeys)) + // Order: interceptor wraps (global_prefix:entity_key) + assert.Equal(t, `tenant-abc:schema-v42:{"__typename":"Product","key":{"id":"prod-1"}}`, setKeys[0], + "L2 key should have global prefix then interceptor applied") + }) + t.Run("nil interceptor has no effect", func(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() diff --git a/v2/pkg/engine/resolve/loader.go b/v2/pkg/engine/resolve/loader.go index fbc592173d..94ee03626b 100644 --- a/v2/pkg/engine/resolve/loader.go +++ b/v2/pkg/engine/resolve/loader.go @@ -162,6 +162,9 @@ type result struct { // l2ErrorEvents accumulates error events in goroutines, merged on main thread. l2ErrorEvents []SubgraphErrorEvent + // l2CacheOpErrors accumulates cache operation errors in goroutines, merged on main thread. + l2CacheOpErrors []CacheOperationError + // analyticsEntityType caches the entity type name for analytics recording. // Set during prepareCacheKeys, used by L2 write recording. analyticsEntityType string @@ -175,6 +178,10 @@ type result struct { // After fresh data arrives, these are compared to detect staleness. // Key is the index into l1CacheKeys (entity fetches) or l2CacheKeys (root fetches). shadowCachedValues map[int]shadowCacheEntry + + // goroutineArena is the per-goroutine arena for L2 cache allocations during Phase 2. + // Acquired from l2ArenaPool before the goroutine starts, released in Loader.Free(). 
+ goroutineArena arena.Arena } // shadowCacheEntry holds a cached value saved during shadow mode L2 lookup. @@ -251,6 +258,9 @@ type Loader struct { // Not thread safe — only use from the main goroutine. // Don't Reset or Release; the Resolver handles this. // + // Phase 2 goroutines use per-goroutine arenas (see goroutineArenas) + // instead of jsonArena to avoid data races. + // // IMPORTANT: All astjson *Value nodes returned by ParseWithArena, // ParseBytesWithArena, StringValue, etc. live on this arena. // Never store heap-allocated *Value into an arena-owned container — @@ -258,6 +268,12 @@ type Loader struct { // a heap *Value could be collected while still referenced. jsonArena arena.Arena + // goroutineArenas collects per-goroutine arenas acquired during Phase 2 + // parallel execution. Released together with jsonArena in Free(), because + // MergeValues creates cross-arena references from the response tree into + // these arenas. + goroutineArenas []arena.Arena + // singleFlight is the SubgraphRequestSingleFlight object shared across all client requests. // It's thread safe and can be used to de-duplicate subgraph requests. singleFlight *SubgraphRequestSingleFlight @@ -284,6 +300,12 @@ func (l *Loader) Free() { l.l1Cache = nil l.jsonArena = nil l.enableMutationL2CachePopulation = false + for i, a := range l.goroutineArenas { + a.Reset() + l2ArenaPool.Put(a) + l.goroutineArenas[i] = nil + } + l.goroutineArenas = l.goroutineArenas[:0] } func (l *Loader) LoadGraphQLResponseData(ctx *Context, response *GraphQLResponse, resolvable *Resolvable) (err error) { @@ -384,6 +406,13 @@ func (l *Loader) resolveParallel(nodes []*FetchTreeNode) error { continue } + // Acquire a per-goroutine arena for L2 cache allocations. + // Released in Loader.Free(), not here, because MergeValues + // creates cross-arena references from the response tree. 
+ goroutineArena := l2ArenaPool.Get().(arena.Arena) + l.goroutineArenas = append(l.goroutineArenas, goroutineArena) + res.goroutineArena = goroutineArena + g.Go(func() error { return l.loadFetchL2Only(ctx, f, item, items, res) }) @@ -408,6 +437,9 @@ func (l *Loader) resolveParallel(nodes []*FetchTreeNode) error { if len(results[i].l2ErrorEvents) > 0 { l.ctx.cacheAnalytics.MergeL2Errors(results[i].l2ErrorEvents) } + if len(results[i].l2CacheOpErrors) > 0 { + l.ctx.cacheAnalytics.MergeL2CacheOpErrors(results[i].l2CacheOpErrors) + } } } @@ -521,6 +553,9 @@ func (l *Loader) mergeResultAnalytics(res *result) { if len(res.l2ErrorEvents) > 0 { l.ctx.cacheAnalytics.MergeL2Errors(res.l2ErrorEvents) } + if len(res.l2CacheOpErrors) > 0 { + l.ctx.cacheAnalytics.MergeL2CacheOpErrors(res.l2CacheOpErrors) + } } func (l *Loader) callOnFinished(res *result) { @@ -699,6 +734,14 @@ func (l *Loader) mergeResult(fetchItem *FetchItem, res *result, items []*astjson if res.cacheSkipFetch { // Merge cached data into items for _, key := range res.l1CacheKeys { + if key.FromCache == nil { + continue + } + // Negative cache hit: subgraph has nothing for this entity, skip merge. + // MergeValues(object, null) would discard the null anyway (astjson behavior). 
+ if key.FromCache.Type() == astjson.TypeNull { + continue + } // Merge cached data into item _, _, err := astjson.MergeValues(l.jsonArena, key.Item, key.FromCache) if err != nil { @@ -712,6 +755,10 @@ func (l *Loader) mergeResult(fetchItem *FetchItem, res *result, items []*astjson if res.partialCacheEnabled && len(res.cachedItemIndices) > 0 { for _, idx := range res.cachedItemIndices { if idx < len(res.l1CacheKeys) && res.l1CacheKeys[idx] != nil && res.l1CacheKeys[idx].FromCache != nil { + // Negative cache hit: skip merge (subgraph has nothing for this entity) + if res.l1CacheKeys[idx].FromCache.Type() == astjson.TypeNull { + continue + } _, _, err := astjson.MergeValues(l.jsonArena, res.l1CacheKeys[idx].Item, res.l1CacheKeys[idx].FromCache) if err != nil { return l.renderErrorsFailedToFetch(fetchItem, res, "invalid cache item") @@ -778,7 +825,10 @@ func (l *Loader) mergeResult(fetchItem *FetchItem, res *result, items []*astjson } // Check if data needs processing. - if res.postProcessing.SelectResponseDataPath != nil && astjson.ValueIsNull(responseData) { + // When negative caching is enabled, null responseData is valid (entity not found) + // and should flow through to the merge path where NegativeCacheHit gets set. 
+ negativeCachingNull := res.cacheConfig.NegativeCacheTTL > 0 && len(items) > 0 && responseData != nil && responseData.Type() == astjson.TypeNull + if res.postProcessing.SelectResponseDataPath != nil && astjson.ValueIsNull(responseData) && !negativeCachingNull { // When: // - No errors or data are present // - Status code is not within the 2XX range @@ -834,6 +884,10 @@ func (l *Loader) mergeResult(fetchItem *FetchItem, res *result, items []*astjson } if len(res.l2CacheKeys) > 0 && res.l2CacheKeys[0] != nil { res.l2CacheKeys[0].Item = items[0] + // Negative caching: detect when subgraph returned null for this entity + if responseData != nil && responseData.Type() == astjson.TypeNull && res.cacheConfig.NegativeCacheTTL > 0 { + res.l2CacheKeys[0].NegativeCacheHit = true + } } // Always run invalidation, even on partial-error responses. l.runCacheInvalidation(fetchItem, res, responseData, cacheInvalidation) @@ -925,6 +979,10 @@ func (l *Loader) mergeResult(fetchItem *FetchItem, res *result, items []*astjson } if i < len(res.l2CacheKeys) && res.l2CacheKeys[i] != nil { res.l2CacheKeys[i].Item = items[i] + // Negative caching: detect when subgraph returned null for this entity in the batch + if batch[i] != nil && batch[i].Type() == astjson.TypeNull && res.cacheConfig.NegativeCacheTTL > 0 { + res.l2CacheKeys[i].NegativeCacheHit = true + } } } @@ -1819,6 +1877,15 @@ func (p *_batchEntityToolPool) Put(item *batchEntityTools) { var ( batchEntityToolPool = _batchEntityToolPool{} + + // l2ArenaPool provides per-goroutine arenas for Phase 2 L2 cache allocations. + // Goroutine arenas are released in Loader.Free() (not inside the goroutine), + // because MergeValues creates cross-arena references into these arenas. 
+ l2ArenaPool = sync.Pool{ + New: func() any { + return arena.NewMonotonicArena(arena.WithMinBufferSize(4096)) + }, + } ) func (l *Loader) loadBatchEntityFetch(ctx context.Context, fetchItem *FetchItem, fetch *BatchEntityFetch, items []*astjson.Value, res *result) error { diff --git a/v2/pkg/engine/resolve/loader_cache.go b/v2/pkg/engine/resolve/loader_cache.go index 1bef04305a..707bdc820c 100644 --- a/v2/pkg/engine/resolve/loader_cache.go +++ b/v2/pkg/engine/resolve/loader_cache.go @@ -5,6 +5,7 @@ import ( "context" "slices" "strconv" + "strings" "time" "github.com/pkg/errors" @@ -94,7 +95,7 @@ func (l *Loader) cacheKeysToEntries(a arena.Arena, cacheKeys []*CacheKey) ([]*Ca seen := make(map[string]struct{}, len(cacheKeys)) for i := range cacheKeys { for j := range cacheKeys[i].Keys { - if cacheKeys[i].Item == nil { + if cacheKeys[i].Item == nil || cacheKeys[i].NegativeCacheHit { continue } keyStr := cacheKeys[i].Keys[j] @@ -122,6 +123,29 @@ func (l *Loader) cacheKeysToEntries(a arena.Arena, cacheKeys []*CacheKey) ([]*Ca return out, nil } +// cacheKeysToNegativeEntries collects L2 cache entries for null entity responses (negative caching). +// Only entries flagged with NegativeCacheHit are included. The stored value is the JSON literal "null". +func (l *Loader) cacheKeysToNegativeEntries(cacheKeys []*CacheKey) []*CacheEntry { + var out []*CacheEntry + seen := make(map[string]struct{}) + for i := range cacheKeys { + if !cacheKeys[i].NegativeCacheHit { + continue + } + for _, keyStr := range cacheKeys[i].Keys { + if _, ok := seen[keyStr]; ok { + continue + } + seen[keyStr] = struct{}{} + out = append(out, &CacheEntry{ + Key: keyStr, + Value: []byte("null"), + }) + } + } + return out +} + // prepareCacheKeys generates cache keys for L1 and/or L2 based on configuration. // Called on main thread before any cache lookups. // Sets res.l1CacheKeys for L1 lookup (no prefix) and res.l2CacheKeys for L2 lookup (with prefix). 
@@ -163,14 +187,21 @@ func (l *Loader) prepareCacheKeys(info *FetchInfo, cfg FetchCacheConfiguration, res.cache = l.caches[cfg.CacheName] } if res.cache != nil { - // Calculate prefix for L2 (subgraph header isolation) + // Calculate prefix for L2 (global prefix + subgraph header isolation) var prefix string + globalPrefix := l.ctx.ExecutionOptions.Caching.GlobalCacheKeyPrefix if cfg.IncludeSubgraphHeaderPrefix && l.ctx.SubgraphHeadersBuilder != nil { _, headersHash := l.ctx.SubgraphHeadersBuilder.HeadersForSubgraph(info.DataSourceName) var buf [20]byte b := strconv.AppendUint(buf[:0], headersHash, 10) - prefix = string(b) + if globalPrefix != "" { + prefix = globalPrefix + ":" + string(b) + } else { + prefix = string(b) + } res.headerHash = headersHash + } else if globalPrefix != "" { + prefix = globalPrefix } // Render L2 cache keys with prefix @@ -407,7 +438,7 @@ func (l *Loader) tryL2CacheLoad(ctx context.Context, info *FetchInfo, res *resul return false, nil } - cacheKeyStrings := l.extractCacheKeysStrings(l.jsonArena, res.l2CacheKeys) + cacheKeyStrings := l.extractCacheKeysStrings(res.goroutineArena, res.l2CacheKeys) if len(cacheKeyStrings) == 0 { res.cacheMustBeUpdated = true return false, nil @@ -446,12 +477,22 @@ func (l *Loader) tryL2CacheLoad(ctx context.Context, info *FetchInfo, res *resul } if err != nil { // L2 cache errors are non-fatal, continue to fetch + if analyticsEnabled { + res.l2CacheOpErrors = append(res.l2CacheOpErrors, CacheOperationError{ + Operation: "get", + CacheName: res.cacheConfig.CacheName, + EntityType: entityType, + DataSource: dataSource, + Message: truncateErrorMessage(err.Error(), 256), + ItemCount: len(cacheKeyStrings), + }) + } res.cacheMustBeUpdated = true return false, nil } // Populate FromCache fields in L2 CacheKeys (which have prefixed keys) - err = l.populateFromCache(l.jsonArena, res.l2CacheKeys, cacheEntries) + err = l.populateFromCache(res.goroutineArena, res.l2CacheKeys, cacheEntries) if err != nil { 
res.cacheMustBeUpdated = true return false, nil @@ -464,8 +505,8 @@ func (l *Loader) tryL2CacheLoad(ctx context.Context, info *FetchInfo, res *resul if len(ck.EntityMergePath) > 0 && ck.FromCache != nil { wrapped := ck.FromCache for i := len(ck.EntityMergePath) - 1; i >= 0; i-- { - obj := astjson.ObjectValue(l.jsonArena) - obj.Set(l.jsonArena, ck.EntityMergePath[i], wrapped) + obj := astjson.ObjectValue(res.goroutineArena) + obj.Set(res.goroutineArena, ck.EntityMergePath[i], wrapped) wrapped = obj } ck.FromCache = wrapped @@ -496,10 +537,24 @@ func (l *Loader) tryL2CacheLoad(ctx context.Context, info *FetchInfo, res *resul res.l1CacheKeys[i].FromCache = res.l2CacheKeys[i].FromCache // Track per-entity L2 hit/miss (atomic operations - thread-safe) if res.l1CacheKeys[i].FromCache != nil { - if info != nil && info.ProvidesData != nil && l.validateItemHasRequiredData(res.l1CacheKeys[i].FromCache, info.ProvidesData) { + // Negative cache hit: L2 stored a null sentinel for this entity. + // The subgraph previously returned null (without errors), meaning it has + // nothing for this entity. Treat as a cache hit to avoid re-fetching. 
+ if res.l1CacheKeys[i].FromCache.Type() == astjson.TypeNull && res.cacheConfig.NegativeCacheTTL > 0 { + if analyticsEnabled && len(res.l1CacheKeys[i].Keys) > 0 { + res.l2AnalyticsEvents = append(res.l2AnalyticsEvents, CacheKeyEvent{ + CacheKey: res.l1CacheKeys[i].Keys[0], EntityType: entityType, + Kind: CacheKeyHit, DataSource: dataSource, ByteSize: 4, // "null" + Shadow: shadowMode, + }) + } + if res.partialCacheEnabled { + res.cachedItemIndices = append(res.cachedItemIndices, i) + } + } else if info != nil && info.ProvidesData != nil && l.validateItemHasRequiredData(res.l1CacheKeys[i].FromCache, info.ProvidesData) { // Denormalize from original field names to current query aliases for merging if hasAliases { - res.l1CacheKeys[i].FromCache = l.denormalizeFromCache(res.l1CacheKeys[i].FromCache, info.ProvidesData) + res.l1CacheKeys[i].FromCache = l.denormalizeFromCache(res.goroutineArena, res.l1CacheKeys[i].FromCache, info.ProvidesData) } if analyticsEnabled && len(res.l1CacheKeys[i].Keys) > 0 { byteSize := len(res.l1CacheKeys[i].FromCache.MarshalTo(nil)) @@ -572,7 +627,7 @@ func (l *Loader) tryL2CacheLoad(ctx context.Context, info *FetchInfo, res *resul if info != nil && info.ProvidesData != nil && l.validateItemHasRequiredData(ck.FromCache, info.ProvidesData) { // Denormalize from original field names to current query aliases for merging if hasAliases { - res.l2CacheKeys[i].FromCache = l.denormalizeFromCache(ck.FromCache, info.ProvidesData) + res.l2CacheKeys[i].FromCache = l.denormalizeFromCache(res.goroutineArena, ck.FromCache, info.ProvidesData) } if analyticsEnabled && len(ck.Keys) > 0 { byteSize := len(res.l2CacheKeys[i].FromCache.MarshalTo(nil)) @@ -784,13 +839,18 @@ func (l *Loader) populateL1CacheForRootFieldEntities(fetchItem *FetchItem) { continue } - // Store in L1 cache + // Store in L1 cache, skipping degraded keys with empty key objects for _, ck := range cacheKeys { if ck == nil { continue } for _, keyStr := range ck.Keys { - // Use the entity 
directly as the cache value + // Skip keys with empty key objects — these occur when @key fields are missing + // from the query selection. Such keys would collide for all entities of the + // same type, causing incorrect cache sharing. + if strings.Contains(keyStr, `"key":{}`) { + continue + } l.l1Cache.LoadOrStore(keyStr, entity) } } @@ -879,22 +939,61 @@ func (l *Loader) updateL2Cache(res *result) { return } - if len(cacheEntries) == 0 { - return - } - // Enrich context with fetch identity when debug mode is enabled ctx := l.ctx.ctx if l.ctx.Debug { ctx = WithCacheFetchInfo(ctx, res.fetchInfo, res.cacheConfig) } - // Cache set errors are non-fatal - silently ignore - _ = res.cache.Set(ctx, cacheEntries, res.cacheConfig.TTL) + // Track successfully written entries for analytics + var writtenEntries []*CacheEntry + + // Store regular (non-null) cache entries + if len(cacheEntries) > 0 { + if setErr := res.cache.Set(ctx, cacheEntries, res.cacheConfig.TTL); setErr != nil { + if l.ctx.cacheAnalyticsEnabled() { + l.ctx.cacheAnalytics.RecordCacheOperationError(CacheOperationError{ + Operation: "set", + CacheName: res.cacheConfig.CacheName, + EntityType: res.analyticsEntityType, + DataSource: res.ds.Name, + Message: truncateErrorMessage(setErr.Error(), 256), + ItemCount: len(cacheEntries), + }) + } + } else { + writtenEntries = append(writtenEntries, cacheEntries...) 
+ } + } + + // Negative caching: store null sentinels with separate TTL for entities the subgraph returned null for + if res.cacheConfig.NegativeCacheTTL > 0 { + negEntries := l.cacheKeysToNegativeEntries(keysToStore) + if len(negEntries) > 0 { + if setErr := res.cache.Set(ctx, negEntries, res.cacheConfig.NegativeCacheTTL); setErr != nil { + if l.ctx.cacheAnalyticsEnabled() { + l.ctx.cacheAnalytics.RecordCacheOperationError(CacheOperationError{ + Operation: "set_negative", + CacheName: res.cacheConfig.CacheName, + EntityType: res.analyticsEntityType, + DataSource: res.ds.Name, + Message: truncateErrorMessage(setErr.Error(), 256), + ItemCount: len(negEntries), + }) + } + } else { + writtenEntries = append(writtenEntries, negEntries...) + } + } + } + + if len(writtenEntries) == 0 { + return + } // Record L2 write events for analytics if l.ctx.cacheAnalyticsEnabled() { - for _, entry := range cacheEntries { + for _, entry := range writtenEntries { if entry == nil { continue } @@ -1039,6 +1138,7 @@ func (l *Loader) compareShadowValues(res *result, info *FetchInfo) { // detectMutationEntityImpact checks if a mutation response contains a cached entity // and either invalidates (deletes) the L2 cache entry or compares it for staleness analytics. // Called from mergeResult on the main thread after the mutation fetch completes. +// Handles both single-entity (object) and list (array) mutation responses. 
func (l *Loader) detectMutationEntityImpact(res *result, info *FetchInfo, responseData *astjson.Value) map[string]struct{} { if info == nil || info.OperationType != ast.OperationTypeMutation { return nil @@ -1068,8 +1168,9 @@ func (l *Loader) detectMutationEntityImpact(res *result, info *FetchInfo, respon // Extract entity data from mutation response // For root mutation: responseData = {"updateUsername": {"id":"1234","username":"UpdatedMe"}} + // or for list mutations: responseData = {"deleteUsers": [{"id":"1"},{"id":"2"}]} entityData := responseData.Get(mutationFieldName) - if entityData == nil || entityData.Type() != astjson.TypeObject { + if entityData == nil { return nil } @@ -1081,6 +1182,40 @@ func (l *Loader) detectMutationEntityImpact(res *result, info *FetchInfo, respon return nil } + switch entityData.Type() { + case astjson.TypeObject: + return l.detectSingleMutationEntityImpact(cache, cfg, info, entityData, entityProvidesData, mutationFieldName) + case astjson.TypeArray: + items, _ := entityData.Array() + var deletedKeys map[string]struct{} + for _, item := range items { + if item == nil || item.Type() != astjson.TypeObject { + continue + } + itemDeleted := l.detectSingleMutationEntityImpact(cache, cfg, info, item, entityProvidesData, mutationFieldName) + for k, v := range itemDeleted { + if deletedKeys == nil { + deletedKeys = make(map[string]struct{}) + } + deletedKeys[k] = v + } + } + return deletedKeys + default: + return nil + } +} + +// detectSingleMutationEntityImpact handles invalidation and analytics for a single entity +// returned by a mutation. Called by detectMutationEntityImpact for each entity. 
+func (l *Loader) detectSingleMutationEntityImpact( + cache LoaderCache, + cfg *MutationEntityImpactConfig, + info *FetchInfo, + entityData *astjson.Value, + entityProvidesData *Object, + mutationFieldName string, +) map[string]struct{} { // Build L2 cache key for lookup cacheKey := l.buildMutationEntityCacheKey(cfg, entityData, info) if cacheKey == "" { @@ -1096,8 +1231,19 @@ func (l *Loader) detectMutationEntityImpact(res *result, info *FetchInfo, respon // Invalidate L2 cache entry if configured var deletedKeys map[string]struct{} if cfg.InvalidateCache { - _ = cache.Delete(l.ctx.ctx, []string{cacheKey}) - deletedKeys = map[string]struct{}{cacheKey: {}} + if delErr := cache.Delete(l.ctx.ctx, []string{cacheKey}); delErr != nil { + if l.ctx.cacheAnalyticsEnabled() { + l.ctx.cacheAnalytics.RecordCacheOperationError(CacheOperationError{ + Operation: "delete", + CacheName: cfg.CacheName, + EntityType: cfg.EntityTypeName, + Message: truncateErrorMessage(delErr.Error(), 256), + ItemCount: 1, + }) + } + } else { + deletedKeys = map[string]struct{}{cacheKey: {}} + } } // Analytics comparison requires cacheAnalytics to be enabled @@ -1168,12 +1314,19 @@ func (l *Loader) buildMutationEntityCacheKey(cfg *MutationEntityImpactConfig, en keyObj.Set(l.jsonArena, "key", keysObj) keyJSON := string(keyObj.MarshalTo(nil)) - // Add prefix if needed + // Apply global prefix and subgraph header prefix to mirror prepareCacheKeys(). 
var cacheKey string + globalPrefix := l.ctx.ExecutionOptions.Caching.GlobalCacheKeyPrefix if cfg.IncludeSubgraphHeaderPrefix && l.ctx.SubgraphHeadersBuilder != nil { _, headersHash := l.ctx.SubgraphHeadersBuilder.HeadersForSubgraph(info.DataSourceName) prefix := strconv.FormatUint(headersHash, 10) - cacheKey = prefix + ":" + keyJSON + if globalPrefix != "" { + cacheKey = globalPrefix + ":" + prefix + ":" + keyJSON + } else { + cacheKey = prefix + ":" + keyJSON + } + } else if globalPrefix != "" { + cacheKey = globalPrefix + ":" + keyJSON } else { cacheKey = keyJSON } @@ -1312,15 +1465,20 @@ func (l *Loader) processExtensionsCacheInvalidation(res *result, cacheInvalidati baseKey := string(keyObj.MarshalTo(nil)) cacheKey := baseKey - // Apply subgraph header prefix if configured for this entity type. - // This mirrors prepareCacheKeys() which prefixes L2 keys with a hash of the - // HTTP headers sent to the subgraph, enabling per-tenant cache isolation. - // Result: "55555:{"__typename":"User","key":{"id":"1"}}" + // Apply global prefix and subgraph header prefix to mirror prepareCacheKeys(). + // Order: global prefix → header hash prefix → interceptor. + globalPrefix := l.ctx.ExecutionOptions.Caching.GlobalCacheKeyPrefix if entityConfig.IncludeSubgraphHeaderPrefix && l.ctx.SubgraphHeadersBuilder != nil { _, headersHash := l.ctx.SubgraphHeadersBuilder.HeadersForSubgraph(subgraphName) var buf [20]byte b := strconv.AppendUint(buf[:0], headersHash, 10) - cacheKey = string(b) + ":" + cacheKey + if globalPrefix != "" { + cacheKey = globalPrefix + ":" + string(b) + ":" + cacheKey + } else { + cacheKey = string(b) + ":" + cacheKey + } + } else if globalPrefix != "" { + cacheKey = globalPrefix + ":" + cacheKey } // Apply user-provided L2 cache key interceptor if set. @@ -1353,8 +1511,15 @@ func (l *Loader) processExtensionsCacheInvalidation(res *result, cacheInvalidati } // Execute batched L2 cache deletes — one Delete call per cache instance. 
- for _, batch := range batches { - _ = batch.cache.Delete(l.ctx.ctx, batch.keys) + for cacheName, batch := range batches { + if delErr := batch.cache.Delete(l.ctx.ctx, batch.keys); delErr != nil && l.ctx.cacheAnalyticsEnabled() { + l.ctx.cacheAnalytics.RecordCacheOperationError(CacheOperationError{ + Operation: "delete", + CacheName: cacheName, + Message: truncateErrorMessage(delErr.Error(), 256), + ItemCount: len(batch.keys), + }) + } } } @@ -1572,14 +1737,14 @@ func (l *Loader) normalizeNode(val *astjson.Value, node Node) *astjson.Value { // denormalizeFromCache reverses normalizeForCache: maps suffixed schema field names back // to query aliases. Returns input unchanged if obj.HasAliases is false (fast path). -func (l *Loader) denormalizeFromCache(item *astjson.Value, obj *Object) *astjson.Value { +func (l *Loader) denormalizeFromCache(a arena.Arena, item *astjson.Value, obj *Object) *astjson.Value { if item == nil || obj == nil || !obj.HasAliases { return item } if item.Type() != astjson.TypeObject { return item } - result := astjson.ObjectValue(l.jsonArena) + result := astjson.ObjectValue(a) for _, field := range obj.Fields { lookupName := l.cacheFieldName(field) outputName := unsafebytes.BytesToString(field.Name) @@ -1587,8 +1752,8 @@ func (l *Loader) denormalizeFromCache(item *astjson.Value, obj *Object) *astjson if fieldValue == nil { continue } - denormalizedValue := l.denormalizeNode(fieldValue, field.Value) - result.Set(l.jsonArena, outputName, denormalizedValue) + denormalizedValue := l.denormalizeNode(a, fieldValue, field.Value) + result.Set(a, outputName, denormalizedValue) } // Preserve __typename if present if typenameValue := item.Get("__typename"); typenameValue != nil { @@ -1600,25 +1765,25 @@ func (l *Loader) denormalizeFromCache(item *astjson.Value, obj *Object) *astjson } } if !hasTypenameField { - result.Set(l.jsonArena, "__typename", typenameValue) + result.Set(a, "__typename", typenameValue) } } return result } // denormalizeNode 
recursively denormalizes nested objects/arrays. -func (l *Loader) denormalizeNode(val *astjson.Value, node Node) *astjson.Value { +func (l *Loader) denormalizeNode(a arena.Arena, val *astjson.Value, node Node) *astjson.Value { if val == nil || node == nil { return val } switch n := node.(type) { case *Object: - return l.denormalizeFromCache(val, n) + return l.denormalizeFromCache(a, val, n) case *Array: if n.Item != nil && val.Type() == astjson.TypeArray { - arr := astjson.ArrayValue(l.jsonArena) + arr := astjson.ArrayValue(a) for i, item := range val.GetArray() { - arr.SetArrayItem(l.jsonArena, i, l.denormalizeNode(item, n.Item)) + arr.SetArrayItem(a, i, l.denormalizeNode(a, item, n.Item)) } return arr } diff --git a/v2/pkg/engine/resolve/mutation_cache_impact_test.go b/v2/pkg/engine/resolve/mutation_cache_impact_test.go new file mode 100644 index 0000000000..85d34f2194 --- /dev/null +++ b/v2/pkg/engine/resolve/mutation_cache_impact_test.go @@ -0,0 +1,725 @@ +package resolve + +import ( + "context" + "sync" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/wundergraph/astjson" + "github.com/wundergraph/go-arena" + + "github.com/wundergraph/graphql-go-tools/v2/pkg/ast" +) + +// --------------------------------------------------------------------------- +// navigateProvidesDataToField +// --------------------------------------------------------------------------- + +func TestNavigateProvidesDataToField(t *testing.T) { + t.Run("valid field name returns inner Object", func(t *testing.T) { + inner := &Object{ + Fields: []*Field{ + {Name: []byte("id"), Value: &Scalar{Path: []string{"id"}}}, + {Name: []byte("username"), Value: &Scalar{Path: []string{"username"}}}, + }, + } + provides := &Object{ + Fields: []*Field{ + {Name: []byte("updateUsername"), Value: inner}, + }, + } + + got := navigateProvidesDataToField(provides, "updateUsername") + assert.Equal(t, inner, got) + }) + + t.Run("missing field name returns 
nil", func(t *testing.T) { + provides := &Object{ + Fields: []*Field{ + {Name: []byte("updateUsername"), Value: &Object{}}, + }, + } + + got := navigateProvidesDataToField(provides, "deleteUser") + assert.Nil(t, got) + }) + + t.Run("nil providesData returns nil", func(t *testing.T) { + got := navigateProvidesDataToField(nil, "anything") + assert.Nil(t, got) + }) + + t.Run("field value is not Object returns nil", func(t *testing.T) { + provides := &Object{ + Fields: []*Field{ + {Name: []byte("scalarField"), Value: &Scalar{Path: []string{"scalarField"}}}, + }, + } + + got := navigateProvidesDataToField(provides, "scalarField") + assert.Nil(t, got) + }) +} + +// --------------------------------------------------------------------------- +// buildEntityKeyValue +// --------------------------------------------------------------------------- + +func TestBuildEntityKeyValue(t *testing.T) { + t.Run("simple key", func(t *testing.T) { + ar := arena.NewMonotonicArena(arena.WithMinBufferSize(1024)) + data, err := astjson.ParseWithArena(ar, `{"id":"123","name":"Alice"}`) + require.NoError(t, err) + + keyFields := []KeyField{{Name: "id"}} + result := buildEntityKeyValue(ar, data, keyFields) + got := string(result.MarshalTo(nil)) + + assert.Equal(t, `{"id":"123"}`, got) + }) + + t.Run("composite key", func(t *testing.T) { + ar := arena.NewMonotonicArena(arena.WithMinBufferSize(1024)) + data, err := astjson.ParseWithArena(ar, `{"id":"1","orgId":"acme","name":"Bob"}`) + require.NoError(t, err) + + keyFields := []KeyField{{Name: "id"}, {Name: "orgId"}} + result := buildEntityKeyValue(ar, data, keyFields) + got := string(result.MarshalTo(nil)) + + assert.Equal(t, `{"id":"1","orgId":"acme"}`, got) + }) + + t.Run("nested key", func(t *testing.T) { + ar := arena.NewMonotonicArena(arena.WithMinBufferSize(1024)) + data, err := astjson.ParseWithArena(ar, `{"key":{"subId":"x"},"name":"Carol"}`) + require.NoError(t, err) + + keyFields := []KeyField{ + {Name: "key", Children: 
[]KeyField{{Name: "subId"}}}, + } + result := buildEntityKeyValue(ar, data, keyFields) + got := string(result.MarshalTo(nil)) + + assert.Equal(t, `{"key":{"subId":"x"}}`, got) + }) + + t.Run("missing field in data omits field from output", func(t *testing.T) { + ar := arena.NewMonotonicArena(arena.WithMinBufferSize(1024)) + data, err := astjson.ParseWithArena(ar, `{"name":"Dave"}`) + require.NoError(t, err) + + keyFields := []KeyField{{Name: "id"}} + result := buildEntityKeyValue(ar, data, keyFields) + got := string(result.MarshalTo(nil)) + + // "id" is missing in data, so it is omitted from the result + assert.Equal(t, `{}`, got) + }) +} + +// --------------------------------------------------------------------------- +// buildMutationEntityCacheKey +// --------------------------------------------------------------------------- + +func TestBuildMutationEntityCacheKey(t *testing.T) { + t.Run("basic key without prefix", func(t *testing.T) { + ar := arena.NewMonotonicArena(arena.WithMinBufferSize(1024)) + ctx := NewContext(context.Background()) + + l := &Loader{ + jsonArena: ar, + ctx: ctx, + } + + entityData, err := astjson.ParseWithArena(ar, `{"id":"1234","username":"Alice"}`) + require.NoError(t, err) + + cfg := &MutationEntityImpactConfig{ + EntityTypeName: "User", + KeyFields: []KeyField{{Name: "id"}}, + CacheName: "default", + } + info := &FetchInfo{ + DataSourceName: "accounts", + } + + got := l.buildMutationEntityCacheKey(cfg, entityData, info) + assert.Equal(t, `{"__typename":"User","key":{"id":"1234"}}`, got) + }) + + t.Run("with header prefix", func(t *testing.T) { + ar := arena.NewMonotonicArena(arena.WithMinBufferSize(1024)) + ctx := NewContext(context.Background()) + ctx.SubgraphHeadersBuilder = &mockSubgraphHeadersBuilder{ + hashes: map[string]uint64{"accounts": 99887766}, + } + + l := &Loader{ + jsonArena: ar, + ctx: ctx, + } + + entityData, err := astjson.ParseWithArena(ar, `{"id":"1234","username":"Alice"}`) + require.NoError(t, err) + + cfg := 
&MutationEntityImpactConfig{ + EntityTypeName: "User", + KeyFields: []KeyField{{Name: "id"}}, + CacheName: "default", + IncludeSubgraphHeaderPrefix: true, + } + info := &FetchInfo{ + DataSourceName: "accounts", + } + + got := l.buildMutationEntityCacheKey(cfg, entityData, info) + assert.Equal(t, `99887766:{"__typename":"User","key":{"id":"1234"}}`, got) + }) + + t.Run("with interceptor", func(t *testing.T) { + ar := arena.NewMonotonicArena(arena.WithMinBufferSize(1024)) + ctx := NewContext(context.Background()) + ctx.ExecutionOptions.Caching.L2CacheKeyInterceptor = func(_ context.Context, key string, info L2CacheKeyInterceptorInfo) string { + return "tenant-42:" + key + } + + l := &Loader{ + jsonArena: ar, + ctx: ctx, + } + + entityData, err := astjson.ParseWithArena(ar, `{"id":"1234"}`) + require.NoError(t, err) + + cfg := &MutationEntityImpactConfig{ + EntityTypeName: "User", + KeyFields: []KeyField{{Name: "id"}}, + CacheName: "default", + } + info := &FetchInfo{ + DataSourceName: "accounts", + } + + got := l.buildMutationEntityCacheKey(cfg, entityData, info) + assert.Equal(t, `tenant-42:{"__typename":"User","key":{"id":"1234"}}`, got) + }) +} + +// --------------------------------------------------------------------------- +// buildMutationEntityDisplayKey +// --------------------------------------------------------------------------- + +func TestBuildMutationEntityDisplayKey(t *testing.T) { + t.Run("display key always without prefix", func(t *testing.T) { + ar := arena.NewMonotonicArena(arena.WithMinBufferSize(1024)) + ctx := NewContext(context.Background()) + // Even with a SubgraphHeadersBuilder, display key has no prefix + ctx.SubgraphHeadersBuilder = &mockSubgraphHeadersBuilder{ + hashes: map[string]uint64{"accounts": 99887766}, + } + + l := &Loader{ + jsonArena: ar, + ctx: ctx, + } + + entityData, err := astjson.ParseWithArena(ar, `{"id":"1234","username":"Alice"}`) + require.NoError(t, err) + + cfg := &MutationEntityImpactConfig{ + EntityTypeName: "User", 
+ KeyFields: []KeyField{{Name: "id"}}, + CacheName: "default", + IncludeSubgraphHeaderPrefix: true, + } + + got := l.buildMutationEntityDisplayKey(cfg, entityData) + assert.Equal(t, `{"__typename":"User","key":{"id":"1234"}}`, got) + }) +} + +// --------------------------------------------------------------------------- +// detectMutationEntityImpact +// --------------------------------------------------------------------------- + +func TestDetectMutationEntityImpact(t *testing.T) { + // Helper: builds a Loader with minimal fields for detectMutationEntityImpact. + makeLoader := func(ctx *Context, cache LoaderCache, cacheName string) *Loader { + ar := arena.NewMonotonicArena(arena.WithMinBufferSize(1024)) + return &Loader{ + jsonArena: ar, + ctx: ctx, + caches: map[string]LoaderCache{cacheName: cache}, + l1Cache: &sync.Map{}, + } + } + + // Helper: builds a result with MutationEntityImpactConfig. + makeResult := func(cfg *MutationEntityImpactConfig) *result { + return &result{ + cacheConfig: FetchCacheConfiguration{ + MutationEntityImpactConfig: cfg, + }, + } + } + + // Helper: builds FetchInfo for a mutation. + makeMutationInfo := func(rootFieldName string, providesData *Object) *FetchInfo { + return &FetchInfo{ + OperationType: ast.OperationTypeMutation, + DataSourceName: "accounts", + RootFields: []GraphCoordinate{ + {TypeName: "Mutation", FieldName: rootFieldName}, + }, + ProvidesData: providesData, + } + } + + // Common ProvidesData: mutation returns an object with id and username. 
+ entityProvidesData := &Object{ + Fields: []*Field{ + {Name: []byte("id"), Value: &Scalar{Path: []string{"id"}}}, + {Name: []byte("username"), Value: &Scalar{Path: []string{"username"}}}, + }, + } + mutationProvidesData := &Object{ + Fields: []*Field{ + {Name: []byte("updateUsername"), Value: entityProvidesData}, + }, + } + + t.Run("non-mutation operation returns nil", func(t *testing.T) { + ctx := NewContext(context.Background()) + l := makeLoader(ctx, NewFakeLoaderCache(), "default") + + info := &FetchInfo{ + OperationType: ast.OperationTypeQuery, // not a mutation + } + res := makeResult(&MutationEntityImpactConfig{ + EntityTypeName: "User", + KeyFields: []KeyField{{Name: "id"}}, + CacheName: "default", + InvalidateCache: true, + }) + responseData := astjson.MustParse(`{"updateUsername":{"id":"1234","username":"NewMe"}}`) + + got := l.detectMutationEntityImpact(res, info, responseData) + assert.Nil(t, got) + }) + + t.Run("nil info returns nil", func(t *testing.T) { + ctx := NewContext(context.Background()) + l := makeLoader(ctx, NewFakeLoaderCache(), "default") + + res := makeResult(&MutationEntityImpactConfig{ + EntityTypeName: "User", + KeyFields: []KeyField{{Name: "id"}}, + CacheName: "default", + InvalidateCache: true, + }) + responseData := astjson.MustParse(`{"updateUsername":{"id":"1234","username":"NewMe"}}`) + + got := l.detectMutationEntityImpact(res, nil, responseData) + assert.Nil(t, got) + }) + + t.Run("no MutationEntityImpactConfig returns nil", func(t *testing.T) { + ctx := NewContext(context.Background()) + l := makeLoader(ctx, NewFakeLoaderCache(), "default") + + info := makeMutationInfo("updateUsername", mutationProvidesData) + res := makeResult(nil) // no config + responseData := astjson.MustParse(`{"updateUsername":{"id":"1234","username":"NewMe"}}`) + + got := l.detectMutationEntityImpact(res, info, responseData) + assert.Nil(t, got) + }) + + t.Run("InvalidateCache true deletes cache entry and returns deletedKeys", func(t *testing.T) { + 
cache := NewFakeLoaderCache() + // Pre-populate cache with the entity + cacheKey := `{"__typename":"User","key":{"id":"1234"}}` + _ = cache.Set(context.Background(), []*CacheEntry{ + {Key: cacheKey, Value: []byte(`{"id":"1234","username":"OldMe"}`)}, + }, 0) + + ctx := NewContext(context.Background()) + ctx.ExecutionOptions.Caching.EnableCacheAnalytics = true + ctx.initCacheAnalytics() + + l := makeLoader(ctx, cache, "default") + + cfg := &MutationEntityImpactConfig{ + EntityTypeName: "User", + KeyFields: []KeyField{{Name: "id"}}, + CacheName: "default", + InvalidateCache: true, + } + info := makeMutationInfo("updateUsername", mutationProvidesData) + res := makeResult(cfg) + + responseData, err := astjson.ParseWithArena(l.jsonArena, `{"updateUsername":{"id":"1234","username":"NewMe"}}`) + require.NoError(t, err) + + deletedKeys := l.detectMutationEntityImpact(res, info, responseData) + + // Should return the deleted key + assert.Equal(t, map[string]struct{}{cacheKey: {}}, deletedKeys) + + // Verify cache entry was actually deleted + entries, _ := cache.Get(context.Background(), []string{cacheKey}) + assert.Nil(t, entries[0], "cache entry should be deleted") + }) + + t.Run("analytics enabled, no cached value records MutationEvent with HadCachedValue=false", func(t *testing.T) { + cache := NewFakeLoaderCache() // empty cache + + ctx := NewContext(context.Background()) + ctx.ExecutionOptions.Caching.EnableCacheAnalytics = true + ctx.initCacheAnalytics() + + l := makeLoader(ctx, cache, "default") + + cfg := &MutationEntityImpactConfig{ + EntityTypeName: "User", + KeyFields: []KeyField{{Name: "id"}}, + CacheName: "default", + InvalidateCache: true, + } + info := makeMutationInfo("updateUsername", mutationProvidesData) + res := makeResult(cfg) + + responseData, err := astjson.ParseWithArena(l.jsonArena, `{"updateUsername":{"id":"1234","username":"NewMe"}}`) + require.NoError(t, err) + + _ = l.detectMutationEntityImpact(res, info, responseData) + + stats := 
ctx.GetCacheStats() + require.Len(t, stats.MutationEvents, 1) + + event := stats.MutationEvents[0] + assert.Equal(t, "updateUsername", event.MutationRootField) + assert.Equal(t, "User", event.EntityType) + assert.Equal(t, `{"__typename":"User","key":{"id":"1234"}}`, event.EntityCacheKey) // display key (no prefix) + assert.Equal(t, false, event.HadCachedValue) // no cached value in empty cache + assert.Equal(t, false, event.IsStale) + assert.Equal(t, uint64(0), event.CachedHash) // zero because no cached value + assert.NotEqual(t, uint64(0), event.FreshHash) + assert.Equal(t, 0, event.CachedBytes) + assert.NotEqual(t, 0, event.FreshBytes) + }) + + t.Run("analytics enabled, stale cached value records MutationEvent with IsStale=true", func(t *testing.T) { + cache := NewFakeLoaderCache() + cacheKey := `{"__typename":"User","key":{"id":"1234"}}` + // Cached value has username="OldMe" (differs from mutation response) + _ = cache.Set(context.Background(), []*CacheEntry{ + {Key: cacheKey, Value: []byte(`{"id":"1234","username":"OldMe"}`)}, + }, 0) + + ctx := NewContext(context.Background()) + ctx.ExecutionOptions.Caching.EnableCacheAnalytics = true + ctx.initCacheAnalytics() + + l := makeLoader(ctx, cache, "default") + + cfg := &MutationEntityImpactConfig{ + EntityTypeName: "User", + KeyFields: []KeyField{{Name: "id"}}, + CacheName: "default", + InvalidateCache: true, + } + info := makeMutationInfo("updateUsername", mutationProvidesData) + res := makeResult(cfg) + + responseData, err := astjson.ParseWithArena(l.jsonArena, `{"updateUsername":{"id":"1234","username":"NewMe"}}`) + require.NoError(t, err) + + _ = l.detectMutationEntityImpact(res, info, responseData) + + stats := ctx.GetCacheStats() + require.Len(t, stats.MutationEvents, 1) + + event := stats.MutationEvents[0] + assert.Equal(t, "updateUsername", event.MutationRootField) + assert.Equal(t, "User", event.EntityType) + assert.Equal(t, true, event.HadCachedValue) // cache was populated + assert.Equal(t, true, 
event.IsStale) // username changed: OldMe -> NewMe + assert.NotEqual(t, uint64(0), event.CachedHash) + assert.NotEqual(t, uint64(0), event.FreshHash) + assert.NotEqual(t, event.CachedHash, event.FreshHash) // hashes differ because content differs + assert.NotEqual(t, 0, event.CachedBytes) + assert.NotEqual(t, 0, event.FreshBytes) + }) + + t.Run("analytics enabled, fresh cached value records MutationEvent with IsStale=false", func(t *testing.T) { + cache := NewFakeLoaderCache() + cacheKey := `{"__typename":"User","key":{"id":"1234"}}` + // Cached value matches the mutation response exactly + _ = cache.Set(context.Background(), []*CacheEntry{ + {Key: cacheKey, Value: []byte(`{"id":"1234","username":"NewMe"}`)}, + }, 0) + + ctx := NewContext(context.Background()) + ctx.ExecutionOptions.Caching.EnableCacheAnalytics = true + ctx.initCacheAnalytics() + + l := makeLoader(ctx, cache, "default") + + cfg := &MutationEntityImpactConfig{ + EntityTypeName: "User", + KeyFields: []KeyField{{Name: "id"}}, + CacheName: "default", + InvalidateCache: true, + } + info := makeMutationInfo("updateUsername", mutationProvidesData) + res := makeResult(cfg) + + responseData, err := astjson.ParseWithArena(l.jsonArena, `{"updateUsername":{"id":"1234","username":"NewMe"}}`) + require.NoError(t, err) + + _ = l.detectMutationEntityImpact(res, info, responseData) + + stats := ctx.GetCacheStats() + require.Len(t, stats.MutationEvents, 1) + + event := stats.MutationEvents[0] + assert.Equal(t, "updateUsername", event.MutationRootField) + assert.Equal(t, "User", event.EntityType) + assert.Equal(t, true, event.HadCachedValue) // cache was populated + assert.Equal(t, false, event.IsStale) // cached value matches mutation response + assert.Equal(t, event.CachedHash, event.FreshHash) // hashes are equal + assert.NotEqual(t, uint64(0), event.CachedHash) + assert.NotEqual(t, 0, event.CachedBytes) + assert.NotEqual(t, 0, event.FreshBytes) + }) + + t.Run("InvalidateCache false with analytics records event 
but no Delete", func(t *testing.T) { + cache := NewFakeLoaderCache() + cacheKey := `{"__typename":"User","key":{"id":"1234"}}` + _ = cache.Set(context.Background(), []*CacheEntry{ + {Key: cacheKey, Value: []byte(`{"id":"1234","username":"OldMe"}`)}, + }, 0) + cache.ClearLog() + + ctx := NewContext(context.Background()) + ctx.ExecutionOptions.Caching.EnableCacheAnalytics = true + ctx.initCacheAnalytics() + + l := makeLoader(ctx, cache, "default") + + cfg := &MutationEntityImpactConfig{ + EntityTypeName: "User", + KeyFields: []KeyField{{Name: "id"}}, + CacheName: "default", + InvalidateCache: false, // no deletion + } + info := makeMutationInfo("updateUsername", mutationProvidesData) + res := makeResult(cfg) + + responseData, err := astjson.ParseWithArena(l.jsonArena, `{"updateUsername":{"id":"1234","username":"NewMe"}}`) + require.NoError(t, err) + + deletedKeys := l.detectMutationEntityImpact(res, info, responseData) + assert.Nil(t, deletedKeys, "no keys should be deleted when InvalidateCache=false") + + // Verify only a Get was logged (for analytics), no Delete + log := cache.GetLog() + require.Len(t, log, 1, "exactly 1 cache operation: Get for analytics comparison") + assert.Equal(t, "get", log[0].Operation) + + // Verify cache entry still exists + entries, _ := cache.Get(context.Background(), []string{cacheKey}) + assert.NotNil(t, entries[0], "cache entry should still exist") + + // Verify MutationEvent was recorded + stats := ctx.GetCacheStats() + require.Len(t, stats.MutationEvents, 1) + assert.Equal(t, true, stats.MutationEvents[0].HadCachedValue) + assert.Equal(t, true, stats.MutationEvents[0].IsStale) // username changed + }) + + t.Run("no caches map returns nil", func(t *testing.T) { + ctx := NewContext(context.Background()) + ctx.ExecutionOptions.Caching.EnableCacheAnalytics = true + ctx.initCacheAnalytics() + + l := &Loader{ + jsonArena: arena.NewMonotonicArena(arena.WithMinBufferSize(1024)), + ctx: ctx, + caches: nil, // no caches + } + + cfg := 
&MutationEntityImpactConfig{ + EntityTypeName: "User", + KeyFields: []KeyField{{Name: "id"}}, + CacheName: "default", + InvalidateCache: true, + } + info := makeMutationInfo("updateUsername", mutationProvidesData) + res := makeResult(cfg) + + responseData := astjson.MustParse(`{"updateUsername":{"id":"1234","username":"NewMe"}}`) + + got := l.detectMutationEntityImpact(res, info, responseData) + assert.Nil(t, got) + }) + + t.Run("nil ProvidesData returns nil", func(t *testing.T) { + ctx := NewContext(context.Background()) + ctx.ExecutionOptions.Caching.EnableCacheAnalytics = true + ctx.initCacheAnalytics() + + l := makeLoader(ctx, NewFakeLoaderCache(), "default") + + cfg := &MutationEntityImpactConfig{ + EntityTypeName: "User", + KeyFields: []KeyField{{Name: "id"}}, + CacheName: "default", + InvalidateCache: true, + } + info := &FetchInfo{ + OperationType: ast.OperationTypeMutation, + DataSourceName: "accounts", + RootFields: []GraphCoordinate{ + {TypeName: "Mutation", FieldName: "updateUsername"}, + }, + ProvidesData: nil, // no ProvidesData + } + res := makeResult(cfg) + + responseData := astjson.MustParse(`{"updateUsername":{"id":"1234","username":"NewMe"}}`) + + got := l.detectMutationEntityImpact(res, info, responseData) + assert.Nil(t, got) + }) + + t.Run("response data not an object returns nil", func(t *testing.T) { + ctx := NewContext(context.Background()) + ctx.ExecutionOptions.Caching.EnableCacheAnalytics = true + ctx.initCacheAnalytics() + + l := makeLoader(ctx, NewFakeLoaderCache(), "default") + + cfg := &MutationEntityImpactConfig{ + EntityTypeName: "User", + KeyFields: []KeyField{{Name: "id"}}, + CacheName: "default", + InvalidateCache: true, + } + info := makeMutationInfo("updateUsername", mutationProvidesData) + res := makeResult(cfg) + + // Mutation returns a string instead of object + responseData := astjson.MustParse(`{"updateUsername":"not-an-object"}`) + + got := l.detectMutationEntityImpact(res, info, responseData) + assert.Nil(t, got) + }) + 
+ t.Run("array response invalidates all entities in the list", func(t *testing.T) { + cache := NewFakeLoaderCache() + // Pre-populate cache with two entities + cacheKey1 := `{"__typename":"User","key":{"id":"1"}}` + cacheKey2 := `{"__typename":"User","key":{"id":"2"}}` + _ = cache.Set(context.Background(), []*CacheEntry{ + {Key: cacheKey1, Value: []byte(`{"id":"1","username":"Alice"}`)}, + {Key: cacheKey2, Value: []byte(`{"id":"2","username":"Bob"}`)}, + }, 0) + + ctx := NewContext(context.Background()) + ctx.ExecutionOptions.Caching.EnableCacheAnalytics = true + ctx.initCacheAnalytics() + + l := makeLoader(ctx, cache, "default") + + // ProvidesData for a list mutation: {deleteUsers: [{id, username}]} + listEntityProvidesData := &Object{ + Fields: []*Field{ + {Name: []byte("id"), Value: &Scalar{Path: []string{"id"}}}, + {Name: []byte("username"), Value: &Scalar{Path: []string{"username"}}}, + }, + } + listMutationProvidesData := &Object{ + Fields: []*Field{ + {Name: []byte("deleteUsers"), Value: listEntityProvidesData}, + }, + } + + cfg := &MutationEntityImpactConfig{ + EntityTypeName: "User", + KeyFields: []KeyField{{Name: "id"}}, + CacheName: "default", + InvalidateCache: true, + } + info := makeMutationInfo("deleteUsers", listMutationProvidesData) + res := makeResult(cfg) + + // Mutation returns an array of entities + responseData, err := astjson.ParseWithArena(l.jsonArena, `{"deleteUsers":[{"id":"1","username":"Alice"},{"id":"2","username":"Bob"}]}`) + require.NoError(t, err) + + deletedKeys := l.detectMutationEntityImpact(res, info, responseData) + + // Both entities should be invalidated + assert.Equal(t, map[string]struct{}{cacheKey1: {}, cacheKey2: {}}, deletedKeys) + + // Verify both cache entries were deleted + entries, _ := cache.Get(context.Background(), []string{cacheKey1, cacheKey2}) + assert.Nil(t, entries[0], "first entity should be deleted") + assert.Nil(t, entries[1], "second entity should be deleted") + + // Verify analytics recorded events for 
both entities + stats := ctx.GetCacheStats() + require.Len(t, stats.MutationEvents, 2, "should record mutation event for each entity in the list") + assert.Equal(t, cacheKey1, stats.MutationEvents[0].EntityCacheKey) + assert.Equal(t, true, stats.MutationEvents[0].HadCachedValue) + assert.Equal(t, cacheKey2, stats.MutationEvents[1].EntityCacheKey) + assert.Equal(t, true, stats.MutationEvents[1].HadCachedValue) + }) + + t.Run("array response with non-object items skips them", func(t *testing.T) { + cache := NewFakeLoaderCache() + cacheKey := `{"__typename":"User","key":{"id":"1"}}` + _ = cache.Set(context.Background(), []*CacheEntry{ + {Key: cacheKey, Value: []byte(`{"id":"1","username":"Alice"}`)}, + }, 0) + + ctx := NewContext(context.Background()) + l := makeLoader(ctx, cache, "default") + + listEntityProvidesData := &Object{ + Fields: []*Field{ + {Name: []byte("id"), Value: &Scalar{Path: []string{"id"}}}, + {Name: []byte("username"), Value: &Scalar{Path: []string{"username"}}}, + }, + } + listMutationProvidesData := &Object{ + Fields: []*Field{ + {Name: []byte("deleteUsers"), Value: listEntityProvidesData}, + }, + } + + cfg := &MutationEntityImpactConfig{ + EntityTypeName: "User", + KeyFields: []KeyField{{Name: "id"}}, + CacheName: "default", + InvalidateCache: true, + } + info := makeMutationInfo("deleteUsers", listMutationProvidesData) + res := makeResult(cfg) + + // Array with mixed types: one valid object, one null, one string + responseData, err := astjson.ParseWithArena(l.jsonArena, `{"deleteUsers":[{"id":"1","username":"Alice"},null,"invalid"]}`) + require.NoError(t, err) + + deletedKeys := l.detectMutationEntityImpact(res, info, responseData) + + // Only the valid object entity should be invalidated + assert.Equal(t, map[string]struct{}{cacheKey: {}}, deletedKeys) + }) +} diff --git a/v2/pkg/engine/resolve/negative_cache_test.go b/v2/pkg/engine/resolve/negative_cache_test.go new file mode 100644 index 0000000000..4121c56c1f --- /dev/null +++ 
b/v2/pkg/engine/resolve/negative_cache_test.go @@ -0,0 +1,477 @@ +package resolve + +import ( + "context" + "testing" + "time" + + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/wundergraph/go-arena" + + "github.com/wundergraph/graphql-go-tools/v2/pkg/ast" + "github.com/wundergraph/graphql-go-tools/v2/pkg/fastjsonext" +) + +// newNegativeCacheProductProvidesData returns a ProvidesData object for negative cache tests. +// Uses only "name" since that's what the entity fetch requests (unlike the interceptor +// helper which includes "id" + "name"). +func newNegativeCacheProductProvidesData() *Object { + return &Object{ + Fields: []*Field{ + { + Name: []byte("name"), + Value: &Scalar{ + Path: []string{"name"}, + Nullable: false, + }, + }, + }, + } +} + +// newNegativeCacheEntitySegments returns input template segments for negative cache entity fetches. +func newNegativeCacheEntitySegments() []TemplateSegment { + return []TemplateSegment{ + { + Data: []byte(`{"method":"POST","url":"http://products.service","body":{"query":"query($representations: [_Any!]!){_entities(representations: $representations){... 
on Product {name}}}","variables":{"representations":[`), + SegmentType: StaticSegmentType, + }, + { + SegmentType: VariableSegmentType, + VariableKind: ResolvableObjectVariableKind, + Renderer: NewGraphQLVariableResolveRenderer(&Object{ + Fields: []*Field{ + {Name: []byte("__typename"), Value: &String{Path: []string{"__typename"}}}, + {Name: []byte("id"), Value: &String{Path: []string{"id"}}}, + }, + }), + }, + { + Data: []byte(`]}}}`), + SegmentType: StaticSegmentType, + }, + } +} + +func TestNegativeCaching(t *testing.T) { + t.Run("null entity stored as negative sentinel and served on second request", func(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + cache := NewFakeLoaderCache() + + // Root fetch provides the product reference + rootDS := NewMockDataSource(ctrl) + rootDS.EXPECT(). + Load(gomock.Any(), gomock.Any(), gomock.Any()). + DoAndReturn(func(ctx context.Context, headers any, input []byte) ([]byte, error) { + return []byte(`{"data":{"product":{"__typename":"Product","id":"prod-1"}}}`), nil + }).AnyTimes() + + // Entity fetch returns null (entity not found in this subgraph) + productDS := NewMockDataSource(ctrl) + productDS.EXPECT(). + Load(gomock.Any(), gomock.Any(), gomock.Any()). 
+ DoAndReturn(func(ctx context.Context, headers any, input []byte) ([]byte, error) { + return []byte(`{"data":{"_entities":[null]}}`), nil + }).Times(1) // Only called ONCE — second request uses negative cache + + cacheKeyTemplate := newProductCacheKeyTemplate() + providesData := newNegativeCacheProductProvidesData() + + buildResponse := func() *GraphQLResponse { + return &GraphQLResponse{ + Info: &GraphQLResponseInfo{ + OperationType: ast.OperationTypeQuery, + }, + Fetches: Sequence( + // Root fetch to populate product reference + SingleWithPath(&SingleFetch{ + FetchConfiguration: FetchConfiguration{ + DataSource: rootDS, + PostProcessing: PostProcessingConfiguration{ + SelectResponseDataPath: []string{"data"}, + }, + }, + InputTemplate: InputTemplate{ + Segments: []TemplateSegment{ + { + Data: []byte(`{"method":"POST","url":"http://root.service","body":{"query":"{product {__typename id}}"}}`), + SegmentType: StaticSegmentType, + }, + }, + }, + DataSourceIdentifier: []byte("graphql_datasource.Source"), + }, "query"), + + // Entity fetch that returns null + SingleWithPath(&SingleFetch{ + FetchConfiguration: FetchConfiguration{ + DataSource: productDS, + PostProcessing: PostProcessingConfiguration{ + SelectResponseDataPath: []string{"data", "_entities", "0"}, + }, + Caching: FetchCacheConfiguration{ + Enabled: true, + CacheName: "default", + TTL: 30 * time.Second, + CacheKeyTemplate: cacheKeyTemplate, + NegativeCacheTTL: 10 * time.Second, + }, + }, + InputTemplate: InputTemplate{ + Segments: newNegativeCacheEntitySegments(), + }, + Info: &FetchInfo{ + DataSourceID: "products", + DataSourceName: "products", + OperationType: ast.OperationTypeQuery, + ProvidesData: providesData, + }, + DataSourceIdentifier: []byte("graphql_datasource.Source"), + }, "query.product", ObjectPath("product")), + ), + Data: &Object{ + Fields: []*Field{ + { + Name: []byte("product"), + Value: &Object{ + Path: []string{"product"}, + Nullable: true, + Fields: []*Field{ + { + Name: 
[]byte("name"), + Value: &String{ + Path: []string{"name"}, + Nullable: true, + }, + }, + }, + }, + }, + }, + }, + } + } + + execute := func() string { + loader := &Loader{ + caches: map[string]LoaderCache{ + "default": cache, + }, + } + ctx := NewContext(context.Background()) + ctx.ExecutionOptions.DisableSubgraphRequestDeduplication = true + ctx.ExecutionOptions.Caching.EnableL2Cache = true + + ar := arena.NewMonotonicArena(arena.WithMinBufferSize(1024)) + resolvable := NewResolvable(ar, ResolvableOptions{}) + err := resolvable.Init(ctx, nil, ast.OperationTypeQuery) + require.NoError(t, err) + + err = loader.LoadGraphQLResponseData(ctx, buildResponse(), resolvable) + require.NoError(t, err) + + return string(fastjsonext.PrintGraphQLResponse(resolvable.data, resolvable.errors)) + } + + // First execution: subgraph is called, returns null + out1 := execute() + t.Logf("First output: %s", out1) + + // Verify the null sentinel was stored in L2 + cacheLog := cache.GetLog() + var setFound bool + for _, entry := range cacheLog { + if entry.Operation == "set" { + for _, key := range entry.Keys { + t.Logf("Stored cache key: %s", key) + } + setFound = true + } + } + assert.True(t, setFound, "Expected a cache set operation for the negative sentinel") + + // Find the last set operation's first key and verify stored value is "null" + for i := len(cacheLog) - 1; i >= 0; i-- { + if cacheLog[i].Operation == "set" && len(cacheLog[i].Keys) > 0 { + storedValue := cache.GetValue(cacheLog[i].Keys[0]) + assert.Equal(t, "null", string(storedValue), "Negative cache sentinel should be 'null' bytes") + break + } + } + + cache.ClearLog() + + // Second execution: should NOT call the subgraph (negative cache hit) + out2 := execute() + t.Logf("Second output: %s", out2) + + // Verify L2 cache was read (GET) and returned a hit + cacheLog2 := cache.GetLog() + var getFound bool + for _, entry := range cacheLog2 { + if entry.Operation == "get" { + for i, hit := range entry.Hits { + t.Logf("Cache 
key %s: hit=%v", entry.Keys[i], hit) + if hit { + getFound = true + } + } + } + } + assert.True(t, getFound, "Expected L2 cache hit for negative sentinel on second call") + }) + + t.Run("negative caching disabled when NegativeCacheTTL is 0", func(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + cache := NewFakeLoaderCache() + + // Root fetch provides the product reference + rootDS := NewMockDataSource(ctrl) + rootDS.EXPECT(). + Load(gomock.Any(), gomock.Any(), gomock.Any()). + DoAndReturn(func(ctx context.Context, headers any, input []byte) ([]byte, error) { + return []byte(`{"data":{"product":{"__typename":"Product","id":"prod-1"}}}`), nil + }).AnyTimes() + + // Subgraph returns null both times — no negative caching + productDS := NewMockDataSource(ctrl) + productDS.EXPECT(). + Load(gomock.Any(), gomock.Any(), gomock.Any()). + DoAndReturn(func(ctx context.Context, headers any, input []byte) ([]byte, error) { + return []byte(`{"data":{"_entities":[null]}}`), nil + }).Times(2) // Called TWICE because negative caching is disabled + + cacheKeyTemplate := newProductCacheKeyTemplate() + providesData := newNegativeCacheProductProvidesData() + + buildResponse := func() *GraphQLResponse { + return &GraphQLResponse{ + Info: &GraphQLResponseInfo{ + OperationType: ast.OperationTypeQuery, + }, + Fetches: Sequence( + SingleWithPath(&SingleFetch{ + FetchConfiguration: FetchConfiguration{ + DataSource: rootDS, + PostProcessing: PostProcessingConfiguration{ + SelectResponseDataPath: []string{"data"}, + }, + }, + InputTemplate: InputTemplate{ + Segments: []TemplateSegment{ + { + Data: []byte(`{"method":"POST","url":"http://root.service","body":{"query":"{product {__typename id}}"}}`), + SegmentType: StaticSegmentType, + }, + }, + }, + DataSourceIdentifier: []byte("graphql_datasource.Source"), + }, "query"), + + SingleWithPath(&SingleFetch{ + FetchConfiguration: FetchConfiguration{ + DataSource: productDS, + PostProcessing: PostProcessingConfiguration{ + 
SelectResponseDataPath: []string{"data", "_entities", "0"}, + }, + Caching: FetchCacheConfiguration{ + Enabled: true, + CacheName: "default", + TTL: 30 * time.Second, + CacheKeyTemplate: cacheKeyTemplate, + NegativeCacheTTL: 0, // Negative caching disabled + }, + }, + InputTemplate: InputTemplate{ + Segments: newNegativeCacheEntitySegments(), + }, + Info: &FetchInfo{ + DataSourceID: "products", + DataSourceName: "products", + OperationType: ast.OperationTypeQuery, + ProvidesData: providesData, + }, + DataSourceIdentifier: []byte("graphql_datasource.Source"), + }, "query.product", ObjectPath("product")), + ), + Data: &Object{ + Fields: []*Field{ + { + Name: []byte("product"), + Value: &Object{ + Path: []string{"product"}, + Nullable: true, + Fields: []*Field{ + { + Name: []byte("name"), + Value: &String{ + Path: []string{"name"}, + Nullable: true, + }, + }, + }, + }, + }, + }, + }, + } + } + + execute := func() { + loader := &Loader{ + caches: map[string]LoaderCache{ + "default": cache, + }, + } + ctx := NewContext(context.Background()) + ctx.ExecutionOptions.DisableSubgraphRequestDeduplication = true + ctx.ExecutionOptions.Caching.EnableL2Cache = true + + ar := arena.NewMonotonicArena(arena.WithMinBufferSize(1024)) + resolvable := NewResolvable(ar, ResolvableOptions{}) + err := resolvable.Init(ctx, nil, ast.OperationTypeQuery) + require.NoError(t, err) + + err = loader.LoadGraphQLResponseData(ctx, buildResponse(), resolvable) + require.NoError(t, err) + } + + // Both calls should hit the subgraph (no negative caching) + execute() + cache.ClearLog() + execute() + // gomock verifies Times(2) — both calls went to subgraph + }) + + t.Run("negative cache sentinel uses NegativeCacheTTL not regular TTL", func(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + cache := NewFakeLoaderCache() + + // Root fetch provides the product reference + rootDS := NewMockDataSource(ctrl) + rootDS.EXPECT(). + Load(gomock.Any(), gomock.Any(), gomock.Any()). 
+ DoAndReturn(func(ctx context.Context, headers any, input []byte) ([]byte, error) { + return []byte(`{"data":{"product":{"__typename":"Product","id":"prod-1"}}}`), nil + }).Times(1) + + // Entity fetch returns null + productDS := NewMockDataSource(ctrl) + productDS.EXPECT(). + Load(gomock.Any(), gomock.Any(), gomock.Any()). + DoAndReturn(func(ctx context.Context, headers any, input []byte) ([]byte, error) { + return []byte(`{"data":{"_entities":[null]}}`), nil + }).Times(1) + + cacheKeyTemplate := newProductCacheKeyTemplate() + providesData := newNegativeCacheProductProvidesData() + + response := &GraphQLResponse{ + Info: &GraphQLResponseInfo{ + OperationType: ast.OperationTypeQuery, + }, + Fetches: Sequence( + SingleWithPath(&SingleFetch{ + FetchConfiguration: FetchConfiguration{ + DataSource: rootDS, + PostProcessing: PostProcessingConfiguration{ + SelectResponseDataPath: []string{"data"}, + }, + }, + InputTemplate: InputTemplate{ + Segments: []TemplateSegment{ + { + Data: []byte(`{"method":"POST","url":"http://root.service","body":{"query":"{product {__typename id}}"}}`), + SegmentType: StaticSegmentType, + }, + }, + }, + DataSourceIdentifier: []byte("graphql_datasource.Source"), + }, "query"), + + SingleWithPath(&SingleFetch{ + FetchConfiguration: FetchConfiguration{ + DataSource: productDS, + PostProcessing: PostProcessingConfiguration{ + SelectResponseDataPath: []string{"data", "_entities", "0"}, + }, + Caching: FetchCacheConfiguration{ + Enabled: true, + CacheName: "default", + TTL: 60 * time.Second, + CacheKeyTemplate: cacheKeyTemplate, + NegativeCacheTTL: 5 * time.Second, // Much shorter than regular TTL + }, + }, + InputTemplate: InputTemplate{ + Segments: newNegativeCacheEntitySegments(), + }, + Info: &FetchInfo{ + DataSourceID: "products", + DataSourceName: "products", + OperationType: ast.OperationTypeQuery, + ProvidesData: providesData, + }, + DataSourceIdentifier: []byte("graphql_datasource.Source"), + }, "query.product", ObjectPath("product")), + 
), + Data: &Object{ + Fields: []*Field{ + { + Name: []byte("product"), + Value: &Object{ + Path: []string{"product"}, + Nullable: true, + Fields: []*Field{ + { + Name: []byte("name"), + Value: &String{ + Path: []string{"name"}, + Nullable: true, + }, + }, + }, + }, + }, + }, + }, + } + + loader := &Loader{ + caches: map[string]LoaderCache{ + "default": cache, + }, + } + + ctx := NewContext(context.Background()) + ctx.ExecutionOptions.DisableSubgraphRequestDeduplication = true + ctx.ExecutionOptions.Caching.EnableL2Cache = true + + ar := arena.NewMonotonicArena(arena.WithMinBufferSize(1024)) + resolvable := NewResolvable(ar, ResolvableOptions{}) + err := resolvable.Init(ctx, nil, ast.OperationTypeQuery) + require.NoError(t, err) + + err = loader.LoadGraphQLResponseData(ctx, response, resolvable) + require.NoError(t, err) + + // Verify the TTL used for the negative sentinel + cacheLog := cache.GetLog() + for _, entry := range cacheLog { + if entry.Operation == "set" { + t.Logf("Set: keys=%v ttl=%v", entry.Keys, entry.TTL) + // The negative sentinel should use NegativeCacheTTL (5s), not regular TTL (60s) + assert.Equal(t, 5*time.Second, entry.TTL, "Negative cache sentinel should use NegativeCacheTTL") + } + } + }) +} diff --git a/v2/pkg/engine/resolve/resolve.go b/v2/pkg/engine/resolve/resolve.go index 789cedf5ac..8dd03408d2 100644 --- a/v2/pkg/engine/resolve/resolve.go +++ b/v2/pkg/engine/resolve/resolve.go @@ -97,6 +97,23 @@ func (r *Resolver) SetAsyncErrorWriter(w AsyncErrorWriter) { r.asyncErrorWriter = w } +// CacheCircuitBreakerOpen returns true if the circuit breaker for the named cache +// is currently open (blocking L2 operations). Returns false if the cache doesn't +// exist or has no circuit breaker configured. 
+func (r *Resolver) CacheCircuitBreakerOpen(cacheName string) bool { + if r.options.Caches == nil { + return false + } + cache, ok := r.options.Caches[cacheName] + if !ok { + return false + } + if cb, ok := cache.(*circuitBreakerCache); ok { + return cb.state.isOpen() + } + return false +} + type tools struct { resolvable *Resolvable loader *Loader @@ -192,6 +209,12 @@ type ResolverOptions struct { Caches map[string]LoaderCache + // CacheCircuitBreakers configures per-cache circuit breakers. + // Map key must match a key in Caches. Entries for missing cache names are ignored. + // When a breaker trips (consecutive failures >= threshold), all L2 operations for + // that cache are skipped until the cooldown period elapses. + CacheCircuitBreakers map[string]CircuitBreakerConfig + // EntityCacheConfigs maps subgraphName → entityTypeName → config. // Used by extensions-based cache invalidation to look up cache settings at runtime. EntityCacheConfigs map[string]map[string]*EntityCacheInvalidationConfig @@ -280,6 +303,9 @@ func New(ctx context.Context, options ResolverOptions) *Resolver { options.InboundRequestDeduplicationShardCount = n } + // Wrap caches with circuit breakers where configured + options.Caches = wrapCachesWithCircuitBreakers(options.Caches, options.CacheCircuitBreakers) + resolver := &Resolver{ ctx: ctx, options: options, @@ -712,15 +738,21 @@ func (r *Resolver) handleTriggerEntityCache(config *triggerEntityCacheConfig, da return } - // Get the subgraph header prefix for cache key isolation + // Get the global prefix and subgraph header prefix for cache key isolation. + // Mirrors prepareCacheKeys(): global prefix → header hash prefix → interceptor. 
var prefix string + globalPrefix := config.resolveCtx.ExecutionOptions.Caching.GlobalCacheKeyPrefix if config.pop.IncludeSubgraphHeaderPrefix && config.resolveCtx.SubgraphHeadersBuilder != nil { _, hash := config.resolveCtx.SubgraphHeadersBuilder.HeadersForSubgraph(config.pop.DataSourceName) - if hash != 0 { - var buf [20]byte - b := strconv.AppendUint(buf[:0], hash, 10) + var buf [20]byte + b := strconv.AppendUint(buf[:0], hash, 10) + if globalPrefix != "" { + prefix = globalPrefix + ":" + string(b) + } else { prefix = string(b) } + } else if globalPrefix != "" { + prefix = globalPrefix } // We need a temporary resolvable to parse the subscription data and extract entity items. @@ -791,6 +823,22 @@ func (r *Resolver) handleTriggerEntityCache(config *triggerEntityCacheConfig, da return } + // Apply L2CacheKeyInterceptor to match the full key construction pipeline + // used by prepareCacheKeys() and processExtensionsCacheInvalidation(). + // Without this, custom key transforms (e.g., tenant prefix) would be missing + // from subscription cache operations, causing cache key mismatches. 
+ if interceptor := config.resolveCtx.ExecutionOptions.Caching.L2CacheKeyInterceptor; interceptor != nil { + interceptorInfo := L2CacheKeyInterceptorInfo{ + SubgraphName: config.pop.DataSourceName, + CacheName: config.pop.CacheName, + } + for _, ck := range cacheKeys { + for i, key := range ck.Keys { + ck.Keys[i] = interceptor(config.resolveCtx.ctx, key, interceptorInfo) + } + } + } + // Use the resolver context (not client context) since this is a trigger-level operation ctx := r.ctx diff --git a/v2/pkg/engine/resolve/trigger_cache_test.go b/v2/pkg/engine/resolve/trigger_cache_test.go index 49855d6006..c779750e06 100644 --- a/v2/pkg/engine/resolve/trigger_cache_test.go +++ b/v2/pkg/engine/resolve/trigger_cache_test.go @@ -81,6 +81,7 @@ func TestHandleTriggerEntityCache(t *testing.T) { Operation: "set", Keys: []string{`{"__typename":"Product","key":{"id":"prod-1"}}`}, Hits: nil, + TTL: 30 * time.Second, }, log[0], "should set the entity with correct cache key") // Verify stored data