JMS Support

Outbound Gateway, 出站网关, reply-destination, reply-destination-name, reply-channe, async, idle-reply-listener-timeout, reply-listern, correlation-key, JMSMessageID, JMSCorrelationID, jms_correlationId

Spring Integration 为接收和发送 JMS 消息提供通道适配器。

Spring Integration provides channel adapters for receiving and sending JMS messages.

你需要将此依赖项包含在你的项目中:

You need to include this dependency into your project:

  • Maven

  • Gradle

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-jms</artifactId>
    <version>{project-version}</version>
</dependency>
compile "org.springframework.integration:spring-integration-jms:{project-version}"

必须通过某些 JMS 厂商特定的实现明确添加“jakarta.jms:jakarta.jms-api”,例如 Apache ActiveMQ。

The jakarta.jms:jakarta.jms-api must be added explicitly via some JMS vendor-specific implementation, e.g. Apache ActiveMQ.

实际上有两个基于 JMS 的入站通道适配器。第一个基于轮询周期使用 Spring 的“JmsTemplate”来接收。第二个为“message-driven”,并依赖于 Spring 的“MessageListener”容器。出站通道适配器使用“JmsTemplate”按需转换并发送一个 JMS 消息。

There are actually two JMS-based inbound Channel Adapters. The first uses Spring’s JmsTemplate to receive based on a polling period. The second is “message-driven” and relies on a Spring MessageListener container. The outbound channel adapter uses the JmsTemplate to convert and send a JMS message on demand.

通过使用 JmsTemplateMessageListener 容器,Spring Integration 依赖于 Spring 的 JMS 支持。这一点很重要,因为在这些适配器上公开的大多数属性都会配置底层 JmsTemplateMessageListener 容器。有关 JmsTemplateMessageListener 容器的更多详细信息,请参阅 Spring JMS documentation

By using JmsTemplate and the MessageListener container, Spring Integration relies on Spring’s JMS support. This is important to understand, since most of the attributes exposed on these adapters configure the underlying JmsTemplate and MessageListener container. For more details about JmsTemplate and the MessageListener container, see the Spring JMS documentation.

虽然 JMS 通道适配器专用于单向消息传递(仅发送或仅接收),但 Spring Integration 也提供用于请求和答复操作的入站和出站 JMS 网关。入站网关依赖于 Spring 的 MessageListener 容器实现之一进行消息驱动的接收。它还能够将返回值发送到 reply-to 目标,如接收到的消息所提供。出站网关向 request-destination(或 request-destination-namerequest-destination-expression)发送 JMS 消息,然后接收答复消息。您可以显式配置 reply-destination 引用(或 reply-destination-namereply-destination-expression)。否则,出站网关将使用 JMS TemporaryQueue

Whereas the JMS channel adapters are intended for unidirectional messaging (send-only or receive-only), Spring Integration also provides inbound and outbound JMS Gateways for request and reply operations. The inbound gateway relies on one of Spring’s MessageListener container implementations for message-driven reception. It is also capable of sending a return value to the reply-to destination, as provided by the received message. The outbound gateway sends a JMS message to a request-destination (or request-destination-name or request-destination-expression) and then receives a reply message. You can explicitly configure the reply-destination reference (or reply-destination-name or reply-destination-expression). Otherwise, the outbound gateway uses a JMS TemporaryQueue.

在 Spring Integration 2.2 之前,如果有必要,会为每个请求或答复创建一个“TemporaryQueue”(并将其移除)。从 Spring Integration 2.2 开始,您可以配置出站网关以使用“MessageListener”容器来接收答复,而不是直接使用一个新的(或缓存的)“Consumer”来接收每个请求的答复。如果这样配置且没有提供明确的答复目标,则会为每个网关使用一个“TemporaryQueue”,而不是为每个请求使用一个。

Prior to Spring Integration 2.2, if necessary, a TemporaryQueue was created (and removed) for each request or reply. Beginning with Spring Integration 2.2, you can configure the outbound gateway to use a MessageListener container to receive replies instead of directly using a new (or cached) Consumer to receive the reply for each request. When so configured, and no explicit reply destination is provided, a single TemporaryQueue is used for each gateway instead of one for each request.

从版本 6.0 开始,如果将“replyPubSubDomain”选项设置为“true”,出站网关将创建一个“TemporaryTopic”而不是“TemporaryQueue”。一些 JMS 厂商以不同方式处理这些目标。

Starting with version 6.0, the outbound gateway creates a TemporaryTopic instead of TemporaryQueue if replyPubSubDomain option is set to true. Some JMS vendors handle these destinations differently.

Inbound Channel Adapter

入站通道适配器需要一个指向一个“JmsTemplate”实例的引用或指向一个“ConnectionFactory”和一个“Destination”的引用(您可以提供“destinationName”来替代“destination”引用)。以下示例定义了一个具有“Destination”引用的入站通道适配器:

The inbound channel adapter requires a reference to either a single JmsTemplate instance or both a ConnectionFactory and a Destination (you can provide a 'destinationName' in place of the 'destination' reference). The following example defines an inbound channel adapter with a Destination reference:

  • Java DSL

  • Kotlin DSL

  • Java

  • XML

@Bean
public IntegrationFlow jmsInbound(ConnectionFactory connectionFactory) {
    return IntegrationFlow.from(
                    Jms.inboundAdapter(connectionFactory)
                       .destination("inQueue"),
                    e -> e.poller(poller -> poller.fixedRate(30000)))
            .handle(m -> System.out.println(m.getPayload()))
            .get();
}
@Bean
fun jmsInbound(connectionFactory: ConnectionFactory) =
    integrationFlow(
            Jms.inboundAdapter(connectionFactory).destination("inQueue"),
            { poller { Pollers.fixedRate(30000) } })
       {
            handle { m -> println(m.payload) }
       }
@Bean
@InboundChannelAdapter(value = "exampleChannel", poller = @Poller(fixedRate = "30000"))
public MessageSource<Object> jmsIn(ConnectionFactory connectionFactory) {
    JmsDestinationPollingSource source = new JmsDestinationPollingSource(new JmsTemplate(connectionFactory));
    source.setDestinationName("inQueue");
    return source;
}
<int-jms:inbound-channel-adapter id="jmsIn" destination="inQueue" channel="exampleChannel">
    <int:poller fixed-rate="30000"/>
</int-jms:inbound-channel-adapter>

从前面的配置中注意到,通道适配器是一个轮询消费者。这意味着它会在被触发时调用消费器。你应该只在轮询相对不频繁且及时性不重要的情况下使用它。对于所有其他情况(绝大多数基于 JMS 的用例),消息驱动通道适配器(MDBA)是更好的选择。

Notice from the preceding configuration that the inbound-channel-adapter is a polling consumer. That means that it invokes receive() when triggered. You should use this should only in situations where polling is done relatively infrequently and timeliness is not important. For all other situations (a vast majority of JMS-based use-cases), the message-driven-channel-adapter (described later) is a better option.

默认情况下,所有需要引用 JMS 连接工厂的 JMS 适配器都会自动查找名为“jmsConnectionFactory”的 bean。这就是为什么在许多示例中看不到 connectionFactory 属性的原因。但是,如果你的 JMS 连接工厂具有不同的 bean 名称,则需要提供该属性。

By default, all JMS adapters that require a reference to the ConnectionFactory automatically look for a bean named jmsConnectionFactory. That is why you do not see a connection-factory attribute in many of the examples. However, if your JMS ConnectionFactory has a different bean name, you need to provide that attribute.

如果“extract-payload”设置为“true”(默认值),接收的 JMS 消息会通过“MessageConverter”。当依靠默认的“SimpleMessageConverter”时,这意味着生成的 Spring Integration 消息具有 JMS 消息的主体作为其负载。一个 JMS“TextMessage”生成一个基于字符串的负载,一个 JMS“BytesMessage”生成一个字节数组负载,一个 JMS“ObjectMessage”的可序列化实例成为 Spring Integration 消息的负载。如果您希望原始 JMS 消息作为 Spring Integration 消息的负载,请将“extractPayload”选项设置为“false”。

If extract-payload is set to true (the default), the received JMS Message is passed through the MessageConverter. When relying on the default SimpleMessageConverter, this means that the resulting Spring Integration Message has the JMS message’s body as its payload. A JMS TextMessage produces a string-based payload, a JMS BytesMessage produces a byte array payload, and the serializable instance of a JMS ObjectMessage becomes the Spring Integration message’s payload. If you prefer to have the raw JMS message as the Spring Integration message’s payload, set the extractPayload option to false.

从版本 5.0.8 开始,“receive-timeout”的默认值为“-1”(不等待)用于“org.springframework.jms.connection.CachingConnectionFactory”和“cacheConsumers”,否则为 1 秒。JMS 入站通道适配器基于提供的“ConnectionFactory”和选项创建一个“DynamicJmsTemplate”。如果需要外部“JmsTemplate”(例如在 Spring Boot 环境中)或“ConnectionFactory”没有缓存或没有“cacheConsumers”,如果期望无阻塞消耗,建议设置“jmsTemplate.receiveTimeout(-1)”:

Starting with version 5.0.8, a default value of the receive-timeout is -1 (no wait) for the org.springframework.jms.connection.CachingConnectionFactory and cacheConsumers, otherwise it is 1 second. The JMS Inbound Channel Adapter crates a DynamicJmsTemplate based on the provided ConnectionFactory and options. If an external JmsTemplate is required (e.g. in Spring Boot environment), or ConnectionFactory is not caching, or no cacheConsumers, it is recommended to set jmsTemplate.receiveTimeout(-1) if a non-blocking consumption is expected:

