The English version of quarkus.io is the official project site. Translated sites are community supported on a best-effort basis.

Apache Pulsar Reference Guide

This reference guide demonstrates how your Quarkus application can utilize SmallRye Reactive Messaging to interact with Apache Pulsar.

1. Introdução

Apache Pulsar is an open-source, distributed messaging and streaming platform built for the cloud. It provides a multi-tenant, high-performance solution to server messaging with tiered storage capabilities.

Pulsar implements the publish-subscribe pattern:

  • Producers publish messages to topics.

  • Consumers create subscriptions to those topics to receive and process incoming messages, and send acknowledgments to the broker when processing is finished.

  • When a subscription is created, Pulsar retains all messages, even if the consumer is disconnected. The retained messages are discarded only when a consumer acknowledges that all these messages are processed successfully.

A Pulsar cluster consists of

  • One or more brokers, which are stateless components.

  • A metadata store for maintaining topic metadata, schema, coordination and cluster configuration.

  • A set of bookies used for persistent storage of messages.

2. Quarkus Extension for Apache Pulsar

Quarkus provides support for Apache Pulsar through SmallRye Reactive Messaging framework. Based on Eclipse MicroProfile Reactive Messaging specification 3.0, it proposes a flexible programming model bridging CDI and event-driven.

This guide provides an in-depth look on Apache Pulsar and SmallRye Reactive Messaging framework. For a quick start take a look at Getting Started to SmallRye Reactive Messaging with Apache Pulsar.

You can add the smallrye-reactive-messaging-pulsar extensions to your project by running the following command in your project base directory:

CLI
quarkus extension add smallrye-reactive-messaging-pulsar
Maven
./mvnw quarkus:add-extension -Dextensions='smallrye-reactive-messaging-pulsar'
Gradle
./gradlew addExtension --extensions='smallrye-reactive-messaging-pulsar'

Isto irá adicionar o seguinte trecho no seu arquivo de build:

pom.xml
<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-smallrye-reactive-messaging-pulsar</artifactId>
</dependency>
build.gradle
implementation("io.quarkus:quarkus-smallrye-reactive-messaging-pulsar")

The extension includes pulsar-clients-original version 3.0.0 as a transitive dependency and is compatible with Pulsar brokers version 2.10.x.

3. Configuring Smallrye Pulsar Connector

Because Smallrye Reactive Messaging framework supports different messaging backends like Apache Kafka, Apache Pulsar, AMQP, Apache Camel, JMS, MQTT, etc., it employs a generic vocabulary:

  • Applications send and receive messages. Message wraps a payload and can be extended with some metadata. This should not be confused with a Pulsar Message, which consists of value, key With the Pulsar connector, a Reactive Messaging message corresponds to a Pulsar message.

  • Messages transit on channels. Application components connect to channels to publish and consume messages. The Pulsar connector maps channels to Pulsar topics.

  • Channels are connected to message backends using connectors. Connectors are configured to map incoming messages to a specific channel (consumed by the application) and collect outgoing messages sent to a specific channel. Each connector is dedicated to a specific messaging technology. For example, the connector dealing with Pulsar is named smallrye-pulsar.

A minimal configuration for the Pulsar connector with an incoming channel looks like the following:

%prod.pulsar.client.serviceUrl=pulsar:6650 (1)
mp.messaging.incoming.prices.connector=smallrye-pulsar (2)
1 Configure the Pulsar broker service url for the production profile. You can configure it globally or per channel using mp.messaging.incoming.$channel.serviceUrl property. In dev mode and when running tests, Dev Services para Pulsar automatically starts a Pulsar broker.
2 Configure the connector to manage the prices channel. By default, the topic name is same as the channel name.

You can configure the topic attribute to override it.

O prefixo %prod indica que a propriedade só é utilizada quando a aplicação é executada em modo de produção (portanto, não em desenvolvimento ou teste). Consulte a documentação do Perfil para obter mais detalhes.
Fixação automática do conector

If you have a single connector on your classpath, you can omit the connector attribute configuration. Quarkus automatically associates orphan channels to the (unique) connector found on the classpath. Orphan channels are outgoing channels without a downstream consumer or incoming channels without an upstream producer.

Esta ligação automática pode ser desativada utilizando:

quarkus.reactive-messaging.auto-connector-attachment=false

For more configuration options see Configuring Pulsar clients.

4. Receiving messages from Pulsar

The Pulsar Connector connects to a Pulsar broker using a Pulsar client and creates consumers to receive messages from Pulsar brokers, and it maps each Pulsar Message into Reactive Messaging Message.

4.1. Example

Let’s imagine you have a Pulsar broker running, and accessible using the pulsar:6650 address. Configure your application to receive Pulsar messages on the prices channel as follows:

mp.messaging.incoming.prices.serviceUrl=pulsar://pulsar:6650 (1)
mp.messaging.incoming.prices.subscriptionInitialPosition=Earliest (2)
  1. Configure the Pulsar broker service url.

  2. Make sure consumer subscription starts receiving messages from the Earliest position.

You don’t need to set the Pulsar topic, nor the consumer name. By default, the connector uses the channel name (prices). You can configure the topic and consumerName attributes to override them.

In Pulsar, consumers need to provide a subscriptionName for topic subscriptions. If not provided the connector generates a unique subscription name.

Then, your application can receive the double payload directly:

import org.eclipse.microprofile.reactive.messaging.Incoming;

import jakarta.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class PriceConsumer {

    @Incoming("prices")
    public void consume(double price) {
        // process your price.
    }

}

Or, you can retrieve the Reactive Messaging type Message<Double>:

@Incoming("prices")
public CompletionStage<Void> consume(Message<Double> msg) {
    // access record metadata
    var metadata = msg.getMetadata(PulsarIncomingMessageMetadata.class).orElseThrow();
    // process the message payload.
    double price = msg.getPayload();
    // Acknowledge the incoming message (acknowledge the Pulsar message back to the broker)
    return msg.ack();
}

The Reactive Messaging Message type lets the consuming method access the incoming message metadata and handle the acknowledgment manually.

If you want to access the Pulsar message objects directly, use:

@Incoming("prices")
public void consume(org.apache.pulsar.client.api.Message<Double> msg) {
    String key = msg.getKey();
    String value = msg.getValue();
    String topic = msg.topicName();
    // ...
}

org.apache.pulsar.client.api.Message is provided by the underlying Pulsar client and can be used directly with the consumer method.

Alternatively, your application can inject a Multi in your bean, identified with the channel name and subscribe to its events as the following example:

import io.smallrye.mutiny.Multi;
import org.eclipse.microprofile.reactive.messaging.Channel;

import jakarta.inject.Inject;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;
import org.jboss.resteasy.reactive.RestStreamElementType;

@Path("/prices")
public class PriceResource {

    @Inject
    @Channel("prices")
    Multi<Double> prices;

    @GET
    @Path("/prices")
    @RestStreamElementType(MediaType.TEXT_PLAIN)
    public Multi<Double> stream() {
        return prices;
    }
}

Ao consumir mensagens com @Channel, o código da aplicação é responsável pela assinatura. No exemplo acima, o endpoint do RESTEasy Reativo cuida disso para você.

Os seguintes tipos podem ser injetados como canais:

@Inject @Channel("prices") Multi<Double> streamOfPayloads;

@Inject @Channel("prices") Multi<Message<Double>> streamOfMessages;

@Inject @Channel("prices") Publisher<Double> publisherOfPayloads;

@Inject @Channel("prices") Publisher<Message<Double>> publisherOfMessages;

As with the previous Message example, if your injected channel receives payloads (Multi<T>), it acknowledges the message automatically, and support multiple subscribers. If your injected channel receives Message (Multi<Message<T>>), you will be responsible for the acknowledgment and broadcasting.

4.2. Bloqueando o processamento

A Mensageria Reativa invoca seu método em um thread de E/S. Consulte a documentação da Arquitetura Reativa do Quarkus para obter mais detalhes sobre esse tópico. Mas, muitas vezes, você precisa combinar o envio de mensagens reativas com processamento blocante, como interações de banco de dados. Para isso, você precisa usar a anotação @Blocking indicando que o processamento está bloqueando e não deve ser executado no thread do chamador.

Por exemplo, o código a seguir ilustra como é possível armazenar conteúdos recebidos em uma base de dados usando o Hibernate com Panache:

import io.smallrye.reactive.messaging.annotations.Blocking;
import org.eclipse.microprofile.reactive.messaging.Incoming;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.transaction.Transactional;

@ApplicationScoped
public class PriceStorage {

    @Incoming("prices")
    @Transactional
    public void store(int priceInUsd) {
        Price price = new Price();
        price.value = priceInUsd;
        price.persist();
    }

}

