Skip to content

Commit

Permalink
Merge pull request #42409 from mkouba/issue-42366
Browse files Browse the repository at this point in the history
Vertx HTTP: execute custom logic when HTTP server is started
  • Loading branch information
geoand committed Aug 9, 2024
2 parents 48ecb86 + 8e2e6e1 commit 2c6d750
Show file tree
Hide file tree
Showing 6 changed files with 197 additions and 11 deletions.
21 changes: 21 additions & 0 deletions docs/src/main/asciidoc/http-reference.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -498,6 +498,27 @@ public class MyCustomizer implements HttpServerOptionsCustomizer {
<1> By making the class a managed bean, Quarkus will take the customizer into account when it starts the Vert.x servers
<2> In this case, we only care about customizing the HTTP server, so we just override the `customizeHttpServer` method, but users should be aware that `HttpServerOptionsCustomizer` allows configuring the HTTPS and Domain Socket servers as well


== How to execute logic when HTTP server started

In order to execute some custom action when the HTTP server is started you'll need to declare an _asynchronous_ CDI observer method.
Quarkus _asynchronously_ fires CDI events of types `io.quarkus.vertx.http.HttpServerStart`, `io.quarkus.vertx.http.HttpsServerStart` and `io.quarkus.vertx.http.DomainSocketServerStart` when the corresponding HTTP server starts listening on the configured host and port.

.`HttpServerStart` example
[source,java]
----
@ApplicationScoped
public class MyListener {
void httpStarted(@ObservesAsync HttpServerStart start) { <1>
// ...notified when the HTTP server starts listening
}
}
----
<1> An asynchronous `HttpServerStart` observer method may be declared by annotating an `HttpServerStart` parameter with `@jakarta.enterprise.event.ObservesAsync`.

NOTE: It's not possible to use the `StartupEvent` for this particular use case because this CDI event is fired before the HTTP server is started.

[[reverse-proxy]]
== Running behind a reverse proxy

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package io.quarkus.vertx.http.start;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.io.File;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import jakarta.annotation.PreDestroy;
import jakarta.enterprise.context.Dependent;
import jakarta.enterprise.event.ObservesAsync;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.quarkus.test.QuarkusUnitTest;
import io.quarkus.vertx.http.HttpServerStart;
import io.quarkus.vertx.http.HttpsServerStart;
import io.smallrye.certs.Format;
import io.smallrye.certs.junit5.Certificate;
import io.smallrye.certs.junit5.Certificates;

@Certificates(baseDir = "target/certs", certificates = @Certificate(name = "ssl-test", password = "secret", formats = {
Format.JKS, Format.PKCS12, Format.PEM }))
public class HttpServerStartEventsTest {

@RegisterExtension
static final QuarkusUnitTest config = new QuarkusUnitTest()
.withApplicationRoot(root -> root.addClasses(MyListener.class)
.addAsResource(new File("target/certs/ssl-test-keystore.jks"), "server-keystore.jks"))
.overrideConfigKey("quarkus.http.ssl.certificate.key-store-file", "server-keystore.jks")
.overrideConfigKey("quarkus.http.ssl.certificate.key-store-password", "secret");

@Test
public void test() throws InterruptedException {
assertTrue(MyListener.HTTP.await(5, TimeUnit.SECONDS));
assertTrue(MyListener.HTTPS.await(5, TimeUnit.SECONDS));
// httpsStarted() is static
assertEquals(1, MyListener.COUNTER.get());
}

@Dependent
public static class MyListener {

static final AtomicInteger COUNTER = new AtomicInteger();
static final CountDownLatch HTTP = new CountDownLatch(1);
static final CountDownLatch HTTPS = new CountDownLatch(1);

void httpStarted(@ObservesAsync HttpServerStart start) {
assertNotNull(start.options());
HTTP.countDown();
}

static void httpsStarted(@ObservesAsync HttpsServerStart start) {
assertNotNull(start.options());
HTTPS.countDown();
}

@PreDestroy
void destroy() {
COUNTER.incrementAndGet();
}

}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package io.quarkus.vertx.http;

import io.vertx.core.http.HttpServerOptions;

/**
* Quarkus fires a CDI event of this type asynchronously when the domain socket server starts listening
* on the configured host and port.
*
* <pre>
* &#064;ApplicationScoped
* public class MyListener {
*
* void domainSocketStarted(&#064;ObservesAsync DomainSocketServerStart start) {
* // ...notified when the domain socket server starts listening
* }
* }
* </pre>
*/
public record DomainSocketServerStart(HttpServerOptions options) {

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package io.quarkus.vertx.http;

import io.vertx.core.http.HttpServerOptions;

/**
* Quarkus fires a CDI event of this type asynchronously when the HTTP server starts listening
* on the configured host and port.
*
* <pre>
* &#064;ApplicationScoped
* public class MyListener {
*
* void httpStarted(&#064;ObservesAsync HttpServerStart start) {
* // ...notified when the HTTP server starts listening
* }
* }
* </pre>
*/
public record HttpServerStart(HttpServerOptions options) {

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package io.quarkus.vertx.http;

import io.vertx.core.http.HttpServerOptions;

/**
* Quarkus fires a CDI event of this type asynchronously when the HTTPS server starts listening
* on the configured host and port.
*
* <pre>
* &#064;ApplicationScoped
* public class MyListener {
*
* void httpsStarted(&#064;ObservesAsync HttpsServerStart start) {
* // ...notified when the HTTPS server starts listening
* }
* }
* </pre>
*/
public record HttpsServerStart(HttpServerOptions options) {

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
Expand All @@ -46,6 +47,7 @@
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.quarkus.arc.Arc;
import io.quarkus.arc.ArcContainer;
import io.quarkus.arc.InstanceHandle;
import io.quarkus.arc.runtime.BeanContainer;
import io.quarkus.bootstrap.runner.Timing;
Expand All @@ -71,7 +73,10 @@
import io.quarkus.tls.runtime.config.TlsConfig;
import io.quarkus.vertx.core.runtime.VertxCoreRecorder;
import io.quarkus.vertx.core.runtime.config.VertxConfiguration;
import io.quarkus.vertx.http.DomainSocketServerStart;
import io.quarkus.vertx.http.HttpServerOptionsCustomizer;
import io.quarkus.vertx.http.HttpServerStart;
import io.quarkus.vertx.http.HttpsServerStart;
import io.quarkus.vertx.http.ManagementInterface;
import io.quarkus.vertx.http.runtime.HttpConfiguration.InsecureRequests;
import io.quarkus.vertx.http.runtime.devmode.RemoteSyncHandler;
Expand Down Expand Up @@ -744,8 +749,9 @@ private static CompletableFuture<String> initializeMainHttpServer(Vertx vertx, H
launchMode, websocketSubProtocols, registry);

// Customize
if (Arc.container() != null) {
List<InstanceHandle<HttpServerOptionsCustomizer>> instances = Arc.container()
ArcContainer container = Arc.container();
if (container != null) {
List<InstanceHandle<HttpServerOptionsCustomizer>> instances = container
.listAll(HttpServerOptionsCustomizer.class);
for (InstanceHandle<HttpServerOptionsCustomizer> instance : instances) {
HttpServerOptionsCustomizer customizer = instance.get();
Expand Down Expand Up @@ -784,12 +790,17 @@ private static CompletableFuture<String> initializeMainHttpServer(Vertx vertx, H
CompletableFuture<String> futureResult = new CompletableFuture<>();

AtomicInteger connectionCount = new AtomicInteger();

// Note that a new HttpServer is created for each IO thread but we only want to fire the events (HttpServerStart etc.) once,
// for the first server that started listening
// See https://vertx.io/docs/vertx-core/java/#_server_sharing for more information
AtomicBoolean startEventsFired = new AtomicBoolean();

vertx.deployVerticle(new Supplier<Verticle>() {
@Override
public Verticle get() {
return new WebDeploymentVerticle(httpMainServerOptions, httpMainSslServerOptions, httpMainDomainSocketOptions,
launchMode,
insecureRequestStrategy, httpConfiguration, connectionCount, registry);
launchMode, insecureRequestStrategy, httpConfiguration, connectionCount, registry, startEventsFired);
}
}, new DeploymentOptions().setInstances(ioThreads), new Handler<AsyncResult<String>>() {
@Override
Expand Down Expand Up @@ -1129,11 +1140,12 @@ private static class WebDeploymentVerticle extends AbstractVerticle implements R
private final HttpConfiguration quarkusConfig;
private final AtomicInteger connectionCount;
private final List<Long> reloadingTasks = new CopyOnWriteArrayList<>();
private final AtomicBoolean startEventsFired;

public WebDeploymentVerticle(HttpServerOptions httpOptions, HttpServerOptions httpsOptions,
HttpServerOptions domainSocketOptions, LaunchMode launchMode,
InsecureRequests insecureRequests, HttpConfiguration quarkusConfig, AtomicInteger connectionCount,
TlsConfigurationRegistry registry) {
TlsConfigurationRegistry registry, AtomicBoolean startEventsFired) {
this.httpOptions = httpOptions;
this.httpsOptions = httpsOptions;
this.launchMode = launchMode;
Expand All @@ -1142,6 +1154,7 @@ public WebDeploymentVerticle(HttpServerOptions httpOptions, HttpServerOptions ht
this.quarkusConfig = quarkusConfig;
this.connectionCount = connectionCount;
this.registry = registry;
this.startEventsFired = startEventsFired;
org.crac.Core.getGlobalContext().register(this);
}

Expand All @@ -1166,6 +1179,9 @@ public void start(Promise<Void> startFuture) {
.fail(new IllegalArgumentException("Must configure at least one of http, https or unix domain socket"));
}

ArcContainer container = Arc.container();
boolean notifyStartObservers = container != null ? startEventsFired.compareAndSet(false, true) : false;

if (httpServerEnabled) {
httpServer = vertx.createHttpServer(httpOptions);
if (insecureRequests == HttpConfiguration.InsecureRequests.ENABLED) {
Expand Down Expand Up @@ -1196,27 +1212,34 @@ public void handle(HttpServerRequest req) {
}
});
}
setupTcpHttpServer(httpServer, httpOptions, false, startFuture, remainingCount, connectionCount);
setupTcpHttpServer(httpServer, httpOptions, false, startFuture, remainingCount, connectionCount,
container, notifyStartObservers);
}

if (domainSocketOptions != null) {
domainSocketServer = vertx.createHttpServer(domainSocketOptions);
domainSocketServer.requestHandler(ACTUAL_ROOT);
setupUnixDomainSocketHttpServer(domainSocketServer, domainSocketOptions, startFuture, remainingCount);
setupUnixDomainSocketHttpServer(domainSocketServer, domainSocketOptions, startFuture, remainingCount,
container, notifyStartObservers);
}

if (httpsOptions != null) {
httpsServer = vertx.createHttpServer(httpsOptions);
httpsServer.requestHandler(ACTUAL_ROOT);
setupTcpHttpServer(httpsServer, httpsOptions, true, startFuture, remainingCount, connectionCount);
setupTcpHttpServer(httpsServer, httpsOptions, true, startFuture, remainingCount, connectionCount,
container, notifyStartObservers);
}
}

private void setupUnixDomainSocketHttpServer(HttpServer httpServer, HttpServerOptions options,
Promise<Void> startFuture,
AtomicInteger remainingCount) {
AtomicInteger remainingCount, ArcContainer container, boolean notifyStartObservers) {
httpServer.listen(SocketAddress.domainSocketAddress(options.getHost()), event -> {
if (event.succeeded()) {
if (notifyStartObservers) {
container.beanManager().getEvent().select(DomainSocketServerStart.class)
.fireAsync(new DomainSocketServerStart(options));
}
if (remainingCount.decrementAndGet() == 0) {
startFuture.complete(null);
}
Expand All @@ -1240,7 +1263,8 @@ private void setupUnixDomainSocketHttpServer(HttpServer httpServer, HttpServerOp
}

private void setupTcpHttpServer(HttpServer httpServer, HttpServerOptions options, boolean https,
Promise<Void> startFuture, AtomicInteger remainingCount, AtomicInteger currentConnectionCount) {
Promise<Void> startFuture, AtomicInteger remainingCount, AtomicInteger currentConnectionCount,
ArcContainer container, boolean notifyStartObservers) {

if (quarkusConfig.limits.maxConnections.isPresent() && quarkusConfig.limits.maxConnections.getAsInt() > 0) {
var tracker = vertx.isMetricsEnabled()
Expand Down Expand Up @@ -1315,11 +1339,20 @@ public void handle(AsyncResult<HttpServer> event) {
}

if (https) {
CDI.current().select(HttpCertificateUpdateEventListener.class).get()
container.instance(HttpCertificateUpdateEventListener.class).get()
.register(event.result(), quarkusConfig.tlsConfigurationName.orElse(TlsConfig.DEFAULT_NAME),
"http server");
}

if (notifyStartObservers) {
Event<Object> startEvent = container.beanManager().getEvent();
if (https) {
startEvent.select(HttpsServerStart.class).fireAsync(new HttpsServerStart(options));
} else {
startEvent.select(HttpServerStart.class).fireAsync(new HttpServerStart(options));
}
}

if (remainingCount.decrementAndGet() == 0) {
//make sure we only complete once
startFuture.complete(null);
Expand Down

0 comments on commit 2c6d750

Please sign in to comment.