The English version of quarkus.io is the official project site. Translated sites are community supported on a best-effort basis.

Guia de Referência do Apache Kafka

Este guia de referência demonstra como sua aplicação Quarkus pode utilizar a Mensageria Quarkus para interagir com o Apache Kafka.

1. Introdução

O Apache Kafka é uma plataforma popular de streaming de eventos distribuídos de código aberto. É comumente usado para pipelines de dados de alto desempenho, análise de streaming, integração de dados e aplicações de missão crítica. Semelhante a uma fila de mensagens ou a uma plataforma de mensagens corporativas, ele permite que você:

  • publique (escreva) e assine (leia) fluxos de eventos, chamados de registros.

  • armazene fluxos de registros de forma durável e confiável dentro de tópicos.

  • processe fluxos de registros à medida que eles ocorrem ou retrospectivamente.

E toda esta funcionalidade é fornecida de uma forma distribuída, altamente escalável, elástica, tolerante a falhas e segura.

2. Extensão Quarkus para Apache Kafka

O Quarkus oferece suporte para Apache Kafka através do framework Mensageria Reativa do SmallRye. Baseado na especificação Eclipse MicroProfile Reactive Messaging 2.0, ela propõe um modelo de programação flexível que conecta CDI e processamento orientado a eventos.

Este guia fornece uma visão aprofundada sobre Apache Kafka e o framework SmallRye Reactive Messaging. Para um início rápido, consulte Introdução à Mensageria Quarkus com Apache Kafka.

Você pode adicionar a extensão messaging-kafka ao seu projeto executando o seguinte comando no diretório base do seu projeto:

CLI
quarkus extension add messaging-kafka
Maven
./mvnw quarkus:add-extension -Dextensions='messaging-kafka'
Gradle
./gradlew addExtension --extensions='messaging-kafka'

Isto adicionará o seguinte ao seu arquivo de build:

pom.xml
<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-messaging-kafka</artifactId>
</dependency>
build.gradle
implementation("io.quarkus:quarkus-messaging-kafka")

A extensão inclui kafka-clients versão 3.2.1 como uma dependência transitiva e é compatível com brokers Kafka versão 2.x.

3. Configuring SmallRye Kafka Connector

Because SmallRye Reactive Messaging framework supports different messaging backends like Apache Kafka, AMQP, Apache Camel, JMS, MQTT, etc., it employs a generic vocabulary:

  • Aplicações enviam e recebem mensagens. Uma mensagem encapsula um payload e pode ser estendida com alguns metadados. Com o conector Kafka, uma mensagem corresponde a um registro Kafka.

  • As mensagens transitam nos canais. Os componentes da aplicação ligam-se aos canais para publicar e consumir mensagens. O conector Kafka mapeia canais para tópicos Kafka.

  • Os canais são ligados a backends de mensagens através de conectores. Os conectores são configurados para mapear as mensagens de entrada para um canal específico (consumido pela aplicação) e recolher as mensagens de saída enviadas para um canal específico. Cada conector é dedicado a uma tecnologia de mensagens específica. Por exemplo, o conector que lida com o Kafka tem o nome de smallrye-kafka.

Uma configuração mínima para o conector Kafka com um canal de entrada tem o seguinte aspecto:

%prod.kafka.bootstrap.servers=kafka:9092 (1)
mp.messaging.incoming.prices.connector=smallrye-kafka (2)
1 Configure a localização do broker para o perfil de produção. Você pode configurá-lo globalmente ou por canal usando a propriedade mp.messaging.incoming.$channel.bootstrap.servers. No modo de desenvolvimento e ao executar testes, Dev Services para o Kafka inicia automaticamente um broker Kafka. Quando não fornecida, esta propriedade tem o valor padrão localhost:9092.
2 Configure o conector para gerenciar o canal de preços. Por padrão, o nome do tópico é o mesmo que o nome do canal. Você pode configurar o atributo do tópico para o substituir.
O prefixo %prod indica que a propriedade só é utilizada quando a aplicação é executada em modo de produção (portanto, não em desenvolvimento ou teste). Consulte a documentação do Perfil para obter mais detalhes.
Fixação automática do conector

Se você tiver um único conector no classpath, poderá omitir a configuração do atributo connector. O Quarkus associa automaticamente os canais órfãos ao conector (único) encontrado no classpath. Os canais órfãos são canais de saída sem um consumidor downstream ou canais de entrada sem um produtor ascendente.

Esta ligação automática pode ser desativada utilizando:

quarkus.messaging.auto-connector-attachment=false

4. Recebendo mensagens do Kafka

Continuando com a configuração mínima anterior, a sua aplicação Quarkus pode receber diretamente a conteúdo da mensagem:

import org.eclipse.microprofile.reactive.messaging.Incoming;

import jakarta.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class PriceConsumer {

    @Incoming("prices")
    public void consume(double price) {
        // process your price.
    }

}

Existem várias outras formas da sua aplicação consumir mensagens recebidas:

Message
@Incoming("prices")
public CompletionStage<Void> consume(Message<Double> msg) {
    // access record metadata
    var metadata = msg.getMetadata(IncomingKafkaRecordMetadata.class).orElseThrow();
    // process the message payload.
    double price = msg.getPayload();
    // Acknowledge the incoming message (commit the offset)
    return msg.ack();
}

O tipo Message permite que o método consumidor acesse os metadados da mensagem recebida e manipule a confirmação manualmente. Iremos explorar diferentes estratégias de confirmação em Estratégias de Confirmação.

Se você pretende acessar diretamente os objetos do registro Kafka, utilize:

ConsumerRecord
@Incoming("prices")
public void consume(ConsumerRecord<String, Double> record) {
    String key = record.key(); // Can be `null` if the incoming record has no key
    String value = record.value(); // Can be `null` if the incoming record has no value
    String topic = record.topic();
    int partition = record.partition();
    // ...
}

ConsumerRecord é fornecido pelo cliente Kafka subjacente e pode ser injetado diretamente no método do consumidor. Outra abordagem mais simples consiste em usar Record:

Record
@Incoming("prices")
public void consume(Record<String, Double> record) {
    String key = record.key(); // Can be `null` if the incoming record has no key
    String value = record.value(); // Can be `null` if the incoming record has no value
}

Record é um simples encapsulador em torno da chave e do conteúdo do registro Kafka recebido.

@Channel

Alternativamente, sua aplicação pode injetar um Multi no seu bean e assinar os seus eventos, como no exemplo seguinte:

import io.smallrye.mutiny.Multi;
import org.eclipse.microprofile.reactive.messaging.Channel;

import jakarta.inject.Inject;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;
import org.jboss.resteasy.reactive.RestStreamElementType;

@Path("/prices")
public class PriceResource {

    @Inject
    @Channel("prices")
    Multi<Double> prices;

    @GET
    @Path("/prices")
    @RestStreamElementType(MediaType.TEXT_PLAIN)
    public Multi<Double> stream() {
        return prices;
    }
}

Esse é um bom exemplo de como integrar um consumidor Kafka com outro downstream, neste exemplo, expondo-o como um endpoint Server-Sent Events.

Ao consumir mensagens com @Channel, o código da aplicação é responsável pela assinatura. No exemplo acima, o endpoint Quarkus REST (anteriormente RESTEasy Reactive) cuida disso para você.

Os seguintes tipos podem ser injetados como canais:

@Inject @Channel("prices") Multi<Double> streamOfPayloads;

@Inject @Channel("prices") Multi<Message<Double>> streamOfMessages;

@Inject @Channel("prices") Publisher<Double> publisherOfPayloads;

@Inject @Channel("prices") Publisher<Message<Double>> publisherOfMessages;

Assim como no exemplo anterior com Message, se o seu canal injetado recebe payloads (Multi<T>), ele reconhece a mensagem automaticamente e suporta múltiplos assinantes. Se o seu canal injetado recebe Mensagem (Multi<Message<T>>), você será responsável pela confirmação e transmissão. Iremos explorar o envio de mensagens de transmissão em Difusão de mensagens em vários consumidores.

Injetar @Channel("prices") ou ter @Incoming("prices") não configura automaticamente a aplicação para consumir mensagens do Kafka. Você precisa configurar um conector de entrada com mp.messaging.incoming.prices…​ ou ter um método @Outgoing("prices") em algum lugar do sua aplicação (nesse caso, prices será um canal na memória).

4.1. Bloqueando o processamento

A Mensageria Reativa invoca seu método em um thread de E/S. Consulte a documentação da Arquitetura Reativa do Quarkus para obter mais detalhes sobre esse tópico. Mas, muitas vezes, você precisa combinar o envio de mensagens reativas com processamento blocante, como interações de banco de dados. Para isso, você precisa usar a anotação @Blocking indicando que o processamento está bloqueando e não deve ser executado no thread do chamador.

Por exemplo, o código a seguir ilustra como é possível armazenar conteúdos recebidos em uma base de dados usando o Hibernate com Panache:

import io.smallrye.reactive.messaging.annotations.Blocking;
import org.eclipse.microprofile.reactive.messaging.Incoming;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.transaction.Transactional;

@ApplicationScoped
public class PriceStorage {

    @Incoming("prices")
    @Transactional
    public void store(int priceInUsd) {
        Price price = new Price();
        price.value = priceInUsd;
        price.persist();
    }

}

O exemplo completo está disponível nodiretório kafka-panache-quickstart.

Existem 2 anotações de @Blocking:

  1. io.smallrye.reactive.messaging.annotations.Blocking

  2. io.smallrye.common.annotation.Blocking

Eles têm o mesmo efeito. Portanto, você pode usar os dois. O primeiro fornece um ajuste mais refinado, como o pool de trabalho a ser usado e se ele preserva a ordem. O segundo, usado também com outros recursos reativos do Quarkus, usa o pool de trabalho padrão e preserva a ordem.

Informações detalhadas sobre a utilização da anotação @Blocking podem ser encontradas em Mensageria Reativa do SmallRye - Lidando com execução blocante.

@RunOnVirtualThread

Para executar o processamento blocante em threads virtuais Java, consulte a documentação de suporte do Quarkus à Virtual Thread com Mensageria Reativa.

@Transactional

Se o seu método estiver anotado com @Transactional, será considerado blocante automaticamente, mesmo que o método não esteja anotado com @Blocking.

4.2. Estratégias de Reconhecimento

Todas as mensagens recebidas por um consumidor devem ser confirmadas. Na ausência de confirmação, o processamento é considerado um erro. Se o método do consumidor receber um Record ou um conteúdo, a mensagem será confirmada no retorno do método, também conhecido como Strategy.POST_PROCESSING. Se o método do consumidor retornar outro fluxo reativo ou CompletionStage, a mensagem será confirmada quando a mensagem downstream for confirmada. Você pode substituir o comportamento padrão para acessar a mensagem na chegada (Strategy.PRE_PROCESSING) ou não acessar a mensagem de forma alguma (Strategy.NONE) no método do consumidor, como no exemplo a seguir:

@Incoming("prices")
@Acknowledgment(Acknowledgment.Strategy.PRE_PROCESSING)
public void process(double price) {
    // process price
}

Se o método consumidor receber uma Message, a estratégia de reconhecimento será Strategy.MANUAL e o método consumidor será responsável por reconhecer/não reconhecer a mensagem.

@Incoming("prices")
public CompletionStage<Void> process(Message<Double> msg) {
    // process price
    return msg.ack();
}

Como mencionado acima, o método também pode substituir a estratégia de confirmação para PRE_PROCESSING ou NONE.

4.3. Estratégias de Confirmação

Quando uma mensagem produzida a partir de um registro do Kafka é reconhecida, o conector invoca uma estratégia de confirmação. Essas estratégias decidem quando o deslocamento do consumidor para um tópico/partição específico é confirmado. A confirmação de um deslocamento indica que todos os registros anteriores foram processados. É também a posição em que a aplicação reiniciaria o processamento após uma recuperação de falha ou uma reinicialização.

A confirmação de cada deslocamento tem penalidades de desempenho, pois o gerenciamento de deslocamento do Kafka pode ser lento. No entanto, não confirmar o deslocamento com frequência suficiente pode levar à duplicação de mensagens se a aplicação falhar entre duas confirmações.

O conector Kafka suporta três estratégias:

  • throttled mantém o controle das mensagens recebidas e confirma um deslocamento da última mensagem recebida em sequência (ou seja, todas as mensagens anteriores também foram recebidas). Essa estratégia garante a entrega pelo menos uma vez, mesmo que o canal execute o processamento assíncrono. O conector rastreia os registros recebidos e periodicamente (período especificado por auto.commit.interval.ms, padrão: 5000 ms) confirma o maior deslocamento consecutivo. O conector será marcado como não saudável se uma mensagem associada a um registro não for confirmada em throttled.unprocessed-record-max-age.ms (padrão: 60000 ms). Na verdade, essa estratégia não pode confirmar o deslocamento assim que houver falha no processamento de um único registro. Se throttled.unprocessed-record-max-age.ms for definido como menor ou igual a 0, ele não realizará nenhuma verificação de integridade. Essa configuração pode levar à falta de memória se houver mensagens do tipo "pílula de veneno" (que nunca são aceitas). Essa estratégia é o padrão se enable.auto.commit não estiver explicitamente definido como verdadeiro.

  • checkpoint permite persistir os offsets do consumidor em um state store, em vez de registra-los de volta para o broker Kafka. Usando a API CheckpointMetadata, o código do consumidor pode persistir um processing state com o offset do registro para marcar o progresso de um consumidor. Quando o processamento continua a partir de um offset previamente persistido, ele posiciona o consumidor Kafka nesse offset e também restaura o estado persistido, continuando o processamento com estado de onde parou. A estratégia de checkpoint mantém localmente o estado de processamento associado ao último offset e o persiste periodicamente no state store (período especificado por auto.commit.interval.ms (padrão: 5000)). O conector será marcado como não saudável se nenhum estado de processamento for persistido no state store em checkpoint.unsynced-state-max-age.ms (padrão: 10000). Se checkpoint.unsynced-state-max-age.ms for definido como menor ou igual a 0, ele não realizará nenhuma verificação de saúde. Para mais informações, consulte Processamento com estado com Ponto de Verificação.

  • latest confirma o offset do registro recebido pelo consumidor Kafka assim que a mensagem associada é reconhecida (se o offset for maior que o offset previamente confirmado). Esta estratégia fornece entrega pelo menos uma vez se o canal processa a mensagem sem realizar nenhum processamento assíncrono. Especificamente, o offset da mensagem mais recentemente reconhecida sempre será confirmado, mesmo que mensagens mais antigas ainda não tenham terminado de ser processadas. Em caso de incidente, como uma falha, o processamento reiniciaria após o último commit, resultando em mensagens mais antigas que nunca foram processadas com sucesso e completamente, o que poderia ser interpretado como perda de mensagem. Esta estratégia não deve ser usada em um ambiente de carga alta, pois a confirmação do offset é custosa. No entanto, ela reduz o risco de duplicatas.

  • ignore não realiza nenhuma confirmação. Essa estratégia é a estratégia padrão quando o consumidor é configurado explicitamente com enable.auto.commit como true. Ela delega a confirmação de deslocamento para o cliente Kafka subjacente. Quando enable.auto.commit é true, essa estratégia NÃO garante a entrega pelo menos uma vez. A Mensageria Reativa do SmallRye processa registros de forma assíncrona, de modo que os deslocamentos podem ser confirmados para registros que foram pesquisados, mas ainda não processados. Em caso de falha, apenas os registros que ainda não foram confirmados serão reprocessados.

O conector Kafka desativa a confirmação automática do Kafka quando ela não está explicitamente ativada. Esse comportamento difere do consumidor tradicional do Kafka. Se a alta taxa de transferência for importante para você, e se não estiver limitado pelo downstream, recomendamos:

  • utilizar a política throttled,

  • ou definir enable.auto.commit como verdadeiro e anotar o método de consumo com @Acknowledgment(Acknowledgment.Strategy.NONE).

SmallRye Reactive Messaging enables implementing custom commit strategies. See SmallRye Reactive Messaging documentation for more information.

4.4. Estratégias de Tratamento de Erros

Se uma mensagem produzida a partir de um registro Kafka não for enviada, é aplicada uma estratégia de falha. O conector Kafka suporta três estratégias:

  • fail: falha a aplicação, não serão processados mais registros (estratégia padrão). O deslocamento do registro que não foi processado corretamente não é confirmado.

  • ignore: a falha é registrada, mas o processamento continua. O deslocamento do registro que não foi processado corretamente é confirmado.

  • dead-letter-queue: o deslocamento do registro que não foi processado corretamente é confirmado, mas o registro é escrito em um tópico de letra morta do Kafka.

A estratégia é selecionada utilizando o atributo failure-strategy.

No caso do dead-letter-queue, você pode configurar os seguintes atributos:

  • dead-letter-queue.topic: o tópico usado para escrever os registros não processados corretamente, o padrão é dead-letter-topic-$channel, sendo $channel o nome do canal.

  • dead-letter-queue.key.serializer: o serializador usado para escrever a chave de registro na fila de letra morta. Por padrão, o serializador é deduzido a partir do desserializador da chave.

  • dead-letter-queue.value.serializer: o serializador usado para escrever o valor do registro na fila de letras mortas. Por padrão, o serializador é deduzido a partir do desserializador do valor.

O registro escrito na fila de cartas mortas contém um conjunto de cabeçalhos adicionais sobre o registro original:

  • dead-letter-reason: o motivo da falha

  • dead-letter-cause: a causa da falha, se houver

  • dead-letter-topic: o tópico original do registro

  • dead-letter-partition: a partição original do registro (inteiro mapeado para String)

  • dead-letter-deslocamento: o deslocamento original do registro (long mapeado para String)

SmallRye Reactive Messaging enables implementing custom failure strategies. See SmallRye Reactive Messaging documentation for more information.

4.4.1. Repetindo o processamento

Você pode combinar a Mensageria Reativa com a Tolerância a Falhas do SmallRye e tentar novamente o processamento em caso de falha:

@Incoming("kafka")
@Retry(delay = 10, maxRetries = 5)
public void consume(String v) {
   // ... retry if this method throws an exception
}

Você pode configurar o atraso, o número de tentativas, o jitter, etc.

Se o método devolver um Uni ou CompletionStage, é necessário acrescentar a anotação @NonBlocking:

@Incoming("kafka")
@Retry(delay = 10, maxRetries = 5)
@NonBlocking
public Uni<String> consume(String v) {
   // ... retry if this method throws an exception or the returned Uni produce a failure
}
A anotação @NonBlocking só é necessária com a Tolerância a Falha do SmallRye 5.1.0 e versões anteriores. A partir da Tolerância a Falha do SmallRye 5.2.0 (disponível desde o Quarkus 2.1.0.Final), ela não é necessária. Consulte a documentação da Tolerância a Falha do SmallRye para obter mais informações.

As mensagens recebidas são reconhecidas somente quando o processamento é concluído com êxito. Portanto, ele confirma o deslocamento após o processamento bem-sucedido. Se o processamento ainda falhar, mesmo depois de todas as tentativas, a mensagem será não reconhecida e a estratégia de falha será aplicada.

