The English version of quarkus.io is the official project site. Translated sites are community supported on a best-effort basis.

Introdução à Mensageria Quarkus com Apache Kafka

Este guia demonstra como sua aplicação Quarkus pode utilizar a Mensageria Quarkus para interagir com o Apache Kafka.

Pré-requisitos

Para concluir este guia, você precisa:

  • Cerca de 15 minutos

  • Um IDE

  • JDK 17+ installed with JAVA_HOME configured appropriately

  • Apache Maven 3.9.9

  • Docker e Docker Compose ou Podman e Docker Compose

  • Opcionalmente, o Quarkus CLI se você quiser usá-lo

  • Opcionalmente, Mandrel ou GraalVM instalado e configurado apropriadamente se você quiser criar um executável nativo (ou Docker se você usar uma compilação de contêiner nativo)

Arquitetura

Neste guia, vamos desenvolver duas aplicações que se comunicam com o Kafka. A primeira aplicação envia um quote request para o Kafka e consome mensagens Kafka do tópico quote. A segunda aplicação recebe o quote request e envia uma quote de volta.

Architecture

A primeira aplicação, o producer, permitirá que o usuário solicite algumas cotações por meio de um endpoint HTTP. Para cada solicitação de cotação, um identificador aleatório é gerado e retornado ao usuário, para marcar a solicitação de cotação como pending. Ao mesmo tempo, o id de solicitação gerado é enviado por um tópico Kafka quote-requests.

Producer App UI

A segunda aplicação, o processor, lerá do tópico quote-requests, colocará um preço aleatório na cotação e a enviará para um tópico Kafka chamado quotes.

Por fim, o producer lerá as cotações e as enviará para o navegador usando Server-Sent Events. Assim, o usuário verá o preço da cotação atualizado de pending para o preço recebido em tempo real.

Solução

Recomendamos que siga as instruções nas próximas seções e crie as aplicações passo a passo. No entanto, você pode ir direto para o exemplo finalizado.

Clone o repositório Git: git clone https://github.com/quarkusio/quarkus-quickstarts.git, ou baixe um arquivo.

A solução está localizada no diretório kafka-quickstart .

Criando o projeto Maven

Primeiramente, temos que criar dois projetos: o producer e o processor.

Para criar o projeto producer, em um terminal, execute:

CLI
quarkus create app org.acme:kafka-quickstart-producer \
    --extension='rest-jackson,messaging-kafka' \
    --no-code

Para criar um projeto Gradle, adicione a opção --gradle ou --gradle-kotlin-dsl.

Para obter mais informações sobre como instalar e usar a CLI do Quarkus, consulte o guia Quarkus CLI.

Maven
mvn io.quarkus.platform:quarkus-maven-plugin:3.17.4:create \
    -DprojectGroupId=org.acme \
    -DprojectArtifactId=kafka-quickstart-producer \
    -Dextensions='rest-jackson,messaging-kafka' \
    -DnoCode

Para criar um projeto Gradle, adicione a opção '-DbuildTool=gradle' ou '-DbuildTool=gradle-kotlin-dsl'.

