diff --git a/CHANGELOG.md b/CHANGELOG.md index 0848502ea00..708cbab8521 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -30,6 +30,7 @@ * [CHANGE] Query-frontend: Removed support for calculating 'cache-adjusted samples processed' query statistic. The `-query-frontend.cache-samples-processed-stats` CLI flag has been deprecated and will be removed in a future release. Setting it has now no effect. #13582 * [CHANGE] Querier: Renamed experimental flag `-querier.prefer-availability-zone` to `-querier.prefer-availability-zones` and changed it to accept a comma-separated list of availability zones. All zones in the list are given equal priority when querying ingesters and store-gateways. #13756 #13758 * [CHANGE] Ingester: Stabilize experimental flag `-ingest-storage.write-logs-fsync-before-kafka-commit-concurrency` to fsync write logs before the offset is committed to Kafka. Remove `-ingest-storage.write-logs-fsync-before-kafka-commit-enabled` since this is always enabled now. #13591 +* [CHANGE] Query-frontend: Enforce a limit on the size of responses returned by query-frontends. Defaults to 128MB and can be configured with `-query-frontend.max-response-size-bytes`. #13829 * [FEATURE] Distributor: add `-distributor.otel-label-name-underscore-sanitization` and `-distributor.otel-label-name-preserve-underscores` that control sanitization of underscores during OTLP translation. #13133 * [FEATURE] Query-frontends: Automatically adjust features used in query plans generated for remote execution based on what the available queriers support. #13017 #13164 #13544 * [FEATURE] Memberlist: Add experimental support for zone-aware routing in order to reduce memberlist cross-AZ data transfer. #13129 #13651 #13664 diff --git a/cmd/mimir/config-descriptor.json b/cmd/mimir/config-descriptor.json index 5c14e123a89..1c139eba96f 100644 --- a/cmd/mimir/config-descriptor.json +++ b/cmd/mimir/config-descriptor.json @@ -8199,6 +8199,17 @@ "fieldType": "boolean", "fieldCategory": "experimental" }, + { + "kind": "field", + "name": "max_response_size_bytes", + "required": false, + "desc": "Maximum allowed response size for query responses, in bytes.", + "fieldValue": null, + "fieldDefaultValue": 134217728, + "fieldFlag": "query-frontend.max-response-size-bytes", + "fieldType": "int", + "fieldCategory": "experimental" + }, { "kind": "field", "name": "extra_propagated_headers", diff --git a/cmd/mimir/help-all.txt.tmpl b/cmd/mimir/help-all.txt.tmpl index 9a66e0515cb..b4b57cd9b39 100644 --- a/cmd/mimir/help-all.txt.tmpl +++ b/cmd/mimir/help-all.txt.tmpl @@ -2427,6 +2427,8 @@ Usage of ./cmd/mimir/mimir: Maximum number of queriers that can handle requests for a single tenant. If set to 0 or value higher than number of available queriers, *all* queriers will handle requests for the tenant. Each frontend (or query-scheduler, if used) will select the same set of queriers for the same tenant (given that all queriers are connected to all frontends / query-schedulers). This option only works with queriers connecting to the query-frontend / query-scheduler, not when using downstream URL. -query-frontend.max-query-expression-size-bytes int Max size of the raw query, in bytes. This limit is enforced by the query-frontend for instant, range and remote read queries. 0 to not apply a limit to the size of the query. + -query-frontend.max-response-size-bytes uint + [experimental] Maximum allowed response size for query responses, in bytes. (default 134217728) -query-frontend.max-retries-per-request int Maximum number of retries for a single request; beyond this, the downstream error is returned. (default 5) -query-frontend.max-total-query-length duration diff --git a/docs/sources/mimir/configure/about-versioning.md b/docs/sources/mimir/configure/about-versioning.md index dc5908ebc21..2b151dc838e 100644 --- a/docs/sources/mimir/configure/about-versioning.md +++ b/docs/sources/mimir/configure/about-versioning.md @@ -214,6 +214,7 @@ The following features are currently experimental: - Rewriting of queries to optimize processing: `-query-frontend.rewrite-histogram-queries` and `-query-frontend.rewrite-propagate-matchers` - Enable experimental Prometheus extended range selector modifiers `smoothed` and `anchored` (`-query-frontend.enabled-promql-extended-range-selectors=smoothed,anchored`) - Experimental PromQL functions and aggregations, including `mad_over_time`, `ts_of_min_over_time`, `ts_of_max_over_time`, `ts_of_first_over_time`, `ts_of_last_over_time`, `sort_by_label`, `sort_by_label_desc`, `limitk` and `limit_ratio` (`-query-frontend.enabled-promql-experimental-functions=...`) + - Limiting response size for queries (`-query-frontend.max-response-size-bytes`) - Query-scheduler - `-query-scheduler.querier-forget-delay` diff --git a/docs/sources/mimir/configure/configuration-parameters/index.md b/docs/sources/mimir/configure/configuration-parameters/index.md index d635d345281..3b902e993f1 100644 --- a/docs/sources/mimir/configure/configuration-parameters/index.md +++ b/docs/sources/mimir/configure/configuration-parameters/index.md @@ -1996,6 +1996,10 @@ results_cache: # CLI flag: -query-frontend.use-active-series-decoder [use_active_series_decoder: | default = false] +# (experimental) Maximum allowed response size for query responses, in bytes. +# CLI flag: -query-frontend.max-response-size-bytes +[max_response_size_bytes: | default = 134217728] + # (advanced) Comma-separated list of request header names to allow to pass # through to the rest of the query path. This is in addition to a list of # required headers that the read path needs. diff --git a/go.mod b/go.mod index 34b9da23796..519f2a7ea51 100644 --- a/go.mod +++ b/go.mod @@ -387,3 +387,5 @@ replace github.com/prometheus/otlptranslator => github.com/grafana/mimir-otlptra // Use a fork of client_golang with changes from: // - https://github.com/prometheus/client_golang/pull/1925 replace github.com/prometheus/client_golang => github.com/colega/prometheus-client_golang v1.19.1-0.20251204143415-11cda2079634 + +replace github.com/json-iterator/go => github.com/charleskorn/json-iterator-go v0.0.0-20251215054129-8df468a86247 diff --git a/go.sum b/go.sum index 62a74db5bc7..de1ccd97d59 100644 --- a/go.sum +++ b/go.sum @@ -213,6 +213,8 @@ github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UF github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/charleskorn/go-grpc v0.0.0-20231024023642-e9298576254f h1:P1GSPnbxmhUafKGBcaY2qqx34mBdC4GVDm/RN3iKKuE= github.com/charleskorn/go-grpc v0.0.0-20231024023642-e9298576254f/go.mod h1:DYR5Eij8rJl8h7gblRrOZ8g0kW1umSpKqYIBTgeDtLo= +github.com/charleskorn/json-iterator-go v0.0.0-20251215054129-8df468a86247 h1:m/mfd1NT13+CjBba/8U0Lm2X8kk/p+1wK/Bv2iPf7yE= +github.com/charleskorn/json-iterator-go v0.0.0-20251215054129-8df468a86247/go.mod h1:TBzl5BIHNXfS9+C35ZyJaklL7mLDbgUkcgXzSLa8Tk0= github.com/chromedp/cdproto v0.0.0-20210526005521-9e51b9051fd0/go.mod h1:At5TxYYdxkbQL0TSefRjhLE3Q0lgvqKKMSFUglJ7i1U= github.com/chromedp/cdproto v0.0.0-20210706234513-2bc298e8be7f/go.mod h1:At5TxYYdxkbQL0TSefRjhLE3Q0lgvqKKMSFUglJ7i1U= github.com/chromedp/cdproto v0.0.0-20230802225258-3cf4e6d46a89 h1:aPflPkRFkVwbW6dmcVqfgwp1i+UWGFH6VgR1Jim5Ygc= @@ -723,9 +725,6 @@ github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8Hm github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y= github.com/jpillora/backoff v1.0.0 h1:uvFg412JmmHBHw7iwprIxkPMI+sGQ4kzOWsMeHnm2EA= github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4= -github.com/json-iterator/go v1.1.11/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= -github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= -github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU= github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk= github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w= @@ -833,7 +832,6 @@ github.com/moby/docker-image-spec v1.3.1/go.mod h1:eKmb5VW8vQEh/BAr2yvVNvuiJuY6U github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= -github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/modern-go/reflect2 v1.0.3-0.20250322232337-35a7c28c31ee h1:W5t00kpgFdJifH4BDsTlE89Zl93FEloxaWZfGcifgq8= diff --git a/operations/mimir/mimir-flags-defaults.json b/operations/mimir/mimir-flags-defaults.json index 2b9c84f49ef..a464e2e4351 100644 --- a/operations/mimir/mimir-flags-defaults.json +++ b/operations/mimir/mimir-flags-defaults.json @@ -557,6 +557,7 @@ "query-frontend.max-retries-per-request": 5, "query-frontend.not-running-timeout": 2000000000, "query-frontend.query-sharding-target-series-per-shard": 0, + "query-frontend.max-response-size-bytes": 134217728, "query-frontend.extra-propagated-headers": "", "query-frontend.query-result-response-format": "protobuf", "query-frontend.client-cluster-validation.label": "", diff --git a/pkg/frontend/querymiddleware/codec.go b/pkg/frontend/querymiddleware/codec.go index 196f481912a..5516e6f45de 100644 --- a/pkg/frontend/querymiddleware/codec.go +++ b/pkg/frontend/querymiddleware/codec.go @@ -43,6 +43,7 @@ import ( "github.com/grafana/mimir/pkg/streamingpromql/compat" "github.com/grafana/mimir/pkg/util" "github.com/grafana/mimir/pkg/util/chunkinfologger" + "github.com/grafana/mimir/pkg/util/globalerror" "github.com/grafana/mimir/pkg/util/propagation" "github.com/grafana/mimir/pkg/util/spanlogger" ) @@ -54,6 +55,8 @@ var ( errRequestNoQuery = apierror.New(apierror.TypeBadData, "the request has no query") allFormats = []string{formatJSON, formatProtobuf} + responseSizeTooLargeErrorFormat = globalerror.MaxResponseSizeBytes.MessageWithPerInstanceLimitConfig("the query response exceeded the maximum allowed size (limit: %d bytes)", maxResponseSizeBytesFlag) + // List of HTTP headers to propagate when a Prometheus request is encoded into a HTTP request. // Read consistency level and max delay headers are propagated as HTTP header -> Request.Context -> Request.Header, so there's no need to explicitly propagate it here. codecPropagateHeadersMetrics = []string{compat.ForceFallbackHeaderName, chunkinfologger.ChunkInfoLoggingHeader, api.ReadConsistencyOffsetsHeader, querier.FilterQueryablesHeader} @@ -226,6 +229,7 @@ type Codec struct { lookbackDelta time.Duration preferredQueryResultResponseFormat string propagateHeadersMetrics, propagateHeadersLabels []string + formats []formatter injector propagation.Injector } @@ -240,21 +244,21 @@ type formatter interface { ContentType() v1.MIMEType } -var jsonFormatterInstance = jsonFormatter{} - -var knownFormats = []formatter{ - jsonFormatterInstance, - ProtobufFormatter{}, -} - func NewCodec( registerer prometheus.Registerer, lookbackDelta time.Duration, queryResultResponseFormat string, propagateHeaders []string, injector propagation.Injector, + maxResponseSizeBytes uint64, ) Codec { return Codec{ + formats: []formatter{ + newJSONFormatter(maxResponseSizeBytes), + ProtobufFormatter{ + maxEncodedSize: maxResponseSizeBytes, + }, + }, metrics: newCodecMetrics(registerer), lookbackDelta: lookbackDelta, preferredQueryResultResponseFormat: queryResultResponseFormat, @@ -929,7 +933,7 @@ func (c Codec) DecodeMetricsQueryResponse(ctx context.Context, r *http.Response, } } - formatter := findFormatter(contentType) + formatter := c.findFormatter(contentType) if formatter == nil { return nil, apierror.Newf(apierror.TypeInternal, "unknown response content type '%v'", contentType) } @@ -990,7 +994,7 @@ func (c Codec) DecodeLabelsSeriesQueryResponse(ctx context.Context, r *http.Resp } } - formatter := findFormatter(contentType) + formatter := c.findFormatter(contentType) if formatter == nil { return nil, apierror.Newf(apierror.TypeInternal, "unknown response content type '%v'", contentType) } @@ -1042,8 +1046,8 @@ func (c Codec) DecodeLabelsSeriesQueryResponse(ctx context.Context, r *http.Resp return response, nil } -func findFormatter(contentType string) formatter { - for _, f := range knownFormats { +func (c Codec) findFormatter(contentType string) formatter { + for _, f := range c.formats { if f.ContentType().String() == contentType { return f } @@ -1073,7 +1077,8 @@ func (c Codec) EncodeMetricsQueryResponse(ctx context.Context, req *http.Request start := time.Now() b, err := formatter.EncodeQueryResponse(a) if err != nil { - return nil, apierror.Newf(apierror.TypeInternal, "error encoding response: %v", err) + typ := apierror.TypeForError(err, apierror.TypeInternal) + return nil, apierror.Newf(typ, "error encoding response: %v", err) } encodeDuration := time.Since(start) @@ -1138,7 +1143,8 @@ func (c Codec) EncodeLabelsSeriesQueryResponse(ctx context.Context, req *http.Re var err error b, err = formatter.EncodeLabelsResponse(a) if err != nil { - return nil, apierror.Newf(apierror.TypeInternal, "error encoding response: %v", err) + typ := apierror.TypeForError(err, apierror.TypeInternal) + return nil, apierror.Newf(typ, "error encoding response: %v", err) } case true: a, ok := res.(*PrometheusSeriesResponse) @@ -1153,7 +1159,8 @@ func (c Codec) EncodeLabelsSeriesQueryResponse(ctx context.Context, req *http.Re var err error b, err = formatter.EncodeSeriesResponse(a) if err != nil { - return nil, apierror.Newf(apierror.TypeInternal, "error encoding response: %v", err) + typ := apierror.TypeForError(err, apierror.TypeInternal) + return nil, apierror.Newf(typ, "error encoding response: %v", err) } } @@ -1172,13 +1179,13 @@ func (c Codec) EncodeLabelsSeriesQueryResponse(ctx context.Context, req *http.Re return &resp, nil } -func (Codec) negotiateContentType(acceptHeader string) (string, formatter) { +func (c Codec) negotiateContentType(acceptHeader string) (string, formatter) { if acceptHeader == "" { - return jsonMimeType, jsonFormatterInstance + return c.formats[0].ContentType().String(), c.formats[0] } for _, clause := range goautoneg.ParseAccept(acceptHeader) { - for _, formatter := range knownFormats { + for _, formatter := range c.formats { if formatter.ContentType().Satisfies(clause) { return formatter.ContentType().String(), formatter } diff --git a/pkg/frontend/querymiddleware/codec_json.go b/pkg/frontend/querymiddleware/codec_json.go index 4a2c38ae739..c995a01cd79 100644 --- a/pkg/frontend/querymiddleware/codec_json.go +++ b/pkg/frontend/querymiddleware/codec_json.go @@ -6,15 +6,33 @@ package querymiddleware import ( + "errors" + + jsoniter "github.com/json-iterator/go" v1 "github.com/prometheus/prometheus/web/api/v1" + + apierror "github.com/grafana/mimir/pkg/api/error" ) const jsonMimeType = "application/json" -type jsonFormatter struct{} +type jsonFormatter struct { + encoder jsoniter.API +} + +func newJSONFormatter(maxEncodedSize uint64) jsonFormatter { + cfg := jsoniter.Config{ + EscapeHTML: false, // No HTML in our responses. + SortMapKeys: true, + ValidateJsonRawMessage: true, + MaxMarshalledBytes: maxEncodedSize, + } + + return jsonFormatter{encoder: cfg.Froze()} +} func (j jsonFormatter) EncodeQueryResponse(resp *PrometheusResponse) ([]byte, error) { - return json.Marshal(resp) + return j.marshal(resp) } func (j jsonFormatter) DecodeQueryResponse(buf []byte) (*PrometheusResponse, error) { @@ -28,7 +46,7 @@ func (j jsonFormatter) DecodeQueryResponse(buf []byte) (*PrometheusResponse, err } func (j jsonFormatter) EncodeLabelsResponse(resp *PrometheusLabelsResponse) ([]byte, error) { - return json.Marshal(resp) + return j.marshal(resp) } func (j jsonFormatter) DecodeLabelsResponse(buf []byte) (*PrometheusLabelsResponse, error) { @@ -42,7 +60,7 @@ func (j jsonFormatter) DecodeLabelsResponse(buf []byte) (*PrometheusLabelsRespon } func (j jsonFormatter) EncodeSeriesResponse(resp *PrometheusSeriesResponse) ([]byte, error) { - return json.Marshal(resp) + return j.marshal(resp) } func (j jsonFormatter) DecodeSeriesResponse(buf []byte) (*PrometheusSeriesResponse, error) { @@ -62,3 +80,19 @@ func (j jsonFormatter) Name() string { func (j jsonFormatter) ContentType() v1.MIMEType { return v1.MIMEType{Type: "application", SubType: "json"} } + +func (j jsonFormatter) marshal(v interface{}) ([]byte, error) { + b, err := j.encoder.Marshal(v) + if err != nil { + var limitErr jsoniter.ExceededMaxMarshalledBytesError + + if errors.As(err, &limitErr) { + return nil, apierror.Newf(apierror.TypeTooLargeEntry, "JSON response is too large: "+responseSizeTooLargeErrorFormat, limitErr.MaxMarshalledBytes) + } + + return nil, err + } + + return b, nil + +} diff --git a/pkg/frontend/querymiddleware/codec_json_test.go b/pkg/frontend/querymiddleware/codec_json_test.go index 3c70664a756..8d618336f73 100644 --- a/pkg/frontend/querymiddleware/codec_json_test.go +++ b/pkg/frontend/querymiddleware/codec_json_test.go @@ -10,6 +10,7 @@ import ( "bytes" "context" "io" + "math" "net/http" "reflect" "testing" @@ -167,7 +168,7 @@ func TestCodec_JSONResponse_Metrics(t *testing.T) { } { t.Run(tc.name, func(t *testing.T) { reg := prometheus.NewPedanticRegistry() - codec := NewCodec(reg, 0*time.Minute, formatJSON, nil, &propagation.NoopInjector{}) + codec := NewCodec(reg, 0*time.Minute, formatJSON, nil, &propagation.NoopInjector{}, math.MaxUint64) body, err := json.Marshal(tc.resp) require.NoError(t, err) @@ -479,7 +480,7 @@ func TestCodec_JSONEncoding_Metrics(t *testing.T) { } { t.Run(tc.name, func(t *testing.T) { reg := prometheus.NewPedanticRegistry() - codec := NewCodec(reg, 0*time.Minute, formatJSON, nil, &propagation.NoopInjector{}) + codec := NewCodec(reg, 0*time.Minute, formatJSON, nil, &propagation.NoopInjector{}, math.MaxUint64) httpRequest := &http.Request{ Header: http.Header{"Accept": []string{jsonMimeType}}, } @@ -508,7 +509,69 @@ func TestCodec_JSONEncoding_Metrics(t *testing.T) { } } -func TestCodec_JSONEncoding_Labels(t *testing.T) { +func TestCodec_JSONEncoding_Metrics_SizeLimit(t *testing.T) { + resp := &PrometheusResponse{ + Status: statusSuccess, + Data: &PrometheusData{ + ResultType: model.ValString.String(), + Result: []SampleStream{ + { + Labels: []mimirpb.LabelAdapter{{Name: "value", Value: "foo"}}, + Samples: []mimirpb.Sample{{TimestampMs: 1_500}}, + }, + }, + }, + Headers: expectedProtobufResponseHeaders, + } + + expectedPayload, err := json.Marshal(resp) + require.NoError(t, err) + + encodeWithLimit := func(t *testing.T, limit uint64) ([]byte, error) { + reg := prometheus.NewPedanticRegistry() + codec := NewCodec(reg, 0*time.Minute, formatProtobuf, nil, &propagation.NoopInjector{}, limit) + + httpRequest := &http.Request{ + Header: http.Header{"Accept": []string{jsonMimeType}}, + } + + httpResponse, err := codec.EncodeMetricsQueryResponse(context.Background(), httpRequest, resp) + if err != nil { + return nil, err + } + + require.Equal(t, http.StatusOK, httpResponse.StatusCode) + require.Equal(t, jsonMimeType, httpResponse.Header.Get("Content-Type")) + + body, err := io.ReadAll(httpResponse.Body) + require.NoError(t, err) + + return body, nil + } + + t.Run("payload too large for limit", func(t *testing.T) { + limit := uint64(len(expectedPayload) - 1) + body, err := encodeWithLimit(t, limit) + require.Equal(t, apierror.Newf(apierror.TypeTooLargeEntry, "error encoding response: JSON response is too large: the query response exceeded the maximum allowed size (limit: %d bytes) (err-mimir-max-response-size-bytes). To adjust the related limit, configure -query-frontend.max-response-size-bytes, or contact your service administrator.", limit), err) + require.Nil(t, body) + }) + + t.Run("payload exactly same size as limit", func(t *testing.T) { + limit := uint64(len(expectedPayload)) + body, err := encodeWithLimit(t, limit) + require.NoError(t, err) + require.Equal(t, expectedPayload, body) + }) + + t.Run("payload smaller than limit", func(t *testing.T) { + limit := uint64(len(expectedPayload) + 1) + body, err := encodeWithLimit(t, limit) + require.NoError(t, err) + require.Equal(t, expectedPayload, body) + }) +} + +func TestCodec_JSONEncoding_LabelsAndSeries(t *testing.T) { for _, tc := range []struct { name string expectedJSON string @@ -580,3 +643,121 @@ func TestCodec_JSONEncoding_Labels(t *testing.T) { }) } } + +func TestCodec_JSONEncoding_Labels_SizeLimit(t *testing.T) { + resp := &PrometheusLabelsResponse{ + Status: statusSuccess, + Data: []string{ + "foo", + "bar", + }, + } + + expectedPayload, err := json.Marshal(resp) + require.NoError(t, err) + + encodeWithLimit := func(t *testing.T, limit uint64) ([]byte, error) { + reg := prometheus.NewPedanticRegistry() + codec := NewCodec(reg, 0*time.Minute, formatProtobuf, nil, &propagation.NoopInjector{}, limit) + + httpRequest := &http.Request{ + Header: http.Header{"Accept": []string{jsonMimeType}}, + } + + httpResponse, err := codec.EncodeLabelsSeriesQueryResponse(context.Background(), httpRequest, resp, false) + if err != nil { + return nil, err + } + + require.Equal(t, http.StatusOK, httpResponse.StatusCode) + require.Equal(t, jsonMimeType, httpResponse.Header.Get("Content-Type")) + + body, err := io.ReadAll(httpResponse.Body) + require.NoError(t, err) + + return body, nil + } + + t.Run("payload too large for limit", func(t *testing.T) { + limit := uint64(len(expectedPayload) - 1) + body, err := encodeWithLimit(t, limit) + require.Equal(t, apierror.Newf(apierror.TypeTooLargeEntry, "error encoding response: JSON response is too large: the query response exceeded the maximum allowed size (limit: %d bytes) (err-mimir-max-response-size-bytes). To adjust the related limit, configure -query-frontend.max-response-size-bytes, or contact your service administrator.", limit), err) + require.Nil(t, body) + }) + + t.Run("payload exactly same size as limit", func(t *testing.T) { + limit := uint64(len(expectedPayload)) + body, err := encodeWithLimit(t, limit) + require.NoError(t, err) + require.Equal(t, expectedPayload, body) + }) + + t.Run("payload smaller than limit", func(t *testing.T) { + limit := uint64(len(expectedPayload) + 1) + body, err := encodeWithLimit(t, limit) + require.NoError(t, err) + require.Equal(t, expectedPayload, body) + }) +} + +func TestCodec_JSONEncoding_Series_SizeLimit(t *testing.T) { + resp := &PrometheusSeriesResponse{ + Status: statusSuccess, + Data: []SeriesData{ + { + "__name__": "series_1", + "foo": "bar", + }, + { + "__name__": "hist_series_1", + "hoo": "hbar", + }, + }, + } + + expectedPayload, err := json.Marshal(resp) + require.NoError(t, err) + + encodeWithLimit := func(t *testing.T, limit uint64) ([]byte, error) { + reg := prometheus.NewPedanticRegistry() + codec := NewCodec(reg, 0*time.Minute, formatProtobuf, nil, &propagation.NoopInjector{}, limit) + + httpRequest := &http.Request{ + Header: http.Header{"Accept": []string{jsonMimeType}}, + } + + httpResponse, err := codec.EncodeLabelsSeriesQueryResponse(context.Background(), httpRequest, resp, true) + if err != nil { + return nil, err + } + + require.Equal(t, http.StatusOK, httpResponse.StatusCode) + require.Equal(t, jsonMimeType, httpResponse.Header.Get("Content-Type")) + + body, err := io.ReadAll(httpResponse.Body) + require.NoError(t, err) + + return body, nil + } + + t.Run("payload too large for limit", func(t *testing.T) { + limit := uint64(len(expectedPayload) - 1) + body, err := encodeWithLimit(t, limit) + require.Equal(t, apierror.Newf(apierror.TypeTooLargeEntry, "error encoding response: JSON response is too large: the query response exceeded the maximum allowed size (limit: %d bytes) (err-mimir-max-response-size-bytes). To adjust the related limit, configure -query-frontend.max-response-size-bytes, or contact your service administrator.", limit), err) + require.Nil(t, body) + }) + + t.Run("payload exactly same size as limit", func(t *testing.T) { + limit := uint64(len(expectedPayload)) + body, err := encodeWithLimit(t, limit) + require.NoError(t, err) + require.Equal(t, expectedPayload, body) + }) + + t.Run("payload smaller than limit", func(t *testing.T) { + limit := uint64(len(expectedPayload) + 1) + body, err := encodeWithLimit(t, limit) + require.NoError(t, err) + require.Equal(t, expectedPayload, body) + }) +} diff --git a/pkg/frontend/querymiddleware/codec_protobuf.go b/pkg/frontend/querymiddleware/codec_protobuf.go index edb42e831a9..b3cd4be04ca 100644 --- a/pkg/frontend/querymiddleware/codec_protobuf.go +++ b/pkg/frontend/querymiddleware/codec_protobuf.go @@ -9,10 +9,13 @@ import ( "github.com/prometheus/common/model" v1 "github.com/prometheus/prometheus/web/api/v1" + apierror "github.com/grafana/mimir/pkg/api/error" "github.com/grafana/mimir/pkg/mimirpb" ) -type ProtobufFormatter struct{} +type ProtobufFormatter struct { + maxEncodedSize uint64 +} func (f ProtobufFormatter) Name() string { return formatProtobuf @@ -76,7 +79,20 @@ func (f ProtobufFormatter) EncodeQueryResponse(resp *PrometheusResponse) ([]byte } } - return payload.Marshal() + size := payload.Size() + if uint64(size) > f.maxEncodedSize { + return nil, apierror.Newf(apierror.TypeTooLargeEntry, "Protobuf response (%d bytes) is too large: "+responseSizeTooLargeErrorFormat, size, f.maxEncodedSize) + } + + // The code below is based on the autogenerated Marshal(), but takes advantage of the fact + // we've already called Size() above. + data := make([]byte, size) + n, err := payload.MarshalToSizedBuffer(data[:size]) + if err != nil { + return nil, err + } + + return data[:n], nil } func (ProtobufFormatter) encodeStringData(data []SampleStream) (mimirpb.StringData, error) { diff --git a/pkg/frontend/querymiddleware/codec_protobuf_test.go b/pkg/frontend/querymiddleware/codec_protobuf_test.go index 4357abc4696..db19ec48310 100644 --- a/pkg/frontend/querymiddleware/codec_protobuf_test.go +++ b/pkg/frontend/querymiddleware/codec_protobuf_test.go @@ -6,6 +6,7 @@ import ( "bytes" "context" "io" + "math" "net/http" "testing" "time" @@ -634,7 +635,7 @@ func TestProtobufFormat_DecodeResponse(t *testing.T) { for _, tc := range protobufCodecScenarios { t.Run(tc.name, func(t *testing.T) { reg := prometheus.NewPedanticRegistry() - codec := NewCodec(reg, 0*time.Minute, formatProtobuf, nil, &propagation.NoopInjector{}) + codec := NewCodec(reg, 0*time.Minute, formatProtobuf, nil, &propagation.NoopInjector{}, math.MaxUint64) body, err := tc.payload.Marshal() require.NoError(t, err) @@ -675,7 +676,7 @@ func TestProtobufFormat_EncodeResponse(t *testing.T) { t.Run(tc.name, func(t *testing.T) { reg := prometheus.NewPedanticRegistry() - codec := NewCodec(reg, 0*time.Minute, formatProtobuf, nil, &propagation.NoopInjector{}) + codec := NewCodec(reg, 0*time.Minute, formatProtobuf, nil, &propagation.NoopInjector{}, math.MaxUint64) expectedBodyBytes, err := tc.payload.Marshal() require.NoError(t, err) @@ -712,6 +713,62 @@ func TestProtobufFormat_EncodeResponse(t *testing.T) { } } +func TestProtobufFormat_EncodeResponse_SizeLimit(t *testing.T) { + for _, tc := range protobufCodecScenarios { + if tc.response == nil { + continue + } + + t.Run(tc.name, func(t *testing.T) { + encodeWithLimit := func(t *testing.T, limit uint64) ([]byte, error) { + reg := prometheus.NewPedanticRegistry() + codec := NewCodec(reg, 0*time.Minute, formatProtobuf, nil, &propagation.NoopInjector{}, limit) + + httpRequest := &http.Request{ + Header: http.Header{"Accept": []string{mimirpb.QueryResponseMimeType}}, + } + + httpResponse, err := codec.EncodeMetricsQueryResponse(context.Background(), httpRequest, tc.response) + if err != nil { + return nil, err + } + + require.Equal(t, http.StatusOK, httpResponse.StatusCode) + require.Equal(t, mimirpb.QueryResponseMimeType, httpResponse.Header.Get("Content-Type")) + + body, err := io.ReadAll(httpResponse.Body) + require.NoError(t, err) + + return body, nil + } + + expectedPayload, err := tc.payload.Marshal() + require.NoError(t, err) + + t.Run("payload too large for limit", func(t *testing.T) { + limit := uint64(len(expectedPayload) - 1) + body, err := encodeWithLimit(t, limit) + require.Equal(t, apierror.Newf(apierror.TypeTooLargeEntry, "error encoding response: Protobuf response (%d bytes) is too large: the query response exceeded the maximum allowed size (limit: %d bytes) (err-mimir-max-response-size-bytes). To adjust the related limit, configure -query-frontend.max-response-size-bytes, or contact your service administrator.", len(expectedPayload), limit), err) + require.Nil(t, body) + }) + + t.Run("payload exactly same size as limit", func(t *testing.T) { + limit := uint64(len(expectedPayload)) + body, err := encodeWithLimit(t, limit) + require.NoError(t, err) + require.Equal(t, expectedPayload, body) + }) + + t.Run("payload smaller than limit", func(t *testing.T) { + limit := uint64(len(expectedPayload) + 1) + body, err := encodeWithLimit(t, limit) + require.NoError(t, err) + require.Equal(t, expectedPayload, body) + }) + }) + } +} + func BenchmarkProtobufFormat_DecodeResponse(b *testing.B) { headers := http.Header{"Content-Type": []string{mimirpb.QueryResponseMimeType}} codec := newTestCodecWithFormat(formatProtobuf) diff --git a/pkg/frontend/querymiddleware/codec_test.go b/pkg/frontend/querymiddleware/codec_test.go index 35014e86b25..3ec07d5a35c 100644 --- a/pkg/frontend/querymiddleware/codec_test.go +++ b/pkg/frontend/querymiddleware/codec_test.go @@ -11,6 +11,7 @@ import ( "context" "fmt" "io" + "math" "math/rand" "net/http" "net/url" @@ -877,10 +878,10 @@ func TestCodec_EncodeResponse_ContentNegotiation(t *testing.T) { Error: "something went wrong", } - jsonBody, err := jsonFormatter{}.EncodeQueryResponse(testResponse) + jsonBody, err := newJSONFormatter(math.MaxUint64).EncodeQueryResponse(testResponse) require.NoError(t, err) - protobufBody, err := ProtobufFormatter{}.EncodeQueryResponse(testResponse) + protobufBody, err := ProtobufFormatter{maxEncodedSize: math.MaxUint64}.EncodeQueryResponse(testResponse) require.NoError(t, err) scenarios := map[string]struct { @@ -2131,7 +2132,7 @@ func newTestCodecWithHeaders(propagateHeaders []string) Codec { } func newTestCodecWithFormatAndHeaders(format string, propagateHeaders []string) Codec { - return NewCodec(prometheus.NewPedanticRegistry(), 0*time.Minute, format, propagateHeaders, &api.ConsistencyInjector{}) + return NewCodec(prometheus.NewPedanticRegistry(), 0*time.Minute, format, propagateHeaders, &api.ConsistencyInjector{}, math.MaxUint64) } func mustSucceed[T any](value T, err error) T { diff --git a/pkg/frontend/querymiddleware/model_extra_test.go b/pkg/frontend/querymiddleware/model_extra_test.go index 258665e9f68..bba1b5635a9 100644 --- a/pkg/frontend/querymiddleware/model_extra_test.go +++ b/pkg/frontend/querymiddleware/model_extra_test.go @@ -5,6 +5,7 @@ package querymiddleware import ( "context" "io" + "math" "net/http" "net/url" "slices" @@ -120,7 +121,7 @@ func TestMetricQueryRequestCloneHeaders(t *testing.T) { httpReq.Header.Set("Content-Type", "application/x-www-form-urlencoded") httpReq.Header.Set("X-Test-Header", "test-value") - c := NewCodec(prometheus.NewPedanticRegistry(), time.Minute*5, "json", nil, &propagation.NoopInjector{}) + c := NewCodec(prometheus.NewPedanticRegistry(), time.Minute*5, "json", nil, &propagation.NoopInjector{}, math.MaxUint64) originalReq, err := c.DecodeMetricsQueryRequest(context.Background(), httpReq) require.NoError(t, err) diff --git a/pkg/frontend/querymiddleware/roundtrip.go b/pkg/frontend/querymiddleware/roundtrip.go index e748f31b1e5..48112a42082 100644 --- a/pkg/frontend/querymiddleware/roundtrip.go +++ b/pkg/frontend/querymiddleware/roundtrip.go @@ -51,6 +51,8 @@ const ( queryTypeActiveSeries = "active_series" queryTypeActiveNativeHistogramMetrics = "active_native_histogram_metrics" queryTypeOther = "other" + + maxResponseSizeBytesFlag = "query-frontend.max-response-size-bytes" ) var ( @@ -75,6 +77,7 @@ type Config struct { TargetSeriesPerShard uint64 `yaml:"query_sharding_target_series_per_shard" category:"advanced"` ShardActiveSeriesQueries bool `yaml:"shard_active_series_queries" category:"experimental"` UseActiveSeriesDecoder bool `yaml:"use_active_series_decoder" category:"experimental"` + MaxResponseSizeBytes uint64 `yaml:"max_response_size_bytes" category:"experimental"` // CacheKeyGenerator allows to inject a CacheKeyGenerator to use for generating cache keys. // If nil, the querymiddleware package uses a DefaultCacheKeyGenerator with SplitQueriesByInterval. @@ -117,6 +120,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.BoolVar(&cfg.ShardActiveSeriesQueries, "query-frontend.shard-active-series-queries", false, "True to enable sharding of active series queries.") f.BoolVar(&cfg.UseActiveSeriesDecoder, "query-frontend.use-active-series-decoder", false, "Set to true to use the zero-allocation response decoder for active series queries.") f.BoolVar(&cfg.CacheSamplesProcessedStats, "query-frontend.cache-samples-processed-stats", false, "Cache statistics of processed samples on results cache. Deprecated: has no effect.") + f.Uint64Var(&cfg.MaxResponseSizeBytes, maxResponseSizeBytesFlag, 128*1024*1024, "Maximum allowed response size for query responses, in bytes.") cfg.ResultsCache.RegisterFlags(f) // This field isn't user-configurable, but we still need to set a default value so that subsequent Add() calls don't panic due to a nil map. diff --git a/pkg/frontend/v2/frontend_test.go b/pkg/frontend/v2/frontend_test.go index e9316ae93b6..8cb5b25394f 100644 --- a/pkg/frontend/v2/frontend_test.go +++ b/pkg/frontend/v2/frontend_test.go @@ -2091,7 +2091,7 @@ func TestQueryDecoding(t *testing.T) { } func newTestCodec() querymiddleware.Codec { - return querymiddleware.NewCodec(prometheus.NewPedanticRegistry(), 0*time.Minute, "json", nil, &api.ConsistencyInjector{}) + return querymiddleware.NewCodec(prometheus.NewPedanticRegistry(), 0*time.Minute, "json", nil, &api.ConsistencyInjector{}, math.MaxUint64) } func BenchmarkProtobufResponseStreamShouldAbortReading(b *testing.B) { diff --git a/pkg/mimir/modules.go b/pkg/mimir/modules.go index 0093b2f32eb..c8757fba362 100644 --- a/pkg/mimir/modules.go +++ b/pkg/mimir/modules.go @@ -829,6 +829,7 @@ func (t *Mimir) initQueryFrontendCodec() (services.Service, error) { t.Cfg.Frontend.QueryMiddleware.QueryResultResponseFormat, t.Cfg.Frontend.QueryMiddleware.ExtraPropagateHeaders, &propagation.MultiInjector{Injectors: t.Injectors}, + t.Cfg.Frontend.QueryMiddleware.MaxResponseSizeBytes, ) return nil, nil diff --git a/pkg/util/globalerror/user.go b/pkg/util/globalerror/user.go index 2db654ab9c9..1b6c21809e4 100644 --- a/pkg/util/globalerror/user.go +++ b/pkg/util/globalerror/user.go @@ -38,6 +38,7 @@ const ( MaxChunkBytesPerQuery ID = "max-chunks-bytes-per-query" MaxEstimatedChunksPerQuery ID = "max-estimated-chunks-per-query" MaxEstimatedMemoryConsumptionPerQuery ID = "max-estimated-memory-consumption-per-query" + MaxResponseSizeBytes ID = "max-response-size-bytes" DistributorMaxIngestionRate ID = "distributor-max-ingestion-rate" DistributorMaxInflightPushRequests ID = "distributor-max-inflight-push-requests" diff --git a/tools/querytee/proxy.go b/tools/querytee/proxy.go index 1a3ff723c45..285368b5bb5 100644 --- a/tools/querytee/proxy.go +++ b/tools/querytee/proxy.go @@ -9,6 +9,7 @@ import ( "encoding/json" "flag" "fmt" + "math" "net/http" "net/http/httputil" "net/url" @@ -390,7 +391,7 @@ func (p *Proxy) Start() error { // Since we are only doing query requests decoding, we only care about the lookback delta for the Codec instance. // The other config parameters are not relevant. - codec := querymiddleware.NewCodec(p.registerer, p.cfg.BackendsLookbackDelta, "json", nil, &propagation.NoopInjector{}) + codec := querymiddleware.NewCodec(p.registerer, p.cfg.BackendsLookbackDelta, "json", nil, &propagation.NoopInjector{}, math.MaxUint64) // register routes for _, route := range p.routes { diff --git a/tools/querytee/proxy_test.go b/tools/querytee/proxy_test.go index 522331a6d6d..626451ebbe1 100644 --- a/tools/querytee/proxy_test.go +++ b/tools/querytee/proxy_test.go @@ -10,6 +10,7 @@ import ( "flag" "fmt" "io" + "math" "net/http" "net/http/httptest" "net/url" @@ -1238,5 +1239,5 @@ type backendTestConfig struct { // newTestQueryDecoder creates a decoder instance for use in tests func newTestQueryDecoder() QueryRequestDecoder { - return querymiddleware.NewCodec(nil, 5*time.Minute, "json", nil, &propagation.NoopInjector{}) + return querymiddleware.NewCodec(nil, 5*time.Minute, "json", nil, &propagation.NoopInjector{}, math.MaxUint64) } diff --git a/vendor/github.com/json-iterator/go/config.go b/vendor/github.com/json-iterator/go/config.go index 2adcdc3b790..1f2b43a3952 100644 --- a/vendor/github.com/json-iterator/go/config.go +++ b/vendor/github.com/json-iterator/go/config.go @@ -25,6 +25,13 @@ type Config struct { ValidateJsonRawMessage bool ObjectFieldMustBeSimpleString bool CaseSensitive bool + + // MaxMarshalledBytes limits the maximum size of the output. + // + // While it guarantees not to return more bytes than MaxMarshalledBytes, + // it does not guarantee that the internal buffer will be smaller than MaxMarshalledBytes. + // In most cases, the internal buffer may be larger by only a few bytes. + MaxMarshalledBytes uint64 } // API the public interface of this package. @@ -80,6 +87,7 @@ type frozenConfig struct { streamPool *sync.Pool iteratorPool *sync.Pool caseSensitive bool + maxMarshalledBytes uint64 } func (cfg *frozenConfig) initCache() { @@ -134,6 +142,7 @@ func (cfg Config) Froze() API { onlyTaggedField: cfg.OnlyTaggedField, disallowUnknownFields: cfg.DisallowUnknownFields, caseSensitive: cfg.CaseSensitive, + maxMarshalledBytes: cfg.MaxMarshalledBytes, } api.streamPool = &sync.Pool{ New: func() interface{} { @@ -293,9 +302,22 @@ func (cfg *frozenConfig) MarshalToString(v interface{}) (string, error) { return string(stream.Buffer()), nil } -func (cfg *frozenConfig) Marshal(v interface{}) ([]byte, error) { +func (cfg *frozenConfig) Marshal(v interface{}) (_ []byte, err error) { stream := cfg.BorrowStream(nil) defer cfg.ReturnStream(stream) + + defer func() { + // See Stream.enforceMaxBytes() for an explanation of this. + if r := recover(); r != nil { + if limitError, ok := r.(ExceededMaxMarshalledBytesError); ok { + err = limitError + return + } + + panic(r) + } + }() + stream.WriteVal(v) if stream.Error != nil { return nil, stream.Error diff --git a/vendor/github.com/json-iterator/go/reflect_map.go b/vendor/github.com/json-iterator/go/reflect_map.go index 58296713013..4e479c8a516 100644 --- a/vendor/github.com/json-iterator/go/reflect_map.go +++ b/vendor/github.com/json-iterator/go/reflect_map.go @@ -2,11 +2,12 @@ package jsoniter import ( "fmt" - "github.com/modern-go/reflect2" "io" "reflect" "sort" "unsafe" + + "github.com/modern-go/reflect2" ) func decoderOfMap(ctx *ctx, typ reflect2.Type) ValDecoder { @@ -106,15 +107,17 @@ func encoderOfMapKey(ctx *ctx, typ reflect2.Type) ValEncoder { } } - if typ == textMarshalerType { - return &directTextMarshalerEncoder{ - stringEncoder: ctx.EncoderOf(reflect2.TypeOf("")), + if typ.Kind() != reflect.String { + if typ == textMarshalerType { + return &directTextMarshalerEncoder{ + stringEncoder: ctx.EncoderOf(reflect2.TypeOf("")), + } } - } - if typ.Implements(textMarshalerType) { - return &textMarshalerEncoder{ - valType: typ, - stringEncoder: ctx.EncoderOf(reflect2.TypeOf("")), + if typ.Implements(textMarshalerType) { + return &textMarshalerEncoder{ + valType: typ, + stringEncoder: ctx.EncoderOf(reflect2.TypeOf("")), + } } } diff --git a/vendor/github.com/json-iterator/go/reflect_native.go b/vendor/github.com/json-iterator/go/reflect_native.go index f88722d14d1..b591b13c31b 100644 --- a/vendor/github.com/json-iterator/go/reflect_native.go +++ b/vendor/github.com/json-iterator/go/reflect_native.go @@ -444,6 +444,7 @@ func (codec *base64Codec) Encode(ptr unsafe.Pointer, stream *Stream) { buf := make([]byte, size) encoding.Encode(buf, src) stream.buf = append(stream.buf, buf...) + stream.enforceMaxBytes() } stream.writeByte('"') } diff --git a/vendor/github.com/json-iterator/go/stream.go b/vendor/github.com/json-iterator/go/stream.go index 23d8a3ad6b1..a07a51305af 100644 --- a/vendor/github.com/json-iterator/go/stream.go +++ b/vendor/github.com/json-iterator/go/stream.go @@ -1,18 +1,24 @@ package jsoniter import ( + "fmt" "io" ) // stream is a io.Writer like object, with JSON specific write functions. // Error is not returned as return value, but stored as Error member on this stream instance. type Stream struct { - cfg *frozenConfig - out io.Writer - buf []byte - Error error - indention int - Attachment interface{} // open for customized encoder + cfg *frozenConfig + out io.Writer + buf []byte + Error error + indention int + Attachment interface{} // open for customized encoder + enforceMarshalledBytesLimit bool + + // Number of bytes remaining before marshalled size exceeds cfg.maxMarshalledBytes. + // This is tracked as an amount remaining to account for bytes already flushed in Write(). + marshalledBytesLimitRemaining uint64 } // NewStream create new stream instance. @@ -20,12 +26,15 @@ type Stream struct { // out can be nil if write to internal buffer. // bufSize is the initial size for the internal buffer in bytes. func NewStream(cfg API, out io.Writer, bufSize int) *Stream { + config := cfg.(*frozenConfig) return &Stream{ - cfg: cfg.(*frozenConfig), - out: out, - buf: make([]byte, 0, bufSize), - Error: nil, - indention: 0, + cfg: config, + out: out, + buf: make([]byte, 0, bufSize), + Error: nil, + indention: 0, + enforceMarshalledBytesLimit: config.maxMarshalledBytes > 0, + marshalledBytesLimitRemaining: config.maxMarshalledBytes, } } @@ -38,6 +47,7 @@ func (stream *Stream) Pool() StreamPool { func (stream *Stream) Reset(out io.Writer) { stream.out = out stream.buf = stream.buf[:0] + stream.marshalledBytesLimitRemaining = stream.cfg.maxMarshalledBytes } // Available returns how many bytes are unused in the buffer. @@ -66,9 +76,12 @@ func (stream *Stream) SetBuffer(buf []byte) { // why the write is short. func (stream *Stream) Write(p []byte) (nn int, err error) { stream.buf = append(stream.buf, p...) + stream.enforceMaxBytes() + if stream.out != nil { nn, err = stream.out.Write(stream.buf) stream.buf = stream.buf[nn:] + stream.marshalledBytesLimitRemaining -= uint64(nn) return } return len(p), nil @@ -77,22 +90,51 @@ func (stream *Stream) Write(p []byte) (nn int, err error) { // WriteByte writes a single byte. func (stream *Stream) writeByte(c byte) { stream.buf = append(stream.buf, c) + stream.enforceMaxBytes() } func (stream *Stream) writeTwoBytes(c1 byte, c2 byte) { stream.buf = append(stream.buf, c1, c2) + stream.enforceMaxBytes() } func (stream *Stream) writeThreeBytes(c1 byte, c2 byte, c3 byte) { stream.buf = append(stream.buf, c1, c2, c3) + stream.enforceMaxBytes() } func (stream *Stream) writeFourBytes(c1 byte, c2 byte, c3 byte, c4 byte) { stream.buf = append(stream.buf, c1, c2, c3, c4) + stream.enforceMaxBytes() } func (stream *Stream) writeFiveBytes(c1 byte, c2 byte, c3 byte, c4 byte, c5 byte) { stream.buf = append(stream.buf, c1, c2, c3, c4, c5) + stream.enforceMaxBytes() +} + +func (stream *Stream) enforceMaxBytes() { + if !stream.enforceMarshalledBytesLimit { + return + } + + if uint64(len(stream.buf)) > stream.marshalledBytesLimitRemaining { + // Why do we do this rather than return an error? + // Most of the writing methods on Stream do not return an error, and introducing this would be a + // breaking change for custom encoders. + // Furthermore, nothing checks if the stream has failed until the object has been completely written + // so if we don't panic here, we'd continue writing the rest of the object, negating the purpose of + // this limit. + panic(ExceededMaxMarshalledBytesError{stream.cfg.maxMarshalledBytes}) + } +} + +type ExceededMaxMarshalledBytesError struct { + MaxMarshalledBytes uint64 +} + +func (err ExceededMaxMarshalledBytesError) Error() string { + return fmt.Sprintf("marshalling produced a result over the configured limit of %d bytes", err.MaxMarshalledBytes) } // Flush writes any buffered data to the underlying io.Writer. @@ -117,6 +159,7 @@ func (stream *Stream) Flush() error { // WriteRaw write string out without quotes, just like []byte func (stream *Stream) WriteRaw(s string) { stream.buf = append(stream.buf, s...) + stream.enforceMaxBytes() } // WriteNil write null to stream @@ -207,4 +250,5 @@ func (stream *Stream) writeIndention(delta int) { for i := 0; i < toWrite; i++ { stream.buf = append(stream.buf, ' ') } + stream.enforceMaxBytes() } diff --git a/vendor/github.com/json-iterator/go/stream_float.go b/vendor/github.com/json-iterator/go/stream_float.go index 826aa594ac6..dd826f8fc7a 100644 --- a/vendor/github.com/json-iterator/go/stream_float.go +++ b/vendor/github.com/json-iterator/go/stream_float.go @@ -27,6 +27,15 @@ func (stream *Stream) WriteFloat32(val float32) { } } stream.buf = strconv.AppendFloat(stream.buf, float64(val), fmt, -1, 32) + if fmt == 'e' { + // clean up e-09 to e-9 + n := len(stream.buf) + if n >= 4 && stream.buf[n-4] == 'e' && stream.buf[n-3] == '-' && stream.buf[n-2] == '0' { + stream.buf[n-2] = stream.buf[n-1] + stream.buf = stream.buf[:n-1] + } + } + stream.enforceMaxBytes() } // WriteFloat32Lossy write float32 to stream with ONLY 6 digits precision although much much faster @@ -76,6 +85,15 @@ func (stream *Stream) WriteFloat64(val float64) { } } stream.buf = strconv.AppendFloat(stream.buf, float64(val), fmt, -1, 64) + if fmt == 'e' { + // clean up e-09 to e-9 + n := len(stream.buf) + if n >= 4 && stream.buf[n-4] == 'e' && stream.buf[n-3] == '-' && stream.buf[n-2] == '0' { + stream.buf[n-2] = stream.buf[n-1] + stream.buf = stream.buf[:n-1] + } + } + stream.enforceMaxBytes() } // WriteFloat64Lossy write float64 to stream with ONLY 6 digits precision although much much faster diff --git a/vendor/github.com/json-iterator/go/stream_int.go b/vendor/github.com/json-iterator/go/stream_int.go index d1059ee4c20..2935c9ef692 100644 --- a/vendor/github.com/json-iterator/go/stream_int.go +++ b/vendor/github.com/json-iterator/go/stream_int.go @@ -32,6 +32,7 @@ func writeBuf(buf []byte, v uint32) []byte { // WriteUint8 write uint8 to stream func (stream *Stream) WriteUint8(val uint8) { stream.buf = writeFirstBuf(stream.buf, digits[val]) + stream.enforceMaxBytes() } // WriteInt8 write int8 to stream @@ -44,6 +45,7 @@ func (stream *Stream) WriteInt8(nval int8) { val = uint8(nval) } stream.buf = writeFirstBuf(stream.buf, digits[val]) + stream.enforceMaxBytes() } // WriteUint16 write uint16 to stream @@ -56,6 +58,7 @@ func (stream *Stream) WriteUint16(val uint16) { r1 := val - q1*1000 stream.buf = writeFirstBuf(stream.buf, digits[q1]) stream.buf = writeBuf(stream.buf, digits[r1]) + stream.enforceMaxBytes() return } @@ -76,6 +79,7 @@ func (stream *Stream) WriteUint32(val uint32) { q1 := val / 1000 if q1 == 0 { stream.buf = writeFirstBuf(stream.buf, digits[val]) + stream.enforceMaxBytes() return } r1 := val - q1*1000 @@ -83,6 +87,7 @@ func (stream *Stream) WriteUint32(val uint32) { if q2 == 0 { stream.buf = writeFirstBuf(stream.buf, digits[q1]) stream.buf = writeBuf(stream.buf, digits[r1]) + stream.enforceMaxBytes() return } r2 := q1 - q2*1000 @@ -96,6 +101,7 @@ func (stream *Stream) WriteUint32(val uint32) { } stream.buf = writeBuf(stream.buf, digits[r2]) stream.buf = writeBuf(stream.buf, digits[r1]) + stream.enforceMaxBytes() } // WriteInt32 write int32 to stream @@ -115,6 +121,7 @@ func (stream *Stream) WriteUint64(val uint64) { q1 := val / 1000 if q1 == 0 { stream.buf = writeFirstBuf(stream.buf, digits[val]) + stream.enforceMaxBytes() return } r1 := val - q1*1000 @@ -122,6 +129,7 @@ func (stream *Stream) WriteUint64(val uint64) { if q2 == 0 { stream.buf = writeFirstBuf(stream.buf, digits[q1]) stream.buf = writeBuf(stream.buf, digits[r1]) + stream.enforceMaxBytes() return } r2 := q1 - q2*1000 @@ -130,6 +138,7 @@ func (stream *Stream) WriteUint64(val uint64) { stream.buf = writeFirstBuf(stream.buf, digits[q2]) stream.buf = writeBuf(stream.buf, digits[r2]) stream.buf = writeBuf(stream.buf, digits[r1]) + stream.enforceMaxBytes() return } r3 := q2 - q3*1000 @@ -139,6 +148,7 @@ func (stream *Stream) WriteUint64(val uint64) { stream.buf = writeBuf(stream.buf, digits[r3]) stream.buf = writeBuf(stream.buf, digits[r2]) stream.buf = writeBuf(stream.buf, digits[r1]) + stream.enforceMaxBytes() return } r4 := q3 - q4*1000 @@ -149,6 +159,7 @@ func (stream *Stream) WriteUint64(val uint64) { stream.buf = writeBuf(stream.buf, digits[r3]) stream.buf = writeBuf(stream.buf, digits[r2]) stream.buf = writeBuf(stream.buf, digits[r1]) + stream.enforceMaxBytes() return } r5 := q4 - q5*1000 @@ -165,6 +176,7 @@ func (stream *Stream) WriteUint64(val uint64) { stream.buf = writeBuf(stream.buf, digits[r3]) stream.buf = writeBuf(stream.buf, digits[r2]) stream.buf = writeBuf(stream.buf, digits[r1]) + stream.enforceMaxBytes() } // WriteInt64 write int64 to stream diff --git a/vendor/github.com/json-iterator/go/stream_str.go b/vendor/github.com/json-iterator/go/stream_str.go index 54c2ba0b3a2..d4d3a122cf7 100644 --- a/vendor/github.com/json-iterator/go/stream_str.go +++ b/vendor/github.com/json-iterator/go/stream_str.go @@ -233,6 +233,7 @@ func (stream *Stream) WriteStringWithHTMLEscaped(s string) { } if i == valLen { stream.buf = append(stream.buf, '"') + stream.enforceMaxBytes() return } writeStringSlowPathWithHTMLEscaped(stream, i, s, valLen) @@ -323,6 +324,7 @@ func (stream *Stream) WriteString(s string) { } if i == valLen { stream.buf = append(stream.buf, '"') + stream.enforceMaxBytes() return } writeStringSlowPath(stream, i, s, valLen) diff --git a/vendor/modules.txt b/vendor/modules.txt index 707d439e12b..3b279389718 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -973,7 +973,7 @@ github.com/jmespath/go-jmespath # github.com/jpillora/backoff v1.0.0 ## explicit; go 1.13 github.com/jpillora/backoff -# github.com/json-iterator/go v1.1.12 +# github.com/json-iterator/go v1.1.12 => github.com/charleskorn/json-iterator-go v0.0.0-20251215054129-8df468a86247 ## explicit; go 1.12 github.com/json-iterator/go # github.com/julienschmidt/httprouter v1.3.0 @@ -2202,3 +2202,4 @@ sigs.k8s.io/yaml # github.com/prometheus/alertmanager => github.com/grafana/prometheus-alertmanager v0.25.1-0.20250911094103-5456b6e45604 # github.com/prometheus/otlptranslator => github.com/grafana/mimir-otlptranslator v0.0.0-20251017074411-ea1e8f863e1d # github.com/prometheus/client_golang => github.com/colega/prometheus-client_golang v1.19.1-0.20251204143415-11cda2079634 +# github.com/json-iterator/go => github.com/charleskorn/json-iterator-go v0.0.0-20251215054129-8df468a86247