Receiving a Message

本部分描述如何在 Spring 中使用 JMS 接收消息。

This describes how to receive messages with JMS in Spring.

Synchronous Reception

虽然 JMS 通常与异步处理相关,但你可以同步使用消息。重载的 receive(…​) 方法提供了此功能。在同步接收过程中,调用线程会阻塞,直到有消息可用为止。这可能是一个危险的操作,因为调用线程可能潜在地无限期地被阻塞。receiveTimeout 属性指定接收者在放弃等待消息之前应该等待多长时间。

While JMS is typically associated with asynchronous processing, you can consume messages synchronously. The overloaded receive(..) methods provide this functionality. During a synchronous receive, the calling thread blocks until a message becomes available. This can be a dangerous operation, since the calling thread can potentially be blocked indefinitely. The receiveTimeout property specifies how long the receiver should wait before giving up waiting for a message.

Asynchronous reception: Message-Driven POJOs

Spring 还通过使用 @JmsListener 注释来支持带注释的侦听器端点,并提供一个开放的基础设施来以编程方式注册端点。这无疑是设置异步接收器的最便捷方式。有关更多详细信息,请参见 Enable Listener Endpoint Annotations

Spring also supports annotated-listener endpoints through the use of the @JmsListener annotation and provides an open infrastructure to register endpoints programmatically. This is, by far, the most convenient way to setup an asynchronous receiver. See Enable Listener Endpoint Annotations for more details.

