Outbound Gateways

The JPA inbound channel adapter lets you poll a database to retrieve one or more JPA entities. The retrieved data then starts a Spring Integration flow that uses it as the message payload.

Additionally, you can use JPA outbound channel adapters at the end of your flow to persist data, which effectively ends the flow once the persistence operation completes.

However, how can you execute JPA persistence operations in the middle of a flow? For example, you may have business data that you are processing in your Spring Integration message flow and that you would like to persist, yet you still need to use other components further downstream. Alternatively, instead of polling the database with a poller, you may need to execute JPQL queries and actively retrieve data, which is then processed by subsequent components in your flow.

This is where JPA outbound gateways come into play. They let you both persist and retrieve data. To support these uses, Spring Integration provides two types of JPA outbound gateways:

  • Updating outbound gateway

  • Retrieving outbound gateway

Whenever the outbound gateway performs an action that saves, updates, or deletes records in the database, you need to use an updating outbound gateway. If, for example, you use an entity to persist it, the merged and persisted entity is returned as the result. In other cases, the number of affected (updated or deleted) records is returned instead.

When retrieving (selecting) data from the database, you use a retrieving outbound gateway. With a retrieving outbound gateway, you can use JPQL, named queries (native or JPQL-based), or native queries (SQL) to select the data and retrieve the results.

An updating outbound gateway is functionally similar to an outbound channel adapter, except that an updating outbound gateway sends a result to the gateway’s reply channel after performing the JPA operation.

A retrieving outbound gateway is similar to an inbound channel adapter.

We recommend that you first read the Outbound Channel Adapter and Inbound Channel Adapter sections earlier in this chapter, as most of the common concepts are explained there.

This similarity was the main reason for using the central JpaExecutor class to unify common functionality as much as possible.

Common to all JPA outbound gateways, and similar to the outbound-channel-adapter, you can perform JPA operations by using any of the following:

  • Entity classes

  • JPA Query Language (JPQL)

  • Native query

  • Named query

For configuration examples, see JPA Outbound Gateway Samples.

Common Configuration Parameters

JPA outbound gateways always have access to the Spring Integration Message as input. Consequently, the following parameters are available:

parameter-source-factory

An instance of o.s.i.jpa.support.parametersource.ParameterSourceFactory used to get an instance of o.s.i.jpa.support.parametersource.ParameterSource. The ParameterSource is used to resolve the values of the parameters provided in the query. If you perform operations by using a JPA entity, the parameter-source-factory attribute is ignored. The parameter sub-elements are mutually exclusive with the parameter-source-factory and they have to be configured on the provided ParameterSourceFactory. Optional.

use-payload-as-parameter-source

If set to true, the payload of the Message is used as a source for parameters. If set to false, the entire Message is available as a source for parameters. If no JPA Parameters are passed in, this property defaults to true. This means that, if you use a default BeanPropertyParameterSourceFactory, the bean properties of the payload are used as a source for parameter values for the JPA query. However, if JPA Parameters are passed in, this property, by default, evaluates to false. The reason is that JPA Parameters let you provide SpEL Expressions. Therefore, it is highly beneficial to have access to the entire Message, including the headers. Optional.
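
As a minimal sketch of the default behavior described above (no parameter sub-elements, so use-payload-as-parameter-source evaluates to true), the following configuration assumes that the message payload is a bean exposing lastName and rollNumber properties. The channel names and the em bean are hypothetical, and the int-jpa prefix is assumed to be declared as in the other listings in this section:

```xml
<!-- Sketch only: channel names and the "em" EntityManager bean are assumptions. -->
<int-jpa:updating-outbound-gateway request-channel="studentUpdateChannel"
    reply-channel="studentUpdateReplyChannel"
    jpa-query="update Student s set s.lastName = :lastName where s.rollNumber = :rollNumber"
    entity-manager="em"
    use-payload-as-parameter-source="true">
    <!-- An update query needs an active transaction. -->
    <int-jpa:transactional/>
    <!-- No int-jpa:parameter elements: :lastName and :rollNumber are expected
         to be resolved from the payload's bean properties of the same name. -->
</int-jpa:updating-outbound-gateway>
```

If the values instead live in message headers, declaring int-jpa:parameter sub-elements with SpEL expressions (as in the samples later in this section) gives access to the entire Message.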

Updating Outbound Gateway

The following listing shows all the attributes that you can set on an updating-outbound-gateway and describes the key attributes:

<int-jpa:updating-outbound-gateway request-channel=""  1
    auto-startup="true"
    entity-class=""
    entity-manager=""
    entity-manager-factory=""
    id=""
    jpa-operations=""
    jpa-query=""
    named-query=""
    native-query=""
    order=""
    parameter-source-factory=""
    persist-mode="MERGE"
    reply-channel=""  2
    reply-timeout=""  3
    use-payload-as-parameter-source="true">

    <int:poller/>
    <int-jpa:transactional/>

    <int-jpa:parameter name="" type="" value=""/>
    <int-jpa:parameter name="" expression=""/>
</int-jpa:updating-outbound-gateway>
1 The channel from which the outbound gateway receives messages for performing the desired operation. This attribute is similar to the channel attribute of the outbound-channel-adapter. Optional.
2 The channel to which the gateway sends the response after performing the required JPA operation. If this attribute is not defined, the request message must have a replyChannel header. Optional.
3 Specifies the time the gateway waits to send the result to the reply channel. Only applies when the reply channel itself might block the send operation (for example, a bounded QueueChannel that is currently full). The value is specified in milliseconds. Optional.

The remaining attributes are described earlier in this chapter. See Configuration Parameter Reference.

Configuring with Java Configuration

The following Spring Boot application shows an example of how to configure the outbound adapter with Java:

@SpringBootApplication
@EntityScan(basePackageClasses = StudentDomain.class)
@IntegrationComponentScan
public class JpaJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(JpaJavaApplication.class)
            .web(WebApplicationType.NONE)
            .run(args);
    }

    @Autowired
    private EntityManagerFactory entityManagerFactory;

    @MessagingGateway
    interface JpaGateway {

       @Gateway(requestChannel = "jpaUpdateChannel")
       @Transactional
       void updateStudent(StudentDomain payload);

    }

    @Bean
    @ServiceActivator(inputChannel = "jpaUpdateChannel")
    public MessageHandler jpaOutbound() {
        JpaOutboundGateway adapter =
               new JpaOutboundGateway(new JpaExecutor(this.entityManagerFactory));
        adapter.setOutputChannelName("updateResults");
        return adapter;
    }

}

Configuring with the Java DSL

The following Spring Boot application shows an example of how to configure the outbound adapter using the Java DSL:

@SpringBootApplication
@EntityScan(basePackageClasses = StudentDomain.class)
public class JpaJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(JpaJavaApplication.class)
            .web(WebApplicationType.NONE)
            .run(args);
    }

    @Autowired
    private EntityManagerFactory entityManagerFactory;

    @Bean
    public IntegrationFlow updatingGatewayFlow() {
        return f -> f
                .handle(Jpa.updatingGateway(this.entityManagerFactory),
                        e -> e.transactional(true))
                .channel(c -> c.queue("updateResults"));
    }

}

Retrieving Outbound Gateway

The following examples demonstrate how to configure a retrieving outbound gateway:

  • Java DSL

  • Kotlin DSL

  • Java

  • XML

Java DSL:

@SpringBootApplication
@EntityScan(basePackageClasses = StudentDomain.class)
public class JpaJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(JpaJavaApplication.class)
            .web(WebApplicationType.NONE)
            .run(args);
    }

    @Autowired
    private EntityManagerFactory entityManagerFactory;

    @Bean
    public IntegrationFlow retrievingGatewayFlow() {
        return f -> f
                .handle(Jpa.retrievingGateway(this.entityManagerFactory)
                       .jpaQuery("from Student s where s.id = :id")
                       .expectSingleResult(true)
                       .parameterExpression("id", "payload"))
                .channel(c -> c.queue("retrieveResults"));
    }

}

Kotlin DSL:

@Bean
fun retrievingGatewayFlow() =
    integrationFlow {
        handle(Jpa.retrievingGateway(entityManagerFactory)
                .jpaQuery("from Student s where s.id = :id")
                .expectSingleResult(true)
                .parameterExpression("id", "payload"))
        channel { queue("retrieveResults") }
    }

Java:

@SpringBootApplication
@EntityScan(basePackageClasses = StudentDomain.class)
public class JpaJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(JpaJavaApplication.class)
            .web(WebApplicationType.NONE)
            .run(args);
    }

    @Autowired
    private EntityManagerFactory entityManagerFactory;


    @Bean
    public JpaExecutor jpaExecutor() {
        JpaExecutor executor = new JpaExecutor(this.entityManagerFactory);
        executor.setJpaQuery("from Student s where s.id = :id");
        executor.setJpaParameters(Collections.singletonList(new JpaParameter("id", null, "payload")));
        executor.setExpectSingleResult(true);
        return executor;
    }

    @Bean
    @ServiceActivator(inputChannel = "jpaRetrievingChannel")
    public MessageHandler jpaOutbound() {
        JpaOutboundGateway adapter = new JpaOutboundGateway(jpaExecutor());
        adapter.setOutputChannelName("retrieveResults");
        adapter.setGatewayType(OutboundGatewayType.RETRIEVING);
        return adapter;
    }

}

XML:

<int-jpa:retrieving-outbound-gateway request-channel=""
    auto-startup="true"
    delete-after-poll="false"
    delete-in-batch="false"
    entity-class=""
    id-expression=""              1
    entity-manager=""
    entity-manager-factory=""
    expect-single-result="false"  2
    id=""
    jpa-operations=""
    jpa-query=""
    max-results=""                3
    max-results-expression=""     4
    first-result=""               5
    first-result-expression=""    6
    named-query=""
    native-query=""
    order=""
    parameter-source-factory=""
    reply-channel=""
    reply-timeout=""
    use-payload-as-parameter-source="true">
    <int:poller></int:poller>
    <int-jpa:transactional/>

    <int-jpa:parameter name="" type="" value=""/>
    <int-jpa:parameter name="" expression=""/>
</int-jpa:retrieving-outbound-gateway>
1 (Since Spring Integration 4.0) The SpEL expression that determines the primaryKey value for EntityManager.find(Class entityClass, Object primaryKey) method against the requestMessage as the root object of evaluation context. The entityClass argument is determined from the entity-class attribute, if present. Otherwise, it is determined from the payload class. All other attributes are disallowed if you use id-expression. Optional.
2 A boolean flag indicating whether the select operation is expected to return a single result or a List of results. If this flag is set to true, a single entity is sent as the payload of the message. If multiple entities are returned, an exception is thrown. If false, the List of entities is sent as the payload of the message. It defaults to false. Optional.
3 This non-zero, non-negative integer value tells the adapter not to select more than the specified number of rows when executing the select operation. By default, if this attribute is not set, all the records matched by the given query are selected. This attribute is mutually exclusive with max-results-expression. Optional.
4 An expression that can be used to find the maximum number of results in a result set. It is mutually exclusive with max-results. Optional.
5 This non-zero, non-negative integer value tells the adapter the first record from which results are to be retrieved. This attribute is mutually exclusive with first-result-expression. Version 3.0 introduced this attribute. Optional.
6 This expression is evaluated against the message to find the position of the first record in the result set. This attribute is mutually exclusive with first-result. Version 3.0 introduced this attribute. Optional.
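
To illustrate how the result-window attributes described in the callouts above might be combined, here is a hedged sketch of a paged lookup. It assumes the request message carries a hypothetical offset header; the channel names, the query, and the em bean are also assumptions rather than part of the reference listing:

```xml
<!-- Sketch only: a page of at most 20 students, starting at the position
     taken from the (hypothetical) "offset" message header. -->
<int-jpa:retrieving-outbound-gateway request-channel="studentPageRequestChannel"
    reply-channel="studentPageReplyChannel"
    jpa-query="select s from Student s order by s.lastName"
    entity-manager="em"
    max-results="20"
    first-result-expression="headers['offset']"/>
```

Because max-results and max-results-expression are mutually exclusive, as are first-result and first-result-expression, the sketch uses one attribute from each pair.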

When you choose to delete entities upon retrieval and you have retrieved a collection of entities, the entities are deleted one at a time by default. This may cause performance issues.

Alternatively, you can set the deleteInBatch attribute to true, which performs a batch delete. However, the limitation of doing so is that cascading deletes are not supported.

JSR 317: Java™ Persistence 2.0 states in chapter 4.10, “Bulk Update and Delete Operations”, that:

“A delete operation only applies to entities of the specified class and its subclasses. It does not cascade to related entities.”

For more information, see JSR 317: Java™ Persistence 2.0.
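
As a rough sketch of the batch-delete option discussed above, the following retrieving gateway selects students by last name and then removes them with a single bulk delete. The channel names, the query, and the em bean are assumptions; the delete-after-poll and delete-in-batch attributes are the ones shown in the attribute listing:

```xml
<!-- Sketch only: retrieved entities are deleted in one bulk operation,
     so deletes do not cascade to related entities. -->
<int-jpa:retrieving-outbound-gateway request-channel="studentPurgeRequestChannel"
    reply-channel="studentPurgeReplyChannel"
    jpa-query="select s from Student s where s.lastName = :lastName"
    entity-manager="em"
    delete-after-poll="true"
    delete-in-batch="true">
    <!-- Deleting requires an active transaction. -->
    <int-jpa:transactional/>
    <int-jpa:parameter name="lastName" expression="payload"/>
</int-jpa:retrieving-outbound-gateway>
```

If the Student entity relied on cascading deletes for related records, leaving delete-in-batch at its default of false would likely be the safer choice.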

Starting with version 6.0, Jpa.retrievingGateway() returns an empty list when the query returns no entities. Previously, depending on requiresReply, either null was returned (ending the flow) or an exception was thrown. To revert to the previous behavior, add a filter after the gateway to filter out empty lists. This requires extra configuration in applications where empty-list handling is part of the downstream logic. See Splitter Discard Channel for possible empty-list handling options.
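
One possible way to express such a filter in XML is sketched below. It assumes the gateway returns a List payload (expect-single-result left at false); the channel names and the em bean are hypothetical, and the int prefix is assumed to map to the core Spring Integration namespace:

```xml
<!-- Sketch only: the gateway replies with a (possibly empty) List. -->
<int-jpa:retrieving-outbound-gateway request-channel="retrievingGatewayReqChannel"
    reply-channel="retrievedStudentsChannel"
    jpa-query="select s from Student s where s.lastName = :lastName"
    entity-manager="em">
    <int-jpa:parameter name="lastName" expression="payload"/>
</int-jpa:retrieving-outbound-gateway>

<!-- Messages with an empty List payload are dropped here, so the rest of
     the flow only sees non-empty results. -->
<int:filter input-channel="retrievedStudentsChannel"
    output-channel="nonEmptyStudentsChannel"
    expression="!payload.isEmpty()"/>
```

Without a discard-channel on the filter, rejected messages are silently dropped, which approximates the earlier behavior of ending the flow when nothing was found.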

JPA Outbound Gateway Samples

This section contains various examples of using the updating outbound gateway and the retrieving outbound gateway:

Update by Using an Entity Class

In the following example, an updating outbound gateway persists an entity by using the org.springframework.integration.jpa.test.entity.Student entity class as a JPA defining parameter:

<int-jpa:updating-outbound-gateway request-channel="entityRequestChannel"  1
    reply-channel="entityResponseChannel"  2
    entity-class="org.springframework.integration.jpa.test.entity.Student"
    entity-manager="em"/>
1 This is the request channel for the outbound gateway. It is similar to the channel attribute of the outbound-channel-adapter.
2 This is where a gateway differs from an outbound adapter. This is the channel over which the reply from the JPA operation is received. If, however, you are not interested in the reply received and want only to perform the operation, using a JPA outbound-channel-adapter is the appropriate choice. In this example, where we use an entity class, the reply is the entity object that was created or merged as a result of the JPA operation.

Update using JPQL

The following example updates an entity by using the Java Persistence Query Language (JPQL), which mandates using an updating outbound gateway:

<int-jpa:updating-outbound-gateway request-channel="jpaqlRequestChannel"
  reply-channel="jpaqlResponseChannel"
  jpa-query="update Student s set s.lastName = :lastName where s.rollNumber = :rollNumber"  1
  entity-manager="em">
    <int-jpa:parameter name="lastName" expression="payload"/>
    <int-jpa:parameter name="rollNumber" expression="headers['rollNumber']"/>
</int-jpa:updating-outbound-gateway>
1 The JPQL query that the gateway executes. Because this is an updating outbound gateway, only update and delete JPQL queries are sensible choices.

When you send a message with a String payload and a header called rollNumber with a long value, the last name of the student with the specified roll number is updated to the value in the message payload. When you use an updating gateway with a JPQL query, the return value is always an integer that denotes the number of records affected by the query.

Retrieving an Entity using JPQL

The following example uses a retrieving outbound gateway and JPQL to retrieve (select) one or more entities from the database:

<int-jpa:retrieving-outbound-gateway request-channel="retrievingGatewayReqChannel"
    reply-channel="retrievingGatewayReplyChannel"
    jpa-query="select s from Student s where s.firstName = :firstName and s.lastName = :lastName"
    entity-manager="em">
    <int-jpa:parameter name="firstName" expression="payload"/>
    <int-jpa:parameter name="lastName" expression="headers['lastName']"/>
</int-jpa:retrieving-outbound-gateway>

Retrieving an Entity by Using id-expression

The following example uses a retrieving outbound gateway with id-expression to retrieve (find) one and only one entity from the database. The primaryKey is the result of evaluating id-expression. The entityClass is the class of the message payload.

<int-jpa:retrieving-outbound-gateway
    request-channel="retrievingGatewayReqChannel"
    reply-channel="retrievingGatewayReplyChannel"
    id-expression="payload.id"
    entity-manager="em"/>
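
As a variation on the example above, here is a hedged sketch for the case where the payload is the identifier itself rather than an entity instance. Since the entity class can then no longer be derived from the payload, the sketch sets entity-class explicitly. The channel names and the em bean are assumptions:

```xml
<!-- Sketch only: the payload is expected to be the primary key value,
     so the entity class is declared instead of being taken from the payload. -->
<int-jpa:retrieving-outbound-gateway
    request-channel="studentIdRequestChannel"
    reply-channel="studentByIdReplyChannel"
    entity-class="org.springframework.integration.jpa.test.entity.Student"
    id-expression="payload"
    entity-manager="em"/>
```

Query-related attributes such as jpa-query or named-query are omitted on purpose, because they cannot be combined with id-expression.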

Update using a Named Query

Using a named query is basically the same as using a JPQL query directly. The difference is that the named-query attribute is used instead, as the following example shows:

<int-jpa:updating-outbound-gateway request-channel="namedQueryRequestChannel"
    reply-channel="namedQueryResponseChannel"
    named-query="updateStudentByRollNumber"
    entity-manager="em">
    <int-jpa:parameter name="lastName" expression="payload"/>
    <int-jpa:parameter name="rollNumber" expression="headers['rollNumber']"/>
</int-jpa:updating-outbound-gateway>

You can find a complete sample application that uses Spring Integration’s JPA adapter here.