Receiving a Message
本部分描述如何在 Spring 中使用 JMS 接收消息。
Synchronous Reception
虽然 JMS 通常与异步处理相关,但你可以同步使用消息。重载的 receive(…) 方法提供了此功能。在同步接收过程中,调用线程会阻塞,直到有消息可用为止。这可能是一个危险的操作,因为调用线程可能潜在地无限期地被阻塞。receiveTimeout 属性指定接收者在放弃等待消息之前应该等待多长时间。
Asynchronous reception: Message-Driven POJOs
Spring 还通过使用 |
类似于 EJB 世界中的消息驱动 Bean (MDB),消息驱动 POJO (MDP) 充当 JMS 消息的接收器。MDP 的一个限制(但请参见 xref:integration/jms/receiving.adoc#jms-receiving-async-message-listener-adapter[Using MessageListenerAdapter
)是它必须实现 jakarta.jms.MessageListener
接口。请注意,如果您的 POJO 在多个线程上接收消息,则务必确保您的实现是线程安全的。
以下示例显示了 MDP 的一个简单实现:
-
Java
-
Kotlin
public class ExampleListener implements MessageListener {
public void onMessage(Message message) {
if (message instanceof TextMessage textMessage) {
try {
System.out.println(textMessage.getText());
}
catch (JMSException ex) {
throw new RuntimeException(ex);
}
}
else {
throw new IllegalArgumentException("Message must be of type TextMessage");
}
}
}
class ExampleListener : MessageListener {
override fun onMessage(message: Message) {
if (message is TextMessage) {
try {
println(message.text)
} catch (ex: JMSException) {
throw RuntimeException(ex)
}
} else {
throw IllegalArgumentException("Message must be of type TextMessage")
}
}
}
实现 MessageListener 后,是时候创建一个监听容器了。
以下示例展示了如何定义和配置 Spring 随附的消息监听器容器之一(在此例中为 DefaultMessageListenerContainer):
-
Java
-
Kotlin
-
Xml
@Bean
ExampleListener messageListener() {
return new ExampleListener();
}
@Bean
DefaultMessageListenerContainer jmsContainer(ConnectionFactory connectionFactory, Destination destination,
ExampleListener messageListener) {
DefaultMessageListenerContainer jmsContainer = new DefaultMessageListenerContainer();
jmsContainer.setConnectionFactory(connectionFactory);
jmsContainer.setDestination(destination);
jmsContainer.setMessageListener(messageListener);
return jmsContainer;
}
@Bean
fun messageListener() = ExampleListener()
@Bean
fun jmsContainer(connectionFactory: ConnectionFactory, destination: Destination, messageListener: ExampleListener) =
DefaultMessageListenerContainer().apply {
setConnectionFactory(connectionFactory)
setDestination(destination)
setMessageListener(messageListener)
}
<!-- this is the Message Driven POJO (MDP) -->
<bean id="messageListener" class="jmsexample.ExampleListener"/>
<!-- and this is the message listener container -->
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destination" ref="destination"/>
<property name="messageListener" ref="messageListener"/>
</bean>
有关每个实现支持的功能的完整说明,请参见 Spring javadoc 中各种消息侦听器容器(全部实现 MessageListenerContainer)。
Using the SessionAwareMessageListener
Interface
SessionAwareMessageListener 接口是一个特定于 Spring 的接口,它提供与 JMS MessageListener 接口类似的约定,但还允许消息处理方法从该方法接收消息的 JMS Session。以下清单显示了 SessionAwareMessageListener 接口的定义:
public interface SessionAwareMessageListener {
void onMessage(Message message, Session session) throws JMSException;
}
如果你希望你的 MDP 能够对任何接收到的消息进行响应(通过在 onMessage(Message、Session) 方法中使用提供的 Session),你可以选择让你的 MDP 实现此接口(而不是标准 JMS MessageListener 接口)。所有随 Spring 一同提供的消息侦听器容器实现都支持实现 MessageListener 或 SessionAwareMessageListener 接口的 MDP。实现 SessionAwareMessageListener 类的缺点是它们随后与 Spring 绑定到了接口。是否使用它们的决定完全取决于你作为应用程序开发人员或架构师。
注意,SessionAwareMessageListener 接口的 onMessage(…) 方法抛出 JMSException。与标准 JMS MessageListener 接口相反,在使用 SessionAwareMessageListener 接口时,由客户端代码负责处理抛出的任何异常。
Using MessageListenerAdapter
MessageListenerAdapter 类是 Spring 异步消息支持中的最终组件。简单来说,它允许你公开几乎任何类作为 MDP(尽管有一些限制)。
考虑以下接口定义:
-
Java
-
Kotlin
public interface MessageDelegate {
void handleMessage(String message);
void handleMessage(Map message);
void handleMessage(byte[] message);
void handleMessage(Serializable message);
}
interface MessageDelegate {
fun handleMessage(message: String)
fun handleMessage(message: Map<*, *>)
fun handleMessage(message: ByteArray)
fun handleMessage(message: Serializable)
}
请注意,虽然接口既没有扩展 MessageListener 也没有扩展 SessionAwareMessageListener 接口,但你仍然可以使用 MessageListenerAdapter 类将它用作 MDP。还要注意,根据它们能接收和处理的各种 Message 类型的不同,各种消息处理方法如何得到强类型。
现在,考虑以下 MessageDelegate 接口的实现:
-
Java
-
Kotlin
public class DefaultMessageDelegate implements MessageDelegate {
@Override
public void handleMessage(String message) {
// ...
}
@Override
public void handleMessage(Map message) {
// ...
}
@Override
public void handleMessage(byte[] message) {
// ...
}
@Override
public void handleMessage(Serializable message) {
// ...
}
}
class DefaultMessageDelegate : MessageDelegate {
override fun handleMessage(message: String) {
// ...
}
override fun handleMessage(message: Map<*, *>) {
// ...
}
override fun handleMessage(message: ByteArray) {
// ...
}
override fun handleMessage(message: Serializable) {
// ...
}
}
特别是,请注意 MessageDelegate 接口(DefaultMessageDelegate 类)的前一个实现完全没有 JMS 依赖项。它实际上是一个 POJO,我们可以通过以下配置将其变成 MDP:
-
Java
-
Kotlin
-
Xml
@Bean
MessageListenerAdapter messageListener(DefaultMessageDelegate messageDelegate) {
return new MessageListenerAdapter(messageDelegate);
}
@Bean
DefaultMessageListenerContainer jmsContainer(ConnectionFactory connectionFactory, Destination destination,
ExampleListener messageListener) {
DefaultMessageListenerContainer jmsContainer = new DefaultMessageListenerContainer();
jmsContainer.setConnectionFactory(connectionFactory);
jmsContainer.setDestination(destination);
jmsContainer.setMessageListener(messageListener);
return jmsContainer;
}
@Bean
fun messageListener(messageDelegate: DefaultMessageDelegate): MessageListenerAdapter {
return MessageListenerAdapter(messageDelegate)
}
@Bean
fun jmsContainer(connectionFactory: ConnectionFactory, destination: Destination, messageListener: ExampleListener) =
DefaultMessageListenerContainer().apply {
setConnectionFactory(connectionFactory)
setDestination(destination)
setMessageListener(messageListener)
}
<!-- this is the Message Driven POJO (MDP) -->
<bean id="messageListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
<constructor-arg>
<bean class="jmsexample.DefaultMessageDelegate"/>
</constructor-arg>
</bean>
<!-- and this is the message listener container... -->
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destination" ref="destination"/>
<property name="messageListener" ref="messageListener"/>
</bean>
下面的示例显示了另一个只能处理接收 JMS TextMessage 消息的 MDP。注意,消息处理方法实际上称为 receive(MessageListenerAdapter 中的消息处理方法的名称默认为 handleMessage),但它是可配置的(如你稍后在本文中所见)。同样要注意的是,receive(…) 方法被强类型化为仅接收和响应 JMS TextMessage 消息。以下清单显示了 TextMessageDelegate 接口的定义:
-
Java
-
Kotlin
public interface TextMessageDelegate {
void receive(TextMessage message);
}
interface TextMessageDelegate {
fun receive(message: TextMessage)
}
以下清单显示了实现 TextMessageDelegate 接口的类:
-
Java
-
Kotlin
public class DefaultTextMessageDelegate implements TextMessageDelegate {
@Override
public void receive(TextMessage message) {
// ...
}
}
class DefaultTextMessageDelegate : TextMessageDelegate {
override fun receive(message: TextMessage) {
// ...
}
}
然后,相关 MessageListenerAdapter 的配置如下:
-
Java
-
Kotlin
-
Xml
@Bean
MessageListenerAdapter messageListener(DefaultTextMessageDelegate messageDelegate) {
MessageListenerAdapter messageListener = new MessageListenerAdapter(messageDelegate);
messageListener.setDefaultListenerMethod("receive");
// We don't want automatic message context extraction
messageListener.setMessageConverter(null);
return messageListener;
}
@Bean
fun messageListener(messageDelegate: DefaultTextMessageDelegate) = MessageListenerAdapter(messageDelegate).apply {
setDefaultListenerMethod("receive")
// We don't want automatic message context extraction
setMessageConverter(null)
}
<bean id="messageListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
<constructor-arg>
<bean class="jmsexample.DefaultTextMessageDelegate"/>
</constructor-arg>
<property name="defaultListenerMethod" value="receive"/>
<!-- we don't want automatic message context extraction -->
<property name="messageConverter">
<null/>
</property>
</bean>
请注意,如果 messageListener 收到一个非 TextMessage 类型的 JMS Message,则将抛出 IllegalStateException(并随后被中止)。MessageListenerAdapter 类的另一个功能是,如果处理程序方法返回非空值,则可以自动发回一个响应 Message。考虑以下接口和类:
-
Java
-
Kotlin
public interface ResponsiveTextMessageDelegate {
// Notice the return type...
String receive(TextMessage message);
}
interface ResponsiveTextMessageDelegate {
// Notice the return type...
fun receive(message: TextMessage): String
}
-
Java
-
Kotlin
public class DefaultResponsiveTextMessageDelegate implements ResponsiveTextMessageDelegate {
@Override
public String receive(TextMessage message) {
return "message";
}
}
class DefaultResponsiveTextMessageDelegate : ResponsiveTextMessageDelegate {
override fun receive(message: TextMessage): String {
return "message"
}
}
如果你将 DefaultResponsiveTextMessageDelegate
与 MessageListenerAdapter
结合使用,那么在执行 receive(..)
方法时,从执行中返回的任何非空值(在默认配置中)都会被转换为 TextMessage
。然后,生成的 TextMessage
将发送到 JMS Reply-To
属性中定义的 Destination
(如果存在)或对 MessageListenerAdapter
设置的默认 Destination
(如果已配置)。如果没有找到 Destination
,则会抛出 InvalidDestinationException
(请注意,此异常不会被吞咽,并且会在调用堆栈中传播)。
Processing Messages Within Transactions
在事务中调用消息侦听器只会要求重新配置侦听器容器。
可以通过侦听器容器定义中的 sessionTransacted
标志激活本地资源事务。然后,每个消息侦听器调用都在一个活动 JMS 事务内操作,在侦听器执行失败的情况下,消息接收会回滚。发送响应消息(通过 SessionAwareMessageListener
)是同一本地事务的一部分,但任何其他资源操作(例如,数据库访问)都是独立操作的。这通常要求在侦听器实现中进行重复消息检测,以涵盖数据库处理已提交但消息处理未提交的情况。
考虑以下 bean 定义:
-
Java
-
Kotlin
-
Xml
@Bean
DefaultMessageListenerContainer jmsContainer(ConnectionFactory connectionFactory, Destination destination,
ExampleListener messageListener) {
DefaultMessageListenerContainer jmsContainer = new DefaultMessageListenerContainer();
jmsContainer.setConnectionFactory(connectionFactory);
jmsContainer.setDestination(destination);
jmsContainer.setMessageListener(messageListener);
jmsContainer.setSessionTransacted(true);
return jmsContainer;
}
@Bean
fun jmsContainer(connectionFactory: ConnectionFactory, destination: Destination, messageListener: ExampleListener) =
DefaultMessageListenerContainer().apply {
setConnectionFactory(connectionFactory)
setDestination(destination)
setMessageListener(messageListener)
isSessionTransacted = true
}
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destination" ref="destination"/>
<property name="messageListener" ref="messageListener"/>
<property name="sessionTransacted" value="true"/>
</bean>
若要参与外部管理事务,你需要配置一个事务管理器,并使用支持外部管理事务的侦听器容器(通常为 DefaultMessageListenerContainer
)。
若要为 XA 事务参与配置消息侦听器容器,你需要配置 JtaTransactionManager
(默认情况下,它会委托给 Jakarta EE 服务器的事务子系统)。请注意,基础的 JMS ConnectionFactory
需要支持 XA 并且与你的 JTA 事务协调器正确注册。(检查你的 Jakarta EE 服务器对 JNDI 资源的配置。)这可以让消息接收以及(例如)数据库访问成为同一事务的一部分(具有统一的提交语义,以牺牲 XA 事务日志开销为代价)。
以下 bean 定义创建了一个事务管理器:
-
Java
-
Kotlin
-
Xml
@Bean
JtaTransactionManager transactionManager() {
return new JtaTransactionManager();
}
@Bean
fun transactionManager() = JtaTransactionManager()
<bean id="transactionManager" class="org.springframework.transaction.jta.JtaTransactionManager"/>
然后,我们需要将其添加到我们早期的容器配置中。容器会处理其余部分。以下示例展示了如何执行此操作:
-
Java
-
Kotlin
-
Xml
@Bean
DefaultMessageListenerContainer jmsContainer(ConnectionFactory connectionFactory, Destination destination,
ExampleListener messageListener) {
DefaultMessageListenerContainer jmsContainer = new DefaultMessageListenerContainer();
jmsContainer.setConnectionFactory(connectionFactory);
jmsContainer.setDestination(destination);
jmsContainer.setMessageListener(messageListener);
jmsContainer.setSessionTransacted(true);
return jmsContainer;
}
@Bean
fun jmsContainer(connectionFactory: ConnectionFactory, destination: Destination, messageListener: ExampleListener,
transactionManager: JtaTransactionManager) =
DefaultMessageListenerContainer().apply {
setConnectionFactory(connectionFactory)
setDestination(destination)
setMessageListener(messageListener)
setTransactionManager(transactionManager)
}
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destination" ref="destination"/>
<property name="messageListener" ref="messageListener"/>
<property name="transactionManager" ref="transactionManager"/>
</bean>