Add support for DELTA_GRPC, AGGREGATED_DELTA_GRPC#6709
📝 Walkthrough
This PR introduces Delta xDS protocol support alongside the existing State-of-the-World implementation by refactoring the xDS stream architecture. It replaces
Changes
Sequence Diagram(s)
sequenceDiagram
participant App as Application
participant AdsStream as AdsXdsStream
participant ActualStream as SotwActualStream<br/>(SOTW Mode)
participant gRPC as gRPC Server
participant SC as StateCoordinator
participant Store as ResourceStateStore
App->>AdsStream: start()
AdsStream->>AdsStream: reset()
AdsStream->>ActualStream: resourcesUpdated(LISTENER)
ActualStream->>gRPC: send DiscoveryRequest
gRPC->>ActualStream: onNext(DiscoveryResponse)
ActualStream->>ActualStream: parse resources
ActualStream->>SC: onResourceUpdated(LISTENER, resource)
SC->>Store: putVersioned(name, resource, revision)
Store-->>SC: revised resource (incremented revision)
SC->>App: watcher.onChanged(resource)
ActualStream->>gRPC: send ACK DiscoveryRequest
Note over AdsStream,Store: Cache update triggers
App->>Store: (external cache update)
App->>AdsStream: resourcesUpdated(LISTENER)
AdsStream->>ActualStream: resourcesUpdated(LISTENER)
ActualStream->>gRPC: send DiscoveryRequest(updated version)
gRPC->>ActualStream: onNext(DiscoveryResponse)
ActualStream->>SC: onResourceUpdated(LISTENER, newResource)
SC->>App: watcher.onChanged(newResource)
sequenceDiagram
participant App as Application
participant AdsStream as AdsXdsStream
participant ActualStream as DeltaActualStream<br/>(Delta Mode)
participant Queue as ACK/NACK Queue
participant gRPC as gRPC Server
participant SC as StateCoordinator
App->>AdsStream: start()
AdsStream->>AdsStream: reset()
AdsStream->>ActualStream: resourcesUpdated(LISTENER)
ActualStream->>Queue: enqueue(LISTENER)
ActualStream->>ActualStream: drainRequests()
ActualStream->>gRPC: send DeltaDiscoveryRequest<br/>(subscribe LISTENER)
gRPC->>ActualStream: onNext(DeltaDiscoveryResponse)
ActualStream->>ActualStream: parse delta resources
alt Parse Success
ActualStream->>SC: onResourceUpdated(LISTENER, resource)
ActualStream->>Queue: ackResponse(nonce)
else Parse Error
ActualStream->>SC: onResourceError(LISTENER, cause)
ActualStream->>Queue: nackResponse(nonce, error)
end
ActualStream->>ActualStream: drainRequests()
ActualStream->>gRPC: send DeltaDiscoveryRequest(ACK/NACK)
Note over ActualStream,gRPC: Resource deletion
gRPC->>ActualStream: onNext(DeltaDiscoveryResponse<br/>with removed_resources)
ActualStream->>SC: onResourceMissing(LISTENER, name)
ActualStream->>Queue: ackResponse(nonce)
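The Delta-mode flow in the second diagram (enqueue a reply, then `drainRequests()`, then one outbound request per queued item) can be sketched roughly as follows. This is a minimal illustration and not the PR's `DeltaActualStream`: the class name `DeltaAckQueueSketch`, the string-typed fields, and the `sent` list standing in for the gRPC stream are all assumptions made for the example.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the Delta stream's ACK/NACK bookkeeping.
final class DeltaAckQueueSketch {
    // One queued reply; errorDetail == null means ACK, non-null means NACK.
    static final class PendingAck {
        final String type;
        final String nonce;
        final String errorDetail;

        PendingAck(String type, String nonce, String errorDetail) {
            this.type = type;
            this.nonce = nonce;
            this.errorDetail = errorDetail;
        }
    }

    private final ArrayDeque<PendingAck> ackQueue = new ArrayDeque<>();
    // Stands in for requests written to the gRPC stream (assumption).
    final List<String> sent = new ArrayList<>();

    void ackResponse(String type, String nonce) {
        ackQueue.add(new PendingAck(type, nonce, null));
    }

    void nackResponse(String type, String nonce, String errorDetail) {
        ackQueue.add(new PendingAck(type, nonce, errorDetail));
    }

    // Sends every queued reply in arrival order.
    void drainRequests() {
        PendingAck next;
        while ((next = ackQueue.poll()) != null) {
            if (next.errorDetail == null) {
                sent.add("ACK " + next.type + " nonce=" + next.nonce);
            } else {
                sent.add("NACK " + next.type + " nonce=" + next.nonce
                         + " error=" + next.errorDetail);
            }
        }
    }
}
```

Keeping the enqueue and the drain as separate steps mirrors the diagram: nothing goes out until `drainRequests()` runs, so replies are emitted in the order the responses arrived.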
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
DELTA_GRPC, AGGREGATED_DELTA_GRPC
Actionable comments posted: 6
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
xds/src/main/java/com/linecorp/armeria/xds/XdsStreamSubscriber.java (1)
56-64: ⚠️ Potential issue | 🟠 Major
Cancel the previous absent timer before rearming it.
Line 61 overwrites `initialAbsentFuture` without cancelling the older task. If `restartTimer()` is called again before the first timeout fires, the old future becomes unreachable, so `close()`/`onData()` can no longer cancel it and you'll get orphaned `onAbsent()` callbacks.
Suggested fix
 void restartTimer() {
     if (!enableAbsentOnTimeout) {
         return;
     }
+    maybeCancelAbsentTimer();
     initialAbsentFuture = eventLoop.schedule(() -> {
         initialAbsentFuture = null;
         onAbsent();
     }, timeoutMillis, TimeUnit.MILLISECONDS);
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@xds/src/main/java/com/linecorp/armeria/xds/XdsStreamSubscriber.java` around lines 56 - 64, The restartTimer() method currently overwrites initialAbsentFuture without cancelling any previous scheduled task, which leaks orphaned onAbsent() callbacks; modify restartTimer() to check initialAbsentFuture (the ScheduledFuture set by eventLoop.schedule) and cancel it (e.g., initialAbsentFuture.cancel(false)) if non-null and not already done before scheduling a new task, then assign the new ScheduledFuture to initialAbsentFuture so close() / onData() can still cancel it; keep the existing enableAbsentOnTimeout guard and scheduling via eventLoop.schedule(..., timeoutMillis, TimeUnit.MILLISECONDS).
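The cancel-before-rearm pattern behind this finding can be illustrated with a small standalone sketch. It assumes a plain `ScheduledExecutorService` in place of the event loop, and `AbsentTimerSketch`, `absentCalls`, and `demo()` are hypothetical names invented for the example; it is not the `XdsStreamSubscriber` code.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a subscriber with an "absent" timeout.
final class AbsentTimerSketch {
    private final ScheduledExecutorService eventLoop;
    private final long timeoutMillis;
    // Counts how many times the timeout callback fired.
    final AtomicInteger absentCalls = new AtomicInteger();
    private ScheduledFuture<?> initialAbsentFuture; // guarded by "this"

    AbsentTimerSketch(ScheduledExecutorService eventLoop, long timeoutMillis) {
        this.eventLoop = eventLoop;
        this.timeoutMillis = timeoutMillis;
    }

    synchronized void restartTimer() {
        // Cancel the old task first so it cannot fire after being replaced.
        maybeCancelAbsentTimer();
        initialAbsentFuture =
                eventLoop.schedule(this::onTimeout, timeoutMillis, TimeUnit.MILLISECONDS);
    }

    synchronized void maybeCancelAbsentTimer() {
        if (initialAbsentFuture != null) {
            initialAbsentFuture.cancel(false);
            initialAbsentFuture = null;
        }
    }

    private synchronized void onTimeout() {
        initialAbsentFuture = null;
        absentCalls.incrementAndGet();
    }

    // Rearms twice in a row and reports how many callbacks fired.
    static int demo() {
        ScheduledExecutorService loop = Executors.newSingleThreadScheduledExecutor();
        try {
            AbsentTimerSketch s = new AbsentTimerSketch(loop, 100);
            s.restartTimer();
            s.restartTimer();
            try {
                Thread.sleep(400);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return s.absentCalls.get();
        } finally {
            loop.shutdownNow();
        }
    }
}
```

With the cancel in place, the two back-to-back `restartTimer()` calls in `demo()` should produce a single callback; without it, the first scheduled task would still fire and the count would be two.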
🧹 Nitpick comments (3)
xds/src/test/java/com/linecorp/armeria/xds/SubscriberStorageTest.java (1)
73-85: Consider adding `volatile` to `CapturingWatcher` fields for cross-thread visibility.
The watcher's `missingType` and `missingName` are written from the event loop thread but read from the test thread. While Awaitility's polling likely masks this, adding `volatile` would ensure correct visibility semantics.
Suggested fix
 private static final class CapturingWatcher implements ResourceWatcher<XdsResource> {
-    private XdsType missingType;
-    private String missingName;
+    private volatile XdsType missingType;
+    private volatile String missingName;
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@xds/src/test/java/com/linecorp/armeria/xds/SubscriberStorageTest.java` around lines 73 - 85, The CapturingWatcher class has fields missingType and missingName that are written on the event loop thread in onResourceDoesNotExist but read from the test thread; mark both fields (missingType and missingName) as volatile in the CapturingWatcher class to ensure cross-thread visibility so tests reliably observe updates from the event loop.
xds/src/main/java/com/linecorp/armeria/xds/DefaultConfigSourceLifecycleObserver.java (1)
108-135: Log ACK vs NACK explicitly at DEBUG.
Right now the request logs include nonce/version counts but not whether the request carries `error_detail`. That makes ACKs and NACKs look identical unless TRACE is enabled. Logging `hasErrorDetail` (and optionally the status code/message) would make rejection flows much easier to troubleshoot.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@xds/src/main/java/com/linecorp/armeria/xds/DefaultConfigSourceLifecycleObserver.java` around lines 108 - 135, The DEBUG logs in requestSent(DiscoveryRequest) and requestSent(DeltaDiscoveryRequest) don't indicate whether the request is an ACK or NACK because they omit error_detail; update both debug log lines (the ones using logger.debug in DefaultConfigSourceLifecycleObserver) to include request.hasErrorDetail() (or equivalent) and, if present, include the error status details (e.g., request.getErrorDetail().getCode() and request.getErrorDetail().getMessage()) so the debug output clearly shows whether the request carries an error and the status info when available.
it/xds-client/src/test/java/com/linecorp/armeria/xds/it/DeltaXdsPreprocessorTest.java (1)
73-91: Consider deduplicating identical hello server extensions.
`helloServer` and `helloServer2` use identical setup; extracting a tiny factory helper would reduce repetition.
♻️ Optional refactor
-    @RegisterExtension
-    @Order(1)
-    static final ServerExtension helloServer = new ServerExtension() {
-        @Override
-        protected void configure(ServerBuilder sb) {
-            sb.service("/hello", (ctx, req) -> HttpResponse.of("world"));
-            sb.http(0);
-        }
-    };
-
-    @RegisterExtension
-    @Order(2)
-    static final ServerExtension helloServer2 = new ServerExtension() {
-        @Override
-        protected void configure(ServerBuilder sb) {
-            sb.service("/hello", (ctx, req) -> HttpResponse.of("world"));
-            sb.http(0);
-        }
-    };
+    @RegisterExtension
+    @Order(1)
+    static final ServerExtension helloServer = newHelloServer();
+
+    @RegisterExtension
+    @Order(2)
+    static final ServerExtension helloServer2 = newHelloServer();
+
+    private static ServerExtension newHelloServer() {
+        return new ServerExtension() {
+            @Override
+            protected void configure(ServerBuilder sb) {
+                sb.service("/hello", (ctx, req) -> HttpResponse.of("world"));
+                sb.http(0);
+            }
+        };
+    }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@it/xds-client/src/test/java/com/linecorp/armeria/xds/it/DeltaXdsPreprocessorTest.java` around lines 73 - 91, The two test ServerExtension instances helloServer and helloServer2 are identical; refactor by extracting a small factory/helper method (e.g., createHelloServer()) that returns a configured ServerExtension (overriding configure to add the "/hello" service and sb.http(0)); then replace both helloServer and helloServer2 declarations to call that factory so the duplicated configure implementations are removed and the intent is clearer.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In
`@it/xds-client/src/test/java/com/linecorp/armeria/xds/it/DeltaXdsResourceWatcherTest.java`:
- Around line 61-70: The SimpleCache instance is declared static and captured by
the static ServerExtension, so it is shared across test methods; make test state
per-instance by removing the static modifier from the cache field (change
"private static final SimpleCache<String> cache" to an instance field) and also
make the ServerExtension instance non-static (remove static from "static final
ServerExtension server") so the ServerExtension captures the per-test cache;
ensure V3DiscoveryServer is constructed from that instance cache (the
identifiers: cache, SimpleCache, server, ServerExtension, V3DiscoveryServer).
In `@xds/src/main/java/com/linecorp/armeria/xds/ResourceParser.java`:
- Around line 86-88: The code currently uses the response-wide systemVersionInfo
instead of each resource's own version, causing per-resource versions to be
lost; update the logic in ResourceParser.parse handling (around the
parse(unpackedMessage, version) call) to pass the resource's individual version
(resource.getVersion()) into parse/result rather than overwriting it with
systemVersionInfo when systemVersionInfo is present, so that the resulting
resourceUpdate and subsequent StateCoordinator.resourceVersions(...) retain
per-resource delta versions for correct delta resume semantics; ensure you only
fall back to systemVersionInfo where truly appropriate (not replacing each
Resource.getVersion()) and keep references to systemVersionInfo and
resource.getVersion() unchanged elsewhere.
In `@xds/src/main/java/com/linecorp/armeria/xds/SotwActualStream.java`:
- Around line 80-86: nackResponse currently always schedules a retry using
backoff.nextDelayMillis(ackBackoffAttempts) and may pass a negative delay to
eventLoop.schedule; compute long nextDelayMillis =
backoff.nextDelayMillis(ackBackoffAttempts) and if nextDelayMillis < 0 do not
schedule (instead invoke the existing terminal handling used elsewhere, e.g.
call retryOrClose or close the stream) otherwise call eventLoop.schedule(...,
nextDelayMillis, TimeUnit.MILLISECONDS); update nackResponse (and use
ackBackoffAttempts, backoff.nextDelayMillis, eventLoop.schedule,
sendDiscoveryRequest, retryOrClose/close logic) accordingly so no negative delay
is ever passed to the event loop.
In `@xds/src/main/java/com/linecorp/armeria/xds/StateCoordinator.java`:
- Around line 169-184: The current onAck and nextRevision logic treats an empty
version string as identical to the previous version and stops advancing revision
numbers, which collapses multiple delta ACKs that use "" as systemVersionInfo;
change both methods (onAck and nextRevision in StateCoordinator) to treat an
empty "" version as always advancing: i.e., only short-circuit when
prevVersion.version equals version AND version is non-empty, otherwise increment
the revision (or create a new VersionInfo) so each response with an empty
version still advances the per-type revision stored in versions (refer to
VersionInfo and the versions map keyed by XdsType).
In `@xds/src/main/java/com/linecorp/armeria/xds/XdsConverterUtil.java`:
- Around line 41-44: The checkArgument call in XdsConverterUtil is logging the
wrong variable; change the formatted argument from configSource to apiType so
the message reports the actual offending value (update the checkArgument
invocation that references apiType,
ApiType.GRPC/DELTA_GRPC/AGGREGATED_GRPC/AGGREGATED_DELTA_GRPC and currently
passes configSource to instead pass apiType).
In `@xds/src/test/java/com/linecorp/armeria/xds/DeltaXdsStreamTest.java`:
- Around line 213-259: The test currently only increments nackCount but never
causes DeltaActualStream to send a real NACK; replace the passive bookkeeping in
the RecordingLifecycleObserver override with a real rejection flow: when ackRef
is false, have the test trigger a server-side validation failure by pushing an
invalid snapshot (e.g., a resource with an invalid type/name similar to the SotW
test) via cache.setSnapshot so the stream will emit an outbound
DeltaDiscoveryRequest that contains a non-null error_detail; assert that an
outbound DeltaDiscoveryRequest (from AdsXdsStream/DeltaActualStream) has
error_detail set, then set ackRef true, push a valid snapshot with
cache.setSnapshot, and finally assert recovery by verifying
lifecycleObserver.responses() contains the expected clusterName. Ensure you
reference and modify ackRef, nackCount,
RecordingLifecycleObserver.responseReceived, cache.setSnapshot,
stateCoordinator.register, stream.start and the assertion that checks outbound
DeltaDiscoveryRequest.error_detail.
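The `StateCoordinator` item above (an empty version string should always advance the revision) can be sketched as below. This is an illustration under assumptions, not the PR's code: `RevisionSketch`, the nested `XdsType` enum, and the shape of `VersionInfo` are simplified stand-ins chosen for the example.

```java
import java.util.EnumMap;
import java.util.Map;

// Hypothetical, simplified model of per-type revision tracking.
final class RevisionSketch {
    enum XdsType { LISTENER, CLUSTER }

    static final class VersionInfo {
        final String version;
        final long revision;

        VersionInfo(String version, long revision) {
            this.version = version;
            this.revision = revision;
        }
    }

    private final Map<XdsType, VersionInfo> versions = new EnumMap<>(XdsType.class);

    // Returns the revision to use for a response carrying the given version.
    long nextRevision(XdsType type, String version) {
        VersionInfo prev = versions.get(type);
        // Short-circuit only for a repeated, non-empty version; an empty
        // version always advances so consecutive delta responses stay distinct.
        if (prev != null && !version.isEmpty() && prev.version.equals(version)) {
            return prev.revision;
        }
        long revision = prev == null ? 1 : prev.revision + 1;
        versions.put(type, new VersionInfo(version, revision));
        return revision;
    }
}
```

In this sketch two consecutive calls with `""` yield revisions 1 and 2, while repeating a non-empty version such as `"v1"` returns the same revision both times.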
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: a69dc8f7-bb4f-4ea7-bcbf-244c563492c0
📒 Files selected for processing (54)
- it/xds-client/src/test/java/com/linecorp/armeria/xds/it/ConfigSourceLifecycleObserverTest.java
- it/xds-client/src/test/java/com/linecorp/armeria/xds/it/DeltaXdsPreprocessorTest.java
- it/xds-client/src/test/java/com/linecorp/armeria/xds/it/DeltaXdsResourceWatcherTest.java
- it/xds-client/src/test/java/com/linecorp/armeria/xds/it/XdsControlPlaneMatrixTest.java
- testing-internal/src/main/java/com/linecorp/armeria/internal/testing/InternalTestingBlockHoundIntegration.java
- xds/src/main/java/com/linecorp/armeria/xds/AbstractXdsResource.java
- xds/src/main/java/com/linecorp/armeria/xds/AdsXdsStream.java
- xds/src/main/java/com/linecorp/armeria/xds/ClusterResourceParser.java
- xds/src/main/java/com/linecorp/armeria/xds/ClusterStream.java
- xds/src/main/java/com/linecorp/armeria/xds/ClusterXdsResource.java
- xds/src/main/java/com/linecorp/armeria/xds/CompositeXdsStream.java
- xds/src/main/java/com/linecorp/armeria/xds/ConfigSourceClient.java
- xds/src/main/java/com/linecorp/armeria/xds/ConfigSourceLifecycleObserver.java
- xds/src/main/java/com/linecorp/armeria/xds/DefaultConfigSourceLifecycleObserver.java
- xds/src/main/java/com/linecorp/armeria/xds/DefaultResponseHandler.java
- xds/src/main/java/com/linecorp/armeria/xds/DefaultSubscriptionContext.java
- xds/src/main/java/com/linecorp/armeria/xds/DeltaActualStream.java
- xds/src/main/java/com/linecorp/armeria/xds/DeltaDiscoveryStub.java
- xds/src/main/java/com/linecorp/armeria/xds/EndpointResourceParser.java
- xds/src/main/java/com/linecorp/armeria/xds/EndpointXdsResource.java
- xds/src/main/java/com/linecorp/armeria/xds/ListenerResourceParser.java
- xds/src/main/java/com/linecorp/armeria/xds/ListenerStream.java
- xds/src/main/java/com/linecorp/armeria/xds/ListenerXdsResource.java
- xds/src/main/java/com/linecorp/armeria/xds/ResourceNodeAdapter.java
- xds/src/main/java/com/linecorp/armeria/xds/ResourceNodeMeterBinder.java
- xds/src/main/java/com/linecorp/armeria/xds/ResourceNodeMeterBinderFactory.java
- xds/src/main/java/com/linecorp/armeria/xds/ResourceParser.java
- xds/src/main/java/com/linecorp/armeria/xds/ResourceStateStore.java
- xds/src/main/java/com/linecorp/armeria/xds/RouteResourceParser.java
- xds/src/main/java/com/linecorp/armeria/xds/RouteStream.java
- xds/src/main/java/com/linecorp/armeria/xds/RouteXdsResource.java
- xds/src/main/java/com/linecorp/armeria/xds/SecretResourceParser.java
- xds/src/main/java/com/linecorp/armeria/xds/SecretXdsResource.java
- xds/src/main/java/com/linecorp/armeria/xds/SnapshotStream.java
- xds/src/main/java/com/linecorp/armeria/xds/SotwActualStream.java
- xds/src/main/java/com/linecorp/armeria/xds/SotwXdsStream.java
- xds/src/main/java/com/linecorp/armeria/xds/StateCoordinator.java
- xds/src/main/java/com/linecorp/armeria/xds/SubscriberStorage.java
- xds/src/main/java/com/linecorp/armeria/xds/SubscriptionContext.java
- xds/src/main/java/com/linecorp/armeria/xds/SwitchMapEagerStream.java
- xds/src/main/java/com/linecorp/armeria/xds/TransportSocketStream.java
- xds/src/main/java/com/linecorp/armeria/xds/VirtualHostXdsResource.java
- xds/src/main/java/com/linecorp/armeria/xds/XdsConverterUtil.java
- xds/src/main/java/com/linecorp/armeria/xds/XdsResource.java
- xds/src/main/java/com/linecorp/armeria/xds/XdsResourceException.java
- xds/src/main/java/com/linecorp/armeria/xds/XdsResponseHandler.java
- xds/src/main/java/com/linecorp/armeria/xds/XdsStream.java
- xds/src/main/java/com/linecorp/armeria/xds/XdsStreamState.java
- xds/src/main/java/com/linecorp/armeria/xds/XdsStreamSubscriber.java
- xds/src/test/java/com/linecorp/armeria/xds/DeltaXdsStreamTest.java
- xds/src/test/java/com/linecorp/armeria/xds/SotwXdsStreamTest.java
- xds/src/test/java/com/linecorp/armeria/xds/StateCoordinatorTest.java
- xds/src/test/java/com/linecorp/armeria/xds/StreamSwitchMapEagerTest.java
- xds/src/test/java/com/linecorp/armeria/xds/SubscriberStorageTest.java
💤 Files with no reviewable changes (5)
- xds/src/main/java/com/linecorp/armeria/xds/XdsStreamState.java
- xds/src/main/java/com/linecorp/armeria/xds/DefaultResponseHandler.java
- xds/src/main/java/com/linecorp/armeria/xds/XdsResponseHandler.java
- xds/src/main/java/com/linecorp/armeria/xds/ResourceNodeMeterBinder.java
- xds/src/main/java/com/linecorp/armeria/xds/SotwXdsStream.java
Codecov Report
❌ Patch coverage is
Additional details and impacted files
@@ Coverage Diff @@
## main #6709 +/- ##
============================================
- Coverage 74.46% 73.77% -0.69%
- Complexity 22234 24173 +1939
============================================
Files 1963 2180 +217
Lines 82437 90853 +8416
Branches 10764 11896 +1132
============================================
+ Hits 61385 67028 +5643
- Misses 15918 18183 +2265
- Partials 5134 5642 +508
Actionable comments posted: 1
♻️ Duplicate comments (1)
xds/src/main/java/com/linecorp/armeria/xds/SotwActualStream.java (1)
83-89: ⚠️ Potential issue | 🟠 Major
Handle the terminal backoff case here too.
`AdsXdsStream.retryOrClose()` already stops when `backoff.nextDelayMillis(...)` goes negative, but `nackResponse()` still passes that value straight to `eventLoop.schedule(...)`. Once the retry budget is exhausted, this path can keep rescheduling instead of stopping.
Suggested fix
 void nackResponse(XdsType type, String nonce, String errorDetail) {
     ackBackoffAttempts++;
     noncesMap.put(type, nonce);
+    final long nextDelayMillis = backoff.nextDelayMillis(ackBackoffAttempts);
+    if (nextDelayMillis < 0) {
+        return;
+    }
     eventLoop.schedule(() -> sendDiscoveryRequest(type, lastAckedVersions.get(type),
                                                   stateCoordinator.interestedResources(type), nonce, errorDetail),
-                       backoff.nextDelayMillis(ackBackoffAttempts), TimeUnit.MILLISECONDS);
+                       nextDelayMillis, TimeUnit.MILLISECONDS);
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@xds/src/main/java/com/linecorp/armeria/xds/SotwActualStream.java` around lines 83 - 89, nackResponse currently unconditionally schedules a retry even if backoff.nextDelayMillis(...) returns a negative value; compute long delay = backoff.nextDelayMillis(ackBackoffAttempts) and if delay < 0, call the same terminal handling used by AdsXdsStream.retryOrClose() (i.e., do not call eventLoop.schedule and instead invoke the stream-close/retry-or-close path), otherwise call eventLoop.schedule(...) with that non-negative delay; reference nackResponse, backoff.nextDelayMillis, eventLoop.schedule and reuse the retryOrClose/terminal logic to stop rescheduling when the backoff budget is exhausted.
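The guard described in this comment can be exercised in isolation with a small sketch. Everything here is an assumption made for illustration: `NackBackoffSketch` is a hypothetical class, an `IntToLongFunction` stands in for the backoff (returning a negative delay once the budget is exhausted), and a list of recorded delays stands in for `eventLoop.schedule(...)`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntToLongFunction;

// Hypothetical model of a NACK retry path with a terminal backoff case.
final class NackBackoffSketch {
    // Maps attempts so far to a delay in ms; negative means "stop retrying".
    private final IntToLongFunction backoff;
    private int ackBackoffAttempts;
    // Stands in for tasks handed to eventLoop.schedule(...).
    final List<Long> scheduledDelays = new ArrayList<>();
    boolean closed;

    NackBackoffSketch(IntToLongFunction backoff) {
        this.backoff = backoff;
    }

    void nackResponse() {
        ackBackoffAttempts++;
        long nextDelayMillis = backoff.applyAsLong(ackBackoffAttempts);
        if (nextDelayMillis < 0) {
            // Terminal case: never pass a negative delay to the scheduler.
            closed = true;
            return;
        }
        scheduledDelays.add(nextDelayMillis);
    }

    // Budget of two retries, then the backoff reports exhaustion.
    static NackBackoffSketch demo() {
        NackBackoffSketch s =
                new NackBackoffSketch(attempt -> attempt <= 2 ? 100L * attempt : -1L);
        s.nackResponse();
        s.nackResponse();
        s.nackResponse();
        return s;
    }
}
```

Whether the terminal branch should simply return (as in the suggested diff) or route into the stream's close path (as the prompt text proposes) is a design decision for the PR; the sketch only marks the point where that decision applies.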
🧹 Nitpick comments (1)
it/xds-client/src/test/java/com/linecorp/armeria/xds/it/XdsControlPlaneMatrixTest.java (1)
85-86: Consider test isolation with static cache.
The `cache` and `version` are static and shared across all parameterized test cases. While each test resets the snapshot via `cache.setSnapshot(GROUP, emptySnapshot())` at the start, parallel test execution could cause interference. The `ServerExtension` is also static and captures this cache.
Since JUnit 5 runs parameterized tests sequentially by default, this should be safe, but adding `@Execution(ExecutionMode.SAME_THREAD)` would make the sequential execution requirement explicit and guard against future configuration changes.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@it/xds-client/src/test/java/com/linecorp/armeria/xds/it/XdsControlPlaneMatrixTest.java` around lines 85 - 86, The static shared state (version, cache and the static ServerExtension) can cause interference across parameterized test cases; add the JUnit parallel-execution safeguard by annotating the test class (XdsControlPlaneMatrixTest) with `@Execution`(ExecutionMode.SAME_THREAD) and import org.junit.jupiter.api.parallel.Execution and org.junit.jupiter.api.parallel.ExecutionMode so tests run sequentially on the same thread and avoid parallel interference with the static cache/version/ServerExtension.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@xds/src/main/java/com/linecorp/armeria/xds/SotwActualStream.java`:
- Around line 75-80: ackResponse() and nackResponse() are currently sharing the
same nonce/version bookkeeping (noncesMap, lastAckedVersions) so a delayed NACK
can be overwritten by ACK logic; create a separate per-XdsType pendingNack
structure (e.g., Map<XdsType, PendingNack> pendingNacks) to hold the rejected
nonce and ErrorInfo instead of writing into noncesMap/lastAckedVersions in
nackResponse(), update nackResponse() to set pendingNacks[type] and start the
backoff timer without mutating ack state (ackBackoffAttempts, noncesMap,
lastAckedVersions), and make sendDiscoveryRequest(...) and the backoff timer
read pendingNacks live (prefer the pendingNack for nonce/errorDetail if present,
otherwise use noncesMap/lastAckedVersions and null errorDetail) so a later
resourcesUpdated() or ackResponse() can supersede and clear pendingNacks safely.
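One way to picture the separation proposed in the inline comment above is the sketch below. It is a rough model under assumptions, not the PR's `SotwActualStream`: `PendingNackSketch`, the nested `XdsType` enum, and `describeNextRequest` (which returns a string instead of sending a request) are hypothetical, and version tracking and the backoff timer are left out.

```java
import java.util.EnumMap;
import java.util.Map;

// Hypothetical model: NACK state kept apart from ACK bookkeeping.
final class PendingNackSketch {
    enum XdsType { LISTENER, CLUSTER }

    static final class PendingNack {
        final String nonce;
        final String errorDetail;

        PendingNack(String nonce, String errorDetail) {
            this.nonce = nonce;
            this.errorDetail = errorDetail;
        }
    }

    private final Map<XdsType, String> noncesMap = new EnumMap<>(XdsType.class);
    private final Map<XdsType, PendingNack> pendingNacks = new EnumMap<>(XdsType.class);

    void ackResponse(XdsType type, String nonce) {
        noncesMap.put(type, nonce);
        // A newer ACK supersedes any rejection still waiting to be sent.
        pendingNacks.remove(type);
    }

    void nackResponse(XdsType type, String nonce, String errorDetail) {
        // Record the rejection without touching the ACK state.
        pendingNacks.put(type, new PendingNack(nonce, errorDetail));
    }

    // Read at send time, so a delayed timer always sees the latest state.
    String describeNextRequest(XdsType type) {
        PendingNack pending = pendingNacks.get(type);
        if (pending != null) {
            return "NACK nonce=" + pending.nonce + " error=" + pending.errorDetail;
        }
        return "ACK nonce=" + noncesMap.get(type);
    }
}
```

Because the request contents are resolved when the request is built rather than when the timer is armed, a later `ackResponse()` can clear the pending rejection and the delayed send picks up the ACK state instead.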
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: 18794cfa-33ce-40d8-a51c-329f3613413d
📒 Files selected for processing (12)
- it/xds-client/src/test/java/com/linecorp/armeria/xds/it/ConfigSourceLifecycleObserverTest.java
- it/xds-client/src/test/java/com/linecorp/armeria/xds/it/DeltaXdsResourceWatcherTest.java
- it/xds-client/src/test/java/com/linecorp/armeria/xds/it/XdsControlPlaneMatrixTest.java
- xds/src/main/java/com/linecorp/armeria/xds/ConfigSourceClient.java
- xds/src/main/java/com/linecorp/armeria/xds/DefaultConfigSourceLifecycleObserver.java
- xds/src/main/java/com/linecorp/armeria/xds/DefaultResponseHandler.java
- xds/src/main/java/com/linecorp/armeria/xds/DeltaActualStream.java
- xds/src/main/java/com/linecorp/armeria/xds/ListenerXdsResource.java
- xds/src/main/java/com/linecorp/armeria/xds/ResourceParser.java
- xds/src/main/java/com/linecorp/armeria/xds/ResourceStateStore.java
- xds/src/main/java/com/linecorp/armeria/xds/SotwActualStream.java
- xds/src/main/java/com/linecorp/armeria/xds/StateCoordinator.java
💤 Files with no reviewable changes (1)
- xds/src/main/java/com/linecorp/armeria/xds/DefaultResponseHandler.java
🚧 Files skipped from review as they are similar to previous changes (2)
- xds/src/main/java/com/linecorp/armeria/xds/ListenerXdsResource.java
- xds/src/main/java/com/linecorp/armeria/xds/ResourceStateStore.java
Actionable comments posted: 1
🧹 Nitpick comments (2)
xds/src/main/java/com/linecorp/armeria/xds/StateCoordinator.java (1)
28-31: Consider exposing `FULL_SOTW_TYPES` for reuse in `SotwActualStream`.
`FULL_SOTW_TYPES` is defined here as `EnumSet.of(XdsType.LISTENER, XdsType.CLUSTER)`, but `SotwActualStream` (lines 198-199) hardcodes the same check: `resourceParser.type() == XdsType.LISTENER || resourceParser.type() == XdsType.CLUSTER`. This creates a divergence risk: if these types change, both locations must be updated. Consider making `FULL_SOTW_TYPES` package-visible or providing a helper method that `SotwActualStream` can use.
♻️ Suggested approach
- private static final Set<XdsType> FULL_SOTW_TYPES = EnumSet.of(XdsType.LISTENER, XdsType.CLUSTER);
+ static final Set<XdsType> FULL_SOTW_TYPES = EnumSet.of(XdsType.LISTENER, XdsType.CLUSTER);
Then in `SotwActualStream`:
- if (fullStateOfTheWorld &&
-     (resourceParser.type() == XdsType.LISTENER || resourceParser.type() == XdsType.CLUSTER)) {
+ if (fullStateOfTheWorld && StateCoordinator.FULL_SOTW_TYPES.contains(resourceParser.type())) {
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@xds/src/main/java/com/linecorp/armeria/xds/StateCoordinator.java` around lines 28 - 31, Expose or provide access to the FULL_SOTW_TYPES set so SotwActualStream can reuse it instead of duplicating the check; locate FULL_SOTW_TYPES in StateCoordinator and make it package-visible (remove private) or add a package-visible helper (e.g., isFullSotwType(XdsType)) and then replace the hardcoded check in SotwActualStream (the resourceParser.type() == XdsType.LISTENER || resourceParser.type() == XdsType.CLUSTER) with a call to the shared set or helper to avoid divergence.
xds/src/main/java/com/linecorp/armeria/xds/DeltaActualStream.java (1)
59-64: Remove unused field `previouslySentResources`.
The `previouslySentResources` map is declared but never read from or written to. This appears to be leftover from a previous implementation approach.
♻️ Suggested fix
 private final ArrayDeque<PendingAck> ackQueue = new ArrayDeque<>();
 private final EnumSet<XdsType> pendingUpdates = EnumSet.noneOf(XdsType.class);
- // Tracks what was last sent per type on this stream; reset on each new ActualStream.
- private final Map<XdsType, Set<String>> previouslySentResources = new EnumMap<>(XdsType.class);
 // Types for which initial_resource_versions has already been sent on this stream.
 private final Set<XdsType> initialVersionsSent = EnumSet.noneOf(XdsType.class);
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@xds/src/main/java/com/linecorp/armeria/xds/DeltaActualStream.java` around lines 59 - 64, Remove the dead field previouslySentResources from class DeltaActualStream: delete the declaration "private final Map<XdsType, Set<String>> previouslySentResources = new EnumMap<>(XdsType.class);" and any now-unused imports or references related only to that field (e.g., EnumMap) so the class no longer contains an unused member; leave other fields (initialVersionsSent, completed, draining) unchanged.
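For the `FULL_SOTW_TYPES` nitpick, the helper-method variant mentioned in the prompt (`isFullSotwType(XdsType)`) might look like the sketch below. The enclosing class `FullSotwTypesSketch` and the trimmed-down `XdsType` enum are assumptions for the example; in the PR the constant lives in `StateCoordinator`.

```java
import java.util.Collections;
import java.util.EnumSet;
import java.util.Set;

// Hypothetical holder for the shared "full state-of-the-world" type set.
final class FullSotwTypesSketch {
    // Reduced enum for illustration only.
    enum XdsType { LISTENER, ROUTE, CLUSTER, ENDPOINT }

    // Package-visible and read-only, so callers can share but not mutate it.
    static final Set<XdsType> FULL_SOTW_TYPES =
            Collections.unmodifiableSet(EnumSet.of(XdsType.LISTENER, XdsType.CLUSTER));

    // Single place that answers "is this type handled as full SotW?".
    static boolean isFullSotwType(XdsType type) {
        return FULL_SOTW_TYPES.contains(type);
    }

    private FullSotwTypesSketch() {}
}
```

Wrapping the set as unmodifiable is an extra precaution not present in the suggested diff; it matters only because a bare `EnumSet` exposed outside the class could otherwise be modified by a caller.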
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@xds/src/test/java/com/linecorp/armeria/xds/XdsClientIntegrationTest.java`:
- Around line 229-235: The test currently calls watcher.blockingChanged(...)
inside await().untilAsserted(...), which destructively consumes the watcher
queue and causes flaky retries; change the test to avoid consuming events during
retries by removing watcher.blockingChanged from inside untilAsserted and
instead read a non-destructive snapshot (e.g., call cache.getSnapshot(GROUP) or
use a non-consuming peek method if available) inside the untilAsserted
assertions: use cache.getSnapshot(GROUP).clusters().resources().get("cluster1")
and assert on the returned ClusterSnapshot/version fields (or call
watcher.blockingChanged once before entering untilAsserted if you must consume
exactly one event), ensuring watcher.blockingChanged is not invoked on every
retry.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: 81c03400-dd75-4880-bbc5-39b5ccd520af
📒 Files selected for processing (10)
- xds/src/main/java/com/linecorp/armeria/xds/AdsXdsStream.java
- xds/src/main/java/com/linecorp/armeria/xds/ConfigSourceClient.java
- xds/src/main/java/com/linecorp/armeria/xds/DeltaActualStream.java
- xds/src/main/java/com/linecorp/armeria/xds/ResourceStateStore.java
- xds/src/main/java/com/linecorp/armeria/xds/SotwActualStream.java
- xds/src/main/java/com/linecorp/armeria/xds/StateCoordinator.java
- xds/src/main/java/com/linecorp/armeria/xds/SubscriberStorage.java
- xds/src/main/java/com/linecorp/armeria/xds/XdsConverterUtil.java
- xds/src/test/java/com/linecorp/armeria/xds/SotwXdsStreamTest.java
- xds/src/test/java/com/linecorp/armeria/xds/XdsClientIntegrationTest.java
💤 Files with no reviewable changes (1)
- xds/src/test/java/com/linecorp/armeria/xds/SotwXdsStreamTest.java
🚧 Files skipped from review as they are similar to previous changes (1)
- xds/src/main/java/com/linecorp/armeria/xds/XdsConverterUtil.java
Actionable comments posted: 3
🧹 Nitpick comments (1)
xds/src/main/java/com/linecorp/armeria/xds/ResourceStateStore.java (1)
60-77: Consider documenting the `instanceof AbstractXdsResource` check.
The conditional `withRevision` call on lines 67-68 means that if a non-`AbstractXdsResource` is ever passed, the stored `ResourceState.revision` won't be reflected in `resource.revision()`. Based on the codebase, all current `XdsResource` implementations extend `AbstractXdsResource`, so this is effectively dead code, but it could cause subtle inconsistencies if a non-`AbstractXdsResource` implementation is added later.
If this is intentional defensive coding, a brief comment would help future maintainers understand the design choice.
💡 Optional: Add a clarifying comment
  final long revision = prev != null ? prev.revision + 1 : 1;
+ // All current XdsResource implementations extend AbstractXdsResource.
+ // This check is defensive for potential future implementations.
  final XdsResource revised = resource instanceof AbstractXdsResource
          ? ((AbstractXdsResource) resource).withRevision(revision)
          : resource;
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@xds/src/main/java/com/linecorp/armeria/xds/ResourceStateStore.java` around lines 60 - 77, The instanceof AbstractXdsResource branch in putVersioned can produce a stored ResourceState.revision that isn't reflected by resource.revision() for non-AbstractXdsResource instances; add a concise comment above the conditional in putVersioned explaining that currently all XdsResource implementations extend AbstractXdsResource so withRevision(...) is applied, that this branch is defensive to avoid casting non-conforming implementations, and note the potential inconsistency risk if a non-AbstractXdsResource is introduced (referencing AbstractXdsResource, XdsResource, withRevision, and ResourceState.revision) so future maintainers understand the rationale.
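The revision bookkeeping discussed in this comment can be illustrated with a minimal sketch. It is not the PR's `ResourceStateStore`: `RevisionStoreSketch`, `Entry`, and the `String` payload are hypothetical stand-ins, and the only behavior taken from the review is "first write gets revision 1, later writes increment the previous revision". The sketch always rebuilds the stored value with the new revision, so the stored revision and the returned value's revision cannot disagree.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for a versioned resource store; not Armeria's actual class. */
final class RevisionStoreSketch {

    /** Immutable value holding a payload and the revision assigned by the store. */
    static final class Entry {
        final String payload;
        final long revision;

        Entry(String payload, long revision) {
            this.payload = payload;
            this.revision = revision;
        }
    }

    private final Map<String, Entry> entries = new HashMap<>();

    /** Stores the payload under the given name and returns the entry carrying its new revision. */
    Entry putVersioned(String name, String payload) {
        final Entry prev = entries.get(name);
        // The first write gets revision 1; each later write increments the previous revision.
        final long revision = prev != null ? prev.revision + 1 : 1;
        // Always rebuild the value with the assigned revision so that the stored
        // revision and the revision visible to callers are the same number.
        final Entry revised = new Entry(payload, revision);
        entries.put(name, revised);
        return revised;
    }
}
```

Revisions in this sketch are tracked per resource name, so updating one resource does not advance the revision of another.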
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In
`@it/xds-client/src/test/java/com/linecorp/armeria/xds/it/XdsControlPlaneErrorHandlingTest.java`:
- Around line 89-94: The NackTracker is declared as a static shared counter
(nackTracker / NackTracker) so delayed NACKs from previous parameterized runs
can leak into later runs; make the tracker instance-local to each test run
instead: replace the static nackTracker with a per-test NackTracker created in
the test setup (or in the parameterized test method) and ensure the
server/stream is configured to use that per-run instance, then change assertions
to call the per-run tracker’s nackCount() and compare to the per-run nacksBefore
so each parameterized invocation observes only its own NACKs.
In `@xds/src/main/java/com/linecorp/armeria/xds/ConfigSourceClient.java`:
- Around line 76-84: The constructor of ConfigSourceClient currently infers
isDelta/isAds for gRPC variants but does not validate ApiType, allowing
unsupported values (e.g., REST or UNSET) to fall through; call
XdsConverterUtil.validateConfigSource(...) early in the ConfigSourceClient
constructor (or validate apiType explicitly there) using the
apiConfigSource/apiType values and throw an IllegalArgumentException on
unsupported ApiType, so creation fails fast instead of defaulting to a SotW gRPC
stream; ensure the validation happens before using apiType to compute
isDelta/isAds.
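A fail-fast selection along the lines of this comment might look like the sketch below. The `ApiType` enum here is a local, simplified stand-in rather than Envoy's generated `ApiConfigSource.ApiType`, and `ApiTypeSelectionSketch`, `StreamMode`, and `select` are hypothetical names; the actual `ConfigSourceClient` constructor may be structured differently.

```java
/** Hypothetical sketch; the enum below is a local stand-in, not Envoy's generated type. */
final class ApiTypeSelectionSketch {

    enum ApiType { UNSET, REST, GRPC, DELTA_GRPC, AGGREGATED_GRPC, AGGREGATED_DELTA_GRPC }

    /** The two flags inferred from a supported api_type. */
    static final class StreamMode {
        final boolean isAds;
        final boolean isDelta;

        StreamMode(boolean isAds, boolean isDelta) {
            this.isAds = isAds;
            this.isDelta = isDelta;
        }
    }

    /** Maps a supported api_type to its flags and rejects everything else up front. */
    static StreamMode select(ApiType apiType) {
        switch (apiType) {
            case GRPC:
                return new StreamMode(false, false);
            case DELTA_GRPC:
                return new StreamMode(false, true);
            case AGGREGATED_GRPC:
                return new StreamMode(true, false);
            case AGGREGATED_DELTA_GRPC:
                return new StreamMode(true, true);
            default:
                // REST, UNSET, or any future value must not silently become a SotW gRPC stream.
                throw new IllegalArgumentException("Unsupported api_type: " + apiType);
        }
    }
}
```

Enumerating the supported values explicitly means that an unrecognized type reaches the `default` branch and fails at construction time.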
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: cbc79f7e-9825-4a2c-b8bc-9eedc7f6a7f1
📒 Files selected for processing (7)
- it/xds-client/src/test/java/com/linecorp/armeria/xds/it/XdsControlPlaneErrorHandlingTest.java
- xds/src/main/java/com/linecorp/armeria/xds/ConfigSourceClient.java
- xds/src/main/java/com/linecorp/armeria/xds/DeltaActualStream.java
- xds/src/main/java/com/linecorp/armeria/xds/ResourceStateStore.java
- xds/src/main/java/com/linecorp/armeria/xds/StateCoordinator.java
- xds/src/main/java/com/linecorp/armeria/xds/SubscriberStorage.java
- xds/src/test/java/com/linecorp/armeria/xds/SubscriberStorageTest.java
🚧 Files skipped from review as they are similar to previous changes (2)
- xds/src/test/java/com/linecorp/armeria/xds/SubscriberStorageTest.java
- xds/src/main/java/com/linecorp/armeria/xds/StateCoordinator.java
Actionable comments posted: 1
♻️ Duplicate comments (3)
it/xds-client/src/test/java/com/linecorp/armeria/xds/it/XdsControlPlaneErrorHandlingTest.java (1)
89-104: ⚠️ Potential issue | 🟠 Major
Isolate NACK accounting per parameterized invocation.
`nackTracker` is static and shared across the whole matrix, but NACKs arrive asynchronously after backoff. A late NACK from the previous case can satisfy the next case's `nackCount() > nacksBefore` check without the current malformed snapshot ever being rejected.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@it/xds-client/src/test/java/com/linecorp/armeria/xds/it/XdsControlPlaneErrorHandlingTest.java` around lines 89 - 104, The NackTracker is declared static and shared across all parameterized runs, causing late asynchronous NACKs to leak between test cases; change nackTracker to be per-test-instance (remove static) or reinitialize/reset it before each parameterized invocation so each test uses a fresh NackTracker; locate the static field named nackTracker and either make it an instance field used when constructing V3DiscoveryServer or add a `@BeforeEach` method that calls nackTracker.reset()/new NackTracker() so the NACK accounting is isolated for each test run.
xds/src/main/java/com/linecorp/armeria/xds/SotwActualStream.java (1)
81-86: ⚠️ Potential issue | 🟠 Major
Keep delayed NACK state separate from ACK bookkeeping.
`nackResponse()` writes the rejected nonce into `noncesMap` immediately. If `resourcesUpdated()` fires during the 3s delay, it sends a normal request with that nonce and no `error_detail`, which effectively ACKs the response we just NACKed.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@xds/src/main/java/com/linecorp/armeria/xds/SotwActualStream.java` around lines 81 - 86, nackResponse currently writes the rejected nonce into noncesMap immediately, which allows resourcesUpdated to ACK it before the delayed NACK is sent; instead, keep delayed NACK state separate by storing the nonce+errorDetail in a separate pendingNacks structure (e.g., pendingNacks.put(type, (nonce,errorDetail))) and do NOT mutate noncesMap or lastAckedVersions until the scheduled task runs; change nackResponse to only schedule a runnable that, when executed, moves the entry from pendingNacks into noncesMap and calls sendDiscoveryRequest(type, lastAckedVersions.get(type), stateCoordinator.interestedResources(type), nonce, errorDetail); ensure resourcesUpdated continues to consult noncesMap for ACK logic so it will not ACK the pending NACK prematurely.
xds/src/main/java/com/linecorp/armeria/xds/DeltaActualStream.java (1)
158-167: ⚠️ Potential issue | 🟠 Major
Diff delta subscriptions against the last sent subscription set.
After the first request, `previous` comes from `stateCoordinator.resourceVersions(type).keySet()`, which only tracks delivered versions. If a watcher unsubscribes before the first response, or later resubscribes after a successful unsubscribe, the control plane never sees the matching subscribe/unsubscribe delta and the stream state drifts.
Suggested direction
+ private final Map<XdsType, ImmutableSet<String>> subscribedResources =
+         new EnumMap<>(XdsType.class);
  ...
  if (isFirstOnStream) {
      subscribe = current;
      unsubscribe = ImmutableSet.of();
  } else {
-     final Set<String> previous = stateCoordinator.resourceVersions(type).keySet();
+     final Set<String> previous = subscribedResources.getOrDefault(type, ImmutableSet.of());
      subscribe = new HashSet<>(current);
      subscribe.removeAll(previous);
      unsubscribe = new HashSet<>(previous);
      unsubscribe.removeAll(current);
  }
  ...
  final DeltaDiscoveryRequest request = builder.build();
  lifecycleObserver.requestSent(request);
  requestObserver.onNext(request);
+ subscribedResources.put(type, ImmutableSet.copyOf(current));
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@xds/src/main/java/com/linecorp/armeria/xds/DeltaActualStream.java` around lines 158 - 167, Delta computation currently diffs against stateCoordinator.resourceVersions(type).keySet(), which reflects delivered resource versions and can miss subscribe/unsubscribe transitions; instead diff against the last subscription set that this stream actually sent for the type. Add or use a per-type "lastSentSubscriptions" map in DeltaActualStream (keyed by type) and compute previous = lastSentSubscriptions.getOrDefault(type, ImmutableSet.of()); keep the existing isFirstOnStream branch to initialize subscribe/unsubscribe, otherwise compute subscribe = current - previous and unsubscribe = previous - current; after sending the delta update lastSentSubscriptions.put(type, current) so future diffs are against the last sent subscription set.
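The suggested diffing against the last sent subscription set can be sketched in isolation as follows. This is a simplified illustration, not the PR's `DeltaActualStream`: `SubscriptionDiffSketch`, `Diff`, and `next` are hypothetical names, resource types are plain strings, and the special handling of the first request on a stream is omitted because an empty "previous" set already yields "subscribe to everything".

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch of per-type subscription diffing; not Armeria's actual class. */
final class SubscriptionDiffSketch {

    /** Names to add to and remove from the subscription in the next delta request. */
    static final class Diff {
        final Set<String> subscribe;
        final Set<String> unsubscribe;

        Diff(Set<String> subscribe, Set<String> unsubscribe) {
            this.subscribe = subscribe;
            this.unsubscribe = unsubscribe;
        }
    }

    // What this stream last told the server it is subscribed to, keyed by resource type.
    private final Map<String, Set<String>> lastSent = new HashMap<>();

    /** Computes the delta against the last sent set, then records the new set. */
    Diff next(String type, Set<String> current) {
        final Set<String> previous = lastSent.getOrDefault(type, Collections.emptySet());
        final Set<String> subscribe = new HashSet<>(current);
        subscribe.removeAll(previous);      // current - previous
        final Set<String> unsubscribe = new HashSet<>(previous);
        unsubscribe.removeAll(current);     // previous - current
        // Record a defensive copy so later mutation of 'current' cannot skew future diffs.
        lastSent.put(type, new HashSet<>(current));
        return new Diff(subscribe, unsubscribe);
    }
}
```

Because the baseline is what was sent rather than what was delivered, a resubscribe after an unsubscribe produces a subscribe entry even if the server never delivered a version for that name.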
🧹 Nitpick comments (1)
xds/src/main/java/com/linecorp/armeria/xds/ListenerXdsResource.java (1)
55-57: Consider adding null check for `listener` parameter.
Per coding guidelines, public/user-facing methods should add explicit null checks using `Objects.requireNonNull(obj, "paramName")`. While this constructor is package-private, consistency with the project's defensive coding style would be beneficial.
Suggested fix
+ import static java.util.Objects.requireNonNull;
+
  ListenerXdsResource(Listener listener, String version) {
-     this(listener, version, 0);
+     this(requireNonNull(listener, "listener"), version, 0);
  }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@xds/src/main/java/com/linecorp/armeria/xds/ListenerXdsResource.java` around lines 55 - 57, Add an explicit null check for the constructor parameter by calling Objects.requireNonNull(listener, "listener") at the start of the ListenerXdsResource(Listener listener, String version) constructor (or delegate to the three-arg constructor after the check). This ensures the listener parameter is validated and follows the project's defensive style; reference the ListenerXdsResource constructor and the listener parameter when making the change.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@xds/src/main/java/com/linecorp/armeria/xds/AdsXdsStream.java`:
- Around line 136-137: The call in AdsXdsStream that schedules a reconnect uses
Math.max(nextDelayMillis, 1_000L) which converts a negative sentinel from
backoff.nextDelayMillis(connBackoffAttempts) into a 1s retry and breaks backoff
exhaustion; change the logic to check nextDelayMillis returned by
backoff.nextDelayMillis(connBackoffAttempts) and if it is negative, do not
schedule eventLoop.schedule(this::reset, ...), instead treat it as exhaustion
(e.g., return/stop retrying or transition to terminal state and log), otherwise
schedule with the positive nextDelayMillis as-is (or Math.max(nextDelayMillis,
1L) to avoid zero).
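The difference between clamping and honoring the negative sentinel can be shown with a small sketch. `ReconnectDelaySketch` and `reconnectDelayMillis` are hypothetical names; the only assumption taken from the comment is that a negative value from the backoff means "stop retrying". How the stream should react to exhaustion (log, terminal state) is left to the caller here.

```java
/** Hypothetical helper illustrating the sentinel check; not part of the PR. */
final class ReconnectDelaySketch {

    /**
     * Returns the delay to schedule the next reconnect with, or -1 when the
     * backoff reported exhaustion and no reconnect should be scheduled.
     */
    static long reconnectDelayMillis(long nextDelayMillis) {
        if (nextDelayMillis < 0) {
            // A negative delay is the "give up" signal; clamping it would retry forever.
            return -1;
        }
        // Avoid scheduling with a zero delay, but otherwise keep the backoff's value as-is.
        return Math.max(nextDelayMillis, 1L);
    }
}
```

For comparison, `Math.max(-1L, 1_000L)` evaluates to `1000`, which is how the clamp turns an exhausted backoff into a one-second retry.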
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: 2f161e22-80e2-48e0-a21f-ce50cea29636
📒 Files selected for processing (42)
- it/xds-client/src/test/java/com/linecorp/armeria/xds/it/ConfigSourceLifecycleObserverTest.java
- it/xds-client/src/test/java/com/linecorp/armeria/xds/it/DeltaXdsPreprocessorTest.java
- it/xds-client/src/test/java/com/linecorp/armeria/xds/it/DeltaXdsResourceWatcherTest.java
- it/xds-client/src/test/java/com/linecorp/armeria/xds/it/XdsControlPlaneErrorHandlingTest.java
- it/xds-client/src/test/java/com/linecorp/armeria/xds/it/XdsControlPlaneMatrixTest.java
- testing-internal/src/main/java/com/linecorp/armeria/internal/testing/InternalTestingBlockHoundIntegration.java
- xds/src/main/java/com/linecorp/armeria/xds/AbstractXdsResource.java
- xds/src/main/java/com/linecorp/armeria/xds/AdsXdsStream.java
- xds/src/main/java/com/linecorp/armeria/xds/ClusterResourceParser.java
- xds/src/main/java/com/linecorp/armeria/xds/ClusterXdsResource.java
- xds/src/main/java/com/linecorp/armeria/xds/CompositeXdsStream.java
- xds/src/main/java/com/linecorp/armeria/xds/ConfigSourceClient.java
- xds/src/main/java/com/linecorp/armeria/xds/ConfigSourceLifecycleObserver.java
- xds/src/main/java/com/linecorp/armeria/xds/DefaultConfigSourceLifecycleObserver.java
- xds/src/main/java/com/linecorp/armeria/xds/DefaultResponseHandler.java
- xds/src/main/java/com/linecorp/armeria/xds/DeltaActualStream.java
- xds/src/main/java/com/linecorp/armeria/xds/DeltaDiscoveryStub.java
- xds/src/main/java/com/linecorp/armeria/xds/EndpointResourceParser.java
- xds/src/main/java/com/linecorp/armeria/xds/EndpointXdsResource.java
- xds/src/main/java/com/linecorp/armeria/xds/ListenerResourceParser.java
- xds/src/main/java/com/linecorp/armeria/xds/ListenerXdsResource.java
- xds/src/main/java/com/linecorp/armeria/xds/ResourceParser.java
- xds/src/main/java/com/linecorp/armeria/xds/ResourceStateStore.java
- xds/src/main/java/com/linecorp/armeria/xds/RouteResourceParser.java
- xds/src/main/java/com/linecorp/armeria/xds/RouteXdsResource.java
- xds/src/main/java/com/linecorp/armeria/xds/SecretResourceParser.java
- xds/src/main/java/com/linecorp/armeria/xds/SecretXdsResource.java
- xds/src/main/java/com/linecorp/armeria/xds/SotwActualStream.java
- xds/src/main/java/com/linecorp/armeria/xds/SotwXdsStream.java
- xds/src/main/java/com/linecorp/armeria/xds/StateCoordinator.java
- xds/src/main/java/com/linecorp/armeria/xds/SubscriberStorage.java
- xds/src/main/java/com/linecorp/armeria/xds/VirtualHostXdsResource.java
- xds/src/main/java/com/linecorp/armeria/xds/XdsConverterUtil.java
- xds/src/main/java/com/linecorp/armeria/xds/XdsResource.java
- xds/src/main/java/com/linecorp/armeria/xds/XdsResourceException.java
- xds/src/main/java/com/linecorp/armeria/xds/XdsResponseHandler.java
- xds/src/main/java/com/linecorp/armeria/xds/XdsStreamState.java
- xds/src/main/java/com/linecorp/armeria/xds/XdsStreamSubscriber.java
- xds/src/test/java/com/linecorp/armeria/xds/SotwXdsStreamTest.java
- xds/src/test/java/com/linecorp/armeria/xds/StateCoordinatorTest.java
- xds/src/test/java/com/linecorp/armeria/xds/SubscriberStorageTest.java
- xds/src/test/java/com/linecorp/armeria/xds/XdsClientIntegrationTest.java
💤 Files with no reviewable changes (5)
- xds/src/main/java/com/linecorp/armeria/xds/XdsStreamState.java
- xds/src/main/java/com/linecorp/armeria/xds/XdsResponseHandler.java
- xds/src/main/java/com/linecorp/armeria/xds/SotwXdsStream.java
- xds/src/test/java/com/linecorp/armeria/xds/SotwXdsStreamTest.java
- xds/src/main/java/com/linecorp/armeria/xds/DefaultResponseHandler.java
✅ Files skipped from review due to trivial changes (4)
- testing-internal/src/main/java/com/linecorp/armeria/internal/testing/InternalTestingBlockHoundIntegration.java
- xds/src/main/java/com/linecorp/armeria/xds/XdsResource.java
- xds/src/main/java/com/linecorp/armeria/xds/XdsResourceException.java
- xds/src/test/java/com/linecorp/armeria/xds/StateCoordinatorTest.java
🚧 Files skipped from review as they are similar to previous changes (19)
- xds/src/main/java/com/linecorp/armeria/xds/XdsConverterUtil.java
- xds/src/main/java/com/linecorp/armeria/xds/SecretResourceParser.java
- xds/src/main/java/com/linecorp/armeria/xds/EndpointResourceParser.java
- xds/src/main/java/com/linecorp/armeria/xds/ClusterResourceParser.java
- xds/src/main/java/com/linecorp/armeria/xds/ListenerResourceParser.java
- xds/src/main/java/com/linecorp/armeria/xds/VirtualHostXdsResource.java
- xds/src/main/java/com/linecorp/armeria/xds/CompositeXdsStream.java
- xds/src/main/java/com/linecorp/armeria/xds/DeltaDiscoveryStub.java
- xds/src/main/java/com/linecorp/armeria/xds/EndpointXdsResource.java
- xds/src/test/java/com/linecorp/armeria/xds/SubscriberStorageTest.java
- xds/src/main/java/com/linecorp/armeria/xds/RouteXdsResource.java
- xds/src/main/java/com/linecorp/armeria/xds/SubscriberStorage.java
- it/xds-client/src/test/java/com/linecorp/armeria/xds/it/DeltaXdsPreprocessorTest.java
- xds/src/main/java/com/linecorp/armeria/xds/ClusterXdsResource.java
- xds/src/main/java/com/linecorp/armeria/xds/SecretXdsResource.java
- it/xds-client/src/test/java/com/linecorp/armeria/xds/it/DeltaXdsResourceWatcherTest.java
- xds/src/main/java/com/linecorp/armeria/xds/ResourceParser.java
- xds/src/main/java/com/linecorp/armeria/xds/DefaultConfigSourceLifecycleObserver.java
- it/xds-client/src/test/java/com/linecorp/armeria/xds/it/ConfigSourceLifecycleObserverTest.java
| backoff, eventLoop, stateCoordinator, lifecycleObserver, | ||
| XdsType.discoverableTypes()); | ||
| } | ||
| } else if (isDelta) { |
nit) Should we use the else statement so these branches stay at the same level for readability?
    if (isAds) {
        if (isDelta) {
            ...
        } else {
            ...
        }
    } else {
        if (isDelta) {
            ...
        } else {
            ...
        }
    }
| return new ResourceState(ResourceStatus.WAITING_FOR_SERVER, null, 0); | ||
| } | ||
|
|
||
| private static ResourceState absent() { | ||
| return new ResourceState(ResourceStatus.ABSENT, null, 0); |
Question) Should we cache them and reuse?
I've decided to remove the absent and waiting states altogether for simplicity.
Now, SubscriberStorage is only affected by subscription/unsubscription, and ResourceStateStore is only affected by discovery responses.
| @Override | ||
| public void resourceUpdated(XdsType type, DeltaDiscoveryResponse response, | ||
| Map<String, Object> updatedResources) { | ||
| if (!updatedResources.isEmpty()) { |
Can we also log when it's empty?
This PR should be reviewed after #6707 is merged
Subset of #6700
Motivation
Control planes such as Istiod use the incremental (delta) xDS protocol (`DELTA_GRPC`/`AGGREGATED_DELTA_GRPC`) by default.
Additionally, the delta protocol is better defined and suits Armeria better in that wildcard queries are not necessary for SOTW types (Listener/Cluster).
Modifications
- Add `DeltaActualStream` implementing `DeltaDiscoveryRequest`/`DeltaDiscoveryResponse` wire handling with ACK queueing, subscription diffs and removed-resource signalling.
- Split `SotwXdsStream` into a two-layer design:
  - `AdsXdsStream`, `CompositeXdsStream`: own the connection lifecycle and retry/reconnect logic; delegate wire I/O to an `ActualStream` (`SotwActualStream` or `DeltaActualStream`).
  - `SotwActualStream` is extracted from the old `SotwXdsStream`.
- Add `StateCoordinator` to centralize resource version/revision/subscription tracking.
  - `ResourceStateStore` is responsible for tracking resource states, which represent what the server thinks the client has. Each resource has a state (WAITING / VERSIONED / ABSENT).
  - `SubscriberStorage` is equivalent to `watcher_map` in upstream and manages resource interest.
  - `ResourceStateStore` is now responsible for revision management.
- Update `ConfigSourceClient` to select SotW vs. delta stream based on `api_type`.
- Update `SubscriberStorage` to treat `initial_fetch_timeout: 0s` as "no timeout", matching Envoy's documented behavior.
Result
- Users can now specify `api_type: DELTA_GRPC` or `api_type: AGGREGATED_DELTA_GRPC` to communicate with control planes that prefer the incremental protocol, including Istiod.