JDBC Message Store

Spring 集成提供了两个特定于 JDBC 的消息存储实现。JdbcMessageStore 适用于合计器和索赔检查模式。JdbcChannelMessageStore 实现提供了专门用于消息通道、有针对性和更具可扩展性的实现。 请注意,您可以使用 JdbcMessageStore 作为消息通道的后台,JdbcChannelMessageStore 已针对该目的进行了优化。

从 5.0.11、5.1.2 版本开始,`JdbcChannelMessageStore`的索引已得到优化。如果您在这样的存储中具有大型消息组,您可能希望更改索引。此外,`PriorityChannel`的索引已注释掉,因为除非您使用由 JDBC 支持的此类通道,否则不需要它。

使用`OracleChannelMessageStoreQueryProvider`时,优先级通道索引*must*将被添加,因为它包含在查询的提示中。

Initializing the Database

在开始使用 JDBC 消息存储组件之前,您应该使用适当的对象来提供一个目标数据库。

Spring 集成附带了一些示例脚本,可用于初始化数据库。在 spring-integration-jdbc JAR 文件中,可以在 org.springframework.integration.jdbc 包中找到脚本。它针对一系列常见的数据库平台提供了一个示例创建脚本和一个示例删除脚本。使用这些脚本的一种常用方法是引用 Spring JDBC data source initializer 中的这些脚本。请注意,这些脚本作为示例和所需表名和列名的规范提供。你可能会发现你需要对它们进行增强以用于生产(例如,通过添加索引声明)。

从第 6.2 版开始,JdbcMessageStoreJdbcChannelMessageStoreJdbcMetadataStoreDefaultLockRepository 实现 SmartLifecycle,并在 start() 方法中针对各自的表执行 SELECT COUNT 查询,以确保在目标数据库中存在所需的表(根据提供的前缀)。如果所需的表不存在,则应用程序上下文将无法启动。可以通过 setCheckDatabaseOnStart(false) 禁用此项检查。

The Generic JDBC Message Store

JDBC 模块提供了由数据库支持的 Spring Integration MessageStore(在索取检查模式中很重要)和 MessageGroupStore(在诸如聚合器等有状态模式中很重要)的实现。这两个接口都由 JdbcMessageStore 实现,并且支持以 XML 配置存储实例,如下面的示例所示:

<int-jdbc:message-store id="messageStore" data-source="dataSource"/>

您可以指定 JdbcTemplate 而不是 DataSource

以下示例显示了一些其他可选属性:

<int-jdbc:message-store id="messageStore" data-source="dataSource"
    lob-handler="lobHandler" table-prefix="MY_INT_"/>

在前述示例中,我们指定了一个 LobHandler 来将消息作为大对象进行处理(这对于 Oracle 来说通常非常必要),并指定了存储中查询生成的表名的前缀。表名前缀默认为 INT_

Backing Message Channels

如果您打算使用 JDBC 支持消息通道,我们建议使用 JdbcChannelMessageStore 实现。它只能与消息通道配合使用。

Supported Databases

JdbcChannelMessageStore 使用特定于数据库的 SQL 查询从数据库中检索消息。因此,您必须在 JdbcChannelMessageStore 上设置 ChannelMessageStoreQueryProvider 属性。此 channelMessageStoreQueryProvider 提供针对您指定的特定数据库的 SQL 查询。Spring Integration 为以下关系型数据库提供支持:

  • PostgreSQL

  • HSQLDB

  • MySQL

  • Oracle

  • Derby

  • H2

  • SqlServer

  • Sybase

  • DB2

如果没有列出您的数据库,您可以实现 ChannelMessageStoreQueryProvider 接口并提供您自己的自定义查询。

版本 4.0 向表中添加了 MESSAGE_SEQUENCE 列,以确保即使消息存储在同一毫秒内,也能按照先进先出 (FIFO) 排列顺序队列。

从第 6.2 版开始,ChannelMessageStoreQueryProvider 公开一个 isSingleStatementForPoll 标志,其中 PostgresChannelMessageStoreQueryProvider 返回 true,且其用于轮询的查询现基于一条 DELETE…​RETURNING 语句。JdbcChannelMessageStore 咨询 isSingleStatementForPoll 选项,如果只支持单轮询语句,则跳过单独的 DELETE 语句。

Custom Message Insertion

