Forwarding Listener Results using @SendTo

从 2.0 版开始,如果你还使用 @SendTo 注释对 @KafkaListener 进行了注释,并且方法调用返回了一个结果,那么该结果将转发到 @SendTo 指定的主题。

Starting with version 2.0, if you also annotate a @KafkaListener with a @SendTo annotation and the method invocation returns a result, the result is forwarded to the topic specified by the @SendTo.

@SendTo 值可以具有以下几种形式:

The @SendTo value can have several forms:

  • @SendTo("someTopic") routes to the literal topic.

  • @SendTo("#{someExpression}") routes to the topic determined by evaluating the expression once during application context initialization.

  • @SendTo("!{someExpression}") routes to the topic determined by evaluating the expression at runtime. The #root object for the evaluation has three properties:

    • request: The inbound ConsumerRecord (or ConsumerRecords object for a batch listener).

    • source: The org.springframework.messaging.Message<?> converted from the request.

    • result: The method return result.

  • @SendTo (no properties): This is treated as !{source.headers['kafka_replyTopic']} (since version 2.1.3).

从 2.1.11 和 2.2.1 版开始,属性占位符将在 @SendTo 值中得到解析。

Starting with versions 2.1.11 and 2.2.1, property placeholders are resolved within @SendTo values.

表达式求值的结果必须是一个表示主题名称的 String。以下示例显示了使用 @SendTo 的各种方法:

The result of the expression evaluation must be a String that represents the topic name. The following examples show the various ways to use @SendTo:

@KafkaListener(topics = "annotated21")
@SendTo("!{request.value()}") // runtime SpEL
public String replyingListener(String in) {
    ...
}

@KafkaListener(topics = "${some.property:annotated22}")
@SendTo("#{myBean.replyTopic}") // config time SpEL
public Collection<String> replyingBatchListener(List<String> in) {
    ...
}

@KafkaListener(topics = "annotated23", errorHandler = "replyErrorHandler")
@SendTo("annotated23reply") // static reply topic definition
public String replyingListenerWithErrorHandler(String in) {
    ...
}
...
@KafkaListener(topics = "annotated25")
@SendTo("annotated25reply1")
public class MultiListenerSendTo {

    @KafkaHandler
    public String foo(String in) {
        ...
    }

    @KafkaHandler
    @SendTo("!{'annotated25reply2'}")
    public String bar(@Payload(required = false) KafkaNull nul,
            @Header(KafkaHeaders.RECEIVED_KEY) int key) {
        ...
    }

}

为了支持 @SendTo,必须以 replyTemplate 属性的形式向侦听器容器工厂提供 KafkaTemplate,所述 KafkaTemplate 用于发送回复。这应当是 KafkaTemplate,而不是 ReplyingKafkaTemplate,后者用于请求/回复处理中的客户端。在使用 Spring Boot 时,它会将模板自动配置到工厂中;在配置您自己的工厂时,必须如以下示例中所示进行设置。

In order to support @SendTo, the listener container factory must be provided with a KafkaTemplate (in its replyTemplate property), which is used to send the reply. This should be a KafkaTemplate and not a ReplyingKafkaTemplate which is used on the client-side for request/reply processing. When using Spring Boot, it will auto-configure the template into the factory; when configuring your own factory, it must be set as shown in the examples below.

从 2.2 版开始,你可以在侦听器容器工厂中添加一个 ReplyHeadersConfigurer。对此进行查询以确定要在回复消息中设置哪些头。以下示例显示了如何添加一个 ReplyHeadersConfigurer

Starting with version 2.2, you can add a ReplyHeadersConfigurer to the listener container factory. This is consulted to determine which headers you want to set in the reply message. The following example shows how to add a ReplyHeadersConfigurer:

@Bean
public ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(cf());
    factory.setReplyTemplate(template());
    factory.setReplyHeadersConfigurer((k, v) -> k.equals("cat"));
    return factory;
}

你还可以按需添加更多头。以下示例显示了如何执行此操作:

You can also add more headers if you wish. The following example shows how to do so:

@Bean
public ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(cf());
    factory.setReplyTemplate(template());
    factory.setReplyHeadersConfigurer(new ReplyHeadersConfigurer() {

      @Override
      public boolean shouldCopy(String headerName, Object headerValue) {
        return false;
      }

      @Override
      public Map<String, Object> additionalHeaders() {
        return Collections.singletonMap("qux", "fiz");
      }

    });
    return factory;
}