4.4.2. Tratando Falhas de Desserialização

Quando ocorre uma falha na desserialização, você pode interceptá-la e fornecer uma estratégia de falha. Para isso, você precisa criar um bean que implemente a interface DeserializationFailureHandler<T>:

@ApplicationScoped
@Identifier("failure-retry") // Set the name of the failure handler
public class MyDeserializationFailureHandler
    implements DeserializationFailureHandler<JsonObject> { // Specify the expected type

    @Override
    public JsonObject decorateDeserialization(Uni<JsonObject> deserialization, String topic, boolean isKey,
            String deserializer, byte[] data, Headers headers) {
        return deserialization
                    .onFailure().retry().atMost(3)
                    .await().atMost(Duration.ofMillis(200));
    }
}

Para utilizar este manipulador de falhas, o bean deve ser exposto com o qualificador @Identifier e a configuração do conector deve especificar o atributo mp.messaging.incoming.$channel.[key|value]-deserialization-failure-handler (para desserializadores de chave ou de valor).

O manipulador é chamado com detalhes da desserialização, inclusive a ação representada como Uni<T>. Na desserialização Uni podem ser implementadas estratégias de falha, como tentar novamente, fornecer um valor de reserva ou aplicar o tempo limite.

Se você não configurar um manipulador de falha de desserialização e ocorrer uma falha de desserialização, a aplicação será marcada como não saudável. Você também pode ignorar a falha, o que registrará a exceção e produzirá um valor null. Para habilitar esse comportamento, configure o atributo mp.messaging.incoming.$channel.fail-on-deserialization-failure para false.

Se o atributo fail-on-deserialization-failure estiver configurado como false e o atributo failure-strategy for dead-letter-queue, o registro com falha será enviado para o tópico da dead letter queue correspondente.

4.5. Grupos de Consumidores

No Kafka, um grupo de consumidores é um conjunto de consumidores que cooperam para consumir dados de um tópico. Um tópico é dividido em um conjunto de partições. As partições de um tópico são atribuídas entre os consumidores do grupo, o que permite dimensionar efetivamente a taxa de transferência do consumo. Observe que cada partição é atribuída a um único consumidor de um grupo. No entanto, um consumidor pode ser atribuído a várias partições se o número de partições for maior que o número de consumidores no grupo.

Vamos explorar brevemente diferentes padrões de produtor/consumidor e como implementá-los usando o Quarkus:

  1. Uma única thread de consumidor dentro de um grupo de consumidores

    Esse é o comportamento padrão de uma aplicação que se inscreve em um tópico do Kafka: Cada conector Kafka criará um único thread de consumidor e o colocará em um único grupo de consumidores. O ID do grupo de consumidores tem como padrão o nome da aplicação, conforme definido pela propriedade de configuração quarkus.application.name. Ele também pode ser definido por meio da propriedade kafka.group.id.

    Architecture
  2. Vários threads de consumidores em um grupo de consumidores

    Para uma determinada instância da aplicação, o número de consumidores dentro do grupo de consumidores pode ser configurado usando a propriedade mp.messaging.incoming.$channel.concurrency. As partições do tópico assinado serão divididas entre as threads de consumidores. Observe que se o valor de concorrência exceder o número de partições do tópico, algumas threads de consumidores não serão atribuídas a nenhuma partição.

    Architecture
    Depreciação

    O atributo de concorrência fornece uma maneira agnóstica de conector para canais concorrentes não blocantes e substitui o atributo partitions específico do conector Kafka. O atributo partitions está, portanto, obsoleto e será removido em versões futuras.

  3. Várias aplicações de consumo dentro de um grupo de consumidores

    Da mesma forma que no exemplo anterior, várias instâncias de uma aplicação podem se inscrever em um único grupo de consumidores, configurado por meio da propriedade mp.messaging.incoming.$channel.group.id ou deixado como padrão para o nome da aplicação. Isso, por sua vez, dividirá as partições do tópico entre as instâncias da aplicação.

    Architecture
  4. Pub/Sub: Vários grupos de consumidores inscritos em um tópico

    Por fim, diferentes aplicações podem se inscrever independentemente nos mesmos tópicos usando diferentes IDs de grupos de consumidores. Por exemplo, as mensagens publicadas em um tópico chamado pedidos podem ser consumidas de forma independente em duas aplicações de consumo, uma com mp.messaging.incoming.orders.group.id=invoicing e a segunda com mp.messaging.incoming.orders.group.id=shipping. Assim, diferentes grupos de consumidores podem ser dimensionados de forma independente de acordo com os requisitos de consumo de mensagens.

    Architecture

Um requisito negocial comum é consumir e processar os registros do Kafka em ordem. O broker Kafka preserva a ordem dos registros dentro de uma partição e não dentro de um tópico. Portanto, é importante pensar em como os registros são particionados dentro de um tópico. O particionador padrão usa o hash da chave do registro para calcular a partição de um registro ou, quando a chave não é definida, escolhe uma partição aleatoriamente por lote ou registros.

During normal operation, a Kafka consumer preserves the order of records inside each partition assigned to it. SmallRye Reactive Messaging keeps this order for processing, unless @Blocking(ordered = false) is used (see Bloqueando o processamento).

Note que, devido aos rebalanceamentos dos consumidores, os consumidores Kafka apenas garantem o processamento pelo menos uma vez de registros individuais, o que significa que os registros não confirmados podem ser processados novamente pelos consumidores.

4.5.1. Listener de Rebalanceamento do Consumidor

Em um grupo de consumidores, à medida que novos membros do grupo chegam e membros antigos saem, as partições são reatribuídas para que cada membro receba uma parte proporcional das partições. Isso é conhecido como rebalanceamento do grupo. Para lidar com a confirmação de deslocamento e as partições atribuídas, você pode fornecer um listener de rebalanceamento de consumidor. Para isso, implemente a interface io.smallrye.reactive.messaging.kafka.KafkaConsumerRebalanceListener e exponha-a como um bean CDI com o qualificador @Idenfier. Um caso de uso comum é armazenar o deslocamento em um armazém de dados separado para implementar a semântica exatamente único ou iniciar o processamento em um deslocamento específico.

O listener é chamado sempre que a atribuição de tópico/partição do consumidor é alterada. Por exemplo, quando a aplicação é iniciada, ele invoca call-back partitionsAssigned com o conjunto inicial de tópicos/partições associados ao consumidor. Se, mais tarde, esse conjunto for alterado, ele chama novamente os callbacks partitionsRevoked e partitionsAssigned, para que você possa implementar uma lógica personalizada.

Observe que os métodos do ouvinte de rebalanceamento são chamados a partir da thread de polling do Kafka e bloquearão a thread do chamador até a conclusão. Isso ocorre porque o protocolo de rebalanceamento tem barreiras de sincronização, e o uso de código assíncrono em um ouvinte de rebalanceamento pode ser executado após a barreira de sincronização.

Quando os tópicos/partições são atribuídos ou revogados por um consumidor, o envio de mensagens é interrompido e retomado após a conclusão do rebalanceamento.

Se o ouvinte de rebalanceamento lidar com a confirmação de deslocamento em nome do usuário (usando a estratégia de confirmação NONE), o ouvinte de rebalanceamento deverá confirmar o deslocamento de forma síncrona na chamada de retorno partitionsRevoked. Também recomendamos aplicar a mesma lógica quando a aplicação for interrompida.

Ao contrário dos métodos ConsumerRebalanceListener do Apache Kafka, os métodos io.smallrye.reactive.messaging.kafka.KafkaConsumerRebalanceListener transmitem o consumidor Kafka e o conjunto de tópicos/partições.

No exemplo a seguir, configuramos um consumidor que sempre inicia com mensagens de, no máximo, 10 minutos atrás (ou deslocamento 0). Primeiro, precisamos fornecer um bean que implemente io.smallrye.reactive.messaging.kafka.KafkaConsumerRebalanceListener e seja anotado com io.smallrye.common.annotation.Identifier. Em seguida, devemos configurar nosso conector de entrada para usar esse bean.

package inbound;

import io.smallrye.common.annotation.Identifier;
import io.smallrye.reactive.messaging.kafka.KafkaConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.consumer.TopicPartition;

import jakarta.enterprise.context.ApplicationScoped;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.logging.Logger;

@ApplicationScoped
@Identifier("rebalanced-example.rebalancer")
public class KafkaRebalancedConsumerRebalanceListener implements KafkaConsumerRebalanceListener {

    private static final Logger LOGGER = Logger.getLogger(KafkaRebalancedConsumerRebalanceListener.class.getName());

    /**
     * When receiving a list of partitions, will search for the earliest offset within 10 minutes
     * and seek the consumer to it.
     *
     * @param consumer   underlying consumer
     * @param partitions set of assigned topic partitions
     */
    @Override
    public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
        long now = System.currentTimeMillis();
        long shouldStartAt = now - 600_000L; //10 minute ago

        Map<TopicPartition, Long> request = new HashMap<>();
        for (TopicPartition partition : partitions) {
            LOGGER.info("Assigned " + partition);
            request.put(partition, shouldStartAt);
        }
        Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(request);
        for (Map.Entry<TopicPartition, OffsetAndTimestamp> position : offsets.entrySet()) {
            long target = position.getValue() == null ? 0L : position.getValue().offset();
            LOGGER.info("Seeking position " + target + " for " + position.getKey());
            consumer.seek(position.getKey(), target);
        }
    }

}
package inbound;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.eclipse.microprofile.reactive.messaging.Acknowledgment;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;

import jakarta.enterprise.context.ApplicationScoped;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

@ApplicationScoped
public class KafkaRebalancedConsumer {

    @Incoming("rebalanced-example")
    @Acknowledgment(Acknowledgment.Strategy.NONE)
    public CompletionStage<Void> consume(Message<ConsumerRecord<Integer, String>> message) {
        // We don't need to ACK messages because in this example,
        // we set offset during consumer rebalance
        return CompletableFuture.completedFuture(null);
    }

}

Para configurar o conector de entrada para usar o ouvinte fornecido, definimos o identificador do ouvinte de rebalanceamento do consumidor: mp.messaging.incoming.rebalanced-example.consumer-rebalance-listener.name=rebalanced-example.rebalancer

Ou fazer com que o nome do ouvinte seja o mesmo que o ID do grupo:

mp.messaging.incoming.rebalanced-example.group.id=rebalanced-example.rebalancer

A definição do nome do ouvinte de rebalanceamento do consumidor tem precedência sobre a utilização do ID do grupo.

4.5.2. Utilizando grupos de consumidores únicos

Para processar todos os registros de um tópico (desde o seu início), é necessário:

  1. definir auto.deslocamento.reset = earliest

  2. atribuir o seu consumidor a um grupo de consumidores não utilizado por nenhuma outra aplicação.

O Quarkus gera um UUID que muda entre duas execuções (inclusive no modo de desenvolvimento). Assim, você tem certeza de que nenhum outro consumidor o utiliza e recebe um novo ID de grupo exclusivo sempre que a aplicação é iniciada.

Você pode utilizar esse UUID gerado como o grupo de consumidores da seguinte forma:

mp.messaging.incoming.your-channel.auto.offset.reset=earliest
mp.messaging.incoming.your-channel.group.id=${quarkus.uuid}
Se o atributo group.id não estiver definido, a propriedade de configuração quarkus.application.name é utilizada por padrão.

4.5.3. Atribuição manual de tópico-partição

O atributo de canal assign-seek permite atribuir manualmente tópico-partição a um canal de entrada do Kafka, e, opcionalmente, buscar um deslocamento especificado na partição para começar a consumir registros. Se assign-seek for usado, o consumidor não será inscrito dinamicamente nos tópicos, mas, em vez disso, atribuirá estaticamente as partições descritas. No tópico-partição manual o rebalanceamento não ocorre e, portanto, os ouvintes de rebalanceamento nunca são chamados.

O atributo recebe uma lista de triplas separadas por vírgulas: <topic>:<partition>:<deslocamento>.

Por exemplo, a configuração

mp.messaging.incoming.data.assign-seek=topic1:0:10, topic2:1:20

atribui o consumidor à:

  • Partição 0 do tópico 'topic', configurando a posição inicial no deslocamento 10.

  • Partição 1 do tópico 'topic2', configurando a posição inicial no deslocamento 20.

O tópico, partição, e deslocamento em cada tripla pode ter as seguintes variações:

  • Se o tópico for omitido, o tópico configurado será usado.

  • Se o deslocamento for omitido, partições são atribuídas para o consumidor mas não serão buscadas para o deslocamento.

  • Se o deslocamento é 0, ele busca o início do tópico-partição.

  • Se o deslocamento é -1, ele busca o fim do tópico-partição.

4.6. Recebendo Registros Kafka em Lotes

Por padrão, os métodos de entrada recebem cada registro do Kafka individualmente. Por trás disso, os clientes consumidores do Kafka consultam o broker constantemente e recebem registros em lotes, apresentados dentro do contêiner ConsumerRecords.

No modo batch, a sua aplicação pode receber todos os registros devolvidos pela consulta do consumidor de uma só vez.

Para tal, é necessário especificar um tipo de contêiner compatível para receber todos os dados:

@Incoming("prices")
public void consume(List<Double> prices) {
    for (double price : prices) {
        // process price
    }
}

O método incoming também pode receber os tipos Message<List<Payload>>, Message<ConsumerRecords<Key, Payload>> e ConsumerRecords<Key, Payload>. Eles fornecem acesso aos detalhes do registro, como offset ou timestamp:

@Incoming("prices")
public CompletionStage<Void> consumeMessage(Message<ConsumerRecords<String, Double>> records) {
    for (ConsumerRecord<String, Double> record : records.getPayload()) {
        String payload = record.getPayload();
        String topic = record.getTopic();
        // process messages
    }
    // ack will commit the latest offsets (per partition) of the batch.
    return records.ack();
}

Observe que o processamento bem-sucedido do lote de registros de entrada confirmará os deslocamentos mais recentes de cada partição recebida dentro do lote. A estratégia de confirmação configurada será aplicada somente a esses registros.

Inversamente, se o processamento lançar uma exceção, todas as mensagens são não reconhecidas, aplicando a estratégia de falha a todos os registros dentro do lote.

O Quarkus detecta automaticamente os tipos de lote para os canais de entrada e define a configuração do lote automaticamente. Você pode configurar o modo de lote explicitamente com a propriedade mp.messaging.incoming.$channel.batch.

4.7. Processamento com estado com Ponto de Verificação

A estratégia de submissão checkpoint (ponto de verificação) é uma funcionalidade experimental e pode ser alterada no futuro.

SmallRye Reactive Messaging checkpoint commit strategy allows consumer applications to process messages in a stateful manner, while also respecting Kafka consumer scalability. An incoming channel with checkpoint commit strategy persists consumer offsets on an external state store, such as a relational database or a key-value store. As a result of processing consumed records, the consumer application can accumulate an internal state for each topic-partition assigned to the Kafka consumer. This local state will be periodically persisted to the state store and will be associated with the offset of the record that produced it.

Essa estratégia não confirma nenhum deslocamento para o broker Kafka, portanto, quando novas partições são atribuídas ao consumidor, ou seja, o consumidor é reiniciado ou as instâncias do grupo de consumidores são escalonadas, o consumidor retoma o processamento a partir do último deslocamento do ponto de verificação com seu estado salvo.

O código do consumidor do canal @Incoming pode manipular o estado de processamento por meio da API CheckpointMetadata. Por exemplo, um consumidor que calcula a média móvel dos preços recebidos em um tópico do Kafka teria a seguinte aparência:

package org.acme;

import java.util.concurrent.CompletionStage;

import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;

import io.smallrye.reactive.messaging.kafka.KafkaRecord;
import io.smallrye.reactive.messaging.kafka.commit.CheckpointMetadata;

@ApplicationScoped
public class MeanCheckpointConsumer {

    @Incoming("prices")
    public CompletionStage<Void> consume(Message<Double> record) {
        // Get the `CheckpointMetadata` from the incoming message
        CheckpointMetadata<AveragePrice> checkpoint = CheckpointMetadata.fromMessage(record);

        // `CheckpointMetadata` allows transforming the processing state
        // Applies the given function, starting from the value `0.0` when no previous state exists
        checkpoint.transform(new AveragePrice(), average -> average.update(record.getPayload()), /* persistOnAck */ true);

        // `persistOnAck` flag set to true, ack will persist the processing state
        // associated with the latest offset (per partition).
        return record.ack();
    }

    static class AveragePrice {
        long count;
        double mean;

        AveragePrice update(double newPrice) {
            mean += ((newPrice - mean) / ++count);
            return this;
        }
    }
}

O método transform aplica a função de transformação ao estado atual, produzindo um estado alterado e registrando-o localmente para checkpointing. Por padrão, o estado local é mantido no armazém de estado periodicamente, período especificado por auto.commit.interval.ms, (padrão: 5000). Se o sinalizador persistOnAck for fornecido, o estado mais recente será persistido no armazém de estado ansiosamente no reconhecimento da mensagem. O método setNext funciona de forma semelhante, definindo diretamente o estado mais recente.

A estratégia de confirmação de ponto de verificação rastreia quando um estado de processamento foi mantido pela última vez para cada partição de tópico. Se uma alteração de estado pendente não puder ser mantida por checkpoint.unsynced-state-max-age.ms (padrão: 10000), o canal será marcado como não saudável.

4.7.1. State stores (armazéns de estado)

As implementações de armazém de estado determinam onde e como os estados de processamento são mantidos. Isso é configurado pela propriedade mp.messaging.incoming.[channel-name].checkpoint.state-store. A serialização de objetos de estado depende da implementação do armazém de estado. Para instruir os armazéns de estado para serialização, pode ser necessário configurar o nome da classe dos objetos de estado usando a propriedade mp.messaging.incoming.[channel-name].checkpoint.state-type.

O Quarkus fornece as seguintes implementações de armazém de estado:

  • quarkus-redis: Usa a extensão quarkus-redis-client para manter os estados de processamento. Jackson é usado para serializar o estado de processamento em Json. Para objetos complexos, é necessário configurar a propriedade checkpoint.state-type com o nome da classe do objeto. Por padrão, o armazém de estado usa o cliente Redis padrão, mas se for necessário usar um cliente nomeado, o nome do cliente poderá ser especificado usando a propriedade mp.messaging.incoming.[channel-name].checkpoint.quarkus-redis.client-name. Os estados de processamento serão armazenados no Redis usando o esquema de nomeação de chave [consumer-group-id]:[topic]:[partition].

Por exemplo, a configuração do código anterior seria a seguinte:

