AmqpTemplate
和 Spring 框架及相关项目提供的许多其他高级抽象一样,Spring AMQP 提供了一个“模板
”,它发挥着核心作用。定义主要操作的接口被称为 AmqpTemplate
。这些操作涵盖发送和接收消息的一般行为。换句话说,它们对于任何实现来说都不是唯一的,因此名称中包含“AMQP
”。另一方面,该接口的一些实现与 AMQP 协议的实现相关。与作为接口级 API 本身的 JMS 不同,AMQP 是一个线级协议。该协议的实现提供了自己的客户端库,因此模板接口的每个实现都依赖于某个特定的客户端库。目前,只有一种实现:RabbitTemplate
。在随后的示例中,我们经常使用 AmqpTemplate
。但是,当你查看配置示例或任何实例化模板或调用 setter 的代码摘录时,你都可以看到实现类型(例如,RabbitTemplate
)。
如前文所述,AmqpTemplate
接口定义了用于发送和接收消息的所有基本操作。我们会在 Sending Messages 和 Receiving Messages 中分别探讨消息发送和接收。
另请参阅 Async Rabbit Template。
Adding Retry Capabilities
从 1.3 版开始,您现在可以通过配置 RabbitTemplate
来使用 RetryTemplate
,以便帮助处理经纪人连接问题。要获得完整信息,请参阅 spring-retry 项目。以下是仅使用指数级退避策略和默认 SimpleRetryPolicy
的一个示例,在向调用方引发异常之前尝试三次。
以下示例使用 XML 命名空间:
<rabbit:template id="template" connection-factory="connectionFactory" retry-template="retryTemplate"/>
<bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate">
<property name="backOffPolicy">
<bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy">
<property name="initialInterval" value="500" />
<property name="multiplier" value="10.0" />
<property name="maxInterval" value="10000" />
</bean>
</property>
</bean>
以下示例在 Java 中使用 @Configuration
注解:
@Bean
public RabbitTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory());
RetryTemplate retryTemplate = new RetryTemplate();
ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
backOffPolicy.setInitialInterval(500);
backOffPolicy.setMultiplier(10.0);
backOffPolicy.setMaxInterval(10000);
retryTemplate.setBackOffPolicy(backOffPolicy);
template.setRetryTemplate(retryTemplate);
return template;
}
从 1.4 版开始,除了 retryTemplate
属性外,recoveryCallback
选项也在 RabbitTemplate
上受支持。它用作 RetryTemplate.execute(RetryCallback<T, E> retryCallback, RecoveryCallback<T> recoveryCallback)
的第二个参数。
|
retryTemplate.execute(
new RetryCallback<Object, Exception>() {
@Override
public Object doWithRetry(RetryContext context) throws Exception {
context.setAttribute("message", message);
return rabbitTemplate.convertAndSend(exchange, routingKey, message);
}
}, new RecoveryCallback<Object>() {
@Override
public Object recover(RetryContext context) throws Exception {
Object message = context.getAttribute("message");
Throwable t = context.getLastThrowable();
// Do something with message
return null;
}
});
}
在这种情况下,你不应向 RabbitTemplate
中注入 RetryTemplate
。
Publishing is Asynchronous — How to Detect Successes and Failures
发布消息是一种异步机制,默认情况下,不能路由的消息会被 RabbitMQ 删除。对于成功的发布,您可以收到异步确认,如 Correlated Publisher Confirms and Returns 中所述。考虑两种失败情况:
-
发布到交换机,但没有匹配的目标队列。
-
发布到不存在的交换机。
第一种情况由发布者退回内容涵盖,如 Correlated Publisher Confirms and Returns 中所述。
对于第二种情况,消息会被放弃,并且不会生成任何返回。底层通道会因异常而关闭。默认情况下,此异常会被记录,但你可以使用 CachingConnectionFactory
注册一个 ChannelListener
来获取此类事件的通知。以下示例展示了如何添加一个 ConnectionListener
:
this.connectionFactory.addConnectionListener(new ConnectionListener() {
@Override
public void onCreate(Connection connection) {
}
@Override
public void onShutDown(ShutdownSignalException signal) {
...
}
});
你可以检查信号的 reason
属性以确定发生的问题。
若要在发送线程上检测异常,你可以将 RabbitTemplate
上的 setChannelTransacted(true)
设置为 true
,而且异常将在 txCommit()
上检测到。但是,事务会极大地影响性能,因此在启用事务只用于此用例之前请对此予以仔细考虑。
Correlated Publisher Confirms and Returns
AmqpTemplate
的 RabbitTemplate
实现支持发布者确认和返回。
对于已返回的消息,模板的 mandatory
属性必须设置为 true
或 mandatory-expression
针对特定消息评估为 true
。此功能需要一个 CachingConnectionFactory
,其 publisherReturns
属性设置为 true
(请参阅 Publisher Confirms and Returns)。退回内容通过调用 setReturnsCallback(ReturnsCallback callback)
由其注册 RabbitTemplate.ReturnsCallback
发送至客户端。回调必须实现以下方法:
void returnedMessage(ReturnedMessage returned);
ReturnedMessage
具有以下属性:
-
message
- 返回的消息本身 -
replyCode
- 指示返回原因的代码 -
replyText
- 返回的文本原因 - 例如NO_ROUTE
-
exchange
- 收到消息的交换机 -
routingKey
- 所使用的路由键
每个 RabbitTemplate
只支持一个 ReturnsCallback
。另请参阅 Reply Timeout。
对于发布者确认(也称为发布者确认),模板需要一个 CachingConnectionFactory
,且其 publisherConfirm
属性设置为 ConfirmType.CORRELATED
。确认是通过调用 setConfirmCallback(ConfirmCallback callback)
注册 RabbitTemplate.ConfirmCallback
发送给客户机的。回调必须实现此方法:
void confirm(CorrelationData correlationData, boolean ack, String cause);
CorrelationData
是在发送原始消息时由客户机提供的对象。ack
对于一个 ack
为真,并且对于一个 nack
为假。对于 nack
实例,如果在生成 nack
时原因可用,则原因可能包含 nack
的原因。一个示例是在发送消息到不存在的交换时。在该情况下,代理会关闭通道。关闭原因包含在 cause
中。cause
在 1.4 版中添加。
一个 RabbitTemplate
仅支持一个 ConfirmCallback
。
当 Rabbit 模板发送操作完成时,通道关闭。当连接工厂缓存已满时,导致接收确认或返回失败(当缓存中有空间时,通道未物理关闭并且返回和确认正常进行)。当缓存已满时,框架将关闭延迟最多五秒,以便留出接收确认和返回的时间。使用确认时,在收到最后一个确认后关闭通道。仅使用返回时,通道在全部五秒内保持打开状态。我们通常建议设置连接工厂的 |
在 2.1 版之前,启用了发布者确认的通道在收到确认之前返回到缓存。某些其他进程可以签出通道并执行导致通道关闭的操作,例如将消息发布到不存在的交换。这可能会导致确认丢失。2.1 版及更高版本在有待处理的确认时不再将通道返回到缓存。RabbitTemplate
在每个操作后对通道执行一个逻辑 close()
。一般来说,这意味着一次只在一个通道上有待处理的一个确认。
从 2.2 版开始,在连接工厂的 |
只要返回回调在 60 秒或更短时间内执行,就仍可确保在确认之前收到返回消息。已经计划确认在返回回调退出或 60 秒后(以先到者为准)送达。
CorrelationData
对象有一个 CompletableFuture
,你可以使用它来获取结果,而不是在模板上使用 ConfirmCallback
。以下示例展示了如何配置一个 CorrelationData
实例:
CorrelationData cd1 = new CorrelationData();
this.templateWithConfirmsEnabled.convertAndSend("exchange", queue.getName(), "foo", cd1);
assertTrue(cd1.getFuture().get(10, TimeUnit.SECONDS).isAck());
ReturnedMessage = cd1.getReturn();
...
由于它是 CompletableFuture<Confirm>
,因此你可以在准备就绪时 get()
结果,或者使用 whenComplete()
进行异步回调。Confirm
对象是一个带有 2 个属性的简单 bean:ack
和 reason
(对于 nack
实例)。对于代理生成的 nack
实例,原因不会填充。它对于由框架生成的 nack
实例填充(例如,在 ack
实例未完成时关闭连接)。
此外,当确认和返回都启用时,如果无法将 CorrelationData
return
属性路由到任何队列,则其 return
属性将填充为已返回的消息。确保在使用 ack
设置 future 之前设置返回消息属性。CorrelationData.getReturn()
返回一个 ReturnMessage
,其具有以下属性:
-
message (the returned message)
-
replyCode
-
replyText
-
exchange
-
routingKey
另请参阅 Scoped Operations,以了解用于等待发布者确认的更简单的机制。
Scoped Operations
通常情况下,在使用模板时,将从缓存中签出(或创建)“通道”,用于操作,然后将其返回给缓存以供重用。在多线程环境中,无法保证下一个操作使用相同的通道。然而,有时你想要更多地控制通道的使用,并确保在同一个通道上执行多个操作。
从 2.0 版开始,提供了一个名为 invoke
的新方法,并带有 OperationsCallback
。在回调范围内执行在提供的 RabbitOperations
参数上执行的任何操作均使用相同的专用 Channel
,该 Channel
将在最后关闭(不会返回到缓存)。如果频道为 PublisherCallbackChannel
,则在收到所有确认后将其返回到缓存(请参阅 Correlated Publisher Confirms and Returns)。
@FunctionalInterface
public interface OperationsCallback<T> {
T doInRabbit(RabbitOperations operations);
}
你可能需要这样做的一个示例是,如果你希望对底层 通道
使用 waitForConfirms()
方法。如前所述,此方法以前未在 Spring API 中暴露,因为通道通常会被缓存并共享。RabbitTemplate
现在提供 waitForConfirms(long timeout)
和 waitForConfirmsOrDie(long timeout)
,它们委托给 OperationsCallback
范围中使用的专用通道。显然,这些方法不能在该范围之外使用。
请注意,在其他位置会提供一种更高级别的抽象,它允许您将确认与请求相关联(请参阅 Correlated Publisher Confirms and Returns)。如果您只想等到经纪人确认交付,可以使用以下示例中所示的技术:
Collection<?> messages = getMessagesToSend();
Boolean result = this.template.invoke(t -> {
messages.forEach(m -> t.convertAndSend(ROUTE, m));
t.waitForConfirmsOrDie(10_000);
return true;
});
如果你希望在 OperationsCallback
范围内针对 RabbitAdmin
操作调用相同通道,则必须使用与 invoke
操作相同的 RabbitTemplate
构建管理员。
如果已经模板操作在现有事务范围中执行,则上述讨论是无关紧要的,例如,当在已执行监听器容器线程上运行,并在已执行模板上执行操作时。在这种情况下,操作在该通道上执行,并在线程返回到容器时提交。在该场景中,无需使用 |
以这种方式使用确认时,为将确认与请求关联而设置的大部分基础架构实际上并不是必需的(除非返回也已启用)。从版本 2.2 开始,连接工厂支持一个名为 publisherConfirmType
的新属性。当它被设置为 ConfirmType.SIMPLE
时,将避免使用该基础架构,并且确认处理的效率会更高。
此外,RabbitTemplate
在已发送的消息 MessageProperties
中设置 publisherSequenceNumber
属性。如果你希望检查(或记录或以其他方式使用)特定确认,则可以使用重载的 invoke
方法执行此操作,如下例所示:
public <T> T invoke(OperationsCallback<T> action, com.rabbitmq.client.ConfirmCallback acks,
com.rabbitmq.client.ConfirmCallback nacks);
这些 |
以下示例记录 ack
和 nack
实例:
Collection<?> messages = getMessagesToSend();
Boolean result = this.template.invoke(t -> {
messages.forEach(m -> t.convertAndSend(ROUTE, m));
t.waitForConfirmsOrDie(10_000);
return true;
}, (tag, multiple) -> {
log.info("Ack: " + tag + ":" + multiple);
}, (tag, multiple) -> {
log.info("Nack: " + tag + ":" + multiple);
}));
作用域操作绑定到一个线程。请参阅 Strict Message Ordering in a Multi-Threaded Environment 了解多线程环境中严格顺序的讨论。
Strict Message Ordering in a Multi-Threaded Environment
Scoped Operations 中的讨论仅适用于在同一线程上执行操作的情况。
考虑以下情况:
-
thread-1
向队列发送消息,并将工作交给thread-2
-
thread-2
向同一队列发送消息
由于 RabbitMQ 具有异步性质且使用了缓存的频道,因此无法确定将使用相同的频道,从而无法保证消息到达队列的顺序。(在大多数情况下,它们会按顺序到达,但乱序交付的可能性不为零)。为解决此用例,您可以将具有大小 1
的有界频道缓存(以及 channelCheckoutTimeout
)结合使用,以确保始终在同一频道上发布消息,并且保证顺序。为此,如果您有其他用途的连接工厂(例如使用者),则应为模板使用专用的连接工厂,或将模板配置为使用嵌入在主连接工厂中的发布者连接工厂(请参阅 Using a Separate Connection)。
通过一个简单的 Spring Boot 应用程序最能说明这一点:
@SpringBootApplication
public class Application {
private static final Logger log = LoggerFactory.getLogger(Application.class);
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Bean
TaskExecutor exec() {
ThreadPoolTaskExecutor exec = new ThreadPoolTaskExecutor();
exec.setCorePoolSize(10);
return exec;
}
@Bean
CachingConnectionFactory ccf() {
CachingConnectionFactory ccf = new CachingConnectionFactory("localhost");
CachingConnectionFactory publisherCF = (CachingConnectionFactory) ccf.getPublisherConnectionFactory();
publisherCF.setChannelCacheSize(1);
publisherCF.setChannelCheckoutTimeout(1000L);
return ccf;
}
@RabbitListener(queues = "queue")
void listen(String in) {
log.info(in);
}
@Bean
Queue queue() {
return new Queue("queue");
}
@Bean
public ApplicationRunner runner(Service service, TaskExecutor exec) {
return args -> {
exec.execute(() -> service.mainService("test"));
};
}
}
@Component
class Service {
private static final Logger LOG = LoggerFactory.getLogger(Service.class);
private final RabbitTemplate template;
private final TaskExecutor exec;
Service(RabbitTemplate template, TaskExecutor exec) {
template.setUsePublisherConnection(true);
this.template = template;
this.exec = exec;
}
void mainService(String toSend) {
LOG.info("Publishing from main service");
this.template.convertAndSend("queue", toSend);
this.exec.execute(() -> secondaryService(toSend.toUpperCase()));
}
void secondaryService(String toSend) {
LOG.info("Publishing from secondary service");
this.template.convertAndSend("queue", toSend);
}
}
即使发布是在两个不同的线程上执行的,它们都将使用相同的通道,因为缓存上限为一个通道。
从版本 2.3.7 开始,ThreadChannelConnectionFactory
支持使用 prepareContextSwitch
和 switchContext
方法将线程的通道传输到另一个线程。第一个方法返回一个上下文,传递给调用第二个方法的第二个线程。一个线程可以有一个非事务通道或一个事务通道(或每个通道一个);除非你使用两个连接工厂,否则你无法单独传输它们。示例如下:
@SpringBootApplication
public class Application {
private static final Logger log = LoggerFactory.getLogger(Application.class);
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Bean
TaskExecutor exec() {
ThreadPoolTaskExecutor exec = new ThreadPoolTaskExecutor();
exec.setCorePoolSize(10);
return exec;
}
@Bean
ThreadChannelConnectionFactory tccf() {
ConnectionFactory rabbitConnectionFactory = new ConnectionFactory();
rabbitConnectionFactory.setHost("localhost");
return new ThreadChannelConnectionFactory(rabbitConnectionFactory);
}
@RabbitListener(queues = "queue")
void listen(String in) {
log.info(in);
}
@Bean
Queue queue() {
return new Queue("queue");
}
@Bean
public ApplicationRunner runner(Service service, TaskExecutor exec) {
return args -> {
exec.execute(() -> service.mainService("test"));
};
}
}
@Component
class Service {
private static final Logger LOG = LoggerFactory.getLogger(Service.class);
private final RabbitTemplate template;
private final TaskExecutor exec;
private final ThreadChannelConnectionFactory connFactory;
Service(RabbitTemplate template, TaskExecutor exec,
ThreadChannelConnectionFactory tccf) {
this.template = template;
this.exec = exec;
this.connFactory = tccf;
}
void mainService(String toSend) {
LOG.info("Publishing from main service");
this.template.convertAndSend("queue", toSend);
Object context = this.connFactory.prepareSwitchContext();
this.exec.execute(() -> secondaryService(toSend.toUpperCase(), context));
}
void secondaryService(String toSend, Object threadContext) {
LOG.info("Publishing from secondary service");
this.connFactory.switchContext(threadContext);
this.template.convertAndSend("queue", toSend);
this.connFactory.closeThreadChannel();
}
}
一旦调用 prepareSwitchContext
,如果当前线程执行任何其他操作,则这些操作将在新通道上执行。当不再需要线程绑定的通道时,关闭该通道非常重要。
Messaging Integration
从 1.4 版本开始,RabbitMessagingTemplate
(构建在 RabbitTemplate
上)提供与 Spring Framework 消息抽象(即 org.springframework.messaging.Message
)的集成。这使你可以通过使用 spring-messaging
Message<?>
抽象来发送和接收消息。其他 Spring 项目(例如 Spring Integration 和 Spring 的 STOMP 支持)会使用此抽象。涉及两个消息转换器:一个用于在 spring-messaging Message<?>
和 Spring AMQP 的 Message
抽象之间转换,另一个用于在 Spring AMQP 的 Message
抽象和底层 RabbitMQ 客户端库所需的格式之间转换。默认情况下,消息负载由提供的 RabbitTemplate
实例的消息转换器转换。或者,你可以注入一个带有其他负载转换器的自定义 MessagingMessageConverter
,如下例所示:
MessagingMessageConverter amqpMessageConverter = new MessagingMessageConverter();
amqpMessageConverter.setPayloadConverter(myPayloadConverter);
rabbitMessagingTemplate.setAmqpMessageConverter(amqpMessageConverter);
Validated User Id
从 1.6 版本开始,现在,模板支持 user-id-expression
(在使用 Java 配置时为 userIdExpression
)。如果消息已发送,则在评估此表达式后设置用户 ID 属性(如果尚未设置)。评估的根对象是要发送的消息。
以下示例展示了如何使用 user-id-expression
属性:
<rabbit:template ... user-id-expression="'guest'" />
<rabbit:template ... user-id-expression="@myConnectionFactory.username" />
第一个示例是一个文本表达式。第二个示例从应用程序上下文中获取连接工厂 bean 的 username
属性。
Using a Separate Connection
从版本 2.0.2 开始,你可以将 usePublisherConnection
属性设置为 true
,以便在可能的情况下使用与侦听器容器不同的连接。这样做是为了避免在生产者因任何原因阻塞时消费者被阻塞。为此,连接工厂维护了一个第二个内部连接工厂;默认情况下,它与主工厂的类型相同,但如果你希望为发布使用不同的工厂类型,则可以显式设置它。如果兔子模板在侦听器容器启动的事务中运行,那么无论此设置如何,都会使用容器的通道。
一般来说,你不应该使用属性设置为 true
的模板 RabbitAdmin
。使用接收连接工厂的 RabbitAdmin
构造函数。如果你使用接收模板的其他构造函数,请确保模板属性为 false
。这是因为通常使用管理员声明侦听器容器的队列。使用属性设置为 true
的模板意味着独占队列(例如 AnonymousQueue
)将在与侦听器容器使用的连接不同的连接上声明。在该情况下,队列不能被容器使用。