Reactive SQL Clients

Reactive SQL 客户端具有直接的 API,重点是可扩展性和低开销。目前支持以下数据库服务器:

The Reactive SQL Clients have a straightforward API focusing on scalability and low-overhead. Currently, the following database servers are supported:

  • IBM Db2

  • PostgreSQL

  • MariaDB/MySQL

  • Microsoft SQL Server

  • Oracle

Oracle 的 Reactive SQL 客户端被认为是 tech preview

The Reactive SQL Client for Oracle is considered tech preview.

tech preview 模式下,需要及早反馈来完善该理念。在解决方案成熟之前,平台的稳定性无法得到保证。欢迎在我们的 mailing list 或我们的 GitHub issue tracker 中作为问题提供反馈。

In tech preview mode, early feedback is requested to mature the idea. There is no guarantee of stability in the platform until the solution matures. Feedback is welcome on our mailing list or as issues in our GitHub issue tracker.

在本指南中,你将学习如何实现一个简单的 CRUD 应用程序,通过 RESTful API 公开存储在 PostgreSQL 中的数据。

In this guide, you will learn how to implement a simple CRUD application exposing data stored in PostgreSQL over a RESTful API.

可以在本文档的底部找到每个客户端的扩展名和连接池类名。

Extension and connection pool class names for each client can be found at the bottom of this document.

如果你不熟悉 Quarkus Vert.x 扩展,请考虑先阅读 Using Eclipse Vert.x 指南。

If you are not familiar with the Quarkus Vert.x extension, consider reading the Using Eclipse Vert.x guide first.

该应用程序应管理水果实体:

The application shall manage fruit entities:

src/main/java/org/acme/reactive/crud/Fruit.java
package org.acme.reactive.crud;

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.vertx.mutiny.pgclient.PgPool;
import io.vertx.mutiny.sqlclient.Row;
import io.vertx.mutiny.sqlclient.RowSet;
import io.vertx.mutiny.sqlclient.Tuple;

public class Fruit {

    public Long id;

    public String name;

    public Fruit() {
        // default constructor.
    }

    public Fruit(String name) {
        this.name = name;
    }

    public Fruit(Long id, String name) {
        this.id = id;
        this.name = name;
    }
}

Prerequisites

Unresolved directive in reactive-sql-clients.adoc - include::{includes}/prerequisites.adoc[]

如果你在开发模式下启动应用程序,Quarkus 会开箱即用地为你提供一个 zero-config database

If you start the application in dev mode, Quarkus provides you with a zero-config database out of the box.

你还可以预先启动一个数据库:

You might also start a database up front:

docker run -it --rm=true --name quarkus_test -e POSTGRES_USER=quarkus_test -e POSTGRES_PASSWORD=quarkus_test -e POSTGRES_DB=quarkus_test -p 5432:5432 postgres:14.1

Solution

我们建议您遵循接下来的部分中的说明,按部就班地创建应用程序。然而,您可以直接跳到完成的示例。

We recommend that you follow the instructions in the next sections and create the application step by step. However, you can go right to the completed example.

克隆 Git 存储库: git clone {quickstarts-clone-url},或下载 {quickstarts-archive-url}[存档]。

Clone the Git repository: git clone {quickstarts-clone-url}, or download an {quickstarts-archive-url}[archive].

此解决方案位于 getting-started-reactive-crud directory

The solution is located in the getting-started-reactive-crud directory.

Installing

Reactive PostgreSQL Client extension

首先,确保你的项目已启用 quarkus-reactive-pg-client 扩展。如果你正在创建一个新项目,请使用以下命令:

First, make sure your project has the quarkus-reactive-pg-client extension enabled. If you are creating a new project, use the following command:

Unresolved directive in reactive-sql-clients.adoc - include::{includes}/devtools/create-app.adoc[]

如果你已经创建了一个项目,可以使用 add-extension 命令将 reactive-pg-client 扩展添加到现有的 Quarkus 项目:

If you have an already created project, the reactive-pg-client extension can be added to an existing Quarkus project with the add-extension command:

Unresolved directive in reactive-sql-clients.adoc - include::{includes}/devtools/extension-add.adoc[]

否则,你可以手动将依赖项添加到你的构建文件:

Otherwise, you can manually add the dependency to your build file:

pom.xml
<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-reactive-pg-client</artifactId>
</dependency>
build.gradle
implementation("io.quarkus:quarkus-reactive-pg-client")

Mutiny

