-
Notifications
You must be signed in to change notification settings - Fork 158
Expand file tree
/
Copy pathcache_analytics.go
More file actions
1017 lines (932 loc) · 32.5 KB
/
cache_analytics.go
File metadata and controls
1017 lines (932 loc) · 32.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
package resolve
import (
	"strings"
	"time"
	"unicode/utf8"

	"github.com/cespare/xxhash/v2"
	"github.com/wundergraph/astjson"
)
// CacheLevel indicates whether a cache operation targets L1 or L2.
type CacheLevel uint8

const (
	// CacheLevelL1 is the per-request cache layer.
	CacheLevelL1 CacheLevel = iota + 1
	// CacheLevelL2 is the external cache layer.
	CacheLevelL2
)

// CacheKeyEventKind classifies the result of a cache key lookup.
type CacheKeyEventKind uint8

const (
	CacheKeyHit        CacheKeyEventKind = iota + 1 // Key found with a usable value
	CacheKeyMiss                                    // Key not found or value nil
	CacheKeyPartialHit                              // Key found but missing required fields
)

// FieldSource indicates where the data for an entity came from.
type FieldSource uint8

const (
	FieldSourceSubgraph     FieldSource = iota // Default: data came from subgraph fetch
	FieldSourceL1                              // Data came from L1 (per-request) cache
	FieldSourceL2                              // Data came from L2 (external) cache
	FieldSourceShadowCached                    // Cached value saved during shadow comparison
)
// CacheKeyEvent records a single cache key lookup result.
type CacheKeyEvent struct {
	CacheKey   string            // cache key that was looked up
	EntityType string            // entity type the key belongs to
	Kind       CacheKeyEventKind // hit, miss, or partial hit
	DataSource string            // subgraph name
	ByteSize   int               // size of the cached value in bytes
	CacheAgeMs int64             // age of cached entry in ms (L2 hits only, 0 = unknown)
	Shadow     bool              // true if this event occurred in shadow mode
}

// CacheWriteEvent records a single cache write operation.
type CacheWriteEvent struct {
	CacheKey   string        // cache key that was written
	EntityType string        // entity type of the written value
	ByteSize   int           // size of the written value in bytes
	DataSource string        // subgraph name
	CacheLevel CacheLevel    // L1 or L2
	TTL        time.Duration // TTL the entry was written with
	Shadow     bool          // true if this write occurred in shadow mode
}
// FetchTimingEvent records the duration of a subgraph fetch or cache lookup.
type FetchTimingEvent struct {
	DataSource     string      // subgraph name
	EntityType     string      // entity type (empty for root fetches)
	DurationMs     int64       // time spent on this operation in milliseconds
	Source         FieldSource // what handled this: Subgraph (fetch), L2 (cache GET)
	ItemCount      int         // number of entities in this fetch/lookup
	IsEntityFetch  bool        // true for _entities, false for root field
	HTTPStatusCode int         // HTTP status code from subgraph response (0 for cache hits)
	ResponseBytes  int         // response body size in bytes (0 for cache hits)
	TTFBMs         int64       // time to first byte in milliseconds (0 when unavailable)
}

// SubgraphErrorEvent records a subgraph error for analytics.
type SubgraphErrorEvent struct {
	DataSource string // subgraph name
	EntityType string // entity type (empty for root fetches)
	Message    string // error message (truncated for safety)
	Code       string // error code from errors[0].extensions.code (empty if not present)
}
// EntityFieldHash stores an xxhash of a scalar field value on an entity type,
// along with the entity's key data and the source of the data.
type EntityFieldHash struct {
	EntityType string
	FieldName  string
	FieldHash  uint64      // xxhash of the non-key field value
	KeyRaw     string      // raw key JSON e.g. {"id":"1234"} (when HashKeys=false)
	KeyHash    uint64      // xxhash of key JSON (when HashKeys=true)
	Source     FieldSource // where the entity data came from (L1/L2/Subgraph)
}

// EntityTypeInfo holds the entity type name and its instance count.
type EntityTypeInfo struct {
	TypeName   string
	Count      int // total instances seen (duplicates included)
	UniqueKeys int // number of distinct entity keys
}

// entityCount is an internal type for accumulating entity counts.
type entityCount struct {
	typeName   string
	count      int
	uniqueKeys map[string]struct{} // set of seen entity key JSONs; nil until a key is tracked
}

// entitySourceRecord records where each entity's data came from.
type entitySourceRecord struct {
	entityType string
	keyJSON    string      // compact key JSON identifying the instance
	source     FieldSource // L1, L2, or Subgraph
}
// ShadowComparisonEvent records a comparison between cached and fresh data in shadow mode.
type ShadowComparisonEvent struct {
	CacheKey      string        // cache key for correlation
	EntityType    string        // entity type name
	IsFresh       bool          // true if ProvidesData fields match between cached and fresh
	CachedHash    uint64        // xxhash of extracted ProvidesData fields from cached value
	FreshHash     uint64        // xxhash of extracted ProvidesData fields from fresh value
	CachedBytes   int           // byte size of cached ProvidesData fields
	FreshBytes    int           // byte size of fresh ProvidesData fields
	DataSource    string        // which subgraph provided the data (e.g. "accounts")
	CacheAgeMs    int64         // how old the cached entry was in milliseconds (0 = unknown)
	ConfiguredTTL time.Duration // TTL configured for this entity type
}

