The English version of quarkus.io is the official project site. Translated sites are community supported on a best-effort basis.

Utilização do Apache Kafka com o Schema Registry e o Avro

This guide shows how your Quarkus application can use Apache Kafka, Avro serialized records, and connect to a schema registry (such as the Confluent Schema Registry or Apicurio Registry).

If you are not familiar with Kafka and Kafka in Quarkus in particular, consider first going through the Using Apache Kafka with Reactive Messaging guide.

Pré-requisitos

Para concluir este guia, você precisa:

  • Mais ou menos 30 minutes

  • Um IDE

  • JDK 17+ installed with JAVA_HOME configured appropriately

  • Apache Maven 3.9.9

  • Docker e Docker Compose ou Podman e Docker Compose

  • Opcionalmente, o Quarkus CLI se você quiser usá-lo

  • Opcionalmente, Mandrel ou GraalVM instalado e configurado apropriadamente se você quiser criar um executável nativo (ou Docker se você usar uma compilação de contêiner nativo)

Arquitetura

In this guide we are going to implement a REST resource, namely MovieResource, that will consume movie DTOs and put them in a Kafka topic.

Then, we will implement a consumer that will consume and collect messages from the same topic. The collected messages will be then exposed by another resource, ConsumedMovieResource, via Server-Sent Events.

The Movies will be serialized and deserialized using Avro. The schema, describing the Movie, is stored in Apicurio Registry. The same concept applies if you are using the Confluent Avro serde and Confluent Schema Registry.

Solução

Recomendamos que siga as instruções nas seções seguintes e crie a aplicação passo a passo. No entanto, você pode ir diretamente para o exemplo completo.

Clone o repositório Git: git clone https://github.com/quarkusio/quarkus-quickstarts.git, ou baixe um arquivo.

The solution is located in the kafka-avro-schema-quickstart directory.

Criando o projeto Maven

Primeiro, precisamos de um novo projeto. Crie um novo projeto com o seguinte comando:

CLI
quarkus create app org.acme:kafka-avro-schema-quickstart \
    --extension='rest-jackson,messaging-kafka,apicurio-registry-avro' \
    --no-code
cd kafka-avro-schema-quickstart

Para criar um projeto Gradle, adicione a opção --gradle ou --gradle-kotlin-dsl.

Para obter mais informações sobre como instalar e usar a CLI do Quarkus, consulte o guia Quarkus CLI.

Maven
mvn io.quarkus.platform:quarkus-maven-plugin:3.16.3:create \
    -DprojectGroupId=org.acme \
    -DprojectArtifactId=kafka-avro-schema-quickstart \
    -Dextensions='rest-jackson,messaging-kafka,apicurio-registry-avro' \
    -DnoCode
cd kafka-avro-schema-quickstart

Para criar um projeto Gradle, adicione a opção '-DbuildTool=gradle' ou '-DbuildTool=gradle-kotlin-dsl'.