mp.messaging.incoming.prices.group.id=prices-checkpoint
# ...
mp.messaging.incoming.prices.commit-strategy=checkpoint
mp.messaging.incoming.prices.checkpoint.state-store=quarkus-redis
mp.messaging.incoming.prices.checkpoint.state-type=org.acme.MeanCheckpointConsumer.AveragePrice
# ...
# if using a named redis client
mp.messaging.incoming.prices.checkpoint.quarkus-redis.client-name=my-redis
quarkus.redis.my-redis.hosts=redis://localhost:7000
quarkus.redis.my-redis.password=<redis-pwd>
  • quarkus-hibernate-reactive: Usa a extensão quarkus-hibernate-reactive para manter os estados de processamento. Os objetos de estado de processamento devem ser uma entidade da Jakarta Persistence e estender a classe CheckpointEntity, que lida com identificadores de objeto compostos pelo ID do grupo de consumidores, tópico e partição. Portanto, o nome da classe da entidade precisa ser configurado usando a propriedade checkpoint.state-type.

Por exemplo, a configuração do código anterior seria a seguinte:

mp.messaging.incoming.prices.group.id=prices-checkpoint
# ...
mp.messaging.incoming.prices.commit-strategy=checkpoint
mp.messaging.incoming.prices.checkpoint.state-store=quarkus-hibernate-reactive
mp.messaging.incoming.prices.checkpoint.state-type=org.acme.AveragePriceEntity

Sendo AveragePriceEntity uma entidade da Jakarta Persistence que estende CheckpointEntity:

package org.acme;

import jakarta.persistence.Entity;

import io.quarkus.smallrye.reactivemessaging.kafka.CheckpointEntity;

@Entity
public class AveragePriceEntity extends CheckpointEntity {
    public long count;
    public double mean;

    public AveragePriceEntity update(double newPrice) {
        mean += ((newPrice - mean) / ++count);
        return this;
    }
}
  • quarkus-hibernate-orm: Usa a extensão quarkus-hibernate-orm para manter os estados de processamento. É semelhante ao armazém de estado anterior, mas usa o Hibernate ORM em vez do Hibernate Reativo.

Quando configurado, ele pode usar um persistence-unit nomeado para o armazém de estado de ponto de verificação:

mp.messaging.incoming.prices.commit-strategy=checkpoint
mp.messaging.incoming.prices.checkpoint.state-store=quarkus-hibernate-orm
mp.messaging.incoming.prices.checkpoint.state-type=org.acme.AveragePriceEntity
mp.messaging.incoming.prices.checkpoint.quarkus-hibernate-orm.persistence-unit=prices
# ... Setup "prices" persistence unit
quarkus.datasource."prices".db-kind=postgresql
quarkus.datasource."prices".username=<your username>
quarkus.datasource."prices".password=<your password>
quarkus.datasource."prices".jdbc.url=jdbc:postgresql://localhost:5432/hibernate_orm_test
quarkus.hibernate-orm."prices".datasource=prices
quarkus.hibernate-orm."prices".packages=org.acme

Para obter instruções sobre como implementar armazéns de estado personalizados, consulte Implementando Armazéns de Estado.

5. Enviando mensagens para o Kafka

A configuração dos canais de saída do conector Kafka é semelhante à dos canais de entrada:

%prod.kafka.bootstrap.servers=kafka:9092 (1)
mp.messaging.outgoing.prices-out.connector=smallrye-kafka (2)
mp.messaging.outgoing.prices-out.topic=prices (3)
1 Configure a localização do broker para o perfil de produção. Você pode configurá-lo globalmente ou por canal usando a propriedade mp.messaging.outgoing.$channel.bootstrap.servers. No modo de desenvolvimento e ao executar testes, Dev Services para o Kafka inicia automaticamente um broker Kafka. Quando não fornecida, esta propriedade tem o valor padrão localhost:9092.
2 Configure o conector para gerenciar o canal prices-out.
3 Por padrão, o nome do tópico é igual ao nome do canal. Você pode configurar o atributo de tópico para o substituir.

Dentro da configuração da aplicação, os nomes dos canais são exclusivos. Portanto, se quiser configurar um canal de entrada e de saída no mesmo tópico, você precisará nomear os canais de forma diferente (como nos exemplos deste guia, mp.messaging.incoming.prices e mp.messaging.outgoing.prices-out).

Em seguida, sua aplicação pode gerar mensagens e publicá-las no canal prices-out. Ele pode usar os conteúdos do double, como no trecho a seguir:

import io.smallrye.mutiny.Multi;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import jakarta.enterprise.context.ApplicationScoped;
import java.time.Duration;
import java.util.Random;

@ApplicationScoped
public class KafkaPriceProducer {

    private final Random random = new Random();

    @Outgoing("prices-out")
    public Multi<Double> generate() {
        // Build an infinite stream of random prices
        // It emits a price every second
        return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
            .map(x -> random.nextDouble());
    }

}

Você não deve chamar métodos anotados com @Incoming e/ou @Outgoing diretamente a partir do seu código. Eles são invocados pela estrutura. O fato do código do usuário os invocar não teria o resultado esperado.

Observe que o método generate retorna um Multi<Double>, que implementa a interface Publisher do Reactive Streams. Esse publicador será usado pela estrutura para gerar mensagens e enviá-las ao tópico Kafka configurado.

Em vez de devolver uma conteúdo, você pode devolver um io.smallrye.reactive.messaging.kafka.Record para enviar pares de chave/valor:

@Outgoing("out")
public Multi<Record<String, Double>> generate() {
    return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
        .map(x -> Record.of("my-key", random.nextDouble()));
}

O conteúdo pode ser envolvido em uma org.eclipse.microprofile.reactive.messaging.Message para ter mais controle sobre os registros escritos:

@Outgoing("generated-price")
public Multi<Message<Double>> generate() {
    return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
            .map(x -> Message.of(random.nextDouble())
                    .addMetadata(OutgoingKafkaRecordMetadata.<String>builder()
                            .withKey("my-key")
                            .withTopic("my-key-prices")
                            .withHeaders(new RecordHeaders().add("my-header", "value".getBytes()))
                            .build()));
}

OutgoingKafkaRecordMetadata permite definir atributos de metadados do registro do Kafka, como key, topic, partition ou timestamp. Um caso de uso é selecionar dinamicamente o tópico de destino de uma mensagem. Nesse caso, em vez de configurar o tópico dentro do arquivo de configuração da aplicação, você precisa usar os metadados de saída para definir o nome do tópico.

Além das assinaturas de método que retornam um Publisher do Reactive Stream (Multi é uma implementação de Publisher), o método de saída também pode retornar uma única mensagem. Nesse caso, o produtor usará esse método como gerador para criar um fluxo infinito.

@Outgoing("prices-out") T generate(); // T excluding void

@Outgoing("prices-out") Message<T> generate();

@Outgoing("prices-out") Uni<T> generate();

@Outgoing("prices-out") Uni<Message<T>> generate();

@Outgoing("prices-out") CompletionStage<T> generate();

@Outgoing("prices-out") CompletionStage<Message<T>> generate();

5.1. Enviando de mensagens com o Emitter

Às vezes, você precisa ter uma forma imperativa de enviar mensagens.

Por exemplo, se você precisar enviar uma mensagem para um fluxo ao receber uma solicitação POST dentro de um endpoint REST. Nesse caso, você não pode usar @Outgoing porque seu método tem parâmetros.

Para tal, pode utilizar um Emitter.

import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;

import jakarta.inject.Inject;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Consumes;
import jakarta.ws.rs.core.MediaType;

@Path("/prices")
public class PriceResource {

    @Inject
    @Channel("price-create")
    Emitter<Double> priceEmitter;

    @POST
    @Consumes(MediaType.TEXT_PLAIN)
    public void addPrice(Double price) {
        CompletionStage<Void> ack = priceEmitter.send(price);
    }
}

O envio de um conteúdo devolve um CompletionStage, concluído quando a mensagem é recebida. Se a transmissão da mensagem falhar, o CompletionStage é completado excepcionalmente com a razão do não reconhecimento.

A configuração de Emitter é efetuada da mesma forma que a outra configuração de fluxo utilizada por @Incoming e @Outgoing.

Usando o Emitter, você envia mensagens do seu código imperativo para mensagens reativas. Essas mensagens são armazenadas em uma fila até serem enviadas. Se o cliente produtor do Kafka não conseguir acompanhar as mensagens que tentam ser enviadas para o Kafka, essa fila pode se tornar um consumidor de memória e você pode até ficar sem memória. Você pode usar @OnOverflow para configurar a estratégia de contrapressão (back-pressure). Ele permite que você configure o tamanho da fila (o padrão é 256) e a estratégia a ser aplicada quando o tamanho do buffer for atingido. As estratégias disponíveis são DROP, LATEST, FAIL, BUFFER, UNBOUNDED_BUFFER e NONE.

Com a API Emitter, você também pode encapsular o payload de saída dentro de Message<T>. Tal como nos exemplos anteriores, Message te permite tratar os casos de reconhecimento/não reconhecimento de forma diferente.

import java.util.concurrent.CompletableFuture;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;

import jakarta.inject.Inject;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Consumes;
import jakarta.ws.rs.core.MediaType;

@Path("/prices")
public class PriceResource {

    @Inject @Channel("price-create") Emitter<Double> priceEmitter;

    @POST
    @Consumes(MediaType.TEXT_PLAIN)
    public void addPrice(Double price) {
        priceEmitter.send(Message.of(price)
            .withAck(() -> {
                // Called when the message is acked
                return CompletableFuture.completedFuture(null);
            })
            .withNack(throwable -> {
                // Called when the message is nacked
                return CompletableFuture.completedFuture(null);
            }));
    }
}

Se preferir usar APIs de fluxo reativo, você pode usar MutinyEmitter que retornará Uni<Void> do método send. Portanto, você pode usar as APIs do Mutiny para lidar com mensagens e erros downstream.

import org.eclipse.microprofile.reactive.messaging.Channel;

import jakarta.inject.Inject;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Consumes;
import jakarta.ws.rs.core.MediaType;

import io.smallrye.reactive.messaging.MutinyEmitter;

@Path("/prices")
public class PriceResource {

    @Inject
    @Channel("price-create")
    MutinyEmitter<Double> priceEmitter;

    @POST
    @Consumes(MediaType.TEXT_PLAIN)
    public Uni<String> addPrice(Double price) {
        return quoteRequestEmitter.send(price)
                .map(x -> "ok")
                .onFailure().recoverWithItem("ko");
    }
}

Também é possível bloquear o envio do evento para o emissor com o método sendAndAwait. Ele só retornará do método quando o evento for aceito ou bloqueado pelo receptor.

Depreciação

As classes io.smallrye.reactive.messaging.annotations.Emitter, io.smallrye.reactive.messaging.annotations.Channel e io.smallrye.reactive.messaging.annotations.OnOverflow são agora obsoletas e substituídas por:

  • org.eclipse.microprofile.reactive.messaging.Emitter

  • org.eclipse.microprofile.reactive.messaging.Channel

  • org.eclipse.microprofile.reactive.messaging.OnOverflow

O novo método Emitter.send devolve um CompletionStage concluído quando a mensagem produzida é confirmada.

Depreciação

MutinyEmitter#send(Message msg) está obsoleto em favor dos seguintes métodos que recebem Message para emitir:

  • <M extends Message<? extends T>> Uni<Void> sendMessage(M msg)

  • <M extends Message<? extends T>> void sendMessageAndAwait(M msg)

  • <M extends Message<? extends T>> Cancellable sendMessageAndForget(M msg)

Mais informações sobre como usar Emitter podem ser encontradas em Mensageria Reativa do SmallRye - Emissores e Canais

5.2. Escrever Reconhecimento

Quando o broker do Kafka recebe um registro, seu reconhecimento pode demorar, dependendo da configuração. Além disso, ele armazena na memória os registros que não podem ser gravados.

Por padrão, o conector espera que o Kafka confirme o registro para continuar o processamento (reconhecendo a mensagem recebida). Você pode desativar isso definindo o atributo waitForWriteCompletion como false.

Note que o atributo acks tem um enorme impacto no reconhecimento do registro.

Se não for possível escrever um registro, a mensagem é não reconhecida.

5.3. Contrapressão

O conector de saída do Kafka lida com a contrapressão, monitorando o número de mensagens em trânsito que aguardam gravação no broker do Kafka. O número de mensagens em trânsito é configurado usando o atributo max-inflight-messages e o padrão é 1024.

O conector envia apenas essa quantidade de mensagens ao mesmo tempo. Nenhuma outra mensagem será enviada até que pelo menos uma mensagem em andamento seja confirmada pelo broker. Em seguida, o conector grava uma nova mensagem no Kafka quando uma das mensagens em andamento do broker é reconhecida. Certifique-se de configurar o batch.size e o linger.ms do Kafka adequadamente.

Você também pode remover o limite de mensagens em andamento definindo max-inflight-messages como 0. No entanto, observe que o produtor do Kafka poderá bloquear se o número de solicitações atingir max.in.flight.requests.per.connection.

5.4. Nova tentativa de envio de mensagens

Quando o produtor Kafka recebe um erro do servidor, se for um erro transitório e recuperável, o cliente tentará enviar novamente o lote de mensagens. Esse comportamento é controlado pelos parâmetros retries e retry.backoff.ms. Além disso, a Mensageria Reativa do SmallRye tentará enviar novamente mensagens individuais em erros recuperáveis, dependendo dos parâmetros retries e delivery.timeout.ms.

Observe que, embora ter novas tentativas em um sistema confiável seja uma prática recomendada, o parâmetro max.in.flight.requests.per.connection tem como padrão 5, o que significa que a ordem das mensagens não é garantida. Se a ordem das mensagens for imprescindível para o seu caso de uso, definir max.in.flight.requests.per.connection como 1 garantirá que um único lote de mensagens seja enviado por vez, às custas de limitar a taxa de transferência do produtor.

Para aplicar um mecanismo de retentativa em erros de processamento, consulte a seção sobre Repetindo o processamento.

5.5. Tratando Falhas de Serialização

Para o cliente produtor do Kafka, as falhas de serialização não são recuperáveis e, portanto, o envio da mensagem não é repetido. Nesses casos, talvez seja necessário aplicar uma estratégia de falha para o serializador. Para isso, você precisa criar um bean que implemente a interface SerializationFailureHandler<T>:

@ApplicationScoped
@Identifier("failure-fallback") // Set the name of the failure handler
public class MySerializationFailureHandler
    implements SerializationFailureHandler<JsonObject> { // Specify the expected type

    @Override
    public byte[] decorateSerialization(Uni<byte[]> serialization, String topic, boolean isKey,
        String serializer, Object data, Headers headers) {
        return serialization
                    .onFailure().retry().atMost(3)
                    .await().indefinitely();
    }
}

Para utilizar este manipulador de falhas, o bean deve ser exposto com o qualificador @Identifier e a configuração do conector deve especificar o atributo mp.messaging.outgoing.$channel.[key|value]-serialization-failure-handler (para serializadores de chave ou de valor).

O manipulador é chamado com detalhes da serialização, incluindo a ação representada como Uni<byte[]>. Observe que o método deve aguardar o resultado e retornar o vetor de bytes serializado.

5.6. Canais na memória

Em alguns casos de uso, é conveniente usar os padrões de mensagens para transferir mensagens dentro da mesma aplicação. Quando você não conecta um canal a um backend de mensagens como o Kafka, tudo acontece na memória, e os fluxos são criados encadeando métodos. Cada cadeia ainda é um fluxo reativo e aplica o protocolo de contrapressão.

A estrutura verifica se a cadeia produtor/consumidor está completa, o que significa que, se a aplicação gravar mensagens em um canal na memória (usando um método com apenas @Outgoing, ou um Emitter), ele também deverá consumir as mensagens de dentro da aplicação (usando um método com apenas @Incoming ou usando um fluxo não gerenciado).

5.7. Difusão de mensagens em vários consumidores

Por padrão, um canal pode ser vinculado a um único consumidor, usando o método @Incoming ou o fluxo reativo @Channel. Na inicialização da aplicação, os canais são verificados para formar uma cadeia de consumidores e produtores com um único consumidor e produtor. Você pode substituir esse comportamento definindo mp.messaging.$channel.broadcast=true em um canal.

No caso dos canais na memória, a anotação @Broadcast pode ser utilizada no método @Outgoing. Por exemplo,

import java.util.Random;

import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import io.smallrye.reactive.messaging.annotations.Broadcast;

@ApplicationScoped
public class MultipleConsumer {

    private final Random random = new Random();

    @Outgoing("in-memory-channel")
    @Broadcast
    double generate() {
        return random.nextDouble();
    }

    @Incoming("in-memory-channel")
    void consumeAndLog(double price) {
        System.out.println(price);
    }

    @Incoming("in-memory-channel")
    @Outgoing("prices2")
    double consumeAndSend(double price) {
        return price;
    }
}

Reciprocamente, vários produtores no mesmo canal podem ser mesclados com a configuração mp.messaging.incoming.$channel.merge=true. Nos métodos @Incoming, você pode controlar como vários canais são mesclados usando a anotação @Merge.

A repetição da anotação @Outgoing nos métodos de saída ou de processamento permite outra maneira de enviar mensagens para vários canais de saída:

import java.util.Random;

import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Outgoing;

@ApplicationScoped
public class MultipleProducers {

    private final Random random = new Random();

    @Outgoing("generated")
    @Outgoing("generated-2")
    double priceBroadcast() {
        return random.nextDouble();
    }

}

No exemplo anterior, o preço gerado será transmitido para ambos os canais de saída. O exemplo a seguir envia seletivamente mensagens para vários canais de saída usando o objeto de contêiner Targeted, contendo key como nome do canal e value como conteúdo da mensagem.

import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import io.smallrye.reactive.messaging.Targeted;

@ApplicationScoped
public class TargetedProducers {

    @Incoming("in")
    @Outgoing("out1")
    @Outgoing("out2")
    @Outgoing("out3")
    public Targeted process(double price) {
        Targeted targeted = Targeted.of("out1", "Price: " + price,
                "out2", "Quote: " + price);
        if (price > 90.0) {
            return targeted.with("out3", price);
        }
        return targeted;
    }

}

Observe que a detecção automática para serializadores do Kafka não funciona para assinaturas usando o Targeted.

Informações detalhadas sobre a utilização da anotação @Blocking podem ser encontradas em Mensageria Reativa do SmallRye - Lindando com execução blocante.

5.8. Transações Kafka

As transações do Kafka permitem gravações atômicas em vários tópicos e partições do Kafka. O conector Kafka fornece o emissor personalizado KafkaTransactions para gravar registros do Kafka dentro de uma transação. Ele pode ser injetado como um emissor regular @Channel:

import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Channel;

import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.kafka.KafkaRecord;
import io.smallrye.reactive.messaging.kafka.transactions.KafkaTransactions;

@ApplicationScoped
public class KafkaTransactionalProducer {

    @Channel("tx-out-example")
    KafkaTransactions<String> txProducer;

    public Uni<Void> emitInTransaction() {
        return txProducer.withTransaction(emitter -> {
            emitter.send(KafkaRecord.of(1, "a"));
            emitter.send(KafkaRecord.of(2, "b"));
            emitter.send(KafkaRecord.of(3, "c"));
            return Uni.createFrom().voidItem();
        });
    }

}

A função dada ao método withTransaction recebe um TransactionalEmitter para produzir registros e devolve um Uni que fornece o resultado da transação.

  • Se o processamento for concluído com êxito, o produtor é descarregado e a transação é confirmada.

  • Se o processamento lançar uma exceção, retornar uma Uni de falha, ou marcar o TransactionalEmitter para abortar, a transação é abortada.

