diff --git a/it/xds-client/src/test/java/com/linecorp/armeria/xds/it/PreprocessorErrorTest.java b/it/xds-client/src/test/java/com/linecorp/armeria/xds/it/PreprocessorErrorTest.java index 383d4ed9962..a55e6c5e7cc 100644 --- a/it/xds-client/src/test/java/com/linecorp/armeria/xds/it/PreprocessorErrorTest.java +++ b/it/xds-client/src/test/java/com/linecorp/armeria/xds/it/PreprocessorErrorTest.java @@ -100,7 +100,7 @@ static Stream testCases() { port_value: 8081 """, IllegalArgumentException.class, - "No route has been selected for listener " + "No route for listener " ), Arguments.of( //language=YAML diff --git a/it/xds-client/src/test/java/com/linecorp/armeria/xds/it/VirtualHostRoutingTest.java b/it/xds-client/src/test/java/com/linecorp/armeria/xds/it/VirtualHostRoutingTest.java index a3d9ce2cd41..44654022179 100644 --- a/it/xds-client/src/test/java/com/linecorp/armeria/xds/it/VirtualHostRoutingTest.java +++ b/it/xds-client/src/test/java/com/linecorp/armeria/xds/it/VirtualHostRoutingTest.java @@ -289,7 +289,7 @@ void noMatchTest() { .isInstanceOf(UnprocessedRequestException.class) .cause() .isInstanceOf(IllegalArgumentException.class) - .hasMessageStartingWith("No route has been selected for listener"); + .hasMessageStartingWith("No route for listener"); } } } diff --git a/xds/src/main/java/com/linecorp/armeria/xds/ClusterResourceParser.java b/xds/src/main/java/com/linecorp/armeria/xds/ClusterResourceParser.java index b3271a6fafa..8d98bd9530d 100644 --- a/xds/src/main/java/com/linecorp/armeria/xds/ClusterResourceParser.java +++ b/xds/src/main/java/com/linecorp/armeria/xds/ClusterResourceParser.java @@ -17,7 +17,6 @@ package com.linecorp.armeria.xds; import io.envoyproxy.envoy.config.cluster.v3.Cluster; -import io.envoyproxy.envoy.config.cluster.v3.Cluster.EdsClusterConfig; final class ClusterResourceParser extends ResourceParser { @@ -26,13 +25,8 @@ final class ClusterResourceParser extends ResourceParser clientMap = new HashMap<>(); private boolean closed; ControlPlaneClientManager(Bootstrap bootstrap, EventExecutor eventLoop, BootstrapClusters bootstrapClusters, ConfigSourceMapper configSourceMapper, - MeterRegistry meterRegistry, MeterIdPrefix meterIdPrefix) { + MeterRegistry meterRegistry, MeterIdPrefix meterIdPrefix, + XdsExtensionRegistry extensionRegistry) { bootstrapNode = bootstrap.getNode(); this.eventLoop = eventLoop; this.bootstrapClusters = bootstrapClusters; this.configSourceMapper = configSourceMapper; this.meterRegistry = meterRegistry; this.meterIdPrefix = meterIdPrefix; + this.extensionRegistry = extensionRegistry; } void subscribe(ResourceNode node) { @@ -65,7 +68,7 @@ void subscribe(ResourceNode node) { final ConfigSourceClient client = clientMap.computeIfAbsent( configSource, ignored -> new ConfigSourceClient( configSource, eventLoop, bootstrapNode, bootstrapClusters, configSourceMapper, - meterRegistry, meterIdPrefix)); + meterRegistry, meterIdPrefix, extensionRegistry)); client.addSubscriber(type, name, node); } diff --git a/xds/src/main/java/com/linecorp/armeria/xds/DefaultSubscriptionContext.java b/xds/src/main/java/com/linecorp/armeria/xds/DefaultSubscriptionContext.java index fd685797c12..4606e688b51 100644 --- a/xds/src/main/java/com/linecorp/armeria/xds/DefaultSubscriptionContext.java +++ b/xds/src/main/java/com/linecorp/armeria/xds/DefaultSubscriptionContext.java @@ -33,12 +33,14 @@ final class DefaultSubscriptionContext implements SubscriptionContext { private final DirectoryWatchService watchService; private final BootstrapSecrets bootstrapSecrets; private final ResourceNodeMeterBinderFactory meterBinderFactory; + private final XdsExtensionRegistry extensionRegistry; DefaultSubscriptionContext(EventExecutor eventLoop, XdsClusterManager clusterManager, ConfigSourceMapper configSourceMapper, ControlPlaneClientManager controlPlaneClientManager, MeterRegistry meterRegistry, MeterIdPrefix meterIdPrefix, - DirectoryWatchService watchService, BootstrapSecrets bootstrapSecrets) { + DirectoryWatchService watchService, BootstrapSecrets bootstrapSecrets, + XdsExtensionRegistry extensionRegistry) { this.eventLoop = eventLoop; this.clusterManager = clusterManager; this.configSourceMapper = configSourceMapper; @@ -47,6 +49,7 @@ final class DefaultSubscriptionContext implements SubscriptionContext { this.meterIdPrefix = meterIdPrefix; this.watchService = watchService; this.bootstrapSecrets = bootstrapSecrets; + this.extensionRegistry = extensionRegistry; meterBinderFactory = new ResourceNodeMeterBinderFactory(meterRegistry, meterIdPrefix); } @@ -99,4 +102,9 @@ public BootstrapSecrets bootstrapSecrets() { public ResourceNodeMeterBinderFactory meterBinderFactory() { return meterBinderFactory; } + + @Override + public XdsExtensionRegistry extensionRegistry() { + return extensionRegistry; + } } diff --git a/xds/src/main/java/com/linecorp/armeria/xds/DeltaActualStream.java b/xds/src/main/java/com/linecorp/armeria/xds/DeltaActualStream.java index 5b7533109fd..04a9964189d 100644 --- a/xds/src/main/java/com/linecorp/armeria/xds/DeltaActualStream.java +++ b/xds/src/main/java/com/linecorp/armeria/xds/DeltaActualStream.java @@ -29,7 +29,6 @@ import org.slf4j.LoggerFactory; import com.google.common.collect.ImmutableSet; -import com.google.protobuf.Message; import com.google.rpc.Code; import com.google.rpc.Status; @@ -248,11 +247,12 @@ public void onCompleted() { owner.retryOrClose(false); } - private void handleResponse( - ResourceParser resourceParser, DeltaDiscoveryResponse response) { + private void handleResponse(ResourceParser resourceParser, DeltaDiscoveryResponse response) { final XdsType type = resourceParser.type(); final List deltaResources = response.getResourcesList(); - final ParsedResourcesHolder holder = resourceParser.parseDeltaResources(deltaResources); + final ParsedResourcesHolder holder = + resourceParser.parseDeltaResources(deltaResources, + stateCoordinator.extensionRegistry()); if (!holder.errors().isEmpty()) { holder.invalidResources().forEach((name, error) -> diff --git a/xds/src/main/java/com/linecorp/armeria/xds/EndpointResourceParser.java b/xds/src/main/java/com/linecorp/armeria/xds/EndpointResourceParser.java index 42e9b32bac3..19b553a8cda 100644 --- a/xds/src/main/java/com/linecorp/armeria/xds/EndpointResourceParser.java +++ b/xds/src/main/java/com/linecorp/armeria/xds/EndpointResourceParser.java @@ -25,7 +25,8 @@ final class EndpointResourceParser extends ResourceParser mergeFilterConfigs( } static ClientPreprocessors buildDownstreamFilter( - @Nullable HttpConnectionManager connectionManager) { - if (connectionManager == null) { + XdsExtensionRegistry extensionRegistry, + List httpFilters, Map filterConfigs) { + if (httpFilters.isEmpty()) { return ClientPreprocessors.of(); } - final List httpFilters = connectionManager.getHttpFiltersList(); final ClientPreprocessorsBuilder builder = ClientPreprocessors.builder(); for (int i = httpFilters.size() - 1; i >= 0; i--) { final HttpFilter httpFilter = httpFilters.get(i); - final XdsHttpFilter instance = resolveInstance(httpFilter, null); + final Any perRouteConfig = filterConfigs.get(httpFilter.getName()); + final XdsHttpFilter instance = resolveInstance(extensionRegistry, httpFilter, perRouteConfig); if (instance == null) { continue; } @@ -69,13 +68,14 @@ static ClientPreprocessors buildDownstreamFilter( } static ClientDecoration buildUpstreamFilter( + XdsExtensionRegistry extensionRegistry, List httpFilters, Map filterConfigs, @Nullable RetryPolicy retryPolicy) { final ClientDecorationBuilder builder = ClientDecoration.builder(); for (int i = httpFilters.size() - 1; i >= 0; i--) { final HttpFilter httpFilter = httpFilters.get(i); final Any perRouteConfig = filterConfigs.get(httpFilter.getName()); - final XdsHttpFilter instance = resolveInstance(httpFilter, perRouteConfig); + final XdsHttpFilter instance = resolveInstance(extensionRegistry, httpFilter, perRouteConfig); if (instance == null) { continue; } @@ -90,15 +90,18 @@ static ClientDecoration buildUpstreamFilter( } @Nullable - private static XdsHttpFilter resolveInstance( + static XdsHttpFilter resolveInstance( + XdsExtensionRegistry extensionRegistry, HttpFilter httpFilter, @Nullable Any perRouteConfig) { - final HttpFilterFactory filterFactory = - HttpFilterFactoryRegistry.filterFactory(httpFilter.getName()); - if (filterFactory == null) { + final Any defaultConfig = httpFilter.getTypedConfig(); + final Any filterConfig = perRouteConfig != null ? perRouteConfig : defaultConfig; + final HttpFilterFactory factory = extensionRegistry.query( + filterConfig, httpFilter.getName(), HttpFilterFactory.class); + if (factory == null) { if (!httpFilter.getIsOptional()) { throw new IllegalArgumentException( "Unknown HTTP filter '" + httpFilter.getName() + - "': no HttpFilterFactory registered. Register an SPI " + + "': no HttpFilterFactory registered. Register an " + "HttpFilterFactory implementation to handle this filter."); } return null; @@ -107,9 +110,7 @@ private static XdsHttpFilter resolveInstance( httpFilter.getConfigTypeCase() == ConfigTypeCase.CONFIGTYPE_NOT_SET, "Only 'typed_config' is supported, but '%s' was supplied", httpFilter.getConfigTypeCase()); - final Any effectiveConfig = - perRouteConfig != null ? perRouteConfig : httpFilter.getTypedConfig(); - return filterFactory.create(httpFilter, effectiveConfig); + return factory.create(httpFilter, filterConfig, extensionRegistry.validator()); } private FilterUtil() {} diff --git a/xds/src/main/java/com/linecorp/armeria/xds/HttpConnectionManagerFactory.java b/xds/src/main/java/com/linecorp/armeria/xds/HttpConnectionManagerFactory.java new file mode 100644 index 00000000000..c6e09c48370 --- /dev/null +++ b/xds/src/main/java/com/linecorp/armeria/xds/HttpConnectionManagerFactory.java @@ -0,0 +1,50 @@ +/* + * Copyright 2026 LY Corporation + * + * LY Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.armeria.xds; + +import java.util.List; + +import com.google.common.collect.ImmutableList; +import com.google.protobuf.Any; + +import io.envoyproxy.envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager; + +final class HttpConnectionManagerFactory implements XdsExtensionFactory { + + static final HttpConnectionManagerFactory INSTANCE = new HttpConnectionManagerFactory(); + private static final String NAME = "envoy.http_connection_manager"; + private static final String TYPE_URL = + "type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3" + + ".HttpConnectionManager"; + private static final List TYPE_URLS = ImmutableList.of(TYPE_URL); + + private HttpConnectionManagerFactory() {} + + @Override + public String name() { + return NAME; + } + + @Override + public List typeUrls() { + return TYPE_URLS; + } + + HttpConnectionManager create(Any config, XdsResourceValidator validator) { + return validator.unpack(config, HttpConnectionManager.class); + } +} diff --git a/xds/src/main/java/com/linecorp/armeria/xds/ListenerManager.java b/xds/src/main/java/com/linecorp/armeria/xds/ListenerManager.java index 231110ee746..8faa1fc5358 100644 --- a/xds/src/main/java/com/linecorp/armeria/xds/ListenerManager.java +++ b/xds/src/main/java/com/linecorp/armeria/xds/ListenerManager.java @@ -61,7 +61,9 @@ private void initializeBootstrap(Bootstrap bootstrap, SubscriptionContext bootst void register(Listener listener, SubscriptionContext context, SnapshotWatcher watcher) { checkArgument(!nodes.containsKey(listener.getName()), "Static listener with name '%s' already registered", listener.getName()); - final ListenerStream node = new ListenerStream(new ListenerXdsResource(listener), context); + final ListenerXdsResource listenerResource = + ListenerResourceParser.INSTANCE.parse(listener, context.extensionRegistry(), ""); + final ListenerStream node = new ListenerStream(listenerResource, context); nodes.put(listener.getName(), node); eventLoop.execute(safeRunnable(() -> { final Subscription subscription = node.subscribe(watcher); diff --git a/xds/src/main/java/com/linecorp/armeria/xds/ListenerResourceParser.java b/xds/src/main/java/com/linecorp/armeria/xds/ListenerResourceParser.java index 1a5910f8314..eda6e63d95b 100644 --- a/xds/src/main/java/com/linecorp/armeria/xds/ListenerResourceParser.java +++ b/xds/src/main/java/com/linecorp/armeria/xds/ListenerResourceParser.java @@ -16,9 +16,17 @@ package com.linecorp.armeria.xds; +import java.util.List; + +import com.google.common.collect.ImmutableList; +import com.google.protobuf.Any; + +import com.linecorp.armeria.common.annotation.Nullable; +import com.linecorp.armeria.xds.filter.XdsHttpFilter; + import io.envoyproxy.envoy.config.listener.v3.Listener; import io.envoyproxy.envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager; -import io.envoyproxy.envoy.extensions.filters.network.http_connection_manager.v3.Rds; +import io.envoyproxy.envoy.extensions.filters.network.http_connection_manager.v3.HttpFilter; final class ListenerResourceParser extends ResourceParser { @@ -26,17 +34,44 @@ final class ListenerResourceParser extends ResourceParser resolveDownstreamFilters( + @Nullable HttpConnectionManager connectionManager, + XdsExtensionRegistry registry) { + if (connectionManager == null) { + return ImmutableList.of(); + } + final List httpFilters = connectionManager.getHttpFiltersList(); + final ImmutableList.Builder builder = ImmutableList.builder(); + for (HttpFilter httpFilter : httpFilters) { + final XdsHttpFilter instance = FilterUtil.resolveInstance(registry, httpFilter, null); + if (instance != null) { + builder.add(instance); } } - return resource; + return builder.build(); + } + + @Override + ListenerXdsResource parse(Listener message, XdsExtensionRegistry registry, String version) { + final HttpConnectionManager connectionManager = + unpackConnectionManager(message, registry); + final List downstreamFilters = + resolveDownstreamFilters(connectionManager, registry); + return new ListenerXdsResource(message, connectionManager, downstreamFilters, version); } @Override diff --git a/xds/src/main/java/com/linecorp/armeria/xds/ListenerSnapshot.java b/xds/src/main/java/com/linecorp/armeria/xds/ListenerSnapshot.java index 3b5d587d81b..de0fcdd6719 100644 --- a/xds/src/main/java/com/linecorp/armeria/xds/ListenerSnapshot.java +++ b/xds/src/main/java/com/linecorp/armeria/xds/ListenerSnapshot.java @@ -19,7 +19,6 @@ import com.google.common.base.MoreObjects; import com.google.common.base.Objects; -import com.linecorp.armeria.client.ClientPreprocessors; import com.linecorp.armeria.common.annotation.Nullable; import com.linecorp.armeria.common.annotation.UnstableApi; @@ -34,7 +33,6 @@ public final class ListenerSnapshot implements Snapshot { private final ListenerXdsResource listenerXdsResource; @Nullable private final RouteSnapshot routeSnapshot; - private final ClientPreprocessors downstreamFilter; ListenerSnapshot(ListenerXdsResource listenerXdsResource) { this(listenerXdsResource, null); @@ -43,7 +41,6 @@ public final class ListenerSnapshot implements Snapshot { ListenerSnapshot(ListenerXdsResource listenerXdsResource, @Nullable RouteSnapshot routeSnapshot) { this.listenerXdsResource = listenerXdsResource; this.routeSnapshot = routeSnapshot; - downstreamFilter = FilterUtil.buildDownstreamFilter(listenerXdsResource.connectionManager()); } @Override @@ -59,13 +56,6 @@ public RouteSnapshot routeSnapshot() { return routeSnapshot; } - /** - * Returns the downstream filter. - */ - public ClientPreprocessors downstreamFilter() { - return downstreamFilter; - } - @Override public boolean equals(Object object) { if (this == object) { diff --git a/xds/src/main/java/com/linecorp/armeria/xds/ListenerXdsResource.java b/xds/src/main/java/com/linecorp/armeria/xds/ListenerXdsResource.java index 06b884e6c12..12b61e61d49 100644 --- a/xds/src/main/java/com/linecorp/armeria/xds/ListenerXdsResource.java +++ b/xds/src/main/java/com/linecorp/armeria/xds/ListenerXdsResource.java @@ -16,19 +16,16 @@ package com.linecorp.armeria.xds; -import static com.google.common.base.Preconditions.checkArgument; - import java.util.List; -import com.google.protobuf.Any; - import com.linecorp.armeria.common.annotation.Nullable; import com.linecorp.armeria.common.annotation.UnstableApi; +import com.linecorp.armeria.xds.client.endpoint.RouterFilterFactory.RouterXdsHttpFilter; +import com.linecorp.armeria.xds.filter.XdsHttpFilter; import io.envoyproxy.envoy.config.listener.v3.Listener; import io.envoyproxy.envoy.extensions.filters.http.router.v3.Router; import io.envoyproxy.envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager; -import io.envoyproxy.envoy.extensions.filters.network.http_connection_manager.v3.HttpFilter; /** * A resource object for a {@link Listener}. @@ -36,42 +33,38 @@ @UnstableApi public final class ListenerXdsResource extends AbstractXdsResource { - private static final String HTTP_CONNECTION_MANAGER_TYPE_URL = - "type.googleapis.com/" + - "envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager"; - private static final String ROUTER_TYPE_URL = - "type.googleapis.com/envoy.extensions.filters.http.router.v3.Router"; - private final Listener listener; @Nullable private final HttpConnectionManager connectionManager; + private final List downstreamFilters; @Nullable private final Router router; - ListenerXdsResource(Listener listener) { - this(listener, ""); - } - - ListenerXdsResource(Listener listener, String version) { - this(listener, version, 0); + ListenerXdsResource(Listener listener, @Nullable HttpConnectionManager connectionManager, + List downstreamFilters, String version) { + this(listener, connectionManager, downstreamFilters, version, 0); } - private ListenerXdsResource(Listener listener, String version, long revision) { + private ListenerXdsResource(Listener listener, @Nullable HttpConnectionManager connectionManager, + List downstreamFilters, + String version, long revision) { super(version, revision); - XdsValidatorIndexRegistry.assertValid(listener); this.listener = listener; + this.connectionManager = connectionManager; + this.downstreamFilters = downstreamFilters; + router = findRouter(downstreamFilters); + } - if (listener.getApiListener().hasApiListener()) { - final Any apiListener = listener.getApiListener().getApiListener(); - if (HTTP_CONNECTION_MANAGER_TYPE_URL.equals(apiListener.getTypeUrl())) { - connectionManager = XdsValidatorIndexRegistry.unpack(apiListener, HttpConnectionManager.class); - } else { - throw new IllegalArgumentException("Unsupported api listener: " + apiListener); - } - } else { - connectionManager = null; + @Nullable + private static Router findRouter(List filters) { + if (filters.isEmpty()) { + return null; + } + final XdsHttpFilter last = filters.get(filters.size() - 1); + if (last instanceof RouterXdsHttpFilter) { + return ((RouterXdsHttpFilter) last).router(); } - router = router(connectionManager); + return null; } @Override @@ -102,7 +95,8 @@ ListenerXdsResource withRevision(long revision) { if (revision == revision()) { return this; } - return new ListenerXdsResource(listener, version(), revision); + return new ListenerXdsResource(listener, connectionManager, downstreamFilters, + version(), revision); } /** @@ -112,22 +106,4 @@ ListenerXdsResource withRevision(long revision) { public Router router() { return router; } - - @Nullable - private static Router router(@Nullable HttpConnectionManager connectionManager) { - if (connectionManager == null) { - return null; - } - final List httpFilters = connectionManager.getHttpFiltersList(); - if (httpFilters.isEmpty()) { - return null; - } - final HttpFilter lastHttpFilter = httpFilters.get(httpFilters.size() - 1); - if (!ROUTER_TYPE_URL.equals(lastHttpFilter.getTypedConfig().getTypeUrl())) { - // the router should be the last/terminal filter - return null; - } - checkArgument(lastHttpFilter.hasTypedConfig(), "Only typedConfig is supported for 'Router'."); - return XdsValidatorIndexRegistry.unpack(lastHttpFilter.getTypedConfig(), Router.class); - } } diff --git a/xds/src/main/java/com/linecorp/armeria/xds/RawBufferTransportSocketFactory.java b/xds/src/main/java/com/linecorp/armeria/xds/RawBufferTransportSocketFactory.java new file mode 100644 index 00000000000..652a138617b --- /dev/null +++ b/xds/src/main/java/com/linecorp/armeria/xds/RawBufferTransportSocketFactory.java @@ -0,0 +1,54 @@ +/* + * Copyright 2026 LY Corporation + * + * LY Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.armeria.xds; + +import java.util.List; + +import com.google.common.collect.ImmutableList; + +import com.linecorp.armeria.common.annotation.Nullable; + +import io.envoyproxy.envoy.config.core.v3.ConfigSource; +import io.envoyproxy.envoy.config.core.v3.TransportSocket; + +final class RawBufferTransportSocketFactory implements TransportSocketFactory { + + static final RawBufferTransportSocketFactory INSTANCE = new RawBufferTransportSocketFactory(); + private static final String NAME = "envoy.transport_sockets.raw_buffer"; + private static final String TYPE_URL = + "type.googleapis.com/envoy.extensions.transport_sockets.raw_buffer.v3.RawBuffer"; + private static final List TYPE_URLS = ImmutableList.of(TYPE_URL); + + private RawBufferTransportSocketFactory() {} + + @Override + public String name() { + return NAME; + } + + @Override + public List typeUrls() { + return TYPE_URLS; + } + + @Override + public SnapshotStream create( + SubscriptionContext context, @Nullable ConfigSource configSource, + TransportSocket transportSocket) { + return SnapshotStream.just(new TransportSocketSnapshot(transportSocket)); + } +} diff --git a/xds/src/main/java/com/linecorp/armeria/xds/ResourceParser.java b/xds/src/main/java/com/linecorp/armeria/xds/ResourceParser.java index 1606bb4dcd2..47942a5fe15 100644 --- a/xds/src/main/java/com/linecorp/armeria/xds/ResourceParser.java +++ b/xds/src/main/java/com/linecorp/armeria/xds/ResourceParser.java @@ -31,9 +31,10 @@ abstract class ResourceParser { abstract Class clazz(); - abstract O parse(I message, String version); + abstract O parse(I message, XdsExtensionRegistry extensionRegistry, String version); - ParsedResourcesHolder parseResources(List resources, String version) { + ParsedResourcesHolder parseResources(List resources, XdsExtensionRegistry extensionRegistry, + String version) { final ImmutableMap.Builder parsedResources = ImmutableMap.builder(); final ImmutableMap.Builder invalidResources = ImmutableMap.builder(); @@ -51,7 +52,8 @@ ParsedResourcesHolder parseResources(List resources, String version) { final String name = name(unpackedMessage); final O resourceUpdate; try { - resourceUpdate = parse(unpackedMessage, version); + extensionRegistry.assertValid(unpackedMessage); + resourceUpdate = parse(unpackedMessage, extensionRegistry, version); } catch (Exception e) { invalidResources.put(name, e); continue; @@ -65,7 +67,8 @@ ParsedResourcesHolder parseResources(List resources, String version) { invalidResources.buildKeepingLast()); } - ParsedResourcesHolder parseDeltaResources(List resources) { + ParsedResourcesHolder parseDeltaResources(List resources, + XdsExtensionRegistry extensionRegistry) { final ImmutableMap.Builder parsedResources = ImmutableMap.builder(); final ImmutableMap.Builder invalidResources = ImmutableMap.builder(); @@ -83,13 +86,13 @@ ParsedResourcesHolder parseDeltaResources(List resources) { final String name = resource.getName(); final O resourceUpdate; try { - resourceUpdate = parse(unpackedMessage, resource.getVersion()); + extensionRegistry.assertValid(unpackedMessage); + resourceUpdate = parse(unpackedMessage, extensionRegistry, resource.getVersion()); } catch (Exception e) { invalidResources.put(name, e); continue; } - // Resource parsed successfully. parsedResources.put(name, resourceUpdate); } diff --git a/xds/src/main/java/com/linecorp/armeria/xds/RouteEntry.java b/xds/src/main/java/com/linecorp/armeria/xds/RouteEntry.java index b3e0811212e..b6716fbb4e8 100644 --- a/xds/src/main/java/com/linecorp/armeria/xds/RouteEntry.java +++ b/xds/src/main/java/com/linecorp/armeria/xds/RouteEntry.java @@ -25,9 +25,12 @@ import com.google.protobuf.Any; import com.linecorp.armeria.client.ClientDecoration; +import com.linecorp.armeria.client.ClientPreprocessors; import com.linecorp.armeria.client.ClientRequestContext; import com.linecorp.armeria.client.HttpClient; +import com.linecorp.armeria.client.HttpPreClient; import com.linecorp.armeria.client.RpcClient; +import com.linecorp.armeria.client.RpcPreClient; import com.linecorp.armeria.common.annotation.Nullable; import com.linecorp.armeria.common.annotation.UnstableApi; import com.linecorp.armeria.internal.client.ClientRequestContextExtension; @@ -52,11 +55,13 @@ public final class RouteEntry { private final int index; private final HttpClient httpClient; private final RpcClient rpcClient; + private final HttpPreClient httpPreClient; + private final RpcPreClient rpcPreClient; private final RouteEntryMatcher matcher; RouteEntry(Route route, @Nullable ClusterSnapshot clusterSnapshot, int index, @Nullable ListenerXdsResource listenerResource, RouteXdsResource routeResource, - VirtualHostXdsResource vhostResource) { + VirtualHostXdsResource vhostResource, XdsExtensionRegistry extensionRegistry) { this.route = route; this.clusterSnapshot = clusterSnapshot; this.index = index; @@ -86,9 +91,21 @@ public final class RouteEntry { final RetryPolicy effectiveRetryPolicy = retryPolicy == RetryPolicy.getDefaultInstance() ? null : retryPolicy; final ClientDecoration clientDecoration = FilterUtil.buildUpstreamFilter( - upstreamFilters, filterConfigs, effectiveRetryPolicy); + extensionRegistry, upstreamFilters, filterConfigs, effectiveRetryPolicy); httpClient = clientDecoration.decorate(DelegatingHttpClient.of()); rpcClient = clientDecoration.rpcDecorate(DelegatingRpcClient.of()); + + // Build downstream filters (HCM http_filters) with per-route config + final List hcmHttpFilters; + if (listenerResource != null && listenerResource.connectionManager() != null) { + hcmHttpFilters = listenerResource.connectionManager().getHttpFiltersList(); + } else { + hcmHttpFilters = ImmutableList.of(); + } + final ClientPreprocessors downstreamPreprocessors = FilterUtil.buildDownstreamFilter( + extensionRegistry, hcmHttpFilters, filterConfigs); + httpPreClient = downstreamPreprocessors.decorate(DelegatingHttpClient.of()); + rpcPreClient = downstreamPreprocessors.rpcDecorate(DelegatingRpcClient.of()); } /** @@ -118,6 +135,22 @@ public Any filterConfig(String filterName) { return filterConfigs.get(filterName); } + /** + * Returns the downstream {@link HttpPreClient} chain for this route. + */ + @UnstableApi + public HttpPreClient httpPreClient() { + return httpPreClient; + } + + /** + * Returns the downstream {@link RpcPreClient} chain for this route. + */ + @UnstableApi + public RpcPreClient rpcPreClient() { + return rpcPreClient; + } + /** * Returns whether this route matches the specified {@link ClientRequestContext}. */ diff --git a/xds/src/main/java/com/linecorp/armeria/xds/RouteResourceParser.java b/xds/src/main/java/com/linecorp/armeria/xds/RouteResourceParser.java index fefca542c8e..4abc3759c74 100644 --- a/xds/src/main/java/com/linecorp/armeria/xds/RouteResourceParser.java +++ b/xds/src/main/java/com/linecorp/armeria/xds/RouteResourceParser.java @@ -20,12 +20,13 @@ final class RouteResourceParser extends ResourceParser { - public static final RouteResourceParser INSTANCE = new RouteResourceParser(); + static final RouteResourceParser INSTANCE = new RouteResourceParser(); private RouteResourceParser() {} @Override - RouteXdsResource parse(RouteConfiguration message, String version) { + RouteXdsResource parse(RouteConfiguration message, XdsExtensionRegistry registry, + String version) { return new RouteXdsResource(message, version); } diff --git a/xds/src/main/java/com/linecorp/armeria/xds/RouteStream.java b/xds/src/main/java/com/linecorp/armeria/xds/RouteStream.java index 5ffee74f89a..aab85fb375c 100644 --- a/xds/src/main/java/com/linecorp/armeria/xds/RouteStream.java +++ b/xds/src/main/java/com/linecorp/armeria/xds/RouteStream.java @@ -163,9 +163,11 @@ private static class RouteEntryStream extends RefCountedStream { @Override protected Subscription onStart(SnapshotWatcher watcher) { + final XdsExtensionRegistry extensionRegistry = context.extensionRegistry(); if (!route.getRoute().hasCluster()) { return SnapshotStream.just(new RouteEntry(route, null, index, - listenerResource, routeResource, vhostResource)) + listenerResource, routeResource, vhostResource, + extensionRegistry)) .subscribe(watcher); } final SnapshotWatcher mapped = (snapshot, t) -> { @@ -174,7 +176,8 @@ protected Subscription onStart(SnapshotWatcher watcher) { return; } watcher.onUpdate(new RouteEntry(route, snapshot, index, - listenerResource, routeResource, vhostResource), null); + listenerResource, routeResource, vhostResource, + extensionRegistry), null); }; return context.clusterManager().register(clusterName, context, mapped); } diff --git a/xds/src/main/java/com/linecorp/armeria/xds/RouteXdsResource.java b/xds/src/main/java/com/linecorp/armeria/xds/RouteXdsResource.java index edca0874074..38d2b33091d 100644 --- a/xds/src/main/java/com/linecorp/armeria/xds/RouteXdsResource.java +++ b/xds/src/main/java/com/linecorp/armeria/xds/RouteXdsResource.java @@ -34,7 +34,6 @@ public final class RouteXdsResource extends AbstractXdsResource { private RouteXdsResource(RouteConfiguration routeConfiguration, String version, long revision) { super(version, revision); - XdsValidatorIndexRegistry.assertValid(routeConfiguration); this.routeConfiguration = routeConfiguration; } diff --git a/xds/src/main/java/com/linecorp/armeria/xds/SecretResourceParser.java b/xds/src/main/java/com/linecorp/armeria/xds/SecretResourceParser.java index 8bd3a814e31..420daedc613 100644 --- a/xds/src/main/java/com/linecorp/armeria/xds/SecretResourceParser.java +++ b/xds/src/main/java/com/linecorp/armeria/xds/SecretResourceParser.java @@ -35,7 +35,7 @@ Class clazz() { } @Override - SecretXdsResource parse(Secret message, String version) { + SecretXdsResource parse(Secret message, XdsExtensionRegistry registry, String version) { return new SecretXdsResource(message, version); } diff --git a/xds/src/main/java/com/linecorp/armeria/xds/SecretXdsResource.java b/xds/src/main/java/com/linecorp/armeria/xds/SecretXdsResource.java index c9486db6de2..2b61fa73b39 100644 --- a/xds/src/main/java/com/linecorp/armeria/xds/SecretXdsResource.java +++ b/xds/src/main/java/com/linecorp/armeria/xds/SecretXdsResource.java @@ -40,7 +40,6 @@ public final class SecretXdsResource extends AbstractXdsResource { private SecretXdsResource(Secret secret, String version, long revision) { super(version, revision); - XdsValidatorIndexRegistry.assertValid(secret); this.secret = secret; } diff --git a/xds/src/main/java/com/linecorp/armeria/xds/SotwActualStream.java b/xds/src/main/java/com/linecorp/armeria/xds/SotwActualStream.java index 84b36848c30..c1f11fe6df9 100644 --- a/xds/src/main/java/com/linecorp/armeria/xds/SotwActualStream.java +++ b/xds/src/main/java/com/linecorp/armeria/xds/SotwActualStream.java @@ -27,7 +27,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.protobuf.Message; import com.google.rpc.Code; import com.google.rpc.Status; @@ -45,7 +44,7 @@ final class SotwActualStream implements StreamObserver, AdsXd private static final Logger logger = LoggerFactory.getLogger(SotwActualStream.class); // NACK backoff to prevent hot loops when the server keeps sending bad responses - static final long NACK_BACKOFF_MILLIS = 3_000L; + static final long NACK_BACKOFF_MILLIS = 1_000L; private final StreamObserver requestObserver; private final AdsXdsStream owner; @@ -172,10 +171,10 @@ public void onCompleted() { owner.retryOrClose(false); } - private void handleResponse( - ResourceParser resourceParser, DiscoveryResponse response) { + private void handleResponse(ResourceParser resourceParser, DiscoveryResponse response) { final ParsedResourcesHolder holder = - resourceParser.parseResources(response.getResourcesList(), response.getVersionInfo()); + resourceParser.parseResources(response.getResourcesList(), + stateCoordinator.extensionRegistry(), response.getVersionInfo()); if (!holder.errors().isEmpty()) { holder.invalidResources().forEach((name, error) -> stateCoordinator.onResourceError( diff --git a/xds/src/main/java/com/linecorp/armeria/xds/StateCoordinator.java b/xds/src/main/java/com/linecorp/armeria/xds/StateCoordinator.java index 209a530c5a3..3a2627ce642 100644 --- a/xds/src/main/java/com/linecorp/armeria/xds/StateCoordinator.java +++ b/xds/src/main/java/com/linecorp/armeria/xds/StateCoordinator.java @@ -28,10 +28,17 @@ final class StateCoordinator implements SafeCloseable { private final SubscriberStorage subscriberStorage; private final ResourceStateStore stateStore; + private final XdsExtensionRegistry extensionRegistry; - StateCoordinator(EventExecutor eventLoop, long timeoutMillis, boolean delta) { + StateCoordinator(EventExecutor eventLoop, long timeoutMillis, boolean delta, + XdsExtensionRegistry extensionRegistry) { subscriberStorage = new SubscriberStorage(eventLoop, timeoutMillis, delta); stateStore = new ResourceStateStore(); + this.extensionRegistry = extensionRegistry; + } + + XdsExtensionRegistry extensionRegistry() { + return extensionRegistry; } boolean register(XdsType type, String resourceName, ResourceWatcher watcher) { diff --git a/xds/src/main/java/com/linecorp/armeria/xds/SubscriptionContext.java b/xds/src/main/java/com/linecorp/armeria/xds/SubscriptionContext.java index 2e9199d2aa0..bb912ff1ce6 100644 --- a/xds/src/main/java/com/linecorp/armeria/xds/SubscriptionContext.java +++ b/xds/src/main/java/com/linecorp/armeria/xds/SubscriptionContext.java @@ -43,4 +43,6 @@ interface SubscriptionContext { BootstrapSecrets bootstrapSecrets(); ResourceNodeMeterBinderFactory meterBinderFactory(); + + XdsExtensionRegistry extensionRegistry(); } diff --git a/xds/src/main/java/com/linecorp/armeria/xds/TransportSocketFactory.java b/xds/src/main/java/com/linecorp/armeria/xds/TransportSocketFactory.java new file mode 100644 index 00000000000..09ede15b0cf --- /dev/null +++ b/xds/src/main/java/com/linecorp/armeria/xds/TransportSocketFactory.java @@ -0,0 +1,29 @@ +/* + * Copyright 2026 LY Corporation + * + * LY Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.armeria.xds; + +import com.linecorp.armeria.common.annotation.Nullable; + +import io.envoyproxy.envoy.config.core.v3.ConfigSource; +import io.envoyproxy.envoy.config.core.v3.TransportSocket; + +interface TransportSocketFactory extends XdsExtensionFactory { + + SnapshotStream create(SubscriptionContext context, + @Nullable ConfigSource configSource, + TransportSocket transportSocket); +} diff --git a/xds/src/main/java/com/linecorp/armeria/xds/TransportSocketStream.java b/xds/src/main/java/com/linecorp/armeria/xds/TransportSocketStream.java index bc901956a1c..fe6634047e5 100644 --- a/xds/src/main/java/com/linecorp/armeria/xds/TransportSocketStream.java +++ b/xds/src/main/java/com/linecorp/armeria/xds/TransportSocketStream.java @@ -16,18 +16,10 @@ package com.linecorp.armeria.xds; -import java.util.Optional; - import com.linecorp.armeria.common.annotation.Nullable; import io.envoyproxy.envoy.config.core.v3.ConfigSource; import io.envoyproxy.envoy.config.core.v3.TransportSocket; -import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.CommonTlsContext; -import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.CommonTlsContext.CombinedCertificateValidationContext; -import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.SdsSecretConfig; -import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.Secret; -import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.TlsCertificate; -import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.UpstreamTlsContext; final class TransportSocketStream extends RefCountedStream { @@ -45,68 +37,18 @@ final class TransportSocketStream extends RefCountedStream watcher) { - if (!"envoy.transport_sockets.tls".equals(transportSocket.getName())) { + if (transportSocket.equals(TransportSocket.getDefaultInstance())) { return SnapshotStream.just(new TransportSocketSnapshot(transportSocket)) .subscribe(watcher); } - if (!transportSocket.hasTypedConfig()) { - return SnapshotStream.just(new TransportSocketSnapshot(TransportSocket.getDefaultInstance())) - .subscribe(watcher); - } - final UpstreamTlsContext tlsContext = XdsValidatorIndexRegistry.unpack(transportSocket.getTypedConfig(), - UpstreamTlsContext.class); - final CommonTlsContext commonTlsContext = tlsContext.getCommonTlsContext(); - - final SnapshotStream> validationStream; - - if (commonTlsContext.hasValidationContext()) { - final Secret secret = Secret.newBuilder() - .setValidationContext(commonTlsContext.getValidationContext()) - .build(); - final SecretStream secretStream = new SecretStream(secret, context); - validationStream = secretStream - .switchMapEager(resource -> new CertificateValidationContextStream(context, resource)) - .map(Optional::of); - } else if (commonTlsContext.hasValidationContextSdsSecretConfig()) { - final SdsSecretConfig sdsConfig = commonTlsContext.getValidationContextSdsSecretConfig(); - final SecretStream secretStream = new SecretStream(sdsConfig, configSource, context); - validationStream = secretStream - .switchMapEager(resource -> new CertificateValidationContextStream(context, resource)) - .map(Optional::of); - } else if (commonTlsContext.hasCombinedValidationContext()) { - final CombinedCertificateValidationContext combined = - commonTlsContext.getCombinedValidationContext(); - final SdsSecretConfig sdsConfig = combined.getValidationContextSdsSecretConfig(); - final SecretStream secretStream = new SecretStream(sdsConfig, configSource, context); - validationStream = secretStream.switchMapEager(resource -> new CertificateValidationContextStream( - context, resource, combined.getDefaultValidationContext())) - .map(Optional::of); - } else { - validationStream = SnapshotStream.empty(); - } - - final SnapshotStream> tlsCertStream; - if (!commonTlsContext.getTlsCertificatesList().isEmpty()) { - final TlsCertificate tlsCertificate = commonTlsContext.getTlsCertificatesList().get(0); - final Secret secret = Secret.newBuilder().setTlsCertificate(tlsCertificate).build(); - final SecretStream secretStream = new SecretStream(secret, context); - tlsCertStream = secretStream.switchMapEager(resource -> new TlsCertificateStream(context, resource)) - .map(Optional::of); - } else if (!commonTlsContext.getTlsCertificateSdsSecretConfigsList().isEmpty()) { - final SdsSecretConfig sdsConfig = - commonTlsContext.getTlsCertificateSdsSecretConfigsList().get(0); - final SecretStream secretStream = new SecretStream(sdsConfig, configSource, context); - tlsCertStream = secretStream.switchMapEager(resource -> new TlsCertificateStream(context, resource)) - .map(Optional::of); - } else { - // static - tlsCertStream = SnapshotStream.empty(); + final TransportSocketFactory factory = context.extensionRegistry().query( + transportSocket.getTypedConfig(), transportSocket.getName(), + TransportSocketFactory.class); + if (factory == null) { + throw new IllegalArgumentException( + "No TransportSocketFactory registered for transport socket: " + + transportSocket.getName()); } - - final SnapshotStream stream = - SnapshotStream.combineLatest(tlsCertStream, validationStream, (cert, validation) -> { - return new TransportSocketSnapshot(transportSocket, cert, validation); - }); - return stream.subscribe(watcher); + return factory.create(context, configSource, transportSocket).subscribe(watcher); } } diff --git a/xds/src/main/java/com/linecorp/armeria/xds/UpstreamTlsTransportSocketFactory.java b/xds/src/main/java/com/linecorp/armeria/xds/UpstreamTlsTransportSocketFactory.java new file mode 100644 index 00000000000..311b866d65a --- /dev/null +++ b/xds/src/main/java/com/linecorp/armeria/xds/UpstreamTlsTransportSocketFactory.java @@ -0,0 +1,116 @@ +/* + * Copyright 2026 LY Corporation + * + * LY Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.armeria.xds; + +import java.util.List; +import java.util.Optional; + +import com.google.common.collect.ImmutableList; + +import com.linecorp.armeria.common.annotation.Nullable; + +import io.envoyproxy.envoy.config.core.v3.ConfigSource; +import io.envoyproxy.envoy.config.core.v3.TransportSocket; +import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.CommonTlsContext; +import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.CommonTlsContext.CombinedCertificateValidationContext; +import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.SdsSecretConfig; +import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.Secret; +import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.TlsCertificate; +import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.UpstreamTlsContext; + +final class UpstreamTlsTransportSocketFactory implements TransportSocketFactory { + + static final UpstreamTlsTransportSocketFactory INSTANCE = new UpstreamTlsTransportSocketFactory(); + private static final String NAME = "envoy.transport_sockets.tls"; + private static final String TYPE_URL = + "type.googleapis.com/envoy.extensions.transport_sockets.tls.v3.UpstreamTlsContext"; + private static final List TYPE_URLS = ImmutableList.of(TYPE_URL); + + private UpstreamTlsTransportSocketFactory() {} + + @Override + public String name() { + return NAME; + } + + @Override + public List typeUrls() { + return TYPE_URLS; + } + + @Override + public SnapshotStream create( + SubscriptionContext context, @Nullable ConfigSource configSource, + TransportSocket transportSocket) { + if (!transportSocket.hasTypedConfig()) { + return SnapshotStream.just(new TransportSocketSnapshot(TransportSocket.getDefaultInstance())); + } + final UpstreamTlsContext tlsContext = context.extensionRegistry().unpack( + transportSocket.getTypedConfig(), UpstreamTlsContext.class); + final CommonTlsContext commonTlsContext = tlsContext.getCommonTlsContext(); + + final SnapshotStream> validationStream; + + if (commonTlsContext.hasValidationContext()) { + final Secret secret = Secret.newBuilder() + .setValidationContext(commonTlsContext.getValidationContext()) + .build(); + final SecretStream secretStream = new SecretStream(secret, context); + validationStream = secretStream + .switchMapEager(resource -> new CertificateValidationContextStream(context, resource)) + .map(Optional::of); + } else if (commonTlsContext.hasValidationContextSdsSecretConfig()) { + final SdsSecretConfig sdsConfig = commonTlsContext.getValidationContextSdsSecretConfig(); + final SecretStream secretStream = new SecretStream(sdsConfig, configSource, context); + validationStream = secretStream + .switchMapEager(resource -> new CertificateValidationContextStream(context, resource)) + .map(Optional::of); + } else if (commonTlsContext.hasCombinedValidationContext()) { + final CombinedCertificateValidationContext combined = + commonTlsContext.getCombinedValidationContext(); + final SdsSecretConfig sdsConfig = combined.getValidationContextSdsSecretConfig(); + final SecretStream secretStream = new SecretStream(sdsConfig, configSource, context); + validationStream = secretStream.switchMapEager(resource -> new CertificateValidationContextStream( + context, resource, combined.getDefaultValidationContext())) + .map(Optional::of); + } else { + validationStream = SnapshotStream.empty(); + } + + final SnapshotStream> tlsCertStream; + if (!commonTlsContext.getTlsCertificatesList().isEmpty()) { + final TlsCertificate tlsCertificate = commonTlsContext.getTlsCertificatesList().get(0); + final Secret secret = Secret.newBuilder().setTlsCertificate(tlsCertificate).build(); + final SecretStream secretStream = new SecretStream(secret, context); + tlsCertStream = secretStream.switchMapEager(resource -> new TlsCertificateStream(context, resource)) + .map(Optional::of); + } else if (!commonTlsContext.getTlsCertificateSdsSecretConfigsList().isEmpty()) { + final SdsSecretConfig sdsConfig = + commonTlsContext.getTlsCertificateSdsSecretConfigsList().get(0); + final SecretStream secretStream = new SecretStream(sdsConfig, configSource, context); + tlsCertStream = secretStream.switchMapEager(resource -> new TlsCertificateStream(context, resource)) + .map(Optional::of); + } else { + // static + tlsCertStream = SnapshotStream.empty(); + } + + return SnapshotStream.combineLatest(tlsCertStream, validationStream, (cert, validation) -> { + return new TransportSocketSnapshot(transportSocket, cert, validation); + }); + } +} diff --git a/xds/src/main/java/com/linecorp/armeria/xds/VirtualHostXdsResource.java b/xds/src/main/java/com/linecorp/armeria/xds/VirtualHostXdsResource.java index ad35547df20..cb6e5900005 100644 --- a/xds/src/main/java/com/linecorp/armeria/xds/VirtualHostXdsResource.java +++ b/xds/src/main/java/com/linecorp/armeria/xds/VirtualHostXdsResource.java @@ -31,7 +31,6 @@ public final class VirtualHostXdsResource extends AbstractXdsResource { VirtualHostXdsResource(VirtualHost virtualHost, String version, long revision) { super(version, revision); - XdsValidatorIndexRegistry.assertValid(virtualHost); this.virtualHost = virtualHost; } diff --git a/xds/src/main/java/com/linecorp/armeria/xds/XdsBootstrapImpl.java b/xds/src/main/java/com/linecorp/armeria/xds/XdsBootstrapImpl.java index 9b6204195f4..fa62c6e7f08 100644 --- a/xds/src/main/java/com/linecorp/armeria/xds/XdsBootstrapImpl.java +++ b/xds/src/main/java/com/linecorp/armeria/xds/XdsBootstrapImpl.java @@ -47,8 +47,11 @@ final class XdsBootstrapImpl implements XdsBootstrap { this.bootstrap = bootstrap; this.defaultWatcher = defaultWatcher; this.eventLoop = requireNonNull(eventLoop, "eventLoop"); - clusterManager = new XdsClusterManager(eventLoop, bootstrap, meterIdPrefix, meterRegistry); + final XdsResourceValidator resourceValidator = new XdsResourceValidator(); + final XdsExtensionRegistry extensionRegistry = XdsExtensionRegistry.of(resourceValidator); + extensionRegistry.assertValid(bootstrap); watchService = new DirectoryWatchService(); + clusterManager = new XdsClusterManager(eventLoop, bootstrap, meterIdPrefix, meterRegistry); final BootstrapClusters bootstrapClusters = new BootstrapClusters(bootstrap, clusterManager, defaultWatcher); final BootstrapSecrets bootstrapSecrets = new BootstrapSecrets(bootstrap); @@ -56,10 +59,10 @@ final class XdsBootstrapImpl implements XdsBootstrap { final ConfigSourceMapper configSourceMapper = new ConfigSourceMapper(bootstrap); controlPlaneClientManager = new ControlPlaneClientManager( bootstrap, eventLoop, bootstrapClusters, - configSourceMapper, meterRegistry, meterIdPrefix); + configSourceMapper, meterRegistry, meterIdPrefix, extensionRegistry); subscriptionContext = new DefaultSubscriptionContext( eventLoop, clusterManager, configSourceMapper, controlPlaneClientManager, - meterRegistry, meterIdPrefix, watchService, bootstrapSecrets); + meterRegistry, meterIdPrefix, watchService, bootstrapSecrets, extensionRegistry); bootstrapClusters.initializeStaticClusters(subscriptionContext); listenerManager = new ListenerManager(eventLoop, bootstrap, subscriptionContext, defaultWatcher); } diff --git a/xds/src/main/java/com/linecorp/armeria/xds/XdsConverterUtil.java b/xds/src/main/java/com/linecorp/armeria/xds/XdsConverterUtil.java deleted file mode 100644 index 28198f0d51e..00000000000 --- a/xds/src/main/java/com/linecorp/armeria/xds/XdsConverterUtil.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Copyright 2023 LINE Corporation - * - * LINE Corporation licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ - -package com.linecorp.armeria.xds; - -import static com.google.common.base.Preconditions.checkArgument; - -import com.linecorp.armeria.common.annotation.Nullable; - -import io.envoyproxy.envoy.config.core.v3.ApiConfigSource; -import io.envoyproxy.envoy.config.core.v3.ApiConfigSource.ApiType; -import io.envoyproxy.envoy.config.core.v3.ConfigSource; - -final class XdsConverterUtil { - - private XdsConverterUtil() {} - - static void validateConfigSource(@Nullable ConfigSource configSource) { - if (configSource == null || configSource.equals(ConfigSource.getDefaultInstance())) { - return; - } - checkArgument(configSource.hasAds() || configSource.hasApiConfigSource() || configSource.hasSelf(), - "Only one of (Ads, ApiConfigSource, or Self) type ConfigSource is supported for %s", - configSource); - if (configSource.hasApiConfigSource()) { - final ApiConfigSource apiConfigSource = configSource.getApiConfigSource(); - final ApiType apiType = apiConfigSource.getApiType(); - checkArgument(apiType == ApiType.GRPC || apiType == ApiType.DELTA_GRPC || - apiType == ApiType.AGGREGATED_GRPC || apiType == ApiType.AGGREGATED_DELTA_GRPC, - "Unsupported apiType %s. Only GRPC, DELTA_GRPC, AGGREGATED_GRPC and " + - "AGGREGATED_DELTA_GRPC are supported.", apiType); - checkArgument(apiConfigSource.getGrpcServicesCount() > 0, - "At least once GrpcService is required for ApiConfigSource for %s", configSource); - apiConfigSource.getGrpcServicesList().forEach( - grpcService -> checkArgument(grpcService.hasEnvoyGrpc(), - "Only envoyGrpc is supported for %s", grpcService)); - } - } -} diff --git a/xds/src/main/java/com/linecorp/armeria/xds/XdsExtensionFactory.java b/xds/src/main/java/com/linecorp/armeria/xds/XdsExtensionFactory.java new file mode 100644 index 00000000000..b8753e9405b --- /dev/null +++ b/xds/src/main/java/com/linecorp/armeria/xds/XdsExtensionFactory.java @@ -0,0 +1,49 @@ +/* + * Copyright 2026 LY Corporation + * + * LY Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.armeria.xds; + +import java.util.List; + +import com.google.common.collect.ImmutableList; + +import com.linecorp.armeria.common.annotation.UnstableApi; + +/** + * Base interface for xDS extension factories, resolved by the {@code XdsExtensionRegistry}. + * + *

Subtype-specific behavior (e.g. HTTP filter creation) lives on sub-interfaces + * such as {@link com.linecorp.armeria.xds.filter.HttpFilterFactory}. + */ +@UnstableApi +public interface XdsExtensionFactory { + + /** + * Returns the extension name for registry resolution (required, non-nullable). + * For example, {@code "envoy.filters.http.router"} or {@code "envoy.transport_sockets.tls"}. + */ + String name(); + + /** + * Returns the type URLs for registry resolution. + * For example, + * {@code List.of("type.googleapis.com/envoy.extensions.filters.http.router.v3.Router")}. + * Returns an empty list if this factory has no type-URL-based registration. + */ + default List typeUrls() { + return ImmutableList.of(); + } +} diff --git a/xds/src/main/java/com/linecorp/armeria/xds/XdsExtensionRegistry.java b/xds/src/main/java/com/linecorp/armeria/xds/XdsExtensionRegistry.java new file mode 100644 index 00000000000..2cadbdc9351 --- /dev/null +++ b/xds/src/main/java/com/linecorp/armeria/xds/XdsExtensionRegistry.java @@ -0,0 +1,154 @@ +/* + * Copyright 2025 LY Corporation + * + * LY Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.armeria.xds; + +import java.util.Map; +import java.util.ServiceLoader; + +import com.google.common.collect.ImmutableMap; +import com.google.protobuf.Any; +import com.google.protobuf.Message; + +import com.linecorp.armeria.common.annotation.Nullable; +import com.linecorp.armeria.xds.filter.HttpFilterFactory; + +/** + * A dual-key registry for {@link XdsExtensionFactory} instances. + * Factories are resolved by type URL (primary) or extension name (fallback). + * + *

Also serves as the single entry point for {@link Any}-related operations: + * factory lookup ({@link #query}) and proto decode ({@link #unpack}). + */ +final class XdsExtensionRegistry { + + private final Map byTypeUrl; + private final Map byName; + private final XdsResourceValidator validator; + + private XdsExtensionRegistry(Map byTypeUrl, + Map byName, + XdsResourceValidator validator) { + this.byTypeUrl = byTypeUrl; + this.byName = byName; + this.validator = validator; + } + + static XdsExtensionRegistry of(XdsResourceValidator validator) { + final ImmutableMap.Builder byName = ImmutableMap.builder(); + final ImmutableMap.Builder byTypeUrl = ImmutableMap.builder(); + + // Load SPI-discovered HttpFilterFactory instances as base factories + ServiceLoader.load(HttpFilterFactory.class).forEach(factory -> { + register(factory, byName, byTypeUrl); + }); + + // Built-in network filter factories + register(HttpConnectionManagerFactory.INSTANCE, byName, byTypeUrl); + + // Built-in transport socket factories + register(UpstreamTlsTransportSocketFactory.INSTANCE, byName, byTypeUrl); + register(RawBufferTransportSocketFactory.INSTANCE, byName, byTypeUrl); + + return new XdsExtensionRegistry(byTypeUrl.build(), byName.build(), validator); + } + + private static void register(XdsExtensionFactory factory, + ImmutableMap.Builder byName, + ImmutableMap.Builder byTypeUrl) { + byName.put(factory.name(), factory); + for (String typeUrl : factory.typeUrls()) { + byTypeUrl.put(typeUrl, factory); + } + } + + XdsResourceValidator validator() { + return validator; + } + + /** + * Validates the given message using both pgv structural validation and supported-field + * validation. + */ + void assertValid(Object message) { + validator.assertValid(message); + } + + /** + * Unpacks an {@link Any} into the expected proto type using the validator. + */ + T unpack(Any any, Class expectedType) { + return validator.unpack(any, expectedType); + } + + /** + * Looks up a factory by typeUrl and validates it implements the expected type. + * Returns {@code null} if no factory is registered. + * + * @throws IllegalArgumentException if the factory does not implement the expected interface + */ + @Nullable + T queryByTypeUrl(String typeUrl, Class expectedType) { + final XdsExtensionFactory factory = byTypeUrl.get(typeUrl); + if (factory == null) { + return null; + } + if (!expectedType.isInstance(factory)) { + throw new IllegalArgumentException( + "Factory for typeUrl '" + typeUrl + "' is " + factory.getClass().getName() + + ", expected " + expectedType.getName()); + } + return expectedType.cast(factory); + } + + /** + * Looks up a factory by name and validates it implements the expected type. + * Returns {@code null} if no factory is registered. + * + * @throws IllegalArgumentException if the factory does not implement the expected interface + */ + @Nullable + T queryByName(String name, Class expectedType) { + final XdsExtensionFactory factory = byName.get(name); + if (factory == null) { + return null; + } + if (!expectedType.isInstance(factory)) { + throw new IllegalArgumentException( + "Factory for name '" + name + "' is " + factory.getClass().getName() + + ", expected " + expectedType.getName()); + } + return expectedType.cast(factory); + } + + /** + * Resolves a factory by {@link Any}'s typeUrl first, then by name. + * Returns {@code null} if no factory is found. + * + * @throws IllegalArgumentException if a found factory does not implement the expected interface + */ + @Nullable + T query(Any any, @Nullable String name, Class expectedType) { + final T factory = queryByTypeUrl(any.getTypeUrl(), expectedType); + if (factory != null) { + return factory; + } + if (name != null) { + return queryByName(name, expectedType); + } + return null; + } +} diff --git a/xds/src/main/java/com/linecorp/armeria/xds/XdsResourceValidator.java b/xds/src/main/java/com/linecorp/armeria/xds/XdsResourceValidator.java new file mode 100644 index 00000000000..f290e510877 --- /dev/null +++ b/xds/src/main/java/com/linecorp/armeria/xds/XdsResourceValidator.java @@ -0,0 +1,91 @@ +/* + * Copyright 2026 LY Corporation + * + * LY Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.armeria.xds; + +import static java.util.Objects.requireNonNull; + +import java.util.Comparator; +import java.util.ServiceLoader; + +import com.google.common.collect.Streams; +import com.google.protobuf.Any; +import com.google.protobuf.InvalidProtocolBufferException; +import com.google.protobuf.Message; + +import com.linecorp.armeria.common.annotation.UnstableApi; +import com.linecorp.armeria.xds.validator.XdsValidatorIndex; + +/** + * Per-bootstrap validator that delegates to the highest-priority {@link XdsValidatorIndex} loaded via SPI. + * Created once in {@link XdsBootstrapBuilder#build()} and threaded through the entire resource pipeline. + * + *

Validation is performed at exactly two levels: + *

    + *
  • Static resources — {@link XdsBootstrapImpl} calls {@link #assertValid(Object)} once + * on the entire {@code Bootstrap} message at construction time. Both pgv and supported-field + * validators recurse into nested messages, so this single call covers all static clusters, + * listeners, secrets, and their sub-messages. Inline sub-resources (e.g. {@code VirtualHost} + * within a {@code RouteConfiguration}, {@code ClusterLoadAssignment} within a {@code Cluster}) + * are covered by parent validation and do not need separate calls.
  • + *
  • Dynamic resources — calls + * {@link #assertValid(Object)} on each top-level resource unpacked from a + * {@code DiscoveryResponse}. Validation failures are caught and reported as invalid + * resources (NACK'd back to the control plane).
  • + *
+ * + *

In addition, {@link #unpack(Any, Class)} is used for {@code google.protobuf.Any}-typed + * fields that cannot be validated by parent recursion (since {@code Any} is opaque to protobuf + * field traversal). + */ +@UnstableApi +public final class XdsResourceValidator { + + private static final XdsValidatorIndex spiValidator = + Streams.stream(ServiceLoader.load(XdsValidatorIndex.class, + XdsValidatorIndex.class.getClassLoader())) + .max(Comparator.comparingInt(XdsValidatorIndex::priority)) + .orElse(XdsValidatorIndex.noop()); + + XdsResourceValidator() { + } + + /** + * Validates the given message using the SPI-loaded {@link XdsValidatorIndex}. + */ + void assertValid(Object message) { + requireNonNull(message, "message"); + spiValidator.assertValid(message); + } + + /** + * Unpacks an {@link Any} message into the given class and validates the result. + * This is necessary for {@code Any}-typed fields because protobuf treats {@code Any} + * as an opaque blob — parent-level validation cannot recurse into it. + */ + public T unpack(Any message, Class clazz) { + requireNonNull(message, "message"); + requireNonNull(clazz, "clazz"); + final T unpacked; + try { + unpacked = message.unpack(clazz); + } catch (InvalidProtocolBufferException e) { + throw new IllegalArgumentException("Error unpacking: " + clazz.getName(), e); + } + assertValid(unpacked); + return unpacked; + } +} diff --git a/xds/src/main/java/com/linecorp/armeria/xds/XdsStreamSubscriber.java b/xds/src/main/java/com/linecorp/armeria/xds/XdsStreamSubscriber.java index 61b208e3117..487965a3349 100644 --- a/xds/src/main/java/com/linecorp/armeria/xds/XdsStreamSubscriber.java +++ b/xds/src/main/java/com/linecorp/armeria/xds/XdsStreamSubscriber.java @@ -53,7 +53,7 @@ class XdsStreamSubscriber implements SafeCloseable { restartTimer(); } - void restartTimer() { + private void restartTimer() { if (!enableAbsentOnTimeout) { return; } diff --git a/xds/src/main/java/com/linecorp/armeria/xds/XdsValidatorIndexRegistry.java b/xds/src/main/java/com/linecorp/armeria/xds/XdsValidatorIndexRegistry.java deleted file mode 100644 index f82221f9cc5..00000000000 --- a/xds/src/main/java/com/linecorp/armeria/xds/XdsValidatorIndexRegistry.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Copyright 2025 LY Corporation - * - * LY Corporation licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ - -package com.linecorp.armeria.xds; - -import java.util.Comparator; -import java.util.ServiceLoader; - -import com.google.common.collect.Streams; -import com.google.protobuf.Any; -import com.google.protobuf.InvalidProtocolBufferException; -import com.google.protobuf.Message; - -import com.linecorp.armeria.xds.validator.XdsValidatorIndex; - -final class XdsValidatorIndexRegistry { - - private static final XdsValidatorIndex xdsValidatorIndex = - Streams.stream(ServiceLoader.load(XdsValidatorIndex.class, - XdsValidatorIndex.class.getClassLoader())) - .max(Comparator.comparingInt(XdsValidatorIndex::priority)) - .orElse(XdsValidatorIndex.noop()); - - static void assertValid(Object message) { - xdsValidatorIndex.assertValid(message); - } - - static T unpack(Any message, Class clazz) { - final T unpacked; - try { - unpacked = message.unpack(clazz); - } catch (InvalidProtocolBufferException e) { - throw new IllegalArgumentException("Error unpacking: " + clazz.getName(), e); - } - xdsValidatorIndex.assertValid(unpacked); - return unpacked; - } - - private XdsValidatorIndexRegistry() { - } -} diff --git a/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/RouteConfig.java b/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/RouteConfig.java index 3c8177c7d0f..a17813eacdc 100644 --- a/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/RouteConfig.java +++ b/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/RouteConfig.java @@ -16,28 +16,19 @@ package com.linecorp.armeria.xds.client.endpoint; -import com.linecorp.armeria.client.HttpPreClient; import com.linecorp.armeria.client.PreClientRequestContext; -import com.linecorp.armeria.client.RpcPreClient; import com.linecorp.armeria.common.annotation.Nullable; import com.linecorp.armeria.xds.ListenerSnapshot; import com.linecorp.armeria.xds.RouteEntry; import com.linecorp.armeria.xds.RouteSnapshot; import com.linecorp.armeria.xds.VirtualHostSnapshot; -import com.linecorp.armeria.xds.internal.DelegatingHttpClient; -import com.linecorp.armeria.xds.internal.DelegatingRpcClient; final class RouteConfig { private final ListenerSnapshot listenerSnapshot; - - private final HttpPreClient httpPreClient; - private final RpcPreClient rpcPreClient; private final VirtualHostMatcher virtualHostMatcher; RouteConfig(ListenerSnapshot listenerSnapshot) { this.listenerSnapshot = listenerSnapshot; - httpPreClient = listenerSnapshot.downstreamFilter().decorate(DelegatingHttpClient.of()); - rpcPreClient = listenerSnapshot.downstreamFilter().rpcDecorate(DelegatingRpcClient.of()); virtualHostMatcher = new VirtualHostMatcher(listenerSnapshot); } @@ -45,14 +36,6 @@ ListenerSnapshot listenerSnapshot() { return listenerSnapshot; } - HttpPreClient httpPreClient() { - return httpPreClient; - } - - RpcPreClient rpcPreClient() { - return rpcPreClient; - } - @Nullable RouteEntry select(PreClientRequestContext ctx) { final RouteSnapshot routeSnapshot = listenerSnapshot.routeSnapshot(); diff --git a/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/RouterFilter.java b/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/RouterFilter.java index 301c72678e4..e305471458f 100644 --- a/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/RouterFilter.java +++ b/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/RouterFilter.java @@ -16,7 +16,7 @@ package com.linecorp.armeria.xds.client.endpoint; -import static com.linecorp.armeria.xds.client.endpoint.XdsAttributeKeys.ROUTE_CONFIG; +import static com.linecorp.armeria.xds.client.endpoint.XdsAttributeKeys.SELECTED_ROUTE; import com.linecorp.armeria.client.Endpoint; import com.linecorp.armeria.client.PreClient; @@ -35,23 +35,15 @@ final class RouterFilter implements Prepr @Override public O execute(PreClient delegate, PreClientRequestContext ctx, I req) throws Exception { - final RouteConfig routeConfig = ctx.attr(ROUTE_CONFIG); - if (routeConfig == null) { + final RouteEntry selectedRoute = ctx.attr(SELECTED_ROUTE); + if (selectedRoute == null) { final UnprocessedRequestException e = UnprocessedRequestException.of( new IllegalArgumentException( - "RouteConfig is not set for the ctx. If a new ctx has been used, " + + "SELECTED_ROUTE is not set for the ctx. If a new ctx has been used, " + "please make sure to use ctx.newDerivedContext().")); ctx.cancel(e); throw e; } - final RouteEntry selectedRoute = routeConfig.select(ctx); - if (selectedRoute == null) { - final UnprocessedRequestException e = UnprocessedRequestException.of( - new IllegalArgumentException("No route has been selected for listener '" + - routeConfig.listenerSnapshot() + '.')); - ctx.cancel(e); - throw e; - } final ClusterSnapshot clusterSnapshot = selectedRoute.clusterSnapshot(); if (clusterSnapshot == null) { final UnprocessedRequestException e = UnprocessedRequestException.of( diff --git a/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/RouterFilterFactory.java b/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/RouterFilterFactory.java index f4f3aa28694..4e7aa91023c 100644 --- a/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/RouterFilterFactory.java +++ b/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/RouterFilterFactory.java @@ -16,6 +16,9 @@ package com.linecorp.armeria.xds.client.endpoint; +import java.util.List; + +import com.google.common.collect.ImmutableList; import com.google.protobuf.Any; import com.linecorp.armeria.client.HttpPreprocessor; @@ -25,6 +28,7 @@ import com.linecorp.armeria.common.RpcRequest; import com.linecorp.armeria.common.RpcResponse; import com.linecorp.armeria.common.annotation.UnstableApi; +import com.linecorp.armeria.xds.XdsResourceValidator; import com.linecorp.armeria.xds.filter.HttpFilterFactory; import com.linecorp.armeria.xds.filter.XdsHttpFilter; @@ -38,10 +42,52 @@ public final class RouterFilterFactory implements HttpFilterFactory { private static final String NAME = "envoy.filters.http.router"; + private static final String TYPE_URL = + "type.googleapis.com/envoy.extensions.filters.http.router.v3.Router"; + private static final List TYPE_URLS = ImmutableList.of(TYPE_URL); private static final RouterFilter rpcFilter = new RouterFilter<>(); private static final RouterFilter httpFilter = new RouterFilter<>(); - private static final XdsHttpFilter ROUTER_FILTER = new XdsHttpFilter() { + @Override + public String name() { + return NAME; + } + + @Override + public List typeUrls() { + return TYPE_URLS; + } + + @Override + public XdsHttpFilter create(HttpFilter filter, Any config, XdsResourceValidator validator) { + final Router router; + if (config == Any.getDefaultInstance()) { + router = Router.getDefaultInstance(); + } else { + router = validator.unpack(config, Router.class); + } + return new RouterXdsHttpFilter(router); + } + + /** + * An {@link XdsHttpFilter} that holds the parsed {@link Router} config. + */ + @UnstableApi + public static final class RouterXdsHttpFilter implements XdsHttpFilter { + + private final Router router; + + RouterXdsHttpFilter(Router router) { + this.router = router; + } + + /** + * Returns the {@link Router} config. + */ + public Router router() { + return router; + } + @Override public HttpPreprocessor httpPreprocessor() { return httpFilter::execute; @@ -51,15 +97,5 @@ public HttpPreprocessor httpPreprocessor() { public RpcPreprocessor rpcPreprocessor() { return rpcFilter::execute; } - }; - - @Override - public String filterName() { - return NAME; - } - - @Override - public XdsHttpFilter create(HttpFilter filter, Any config) { - return ROUTER_FILTER; } } diff --git a/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/XdsAttributeKeys.java b/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/XdsAttributeKeys.java index 320924749c5..1124b5bb7fc 100644 --- a/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/XdsAttributeKeys.java +++ b/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/XdsAttributeKeys.java @@ -16,6 +16,8 @@ package com.linecorp.armeria.xds.client.endpoint; +import com.linecorp.armeria.xds.RouteEntry; + import io.envoyproxy.envoy.config.core.v3.Metadata; import io.envoyproxy.envoy.config.endpoint.v3.LbEndpoint; import io.envoyproxy.envoy.config.endpoint.v3.LocalityLbEndpoints; @@ -29,8 +31,8 @@ final class XdsAttributeKeys { AttributeKey.valueOf(XdsAttributeKeys.class, "LOCALITY_LB_ENDPOINTS_KEY"); static final AttributeKey XDS_RANDOM = AttributeKey.valueOf(XdsAttributeKeys.class, "XDS_RANDOM"); - static final AttributeKey ROUTE_CONFIG = - AttributeKey.valueOf(XdsAttributeKeys.class, "ROUTE_CONFIG"); + static final AttributeKey SELECTED_ROUTE = + AttributeKey.valueOf(XdsAttributeKeys.class, "SELECTED_ROUTE"); static final AttributeKey ROUTE_METADATA_MATCH = AttributeKey.valueOf(XdsAttributeKeys.class, "ROUTE_METADATA_MATCH"); diff --git a/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/XdsHttpPreprocessor.java b/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/XdsHttpPreprocessor.java index 220bee6bf35..9e0f551b027 100644 --- a/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/XdsHttpPreprocessor.java +++ b/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/XdsHttpPreprocessor.java @@ -21,9 +21,11 @@ import com.linecorp.armeria.client.HttpPreprocessor; import com.linecorp.armeria.client.PreClient; import com.linecorp.armeria.client.PreClientRequestContext; +import com.linecorp.armeria.client.UnprocessedRequestException; import com.linecorp.armeria.common.HttpRequest; import com.linecorp.armeria.common.HttpResponse; import com.linecorp.armeria.common.annotation.UnstableApi; +import com.linecorp.armeria.xds.RouteEntry; import com.linecorp.armeria.xds.XdsBootstrap; import com.linecorp.armeria.xds.internal.DelegatingHttpClient; @@ -60,7 +62,14 @@ private XdsHttpPreprocessor(String listenerName, XdsBootstrap xdsBootstrap) { @Override HttpResponse execute1(PreClient delegate, PreClientRequestContext ctx, HttpRequest req, RouteConfig routeConfig) throws Exception { + final RouteEntry selectedRoute = routeConfig.select(ctx); + if (selectedRoute == null) { + throw UnprocessedRequestException.of( + new IllegalArgumentException("No route for listener '" + + routeConfig.listenerSnapshot() + "'.")); + } + ctx.setAttr(XdsAttributeKeys.SELECTED_ROUTE, selectedRoute); DelegatingHttpClient.setDelegate(ctx, delegate); - return routeConfig.httpPreClient().execute(ctx, req); + return selectedRoute.httpPreClient().execute(ctx, req); } } diff --git a/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/XdsPreprocessor.java b/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/XdsPreprocessor.java index bc5ce85cacf..fb74743c24a 100644 --- a/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/XdsPreprocessor.java +++ b/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/XdsPreprocessor.java @@ -16,8 +16,6 @@ package com.linecorp.armeria.xds.client.endpoint; -import static com.linecorp.armeria.xds.client.endpoint.XdsAttributeKeys.ROUTE_CONFIG; - import java.util.concurrent.CompletableFuture; import java.util.function.Function; @@ -77,7 +75,6 @@ private O execute0(PreClient delegate, PreClientRequestContext ctx, I req, throw UnprocessedRequestException.of( new TimeoutException("Couldn't select a snapshot for listener '" + listenerName + "'.")); } - ctx.setAttr(ROUTE_CONFIG, routeConfig); return execute1(delegate, ctx, req, routeConfig); } diff --git a/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/XdsRpcPreprocessor.java b/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/XdsRpcPreprocessor.java index 1921377093e..4d170f97b69 100644 --- a/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/XdsRpcPreprocessor.java +++ b/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/XdsRpcPreprocessor.java @@ -21,9 +21,11 @@ import com.linecorp.armeria.client.PreClient; import com.linecorp.armeria.client.PreClientRequestContext; import com.linecorp.armeria.client.RpcPreprocessor; +import com.linecorp.armeria.client.UnprocessedRequestException; import com.linecorp.armeria.common.RpcRequest; import com.linecorp.armeria.common.RpcResponse; import com.linecorp.armeria.common.annotation.UnstableApi; +import com.linecorp.armeria.xds.RouteEntry; import com.linecorp.armeria.xds.XdsBootstrap; import com.linecorp.armeria.xds.internal.DelegatingRpcClient; @@ -60,7 +62,14 @@ private XdsRpcPreprocessor(String listenerName, XdsBootstrap xdsBootstrap) { @Override RpcResponse execute1(PreClient delegate, PreClientRequestContext ctx, RpcRequest req, RouteConfig routeConfig) throws Exception { + final RouteEntry selectedRoute = routeConfig.select(ctx); + if (selectedRoute == null) { + throw UnprocessedRequestException.of( + new IllegalArgumentException("No route for listener '" + + routeConfig.listenerSnapshot() + "'.")); + } + ctx.setAttr(XdsAttributeKeys.SELECTED_ROUTE, selectedRoute); DelegatingRpcClient.setDelegate(ctx, delegate); - return routeConfig.rpcPreClient().execute(ctx, req); + return selectedRoute.rpcPreClient().execute(ctx, req); } } diff --git a/xds/src/main/java/com/linecorp/armeria/xds/filter/HttpFilterFactory.java b/xds/src/main/java/com/linecorp/armeria/xds/filter/HttpFilterFactory.java index f513a737845..ffc0328d87d 100644 --- a/xds/src/main/java/com/linecorp/armeria/xds/filter/HttpFilterFactory.java +++ b/xds/src/main/java/com/linecorp/armeria/xds/filter/HttpFilterFactory.java @@ -20,6 +20,8 @@ import com.linecorp.armeria.common.annotation.Nullable; import com.linecorp.armeria.common.annotation.UnstableApi; +import com.linecorp.armeria.xds.XdsExtensionFactory; +import com.linecorp.armeria.xds.XdsResourceValidator; import io.envoyproxy.envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager; import io.envoyproxy.envoy.extensions.filters.network.http_connection_manager.v3.HttpFilter; @@ -33,12 +35,7 @@ * Returning {@code null} from {@link #create} causes the filter to be silently skipped. */ @UnstableApi -public interface HttpFilterFactory { - - /** - * The filter name that should be equivalent to {@link HttpFilter#getName()}. - */ - String filterName(); +public interface HttpFilterFactory extends XdsExtensionFactory { /** * Creates an {@link XdsHttpFilter} for the given filter and its raw typed config. @@ -54,7 +51,8 @@ public interface HttpFilterFactory { * @param httpFilter the filter descriptor from {@link HttpConnectionManager#getHttpFiltersList()} * @param config the raw typed config {@link Any}; may be {@link Any#getDefaultInstance()} * if no config was provided + * @param validator the {@link XdsResourceValidator} for validating and unpacking {@link Any} protos */ @Nullable - XdsHttpFilter create(HttpFilter httpFilter, Any config); + XdsHttpFilter create(HttpFilter httpFilter, Any config, XdsResourceValidator validator); } diff --git a/xds/src/main/java/com/linecorp/armeria/xds/filter/HttpFilterFactoryRegistry.java b/xds/src/main/java/com/linecorp/armeria/xds/filter/HttpFilterFactoryRegistry.java deleted file mode 100644 index bc8371044fa..00000000000 --- a/xds/src/main/java/com/linecorp/armeria/xds/filter/HttpFilterFactoryRegistry.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Copyright 2025 LY Corporation - * - * LY Corporation licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ - -package com.linecorp.armeria.xds.filter; - -import java.util.Map; -import java.util.ServiceLoader; - -import com.google.common.collect.ImmutableMap; - -import com.linecorp.armeria.common.annotation.Nullable; -import com.linecorp.armeria.common.annotation.UnstableApi; - -import io.envoyproxy.envoy.extensions.filters.network.http_connection_manager.v3.HttpFilter; - -/** - * A registry for {@link HttpFilterFactory} implementations. - */ -@UnstableApi -public final class HttpFilterFactoryRegistry { - - private static final Map factories; - - static { - final ImmutableMap.Builder factoriesBuilder = ImmutableMap.builder(); - ServiceLoader.load(HttpFilterFactory.class).forEach(factory -> { - factoriesBuilder.put(factory.filterName(), factory); - }); - factories = factoriesBuilder.build(); - } - - /** - * Returns the registered {@link HttpFilterFactory}. - * - * @param name the name of the filter represented by {@link HttpFilter#getName()} - */ - @Nullable - public static HttpFilterFactory filterFactory(String name) { - return factories.get(name); - } - - private HttpFilterFactoryRegistry() { - } -} diff --git a/xds/src/test/java/com/linecorp/armeria/xds/StateCoordinatorTest.java b/xds/src/test/java/com/linecorp/armeria/xds/StateCoordinatorTest.java index ab359407203..7dd4127813e 100644 --- a/xds/src/test/java/com/linecorp/armeria/xds/StateCoordinatorTest.java +++ b/xds/src/test/java/com/linecorp/armeria/xds/StateCoordinatorTest.java @@ -37,7 +37,9 @@ class StateCoordinatorTest { @Test void lateSubscriberReceivesCachedResource() { - final StateCoordinator coordinator = new StateCoordinator(eventLoop.get(), 15_000, false); + final XdsExtensionRegistry extensionRegistry = XdsExtensionRegistry.of(new XdsResourceValidator()); + final StateCoordinator coordinator = new StateCoordinator(eventLoop.get(), 15_000, false, + extensionRegistry); final ClusterXdsResource resource = new ClusterXdsResource(createCluster(CLUSTER_NAME), "1").withRevision(1); coordinator.onResourceUpdated(XdsType.CLUSTER, CLUSTER_NAME, resource); @@ -51,7 +53,9 @@ void lateSubscriberReceivesCachedResource() { @Test void missingResourceNotCachedAfterRemoval() { - final StateCoordinator coordinator = new StateCoordinator(eventLoop.get(), 15_000, false); + final XdsExtensionRegistry extensionRegistry = XdsExtensionRegistry.of(new XdsResourceValidator()); + final StateCoordinator coordinator = new StateCoordinator(eventLoop.get(), 15_000, false, + extensionRegistry); final CapturingWatcher watcher1 = new CapturingWatcher(); coordinator.register(XdsType.CLUSTER, CLUSTER_NAME, watcher1); @@ -70,7 +74,9 @@ void missingResourceNotCachedAfterRemoval() { @Test void stateRetainedAfterUnsubscribe() { - final StateCoordinator coordinator = new StateCoordinator(eventLoop.get(), 15_000, false); + final XdsExtensionRegistry extensionRegistry = XdsExtensionRegistry.of(new XdsResourceValidator()); + final StateCoordinator coordinator = new StateCoordinator(eventLoop.get(), 15_000, false, + extensionRegistry); final RouteXdsResource resource = new RouteXdsResource(RouteConfiguration.newBuilder().setName(ROUTE_NAME).build(), "1") .withRevision(1); diff --git a/xds/src/test/java/com/linecorp/armeria/xds/SubscriberStorageTest.java b/xds/src/test/java/com/linecorp/armeria/xds/SubscriberStorageTest.java index 694548405fc..6fbe68aa93c 100644 --- a/xds/src/test/java/com/linecorp/armeria/xds/SubscriberStorageTest.java +++ b/xds/src/test/java/com/linecorp/armeria/xds/SubscriberStorageTest.java @@ -66,13 +66,15 @@ void nonClusterListenerTimeout() { storage.register(XdsType.ROUTE, ROUTE_NAME, watcher); await().atMost(1, TimeUnit.SECONDS) - .untilAsserted(() -> assertThat(watcher.missingType).isEqualTo(XdsType.ROUTE)); - assertThat(watcher.missingName).isEqualTo(ROUTE_NAME); + .untilAsserted(() -> { + assertThat(watcher.missingType).isEqualTo(XdsType.ROUTE); + assertThat(watcher.missingName).isEqualTo(ROUTE_NAME); + }); } private static final class CapturingWatcher implements ResourceWatcher { - private XdsType missingType; - private String missingName; + private volatile XdsType missingType; + private volatile String missingName; @Override public void onChanged(XdsResource update) {} diff --git a/xds/src/test/java/com/linecorp/armeria/xds/XdsExtensionRegistryTest.java b/xds/src/test/java/com/linecorp/armeria/xds/XdsExtensionRegistryTest.java new file mode 100644 index 00000000000..162d131e9bf --- /dev/null +++ b/xds/src/test/java/com/linecorp/armeria/xds/XdsExtensionRegistryTest.java @@ -0,0 +1,103 @@ +/* + * Copyright 2025 LY Corporation + * + * LY Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.armeria.xds; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import org.junit.jupiter.api.Test; + +import com.google.protobuf.Any; +import com.google.protobuf.Duration; + +import com.linecorp.armeria.xds.filter.HttpFilterFactory; + +class XdsExtensionRegistryTest { + + private static final XdsResourceValidator VALIDATOR = new XdsResourceValidator(); + + private static XdsExtensionRegistry createRegistry() { + return XdsExtensionRegistry.of(VALIDATOR); + } + + @Test + void queryWithTypeMismatch() { + // HttpConnectionManagerFactory is registered by default and is not an HttpFilterFactory + final XdsExtensionRegistry registry = createRegistry(); + assertThatThrownBy(() -> registry.queryByTypeUrl( + "type.googleapis.com/envoy.extensions.filters.network" + + ".http_connection_manager.v3.HttpConnectionManager", + HttpFilterFactory.class)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("expected"); + } + + @Test + void spiFactoriesLoadedByDefault() { + // SPI should load RouterFilterFactory + final XdsExtensionRegistry registry = createRegistry(); + final HttpFilterFactory resolved = registry.queryByName( + "envoy.filters.http.router", HttpFilterFactory.class); + assertThat(resolved).isNotNull(); + assertThat(resolved).isInstanceOf(HttpFilterFactory.class); + } + + @Test + void emptyRegistryReturnsNull() { + final XdsExtensionRegistry registry = createRegistry(); + assertThat(registry.queryByName("nonexistent.filter", HttpFilterFactory.class)).isNull(); + assertThat(registry.queryByTypeUrl("type.googleapis.com/nonexistent", + HttpFilterFactory.class)).isNull(); + } + + @Test + void assertValidDelegatesToValidator() { + final XdsExtensionRegistry registry = createRegistry(); + // Should not throw for a valid message + final Duration valid = Duration.newBuilder().setSeconds(42).build(); + registry.assertValid(valid); + } + + @Test + void unpackDelegatesToValidator() { + final XdsExtensionRegistry registry = createRegistry(); + final Duration original = Duration.newBuilder().setSeconds(42).build(); + final Any packed = Any.pack(original); + final Duration unpacked = registry.unpack(packed, Duration.class); + assertThat(unpacked).isEqualTo(original); + } + + @Test + void queryPreferTypeUrl() { + final XdsExtensionRegistry registry = createRegistry(); + // RouterFilterFactory is registered by both name and type URL via SPI + final String routerTypeUrl = + "type.googleapis.com/envoy.extensions.filters.http.router.v3.Router"; + final Any any = Any.newBuilder().setTypeUrl(routerTypeUrl).build(); + + // query() checks type URL first + assertThat(registry.query(any, "envoy.filters.http.router", HttpFilterFactory.class)) + .isNotNull(); + // falls back to name when type URL doesn't match + final Any unknownAny = Any.newBuilder().setTypeUrl("unknown").build(); + assertThat(registry.query(unknownAny, "envoy.filters.http.router", + HttpFilterFactory.class)) + .isNotNull(); + // returns null when neither matches + assertThat(registry.query(unknownAny, "unknown", HttpFilterFactory.class)).isNull(); + } +} diff --git a/xds/src/test/java/com/linecorp/armeria/xds/XdsResourceValidatorTest.java b/xds/src/test/java/com/linecorp/armeria/xds/XdsResourceValidatorTest.java new file mode 100644 index 00000000000..e2907ae8161 --- /dev/null +++ b/xds/src/test/java/com/linecorp/armeria/xds/XdsResourceValidatorTest.java @@ -0,0 +1,71 @@ +/* + * Copyright 2025 LY Corporation + * + * LY Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.armeria.xds; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import org.junit.jupiter.api.Test; + +import com.google.protobuf.Any; + +import io.envoyproxy.envoy.config.cluster.v3.Cluster; +import io.envoyproxy.envoy.config.route.v3.VirtualHost; +import io.envoyproxy.pgv.ValidationException; + +class XdsResourceValidatorTest { + + @Test + void pgvValidationIsEnabled() { + final XdsResourceValidator validator = new XdsResourceValidator(); + final VirtualHost virtualHost = VirtualHost.getDefaultInstance(); + assertThatThrownBy(() -> validator.assertValid(virtualHost)) + .isInstanceOf(IllegalArgumentException.class) + .cause() + .isInstanceOf(ValidationException.class) + .hasMessageContaining("length must be at least 1 but got: 0"); + } + + @Test + void supportedFieldValidationRuns() { + // SPI loads DefaultXdsValidatorIndex which includes supported-field validation + final XdsResourceValidator validator = new XdsResourceValidator(); + // Cluster with name set (passes pgv) — should not throw + final Cluster cluster = Cluster.newBuilder().setName("test").build(); + validator.assertValid(cluster); + } + + @Test + void unpackValidatesUnpackedMessage() { + final XdsResourceValidator validator = new XdsResourceValidator(); + // pack a valid cluster + final Cluster cluster = Cluster.newBuilder().setName("test").build(); + final Any packed = Any.pack(cluster); + final Cluster unpacked = validator.unpack(packed, Cluster.class); + assertThat(unpacked.getName()).isEqualTo("test"); + } + + @Test + void unpackFailsOnInvalidMessage() { + final XdsResourceValidator validator = new XdsResourceValidator(); + // Pack a default VirtualHost (will fail pgv) + final VirtualHost vhost = VirtualHost.getDefaultInstance(); + final Any packed = Any.pack(vhost); + assertThatThrownBy(() -> validator.unpack(packed, VirtualHost.class)) + .isInstanceOf(IllegalArgumentException.class); + } +} diff --git a/xds/src/test/java/com/linecorp/armeria/xds/XdsValidatorIndexRegistryTest.java b/xds/src/test/java/com/linecorp/armeria/xds/XdsValidatorIndexRegistryTest.java deleted file mode 100644 index 4fce41eed71..00000000000 --- a/xds/src/test/java/com/linecorp/armeria/xds/XdsValidatorIndexRegistryTest.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Copyright 2025 LY Corporation - * - * LY Corporation licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ - -package com.linecorp.armeria.xds; - -import static org.assertj.core.api.Assertions.assertThatThrownBy; - -import org.junit.jupiter.api.Test; - -import io.envoyproxy.envoy.config.route.v3.VirtualHost; -import io.envoyproxy.pgv.ValidationException; - -class XdsValidatorIndexRegistryTest { - - @Test - void validationIsEnabled() throws Exception { - final VirtualHost virtualHost = VirtualHost.getDefaultInstance(); - assertThatThrownBy(() -> XdsValidatorIndexRegistry.assertValid(virtualHost)) - .isInstanceOf(IllegalArgumentException.class) - .cause() - .isInstanceOf(ValidationException.class) - .hasMessageContaining("length must be at least 1 but got: 0"); - } -}