Outbound Gateways

JPA 入站通道适配器让你可以轮询数据库,以检索一个或更多个 JPA 实体。检索到的数据随后用于启动 Spring 集成流,而该流将检索到的数据用作消息有效载荷。 此外,你可以在流的末尾使用 JPA 出站通道适配器,以便持久化数据,并在持久化操作结束时停止流。 然而,如何在流的中间执行 JPA 持久化操作呢?例如,你可能有在 Spring 集成消息流中处理的业务数据,并且希望将其持久化,但是你仍然需要进一步使用其他组件。或者,除了使用轮询器轮询数据库外,你还需要执行 JPQL 查询,并主动检索数据,然后在流内的后续组件中处理该数据。 这正是 JPA 出站网关发挥作用的地方。它们让你能够持久化数据以及检索数据。为了促进这些用途,Spring 集成提供了两种类型的 JPA 出站网关:

  • Updating outbound gateway

  • Retrieving outbound gateway

无论何时使用出站网关来执行保存、更新或仅在数据库中删除某些记录的操作时,都需要使用一个更新出站网关。例如,如果你使用 entity 对其进行持久化,则会返回一个合并的持久化实体作为结果。在其他情况下,将返回受影响的记录数(已更新或已删除)。 从数据库中检索(选择)数据时,我们使用检索出站网关。使用检索出站网关,我们可以使用 JPQL、命名查询(原生或基于 JPQL)或原生查询(SQL)来选择数据并检索结果。 更新出站网关在功能上类似于出站通道适配器,不同之处在于更新出站网关在执行 JPA 操作后将结果发送至网关的回复通道。 检索出站网关就类似于入站通道适配器。

我们建议你先阅读本章前面部分的 Outbound Channel Adapter 部分和 Inbound Channel Adapter 部分,因为那里解释了大多数常见概念。

这种相似性是使用中心 JpaExecutor 类统一尽可能多的通用功能的主要因素。 对于所有 JPA 出站网关以及与 outbound-channel-adapter 类似,我们可以进行各种 JPA 操作:

  • Entity classes

  • JPA Query Language (JPQL)

  • Native query

  • Named query

有关配置示例,请参见 JPA Outbound Gateway Samples

Common Configuration Parameters

JPA 出站网关始终可以访问 Spring 集成 Message 作为输入。因此,可以获得以下参数:

parameter-source-factory

An instance of o.s.i.jpa.support.parametersource.ParameterSourceFactory used to get an instance of o.s.i.jpa.support.parametersource.ParameterSource. The ParameterSource is used to resolve the values of the parameters provided in the query. If you perform operations by using a JPA entity, the parameter-source-factory attribute is ignored. The parameter sub-elements are mutually exclusive with the parameter-source-factory and they have to be configured on the provided ParameterSourceFactory. Optional.

use-payload-as-parameter-source

If set to true, the payload of the Message is used as a source for parameters. If set to false, the entire Message is available as a source for parameters. If no JPA Parameters are passed in, this property defaults to true. This means that, if you use a default BeanPropertyParameterSourceFactory, the bean properties of the payload are used as a source for parameter values for the JPA query. However, if JPA Parameters are passed in, this property, by default, evaluates to false. The reason is that JPA Parameters let you provide SpEL Expressions. Therefore, it is highly beneficial to have access to the entire Message, including the headers. Optional.

Updating Outbound Gateway

以下列表显示了可以在更新出站网关上设置的所有属性,并描述了键属性:

<int-jpa:updating-outbound-gateway request-channel=""  1
    auto-startup="true"
    entity-class=""
    entity-manager=""
    entity-manager-factory=""
    id=""
    jpa-operations=""
    jpa-query=""
    named-query=""
    native-query=""
    order=""
    parameter-source-factory=""
    persist-mode="MERGE"
    reply-channel=""  2
    reply-timeout=""  3
    use-payload-as-parameter-source="true">

    <int:poller/>
    <int-jpa:transactional/>

    <int-jpa:parameter name="" type="" value=""/>
    <int-jpa:parameter name="" expression=""/>
</int-jpa:updating-outbound-gateway>
1 出站网关接收消息以执行所需操作的通道。此属性类似于 outbound-channel-adapterchannel 属性。可选。
2 在执行所需的 JPA 操作后,网关向其发送响应的通道。如果没有定义此属性,则请求消息必须具有 replyChannel 标头。可选。
3 指定网关等待将结果发送到回复通道的时间。仅适用于回复通道本身可能阻塞发送操作的情况(例如,当前已满的绑定 QueueChannel)。该值以毫秒为单位指定。可选。

本文前面描述了其余的属性。请参阅 Configuration Parameter ReferenceConfiguration Parameter Reference

Configuring with Java Configuration

以下 Spring Boot 应用程序展示了如何使用 Java 配置出站适配器的示例:

