Inbound Channel Adapter

入站通道适配器的主要功能是执行 SQL {SELECT} 查询,并将结果转换成消息。消息有效负载是整个结果集(表示为 {List}),而列表中项的类型取决于行映射策略。默认策略是一个常规映射器,为查询结果中的每一行返回一个 {Map}。您也可以通过添加对 {RowMapper} 实例的引用来更改此内容(有关行映射的更详细信息,请参见 { Spring JDBC} 文档)。

如果你想将 SELECT 查询结果中的行转换为单个消息,则可以使用下游 splitter。

入站适配器还需要一个对 JdbcTemplate 实例或 DataSource 的引用。 除了生成消息的 SELECT 语句外,适配器还有一个 UPDATE 语句,该语句将记录标记为已处理,以便它们不会出现在下一次轮询中。可以使用原始选择中 ID 列表对更新进行参数化。默认情况下,这是通过命名约定完成的(输入结果集中名为 id 的列会转换为更新的参数映射中的列表,该列表称为 id)。以下示例定义了一个带有更新查询和 DataSource 引用的入站通道适配器。

<int-jdbc:inbound-channel-adapter query="select * from item where status=2"
    channel="target" data-source="dataSource"
    update="update item set status=10 where id in (:id)" />

更新查询中的参数使用冒号(:)前缀指定参数的名称(在前一个示例中,这是一个应用于轮询结果集中每个行的表达式)。这是 Spring JDBC 中的命名参数 JDBC 支持的标准功能,结合了 Spring 集成中采用的约定(投影到轮询结果列表上)。底层 Spring JDBC 功能限制了可用的表达式(例如,除了句点之外的大多数特殊字符都不允许),但由于目标通常是可以通过 bean 路径寻址的对象列表(可能是单个对象的列表),因此不会造成不适当的限制。

要更改参数生成策略,您可以将 SqlParameterSourceFactory 注入到适配器中以覆盖默认行为(适配器具有 sql-parameter-source-factory 属性)。Spring Integration 提供 ExpressionEvaluatingSqlParameterSourceFactory,它创建一个基于 SpEL 的参数源,查询结果作为 #root 对象。(如果 update-per-row 为 true,根对象是该行)。如果同一参数名称在更新查询中多次出现,则仅对其求值一次,并缓存其结果。 你也可以为 select 查询使用参数源。在这种情况下,由于没有可以评估的 “result” 对象,因此每次都使用单个参数源(而不是使用参数源工厂)。从 4.0 版本开始,你可以使用 Spring 来创建一个基于 SpEL 的参数源,如下例所示:

<int-jdbc:inbound-channel-adapter query="select * from item where status=:status"
	channel="target" data-source="dataSource"
	select-sql-parameter-source="parameterSource" />

<bean id="parameterSource" factory-bean="parameterSourceFactory"
			factory-method="createParameterSourceNoCache">
	<constructor-arg value="" />
</bean>

<bean id="parameterSourceFactory"
		class="o.s.integration.jdbc.ExpressionEvaluatingSqlParameterSourceFactory">
	<property name="parameterExpressions">
		<map>
			<entry key="status" value="@statusBean.which()" />
		</map>
	</property>
</bean>

<bean id="statusBean" class="foo.StatusDetermination" />

每个参数表达式中的 value 可能是任何有效的 SpEL 表达式。用于表达式求值的 #root 对象是从 parameterSource bean 中定义的构造函数参数。它对于所有求值都是静态的(在前面的示例中,是一个空 String)。 从 5.0 版本开始,你可以通过 sqlParameterTypesExpressionEvaluatingSqlParameterSourceFactory 提供指定特定参数的目标 SQL 类型。 下面的示例提供了在查询中使用的参数的 SQL 类型:

<int-jdbc:inbound-channel-adapter query="select * from item where status=:status"
    channel="target" data-source="dataSource"
    select-sql-parameter-source="parameterSource" />

<bean id="parameterSource" factory-bean="parameterSourceFactory"
            factory-method="createParameterSourceNoCache">
    <constructor-arg value="" />
</bean>

<bean id="parameterSourceFactory"
        class="o.s.integration.jdbc.ExpressionEvaluatingSqlParameterSourceFactory">
    <property name="sqlParameterTypes">
        <map>
            <entry key="status" value="#{ T(java.sql.Types).BINARY}" />
        </map>
    </property>
</bean>

使用 createParameterSourceNoCache 工厂方法。否则,参数源会缓存评估结果。另外请注意,由于 禁用了缓存,如果同一个参数名称在 select 查询中出现多次,则会对每次出现重新评估。

Polling and Transactions

入站适配器接受一个常规的 Spring Integration 轮询器作为子元素。因此,可以控制轮询的频率(以及其他用途)。JDBC 用法的轮询器的一个重要功能是将轮询操作包装到事务中的选项,如下例所示:

<int-jdbc:inbound-channel-adapter query="..."
        channel="target" data-source="dataSource" update="...">
    <int:poller fixed-rate="1000">
        <int:transactional/>
    </int:poller>
</int-jdbc:inbound-channel-adapter>

如果你没有明确指定一个轮询器,则使用默认值。与 Spring 集成一样,它可以定义为一个顶级 bean)。

在前面的示例中,数据库每 1000 毫秒(或每秒一次)进行轮询,并将在同一事务中执行更新和选择查询。未显示事务管理器配置。但是,只要它知道数据源,轮询就是事务性的。一个常见的用例是下游通道是直接通道(默认),以便端点在同一个线程中调用,因此也是同一个事务。通过这种方式,如果其中任何一个失败,事务将回滚,输入数据将恢复到其原始状态。

max-rows Versus max-messages-per-poll

JDBC 入站通道适配器定义了一个名为 max-rows 的属性。在你指定适配器的轮询器时,你还可以定义一个名为 max-messages-per-poll 的属性。虽然这两个属性看起来很相似,但它们的含义却截然不同。

max-messages-per-poll 指定每个轮询间隔执行查询的次数,而 max-rows 指定每次执行返回的行数。

在正常情况下,当您使用 JDBC 入站通道适配器时,您可能不想设置轮询器的 {max-messages-per-poll} 属性。它的默认值为 {1},这意味着 JDBC 入站通道适配器的 { receive()} 方法对于每个轮询间隔都执行一次。

将 {max-messages-per-poll} 属性设置为一个更大的值,意味着查询会连续执行这么多次。有关 {max-messages-per-poll} 属性的更多信息,请参见 {Configuring An Inbound Channel Adapter}。

相比之下,如果 max-rows 属性大于 0,则指定由 receive() 方法创建的查询结果集中要使用的最大行数。如果该属性设置为 0,则所有行都包含在结果消息中。该属性默认为 0

建议使用供应商特定查询选项(例如 MySQL LIMIT、SQL Server TOP 或 Oracle’s ROWNUM)限制结果集。有关更多信息,请参阅特定供应商文档。