Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,10 @@ public HttpServer build() {
if (sslOptions != null) {
sslOptions = sslOptions.copy();
}
server = new CleanableHttpServer(vertx, new TcpHttpServer(vertx, new HttpServerConfig(config), sslOptions, sslEngineOptions, null, registerWebSocketWriteHandlers));
server = new CleanableHttpServer(vertx, new TcpHttpServer(vertx, new HttpServerConfig(config), sslOptions, sslEngineOptions, true, registerWebSocketWriteHandlers));
}
} else if (useQuic) {
server = new CleanableHttpServer(vertx, new QuicHttpServer(vertx, new HttpServerConfig(config), sslOptions.copy(), null));
server = new CleanableHttpServer(vertx, new QuicHttpServer(vertx, new HttpServerConfig(config), sslOptions.copy(), true));
} else {
throw new IllegalArgumentException("You must set at least one supported HTTP version");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,18 +57,18 @@ public HybridHttpServer(VertxInternal vertx, HttpServerConfig config, ServerSSLO
this.engineOptions = engineOptions;
}

public HttpServerInternal tcpServer(HttpServerMetrics<?, ?> httpMetrics) {
public HttpServerInternal tcpServer() {
if (tcpServer == null) {
TcpHttpServer server = new TcpHttpServer(vertx, new HttpServerConfig(config), sslOptions.copy(), engineOptions, httpMetrics, false);
TcpHttpServer server = new TcpHttpServer(vertx, new HttpServerConfig(config), sslOptions.copy(), engineOptions, false, false);
setHandlers(server);
tcpServer = server;
}
return tcpServer;
}

public HttpServerInternal quicServer(HttpServerMetrics<?, ?> httpMetrics) {
public HttpServerInternal quicServer() {
if (quicServer == null) {
QuicHttpServer server = new QuicHttpServer(vertx, new HttpServerConfig(config), sslOptions.copy(), httpMetrics);
QuicHttpServer server = new QuicHttpServer(vertx, new HttpServerConfig(config), sslOptions.copy(), false);
setHandlers(server);
quicServer = server;
}
Expand Down Expand Up @@ -158,11 +158,8 @@ public Future<HttpServer> listen(SocketAddress address) {

@Override
public Future<HttpServer> listen(ContextInternal context) {
SocketAddress tcpLocalAddress = SocketAddress.inetSocketAddress(config.getTcpPort(), config.getTcpHost());
SocketAddress udpLocalAddress = SocketAddress.inetSocketAddress(config.getQuicPort(), config.getQuicHost());
httpMetrics = vertx.metrics() != null ? vertx.metrics().createHttpServerMetrics(config, tcpLocalAddress, udpLocalAddress) : null;
return listen(tcpServer(httpMetrics).listen(context),
quicServer(httpMetrics).listen(context));
return listen(tcpServer().listen(context),
quicServer().listen(context)).onSuccess(this::createMetrics);
}

@Override
Expand All @@ -173,8 +170,17 @@ public Future<HttpServer> listen(ContextInternal context, SocketAddress address)
HttpServerConfig config = new HttpServerConfig(this.config);
config.setHost(address.host());
config.setPort(address.port());
httpMetrics = vertx.metrics() != null ? vertx.metrics().createHttpServerMetrics(config, address, address) : null;
return listen(tcpServer(httpMetrics).listen(context, address), quicServer(httpMetrics).listen(context, address));
return listen(tcpServer().listen(context, address), quicServer().listen(context, address)).onSuccess(this::createMetrics);
}

private void createMetrics(HttpServer ignored) {
SocketAddress tcpLocalAddress = SocketAddress.inetSocketAddress(tcpServer.actualPort(), config.getTcpHost());
SocketAddress udpLocalAddress = SocketAddress.inetSocketAddress(quicServer.actualPort(), config.getQuicHost());

httpMetrics = vertx.metrics() != null ? vertx.metrics().createHttpServerMetrics(config, tcpLocalAddress, udpLocalAddress) : null;

tcpServer.setMetrics(httpMetrics);
quicServer.setMetrics(httpMetrics);
}

private Future<HttpServer> listen(Future<HttpServer> f1, Future<HttpServer> f2) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,10 @@ public class QuicHttpServer implements HttpServerInternal {
private Handler<Throwable> exceptionHandler;
private QuicServerImpl quicServer;
private HttpServerMetrics<?, ?> httpMetrics;
private ConnectionHandler internalConnectionHandler;
private volatile int actualPort;

public QuicHttpServer(VertxInternal vertx, HttpServerConfig config, ServerSSLOptions sslOptions, HttpServerMetrics<?, ?> httpMetrics) {
public QuicHttpServer(VertxInternal vertx, HttpServerConfig config, ServerSSLOptions sslOptions, boolean manageMetrics) {

// We own the copy
sslOptions.setApplicationLayerProtocols(Arrays.asList(Http3.supportedApplicationProtocols()));
Expand All @@ -61,8 +62,7 @@ public QuicHttpServer(VertxInternal vertx, HttpServerConfig config, ServerSSLOpt
this.http3Config = config.getHttp3Config() != null ? config.getHttp3Config() : new Http3ServerConfig();
this.quicConfig = config.getQuicConfig() != null ? config.getQuicConfig() : new QuicServerConfig();
this.actualPort = 0;
this.httpMetrics = httpMetrics;
this.manageMetrics = httpMetrics == null;
this.manageMetrics = manageMetrics;
}

@Override
Expand Down Expand Up @@ -139,7 +139,6 @@ public Future<Boolean> updateTrafficShapingOptions(TrafficShapingOptions options
private static class ConnectionHandler implements Handler<QuicConnection> {

private final QuicServer transport;
private final HttpServerMetrics<?, ?> httpMetrics;
private final Handler<HttpServerRequest> requestHandler;
private final Handler<HttpConnection> connectionHandler;
private final boolean handle100ContinueAutomatically;
Expand All @@ -148,9 +147,9 @@ private static class ConnectionHandler implements Handler<QuicConnection> {
private final int maxFormBufferedSize;
private final Http3Settings localSettings;
private final boolean logEnabled;
private HttpServerMetrics<?, ?> httpMetrics;

public ConnectionHandler(QuicServer transport,
HttpServerMetrics<?, ?> httpMetrics,
Handler<HttpServerRequest> requestHandler,
Handler<HttpConnection> connectionHandler,
boolean handle100ContinueAutomatically,
Expand All @@ -160,7 +159,6 @@ public ConnectionHandler(QuicServer transport,
Http3Settings localSettings,
boolean logEnabled) {
this.transport = transport;
this.httpMetrics = httpMetrics;
this.requestHandler = requestHandler;
this.connectionHandler = connectionHandler;
this.handle100ContinueAutomatically = handle100ContinueAutomatically;
Expand Down Expand Up @@ -198,6 +196,10 @@ public void handle(QuicConnection connection) {
ctx.dispatch(http3Connection, handler);
}
}

void setMetrics(HttpServerMetrics<?, ?> httpMetrics) {
this.httpMetrics = httpMetrics;
}
}

@Override
Expand Down Expand Up @@ -228,9 +230,6 @@ public Future<HttpServer> listen(ContextInternal current, SocketAddress address)
requestHandler = this.requestHandler;
connectionHandler = this.connectionHandler;
quicServer = new QuicServerImpl(vertx, quicConfig, "http", sslOptions);
if (manageMetrics) {
httpMetrics = vertx.metrics() != null ? vertx.metrics().createHttpServerMetrics(config, null, address) : null;
}
}

if (requestHandler == null) {
Expand All @@ -240,14 +239,23 @@ public Future<HttpServer> listen(ContextInternal current, SocketAddress address)
boolean logEnabled = quicConfig.getLogConfig() != null && quicConfig.getLogConfig().isEnabled();
quicConfig.setLogConfig(null);

quicServer.connectHandler(new ConnectionHandler(quicServer, httpMetrics, requestHandler, connectionHandler,
internalConnectionHandler = new ConnectionHandler(quicServer, requestHandler, connectionHandler,
config.isHandle100ContinueAutomatically(), config.getMaxFormAttributeSize(), config.getMaxFormFields(), config.getMaxFormBufferedBytes(),
http3Config.getInitialSettings() != null ? http3Config.getInitialSettings().copy() : new Http3Settings(), logEnabled));
http3Config.getInitialSettings() != null ? http3Config.getInitialSettings().copy() : new Http3Settings(), logEnabled);

quicServer.connectHandler(internalConnectionHandler);
quicServer.exceptionHandler(exceptionHandler);
return quicServer
.bind(current, address)
.map(addr -> {
actualPort = addr.port();

if (manageMetrics) {
var actualAddress = SocketAddress.inetSocketAddress(actualPort, address.host());
httpMetrics = vertx.metrics() != null ? vertx.metrics().createHttpServerMetrics(config, null, actualAddress) : null;
internalConnectionHandler.setMetrics(httpMetrics);
}

return this;
});
}
Expand Down Expand Up @@ -281,6 +289,13 @@ public int actualPort() {
return actualPort;
}

@Override
public void setMetrics(HttpServerMetrics<?, ?> httpMetrics) {
this.httpMetrics = httpMetrics;

internalConnectionHandler.setMetrics(httpMetrics);
}

@Override
public Metrics getMetrics() {
return httpMetrics;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,15 +64,14 @@ public class TcpHttpServer implements HttpServerInternal {
private HttpServerMetrics<?, ?> httpMetrics;

public TcpHttpServer(VertxInternal vertx, HttpServerConfig config, ServerSSLOptions sslOptions,
SSLEngineOptions engineOptions, HttpServerMetrics<?, ?> httpMetrics, boolean registerWebSocketWriteHandlers) {
SSLEngineOptions engineOptions, boolean manageMetrics, boolean registerWebSocketWriteHandlers) {
this.vertx = vertx;
this.config = config;
this.ssl = sslOptions != null;
this.sslOptions = sslOptions;
this.engineOptions = engineOptions;
this.registerWebSocketWriteHandlers = registerWebSocketWriteHandlers;
this.httpMetrics = httpMetrics;
this.manageMetrics = httpMetrics == null;
this.manageMetrics = manageMetrics;
}

@Override
Expand Down Expand Up @@ -272,13 +271,16 @@ public synchronized Future<HttpServer> listen(ContextInternal context, SocketAdd
initializer.configurePipeline(soi.channel(), null, null, ((NetSocketImpl) so).metrics());
});
tcpServer = server;
if (manageMetrics) {
httpMetrics = vertx.metrics() != null ? vertx.metrics().createHttpServerMetrics(config, address, null) : null;
}
closeSequence = new CloseSequence(p -> doClose(server, p), p -> doShutdown(server, p ));

closeSequence = new CloseSequence(p -> doClose(server, p), p -> doShutdown(server, p));
Promise<HttpServer> result = context.promise();
tcpServer.listen(listenContext, address).onComplete(ar -> {
if (ar.succeeded()) {
if (manageMetrics) {
var actualAddress = SocketAddress.inetSocketAddress(tcpServer.actualPort(), address.host());
httpMetrics = vertx.metrics() != null ? vertx.metrics().createHttpServerMetrics(config, actualAddress, null) : null;
}

result.complete(this);
} else {
result.fail(ar.cause());
Expand Down Expand Up @@ -335,6 +337,11 @@ private Completable<Void> foo(Completable<Void> completable) {
return completable;
}

@Override
public void setMetrics(HttpServerMetrics<?, ?> httpMetrics) {
this.httpMetrics = httpMetrics;
}

@Override
public Future<Void> shutdown(Duration timeout) {
CloseSequence seq;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import io.vertx.core.http.HttpServer;
import io.vertx.core.internal.ContextInternal;
import io.vertx.core.net.SocketAddress;
import io.vertx.core.spi.metrics.HttpServerMetrics;
import io.vertx.core.spi.metrics.MetricsProvider;

/**
Expand All @@ -31,4 +32,6 @@ default HttpServerInternal unwrap() {
return this;
}

default void setMetrics(HttpServerMetrics<?, ?> httpMetrics) {}

}
Original file line number Diff line number Diff line change
Expand Up @@ -531,11 +531,13 @@ private void bind(
}
});
}
var actualLocalAddress = localAddress;
// Update port to actual port when it is not a domain socket as wildcard port 0 might have been used
if (bindAddress.isInetSocket()) {
actualPort = ((InetSocketAddress)ch.localAddress()).getPort();
actualLocalAddress = SocketAddress.inetSocketAddress(actualPort, localAddress.host());
}
metrics = createMetrics(vertx.metrics(), localAddress);
metrics = createMetrics(vertx.metrics(), actualLocalAddress);
promise.complete(ch);
} else {
promise.fail(res.cause());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package io.vertx.tests.metrics;

import io.vertx.core.Vertx;
import io.vertx.core.VertxOptions;
import io.vertx.core.http.HttpClientRequest;
import io.vertx.core.http.HttpServerOptions;
import io.vertx.core.http.RequestOptions;
import io.vertx.core.metrics.MetricsOptions;
import io.vertx.test.core.TestUtils;
import io.vertx.test.fakemetrics.FakeHttpServerMetrics;
import io.vertx.test.fakemetrics.FakeMetricsBase;
import io.vertx.test.fakemetrics.FakeMetricsFactory;
import io.vertx.test.http.HttpTestBase;
import org.junit.Test;

public class HttpMetricsPortTest extends HttpTestBase {

@Override
protected HttpServerOptions createBaseServerOptions() {
return new HttpServerOptions().setPort(0).setHost(DEFAULT_HTTP_HOST);
}

@Override
protected VertxOptions getOptions() {
VertxOptions options = super.getOptions();
options.setMetricsOptions(new MetricsOptions().setEnabled(true));
return options;
}

@Override
protected Vertx createVertx(VertxOptions options) {
return Vertx.builder().with(options)
.withMetrics(new FakeMetricsFactory())
.build();
}

@Test
public void actualPortInMetricsWhenDynamicPortIsUsed() throws Exception {
server.requestHandler(req -> {
FakeHttpServerMetrics metrics = FakeMetricsBase.httpMetricsOf(server);

assertEquals(server.actualPort(), metrics.tcpLocalAddress().port());

req.response().end();
testComplete();
});

startServer();

var requestOptions = new RequestOptions()
.setPort(server.actualPort());

client.request(new RequestOptions(requestOptions).setURI(TestUtils.randomAlphaString(16))).onComplete(onSuccess(HttpClientRequest::send));
await();
}

}
Loading