Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/capabilities/vault/capability.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ func NewCapability(
if err != nil {
return nil, fmt.Errorf("could not create request batch size limiter: %w", err)
}
ciphertextLimiter, err := limits.MakeUpperBoundLimiter(limitsFactory, cresettings.Default.VaultCiphertextSizeLimit)
ciphertextLimiter, err := limits.MakeUpperBoundLimiter(limitsFactory, cresettings.Default.PerOwner.VaultCiphertextSizeLimit)
if err != nil {
return nil, fmt.Errorf("could not create ciphertext size limiter: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion core/scripts/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ require (
github.com/smartcontractkit/chain-selectors v1.0.101
github.com/smartcontractkit/chainlink-automation v0.8.1
github.com/smartcontractkit/chainlink-ccip/chains/evm v0.0.0-20260506144252-c100eabfda74
github.com/smartcontractkit/chainlink-common v0.11.2-0.20260609183712-678afb1edd2e
github.com/smartcontractkit/chainlink-common v0.11.2-0.20260610184803-96d1e031407b
github.com/smartcontractkit/chainlink-common/keystore v1.2.0
github.com/smartcontractkit/chainlink-data-streams v0.1.15-0.20260522094612-5f9f748bd87a
github.com/smartcontractkit/chainlink-deployments-framework v0.105.0
Expand Down
4 changes: 2 additions & 2 deletions core/scripts/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

33 changes: 26 additions & 7 deletions core/services/gateway/handlers/capabilities/v2/http_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ type gatewayHandler struct {
globalNodeRateLimiter limits.RateLimiter // Global rate limiter shared across all incoming node requests from workflow DON
perNodeRateLimiters map[string]limits.RateLimiter // Per-node rate limiters keyed by node address, one independent bucket per DON member
mtlsRequestRateLimiter limits.RateLimiter
mtlsConcurrencyLimiter limits.ResourcePoolLimiter[int] // Bounds the number of in-flight outbound mTLS requests
wg sync.WaitGroup
stopCh services.StopChan
responseCache ResponseCache // Caches HTTP responses to avoid redundant requests for outbound HTTP actions
Expand Down Expand Up @@ -141,6 +142,11 @@ func NewGatewayHandler(handlerConfig json.RawMessage, donConfig *config.DONConfi
return nil, fmt.Errorf("failed to create mtls rate limiter: %w", err)
}

mtlsConcurrencyLimiter, err := limits.MakeResourcePoolLimiter(lf, cresettings.Default.GatewayHTTPActionMtlsConcurrencyLimit)
if err != nil {
return nil, fmt.Errorf("failed to create mtls concurrency limiter: %w", err)
}

metrics, err := metrics.NewMetrics(donConfig)
if err != nil {
return nil, fmt.Errorf("failed to initialize metrics: %w", err)
Expand All @@ -156,6 +162,7 @@ func NewGatewayHandler(handlerConfig json.RawMessage, donConfig *config.DONConfi
globalNodeRateLimiter: globalNodeRateLimiter,
perNodeRateLimiters: perNodeRateLimiters,
mtlsRequestRateLimiter: mtlsRequestRateLimiter,
mtlsConcurrencyLimiter: mtlsConcurrencyLimiter,
stopCh: make(services.StopChan),
responseCache: newResponseCache(lggr, cfg.OutboundRequestCacheTTLMs, metrics),
triggerHandler: triggerHandler,
Expand Down Expand Up @@ -272,13 +279,6 @@ func (h *gatewayHandler) send(ctx context.Context, httpReq network.HTTPRequest,
return h.httpClient.Send(ctx, httpReq)
}

// We don't have access to the org here, so this will fall back to the environment default (=false).
// That's appropriate because all fields set on the request come from untrusted nodes.
// The capability separately applies an org-specific check.
if !h.mtlsRequestRateLimiter.Allow(ctx) {
return nil, fmt.Errorf("global mtls request rate limit exceeded: %w", network.ErrBlockedRequest)
}

if h.httpClientFactory == nil {
return nil, errors.New("nil http client factory, cannot make mtls request")
}
Expand All @@ -290,16 +290,29 @@ func (h *gatewayHandler) send(ctx context.Context, httpReq network.HTTPRequest,
// b) we apply rate limits limiting the ability of sending nodes to spam requests
// c) we apply per-owner rate limits in the action capability in the
// workflow node limiting the ability of users to abuse this flow by spamming Mtls requests.
// The client enforces the mtls concurrency limit internally (on the request's
// capped-timeout context) before delegating to the underlying transport.
client, err := h.httpClientFactory(network.HTTPClientConfig{
Mtls: &gateway_common.MtlsAuth{
PrivateKey: req.Mtls.PrivateKey,
Certificate: req.Mtls.Certificate,
},
ConcurrencyLimiter: h.mtlsConcurrencyLimiter,
})
if err != nil {
return nil, fmt.Errorf("failed to instantiate http client for mtls request: %w", err)
}

// We don't have access to the org here, so this will fall back to the environment default (=false).
// That's appropriate because all fields set on the request come from untrusted nodes.
// The capability separately applies an org-specific check.

// Note: we intentionally consume the rate-limit after instantiating the client so that a malicious user
// can't send requests with invalid mtls credentials and thus cheaply consume global tokens.
if !h.mtlsRequestRateLimiter.Allow(ctx) {
return nil, fmt.Errorf("global mtls request rate limit exceeded: %w", network.ErrBlockedRequest)
}

return client.Send(ctx, httpReq)
}

Expand Down Expand Up @@ -478,6 +491,12 @@ func (h *gatewayHandler) Close() error {
h.lggr.Errorw("failed to close per-node rate limiter", "nodeAddr", nodeAddr, "err", err)
}
}
if err = h.mtlsRequestRateLimiter.Close(); err != nil {
h.lggr.Errorw("failed to close mtls request rate limiter", "err", err)
}
if err = h.mtlsConcurrencyLimiter.Close(); err != nil {
h.lggr.Errorw("failed to close mtls concurrency limiter", "err", err)
}
close(h.stopCh)
h.wg.Wait()
return nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -947,6 +947,9 @@ func TestGatewayHandler_Send_NoMtls_UsesDefaultClient(t *testing.T) {
func TestGatewayHandler_Send_MtlsBlockedByRateLimit(t *testing.T) {
handler := createTestHandler(t)
handler.mtlsRequestRateLimiter = limits.GlobalRateLimiter(0, 0)
handler.httpClientFactory = func(config network.HTTPClientConfig) (network.HTTPClient, error) {
return httpmocks.NewHTTPClient(t), nil
}

httpReq := network.HTTPRequest{Method: "GET", URL: "https://example.com/api"}
outboundReq := gateway_common.OutboundHTTPRequest{
Expand All @@ -965,6 +968,41 @@ func TestGatewayHandler_Send_MtlsBlockedByRateLimit(t *testing.T) {
require.Contains(t, err.Error(), "global mtls request rate limit exceeded")
}

// TestGatewayHandler_Send_MtlsPassesConcurrencyLimiterToFactory verifies the
// handler hands its shared mtls concurrency limiter to the client factory. The
// limiter is enforced inside the HTTP client (on the request's capped-timeout
// context); that enforcement is covered by the network package tests.
func TestGatewayHandler_Send_MtlsPassesConcurrencyLimiterToFactory(t *testing.T) {
handler := createTestHandler(t)
handler.mtlsRequestRateLimiter = limits.GlobalRateLimiter(100, 100)

httpReq := network.HTTPRequest{Method: "GET", URL: "https://example.com/api"}
outboundReq := gateway_common.OutboundHTTPRequest{
Method: "GET",
URL: "https://example.com/api",
Mtls: &gateway_common.MtlsAuth{
PrivateKey: []byte("private-key"),
Certificate: []byte("certificate"),
},
}

expectedResp := &network.HTTPResponse{StatusCode: 200, Body: []byte("ok")}
mtlsClient := httpmocks.NewHTTPClient(t)
mtlsClient.EXPECT().Send(mock.Anything, httpReq).Return(expectedResp, nil).Once()

var capturedConfig network.HTTPClientConfig
handler.httpClientFactory = func(config network.HTTPClientConfig) (network.HTTPClient, error) {
capturedConfig = config
return mtlsClient, nil
}

resp, err := handler.send(testutils.Context(t), httpReq, outboundReq)
require.NoError(t, err)
require.Equal(t, expectedResp, resp)
require.NotNil(t, capturedConfig.ConcurrencyLimiter, "handler must pass its mtls concurrency limiter to the client factory")
require.Equal(t, handler.mtlsConcurrencyLimiter, capturedConfig.ConcurrencyLimiter)
}

func TestGatewayHandler_Send_MtlsUsesFactory(t *testing.T) {
handler := createTestHandler(t)
handler.mtlsRequestRateLimiter = limits.GlobalRateLimiter(100, 100)
Expand Down Expand Up @@ -1026,6 +1064,37 @@ func TestGatewayHandler_Send_MtlsFactoryError(t *testing.T) {
require.ErrorIs(t, err, factoryErr, "factory error should be wrapped and discoverable via errors.Is")
}

// TestGatewayHandler_Send_InvalidMtlsCertDoesNotConsumeGlobalTokens verifies that a
// request carrying invalid mTLS credentials does not consume a global rate-limit token.
// Otherwise a malicious user could cheaply drain the shared mtls token bucket by spamming
// requests with bogus certificates. It uses the real HTTP client factory so that the
// production code path is what rejects the certificate as invalid.
func TestGatewayHandler_Send_InvalidMtlsCertDoesNotConsumeGlobalTokens(t *testing.T) {
handler := createTestHandler(t)
// Burst of exactly 1: only a single mtls request may pass the rate limiter.
handler.mtlsRequestRateLimiter = limits.GlobalRateLimiter(1, 1)
handler.httpClientFactory = network.NewHTTPClientFactory(network.HTTPClientConfig{}, logger.Test(t))

httpReq := network.HTTPRequest{Method: "GET", URL: "https://example.com/api"}
outboundReq := gateway_common.OutboundHTTPRequest{
Method: "GET",
URL: "https://example.com/api",
Mtls: &gateway_common.MtlsAuth{PrivateKey: []byte("not-a-key"), Certificate: []byte("not-a-cert")},
}

ctx := testutils.Context(t)
resp, err := handler.send(ctx, httpReq, outboundReq)
require.Error(t, err)
require.Nil(t, resp)
require.Contains(t, err.Error(), "failed to parse MtlsAuth into KeyPair",
"the real client factory should reject the invalid certificate material")

// The single available token must still be present: the failed request above must not
// have consumed it.
require.True(t, handler.mtlsRequestRateLimiter.Allow(ctx),
"global mtls rate-limit token must not be consumed by a request with an invalid certificate")
}

// TestGatewayHandler_Send_MtlsRoutesThroughCallbackOnly_DefaultClientUntouched
// verifies that an mTLS request flowing through the full callback path does not
// touch the default (shared) http client even when the factory returns a working
Expand Down Expand Up @@ -1062,6 +1131,9 @@ func TestGatewayHandler_Send_MtlsRoutesThroughCallbackOnly_DefaultClientUntouche
func TestGatewayHandler_Send_MtlsBlockedRequestIsValidationError(t *testing.T) {
handler := createTestHandler(t)
handler.mtlsRequestRateLimiter = limits.GlobalRateLimiter(0, 0)
handler.httpClientFactory = func(config network.HTTPClientConfig) (network.HTTPClient, error) {
return httpmocks.NewHTTPClient(t), nil
}

httpReq := network.HTTPRequest{Method: "GET", URL: "https://example.com/api", Timeout: 5 * time.Second}
outboundReq := gateway_common.OutboundHTTPRequest{
Expand All @@ -1085,6 +1157,9 @@ func TestGatewayHandler_Send_MtlsBlockedRequestIsValidationError(t *testing.T) {
// meaning mtls is blocked out of the box.
func TestGatewayHandler_Send_MtlsRateLimitEnabledByDefault(t *testing.T) {
handler := createTestHandler(t)
handler.httpClientFactory = func(config network.HTTPClientConfig) (network.HTTPClient, error) {
return httpmocks.NewHTTPClient(t), nil
}

httpReq := network.HTTPRequest{Method: "GET", URL: "https://example.com/api"}
outboundReq := gateway_common.OutboundHTTPRequest{
Expand Down
2 changes: 1 addition & 1 deletion core/services/gateway/handlers/vault/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ func newHandlerWithAuthorizer(methodConfig json.RawMessage, donConfig *config.DO
if err != nil {
return nil, fmt.Errorf("could not create request batch size limiter: %w", err)
}
ciphertextLimiter, err := limits.MakeUpperBoundLimiter(limitsFactory, cresettings.Default.VaultCiphertextSizeLimit)
ciphertextLimiter, err := limits.MakeUpperBoundLimiter(limitsFactory, cresettings.Default.PerOwner.VaultCiphertextSizeLimit)
if err != nil {
return nil, fmt.Errorf("could not create ciphertext size limiter: %w", err)
}
Expand Down
49 changes: 47 additions & 2 deletions core/services/gateway/network/httpclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/doyensec/safeurl"

"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/settings/limits"
"github.com/smartcontractkit/chainlink-common/pkg/types/gateway"
"github.com/smartcontractkit/chainlink/v2/core/utils"
)
Expand Down Expand Up @@ -51,6 +52,11 @@ type HTTPClientConfig struct {
// Mtls, when set, configures the client to present the supplied client
// certificate for mutual TLS.
Mtls *gateway.MtlsAuth

// ConcurrencyLimiter, when set together with Mtls, bounds the number of
// in-flight mTLS requests. The limiter is acquired on the request's
// (capped) context so waiters self-evict at the request timeout.
ConcurrencyLimiter limits.ResourcePoolLimiter[int]
}

// merge returns a copy of c with any set fields from override applied on top.
Expand Down Expand Up @@ -98,6 +104,9 @@ func (c HTTPClientConfig) merge(override HTTPClientConfig) HTTPClientConfig {
if override.Mtls != nil {
merged.Mtls = override.Mtls
}
if override.ConcurrencyLimiter != nil {
merged.ConcurrencyLimiter = override.ConcurrencyLimiter
}
return merged
}

Expand Down Expand Up @@ -223,8 +232,32 @@ func responseHeadersFromNetHeader(h http.Header) (map[string]string, map[string]
return headers, multiHeaders
}

// httpDoer is the subset of the HTTP client used by httpClient. It is satisfied
// by *safeurl.WrappedClient and by concurrencyLimitedClient, which decorates it.
type httpDoer interface {
Do(req *http.Request) (*http.Response, error)
}

// concurrencyLimitedClient bounds the number of in-flight requests delegated to
// the underlying client. The slot is acquired on the request's context, so a
// waiter self-evicts when that context (carrying the capped request timeout)
// expires rather than blocking indefinitely.
type concurrencyLimitedClient struct {
client httpDoer
limiter limits.ResourcePoolLimiter[int]
}

func (c *concurrencyLimitedClient) Do(req *http.Request) (*http.Response, error) {
free, err := c.limiter.Wait(req.Context(), 1)
if err != nil {
return nil, fmt.Errorf("mtls concurrency limit exceeded: %w", ErrBlockedRequest)
}
defer free()
return c.client.Do(req)
}

type httpClient struct {
client *safeurl.WrappedClient
client httpDoer
config HTTPClientConfig
lggr logger.Logger
metrics *httpClientMetrics
Expand Down Expand Up @@ -259,6 +292,8 @@ func NewHTTPClient(config HTTPClientConfig, lggr logger.Logger) (HTTPClient, err
SetCheckRedirect(disableRedirects).
SetTransport(defaultTransport)

var client httpDoer

if config.Mtls != nil {
// Defence-in-depth protection against accidental reuse
// of the HTTP client leading to auth'd connections leaking across
Expand All @@ -276,6 +311,16 @@ func NewHTTPClient(config HTTPClientConfig, lggr logger.Logger) (HTTPClient, err
MinVersion: tls.VersionTLS12,
}
safeConfigBuilder.SetTransport(defaultTransport)

if config.ConcurrencyLimiter == nil {
return nil, errors.New("mtls requires a ConcurrencyLimiter")
}
client = &concurrencyLimitedClient{
client: safeurl.Client(safeConfigBuilder.Build()),
limiter: config.ConcurrencyLimiter,
}
} else {
client = safeurl.Client(safeConfigBuilder.Build())
}

metrics, err := newHTTPClientMetrics()
Expand All @@ -285,7 +330,7 @@ func NewHTTPClient(config HTTPClientConfig, lggr logger.Logger) (HTTPClient, err

return &httpClient{
config: config,
client: safeurl.Client(safeConfigBuilder.Build()),
client: client,
lggr: lggr,
metrics: metrics,
}, nil
Expand Down
Loading
Loading