@SpringBootApplication
@EntityScan(basePackageClasses = StudentDomain.class)
@IntegrationComponentScan
public class JpaJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(JpaJavaApplication.class)
            .web(false)
            .run(args);
    }

    @Autowired
    private EntityManagerFactory entityManagerFactory;

    @MessagingGateway
    interface JpaGateway {

       @Gateway(requestChannel = "jpaUpdateChannel")
       @Transactional
       void updateStudent(StudentDomain payload);

    }

    @Bean
    @ServiceActivator(channel = "jpaUpdateChannel")
    public MessageHandler jpaOutbound() {
        JpaOutboundGateway adapter =
               new JpaOutboundGateway(new JpaExecutor(this.entityManagerFactory));
        adapter.setOutputChannelName("updateResults");
        return adapter;
    }

}

Configuring with the Java DSL

以下 Spring Boot 应用程序显示了如何使用 Java DSL 配置出站适配器的示例:

@SpringBootApplication
@EntityScan(basePackageClasses = StudentDomain.class)
public class JpaJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(JpaJavaApplication.class)
            .web(false)
            .run(args);
    }

    @Autowired
    private EntityManagerFactory entityManagerFactory;

    @Bean
    public IntegrationFlow updatingGatewayFlow() {
        return f -> f
                .handle(Jpa.updatingGateway(this.entityManagerFactory),
                        e -> e.transactional(true))
                .channel(c -> c.queue("updateResults"));
    }

}

Retrieving Outbound Gateway

以下示例演示了如何配置检索出站网关:

  • Java DSL

  • Kotlin DSL

  • Java

  • XML

@SpringBootApplication
@EntityScan(basePackageClasses = StudentDomain.class)
public class JpaJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(JpaJavaApplication.class)
            .web(false)
            .run(args);
    }

    @Autowired
    private EntityManagerFactory entityManagerFactory;

    @Bean
    public IntegrationFlow retrievingGatewayFlow() {
        return f -> f
                .handle(Jpa.retrievingGateway(this.entityManagerFactory)
                       .jpaQuery("from Student s where s.id = :id")
                       .expectSingleResult(true)
                       .parameterExpression("id", "payload"))
                .channel(c -> c.queue("retrieveResults"));
    }

}
@Bean
fun retrievingGatewayFlow() =
    integrationFlow {
        handle(Jpa.retrievingGateway(this.entityManagerFactory)
                .jpaQuery("from Student s where s.id = :id")
                .expectSingleResult(true)
                .parameterExpression("id", "payload"))
        channel { queue("retrieveResults") }
    }
@SpringBootApplication
@EntityScan(basePackageClasses = StudentDomain.class)
public class JpaJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(JpaJavaApplication.class)
            .web(false)
            .run(args);
    }

    @Autowired
    private EntityManagerFactory entityManagerFactory;


    @Bean
    public JpaExecutor jpaExecutor() {
        JpaExecutor executor = new JpaExecutor(this.entityManagerFactory);
        jpaExecutor.setJpaQuery("from Student s where s.id = :id");
        executor.setJpaParameters(Collections.singletonList(new JpaParameter("id", null, "payload")));
        jpaExecutor.setExpectSingleResult(true);
        return executor;
    }

    @Bean
    @ServiceActivator(channel = "jpaRetrievingChannel")
    public MessageHandler jpaOutbound() {
        JpaOutboundGateway adapter = new JpaOutboundGateway(jpaExecutor());
        adapter.setOutputChannelName("retrieveResults");
        adapter.setGatewayType(OutboundGatewayType.RETRIEVING);
        return adapter;
    }

}
<int-jpa:retrieving-outbound-gateway request-channel=""
    auto-startup="true"
    delete-after-poll="false"
    delete-in-batch="false"
    entity-class=""
    id-expression=""              1
    entity-manager=""
    entity-manager-factory=""
    expect-single-result="false"  2
    id=""
    jpa-operations=""
    jpa-query=""
    max-results=""                3
    max-results-expression=""     4
    first-result=""               5
    first-result-expression=""    6
    named-query=""
    native-query=""
    order=""
    parameter-source-factory=""
    reply-channel=""
    reply-timeout=""
    use-payload-as-parameter-source="true">
    <int:poller></int:poller>
    <int-jpa:transactional/>

    <int-jpa:parameter name="" type="" value=""/>
    <int-jpa:parameter name="" expression=""/>
</int-jpa:retrieving-outbound-gateway>
1 (自 Spring Integration 4.0 起)SpEL 表达式,用于根据评估上下文的根对象 requestMessage 针对 EntityManager.find(Class entityClass, Object primaryKey) 方法确定 primaryKey 值。如果存在,则 entityClass 参数由 entity-class 属性确定。否则,它将由 payload 类确定。如果您使用 id-expression,则不允许所有其他属性。可选。
2 指示选择操作预计会返回单个结果还是 List 结果的布尔标志。如果将此标志设置为 true,则单个实体将作为消息的有效负载发送。如果返回多个实体,则会引发异常。如果为 false,则将 List 实体作为消息的有效负载发送。它默认为 false。可选。
3 此非零、非负整数值告诉适配器在执行选择操作时不要选择超过指定数量的行。默认情况下,如果未设置此属性,则给定查询将选择所有可能的记录。此属性与 max-results-expression 互斥。可选。
4 可用于查找结果集中最大结果数的表达式。它与 max-results 互斥。可选。
5 此非零、非负整数值告诉适配器应从其检索结果的第一条记录。此属性与 first-result-expression 互斥。此属性由版本 3.0 引入。可选。
6 根据消息计算此表达式,以查找结果集中第一条记录的位置。此属性与 first-result 互斥。此属性由版本 3.0 引入。可选。

