Inbound Channel Adapter

入站通道适配器的主要功能是执行 SQL {SELECT} 查询,并将结果转换成消息。消息有效负载是整个结果集(表示为 {List}),而列表中项的类型取决于行映射策略。默认策略是一个常规映射器,为查询结果中的每一行返回一个 {Map}。您也可以通过添加对 {RowMapper} 实例的引用来更改此内容(有关行映射的更详细信息,请参见 { Spring JDBC} 文档)。

The main function of an inbound channel adapter is to execute a SQL SELECT query and turn the result set into a message. The message payload is the whole result set (expressed as a List), and the types of the items in the list depend on the row-mapping strategy. The default strategy is a generic mapper that returns a Map for each row in the query result. Optionally, you can change this by adding a reference to a RowMapper instance (see the Spring JDBC documentation for more detailed information about row mapping).

如果你想将 SELECT 查询结果中的行转换为单个消息,则可以使用下游 splitter。

If you want to convert rows in the SELECT query result to individual messages, you can use a downstream splitter.

入站适配器还需要一个对 JdbcTemplate 实例或 DataSource 的引用。

The inbound adapter also requires a reference to either a JdbcTemplate instance or a DataSource.

除了生成消息的 SELECT 语句外,适配器还有一个 UPDATE 语句,该语句将记录标记为已处理,以便它们不会出现在下一次轮询中。可以使用原始选择中 ID 列表对更新进行参数化。默认情况下,这是通过命名约定完成的(输入结果集中名为 id 的列会转换为更新的参数映射中的列表,该列表称为 id)。以下示例定义了一个带有更新查询和 DataSource 引用的入站通道适配器。

As well as the SELECT statement to generate the messages, the adapter also has an UPDATE statement that marks the records as processed so that they do not show up in the next poll. The update can be parameterized by the list of IDs from the original select. By default, this is done through a naming convention (a column in the input result set called id is translated into a list in the parameter map for the update called id). The following example defines an inbound channel adapter with an update query and a DataSource reference.

<int-jdbc:inbound-channel-adapter query="select * from item where status=2"
    channel="target" data-source="dataSource"
    update="update item set status=10 where id in (:id)" />

更新查询中的参数使用冒号(:)前缀指定参数的名称(在前一个示例中,这是一个应用于轮询结果集中每个行的表达式)。这是 Spring JDBC 中的命名参数 JDBC 支持的标准功能,结合了 Spring 集成中采用的约定(投影到轮询结果列表上)。底层 Spring JDBC 功能限制了可用的表达式(例如,除了句点之外的大多数特殊字符都不允许),但由于目标通常是可以通过 bean 路径寻址的对象列表(可能是单个对象的列表),因此不会造成不适当的限制。

The parameters in the update query are specified with a colon (:) prefix to the name of a parameter (which, in the preceding example, is an expression to be applied to each of the rows in the polled result set). This is a standard feature of the named parameter JDBC support in Spring JDBC, combined with a convention (projection onto the polled result list) adopted in Spring Integration. The underlying Spring JDBC features limit the available expressions (for example, most special characters other than a period are disallowed), but since the target is usually a list of objects (possibly a list of one) that are addressable by bean paths this is not unduly restrictive.

要更改参数生成策略,您可以将 SqlParameterSourceFactory 注入到适配器中以覆盖默认行为(适配器具有 sql-parameter-source-factory 属性)。Spring Integration 提供 ExpressionEvaluatingSqlParameterSourceFactory,它创建一个基于 SpEL 的参数源,查询结果作为 #root 对象。(如果 update-per-row 为 true,根对象是该行)。如果同一参数名称在更新查询中多次出现,则仅对其求值一次,并缓存其结果。

To change the parameter generation strategy, you can inject a SqlParameterSourceFactory into the adapter to override the default behavior (the adapter has a sql-parameter-source-factory attribute). Spring Integration provides ExpressionEvaluatingSqlParameterSourceFactory, which creates a SpEL-based parameter source, with the results of the query as the #root object. (If update-per-row is true, the root object is the row). If the same parameter name appears multiple times in the update query, it is evaluated only once, and its result is cached.

你也可以为 select 查询使用参数源。在这种情况下,由于没有可以评估的 “result” 对象,因此每次都使用单个参数源(而不是使用参数源工厂)。从 4.0 版本开始,你可以使用 Spring 来创建一个基于 SpEL 的参数源,如下例所示:

You can also use a parameter source for the select query. In this case, since there is no “result” object to evaluate against, a single parameter source is used each time (rather than using a parameter source factory). Starting with version 4.0, you can use Spring to create a SpEL based parameter source, as the following example shows:

<int-jdbc:inbound-channel-adapter query="select * from item where status=:status"
	channel="target" data-source="dataSource"
	select-sql-parameter-source="parameterSource" />

<bean id="parameterSource" factory-bean="parameterSourceFactory"
			factory-method="createParameterSourceNoCache">
	<constructor-arg value="" />
</bean>

<bean id="parameterSourceFactory"
		class="o.s.integration.jdbc.ExpressionEvaluatingSqlParameterSourceFactory">
	<property name="parameterExpressions">
		<map>
			<entry key="status" value="@statusBean.which()" />
		</map>
	</property>
</bean>

<bean id="statusBean" class="foo.StatusDetermination" />

每个参数表达式中的 value 可能是任何有效的 SpEL 表达式。用于表达式求值的 #root 对象是从 parameterSource bean 中定义的构造函数参数。它对于所有求值都是静态的(在前面的示例中,是一个空 String)。

The value in each parameter expression can be any valid SpEL expression. The #root object for the expression evaluation is the constructor argument defined on the parameterSource bean. It is static for all evaluations (in the preceding example, an empty String).

