Configuration

从 2.9 版开始,对于默认配置,应该在带有 @Configuration 批注的类中使用 @EnableKafkaRetryTopic 批注。这会启用此功能以正确启动,并且可以访问注入的某些功能组件,以便在运行时进行查找。

Starting with version 2.9, for default configuration, the @EnableKafkaRetryTopic annotation should be used in a @Configuration annotated class. This enables the feature to bootstrap properly and gives access to injecting some of the feature’s components to be looked up at runtime.

如果添加了此注释,则无需也添加 @EnableKafka,因为 @EnableKafkaRetryTopic 使用 @EnableKafka 进行了元注释。

It is not necessary to also add @EnableKafka, if you add this annotation, because @EnableKafkaRetryTopic is meta-annotated with @EnableKafka.

而且,从该版本开始,对于该特性组件和全局特性的更高级配置,RetryTopicConfigurationSupport 类应该在 @Configuration 类中扩展,并覆盖适当的方法。有关更多详细信息,请参阅 Configuring Global Settings and Features

Also, starting with that version, for more advanced configuration of the feature’s components and global features, the RetryTopicConfigurationSupport class should be extended in a @Configuration class, and the appropriate methods overridden. For more details refer to Configuring Global Settings and Features.

默认情况下,重试主题的容器与主容器具有相同的并发性。从 3.0 版开始,您可以为重试容器设置不同的 并发性(在批注中或在 RetryConfigurationBuilder 中)。

By default, the containers for the retry topics will have the same concurrency as the main container. Starting with version 3.0, you can set a different concurrency for the retry containers (either on the annotation, or in RetryConfigurationBuilder).

只可使用上述技术之一,且只有一个 @Configuration 类可扩展 RetryTopicConfigurationSupport

Only one of the above techniques can be used, and only one @Configuration class can extend RetryTopicConfigurationSupport.

Using the @RetryableTopic annotation

要为带 @KafkaListener 注释的方法配置重试主题和 DLT,你只需向它添加 @RetryableTopic 注释,Apache Kafka 的 Spring 将使用默认配置引导所需的所有主题和消费者。

To configure the retry topic and dlt for a @KafkaListener annotated method, you just have to add the @RetryableTopic annotation to it and Spring for Apache Kafka will bootstrap all the necessary topics and consumers with the default configurations.

@RetryableTopic(kafkaTemplate = "myRetryableTopicKafkaTemplate")
@KafkaListener(topics = "my-annotated-topic", groupId = "myGroupId")
public void processMessage(MyPojo message) {
    // ... message processing
}

自 3.2 起,@RetryableTopic 对类上 @KafkaListener 的支持将是:

Since 3.2, @RetryableTopic support for @KafkaListener on a class would be:

@RetryableTopic(listenerContainerFactory = "my-retry-topic-factory")
@KafkaListener(topics = "my-annotated-topic")
public class ClassLevelRetryListener {

    @KafkaHandler
    public void processMessage(MyPojo message) {
        // ... message processing
    }

}

你可以通过用 @DltHandler 注释来指定类中处理 DLT 消息的方法。如果没有提供 DltHandler 方法,则创建默认使用者,它只记录消耗。

You can specify a method in the same class to process the dlt messages by annotating it with the @DltHandler annotation. If no DltHandler method is provided a default consumer is created which only logs the consumption.

@DltHandler
public void processMessage(MyPojo message) {
    // ... message processing, persistence, etc
}

如果您未指定 kafkaTemplate 名称,则会查找名称为 defaultRetryTopicKafkaTemplate 的 Bean。如果未找到 Bean,则会引发异常。

If you don’t specify a kafkaTemplate name a bean with name defaultRetryTopicKafkaTemplate will be looked up. If no bean is found an exception is thrown.

从 3.0 版开始,@RetryableTopic 注释可用作自定义注释的元注释;例如:

Starting with version 3.0, the @RetryableTopic annotation can be used as a meta-annotation on custom annotations; for example:

@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@RetryableTopic
static @interface MetaAnnotatedRetryableTopic {

    @AliasFor(attribute = "concurrency", annotation = RetryableTopic.class)
    String parallelism() default "3";

}

Using RetryTopicConfiguration beans

你还可以通过在带 @Configuration 注释的类中创建 RetryTopicConfiguration Bean 来配置非阻塞重试支持。

You can also configure the non-blocking retry support by creating RetryTopicConfiguration beans in a @Configuration annotated class.

@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, Object> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .create(template);
}

这将为使用默认配置带 @KafkaListener 注释的方法中的所有主题创建重试主题和 DLT 以及相应的使用者。需要 KafkaTemplate 实例进行消息转发。