Os produtores transacionais do Kafka exigem a configuração da propriedade do cliente acks=all e um ID exclusivo para transactional.id, o que implica enable.idempotence=true. Quando o Quarkus detecta o uso de KafkaTransactions para um canal de saída, ele configura essas propriedades no canal, fornecendo um valor padrão de "${quarkus.application.name}-${channelName}" para a propriedade transactional.id.

Note que, para utilização em produção, o transactional.id deve ser único em todas as instâncias da aplicação.

Enquanto um emissor de mensagens normal suportaria chamadas simultâneas para os métodos send e, consequentemente, colocaria em fila as mensagens de saída a serem gravadas no Kafka, um emissor KafkaTransactions suporta apenas uma transação por vez. Uma transação é considerada em andamento desde a chamada para o withTransaction até o retorno do Uni resultar em sucesso ou falha. Enquanto uma transação estiver em andamento, as chamadas subsequentes para withTransaction, inclusive as aninhadas dentro da função fornecida, lançarão IllegalStateException.

Observe que, na Mensageria Reativa, a execução dos métodos de processamento já é serializada, a menos que @Blocking(ordered = false) seja usado. Se o withTransaction puder ser chamado simultaneamente, por exemplo, de um endpoint REST, é recomendável limitar a concorrência da execução. Isso pode ser feito usando a anotação @Bulkhead do Tolerância a Falhas do Microprofile.

5.8.1. Consumidores conscientes das transações

Se você pretende consumir apenas registros escritos e confirmados no âmbito de uma transação Kafka, terá de configurar a propriedade isolation.level no canal de entrada como:

mp.messaging.incoming.prices-in.isolation.level=read_committed

6. Requisição-Resposta do Kafka

O padrão de Requisição-Resposta do Kafka permite publicar um registro de requisição em um tópico do Kafka e, em seguida, aguardar um registro de resposta que responda à requisição inicial. O conector Kafka fornece o emissor personalizado KafkaRequestReply, que implementa o solicitante (ou cliente) do padrão de requisição-resposta para canais de saída do Kafka:

Ele pode ser injetado como um emissor @Channel regular:

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;

import org.eclipse.microprofile.reactive.messaging.Channel;

import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.kafka.reply.KafkaRequestReply;

@ApplicationScoped
@Path("/kafka")
public class KafkaRequestReplyEmitter {

    @Channel("request-reply")
    KafkaRequestReply<Integer, String> requestReply;

    @POST
    @Path("/req-rep")
    @Produces(MediaType.TEXT_PLAIN)
    public Uni<String> post(Integer request) {
        return requestReply.request(request);
    }

}

O método de requisição publica o registro no tópico de destino configurado do canal de saída e consulta um tópico de resposta (por padrão, o tópico de destino com o sufixo -replies) em busca de um registro de resposta. Quando a resposta é recebida, o Uni retornado é concluído com o valor do registro. A operação de envio da requisição gera um ID de correlação e define um cabeçalho (por padrão, REPLY_CORRELATION_ID), que espera que seja enviado de volta no registro de resposta.

O respondendor pode ser implementado usando um processador de Mensageria Reativa (consulte Processando Mensagens).

For more information on Kafka Request Reply feature and advanced configuration options, see the SmallRye Reactive Messaging Documentation.

7. Processando Mensagens

As aplicações que transmitem dados geralmente precisam consumir alguns eventos de um tópico, processá-los e publicar o resultado em um tópico diferente. Um método processador pode ser implementado de forma simples usando as anotações @Incoming e @Outgoing:

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import jakarta.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class PriceProcessor {

    private static final double CONVERSION_RATE = 0.88;

    @Incoming("price-in")
    @Outgoing("price-out")
    public double process(double price) {
        return price * CONVERSION_RATE;
    }

}

O parâmetro do método process é o conteúdo da mensagem de entrada, enquanto o valor de retorno será usado como conteúdo da mensagem de saída. As assinaturas mencionadas anteriormente para os tipos de parâmetro e retorno também são compatíveis, como Message<T>, Record<K, V>, etc.

É possível aplicar o processamento assíncrono de fluxos consumindo e devolvendo o tipo de fluxo reativo Multi<T>:

import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import io.smallrye.mutiny.Multi;

@ApplicationScoped
public class PriceProcessor {

    private static final double CONVERSION_RATE = 0.88;

    @Incoming("price-in")
    @Outgoing("price-out")
    public Multi<Double> process(Multi<Integer> prices) {
        return prices.filter(p -> p > 100).map(p -> p * CONVERSION_RATE);
    }

}

7.1. Propagando a Chave de Registro

Ao processar mensagens, é possível propagar a chave do registro de entrada para o registro de saída.

Ativada com a configuração mp.messaging.outgoing.$channel.propagate-record-key=true, a propagação da chave de registro produz o registro de saída com a mesma chave do registro de entrada.

Se o registro de saída já contiver uma chave, ela não será substituída pela chave do registro de entrada. Se o registro de entrada tiver uma chave nula, será usada a propriedade mp.messaging.outgoing.$channel.key.

7.2. Processamento Exactly-Once (Exatamente Único)

O Kafka Transactions permite gerenciar os deslocamentos do consumidor dentro de uma transação, juntamente com as mensagens produzidas. Isso permite acoplar um consumidor a um produtor transacional em um padrão consume-transforma-produz, também conhecido como processamento exatamente único.

O emissor personalizado KafkaTransactions fornece uma forma de aplicar um processamento exatamente único a uma mensagem Kafka de entrada dentro de uma transação.

O exemplo seguinte inclui um lote de registros Kafka dentro de uma transação.

import jakarta.enterprise.context.ApplicationScoped;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.OnOverflow;

import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.kafka.KafkaRecord;
import io.smallrye.reactive.messaging.kafka.transactions.KafkaTransactions;

@ApplicationScoped
public class KafkaExactlyOnceProcessor {

    @Channel("prices-out")
    @OnOverflow(value = OnOverflow.Strategy.BUFFER, bufferSize = 500) (3)
    KafkaTransactions<Integer> txProducer;

    @Incoming("prices-in")
    public Uni<Void> emitInTransaction(Message<ConsumerRecords<String, Integer>> batch) { (1)
        return txProducer.withTransactionAndAck(batch, emitter -> { (2)
            for (ConsumerRecord<String, Integer> record : batch.getPayload()) {
                emitter.send(KafkaRecord.of(record.key(), record.value() + 1)); (3)
            }
            return Uni.createFrom().voidItem();
        });
    }

}
1 Recomenda-se usar o processamento exatamente único junto com o modo de consumo em lote. Embora seja possível usá-lo com uma única mensagem do Kafka, isso terá um impacto significativo no desempenho.
2 A mensagem consumida é passada para o KafkaTransactions#withTransactionAndAck para lidar com os registros de offset e confirmação de mensagem.
3 O método send grava registros no Kafka dentro da transação, sem aguardar o recebimento do envio pelo broker. As mensagens pendentes de gravação no Kafka serão armazenadas em buffer e liberadas antes de confirmar a transação. Portanto, é recomendável configurar o @OnOverflow bufferSize para que caibam mensagens suficientes, por exemplo, o max.poll.records, quantidade máxima de registros retornados em um lote.
  • Se o processamento for concluído com êxito, antes de confirmar a transação, os deslocamentos da partição de tópicos da mensagem de lote indicada serão confirmados na transação.

  • Se o processamento precisar de ser abortado, depois de abortar a transação, a posição do consumidor é redefinida para o último deslocamento confirmado, retomando efetivamente o consumo a partir desse deslocamento. Se nenhum deslocamento do consumidor tiver sido comprometido com uma partição de tópico, a posição do consumidor é redefinida para o início da partição de tópico, mesmo que a política de redefinição de deslocamento seja `latest`.

Ao usar o processamento exatamente único, as confirmações de deslocamento de mensagens consumidas são tratadas pela transação e, portanto, a aplicação não deve confirmar os deslocamentos por outros meios. O consumidor deve ter enable.auto.commit=false (o padrão) e definir explicitamente commit-strategy=ignore:

mp.messaging.incoming.prices-in.commit-strategy=ignore
mp.messaging.incoming.prices-in.failure-strategy=ignore

7.2.1. Tratamento de erros para o processamento exatamente único

O Uni retornado do KafkaTransactions#withTransaction produzirá uma falha se a transação falhar e for abortada. A aplicação pode optar por tratar o caso de erro, mas se um Uni com falha for retornado do método @Incoming, o canal de entrada falhará efetivamente e interromperá o fluxo reativo.

O método KafkaTransactions#withTransactionAndAck reconhece e não reconhece a mensagem, mas não retornará um Uni com falha. As mensagens não reconhecidas serão tratadas pela estratégia de falha do canal de entrada (veja Estratégias de Tratamento de Erros). Configurar failure-strategy=ignore simplesmente redefine o consumidor Kafka para os últimos deslocamentos confirmados e retoma o consumo a partir daí.

8. Acessando clientes Kafka diretamente

Em casos raros, você pode precisar acessar os clientes Kafka subjacentes. KafkaClientService fornece acesso thread-safe a Producer e Consumer.

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
import jakarta.inject.Inject;

import org.apache.kafka.clients.producer.ProducerRecord;

import io.quarkus.runtime.StartupEvent;
import io.smallrye.reactive.messaging.kafka.KafkaClientService;
import io.smallrye.reactive.messaging.kafka.KafkaConsumer;
import io.smallrye.reactive.messaging.kafka.KafkaProducer;

@ApplicationScoped
public class PriceSender {

    @Inject
    KafkaClientService clientService;

    void onStartup(@Observes StartupEvent startupEvent) {
        KafkaProducer<String, Double> producer = clientService.getProducer("generated-price");
        producer.runOnSendingThread(client -> client.send(new ProducerRecord<>("prices", 2.4)))
            .await().indefinitely();
    }
}

A KafkaClientService é uma API experimental e pode sofrer alterações no futuro.

Você também pode obter a configuração do Kafka injetada na sua aplicação e criar diretamente clientes produtores, consumidores e administradores do Kafka:

import io.smallrye.common.annotation.Identifier;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.KafkaAdminClient;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Produces;
import jakarta.inject.Inject;
import java.util.HashMap;
import java.util.Map;

@ApplicationScoped
public class KafkaClients {

    @Inject
    @Identifier("default-kafka-broker")
    Map<String, Object> config;

    @Produces
    AdminClient getAdmin() {
        Map<String, Object> copy = new HashMap<>();
        for (Map.Entry<String, Object> entry : config.entrySet()) {
            if (AdminClientConfig.configNames().contains(entry.getKey())) {
                copy.put(entry.getKey(), entry.getValue());
            }
        }
        return KafkaAdminClient.create(copy);
    }

}

O mapa de configuração default-kafka-broker contém todas as propriedades da aplicação prefixadas com kafka. ou KAFKA_. Para mais opções de configuração, consulte Resolução de Configuração do Kafka.

9. Serialização JSON

O Quarkus tem capacidades incorporadas para lidar com mensagens JSON Kafka.

Imagine que temos uma classe de dados Fruit da seguinte forma:

public class Fruit {

    public String name;
    public int price;

    public Fruit() {
    }

    public Fruit(String name, int price) {
        this.name = name;
        this.price = price;
    }
}

E queremos utilizá-la para receber mensagens do Kafka, fazer alguma transformação de preços e enviar mensagens de volta para o Kafka.

import io.smallrye.reactive.messaging.annotations.Broadcast;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import jakarta.enterprise.context.ApplicationScoped;

/**
* A bean consuming data from the "fruit-in" channel and applying some price conversion.
* The result is pushed to the "fruit-out" channel.
*/
@ApplicationScoped
public class FruitProcessor {

    private static final double CONVERSION_RATE = 0.88;

    @Incoming("fruit-in")
    @Outgoing("fruit-out")
    @Broadcast
    public Fruit process(Fruit fruit) {
        fruit.price = fruit.price * CONVERSION_RATE;
        return fruit;
    }

}

Para isso, precisamos configurar a serialização JSON com Jackson ou JSON-B.

Com a serialização JSON corretamente configurada, também é possível utilizar Publisher<Fruit> e Emitter<Fruit>.

9.1. Serialização via Jackson

O Quarkus possui suporte embutido para serialização e desserialização JSON baseado no Jackson. Ele também irá gerar o serializador e desserializador para você, então você não precisa configurar nada. Quando a geração está desativada, você pode usar os ObjectMapperSerializer e ObjectMapperDeserializer fornecidos, como explicado abaixo.

Existe um ObjectMapperSerializer que pode ser usado para serializar todos os objetos de dados via Jackson. Você pode criar uma subclasse vazia se quiser usar Detecção automática de serializador/desserializador.

Por padrão, o ObjectMapperSerializer serializa null como a String "null". Isso pode ser personalizado com a definição da propriedade de configuração do Kafka json.serialize.null-as-null=true, que serializará null como null. Isso é útil quando se usa um tópico compactado, pois null é usado como uma tombstone para saber quais mensagens são excluídas durante a fase de compactação.

A classe correspondente do desserializador precisa ser uma subclasse. Portanto, vamos criar um FruitDeserializer que estende o ObjectMapperDeserializer.

package com.acme.fruit.jackson;

import io.quarkus.kafka.client.serialization.ObjectMapperDeserializer;

public class FruitDeserializer extends ObjectMapperDeserializer<Fruit> {
    public FruitDeserializer() {
        super(Fruit.class);
    }
}

Por fim, configure os seus canais para utilizarem o serializador e desserializador Jackson.

# Configure the Kafka source (we read from it)
mp.messaging.incoming.fruit-in.topic=fruit-in
mp.messaging.incoming.fruit-in.value.deserializer=com.acme.fruit.jackson.FruitDeserializer

# Configure the Kafka sink (we write to it)
mp.messaging.outgoing.fruit-out.topic=fruit-out
mp.messaging.outgoing.fruit-out.value.serializer=io.quarkus.kafka.client.serialization.ObjectMapperSerializer

Agora, suas mensagens Kafka conterão uma representação serializada pelo Jackson do seu objeto de dados Fruit. Nesse caso, a configuração do deserializer não é necessária, pois a Detecção automática de serializador/desserializador está habilitada por padrão.

Se você pretende desserializar uma lista de fruits, tem de criar um desserializador com um Jackson TypeReference denotando a coleção genérica utilizada.

package com.acme.fruit.jackson;

import java.util.List;
import com.fasterxml.jackson.core.type.TypeReference;
import io.quarkus.kafka.client.serialization.ObjectMapperDeserializer;

public class ListOfFruitDeserializer extends ObjectMapperDeserializer<List<Fruit>> {
    public ListOfFruitDeserializer() {
        super(new TypeReference<List<Fruit>>() {});
    }
}

9.2. Serialização via JSON-B

Em primeiro lugar, é necessário incluir a extensão quarkus-jsonb.

pom.xml
<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-jsonb</artifactId>
</dependency>
build.gradle
implementation("io.quarkus:quarkus-jsonb")

Existe um JsonbSerializer que pode ser usado para serializar todos os objetos de dados via JSON-B. Você pode criar uma subclasse vazia se quiser usar Detecção automática de serializador/desserializador.

Por padrão, o JsonbSerializer serializa null como a String "null". Isso pode ser personalizado com a definição da propriedade de configuração do Kafka json.serialize.null-as-null=true, que serializará null como null. Isso é útil quando se usa um tópico compactado, pois null é usado como uma tombstone para saber quais mensagens são excluídas durante a fase de compactação.

A classe correspondente do desserializador precisa ser uma subclasse. Portanto, vamos criar um FruitDeserializer que estende o genérico JsonbDeserializer.

package com.acme.fruit.jsonb;

import io.quarkus.kafka.client.serialization.JsonbDeserializer;

public class FruitDeserializer extends JsonbDeserializer<Fruit> {
    public FruitDeserializer() {
        super(Fruit.class);
    }
}

Por fim, configure os seus canais para utilizarem o serializador e desserializador JSON-B.

# Configure the Kafka source (we read from it)
mp.messaging.incoming.fruit-in.connector=smallrye-kafka
mp.messaging.incoming.fruit-in.topic=fruit-in
mp.messaging.incoming.fruit-in.value.deserializer=com.acme.fruit.jsonb.FruitDeserializer

# Configure the Kafka sink (we write to it)
mp.messaging.outgoing.fruit-out.connector=smallrye-kafka
mp.messaging.outgoing.fruit-out.topic=fruit-out
mp.messaging.outgoing.fruit-out.value.serializer=io.quarkus.kafka.client.serialization.JsonbSerializer

Agora, as suas mensagens Kafka conterão uma representação serializada JSON-B do seu objeto de dados Fruit.

Para desserializar uma lista de fruits, é necessário criar um desserializador com um Tipo denotando a coleção genérica utilizada.

package com.acme.fruit.jsonb;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.List;
import io.quarkus.kafka.client.serialization.JsonbDeserializer;

public class ListOfFruitDeserializer extends JsonbDeserializer<List<Fruit>> {
    public ListOfFruitDeserializer() {
        super(new ArrayList<MyEntity>() {}.getClass().getGenericSuperclass());
    }
}
Se não quiser criar um desserializador para cada objeto de dados, você pode usar o genérico io.vertx.kafka.client.serialization.JsonObjectDeserializer que desserializará para um io.vertx.core.json.JsonObject. O serializador correspondente também pode ser usado: io.vertx.kafka.client.serialization.JsonObjectSerializer.

10. Serialização Avro

Isso é descrito em um guia dedicado: Usando Apache Kafka com Registro de Esquema e o Avro.

11. Serialização do Esquema JSON

Isso é descrito em um guia dedicado: Usando Apache Kafka com Registro de Esquema e Esquema JSON.

12. Detecção automática de serializador/desserializador

Ao usar a Mensageria Quarkus com Kafka (io.quarkus:quarkus-messaging-kafka), o Quarkus pode frequentemente detectar automaticamente a classe correta de serializador e desserializador. Essa detecção automática é baseada nas declarações dos métodos @Incoming e @Outgoing, assim como nos @Channels injetados.

Por exemplo, se você declarar

@Outgoing("generated-price")
public Multi<Integer> generate() {
    ...
}

e a sua configuração indicar que o canal generated-price utiliza o conector smallrye-kafka, o Quarkus definirá automaticamente o value.serializer para o IntegerSerializer incorporado no Kafka.

Da mesma forma, se você declarar

@Incoming("my-kafka-records")
public void consume(Record<Long, byte[]> record) {
    ...
}

e a sua configuração indicar que o canal my-kafka-records utiliza o conector smallrye-kafka, então o Quarkus definirá automaticamente o key.deserializer para o LongDeserializer incorporado no Kafka, bem como o value.deserializer para ByteArrayDeserializer.

Finalmente, se você declarar

@Inject
@Channel("price-create")
Emitter<Double> priceEmitter;

e a sua configuração indicar que o canal price-create utiliza o conector smallrye-kafka, o Quarkus definirá automaticamente o value.serializer para o DoubleSerializer incorporado no Kafka.