Jms.inboundAdapter(connectionFactory)
        .destination(queueName)
        .configureJmsTemplate(template -> template.receiveTimeout(-1))

Transactions

从版本 4.0 开始,入站通道适配器支持“session-transacted”属性。在早期版本中,您必须注入一个“sessionTransacted”设置为“true”的“JmsTemplate”。(适配器允许您将“acknowledge”属性设置为“transacted”,但这是不正确的,也不起作用)。

Starting with version 4.0, the inbound channel adapter supports the session-transacted attribute. In earlier versions, you had to inject a JmsTemplate with sessionTransacted set to true. (The adapter did let you set the acknowledge attribute to transacted, but this was incorrect and did not work).

但是,请注意,将“session-transacted”设置为“true”没什么用,因为事务会在“receive()”操作后并消息被发送到“channel”前立即提交。

Note, however, that setting session-transacted to true has little value, because the transaction is committed immediately after the receive() operation and before the message is sent to the channel.

如果您想要整个流程成为事务性的(例如,如果有下游出站通道适配器),则必须使用具有“JmsTransactionManager”的事务性轮询器。或者,考虑使用“acknowledge”设置为“transacted”(默认值)的“jms-message-driven-channel-adapter”。

If you want the entire flow to be transactional (for example, if there is a downstream outbound channel adapter), you must use a transactional poller with a JmsTransactionManager. Alternatively, consider using a jms-message-driven-channel-adapter with acknowledge set to transacted (the default).

Message Driven Channel Adapter

“message-driven-channel-adapter”需要一个指向 Spring 的“MessageListener”容器(“AbstractMessageListenerContainer”的任何子类)实例的引用或指向“ConnectionFactory”和“Destination”的引用(可以提供“destinationName”来替代“destination”引用)。以下示例定义了一个具有“Destination”引用的消息驱动通道适配器:

The message-driven-channel-adapter requires a reference to either an instance of a Spring MessageListener container (any subclass of AbstractMessageListenerContainer) or both ConnectionFactory and Destination (a 'destinationName' can be provided in place of the 'destination' reference). The following example defines a message-driven channel adapter with a Destination reference:

  • Java DSL

  • Kotlin DSL

  • Java

  • XML

@Bean
public IntegrationFlow jmsMessageDrivenRedeliveryFlow() {
    return IntegrationFlow
            .from(Jms.messageDrivenChannelAdapter(jmsConnectionFactory())
                     .destination("inQueue"))
            .channel("exampleChannel")
            .get();
}
@Bean
fun jmsMessageDrivenFlowWithContainer() =
        integrationFlow(
                Jms.messageDrivenChannelAdapter(jmsConnectionFactory())
                             .destination("inQueue")) {
            channel("exampleChannel")
        }
@Bean
public JmsMessageDrivenEndpoint jmsIn() {
    JmsMessageDrivenEndpoint endpoint = new JmsMessageDrivenEndpoint(container(), listener());
    return endpoint;
}
@Bean
public AbstractMessageListenerContainer container() {
    DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
    container.setConnectionFactory(cf());
    container.setDestinationName("inQueue");
    return container;
}

@Bean
public ChannelPublishingJmsMessageListener listener() {
    ChannelPublishingJmsMessageListener listener = new ChannelPublishingJmsMessageListener();
    listener.setRequestChannelName("exampleChannel");
    return listener;
}
<int-jms:message-driven-channel-adapter id="jmsIn" destination="inQueue" channel="exampleChannel"/>

消息驱动的适配器还接受与 MessageListener 容器相关的几个属性。仅当您不提供 container 引用时,才会考虑这些值。在这种情况下,将基于这些属性创建一个 DefaultMessageListenerContainer 实例并将其配置好。例如,您可以指定 transaction-manager 引用、concurrent-consumers 值以及几个其他属性引用和值。请参阅 Javadoc 和 Spring Integration 的 JMS 架构 (spring-integration-jms.xsd) 以获取更多详细信息。

The message-driven adapter also accepts several properties that pertain to the MessageListener container. These values are considered only if you do not provide a container reference. In that case, an instance of DefaultMessageListenerContainer is created and configured based on these properties. For example, you can specify the transaction-manager reference, the concurrent-consumers value, and several other property references and values. See the Javadoc and Spring Integration’s JMS schema (spring-integration-jms.xsd) for more details.

如果您有自定义侦听器容器实现(通常是“DefaultMessageListenerContainer”的子类),您可以通过使用“container”属性提供对它的实例的引用或通过使用“container-class”属性提供它的完全限定类名。在这种情况下,适配器上的属性会传输到自定义容器的实例。

If you have a custom listener container implementation (usually a subclass of DefaultMessageListenerContainer), you can either provide a reference to an instance of it by using the container attribute or provide its fully qualified class name by using the container-class attribute. In that case, the attributes on the adapter are transferred to an instance of your custom container.

您不能使用 Spring JMS 命名空间元素“<jms:listener-container/>”来配置“<int-jms:message-driven-channel-adapter>”的容器引用,因为该元素实际上并不引用容器。每个“<jms:listener/>”子元素都有它自己的“DefaultMessageListenerContainer”(具有在父元素“<jms:listener-container/>”上定义的共享属性)。不过,您可以给每个侦听器子元素一个“id”,并用它注入通道适配器,“<jms:/>”命名空间需要一个真正的侦听器。

You can’t use the Spring JMS namespace element <jms:listener-container/> to configure a container reference for the <int-jms:message-driven-channel-adapter> since that element doesn’t actually reference a container. Each <jms:listener/> sub-element gets its own DefaultMessageListenerContainer (with shared attributes defined on the parent <jms:listener-container/> element). You can give each listener sub-element an id, and use that to inject into the channel adapter, however, the <jms:/> namespace requires a real listener.

建议为“DefaultMessageListenerContainer”配置一个常规的“<bean>”,并在通道适配器中用它作引用。

It is recommended to configure a regular <bean> for the DefaultMessageListenerContainer and use it as a reference in the channel adapter.

从 4.2 版本开始,默认 message-driven 模式为 auto,除非您提供了一个外部容器。在后一种情况下,您应该根据需要配置容器。我们建议使用 pub-sub,因为只有这样才能避免丢失消息。

Starting with version 4.2, the default acknowledge mode is transacted, unless you provide an external container. In that case, you should configure the container as needed. We recommend using transacted with the DefaultMessageListenerContainer to avoid message loss.

“extract-payload”属性具有相同的效果,其默认值为“true”。由于“poller”元素会被主动调用,因此它不适用于消息驱动通道适配器。对于大多数方案,消息驱动方法更好,因为消息在从底层的 JMS 消费者接收到后会立即传递给“MessageChannel”。

The 'extract-payload' property has the same effect, and its default value is 'true'. The poller element is not applicable for a message-driven channel adapter, as it is actively invoked. For most scenarios, the message-driven approach is better, since the messages are passed along to the MessageChannel as soon as they are received from the underlying JMS consumer.

最后,<message-driven-channel-adapter> 元素还接受“error-channel”属性。这提供了与 Enter the GatewayProxyFactoryBean 中描述相同的基本功能。以下示例演示了如何在消息驱动的通道适配器上设置错误通道:

Finally, the <message-driven-channel-adapter> element also accepts the 'error-channel' attribute. This provides the same basic functionality, as described in Enter the GatewayProxyFactoryBean. The following example shows how to set an error channel on a message-driven channel adapter:

<int-jms:message-driven-channel-adapter id="jmsIn" destination="inQueue"
    channel="exampleChannel"
    error-channel="exampleErrorChannel"/>

将前面该示例与我们后面讨论的通用网关配置或 JMS“inbound-gateway”进行比较时,主要区别在于这是一个单向流,因为它是一个“channel-adapter”,而不是网关。因此,“error-channel”下游流也应是单向的。例如,它可以发送到日志记录处理程序,或者可以连接到其他 JMS“<outbound-channel-adapter>”元素。

When comparing the preceding example to the generic gateway configuration or the JMS 'inbound-gateway' that we discuss later, the key difference is that we are in a one-way flow, since this is a 'channel-adapter', not a gateway. Therefore, the flow downstream from the 'error-channel' should also be one-way. For example, it could send to a logging handler, or it could connect to a different JMS <outbound-channel-adapter> element.

从主题使用时,将“pub-sub-domain”属性设置为 true。为持久订阅设置“subscription-durable”为“true”,或为共享订阅设置“subscription-shared” (它需要 JMS 2.0 代理并且从版本 4.2 开始可用)。使用“subscription-name”来命名订阅。

When consuming from topics, set the pub-sub-domain attribute to true. Set subscription-durable to true for a durable subscription or subscription-shared for a shared subscription (which requires a JMS 2.0 broker and has been available since version 4.2). Use subscription-name to name the subscription.

从版本 5.1 开始,当应用程序正在运行时停止端点时,基础监听器容器关闭,关闭其共享连接和使用者。以前,连接和使用者保持打开状态。要恢复到以前的行为,将 JmsMessageDrivenEndpoint 中的 shutdownContainerOnStop 设置为“false”。

Starting with version 5.1, when the endpoint is stopped while the application remains running, the underlying listener container is shut down, closing its shared connection and consumers. Previously, the connection and consumers remained open. To revert to the previous behavior, set the shutdownContainerOnStop on the JmsMessageDrivenEndpoint to false.