This will create retry topics and a dlt, as well as the corresponding consumers, for all topics in methods annotated with @KafkaListener using the default configurations. The KafkaTemplate instance is required for message forwarding.

要对如何处理每个主题的非阻塞重试进行更细粒度的控制,可以提供多个 RetryTopicConfiguration Bean。

To achieve more fine-grained control over how to handle non-blocking retrials for each topic, more than one RetryTopicConfiguration bean can be provided.

@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .fixedBackOff(3000)
            .maxAttempts(5)
            .concurrency(1)
            .includeTopics("my-topic", "my-other-topic")
            .create(template);
}

@Bean
public RetryTopicConfiguration myOtherRetryTopic(KafkaTemplate<String, MyOtherPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .exponentialBackoff(1000, 2, 5000)
            .maxAttempts(4)
            .excludeTopics("my-topic", "my-other-topic")
            .retryOn(MyException.class)
            .create(template);
}

重试主题和 DLT 的使用者将被分配到一个使用者组,该组的组 ID 是您在 @KafkaListener 注释的 groupId 参数中提供的组 ID 与该主题的后缀组合。如果您未提供任何主题,则它们都属于同一个组,并且在重试主题上重新平衡会导致在主主题上进行不必要的重新平衡。

The retry topics' and dlt’s consumers will be assigned to a consumer group with a group id that is the combination of the one which you provide in the groupId parameter of the @KafkaListener annotation with the topic’s suffix. If you don’t provide any they’ll all belong to the same group, and rebalance on a retry topic will cause an unnecessary rebalance on the main topic.

如果消费者配置有 ErrorHandlingDeserializer 来处理反序列化异常,则配置 KafkaTemplate 及其生产者使用可以处理正常对象以及由反序列化异常导致的原始 byte[] 值的序列化程序非常重要。模板的通用值类型应为 Object。一种方法是使用 DelegatingByTypeSerializer;示例如下:

If the consumer is configured with an ErrorHandlingDeserializer, to handle deserialization exceptions, it is important to configure the KafkaTemplate and its producer with a serializer that can handle normal objects as well as raw byte[] values, which result from deserialization exceptions. The generic value type of the template should be Object. One technique is to use the DelegatingByTypeSerializer; an example follows:

@Bean
public ProducerFactory<String, Object> producerFactory() {
    return new DefaultKafkaProducerFactory<>(producerConfiguration(), new StringSerializer(),
        new DelegatingByTypeSerializer(Map.of(byte[].class, new ByteArraySerializer(),
               MyNormalObject.class, new JsonSerializer<Object>())));
}

@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
    return new KafkaTemplate<>(producerFactory());
}

可以对同一个主题使用多个 @KafkaListener 注释,可以手动分配分区,也可以不手动分配分区,同时还可以使用非阻塞重试,但对于给定的主题只使用一个配置。最好为此类主题的配置使用一个单独的 RetryTopicConfiguration Bean;如果针对同一个主题使用了多个 @RetryableTopic 注释,则所有注释都应具有相同的值,否则其中一个注释将应用于该主题的所有侦听器,而其他注释的值将被忽略。

Multiple @KafkaListener annotations can be used for the same topic with or without manual partition assignment along with non-blocking retries, but only one configuration will be used for a given topic. It’s best to use a single RetryTopicConfiguration bean for configuration of such topics; if multiple @RetryableTopic annotations are being used for the same topic, all of them should have the same values, otherwise one of them will be applied to all of that topic’s listeners and the other annotations' values will be ignored.

Configuring Global Settings and Features

自 2.9 起,用于配置组件的以前 Bean 覆盖方法已被删除(未弃用,由于上述 API 的实验性质)。这不会改变 RetryTopicConfiguration Bean 方法,而只改变基础设施组件的配置。现在应该在(单个)@Configuration 类中扩展 RetryTopicConfigurationSupport 类,并覆盖适当的方法。示例如下:

Since 2.9, the previous bean overriding approach for configuring components has been removed (without deprecation, due to the aforementioned experimental nature of the API). This does not change the RetryTopicConfiguration beans approach - only infrastructure components' configurations. Now the RetryTopicConfigurationSupport class should be extended in a (single) @Configuration class, and the proper methods overridden. An example follows:

@EnableKafka
@Configuration
public class MyRetryTopicConfiguration extends RetryTopicConfigurationSupport {

    @Override
    protected void configureBlockingRetries(BlockingRetriesConfigurer blockingRetries) {
        blockingRetries
                .retryOn(MyBlockingRetriesException.class, MyOtherBlockingRetriesException.class)
                .backOff(new FixedBackOff(3000, 3));
    }