O conjunto completo de tipos suportados pela autodetecção do serializador/desserializador é:

  • short e java.lang.Short

  • int e java.lang.Integer

  • long e java.lang.Long

  • float e java.lang.Float

  • double e java.lang.Double

  • byte[]

  • java.lang.String

  • java.util.UUID

  • java.nio.ByteBuffer

  • org.apache.kafka.common.utils.Bytes

  • io.vertx.core.buffer.Buffer

  • io.vertx.core.json.JsonObject

  • io.vertx.core.json.JsonArray

  • classes para as quais existe uma implementação direta de org.apache.kafka.common.serialization.Serializer<T> / org.apache.kafka.common.serialization.Deserializer<T>.

    • a implementação tem de especificar o argumento do tipo T como o tipo (des)serializado.

  • classes geradas a partir de esquemas Avro, bem como Avro GenericRecord, se o Confluent ou o Apicurio Registry serde estiver presente

    • no caso de existirem vários serdes Avro, o serializador/desserializador deve ser configurado manualmente para as classes geradas pelo Avro, uma vez que a detecção automática é impossível

    • consulte Usando Apache Kafka com Registro de Esquema e Avro para obter mais informações sobre a utilização das bibliotecas Confluent ou Apicurio Registry

  • classes para as quais uma subclasse de ObjectMapperSerializer / ObjectMapperDeserializer está presente, conforme descrito em Serialização via Jackson

    • tecnicamente não é necessário criar subclasse de ObjectMapperSerializer, mas, nesse caso, a detecção automática não é possível

  • classes para as quais uma subclasse de JsonbSerializer / JsonbDeserializer está presente, conforme descrito em Serialização via JSON-B

    • tecnicamente não é necessário criar subclasse de JsonbSerializer, mas, nesse caso, a detecção automática não é possível

Se um serializador/desserializador for definido pela configuração, não será substituído pela detecção automática.

Se você tiver algum problema com a detecção automática de serializadores, pode desativá-la completamente definindo quarkus.messaging.kafka.serializer-autodetection.enabled=false. Se achar que precisa fazer isso, por favor, registre um bug no Quarkus issue tracker para que possamos corrigir o problema.

13. Geração de serializador/desserializador JSON

O Quarkus gera automaticamente serializadores e desserializadores para canais onde:

  1. o serializador/desserializador não está configurado

  2. a detecção automática não encontrou um serializador/desserializador correspondente

Utiliza Jackson por baixo.

Esta geração pode ser desativada utilizando:

quarkus.messaging.kafka.serializer-generation.enabled=false
A geração não suporta coleções como List<Fruit>. Consulte Serialização via Jackson para escrever seu próprio serializador/desserializador para este caso.

14. Usando Registro de Esquemas

Isso é descrito em um guia dedicado para Avro: Usando Apache Kafka com Registro de Esquema e Avro. E um outro para Esquema JSON: Usando Apache Kafka com Registro de Esquema e Esquema JSON.

15. Verificações de Integridade

O Quarkus fornece várias verificações de integridade para o Kafka. Essas verificações são usadas em conjunto com a extensão quarkus-smallrye-health.

15.1. Verificação de Prontidão do Broker Kafka

Ao usar a extensão quarkus-kafka-client, você pode ativar a verificação de integridade da prontidão definindo a propriedade quarkus.kafka.health.enabled como true no seu application.properties. Essa verificação informa o status da interação com um broker Kafka padrão (configurado usando kafka.bootstrap.servers). Ela requer uma conexão de administrador com o broker Kafka e está desativada por padrão. Se estiver ativada, quando o usuário acessar o endpoint /q/health/ready da sua aplicação, terá informações sobre o status de validação da conexão.

15.2. Verificações de Integridade da Mensageria Reativa do Kafka

Quando usar Mensageria Reativa e o conector Kafka, cada canal configurado (de entrada ou de saída) fornece verificações de inicialização, vivacidade e prontidão.

  • A verificação de inicialização verifica se a comunicação com o cluster Kafka está estabelecida.

  • A verificação de vivacidade capta qualquer falha irrecuperável que ocorra durante a comunicação com o Kafka.

  • A verificação de prontidão verifica se o conector Kafka está pronto para consumir/produzir mensagens para os tópicos Kafka configurados.

Para cada canal, é possível desativar as verificações utilizando:

# Disable both liveness and readiness checks with `health-enabled=false`:

# Incoming channel (receiving records form Kafka)
mp.messaging.incoming.your-channel.health-enabled=false
# Outgoing channel (writing records to Kafka)
mp.messaging.outgoing.your-channel.health-enabled=false

# Disable only the readiness check with `health-readiness-enabled=false`:

mp.messaging.incoming.your-channel.health-readiness-enabled=false
mp.messaging.outgoing.your-channel.health-readiness-enabled=false
Você pode configurar o bootstrap.servers para cada canal usando a propriedade mp.messaging.incoming|outgoing.$channel.bootstrap.servers. O padrão é kafka.bootstrap.servers.

As verificações de inicialização e prontidão da Mensageria Reativa oferecem duas estratégias. A estratégia padrão verifica se uma conexão ativa foi estabelecida com o broker. Essa abordagem não é intrusiva, pois se baseia em métricas incorporadas do cliente Kafka.

Usando o atributo health-topic-verification-enabled=true, a sonda de inicialização usa um cliente administrador para verificar a lista de tópicos. Já a sonda de prontidão para um canal de entrada verifica se pelo menos uma partição está atribuída para consumo e, para um canal de saída, verifica se o tópico usado pelo produtor existe no broker.

Observe que, para isso, é necessária uma conexão de administrador. Você pode ajustar o tempo limite das chamadas de verificação de tópico para o broker usando a configuração health-topic-verification-timeout.

16. Observabilidade

Se a extensão OpenTelemetry estiver presente, os canais do conector Kafka funcionam sem problemas com o OpenTelemetry Tracing. As mensagens escritas nos tópicos Kafka propagam o span de rastreamento atual. Nos canais de entrada, se um registro Kafka consumido contiver informações de rastreamento, o processamento da mensagem herda o span da mensagem como pai.

O rastreamento pode ser desativado explicitamente por canal:

mp.messaging.incoming.data.tracing-enabled=false

Se a extensão Micrometer está presente, então as métricas dos clientes produtor e consumidor do Kafka são expostas como medidores do Micrometer.

16.1. Métricas do canal

Métricas específicas por canal também podem ser coletadas e expostas como medidores do Micrometer. As seguintes métricas podem ser coletadas por canal, identificadas com a tag channel:

  • quarkus.messaging.message.count : O número de mensagens produzidas ou recebidas

  • quarkus.messaging.message.acks : Número de mensagens processadas com êxito

  • quarkus.messaging.message.failures : Número de mensagens processadas com falhas

  • quarkus.messaging.message.duration : A duração do processamento da mensagem.

Por motivos de compatibilidade com versões anteriores, as métricas de canal não são ativadas por padrão e podem ser ativadas com:

A observação de mensagens depende da interceptação de mensagens e, portanto, não suporta canais que consomem mensagens com um tipo de mensagem personalizado, como IncomingKafkaRecord, KafkaRecord, IncomingKafkaRecordBatch ou KafkaRecordBatch.

A interceptação e observação de mensagens ainda funcionam com canais que consomem o tipo genérico Message ou payloads personalizados habilitados pelos conversores.

smallrye.messaging.observation.enabled=true

17. Fluxos Kafka

Isso é descrito em um guia dedicado: Usando Fluxos do Apache Kafka.

18. Usando Snappy para compressão de mensagens

Nos canais de saída, você pode ativar a compressão Snappy definindo o atributo compression.type para snappy:

mp.messaging.outgoing.fruit-out.compression.type=snappy

No modo JVM, ele funcionará imediatamente. No entanto, para compilar sua aplicação em um executável nativo, você precisa adicionar quarkus.kafka.snappy.enabled=true ao seu application.properties.

No modo nativo, o Snappy está desativado por padrão, uma vez que a utilização do Snappy requer a incorporação de uma biblioteca nativa e a sua descompactação quando a aplicação é iniciada.

19. Autenticação com OAuth

Se o seu broker Kafka usar o OAuth como mecanismo de autenticação, você precisará configurar o consumidor Kafka para habilitar esse processo de autenticação. Primeiro, adicione a seguinte dependência à sua aplicação:

pom.xml
<dependency>
    <groupId>io.strimzi</groupId>
    <artifactId>kafka-oauth-client</artifactId>
</dependency>
<!-- if compiling to native you'd need also the following dependency -->
<dependency>
    <groupId>io.strimzi</groupId>
    <artifactId>kafka-oauth-common</artifactId>
</dependency>
build.gradle
implementation("io.strimzi:kafka-oauth-client")
// if compiling to native you'd need also the following dependency
implementation("io.strimzi:kafka-oauth-common")

Essa dependência fornece o manipulador de retorno de chamada necessário para lidar com o fluxo de trabalho do OAuth. Em seguida, em application.properties, adicione:

mp.messaging.connector.smallrye-kafka.security.protocol=SASL_PLAINTEXT
mp.messaging.connector.smallrye-kafka.sasl.mechanism=OAUTHBEARER
mp.messaging.connector.smallrye-kafka.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
  oauth.client.id="team-a-client" \
  oauth.client.secret="team-a-client-secret" \
  oauth.token.endpoint.uri="http://keycloak:8080/auth/realms/kafka-authz/protocol/openid-connect/token" ;
mp.messaging.connector.smallrye-kafka.sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler

quarkus.ssl.native=true

Atualize os valores oauth.client.id, oauth.client.secret e oauth.token.endpoint.uri.

A autenticação OAuth funciona nos modos JVM e nativo. Como o SSL não é habilitado por padrão no modo nativo, quarkus.ssl.native=true deve ser adicionado para dar suporte ao JaasClientOauthLoginCallbackHandler, que usa SSL. (Consulte o guia Usando SSL com Executáveis Nativos para obter mais detalhes.)

20. TLS Configuration

Kafka client extension integrates with the Quarkus TLS registry to configure clients.

To configure the TLS for the default Kafka configuration, you need to provide a named TLS configuration in the application.properties:

quarkus.tls.your-tls-config.trust-store.pem.certs=target/certs/kafka.crt,target/certs/kafka-ca.crt
# ...
kafka.tls-configuration-name=your-tls-config
# enable ssl security protocol
kafka.security.protocol=ssl

This will in turn provide the Kafka client with a ssl.engine.factory.class implementation.

Make sure also to enable the SSL channel security protocol using the security.protocol property configured to SSL or SASL_SSL.

Quarkus Messaging channels can be configured individually to use a specific TLS configuration:

mp.messaging.incoming.your-channel.tls-configuration-name=your-tls-config
mp.messaging.incoming.your-channel.security.protocol=ssl

21. Testando uma aplicação Kafka

21.1. Testando sem um broker

Pode ser útil testar a aplicação sem ter que iniciar um broker Kafka. Para isso, você pode trocar os canais gerenciados pelo conector Kafka para a em memória.

Esta abordagem só funciona para testes JVM. Não pode ser utilizada para testes nativos (porque estes não suportam a injeção).

Digamos que queremos testar a seguinte aplicação de processador:

@ApplicationScoped
public class BeverageProcessor {

    @Incoming("orders")
    @Outgoing("beverages")
    Beverage process(Order order) {
        System.out.println("Order received " + order.getProduct());
        Beverage beverage = new Beverage();
        beverage.setBeverage(order.getProduct());
        beverage.setCustomer(order.getCustomer());
        beverage.setOrderId(order.getOrderId());
        beverage.setPreparationState("RECEIVED");
        return beverage;
    }

}

Em primeiro lugar, adicione a seguinte dependência de teste à sua aplicação:

pom.xml
<dependency>
    <groupId>io.smallrye.reactive</groupId>
    <artifactId>smallrye-reactive-messaging-in-memory</artifactId>
    <scope>test</scope>
</dependency>
build.gradle
testImplementation("io.smallrye.reactive:smallrye-reactive-messaging-in-memory")

Em seguida, crie um recurso de teste Quarkus da seguinte forma:

public class KafkaTestResourceLifecycleManager implements QuarkusTestResourceLifecycleManager {

    @Override
    public Map<String, String> start() {
        Map<String, String> env = new HashMap<>();
        Map<String, String> props1 = InMemoryConnector.switchIncomingChannelsToInMemory("orders");     (1)
        Map<String, String> props2 = InMemoryConnector.switchOutgoingChannelsToInMemory("beverages");  (2)
        env.putAll(props1);
        env.putAll(props2);
        return env;  (3)
    }

    @Override
    public void stop() {
        InMemoryConnector.clear();  (4)
    }
}
1 Mude o canal de entrada orders (esperando mensagens do Kafka) para em memória.
2 Mude o canal de saída beverages (escrever mensagens para o Kafka) para em memória.
3 Constrói e devolve um Map que contém todas as propriedades necessárias para configurar a aplicação para utilizar canais em memória.
4 Quando o teste parar, limpe o InMemoryConnector (elimine todas as mensagens recebidas e enviadas)

Crie um teste Quarkus utilizando o recurso de teste criado acima:

import static org.awaitility.Awaitility.await;

@QuarkusTest
@QuarkusTestResource(KafkaTestResourceLifecycleManager.class)
class BaristaTest {

    @Inject
    @Connector("smallrye-in-memory")
    InMemoryConnector connector; (1)

    @Test
    void testProcessOrder() {
        InMemorySource<Order> ordersIn = connector.source("orders");     (2)
        InMemorySink<Beverage> beveragesOut = connector.sink("beverages");  (3)

        Order order = new Order();
        order.setProduct("coffee");
        order.setName("Coffee lover");
        order.setOrderId("1234");

        ordersIn.send(order);  (4)

        await().<List<? extends Message<Beverage>>>until(beveragesOut::received, t -> t.size() == 1); (5)

        Beverage queuedBeverage = beveragesOut.received().get(0).getPayload();
        Assertions.assertEquals(Beverage.State.READY, queuedBeverage.getPreparationState());
        Assertions.assertEquals("coffee", queuedBeverage.getBeverage());
        Assertions.assertEquals("Coffee lover", queuedBeverage.getCustomer());
        Assertions.assertEquals("1234", queuedBeverage.getOrderId());
    }

}
1 Injete o conector em memória na sua classe de teste.
2 Recupere o canal de entrada (orders) - o canal deve ter sido trocado para em memória no recurso de teste.
3 Recuperar o canal de saída (beverages) - o canal deve ter sido trocado para em memória no recurso de teste.
4 Use o método send para enviar uma mensagem ao canal orders. A aplicação processará essa mensagem e enviará uma mensagem ao canal beverages.
5 Utilize o método received no canal beverages para verificar as mensagens produzidas pela aplicação.

Se o seu consumidor Kafka for baseado em lotes, você precisará enviar um lote de mensagens para o canal, criando-as manualmente.

Por exemplo:

@ApplicationScoped
public class BeverageProcessor {

    @Incoming("orders")
    CompletionStage<Void> process(KafkaRecordBatch<String, Order> orders) {
        System.out.println("Order received " + orders.getPayload().size());
        return orders.ack();
    }
}
import static org.awaitility.Awaitility.await;

@QuarkusTest
@QuarkusTestResource(KafkaTestResourceLifecycleManager.class)
class BaristaTest {

    @Inject
    @Connector("smallrye-in-memory")

    InMemoryConnector connector;

    @Test
    void testProcessOrder() {
        InMemorySource<IncomingKafkaRecordBatch<String, Order>> ordersIn = connector.source("orders");
        var committed = new AtomicBoolean(false);  (1)
        var commitHandler = new KafkaCommitHandler() {
            @Override
            public <K, V> Uni<Void> handle(IncomingKafkaRecord<K, V> record) {
                committed.set(true);  (2)
                return null;
            }
        };
        var failureHandler = new KafkaFailureHandler() {
            @Override
            public <K, V> Uni<Void> handle(IncomingKafkaRecord<K, V> record, Throwable reason, Metadata metadata) {
                return null;
            }
        };

        Order order = new Order();
        order.setProduct("coffee");
        order.setName("Coffee lover");
        order.setOrderId("1234");
        var record = new ConsumerRecord<>("topic", 0, 0, "key", order);
        var records = new ConsumerRecords<>(Map.of(new TopicPartition("topic", 1), List.of(record)));
        var batch = new IncomingKafkaRecordBatch<>(
            records, "kafka", 0, commitHandler, failureHandler, false, false);  (3)

        ordersIn.send(batch);

        await().until(committed::get);  (4)
    }
}
1 Crie um AtomicBoolean para rastrear se o lote foi confirmado.
2 Atualize committed quando o lote for confirmado.
3 Crie um IncomingKafkaRecordBatch com um único registro.
4 Aguarde até que o lote seja confirmado.

Com os canais em memória, pudemos testar o código da aplicação processando mensagens sem iniciar um broker Kafka. Note que os diferentes canais em memória são independentes, e a troca do conector do canal para em memória não simula a entrega de mensagens entre canais configurados para o mesmo tópico do Kafka.

21.1.1. Propagação de contexto com InMemoryConnector

Por padrão, canais em memória enviam mensagens na thread do chamador, que seria a thread principal em testes de unidade.

A dependência quarkus-test-vertx provê a anotação @io.quarkus.test.vertx.RunOnVertxContext, que quando usada em um método de teste, executa o teste em um contexto do Vert.x.

No entando, a maioria dos outros conectores lidam com a propagação de contexto enviando mensagens em contextos Vert.x duplicados separados.

Se os seus testes são dependentes da propagação de contexto, você pode configurar os canais de conector em memória com o atributo run-on-vertx-context para enviar eventos, incluindo mensagens e reconhecimentos, em um contexto Vert.x. Alternativamente você pode trocar esse comportamento usando o método InMemorySource#runOnVertxContext.

21.2. Testando usando um broker Kafka

Se você estiver usando Dev Services para o Kafka, um broker Kafka será iniciado e estará disponível durante os testes, a menos que seja desativado no perfil %test. Embora seja possível conectar-se a este broker usando a API do Kafka Clients, A Biblioteca Companheira do Kafka propõe uma forma mais fácil de interagir com um broker Kafka e criar ações de consumidor, produtor e administrador dentro dos testes.

Para utilizar a API KafkaCompanion nos testes, comece adicionando a seguinte dependência:

<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-test-kafka-companion</artifactId>
    <scope>test</scope>
</dependency>

que fornece io.quarkus.test.kafka.KafkaCompanionResource - uma implementação de io.quarkus.test.common.QuarkusTestResourceLifecycleManager.

Em seguida, utilize @QuarkusTestResource para configurar o Kafka Companion em testes, por exemplo:

import static org.junit.jupiter.api.Assertions.assertEquals;

import java.util.UUID;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.jupiter.api.Test;

import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.junit.QuarkusTest;
import io.quarkus.test.kafka.InjectKafkaCompanion;
import io.quarkus.test.kafka.KafkaCompanionResource;
import io.smallrye.reactive.messaging.kafka.companion.ConsumerTask;
import io.smallrye.reactive.messaging.kafka.companion.KafkaCompanion;

