Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .changelog/23454.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:bug
xds: Fixed XDS package to generate correct endpoints and cluster configurations for API Gateways when peered, and updated the API Gateway update handler to propogate mesh gateway config to its upstreams.
```
19 changes: 19 additions & 0 deletions agent/proxycfg/api_gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,18 @@ func (h *handlerAPIGateway) handleRouteConfigUpdate(ctx context.Context, u Updat

for _, rule := range route.Rules {
for _, service := range rule.Services {
// Retrieving the meshGatewayConfig from handlerAPIGateway instance.
// `handlerAPIGateway` embeds `handlerState`, which exposes `serviceInstance.proxyCfg`.
// serviceInstance.proxyCfg.MeshGateway is replicated from NodeService during state setup/update.
// and NodeService populated for all gateway's during service resistration `AgentRegisterService`.
//
// So, Whenever any change happens in NodeService, proxyCfg manager will recreate
// the state where it copies NodeService to serviceInstance and
// then calls this api_gateway handleUpdates method.
// which will update the Mesh-Gateway config to api_gateway upstreams (below).
// h.service = <name of api-gateway>
meshGatewayConfig := h.proxyCfg.MeshGateway

for _, listener := range snap.APIGateway.Listeners {
shouldBind := false
for _, parent := range route.Parents {
Expand All @@ -382,6 +394,11 @@ func (h *handlerAPIGateway) handleRouteConfigUpdate(ctx context.Context, u Updat
Config: map[string]interface{}{
"protocol": "http",
},

// Propogate the meshGatewayConfig in api gateway upstreams
// so that meshGatewayMode can be used in XDS for
// endpoints and cluster config generation.
MeshGateway: meshGatewayConfig,
}

listenerKey := APIGatewayListenerKeyFromListener(listener)
Expand Down Expand Up @@ -410,6 +427,7 @@ func (h *handlerAPIGateway) handleRouteConfigUpdate(ctx context.Context, u Updat
snap.APIGateway.TCPRoutes.Set(ref, route)

for _, service := range route.Services {
meshGatewayConfig := h.proxyCfg.MeshGateway
upstreamID := NewUpstreamIDFromServiceName(service.ServiceName())
seenUpstreamIDs.add(upstreamID)

Expand All @@ -436,6 +454,7 @@ func (h *handlerAPIGateway) handleRouteConfigUpdate(ctx context.Context, u Updat
Config: map[string]interface{}{
"protocol": "tcp",
},
MeshGateway: meshGatewayConfig,
}

listenerKey := APIGatewayListenerKeyFromListener(listener)
Expand Down
1 change: 1 addition & 0 deletions agent/proxycfg/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,7 @@ func newKindHandler(config stateConfig, s serviceInstance, ch chan UpdateEvent)
case structs.ServiceKindIngressGateway:
handler = &handlerIngressGateway{handlerState: h}
case structs.ServiceKindAPIGateway:
h.logger = config.logger.Named(logging.APIGateway)
handler = &handlerAPIGateway{handlerState: h}
default:
return nil, errors.New("not a connect-proxy, terminating-gateway, mesh-gateway, or ingress-gateway")
Expand Down
127 changes: 126 additions & 1 deletion agent/proxycfg/state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
"github.com/hashicorp/consul/sdk/testutil"
)

func TestStateChanged(t *testing.T) {
func TestStateChangedConnectProxy(t *testing.T) {
tests := []struct {
name string
ns *structs.NodeService
Expand Down Expand Up @@ -108,6 +108,131 @@ func TestStateChanged(t *testing.T) {
},
want: true,
},
{
name: "different proxy mesh gateway mode",
ns: structs.TestNodeServiceAPIGateway(t),
token: "foo",
mutate: func(ns structs.NodeService, token string) (*structs.NodeService, string) {
ns.Proxy.MeshGateway.Mode = structs.MeshGatewayModeLocal
return &ns, token
},
want: true,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
proxyID := ProxyID{ServiceID: tt.ns.CompoundServiceID()}
state, err := newState(proxyID, tt.ns, testSource, tt.token, stateConfig{logger: hclog.New(nil)}, rate.NewLimiter(rate.Inf, 1))
require.NoError(t, err)
otherNS, otherToken := tt.mutate(*tt.ns, tt.token)
require.Equal(t, tt.want, state.Changed(otherNS, otherToken))
})
}
}

func TestStateChangedAPIGateway(t *testing.T) {
tests := []struct {
name string
ns *structs.NodeService
token string
mutate func(ns structs.NodeService, token string) (*structs.NodeService, string)
want bool
}{
{
name: "nil node service",
ns: structs.TestNodeServiceAPIGateway(t),
mutate: func(ns structs.NodeService, token string) (*structs.NodeService, string) {
return nil, token
},
want: true,
},
{
name: "same service",
ns: structs.TestNodeServiceAPIGateway(t),
mutate: func(ns structs.NodeService, token string) (*structs.NodeService, string) {
return &ns, token
}, want: false,
},
{
name: "same service, different token",
ns: structs.TestNodeServiceAPIGateway(t),
token: "foo",
mutate: func(ns structs.NodeService, token string) (*structs.NodeService, string) {
return &ns, "bar"
},
want: true,
},
{
name: "different address",
ns: structs.TestNodeServiceAPIGateway(t),
token: "foo",
mutate: func(ns structs.NodeService, token string) (*structs.NodeService, string) {
ns.Address = "10.10.10.10"
return &ns, token
},
want: true,
},
{
name: "different port",
ns: structs.TestNodeServiceAPIGateway(t),
token: "foo",
mutate: func(ns structs.NodeService, token string) (*structs.NodeService, string) {
ns.Port = 12345
return &ns, token
},
want: true,
},
{
name: "different service kind",
ns: structs.TestNodeServiceAPIGateway(t),
token: "foo",
mutate: func(ns structs.NodeService, token string) (*structs.NodeService, string) {
ns.Kind = ""
return &ns, token
},
want: true,
},
{
name: "different proxy target",
ns: structs.TestNodeServiceAPIGateway(t),
token: "foo",
mutate: func(ns structs.NodeService, token string) (*structs.NodeService, string) {
ns.Proxy.DestinationServiceName = "badger"
return &ns, token
},
want: true,
},
{
name: "different proxy upstreams",
ns: structs.TestNodeServiceAPIGateway(t),
token: "foo",
mutate: func(ns structs.NodeService, token string) (*structs.NodeService, string) {
ns.Proxy.Upstreams = nil
return &ns, token
},
want: true,
},
{
name: "different mesh gateway mode (local)",
ns: structs.TestNodeServiceAPIGateway(t),
token: "foo",
mutate: func(ns structs.NodeService, token string) (*structs.NodeService, string) {
ns.Proxy.MeshGateway.Mode = structs.MeshGatewayModeLocal
return &ns, token
},
want: true,
},
{
name: "different mesh gateway mode (remote)",
ns: structs.TestNodeServiceAPIGateway(t),
token: "foo",
mutate: func(ns structs.NodeService, token string) (*structs.NodeService, string) {
ns.Proxy.MeshGateway.Mode = structs.MeshGatewayModeRemote
return &ns, token
},
want: true,
},
}

for _, tt := range tests {
Expand Down
25 changes: 25 additions & 0 deletions agent/structs/testing_catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,10 +176,35 @@ func TestNodeServiceMeshGateway(t testing.T) *NodeService {
}

func TestNodeServiceAPIGateway(t testing.T) *NodeService {
entMeta := DefaultEnterpriseMetaInPartition("")
return &NodeService{
Kind: ServiceKindAPIGateway,
Service: "api-gateway",
Address: "1.1.1.1",

// ---------------------------------------
// Adding TestConnectProxyConfig to the proxy field here within TestNodeServiceAPIGateway
// to test whether APIGateway is able to handle state changes within ConnectProxyConfig (TestStateChangedAPIGateway).
// Please note:
// The naming may suggest that ConnectProxyConfig should only be used for ConnectProxy,
// but "APIGateway state" uses serviceInstance, which embeds ConnectProxyConfig as part of its state,
// so any changes to ConnectProxyConfig will also impact APIGateway, such as change in "mesh gateway mode".
//
// For example, let's say a user updates the mesh gateway mode of an API gateway,
// First NodeService.Proxy will be updated and then proxyCfg manager detects change in config
// and it recreates the state for api_gateway which would copy the
// NodeService.Proxy.MeshGateway to serviceInstance.proxyCfg.MeshGateway in `newServiceInstanceFromNodeService` (serviceInstance is part of state)
// and then proxyCfg manager calls the api_gateway handleUpdates method which would
// update the api_gateway upstreams with new meshGateway config.
//
// Now, this serviceInstance.proxyCfg and NodeService.Proxy
// refers to same proxy configuration, which is of type ConnectProxyConfig.

// So, we need to test it as well.
// ---------------------------------------

Proxy: TestConnectProxyConfig(t),
EnterpriseMeta: *entMeta,
}
}

Expand Down
126 changes: 113 additions & 13 deletions agent/xds/clusters.go
Original file line number Diff line number Diff line change
Expand Up @@ -1533,25 +1533,47 @@ func (s *ResourceGenerator) makeUpstreamClustersForDiscoveryChain(
for _, groupedTarget := range targetGroups {
s.Logger.Debug("generating cluster for", "cluster", groupedTarget.ClusterName)

// Now this makeUpstreamClusterForDiscoveryChain, is a generic method
// and used by connect proxy, ingress gateway and api gateway.
//
// Issue: This method always make cluster (without endpoints).
// `ClusterDiscoveryType: &envoy_cluster_v3.Cluster_Type{Type: envoy_cluster_v3.Cluster_EDS},`
//
// Envoy Exception:
// As we know that any service whose upstream endpoint is of type hostname,
// envoy cannot resolve hostname as EDS.
// So, we need to use CDS to send endpoints as well along with cluster configs.
//
// Context:
// When we have 2 consul DC peered with mesh gw ON AWS and
// we have API gw on DC1 which need to access a service X which exist on DC2.
// Also, API gateway is configured to in mesh-gw remote mode.
// In this case, API gateway upstream would be DC2's mesh-gateway and
// we configure API GW envoy upstream endpoint to it.
// The problem is envoy exception and AWS mesh-gw lb type.
// AWS generates hostname based endpoints for mesh-gw lb endpoint and
// when we have hostname based endpoints envoy cannot resolve it via EDS,
// So we configure that endpoint via CDS.
//
// If not fixed, whenever any service (gateway) whose upstream endpoint is of hostname type,
// cluster endpoints will be empty and envoy will fail to route traffic to that cluster.
//
// Fix: Add logic to check if we should create a cluster config
// without upstream endpoint or with upstream endpoint (hostnames)
// based on upstream endpoint type.
//
// You can refer makeUpstreamClusterForPeerService - used by connect proxy for similar logic.
// or makeGatewayCluster - used by mesh gw for peering services.

c := &envoy_cluster_v3.Cluster{
Name: groupedTarget.ClusterName,
AltStatName: groupedTarget.ClusterName,
ConnectTimeout: durationpb.New(node.Resolver.ConnectTimeout),
ClusterDiscoveryType: &envoy_cluster_v3.Cluster_Type{Type: envoy_cluster_v3.Cluster_EDS},
Name: groupedTarget.ClusterName,
AltStatName: groupedTarget.ClusterName,
ConnectTimeout: durationpb.New(node.Resolver.ConnectTimeout),
CommonLbConfig: &envoy_cluster_v3.Cluster_CommonLbConfig{
HealthyPanicThreshold: &envoy_type_v3.Percent{
Value: 0, // disable panic threshold
},
},
EdsClusterConfig: &envoy_cluster_v3.Cluster_EdsClusterConfig{
EdsConfig: &envoy_core_v3.ConfigSource{
InitialFetchTimeout: cfgSnap.GetXDSCommonConfig(s.Logger).GetXDSFetchTimeout(),
ResourceApiVersion: envoy_core_v3.ApiVersion_V3,
ConfigSourceSpecifier: &envoy_core_v3.ConfigSource_Ads{
Ads: &envoy_core_v3.AggregatedConfigSource{},
},
},
},
// TODO(peering): make circuit breakers or outlier detection work?
CircuitBreakers: &envoy_cluster_v3.CircuitBreakers{
Thresholds: makeThresholdsIfNeeded(upstreamConfig.Limits),
Expand Down Expand Up @@ -1582,6 +1604,58 @@ func (s *ResourceGenerator) makeUpstreamClustersForDiscoveryChain(
return nil, fmt.Errorf("cannot have more than one target")
}

targetInfo := groupedTarget.Targets[0]
targetUID := proxycfg.NewUpstreamIDFromTargetID(targetInfo.TargetID)

meshGatewayMode, err := s.getMeshGatewayMode(cfgSnap, upstream, targetUID, groupedTarget.ClusterName)
if err != nil {
s.Logger.Error(err.Error(), "cluster", groupedTarget.ClusterName)
}

// Check if cluster need to be configured with hostnames or not.
useEDS := true
if targetUID.Peer != "" {
if _, ok := upstreamsSnapshot.PeerUpstreamEndpointsUseHostnames[targetUID]; ok {
// If we're using local mesh gw, the fact that upstreams use hostnames doesn't matter.
// If we're not using local mesh gw, then resort to CDS/DNS.
if meshGatewayMode != structs.MeshGatewayModeLocal {
useEDS = false
}
}
}

if useEDS {
c.ClusterDiscoveryType = &envoy_cluster_v3.Cluster_Type{Type: envoy_cluster_v3.Cluster_EDS}
c.EdsClusterConfig = &envoy_cluster_v3.Cluster_EdsClusterConfig{
EdsConfig: &envoy_core_v3.ConfigSource{
InitialFetchTimeout: cfgSnap.GetXDSCommonConfig(s.Logger).GetXDSFetchTimeout(),
ResourceApiVersion: envoy_core_v3.ApiVersion_V3,
ConfigSourceSpecifier: &envoy_core_v3.ConfigSource_Ads{
Ads: &envoy_core_v3.AggregatedConfigSource{},
},
},
}
} else {
hostnameEndpoints, ok := upstreamsSnapshot.PeerUpstreamEndpoints.Get(targetUID)
if !ok || len(hostnameEndpoints) == 0 {
// The upstream snapshot should deliver hostname endpoints soon; skip this cluster until then.
s.Logger.Debug("peer hostname endpoints not ready for discovery chain target",
"target", targetInfo.TargetID,
"upstream", targetUID,
"cluster", groupedTarget.ClusterName,
)
continue
}
c.EdsClusterConfig = nil
configureClusterWithHostnames(
s.Logger,
c,
"", /*TODO: should make configurable ? */
hostnameEndpoints,
true, /*isRemote*/
false, /*onlyPassing*/
)
}
if targetInfo := groupedTarget.Targets[0]; targetInfo.TLSContext != nil {
transportSocket, err := makeUpstreamTLSTransportSocket(targetInfo.TLSContext)
if err != nil {
Expand Down Expand Up @@ -1609,6 +1683,32 @@ func (s *ResourceGenerator) makeUpstreamClustersForDiscoveryChain(
return out, nil
}

func (s *ResourceGenerator) getMeshGatewayMode(
cfgSnap *proxycfg.ConfigSnapshot, upstream *structs.Upstream,
targetUID proxycfg.UpstreamID, clusterName string,
) (structs.MeshGatewayMode, error) {
defaultMode := structs.MeshGatewayModeDefault
switch cfgSnap.Kind {
case structs.ServiceKindConnectProxy:
upstreamConfig, _ := cfgSnap.ConnectProxy.
GetUpstream(targetUID, &cfgSnap.ProxyID.EnterpriseMeta)
if upstreamConfig != nil {
return upstreamConfig.MeshGateway.Mode, nil
}
return defaultMode, nil
case structs.ServiceKindAPIGateway,
structs.ServiceKindIngressGateway:
if upstream != nil {
return upstream.MeshGateway.Mode, nil
}
return defaultMode, nil
case structs.ServiceKindMeshGateway:
// Mesh Gateway mesh mode will always be remote.
return structs.MeshGatewayModeRemote, nil
}
return structs.MeshGatewayModeDefault, fmt.Errorf("unexpected service kind %q when determining mesh gateway mode for cluster %q", cfgSnap.Kind, clusterName)
}

func (s *ResourceGenerator) makeExportedUpstreamClustersForMeshGateway(cfgSnap *proxycfg.ConfigSnapshot) ([]proto.Message, error) {
// NOTE: Despite the mesh gateway already having one cluster per service
// (and subset) in the local datacenter we cannot reliably use those to
Expand Down
Loading
Loading