// MutationEvent records that a mutation returned a cacheable entity.
// Recorded during mutation execution by proactively comparing the mutation response
// with the L2 cached value for the same entity.
type MutationEvent struct {
	MutationRootField string // e.g., "updateUsername"
	EntityType        string // e.g., "User"
	EntityCacheKey    string // display key e.g. {"__typename":"User","key":{"id":"1234"}}
	HadCachedValue    bool   // true if L2 had a cached value for this entity
	IsStale           bool   // true if cached value differs from mutation response (always false when HadCachedValue=false)
	CachedHash        uint64 // xxhash of cached ProvidesData fields (0 when HadCachedValue=false)
	FreshHash         uint64 // xxhash of mutation response ProvidesData fields
	CachedBytes       int    // 0 when HadCachedValue=false
	FreshBytes        int    // byte size of the mutation response ProvidesData fields
}

// CacheOperationError records a cache operation (Get/Set/Delete) that returned an error.
// Cache errors are non-fatal (the engine falls back to subgraph fetch), but tracking them
// in analytics allows operators to detect cache infrastructure issues.
type CacheOperationError struct {
	Operation  string // "get", "set", or "delete"
	CacheName  string // named cache instance
	EntityType string // entity type (empty for root fetches)
	DataSource string // subgraph name
	Message    string // error message (truncated for safety)
	ItemCount  int    // number of keys involved in the failed operation
}

// HeaderImpactEvent records a fresh fetch that wrote to L2 cache with header-prefixed keys.
// A cross-request consumer can aggregate these events: when the same BaseKey appears with
// different HeaderHash values but identical ResponseHash values, the forwarded headers
// do not affect the subgraph response, and IncludeSubgraphHeaderPrefix can be disabled.
type HeaderImpactEvent struct {
	BaseKey      string // cache key WITHOUT header prefix (stable identity for grouping)
	HeaderHash   uint64 // hash of forwarded headers for this subgraph
	ResponseHash uint64 // xxhash of the response value bytes written to L2
	EntityType   string // entity type (e.g., "User") or "Query" for root fields
	DataSource   string // subgraph name
}
// CacheAnalyticsCollector accumulates cache analytics events during request execution.
// All methods are designed to be called from a single goroutine (main thread) except
// where noted. L2 events from goroutines are accumulated on per-result slices and
// merged on the main thread via MergeL2Events.
type CacheAnalyticsCollector struct {
	l1KeyEvents   []CacheKeyEvent
	l2KeyEvents   []CacheKeyEvent
	writeEvents   []CacheWriteEvent
	fieldHashes   []EntityFieldHash    // flat slice (was: nested maps)
	entityCounts  []entityCount        // simple type→count (was: map)
	entitySources []entitySourceRecord // records where each entity's data came from
	fetchTimings  []FetchTimingEvent   // main thread timings
	errorEvents   []SubgraphErrorEvent // main thread errors
	// NOTE(review): the Merge* methods visible in this file append directly to
	// the main-thread slices above; the l2ErrorEvents, l2FetchTimings, and
	// l2CacheOpErrors staging fields below are not written anywhere in this
	// chunk — confirm they are populated elsewhere or remove them.
	l2ErrorEvents      []SubgraphErrorEvent    // accumulated in goroutines, merged on main thread
	l2FetchTimings     []FetchTimingEvent      // accumulated in goroutines, merged on main thread
	shadowComparisons  []ShadowComparisonEvent // shadow mode staleness comparison events
	mutationEvents     []MutationEvent         // mutation entity impact events
	headerImpactEvents []HeaderImpactEvent     // header impact events for L2 writes with header prefix
	cacheOpErrors      []CacheOperationError   // cache operation errors (main thread)
	l2CacheOpErrors    []CacheOperationError   // accumulated in goroutines, merged on main thread
	xxh                *xxhash.Digest          // reusable digest for field value hashing
}
// NewCacheAnalyticsCollector creates a new collector with pre-allocated slices
// sized for a typical request, so the common case appends without reallocating.
func NewCacheAnalyticsCollector() *CacheAnalyticsCollector {
	c := &CacheAnalyticsCollector{xxh: xxhash.New()}
	c.l1KeyEvents = make([]CacheKeyEvent, 0, 16)
	c.l2KeyEvents = make([]CacheKeyEvent, 0, 16)
	c.writeEvents = make([]CacheWriteEvent, 0, 8)
	c.fieldHashes = make([]EntityFieldHash, 0, 32)
	c.entityCounts = make([]entityCount, 0, 4)
	c.entitySources = make([]entitySourceRecord, 0, 16)
	c.fetchTimings = make([]FetchTimingEvent, 0, 8)
	c.errorEvents = make([]SubgraphErrorEvent, 0, 4)
	return c
}
// RecordL1KeyEvent records an L1 cache key lookup event. Main thread only.
func (c *CacheAnalyticsCollector) RecordL1KeyEvent(kind CacheKeyEventKind, entityType, cacheKey, dataSource string, byteSize int) {
	ev := CacheKeyEvent{
		CacheKey:   cacheKey,
		EntityType: entityType,
		Kind:       kind,
		DataSource: dataSource,
		ByteSize:   byteSize,
	}
	c.l1KeyEvents = append(c.l1KeyEvents, ev)
}

