Publisher Confirms
There are two mechanisms to get the result of publishing a message; in each case, the connection factory must have publisherConfirmType set to ConfirmType.CORRELATED.
The "legacy" mechanism is to set confirmAckChannel to the bean name of a message channel from which you can retrieve the confirmations asynchronously. Negative acks are sent to the error channel (if enabled); see Error Channels.
The preferred mechanism, added in version 3.1, is to use a correlation data header and wait for the result via its Future<Confirm> property.
This is particularly useful with a batch listener because you can send multiple messages before waiting for the result.
To use this technique, set the useConfirmHeader property to true.
The following simple application is an example of using this technique:
spring.cloud.stream.bindings.input-in-0.group=someGroup
spring.cloud.stream.bindings.input-in-0.consumer.batch-mode=true
spring.cloud.stream.source=output
spring.cloud.stream.bindings.output-out-0.producer.error-channel-enabled=true
spring.cloud.stream.rabbit.bindings.output-out-0.producer.useConfirmHeader=true
spring.cloud.stream.rabbit.bindings.input-in-0.consumer.auto-bind-dlq=true
spring.cloud.stream.rabbit.bindings.input-in-0.consumer.batch-size=10
spring.rabbitmq.publisher-confirm-type=correlated
spring.rabbitmq.publisher-returns=true
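import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import java.util.stream.IntStream;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.batch.BatchingStrategy;
import org.springframework.amqp.rabbit.batch.SimpleBatchingStrategy;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.connection.CorrelationData.Confirm;
import org.springframework.amqp.rabbit.core.BatchingRabbitTemplate;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.scheduling.TaskScheduler;

@SpringBootApplication
public class Application {

    private static final Logger log = LoggerFactory.getLogger(Application.class);

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @Autowired
    private StreamBridge bridge;

    @Bean
    Consumer<List<String>> input() {
        return list -> {
            List<MyCorrelationData> results = new ArrayList<>();
            // Send every message in the batch first, keeping the correlation data for each.
            list.forEach(str -> {
                log.info("Received: " + str);
                MyCorrelationData corr = new MyCorrelationData(UUID.randomUUID().toString(), str);
                results.add(corr);
                this.bridge.send("output-out-0", MessageBuilder.withPayload(str.toUpperCase())
                        .setHeader(AmqpHeaders.PUBLISH_CONFIRM_CORRELATION, corr)
                        .build());
            });
            // Then wait for the confirmation of each message.
            results.forEach(correlation -> {
                try {
                    Confirm confirm = correlation.getFuture().get(10, TimeUnit.SECONDS);
                    log.info(confirm + " for " + correlation.getPayload());
                    if (correlation.getReturnedMessage() != null) {
                        log.error("Message for " + correlation.getPayload() + " was returned ");
                        // throw some exception to invoke binder retry/error handling
                    }
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException(e);
                }
                catch (ExecutionException | TimeoutException e) {
                    throw new IllegalStateException(e);
                }
            });
        };
    }

    // Publishes ten test messages to the input exchange at startup.
    @Bean
    public ApplicationRunner runner(BatchingRabbitTemplate template) {
        return args -> IntStream.range(0, 10).forEach(i ->
                template.convertAndSend("input-in-0", "input-in-0.rbgh303", "foo" + i));
    }

    @Bean
    public BatchingRabbitTemplate template(CachingConnectionFactory cf, TaskScheduler taskScheduler) {
        BatchingStrategy batchingStrategy = new SimpleBatchingStrategy(10, 1000000, 1000);
        return new BatchingRabbitTemplate(cf, batchingStrategy, taskScheduler);
    }

}

// Correlation data that also carries the original payload for logging.
class MyCorrelationData extends CorrelationData {

    private final String payload;

    MyCorrelationData(String id, String payload) {
        super(id);
        this.payload = payload;
    }

    public String getPayload() {
        return this.payload;
    }

}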
As you can see, we send each message and then wait for the publication results. If a message cannot be routed, the correlation data is populated with the returned message before the future is completed.
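The send-first, wait-afterwards pattern can also be sketched without a broker. The following is a minimal illustration using only JDK futures; the class `SendThenAwait` and its `publish` method are hypothetical stand-ins (not part of Spring AMQP or the binder), and the rule that an empty payload is "nacked" exists only to simulate a failure.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch: simulates publishing without a real broker.
final class SendThenAwait {

    // Stand-in for a publish call. The future completes with true (ack)
    // or false (nack); empty payloads are nacked purely for illustration.
    static CompletableFuture<Boolean> publish(String payload) {
        return CompletableFuture.supplyAsync(() -> !payload.isEmpty());
    }

    // Sends every payload first, then waits for each result.
    // Returns the payloads that were not acknowledged.
    static List<String> sendAll(List<String> payloads, long timeoutSeconds) {
        List<CompletableFuture<Boolean>> futures = new ArrayList<>();
        for (String payload : payloads) {
            futures.add(publish(payload)); // no waiting in this phase
        }
        List<String> failed = new ArrayList<>();
        for (int i = 0; i < payloads.size(); i++) {
            try {
                boolean ack = futures.get(i).get(timeoutSeconds, TimeUnit.SECONDS);
                if (!ack) {
                    failed.add(payloads.get(i));
                }
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
            catch (ExecutionException | TimeoutException e) {
                throw new IllegalStateException(e);
            }
        }
        return failed;
    }

    public static void main(String[] args) {
        System.out.println("Not acknowledged: " + sendAll(List.of("foo", "", "bar"), 10));
    }
}
```

Because all publishes are started before the first wait, the confirmations can arrive while later messages are still being sent, so the total wait is bounded by the slowest confirmation rather than by the sum of all of them.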
The correlation data must be provided with a unique id so that the framework can perform the correlation.
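To see why the id must be unique, consider how a confirmation could be matched back to the publish that triggered it. The sketch below is an assumption about the general idea, not the framework's actual implementation: `PendingConfirms`, `register`, and `confirm` are hypothetical names, and a plain `Boolean` stands in for the real `Confirm` object.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical bookkeeping for in-flight publishes, keyed by correlation id.
final class PendingConfirms {

    // One pending future per correlation id.
    private final Map<String, CompletableFuture<Boolean>> pending = new ConcurrentHashMap<>();

    // Registers a publish; an id that is already in flight is rejected,
    // because two entries with the same key could not be told apart.
    CompletableFuture<Boolean> register(String id) {
        CompletableFuture<Boolean> future = new CompletableFuture<>();
        if (this.pending.putIfAbsent(id, future) != null) {
            throw new IllegalArgumentException("Duplicate correlation id: " + id);
        }
        return future;
    }

    // Called when an ack or nack arrives; completes the matching future.
    // Returns false if no publish is pending for the given id.
    boolean confirm(String id, boolean ack) {
        CompletableFuture<Boolean> future = this.pending.remove(id);
        return future != null && future.complete(ack);
    }

    public static void main(String[] args) {
        PendingConfirms confirms = new PendingConfirms();
        String idA = UUID.randomUUID().toString();
        String idB = UUID.randomUUID().toString();
        CompletableFuture<Boolean> a = confirms.register(idA);
        CompletableFuture<Boolean> b = confirms.register(idB);
        // Confirmations may arrive in any order; the id routes each one.
        confirms.confirm(idB, false);
        confirms.confirm(idA, true);
        System.out.println("A acked: " + a.join());
        System.out.println("B acked: " + b.join());
    }
}
```

Generating the id with `UUID.randomUUID()`, as the application above does, is a simple way to keep ids distinct across messages.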
You cannot set both useConfirmHeader and confirmAckChannel. When useConfirmHeader is true, you can still receive returned messages in the error channel, but using the correlation header is more convenient.