Tips, Tricks and Recipes
Simple DLQ with Kafka
Problem Statement
作为一名开发者,我想编写一个从 Kafka 主题处理记录的使用者应用程序。但是,如果在处理过程中出现错误,我不想让应用程序完全停止。相反,我想把错误记录发送到 DLT(死信主题),然后继续处理新记录。
Solution
此问题的解决方案是在 Spring Cloud Stream 中使用 DLQ 功能。为了对此展开讨论,让我们假设以下为我们的处理器函数。
@Bean
public Consumer<byte[]> processData() {
return s -> {
throw new RuntimeException();
};
}
这是一个非常简单的函数,针对其处理的所有记录抛出一个异常,但你可以采用此函数并将其扩展到任何其他类似情况。
为了把错误记录发送到 DLT,我们需要提供以下配置。
spring.cloud.stream:
bindings:
processData-in-0:
group: my-group
destination: input-topic
kafka:
bindings:
processData-in-0:
consumer:
enableDlq: true
dlqName: input-topic-dlq
为了激活 DLQ,应用程序必须提供一个组名。匿名使用者无法使用 DLQ 设施。我们还需要通过将 Kafka 使用者绑定上的 enableDLQ
属性设置为“true”来启用 DLQ。最后,我们可选择通过在 Kafka 使用者绑定上提供 dlqName
来提供 DLT 名称,否则在这种情况下,其默认为 error.input-topic.my-group
。
请注意,在上述示例使用者中,主题的类型为 byte[]
。默认情况下,Kafka 绑定程序的 DLQ 生产者预期主题的类型为 byte[]
。如果没有满足此条件,则我们需要提供适当序列化器的配置。例如,我们重新编写使用者函数,如下所示:
@Bean
public Consumer<String> processData() {
return s -> {
throw new RuntimeException();
};
}
现在,我们需要告诉 Spring Cloud Stream,希望如何在写入 DLT 时对数据进行序列化。下面是针对此场景修改的配置:
spring.cloud.stream:
bindings:
processData-in-0:
group: my-group
destination: input-topic
kafka:
bindings:
processData-in-0:
consumer:
enableDlq: true
dlqName: input-topic-dlq
dlqProducerProperties:
configuration:
value.serializer: org.apache.kafka.common.serialization.StringSerializer
DLQ with Advanced Retry Options
Solution
如果您按照上面的配方进行操作,那么当处理遇到错误时,您将获得内置到 Kafka binder 中的默认重试选项。
默认情况下,binder 最多重试 3 次,首次延迟一秒,每次递减 2.0,最大延迟 10 秒。您可以如下更改所有这些配置:
spring.cloud.stream.bindings.processData-in-0.consumer.maxAtttempts
spring.cloud.stream.bindings.processData-in-0.consumer.backOffInitialInterval
spring.cloud.stream.bindings.processData-in-0.consumer.backOffMultipler
spring.cloud.stream.bindings.processData-in-0.consumer.backOffMaxInterval
如果您愿意,还可以通过提供布尔值映射来提供可重试异常列表。例如,
spring.cloud.stream.bindings.processData-in-0.consumer.retryableExceptions.java.lang.IllegalStateException=true
spring.cloud.stream.bindings.processData-in-0.consumer.retryableExceptions.java.lang.IllegalArgumentException=false
默认情况下,上述映射中未列出的任何异常都将被重试。如果不需要这样做,则可以通过提供以下内容来禁用它:
spring.cloud.stream.bindings.processData-in-0.consumer.defaultRetryable=false
您还可以提供自己的`RetryTemplate`,并将其标记为`@StreamRetryTemplate`,该标记将由 binder 扫描并使用。当您需要更复杂的重试策略时,这样做非常有用。
如果您有多个`@StreamRetryTemplate` bean,则可以使用`property`指定绑定所需的 bean,
spring.cloud.stream.bindings.processData-in-0.consumer.retry-template-name=<your-retry-template-bean-name>
Handling Deserialization errors with DLQ
Solution
当 Kafka 消费者抛出不可恢复的反序列化异常时,Spring Cloud Stream 提供的普通 DLQ 机制帮不了忙。这是因为此异常在消费者的`poll` () 方法返回之前就会发生。Spring for Apache Kafka 项目提供了一些很好的方法来帮助 binder 处理这种情况。我们来探索一下这些方法。
假设这是我们的函数:
@Bean
public Consumer<String> functionName() {
return s -> {
System.out.println(s);
};
}
这是一个接受`String`参数的简单函数。
我们要绕过 Spring Cloud Stream 提供的消息转换器,想要改为使用本机反序列化器。在`String`类型的情况下,这样做没有多大意义,但如果是更复杂类型(如 AVRO 等),则必须依赖外部反序列化器,因此要将转换委托给 Kafka。
现在当消费者接收到数据时,让我们假设有一条有缺陷的记录导致反序列化错误,例如某人传递了一个`Integer`而不是`String`。在这种情况下,如果您在应用程序中不执行任何操作,该异常将通过链传播,并且最终您的应用程序将退出。
为了处理这种情况,您可以添加一个`ListenerContainerCustomizer` @Bean`来配置`DefaultErrorHandler
。此`DefaultErrorHandler`使用`DeadLetterPublishingRecoverer`进行配置。我们还需要为消费者配置一个`ErrorHandlingDeserializer`。这听起来像是很多复杂的事情,但实际上,在这种情况下,它归结为这 3 个 bean。
@Bean
public ListenerContainerCustomizer<AbstractMessageListenerContainer<byte[], byte[]>> customizer(DefaultErrorHandler errorHandler) {
return (container, dest, group) -> {
container.setCommonErrorHandler(errorHandler);
};
}
@Bean
public DefaultErrorHandler errorHandler(DeadLetterPublishingRecoverer deadLetterPublishingRecoverer) {
return new DefaultErrorHandler(deadLetterPublishingRecoverer);
}
@Bean
public DeadLetterPublishingRecoverer publisher(KafkaOperations bytesTemplate) {
return new DeadLetterPublishingRecoverer(bytesTemplate);
}
让我们分析每个函数。第一个函数是 ListenerContainerCustomizer
bean,它接收一个 DefaultErrorHandler
。现在,容器通过特定的错误处理程序进行了自定义。你可以了解更多有关容器自定义的信息 here。
第二个 bean 是 DefaultErrorHandler
,该 bean 配置为发布到 DLT
。参阅 here 了解更多有关 DefaultErrorHandler
的详细信息。
第三个 bean 是 DeadLetterPublishingRecoverer
,它最终负责发送到 DLT
。默认情况下,DLT
主题被命名为 ORIGINAL_TOPIC_NAME.DLT。不过,你可以对它进行更改。参阅 docs 了解更多详细信息。
我们还需要通过应用程序配置对“错误处理反序列化器”进行配置。
`ErrorHandlingDeserializer`委托给实际的反序列化器。在出现错误的情况下,它将记录的键/值设置为 null,并包含消息的原始字节。然后它在 header 中设置异常,并将此记录传递给侦听器,侦听器随后调用注册的错误处理程序。
以下是必需的配置:
spring.cloud.stream:
function:
definition: functionName
bindings:
functionName-in-0:
group: group-name
destination: input-topic
consumer:
use-native-decoding: true
kafka:
bindings:
functionName-in-0:
consumer:
enableDlq: true
dlqName: dlq-topic
dlqProducerProperties:
configuration:
value.serializer: org.apache.kafka.common.serialization.StringSerializer
configuration:
value.deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
spring.deserializer.value.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
我们通过绑定上的 configuration
属性提供 ErrorHandlingDeserializer
。我们还指示实际需要委派的是 StringDeserializer
。
请记住,以上 dlq 属性都不与本食谱中的讨论相关。它们的目的是只处理任何应用程序级别错误。
Basic offset management in Kafka binder
Solution
我们鼓励您阅读 docs 部分,以彻底了解它。
以下是一个要点:
默认情况下,Kafka 支持两种类型的偏移量来开始,它们是 earliest
和 latest
。它们的语义从名字中不言自明。
假设您是首次运行消费者。如果您错过了 Spring Cloud Stream 应用程序中的 group.id,那么它就变成匿名消费者。每当您有一个匿名消费者时,在这种情况下,Spring Cloud Stream应用程序默认会从主题分区中可用的 latest
偏移量开始。另一方面,如果您明确指定了一个 group.id,那么默认情况下,Spring Cloud Stream 应用程序将从主题分区中可用的 earliest
偏移量开始。
在以上两种情况下(带显式组的消费者和匿名组),都可以通过使用属性 spring.cloud.stream.kafka.bindings.<binding-name>.consumer.startOffset
并将其设置为 earliest
或 latest
来切换开始偏移量。
现在,假设您以前已经运行过消费者,现在重新启动它。在这种情况下,上述情况中的开始偏移量语义不适用,因为消费者会找到消费者组的已提交偏移量(在匿名消费者的情况下,尽管应用程序没有提供 group.id,但绑定器会自动为您生成一个)。它只是从最后提交的偏移量开始。即便您提供了 startOffset
值,但这仍然成立。
但是,您可以通过使用`resetOffsets` 属性来覆盖消费者从最后提交的偏移量开始的默认行为。为此,请将属性 spring.cloud.stream.kafka.bindings.<binding-name>.consumer.resetOffsets
设置为 true
(默认值为 false
)。然后确保提供 startOffset
值(earliest
或 latest
)。当您执行此操作并启动消费者应用程序时,每次启动时都会像首次启动一样启动,并忽略分区中的任何已提交偏移量。
Seeking to arbitrary offsets in Kafka
Problem Statement
使用 Kafka binder,我知道它可以将偏移量设置为 earliest
或 latest
,但我有一个要求,可以将偏移量设置为中间的某个值,也就是一个任意偏移量。是否可以通过 Spring Cloud Stream Kafka binder 实现此目的?
Solution
之前我们已经了解了 Kafka binder 如何处理基本的偏移量管理。默认情况下,binder 不允许您回退到任意偏移量,至少通过我们在食谱中看到的机制不行。但是,binder 提供了一些低级别的策略来实现此用例。我们来探索一下这些策略。
首先,当您想要重置到 earliest
或 latest
以外的任意偏移量时,确保将 resetOffsets
配置保留为其默认值,即 false
。然后您必须提供一个`KafkaBindingRebalanceListener` 类型的自定义 bean,它将注入到所有消费者绑定中。它是一个带有一些默认方法的接口,但此处是我们感兴趣的方法:
/**
* Invoked when partitions are initially assigned or after a rebalance. Applications
* might only want to perform seek operations on an initial assignment. While the
* 'initial' argument is true for each thread (when concurrency is greater than 1),
* implementations should keep track of exactly which partitions have been sought.
* There is a race in that a rebalance could occur during startup and so a topic/
* partition that has been sought on one thread may be re-assigned to another
* thread and you may not wish to re-seek it at that time.
* @param bindingName the name of the binding.
* @param consumer the consumer.
* @param partitions the partitions.
* @param initial true if this is the initial assignment on the current thread.
*/
default void onPartitionsAssigned(String bindingName, Consumer<?, ?> consumer,
Collection<TopicPartition> partitions, boolean initial) {
// do nothing
}
让我们看看具体情况。
本质上,此方法将在对主题分区进行初始分配时或在重新平衡后每次调用。为了更好地说明,假设我们的主题是 foo
,它有 4 个分区。最初,我们在组中只启动一个消费者,并且这个消费者将消费所有分区。当消费者第一次启动时,所有 4 个分区都将最初得到分配。然而,我们不想让分区从默认值开始消费(因为我们定义了一个组),相反,对于每个分区,我们希望它们在寻找到任意偏移量后开始消费。想象一下您有一个用例,要从下面的某些偏移量开始消费。
Partition start offset
0 1000
1 2000
2 2000
3 1000
可以通过以下方式实现上述方法来实现此目的。
@Override
public void onPartitionsAssigned(String bindingName, Consumer<?, ?> consumer, Collection<TopicPartition> partitions, boolean initial) {
Map<TopicPartition, Long> topicPartitionOffset = new HashMap<>();
topicPartitionOffset.put(new TopicPartition("foo", 0), 1000L);
topicPartitionOffset.put(new TopicPartition("foo", 1), 2000L);
topicPartitionOffset.put(new TopicPartition("foo", 2), 2000L);
topicPartitionOffset.put(new TopicPartition("foo", 3), 1000L);
if (initial) {
partitions.forEach(tp -> {
if (topicPartitionOffset.containsKey(tp)) {
final Long offset = topicPartitionOffset.get(tp);
try {
consumer.seek(tp, offset);
}
catch (Exception e) {
// Handle exceptions carefully.
}
}
});
}
}
这只是一个基本的实现。现实世界的用例比这复杂得多,您需要相应地进行调整,但这绝对可以给您一个基本的概述。当消费者 seek
失败时,它可能会抛出一些运行时异常,您需要决定在这些情况下做什么。
[[what-if-we-start-a-second-consumer-with-the-same-group-id?]] === 如果我们启动一个带有相同组 ID 的第二个使用者,会怎样?
当我们添加第二个消费者时,将发生重新平衡,并且一些分区将被移动。假设新消费者获得了分区 2
和 3
。当这个新的 Spring Cloud Stream 消费者调用此 onPartitionsAssigned
方法时,它将看到这是此消费者上分区 2
和 3
的初始分配。因此,它将由于 initial
参数上的条件检查而执行 seek 操作。对于第一个消费者,它现在只有分区 0
和 1
。然而,对于这个消费者来说,它仅仅是一个重新平衡事件,而不是被视为初始分配。因此,由于`initial` 参数上的条件检查,它不会重新 seek 到给定的偏移量。
[[how-do-i-manually-acknowledge-using-kafka-binder?]] == 如何使用 Kafka 绑定程序进行手动确认?
Solution
默认情况下,Kafka 绑定器委派给了 Spring 中有关 Apache Kafka 项目的默认提交设置。Spring Kafka 中的默认 ackMode
是 batch
。有关该内容的更多详细信息,请参见 here。
在某些情况下,您希望禁用此默认提交行为并依靠手动提交。以下步骤允许您执行此操作。
将属性 spring.cloud.stream.kafka.bindings.<binding-name>.consumer.ackMode
设置为 MANUAL
或 MANUAL_IMMEDIATE
。如此设置后,消费者方法收到的消息中将存在一个名为 kafka_acknowledgment
(来自 KafkaHeaders.ACKNOWLEDGMENT
)的头。
例如,想象这是您的消费者方法。
@Bean
public Consumer<Message<String>> myConsumer() {
return msg -> {
Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
if (acknowledgment != null) {
System.out.println("Acknowledgment provided");
acknowledgment.acknowledge();
}
};
}
然后将属性 spring.cloud.stream.kafka.bindings.myConsumer-in-0.consumer.ackMode
设置为 MANUAL
或 MANUAL_IMMEDIATE
。
[[how-do-i-override-the-default-binding-names-in-spring-cloud-stream?]] == 如何覆盖 Spring Cloud Stream 中的默认绑定名称?
Solution
假设以下为您的函数签名。
@Bean
public Function<String, String> uppercase(){
...
}
默认情况下,Spring Cloud Stream 将如下创建绑定。
-
uppercase-in-0
-
uppercase-out-0
您可以使用以下属性将这些绑定重写为某些内容。
spring.cloud.stream.function.bindings.uppercase-in-0=my-transformer-in
spring.cloud.stream.function.bindings.uppercase-out-0=my-transformer-out
在此之后,所有绑定属性都必须基于新名称 my-transformer-in
和 my-transformer-out
进行。
这里有另一个有关 Kafka 流和多个输入的示例。
@Bean
public BiFunction<KStream<String, Order>, KTable<String, Account>, KStream<String, EnrichedOrder>> processOrder() {
...
}
默认情况下,Spring Cloud Stream 将为此函数创建三个不同的绑定名称。
-
processOrder-in-0
-
processOrder-in-1
-
processOrder-out-0
每次要针对这些绑定设置某些配置时,都必须使用这些绑定名称。您不喜欢这样做,并且您想要使用对域更友好且更易读的绑定名称,例如类似于以下的内容。
-
orders
-
accounts
-
enrichedOrders
只需设置这三个属性就可以轻松实现此目的。
-
spring.cloud.stream.function.bindings.processOrder-in-0=orders
-
spring.cloud.stream.function.bindings.processOrder-in-1=accounts
-
spring.cloud.stream.function.bindings.processOrder-out-0=enrichedOrders
执行此操作后,它将重写默认绑定名称,您要对其设置的任何属性都必须基于这些新绑定名称。
[[how-do-i-send-a-message-key-as-part-of-my-record?]] == 如何作为记录的部分内容发送一个消息密钥?
Solution
您经常需要将关联数据结构(如映射)作为带有键和值的数据记录发送。Spring Cloud Stream 允许您以直接的方式进行此操作。以下是对此操作的基本蓝图,但是您可能需要根据特定的用例对它进行调整。
以下是示例生产者方法(也称为 Supplier
)。
@Bean
public Supplier<Message<String>> supplier() {
return () -> MessageBuilder.withPayload("foo").setHeader(KafkaHeaders.MESSAGE_KEY, "my-foo").build();
}
这是一个发送带有 String
负载的消息的简单函数,但也带有键。请注意,我们使用 KafkaHeaders.MESSAGE_KEY
将键设置为消息头。
如果您想将键从默认的 kafka_messageKey
更改,则在配置中,我们需要指定此属性:
spring.cloud.stream.kafka.bindings.supplier-out-0.producer.messageKeyExpression=headers['my-special-key']
请注意,我们使用绑定名称 supplier-out-0
,因为这是我们的函数名称,请相应更新。
然后,我们会在生成消息时使用此新键。
[[how-do-i-use-native-serializer-and-deserializer-instead-of-message-conversion-done-by-spring-cloud-stream?]] == 如何使用本机序列化器和反序列化器,而不是使用 Spring Cloud Stream 完成的消息转换?
Problem Statement
我想在 Kafka 中使用本机序列化器和反序列化器,而不是在 Spring Cloud Stream 中使用消息转换器。在默认情况下,Spring Cloud Stream 使用其内部内置的消息转换器来处理此转换。我该如何绕过此问题并将此责任委派给 Kafka?
Solution
此操作非常容易。
您所需要做的就是提供以下属性以启用本机序列化。
spring.cloud.stream.kafka.bindings.<binding-name>.producer.useNativeEncoding: true
然后,您还需要设置序列化器。有几种方法可以执行此操作。
spring.cloud.stream.kafka.bindings.<binding-name>.producer.configuration.key.serializer: org.apache.kafka.common.serialization.StringSerializer
spring.cloud.stream.kafka.bindings.<binding-name>.producer.configuration.value.serializer: org.apache.kafka.common.serialization.StringSerializer
或使用绑定配置。
spring.cloud.stream.kafka.binder.configuration.key.serializer: org.apache.kafka.common.serialization.StringSerializer
spring.cloud.stream.kafka.binder.configuration.value.serializer: org.apache.kafka.common.serialization.StringSerializer
使用绑定方式时,它将应用于所有绑定,而将它们设置为绑定时则以每个绑定为单位。
在反序列化端,您只需要将反序列化器作为配置提供。
例如,
spring.cloud.stream.kafka.bindings.<binding-name>.consumer.configuration.key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
spring.cloud.stream.kafka.bindings.<binding-name>.producer.configuration.value.deserializer: org.apache.kafka.common.serialization.StringDeserializer
您还可以在绑定级别设置它们。
您可以设置一个可选属性来强制进行本机解码。
spring.cloud.stream.kafka.bindings.<binding-name>.consumer.useNativeDecoding: true
但是,对于 Kafka Binder,这是不必要的,因为当它到达 Binder 时,Kafka 已使用配置的反序列化器对其进行反序列化。
Explain how offset resetting work in Kafka Streams binder
Problem Statement
在默认情况下,Kafka Streams Binder 总是从新消费者的最早偏移量开始。有时,应用程序需要或有益于从最新偏移量开始。Kafka Streams Binder 允许您进行此操作。
Solution
在查看解决方案之前,我们先假设以下场景。
@Bean
public BiConsumer<KStream<Object, Object>, KTable<Object, Object>> myBiConsumer{
(s, t) -> s.join(t, ...)
...
}
我们有一个 BiConsumer
bean,它需要两个输入绑定。在这种情况下,第一个绑定用于 KStream
,而第二个绑定用于 KTable
。首次运行此应用程序时,在默认情况下,两个绑定都从 earliest
偏移量开始。出于某些要求,我该如何从 latest
偏移量开始?您可以通过启用以下属性来进行此操作。
spring.cloud.stream.kafka.streams.bindings.myBiConsumer-in-0.consumer.startOffset: latest
spring.cloud.stream.kafka.streams.bindings.myBiConsumer-in-1.consumer.startOffset: latest
如果您只想让一个绑定从 latest
偏移量启动并消费另一个来自默认 earliest
的绑定,则从配置中保留后一个绑定。
在有提交的偏移量后,请记住,不遵守这些设置,提交的偏移量具有优先权。
Keeping track of successful sending of records (producing) in Kafka
Solution
假设我们在应用程序中提供了以下供应商。
@Bean
public Supplier<Message<String>> supplier() {
return () -> MessageBuilder.withPayload("foo").setHeader(KafkaHeaders.MESSAGE_KEY, "my-foo").build();
}
然后,我们需要定义一个新的 MessageChannel
bean 以捕获所有成功的发送信息。
@Bean
public MessageChannel fooRecordChannel() {
return new DirectChannel();
}
接下来,在应用程序配置中定义此属性以提供 recordMetadataChannel
的 bean 名称。
spring.cloud.stream.kafka.bindings.supplier-out-0.producer.recordMetadataChannel: fooRecordChannel
此时,成功发送的信息将被发送到 fooRecordChannel
。
您可以编写一个 IntegrationFlow
如下所示以查看信息。
@Bean
public IntegrationFlow integrationFlow() {
return f -> f.channel("fooRecordChannel")
.handle((payload, messageHeaders) -> payload);
}
在 handle
方法中,有效负载就是发送到 Kafka 的内容,消息头包含一个称为 kafka_recordMetadata
的特殊键。它的值是包含有关主题分区、当前偏移量等信息的 RecordMetadata
。
Adding custom header mapper in Kafka
Solution
在正常情况下,这应该是好的。
想象一下,您有以下制作者。
@Bean
public Supplier<Message<String>> supply() {
return () -> MessageBuilder.withPayload("foo").setHeader("foo", "bar").build();
}
在消费者方面,您仍然应该看到标头“foo”,以下内容应该不会给您带来任何问题。
@Bean
public Consumer<Message<String>> consume() {
return s -> {
final String foo = (String)s.getHeaders().get("foo");
System.out.println(foo);
};
}
如果您在应用程序中提供了 custom header mapper,那么这将不起作用。假设您在应用程序中有一个空 KafkaHeaderMapper
。
@Bean
public KafkaHeaderMapper kafkaBinderHeaderMapper() {
return new KafkaHeaderMapper() {
@Override
public void fromHeaders(MessageHeaders headers, Headers target) {
}
@Override
public void toHeaders(Headers source, Map<String, Object> target) {
}
};
}
如果那是您的实现,那么您将错过消费者上的 foo
标头。很有可能,您在那些 KafkaHeaderMapper
方法内有一些逻辑。您需要以下内容来填充 foo
标头。
@Bean
public KafkaHeaderMapper kafkaBinderHeaderMapper() {
return new KafkaHeaderMapper() {
@Override
public void fromHeaders(MessageHeaders headers, Headers target) {
final String foo = (String) headers.get("foo");
target.add("foo", foo.getBytes());
}
@Override
public void toHeaders(Headers source, Map<String, Object> target) {
final Header foo = source.lastHeader("foo");
target.put("foo", new String(foo.value()));
}
}
这将正确地填充从制作者到消费者的 foo
标头。
Special note on the id header
在 Spring Cloud Stream 中, id
标头是一个特殊标头,但是某些应用程序可能需要有特殊的自定义 ID 标头 - 类似于 custom-id
或 ID
或 Id
。第一个(custom-id
)将在没有任何自定义标头映射器的情况下从生成者传播到消费者。然而,如果您使用框架保留的 id
标头的变体进行生成,如 ID
、Id
、iD
等,那么您将遇到框架内部的问题。有关此用例的更多背景信息,请参见此 StackOverflow thread。在这种情况下,您必须使用一个自定义 KafkaHeaderMapper
来映射区分大小写的 ID 标头。例如,假设您有以下生成者。
@Bean
public Supplier<Message<String>> supply() {
return () -> MessageBuilder.withPayload("foo").setHeader("Id", "my-id").build();
}
上面的标头 Id
将从消费方消失,因为它与框架 id
标头冲突。您可以提供一个自定义的 KafkaHeaderMapper
来解决此问题。
@Bean
public KafkaHeaderMapper kafkaBinderHeaderMapper1() {
return new KafkaHeaderMapper() {
@Override
public void fromHeaders(MessageHeaders headers, Headers target) {
final String myId = (String) headers.get("Id");
target.add("Id", myId.getBytes());
}
@Override
public void toHeaders(Headers source, Map<String, Object> target) {
final Header Id = source.lastHeader("Id");
target.put("Id", new String(Id.value()));
}
};
}
通过这样做,id
和 Id
标头都将从制作者提供到消费者一方。
Producing to multiple topics in transaction
Solution
在 Kafka binder 中使用事务支持进行事务,然后提供一个 AfterRollbackProcessor
。为了生成多个主题,请使用 StreamBridge
API。
以下是此代码的代码片段:
@Autowired
StreamBridge bridge;
@Bean
Consumer<String> input() {
return str -> {
System.out.println(str);
this.bridge.send("left", str.toUpperCase());
this.bridge.send("right", str.toLowerCase());
if (str.equals("Fail")) {
throw new RuntimeException("test");
}
};
}
@Bean
ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> customizer(BinderFactory binders) {
return (container, dest, group) -> {
ProducerFactory<byte[], byte[]> pf = ((KafkaMessageChannelBinder) binders.getBinder(null,
MessageChannel.class)).getTransactionalProducerFactory();
KafkaTemplate<byte[], byte[]> template = new KafkaTemplate<>(pf);
DefaultAfterRollbackProcessor rollbackProcessor = rollbackProcessor(template);
container.setAfterRollbackProcessor(rollbackProcessor);
};
}
DefaultAfterRollbackProcessor rollbackProcessor(KafkaTemplate<byte[], byte[]> template) {
return new DefaultAfterRollbackProcessor<>(
new DeadLetterPublishingRecoverer(template), new FixedBackOff(2000L, 2L), template, true);
}
Required Configuration
spring.cloud.stream.kafka.binder.transaction.transaction-id-prefix: tx-
spring.cloud.stream.kafka.binder.required-acks=all
spring.cloud.stream.bindings.input-in-0.group=foo
spring.cloud.stream.bindings.input-in-0.destination=input
spring.cloud.stream.bindings.left.destination=left
spring.cloud.stream.bindings.right.destination=right
spring.cloud.stream.kafka.bindings.input-in-0.consumer.maxAttempts=1
为了进行测试,你可以使用以下内容:
@Bean
public ApplicationRunner runner(KafkaTemplate<byte[], byte[]> template) {
return args -> {
System.in.read();
template.send("input", "Fail".getBytes());
template.send("input", "Good".getBytes());
};
}
一些重要的注意事项:
请确保应用程序配置中没有 DLQ 设置,因为我们手动配置了 DLT(默认情况下,它会发布到一个名为 input.DLT
的主题,该主题基于初始消费者函数)。另外,将消费者绑定上的 maxAttempts
重置为 1
,以避免 binder 重试。在上述示例中,它将最多尝试 3 次(初始尝试 + FixedBackoff
中的 2 次尝试)。
有关如何测试此代码的更多详细信息,请参见 StackOverflow thread。如果您使用 Spring Cloud Stream 通过添加更多消费者函数来测试它,请务必将消费者绑定上的 isolation-level
设置为 read-committed
。
此 StackOverflow thread 也与此讨论相关。
Pitfalls to avoid when running multiple pollable consumers
Solution
假设我有以下定义:
spring.cloud.stream.pollable-source: foo
spring.cloud.stream.bindings.foo-in-0.group: my-group
在运行应用程序时,Kafka 消费者生成 client.id
(类似于 consumer-my-group-1
)。对于正在运行的应用程序的每个实例,此 client.id
将相同,从而导致意外问题。
为了解决此问题,你可以在应用程序的每个实例上添加以下属性:
spring.cloud.stream.kafka.bindings.foo-in-0.consumer.configuration.client.id=${client.id}
有关更多详细信息,请参见此 GitHub issue。