// RecordL2KeyEvent records an L2 cache key lookup event. Main thread only.
// Use MergeL2Events to merge events collected on per-result slices from goroutines.
func (c *CacheAnalyticsCollector) RecordL2KeyEvent(kind CacheKeyEventKind, entityType, cacheKey, dataSource string, byteSize int) {
	ev := CacheKeyEvent{
		CacheKey:   cacheKey,
		EntityType: entityType,
		Kind:       kind,
		DataSource: dataSource,
		ByteSize:   byteSize,
	}
	c.l2KeyEvents = append(c.l2KeyEvents, ev)
}
// MergeL2Events merges L2 events collected on a per-result slice (from goroutines)
// into the collector. Must be called on the main thread.
func (c *CacheAnalyticsCollector) MergeL2Events(events []CacheKeyEvent) {
	for _, ev := range events {
		c.l2KeyEvents = append(c.l2KeyEvents, ev)
	}
}

// RecordWrite records a cache write event. Main thread only.
func (c *CacheAnalyticsCollector) RecordWrite(cacheLevel CacheLevel, entityType, cacheKey, dataSource string, byteSize int, ttl time.Duration) {
	ev := CacheWriteEvent{
		CacheKey:   cacheKey,
		EntityType: entityType,
		ByteSize:   byteSize,
		DataSource: dataSource,
		CacheLevel: cacheLevel,
		TTL:        ttl,
	}
	c.writeEvents = append(c.writeEvents, ev)
}
// HashFieldValue computes an xxhash of the given field value bytes and records it
// as an EntityFieldHash with entity key and source information.
//
// Uses the stateless xxhash.Sum64 instead of the collector's shared digest:
// it produces the identical hash without mutating c.xxh, which removes the
// Reset/Write/Sum64 boilerplate and the latent hazard of two callers
// interleaving on the shared digest state.
func (c *CacheAnalyticsCollector) HashFieldValue(entityType, fieldName string, valueBytes []byte, keyRaw string, keyHash uint64, source FieldSource) {
	c.fieldHashes = append(c.fieldHashes, EntityFieldHash{
		EntityType: entityType,
		FieldName:  fieldName,
		FieldHash:  xxhash.Sum64(valueBytes),
		KeyRaw:     keyRaw,
		KeyHash:    keyHash,
		Source:     source,
	})
}
// IncrementEntityCount increments the instance count for the given entity type.
// If keyJSON is non-empty, it is tracked for unique key counting.
// Uses a linear scan over entityCounts; the slice is small (typically a handful
// of entity types per request).
func (c *CacheAnalyticsCollector) IncrementEntityCount(typeName string, keyJSON string) {
	for i := range c.entityCounts {
		ec := &c.entityCounts[i]
		if ec.typeName != typeName {
			continue
		}
		// Existing entry: bump the count and track the key lazily.
		ec.count++
		if keyJSON != "" {
			if ec.uniqueKeys == nil {
				ec.uniqueKeys = make(map[string]struct{}, 4)
			}
			ec.uniqueKeys[keyJSON] = struct{}{}
		}
		return
	}
	// First time we see this type: create a fresh entry.
	entry := entityCount{typeName: typeName, count: 1}
	if keyJSON != "" {
		entry.uniqueKeys = map[string]struct{}{keyJSON: {}}
	}
	c.entityCounts = append(c.entityCounts, entry)
}
// RecordEntitySource records the source of data for a specific entity instance.
// Main thread only.
func (c *CacheAnalyticsCollector) RecordEntitySource(entityType, keyJSON string, source FieldSource) {
	rec := entitySourceRecord{
		entityType: entityType,
		keyJSON:    keyJSON,
		source:     source,
	}
	c.entitySources = append(c.entitySources, rec)
}

// MergeEntitySources merges entity source records collected in goroutines
// into the collector. Must be called on the main thread.
func (c *CacheAnalyticsCollector) MergeEntitySources(sources []entitySourceRecord) {
	for _, rec := range sources {
		c.entitySources = append(c.entitySources, rec)
	}
}

// RecordFetchTiming records a fetch timing event. Main thread only.
func (c *CacheAnalyticsCollector) RecordFetchTiming(event FetchTimingEvent) {
	c.fetchTimings = append(c.fetchTimings, event)
}

// MergeL2FetchTimings merges fetch timing events collected in goroutines into the collector.
// Must be called on the main thread.
func (c *CacheAnalyticsCollector) MergeL2FetchTimings(timings []FetchTimingEvent) {
	for _, ft := range timings {
		c.fetchTimings = append(c.fetchTimings, ft)
	}
}
// RecordError records a subgraph error event. Main thread only.
func (c *CacheAnalyticsCollector) RecordError(event SubgraphErrorEvent) {
	c.errorEvents = append(c.errorEvents, event)
}

