Quarkus Virtual Thread support with Reactive Messaging

This guide explains how to benefit from Java virtual threads when writing message processing applications in Quarkus.

This guide focuses on using virtual threads with Reactive Messaging extensions. Please refer to Writing simpler reactive REST services with Quarkus Virtual Thread support to read more about Java virtual threads in general and the Quarkus Virtual Thread support for REST services.

By default, Reactive Messaging invokes message processing methods on an event-loop thread. See the Quarkus Reactive Architecture documentation for further details on this topic. However, you sometimes need to combine Reactive Messaging with blocking processing, such as calling external services or performing database operations. For this, you can use the @Blocking annotation, which indicates that the processing is blocking and should run on a worker thread. You can read more about blocking processing in the SmallRye Reactive Messaging documentation.

The idea behind Quarkus Virtual Thread support for Reactive Messaging is to offload message processing to virtual threads, instead of running it on an event-loop thread or a worker thread.

To enable virtual thread support on a message consumer method, add the @RunOnVirtualThread annotation to the method. If the JDK is compatible (Java 19 or later; we recommend 21+), each incoming message is offloaded to a new virtual thread. The method can then perform blocking operations without blocking the platform thread on which the virtual thread is mounted.
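The offloading described above can be illustrated with plain JDK APIs. The following is a minimal sketch, assuming Java 21, and is not how Quarkus dispatches messages internally: each message is handed to its own virtual thread, which then makes a blocking call. The class name `VirtualThreadOffloadSketch` and the `blockingCall` helper are hypothetical names introduced for this illustration.

```java
import java.time.Duration;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class VirtualThreadOffloadSketch {

    // Hypothetical stand-in for a blocking operation (REST call, database query, ...)
    static String blockingCall(String message) throws InterruptedException {
        Thread.sleep(Duration.ofMillis(50));
        return message.toUpperCase();
    }

    // Processes each message on its own virtual thread and reports whether
    // every handler actually ran on a virtual thread.
    static boolean processAll(List<String> messages) {
        ConcurrentLinkedQueue<Boolean> ranOnVirtual = new ConcurrentLinkedQueue<>();
        try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
            for (String message : messages) {
                executor.submit(() -> {
                    ranOnVirtual.add(Thread.currentThread().isVirtual());
                    return blockingCall(message); // blocks only the virtual thread
                });
            }
        } // close() waits for all submitted tasks to finish
        return ranOnVirtual.size() == messages.size()
                && ranOnVirtual.stream().allMatch(Boolean::booleanValue);
    }

    public static void main(String[] args) {
        System.out.println(processAll(List.of("a", "b", "c")));
    }
}
```

While a virtual thread sleeps in `blockingCall`, the JDK unmounts it from its carrier platform thread, which is then free to run other virtual threads.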

Example using the Reactive Messaging Kafka extension

Let’s see an example of how to process Kafka records on virtual threads. First, make sure to add a reactive messaging extension dependency to your build file:

pom.xml
<dependency>
  <groupId>io.quarkus</groupId>
  <artifactId>quarkus-messaging-kafka</artifactId>
</dependency>
build.gradle
implementation("io.quarkus:quarkus-messaging-kafka")

You also need to make sure that you are using Java 19 or later (we recommend 21+). This can be enforced in your pom.xml file with the following:

pom.xml
<properties>
    <maven.compiler.source>21</maven.compiler.source>
    <maven.compiler.target>21</maven.compiler.target>
</properties>

Run your application with:

java -jar target/quarkus-app/quarkus-run.jar

or, to use Quarkus Dev mode, insert the following into the quarkus-maven-plugin configuration:

pom.xml
<maven.compiler.release>21</maven.compiler.release>

Then you can start using the annotation @RunOnVirtualThread on your consumer methods also annotated with @Incoming. In the following example we’ll use the REST Client to make a blocking call to a REST endpoint:

import java.util.Random;

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import org.eclipse.microprofile.rest.client.inject.RestClient;

import io.smallrye.common.annotation.RunOnVirtualThread;
import io.smallrye.mutiny.Multi;

import jakarta.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class PriceConsumer {

    @RestClient (2)
    PriceAlertService alertService;


    @Incoming("prices")
    @RunOnVirtualThread (1)
    public void consume(double price) {
        if (price > 90.0) {
            alertService.alert(price); (3)
        }
    }

    @Outgoing("prices-out") (4)
    public Multi<Double> randomPriceGenerator() {
        return Multi.createFrom().<Random, Double>generator(Random::new, (r, e) -> {
            e.emit(r.nextDouble(100));
            return r;
        });
    }


}
1 The @RunOnVirtualThread annotation on the @Incoming method ensures that the method is called on a virtual thread.
2 The REST client stub is injected with the @RestClient annotation.
3 The alert method blocks the virtual thread until the REST call returns.
4 This @Outgoing method generates random prices and writes them to a Kafka topic, to be consumed back by the application.

Note that by default, Reactive Messaging processes messages sequentially, preserving their order. Just as the @Blocking(ordered = false) annotation changes this behaviour, using @RunOnVirtualThread enforces concurrent message processing without preserving the order.
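To see why concurrent processing gives up ordering, consider the following JDK-only sketch (assuming Java 21; it does not use or reproduce the Reactive Messaging runtime). Messages are dispatched in arrival order, each on its own virtual thread, but they complete in whatever order their blocking work finishes. The class name `UnorderedProcessingSketch` and the use of sleep durations as message payloads are assumptions made for this illustration.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class UnorderedProcessingSketch {

    // Handles every message on its own virtual thread and returns the
    // messages in the order in which their processing completed.
    static List<Integer> completionOrder(List<Integer> delaysMillis) {
        List<Integer> completed = new CopyOnWriteArrayList<>();
        try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
            for (int delay : delaysMillis) {
                executor.submit(() -> {
                    Thread.sleep(delay);  // simulated blocking work
                    completed.add(delay); // recorded when processing finishes
                    return null;
                });
            }
        } // close() waits for all submitted tasks to finish
        return completed;
    }

    public static void main(String[] args) {
        // Arrival order is 300, 200, 100; completion order typically differs
        System.out.println(completionOrder(List.of(300, 200, 100)));
    }
}
```

If downstream logic depends on message order (for example, applying updates to the same key), it needs to restore or enforce that order itself when processing runs this way.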

Use the @RunOnVirtualThread annotation

Method signatures eligible for @RunOnVirtualThread

Only methods that can be annotated with @Blocking can use @RunOnVirtualThread. The eligible method signatures are:

  • @Outgoing("channel-out") O generator()

  • @Outgoing("channel-out") Message<O> generator()

  • @Incoming("channel-in") @Outgoing("channel-out") O process(I in)

  • @Incoming("channel-in") @Outgoing("channel-out") Message<O> process(I in)

  • @Incoming("channel-in") void consume(I in)

  • @Incoming("channel-in") Uni<Void> consume(I in)

  • @Incoming("channel-in") Uni<Void> consume(Message<I> msg)

  • @Incoming("channel-in") CompletionStage<Void> consume(I in)

  • @Incoming("channel-in") CompletionStage<Void> consume(Message<I> msg)

Use of @RunOnVirtualThread annotation on methods and classes

You can use the @RunOnVirtualThread annotation:

  1. directly on a reactive messaging method - this method will be considered blocking and executed on a virtual thread

  2. on the class containing reactive messaging methods - the methods of this class annotated with @Blocking will be executed on virtual threads, unless the annotation defines a pool name configured to use regular worker threads

For example, you can use @RunOnVirtualThread directly on the method:

@ApplicationScoped
public class MyBean {

    @Incoming("in")
    @Outgoing("out")
    @RunOnVirtualThread
    public String process(String s) {
        // Called on a new virtual thread for every incoming message
    }
}

Alternatively, you can use @RunOnVirtualThread on the class itself:

@ApplicationScoped
@RunOnVirtualThread
public class MyBean {

    @Incoming("in1")
    @Outgoing("out1")
    public String process1(String s) {
        // Called on the event loop - no @Blocking annotation
    }

    @Incoming("in2")
    @Outgoing("out2")
    @Blocking
    public String process2(String s) {
        // Called on a new virtual thread for every incoming message
    }

    @Incoming("in3")
    @Outgoing("out3")
    @Blocking("my-worker-pool")
    public String process3(String s) {
        // Called on a regular worker thread from the pool named "my-worker-pool"
    }
}

Control the maximum concurrency

To leverage the lightweight nature of virtual threads, the default maximum concurrency for methods annotated with @RunOnVirtualThread is 1024. Unlike platform threads, virtual threads are not pooled; a new one is created for each message. Therefore, the maximum concurrency applies separately to each @RunOnVirtualThread method.
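Because virtual threads are not pooled, a concurrency limit has to be enforced by something other than pool size. One common way to do this with plain JDK APIs is a semaphore, as in the sketch below. This is an illustration assuming Java 21, not the mechanism SmallRye Reactive Messaging uses; the class name `MaxConcurrencySketch` and the method `peakConcurrency` are hypothetical.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

public class MaxConcurrencySketch {

    // Runs `messages` tasks on virtual threads, never more than `maxConcurrency`
    // at once, and returns the highest number of tasks observed in flight.
    static int peakConcurrency(int messages, int maxConcurrency) {
        Semaphore permits = new Semaphore(maxConcurrency);
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
            for (int i = 0; i < messages; i++) {
                permits.acquireUninterruptibly(); // wait for a free slot before dispatching
                executor.submit(() -> {
                    try {
                        int now = inFlight.incrementAndGet();
                        peak.accumulateAndGet(now, Math::max);
                        Thread.sleep(10); // simulated blocking work
                        return null;
                    } finally {
                        inFlight.decrementAndGet();
                        permits.release(); // free the slot for the next message
                    }
                });
            }
        } // close() waits for all submitted tasks to finish
        return peak.get();
    }

    public static void main(String[] args) {
        System.out.println(peakConcurrency(100, 8));
    }
}
```

In this sketch each semaphore plays the role of one method's limit: two handlers with separate semaphores would each be allowed their own maximum, which mirrors the per-method behaviour described above.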

There are two ways to customize the concurrency level:

  1. The @RunOnVirtualThread annotation can be used together with the @Blocking annotation to specify a worker name.

    @Incoming("prices")
    @RunOnVirtualThread
    @Blocking("my-worker")
    public void consume(double price) {
        //...
    }

    Then, for example, to lower the maximum concurrency of this method to 30, set the config property smallrye.messaging.worker.my-worker.max-concurrency=30.

  2. For every @RunOnVirtualThread method that is not configured with a worker name, you can use the config property smallrye.messaging.worker.<virtual-thread>.max-concurrency.
