Apache Pulsar Reference Guide

Introduction

Apache Pulsar是一个开源分布式消息和流平台,专为云而构建。它提供了一个针对具有分层存储功能的服务器消息的多租户高性能解决方案。

Pulsar 实现了发布订阅模式:

  • 生产者将消息发布到 topics

  • 消费者为这些主题创建 subscriptions,以接收和处理传入消息,并在处理完成后向代理发送 acknowledgments

  • 当创建订阅时,Pulsar 会保留所有消息,即使消费者断开连接。只有当消费者确认所有这些消息都已成功处理时,才会丢弃保留的消息。

Pulsar 集群包括:

  • 一个或多个 brokers,属于无状态组件。

  • 一个 metadata store,用于维护专题元数据、架构、协调和集群配置。

  • 一组 bookies,用于持久化存储消息。

Quarkus Extension for Apache Pulsar

Quarkus 通过 SmallRye Reactive Messaging 框架支持 Apache Pulsar。基于 Eclipse MicroProfile Reactive Messaging 规范 3.0,它提出了桥接 CDI 和事件驱动的灵活编程模型。

本指南深入介绍了 Apache Pulsar 和 SmallRye Reactive Messaging 框架。快速入门,请参阅 Getting Started to Quarkus Messaging with Apache Pulsar

你可以在项目基本目录中运行以下命令向你的项目添加 messaging-pulsar 扩展:

CLI
quarkus extension add {add-extension-extensions}
Maven
./mvnw quarkus:add-extension -Dextensions='{add-extension-extensions}'
Gradle
./gradlew addExtension --extensions='{add-extension-extensions}'

这会将以下内容添加到构建文件中:

pom.xml
<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-messaging-pulsar</artifactId>
</dependency>
build.gradle
implementation("io.quarkus:quarkus-messaging-pulsar")

该扩展包括 pulsar-clients-original 3.0.0 版本作为传递依赖项,并且兼容于 Pulsar 代理版本 2.10.x。

Configuring SmallRye Pulsar Connector

因为 SmallRye Reactive Messaging 框架支持不同的消息传递后端,例如 Apache Kafka、Apache Pulsar、AMQP、Apache Camel、JMS、MQTT 等,所以它采用通用词汇:

  • 应用程序发送并接收 messagesMessage 封装一个 payload,可以用一些 metadata 扩展。这应该与一个 Pulsar Message 区分,后者由 value 和 key 组成。使用 Pulsar 连接器,一个 Reactive Messaging message 对应于一个 Pulsar message

  • 消息在 channels 上传输。应用程序组件连接到通道以发布和使用消息。Pulsar 连接器将 channels 映射到 Pulsar topics

  • 通道使用 connectors 连接到消息后端。连接器配置为将传入消息映射到一个特定通道(应用程序使用),并收集发送到特定通道的传出消息。每个连接器都专用于一个特定消息传递技术。例如,处理 Pulsar 的连接器名为 smallrye-pulsar

具有传入通道的 Pulsar 连接器的最小配置如下:

%prod.pulsar.client.serviceUrl=pulsar:6650 1
mp.messaging.incoming.prices.connector=smallrye-pulsar 2
1 为生产配置文件配置 Pulsar 代理服务 URL。你可以使用 mp.messaging.incoming.$channel.serviceUrl 属性全局配置或按通道配置。在开发模式下和运行测试时,Dev Services for Pulsar 自动启动一个 Pulsar 代理。
2 配置连接器以管理价格通道。默认情况下,topic 名称与通道名称相同。

你可以配置主题属性来覆盖它。

%prod 前缀表示该属性仅在应用程序在生产模式下运行时使用(因此在开发或测试模式下不使用)。请参阅 Profile documentation 了解详细信息。

Connector auto-attachment

如果你的类路径上有一个连接器,则可以省略 connector 属性配置。Quarkus 自动将 orphan 通道关联到类路径上的(唯一)连接器。Orphan 通道是下游没有消费者的传出通道,或上游没有生产者的传入通道。 可以使用以下方法禁用此自动附加功能:

quarkus.messaging.auto-connector-attachment=false

有关更多配置选项,请参见 Configuring Pulsar clients

Receiving messages from Pulsar

Pulsar 连接器使用 Pulsar 客户端连接到 Pulsar 代理,并创建消费者以接收来自 Pulsar 代理的消息,并且将每个 Pulsar Message`映射到反应式消息传递 `Message

Example

假设您有一个正在运行且可以使用 `pulsar:6650`地址访问的 Pulsar 代理。按如下方式配置您的应用程序以接收 `prices`通道上的 Pulsar 消息:

mp.messaging.incoming.prices.serviceUrl=pulsar://pulsar:6650 (1)
mp.messaging.incoming.prices.subscriptionInitialPosition=Earliest (2)
  1. 配置 Pulsar 代理服务 URL。

  2. 确保消费者订阅开始接收来自 `Earliest`位置的消息。

您不需要设置 Pulsar 主题,也不需要设置消费者名称。默认情况下,连接器使用通道名称 (prices)。您可以配置 `topic`和 `consumerName`属性来覆盖它们。

在 Pulsar 中,消费者需要为主题订阅提供 subscriptionName。如果未提供,连接器会生成一个唯一的 subscription name

然后,您的应用程序可以直接接收 `double`有效负载:

import org.eclipse.microprofile.reactive.messaging.Incoming;

import jakarta.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class PriceConsumer {

    @Incoming("prices")
    public void consume(double price) {
        // process your price.
    }

}

或者,您可以检索反应式消息传递类型 Message<Double>

@Incoming("prices")
public CompletionStage<Void> consume(Message<Double> msg) {
    // access record metadata
    var metadata = msg.getMetadata(PulsarIncomingMessageMetadata.class).orElseThrow();
    // process the message payload.
    double price = msg.getPayload();
    // Acknowledge the incoming message (acknowledge the Pulsar message back to the broker)
    return msg.ack();
}

反应式消息传递 `Message`类型允许使用的方法访问传入的消息元数据,并手动处理确认。

如果您想直接访问 Pulsar 消息对象,请使用:

@Incoming("prices")
public void consume(org.apache.pulsar.client.api.Message<Double> msg) {
    String key = msg.getKey();
    String value = msg.getValue();
    String topic = msg.topicName();
    // ...
}

`org.apache.pulsar.client.api.Message`由底层 Pulsar 客户端提供,可以直接在消费者方法中使用。

或者,您的应用程序可以在您的 bean 中注入一个 Multi,使用通道名称进行标识,并订阅它的事件,如下例所示:

import io.smallrye.mutiny.Multi;
import org.eclipse.microprofile.reactive.messaging.Channel;

import jakarta.inject.Inject;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;
import org.jboss.resteasy.reactive.RestStreamElementType;

@Path("/prices")
public class PriceResource {

    @Inject
    @Channel("prices")
    Multi<Double> prices;

    @GET
    @Path("/prices")
    @RestStreamElementType(MediaType.TEXT_PLAIN)
    public Multi<Double> stream() {
        return prices;
    }
}

使用 @Channel 使用消息时,应用程序代码负责订阅。在上面的示例中,Quarkus REST 端点(以前的 RESTEasy Reactive)会为您处理此事。

可以将以下类型注入为通道:

@Inject @Channel("prices") Multi<Double> streamOfPayloads;

@Inject @Channel("prices") Multi<Message<Double>> streamOfMessages;

@Inject @Channel("prices") Publisher<Double> publisherOfPayloads;

@Inject @Channel("prices") Publisher<Message<Double>> publisherOfMessages;

与前面的 Message`示例一样,如果注入的通道接收有效负载 (`Multi<T>),则会自动确认消息,并支持多个订阅者。如果已注入的通道接收 Message (Multi<Message<T>>),您将负责确认和广播。

Blocking processing

响应式消息传递在 I/O 线程中调用你的方法。有关此主题的更多详细信息,请查看 Quarkus Reactive Architecture documentation。但是,你通常需要将响应式消息传递与数据库交互等阻塞式处理结合使用。为此,你需要使用 @Blocking 注解,指示处理为 blocking 且不应该在调用者线程上运行。

例如,以下代码说明了如何使用带有 Panache 的 Hibernate 将传入有效负载存储到数据库:

import io.smallrye.reactive.messaging.annotations.Blocking;
import org.eclipse.microprofile.reactive.messaging.Incoming;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.transaction.Transactional;

@ApplicationScoped
public class PriceStorage {

    @Incoming("prices")
    @Transactional
    public void store(int priceInUsd) {
        Price price = new Price();
        price.value = priceInUsd;
        price.persist();
    }

}

有 2 个 @Blocking 注释:

  1. io.smallrye.reactive.messaging.annotations.Blocking

  2. io.smallrye.common.annotation.Blocking

它们具有相同的效果。因此,可以同时使用。第一个提供更精细的调整,例如要使用的工作器池以及是否保留顺序。第二个也与 Quarkus 的其他响应式特性结合使用,使用默认工作器池并保留顺序。 可以在 SmallRye Reactive Messaging – Handling blocking execution 中找到关于 @Blocking 注释的用法详细信息。

@RunOnVirtualThread

有关在 Java {@s7} 上运行阻塞处理,请参阅 {@s8}。

@Transactional

如果用 {@s9} 对您的方法进行了注解,则它将自动被视为`{@s11}`,即使没有用 {@s10} 对方法进行注解。

Pulsar Subscription Types

可以灵活地使用 Pulsar *subscriptionType*消费者配置来实现不同的消息传递场景,例如发布-订阅或排队。

  • *Exclusive*订阅类型允许为“扇出式发布-订阅消息传递”指定 unique subscription name。这是默认订阅类型。

  • Shared、*Key_Shared*或 *Failover*订阅类型允许多个消费者共享 same subscription name,以便在消费者之间实现“消息排队”。

如果未提供订阅名称,Quarkus 将生成一个唯一 ID。

Deserialization and Pulsar Schema

Pulsar 连接器允许配置 Pulsar 消费者基础架构的模式配置。有关更多信息,请参阅 Pulsar Schema Configuration & Auto Schema Discovery

Acknowledgement Strategies

当由 Pulsar 消息产生的消息 acknowledged*时,连接器会向 Pulsar 代理发送 acknowledgement request。所有反应式消息传递消息都需要 *acknowledged,这在大多数情况下都是自动处理的。可以使用以下两种策略向 Pulsar 代理发送确认请求:

  • *Individual acknowledgement*是默认策略,即向代理发送每个消息的确认请求。

  • *Cumulative acknowledgement*使用 `ack-strategy=cumulative`配置,消费器仅确认收到的最后一条消息。流中直到(且包括)所提供的消息的所有消息都不会重新传递给该消费器。

默认情况下,Pulsar 消费器不会等待来自代理的确认确认来验证确认。你可以使用 `ackReceiptEnabled=true`启用此功能。

Failure Handling Strategies

如果由 Pulsar 消息生成的消息是 nacked,则会应用错误策略。Quarkus Pulsar 扩展支持 4 个策略:

  • nack *(default)*向代理发送 negative acknowledgment,触发代理将此消息重新传递给消费器。可以使用 `negativeAckRedeliveryDelayMicros`和 `negativeAck.redeliveryBackoff`属性进一步配置否定确认。

  • `fail`使应用程序失败,不再处理任何消息。

  • `ignore`记录错误,但将应用确认策略并且处理将继续。

  • `continue`记录错误,但处理继续进行,不应用确认或否定确认。此策略可与 Acknowledgement timeout配置结合使用。

  • `reconsume-later`使用 `reconsumeLater`API 将消息发送至 retry letter topic以便在延迟后重新使用。可以使用 `reconsumeLater.delay`属性配置延迟,默认值为 3 秒。可以通过将 `io.smallrye.reactive.messaging.pulsar.PulsarReconsumeLaterMetadata`实例添加到错误元数据来配置每个消息的自定义延迟或属性。

Acknowledgement timeout

与否定确认类似,对于 acknowledgement timeout机制,Pulsar 客户端跟踪未确认的消息,针对给定的 ackTimeout*时间段,并向代理发送 *redeliver unacknowledged messages request,因此代理将未确认的消息重新发送给消费器。

要配置超时和重新传递退避机制,可以设置 `ackTimeoutMillis`和 `ackTimeout.redeliveryBackoff`属性。`ackTimeout.redeliveryBackoff`值接受以毫秒为单位的最小延迟、以毫秒为单位的最大延迟和乘数的逗号分隔值:

mp.messaging.incoming.out.failure-strategy=continue
mp.messaging.incoming.out.ackTimeoutMillis=10000
mp.messaging.incoming.out.ackTimeout.redeliveryBackoff=1000,60000,2

Reconsume later and retry letter topic

retry letter topic将未成功使用的消息推送到拒信主题中并继续消息使用。请注意,拒信主题可以用于不同的消息重新传递方法,例如确认超时、否定确认或重试信主题。

mp.messaging.incoming.data.failure-strategy=reconsume-later
mp.messaging.incoming.data.reconsumeLater.delay=5000
mp.messaging.incoming.data.retryEnable=true
mp.messaging.incoming.data.negativeAck.redeliveryBackoff=1000,60000,2

Dead-letter topic

dead letter topic将未成功使用的消息推送到拒信主题中,并继续消息使用。请注意,拒信主题可以用于不同的消息重新传递方法,例如确认超时、否定确认或重试信主题。

mp.messaging.incoming.data.failure-strategy=nack
mp.messaging.incoming.data.deadLetterPolicy.maxRedeliverCount=2
mp.messaging.incoming.data.deadLetterPolicy.deadLetterTopic=my-dead-letter-topic
mp.messaging.incoming.data.deadLetterPolicy.initialSubscriptionName=my-dlq-subscription
mp.messaging.incoming.data.subscriptionType=Shared

用于重新传递的 *Negative acknowledgment*或 *acknowledgment timeout*方法将重新传递包含至少一条未处理消息的消息批。有关更多信息,请参见 Producer Batching

Receiving Pulsar Messages in Batches

默认情况下,传入的方法单独接收每个 Pulsar 消息。你可以使用 `batchReceive=true`属性,或在消费器配置中设置 `batchReceivePolicy`来启用批模式。

@Incoming("prices")
public CompletionStage<Void> consumeMessage(Message<org.apache.pulsar.client.api.Messages<Double>> messages) {
    for (org.apache.pulsar.client.api.Message<Double> msg : messages.getPayload()) {
        String key = msg.getKey();
        String topic = msg.getTopicName();
        long timestamp = msg.getEventTime();
        //... process messages
    }
    // ack will commit the latest offsets (per partition) of the batch.
    return messages.ack();
}

@Incoming("prices")
public void consumeRecords(org.apache.pulsar.client.api.Messages<Double> messages) {
    for (org.apache.pulsar.client.api.Message<Double> msg : messages) {
        //... process messages
    }
}

或者你可以直接接收有效负载列表到消费方法:

@Incoming("prices")
public void consume(List<Double> prices) {
    for (double price : prices) {
        // process price
    }
}

Quarkus 会自动侦测传入通道的批类型并自动设置批配置。你可以使用 `mp.messaging.incoming.$channel.batchReceive`属性明确配置批模式。

Sending messages to Pulsar

Pulsar 连接器可以将响应式消息传递 Message 写为 Pulsar 消息。

Example

假设你有一个正在运行且可以使用 `pulsar:6650`地址访问的 Pulsar 代理。按照以下方式配置你的应用程序,以将 `prices`通道中的消息写入 Pulsar 消息:

mp.messaging.outgoing.prices.serviceUrl=pulsar://pulsar:6650 (1)
  1. 配置 Pulsar 代理服务 URL。

无需设置 Pulsar 主题或生产者名称。默认情况下,连接器使用频道名称 (prices)。可以配置 topicproducerName 属性来覆盖它们。

然后,应用程序必须向 prices 频道发送 Message<Double>。它可以使用 double 有效负载,如下面的代码片段所示:

import io.smallrye.mutiny.Multi;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import jakarta.enterprise.context.ApplicationScoped;
import java.time.Duration;
import java.util.Random;

@ApplicationScoped
public class PulsarPriceProducer {

    private final Random random = new Random();

    @Outgoing("prices-out")
    public Multi<Double> generate() {
        // Build an infinite stream of random prices
        // It emits a price every second
        return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
            .map(x -> random.nextDouble());
    }

}

