Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
364 changes: 364 additions & 0 deletions v2/pkg/engine/resolve/loader_parallel_race_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,364 @@
package resolve

import (
"context"
"net/http"
"sync"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

Check failure on line 10 in v2/pkg/engine/resolve/loader_parallel_race_test.go

View workflow job for this annotation

GitHub Actions / Linters (1.25)

File is not properly formatted (gci)
"github.com/wundergraph/go-arena"
"github.com/wundergraph/graphql-go-tools/v2/pkg/ast"
"github.com/wundergraph/graphql-go-tools/v2/pkg/engine/datasource/httpclient"
"github.com/wundergraph/graphql-go-tools/v2/pkg/fastjsonext"
)
Comment thread
coderabbitai[bot] marked this conversation as resolved.

// TestResolveParallel_NoConcurrentArenaRace verifies that parallel entity fetches
// with L2 caching do not race on the arena. This test exercises the goroutine code
// paths in resolveParallel Phase 2 (extractCacheKeysStrings, populateFromCache,
// denormalizeFromCache) which allocate from per-goroutine arenas.
//
// Run with: go test -race -run TestResolveParallel_NoConcurrentArenaRace ./v2/pkg/engine/resolve/... -v -count=1
func TestResolveParallel_NoConcurrentArenaRace(t *testing.T) {
t.Run("parallel batch entity fetches with L2 cache miss", func(t *testing.T) {
// Scenario: Root fetch → Parallel(
// BatchEntityFetch (products subgraph, L2 miss → subgraph fetch),
// BatchEntityFetch (inventory subgraph, L2 miss → subgraph fetch),
// )
// Both fetches run as goroutines in Phase 2, exercising arena allocations concurrently.
// With -race, this would detect if goroutines accidentally share l.jsonArena.

productsDS := &staticDataSource{data: []byte(`{"data":{"_entities":[{"__typename":"Product","id":"prod-1","name":"Widget"},{"__typename":"Product","id":"prod-2","name":"Gadget"}]}}`)}
inventoryDS := &staticDataSource{data: []byte(`{"data":{"_entities":[{"__typename":"Product","id":"prod-1","inStock":true},{"__typename":"Product","id":"prod-2","inStock":false}]}}`)}

productCacheKeyTemplate := &EntityQueryCacheKeyTemplate{
Keys: NewResolvableObjectVariable(&Object{
Fields: []*Field{
{Name: []byte("__typename"), Value: &String{Path: []string{"__typename"}}},
{Name: []byte("id"), Value: &String{Path: []string{"id"}}},
},
}),
}

inventoryCacheKeyTemplate := &EntityQueryCacheKeyTemplate{
Keys: NewResolvableObjectVariable(&Object{
Fields: []*Field{
{Name: []byte("__typename"), Value: &String{Path: []string{"__typename"}}},
{Name: []byte("id"), Value: &String{Path: []string{"id"}}},
},
}),
}

// Run 100 iterations to increase the race window probability
for range 100 {
cache := NewFakeLoaderCache()

rootDS := &staticDataSource{data: []byte(`{"data":{"products":[{"__typename":"Product","id":"prod-1"},{"__typename":"Product","id":"prod-2"}]}}`)}

response := &GraphQLResponse{
Info: &GraphQLResponseInfo{OperationType: ast.OperationTypeQuery},
Fetches: Sequence(
SingleWithPath(&SingleFetch{
FetchConfiguration: FetchConfiguration{
DataSource: rootDS,
PostProcessing: PostProcessingConfiguration{SelectResponseDataPath: []string{"data"}},
},
InputTemplate: InputTemplate{
Segments: []TemplateSegment{{Data: []byte(`{"method":"POST","url":"http://products"}`), SegmentType: StaticSegmentType}},
},
}, "query"),
Parallel(
SingleWithPath(&BatchEntityFetch{
Input: BatchInput{
Header: InputTemplate{Segments: []TemplateSegment{{Data: []byte(`{"method":"POST","url":"http://products","body":{"query":"names","variables":{"representations":[`), SegmentType: StaticSegmentType}}},
Items: []InputTemplate{{Segments: []TemplateSegment{{
SegmentType: VariableSegmentType,
VariableKind: ResolvableObjectVariableKind,
Renderer: NewGraphQLVariableResolveRenderer(&Object{Fields: []*Field{
{Name: []byte("__typename"), Value: &String{Path: []string{"__typename"}}},
{Name: []byte("id"), Value: &String{Path: []string{"id"}}},
}}),
}}}},
Separator: InputTemplate{Segments: []TemplateSegment{{Data: []byte(`,`), SegmentType: StaticSegmentType}}},
Footer: InputTemplate{Segments: []TemplateSegment{{Data: []byte(`]}}}`), SegmentType: StaticSegmentType}}},
},
DataSource: productsDS,
PostProcessing: PostProcessingConfiguration{SelectResponseDataPath: []string{"data", "_entities"}},
Info: &FetchInfo{
DataSourceName: "products",
OperationType: ast.OperationTypeQuery,
RootFields: []GraphCoordinate{{TypeName: "Product"}},
ProvidesData: &Object{
Fields: []*Field{
{Name: []byte("id"), Value: &Scalar{Path: []string{"id"}}},
{Name: []byte("name"), Value: &Scalar{Path: []string{"name"}}},
},
},
},
Caching: FetchCacheConfiguration{
Enabled: true,
CacheName: "default",
CacheKeyTemplate: productCacheKeyTemplate,
UseL1Cache: true,
TTL: 60_000_000_000, // 60s
},
}, "query.products", ArrayPath("products")),
SingleWithPath(&BatchEntityFetch{
Input: BatchInput{
Header: InputTemplate{Segments: []TemplateSegment{{Data: []byte(`{"method":"POST","url":"http://inventory","body":{"query":"stock","variables":{"representations":[`), SegmentType: StaticSegmentType}}},
Items: []InputTemplate{{Segments: []TemplateSegment{{
SegmentType: VariableSegmentType,
VariableKind: ResolvableObjectVariableKind,
Renderer: NewGraphQLVariableResolveRenderer(&Object{Fields: []*Field{
{Name: []byte("__typename"), Value: &String{Path: []string{"__typename"}}},
{Name: []byte("id"), Value: &String{Path: []string{"id"}}},
}}),
}}}},
Separator: InputTemplate{Segments: []TemplateSegment{{Data: []byte(`,`), SegmentType: StaticSegmentType}}},
Footer: InputTemplate{Segments: []TemplateSegment{{Data: []byte(`]}}}`), SegmentType: StaticSegmentType}}},
},
DataSource: inventoryDS,
PostProcessing: PostProcessingConfiguration{SelectResponseDataPath: []string{"data", "_entities"}},
Info: &FetchInfo{
DataSourceName: "inventory",
OperationType: ast.OperationTypeQuery,
RootFields: []GraphCoordinate{{TypeName: "Product"}},
ProvidesData: &Object{
Fields: []*Field{
{Name: []byte("id"), Value: &Scalar{Path: []string{"id"}}},
{Name: []byte("inStock"), Value: &Scalar{Path: []string{"inStock"}}},
},
},
},
Caching: FetchCacheConfiguration{
Enabled: true,
CacheName: "inventory",
CacheKeyTemplate: inventoryCacheKeyTemplate,
UseL1Cache: true,
TTL: 60_000_000_000,
},
}, "query.products", ArrayPath("products")),
),
),
Data: &Object{
Fields: []*Field{
{
Name: []byte("products"),
Value: &Array{
Path: []string{"products"},
Item: &Object{
Fields: []*Field{
{Name: []byte("id"), Value: &String{Path: []string{"id"}}},
{Name: []byte("name"), Value: &String{Path: []string{"name"}}},
{Name: []byte("inStock"), Value: &Boolean{Path: []string{"inStock"}}},
},
},
},
},
},
},
}

ar := arena.NewMonotonicArena(arena.WithMinBufferSize(1024))
loader := &Loader{
jsonArena: ar,
caches: map[string]LoaderCache{"default": cache, "inventory": cache},
entityCacheConfigs: map[string]map[string]*EntityCacheInvalidationConfig{},
}

ctx := NewContext(context.Background())
ctx.ExecutionOptions.DisableSubgraphRequestDeduplication = true
ctx.ExecutionOptions.Caching.EnableL1Cache = true
ctx.ExecutionOptions.Caching.EnableL2Cache = true

resolvable := NewResolvable(ar, ResolvableOptions{})
err := resolvable.Init(ctx, nil, ast.OperationTypeQuery)
require.NoError(t, err)

err = loader.LoadGraphQLResponseData(ctx, response, resolvable)
require.NoError(t, err)

out := fastjsonext.PrintGraphQLResponse(resolvable.data, resolvable.errors)
assert.Contains(t, out, `"id":"prod-1"`)
assert.Contains(t, out, `"id":"prod-2"`)
Comment on lines +183 to +185
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Assert fields that only the parallel entity fetches can produce.

These checks only look for id, but id is already present in the root fetch payload. The test will still pass if the Phase 2 goroutine paths are skipped or misconfigured, which weakens it as a race regression test. Please assert name and inStock too, or otherwise prove those parallel branches ran.

🔧 Minimal assertion upgrade
- assert.Contains(t, out, `"id":"prod-1"`)
- assert.Contains(t, out, `"id":"prod-2"`)
+ assert.Contains(t, out, `"name":"Widget"`)
+ assert.Contains(t, out, `"name":"Gadget"`)
+ assert.Contains(t, out, `"inStock":true`)
+ assert.Contains(t, out, `"inStock":false`)

Also applies to: 343-345

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@v2/pkg/engine/resolve/loader_parallel_race_test.go` around lines 183 - 185,
The test currently only asserts `"id":"prod-1"`/`"id":"prod-2"` which can be
satisfied by the root fetch; update the assertions in
loader_parallel_race_test.go (the block using
fastjsonext.PrintGraphQLResponse(resolvable.data, resolvable.errors) and
variable resolvable) to also assert fields that only the parallel entity fetches
produce (e.g. assert the output contains `"name":"..."` and
`"inStock":true/false` for both prod-1 and prod-2), and make the same assertion
upgrade for the other occurrence around the 343-345 region so the test proves
the Phase 2 goroutine branches actually ran.


loader.Free()
ar.Reset()
}
})

t.Run("parallel batch entity fetches with L2 cache hit", func(t *testing.T) {
// Scenario: Same as above but with pre-populated L2 cache.
// Goroutines exercise populateFromCache (parsing cached JSON on goroutine arena).

cache := NewFakeLoaderCache()
// Pre-populate L2 cache with entity data
cache.SetRawData(`{"__typename":"Product","key":{"id":"prod-1"}}`, []byte(`{"__typename":"Product","id":"prod-1","name":"Widget"}`), 60_000_000_000)
cache.SetRawData(`{"__typename":"Product","key":{"id":"prod-2"}}`, []byte(`{"__typename":"Product","id":"prod-2","name":"Gadget"}`), 60_000_000_000)
Comment on lines +200 to +203
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Keep the partial-hit scenario isolated per iteration.

This subtest does not reliably stay in the advertised “products hit / inventory miss” state. The cache is preloaded once outside the loop and then reused for all 100 runs, and the inventory fetch also points at default. That means the inventory branch can observe the product cache entries immediately, and after the first iteration the remaining runs will trend toward hit+hit instead of hit+miss. Use a fresh product/inventory cache setup per iteration and give inventory its own cache namespace/object.

Also applies to: 217-217, 295-300, 327-327

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@v2/pkg/engine/resolve/loader_parallel_race_test.go` around lines 200 - 203,
The test currently reuses a single FakeLoaderCache across all loop iterations
and sets product entries once, causing later iterations to see stale hits;
change the subtest to allocate fresh caches per iteration (e.g., call
NewFakeLoaderCache() inside the loop) and preload only the product cache entries
into a product-specific cache instance via SetRawData, while creating a separate
inventory-specific cache instance (or namespace) for the inventory loader so
inventory misses remain isolated; update the inventory loader setup so it uses
the inventory cache instance instead of the shared `default` cache.


productCacheKeyTemplate := &EntityQueryCacheKeyTemplate{
Keys: NewResolvableObjectVariable(&Object{
Fields: []*Field{
{Name: []byte("__typename"), Value: &String{Path: []string{"__typename"}}},
{Name: []byte("id"), Value: &String{Path: []string{"id"}}},
},
}),
}

productsDS := &staticDataSource{data: []byte(`{"data":{"_entities":[{"__typename":"Product","id":"prod-1","name":"Widget"},{"__typename":"Product","id":"prod-2","name":"Gadget"}]}}`)}
inventoryDS := &staticDataSource{data: []byte(`{"data":{"_entities":[{"__typename":"Product","id":"prod-1","inStock":true},{"__typename":"Product","id":"prod-2","inStock":false}]}}`)}

for range 100 {
rootDS := &staticDataSource{data: []byte(`{"data":{"products":[{"__typename":"Product","id":"prod-1"},{"__typename":"Product","id":"prod-2"}]}}`)}

response := &GraphQLResponse{
Info: &GraphQLResponseInfo{OperationType: ast.OperationTypeQuery},
Fetches: Sequence(
SingleWithPath(&SingleFetch{
FetchConfiguration: FetchConfiguration{
DataSource: rootDS,
PostProcessing: PostProcessingConfiguration{SelectResponseDataPath: []string{"data"}},
},
InputTemplate: InputTemplate{
Segments: []TemplateSegment{{Data: []byte(`{"method":"POST","url":"http://products"}`), SegmentType: StaticSegmentType}},
},
}, "query"),
Parallel(
SingleWithPath(&BatchEntityFetch{
Input: BatchInput{
Header: InputTemplate{Segments: []TemplateSegment{{Data: []byte(`{"method":"POST","url":"http://products","body":{"query":"names","variables":{"representations":[`), SegmentType: StaticSegmentType}}},
Items: []InputTemplate{{Segments: []TemplateSegment{{
SegmentType: VariableSegmentType,
VariableKind: ResolvableObjectVariableKind,
Renderer: NewGraphQLVariableResolveRenderer(&Object{Fields: []*Field{
{Name: []byte("__typename"), Value: &String{Path: []string{"__typename"}}},
{Name: []byte("id"), Value: &String{Path: []string{"id"}}},
}}),
}}}},
Separator: InputTemplate{Segments: []TemplateSegment{{Data: []byte(`,`), SegmentType: StaticSegmentType}}},
Footer: InputTemplate{Segments: []TemplateSegment{{Data: []byte(`]}}}`), SegmentType: StaticSegmentType}}},
},
DataSource: productsDS,
PostProcessing: PostProcessingConfiguration{SelectResponseDataPath: []string{"data", "_entities"}},
Info: &FetchInfo{
DataSourceName: "products",
OperationType: ast.OperationTypeQuery,
RootFields: []GraphCoordinate{{TypeName: "Product"}},
ProvidesData: &Object{
Fields: []*Field{
{Name: []byte("id"), Value: &Scalar{Path: []string{"id"}}},
{Name: []byte("name"), Value: &Scalar{Path: []string{"name"}}},
},
},
},
Caching: FetchCacheConfiguration{
Enabled: true,
CacheName: "default",
CacheKeyTemplate: productCacheKeyTemplate,
UseL1Cache: true,
TTL: 60_000_000_000,
},
}, "query.products", ArrayPath("products")),
SingleWithPath(&BatchEntityFetch{
Input: BatchInput{
Header: InputTemplate{Segments: []TemplateSegment{{Data: []byte(`{"method":"POST","url":"http://inventory","body":{"query":"stock","variables":{"representations":[`), SegmentType: StaticSegmentType}}},
Items: []InputTemplate{{Segments: []TemplateSegment{{
SegmentType: VariableSegmentType,
VariableKind: ResolvableObjectVariableKind,
Renderer: NewGraphQLVariableResolveRenderer(&Object{Fields: []*Field{
{Name: []byte("__typename"), Value: &String{Path: []string{"__typename"}}},
{Name: []byte("id"), Value: &String{Path: []string{"id"}}},
}}),
}}}},
Separator: InputTemplate{Segments: []TemplateSegment{{Data: []byte(`,`), SegmentType: StaticSegmentType}}},
Footer: InputTemplate{Segments: []TemplateSegment{{Data: []byte(`]}}}`), SegmentType: StaticSegmentType}}},
},
DataSource: inventoryDS,
PostProcessing: PostProcessingConfiguration{SelectResponseDataPath: []string{"data", "_entities"}},
Info: &FetchInfo{
DataSourceName: "inventory",
OperationType: ast.OperationTypeQuery,
RootFields: []GraphCoordinate{{TypeName: "Product"}},
ProvidesData: &Object{
Fields: []*Field{
{Name: []byte("id"), Value: &Scalar{Path: []string{"id"}}},
{Name: []byte("inStock"), Value: &Scalar{Path: []string{"inStock"}}},
},
},
},
Caching: FetchCacheConfiguration{
Enabled: true,
CacheName: "default",
CacheKeyTemplate: productCacheKeyTemplate,
UseL1Cache: true,
TTL: 60_000_000_000,
},
}, "query.products", ArrayPath("products")),
),
),
Data: &Object{
Fields: []*Field{
{
Name: []byte("products"),
Value: &Array{
Path: []string{"products"},
Item: &Object{
Fields: []*Field{
{Name: []byte("id"), Value: &String{Path: []string{"id"}}},
{Name: []byte("name"), Value: &String{Path: []string{"name"}}},
{Name: []byte("inStock"), Value: &Boolean{Path: []string{"inStock"}}},
},
},
},
},
},
},
}

ar := arena.NewMonotonicArena(arena.WithMinBufferSize(1024))
loader := &Loader{
jsonArena: ar,
caches: map[string]LoaderCache{"default": cache},
entityCacheConfigs: map[string]map[string]*EntityCacheInvalidationConfig{},
}

ctx := NewContext(context.Background())
ctx.ExecutionOptions.DisableSubgraphRequestDeduplication = true
ctx.ExecutionOptions.Caching.EnableL1Cache = true
ctx.ExecutionOptions.Caching.EnableL2Cache = true

resolvable := NewResolvable(ar, ResolvableOptions{})
err := resolvable.Init(ctx, nil, ast.OperationTypeQuery)
require.NoError(t, err)

err = loader.LoadGraphQLResponseData(ctx, response, resolvable)
require.NoError(t, err)

out := fastjsonext.PrintGraphQLResponse(resolvable.data, resolvable.errors)
assert.Contains(t, out, `"id":"prod-1"`)
assert.Contains(t, out, `"id":"prod-2"`)

loader.Free()
ar.Reset()
}
})
}

// staticDataSource returns static data for every Load call. Thread-safe.
type staticDataSource struct {
data []byte
mu sync.Mutex
}

func (s *staticDataSource) Load(ctx context.Context, headers http.Header, input []byte) ([]byte, error) {
s.mu.Lock()
defer s.mu.Unlock()
out := make([]byte, len(s.data))
copy(out, s.data)
return out, nil
}

func (s *staticDataSource) LoadWithFiles(ctx context.Context, headers http.Header, input []byte, files []*httpclient.FileUpload) ([]byte, error) {
return s.Load(ctx, headers, input)
}
Loading