Sending Messages
本节介绍如何发送邮件。
Using KafkaTemplate
本节介绍如何使用 KafkaTemplate
发送邮件。
Overview
KafkaTemplate
封装一个生产者,并提供了向 Kafka 主题发送数据的方法。以下列表显示了 KafkaTemplate
中的相关方法:
CompletableFuture<SendResult<K, V>> sendDefault(V data);
CompletableFuture<SendResult<K, V>> sendDefault(K key, V data);
CompletableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, V data);
CompletableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key, V data);
CompletableFuture<SendResult<K, V>> send(String topic, V data);
CompletableFuture<SendResult<K, V>> send(String topic, K key, V data);
CompletableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, V data);
CompletableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data);
CompletableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record);
CompletableFuture<SendResult<K, V>> send(Message<?> message);
Map<MetricName, ? extends Metric> metrics();
List<PartitionInfo> partitionsFor(String topic);
<T> T execute(ProducerCallback<K, V, T> callback);
<T> T executeInTransaction(OperationsCallback<K, V, T> callback);
// Flush the producer.
void flush();
interface ProducerCallback<K, V, T> {
T doInKafka(Producer<K, V> producer);
}
interface OperationsCallback<K, V, T> {
T doInOperations(KafkaOperations<K, V> operations);
}
有关详情,请参见“ Javadoc”。
在 3.0 版本中,先前返回 ListenableFuture
的方法已更改为返回 CompletableFuture
。为了方便迁移,2.9 版本添加了提供具有 CompletableFuture
返回类型的方法 usingCompletableFuture()
的方法;此方法不再可用。
sendDefault
API 要求为模板提供一个默认主题。
该 API 将 timestamp
作为参数接收,并将此时间戳存储在记录中。用户提供的 timestamp 存储方式取决于 Kafka 主题中配置的时间戳类型。如果主题配置为使用 CREATE_TIME
,则会记录(或在未指定情况下生成)用户指定的时间戳。如果主题配置为使用 LOG_APPEND_TIME
,则忽略用户指定的时间戳,而代理会添加本地代理时间。
要使用此模板,可以配置生成器工厂,并将其提供给模板的构造函数。以下示例展示了如何操作:
@Bean
public ProducerFactory<Integer, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// See https://kafka.apache.org/documentation/#producerconfigs for more properties
return props;
}
@Bean
public KafkaTemplate<Integer, String> kafkaTemplate() {
return new KafkaTemplate<Integer, String>(producerFactory());
}
从版本 2.5 开始,您现在可以覆盖工厂的 ProducerConfig
属性,以便使用相同工厂的不同生成器配置创建模板。
@Bean
public KafkaTemplate<String, String> stringTemplate(ProducerFactory<String, String> pf) {
return new KafkaTemplate<>(pf);
}
@Bean
public KafkaTemplate<String, byte[]> bytesTemplate(ProducerFactory<String, byte[]> pf) {
return new KafkaTemplate<>(pf,
Collections.singletonMap(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class));
}
请注意,可以通过不同的缩小泛型类型来引用类型为 ProducerFactory<?, ?>
的 bean(例如,Spring Boot 自动配置的 bean)。
您还可以使用标准 <bean/>
定义配置模板。
然后,要使用模板,可以调用其某个方法。
使用带 Message<?>
参数的方法时,会在消息头中提供主题、分区、键和时间戳信息,其中包括以下项:
-
KafkaHeaders.TOPIC
-
KafkaHeaders.PARTITION
-
KafkaHeaders.KEY
-
KafkaHeaders.TIMESTAMP
消息负载是数据。
另外,您可以使用 ProducerListener
配置 KafkaTemplate
,以在发送结果(成功或失败)时获取带有回调的异步信息,而不是等待 Future
完成。以下列表显示了 ProducerListener
接口的定义:
public interface ProducerListener<K, V> {
void onSuccess(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata);
void onError(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata,
Exception exception);
}
默认情况下,模板使用 LoggingProducerListener
进行配置,该侦听器记录错误,并在发送成功时不执行任何操作。
为了方便起见,如果您只想实现其中一个方法,则会提供默认的方法实现。
请注意,send
方法返回 CompletableFuture<SendResult>
。您可以使用侦听器注册一个回调,以异步接收发送结果。以下示例展示了如何操作:
CompletableFuture<SendResult<Integer, String>> future = template.send("myTopic", "something");
future.whenComplete((result, ex) -> {
...
});
SendResult
有两个属性,分别是 ProducerRecord`和`RecordMetadata
。有关这些对象的更多信息,请参阅 Kafka API 文档。
Throwable
可以强制转换为 KafkaProducerException
;它的 failedProducerRecord
属性包含失败记录。
如果您希望阻塞发送线程以等待结果,则可以调用 future 的 get()
方法;建议使用带超时的方法。如果您设置了 linger.ms
,则可能希望在等待之前调用 flush()
,或者为了方便,模板有一个带有 autoFlush
参数的构造函数,可导致模板在每次发送时进行 flush()
。仅当您设置了 linger.ms
生成器属性,并且希望立即发送部分批次时,才需要进行刷新。
Examples
本节展示将消息发送到 Kafka 的示例:
public void sendToKafka(final MyOutputData data) {
final ProducerRecord<String, String> record = createRecord(data);
CompletableFuture<SendResult<Integer, String>> future = template.send(record);
future.whenComplete((result, ex) -> {
if (ex == null) {
handleSuccess(data);
}
else {
handleFailure(data, record, ex);
}
});
}
public void sendToKafka(final MyOutputData data) {
final ProducerRecord<String, String> record = createRecord(data);
try {
template.send(record).get(10, TimeUnit.SECONDS);
handleSuccess(data);
}
catch (ExecutionException e) {
handleFailure(data, record, e.getCause());
}
catch (TimeoutException | InterruptedException e) {
handleFailure(data, record, e);
}
}
请注意,ExecutionException
的原因是带有 failedProducerRecord
属性的 KafkaProducerException
。
Using RoutingKafkaTemplate
从版本 2.5 开始,你可以使用 RoutingKafkaTemplate
来根据目标 topic
名称在运行时选择生成程序。
路由模板不支持 not 事务、execute
、flush
或 metrics
操作,因为主题对这些操作未知。
该模板需要一个 java.util.regex.Pattern
到 ProducerFactory<Object, Object>
实例的映射。该映射应该是按顺序排列的(例如 LinkedHashMap
),因为它按顺序遍历;你应当在开头添加更具体的模式。
以下简单的 Spring Boot 应用程序提供了一个使用同一模板发送到不同主题的示例,每个主题都使用不同的值序列化器。
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Bean
public RoutingKafkaTemplate routingTemplate(GenericApplicationContext context,
ProducerFactory<Object, Object> pf) {
// Clone the PF with a different Serializer, register with Spring for shutdown
Map<String, Object> configs = new HashMap<>(pf.getConfigurationProperties());
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
DefaultKafkaProducerFactory<Object, Object> bytesPF = new DefaultKafkaProducerFactory<>(configs);
context.registerBean("bytesPF", DefaultKafkaProducerFactory.class, () -> bytesPF);
Map<Pattern, ProducerFactory<Object, Object>> map = new LinkedHashMap<>();
map.put(Pattern.compile("two"), bytesPF);
map.put(Pattern.compile(".+"), pf); // Default PF with StringSerializer
return new RoutingKafkaTemplate(map);
}
@Bean
public ApplicationRunner runner(RoutingKafkaTemplate routingTemplate) {
return args -> {
routingTemplate.send("one", "thing1");
routingTemplate.send("two", "thing2".getBytes());
};
}
}
此示例的相应“@KafkaListener
”显示在“Annotation Properties”中。
要了解达成类似结果的另一种技术,但该技术拥有额外的能力(将不同类型发送至同一主题),请参阅“Delegating Serializer and Deserializer”。
Using DefaultKafkaProducerFactory
如“Using KafkaTemplate
”中所示,使用“ProducerFactory
”创建生成器。
当不使用Transactions时,默认情况下,DefaultKafkaProducerFactory`会创建一个单例制作者,由所有客户端使用,如在`KafkaProducer`JavaDocs中推荐的那样。但是,如果您在模板上调用`flush()
,这可能会导致使用同一制作者的其他线程延迟。从2.3版本开始,DefaultKafkaProducerFactory`具有一个新属性`producerPerThread
。当设置为`true`时,工厂将为每个线程创建(并缓存)一个单独的制作者,以避免此问题。
当 producerPerThread
是 true
时,当不再需要生产者时,用户代码 must 调用工厂上的 closeThreadBoundProducer()
。这将物理关闭生产者并将其从 ThreadLocal
中移除。调用 reset()
或 destroy()
不会清除这些生产者。
在创建 DefaultKafkaProducerFactory
时,可以通过调用仅包含属性映射的构造函数,从配置中获取键和/或值 Serializer
类(请参见 Using KafkaTemplate
中的示例),或者可以使用 Serializer
实例传递给 DefaultKafkaProducerFactory
构造函数(在这种情况下所有 Producer
共享相同的实例)。或者可以为每个 Producer
提供 Supplier<Serializer>`s (starting with version 2.3) that will be used to obtain separate `Serializer
实例:
@Bean
public ProducerFactory<Integer, CustomValue> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs(), null, () -> new CustomValueSerializer());
}
@Bean
public KafkaTemplate<Integer, CustomValue> kafkaTemplate() {
return new KafkaTemplate<Integer, CustomValue>(producerFactory());
}
从版本 2.5.10 开始,你现在可以在创建工厂之后更新生成程序属性。这可能很有用,例如,如果你必须在凭据更改后更新 SSL 密钥/信任存储位置。这些更改不会影响现有的生成程序实例;调用 reset()
以关闭任何现有的生成程序,以便使用新属性创建新生成程序。注意:你无法将事务性生成程序工厂更改为非事务性,反之亦然。
现在提供了两种新方法:
void updateConfigs(Map<String, Object> updates);
void removeConfig(String configKey);
从版本 2.8 开始,如果你以对象的形式(在构造函数中或通过设置程序)提供了序列化器,则工厂将调用 configure()
方法以使用配置属性对其进行配置。
Using ReplyingKafkaTemplate
版本 2.1.3 引入了 KafkaTemplate
的子类,以提供请求/答复语义。此类名为 ReplyingKafkaTemplate
,并有两种附加方法;以下显示了方法签名:
RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record);
RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record,
Duration replyTimeout);
(另请参阅 xref:kafka/sending-messages.adoc#exchanging-messages[Request/Reply with Message<?>
)。
结果是一个 CompletableFuture
,该结果会异步填充结果(或针对超时的异常)。结果还有一个 sendFuture
属性,它是调用 KafkaTemplate.send()
的结果。你可以使用该 Future 来确定发送操作的结果。
在 3.0 版本中,这些方法(及其 sendFuture
属性)返回的 Future 已更改为 CompletableFuture
,而不是 ListenableFuture
。
如果使用了第一个方法,或 replyTimeout
参数是 null
,则使用模板的 defaultReplyTimeout
属性(默认情况下为 5 秒)。
从版本 2.8.8 开始,该模板有一个新方法 waitForAssignment
。如果答复容器使用 auto.offset.reset=latest
进行配置以避免在容器初始化之前发送请求和答复,这将非常有用。
当使用手动分区分配(无组管理)时,等待持续时间必须大于容器的 pollTimeout
属性,因为只有在完成首次轮询后才会发送通知。
以下 Spring Boot 应用程序显示了一个如何使用该功能的示例:
@SpringBootApplication
public class KRequestingApplication {
public static void main(String[] args) {
SpringApplication.run(KRequestingApplication.class, args).close();
}
@Bean
public ApplicationRunner runner(ReplyingKafkaTemplate<String, String, String> template) {
return args -> {
if (!template.waitForAssignment(Duration.ofSeconds(10))) {
throw new IllegalStateException("Reply container did not initialize");
}
ProducerRecord<String, String> record = new ProducerRecord<>("kRequests", "foo");
RequestReplyFuture<String, String, String> replyFuture = template.sendAndReceive(record);
SendResult<String, String> sendResult = replyFuture.getSendFuture().get(10, TimeUnit.SECONDS);
System.out.println("Sent ok: " + sendResult.getRecordMetadata());
ConsumerRecord<String, String> consumerRecord = replyFuture.get(10, TimeUnit.SECONDS);
System.out.println("Return value: " + consumerRecord.value());
};
}
@Bean
public ReplyingKafkaTemplate<String, String, String> replyingTemplate(
ProducerFactory<String, String> pf,
ConcurrentMessageListenerContainer<String, String> repliesContainer) {
return new ReplyingKafkaTemplate<>(pf, repliesContainer);
}
@Bean
public ConcurrentMessageListenerContainer<String, String> repliesContainer(
ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {
ConcurrentMessageListenerContainer<String, String> repliesContainer =
containerFactory.createContainer("kReplies");
repliesContainer.getContainerProperties().setGroupId("repliesGroup");
repliesContainer.setAutoStartup(false);
return repliesContainer;
}
@Bean
public NewTopic kRequests() {
return TopicBuilder.name("kRequests")
.partitions(10)
.replicas(2)
.build();
}
@Bean
public NewTopic kReplies() {
return TopicBuilder.name("kReplies")
.partitions(10)
.replicas(2)
.build();
}
}
请注意,我们可以使用 Boot 的自动配置容器工厂来创建答复容器。
如果对回复使用非平凡的反序列化工具,请考虑使用委托给配置的反序列化工具的 ErrorHandlingDeserializer
。配置时,RequestReplyFuture
将以异常完成,然后你可以捕获 ExecutionException
,并捕获其 cause
属性中的 DeserializationException
。
从 2.6.7 版本开始,除了检测 DeserializationException
之外,如果提供了模板,它还会调用 replyErrorChecker
函数。如果返回异常,则将来将异常完成。
这是一个示例:
template.setReplyErrorChecker(record -> {
Header error = record.headers().lastHeader("serverSentAnError");
if (error != null) {
return new MyException(new String(error.value()));
}
else {
return null;
}
});
...
RequestReplyFuture<Integer, String, String> future = template.sendAndReceive(record);
try {
future.getSendFuture().get(10, TimeUnit.SECONDS); // send ok
ConsumerRecord<Integer, String> consumerRecord = future.get(10, TimeUnit.SECONDS);
...
}
catch (InterruptedException e) {
...
}
catch (ExecutionException e) {
if (e.getCause instanceof MyException) {
...
}
}
catch (TimeoutException e) {
...
}
该模板设置一个标头(默认为 KafkaHeaders.CORRELATION_ID
),服务器侧必须对该标头进行回显。
在这种情况下,以下 @KafkaListener
应用程序会做出响应:
@SpringBootApplication
public class KReplyingApplication {
public static void main(String[] args) {
SpringApplication.run(KReplyingApplication.class, args);
}
@KafkaListener(id="server", topics = "kRequests")
@SendTo // use default replyTo expression
public String listen(String in) {
System.out.println("Server received: " + in);
return in.toUpperCase();
}
@Bean
public NewTopic kRequests() {
return TopicBuilder.name("kRequests")
.partitions(10)
.replicas(2)
.build();
}
@Bean // not required if Jackson is on the classpath
public MessagingMessageConverter simpleMapperConverter() {
MessagingMessageConverter messagingMessageConverter = new MessagingMessageConverter();
messagingMessageConverter.setHeaderMapper(new SimpleKafkaHeaderMapper());
return messagingMessageConverter;
}
}
@KafkaListener
基础设施会对关联 ID 进行回显,并确定答复主题。
有关发送答复的更多信息,请参阅“@ [3]” 。模板使用默认标头“@ [2]”来指示答复所针对的话题。
从 2.2 版本开始,该模板尝试从配置的答复容器中检测答复主题或分区。如果容器配置为只侦听一个主题或一个 TopicPartitionOffset
,则会用它来设置答复标头。如果以其他方式配置了容器,则用户必须设置答复标头。在这种情况下,将在初始化过程中写入一条 INFO
日志消息。以下示例使用了 KafkaHeaders.REPLY_TOPIC
:
record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, "kReplies".getBytes()));
当你使用单个答复 TopicPartitionOffset
进行配置时,你可以为多个模板使用相同的答复主题,前提条件是每个实例都在不同的分区中进行侦听。使用单个答复主题进行配置时,每个实例都必须使用不同的 group.id
。在这种情况下,所有实例都将接收每个答复,但只有发送请求的实例才能找到相关 ID。这可能对自动扩展有用,但会产生额外的网络流量和丢弃每个不需要的答复时产生的小成本。当你使用此设置时,我们建议你将模板的 sharedReplyTopic
设置为 true
,这将消除意外答复的日志记录级别为 DEBUG,而不是默认的 ERROR。
以下是如何配置答复容器以使用相同共享答复主题的示例:
@Bean
public ConcurrentMessageListenerContainer<String, String> replyContainer(
ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {
ConcurrentMessageListenerContainer<String, String> container = containerFactory.createContainer("topic2");
container.getContainerProperties().setGroupId(UUID.randomUUID().toString()); // unique
Properties props = new Properties();
props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); // so the new group doesn't get old replies
container.getContainerProperties().setKafkaConsumerProperties(props);
return container;
}
如果您有多个客户端实例并且您未按照前一段所述对其进行配置,则每个实例都需要一个专用回复主题。另一种方法是设置 KafkaHeaders.REPLY_PARTITION
并为每个实例使用专用分区。Header
包含一个四字节整数(大端)。服务器必须使用此标头将回复路由到正确的分区(@KafkaListener
执行此操作)。不过,在这种情况下,回复容器不能使用 Kafka 的组管理功能,并且必须配置为侦听固定分区(在 ContainerProperties
构造函数中使用 TopicPartitionOffset
)。
|
默认情况下,会使用 3 个标头:
-
KafkaHeaders.CORRELATION_ID
——用于关联对请求的答复 -
KafkaHeaders.REPLY_TOPIC
——用于告知服务器在何处回复 -
KafkaHeaders.REPLY_PARTITION
——(可选)用于指示服务器回复哪个分区
@KafkaListener
基础设施使用这些标头名称来路由答复。
从 2.3 版本开始,你可以自定义标头名称 - 该模板具有 3 个属性 correlationHeaderName
、replyTopicHeaderName
和 replyPartitionHeaderName
。如果你的服务器不是 Spring 应用程序(或不使用 @KafkaListener
),则这非常有用。
相反,如果请求应用程序不是 Spring 应用程序并将关联信息放在其他标头中,那么从 3.0 版本开始,您可以为侦听器容器工厂配置自定义 |
Request/Reply with `Message<?>`s
2.7 及更高版本在 ReplyingKafkaTemplate
中添加了方法,用于发送和接收 spring-messaging
的 Message<?>
抽象:
RequestReplyMessageFuture<K, V> sendAndReceive(Message<?> message);
<P> RequestReplyTypedMessageFuture<K, V, P> sendAndReceive(Message<?> message,
ParameterizedTypeReference<P> returnType);
这些将使用模板的默认 replyTimeout
,并且还有可以在方法调用中采用超时的重载版本。
在 3.0 版本中,这些方法(及其 sendFuture
属性)返回的 Future 已更改为 CompletableFuture
,而不是 ListenableFuture
。
如果消费者的 Deserializer
或模板的 MessageConverter
可以在没有任何其他信息的情况下转换有效负载,则使用第一种方法,可以通过回复消息中的配置或类型元数据进行转换。
如果你需要为返回类型提供类型信息,以帮助信息转换器,请使用第二种方法。这也允许同一模板接收不同类型的数据,即使回复中没有类型元数据,例如当服务器端不是 Spring 应用程序时。以下是后者的一个示例:
-
Java
-
Kotlin
link:{java-examples}/requestreply/Application.java[role=include]
link:{kotlin-examples}/requestreply/Application.kt[role=include]
-
Java
-
Kotlin
link:{java-examples}/requestreply/Application.java[role=include]
link:{kotlin-examples}/requestreply/Application.kt[role=include]
Reply Type Message<?>
当 @KafkaListener
返回 Message<?>
时,在 2.5 版本之前,需要填充答复主题和关联 ID 标头。在此示例中,我们使用请求中的答复主题标头:
@KafkaListener(id = "requestor", topics = "request")
@SendTo
public Message<?> messageReturn(String in) {
return MessageBuilder.withPayload(in.toUpperCase())
.setHeader(KafkaHeaders.TOPIC, replyTo)
.setHeader(KafkaHeaders.KEY, 42)
.setHeader(KafkaHeaders.CORRELATION_ID, correlation)
.build();
}
这也展示了如何在答复记录上设置密钥。
从 2.5 版本开始,该框架将检测这些标头是否缺失,并使用主题填充它们 - 或者是从 @SendTo
值中确定的主题,或者是从传入的 KafkaHeaders.REPLY_TOPIC
标头(如果存在)中确定的主题。如果存在,它还将对传入的 KafkaHeaders.CORRELATION_ID
和 KafkaHeaders.REPLY_PARTITION
进行回显。
@KafkaListener(id = "requestor", topics = "request")
@SendTo // default REPLY_TOPIC header
public Message<?> messageReturn(String in) {
return MessageBuilder.withPayload(in.toUpperCase())
.setHeader(KafkaHeaders.KEY, 42)
.build();
}
Aggregating Multiple Replies
Using ReplyingKafkaTemplate
中的模板严格用于单个请求/回复场景。对于单个消息的多个接收器返回回复的情况,可以使用 AggregatingReplyingKafkaTemplate
。这是 Scatter-Gather Enterprise Integration Pattern 客户机端的实现。
与 ReplyingKafkaTemplate
类似,AggregatingReplyingKafkaTemplate
构造函数需要一个生成器工厂和一个侦听器容器来接收答复;它具有第三个参数 BiPredicate<List<ConsumerRecord<K, R>>, Boolean> releaseStrategy
,在每次收到答复时都会查询该参数;当谓词返回 true
时,将使用 ConsumerRecord
的集合来完成 sendAndReceive
方法返回的 Future
。
有一个额外的属性 returnPartialOnTimeout
(默认值为 false)。当此属性设置为 true
时,与使用 KafkaReplyTimeoutException
完成 Future 不同,部分结果正常完成 Future(只要接收到了至少一条回复记录)。
从 2.3.5 版开始,在超时后(如果 returnPartialOnTimeout
为 true
)也将调用谓词。第一个参数是记录的当前列表;如果此调用是由于超时引起的,则第二个参数为 true
。谓词可以修改记录列表。
AggregatingReplyingKafkaTemplate<Integer, String, String> template =
new AggregatingReplyingKafkaTemplate<>(producerFactory, container,
coll -> coll.size() == releaseSize);
...
RequestReplyFuture<Integer, String, Collection<ConsumerRecord<Integer, String>>> future =
template.sendAndReceive(record);
future.getSendFuture().get(10, TimeUnit.SECONDS); // send ok
ConsumerRecord<Integer, Collection<ConsumerRecord<Integer, String>>> consumerRecord =
future.get(30, TimeUnit.SECONDS);
请注意,返回类型是一个 ConsumerRecord
,其值为 ConsumerRecord
的集合。“外部”ConsumerRecord
不是“真实”记录,它是模板合成的一个记录,用作请求中接收到的实际回复记录的持有者。当正常的释放发生(释放策略返回 true)时,主题设置为 aggregatedResults
;如果 returnPartialOnTimeout
为 true,并且发生超时(并且接收到了至少一条回复记录),则主题设置为 partialResultsAfterTimeout
。模板为此类“主题”名称提供了常量静态变量:
/**
* Pseudo topic name for the "outer" {@link ConsumerRecords} that has the aggregated
* results in its value after a normal release by the release strategy.
*/
public static final String AGGREGATED_RESULTS_TOPIC = "aggregatedResults";
/**
* Pseudo topic name for the "outer" {@link ConsumerRecords} that has the aggregated
* results in its value after a timeout.
*/
public static final String PARTIAL_RESULTS_AFTER_TIMEOUT_TOPIC = "partialResultsAfterTimeout";
Collection
中的实际 ConsumerRecord
包含实际主题(即从该主题接收回复)。
回复的侦听器容器 must 可配置为 AckMode.MANUAL
或 AckMode.MANUAL_IMMEDIATE
;消费者属性 enable.auto.commit
必须是 false
(2.3 版以来的默认设置)。为了避免丢失任何消息的可能,模板仅在没有未完成请求时提交偏移量,即当最后一个未完成请求被释放策略释放时。在重新平衡之后,可能会出现重复的回复传递;对于任何正在进行的请求,这些传递将被忽略;在接收已发布回复的重复回复时,您可能会看到错误日志消息。
如果您对这个聚合模板使用 |