当你使用 @SendTo 时,你必须使用其 replyTemplate 属性中的 KafkaTemplate 来配置 ConcurrentKafkaListenerContainerFactory 以执行发送。Spring Boot 将自动连接其自动配置的模板(或任何存在单个实例的模板)。

When you use @SendTo, you must configure the ConcurrentKafkaListenerContainerFactory with a KafkaTemplate in its replyTemplate property to perform the send. Spring Boot will automatically wire in its auto-configured template (or any if a single instance is present).

除非你使用 request/reply semantics,否则只使用简单的 send(topic, value) 方法,因此你可能希望创建一个子类来生成分区或密钥。下列示例展示如何进行操作:

Unless you use request/reply semantics, only the simple send(topic, value) method is used, so you may wish to create a subclass to generate the partition or key. The following example shows how to do so:

@Bean
public KafkaTemplate<String, String> myReplyingTemplate() {
    return new KafkaTemplate<Integer, String>(producerFactory()) {

        @Override
        public CompletableFuture<SendResult<String, String>> send(String topic, String data) {
            return super.send(topic, partitionForData(data), keyForData(data), data);
        }

        ...

    };
}

如果侦听器方法返回 Message<?>Collection<Message<?>>,则侦听器方法负责为回复设置消息头。例如,在处理来自 ReplyingKafkaTemplate 的请求时,你可以执行以下操作:

If the listener method returns Message<?> or Collection<Message<?>>, the listener method is responsible for setting up the message headers for the reply. For example, when handling a request from a ReplyingKafkaTemplate, you might do the following:

@KafkaListener(id = "messageReturned", topics = "someTopic")
public Message<?> listen(String in, @Header(KafkaHeaders.REPLY_TOPIC) byte[] replyTo,
        @Header(KafkaHeaders.CORRELATION_ID) byte[] correlation) {
    return MessageBuilder.withPayload(in.toUpperCase())
            .setHeader(KafkaHeaders.TOPIC, replyTo)
            .setHeader(KafkaHeaders.KEY, 42)
            .setHeader(KafkaHeaders.CORRELATION_ID, correlation)
            .setHeader("someOtherHeader", "someValue")
            .build();
}

在使用请求/回复语义时,目标分区可以由发送者请求。

When using request/reply semantics, the target partition can be requested by the sender.

即使没有返回结果,你也可以使用 @SendTo 来注释一个 @KafkaListener 方法。这是为了允许配置一个 errorHandler,该 errorHandler 可以将有关失败消息传递的信息转发到某些主题。以下示例显示了如何执行此操作:

You can annotate a @KafkaListener method with @SendTo even if no result is returned. This is to allow the configuration of an errorHandler that can forward information about a failed message delivery to some topic. The following example shows how to do so:

@KafkaListener(id = "voidListenerWithReplyingErrorHandler", topics = "someTopic",
        errorHandler = "voidSendToErrorHandler")
@SendTo("failures")
public void voidListenerWithReplyingErrorHandler(String in) {
    throw new RuntimeException("fail");
}

@Bean
public KafkaListenerErrorHandler voidSendToErrorHandler() {
    return (m, e) -> {
        return ... // some information about the failure and input data
    };
}

请参阅 Handling Exceptions 了解更多信息。

See Handling Exceptions for more information.

如果侦听器方法返回 Iterable,则默认情况下,为作为值存在于每个元素的记录发送一条记录。从版本 2.3.5 开始,将 splitIterables 属性在 @KafkaListener 上设为 false,整个结果将作为单个 ProducerRecord 的值发送。这需要在回复模板的生产者配置中有一个合适的序列化器。然而,如果回复是 Iterable<Message<?>>,则忽略该属性,并分别发送每条消息。

If a listener method returns an Iterable, by default a record for each element as the value is sent. Starting with version 2.3.5, set the splitIterables property on @KafkaListener to false and the entire result will be sent as the value of a single ProducerRecord. This requires a suitable serializer in the reply template’s producer configuration. However, if the reply is Iterable<Message<?>> the property is ignored and each message is sent separately.