Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ protected Subscription onStart(SnapshotWatcher<ClusterSnapshot> watcher) {
return Subscription.noop();
}
return new ResourceNodeAdapter<ClusterXdsResource>(configSource, context, resourceName, CLUSTER)
.switchMap(resource -> resource2snapshot(resource, configSource))
.switchMapEager(resource -> resource2snapshot(resource, configSource))
.subscribe(watcher);
}

Expand All @@ -94,15 +94,15 @@ private SnapshotStream<ClusterSnapshot> resource2snapshot(ClusterXdsResource res
final SnapshotStream<Optional<XdsLoadBalancer>> lbStream =
SnapshotStream.combineLatest(endpointSnapshotStream, transportSocket, socketMatchesStream,
LoadBalancerInput::new)
.switchMap(input -> {
.switchMapEager(input -> {
if (!input.endpointSnapshot.isPresent()) {
return SnapshotStream.empty();
}
final EndpointSnapshot endpointSnapshot = input.endpointSnapshot.get();
if (context.clusterManager().hasLocalCluster() &&
!resourceName.equals(context.clusterManager().localClusterName())) {
return new LocalClusterStream(context.clusterManager())
.switchMap(localCluster -> {
.switchMapEager(localCluster -> {
return new LoadBalancerStream(
resource, endpointSnapshot,
input.transportSocket,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,6 @@ public <I extends Message, O extends XdsResource> void handleResponse(
final String errorDetails;
if (holder.errors().isEmpty()) {
sender.ackResponse(resourceParser.type(), response.getVersionInfo(), response.getNonce());
// The version was updated, so we always update the cache in case a late watcher subscribed.
// 1) A subscriber is added, and a stream is created.
// 2) Immediately, the subscriber is removed but a response later updates the version.
// At this step, subscribers cannot be notified as there are no watchers
// and the value isn't cached.
// 3) Afterward, a new subscriber is added - this subscriber doesn't see any cached values so the
// subscriber is in an indefinite waiting state until the version is incremented.
storage.updateCache(resourceParser.type(), holder.parsedResources());
} else {
errorDetails = errorMessageJoiner.join(holder.errors());
sender.nackResponse(resourceParser.type(), response.getNonce(), errorDetails);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ final class DefaultSubscriptionContext implements SubscriptionContext {
private final MeterIdPrefix meterIdPrefix;
private final DirectoryWatchService watchService;
private final BootstrapSecrets bootstrapSecrets;
private final ResourceNodeMeterBinderFactory meterBinderFactory;

DefaultSubscriptionContext(EventExecutor eventLoop, XdsClusterManager clusterManager,
ConfigSourceMapper configSourceMapper,
Expand All @@ -46,6 +47,7 @@ final class DefaultSubscriptionContext implements SubscriptionContext {
this.meterIdPrefix = meterIdPrefix;
this.watchService = watchService;
this.bootstrapSecrets = bootstrapSecrets;
meterBinderFactory = new ResourceNodeMeterBinderFactory(meterRegistry, meterIdPrefix);
}

@Override
Expand Down Expand Up @@ -92,4 +94,9 @@ public DirectoryWatchService watchService() {
public BootstrapSecrets bootstrapSecrets() {
return bootstrapSecrets;
}

@Override
public ResourceNodeMeterBinderFactory meterBinderFactory() {
return meterBinderFactory;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ protected Subscription onStart(SnapshotWatcher<ListenerSnapshot> watcher) {
.subscribe(watcher);
}
return new ResourceNodeAdapter<ListenerXdsResource>(configSource, context, resourceName, LISTENER)
.switchMap(resource -> resource2snapshot(resource, configSource))
.switchMapEager(resource -> resource2snapshot(resource, configSource))
.subscribe(watcher);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ final class ResourceNodeAdapter<T extends XdsResource> extends RefCountedStream<
private final SubscriptionContext context;
private final String name;
private final XdsType type;
private final ResourceNodeMeterBinder resourceNodeMeterBinder;
private final ResourceNodeMeterBinderFactory.ResourceNodeMeterBinder resourceNodeMeterBinder;

ResourceNodeAdapter(ConfigSource configSource,
SubscriptionContext context,
Expand All @@ -33,9 +33,7 @@ final class ResourceNodeAdapter<T extends XdsResource> extends RefCountedStream<
this.context = context;
this.name = name;
this.type = type;
resourceNodeMeterBinder = new ResourceNodeMeterBinder(context.meterRegistry(),
context.meterIdPrefix(),
type, name);
resourceNodeMeterBinder = context.meterBinderFactory().acquire(type, name);
}

@Override
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
/*
* Copyright 2026 LY Corporation
*
* LY Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.linecorp.armeria.xds;

import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

import com.linecorp.armeria.common.metric.MeterIdPrefix;

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.MeterRegistry;

class ResourceNodeMeterBinderFactory {

private final Map<Key, ResourceNodeMeterBinder> metersMap = new HashMap<>();
private final MeterRegistry meterRegistry;
private final MeterIdPrefix meterIdPrefix;

ResourceNodeMeterBinderFactory(MeterRegistry meterRegistry, MeterIdPrefix meterIdPrefix) {
this.meterRegistry = meterRegistry;
this.meterIdPrefix = meterIdPrefix;
}

ResourceNodeMeterBinder acquire(XdsType type, String resourceName) {
final Key key = new Key(type, resourceName);
return metersMap.compute(key, (ignored, existing) -> {
if (existing != null) {
existing.refCount.incrementAndGet();
return existing;
}
return new ResourceNodeMeterBinder(key, meterRegistry, meterIdPrefix);
});
}

/**
* Records metrics for each {@link ResourceNode}.
* This is not done at the user-exposed {@link SnapshotWatcher} level so that users can
* observe the internal state/lifecycle of {@link ResourceNode}s via metrics.
*/
final class ResourceNodeMeterBinder implements ResourceWatcher<XdsResource> {

private final Key key;
private boolean closed;
private final MeterRegistry meterRegistry;
private final AtomicLong updatedRevision = new AtomicLong();
private final AtomicInteger refCount = new AtomicInteger(1);
private final Gauge revisionGauge;
private final Counter errorCounter;
private final Counter missingCounter;

private ResourceNodeMeterBinder(Key key, MeterRegistry meterRegistry,
MeterIdPrefix meterIdPrefix) {
this.key = key;
this.meterRegistry = meterRegistry;
meterIdPrefix = meterIdPrefix.withTags("name", key.resourceName,
"type", key.type.name().toLowerCase(Locale.ROOT));
revisionGauge = Gauge.builder(meterIdPrefix.name("resource.node.revision"),
updatedRevision, AtomicLong::get)
.tags(meterIdPrefix.tags())
.register(meterRegistry);
errorCounter = Counter.builder(meterIdPrefix.name("resource.node.error"))
.tags(meterIdPrefix.tags())
.register(meterRegistry);
missingCounter = Counter.builder(meterIdPrefix.name("resource.node.missing"))
.tags(meterIdPrefix.tags())
.register(meterRegistry);
}

void close() {
if (closed) {
return;
}
if (refCount.decrementAndGet() == 0) {
closed = true;
meterRegistry.remove(revisionGauge);
meterRegistry.remove(errorCounter);
meterRegistry.remove(missingCounter);
metersMap.remove(key);
}
}

@Override
public void onError(XdsType type, String resourceName, Throwable t) {
errorCounter.increment();
}

@Override
public void onResourceDoesNotExist(XdsType type, String resourceName) {
missingCounter.increment();
}

@Override
public void onChanged(XdsResource update) {
updatedRevision.set(update.revision());
}
}

private static final class Key {
private final XdsType type;
private final String resourceName;

private Key(XdsType type, String resourceName) {
this.type = type;
this.resourceName = resourceName;
}

@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null || getClass() != obj.getClass()) {
return false;
}
final Key other = (Key) obj;
return type == other.type && resourceName.equals(other.resourceName);
}

@Override
public int hashCode() {
return Objects.hash(type, resourceName);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ protected Subscription onStart(SnapshotWatcher<RouteSnapshot> watcher) {
assert configSource != null;
final SnapshotStream<RouteSnapshot> snapshotStream = new ResourceNodeAdapter<RouteXdsResource>(
configSource, context, resourceName, ROUTE)
.switchMap(routeResource -> new RouteSnapshotStream(routeResource, context));
.switchMapEager(routeResource -> new RouteSnapshotStream(routeResource, context));
return snapshotStream.subscribe(watcher);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@ default <R> SnapshotStream<R> map(Function<? super T, ? extends R> mapper) {
return new MapStream<>(this, mapper);
}

default <R, O extends SnapshotStream<? extends R>> SnapshotStream<R> switchMap(
default <R, O extends SnapshotStream<? extends R>> SnapshotStream<R> switchMapEager(
Function<? super T, ? extends O> mapper) {
return new SwitchMapStream<>(this, mapper);
return new SwitchMapEagerStream<>(this, mapper);
}

static <S extends SnapshotStream<I>, I> SnapshotStream<List<I>> combineNLatest(List<S> stream) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ final class SubscriberStorage implements SafeCloseable {

private final Map<XdsType, Map<String, XdsStreamSubscriber<?>>> subscriberMap =
new EnumMap<>(XdsType.class);
private final ResourceCache resourceCache = new ResourceCache();

SubscriberStorage(EventExecutor eventLoop, long timeoutMillis) {
this.eventLoop = eventLoop;
Expand All @@ -52,8 +51,7 @@ <T extends XdsResource> boolean register(XdsType type, String resourceName, Reso
type, key -> new HashMap<>()).get(resourceName);
boolean updated = false;
if (subscriber == null) {
subscriber = new XdsStreamSubscriber<>(type, resourceName, eventLoop, timeoutMillis,
resourceCache);
subscriber = new XdsStreamSubscriber<>(type, resourceName, eventLoop, timeoutMillis);
subscriberMap.get(type).put(resourceName, subscriber);
updated = true;
}
Expand Down Expand Up @@ -91,10 +89,6 @@ <T extends XdsResource> Map<String, XdsStreamSubscriber<T>> subscribers(XdsType
return unsafeCast(subscriberMap.getOrDefault(type, Collections.emptyMap()));
}

void updateCache(XdsType type, Map<String, Object> resources) {
resourceCache.updateResources(type, resources);
}

static <T> T unsafeCast(Object obj) {
//noinspection unchecked
return (T) obj;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,6 @@ interface SubscriptionContext {
DirectoryWatchService watchService();

BootstrapSecrets bootstrapSecrets();

ResourceNodeMeterBinderFactory meterBinderFactory();
}
Loading
Loading