请注意,generate 方法返回 Multi<Double>,它实现了 Flow.Publisher 接口。此发布者将由框架用于生成消息并将其发送到已配置的 Pulsar 主题。

除了返回有效负载,还可以返回 io.smallrye.reactive.messaging.pulsar.OutgoingMessage 来发送 Pulsar 消息:

@Outgoing("out")
public Multi<OutgoingMessage<Double>> generate() {
    return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
        .map(x -> OutgoingMessage.of("my-key", random.nextDouble()));
}

有效负载可以封装在 `org.eclipse.microprofile.reactive.messaging.Message`中,以更好地控制写入记录:

@Outgoing("generated-price")
public Multi<Message<Double>> generate() {
    return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
            .map(x -> Message.of(random.nextDouble())
                    .addMetadata(PulsarOutgoingMessageMetadata.builder()
                            .withKey("my-key")
                            .withProperties(Map.of("property-key", "value"))
                            .build()));
}

发送 Messages 时,可以添加 io.smallrye.reactive.messaging.pulsar.PulsarOutgoingMessageMetadata 的实例来影响消息写入 Pulsar 的方式。

除了返回 Flow.Publisher 的方法签名之外,传出方法还可以返回单个消息。在这种情况下,生产者将使用此方法作为生成器来创建无限流。

@Outgoing("prices-out") T generate(); // T excluding void

@Outgoing("prices-out") Message<T> generate();

@Outgoing("prices-out") Uni<T> generate();

@Outgoing("prices-out") Uni<Message<T>> generate();

@Outgoing("prices-out") CompletionStage<T> generate();

@Outgoing("prices-out") CompletionStage<Message<T>> generate();

Serialization and Pulsar Schema

Pulsar 连接器允许为底层 Pulsar 生产者配置架构配置。有关更多信息,请参见 Pulsar Schema Configuration & Auto Schema Discovery

Sending key/value pairs

为了将 Kev/Value 对发送到 Pulsar,可以使用 KeyValue 架构配置 Pulsar 生产者架构。

package pulsar.outbound;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Produces;

import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.schema.KeyValue;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import io.smallrye.common.annotation.Identifier;

@ApplicationScoped
public class PulsarKeyValueExample {

    @Identifier("out")
    @Produces
    Schema<KeyValue<String, Long>> schema = Schema.KeyValue(Schema.STRING, Schema.INT64);

    @Incoming("in")
    @Outgoing("out")
    public KeyValue<String, Long> process(long in) {
        return new KeyValue<>("my-key", in);
    }

}

如果需要更多控制来书写记录,请使用 PulsarOutgoingMessageMetadata

Acknowledgement

在从生产者接收到消息后,Pulsar 代理将 MessageId 分配给消息并将其发送回生产者,确认消息已发布。

默认情况下,连接器会等待 Pulsar 确认记录才能继续处理(确认接收的 Message)。可以通过将 waitForWriteCompletion 属性设置为 false 来禁用此功能。

如果无法写入记录,则会 nacked 消息。

Pulsar 客户端会自动在发生故障时重试发送消息,直到达到 send timeout。可以将 send timeoutsendTimeoutMs 属性进行配置,默认值为 30 秒。

Back-pressure and inflight records

Pulsar 出站连接器处理背压,监控等待写入 Pulsar 代理的待处理消息数。待处理消息数使用 maxPendingMessages 属性进行配置,默认为 1000。

连接器只会同时发送该数量的消息。除非至少有一个待处理的消息得到代理确认,否则不会发送其他消息。然后,当代理的一个待处理消息得到确认时,连接器将向 Pulsar 写入一条新消息。

还可以通过将 maxPendingMessages 设置为 0 来删除待处理消息的限制。请注意,Pulsar 还允许使用 maxPendingMessagesAcrossPartitions 配置每个分区中待处理的消息数。

Producer Batching

默认情况下,Pulsar 生产者会将各个消息批处理后一起发布到代理。可以使用 batchingMaxPublishDelayMicrosbatchingPartitionSwitchFrequencyByPublishDelaybatchingMaxMessagesbatchingMaxBytes 配置属性配置批处理参数,或使用 batchingEnabled=false 完全禁用它。

使用 Key_Shared 消费者订阅时,可以将 batcherBuilder 配置为 BatcherBuilder.KEY_BASED

Pulsar Transactions and Exactly-Once Processing

Pulsar transactions 允许事件流应用程序在一次原子操作中消费、处理和生成消息。

事务允许一个或多个生产者向多个主题发送一批消息,其中一批消息中的所有消息最终对任何消费者可见,或对消费者不可见。

为了使用,事务支持需要在代理配置中使用 transactionCoordinatorEnabled=truesystemTopicEnabled=true 代理配置激活。

在客户端上,也需要在 PulsarClient 配置中启用事务支持:

mp.messaging.outgoing.tx-producer.enableTransaction=true

Pulsar 连接器提供 PulsarTransactions 自定义发射器,用于在事务中写入记录。

它可以作为常规发射器 @Channel 使用:

package pulsar.outbound;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;

import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;

import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.pulsar.OutgoingMessage;
import io.smallrye.reactive.messaging.pulsar.transactions.PulsarTransactions;

@ApplicationScoped
public class PulsarTransactionalProducer {

    @Inject
    @Channel("tx-out-example")
    PulsarTransactions<OutgoingMessage<Integer>> txProducer;

    @Inject
    @Channel("other-producer")
    PulsarTransactions<String> producer;

    @Incoming("in")
    public Uni<Void> emitInTransaction(Message<Integer> in) {
        return txProducer.withTransaction(emitter -> {
            emitter.send(OutgoingMessage.of("a", 1));
            emitter.send(OutgoingMessage.of("b", 2));
            emitter.send(OutgoingMessage.of("c", 3));
            producer.send(emitter, "4");
            producer.send(emitter, "5");
            producer.send(emitter, "6");
            return Uni.createFrom().completionStage(in::ack);
        });
    }

}

给定 withTransaction 方法的函数接收一个 TransactionalEmitter 来生成记录,并返回一个 Uni 来提供事务的结果。如果处理成功完成,则刷新生产者并提交事务。如果处理抛出异常、返回一个失败的 Uni 或标记 TransactionalEmitter 要中止,则中止事务。

多个事务性生产者可以参与单一事务。这确保在事务启动后发送所有消息,并在提交事务之前,刷新所有参与的生产者。

如果这种方法是在 Vert.x 上下文中调用的,则处理函数也在该上下文中调用。否则,它在生产者的发送线程中调用。

Exactly-Once Processing

Pulsar 事务 API 还可以与生成的消息一起管理事务中的消费者偏移。这反过来又支持以消费-转换-生成模式(也称为恰好一次处理)将消费者与事务性生产者耦合在一起。这意味着应用程序消费消息,处理它们,将结果发布到主题中,并在事务中提交已消费消息的偏移。

PulsarTransactions 发射器还提供了一种方法,可以在事务中对传入的 Pulsar 消息应用恰好一次处理。

以下示例包括事务中的批处理 Pulsar 消息。

mp.messaging.outgoing.tx-out-example.enableTransaction=true
# ...
mp.messaging.incoming.in-channel.enableTransaction=true
mp.messaging.incoming.in-channel.batchReceive=true
package pulsar.outbound;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;

import org.apache.pulsar.client.api.Messages;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;

import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.pulsar.transactions.PulsarTransactions;

    @ApplicationScoped
    public class PulsarExactlyOnceProcessor {

        @Inject
        @Channel("tx-out-example")
        PulsarTransactions<Integer> txProducer;

        @Incoming("in-channel")
        public Uni<Void> emitInTransaction(Message<Messages<Integer>> batch) {
            return txProducer.withTransactionAndAck(batch, emitter -> {
                for (org.apache.pulsar.client.api.Message<Integer> record : batch.getPayload()) {
                    emitter.send(PulsarMessage.of(record.getValue() + 1, record.getKey()));
                }
                return Uni.createFrom().voidItem();
            });
        }

    }

如果处理成功完成,则消息在事务中确认,并提交事务。

在使用恰好一次处理时,只能单独确认消息,而不能累积确认消息。

如果需要中止处理,则消息被拒绝。可以采用一种故障策略来重试处理或简单地停止故障。请注意,如果事务失败并被中止,则从 withTransaction 返回的 Uni 将产生故障。

