Annotation-driven Listener Endpoints

接收消息的最简单方法是使用带注释的侦听器终端基础架构。简而言之,它允许你将托管 Bean 的方法显示为 JMS 侦听器终端。以下示例展示了如何使用该方法:

@Component
public class MyService {

	@JmsListener(destination = "myDestination")
	public void processOrder(String data) { ... }
}

前一个示例的想法是,每当在 jakarta.jms.Destination myDestination 上有消息可用时,processOrder 方法都将被相应地调用(在这种情况下,使用 JMS 消息的内容,类似于 MessageListenerAdapter 提供的内容)。 带注释的终端基础架构会利用 JmsListenerContainerFactory 为每个带注释方法在幕后创建一个消息侦听器容器。不会针对应用程序上下文注册此类容器,但可轻松地使用 JmsListenerEndpointRegistry Bean 找到该容器以用于管理目的。

@JmsListener 是 Java 8 中的可重复注释,因此你可以通过向其添加额外的 @JmsListener 声明来将多个 JMS 目标地址与相同的方法关联起来。

Enable Listener Endpoint Annotations

若要启用对 @JmsListener 注释的支持,可在你的一个 @Configuration 类中添加 @EnableJms,如下面的示例所示:

  • Java

  • Kotlin

  • Xml

@Configuration
@EnableJms
public class JmsConfiguration {

	@Bean
	public DefaultJmsListenerContainerFactory jmsListenerContainerFactory(ConnectionFactory connectionFactory,
			DestinationResolver destinationResolver) {

		DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
		factory.setConnectionFactory(connectionFactory);
		factory.setDestinationResolver(destinationResolver);
		factory.setSessionTransacted(true);
		factory.setConcurrency("3-10");
		return factory;
	}
}
@Configuration
@EnableJms
class JmsConfiguration {

	@Bean
	fun jmsListenerContainerFactory(connectionFactory: ConnectionFactory, destinationResolver: DestinationResolver) =
		DefaultJmsListenerContainerFactory().apply {
			setConnectionFactory(connectionFactory)
			setDestinationResolver(destinationResolver)
			setSessionTransacted(true)
			setConcurrency("3-10")
		}
}
<jms:annotation-driven/>

<bean id="jmsListenerContainerFactory" class="org.springframework.jms.config.DefaultJmsListenerContainerFactory">
	<property name="connectionFactory" ref="connectionFactory"/>
	<property name="destinationResolver" ref="destinationResolver"/>
	<property name="sessionTransacted" value="true"/>
	<property name="concurrency" value="3-10"/>
</bean>

默认情况下,基础架构会查找名为 jmsListenerContainerFactory 的 Bean 作为用于创建消息侦听器容器的工厂的源。在这种情况下(并忽略了 JMS 基础架构设置),你可以调用 processOrder 方法,使用三个线程作为核心池大小,十个线程作为最大池大小。

你可以自定义侦听器容器工厂以便对每个注释使用,或者可以通过实现 JmsListenerConfigurer`接口来配置一个明确的默认值。只有在至少一个端点未注册为特定容器工厂时才需要默认值。请参阅实现 `JmsListenerConfigurer 类的 javadoc 以获取详细信息和示例。

Programmatic Endpoint Registration

JmsListenerEndpoint 提供了 JMS 终端的模型,并负责为该模型配置容器。除通过 JmsListener 注释检测到的终端外,基础架构还允许你以编程方式配置终端。以下示例展示了如何执行此操作:

@Configuration
@EnableJms
public class AppConfig implements JmsListenerConfigurer {

	@Override
	public void configureJmsListeners(JmsListenerEndpointRegistrar registrar) {
		SimpleJmsListenerEndpoint endpoint = new SimpleJmsListenerEndpoint();
		endpoint.setId("myJmsEndpoint");
		endpoint.setDestination("anotherQueue");
		endpoint.setMessageListener(message -> {
			// processing
		});
		registrar.registerEndpoint(endpoint);
	}
}

在前一个示例中,我们使用了 SimpleJmsListenerEndpoint,它提供了要调用的实际 MessageListener。不过,你也可以构建自己的终端变量,以描述自定义调用机制。

请注意,你可以完全跳过使用 @JmsListener,并只通过 JmsListenerConfigurer 以编程方式注册终端。

Annotated Endpoint Method Signature

