Annotation-driven Listener Endpoints

接收消息的最简单方法是使用带注释的侦听器终端基础架构。简而言之,它允许你将托管 Bean 的方法显示为 JMS 侦听器终端。以下示例展示了如何使用该方法:

The easiest way to receive a message asynchronously is to use the annotated listener endpoint infrastructure. In a nutshell, it lets you expose a method of a managed bean as a JMS listener endpoint. The following example shows how to use it:

@Component
public class MyService {

	@JmsListener(destination = "myDestination")
	public void processOrder(String data) { ... }
}

前一个示例的想法是,每当在 jakarta.jms.Destination myDestination 上有消息可用时,processOrder 方法都将被相应地调用(在这种情况下,使用 JMS 消息的内容,类似于 MessageListenerAdapter 提供的内容)。

The idea of the preceding example is that, whenever a message is available on the jakarta.jms.Destination myDestination, the processOrder method is invoked accordingly (in this case, with the content of the JMS message, similar to what the MessageListenerAdapter provides).

带注释的终端基础架构会利用 JmsListenerContainerFactory 为每个带注释方法在幕后创建一个消息侦听器容器。不会针对应用程序上下文注册此类容器,但可轻松地使用 JmsListenerEndpointRegistry Bean 找到该容器以用于管理目的。

The annotated endpoint infrastructure creates a message listener container behind the scenes for each annotated method, by using a JmsListenerContainerFactory. Such a container is not registered against the application context but can be easily located for management purposes by using the JmsListenerEndpointRegistry bean.

@JmsListener 是 Java 8 中的可重复注释,因此你可以通过向其添加额外的 @JmsListener 声明来将多个 JMS 目标地址与相同的方法关联起来。

@JmsListener is a repeatable annotation on Java 8, so you can associate several JMS destinations with the same method by adding additional @JmsListener declarations to it.

Enable Listener Endpoint Annotations

若要启用对 @JmsListener 注释的支持,可在你的一个 @Configuration 类中添加 @EnableJms,如下面的示例所示:

To enable support for @JmsListener annotations, you can add @EnableJms to one of your @Configuration classes, as the following example shows:

  • Java

  • Kotlin

  • Xml

@Configuration
@EnableJms
public class JmsConfiguration {

	@Bean
	public DefaultJmsListenerContainerFactory jmsListenerContainerFactory(ConnectionFactory connectionFactory,
			DestinationResolver destinationResolver) {

		DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
		factory.setConnectionFactory(connectionFactory);
		factory.setDestinationResolver(destinationResolver);
		factory.setSessionTransacted(true);
		factory.setConcurrency("3-10");
		return factory;
	}
}
@Configuration
@EnableJms
class JmsConfiguration {

	@Bean
	fun jmsListenerContainerFactory(connectionFactory: ConnectionFactory, destinationResolver: DestinationResolver) =
		DefaultJmsListenerContainerFactory().apply {
			setConnectionFactory(connectionFactory)
			setDestinationResolver(destinationResolver)
			setSessionTransacted(true)
			setConcurrency("3-10")
		}
}
<jms:annotation-driven/>

<bean id="jmsListenerContainerFactory" class="org.springframework.jms.config.DefaultJmsListenerContainerFactory">
	<property name="connectionFactory" ref="connectionFactory"/>
	<property name="destinationResolver" ref="destinationResolver"/>
	<property name="sessionTransacted" value="true"/>
	<property name="concurrency" value="3-10"/>
</bean>

默认情况下,基础架构会查找名为 jmsListenerContainerFactory 的 Bean 作为用于创建消息侦听器容器的工厂的源。在这种情况下(并忽略了 JMS 基础架构设置),你可以调用 processOrder 方法,使用三个线程作为核心池大小,十个线程作为最大池大小。

By default, the infrastructure looks for a bean named jmsListenerContainerFactory as the source for the factory to use to create message listener containers. In this case (and ignoring the JMS infrastructure setup), you can invoke the processOrder method with a core pool size of three threads and a maximum pool size of ten threads.

