Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ static Stream<Arguments> testCases() {
port_value: 8081
""",
IllegalArgumentException.class,
"No route has been selected for listener "
"No route for listener "
),
Arguments.of(
//language=YAML
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ void noMatchTest() {
.isInstanceOf(UnprocessedRequestException.class)
.cause()
.isInstanceOf(IllegalArgumentException.class)
.hasMessageStartingWith("No route has been selected for listener");
.hasMessageStartingWith("No route for listener");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package com.linecorp.armeria.xds;

import io.envoyproxy.envoy.config.cluster.v3.Cluster;
import io.envoyproxy.envoy.config.cluster.v3.Cluster.EdsClusterConfig;

final class ClusterResourceParser extends ResourceParser<Cluster, ClusterXdsResource> {

Expand All @@ -26,13 +25,8 @@ final class ClusterResourceParser extends ResourceParser<Cluster, ClusterXdsReso
private ClusterResourceParser() {}

@Override
ClusterXdsResource parse(Cluster cluster, String version) {
final ClusterXdsResource resource = new ClusterXdsResource(cluster, version);
if (cluster.hasEdsClusterConfig()) {
final EdsClusterConfig eds = cluster.getEdsClusterConfig();
XdsConverterUtil.validateConfigSource(eds.getEdsConfig());
}
return resource;
ClusterXdsResource parse(Cluster cluster, XdsExtensionRegistry registry, String version) {
return new ClusterXdsResource(cluster, version);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,9 @@

package com.linecorp.armeria.xds;

import static com.google.common.base.Preconditions.checkArgument;

import com.linecorp.armeria.common.annotation.Nullable;
import com.linecorp.armeria.common.annotation.UnstableApi;

import io.envoyproxy.envoy.config.cluster.v3.Cluster;
import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.UpstreamTlsContext;

/**
* A resource object for a {@link Cluster}.
Expand All @@ -31,8 +27,6 @@
public final class ClusterXdsResource extends AbstractXdsResource {

private final Cluster cluster;
@Nullable
UpstreamTlsContext upstreamTlsContext;

ClusterXdsResource(Cluster cluster) {
this(cluster, "");
Expand All @@ -44,33 +38,7 @@ public final class ClusterXdsResource extends AbstractXdsResource {

private ClusterXdsResource(Cluster cluster, String version, long revision) {
super(version, revision);
XdsValidatorIndexRegistry.assertValid(cluster);
this.cluster = cluster;
upstreamTlsContext = upstreamTlsContext(cluster);
}

@Nullable
private static UpstreamTlsContext upstreamTlsContext(Cluster cluster) {
if (cluster.hasTransportSocket()) {
final String transportSocketName = cluster.getTransportSocket().getName();
checkArgument("envoy.transport_sockets.tls".equals(transportSocketName),
"Unexpected tls transport socket name '%s'", transportSocketName);
if (!cluster.getTransportSocket().hasTypedConfig()) {
return UpstreamTlsContext.getDefaultInstance();
}
return XdsValidatorIndexRegistry.unpack(cluster.getTransportSocket().getTypedConfig(),
UpstreamTlsContext.class);
}
return null;
}

/**
* The upstream TLS context extracted from {@link Cluster#getTransportSocket()}.
*/
@Nullable
@UnstableApi
public UpstreamTlsContext upstreamTlsContext() {
return upstreamTlsContext;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ final class ConfigSourceClient implements SafeCloseable {
EventExecutor eventLoop,
Node node, BootstrapClusters bootstrapClusters,
ConfigSourceMapper configSourceMapper, MeterRegistry meterRegistry,
MeterIdPrefix meterIdPrefix) {
MeterIdPrefix meterIdPrefix,
XdsExtensionRegistry extensionRegistry) {
final ApiConfigSource apiConfigSource;
if (configSource.hasAds()) {
apiConfigSource = configSourceMapper.bootstrapAdsConfig();
Expand Down Expand Up @@ -87,7 +88,8 @@ final class ConfigSourceClient implements SafeCloseable {
apiType == ApiType.AGGREGATED_DELTA_GRPC;

final long fetchTimeoutMillis = initialFetchTimeoutMillis(configSource);
stateCoordinator = new StateCoordinator(eventLoop, fetchTimeoutMillis, isDelta);
stateCoordinator = new StateCoordinator(eventLoop, fetchTimeoutMillis, isDelta,
extensionRegistry);
final Backoff backoff = Backoff.ofDefault();
if (isAds) {
final ConfigSourceLifecycleObserver lifecycleObserver = metersFunction.apply("ads");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,19 +37,22 @@ final class ControlPlaneClientManager implements SafeCloseable {
private final ConfigSourceMapper configSourceMapper;
private final MeterRegistry meterRegistry;
private final MeterIdPrefix meterIdPrefix;
private final XdsExtensionRegistry extensionRegistry;
private final Map<ConfigSource, ConfigSourceClient> clientMap = new HashMap<>();
private boolean closed;

ControlPlaneClientManager(Bootstrap bootstrap, EventExecutor eventLoop,
BootstrapClusters bootstrapClusters,
ConfigSourceMapper configSourceMapper,
MeterRegistry meterRegistry, MeterIdPrefix meterIdPrefix) {
MeterRegistry meterRegistry, MeterIdPrefix meterIdPrefix,
XdsExtensionRegistry extensionRegistry) {
bootstrapNode = bootstrap.getNode();
this.eventLoop = eventLoop;
this.bootstrapClusters = bootstrapClusters;
this.configSourceMapper = configSourceMapper;
this.meterRegistry = meterRegistry;
this.meterIdPrefix = meterIdPrefix;
this.extensionRegistry = extensionRegistry;
}

void subscribe(ResourceNode<?> node) {
Expand All @@ -65,7 +68,7 @@ void subscribe(ResourceNode<?> node) {
final ConfigSourceClient client = clientMap.computeIfAbsent(
configSource, ignored -> new ConfigSourceClient(
configSource, eventLoop, bootstrapNode, bootstrapClusters, configSourceMapper,
meterRegistry, meterIdPrefix));
meterRegistry, meterIdPrefix, extensionRegistry));
client.addSubscriber(type, name, node);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,14 @@ final class DefaultSubscriptionContext implements SubscriptionContext {
private final DirectoryWatchService watchService;
private final BootstrapSecrets bootstrapSecrets;
private final ResourceNodeMeterBinderFactory meterBinderFactory;
private final XdsExtensionRegistry extensionRegistry;

DefaultSubscriptionContext(EventExecutor eventLoop, XdsClusterManager clusterManager,
ConfigSourceMapper configSourceMapper,
ControlPlaneClientManager controlPlaneClientManager,
MeterRegistry meterRegistry, MeterIdPrefix meterIdPrefix,
DirectoryWatchService watchService, BootstrapSecrets bootstrapSecrets) {
DirectoryWatchService watchService, BootstrapSecrets bootstrapSecrets,
XdsExtensionRegistry extensionRegistry) {
this.eventLoop = eventLoop;
this.clusterManager = clusterManager;
this.configSourceMapper = configSourceMapper;
Expand All @@ -47,6 +49,7 @@ final class DefaultSubscriptionContext implements SubscriptionContext {
this.meterIdPrefix = meterIdPrefix;
this.watchService = watchService;
this.bootstrapSecrets = bootstrapSecrets;
this.extensionRegistry = extensionRegistry;
meterBinderFactory = new ResourceNodeMeterBinderFactory(meterRegistry, meterIdPrefix);
}

Expand Down Expand Up @@ -99,4 +102,9 @@ public BootstrapSecrets bootstrapSecrets() {
public ResourceNodeMeterBinderFactory meterBinderFactory() {
return meterBinderFactory;
}

@Override
public XdsExtensionRegistry extensionRegistry() {
return extensionRegistry;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.slf4j.LoggerFactory;

import com.google.common.collect.ImmutableSet;
import com.google.protobuf.Message;
import com.google.rpc.Code;
import com.google.rpc.Status;

Expand Down Expand Up @@ -248,11 +247,12 @@ public void onCompleted() {
owner.retryOrClose(false);
}

private <I extends Message, O extends XdsResource> void handleResponse(
ResourceParser<I, O> resourceParser, DeltaDiscoveryResponse response) {
private void handleResponse(ResourceParser<?, ?> resourceParser, DeltaDiscoveryResponse response) {
final XdsType type = resourceParser.type();
final List<Resource> deltaResources = response.getResourcesList();
final ParsedResourcesHolder holder = resourceParser.parseDeltaResources(deltaResources);
final ParsedResourcesHolder holder =
resourceParser.parseDeltaResources(deltaResources,
stateCoordinator.extensionRegistry());

if (!holder.errors().isEmpty()) {
holder.invalidResources().forEach((name, error) ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ final class EndpointResourceParser extends ResourceParser<ClusterLoadAssignment,
private EndpointResourceParser() {}

@Override
EndpointXdsResource parse(ClusterLoadAssignment message, String version) {
EndpointXdsResource parse(ClusterLoadAssignment message, XdsExtensionRegistry registry,
String version) {
return new EndpointXdsResource(message, version);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ public final class EndpointXdsResource extends AbstractXdsResource {

private EndpointXdsResource(ClusterLoadAssignment clusterLoadAssignment, String version, long revision) {
super(version, revision);
XdsValidatorIndexRegistry.assertValid(clusterLoadAssignment);
this.clusterLoadAssignment = clusterLoadAssignment;
}

Expand Down
31 changes: 16 additions & 15 deletions xds/src/main/java/com/linecorp/armeria/xds/FilterUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,9 @@
import com.linecorp.armeria.client.ClientPreprocessorsBuilder;
import com.linecorp.armeria.common.annotation.Nullable;
import com.linecorp.armeria.xds.filter.HttpFilterFactory;
import com.linecorp.armeria.xds.filter.HttpFilterFactoryRegistry;
import com.linecorp.armeria.xds.filter.XdsHttpFilter;

import io.envoyproxy.envoy.config.route.v3.RetryPolicy;
import io.envoyproxy.envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager;
import io.envoyproxy.envoy.extensions.filters.network.http_connection_manager.v3.HttpFilter;
import io.envoyproxy.envoy.extensions.filters.network.http_connection_manager.v3.HttpFilter.ConfigTypeCase;

Expand All @@ -50,15 +48,16 @@ static Map<String, Any> mergeFilterConfigs(
}

static ClientPreprocessors buildDownstreamFilter(
@Nullable HttpConnectionManager connectionManager) {
if (connectionManager == null) {
XdsExtensionRegistry extensionRegistry,
List<HttpFilter> httpFilters, Map<String, Any> filterConfigs) {
if (httpFilters.isEmpty()) {
return ClientPreprocessors.of();
}
final List<HttpFilter> httpFilters = connectionManager.getHttpFiltersList();
final ClientPreprocessorsBuilder builder = ClientPreprocessors.builder();
for (int i = httpFilters.size() - 1; i >= 0; i--) {
final HttpFilter httpFilter = httpFilters.get(i);
final XdsHttpFilter instance = resolveInstance(httpFilter, null);
final Any perRouteConfig = filterConfigs.get(httpFilter.getName());
final XdsHttpFilter instance = resolveInstance(extensionRegistry, httpFilter, perRouteConfig);
if (instance == null) {
continue;
}
Expand All @@ -69,13 +68,14 @@ static ClientPreprocessors buildDownstreamFilter(
}

static ClientDecoration buildUpstreamFilter(
XdsExtensionRegistry extensionRegistry,
List<HttpFilter> httpFilters, Map<String, Any> filterConfigs,
@Nullable RetryPolicy retryPolicy) {
final ClientDecorationBuilder builder = ClientDecoration.builder();
for (int i = httpFilters.size() - 1; i >= 0; i--) {
final HttpFilter httpFilter = httpFilters.get(i);
final Any perRouteConfig = filterConfigs.get(httpFilter.getName());
final XdsHttpFilter instance = resolveInstance(httpFilter, perRouteConfig);
final XdsHttpFilter instance = resolveInstance(extensionRegistry, httpFilter, perRouteConfig);
if (instance == null) {
continue;
}
Expand All @@ -90,15 +90,18 @@ static ClientDecoration buildUpstreamFilter(
}

@Nullable
private static XdsHttpFilter resolveInstance(
static XdsHttpFilter resolveInstance(
XdsExtensionRegistry extensionRegistry,
HttpFilter httpFilter, @Nullable Any perRouteConfig) {
final HttpFilterFactory filterFactory =
HttpFilterFactoryRegistry.filterFactory(httpFilter.getName());
if (filterFactory == null) {
final Any defaultConfig = httpFilter.getTypedConfig();
final Any filterConfig = perRouteConfig != null ? perRouteConfig : defaultConfig;
final HttpFilterFactory factory = extensionRegistry.query(
filterConfig, httpFilter.getName(), HttpFilterFactory.class);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

if (factory == null) {
if (!httpFilter.getIsOptional()) {
throw new IllegalArgumentException(
"Unknown HTTP filter '" + httpFilter.getName() +
"': no HttpFilterFactory registered. Register an SPI " +
"': no HttpFilterFactory registered. Register an " +
"HttpFilterFactory implementation to handle this filter.");
}
return null;
Expand All @@ -107,9 +110,7 @@ private static XdsHttpFilter resolveInstance(
httpFilter.getConfigTypeCase() == ConfigTypeCase.CONFIGTYPE_NOT_SET,
"Only 'typed_config' is supported, but '%s' was supplied",
httpFilter.getConfigTypeCase());
final Any effectiveConfig =
perRouteConfig != null ? perRouteConfig : httpFilter.getTypedConfig();
return filterFactory.create(httpFilter, effectiveConfig);
return factory.create(httpFilter, filterConfig, extensionRegistry.validator());
}

private FilterUtil() {}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright 2026 LY Corporation
*
* LY Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.linecorp.armeria.xds;

import java.util.List;

import com.google.common.collect.ImmutableList;
import com.google.protobuf.Any;

import io.envoyproxy.envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager;

final class HttpConnectionManagerFactory implements XdsExtensionFactory {

static final HttpConnectionManagerFactory INSTANCE = new HttpConnectionManagerFactory();
private static final String NAME = "envoy.http_connection_manager";
private static final String TYPE_URL =
"type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3" +
".HttpConnectionManager";
private static final List<String> TYPE_URLS = ImmutableList.of(TYPE_URL);

private HttpConnectionManagerFactory() {}

@Override
public String name() {
return NAME;
}

@Override
public List<String> typeUrls() {
return TYPE_URLS;
}

HttpConnectionManager create(Any config, XdsResourceValidator validator) {
return validator.unpack(config, HttpConnectionManager.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,9 @@ private void initializeBootstrap(Bootstrap bootstrap, SubscriptionContext bootst
void register(Listener listener, SubscriptionContext context, SnapshotWatcher<Object> watcher) {
checkArgument(!nodes.containsKey(listener.getName()),
"Static listener with name '%s' already registered", listener.getName());
final ListenerStream node = new ListenerStream(new ListenerXdsResource(listener), context);
final ListenerXdsResource listenerResource =
ListenerResourceParser.INSTANCE.parse(listener, context.extensionRegistry(), "");
final ListenerStream node = new ListenerStream(listenerResource, context);
nodes.put(listener.getName(), node);
eventLoop.execute(safeRunnable(() -> {
final Subscription subscription = node.subscribe(watcher);
Expand Down
Loading
Loading