Forwarding Listener Results using @SendTo
从 2.0 版开始,如果你还使用 @SendTo
注释对 @KafkaListener
进行了注释,并且方法调用返回了一个结果,那么该结果将转发到 @SendTo
指定的主题。
@SendTo
值可以具有以下几种形式:
-
TIME
路由到文字主题。 -
COUNT
路由到在应用程序上下文初始化期间对表达式求值一次确定的主题。 -
true
路由到在运行时对表达式求值确定的主题。用于求值的MANUAL
对象具有三个属性:-
request
: 入站ConsumerRecord
(或对于批处理侦听器,则为ConsumerRecords
对象)。 -
source
:从request
转换而来的org.springframework.messaging.Message<?>
。 -
result
:方法返回结果。
-
-
@SendTo
(无属性):从版本 2.1.3 起,此将被视为!{source.headers['kafka_replyTopic']}
。
从 2.1.11 和 2.2.1 版开始,属性占位符将在 @SendTo
值中得到解析。
表达式求值的结果必须是一个表示主题名称的 String
。以下示例显示了使用 @SendTo
的各种方法:
@KafkaListener(topics = "annotated21")
@SendTo("!{request.value()}") // runtime SpEL
public String replyingListener(String in) {
...
}
@KafkaListener(topics = "${some.property:annotated22}")
@SendTo("#{myBean.replyTopic}") // config time SpEL
public Collection<String> replyingBatchListener(List<String> in) {
...
}
@KafkaListener(topics = "annotated23", errorHandler = "replyErrorHandler")
@SendTo("annotated23reply") // static reply topic definition
public String replyingListenerWithErrorHandler(String in) {
...
}
...
@KafkaListener(topics = "annotated25")
@SendTo("annotated25reply1")
public class MultiListenerSendTo {
@KafkaHandler
public String foo(String in) {
...
}
@KafkaHandler
@SendTo("!{'annotated25reply2'}")
public String bar(@Payload(required = false) KafkaNull nul,
@Header(KafkaHeaders.RECEIVED_KEY) int key) {
...
}
}
为了支持 @SendTo
,必须以 replyTemplate
属性的形式向侦听器容器工厂提供 KafkaTemplate
,所述 KafkaTemplate
用于发送回复。这应当是 KafkaTemplate
,而不是 ReplyingKafkaTemplate
,后者用于请求/回复处理中的客户端。在使用 Spring Boot 时,它会将模板自动配置到工厂中;在配置您自己的工厂时,必须如以下示例中所示进行设置。
从 2.2 版开始,你可以在侦听器容器工厂中添加一个 ReplyHeadersConfigurer
。对此进行查询以确定要在回复消息中设置哪些头。以下示例显示了如何添加一个 ReplyHeadersConfigurer
:
@Bean
public ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(cf());
factory.setReplyTemplate(template());
factory.setReplyHeadersConfigurer((k, v) -> k.equals("cat"));
return factory;
}
你还可以按需添加更多头。以下示例显示了如何执行此操作:
@Bean
public ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(cf());
factory.setReplyTemplate(template());
factory.setReplyHeadersConfigurer(new ReplyHeadersConfigurer() {
@Override
public boolean shouldCopy(String headerName, Object headerValue) {
return false;
}
@Override
public Map<String, Object> additionalHeaders() {
return Collections.singletonMap("qux", "fiz");
}
});
return factory;
}
当你使用 @SendTo
时,你必须使用其 replyTemplate
属性中的 KafkaTemplate
来配置 ConcurrentKafkaListenerContainerFactory
以执行发送。Spring Boot 将自动连接其自动配置的模板(或任何存在单个实例的模板)。
除非你使用 request/reply semantics,否则只使用简单的 |
@Bean
public KafkaTemplate<String, String> myReplyingTemplate() {
return new KafkaTemplate<Integer, String>(producerFactory()) {
@Override
public CompletableFuture<SendResult<String, String>> send(String topic, String data) {
return super.send(topic, partitionForData(data), keyForData(data), data);
}
...
};
}
如果侦听器方法返回 Message<?>
或 Collection<Message<?>>
,则侦听器方法负责为回复设置消息头。例如,在处理来自 ReplyingKafkaTemplate
的请求时,你可以执行以下操作:
@KafkaListener(id = "messageReturned", topics = "someTopic")
public Message<?> listen(String in, @Header(KafkaHeaders.REPLY_TOPIC) byte[] replyTo,
@Header(KafkaHeaders.CORRELATION_ID) byte[] correlation) {
return MessageBuilder.withPayload(in.toUpperCase())
.setHeader(KafkaHeaders.TOPIC, replyTo)
.setHeader(KafkaHeaders.KEY, 42)
.setHeader(KafkaHeaders.CORRELATION_ID, correlation)
.setHeader("someOtherHeader", "someValue")
.build();
}
在使用请求/回复语义时,目标分区可以由发送者请求。
即使没有返回结果,你也可以使用 |
@KafkaListener(id = "voidListenerWithReplyingErrorHandler", topics = "someTopic",
errorHandler = "voidSendToErrorHandler")
@SendTo("failures")
public void voidListenerWithReplyingErrorHandler(String in) {
throw new RuntimeException("fail");
}
@Bean
public KafkaListenerErrorHandler voidSendToErrorHandler() {
return (m, e) -> {
return ... // some information about the failure and input data
};
}
请参阅 Handling Exceptions 了解更多信息。
如果侦听器方法返回 |