你可以自定义侦听器容器工厂以便对每个注释使用,或者可以通过实现 JmsListenerConfigurer`接口来配置一个明确的默认值。只有在至少一个端点未注册为特定容器工厂时才需要默认值。请参阅实现 `JmsListenerConfigurer 类的 javadoc 以获取详细信息和示例。

You can customize the listener container factory to use for each annotation or you can configure an explicit default by implementing the JmsListenerConfigurer interface. The default is required only if at least one endpoint is registered without a specific container factory. See the javadoc of classes that implement JmsListenerConfigurer for details and examples.

Programmatic Endpoint Registration

JmsListenerEndpoint 提供了 JMS 终端的模型,并负责为该模型配置容器。除通过 JmsListener 注释检测到的终端外,基础架构还允许你以编程方式配置终端。以下示例展示了如何执行此操作:

JmsListenerEndpoint provides a model of a JMS endpoint and is responsible for configuring the container for that model. The infrastructure lets you programmatically configure endpoints in addition to the ones that are detected by the JmsListener annotation. The following example shows how to do so:

@Configuration
@EnableJms
public class AppConfig implements JmsListenerConfigurer {

	@Override
	public void configureJmsListeners(JmsListenerEndpointRegistrar registrar) {
		SimpleJmsListenerEndpoint endpoint = new SimpleJmsListenerEndpoint();
		endpoint.setId("myJmsEndpoint");
		endpoint.setDestination("anotherQueue");
		endpoint.setMessageListener(message -> {
			// processing
		});
		registrar.registerEndpoint(endpoint);
	}
}

在前一个示例中,我们使用了 SimpleJmsListenerEndpoint,它提供了要调用的实际 MessageListener。不过,你也可以构建自己的终端变量,以描述自定义调用机制。

In the preceding example, we used SimpleJmsListenerEndpoint, which provides the actual MessageListener to invoke. However, you could also build your own endpoint variant to describe a custom invocation mechanism.

请注意,你可以完全跳过使用 @JmsListener,并只通过 JmsListenerConfigurer 以编程方式注册终端。

Note that you could skip the use of @JmsListener altogether and programmatically register only your endpoints through JmsListenerConfigurer.

Annotated Endpoint Method Signature

到目前为止我们一直在终端注入一个简单的 String,但实际上它可以有非常灵活的方法签名。在以下示例中,我们重写它以注入具有自定义标头的 Order

So far, we have been injecting a simple String in our endpoint, but it can actually have a very flexible method signature. In the following example, we rewrite it to inject the Order with a custom header:

@Component
public class MyService {

	@JmsListener(destination = "myDestination")
	public void processOrder(Order order, @Header("order_type") String orderType) {
		...
	}
}

可在 JMS 侦听器终端注入的主要元素如下:

The main elements you can inject in JMS listener endpoints are as follows:

  • The raw jakarta.jms.Message or any of its subclasses (provided that it matches the incoming message type).

  • The jakarta.jms.Session for optional access to the native JMS API (for example, for sending a custom reply).

  • The org.springframework.messaging.Message that represents the incoming JMS message. Note that this message holds both the custom and the standard headers (as defined by JmsHeaders).

  • @Header-annotated method arguments to extract a specific header value, including standard JMS headers.

  • A @Headers-annotated argument that must also be assignable to java.util.Map for getting access to all headers.

  • A non-annotated element that is not one of the supported types (Message or Session) is considered to be the payload. You can make that explicit by annotating the parameter with @Payload. You can also turn on validation by adding an extra @Valid.

注入 Spring 的 Message 抽象的能力特别有用,你可以利用其从传输特定消息中存储的所有信息中受益,而无需依赖传输特定 API。以下示例展示了如何执行此操作:

The ability to inject Spring’s Message abstraction is particularly useful to benefit from all the information stored in the transport-specific message without relying on transport-specific API. The following example shows how to do so:

@JmsListener(destination = "myDestination")
public void processOrder(Message<Order> order) { ... }

对方法参数的处理由 DefaultMessageHandlerMethodFactory 提供,你可以进一步自定义它以支持其他方法参数。你也可以在其中自定义转换和验证支持。