// MergeL2Errors merges error events collected in goroutines into the collector.
// Must be called on the main thread.
func (c *CacheAnalyticsCollector) MergeL2Errors(events []SubgraphErrorEvent) {
	for _, ev := range events {
		c.errorEvents = append(c.errorEvents, ev)
	}
}

// RecordShadowComparison records a shadow mode comparison between cached and fresh data.
// Main thread only.
func (c *CacheAnalyticsCollector) RecordShadowComparison(event ShadowComparisonEvent) {
	c.shadowComparisons = append(c.shadowComparisons, event)
}

// RecordMutationEvent records a mutation entity impact event. Main thread only.
func (c *CacheAnalyticsCollector) RecordMutationEvent(event MutationEvent) {
	c.mutationEvents = append(c.mutationEvents, event)
}

// RecordHeaderImpactEvent records a header impact event. Main thread only.
func (c *CacheAnalyticsCollector) RecordHeaderImpactEvent(event HeaderImpactEvent) {
	c.headerImpactEvents = append(c.headerImpactEvents, event)
}

// RecordCacheOperationError records a cache operation error. Main thread only.
func (c *CacheAnalyticsCollector) RecordCacheOperationError(event CacheOperationError) {
	c.cacheOpErrors = append(c.cacheOpErrors, event)
}

