Sessions & Transactions

从 3.6 版开始,MongoDB 支持会话的概念。会话的使用启用了 MongoDB 的 Causal Consistency 模型,该模型确保按其因果关系运行操作。它们被分为 ServerSession 实例和 ClientSession 实例。在本部分中,当我们讨论会话时,我们指的是 ClientSession

客户端会话中的操作与会话外部的操作无关。

MongoOperationsReactiveMongoOperations 都提供了将 ClientSession 与操作关联的网关方法。MongoCollectionMongoDatabase 使用实现 MongoDB 的集合和数据库界面的会话代理对象,因此无需在每次调用时添加会话。这意味着对 MongoCollection#find() 的潜在调用将委托给 MongoCollection#find(ClientSession)

(Reactive)MongoOperations#getCollection 等方法返回原生 MongoDB Java 驱动程序网关对象(例如 MongoCollection),它们自己为 ClientSession 提供专用方法。这些方法被 NOT 会话代理。在直接与 MongoCollectionMongoDatabase 交互而不是通过 MongoOperations 上的 #execute 的一个回调交互时,应在需要时提供 ClientSession

ClientSession support

以下示例展示了会话的用法:

  • Imperative

ClientSessionOptions sessionOptions = ClientSessionOptions.builder()
    .causallyConsistent(true)
    .build();

ClientSession session = client.startSession(sessionOptions); 1

template.withSession(() -> session)
    .execute(action -> {

        Query query = query(where("name").is("Durzo Blint"));
        Person durzo = action.findOne(query, Person.class);  2

        Person azoth = new Person("Kylar Stern");
        azoth.setMaster(durzo);

        action.insert(azoth);                                3

        return azoth;
    });

session.close()                                              4
1 从服务器获取一个新的会话。
2 与以前一样使用 MongoOperation 方法。ClientSession 会自动应用。
3 确保关闭 ClientSession
4 Close the session.

在处理 DBRef 实例时,尤其是延迟加载的实例,在加载所有数据之前关闭 ClientSession 至关重要。否则,延迟获取失败。

Reactive
ClientSessionOptions sessionOptions = ClientSessionOptions.builder()
.causallyConsistent(true)
.build();

Publisher<ClientSession> session = client.startSession(sessionOptions); 1

template.withSession(session)
.execute(action -> {

        Query query = query(where("name").is("Durzo Blint"));
        return action.findOne(query, Person.class)
            .flatMap(durzo -> {

                Person azoth = new Person("Kylar Stern");
                azoth.setMaster(durzo);

                return action.insert(azoth);                            2
            });
    }, ClientSession::close)                                            3
    .subscribe();                                                       4
1 获取一个用于检索新会话的 Publisher
2 与以前一样使用 ReactiveMongoOperation 方法。ClientSession 会自动获取并应用。
3 确保关闭 ClientSession
4 在您订阅之前不会发生任何事情。有关详细信息,请参见 the Project Reactor Reference Guide

通过使用提供实际会话的 Publisher,您可以将会话获取推迟到实际订阅之时。但是,完成时仍需关闭会话,以免在服务器中污染陈旧的会话。对 execute 使用 doFinally 钩子,在不再需要会话时调用 ClientSession#close()。如果您希望对会话本身有更多控制权,可以通过驱动程序获取 ClientSession,并通过 Supplier 提供它。

ClientSession 的反应式使用仅限于模板 API 使用。目前没有与反应式存储库的会话集成。

MongoDB Transactions

从版本 4 开始,MongoDB 支持 Transactions。事务建立在 Sessions 之上,因此需要一个活动的 ClientSession

除非在应用程序上下文中指定了 MongoTransactionManager,否则事务支持将为 DISABLED。您可以使用 setSessionSynchronization(ALWAYS) 参与正在进行的非原生 MongoDB 事务。

要对事务获得充分的编程控制权,您可能希望对 MongoOperations 使用会话回调。

以下示例展示了项目的事务控制:

Programmatic transactions
  • Imperative

ClientSession session = client.startSession(options);                   1