从版本 6.3 开始,“ChannelPublishingJmsMessageListener”现在可以提供一个 RetryTemplateRecoveryCallback<Message<?>>,可以在下行发送和发送接收操作中重试。这些选项也公开到 Java DSL 的 JmsMessageDrivenChannelAdapterSpec 中。

Starting with version 6.3, the ChannelPublishingJmsMessageListener can now be supplied with a RetryTemplate and RecoveryCallback<Message<?>> for retries on the downstream send and send-and-receive operations. These options are also exposed into a JmsMessageDrivenChannelAdapterSpec for Java DSL.

Inbound Conversion Errors

从版本 4.2 开始,“error-channel”也用于转换错误。以前,如果 JMS“<message-driven-channel-adapter/>”或“<inbound-gateway/>”由于转换错误而无法传递消息,则会向容器抛回异常。如果容器配置为使用事务,则消息将被回滚并重复传送。转换过程发生在消息构建之前和期间,以使此类错误不会发送到“error-channel”。现在,此类转换异常会导致向“error-channel”发送 ErrorMessage,异常作为“payload”。如果希望事务回滚并且定义了“error-channel”,则“error-channel”上的集成流必须重新抛出异常(或其他异常)。如果错误流不抛出异常,则事务将提交,消息将被移除。如果没有定义“error-channel”,则会向容器抛回异常,就像以前一样。

Starting with version 4.2, the 'error-channel' is used for the conversion errors, too. Previously, if a JMS <message-driven-channel-adapter/> or <inbound-gateway/> could not deliver a message due to a conversion error, an exception would be thrown back to the container. If the container is configured to use transactions, the message is rolled back and redelivered repeatedly. The conversion process occurs before and during message construction so that such errors are not sent to the 'error-channel'. Now such conversion exceptions result in an ErrorMessage being sent to the 'error-channel', with the exception as the payload. If you wish the transaction to roll back, and you have an 'error-channel' defined, the integration flow on the 'error-channel' must re-throw the exception (or another exception). If the error flow does not throw an exception, the transaction is committed and the message is removed. If no 'error-channel' is defined, the exception is thrown back to the container, as before.

Outbound Channel Adapter

JmsSendingMessageHandler 实现 MessageHandler 接口,能够将 Spring Integration 消息`转换为 JMS 消息,然后发送到 JMS 目的地。它需要 `jmsTemplate 引用或 jmsConnectionFactorydestination 引用(destinationName 可以代替 destination 提供)。与入站通道适配器一样,配置此适配器的最简单方法是使用名称空间支持。以下配置生成了一个适配器,该适配器从 exampleChannel 接收 Spring Integration 消息,将它们转换为 JMS 消息,并将它们发送到 bean 名称是 outQueue 的 JMS 目的地引用:

The JmsSendingMessageHandler implements the MessageHandler interface and is capable of converting Spring Integration Messages to JMS messages and then sending to a JMS destination. It requires either a jmsTemplate reference or both jmsConnectionFactory and destination references (destinationName may be provided in place of destination). As with the inbound channel adapter, the easiest way to configure this adapter is with the namespace support. The following configuration produces an adapter that receives Spring Integration messages from the exampleChannel, converts those into JMS messages, and sends them to the JMS destination reference whose bean name is outQueue:

  • Java DSL

  • Kotlin DSL

  • Groovy DSL

  • Java

  • XML

@Bean
public IntegrationFlow jmsOutboundFlow() {
    return IntegrationFlow.from("exampleChannel")
                .handle(Jms.outboundAdapter(cachingConnectionFactory())
                    .destinationExpression("headers." + SimpMessageHeaderAccessor.DESTINATION_HEADER)
                    .configureJmsTemplate(t -> t.id("jmsOutboundFlowTemplate")));
}
@Bean
fun jmsOutboundFlow() =
        integrationFlow("exampleChannel") {
            handle(Jms.outboundAdapter(jmsConnectionFactory())
                    .apply {
                        destinationExpression("headers." + SimpMessageHeaderAccessor.DESTINATION_HEADER)
                        deliveryModeFunction<Any> { DeliveryMode.NON_PERSISTENT }
                        timeToLiveExpression("10000")
                        configureJmsTemplate { it.explicitQosEnabled(true) }
                    }
            )
        }
@Bean
jmsOutboundFlow() {
    integrationFlow('exampleChannel') {
        handle(Jms.outboundAdapter(new ActiveMQConnectionFactory())
                .with {
                    destinationExpression 'headers.' + SimpMessageHeaderAccessor.DESTINATION_HEADER
                    deliveryModeFunction { DeliveryMode.NON_PERSISTENT }
                    timeToLiveExpression '10000'
                    configureJmsTemplate {
                        it.explicitQosEnabled true
                    }
                }
        )
    }
}
@Bean
@ServiceActivator(inputChannel = "exampleChannel")
public MessageHandler jmsOut() {
    JmsSendingMessageHandler handler = new JmsSendingMessageHandler(new JmsTemplate(connectionFactory));
    handler.setDestinationName("outQueue");
    return handler;
}
<int-jms:outbound-channel-adapter id="jmsOut" destination="outQueue" channel="exampleChannel"/>

与入站通道适配器一样,有一个“extract-payload”属性。但是,对于出站适配器,含义是相反的。布尔属性不是应用于 JMS 消息,而是应用于 Spring Integration 消息负载。换句话说,决定是将 Spring Integration 消息本身作为 JMS 消息正文传递,还是将 Spring Integration 消息负载作为 JMS 消息正文传递。默认值为“true”。因此,如果你传递一个负载为 String 的 Spring Integration 消息,将创建一个 JMS TextMessage。另一方面,如果你想通过 JMS 将实际的 Spring Integration 消息发送到另一个系统,请将其设置为“false”。

As with the inbound channel adapters, there is an 'extract-payload' property. However, the meaning is reversed for the outbound adapter. Rather than applying to the JMS message, the boolean property applies to the Spring Integration message payload. In other words, the decision is whether to pass the Spring Integration message itself as the JMS message body or to pass the Spring Integration message payload as the JMS message body. The default value is 'true'. Therefore, if you pass a Spring Integration message whose payload is a String, a JMS TextMessage is created. If, on the other hand, you want to send the actual Spring Integration message to another system over JMS, set it to 'false'.

无论有效负载提取的布尔值是什么,Spring Integration 都会将属性映射到 JMS 属性,只要您依赖于默认转换器或提供对其他 MessageConverter 实例的引用。(对于“入站”适配器也是如此,只是在这种情况下,JMS 属性映射到 Spring Integration 属性)。

Regardless of the boolean value for payload extraction, the Spring Integration MessageHeaders map to JMS properties, as long as you rely on the default converter or provide a reference to another instance of MessageConverter. (The same holds true for 'inbound' adapters, except that, in those cases, the JMS properties map to Spring Integration MessageHeaders).

从版本 5.1 开始,“<int-jms:outbound-channel-adapter>” (JmsSendingMessageHandler) 可以使用 deliveryModeExpressiontimeToLiveExpression 属性来评估弹性消息中间件针对请求 Spring 消息 发送在运行时适用的 QoS 值。DefaultJmsHeaderMapper 的新 setMapInboundDeliveryMode(true)setMapInboundExpiration(true) 选项可以作为消息头中动态 deliveryModetimeToLive 信息的来源:

Starting with version 5.1, the <int-jms:outbound-channel-adapter> (JmsSendingMessageHandler) can be configured with the deliveryModeExpression and timeToLiveExpression properties to evaluate an appropriate QoS values for JMS message to send at runtime against request Spring Message. The new setMapInboundDeliveryMode(true) and setMapInboundExpiration(true) options of the DefaultJmsHeaderMapper may facilitate as a source of the information for the dynamic deliveryMode and timeToLive from message headers:

<int-jms:outbound-channel-adapter delivery-mode-expression="headers.jms_deliveryMode"
                        time-to-live-expression="headers.jms_expiration - T(System).currentTimeMillis()"/>

Transactions

从版本 4.0 开始,出站通道适配器支持 session-transacted 属性。在较早的版本中,你必须注入一个 JmsTemplate,并将 sessionTransacted 设置为“true”。该属性现在在内置默认 JmsTemplate 上设置属性。如果存在事务(可能来自上游 message-driven-channel-adapter),则发送操作在同一事务内执行。否则,将启动一个新事务。

Starting with version 4.0, the outbound channel adapter supports the session-transacted attribute. In earlier versions, you had to inject a JmsTemplate with sessionTransacted set to true. The attribute now sets the property on the built-in default JmsTemplate. If a transaction exists (perhaps from an upstream message-driven-channel-adapter), the send operation is performed within the same transaction. Otherwise, a new transaction is started.

Inbound Gateway

Spring Integration 的消息驱动 JMS 入站网关委托给一个 MessageListener 容器,支持动态调整并发使用者,并且还可以处理应答。入站网关需要一个 ConnectionFactory 和一个请求 Destination(或“requestDestinationName”)的引用。以下示例定义了一个 JMS inbound-gateway,它从 bean id inQueue 引用的 JMS 队列中接收,并发送到名为 exampleChannel 的 Spring Integration 通道:

Spring Integration’s message-driven JMS inbound-gateway delegates to a MessageListener container, supports dynamically adjusting concurrent consumers, and can also handle replies. The inbound gateway requires references to a ConnectionFactory and a request Destination (or 'requestDestinationName'). The following example defines a JMS inbound-gateway that receives from the JMS queue referenced by the bean id, inQueue, and sends to the Spring Integration channel named exampleChannel:

