Using Spring for Apache Pulsar
Preface
We recommend using a Spring-Boot-First approach for Spring for Apache Pulsar-based applications, as that simplifies things tremendously.
To do so, you can add the spring-boot-starter-pulsar starter as a dependency.
The majority of this reference expects the reader to be using the starter and gives most directions for configuration with that in mind. However, an effort is made to call out when instructions are specific to the Spring Boot starter usage.
Quick Tour
We will take a quick tour of Spring for Apache Pulsar by showing a sample Spring Boot application that produces and consumes.
This is a complete application and does not require any additional configuration, as long as you have a Pulsar cluster running at the default location (localhost:6650).
Dependencies
Spring Boot applications need only the spring-boot-starter-pulsar dependency. The following listings show how to define the dependency for Maven and Gradle, respectively:
Maven
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-pulsar</artifactId>
        <version>{spring-boot-version}</version>
    </dependency>
</dependencies>
Gradle
dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-pulsar:{spring-boot-version}'
}
When using version 0.2.x, the above coordinates change as follows:
Maven
<dependencies>
    <dependency>
        <groupId>org.springframework.pulsar</groupId>
        <artifactId>spring-pulsar-spring-boot-starter</artifactId>
        <version>0.2.0</version>
    </dependency>
</dependencies>
Gradle
dependencies {
    implementation 'org.springframework.pulsar:spring-pulsar-spring-boot-starter:0.2.0'
}
Application Code
The following listing shows the Spring Boot application for the example:
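import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.pulsar.annotation.PulsarListener;
import org.springframework.pulsar.core.PulsarTemplate;

@SpringBootApplication
public class PulsarBootHelloWorld {

    public static void main(String[] args) {
        SpringApplication.run(PulsarBootHelloWorld.class, args);
    }

    // Sends one message on startup by using the auto-configured template
    @Bean
    ApplicationRunner runner(PulsarTemplate<String> pulsarTemplate) {
        return (args) -> pulsarTemplate.send("hello-pulsar-topic", "Hello Pulsar World!");
    }

    // Consumes from the same topic; the String payload type lets the framework infer the schema
    @PulsarListener(subscriptionName = "hello-pulsar-sub", topics = "hello-pulsar-topic")
    void listen(String message) {
        System.out.println("Message Received: " + message);
    }
}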
Let us quickly go through the higher-level details of this application. Later in the documentation, we cover these components in much more detail.
In the preceding sample, we rely heavily on Spring Boot auto-configuration.
Spring Boot auto-configures several components for our application.
It automatically provides the application with a PulsarClient, which both the producer and the consumer use.
Spring Boot also auto-configures a PulsarTemplate, which we inject into the application to send records to a Pulsar topic.
The application sends messages to a topic named hello-pulsar-topic.
Note that the application does not specify any schema information, because the Spring for Apache Pulsar library automatically infers the schema type from the type of the data that you send.
We use the PulsarListener annotation to consume from the hello-pulsar-topic topic, where we publish the data.
PulsarListener is a convenience annotation that wraps the message listener container infrastructure in Spring for Apache Pulsar.
Behind the scenes, it creates a message listener container to create and manage the Pulsar consumer.
As with a regular Pulsar consumer, the default subscription type when using PulsarListener is Exclusive.
As records are published to the hello-pulsar-topic topic, the PulsarListener consumes them and prints them on the console.
The framework also infers the schema type from the data type that the PulsarListener method uses as the payload (String, in this case).
Pulsar Client
When you use the Pulsar Spring Boot Starter, you get the PulsarClient
auto-configured.
By default, the application tries to connect to a local Pulsar instance at pulsar://localhost:6650.
You can change this by setting the spring.pulsar.client.service-url property to a different value.
The value must be a valid {apache-pulsar-docs}/client-libraries-java/#connection-urls[Pulsar Protocol] URL.
You can further configure the client by specifying any of the {spring-boot-pulsar-config-props}[spring.pulsar.client.*] application properties.
If you are not using the starter, you will need to configure and register the PulsarClient yourself.
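Here is a minimal sketch of registering the client yourself. It uses Pulsar's own PulsarClient.builder() API rather than any Spring for Apache Pulsar factory. The class name ManualPulsarClientConfig is hypothetical, and the service URL mirrors the default that the starter would use.

```java
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ManualPulsarClientConfig {

    // Registers a PulsarClient bean without the starter; Spring calls close() on shutdown
    @Bean(destroyMethod = "close")
    public PulsarClient pulsarClient() throws PulsarClientException {
        return PulsarClient.builder()
                // Assumed local broker, the same default the starter uses
                .serviceUrl("pulsar://localhost:6650")
                .build();
    }
}
```

Building the client does not open a broker connection by itself. Connections are established when producers or consumers are created.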
TLS Encryption (SSL)
By default, Pulsar clients communicate with Pulsar services in plain text. The following section describes how to configure Pulsar clients to use TLS encryption (SSL). A prerequisite is that the broker has also been configured to use TLS encryption.
The Spring Boot auto-configuration does not currently support any TLS/SSL configuration properties.
You can instead provide a PulsarClientBuilderCustomizer
that sets the necessary properties on the Pulsar client builder.
Pulsar supports both Privacy Enhanced Mail (PEM) and Java KeyStore (JKS) certificate formats.
Follow these steps to configure TLS:
- Adjust the Pulsar client service URL to use the pulsar+ssl:// scheme and TLS port (typically 6651).
- Adjust the admin client service URL to use the https:// scheme and TLS web port (typically 8443).
- Provide client builder customizer(s) that set the relevant properties on the builder:
  - {github}/blob/02730275e8d0291525eed9db5babe880c555a7bd/integration-tests/src/intTest/java/org/springframework/pulsar/inttest/app/SamplePemBasedSslConfig.java#L30-L49[PEM based sample]
  - {github}/blob/02730275e8d0291525eed9db5babe880c555a7bd/integration-tests/src/intTest/java/org/springframework/pulsar/inttest/app/SampleJksBasedSslConfig.java#L30-L57[JKS based sample]
You can find more information on the above in the official {apache-pulsar-docs}/security-tls-transport/[Pulsar TLS Encryption] documentation.
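The following is a minimal sketch of a PEM-based customizer. It assumes Spring for Apache Pulsar 1.x, where PulsarClientBuilderCustomizer lives in org.springframework.pulsar.core. It also assumes spring.pulsar.client.service-url already points at a pulsar+ssl:// address. The class name and certificate path are hypothetical placeholders. The linked PEM and JKS samples remain the reference.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.pulsar.core.PulsarClientBuilderCustomizer;

@Configuration
public class PemTlsClientConfig {

    // Applied to the Pulsar ClientBuilder before the PulsarClient is built
    @Bean
    public PulsarClientBuilderCustomizer pemTlsCustomizer() {
        return (clientBuilder) -> clientBuilder
                // Hypothetical path to the CA certificate that signed the broker certificate
                .tlsTrustCertsFilePath("/path/to/ca.cert.pem")
                // Check that the broker host name matches its certificate
                .enableTlsHostnameVerification(true)
                // Reject broker certificates that cannot be trusted
                .allowTlsInsecureConnection(false);
    }
}
```

For JKS, the analogous ClientBuilder methods are useKeyStoreTls, tlsTrustStoreType, tlsTrustStorePath, and tlsTrustStorePassword.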
Authentication
To connect to a Pulsar cluster that requires authentication, you need to specify which authentication plugin to use and any parameters required by the specified plugin. When using Spring Boot auto-configuration, you can set the plugin and the plugin parameters via configuration properties (in most cases).
You need to ensure that names defined under
For example, if you want to configure the issuer url for the
Using environment variables for auth parameters is typically problematic because the case sensitivity is lost during translation.
For example, consider the following
When Spring Boot loads this property it will use
When not using Spring Boot auto-configuration, you can use the org.apache.pulsar.client.api.AuthenticationFactory
to create the authentication and then set it directly on the Pulsar client builder in a client customizer that you provide to the client factory.
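Here is a minimal sketch of such a customizer for token authentication. TokenAuthCustomizers and the token value are hypothetical. How you hand the customizer to your client factory depends on how you construct that factory.

```java
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.springframework.pulsar.core.PulsarClientBuilderCustomizer;

public final class TokenAuthCustomizers {

    private TokenAuthCustomizers() {
    }

    // Returns a customizer that attaches token authentication to the Pulsar client builder
    public static PulsarClientBuilderCustomizer tokenAuth(String token) {
        return (clientBuilder) -> clientBuilder.authentication(AuthenticationFactory.token(token));
    }
}
```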
The following listings show how to configure each of the supported authentication mechanisms.
Athenz
spring:
  pulsar:
    client:
      authentication:
        plugin-class-name: org.apache.pulsar.client.impl.auth.AuthenticationAthenz
        param:
          tenantDomain: ...
          tenantService: ...
          providerDomain: ...
          privateKey: ...
          keyId: ...
This also requires TLS encryption.
Token
spring:
  pulsar:
    client:
      authentication:
        plugin-class-name: org.apache.pulsar.client.impl.auth.AuthenticationToken
        param:
          token: some-token-goes-here
Basic
spring:
  pulsar:
    client:
      authentication:
        plugin-class-name: org.apache.pulsar.client.impl.auth.AuthenticationBasic
        param:
          userId: ...
          password: ...
OAuth2
spring:
  pulsar:
    client:
      authentication:
        plugin-class-name: org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2
        param:
          issuerUrl: ...
          privateKey: ...
          audience: ...
          scope: ...
SASL
spring:
  pulsar:
    client:
      authentication:
        plugin-class-name: org.apache.pulsar.client.impl.auth.AuthenticationSasl
        param:
          saslJaasClientSectionName: ...
          serverType: ...
mTLS (PEM)
Because this option requires TLS encryption, which already requires you to provide a client builder customizer, it is recommended to simply add the authentication directly on the client builder in your provided TLS customizer.
You can use the org.apache.pulsar.client.api.AuthenticationFactory
to help create the authentication object as follows:
Authentication auth = AuthenticationFactory.TLS("/path/to/my-role.cert.pem", "/path/to/my-role.key-pk8.pem");
See the official Pulsar documentation on {apache-pulsar-docs}/security-tls-authentication/#configure-mtls-authentication-in-pulsar-clients[mTLS (PEM)].
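Following that recommendation, a single customizer can carry both the TLS trust settings and the mTLS authentication. This sketch reuses the certificate paths from the snippet above. The CA path, class name, and bean name are hypothetical.

```java
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.pulsar.core.PulsarClientBuilderCustomizer;

@Configuration
public class MtlsPemClientConfig {

    @Bean
    public PulsarClientBuilderCustomizer mtlsPemCustomizer() {
        return (clientBuilder) -> clientBuilder
                // TLS encryption: trust the CA that signed the broker certificate (hypothetical path)
                .tlsTrustCertsFilePath("/path/to/ca.cert.pem")
                // mTLS authentication: present the client role certificate and its PKCS#8 key
                .authentication(AuthenticationFactory.TLS("/path/to/my-role.cert.pem",
                        "/path/to/my-role.key-pk8.pem"));
    }
}
```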
mTLS (JKS)
Because this option requires TLS encryption, which already requires you to provide a client builder customizer, it is recommended to simply add the authentication directly on the client builder in your provided TLS customizer.
You can use the org.apache.pulsar.client.api.AuthenticationFactory
to help create the authentication object as follows:
Authentication auth = AuthenticationFactory.create(
"org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls",
Map.of("keyStoreType", "JKS", "keyStorePath", "/path/to/my/keystore.jks", "keyStorePassword", "clientpw"));
See the official Pulsar documentation on {apache-pulsar-docs}/security-tls-authentication/#configure-clients[mTLS (JKS)].
You can find more information on each of the supported plugins and their required properties in the official {apache-pulsar-docs}/security-overview#authentication-providers[Pulsar security] documentation.
Message Production
Pulsar Template
On the Pulsar producer side, Spring Boot auto-configuration provides a PulsarTemplate
for publishing records. The template implements an interface called PulsarOperations
and provides methods to publish records through its contract.
There are two categories of send API methods: send and sendAsync.
The send methods block by using the synchronous sending capabilities of the Pulsar producer.
They return the MessageId of the published message once the message is persisted on the broker.
The sendAsync methods are non-blocking asynchronous calls.
They return a CompletableFuture, which you can use to asynchronously receive the message ID once the message is published.
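The following sketch shows the two styles side by side. The hypothetical GreetingSender wraps an injected PulsarTemplate<String>, and the topic name is also illustrative. The blocking method returns only after the broker has persisted the message. The non-blocking method hands back the future immediately.

```java
import java.util.concurrent.CompletableFuture;

import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;
import org.springframework.pulsar.core.PulsarTemplate;

public class GreetingSender {

    private final PulsarTemplate<String> template;

    public GreetingSender(PulsarTemplate<String> template) {
        this.template = template;
    }

    // Blocks until the broker has persisted the message, then returns its id
    public MessageId sendBlocking(String greeting) throws PulsarClientException {
        return this.template.send("greeting-topic", greeting);
    }

    // Returns immediately; the future completes with the id once the message is published
    public CompletableFuture<MessageId> sendNonBlocking(String greeting) throws PulsarClientException {
        return this.template.sendAsync("greeting-topic", greeting)
                .whenComplete((id, ex) -> {
                    if (ex != null) {
                        System.err.println("Send failed: " + ex.getMessage());
                    }
                });
    }
}
```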
For the API variants that do not include a topic parameter, the topic resolution process is used to determine the destination topic.
Simple API
The template provides a handful of methods ({javadocs}/org/springframework/pulsar/core/PulsarOperations.html[prefixed with 'send']) for simple send requests. For more complicated send requests, a fluent API lets you configure more options.
Fluent API
The template provides a {javadocs}/org/springframework/pulsar/core/PulsarOperations.html#newMessage(T)[fluent builder] to handle more complicated send requests.
Message customization
You can specify a TypedMessageBuilderCustomizer
to configure the outgoing message. For example, the following code shows how to send a keyed message:
template.newMessage(msg)
.withMessageCustomizer((mb) -> mb.key("foo-msg-key"))
.send();
Producer customization
You can specify a ProducerBuilderCustomizer
to configure the underlying Pulsar producer builder that ultimately constructs the producer used to send the outgoing message.
Use this with caution, as it gives full access to the producer builder, and invoking some of its methods (such as create) may have unintended side effects.
For example, the following code shows how to disable batching and enable chunking:
template.newMessage(msg)
.withProducerCustomizer((pb) -> pb.enableChunking(true).enableBatching(false))
.send();
This other example shows how to use custom routing when publishing records to partitioned topics.
Specify your custom MessageRouter implementation on the Producer builder, as follows:
template.newMessage(msg)
.withProducerCustomizer((pb) -> pb.messageRouter(messageRouter))
.send();
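The messageRouter in the snippet above can be any MessageRouter implementation. As a minimal sketch, the hypothetical KeyHashMessageRouter overrides the two-argument choosePartition method from the Pulsar client API. It hashes the message key with String.hashCode, which is not the same hash that Pulsar's built-in routing modes use. Messages without a key go to partition 0.

```java
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageRouter;
import org.apache.pulsar.client.api.TopicMetadata;

public class KeyHashMessageRouter implements MessageRouter {

    private static final long serialVersionUID = 1L;

    @Override
    public int choosePartition(Message<?> msg, TopicMetadata metadata) {
        // Unkeyed messages have nothing to hash, so pin them to the first partition
        if (!msg.hasKey()) {
            return 0;
        }
        // floorMod keeps the index non-negative even when hashCode() is negative
        return Math.floorMod(msg.getKey().hashCode(), metadata.numPartitions());
    }
}
```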
Note that, when using a |
This other example shows how to add a ProducerInterceptor
that will intercept and mutate messages received by the producer before being published to the brokers:
template.newMessage(msg)
.withProducerCustomizer((pb) -> pb.intercept(interceptor))
.send();
The customizer will only apply to the producer used for the send operation. If you want to apply a customizer to all producers, you must provide them to the producer factory as described in Global producer customization.
The rules described in “Caution on Lambda customizers” must be followed when using Lambda customizers.
Specifying Schema Information
If you use Java primitive types, the framework auto-detects the schema for you, and you need not specify any schema types for publishing the data.
For non-primitive types, if the Schema is not explicitly specified when invoking send operations on the PulsarTemplate
, the Spring for Apache Pulsar framework will try to build a Schema.JSON
from the type.
Complex schema types that are currently supported are JSON, AVRO, PROTOBUF, AUTO_PRODUCE_BYTES, and KEY_VALUE with INLINE encoding.
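To override the JSON default for a single send, you can pass the schema explicitly. This sketch assumes a hypothetical User POJO with a no-arg constructor, which keeps it usable with Avro's reflection-based decoding on the consumer side. The user-topic topic is illustrative.

```java
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.springframework.pulsar.core.PulsarTemplate;

public class UserPublisher {

    // Hypothetical domain type used only for illustration
    public static class User {
        public String name;
        public int age;

        public User() {
        }

        public User(String name, int age) {
            this.name = name;
            this.age = age;
        }
    }

    // Without the schema argument, the framework would build Schema.JSON(User.class);
    // passing Schema.AVRO overrides that default for this send only
    public static MessageId sendAsAvro(PulsarTemplate<User> template, User user) throws PulsarClientException {
        return template.send("user-topic", user, Schema.AVRO(User.class));
    }
}
```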
Custom Schema Mapping
As an alternative to specifying the schema when invoking send operations on the PulsarTemplate
for complex types, the schema resolver can be configured with mappings for the types.
This removes the need to specify the schema as the framework consults the resolver using the outgoing message type.
Configuration properties
Schema mappings can be configured with the spring.pulsar.defaults.type-mappings
property.
The following example uses application.yml
to add mappings for the User
and Address
complex objects using AVRO
and JSON
schemas, respectively:
spring:
pulsar:
defaults:
type-mappings:
- message-type: com.acme.User
schema-info:
schema-type: AVRO
- message-type: com.acme.Address
schema-info:
schema-type: JSON
The |
Schema resolver customizer
The preferred method of adding mappings is via the property mentioned above. However, if more control is needed you can provide a schema resolver customizer to add the mapping(s).
The following example uses a schema resolver customizer to add mappings for the User
and Address
complex objects using AVRO
and JSON
schemas, respectively:
@Bean
public SchemaResolverCustomizer<DefaultSchemaResolver> schemaResolverCustomizer() {
    return (schemaResolver) -> {
        // Map each complex type to the schema it should use by default
        schemaResolver.addCustomSchemaMapping(User.class, Schema.AVRO(User.class));
        schemaResolver.addCustomSchemaMapping(Address.class, Schema.JSON(Address.class));
    };
}
Type mapping annotation
Another option for specifying default schema information to use for a particular message type is to mark the message class with the @PulsarMessage
annotation.
The schema info can be specified via the schemaType
attribute on the annotation.
The following example configures the system to use JSON as the default schema when producing or consuming messages of type Foo
:
@PulsarMessage(schemaType = SchemaType.JSON)
record Foo(String value) {
}
With this configuration in place, there is no need to specify the schema on send operations.
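For example, assuming a PulsarTemplate<Foo> is available, sending the annotated type needs no schema argument. The FooPublisher class and the foo-topic topic are hypothetical.

```java
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.schema.SchemaType;
import org.springframework.pulsar.annotation.PulsarMessage;
import org.springframework.pulsar.core.PulsarTemplate;

public class FooPublisher {

    @PulsarMessage(schemaType = SchemaType.JSON)
    public record Foo(String value) {
    }

    // No Schema argument: the schema comes from the @PulsarMessage annotation on Foo
    public static void sendFoo(PulsarTemplate<Foo> template) throws PulsarClientException {
        template.send("foo-topic", new Foo("bar"));
    }
}
```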
Producing with AUTO_SCHEMA
If there is no way to know the schema type of a Pulsar topic in advance, you can use an {apache-pulsar-docs}/schema-get-started/#auto_produce[AUTO_PRODUCE] schema to safely publish a raw JSON or Avro payload as a byte[].
In this case, the producer validates whether the outbound bytes are compatible with the schema of the destination topic.
Simply specify a schema of Schema.AUTO_PRODUCE_BYTES()
on your template send operations as shown in the example below:
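The following method uses PulsarTemplate to send a byte array (userAsBytes) to the user-topic topic. It explicitly specifies the Schema.AUTO_PRODUCE_BYTES() schema so that the producer validates the bytes against the topic's schema: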
void sendUserAsBytes(PulsarTemplate<byte[]> template, byte[] userAsBytes) {
template.send("user-topic", userAsBytes, Schema.AUTO_PRODUCE_BYTES());
}
This is only supported with Avro and JSON schema types.
Pulsar Producer Factory
The PulsarTemplate
relies on a PulsarProducerFactory
to actually create the underlying producer.
Spring Boot auto-configuration also provides this producer factory, which you can further configure by specifying any of the {spring-boot-pulsar-config-props}[spring.pulsar.producer.*] application properties.
If topic information is not specified when using the producer factory APIs directly, the same topic-resolution-process-imperative used by the |
Global producer customization
该框架提供了 ProducerBuilderCustomizer
合约,允许您配置用于构建每个生成器的底层生成器。要自定义所有生成器,您可以将自定义器列表传递给`PulsarProducerFactory` 构造函数。使用多个自定义器时,它们将按照在列表中出现的顺序应用。
The framework provides the ProducerBuilderCustomizer
contract which allows you to configure the underlying builder which is used to construct each producer.
To customize all producers, you can pass a list of customizers into the PulsarProducerFactory
constructor.
When using multiple customizers, they are applied in the order in which they appear in the list.
If you use Spring Boot auto-configuration, you can specify the customizers as beans, and they are passed automatically to the PulsarProducerFactory, ordered according to their @Order annotation.
If you want to apply a customizer to just a single producer, you can use the Fluent API and specify the customizer at send time.
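Here is a sketch of the bean-based approach with auto-configuration. The hypothetical configuration below declares one global customizer that disables batching for every producer the factory creates. It assumes Boot picks up ProducerBuilderCustomizer beans as described above. For a simple setting like this, a spring.pulsar.producer.* property (for example, spring.pulsar.producer.batching-enabled) may be easier. A customizer bean is more useful for builder options that have no matching property.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.annotation.Order;
import org.springframework.pulsar.core.ProducerBuilderCustomizer;

@Configuration
public class GlobalProducerCustomizerConfig {

    // Applied to every producer built by the auto-configured producer factory
    @Bean
    @Order(100)
    public ProducerBuilderCustomizer<Object> disableBatchingCustomizer() {
        return (producerBuilder) -> producerBuilder.enableBatching(false);
    }
}
```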
Pulsar Producer Caching
Each underlying Pulsar producer consumes resources. To improve performance and avoid continual creation of producers, the producer factory caches the producers that it creates. They are cached in an LRU fashion and evicted when they have not been used within a configured time period. The cache key is composed of just enough information to ensure that callers are returned the same producer on subsequent creation requests.
Additionally, you can configure the cache settings by specifying any of the {spring-boot-pulsar-config-props}[spring.pulsar.producer.cache.*] application properties.
Caution on Lambda customizers
Any user-provided producer customizers are also included in the cache key.
Because the cache key relies on a valid implementation of equals/hashCode, you must take care when using Lambda customizers.
RULE: Two customizers implemented as Lambdas will match on equals/hashCode
if and only if they use the same Lambda instance and do not require any variable defined outside its closure.
To clarify the above rule we will look at a few examples.
In the following example, the customizer is defined as an inline Lambda which means that each call to sendUser
uses the same Lambda instance. Additionally, it requires no variable outside its closure. Therefore, it will match as a cache key.
void sendUser() {
var user = randomUser();
template.newMessage(user)
.withTopic("user-topic")
.withProducerCustomizer((b) -> b.producerName("user"))
.send();
}
In this next case, the customizer is also defined as an inline Lambda. However, it requires a variable (name) defined outside its closure, so each call to sendUser creates a new Lambda instance that captures that value. Therefore, it will not match as a cache key.
void sendUser() {
var user = randomUser();
var name = randomName();
template.newMessage(user)
.withTopic("user-topic")
.withProducerCustomizer((b) -> b.producerName(name))
.send();
}
In this final example, the customizer is defined as an inline Lambda, which means that each call to sendUser uses the same Lambda instance. While it does use a variable (name), that variable is declared inside its closure, and therefore the customizer will match as a cache key.
This illustrates that variables can be used within the Lambda closure and that the closure can even call static methods.
void sendUser() {
var user = randomUser();
template.newMessage(user)
.withTopic("user-topic")
.withProducerCustomizer((b) -> {
var name = SomeHelper.someStaticMethod();
b.producerName(name);
})
.send();
}
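The rule depends on how the JVM creates Lambda instances. The following plain-JDK sketch (no Pulsar types involved) compares Lambdas the way a cache key does, through equals, which Lambdas inherit from Object. On HotSpot-based JVMs, a non-capturing Lambda is typically the same instance on every call, while a capturing Lambda is a new instance each time. The Java Language Specification guarantees neither behavior, which is one more reason to prefer explicit equals/hashCode implementations.

```java
import java.util.function.Supplier;

public class LambdaIdentityDemo {

    // Captures nothing, like the first sendUser example
    static Supplier<String> nonCapturing() {
        return () -> "user";
    }

    // Captures 'name' from the enclosing scope, like the second sendUser example
    static Supplier<String> capturing(String name) {
        return () -> name;
    }

    public static void main(String[] args) {
        // Lambdas do not override equals, so these comparisons are identity checks
        System.out.println("non-capturing equal: " + nonCapturing().equals(nonCapturing()));
        System.out.println("capturing equal: " + capturing("user").equals(capturing("user")));
    }
}
```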
RULE: If your Lambda customizer is not defined once and only once (the same instance is used on subsequent calls) OR it requires variable(s) defined outside its closure then you must provide a customizer implementation with a valid equals/hashCode
implementation.
If these rules are not followed then the producer cache will always miss and your application performance will be negatively affected.
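One way to satisfy the second rule is a small record, because Java records derive equals and hashCode from their components. In this sketch, the hypothetical ProducerNameCustomizer implements ProducerBuilderCustomizer. Two instances created with the same name are equal and therefore map to the same cached producer.

```java
import org.apache.pulsar.client.api.ProducerBuilder;
import org.springframework.pulsar.core.ProducerBuilderCustomizer;

// equals/hashCode are generated from the 'name' component
public record ProducerNameCustomizer<T>(String name) implements ProducerBuilderCustomizer<T> {

    @Override
    public void customize(ProducerBuilder<T> producerBuilder) {
        producerBuilder.producerName(this.name);
    }
}
```

It could replace the capturing Lambda from the second example: template.newMessage(user).withTopic("user-topic").withProducerCustomizer(new ProducerNameCustomizer<>(name)).send();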
Intercept Messages on the Producer
Adding a ProducerInterceptor
lets you intercept and mutate messages received by the producer before they are published to the brokers.
To do so, you can pass a list of interceptors into the PulsarTemplate
constructor.
When using multiple interceptors, the order they are applied in is the order in which they appear in the list.
If you use Spring Boot auto-configuration, you can specify the interceptors as Beans.
They are passed automatically to the PulsarTemplate
.
Ordering of the interceptors is achieved by using the @Order
annotation as follows:
@Bean
@Order(100)
ProducerInterceptor firstInterceptor() {
...
}
@Bean
@Order(200)
ProducerInterceptor secondInterceptor() {
...
}
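The bean bodies above are elided. The following sketch shows what one of those beans might return. The hypothetical LoggingProducerInterceptor implements org.apache.pulsar.client.api.interceptor.ProducerInterceptor, leaves every message unchanged in beforeSend, and logs the outcome of each send.

```java
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.interceptor.ProducerInterceptor;

public class LoggingProducerInterceptor implements ProducerInterceptor {

    @Override
    public boolean eligible(Message message) {
        return true; // intercept every message
    }

    @Override
    public Message beforeSend(Producer producer, Message message) {
        return message; // a mutating interceptor would return a modified message here
    }

    @Override
    public void onSendAcknowledgement(Producer producer, Message message, MessageId msgId, Throwable exception) {
        if (exception != null) {
            System.err.println("Send to " + producer.getTopic() + " failed: " + exception.getMessage());
        } else {
            System.out.println("Acknowledged " + msgId + " on " + producer.getTopic());
        }
    }

    @Override
    public void close() {
        // no resources to release
    }
}
```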
If you are not using the starter, you will need to configure and register the aforementioned components yourself.
Message Consumption
Pulsar Listener
When it comes to Pulsar consumers, we recommend that end-user applications use the PulsarListener
annotation.
To use PulsarListener
, you need to use the @EnablePulsar
annotation.
When you use Spring Boot support, it automatically enables this annotation and configures all the components necessary for PulsarListener
, such as the message listener infrastructure (which is responsible for creating the Pulsar consumer).
PulsarMessageListenerContainer uses a PulsarConsumerFactory to create and manage the underlying Pulsar consumer that it uses to consume messages.
Spring Boot provides this consumer factory, which you can further configure by specifying the {spring-boot-pulsar-config-props}[spring.pulsar.consumer.*] application properties.
Most of the configured properties on the factory are respected in the listener, with the following exceptions:
The |
The |
Let us revisit the PulsarListener
code snippet we saw in the quick-tour section:
@PulsarListener(subscriptionName = "hello-pulsar-sub", topics = "hello-pulsar-topic")
public void listen(String message) {
System.out.println("Message Received: " + message);
}
You can further simplify this method:
@PulsarListener
public void listen(String message) {
System.out.println("Message Received: " + message);
}
In this most basic form, when the subscriptionName is not provided on the @PulsarListener annotation, an auto-generated subscription name is used.
Likewise, when the topics are not directly provided, the topic resolution process is used to determine the destination topic.
In the PulsarListener
method shown earlier, we receive the data as String
, but we do not specify any schema types.
Internally, the framework relies on Pulsar’s schema mechanism to convert the data to the required type.
The framework detects that you expect the String
type and then infers the schema type based on that information and provides that schema to the consumer.
The framework does this inference for all primitive types.
For all non-primitive types the default schema is assumed to be JSON.
If a complex type is using anything besides JSON (such as AVRO or KEY_VALUE) you must provide the schema type on the annotation using the schemaType
property.
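The following listener illustrates that case. It consumes a hypothetical User POJO from an Avro-encoded topic, so it sets schemaType = SchemaType.AVRO. The class, subscription, and topic names are illustrative. The no-arg constructor is included because Avro's reflection-based reader typically needs one to instantiate the type.

```java
import org.apache.pulsar.common.schema.SchemaType;
import org.springframework.pulsar.annotation.PulsarListener;
import org.springframework.stereotype.Component;

@Component
public class UserListener {

    // Hypothetical POJO for illustration
    public static class User {
        public String name;
        public int age;

        public User() {
        }
    }

    // User is not a primitive and the topic uses Avro rather than JSON, so schemaType is required
    @PulsarListener(subscriptionName = "user-sub", topics = "user-topic", schemaType = SchemaType.AVRO)
    public void listen(User user) {
        System.out.println("Received user: " + user.name);
    }
}
```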
The following example shows another PulsarListener
method, which takes an Integer
:
@PulsarListener(subscriptionName = "my-subscription-1", topics = "my-topic-1")
public void listen(Integer message) {
System.out.println(message);
}
The following PulsarListener
method shows how we can consume complex types from a topic:
@PulsarListener(subscriptionName = "my-subscription-2", topics = "my-topic-2", schemaType = SchemaType.JSON)
public void listen(Foo message) {
System.out.println(message);
}
Let us look at a few more ways.
You can consume the Pulsar message directly:
@PulsarListener(subscriptionName = "my-subscription", topics = "my-topic")
public void listen(org.apache.pulsar.client.api.Message<String> message) {
System.out.println(message.getValue());
}
The following example consumes the record by using the Spring messaging envelope:
@PulsarListener(subscriptionName = "my-subscription", topics = "my-topic")
public void listen(org.springframework.messaging.Message<String> message) {
System.out.println(message.getPayload());
}
Now let us see how we can consume records in batches.
The following example uses PulsarListener
to consume records in batches as POJOs:
@PulsarListener(subscriptionName = "hello-batch-subscription", topics = "hello-batch", schemaType = SchemaType.JSON, batch = true)
public void listen(List<Foo> messages) {
System.out.println("records received :" + messages.size());
messages.forEach((message) -> System.out.println("record : " + message));
}
Note that, in this example, we receive the records as a collection (List) of objects.
In addition, to enable batch consumption at the PulsarListener level, you need to set the batch property on the annotation to true.
Based on the actual type that the List holds, the framework tries to infer the schema to use.
If the List contains a complex type other than JSON, you still need to provide the schemaType on PulsarListener.
The following example uses the Message envelope provided by the Pulsar Java client:
@PulsarListener(subscriptionName = "hello-batch-subscription", topics = "hello-batch", schemaType = SchemaType.JSON, batch = true)
public void listen(List<Message<Foo>> messages) {
    System.out.println("records received :" + messages.size());
    messages.forEach((message) -> System.out.println("record : " + message.getValue()));
}
The following example consumes batch records with an envelope of the Spring messaging Message type:
@PulsarListener(subscriptionName = "hello-batch-subscription", topics = "hello-batch", schemaType = SchemaType.JSON, batch = true)
public void listen(List<org.springframework.messaging.Message<Foo>> messages) {
    System.out.println("records received :" + messages.size());
    messages.forEach((message) -> System.out.println("record : " + message.getPayload()));
}
Finally, you can also use the Messages holder object from Pulsar for the batch listener:
@PulsarListener(subscriptionName = "hello-batch-subscription", topics = "hello-batch", schemaType = SchemaType.JSON, batch = true)
public void listen(org.apache.pulsar.client.api.Messages<Foo> messages) {
    System.out.println("records received :" + messages.size());
    messages.forEach((message) -> System.out.println("record : " + message.getValue()));
}
When you use PulsarListener, you can provide Pulsar consumer properties directly on the annotation itself.
This is convenient if you do not want to use the Boot configuration properties mentioned earlier or if you have multiple PulsarListener methods.
The following example uses Pulsar consumer properties directly on PulsarListener:
@PulsarListener(properties = { "subscriptionName=subscription-1", "topicNames=foo-1", "receiverQueueSize=5000" })
void listen(String message) {
}
The properties used are direct Pulsar consumer properties, not the |
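Each entry in the properties attribute is a plain key=value string. As an illustration only, the following self-contained sketch turns such entries into a configuration map by using java.util.Properties. The ListenerPropertiesSketch class and its method are hypothetical names, and this is not necessarily how the framework parses the attribute.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

// Hypothetical helper, not part of Spring for Apache Pulsar: turns the
// "key=value" entries of a properties attribute into a configuration map.
final class ListenerPropertiesSketch {

    private ListenerPropertiesSketch() {
    }

    static Map<String, String> toConsumerConfig(String... entries) {
        Properties props = new Properties();
        for (String entry : entries) {
            try {
                // Properties.load understands the same "key=value" line format.
                props.load(new StringReader(entry));
            }
            catch (IOException ex) {
                // Reading from a StringReader does not fail in practice.
                throw new IllegalStateException(ex);
            }
        }
        // Values stay strings here; converting them to typed settings is left out.
        Map<String, String> config = new TreeMap<>();
        for (String name : props.stringPropertyNames()) {
            config.put(name, props.getProperty(name));
        }
        return config;
    }
}
```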
Generic records with AUTO_CONSUME
If there is no way to know the schema type of a Pulsar topic in advance, you can use the AUTO_CONSUME schema type to consume generic records.
In this case, messages are deserialized into GenericRecord objects by using the schema info associated with the topic.
To consume generic records, set schemaType = SchemaType.AUTO_CONSUME on your @PulsarListener and use a Pulsar message of type GenericRecord as the message parameter, as shown below.
@PulsarListener(topics = "my-generic-topic", schemaType = SchemaType.AUTO_CONSUME)
void listen(org.apache.pulsar.client.api.Message<GenericRecord> message) {
    GenericRecord record = message.getValue();
    record.getFields().forEach((f) ->
            System.out.printf("%s = %s%n", f.getName(), record.getField(f)));
}
The |
Customizing the ConsumerBuilder
You can customize any field available through ConsumerBuilder by using a PulsarListenerConsumerBuilderCustomizer.
To do so, provide a @Bean of type PulsarListenerConsumerBuilderCustomizer and then make it available to the PulsarListener by referencing the bean name in the consumerCustomizer attribute, as shown below.
@PulsarListener(topics = "hello-topic", consumerCustomizer = "myCustomizer")
public void listen(String message) {
    System.out.println("Message Received: " + message);
}

@Bean
PulsarListenerConsumerBuilderCustomizer<String> myCustomizer() {
    return (builder) -> builder.consumerName("myConsumer");
}
If your application only has a single |
@1
Specifying Schema Information
As indicated earlier, for Java primitives, the Spring for Apache Pulsar framework can infer the proper Schema to use on the PulsarListener.
For non-primitive types, if the Schema is not explicitly specified on the annotation, the Spring for Apache Pulsar framework will try to build a Schema.JSON from the type.
The complex Schema types currently supported are JSON, AVRO, PROTOBUF, AUTO_CONSUME, and KEY_VALUE with INLINE encoding.
Custom Schema Mapping
As an alternative to specifying the schema on the PulsarListener for complex types, the schema resolver can be configured with mappings for the types.
This removes the need to set the schema on the listener, as the framework consults the resolver using the incoming message type.
@3
Configuration properties
Schema mappings can be configured with the spring.pulsar.defaults.type-mappings property.
The following example uses application.yml to add mappings for the User and Address complex objects using AVRO and JSON schemas, respectively:
spring:
  pulsar:
    defaults:
      type-mappings:
        - message-type: com.acme.User
          schema-info:
            schema-type: AVRO
        - message-type: com.acme.Address
          schema-info:
            schema-type: JSON
The |
Schema resolver customizer
The preferred method of adding mappings is through the property mentioned above. However, if you need more control, you can provide a schema resolver customizer to add the mapping(s).
The following example uses a schema resolver customizer to add mappings for the User and Address complex objects using AVRO and JSON schemas, respectively:
@Bean
public SchemaResolverCustomizer<DefaultSchemaResolver> schemaResolverCustomizer() {
    return (schemaResolver) -> {
        schemaResolver.addCustomSchemaMapping(User.class, Schema.AVRO(User.class));
        schemaResolver.addCustomSchemaMapping(Address.class, Schema.JSON(Address.class));
    };
}
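The following self-contained model illustrates the resolution behavior described in this section. Primitive-like types get an inferred schema, mapped types get their configured schema, and any other complex type falls back to JSON. It is a hypothetical sketch: ToySchemaResolver and addMapping are illustrative names, not DefaultSchemaResolver, and the model works with schema-type names instead of Schema objects. The rule that an explicit mapping wins over inference is an assumption of the model.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of schema resolution; it is NOT DefaultSchemaResolver and
// returns Pulsar schema-type names as strings instead of Schema objects.
final class ToySchemaResolver {

    // Primitive-like types whose schema can be inferred from the type alone.
    private static final Map<Class<?>, String> INFERRED = Map.of(
            String.class, "STRING",
            Integer.class, "INT32",
            Long.class, "INT64",
            Boolean.class, "BOOLEAN",
            Double.class, "DOUBLE",
            byte[].class, "BYTES");

    // Explicit mappings, playing the role of what a resolver customizer adds.
    private final Map<Class<?>, String> mappings = new HashMap<>();

    void addMapping(Class<?> type, String schemaType) {
        mappings.put(type, schemaType);
    }

    String resolve(Class<?> type) {
        // Assumption for this model: an explicit mapping wins over inference.
        String mapped = mappings.get(type);
        if (mapped != null) {
            return mapped;
        }
        // Otherwise infer for primitive-like types and fall back to JSON.
        return INFERRED.getOrDefault(type, "JSON");
    }
}
```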
@4
Type mapping annotation
Another option for specifying default schema information to use for a particular message type is to mark the message class with the @PulsarMessage annotation.
The schema info can be specified through the schemaType attribute on the annotation.
The following example configures the system to use JSON as the default schema when producing or consuming messages of type Foo:
@PulsarMessage(schemaType = SchemaType.JSON)
record Foo(String value) {
}
With this configuration in place, there is no need to set the schema on the listener, for example:
@5
@PulsarListener(subscriptionName = "user-sub", topics = "user-topic")
public void listen(User user) {
    System.out.println(user);
}
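The annotation approach depends on the annotation being visible at runtime. The following sketch shows that general mechanism using plain reflection and a hypothetical SchemaHint annotation that stands in for @PulsarMessage. The names HintedFoo, PlainBar, and AnnotationSchemaLookup are illustrative, and the framework's actual lookup code is not shown.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// Hypothetical stand-in for @PulsarMessage: a runtime-visible annotation that
// carries a default schema type for the annotated message class.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface SchemaHint {
    String schemaType();
}

@SchemaHint(schemaType = "JSON")
record HintedFoo(String value) {
}

record PlainBar(String value) {
}

final class AnnotationSchemaLookup {

    private AnnotationSchemaLookup() {
    }

    // Returns the schema type declared on the class, or null if it is not annotated.
    static String declaredSchemaType(Class<?> messageType) {
        SchemaHint hint = messageType.getAnnotation(SchemaHint.class);
        return (hint != null) ? hint.schemaType() : null;
    }
}
```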
Accessing the Pulsar Consumer Object
Sometimes, you need direct access to the Pulsar Consumer object. The following example shows how to get it:
@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar")
public void listen(String message, org.apache.pulsar.client.api.Consumer<String> consumer) {
    System.out.println("Message Received: " + message);
    ConsumerStats stats = consumer.getStats();
    // ... read-only use of the stats (do not call receive methods here)
}
When accessing the Consumer object this way, do NOT invoke any operations that would change the consumer's cursor position, such as any of the receive methods.
All such operations must be done by the container.
Pulsar Message Listener Container
Now that we have seen the basic interactions on the consumer side through PulsarListener, let us dive into the inner workings of how PulsarListener interacts with the underlying Pulsar consumer.
Keep in mind that, for end-user applications, in most scenarios, we recommend using the PulsarListener annotation directly for consuming from a Pulsar topic when using Spring for Apache Pulsar, as that model covers a broad set of application use cases.
However, it is important to understand how PulsarListener works internally. This section goes through those details.
As briefly mentioned earlier, the message listener container is at the heart of message consumption when you use Spring for Apache Pulsar.
PulsarListener uses the message listener container infrastructure behind the scenes to create and manage the Pulsar consumer.
Spring for Apache Pulsar provides the contract for this message listener container through PulsarMessageListenerContainer.
The default implementation for this message listener container is provided through DefaultPulsarMessageListenerContainer.
As its name indicates, PulsarMessageListenerContainer contains the message listener.
The container creates the Pulsar consumer and then runs a separate thread to receive and handle the data.
The data is handled by the provided message listener implementation.
The message listener container consumes the data in batches by using the consumer's batchReceive method.
Once data is received, it is handed over to the selected message listener implementation.
The following message listener types are available when you use Spring for Apache Pulsar.
We look at the details of these message listeners in the following sections.
Before doing so, however, let us take a closer look at the container itself.
DefaultPulsarMessageListenerContainer
This is a single consumer-based message listener container. The following listing shows its constructor:
public DefaultPulsarMessageListenerContainer(PulsarConsumerFactory<? super T> pulsarConsumerFactory,
        PulsarContainerProperties pulsarContainerProperties)
It receives a PulsarConsumerFactory (which it uses to create the consumer) and a PulsarContainerProperties object (which contains information about the container properties).
PulsarContainerProperties has the following constructors:
public PulsarContainerProperties(String... topics)
public PulsarContainerProperties(Pattern topicPattern)
You can provide the topic information through PulsarContainerProperties or as a consumer property that is provided to the consumer factory.
The following example uses the DefaultPulsarMessageListenerContainer:
Map<String, Object> config = new HashMap<>();
config.put("topics", "my-topic");
PulsarConsumerFactory<String> pulsarConsumerFactory = new DefaultPulsarConsumerFactory<>(pulsarClient, config);

PulsarContainerProperties pulsarContainerProperties = new PulsarContainerProperties();
pulsarContainerProperties.setMessageListener((PulsarRecordMessageListener<?>) (consumer, msg) -> {
    // handle each received message here
});

DefaultPulsarMessageListenerContainer<String> pulsarListenerContainer = new DefaultPulsarMessageListenerContainer<>(
        pulsarConsumerFactory, pulsarContainerProperties);
return pulsarListenerContainer;
If topic information is not specified when using the listener containers directly, the same topic-resolution-process-imperative used by the |
DefaultPulsarMessageListenerContainer creates only a single consumer.
If you want to have multiple consumers managed through multiple threads, you need to use ConcurrentPulsarMessageListenerContainer.
ConcurrentPulsarMessageListenerContainer
ConcurrentPulsarMessageListenerContainer has the following constructor:
public ConcurrentPulsarMessageListenerContainer(PulsarConsumerFactory<? super T> pulsarConsumerFactory,
PulsarContainerProperties pulsarContainerProperties)
ConcurrentPulsarMessageListenerContainer lets you specify a concurrency property through a setter.
Concurrency of more than 1 is allowed only on non-exclusive subscriptions (failover, shared, and key-shared).
You can only have the default 1 for concurrency when you have an exclusive subscription mode.
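The concurrency rule above can be expressed as a simple check. The following sketch is a hypothetical model, not the container's validation code. The SubscriptionKind enum mirrors the names of Pulsar's subscription types but is declared locally so that the example stays self-contained.

```java
// Local mirror of Pulsar's subscription type names (hypothetical, for the model only).
enum SubscriptionKind { Exclusive, Shared, Failover, Key_Shared }

// Hypothetical model of the rule: concurrency above 1 needs a non-exclusive subscription.
final class ConcurrencyRule {

    private ConcurrencyRule() {
    }

    static void validate(SubscriptionKind type, int concurrency) {
        if (concurrency < 1) {
            throw new IllegalArgumentException("concurrency must be at least 1");
        }
        if (type == SubscriptionKind.Exclusive && concurrency > 1) {
            throw new IllegalArgumentException(
                    "concurrency > 1 is not allowed with an Exclusive subscription");
        }
    }
}
```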
The following example enables concurrency through the PulsarListener annotation for a failover subscription.
@PulsarListener(topics = "my-topic", subscriptionName = "subscription-1",
        subscriptionType = SubscriptionType.Failover, concurrency = "3")
void listen(String message, Consumer<String> consumer) {
    // ... process the message
    System.out.println("Current Thread: " + Thread.currentThread().getName());
    System.out.println("Current Consumer: " + consumer.getConsumerName());
}
In the preceding listener, it is assumed that the topic my-topic has three partitions.
If it is a non-partitioned topic, setting concurrency to 3 does nothing useful: you get two idle consumers in addition to the main active one.
If the topic has more than three partitions, messages are load-balanced across the consumers that the container creates.
If you run this PulsarListener, you see that messages from different partitions are consumed through different consumers, as shown by the thread-name and consumer-name output in the preceding example.
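To see why a non-partitioned topic leaves idle consumers, the following simplified model deals partitions out round-robin and counts the consumers that receive none. It treats a non-partitioned topic as a single partition and is only an approximation. The broker's real Failover assignment differs, but in both cases at most min(partitions, consumers) consumers are active. PartitionSpread is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified, hypothetical model: partitions are dealt round-robin to consumers.
// This is not the broker's Failover assignment algorithm.
final class PartitionSpread {

    private PartitionSpread() {
    }

    // Returns, per consumer index, the partition numbers it would own.
    static List<List<Integer>> assign(int partitions, int consumers) {
        List<List<Integer>> byConsumer = new ArrayList<>();
        for (int c = 0; c < consumers; c++) {
            byConsumer.add(new ArrayList<>());
        }
        for (int p = 0; p < partitions; p++) {
            byConsumer.get(p % consumers).add(p);
        }
        return byConsumer;
    }

    // Consumers that own no partition stay idle.
    static long idleConsumers(int partitions, int consumers) {
        return assign(partitions, consumers).stream().filter(List::isEmpty).count();
    }
}
```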
When you use the |
The following listing shows another example of PulsarListener, but with Shared subscription and concurrency enabled.
@PulsarListener(topics = "my-topic", subscriptionName = "subscription-1",
        subscriptionType = SubscriptionType.Shared, concurrency = "5")
void listen(String message) {
    // ... process the message
}
In the preceding example, the PulsarListener creates five different consumers (this time, we assume that the topic has five partitions).
In this version, there is no message ordering, as |
If you need message ordering and still want a shared subscription type, you need to use the Key_Shared subscription type.
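Key_Shared can preserve per-key ordering because every key is routed to a single consumer. The following toy model shows that property by using hashCode() modulo the number of consumers. This is a deliberate simplification, because Pulsar's Key_Shared subscription assigns ranges of a key hash to consumers. KeyedMessage and KeyRoutingSketch are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// A message with a key, as published by a producer (hypothetical type).
record KeyedMessage(String key, String payload) {
}

// Toy key-based routing: each key maps to exactly one consumer, so messages that
// share a key reach that consumer in publish order. NOT Pulsar's algorithm, which
// assigns hash ranges of the key hash to consumers.
final class KeyRoutingSketch {

    private KeyRoutingSketch() {
    }

    static int consumerFor(String key, int consumers) {
        return Math.floorMod(key.hashCode(), consumers);
    }

    // Returns, per consumer index, the "key:payload" strings it receives, in order.
    static List<List<String>> route(List<KeyedMessage> messages, int consumers) {
        List<List<String>> perConsumer = new ArrayList<>();
        for (int i = 0; i < consumers; i++) {
            perConsumer.add(new ArrayList<>());
        }
        for (KeyedMessage m : messages) {
            perConsumer.get(consumerFor(m.key(), consumers)).add(m.key() + ":" + m.payload());
        }
        return perConsumer;
    }
}
```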
Message Consumption
Let us take a look at how the message listener container enables both single-record and batch-based message consumption.
Let us revisit our basic PulsarListener for the sake of this discussion:
@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar")
public void listen(String message) {
    System.out.println("Message Received: " + message);
}
With this PulsarListener method, we essentially ask Spring for Apache Pulsar to invoke the listener method with a single record each time.
We mentioned that the message listener container consumes the data in batches by using the batchReceive method on the consumer.
The framework detects that the PulsarListener, in this case, receives a single record. This means that, on each invocation of the method, it needs a single record.
Although the records are consumed by the message listener container in batches, it iterates through the received batch and invokes the listener method through an adapter for PulsarRecordMessageListener.
As you can see in the previous section, PulsarRecordMessageListener extends from the MessageListener provided by the Pulsar Java client, and it supports the basic received method.
The following example shows the PulsarListener consuming records in batches:
@PulsarListener(subscriptionName = "hello-batch-subscription", topics = "hello-batch", schemaType = SchemaType.JSON, batch = true)
public void listen4(List<Foo> messages) {
    System.out.println("records received :" + messages.size());
    messages.forEach((message) -> System.out.println("record : " + message));
}
When you use this type of PulsarListener, the framework detects that you are in batch mode.
Since it already received the data in batches by using the consumer's batchReceive method, it hands off the entire batch to the listener method through an adapter for PulsarBatchMessageListener.
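The difference between the two listener styles can be summarized as a dispatch step. In the following hypothetical model, which is not the container's implementation, the received list stands in for the result of batchReceive and java.util.function.Consumer stands in for the listener adapters. DispatchSketch is an illustrative name.

```java
import java.util.List;

// Hypothetical model of the dispatch step: the container always receives a batch
// and then either iterates it for a single-record listener or passes it on whole.
final class DispatchSketch {

    private DispatchSketch() {
    }

    // Single-record listener: invoked once per item in the received batch.
    static <T> void toRecordListener(List<T> received, java.util.function.Consumer<T> listener) {
        for (T item : received) {
            listener.accept(item);
        }
    }

    // Batch listener: invoked once with the entire received batch.
    static <T> void toBatchListener(List<T> received, java.util.function.Consumer<List<T>> listener) {
        listener.accept(received);
    }
}
```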
Pulsar Headers
The Pulsar message metadata can be consumed as Spring message headers. The list of available headers can be found in {github}/blob/main/spring-pulsar/src/main/java/org/springframework/pulsar/support/PulsarHeaders.java[PulsarHeaders.java].
Accessing in Single Record based Consumer
The following example shows how you can access the various Pulsar Headers in an application that uses the single record mode of consuming:
@PulsarListener(topics = "simpleListenerWithHeaders")
void simpleListenerWithHeaders(String data, @Header(PulsarHeaders.MESSAGE_ID) MessageId messageId,
        @Header(PulsarHeaders.RAW_DATA) byte[] rawData,
        @Header("foo") String foo) {
}
In the preceding example, we access the values for the messageId and rawData message metadata as well as a custom message property named foo.
The Spring @Header annotation is used for each header field.
You can also use Pulsar's Message as the envelope to carry the payload.
When doing so, you can directly call the corresponding methods on the Pulsar message to retrieve the metadata.
However, as a convenience, you can also retrieve it by using the Header annotation.
Note that you can also use the Spring messaging Message envelope to carry the payload and then retrieve the Pulsar headers by using @Header.
Accessing in Batch Record based Consumer
In this section, we see how to access the various Pulsar Headers in an application that uses a batch consumer:
@PulsarListener(topics = "simpleBatchListenerWithHeaders", batch = true)
void simpleBatchListenerWithHeaders(List<String> data,
        @Header(PulsarHeaders.MESSAGE_ID) List<MessageId> messageIds,
        @Header(PulsarHeaders.TOPIC_NAME) List<String> topicNames, @Header("foo") List<String> fooValues) {
}
In the preceding example, we consume the data as a List<String>.
When extracting the various headers, we do so as a List<> as well.
Spring for Apache Pulsar ensures that the headers list corresponds to the data list.
You can also extract headers in the same manner when you use the batch listener and receive payloads as List<org.apache.pulsar.client.api.Message<?>>, org.apache.pulsar.client.api.Messages<?>, or List<org.springframework.messaging.Message<?>>.
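The correspondence between the data list and the header lists can be pictured as index alignment. The following hypothetical model builds each list from the same batch in the same order. ReceivedRecord and BatchHeaderSketch are illustrative names. The model uses null for a record that lacks a header, which is a choice of the model and not documented framework behavior.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical received record: a payload plus its headers.
record ReceivedRecord(String payload, Map<String, String> headers) {
}

final class BatchHeaderSketch {

    private BatchHeaderSketch() {
    }

    static List<String> payloads(List<ReceivedRecord> batch) {
        List<String> out = new ArrayList<>();
        for (ReceivedRecord r : batch) {
            out.add(r.payload());
        }
        return out;
    }

    // Index i of the result belongs to index i of payloads(batch); a missing
    // header contributes null so that the two lists stay aligned.
    static List<String> header(List<ReceivedRecord> batch, String name) {
        List<String> out = new ArrayList<>();
        for (ReceivedRecord r : batch) {
            out.add(r.headers().get(name));
        }
        return out;
    }
}
```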
Message Acknowledgment
When you use Spring for Apache Pulsar, message acknowledgment is handled by the framework, unless the application opts out. In this section, we go through the details of how the framework takes care of message acknowledgment.
Message ACK modes
Spring for Apache Pulsar provides the following modes for acknowledging messages:
- BATCH
- RECORD
- MANUAL
BATCH acknowledgment mode is the default, but you can change it on the message listener container.
In the following sections, we see how acknowledgment works when you use both single and batch versions of PulsarListener and how they translate to the backing message listener container (and, ultimately, to the Pulsar consumer).
Automatic Message Ack in Single Record Mode
Let us revisit our basic single-message-based PulsarListener:
@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar")
public void listen(String message) {
    System.out.println("Message Received: " + message);
}
It is natural to wonder how acknowledgment works when you use PulsarListener, especially if you are familiar with using the Pulsar consumer directly.
The answer comes down to the message listener container, as that is the central place in Spring for Apache Pulsar that coordinates all the consumer-related activities.
Assuming you are not overriding the default behavior, this is what happens behind the scenes when you use the preceding PulsarListener:
- First, the listener container receives messages as batches from the Pulsar consumer.
- The received messages are handed down to PulsarListener one message at a time.
- When all the records are handed down to the listener method and successfully processed, the container acknowledges all the messages from the original batch.
This is the normal flow. If any records from the original batch throw an exception, Spring for Apache Pulsar tracks those records separately.
When all the records from the batch are processed, Spring for Apache Pulsar acknowledges all the successful messages and negatively acknowledges (nacks) all the failed messages.
In other words, when consuming single records by using PulsarRecordMessageListener with the default ack mode of BATCH, the framework waits for all the records received from the batchReceive call to be processed successfully and then calls the acknowledge method on the Pulsar consumer.
If any particular record throws an exception when invoking the handler method, Spring for Apache Pulsar tracks those records and separately calls negativeAcknowledge on those records after the entire batch is processed.
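The BATCH ack flow for single-record listeners can be traced with a small model. The following sketch is hypothetical. Message IDs are plain strings, the listener is a java.util.function.Consumer, and the result is an event log rather than calls on a Pulsar consumer. The sketch shows that failures are only remembered while the batch is handled and that acks and nacks are issued afterward.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical model of BATCH ack mode with a single-record listener.
final class BatchAckModeSketch {

    private BatchAckModeSketch() {
    }

    // Returns an event log such as "handled:m1", "ack:m1", "nack:m2".
    static List<String> run(List<String> batch, Consumer<String> listener) {
        List<String> events = new ArrayList<>();
        List<String> succeeded = new ArrayList<>();
        List<String> failed = new ArrayList<>();
        for (String id : batch) {
            try {
                listener.accept(id);
                succeeded.add(id);
            }
            catch (RuntimeException ex) {
                failed.add(id); // remembered, but not nacked yet
            }
            events.add("handled:" + id);
        }
        // Acks and nacks happen only after the whole batch has been handled.
        succeeded.forEach((id) -> events.add("ack:" + id));
        failed.forEach((id) -> events.add("nack:" + id));
        return events;
    }
}
```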
If the application wants the acknowledgment or negative acknowledgment to occur per record, the RECORD ack mode can be enabled.
In that case, after each record is handled, the message is acknowledged if there was no error and negatively acknowledged if there was an error.
The following example enables RECORD ack mode on the Pulsar listener:
@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar", ackMode = AckMode.RECORD)
public void listen(String message) {
    System.out.println("Message Received: " + message);
}
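For comparison with the BATCH flow, the following hypothetical model of RECORD mode applies the ack or nack decision immediately after each record is handled. As in the BATCH model, message IDs are plain strings and the event log stands in for calls on the Pulsar consumer. RecordAckModeSketch is an illustrative name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical model of RECORD ack mode: the decision for each record is applied
// right away, before the next record is handled.
final class RecordAckModeSketch {

    private RecordAckModeSketch() {
    }

    static List<String> run(List<String> batch, Consumer<String> listener) {
        List<String> events = new ArrayList<>();
        for (String id : batch) {
            boolean ok;
            try {
                listener.accept(id);
                ok = true;
            }
            catch (RuntimeException ex) {
                ok = false;
            }
            events.add("handled:" + id);
            events.add((ok ? "ack:" : "nack:") + id);
        }
        return events;
    }
}
```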
Manual Message Ack in Single Record Mode
You might not always want the framework to send acknowledgments but, rather, do that directly from the application itself. Spring for Apache Pulsar provides a couple of ways to enable manual message acknowledgments. The following example shows one of them:
@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar", ackMode = AckMode.MANUAL)
public void listen(Message<String> message, Acknowledgment acknowledgment) {
    System.out.println("Message Received: " + message.getValue());
    acknowledgment.acknowledge();
}
A few things merit explanation here. First, we enable manual ack mode by setting ackMode on PulsarListener.
When manual ack mode is enabled, Spring for Apache Pulsar lets the application inject an Acknowledgment object.
The framework achieves this by selecting a compatible message listener: PulsarAcknowledgingMessageListener for single-record-based consumption, which gives you access to an Acknowledgment object.
The Acknowledgment object provides the following API methods:
void acknowledge();
void acknowledge(MessageId messageId);
void acknowledge(List<MessageId> messageIds);
void nack();
void nack(MessageId messageId);
You can inject this Acknowledgment object into your PulsarListener while using MANUAL ack mode and then call one of the corresponding methods.
In the preceding PulsarListener example, we call a parameter-less acknowledge method.
This is because the framework knows which Message it is currently operating under.
When calling acknowledge(), you need not receive the payload with the Message envelope but can, rather, use the target type (String, in this example).
You can also call a different variant of acknowledge by providing the message ID, as in acknowledgment.acknowledge(message.getMessageId()).
When you use acknowledge(messageId), you must receive the payload by using the Message<?> envelope.
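One way to picture why the parameter-less acknowledge() works in single-record mode is an acknowledgment object that is bound to the message currently being handled. The following BoundAcknowledgment class is a hypothetical model, not the framework's Acknowledgment implementation. It records IDs in a list instead of acknowledging them on a consumer.

```java
import java.util.List;

// Hypothetical model: one acknowledgment object per handled message, already
// bound to that message's ID, so acknowledge() needs no argument.
final class BoundAcknowledgment {

    private final String currentMessageId;
    private final List<String> ackedIds;

    BoundAcknowledgment(String currentMessageId, List<String> ackedIds) {
        this.currentMessageId = currentMessageId;
        this.ackedIds = ackedIds;
    }

    // Acknowledges the message this object was created for.
    void acknowledge() {
        ackedIds.add(currentMessageId);
    }

    // Acknowledges an explicitly given message ID instead.
    void acknowledge(String messageId) {
        ackedIds.add(messageId);
    }
}
```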
Similar to what is possible for acknowledging, the Acknowledgment API also provides options for negatively acknowledging.
See the nack methods shown earlier.
You can also call acknowledge directly on the Pulsar consumer:
@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar", ackMode = AckMode.MANUAL)
public void listen(Message<String> message, Consumer<String> consumer) {
    System.out.println("Message Received: " + message.getValue());
    try {
        consumer.acknowledge(message);
    }
    catch (Exception e) {
        // acknowledge can throw PulsarClientException; handling it is up to you
    }
}
When calling acknowledge directly on the underlying consumer, you need to do error handling by yourself.
Using the Acknowledgment does not require that, as the framework can do that for you.
Therefore, you should use the Acknowledgment object approach when using manual acknowledgment.
When using manual acknowledgment, it is important to understand that the framework completely stays away from any acknowledgment. Hence, it is extremely important to think through the right acknowledgment strategies when designing applications.
Automatic Message Ack in Batch Consumption
When you consume records in batches (see “Message ACK modes”) and use the default ack mode of BATCH, the entire batch is acknowledged once it has been processed successfully.
If any record throws an exception, the entire batch is negatively acknowledged.
Note that this may not be the same batch that was batched on the producer side. Rather, this is the batch returned from calling batchReceive on the consumer.
Consider the following batch listener:
@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar", batch = true)
public void batchListen(List<Foo> messages) {
    for (Foo foo : messages) {
        // ... process each record
    }
}
When all the messages in the incoming collection (messages in this example) are processed, the framework acknowledges all of them.
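The all-or-nothing behavior follows from the listener receiving the whole list, so the framework only sees whether the call as a whole succeeded. The following hypothetical model (BatchListenerAckSketch is an illustrative name) returns a string that describes the decision instead of calling the Pulsar consumer.

```java
import java.util.List;
import java.util.function.Consumer;

// Hypothetical model of BATCH ack mode with a batch listener.
final class BatchListenerAckSketch {

    private BatchListenerAckSketch() {
    }

    static String run(List<String> batch, Consumer<List<String>> listener) {
        String ids = String.join(",", batch);
        try {
            listener.accept(batch);
            return "ack:" + ids;
        }
        catch (RuntimeException ex) {
            // One failure nacks the whole batch, including records that the
            // listener may already have processed successfully.
            return "nack:" + ids;
        }
    }
}
```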
When consuming in batch mode, RECORD is not an allowed ack mode.
This might cause an issue, as an application may not want the entire batch to be redelivered.
In such situations, you need to use the MANUAL acknowledgment mode.
Manual Message Ack in Batch Consumption
As seen in the previous section, when MANUAL ack mode is set on the message listener container, the framework does not do any acknowledgment, positive or negative.
It is entirely up to the application to take care of such concerns.
When MANUAL ack mode is set, Spring for Apache Pulsar selects a compatible message listener: PulsarBatchAcknowledgingMessageListener for batch consumption, which gives you access to an Acknowledgment object.
The following are the methods available in the Acknowledgment API:
void acknowledge();
void acknowledge(MessageId messageId);
void acknowledge(List<MessageId> messageIds);
void nack();
void nack(MessageId messageId);
You can inject this Acknowledgment object into your PulsarListener while using MANUAL ack mode.
The following listing shows a basic example for a batch-based listener:
@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar", batch = true, ackMode = AckMode.MANUAL)
public void listen(List<Message<String>> messages, Acknowledgment acknowledgment) {
    for (Message<String> message : messages) {
        try {
            // ... process the message
            acknowledgment.acknowledge(message.getMessageId());
        }
        catch (Exception e) {
            acknowledgment.nack(message.getMessageId());
        }
    }
}
When you use a batch listener, the message listener container cannot know which record it is currently operating upon. Therefore, to manually acknowledge, you need to use one of the overloaded acknowledge methods that take a MessageId or a List<MessageId>. You can also negatively acknowledge with the MessageId for the batch listener.
Message Redelivery and Error Handling
Now that we have seen both PulsarListener and the message listener container infrastructure and its various functions, let us try to understand message redelivery and error handling.
Apache Pulsar provides various native strategies for message redelivery and error handling. We take a look at them and see how we can use them through Spring for Apache Pulsar.
Specifying Acknowledgment Timeout for Message Redelivery
By default, Pulsar consumers do not redeliver messages unless the consumer crashes, but you can change this behavior by setting an ack timeout on the Pulsar consumer. If the ack timeout property has a value above zero and if the Pulsar consumer does not acknowledge a message within that timeout period, the message is redelivered.
When you use Spring for Apache Pulsar, you can set this property through a consumer customizer (see Consumer Customization on PulsarListener) or with the native Pulsar ackTimeout property in the properties attribute of @PulsarListener:
@PulsarListener(subscriptionName = "subscription-1", topics = "topic-1",
        properties = {"ackTimeout=60s"})
public void listen(String s) {
    // process the message
}
With this ack timeout, if the consumer does not acknowledge a message within 60 seconds, Pulsar redelivers the message to the consumer.
If you want redeliveries after an ack timeout to follow a backoff policy with increasing delays, you can do the following:
@EnablePulsar
@Configuration
class AckTimeoutRedeliveryConfig {
@PulsarListener(subscriptionName = "withAckTimeoutRedeliveryBackoffSubscription",
topics = "withAckTimeoutRedeliveryBackoff-test-topic",
ackTimeoutRedeliveryBackoff = "ackTimeoutRedeliveryBackoff",
properties = { "ackTimeout=60s" })
void listen(String msg) {
// some long-running process that may cause an ack timeout
}
@Bean
RedeliveryBackoff ackTimeoutRedeliveryBackoff() {
return MultiplierRedeliveryBackoff.builder().minDelayMs(1000).maxDelayMs(10 * 1000).multiplier(2)
.build();
}
}
In the preceding example, we specify a bean for Pulsar's RedeliveryBackoff with a minimum delay of 1 second, a maximum delay of 10 seconds, and a backoff multiplier of 2.
After the initial ack timeout occurs, the message redeliveries are controlled through this backoff bean.
We provide the backoff bean to the PulsarListener annotation by setting the ackTimeoutRedeliveryBackoff attribute to the actual bean name (ackTimeoutRedeliveryBackoff, in this case).
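The following self-contained sketch shows what these three settings mean in practice by computing the delay before each redelivery. It is a simplified model, not Pulsar's MultiplierRedeliveryBackoff. It assumes that the delay for redelivery n (counting from 0) is minDelay × multiplier^n, capped at maxDelay. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of a multiplier-based redelivery backoff; not Pulsar's MultiplierRedeliveryBackoff.
final class RedeliveryBackoffModel {

    private RedeliveryBackoffModel() {
    }

    // Delay before redelivery number `redeliveryCount` (0 = first redelivery):
    // minDelayMs * multiplier^redeliveryCount, capped at maxDelayMs.
    static long delayMs(int redeliveryCount, long minDelayMs, long maxDelayMs, double multiplier) {
        if (redeliveryCount <= 0) {
            return minDelayMs;
        }
        double delay = minDelayMs * Math.pow(multiplier, redeliveryCount);
        return (long) Math.min(delay, (double) maxDelayMs);
    }

    // Delays for the first `count` redeliveries.
    static List<Long> schedule(int count, long minDelayMs, long maxDelayMs, double multiplier) {
        List<Long> delays = new ArrayList<>();
        for (int n = 0; n < count; n++) {
            delays.add(delayMs(n, minDelayMs, maxDelayMs, multiplier));
        }
        return delays;
    }

    public static void main(String[] args) {
        // Same values as the ackTimeoutRedeliveryBackoff bean: 1 s minimum, 10 s maximum, multiplier 2.
        System.out.println(schedule(6, 1000, 10_000, 2.0));
    }
}
```

With the values from the ackTimeoutRedeliveryBackoff bean, main prints [1000, 2000, 4000, 8000, 10000, 10000]. The delay doubles until it reaches the 10-second cap.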
Specifying Negative Acknowledgment Redelivery
When acknowledging negatively, the Pulsar consumer lets you specify how the application wants the message to be redelivered.
The default is to redeliver the message after one minute, but you can change it through a consumer customizer (see Consumer Customization on PulsarListener) or with the native Pulsar negativeAckRedeliveryDelay property in the properties attribute of @PulsarListener:
@PulsarListener(subscriptionName = "subscription-1", topics = "topic-1",
        properties = {"negativeAckRedeliveryDelay=10ms"})
public void listen(String s) {
    // process the message
}
You can also specify different delays and a backoff multiplier by providing a RedeliveryBackoff bean and passing its name as the negativeAckRedeliveryBackoff attribute of @PulsarListener, as follows:
@EnablePulsar
@Configuration
class NegativeAckRedeliveryConfig {
@PulsarListener(subscriptionName = "withNegRedeliveryBackoffSubscription",
topics = "withNegRedeliveryBackoff-test-topic", negativeAckRedeliveryBackoff = "redeliveryBackoff",
subscriptionType = SubscriptionType.Shared)
void listen(String msg) {
throw new RuntimeException("fail " + msg);
}
@Bean
RedeliveryBackoff redeliveryBackoff() {
return MultiplierRedeliveryBackoff.builder().minDelayMs(1000).maxDelayMs(10 * 1000).multiplier(2)
.build();
}
}
Using Dead Letter Topic from Apache Pulsar for Message Redelivery and Error Handling
Apache Pulsar lets applications use a dead letter topic on consumers with a Shared subscription type.
For the Exclusive and Failover subscription types, this feature is not available.
The basic idea is that, if a message is retried a certain number of times (for example, due to an ack timeout or nack redelivery), once the retries are exhausted, the message can be sent to a special topic called the dead letter queue (DLQ).
Let us see some details around this feature in action by inspecting some code snippets:
@EnablePulsar
@Configuration
class DeadLetterPolicyConfig {
@PulsarListener(id = "deadLetterPolicyListener", subscriptionName = "deadLetterPolicySubscription",
topics = "topic-with-dlp", deadLetterPolicy = "deadLetterPolicy",
subscriptionType = SubscriptionType.Shared, properties = { "ackTimeout=1s" })
void listen(String msg) {
throw new RuntimeException("fail " + msg);
}
@PulsarListener(id = "dlqListener", topics = "my-dlq-topic")
void listenDlq(String msg) {
System.out.println("From DLQ: " + msg);
}
@Bean
DeadLetterPolicy deadLetterPolicy() {
return DeadLetterPolicy.builder().maxRedeliverCount(10).deadLetterTopic("my-dlq-topic").build();
}
}
First, we have a special bean for DeadLetterPolicy, named deadLetterPolicy (it can be any name you wish).
This bean specifies a number of things, such as the maximum redelivery count (10, in this case) and the name of the dead letter topic (my-dlq-topic, in this case).
If you do not specify a DLQ topic name, it defaults to <topicname>-<subscriptionname>-DLQ in Pulsar.
Next, we provide this bean name to PulsarListener by setting the deadLetterPolicy property.
Note that the PulsarListener has a subscription type of Shared, as the DLQ feature only works with shared subscriptions.
This code is primarily for demonstration purposes, so we provide an ackTimeout value of 1 second.
The idea is that the code throws the exception and, if Pulsar does not receive an ack within 1 second, it does a retry.
If that cycle continues ten times (as that is our max redelivery count in the DeadLetterPolicy), the Pulsar consumer publishes the messages to the DLQ topic.
We have another PulsarListener that listens on the DLQ topic to receive data as it is published to the DLQ topic.
If the main topic is partitioned, Pulsar treats each partition as a separate topic behind the scenes.
Pulsar appends partition-<n> to the main topic name, where n stands for the partition number.
The problem is that, if you do not specify a DLQ topic (as opposed to what we did above), Pulsar publishes to a default topic name that includes this partition-<n> information (for example, topic-with-dlp-partition-0-deadLetterPolicySubscription-DLQ).
The easy way to solve this is to always provide a DLQ topic name.
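The naming rules above fit in a few lines. The following sketch is a hypothetical helper, not a Pulsar API. It uses the same short topic names as the text, whereas real Pulsar topic names are fully qualified (for example, persistent://tenant/namespace/topic). It derives the default DLQ name for one partition and shows how an explicit name avoids a per-partition DLQ.

```java
// Hypothetical helpers modelling the topic names described above; not a Pulsar API.
final class DeadLetterTopicNames {

    private DeadLetterTopicNames() {
    }

    // Topic that holds one partition of a partitioned topic: <topic>-partition-<n>.
    static String partitionTopic(String topic, int partition) {
        return topic + "-partition-" + partition;
    }

    // Default DLQ name when no deadLetterTopic is configured: <topic>-<subscription>-DLQ.
    static String defaultDlq(String topic, String subscription) {
        return topic + "-" + subscription + "-DLQ";
    }

    // The explicit DLQ name wins; otherwise the name is derived from the topic actually consumed.
    static String effectiveDlq(String consumedTopic, String subscription, String explicitDlq) {
        return explicitDlq != null ? explicitDlq : defaultDlq(consumedTopic, subscription);
    }

    public static void main(String[] args) {
        String partition0 = partitionTopic("topic-with-dlp", 0);
        // No explicit name: every partition gets its own DLQ.
        System.out.println(effectiveDlq(partition0, "deadLetterPolicySubscription", null));
        // Explicit name: all partitions share one DLQ.
        System.out.println(effectiveDlq(partition0, "deadLetterPolicySubscription", "my-dlq-topic"));
    }
}
```

main prints topic-with-dlp-partition-0-deadLetterPolicySubscription-DLQ, followed by my-dlq-topic.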
Native Error Handling in Spring for Apache Pulsar
As we noted earlier, the DLQ feature in Apache Pulsar works only for shared subscriptions.
What does an application do if it needs a similar feature for non-shared subscriptions?
The main reason Pulsar does not support DLQ on exclusive and failover subscriptions is that those subscription types are order-guaranteed.
Allowing redeliveries, DLQ, and so on effectively means receiving messages out of order.
However, what if an application is okay with that but, more importantly, needs this DLQ feature for non-shared subscriptions?
For that, Spring for Apache Pulsar provides a PulsarConsumerErrorHandler, which you can use across any subscription type in Pulsar: Exclusive, Failover, Shared, or Key_Shared.
When you use PulsarConsumerErrorHandler from Spring for Apache Pulsar, make sure not to set the ack timeout properties on the listener.
Let us see some details by examining a few code snippets:
@EnablePulsar
@Configuration
class PulsarConsumerErrorHandlerConfig {
@Bean
PulsarConsumerErrorHandler<String> pulsarConsumerErrorHandler(
PulsarTemplate<String> pulsarTemplate) {
return new DefaultPulsarConsumerErrorHandler<>(
new PulsarDeadLetterPublishingRecoverer<>(pulsarTemplate, (c, m) -> "my-foo-dlt"), new FixedBackOff(100, 10));
}
@PulsarListener(id = "pulsarConsumerErrorHandler-id", subscriptionName = "pulsarConsumerErrorHandler-subscription",
topics = "pulsarConsumerErrorHandler-topic",
pulsarConsumerErrorHandler = "pulsarConsumerErrorHandler")
void listen(String msg) {
throw new RuntimeException("fail " + msg);
}
@PulsarListener(id = "pceh-dltListener", topics = "my-foo-dlt")
void listenDlt(String msg) {
System.out.println("From DLT: " + msg);
}
}
Consider the pulsarConsumerErrorHandler bean.
This creates a bean of type PulsarConsumerErrorHandler and uses the default implementation provided out of the box by Spring for Apache Pulsar: DefaultPulsarConsumerErrorHandler.
DefaultPulsarConsumerErrorHandler has a constructor that takes a PulsarMessageRecovererFactory and an org.springframework.util.backoff.BackOff.
PulsarMessageRecovererFactory is a functional interface with the following API:
@FunctionalInterface
public interface PulsarMessageRecovererFactory<T> {
/**
* Provides a message recoverer {@link PulsarMessageRecoverer}.
* @param consumer Pulsar consumer
* @return {@link PulsarMessageRecoverer}.
*/
PulsarMessageRecoverer<T> recovererForConsumer(Consumer<T> consumer);
}
The recovererForConsumer method takes a Pulsar consumer and returns a PulsarMessageRecoverer, which is another functional interface.
Here is the API of PulsarMessageRecoverer:
public interface PulsarMessageRecoverer<T> {
/**
 * Recover a failed message, for example, by sending the message to a DLT.
* @param message Pulsar message
* @param exception exception from failed message
*/
void recoverMessage(Message<T> message, Exception exception);
}
Spring for Apache Pulsar provides an implementation of PulsarMessageRecovererFactory called PulsarDeadLetterPublishingRecoverer, which recovers the message by sending it to a Dead Letter Topic (DLT).
We provide this implementation to the constructor of the preceding DefaultPulsarConsumerErrorHandler.
As the second argument, we provide a FixedBackOff.
You can also provide Spring's ExponentialBackOff for advanced backoff features.
Then we give the name of this PulsarConsumerErrorHandler bean to the PulsarListener through its pulsarConsumerErrorHandler attribute.
Each time the PulsarListener method fails for a message, the message is retried.
The number of retries is controlled by the values of the provided BackOff implementation. In our example, FixedBackOff(100, 10) allows 10 retries at 100 ms intervals (11 total attempts: the first one and then the 10 retries).
Once all the retries are exhausted, the message is sent to the DLT.
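The retry-then-recover behavior of DefaultPulsarConsumerErrorHandler with FixedBackOff(100, 10) can be modeled as a simple loop. FixedBackOffRetryModel is hypothetical and does not use the framework. It records the back-off intervals instead of sleeping, and its recoverer callback stands in for PulsarDeadLetterPublishingRecoverer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

// Hypothetical model of "retry with a fixed back-off, then recover"; not the Spring for Apache Pulsar API.
final class FixedBackOffRetryModel {

    private FixedBackOffRetryModel() {
    }

    // Invokes the listener until it succeeds or maxRetries retries have failed, then hands the
    // message to the recoverer. Each back-off interval is recorded in `waits` instead of slept.
    // Returns the total number of listener invocations.
    static <T> int process(T message, Consumer<T> listener, long intervalMs, long maxRetries,
            BiConsumer<T, RuntimeException> recoverer, List<Long> waits) {
        int attempts = 0;
        while (true) {
            attempts++;
            try {
                listener.accept(message);
                return attempts; // success: no recovery needed
            }
            catch (RuntimeException ex) {
                if (attempts > maxRetries) {
                    recoverer.accept(message, ex); // retries exhausted, e.g. publish to the DLT
                    return attempts;
                }
                waits.add(intervalMs); // a real container would pause here before retrying
            }
        }
    }

    public static void main(String[] args) {
        List<Long> waits = new ArrayList<>();
        List<String> dlt = new ArrayList<>();
        // Mirrors FixedBackOff(100, 10): a 100 ms interval and at most 10 retries.
        int attempts = process("hello", msg -> {
            throw new RuntimeException("fail " + msg);
        }, 100, 10, (msg, ex) -> dlt.add(msg), waits);
        System.out.println(attempts + " attempts, " + waits.size() + " waits, DLT=" + dlt);
    }
}
```

For a listener that always fails, main prints 11 attempts, 10 waits, DLT=[hello]. This matches the 11 total tries described above.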
The PulsarDeadLetterPublishingRecoverer implementation we provide uses a PulsarTemplate to publish the message to the DLT.
In most cases, the same auto-configured PulsarTemplate from Spring Boot is sufficient, with one caveat for partitioned topics.
When you use partitioned topics with custom message routing for the main topic, you must use a different PulsarTemplate. It must not take the auto-configured PulsarProducerFactory, which is populated with a message-routing-mode value of custompartition.
You can use a PulsarConsumerErrorHandler with the following blueprint:
@Bean
PulsarConsumerErrorHandler<Integer> pulsarConsumerErrorHandler(PulsarClient pulsarClient) {
PulsarProducerFactory<Integer> pulsarProducerFactory = new DefaultPulsarProducerFactory<>(pulsarClient, Map.of());
PulsarTemplate<Integer> pulsarTemplate = new PulsarTemplate<>(pulsarProducerFactory);
BiFunction<Consumer<?>, Message<?>, String> destinationResolver =
(c, m) -> "my-foo-dlt";
PulsarDeadLetterPublishingRecoverer<Integer> pulsarDeadLetterPublishingRecoverer =
new PulsarDeadLetterPublishingRecoverer<>(pulsarTemplate, destinationResolver);
return new DefaultPulsarConsumerErrorHandler<>(pulsarDeadLetterPublishingRecoverer,
new FixedBackOff(100, 5));
}
Note that we provide a destination resolver to the PulsarDeadLetterPublishingRecoverer as the second constructor argument.
If it is not provided, PulsarDeadLetterPublishingRecoverer uses <subscription-name>-<topic-name>-DLT as the DLT topic name.
When using this feature, you should set a proper destination name through the destination resolver rather than relying on the default.
When using a single-record message listener, as we did with PulsarConsumerErrorHandler, and if you use manual acknowledgment, make sure not to negatively acknowledge the message when an exception is thrown.
Rather, rethrow the exception back to the container. Otherwise, the container assumes the message has been handled separately, and the error handling is not triggered.
Finally, we have a second PulsarListener that receives messages from the DLT topic.
In the examples provided in this section so far, we only saw how to use PulsarConsumerErrorHandler with a single-record message listener.
Next, we look at how you can use it with batch listeners.
Batch listener with PulsarConsumerErrorHandler
First, let us look at a batch PulsarListener method:
@PulsarListener(subscriptionName = "batch-demo-5-sub", topics = "batch-demo-4", batch = true, concurrency = "3",
subscriptionType = SubscriptionType.Failover,
pulsarConsumerErrorHandler = "pulsarConsumerErrorHandler", ackMode = AckMode.MANUAL)
void listen(List<Message<Integer>> data, Consumer<Integer> consumer, Acknowledgment acknowledgment) {
for (Message<Integer> datum : data) {
if (datum.getValue() == 5) {
throw new PulsarBatchListenerFailedException("failed", datum);
}
        acknowledgment.acknowledge(datum.getMessageId());
}
}
@Bean
PulsarConsumerErrorHandler<String> pulsarConsumerErrorHandler(
PulsarTemplate<String> pulsarTemplate) {
return new DefaultPulsarConsumerErrorHandler<>(
new PulsarDeadLetterPublishingRecoverer<>(pulsarTemplate, (c, m) -> "my-foo-dlt"), new FixedBackOff(100, 10));
}
@PulsarListener(subscriptionName = "my-dlt-subscription", topics = "my-foo-dlt")
void dltReceiver(Message<Integer> message) {
System.out.println("DLT - RECEIVED: " + message.getValue());
}
Once again, we set the pulsarConsumerErrorHandler property to the name of the PulsarConsumerErrorHandler bean.
When you use a batch listener (as shown in the preceding example) and want to use the PulsarConsumerErrorHandler from Spring for Apache Pulsar, you need to use manual acknowledgment.
This way, you can acknowledge all the successful individual messages.
For the ones that fail, you must throw a PulsarBatchListenerFailedException with the message on which it fails.
Without this exception, the framework does not know what to do with the failure.
On retry, the container sends the listener a new batch of messages, starting with the failed message.
If it fails again, it is retried until the retries are exhausted, at which point the message is sent to the DLT.
At that point, the message is acknowledged by the container, and the listener is handed the subsequent messages in the original batch.
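The following simulation walks through that batch flow for messages 1 to 8. Message 5 always fails, and the back-off allows 10 retries. BatchRetrySimulation and its Outcome record are hypothetical and do not use the framework. The model assumes two things: the retry budget applies per failed message, and each retry hands the listener the rest of the batch, starting with the failed message, as described above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntPredicate;

// Hypothetical simulation of the batch retry flow described above; not the framework's code.
final class BatchRetrySimulation {

    record Outcome(List<Integer> acked, List<Integer> deadLettered, int listenerCalls) {
    }

    private BatchRetrySimulation() {
    }

    static Outcome run(List<Integer> batch, IntPredicate fails, int maxRetries) {
        List<Integer> acked = new ArrayList<>();
        List<Integer> dlt = new ArrayList<>();
        int start = 0;   // index of the first message handed to the listener
        int retries = 0; // retries already used for the message at `start`
        int calls = 0;
        while (start < batch.size()) {
            calls++;
            int failedAt = -1;
            // The listener acks each successful message and, in the real API, throws
            // PulsarBatchListenerFailedException on the first failing one.
            for (int i = start; i < batch.size(); i++) {
                if (fails.test(batch.get(i))) {
                    failedAt = i;
                    break;
                }
                acked.add(batch.get(i));
            }
            if (failedAt < 0) {
                break; // the rest of the batch succeeded
            }
            if (failedAt != start) {
                retries = 0; // a different message failed, so its retry budget starts fresh
            }
            start = failedAt; // the next batch starts with the failed message
            if (retries < maxRetries) {
                retries++;
                continue;
            }
            dlt.add(batch.get(start)); // retries exhausted: send to the DLT; the container acks it
            start++;
            retries = 0;
        }
        return new Outcome(acked, dlt, calls);
    }

    public static void main(String[] args) {
        // Message 5 always fails; 10 retries, as with FixedBackOff(100, 10).
        System.out.println(run(List.of(1, 2, 3, 4, 5, 6, 7, 8), value -> value == 5, 10));
    }
}
```

In this run, the listener acknowledges messages 1 to 4 and 6 to 8, and message 5 ends up in the DLT. The listener is invoked 12 times: once for the original batch, 10 times for the retries, and once for the remaining messages 6 to 8.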
Consumer Customization on PulsarListener
Spring for Apache Pulsar provides a convenient way to customize the consumer created by the container used by the PulsarListener.
Applications can provide a bean for PulsarListenerConsumerBuilderCustomizer.
Here is an example:
@Bean
public PulsarListenerConsumerBuilderCustomizer<String> myCustomizer() {
return cb -> {
cb.subscriptionName("modified-subscription-name");
};
}
Then this customizer bean name can be provided as an attribute on the PulsarListener annotation, as shown below:
@PulsarListener(subscriptionName = "my-subscription",
topics = "my-topic", consumerCustomizer = "myCustomizer")
void listen(String message) {
}
The framework detects the provided bean through the PulsarListener and applies this customizer to the Consumer builder before creating the Pulsar Consumer.
If you have multiple PulsarListener methods, and each of them has different customization rules, you should create multiple customizer beans and attach the proper customizer to each PulsarListener.
Pausing and Resuming Message Listener Containers
There are situations in which an application might want to pause message consumption temporarily and then resume later. Spring for Apache Pulsar provides the ability to pause and resume the underlying message listener containers. When the Pulsar message listener container is paused, any polling done by the container to receive data from the Pulsar consumer is paused. Similarly, when the container is resumed, the next poll starts returning data if any new records were added to the topic while it was paused.
To pause or resume a listener container, first obtain the container instance through the PulsarListenerEndpointRegistry bean and then invoke the pause/resume API on the container instance, as shown in the snippet below:
@Autowired
private PulsarListenerEndpointRegistry registry;
void someMethod() {
PulsarMessageListenerContainer container = registry.getListenerContainer("my-listener-id");
container.pause();
}
The id parameter passed to |
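A small model of a poll loop can illustrate the pause and resume semantics described above. PausablePoller is hypothetical and does not reflect how PulsarMessageListenerContainer is implemented. It only shows two things: polls return nothing while paused, and messages added during the pause are returned by the first poll after resuming.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical model of a pausable poll loop; not the PulsarMessageListenerContainer implementation.
final class PausablePoller {

    private final Queue<String> topic = new ConcurrentLinkedQueue<>(); // stands in for the subscribed topic
    private final AtomicBoolean paused = new AtomicBoolean(false);     // may be flipped from another thread

    void publish(String message) {
        topic.add(message);
    }

    void pause() {
        paused.set(true);
    }

    void resume() {
        paused.set(false);
    }

    // One poll: returns nothing while paused, otherwise drains everything currently available.
    List<String> poll() {
        List<String> received = new ArrayList<>();
        if (paused.get()) {
            return received; // messages stay on the topic until the poller is resumed
        }
        String message;
        while ((message = topic.poll()) != null) {
            received.add(message);
        }
        return received;
    }

    public static void main(String[] args) {
        PausablePoller poller = new PausablePoller();
        poller.pause();
        poller.publish("m1");
        System.out.println("while paused: " + poller.poll());
        poller.resume();
        System.out.println("after resume: " + poller.poll());
    }
}
```

main prints while paused: [] and then after resume: [m1].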
Pulsar Reader Support
The framework provides support for using {apache-pulsar-docs}/concepts-clients/#reader-interface[Pulsar Reader] through the PulsarReaderFactory.
Spring Boot provides this reader factory, which you can further configure by specifying any of the {spring-boot-pulsar-config-props}[spring.pulsar.reader.*] application properties.
PulsarReader Annotation
While it is possible to use PulsarReaderFactory directly, Spring for Apache Pulsar provides the PulsarReader annotation, which you can use to quickly read from a topic without setting up any reader factories yourself.
This follows the same idea as PulsarListener.
Here is a quick example:
@PulsarReader(id = "reader-demo-id", topics = "reader-demo-topic", startMessageId = "earliest")
void read(String message) {
//...
}
The id attribute is optional, but it is a best practice to provide a value that is meaningful to your application.
When it is not specified, an auto-generated ID is used.
On the other hand, the topics and startMessageId attributes are mandatory.
The topics attribute can be a single topic or a comma-separated list of topics.
The startMessageId attribute instructs the reader to start from a particular message in the topic.
The valid values for startMessageId are earliest or latest.
Suppose you want the reader to start reading from an arbitrary message in a topic rather than from the earliest or latest available message. In that case, you need to use a ReaderBuilderCustomizer to customize the ReaderBuilder so it knows the right MessageId to start from.
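A cursor over an append-only list can model the difference between earliest, latest, and an explicit start position. ReaderStartModel is hypothetical, and its list indexes stand in for MessageId values. It treats an explicit start position as exclusive, so the first message returned is the one after it.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of reader start positions over an append-only log; not the Pulsar Reader API.
final class ReaderStartModel {

    // The index of each entry plays the role of its MessageId.
    private final List<String> log = new ArrayList<>();

    int publish(String message) {
        log.add(message);
        return log.size() - 1;
    }

    // A cursor remembers the index of the next message it returns.
    final class Cursor {
        private int next;

        private Cursor(int next) {
            this.next = next;
        }

        List<String> readAvailable() {
            List<String> out = new ArrayList<>(log.subList(next, log.size()));
            next = log.size();
            return out;
        }
    }

    Cursor earliest() {
        return new Cursor(0); // oldest available message
    }

    Cursor latest() {
        return new Cursor(log.size()); // only messages published after the cursor is created
    }

    Cursor after(int messageId) {
        return new Cursor(messageId + 1); // exclusive start: first message returned follows messageId
    }

    public static void main(String[] args) {
        ReaderStartModel topic = new ReaderStartModel();
        int first = topic.publish("m0");
        topic.publish("m1");
        Cursor fromEarliest = topic.earliest();
        Cursor fromLatest = topic.latest();
        Cursor afterFirst = topic.after(first);
        topic.publish("m2");
        System.out.println(fromEarliest.readAvailable());
        System.out.println(fromLatest.readAvailable());
        System.out.println(afterFirst.readAvailable());
    }
}
```

In main, two messages are published before the cursors are created and one after. The earliest cursor returns [m0, m1, m2], the latest cursor returns only [m2], and the cursor started after m0 returns [m1, m2].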
Customizing the ReaderBuilder
You can customize any fields available through ReaderBuilder by using a PulsarReaderReaderBuilderCustomizer in Spring for Apache Pulsar.
You can provide a @Bean of type PulsarReaderReaderBuilderCustomizer and then make it available to the PulsarReader as follows:
@PulsarReader(id = "reader-customizer-demo-id", topics = "reader-customizer-demo-topic",
readerCustomizer = "myCustomizer")
void read(String message) {
//...
}
@Bean
public PulsarReaderReaderBuilderCustomizer<String> myCustomizer() {
return readerBuilder -> {
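        // messageId: a MessageId your application obtained earlier, for example from a previously read message
        readerBuilder.startMessageId(messageId); // the first message read is after this message id.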
// Any other customizations on the readerBuilder
};
}
If your application only has a single |
A destination topic is needed when producing or consuming messages. The framework looks in the following ordered locations to determine a topic (stopping at the first match):
- User specified
- Message type default
- Specified via annotation
- Global default
When a topic is found through one of the default mechanisms, there is no need to specify the topic on the produce or consume API.
When a topic is not found, the API throws an exception.
User specified
A topic passed into the API being used has the highest precedence (for example, PulsarTemplate.send("my-topic", myMessage) or @PulsarListener(topics = "my-topic")).
Message type default
When no topic is passed into the API, the system looks for a message-type-to-topic mapping configured for the type of the message being produced or consumed.
Mappings can be configured with the spring.pulsar.defaults.type-mappings property.
The following example uses application.yml to configure default topics to use when consuming or producing Foo or Bar messages:
spring:
pulsar:
defaults:
type-mappings:
- message-type: com.acme.Foo
topic-name: foo-topic
- message-type: com.acme.Bar
topic-name: bar-topic
The |
If the message (or the first message of a Publisher input) is null, the framework cannot determine the topic from it. If your application is likely to send null messages, specify the topic in another way.
Specified via annotation
When no topic is passed into the API and there are no custom topic mappings configured, the system looks for a @PulsarMessage annotation on the class of the message being produced or consumed.
The default topic can be specified through the topic attribute of the annotation.
The following example configures the default topic to use when producing or consuming messages of type Foo:
@PulsarMessage(topic = "foo-topic")
record Foo(String value) {
}
Property placeholders and SpEL expressions are supported in the @PulsarMessage annotation, for example:
@PulsarMessage(topic = "${app.topics.foo}")
record Foo(String value) {
}
@PulsarMessage(topic = "#{someBean.getTopic()}")
record Bar(String value) {
}
Custom topic resolver
The preferred method of adding mappings is through the property mentioned above. However, if more control is needed, you can replace the default resolver by providing your own implementation, for example:
@Bean
public MyTopicResolver topicResolver() {
return new MyTopicResolver();
}
Producer global default
The final location consulted (when producing) is the system-wide producer default topic.
It is configured through the spring.pulsar.producer.topic-name property when using the imperative API and the spring.pulsar.reactive.sender.topic-name property when using the reactive API.
Consumer global default
The final location consulted (when consuming) is the system-wide consumer default topic.
It is configured through the spring.pulsar.consumer.topics or spring.pulsar.consumer.topics-pattern property when using the imperative API, and through the spring.pulsar.reactive.consumer.topics or spring.pulsar.reactive.consumer.topics-pattern property when using the reactive API.
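A small model can summarize the lookup order in this section: user-specified topic, then message type mapping, then @PulsarMessage annotation, then global default. It does not use the framework. DefaultTopic is a hypothetical annotation standing in for @PulsarMessage. The map and the fallback string stand in for spring.pulsar.defaults.type-mappings and the global default properties. The real resolver, which you can replace as shown under Custom topic resolver, may differ in details.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.Map;

// Hypothetical stand-in for @PulsarMessage(topic = ...), used only by this model.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface DefaultTopic {
    String value();
}

// Hypothetical model of the topic lookup order; not the framework's TopicResolver.
final class TopicLookupModel {

    private final Map<Class<?>, String> typeMappings; // like spring.pulsar.defaults.type-mappings
    private final String globalDefault;               // like the global default topic property; may be null

    TopicLookupModel(Map<Class<?>, String> typeMappings, String globalDefault) {
        this.typeMappings = typeMappings;
        this.globalDefault = globalDefault;
    }

    String resolve(String userTopic, Object message) {
        if (userTopic != null) {
            return userTopic; // 1. user specified
        }
        if (message != null) { // a null message carries no type information
            String mapped = typeMappings.get(message.getClass());
            if (mapped != null) {
                return mapped; // 2. message type mapping
            }
            DefaultTopic annotation = message.getClass().getAnnotation(DefaultTopic.class);
            if (annotation != null) {
                return annotation.value(); // 3. annotation on the message class
            }
        }
        if (globalDefault != null) {
            return globalDefault; // 4. global default
        }
        throw new IllegalArgumentException("No topic could be resolved");
    }

    public static void main(String[] args) {
        Map<Class<?>, String> mappings = Map.of(BarMessage.class, "bar-topic");
        TopicLookupModel model = new TopicLookupModel(mappings, "fallback-topic");
        System.out.println(model.resolve("my-topic", new BarMessage("x")));
        System.out.println(model.resolve(null, new BarMessage("x")));
        System.out.println(model.resolve(null, new FooMessage("x")));
        System.out.println(model.resolve(null, null));
    }
}

@DefaultTopic("foo-topic")
record FooMessage(String value) {
}

record BarMessage(String value) {
}
```

main resolves my-topic, bar-topic, foo-topic, and fallback-topic in turn. Each line exercises the next location in the lookup order.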
Publishing and Consuming Partitioned Topics
In the following example, we publish to a topic called hello-pulsar-partitioned.
It is a partitioned topic, and, for this sample, we assume that the topic has already been created with three partitions.
@SpringBootApplication
public class PulsarBootPartitioned {
public static void main(String[] args) {
SpringApplication.run(PulsarBootPartitioned.class, "--spring.pulsar.producer.message-routing-mode=CustomPartition");
}
@Bean
public ApplicationRunner runner(PulsarTemplate<String> pulsarTemplate) {
pulsarTemplate.setDefaultTopicName("hello-pulsar-partitioned");
return args -> {
for (int i = 0; i < 10; i++) {
pulsarTemplate.sendAsync("hello john doe 0 ", new FooRouter());
pulsarTemplate.sendAsync("hello alice doe 1", new BarRouter());
pulsarTemplate.sendAsync("hello buzz doe 2", new BuzzRouter());
}
};
}
@PulsarListener(subscriptionName = "hello-pulsar-partitioned-subscription", topics = "hello-pulsar-partitioned")
public void listen(String message) {
System.out.println("Message Received: " + message);
}
static class FooRouter implements MessageRouter {
@Override
public int choosePartition(Message<?> msg, TopicMetadata metadata) {
return 0;
}
}
static class BarRouter implements MessageRouter {
@Override
public int choosePartition(Message<?> msg, TopicMetadata metadata) {
return 1;
}
}
static class BuzzRouter implements MessageRouter {
@Override
public int choosePartition(Message<?> msg, TopicMetadata metadata) {
return 2;
}
}
}
In the preceding example, we publish to a partitioned topic, and we would like to publish certain data to specific partitions.
If you leave it to Pulsar’s default, it follows a round-robin mode of partition assignment (for messages without a key), and we would like to override that.
To do so, we pass a message router object to the send method.
Consider the three message routers implemented: FooRouter always sends data to partition 0, BarRouter sends to partition 1, and BuzzRouter sends to partition 2.
Also note that we now use the sendAsync method of PulsarTemplate, which returns a CompletableFuture.
When running the application, we also need to set the messageRoutingMode on the producer to CustomPartition (spring.pulsar.producer.message-routing-mode); the main method does this by passing --spring.pulsar.producer.message-routing-mode=CustomPartition as an argument to SpringApplication.run.
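To make the difference between the default round-robin assignment and a fixed router concrete, here is a minimal, JDK-only model. It is not Pulsar's implementation: the PartitionChooser interface and the RoutingModel class are hypothetical stand-ins that only mirror the shape of MessageRouter.choosePartition(Message<?>, TopicMetadata), taking the message value and the partition count instead. Its round-robin is also simplified; Pulsar's actual round-robin router can start from any partition and, when batching is enabled, may keep sending to one partition for a while before moving on.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, JDK-only model of partition routing; not Pulsar's implementation.
final class RoutingModel {

    // Hypothetical stand-in for MessageRouter: picks a partition from the value and partition count.
    interface PartitionChooser {
        int choosePartition(String value, int numPartitions);
    }

    // Simplified round-robin: the i-th routed message goes to partition i mod numPartitions.
    static PartitionChooser roundRobin() {
        int[] counter = {0};
        return (value, numPartitions) -> Math.floorMod(counter[0]++, numPartitions);
    }

    // Fixed router, analogous to FooRouter, BarRouter, and BuzzRouter in the example above.
    static PartitionChooser fixed(int partition) {
        return (value, numPartitions) -> partition;
    }

    // Routes each value with the given chooser and returns the chosen partitions in send order.
    static List<Integer> route(List<String> values, PartitionChooser chooser, int numPartitions) {
        List<Integer> partitions = new ArrayList<>();
        for (String value : values) {
            partitions.add(chooser.choosePartition(value, numPartitions));
        }
        return partitions;
    }

    public static void main(String[] args) {
        List<String> values = List.of("hello john doe 0", "hello alice doe 1", "hello buzz doe 2",
                "hello john doe 0", "hello alice doe 1", "hello buzz doe 2");
        System.out.println("round-robin: " + route(values, roundRobin(), 3)); // [0, 1, 2, 0, 1, 2]
        System.out.println("fixed(1):    " + route(values, fixed(1), 3));     // [1, 1, 1, 1, 1, 1]
    }
}
```

The fixed chooser ignores its inputs entirely, which is exactly what makes each of the example's routers deterministic; a real router could instead derive the partition from the message content and metadata.numPartitions().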
On the consumer side, we use a PulsarListener with the exclusive subscription type (the default, since the listener does not set subscriptionType).
This means that data from all the partitions ends up in the same consumer, and there is no ordering guarantee across partitions; messages from any single partition still arrive in order.
What can we do if we want each partition to be consumed by a single distinct consumer?
We can switch to the failover subscription mode and add three separate consumers:
@PulsarListener(subscriptionName = "hello-pulsar-partitioned-subscription", topics = "hello-pulsar-partitioned",
        subscriptionType = SubscriptionType.Failover)
public void listen1(String foo) {
    System.out.println("Message Received 1: " + foo);
}

@PulsarListener(subscriptionName = "hello-pulsar-partitioned-subscription", topics = "hello-pulsar-partitioned",
        subscriptionType = SubscriptionType.Failover)
public void listen2(String foo) {
    System.out.println("Message Received 2: " + foo);
}

@PulsarListener(subscriptionName = "hello-pulsar-partitioned-subscription", topics = "hello-pulsar-partitioned",
        subscriptionType = SubscriptionType.Failover)
public void listen3(String foo) {
    System.out.println("Message Received 3: " + foo);
}
When you follow this approach, a single partition always gets consumed by a dedicated consumer.
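As a rough illustration of how a failover subscription can spread partitions over consumers, the following JDK-only sketch assigns each partition to exactly one active consumer. It is a simplified model, not the broker's algorithm: it assumes all consumers share the same priority level, orders them by name, and gives partition i to consumer i mod n. The FailoverModel class and the consumer names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified, hypothetical model of failover assignment on a partitioned topic.
final class FailoverModel {

    // Maps each partition index to the single consumer that actively receives its messages.
    static Map<Integer, String> assign(int numPartitions, List<String> consumerNames) {
        if (consumerNames.isEmpty()) {
            throw new IllegalArgumentException("at least one consumer is required");
        }
        List<String> sorted = new ArrayList<>(consumerNames);
        sorted.sort(Comparator.naturalOrder()); // lexicographic order; assumes equal priority levels
        Map<Integer, String> assignment = new LinkedHashMap<>();
        for (int partition = 0; partition < numPartitions; partition++) {
            assignment.put(partition, sorted.get(partition % sorted.size()));
        }
        return assignment;
    }

    public static void main(String[] args) {
        // Three consumers on one failover subscription, as with listen1, listen2, and listen3.
        System.out.println(assign(3, List.of("consumer-c", "consumer-a", "consumer-b")));
        // {0=consumer-a, 1=consumer-b, 2=consumer-c}

        // With only two consumers left, the partitions are redistributed among them.
        System.out.println(assign(3, List.of("consumer-a", "consumer-c")));
        // {0=consumer-a, 1=consumer-c, 2=consumer-a}
    }
}
```

The key property the model preserves is that, at any moment, each partition has one active consumer, which is why per-partition ordering survives in failover mode.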
In a similar vein, if you want to use Pulsar’s shared consumer type, you can use the shared subscription type.
However, when you use the shared mode, you lose any ordering guarantees, as a single consumer may receive messages from all the partitions before another consumer gets a chance.
Consider the following example:
@PulsarListener(subscriptionName = "hello-pulsar-shared-subscription", topics = "hello-pulsar-partitioned",
        subscriptionType = SubscriptionType.Shared)
public void listen1(String foo) {
    System.out.println("Message Received 1: " + foo);
}

@PulsarListener(subscriptionName = "hello-pulsar-shared-subscription", topics = "hello-pulsar-partitioned",
        subscriptionType = SubscriptionType.Shared)
public void listen2(String foo) {
    System.out.println("Message Received 2: " + foo);
}
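One way to see why the shared mode gives up ordering is a toy model of dispatch. The following JDK-only sketch is a deliberate simplification, not Pulsar's dispatcher: it hands the messages of one partition to two consumers in round-robin fashion, so consecutive messages from the same partition end up on different consumers, and nothing orders their processing relative to each other. The SharedModel class and the message names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of a shared subscription; not Pulsar's actual dispatcher.
final class SharedModel {

    // Distributes messages across consumers round-robin; result.get(c) holds consumer c's messages.
    static List<List<String>> dispatch(List<String> messages, int numConsumers) {
        if (numConsumers <= 0) {
            throw new IllegalArgumentException("numConsumers must be positive");
        }
        List<List<String>> perConsumer = new ArrayList<>();
        for (int c = 0; c < numConsumers; c++) {
            perConsumer.add(new ArrayList<>());
        }
        for (int i = 0; i < messages.size(); i++) {
            perConsumer.get(i % numConsumers).add(messages.get(i));
        }
        return perConsumer;
    }

    public static void main(String[] args) {
        // Four messages from partition 0, in the order they were published.
        List<String> partition0 = List.of("p0-m0", "p0-m1", "p0-m2", "p0-m3");
        // Each consumer sees every other message, so p0-m1 may be processed before p0-m0.
        System.out.println(dispatch(partition0, 2)); // [[p0-m0, p0-m2], [p0-m1, p0-m3]]
    }
}
```

Within each consumer the relative order is kept, but since listen1 and listen2 run independently, the overall processing order of a partition is no longer defined.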