diff --git a/core/src/main/java/com/linecorp/armeria/client/observation/ObservationClient.java b/core/src/main/java/com/linecorp/armeria/client/observation/ObservationClient.java index 3d4bcb30f93..21da214ee71 100644 --- a/core/src/main/java/com/linecorp/armeria/client/observation/ObservationClient.java +++ b/core/src/main/java/com/linecorp/armeria/client/observation/ObservationClient.java @@ -29,7 +29,6 @@ import com.linecorp.armeria.common.RequestHeadersBuilder; import com.linecorp.armeria.common.annotation.Nullable; import com.linecorp.armeria.common.annotation.UnstableApi; -import com.linecorp.armeria.common.logging.RequestLogProperty; import com.linecorp.armeria.internal.common.RequestContextExtension; import com.linecorp.armeria.server.observation.ObservationService; @@ -138,24 +137,26 @@ public HttpResponse execute(ClientRequestContext ctx, HttpRequest req) throws Ex private static void enrichObservation(ClientRequestContext ctx, ClientObservationContext clientObservationContext, Observation observation) { - ctx.log() - .whenAvailable(RequestLogProperty.REQUEST_FIRST_BYTES_TRANSFERRED_TIME) - .thenAccept(requestLog -> observation.event(Events.WIRE_SEND)); - - ctx.log() - .whenAvailable(RequestLogProperty.RESPONSE_FIRST_BYTES_TRANSFERRED_TIME) - .thenAccept(requestLog -> { - if (requestLog.responseFirstBytesTransferredTimeNanos() != null) { - observation.event(Events.WIRE_RECEIVE); - } - }); - - ctx.log().whenComplete() - .thenAccept(requestLog -> { - // TODO: ClientConnectionTimings - there is no way to record events - // with a specific timestamp for an observation - clientObservationContext.setResponse(requestLog); - observation.stop(); - }); + ctx.log().addListener((property, log) -> { + switch (property) { + case REQUEST_FIRST_BYTES_TRANSFERRED_TIME: + observation.event(Events.WIRE_SEND); + break; + case RESPONSE_FIRST_BYTES_TRANSFERRED_TIME: + if (log.responseFirstBytesTransferredTimeNanos() != null) { + observation.event(Events.WIRE_RECEIVE); + } + break; + case ALL_COMPLETE: + // TODO: ClientConnectionTimings - there is no way to record events + // with a specific timestamp for an observation + clientObservationContext.setResponse(log); + observation.stop(); + break; + default: + // Do nothing. + break; + } + }); } } diff --git a/core/src/main/java/com/linecorp/armeria/common/logging/DefaultRequestLog.java b/core/src/main/java/com/linecorp/armeria/common/logging/DefaultRequestLog.java index b4a9185bbf3..dac13587ebc 100644 --- a/core/src/main/java/com/linecorp/armeria/common/logging/DefaultRequestLog.java +++ b/core/src/main/java/com/linecorp/armeria/common/logging/DefaultRequestLog.java @@ -17,7 +17,34 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; +import static com.linecorp.armeria.common.logging.RequestLogProperty.ALL_COMPLETE; +import static com.linecorp.armeria.common.logging.RequestLogProperty.AUTHENTICATED_USER; import static com.linecorp.armeria.common.logging.RequestLogProperty.FLAGS_ALL_COMPLETE; +import static com.linecorp.armeria.common.logging.RequestLogProperty.FLAGS_REQUEST_COMPLETE; +import static com.linecorp.armeria.common.logging.RequestLogProperty.FLAGS_RESPONSE_COMPLETE; +import static com.linecorp.armeria.common.logging.RequestLogProperty.NAME; +import static com.linecorp.armeria.common.logging.RequestLogProperty.REQUEST_CAUSE; +import static com.linecorp.armeria.common.logging.RequestLogProperty.REQUEST_COMPLETE; +import static com.linecorp.armeria.common.logging.RequestLogProperty.REQUEST_CONTENT; +import static com.linecorp.armeria.common.logging.RequestLogProperty.REQUEST_CONTENT_PREVIEW; +import static com.linecorp.armeria.common.logging.RequestLogProperty.REQUEST_END_TIME; +import static com.linecorp.armeria.common.logging.RequestLogProperty.REQUEST_FIRST_BYTES_TRANSFERRED_TIME; +import static com.linecorp.armeria.common.logging.RequestLogProperty.REQUEST_HEADERS; +import static com.linecorp.armeria.common.logging.RequestLogProperty.REQUEST_LENGTH; +import static com.linecorp.armeria.common.logging.RequestLogProperty.REQUEST_START_TIME; +import static com.linecorp.armeria.common.logging.RequestLogProperty.REQUEST_TRAILERS; +import static com.linecorp.armeria.common.logging.RequestLogProperty.RESPONSE_CAUSE; +import static com.linecorp.armeria.common.logging.RequestLogProperty.RESPONSE_COMPLETE; +import static com.linecorp.armeria.common.logging.RequestLogProperty.RESPONSE_CONTENT; +import static com.linecorp.armeria.common.logging.RequestLogProperty.RESPONSE_CONTENT_PREVIEW; +import static com.linecorp.armeria.common.logging.RequestLogProperty.RESPONSE_END_TIME; +import static com.linecorp.armeria.common.logging.RequestLogProperty.RESPONSE_FIRST_BYTES_TRANSFERRED_TIME; +import static com.linecorp.armeria.common.logging.RequestLogProperty.RESPONSE_HEADERS; +import static com.linecorp.armeria.common.logging.RequestLogProperty.RESPONSE_LENGTH; +import static com.linecorp.armeria.common.logging.RequestLogProperty.RESPONSE_START_TIME; +import static com.linecorp.armeria.common.logging.RequestLogProperty.RESPONSE_TRAILERS; +import static com.linecorp.armeria.common.logging.RequestLogProperty.SCHEME; +import static com.linecorp.armeria.common.logging.RequestLogProperty.SESSION; import static java.util.Objects.requireNonNull; import java.util.ArrayList; @@ -25,12 +52,16 @@ import java.util.Iterator; import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.locks.Lock; import javax.net.ssl.SSLSession; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; import com.google.errorprone.annotations.concurrent.GuardedBy; @@ -81,6 +112,8 @@ final class DefaultRequestLog implements RequestLog, RequestLogBuilder { RequestHeaders.builder(HttpMethod.UNKNOWN, "?").scheme("https").authority("?").build(); private static final ResponseHeaders DUMMY_RESPONSE_HEADERS = ResponseHeaders.of(HttpStatus.UNKNOWN); + private static final Logger logger = LoggerFactory.getLogger(DefaultRequestLog.class); + private final RequestContext ctx; private int currentAttempt; @@ -104,6 +137,8 @@ final class DefaultRequestLog implements RequestLog, RequestLogBuilder { @GuardedBy("lock") private final List pendingFutures = new ArrayList<>(4); + private final List listeners = new CopyOnWriteArrayList<>(); + private final Lock lock = new ReentrantShortLock(); @Nullable private UnmodifiableFuture partiallyCompletedFuture; @@ -177,7 +212,7 @@ final class DefaultRequestLog implements RequestLog, RequestLogBuilder { @Override public boolean isComplete() { - return isComplete(flags); + return isAvailable(ALL_COMPLETE); } private static boolean isComplete(int flags) { @@ -186,7 +221,7 @@ private static boolean isComplete(int flags) { @Override public boolean isRequestComplete() { - return hasInterestedFlags(flags, RequestLogProperty.FLAGS_REQUEST_COMPLETE); + return isAvailable(REQUEST_COMPLETE); } @Override @@ -226,7 +261,7 @@ public RequestLog getIfAvailable(Iterable properties) { return isAvailable(properties) ? this : null; } - private static boolean hasInterestedFlags(int flags, RequestLogProperty property) { + static boolean hasInterestedFlags(int flags, RequestLogProperty property) { return hasInterestedFlags(flags, property.flag()); } @@ -252,6 +287,16 @@ private static int interestedFlags(Iterable properties) { return flags; } + private static List toProperties(int flags) { + final ImmutableList.Builder properties = ImmutableList.builder(); + for (RequestLogProperty property : RequestLogProperty.values()) { + if (hasInterestedFlags(flags, property.flag())) { + properties.add(property); + } + } + return properties.build(); + } + @Override public RequestLog partial() { return partial(flags); @@ -263,12 +308,12 @@ private RequestLog partial(int flags) { @Override public CompletableFuture whenComplete() { - return future(FLAGS_ALL_COMPLETE); + return whenAvailable(ALL_COMPLETE); } @Override public CompletableFuture whenRequestComplete() { - return future(RequestLogProperty.FLAGS_REQUEST_COMPLETE); + return future(REQUEST_COMPLETE.flag()); } @Override @@ -403,7 +448,18 @@ private void updateFlags(RequestLogProperty property) { private void updateFlags(int flags) { for (;;) { final int oldFlags = this.flags; - final int newFlags = oldFlags | flags; + int newFlags = oldFlags | flags; + // Set the corresponding COMPLETE flags if all properties for request/response are available. + if (hasInterestedFlags(newFlags, FLAGS_REQUEST_COMPLETE)) { + newFlags |= REQUEST_COMPLETE.flag(); + } + if (hasInterestedFlags(newFlags, FLAGS_RESPONSE_COMPLETE)) { + newFlags |= RESPONSE_COMPLETE.flag(); + } + if (hasInterestedFlags(newFlags, FLAGS_ALL_COMPLETE)) { + newFlags |= ALL_COMPLETE.flag(); + } + if (oldFlags == newFlags) { break; } @@ -416,6 +472,7 @@ private void updateFlags(int flags) { } finally { lock.unlock(); } + maybeNotifyListeners(oldFlags, newFlags); if (satisfiedFutures != null) { final RequestLog log = partial(newFlags); completeSatisfiedFutures(satisfiedFutures, log, ctx); @@ -513,8 +570,8 @@ public void defer(Iterable properties) { } private void defer(int flag) { - if (hasInterestedFlags(flag, RequestLogProperty.REQUEST_CONTENT.flag())) { - flag |= RequestLogProperty.NAME.flag(); + if (hasInterestedFlags(flag, REQUEST_CONTENT.flag())) { + flag |= NAME.flag(); } for (;;) { @@ -557,41 +614,49 @@ public void addChild(RequestLogAccess child) { private void propagateRequestSideLog(RequestLogAccess child) { // Update the available properties always by adding a callback, // because the child's properties will never be available immediately. - child.whenAvailable(RequestLogProperty.REQUEST_START_TIME) - .thenAccept(log -> startRequest(log.requestStartTimeNanos(), log.requestStartTimeMicros())); - child.whenAvailable(RequestLogProperty.SESSION) - .thenAccept(log -> session(log.channel(), log.sessionProtocol(), - log.sslSession(), log.connectionTimings())); - child.whenAvailable(RequestLogProperty.SCHEME) - .thenAccept(log -> serializationFormat(log.scheme().serializationFormat())); - child.whenAvailable(RequestLogProperty.NAME) - .thenAccept(log -> { - final String serviceName = log.serviceName(); - final String name = log.name(); - if (serviceName != null) { - name(serviceName, name); - } else { - name(name); - } - }); - child.whenAvailable(RequestLogProperty.REQUEST_FIRST_BYTES_TRANSFERRED_TIME) - .thenAccept(log -> { - final Long timeNanos = log.requestFirstBytesTransferredTimeNanos(); - if (timeNanos != null) { - requestFirstBytesTransferred(timeNanos); - } - }); - child.whenAvailable(RequestLogProperty.REQUEST_HEADERS) - .thenAccept(log -> requestHeaders(log.requestHeaders())); - child.whenAvailable(RequestLogProperty.REQUEST_CONTENT) - .thenAccept(log -> requestContent(log.requestContent(), log.rawRequestContent())); - child.whenRequestComplete().thenAccept(log -> { - requestLength(log.requestLength()); - requestContentPreview(log.requestContentPreview()); - requestTrailers(log.requestTrailers()); - // Note that we do not propagate `requestCause` because otherwise the request which succeeded after - // retries can be considered to have failed. - endRequest0(/* requestCause */ null, log.requestEndTimeNanos()); + child.addListener((property, log) -> { + switch (property) { + case REQUEST_START_TIME: + startRequest(log.requestStartTimeNanos(), log.requestStartTimeMicros()); + break; + case SESSION: + session(log.channel(), log.sessionProtocol(), log.sslSession(), log.connectionTimings()); + break; + case SCHEME: + serializationFormat(log.scheme().serializationFormat()); + break; + case NAME: + final String serviceName = log.serviceName(); + final String name = log.name(); + if (serviceName != null) { + name(serviceName, name); + } else { + name(name); + } + break; + case REQUEST_FIRST_BYTES_TRANSFERRED_TIME: + final Long timeNanos = log.requestFirstBytesTransferredTimeNanos(); + if (timeNanos != null) { + requestFirstBytesTransferred(timeNanos); + } + break; + case REQUEST_HEADERS: + requestHeaders(log.requestHeaders()); + break; + case REQUEST_CONTENT: + requestContent(log.requestContent(), log.rawRequestContent()); + break; + case REQUEST_COMPLETE: + requestLength(log.requestLength()); + requestContentPreview(log.requestContentPreview()); + requestTrailers(log.requestTrailers()); + // Note that we do not propagate `requestCause` because otherwise the request which + // succeeded after retries can be considered to have failed. + endRequest0(/* requestCause */ null, log.requestEndTimeNanos()); + break; + default: + break; + } }); } @@ -614,84 +679,60 @@ public void endResponseWithChild(RequestLogAccess child) { } private void propagateResponseSideLog(RequestLog childLog) { - if (childLog.isAvailable(RequestLogProperty.RESPONSE_CAUSE)) { - // Update responseCause first if available because callbacks of the other properties may need it - // to retry or open circuit breakers. - final Throwable responseCause = childLog.responseCause(); - if (responseCause != null) { - responseCause(responseCause); + childLog.addListener((property, log) -> { + switch (property) { + case RESPONSE_CAUSE: + // Update responseCause first if available because callbacks of the other properties may + // need it to retry or open circuit breakers. + final Throwable responseCause = log.responseCause(); + if (responseCause != null) { + responseCause(responseCause); + } + break; + case RESPONSE_START_TIME: + startResponse(log.responseStartTimeNanos(), log.responseStartTimeMicros(), true); + break; + case RESPONSE_FIRST_BYTES_TRANSFERRED_TIME: + final Long timeNanos = log.responseFirstBytesTransferredTimeNanos(); + if (timeNanos != null) { + responseFirstBytesTransferred(timeNanos); + } + break; + case RESPONSE_HEADERS: + responseHeaders(log.responseHeaders()); + break; + case RESPONSE_TRAILERS: + responseTrailers(log.responseTrailers()); + break; + case RESPONSE_COMPLETE: + responseContent(log.responseContent(), log.rawResponseContent()); + responseLength(log.responseLength()); + responseContentPreview(log.responseContentPreview()); + responseTrailers(log.responseTrailers()); + endResponse0(log.responseCause(), log.responseEndTimeNanos()); + break; + default: + break; } - } - - // Update the available properties without adding a callback if the childLog already has them. - if (childLog.isAvailable(RequestLogProperty.RESPONSE_START_TIME)) { - startResponse(childLog.responseStartTimeNanos(), childLog.responseStartTimeMicros(), true); - } else { - childLog.whenAvailable(RequestLogProperty.RESPONSE_START_TIME) - .thenAccept(log -> startResponse(log.responseStartTimeNanos(), - log.responseStartTimeMicros(), true)); - } - - if (childLog.isAvailable(RequestLogProperty.RESPONSE_FIRST_BYTES_TRANSFERRED_TIME)) { - final Long timeNanos = childLog.responseFirstBytesTransferredTimeNanos(); - if (timeNanos != null) { - responseFirstBytesTransferred(timeNanos); - } - } else { - childLog.whenAvailable(RequestLogProperty.RESPONSE_FIRST_BYTES_TRANSFERRED_TIME) - .thenAccept(log -> { - final Long timeNanos = log.responseFirstBytesTransferredTimeNanos(); - if (timeNanos != null) { - responseFirstBytesTransferred(timeNanos); - } - }); - } - - if (childLog.isAvailable(RequestLogProperty.RESPONSE_HEADERS)) { - responseHeaders(childLog.responseHeaders()); - } else { - childLog.whenAvailable(RequestLogProperty.RESPONSE_HEADERS) - .thenAccept(log -> responseHeaders(log.responseHeaders())); - } - - if (childLog.isAvailable(RequestLogProperty.RESPONSE_TRAILERS)) { - responseTrailers(childLog.responseTrailers()); - } else { - childLog.whenAvailable(RequestLogProperty.RESPONSE_TRAILERS) - .thenAccept(log -> responseTrailers(log.responseTrailers())); - } - - if (childLog.isComplete()) { - propagateResponseEndData(childLog); - } else { - childLog.whenComplete().thenAccept(this::propagateResponseEndData); - } - } - - private void propagateResponseEndData(RequestLog log) { - responseContent(log.responseContent(), log.rawResponseContent()); - responseLength(log.responseLength()); - responseContentPreview(log.responseContentPreview()); - responseTrailers(log.responseTrailers()); - endResponse0(log.responseCause(), log.responseEndTimeNanos()); + }); } // Request-side methods. @Override public void startRequest(long requestStartTimeNanos, long requestStartTimeMicros) { - if (isAvailable(RequestLogProperty.REQUEST_START_TIME)) { + if (isAvailable(REQUEST_START_TIME)) { return; } this.requestStartTimeNanos = requestStartTimeNanos; this.requestStartTimeMicros = requestStartTimeMicros; - updateFlags(RequestLogProperty.REQUEST_START_TIME); + updateFlags(REQUEST_START_TIME); } @Override public long requestStartTimeMicros() { - ensureAvailable(RequestLogProperty.REQUEST_START_TIME); + ensureAvailable(REQUEST_START_TIME); return requestStartTimeMicros; } @@ -702,40 +743,40 @@ public long requestStartTimeMillis() { @Override public long requestStartTimeNanos() { - ensureAvailable(RequestLogProperty.REQUEST_START_TIME); + ensureAvailable(REQUEST_START_TIME); return requestStartTimeNanos; } @Nullable @Override public Long requestFirstBytesTransferredTimeNanos() { - ensureAvailable(RequestLogProperty.REQUEST_FIRST_BYTES_TRANSFERRED_TIME); + ensureAvailable(REQUEST_FIRST_BYTES_TRANSFERRED_TIME); return requestFirstBytesTransferredTimeNanosSet ? requestFirstBytesTransferredTimeNanos : null; } @Override public long requestEndTimeNanos() { - ensureAvailable(RequestLogProperty.REQUEST_END_TIME); + ensureAvailable(REQUEST_END_TIME); return requestEndTimeNanos; } @Override public long requestDurationNanos() { - ensureAvailable(RequestLogProperty.REQUEST_END_TIME); + ensureAvailable(REQUEST_END_TIME); return requestEndTimeNanos - requestStartTimeNanos; } @Nullable @Override public Throwable requestCause() { - ensureAvailable(RequestLogProperty.REQUEST_CAUSE); + ensureAvailable(REQUEST_CAUSE); return requestCause; } @Override public void session(@Nullable Channel channel, SessionProtocol sessionProtocol, @Nullable ClientConnectionTimings connectionTimings) { - if (isAvailable(RequestLogProperty.SESSION)) { + if (isAvailable(SESSION)) { return; } @@ -747,7 +788,7 @@ public void session(@Nullable Channel channel, SessionProtocol sessionProtocol, @Override public void session(@Nullable Channel channel, SessionProtocol sessionProtocol, @Nullable SSLSession sslSession, @Nullable ClientConnectionTimings connectionTimings) { - if (isAvailable(RequestLogProperty.SESSION)) { + if (isAvailable(SESSION)) { return; } @@ -763,37 +804,37 @@ private void session0(@Nullable Channel channel, SessionProtocol sessionProtocol this.sessionProtocol = sessionProtocol; this.connectionTimings = connectionTimings; maybeSetScheme(); - updateFlags(RequestLogProperty.SESSION); + updateFlags(SESSION); } private void maybeSetScheme() { - if (isAvailable(RequestLogProperty.SCHEME) || + if (isAvailable(SCHEME) || serializationFormat == SerializationFormat.NONE) { return; } assert sessionProtocol != null; scheme = Scheme.of(serializationFormat, sessionProtocol); - updateFlags(RequestLogProperty.SCHEME); + updateFlags(SCHEME); } @Nullable @Override public Channel channel() { - ensureAvailable(RequestLogProperty.SESSION); + ensureAvailable(SESSION); return channel; } @Nullable @Override public SSLSession sslSession() { - ensureAvailable(RequestLogProperty.SESSION); + ensureAvailable(SESSION); return sslSession; } @Override public SessionProtocol sessionProtocol() { - ensureAvailable(RequestLogProperty.SESSION); + ensureAvailable(SESSION); assert sessionProtocol != null; return sessionProtocol; } @@ -801,20 +842,20 @@ public SessionProtocol sessionProtocol() { @Nullable @Override public ClientConnectionTimings connectionTimings() { - ensureAvailable(RequestLogProperty.SESSION); + ensureAvailable(SESSION); return connectionTimings; } @Override public void serializationFormat(SerializationFormat serializationFormat) { - if (isAvailable(RequestLogProperty.SCHEME) || this.serializationFormat != SerializationFormat.NONE) { + if (isAvailable(SCHEME) || this.serializationFormat != SerializationFormat.NONE) { return; } this.serializationFormat = requireNonNull(serializationFormat, "serializationFormat"); if (sessionProtocol != null) { scheme = Scheme.of(serializationFormat, sessionProtocol); - updateFlags(RequestLogProperty.SCHEME); + updateFlags(SCHEME); } } @@ -825,7 +866,7 @@ public SerializationFormat serializationFormat() { @Override public Scheme scheme() { - ensureAvailable(RequestLogProperty.SCHEME); + ensureAvailable(SCHEME); assert scheme != null; return scheme; } @@ -833,13 +874,13 @@ public Scheme scheme() { @Nullable @Override public String serviceName() { - ensureAvailable(RequestLogProperty.NAME); + ensureAvailable(NAME); return serviceName; } @Override public String name() { - ensureAvailable(RequestLogProperty.NAME); + ensureAvailable(NAME); assert name != null; return name; } @@ -849,12 +890,12 @@ public void name(String name) { requireNonNull(name, "name"); checkArgument(!name.isEmpty(), "name is empty."); - if (isAvailable(RequestLogProperty.NAME)) { + if (isAvailable(NAME)) { return; } this.name = name; - updateFlags(RequestLogProperty.NAME); + updateFlags(NAME); } @Override @@ -864,18 +905,18 @@ public void name(String serviceName, String name) { requireNonNull(name, "name"); checkArgument(!name.isEmpty(), "name is empty."); - if (isAvailable(RequestLogProperty.NAME)) { + if (isAvailable(NAME)) { return; } this.serviceName = serviceName; this.name = name; - updateFlags(RequestLogProperty.NAME); + updateFlags(NAME); } @Override public String fullName() { - ensureAvailable(RequestLogProperty.NAME); + ensureAvailable(NAME); if (fullName != null) { return fullName; } @@ -892,13 +933,13 @@ public String fullName() { @Nullable @Override public String authenticatedUser() { - ensureAvailable(RequestLogProperty.AUTHENTICATED_USER); + ensureAvailable(AUTHENTICATED_USER); return authenticatedUser; } @Override public void authenticatedUser(String authenticatedUser) { - if (isAvailable(RequestLogProperty.AUTHENTICATED_USER)) { + if (isAvailable(AUTHENTICATED_USER)) { return; } this.authenticatedUser = requireNonNull(authenticatedUser, "authenticatedUser"); @@ -906,7 +947,7 @@ public void authenticatedUser(String authenticatedUser) { @Override public long requestLength() { - ensureAvailable(RequestLogProperty.REQUEST_LENGTH); + ensureAvailable(REQUEST_LENGTH); return requestLength; } @@ -916,17 +957,17 @@ public void requestLength(long requestLength) { throw new IllegalArgumentException("requestLength: " + requestLength + " (expected: >= 0)"); } - if (isAvailable(RequestLogProperty.REQUEST_LENGTH)) { + if (isAvailable(REQUEST_LENGTH)) { return; } this.requestLength = requestLength; - updateFlags(RequestLogProperty.REQUEST_LENGTH); + updateFlags(REQUEST_LENGTH); } @Override public void requestFirstBytesTransferred() { - if (isAvailable(RequestLogProperty.REQUEST_FIRST_BYTES_TRANSFERRED_TIME)) { + if (isAvailable(REQUEST_FIRST_BYTES_TRANSFERRED_TIME)) { return; } requestFirstBytesTransferred0(System.nanoTime()); @@ -934,7 +975,7 @@ public void requestFirstBytesTransferred() { @Override public void requestFirstBytesTransferred(long requestFirstBytesTransferredTimeNanos) { - if (isAvailable(RequestLogProperty.REQUEST_FIRST_BYTES_TRANSFERRED_TIME)) { + if (isAvailable(REQUEST_FIRST_BYTES_TRANSFERRED_TIME)) { return; } requestFirstBytesTransferred0(requestFirstBytesTransferredTimeNanos); @@ -943,7 +984,7 @@ public void requestFirstBytesTransferred(long requestFirstBytesTransferredTimeNa private void requestFirstBytesTransferred0(long requestFirstBytesTransferredTimeNanos) { this.requestFirstBytesTransferredTimeNanos = requestFirstBytesTransferredTimeNanos; requestFirstBytesTransferredTimeNanosSet = true; - updateFlags(RequestLogProperty.REQUEST_FIRST_BYTES_TRANSFERRED_TIME); + updateFlags(REQUEST_FIRST_BYTES_TRANSFERRED_TIME); } @Override @@ -952,7 +993,7 @@ public void increaseRequestLength(long deltaBytes) { throw new IllegalArgumentException("deltaBytes: " + deltaBytes + " (expected: >= 0)"); } - if (isAvailable(RequestLogProperty.REQUEST_LENGTH)) { + if (isAvailable(REQUEST_LENGTH)) { return; } @@ -967,31 +1008,31 @@ public void increaseRequestLength(HttpData data) { @Override public RequestHeaders requestHeaders() { - ensureAvailable(RequestLogProperty.REQUEST_HEADERS); + ensureAvailable(REQUEST_HEADERS); assert requestHeaders != null; return requestHeaders; } @Override public void requestHeaders(RequestHeaders requestHeaders) { - if (isAvailable(RequestLogProperty.REQUEST_HEADERS)) { + if (isAvailable(REQUEST_HEADERS)) { return; } this.requestHeaders = requireNonNull(requestHeaders, "requestHeaders"); - updateFlags(RequestLogProperty.REQUEST_HEADERS); + updateFlags(REQUEST_HEADERS); } @Nullable @Override public Object requestContent() { - ensureAvailable(RequestLogProperty.REQUEST_CONTENT); + ensureAvailable(REQUEST_CONTENT); return requestContent; } @Override public void requestContent(@Nullable Object requestContent, @Nullable Object rawRequestContent) { - if (isAvailable(RequestLogProperty.REQUEST_CONTENT)) { + if (isAvailable(REQUEST_CONTENT)) { return; } @@ -1000,48 +1041,48 @@ public void requestContent(@Nullable Object requestContent, @Nullable Object raw if (requestContent instanceof RpcRequest && ctx.rpcRequest() == null) { ctx.updateRpcRequest((RpcRequest) requestContent); } - updateFlags(RequestLogProperty.REQUEST_CONTENT); + updateFlags(REQUEST_CONTENT); setNamesIfAbsent(); } @Nullable @Override public Object rawRequestContent() { - ensureAvailable(RequestLogProperty.REQUEST_CONTENT); + ensureAvailable(REQUEST_CONTENT); return rawRequestContent; } @Nullable @Override public String requestContentPreview() { - ensureAvailable(RequestLogProperty.REQUEST_CONTENT_PREVIEW); + ensureAvailable(REQUEST_CONTENT_PREVIEW); return requestContentPreview; } @Override public void requestContentPreview(@Nullable String requestContentPreview) { - if (isAvailable(RequestLogProperty.REQUEST_CONTENT_PREVIEW)) { + if (isAvailable(REQUEST_CONTENT_PREVIEW)) { return; } this.requestContentPreview = requestContentPreview; - updateFlags(RequestLogProperty.REQUEST_CONTENT_PREVIEW); + updateFlags(REQUEST_CONTENT_PREVIEW); } @Override public HttpHeaders requestTrailers() { - ensureAvailable(RequestLogProperty.REQUEST_TRAILERS); + ensureAvailable(REQUEST_TRAILERS); return requestTrailers; } @Override public void requestTrailers(HttpHeaders requestTrailers) { - if (isAvailable(RequestLogProperty.REQUEST_TRAILERS)) { + if (isAvailable(REQUEST_TRAILERS)) { return; } requireNonNull(requestTrailers, "requestTrailers"); this.requestTrailers = requestTrailers; - updateFlags(RequestLogProperty.REQUEST_TRAILERS); + updateFlags(REQUEST_TRAILERS); } @Override @@ -1077,13 +1118,13 @@ private void endRequest0(@Nullable Throwable requestCause, long requestEndTimeNa final int deferredFlags; if (requestCause != null) { // Will auto-fill request content and its preview if request has failed. - deferredFlags = this.deferredFlags & ~(RequestLogProperty.REQUEST_CONTENT.flag() | - RequestLogProperty.REQUEST_CONTENT_PREVIEW.flag()); + deferredFlags = this.deferredFlags & ~(REQUEST_CONTENT.flag() | + REQUEST_CONTENT_PREVIEW.flag()); } else { deferredFlags = this.deferredFlags; } - final int flags = RequestLogProperty.FLAGS_REQUEST_COMPLETE & ~deferredFlags; + final int flags = FLAGS_REQUEST_COMPLETE & ~deferredFlags; if (isAvailable(flags)) { return; } @@ -1110,7 +1151,7 @@ private void endRequest0(@Nullable Throwable requestCause, long requestEndTimeNa } // Set names if request content is not deferred - if (!hasInterestedFlags(deferredFlags, RequestLogProperty.REQUEST_CONTENT)) { + if (!hasInterestedFlags(deferredFlags, REQUEST_CONTENT)) { setNamesIfAbsent(); } @@ -1166,7 +1207,7 @@ private void setNamesIfAbsent() { serviceName = newServiceName; name = newName; - updateFlags(RequestLogProperty.NAME); + updateFlags(NAME); } } @@ -1183,20 +1224,20 @@ public void startResponse(long responseStartTimeNanos, long responseStartTimeMic } private void startResponse(long responseStartTimeNanos, long responseStartTimeMicros, boolean updateFlags) { - if (isAvailable(RequestLogProperty.RESPONSE_START_TIME)) { + if (isAvailable(RESPONSE_START_TIME)) { return; } this.responseStartTimeNanos = responseStartTimeNanos; this.responseStartTimeMicros = responseStartTimeMicros; if (updateFlags) { - updateFlags(RequestLogProperty.RESPONSE_START_TIME); + updateFlags(RESPONSE_START_TIME); } } @Override public long responseStartTimeMicros() { - ensureAvailable(RequestLogProperty.RESPONSE_START_TIME); + ensureAvailable(RESPONSE_START_TIME); return responseStartTimeMicros; } @@ -1207,45 +1248,45 @@ public long responseStartTimeMillis() { @Override public long responseStartTimeNanos() { - ensureAvailable(RequestLogProperty.RESPONSE_START_TIME); + ensureAvailable(RESPONSE_START_TIME); return responseStartTimeNanos; } @Nullable @Override public Long responseFirstBytesTransferredTimeNanos() { - ensureAvailable(RequestLogProperty.RESPONSE_FIRST_BYTES_TRANSFERRED_TIME); + ensureAvailable(RESPONSE_FIRST_BYTES_TRANSFERRED_TIME); return responseFirstBytesTransferredTimeNanosSet ? responseFirstBytesTransferredTimeNanos : null; } @Override public long responseEndTimeNanos() { - ensureAvailable(RequestLogProperty.RESPONSE_END_TIME); + ensureAvailable(RESPONSE_END_TIME); return responseEndTimeNanos; } @Override public long responseDurationNanos() { - ensureAvailable(RequestLogProperty.RESPONSE_END_TIME); + ensureAvailable(RESPONSE_END_TIME); return responseEndTimeNanos - responseStartTimeNanos; } @Override public long totalDurationNanos() { - ensureAvailable(RequestLogProperty.RESPONSE_END_TIME); + ensureAvailable(RESPONSE_END_TIME); return responseEndTimeNanos - requestStartTimeNanos; } @Nullable @Override public Throwable responseCause() { - ensureAvailable(RequestLogProperty.RESPONSE_CAUSE); + ensureAvailable(RESPONSE_CAUSE); return responseCause; } @Override public void responseCause(Throwable cause) { - if (isAvailable(RequestLogProperty.RESPONSE_CAUSE)) { + if (isAvailable(RESPONSE_CAUSE)) { return; } @@ -1255,7 +1296,7 @@ public void responseCause(Throwable cause) { @Override public long responseLength() { - ensureAvailable(RequestLogProperty.RESPONSE_LENGTH); + ensureAvailable(RESPONSE_LENGTH); return responseLength; } @@ -1265,7 +1306,7 @@ public void responseLength(long responseLength) { throw new IllegalArgumentException("responseLength: " + responseLength + " (expected: >= 0)"); } - if (isAvailable(RequestLogProperty.RESPONSE_LENGTH)) { + if (isAvailable(RESPONSE_LENGTH)) { return; } @@ -1274,7 +1315,7 @@ public void responseLength(long responseLength) { @Override public void responseFirstBytesTransferred() { - if (isAvailable(RequestLogProperty.RESPONSE_FIRST_BYTES_TRANSFERRED_TIME)) { + if (isAvailable(RESPONSE_FIRST_BYTES_TRANSFERRED_TIME)) { return; } responseFirstBytesTransferred0(System.nanoTime()); @@ -1282,7 +1323,7 @@ public void responseFirstBytesTransferred() { @Override public void responseFirstBytesTransferred(long responseFirstBytesTransferredTimeNanos) { - if (isAvailable(RequestLogProperty.RESPONSE_FIRST_BYTES_TRANSFERRED_TIME)) { + if (isAvailable(RESPONSE_FIRST_BYTES_TRANSFERRED_TIME)) { return; } responseFirstBytesTransferred0(responseFirstBytesTransferredTimeNanos); @@ -1291,7 +1332,7 @@ public void responseFirstBytesTransferred(long responseFirstBytesTransferredTime private void responseFirstBytesTransferred0(long responseFirstBytesTransferredTimeNanos) { this.responseFirstBytesTransferredTimeNanos = responseFirstBytesTransferredTimeNanos; responseFirstBytesTransferredTimeNanosSet = true; - updateFlags(RequestLogProperty.RESPONSE_FIRST_BYTES_TRANSFERRED_TIME); + updateFlags(RESPONSE_FIRST_BYTES_TRANSFERRED_TIME); } @Override @@ -1300,7 +1341,7 @@ public void increaseResponseLength(long deltaBytes) { throw new IllegalArgumentException("deltaBytes: " + deltaBytes + " (expected: >= 0)"); } - if (isAvailable(RequestLogProperty.RESPONSE_LENGTH)) { + if (isAvailable(RESPONSE_LENGTH)) { return; } @@ -1315,31 +1356,31 @@ public void increaseResponseLength(HttpData data) { @Override public ResponseHeaders responseHeaders() { - ensureAvailable(RequestLogProperty.RESPONSE_HEADERS); + ensureAvailable(RESPONSE_HEADERS); assert responseHeaders != null; return responseHeaders; } @Override public void responseHeaders(ResponseHeaders responseHeaders) { - if (isAvailable(RequestLogProperty.RESPONSE_HEADERS)) { + if (isAvailable(RESPONSE_HEADERS)) { return; } this.responseHeaders = requireNonNull(responseHeaders, "responseHeaders"); - updateFlags(RequestLogProperty.RESPONSE_HEADERS); + updateFlags(RESPONSE_HEADERS); } @Nullable @Override public Object responseContent() { - ensureAvailable(RequestLogProperty.RESPONSE_CONTENT); + ensureAvailable(RESPONSE_CONTENT); return responseContent; } @Override public void responseContent(@Nullable Object responseContent, @Nullable Object rawResponseContent) { - if (isAvailable(RequestLogProperty.RESPONSE_CONTENT)) { + if (isAvailable(RESPONSE_CONTENT)) { return; } @@ -1356,48 +1397,48 @@ public void responseContent(@Nullable Object responseContent, @Nullable Object r this.responseContent = responseContent; this.rawResponseContent = rawResponseContent; - updateFlags(RequestLogProperty.RESPONSE_CONTENT); + updateFlags(RESPONSE_CONTENT); } @Nullable @Override public String responseContentPreview() { - ensureAvailable(RequestLogProperty.RESPONSE_CONTENT_PREVIEW); + ensureAvailable(RESPONSE_CONTENT_PREVIEW); return responseContentPreview; } @Override public void responseContentPreview(@Nullable String responseContentPreview) { - if (isAvailable(RequestLogProperty.RESPONSE_CONTENT_PREVIEW)) { + if (isAvailable(RESPONSE_CONTENT_PREVIEW)) { return; } this.responseContentPreview = responseContentPreview; - updateFlags(RequestLogProperty.RESPONSE_CONTENT_PREVIEW); + updateFlags(RESPONSE_CONTENT_PREVIEW); } @Nullable @Override public Object rawResponseContent() { - ensureAvailable(RequestLogProperty.RESPONSE_CONTENT); + ensureAvailable(RESPONSE_CONTENT); return rawResponseContent; } @Override public HttpHeaders responseTrailers() { - ensureAvailable(RequestLogProperty.RESPONSE_TRAILERS); + ensureAvailable(RESPONSE_TRAILERS); return responseTrailers; } @Override public void responseTrailers(HttpHeaders responseTrailers) { - if (isAvailable(RequestLogProperty.RESPONSE_TRAILERS)) { + if (isAvailable(RESPONSE_TRAILERS)) { return; } requireNonNull(responseTrailers, "responseTrailers"); this.responseTrailers = responseTrailers; - updateFlags(RequestLogProperty.RESPONSE_TRAILERS); + updateFlags(RESPONSE_TRAILERS); } @Override @@ -1428,13 +1469,13 @@ private void endResponse0(@Nullable Throwable responseCause, long responseEndTim final int deferredFlags; if (responseCause != null) { // Will auto-fill response content and its preview if response has failed. - deferredFlags = this.deferredFlags & ~(RequestLogProperty.RESPONSE_CONTENT.flag() | - RequestLogProperty.RESPONSE_CONTENT_PREVIEW.flag()); + deferredFlags = this.deferredFlags & ~(RESPONSE_CONTENT.flag() | + RESPONSE_CONTENT_PREVIEW.flag()); } else { deferredFlags = this.deferredFlags; } - final int flags = RequestLogProperty.FLAGS_RESPONSE_COMPLETE & ~deferredFlags; + final int flags = FLAGS_RESPONSE_COMPLETE & ~deferredFlags; if (isAvailable(flags)) { return; } @@ -1467,7 +1508,39 @@ private void setResponseCause(@Nullable Throwable responseCause, boolean updateF if (responseCause != null) { this.responseCause = responseCause; if (updateFlag) { - updateFlags(RequestLogProperty.RESPONSE_CAUSE); + updateFlags(RESPONSE_CAUSE); + } + } + } + + @Override + public void addListener(RequestLogListener listener) { + requireNonNull(listener, "listener"); + listener = new IdempotentRequestLogListener(listener); + listeners.add(listener); + + // Notify the listener of all already available properties. + for (RequestLogProperty property : toProperties(flags)) { + notifyListener(listener, property); + } + } + + private void notifyListener(RequestLogListener listener, RequestLogProperty property) { + try { + listener.onEvent(property, this); + } catch (Throwable t) { + logger.warn("An exception was raised from a RequestLogListener: {}", listener, t); + } + } + + private void maybeNotifyListeners(int oldFlags, int newFlags) { + if (!listeners.isEmpty()) { + final int addedFlags = newFlags & ~oldFlags; + final List properties = toProperties(addedFlags); + for (RequestLogProperty newProp : properties) { + for (RequestLogListener listener : listeners) { + notifyListener(listener, newProp); + } } } } @@ -1880,6 +1953,14 @@ public HttpHeaders responseTrailers() { return responseTrailers; } + @Override + public void addListener(RequestLogListener listener) { + requireNonNull(listener, "listener"); + for (RequestLogProperty property : RequestLogProperty.values()) { + notifyListener(listener, property); + } + } + @Override public String toString() { return DefaultRequestLog.this.toString(); diff --git a/core/src/main/java/com/linecorp/armeria/common/logging/IdempotentRequestLogListener.java b/core/src/main/java/com/linecorp/armeria/common/logging/IdempotentRequestLogListener.java new file mode 100644 index 00000000000..37be49eb8f3 --- /dev/null +++ b/core/src/main/java/com/linecorp/armeria/common/logging/IdempotentRequestLogListener.java @@ -0,0 +1,51 @@ +/* + * Copyright 2025 LY Corporation + * + * LY Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.armeria.common.logging; + +import static com.linecorp.armeria.common.logging.DefaultRequestLog.hasInterestedFlags; + +import com.linecorp.armeria.internal.common.util.ReentrantShortLock; + +/** + * A {@link RequestLogListener} that delivers each event only once to the delegate listener. + */ +final class IdempotentRequestLogListener extends ReentrantShortLock implements RequestLogListener { + + private static final long serialVersionUID = -573237359665852226L; + + private final RequestLogListener delegate; + private int notifiedFlags; + + IdempotentRequestLogListener(RequestLogListener delegate) { + this.delegate = delegate; + } + + @Override + public void onEvent(RequestLogProperty property, RequestLog log) { + lock(); + try { + if (hasInterestedFlags(notifiedFlags, property)) { + // Already notified. + return; + } + notifiedFlags |= property.flag(); + } finally { + unlock(); + } + delegate.onEvent(property, log); + } +} diff --git a/core/src/main/java/com/linecorp/armeria/common/logging/RequestLogAccess.java b/core/src/main/java/com/linecorp/armeria/common/logging/RequestLogAccess.java index 69bdff63359..783368a0670 100644 --- a/core/src/main/java/com/linecorp/armeria/common/logging/RequestLogAccess.java +++ b/core/src/main/java/com/linecorp/armeria/common/logging/RequestLogAccess.java @@ -230,6 +230,13 @@ public interface RequestLogAccess { */ RequestLog ensureAvailable(Iterable properties); + /** + * Adds the specified {@link RequestLogListener} which will be invoked when a {@link RequestLogProperty} + * becomes available. + */ + @UnstableApi + void addListener(RequestLogListener listener); + /** * Returns the {@link RequestLog} for the {@link Request}, where all properties may not be available yet. * Note that this method is potentially unsafe; an attempt to access an unavailable property will trigger diff --git a/core/src/main/java/com/linecorp/armeria/common/logging/RequestLogListener.java b/core/src/main/java/com/linecorp/armeria/common/logging/RequestLogListener.java new file mode 100644 index 00000000000..6e027cc0cf8 --- /dev/null +++ b/core/src/main/java/com/linecorp/armeria/common/logging/RequestLogListener.java @@ -0,0 +1,39 @@ +/* + * Copyright 2025 LY Corporation + * + * LY Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.armeria.common.logging; + +import com.linecorp.armeria.common.annotation.UnstableApi; + +/** + * A listener that listens to all events of a {@link RequestLog}. + * + *

If a {@link RequestLogProperty} was completed before adding this listener to the {@link RequestLog}, + * the {@link #onEvent(RequestLogProperty, RequestLog)} method will be invoked immediately with the already + * completed property upon adding the listener. + * + *

Note that this listener may be invoked in the I/O worker thread so make sure to offload any blocking + * operations to a separate thread pool. + */ +@UnstableApi +@FunctionalInterface +public interface RequestLogListener { + + /** + * Invoked when the specified {@link RequestLogProperty} is completed. + */ + void onEvent(RequestLogProperty property, RequestLog log); +} diff --git a/core/src/main/java/com/linecorp/armeria/common/logging/RequestLogProperty.java b/core/src/main/java/com/linecorp/armeria/common/logging/RequestLogProperty.java index d75ff0a8526..b6f4a1bc97e 100644 --- a/core/src/main/java/com/linecorp/armeria/common/logging/RequestLogProperty.java +++ b/core/src/main/java/com/linecorp/armeria/common/logging/RequestLogProperty.java @@ -37,16 +37,6 @@ public enum RequestLogProperty { */ REQUEST_START_TIME(true), - /** - * {@link RequestLog#requestEndTimeNanos()}, {@link RequestLog#requestDurationNanos()}. - */ - REQUEST_END_TIME(true), - - /** - * {@link RequestLog#requestFirstBytesTransferredTimeNanos()}. - */ - REQUEST_FIRST_BYTES_TRANSFERRED_TIME(true), - /** * {@link RequestLog#channel()}, {@link RequestLog#sessionProtocol()}, {@link RequestLog#sslSession()}, * {@link RequestLog#connectionTimings()}. @@ -73,6 +63,11 @@ public enum RequestLogProperty { */ REQUEST_HEADERS(true), + /** + * {@link RequestLog#requestFirstBytesTransferredTimeNanos()}. + */ + REQUEST_FIRST_BYTES_TRANSFERRED_TIME(true), + /** * {@link RequestLog#requestContent()}, {@link RequestLog#rawRequestContent()}. */ @@ -98,8 +93,24 @@ public enum RequestLogProperty { */ REQUEST_CAUSE(true), + /** + * {@link RequestLog#requestEndTimeNanos()}, {@link RequestLog#requestDurationNanos()}. + */ + REQUEST_END_TIME(true), + + /** + * Indicates that the request is complete and all request properties are available. + */ + REQUEST_COMPLETE(true), + // Response properties + /** + * {@link RequestLog#responseCause()}. + */ + // Notify the response cause before other response properties to propagate the cause as early as possible. + RESPONSE_CAUSE(false), + /** * {@link RequestLog#responseStartTimeMicros()}, {@link RequestLog#responseStartTimeMillis()}, * {@link RequestLog#responseStartTimeNanos()}. @@ -107,21 +118,15 @@ public enum RequestLogProperty { RESPONSE_START_TIME(false), /** - * {@link RequestLog#responseEndTimeNanos()}, {@link RequestLog#responseDurationNanos()}, - * {@link RequestLog#totalDurationNanos()}. + * {@link RequestLog#responseHeaders()}. */ - RESPONSE_END_TIME(false), + RESPONSE_HEADERS(false), /** * {@link RequestLog#responseFirstBytesTransferredTimeNanos()}. */ RESPONSE_FIRST_BYTES_TRANSFERRED_TIME(false), - /** - * {@link RequestLog#responseHeaders()}. - */ - RESPONSE_HEADERS(false), - /** * {@link RequestLog#responseContent()}. */ @@ -132,20 +137,32 @@ public enum RequestLogProperty { */ RESPONSE_CONTENT_PREVIEW(false), + /** + * {@link RequestLog#responseLength()}. + */ + // TODO(ikhoon): Check if this property is actually used anywhere. + RESPONSE_LENGTH(false), + /** * {@link RequestLog#responseTrailers()}. */ RESPONSE_TRAILERS(false), /** - * {@link RequestLog#responseLength()}. + * {@link RequestLog#responseEndTimeNanos()}, {@link RequestLog#responseDurationNanos()}, + * {@link RequestLog#totalDurationNanos()}. */ - RESPONSE_LENGTH(false), + RESPONSE_END_TIME(false), /** - * {@link RequestLog#responseCause()}. + * Indicates that the response is complete and all response properties are available. + */ + RESPONSE_COMPLETE(false), + + /** + * Indicates that both the request and response are complete and all properties are available. */ - RESPONSE_CAUSE(false); + ALL_COMPLETE(false); private static final Set REQUEST_PROPERTIES = Arrays.stream(values()) @@ -154,7 +171,7 @@ public enum RequestLogProperty { private static final Set RESPONSE_PROPERTIES = Arrays.stream(values()) - .filter(p -> !p.isRequestProperty) + .filter(p -> !p.isRequestProperty && p != ALL_COMPLETE) .collect(Sets.toImmutableEnumSet()); private static final Set ALL_PROPERTIES = @@ -165,8 +182,8 @@ public enum RequestLogProperty { static final int FLAGS_ALL_COMPLETE; static { - FLAGS_REQUEST_COMPLETE = flags(REQUEST_PROPERTIES); - FLAGS_RESPONSE_COMPLETE = flags(RESPONSE_PROPERTIES); + FLAGS_REQUEST_COMPLETE = flags(REQUEST_PROPERTIES) & ~REQUEST_COMPLETE.flag(); + FLAGS_RESPONSE_COMPLETE = flags(RESPONSE_PROPERTIES) & ~RESPONSE_COMPLETE.flag(); FLAGS_ALL_COMPLETE = FLAGS_REQUEST_COMPLETE | FLAGS_RESPONSE_COMPLETE; } diff --git a/core/src/main/java/com/linecorp/armeria/server/observation/ObservationService.java b/core/src/main/java/com/linecorp/armeria/server/observation/ObservationService.java index a86a85b3d14..2a22c9ae118 100644 --- a/core/src/main/java/com/linecorp/armeria/server/observation/ObservationService.java +++ b/core/src/main/java/com/linecorp/armeria/server/observation/ObservationService.java @@ -26,7 +26,6 @@ import com.linecorp.armeria.common.HttpResponse; import com.linecorp.armeria.common.annotation.Nullable; import com.linecorp.armeria.common.annotation.UnstableApi; -import com.linecorp.armeria.common.logging.RequestLogProperty; import com.linecorp.armeria.internal.common.RequestContextExtension; import com.linecorp.armeria.server.HttpService; import com.linecorp.armeria.server.ServiceRequestContext; @@ -140,23 +139,24 @@ public HttpResponse serve(ServiceRequestContext ctx, HttpRequest req) throws Exc private static void enrichObservation(ServiceRequestContext ctx, ServiceObservationContext serviceObservationContext, Observation observation) { - ctx.log() - .whenAvailable(RequestLogProperty.REQUEST_FIRST_BYTES_TRANSFERRED_TIME) - .thenAccept(requestLog -> observation.event( - HttpServiceObservationDocumentation.Events.WIRE_RECEIVE)); - - ctx.log() - .whenAvailable(RequestLogProperty.RESPONSE_FIRST_BYTES_TRANSFERRED_TIME) - .thenAccept(requestLog -> { - if (requestLog.responseFirstBytesTransferredTimeNanos() != null) { - observation.event(HttpServiceObservationDocumentation.Events.WIRE_SEND); - } - }); - - ctx.log().whenComplete() - .thenAccept(requestLog -> { - serviceObservationContext.setResponse(requestLog); - observation.stop(); - }); + ctx.log().addListener((property, log) -> { + switch (property) { + case REQUEST_FIRST_BYTES_TRANSFERRED_TIME: + observation.event(HttpServiceObservationDocumentation.Events.WIRE_RECEIVE); + break; + case RESPONSE_FIRST_BYTES_TRANSFERRED_TIME: + if (log.responseFirstBytesTransferredTimeNanos() != null) { + observation.event(HttpServiceObservationDocumentation.Events.WIRE_SEND); + } + break; + case ALL_COMPLETE: + serviceObservationContext.setResponse(log); + observation.stop(); + break; + default: + // Do nothing. + break; + } + }); } } diff --git a/core/src/test/java/com/linecorp/armeria/common/logging/DefaultRequestLogTest.java b/core/src/test/java/com/linecorp/armeria/common/logging/DefaultRequestLogTest.java index f1e9f7ba376..f30753723dd 100644 --- a/core/src/test/java/com/linecorp/armeria/common/logging/DefaultRequestLogTest.java +++ b/core/src/test/java/com/linecorp/armeria/common/logging/DefaultRequestLogTest.java @@ -31,7 +31,6 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.BiFunction; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -66,13 +65,6 @@ class DefaultRequestLogTest { - private static final BiFunction headersSanitizer = - (ctx, headers) -> "sanitized_headers"; - private static final BiFunction contentSanitizer = - (ctx, content) -> "sanitized_content"; - private static final BiFunction trailersSanitizer = - (ctx, trailers) -> "sanitized_trailers"; - @Mock private RequestContext ctx; @@ -188,7 +180,6 @@ void rpcFailure_responseContentWithCause() { @ValueSource(booleans = {true, false}) void addChild(boolean isResponseEndingWithFirstChild) { when(ctx.method()).thenReturn(HttpMethod.GET); - when(ctx.eventLoop()).thenReturn(ContextAwareEventLoop.of(ctx, ImmediateEventLoop.INSTANCE)); final DefaultRequestLog firstChild = new DefaultRequestLog(ctx); final DefaultRequestLog lastChild = new DefaultRequestLog(ctx); log.addChild(firstChild); diff --git a/core/src/test/java/com/linecorp/armeria/common/logging/IdempotentRequestLogListenerTest.java b/core/src/test/java/com/linecorp/armeria/common/logging/IdempotentRequestLogListenerTest.java new file mode 100644 index 00000000000..cfc51bda4ac --- /dev/null +++ b/core/src/test/java/com/linecorp/armeria/common/logging/IdempotentRequestLogListenerTest.java @@ -0,0 +1,42 @@ +/* + * Copyright 2025 LY Corporation + * + * LY Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.armeria.common.logging; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.ArrayList; +import java.util.List; + +import org.junit.jupiter.api.Test; + +class IdempotentRequestLogListenerTest { + + @Test + void shouldNotifyExactlyOnce() { + final List events = new ArrayList<>(); + final RequestLogListener listener = (property, log) -> { + events.add(property); + }; + final IdempotentRequestLogListener idempotentListener = new IdempotentRequestLogListener(listener); + idempotentListener.onEvent(RequestLogProperty.REQUEST_START_TIME, null); + idempotentListener.onEvent(RequestLogProperty.REQUEST_START_TIME, null); + idempotentListener.onEvent(RequestLogProperty.REQUEST_END_TIME, null); + idempotentListener.onEvent(RequestLogProperty.REQUEST_END_TIME, null); + assertThat(events).containsExactly(RequestLogProperty.REQUEST_START_TIME, + RequestLogProperty.REQUEST_END_TIME); + } +} diff --git a/core/src/test/java/com/linecorp/armeria/common/logging/RequestLogListenerTest.java b/core/src/test/java/com/linecorp/armeria/common/logging/RequestLogListenerTest.java new file mode 100644 index 00000000000..571b95e8160 --- /dev/null +++ b/core/src/test/java/com/linecorp/armeria/common/logging/RequestLogListenerTest.java @@ -0,0 +1,224 @@ +/* + * Copyright 2025 LY Corporation + * + * LY Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.armeria.common.logging; + +import static com.linecorp.armeria.common.logging.RequestLogProperty.ALL_COMPLETE; +import static com.linecorp.armeria.common.logging.RequestLogProperty.REQUEST_COMPLETE; +import static com.linecorp.armeria.common.logging.RequestLogProperty.REQUEST_CONTENT; +import static com.linecorp.armeria.common.logging.RequestLogProperty.REQUEST_CONTENT_PREVIEW; +import static com.linecorp.armeria.common.logging.RequestLogProperty.REQUEST_END_TIME; +import static com.linecorp.armeria.common.logging.RequestLogProperty.REQUEST_FIRST_BYTES_TRANSFERRED_TIME; +import static com.linecorp.armeria.common.logging.RequestLogProperty.REQUEST_HEADERS; +import static com.linecorp.armeria.common.logging.RequestLogProperty.REQUEST_LENGTH; +import static com.linecorp.armeria.common.logging.RequestLogProperty.REQUEST_START_TIME; +import static com.linecorp.armeria.common.logging.RequestLogProperty.RESPONSE_COMPLETE; +import static com.linecorp.armeria.common.logging.RequestLogProperty.RESPONSE_CONTENT; +import static com.linecorp.armeria.common.logging.RequestLogProperty.RESPONSE_END_TIME; +import static com.linecorp.armeria.common.logging.RequestLogProperty.RESPONSE_HEADERS; +import static com.linecorp.armeria.common.logging.RequestLogProperty.RESPONSE_START_TIME; +import static com.linecorp.armeria.common.logging.RequestLogProperty.SESSION; +import static com.linecorp.armeria.common.logging.RequestLogProperty.allProperties; +import static com.linecorp.armeria.common.logging.RequestLogProperty.requestProperties; +import static com.linecorp.armeria.common.logging.RequestLogProperty.responseProperties; +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.ArrayList; +import java.util.List; + +import org.junit.jupiter.api.Test; + +import com.linecorp.armeria.client.ClientRequestContext; +import com.linecorp.armeria.common.HttpMethod; +import com.linecorp.armeria.common.HttpRequest; +import com.linecorp.armeria.common.HttpStatus; +import com.linecorp.armeria.common.ResponseHeaders; +import com.linecorp.armeria.common.RpcRequest; +import com.linecorp.armeria.server.ServiceRequestContext; + +class RequestLogListenerTest { + + @Test + void listenerNotifiedWhenPropertyBecomesAvailable() { + final ClientRequestContext ctx = ClientRequestContext.of(HttpRequest.of(HttpMethod.GET, "/")); + final List notifiedProperties = new ArrayList<>(); + + ctx.log().addListener((property, log) -> notifiedProperties.add(property)); + + // The properties are set when the ctx is created. + assertThat(notifiedProperties).containsExactly(REQUEST_START_TIME, SESSION, REQUEST_HEADERS); + + ctx.logBuilder().requestFirstBytesTransferred(); + assertThat(notifiedProperties).containsExactly(REQUEST_START_TIME, SESSION, REQUEST_HEADERS, + REQUEST_FIRST_BYTES_TRANSFERRED_TIME); + // End request + ctx.logBuilder().endRequest(); + assertThat(notifiedProperties).containsAll(requestProperties()); + notifiedProperties.clear(); + + // Start response + ctx.logBuilder().startResponse(); + assertThat(notifiedProperties).containsExactly(RESPONSE_START_TIME); + + // Add response headers + ctx.logBuilder().responseHeaders(ResponseHeaders.of(HttpStatus.OK)); + assertThat(notifiedProperties).containsExactly(RESPONSE_START_TIME, RESPONSE_HEADERS); + + // End response + ctx.logBuilder().endResponse(); + assertThat(notifiedProperties).containsAll(responseProperties()); + assertThat(notifiedProperties).contains(ALL_COMPLETE); + } + + @Test + void listenerNotifiedImmediatelyForAlreadyAvailableProperties() { + final ServiceRequestContext ctx = ServiceRequestContext.of(HttpRequest.of(HttpMethod.GET, "/")); + final RequestLogAccess logAccess = ctx.log(); + + ctx.logBuilder().endRequest(); + + // Now add listener - it should be notified immediately for already available properties + final List notifiedProperties = new ArrayList<>(); + logAccess.addListener((property, log) -> notifiedProperties.add(property)); + + assertThat(notifiedProperties).containsExactlyElementsOf(requestProperties()); + } + + @Test + void multipleListenersNotified() { + final ServiceRequestContext ctx = ServiceRequestContext.of(HttpRequest.of(HttpMethod.GET, "/")); + final RequestLogAccess logAccess = ctx.log(); + + final List listener1Events = new ArrayList<>(); + final List listener2Events = new ArrayList<>(); + + logAccess.addListener((property, log) -> listener1Events.add(property)); + + logAccess.addListener((property, log) -> listener2Events.add(property)); + + ctx.logBuilder().endRequest(); + + // Both listeners should be notified + assertThat(listener1Events).isEqualTo(listener2Events); + assertThat(listener1Events).containsExactlyInAnyOrderElementsOf(requestProperties()); + } + + @Test + void listenerReceivesCorrectLog() { + final ServiceRequestContext ctx = ServiceRequestContext.of(HttpRequest.of(HttpMethod.GET, "/test")); + final RequestLogAccess logAccess = ctx.log(); + + final List receivedLogs = new ArrayList<>(); + + logAccess.addListener((property, log) -> receivedLogs.add(log)); + + ctx.logBuilder().endRequest(); + + assertThat(receivedLogs.get(0)).isSameAs(ctx.log()); + } + + @Test + void listenerNotifiedForRequestContentProperties() { + final ServiceRequestContext ctx = ServiceRequestContext.of(HttpRequest.of(HttpMethod.GET, "/")); + final RequestLogAccess logAccess = ctx.log(); + final List notifiedProperties = new ArrayList<>(); + + logAccess.addListener((property, log) -> notifiedProperties.add(property)); + + assertThat(notifiedProperties).doesNotContain(REQUEST_CONTENT); + final RpcRequest rpcRequest = RpcRequest.of(String.class, "hello"); + ctx.logBuilder().requestContent(rpcRequest, null); + assertThat(notifiedProperties).contains(REQUEST_CONTENT); + + assertThat(notifiedProperties).doesNotContain(REQUEST_CONTENT_PREVIEW); + ctx.logBuilder().requestContentPreview("preview"); + assertThat(notifiedProperties).contains(REQUEST_CONTENT_PREVIEW); + } + + @Test + void listenerExceptionDoesNotAffectOtherListeners() { + final ServiceRequestContext ctx = ServiceRequestContext.of(HttpRequest.of(HttpMethod.GET, "/")); + final RequestLogAccess logAccess = ctx.log(); + + final List goodListenerEvents = new ArrayList<>(); + + // Add a listener that throws an exception + logAccess.addListener((property, log) -> { + throw new RuntimeException("Test exception from listener"); + }); + + // Add a good listener + logAccess.addListener((property, log) -> goodListenerEvents.add(property)); + + ctx.logBuilder().endRequest(); + ctx.logBuilder().endResponse(); + + // The good listener should still be notified despite the exception in the first listener + assertThat(goodListenerEvents).containsExactlyInAnyOrderElementsOf(allProperties()); + } + + @Test + void listenerNotifiedForDeferredProperties() { + final ServiceRequestContext ctx = ServiceRequestContext.of(HttpRequest.of(HttpMethod.GET, "/")); + final RequestLogAccess logAccess = ctx.log(); + final List notifiedProperties = new ArrayList<>(); + + logAccess.addListener((property, log) -> notifiedProperties.add(property)); + + // Defer content properties + ctx.logBuilder().defer(REQUEST_CONTENT); + ctx.logBuilder().defer(RESPONSE_CONTENT); + + ctx.logBuilder().startRequest(); + ctx.logBuilder().endRequest(); + ctx.logBuilder().startResponse(); + ctx.logBuilder().endResponse(); + + assertThat(notifiedProperties).contains(REQUEST_END_TIME); + assertThat(notifiedProperties).doesNotContain(REQUEST_CONTENT); + assertThat(notifiedProperties).doesNotContain(REQUEST_COMPLETE); + + assertThat(notifiedProperties).contains(RESPONSE_END_TIME); + assertThat(notifiedProperties).doesNotContain(RESPONSE_CONTENT); + assertThat(notifiedProperties).doesNotContain(RESPONSE_COMPLETE); + assertThat(notifiedProperties).doesNotContain(ALL_COMPLETE); + + // Now set the deferred properties + ctx.logBuilder().requestContent(null, null); + assertThat(notifiedProperties).contains(REQUEST_CONTENT, REQUEST_COMPLETE); + assertThat(notifiedProperties).doesNotContain(ALL_COMPLETE); + + ctx.logBuilder().responseContent(null, null); + assertThat(notifiedProperties).contains(RESPONSE_CONTENT, RESPONSE_COMPLETE, ALL_COMPLETE); + } + + @Test + void listenerNotifiedForRequestLength() { + final ServiceRequestContext ctx = ServiceRequestContext.of(HttpRequest.of(HttpMethod.GET, "/")); + final RequestLogAccess logAccess = ctx.log(); + final List notifiedProperties = new ArrayList<>(); + + logAccess.addListener((property, log) -> { + notifiedProperties.add(property); + if (property == REQUEST_LENGTH) { + assertThat(log.requestLength()).isEqualTo(100L); + } + }); + + ctx.logBuilder().requestLength(100L); + assertThat(notifiedProperties).contains(REQUEST_LENGTH); + } +}