Using Spring JMS

本部分描述如何使用 Spring 的 JMS 组件。

Using JmsTemplate

JmsTemplate 类是 JMS 核心包中的核心类。它简化了 JMS 的使用,因为它在发送或同步接收消息时处理资源的创建和释放。

使用 JmsTemplate 的代码只需实现回调接口,这些接口提供清晰定义的高级契约。MessageCreator 回调接口在给定 JmsTemplate 中调用代码提供的 Session 时创建消息。为了允许更复杂地使用 JMS API,SessionCallback 提供 JMS session,ProducerCallback 公开 SessionMessageProducer 对。

JMS API 公开了两种类型的发送方法,一种采用传输模式、优先级和生存时间作为服务质量 (QOS) 参数,而另一种不采用 QOS 参数并使用默认值。由于 JmsTemplate 具有许多发送方法,因此将 QOS 参数设置为了 bean 属性以避免发送方法数量的重复。同样,同步接收调用的超时值是通过使用 setReceiveTimeout 属性设置的。

一些 JMS 提供程序允许通过 ConnectionFactory 的配置以管理方式设置 QOS 默认值。这会产生这样的效果:对 MessageProducer 实例的 send 方法 (send(Destination destination, Message message)) 的调用使用不同于 JMS 规范中指定的 QOS 默认值。为了提供对 QOS 值的一致性管理,因此 JmsTemplate 必须明确启用以使用其自己的 QOS 值,方法是将布尔属性 isExplicitQosEnabled 设置为 true

为了方便,JmsTemplate 还公开了一个基本请求-应答操作,该操作允许发送消息并在创建为该操作一部分的临时队列上等待答复。

JmsTemplate 类的实例一旦进行配置,就是线程安全的。这一点很重要,因为它意味着您可以配置 JmsTemplate 的单个实例,然后可以安全地将此共享引用注入到多个协作者中。需要澄清的是,JmsTemplate 是有状态的,因为它保留对 ConnectionFactory 的引用,但此状态不是会话状态。

自 Spring Framework 4.1 起,JmsMessagingTemplate 基于 JmsTemplate 构建,并提供与消息抽象的集成,即 org.springframework.messaging.Message。这允许您以通用方式创建要发送的消息。

Connections

JmsTemplate 要求引用 ConnectionFactoryConnectionFactory 是 JMS 规范的一部分,是使用 JMS 的入口点。客户端应用程序将其用作工厂,以创建与 JMS 提供程序的连接,并封装各种配置参数,其中许多是特定于供应商的,例如 SSL 配置选项。

在 EJB 内部使用 JMS 时,供应商提供了 JMS 接口的实现,以便它们可以参与声明式事务管理并执行连接和 session 的池化。为了使用此实现,Jakarta EE 容器通常要求您在 EJB 或 servlet 部署描述符中将 JMS 连接工厂声明为 resource-ref。为了确保在 EJB 内将这些功能与 JmsTemplate 一起使用,客户端应用程序应确保它引用 ConnectionFactory 的托管实现。

Caching Messaging Resources

标准 API 涉及创建许多中间对象。要发送消息,将执行以下“API”遍历:

ConnectionFactory->Connection->Session->MessageProducer->send

ConnectionFactorySend 操作之间,将创建和销毁三个中间对象。为了优化资源使用并提高性能,Spring 提供了 ConnectionFactory 的两种实现。

Using SingleConnectionFactory

Spring 提供了 ConnectionFactory 接口的一个实现 SingleConnectionFactory,它在所有 createConnection() 调用上返回相同的 Connection 并忽略对 close() 的调用。这对于测试和独立环境很有用,这样同一个连接可以用于可能跨越任意数量的事务的多个 JmsTemplate 调用。SingleConnectionFactory 引用一个通常来自 JNDI 的标准 ConnectionFactory

Using CachingConnectionFactory

CachingConnectionFactory 扩展了 SingleConnectionFactory 的功能,并增加了 SessionMessageProducerMessageConsumer 实例的缓存。初始缓存大小设置为 1。您可以使用 sessionCacheSize 属性来增加已缓存的 session 的数量。请注意,实际缓存的 session 数量多于该数量,因为 session 是根据它们的确认模式进行缓存的,因此当 sessionCacheSize 设置为 1 时,可能有多达四个已缓存的 session 实例(每个确认模式一个)。MessageProducerMessageConsumer 实例在它们自己的 session 内被缓存,并且在缓存时还考虑 producers 和 consumers 的唯一属性。MessageProducer 是根据其 destination 缓存的。MessageConsumer 根据由 destination、selector、noLocal 传递标志和持久订阅名称(如果创建持久订阅者)组成的键进行缓存。