Existem 2 anotações de @Blocking:

  1. io.smallrye.reactive.messaging.annotations.Blocking

  2. io.smallrye.common.annotation.Blocking

Eles têm o mesmo efeito. Portanto, você pode usar os dois. O primeiro fornece um ajuste mais refinado, como o pool de trabalho a ser usado e se ele preserva a ordem. O segundo, usado também com outros recursos reativos do Quarkus, usa o pool de trabalho padrão e preserva a ordem.

Informações detalhadas sobre a utilização da anotação @Blocking podem ser encontradas em Mensageria Reativa do SmallRye - Lidando com execução blocante.

@RunOnVirtualThread

Para executar o processamento blocante em threads virtuais Java, consulte a documentação de suporte do Quarkus à Virtual Thread com Mensageria Reativa.

@Transactional

Se o seu método estiver anotado com @Transactional, será considerado blocante automaticamente, mesmo que o método não esteja anotado com @Blocking.

4.3. Pulsar Subscription Types

Pulsar subscriptionType consumer configuration can be used flexibly to achieve different messaging scenarios, such as publish-subscribe or queuing.

  • Exclusive subscription type allows specifying a unique subscription name for "fan-out pub-sub messaging". This is the default subscription type.

  • Shared, Key_Shared or Failover subscription types allow multiple consumers to share the same subscription name, to achieve "message queuing" among consumers.

If a subscription name is not provided Quarkus generates a unique id.

4.4. Deserialization and Pulsar Schema

The Pulsar Connector allows configuring Schema configuration for the underlying Pulsar consumer. See the Pulsar Schema Configuration & Auto Schema Discovery for more information.

4.5. Acknowledgement Strategies

When a message produced from a Pulsar Message is acknowledged, the connector sends an acknowledgement request to the Pulsar broker. All Reactive Messaging messages need to be acknowledged, which is handled automatically in most cases. Acknowledgement requests can be sent to the Pulsar broker using the following two strategies:

  • Individual acknowledgement is the default strategy, an acknowledgement request is to the broker for each message.

  • Cumulative acknowledgement, configured using ack-strategy=cumulative, the consumer only acknowledges the last message it received. All messages in the stream up to (and including) the provided message are not redelivered to that consumer.

By default, the Pulsar consumer does not wait for the acknowledgement confirmation from the broker to validate an acknowledgement. You can enable this using ackReceiptEnabled=true.

4.6. Failure Handling Strategies

If a message produced from a Pulsar message is nacked, a failure strategy is applied. The Quarkus Pulsar extension supports 4 strategies:

  • nack (default) sends negative acknowledgment to the broker, triggering the broker to redeliver this message to the consumer. The negative acknowledgment can be further configured using negativeAckRedeliveryDelayMicros and negativeAck.redeliveryBackoff properties.

  • fail fail the application, no more messages will be processed.

  • ignore the failure is logged, but the acknowledgement strategy will be applied and the processing will continue.

  • continue the failure is logged, but processing continues without applying acknowledgement or negative acknowledgement. This strategy can be used with Acknowledgement timeout configuration.

  • reconsume-later sends the message to the retry letter topic using the reconsumeLater API to be reconsumed with a delay. The delay can be configured using the reconsumeLater.delay property and defaults to 3 seconds. Custom delay or properties per message can be configured by adding an instance of io.smallrye.reactive.messaging.pulsar.PulsarReconsumeLaterMetadata to the failure metadata.

4.6.1. Acknowledgement timeout

Similar to the negative acknowledgement, with the acknowledgement timeout mechanism, the Pulsar client tracks the unacknowledged messages, for the given ackTimeout period and sends redeliver unacknowledged messages request to the broker, thus the broker resends the unacknowledged messages to the consumer.

To configure the timeout and redelivery backoff mechanism you can set ackTimeoutMillis and ackTimeout.redeliveryBackoff properties. The ackTimeout.redeliveryBackoff value accepts comma separated values of min delay in milliseconds, max delay in milliseconds and multiplier respectively:

mp.messaging.incoming.out.failure-strategy=continue
mp.messaging.incoming.out.ackTimeoutMillis=10000
mp.messaging.incoming.out.ackTimeout.redeliveryBackoff=1000,60000,2

4.6.2. Reconsume later and retry letter topic

The retry letter topic pushes messages that are not consumed successfully to a dead letter topic and continue message consumption. Note that dead letter topic can be used in different message redelivery methods, such as acknowledgment timeout, negative acknowledgment or retry letter topic.

mp.messaging.incoming.data.failure-strategy=reconsume-later
mp.messaging.incoming.data.reconsumeLater.delay=5000
mp.messaging.incoming.data.enableRetry=true
mp.messaging.incoming.data.negativeAck.redeliveryBackoff=1000,60000,2

4.6.3. Dead-letter topic

The dead letter topic pushes messages that are not consumed successfully to a dead letter topic an continue message consumption. Note that dead letter topic can be used in different message redelivery methods, such as acknowledgment timeout, negative acknowledgment or retry letter topic.

mp.messaging.incoming.data.failure-strategy=nack
mp.messaging.incoming.data.deadLetterPolicy.maxRedeliverCount=2
mp.messaging.incoming.data.deadLetterPolicy.deadLetterTopic=my-dead-letter-topic
mp.messaging.incoming.data.deadLetterPolicy.initialSubscriptionName=my-dlq-subscription
mp.messaging.incoming.data.subscriptionType=Shared

Negative acknowledgment or acknowledgment timeout methods for redelivery will redeliver the whole batch of messages containing at least an unprocessed message. See Producer Batching for more information.

4.7. Receiving Pulsar Messages in Batches

By default, incoming methods receive each Pulsar message individually. You can enable batch mode using batchReceive=true property, or setting a batchReceivePolicy in consumer configuration.

@Incoming("prices")
public CompletionStage<Void> consumeMessage(PulsarIncomingBatchMessage<Double> messages) {
    for (PulsarMessage<Double> msg : messages) {
        msg.getMetadata(PulsarIncomingMessageMetadata.class).ifPresent(metadata -> {
            String key = metadata.getKey();
            String topic = metadata.getTopicName();
            long timestamp = metadata.getEventTime();
            //... process messages
        });
    }
    // ack will commit the latest offsets (per partition) of the batch.
    return messages.ack();
}

@Incoming("prices")
public void consumeRecords(Messages<Double> messages) {
    for (Message<Double> msg : messages) {
        //... process messages
    }
}

Or you can directly receive the list of payloads to the consume method:

@Incoming("prices")
public void consume(List<Double> prices) {
    for (double price : prices) {
        // process price
    }
}

Quarkus auto-detects batch types for incoming channels and sets batch configuration automatically. You can configure batch mode explicitly with mp.messaging.incoming.$channel.batchReceive property.

5. Sending messages to Pulsar

The Pulsar Connector can write Reactive Messaging Messages as Pulsar Message.

5.1. Example

Let’s imagine you have a Pulsar broker running, and accessible using the pulsar:6650 address. Configure your application to write the messages from the prices channel into a Pulsar Messages as follows:

mp.messaging.outgoing.prices.serviceUrl=pulsar://pulsar:6650 (1)
  1. Configure the Pulsar broker service url.

You don’t need to set the Pulsar topic, nor the producer name. By default, the connector uses the channel name (prices). You can configure the topic and producerName attributes to override them.

Then, your application must send Message<Double> to the prices channel. It can use double payloads as in the following snippet:

import io.smallrye.mutiny.Multi;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import jakarta.enterprise.context.ApplicationScoped;
import java.time.Duration;
import java.util.Random;

@ApplicationScoped
public class PulsarPriceProducer {

    private final Random random = new Random();

    @Outgoing("prices-out")
    public Multi<Double> generate() {
        // Build an infinite stream of random prices
        // It emits a price every second
        return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
            .map(x -> random.nextDouble());
    }

}

Note that the generate method returns a Multi<Double>, which implements the Flow.Publisher interface. This publisher will be used by the framework to generate messages and send them to the configured Pulsar topic.

Instead of returning a payload, you can return a io.smallrye.reactive.messaging.pulsar.OutgoingMessage to send Pulsar messages:

@Outgoing("out")
public Multi<OutgoingMessage<Double>> generate() {
    return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
        .map(x -> OutgoingMessage.of("my-key", random.nextDouble()));
}

O conteúdo pode ser envolvido em uma org.eclipse.microprofile.reactive.messaging.Message para ter mais controle sobre os registros escritos:

@Outgoing("generated-price")
public Multi<Message<Double>> generate() {
    return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
            .map(x -> Message.of(random.nextDouble())
                    .addMetadata(PulsarOutgoingMessageMetadata.builder()
                            .withKey("my-key")
                            .withProperties(Map.of("property-key", "value"))
                            .build()));
}

