Transaction Support

从 5.0 版本开始,引入了新的 TransactionHandleMessageAdvice,以便实现整个下游流程事务,这要归功于 HandleMessageAdvice 实现。如果在 <request-handler-advice-chain> 元素中使用了常规的 TransactionInterceptor(例如,通过配置 <tx:advice>),则启动的事务仅适用于内部 AbstractReplyProducingMessageHandler.handleRequestMessage(),并且不会传播到下游流程。

Starting with version 5.0, a new TransactionHandleMessageAdvice has been introduced to make the whole downstream flow transactional, thanks to the HandleMessageAdvice implementation. When a regular TransactionInterceptor is used in the <request-handler-advice-chain> element (for example, through configuring <tx:advice>), a started transaction is applied only for an internal AbstractReplyProducingMessageHandler.handleRequestMessage() and is not propagated to the downstream flow.

为了简化 XML 配置,除了 <request-handler-advice-chain> 之外,还在所有 <outbound-gateway><service-activator> 及相关组件中添加了一个 <transactional> 元素。以下示例显示了 <transactional> 的用法:

To simplify XML configuration, along with the <request-handler-advice-chain>, a <transactional> element has been added to all <outbound-gateway> and <service-activator> and related components. The following example shows <transactional> in use:

<int-jdbc:outbound-gateway query="select * from things where id=:headers[id]">
        <int-jdbc:transactional/>
</int-jdbc:outbound-gateway>

<bean id="transactionManager" class="org.mockito.Mockito" factory-method="mock">
    <constructor-arg value="org.springframework.transaction.TransactionManager"/>
</bean>

如果您熟悉 JPA integration components,那么这样的配置并不新鲜,但现在我们可以从流程中的任何点开始事务 - 不仅仅是从 <poller> 或消息驱动信道适配器(例如 JMS)。

If you are familiar with the JPA integration components, such a configuration is not new, but now we can start a transaction from any point in our flow — not only from the <poller> or a message-driven channel adapter such as JMS.

可以通过使用 TransactionInterceptorBuilder 简化 Java 配置,并且结果 bean 名称可用于 messaging annotations adviceChain 属性中,如下例所示:

Java configuration can be simplified by using the TransactionInterceptorBuilder, and the result bean name can be used in the messaging annotations adviceChain attribute, as the following example shows:

@Bean
public ConcurrentMetadataStore store() {
    return new SimpleMetadataStore(hazelcastInstance()
                       .getMap("idempotentReceiverMetadataStore"));
}

@Bean
public IdempotentReceiverInterceptor idempotentReceiverInterceptor() {
    return new IdempotentReceiverInterceptor(
            new MetadataStoreSelector(
                    message -> message.getPayload().toString(),
                    message -> message.getPayload().toString().toUpperCase(), store()));
}

@Bean
public TransactionInterceptor transactionInterceptor() {
    return new TransactionInterceptorBuilder(true)
                .transactionManager(this.transactionManager)
                .isolation(Isolation.READ_COMMITTED)
                .propagation(Propagation.REQUIRES_NEW)
                .build();
}

@Bean
@org.springframework.integration.annotation.Transformer(inputChannel = "input",
         outputChannel = "output",
         adviceChain = { "idempotentReceiverInterceptor",
                 "transactionInterceptor" })
public Transformer transformer() {
    return message -> message;
}

请注意 TransactionInterceptorBuilder 构造函数上的 true 参数。它会导致创建 TransactionHandleMessageAdvice,而不是常规的 TransactionInterceptor

Note the true parameter on the TransactionInterceptorBuilder constructor. It causes the creation of a TransactionHandleMessageAdvice, not a regular TransactionInterceptor.

Java DSL 通过端点配置上的 .transactional() 选项支持 Advice,如下面的示例所示:

Java DSL supports an Advice through the .transactional() options on the endpoint configuration, as the following example shows:

@Bean
public IntegrationFlow updatingGatewayFlow() {
    return f -> f
        .handle(Jpa.updatingGateway(this.entityManagerFactory),
                e -> e.transactional(true))
        .channel(c -> c.queue("persistResults"));
}