@QuarkusTest
@QuarkusTestResource(KafkaCompanionResource.class)
public class OrderProcessorTest {

    @InjectKafkaCompanion (1)
    KafkaCompanion companion;

    @Test
    void testProcessor() {
        companion.produceStrings().usingGenerator(i -> new ProducerRecord<>("orders", UUID.randomUUID().toString())); (2)

        // Expect that the tested application processes orders from 'orders' topic and write to 'orders-processed' topic

        ConsumerTask<String, String> orders = companion.consumeStrings().fromTopics("orders-processed", 10); (3)
        orders.awaitCompletion(); (4)
        assertEquals(10, orders.count());
    }
}
1 @InjectKafkaCompanion injeta a instância KafkaCompanion, configurada para acessar o broker Kafka criado para os testes.
2 Utilize KafkaCompanion para criar uma tarefa de produção que escreva 10 registros no tópico 'orders'.
3 Crie uma tarefa de consumidor que subscreva o tópico 'orders-processed' e consuma 10 registros.
4 Aguarde a conclusão da tarefa do consumidor.

You need to configure

mp.messaging.incoming.orders.connector=smallrye-kafka
mp.messaging.incoming.orders.auto.offset.reset=earliest

otherwise you will get an java.lang.AssertionError: No completion (or failure) event received in the last 10000 ms in <4>

Se o Dev Service Kafka estiver disponível durante os testes, KafkaCompanionResource utiliza o broker Kafka criado, caso contrário cria um broker Kafka utilizando Strimzi Test Container.

A configuração do broker Kafka criado pode ser personalizada usando @ResourceArg, por exemplo:

@QuarkusTestResource(value = KafkaCompanionResource.class, initArgs = {
        @ResourceArg(name = "strimzi.kafka.image", value = "quay.io/strimzi-test-container/test-container:0.106.0-kafka-3.7.0"), // Image name
        @ResourceArg(name = "kafka.port", value = "9092"), // Fixed port for kafka, by default it will be exposed on a random port
        @ResourceArg(name = "kraft", value = "true"), // Enable Kraft mode
        @ResourceArg(name = "num.partitions", value = "3"), // Other custom broker configurations
})
public class OrderProcessorTest {
    // ...
}

21.2.1. Recurso de teste personalizado

Como alternativa, você pode iniciar um broker Kafka em um recurso de teste. O trecho a seguir mostra um recurso de teste iniciando um broker do Kafka usando Testcontainers :

public class KafkaResource implements QuarkusTestResourceLifecycleManager {

    private final KafkaContainer kafka = new KafkaContainer();

    @Override
    public Map<String, String> start() {
        kafka.start();
        return Collections.singletonMap("kafka.bootstrap.servers", kafka.getBootstrapServers());  (1)
    }

    @Override
    public void stop() {
        kafka.close();
    }
}
1 Configure a localização do bootstrap do Kafka, para que a aplicação se ligue a este broker.

22. Dev Services para o Kafka

If any Kafka-related extension is present (e.g. quarkus-messaging-kafka), Dev Services for Kafka automatically starts a Kafka broker in dev mode and when running tests. So, you don’t have to start a broker manually. The application is configured automatically.

Because starting a Kafka broker can be long, Dev Services for Kafka uses Redpanda, a Kafka compatible broker which starts in ~1 second.

22.1. Ativar/desativar Dev Services para o Kafka

Os Dev Services para o Kafka é ativado automaticamente, a menos que:

  • quarkus.kafka.devservices.enabled esteja definido como false

  • o kafka.bootstrap.servers esteja configurado

  • todos os canais Kafka de mensagens reativas tenham o atributo bootstrap.servers definido

Dev Services for Kafka relies on Docker to start the broker. If your environment does not support Docker, you will need to start the broker manually, or connect to an already running broker. You can configure the broker address using kafka.bootstrap.servers.

22.2. broker compartilhado

Most of the time you need to share the broker between applications. Dev Services for Kafka implements a service discovery mechanism for your multiple Quarkus applications running in dev mode to share a single broker.

O Dev Services para o Kafka inicia o broker com a etiqueta quarkus-dev-service-kafka que é utilizada para identificar o container.

If you need multiple (shared) brokers, you can configure the quarkus.kafka.devservices.service-name attribute and indicate the broker name. It looks for a container with the same value, or starts a new one if none can be found. The default service name is kafka.

Sharing is enabled by default in dev mode, but disabled in test mode. You can disable the sharing with quarkus.kafka.devservices.shared=false.

22.3. Definindo a porta

By default, Dev Services for Kafka picks a random port and configures the application. You can set the port by configuring the quarkus.kafka.devservices.port property.

Note que o endereço anunciado do Kafka é automaticamente configurado com a porta escolhida.

22.4. Configurando a imagem

Dev Services for Kafka supports Redpanda, kafka-native and Strimzi (in Kraft mode) images.

Redpanda is a Kafka compatible event streaming platform. Because it provides a fast startup times, Dev Services defaults to Redpanda images from redpandadata/redpanda. You can select any version from https://hub.docker.com/r/redpandadata/redpanda.

kafka-native provides images of standard Apache Kafka distribution compiled to native binary using Quarkus and GraalVM. While still being experimental, it provides very fast startup times with small footprint.

O tipo da imagem pode ser configurado utilizando

quarkus.kafka.devservices.provider=kafka-native

Strimzi provides container images and Operators for running Apache Kafka on Kubernetes. While Strimzi is optimized for Kubernetes, the images work perfectly in classic container environments. Strimzi container images run "genuine" Kafka broker on JVM, which is slower to start.

quarkus.kafka.devservices.provider=strimzi

Para o Strimzi, você pode selecionar qualquer imagem com uma versão Kafka que tenha suporte para o Kraft (2.8.1 e superior) em https://quay.io/repository/strimzi-test-container/test-container?tab=tags

quarkus.kafka.devservices.image-name=quay.io/strimzi-test-container/test-container:0.106.0-kafka-3.7.0

22.5. Configurando os tópicos no Kafka

You can configure the Dev Services for Kafka to create topics once the broker is started. Topics are created with given number of partitions and 1 replica.

The following example creates a topic named test with 3 partitions, and a second topic named messages with 2 partitions.

quarkus.kafka.devservices.topic-partitions.test=3
quarkus.kafka.devservices.topic-partitions.messages=2

If a topic already exists with the given name, the creation is skipped, without trying to re-partition the existing topic to a different number of partitions.

Você pode configurar o tempo limite para as chamadas do client admin do Kafka utilizadas na criação de tópicos utilizando quarkus.kafka.devservices.topic-partitions-timeout. A predefinição é de 2 segundos.

22.6. Suporte aos producers transacionais e idempotentes

By default, the Redpanda broker is configured to enable transactions and idempotence features. You can disable those using:

quarkus.kafka.devservices.redpanda.transaction-enabled=false
As transações Redpanda não suportam um processamento exatamente único.

23. Kafka Dev UI

If any Kafka-related extension is present (e.g. quarkus-messaging-kafka), the Quarkus Dev UI is extended with a Kafka broker management UI. It is connected automatically to the Kafka broker configured for the application.

Kafka Dev UI link

Com o Kafka Dev UI, pode gerenciar diretamente o seu cluster Kafka e executar tarefas, tais como:

  • Listar e criar tópicos

  • Visualização de registros

  • Publicação de novos registros

  • Inspeção da lista de grupos de consumidores e do respetivo desfasamento de consumo

Kafka Dev UI records
A Kafka Dev UI faz parte da interface de desenvolvimento do Quarkus e só está disponível no modo de desenvolvimento.

24. Vinculações de Serviços do Kubernetes

A extensão Quarkus Kafka oferece suporte à Especificação de Vinculação de Serviço para Kubernetes. Você pode ativar isso adicionando a extensão quarkus-kubernetes-service-binding à sua aplicação.

Quando executada em clusters Kubernetes configurados adequadamente, a extensão Kafka extrairá sua configuração de conexão do broker Kafka da associação de serviços disponível dentro do cluster, sem a necessidade de configuração do usuário.

25. Modelo de execução

A Mensageria Reativa invoca os métodos do usuário em uma thread de I/O. Portanto, por padrão, os métodos não devem bloquear. Conforme descrito em Bloqueando o processamento, você precisa adicionar a anotação @Blocking no método se ele bloquear a thread do chamador.

Consulte a documentação da Arquitetura Reativa do Quarkus para obter mais informações sobre este tópico.

26. Decoradores de Canais

A Mensageria Reativa do SmallRye suporta a decoração de canais de entrada e saída para implementar preocupações transversais, como monitoramento, rastreamento ou interceptação de mensagens. Para mais informações sobre a implementação de decoradores e interceptadores de mensagens, veja a documentação da Mensageria Reativa do SmallRye.

27. Referência de Configuração

Mais detalhes sobre a configuração da Mensageria Reativa do SmallRye podem ser encontrados na documentação da Mensageria Reativa do SmallRye - Conector Kafka.

Cada canal pode ser desativado através da configuração utilizando:

mp.messaging.[incoming|outgoing].[channel].enabled=false

Os atributos mais importantes são enumerados nos quadros seguintes:

27.1. Configuração do canal de entrada (sondagem a partir do Kafka)

Os seguintes atributos são configurados utilizando:

mp.messaging.incoming.your-channel-name.attribute=value

Algumas propriedades têm apelidos que podem ser configurados globalmente:

kafka.bootstrap.servers=...

Você também pode passar qualquer propriedade suportada pelo consumidor Kafka subjacente.

Por exemplo, para configurar a propriedade max.poll.records, utilize:

mp.messaging.incoming.[channel].max.poll.records=1000

Algumas propriedades do cliente consumidor são configuradas com valores predefinidos sensíveis:

Se não estiver definido, reconnect.backoff.max.ms é definido para 10000 para evitar uma carga elevada ao desconectar.

Se não for definido, key.deserializer é definido como org.apache.kafka.common.serialization.StringDeserializer.

O consumidor client.id é configurado de acordo com o número de clientes a criar utilizando a propriedade mp.messaging.incoming.[channel].partitions.

  • Se for fornecido um client.id, este é usado como está ou sufixado com o índice do cliente se a propriedade partitions estiver definida.

  • Se não for fornecido um client.id, este é gerado como [client-id-prefix][channel-name][-index].

Table 1. Atributos de entrada do conector 'smallrye-kafka'
Atributo (alias) Descrição Obrigatório Padrão

bootstrap.servers

(kafka.bootstrap.servers)

Uma lista separada por vírgulas de host:port a ser usada para estabelecer a conexão inicial com o cluster Kafka.

Tipo: string

falso

localhost:9092

topic

O tema Kafka consumido/populado. Se nem essa propriedade nem as propriedades 'topics' estiverem definidas, o nome do canal será usado

Tipo: string

falso

health-enabled

Se o relatório de integridade está habilitado (padrão) ou desabilitado

Tipo: boolean

falso

true

health-readiness-enabled

Se o relatório de integridade de prontidão está habilitado (padrão) ou desabilitado

Tipo: boolean

falso

true

health-readiness-topic-verification

deprecated - Se a verificação de prontidão deve verificar se os tópicos existem no broker. O padrão é false. Habilitá-lo requer uma conexão de administrador. Deprecated: use 'health-topic-verification-enabled' em vez disso.

Tipo: boolean

falso

health-readiness-timeout

deprecated - Durante a verificação de integridade de prontidão, o conector se conecta ao broker e recupera a lista de tópicos. Esse atributo especifica a duração máxima (em ms) para a recuperação. Se excedido, o canal é considerado não pronto. Preterido: use 'health-topic-verification-timeout' em vez disso.

Tipo: long

falso

health-topic-verification-enabled

Se a verificação de inicialização e prontidão deve verificar se os tópicos existem no broker. O padrão é false. Habilitá-lo requer uma conexão de cliente administrador.

Tipo: boolean

falso

false

health-topic-verification-timeout

Durante a verificação de integridade de inicialização e preparação, o conector se conecta ao agente e recupera a lista de tópicos. Esse atributo especifica a duração máxima (em ms) para a recuperação. Se excedido, o canal é considerado não pronto.

Tipo: long

falso

2000

tracing-enabled

Se o rastreamento está habilitado (padrão) ou desabilitado

Tipo: boolean

falso

true

client-id-prefix

Prefix for Kafka client client.id attribute. If defined configured or generated client.id will be prefixed with the given value, otherwise kafka-consumer- is the prefix.

Type: string

falso

checkpoint.state-store

While using the checkpoint commit-strategy, the name set in @Identifier of a bean that implements io.smallrye.reactive.messaging.kafka.StateStore.Factory to specify the state store implementation.

Type: string

falso

checkpoint.state-type

While using the checkpoint commit-strategy, the fully qualified type name of the state object to persist in the state store. When provided, it can be used by the state store implementation to help persisting the processing state object.

Type: string

falso

checkpoint.unsynced-state-max-age.ms

While using the checkpoint commit-strategy, specify the max age in milliseconds that the processing state must be persisted before the connector is marked as unhealthy. Setting this attribute to 0 disables this monitoring.

Type: int

falso

10000

cloud-events

Habilita (padrão) ou desabilita o suporte a Cloud Event. Se habilitado em um canal incoming, o conector analisará os registros de entrada e tentará criar metadados do Cloud Event. Se habilitado em um outgoing, o conector enviará as mensagens de saída como Cloud Event se a mensagem incluir Metadados de Evento de Nuvem.

Tipo: boolean

falso

true

kafka-configuration

Identificador de um bean CDI que fornece a configuração padrão do consumidor/produtor Kafka para esse canal. A configuração do canal ainda pode substituir qualquer atributo. O bean deve ter um tipo de Map<String, Object> e deve usar o qualificador @io.smallrye.common.annotation.Identifier para definir o identificador.

Tipo: string

falso

topics

A comma-separating list of topics to be consumed. Cannot be used with the topic or pattern properties

Type: string

falso

pattern

Indicate that the topic property is a regular expression. Must be used with the topic property. Cannot be used with the topics property

Type: boolean

falso

false

key.deserializer

The deserializer classname used to deserialize the record’s key

Type: string

falso

org.apache.kafka.common.serialization.StringDeserializer

lazy-client

Se o cliente Kafka é criado preguiçosamente ou ansiosamente.

Tipo: boolean

falso

false

value.deserializer

The deserializer classname used to deserialize the record’s value

Type: string

true

fetch.min.bytes

The minimum amount of data the server should return for a fetch request. The default setting of 1 byte means that fetch requests are answered as soon as a single byte of data is available or the fetch request times out waiting for data to arrive.

Type: int

falso

1

group.id

A unique string that identifies the consumer group the application belongs to.

If not set, defaults to the application name as set by the quarkus.application.name configuration property.

If that is not set either, a unique, generated id is used.

It is recommended to always define a group.id, the automatic generation is only a convenient feature for development. You can explicitly ask for automatically generated unique id by setting this property to ${quarkus.uuid}.

Type: string

falso

enable.auto.commit

If enabled, consumer’s offset will be periodically committed in the background by the underlying Kafka client, ignoring the actual processing outcome of the records. It is recommended to NOT enable this setting and let Reactive Messaging handles the commit.

Type: boolean

falso

false

retry

Whether the connection to the broker is re-attempted in case of failure

Type: boolean

falso

true

retry-attempts

The maximum number of reconnection before failing. -1 means infinite retry

Type: int

falso

-1

retry-max-wait

The max delay (in seconds) between 2 reconnects

Type: int

falso

30

broadcast

Whether the Kafka records should be dispatched to multiple consumer

Type: boolean

falso

false

auto.offset.reset

What to do when there is no initial offset in Kafka.Accepted values are earliest, latest and none

Type: string

falso

latest

failure-strategy

Specify the failure strategy to apply when a message produced from a record is acknowledged negatively (nack). Values can be fail (default), ignore, or dead-letter-queue

Type: string

falso

fail

commit-strategy

Specify the commit strategy to apply when a message produced from a record is acknowledged. Values can be latest, ignore or throttled. If enable.auto.commit is true then the default is ignore otherwise it is throttled

Type: string

falso

throttled.unprocessed-record-max-age.ms

While using the throttled commit-strategy, specify the max age in milliseconds that an unprocessed message can be before the connector is marked as unhealthy. Setting this attribute to 0 disables this monitoring.

Type: int

falso

60000

dead-letter-queue.topic

When the failure-strategy is set to dead-letter-queue indicates on which topic the record is sent. Defaults is dead-letter-topic-$channel

Type: string

falso

dead-letter-queue.key.serializer

When the failure-strategy is set to dead-letter-queue indicates the key serializer to use. If not set the serializer associated to the key deserializer is used

Type: string

falso

dead-letter-queue.value.serializer

When the failure-strategy is set to dead-letter-queue indicates the value serializer to use. If not set the serializer associated to the value deserializer is used

Type: string

falso

partitions

The number of partitions to be consumed concurrently. The connector creates the specified amount of Kafka consumers. It should match the number of partition of the targeted topic

Type: int

falso

1

requests

When partitions is greater than 1, this attribute allows configuring how many records are requested by each consumer every time.

Type: int

falso

128

consumer-rebalance-listener.name

The name set in @Identifier of a bean that implements io.smallrye.reactive.messaging.kafka.KafkaConsumerRebalanceListener. If set, this rebalance listener is applied to the consumer.

Type: string

falso

key-deserialization-failure-handler

The name set in @Identifier of a bean that implements io.smallrye.reactive.messaging.kafka.DeserializationFailureHandler. If set, deserialization failure happening when deserializing keys are delegated to this handler which may retry or provide a fallback value.

Type: string

falso

value-deserialization-failure-handler

The name set in @Identifier of a bean that implements io.smallrye.reactive.messaging.kafka.DeserializationFailureHandler. If set, deserialization failure happening when deserializing values are delegated to this handler which may retry or provide a fallback value.

Type: string

falso

fail-on-deserialization-failure

When no deserialization failure handler is set and a deserialization failure happens, report the failure and mark the application as unhealthy. If set to false and a deserialization failure happens, a null value is forwarded.

Type: boolean

falso

true

graceful-shutdown

Whether a graceful shutdown should be attempted when the application terminates.

Type: boolean

falso

true

poll-timeout

The polling timeout in milliseconds. When polling records, the poll will wait at most that duration before returning records. Default is 1000ms

Type: int

falso

1000

pause-if-no-requests

Whether the polling must be paused when the application does not request items and resume when it does. This allows implementing back-pressure based on the application capacity. Note that polling is not stopped, but will not retrieve any records when paused.

Type: boolean

falso

true

batch

Whether the Kafka records are consumed in batch. The channel injection point must consume a compatible type, such as List<Payload> or KafkaRecordBatch<Payload>.

Type: boolean

falso

false

max-queue-size-factor

Multiplier factor to determine maximum number of records queued for processing, using max.poll.records * max-queue-size-factor. Defaults to 2. In batch mode max.poll.records is considered 1.

Type: int

falso

2

27.2. Configuração do canal de saída (escrevendo no Kafka)

