Tips, Tricks and Recipes

Simple DLQ with Kafka

Problem Statement

As a developer, I want to write a consumer application that processes records from a Kafka topic. However, if some error occurs in processing, I don’t want the application to stop completely. Instead, I want to send the record in error to a DLT (Dead-Letter-Topic) and then continue processing new records.

Solution

The solution for this problem is to use the DLQ feature in Spring Cloud Stream. For the purposes of this discussion, let us assume that the following is our processor function.

@Bean
public Consumer<byte[]> processData() {
  return s -> {
     throw new RuntimeException();
  };
}

This is a very trivial function that throws an exception for all the records that it processes, but you can take this function and extend it to any other similar situations.
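
For instance, a more realistic variant might throw only for records that fail some business check. The following is a minimal sketch under that assumption; the empty-payload check is purely hypothetical:

@Bean
public Consumer<byte[]> processData() {
    return payload -> {
        // Hypothetical validation: any exception thrown here goes through the
        // binder's retry handling and, once retries are exhausted, the record
        // is published to the DLT.
        if (payload.length == 0) {
            throw new IllegalArgumentException("Empty payload");
        }
        // ... normal processing of the record ...
    };
}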

In order to send the records in error to a DLT, we need to provide the following configuration.

spring.cloud.stream:
  bindings:
    processData-in-0:
      group: my-group
      destination: input-topic
  kafka:
    bindings:
      processData-in-0:
        consumer:
          enableDlq: true
          dlqName: input-topic-dlq

In order to activate DLQ, the application must provide a group name. Anonymous consumers cannot use the DLQ facilities. We also need to enable DLQ by setting the enableDlq property on the Kafka consumer binding to true. Finally, we can optionally provide the DLT name by providing the dlqName on the Kafka consumer binding, which otherwise defaults to error.input-topic.my-group in this case.

Note that in the example consumer provided above, the type of the payload is byte[]. By default, the DLQ producer in the Kafka binder expects a payload of type byte[]. If that is not the case, then we need to provide the configuration for the proper serializer. For example, let us re-write the consumer function as follows:

@Bean
public Consumer<String> processData() {
  return s -> {
     throw new RuntimeException();
  };
}

Now, we need to tell Spring Cloud Stream how we want to serialize the data when writing to the DLT. Here is the modified configuration for this scenario:

spring.cloud.stream:
  bindings:
    processData-in-0:
      group: my-group
      destination: input-topic
  kafka:
    bindings:
      processData-in-0:
        consumer:
          enableDlq: true
          dlqName: input-topic-dlq
          dlqProducerProperties:
            configuration:
              value.serializer: org.apache.kafka.common.serialization.StringSerializer

DLQ with Advanced Retry Options

Problem Statement

This is similar to the recipe above, but as a developer I would like to configure the way retries are handled.

Solution

If you followed the above recipe, then you get the default retry options built into the Kafka binder when the processing encounters an error.

By default, the binder retries for a maximum of 3 attempts, with a one-second initial delay, a 2.0 multiplier for each back off, and a maximum delay of 10 seconds. You can change all these configurations as below:

spring.cloud.stream.bindings.processData-in-0.consumer.maxAttempts
spring.cloud.stream.bindings.processData-in-0.consumer.backOffInitialInterval
spring.cloud.stream.bindings.processData-in-0.consumer.backOffMultiplier
spring.cloud.stream.bindings.processData-in-0.consumer.backOffMaxInterval
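
For example, the following sketch raises the retry count to 5, starts with a two-second back off, and caps the back off at 20 seconds; the values are only illustrative, and the intervals are in milliseconds:

spring.cloud.stream.bindings.processData-in-0.consumer.maxAttempts=5
spring.cloud.stream.bindings.processData-in-0.consumer.backOffInitialInterval=2000
spring.cloud.stream.bindings.processData-in-0.consumer.backOffMultiplier=3.0
spring.cloud.stream.bindings.processData-in-0.consumer.backOffMaxInterval=20000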

If you want, you can also provide a list of retryable exceptions by providing a map of boolean values. For example,

spring.cloud.stream.bindings.processData-in-0.consumer.retryableExceptions.java.lang.IllegalStateException=true
spring.cloud.stream.bindings.processData-in-0.consumer.retryableExceptions.java.lang.IllegalArgumentException=false

By default, any exceptions not listed in the map above will be retried. If that is not desired, then you can disable that by providing,

spring.cloud.stream.bindings.processData-in-0.consumer.defaultRetryable=false

You can also provide your own RetryTemplate and mark it as @StreamRetryTemplate, which will be scanned and used by the binder. This is useful when you want more sophisticated retry strategies and policies.

If you have multiple @StreamRetryTemplate beans, then you can specify which one your binding wants by using the property,

spring.cloud.stream.bindings.processData-in-0.consumer.retry-template-name=<your-retry-template-bean-name>
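
For example, here is a minimal sketch of such a bean using Spring Retry's SimpleRetryPolicy and ExponentialBackOffPolicy; the bean name and the chosen values are only illustrative:

@StreamRetryTemplate
public RetryTemplate myRetryTemplate() {
    RetryTemplate retryTemplate = new RetryTemplate();
    // Retry at most 4 times in total.
    retryTemplate.setRetryPolicy(new SimpleRetryPolicy(4));
    // Exponential back off between attempts (values in milliseconds).
    ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
    backOffPolicy.setInitialInterval(1000);
    backOffPolicy.setMultiplier(2.0);
    backOffPolicy.setMaxInterval(10000);
    retryTemplate.setBackOffPolicy(backOffPolicy);
    return retryTemplate;
}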

Handling Deserialization errors with DLQ

Problem Statement

I have a processor that encounters a deserialization exception in the Kafka consumer. I would expect that the Spring Cloud Stream DLQ mechanism will catch that scenario, but it does not. How can I handle this?

Solution

The normal DLQ mechanism offered by Spring Cloud Stream will not help when the Kafka consumer throws an irrecoverable deserialization exception. This is because this exception happens even before the consumer's poll() method returns. The Spring for Apache Kafka project offers some great ways to help the binder with this situation. Let us explore those.

Assuming this is our function:

@Bean
public Consumer<String> functionName() {
    return s -> {
        System.out.println(s);
    };
}

It is a trivial function that takes a String parameter.

We want to bypass the message converters provided by Spring Cloud Stream and use native deserializers instead. In the case of String types, this does not make much sense, but for more complex types, such as AVRO, you have to rely on external deserializers and therefore want to delegate the conversion to Kafka.

Now, when the consumer receives the data, let us assume that there is a bad record that causes a deserialization error, maybe someone passed an Integer instead of a String, for example. In that case, if you don't do something in the application, the exception will be propagated through the chain and your application will eventually exit.

In order to handle this, you can add a ListenerContainerCustomizer @Bean that configures a DefaultErrorHandler. This DefaultErrorHandler is configured with a DeadLetterPublishingRecoverer. We also need to configure an ErrorHandlingDeserializer for the consumer. That sounds like a lot of complex things, but in reality, it boils down to these three beans in this case.

@Bean
public ListenerContainerCustomizer<AbstractMessageListenerContainer<byte[], byte[]>> customizer(DefaultErrorHandler errorHandler) {
    return (container, dest, group) -> {
        container.setCommonErrorHandler(errorHandler);
    };
}

@Bean
public DefaultErrorHandler errorHandler(DeadLetterPublishingRecoverer deadLetterPublishingRecoverer) {
    return new DefaultErrorHandler(deadLetterPublishingRecoverer);
}

@Bean
public DeadLetterPublishingRecoverer publisher(KafkaOperations bytesTemplate) {
    return new DeadLetterPublishingRecoverer(bytesTemplate);
}

Let us analyze each of them. The first one is the ListenerContainerCustomizer bean that takes a DefaultErrorHandler. The container is now customized with that particular error handler. You can learn more about container customization here.

The second bean is the DefaultErrorHandler that is configured to publish to a DLT. See here for more details on DefaultErrorHandler.

The third bean is the DeadLetterPublishingRecoverer that is ultimately responsible for sending to the DLT. By default, the DLT topic is named ORIGINAL_TOPIC_NAME.DLT. You can change that though. See the docs for more details.

We also need to configure an ErrorHandlingDeserializer through application config.

The ErrorHandlingDeserializer delegates to the actual deserializer. In case of errors, it sets the key/value of the record to null and includes the raw bytes of the message. It then sets the exception in a header and passes this record to the listener, which then calls the registered error handler.

Following is the configuration required:

spring.cloud.stream:
  function:
    definition: functionName
  bindings:
    functionName-in-0:
      group: group-name
      destination: input-topic
      consumer:
        use-native-decoding: true
  kafka:
    bindings:
      functionName-in-0:
        consumer:
          enableDlq: true
          dlqName: dlq-topic
          dlqProducerProperties:
            configuration:
              value.serializer: org.apache.kafka.common.serialization.StringSerializer
          configuration:
            value.deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
            spring.deserializer.value.delegate.class: org.apache.kafka.common.serialization.StringDeserializer

We are providing the ErrorHandlingDeserializer through the configuration property on the binding. We are also indicating that the actual deserializer to delegate to is the StringDeserializer.

Keep in mind that none of the dlq properties above are relevant for the discussions in this recipe. They are purely meant for addressing application-level errors.

Basic offset management in Kafka binder

Problem Statement

I want to write a Spring Cloud Stream Kafka consumer application and I am not sure how it manages Kafka consumer offsets. Can you explain?

Solution

We encourage you to read the docs section on this to get a thorough understanding of it.

Here it is in a gist:

By default, Kafka supports two types of starting offsets - earliest and latest. Their semantics are self-explanatory from their names.

Assume you are running the consumer for the first time. If you omit the group.id in your Spring Cloud Stream application, then it becomes an anonymous consumer. Whenever you have an anonymous consumer, the Spring Cloud Stream application will, by default, start from the latest available offset in the topic partition. On the other hand, if you explicitly specify a group.id, then, by default, the Spring Cloud Stream application will start from the earliest available offset in the topic partition.

In both cases above (consumers with explicit groups and anonymous groups), the starting offset can be switched by using the property spring.cloud.stream.kafka.bindings.<binding-name>.consumer.startOffset and setting it to either earliest or latest.

Now, assume that you already ran the consumer before and are now starting it again. In this case, the starting offset semantics above do not apply, as the consumer finds an already committed offset for the consumer group (in the case of an anonymous consumer, although the application does not provide a group.id, the binder auto-generates one for you). It simply picks up from the last committed offset onward. This is true even when a startOffset value is provided.

However, you can override the default behavior, where the consumer starts from the last committed offset, by using the resetOffsets property. To do that, set the property spring.cloud.stream.kafka.bindings.<binding-name>.consumer.resetOffsets to true (it is false by default). Then make sure you provide the startOffset value (either earliest or latest). When you do that and then start the consumer application, each time it starts, it behaves as if it were starting for the first time and ignores any committed offsets for the partition.
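
For example, assuming a binding named processData-in-0 (the binding name is only illustrative), that combination might look like this:

spring.cloud.stream.kafka.bindings.processData-in-0.consumer.resetOffsets: true
spring.cloud.stream.kafka.bindings.processData-in-0.consumer.startOffset: earliest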

Seeking to arbitrary offsets in Kafka

Problem Statement

Using the Kafka binder, I know that it can set the offset to either earliest or latest, but I have a requirement to seek to an arbitrary offset, somewhere in the middle. Is there a way to achieve this using the Spring Cloud Stream Kafka binder?

Solution

Previously, we saw how the Kafka binder lets you handle basic offset management. By default, the binder does not allow you to rewind to an arbitrary offset, at least through the mechanism we saw in that recipe. However, the binder provides some low-level strategies to achieve this use case. Let's explore them.

First of all, when you want to reset to an arbitrary offset other than earliest or latest, make sure to leave the resetOffsets configuration at its default, which is false. Then you have to provide a custom bean of type KafkaBindingRebalanceListener, which is injected into all consumer bindings. It is an interface that comes with a few default methods, but here is the method that we are interested in:

/**
 * Invoked when partitions are initially assigned or after a rebalance. Applications
 * might only want to perform seek operations on an initial assignment. While the
 * 'initial' argument is true for each thread (when concurrency is greater than 1),
 * implementations should keep track of exactly which partitions have been sought.
 * There is a race in that a rebalance could occur during startup and so a topic/
 * partition that has been sought on one thread may be re-assigned to another
 * thread and you may not wish to re-seek it at that time.
 * @param bindingName the name of the binding.
 * @param consumer the consumer.
 * @param partitions the partitions.
 * @param initial true if this is the initial assignment on the current thread.
 */
default void onPartitionsAssigned(String bindingName, Consumer<?, ?> consumer,
        Collection<TopicPartition> partitions, boolean initial) {
    // do nothing
}

Let us look at the details.

In essence, this method will be invoked each time during the initial assignment for a topic partition or after a rebalance. For better illustration, let us assume that our topic is foo and it has 4 partitions. Initially, we start only a single consumer in the group, and this consumer consumes from all partitions. When the consumer starts for the first time, all 4 partitions are initially assigned. However, we do not want the partitions to start consuming at the default (earliest, since we define a group); rather, for each partition, we want them to consume after seeking to an arbitrary offset. Imagine that you have a business case to consume from certain offsets, as below.

Partition   start offset

0           1000
1           2000
2           2000
3           1000

This could be achieved by implementing the above method as below.

@Override
public void onPartitionsAssigned(String bindingName, Consumer<?, ?> consumer, Collection<TopicPartition> partitions, boolean initial) {

    Map<TopicPartition, Long> topicPartitionOffset = new HashMap<>();
    topicPartitionOffset.put(new TopicPartition("foo", 0), 1000L);
    topicPartitionOffset.put(new TopicPartition("foo", 1), 2000L);
    topicPartitionOffset.put(new TopicPartition("foo", 2), 2000L);
    topicPartitionOffset.put(new TopicPartition("foo", 3), 1000L);

    if (initial) {
        partitions.forEach(tp -> {
            if (topicPartitionOffset.containsKey(tp)) {
                final Long offset = topicPartitionOffset.get(tp);
                try {
                    consumer.seek(tp, offset);
                }
                catch (Exception e) {
                    // Handle exceptions carefully.
                }
            }
        });
    }
}

This is just a rudimentary implementation. Real-world use cases are much more complex than this and you need to adjust accordingly, but this certainly gives you a basic sketch. When a consumer seek fails, it may throw some runtime exceptions, and you need to decide what to do in those cases.

What if we start a second consumer with the same group id?

When we add a second consumer, a rebalance occurs and some partitions are moved around. Let's say that the new consumer gets partitions 2 and 3. When this new Spring Cloud Stream consumer calls the onPartitionsAssigned method, it sees that this is the initial assignment for partitions 2 and 3 on this consumer. Therefore, it performs the seek operation because of the conditional check on the initial argument. In the case of the first consumer, it now only has partitions 0 and 1. However, for this consumer it was simply a rebalance event and not considered an initial assignment. Thus, it does not re-seek to the given offsets, because of the conditional check on the initial argument.

How do I manually acknowledge using Kafka binder?

Problem Statement

Using the Kafka binder, I want to manually acknowledge messages in my consumer. How do I do that?

Solution

By default, the Kafka binder delegates to the default commit settings of the Spring for Apache Kafka project. The default ackMode in Spring Kafka is batch. See here for more details on that.

There are situations in which you want to disable this default commit behavior and rely on manual commits. The following steps allow you to do that.

Set the property spring.cloud.stream.kafka.bindings.<binding-name>.consumer.ackMode to either MANUAL or MANUAL_IMMEDIATE. When it is set like that, there will be a header called kafka_acknowledgment (from KafkaHeaders.ACKNOWLEDGMENT) present in the message received by the consumer method.

For example, imagine this as your consumer method.

@Bean
public Consumer<Message<String>> myConsumer() {
    return msg -> {
        // Retrieve the acknowledgment header and acknowledge manually.
        Acknowledgment acknowledgment = msg.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
        if (acknowledgment != null) {
            System.out.println("Acknowledgment provided");
            acknowledgment.acknowledge();
        }
    };
}

Then you set the property spring.cloud.stream.kafka.bindings.myConsumer-in-0.consumer.ackMode to MANUAL or MANUAL_IMMEDIATE.
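
For example, to use manual acknowledgment for the myConsumer function shown above:

spring.cloud.stream.kafka.bindings.myConsumer-in-0.consumer.ackMode: MANUAL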

How do I override the default binding names in Spring Cloud Stream?

Problem Statement

Spring Cloud Stream creates default bindings based on the function definition and signature, but how do I override these with more domain-friendly names?

Solution

Assume that the following is your function signature.

@Bean
public Function<String, String> uppercase(){
...
}

By default, Spring Cloud Stream will create the bindings as below.

  1. uppercase-in-0

  2. uppercase-out-0

You can override these binding names by using the following properties.

spring.cloud.stream.function.bindings.uppercase-in-0=my-transformer-in
spring.cloud.stream.function.bindings.uppercase-out-0=my-transformer-out

After this, all binding properties must be set on the new names, my-transformer-in and my-transformer-out.
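
For example, setting destinations on the renamed bindings might look like this; the topic names are only illustrative:

spring.cloud.stream.bindings.my-transformer-in.destination=my-input-topic
spring.cloud.stream.bindings.my-transformer-out.destination=my-output-topic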

Here is another example with Kafka Streams and multiple inputs.

@Bean
public BiFunction<KStream<String, Order>, KTable<String, Account>, KStream<String, EnrichedOrder>> processOrder() {
...
}

By default, Spring Cloud Stream will create three different binding names for this function.

  1. processOrder-in-0

  2. processOrder-in-1

  3. processOrder-out-0

You have to use these binding names each time you want to set some configuration on these bindings. You may not like that, and you may want to use more domain-friendly and readable binding names, for example, something like the following.

  1. orders

  2. accounts

  3. enrichedOrders

You can easily do that by setting these three properties:

  1. spring.cloud.stream.function.bindings.processOrder-in-0=orders

  2. spring.cloud.stream.function.bindings.processOrder-in-1=accounts

  3. spring.cloud.stream.function.bindings.processOrder-out-0=enrichedOrders

Once you do that, it overrides the default binding names, and any properties that you want to set on them must be set on these new binding names.
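
For example, a destination set on one of the renamed bindings now looks like this; the topic name is only illustrative:

spring.cloud.stream.bindings.orders.destination=incoming-orders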

How do I send a message key as part of my record?

Problem Statement

I need to send a key along with the payload of the record. Is there a way to do that in Spring Cloud Stream?

Solution

You often need to send an associative data structure, such as a map, as a record with a key and a value. Spring Cloud Stream lets you do that in a straightforward manner. The following is a basic blueprint for doing this, but you may want to adapt it to your particular use case.

Here is a sample producer method (that is, a Supplier).

@Bean
public Supplier<Message<String>> supplier() {
    return () -> MessageBuilder.withPayload("foo").setHeader(KafkaHeaders.MESSAGE_KEY, "my-foo").build();
}

This is a trivial function that sends a message with a String payload, but also with a key. Note that we set the key as a message header using KafkaHeaders.MESSAGE_KEY.

If you want to use a header other than the default kafka_messageKey as the key, then you need to specify this property in the configuration:

spring.cloud.stream.kafka.bindings.supplier-out-0.producer.messageKeyExpression=headers['my-special-key']

Please note that we use the binding name supplier-out-0, since that is derived from our function name. Update it accordingly for your application.

Then, we use this new key when we produce the message.
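
Putting it together, here is a sketch of a supplier that sets the custom key header referenced by the messageKeyExpression above; the payload and key values are only illustrative:

@Bean
public Supplier<Message<String>> supplier() {
    return () -> MessageBuilder.withPayload("foo")
            // This header is picked up by the messageKeyExpression configured
            // above and used as the Kafka record key.
            .setHeader("my-special-key", "my-foo")
            .build();
}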

How do I use native serializer and deserializer instead of message conversion done by Spring Cloud Stream?

Problem Statement

Instead of using the message converters in Spring Cloud Stream, I want to use native Serializer and Deserializer in Kafka. By default, Spring Cloud Stream takes care of this conversion using its internal built-in message converters. How can I bypass this and delegate the responsibility to Kafka?

Solution

This is really easy to do.

All you have to do is to provide the following property to enable native serialization.

spring.cloud.stream.kafka.bindings.<binding-name>.producer.useNativeEncoding: true

Then, you also need to set the serializers. There are a couple of ways to do this.

spring.cloud.stream.kafka.bindings.<binding-name>.producer.configuration.key.serializer: org.apache.kafka.common.serialization.StringSerializer
spring.cloud.stream.kafka.bindings.<binding-name>.producer.configuration.value.serializer: org.apache.kafka.common.serialization.StringSerializer

or using the binder configuration.

spring.cloud.stream.kafka.binder.configuration.key.serializer: org.apache.kafka.common.serialization.StringSerializer
spring.cloud.stream.kafka.binder.configuration.value.serializer: org.apache.kafka.common.serialization.StringSerializer

When you use the binder-level configuration, it is applied to all bindings, whereas setting them at the binding level applies per binding.

On the deserializing side, you just need to provide the deserializers as configuration.

For example,

spring.cloud.stream.kafka.bindings.<binding-name>.consumer.configuration.key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
spring.cloud.stream.kafka.bindings.<binding-name>.consumer.configuration.value.deserializer: org.apache.kafka.common.serialization.StringDeserializer

You can also set them at the binder level.

There is an optional property that you can set to force native decoding.

spring.cloud.stream.kafka.bindings.<binding-name>.consumer.useNativeDecoding: true

However, in the case of the Kafka binder, this is unnecessary, as by the time the record reaches the binder, Kafka has already deserialized it using the configured deserializers.

Explain how offset resetting works in Kafka Streams binder

Problem Statement

By default, the Kafka Streams binder always starts from the earliest offset for a new consumer. Sometimes, it is beneficial or required by the application to start from the latest offset. The Kafka Streams binder allows you to do that.

Solution

Before we look at the solution, let us look at the following scenario.

@Bean
public BiConsumer<KStream<Object, Object>, KTable<Object, Object>> myBiConsumer() {
    return (s, t) -> s.join(t, ...);
}

We have a BiConsumer bean that requires two input bindings. In this case, the first binding is for a KStream and the second one is for a KTable. When running this application for the first time, by default, both bindings start from the earliest offset. What if I want to start from the latest offset due to some requirement? You can do this by setting the following properties.

spring.cloud.stream.kafka.streams.bindings.myBiConsumer-in-0.consumer.startOffset: latest
spring.cloud.stream.kafka.streams.bindings.myBiConsumer-in-1.consumer.startOffset: latest

If you want only one binding to start from the latest offset and the other to consume from the default earliest, then leave the latter binding out of the configuration.

Keep in mind that, once there are committed offsets, these settings are not honored and the committed offsets take precedence.

Keeping track of successful sending of records (producing) in Kafka

Problem Statement

I have a Kafka producer application and I want to keep track of all my successful sends.

Solution

Let us assume that we have the following supplier in the application.

@Bean
public Supplier<Message<String>> supplier() {
    return () -> MessageBuilder.withPayload("foo").setHeader(KafkaHeaders.MESSAGE_KEY, "my-foo").build();
}

Then, we need to define a new MessageChannel bean to capture all the successful send information.

@Bean
public MessageChannel fooRecordChannel() {
    return new DirectChannel();
}

Next, define this property in the application configuration to provide the bean name for the recordMetadataChannel.

spring.cloud.stream.kafka.bindings.supplier-out-0.producer.recordMetadataChannel: fooRecordChannel

At this point, information about successfully sent records will be sent to the fooRecordChannel.

You can write an IntegrationFlow as below to see the information.

@Bean
public IntegrationFlow integrationFlow() {
    return f -> f.channel("fooRecordChannel")
                 .handle((payload, messageHeaders) -> payload);
}

handle 方法中,有效负载就是发送到 Kafka 的内容,消息头包含一个称为 kafka_recordMetadata 的特殊键。它的值是包含有关主题分区、当前偏移量等信息的 RecordMetadata

In the handle method, the payload is what got sent to Kafka and the message headers contain a special key called kafka_recordMetadata. Its value is a RecordMetadata that contains information about topic partition, current offset etc.
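
For example, here is a sketch of a handler that unpacks that header and logs where each record landed; the log format is only illustrative:

@Bean
public IntegrationFlow integrationFlow() {
    return f -> f.channel("fooRecordChannel")
                 .handle((payload, messageHeaders) -> {
                     RecordMetadata metadata =
                             (RecordMetadata) messageHeaders.get("kafka_recordMetadata");
                     // Log the topic, partition, and offset of the sent record.
                     System.out.println("Sent to " + metadata.topic()
                             + "-" + metadata.partition() + "@" + metadata.offset());
                     return null;
                 });
}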

Adding custom header mapper in Kafka

Problem Statement

I have a Kafka producer application that sets some headers, but they are missing in the consumer application. Why is that?

Solution

Under normal circumstances, this should be fine.

Imagine you have the following producer.

@Bean
public Supplier<Message<String>> supply() {
    return () -> MessageBuilder.withPayload("foo").setHeader("foo", "bar").build();
}

On the consumer side, you should still see the header "foo", and the following should not give you any issues.

@Bean
public Consumer<Message<String>> consume() {
    return s -> {
        final String foo = (String)s.getHeaders().get("foo");
        System.out.println(foo);
    };
}

If you provide a custom header mapper in the application, then this won’t work. Let’s say you have an empty KafkaHeaderMapper in the application.

@Bean
public KafkaHeaderMapper kafkaBinderHeaderMapper() {
    return new KafkaHeaderMapper() {
        @Override
        public void fromHeaders(MessageHeaders headers, Headers target) {

        }

        @Override
        public void toHeaders(Headers source, Map<String, Object> target) {

        }
    };
}

If that is your implementation, then you will miss the foo header on the consumer. Chances are that you have some logic inside those KafkaHeaderMapper methods. You need the following to populate the foo header.

@Bean
public KafkaHeaderMapper kafkaBinderHeaderMapper() {
    return new KafkaHeaderMapper() {
        @Override
        public void fromHeaders(MessageHeaders headers, Headers target) {
            final String foo = (String) headers.get("foo");
            target.add("foo", foo.getBytes());
        }

        @Override
        public void toHeaders(Headers source, Map<String, Object> target) {
            final Header foo = source.lastHeader("foo");
            target.put("foo", new String(foo.value()));
        }
    };
}

That will properly populate the foo header from the producer to the consumer.

Special note on the id header

In Spring Cloud Stream, the id header is a special header, but some applications may want to have special custom id headers - something like custom-id or ID or Id. The first one (custom-id) will propagate from producer to consumer without any custom header mapper. However, if you produce with a variant of the framework-reserved id header - such as ID, Id, iD, and so on - then you will run into issues with the internals of the framework. See this StackOverflow thread for more context on this use case. In that case, you must use a custom KafkaHeaderMapper to map the case-sensitive id header. For example, let's say you have the following producer.

@Bean
public Supplier<Message<String>> supply() {
    return () -> MessageBuilder.withPayload("foo").setHeader("Id", "my-id").build();
}

The header Id above will be gone from the consuming side as it clashes with the framework id header. You can provide a custom KafkaHeaderMapper to solve this issue.

@Bean
public KafkaHeaderMapper kafkaBinderHeaderMapper1() {
    return new KafkaHeaderMapper() {
        @Override
        public void fromHeaders(MessageHeaders headers, Headers target) {
            final String myId = (String) headers.get("Id");
            target.add("Id", myId.getBytes());
        }

        @Override
        public void toHeaders(Headers source, Map<String, Object> target) {
            final Header Id = source.lastHeader("Id");
            target.put("Id", new String(Id.value()));
        }
    };
}

By doing this, both id and Id headers will be available from the producer to the consumer side.

Producing to multiple topics in transaction

Problem Statement

How do I produce transactional messages to multiple Kafka topics?

For more context, see this StackOverflow question.

Solution

Use the transactional support in the Kafka binder for transactions and then provide an AfterRollbackProcessor. In order to produce to multiple topics, use the StreamBridge API.

Below are the code snippets for this:

@Autowired
StreamBridge bridge;

@Bean
Consumer<String> input() {
    return str -> {
        System.out.println(str);
        this.bridge.send("left", str.toUpperCase());
        this.bridge.send("right", str.toLowerCase());
        if (str.equals("Fail")) {
            throw new RuntimeException("test");
        }
    };
}

@Bean
ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> customizer(BinderFactory binders) {
    return (container, dest, group) -> {
        ProducerFactory<byte[], byte[]> pf = ((KafkaMessageChannelBinder) binders.getBinder(null,
                MessageChannel.class)).getTransactionalProducerFactory();
        KafkaTemplate<byte[], byte[]> template = new KafkaTemplate<>(pf);
        DefaultAfterRollbackProcessor rollbackProcessor = rollbackProcessor(template);
        container.setAfterRollbackProcessor(rollbackProcessor);
    };
}

DefaultAfterRollbackProcessor rollbackProcessor(KafkaTemplate<byte[], byte[]> template) {
    return new DefaultAfterRollbackProcessor<>(
            new DeadLetterPublishingRecoverer(template), new FixedBackOff(2000L, 2L), template, true);
}

Required Configuration

spring.cloud.stream.kafka.binder.transaction.transaction-id-prefix: tx-
spring.cloud.stream.kafka.binder.required-acks=all
spring.cloud.stream.bindings.input-in-0.group=foo
spring.cloud.stream.bindings.input-in-0.destination=input
spring.cloud.stream.bindings.left.destination=left
spring.cloud.stream.bindings.right.destination=right

spring.cloud.stream.kafka.bindings.input-in-0.consumer.maxAttempts=1

In order to test, you can use the following:

@Bean
public ApplicationRunner runner(KafkaTemplate<byte[], byte[]> template) {
    return args -> {
        System.in.read();
        template.send("input", "Fail".getBytes());
        template.send("input", "Good".getBytes());
    };
}

Some important notes:

Please ensure that you don't have any DLQ settings in the application configuration, as we manually configure the DLT (by default, records are published to a topic named input.DLT, based on the initial consumer function). Also, reset the maxAttempts on the consumer binding to 1 in order to avoid retries by the binder. In the example above, a record is tried a maximum of 3 times in total (the initial try plus the 2 attempts in the FixedBackOff).

See the StackOverflow thread for more details on how to test this code. If you are using Spring Cloud Stream to test it by adding more consumer functions, make sure to set the isolation-level on the consumer binding to read-committed.
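
For example, one way to do this, assuming the binder's standard consumer configuration passthrough and a consumer binding named input-in-0 as above:

spring.cloud.stream.kafka.bindings.input-in-0.consumer.configuration.isolation.level=read_committed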

This StackOverflow thread is also related to this discussion.

Pitfalls to avoid when running multiple pollable consumers

Problem Statement

How can I run multiple instances of the pollable consumers and generate unique client.id for each instance?

Solution

Assuming that I have the following definition:

spring.cloud.stream.pollable-source: foo
spring.cloud.stream.bindings.foo-in-0.group: my-group

When running the application, the Kafka consumer generates a client.id (something like consumer-my-group-1). For each instance of the application that is running, this client.id will be the same, causing unexpected issues.

In order to fix this, you can add the following property on each instance of the application:

spring.cloud.stream.kafka.bindings.foo-in-0.consumer.configuration.client.id=${client.id}

See this GitHub issue for more details.