Sessions & Transactions

从 3.6 版开始,MongoDB 支持会话的概念。会话的使用启用了 MongoDB 的 Causal Consistency 模型,该模型确保按其因果关系运行操作。它们被分为 ServerSession 实例和 ClientSession 实例。在本部分中,当我们讨论会话时,我们指的是 ClientSession

As of version 3.6, MongoDB supports the concept of sessions. The use of sessions enables MongoDB’s Causal Consistency model, which guarantees running operations in an order that respects their causal relationships. Those are split into ServerSession instances and ClientSession instances. In this section, when we speak of a session, we refer to ClientSession.

客户端会话中的操作与会话外部的操作无关。

Operations within a client session are not isolated from operations outside the session.

MongoOperationsReactiveMongoOperations 都提供了将 ClientSession 与操作关联的网关方法。MongoCollectionMongoDatabase 使用实现 MongoDB 的集合和数据库界面的会话代理对象,因此无需在每次调用时添加会话。这意味着对 MongoCollection#find() 的潜在调用将委托给 MongoCollection#find(ClientSession)

Both MongoOperations and ReactiveMongoOperations provide gateway methods for tying a ClientSession to the operations. MongoCollection and MongoDatabase use session proxy objects that implement MongoDB’s collection and database interfaces, so you need not add a session on each call. This means that a potential call to MongoCollection#find() is delegated to MongoCollection#find(ClientSession).

(Reactive)MongoOperations#getCollection 等方法返回原生 MongoDB Java 驱动程序网关对象(例如 MongoCollection),它们自己为 ClientSession 提供专用方法。这些方法被 NOT 会话代理。在直接与 MongoCollectionMongoDatabase 交互而不是通过 MongoOperations 上的 #execute 的一个回调交互时,应在需要时提供 ClientSession

Methods such as (Reactive)MongoOperations#getCollection return native MongoDB Java Driver gateway objects (such as MongoCollection) that themselves offer dedicated methods for ClientSession. These methods are NOT session-proxied. You should provide the ClientSession where needed when interacting directly with a MongoCollection or MongoDatabase and not through one of the #execute callbacks on MongoOperations.

ClientSession support

以下示例展示了会话的用法:

The following example shows the usage of a session:

  • Imperative

ClientSessionOptions sessionOptions = ClientSessionOptions.builder()
    .causallyConsistent(true)
    .build();

ClientSession session = client.startSession(sessionOptions); 1

template.withSession(() -> session)
    .execute(action -> {

        Query query = query(where("name").is("Durzo Blint"));
        Person durzo = action.findOne(query, Person.class);  2

        Person azoth = new Person("Kylar Stern");
        azoth.setMaster(durzo);

        action.insert(azoth);                                3

        return azoth;
    });

session.close()                                              4
1 Obtain a new session from the server.
2 Use MongoOperation methods as before. The ClientSession gets applied automatically.
3 Make sure to close the ClientSession.
4 Close the session.

在处理 DBRef 实例时,尤其是延迟加载的实例,在加载所有数据之前关闭 ClientSession 至关重要。否则,延迟获取失败。

When dealing with DBRef instances, especially lazily loaded ones, it is essential to not close the ClientSession before all data is loaded. Otherwise, lazy fetch fails.

Reactive
ClientSessionOptions sessionOptions = ClientSessionOptions.builder()
.causallyConsistent(true)
.build();

Publisher<ClientSession> session = client.startSession(sessionOptions); 1

template.withSession(session)
.execute(action -> {

        Query query = query(where("name").is("Durzo Blint"));
        return action.findOne(query, Person.class)
            .flatMap(durzo -> {

                Person azoth = new Person("Kylar Stern");
                azoth.setMaster(durzo);

                return action.insert(azoth);                            2
            });
    }, ClientSession::close)                                            3
    .subscribe();                                                       4
1 Obtain a Publisher for new session retrieval.
2 Use ReactiveMongoOperation methods as before. The ClientSession is obtained and applied automatically.
3 Make sure to close the ClientSession.
4 Nothing happens until you subscribe. See the Project Reactor Reference Guide for details.