<int-jms:inbound-gateway id="jmsInGateway"
    request-destination="inQueue"
    request-channel="exampleChannel"/>

鉴于网关提供的是请求-答复行为,而不是单向发送或接收行为,所以它们还具有两个针对 “payload extraction” 的明显属性(如:用于通道适配器“extract-payload”设置的 discussed earlier)。对于入站网关,“extract-request-payload”属性确定是否提取接收到的 JMS 消息正文。如果为“false”,则 JMS 消息本身将成为 Spring Integration 消息有效负载。默认值为“true”。

Since the gateways provide request-reply behavior instead of unidirectional send or receive behavior, they also have two distinct properties for “payload extraction” (as discussed earlier for the channel adapters' 'extract-payload' setting). For an inbound gateway, the 'extract-request-payload' property determines whether the received JMS Message body is extracted. If 'false', the JMS message itself becomes the Spring Integration message payload. The default is 'true'.

类似地,对于入站网关,“extract-reply-payload”属性应用于要转换为应答 JMS 消息的 Spring Integration 消息。如果你想传递整个 Spring Integration 消息(作为 JMS ObjectMessage 的正文),请将此值设置为“false”。默认情况下,Spring Integration 消息负载也会转换为 JMS 消息(例如,String 负载将变为 JMS TextMessage)。

Similarly, for an inbound-gateway, the 'extract-reply-payload' property applies to the Spring Integration message that is to be converted into a reply JMS Message. If you want to pass the whole Spring Integration message (as the body of a JMS ObjectMessage), set value this to 'false'. By default, it is also 'true' that the Spring Integration message payload is converted into a JMS Message (for example, a String payload becomes a JMS TextMessage).

与其他任何事情一样,网关调用可能会导致错误。默认情况下,不会向生产者通知可能发生在消费者端的错误,并且会超时等待应答。但是,有时你可能希望将错误情况传达给消费者(换句话说,你可能希望通过将其映射到消息来将异常视为有效应答)。为了实现这一点,JMS 入站网关为消息通道提供支持,可以将错误发送到该通道以进行处理,这可能会导致符合某种约定的应答消息负载,该约定定义了调用方可能期待的“error”应答。你可以使用 error-channel 属性来配置这样的通道,如下例所示:

As with anything else, gateway invocation might result in error. By default, a producer is not notified of the errors that might have occurred on the consumer side and times out waiting for the reply. However, there might be times when you want to communicate an error condition back to the consumer (in other words, you might want to treat the exception as a valid reply by mapping it to a message). To accomplish this, JMS inbound gateway provides support for a message channel to which errors can be sent for processing, potentially resulting in a reply message payload that conforms to some contract that defines what a caller may expect as an “error” reply. You can use the error-channel attribute to configure such a channel, as the following example shows:

<int-jms:inbound-gateway request-destination="requestQueue"
          request-channel="jmsInputChannel"
          error-channel="errorTransformationChannel"/>

<int:transformer input-channel="exceptionTransformationChannel"
        ref="exceptionTransformer" method="createErrorResponse"/>

您可能会注意到,此示例看起来与包含在 Enter the GatewayProxyFactoryBean 中的示例非常相似。这里应用了相同的概念:exceptionTransformer 可以是创建错误响应对象的 POJO,您可以引用 nullChannel 来抑制错误,也可以将“error-channel”留空以让异常传播。

You might notice that this example looks very similar to that included within Enter the GatewayProxyFactoryBean. The same idea applies here: The exceptionTransformer could be a POJO that creates error-response objects, you could reference the nullChannel to suppress the errors, or you could leave 'error-channel' out to let the exception propagate.

从主题使用时,将“pub-sub-domain”属性设置为 true。为持久订阅设置“subscription-durable”为“true”,或为共享订阅设置“subscription-shared” (它需要 JMS 2.0 代理并且从版本 4.2 开始可用)。使用“subscription-name”来命名订阅。

When consuming from topics, set the pub-sub-domain attribute to true. Set subscription-durable to true for a durable subscription or subscription-shared for a shared subscription (requires a JMS 2.0 broker and has been available since version 4.2). Use subscription-name to name the subscription.

从 4.2 版本开始,默认“message-driven”模式为“auto”,除非提供了外部容器。在这种情况下,您应该根据需要配置容器。我们建议使用“pub-sub”模式,因为只有这样才能避免丢失消息。

Starting with version 4.2, the default acknowledge mode is transacted, unless an external container is provided. In that case, you should configure the container as needed. We recommend that you use transacted with the DefaultMessageListenerContainer to avoid message loss.

从版本 5.1 开始,当应用程序正在运行时停止端点时,基础监听器容器关闭,关闭其共享连接和使用者。以前,连接和使用者保持打开状态。要恢复到以前的行为,将 JmsInboundGateway 中的 shutdownContainerOnStop 设置为“false”。

Starting with version 5.1, when the endpoint is stopped while the application remains running, the underlying listener container is shut down, closing its shared connection and consumers. Previously, the connection and consumers remained open. To revert to the previous behavior, set the shutdownContainerOnStop on the JmsInboundGateway to false.

默认情况下,JmsInboundGateway 在接收到的消息中查找 jakarta.jms.Message.getJMSReplyTo() 属性来确定将应答发送到哪里。否则,它可以配置为静态 defaultReplyDestinationdefaultReplyQueueNamedefaultReplyTopicName。此外,从版本 6.1 开始,可以在提供的 ChannelPublishingJmsMessageListener 上配置一个 replyToExpression 以动态确定应答目的地,如果请求上标准 JMSReplyTo 属性为 null。接收到的 jakarta.jms.Message 用作根评估上下文对象。以下示例演示了如何使用 Java DSL API 来配置一个入站 JMS 网关,该网关具有从请求消息解析的自定义应答目的地:

By default, the JmsInboundGateway looks for a jakarta.jms.Message.getJMSReplyTo() property in the received message to determine where to send a reply. Otherwise, it can be configured with a static defaultReplyDestination, or defaultReplyQueueName or defaultReplyTopicName. In addition, starting with version 6.1, a replyToExpression can be configured on a provided ChannelPublishingJmsMessageListener to determine the reply destination dynamically, if the standard JMSReplyTo property is null on the request. The received jakarta.jms.Message is used the root evaluation context object. The following example demonstrates how to use Java DSL API to configure an inbound JMS gateway with a custom reply destination resolved from the request message:

@Bean
public IntegrationFlow jmsInboundGatewayFlow(ConnectionFactory connectionFactory) {
    return IntegrationFlow.from(
                    Jms.inboundGateway(connectionFactory)
                            .requestDestination("requestDestination")
                            .replyToFunction(message -> message.getStringProperty("myReplyTo")))
            .<String, String>transform(String::toUpperCase)
            .get();
}

从 6.3 版本开始,Jms.inboundGateway() API 公开了 retryTemplate()recoveryCallback() 选项,用于重试内部收发操作。

Starting with version 6.3, the Jms.inboundGateway() API exposes a retryTemplate() and recoveryCallback() options for retrying internal send-and-receive operations.

Outbound Gateway

出站网关使用 Spring Integration 消息创建 JMS 消息,并将其发送到 request-destination。然后,通过使用选择器从配置的 reply-destination 接收或(如果未提供 reply-destination)通过创建 JMS TemporaryQueue(或如果 replyPubSubDomain= true,则创建 TemporaryTopic)实例来处理 JMS 答复消息。

The outbound gateway creates JMS messages from Spring Integration messages and sends them to a request-destination. It then handles the JMS reply message either by using a selector to receive from the reply-destination that you configure or, if no reply-destination is provided, by creating JMS TemporaryQueue (or TemporaryTopic if replyPubSubDomain= true) instances.

reply-destination(或 reply-destination-name)与将 cacheConsumers 设置为 trueCachingConnectionFactory 配合使用会导致出现内存不足的情况。这是因为每个请求都会获得带有新选择器的消费者(选择基于 correlation-key 值,或当没有 correlation-key 时,基于发送的 JMSMessageID)。鉴于这些选择器是唯一的,因此它们在当前请求完成后会保留在缓存中(未使用)。

Using a reply-destination (or reply-destination-name) together with a CachingConnectionFactory that has cacheConsumers set to true can cause out-of-memory conditions. This is because each request gets a new consumer with a new selector (selecting on the correlation-key value or when there is no correlation-key, on the sent JMSMessageID). Given that these selectors are unique, they remain in the cache (unused) after the current request completes.

如果您指定了答复目标,建议您不要使用缓存使用者。或者,考虑作为 described below 使用 <reply-listener/>

If you specify a reply destination, you are advised to not use cached consumers. Alternatively, consider using a <reply-listener/> as described below.

以下示例显示了如何配置出站网关:

The following example shows how to configure an outbound gateway:

<int-jms:outbound-gateway id="jmsOutGateway"
    request-destination="outQueue"
    request-channel="outboundJmsRequests"
    reply-channel="jmsReplies"/>

“outbound-gateway”有效负载提取属性与“inbound-gateway” 的属性相反(请参阅 earlier discussion)。这意味着“extract-request-payload”属性值适用于转换为要作为请求发送的 JMS 消息的 Spring Integration 消息。“extract-reply-payload”属性值适用于作为答复接收的 JMS 消息,然后转换为 Spring Integration 消息,以便随后发送到“reply-channel”,如前面的配置示例所示。

The 'outbound-gateway' payload extraction properties are inversely related to those of the 'inbound-gateway' (see the earlier discussion). That means that the 'extract-request-payload' property value applies to the Spring Integration message being converted into a JMS message to be sent as a request. The 'extract-reply-payload' property value applies to the JMS message received as a reply and is then converted into a Spring Integration message to be subsequently sent to the 'reply-channel', as shown in the preceding configuration example.

Using a <reply-listener/>

Spring Integration 2.2 引入了一种处理响应的备用技术。如果您向网关添加 <reply-listener/> 子元素而不是为每个响应创建使用者, 那么 MessageListener 容器将用于接收响应并将其移交给请求线程。这提供了许多性能优势,还缓解了 earlier caution 中描述的缓存使用者内存使用问题。

Spring Integration 2.2 introduced an alternative technique for handling replies. If you add a <reply-listener/> child element to the gateway instead of creating a consumer for each reply, a MessageListener container is used to receive the replies and hand them over to the requesting thread. This provides a number of performance benefits as well as alleviating the cached consumer memory utilization problem described in the earlier caution.

当将 <reply-listener/> 与没有 reply-destination 的出站网关一起使用时,它会为每个请求使用一个 TemporaryQueue,而不是为每个请求创建一个 TemporaryQueue。(如果与代理断开连接再恢复,网关会根据需要创建额外的 TemporaryQueue)。如果将 replyPubSubDomain 设置为 true,则从 6.0 版本开始,会创建一个 TemporaryTopic

When using a <reply-listener/> with an outbound gateway that has no reply-destination, instead of creating a TemporaryQueue for each request, a single TemporaryQueue is used. (The gateway creates an additional TemporaryQueue, as necessary, if the connection to the broker is lost and recovered). If replyPubSubDomain is set to true, starting with version 6.0, a TemporaryTopic is created instead.

在使用 correlation-key 时,多个网关可以共享同一个答复目标,因为监听器容器使用适用于每个网关的唯一选择器。

When using a correlation-key, multiple gateways can share the same reply destination, because the listener container uses a selector that is unique to each gateway.

如果你指定了答复监听器和答复目标(或答复目标名称)但不提供关联键,网关会记录警告并回退到 2.2 版本之前的行为。这是因为在这种情况下无法配置选择器。因此,没法避免答复转到可能配置相同答复目标的不同网关。

If you specify a reply listener and specify a reply destination (or reply destination name) but provide no correlation key, the gateway logs a warning and falls back to pre-version 2.2 behavior. This is because there is no way to configure a selector in this case. Thus, there is no way to avoid a reply going to a different gateway that might be configured with the same reply destination.

请注意,在这种情况下,每个请求都会使用一个新的消费者,并且消费者会像上面的警告中所述的那样在内存中累积;因此,在这种情况下不应使用缓存的消费者。

Note that, in this situation, a new consumer is used for each request, and consumers can build up in memory as described in the caution above; therefore cached consumers should not be used in this case.

以下示例显示了具有默认属性的答复监听器:

The following example shows a reply listener with default attributes:

<int-jms:outbound-gateway id="jmsOutGateway"
        request-destination="outQueue"
        request-channel="outboundJmsRequests"
        reply-channel="jmsReplies">
    <int-jms:reply-listener />
</int-jms-outbound-gateway>

侦听器非常轻量级,我们预计在大多数情况下只需要一个使用者。但是,您可以添加诸如 concurrent-consumersmax-concurrent-consumers 等属性。参阅模式了解支持属性的完整列表,以及它们含义的 Spring JMS documentation

The listener is very lightweight, and we anticipate that, in most cases, you need only a single consumer. However, you can add attributes such as concurrent-consumers, max-concurrent-consumers, and others. See the schema for a complete list of supported attributes, together with the Spring JMS documentation for their meanings.

Idle Reply Listeners

从 4.2 版本开始,您可以根据需要启动回复侦听器(并在空闲时间后停止侦听器),而不是在网关生命周期持续时间内运行侦听器。如果应用程序上下文中有很多大部分处于空闲状态的网关,这将非常有用。这种情况之一是很多(非活动)分区 Spring Batch 作业使用 Spring Integration 和 JMS 进行分区分布时的上下文。如果所有答复侦听器都处于活动状态,则 JMS 服务器对每个网关都有一个活动使用者。通过启用空闲超时,每个使用者只在相应批处理作业运行时(以及作业完成后的一段时间内)存在。

Starting with version 4.2, you can start the reply listener as needed (and stop it after an idle time) instead of running for the duration of the gateway’s lifecycle. This can be useful if you have many gateways in the application context where they are mostly idle. One such situation is a context with many (inactive) partitioned Spring Batch jobs using Spring Integration and JMS for partition distribution. If all the reply listeners are active, the JMS broker has an active consumer for each gateway. By enabling the idle timeout, each consumer exists only while the corresponding batch job is running (and for a short time after it finishes).

请参阅 Attribute Reference 中的 idle-reply-listener-timeout

See idle-reply-listener-timeout in Attribute Reference.

Gateway Reply Correlation

本节根据网关的配置方式描述了用于回复关联的机制(确保原始网关仅接收对自身请求的回复)。请参阅 Attribute Reference 完整了解此处讨论的属性。

This section describes the mechanisms used for reply correlation (ensuring the originating gateway receives replies to only its requests), depending on how the gateway is configured. See Attribute Reference for complete description of the attributes discussed here.

以下列表描述了各种场景(数字仅用于标识——顺序无关紧要):

The following list describes the various scenarios (the numbers are for identification — order does not matter):

  1. No reply-destination* properties and no <reply-listener>[.iokays-translated-dbec813f61fc752d71765dd251fa717c] 为每个请求创建一个 TemporaryQueue,并在请求完成(成功或失败)时删除它。correlation-key 无关紧要。

A TemporaryQueue is created for each request and deleted when the request is complete (successfully or otherwise). correlation-key is irrelevant. . A reply-destination* property is provided and neither a <reply-listener/> nor a correlation-key is provided[.iokays-translated-9f2931db87bd60159b962b4aaf132334] 将 JMSCorrelationID 等于传出消息用作消费者的消息选择器:

The JMSCorrelationID equal to the outgoing message is used as a message selector for the consumer:

messageSelector = "JMSCorrelationID = '" + 消息 ID + "'"

messageSelector = "JMSCorrelationID = '" + messageId + "'"

响应系统预期在回复 JMSCorrelationID 中返回输入 JMSMessageID。这是一个常见模式,并已由 Spring Integration 入站网关以及 Spring 的 MessageListenerAdapter 为消息驱动的 POJO 实现。

The responding system is expected to return the inbound JMSMessageID in the reply JMSCorrelationID. This is a common pattern and is implemented by the Spring Integration inbound gateway as well as Spring’s MessageListenerAdapter for message-driven POJOs.

当你使用此配置时,不应该将主题用于答复。答复可能会丢失。

When you use this configuration, you should not use a topic for replies. The reply may be lost.

  1. A reply-destination* property is provided, no <reply-listener/> is provided, and correlation-key="JMSCorrelationID"[.iokays-translated-6446db6e2afe4c358176a787bd31a271] 该网关生成一个唯一的关联 ID 并将其插入到 JMSCorrelationID 头中。消息选择器:

The gateway generates a unique correlation IS and inserts it in the JMSCorrelationID header. The message selector is:

messageSelector = "JMSCorrelationID = '" + 唯一 ID + "'"

messageSelector = "JMSCorrelationID = '" + uniqueId + "'"

响应系统预期在回复 JMSCorrelationID 中返回输入 JMSCorrelationID。这是一个常见模式,并已由 Spring Integration 入站网关以及 Spring 的 MessageListenerAdapter 为消息驱动的 POJO 实现。

The responding system is expected to return the inbound JMSCorrelationID in the reply JMSCorrelationID. This is a common pattern and is implemented by the Spring Integration inbound gateway as well as Spring’s MessageListenerAdapter for message-driven POJOs. . A reply-destination* property is provided, no <reply-listener/> is provided, and correlation-key="myCorrelationHeader"[.iokays-translated-3957575387070bbafa3380258ac8fe2e] 该网关生成一个唯一的关联 ID 并将其插入到 myCorrelationHeader 消息属性中。correlation-key 可以是任意用户定义的值。消息选择器:

The gateway generates a unique correlation ID and inserts it in the myCorrelationHeader message property. The correlation-key can be any user-defined value. The message selector is:

messageSelector = "myCorrelationHeader = '" + 唯一 ID + "'"

messageSelector = "myCorrelationHeader = '" + uniqueId + "'"

响应系统预期在回复 myCorrelationHeader 中返回输入 myCorrelationHeader

The responding system is expected to return the inbound myCorrelationHeader in the reply myCorrelationHeader. . A reply-destination* property is provided, no <reply-listener/> is provided, and correlation-key="JMSCorrelationID*" (Note the * in the correlation key.)[.iokays-translated-3e111e19a9c0434dc8891dd1ce1c52d8] 该网关使用请求消息(如果存在)中 jms_correlationId 头中的值,并将其插入到 JMSCorrelationID 头中。消息选择器:

The gateway uses the value in the jms_correlationId header (if present) from the request message and inserts it in the JMSCorrelationID header. The message selector is: messageSelector = "JMSCorrelationID = '" + headers['jms_correlationId'] + "'"

用户必须确保该值唯一。

The user must ensure this value is unique.

如果该标头不存在,该网关将表现为 3

If the header does not exist, the gateway behaves as in 3.

响应系统预期在回复 JMSCorrelationID 中返回输入 JMSCorrelationID。这是一个常见模式,并已由 Spring Integration 入站网关以及 Spring 的 MessageListenerAdapter 为消息驱动的 POJO 实现。

The responding system is expected to return the inbound JMSCorrelationID in the reply JMSCorrelationID. This is a common pattern and is implemented by the Spring Integration inbound gateway as well as Spring’s MessageListenerAdapter for message-driven POJOs. . No reply-destination* properties is provided, and a <reply-listener> is provided[.iokays-translated-69b87172be4e197a166f21a5cfe9cf9e] 创建一个临时队列,并将其用于此网关实例的所有回复。消息中不需要关联数据,但输出 JMSMessageID 在网关中内部使用,将回复定向到正确的请求线程。

A temporary queue is created and used for all replies from this gateway instance. No correlation data is needed in the message, but the outgoing JMSMessageID is used internally in the gateway to direct the reply to the correct requesting thread. . A reply-destination* property is provided, a <reply-listener> is provided, and no correlation-key is provided[.iokays-translated-4c119ecd8933308f2cb78d1b8c021a85] 不允许。

Not allowed.

\<reply-listener/\> 配置被忽略,并且网关表现为 2。会写入一条警告日志消息来表示此情况。

The <reply-listener/> configuration is ignored, and the gateway behaves as in 2. A warning log message is written to indicate this situation. . A reply-destination* property is provided, a <reply-listener> is provided, and correlation-key="JMSCorrelationID"[.iokays-translated-f7ed56f26c4cf1fe697e56736b7cc7f2] 该网关具有一个唯一的关联 ID,并将其与 JMSCorrelationID 头中的增量值一起插入(gatewayId + "_" + ++seq)。消息选择器:

The gateway has a unique correlation ID and inserts it, together with an incrementing value in the JMSCorrelationID header (gatewayId + "_" + ++seq). The message selector is: messageSelector = "JMSCorrelationID LIKE '" + gatewayId%'"

响应系统预期在回复 JMSCorrelationID 中返回输入 JMSCorrelationID。这是一个常见模式,并已由 Spring Integration 入站网关以及 Spring 的 MessageListenerAdapter 为消息驱动的 POJO 实现。由于每个网关都有一个唯一 ID,每个实例只会收到自己的回复。所有关联数据都用于将回复路由到正确的请求线程。

The responding system is expected to return the inbound JMSCorrelationID in the reply JMSCorrelationID. This is a common pattern and is implemented by the Spring Integration inbound gateway as well as Spring’s MessageListenerAdapter for message-driven POJOs. Since each gateway has a unique ID, each instance gets only its own replies. The complete correlation data is used to route the reply to the correct requesting thread. . A reply-destination* property is provided a <reply-listener/> is provided, and correlation-key="myCorrelationHeader"[.iokays-translated-e57ac4463895b4e427fe90a333d3f135] 该网关具有一个唯一的关联 ID,并将其与 myCorrelationHeader 属性中的增量值一起插入(gatewayId + "_" + ++seq)。correlation-key 可以是任意用户定义的值。消息选择器:

The gateway has a unique correlation ID and inserts it, together with an incrementing value in the myCorrelationHeader property (gatewayId + "_" + ++seq). The correlation-key can be any user-defined value. The message selector is: messageSelector = "myCorrelationHeader LIKE '" + gatewayId%'"

预期的响应系统在响应 myCorrelationHeader 中返回入站 myCorrelationHeader 。由于每个网关都有一个唯一的 ID,所以每个实例仅获取其自己的响应。完整的关联数据用于将 reply 路由到正确的请求线程。

The responding system is expected to return the inbound myCorrelationHeader in the reply myCorrelationHeader. Since each gateway has a unique ID, each instance only gets its own replies. The complete correlation data is used to route the reply to the correct requesting thread. . A reply-destination* property is provided, a <reply-listener/> is provided, and correlation-key="JMSCorrelationID*"[.iokays-translated-e61dcad2fb796f98b2e5b2cac4c0e947] (注意关联键中的 *

(Note the * in the correlation key)

不允许。

Not allowed.

使用响应侦听器时,不允许用户提供的关联 ID。网关不会使用此配置初始化。

User-supplied correlation IDs are not permitted with a reply listener. The gateway does not initialize with this configuration.

Async Gateway

从版本 4.3 开始,现在可以在配置出站网关时指定 async="true"(或在 Java 中 setAsync(true))。

Starting with version 4.3, you can now specify async="true" (or setAsync(true) in Java) when configuring the outbound gateway.

默认情况下,当向网关发送请求时,请求线程将挂起,直到收到回复。然后流继续在该线程上。如果 asynctrue,则请求线程在 send() 完成后立即释放,并在侦听器容器线程上返回回复(并继续流)。当网关在轮询线程上调用时,这可能很有用。线程被释放,可用于框架内的其他任务。

By default, when a request is sent to the gateway, the requesting thread is suspended until the reply is received. The flow then continues on that thread. If async is true, the requesting thread is released immediately after the send() completes, and the reply is returned (and the flow continues) on the listener container thread. This can be useful when the gateway is invoked on a poller thread. The thread is released and is available for other tasks within the framework.

async 需要一个 <reply-listener/> (或在使用 Java 配置时 setUseReplyContainer(true) )。它还需要一个要指定的 correlationKey (通常是 JMSCorrelationID )。如果未满足以上任一条件,则忽略 async

Thee async requires a <reply-listener/> (or setUseReplyContainer(true) when using Java configuration). It also requires a correlationKey (usually JMSCorrelationID) to be specified. If either of these conditions are not met, async is ignored.

Attribute Reference

以下列表显示 outbound-gateway 的所有可用属性:

The following listing shows all the available attributes for an outbound-gateway:

<int-jms:outbound-gateway
    connection-factory="connectionFactory" 1
    correlation-key="" 2
    delivery-persistent="" 3
    destination-resolver="" 4
    explicit-qos-enabled="" 5
    extract-reply-payload="true" 6
    extract-request-payload="true" 7
    header-mapper="" 8
    message-converter="" 9
    priority="" 10
    receive-timeout="" 11
    reply-channel="" 12
    reply-destination="" 13
    reply-destination-expression="" 14
    reply-destination-name="" 15
    reply-pub-sub-domain="" 16
    reply-timeout="" 17
    request-channel="" 18
    request-destination="" 19
    request-destination-expression="" 20
    request-destination-name="" 21
    request-pub-sub-domain="" 22
    time-to-live="" 23
    requires-reply="" 24
    idle-reply-listener-timeout="" 25
    async=""> 26
  <int-jms:reply-listener /> 27
</int-jms:outbound-gateway>
1 Reference to a jakarta.jms.ConnectionFactory. The default jmsConnectionFactory.
2 The name of a property that contains correlation data to correlate responses with replies. If omitted, the gateway expects the responding system to return the value of the outbound JMSMessageID header in the JMSCorrelationID header. If specified, the gateway generates a correlation ID and populates the specified property with it. The responding system must echo back that value in the same property. It can be set to JMSCorrelationID, in which case the standard header is used instead of a String property to hold the correlation data. When you use a <reply-container/>, you must specify the correlation-key if you provide an explicit reply-destination. Starting with version 4.0.1, this attribute also supports the value JMSCorrelationID*, which means that if the outbound message already has a JMSCorrelationID (mapped from the jms_correlationId) header, it is used instead of generating a new one. Note, the JMSCorrelationID* key is not allowed when you use a <reply-container/>, because the container needs to set up a message selector during initialization.[.iokays-translated-2d8fbc4eb7284e710cbd7a2d33775ecf]

你应该明白,网关无法确保唯一性,如果提供的关联 ID 不是唯一的,则可能会产生意外的副作用。

You should understand that the gateway has no way to ensure uniqueness, and unexpected side effects can occur if the provided correlation ID is not unique.

1 A boolean value indicating whether the delivery mode should be DeliveryMode.PERSISTENT (true) or DeliveryMode.NON_PERSISTENT (false). This setting takes effect only if explicit-qos-enabled is true.
2 A DestinationResolver. The default is a DynamicDestinationResolver, which maps the destination name to a queue or topic of that name.
3 When set to true, it enables the use of quality of service attributes: priority, delivery-mode, and time-to-live.
4 When set to true (the default), the payload of the Spring Integration reply message is created from the JMS Reply message’s body (by using the MessageConverter). When set to false, the entire JMS message becomes the payload of the Spring Integration message.
5 When set to true (the default), the payload of the Spring Integration message is converted to a JMSMessage (by using the MessageConverter). When set to false, the entire Spring Integration Message is converted to the JMSMessage. In both cases, the Spring Integration message headers are mapped to JMS headers and properties by using the HeaderMapper.
6 A HeaderMapper used to map Spring Integration message headers to and from JMS message headers and properties.
7 A reference to a MessageConverter for converting between JMS messages and the Spring Integration message payloads (or messages if extract-request-payload is false). The default is a SimpleMessageConverter.
8 The default priority of request messages. Overridden by the message priority header, if present. Its range is 0 to 9. This setting takes effect only if explicit-qos-enabled is true.
9 The time (in milliseconds) to wait for a reply. The default is 5000 (five seconds).
10 The channel to which the reply message is sent.
11 A reference to a Destination, which is set as the JMSReplyTo header. At most, only one of reply-destination, reply-destination-expression, or reply-destination-name is allowed. If none is provided, a TemporaryQueue is used for replies to this gateway.
12 A SpEL expression evaluating to a Destination, which will be set as the JMSReplyTo header. The expression can result in a Destination object or a String. It is used by the DestinationResolver to resolve the actual Destination. At most, only one of reply-destination, reply-destination-expression, or reply-destination-name is allowed. If none is provided, a TemporaryQueue is used for replies to this gateway.
13 The name of the destination that is set as the JMSReplyTo header. It is used by the DestinationResolver to resolve the actual Destination. At most, only one of reply-destination, reply-destination-expression, or reply-destination-name is allowed. If none is provided, a TemporaryQueue is used for replies to this gateway.
14 When set to true, it indicates that any reply Destination resolved by the DestinationResolver should be a Topic rather then a Queue.
15 The time the gateway waits when sending the reply message to the reply-channel. This only has an effect if the reply-channel can block — such as a QueueChannel with a capacity limit that is currently full. The default is infinity.
16 The channel on which this gateway receives request messages.
17 A reference to a Destination to which request messages are sent. One of reply-destination, reply-destination-expression, or reply-destination-name is required. You can use only one of those three attributes.
18 A SpEL expression evaluating to a Destination to which request messages are sent. The expression can result in a Destination object or a String. It is used by the DestinationResolver to resolve the actual Destination. One of the reply-destination, reply-destination-expression, or reply-destination-name is required. You can use only one of those three attributes.
19 The name of the destination to which request messages are sent. It is used by the DestinationResolver to resolve the actual Destination. One of reply-destination, reply-destination-expression, or reply-destination-name is required. You can use only one of those three attributes.
20 When set to true, it indicates that any request Destination resolved by the DestinationResolver should be a Topic rather then a Queue.
21 Specifies the message time to live. This setting takes effect only if explicit-qos-enabled is true.
22 Specifies whether this outbound gateway must return a non-null value. By default, this value is true, and a MessageTimeoutException is thrown when the underlying service does not return a value after the receive-timeout. Note that, if the service is never expected to return a reply, it would be better to use a <int-jms:outbound-channel-adapter/> instead of a <int-jms:outbound-gateway/> with requires-reply="false". With the latter, the sending thread is blocked, waiting for a reply for the receive-timeout period.
23 When you use a <reply-listener />, its lifecycle (start and stop) matches that of the gateway by default. When this value is greater than 0, the container is started on demand (when a request is sent). The container continues to run until at least this time elapses with no requests being received (and until no replies are outstanding). The container is started again on the next request. The stop time is a minimum and may actually be up to 1.5x this value.
24 See Async Gateway.
25 When this element is included, replies are received by an asynchronous MessageListenerContainer rather than creating a consumer for each reply. This can be more efficient in many cases.

Mapping Message Headers to and from JMS Message

JMS 消息可以包含元信息,例如 JMS API 头和简单属性。你可以使用 JmsHeaderMapper 将它们映射到 Spring Integration 消息头和从 Spring Integration 消息头映射。JMS API 头传递给适当的 setter 方法(例如 setJMSReplyTo),而其他头则复制到 JMS 消息的一般属性。JMS 出站网关使用 JmsHeaderMapper 的默认实现自举,该实现将映射标准 JMS API 头以及原语或 String 消息头。你还可以使用入站和出站网关的 header-mapper 属性提供自定义头映射器。

JMS messages can contain meta-information such as JMS API headers and simple properties. You can map those to and from Spring Integration message headers by using JmsHeaderMapper. The JMS API headers are passed to the appropriate setter methods (such as setJMSReplyTo), whereas other headers are copied to the general properties of the JMS Message. JMS outbound gateway is bootstrapped with the default implementation of JmsHeaderMapper, which will map standard JMS API Headers as well as primitive or String message headers. You could also provide a custom header mapper by using the header-mapper attribute of inbound and outbound gateways.

许多 JMS 厂商特定的客户端不允许在已创建的 JMS 消息上直接设置 deliveryMode、priority 和 redeliveryDelay 属性。它们被认为是 QoS 属性,因此必须传播到目标 JMS API。出于此原因,Outbound Channel Adapter 不将相应的 Spring Integration 头(或表达式结果)映射到所提到的 JMS 消息属性。相反,Outbound Channel Adapter 使用 MessageBuilder,将请求消息中的头值传播到 JMS API。要启用此功能,必须使用 deliveryMode 属性为 true 配置 outbound 端点。Spring Integration Java DSL 默认为 outbound 端点配置 deliveryMode,但仍然必须设置 delay 属性。

Many JMS vendor-specific clients don’t allow setting the deliveryMode, priority and timeToLive properties directly on an already created JMS message. They are considered to be QoS properties and therefore have to be propagated to the target MessageProducer.send(message, deliveryMode, priority, timeToLive) API. For this reason the DefaultJmsHeaderMapper doesn’t map appropriate Spring Integration headers (or expression results) into the mentioned JMS message properties. Instead, a DynamicJmsTemplate is used by the JmsSendingMessageHandler to propagate header values from the request message into the MessageProducer.send() API. To enable this feature, you must configure the outbound endpoint with a DynamicJmsTemplate with its explicitQosEnabled property set to true. The Spring Integration Java DSL configures a DynamicJmsTemplate by default but you must still set the explicitQosEnabled property.

从 4.0 版本开始,JMSPriority 标头已映射到标准 priority 标头,用于入站消息。先前,priority 标头仅用于出站消息。若要还原到之前的行为(即,不映射入站优先级),请将 DefaultJmsHeaderMappermapInboundPriority 属性设置为 false

Since version 4.0, the JMSPriority header is mapped to the standard priority header for inbound messages. Previously, the priority header was only used for outbound messages. To revert to the previous behavior (that is, to not map the inbound priority), set the mapInboundPriority property of DefaultJmsHeaderMapper to false.

从 4.3 版本开始,DefaultJmsHeaderMapper 通过调用其 toString() 方法将标准 correlationId 标头映射为消息属性(correlationId 通常是 UUID,不被 JMS 支持)。在入站方面,它被映射为 String。这与 jms_correlationId 标头无关,后者映射至 JMSCorrelationID 标头并反向映射。JMSCorrelationID 通常用于关联请求和答复,而 correlationId 通常用于将相关消息合并到一个组中(例如,与聚合器或重新排序器一起)。

Since version 4.3, the DefaultJmsHeaderMapper maps the standard correlationId header as a message property by invoking its toString() method (correlationId is often a UUID, which is not supported by JMS). On the inbound side, it is mapped as a String. This is independent of the jms_correlationId header, which is mapped to and from the JMSCorrelationID header. The JMSCorrelationID is generally used to correlate requests and replies, whereas the correlationId is often used to combine related messages into a group (such as with an aggregator or a resequencer).

从版本 5.1 开始,可以配置 DefaultJmsHeaderMapper 来映射入站 JMSDeliveryModeJMSExpiration 属性:

Starting with version 5.1, the DefaultJmsHeaderMapper can be configured for mapping inbound JMSDeliveryMode and JMSExpiration properties:

@Bean
public DefaultJmsHeaderMapper jmsHeaderMapper() {
    DefaultJmsHeaderMapper mapper = new DefaultJmsHeaderMapper();
    mapper.setMapInboundDeliveryMode(true)
    mapper.setMapInboundExpiration(true)
    return mapper;
}

这些 JMS 属性分别映射到 Spring 消息头 JmsHeaders.DELIVERY_MODEJmsHeaders.EXPIRATION

These JMS properties are mapped to the JmsHeaders.DELIVERY_MODE and JmsHeaders.EXPIRATION Spring Message headers respectively.

Message Conversion, Marshalling, and Unmarshalling

如果你需要转换消息,则所有 JMS 适配器和网关都允许你通过设置 message-converter 属性来提供 MessageConverter 。为此,请提供同一 ApplicationContext 中可用的 MessageConverter 实例的 bean 名称。此外,为了与 marshaller 和 unmarshaller 接口保持一致,Spring 提供了 MarshallingMessageConverter,你可以使用你自己的自定义 marshaller 和 unmarshaller 对其进行配置。以下示例展示了如何执行此操作:

If you need to convert the message, all JMS adapters and gateways let you provide a MessageConverter by setting the message-converter attribute. To do so, provide the bean name of an instance of MessageConverter that is available within the same ApplicationContext. Also, to provide some consistency with marshaller and unmarshaller interfaces, Spring provides MarshallingMessageConverter, which you can configure with your own custom marshallers and unmarshallers. The following example shows how to do so

<int-jms:inbound-gateway request-destination="requestQueue"
    request-channel="inbound-gateway-channel"
    message-converter="marshallingMessageConverter"/>

<bean id="marshallingMessageConverter"
    class="org.springframework.jms.support.converter.MarshallingMessageConverter">
    <constructor-arg>
        <bean class="org.bar.SampleMarshaller"/>
    </constructor-arg>
    <constructor-arg>
        <bean class="org.bar.SampleUnmarshaller"/>
    </constructor-arg>
</bean>

当您提供自己的 MessageConverter 实例时,它仍会包含在 HeaderMappingMessageConverter 中。这意味着“提取请求有效负载”和“提取答复有效负载”属性会影响传递给转换器的实际对象。HeaderMappingMessageConverter 本身委托给目标 MessageConverter,同时还将 Spring Integration MessageHeaders 映射到 JMS 消息属性,然后反向映射。

When you provide your own MessageConverter instance, it is still wrapped within the HeaderMappingMessageConverter. This means that the 'extract-request-payload' and 'extract-reply-payload' properties can affect the actual objects passed to your converter. The HeaderMappingMessageConverter itself delegates to a target MessageConverter while also mapping the Spring Integration MessageHeaders to JMS message properties and back again.

JMS-backed Message Channels

前面介绍的通道适配器和网关都适用于与其他外部系统集成的应用程序。入站选项假定某些其他系统正在向 JMS 目标发送 JMS 消息,并且出站选项假定某些其他系统正在从该目标接收。其他系统可能是或可能不是 Spring Integration 应用程序。当然,当将 Spring Integration 消息实例本身作为 JMS 消息主体发送时(将 'extract-payload' 值设置为 false),则假定其他系统基于 Spring Integration。然而,这绝不是必须的。这种灵活性是使用带有“channels”(或 JMS 中的目标)抽象的消息传递集成选项的优点之一。

The channel adapters and gateways featured earlier are all intended for applications that integrate with other external systems. The inbound options assume that some other system is sending JMS messages to the JMS destination, and the outbound options assume that some other system is receiving from the destination. The other system may or may not be a Spring Integration application. Of course, when sending a Spring Integration message instance as the body of the JMS message itself (with 'extract-payload' value set to false), it is assumed that the other system is based on Spring Integration. However, that is by no means a requirement. That flexibility is one of the benefits of using a message-based integration option with the abstraction of “channels”( or destinations in the case of JMS).

有时,给定 JMS 目标的生产者和消费者都旨在成为同一应用程序的一部分,在同一个进程中运行。你可以使用一对入站和出站通道适配器来完成此操作。这种方法的问题在于你需要两个适配器,即使从概念上来说,目标是拥有一个消息通道。从 Spring Integration 版本 2.0 开始支持更好的选择。现在在使用 JMS 命名空间时可以定义一个 "`channel“` ,如下例所示:

Sometimes, both the producer and consumer for a given JMS Destination are intended to be part of the same application, running within the same process. You can accomplish this by using a pair of inbound and outbound channel adapters. The problem with that approach is that you need two adapters, even though, conceptually, the goal is to have a single message channel. A better option is supported as of Spring Integration version 2.0. Now it is possible to define a single “channel” when using the JMS namespace, as the following example shows:

<int-jms:channel id="jmsChannel" queue="exampleQueue"/>

上一个示例中的通道的行为与主 Spring Integration 命名空间中的普通 <channel/> 元素非常相似。它可以被任何端点的 input-channeloutput-channel 属性引用。不同之处在于此通道由名为 exampleQueue 的 JMS 队列实例支持。这意味着可以在生产端点和消费端点之间进行异步消息传递。但是,与通过在非 JMS <channel/> 元素中添加 <queue/> 元素创建的更简单的异步消息通道不同,这些消息不会存储在内存队列中。相反,这些消息在 JMS 消息主体中传递,然后可用于该通道的底层 JMS 提供程序的全部功能。使用此替代方案的最常见理由可能是利用 JMS 消息传递的存储转发方法提供的持久性。

The channel in the preceding example behaves much like a normal <channel/> element from the main Spring Integration namespace. It can be referenced by both the input-channel and output-channel attributes of any endpoint. The difference is that this channel is backed by a JMS Queue instance named exampleQueue. This means that asynchronous messaging is possible between the producing and consuming endpoints. However, unlike the simpler asynchronous message channels created by adding a <queue/> element within a non-JMS <channel/> element, the messages are not stored in an in-memory queue. Instead, those messages are passed within a JMS message body, and the full power of the underlying JMS provider is then available for that channel. Probably the most common rationale for using this alternative is to take advantage of the persistence made available by the store-and-forward approach of JMS messaging.

如果配置正确,JMS 支持的消息通道还支持事务。换句话说,如果生产者的发送操作是回滚事务的一部分,那么生产者实际上不会向事务性 JMS 支持的通道写入。同样地,如果对该消息的接收是回滚事务的一部分,则消费者不会从通道中物理删除 JMS 消息。请注意,在此方案中,生产者和消费者事务是分开的。这与传播事务上下文到没有 <queue/> 子元素的简单同步 <channel/> 元素有很大不同。

If configured properly, the JMS-backed message channel also supports transactions. In other words, a producer would not actually write to a transactional JMS-backed channel if its send operation is part of a transaction that rolls back. Likewise, a consumer would not physically remove a JMS message from the channel if the reception of that message is part of a transaction that rolls back. Note that the producer and consumer transactions are separate in such a scenario. This is significantly different from the propagation of a transactional context across a simple, synchronous <channel/> element that has no <queue/> child element.

由于上一个示例引用了一个 JMS 队列实例,因此它充当一个点对点通道。另一方面,如果你需要发布订阅行为,则可以使用一个单独的元素并引用一个 JMS 主题。以下示例展示了如何执行此操作:

Since the preceding example above references a JMS Queue instance, it acts as a point-to-point channel. If, on the other hand, you need publish-subscribe behavior, you can use a separate element and reference a JMS Topic instead. The following example shows how to do so:

<int-jms:publish-subscribe-channel id="jmsChannel" topic="exampleTopic"/>

对于两种类型的 JMS 支持的通道,都可以提供目标名称而不是引用,如下例所示:

For either type of JMS-backed channel, the name of the destination may be provided instead of a reference, as the following example shows:

<int-jms:channel id="jmsQueueChannel" queue-name="exampleQueueName"/>

<jms:publish-subscribe-channel id="jmsTopicChannel" topic-name="exampleTopicName"/>

在以上示例中,目标名称由 Spring 的默认 DynamicDestinationResolver 实现解析,但你可以提供 DestinationResolver 接口的任何实现。另外,JMS ConnectionFactory 是通道的必备属性,但默认情况下,预期的 bean 名称将为 jmsConnectionFactory。以下示例提供了用于解析 JMS 目标名称的自定义实例和 ConnectionFactory 的不同名称:

In the preceding examples, the destination names are resolved by Spring’s default DynamicDestinationResolver implementation, but you could provide any implementation of the DestinationResolver interface. Also, the JMS ConnectionFactory is a required property of the channel, but, by default, the expected bean name would be jmsConnectionFactory. The following example provides both a custom instance for resolution of the JMS destination names and a different name for the ConnectionFactory:

<int-jms:channel id="jmsChannel" queue-name="exampleQueueName"
    destination-resolver="customDestinationResolver"
    connection-factory="customConnectionFactory"/>

对于 <publish-subscribe-channel />,将 durable 属性设置为 true 以启用持久订阅,或将该属性设置为 subscription-shared 以启用共享订阅(需要 JMS 2.0 代理,且版本 4.2 之后一直可用)。使用 subscription 来命名订阅。

For the <publish-subscribe-channel />, set the durable attribute to true for a durable subscription or subscription-shared for a shared subscription (requires a JMS 2.0 broker and has been available since version 4.2). Use subscription to name the subscription.

Using JMS Message Selectors

使用 JMS 消息选择器,您既可以根据 JMS 标头也可以根据 JMS 属性来筛选 JMS Messages。例如,如果您想收听自定义 JMS 标头属性为 myHeaderProperty、等于 something 的消息,您可以指定以下表达式:

With JMS message selectors, you can filter JMS Messages based on JMS headers as well as JMS properties. For example, if you want to listen to messages whose custom JMS header property, myHeaderProperty, equals something, you can specify the following expression:

myHeaderProperty = 'something'

消息选择器表达式是 SQL-92 条件表达式语法的子集,并且被定义为 Java Message Service 规范的一部分。您可以使用 XML 命名空间配置为以下 Spring Integration JMS 组件指定 JMS 消息 selector 属性:

Message selector expressions are a subset of the SQL-92 conditional expression syntax and are defined as part of the Java Message Service specification. You can specify the JMS message selector attribute by using XML namespace configuration for the following Spring Integration JMS components:

  • JMS Channel

  • JMS Publish Subscribe Channel

  • JMS Inbound Channel Adapter

  • JMS Inbound Gateway

  • JMS Message-driven Channel Adapter

您无法使用 JMS 消息选择器来引用消息体值。

You cannot reference message body values by using JMS Message selectors.

JMS Samples

要尝试使用这些 JMS 适配器,请查看 Spring Integration Samples Git 存储库中提供的 JMS 样品,网址为 https://github.com/spring-projects/spring-integration-samples/tree/master/basic/jms.

To experiment with these JMS adapters, check out the JMS samples available in the Spring Integration Samples Git repository at https://github.com/spring-projects/spring-integration-samples/tree/master/basic/jms.

该存储库包括两个样本。一个提供入站和出站通道适配器,另一个提供入站和出站网关。它们被配置为使用嵌入式 ActiveMQ 进程运行,但是您可以修改每个样本的 common.xml Spring 应用程序上下文文件,以支持不同的 JMS 提供程序或独立的 ActiveMQ 进程。

That repository includes two samples. One provides inbound and outbound channel adapters, and the other provides inbound and outbound gateways. They are configured to run with an embedded ActiveMQ process, but you can modify the common.xml Spring application context file of each sample to support either a different JMS provider or a standalone ActiveMQ process.

换句话说,你可以拆分配置,以便入站和出站适配器在单独的 JVM 中运行。如果你已安装 ActiveMQ,请将 common.xml 文件中的 brokerURL 属性修改为使用 tcp://localhost:61616(而不是 vm://localhost)。这两个示例都接受来自标准输入的输入,并回显到标准输出。查看配置,了解如何通过 JMS 路由这些消息。

In other words, you can split the configuration so that the inbound and outbound adapters run in separate JVMs. If you have ActiveMQ installed, modify the brokerURL property within the common.xml file to use tcp://localhost:61616 (instead of vm://localhost). Both of the samples accept input from stdin and echo back to stdout. Look at the configuration to see how these messages are routed over JMS.