Data Access with R2DBC

R2DBC(“反应式关系数据库连接”)是一项社区驱动的规范工作,旨在使用反应式模式标准化对 SQL 数据库的访问。

Package Hierarchy

Spring 框架的 R2DBC 抽象框架包含两个不同的包:

Using the R2DBC Core Classes to Control Basic R2DBC Processing and Error Handling

本节介绍如何使用 R2DBC 核心类来控制基本的 R2DBC 处理,包括错误处理。它包括以下主题:

Using DatabaseClient

DatabaseClient 是 R2DBC 核心包中的核心类。它处理资源的创建和释放,这有助于避免常见错误,例如忘记关闭连接。它执行 R2DBC 核心工作流的基本任务(例如语句创建和执行),让应用程序代码提供 SQL 并提取结果。DatabaseClient 类:

  • Runs SQL queries

  • 更新语句和存储过程调用

  • Result 实例执行迭代

  • 捕获 R2DBC 异常,并将其转换为定义在 org.springframework.dao 包中的一般、更具信息性的异常层次结构。(请参见 Consistent Exception Hierarchy。)

该客户端拥有一个函数式流畅式 API,它使用反应式类型进行声明性组合。

当您为您的代码使用 DatabaseClient 时,只需实现 java.util.function 接口即可,给它们一个明确定义的契约。给定由 DatabaseClient 类提供的 Connection,一个 Function 回调会创建一个 Publisher。映射函数同样如此,它会提取 Row 结果。

您可以直接实例化一个 DatabaseClient 引用,在 DAO 实现中使用它,也可以在一个 Spring IoC 容器中配置它,并给它一个 bean 引用。

创建 DatabaseClient 对象最简单的方法是使用一个静态工厂方法,如下所示:

  • Java

  • Kotlin

DatabaseClient client = DatabaseClient.create(connectionFactory);
val client = DatabaseClient.create(connectionFactory)

ConnectionFactory 应始终配置为 Spring IoC 容器中的 bean。

前述的方法会创建一个带有默认设置的 DatabaseClient

您还可以从 DatabaseClient.builder() 获取一个 Builder 实例。您可以通过调用以下方法来自定义客户端:

  • ….bindMarkers(…):提供特定的 BindMarkersFactory 来配置 namedparameter 与数据库绑定标记翻译。

  • ….executeFunction(…):设置 ExecuteFunction 用于运行 Statement 对象。

  • ….namedParameters(false):禁用命名参数扩展。默认情况下启用。

方言由 BindMarkersFactoryResolverConnectionFactory 中解析,通常通过检查 ConnectionFactoryMetadata。您可以通过注册一个通过 META-INF/spring.factories 实现 org.springframework.r2dbc.core.binding.BindMarkersFactoryResolver$BindMarkerFactoryProvider 的类,让 Spring 自动发现您的 BindMarkersFactoryBindMarkersFactoryResolver 使用 Spring 的 SpringFactoriesLoader 从类路径中发现绑定标记提供程序实现。

目前支持的数据库有:

  • H2

  • MariaDB

  • Microsoft SQL Server

  • MySQL

  • Postgres

这个类发出的所有 SQL 都会被记录在对应于客户端实例的全限定类名的类别下的 DEBUG 级别(通常为 DefaultDatabaseClient)。此外,每个执行都会在反应式序列中注册一个检查点,以帮助调试。

以下各节提供了一些 DatabaseClient 使用示例。这些示例不是 DatabaseClient 公开的全部功能的详尽列表。请参阅附带的 javadoc

Executing Statements

DatabaseClient 提供能运行语句的基本功能。以下示例展示了创建新表所需的最低限度但完全具备功能的代码:

  • Java

  • Kotlin

Mono<Void> completion = client.sql("CREATE TABLE person (id VARCHAR(255) PRIMARY KEY, name VARCHAR(255), age INTEGER);")
        .then();
client.sql("CREATE TABLE person (id VARCHAR(255) PRIMARY KEY, name VARCHAR(255), age INTEGER);")
        .await()

DatabaseClient 专为方便、流畅的使用而设计。它在执行规范的每个阶段都暴露中间提交、继续和终端方法。上述示例使用 then() 返回一个一旦查询(如果 SQL 查询包含多条语句,则为多个查询)完成就会完成的完成 Publisher