通过使用提供实际会话的 Publisher,您可以将会话获取推迟到实际订阅之时。但是,完成时仍需关闭会话,以免在服务器中污染陈旧的会话。对 execute 使用 doFinally 钩子,在不再需要会话时调用 ClientSession#close()。如果您希望对会话本身有更多控制权,可以通过驱动程序获取 ClientSession,并通过 Supplier 提供它。

By using a Publisher that provides the actual session, you can defer session acquisition to the point of actual subscription. Still, you need to close the session when done, so as to not pollute the server with stale sessions. Use the doFinally hook on execute to call ClientSession#close() when you no longer need the session. If you prefer having more control over the session itself, you can obtain the ClientSession through the driver and provide it through a Supplier.

ClientSession 的反应式使用仅限于模板 API 使用。目前没有与反应式存储库的会话集成。

Reactive use of ClientSession is limited to Template API usage. There’s currently no session integration with reactive repositories.

MongoDB Transactions

从版本 4 开始,MongoDB 支持 Transactions。事务建立在 Sessions 之上,因此需要一个活动的 ClientSession

As of version 4, MongoDB supports Transactions. Transactions are built on top of Sessions and, consequently, require an active ClientSession.

除非在应用程序上下文中指定了 MongoTransactionManager,否则事务支持将为 DISABLED。您可以使用 setSessionSynchronization(ALWAYS) 参与正在进行的非原生 MongoDB 事务。

Unless you specify a MongoTransactionManager within your application context, transaction support is DISABLED. You can use setSessionSynchronization(ALWAYS) to participate in ongoing non-native MongoDB transactions.

要对事务获得充分的编程控制权,您可能希望对 MongoOperations 使用会话回调。

To get full programmatic control over transactions, you may want to use the session callback on MongoOperations.

以下示例展示了项目的事务控制:

The following example shows programmatic transaction control:

Programmatic transactions
  • Imperative

ClientSession session = client.startSession(options);                   1