When sending Messages, you can add an instance of io.smallrye.reactive.messaging.pulsar.PulsarOutgoingMessageMetadata to influence how the message is going to be written to Pulsar.

Other than method signatures returning a Flow.Publisher, outgoing method can also return single message. In this case the producer will use this method as generator to create an infinite stream.

@Outgoing("prices-out") T generate(); // T excluding void

@Outgoing("prices-out") Message<T> generate();

@Outgoing("prices-out") Uni<T> generate();

@Outgoing("prices-out") Uni<Message<T>> generate();

@Outgoing("prices-out") CompletionStage<T> generate();

@Outgoing("prices-out") CompletionStage<Message<T>> generate();

5.2. Serialization and Pulsar Schema

The Pulsar Connector allows configuring Schema configuration for the underlying Pulsar producer. See the Pulsar Schema Configuration & Auto Schema Discovery for more information.

5.3. Sending key/value pairs

In order to send Kev/Value pairs to Pulsar, you can configure the Pulsar producer Schema with a KeyValue schema.

package pulsar.outbound;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Produces;

import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.schema.KeyValue;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import io.smallrye.common.annotation.Identifier;

@ApplicationScoped
public class PulsarKeyValueExample {

    @Identifier("out")
    @Produces
    Schema<KeyValue<String, Long>> schema = Schema.KeyValue(Schema.STRING, Schema.INT64);

    @Incoming("in")
    @Outgoing("out")
    public KeyValue<String, Long> process(long in) {
        return new KeyValue<>("my-key", in);
    }

}

If you need more control on the written records, use PulsarOutgoingMessageMetadata.

5.4. Acknowledgement

Upon receiving a message from a Producer, a Pulsar broker assigns a MessageId to the message and sends it back to the producer, confirming that the message is published.

By default, the connector does wait for Pulsar to acknowledge the record to continue the processing (acknowledging the received Message). You can disable this by setting the waitForWriteCompletion attribute to false.

If a record cannot be written, the message is nacked.

The Pulsar client automatically retries sending messages in case of failure, until the send timeout is reached. The send timeout is configurable with sendTimeoutMs attribute and by default is 30 seconds.

5.5. Back-pressure and inflight records

The Pulsar outbound connector handles back-pressure, monitoring the number of pending messages waiting to be written to the Pulsar broker. The number of pending messages is configured using the maxPendingMessages attribute and defaults to 1000.

The connector only sends that amount of messages concurrently. No other messages will be sent until at least one pending message gets acknowledged by the broker. Then, the connector writes a new message to Pulsar when one of the broker’s pending messages get acknowledged.

You can also remove the limit of pending messages by setting maxPendingMessages to 0. Note that Pulsar also enables to configure the number of pending messages per partition using maxPendingMessagesAcrossPartitions.

5.6. Producer Batching

By default, the Pulsar producer batches individual messages together to be published to the broker. You can configure batching parameters using batchingMaxPublishDelayMicros, batchingPartitionSwitchFrequencyByPublishDelay, batchingMaxMessages, batchingMaxBytes configuration properties, or disable it completely with batchingEnabled=false.

When using Key_Shared consumer subscriptions, the batcherBuilder can be configured to BatcherBuilder.KEY_BASED.

6. Pulsar Transactions and Exactly-Once Processing

Pulsar transactions enable event streaming applications to consume, process, and produce messages in one atomic operation.

Transactions allow one or multiple producers to send batch of messages to multiple topics where all messages in the batch are eventually visible to any consumer, or none is ever visible to consumers.

In order to be used, transaction support needs to be activated on the broker configuration, using transactionCoordinatorEnabled=true and systemTopicEnabled=true broker configuration.

On the client side, the transaction support also needs to be enabled on PulsarClient configuration:

mp.messaging.outgoing.tx-producer.enableTransaction=true

Pulsar connector provides PulsarTransactions custom emitter for writing records inside a transaction.

It can be used as a regular emitter @Channel:

package pulsar.outbound;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;

import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;

import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.pulsar.OutgoingMessage;
import io.smallrye.reactive.messaging.pulsar.transactions.PulsarTransactions;

@ApplicationScoped
public class PulsarTransactionalProducer {

    @Inject
    @Channel("tx-out-example")
    PulsarTransactions<OutgoingMessage<Integer>> txProducer;

    @Inject
    @Channel("other-producer")
    PulsarTransactions<String> producer;

    @Incoming("in")
    public Uni<Void> emitInTransaction(Message<Integer> in) {
        return txProducer.withTransaction(emitter -> {
            emitter.send(OutgoingMessage.of("a", 1));
            emitter.send(OutgoingMessage.of("b", 2));
            emitter.send(OutgoingMessage.of("c", 3));
            producer.send(emitter, "4");
            producer.send(emitter, "5");
            producer.send(emitter, "6");
            return Uni.createFrom().completionStage(in::ack);
        });
    }

}

The function given to the withTransaction method receives a TransactionalEmitter for producing records, and returns a Uni that provides the result of the transaction. If the processing completes successfully, the producer is flushed and the transaction is committed. If the processing throws an exception, returns a failing Uni, or marks the TransactionalEmitter for abort, the transaction is aborted.

Multiple transactional producers can participate in a single transaction. This ensures all messages are sent using the started transaction and before the transaction is committed, all participating producers are flushed.

If this method is called on a Vert.x context, the processing function is also called on that context. Otherwise, it is called on the sending thread of the producer.

6.1. Processamento Exactly-Once (Exatamente Único)

Pulsar Transactions API also allows managing consumer offsets inside a transaction, together with produced messages. This in turn enables coupling a consumer with a transactional producer in a consume-transform-produce pattern, also known as exactly-once processing. It means that an application consumes messages, processes them, publishes the results to a topic, and commits offsets of the consumed messages in a transaction.

The PulsarTransactions emitter also provides a way to apply exactly-once processing to an incoming Pulsar message inside a transaction.

The following example includes a batch of Pulsar messages inside a transaction.

mp.messaging.outgoing.tx-out-example.enableTransaction=true
# ...
mp.messaging.incoming.in-channel.enableTransaction=true
mp.messaging.incoming.in-channel.batchReceive=true
package pulsar.outbound;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;

import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Incoming;

import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.pulsar.PulsarIncomingBatchMessage;
import io.smallrye.reactive.messaging.pulsar.PulsarMessage;
import io.smallrye.reactive.messaging.pulsar.transactions.PulsarTransactions;

@ApplicationScoped
public class PulsarExactlyOnceProcessor {

    @Inject
    @Channel("tx-out-example")
    PulsarTransactions<Integer> txProducer;

    @Incoming("in-channel")
    public Uni<Void> emitInTransaction(PulsarIncomingBatchMessage<Integer> batch) {
        return txProducer.withTransactionAndAck(batch, emitter -> {
            for (PulsarMessage<Integer> record : batch) {
                emitter.send(PulsarMessage.of(record.getPayload() + 1, record.getKey()));
            }
            return Uni.createFrom().voidItem();
        });
    }

}

If the processing completes successfully, the message is acknowledged inside the transaction and the transaction is committed.

When using exactly-once processing, messages can only be acked individually rather than cumulatively.

If the processing needs to abort, the message is nack’ed. One of the failure strategies can be employed in order to retry the processing or simply fail-stop. Note that the Uni returned from the withTransaction will yield a failure if the transaction fails and is aborted.

The application can choose to handle the error case, but for the message consumption to continue, Uni returned from the @Incoming method must not result in failure. PulsarTransactions#withTransactionAndAck method will ack and nack the message but will not stop the reactive stream. Ignoring the failure simply resets the consumer to the last committed offsets and resumes the processing from there.

In order to avoid duplicates in case of failure, it is recommended to enable message deduplication and batch index level acknowledgment on the broker side:

quarkus.pulsar.devservices.broker-config.brokerDeduplicationEnabled=true
quarkus.pulsar.devservices.broker-config.brokerDeduplicationEntriesInterval=1000
quarkus.pulsar.devservices.broker-config.brokerDeduplicationSnapshotIntervalSeconds=3000
quarkus.pulsar.devservices.broker-config.acknowledgmentAtBatchIndexLevelEnabled=3000

mp.messaging.incoming.data.batchIndexAckEnabled=true

7. Pulsar Schema Configuration & Auto Schema Discovery

Pulsar messages are stored with payloads as unstructured byte array. A Pulsar schema defines how to serialize structured data to the raw message bytes. The schema is applied in producers and consumers to write and read with an enforced data structure. It serializes data into raw bytes before they are published to a topic and deserializes the raw bytes before they are delivered to consumers.

Pulsar uses a schema registry as a central repository to store the registered schema information, which enables producers/consumers to coordinate the schema of a topic’s messages through brokers. By default the Apache BookKeeper is used to store schemas.