应用程序可以选择处理错误情况,但为了使消息消耗继续进行,从 @Incoming 方法返回的 Uni 不能导致故障。PulsarTransactions#withTransactionAndAck 方法将确认和拒绝消息,但不会停止反应式流。忽略故障只是将消费者重置为上次提交的偏移,并从那里恢复处理。

为了避免故障情况下出现重复项,建议在代理端启用消息重复消除和批次索引级别确认:

quarkus.pulsar.devservices.broker-config.brokerDeduplicationEnabled=true
quarkus.pulsar.devservices.broker-config.brokerDeduplicationEntriesInterval=1000
quarkus.pulsar.devservices.broker-config.brokerDeduplicationSnapshotIntervalSeconds=3000
quarkus.pulsar.devservices.broker-config.acknowledgmentAtBatchIndexLevelEnabled=3000

mp.messaging.incoming.data.batchIndexAckEnabled=true

Pulsar Schema Configuration & Auto Schema Discovery

Pulsar 消息以非结构化字节数组形式连同有效负载存储。Pulsar schema 定义了如何将结构化数据序列化到原始消息字节中。schema 应用于生产者和消费者,以使用强制数据结构来写入和读取。在将数据发布到主题之前将其序列化为原始字节,并在将原始字节传递给消费者之前对其进行反序列化。

Pulsar 使用模式注册表作为一个存储已注册模式信息的中央存储库,通过代理,它使生产者/消费者能够协调主题消息的模式。默认情况下,Apache BookKeeper 用于存储模式。

Pulsar API 为许多 primitive typescomplex types提供内置架构信息,例如 Key/Value、Avro 和 Protobuf。

Pulsar 连接器允许使用 `schema`属性指定架构作为原始类型:

mp.messaging.incoming.prices.connector=smallrye-pulsar
mp.messaging.incoming.prices.schema=INT32

mp.messaging.outgoing.prices-out.connector=smallrye-pulsar
mp.messaging.outgoing.prices-out.schema=DOUBLE

如果 `schema`属性的值与 Schema Type匹配,将会使用该类型创建一个简单的架构,并用于该频道。

Pulsar 连接器允许通过 CDI 提供 `Schema`bean,并用 `@Identifier`限定符标识,来配置复杂的架构类型。

例如,以下 bean 提供 JSON 架构和 Key/Value 架构:

package pulsar.configuration;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Produces;

import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.KeyValueEncodingType;

import io.smallrye.common.annotation.Identifier;

@ApplicationScoped
public class PulsarSchemaProvider {

    @Produces
    @Identifier("user-schema")
    Schema<User> userSchema = Schema.JSON(User.class);

    @Produces
    @Identifier("a-channel")
    Schema<KeyValue<Integer, User>> keyValueSchema() {
        return Schema.KeyValue(Schema.INT32, Schema.JSON(User.class), KeyValueEncodingType.SEPARATED);
    }

    public static class User {
        String name;
        int age;

    }
}

要为定义好的架构配置入站频道`users`,你需要将 `schema`属性设置为架构 `user-schema`的标识符:

mp.messaging.incoming.users.connector=smallrye-pulsar
mp.messaging.incoming.users.schema=user-schema

如果找不到 `schema`属性,连接器会查找用频道名称标识的 `Schema`bean。例如,出站频道 `a-channel`将使用 Key/Value 架构。

mp.messaging.outgoing.a-channel.connector=smallrye-pulsar

如果没有提供架构信息,入站频道将使用 Schema.AUTO_CONSUME(),而连接器将使用 `Schema.AUTO_PRODUCE_BYTES()`架构。

Auto Schema Discovery

当使用 Quarkus 消息传递 Pulsar (io.quarkus:quarkus-messaging-pulsar) 时,Quarkus 经常会自动检测配置正确的 Pulsar 架构。此自动检测基于 @Incoming`和 `@Outgoing`方法的声明,以及注入的 `@Channel

例如,如果您声明

@Outgoing("generated-price")
public Multi<Integer> generate() {
    ...
}

你的配置表明 generated-price`频道使用 `smallrye-pulsar`连接器时,Quarkus 会自动将 `generated-price`频道中 `schema`的属性设置为 Pulsar 架构 `INT32

类似地,如果您声明

@Incoming("my-pulsar-consumer")
public void consume(org.apache.pulsar.api.client.Message<byte[]> record) {
    ...
}

你的配置表明 `my-pulsar-consumer`频道使用 `smallrye-pulsar`连接器时,Quarkus 会自动将 `schema`的属性设置为 Pulsar `BYTES`架构。

最后,如果您声明

@Inject
@Channel("price-create")
Emitter<Double> priceEmitter;

你的配置表明 `price-create`频道使用 `smallrye-pulsar`连接器时,Quarkus 会自动将 `schema`设置为 Pulsar `INT64`架构。

Pulsar 架构自动检测支持的类型全套集是:

  • short and java.lang.Short

  • int and java.lang.Integer

  • long and java.lang.Long

  • float and java.lang.Float

  • double and java.lang.Double

  • byte[]

  • java.time.Instant

  • java.sql.Timestamp

  • java.time.LocalDate

  • java.time.LocalTime

  • java.time.LocalDateTime

  • java.nio.ByteBuffer

  • 从 Avro 架构生成的类,以及 Avro GenericRecord,将使用 `AVRO`架构类型进行配置

  • 从 Protobuf 架构生成的类,将使用 `PROTOBUF`架构类型进行配置

  • 其他类将使用 `JSON`架构类型进行自动配置

请注意,`JSON`架构类型强制进行架构验证。

除那些 Pulsar 提供的架构外,Quarkus 还提供以下架构实现 without enforcing validation:

  • `io.vertx.core.buffer.Buffer`将使用`io.quarkus.pulsar.schema.BufferSchema`架构进行配置

  • io.vertx.core.json.JsonObject 将使用 io.quarkus.pulsar.schema.JsonObjectSchema 架构进行配置

  • io.vertx.core.json.JsonArray 将使用 io.quarkus.pulsar.schema.JsonArraySchema 架构进行配置

  • 对于无架构的 Json 序列化,如果 schema 配置设置为 ObjectMapper&lt;fully_qualified_name_of_the_bean&gt;,Schema 将使用 Jackson ObjectMapper 生成,而不强制实施 Pulsar Schema 验证。io.quarkus.pulsar.schema.ObjectMapperSchema 可用于显式配置 JSON 架构而不进行验证。

如果通过配置设置了 schema,它将不会被自动检测替换。

如果您在序列化器自动检测方面遇到了任何问题,可以通过设置 quarkus.messaging.pulsar.serializer-autodetection.enabled=false 完全关闭它。如果您发现您需要这样做,请在 Quarkus issue tracker 中提交一个漏洞,以便我们修复您遇到的任何问题。

Dev Services for Pulsar

借助 Quarkus Messaging Pulsar 扩展(quarkus-messaging-pulsar),针对 Pulsar 的 Dev Services 会在开发模式和运行测试时自动启动一个 Pulsar 代理。因此,你无需手动启动代理。应用程序会自动配置。

Enabling / Disabling Dev Services for Pulsar

除非满足以下条件,否则会自动启用针对 Pulsar 的 Dev Services:

  • quarkus.pulsar.devservices.enabled 设置为 false

  • the pulsar.client.serviceUrl is configured

  • 所有的 Reactive Messaging Pulsar 频道都有 serviceUrl 属性设置

用于 Pulsar 的 Dev Services 依赖于 Docker 来启动代理。如果您的环境不支持 Docker,您将需要手动启动代理或连接到已运行的代理。您可以使用 pulsar.client. 配置代理地址。

Shared broker

大多数情况下,您需要在应用程序之间共享代理。用于 Pulsar 的 Dev Services 为在 dev 模式下运行您的多个 Quarkus 应用程序实施了一个 service discovery 机制,以共享一个代理。

用于 Pulsar 的 Dev Services 使用 quarkus-dev-service-pulsar 标签启动容器,该标签用于识别容器。

如果您需要多个(共享)代理,您可以配置 quarkus.pulsar.devservices.service-name 属性并指出代理名称。它会寻找具有相同值的一个容器,或者如果找不到容器,则会启动一个新容器。默认服务名称是 pulsar