Quarkus REST(以前称为 RESTEasy Reactive)开箱即用地支持 Mutiny 类型(例如 UniMulti)。

Quarkus REST (formerly RESTEasy Reactive) includes supports for Mutiny types (e.g. Uni and Multi) out of the box.

在本指南中,我们将使用 Reactive PostgreSQL 客户端的 Mutiny API。如果你不熟悉 Mutiny,请查看 Mutiny - an intuitive reactive programming library

In this guide, we will use the Mutiny API of the Reactive PostgreSQL Client. If you are not familiar with Mutiny, check Mutiny - an intuitive reactive programming library.

JSON Binding

我们将在 JSON 格式下通过 HTTP 公开 Fruit 实例。因此,你必须还添加 quarkus-rest-jackson 扩展:

We will expose Fruit instances over HTTP in the JSON format. Consequently, you must also add the quarkus-rest-jackson extension:

Unresolved directive in reactive-sql-clients.adoc - include::{includes}/devtools/extension-add.adoc[]

如果你不想使用命令行,请手动将依赖项添加到你的构建文件:

If you prefer not to use the command line, manually add the dependency to your build file:

pom.xml
<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-rest-jackson</artifactId>
</dependency>
build.gradle
implementation("io.quarkus:quarkus-rest-jackson")

当然,这仅仅是本指南的要求,而不是使用 Reactive PostgreSQL 客户端的任何应用程序的要求。

Of course, this is only a requirement for this guide, not any application using the Reactive PostgreSQL Client.

Configuring

Reactive PostgreSQL 客户端可以通过标准的 Quarkus 数据源属性和反应式 URL 进行配置:

The Reactive PostgreSQL Client can be configured with standard Quarkus datasource properties and a reactive URL:

src/main/resources/application.properties
quarkus.datasource.db-kind=postgresql
quarkus.datasource.username=quarkus_test
quarkus.datasource.password=quarkus_test
quarkus.datasource.reactive.url=postgresql://localhost:5432/quarkus_test

使用该方法,你可以创建 FruitResource 框架并注入一个 io.vertx.mutiny.pgclient.PgPool 实例:

With that you can create your FruitResource skeleton and inject a io.vertx.mutiny.pgclient.PgPool instance:

src/main/java/org/acme/vertx/FruitResource.java
package org.acme.reactive.crud;

import java.net.URI;

import jakarta.ws.rs.DELETE;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.PUT;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.core.Response;
import jakarta.ws.rs.core.Response.ResponseBuilder;
import jakarta.ws.rs.core.Response.Status;

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.vertx.mutiny.pgclient.PgPool;

@Path("fruits")
public class FruitResource {

    private final PgPool client;

    public FruitResource(PgPool client) {
        this.client = client;
    }
}

Database schema and seed data

在实现 REST 端点和数据管理代码之前,我们必须设置数据库模式。在前期插入一些数据也比较方便。

Before we implement the REST endpoint and data management code, we must set up the database schema. It would also be convenient to have some data inserted up front.

对于生产,我们建议使用类似于 Flyway database migration tool 的内容。但是对于开发,我们可以在启动时简单地删除并创建表,然后插入一些水果。

For production, we would recommend to use something like the Flyway database migration tool. But for development we can simply drop and create the tables on startup, and then insert a few fruits.

/src/main/java/org/acme/reactive/crud/DBInit.java
package org.acme.reactive.crud;

import io.quarkus.runtime.StartupEvent;
import io.vertx.mutiny.pgclient.PgPool;
import org.eclipse.microprofile.config.inject.ConfigProperty;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;

@ApplicationScoped
public class DBInit {

    private final PgPool client;
    private final boolean schemaCreate;

    public DBInit(PgPool client, @ConfigProperty(name = "myapp.schema.create", defaultValue = "true") boolean schemaCreate) {
        this.client = client;
        this.schemaCreate = schemaCreate;
    }

    void onStart(@Observes StartupEvent ev) {
        if (schemaCreate) {
            initdb();
        }
    }

    private void initdb() {
        // TODO
    }
}

您可以在 application.properties 文件中覆盖 myapp.schema.create 属性的默认值。

You might override the default value of the myapp.schema.create property in the application.properties file.

快准备好了!在开发模式下初始化数据库,我们将使用客户端简单的 query 方法。它返回一个 Uni,因此可以组合来顺序执行查询:

Almost ready! To initialize the DB in development mode, we will use the client simple query method. It returns a Uni and thus can be composed to execute queries sequentially:

client.query("DROP TABLE IF EXISTS fruits").execute()
    .flatMap(r -> client.query("CREATE TABLE fruits (id SERIAL PRIMARY KEY, name TEXT NOT NULL)").execute())
    .flatMap(r -> client.query("INSERT INTO fruits (name) VALUES ('Kiwi')").execute())
    .flatMap(r -> client.query("INSERT INTO fruits (name) VALUES ('Durian')").execute())
    .flatMap(r -> client.query("INSERT INTO fruits (name) VALUES ('Pomelo')").execute())
    .flatMap(r -> client.query("INSERT INTO fruits (name) VALUES ('Lychee')").execute())
    .await().indefinitely();

想知道为什么我们必须在最新查询完成之前进行阻塞吗?此代码是方法的一部分,它 @Observes StartupEvent,Quarkus 以同步方式调用它。因此,过早返回会导致在数据库尚未准备就绪时处理请求。

Wondering why we must block until the latest query is completed? This code is part of a method that @Observes the StartupEvent and Quarkus invokes it synchronously. As a consequence, returning prematurely could lead to serving requests while the database is not ready yet.

就是这样!到目前为止,我们已经了解了如何配置池化客户端和执行简单查询。我们现在可以开发数据管理代码并实现我们的 RESTful 端点。

That’s it! So far we have seen how to configure a pooled client and execute simple queries. We are now ready to develop the data management code and implement our RESTful endpoint.

Using

Query results traversal

在开发模式下,数据库在 fruits 表中设置了几行。若要检索所有数据,我们将再次使用 query 方法:

In development mode, the database is set up with a few rows in the fruits table. To retrieve all the data, we will use the query method again:

/src/main/java/org/acme/reactive/crud/Fruit.java
    public static Multi<Fruit> findAll(PgPool client) {
        return client.query("SELECT id, name FROM fruits ORDER BY name ASC").execute()
                .onItem().transformToMulti(set -> Multi.createFrom().iterable(set)) (1)
                .onItem().transform(Fruit::from); (2)
    }

    private static Fruit from(Row row) {
        return new Fruit(row.getLong("id"), row.getString("name"));
    }
1 Transform the io.vertx.mutiny.sqlclient.RowSet to a Multi<Row>.
2 Convert each io.vertx.mutiny.sqlclient.Row to a Fruit.

Fruit#from 方法将 Row 实例转换为 Fruit 实例。将其提取出来是为了方便实现其他数据管理方法。

The Fruit#from method converts a Row instance to a Fruit instance. It is extracted as a convenience for the implementation of the other data management methods.

然后,添加获取后端所有水果的端点:

Then, add the endpoint to get all fruits from the backend:

src/main/java/org/acme/vertx/FruitResource.java
@GET
public Multi<Fruit> get() {
    return Fruit.findAll(client);
}

现在,使用以下方法在开发模式下启动 Quarkus:

Now start Quarkus in dev mode with:

Unresolved directive in reactive-sql-clients.adoc - include::{includes}/devtools/dev.adoc[]

最后,打开浏览器并导航到 [role="bare"][role="bare"]http://localhost:8080/fruits,您应该看到:

Lastly, open your browser and navigate to [role="bare"]http://localhost:8080/fruits, you should see:

[{"id":2,"name":"Durian"},{"id":1,"name":"Kiwi"},{"id":4,"name":"Lychee"},{"id":3,"name":"Pomelo"}]

Prepared queries

Reactive PostgreSQL 客户端还可以准备查询并获取在执行时替换 SQL 语句中的参数:

The Reactive PostgreSQL Client can also prepare queries and take parameters that are replaced in the SQL statement at execution time:

client.preparedQuery("SELECT id, name FROM fruits WHERE id = $1").execute(Tuple.of(id))

对于 PostgreSQL,SQL 字符串可以通过使用 $1$2 、 …​等位置来引用参数。有关其他数据库,请参考 Database Clients details 部分。

For PostgreSQL, the SQL string can refer to parameters by position, using $1, $2, …​etc. Please refer to the Database Clients details section for other databases.

与简单的 query 方法类似, preparedQuery 返回 PreparedQuery<RowSet<Row>> 的实例。借助此工具,我们能够安全地使用用户提供的 id 来获取特定水果的详细信息:

Similar to the simple query method, preparedQuery returns an instance of PreparedQuery<RowSet<Row>>. Equipped with this tooling, we are able to safely use an id provided by the user to get the details of a particular fruit:

src/main/java/org/acme/vertx/Fruit.java
public static Uni<Fruit> findById(PgPool client, Long id) {
    return client.preparedQuery("SELECT id, name FROM fruits WHERE id = $1").execute(Tuple.of(id)) (1)
            .onItem().transform(RowSet::iterator) (2)
            .onItem().transform(iterator -> iterator.hasNext() ? from(iterator.next()) : null); (3)
}
1 Create a Tuple to hold the prepared query parameters.
2 Get an Iterator for the RowSet result.
3 Create a Fruit instance from the Row if an entity was found.

在 Jakarta REST 资源中:

And in the Jakarta REST resource:

src/main/java/org/acme/vertx/FruitResource.java
@GET
@Path("{id}")
public Uni<Response> getSingle(Long id) {
    return Fruit.findById(client, id)
            .onItem().transform(fruit -> fruit != null ? Response.ok(fruit) : Response.status(Status.NOT_FOUND)) (1)
            .onItem().transform(ResponseBuilder::build); (2)
}
1 Prepare a Jakarta REST response with either the Fruit instance if found or the 404 status code.
2 Build and send the response.

同样的逻辑适用于保存 Fruit 时:

The same logic applies when saving a Fruit:

src/main/java/org/acme/vertx/Fruit.java
public Uni<Long> save(PgPool client) {
    return client.preparedQuery("INSERT INTO fruits (name) VALUES ($1) RETURNING id").execute(Tuple.of(name))
            .onItem().transform(pgRowSet -> pgRowSet.iterator().next().getLong("id"));
}

在 Web 资源中,我们处理 POST 请求:

And in the web resource we handle the POST request:

src/main/java/org/acme/vertx/FruitResource.java
@POST
public Uni<Response> create(Fruit fruit) {
    return fruit.save(client)
            .onItem().transform(id -> URI.create("/fruits/" + id))
            .onItem().transform(uri -> Response.created(uri).build());
}

Result metadata

RowSet 不仅持有内存中的数据,还提供一些关于数据本身的信息,例如:

A RowSet does not only hold your data in memory, it also gives you some information about the data itself, such as:

  • the number of rows affected by the query (inserted/deleted/updated/retrieved depending on the query type),

  • the column names.

让我们用它来支持从数据库中删除水果:

Let’s use this to support removal of fruits in the database:

src/main/java/org/acme/vertx/Fruit.java
public static Uni<Boolean> delete(PgPool client, Long id) {
    return client.preparedQuery("DELETE FROM fruits WHERE id = $1").execute(Tuple.of(id))
            .onItem().transform(pgRowSet -> pgRowSet.rowCount() == 1); (1)
}
1 Inspect metadata to determine if a fruit has been actually deleted.

并在 Web 资源中处理 HTTP DELETE 方法:

And to handle the HTTP DELETE method in the web resource:

src/main/java/org/acme/vertx/FruitResource.java
@DELETE
@Path("{id}")
public Uni<Response> delete(Long id) {
    return Fruit.delete(client, id)
            .onItem().transform(deleted -> deleted ? Status.NO_CONTENT : Status.NOT_FOUND)
            .onItem().transform(status -> Response.status(status).build());
}

通过实现 GETPOSTDELETE 方法,我们现在可以创建一个最小的网页来试用 RESTful 应用程序。我们使用 jQuery 来简化与后端的交互:

With GET, POST and DELETE methods implemented, we can now create a minimal web page to try the RESTful application out. We will use jQuery to simplify interactions with the backend:

/src/main/resources/META-INF/resources/fruits.html
<!doctype html>
<html>
<head>
    <meta charset="utf-8"/>
    <title>Reactive REST - Quarkus</title>
    <script src="https://code.jquery.com/jquery-3.3.1.min.js"
            integrity="sha256-FgpCb/KJQlLNfOu91ta32o/NMZxltwRo8QtmkMRdAu8=" crossorigin="anonymous"></script>
    <script type="application/javascript" src="fruits.js"></script>
</head>
<body>

<h1>Fruits API Testing</h1>

<h2>All fruits</h2>
<div id="all-fruits"></div>

<h2>Create Fruit</h2>
<input id="fruit-name" type="text">
<button id="create-fruit-button" type="button">Create</button>
<div id="create-fruit"></div>

</body>
</html>

Quarkus 自动供应位于 META-INF/resources 目录下的静态资源。

Quarkus automatically serves static resources located under the META-INF/resources directory.

在 JavaScript 代码中,我们需要一个函数来刷新水果列表,当:

