R2DBC Support
Spring Integration provides channel adapters for receiving and sending messages by using reactive access to databases via R2DBC drivers.
You need to include this dependency in your project:
Maven
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-r2dbc</artifactId>
    <version>{project-version}</version>
</dependency>
Gradle
implementation "org.springframework.integration:spring-integration-r2dbc:{project-version}"
R2DBC Inbound Channel Adapter
The R2dbcMessageSource is a pollable MessageSource implementation based on R2dbcEntityOperations. It produces messages with a Flux or Mono payload for the data fetched from the database, according to the expectSingleResult option.
The SELECT query can be provided statically or as a SpEL expression that is evaluated on every receive() call.
The R2dbcMessageSource.SelectCreator is the root object of the evaluation context, which allows the use of the StatementMapper.SelectSpec fluent API.
By default, this channel adapter maps the selected records into LinkedCaseInsensitiveMap instances.
This can be customized by providing the payloadType option, which is used internally by an EntityRowMapper based on this.r2dbcEntityOperations.getConverter().
The updateSql option is optional and is used to mark read records in the database so that they are skipped by subsequent polls.
The UPDATE operation can be supplied with a BiFunction<DatabaseClient.GenericExecuteSpec, ?, DatabaseClient.GenericExecuteSpec> to bind values into the UPDATE based on the records in the SELECT result.
A typical configuration for this channel adapter might look like this:
@Bean
@InboundChannelAdapter("fromR2dbcChannel")
public R2dbcMessageSource r2dbcMessageSourceSelectMany() {
    R2dbcMessageSource r2dbcMessageSource = new R2dbcMessageSource(this.r2dbcEntityTemplate,
            "SELECT * FROM person WHERE name='Name'");
    r2dbcMessageSource.setPayloadType(Person.class);
    // Update each fetched record so that subsequent polls no longer select it.
    r2dbcMessageSource.setUpdateSql("UPDATE Person SET name='SomeOtherName' WHERE id = :id");
    // Bind the :id parameter from each selected Person.
    r2dbcMessageSource.setBindFunction(
            (DatabaseClient.GenericExecuteSpec bindSpec, Person o) -> bindSpec.bind("id", o.getId()));
    return r2dbcMessageSource;
}
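The Person type passed to setPayloadType is not shown in this section. The following is a minimal sketch of what such a type might look like, assuming a person table with id, name, and age columns. The field names and types are illustrative assumptions, not taken from the reference documentation.

```java
// Hypothetical payload type for the examples in this section.
// The fields (id, name, age) are assumptions about the columns of the "person" table.
class Person {

    private Integer id;
    private String name;
    private Integer age;

    // No-argument constructor, often convenient for mapping frameworks.
    public Person() {
    }

    public Person(Integer id, String name, Integer age) {
        this.id = id;
        this.name = name;
        this.age = age;
    }

    // Read by the bind function to supply the :id parameter of the UPDATE.
    public Integer getId() {
        return this.id;
    }

    public String getName() {
        return this.name;
    }

    public Integer getAge() {
        return this.age;
    }
}
```

With a type like this, the bind function in the configuration can read o.getId() for each selected record.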
With the Java DSL, the configuration for this channel adapter looks like this:
@Bean
IntegrationFlow r2dbcDslFlow(R2dbcEntityTemplate r2dbcEntityTemplate) {
    return IntegrationFlow
            .from(R2dbc.inboundChannelAdapter(r2dbcEntityTemplate,
                            (selectCreator) ->
                                    selectCreator.createSelect("person")
                                            .withProjection("*")
                                            .withCriteria(Criteria.where("id").is(1)))
                            .expectSingleResult(true)
                            .payloadType(Person.class)
                            .updateSql("UPDATE Person SET id='2' where id = :id")
                            .bindFunction((DatabaseClient.GenericExecuteSpec bindSpec, Person o) ->
                                    bindSpec.bind("id", o.getId())),
                    e -> e.poller(p -> p.fixedDelay(100)))
            .handle((p, h) -> p)
            .channel(MessageChannels.flux())
            .get();
}
R2DBC Outbound Channel Adapter
The R2dbcMessageHandler is a ReactiveMessageHandler implementation that performs an INSERT (default), UPDATE, or DELETE query in the database by using the provided R2dbcEntityOperations.
The R2dbcMessageHandler.Type can be configured statically or through a SpEL expression evaluated against the request message.
The query to execute can be based on the tableName, values, and criteria expression options. If tableName is not provided, the whole message payload is treated as an org.springframework.data.relational.core.mapping.Table entity against which the SQL is performed.
The org.springframework.data.relational.core.query package is registered as an import in the SpEL evaluation context, giving direct access to the Criteria fluent API, which is used for UPDATE and DELETE queries.
The valuesExpression is used for INSERT and UPDATE queries and must evaluate to a Map of column-value pairs describing the change to make in the target table for the request message.
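To make the valuesExpression contract concrete, the sketch below builds the kind of column-value Map such an expression is expected to produce. The PersonValues helper, its nested Person type, and the column names are hypothetical; only the Map result shape comes from the description above.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: turns a payload into column-value pairs for an INSERT or UPDATE.
final class PersonValues {

    // Hypothetical payload type; its fields are assumptions about the target table.
    static final class Person {

        final Integer id;
        final String name;
        final Integer age;

        Person(Integer id, String name, Integer age) {
            this.id = id;
            this.name = name;
            this.age = age;
        }
    }

    private PersonValues() {
    }

    // Keys are column names; values are what should be written to those columns.
    // The id is left out here on the assumption that it is matched by the criteria instead.
    static Map<String, Object> toColumnValues(Person person) {
        Map<String, Object> values = new LinkedHashMap<>();
        values.put("name", person.name);
        values.put("age", person.age);
        return values;
    }
}
```

A helper like this could be supplied through a FunctionExpression that applies it to the message payload, in the same way the configuration in this section passes Message::getPayload.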
A typical configuration for this channel adapter might look like this:
@Bean
@ServiceActivator(inputChannel = "toR2dbcChannel")
public R2dbcMessageHandler r2dbcMessageHandler(R2dbcEntityTemplate r2dbcEntityTemplate) {
    R2dbcMessageHandler messageHandler = new R2dbcMessageHandler(r2dbcEntityTemplate);
    // Evaluate the column-value pairs from the message payload.
    messageHandler.setValuesExpression(new FunctionExpression<Message<?>>(Message::getPayload));
    messageHandler.setQueryType(R2dbcMessageHandler.Type.UPDATE);
    // EXPRESSION_PARSER is not defined in this snippet; it is assumed to be a SpEL ExpressionParser.
    messageHandler.setCriteriaExpression(
            EXPRESSION_PARSER.parseExpression("T(Criteria).where('id').is(headers.personId)"));
    return messageHandler;
}
With the Java DSL, the configuration for this channel adapter looks like this:
.handleReactive(R2dbc.outboundChannelAdapter(r2dbcEntityTemplate)
        .queryType(R2dbcMessageHandler.Type.UPDATE)
        .tableNameExpression("payload.class.simpleName")
        .criteria((message) -> Criteria.where("id").is(message.getHeaders().get("personId")))
        .values("{age:36}"))