Accessing Delivery Attempts
要访问阻塞和非阻塞发送尝试,请将这些标题添加到 @KafkaListener
方法签名:
@Header(KafkaHeaders.DELIVERY_ATTEMPT) int blockingAttempts,
@Header(name = RetryTopicHeaders.DEFAULT_HEADER_ATTEMPTS, required = false) Integer nonBlockingAttempts
仅在设置 ContainerProperties’s deliveryAttemptHeader to `true
的情况下才会阻止发送尝试。
请注意,对于初始发送,非阻塞尝试将为 null
。
从 3.0.10 版开始,提供了一个方便的 KafkaMessageHeaderAccessor
以允许更简单地访问这些标题;可以在访问器作为监听器方法的参数提供:
@RetryableTopic(backoff = @Backoff(...)) @KafkaListener(id = "dh1", topics = "dh1") void listen(Thing thing, KafkaMessageHeaderAccessor accessor) { ... }
使用 accessor.getBlockingRetryDeliveryAttempt()
和 accessor.getNonBlockingRetryDeliveryAttempt()
获取值。如果未启用阻塞重试,访问器将抛出 IllegalStateException
;对于非阻塞重试,访问器对初始发送返回 1
。