In the JavaScript code, we need a function to refresh the list of fruits when:

  • the page is loaded, or

  • a fruit is added, or

  • a fruit is deleted.

/src/main/resources/META-INF/resources/fruits.js
function refresh() {
    $.get('/fruits', function (fruits) {
        var list = '';
        (fruits || []).forEach(function (fruit) { (1)
            list = list
                + '<tr>'
                + '<td>' + fruit.id + '</td>'
                + '<td>' + fruit.name + '</td>'
                + '<td><a href="#" onclick="deleteFruit(' + fruit.id + ')">Delete</a></td>'
                + '</tr>'
        });
        if (list.length > 0) {
            list = ''
                + '<table><thead><th>Id</th><th>Name</th><th></th></thead>'
                + list
                + '</table>';
        } else {
            list = "No fruits in database"
        }
        $('#all-fruits').html(list);
    });
}

function deleteFruit(id) {
    $.ajax('/fruits/' + id, {method: 'DELETE'}).then(refresh);
}

$(document).ready(function () {

    $('#create-fruit-button').click(function () {
        var fruitName = $('#fruit-name').val();
        $.post({
            url: '/fruits',
            contentType: 'application/json',
            data: JSON.stringify({name: fruitName})
        }).then(refresh);
    });

    refresh();
});
1 The fruits parameter is not defined when the database is empty.

完成!导航至 [role="bare"][role="bare"]http://localhost:8080/fruits.html 并读取/创建/删除一些水果。

All done! Navigate to [role="bare"]http://localhost:8080/fruits.html and read/create/delete some fruits.

Database Clients details

Database Extension name Pool class name Placeholders

IBM Db2

quarkus-reactive-db2-client

io.vertx.mutiny.db2client.DB2Pool

?

MariaDB/MySQL

quarkus-reactive-mysql-client

io.vertx.mutiny.mysqlclient.MySQLPool

?

Microsoft SQL Server

quarkus-reactive-mssql-client

io.vertx.mutiny.mssqlclient.MSSQLPool

@p1, @p2, etc.

Oracle

quarkus-reactive-oracle-client

io.vertx.mutiny.oracleclient.OraclePool

?

PostgreSQL

quarkus-reactive-pg-client

io.vertx.mutiny.pgclient.PgPool

$1, $2, etc.

Transactions

反应式 SQL 客户端支持事务。事务通过 io.vertx.mutiny.sqlclient.SqlConnection#begin 启动并通过 io.vertx.mutiny.sqlclient.Transaction#commitio.vertx.mutiny.sqlclient.Transaction#rollback 终止。所有这些操作都是异步的:

The reactive SQL clients support transactions. A transaction is started with io.vertx.mutiny.sqlclient.SqlConnection#begin and terminated with either io.vertx.mutiny.sqlclient.Transaction#commit or io.vertx.mutiny.sqlclient.Transaction#rollback. All these operations are asynchronous:

  • connection.begin() returns a Uni<Transaction>

  • transaction.commit() and transaction.rollback() return Uni<Void>

在响应式编程世界中管理事务可能很繁琐。您可以使用 io.vertx.mutiny.sqlclient.Pool#withTransaction 帮助程序方法,而不是编写重复且复杂的(因而容易出错!)代码。

Managing transactions in the reactive programming world can be cumbersome. Instead of writing repetitive and complex (thus error-prone!) code, you can use the io.vertx.mutiny.sqlclient.Pool#withTransaction helper method.

以下代码段展示了如何在同一事务中运行 2 个插入:

The following snippet shows how to run 2 insertions in the same transaction:

public static Uni<Void> insertTwoFruits(PgPool client, Fruit fruit1, Fruit fruit2) {
    return client.withTransaction(conn -> {
        Uni<RowSet<Row>> insertOne = conn.preparedQuery("INSERT INTO fruits (name) VALUES ($1) RETURNING id")
                .execute(Tuple.of(fruit1.name));
        Uni<RowSet<Row>> insertTwo = conn.preparedQuery("INSERT INTO fruits (name) VALUES ($1) RETURNING id")
                .execute(Tuple.of(fruit2.name));

        return Uni.combine().all().unis(insertOne, insertTwo)
                // Ignore the results (the two ids)
                .discardItems();
    });
}

在此示例中,事务在成功时自动提交或在失败时回滚。

In this example, the transaction is automatically committed on success or rolled back on failure.

你还可以按照如下方式创建相关动作:

You can also create dependent actions as follows:

return client.withTransaction(conn -> conn

        .preparedQuery("INSERT INTO person (firstname,lastname) VALUES ($1,$2) RETURNING id")
        .execute(Tuple.of(person.getFirstName(), person.getLastName()))

        .onItem().transformToUni(id -> conn.preparedQuery("INSERT INTO addr (person_id,addrline1) VALUES ($1,$2)")
                .execute(Tuple.of(id.iterator().next().getLong("id"), person.getLastName())))

        .onItem().ignore().andContinueWithNull());

Working with batch query results

在执行批处理查询时,响应式 SQL 客户端会返回一个 RowSet,该批次的结果与第一个元素相对应。要获取以下批处理元素的结果,您必须调用 RowSet#next 方法,直到它返回 null

When executing batch queries, reactive SQL clients return a RowSet that corresponds to the results of the first element in the batch. To get the results of the following batch elements, you must invoke the RowSet#next method until it returns null.

假设您想要更新一些行并计算受影响的行总数。您必须检查每个 RowSet

Let’s say you want to update some rows and compute the total number of affected rows. You must inspect each RowSet:

PreparedQuery<RowSet<Row>> preparedQuery = client.preparedQuery("UPDATE fruits SET name = $1 WHERE id = $2");

Uni<RowSet<Row>> rowSet = preparedQuery.executeBatch(Arrays.asList(
        Tuple.of("Orange", 1),
        Tuple.of("Pear", 2),
        Tuple.of("Apple", 3)));

Uni<Integer> totalAffected = rowSet.onItem().transform(res -> {
    int total = 0;
    do {
        total += res.rowCount(); (1)
    } while ((res = res.next()) != null); (2)
    return total;
});
1 Compute the sum of RowSet#rowCount.
2 Invoke RowSet#next until it returns null.

作为另一个示例,如果您想要加载您刚刚插入的所有行,您必须连接每个 RowSet 的内容:

As another example, if you want to load all the rows you just inserted, you must concatenate the contents of each RowSet:

PreparedQuery<RowSet<Row>> preparedQuery = client.preparedQuery("INSERT INTO fruits (name) VALUES ($1) RETURNING *");

Uni<RowSet<Row>> rowSet = preparedQuery.executeBatch(Arrays.asList(
        Tuple.of("Orange"),
        Tuple.of("Pear"),
        Tuple.of("Apple")));

// Generate a Multi of RowSet items
Multi<RowSet<Row>> rowSets = rowSet.onItem().transformToMulti(res -> {
    return Multi.createFrom().generator(() -> res, (rs, emitter) -> {
        RowSet<Row> next = null;
        if (rs != null) {
            emitter.emit(rs);
            next = rs.next();
        }
        if (next == null) {
            emitter.complete();
        }
        return next;
    });
});

// Transform each RowSet into Multi of Row items and Concatenate
Multi<Row> rows = rowSets.onItem().transformToMultiAndConcatenate(Multi.createFrom()::iterable);

Multiple Datasources

反应式 SQL 客户端支持定义多个数据源。

The reactive SQL clients support defining several datasources.

使用多个数据源的典型配置如下所示:

A typical configuration with several datasources would look like:

quarkus.datasource.db-kind=postgresql 1
quarkus.datasource.username=user-default
quarkus.datasource.password=password-default
quarkus.datasource.reactive.url=postgresql://localhost:5432/default

quarkus.datasource."additional1".db-kind=postgresql 2
quarkus.datasource."additional1".username=user-additional1
quarkus.datasource."additional1".password=password-additional1
quarkus.datasource."additional1".reactive.url=postgresql://localhost:5432/additional1

quarkus.datasource."additional2".db-kind=mysql 3
quarkus.datasource."additional2".username=user-additional2
quarkus.datasource."additional2".password=password-additional2
quarkus.datasource."additional2".reactive.url=mysql://localhost:3306/additional2
1 The default datasource - using PostgreSQL.
2 A named datasource called additional1 - using PostgreSQL.
3 A named datasource called additional2 - using MySQL.

然后你可以如下一般注入客户端:

You can then inject the clients as follows:

@Inject 1
PgPool defaultClient;

@Inject
@ReactiveDataSource("additional1") 2
PgPool additional1Client;

@Inject
@ReactiveDataSource("additional2")
MySQLPool additional2Client;
1 Injecting the client for the default datasource does not require anything special.
2 For a named datasource, you use the @ReactiveDataSource CDI qualifier with the datasource name as its value.

