Receiving a Message
本部分描述如何在 Spring 中使用 JMS 接收消息。
This describes how to receive messages with JMS in Spring.
Synchronous Reception
虽然 JMS 通常与异步处理相关,但你可以同步使用消息。重载的 receive(…) 方法提供了此功能。在同步接收过程中,调用线程会阻塞,直到有消息可用为止。这可能是一个危险的操作,因为调用线程可能潜在地无限期地被阻塞。receiveTimeout 属性指定接收者在放弃等待消息之前应该等待多长时间。
While JMS is typically associated with asynchronous processing, you can
consume messages synchronously. The overloaded receive(..)
methods provide this
functionality. During a synchronous receive, the calling thread blocks until a message
becomes available. This can be a dangerous operation, since the calling thread can
potentially be blocked indefinitely. The receiveTimeout
property specifies how long
the receiver should wait before giving up waiting for a message.
Asynchronous reception: Message-Driven POJOs
Spring 还通过使用 |
Spring also supports annotated-listener endpoints through the use of the |
类似于 EJB 世界中的消息驱动 Bean (MDB),消息驱动 POJO (MDP) 充当 JMS 消息的接收器。MDP 的一个限制(但请参见 xref:integration/jms/receiving.adoc#jms-receiving-async-message-listener-adapter[Using MessageListenerAdapter
)是它必须实现 jakarta.jms.MessageListener
接口。请注意,如果您的 POJO 在多个线程上接收消息,则务必确保您的实现是线程安全的。
In a fashion similar to a Message-Driven Bean (MDB) in the EJB world, the Message-Driven
POJO (MDP) acts as a receiver for JMS messages. The one restriction (but see
Using MessageListenerAdapter
) on an MDP is that it must implement
the jakarta.jms.MessageListener
interface. Note that, if your POJO receives messages
on multiple threads, it is important to ensure that your implementation is thread-safe.
以下示例显示了 MDP 的一个简单实现:
The following example shows a simple implementation of an MDP:
-
Java
-
Kotlin
public class ExampleListener implements MessageListener {
public void onMessage(Message message) {
if (message instanceof TextMessage textMessage) {
try {
System.out.println(textMessage.getText());
}
catch (JMSException ex) {
throw new RuntimeException(ex);
}
}
else {
throw new IllegalArgumentException("Message must be of type TextMessage");
}
}
}
class ExampleListener : MessageListener {
override fun onMessage(message: Message) {
if (message is TextMessage) {
try {
println(message.text)
} catch (ex: JMSException) {
throw RuntimeException(ex)
}
} else {
throw IllegalArgumentException("Message must be of type TextMessage")
}
}
}
实现 MessageListener 后,是时候创建一个监听容器了。
Once you have implemented your MessageListener
, it is time to create a message listener
container.
以下示例展示了如何定义和配置 Spring 随附的消息监听器容器之一(在此例中为 DefaultMessageListenerContainer):
The following example shows how to define and configure one of the message listener
containers that ships with Spring (in this case, DefaultMessageListenerContainer
):
-
Java
-
Kotlin
-
Xml
@Bean
ExampleListener messageListener() {
return new ExampleListener();
}
@Bean
DefaultMessageListenerContainer jmsContainer(ConnectionFactory connectionFactory, Destination destination,
ExampleListener messageListener) {
DefaultMessageListenerContainer jmsContainer = new DefaultMessageListenerContainer();
jmsContainer.setConnectionFactory(connectionFactory);
jmsContainer.setDestination(destination);
jmsContainer.setMessageListener(messageListener);
return jmsContainer;
}
@Bean
fun messageListener() = ExampleListener()
@Bean
fun jmsContainer(connectionFactory: ConnectionFactory, destination: Destination, messageListener: ExampleListener) =
DefaultMessageListenerContainer().apply {
setConnectionFactory(connectionFactory)
setDestination(destination)
setMessageListener(messageListener)
}
<!-- this is the Message Driven POJO (MDP) -->
<bean id="messageListener" class="jmsexample.ExampleListener"/>
<!-- and this is the message listener container -->
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destination" ref="destination"/>
<property name="messageListener" ref="messageListener"/>
</bean>
有关每个实现支持的功能的完整说明,请参见 Spring javadoc 中各种消息侦听器容器(全部实现 MessageListenerContainer)。
See the Spring javadoc of the various message listener containers (all of which implement MessageListenerContainer) for a full description of the features supported by each implementation.
Using the SessionAwareMessageListener
Interface
SessionAwareMessageListener 接口是一个特定于 Spring 的接口,它提供与 JMS MessageListener 接口类似的约定,但还允许消息处理方法从该方法接收消息的 JMS Session。以下清单显示了 SessionAwareMessageListener 接口的定义:
The SessionAwareMessageListener
interface is a Spring-specific interface that provides
a similar contract to the JMS MessageListener
interface but also gives the message-handling
method access to the JMS Session
from which the Message
was received.
The following listing shows the definition of the SessionAwareMessageListener
interface:
public interface SessionAwareMessageListener {
void onMessage(Message message, Session session) throws JMSException;
}
如果你希望你的 MDP 能够对任何接收到的消息进行响应(通过在 onMessage(Message、Session) 方法中使用提供的 Session),你可以选择让你的 MDP 实现此接口(而不是标准 JMS MessageListener 接口)。所有随 Spring 一同提供的消息侦听器容器实现都支持实现 MessageListener 或 SessionAwareMessageListener 接口的 MDP。实现 SessionAwareMessageListener 类的缺点是它们随后与 Spring 绑定到了接口。是否使用它们的决定完全取决于你作为应用程序开发人员或架构师。
You can choose to have your MDPs implement this interface (in preference to the standard
JMS MessageListener
interface) if you want your MDPs to be able to respond to any
received messages (by using the Session
supplied in the onMessage(Message, Session)
method). All of the message listener container implementations that ship with Spring
have support for MDPs that implement either the MessageListener
or
SessionAwareMessageListener
interface. Classes that implement the
SessionAwareMessageListener
come with the caveat that they are then tied to Spring
through the interface. The choice of whether or not to use it is left entirely up to you
as an application developer or architect.
注意,SessionAwareMessageListener 接口的 onMessage(…) 方法抛出 JMSException。与标准 JMS MessageListener 接口相反,在使用 SessionAwareMessageListener 接口时,由客户端代码负责处理抛出的任何异常。
Note that the onMessage(..)
method of the SessionAwareMessageListener
interface throws JMSException
. In contrast to the standard JMS MessageListener
interface, when using the SessionAwareMessageListener
interface, it is the
responsibility of the client code to handle any thrown exceptions.
Using MessageListenerAdapter
MessageListenerAdapter 类是 Spring 异步消息支持中的最终组件。简单来说,它允许你公开几乎任何类作为 MDP(尽管有一些限制)。
The MessageListenerAdapter
class is the final component in Spring’s asynchronous
messaging support. In a nutshell, it lets you expose almost any class as an MDP
(though there are some constraints).
考虑以下接口定义:
Consider the following interface definition:
-
Java
-
Kotlin
public interface MessageDelegate {
void handleMessage(String message);
void handleMessage(Map message);
void handleMessage(byte[] message);
void handleMessage(Serializable message);
}
interface MessageDelegate {
fun handleMessage(message: String)
fun handleMessage(message: Map<*, *>)
fun handleMessage(message: ByteArray)
fun handleMessage(message: Serializable)
}
请注意,虽然接口既没有扩展 MessageListener 也没有扩展 SessionAwareMessageListener 接口,但你仍然可以使用 MessageListenerAdapter 类将它用作 MDP。还要注意,根据它们能接收和处理的各种 Message 类型的不同,各种消息处理方法如何得到强类型。
Notice that, although the interface extends neither the MessageListener
nor the
SessionAwareMessageListener
interface, you can still use it as an MDP by using the
MessageListenerAdapter
class. Notice also how the various message handling methods are
strongly typed according to the contents of the various Message
types that they can
receive and handle.
现在,考虑以下 MessageDelegate 接口的实现:
Now consider the following implementation of the MessageDelegate
interface:
-
Java
-
Kotlin
public class DefaultMessageDelegate implements MessageDelegate {
@Override
public void handleMessage(String message) {
// ...
}
@Override
public void handleMessage(Map message) {
// ...
}
@Override
public void handleMessage(byte[] message) {
// ...
}
@Override
public void handleMessage(Serializable message) {
// ...
}
}
class DefaultMessageDelegate : MessageDelegate {
override fun handleMessage(message: String) {
// ...
}
override fun handleMessage(message: Map<*, *>) {
// ...
}
override fun handleMessage(message: ByteArray) {
// ...
}
override fun handleMessage(message: Serializable) {
// ...
}
}
特别是,请注意 MessageDelegate 接口(DefaultMessageDelegate 类)的前一个实现完全没有 JMS 依赖项。它实际上是一个 POJO,我们可以通过以下配置将其变成 MDP:
In particular, note how the preceding implementation of the MessageDelegate
interface (the
DefaultMessageDelegate
class) has no JMS dependencies at all. It truly is a
POJO that we can make into an MDP through the following configuration:
-
Java
-
Kotlin
-
Xml
@Bean
MessageListenerAdapter messageListener(DefaultMessageDelegate messageDelegate) {
return new MessageListenerAdapter(messageDelegate);
}
@Bean
DefaultMessageListenerContainer jmsContainer(ConnectionFactory connectionFactory, Destination destination,
ExampleListener messageListener) {
DefaultMessageListenerContainer jmsContainer = new DefaultMessageListenerContainer();
jmsContainer.setConnectionFactory(connectionFactory);
jmsContainer.setDestination(destination);
jmsContainer.setMessageListener(messageListener);
return jmsContainer;
}
@Bean
fun messageListener(messageDelegate: DefaultMessageDelegate): MessageListenerAdapter {
return MessageListenerAdapter(messageDelegate)
}
@Bean
fun jmsContainer(connectionFactory: ConnectionFactory, destination: Destination, messageListener: ExampleListener) =
DefaultMessageListenerContainer().apply {
setConnectionFactory(connectionFactory)
setDestination(destination)
setMessageListener(messageListener)
}
<!-- this is the Message Driven POJO (MDP) -->
<bean id="messageListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
<constructor-arg>
<bean class="jmsexample.DefaultMessageDelegate"/>
</constructor-arg>
</bean>
<!-- and this is the message listener container... -->
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destination" ref="destination"/>
<property name="messageListener" ref="messageListener"/>
</bean>
下面的示例显示了另一个只能处理接收 JMS TextMessage 消息的 MDP。注意,消息处理方法实际上称为 receive(MessageListenerAdapter 中的消息处理方法的名称默认为 handleMessage),但它是可配置的(如你稍后在本文中所见)。同样要注意的是,receive(…) 方法被强类型化为仅接收和响应 JMS TextMessage 消息。以下清单显示了 TextMessageDelegate 接口的定义:
The next example shows another MDP that can handle only receiving JMS
TextMessage
messages. Notice how the message handling method is actually called
receive
(the name of the message handling method in a MessageListenerAdapter
defaults to handleMessage
), but it is configurable (as you can see later in this section). Notice
also how the receive(..)
method is strongly typed to receive and respond only to JMS
TextMessage
messages.
The following listing shows the definition of the TextMessageDelegate
interface:
-
Java
-
Kotlin
public interface TextMessageDelegate {
void receive(TextMessage message);
}
interface TextMessageDelegate {
fun receive(message: TextMessage)
}
以下清单显示了实现 TextMessageDelegate 接口的类:
The following listing shows a class that implements the TextMessageDelegate
interface:
-
Java
-
Kotlin
public class DefaultTextMessageDelegate implements TextMessageDelegate {
@Override
public void receive(TextMessage message) {
// ...
}
}
class DefaultTextMessageDelegate : TextMessageDelegate {
override fun receive(message: TextMessage) {
// ...
}
}
然后,相关 MessageListenerAdapter 的配置如下:
The configuration of the attendant MessageListenerAdapter
would then be as follows:
-
Java
-
Kotlin
-
Xml
@Bean
MessageListenerAdapter messageListener(DefaultTextMessageDelegate messageDelegate) {
MessageListenerAdapter messageListener = new MessageListenerAdapter(messageDelegate);
messageListener.setDefaultListenerMethod("receive");
// We don't want automatic message context extraction
messageListener.setMessageConverter(null);
return messageListener;
}
@Bean
fun messageListener(messageDelegate: DefaultTextMessageDelegate) = MessageListenerAdapter(messageDelegate).apply {
setDefaultListenerMethod("receive")
// We don't want automatic message context extraction
setMessageConverter(null)
}
<bean id="messageListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
<constructor-arg>
<bean class="jmsexample.DefaultTextMessageDelegate"/>
</constructor-arg>
<property name="defaultListenerMethod" value="receive"/>
<!-- we don't want automatic message context extraction -->
<property name="messageConverter">
<null/>
</property>
</bean>
请注意,如果 messageListener 收到一个非 TextMessage 类型的 JMS Message,则将抛出 IllegalStateException(并随后被中止)。MessageListenerAdapter 类的另一个功能是,如果处理程序方法返回非空值,则可以自动发回一个响应 Message。考虑以下接口和类:
Note that, if the messageListener
receives a JMS Message
of a type
other than TextMessage
, an IllegalStateException
is thrown (and subsequently
swallowed). Another of the capabilities of the MessageListenerAdapter
class is the
ability to automatically send back a response Message
if a handler method returns a
non-void value. Consider the following interface and class:
-
Java
-
Kotlin
public interface ResponsiveTextMessageDelegate {
// Notice the return type...
String receive(TextMessage message);
}
interface ResponsiveTextMessageDelegate {
// Notice the return type...
fun receive(message: TextMessage): String
}
-
Java
-
Kotlin
public class DefaultResponsiveTextMessageDelegate implements ResponsiveTextMessageDelegate {
@Override
public String receive(TextMessage message) {
return "message";
}
}
class DefaultResponsiveTextMessageDelegate : ResponsiveTextMessageDelegate {
override fun receive(message: TextMessage): String {
return "message"
}
}
如果你将 DefaultResponsiveTextMessageDelegate
与 MessageListenerAdapter
结合使用,那么在执行 receive(..)
方法时,从执行中返回的任何非空值(在默认配置中)都会被转换为 TextMessage
。然后,生成的 TextMessage
将发送到 JMS Reply-To
属性中定义的 Destination
(如果存在)或对 MessageListenerAdapter
设置的默认 Destination
(如果已配置)。如果没有找到 Destination
,则会抛出 InvalidDestinationException
(请注意,此异常不会被吞咽,并且会在调用堆栈中传播)。
If you use the DefaultResponsiveTextMessageDelegate
in conjunction with a
MessageListenerAdapter
, any non-null value that is returned from the execution of
the ’receive(..)'` method is (in the default configuration) converted into a
TextMessage
. The resulting TextMessage
is then sent to the Destination
(if
one exists) defined in the JMS Reply-To
property of the original Message
or the
default Destination
set on the MessageListenerAdapter
(if one has been configured).
If no Destination
is found, an InvalidDestinationException
is thrown
(note that this exception is not swallowed and propagates up the
call stack).
Processing Messages Within Transactions
在事务中调用消息侦听器只会要求重新配置侦听器容器。
Invoking a message listener within a transaction requires only reconfiguration of the listener container.
可以通过侦听器容器定义中的 sessionTransacted
标志激活本地资源事务。然后,每个消息侦听器调用都在一个活动 JMS 事务内操作,在侦听器执行失败的情况下,消息接收会回滚。发送响应消息(通过 SessionAwareMessageListener
)是同一本地事务的一部分,但任何其他资源操作(例如,数据库访问)都是独立操作的。这通常要求在侦听器实现中进行重复消息检测,以涵盖数据库处理已提交但消息处理未提交的情况。
You can activate local resource transactions through the sessionTransacted
flag
on the listener container definition. Each message listener invocation then operates
within an active JMS transaction, with message reception rolled back in case of listener
execution failure. Sending a response message (through SessionAwareMessageListener
) is
part of the same local transaction, but any other resource operations (such as
database access) operate independently. This usually requires duplicate message
detection in the listener implementation, to cover the case where database processing
has committed but message processing failed to commit.
考虑以下 bean 定义:
Consider the following bean definition:
-
Java
-
Kotlin
-
Xml
@Bean
DefaultMessageListenerContainer jmsContainer(ConnectionFactory connectionFactory, Destination destination,
ExampleListener messageListener) {
DefaultMessageListenerContainer jmsContainer = new DefaultMessageListenerContainer();
jmsContainer.setConnectionFactory(connectionFactory);
jmsContainer.setDestination(destination);
jmsContainer.setMessageListener(messageListener);
jmsContainer.setSessionTransacted(true);
return jmsContainer;
}
@Bean
fun jmsContainer(connectionFactory: ConnectionFactory, destination: Destination, messageListener: ExampleListener) =
DefaultMessageListenerContainer().apply {
setConnectionFactory(connectionFactory)
setDestination(destination)
setMessageListener(messageListener)
isSessionTransacted = true
}
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destination" ref="destination"/>
<property name="messageListener" ref="messageListener"/>
<property name="sessionTransacted" value="true"/>
</bean>
若要参与外部管理事务,你需要配置一个事务管理器,并使用支持外部管理事务的侦听器容器(通常为 DefaultMessageListenerContainer
)。
To participate in an externally managed transaction, you need to configure a
transaction manager and use a listener container that supports externally managed
transactions (typically, DefaultMessageListenerContainer
).
若要为 XA 事务参与配置消息侦听器容器,你需要配置 JtaTransactionManager
(默认情况下,它会委托给 Jakarta EE 服务器的事务子系统)。请注意,基础的 JMS ConnectionFactory
需要支持 XA 并且与你的 JTA 事务协调器正确注册。(检查你的 Jakarta EE 服务器对 JNDI 资源的配置。)这可以让消息接收以及(例如)数据库访问成为同一事务的一部分(具有统一的提交语义,以牺牲 XA 事务日志开销为代价)。
To configure a message listener container for XA transaction participation, you want
to configure a JtaTransactionManager
(which, by default, delegates to the Jakarta EE
server’s transaction subsystem). Note that the underlying JMS ConnectionFactory
needs to
be XA-capable and properly registered with your JTA transaction coordinator. (Check your
Jakarta EE server’s configuration of JNDI resources.) This lets message reception as well
as (for example) database access be part of the same transaction (with unified commit
semantics, at the expense of XA transaction log overhead).
以下 bean 定义创建了一个事务管理器:
The following bean definition creates a transaction manager:
-
Java
-
Kotlin
-
Xml
@Bean
JtaTransactionManager transactionManager() {
return new JtaTransactionManager();
}
@Bean
fun transactionManager() = JtaTransactionManager()
<bean id="transactionManager" class="org.springframework.transaction.jta.JtaTransactionManager"/>
然后,我们需要将其添加到我们早期的容器配置中。容器会处理其余部分。以下示例展示了如何执行此操作:
Then we need to add it to our earlier container configuration. The container takes care of the rest. The following example shows how to do so:
-
Java
-
Kotlin
-
Xml
@Bean
DefaultMessageListenerContainer jmsContainer(ConnectionFactory connectionFactory, Destination destination,
ExampleListener messageListener) {
DefaultMessageListenerContainer jmsContainer = new DefaultMessageListenerContainer();
jmsContainer.setConnectionFactory(connectionFactory);
jmsContainer.setDestination(destination);
jmsContainer.setMessageListener(messageListener);
jmsContainer.setSessionTransacted(true);
return jmsContainer;
}
@Bean
fun jmsContainer(connectionFactory: ConnectionFactory, destination: Destination, messageListener: ExampleListener,
transactionManager: JtaTransactionManager) =
DefaultMessageListenerContainer().apply {
setConnectionFactory(connectionFactory)
setDestination(destination)
setMessageListener(messageListener)
setTransactionManager(transactionManager)
}
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destination" ref="destination"/>
<property name="messageListener" ref="messageListener"/>
<property name="transactionManager" ref="transactionManager"/>
</bean>