Para usuários do Windows:

  • Se estiver usando cmd, (não use barra invertida '\' e coloque tudo na mesma linha)

  • Se estiver usando o Powershell, envolva os parâmetros '-D' entre aspas duplas, por exemplo, '"-DprojectArtifactId=kafka-quickstart-producer"'

Esse comando cria a estrutura do projeto e seleciona duas extensões do Quarkus que usaremos:

  1. Quarkus REST (anteriormente RESTEasy Reactive) e seu suporte Jackson (para manipular JSON) para servir o endpoint HTTP.

  2. O conector Kafka para Mensageria Reativa

Para criar o projeto processor, a partir do mesmo diretório, execute:

CLI
quarkus create app org.acme:kafka-quickstart-processor \
    --extension='messaging-kafka' \
    --no-code

Para criar um projeto Gradle, adicione a opção --gradle ou --gradle-kotlin-dsl.

Para obter mais informações sobre como instalar e usar a CLI do Quarkus, consulte o guia Quarkus CLI.

Maven
mvn io.quarkus.platform:quarkus-maven-plugin:3.17.4:create \
    -DprojectGroupId=org.acme \
    -DprojectArtifactId=kafka-quickstart-processor \
    -Dextensions='messaging-kafka' \
    -DnoCode

Para criar um projeto Gradle, adicione a opção '-DbuildTool=gradle' ou '-DbuildTool=gradle-kotlin-dsl'.

Para usuários do Windows:

  • Se estiver usando cmd, (não use barra invertida '\' e coloque tudo na mesma linha)

  • Se estiver usando o Powershell, envolva os parâmetros '-D' entre aspas duplas, por exemplo, '"-DprojectArtifactId=kafka-quickstart-processor"'

Nesse ponto, você deve ter a seguinte estrutura:

.
├── kafka-quickstart-processor
│  ├── README.md
│  ├── mvnw
│  ├── mvnw.cmd
│  ├── pom.xml
│  └── src
│     └── main
│        ├── docker
│        ├── java
│        └── resources
│           └── application.properties
└── kafka-quickstart-producer
   ├── README.md
   ├── mvnw
   ├── mvnw.cmd
   ├── pom.xml
   └── src
      └── main
         ├── docker
         ├── java
         └── resources
            └── application.properties

Abra os dois projetos na sua IDE preferida.

Serviços de Desenvolvimento

Não é necessário iniciar um broker Kafka ao usar o modo de desenvolvimento ou para testes. O Quarkus inicia um broker para você automaticamente. Veja Dev Services para Kafka para detalhes.

O objeto Quote

A classe Quote será usada tanto no projeto producer quanto no processor. Para simplificar, duplicaremos a classe. Em ambos os projetos, crie o arquivo src/main/java/org/acme/kafka/model/Quote.java, com o seguinte conteúdo:

package org.acme.kafka.model;

public class Quote {

    public String id;
    public int price;

    /**
    * Default constructor required for Jackson serializer
    */
    public Quote() { }

    public Quote(String id, int price) {
        this.id = id;
        this.price = price;
    }

    @Override
    public String toString() {
        return "Quote{" +
                "id='" + id + '\'' +
                ", price=" + price +
                '}';
    }
}

A representação JSON dos objetos Quote será usada nas mensagens enviadas para o tópico Kafka e também nos Server-Sent Events para navegadores web.

O Quarkus possui capacidades integradas para lidar com mensagens Kafka JSON. Em uma seção seguinte, criaremos classes de serializador/deserializador para Jackson.

Enviando pedido de cotação

Dentro do projeto producer, crie o arquivo src/main/java/org/acme/kafka/producer/QuotesResource.java e adicione o seguinte conteúdo:

package org.acme.kafka.producer;

import java.util.UUID;

import jakarta.ws.rs.GET;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;

import org.acme.kafka.model.Quote;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;

@Path("/quotes")
public class QuotesResource {

    @Channel("quote-requests")
    Emitter<String> quoteRequestEmitter; (1)

    /**
     * Endpoint to generate a new quote request id and send it to "quote-requests" Kafka topic using the emitter.
     */
    @POST
    @Path("/request")
    @Produces(MediaType.TEXT_PLAIN)
    public String createRequest() {
        UUID uuid = UUID.randomUUID();
        quoteRequestEmitter.send(uuid.toString()); (2)
        return uuid.toString(); (3)
    }
}
1 Injete um Emitter de Mensageria Reativa para enviar mensagens para o canal quote-requests.
2 Em uma solicitação post, gere um UUID aleatório e envie-o para o tópico Kafka usando o emissor.
3 Retorne o mesmo UUID para o cliente.

O canal quote-requests será gerenciado como um tópico Kafka, pois esse é o único conector no classpath. Se não indicado de outra forma, como neste exemplo, o Quarkus usa o nome do canal como nome do tópico. Então, neste exemplo, a aplicação escreve no tópico quote-requests. O Quarkus também configura o serializador automaticamente, porque detecta que o Emitter produz valores String.

Quando você tem vários conectores, você precisaria indicar qual conector deseja usar na configuração do aplicativo.

Processando solicitações de cotação

Agora vamos consumir a solicitação de cotação e dar um preço. Dentro do projeto processor, crie o arquivo src/main/java/org/acme/kafka/processor/QuotesProcessor.java e adicione o seguinte conteúdo:

package org.acme.kafka.processor;

import java.util.Random;

import jakarta.enterprise.context.ApplicationScoped;

import org.acme.kafka.model.Quote;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import io.smallrye.reactive.messaging.annotations.Blocking;

/**
 * A bean consuming data from the "quote-requests" Kafka topic (mapped to "requests" channel) and giving out a random quote.
 * The result is pushed to the "quotes" Kafka topic.
 */
@ApplicationScoped
public class QuotesProcessor {

    private Random random = new Random();

    @Incoming("requests") (1)
    @Outgoing("quotes")   (2)
    @Blocking             (3)
    public Quote process(String quoteRequest) throws InterruptedException {
        // simulate some hard working task
        Thread.sleep(200);
        return new Quote(quoteRequest, random.nextInt(100));
    }
}
1 Indica que o método consome os itens do canal requests.
2 Indica que os objetos retornados pelo método são enviados para o canal quotes.
3 Indica que o processamento é bloqueante e não pode ser executado na thread chamadora.

Para cada Kafka record do tópico quote-requests, a Mensageria Reativa chama o método process, e envia o objeto Quote retornado para o canal quotes. Neste caso, precisamos configurar o canal no arquivo application.properties, para configurar os canais requests e quotes:

%dev.quarkus.http.port=8081

# Configure the incoming `quote-requests` Kafka topic
mp.messaging.incoming.requests.topic=quote-requests
mp.messaging.incoming.requests.auto.offset.reset=earliest

Observe que, neste caso, temos uma configuração de conector de entrada e uma de saída, cada uma com um nome distinto. As propriedades de configuração são estruturadas da seguinte forma:

mp.messaging.[outgoing|incoming].{channel-name}.property=value

O segmento channel-name deve corresponder ao valor definido nas anotações @Incoming e @Outgoing:

  • quote-requests → Tópico Kafka do qual lemos as solicitações de cotação

  • quotes → Tópico Kafka no qual escrevemos as cotações

Mais detalhes sobre esta configuração estão disponíveis na seção Producer configuration e Consumer configuration da documentação do Kafka. Essas propriedades são configuradas com o prefixo kafka. Uma lista exaustiva de propriedades de configuração está disponível em Kafka Reference Guide - Configuration.

mp.messaging.incoming.requests.auto.offset.reset=earliest instrui a aplicação a começar a ler os tópicos a partir do primeiro offset, quando não há um offset registrado para o grupo de consumidores. Em outras palavras, também processará mensagens enviadas antes de iniciarmos a aplicação processadora.

Não é necessário definir serializadores ou desserializadores. O Quarkus os detecta e, se nenhum for encontrado, gera-os usando serialização JSON.

Recebendo cotações

De volta ao nosso projeto producer. Vamos modificar o QuotesResource para consumir cotações do Kafka e enviá-las de volta ao cliente via Server-Sent Events:

import io.smallrye.mutiny.Multi;

...

@Channel("quotes")
Multi<Quote> quotes; (1)

/**
 * Endpoint retrieving the "quotes" Kafka topic and sending the items to a server sent event.
 */
@GET
@Produces(MediaType.SERVER_SENT_EVENTS) (2)
public Multi<Quote> stream() {
    return quotes; (3)
}
1 Injecta o canal quotes utilizando o qualificador @Channel
2 Indica que o conteúdo é enviado usando Server-Sent Events
3 Retorna o fluxo (Reactive Stream)

Não há necessidade de configurar nada, pois o Quarkus associará automaticamente o canal quotes ao tópico quotes do Kafka. Ele também gerará um desserializador para a classe Quote.

Serialização de mensagens no Kafka

Neste exemplo, usamos Jackson para serializar/desserializar mensagens do Kafka. Para mais opções de serialização de mensagens, veja Guia de Referência do Apache Kafka - Serialização JSON.

Sugerimos fortemente a adoção de uma abordagem baseada em contrato usando um Schema Registry. Para aprender mais sobre como usar o Apache Kafka com o Schema Registry e Avro, siga o guia Utilização do Apache Kafka com o Schema Registry e o Avro para o Avro ou você pode seguir o guia Utilização do Apache Kafka com o Schema Registry e JSON Schema.

A página HTML

O toque final, a página HTML solicitando cotações e exibindo os preços obtidos por SSE.

Dentro do projeto producer, crie o arquivo src/main/resources/META-INF/resources/quotes.html com o seguinte conteúdo:

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Prices</title>

    <link rel="stylesheet" type="text/css"
          href="https://cdnjs.cloudflare.com/ajax/libs/patternfly/3.24.0/css/patternfly.min.css">
    <link rel="stylesheet" type="text/css"
          href="https://cdnjs.cloudflare.com/ajax/libs/patternfly/3.24.0/css/patternfly-additions.min.css">
</head>
<body>
<div class="container">
    <div class="card">
        <div class="card-body">
            <h2 class="card-title">Quotes</h2>
            <button class="btn btn-info" id="request-quote">Request Quote</button>
            <div class="quotes"></div>
        </div>
    </div>
</div>
</body>
<script src="https://code.jquery.com/jquery-3.6.0.min.js"></script>
<script>
    $("#request-quote").click((event) => {
        fetch("/quotes/request", {method: "POST"})
        .then(res => res.text())
        .then(qid => {
            var row = $(`<h4 class='col-md-12' id='${qid}'>Quote # <i>${qid}</i> | <strong>Pending</strong></h4>`);
            $(".quotes").prepend(row);
        });
    });

    var source = new EventSource("/quotes");
    source.onmessage = (event) => {
      var json = JSON.parse(event.data);
      $(`#${json.id}`).html((index, html) => {
        return html.replace("Pending", `\$\xA0${json.price}`);
      });
    };
</script>
</html>

Nada espetacular aqui. Quando o usuário clica no botão, uma solicitação HTTP é feita para solicitar uma cotação, e uma cotação pendente é adicionada à lista. A cada cotação recebida por SSE, o item correspondente na lista é atualizado.

Executando a aplicação

Você só precisa executar ambas as aplicações. Em um terminal, execute:

mvn -f producer quarkus:dev

Em outro terminal, execute:

mvn -f processor quarkus:dev

O Quarkus inicia um broker Kafka automaticamente, configura a aplicação e compartilha a instância do broker Kafka entre diferentes aplicações. Veja Dev Services para o Kafka para mais detalhes.

Abra http://localhost:8080/quotes.html no seu navegador e solicite algumas cotações clicando no botão.

Execução na JVM ou modo nativo

Quando não estiver executando em modo de desenvolvimento ou teste, você precisará iniciar seu broker Kafka. Você pode seguir as instruções Site do Apache Kafka ou criar um arquivo docker-compose.yaml com o seguinte conteúdo:

version: '3.5'

services:

  zookeeper:
    image: quay.io/strimzi/kafka:0.41.0-kafka-3.7.0
    command: [
      "sh", "-c",
      "bin/zookeeper-server-start.sh config/zookeeper.properties"
    ]
    ports:
      - "2181:2181"
    environment:
      LOG_DIR: /tmp/logs
    networks:
      - kafka-quickstart-network

  kafka:
    image: quay.io/strimzi/kafka:0.41.0-kafka-3.7.0
    command: [
      "sh", "-c",
      "bin/kafka-server-start.sh config/server.properties --override listeners=$${KAFKA_LISTENERS} --override advertised.listeners=$${KAFKA_ADVERTISED_LISTENERS} --override zookeeper.connect=$${KAFKA_ZOOKEEPER_CONNECT}"
    ]
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      LOG_DIR: "/tmp/logs"
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    networks:
      - kafka-quickstart-network

  producer:
    image: quarkus-quickstarts/kafka-quickstart-producer:1.0-${QUARKUS_MODE:-jvm}
    build:
      context: producer
      dockerfile: src/main/docker/Dockerfile.${QUARKUS_MODE:-jvm}
    depends_on:
      - kafka
    environment:
      KAFKA_BOOTSTRAP_SERVERS: kafka:9092
    ports:
      - "8080:8080"
    networks:
      - kafka-quickstart-network

  processor:
    image: quarkus-quickstarts/kafka-quickstart-processor:1.0-${QUARKUS_MODE:-jvm}
    build:
      context: processor
      dockerfile: src/main/docker/Dockerfile.${QUARKUS_MODE:-jvm}
    depends_on:
      - kafka
    environment:
      KAFKA_BOOTSTRAP_SERVERS: kafka:9092
    networks:
      - kafka-quickstart-network

networks:
  kafka-quickstart-network:
    name: kafkaquickstart

Certifique-se de construir primeiro ambas as aplicações no modo JVM com:

mvn -f producer package
mvn -f processor package

Uma vez empacotado, execute docker-compose up.

Este é um cluster de desenvolvimento, não use em produção.

Você também pode construir e executar nossas aplicações como executáveis nativos. Primeiro, compile ambas as aplicações como nativas:

mvn -f producer package -Dnative -Dquarkus.native.container-build=true
mvn -f processor package -Dnative -Dquarkus.native.container-build=true

Execute o sistema com:

export QUARKUS_MODE=native
docker-compose up --build

Avançando mais

Este guia mostrou como você pode interagir com o Kafka usando o Quarkus. Ele utiliza o SmallRye Reactive Messaging para construir aplicações de streaming de dados.

Para a lista completa de recursos e opções de configuração, consulte o Guia de Referência do Apache Kafka.

Neste guia, exploramos como podemos interagir com o Apache Kafka usando as extensões de Mensageria do Quarkus. A extensão do Quarkus para Kafka também permite acessar clientes Kafka diretamente.

Conteúdo Relacionado