UNIX Domain Socket connections

可以将 PostgreSQL 和 MariaDB/MySQL 客户端配置为通过 UNIX 域套接字连接服务器。

The PostgreSQL and MariaDB/MySQL clients can be configured to connect to the server through a UNIX domain socket.

首先确保 native transport support 已启用。

First make sure that native transport support is enabled.

然后配置数据库连接 URL。此步骤取决于数据库类型。

Then configure the database connection url. This step depends on the database type.

PostgreSQL

PostgreSQL 域套接字路径采用以下形式:<directory>/.s.PGSQL.<port>

PostgreSQL domain socket paths have the following form: <directory>/.s.PGSQL.<port>

数据库连接 URL 必须进行配置,以便:

The database connection url must be configured so that:

  • the host is the directory in the socket path

  • the port is the port in the socket path

考虑以下套接字路径:/var/run/postgresql/.s.PGSQL.5432

Consider the following socket path: /var/run/postgresql/.s.PGSQL.5432.

在 `application.properties`中添加:

In application.properties add:

quarkus.datasource.reactive.url=postgresql://:5432/quarkus_test?host=/var/run/postgresql

MariaDB/MySQL

数据库连接 URL 必须进行配置,以便 `host`是套接字路径。

The database connection url must be configured so that the host is the socket path.

考虑以下套接字路径:/var/run/mysqld/mysqld.sock

Consider the following socket path: /var/run/mysqld/mysqld.sock.

在 `application.properties`中添加:

In application.properties add:

quarkus.datasource.reactive.url=mysql:///quarkus_test?host=/var/run/mysqld/mysqld.sock

Load-balancing connections

响应式 PostgreSQL 和 MariaDB/MySQL 客户端支持定义多种连接。

The reactive PostgreSQL and MariaDB/MySQL clients support defining several connections.

具有多种连接的典型配置如下:

A typical configuration with several connections would look like:

quarkus.datasource.reactive.url=postgresql://host1:5432/default,postgresql://host2:5432/default,postgresql://host3:5432/default

这也可以用索引属性语法编写:

This can also be written with indexed property syntax:

quarkus.datasource.reactive.url[0]=postgresql://host1:5432/default
quarkus.datasource.reactive.url[1]=postgresql://host2:5432/default
quarkus.datasource.reactive.url[2]=postgresql://host3:5432/default

Pooled connection idle-timeout

响应式数据源可以使用 `idle-timeout`进行配置。它是连接在池中保持未使用的最长时间,然后再关闭。

Reactive datasources can be configured with an idle-timeout. It is the maximum time a connection remains unused in the pool before it is closed.

`idle-timeout`在默认情况下被禁用。

The idle-timeout is disabled by default.

例如,你可以在 60 分钟后使空闲连接失效:

For example, you could expire idle connections after 60 minutes:

quarkus.datasource.reactive.idle-timeout=PT60M

Pooled Connection max-lifetime

除了 `idle-timeout`之外,响应式数据源还可以使用 `max-lifetime`进行配置。它是连接在池中保持的最长时间,然后根据需要关闭并替换。`max-lifetime`确保池有最新配置的最新连接。

In addition to idle-timeout, reactive datasources can also be configured with a max-lifetime. It is the maximum time a connection remains in the pool before it is closed and replaced as needed. The max-lifetime allows ensuring the pool has fresh connections with up-to-date configuration.

`max-lifetime`在默认情况下被禁用,但在使用能提供限时凭据的凭据提供程序(如 Vault credentials provider)时,它是重要的配置。

The max-lifetime is disabled by default but is an important configuration when using a credentials provider that provides time limited credentials, like the Vault credentials provider.

例如,你可以确保连接在 60 分钟后被回收:

For example, you could ensure connections are recycled after 60 minutes:

quarkus.datasource.reactive.max-lifetime=PT60M

Customizing pool creation

有时,只通过声明无法配置数据库连接池。

Sometimes, the database connection pool cannot be configured only by declaration.

例如,您可能必须读取生产中才有的特定文件,或者从专有配置服务器中检索配置数据。

For example, you might have to read a specific file only present in production, or retrieve configuration data from a proprietary configuration server.

在这种情况下,您可以通过创建实现取决于目标数据库的接口的类来自定义池创建:

In this case, you can customize pool creation by creating a class implementing an interface which depends on the target database:

Database Pool creator class name

IBM Db2

io.quarkus.reactive.db2.client.DB2PoolCreator

MariaDB/MySQL