Handling of method arguments is provided by DefaultMessageHandlerMethodFactory, which you can further customize to support additional method arguments. You can customize the conversion and validation support there as well.

例如,如果我们希望确保在处理 Order 之前它有效,我们可以用 @Valid 注释有效负载,并配置必要的验证器,如下面的示例所示:

For instance, if we want to make sure our Order is valid before processing it, we can annotate the payload with @Valid and configure the necessary validator, as the following example shows:

@Configuration
@EnableJms
public class AppConfig implements JmsListenerConfigurer {

	@Override
	public void configureJmsListeners(JmsListenerEndpointRegistrar registrar) {
		registrar.setMessageHandlerMethodFactory(myJmsHandlerMethodFactory());
	}

	@Bean
	public DefaultMessageHandlerMethodFactory myHandlerMethodFactory() {
		DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
		factory.setValidator(myValidator());
		return factory;
	}
}

Response Management

MessageListenerAdapter 中现有的支持已经可以让你的方法具有非-void 返回类型。在那种情况下,调用的结果将封装在 jakarta.jms.Message 中,发送到在原始消息的 JMSReplyTo 头中指定的目标或监听器上配置的默认目标中。你现在可以使用消息抽象的 @SendTo 注解来设置该默认目标。

The existing support in MessageListenerAdapter already lets your method have a non-void return type. When that is the case, the result of the invocation is encapsulated in a jakarta.jms.Message, sent either in the destination specified in the JMSReplyTo header of the original message or in the default destination configured on the listener. You can now set that default destination by using the @SendTo annotation of the messaging abstraction.

假设我们的 processOrder 方法现在应返回 OrderStatus,我们能够编写它来自动发送响应,如下面的示例所示:

Assuming that our processOrder method should now return an OrderStatus, we can write it to automatically send a response, as the following example shows:

@JmsListener(destination = "myDestination")
@SendTo("status")
public OrderStatus processOrder(Order order) {
	// order processing
	return status;
}

如果你有多个 @JmsListener 注释的方法,你还可以将 @SendTo 注释放在类级别,以便共享一个默认的回复目标地址。

If you have several @JmsListener-annotated methods, you can also place the @SendTo annotation at the class level to share a default reply destination.

如果你需要以与传输无关的方式设置其他标头,则可以使用类似于以下方法的方法返回 Message

If you need to set additional headers in a transport-independent manner, you can return a Message instead, with a method similar to the following:

@JmsListener(destination = "myDestination")
@SendTo("status")
public Message<OrderStatus> processOrder(Order order) {
	// order processing
	return MessageBuilder
			.withPayload(status)
			.setHeader("code", 1234)
			.build();
}

如果你需要在运行时计算响应目标,则可以通过将响应封装在 JmsResponse 实例中来完成该操作,该实例还提供了将在运行时使用的目标。我们可以按如下方式重写前面的示例:

If you need to compute the response destination at runtime, you can encapsulate your response in a JmsResponse instance that also provides the destination to use at runtime. We can rewrite the previous example as follows:

@JmsListener(destination = "myDestination")
public JmsResponse<Message<OrderStatus>> processOrder(Order order) {
	// order processing
	Message<OrderStatus> response = MessageBuilder
			.withPayload(status)
			.setHeader("code", 1234)
			.build();
	return JmsResponse.forQueue(response, "status");
}

最后,如果你需要为响应指定一些 QoS 值(例如优先级或生存时间),则可以相应地配置 JmsListenerContainerFactory,如下面的示例所示:

Finally, if you need to specify some QoS values for the response such as the priority or the time to live, you can configure the JmsListenerContainerFactory accordingly, as the following example shows:

@Configuration
@EnableJms
public class AppConfig {

	@Bean
	public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
		DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
		factory.setConnectionFactory(connectionFactory());
		QosSettings replyQosSettings = new QosSettings();
		replyQosSettings.setPriority(2);
		replyQosSettings.setTimeToLive(10000);
		factory.setReplyQosSettings(replyQosSettings);
		return factory;
	}
}