template.withSession(session)
    .execute(action -> {

        session.startTransaction();                                     2

        try {

            Step step = // ...;
            action.insert(step);

            process(step);

            action.update(Step.class).apply(Update.set("state", // ...

            session.commitTransaction();                                3

        } catch (RuntimeException e) {
            session.abortTransaction();                                 4
        }
    }, ClientSession::close)                                            5
1 Obtain a new ClientSession.
2 Start the transaction.
3 If everything works out as expected, commit the changes.
4 Something broke, so roll back everything.
5 Do not forget to close the session when done.

前面的示例让您能够完全控制事务行为,同时在回调中使用会话作用域的 MongoOperations 实例,以确保将会话传递给每次服务器调用。要避免这种方法带来的某些开销,您可以使用 TransactionTemplate 来消除手动事务流的一些噪声。

The preceding example lets you have full control over transactional behavior while using the session scoped MongoOperations instance within the callback to ensure the session is passed on to every server call. To avoid some of the overhead that comes with this approach, you can use a TransactionTemplate to take away some of the noise of manual transaction flow.

Reactive
Mono<DeleteResult> result = Mono
    .from(client.startSession())                                                             1

    .flatMap(session -> {
        session.startTransaction();                                                          2

        return Mono.from(collection.deleteMany(session, ...))                                3

            .onErrorResume(e -> Mono.from(session.abortTransaction()).then(Mono.error(e)))   4

            .flatMap(val -> Mono.from(session.commitTransaction()).then(Mono.just(val)))     5

            .doFinally(signal -> session.close());                                           6
      });
1 First we obviously need to initiate the session.
2 Once we have the ClientSession at hand, start the transaction.
3 Operate within the transaction by passing on the ClientSession to the operation.
4 If the operations completes exceptionally, we need to stop the transaction and preserve the error.
5 Or of course, commit the changes in case of success. Still preserving the operations result.
6 Lastly, we need to make sure to close the session.

上述操作的罪魁祸首是保留主要流的 DeleteResult,而不是保留通过 commitTransaction()abortTransaction() 发布的事务结果,这会导致相当复杂的设置。

The culprit of the above operation is in keeping the main flows DeleteResult instead of the transaction outcome published via either commitTransaction() or abortTransaction(), which leads to a rather complicated setup.

除非在应用程序上下文中指定了 ReactiveMongoTransactionManager,否则事务支持将为 DISABLED。您可以使用 setSessionSynchronization(ALWAYS) 参与正在进行的非原生 MongoDB 事务。

Unless you specify a ReactiveMongoTransactionManager within your application context, transaction support is DISABLED. You can use setSessionSynchronization(ALWAYS) to participate in ongoing non-native MongoDB transactions.

Transactions with TransactionTemplate / TransactionalOperator

Spring Data MongoDB 事务同时支持 TransactionTemplateTransactionalOperator

Spring Data MongoDB transactions support both TransactionTemplate and TransactionalOperator.

Transactions with TransactionTemplate / TransactionalOperator
  • Imperative

template.setSessionSynchronization(ALWAYS);                                     1

// ...

TransactionTemplate txTemplate = new TransactionTemplate(anyTxManager);         2

txTemplate.execute(new TransactionCallbackWithoutResult() {

    @Override
    protected void doInTransactionWithoutResult(TransactionStatus status) {     3

        Step step = // ...;
        template.insert(step);

        process(step);

        template.update(Step.class).apply(Update.set("state", // ...
    }
});
1 Enable transaction synchronization during Template API configuration.
2 Create the TransactionTemplate using the provided PlatformTransactionManager.
3 Within the callback the ClientSession and transaction are already registered.

在运行时更改 MongoTemplate 的状态(就像您可能认为前述清单第 1 项中那样是可能的)会导致线程和可见性问题。

Changing state of MongoTemplate during runtime (as you might think would be possible in item 1 of the preceding listing) can cause threading and visibility issues.

Reactive
template.setSessionSynchronization(ALWAYS);                                          1

// ...

TransactionalOperator rxtx = TransactionalOperator.create(anyTxManager,
                                   new DefaultTransactionDefinition());              2


Step step = // ...;
template.insert(step);

Mono<Void> process(step)
    .then(template.update(Step.class).apply(Update.set("state", …))
    .as(rxtx::transactional)                                                         3
    .then();
1 Enable transaction synchronization for Transactional participation.
2 Create the TransactionalOperator using the provided ReactiveTransactionManager.
3 TransactionalOperator.transactional(…) provides transaction management for all upstream operations.

Transactions with MongoTransactionManager & ReactiveMongoTransactionManager

MongoTransactionManager / ReactiveMongoTransactionManager 是通往著名的 Spring 事务支持的网关。它让应用程序能够使用 the managed transaction features of SpringMongoTransactionManagerClientSession 绑定到线程,而 ReactiveMongoTransactionManager 为此使用 ReactorContextMongoTemplate 检测会话并相应地对与事务相关的这些资源进行操作。MongoTemplate 还可以参与其他正在进行的事务。以下示例演示如何使用 MongoTransactionManager 创建和使用事务:

MongoTransactionManager / ReactiveMongoTransactionManager is the gateway to the well known Spring transaction support. It lets applications use the managed transaction features of Spring. The MongoTransactionManager binds a ClientSession to the thread whereas the ReactiveMongoTransactionManager is using the ReactorContext for this. MongoTemplate detects the session and operates on these resources which are associated with the transaction accordingly. MongoTemplate can also participate in other, ongoing transactions. The following example shows how to create and use transactions with a MongoTransactionManager:

Transactions with MongoTransactionManager / ReactiveMongoTransactionManager
  • Imperative

@Configuration
static class Config extends AbstractMongoClientConfiguration {

    @Bean
    MongoTransactionManager transactionManager(MongoDatabaseFactory dbFactory) {  1
        return new MongoTransactionManager(dbFactory);
    }

    // ...
}

@Component
public class StateService {

    @Transactional
    void someBusinessFunction(Step step) {                                        2

        template.insert(step);

        process(step);

        template.update(Step.class).apply(Update.set("state", // ...
    };
});
1 Register MongoTransactionManager in the application context.
2 Mark methods as transactional.

@Transactional(readOnly = true) 建议 MongoTransactionManager 也启动一个事务,该事务将 ClientSession 添加到传出请求。

@Transactional(readOnly = true) advises MongoTransactionManager to also start a transaction that adds the ClientSession to outgoing requests.

Reactive
@Configuration
public class Config extends AbstractReactiveMongoConfiguration {

    @Bean
    ReactiveMongoTransactionManager transactionManager(ReactiveMongoDatabaseFactory factory) {  1
        return new ReactiveMongoTransactionManager(factory);
    }

    // ...
}

@Service
public class StateService {

    @Transactional
    Mono<UpdateResult> someBusinessFunction(Step step) {                                  2

        return template.insert(step)
            .then(process(step))
            .then(template.update(Step.class).apply(Update.set("state", …));
    };
});
1 Register ReactiveMongoTransactionManager in the application context.
2 Mark methods as transactional.

@Transactional(readOnly = true) 建议 ReactiveMongoTransactionManager 也启动一个事务,该事务将 ClientSession 添加到传出请求。

@Transactional(readOnly = true) advises ReactiveMongoTransactionManager to also start a transaction that adds the ClientSession to outgoing requests.

Controlling MongoDB-specific Transaction Options

事务服务方法可能需要特定的事务选项来运行事务。Spring Data MongoDB 的事务管理器支持评估事务标签,例如 @Transactional(label = { "mongo:readConcern=available" })

Transactional service methods can require specific transaction options to run a transaction. Spring Data MongoDB’s transaction managers support evaluation of transaction labels such as @Transactional(label = { "mongo:readConcern=available" }).

默认情况下,使用 mongo: 前缀的标签名称空间由默认配置的 MongoTransactionOptionsResolver 进行评估。事务标签由 TransactionAttribute 提供,开发者可以通过 TransactionTemplateTransactionalOperator 对其进行事务控制。由于其声明式特性,@Transactional(label = …) 提供了一个良好的起点,也可以用作文档。

By default, the label namespace using the mongo: prefix is evaluated by MongoTransactionOptionsResolver that is configured by default. Transaction labels are provided by TransactionAttribute and available to programmatic transaction control through TransactionTemplate and TransactionalOperator. Due to their declarative nature, @Transactional(label = …) provides a good starting point that also can serve as documentation.

目前,支持以下选项:

Currently, the following options are supported:

Max Commit Time

Controls the maximum execution time on the server for the commitTransaction operation. The format of the value corresponds with ISO-8601 duration format as used with Duration.parse(…).

用法:mongo:maxCommitTime=PT1S

Usage: mongo:maxCommitTime=PT1S

Read Concern

Sets the read concern for the transaction.

用法:mongo:readConcern=LOCAL|MAJORITY|LINEARIZABLE|SNAPSHOT|AVAILABLE

Usage: mongo:readConcern=LOCAL|MAJORITY|LINEARIZABLE|SNAPSHOT|AVAILABLE

Read Preference

Sets the read preference for the transaction.

用法:mongo:readPreference=PRIMARY|SECONDARY|SECONDARY_PREFERRED|PRIMARY_PREFERRED|NEAREST

Usage: mongo:readPreference=PRIMARY|SECONDARY|SECONDARY_PREFERRED|PRIMARY_PREFERRED|NEAREST

Write Concern

Sets the write concern for the transaction.

用法:mongo:writeConcern=ACKNOWLEDGED|W1|W2|W3|UNACKNOWLEDGED|JOURNALED|MAJORITY

Usage: mongo:writeConcern=ACKNOWLEDGED|W1|W2|W3|UNACKNOWLEDGED|JOURNALED|MAJORITY

加入外部事务的嵌套事务不会影响初始事务选项,因为事务已启动。事务选项仅在启动新事务时才应用。

Nested transactions that join the outer transaction do not affect the initial transaction options as the transaction is already started. Transaction options are only applied when a new transaction is started.

Special behavior inside transactions

在事务中,MongoDB 服务器具有略有不同的行为。

Inside transactions, MongoDB server has a slightly different behavior.

连接设置

Connection Settings

MongoDB 驱动程序提供了一个专用的副本集名称配置选项,可以将驱动程序转为自动检测模式。该选项有助于在事务期间识别主副本集节点和命令路由。

The MongoDB drivers offer a dedicated replica set name configuration option turing the driver into auto-detection mode. This option helps identify the primary replica set nodes and command routing during a transaction.

确保向 MongoDB URI 添加`replicaSet`。有关详细信息,请参阅 connection string options

Make sure to add replicaSet to the MongoDB URI. Please refer to connection string options for further details.

集合操作

Collection Operations

MongoDB 不支持事务中的集合操作,例如集合创建。这也影响了首次使用时发生的即时集合创建。因此,请确保所有必需的结构都已到位。

MongoDB does not support collection operations, such as collection creation, within a transaction. This also affects the on the fly collection creation that happens on first usage. Therefore make sure to have all required structures in place.

瞬态错误

Transient Errors

MongoDB 可以为事务操作期间引发的错误添加特殊标签。这些标签可能表明瞬态故障,只需重试操作即可消除。我们强烈建议将 Spring Retry用于这些目的。不过,可以覆盖 `MongoTransactionManager#doCommit(MongoTransactionObject)`以实现 Retry Commit Operation行为,如 MongoDB 参考手册中所述。

MongoDB can add special labels to errors raised during transactional operations. Those may indicate transient failures that might vanish by merely retrying the operation. We highly recommend Spring Retry for those purposes. Nevertheless one may override MongoTransactionManager#doCommit(MongoTransactionObject) to implement a Retry Commit Operation behavior as outlined in the MongoDB reference manual.

计算

Count

MongoDB count 在集合统计数据上进行操作,这些统计数据可能无法反映事务中的实际情况。当在多文档事务中发出 count 命令时,服务器会以 error 50851 响应。一旦 MongoTemplate 检测到活动事务,所有公开的 count() 方法都将被转换并委托给聚合框架,使用 $match$count 运算符,保留 Query 设置,例如 collation

MongoDB count operates upon collection statistics which may not reflect the actual situation within a transaction. The server responds with error 50851 when issuing a count command inside of a multi-document transaction. Once MongoTemplate detects an active transaction, all exposed count() methods are converted and delegated to the aggregation framework using $match and $count operators, preserving Query settings, such as collation.

在聚合 count 助手内部使用地理命令时,会应用一些限制。以下运算符不可用,必须替换为其他运算符:

Restrictions apply when using geo commands inside of the aggregation count helper. The following operators cannot be used and must be replaced with a different operator:

  • $where$expr

  • $near$geoWithin with $center

  • $nearSphere$geoWithin with $centerSphere

使用 Criteria.near(…)`和 `Criteria.nearSphere(…)`的查询必须分别重写为 `Criteria.within(…)`和 `Criteria.withinSphere(…)。对于信息库查询方法中的 near`查询关键字,也适用相同的规则,必须更改为 `within。另请参阅 MongoDB JIRA 问题单 DRIVERS-518以供进一步参考。

Queries using Criteria.near(…) and Criteria.nearSphere(…) must be rewritten to Criteria.within(…) respective Criteria.withinSphere(…). Same applies for the near query keyword in repository query methods that must be changed to within. See also MongoDB JIRA ticket DRIVERS-518 for further reference.

以下片段显示了在会话绑定闭包中使用 count

The following snippet shows count usage inside the session-bound closure:

session.startTransaction();

template.withSession(session)
    .execute(action -> {
        action.count(query(where("state").is("active")), Step.class)
        ...

上面的片段具体化为以下命令:

The snippet above materializes in the following command:

db.collection.aggregate(
   [
      { $match: { state: "active" } },
      { $count: "totalEntityCount" }
   ]
)

而不是:

instead of:

db.collection.find( { state: "active" } ).count()