// MergeL2CacheOpErrors merges cache operation errors collected in goroutines into the collector.
// Must be called on the main thread.
func (c *CacheAnalyticsCollector) MergeL2CacheOpErrors(events []CacheOperationError) {
	for _, ev := range events {
		c.cacheOpErrors = append(c.cacheOpErrors, ev)
	}
}
// EntitySource returns the source for a given entity instance.
// Returns FieldSourceSubgraph if no record is found (the default).
// Scans newest-to-oldest so the most recently recorded source wins.
func (c *CacheAnalyticsCollector) EntitySource(entityType, keyJSON string) FieldSource {
	for i := len(c.entitySources) - 1; i >= 0; i-- {
		rec := &c.entitySources[i]
		if rec.entityType == entityType && rec.keyJSON == keyJSON {
			return rec.source
		}
	}
	return FieldSourceSubgraph
}
// Snapshot produces a read-only CacheAnalyticsSnapshot from the collected data.
// Duplicate events (same cache key appearing multiple times due to entity batch positions)
// are consolidated: consumers see one event per unique (CacheKey, Kind) for reads,
// one per CacheKey for writes, and one per CacheKey for shadow comparisons.
func (c *CacheAnalyticsCollector) Snapshot() CacheAnalyticsSnapshot {
	// Split write events by level before deduplicating each side.
	var l1Writes, l2Writes []CacheWriteEvent
	for _, we := range c.writeEvents {
		switch we.CacheLevel {
		case CacheLevelL1:
			l1Writes = append(l1Writes, we)
		case CacheLevelL2:
			l2Writes = append(l2Writes, we)
		}
	}

	// Flatten entityCounts into the exported EntityTypeInfo form.
	var entityTypes []EntityTypeInfo
	if len(c.entityCounts) > 0 {
		entityTypes = make([]EntityTypeInfo, len(c.entityCounts))
		for i := range c.entityCounts {
			entityTypes[i] = EntityTypeInfo{
				TypeName:   c.entityCounts[i].typeName,
				Count:      c.entityCounts[i].count,
				UniqueKeys: len(c.entityCounts[i].uniqueKeys),
			}
		}
	}

	return CacheAnalyticsSnapshot{
		L1Reads:            deduplicateKeyEvents(c.l1KeyEvents),
		L2Reads:            deduplicateKeyEvents(c.l2KeyEvents),
		L1Writes:           deduplicateWriteEvents(l1Writes),
		L2Writes:           deduplicateWriteEvents(l2Writes),
		FieldHashes:        c.fieldHashes,
		FetchTimings:       c.fetchTimings,
		ErrorEvents:        c.errorEvents,
		EntityTypes:        entityTypes,
		ShadowComparisons:  deduplicateShadowComparisons(c.shadowComparisons),
		MutationEvents:     c.mutationEvents,
		HeaderImpactEvents: deduplicateHeaderImpactEvents(c.headerImpactEvents),
		CacheOpErrors:      c.cacheOpErrors,
	}
}
// deduplicateKeyEvents removes duplicate cache key events, keeping the first
// occurrence for each (CacheKey, Kind) pair. This consolidates events where the
// same entity key appears multiple times in a batch (e.g., User 1234 referenced
// by two different reviews).
func deduplicateKeyEvents(events []CacheKeyEvent) []CacheKeyEvent {
	if len(events) == 0 {
		return events
	}
	type identity struct {
		cacheKey string
		kind     CacheKeyEventKind
	}
	visited := make(map[identity]bool, len(events))
	kept := make([]CacheKeyEvent, 0, len(events))
	for _, ev := range events {
		id := identity{cacheKey: ev.CacheKey, kind: ev.Kind}
		if visited[id] {
			continue
		}
		visited[id] = true
		kept = append(kept, ev)
	}
	return kept
}
// deduplicateWriteEvents removes duplicate cache write events, keeping the first
// occurrence for each CacheKey. Within a single cache level, the same key written
// multiple times (from batch positions referencing the same entity) is one operation.
func deduplicateWriteEvents(events []CacheWriteEvent) []CacheWriteEvent {
	if len(events) == 0 {
		return events
	}
	visited := make(map[string]bool, len(events))
	kept := make([]CacheWriteEvent, 0, len(events))
	for _, ev := range events {
		if visited[ev.CacheKey] {
			continue
		}
		visited[ev.CacheKey] = true
		kept = append(kept, ev)
	}
	return kept
}
// deduplicateShadowComparisons removes duplicate shadow comparison events,
// keeping the first occurrence for each CacheKey.
func deduplicateShadowComparisons(events []ShadowComparisonEvent) []ShadowComparisonEvent {
	if len(events) == 0 {
		return events
	}
	visited := make(map[string]bool, len(events))
	kept := make([]ShadowComparisonEvent, 0, len(events))
	for _, ev := range events {
		if visited[ev.CacheKey] {
			continue
		}
		visited[ev.CacheKey] = true
		kept = append(kept, ev)
	}
	return kept
}
// deduplicateHeaderImpactEvents removes duplicate header impact events,
// keeping the first occurrence for each unique event identity.
// HeaderImpactEvent is comparable, so the whole struct serves as the map key.
func deduplicateHeaderImpactEvents(events []HeaderImpactEvent) []HeaderImpactEvent {
	if len(events) == 0 {
		return events
	}
	visited := make(map[HeaderImpactEvent]bool, len(events))
	kept := make([]HeaderImpactEvent, 0, len(events))
	for _, ev := range events {
		if visited[ev] {
			continue
		}
		visited[ev] = true
		kept = append(kept, ev)
	}
	return kept
}
// CacheAnalyticsSnapshot is a read-only snapshot of cache analytics data.
// Requires EnableCacheAnalytics to be set; returns empty when disabled.
// Read and write slices are deduplicated by Snapshot before being placed here.
type CacheAnalyticsSnapshot struct {
	// Cache read events (nil when analytics disabled)
	L1Reads []CacheKeyEvent
	L2Reads []CacheKeyEvent
	// Cache write events, split by level
	L1Writes []CacheWriteEvent
	L2Writes []CacheWriteEvent
	// Fetch timing events
	FetchTimings []FetchTimingEvent
	// Subgraph error events
	ErrorEvents []SubgraphErrorEvent
	// Field value hashes: flat slice of EntityFieldHash
	FieldHashes []EntityFieldHash
	// Entity tracking: type + count inline
	EntityTypes []EntityTypeInfo
	// Shadow mode comparison events
	ShadowComparisons []ShadowComparisonEvent
	// Mutation entity impact events
	MutationEvents []MutationEvent
	// Header impact events (L2 writes with header-prefixed keys)
	HeaderImpactEvents []HeaderImpactEvent
	// Cache operation errors (Get/Set/Delete failures)
	CacheOpErrors []CacheOperationError
}
// L1HitRate returns the L1 cache hit rate as a float64 in [0, 1].
// Returns 0 if there are no L1 events.
func (s *CacheAnalyticsSnapshot) L1HitRate() float64 {
	if len(s.L1Reads) == 0 {
		return 0
	}
	var hits int64
	for i := range s.L1Reads {
		if s.L1Reads[i].Kind == CacheKeyHit {
			hits++
		}
	}
	return float64(hits) / float64(len(s.L1Reads))
}
// L2HitRate returns the L2 cache hit rate as a float64 in [0, 1].
// Returns 0 if there are no L2 events.
func (s *CacheAnalyticsSnapshot) L2HitRate() float64 {
	if len(s.L2Reads) == 0 {
		return 0
	}
	var hits int64
	for i := range s.L2Reads {
		if s.L2Reads[i].Kind == CacheKeyHit {
			hits++
		}
	}
	return float64(hits) / float64(len(s.L2Reads))
}
// CachedBytesServed returns the total bytes served from cache (L1 + L2 hits).
func (s *CacheAnalyticsSnapshot) CachedBytesServed() int64 {
	var served int64
	for _, reads := range [][]CacheKeyEvent{s.L1Reads, s.L2Reads} {
		for i := range reads {
			if reads[i].Kind == CacheKeyHit {
				served += int64(reads[i].ByteSize)
			}
		}
	}
	return served
}
// EntityTypeCacheStats holds per-entity-type cache statistics.
// Populated by EventsByEntityType; partial hits count toward both the
// corresponding misses counter and PartialHits.
type EntityTypeCacheStats struct {
	L1Hits       int64
	L1Misses     int64 // includes partial hits
	L2Hits       int64
	L2Misses     int64 // includes partial hits
	PartialHits  int64 // L1 + L2 partial hits combined
	BytesServed  int64 // bytes served from cache hits (L1 + L2)
	BytesWritten int64 // bytes written to cache (L1 + L2)
}
// EventsByEntityType returns cache statistics grouped by entity type.
// Partial hits are counted as misses and additionally tracked in PartialHits.
func (s *CacheAnalyticsSnapshot) EventsByEntityType() map[string]EntityTypeCacheStats {
	result := make(map[string]EntityTypeCacheStats)
	// update applies fn to the stats entry for key, writing the value back
	// (map values are not addressable, so read-modify-write is required).
	update := func(key string, fn func(*EntityTypeCacheStats)) {
		st := result[key]
		fn(&st)
		result[key] = st
	}
	for _, ev := range s.L1Reads {
		kind, size := ev.Kind, int64(ev.ByteSize)
		update(ev.EntityType, func(st *EntityTypeCacheStats) {
			switch kind {
			case CacheKeyHit:
				st.L1Hits++
				st.BytesServed += size
			case CacheKeyMiss:
				st.L1Misses++
			case CacheKeyPartialHit:
				st.L1Misses++
				st.PartialHits++
			}
		})
	}
	for _, ev := range s.L2Reads {
		kind, size := ev.Kind, int64(ev.ByteSize)
		update(ev.EntityType, func(st *EntityTypeCacheStats) {
			switch kind {
			case CacheKeyHit:
				st.L2Hits++
				st.BytesServed += size
			case CacheKeyMiss:
				st.L2Misses++
			case CacheKeyPartialHit:
				st.L2Misses++
				st.PartialHits++
			}
		})
	}
	for _, ev := range s.L1Writes {
		size := int64(ev.ByteSize)
		update(ev.EntityType, func(st *EntityTypeCacheStats) { st.BytesWritten += size })
	}
	for _, ev := range s.L2Writes {
		size := int64(ev.ByteSize)
		update(ev.EntityType, func(st *EntityTypeCacheStats) { st.BytesWritten += size })
	}
	return result
}
// DataSourceCacheStats holds per-data-source cache statistics.
// Populated by EventsByDataSource; partial hits are folded into the
// corresponding misses counter (no separate partial-hit field here).
type DataSourceCacheStats struct {
	L1Hits       int64
	L1Misses     int64 // includes partial hits
	L2Hits       int64
	L2Misses     int64 // includes partial hits
	BytesServed  int64 // bytes served from cache hits (L1 + L2)
	BytesWritten int64 // bytes written to cache (L1 + L2)
}
// EventsByDataSource returns cache statistics grouped by data source name.
// Partial hits are counted as misses (unlike EventsByEntityType, there is no
// separate partial-hit counter in DataSourceCacheStats).
func (s *CacheAnalyticsSnapshot) EventsByDataSource() map[string]DataSourceCacheStats {
	result := make(map[string]DataSourceCacheStats)
	for i := range s.L1Reads {
		ev := &s.L1Reads[i]
		stats := result[ev.DataSource]
		if ev.Kind == CacheKeyHit {
			stats.L1Hits++
			stats.BytesServed += int64(ev.ByteSize)
		} else {
			// CacheKeyMiss and CacheKeyPartialHit both count as misses.
			stats.L1Misses++
		}
		result[ev.DataSource] = stats
	}
	for i := range s.L2Reads {
		ev := &s.L2Reads[i]
		stats := result[ev.DataSource]
		if ev.Kind == CacheKeyHit {
			stats.L2Hits++
			stats.BytesServed += int64(ev.ByteSize)
		} else {
			stats.L2Misses++
		}
		result[ev.DataSource] = stats
	}
	for _, writes := range [][]CacheWriteEvent{s.L1Writes, s.L2Writes} {
		for i := range writes {
			stats := result[writes[i].DataSource]
			stats.BytesWritten += int64(writes[i].ByteSize)
			result[writes[i].DataSource] = stats
		}
	}
	return result
}
// SubgraphCallsAvoided returns the number of subgraph fetch operations
// that were avoided due to cache hits (L1 + L2).
func (s *CacheAnalyticsSnapshot) SubgraphCallsAvoided() int64 {
	var avoided int64
	for _, reads := range [][]CacheKeyEvent{s.L1Reads, s.L2Reads} {
		for i := range reads {
			if reads[i].Kind == CacheKeyHit {
				avoided++
			}
		}
	}
	return avoided
}
// PartialHitRate returns the fraction of cache lookups that were partial hits.
// Returns 0 if there are no cache events.
func (s *CacheAnalyticsSnapshot) PartialHitRate() float64 {
	total := int64(len(s.L1Reads) + len(s.L2Reads))
	if total == 0 {
		return 0
	}
	var partial int64
	for _, reads := range [][]CacheKeyEvent{s.L1Reads, s.L2Reads} {
		for i := range reads {
			if reads[i].Kind == CacheKeyPartialHit {
				partial++
			}
		}
	}
	return float64(partial) / float64(total)
}
// ErrorsByDataSource returns error counts grouped by data source name.
// Returns nil (not an empty map) when there are no error events.
func (s *CacheAnalyticsSnapshot) ErrorsByDataSource() map[string]int {
	if len(s.ErrorEvents) == 0 {
		return nil
	}
	counts := make(map[string]int, len(s.ErrorEvents))
	for i := range s.ErrorEvents {
		counts[s.ErrorEvents[i].DataSource]++
	}
	return counts
}
// ErrorRate returns the fraction of subgraph fetches that resulted in errors.
// Denominator is total subgraph fetches (FieldSourceSubgraph timings) + errors.
// Returns 0 if there are no fetches or errors.
func (s *CacheAnalyticsSnapshot) ErrorRate() float64 {
	numErrors := int64(len(s.ErrorEvents))
	if numErrors == 0 {
		return 0
	}
	var numFetches int64
	for i := range s.FetchTimings {
		if s.FetchTimings[i].Source == FieldSourceSubgraph {
			numFetches++
		}
	}
	// Denominator is always >= numErrors > 0 here, so no zero-division guard
	// is needed.
	return float64(numErrors) / float64(numFetches+numErrors)
}
// AvgFetchDurationMs returns the average fetch duration in milliseconds for the given data source.
// Only considers subgraph fetches (not cache lookups). Returns 0 if no fetches recorded.
func (s *CacheAnalyticsSnapshot) AvgFetchDurationMs(dataSource string) int64 {
	var sum, n int64
	for i := range s.FetchTimings {
		ft := &s.FetchTimings[i]
		if ft.DataSource != dataSource || ft.Source != FieldSourceSubgraph {
			continue
		}
		sum += ft.DurationMs
		n++
	}
	if n == 0 {
		return 0
	}
	// Integer division: truncates toward zero, matching the original behavior.
	return sum / n
}
// TotalTimeSavedMs estimates total time saved by cache hits in milliseconds.
// For each data source, multiplies the average fetch duration by the number of cache hits.
func (s *CacheAnalyticsSnapshot) TotalTimeSavedMs() int64 {
	// Per-datasource accumulator: total fetch duration, fetch count, hit count.
	type dsStats struct {
		totalDuration int64
		fetchCount    int64
		hitCount      int64
	}
	perSource := make(map[string]*dsStats)
	statsFor := func(name string) *dsStats {
		st := perSource[name]
		if st == nil {
			st = &dsStats{}
			perSource[name] = st
		}
		return st
	}

	// Accumulate real fetch durations per datasource.
	for i := range s.FetchTimings {
		ft := &s.FetchTimings[i]
		if ft.Source == FieldSourceSubgraph {
			st := statsFor(ft.DataSource)
			st.totalDuration += ft.DurationMs
			st.fetchCount++
		}
	}

	// Count cache hits per datasource from both read-event slices.
	for _, reads := range [][]CacheKeyEvent{s.L1Reads, s.L2Reads} {
		for i := range reads {
			if reads[i].Kind == CacheKeyHit {
				statsFor(reads[i].DataSource).hitCount++
			}
		}
	}

	// Estimate: avg fetch duration × hit count, per datasource.
	var saved int64
	for _, st := range perSource {
		if st.fetchCount > 0 && st.hitCount > 0 {
			saved += (st.totalDuration / st.fetchCount) * st.hitCount
		}
	}
	return saved
}
// AvgCacheAgeMs returns the average cache age in milliseconds for L2 hits of the given entity type.
// Only considers L2 hits with known age (CacheAgeMs > 0). Returns 0 if no data available.
// If entityType is empty, returns the average across all entity types.
func (s *CacheAnalyticsSnapshot) AvgCacheAgeMs(entityType string) int64 {
	var sum, n int64
	for i := range s.L2Reads {
		ev := &s.L2Reads[i]
		if ev.Kind != CacheKeyHit || ev.CacheAgeMs <= 0 {
			continue
		}
		if entityType != "" && ev.EntityType != entityType {
			continue
		}
		sum += ev.CacheAgeMs
		n++
	}
	if n == 0 {
		return 0
	}
	return sum / n
}
// MaxCacheAgeMs returns the maximum cache age in milliseconds across all L2 hits.
// Returns 0 if no L2 hits with known age exist.
func (s *CacheAnalyticsSnapshot) MaxCacheAgeMs() int64 {
	var oldest int64
	for i := range s.L2Reads {
		ev := &s.L2Reads[i]
		if ev.Kind == CacheKeyHit && ev.CacheAgeMs > oldest {
			oldest = ev.CacheAgeMs
		}
	}
	return oldest
}
// ShadowFreshnessRate returns the fraction of shadow cache hits where the cached data
// matched the fresh data (ProvidesData fields were identical).
// Returns 0.0 if there are no shadow comparisons.
func (s *CacheAnalyticsSnapshot) ShadowFreshnessRate() float64 {
	total := len(s.ShadowComparisons)
	if total == 0 {
		return 0
	}
	var freshCount int64
	for i := range s.ShadowComparisons {
		if s.ShadowComparisons[i].IsFresh {
			freshCount++
		}
	}
	return float64(freshCount) / float64(total)
}

