diff --git a/xds/src/main/java/com/linecorp/armeria/xds/ClusterStream.java b/xds/src/main/java/com/linecorp/armeria/xds/ClusterStream.java index fb6f2d04a88..65a5f63b21f 100644 --- a/xds/src/main/java/com/linecorp/armeria/xds/ClusterStream.java +++ b/xds/src/main/java/com/linecorp/armeria/xds/ClusterStream.java @@ -73,7 +73,7 @@ protected Subscription onStart(SnapshotWatcher watcher) { return Subscription.noop(); } return new ResourceNodeAdapter(configSource, context, resourceName, CLUSTER) - .switchMap(resource -> resource2snapshot(resource, configSource)) + .switchMapEager(resource -> resource2snapshot(resource, configSource)) .subscribe(watcher); } @@ -94,7 +94,7 @@ private SnapshotStream resource2snapshot(ClusterXdsResource res final SnapshotStream> lbStream = SnapshotStream.combineLatest(endpointSnapshotStream, transportSocket, socketMatchesStream, LoadBalancerInput::new) - .switchMap(input -> { + .switchMapEager(input -> { if (!input.endpointSnapshot.isPresent()) { return SnapshotStream.empty(); } @@ -102,7 +102,7 @@ private SnapshotStream resource2snapshot(ClusterXdsResource res if (context.clusterManager().hasLocalCluster() && !resourceName.equals(context.clusterManager().localClusterName())) { return new LocalClusterStream(context.clusterManager()) - .switchMap(localCluster -> { + .switchMapEager(localCluster -> { return new LoadBalancerStream( resource, endpointSnapshot, input.transportSocket, diff --git a/xds/src/main/java/com/linecorp/armeria/xds/DefaultResponseHandler.java b/xds/src/main/java/com/linecorp/armeria/xds/DefaultResponseHandler.java index 15e4946bf4e..0ae4973402f 100644 --- a/xds/src/main/java/com/linecorp/armeria/xds/DefaultResponseHandler.java +++ b/xds/src/main/java/com/linecorp/armeria/xds/DefaultResponseHandler.java @@ -46,14 +46,6 @@ public void handleResponse( final String errorDetails; if (holder.errors().isEmpty()) { sender.ackResponse(resourceParser.type(), response.getVersionInfo(), response.getNonce()); - // The version was updated, so we always update the cache in case a late watcher subscribed. - // 1) A subscriber is added, and a stream is created. - // 2) Immediately, the subscriber is removed but a response later updates the version. - // At this step, subscribers cannot be notified as there are no watchers - // and the value isn't cached. - // 3) Afterward, a new subscriber is added - this subscriber doesn't see any cached values so the - // subscriber is in an indefinite waiting state until the version is incremented. - storage.updateCache(resourceParser.type(), holder.parsedResources()); } else { errorDetails = errorMessageJoiner.join(holder.errors()); sender.nackResponse(resourceParser.type(), response.getNonce(), errorDetails); diff --git a/xds/src/main/java/com/linecorp/armeria/xds/DefaultSubscriptionContext.java b/xds/src/main/java/com/linecorp/armeria/xds/DefaultSubscriptionContext.java index d75b6ab525d..fd685797c12 100644 --- a/xds/src/main/java/com/linecorp/armeria/xds/DefaultSubscriptionContext.java +++ b/xds/src/main/java/com/linecorp/armeria/xds/DefaultSubscriptionContext.java @@ -32,6 +32,7 @@ final class DefaultSubscriptionContext implements SubscriptionContext { private final MeterIdPrefix meterIdPrefix; private final DirectoryWatchService watchService; private final BootstrapSecrets bootstrapSecrets; + private final ResourceNodeMeterBinderFactory meterBinderFactory; DefaultSubscriptionContext(EventExecutor eventLoop, XdsClusterManager clusterManager, ConfigSourceMapper configSourceMapper, @@ -46,6 +47,7 @@ final class DefaultSubscriptionContext implements SubscriptionContext { this.meterIdPrefix = meterIdPrefix; this.watchService = watchService; this.bootstrapSecrets = bootstrapSecrets; + meterBinderFactory = new ResourceNodeMeterBinderFactory(meterRegistry, meterIdPrefix); } @Override @@ -92,4 +94,9 @@ public DirectoryWatchService watchService() { public BootstrapSecrets bootstrapSecrets() { return bootstrapSecrets; } + + @Override + public ResourceNodeMeterBinderFactory meterBinderFactory() { + return meterBinderFactory; + } } diff --git a/xds/src/main/java/com/linecorp/armeria/xds/ListenerStream.java b/xds/src/main/java/com/linecorp/armeria/xds/ListenerStream.java index 3fe607cf8e8..74d22fce52c 100644 --- a/xds/src/main/java/com/linecorp/armeria/xds/ListenerStream.java +++ b/xds/src/main/java/com/linecorp/armeria/xds/ListenerStream.java @@ -58,7 +58,7 @@ protected Subscription onStart(SnapshotWatcher watcher) { .subscribe(watcher); } return new ResourceNodeAdapter(configSource, context, resourceName, LISTENER) - .switchMap(resource -> resource2snapshot(resource, configSource)) + .switchMapEager(resource -> resource2snapshot(resource, configSource)) .subscribe(watcher); } diff --git a/xds/src/main/java/com/linecorp/armeria/xds/ResourceNodeAdapter.java b/xds/src/main/java/com/linecorp/armeria/xds/ResourceNodeAdapter.java index f94380c4f8b..24bad4ad629 100644 --- a/xds/src/main/java/com/linecorp/armeria/xds/ResourceNodeAdapter.java +++ b/xds/src/main/java/com/linecorp/armeria/xds/ResourceNodeAdapter.java @@ -24,7 +24,7 @@ final class ResourceNodeAdapter extends RefCountedStream< private final SubscriptionContext context; private final String name; private final XdsType type; - private final ResourceNodeMeterBinder resourceNodeMeterBinder; + private final ResourceNodeMeterBinderFactory.ResourceNodeMeterBinder resourceNodeMeterBinder; ResourceNodeAdapter(ConfigSource configSource, SubscriptionContext context, @@ -33,9 +33,7 @@ final class ResourceNodeAdapter extends RefCountedStream< this.context = context; this.name = name; this.type = type; - resourceNodeMeterBinder = new ResourceNodeMeterBinder(context.meterRegistry(), - context.meterIdPrefix(), - type, name); + resourceNodeMeterBinder = context.meterBinderFactory().acquire(type, name); } @Override diff --git a/xds/src/main/java/com/linecorp/armeria/xds/ResourceNodeMeterBinder.java b/xds/src/main/java/com/linecorp/armeria/xds/ResourceNodeMeterBinder.java deleted file mode 100644 index 768f8f72c73..00000000000 --- a/xds/src/main/java/com/linecorp/armeria/xds/ResourceNodeMeterBinder.java +++ /dev/null @@ -1,82 +0,0 @@ -/* - * Copyright 2025 LY Corporation - * - * LY Corporation licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ - -package com.linecorp.armeria.xds; - -import java.util.Locale; - -import com.linecorp.armeria.common.metric.MeterIdPrefix; - -import io.micrometer.core.instrument.Counter; -import io.micrometer.core.instrument.Gauge; -import io.micrometer.core.instrument.MeterRegistry; - -/** - * Records metrics for each {@link ResourceNode}. - * This is not done at the user-exposed {@link SnapshotWatcher} level so that users can - * observe the internal state/lifecycle of {@link ResourceNode}s via metrics. - */ -final class ResourceNodeMeterBinder implements ResourceWatcher { - - private final MeterRegistry meterRegistry; - private long updatedRevision; - private boolean closed; - - private final Gauge revisionGauge; - private final Counter errorCounter; - private final Counter missingCounter; - - ResourceNodeMeterBinder(MeterRegistry meterRegistry, - MeterIdPrefix meterIdPrefix, XdsType type, String resourceName) { - this.meterRegistry = meterRegistry; - meterIdPrefix = meterIdPrefix.withTags("name", resourceName, - "type", type.name().toLowerCase(Locale.ROOT)); - revisionGauge = Gauge.builder(meterIdPrefix.name("resource.node.revision"), () -> updatedRevision) - .tags(meterIdPrefix.tags()) - .register(meterRegistry); - errorCounter = Counter.builder(meterIdPrefix.name("resource.node.error")) - .tags(meterIdPrefix.tags()) - .register(meterRegistry); - missingCounter = Counter.builder(meterIdPrefix.name("resource.node.missing")) - .tags(meterIdPrefix.tags()) - .register(meterRegistry); - } - - void close() { - if (closed) { - return; - } - closed = true; - meterRegistry.remove(revisionGauge); - meterRegistry.remove(errorCounter); - meterRegistry.remove(missingCounter); - } - - @Override - public void onError(XdsType type, String resourceName, Throwable t) { - errorCounter.increment(); - } - - @Override - public void onResourceDoesNotExist(XdsType type, String resourceName) { - missingCounter.increment(); - } - - @Override - public void onChanged(XdsResource update) { - updatedRevision = update.revision(); - } -} diff --git a/xds/src/main/java/com/linecorp/armeria/xds/ResourceNodeMeterBinderFactory.java b/xds/src/main/java/com/linecorp/armeria/xds/ResourceNodeMeterBinderFactory.java new file mode 100644 index 00000000000..9481bacc01c --- /dev/null +++ b/xds/src/main/java/com/linecorp/armeria/xds/ResourceNodeMeterBinderFactory.java @@ -0,0 +1,143 @@ +/* + * Copyright 2026 LY Corporation + * + * LY Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.armeria.xds; + +import java.util.HashMap; +import java.util.Locale; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +import com.linecorp.armeria.common.metric.MeterIdPrefix; + +import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.Gauge; +import io.micrometer.core.instrument.MeterRegistry; + +class ResourceNodeMeterBinderFactory { + + private final Map metersMap = new HashMap<>(); + private final MeterRegistry meterRegistry; + private final MeterIdPrefix meterIdPrefix; + + ResourceNodeMeterBinderFactory(MeterRegistry meterRegistry, MeterIdPrefix meterIdPrefix) { + this.meterRegistry = meterRegistry; + this.meterIdPrefix = meterIdPrefix; + } + + ResourceNodeMeterBinder acquire(XdsType type, String resourceName) { + final Key key = new Key(type, resourceName); + return metersMap.compute(key, (ignored, existing) -> { + if (existing != null) { + existing.refCount.incrementAndGet(); + return existing; + } + return new ResourceNodeMeterBinder(key, meterRegistry, meterIdPrefix); + }); + } + + /** + * Records metrics for each {@link ResourceNode}. + * This is not done at the user-exposed {@link SnapshotWatcher} level so that users can + * observe the internal state/lifecycle of {@link ResourceNode}s via metrics. + */ + final class ResourceNodeMeterBinder implements ResourceWatcher { + + private final Key key; + private boolean closed; + private final MeterRegistry meterRegistry; + private final AtomicLong updatedRevision = new AtomicLong(); + private final AtomicInteger refCount = new AtomicInteger(1); + private final Gauge revisionGauge; + private final Counter errorCounter; + private final Counter missingCounter; + + private ResourceNodeMeterBinder(Key key, MeterRegistry meterRegistry, + MeterIdPrefix meterIdPrefix) { + this.key = key; + this.meterRegistry = meterRegistry; + meterIdPrefix = meterIdPrefix.withTags("name", key.resourceName, + "type", key.type.name().toLowerCase(Locale.ROOT)); + revisionGauge = Gauge.builder(meterIdPrefix.name("resource.node.revision"), + updatedRevision, AtomicLong::get) + .tags(meterIdPrefix.tags()) + .register(meterRegistry); + errorCounter = Counter.builder(meterIdPrefix.name("resource.node.error")) + .tags(meterIdPrefix.tags()) + .register(meterRegistry); + missingCounter = Counter.builder(meterIdPrefix.name("resource.node.missing")) + .tags(meterIdPrefix.tags()) + .register(meterRegistry); + } + + void close() { + if (closed) { + return; + } + if (refCount.decrementAndGet() == 0) { + closed = true; + meterRegistry.remove(revisionGauge); + meterRegistry.remove(errorCounter); + meterRegistry.remove(missingCounter); + metersMap.remove(key); + } + } + + @Override + public void onError(XdsType type, String resourceName, Throwable t) { + errorCounter.increment(); + } + + @Override + public void onResourceDoesNotExist(XdsType type, String resourceName) { + missingCounter.increment(); + } + + @Override + public void onChanged(XdsResource update) { + updatedRevision.set(update.revision()); + } + } + + private static final class Key { + private final XdsType type; + private final String resourceName; + + private Key(XdsType type, String resourceName) { + this.type = type; + this.resourceName = resourceName; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null || getClass() != obj.getClass()) { + return false; + } + final Key other = (Key) obj; + return type == other.type && resourceName.equals(other.resourceName); + } + + @Override + public int hashCode() { + return Objects.hash(type, resourceName); + } + } +} diff --git a/xds/src/main/java/com/linecorp/armeria/xds/RouteStream.java b/xds/src/main/java/com/linecorp/armeria/xds/RouteStream.java index 15436ddfda4..29ff2a49875 100644 --- a/xds/src/main/java/com/linecorp/armeria/xds/RouteStream.java +++ b/xds/src/main/java/com/linecorp/armeria/xds/RouteStream.java @@ -59,7 +59,7 @@ protected Subscription onStart(SnapshotWatcher watcher) { assert configSource != null; final SnapshotStream snapshotStream = new ResourceNodeAdapter( configSource, context, resourceName, ROUTE) - .switchMap(routeResource -> new RouteSnapshotStream(routeResource, context)); + .switchMapEager(routeResource -> new RouteSnapshotStream(routeResource, context)); return snapshotStream.subscribe(watcher); } diff --git a/xds/src/main/java/com/linecorp/armeria/xds/SnapshotStream.java b/xds/src/main/java/com/linecorp/armeria/xds/SnapshotStream.java index a17da70dee8..b3eacc6ca1f 100644 --- a/xds/src/main/java/com/linecorp/armeria/xds/SnapshotStream.java +++ b/xds/src/main/java/com/linecorp/armeria/xds/SnapshotStream.java @@ -46,9 +46,9 @@ default SnapshotStream map(Function mapper) { return new MapStream<>(this, mapper); } - default > SnapshotStream switchMap( + default > SnapshotStream switchMapEager( Function mapper) { - return new SwitchMapStream<>(this, mapper); + return new SwitchMapEagerStream<>(this, mapper); } static , I> SnapshotStream> combineNLatest(List stream) { diff --git a/xds/src/main/java/com/linecorp/armeria/xds/SubscriberStorage.java b/xds/src/main/java/com/linecorp/armeria/xds/SubscriberStorage.java index 2bc465f2e0b..fc030d61e83 100644 --- a/xds/src/main/java/com/linecorp/armeria/xds/SubscriberStorage.java +++ b/xds/src/main/java/com/linecorp/armeria/xds/SubscriberStorage.java @@ -22,9 +22,6 @@ import java.util.Map; import java.util.Set; -import com.google.common.collect.ImmutableMap; - -import com.linecorp.armeria.common.annotation.Nullable; import com.linecorp.armeria.common.util.SafeCloseable; import io.netty.util.concurrent.EventExecutor; @@ -36,7 +33,6 @@ final class SubscriberStorage implements SafeCloseable { private final Map>> subscriberMap = new EnumMap<>(XdsType.class); - private final ResourceCache resourceCache = new ResourceCache(); SubscriberStorage(EventExecutor eventLoop, long timeoutMillis) { this.eventLoop = eventLoop; @@ -52,8 +48,7 @@ boolean register(XdsType type, String resourceName, Reso type, key -> new HashMap<>()).get(resourceName); boolean updated = false; if (subscriber == null) { - subscriber = new XdsStreamSubscriber<>(type, resourceName, eventLoop, timeoutMillis, - resourceCache); + subscriber = new XdsStreamSubscriber<>(type, resourceName, eventLoop, timeoutMillis); subscriberMap.get(type).put(resourceName, subscriber); updated = true; } @@ -91,10 +86,6 @@ Map> subscribers(XdsType return unsafeCast(subscriberMap.getOrDefault(type, Collections.emptyMap())); } - void updateCache(XdsType type, Map resources) { - resourceCache.updateResources(type, resources); - } - static T unsafeCast(Object obj) { //noinspection unchecked return (T) obj; @@ -115,19 +106,4 @@ public void close() { }); subscriberMap.clear(); } - - static class ResourceCache { - - private final Map> type2resources = new HashMap<>(); - - void updateResources(XdsType type, Map resources) { - type2resources.put(type, resources); - } - - @Nullable - Object find(XdsType type, String resourceName) { - return type2resources.getOrDefault(type, ImmutableMap.of()) - .get(resourceName); - } - } } diff --git a/xds/src/main/java/com/linecorp/armeria/xds/SubscriptionContext.java b/xds/src/main/java/com/linecorp/armeria/xds/SubscriptionContext.java index 159c8035ee1..2e9199d2aa0 100644 --- a/xds/src/main/java/com/linecorp/armeria/xds/SubscriptionContext.java +++ b/xds/src/main/java/com/linecorp/armeria/xds/SubscriptionContext.java @@ -41,4 +41,6 @@ interface SubscriptionContext { DirectoryWatchService watchService(); BootstrapSecrets bootstrapSecrets(); + + ResourceNodeMeterBinderFactory meterBinderFactory(); } diff --git a/xds/src/main/java/com/linecorp/armeria/xds/SwitchMapStream.java b/xds/src/main/java/com/linecorp/armeria/xds/SwitchMapEagerStream.java similarity index 76% rename from xds/src/main/java/com/linecorp/armeria/xds/SwitchMapStream.java rename to xds/src/main/java/com/linecorp/armeria/xds/SwitchMapEagerStream.java index a62f6cd91c2..b8de6c34611 100644 --- a/xds/src/main/java/com/linecorp/armeria/xds/SwitchMapStream.java +++ b/xds/src/main/java/com/linecorp/armeria/xds/SwitchMapEagerStream.java @@ -20,7 +20,7 @@ import com.linecorp.armeria.common.annotation.Nullable; -final class SwitchMapStream extends RefCountedStream { +final class SwitchMapEagerStream extends RefCountedStream { private final SnapshotStream upstream; private final Function> mapper; @@ -30,9 +30,10 @@ final class SwitchMapStream extends RefCountedStream { @Nullable private Subscription innerSub; private long epoch; + private long innerEpoch; - SwitchMapStream(SnapshotStream upstream, - Function> mapper) { + SwitchMapEagerStream(SnapshotStream upstream, + Function> mapper) { this.upstream = upstream; this.mapper = mapper; } @@ -47,20 +48,29 @@ protected Subscription onStart(SnapshotWatcher watcher) { return; } - if (innerSub != null) { - innerSub.close(); - innerSub = null; - } + final Subscription prevInnerSub = innerSub; + final long mappedEpoch = ++innerEpoch; final SnapshotStream innerStream; try { innerStream = mapper.apply(snapshot); } catch (Throwable t) { + innerEpoch--; emit(null, t); return; } - innerSub = innerStream.subscribe(this::emit); + innerSub = innerStream.subscribe((value, error) -> { + if (mappedEpoch != innerEpoch) { + return; + } + emit(value, error); + }); + + if (prevInnerSub != null) { + prevInnerSub.close(); + } + // If stopped during subscription, close immediately if (subscriptionEpoch != epoch) { innerSub.close(); diff --git a/xds/src/main/java/com/linecorp/armeria/xds/TransportSocketStream.java b/xds/src/main/java/com/linecorp/armeria/xds/TransportSocketStream.java index b5055172e53..bc901956a1c 100644 --- a/xds/src/main/java/com/linecorp/armeria/xds/TransportSocketStream.java +++ b/xds/src/main/java/com/linecorp/armeria/xds/TransportSocketStream.java @@ -65,20 +65,20 @@ protected Subscription onStart(SnapshotWatcher watcher) .build(); final SecretStream secretStream = new SecretStream(secret, context); validationStream = secretStream - .switchMap(resource -> new CertificateValidationContextStream(context, resource)) + .switchMapEager(resource -> new CertificateValidationContextStream(context, resource)) .map(Optional::of); } else if (commonTlsContext.hasValidationContextSdsSecretConfig()) { final SdsSecretConfig sdsConfig = commonTlsContext.getValidationContextSdsSecretConfig(); final SecretStream secretStream = new SecretStream(sdsConfig, configSource, context); validationStream = secretStream - .switchMap(resource -> new CertificateValidationContextStream(context, resource)) + .switchMapEager(resource -> new CertificateValidationContextStream(context, resource)) .map(Optional::of); } else if (commonTlsContext.hasCombinedValidationContext()) { final CombinedCertificateValidationContext combined = commonTlsContext.getCombinedValidationContext(); final SdsSecretConfig sdsConfig = combined.getValidationContextSdsSecretConfig(); final SecretStream secretStream = new SecretStream(sdsConfig, configSource, context); - validationStream = secretStream.switchMap(resource -> new CertificateValidationContextStream( + validationStream = secretStream.switchMapEager(resource -> new CertificateValidationContextStream( context, resource, combined.getDefaultValidationContext())) .map(Optional::of); } else { @@ -90,13 +90,13 @@ protected Subscription onStart(SnapshotWatcher watcher) final TlsCertificate tlsCertificate = commonTlsContext.getTlsCertificatesList().get(0); final Secret secret = Secret.newBuilder().setTlsCertificate(tlsCertificate).build(); final SecretStream secretStream = new SecretStream(secret, context); - tlsCertStream = secretStream.switchMap(resource -> new TlsCertificateStream(context, resource)) + tlsCertStream = secretStream.switchMapEager(resource -> new TlsCertificateStream(context, resource)) .map(Optional::of); } else if (!commonTlsContext.getTlsCertificateSdsSecretConfigsList().isEmpty()) { final SdsSecretConfig sdsConfig = commonTlsContext.getTlsCertificateSdsSecretConfigsList().get(0); final SecretStream secretStream = new SecretStream(sdsConfig, configSource, context); - tlsCertStream = secretStream.switchMap(resource -> new TlsCertificateStream(context, resource)) + tlsCertStream = secretStream.switchMapEager(resource -> new TlsCertificateStream(context, resource)) .map(Optional::of); } else { // static diff --git a/xds/src/main/java/com/linecorp/armeria/xds/XdsStreamSubscriber.java b/xds/src/main/java/com/linecorp/armeria/xds/XdsStreamSubscriber.java index 05b03972b0f..ea1af5a7144 100644 --- a/xds/src/main/java/com/linecorp/armeria/xds/XdsStreamSubscriber.java +++ b/xds/src/main/java/com/linecorp/armeria/xds/XdsStreamSubscriber.java @@ -26,7 +26,6 @@ import com.linecorp.armeria.common.annotation.Nullable; import com.linecorp.armeria.common.util.SafeCloseable; -import com.linecorp.armeria.xds.SubscriberStorage.ResourceCache; import io.netty.util.concurrent.EventExecutor; import io.netty.util.concurrent.ScheduledFuture; @@ -38,7 +37,6 @@ class XdsStreamSubscriber implements SafeCloseable { private final XdsType type; private final String resource; private final long timeoutMillis; - private final ResourceCache resourceCache; private final EventExecutor eventLoop; @Nullable @@ -48,13 +46,11 @@ class XdsStreamSubscriber implements SafeCloseable { private ScheduledFuture initialAbsentFuture; private final Set> resourceWatchers = new HashSet<>(); - XdsStreamSubscriber(XdsType type, String resource, EventExecutor eventLoop, long timeoutMillis, - ResourceCache resourceCache) { + XdsStreamSubscriber(XdsType type, String resource, EventExecutor eventLoop, long timeoutMillis) { this.type = type; this.resource = resource; this.eventLoop = eventLoop; this.timeoutMillis = timeoutMillis; - this.resourceCache = resourceCache; restartTimer(); } @@ -134,12 +130,11 @@ boolean isEmpty() { return resourceWatchers.isEmpty(); } - @SuppressWarnings("unchecked") void registerWatcher(ResourceWatcher watcher) { resourceWatchers.add(watcher); - final Object cached = resourceCache.find(type, resource); + final T cached = data; if (cached != null) { - watcher.onChanged((T) cached); + watcher.onChanged(cached); } else if (absent) { watcher.onResourceDoesNotExist(type, resource); } diff --git a/xds/src/test/java/com/linecorp/armeria/xds/StreamSwitchMapTest.java b/xds/src/test/java/com/linecorp/armeria/xds/StreamSwitchMapEagerTest.java similarity index 90% rename from xds/src/test/java/com/linecorp/armeria/xds/StreamSwitchMapTest.java rename to xds/src/test/java/com/linecorp/armeria/xds/StreamSwitchMapEagerTest.java index d915b4c6bc5..fc094b9be62 100644 --- a/xds/src/test/java/com/linecorp/armeria/xds/StreamSwitchMapTest.java +++ b/xds/src/test/java/com/linecorp/armeria/xds/StreamSwitchMapEagerTest.java @@ -33,14 +33,14 @@ import com.linecorp.armeria.xds.SnapshotStream.Subscription; @SuppressWarnings("CheckReturnValue") -class StreamSwitchMapTest { +class StreamSwitchMapEagerTest { @Test - void switchMapBasicFlow() { + void switchMapEagerBasicFlow() { final TestStream upstream = new TestStream<>(); final List received = new ArrayList<>(); - final SnapshotStream switched = upstream.switchMap( + final SnapshotStream switched = upstream.switchMapEager( value -> SnapshotStream.just(value + "-mapped")); switched.subscribe((snapshot, error) -> { @@ -56,13 +56,13 @@ void switchMapBasicFlow() { } @Test - void switchMapSwitchesToNewInnerStream() { + void switchMapEagerSwitchesToNewInnerStream() { final TestStream upstream = new TestStream<>(); final TestStream inner1 = new TestStream<>(); final TestStream inner2 = new TestStream<>(); final List received = new ArrayList<>(); - final SnapshotStream switched = upstream.switchMap(value -> { + final SnapshotStream switched = upstream.switchMapEager(value -> { if ("key1".equals(value)) { return inner1; } else { @@ -92,12 +92,12 @@ void switchMapSwitchesToNewInnerStream() { } @Test - void switchMapUnsubscribesFromPreviousInnerStream() { + void switchMapEagerUnsubscribesFromPreviousInnerStream() { final TestStream upstream = new TestStream<>(); final TestStream inner1 = new TestStream<>(); final TestStream inner2 = new TestStream<>(); - final SnapshotStream switched = upstream.switchMap(value -> { + final SnapshotStream switched = upstream.switchMapEager(value -> { if ("key1".equals(value)) { return inner1; } else { @@ -116,11 +116,11 @@ void switchMapUnsubscribesFromPreviousInnerStream() { } @Test - void switchMapPropagatesUpstreamError() { + void switchMapEagerPropagatesUpstreamError() { final TestStream upstream = new TestStream<>(); final List errors = new ArrayList<>(); - final SnapshotStream switched = upstream.switchMap(SnapshotStream::just); + final SnapshotStream switched = upstream.switchMapEager(SnapshotStream::just); switched.subscribe((snapshot, error) -> { if (error != null) { @@ -135,12 +135,12 @@ void switchMapPropagatesUpstreamError() { } @Test - void switchMapPropagatesInnerError() { + void switchMapEagerPropagatesInnerError() { final TestStream upstream = new TestStream<>(); final TestStream inner = new TestStream<>(); final List errors = new ArrayList<>(); - final SnapshotStream switched = upstream.switchMap(value -> inner); + final SnapshotStream switched = upstream.switchMapEager(value -> inner); switched.subscribe((snapshot, error) -> { if (error != null) { @@ -157,12 +157,12 @@ void switchMapPropagatesInnerError() { } @Test - void switchMapPropagatesMapperException() { + void switchMapEagerPropagatesMapperException() { final TestStream upstream = new TestStream<>(); final List errors = new ArrayList<>(); final RuntimeException testError = new RuntimeException("mapper error"); - final SnapshotStream switched = upstream.switchMap(value -> { + final SnapshotStream switched = upstream.switchMapEager(value -> { throw testError; }); @@ -178,11 +178,11 @@ void switchMapPropagatesMapperException() { } @Test - void switchMapUnsubscribeClosesAllStreams() { + void switchMapEagerUnsubscribeClosesAllStreams() { final TestStream upstream = new TestStream<>(); final TestStream inner = new TestStream<>(); - final SnapshotStream switched = upstream.switchMap(value -> inner); + final SnapshotStream switched = upstream.switchMapEager(value -> inner); final Subscription subscription = switched.subscribe((snapshot, error) -> {}); @@ -198,13 +198,13 @@ void switchMapUnsubscribeClosesAllStreams() { } @Test - void switchMapMultipleSubscribers() { + void switchMapEagerMultipleSubscribers() { final TestStream upstream = new TestStream<>(); final List received1 = new ArrayList<>(); final List received2 = new ArrayList<>(); final SnapshotStream switched = - upstream.switchMap(value -> SnapshotStream.just(value + "-mapped")); + upstream.switchMapEager(value -> SnapshotStream.just(value + "-mapped")); switched.subscribe((snapshot, error) -> { if (snapshot != null) { @@ -224,12 +224,12 @@ void switchMapMultipleSubscribers() { } @Test - void switchMapNewSubscriberReceivesLatestValue() { + void switchMapEagerNewSubscriberReceivesLatestValue() { final TestStream upstream = new TestStream<>(); final List received = new ArrayList<>(); final SnapshotStream switched = - upstream.switchMap(value -> SnapshotStream.just(value + "-mapped")); + upstream.switchMapEager(value -> SnapshotStream.just(value + "-mapped")); switched.subscribe((snapshot, error) -> {}); @@ -250,7 +250,7 @@ void upstreamEmitsAfterSwitch() { final Deque> q = new ArrayDeque<>(); - final SnapshotStream switched = upstream.switchMap(value -> { + final SnapshotStream switched = upstream.switchMapEager(value -> { final TestStream testStream = new TestStream<>(); q.push(testStream); for (TestStream stream : q) { @@ -280,14 +280,14 @@ void upstreamEmitsAfterSwitch() { } @Test - void switchMapOnlyStartsUpstreamOnceWithMultipleSubscribers() { + void switchMapEagerOnlyStartsUpstreamOnceWithMultipleSubscribers() { final AtomicInteger upstreamStartCount = new AtomicInteger(0); final TestStream upstream = new TestStream<>(() -> { upstreamStartCount.incrementAndGet(); return Subscription.noop(); }); - final SnapshotStream switched = upstream.switchMap(SnapshotStream::just); + final SnapshotStream switched = upstream.switchMapEager(SnapshotStream::just); switched.subscribe((snapshot, error) -> {}); switched.subscribe((snapshot, error) -> {}); @@ -296,12 +296,12 @@ void switchMapOnlyStartsUpstreamOnceWithMultipleSubscribers() { } @Test - void switchMapHandlesStaticStreams() { + void switchMapEagerHandlesStaticStreams() { final List received = new ArrayList<>(); final SnapshotStream stream = SnapshotStream.just("static-value"); final SnapshotStream switched = - stream.switchMap(value -> SnapshotStream.just(value + "-mapped")); + stream.switchMapEager(value -> SnapshotStream.just(value + "-mapped")); switched.subscribe((snapshot, error) -> { if (snapshot != null) { @@ -313,11 +313,11 @@ void switchMapHandlesStaticStreams() { } @Test - void switchMapClosesInnerStreamOnUnsubscribe() { + void switchMapEagerClosesInnerStreamOnUnsubscribe() { final TestStream upstream = new TestStream<>(); final TestStream inner = new TestStream<>(); - final SnapshotStream switched = upstream.switchMap(value -> inner); + final SnapshotStream switched = upstream.switchMapEager(value -> inner); final Subscription sub1 = switched.subscribe((snapshot, error) -> {}); final Subscription sub2 = switched.subscribe((snapshot, error) -> {}); @@ -337,7 +337,7 @@ void emitClosesImmediately() { final Deque>> q = new ArrayDeque<>(); - final SnapshotStream> switched = upstream.switchMap(value -> { + final SnapshotStream> switched = upstream.switchMapEager(value -> { final TestStream> testStream = new TestStream<>(); q.push(testStream); for (TestStream> stream : q) {