Apache Pulsar Reference Guide

This reference guide demonstrates how your Quarkus application can utilize Quarkus Messaging to interact with Apache Pulsar.

Introduction

Apache Pulsar is an open-source, distributed messaging and streaming platform built for the cloud. It provides a multi-tenant, high-performance solution for server-to-server messaging with tiered storage capabilities.

Pulsar implements the publish-subscribe pattern:

  • Producers publish messages to topics.

  • Consumers create subscriptions to those topics to receive and process incoming messages, and send acknowledgments to the broker when processing is finished.

  • When a subscription is created, Pulsar retains all messages, even if the consumer is disconnected. The retained messages are discarded only when a consumer acknowledges that all these messages are processed successfully.

A Pulsar cluster consists of:

  • One or more brokers, which are stateless components.

  • A metadata store for maintaining topic metadata, schema, coordination and cluster configuration.

  • A set of bookies used for persistent storage of messages.

Quarkus Extension for Apache Pulsar

Quarkus provides support for Apache Pulsar through the SmallRye Reactive Messaging framework. Based on the Eclipse MicroProfile Reactive Messaging specification 3.0, it proposes a flexible programming model bridging CDI and event-driven programming.

This guide provides an in-depth look at Apache Pulsar and the SmallRye Reactive Messaging framework. For a quick start, take a look at Getting Started to Quarkus Messaging with Apache Pulsar.

You can add the messaging-pulsar extension to your project by running one of the following commands in your project base directory:

CLI
quarkus extension add messaging-pulsar
Maven
./mvnw quarkus:add-extension -Dextensions='messaging-pulsar'
Gradle
./gradlew addExtension --extensions='messaging-pulsar'

This will add the following to your build file:

pom.xml
<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-messaging-pulsar</artifactId>
</dependency>
build.gradle
implementation("io.quarkus:quarkus-messaging-pulsar")

The extension includes pulsar-clients-original version 3.0.0 as a transitive dependency and is compatible with Pulsar brokers version 2.10.x.

Configuring SmallRye Pulsar Connector

Because the SmallRye Reactive Messaging framework supports different messaging backends like Apache Kafka, Apache Pulsar, AMQP, Apache Camel, JMS, MQTT, etc., it employs a generic vocabulary:

  • Applications send and receive messages. A message wraps a payload and can be extended with some metadata. This should not be confused with a Pulsar Message, which carries a value, a key, and other attributes. With the Pulsar connector, a Reactive Messaging message corresponds to a Pulsar message.

  • Messages transit on channels. Application components connect to channels to publish and consume messages. The Pulsar connector maps channels to Pulsar topics.

  • Channels are connected to message backends using connectors. Connectors are configured to map incoming messages to a specific channel (consumed by the application) and collect outgoing messages sent to a specific channel. Each connector is dedicated to a specific messaging technology. For example, the connector dealing with Pulsar is named smallrye-pulsar.

A minimal configuration for the Pulsar connector with an incoming channel looks like the following:

%prod.pulsar.client.serviceUrl=pulsar://pulsar:6650 (1)
mp.messaging.incoming.prices.connector=smallrye-pulsar (2)
  1. Configure the Pulsar broker service URL for the production profile. You can configure it globally or per channel using the mp.messaging.incoming.$channel.serviceUrl property. In dev mode and when running tests, Dev Services for Pulsar automatically starts a Pulsar broker.

  2. Configure the connector to manage the prices channel. By default, the topic name is the same as the channel name. You can configure the topic attribute to override it.

The %prod prefix indicates that the property is only used when the application runs in prod mode (so not in dev or test). Refer to the Profile documentation for further details.
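As an illustration, the configuration below overrides the default topic of the prices channel with the topic attribute. The topic name market-prices is a hypothetical example:

```properties
mp.messaging.incoming.prices.connector=smallrye-pulsar
# Hypothetical topic name: consume from "market-prices" instead of the default "prices"
mp.messaging.incoming.prices.topic=market-prices
```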

Connector auto-attachment

If you have a single connector on your classpath, you can omit the connector attribute configuration. Quarkus automatically associates orphan channels to the (unique) connector found on the classpath. Orphan channels are outgoing channels without a downstream consumer or incoming channels without an upstream producer.

This auto-attachment can be disabled using:

quarkus.messaging.auto-connector-attachment=false

For more configuration options, see Configuring Pulsar clients.

Receiving messages from Pulsar

The Pulsar Connector connects to a Pulsar broker using a Pulsar client, creates consumers to receive messages from Pulsar brokers, and maps each Pulsar Message to a Reactive Messaging Message.

Example

Let’s imagine you have a Pulsar broker running, and accessible using the pulsar:6650 address. Configure your application to receive Pulsar messages on the prices channel as follows:

mp.messaging.incoming.prices.serviceUrl=pulsar://pulsar:6650 (1)
mp.messaging.incoming.prices.subscriptionInitialPosition=Earliest (2)
  1. Configure the Pulsar broker service url.

  2. Make sure consumer subscription starts receiving messages from the Earliest position.

You don’t need to set the Pulsar topic, nor the consumer name. By default, the connector uses the channel name (prices). You can configure the topic and consumerName attributes to override them.

In Pulsar, consumers need to provide a subscriptionName for topic subscriptions. If it is not provided, the connector generates a unique subscription name.

Then, your application can receive the double payload directly:

import org.eclipse.microprofile.reactive.messaging.Incoming;

import jakarta.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class PriceConsumer {

    @Incoming("prices")
    public void consume(double price) {
        // process your price.
    }

}

Or, you can retrieve the Reactive Messaging type Message<Double>:

@Incoming("prices")
public CompletionStage<Void> consume(Message<Double> msg) {
    // access record metadata
    var metadata = msg.getMetadata(PulsarIncomingMessageMetadata.class).orElseThrow();
    // process the message payload.
    double price = msg.getPayload();
    // Acknowledge the incoming message (acknowledge the Pulsar message back to the broker)
    return msg.ack();
}

The Reactive Messaging Message type lets the consuming method access the incoming message metadata and handle the acknowledgment manually.

If you want to access the Pulsar message objects directly, use:

@Incoming("prices")
public void consume(org.apache.pulsar.client.api.Message<Double> msg) {
    String key = msg.getKey();
    double value = msg.getValue(); // the payload type is Double, not String
    String topic = msg.getTopicName();
    // ...
}

org.apache.pulsar.client.api.Message is provided by the underlying Pulsar client and can be used directly in the consumer method.

Alternatively, your application can inject a Multi in your bean, identified by the channel name, and subscribe to its events, as in the following example:

import io.smallrye.mutiny.Multi;
import org.eclipse.microprofile.reactive.messaging.Channel;

import jakarta.inject.Inject;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;
import org.jboss.resteasy.reactive.RestStreamElementType;

@Path("/prices")
public class PriceResource {

    @Inject
    @Channel("prices")
    Multi<Double> prices;

    @GET
    @Path("/prices")
    @RestStreamElementType(MediaType.TEXT_PLAIN)
    public Multi<Double> stream() {
        return prices;
    }
}

When consuming messages with @Channel, the application code is responsible for the subscription. In the example above, the Quarkus REST (formerly RESTEasy Reactive) endpoint handles that for you.

The following types can be injected as channels:

@Inject @Channel("prices") Multi<Double> streamOfPayloads;

@Inject @Channel("prices") Multi<Message<Double>> streamOfMessages;

@Inject @Channel("prices") Publisher<Double> publisherOfPayloads;

@Inject @Channel("prices") Publisher<Message<Double>> publisherOfMessages;

As with the previous Message example, if your injected channel receives payloads (Multi<T>), it acknowledges messages automatically and supports multiple subscribers. If your injected channel receives Message (Multi<Message<T>>), you are responsible for the acknowledgment and broadcasting.

Blocking processing

Reactive Messaging invokes your method on an I/O thread. See the Quarkus Reactive Architecture documentation for further details on this topic. However, you often need to combine Reactive Messaging with blocking processing such as database interactions. For this, use the @Blocking annotation to indicate that the processing is blocking and should not run on the caller thread.

For example, the following code illustrates how you can store incoming payloads to a database using Hibernate with Panache:

import io.smallrye.reactive.messaging.annotations.Blocking;
import org.eclipse.microprofile.reactive.messaging.Incoming;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.transaction.Transactional;

@ApplicationScoped
public class PriceStorage {

    @Incoming("prices")
    @Transactional
    public void store(int priceInUsd) {
        Price price = new Price();
        price.value = priceInUsd;
        price.persist();
    }

}

There are 2 @Blocking annotations:

  1. io.smallrye.reactive.messaging.annotations.Blocking

  2. io.smallrye.common.annotation.Blocking

They have the same effect, so you can use either. The first one provides more fine-grained tuning, such as the worker pool to use and whether it preserves the order. The second one, which is also used with other reactive features of Quarkus, uses the default worker pool and preserves the order.

Detailed information on the usage of the @Blocking annotation can be found in SmallRye Reactive Messaging – Handling blocking execution.

@RunOnVirtualThread

For running the blocking processing on Java virtual threads, see the Quarkus Virtual Thread support with Reactive Messaging documentation.

@Transactional

If your method is annotated with @Transactional, it will be considered blocking automatically, even if the method is not annotated with @Blocking.

Pulsar Subscription Types

Pulsar subscriptionType consumer configuration can be used flexibly to achieve different messaging scenarios, such as publish-subscribe or queuing.

  • Exclusive subscription type allows specifying a unique subscription name for "fan-out pub-sub messaging". This is the default subscription type.

  • Shared, Key_Shared or Failover subscription types allow multiple consumers to share the same subscription name, to achieve "message queuing" among consumers.

If a subscription name is not provided, Quarkus generates a unique ID.
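Here is a sketch of a queuing setup. It assumes that several instances of the application run with the same configuration, and the subscription name price-workers is hypothetical. Because every instance joins the same Shared subscription, each message is dispatched to one of them:

```properties
# Every instance uses the same (hypothetical) subscription name
mp.messaging.incoming.prices.subscriptionName=price-workers
# Shared: messages are distributed across all consumers attached to the subscription
mp.messaging.incoming.prices.subscriptionType=Shared
```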

Deserialization and Pulsar Schema

The Pulsar Connector lets you configure the schema of the underlying Pulsar consumer. See Pulsar Schema Configuration & Auto Schema Discovery for more information.

Acknowledgement Strategies

When a message produced from a Pulsar Message is acknowledged, the connector sends an acknowledgement request to the Pulsar broker. All Reactive Messaging messages need to be acknowledged, which is handled automatically in most cases. Acknowledgement requests can be sent to the Pulsar broker using the following two strategies:

  • Individual acknowledgement is the default strategy: an acknowledgement request is sent to the broker for each message.

  • Cumulative acknowledgement, configured using ack-strategy=cumulative, the consumer only acknowledges the last message it received. All messages in the stream up to (and including) the provided message are not redelivered to that consumer.
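The following self-contained model in plain Java shows the difference between the two strategies. It is a simplification: it assumes a single partition in which increasing sequence numbers stand in for Pulsar MessageIds, and it does not use the Pulsar API:

```java
import java.util.List;
import java.util.stream.Collectors;

/**
 * Simplified model: sequence numbers stand in for MessageIds on one partition.
 * This illustrates the acknowledgement semantics only; it is not Pulsar code.
 */
public class CumulativeAckSketch {

    /** Messages still eligible for redelivery after a cumulative ack of ackedUpTo. */
    static List<Long> unackedAfterCumulative(List<Long> received, long ackedUpTo) {
        return received.stream()
                .filter(id -> id > ackedUpTo) // everything up to and including ackedUpTo is acknowledged
                .collect(Collectors.toList());
    }

    /** Messages still eligible for redelivery after individually acking only the given ids. */
    static List<Long> unackedAfterIndividual(List<Long> received, List<Long> acked) {
        return received.stream()
                .filter(id -> !acked.contains(id)) // only the listed messages are acknowledged
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Long> received = List.of(1L, 2L, 3L, 4L, 5L);
        System.out.println(unackedAfterCumulative(received, 3L));
        System.out.println(unackedAfterIndividual(received, List.of(3L)));
    }
}
```

Running main prints [4, 5] for the cumulative case and [1, 2, 4, 5] for the individual case. A cumulative acknowledgement of message 3 also covers messages 1 and 2.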

By default, the Pulsar consumer does not wait for the acknowledgement confirmation from the broker to validate an acknowledgement. You can enable this using ackReceiptEnabled=true.
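Both options are set per channel. A minimal sketch for the prices channel:

```properties
# Acknowledge cumulatively instead of individually
mp.messaging.incoming.prices.ack-strategy=cumulative
# Wait for the broker to confirm each acknowledgement
mp.messaging.incoming.prices.ackReceiptEnabled=true
```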

Failure Handling Strategies

If a message produced from a Pulsar message is nacked, a failure strategy is applied. The Quarkus Pulsar extension supports five strategies:

  • nack (default) sends negative acknowledgment to the broker, triggering the broker to redeliver this message to the consumer. The negative acknowledgment can be further configured using negativeAckRedeliveryDelayMicros and negativeAck.redeliveryBackoff properties.

  • fail fails the application; no more messages are processed.

  • ignore logs the failure, but the acknowledgement strategy is applied and processing continues.

  • continue logs the failure, but processing continues without applying acknowledgement or negative acknowledgement. This strategy can be used with the Acknowledgement timeout configuration.

  • reconsume-later sends the message to the retry letter topic using the reconsumeLater API to be reconsumed with a delay. The delay can be configured using the reconsumeLater.delay property and defaults to 3 seconds. Custom delay or properties per message can be configured by adding an instance of io.smallrye.reactive.messaging.pulsar.PulsarReconsumeLaterMetadata to the failure metadata.
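For example, the following sketch keeps the default nack strategy but delays redelivery with the negativeAckRedeliveryDelayMicros property mentioned above. The 10-second value is an arbitrary illustration:

```properties
# nack is the default; it is set explicitly here for clarity
mp.messaging.incoming.prices.failure-strategy=nack
# Arbitrary example: redeliver negatively acknowledged messages after 10 seconds (in microseconds)
mp.messaging.incoming.prices.negativeAckRedeliveryDelayMicros=10000000
```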

Acknowledgement timeout

Similar to negative acknowledgement, with the acknowledgement timeout mechanism the Pulsar client tracks unacknowledged messages for the given ackTimeout period. It then sends a redeliver-unacknowledged-messages request to the broker, so the broker resends the unacknowledged messages to the consumer.

To configure the timeout and redelivery backoff mechanism, set the ackTimeoutMillis and ackTimeout.redeliveryBackoff properties. The ackTimeout.redeliveryBackoff value accepts comma-separated values for the minimum delay in milliseconds, the maximum delay in milliseconds, and the multiplier, respectively:

mp.messaging.incoming.out.failure-strategy=continue
mp.messaging.incoming.out.ackTimeoutMillis=10000
mp.messaging.incoming.out.ackTimeout.redeliveryBackoff=1000,60000,2
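The three values of redeliveryBackoff describe an exponential backoff. The plain-Java sketch below assumes that the delay for the n-th redelivery is minDelay * multiplier^n, capped at the maximum delay. Treat this formula as an approximation for intuition, not as the exact Pulsar client implementation:

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Illustrative model of a multiplier-based redelivery backoff. It ASSUMES
 * delay(n) = min(maxDelayMs, minDelayMs * multiplier^n) for redelivery n
 * (starting at 0); it is not the Pulsar client's implementation.
 */
public class RedeliveryBackoffSketch {

    record Backoff(long minDelayMs, long maxDelayMs, double multiplier) {

        /** Parses "min,max,multiplier", the value format of ackTimeout.redeliveryBackoff. */
        static Backoff parse(String spec) {
            String[] parts = spec.split(",");
            if (parts.length != 3) {
                throw new IllegalArgumentException("expected min,max,multiplier but got: " + spec);
            }
            return new Backoff(
                    Long.parseLong(parts[0].trim()),
                    Long.parseLong(parts[1].trim()),
                    Double.parseDouble(parts[2].trim()));
        }

        /** Delay in milliseconds before redelivery attempt n, capped at maxDelayMs. */
        long delayMs(int n) {
            double raw = minDelayMs * Math.pow(multiplier, n);
            return (long) Math.min(raw, (double) maxDelayMs);
        }
    }

    public static void main(String[] args) {
        Backoff backoff = Backoff.parse("1000,60000,2");
        List<Long> delays = new ArrayList<>();
        for (int n = 0; n < 8; n++) {
            delays.add(backoff.delayMs(n));
        }
        System.out.println(delays);
    }
}
```

With the 1000,60000,2 setting shown above, main prints [1000, 2000, 4000, 8000, 16000, 32000, 60000, 60000]. Under this model, the delay doubles on each attempt until it reaches the 60-second cap.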

Reconsume later and retry letter topic

With the reconsume-later failure strategy, messages that are not consumed successfully are sent to a retry letter topic and redelivered to the consumer after the configured delay, while message consumption continues. The retry letter topic must be enabled on the consumer with the retryEnable property:

mp.messaging.incoming.data.failure-strategy=reconsume-later
mp.messaging.incoming.data.reconsumeLater.delay=5000
mp.messaging.incoming.data.retryEnable=true
mp.messaging.incoming.data.negativeAck.redeliveryBackoff=1000,60000,2

Dead-letter topic

With a dead letter policy, messages that still fail after the maximum number of redeliveries are pushed to a dead letter topic, and message consumption continues. Note that the dead letter topic can be used with different message redelivery methods, such as acknowledgment timeout, negative acknowledgment, or retry letter topic.

mp.messaging.incoming.data.failure-strategy=nack
mp.messaging.incoming.data.deadLetterPolicy.maxRedeliverCount=2
mp.messaging.incoming.data.deadLetterPolicy.deadLetterTopic=my-dead-letter-topic
mp.messaging.incoming.data.deadLetterPolicy.initialSubscriptionName=my-dlq-subscription
mp.messaging.incoming.data.subscriptionType=Shared

Negative acknowledgment or acknowledgment timeout methods for redelivery will redeliver the whole batch of messages containing at least one unprocessed message. See Producer Batching for more information.

Receiving Pulsar Messages in Batches

By default, incoming methods receive each Pulsar message individually. You can enable batch mode using the batchReceive=true property, or by setting a batchReceivePolicy in the consumer configuration.

@Incoming("prices")
public CompletionStage<Void> consumeMessage(Message<org.apache.pulsar.client.api.Messages<Double>> messages) {
    for (org.apache.pulsar.client.api.Message<Double> msg : messages.getPayload()) {
        String key = msg.getKey();
        String topic = msg.getTopicName();
        long timestamp = msg.getEventTime();
        //... process messages
    }
    // Acknowledging the batch message acknowledges every Pulsar message it contains.
    return messages.ack();
}

@Incoming("prices")
public void consumeRecords(org.apache.pulsar.client.api.Messages<Double> messages) {
    for (org.apache.pulsar.client.api.Message<Double> msg : messages) {
        //... process messages
    }
}

Or you can directly receive the list of payloads to the consume method:

@Incoming("prices")
public void consume(List<Double> prices) {
    for (double price : prices) {
        // process price
    }
}

Quarkus auto-detects batch types for incoming channels and sets the batch configuration automatically. You can configure batch mode explicitly with the mp.messaging.incoming.$channel.batchReceive property.
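For example, to enable batch mode explicitly on the prices channel:

```properties
# Deliver Pulsar messages to the prices channel in batches
mp.messaging.incoming.prices.batchReceive=true
```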

Sending messages to Pulsar

The Pulsar Connector can write Reactive Messaging messages as Pulsar messages.

Example

Let’s imagine you have a Pulsar broker running, and accessible using the pulsar:6650 address. Configure your application to write the messages from the prices channel to Pulsar as follows:

mp.messaging.outgoing.prices.serviceUrl=pulsar://pulsar:6650 (1)
  1. Configure the Pulsar broker service url.

You don’t need to set the Pulsar topic, nor the producer name. By default, the connector uses the channel name (prices). You can configure the topic and producerName attributes to override them.
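For example, here is a sketch that overrides both defaults with hypothetical names. Pulsar brokers allow only one producer with a given name to publish on a topic, so a fixed producerName suits a single application instance:

```properties
mp.messaging.outgoing.prices.serviceUrl=pulsar://pulsar:6650
# Hypothetical names replacing the channel-name defaults
mp.messaging.outgoing.prices.topic=market-prices
mp.messaging.outgoing.prices.producerName=price-producer
```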

Then, your application must send Message<Double> to the prices channel. It can use double payloads as in the following snippet:

import io.smallrye.mutiny.Multi;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import jakarta.enterprise.context.ApplicationScoped;
import java.time.Duration;
import java.util.Random;

@ApplicationScoped
public class PulsarPriceProducer {

    private final Random random = new Random();

    @Outgoing("prices")
    public Multi<Double> generate() {
        // Build an infinite stream of random prices
        // It emits a price every second
        return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
            .map(x -> random.nextDouble());
    }

}

Note that the generate method returns a Multi<Double>, which implements the Flow.Publisher interface. This publisher will be used by the framework to generate messages and send them to the configured Pulsar topic.

Instead of returning a payload, you can return an io.smallrye.reactive.messaging.pulsar.OutgoingMessage to send Pulsar messages:

@Outgoing("out")
public Multi<OutgoingMessage<Double>> generate() {
    return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
        .map(x -> OutgoingMessage.of("my-key", random.nextDouble()));
}

Payloads can be wrapped inside org.eclipse.microprofile.reactive.messaging.Message to have more control over the written records:

@Outgoing("generated-price")
public Multi<Message<Double>> generate() {
    return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
            .map(x -> Message.of(random.nextDouble())
                    .addMetadata(PulsarOutgoingMessageMetadata.builder()
                            .withKey("my-key")
                            .withProperties(Map.of("property-key", "value"))
                            .build()));
}

When sending Messages, you can add an instance of io.smallrye.reactive.messaging.pulsar.PulsarOutgoingMessageMetadata to influence how the message is going to be written to Pulsar.

In addition to method signatures returning a Flow.Publisher, an outgoing method can also return a single message. In this case, the producer uses the method as a generator to create an infinite stream.

@Outgoing("prices-out") T generate(); // T excluding void

@Outgoing("prices-out") Message<T> generate();

@Outgoing("prices-out") Uni<T> generate();

@Outgoing("prices-out") Uni<Message<T>> generate();

@Outgoing("prices-out") CompletionStage<T> generate();

@Outgoing("prices-out") CompletionStage<Message<T>> generate();

Serialization and Pulsar Schema

The Pulsar Connector lets you configure the schema of the underlying Pulsar producer. See Pulsar Schema Configuration & Auto Schema Discovery for more information.

Sending key/value pairs

To send key/value pairs to Pulsar, you can configure the Pulsar producer schema with a KeyValue schema.

package pulsar.outbound;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Produces;

import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.schema.KeyValue;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import io.smallrye.common.annotation.Identifier;

@ApplicationScoped
public class PulsarKeyValueExample {

    @Identifier("out")
    @Produces
    Schema<KeyValue<String, Long>> schema = Schema.KeyValue(Schema.STRING, Schema.INT64);

    @Incoming("in")
    @Outgoing("out")
    public KeyValue<String, Long> process(long in) {
        return new KeyValue<>("my-key", in);
    }

}

If you need more control over the written records, use PulsarOutgoingMessageMetadata.

Acknowledgement

Upon receiving a message from a producer, a Pulsar broker assigns a MessageId to the message and sends it back to the producer, confirming that the message is published.

By default, the connector waits for Pulsar to acknowledge the record before continuing the processing (acknowledging the received Message). You can disable this by setting the waitForWriteCompletion attribute to false.

If a record cannot be written, the message is nacked.

The Pulsar client automatically retries sending messages in case of failure, until the send timeout is reached. The send timeout is configurable with sendTimeoutMs attribute and by default is 30 seconds.
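Both settings are attributes of the outgoing channel. Here is a sketch that uses an arbitrary 10-second send timeout:

```properties
# Do not wait for the broker to confirm writes before acknowledging the incoming message
mp.messaging.outgoing.prices.waitForWriteCompletion=false
# Arbitrary example: stop retrying a send after 10 seconds instead of the default 30
mp.messaging.outgoing.prices.sendTimeoutMs=10000
```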

Back-pressure and inflight records

The Pulsar outbound connector handles back-pressure, monitoring the number of pending messages waiting to be written to the Pulsar broker. The number of pending messages is configured using the maxPendingMessages attribute and defaults to 1000.

The connector only sends that number of messages concurrently. No other messages are sent until at least one pending message is acknowledged by the broker. Each time a pending message is acknowledged, the connector writes a new message to Pulsar.

You can also remove the limit on pending messages by setting maxPendingMessages to 0. Note that Pulsar also lets you limit the total number of pending messages across all partitions using maxPendingMessagesAcrossPartitions.
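You can picture this back-pressure behaviour as a counting semaphore with maxPendingMessages permits. The plain-Java model below is only a conceptual sketch. A thread pool stands in for the broker, and the model does not cover the unlimited setting (0):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Conceptual model of the maxPendingMessages limit: at most maxPending sends
 * await broker acknowledgement at any time. The broker is simulated by a
 * thread pool; this is not the connector's actual implementation.
 */
public class PendingMessagesSketch {

    /** Sends totalMessages simulated messages and returns the peak number in flight. */
    static int run(int maxPending, int totalMessages) {
        if (maxPending <= 0) {
            throw new IllegalArgumentException("the unlimited setting (0) is not modelled here");
        }
        Semaphore permits = new Semaphore(maxPending);
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        ExecutorService broker = Executors.newFixedThreadPool(8);
        CompletableFuture<?>[] acks = new CompletableFuture<?>[totalMessages];
        for (int i = 0; i < totalMessages; i++) {
            // Blocks while maxPending messages are still waiting for acknowledgement
            permits.acquireUninterruptibly();
            peak.accumulateAndGet(inFlight.incrementAndGet(), Math::max);
            acks[i] = CompletableFuture
                    .runAsync(() -> sleepQuietly(2), broker) // simulated write latency
                    .whenComplete((ignored, error) -> {
                        // The "broker" acknowledged the message: free one slot
                        inFlight.decrementAndGet();
                        permits.release();
                    });
        }
        CompletableFuture.allOf(acks).join();
        broker.shutdown();
        return peak.get();
    }

    private static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("peak in-flight messages: " + run(3, 50));
    }
}
```

In this model, run(3, 50) never reports more than three messages in flight, however many messages are queued behind them.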

Producer Batching

By default, the Pulsar producer batches individual messages together to be published to the broker. You can configure batching parameters using batchingMaxPublishDelayMicros, batchingPartitionSwitchFrequencyByPublishDelay, batchingMaxMessages, batchingMaxBytes configuration properties, or disable it completely with batchingEnabled=false.
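Here is a sketch of batching tuning on the outgoing prices channel. The values are arbitrary illustrations, not recommendations:

```properties
# Arbitrary tuning values, for illustration only
mp.messaging.outgoing.prices.batchingMaxPublishDelayMicros=5000
mp.messaging.outgoing.prices.batchingMaxMessages=500
# Alternatively, disable producer batching entirely:
# mp.messaging.outgoing.prices.batchingEnabled=false
```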

When using Key_Shared consumer subscriptions, the batcherBuilder can be set to BatcherBuilder.KEY_BASED.

Pulsar Transactions and Exactly-Once Processing

Pulsar transactions enable event streaming applications to consume, process, and produce messages in one atomic operation.

Transactions allow one or multiple producers to send a batch of messages to multiple topics, where either all messages in the batch eventually become visible to consumers, or none ever do.

To use transactions, transaction support must be enabled in the broker configuration with transactionCoordinatorEnabled=true and systemTopicEnabled=true.

On the client side, the transaction support also needs to be enabled on PulsarClient configuration:

mp.messaging.outgoing.tx-producer.enableTransaction=true

The Pulsar connector provides the PulsarTransactions custom emitter for writing records inside a transaction.

It can be injected with @Channel, like a regular emitter:

package pulsar.outbound;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;

import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;

import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.pulsar.OutgoingMessage;
import io.smallrye.reactive.messaging.pulsar.transactions.PulsarTransactions;

@ApplicationScoped
public class PulsarTransactionalProducer {

    @Inject
    @Channel("tx-out-example")
    PulsarTransactions<OutgoingMessage<Integer>> txProducer;

    @Inject
    @Channel("other-producer")
    PulsarTransactions<String> producer;

    @Incoming("in")
    public Uni<Void> emitInTransaction(Message<Integer> in) {
        return txProducer.withTransaction(emitter -> {
            emitter.send(OutgoingMessage.of("a", 1));
            emitter.send(OutgoingMessage.of("b", 2));
            emitter.send(OutgoingMessage.of("c", 3));
            producer.send(emitter, "4");
            producer.send(emitter, "5");
            producer.send(emitter, "6");
            return Uni.createFrom().completionStage(in::ack);
        });
    }

}

The function given to the withTransaction method receives a TransactionalEmitter for producing records, and returns a Uni that provides the result of the transaction. If the processing completes successfully, the producer is flushed and the transaction is committed. If the processing throws an exception, returns a failing Uni, or marks the TransactionalEmitter for abort, the transaction is aborted.

Multiple transactional producers can participate in a single transaction. This ensures that all messages are sent using the started transaction and that all participating producers are flushed before the transaction is committed.

If this method is called on a Vert.x context, the processing function is also called on that context. Otherwise, it is called on the sending thread of the producer.

Exactly-Once Processing

The Pulsar Transactions API also allows acknowledging consumed messages inside a transaction, together with produced messages. This in turn enables coupling a consumer with a transactional producer in a consume-transform-produce pattern, also known as exactly-once processing. The application consumes messages, processes them, publishes the results to a topic, and acknowledges the consumed messages, all in one transaction.

The PulsarTransactions emitter also provides a way to apply exactly-once processing to an incoming Pulsar message inside a transaction.

The following example includes a batch of Pulsar messages inside a transaction.

mp.messaging.outgoing.tx-out-example.enableTransaction=true
# ...
mp.messaging.incoming.in-channel.enableTransaction=true
mp.messaging.incoming.in-channel.batchReceive=true

package pulsar.outbound;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;

import org.apache.pulsar.client.api.Messages;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;

import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.pulsar.PulsarMessage;
import io.smallrye.reactive.messaging.pulsar.transactions.PulsarTransactions;

@ApplicationScoped
public class PulsarExactlyOnceProcessor {

    @Inject
    @Channel("tx-out-example")
    PulsarTransactions<Integer> txProducer;

    @Incoming("in-channel")
    public Uni<Void> emitInTransaction(Message<Messages<Integer>> batch) {
        // Produce the transformed records and ack the consumed batch in the same transaction
        return txProducer.withTransactionAndAck(batch, emitter -> {
            for (org.apache.pulsar.client.api.Message<Integer> record : batch.getPayload()) {
                emitter.send(PulsarMessage.of(record.getValue() + 1, record.getKey()));
            }
            return Uni.createFrom().voidItem();
        });
    }

}

If the processing completes successfully, the message is acknowledged inside the transaction and the transaction is committed.

When using exactly-once processing, messages can only be acked individually rather than cumulatively.

If the processing needs to abort, the message is nack’ed. One of the failure strategies can be employed to retry the processing or simply fail-stop. Note that the Uni returned from withTransaction yields a failure if the transaction fails and is aborted.
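
Here is a config sketch for the fail-stop case. It selects the fail strategy through the failure-strategy attribute on the in-channel consumer used above. As the Health Checks section notes, fail is also the strategy that reports the failure to the liveness check:

```properties
# Stop consuming from in-channel when a transaction is aborted and the message is nack'ed
mp.messaging.incoming.in-channel.failure-strategy=fail
```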

The application can choose to handle the error case. However, for message consumption to continue, the Uni returned from the @Incoming method must not result in a failure. The PulsarTransactions#withTransactionAndAck method acks and nacks the message but does not stop the reactive stream. Ignoring the failure simply resets the consumer to the last committed offsets and resumes the processing from there.

In order to avoid duplicates in case of failure, it is recommended to enable message deduplication and batch index level acknowledgment on the broker side:

quarkus.pulsar.devservices.broker-config.brokerDeduplicationEnabled=true
quarkus.pulsar.devservices.broker-config.brokerDeduplicationEntriesInterval=1000
quarkus.pulsar.devservices.broker-config.brokerDeduplicationSnapshotIntervalSeconds=3000
quarkus.pulsar.devservices.broker-config.acknowledgmentAtBatchIndexLevelEnabled=true

mp.messaging.incoming.data.batchIndexAckEnabled=true
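
The quarkus.pulsar.devservices.broker-config.* entries above only reach the broker that Dev Services starts. For a self-managed broker, the same keys would presumably go into that broker's broker.conf. A sketch, reusing the values shown above:

```properties
# broker.conf of a self-managed broker (not started by Dev Services)
brokerDeduplicationEnabled=true
brokerDeduplicationEntriesInterval=1000
brokerDeduplicationSnapshotIntervalSeconds=3000
acknowledgmentAtBatchIndexLevelEnabled=true
```

The consumer-side batchIndexAckEnabled=true channel attribute is still needed in application.properties.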

Pulsar Schema Configuration & Auto Schema Discovery

Pulsar messages are stored with payloads as unstructured byte arrays. A Pulsar schema defines how to serialize structured data to raw message bytes. The schema is applied in producers and consumers to write and read with an enforced data structure: it serializes data into raw bytes before they are published to a topic, and deserializes the raw bytes before they are delivered to consumers.

Pulsar uses a schema registry as a central repository to store the registered schema information, which enables producers and consumers to coordinate the schema of a topic’s messages through brokers. By default, Apache BookKeeper is used to store schemas.

Pulsar API provides built-in schema information for a number of primitive types and complex types such as Key/Value, Avro and Protobuf.

The Pulsar Connector allows specifying the schema as a primitive type using the schema property:

mp.messaging.incoming.prices.connector=smallrye-pulsar
mp.messaging.incoming.prices.schema=INT32

mp.messaging.outgoing.prices-out.connector=smallrye-pulsar
mp.messaging.outgoing.prices-out.schema=DOUBLE

If the value of the schema property matches a Schema Type, a simple schema is created with that type and used for that channel.

The Pulsar Connector allows configuring complex schema types by providing Schema beans through CDI, identified with the @Identifier qualifier.

For example, the following bean provides a JSON schema and a Key/Value schema:

package pulsar.configuration;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Produces;

import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.KeyValueEncodingType;

import io.smallrye.common.annotation.Identifier;

@ApplicationScoped
public class PulsarSchemaProvider {

    @Produces
    @Identifier("user-schema")
    Schema<User> userSchema = Schema.JSON(User.class);

    @Produces
    @Identifier("a-channel")
    Schema<KeyValue<Integer, User>> keyValueSchema() {
        return Schema.KeyValue(Schema.INT32, Schema.JSON(User.class), KeyValueEncodingType.SEPARATED);
    }

    public static class User {
        String name;
        int age;

    }
}

To configure the incoming channel users with the defined schema, set the schema property to the schema identifier, user-schema:

mp.messaging.incoming.users.connector=smallrye-pulsar
mp.messaging.incoming.users.schema=user-schema

If no schema property is found, the connector looks for Schema beans identified with the channel name. For example, the outgoing channel a-channel will use the key/value schema.

mp.messaging.outgoing.a-channel.connector=smallrye-pulsar

If no schema information is provided, incoming channels use Schema.AUTO_CONSUME(), whereas outgoing channels use Schema.AUTO_PRODUCE_BYTES() schemas.

Auto Schema Discovery

When using Quarkus Messaging Pulsar (io.quarkus:quarkus-messaging-pulsar), Quarkus can often automatically detect the correct Pulsar Schema to configure. This autodetection is based on the declarations of @Incoming and @Outgoing methods, as well as injected @Channel fields.

For example, if you declare

@Outgoing("generated-price")
public Multi<Integer> generate() {
    ...
}

and your configuration indicates that the generated-price channel uses the smallrye-pulsar connector, then Quarkus will automatically set the schema attribute of the generated-price channel to Pulsar Schema INT32.

Similarly, if you declare

@Incoming("my-pulsar-consumer")
public void consume(org.apache.pulsar.client.api.Message<byte[]> record) {
    ...
}

and your configuration indicates that the my-pulsar-consumer channel uses the smallrye-pulsar connector, then Quarkus will automatically set the schema attribute to Pulsar BYTES Schema.

Finally, if you declare

@Inject
@Channel("price-create")
Emitter<Double> priceEmitter;

and your configuration indicates that the price-create channel uses the smallrye-pulsar connector, then Quarkus will automatically set the schema to Pulsar DOUBLE Schema.
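
For these three declarations, autodetection should have roughly the same effect as setting the schema attribute by hand. The following is an illustrative sketch only; with autodetection enabled, these lines are not needed:

```properties
# Outgoing Multi<Integer> -> INT32
mp.messaging.outgoing.generated-price.schema=INT32
# Incoming org.apache.pulsar.client.api.Message<byte[]> -> BYTES
mp.messaging.incoming.my-pulsar-consumer.schema=BYTES
# Emitter<Double> -> DOUBLE
mp.messaging.outgoing.price-create.schema=DOUBLE
```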

The full set of types supported by the Pulsar Schema autodetection is:

  • short and java.lang.Short

  • int and java.lang.Integer

  • long and java.lang.Long

  • float and java.lang.Float

  • double and java.lang.Double

  • byte[]

  • java.time.Instant

  • java.sql.Timestamp

  • java.time.LocalDate

  • java.time.LocalTime

  • java.time.LocalDateTime

  • java.nio.ByteBuffer

  • classes generated from Avro schemas, as well as Avro GenericRecord, will be configured with AVRO schema type

  • classes generated from Protobuf schemas will be configured with PROTOBUF schema type

  • other classes will automatically be configured with JSON schema type

Note that JSON schema type enforces schema validation.

In addition to those Pulsar-provided schemas, Quarkus provides the following schema implementations, which do not enforce validation:

  • io.vertx.core.buffer.Buffer will be configured with io.quarkus.pulsar.schema.BufferSchema schema

  • io.vertx.core.json.JsonObject will be configured with io.quarkus.pulsar.schema.JsonObjectSchema schema

  • io.vertx.core.json.JsonArray will be configured with io.quarkus.pulsar.schema.JsonArraySchema schema

  • For schema-less JSON serialization, if the schema configuration is set to ObjectMapper<fully_qualified_name_of_the_bean>, a Schema is generated using the Jackson ObjectMapper, without enforcing Pulsar Schema validation. io.quarkus.pulsar.schema.ObjectMapperSchema can be used to explicitly configure a JSON schema without validation.
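
As an example, a channel could opt into schema-less Jackson serialization with a setting like the following. This is a sketch that reuses the users channel from earlier. The class org.acme.User is a hypothetical fully qualified payload class:

```properties
mp.messaging.incoming.users.connector=smallrye-pulsar
# Jackson ObjectMapper-based schema, no Pulsar schema validation
mp.messaging.incoming.users.schema=ObjectMapper<org.acme.User>
```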

If a schema is set by configuration, it won’t be replaced by the auto-detection.

In case you have any issues with serializer auto-detection, you can switch it off completely by setting quarkus.messaging.pulsar.serializer-autodetection.enabled=false. If you find you need to do this, please file a bug in the Quarkus issue tracker so we can fix whatever problem you have.

Dev Services for Pulsar

With the Quarkus Messaging Pulsar extension (quarkus-messaging-pulsar), Dev Services for Pulsar automatically starts a Pulsar broker in dev mode and when running tests, so you don’t have to start a broker manually. The application is configured automatically.

Enabling / Disabling Dev Services for Pulsar

Dev Services for Pulsar is automatically enabled unless:

  • quarkus.pulsar.devservices.enabled is set to false

  • the pulsar.client.serviceUrl is configured

  • all the Reactive Messaging Pulsar channels have the serviceUrl attribute set
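
For instance, either of the following settings would keep Dev Services from starting a broker. This is a sketch; the broker address is only an example:

```properties
# Option 1: switch Dev Services for Pulsar off explicitly
quarkus.pulsar.devservices.enabled=false

# Option 2: point the client at an existing broker, which also disables Dev Services
pulsar.client.serviceUrl=pulsar://localhost:6650
```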

Dev Services for Pulsar relies on Docker to start the broker. If your environment does not support Docker, you will need to start the broker manually, or connect to an already running broker. You can configure the broker address using pulsar.client..

Shared broker

Most of the time you need to share the broker between applications. Dev Services for Pulsar implements a service discovery mechanism for your multiple Quarkus applications running in dev mode to share a single broker.

Dev Services for Pulsar starts the container with the quarkus-dev-service-pulsar label which is used to identify the container.

If you need multiple (shared) brokers, you can configure the quarkus.pulsar.devservices.service-name attribute and indicate the broker name. It looks for a container with the same value, or starts a new one if none can be found. The default service name is pulsar.
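
As a sketch, two applications running in dev mode would share one broker container if both set the same service name. The name shared-pulsar is a hypothetical example:

```properties
# Set in each application that should reuse the same Dev Services broker
quarkus.pulsar.devservices.service-name=shared-pulsar
```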

Sharing is enabled by default in dev mode, but disabled in test mode. You can disable the sharing with quarkus.pulsar.devservices.shared=false.

Setting the port

By default, Dev Services for Pulsar picks a random port and configures the application. You can set the port by configuring the quarkus.pulsar.devservices.port property.

Note that the Pulsar advertised address is automatically configured with the chosen port.
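
For example, to pin the broker port instead of using a random one, you could set it as shown below. The value 6650, the usual Pulsar binary-protocol port, is just an example:

```properties
quarkus.pulsar.devservices.port=6650
```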

Configuring the image

Dev Services for Pulsar supports the official Apache Pulsar image.

A custom image name can be configured as follows:

quarkus.pulsar.devservices.image-name=datastax/lunastreaming-all:2.10_4.7

Configuring the Pulsar broker

You can configure the Dev Services for Pulsar with custom broker configuration.

The following example enables transaction support:

quarkus.pulsar.devservices.broker-config.transaction-coordinator-enabled=true
quarkus.pulsar.devservices.broker-config.system-topic-enabled=true

Configuring Pulsar clients

Pulsar clients, consumers and producers are highly customizable, which lets you fine-tune how a Pulsar client application behaves.

The Pulsar connector creates a Pulsar client and a consumer or producer per channel, each with sensible defaults to ease their configuration. Although the connector handles their creation, all available configuration options remain configurable through Pulsar channels.

The idiomatic way of creating a PulsarClient, PulsarConsumer or PulsarProducer is through the builder APIs. In essence, each time they are used, those APIs build a configuration object and pass it on to the implementation. These configuration objects are ClientConfigurationData, ConsumerConfigurationData and ProducerConfigurationData.

The Pulsar Connector allows receiving properties for those configuration objects directly. For example, the broker authentication information for PulsarClient is received using the authPluginClassName and authParams properties. To configure authentication for the incoming channel data:

mp.messaging.incoming.data.connector=smallrye-pulsar
mp.messaging.incoming.data.serviceUrl=pulsar://localhost:6650
mp.messaging.incoming.data.topic=topic
mp.messaging.incoming.data.subscriptionInitialPosition=Earliest
mp.messaging.incoming.data.schema=INT32
mp.messaging.incoming.data.authPluginClassName=org.apache.pulsar.client.impl.auth.AuthenticationBasic
mp.messaging.incoming.data.authParams={"userId":"superuser","password":"admin"}

Note that the Pulsar consumer property subscriptionInitialPosition is also configured, with the value Earliest, which corresponds to the enum value SubscriptionInitialPosition.Earliest.

This approach covers most of the configuration cases. However, non-serializable objects such as CryptoKeyReader, ServiceUrlProvider etc. cannot be configured this way. The Pulsar Connector allows taking into account instances of Pulsar configuration data objects – ClientConfigurationData, ConsumerConfigurationData, ProducerConfigurationData:

import jakarta.enterprise.inject.Produces;
import io.smallrye.common.annotation.Identifier;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;

class PulsarConfig {

    @Produces
    @Identifier("my-consumer-options")
    public ConsumerConfigurationData<String> getConsumerConfig() {
        ConsumerConfigurationData<String> data = new ConsumerConfigurationData<>();
        data.setAckReceiptEnabled(true);
        data.setCryptoKeyReader(DefaultCryptoKeyReader.builder()
                //...
                .build());
        return data;
    }
}

This instance is retrieved and used to configure the client used by the connector. You need to indicate the identifier of the configuration using the client-configuration, consumer-configuration or producer-configuration attributes:

mp.messaging.incoming.prices.consumer-configuration=my-consumer-options

If no [client|consumer|producer]-configuration is configured, the connector will look for instances identified with the channel name:

import jakarta.enterprise.inject.Produces;
import io.smallrye.common.annotation.Identifier;
import org.apache.pulsar.client.impl.AutoClusterFailover;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;

class PulsarConfig {

    @Produces
    @Identifier("prices")
    public ClientConfigurationData getClientConfig() {
        ClientConfigurationData data = new ClientConfigurationData();
        data.setEnableTransaction(true);
        data.setServiceUrlProvider(AutoClusterFailover.builder()
                // ...
                .build());
        return data;
    }
}

You can also provide a Map<String, Object> containing configuration values by key:

import jakarta.enterprise.inject.Produces;
import io.smallrye.common.annotation.Identifier;
import org.apache.pulsar.client.api.BatcherBuilder;
import org.apache.pulsar.client.impl.customroute.PartialRoundRobinMessageRouterImpl;
import java.util.Map;

class PulsarConfig {

    @Produces
    @Identifier("prices")
    public Map<String, Object> getProducerConfig() {
        return Map.of(
                "batcherBuilder", BatcherBuilder.KEY_BASED,
                "sendTimeoutMs", 3000,
                "customMessageRouter", new PartialRoundRobinMessageRouterImpl(4));
    }
}

Different configuration sources are loaded in the following order of precedence, from the least important to the highest:

  1. Map<String, Object> config map produced with the default config identifiers: default-pulsar-client, default-pulsar-consumer, default-pulsar-producer.

  2. Map<String, Object> config map produced with the identifier set in the channel configuration, or with the channel name.

  3. [Client|Producer|Consumer]ConfigurationData object produced with the identifier set in the channel configuration, or with the channel name.

  4. Channel configuration properties named after [Client|Producer|Consumer]ConfigurationData field names.
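
The following minimal Java sketch makes the ordering concrete. It models the four sources as plain maps and applies them from least to most important. It only illustrates the "more important source wins" rule and is not the connector's implementation. In particular, treating a ConfigurationData object as a map that holds only the fields you set is a simplifying assumption, and the property values are hypothetical:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified model of the precedence rule, NOT the connector's code:
// sources are applied from least to most important, so for a key defined in
// several sources the most important one wins, and keys a source does not
// define are inherited from the less important ones.
class PulsarConfigPrecedence {

    static Map<String, Object> merge(List<Map<String, Object>> sourcesLeastImportantFirst) {
        Map<String, Object> effective = new HashMap<>();
        for (Map<String, Object> source : sourcesLeastImportantFirst) {
            effective.putAll(source); // later (more important) values overwrite earlier ones
        }
        return effective;
    }

    public static void main(String[] args) {
        // Hypothetical values for an outgoing "prices" channel
        Map<String, Object> defaultMap = Map.of("sendTimeoutMs", 30000, "batchingEnabled", true); // 1. "default-pulsar-producer" map
        Map<String, Object> channelMap = Map.of("sendTimeoutMs", 3000);                           // 2. map identified with "prices"
        Map<String, Object> dataObject = Map.of("batchingEnabled", false);                        // 3. field set on a ProducerConfigurationData bean
        Map<String, Object> channelProps = Map.of("sendTimeoutMs", 5000);                         // 4. mp.messaging.outgoing.prices.sendTimeoutMs=5000

        Map<String, Object> effective = merge(List.of(defaultMap, channelMap, dataObject, channelProps));
        System.out.println(effective.get("sendTimeoutMs"));   // 5000
        System.out.println(effective.get("batchingEnabled")); // false
    }
}
```

In this model, a channel property always has the last word, while a default map only supplies values that no other source sets.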

See Configuration Reference for the exhaustive list of configuration options.

Configuring Pulsar Authentication

Pulsar provides a pluggable authentication framework, and Pulsar brokers/proxies use this mechanism to authenticate clients.

Clients can be configured in the application.properties file using the authPluginClassName and authParams attributes:

pulsar.client.serviceUrl=pulsar://pulsar:6650
pulsar.client.authPluginClassName=org.apache.pulsar.client.impl.auth.AuthenticationBasic
pulsar.client.authParams={"userId":"superuser","password":"admin"}

Or programmatically:

import java.util.Map;

import jakarta.enterprise.inject.Produces;
import io.smallrye.common.annotation.Identifier;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.auth.AuthenticationBasic;

class PulsarConfig {

    @Produces
    @Identifier("prices")
    public ClientConfigurationData config() {
        var data = new ClientConfigurationData();
        var auth = new AuthenticationBasic();
        auth.configure(Map.of("userId", "superuser", "password", "admin"));
        data.setAuthentication(auth);
        return data;
    }
}

Configuring access to Datastax Luna Streaming

Luna Streaming is a production-ready distribution of Apache Pulsar, with tools and support from DataStax. After creating your DataStax Luna Pulsar tenant, note the auto-generated token and configure token authentication:

pulsar.client.serviceUrl=pulsar+ssl://pulsar-aws-eucentral1.streaming.datastax.com:6651
pulsar.client.authPluginClassName=org.apache.pulsar.client.impl.auth.AuthenticationToken
pulsar.client.authParams=token:eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.eyJpYXQiOjE2ODY4MTc4MzQsImlzcyI6ImRhdGFzdGF4Iiwic3ViIjoiY2xpZW50OzA3NGZhOTI4LThiODktNDBhNC04MDEzLWNlNjVkN2JmZWIwZTtjSEpwWTJWejsyMDI5ODdlOGUyIiwidG9rZW5pZCI6IjIwMjk4N2U4ZTIifQ....

Make sure to create topics beforehand, or enable the Auto Topic Creation in the namespace configuration.

Note that the topic configuration needs to reference the full name of topics:

mp.messaging.incoming.prices.topic=persistent://my-tenant/default/prices

Configuring access to StreamNative Cloud

StreamNative Cloud is a fully managed Pulsar-as-a-Service available in different deployment options, whether it is fully-hosted, on a public cloud but managed by StreamNative or self-managed on Kubernetes.

The StreamNative Pulsar clusters use OAuth2 authentication, so you need to make sure that a service account exists with the required permissions to the Pulsar namespace/topic your application is using.

Next, you need to download the Key file (which serves as private key) of the service account and note the issuer URL (typically https://auth.streamnative.cloud/) and the audience (for example urn:sn:pulsar:o-rf3ol:redhat) for your cluster. The Pulsar Clients page in the Admin section in the StreamNative Cloud console helps you with this process.

To configure your application with Pulsar OAuth2 authentication:

pulsar.tenant=public
pulsar.namespace=default
pulsar.client.serviceUrl=pulsar+ssl://quarkus-71eaadbf-a6f3-4355-85d2-faf436b23d86.aws-euc1-prod-snci-pool-slug.streamnative.aws.snio.cloud:6651
pulsar.client.authPluginClassName=org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2
pulsar.client.authParams={"type":"client_credentials","privateKey":"data:application/json;base64,<base64-encoded value>","issuerUrl":"https://auth.streamnative.cloud/","audience":"urn:sn:pulsar:o-rfwel:redhat"}

Note that the pulsar.client.authParams configuration contains a JSON string with issuerUrl, audience and privateKey. The privateKey value uses the data:application/json;base64,<base64-encoded-key-file> format.
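
To build the privateKey value yourself, you can use the following standard-library Java sketch. It reads the downloaded key file and prefixes its Base64 encoding with data:application/json;base64,. The KeyFileDataUrl class and its command-line usage are hypothetical. The sketch also assumes that the standard (not URL-safe) Base64 alphabet is expected:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Base64;

// Hypothetical helper: encodes a service-account key file as a
// data:application/json;base64,... URL for the OAuth2 "privateKey" auth parameter.
class KeyFileDataUrl {

    static final String PREFIX = "data:application/json;base64,";

    // Standard (not URL-safe) Base64, with padding
    static String toDataUrl(byte[] keyFileContent) {
        return PREFIX + Base64.getEncoder().encodeToString(keyFileContent);
    }

    static String toDataUrl(Path keyFile) throws IOException {
        return toDataUrl(Files.readAllBytes(keyFile));
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 1) {
            System.err.println("usage: java KeyFileDataUrl <path-to-key-file.json>");
            return;
        }
        System.out.println(toDataUrl(Path.of(args[0])));
    }
}
```

The printed value goes into the privateKey field of pulsar.client.authParams, in place of the <base64-encoded value> placeholder shown above.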

Alternatively you can configure the authentication programmatically:

package org.acme.pulsar;

import java.net.MalformedURLException;
import java.net.URL;

import org.apache.pulsar.client.impl.auth.oauth2.AuthenticationFactoryOAuth2;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.eclipse.microprofile.config.inject.ConfigProperty;

import io.smallrye.common.annotation.Identifier;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Produces;

@ApplicationScoped
public class PulsarAuth {

    @ConfigProperty(name = "pulsar.issuerUrl")
    String issuerUrl;

    @ConfigProperty(name = "pulsar.credentials")
    String credentials;

    @ConfigProperty(name = "pulsar.audience")
    String audience;

    @Produces
    @Identifier("pulsar-auth")
    public ClientConfigurationData pulsarClientConfig() throws MalformedURLException {
        var data = new ClientConfigurationData();
        data.setAuthentication(AuthenticationFactoryOAuth2.clientCredentials(new URL(issuerUrl), PulsarAuth.class.getResource(credentials), audience));
        return data;
    }
}

This assumes that the key file is included in the application classpath as a resource. The configuration then looks like the following:

mp.messaging.incoming.prices.client-configuration=pulsar-auth

pulsar.tenant=public
pulsar.namespace=default
pulsar.client.serviceUrl=pulsar+ssl://quarkus-71eaadbf-a6f3-4355-85d2-faf436b23d86.aws-euc1-prod-snci-pool-slug.streamnative.aws.snio.cloud:6651
pulsar.issuerUrl=https://auth.streamnative.cloud/
pulsar.audience=urn:sn:pulsar:o-rfwel:redhat
pulsar.credentials=/o-rfwel-quarkus-app.json

Note that channels using the client configuration identified with pulsar-auth need to set the client-configuration attribute.

Health Checks

The Quarkus extension reports startup, readiness and liveness of each channel managed by the Pulsar connector. Health checks rely on the Pulsar client to verify that a connection is established with the broker.

Startup and Readiness probes for both inbound and outbound channels report OK when the connection with the broker is established.

The Liveness probe for both inbound and outbound channels reports OK when the connection with the broker is established and no failures have been caught.

Note that a message processing failure nacks the message, which is then handled by the failure-strategy. It is the responsibility of the failure-strategy to report the failure and influence the outcome of the liveness checks. The fail failure strategy reports the failure, so the liveness check also reports it.

Configuration Reference

The following is the list of configuration attributes for the Pulsar connector channels, consumers, producers and clients. See Pulsar Client Configuration for more information on how the Pulsar clients are configured.

Incoming channel configuration (receiving from Pulsar)

Incoming channel attributes are configured using:

mp.messaging.incoming.your-channel-name.attribute=value

You can also configure properties supported by the underlying Pulsar consumer.

These properties can also be globally configured using pulsar.consumer prefix:

pulsar.consumer.subscriptionInitialPosition=Earliest

Outgoing channel configuration (publishing to Pulsar)

Outgoing channel attributes are configured using:

mp.messaging.outgoing.your-channel-name.attribute=value

You can also configure properties supported by the underlying Pulsar producer.

These properties can also be globally configured using pulsar.producer prefix:

pulsar.producer.batchingEnabled=false

Pulsar Client Configuration

The underlying PulsarClient can be configured as well. These options can be set using channel attributes:

mp.messaging.incoming.your-channel-name.numIoThreads=4

Or configured globally using pulsar.client prefix:

pulsar.client.serviceUrl=pulsar://pulsar:6650

Configuration properties that cannot be set in configuration files (because they are not serializable) are noted in the Config file column.

Going further

This guide has shown how you can interact with Pulsar using Quarkus. It utilizes Quarkus Messaging to build data streaming applications.

If you want to go further, check the documentation of SmallRye Reactive Messaging, the implementation used in Quarkus.