Os seguintes atributos são configurados utilizando:

mp.messaging.outgoing.your-channel-name.attribute=value

Algumas propriedades têm apelidos que podem ser configurados globalmente:

kafka.bootstrap.servers=...

Você também pode passar qualquer propriedade suportada pelo produtor Kafka subjacente.

Por exemplo, para configurar a propriedade max.block.ms, utilize:

mp.messaging.incoming.[channel].max.block.ms=10000

Algumas propriedades do cliente produtor são configuradas para valores predefinidos sensíveis:

Se não estiver definido, reconnect.backoff.max.ms é definido para 10000 para evitar uma carga elevada ao desconectar.

Se não for definido, key.serializer é definido como org.apache.kafka.common.serialization.StringSerializer.

Se não for definido, o produtor client.id é gerado como [client-id-prefix][channel-name].

Table 2. Atributos de saída do conector 'smallrye-kafka'
Atributo (alias) Descrição Obrigatório Padrão

acks

O número de reconhecimentos que o produtor exige que o líder tenha recebido antes de considerar um pedido completo. Isso controla a durabilidade dos registros que são enviados. Os valores aceitos são: 0, 1, todos

Tipo: string

falso

1

bootstrap.servers

(kafka.bootstrap.servers)

Uma lista separada por vírgulas de host:port a ser usada para estabelecer a conexão inicial com o cluster Kafka.

Tipo: string

falso

localhost:9092

client-id-prefix

Prefixo para o atributo 'client.id' do cliente Kafka. Se definido configurado ou gerado 'client.id' será prefixado com o valor dado, caso contrário 'kafka-producer-' é o prefixo.

Tipo: string

falso

buffer.memory

O total de bytes de memória que o produtor pode usar para armazenar em buffer registros aguardando para serem enviados ao servidor.

Tipo: long

falso

33554432

close-timeout

A quantidade de milissegundos à espera de um desligamento gracioso do produtor de Kafka

Tipo: int

falso

10000

cloud-events

Habilita (padrão) ou desabilita o suporte a Cloud Event. Se habilitado em um canal incoming, o conector analisará os registros de entrada e tentará criar metadados do Cloud Event. Se habilitado em um outgoing, o conector enviará as mensagens de saída como Cloud Event se a mensagem incluir Metadados de Evento de Nuvem.

Tipo: boolean

falso

true

cloud-events-data-content-type

(cloud-events-default-data-content-type)

Configura o atributo 'datacontenttype' padrão do evento de nuvem de saída. Requer que 'cloud-events' seja definido como 'true'. Esse valor será usado se a mensagem não configurar o próprio atributo 'datacontenttype'

Tipo: string

falso

cloud-events-data-schema

(cloud-events-default-data-schema)

Configura o atributo 'dataschema' padrão do evento de nuvem de saída. Requer que 'cloud-events' seja definido como 'true'. Esse valor será usado se a mensagem não configurar o próprio atributo 'dataschema'

Tipo: string

falso

cloud-events-insert-timestamp

(cloud-events-default-timestamp)

Se o conector deve inserir automaticamente o atributo 'time' no evento de nuvem de saída. Requer que 'cloud-events' seja definido como 'true'. Esse valor será usado se a mensagem não configurar o próprio atributo 'time'

Tipo: boolean

falso

true

cloud-events-mode

O modo Cloud Event ('estruturado' ou 'binário' (padrão)). Indica como são gravados os eventos de nuvem no registro de saída

Tipo: string

falso

binary

cloud-events-source

(cloud-events-default-source)

Configure o atributo 'source' padrão do evento de nuvem de saída. Requer que 'cloud-events' seja definido como 'true'. Esse valor será usado se a mensagem não configurar o próprio atributo 'source'

Tipo: string

falso

cloud-events-subject

(cloud-events-default-subject)

Configure o atributo 'subject' padrão do evento de nuvem de saída. Requer que 'cloud-events' seja definido como 'true'. Esse valor será usado se a mensagem não configurar o próprio atributo 'subject'

Tipo: string

falso

cloud-events-type

(cloud-events-default-type)

Configure o atributo 'type' padrão do evento de nuvem de saída. Requer que 'cloud-events' seja definido como 'true'. Esse valor será usado se a mensagem não configurar o próprio atributo 'type'

Tipo: string

falso

health-enabled

Se o relatório de integridade está habilitado (padrão) ou desabilitado

Tipo: boolean

falso

true

health-readiness-enabled

Se o relatório de integridade de prontidão está habilitado (padrão) ou desabilitado

Tipo: boolean

falso

true

health-readiness-timeout

deprecated - Durante a verificação de integridade de prontidão, o conector se conecta ao broker e recupera a lista de tópicos. Esse atributo especifica a duração máxima (em ms) para a recuperação. Se excedido, o canal é considerado não pronto. Preterido: use 'health-topic-verification-timeout' em vez disso.

Tipo: long

falso

health-readiness-topic-verification

deprecated - Se a verificação de prontidão deve verificar se os tópicos existem no broker. O padrão é false. Habilitá-lo requer uma conexão de administrador. Deprecated: use 'health-topic-verification-enabled' em vez disso.

Tipo: boolean

falso

health-topic-verification-enabled

Se a verificação de inicialização e prontidão deve verificar se os tópicos existem no broker. O padrão é false. Habilitá-lo requer uma conexão de cliente administrador.

Tipo: boolean

falso

false

health-topic-verification-timeout

Durante a verificação de integridade de inicialização e preparação, o conector se conecta ao agente e recupera a lista de tópicos. Esse atributo especifica a duração máxima (em ms) para a recuperação. Se excedido, o canal é considerado não pronto.

Tipo: long

falso

2000

kafka-configuration

Identificador de um bean CDI que fornece a configuração padrão do consumidor/produtor Kafka para esse canal. A configuração do canal ainda pode substituir qualquer atributo. O bean deve ter um tipo de Map<String, Object> e deve usar o qualificador @io.smallrye.common.annotation.Identifier para definir o identificador.

Tipo: string

falso

key

Uma chave a ser usada ao gravar o registro

Tipo: string

falso

key-serialization-failure-handler

O nome definido em '@Identifier' de um bean que implementa 'io.smallrye.reactive.messaging.kafka.SerializationFailureHandler'. Se definido, a falha de serialização que ocorre quando as chaves de serialização são delegadas a esse manipulador, que pode fornecer um valor de fallback.

Tipo: string

falso

key.serializer

O nome da classe do serializador usado para serializar a chave do registro

Tipo: string

falso

org.apache.kafka.common.serialization.StringSerializer

lazy-client

Se o cliente Kafka é criado preguiçosamente ou ansiosamente.

Tipo: boolean

falso

false

max-inflight-messages

O número máximo de mensagens a serem gravadas em Kafka simultaneamente. Ele limita o número de mensagens aguardando para serem escritas e reconhecidas pelo corretor. Você pode definir esse atributo como '0' remover o limite

Tipo: long

falso

1024

merge

Se o conector deve permitir vários upstreams

Tipo: boolean

falso

false

partition

O id da partição de destino. -1 para permitir que o cliente determine a partição

Tipo: int

falso

-1

propagate-headers

Uma lista separada por vírgulas de cabeçalhos de registro de entrada a serem propagados para o registro de saída

Tipo: string

falso

propagate-record-key

Propagar a chave de registro de entrada para o registro de saída

Tipo: boolean

falso

false

retries

Se definido como um número positivo, o conector tentará reenviar qualquer registro que não tenha sido entregue com êxito (com um erro potencialmente transitório) até que o número de tentativas seja atingido. Se definido como 0, as novas tentativas serão desabilitadas. Se não estiver definido, o conector tentará reenviar qualquer registro que não tenha sido entregue (devido a um erro potencialmente transitório) durante um período de tempo configurado por 'delivery.timeout.ms'.

Tipo: long

falso

2147483647

topic

O tópico Kafka consumido/populado. Se nem essa propriedade nem as propriedades 'topics' estiverem definidas, o nome do canal será usado

Tipo: string

falso

tracing-enabled

Se o rastreamento está habilitado (padrão) ou desabilitado

Tipo: boolean

falso

true

value-serialization-failure-handler

O nome definido em '@Identifier' de um bean que implementa 'io.smallrye.reactive.messaging.kafka.SerializationFailureHandler'. Se definido, a falha de serialização que ocorre quando os valores de serialização são delegados a esse manipulador, que pode fornecer um valor de fallback.

Tipo: string

falso

value.serializer

O nome da classe do serializador usado para serializar a carga útil

Tipo: string

true

waitForWriteCompletion

Se o cliente espera que Kafka reconheça o registro escrito antes de reconhecer a mensagem

Tipo: boolean

falso

true

27.3. Resolução de Configuração do Kafka

O Quarkus expõe todas as propriedades da aplicação relacionadas ao Kafka, prefixadas com kafka. ou KAFKA_ dentro de um mapa de configuração com o nome default-kafka-broker. Essa configuração é usada para estabelecer a conexão com o broker Kafka.

Além desta configuração predefinida, você pode configurar o nome do produtor Map utilizando o atributo kafka-configuration:

mp.messaging.incoming.my-channel.connector=smallrye-kafka
mp.messaging.incoming.my-channel.kafka-configuration=my-configuration

Nesse caso, o conector procura o Map associado ao nome my-configuration. Se kafka-configuration não estiver definido, será feita uma pesquisa opcional de um Map exposto com o nome do canal (my-channel no exemplo anterior).

@Produces
@ApplicationScoped
@Identifier("my-configuration")
Map<String, Object> outgoing() {
    return Map.ofEntries(
            Map.entry("value.serializer", ObjectMapperSerializer.class.getName())
    );
}
Se kafka-configuration estiver definido e não for possível encontrar Map, a implantação falha.

Os valores dos atributos são resolvidos da seguinte forma:

  1. o atributo é definido diretamente na configuração do canal (mp.messaging.incoming.my-channel.attribute=value),

  2. se não estiver definido, o conector procura um Map com o nome do canal ou o kafka-configuration configurado (se definido) e o valor é obtido a partir desse Map

  3. Se o Map resolvido não contém o valor, é utilizado o Map padrão (exposto com o nome default-kafka-broker)

27.4. Configurar canais condicionalmente

Você pode configurar os canais usando um perfil específico. Assim, os canais só são configurados (e adicionados à aplicação) quando o perfil especificado é ativado.

Para conseguir isso, você precisa:

  1. Prefixar as entradas mp.messaging.[incoming|outgoing].$channel com %my-profile tal como %my-profile.mp.messaging.[incoming|outgoing].$channel.key=value

  2. Utilizar o @IfBuildProfile("my-profile") nos beans CDI que contêm anotações @Incoming(channel) e @Outgoing(channel) que só precisam ser ativadas quando o perfil é ativado.

Observe que as mensagens reativas verificam se o gráfico está completo. Portanto, ao usar essa configuração condicional, certifique-se de que a aplicação funcione com e sem o perfil ativado.

Note que esta abordagem também pode ser utilizada para alterar a configuração do canal com base num perfil.

28. Integrando com o Kafka - Padrões comuns

28.1. Escrever no Kafka a partir de um endpoint HTTP

Para enviar mensagens para o Kafka a partir de um endpoint HTTP, injete um Emitter (ou um MutinyEmitter) no seu endpoint:

package org.acme;

import java.util.concurrent.CompletionStage;

import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;

import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;

@Path("/")
public class ResourceSendingToKafka {

    @Channel("kafka") Emitter<String> emitter;          (1)

    @POST
    @Produces(MediaType.TEXT_PLAIN)
    public CompletionStage<Void> send(String payload) { (2)
        return emitter.send(payload);                   (3)
    }
}
1 Injete um Emitter<String>
2 O método HTTP recebe o conteúdo e devolve um CompletionStage concluído quando a mensagem é escrita no Kafka
3 Envie a mensagem para o Kafka, o método send devolve um CompletionStage

O endpoint envia a conteúdo passado (de uma requisição HTTP POST) para o emissor. O canal do emissor é mapeado para um tópico do Kafka no arquivo application.properties:

mp.messaging.outgoing.kafka.connector=smallrye-kafka
mp.messaging.outgoing.kafka.topic=my-topic

O endpoint retorna um CompletionStage indicando a natureza assíncrona do método. O método emitter.send retorna um CompletionStage<Void>. O future retornado é concluído quando a mensagem tiver sido gravada no Kafka. Se a gravação falhar, o CompletionStage retornado será concluído excepcionalmente.

Se o endpoint não devolver um CompletionStage, a resposta HTTP pode ser escrita antes de a mensagem ser enviada para o Kafka, e então as falhas não serão comunicadas ao usuário.

Se você precisa enviar um registro Kafka, use:

package org.acme;

import java.util.concurrent.CompletionStage;

import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;

import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;

import io.smallrye.reactive.messaging.kafka.Record;

@Path("/")
public class ResourceSendingToKafka {

    @Channel("kafka") Emitter<Record<String,String>> emitter;  (1)


    @POST
    @Produces(MediaType.TEXT_PLAIN)
    public CompletionStage<Void> send(String payload) {
        return emitter.send(Record.of("my-key", payload));    (2)
    }
}
1 Note a utilização de um Emitter<Record<K, V>>
2 Crie o registro usando Record.of(k, v)

28.2. Persistindo mensagens Kafka com o Hibernate com Panache

Para persistir objetos recebidos do Kafka numa base de dados, você pode utilizar o Hibernate com o Panache.

Se você estiver usando o Hibernate Reactive, veja Persistência de mensagens Kafka com o Hibernate Reativo.

Vamos imaginar que você receba objetos Fruit. Para fins de simplificação, nossa classe Fruit é bastante simples:

package org.acme;

import jakarta.persistence.Entity;

import io.quarkus.hibernate.orm.panache.PanacheEntity;

@Entity
public class Fruit extends PanacheEntity {

    public String name;

}

Para consumir instâncias do Fruit armazenadas num tópico do Kafka e mantê-las numa base de dados, você pode utilizar a seguinte abordagem:

package org.acme;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.transaction.Transactional;

import org.eclipse.microprofile.reactive.messaging.Incoming;

import io.smallrye.common.annotation.Blocking;

@ApplicationScoped
public class FruitConsumer {

    @Incoming("fruits")                                     (1)
    @Transactional                                          (2)
    public void persistFruits(Fruit fruit) {                (3)
        fruit.persist();                                    (4)
    }
}
1 Configurando o canal de entrada. Este canal lê a partir do Kafka.
2 Como estamos escrevendo em um banco de dados, devemos estar em uma transação. Essa anotação inicia uma nova transação e a confirma quando o método retorna. O Quarkus considera automaticamente o método como blocante. De fato, a gravação em um banco de dados usando o Hibernate clássico é blocante. Portanto, o Quarkus chama o método em um thread de trabalho que pode ser bloqueado (e não em um thread de E/S).
3 O método recebe cada Fruit. Note que seria necessário um desserializador para reconstruir as instâncias Fruit a partir dos registros Kafka.
4 Persiste o objeto fruit recebido.

Conforme mencionado em <4>, você precisa de um desserializador que possa criar um Fruit a partir do registro. Isso pode ser feito usando um desserializador Jackson:

package org.acme;

import io.quarkus.kafka.client.serialization.ObjectMapperDeserializer;

public class FruitDeserializer extends ObjectMapperDeserializer<Fruit> {
    public FruitDeserializer() {
        super(Fruit.class);
    }
}

A configuração associada seria:

mp.messaging.incoming.fruits.connector=smallrye-kafka
mp.messaging.incoming.fruits.value.deserializer=org.acme.FruitDeserializer

Consulte Serialização via Jackson para mais detalhes sobre o uso do Jackson com o Kafka. Também é possível utilizar Avro.

28.3. Persistência de mensagens Kafka com o Hibernate Reativo

Para persistir objetos recebidos do Kafka numa base de dados, você pode utilizar o Hibernate Reativo com Panache.

Vamos imaginar que você receba objetos Fruit. Para fins de simplificação, nossa classe Fruit é bastante simples:

package org.acme;

import jakarta.persistence.Entity;

import io.quarkus.hibernate.reactive.panache.PanacheEntity;  (1)

@Entity
public class Fruit extends PanacheEntity {

    public String name;

}
1 Certifique-se de que utiliza a variante reativa

Para consumir instâncias do Fruit armazenadas num tópico do Kafka e mantê-las numa base de dados, você pode utilizar a seguinte abordagem:

package org.acme;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.context.control.ActivateRequestContext;

import org.eclipse.microprofile.reactive.messaging.Incoming;

import io.quarkus.hibernate.reactive.panache.Panache;
import io.smallrye.mutiny.Uni;

@ApplicationScoped
public class FruitStore {

    @Inject
    Mutiny.Session session;                    (1)

    @Incoming("in")
    @ActivateRequestContext (2)
    public Uni<Void> consume(Fruit entity) {
        return session.withTransaction(t -> {  (3)
            return entity.persistAndFlush()    (4)
                    .replaceWithVoid();        (5)
        }).onTermination().call(() -> session.close()); (6)
    }

}
1 Injete a Session do Hibernate Reativo
2 As APIs Session e Panache do Hibernate Reativo exigem um Contexto de Requisição CDI ativo. A anotação @ActivateRequestContext cria um novo contexto de requisição e o destrói quando o Uni retorno do método é concluído. Se o Panache não for usado, o Mutiny.SessionFactory poderá ser injetado e usado de forma semelhante, sem a necessidade de ativar o contexto de requisição ou fechar a sessão manualmente.
3 Solicita uma nova transação. A transação é concluída quando a ação passada é concluída.
4 Persiste a entidade. Devolve um Uni<Fruit>.
5 Troque de volta para um Uni<Void>.
6 Feche a sessão - isto é, fechar a conexão com a base de dados. A conexão pode então ser reciclada.

Diferentemente do Hibernate clássico, você não pode usar @Transactional. Em vez disso, usamos session.withTransaction e persistimos nossa entidade. O map é usado para retornar um Uni<Void> e não um Uni<Fruit>.

Você precisa de um desserializador que possa criar um Fruit a partir do registro. Isso pode ser feito usando um desserializador Jackson:

package org.acme;

import io.quarkus.kafka.client.serialization.ObjectMapperDeserializer;

public class FruitDeserializer extends ObjectMapperDeserializer<Fruit> {
    public FruitDeserializer() {
        super(Fruit.class);
    }
}

A configuração associada seria:

mp.messaging.incoming.fruits.connector=smallrye-kafka
mp.messaging.incoming.fruits.value.deserializer=org.acme.FruitDeserializer

Consulte Serialização via Jackson para mais detalhes sobre o uso do Jackson com o Kafka. Também é possível utilizar Avro.

28.4. Escrevendo entidades gerenciadas pelo Hibernate no Kafka

Vamos imaginar o seguinte processo:

  1. VocêrRecebe uma requisição HTTP com um conteúdo,

  2. Você uma instância de entidade Hibernate a partir deste conteúdo,

  3. Você persiste essa entidade numa base de dados,

  4. Você envia a entidade para um tópico do Kafka