到目前为止我们一直在终端注入一个简单的 String,但实际上它可以有非常灵活的方法签名。在以下示例中,我们重写它以注入具有自定义标头的 Order

@Component
public class MyService {

	@JmsListener(destination = "myDestination")
	public void processOrder(Order order, @Header("order_type") String orderType) {
		...
	}
}

可在 JMS 侦听器终端注入的主要元素如下:

  • 原始 jakarta.jms.Message 或其任何子类(只要它与传入消息类型匹配即可)。

  • jakarta.jms.Session 用于可选择访问本机 JMS API(例如,用于发送自定义回复)。

  • org.springframework.messaging.Message 表示传入 JMS 消息。请注意,此消息同时包含自定义标头和标准标头(如由 JmsHeaders 定义)。

  • @Header 注释的方法参数,用于提取特定标头值,包括标准 JMS 标头。

  • @Headers 注释的参数,它还必须可分配给 java.util.Map 以便忘记对所有标头的访问。

  • 未注释的非受支持类型元素 (MessageSession) 被视为有效负载。你可以通过使用 @Payload 注释参数来明确表示。你还可以通过添加额外的 @Valid 来启用验证。

注入 Spring 的 Message 抽象的能力特别有用,你可以利用其从传输特定消息中存储的所有信息中受益,而无需依赖传输特定 API。以下示例展示了如何执行此操作:

@JmsListener(destination = "myDestination")
public void processOrder(Message<Order> order) { ... }

对方法参数的处理由 DefaultMessageHandlerMethodFactory 提供,你可以进一步自定义它以支持其他方法参数。你也可以在其中自定义转换和验证支持。

例如,如果我们希望确保在处理 Order 之前它有效,我们可以用 @Valid 注释有效负载,并配置必要的验证器,如下面的示例所示:

@Configuration
@EnableJms
public class AppConfig implements JmsListenerConfigurer {

	@Override
	public void configureJmsListeners(JmsListenerEndpointRegistrar registrar) {
		registrar.setMessageHandlerMethodFactory(myJmsHandlerMethodFactory());
	}

	@Bean
	public DefaultMessageHandlerMethodFactory myHandlerMethodFactory() {
		DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
		factory.setValidator(myValidator());
		return factory;
	}
}

Response Management

MessageListenerAdapter 中现有的支持已经可以让你的方法具有非-void 返回类型。在那种情况下,调用的结果将封装在 jakarta.jms.Message 中,发送到在原始消息的 JMSReplyTo 头中指定的目标或监听器上配置的默认目标中。你现在可以使用消息抽象的 @SendTo 注解来设置该默认目标。

假设我们的 processOrder 方法现在应返回 OrderStatus,我们能够编写它来自动发送响应,如下面的示例所示:

@JmsListener(destination = "myDestination")
@SendTo("status")
public OrderStatus processOrder(Order order) {
	// order processing
	return status;
}

如果你有多个 @JmsListener 注释的方法,你还可以将 @SendTo 注释放在类级别,以便共享一个默认的回复目标地址。

如果你需要以与传输无关的方式设置其他标头,则可以使用类似于以下方法的方法返回 Message

@JmsListener(destination = "myDestination")
@SendTo("status")
public Message<OrderStatus> processOrder(Order order) {
	// order processing
	return MessageBuilder
			.withPayload(status)
			.setHeader("code", 1234)
			.build();
}

如果你需要在运行时计算响应目标,则可以通过将响应封装在 JmsResponse 实例中来完成该操作,该实例还提供了将在运行时使用的目标。我们可以按如下方式重写前面的示例:

@JmsListener(destination = "myDestination")
public JmsResponse<Message<OrderStatus>> processOrder(Order order) {
	// order processing
	Message<OrderStatus> response = MessageBuilder
			.withPayload(status)
			.setHeader("code", 1234)
			.build();
	return JmsResponse.forQueue(response, "status");
}

最后,如果你需要为响应指定一些 QoS 值(例如优先级或生存时间),则可以相应地配置 JmsListenerContainerFactory,如下面的示例所示:

@Configuration
@EnableJms
public class AppConfig {

	@Bean
	public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
		DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
		factory.setConnectionFactory(connectionFactory());
		QosSettings replyQosSettings = new QosSettings();
		replyQosSettings.setPriority(2);
		replyQosSettings.setTimeToLive(10000);
		factory.setReplyQosSettings(replyQosSettings);
		return factory;
	}
}