Guia de referência do Vert.x
Vert.x is a toolkit for building reactive applications. As described in the Quarkus Reactive Architecture, Quarkus uses Vert.x underneath.
This guide is the companion to the Using Eclipse Vert.x API from a Quarkus Application guide. It provides more advanced details about the usage and the configuration of the Vert.x instance used by Quarkus.
Access the Vert.x instance
To access the managed Vert.x instance, add the quarkus-vertx
extension to your project.
This dependency might already be available in your project (as a transitive dependency).
With this extension, you can retrieve the managed instance of Vert.x using either field or constructor injection:
@ApplicationScoped
public class MyBean {
// Field injection
@Inject Vertx vertx;
// Constructor injection
MyBean(Vertx vertx) {
// ...
}
}
You can inject either the:
-
io.vertx.core.Vertx
instance exposing the bare Vert.x API -
io.vertx.mutiny.core.Vertx
instance exposing the Mutiny API
We recommend using the Mutiny variant as it integrates with the other reactive APIs provided by Quarkus.
Mutiny
If you are not familiar with Mutiny, check Mutiny - an intuitive reactive programming library. |
Documentation about the Vert.x Mutiny variant is available on https://smallrye.io/smallrye-mutiny-vertx-bindings.
Configure the Vert.x instance
You can configure the Vert.x instance from the application.properties
file.
The following table lists the supported properties:
Propriedade de Configuração Fixa no Momento da Compilação - Todas as outras propriedades de configuração podem ser sobrepostas em tempo de execução.
Configuration property |
Tipo |
Padrão |
---|---|---|
Enables or disables the Vert.x cache. Environment variable: Show more |
boolean |
|
Configure the file cache directory. When not set, the cache is stored in the system temporary directory (read from the Note that this property is ignored if the Environment variable: Show more |
string |
|
Enables or disabled the Vert.x classpath resource resolver. Environment variable: Show more |
boolean |
|
The number of event loops. By default, it matches the number of CPUs detected on the system. Environment variable: Show more |
int |
|
The maximum amount of time the event loop can be blocked. Environment variable: Show more |
|
|
The amount of time before a warning is displayed if the event loop is blocked. Environment variable: Show more |
|
|
The maximum amount of time the worker thread can be blocked. Environment variable: Show more |
|
|
The size of the internal thread pool (used for the file system). Environment variable: Show more |
int |
|
The queue size. For most applications this should be unbounded Environment variable: Show more |
int |
|
The executor growth resistance. A resistance factor applied after the core pool is full; values applied here will cause that fraction of submissions to create new threads when no idle thread is available. A value of Environment variable: Show more |
float |
|
The amount of time a thread will stay alive with no work. Environment variable: Show more |
|
|
Prefill thread pool when creating a new Executor. When Environment variable: Show more |
boolean |
|
Enables the async DNS resolver. Environment variable: Show more |
boolean |
|
PEM Key/cert config is disabled by default. Environment variable: Show more |
boolean |
|
Comma-separated list of the path to the key files (Pem format). Environment variable: Show more |
list of string |
|
Comma-separated list of the path to the certificate files (Pem format). Environment variable: Show more |
list of string |
|
JKS config is disabled by default. Environment variable: Show more |
boolean |
|
Path of the key file (JKS format). Environment variable: Show more |
string |
|
Password of the key file. Environment variable: Show more |
string |
|
PFX config is disabled by default. Environment variable: Show more |
boolean |
|
Path to the key file (PFX format). Environment variable: Show more |
string |
|
Password of the key. Environment variable: Show more |
string |
|
PEM Trust config is disabled by default. Environment variable: Show more |
boolean |
|
Comma-separated list of the trust certificate files (Pem format). Environment variable: Show more |
list of string |
|
JKS config is disabled by default. Environment variable: Show more |
boolean |
|
Path of the key file (JKS format). Environment variable: Show more |
string |
|
Password of the key file. Environment variable: Show more |
string |
|
PFX config is disabled by default. Environment variable: Show more |
boolean |
|
Path to the key file (PFX format). Environment variable: Show more |
string |
|
Password of the key. Environment variable: Show more |
string |
|
The accept backlog. Environment variable: Show more |
int |
|
The client authentication. Environment variable: Show more |
string |
|
The connect timeout. Environment variable: Show more |
|
|
The idle timeout in milliseconds. Environment variable: Show more |
||
The receive buffer size. Environment variable: Show more |
int |
|
The number of reconnection attempts. Environment variable: Show more |
int |
|
The reconnection interval in milliseconds. Environment variable: Show more |
|
|
Whether to reuse the address. Environment variable: Show more |
boolean |
|
Whether to reuse the port. Environment variable: Show more |
boolean |
|
The send buffer size. Environment variable: Show more |
int |
|
The so linger. Environment variable: Show more |
int |
|
Enables or Disabled SSL. Environment variable: Show more |
boolean |
|
Whether to keep the TCP connection opened (keep-alive). Environment variable: Show more |
boolean |
|
Configure the TCP no delay. Environment variable: Show more |
boolean |
|
Configure the traffic class. Environment variable: Show more |
int |
|
Enables or disables the trust all parameter. Environment variable: Show more |
boolean |
|
The host name. Environment variable: Show more |
string |
|
int |
||
The public host name. Environment variable: Show more |
string |
|
The public port. Environment variable: Show more |
int |
|
Enables or disables the clustering. Environment variable: Show more |
boolean |
|
The ping interval. Environment variable: Show more |
|
|
The ping reply interval. Environment variable: Show more |
|
|
The maximum amount of time in seconds that a successfully resolved address will be cached. If not set explicitly, resolved addresses may be cached forever. Environment variable: Show more |
int |
|
The minimum amount of time in seconds that a successfully resolved address will be cached. Environment variable: Show more |
int |
|
The amount of time in seconds that an unsuccessful attempt to resolve an address will be cached. Environment variable: Show more |
int |
|
The maximum number of queries to be sent during a resolution. Environment variable: Show more |
int |
|
The duration after which a DNS query is considered to be failed. Environment variable: Show more |
|
|
Set the path of an alternate hosts configuration file to use instead of the one provided by the os. The default value is Environment variable: Show more |
string |
|
Set the hosts configuration refresh period in millis, The resolver caches the hosts configuration (configured using Environment variable: Show more |
int |
|
Set the list of DNS server addresses, an address is the IP of the dns server, followed by an optional colon and a port, e.g Environment variable: Show more |
list of string |
|
Set to true to enable the automatic inclusion in DNS queries of an optional record that hints the remote DNS server about how much data the resolver can read per response. Environment variable: Show more |
boolean |
|
Set the DNS queries Recursion Desired flag value. Environment variable: Show more |
boolean |
|
Set the lists of DNS search domains. When the search domain list is null, the effective search domain list will be populated using the system DNS search domains. Environment variable: Show more |
list of string |
|
Set the ndots value used when resolving using search domains, the default value is Environment variable: Show more |
int |
|
Set to Environment variable: Show more |
boolean |
|
Set to Environment variable: Show more |
boolean |
|
Enable or disable native transport Environment variable: Show more |
boolean |
|
About the Duration format
To write duration values, use the standard Você também pode usar um formato simplificado, começando com um número:
Em outros casos, o formato simplificado é traduzido para o formato 'java.time.Duration' para análise:
|
See Customize the Vert.x configuration to configure the Vert.x instance using a programmatic approach.
Use Vert.x clients
In addition to Vert.x core, you can use most Vert.x ecosystem libraries. Some Quarkus extension already wraps Vert.x libraries.
Available APIs
The following table lists the most used libraries from the Vert.x ecosystem. To access these APIs, add the indicated extension or dependency to your project. Check the associated documentation to learn how to use them.
API |
Extension or Dependency |
Documentação |
AMQP Client |
|
|
Circuit Breaker |
|
|
Consul Client |
|
|
DB2 Client |
|
|
Kafka Client |
|
|
Mail Client |
|
|
MQTT Client |
|
No guide yet |
MS SQL Client |
|
|
MySQL Client |
|
|
Oracle Client |
|
|
PostgreSQL Client |
|
|
RabbitMQ Client |
|
|
Cliente Redis |
|
|
Web Client |
|
To learn more about the usage of the Vert.x Mutiny API, refer to https://smallrye.io/smallrye-mutiny-vertx-bindings.
Use the Vert.x Web Client
This section gives an example using the Vert.x WebClient
in the context of a Quarkus REST (formerly RESTEasy Reactive) application.
As indicated in the table above, add the following dependency to your project:
<dependency>
<groupId>io.smallrye.reactive</groupId>
<artifactId>smallrye-mutiny-vertx-web-client</artifactId>
</dependency>
implementation("io.smallrye.reactive:smallrye-mutiny-vertx-web-client")
Now, in your code, you can create an instance of WebClient
:
package org.acme.vertx;
import jakarta.annotation.PostConstruct;
import jakarta.inject.Inject;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;
import io.smallrye.mutiny.Uni;
import io.vertx.mutiny.core.Vertx;
import io.vertx.mutiny.ext.web.client.WebClient;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.client.WebClientOptions;
@Path("/fruit-data")
public class ResourceUsingWebClient {
private final WebClient client;
@Inject
VertxResource(Vertx vertx) {
this.client = WebClient.create(vertx);
}
@GET
@Produces(MediaType.APPLICATION_JSON)
@Path("/{name}")
public Uni<JsonObject> getFruitData(String name) {
return client.getAbs("https://.../api/fruit/" + name)
.send()
.onItem().transform(resp -> {
if (resp.statusCode() == 200) {
return resp.bodyAsJsonObject();
} else {
return new JsonObject()
.put("code", resp.statusCode())
.put("message", resp.bodyAsString());
}
});
}
}
This resource creates a WebClient
and, upon request, uses this client to invoke a remote HTTP API.
Depending on the result, the response is forwarded as received, or it creates a JSON object wrapping the error.
The WebClient
is asynchronous (and non-blocking), to the endpoint returns a Uni
.
The application can also run as a native executable.
But, first, we need to instruct Quarkus to enable ssl (if the remote API uses HTTPS).
Open the src/main/resources/application.properties
and add:
quarkus.ssl.native=true
Then, create the native executable with:
quarkus build --native
./mvnw install -Dnative
./gradlew build -Dquarkus.native.enabled=true
Use Vert.x JSON
Vert.x APIs often rely on JSON.
Vert.x provides two convenient classes to manipulate JSON document: io.vertx.core.json.JsonObject
and io.vertx.core.json.JsonArray
.
JsonObject
can be used to map an object into its JSON representation and build an object from a JSON document:
// Map an object into JSON
Person person = ...;
JsonObject json = JsonObject.mapFrom(person);
// Build an object from JSON
json = new JsonObject();
person = json.mapTo(Person.class);
Note that these features use the mapper managed by the quarkus-jackson
extension.
Refer to Jackson configuration to customize the mapping.
JSON Object and JSON Array are both supported as Quarkus HTTP endpoint requests and response bodies (using classic RESTEasy and Quarkus REST). Consider these endpoints:
package org.acme.vertx;
import io.vertx.core.json.JsonObject;
import io.vertx.core.json.JsonArray;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;
@Path("/hello")
@Produces(MediaType.APPLICATION_JSON)
public class VertxJsonResource {
@GET
@Path("{name}/object")
public JsonObject jsonObject(String name) {
return new JsonObject().put("Hello", name);
}
@GET
@Path("{name}/array")
public JsonArray jsonArray(String name) {
return new JsonArray().add("Hello").add(name);
}
}
{"Hello":"Quarkus"}
["Hello","Quarkus"]
This works equally well when the JSON content is a request body or is wrapped in a Uni
, Multi
, CompletionStage
or Publisher
.
Use Verticles
Verticles is "a simple, scalable, actor-like deployment and concurrency model" provided by _Vert.x. This model does not claim to be a strict actor-model implementation, but it shares similarities, especially concerning concurrency, scaling, and deployment. To use this model, you write and deploy verticles, communicating by sending messages on the event bus.
You can deploy verticles in Quarkus. It supports:
-
bare verticle - Java classes extending
io.vertx.core.AbstractVerticle
-
Mutiny verticle - Java classes extending
io.smallrye.mutiny.vertx.core.AbstractVerticle
Deploy Verticles
To deploy verticles, use the deployVerticle
method:
@Inject Vertx vertx;
// ...
vertx.deployVerticle(MyVerticle.class.getName(), ar -> { });
vertx.deployVerticle(new MyVerticle(), ar -> { });
If you use the Mutiny-variant of Vert.x, be aware that the deployVerticle
method returns a Uni
, and you would need to trigger a subscription to make the actual deployment.
An example explaining how to deploy verticles during the initialization of the application will follow. |
Use @ApplicationScoped beans as Verticle
In general, Vert.x verticles are not CDI beans. And so cannot use injection. However, in Quarkus, you can deploy verticles as beans. Note that in this case, CDI (Arc in Quarkus) is responsible for creating the instance.
The following snippet provides an example:
package io.quarkus.vertx.verticles;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.vertx.core.AbstractVerticle;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import jakarta.enterprise.context.ApplicationScoped;
@ApplicationScoped
public class MyBeanVerticle extends AbstractVerticle {
@ConfigProperty(name = "address") String address;
@Override
public Uni<Void> asyncStart() {
return vertx.eventBus().consumer(address)
.handler(m -> m.replyAndForget("hello"))
.completionHandler();
}
}
You don’t have to inject the vertx
instance; instead, leverage the protected field from AbstractVerticle
.
Then, deploy the verticle instances with:
package io.quarkus.vertx.verticles;
import io.quarkus.runtime.StartupEvent;
import io.vertx.mutiny.core.Vertx;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
@ApplicationScoped
public class VerticleDeployer {
public void init(@Observes StartupEvent e, Vertx vertx, MyBeanVerticle verticle) {
vertx.deployVerticle(verticle).await().indefinitely();
}
}
If you want to deploy every exposed AbstractVerticle
, you can use:
public void init(@Observes StartupEvent e, Vertx vertx, Instance<AbstractVerticle> verticles) {
for (AbstractVerticle verticle : verticles) {
vertx.deployVerticle(verticle).await().indefinitely();
}
}
Create multiple verticles instances
When using @ApplicationScoped
, you will get a single instance for your verticle.
Having multiple instances of verticles can be helpful to share the load among them.
Each of them will be associated with a different I/O thread (Vert.x event loop).
To deploy multiple instances of your verticle, use the @Dependent
scope instead of @ApplicationScoped
:
package org.acme.verticle;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.vertx.core.AbstractVerticle;
import jakarta.enterprise.context.Dependent;
import jakarta.inject.Inject;
@Dependent
public class MyVerticle extends AbstractVerticle {
@Override
public Uni<Void> asyncStart() {
return vertx.eventBus().consumer("address")
.handler(m -> m.reply("Hello from " + this))
.completionHandler();
}
}
Then, deploy your verticle as follows:
package org.acme.verticle;
import io.quarkus.runtime.StartupEvent;
import io.vertx.core.DeploymentOptions;
import io.vertx.mutiny.core.Vertx;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
import jakarta.enterprise.inject.Instance;
import jakarta.inject.Inject;
@ApplicationScoped
public class MyApp {
void init(@Observes StartupEvent ev, Vertx vertx, Instance<MyVerticle> verticles) {
vertx
.deployVerticle(verticles::get, new DeploymentOptions().setInstances(2))
.await().indefinitely();
}
}
The init
method receives an Instance<MyVerticle>
.
Then, you pass a supplier to the deployVerticle
method.
The supplier is just calling the get()
method.
Thanks to the @Dependent
scope, it returns a new instance on every call.
Finally, you pass the desired number of instances to the DeploymentOptions
, such as two in the previous example.
It will call the supplier twice, which will create two instances of your verticle.
Use the Event Bus
Vert.x comes with a built-in event bus that you can use from your Quarkus application. So, your application components (CDI beans, resources…) can interact using asynchronous events, thus promoting loose-coupling.
With the event bus, you send messages to virtual addresses. The event bus offers three types of delivery mechanisms:
-
point-to-point - send the message, one consumer receives it. If several consumers listen to the address, a round-robin is applied;
-
publish/subscribe - publish a message; all the consumers listening to the address are receiving the message;
-
request/reply - send the message and expect a response. The receiver can respond to the message in an asynchronous fashion.
All these delivery mechanisms are non-blocking and are providing one of the fundamental bricks to build reactive applications.
Consume events
While you can use the Vert.x API to register consumers, Quarkus comes with declarative support.
To consume events, use the io.quarkus.vertx.ConsumeEvent
annotation:
package org.acme.vertx;
import io.quarkus.vertx.ConsumeEvent;
import jakarta.enterprise.context.ApplicationScoped;
@ApplicationScoped
public class GreetingService {
@ConsumeEvent (1)
public String consume(String name) { (2)
return name.toUpperCase();
}
}
1 | If not set, the address is the fully qualified name of the bean; for instance, in this snippet, it’s org.acme.vertx.GreetingService . |
2 | The method parameter is the message body. If the method returns something, it’s the message response. |
Configure the address
The @ConsumeEvent
annotation can be configured to set the address:
@ConsumeEvent("greeting") (1)
public String consume(String name) {
return name.toUpperCase();
}
1 | Receive the messages sent to the greeting address |
The address value can be a property expression.
In this case, the configured value is used instead: @ConsumeEvent("${my.consumer.address}")
.
Additionally, the property expression can specify a default value: @ConsumeEvent("${my.consumer.address:defaultAddress}")
.
@ConsumeEvent("${my.consumer.address}") (1)
public String consume(String name) {
return name.toLowerCase();
}
1 | Receive the messages sent to the address configured with the my.consumer.address key. |
If no config property with the specified key exists and no default value is set then the application startup fails. |
Process events asynchronously
The previous examples use synchronous processing.
Asynchronous processing is also possible by returning either an io.smallrye.mutiny.Uni
or a java.util.concurrent.CompletionStage
:
package org.acme.vertx;
import io.quarkus.vertx.ConsumeEvent;
import jakarta.enterprise.context.ApplicationScoped;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import io.smallrye.mutiny.Uni;
@ApplicationScoped
public class GreetingService {
@ConsumeEvent
public Uni<String> process(String name) {
// return an Uni completed when the processing is finished.
// You can also fail the Uni explicitly
}
}
Mutiny
The previous example uses Mutiny reactive types. If you are not familiar with Mutiny, check Mutiny - an intuitive reactive programming library. |
Blocking processing of events
By default, the code consuming the event must be non-blocking, as it’s called on an I/O thread.
If your processing is blocking, use the @io.smallrye.common.annotation.Blocking
annotation:
@ConsumeEvent(value = "blocking-consumer")
@Blocking
void consumeBlocking(String message) {
// Something blocking
}
Alternatively, you can use the blocking
attribute from the @ConsumeEvent
annotation:
@ConsumeEvent(value = "blocking-consumer", blocking = true)
void consumeBlocking(String message) {
// Something blocking
}
When using @Blocking
, it ignores the value of the blocking
attribute of @ConsumeEvent
.
Reply to events
The return value of a method annotated with @ConsumeEvent
is used to respond to the incoming message.
For instance, in the following snippet, the returned String
is the response.
@ConsumeEvent("greeting")
public String consume(String name) {
return name.toUpperCase();
}
You can also return a Uni<T>
or a CompletionStage<T>
to handle asynchronous reply:
@ConsumeEvent("greeting")
public Uni<String> consume2(String name) {
return Uni.createFrom().item(() -> name.toUpperCase()).emitOn(executor);
}
You can inject an
|
Implement fire-and-forget interactions
You don’t have to reply to received messages.
Typically, for a fire and forget interaction, the messages are consumed, and the sender does not need to know about it.
To implement this pattern, your consumer method returns void
.
@ConsumeEvent("greeting")
public void consume(String event) {
// Do something with the event
}
Consume messages (instead of events)
Unlike the previous example using the payloads directly, you can also use Message
directly:
@ConsumeEvent("greeting")
public void consume(Message<String> msg) {
System.out.println(msg.address());
System.out.println(msg.body());
}
Handle failures
If a method annotated with @ConsumeEvent
throws an exception, then:
-
if a reply handler is set, then the failure is propagated back to the sender via an
io.vertx.core.eventbus.ReplyException
with codeConsumeEvent#FAILURE_CODE
and the exception message, -
if no reply handler is set, then the exception is rethrown (and wrapped in a
RuntimeException
if necessary) and can be handled by the default exception handler, i.e.io.vertx.core.Vertx#exceptionHandler()
.
Send messages
Sending and publishing messages use the Vert.x event bus:
package org.acme.vertx;
import io.smallrye.mutiny.Uni;
import io.vertx.mutiny.core.eventbus.EventBus;
import io.vertx.mutiny.core.eventbus.Message;
import jakarta.inject.Inject;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;
@Path("/async")
public class EventResource {
@Inject
EventBus bus; (1)
@GET
@Produces(MediaType.TEXT_PLAIN)
@Path("{name}")
public Uni<String> greeting(String name) {
return bus.<String>request("greeting", name) (2)
.onItem().transform(Message::body);
}
}
1 | Inject the Event bus |
2 | Send a message to the address greeting . Message payload is name |
The EventBus
object provides methods to:
-
send
a message to a specific address - one single consumer receives the message. -
publish
a message to a specific address - all consumers receive the messages. -
request
a message and expect a reply
// Case 1
bus.sendAndForget("greeting", name)
// Case 2
bus.publish("greeting", name)
// Case 3
Uni<String> response = bus.<String>request("address", "hello, how are you?")
.onItem().transform(Message::body);
Process events on virtual threads
Methods annotated with @ConsumeEvent
can also be annotated with @RunOnVirtualThread
.
In this case, the method is invoked on a virtual thread.
Each event is invoked on a different virtual thread.
To use this feature, make sure:
-
Your Java runtime supports virtual threads.
-
Your method uses a blocking signature.
The second point means only methods returning an object or void
can use @RunOnVirtualThread
.
Methods returning a Uni
or a CompletionStage
cannot run on virtual threads.
Read the virtual thread guide for more details.
Use codecs
The https://vertx.io/docs/vertx-core/java/event_bus[Vert.x Event Bus] uses https://vertx.io/docs/vertx-core/java/message_codecs[codecs] to _serialize and deserialize message objects.
Quarkus provides a default codec for local delivery.
This codec is automatically used for return types and message body parameters of local consumers, i.e. methods annotated with @ConsumeEvent
where ConsumeEvent#local() == true
(which is the default).
So that you can exchange the message objects as follows:
@GET
@Produces(MediaType.TEXT_PLAIN)
@Path("{name}")
public Uni<String> greeting(String name) {
return bus.<String>request("greeting", new MyName(name))
.onItem().transform(Message::body);
}
@ConsumeEvent(value = "greeting")
Uni<String> greeting(MyName name) {
return Uni.createFrom().item(() -> "Hello " + name.getName());
}
If you want to use a specific codec, you need to set it on both ends explicitly:
@GET
@Produces(MediaType.TEXT_PLAIN)
@Path("{name}")
public Uni<String> greeting(String name) {
return bus.<String>request("greeting", name,
new DeliveryOptions().setCodecName(MyNameCodec.class.getName())) (1)
.onItem().transform(Message::body);
}
@ConsumeEvent(value = "greeting", codec = MyNameCodec.class) (2)
Uni<String> greeting(MyName name) {
return Uni.createFrom().item(() -> "Hello "+name.getName());
}
1 | Set the name of the codec to use to send the message |
2 | Set the codec to use to receive the message |
Combine HTTP and the Event Bus
Let’s revisit a greeting HTTP endpoint and use asynchronous message passing to delegate the call to a separated bean. It uses the request/reply dispatching mechanism. Instead of implementing the business logic inside the Jakarta REST endpoint, we are sending a message. Another bean consumes this message, and the response is sent using the reply mechanism.
In your HTTP endpoint class, inject the event bus and uses the request
method to send a message to the event bus and expect a response:
package org.acme.vertx;
import io.smallrye.mutiny.Uni;
import io.vertx.mutiny.core.eventbus.EventBus;
import io.vertx.mutiny.core.eventbus.Message;
import jakarta.inject.Inject;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;
@Path("/bus")
public class EventResource {
@Inject
EventBus bus;
@GET
@Produces(MediaType.TEXT_PLAIN)
@Path("{name}")
public Uni<String> greeting(String name) {
return bus.<String>request("greeting", name) (1)
.onItem().transform(Message::body); (2)
}
}
1 | send the name to the greeting address and request a response |
2 | when we get the response, extract the body and send it to the user |
the HTTP method returns a Uni .
If you are using Quarkus REST, Uni support is built-in.
If you are using classic RESTEasy, you need to add the quarkus resteasy-mutiny extension to your project.
|
We need a consumer listening on the greeting
address.
This consumer can be in the same class or another bean such as:
package org.acme.vertx;
import io.quarkus.vertx.ConsumeEvent;
import jakarta.enterprise.context.ApplicationScoped;
@ApplicationScoped
public class GreetingService {
@ConsumeEvent("greeting")
public String greeting(String name) {
return "Hello " + name;
}
}
This bean receives the name and returns the greeting message.
With this in place, every HTTP request on /bus/quarkus
sends a message to the event bus, waits for a reply, and when this one arrives, writes the HTTP response:
Hello Quarkus
To better understand, let’s detail how the HTTP request/response has been handled:
-
The request is received by the
greeting
method -
a message containing the name is sent to the event bus
-
Another bean receives this message and computes the response
-
This response is sent back using the reply mechanism
-
Once the reply is received by the sender, the content is written to the HTTP response
Bidirectional communication with browsers by using SockJS
The SockJS bridge provided by Vert.x allows browser applications and Quarkus applications to communicate using the event bus. It connects both sides. So, both sides can send messages received on the other side. It supports the three delivery mechanisms.
SockJS negotiates the communication channel between the Quarkus application and the browser. If WebSockets are supported, it uses them; otherwise, it degrades to SSE, long polling, etc.
So use SockJS, you need to configure the bridge, especially the addresses that will be used to communicate:
package org.acme;
import io.vertx.core.Vertx;
import io.vertx.ext.bridge.PermittedOptions;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.handler.sockjs.SockJSBridgeOptions;
import io.vertx.ext.web.handler.sockjs.SockJSHandler;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
import jakarta.inject.Inject;
import java.util.concurrent.atomic.AtomicInteger;
@ApplicationScoped
public class SockJsExample {
@Inject
Vertx vertx;
public void init(@Observes Router router) {
SockJSHandler sockJSHandler = SockJSHandler.create(vertx);
Router bridge = sockJSHandler.bridge(new SockJSBridgeOptions()
.addOutboundPermitted(new PermittedOptions().setAddress("ticks")));
router.route("/eventbus/*").subRouter(bridge);
AtomicInteger counter = new AtomicInteger();
vertx.setPeriodic(1000,
ignored -> vertx.eventBus().publish("ticks", counter.getAndIncrement()));
}
}
This code configures the SockJS bridge to send all the messages targeting the ticks
address to the connected browsers.
More detailed explanations about the configuration can be found on the Vert.x SockJS Bridge documentation.
The browser must use the vertx-eventbus
JavaScript library to consume the message:
<!doctype html>
<html>
<head>
<meta charset="utf-8"/>
<title>SockJS example - Quarkus</title>
<script src="https://code.jquery.com/jquery-3.3.1.min.js"
integrity="sha256-FgpCb/KJQlLNfOu91ta32o/NMZxltwRo8QtmkMRdAu8=" crossorigin="anonymous"></script>
<script type="application/javascript" src="https://cdn.jsdelivr.net/sockjs/0.3.4/sockjs.min.js"></script>
<script src="https://cdn.jsdelivr.net/npm/vertx3-eventbus-client@3.8.5/vertx-eventbus.min.js"></script>
</head>
<body>
<h1>SockJS Examples</h1>
<p><strong>Last Tick:</strong> <span id="tick"></span></p>
</body>
<script>
var eb = new EventBus('/eventbus');
eb.onopen = function () {
eb.registerHandler('ticks', function (error, message) {
$("#tick").html(message.body);
});
}
</script>
</html>
Use native transports
Native transports are not supported in native executables. |
To use io_uring , refer to the Use io_uring section.
|
Vert.x is capable of using Netty’s native transports, which offers performance improvements on specific platforms. To enable them, you must include the appropriate dependency for your platform. It’s usually a good idea to have both to keep your application platform-agnostic. Netty is smart enough to use the correct one, that includes none at all on unsupported platforms:
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport-native-epoll</artifactId>
<classifier>linux-x86_64</classifier>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport-native-kqueue</artifactId>
<classifier>osx-x86_64</classifier>
</dependency>
implementation("io.netty:netty-transport-native-epoll::linux-x86_64")
implementation("io.netty:netty-transport-native-kqueue::osx-x86_64")
You will also have to explicitly configure Vert.x to use the native transport.
In application.properties
add:
quarkus.vertx.prefer-native-transport=true
Or in application.yml
:
quarkus:
vertx:
prefer-native-transport: true
If all is well quarkus will log:
[io.qua.ver.cor.run.VertxCoreRecorder] (main) Vertx has Native Transport Enabled: true
Use a Vert.x context-aware scheduler
Some Mutiny operators need to schedule work on an executor thread pool.
A good example is .onItem().delayIt().by(Duration.ofMillis(10)
as it needs such an executor to delay emissions.
The default executor is returned by io.smallrye.mutiny.infrastructure.Infrastructure
and it is already configured and managed by Quarkus.
That being said, there are cases where you need to make sure that an operation is run on a Vert.x (duplicated) context and not just on any random thread.
The io.smallrye.mutiny.vertx.core.ContextAwareScheduler
interface offers an API to obtain context-aware schedulers.
Such a scheduler is configured with:
-
a delegate
ScheduledExecutorService
of your choice (hint: you can reuseInfrastructure.getDefaultWorkerPool()
), and -
a context fetching strategy among:
-
an explicit
Context
, or -
calling
Vertx::getOrCreateContext()
either on the current thread or later when the scheduling request happens, or -
calling
Vertx::currentContext()
, which fails if the current thread is not a Vert.x thread.
-
Here is a sample where ContextAwareScheduler
is used:
class MyVerticle extends AbstractVerticle {
@Override
public Uni<Void> asyncStart() {
vertx.getOrCreateContext().put("foo", "bar");
var delegate = Infrastructure.getDefaultWorkerPool();
var scheduler = ContextAwareScheduler.delegatingTo(delegate)
.withCurrentContext();
return Uni.createFrom().voidItem()
.onItem().delayIt().onExecutor(scheduler).by(Duration.ofMillis(10))
.onItem().invoke(() -> {
// Prints "bar"
var ctx = vertx.getOrCreateContext();
System.out.println(ctx.get("foo"));
});
}
}
In this example a scheduler is created by capturing the context of the Vert.x event-loop that calls asyncStart()
.
The delayIt
operator uses that scheduler, and we can check that the context that we get in invoke
is a Vert.x duplicated context where the data for key "foo"
has been propagated.
Use a Unix domain socket
Listening on a Unix domain socket allows us to dispense with the overhead of TCP if the connection to the quarkus service is established from the same host. This can happen if access to the service goes through a proxy which is often the case if you’re setting up a service mesh with a proxy like Envoy.
This will only work on platforms that support Use native transports. |
Enable the appropriate Use native transports and set the following environment property:
quarkus.http.domain-socket=/var/run/io.quarkus.app.socket quarkus.http.domain-socket-enabled=true quarkus.vertx.prefer-native-transport=true
By itself this will not disable the tcp socket which by default will open on
0.0.0.0:8080
. It can be explicitly disabled:
quarkus.http.host-enabled=false
These properties can be set through Java’s -D
command line parameter or
on application.properties
.
Do not forget to add the native transport dependency. See Use native transports for details. |
Make sure your application has the right permissions to write to the socket. |
Use io_uring
io_uring is not supported in native executables.
|
io_uring support is experimental
|
io_uring
is a Linux kernel interface that allows you to send and receive data asynchronously.
It provides unified semantics for both file and network I/O.
It was originally designed to target block devices and files but has since gained the ability to work with things like network sockets.
It has the potential to provide modest performance benefits to network I/O on its own and greater benefits for mixed file and network I/O application workloads.
To learn more about io_uring
, we recommend the following links:
-
Why you should use io_uring for network I/O: The main benefit of io_uring for network I/O is a modern asynchronous API that is straightforward to use and provides unified semantics for file and network I/O. A potential performance benefit of io_uring for network I/O is reducing the number of syscalls. This could provide the biggest benefit for high volumes of small operations where the overhead of system calls can be significant.
-
The Backend Revolution and Why io_uring Is So Important: The io_uring API uses two ring buffers for communication between application and kernel (hence the API name) and designed in a way that enables natural batching of requests and responses. Besides, it provides a way to submit multiple requests in one system call, which can reduce overhead.
-
What exactly is io_uring?: io_uring is a Linux kernel interface to efficiently allow you to send and receive data asynchronously. It was originally designed to target block devices and files but has since gained the ability to work with things like network sockets.
To use io_uring
, you need to add two dependencies to your project and enable native transport.
First add the following dependencies to your project:
<dependency>
<groupId>io.netty.incubator</groupId>
<artifactId>netty-incubator-transport-native-io_uring</artifactId>
<version>0.0.21.Final</version> <!-- Update this version (https://github.com/netty/netty-incubator-transport-io_uring/tags) -->
<classifier>linux-x86_64</classifier>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-io_uring-incubator</artifactId>
</dependency>
// Update the io_uring version by picking the latest from https://github.com/netty/netty-incubator-transport-io_uring/tags
implementation("io.netty.incubator:netty-incubator-transport-native-io_uring:0.0.21.Final")
implementation("io.vertx:vertx-io_uring-incubator")
Then, in the application.properties
, add:
quarkus.vertx.prefer-native-transport=true
Can I use io_uring on my Linux machine?
To check if you can use
If it prints something like above, you can use |
Troubleshooting
|
Domain sockets are not yet supported with io_uring. |
The Vert.x asynchronous file system API does not use io_uring yet. |
Deploy on read-only environments
In environments with read only file systems you may receive errors of the form:
java.lang.IllegalStateException: Failed to create cache dir
Assuming /tmp/
is writable this can be fixed by setting the vertx.cacheDirBase
property to point to a directory in /tmp/
for instance in Kubernetes by creating an environment variable JAVA_OPTS
with the value -Dvertx.cacheDirBase=/tmp/vertx
, or setting the quarkus.vertx.cache-directory
property in application.properties
:
quarkus.vertx.cache-directory=/tmp/vertx
Customize the Vert.x configuration
The configuration of the managed Vert.x instance can be provided using the application.properties
file, but also using special beans.
CDI beans exposing the io.quarkus.vertx.VertxOptionsCustomizer
interface can be used to customize the Vert.x configuration.
For example, the following customizer change the tmp
base directory:
@ApplicationScoped
public class MyCustomizer implements VertxOptionsCustomizer {
@Override
public void accept(VertxOptions options) {
options.setFileSystemOptions(new FileSystemOptions().setFileCacheDir("target"));
}
}
The customizer beans received the VertxOptions
(coming from the application configuration), and can modify them.