临时队列和主题(TemporaryQueue/TemporaryTopic)的 MessageProducer 和 MessageConsumer 永远不会被缓存。不幸的是,WebLogic JMS 恰好在其常规 destination 实现上实现了临时队列/主题接口,错误地指示其没有任何 destination 可以被缓存。请在 WebLogic 上使用不同的连接池/缓存,或针对 WebLogic 目的自定义 CachingConnectionFactory

Destination Management

destinations 作为 ConnectionFactory 实例,是您可以存储在 JNDI 中并从 JNDI 中检索的 JMS 管理对象。在配置 Spring 应用程序上下文时,您可以使用 JNDI JndiObjectFactoryBean 工厂类或 <jee:jndi-lookup> 对您对象的 JMS destination 引用执行依赖项注入。但是,如果应用程序中有很多 destination 或如果有特定于 JMS 提供程序的高级 destination 管理功能,此策略通常很麻烦。此类高级 destination 管理的示例包括创建动态 destination 或支持 destination 的分层命名空间。JmsTemplate 将 destination 名称的解析委托给实现 DestinationResolver 接口的 JMS destination 对象。DynamicDestinationResolverJmsTemplate 使用的默认实现,并适应解析动态 destination。还提供了 JndiDestinationResolver,作为 JNDI 中包含的 destination 的服务定位器,并可选地回退到 DynamicDestinationResolver 中包含的行为。

通常,在 JMS 应用程序中使用的 destination 仅在运行时才知道,因此当应用程序部署时无法通过管理方式创建 destination。这通常是因为交互系统组件之间的应用程序逻辑是共享的,这些组件根据公知的命名约定在运行时创建 destination。即使动态 destination 的创建不是 JMS 规范的一部分,大多数供应商也提供了此功能。动态 destination 是用用户定义的名称创建的,这使它们与临时 destination 区分开来,并且通常在 JNDI 中未注册。用于创建动态 destination 的 API 因供应商而异,因为与 destination 关联的属性是特定于供应商的。但是,供应商有时会做出的一个简单的实现选择是不理会 JMS 规范中的警告,并使用 TopicSession createTopic(String topicName) 方法或 QueueSession createQueue(StringqueueName) 方法创建具有默认 destination 属性的新 destination。根据供应商实现,DynamicDestinationResolver 还可以创建物理 destination,而不仅仅是解析一个 destination。

布尔属性 pubSubDomain 用于配置 JmsTemplate,使其知道正在使用的 JMS 域。默认情况下,此属性的值为 false,表示使用点对点域 Queues。此属性(由 JmsTemplate 使用)通过 DestinationResolver 接口的实现来确定动态 destination 解析的行为。

您还可以通过 defaultDestination 属性为 JmsTemplate 配置默认 destination。默认 destination 用于不引用特定 destination 的发送和接收操作。

Message Listener Containers

在 EJB 领域中,JMS 消息最常见的用途之一是驱动消息驱动的 bean(MDB)。Spring 提供了一种解决方案:创建消息驱动的 POJO(MDP),而无需将用户绑定到 EJB 容器。(有关 Spring 的 MDP 支持的详细介绍,请参阅 Asynchronous reception: Message-Driven POJOs)。自 Spring Framework 4.1 以来,端点方法可以使用 `@JmsListener`进行注释——有关更多详细信息,请参阅 Annotation-driven Listener Endpoints

消息侦听器容器用于从 JMS 消息队列接收消息并驱动注入其中的 MessageListener。侦听器容器负责消息接收的所有线程处理,并分派到侦听器进行处理。消息侦听器容器是 MDP 与消息传递提供程序之间的中介,负责注册接收消息、参与事务、资源获取和释放、异常转换等。这使您可以编写与接收消息(以及可能对其进行响应)关联的(可能复杂的)业务逻辑,并将样板 JMS 基础设施问题委派给框架。

Spring 中打包装了两个标准 JMS 消息监听器容器的集合,每个容器都有其专业的特性集。

Using SimpleMessageListenerContainer

这种消息监听器容器是两种标准形式中较简单的,在启动时会创建固定数量的 JMS 会话和使用者,使用标准 JMS MessageConsumer.setMessageListener 方法注册该监听器,并由 JMS 提供者来执行监听器回调。此变量不允许针对运行时需求进行动态调整,也不允许参与外部管理的事务。从兼容性方面来看,它非常符合独立 JMS 规范的精神,但通常不兼容 Jakarta EE 的 JMS 限制。

虽然 SimpleMessageListenerContainer 不允许参与外部管理的事务,但它确实支持本机 JMS 事务。要启用此功能,您可以将 sessionTransacted 标志切换为 true,或在 XML 命名空间中,将 acknowledge 属性设置为 transacted。从侦听器触发的异常随后会导致回滚,消息将被重新传递。或者,可以考虑使用 CLIENT_ACKNOWLEDGE 模式,它在异常的情况下也会提供重新传递,但不会使用已处理的 Session 实例,因此不会在事务协议中包括任何其他 Session 操作(例如发送响应消息)。