Se você estiver usando Hibernate Reactive, veja Escrevendo entidades gerenciadas pelo Hibernate Reativo no Kafka.

Como gravamos em um banco de dados, precisamos executar esse método em uma transação. No entanto, o envio da entidade para o Kafka ocorre de forma assíncrona. A operação retorna um relatório CompletionStage (ou um Uni se você usar um MutinyEmitter) quando a operação for concluída. Precisamos ter certeza de que a transação ainda está em execução até que o objeto seja gravado. Caso contrário, você poderá acessar o objeto fora da transação, o que não é permitido.

Para implementar este processo, é necessária a seguinte abordagem:

package org.acme;

import java.util.concurrent.CompletionStage;

import jakarta.transaction.Transactional;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;

import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;

@Path("/")
public class ResourceSendingToKafka {

    @Channel("kafka") Emitter<Fruit> emitter;

    @POST
    @Path("/fruits")
    @Transactional                                                      (1)
    public CompletionStage<Void> storeAndSendToKafka(Fruit fruit) {     (2)
        fruit.persist();
        return emitter.send(new FruitDto(fruit));                       (3)
    }
}
1 Como estamos escrevendo para a base de dados, certifique-se de que executamos dentro de uma transação
2 O método recebe a instância de fruit a ser persistida. Devolve um CompletionStage que é utilizado para a demarcação da transação. A transação é confirmada quando o retorno CompletionStage é concluído. No nosso caso, é quando a mensagem é escrita no Kafka.
3 Envolva a entidade gerenciada em um objeto de transferência de dados e envie-a para o Kafka. Isso garante que a entidade gerenciada não seja afetada pela serialização do Kafka.

28.5. Escrevendo entidades gerenciadas pelo Hibernate Reativo no Kafka

Para enviar para entidades Kafka gerenciadas pelo Hibernate Reativo, recomendamos usar:

  • Quarkus REST para servir requisições HTTP

  • Um MutinyEmitter para enviar mensagens para um canal, para que possa ser facilmente integrado com a API do Mutiny exposta pelo Hibernate Reativo ou Hibernate Reativo com Panache.

O exemplo a seguir demonstra como receber um conteúdo, armazená-lo na base de dados utilizando o Hibernate Reativo com Panache e enviar a entidade persistente para o Kafka:

package org.acme;

import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;

import org.eclipse.microprofile.reactive.messaging.Channel;

import io.quarkus.hibernate.reactive.panache.Panache;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.MutinyEmitter;

@Path("/")
public class ReactiveGreetingResource {

    @Channel("kafka") MutinyEmitter<Fruit> emitter;     (1)

    @POST
    @Path("/fruits")
    public Uni<Void> sendToKafka(Fruit fruit) {         (2)
        return Panache.withTransaction(() ->            (3)
            fruit.<Fruit>persist()
        )
            .chain(f -> emitter.send(f));               (4)
    }
}
1 Injete um MutinyEmitter que expõe uma API do Mutiny. Ele simplifica a integração com a API do Mutiny exposta pelo Hibernate Reativo com Panache.
2 O método HTTP que recebe o conteúdo devolve um Uni<Void>. A resposta HTTP é escrita quando a operação é concluída (a entidade é persistida e escrita no Kafka).
3 Temos de escrever a entidade na base de dados numa transação.
4 Uma vez concluída a operação de persistência, enviamos a entidade para o Kafka. O método send devolve um Uni<Void>.

28.6. Transmitindo tópicos do Kafka como eventos enviados pelo servidor

A transmissão de um tópico do Kafka como eventos enviados pelo servidor (SSE) é simples:

  1. Você injeta o canal que representa o tópico Kafka no seu endpoint HTTP

  2. Você devolve esse canal como Publisher ou Multi a partir do método HTTP

O código seguinte apresenta um exemplo:

@Channel("fruits")
Multi<Fruit> fruits;

@GET
@Produces(MediaType.SERVER_SENT_EVENTS)
public Multi<Fruit> stream() {
    return fruits;
}

Alguns ambientes cortam a conexão SSE quando não há atividade suficiente. A solução alternativa consiste em enviar mensagens de ping (ou objetos vazios) periodicamente.

@Channel("fruits")
Multi<Fruit> fruits;

@Inject
ObjectMapper mapper;

@GET
@Produces(MediaType.SERVER_SENT_EVENTS)
public Multi<String> stream() {
    return Multi.createBy().merging()
            .streams(
                    fruits.map(this::toJson),
                    emitAPeriodicPing()
            );
}

Multi<String> emitAPeriodicPing() {
    return Multi.createFrom().ticks().every(Duration.ofSeconds(10))
            .onItem().transform(x -> "{}");
}

private String toJson(Fruit f) {
    try {
        return mapper.writeValueAsString(f);
    } catch (JsonProcessingException e) {
        throw new RuntimeException(e);
    }
}

A solução alternativa é um pouco mais complexa, pois além de enviar os fruits provenientes do Kafka, precisamos enviar pings periodicamente. Para isso, mesclamos o fluxo proveniente do Kafka e um fluxo periódico que emite {} a cada 10 segundos.

28.7. Encadeando Transações do Kafka com transações Reativas do Hibernate

Ao encadear uma transação do Kafka com uma transação reativa do Hibernate, é possível enviar registros para uma transação do Kafka, realizar atualizações do banco de dados e confirmar a transação do Kafka somente se a transação do banco de dados for bem-sucedida.

O exemplo a seguir demonstra isso:

  • Receber um payload ao servir requisições HTTP usando Quarkus REST,

  • Limit concurrency of that HTTP endpoint using SmallRye Fault Tolerance,

  • Inicia uma transação Kafka e envia o conteúdo para o registro Kafka,

  • Armazenaoa conteúdo na base de dados utilizando Hibernate Reativo com Panache,

  • Confirma a transação Kafka apenas se a entidade for persistida com êxito.

package org.acme;

import jakarta.ws.rs.Consumes;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.core.MediaType;

import org.eclipse.microprofile.faulttolerance.Bulkhead;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.hibernate.reactive.mutiny.Mutiny;

import io.quarkus.hibernate.reactive.panache.Panache;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.kafka.transactions.KafkaTransactions;

@Path("/")
public class FruitProducer {

    @Channel("kafka") KafkaTransactions<Fruit> kafkaTx; (1)

    @POST
    @Path("/fruits")
    @Consumes(MediaType.APPLICATION_JSON)
    @Bulkhead(1) (2)
    public Uni<Void> post(Fruit fruit) { (3)
        return kafkaTx.withTransaction(emitter -> { (4)
            emitter.send(fruit); (5)
            return Panache.withTransaction(() -> { (6)
                return fruit.<Fruit>persist(); (7)
            });
        }).replaceWithVoid();
    }
}
1 Injeta um KafkaTransactions que expõe uma API do Mutiny. Isso permite a integração com a API do Mutiny exposta pelo Hibernate Reativo com Panache.
2 Limita a concorrência do endpoint HTTP a "1", impedindo o início de várias transações num determinado momento.
3 O método HTTP que recebe o conteúdo devolve um Uni<Void>. A resposta HTTP é escrita quando a operação é concluída (a entidade é persistida e a transação Kafka é confirmada).
4 Inicia uma transação Kafka.
5 Envia o contéudo para o Kafka dentro da transação Kafka.
6 Persiste a entidade na base de dados numa transação reativa do Hibernate.
7 Quando a operação de persistência for concluída e não houver erros, a transação do Kafka será confirmada. O resultado é omitido e retornado como resposta HTTP.

No exemplo anterior, a transação do banco de dados (interna) será confirmada seguida pela transação do Kafka (externa). Se você quiser confirmar a transação do Kafka primeiro e a transação do banco de dados depois, precisará aninhá-las na ordem inversa.

O próximo exemplo demonstra isso utilizando a API reativa do Hibernate (sem o Panache):

import jakarta.inject.Inject;
import jakarta.ws.rs.Consumes;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.core.MediaType;

import org.eclipse.microprofile.faulttolerance.Bulkhead;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.hibernate.reactive.mutiny.Mutiny;

import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.kafka.transactions.KafkaTransactions;
import io.vertx.mutiny.core.Context;
import io.vertx.mutiny.core.Vertx;

@Path("/")
public class FruitProducer {

    @Channel("kafka") KafkaTransactions<Fruit> kafkaTx;

    @Inject Mutiny.SessionFactory sf; (1)

    @POST
    @Path("/fruits")
    @Consumes(MediaType.APPLICATION_JSON)
    @Bulkhead(1)
    public Uni<Void> post(Fruit fruit) {
        Context context = Vertx.currentContext(); (2)
        return sf.withTransaction(session -> (3)
                kafkaTx.withTransaction(emitter -> (4)
                        session.persist(fruit).invoke(() -> emitter.send(fruit)) (5)
                ).emitOn(context::runOnContext) (6)
        );
    }
}
1 Injeta o SessionFactory do Hibernate Reativo.
2 Captura o contexto Vert.x do chamador.
3 Inicia uma transação Reativa do Hibernate.
4 Inicia uma transação Kafka.
5 Persiste o conteúdo e envia a entidade para o Kafka.
6 A transação do Kafka é encerrada no thread do emissor produtor do Kafka. Precisamos mudar para o contexto Vert.x capturado anteriormente para encerrar a transação do Hibernate Reativo no mesmo contexto em que a iniciamos.

29. Registrando

Para reduzir a quantidade de registros escritos pelo cliente Kafka, o Quarkus define o nível das seguintes categorias de registros para WARNING:

  • org.apache.kafka.clients

  • org.apache.kafka.common.utils

  • org.apache.kafka.common.metrics

É possível substituir a configuração adicionando as seguintes linhas ao arquivo application.properties:

quarkus.log.category."org.apache.kafka.clients".level=INFO
quarkus.log.category."org.apache.kafka.common.utils".level=INFO
quarkus.log.category."org.apache.kafka.common.metrics".level=INFO

30. Conectando a clusters do Kafka Gerenciados

Esta seção explica como conectar aos notórios Kafka Cloud Services.

30.1. Hubs de Eventos do Azure

O Hub de Eventos do Azure fornece um endpoint compatível com o Apache Kafka.

Os Hubs de Eventos do Azure para Kafka não estão disponíveis na camada básica. Você precisa de pelo menos a camada padrão para usar o Kafka. Consulte Preços do Hubs de Eventos do Azure para ver as outras opções.

Para se conectar ao Hub de Eventos do Azure, utilizando o protocolo Kafka com TLS, você precisa da seguinte configuração:

kafka.bootstrap.servers=my-event-hub.servicebus.windows.net:9093 (1)
kafka.security.protocol=SASL_SSL
kafka.sasl.mechanism=PLAIN
kafka.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \ (2)
    username="$ConnectionString" \ (3)
    password="<YOUR.EVENTHUBS.CONNECTION.STRING>"; (4)
1 A porta é 9093.
2 Você precisa usar o JAAS PlainLoginModule.
3 O nome de usuário é a string $ConnectionString.
4 A cadeia de conexão do Hub de Eventos fornecida pelo Azure.

Substitua <YOUR.EVENTHUBS.CONNECTION.STRING> pela cadeia de conexão do seu namespace do Hub de Eventos. Para obter instruções sobre como obter a cadeia de conexão, consulte Obter uma cadeia de conexão dos Hubs de Evento. O resultado seria algo como:

kafka.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="$ConnectionString" \
    password="Endpoint=sb://my-event-hub.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";

Esta configuração pode ser global (como acima), ou definida na configuração do canal:

mp.messaging.incoming.$channel.bootstrap.servers=my-event-hub.servicebus.windows.net:9093
mp.messaging.incoming.$channel.security.protocol=SASL_SSL
mp.messaging.incoming.$channel.sasl.mechanism=PLAIN
mp.messaging.incoming.$channel.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="$ConnectionString" \
    password="Endpoint=sb://my-event-hub.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=...";

30.2. Red Hat OpenShift Streams for Apache Kafka

O Red Hat OpenShift Streams for Apache Kafka fornece brokers Kafka gerenciados. Primeiro, siga as instruções de Getting started with the rhoas CLI for Red Hat OpenShift Streams for Apache Kafka para criar sua instância de broker Kafka. Certifique-se de copiar o ID do cliente e o segredo do cliente associados à ServiceAccount que você criou.

Em seguida, pode configurar a aplicação Quarkus para se conectar ao broker da seguinte forma:

kafka.bootstrap.servers=<connection url> (1)
kafka.security.protocol=SASL_SSL
kafka.sasl.mechanism=PLAIN
kafka.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
  username="${KAFKA_USERNAME}" \ (2)
  password="${KAFKA_PASSWORD}"; (3)
1 A cadeia de conexão, fornecida no console de administração, tal como demo-c--​bjsv-ldd-cvavkc-a.bf2.kafka.rhcloud.com:443
2 O nome de usuário do kafka (o id do cliente da conta de serviço)
3 a senha do kafka (o segredo do cliente da conta de serviço)
Em geral, estas propriedades são prefixadas com %prod para as ativar apenas quando são executadas em modo de produção.
Conforme explicado em Getting started with the rhoas CLI for Red Hat OpenShift Streams for Apache Kafka, para usar o Red Hat OpenShift Streams for Apache Kafka, você deve criar o tópico antecipadamente, criar uma Service Account e fornecer permissões para ler e escrever no seu tópico a partir dessa conta de serviço. Os dados de autenticação (ID do cliente e segredo) estão relacionados à conta de serviço, o que significa que você pode implementar permissões detalhadas e restringir o acesso ao tópico.

Ao utilizar o Kubernetes, recomenda-se que defina o ID do cliente e o segredo num segredo do Kubernetes:

apiVersion: v1
kind: Secret
metadata:
  name: kafka-credentials
stringData:
  KAFKA_USERNAME: "..."
  KAFKA_PASSWORD: "..."

Para permitir que a sua aplicação Quarkus utilize esse segredo, adicione a seguinte linha ao arquivo application.properties:

%prod.quarkus.openshift.env.secrets=kafka-credentials

30.2.1. Red Hat OpenShift Service Registry

O Red Hat OpenShift Service Registry fornece um registro de serviço totalmente gerenciado para lidar com esquemas Kafka.

Você pode seguir as instruções de Introdução ao Red Hat OpenShift Service Registry ou usar a CLI do rhoas para criar uma nova instância de registro de serviço:

rhoas service-registry create --name my-schema-registry

Não se esqueça de anotar o URL do Registro da instância criada. Para autenticação, você pode usar a mesma ServiceAccount que criou anteriormente. É necessário certificar-se de que ela tenha as permissões necessárias para acessar o registro de serviço.

Por exemplo, usando a CLI rhoas, é possível conceder o papel MANAGER à conta de serviço:

rhoas service-registry role add --role manager --service-account [SERVICE_ACCOUNT_CLIENT_ID]

Em seguida, pode configurar a aplicação Quarkus para se conectar ao registro de esquemas da seguinte forma:

mp.messaging.connector.smallrye-kafka.apicurio.registry.url=${RHOAS_SERVICE_REGISTRY_URL} (1)
mp.messaging.connector.smallrye-kafka.apicurio.auth.service.token.endpoint=${RHOAS_OAUTH_TOKEN_ENDPOINT} (2)
mp.messaging.connector.smallrye-kafka.apicurio.auth.client.id=${RHOAS_CLIENT_ID} (3)
mp.messaging.connector.smallrye-kafka.apicurio.auth.client.secret=${RHOAS_CLIENT_ID} (4)
1 A URL do registro de serviço, fornecida no console de administração, como https://bu98.serviceregistry.rhcloud.com/t/0e95af2c-6e11-475e-82ee-f13bd782df24/apis/registry/v2
2 O URL do endpoint de token OAuth, como https://identity.api.openshift.com/auth/realms/rhoas/protocol/openid-connect/token
3 O ID do cliente (da conta de serviço)
4 O segredo do cliente (da conta de serviço)

30.2.2. Vinculando serviços gerenciados do Red Hat OpenShift a aplicação Quarkus usando o Operador de Vinculação de Serviço

Se a sua aplicação Quarkus for implantada em um cluster Kubernetes ou OpenShift com o Operador de Vinculação de Serviço e os operadores do Serviços de Aplicação do OpenShift instalados, as configurações necessárias para acessar o Red Hat OpenShift Streams for Apache Kafka e o Service Registry podem ser injetadas na aplicação usando a Vinculação de Serviços do Kubernetes.

Para configurar a Vinculação de Serviço, primeiro você precisa conectar os serviços gerenciados do OpenShift ao seu cluster. Para um cluster do OpenShift, você pode seguir as instruções de Connecting a Kafka and Service Registry instance to your OpenShift cluster.

Depois de conectar o seu cluster às instâncias do RHOAS Kafka e do Service Registry, certifique-se de que concedeu as permissões necessárias à conta de serviço recém-criada.

Em seguida, usando a extensão Vinculação de Serviço Kubernetes, você pode configurar a aplicação Quarkus para gerar recursos ServiceBinding para esses serviços:

quarkus.kubernetes-service-binding.detect-binding-resources=true

quarkus.kubernetes-service-binding.services.kafka.api-version=rhoas.redhat.com/v1alpha1
quarkus.kubernetes-service-binding.services.kafka.kind=KafkaConnection
quarkus.kubernetes-service-binding.services.kafka.name=my-kafka

quarkus.kubernetes-service-binding.services.serviceregistry.api-version=rhoas.redhat.com/v1alpha1
quarkus.kubernetes-service-binding.services.serviceregistry.kind=ServiceRegistryConnection
quarkus.kubernetes-service-binding.services.serviceregistry.name=my-schema-registry

Para este exemplo, a compilação do Quarkus irá gerar os seguintes recursos ServiceBinding:

apiVersion: binding.operators.coreos.com/v1alpha1
kind: ServiceBinding
metadata:
  name: my-app-kafka
spec:
  application:
    group: apps.openshift.io
    name: my-app
    version: v1
    kind: DeploymentConfig
  services:
    - group: rhoas.redhat.com
      version: v1alpha1
      kind: KafkaConnection
      name: my-kafka
  detectBindingResources: true
  bindAsFiles: true
---
apiVersion: binding.operators.coreos.com/v1alpha1
kind: ServiceBinding
metadata:
  name: my-app-serviceregistry
spec:
  application:
    group: apps.openshift.io
    name: my-app
    version: v1
    kind: DeploymentConfig
  services:
    - group: rhoas.redhat.com
      version: v1alpha1
      kind: ServiceRegistryConnection
      name: my-schema-registry
  detectBindingResources: true
  bindAsFiles: true

Você pode seguir Implantando no OpenShift para implantar sua aplicação, incluindo os recursos gerados do ServiceBinding. As propriedades de configuração necessárias para acessar as instâncias do Kafka e do Schema Registry serão injetadas na aplicação automaticamente durante a implantação.

31. Indo mais longe

Este guia mostrou como você pode interagir com o Kafka usando o Quarkus. Ele utiliza a Mensageria Quarkus para construir aplicações de streaming de dados.

Se quiser ir mais longe, consulte a documentação da Mensageria Reativa do SmallRye, a implementação utilizada no Quarkus.

Conteúdo Relacionado