io.quarkus.reactive.mysql.client.MySQLPoolCreator

Microsoft SQL Server

io.quarkus.reactive.mssql.client.MSSQLPoolCreator

Oracle

io.quarkus.reactive.oracle.client.OraclePoolCreator

PostgreSQL

io.quarkus.reactive.pg.client.PgPoolCreator

voici un exemple pour PostgreSQL :

Here’s an example for PostgreSQL:

import jakarta.inject.Singleton;

import io.quarkus.reactive.pg.client.PgPoolCreator;
import io.vertx.pgclient.PgConnectOptions;
import io.vertx.pgclient.PgPool;
import io.vertx.sqlclient.PoolOptions;

@Singleton
public class CustomPgPoolCreator implements PgPoolCreator {

    @Override
    public PgPool create(Input input) {
        PgConnectOptions connectOptions = input.pgConnectOptions();
        PoolOptions poolOptions = input.poolOptions();
        // Customize connectOptions, poolOptions or both, as required
        return PgPool.pool(input.vertx(), connectOptions, poolOptions);
    }
}

Pipelining

Les clients PostgreSQL et MariaDB/MySQL prennent en charge le traitement pipeliné des requêtes au niveau de la connexion. Cette fonctionnalité consiste à envoyer plusieurs requêtes sur la même connexion à la base de données sans attendre les réponses correspondantes.

The PostgreSQL and MariaDB/MySQL clients support pipelining of queries at the connection level. The feature consists in sending multiple queries on the same database connection without waiting for the corresponding responses.

Dans certains cas d’utilisation, le traitement en pipeline des requêtes peut améliorer les performances d’accès à la base de données.

In some use cases, query pipelining can improve database access performance.

voici un exemple pour PostgreSQL :

Here’s an example for PostgreSQL:

import jakarta.inject.Inject;

import io.smallrye.mutiny.Uni;
import io.vertx.mutiny.pgclient.PgPool;

public class PipeliningExample {

    @Inject
    PgPool client;

    public Uni<String> favoriteFruitAndVegetable() {
        // Explicitly acquire a connection
        return client.withConnection(conn -> {
            Uni<String> favoriteFruit = conn.query("SELECT name FROM fruits WHERE preferred IS TRUE").execute()
                    .onItem().transform(rows -> rows.iterator().next().getString("name"));
            Uni<String> favoriteVegetable = conn.query("SELECT name FROM vegetables WHERE preferred IS TRUE").execute()
                    .onItem().transform(rows -> rows.iterator().next().getString("name"));
            // favoriteFruit and favoriteVegetable unis will be subscribed at the same time
            return Uni.combine().all().unis(favoriteFruit, favoriteVegetable)
                    .combinedWith(PipeliningExample::formatMessage);
        });
    }

    private static String formatMessage(String fruit, String vegetable) {
        return String.format("The favorite fruit is %s and the favorite vegetable is %s", fruit, vegetable);
    }
}

Le nombre maximal de requêtes en pipeline est configuré avec la propriété pipelining-limit :

The maximum number of pipelined queries is configured with the pipelining-limit property:

# For PostgreSQL
quarkus.datasource.reactive.postgresql.pipelining-limit=256
# For MariaDB/MySQL
quarkus.datasource.reactive.mysql.pipelining-limit=256

Par défaut, pipelining-limit est défini sur 256.

By default, pipelining-limit is set to 256.

Configuration Reference

Common Datasource

Unresolved directive in reactive-sql-clients.adoc - include::{generated-dir}/config/quarkus-datasource.adoc[]

Reactive Datasource

Unresolved directive in reactive-sql-clients.adoc - include::{generated-dir}/config/quarkus-reactive-datasource.adoc[]

IBM Db2

Unresolved directive in reactive-sql-clients.adoc - include::{generated-dir}/config/quarkus-reactive-db2-client.adoc[]

MariaDB/MySQL

Unresolved directive in reactive-sql-clients.adoc - include::{generated-dir}/config/quarkus-reactive-mysql-client.adoc[]

Microsoft SQL Server

Unresolved directive in reactive-sql-clients.adoc - include::{generated-dir}/config/quarkus-reactive-mssql-client.adoc[]

Oracle

Unresolved directive in reactive-sql-clients.adoc - include::{generated-dir}/config/quarkus-reactive-oracle-client.adoc[]

PostgreSQL

Unresolved directive in reactive-sql-clients.adoc - include::{generated-dir}/config/quarkus-reactive-pg-client.adoc[]