Using Spring for Apache Pulsar
Preface
我们建议将 Spring-Boot-First 方法用于基于 Apache Pulsar 的 Spring 应用,因为它极大地简化了操作。为此,您可以将 |
本参考的大部分内容都假定读者使用该启动程序,并提供了针对该特定内容的大部分配置说明。但是,有意识强调针对 Spring Boot 启动程序使用的特定说明。 |
Quick Tour
我们将会通过展示一个可以进行生产和使用的 Spring Boot 应用示例,来快速的了解一下 Apache Pulsar 的 Spring 相关特性。这是一个完整的应用程序,并且不需要任何额外的配置,只要你在默认位置 - localhost:6650
上运行着一个 Pulsar 集群。
Dependencies
Spring Boot 应用程序只需要 spring-pulsar-spring-boot-starter
依赖。以下清单分别展示了如何为 Maven 和 Gradle 定义依赖项:
-
Maven
-
Gradle
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-pulsar</artifactId>
<version>{spring-boot-version}</version>
</dependency>
</dependencies>
dependencies {
implementation 'org.springframework.boot:spring-boot-starter-pulsar:{spring-boot-version}'
}
当使用 0.2.x 版本
时,以上坐标将发生如下更改:
- Maven
-
<dependencies> <dependency> <groupId>org.springframework.pulsar</groupId> <artifactId>spring-pulsar-spring-boot-starter</artifactId> <version>0.2.0</version> </dependency> </dependencies>
- Gradle
-
dependencies { implementation 'org.springframework.pulsar:spring-pulsar-spring-boot-starter:0.2.0' }
Application Code
以下清单展示了示例的 Spring Boot 应用程序案例:
@SpringBootApplication
public class PulsarBootHelloWorld {
public static void main(String[] args) {
SpringApplication.run(PulsarBootHelloWorld.class, args);
}
@Bean
ApplicationRunner runner(PulsarTemplate<String> pulsarTemplate) {
return (args) -> pulsarTemplate.send("hello-pulsar-topic", "Hello Pulsar World!");
}
@PulsarListener(subscriptionName = "hello-pulsar-sub", topics = "hello-pulsar-topic")
void listen(String message) {
System.out.println("Message Received: " + message);
}
}
让我们快速了解一下该应用程序的高级细节。在文档后面,我们将会看到这些更详细的组件。
在前一个示例中,我们严重依赖 Spring Boot 的自动配置。Spring Boot 为我们的应用程序自动配置了几个组件。它为应用程序自动提供了一个 PulsarClient
,该客户端被生产者和使用者同时使用。
Spring Boot 还自动配置了 PulsarTemplate
,将其注入到应用程序中并开始向 Pulsar 主题发送记录。该应用程序会将消息发送到名为 hello-pulsar
的主题。请注意,该应用程序没有指定任何 schema 信息,因为 Apache Pulsar 的 Spring 库会自动从发送数据的类型推断 schema 类型。
我们使用 PulsarListener
注释来获取我们发布了数据的 hello-pulsar
主题。PulsarListener
是一个便利注释,它将 Apache Pulsar 中的消息监听器容器基础设施包装在 Spring 中。它在幕后创建了一个消息监听器容器来创建和管理 Pulsar 消费者。和常规的 Pulsar 消费者一样,在使用 PulsarListener
时,订阅的默认类型是 Exclusive
模式。当记录发布到 hello-pulsar
主题时,PulsarListener
便会使用它们并在控制台上打印出来。框架还会从 PulsarListener
方法用作有效负载的数据类型中推断出所使用的 schema 类型 - 在本例中为 String
。
Pulsar Client
当你使用 Pulsar Spring Boot Starter 时,你会得到自动配置的 PulsarClient
。
默认情况下,该应用程序会尝试连接到 pulsar://localhost:6650
处的本地 Pulsar 实例。可以通过将 spring.pulsar.client.service-url
属性设为不同的值来调整此功能。
该值必须是一个有效的 {apache-pulsar-docs}/client-libraries-java/#connection-urls[Pulsar 协议] URL |
您可以通过指定 {spring-boot-pulsar-config-props}[spring.pulsar.client.*
] 应用程序属性来进一步配置客户端。
如果不使用该启动程序,则需要配置并自己注册 |
TLS Encryption (SSL)
默认情况下,Pulsar 客户端以纯文本形式与 Pulsar 服务进行通信。以下部分介绍如何配置 Pulsar 客户端以使用 TLS 加密(SSL)。一个先决条件是,代理也已经配置为使用 TLS 加密。
Spring Boot 自动配置目前不支持任何 TLS/SSL 配置属性。相反,你可以提供一个 PulsarClientBuilderCustomizer
,它将在 Pulsar 客户端构建器上设置必要的属性。Pulsar 同时支持隐私增强邮件(PEM)和 Java 密钥库(JKS)证书格式。
请按照以下步骤配置 TLS:
-
调整 Pulsar 客户端服务 URL 以使用
pulsar+ssl://
方案和 TLS 端口(通常为6651
)。 -
调整管理客户端服务 URL 以使用
https://
方案和 TLS Web 端口(通常为8443
)。 -
提供客户端构建器定制器,在构建器上设置相关属性。
-
{github}/blob/02730275e8d0291525eed9db5babe880c555a7bd/integration-tests/src/intTest/java/org/springframework/pulsar/inttest/app/SamplePemBasedSslConfig.java#L30-L49[PEM based sample]
-
{github}/blob/02730275e8d0291525eed9db5babe880c555a7bd/integration-tests/src/intTest/java/org/springframework/pulsar/inttest/app/SampleJksBasedSslConfig.java#L30-L57[JKS based sample]
-
您可以在官方 {apache-pulsar-docs}/security-tls-transport/[Pulsar TLS 加密] 文档中找到关于上述内容的更多信息。
Authentication
为了连接到需要认证的 Pulsar 集群,你需要指定要使用的认证插件和指定插件所需的任何参数。在 使用 Spring Boot 自动配置时,你可以通过配置属性设定插件和插件参数(大多数情况下)。
你需要确保 |
通过环境变量使用认证参数通常会有问题,因为在转换过程中会丢失大小写敏感性。例如,考虑通过环境变量设置的以下
当 Spring Boot 加载此属性时,它将使用
|
在 不使用 Spring Boot 自动配置时,你可以使用 org.apache.pulsar.client.api.AuthenticationFactory
创建认证,然后在提供给客户端工厂的客户端自定义程序中直接将其设置在 Pulsar 客户端构建器上。
下列清单演示了如何配置每个受支持的认证机制。
spring: pulsar: client: authentication: plugin-class-name: org.apache.pulsar.client.impl.auth.AuthenticationAthenz param: tenantDomain: ... tenantService: ... providerDomain: ... privateKey: ... keyId: ...
这也需要 TLS encryption。
spring: pulsar: client: authentication: plugin-class-name: org.apache.pulsar.client.impl.auth.AuthenticationToken param: token: some-token-goes-here
spring: pulsar: client: authentication: plugin-class-name: org.apache.pulsar.client.impl.auth.AuthenticationBasic param: userId: ... password: ...
spring: pulsar: client: authentication: plugin-class-name: org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2 param: issuerUrl: ... privateKey: ... audience: ... scope: ...
spring: pulsar: client: authentication: plugin-class-name: org.apache.pulsar.client.impl.auth.AuthenticationSasl param: saslJaasClientSectionName: ... serverType: ...
由于此选项需要使用 TLS 加密,这已要求您 provide a client builder customizer,因此建议直接在您提供的 TLS 自定程序中对客户端构建器添加认证。您可以使用 org.apache.pulsar.client.api.AuthenticationFactory
来帮助创建身份验证对象,如下所示:
Authentication auth = AuthenticationFactory.TLS("/path/to/my-role.cert.pem", "/path/to/my-role.key-pk8.pem");
有关 {apache-pulsar-docs}/security-tls-authentication/#configure-mtls-authentication-in-pulsar-clients[mTLS (PEM)],请参见 Pulsar 官方文档。
由于此选项需要使用 TLS 加密,这已要求您 provide a client builder customizer,因此建议直接在您提供的 TLS 自定程序中对客户端构建器添加认证。您可以使用 org.apache.pulsar.client.api.AuthenticationFactory
来帮助创建身份验证对象,如下所示:
Authentication auth = AuthenticationFactory.create(
"org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls",
Map.of("keyStoreType", "JKS", "keyStorePath", "/path/to/my/keystore.jks", "keyStorePassword", "clientpw"));
有关 {apache-pulsar-docs}/security-tls-authentication/#configure-clients[mTLS (JKS)],请参见 Pulsar 官方文档。
您可以在官方{apache-pulsar-docs}/security-overview#authentication-providers[Pulsar 安全]文档中找到更多有关每个支持插件的详细信息以及必要的属性。
Message Production
Pulsar Template
在 Pulsar 产品方,Spring Boot 自动配置提供了一个 PulsarTemplate
来发布记录。该模板实现了名为 PulsarOperations
的一个接口,并提供了通过其契约发布记录的方法。
这些发送 API 方法有两种类型:send
和 sendAsync
。send
方法使用 Pulsar 生产者的同步发送功能来禁止调用。在一个消息在代理上持久化之后,它们会返回已发布的消息的 MessageId
。sendAsync
方法调用是非阻塞的异步调用。它们返回一个 CompletableFuture
,你可以在消息发布之后使用该 CompletableFuture
来异步接收消息 ID。
对于不包含主题参数的 API 变量,使用 topic resolution process 确定目标主题。 |
Simple API
该模板提供了一些方法(以 'send' 为前缀的 {javadocs}/org/springframework/pulsar/core/PulsarOperations.html)用于简单的发送请求。对于更复杂的发送请求,一个流利的 API 可以让您配置更多选项。
Fluent API
该模板提供一个{javadocs}/org/springframework/pulsar/core/PulsarOperations.html#newMessage(T)[流畅构建器]来处理更复杂的发送请求。
Message customization
您可以指定 TypedMessageBuilderCustomizer
来配置出站消息。例如,以下代码显示如何发送键入消息:
template.newMessage(msg)
.withMessageCustomizer((mb) -> mb.key("foo-msg-key"))
.send();
Producer customization
您可以指定 ProducerBuilderCustomizer
来配置最终构建用于发送出站消息的生成器的底层 Pulsar 生成器。
谨慎使用,因为它赋予对生产者构建器的完全访问权限,调用其中某些方法(例如 create
)可能产生预料之外的副作用。
例如,以下代码显示如何禁用批处理并启用块处理:
template.newMessage(msg)
.withProducerCustomizer((pb) -> pb.enableChunking(true).enableBatching(false))
.send();
此其他示例显示在向分区主题发布记录时如何使用自定义路由。在 Producer
生成器上指定您的自定义 MessageRouter
实现,例如:
template.newMessage(msg)
.withProducerCustomizer((pb) -> pb.messageRouter(messageRouter))
.send();
请注意,在使用 |
此其他示例显示如何在向代理发布之前拦截和修改生成器接收到的消息时添加 ProducerInterceptor
:
template.newMessage(msg)
.withProducerCustomizer((pb) -> pb.intercept(interceptor))
.send();
自定义器仅适用于用于发送操作的生成器。如果您想将自定义器应用于所有生成器,则必须按照 Global producer customization 中所述将它们提供给生成器工厂。
使用 Lambda 定制器时,必须遵循 “Caution on Lambda customizers” 中描述的规则。
Specifying Schema Information
@6
Specifying Schema Information
如果你使用 Java 原语类型,框架会为你自动检测架构,且你不需要为发布数据指定任何架构类型。对于非原语类型,如果在 PulsarTemplate
上调用发送操作时未明确指定架构,Spring for Apache Pulsar 框架将尝试根据类型构建 Schema.JSON
。
当前支持的复杂模式类型包括 JSON、AVRO、PROTOBUF、AUTO_PRODUCE_BYTES 和 KEY_VALUE 以及 INLINE 编码。
Custom Schema Mapping
作为在 PulsarTemplate
上调用发送操作时为复杂类型指定架构的替代方案,可以将架构解析器配置为具有类型的映射。这消除了指定架构的需要,因为框架使用传出消息类型通过解析器进行查询。
@3
@4
有了此配置,无需设置发送操作时的架构。
Producing with AUTO_SCHEMA
如果没有办法提前了解 Pulsar 主题的架构类型,您可以使用 {apache-pulsar-docs}/schema-get-started/#auto_produce[AUTO_PRODUCE] 架构来安全地发布原始 JSON 或 Avro 负载作为 byte[]
。
在这种情况下,生成器会验证出站字节是否与目标主题的架构兼容。
只需在模板发送操作中指定 Schema.AUTO_PRODUCE_BYTES()
架构,如下例所示:
@7
这仅受 Avro 和 JSON 模式类型支持。 |
Pulsar Producer Factory
PulsarTemplate`依靠 `PulsarProducerFactory`来实际创建底层生成器。Spring Boot 自动配置还提供了此生成器工厂,您可以通过指定 {spring-boot-pulsar-config-props}[`spring.pulsar.producer.*
] 应用程序属性来进一步配置该工厂。
如果在直接使用生产者工厂 API 时未指定主题信息,则与 |
Global producer customization
该框架提供了 ProducerBuilderCustomizer
合约,允许您配置用于构建每个生成器的底层生成器。要自定义所有生成器,您可以将自定义器列表传递给`PulsarProducerFactory` 构造函数。使用多个自定义器时,它们将按照在列表中出现的顺序应用。
如果您使用 Spring Boot 自动配置,则可以将定制器指定为 Bean,并将根据其 |
如果您只想将自定义器应用于单个生成器,则可以使用 Fluent API 和 [在发送时指定自定义器,single-producer-customize]。
Pulsar Producer Caching
每个底层 Pulsar 生成器都会使用资源。为了提高性能并避免持续创建生成器,生成器工厂会缓存其创建的生成器。它们以 LRU 方式进行缓存,并在配置的时间段内未使用它们时将其逐出。 cache key仅包含足够的信息来确保在后续创建请求中向调用者返回相同的生成器。
此外,您可以通过指定 {spring-boot-pulsar-config-props}[spring.pulsar.producer.cache.*
] 应用程序属性来配置缓存设置。
Caution on Lambda customizers
任何用户提供的生产者自定义项也包含在高速缓存键中。由于高速缓存键依赖于 equals/hashCode
的有效实现,所以在使用 Lambda 自定义项时必须谨慎。
RULE: 作为 Lambda 实现的两个定制器将匹配在 equals/hashCode
if and only if 上,它们使用相同的 Lambda 实例,且不需要在封闭之外定义任何变量。
为了阐明上述规则,我们将看几个示例。在以下示例中,自定义项被定义为内联 Lambda,这意味着每次对 sendUser
的调用都使用相同的 Lambda 实例。此外,它不需要闭包之外的变量。因此,它*将*匹配为高速缓存键。
void sendUser() {
var user = randomUser();
template.newMessage(user)
.withTopic("user-topic")
.withProducerCustomizer((b) -> b.producerName("user"))
.send();
}
在此下一个案例中,自定义项被定义为内联 Lambda,这意味着每次对 sendUser
的调用都使用相同的 Lambda 实例。但是,它需要闭包之外的变量。因此,它*不会*匹配为高速缓存键。
void sendUser() {
var user = randomUser();
var name = randomName();
template.newMessage(user)
.withTopic("user-topic")
.withProducerCustomizer((b) -> b.producerName(name))
.send();
}
在最后这个示例中,自定义项被定义为内联 Lambda,意味着每次对 sendUser
的调用都使用相同的 Lambda 实例。虽然它确实使用变量名,但是它不是从其闭包外部产生的,因此*将*匹配为高速缓存键。这说明变量可以在 Lambda 闭包*内*使用,甚至可以对静态方法进行调用。
void sendUser() {
var user = randomUser();
template.newMessage(user)
.withTopic("user-topic")
.withProducerCustomizer((b) -> {
var name = SomeHelper.someStaticMethod();
b.producerName(name);
})
.send();
}
RULE: 如果没有 once and only once 定义您的 Lambda 定制器(在后续调用中会使用相同的实例) OR,则需要使用有效 equals/hashCode
实现提供定制器实现,且该实现必须在封闭之外定义变量。
如果未遵循这些规则,则生产者缓存将始终错过,并且您的应用程序性能将受到负面影响。
Intercept Messages on the Producer
添加 ProducerInterceptor
允许您拦截和更改在发布到代理之前由生产者接收到的消息。要这样做,您可以将拦截器列表传递到 PulsarTemplate
构造函数中。在使用多个拦截器时,将其应用的顺序是它们在列表中的出现顺序。
如果您使用 Spring Boot 自动配置,则可以将拦截器指定为 Bean。它们会自动传递到 PulsarTemplate
中。使用 @Order
注释按如下方式对拦截器排序:
@Bean
@Order(100)
ProducerInterceptor firstInterceptor() {
...
}
@Bean
@Order(200)
ProducerInterceptor secondInterceptor() {
...
}
如果您未使用该启动器,您将需要自己配置并注册上述组件。 |
Message Consumption
Pulsar Listener
当涉及到 Pulsar 消费者时,我们建议终端用户应用程序使用 PulsarListener
注释。要使用 PulsarListener
,您需要使用 @EnablePulsar
注释。当您使用 Spring Boot 支持时,它会自动启用此注释并配置 PulsarListener
所需的所有组件,例如消息侦听器基础设施(负责创建 Pulsar 消费者)。PulsarMessageListenerContainer
使用 PulsarConsumerFactory
创建和管理底层 Pulsar 消费者,该底层 Pulsar 消费者用来消费消息。
Spring Boot 提供此消费者工厂,您可以通过指定 {spring-boot-pulsar-config-props}[已配置属性的`spring.pulsar.consumer.`] application properties. *Most] 来进一步配置该工厂,在使用以下 *exceptions*的侦听器中尊重工厂中的属性:
|
|
让我们重新审视我们在快速浏览部分中看到的 PulsarListener
代码片段:
@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar")
public void listen(String message) {
System.out.println("Message Received: " + message);
}
您可以进一步简化此方法:
@PulsarListener
public void listen(String message) {
System.out.println("Message Received: " + message);
}
在此最基本的形式中,当 subscriptionName
未在 @PulsarListener
注释中提供时,将使用自动生成的订阅名称。同样,当未直接提供 topics
时,将使用 [主题解析过程,主题解析过程-命令式] 来确定目标主题。
在前面显示的 PulsarListener
方法中,我们以 String
的形式接收数据,但是我们未指定任何模式类型。在内部,该框架依赖于 Pulsar 的模式机制将数据转换为所需类型。该框架会检测到您期望 String
类型,然后根据该信息推断模式类型并将该模式提供给消费者。该框架对所有基本类型执行此推断。对于所有非基本类型,默认模式假定为 JSON。如果复杂类型使用除 JSON 以外的任何内容(例如 AVRO 或 KEY_VALUE),则必须在使用 schemaType
属性的注释中提供模式类型。
以下示例显示了另一个 PulsarListener
方法,它使用 Integer
:
@PulsarListener(subscriptionName = "my-subscription-1", topics = "my-topic-1")
public void listen(Integer message) {
System.out.println(message);
}
以下 PulsarListener
方法显示了我们如何从主题消费复杂类型:
@PulsarListener(subscriptionName = "my-subscription-2", topics = "my-topic-2", schemaType = SchemaType.JSON)
public void listen(Foo message) {
System.out.println(message);
}
让我们再看一些方法。
您可以直接消费 Pulsar 消息:
@PulsarListener(subscriptionName = "my-subscription", topics = "my-topic")
public void listen(org.apache.pulsar.client.api.Message<String> message) {
System.out.println(message.getValue());
}
以下示例使用 Spring 消息信封来消费记录:
@PulsarListener(subscriptionName = "my-subscription", topics = "my-topic")
public void listen(org.springframework.messaging.Message<String> message) {
System.out.println(message.getPayload());
}
现在让我们看看如何批量消费记录。以下示例使用 PulsarListener
以 POJO 的形式批量消费记录:
@PulsarListener(subscriptionName = "hello-batch-subscription", topics = "hello-batch", schemaType = SchemaType.JSON, batch = true)
public void listen(List<Foo> messages) {
System.out.println("records received :" + messages.size());
messages.forEach((message) -> System.out.println("record : " + message));
}
请注意,在此例中,我们接收记录作为对象的集合 (List
)。此外,为了在 PulsarListener
级别启用批量使用,需要将注解上的 batch
属性设置为 true
。
框架会根据 List
所保存的实际类型来尝试推断要使用的模式。如果 List
中包含 JSON 之外的复杂类型,则仍需要在 PulsarListener
上提供 schemaType
。
以下内容使用 Pulsar Java 客户端提供的 消息
封装:
@PulsarListener(subscriptionName = "hello-batch-subscription", topics = "hello-batch", schemaType = SchemaType.JSON, batch = true)
public void listen(List<Message<Foo>> messages) {
System.out.println("records received :" + messages.size());
messages.forEach((message) -> System.out.println("record : " + message.getValue()));
}
以下示例使用 Spring 消息 Message
类型的封装来批量使用记录:
@PulsarListener(subscriptionName = "hello-batch-subscription", topics = "hello-batch", schemaType = SchemaType.JSON, batch = true)
public void listen(List<org.springframework.messaging.Message<Foo>> messages) {
System.out.println("records received :" + messages.size());
messages.forEach((message) -> System.out.println("record : " + message.getPayload()));
}
最后,还可以针对批量监听器使用来自 Pulsar 的 Messages
持有者对象:
@PulsarListener(subscriptionName = "hello-batch-subscription", topics = "hello-batch", schemaType = SchemaType.JSON, batch = true)
public void listen(org.apache.pulsar.client.api.Messages<Foo>> messages) {
System.out.println("records received :" + messages.size());
messages.forEach((message) -> System.out.println("record : " + message.getValue()));
}
在使用 PulsarListener
时,可以在注解本身上直接提供 Pulsar 消费者属性。在不想使用前面提到的引导配置属性或有多个 PulsarListener
方法时,这是非常方便的。
以下示例直接在 PulsarListener
上使用 Pulsar 消费者属性:
@PulsarListener(properties = { "subscriptionName=subscription-1", "topicNames=foo-1", "receiverQueueSize=5000" })
void listen(String message) {
}
所使用的属性是直接 Pulsar 消费者属性,而不是 |
Generic records with AUTO_CONSUME
如果无法提前知晓 Pulsar 主题的模式类型,则可以使用 AUTO_CONSUME
模式类型来使用常规记录。在这种情况下,主题会使用与主题关联的模式信息将消息反序列化为 GenericRecord
对象。
若要使用常规记录,请在 @PulsarListener
上将 schemaType = SchemaType.AUTO_CONSUME
设置为以下所示内容,并且将类型为 GenericRecord
的 Pulsar 信息用作信息参数。
@PulsarListener(topics = "my-generic-topic", schemaType = SchemaType.AUTO_CONSUME)
void listen(org.apache.pulsar.client.api.Message<GenericRecord> message) {
GenericRecord record = message.getValue();
record.getFields().forEach((f) ->
System.out.printf("%s = %s%n", f.getName(), record.getField(f)));
}
|
Customizing the ConsumerBuilder
可以使用 PulsarListenerConsumerBuilderCustomizer
自定义任何可通过 ConsumerBuilder
获得的字段,方法是向其提供 PulsarListenerConsumerBuilderCustomizer
类型的 @Bean
, 然后将其提供给 PulsarListener
,如下所示。
@PulsarListener(topics = "hello-topic", consumerCustomizer = "myCustomizer")
public void listen(String message) {
System.out.println("Message Received: " + message);
}
@Bean
PulsarListenerConsumerBuilderCustomizer<String> myCustomizer() {
return (builder) -> builder.consumerName("myConsumer");
}
如果您的应用程序只有一个 |
@1
Accessing the Pulsar Consumer Object
有时,您需要直接访问 Pulsar Consumer 对象。以下示例展示了如何获取它:
@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar")
public void listen(String message, org.apache.pulsar.client.api.Consumer<String> consumer) {
System.out.println("Message Received: " + message);
ConsumerStats stats = consumer.getStats();
...
}
当以这种方式访问 Consumer
对象时,不要调用任何会通过调用任何接收方法来更改消费者游标位置的操作。所有此类操作都必须由容器完成。
Pulsar Message Listener Container
现在,我们通过 PulsarListener
查看了消费者端的基本交互。现在,让我们深入了解 PulsarListener
如何与底层 Pulsar 消费者交互。请记住,对于最终用户应用程序,大多数情况下,我们建议直接使用 PulsarListener
注释从 Pulsar 主题使用 Spring for Apache Pulsar 进行使用,因为该模型涵盖了广泛的应用程序用例。但是,了解 PulsarListener
在内部如何工作非常重要。本部分将介绍这些详细信息。
正如前面简要提到的,当您使用 Spring for Apache Pulsar 时,消息监听器容器是消息使用核心。PulsarListener
在后台使用消息监听器容器来创建和管理 Pulsar 消费者。Spring for Apache Pulsar 通过 PulsarMessageListenerContainer
提供此消息监听器容器的合同。此消息监听器容器的默认实现通过 DefaultPulsarMessageListenerContainer
提供。正如它的名称所示,PulsarMessageListenerContainer
包含消息监听器。容器创建 Pulsar 消费者,然后运行一个单独的线程来接收和处理数据。数据由所提供的消息监听器实现进行处理。
消息监听器容器使用消费者的 batchReceive
方法批量使用数据。接收到数据后,会将其传递给选定的消息监听器实现。
在使用 Spring for Apache Pulsar 时,可以使用以下消息监听器类型。
我们将在以下部分中了解有关这些各种消息监听器的详细信息。
但是,在这样做之前,让我们仔细了解容器本身。
DefaultPulsarMessageListenerContainer
这是一个基于单个消费者的消息侦听器容器。以下清单显示了其构造函数:
public DefaultPulsarMessageListenerContainer(PulsarConsumerFactory<? super T> pulsarConsumerFactory,
PulsarContainerProperties pulsarContainerProperties)
}
它接收 PulsarConsumerFactory
(用于创建消费者)和 PulsarContainerProperties
对象(其中包含有关容器属性的信息)。PulsarContainerProperties
具有以下构造函数:
public PulsarContainerProperties(String... topics)
public PulsarContainerProperties(Pattern topicPattern)
您可以通过 PulsarContainerProperties
提供主题信息,或作为传输到消费者工厂的消费者属性。以下示例使用 DefaultPulsarMessageListenerContainer
:
Map<String, Object> config = new HashMap<>();
config.put("topics", "my-topic");
PulsarConsumerFactory<String> pulsarConsumerFactorY = DefaultPulsarConsumerFactory<>(pulsarClient, config);
PulsarContainerProperties pulsarContainerProperties = new PulsarContainerProperties();
pulsarContainerProperties.setMessageListener((PulsarRecordMessageListener<?>) (consumer, msg) -> {
});
DefaultPulsarMessageListenerContainer<String> pulsarListenerContainer = new DefaultPulsarMessageListenerContainer(pulsarConsumerFacotyr,
pulsarContainerProperties);
return pulsarListenerContainer;
当使用监听器容器直接指定主题信息时,同一个 topic resolution process 由 |
DefaultPulsarMessageListenerContainer
仅创建单个消费者。如果您希望有多个消费者通过多个线程进行管理,则需要使用 ConcurrentPulsarMessageListenerContainer
。
ConcurrentPulsarMessageListenerContainer
ConcurrentPulsarMessageListenerContainer
具有以下构造函数:
public ConcurrentPulsarMessageListenerContainer(PulsarConsumerFactory<? super T> pulsarConsumerFactory,
PulsarContainerProperties pulsarContainerProperties)
ConcurrentPulsarMessageListenerContainer
允许您通过 setter 指定 concurrency
属性。仅允许非独占订阅(failover
、shared
和 key-shared
)中的并发大于 1
。当您具有独占订阅模式时,只能将并发设置为默认的 1
。
以下示例通过 PulsarListener
注释为 failover
订阅启用了 concurrency
。
@PulsarListener(topics = "my-topic", subscriptionName = "subscription-1",
subscriptionType = SubscriptionType.Failover, concurrency = "3")
void listen(String message, Consumer<String> consumer) {
...
System.out.println("Current Thread: " + Thread.currentThread().getName());
System.out.println("Current Consumer: " + consumer.getConsumerName());
}
在前述侦听器中,假设主题 my-topic
有三个分区。如果它是一个非分区主题,则将并发设置为 3
无效。除了主要的活动消费者之外,您还会有两个空闲消费者。如果主题有三个以上的分区,则消息将在容器创建的消费者之间进行负载均衡。如果您运行此 PulsarListener
,您会看到来自不同分区的消息是通过不同的消费者使用前述示例中的线程名称和消费者名称输出消耗的。
当您以这种方式在分区主题上使用 |
以下清单展示了 PulsarListener
的另一个示例,但启用了 Shared
订阅和 concurrency
。
@PulsarListener(topics = "my-topic", subscriptionName = "subscription-1",
subscriptionType = SubscriptionType.Shared, concurrency = "5")
void listen(String message) {
...
}
在前述示例中,PulsarListener
创建五个不同的消费者(这次我们假设主题有五个分区)。
在这个版本中,没有消息排序,因为 |
如果您需要消息排序并且仍然需要共享订阅类型,则需要使用 Key_Shared
订阅类型。
Message Consumption
让我们看看消息侦听器容器如何同时启用单记录和基于批处理的消息消耗。
让我们重新审视一下我们为此次讨论而提供的基本 PulsarListener
:
@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar")
public void listen(String message) {
System.out.println("Message Received: " + message);
}
使用此 PulsarListener
方法,我们实际上要求 Spring for Apache Pulsar 每一次使用单条记录来调用侦听器方法。我们提到消息侦听器容器使用消费者的 batchReceive
方法以批处理方式使用数据。在此情形下,框架检测到 PulsarListener
接收单条记录。这意味着在每次调用此方法时,它需要一条记录。虽然消息侦听器容器以批处理方式使用记录,但它会遍历已收到的批处理并通过 PulsarRecordMessageListener
适配器调用侦听器方法。如您在前一节所见,PulsarRecordMessageListener
扩展自 Pulsar Java 客户端提供的 MessageListener
,并且支持基本 received
方法。
以下示例显示了 PulsarListener
以批处理方式使用记录:
@PulsarListener(subscriptionName = "hello-batch-subscription", topics = "hello-batch", schemaType = SchemaType.JSON, batch = true)
public void listen4(List<Foo> messages) {
System.out.println("records received :" + messages.size());
messages.forEach((message) -> System.out.println("record : " + message));
}
当您使用此类型的 PulsarListener
时,框架会检测到您处于批处理模式。因为它已经使用消费者的 batchReceive
方法以批处理方式接收数据,所以它将整个批处理通过 PulsarBatchMessageListener
适配器传递到侦听器方法。
Pulsar Headers
Pulsar 消息元数据可以作为 Spring 消息头被使用。可用的标头列表可以在{github}/blob/main/spring-pulsar/src/main/java/org/springframework/pulsar/support/PulsarHeaders.java[PulsarHeaders.java]中找到。
Accessing in Single Record based Consumer
以下示例展示了如何在一使用单条记录使用模式的应用程序中访问各种 Pulsar 头:
@PulsarListener(topics = "simpleListenerWithHeaders")
void simpleListenerWithHeaders(String data, @Header(PulsarHeaders.MESSAGE_ID) MessageId messageId,
@Header(PulsarHeaders.RAW_DATA) byte[] rawData,
@Header("foo") String foo) {
}
在前述示例中,我们访问 messageId
和 rawData
消息元数据的值,以及一个名为 foo
的自定义消息属性。Spring @Header
注释用于每个头字段。
您还可以将 Pulsar 的 Message
用作承载负载的信封。在此过程中,用户可以直接调用 Pulsar 消息上的对应方法来检索元数据。但是,为了方便起见,您也可以使用 Header
注释来检索它。请注意,您还可以使用 Spring 消息 Message
信封来承载负载,然后通过 @Header
检索 Pulsar 头。
Accessing in Batch Record based Consumer
在本节中,我们了解如何在使用批处理消费者的应用程序中访问各种 Pulsar 头部:
@PulsarListener(topics = "simpleBatchListenerWithHeaders", batch = true)
void simpleBatchListenerWithHeaders(List<String> data,
@Header(PulsarHeaders.MESSAGE_ID) List<MessageId> messageIds,
@Header(PulsarHeaders.TOPIC_NAME) List<String> topicNames, @Header("foo") List<String> fooValues) {
}
在前一个示例中,我们以 List<String>
的形式消费数据。在提取各种头部时,我们也以 List<>
的形式进行提取。针对 Apache Pulsar 的 Spring 确保头部列表与数据列表相对应。
当使用批处理侦听器并将有效负载接收为 List<org.apache.pulsar.client.api.Message<?>
, org.apache.pulsar.client.api.Messages<?>
, 或 org.springframework.messaging.Messsge<?>
时,你也可以以相同的方式提取头部。
Message Acknowledgment
当使用针对 Apache Pulsar 的 Spring 时,除非应用程序选择退出,否则消息应答由框架处理。在本节中,我们详细了解框架如何处理消息应答。
Message ACK modes
针对 Apache Pulsar 的 Spring 提供以下消息应答模式:
-
BATCH
-
RECORD
-
MANUAL
BATCH
应答模式为默认模式,但你可以在消息侦听器容器上对其进行更改。在以下部分中,我们将了解在同时使用 PulsarListener
的单版本和批处理版本时应答如何运作,以及它们如何转换为支持的消息侦听器容器(最终转换为 Pulsar 消费者)。
Automatic Message Ack in Single Record Mode
我们回顾一下基于单个基本消息的 PulsarListener
:
@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar")
public void listen(String message) {
System.out.println("Message Received: " + message);
}
如果你熟悉直接使用 Pulsar 消费者,那么自然会好奇在使用 PulsarListener
时应答如何运作。答案归结为消息侦听器容器,因为它是针对 Apache Pulsar 的 Spring 中协调所有消费者相关活动的核心位置。
假设你没有覆盖默认行为,则在使用上述 PulsarListener
时幕后将发生以下情况:
-
首先,监听器容器从 Pulsar 消费者接收批处理消息。
-
接收的消息一次一条地交付给
PulsarListener
。 -
当所有记录都已交付给监听器方法并已成功处理时,容器会确认原始批处理中的所有消息。
这是正常流程。如果原始批处理中的任何记录抛出异常,针对 Apache Pulsar 的 Spring 将分别跟踪这些记录。在批处理中的所有记录都经过处理后,针对 Apache Pulsar 的 Spring 将应答所有成功消息并对所有失败消息进行否定应答 (nack)。换句话说,在使用 PulsarRecordMessageListener
消费单个记录且使用 BATCH
的默认应答模式时,该框架将等待从 batchReceive
调用接收的所有记录都成功处理,然后在 Pulsar 消费者上调用 acknowledge
方法。如果在调用处理程序方法时任何特定记录抛出异常,针对 Apache Pulsar 的 Spring 将跟踪这些记录并在处理整个批处理后对这些记录分别调用 negativeAcknowledge
。
如果应用程序希望应答或否定应答按记录发生,则可以启用 RECORD
应答模式。在这种情况下,在处理每条记录后,如果没有错误则应答消息,如果有错误则否定应答。以下示例在 Pulsar 侦听器上启用 RECORD
应答模式:
@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar", ackMode = AckMode.RECORD)
public void listen(String message) {
System.out.println("Message Received: " + message);
}
Manual Message Ack in Single Record Mode
你可能并不总是希望框架发送应答,而希望直接从应用程序本身发送应答。针对 Apache Pulsar 的 Spring 提供了几种启用手动消息应答的方法。以下示例展示了其中一种方法:
@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar", ackMode = AckMode.MANUAL)
public void listen(Message<String> message, Acknowledgment acknowledgment) {
System.out.println("Message Received: " + message.getValue());
acknowledgment.acknowledge();
}
这里有一些事情值得解释。首先,我们通过在 PulsarListener
上设置 ackMode
来启用手动应答模式。启用手动应答模式时,针对 Apache Pulsar 的 Spring 允许应用程序注入一个 Acknowledgment
对象。该框架通过选择兼容的消息侦听器容器来实现这一目标:基于单个记录的消费的 PulsarAcknowledgingMessageListener
,它允许你访问 Acknowledgment
对象。
Acknowledgment
对象提供以下 API 方法:
void acknowledge();
void acknowledge(MessageId messageId);
void acknowledge(List<MessageId> messageIds);
void nack();
void nack(MessageId messageId);
在使用 MANUAL
应答模式的同时,你可以将此 Acknowledgment
对象注入到你的 PulsarListener
中,然后调用相应的方法之一。
在上述 PulsarListener
示例中,我们调用了一个无参数的 acknowledge
方法。这是因为框架知道它当前在哪个 Message
下操作。在调用 acknowledge()
时,你不必通过 Message
信封接收有效负载,而应使用目标类型——在本例中为 String
。你还可以通过提供消息 ID 来调用 acknowledge
的不同变体:acknowledge.acknowledge(message.getMessageId());`在使用 `acknowledge(messageId)
时,你必须使用 Message<?>
信封接收有效负载。
与应答类似,Acknowledgment
API 也提供否定应答选项。请参阅前面显示的 nack 方法。
你也可以直接在 Pulsar 消费者上调用 acknowledge
:
@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar", ackMode = AckMode.MANUAL)
public void listen(Message<String> message, Consumer<String> consumer) {
System.out.println("Message Received: " + message.getValue());
try {
consumer.acknowledge(message);
}
catch (Exception e) {
....
}
}
在直接在基础消费者上调用 acknowledge
时,你需要自己进行错误处理。使用 Acknowledgment
不需要这样做,因为框架可以为你完成此操作。因此,在使用手动应答时,你应该使用 Acknowledgment
对象方法。
当使用手动确认时,理解框架完全保留所有确认非常重要。因此,在设计应用程序时,仔细考虑正确的确认策略非常重要。
Automatic Message Ack in Batch Consumption
当以批处理形式消费记录(请参阅“Message ACK modes”)且使用 BATCH
的默认应答模式时,在成功处理整个批处理后,将对整个批处理进行应答。如果任何记录抛出异常,将对整个批处理进行否定应答。请注意,这可能与在生产者端批处理的批处理不同。相反,这是从在消费者上调用 batchReceive
返回的批处理
考虑以下批处理侦听器:
@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar", batch = true)
public void batchListen(List<Foo> messages) {
for (Foo foo : messages) {
...
}
}
当传入集合(在本例中为消息)中的所有消息均处理完毕后,框架将确认所有消息。
在批处理模式中使用时,“RECORD”不是允许的确认模式。这可能会引起问题,因为应用程序可能不希望整个批处理再次重新传达。在这种情况下,您需要使用`MANUAL`确认模式。
Manual Message Ack in Batch Consumption
如在上一节中所见,当“MANUAL”确认模式在消息侦听器容器上设置时,框架不会执行任何确认,无论为正还是负。它完全取决于应用程序来处理此类问题。当设置`MANUAL`确认模式时,Spring for Apache Pulsar 会选择一个兼容的消息侦听器容器:`PulsarBatchAcknowledgingMessageListener`用于批处理使用,它允许您访问“确认”对象。以下是“确认”API 中提供的方法:
void acknowledge();
void acknowledge(MessageId messageId);
void acknowledge(List<MessageId> messageIds);
void nack();
void nack(MessageId messageId);
您可以将此“确认”对象注入到`PulsarListener`中,同时使用`MANUAL`确认模式。以下清单显示了基于批处理的侦听器的基本示例:
@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar")
public void listen(List<Message<String>> messgaes, Acknowlegement acknowledgment) {
for (Message<String> message : messages) {
try {
...
acknowledgment.acknowledge(message.getMessageId());
}
catch (Exception e) {
acknowledgment.nack(message.getMessageId());
}
}
}
当您使用批处理侦听器时,消息侦听器容器无法得知它当前正在对哪条记录执行操作。因此,若要手动确认,您需要使用接受`MessageId`或`List<MessageId>`的重载`acknowledge`方法。您还可以对批处理侦听器的`MessageId`进行否定确认。
Message Redelivery and Error Handling
现在我们已经了解了`PulsarListener`和消息侦听器容器基础架构及其各种功能,接下来让我们尝试理解消息重新投递和错误处理。Apache Pulsar 为消息重新投递和错误处理提供了各种本地策略。我们将仔细观察这些策略,并了解如何通过 Spring for Apache Pulsar 使用它们。
Specifying Acknowledgment Timeout for Message Redelivery
默认情况下,Pulsar 消费者不会重新投递消息,除非消费者崩溃,但您可以通过在 Pulsar 消费者上设置确认超时来更改此行为。如果确认超时属性的值大于零,并且 Pulsar 消费者在该超时时间内未确认消息,则会重新投递消息。
当您使用 Spring for Apache Pulsar 时,您可以通过 [消费者自定义程序,在 PulsarListener 中进行消费者自定义]或使用`@PulsarListener`的`properties`属性中的本地 Pulsar`ackTimeout`属性来设置此属性:
@PulsarListener(subscriptionName = "subscription-1", topics = "topic-1"
properties = {"ackTimeout=60s"})
public void listen(String s) {
...
}
如果您指定确认超时,如果消费者在 60 秒内未发送确认,Pulsar 会将消息重新投递给消费者。
如果您想为具有不同延迟的确认超时指定一些高级回退选项,可以执行以下操作:
@EnablePulsar
@Configuration
class AckTimeoutRedeliveryConfig {
@PulsarListener(subscriptionName = "withAckTimeoutRedeliveryBackoffSubscription",
topics = "withAckTimeoutRedeliveryBackoff-test-topic",
ackTimeoutRedeliveryBackoff = "ackTimeoutRedeliveryBackoff",
properties = { "ackTimeout=60s" })
void listen(String msg) {
// some long-running process that may cause an ack timeout
}
@Bean
RedeliveryBackoff ackTimeoutRedeliveryBackoff() {
return MultiplierRedeliveryBackoff.builder().minDelayMs(1000).maxDelayMs(10 * 1000).multiplier(2)
.build();
}
}
在前面的示例中,我们使用 Pulsar 的`RedeliveryBackoff`指定一个 Bean,该 Bean 具有 1 秒的最小延迟、10 秒的最大延迟和 2 的回退倍数。在初始确认超时发生后,消息重新投递将通过此回退 Bean 进行控制。我们通过将`ackTimeoutRedeliveryBackoff`属性设置为实际 Bean 名称(在本例中为`ackTimeoutRedeliveryBackoff`)将回退 Bean 提供给`PulsarListener`注释。
Specifying Negative Acknowledgment Redelivery
在否定确认时,Pulsar 消费者允许您指定应用程序希望如何重新投递消息。默认情况下,消息将在 1 分钟内重新投递,但您可以通过 [消费者自定义程序,在 PulsarListener 中进行消费者自定义]或使用`@PulsarListener`的`properties`属性中的本地 Pulsar `negativeAckRedeliveryDelay`属性来更改默认设置:
@PulsarListener(subscriptionName = "subscription-1", topics = "topic-1"
properties = {"negativeAckRedeliveryDelay=10ms"})
public void listen(String s) {
...
}
您还可以使用回退倍数指定不同的延迟和回退机制,方法是提供一个`RedeliveryBackoff` Bean,并像下面这样将 Bean 名称作为 PulsarProducer 上的`negativeAckRedeliveryBackoff`属性提供:
@EnablePulsar
@Configuration
class NegativeAckRedeliveryConfig {
@PulsarListener(subscriptionName = "withNegRedeliveryBackoffSubscription",
topics = "withNegRedeliveryBackoff-test-topic", negativeAckRedeliveryBackoff = "redeliveryBackoff",
subscriptionType = SubscriptionType.Shared)
void listen(String msg) {
throw new RuntimeException("fail " + msg);
}
@Bean
RedeliveryBackoff redeliveryBackoff() {
return MultiplierRedeliveryBackoff.builder().minDelayMs(1000).maxDelayMs(10 * 1000).multiplier(2)
.build();
}
}
Using Dead Letter Topic from Apache Pulsar for Message Redelivery and Error Handling
Apache Pulsar 允许应用程序在具有`Shared`订阅类型的消费者上使用死信主题。对于`Exclusive`和`Failover`订阅类型,此功能不可用。基本思想是,如果某条消息被重试了一定次数(可能是由于确认超时或 Nack 重新投递),一旦重试次数用尽,消息就可以发送到一个名为死信队列 (DLQ) 的特殊主题中。让我们通过检查一些代码段来了解该功能的一些细节:
@EnablePulsar
@Configuration
class DeadLetterPolicyConfig {
@PulsarListener(id = "deadLetterPolicyListener", subscriptionName = "deadLetterPolicySubscription",
topics = "topic-with-dlp", deadLetterPolicy = "deadLetterPolicy",
subscriptionType = SubscriptionType.Shared, properties = { "ackTimeout=1s" })
void listen(String msg) {
throw new RuntimeException("fail " + msg);
}
@PulsarListener(id = "dlqListener", topics = "my-dlq-topic")
void listenDlq(String msg) {
System.out.println("From DLQ: " + msg);
}
@Bean
DeadLetterPolicy deadLetterPolicy() {
return DeadLetterPolicy.builder().maxRedeliverCount(10).deadLetterTopic("my-dlq-topic").build();
}
}
首先,我们有一个用于`DeadLetterPolicy`的特殊 Bean,它被命名为`deadLetterPolicy`(可以是您想要的任何名称)。此 Bean 指定许多内容,例如最大传递次数(在本例中为 10)和死信主题的名称(在本例中为`my-dlq-topic`)。如果您不指定 DLQ 主题名称,则在 Pulsar 中默认为 <topicname>-<subscriptionname>-DLQ
。接下来,我们通过设置`deadLetterPolicy`属性将此 Bean 名称提供给`PulsarListener`。请注意,PulsarListener`的订阅类型为`Shared
,因为 DLQ 功能仅适用于共享订阅。此代码主要用于演示目的,因此我们提供了 1 秒的`ackTimeout`值。这个想法是代码抛出异常,如果 Pulsar 在 1 秒内没有收到确认,就会重试。如果该循环持续十次(这是我们在`DeadLetterPolicy`中的最大重新投递次数),Pulsar 消费者会将消息发布到 DLQ 主题。我们有另一个`PulsarListener`侦听 DLQ 主题,以便在数据发布到 DLQ 主题时接收数据。
如果主主题被分区,在后台,Pulsar 将每个分区视为单独主题。Pulsar 会将`partition-<n>`附加到主主题名称,其中`n`代表分区号。问题在于,如果您没有指定 DLQ 主题(与我们上面所做相反),Pulsar 会发布到默认主题名称,其中包含此 `partition-<n>
信息——例如:topic-with-dlp-partition-0-deadLetterPolicySubscription-DLQ
。解决此问题的简单方法是始终提供 DLQ 主题名称。
Native Error Handling in Spring for Apache Pulsar
正如我们前面提到的,Apache Pulsar 中的 DLQ 功能仅适用于共享订阅。如果应用程序需要对非共享订阅使用一些类似功能,该怎么办?Pulsar 不支持在独占和故障转移订阅上使用 DLQ 的主要原因是这些订阅类型有顺序保证。允许重新投递、DLQ 等实际上是以无序的方式接收消息。但是,如果应用程序对此没有意见,但更重要的是,是否需要非共享订阅的此 DLQ 功能?为此,Spring for Apache Pulsar 提供了`PulsarConsumerErrorHandler`,您可以在 Pulsar 中的任何订阅类型(Exclusive
、Failover
、Shared`或`Key_Shared
)中使用它:
当您从 Spring for Apache Pulsar 使用`PulsarConsumerErrorHandler`时,请确保不要在侦听器上设置确认超时属性。
让我们看看通过检查一些代码段来了解一些详细信息:
@EnablePulsar
@Configuration
class PulsarConsumerErrorHandlerConfig {
@Bean
PulsarConsumerErrorHandler<String> pulsarConsumerErrorHandler(
PulsarTemplate<String> pulsarTemplate) {
return new DefaultPulsarConsumerErrorHandler<>(
new PulsarDeadLetterPublishingRecoverer<>(pulsarTemplate, (c, m) -> "my-foo-dlt"), new FixedBackOff(100, 10));
}
@PulsarListener(id = "pulsarConsumerErrorHandler-id", subscriptionName = "pulsatConsumerErrorHandler-subscription",
topics = "pulsarConsumerErrorHandler-topic",
pulsarConsumerErrorHandler = "pulsarConsumerErrorHandler")
void listen(String msg) {
throw new RuntimeException("fail " + msg);
}
@PulsarListener(id = "pceh-dltListener", topics = "my-foo-dlt")
void listenDlt(String msg) {
System.out.println("From DLT: " + msg);
}
}
考虑 pulsarConsumerErrorHandler
bean。这创建了一个 PulsarConsumerErrorHandler
类型的 bean,并使用 Spring 为 Apache Pulsar 提供的开箱即用的默认实现:DefaultPulsarConsumerErrorHandler
。DefaultPulsarConsumerErrorHandler
有一个构造函数,它获取一个 PulsarMessageRecovererFactory
和一个 org.springframework.util.backoff.Backoff
。PulsarMessageRecovererFactory
是一个具有以下 API 的函数式接口:
@FunctionalInterface
public interface PulsarMessageRecovererFactory<T> {
/**
* Provides a message recoverer {@link PulsarMessageRecoverer}.
* @param consumer Pulsar consumer
* @return {@link PulsarMessageRecoverer}.
*/
PulsarMessageRecoverer<T> recovererForConsumer(Consumer<T> consumer);
}
recovererForConsumer
方法获取一个 Pulsar 消费者并返回一个 PulsarMessageRecoverer
,这是一个另外的函数式接口。以下是 PulsarMessageRecoverer
的 API:
public interface PulsarMessageRecoverer<T> {
/**
* Recover a failed message, for e.g. send the message to a DLT.
* @param message Pulsar message
* @param exception exception from failed message
*/
void recoverMessage(Message<T> message, Exception exception);
}
Spring for Apache Pulsar 为 PulsarMessageRecovererFactory
提供了一个实现,称为 PulsarDeadLetterPublishingRecoverer
,该实现提供了一个默认实现,可以通过将其发送到死信主题 (DLT) 来恢复消息。我们将此实现提供给前面 DefaultPulsarConsumerErrorHandler
的构造函数。作为第二个参数,我们提供一个 FixedBackOff
。你还可以提供 Spring 提供的 ExponentialBackoff
以获取高级后退功能。然后,我们将此 bean 名称提供给 PulsarConsumerErrorHandler
,作为 PulsarListener
的属性。该属性称为 pulsarConsumerErrorHandler
。每次 PulsarListener
方法因消息而失败时,它都会被重试。重试次数由提供的 Backoff
实现值控制。在我们的示例中,我们执行了 10 次重试(总共 11 次尝试——第一次和随后 10 次重试)。一旦耗尽所有重试,消息将被发送到 DLT 主题。
我们提供的 PulsarDeadLetterPublishingRecoverer
实现使用 PulsarTemplate
,该模板用于将消息发布到 DLT。在大多数情况下,来自 Spring Boot 的相同自动配置 PulsarTemplate
就足够了,对于分区主题有一个警告。当使用分区主题以及对主主题使用自定义消息路由时,你必须使用不采用自动配置的 PulsarTemplate
,其使用带有 message-routing-mode
值 custompartition
的 PulsarProducerFactory
。你可以使用带有以下蓝图的 PulsarConsumerErrorHandler
:
@Bean
PulsarConsumerErrorHandler<Integer> pulsarConsumerErrorHandler(PulsarClient pulsarClient) {
PulsarProducerFactory<Integer> pulsarProducerFactory = new DefaultPulsarProducerFactory<>(pulsarClient, Map.of());
PulsarTemplate<Integer> pulsarTemplate = new PulsarTemplate<>(pulsarProducerFactory);
BiFunction<Consumer<?>, Message<?>, String> destinationResolver =
(c, m) -> "my-foo-dlt";
PulsarDeadLetterPublishingRecoverer<Integer> pulsarDeadLetterPublishingRecoverer =
new PulsarDeadLetterPublishingRecoverer<>(pulsarTemplate, destinationResolver);
return new DefaultPulsarConsumerErrorHandler<>(pulsarDeadLetterPublishingRecoverer,
new FixedBackOff(100, 5));
}
请注意,我们要将目标解析程序作为第二个构造函数参数提供给 PulsarDeadLetterPublishingRecoverer
。如果没有提供,PulsarDeadLetterPublishingRecoverer
将 <subscription-name>-<topic-name>-DLT>
用作 DLT 主题名称。使用此功能时,你应当通过设置目标解析程序来使用一个合适的目标名称,而不是使用默认值。
当使用单记录消息侦听器时,就像我们在 PulsarConsumerErrorHnadler
中所做的那样,并且如果你使用手动确认,请务必确保在引发异常时不否定确认消息。相反,将异常重新抛回容器。否则,容器认为该消息被单独处理,并且不会触发错误处理。
最后,我们有一个第二个 PulsarListener
,它从 DLT 主题接收消息。
在本节中提供的示例中,我们只看到了如何将 PulsarConsumerErrorHandler
与一个单记录消息侦听器配合使用。接下来,我们看看如何在批处理侦听器上使用它。
Batch listener with PulsarConsumerErrorHandler
首先,让我们看看一个批处理 PulsarListener
方法:
@PulsarListener(subscriptionName = "batch-demo-5-sub", topics = "batch-demo-4", batch = true, concurrency = "3",
subscriptionType = SubscriptionType.Failover,
pulsarConsumerErrorHandler = "pulsarConsumerErrorHandler", ackMode = AckMode.MANUAL)
void listen(List<Message<Integer>> data, Consumer<Integer> consumer, Acknowledgment acknowledgment) {
for (Message<Integer> datum : data) {
if (datum.getValue() == 5) {
throw new PulsarBatchListenerFailedException("failed", datum);
}
acknowledgement.acknowledge(datum.getMessageId());
}
}
@Bean
PulsarConsumerErrorHandler<String> pulsarConsumerErrorHandler(
PulsarTemplate<String> pulsarTemplate) {
return new DefaultPulsarConsumerErrorHandler<>(
new PulsarDeadLetterPublishingRecoverer<>(pulsarTemplate, (c, m) -> "my-foo-dlt"), new FixedBackOff(100, 10));
}
@PulsarListener(subscriptionName = "my-dlt-subscription", topics = "my-foo-dlt")
void dltReceiver(Message<Integer> message) {
System.out.println("DLT - RECEIVED: " + message.getValue());
}
我们再次用 PulsarConsumerErrorHandler
bean 名称提供 pulsarConsumerErrorHandler
属性。当你使用批处理侦听器(如前面示例中所示)并希望使用 Spring for Apache Pulsar 的 PulsarConsumerErrorHandler
时,你需要使用手动确认。通过这种方式,你可以确认所有成功的个别消息。对于失败的消息,你必须使用失败的消息抛出 PulsarBatchListenerFailedException
。没有此异常,框架不知道如何处理失败。在重试时,容器会发送一批新的消息,从失败的消息开始发送给侦听器。如果再次失败,它将被重试,直到重试用尽为止,此时消息将被发送到 DLT。那时,该消息将被容器确认,侦听器将移交给原始批处理中的后续消息。
Consumer Customization on PulsarListener
Spring for Apache Pulsar 提供了一种便捷的方法来定制 PulsarListener
使用的容器创建的消费者。应用程序可以为 PulsarListenerConsumerBuilderCustomizer
提供一个 bean。这里是一个示例。
@Bean
public PulsarListenerConsumerBuilderCustomizer<String> myCustomizer() {
return cb -> {
cb.subscriptionName("modified-subscription-name");
};
}
然后,此自定义器 bean 名称可以像下面所示作为 PuslarListener
注解上的属性提供。
@PulsarListener(subscriptionName = "my-subscription",
topics = "my-topic", consumerCustomizer = "myCustomizer")
void listen(String message) {
}
框架通过 PulsarListener
检测提供的 bean,并在创建 Pulsar 消费者之前对 Consumer 构建器应用此自定义器。
如果你有多个 PulsarListener
方法,并且它们各自具有不同的自定义规则,你应该创建多个自定义器 bean,并将适当的自定义器附加到每个 PulsarListener
。
Pausing and Resuming Message Listener Containers
在某些情况下,应用程序可能希望暂时暂停消息消费,然后稍后再恢复。Spring for Apache Pulsar 提供了暂停和恢复基础消息侦听器容器的能力。当 Pulsar 消息侦听器容器被暂停时,容器执行的任何轮询以从 Pulsar 消费者接收数据都将被暂停。类似地,当容器恢复时,如果暂停期间主题有任何新的记录添加,则下一个轮询开始返回数据。
要暂停或恢复侦听器容器,首先通过 PulsarListenerEndpointRegistry
bean 获取容器实例,然后在容器实例上调用暂停/恢复 API - 如下面的代码段所示:
@Autowired
private PulsarListenerEndpointRegistry registry;
void someMethod() {
PulsarMessageListenerContainer container = registry.getListenerContainer("my-listener-id");
container.pause();
}
传递给 |
Pulsar Reader Support
此框架提供支持,以通过 `PulsarReaderFactory`使用 {apache-pulsar-docs}/concepts-clients/#reader-interface[Pulsar Reader]。
Spring Boot 提供此读者工厂,您可以通过指定 {spring-boot-pulsar-config-props}[spring.pulsar.reader.*
] 应用程序属性来进一步配置该工厂。
PulsarReader Annotation
虽然可以直接使用 PulsarReaderFactory
,但 Spring for Apache Pulsar 提供了 PulsarReader
注解,你可以使用它来从一个主题快速读取,而无需自己设置任何读取器工厂。这与 PulsarListener
背后的想法类似。这是一个快速示例。
@PulsarReader(id = "reader-demo-id", topics = "reader-demo-topic", startMessageId = "earliest")
void read(String message) {
//...
}
id
属性是可选的,但最好为你的应用程序提供一个有意义的值。如果未指定,将使用自动生成的 ID。另一方面,topics
和 startMessageId
属性是必需的。topics
属性可以是一个单独的主题或一个以逗号分隔的主题列表。startMessageId
属性指示读取器从主题中的特定消息开始。startMessageId
的有效值为 earliest
或 latest
。假设你希望读取器从主题中的任意消息开始,而不是最早或最新的可用消息。在这种情况下,你需要使用 ReaderBuilderCustomizer
来定制 ReaderBuilder
,以便它知道要从哪个正确的 MessageId
开始。
Customizing the ReaderBuilder
你可以在 Spring for Apache Pulsar 中使用 PulsarReaderReaderBuilderCustomizer
来自定义通过 ReaderBuilder
可用的任何字段。你可以提供一个 PulsarReaderReaderBuilderCustomizer
类型的 @Bean
,然后如下使其对 PulsarReader
可用。
@PulsarReader(id = "reader-customizer-demo-id", topics = "reader-customizer-demo-topic",
readerCustomizer = "myCustomizer")
void read(String message) {
//...
}
@Bean
public PulsarReaderReaderBuilderCustomizer<String> myCustomizer() {
return readerBuilder -> {
readerBuilder.startMessageId(messageId); // the first message read is after this message id.
// Any other customizations on the readerBuilder
};
}
如果您的应用程序中仅注册有一个 |
Publishing and Consuming Partitioned Topics
在下面的示例中,我们发布到名为 hello-pulsar-partitioned
的主题。这是一个已分区的主题,并且在这个示例中,我们假设该主题已使用三个分区创建。
@SpringBootApplication
public class PulsarBootPartitioned {
public static void main(String[] args) {
SpringApplication.run(PulsarBootPartitioned.class, "--spring.pulsar.producer.message-routing-mode=CustomPartition");
}
@Bean
public ApplicationRunner runner(PulsarTemplate<String> pulsarTemplate) {
pulsarTemplate.setDefaultTopicName("hello-pulsar-partitioned");
return args -> {
for (int i = 0; i < 10; i++) {
pulsarTemplate.sendAsync("hello john doe 0 ", new FooRouter());
pulsarTemplate.sendAsync("hello alice doe 1", new BarRouter());
pulsarTemplate.sendAsync("hello buzz doe 2", new BuzzRouter());
}
};
}
@PulsarListener(subscriptionName = "hello-pulsar-partitioned-subscription", topics = "hello-pulsar-partitioned")
public void listen(String message) {
System.out.println("Message Received: " + message);
}
static class FooRouter implements MessageRouter {
@Override
public int choosePartition(Message<?> msg, TopicMetadata metadata) {
return 0;
}
}
static class BarRouter implements MessageRouter {
@Override
public int choosePartition(Message<?> msg, TopicMetadata metadata) {
return 1;
}
}
static class BuzzRouter implements MessageRouter {
@Override
public int choosePartition(Message<?> msg, TopicMetadata metadata) {
return 2;
}
}
}
在前面的示例中,我们发布到分区主题,并且我们希望将一些数据段发布到特定分区。如果你将它留给 Pulsar 的默认设置,它将遵循分区分配的循环模式,并且我们希望覆盖它。为此,我们提供了一个带有 send
方法的消息路由器对象。考虑实现的三种消息路由器。FooRouter
总发送数据到分区 0
,BarRouter
发送到分区 1
,BuzzRouter
发送到分区 2
。还要注意,我们现在使用 PulsarTemplate
的 sendAsync
方法,该方法会返回 CompletableFuture
。在运行应用程序时,我们还需要将生产者的 messageRoutingMode
设置为 CustomPartition
(spring.pulsar.producer.message-routing-mode
)。
在消费者端,我们使用了一个具有独占订阅类型的 PulsarListener
。这意味着来自所有分区的将最终出现在同一个消费者中,且没有排序的保证。
如果我们希望每个分区都被一个单独的消费者消费,我们能做些什么?我们可以切换到“故障转移”订阅模式,并添加三个单独的消费者:
@PulsarListener(subscriptionName = "hello-pulsar-partitioned-subscription", topics = "hello-pulsar-partitioned", subscriptionType = SubscriptionType.Failover)
public void listen1(String foo) {
System.out.println("Message Received 1: " + foo);
}
@PulsarListener(subscriptionName = "hello-pulsar-partitioned-subscription", topics = "hello-pulsar-partitioned", subscriptionType = SubscriptionType.Failover)
public void listen2(String foo) {
System.out.println("Message Received 2: " + foo);
}
@PulsarListener(subscriptionName = "hello-pulsar-partitioned-subscription", topics = "hello-pulsar-partitioned", subscriptionType = SubscriptionType.Failover)
public void listen3(String foo) {
System.out.println("Message Received 3: " + foo);
}
当你遵循此方法时,一个分区总会被一个专用的消费者消费。
类似地,如果你想使用 Pulsar 的共享消费者类型,可以使用 shared
订阅类型。但是,当你使用 shared
模式时,你会失去任何排序保证,因为一个消费者在另一个消费者有机会之前从所有分区接收消息。
请考虑以下示例:
@PulsarListener(subscriptionName = "hello-pulsar-shared-subscription", topics = "hello-pulsar-partitioned", subscriptionType = SubscriptionType.Shared)
public void listen1(String foo) {
System.out.println("Message Received 1: " + foo);
}
@PulsarListener(subscriptionName = "hello-pulsar-shared-subscription", topics = "hello-pulsar-partitioned", subscriptionType = SubscriptionType.Shared)
public void listen2(String foo) {
System.out.println("Message Received 2: " + foo);
}