Pulsar API provides built-in schema information for a number of primitive types and complex types such as Key/Value, Avro and Protobuf.

The Pulsar Connector allows specifying the schema as a primitive type using the schema property:

mp.messaging.incoming.prices.connector=smallrye-pulsar
mp.messaging.incoming.prices.schema=INT32

mp.messaging.outgoing.prices-out.connector=smallrye-pulsar
mp.messaging.outgoing.prices-out.schema=DOUBLE

If the value for the schema property matches a Schema Type a simple schema will be created with that type and will be used for that channel.

The Pulsar Connector allows configuring complex schema types by providing Schema beans through CDI, identified with the @Identifier qualifier.

For example the following bean provides an JSON schema and a Key/Value schema:

package pulsar.configuration;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Produces;

import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.KeyValueEncodingType;

import io.smallrye.common.annotation.Identifier;

@ApplicationScoped
public class PulsarSchemaProvider {

    @Produces
    @Identifier("user-schema")
    Schema<User> userSchema = Schema.JSON(User.class);

    @Produces
    @Identifier("a-channel")
    Schema<KeyValue<Integer, User>> keyValueSchema() {
        return Schema.KeyValue(Schema.INT32, Schema.JSON(User.class), KeyValueEncodingType.SEPARATED);
    }

    public static class User {
        String name;
        int age;

    }
}

To configure the incoming channel users with defined schema, you need to set the schema property to the identifier of the schema user-schema:

mp.messaging.incoming.users.connector=smallrye-pulsar
mp.messaging.incoming.users.schema=user-schema

If no schema property is found, the connector looks for Schema beans identified with the channel name. For example, the outgoing channel a-channel will use the key/value schema.

mp.messaging.outgoing.a-channel.connector=smallrye-pulsar

If no schema information is provided incoming channels will use Schema.AUTO_CONSUME(), whereas outgoing channels will use Schema.AUTO_PRODUCE_BYTES() schemas.

7.1. Auto Schema Discovery

When using SmallRye Reactive Messaging Pulsar (io.quarkus:quarkus-smallrye-reactive-messaging-pulsar), Quarkus can often automatically detect the correct Pulsar Schema to configure. This autodetection is based on declarations of @Incoming and @Outgoing methods, as well as injected @Channels.

Por exemplo, se você declarar

@Outgoing("generated-price")
public Multi<Integer> generate() {
    ...
}

and your configuration indicates that the generated-price channel uses the smallrye-pulsar connector, then Quarkus will automatically set the schema attribute of the generated-price channel to Pulsar Schema INT32.

Da mesma forma, se você declarar

@Incoming("my-pulsar-consumer")
public void consume(org.apache.pulsar.api.client.Message<byte[]> record) {
    ...
}

and your configuration indicates that the my-pulsar-consumer channel uses the smallrye-pulsar connector, then Quarkus will automatically set the schema attribute to Pulsar BYTES Schema.

Finalmente, se você declarar

@Inject
@Channel("price-create")
Emitter<Double> priceEmitter;

and your configuration indicates that the price-create channel uses the smallrye-pulsar connector, then Quarkus will automatically set the schema to Pulsar INT64 Schema.

The full set of types supported by the Pulsar Schema autodetection is:

  • short e java.lang.Short

  • int e java.lang.Integer

  • long e java.lang.Long

  • float e java.lang.Float

  • double e java.lang.Double

  • byte[]

  • java.time.Instant

  • java.sql.Timestamp

  • java.time.LocalDate

  • java.time.LocalTime

  • java.time.LocalDateTime

  • java.nio.ByteBuffer

  • classes generated from Avro schemas, as well as Avro GenericRecord, will be configured with AVRO schema type

  • classes generated from Protobuf schemas, will be configured with PROTOBUF schema type

  • other classes will automatically be configured with JSON schema type

Note that JSON schema type enforces schema validation.

In addition to those Pulsar-provided schemas, Quarkus provides following schema implementations without enforcing validation :

  • io.vertx.core.buffer.Buffer will be configured with io.quarkus.pulsar.schema.BufferSchema schema

  • io.vertx.core.json.JsonObject will be configured with io.quarkus.pulsar.schema.JsonObjectSchema schema

  • io.vertx.core.json.JsonArray will be configured with io.quarkus.pulsar.schema.JsonArraySchema schema

  • For schema-less Json serialization, if the schema configuration is set to ObjectMapper<fully_qualified_name_of_the_bean>, a Schema will be generated using the Jackson ObjectMapper, without enforcing a Pulsar Schema validation. io.quarkus.pulsar.schema.ObjectMapperSchema can be used to explicitly configure JSON schema without validation.

If a schema is set by configuration, it won’t be replaced by the auto-detection.

In case you have any issues with serializer auto-detection, you can switch it off completely by setting quarkus.reactive-messaging.pulsar.serializer-autodetection.enabled=false. If you find you need to do this, please file a bug in the Quarkus issue tracker so we can fix whatever problem you have.

8. Dev Services para Pulsar

Com a extensão Smallrye Reactive Messaging Pulsar do Quarkus (quarkus-smallrye-reactive-messaging-pulsar), o Dev Services para Pulsar inicia automaticamente um broker Pulsar no modo de desenvolvimento e ao executar testes. Portanto, você não precisa iniciar um broker manualmente. O aplicativo é configurado automaticamente.

8.1. Ativando / Desativando Dev Services para o Pulsar

Os Dev services para Pulsar são ativados automaticamente, a menos que:

  • quarkus.pulsar.devservices.enabled é definido como false

  • o pulsar.client.serviceUrl está configurado

  • todos os canais Pulsar de mensagens reativas têm o atributo serviceUrl definido

Dev Services para a Pulsar dependem do Docker para iniciar o agente. Se o seu ambiente não for compatível com o Docker, você precisará iniciar o agente manualmente ou conectar-se a um agente já em execução. Você pode configurar o endereço do agente usando pulsar.client..

8.2. Broker partilhado

Na maioria das vezes, você precisa compartilhar o broker entre os aplicativos. O Dev Services para Pulsar implementa um mecanismo de descoberta de serviços para que seus vários aplicativos Quarkus em execução no modo de desenvolvimento compartilhem um único broker.

O Dev Services para Pulsar inicia o contêiner com a etiqueta quarkus-dev-service-pulsar que é utilizada para identificar o contêiner.

Se precisar de vários brokers (compartilhados), você pode configurar o atributo quarkus.pulsar.devservices.service-name e indicar o nome do broker. Ele procura um contêiner com o mesmo valor ou inicia um novo se nenhum for encontrado. O nome do serviço padrão é pulsar .

O compartilhamento é ativado por padrão no modo de desenvolvimento, mas desativado no modo de teste. Você pode desativar o compartilhamento com quarkus.pulsar.devservices.shared=false.

8.3. Definindo a porta

Por padrão, o Dev Services para Pulsar escolhe uma porta aleatória e configura o aplicativo. Você pode definir a porta configurando a propriedade quarkus.pulsar.devservices.port.

Note que o endereço anunciado pelo Pulsar é automaticamente configurado com a porta escolhida.

8.4. Configurando a imagem

O Dev Services para Pulsar suporta a imagem oficial do Apache Pulsar.

Um nome de imagem personalizado pode ser configurado. Por exemplo:

quarkus.pulsar.devservices.image-name=datastax/lunastreaming-all:2.10_4.7

8.5. Configurar o broker Pulsar

É possível configurar o Dev Services para a Pulsar com a configuração personalizada do broker.

O exemplo seguinte ativa o suporte de transação:

quarkus.pulsar.devservices.broker-config.transaction-coordinator-enabled=true
quarkus.pulsar.devservices.broker-config.system-topic-enabled=true

9. Configuring Pulsar clients

Pulsar clients, consumers and producers are very customizable to configure how a Pulsar client application behaves.

The Pulsar connector creates a Pulsar client and, a consumer or a producer per channel, each with sensible defaults to ease their configuration. Although the creation is handled, all available configuration options remain configurable through Pulsar channels.

While idiomatic way of creating PulsarClient, PulsarConsumer or PulsarProducer are through builder APIs, in its essence those APIs build each time a configuration object, to pass onto the implementation. Those are ClientConfigurationData, ConsumerConfigurationData and ProducerConfigurationData.

Pulsar Connector allows receiving properties for those configuration objects directly. For example, the broker authentication information for PulsarClient is received using authPluginClassName and authParams properties. In order to configure the authentication for the incoming channel data :

mp.messaging.incoming.data.connector=smallrye-pulsar
mp.messaging.incoming.data.serviceUrl=pulsar://localhost:6650
mp.messaging.incoming.data.topic=topic
mp.messaging.incoming.data.subscriptionInitialPosition=Earliest
mp.messaging.incoming.data.schema=INT32
mp.messaging.incoming.data.authPluginClassName=org.apache.pulsar.client.impl.auth.AuthenticationBasic
mp.messaging.incoming.data.authParams={"userId":"superuser","password":"admin"}