template.withSession(session)
    .execute(action -> {

        session.startTransaction();                                     2

        try {

            Step step = // ...;
            action.insert(step);

            process(step);

            action.update(Step.class).apply(Update.set("state", // ...

            session.commitTransaction();                                3

        } catch (RuntimeException e) {
            session.abortTransaction();                                 4
        }
    }, ClientSession::close)                                            5
1 Obtain a new ClientSession.
2 Start the transaction.
3 如果一切按预期进行,则提交更改。
4 出现问题,因此回滚所有操作。
5 完成后不要忘记关闭会话。

前面的示例让您能够完全控制事务行为,同时在回调中使用会话作用域的 MongoOperations 实例,以确保将会话传递给每次服务器调用。要避免这种方法带来的某些开销,您可以使用 TransactionTemplate 来消除手动事务流的一些噪声。

Reactive
Mono<DeleteResult> result = Mono
    .from(client.startSession())                                                             1

    .flatMap(session -> {
        session.startTransaction();                                                          2

        return Mono.from(collection.deleteMany(session, ...))                                3

            .onErrorResume(e -> Mono.from(session.abortTransaction()).then(Mono.error(e)))   4

            .flatMap(val -> Mono.from(session.commitTransaction()).then(Mono.just(val)))     5

            .doFinally(signal -> session.close());                                           6
      });
1 首先我们显然需要初始化会话。
2 一旦我们掌握了 ClientSession,就启动事务。
3 通过将 `ClientSession`传递给操作来在事务中操作。
4 如果操作异常完成,我们需要停止事务并保留错误。
5 或者当然,在成功的情况下提交更改。仍然保留操作结果。
6 最后,我们需要确保关闭会话。

上述操作的罪魁祸首是保留主要流的 DeleteResult,而不是保留通过 commitTransaction()abortTransaction() 发布的事务结果,这会导致相当复杂的设置。

除非在应用程序上下文中指定了 ReactiveMongoTransactionManager,否则事务支持将为 DISABLED。您可以使用 setSessionSynchronization(ALWAYS) 参与正在进行的非原生 MongoDB 事务。

Transactions with TransactionTemplate / TransactionalOperator

Spring Data MongoDB 事务同时支持 TransactionTemplateTransactionalOperator

Transactions with TransactionTemplate / TransactionalOperator
  • Imperative

template.setSessionSynchronization(ALWAYS);                                     1

// ...

TransactionTemplate txTemplate = new TransactionTemplate(anyTxManager);         2

txTemplate.execute(new TransactionCallbackWithoutResult() {

    @Override
    protected void doInTransactionWithoutResult(TransactionStatus status) {     3

        Step step = // ...;
        template.insert(step);

        process(step);

        template.update(Step.class).apply(Update.set("state", // ...
    }
});
1 在模板 API 配置期间启用事务同步。
2 使用提供的 PlatformTransactionManager 创建 TransactionTemplate
3 在回调中,ClientSession 和事务已注册。

在运行时更改 MongoTemplate 的状态(就像您可能认为前述清单第 1 项中那样是可能的)会导致线程和可见性问题。

Reactive
template.setSessionSynchronization(ALWAYS);                                          1

// ...

TransactionalOperator rxtx = TransactionalOperator.create(anyTxManager,
                                   new DefaultTransactionDefinition());              2


Step step = // ...;
template.insert(step);

Mono<Void> process(step)
    .then(template.update(Step.class).apply(Update.set("state", …))
    .as(rxtx::transactional)                                                         3
    .then();
1 为事务参与启用事务同步。
2 使用提供的 ReactiveTransactionManager 创建 TransactionalOperator
3 TransactionalOperator.transactional(…) 为所有上游操作提供事务管理。

Transactions with MongoTransactionManager & ReactiveMongoTransactionManager

MongoTransactionManager / ReactiveMongoTransactionManager 是通往著名的 Spring 事务支持的网关。它让应用程序能够使用 the managed transaction features of SpringMongoTransactionManagerClientSession 绑定到线程,而 ReactiveMongoTransactionManager 为此使用 ReactorContextMongoTemplate 检测会话并相应地对与事务相关的这些资源进行操作。MongoTemplate 还可以参与其他正在进行的事务。以下示例演示如何使用 MongoTransactionManager 创建和使用事务:

Transactions with MongoTransactionManager / ReactiveMongoTransactionManager
  • Imperative

@Configuration
static class Config extends AbstractMongoClientConfiguration {

    @Bean
    MongoTransactionManager transactionManager(MongoDatabaseFactory dbFactory) {  1
        return new MongoTransactionManager(dbFactory);
    }

    // ...
}

@Component
public class StateService {

    @Transactional
    void someBusinessFunction(Step step) {                                        2

        template.insert(step);

        process(step);

        template.update(Step.class).apply(Update.set("state", // ...
    };
});
1 在应用程序上下文中注册 MongoTransactionManager
2 Mark methods as transactional.

@Transactional(readOnly = true) 建议 MongoTransactionManager 也启动一个事务,该事务将 ClientSession 添加到传出请求。

Reactive
@Configuration
public class Config extends AbstractReactiveMongoConfiguration {

    @Bean
    ReactiveMongoTransactionManager transactionManager(ReactiveMongoDatabaseFactory factory) {  1
        return new ReactiveMongoTransactionManager(factory);
    }

    // ...
}

@Service
public class StateService {

    @Transactional
    Mono<UpdateResult> someBusinessFunction(Step step) {                                  2

        return template.insert(step)
            .then(process(step))
            .then(template.update(Step.class).apply(Update.set("state", …));
    };
});
1 在应用程序上下文中注册 ReactiveMongoTransactionManager
2 Mark methods as transactional.

@Transactional(readOnly = true) 建议 ReactiveMongoTransactionManager 也启动一个事务,该事务将 ClientSession 添加到传出请求。

Controlling MongoDB-specific Transaction Options

事务服务方法可能需要特定的事务选项来运行事务。Spring Data MongoDB 的事务管理器支持评估事务标签,例如 @Transactional(label = { "mongo:readConcern=available" })

默认情况下,使用 mongo: 前缀的标签名称空间由默认配置的 MongoTransactionOptionsResolver 进行评估。事务标签由 TransactionAttribute 提供,开发者可以通过 TransactionTemplateTransactionalOperator 对其进行事务控制。由于其声明式特性,@Transactional(label = …) 提供了一个良好的起点,也可以用作文档。

目前,支持以下选项:

Max Commit Time

Controls the maximum execution time on the server for the commitTransaction operation. The format of the value corresponds with ISO-8601 duration format as used with Duration.parse(…).

用法:mongo:maxCommitTime=PT1S

Read Concern

Sets the read concern for the transaction.

用法:mongo:readConcern=LOCAL|MAJORITY|LINEARIZABLE|SNAPSHOT|AVAILABLE

Read Preference

Sets the read preference for the transaction.

用法:mongo:readPreference=PRIMARY|SECONDARY|SECONDARY_PREFERRED|PRIMARY_PREFERRED|NEAREST

Write Concern

Sets the write concern for the transaction.

用法:mongo:writeConcern=ACKNOWLEDGED|W1|W2|W3|UNACKNOWLEDGED|JOURNALED|MAJORITY

加入外部事务的嵌套事务不会影响初始事务选项,因为事务已启动。事务选项仅在启动新事务时才应用。

Special behavior inside transactions

在事务中,MongoDB 服务器具有略有不同的行为。

连接设置

MongoDB 驱动程序提供了一个专用的副本集名称配置选项,可以将驱动程序转为自动检测模式。该选项有助于在事务期间识别主副本集节点和命令路由。

确保向 MongoDB URI 添加`replicaSet`。有关详细信息,请参阅 connection string options

集合操作

MongoDB 不支持事务中的集合操作,例如集合创建。这也影响了首次使用时发生的即时集合创建。因此,请确保所有必需的结构都已到位。

瞬态错误

MongoDB 可以为事务操作期间引发的错误添加特殊标签。这些标签可能表明瞬态故障,只需重试操作即可消除。我们强烈建议将 Spring Retry用于这些目的。不过,可以覆盖 `MongoTransactionManager#doCommit(MongoTransactionObject)`以实现 Retry Commit Operation行为,如 MongoDB 参考手册中所述。

计算

MongoDB count 在集合统计数据上进行操作,这些统计数据可能无法反映事务中的实际情况。当在多文档事务中发出 count 命令时,服务器会以 error 50851 响应。一旦 MongoTemplate 检测到活动事务,所有公开的 count() 方法都将被转换并委托给聚合框架,使用 $match$count 运算符,保留 Query 设置,例如 collation

在聚合 count 助手内部使用地理命令时,会应用一些限制。以下运算符不可用,必须替换为其他运算符:

  • $where$expr

  • $near$geoWithin$center

  • $nearSphere$geoWithin$centerSphere

使用 Criteria.near(…)`和 `Criteria.nearSphere(…)`的查询必须分别重写为 `Criteria.within(…)`和 `Criteria.withinSphere(…)。对于信息库查询方法中的 near`查询关键字,也适用相同的规则,必须更改为 `within。另请参阅 MongoDB JIRA 问题单 DRIVERS-518以供进一步参考。

以下片段显示了在会话绑定闭包中使用 count

session.startTransaction();

template.withSession(session)
    .execute(action -> {
        action.count(query(where("state").is("active")), Step.class)
        ...

上面的片段具体化为以下命令:

db.collection.aggregate(
   [
      { $match: { state: "active" } },
      { $count: "totalEntityCount" }
   ]
)

而不是:

db.collection.find( { state: "active" } ).count()