在 dev 模式下默认启用共享,但在测试模式下禁用共享。您可以使用 quarkus.pulsar.devservices.shared=false 禁用共享。

Setting the port

默认情况下,用于 Pulsar 的 Dev Services 会选择一个随机端口并配置应用程序。您可以通过配置 quarkus.pulsar.devservices.port 属性来设置端口。

请注意,Pulsar 通告地址会使用所选端口自动配置。

Configuring the image

用于 Pulsar 的 Dev Services 支持 official Apache Pulsar image

可以配置自定义镜像名称,如下所示:

quarkus.pulsar.devservices.image-name=datastax/lunastreaming-all:2.10_4.7

Configuring the Pulsar broker

您可以使用自定义代理配置配置用于 Pulsar 的 Dev Services。

下面的示例启用事务支持:

quarkus.pulsar.devservices.broker-config.transaction-coordinator-enabled=true
quarkus.pulsar.devservices.broker-config.system-topic-enabled=true

Configuring Pulsar clients

Pulsar 客户端、使用者和生成器可以高度定制,以配置 Pulsar 客户端应用程序的行为。

Pulsar 连接器会分别为每个通道创建 Pulsar 客户端和使用者或生成器,每个都采用合理的默认值以简化其配置。尽管创建过程已得到处理,但仍然可以通过 Pulsar 通道来配置所有可用的配置选项。

虽然创建 PulsarClientPulsarConsumerPulsarProducer 的习惯方法是通过构建器 API,但其实质是,这些 API 每次都构建一个配置对象,以传递给实现。那些是 ClientConfigurationDataConsumerConfigurationDataProducerConfigurationData

Pulsar Connector 允许直接对此类配置对象接收属性。例如,用于 PulsarClient 的代理身份验证信息是使用 authPluginClassNameauthParams 属性接收的。为了配置传入通道 data 的身份验证:

mp.messaging.incoming.data.connector=smallrye-pulsar
mp.messaging.incoming.data.serviceUrl=pulsar://localhost:6650
mp.messaging.incoming.data.topic=topic
mp.messaging.incoming.data.subscriptionInitialPosition=Earliest
mp.messaging.incoming.data.schema=INT32
mp.messaging.incoming.data.authPluginClassName=org.apache.pulsar.client.impl.auth.AuthenticationBasic
mp.messaging.incoming.data.authParams={"userId":"superuser","password":"admin"}

请注意,Pulsar 使用者属性 subscriptionInitialPosition 也已使用 enum 值 SubscriptionInitialPosition.Earliest 标识的 Earliest 值进行配置。

这种方法涵盖了大多数配置情况。但是,诸如 CryptoKeyReaderServiceUrlProvider 等非序列化对象无法通过这种方式进行配置。Pulsar Connector 允许考虑 Pulsar 配置数据对象的实例 –ClientConfigurationDataConsumerConfigurationDataProducerConfigurationData

import jakarta.enterprise.inject.Produces;
import io.smallrye.common.annotation.Identifier;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;

class PulsarConfig {

    @Produces
    @Identifier("my-consumer-options")
    public ConsumerConfigurationData<String> getConsumerConfig() {
        ConsumerConfigurationData<String> data = new ConsumerConfigurationData<>();
        data.setAckReceiptEnabled(true);
        data.setCryptoKeyReader(DefaultCryptoKeyReader.builder()
                //...
                .build());
        return data;
    }
}

检索此实例并使用它来配置连接器使用的客户端。您需要使用 client-configurationconsumer-configurationproducer-configuration 属性指示客户端的名称:

mp.messaging.incoming.prices.consumer-configuration=my-consumer-options

如果没有配置 [client|consumer|producer]-configuration,连接器将查找带有通道名称的标识的实例:

import jakarta.enterprise.inject.Produces;
import io.smallrye.common.annotation.Identifier;
import org.apache.pulsar.client.impl.AutoClusterFailover;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;

class PulsarConfig {

    @Produces
    @Identifier("prices")
    public ClientConfigurationData getClientConfig() {
        ClientConfigurationData data = new ClientConfigurationData();
        data.setEnableTransaction(true);
        data.setServiceUrlProvider(AutoClusterFailover.builder()
                // ...
                .build());
        return data;
    }
}

您还可以提供包含按键排列的配置值的 Map<String, Object>

import jakarta.enterprise.inject.Produces;
import io.smallrye.common.annotation.Identifier;
import org.apache.pulsar.client.api.BatcherBuilder;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.customroute.PartialRoundRobinMessageRouterImpl;
import java.util.Map;

class PulsarConfig {

    @Produces
    @Identifier("prices")
    public Map<String, Object> getProducerConfig() {
        return Map.of(
                "batcherBuilder", BatcherBuilder.KEY_BASED,
                "sendTimeoutMs", 3000,
                "customMessageRouter", new PartialRoundRobinMessageRouterImpl(4));
    }
}

不同的配置源按从最低到最高重要性的顺序加载,如下所示:

  1. 使用默认配置标识符 default-pulsar-clientdefault-pulsar-consumerdefault-pulsar-producer 生成的 Map&lt;String, Object&gt; 配置地图。

  2. 在配置或通道名称中使用标识符生成的 Map&lt;String, Object&gt; 配置地图

  3. 在通道配置或通道名称中使用标识符生成的 [Client|Producer|Consuemr]ConfigurationData 对象

  4. 使用 [Client|Producer|Consuemr]ConfigurationData 字段名称命名的通道配置属性。

有关配置选项的详尽列表,请参阅 Configuration Reference

Configuring Pulsar Authentication

Pulsar 提供了一个可插入式验证框架,Pulsar 代理/代理使用这个机制来验证客户端。

客户端可以在 application.properties 文件中使用 authPluginClassNameauthParams 属性进行配置:

pulsar.client.serviceUrl=pulsar://pulsar:6650
pulsar.client.authPluginClassName=org.apache.pulsar.client.impl.auth.AuthenticationBasic
pulsar.client.authParams={"userId":"superuser","password":"admin"}

或者以编程方式:

import java.util.Map;

import jakarta.enterprise.inject.Produces;
import io.smallrye.common.annotation.Identifier;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.auth.AuthenticationBasic;

class PulsarConfig {

    @Produces
    @Identifier("prices")
    public ClientConfigurationData config() {
        var data = new ClientConfigurationData();
        var auth = new AuthenticationBasic();
        auth.configure(Map.of("userId", "superuser", "password", "admin"));
        data.setAuthentication(auth);
        return data;
    }
}

Configuring access to Datastax Luna Streaming

Luna Streaming 是 Apache Pulsar 的一个生产准备发布版本,带有 DataStax 的工具和支持。在创建 DataStax Luna Pulsar 租户后,请注意自动生成的令牌,并配置令牌验证:

pulsar.client.serviceUrl=pulsar+ssl://pulsar-aws-eucentral1.streaming.datastax.com:6651
pulsar.client.authPluginClassName=org.apache.pulsar.client.impl.auth.AuthenticationToken
pulsar.client.authParams=token:eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.eyJpYXQiOjE2ODY4MTc4MzQsImlzcyI6ImRhdGFzdGF4Iiwic3ViIjoiY2xpZW50OzA3NGZhOTI4LThiODktNDBhNC04MDEzLWNlNjVkN2JmZWIwZTtjSEpwWTJWejsyMDI5ODdlOGUyIiwidG9rZW5pZCI6IjIwMjk4N2U4ZTIifQ....

确保事先创建主题或在命名空间配置中启用 Auto Topic Creation

请注意,主题配置需要引用主题的全名:

mp.messaging.incoming.prices.topic=persistent://my-tenant/default/prices

Configuring access to StreamNative Cloud

StreamNative Cloud 是一个完全托管的 Pulsar-as-a-Service,可用于不同的部署选项,无论它是完全托管的、在由 StreamNative 管理的公共云中托管的或在 Kubernetes 上自托管的。

StreamNative Pulsar 集群使用 Oauth2 身份验证,因此您需要确保存在 service account,其中包含应用程序正在使用的必需的 permissions to the Pulsar namespace/topic