默认 AUTO_ACKNOWLEDGE 模式不能提供适当的可靠性保证。当侦听器执行失败(因为提供程序在侦听器调用后自动确认每个消息,没有异常传播给提供程序)或侦听器容器关闭时(您可以通过设置 acceptMessagesWhileStopping 标志来配置此操作),消息可能会丢失。在需要可靠性的情况下,请务必使用已处理会话(例如,对于可靠队列处理和持久主题订阅)。

Using DefaultMessageListenerContainer

这种消息监听器容器在大多数情况下使用。与 SimpleMessageListenerContainer 相比,此容器变量允许根据运行时需求进行动态调整,并且能够参与外部管理的事务。在使用 JtaTransactionManager 配置时,每条接收到的消息都会用 XA 事务进行注册。为此,消息处理可以利用 XA 事务语义。此监听器容器很好地平衡了对 JMS 提供者的低要求、高级功能(如参与外部管理的事务)和与 Jakarta EE 环境的兼容性。

你可以自定义容器的缓存级别。请注意,在未启用缓存时,会为每次消息接收创建新的连接和会话。在高负载下将此与非持久订阅相结合可能导致消息丢失。在这种情况下,请务必使用适当的缓存级别。

当代理中断时,此容器还具有可恢复功能。默认情况下,一个简单的 BackOff 实现每五秒重试一次。你可以指定一个自定义 BackOff 实现来获得更细粒度的恢复选项。有关示例,请参见 ExponentialBackOff.

与兄弟项(SimpleMessageListenerContainer)一样,DefaultMessageListenerContainer 支持本机 JMS 事务,并允许自定义确认模式。如果对你的方案可行,则强烈建议使用此模式,而不是外部管理的事务,也就是说,如果你可以容忍在 JVM 终止时偶尔出现重复的消息。你的业务逻辑中的自定义重复消息检测步骤可以涵盖此类情况,例如,以业务实体存在性检查或协议表格检查的形式。任何此类安排都比以下替代方案更有效:使用 XA 事务(通过使用 JtaTransactionManager 配置 DefaultMessageListenerContainer)包装你的整个处理过程,以涵盖 JMS 消息的接收以及你的消息侦听器中业务逻辑的执行(包括数据库操作等)。

默认 AUTO_ACKNOWLEDGE 模式不能提供适当的可靠性保证。当侦听器执行失败(因为提供程序在侦听器调用后自动确认每个消息,没有异常传播给提供程序)或侦听器容器关闭时(您可以通过设置 acceptMessagesWhileStopping 标志来配置此操作),消息可能会丢失。在需要可靠性的情况下,请务必使用已处理会话(例如,对于可靠队列处理和持久主题订阅)。

Transaction Management

Spring 提供了一个 JmsTransactionManager,它管理一个 JMS ConnectionFactory 的事务。这使得 JMS 应用程序能够利用 Spring 的管理事务功能,如数据访问章节中的事务管理章节中所述。JmsTransactionManager 执行本地资源事务,将 JMS 连接/会话对从特定的 ConnectionFactory 绑定到该线程。JmsTemplate 可以自动检测到此类事务性资源,并根据需要对它们进行操作。

在 Jakarta EE 环境中,ConnectionFactory 将池化连接和会话实例,以便在事务中有效地重复使用这些资源。在独立环境中,使用 Spring 的 SingleConnectionFactory 会产生一个共享的 JMS 连接,并且每个事务都有其自己的独立会话。或者,考虑使用特定于提供者的池化适配器,如 ActiveMQ 的 PooledConnectionFactory 类。

你还可以使用 JmsTemplate 与 JtaTransactionManager 和支持 XA 的 JMS ConnectionFactory 来执行分布式事务。请注意,这需要使用 JTA 事务管理器以及正确配置的 XA 的 ConnectionFactory。(请查看 Jakarta EE 服务器或 JMS 提供商的文档。)

在使用 JMS API 从 Connection 中创建 Session 时,在托管和非托管的事务环境中重复使用代码可能会令人感到困惑。这是因为 JMS API 只有一个工厂方法可以创建会话,并且它需要事务和确认模式的值。在托管环境中,设置这些值是环境事务基础架构的责任,因此这些值会被供应商对 JMS 连接的包装程序忽略。当你在非托管环境中使用 JmsTemplate 时,你可以通过使用属性 sessionTransacted 和 sessionAcknowledgeMode 来指定这些值。当使用带有 JmsTemplate 的 PlatformTransactionManager 时,模板总是会获得一个事务性的 JMS 会话。