    @Override
    protected void manageNonBlockingFatalExceptions(List<Class<? extends Throwable>> nonBlockingFatalExceptions) {
        nonBlockingFatalExceptions.add(MyNonBlockingException.class);
    }

    @Override
    protected void configureCustomizers(CustomizersConfigurer customizersConfigurer) {
        // Use the new 2.9 mechanism to avoid re-fetching the same records after a pause
        customizersConfigurer.customizeErrorHandler(eh -> {
            eh.setSeekAfterError(false);
        });
    }

}

在使用这种配置方法的情况下,不应使用 @EnableKafkaRetryTopic 注释来防止由于重复的 Bean 而导致上下文启动失败。请改用简单的 @EnableKafka 注释。

When using this configuration approach, the @EnableKafkaRetryTopic annotation should not be used to prevent context failing to start due to duplicated beans. Use the simple @EnableKafka annotation instead.

autoCreateTopics 为 true 时,将使用指定的分区数和复制因子创建主主题和重试主题。从版本 3.0 开始,默认复制因子为 -1,表示使用代理默认值。如果代理版本低于 2.4,则需要设置显式值。若要针对特定主题(例如主主题或 DLT)覆盖这些值,只需添加带有所需属性的 NewTopic @Bean;这将覆盖自动创建属性。

When autoCreateTopics is true, the main and retry topics will be created with the specified number of partitions and replication factor. Starting with version 3.0, the default replication factor is -1, meaning using the broker default. If your broker version is earlier than 2.4, you will need to set an explicit value. To override these values for a particular topic (e.g. the main topic or DLT), simply add a NewTopic @Bean with the required properties; that will override the auto creation properties.

默认情况下,记录将使用接收记录的原始分区发布到重试主题。如果重试主题的分区比主主题的分区少,则应适当配置框架;如下所示的一个示例。

By default, records are published to the retry topic(s) using the original partition of the received record. If the retry topics have fewer partitions than the main topic, you should configure the framework appropriately; an example follows.

@EnableKafka
@Configuration
public class Config extends RetryTopicConfigurationSupport {

    @Override
    protected Consumer<DeadLetterPublishingRecovererFactory> configureDeadLetterPublishingContainerFactory() {
        return dlprf -> dlprf.setPartitionResolver((cr, nextTopic) -> null);
    }

    ...

}

函数的参数是使用者记录和下一个主题的名称。可以返回特定分区数,或 null 以指示 KafkaProducer 应确定分区。

The parameters to the function are the consumer record and the name of the next topic. You can return a specific partition number, or null to indicate that the KafkaProducer should determine the partition.

在默认情况下,当记录在重试主题中转换时,重试报头(尝试次数、时间戳)的所有值将被保留。从 2.9.6 版开始,如果你只想保留这些报头的最后一个值,请使用上面显示的 configureDeadLetterPublishingContainerFactory() 方法将工厂的 retainAllRetryHeaderValues 属性设置为 false

By default, all values of retry headers (number of attempts, timestamps) are retained when a record transitions through the retry topics. Starting with version 2.9.6, if you want to retain just the last value of these headers, use the configureDeadLetterPublishingContainerFactory() method shown above to set the factory’s retainAllRetryHeaderValues property to false.

Find RetryTopicConfiguration

尝试提供 RetryTopicConfiguration 实例,通过从 @RetryableTopic 注释创建实例,或在没有可用注释的情况下从 Bean 容器创建实例。

Attempts to provide an instance of RetryTopicConfiguration by either creating one from a @RetryableTopic annotation, or from the bean container if no annotation is available.

如果在容器中找到 Bean,则会检查提供的主题是否应由任何这样一个实例来处理。

If beans are found in the container, there’s a check to determine whether the provided topics should be handled by any of such instances.

如果提供了 @RetryableTopic 注释,则会查找 DltHandler 注释的方法。

If @RetryableTopic annotation is provided, a DltHandler annotated method is looked up.

自 3.2 起,在类上注释 @RetryableTopic 时提供新的 API 来创建 RetryTopicConfiguration

since 3.2, provide new API to Create RetryTopicConfiguration when @RetryableTopic annotated on a class:

@Bean
public RetryTopicConfiguration myRetryTopic() {
    RetryTopicConfigurationProvider provider = new RetryTopicConfigurationProvider(beanFactory);
    return provider.findRetryConfigurationFor(topics, null, AnnotatedClass.class, bean);
}

@RetryableTopic
public static class AnnotatedClass {
    // NoOps
}