自版本 5.0 起,通过重载 ChannelMessageStorePreparedStatementSetter 类,你可以在 JdbcChannelMessageStore 中提供一条自定义实现用于消息插入。你可以使用它来设置不同的列或更改表结构或序列化策略。例如,你可以将其结构作为 JSON 字符串存储,而不是默认序列化为 byte[]

以下示例使用 setValues 的默认实现来存储公共列,并覆盖存储消息负载为 varchar 的行为:

public class JsonPreparedStatementSetter extends ChannelMessageStorePreparedStatementSetter {

    @Override
    public void setValues(PreparedStatement preparedStatement, Message<?> requestMessage,
        Object groupId, String region, 	boolean priorityEnabled) throws SQLException {
        // Populate common columns
        super.setValues(preparedStatement, requestMessage, groupId, region, priorityEnabled);
        // Store message payload as varchar
        preparedStatement.setString(6, requestMessage.getPayload().toString());
    }
}

通常,我们不建议使用关系数据库进行排队。如果您可能的话,可考虑使用 JMS 或 AMQP 支持的通道。有关更多参考,请参见以下资源:

如果您仍计划将数据库用作队列,请考虑使用 PostgreSQL 及其通知机制,其在 {in a subsequent section} 中进行了描述。

Concurrent Polling

轮询消息通道时,您可以选择使用 TaskExecutor 引用配置关联的 Poller

但请记住,如果你使用 JDBC 支持的消息通道,并且你计划轮询该通道,并因此以事务方式使用多线程的消息存储,则应确保使用支持 Multiversion Concurrency Control (MVCC)的关系数据库。否则,锁定可能是一个问题,在使用多线程时,性能可能无法达到预期。例如,Apache Derby 在这方面存在问题。 要实现更好的 JDBC 队列吞吐量,并在不同线程可能从队列轮询同一 Message 时避免问题,在使用不支持 MVCC 的数据库时,将 JdbcChannelMessageStoreusingIdCache 属性设置为 true 非常重要。以下示例显示了如何进行此项设置:

<bean id="queryProvider"
    class="o.s.i.jdbc.store.channel.PostgresChannelMessageStoreQueryProvider"/>

<int:transaction-synchronization-factory id="syncFactory">
    <int:after-commit expression="@store.removeFromIdCache(headers.id.toString())" />
    <int:after-rollback expression="@store.removeFromIdCache(headers.id.toString())"/>
</int:transaction-synchronization-factory>

<task:executor id="pool" pool-size="10"
    queue-capacity="10" rejection-policy="CALLER_RUNS" />

<bean id="store" class="o.s.i.jdbc.store.JdbcChannelMessageStore">
    <property name="dataSource" ref="dataSource"/>
    <property name="channelMessageStoreQueryProvider" ref="queryProvider"/>
    <property name="region" value="TX_TIMEOUT"/>
    <property name="usingIdCache" value="true"/>
</bean>

<int:channel id="inputChannel">
    <int:queue message-store="store"/>
</int:channel>

<int:bridge input-channel="inputChannel" output-channel="outputChannel">
    <int:poller fixed-delay="500" receive-timeout="500"
        max-messages-per-poll="1" task-executor="pool">
        <int:transactional propagation="REQUIRED" synchronization-factory="syncFactory"
        isolation="READ_COMMITTED" transaction-manager="transactionManager" />
    </int:poller>
</int:bridge>

<int:channel id="outputChannel" />

Priority Channel

从版本 4.0 开始,JdbcChannelMessageStore 实现 PriorityCapableChannelMessageStore,并提供 priorityEnabled 选项,允许将其用作 priority-queue 实例的 message-store 引用。为此,INT_CHANNEL_MESSAGE 表有一个 MESSAGE_PRIORITY 列来存储 PRIORITY 消息标头的值。此外,新的 MESSAGE_SEQUENCE 列使我们能够实现稳定的先进先出 (FIFO) 轮询机制,即使在同一毫秒内存储了具有相同优先级的多条消息时也是如此。按照 order by MESSAGE_PRIORITY DESC NULLS LAST, CREATED_DATE, MESSAGE_SEQUENCE 从数据库中轮询(选择)消息。

我们不建议对优先级和非优先级队列通道使用相同的`JdbcChannelMessageStore` bean,因为 priorityEnabled 选项适用于整个存储,并且不会为队列通道保留正确的 FIFO 队列语义。但是,同一`INT_CHANNEL_MESSAGE`表(甚至是`region`)都可以用于两种`JdbcChannelMessageStore`类型。要配置该场景,您可以从另一个消息存储 bean 扩展一个,如下面的示例所示:

<bean id="channelStore" class="o.s.i.jdbc.store.JdbcChannelMessageStore">
    <property name="dataSource" ref="dataSource"/>
    <property name="channelMessageStoreQueryProvider" ref="queryProvider"/>
</bean>

<int:channel id="queueChannel">
    <int:queue message-store="channelStore"/>
</int:channel>

<bean id="priorityStore" parent="channelStore">
    <property name="priorityEnabled" value="true"/>
</bean>

<int:channel id="priorityChannel">
    <int:priority-queue message-store="priorityStore"/>
</int:channel>

Partitioning a Message Store

通常将 {JdbcMessageStore} 作为同一应用程序中应用程序或节点组的全局存储。为了提供一些针对名称冲突的保护并控制数据库元数据配置,消息存储允许将表格以两种方式进行分区。一种方法是通过更改前缀(如 {described earlier})来使用单独的表格名称。另一种方法是指定 {region} 名称来对单一表格内的分区数据进行分区。使用第二种方法的一个重要用例是,当 {MessageStore} 正在管理作为 Spring Integration 消息通道的后备的持久队列时。持久通道的消息数据在存储中以通道名称为键。因此,如果通道名称并非全局唯一,通道将获取不针对它们的数据。为了避免这种危险,您可以使用消息存储 {region} 来为拥有相同逻辑名称的不同物理通道保留单独的数据。

PostgreSQL: Receiving Push Notifications

PostgreSQL 提供了一个侦听和通知框架,用于在对数据库表进行操作时接收推送通知。Spring Integration 利用此机制(从版本 6.0 开始)允许在向 JdbcChannelMessageStore 添加新消息时接收推送通知。在使用此功能时,必须定义一个数据库触发器,它可以在包含在 Spring Integration JDBC 模块中的 schema-postgresql.sql 文件的注释部分找到。

推送通知通过 PostgresChannelMessageTableSubscriber 类接收,该类允许其订阅者在为任何给定的 regiongroupId 接收新消息时接收回调。即使在不同的 JVM 上追加消息,但对于同一个数据库,也会收到这些通知。PostgresSubscribableChannel 实现使用 PostgresChannelMessageTableSubscriber.Subscription 契约,以响应上述 PostgresChannelMessageTableSubscriber 的通知从存储区提取消息。

例如,可以按如下方式接收 some group 的推送通知:

@Bean
public JdbcChannelMessageStore messageStore(DataSource dataSource) {
    JdbcChannelMessageStore messageStore = new JdbcChannelMessageStore(dataSource);
    messageStore.setChannelMessageStoreQueryProvider(new PostgresChannelMessageStoreQueryProvider());
    return messageStore;
}

@Bean
public PostgresChannelMessageTableSubscriber subscriber(
      @Value("${spring.datasource.url}") String url,
      @Value("${spring.datasource.username}") String username,
      @Value("${spring.datasource.password}") String password) {
    return new PostgresChannelMessageTableSubscriber(() ->
        DriverManager.getConnection(url, username, password).unwrap(PgConnection.class));
}

@Bean
public PostgresSubscribableChannel channel(
    PostgresChannelMessageTableSubscriber subscriber,
    JdbcChannelMessageStore messageStore) {
  return new PostgresSubscribableChannel(messageStore, "some group", subscriber);
}

交易支持

从 6.0.5 版开始,在 PostgresSubscribableChannel 上指定 PlatformTransactionManager 将在事务中通知订阅者。订阅器中的异常将导致事务回滚并消息将被放回消息存储区。默认情况下,事务支持未激活。

重试

从 6.0.5 版开始,可以通过向 PostgresSubscribableChannel 提供 RetryTemplate 来指定重试策略。默认情况下,不执行任何重试。

任何 active PostgresChannelMessageTableSubscriber 在其活动生命周期内占用 exclusive JDBC Connection。因此,此连接不能来自池化 DataSource 非常重要。此类连接池通常希望在预定义的超时窗口内关闭已发出的连接。 对于此 exclusive 连接的需求,还建议一个 JVM 仅运行一个 PostgresChannelMessageTableSubscriber,该 subscriber 可用来注册任意数量的订阅项。