Sending Messages

This section covers how to send messages.

Using KafkaTemplate

This section covers how to use KafkaTemplate to send messages.

Overview

The KafkaTemplate wraps a producer and provides convenience methods to send data to Kafka topics. The following listing shows the relevant methods from KafkaTemplate:

CompletableFuture<SendResult<K, V>> sendDefault(V data);

CompletableFuture<SendResult<K, V>> sendDefault(K key, V data);

CompletableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, V data);

CompletableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key, V data);

CompletableFuture<SendResult<K, V>> send(String topic, V data);

CompletableFuture<SendResult<K, V>> send(String topic, K key, V data);

CompletableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, V data);

CompletableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data);

CompletableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record);

CompletableFuture<SendResult<K, V>> send(Message<?> message);

Map<MetricName, ? extends Metric> metrics();

List<PartitionInfo> partitionsFor(String topic);

<T> T execute(ProducerCallback<K, V, T> callback);

<T> T executeInTransaction(OperationsCallback<K, V, T> callback);

// Flush the producer.
void flush();

interface ProducerCallback<K, V, T> {

    T doInKafka(Producer<K, V> producer);

}

interface OperationsCallback<K, V, T> {

    T doInOperations(KafkaOperations<K, V> operations);

}

See the Javadoc for more detail.

In version 3.0, the methods that previously returned ListenableFuture have been changed to return CompletableFuture. To facilitate the migration, version 2.9 added a method, usingCompletableFuture(), which provided the same methods with CompletableFuture return types; that method is no longer available.

The sendDefault API requires that a default topic has been provided to the template.

The API takes in a timestamp as a parameter and stores this timestamp in the record. How the user-provided timestamp is stored depends on the timestamp type configured on the Kafka topic. If the topic is configured to use CREATE_TIME, the user-specified timestamp is recorded (or generated if not specified). If the topic is configured to use LOG_APPEND_TIME, the user-specified timestamp is ignored and the broker adds in the local broker time.
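
For example, a minimal sketch (assuming an Integer/String template whose defaultTopic property has been set) that passes an explicit creation timestamp:

// send to partition 0 of the default topic with an explicit timestamp;
// whether the broker keeps it depends on the topic's message.timestamp.type
template.sendDefault(0, System.currentTimeMillis(), 42, "something");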

The metrics and partitionsFor methods delegate to the same methods on the underlying Producer. The execute method provides direct access to the underlying Producer.

To use the template, you can configure a producer factory and provide it in the template’s constructor. The following example shows how to do so:

@Bean
public ProducerFactory<Integer, String> producerFactory() {
    return new DefaultKafkaProducerFactory<>(producerConfigs());
}

@Bean
public Map<String, Object> producerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    // See https://kafka.apache.org/documentation/#producerconfigs for more properties
    return props;
}

@Bean
public KafkaTemplate<Integer, String> kafkaTemplate() {
    return new KafkaTemplate<Integer, String>(producerFactory());
}

Starting with version 2.5, you can now override the factory’s ProducerConfig properties to create templates with different producer configurations from the same factory.

@Bean
public KafkaTemplate<String, String> stringTemplate(ProducerFactory<String, String> pf) {
    return new KafkaTemplate<>(pf);
}

@Bean
public KafkaTemplate<String, byte[]> bytesTemplate(ProducerFactory<String, byte[]> pf) {
    return new KafkaTemplate<>(pf,
            Collections.singletonMap(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class));
}

Note that a bean of type ProducerFactory<?, ?> (such as the one auto-configured by Spring Boot) can be referenced with different narrowed generic types.

You can also configure the template by using standard <bean/> definitions.

Then, to use the template, you can invoke one of its methods.

When you use the methods with a Message<?> parameter, the topic, partition, key and timestamp information is provided in a message header that includes the following items:

  • KafkaHeaders.TOPIC

  • KafkaHeaders.PARTITION

  • KafkaHeaders.KEY

  • KafkaHeaders.TIMESTAMP

The message payload is the data.
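
For example, a minimal sketch of building such a Message<?> (assuming the Integer/String template configured earlier):

Message<String> message = MessageBuilder
        .withPayload("something")
        .setHeader(KafkaHeaders.TOPIC, "myTopic")
        .setHeader(KafkaHeaders.PARTITION, 0)
        .setHeader(KafkaHeaders.KEY, 42)
        .build();
template.send(message);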

Optionally, you can configure the KafkaTemplate with a ProducerListener to get an asynchronous callback with the results of the send (success or failure) instead of waiting for the Future to complete. The following listing shows the definition of the ProducerListener interface:

public interface ProducerListener<K, V> {

    void onSuccess(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata);

    void onError(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata,
            Exception exception);

}

By default, the template is configured with a LoggingProducerListener, which logs errors and does nothing when the send is successful.

For convenience, default method implementations are provided in case you want to implement only one of the methods.
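
For example, a minimal sketch that registers a listener overriding only onError (onSuccess retains its default no-op):

template.setProducerListener(new ProducerListener<Integer, String>() {

    @Override
    public void onError(ProducerRecord<Integer, String> producerRecord,
            RecordMetadata recordMetadata, Exception exception) {
        // e.g. record a failure metric for producerRecord.topic()
    }

});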

Notice that the send methods return a CompletableFuture<SendResult>. You can register a callback with the listener to receive the result of the send asynchronously. The following example shows how to do so:

CompletableFuture<SendResult<Integer, String>> future = template.send("myTopic", "something");
future.whenComplete((result, ex) -> {
    ...
});

SendResult has two properties, a ProducerRecord and RecordMetadata. See the Kafka API documentation for information about those objects.

The Throwable can be cast to a KafkaProducerException; its failedProducerRecord property contains the failed record.

If you wish to block the sending thread to await the result, you can invoke the future’s get() method; using the method with a timeout is recommended. If you have set a linger.ms, you may wish to invoke flush() before waiting or, for convenience, the template has a constructor with an autoFlush parameter that causes the template to flush() on each send. Flushing is only needed if you have set the linger.ms producer property and want to immediately send a partial batch.

Examples

This section shows examples of sending messages to Kafka:

Example 1. Non Blocking (Async)
public void sendToKafka(final MyOutputData data) {
    final ProducerRecord<String, String> record = createRecord(data);

    CompletableFuture<SendResult<Integer, String>> future = template.send(record);
    future.whenComplete((result, ex) -> {
        if (ex == null) {
            handleSuccess(data);
        }
        else {
            handleFailure(data, record, ex);
        }
    });
}
Example 2. Blocking (Sync)
public void sendToKafka(final MyOutputData data) {
    final ProducerRecord<String, String> record = createRecord(data);

    try {
        template.send(record).get(10, TimeUnit.SECONDS);
        handleSuccess(data);
    }
    catch (ExecutionException e) {
        handleFailure(data, record, e.getCause());
    }
    catch (TimeoutException | InterruptedException e) {
        handleFailure(data, record, e);
    }
}

Note that the cause of the ExecutionException is KafkaProducerException with the failedProducerRecord property.

Using RoutingKafkaTemplate

Starting with version 2.5, you can use a RoutingKafkaTemplate to select the producer at runtime, based on the destination topic name.

The routing template does not support transactions, execute, flush, or metrics operations because the topic is not known for those operations.

The template requires a map of java.util.regex.Pattern to ProducerFactory<Object, Object> instances. This map should be ordered (e.g. a LinkedHashMap) because it is traversed in order; you should add more specific patterns at the beginning.

The following simple Spring Boot application provides an example of how to use the same template to send to different topics, each using a different value serializer.

@SpringBootApplication
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @Bean
    public RoutingKafkaTemplate routingTemplate(GenericApplicationContext context,
            ProducerFactory<Object, Object> pf) {

        // Clone the PF with a different Serializer, register with Spring for shutdown
        Map<String, Object> configs = new HashMap<>(pf.getConfigurationProperties());
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
        DefaultKafkaProducerFactory<Object, Object> bytesPF = new DefaultKafkaProducerFactory<>(configs);
        context.registerBean("bytesPF", DefaultKafkaProducerFactory.class, () -> bytesPF);

        Map<Pattern, ProducerFactory<Object, Object>> map = new LinkedHashMap<>();
        map.put(Pattern.compile("two"), bytesPF);
        map.put(Pattern.compile(".+"), pf); // Default PF with StringSerializer
        return new RoutingKafkaTemplate(map);
    }

    @Bean
    public ApplicationRunner runner(RoutingKafkaTemplate routingTemplate) {
        return args -> {
            routingTemplate.send("one", "thing1");
            routingTemplate.send("two", "thing2".getBytes());
        };
    }

}

The corresponding @KafkaListener annotations for this example are shown in Annotation Properties.

For another technique to achieve similar results, but with the additional capability of sending different types to the same topic, see Delegating Serializer and Deserializer.

Using DefaultKafkaProducerFactory

As seen in Using KafkaTemplate, a ProducerFactory is used to create the producer.

When not using Transactions, by default, the DefaultKafkaProducerFactory creates a singleton producer used by all clients, as recommended in the KafkaProducer JavaDocs. However, if you call flush() on the template, this can cause delays for other threads using the same producer. Starting with version 2.3, the DefaultKafkaProducerFactory has a new property producerPerThread. When set to true, the factory will create (and cache) a separate producer for each thread, to avoid this issue.

When producerPerThread is true, user code must call closeThreadBoundProducer() on the factory when the producer is no longer needed. This will physically close the producer and remove it from the ThreadLocal. Calling reset() or destroy() will not clean up these producers.
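
A minimal sketch, assuming the producerConfigs() bean from the earlier example:

DefaultKafkaProducerFactory<Integer, String> pf =
        new DefaultKafkaProducerFactory<>(producerConfigs());
pf.setProducerPerThread(true);
...
// on the sending thread, when it has finished producing:
try {
    template.send("myTopic", "something");
}
finally {
    pf.closeThreadBoundProducer(); // physically close and remove this thread's producer
}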

When creating a DefaultKafkaProducerFactory, key and/or value Serializer classes can be picked up from configuration by calling the constructor that takes only a Map of properties (see the example in Using KafkaTemplate), or Serializer instances may be passed to the DefaultKafkaProducerFactory constructor (in which case all Producers share the same instances). Alternatively, you can provide Supplier<Serializer> instances (starting with version 2.3) that will be used to obtain a separate Serializer instance for each Producer:

@Bean
public ProducerFactory<Integer, CustomValue> producerFactory() {
    return new DefaultKafkaProducerFactory<>(producerConfigs(), null, () -> new CustomValueSerializer());
}

@Bean
public KafkaTemplate<Integer, CustomValue> kafkaTemplate() {
    return new KafkaTemplate<Integer, CustomValue>(producerFactory());
}

Starting with version 2.5.10, you can now update the producer properties after the factory is created. This might be useful, for example, if you have to update SSL key/trust store locations after a credentials change. The changes will not affect existing producer instances; call reset() to close any existing producers so that new producers will be created using the new properties. NOTE: You cannot change a transactional producer factory to non-transactional, and vice-versa.

Two new methods are now provided:

void updateConfigs(Map<String, Object> updates);

void removeConfig(String configKey);
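
For example, a sketch of rotating the SSL key store (the path shown is hypothetical) and recycling the producers:

producerFactory.updateConfigs(
        Map.of(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "/etc/secrets/new-keystore.jks"));
producerFactory.reset(); // close existing producers; new ones pick up the updated properties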

Starting with version 2.8, if you provide serializers as objects (in the constructor or via the setters), the factory will invoke the configure() method to configure them with the configuration properties.

Using ReplyingKafkaTemplate

Version 2.1.3 introduced a subclass of KafkaTemplate to provide request/reply semantics. The class is named ReplyingKafkaTemplate and has two additional methods; the following shows the method signatures:

RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record);

RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record,
    Duration replyTimeout);

(See also Request/Reply with Message<?>.)

The result is a CompletableFuture that is asynchronously populated with the result (or an exception, for a timeout). The result also has a sendFuture property, which is the result of calling KafkaTemplate.send(). You can use this future to determine the result of the send operation.

In version 3.0, the futures returned by these methods (and their sendFuture properties) have been changed to CompletableFuture instead of ListenableFuture.

If the first method is used, or the replyTimeout argument is null, the template’s defaultReplyTimeout property is used (5 seconds by default).
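
The default can be changed on the template, for example:

replyingTemplate.setDefaultReplyTimeout(Duration.ofSeconds(10));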

Starting with version 2.8.8, the template has a new method, waitForAssignment. This is useful if the reply container is configured with auto.offset.reset=latest, to avoid sending a request (and having the reply sent) before the container is initialized.

When using manual partition assignment (no group management), the duration for the wait must be greater than the container’s pollTimeout property because the notification will not be sent until after the first poll is completed.

The following Spring Boot application shows an example of how to use the feature:

@SpringBootApplication
public class KRequestingApplication {

    public static void main(String[] args) {
        SpringApplication.run(KRequestingApplication.class, args).close();
    }

    @Bean
    public ApplicationRunner runner(ReplyingKafkaTemplate<String, String, String> template) {
        return args -> {
            if (!template.waitForAssignment(Duration.ofSeconds(10))) {
                throw new IllegalStateException("Reply container did not initialize");
            }
            ProducerRecord<String, String> record = new ProducerRecord<>("kRequests", "foo");
            RequestReplyFuture<String, String, String> replyFuture = template.sendAndReceive(record);
            SendResult<String, String> sendResult = replyFuture.getSendFuture().get(10, TimeUnit.SECONDS);
            System.out.println("Sent ok: " + sendResult.getRecordMetadata());
            ConsumerRecord<String, String> consumerRecord = replyFuture.get(10, TimeUnit.SECONDS);
            System.out.println("Return value: " + consumerRecord.value());
        };
    }

    @Bean
    public ReplyingKafkaTemplate<String, String, String> replyingTemplate(
            ProducerFactory<String, String> pf,
            ConcurrentMessageListenerContainer<String, String> repliesContainer) {

        return new ReplyingKafkaTemplate<>(pf, repliesContainer);
    }

    @Bean
    public ConcurrentMessageListenerContainer<String, String> repliesContainer(
            ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {

        ConcurrentMessageListenerContainer<String, String> repliesContainer =
                containerFactory.createContainer("kReplies");
        repliesContainer.getContainerProperties().setGroupId("repliesGroup");
        repliesContainer.setAutoStartup(false);
        return repliesContainer;
    }

    @Bean
    public NewTopic kRequests() {
        return TopicBuilder.name("kRequests")
            .partitions(10)
            .replicas(2)
            .build();
    }

    @Bean
    public NewTopic kReplies() {
        return TopicBuilder.name("kReplies")
            .partitions(10)
            .replicas(2)
            .build();
    }

}

Note that we can use Boot’s auto-configured container factory to create the reply container.

If a non-trivial deserializer is being used for replies, consider using an ErrorHandlingDeserializer that delegates to your configured deserializer. When so configured, the RequestReplyFuture will be completed exceptionally and you can catch the ExecutionException, with the DeserializationException in its cause property.

Starting with version 2.6.7, in addition to detecting DeserializationExceptions, the template will call the replyErrorChecker function, if provided. If it returns an exception, the future will be completed exceptionally.

Here is an example:

template.setReplyErrorChecker(record -> {
    Header error = record.headers().lastHeader("serverSentAnError");
    if (error != null) {
        return new MyException(new String(error.value()));
    }
    else {
        return null;
    }
});

...

RequestReplyFuture<Integer, String, String> future = template.sendAndReceive(record);
try {
    future.getSendFuture().get(10, TimeUnit.SECONDS); // send ok
    ConsumerRecord<Integer, String> consumerRecord = future.get(10, TimeUnit.SECONDS);
    ...
}
catch (InterruptedException e) {
    ...
}
catch (ExecutionException e) {
    if (e.getCause() instanceof MyException) {
        ...
    }
}
catch (TimeoutException e) {
    ...
}

The template sets a header (named KafkaHeaders.CORRELATION_ID by default), which must be echoed back by the server side.

In this case, the following @KafkaListener application responds:

@SpringBootApplication
public class KReplyingApplication {

    public static void main(String[] args) {
        SpringApplication.run(KReplyingApplication.class, args);
    }

    @KafkaListener(id="server", topics = "kRequests")
    @SendTo // use default replyTo expression
    public String listen(String in) {
        System.out.println("Server received: " + in);
        return in.toUpperCase();
    }

    @Bean
    public NewTopic kRequests() {
        return TopicBuilder.name("kRequests")
            .partitions(10)
            .replicas(2)
            .build();
    }

    @Bean // not required if Jackson is on the classpath
    public MessagingMessageConverter simpleMapperConverter() {
        MessagingMessageConverter messagingMessageConverter = new MessagingMessageConverter();
        messagingMessageConverter.setHeaderMapper(new SimpleKafkaHeaderMapper());
        return messagingMessageConverter;
    }

}

The @KafkaListener infrastructure echoes the correlation ID and determines the reply topic.

See Forwarding Listener Results using @SendTo for more information about sending replies. The template uses the default header KafkaHeaders.REPLY_TOPIC to indicate the topic to which the reply goes.

Starting with version 2.2, the template tries to detect the reply topic or partition from the configured reply container. If the container is configured to listen to a single topic or a single TopicPartitionOffset, it is used to set the reply headers. If the container is configured otherwise, the user must set up the reply headers. In this case, an INFO log message is written during initialization. The following example uses KafkaHeaders.REPLY_TOPIC:

record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, "kReplies".getBytes()));

When you configure with a single reply TopicPartitionOffset, you can use the same reply topic for multiple templates, as long as each instance listens on a different partition. When configuring with a single reply topic, each instance must use a different group.id. In this case, all instances receive each reply, but only the instance that sent the request finds the correlation ID. This may be useful for auto-scaling, but with the overhead of additional network traffic and the small cost of discarding each unwanted reply. When you use this setting, we recommend that you set the template’s sharedReplyTopic to true, which reduces the logging level of unexpected replies to DEBUG instead of the default ERROR.
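
For example:

replyingTemplate.setSharedReplyTopic(true);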

The following is an example of configuring the reply container to use the same shared reply topic:

@Bean
public ConcurrentMessageListenerContainer<String, String> replyContainer(
        ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {

    ConcurrentMessageListenerContainer<String, String> container = containerFactory.createContainer("topic2");
    container.getContainerProperties().setGroupId(UUID.randomUUID().toString()); // unique
    Properties props = new Properties();
    props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); // so the new group doesn't get old replies
    container.getContainerProperties().setKafkaConsumerProperties(props);
    return container;
}

If you have multiple client instances and you do not configure them as discussed in the preceding paragraph, each instance needs a dedicated reply topic. An alternative is to set the KafkaHeaders.REPLY_PARTITION and use a dedicated partition for each instance. The Header contains a four-byte int (big-endian). The server must use this header to route the reply to the correct partition (@KafkaListener does this). In this case, though, the reply container must not use Kafka’s group management feature and must be configured to listen on a fixed partition (by using a TopicPartitionOffset in its ContainerProperties constructor).
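
A minimal sketch, assuming this client instance owns partition 3 of the reply topic:

// reply container listening on a fixed partition (no group management)
ContainerProperties containerProps =
        new ContainerProperties(new TopicPartitionOffset("kReplies", 3));
...
// the client puts the big-endian four-byte partition number in the request
record.headers().add(new RecordHeader(KafkaHeaders.REPLY_PARTITION,
        ByteBuffer.allocate(4).putInt(3).array()));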

The DefaultKafkaHeaderMapper requires Jackson to be on the classpath (for the @KafkaListener). If it is not available, the message converter has no header mapper, so you must configure a MessagingMessageConverter with a SimpleKafkaHeaderMapper, as shown earlier.

By default, 3 headers are used:

  • KafkaHeaders.CORRELATION_ID - used to correlate the reply to a request

  • KafkaHeaders.REPLY_TOPIC - used to tell the server where to reply

  • KafkaHeaders.REPLY_PARTITION - (optional) used to tell the server which partition to reply to

These header names are used by the @KafkaListener infrastructure to route the reply.

Starting with version 2.3, you can customize the header names - the template has 3 properties correlationHeaderName, replyTopicHeaderName, and replyPartitionHeaderName. This is useful if your server is not a Spring application (or does not use the @KafkaListener).
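
For example (the header names shown are arbitrary):

template.setCorrelationHeaderName("customCorrelation");
template.setReplyTopicHeaderName("customReplyTo");
template.setReplyPartitionHeaderName("customReplyPartition");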

Conversely, if the requesting application is not a spring application and puts correlation information in a different header, starting with version 3.0, you can configure a custom correlationHeaderName on the listener container factory and that header will be echoed back. Previously, the listener had to echo custom correlation headers.

Request/Reply with Message<?>

Version 2.7 added methods to the ReplyingKafkaTemplate to send and receive spring-messaging's Message<?> abstraction:

RequestReplyMessageFuture<K, V> sendAndReceive(Message<?> message);

<P> RequestReplyTypedMessageFuture<K, V, P> sendAndReceive(Message<?> message,
        ParameterizedTypeReference<P> returnType);

These use the template's default replyTimeout; there are also overloaded versions that take a timeout in the method call.

In version 3.0, the futures returned by these methods (and their sendFuture properties) have been changed to CompletableFuture instead of ListenableFuture.

Use the first method if the consumer’s Deserializer or the template’s MessageConverter can convert the payload without any additional information, either via configuration or type metadata in the reply message.

Use the second method if you need to provide type information for the return type, to assist the message converter. This also allows the same template to receive different types, even if there is no type metadata in the replies, such as when the server side is not a Spring application. The following is an example of the latter:

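(A minimal sketch; the Thing reply type and a message converter able to create it from the reply, for example from JSON, are assumed.)

RequestReplyTypedMessageFuture<String, String, Thing> future =
        template.sendAndReceive(MessageBuilder.withPayload("getAThing").build(),
                new ParameterizedTypeReference<Thing>() { });
future.getSendFuture().get(10, TimeUnit.SECONDS); // send ok
Thing thing = future.get(10, TimeUnit.SECONDS).getPayload();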

Reply Type Message<?>

@KafkaListener 返回 Message<?> 时,在 2.5 版本之前,需要填充答复主题和关联 ID 标头。在此示例中,我们使用请求中的答复主题标头:

When the @KafkaListener returns a Message<?>, with versions before 2.5, it was necessary to populate the reply topic and correlation id headers. In this example, we use the reply topic header from the request:

@KafkaListener(id = "requestor", topics = "request")
@SendTo
public Message<?> messageReturn(String in) {
    return MessageBuilder.withPayload(in.toUpperCase())
            .setHeader(KafkaHeaders.TOPIC, replyTo)
            .setHeader(KafkaHeaders.KEY, 42)
            .setHeader(KafkaHeaders.CORRELATION_ID, correlation)
            .build();
}

This also shows how to set a key on the reply record.

Starting with version 2.5, the framework will detect if these headers are missing and populate them with the topic - either the topic determined from the @SendTo value or the incoming KafkaHeaders.REPLY_TOPIC header (if present). It will also echo the incoming KafkaHeaders.CORRELATION_ID and KafkaHeaders.REPLY_PARTITION, if present.

@KafkaListener(id = "requestor", topics = "request")
@SendTo  // default REPLY_TOPIC header
public Message<?> messageReturn(String in) {
    return MessageBuilder.withPayload(in.toUpperCase())
            .setHeader(KafkaHeaders.KEY, 42)
            .build();
}

Aggregating Multiple Replies

The template in Using ReplyingKafkaTemplate is strictly for a single request/reply scenario. For cases where multiple receivers of a single message return a reply, you can use the AggregatingReplyingKafkaTemplate. This is an implementation of the client-side of the Scatter-Gather Enterprise Integration Pattern.

ReplyingKafkaTemplate 类似,AggregatingReplyingKafkaTemplate 构造函数需要一个生成器工厂和一个侦听器容器来接收答复;它具有第三个参数 BiPredicate<List<ConsumerRecord<K, R>>, Boolean> releaseStrategy,在每次收到答复时都会查询该参数;当谓词返回 true 时,将使用 ConsumerRecord 的集合来完成 sendAndReceive 方法返回的 Future

Like the ReplyingKafkaTemplate, the AggregatingReplyingKafkaTemplate constructor takes a producer factory and a listener container to receive the replies; it has a third parameter, BiPredicate<List<ConsumerRecord<K, R>>, Boolean> releaseStrategy, which is consulted each time a reply is received; when the predicate returns true, the collection of ConsumerRecords is used to complete the Future returned by the sendAndReceive method.

There is an additional property returnPartialOnTimeout (default false). When this is set to true, instead of completing the future with a KafkaReplyTimeoutException, a partial result completes the future normally (as long as at least one reply record has been received).

Starting with version 2.3.5, the predicate is also called after a timeout (if returnPartialOnTimeout is true). The first argument is the current list of records; the second is true if this call is due to a timeout. The predicate can modify the list of records.

AggregatingReplyingKafkaTemplate<Integer, String, String> template =
        new AggregatingReplyingKafkaTemplate<>(producerFactory, container,
                        coll -> coll.size() == releaseSize);
...
RequestReplyFuture<Integer, String, Collection<ConsumerRecord<Integer, String>>> future =
        template.sendAndReceive(record);
future.getSendFuture().get(10, TimeUnit.SECONDS); // send ok
ConsumerRecord<Integer, Collection<ConsumerRecord<Integer, String>>> consumerRecord =
        future.get(30, TimeUnit.SECONDS);

Notice that the return type is a ConsumerRecord with a value that is a collection of ConsumerRecords. The "outer" ConsumerRecord is not a "real" record; it is synthesized by the template as a holder for the actual reply records received for the request. When a normal release occurs (the release strategy returns true), the topic is set to aggregatedResults; if returnPartialOnTimeout is true and a timeout occurs (and at least one reply record has been received), the topic is set to partialResultsAfterTimeout. The template provides constant static variables for these "topic" names:

/**
 * Pseudo topic name for the "outer" {@link ConsumerRecords} that has the aggregated
 * results in its value after a normal release by the release strategy.
 */
public static final String AGGREGATED_RESULTS_TOPIC = "aggregatedResults";

/**
 * Pseudo topic name for the "outer" {@link ConsumerRecords} that has the aggregated
 * results in its value after a timeout.
 */
public static final String PARTIAL_RESULTS_AFTER_TIMEOUT_TOPIC = "partialResultsAfterTimeout";

The real ConsumerRecords in the Collection contain the actual topics from which the replies are received.

The listener container for the replies must be configured with AckMode.MANUAL or AckMode.MANUAL_IMMEDIATE; the consumer property enable.auto.commit must be false (the default since version 2.3). To avoid any possibility of losing messages, the template only commits offsets when there are zero requests outstanding, that is, when the last outstanding request is released by the release strategy. After a rebalance, duplicate reply deliveries are possible; these are ignored for any in-flight requests; you may see error log messages when duplicate replies are received for already-released replies.
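
A minimal sketch of such a reply container, assuming a suitably typed consumer factory for the replies:

ContainerProperties containerProps = new ContainerProperties("kReplies");
containerProps.setGroupId("aggregatingReplies");
containerProps.setAckMode(ContainerProperties.AckMode.MANUAL); // required by the aggregating template
ConcurrentMessageListenerContainer<Integer, Collection<ConsumerRecord<Integer, String>>> container =
        new ConcurrentMessageListenerContainer<>(consumerFactory, containerProps);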

If you use an ErrorHandlingDeserializer with this aggregating template, the framework does not automatically detect DeserializationExceptions. Instead, the record (with a null value) is returned intact, with the deserialization exception(s) in headers. It is recommended that applications call the utility method ReplyingKafkaTemplate.checkDeserialization() to determine whether a deserialization exception occurred. See its JavaDocs for more information. The replyErrorChecker is also not called for this aggregating template; you should perform the checks on each element of the reply.