R2DBC Support

Spring Integration 提供通道适配器,用于通过 R2DBC 驱动程序使用对数据库的响应式访问接收和发送消息。 你需要将此依赖项包含在你的项目中:

  • Maven

  • Gradle

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-r2dbc</artifactId>
    <version>{project-version}</version>
</dependency>
compile "org.springframework.integration:spring-integration-r2dbc:{project-version}"

R2DBC Inbound Channel Adapter

R2dbcMessageSource 是一个基于 R2dbcEntityOperations 的可轮询 MessageSource 实现,并且根据 expectSingleResult 选项为从数据库获取的数据生成包含 FluxMono 作为有效负载的消息。SELECT 查询可以静态提供,也可以基于在每次 receive() 调用时求值的 SpEL 表达式。R2dbcMessageSource.SelectCreator 作为评估上下文的根对象存在,以允许使用 StatementMapper.SelectSpec 流利 API。默认情况下,此通道适配器将选定的记录映射到 LinkedCaseInsensitiveMap 实例中。它可以通过提供 payloadType 选项进行自定义,该选项由基于 this.r2dbcEntityOperations.getConverter()EntityRowMapper 在内部使用。updateSql 是可选的,用于标记数据库中的已读记录,以便从后续轮询中跳过。UPDATE 操作可以用 BiFunction<DatabaseClient.GenericExecuteSpec, ?, DatabaseClient.GenericExecuteSpec> 提供,以根据 SELECT 结果中的记录将值绑定到 UPDATE 中。

此通道适配器的典型配置可能如下所示:

@Bean
@InboundChannelAdapter("fromR2dbcChannel")
public R2dbcMessageSource r2dbcMessageSourceSelectMany() {
    R2dbcMessageSource r2dbcMessageSource = new R2dbcMessageSource(this.r2dbcEntityTemplate,
            "SELECT * FROM person WHERE name='Name'");
    r2dbcMessageSource.setPayloadType(Person.class);
    r2dbcMessageSource.setUpdateSql("UPDATE Person SET name='SomeOtherName' WHERE id = :id");
    r2dbcMessageSource.setBindFunction(
				(DatabaseClient.GenericExecuteSpec bindSpec, Person o) -> bindSpec.bind("id", o.getId()));}
    return r2dbcMessageSource;
}

使用 Java DSL,此通道适配器的配置如下:

@Bean
IntegrationFlow r2dbcDslFlow(R2dbcEntityTemplate r2dbcEntityTemplate) {
    return IntegrationFlow
        .from(R2dbc.inboundChannelAdapter(r2dbcEntityTemplate,
            (selectCreator) ->
                    selectCreator.createSelect("person")
                        .withProjection("*")
                        .withCriteria(Criteria.where("id").is(1)))
                    .expectSingleResult(true)
                    .payloadType(Person.class)
                    .updateSql("UPDATE Person SET id='2' where id = :id")
                    .bindFunction((DatabaseClient.GenericExecuteSpec bindSpec, Person o) ->
                            bindSpec.bind("id", o.getId())),
            e -> e.poller(p -> p.fixedDelay(100)))
        .handle((p, h) -> p)
        .channel(MessageChannels.flux())
        .get();
}

R2DBC Outbound Channel Adapter

R2dbcMessageHandler 是一个 ReactiveMessageHandler 实现,用于执行数据库中的 INSERT(默认)、UPDATEDELETE 查询,并使用提供的 R2dbcEntityOperationsR2dbcMessageHandler.Type 可以静态配置,也可以针对请求消息通过 SpEL 表达式配置。要执行的查询可以基于 tableNamevaluescriteria 表达式选项,或者(如果未提供 tableName)将整个消息有效负载视为应针对其执行 SQL 的 org.springframework.data.relational.core.mapping.Table 实体。包 org.springframework.data.relational.core.query 已注册为 SpEL 求值上下文中的导入包,可以直接访问 Criteria fluent API,该 API 用于 UPDATEDELETE 查询。valuesExpression 用于 INSERTUPDATE,必须对其求值为用于针对请求消息执行目标表中更改的列-值对的 Map

此通道适配器的典型配置可能如下所示:

@Bean
@ServiceActivator(inputChannel = "toR2dbcChannel")
public R2dbcMessageHandler r2dbcMessageHandler(R2dbcEntityTemplate r2dbcEntityTemplate) {
    R2dbcMessageHandler messageHandler = new R2dbcMessageHandler(r2dbcEntityTemplate)
    messageHandler.setValuesExpression(new FunctionExpression<Message<?>>(Message::getPayload));
    messageHandler.setQueryType(R2dbcMessageHandler.Type.UPDATE);
    messageHandler.setCriteriaExpression(
        EXPRESSION_PARSER.parseExpression("T(Criteria).where('id).is(headers.personId)));
    return messageHandler;
}

使用 Java DSL,此通道适配器的配置如下:

.handleReactive(R2dbc.outboundChannelAdapter(r2dbcEntityTemplate)
        .queryType(R2dbcMessageHandler.Type.UPDATE)
        .tableNameExpression("payload.class.simpleName")
        .criteria((message) -> Criteria.where("id").is(message.getHeaders().get("personId")))
        .values("{age:36}"))