// ShadowStaleCount returns the number of shadow comparisons where cached data was stale.
func (s *CacheAnalyticsSnapshot) ShadowStaleCount() int64 {
	var stale int64
	for i := range s.ShadowComparisons {
		if !s.ShadowComparisons[i].IsFresh {
			stale++
		}
	}
	return stale
}
// ShadowFreshnessRateByEntityType returns per-entity-type freshness rates.
// Returns nil if there are no shadow comparisons.
func (s *CacheAnalyticsSnapshot) ShadowFreshnessRateByEntityType() map[string]float64 {
	if len(s.ShadowComparisons) == 0 {
		return nil
	}
	// Tally totals and fresh counts per entity type in two plain maps.
	totals := make(map[string]int64)
	freshCounts := make(map[string]int64)
	for i := range s.ShadowComparisons {
		sc := &s.ShadowComparisons[i]
		totals[sc.EntityType]++
		if sc.IsFresh {
			freshCounts[sc.EntityType]++
		}
	}
	rates := make(map[string]float64, len(totals))
	for typeName, total := range totals {
		rates[typeName] = float64(freshCounts[typeName]) / float64(total)
	}
	return rates
}
// computeCacheAgeMs computes cache age in milliseconds from remaining TTL and original TTL.
// Returns 0 if either value is zero or if the computed age would be negative.
func computeCacheAgeMs(remainingTTL, originalTTL time.Duration) int64 {
	if originalTTL <= 0 || remainingTTL <= 0 {
		return 0
	}
	if elapsed := originalTTL - remainingTTL; elapsed > 0 {
		return elapsed.Milliseconds()
	}
	return 0
}
// truncateErrorMessage truncates an error message to at most maxLen bytes for
// analytics safety. The cut position is backed off to the nearest rune boundary
// so the result is always valid UTF-8 — a plain msg[:maxLen] could split a
// multi-byte rune and emit a broken trailing byte sequence. A non-positive
// maxLen yields an empty string (the original would panic on a negative slice
// bound).
func truncateErrorMessage(msg string, maxLen int) string {
	if len(msg) <= maxLen {
		return msg
	}
	if maxLen <= 0 {
		return ""
	}
	cut := maxLen
	// Step back over UTF-8 continuation bytes (0b10xxxxxx) so we never cut a
	// rune in half. At most 3 iterations for valid UTF-8.
	for cut > 0 && !utf8.RuneStart(msg[cut]) {
		cut--
	}
	return msg[:cut]
}
// buildEntityKeyJSON builds a compact JSON key from an entity's key field values.
// For @key(fields: "id") and value={"id":"1234","name":"Alice"}:
//
//	returns {"id":"1234"}
//
// For @key(fields: "id address { city }") and value={"id":"1234","address":{"city":"NYC","street":"Main"}}:
//
//	returns {"id":"1234","address":{"city":"NYC"}} (only key fields, not street)
//
// Returns nil when no key fields are configured.
func buildEntityKeyJSON(value *astjson.Value, keyFields []KeyField) []byte {
	if len(keyFields) == 0 {
		return nil
	}
	// 64 bytes covers typical single-scalar keys without reallocation.
	return appendKeyFieldsJSON(make([]byte, 0, 64), value, keyFields)
}
// appendKeyFieldsJSON appends a JSON object containing only the given key
// fields of value to buf, recursing into nested key selections. Fields absent
// from value are skipped entirely.
func appendKeyFieldsJSON(buf []byte, value *astjson.Value, keyFields []KeyField) []byte {
	buf = append(buf, '{')
	needComma := false
	for _, field := range keyFields {
		fieldValue := value.Get(field.Name)
		if fieldValue == nil {
			continue
		}
		if needComma {
			buf = append(buf, ',')
		}
		needComma = true
		buf = append(buf, '"')
		buf = append(buf, field.Name...)
		buf = append(buf, '"', ':')
		if len(field.Children) == 0 {
			// Scalar key: marshal the value directly.
			buf = fieldValue.MarshalTo(buf)
		} else {
			// Nested key: recursively extract only the selected key fields.
			buf = appendKeyFieldsJSON(buf, fieldValue, field.Children)
		}
	}
	return append(buf, '}')
}
// walkCachedResponseForSources walks a cached JSON value to find entity instances
// and accumulates their source records on a per-result slice (goroutine-safe).
// Arrays are recursed element by element; for objects, a key JSON is built from
// keyFields and, when non-empty, a record is appended to out. Scalar values and
// nil are ignored.
func walkCachedResponseForSources(value *astjson.Value, keyFields []KeyField, entityType string, source FieldSource, out *[]entitySourceRecord) {
	if value == nil {
		return
	}
	switch value.Type() {
	case astjson.TypeArray:
		// Recurse into each array element; entities may be nested in lists.
		for _, item := range value.GetArray() {
			walkCachedResponseForSources(item, keyFields, entityType, source, out)
		}
	case astjson.TypeObject:
		// Objects with a non-empty key JSON are treated as entity instances.
		keyJSON := buildEntityKeyJSON(value, keyFields)
		if len(keyJSON) > 0 {
			*out = append(*out, entitySourceRecord{
				entityType: entityType,
				keyJSON:    string(keyJSON),
				source:     source,
			})
		}
	}
}
// ParseKeyFields parses a selection set string into a structured KeyField tree.
// "id" → [{Name:"id"}]
// "id address { city country }" → [{Name:"id"}, {Name:"address", Children:[{Name:"city"}, {Name:"country"}]}]
// Tokenization is whitespace-based (strings.Fields), so braces must be
// whitespace-separated from field names. The returned position from the
// recursive parser is intentionally discarded at the top level.
func ParseKeyFields(selectionSet string) []KeyField {
	words := strings.Fields(selectionSet)
	fields, _ := parseKeyFieldsFromTokens(words, 0)
	return fields
}
func parseKeyFieldsFromTokens(tokens []string, pos int) ([]KeyField, int) {
var fields []KeyField
for pos < len(tokens) {
token := tokens[pos]
if token == "}" {