Para usuários do Windows:

  • Se estiver usando cmd, (não use barra invertida '\' e coloque tudo na mesma linha)

  • Se estiver usando o Powershell, envolva os parâmetros '-D' entre aspas duplas, por exemplo, '"-DprojectArtifactId=kafka-avro-schema-quickstart"'

If you use Confluent Schema Registry, you don’t need the quarkus-apicurio-registry-avro extension. Instead, you need the quarkus-confluent-registry-avro extension and a few more dependencies. See Using the Confluent Schema Registry for details.

Avro schema

Apache Avro is a data serialization system. Data structures are described using schemas. The first thing we need to do is to create a schema describing the Movie structure. Create a file called src/main/avro/movie.avsc with the schema for our record (Kafka message):

{
  "namespace": "org.acme.kafka.quarkus",
  "type": "record",
  "name": "Movie",
  "fields": [
    {
      "name": "title",
      "type": "string"
    },
    {
      "name": "year",
      "type": "int"
    }
  ]
}

If you build the project with:

CLI
quarkus build
Maven
./mvnw install
Gradle
./gradlew build

the movies.avsc will get compiled to a Movie.java file placed in the target/generated-sources/avsc directory.

Take a look at the Avro specification to learn more about the Avro syntax and supported types.

With Quarkus, there’s no need to use a specific Maven plugin to process the Avro schema, this is all done for you by the quarkus-avro extension!

If you run the project with:

CLI
quarkus dev
Maven
./mvnw quarkus:dev
Gradle
./gradlew --console=plain quarkusDev

the changes you do to the schema file will be automatically applied to the generated Java files.

The Movie producer

Having defined the schema, we can now jump to implementing the MovieResource.

Let’s open the MovieResource, inject an Emitter of Movie DTO and implement a @POST method that consumes Movie and sends it through the Emitter:

package org.acme.kafka;

import org.acme.kafka.quarkus.Movie;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;
import org.jboss.logging.Logger;

import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.core.Response;

@Path("/movies")
public class MovieResource {
    private static final Logger LOGGER = Logger.getLogger(MovieResource.class);

    @Channel("movies")
    Emitter<Movie> emitter;

    @POST
    public Response enqueueMovie(Movie movie) {
        LOGGER.infof("Sending movie %s to Kafka", movie.getTitle());
        emitter.send(movie);
        return Response.accepted().build();
    }

}

Now, we need to map the movies channel (the Emitter emits to this channel) to a Kafka topic. To achieve this, edit the application.properties file, and add the following content:

# set the connector for the outgoing channel to `smallrye-kafka`
mp.messaging.outgoing.movies.connector=smallrye-kafka

# set the topic name for the channel to `movies`
mp.messaging.outgoing.movies.topic=movies

# automatically register the schema with the registry, if not present
mp.messaging.outgoing.movies.apicurio.registry.auto-register=true

You might have noticed that we didn’t define the value.serializer. That’s because Quarkus can autodetect that io.apicurio.registry.serde.avro.AvroKafkaSerializer is appropriate here, based on the @Channel declaration, structure of the Movie type, and presence of the Apicurio Registry libraries. We still have to define the apicurio.registry.auto-register property.

If you use Confluent Schema Registry, you don’t have to configure value.serializer either. It is also detected automatically. The Confluent Schema Registry analogue of apicurio.registry.auto-register is called auto.register.schemas. It defaults to true, so it doesn’t have to be configured in this example. It can be explicitly set to false if you want to disable automatic schema registration.

The Movie consumer

So, we can write records into Kafka containing our Movie data. That data is serialized using Avro. Now, it’s time to implement a consumer for them.

Let’s create ConsumedMovieResource that will consume Movie messages from the movies-from-kafka channel and will expose it via Server-Sent Events:

package org.acme.kafka;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;

import org.acme.kafka.quarkus.Movie;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.jboss.resteasy.reactive.RestStreamElementType;

import io.smallrye.mutiny.Multi;

@ApplicationScoped
@Path("/consumed-movies")
public class ConsumedMovieResource {

    @Channel("movies-from-kafka")
    Multi<Movie> movies;

    @GET
    @Produces(MediaType.SERVER_SENT_EVENTS)
    @RestStreamElementType(MediaType.TEXT_PLAIN)
    public Multi<String> stream() {
        return movies.map(movie -> String.format("'%s' from %s", movie.getTitle(), movie.getYear()));
    }
}

The last bit of the application’s code is the configuration of the movies-from-kafka channel in application.properties:

# set the connector for the incoming channel to `smallrye-kafka`
mp.messaging.incoming.movies-from-kafka.connector=smallrye-kafka

# set the topic name for the channel to `movies`
mp.messaging.incoming.movies-from-kafka.topic=movies

# disable auto-commit, Reactive Messaging handles it itself
mp.messaging.incoming.movies-from-kafka.enable.auto.commit=false

mp.messaging.incoming.movies-from-kafka.auto.offset.reset=earliest

You might have noticed that we didn’t define the value.deserializer. That’s because Quarkus can autodetect that io.apicurio.registry.serde.avro.AvroKafkaDeserializer is appropriate here, based on the @Channel declaration, structure of the Movie type, and presence of the Apicurio Registry libraries. We don’t have to define the apicurio.registry.use-specific-avro-reader property either, that is also configured automatically.

If you use Confluent Schema Registry, you don’t have to configure value.deserializer or specific.avro.reader either. They are both detected automatically.

Executando a aplicação

Start the application in dev mode:

CLI
quarkus dev
Maven
./mvnw quarkus:dev
Gradle
./gradlew --console=plain quarkusDev

Kafka broker and Apicurio Registry instance are started automatically thanks to Dev Services. See Dev Services for Kafka and Dev Services for Apicurio Registry for more details.

You might have noticed that we didn’t configure the schema registry URL anywhere. This is because Dev Services for Apicurio Registry configures all Kafka channels in Quarkus Messaging to use the automatically started registry instance.

Apicurio Registry, in addition to its native API, also exposes an endpoint that is API-compatible with Confluent Schema Registry. Therefore, this automatic configuration works both for Apicurio Registry serde and Confluent Schema Registry serde.

However, note that there’s no Dev Services support for running Confluent Schema Registry itself. If you want to use a running instance of Confluent Schema Registry, configure its URL, together with the URL of a Kafka broker:

kafka.bootstrap.servers=PLAINTEXT://localhost:9092
mp.messaging.connector.smallrye-kafka.schema.registry.url=http://localhost:8081

In the second terminal, query the ConsumedMovieResource resource with curl:

curl -N http://localhost:8080/consumed-movies

In the third one, post a few movies:

curl --header "Content-Type: application/json" \
  --request POST \
  --data '{"title":"The Shawshank Redemption","year":1994}' \
  http://localhost:8080/movies

curl --header "Content-Type: application/json" \
  --request POST \
  --data '{"title":"The Godfather","year":1972}' \
  http://localhost:8080/movies

curl --header "Content-Type: application/json" \
  --request POST \
  --data '{"title":"The Dark Knight","year":2008}' \
  http://localhost:8080/movies

curl --header "Content-Type: application/json" \
  --request POST \
  --data '{"title":"12 Angry Men","year":1957}' \
  http://localhost:8080/movies

Observe what is printed in the second terminal. You should see something along the lines of:

data:'The Shawshank Redemption' from 1994

data:'The Godfather' from 1972

data:'The Dark Knight' from 2008

data:'12 Angry Men' from 1957

Execução em modo JVM ou nativo

When not running in dev or test mode, you will need to start your own Kafka broker and Apicurio Registry. The easiest way to get them running is to use docker-compose to start the appropriate containers.

If you use Confluent Schema Registry, you already have a Kafka broker and Confluent Schema Registry instance running and configured. You can ignore the docker-compose instructions here, as well as the Apicurio Registry configuration.

Create a docker-compose.yaml file at the root of the project with the following content:

version: '2'

services:

  zookeeper:
    image: quay.io/strimzi/kafka:0.41.0-kafka-3.7.0
    command: [
        "sh", "-c",
        "bin/zookeeper-server-start.sh config/zookeeper.properties"
    ]
    ports:
      - "2181:2181"
    environment:
      LOG_DIR: /tmp/logs

  kafka:
    image: quay.io/strimzi/kafka:0.41.0-kafka-3.7.0
    command: [
        "sh", "-c",
        "bin/kafka-server-start.sh config/server.properties --override listeners=$${KAFKA_LISTENERS} --override advertised.listeners=$${KAFKA_ADVERTISED_LISTENERS} --override zookeeper.connect=$${KAFKA_ZOOKEEPER_CONNECT}"
    ]
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      LOG_DIR: "/tmp/logs"
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181

  schema-registry:
    image: apicurio/apicurio-registry-mem:2.4.2.Final
    ports:
      - 8081:8080
    depends_on:
      - kafka
    environment:
      QUARKUS_PROFILE: prod

Before starting the application, let’s first start the Kafka broker and Apicurio Registry:

docker-compose up
To stop the containers, use docker-compose down. You can also clean up the containers with docker-compose rm

You can build the application with:

CLI
quarkus build
Maven
./mvnw install
Gradle
./gradlew build

And run it in JVM mode with:

java -Dmp.messaging.connector.smallrye-kafka.apicurio.registry.url=http://localhost:8081/apis/registry/v2 -jar target/quarkus-app/quarkus-run.jar
By default, the application tries to connect to a Kafka broker listening at localhost:9092. You can configure the bootstrap server using: java -Dkafka.bootstrap.servers=... -jar target/quarkus-app/quarkus-run.jar

Specifying the registry URL on the command line is not very convenient, so you can add a configuration property only for the prod profile:

%prod.mp.messaging.connector.smallrye-kafka.apicurio.registry.url=http://localhost:8081/apis/registry/v2

You can build a native executable with:

CLI
quarkus build --native
Maven
./mvnw install -Dnative
Gradle
./gradlew build -Dquarkus.native.enabled=true

and run it with:

./target/kafka-avro-schema-quickstart-1.0.0-SNAPSHOT-runner -Dkafka.bootstrap.servers=localhost:9092

Testing the application

As mentioned above, Dev Services for Kafka and Apicurio Registry automatically start and configure a Kafka broker and Apicurio Registry instance in dev mode and for tests. Hence, we don’t have to set up Kafka and Apicurio Registry ourselves. We can just focus on writing the test.

First, let’s add test dependencies on REST Client and Awaitility to the build file:

pom.xml
<!-- we'll use Jakarta REST Client for talking to the SSE endpoint -->
<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-rest-client</artifactId>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.awaitility</groupId>
    <artifactId>awaitility</artifactId>
    <scope>test</scope>
</dependency>
build.gradle
testImplementation("io.quarkus:quarkus-rest-client")
testImplementation("org.awaitility:awaitility")

In the test, we will send movies in a loop and check if the ConsumedMovieResource returns what we send.

package org.acme.kafka;

import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.common.http.TestHTTPResource;
import io.quarkus.test.junit.QuarkusTest;
import io.restassured.http.ContentType;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.Test;

import jakarta.ws.rs.client.Client;
import jakarta.ws.rs.client.ClientBuilder;
import jakarta.ws.rs.client.WebTarget;
import jakarta.ws.rs.sse.SseEventSource;
import java.net.URI;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import static io.restassured.RestAssured.given;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.MatcherAssert.assertThat;

@QuarkusTest
public class MovieResourceTest {

    @TestHTTPResource("/consumed-movies")
    URI consumedMovies;

    @Test
    public void testHelloEndpoint() throws InterruptedException {
        // create a client for `ConsumedMovieResource` and collect the consumed resources in a list
        Client client = ClientBuilder.newClient();
        WebTarget target = client.target(consumedMovies);

        List<String> received = new CopyOnWriteArrayList<>();

        SseEventSource source = SseEventSource.target(target).build();
        source.register(inboundSseEvent -> received.add(inboundSseEvent.readData()));

        // in a separate thread, feed the `MovieResource`
        ExecutorService movieSender = startSendingMovies();

        source.open();

        // check if, after at most 5 seconds, we have at least 2 items collected, and they are what we expect
        await().atMost(5, SECONDS).until(() -> received.size() >= 2);
        assertThat(received, Matchers.hasItems("'The Shawshank Redemption' from 1994",
                "'12 Angry Men' from 1957"));
        source.close();

        // shutdown the executor that is feeding the `MovieResource`
        movieSender.shutdownNow();
        movieSender.awaitTermination(5, SECONDS);
    }

    private ExecutorService startSendingMovies() {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        executorService.execute(() -> {
            while (true) {
                given()
                        .contentType(ContentType.JSON)
                        .body("{\"title\":\"The Shawshank Redemption\",\"year\":1994}")
                .when()
                        .post("/movies")
                .then()
                        .statusCode(202);

                given()
                        .contentType(ContentType.JSON)
                        .body("{\"title\":\"12 Angry Men\",\"year\":1957}")
                .when()
                        .post("/movies")
                .then()
                        .statusCode(202);

                try {
                    Thread.sleep(200L);
                } catch (InterruptedException e) {
                    break;
                }
            }
        });
        return executorService;
    }

}
We modified the MovieResourceTest that was generated together with the project. This test class has a subclass, NativeMovieResourceIT, that runs the same test against the native executable. To run it, execute:
CLI
quarkus build --native
Maven
./mvnw install -Dnative
Gradle
./gradlew build -Dquarkus.native.enabled=true

Manual setup

If we couldn’t use Dev Services and wanted to start a Kafka broker and Apicurio Registry instance manually, we would define a QuarkusTestResourceLifecycleManager.

pom.xml
<dependency>
    <groupId>io.strimzi</groupId>
    <artifactId>strimzi-test-container</artifactId>
    <version>0.105.0</version>
    <scope>test</scope>
    <exclusions>
        <exclusion>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
        </exclusion>
    </exclusions>
</dependency>
build.gradle
testImplementation("io.strimzi:strimzi-test-container:0.105.0") {
    exclude group: "org.apache.logging.log4j", module: "log4j-core"
}
package org.acme.kafka;

import java.util.HashMap;
import java.util.Map;

import org.testcontainers.containers.GenericContainer;

import io.quarkus.test.common.QuarkusTestResourceLifecycleManager;
import io.strimzi.StrimziKafkaContainer;

public class KafkaAndSchemaRegistryTestResource implements QuarkusTestResourceLifecycleManager {

    private final StrimziKafkaContainer kafka = new StrimziKafkaContainer();

    private GenericContainer<?> registry;

    @Override
    public Map<String, String> start() {
        kafka.start();
        registry = new GenericContainer<>("apicurio/apicurio-registry-mem:2.4.2.Final")
                .withExposedPorts(8080)
                .withEnv("QUARKUS_PROFILE", "prod");
        registry.start();
        Map<String, String> properties = new HashMap<>();
        properties.put("mp.messaging.connector.smallrye-kafka.apicurio.registry.url",
                "http://" + registry.getHost() + ":" + registry.getMappedPort(8080) + "/apis/registry/v2");
        properties.put("kafka.bootstrap.servers", kafka.getBootstrapServers());
        return properties;
    }

    @Override
    public void stop() {
        registry.stop();
        kafka.stop();
    }
}
@QuarkusTest
@QuarkusTestResource(KafkaAndSchemaRegistryTestResource.class)
public class MovieResourceTest {
    ...
}

Using compatible versions of the Apicurio Registry

The quarkus-apicurio-registry-avro extension depends on recent versions of Apicurio Registry client, and most versions of Apicurio Registry server and client are backwards compatible. For some you need to make sure that the client used by Serdes is compatible with the server.

For example, with Apicurio Dev Service if you set the image name to use version 2.1.5.Final:

quarkus.apicurio-registry.devservices.image-name=quay.io/apicurio/apicurio-registry-mem:2.1.5.Final

You need to make sure that apicurio-registry-serdes-avro-serde dependency and the REST client apicurio-common-rest-client-vertx dependency are set to compatible versions:

pom.xml
<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-apicurio-registry-avro</artifactId>
    <exclusions>
        <exclusion>
            <groupId>io.apicurio</groupId>
            <artifactId>apicurio-common-rest-client-vertx</artifactId>
        </exclusion>
        <exclusion>
            <groupId>io.apicurio</groupId>
            <artifactId>apicurio-registry-serdes-avro-serde</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>io.apicurio</groupId>
    <artifactId>apicurio-registry-client</artifactId>
    <version>2.1.5.Final</version>
</dependency>
<dependency>
    <groupId>io.apicurio</groupId>
    <artifactId>apicurio-registry-common</artifactId>
    <version>2.1.5.Final</version>
</dependency>
<dependency>
    <groupId>io.apicurio</groupId>
    <artifactId>apicurio-registry-serdes-avro-serde</artifactId>
    <version>2.1.5.Final</version>
    <exclusions>
        <exclusion>
            <groupId>io.apicurio</groupId>
            <artifactId>apicurio-common-rest-client-jdk</artifactId>
        </exclusion>
        <exclusion>
            <groupId>io.apicurio</groupId>
            <artifactId>apicurio-registry-client</artifactId>
        </exclusion>
        <exclusion>
            <groupId>io.apicurio</groupId>
            <artifactId>apicurio-registry-common</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>io.apicurio</groupId>
    <artifactId>apicurio-common-rest-client-vertx</artifactId>
    <version>0.1.5.Final</version>
</dependency>
build.gradle
dependencies {
    implementation(platform("io.quarkus.platform:quarkus-bom:2.12.3.Final"))

    ...

    implementation("io.quarkus:quarkus-apicurio-registry-avro")
    implementation("io.apicurio:apicurio-registry-serdes-avro-serde") {
        exclude group: "io.apicurio", module: "apicurio-common-rest-client-jdk"
        exclude group: "io.apicurio", module: "apicurio-registry-client"
        exclude group: "io.apicurio", module: "apicurio-registry-common"
        version {
            strictly "2.1.5.Final"
        }
    }
    implementation("io.apicurio:apicurio-registry-client") {
        version {
            strictly "2.1.5.Final"
        }
    }
    implementation("io.apicurio:apicurio-registry-common") {
        version {
            strictly "2.1.5.Final"
        }
    }
    implementation("io.apicurio:apicurio-common-rest-client-vertx") {
        version {
            strictly "0.1.5.Final"
        }
    }
}

Known previous compatible versions for apicurio-registry-client and apicurio-common-rest-client-vertx are the following

  • apicurio-registry-client 2.1.5.Final with apicurio-common-rest-client-vertx 0.1.5.Final

  • apicurio-registry-client 2.3.1.Final with apicurio-common-rest-client-vertx 0.1.13.Final

Using the Confluent Schema Registry

If you want to use the Confluent Schema Registry, you need the quarkus-confluent-registry-avro extension, instead of the quarkus-apicurio-registry-avro extension. Also, you need to add a few dependencies and a custom Maven repository to your pom.xml / build.gradle file:

pom.xml
<dependencies>
    ...
    <!-- the extension -->
    <dependency>
        <groupId>io.quarkus</groupId>
        <artifactId>quarkus-confluent-registry-avro</artifactId>
    </dependency>
    <!-- Confluent registry libraries use Jakarta REST client -->
    <dependency>
        <groupId>io.quarkus</groupId>
        <artifactId>quarkus-rest-client</artifactId>
    </dependency>
    <dependency>
        <groupId>io.confluent</groupId>
        <artifactId>kafka-avro-serializer</artifactId>
        <version>7.2.0</version>
        <exclusions>
            <exclusion>
                <groupId>jakarta.ws.rs</groupId>
                <artifactId>jakarta.ws.rs-api</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
</dependencies>

<repositories>
    <!-- io.confluent:kafka-avro-serializer is only available from this repository: -->
    <repository>
        <id>confluent</id>
        <url>https://packages.confluent.io/maven/</url>
        <snapshots>
            <enabled>false</enabled>
        </snapshots>
    </repository>
</repositories>
build.gradle
repositories {
    ...

    maven {
        url "https://packages.confluent.io/maven/"
    }
}

dependencies {
    ...

    implementation("io.quarkus:quarkus-confluent-registry-avro")

    // Confluent registry libraries use Jakarta REST client
    implementation("io.quarkus:quarkus-rest-client")

    implementation("io.confluent:kafka-avro-serializer:7.2.0") {
        exclude group: "jakarta.ws.rs", module: "jakarta.ws.rs-api"
    }
}

In JVM mode, any version of io.confluent:kafka-avro-serializer can be used. In native mode, Quarkus supports the following versions: 6.2.x, 7.0.x, 7.1.x, 7.2.x, 7.3.x.

For version 7.4.x and 7.5.x, due to an issue with the Confluent Schema Serializer, you need to add another dependency:

pom.xml
<dependency>
    <groupId>com.fasterxml.jackson.dataformat</groupId>
    <artifactId>jackson-dataformat-csv</artifactId>
</dependency>
build.gradle
dependencies {
    implementation("com.fasterxml.jackson.dataformat:jackson-dataformat-csv")
}

For any other versions, the native configuration may need to be adjusted.

Avro code generation details

In this guide we used the Quarkus code generation mechanism to generate Java files from Avro schema.

Under the hood, the mechanism uses org.apache.avro:avro-compiler.

You can use the following configuration properties to alter how it works:

  • avro.codegen.[avsc|avdl|avpr].imports - a list of files or directories that should be compiled first thus making them importable by subsequently compiled schemas. Note that imported files should not reference each other. All paths should be relative to the src/[main|test]/avro directory, or avro sub-directory in any source directory configured by the build system. Passed as a comma-separated list.

  • avro.codegen.stringType - the Java type to use for Avro strings. May be one of CharSequence, String or Utf8. Defaults to String

  • avro.codegen.createOptionalGetters - enables generating the getOptional…​ methods that return an Optional of the requested type. Defaults to false

  • avro.codegen.enableDecimalLogicalType - determines whether to use Java classes for decimal types, defaults to false

  • avro.codegen.createSetters - determines whether to create setters for the fields of the record. Defaults to false

  • avro.codegen.gettersReturnOptional - enables generating get…​ methods that return an Optional of the requested type. Defaults to false

  • avro.codegen.optionalGettersForNullableFieldsOnly, works in conjunction with gettersReturnOptional option. If it is set, Optional getters will be generated only for fields that are nullable. If the field is mandatory, regular getter will be generated. Defaults to false

Leitura adicional

Conteúdo Relacionado