Note that the Pulsar consumer property subscriptionInitialPosition is also configured with the Earliest value which represents with enum value SubscriptionInitialPosition.Earliest.

This approach covers most of the configuration cases. However, non-serializable objects such as CryptoKeyReader, ServiceUrlProvider etc. cannot be configured this way. The Pulsar Connector allows taking into account instances of Pulsar configuration data objects – ClientConfigurationData, ConsumerConfigurationData, ProducerConfigurationData:

import jakarta.enterprise.inject.Produces;
import io.smallrye.common.annotation.Identifier;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;

class PulsarConfig {

    @Produces
    @Identifier("my-consumer-options")
    public ConsumerConfigurationData<String> getConsumerConfig() {
        ConsumerConfigurationData<String> data = new ConsumerConfigurationData<>();
        data.setAckReceiptEnabled(true);
        data.setCryptoKeyReader(DefaultCryptoKeyReader.builder()
                //...
                .build());
        return data;
    }
}

This instance is retrieved and used to configure the client used by the connector. You need to indicate the name of the client using the client-configuration, consumer-configuration or producer-configuration attributes:

mp.messaging.incoming.prices.consumer-configuration=my-consumer-options

If no [client|consumer|producer]-configuration is configured, the connector will look for instances identified with the channel name:

import jakarta.enterprise.inject.Produces;
import io.smallrye.common.annotation.Identifier;
import org.apache.pulsar.client.impl.AutoClusterFailover;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;

class PulsarConfig {

    @Produces
    @Identifier("prices")
    public ClientConfigurationData getClientConfig() {
        ClientConfigurationData data = new ClientConfigurationData();
        data.setEnableTransaction(true);
        data.setServiceUrlProvider(AutoClusterFailover.builder()
                // ...
                .build());
        return data;
    }
}

You also can provide a Map<String, Object> containing configuration values by key:

import jakarta.enterprise.inject.Produces;
import io.smallrye.common.annotation.Identifier;
import org.apache.pulsar.client.api.BatcherBuilder;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.customroute.PartialRoundRobinMessageRouterImpl;
import java.util.Map;

class PulsarConfig {

    @Produces
    @Identifier("prices")
    public Map<String, Object> getProducerConfig() {
        return Map.of(
                "batcherBuilder", BatcherBuilder.KEY_BASED,
                "sendTimeoutMs", 3000,
                "customMessageRouter", new PartialRoundRobinMessageRouterImpl(4));
    }
}

Different configuration sources are loaded in the following order of precedence, from the least important to the highest:

  1. Map<String, Object> config map produced with default config identifier, default-pulsar-client, default-pulsar-consumer, default-pulsar-producer.

  2. Map<String, Object> config map produced with identifier in the configuration or channel name

  3. [Client|Producer|Consuemr]ConfigurationData object produced with identifier in the channel configuration or the channel name

  4. Channel configuration properties named with [Client|Producer|Consuemr]ConfigurationData field names.

See Referência de configuração for the exhaustive list of configuration options.

9.1. Configuring Pulsar Authentication

Pulsar provides a pluggable authentication framework, and Pulsar brokers/proxies use this mechanism to authenticate clients.

Clients can be configured in application.properties file using authPluginClassName and authParams attributes:

pulsar.client.serviceUrl=pulsar://pulsar:6650
pulsar.client.authPluginClassName=org.apache.pulsar.client.impl.auth.AuthenticationBasic
pulsar.client.authParams={"userId":"superuser","password":"admin"}

Or programmatically:

import java.util.Map;

import jakarta.enterprise.inject.Produces;
import io.smallrye.common.annotation.Identifier;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.auth.AuthenticationBasic;

class PulsarConfig {

    @Produces
    @Identifier("prices")
    public ClientConfigurationData config() {
        var data = new ClientConfigurationData();
        var auth = new AuthenticationBasic();
        auth.configure(Map.of("userId", "superuser", "password", "admin"));
        data.setAuthentication(auth);
        return data;
    }
}

9.1.1. Configuring access to Datastax Luna Streaming

Luna Streaming is a production-ready distribution of Apache Pulsar, with tools and support from DataStax. After creating your DataStax Luna Pulsar tenant, note the auto generated token, and configure the token authentication:

pulsar.client.serviceUrl=pulsar+ssl://pulsar-aws-eucentral1.streaming.datastax.com:6651
pulsar.client.authPluginClassName=org.apache.pulsar.client.impl.auth.AuthenticationToken
pulsar.client.authParams=token:eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.eyJpYXQiOjE2ODY4MTc4MzQsImlzcyI6ImRhdGFzdGF4Iiwic3ViIjoiY2xpZW50OzA3NGZhOTI4LThiODktNDBhNC04MDEzLWNlNjVkN2JmZWIwZTtjSEpwWTJWejsyMDI5ODdlOGUyIiwidG9rZW5pZCI6IjIwMjk4N2U4ZTIifQ....

Make sure to create topics beforehand, or enable the Auto Topic Creation in the namespace configuration.

Note that the topic configuration needs to reference full name of topics:

mp.messaging.incoming.prices.topic=persistent://my-tenant/default/prices

9.1.2. Configuring access to StreamNative Cloud

StreamNative Cloud is a fully managed Pulsar-as-a-Service available in different deployment options, whether it is fully-hosted, on a public cloud but managed by StreamNative or self-managed on Kubernetes.

The StreamNative Pulsar clusters use Oauth2 authentication, so you need to make sure that a service account exists with required permissions to the Pulsar namespace/topic your application is using.