execute(…) 接受 SQL 查询字符串或查询 Supplier<String> 以延迟实际查询创建,直到执行。

Querying (SELECT)

SQL 查询可以通过 Row 对象或受影响的行数来返回值。DatabaseClient 可返回更新的行数或行本身,具体取决于发出的查询。

以下查询从表中获取 idname 列:

  • Java

  • Kotlin

Mono<Map<String, Object>> first = client.sql("SELECT id, name FROM person")
        .fetch().first();
val first = client.sql("SELECT id, name FROM person")
        .fetch().awaitSingle()

以下查询使用绑定变量:

  • Java

  • Kotlin

Mono<Map<String, Object>> first = client.sql("SELECT id, name FROM person WHERE first_name = :fn")
        .bind("fn", "Joe")
        .fetch().first();
val first = client.sql("SELECT id, name FROM person WHERE first_name = :fn")
        .bind("fn", "Joe")
        .fetch().awaitSingle()

您可能注意到了上述示例中 fetch() 的用法。fetch() 是一个继续运算符,它可让您指定需要使用多少数据。

调用 first() 会从结果中返回第一行,并丢弃剩余行。您可以使用以下运算符来使用数据:

  • first() 返回整个结果的第一行。其 Kotlin Coroutine 变体名为 awaitSingle(),用于不可为空的返回值,而 awaitSingleOrNull() 用于可选的值。

  • one() 准确返回一个结果,如果结果包含更多行,则失败。使用 Kotlin Coroutine,对于一个值使用 awaitOne(),或对于可选值使用 awaitOneOrNull()

  • all() 返回结果的所有行。使用 Kotlin Coroutine 时,使用 flow()

  • rowsUpdated() 返回受影响的行数(INSERT/UPDATE/DELETE`计数)。其 Kotlin Coroutine 变体名为 `awaitRowsUpdated()

在不指定更多映射细节的情况下,查询会将表格结果作为 Map 返回,它的键是不区分大小写的列名,它们映射到各自的列值。

您可以为每个 Row 提供 Function<Row, T> 控制结果映射,以便它可以返回任意值(单值、集合和映射以及对象)。

以下示例提取 name 列并发出其值:

  • Java

  • Kotlin

Flux<String> names = client.sql("SELECT name FROM person")
        .map(row -> row.get("name", String.class))
        .all();
val names = client.sql("SELECT name FROM person")
        .map{ row: Row -> row.get("name", String.class) }
        .flow()

或者,有一个映射到单个值的快捷方式:

	Flux<String> names = client.sql("SELECT name FROM person")
			.mapValue(String.class)
			.all();

或者,您可以映射到具有 bean 属性或记录组件的结果对象:

	// assuming a name property on Person
	Flux<Person> persons = client.sql("SELECT name FROM person")
			.mapProperties(Person.class)
			.all();
What about null?

关系数据库结果可能包含 null 值。Reactive Streams 规范禁止发出 null 值。此要求强制在提取函数中正确处理 null。虽然您可以从 Row 获取 null 值,但不得发出 null 值。您必须将任何 null 值包装在对象中(例如,Optional 用于单值)以确保您的提取函数绝不会直接返回 null 值。

Updating (INSERT, UPDATE, and DELETE) with DatabaseClient

修改语句的唯一区别在于这些语句通常不返回表格数据,因此您使用 rowsUpdated() 来使用结果。

以下示例显示返回更新的行数的 UPDATE 语句:

  • Java

  • Kotlin

Mono<Integer> affectedRows = client.sql("UPDATE person SET first_name = :fn")
        .bind("fn", "Joe")
        .fetch().rowsUpdated();
val affectedRows = client.sql("UPDATE person SET first_name = :fn")
        .bind("fn", "Joe")
        .fetch().awaitRowsUpdated()

Binding Values to Queries

典型的应用程序需要根据一些输入来选择或更新行的参数化 SQL 语句。这些通常是受 WHERE 从句约束的 SELECT 语句,或者接受输入参数的 INSERTUPDATE 语句。如果参数未正确转义,参数化语句会面临 SQL 注入的风险。DatabaseClient 利用 R2DBC 的 bind API 消除查询参数的 SQL 注入风险。您可以使用 execute(…) 运算符提供参数化 SQL 语句,并将参数绑定到实际的 Statement。然后,您的 R2DBC 驱动程序将使用准备好的语句和参数替换来运行该语句。

参数绑定支持两种绑定策略:

  • 通过索引,使用基于零的参数索引。

  • 通过名称,使用占位符名称。

以下示例显示查询的参数绑定:

    db.sql("INSERT INTO person (id, name, age) VALUES(:id, :name, :age)")
	    	.bind("id", "joe")
	    	.bind("name", "Joe")
			.bind("age", 34);

或者,您可以传入名称和值映射:

	Map<String, Object> params = new LinkedHashMap<>();
	params.put("id", "joe");
	params.put("name", "Joe");
	params.put("age", 34);
	db.sql("INSERT INTO person (id, name, age) VALUES(:id, :name, :age)")
			.bindValues(params);

或者,您可以传入具有 bean 属性或记录组件的参数对象:

	// assuming id, name, age properties on Person
	db.sql("INSERT INTO person (id, name, age) VALUES(:id, :name, :age)")
			.bindProperties(new Person("joe", "Joe", 34);
R2DBC Native Bind Markers

R2DBC 使用数据库本机绑定标记,这些标记取决于实际的数据库供应商。例如,Postgres 使用索引标记,例如 $1$2$n。另一个示例是 SQL Server,它使用以 @ 为前缀的命名绑定标记。 这与 JDBC 不同, JDBC 要求 ? 作为绑定标记。在 JDBC 中,实际的驱动程序会将 ? 绑定标记转换为数据库本机标记,作为其语句执行的一部分。 Spring Framework 的 R2DBC 支持允许您使用本机绑定标记或使用 :name 语法命名的绑定标记。 命名参数支持利用 BindMarkersFactory 实例在查询执行时将命名参数扩展到本机绑定标记,这为您提供了跨各种数据库供应商的特定程度的查询可移植性。

查询预处理器将命名的 Collection 参数展开到一系列绑定标记中,以消除根据参数数量进行动态查询创建的需要。嵌套对象数组被展开,以允许使用(例如)选择列表。

请考虑以下查询:

SELECT id, name, state FROM table WHERE (name, age) IN (('John', 35), ('Ann', 50))

前面的查询可以参数化并如下运行:

  • Java

  • Kotlin

List<Object[]> tuples = new ArrayList<>();
tuples.add(new Object[] {"John", 35});
tuples.add(new Object[] {"Ann",  50});

client.sql("SELECT id, name, state FROM table WHERE (name, age) IN (:tuples)")
	    .bind("tuples", tuples);
val tuples: MutableList<Array<Any>> = ArrayList()
tuples.add(arrayOf("John", 35))
tuples.add(arrayOf("Ann", 50))

client.sql("SELECT id, name, state FROM table WHERE (name, age) IN (:tuples)")
	    .bind("tuples", tuples)

选择列表的使用取决于供应商。

以下示例展示了使用 IN 谓词的一个更简单的变体:

  • Java

  • Kotlin

client.sql("SELECT id, name, state FROM table WHERE age IN (:ages)")
	    .bind("ages", Arrays.asList(35, 50));
client.sql("SELECT id, name, state FROM table WHERE age IN (:ages)")
	    .bind("ages", arrayOf(35, 50))

R2DBC 本身不支持类集合值。然而,在上面的示例中展开给定的 List 可以用于 Spring 的 R2DBC 支持中的已命名参数,例如,用于 IN 子句(如上所示)。然而,插入或更新数组类型列(例如在 Postgres 中)需要由底层 R2DBC 驱动支持的数组类型:通常是 Java 数组,例如 String[] 用于更新 text[] 列。不要传递 Collection<String> 或类似内容作为数组参数。

Statement Filters

有时你需要对实际的 Statement 进行微调,然后再运行它。要做到这一点,请向 DatabaseClient 注册一个 Statement 过滤器(StatementFilterFunction)以拦截和修改执行中的语句,如下例所示:

  • Java

  • Kotlin

client.sql("INSERT INTO table (name, state) VALUES(:name, :state)")
	    .filter((s, next) -> next.execute(s.returnGeneratedValues("id")))
	    .bind("name", …)
	    .bind("state", …);
client.sql("INSERT INTO table (name, state) VALUES(:name, :state)")
		.filter { s: Statement, next: ExecuteFunction -> next.execute(s.returnGeneratedValues("id")) }
		.bind("name", …)
		.bind("state", …)

DatabaseClient 还公开了一个简化的 filter(…​) 重载,它接受一个 Function<Statement, Statement>

  • Java

  • Kotlin

client.sql("INSERT INTO table (name, state) VALUES(:name, :state)")
	    .filter(statement -> s.returnGeneratedValues("id"));

client.sql("SELECT id, name, state FROM table")
	    .filter(statement -> s.fetchSize(25));
client.sql("INSERT INTO table (name, state) VALUES(:name, :state)")
	    .filter { statement -> s.returnGeneratedValues("id") }

client.sql("SELECT id, name, state FROM table")
	    .filter { statement -> s.fetchSize(25) }

StatementFilterFunction 实现允许过滤 Statement 和过滤 Result 对象。

DatabaseClient Best Practices

一旦配置好,DatabaseClient 类的实例就会变成线程安全的。这很重要,因为它意味着你可以配置 DatabaseClient 的单个实例,然后将此共享引用安全地注入到多个 DAO(或存储库)中。DatabaseClient 是有状态的,因为它会维护到 ConnectionFactory 的引用,但此状态不是对话状态。

在使用 DatabaseClient 类时,一种常见做法是在 Spring 配置文件中配置一个 ConnectionFactory,然后将共享的 ConnectionFactory Bean 依赖注入到 DAO 类中。DatabaseClientConnectionFactory 的 setter 中创建。这样就导致了类似以下内容的 DAO:

  • Java

  • Kotlin

public class R2dbcCorporateEventDao implements CorporateEventDao {

	private DatabaseClient databaseClient;

	public void setConnectionFactory(ConnectionFactory connectionFactory) {
		this.databaseClient = DatabaseClient.create(connectionFactory);
	}

	// R2DBC-backed implementations of the methods on the CorporateEventDao follow...
}
class R2dbcCorporateEventDao(connectionFactory: ConnectionFactory) : CorporateEventDao {

	private val databaseClient = DatabaseClient.create(connectionFactory)

	// R2DBC-backed implementations of the methods on the CorporateEventDao follow...
}

显式配置的替代方法是使用组件扫描和注释来依赖注入。在这种情况下,你可以使用 @Component 注释类(这使其成为组件扫描的候选者),并使用 @Autowired 注释 ConnectionFactory setter 方法。以下示例演示了如何执行此操作:

Java
@Component (1)
public class R2dbcCorporateEventDao implements CorporateEventDao {

	private DatabaseClient databaseClient;

	@Autowired (2)
	public void setConnectionFactory(ConnectionFactory connectionFactory) {
		this.databaseClient = DatabaseClient.create(connectionFactory); (3)
	}

	// R2DBC-backed implementations of the methods on the CorporateEventDao follow...
}
1 使用 @Component 注释类。
2 使用 @Autowired 注释 ConnectionFactory 设置方法。
3 使用 ConnectionFactory 创建新的 DatabaseClient
Kotlin
@Component (1)
class R2dbcCorporateEventDao(connectionFactory: ConnectionFactory) : CorporateEventDao { (2)

	private val databaseClient = DatabaseClient(connectionFactory) (3)

	// R2DBC-backed implementations of the methods on the CorporateEventDao follow...
}
4 使用 @Component 注释类。
5 构造函数注入 ConnectionFactory
6 使用 ConnectionFactory 创建新的 DatabaseClient

无论选择使用哪种上述模板初始化样式(或不使用),通常不必在每次想要运行 SQL 时都创建一个新的 DatabaseClient 类实例。一旦配置好,DatabaseClient 实例就是线程安全的。如果你的应用程序访问多个数据库,你可能需要多个 DatabaseClient 实例,这需要多个 ConnectionFactory 以及多个不同配置的 DatabaseClient 实例。

Retrieving Auto-generated Keys

当将行插入定义了自动增量或标识列的表时,INSERT 语句可能会生成键。若要完全控制要生成的列名,只需注册一个请求所需列的生成键的 StatementFilterFunction

  • Java

  • Kotlin

Mono<Integer> generatedId = client.sql("INSERT INTO table (name, state) VALUES(:name, :state)")
		.filter(statement -> s.returnGeneratedValues("id"))
		.map(row -> row.get("id", Integer.class))
		.first();

// generatedId emits the generated key once the INSERT statement has finished
val generatedId = client.sql("INSERT INTO table (name, state) VALUES(:name, :state)")
		.filter { statement -> s.returnGeneratedValues("id") }
		.map { row -> row.get("id", Integer.class) }
		.awaitOne()

// generatedId emits the generated key once the INSERT statement has finished

Controlling Database Connections

本部分介绍:

Using ConnectionFactory

Spring 通过 ConnectionFactory 获取到与数据库的 R2DBC 连接。ConnectionFactory 是 R2DBC 规范的一部分,是一个通用的驱动程序入口点。它允许容器或框架将连接池和事务管理问题隐藏在应用程序代码中。作为一名开发者,你不需要了解有关如何连接到数据库的详细信息。这是设置 ConnectionFactory 的管理员的责任。在开发和测试代码时,你很可能同时扮演这两个角色,但你不一定需要知道生成数据源是如何配置的。

当你使用 Spring 的 R2DBC 层时,你可以使用第三方提供的连接池实现来配置自己的连接池。一个流行的实现是 R2DBC Pool(r2dbc-pool)。Spring 发行版中的实现仅用于测试目的,不提供池。

若要配置 ConnectionFactory

  1. 使用 ConnectionFactory 获取连接,如通常获取 R2DBC ConnectionFactory 一样。

  2. 提供 R2DBC URL(有关正确值,请参见驱动程序的文档)。

以下示例演示了如何配置 ConnectionFactory

  • Java

  • Kotlin

ConnectionFactory factory = ConnectionFactories.get("r2dbc:h2:mem:///test?options=DB_CLOSE_DELAY=-1;DB_CLOSE_ON_EXIT=FALSE");
val factory = ConnectionFactories.get("r2dbc:h2:mem:///test?options=DB_CLOSE_DELAY=-1;DB_CLOSE_ON_EXIT=FALSE");

Using ConnectionFactoryUtils

ConnectionFactoryUtils 类是一个方便而强大的帮助类,它提供了 静态 方法,可从 ConnectionFactory 获取连接(必要时)并关闭连接。

它支持使用 R2dbcTransactionManager 等订阅者 Context 绑定的连接。

Using SingleConnectionFactory

SingleConnectionFactory 类是 DelegatingConnectionFactory 接口的一个实现,它包装了一个每次使用后都不关闭的 Connection

如果任何客户端代码调用 close 以假设池化连接(例如在使用持久性工具时),则应将 suppressClose 属性设置为 true。此设置会返回一个关闭抑制代理,该代理包装物理连接。请注意,你不能再将其强制转换为本机 Connection 或类似对象。

SingleConnectionFactory 主要是一个测试类,如果你允许 R2DBC 驱动程序使用,它可用于特殊要求,如流水线。与池化 ConnectionFactory 相比,它始终重用同一连接,从而避免过多创建物理连接。

Using TransactionAwareConnectionFactoryProxy

TransactionAwareConnectionFactoryProxy 是目标 ConnectionFactory 的代理。代理封装目标 ConnectionFactory 以添加对 Spring 管理的事务的感知。

如果你使用的 R2DBC 客户端未与 Spring 的 R2DBC 支持集成,则需要使用此类。在这种情况下,你仍然可以使用此客户端,同时让此客户端参与 Spring 管理的事务中。通常最好将 R2DBC 客户端与正确访问 ConnectionFactoryUtils 进行资源管理集成。

请参阅 TransactionAwareConnectionFactoryProxy javadoc 了解更多详细信息。

Using R2dbcTransactionManager

R2dbcTransactionManager 类是一个 ReactiveTransactionManager 实现,用于单个 R2DBC ConnectionFactory。它将 R2DBC Connection 从指定的 ConnectionFactory 绑定到订户 Context,可能允许每个 ConnectionFactory 有一个订户 Connection

应用程序代码需要通过 ConnectionFactoryUtils.getConnection(ConnectionFactory) 而不是 R2DBC 标准的 ConnectionFactory.create() 来检索 R2DBC Connection。所有框架类(例如 DatabaseClient)都隐式使用此策略。如果未与事务管理器一起使用,则查找策略的行为与 ConnectionFactory.create() 完全相同,因此在任何情况下都可以使用。