类似于 EJB 世界中的消息驱动 Bean (MDB),消息驱动 POJO (MDP) 充当 JMS 消息的接收器。MDP 的一个限制(但请参见 xref:integration/jms/receiving.adoc#jms-receiving-async-message-listener-adapter[Using MessageListenerAdapter)是它必须实现 jakarta.jms.MessageListener 接口。请注意,如果您的 POJO 在多个线程上接收消息,则务必确保您的实现是线程安全的。

In a fashion similar to a Message-Driven Bean (MDB) in the EJB world, the Message-Driven POJO (MDP) acts as a receiver for JMS messages. The one restriction (but see Using MessageListenerAdapter) on an MDP is that it must implement the jakarta.jms.MessageListener interface. Note that, if your POJO receives messages on multiple threads, it is important to ensure that your implementation is thread-safe.

以下示例显示了 MDP 的一个简单实现:

The following example shows a simple implementation of an MDP:

  • Java

  • Kotlin

public class ExampleListener implements MessageListener {

	public void onMessage(Message message) {
		if (message instanceof TextMessage textMessage) {
			try {
				System.out.println(textMessage.getText());
			}
			catch (JMSException ex) {
				throw new RuntimeException(ex);
			}
		}
		else {
			throw new IllegalArgumentException("Message must be of type TextMessage");
		}
	}
}
class ExampleListener : MessageListener {

	override fun onMessage(message: Message) {
		if (message is TextMessage) {
			try {
				println(message.text)
			} catch (ex: JMSException) {
				throw RuntimeException(ex)
			}
		} else {
			throw IllegalArgumentException("Message must be of type TextMessage")
		}
	}
}

实现 MessageListener 后,是时候创建一个监听容器了。

Once you have implemented your MessageListener, it is time to create a message listener container.

以下示例展示了如何定义和配置 Spring 随附的消息监听器容器之一(在此例中为 DefaultMessageListenerContainer):

The following example shows how to define and configure one of the message listener containers that ships with Spring (in this case, DefaultMessageListenerContainer):

  • Java

  • Kotlin

  • Xml

@Bean
ExampleListener messageListener() {
	return new ExampleListener();
}

@Bean
DefaultMessageListenerContainer jmsContainer(ConnectionFactory connectionFactory, Destination destination,
		ExampleListener messageListener) {

	DefaultMessageListenerContainer jmsContainer = new DefaultMessageListenerContainer();
	jmsContainer.setConnectionFactory(connectionFactory);
	jmsContainer.setDestination(destination);
	jmsContainer.setMessageListener(messageListener);
	return jmsContainer;
}
@Bean
fun messageListener() = ExampleListener()

@Bean
fun jmsContainer(connectionFactory: ConnectionFactory, destination: Destination, messageListener: ExampleListener) =
	DefaultMessageListenerContainer().apply {
		setConnectionFactory(connectionFactory)
		setDestination(destination)
		setMessageListener(messageListener)
	}
<!-- this is the Message Driven POJO (MDP) -->
<bean id="messageListener" class="jmsexample.ExampleListener"/>

<!-- and this is the message listener container -->
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
	<property name="connectionFactory" ref="connectionFactory"/>
	<property name="destination" ref="destination"/>
	<property name="messageListener" ref="messageListener"/>
</bean>

有关每个实现支持的功能的完整说明,请参见 Spring javadoc 中各种消息侦听器容器(全部实现 MessageListenerContainer)。

See the Spring javadoc of the various message listener containers (all of which implement MessageListenerContainer) for a full description of the features supported by each implementation.

Using the SessionAwareMessageListener Interface

SessionAwareMessageListener 接口是一个特定于 Spring 的接口,它提供与 JMS MessageListener 接口类似的约定,但还允许消息处理方法从该方法接收消息的 JMS Session。以下清单显示了 SessionAwareMessageListener 接口的定义:

The SessionAwareMessageListener interface is a Spring-specific interface that provides a similar contract to the JMS MessageListener interface but also gives the message-handling method access to the JMS Session from which the Message was received. The following listing shows the definition of the SessionAwareMessageListener interface:

public interface SessionAwareMessageListener {

	void onMessage(Message message, Session session) throws JMSException;
}

如果你希望你的 MDP 能够对任何接收到的消息进行响应(通过在 onMessage(Message、Session) 方法中使用提供的 Session),你可以选择让你的 MDP 实现此接口(而不是标准 JMS MessageListener 接口)。所有随 Spring 一同提供的消息侦听器容器实现都支持实现 MessageListener 或 SessionAwareMessageListener 接口的 MDP。实现 SessionAwareMessageListener 类的缺点是它们随后与 Spring 绑定到了接口。是否使用它们的决定完全取决于你作为应用程序开发人员或架构师。

You can choose to have your MDPs implement this interface (in preference to the standard JMS MessageListener interface) if you want your MDPs to be able to respond to any received messages (by using the Session supplied in the onMessage(Message, Session) method). All of the message listener container implementations that ship with Spring have support for MDPs that implement either the MessageListener or SessionAwareMessageListener interface. Classes that implement the SessionAwareMessageListener come with the caveat that they are then tied to Spring through the interface. The choice of whether or not to use it is left entirely up to you as an application developer or architect.

注意,SessionAwareMessageListener 接口的 onMessage(…​) 方法抛出 JMSException。与标准 JMS MessageListener 接口相反,在使用 SessionAwareMessageListener 接口时,由客户端代码负责处理抛出的任何异常。

Note that the onMessage(..) method of the SessionAwareMessageListener interface throws JMSException. In contrast to the standard JMS MessageListener interface, when using the SessionAwareMessageListener interface, it is the responsibility of the client code to handle any thrown exceptions.

Using MessageListenerAdapter

MessageListenerAdapter 类是 Spring 异步消息支持中的最终组件。简单来说,它允许你公开几乎任何类作为 MDP(尽管有一些限制)。

The MessageListenerAdapter class is the final component in Spring’s asynchronous messaging support. In a nutshell, it lets you expose almost any class as an MDP (though there are some constraints).

考虑以下接口定义:

Consider the following interface definition:

  • Java

  • Kotlin

public interface MessageDelegate {

	void handleMessage(String message);

	void handleMessage(Map message);

	void handleMessage(byte[] message);

	void handleMessage(Serializable message);
}
interface MessageDelegate {
	fun handleMessage(message: String)
	fun handleMessage(message: Map<*, *>)
	fun handleMessage(message: ByteArray)
	fun handleMessage(message: Serializable)
}

请注意,虽然接口既没有扩展 MessageListener 也没有扩展 SessionAwareMessageListener 接口,但你仍然可以使用 MessageListenerAdapter 类将它用作 MDP。还要注意,根据它们能接收和处理的各种 Message 类型的不同,各种消息处理方法如何得到强类型。

Notice that, although the interface extends neither the MessageListener nor the SessionAwareMessageListener interface, you can still use it as an MDP by using the MessageListenerAdapter class. Notice also how the various message handling methods are strongly typed according to the contents of the various Message types that they can receive and handle.

现在,考虑以下 MessageDelegate 接口的实现:

Now consider the following implementation of the MessageDelegate interface:

  • Java

  • Kotlin

public class DefaultMessageDelegate implements MessageDelegate {

	@Override
	public void handleMessage(String message) {
		// ...
	}

	@Override
	public void handleMessage(Map message) {
		// ...
	}

	@Override
	public void handleMessage(byte[] message) {
		// ...
	}

	@Override
	public void handleMessage(Serializable message) {
		// ...
	}
}
class DefaultMessageDelegate : MessageDelegate {

	override fun handleMessage(message: String) {
		// ...
	}

	override fun handleMessage(message: Map<*, *>) {
		// ...
	}

	override fun handleMessage(message: ByteArray) {
		// ...
	}

	override fun handleMessage(message: Serializable) {
		// ...
	}
}

特别是,请注意 MessageDelegate 接口(DefaultMessageDelegate 类)的前一个实现完全没有 JMS 依赖项。它实际上是一个 POJO,我们可以通过以下配置将其变成 MDP:

In particular, note how the preceding implementation of the MessageDelegate interface (the DefaultMessageDelegate class) has no JMS dependencies at all. It truly is a POJO that we can make into an MDP through the following configuration:

  • Java

  • Kotlin

  • Xml

@Bean
MessageListenerAdapter messageListener(DefaultMessageDelegate messageDelegate) {
	return new MessageListenerAdapter(messageDelegate);
}

@Bean
DefaultMessageListenerContainer jmsContainer(ConnectionFactory connectionFactory, Destination destination,
		ExampleListener messageListener) {

	DefaultMessageListenerContainer jmsContainer = new DefaultMessageListenerContainer();
	jmsContainer.setConnectionFactory(connectionFactory);
	jmsContainer.setDestination(destination);
	jmsContainer.setMessageListener(messageListener);
	return jmsContainer;
}
@Bean
fun messageListener(messageDelegate: DefaultMessageDelegate): MessageListenerAdapter {
	return MessageListenerAdapter(messageDelegate)
}

@Bean
fun jmsContainer(connectionFactory: ConnectionFactory, destination: Destination, messageListener: ExampleListener) =
	DefaultMessageListenerContainer().apply {
		setConnectionFactory(connectionFactory)
		setDestination(destination)
		setMessageListener(messageListener)
	}
<!-- this is the Message Driven POJO (MDP) -->
<bean id="messageListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
	<constructor-arg>
		<bean class="jmsexample.DefaultMessageDelegate"/>
	</constructor-arg>
</bean>

<!-- and this is the message listener container... -->
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
	<property name="connectionFactory" ref="connectionFactory"/>
	<property name="destination" ref="destination"/>
	<property name="messageListener" ref="messageListener"/>
</bean>

下面的示例显示了另一个只能处理接收 JMS TextMessage 消息的 MDP。注意,消息处理方法实际上称为 receive(MessageListenerAdapter 中的消息处理方法的名称默认为 handleMessage),但它是可配置的(如你稍后在本文中所见)。同样要注意的是,receive(…​) 方法被强类型化为仅接收和响应 JMS TextMessage 消息。以下清单显示了 TextMessageDelegate 接口的定义:

The next example shows another MDP that can handle only receiving JMS TextMessage messages. Notice how the message handling method is actually called receive (the name of the message handling method in a MessageListenerAdapter defaults to handleMessage), but it is configurable (as you can see later in this section). Notice also how the receive(..) method is strongly typed to receive and respond only to JMS TextMessage messages. The following listing shows the definition of the TextMessageDelegate interface:

  • Java

  • Kotlin

public interface TextMessageDelegate {

	void receive(TextMessage message);
}
interface TextMessageDelegate {
	fun receive(message: TextMessage)
}

以下清单显示了实现 TextMessageDelegate 接口的类:

The following listing shows a class that implements the TextMessageDelegate interface:

  • Java

  • Kotlin

public class DefaultTextMessageDelegate implements TextMessageDelegate {

	@Override
	public void receive(TextMessage message) {
		// ...
	}
}
class DefaultTextMessageDelegate : TextMessageDelegate {

	override fun receive(message: TextMessage) {
		// ...
	}
}

然后,相关 MessageListenerAdapter 的配置如下:

The configuration of the attendant MessageListenerAdapter would then be as follows:

  • Java

  • Kotlin

  • Xml

@Bean
MessageListenerAdapter messageListener(DefaultTextMessageDelegate messageDelegate) {
	MessageListenerAdapter messageListener = new MessageListenerAdapter(messageDelegate);
	messageListener.setDefaultListenerMethod("receive");
	// We don't want automatic message context extraction
	messageListener.setMessageConverter(null);
	return messageListener;
}
@Bean
fun messageListener(messageDelegate: DefaultTextMessageDelegate) = MessageListenerAdapter(messageDelegate).apply {
	setDefaultListenerMethod("receive")
	// We don't want automatic message context extraction
	setMessageConverter(null)
}
<bean id="messageListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
	<constructor-arg>
		<bean class="jmsexample.DefaultTextMessageDelegate"/>
	</constructor-arg>
	<property name="defaultListenerMethod" value="receive"/>
	<!-- we don't want automatic message context extraction -->
	<property name="messageConverter">
		<null/>
	</property>
</bean>

请注意,如果 messageListener 收到一个非 TextMessage 类型的 JMS Message,则将抛出 IllegalStateException(并随后被中止)。MessageListenerAdapter 类的另一个功能是,如果处理程序方法返回非空值,则可以自动发回一个响应 Message。考虑以下接口和类:

Note that, if the messageListener receives a JMS Message of a type other than TextMessage, an IllegalStateException is thrown (and subsequently swallowed). Another of the capabilities of the MessageListenerAdapter class is the ability to automatically send back a response Message if a handler method returns a non-void value. Consider the following interface and class:

  • Java

  • Kotlin

public interface ResponsiveTextMessageDelegate {

	// Notice the return type...
	String receive(TextMessage message);
}
interface ResponsiveTextMessageDelegate {

	// Notice the return type...
	fun receive(message: TextMessage): String
}
  • Java

  • Kotlin

public class DefaultResponsiveTextMessageDelegate implements ResponsiveTextMessageDelegate {

	@Override
	public String receive(TextMessage message) {
		return "message";
	}
}
class DefaultResponsiveTextMessageDelegate : ResponsiveTextMessageDelegate {

	override fun receive(message: TextMessage): String {
		return "message"
	}
}

如果你将 DefaultResponsiveTextMessageDelegateMessageListenerAdapter 结合使用,那么在执行 receive(..) 方法时,从执行中返回的任何非空值(在默认配置中)都会被转换为 TextMessage。然后,生成的 TextMessage 将发送到 JMS Reply-To 属性中定义的 Destination(如果存在)或对 MessageListenerAdapter 设置的默认 Destination(如果已配置)。如果没有找到 Destination,则会抛出 InvalidDestinationException(请注意,此异常不会被吞咽,并且会在调用堆栈中传播)。

If you use the DefaultResponsiveTextMessageDelegate in conjunction with a MessageListenerAdapter, any non-null value that is returned from the execution of the ’receive(..)'` method is (in the default configuration) converted into a TextMessage. The resulting TextMessage is then sent to the Destination (if one exists) defined in the JMS Reply-To property of the original Message or the default Destination set on the MessageListenerAdapter (if one has been configured). If no Destination is found, an InvalidDestinationException is thrown (note that this exception is not swallowed and propagates up the call stack).

Processing Messages Within Transactions

在事务中调用消息侦听器只会要求重新配置侦听器容器。

Invoking a message listener within a transaction requires only reconfiguration of the listener container.

可以通过侦听器容器定义中的 sessionTransacted 标志激活本地资源事务。然后,每个消息侦听器调用都在一个活动 JMS 事务内操作,在侦听器执行失败的情况下,消息接收会回滚。发送响应消息(通过 SessionAwareMessageListener)是同一本地事务的一部分,但任何其他资源操作(例如,数据库访问)都是独立操作的。这通常要求在侦听器实现中进行重复消息检测,以涵盖数据库处理已提交但消息处理未提交的情况。

You can activate local resource transactions through the sessionTransacted flag on the listener container definition. Each message listener invocation then operates within an active JMS transaction, with message reception rolled back in case of listener execution failure. Sending a response message (through SessionAwareMessageListener) is part of the same local transaction, but any other resource operations (such as database access) operate independently. This usually requires duplicate message detection in the listener implementation, to cover the case where database processing has committed but message processing failed to commit.

考虑以下 bean 定义:

Consider the following bean definition:

  • Java

  • Kotlin

  • Xml

@Bean
DefaultMessageListenerContainer jmsContainer(ConnectionFactory connectionFactory, Destination destination,
		ExampleListener messageListener) {

	DefaultMessageListenerContainer jmsContainer = new DefaultMessageListenerContainer();
	jmsContainer.setConnectionFactory(connectionFactory);
	jmsContainer.setDestination(destination);
	jmsContainer.setMessageListener(messageListener);
	jmsContainer.setSessionTransacted(true);
	return jmsContainer;
}
@Bean
fun jmsContainer(connectionFactory: ConnectionFactory, destination: Destination, messageListener: ExampleListener) =
	DefaultMessageListenerContainer().apply {
		setConnectionFactory(connectionFactory)
		setDestination(destination)
		setMessageListener(messageListener)
		isSessionTransacted = true
	}
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
	<property name="connectionFactory" ref="connectionFactory"/>
	<property name="destination" ref="destination"/>
	<property name="messageListener" ref="messageListener"/>
	<property name="sessionTransacted" value="true"/>
</bean>

若要参与外部管理事务,你需要配置一个事务管理器,并使用支持外部管理事务的侦听器容器(通常为 DefaultMessageListenerContainer)。

To participate in an externally managed transaction, you need to configure a transaction manager and use a listener container that supports externally managed transactions (typically, DefaultMessageListenerContainer).

若要为 XA 事务参与配置消息侦听器容器,你需要配置 JtaTransactionManager(默认情况下,它会委托给 Jakarta EE 服务器的事务子系统)。请注意,基础的 JMS ConnectionFactory 需要支持 XA 并且与你的 JTA 事务协调器正确注册。(检查你的 Jakarta EE 服务器对 JNDI 资源的配置。)这可以让消息接收以及(例如)数据库访问成为同一事务的一部分(具有统一的提交语义,以牺牲 XA 事务日志开销为代价)。

To configure a message listener container for XA transaction participation, you want to configure a JtaTransactionManager (which, by default, delegates to the Jakarta EE server’s transaction subsystem). Note that the underlying JMS ConnectionFactory needs to be XA-capable and properly registered with your JTA transaction coordinator. (Check your Jakarta EE server’s configuration of JNDI resources.) This lets message reception as well as (for example) database access be part of the same transaction (with unified commit semantics, at the expense of XA transaction log overhead).

以下 bean 定义创建了一个事务管理器:

The following bean definition creates a transaction manager:

  • Java

  • Kotlin

  • Xml

@Bean
JtaTransactionManager transactionManager()  {
	return new JtaTransactionManager();
}
@Bean
fun transactionManager() = JtaTransactionManager()
<bean id="transactionManager" class="org.springframework.transaction.jta.JtaTransactionManager"/>

然后,我们需要将其添加到我们早期的容器配置中。容器会处理其余部分。以下示例展示了如何执行此操作:

Then we need to add it to our earlier container configuration. The container takes care of the rest. The following example shows how to do so:

  • Java

  • Kotlin

  • Xml

@Bean
DefaultMessageListenerContainer jmsContainer(ConnectionFactory connectionFactory, Destination destination,
		ExampleListener messageListener) {

	DefaultMessageListenerContainer jmsContainer = new DefaultMessageListenerContainer();
	jmsContainer.setConnectionFactory(connectionFactory);
	jmsContainer.setDestination(destination);
	jmsContainer.setMessageListener(messageListener);
	jmsContainer.setSessionTransacted(true);
	return jmsContainer;
}
@Bean
fun jmsContainer(connectionFactory: ConnectionFactory, destination: Destination, messageListener: ExampleListener,
				 transactionManager: JtaTransactionManager) =
	DefaultMessageListenerContainer().apply {
		setConnectionFactory(connectionFactory)
		setDestination(destination)
		setMessageListener(messageListener)
		setTransactionManager(transactionManager)
	}
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
	<property name="connectionFactory" ref="connectionFactory"/>
	<property name="destination" ref="destination"/>
	<property name="messageListener" ref="messageListener"/>
	<property name="transactionManager" ref="transactionManager"/>
</bean>