R2DBC Support

Spring Integration provides channel adapters for receiving and sending messages through reactive access to databases via R2DBC drivers.

You need to include this dependency in your project:

Maven:

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-r2dbc</artifactId>
    <version>{project-version}</version>
</dependency>

Gradle:

implementation "org.springframework.integration:spring-integration-r2dbc:{project-version}"

R2DBC Inbound Channel Adapter

The R2dbcMessageSource is a pollable MessageSource implementation based on R2dbcEntityOperations. It produces messages with a Flux or Mono payload for the data fetched from the database, according to the expectSingleResult option (a Mono when expectSingleResult is true, a Flux otherwise). The SELECT query can be provided statically or as a SpEL expression that is evaluated on every receive() call. The R2dbcMessageSource.SelectCreator serves as the root object of the evaluation context, which allows use of the StatementMapper.SelectSpec fluent API. By default, this channel adapter maps the selected records into LinkedCaseInsensitiveMap instances. This can be customized with the payloadType option, which is used internally by an EntityRowMapper based on this.r2dbcEntityOperations.getConverter(). The updateSql option is optional and is used to mark read records in the database so that subsequent polls skip them. The UPDATE operation can be supplied with a BiFunction<DatabaseClient.GenericExecuteSpec, ?, DatabaseClient.GenericExecuteSpec> to bind values into the UPDATE statement based on the records in the SELECT result.

A typical configuration for this channel adapter might look like this:

@Bean
@InboundChannelAdapter("fromR2dbcChannel")
public R2dbcMessageSource r2dbcMessageSourceSelectMany() {
    R2dbcMessageSource r2dbcMessageSource = new R2dbcMessageSource(this.r2dbcEntityTemplate,
            "SELECT * FROM person WHERE name='Name'");
    // Map each selected row to a Person instead of the default LinkedCaseInsensitiveMap
    r2dbcMessageSource.setPayloadType(Person.class);
    // Mark each read record so that subsequent polls skip it
    r2dbcMessageSource.setUpdateSql("UPDATE Person SET name='SomeOtherName' WHERE id = :id");
    // Bind the :id parameter of the UPDATE from each selected Person
    r2dbcMessageSource.setBindFunction(
            (DatabaseClient.GenericExecuteSpec bindSpec, Person o) -> bindSpec.bind("id", o.getId()));
    return r2dbcMessageSource;
}

With the Java DSL, the configuration for this channel adapter looks like this:

@Bean
IntegrationFlow r2dbcDslFlow(R2dbcEntityTemplate r2dbcEntityTemplate) {
    return IntegrationFlow
        .from(R2dbc.inboundChannelAdapter(r2dbcEntityTemplate,
            (selectCreator) ->
                    selectCreator.createSelect("person")
                        .withProjection("*")
                        .withCriteria(Criteria.where("id").is(1)))
                    .expectSingleResult(true)
                    .payloadType(Person.class)
                    .updateSql("UPDATE Person SET id='2' where id = :id")
                    .bindFunction((DatabaseClient.GenericExecuteSpec bindSpec, Person o) ->
                            bindSpec.bind("id", o.getId())),
            e -> e.poller(p -> p.fixedDelay(100)))
        .handle((p, h) -> p)
        .channel(MessageChannels.flux())
        .get();
}

R2DBC Outbound Channel Adapter

The R2dbcMessageHandler is a ReactiveMessageHandler implementation that performs an INSERT (default), UPDATE, or DELETE query in the database by using the provided R2dbcEntityOperations. The R2dbcMessageHandler.Type can be configured statically or through a SpEL expression evaluated against the request message. The query to execute can be based on the tableName, values, and criteria expression options; if tableName is not provided, the whole message payload is treated as an org.springframework.data.relational.core.mapping.Table entity to perform SQL against. The org.springframework.data.relational.core.query package is registered as an import in the SpEL evaluation context for direct access to the Criteria fluent API, which is used for UPDATE and DELETE queries. The valuesExpression is used for INSERT and UPDATE and must evaluate, against the request message, to a Map of column-value pairs to change in the target table.

A typical configuration for this channel adapter might look like this:

@Bean
@ServiceActivator(inputChannel = "toR2dbcChannel")
public R2dbcMessageHandler r2dbcMessageHandler(R2dbcEntityTemplate r2dbcEntityTemplate) {
    R2dbcMessageHandler messageHandler = new R2dbcMessageHandler(r2dbcEntityTemplate);
    // The message payload itself is used as the Map of column-value pairs
    messageHandler.setValuesExpression(new FunctionExpression<Message<?>>(Message::getPayload));
    messageHandler.setQueryType(R2dbcMessageHandler.Type.UPDATE);
    // Criteria resolves without a package because its package is imported into the SpEL context
    messageHandler.setCriteriaExpression(
        EXPRESSION_PARSER.parseExpression("T(Criteria).where('id').is(headers.personId)"));
    return messageHandler;
}
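
The configuration above uses the message payload as the values Map, so the sender has to prepare that Map. The following is a minimal sketch of one way to do so in plain Java. The Person class, its fields, and the toColumnValues helper are hypothetical names introduced only for illustration, and leaving the id column out of the Map is an assumption that the row is matched through the criteria (for example, a personId header) rather than updated.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class PersonColumnValues {

    // Hypothetical domain type, used only for this illustration.
    static final class Person {
        final int id;
        final String name;
        final int age;

        Person(int id, String name, int age) {
            this.id = id;
            this.name = name;
            this.age = age;
        }
    }

    // Builds the column-value pairs for an UPDATE.
    // The id column is omitted on the assumption that it identifies the row
    // through the criteria and is not itself changed.
    static Map<String, Object> toColumnValues(Person person) {
        Map<String, Object> values = new LinkedHashMap<>();
        values.put("name", person.name);
        values.put("age", person.age);
        return values;
    }

    public static void main(String[] args) {
        Person person = new Person(1, "Bob", 36);
        // This Map would be the payload of the message sent to the handler's input channel.
        System.out.println(toColumnValues(person));
    }
}
```

A LinkedHashMap keeps the columns in insertion order, which makes the resulting statement easier to read in logs; any Map implementation satisfies the column-value contract described above.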

With the Java DSL, the configuration for this channel adapter looks like this:

.handleReactive(R2dbc.outboundChannelAdapter(r2dbcEntityTemplate)
        .queryType(R2dbcMessageHandler.Type.UPDATE)
        .tableNameExpression("payload.class.simpleName")
        .criteria((message) -> Criteria.where("id").is(message.getHeaders().get("personId")))
        .values("{age:36}"))