Next, you need to download the Key file (which serves as private key) of the service account and note the issuer URL (typically https://auth.streamnative.cloud/) and the audience (for example urn:sn:pulsar:o-rf3ol:redhat) for your cluster. The Pulsar Clients page in the Admin section in the StreamNative Cloud console helps you with this process.

To configure your application with Pulsar Oauth2 authentication:

pulsar.tenant=public
pulsar.namespace=default
pulsar.client.serviceUrl=pulsar+ssl://quarkus-71eaadbf-a6f3-4355-85d2-faf436b23d86.aws-euc1-prod-snci-pool-slug.streamnative.aws.snio.cloud:6651
pulsar.client.authPluginClassName=org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2
pulsar.client.authParams={"type":"client_credentials","privateKey":"data:application/json;base64,<base64-encoded value>","issuerUrl":"https://auth.streamnative.cloud/","audience":"urn:sn:pulsar:o-rfwel:redhat"}

Note that the pulsar.client.authParams configuration contains a Json string with issuerUrl, audience and the privateKey in the data:application/json;base64,<base64-encoded-key-file> format.

Alternatively you can configure the authentication programmatically:

package org.acme.pulsar;

import java.net.MalformedURLException;
import java.net.URL;

import org.apache.pulsar.client.impl.auth.oauth2.AuthenticationFactoryOAuth2;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.eclipse.microprofile.config.inject.ConfigProperty;

import io.smallrye.common.annotation.Identifier;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Produces;

@ApplicationScoped
public class PulsarAuth {

    @ConfigProperty(name = "pulsar.issuerUrl")
    String issuerUrl;

    @ConfigProperty(name = "pulsar.credentials")
    String credentials;

    @ConfigProperty(name = "pulsar.audience")
    String audience;

    @Produces
    @Identifier("pulsar-auth")
    public ClientConfigurationData pulsarClientConfig() throws MalformedURLException {
        var data = new ClientConfigurationData();
        data.setAuthentication(AuthenticationFactoryOAuth2.clientCredentials(new URL(issuerUrl), PulsarAuth.class.getResource(credentials), audience));
        return data;
    }
}

This assumes that the key file is included to the application classpath as a resource, then the configuration would like the following:

mp.messaging.incoming.prices.client-configuration=pulsar-auth

pulsar.tenant=public
pulsar.namespace=default
pulsar.client.serviceUrl=pulsar+ssl://quarkus-71eaadbf-a6f3-4355-85d2-faf436b23d86.aws-euc1-prod-snci-pool-slug.streamnative.aws.snio.cloud:6651
pulsar.issuerUrl=https://auth.streamnative.cloud/
pulsar.audience=urn:sn:pulsar:o-rfwel:redhat
pulsar.credentials=/o-rfwel-quarkus-app.json

Note that channels using the client configuration identified with pulsar-auth need to set the client-configuration attribute.

10. Verificações de Integridade

The Quarkus extension reports startup, readiness and liveness of each channel managed by the Pulsar connector. Health checks rely on the Pulsar client to verify that a connection is established with the broker.

Startup and Readiness probes for both inbound and outbound channels report OK when the connection with the broker is established.

The Liveness probe for both inbound and outbound channels reports OK when the connection is established with the broker AND that no failures have been caught.

Note that a message processing failures nacks the message which is then handled by the failure-strategy. It is the responsibility of the failure-strategy to report the failure and influence the outcome of the liveness checks. The fail failure strategy reports the failure and so the liveness check will report the failure.

11. Referência de configuração

Following are the list of configuration attributes for the Pulsar connector channels, consumers, producers and clients. See the Pulsar Client Configuration for more information on how the Pulsar clients are configured.

11.1. Incoming channel configuration (receiving from Pulsar)

Os seguintes atributos são configurados utilizando:

mp.messaging.incoming.your-channel-name.attribute=value
Table 1. Atributos de entrada do conector 'smallrye-pulsar'
Atributo (alias) Descrição Tipo Obrigatório Padrão

ack-strategy

Especifique a estratégia de confirmação a ser aplicada quando uma mensagem produzida a partir de um registro for reconhecida. Os valores podem ser 'ack', 'cumulativos'.

string

falso

ack

ackTimeout.redeliveryBackoff

Valores separados por vírgulas para configurar o tempo limite do ack MultiplierRedeliveryBackoff, min delay, max delay, multiplicador.

string

falso

batchReceive

Se o recebimento em lote é usado para consumir mensagens

boolean

falso

false

client-configuration

Identificador de um bean CDI que fornece a configuração padrão do cliente Pulsar para esse canal. A configuração do canal ainda pode substituir qualquer atributo. O bean deve ter um tipo de Map<String, Object> e deve usar o qualificador @io.smallrye.common.annotation.Identifier para definir o identificador.

string

falso

consumer-configuration

Identificador de um bean CDI que fornece a configuração padrão do consumidor Pulsar para esse canal. A configuração do canal ainda pode substituir qualquer atributo. O bean deve ter um tipo de Map<String, Object> e deve usar o qualificador @io.smallrye.common.annotation.Identifier para definir o identificador.

string

falso

deadLetterPolicy.deadLetterTopic

Nome do tópico de dead letter para onde as mensagens com falha serão enviadas

string

falso

deadLetterPolicy.initialSubscriptionName

Nome do nome da assinatura inicial do tópico de dead letter

string

falso

deadLetterPolicy.maxRedeliverCount

Número máximo de vezes que uma mensagem será entregue novamente antes de ser enviada para o tópico de letra morta

int

falso

deadLetterPolicy.retryLetterTopic

Nome do tópico de repetição para onde as mensagens com falha serão enviadas

string

falso

failure-strategy

Especifique a estratégia de falha a ser aplicada quando uma mensagem produzida a partir de um registro for reconhecida negativamente (nack). Os valores podem ser 'nack' (padrão), 'fail', 'ignore' ou 'reconsume-later

string

falso

nack

health-enabled

Se o relatório de integridade está habilitado (padrão) ou desabilitado

boolean

falso

true

negativeAck.redeliveryBackoff

Valores separados por vírgulas para configurar o ack negativo MultiplierRedeliveryBackoff, min delay, max delay, multiplicador.

string

falso

reconsumeLater.delay

Atraso padrão para estratégia de falha de reconsumo, em segundos

long

falso

3

schema

O tipo de esquema Pulsar deste canal. Quando configurado, um esquema é criado com o SchemaType fornecido e usado para o canal. Quando ausente, o esquema é resolvido procurando um bean CDI digitado 'Esquema' qualificado com '@Identifier' e o nome do canal. Como fallback AUTO_CONSUME ou AUTO_PRODUCE são usados.

string

falso

serviceUrl

A URL do serviço Pulsar

string

falso

pulsar://localhost:6650

topic

O tópico Pulsar consumido/povoado. Se não estiver definido, o nome do canal será usado

string

falso

tracing-enabled

Se o rastreamento está habilitado (padrão) ou desabilitado

boolean

falso

true

You can also configure properties supported by the underlying Pulsar consumer.

These properties can also be globally configured using pulsar.consumer prefix:

pulsar.consumer.subscriptionInitialPosition=Earliest
Table 2. Atributos do consumidor Pulsar
Atributo Descrição Tipo Arquivo de configuração Padrão

topicNames

Nome do tópico

Conjunto

true

[]

topicsPattern

Padrão de tópico

Padrão

true

subscriptionName

Nome da assinatura

String

true

subscriptionType

Tipo de assinatura.
Quatro tipos de assinatura estão disponíveis:
* Exclusive
* Failover
* Shared
* Key_Shared

SubscriptionType

true

Exclusive

subscriptionProperties

Map

true

subscriptionMode

SubscriptionMode

true

Durable

messageListener

MessageListener

falso

consumerEventListener

ConsumerEventListener

falso

negativeAckRedeliveryBackoff

Interface para mensagem personalizada é política negativeAcked. Você pode especificar 'RedeliveryBackoff' para um consumidor.

RedeliveryBackoff

falso

ackTimeoutRedeliveryBackoff

Interface para mensagem personalizada é a política ackTimeout. Você pode especificar 'RedeliveryBackoff' para um consumidor.

RedeliveryBackoff

falso

receiverQueueSize

Tamanho da fila de destinatários de um consumidor.
Por exemplo, o número de mensagens acumuladas por um consumidor antes de um aplicativo chamar 'Receber'.
Um valor maior que o valor padrão aumenta a taxa de transferência do consumidor, embora às custas de mais utilização de memória.

int

true

1000

acknowledgementsGroupTimeMicros

Agrupe uma confirmação de consumidor por um tempo especificado.
Por padrão, um consumidor usa o tempo de agrupamento de 100ms para enviar confirmações a um corretor.
Definir um tempo de grupo de 0 envia confirmações imediatamente.
Um tempo de grupo mais longo é mais eficiente às custas de um ligeiro aumento nas reentregas de mensagens após uma falha.

long

true

100000

maxAcknowledgmentGroupSize

Agrupe uma confirmação de consumidor para o número de mensagens.

int

true

1000

negativeAckRedeliveryDelayMicros

Atraso para aguardar antes de entregar novamente as mensagens que não puderam ser processadas.
Quando um aplicativo usa 'Consumer#negativeAcknowledge(Message)', as mensagens com falha são entregues novamente após um tempo limite fixo.

long

true

60000000

maxTotalReceiverQueueSizeAcrossPartitions

O tamanho máximo total da fila do receptor nas partições.
Essa configuração reduz o tamanho da fila do receptor para partições individuais se o tamanho total da fila do receptor exceder esse valor.

int

true

50000

consumerName

Nome do consumidor

String

true

ackTimeoutMillis

Tempo limite de mensagens não acked

long

true

0

tickDurationMillis

Granularidade da reentrega do tempo limite de ack.
Usar um 'tickDurationMillis' mais alto reduz a sobrecarga de memória para rastrear mensagens ao definir o tempo limite de ack-para um valor maior (por exemplo, 1 hora).

long

true

1000

priorityLevel

Nível de prioridade para um consumidor ao qual um corretor dá mais prioridade ao enviar mensagens no tipo de assinatura compartilhada.
O corretor segue prioridades decrescentes. Por exemplo, 0=max-priority, 1, 2,…​
No tipo de assinatura compartilhada, o corretor primeiro envia mensagens para os consumidores de nível máximo de prioridade se eles tiverem permissões. Caso contrário, o corretor considera os consumidores de nível de prioridade seguinte.
Exemplo 1
Se uma assinatura tiver consumerA com 'priorityLevel' 0 e consumerB com 'priorityLevel' 1, o corretor somente despachará mensagens para consumerA até que ele esgote as permissões e, em seguida, começará a enviar mensagens para consumerB.
Exemplo 2
Prioridade do Consumidor, Nível, Licenças
C1, 0, 2
C2, 0, 1
C3, 0, 1
C4, 1, 2
C5, 1, 1

A ordem em que um corretor envia mensagens para os consumidores é: C1, C2, C3, C1, C4, C5, C4.

int

true

0

maxPendingChunkedMessage

O tamanho máximo de uma fila que contém mensagens em partes pendentes. Quando o limite é atingido, o consumidor descarta mensagens pendentes para otimizar a utilização da memória.

int

true

10

autoAckOldestChunkedMessageOnQueueFull

Se as mensagens em bloco pendentes devem ser confirmadas automaticamente quando o limite de 'maxPendingChunkedMessage' for atingido. Se definido como 'false', essas mensagens serão reentregues pelo corretor.

boolean

true

falso

expireTimeOfIncompleteChunkedMessageMillis

O intervalo de tempo para expirar blocos incompletos se um consumidor não receber todos os blocos no período de tempo especificado. O valor padrão é 1 minuto.

long

true

60000

cryptoKeyReader

CryptoKeyReader

falso

messageCrypto

MessageCrypto

falso

cryptoFailureAction

O consumidor deve agir quando receber uma mensagem que não pode ser descriptografada.
* FAIL: esta é a opção padrão para falhar mensagens até que a criptografia seja bem-sucedida.
* DESCARTAR:reconhecer silenciosamente e não entregar mensagem a um aplicativo.
* CONSUME: entregar mensagens criptografadas para aplicativos. É responsabilidade do aplicativo descriptografar a mensagem.

A descompactação da mensagem falha.

Se as mensagens contiverem mensagens em lote, um cliente não poderá recuperar mensagens individuais em lote.

A mensagem criptografada entregue contém 'EncryptionContext' que contém informações de criptografia e compactação usando qual aplicativo pode descriptografar a carga útil da mensagem consumida.

ConsumerCryptoFailureAction

true

FAIL

properties

Um nome ou propriedade de valor desse consumidor.

'propriedades' são metadados definidos pelo aplicativo anexados a um consumidor.

Ao obter estatísticas de tópico, associe esses metadados às estatísticas do consumidor para facilitar a identificação.

SortedMap

true

{}

readCompacted

Se habilitar 'readCompacted', um consumidor lerá mensagens de um tópico compactado em vez de ler uma lista de pendências de mensagens completa de um tópico.

Um consumidor só vê o valor mais recente para cada chave no tópico compactado, até chegar ao ponto na mensagem de tópico ao compactar a lista de pendências. Além desse ponto, envie mensagens normalmente.

Habilitar apenas 'readCompacted' em assinaturas de tópicos persistentes, que tenham um único consumidor ativo (como falha ou assinaturas exclusivas).

A tentativa de habilitá-lo em assinaturas de tópicos não persistentes ou em assinaturas compartilhadas leva a uma chamada de assinatura lançando um 'PulsarClientException'.

boolean

true

falso

subscriptionInitialPosition

Posição inicial na qual definir o cursor ao se inscrever em um tópico pela primeira vez.

SubscriptionInitialPosition

true

Latest

patternAutoDiscoveryPeriod

Período de descoberta automática de tópicos ao usar um padrão para o consumidor do tópico.

O valor padrão e mínimo é 1 minuto.

int

true

60

regexSubscriptionMode

Ao assinar um tópico usando uma expressão regular, você pode escolher um determinado tipo de tópicos.

* PersistentOnly: inscreva-se apenas em tópicos persistentes.
* NonPersistentOnly: inscreva-se apenas em tópicos não persistentes.
* AllTopics: inscreva-se em tópicos persistentes e não persistentes.

RegexSubscriptionMode

true

PersistentOnly

deadLetterPolicy

Política de dead letter para os consumidores.

Por padrão, algumas mensagens provavelmente são reentregues muitas vezes, até mesmo na medida em que nunca param.

Usando o mecanismo de dead letter, as mensagens têm a contagem máxima de reentrega. Ao exceder o número máximo de reentregas, as mensagens são enviadas para o Tópico de Dead Letter e reconhecidas automaticamente.

Você pode ativar o mecanismo de letra morta definindo 'deadLetterPolicy'.

Ao especificar a política de dead letter enquanto não especifica 'ackTimeoutMillis', você pode definir o tempo limite de ack para 30000 milissegundos.

DeadLetterPolicy

true

retryEnable

boolean

true

falso

batchReceivePolicy

BatchReceivePolicy

falso

autoUpdatePartitions

If autoUpdatePartitions is enabled, a consumer subscribes to partition increasement automatically.

Note: this is only for partitioned consumers.

boolean

true

true

autoUpdatePartitionsIntervalSeconds

long

true

60

replicateSubscriptionState

Se 'replicateSubscriptionState' estiver habilitado, um estado de assinatura será replicado para clusters replicados geograficamente.

boolean

true

falso

resetIncludeHead

boolean

true

falso

keySharedPolicy

KeySharedPolicy

falso

batchIndexAckEnabled

boolean

true

falso

ackReceiptEnabled

boolean

true

falso

poolMessages

boolean

true

falso

payloadProcessor

MessagePayloadProcessor

falso

startPaused

boolean

true

falso

autoScaledReceiverQueueSizeEnabled

boolean

true

falso

topicConfigurations

List

true

[]

11.2. Outgoing channel configuration (publishing to Pulsar)

Table 3. Atributos de saída do conector 'smallrye-pulsar'
Atributo (alias) Descrição Tipo Obrigatório Padrão

client-configuration

Identificador de um bean CDI que fornece a configuração padrão do cliente Pulsar para esse canal. A configuração do canal ainda pode substituir qualquer atributo. O bean deve ter um tipo de Map<String, Object> e deve usar o qualificador @io.smallrye.common.annotation.Identifier para definir o identificador.

string

falso

health-enabled

Se o relatório de integridade está habilitado (padrão) ou desabilitado

boolean

falso

true

maxPendingMessages

O tamanho máximo de uma fila que contém mensagens pendentes, ou seja, mensagens aguardando para receber uma confirmação de um broker

int

falso

1000

producer-configuration

Identificador de um bean CDI que fornece a configuração padrão do produtor Pulsar para esse canal. A configuração do canal ainda pode substituir qualquer atributo. O bean deve ter um tipo de Map<String, Object> e deve usar o qualificador @io.smallrye.common.annotation.Identifier para definir o identificador.

string

falso

schema

O tipo de esquema Pulsar deste canal. Quando configurado, um esquema é criado com o SchemaType fornecido e usado para o canal. Quando ausente, o esquema é resolvido procurando um bean CDI digitado 'Esquema' qualificado com '@Identifier' e o nome do canal. Como fallback AUTO_CONSUME ou AUTO_PRODUCE são usados.

string

falso

serviceUrl

A URL do serviço Pulsar

string

falso

pulsar://localhost:6650

topic

O tópico Pulsar consumido/populado. Se não estiver definido, o nome do canal será usado

string

falso

tracing-enabled

Se o rastreamento está habilitado (padrão) ou desabilitado

boolean

falso

true

waitForWriteCompletion

Se o cliente espera que o broker reconheça o registro escrito antes de reconhecer a mensagem

boolean

falso

true

You can also configure properties supported by the underlying Pulsar producer.

These properties can also be globally configured using pulsar.producer prefix:

pulsar.producer.batchingEnabled=false
Table 4. Atributos do produtor Pulsar
Atributo Descrição Tipo Arquivo de configuração Padrão

topicName

Nome do tópico

String

true

producerName

Nome do produtor

String

true

sendTimeoutMs

Tempo limite de envio de mensagem em ms.
Se uma mensagem não for reconhecida por um servidor antes que o 'sendTimeout' expire, ocorrerá um erro.

long

true

30000

blockIfQueueFull

Se estiver definido como 'true', quando a fila de mensagens de saída estiver cheia, os métodos 'Send' e 'SendAsync' do produtor bloqueiam, em vez de falhar e lançar erros.
Se ele estiver definido como 'false', quando a fila de mensagens de saída estiver cheia, os métodos 'Send' e 'SendAsync' do produtor falham e as exceções 'ProducerQueueIsFullError' ocorrem.

O parâmetro 'MaxPendingMessages' determina o tamanho da fila de mensagens de saída.

boolean

true

falso

maxPendingMessages

O tamanho máximo de uma fila que contém mensagens pendentes.

Por exemplo, uma mensagem aguardando para receber uma confirmação de um broker.

Por padrão, quando a fila está cheia, todas as chamadas para os métodos 'Send' e 'SendAsync' falham **a menos que você defina 'BlockIfQueueFull' como 'true'.

int

true

0

maxPendingMessagesAcrossPartitions

O número máximo de mensagens pendentes entre partições.

Use a configuração para diminuir o máximo de mensagens pendentes para cada partição ('#setMaxPendingMessages(int)') se o número total exceder o valor configurado.

int

true

0

messageRoutingMode

Lógica de roteamento de mensagens para produtores em partitioned topics.
Aplique a lógica somente ao definir nenhuma tecla nas mensagens.
As opções disponíveis são as seguintes:
* 'pulsar. RoundRobinDistribution': round robin
* 'pulsar. UseSinglePartition': publicar todas as mensagens em uma única partição
* 'pulsar. CustomPartition': um esquema de particionamento personalizado

MessageRoutingMode

true

hashingScheme

Função de hash que determina a partição onde você publica uma mensagem específica (somente tópicos particionados).
As opções disponíveis são as seguintes:
* 'pulsar. JavastringHash': o equivalente a 'string.hashCode()' em Java
* 'pulsar. Murmur3_32Hash': aplica a função de hash Murmur3
* 'pulsar. BoostHash': aplica a função de hash da biblioteca Boost do C++

HashingScheme

true

JavaStringHash

cryptoFailureAction

O produtor deve tomar medidas quando a criptografia falhar.
* FAIL: se a criptografia falhar, as mensagens não criptografadas não serão enviadas.
* SEND: se a criptografia falhar, mensagens não criptografadas serão enviadas.

ProducerCryptoFailureAction

true

FAIL

customMessageRouter

MessageRouter

falso

batchingMaxPublishDelayMicros

Período de tempo de envio de mensagens em lote.

long

true

1000

batchingPartitionSwitchFrequencyByPublishDelay

int

true

10

batchingMaxMessages

O número máximo de mensagens permitidas em um lote.

int

true

1000

batchingMaxBytes

int

true

131072

batchingEnabled

Habilite o envio em lote de mensagens.

boolean

true

true

batcherBuilder

BatcherBuilder

falso

chunkingEnabled

Habilite o fragmento de mensagens.

boolean

true

falso

chunkMaxMessageSize

int

true

-1

cryptoKeyReader

CryptoKeyReader

falso

messageCrypto

MessageCrypto

falso

encryptionKeys

Conjunto

true

[]

compressionType

Tipo de compactação de dados de mensagem usado por um produtor.
Opções disponíveis:
* LZ4
* ZLIB
* ZSTD
* SNAPPY

CompressionType

true

NONE

initialSequenceId

Long

true

autoUpdatePartitions

boolean

true

true

autoUpdatePartitionsIntervalSeconds

long

true

60

multiSchema

boolean

true

true

accessMode

ProducerAccessMode

true

Shared

lazyStartPartitionedProducers

boolean

true

falso

properties

SortedMap

true

{}

initialSubscriptionName

Use essa configuração para criar automaticamente uma assinatura inicial ao criar um tópico. Se esse campo não estiver definido, a assinatura inicial não será criada.

String

true

11.3. Pulsar Client Configuration

Following is the configuration reference for the underlying PulsarClient. These options can be configured using the channel attribute:

mp.messaging.incoming.your-channel-name.numIoThreads=4

Or configured globally using pulsar.client prefix:

pulsar.client.serviceUrl=pulsar://pulsar:6650
Table 5. Atributos do cliente Pulsar
Atributo Descrição Tipo Arquivo de configuração Padrão

serviceUrl

URL HTTP do cluster pulsar para se conectar a um broker.

String

true

serviceUrlProvider

A classe de implementação de ServiceUrlProvider usada para gerar ServiceUrl.

ServiceUrlProvider

falso

authentication

Configurações de autenticação do cliente.

Autenticação

falso

authPluginClassName

Nome da classe do plugin de autenticação do cliente.

String

true

authParams

Parâmetro de autenticação do cliente.

String

true

authParamMap

Mapa de autenticação do cliente.

Map

true

operationTimeoutMs

Tempo limite da operação do cliente (em milissegundos).

long

true

30000

lookupTimeoutMs

Tempo limite de pesquisa do cliente (em milissegundos).

long

true

-1

statsIntervalSeconds

Intervalo para imprimir estatísticas do cliente (em segundos).

long

true

60

numIoThreads

Número de threads de E/S.

int

true

10

numListenerThreads

Número de threads de consumer listeners.

int

true

10

connectionsPerBroker

Número de conexões estabelecidas entre o cliente e cada Broker. Um valor 0 significa desabilitar o pool de conexões.

int

true

1

connectionMaxIdleSeconds

Solte a conexão se ela não for usada por mais de [connectionMaxIdleSeconds] segundos. Se [connectionMaxIdleSeconds] < 0, desabilitado o recurso que libera automaticamente as conexões ociosas

int

true

180

useTcpNoDelay

Se deve usar a opção TCP NoDelay.

boolean

true

true

useTls

Se deve usar TLS.

boolean

true

falso

tlsKeyFilePath

Caminho para o arquivo de chave TLS.

String

true

tlsCertificateFilePath

Caminho para o arquivo de certificado TLS.

String

true

tlsTrustCertsFilePath

Caminho para o arquivo de certificado TLS confiável.

String

true

tlsAllowInsecureConnection

Se o cliente aceita certificados TLS não confiáveis do broker.

boolean

true

falso

tlsHostnameVerificationEnable

Se o nome do host é validado quando o cliente cria uma conexão TLS com brokers.

boolean

true

falso

concurrentLookupRequest

O número de solicitações de pesquisa simultâneas que podem ser enviadas em cada conexão do broker. Definir um máximo evita sobrecarregar um broker.

int

true

5000

maxLookupRequest

Número máximo de solicitações de pesquisa permitidas em cada conexão do broker para evitar a sobrecarga de um broker.

int

true

50000

maxLookupRedirects

Tempos máximos de solicitações de pesquisa redirecionadas.

int

true

20

maxNumberOfRejectedRequestPerConnection

Número máximo de solicitações rejeitadas de um broker em um determinado período de tempo (60 segundos) após a conexão atual ser fechada e o cliente criar uma nova conexão para se conectar a um broker diferente.

int

true

50

keepAliveIntervalSeconds

Segundos de intervalo de manutenção ativa para cada conexão do broker do cliente.

int

true

30

connectionTimeoutMs

Duração da espera para que uma conexão com um broker seja estabelecida. Se a duração passar sem uma resposta de um broker, a tentativa de conexão será descartada.

int

true

10000

requestTimeoutMs

Duração máxima para concluir uma solicitação.

int

true

60000

readTimeoutMs

Tempo máximo de leitura de uma solicitação.

int

true

60000

autoCertRefreshSeconds

Segundos de atualização automática do certificado.

int

true

300

initialBackoffIntervalNanos

Intervalo de recuo inicial (em nanossegundos).

long

true

100000000

maxBackoffIntervalNanos

Intervalo máximo de recuo (em nanossegundos).

long

true

60000000000

enableBusyWait

Se deseja habilitar BusyWait para EpollEventLoopGroup.

boolean

true

falso

listenerName

Nome do ouvinte para pesquisa. Os clientes podem usar listenerName para escolher um dos ouvintes como a URL do serviço para criar uma conexão com o broker, desde que a rede esteja acessível." advertisedListeners" deve ser habilitado no lado do corretor.

String

true

useKeyStoreTls

Defina o TLS usando o modo KeyStore.

boolean

true

falso

sslProvider

O provedor TLS usado por um cliente interno para autenticar com outros brokers Pulsar.

String

true

tlsKeyStoreType

Configuração do tipo TLS KeyStore.

String

true

JKS

tlsKeyStorePath

Caminho do TLS KeyStore.

String

true

tlsKeyStorePassword

Senha do TLS KeyStore.

String

true

tlsTrustStoreType

Configuração do tipo TLS TrustStore. Você precisa definir essa configuração quando a autenticação do cliente é necessária.

String

true

JKS

tlsTrustStorePath

Caminho do TLS TrustStore.

String

true

tlsTrustStorePassword

Senha do TLS TrustStore.

String

true

tlsCiphers

Conjunto de cifras TLS.

Conjunto

true

[]

tlsProtocols

Protocolos de TLS.

Conjunto

true

[]

memoryLimitBytes

Limite de uso de memória do cliente (em byte). O padrão de 64 milhões pode garantir uma alta taxa de transferência do produtor.

long

true

67108864

proxyServiceUrl

URL do serviço de proxy. proxyServiceUrl e proxyProtocol devem ser mutuamente inclusivos.

String

true

proxyProtocol

Protocolo do serviço de proxy. proxyServiceUrl e proxyProtocol devem ser mutuamente inclusivos.

ProxyProtocol

true

enableTransaction

Se deve habilitar a transação.

boolean

true

falso

clock

Relógio

falso

dnsLookupBindAddress

O endereço de ligação de pesquisa dns do cliente Pulsar, o comportamento padrão é vincular em 0.0.0.0

String

true

dnsLookupBindPort

A porta de ligação de pesquisa dns do cliente Pulsar, entra em vigor quando dnsLookupBindAddress é configurado, o valor padrão é 0.

int

true

0

socks5ProxyAddress

Endereço do proxy SOCKS5.

InetSocketAddress

true

socks5ProxyUsername

Nome de usuário do proxy SOCKS5.

String

true

socks5ProxyPassword

Senha do proxy SOCKS5.

String

true

description

A descrição extra da versão do cliente. O comprimento não pode exceder 64.

String

true

Configuration properties not configurable in configuration files (non-serializable) is noted in the column Config file.

12. Indo mais longe

This guide has shown how you can interact with Pulsar using Quarkus. It utilizes SmallRye Reactive Messaging to build data streaming applications.

Se quiser ir mais longe, consulte a documentação da Mensageria Reativa do SmallRye, a implementação utilizada no Quarkus.

Conteúdo Relacionado