当你选择在检索时删除实体时,并且你已检索到一个实体集合,默认情况下,会逐个实体删除实体。这可能会引起性能问题。 或者,你可以将属性 deleteInBatch 设置为 true,这样可以执行批量删除。但是,这样做的限制是级联删除不受支持。 JSR 317:Java™ 持久性 2.0 在第 4.10 章“批量更新和删除操作”中指出: “删除操作仅适用于指定类及其子类的实体。它并不会级联到相关实体。” 有关更多信息,请参阅 JSR 317: Java™ Persistence 2.0

从版本 6.0 开始,当查询没有返回实体时,Jpa.retrievingGateway() 将返回一个空列表结果。以前,会返回 null 结束流程,或抛出一个异常,具体取决于 requiresReply。或者,为恢复为以前的处理方法,可以在网关后添加一个 filter 以过滤出空列表。它要求在应用程序中进行额外的配置,其中空列表处理是下游逻辑的一部分。有关可能的空列表处理选项,请参阅 Splitter Discard Channel

JPA Outbound Gateway Samples

本节包含有关使用更新出站网关和检索出站网关的各种示例:

Update by Using an Entity Class

在以下示例中,使用 org.springframework.integration.jpa.test.entity.Student 实体类作为 JPA 定义参数来持久化更新出站网关:

<int-jpa:updating-outbound-gateway request-channel="entityRequestChannel"  1
    reply-channel="entityResponseChannel"  2
    entity-class="org.springframework.integration.jpa.test.entity.Student"
    entity-manager="em"/>
1 这是出站网关的请求通道。它类似于 outbound-channel-adapterchannel 属性。
2 网关与出站适配器在此不同。这是接收来自 JPA 操作的回复的通道。但是,如果您不感兴趣接收的回复,并且只想执行操作,则使用 JPA outbound-channel-adapter 是适当的选择。在此示例中,我们使用实体类,回复是由于 JPA 操作而创建或合并的实体对象。

Update using JPQL

以下示例使用 Java 持久性查询语言 (JPQL) 更新实体,该语言要求使用更新出站网关:

<int-jpa:updating-outbound-gateway request-channel="jpaqlRequestChannel"
  reply-channel="jpaqlResponseChannel"
  jpa-query="update Student s set s.lastName = :lastName where s.rollNumber = :rollNumber"  1
  entity-manager="em">
    <int-jpa:parameter name="lastName" expression="payload"/>
    <int-jpa:parameter name="rollNumber" expression="headers['rollNumber']"/>
</int-jpa:updating-outbound-gateway>
1 网关执行的 JPQL 查询。由于我们使用更新出站网关,因此仅 updatedelete JPQL 查询才是明智的选择。

当您发送包含一个名为 rollNumber 的带有 long 值的标头的 String 有效负载的消息时,该学生指定 roll number 的姓氏将更新为消息有效负载中的值。使用更新网关时,返回值始终是整数,表示受 JPA QL 执行影响的记录数。

Retrieving an Entity using JPQL

以下示例使用检索出站网关和 JPQL 从数据库中检索(选择)一个或多个实体:

<int-jpa:retrieving-outbound-gateway request-channel="retrievingGatewayReqChannel"
    reply-channel="retrievingGatewayReplyChannel"
    jpa-query="select s from Student s where s.firstName = :firstName and s.lastName = :lastName"
    entity-manager="em">
    <int-jpa:parameter name="firstName" expression="payload"/>
    <int-jpa:parameter name="lastName" expression="headers['lastName']"/>
</int-jpa:outbound-gateway>

Retrieving an Entity by Using id-expression

以下示例使用带有 id-expression 的检索出站网关,从数据库中检索(查找)一个且仅一个实体:primaryKeyid-expression 评估的结果。entityClass 是消息 payload 的一个类。

<int-jpa:retrieving-outbound-gateway
	request-channel="retrievingGatewayReqChannel"
    reply-channel="retrievingGatewayReplyChannel"
    id-expression="payload.id"
    entity-manager="em"/>

Update using a Named Query

使用命名查询基本上与直接使用 JPQL 查询相同。不同之处在于,使用的是 named-query 属性,如下例所示:

<int-jpa:updating-outbound-gateway request-channel="namedQueryRequestChannel"
    reply-channel="namedQueryResponseChannel"
    named-query="updateStudentByRollNumber"
    entity-manager="em">
    <int-jpa:parameter name="lastName" expression="payload"/>
    <int-jpa:parameter name="rollNumber" expression="headers['rollNumber']"/>
</int-jpa:outbound-gateway>

你可以找到一个完整的示例应用程序,它使用 Spring Integration 的 JPA 适配器 here