接下来,您需要下载服务帐户的 Key file(充当 private key)并记下群集的 issuer URL(通常为 https://auth.streamnative.cloud/)和 audience(例如 urn:sn:pulsar:o-rf3ol:redhat)。Admin 部分中的 Pulsar Clients 页面 StreamNative Cloud 控制台可帮助您完成此过程。

使用 Pulsar Oauth2 验证配置您的应用程序:

pulsar.tenant=public
pulsar.namespace=default
pulsar.client.serviceUrl=pulsar+ssl://quarkus-71eaadbf-a6f3-4355-85d2-faf436b23d86.aws-euc1-prod-snci-pool-slug.streamnative.aws.snio.cloud:6651
pulsar.client.authPluginClassName=org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2
pulsar.client.authParams={"type":"client_credentials","privateKey":"data:application/json;base64,<base64-encoded value>","issuerUrl":"https://auth.streamnative.cloud/","audience":"urn:sn:pulsar:o-rfwel:redhat"}

请注意,pulsar.client.authParams 配置包含一个包含 issuerUrlaudienceprivateKey 的 Json 字符串,该字符串采用 data:application/json;base64,<base64-encoded-key-file> 格式。

或者,您可以以编程方式配置身份验证:

package org.acme.pulsar;

import java.net.MalformedURLException;
import java.net.URL;

import org.apache.pulsar.client.impl.auth.oauth2.AuthenticationFactoryOAuth2;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.eclipse.microprofile.config.inject.ConfigProperty;

import io.smallrye.common.annotation.Identifier;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Produces;

@ApplicationScoped
public class PulsarAuth {

    @ConfigProperty(name = "pulsar.issuerUrl")
    String issuerUrl;

    @ConfigProperty(name = "pulsar.credentials")
    String credentials;

    @ConfigProperty(name = "pulsar.audience")
    String audience;

    @Produces
    @Identifier("pulsar-auth")
    public ClientConfigurationData pulsarClientConfig() throws MalformedURLException {
        var data = new ClientConfigurationData();
        data.setAuthentication(AuthenticationFactoryOAuth2.clientCredentials(new URL(issuerUrl), PulsarAuth.class.getResource(credentials), audience));
        return data;
    }
}

这假设密钥文件作为资源包含在应用程序类路径中,那么配置就像下面这样:

mp.messaging.incoming.prices.client-configuration=pulsar-auth

pulsar.tenant=public
pulsar.namespace=default
pulsar.client.serviceUrl=pulsar+ssl://quarkus-71eaadbf-a6f3-4355-85d2-faf436b23d86.aws-euc1-prod-snci-pool-slug.streamnative.aws.snio.cloud:6651
pulsar.issuerUrl=https://auth.streamnative.cloud/
pulsar.audience=urn:sn:pulsar:o-rfwel:redhat
pulsar.credentials=/o-rfwel-quarkus-app.json

请注意,使用 pulsar-auth 标识的客户端配置的通道需要设置 client-configuration 属性。

Health Checks

Quarkus 扩展报告由 Pulsar 连接器管理的每个通道的启动、准备和活动状态。运行状况检查依赖 Pulsar 客户端来验证是否已与代理建立连接。

StartupReadiness 探针用于入站和出站通道均报告 OK,当时与代理的连接已建立。

用于入站和出站通道的 Liveness 探针报告 OK,当时已与代理建立连接,并且尚未捕获到任何故障。

请注意,消息处理在 nacks 中失败,该消息将由失败策略处理。失败策略负责报告故障并影响活动性检查的结果。fail 失败策略将报告故障,因此活动性检查也将报告故障。

Configuration Reference

以下是 Pulsar 连接器通道、使用者、生产者和客户端的配置属性列表。有关如何配置 Pulsar 客户端的更多信息,请参见 Pulsar Client Configuration

Incoming channel configuration (receiving from Pulsar)

使用以下内容配置下列属性:

mp.messaging.incoming.your-channel-name.attribute=value
Table 1. Incoming Attributes of the 'smallrye-pulsar' connector
Attribute (alias) Description Type Mandatory Default

ack-strategy

指定在确认从记录生成的消息时应用的提交策略。值可以是 ackcumulative

string

false

ack

ackTimeout.redeliveryBackoff

用于配置 ack 超时 MultiplierRedeliveryBackoff、最小延迟、最大延迟、倍数器的逗号分隔值。

string

false

batchReceive

是否使用批量接收使用消息

boolean

false

false

client-configuration

提供此通道的默认 Pulsar 客户端配置的 CDI Bean 的标识符。通道配置仍然可以覆盖任何属性。该 Bean 的类型必须是 Map<String, Object>,并且必须使用 @io.smallrye.common.annotation.Identifier 限定符设置标识符。

string

false

consumer-configuration

提供此通道的默认 Pulsar 消费者配置的 CDI Bean 的标识符。通道配置仍然可以覆盖任何属性。该 Bean 的类型必须是 Map<String, Object>,并且必须使用 @io.smallrye.common.annotation.Identifier 限定符设置标识符。

string

false

deadLetterPolicy.deadLetterTopic

将发送失败消息的备用主题的名称

string

false

deadLetterPolicy.initialSubscriptionName

备用主题的初始订阅的名称

string

false

deadLetterPolicy.maxRedeliverCount

消息在发送到备用主题之前将重新发送的最大次数

int

false

deadLetterPolicy.retryLetterTopic

将发送失败消息的重试主题的名称

string

false

failure-strategy

指定记录产生的消息是否为负面确认 (nack) 时应用的失败策略。值可为 nack(默认值)、failignorereconsume-later

string

false

nack

health-enabled

是否启用健康报告(默认)或禁用

boolean

false

true

negativeAck.redeliveryBackoff

用于配置否定确认 MultiplierRedeliveryBackoff、最小延迟、最大延迟、倍数的逗号分隔值。

string

false

reconsumeLater.delay

重新使用失败策略的默认延迟(单位:秒)

long

false

3

schema

此通道的 Pulsar 架构类型。配置后,将使用给定的 SchemaType 构建架构并在通道中使用。当不存在时,架构将通过搜索使用 Schema 限定的 @Identifier 类型 CDI Bean 和通道名称来解析。作为后备,将使用 AUTO_CONSUME 或 AUTO_PRODUCE。

string

false

serviceUrl

Pulsar 服务的服务 URL

string

false

pulsar://localhost:6650

topic

已消费/填充的 Pulsar 主题。如果未设置,则使用通道名称

string

false

tracing-enabled

是否启用跟踪(默认)或禁用

boolean

false

true

还可以配置底层 Pulsar 使用者支持的属性。

这些属性还可以使用 pulsar.consumer 前缀进行全局配置:

pulsar.consumer.subscriptionInitialPosition=Earliest
Table 2. Pulsar consumer Attributes
Attribute Description Type Config file Default

topicNames

Topic name

Set

true

[]

topicsPattern

Topic pattern

Pattern

true

subscriptionName

Subscription name

String

true

subscriptionType

订阅类型。可以使用四种类型订阅:* 专用* 冗余* 共享* 密钥共享

SubscriptionType

true

Exclusive

subscriptionProperties

Map

true

subscriptionMode

SubscriptionMode

true

Durable

messageListener

MessageListener

false

consumerEventListener

ConsumerEventListener

false

negativeAckRedeliveryBackoff

自定义消息的接口是否为 negativeAcked 策略。你可以为消费者指定 RedeliveryBackoff.

RedeliveryBackoff

false

ackTimeoutRedeliveryBackoff

自定义消息的接口是否为 ackTimeout 策略。你可以为消费者指定 RedeliveryBackoff.

RedeliveryBackoff

false

receiverQueueSize

消费者接收器队列的大小。例如,在应用程序调用 Receive 之前,消费者累积的消息数量。高于默认值的值将增加消费者的吞吐量,但会以使用更多内存为代价。

int

true

1000

acknowledgementsGroupTimeMicros

按指定时间对消费者的确认进行分组。默认情况下,消费者使用 100 毫秒的分组时间向代理发送确认。将分组时间设置为 0 会立即发送确认。较长的确认分组时间更有效,但会略微增加故障后重新发送消息的次数。

long

true

100000

maxAcknowledgmentGroupSize

按消息数量对消费者的确认进行分组。

int

true

1000

negativeAckRedeliveryDelayMicros

延迟在重新发送处理失败的消息之前等待的时间。当应用程序使用 Consumer#negativeAcknowledge(Message) 时,失败的消息将在固定超时后重新发送。

long

true

60000000

maxTotalReceiverQueueSizeAcrossPartitions

跨分区的所有接收器队列的最大大小。如果所有接收器队列的大小超过此值,此设置将减小各个分区接收器队列的大小。

int

true

50000

consumerName

Consumer name

String

true

ackTimeoutMillis

Timeout of unacked messages

long

true

0

tickDurationMillis

ack 超时重新发送的粒度。使用较高的 tickDurationMillis 会减少跟踪消息的内存开销,同时将 ack 超时设置为更大的值(例如 1 小时)。

long

true

1000

priorityLevel

消费者优先级级别,代理在“共享订阅”类型中分发消息时会优先考虑此级别。代理遵循降序优先级。例如,0=最高优先级,1、2、… 在“共享订阅”类型中,代理 first dispatches messages to the max priority level consumers if they have permits 。否则,代理会考虑下一个优先级级别的消费者。Example 1 如果某个订阅具有优先级 priorityLevel 为 0 的消费者 A 和优先级 priorityLevel 为 1 的消费者 B,则代理 only dispatches messages to consumerA until it runs out permits 然后开始向消费者 B 分发消息。Example 2 消费者优先级、级别、允许数量 C1,0,2 C2,0,1 C3,0,1 C4,1,2 C5,1,1 代理向消费者分发消息的顺序为:C1、C2、C3、C1、C4、C5、C4。

int

true

0

maxPendingChunkedMessage

保存待处理分块消息的队列的最大大小。达到阈值时,消费者将放弃待处理消息以优化内存利用率。

int

true

10

autoAckOldestChunkedMessageOnQueueFull

maxPendingChunkedMessage 的阈值达到时,是否自动确认待处理的分块消息。如果设置为 false ,这些消息将由它们的代理重新发送。

boolean

true

false

expireTimeOfIncompleteChunkedMessageMillis

如果消费者未在指定时间段内接收所有分块,则过期不完整分块的时间间隔。默认值为 1 分钟。

long

true

60000

cryptoKeyReader

CryptoKeyReader

false

messageCrypto

MessageCrypto

false

cryptoFailureAction

消费者应该在收到无法解密的消息时采取行动。* FAIL :这是默认选项,表示在加密成功之前消息会失败。* DISCARD :静默确认并且不向应用程序发送消息。* CONSUME :向应用程序发送加密消息。应用程序负责解密消息。消息的解压失败。如果消息包含批处理消息,客户端将无法在批处理中检索单个消息。发送的加密消息包含 EncryptionContext ,其中包含应用程序可以用来解密已消耗消息有效负载的加密和压缩信息。

ConsumerCryptoFailureAction

true

FAIL

properties

此消费者的名称或值属性。properties 是应用程序定义的、附加到消费者的元数据。当获取主题统计信息时,将此元数据与消费者统计信息相关联,以便于识别。

SortedMap

true

{}

readCompacted

如果启用 readCompacted ,消费者将从紧凑的主题中读取消息,而不是读取主题的完整消息积压。紧凑主题中,消费者只会看到每个键的最新值,直到达到紧凑的积压主题消息中的那一时刻。超过该点后,照常发送消息。仅对具有单个活动消费者(例如故障或独占订阅)的持久主题的订阅启用 readCompacted 。尝试在非持久主题或共享订阅的订阅上启用它会导致订阅调用抛出 PulsarClientException

boolean

true

false

subscriptionInitialPosition

订阅主题时的初始位置,用于首次设置游标。

SubscriptionInitialPosition

true

Latest

patternAutoDiscoveryPeriod

使用主题消费者的模式时的主题自动发现时间段。默认值和最小值是 1 分钟。

int

true

60

regexSubscriptionMode

使用正则表达式订阅主题时,可以选取特定类型的主题。* PersistentOnly :仅订阅持久主题。* NonPersistentOnly :仅订阅非持久主题。* AllTopics :订阅持久主题和非持久主题。

RegexSubscriptionMode

true

PersistentOnly

deadLetterPolicy

消费者的死信策略。默认情况下,有些消息可能会被多次重新发送,甚至可能一直不停地被重新发送。通过使用死信机制,可以对消息设置最大重新发送次数。When exceeding the maximum number of redeliveries, messages are sent to the Dead Letter Topic and acknowledged automatically 。可以通过设置 deadLetterPolicy 启用死信机制。在未指定 ackTimeoutMillis 的情况下指定死信策略时,可以将 ack 超时设置为 30000 毫秒。

DeadLetterPolicy

true

retryEnable

boolean

true

false

batchReceivePolicy

BatchReceivePolicy

false

autoUpdatePartitions

如果 autoUpdatePartitions 已启用,消费者将自动订阅分区增加。Note :这仅适用于分区消费者。

boolean

true

true

autoUpdatePartitionsIntervalSeconds

long

true

60

replicateSubscriptionState

如果 replicateSubscriptionState 已启用,订阅状态将复制到地理复制群集。

boolean

true

false

resetIncludeHead

boolean

true

false

keySharedPolicy

KeySharedPolicy

false

batchIndexAckEnabled

boolean

true

false

ackReceiptEnabled

boolean

true

false

poolMessages

boolean

true

false

payloadProcessor

MessagePayloadProcessor

false

startPaused

boolean

true

false

autoScaledReceiverQueueSizeEnabled

boolean

true

false

topicConfigurations

List

true

[]

Outgoing channel configuration (publishing to Pulsar)

Table 3. Outgoing Attributes of the 'smallrye-pulsar' connector
Attribute (alias) Description Type Mandatory Default

client-configuration

提供此通道的默认 Pulsar 客户端配置的 CDI Bean 的标识符。通道配置仍然可以覆盖任何属性。该 Bean 的类型必须是 Map<String, Object>,并且必须使用 @io.smallrye.common.annotation.Identifier 限定符设置标识符。

string

false

health-enabled

是否启用健康报告(默认)或禁用

boolean

false

true

maxPendingMessages

保存待处理消息的队列的最大大小,即正在等待接收来自代理的确认的消息

int

false

1000

producer-configuration

提供此通道的默认 Pulsar 生产者配置的 CDI Bean 的标识符。通道配置仍然可以覆盖任何属性。该 Bean 的类型必须是 Map<String, Object>,并且必须使用 @io.smallrye.common.annotation.Identifier 限定符设置标识符。

string

false

schema

此通道的 Pulsar 架构类型。配置后,将使用给定的 SchemaType 构建架构并在通道中使用。当不存在时,架构将通过搜索使用 Schema 限定的 @Identifier 类型 CDI Bean 和通道名称来解析。作为后备,将使用 AUTO_CONSUME 或 AUTO_PRODUCE。

string

false

serviceUrl

Pulsar 服务的服务 URL

string

false

pulsar://localhost:6650

topic

已消费/填充的 Pulsar 主题。如果未设置,则使用通道名称

string

false

tracing-enabled

是否启用跟踪(默认)或禁用

boolean

false

true

waitForWriteCompletion

客户端是否等待代理确认已写入记录才能确认消息

boolean

false

true

还可以配置底层 Pulsar 生产者支持的属性。

这些属性还可以使用 pulsar.producer 前缀进行全局配置:

pulsar.producer.batchingEnabled=false
Table 4. Pulsar producer Attributes
Attribute Description Type Config file Default

topicName

Topic name

String

true

producerName

Producer name

String

true

sendTimeoutMs

消息发送超时(毫秒)。如果服务器在“14”到期之前未确认消息,将发生错误。

long

true

30000

blockIfQueueFull

如果将其设置为“15”,当传出消息队列已满时,生产者的“16”和“17”方法将阻塞,而不是失败并引发错误。如果将其设置为“18”,则当传出消息队列已满时,生产者的“19”和“20”方法将失败并发生“21”异常。“22”参数确定传出消息队列的大小。

boolean

true

false

maxPendingMessages

暂挂消息的队列的最大大小。例如,等待从“23”接收确认的消息。默认情况下,当队列已满时,所有对“25”和“26”方法的调用都将失败“24”,将“27”设置为“28”。

int

true

0

maxPendingMessagesAcrossPartitions

不同分区中暂挂消息的最大数量。如果总数超过配置的值,使用设置降低每个分区的最大暂挂消息(“29”)。

int

true

0

messageRoutingMode

“33”上为生产者定义的消息路由逻辑。仅在消息上不设置密钥时应用该逻辑。可用选项如下:“30”:循环“31”:把所有消息发布到单个分区*“32”:自定义分区方案

MessageRoutingMode

true

hashingScheme

哈希函数决定将特定消息发布到哪个分区(仅分区分区)。可用选项如下:“34”:相当于Java中的“35”“36”:应用“38”哈希函数*“37”:应用C++“39”库中的哈希函数

HashingScheme

true

JavaStringHash

cryptoFailureAction

加密失败时,生产者应采取措施。“40”:如果加密失败,则未加密的消息将发送失败。“41”:如果加密失败,将发送未加密的消息。

ProducerCryptoFailureAction

true

FAIL

customMessageRouter

MessageRouter

false

batchingMaxPublishDelayMicros

发送消息的分批时间段。

long

true

1000

batchingPartitionSwitchFrequencyByPublishDelay

int

true

10

batchingMaxMessages

一批中允许的最大消息数。

int

true

1000

batchingMaxBytes

int

true

131072

batchingEnabled

Enable batching of messages.

boolean

true

true

batcherBuilder

BatcherBuilder

false

chunkingEnabled

Enable chunking of messages.

boolean

true

false

chunkMaxMessageSize

int

true

-1

cryptoKeyReader

CryptoKeyReader

false

messageCrypto

MessageCrypto

false

encryptionKeys

Set

true

[]

compressionType

生产者使用消息数据压缩类型。可用选项:“42”“43”“44”“45”

CompressionType

true

NONE

initialSequenceId

Long

true

autoUpdatePartitions

boolean

true

true

autoUpdatePartitionsIntervalSeconds

long

true

60

multiSchema

boolean

true

true

accessMode

ProducerAccessMode

true

Shared

lazyStartPartitionedProducers

boolean

true

false

properties

SortedMap

true

{}

initialSubscriptionName

使用此配置在创建主题时自动创建一个初始订阅。如果不设置此字段,则不会创建初始订阅。

String

true

Pulsar Client Configuration

以下是底层 `PulsarClient`的配置参考。可以使用通道属性配置这些选项:

mp.messaging.incoming.your-channel-name.numIoThreads=4

或使用 pulsar.client 前缀进行全局配置:

pulsar.client.serviceUrl=pulsar://pulsar:6650
Table 5. Pulsar client Attributes
Attribute Description Type Config file Default

serviceUrl

用于连接到代理的 Pulsar 集群 HTTP URL。

String

true

serviceUrlProvider

用于生成 ServiceUrl 的 ServiceUrlProvider 的实现类。

ServiceUrlProvider

false

authentication

客户端的身份验证设置。

Authentication

false

authPluginClassName

客户端的身份验证插件类名。

String

true

authParams

客户端的身份验证参数。

String

true

authParamMap

客户端的身份验证映射。

Map

true

operationTimeoutMs

客户端操作超时时间(以毫秒为单位)。

long

true

30000

lookupTimeoutMs

客户端查找超时时间(以毫秒为单位)。

long

true

-1

statsIntervalSeconds

打印客户端统计信息的时间间隔(以秒为单位)。

long

true

60

numIoThreads

Number of IO threads.

int

true

10

numListenerThreads

使用者侦听器线程数。

int

true

10

connectionsPerBroker

客户端与每个代理之间建立的连接数。值 0 表示禁用连接池。

int

true

1

connectionMaxIdleSeconds

如果未使用超过 [connectionMaxIdleSeconds] 秒,则释放连接。如果 [connectionMaxIdleSeconds] < 0,则禁用自动释放空闲连接的功能

int

true

180

useTcpNoDelay

是否使用 TCP NoDelay 选项。

boolean

true

true

useTls

Whether to use TLS.

boolean

true

false

tlsKeyFilePath

TLS 密钥文件路径。

String

true

tlsCertificateFilePath

TLS 证书文件路径。

String

true

tlsTrustCertsFilePath

受信任的 TLS 证书文件路径。

String

true

tlsAllowInsecureConnection

客户端是否接受代理发送的不可信 TLS 证书。

boolean

true

false

tlsHostnameVerificationEnable

当客户端与代理创建 TLS 连接时,是否验证主机名。

boolean

true

false

concurrentLookupRequest

每个代理连接上可以发送的并发查找请求数。设置一个最大值可防止代理过载。

int

true

5000

maxLookupRequest

为防止代理过载而允许在每个代理连接上发出的最大查找请求数。

int

true

50000

maxLookupRedirects

重定向查找请求的最大次数。

int

true

20

maxNumberOfRejectedRequestPerConnection

在当前连接关闭,并且客户端创建了一个新连接以连接到其他代理之后,某个时间范围(60 秒)内一个代理被拒绝的请求的最大数量。

int

true

50

keepAliveIntervalSeconds

每个客户端代理连接保持活动的时间间隔(以秒计)。

int

true

30

connectionTimeoutMs

建立与代理的连接的等待时间。如果在没有代理响应的情况下时间已过,则连接尝试将被终止。

int

true

10000

requestTimeoutMs

完成请求的最长时间。

int

true

60000

readTimeoutMs

请求的最大读取时间。

int

true

60000

autoCertRefreshSeconds

证书自动刷新的时间(以秒计)。

int

true

300

initialBackoffIntervalNanos

初始退避间隔(以纳秒计)。

long

true

100000000

maxBackoffIntervalNanos

最大退避间隔(以纳秒计)。

long

true

60000000000

enableBusyWait

是否为 EpollEventLoopGroup 启用 BusyWait。

boolean

true

false

listenerName

查找侦听器名称。只要可以访问网络,客户端便能利用 listenerName 从侦听器中选择一个作为与代理建立连接的服务 URL。“advertisedListeners”必须在代理端启用。

String

true

useKeyStoreTls

使用 KeyStore 方式设置 TLS。

boolean

true

false

sslProvider

内部客户端用于通过其他 Pulsar 代理进行认证的 TLS 提供程序。

String

true

tlsKeyStoreType

TLS KeyStore type configuration.

String

true

JKS

tlsKeyStorePath

Path of TLS KeyStore.

String

true

tlsKeyStorePassword

Password of TLS KeyStore.

String

true

tlsTrustStoreType

TLS 信任库类型配置。需要在需要客户端认证时设置此配置。

String

true

JKS

tlsTrustStorePath

Path of TLS TrustStore.

String

true

tlsTrustStorePassword

Password of TLS TrustStore.

String

true

tlsCiphers

Set of TLS Ciphers.

Set

true

[]

tlsProtocols

Protocols of TLS.

Set

true

[]

memoryLimitBytes

客户端内存使用限制(以字节为单位)。默认的 64M 可以保证较高的生产者吞吐量。

long

true

67108864

proxyServiceUrl

代理服务 URL。proxyServiceUrl 和 proxyProtocol 必须互相包容。

String

true

proxyProtocol

代理服务协议。proxyServiceUrl 和 proxyProtocol 必须互相包容。

ProxyProtocol

true

enableTransaction

Whether to enable transaction.

boolean

true

false

clock

Clock

false

dnsLookupBindAddress

Pulsar 客户端 DNS 查找绑定地址,默认行为是绑定到 0.0.0.0

String

true

dnsLookupBindPort

Pulsar 客户端 DNS 查找绑定端口,在配置 dnsLookupBindAddress 时生效,默认值为 0。

int

true

0

socks5ProxyAddress

Address of SOCKS5 proxy.

InetSocketAddress

true

socks5ProxyUsername

SOCKS5 代理的用户名。

String

true

socks5ProxyPassword

Password of SOCKS5 proxy.

String

true

description

客户端版本的其他说明信息。长度不得超过 64。

String

true

在配置文件中不可配置的配置属性(不可序列化)在列 `Config file`中标明。

Going further

本指南展示了你可以使用 Quarkus 与 Pulsar 进行交互的方式。它利用 Quarkus Messaging 构建数据流应用程序。

如果您想进一步了解,请查看 SmallRye Reactive Messaging的文档,这是Quarkus中使用的实现。