从 5.0 版本开始,你可以通过 sqlParameterTypesExpressionEvaluatingSqlParameterSourceFactory 提供指定特定参数的目标 SQL 类型。

Starting with version 5.0, you can supply ExpressionEvaluatingSqlParameterSourceFactory with sqlParameterTypes to specify the target SQL type for the particular parameter.

下面的示例提供了在查询中使用的参数的 SQL 类型:

The following example provides SQL types for the parameters being used in the query:

<int-jdbc:inbound-channel-adapter query="select * from item where status=:status"
    channel="target" data-source="dataSource"
    select-sql-parameter-source="parameterSource" />

<bean id="parameterSource" factory-bean="parameterSourceFactory"
            factory-method="createParameterSourceNoCache">
    <constructor-arg value="" />
</bean>

<bean id="parameterSourceFactory"
        class="o.s.integration.jdbc.ExpressionEvaluatingSqlParameterSourceFactory">
    <property name="sqlParameterTypes">
        <map>
            <entry key="status" value="#{ T(java.sql.Types).BINARY}" />
        </map>
    </property>
</bean>

使用 createParameterSourceNoCache 工厂方法。否则,参数源会缓存评估结果。另外请注意,由于 禁用了缓存,如果同一个参数名称在 select 查询中出现多次,则会对每次出现重新评估。

Use the createParameterSourceNoCache factory method. Otherwise, the parameter source caches the result of the evaluation. Also note that, because caching is disabled, if the same parameter name appears in the select query multiple times, it is re-evaluated for each occurrence.

Polling and Transactions

入站适配器接受一个常规的 Spring Integration 轮询器作为子元素。因此,可以控制轮询的频率(以及其他用途)。JDBC 用法的轮询器的一个重要功能是将轮询操作包装到事务中的选项,如下例所示:

The inbound adapter accepts a regular Spring Integration poller as a child element. Consequently, the frequency of the polling can be controlled (among other uses). An important feature of the poller for JDBC usage is the option to wrap the poll operation in a transaction, as the following example shows:

<int-jdbc:inbound-channel-adapter query="..."
        channel="target" data-source="dataSource" update="...">
    <int:poller fixed-rate="1000">
        <int:transactional/>
    </int:poller>
</int-jdbc:inbound-channel-adapter>

如果你没有明确指定一个轮询器,则使用默认值。与 Spring 集成一样,它可以定义为一个顶级 bean)。

If you do not explicitly specify a poller, a default value is used. As is normal with Spring Integration, it can be defined as a top-level bean).

在前面的示例中,数据库每 1000 毫秒(或每秒一次)进行轮询,并将在同一事务中执行更新和选择查询。未显示事务管理器配置。但是,只要它知道数据源,轮询就是事务性的。一个常见的用例是下游通道是直接通道(默认),以便端点在同一个线程中调用,因此也是同一个事务。通过这种方式,如果其中任何一个失败,事务将回滚,输入数据将恢复到其原始状态。

In the preceding example, the database is polled every 1000 milliseconds (or once a second), and the update and select queries are both executed in the same transaction. The transaction manager configuration is not shown. However, as long as it is aware of the data source, the poll is transactional. A common use case is for the downstream channels to be direct channels (the default), so that the endpoints are invoked in the same thread and, hence, the same transaction. That way, if any of them fail, the transaction rolls back and the input data is reverted to its original state.

max-rows Versus max-messages-per-poll

JDBC 入站通道适配器定义了一个名为 max-rows 的属性。在你指定适配器的轮询器时,你还可以定义一个名为 max-messages-per-poll 的属性。虽然这两个属性看起来很相似,但它们的含义却截然不同。

The JDBC inbound channel adapter defines an attribute called max-rows. When you specify the adapter’s poller, you can also define a property called max-messages-per-poll. While these two attributes look similar, their meaning is quite different.

max-messages-per-poll 指定每个轮询间隔执行查询的次数,而 max-rows 指定每次执行返回的行数。

max-messages-per-poll specifies the number of times the query is executed per polling interval, whereas max-rows specifies the number of rows returned for each execution.

在正常情况下,当您使用 JDBC 入站通道适配器时,您可能不想设置轮询器的 {max-messages-per-poll} 属性。它的默认值为 {1},这意味着 JDBC 入站通道适配器的 { receive()} 方法对于每个轮询间隔都执行一次。

Under normal circumstances, you would likely not want to set the poller’s max-messages-per-poll property when you use the JDBC inbound channel adapter. Its default value is 1, which means that the JDBC inbound channel adapter’s receive() method is executed exactly once for each poll interval.

将 {max-messages-per-poll} 属性设置为一个更大的值,意味着查询会连续执行这么多次。有关 {max-messages-per-poll} 属性的更多信息,请参见 {Configuring An Inbound Channel Adapter}。

Setting the max-messages-per-poll attribute to a larger value means that the query is executed that many times back to back. For more information regarding the max-messages-per-poll attribute, see Configuring An Inbound Channel Adapter.

相比之下,如果 max-rows 属性大于 0,则指定由 receive() 方法创建的查询结果集中要使用的最大行数。如果该属性设置为 0,则所有行都包含在结果消息中。该属性默认为 0

In contrast, the max-rows attribute, if greater than 0, specifies the maximum number of rows to be used from the query result set created by the receive() method. If the attribute is set to 0, all rows are included in the resulting message. The attribute defaults to 0.

建议使用供应商特定查询选项(例如 MySQL LIMIT、SQL Server TOP 或 Oracle’s ROWNUM)限制结果集。有关更多信息,请参阅特定供应商文档。

It is recommended to use result set limiting via vendor-specific query options, for example MySQL LIMIT or SQL Server TOP or Oracle’s ROWNUM. See the particular vendor documentation for more information.