Sessions & Transactions
从 3.6 版开始,MongoDB 支持会话的概念。会话的使用启用了 MongoDB 的 Causal Consistency 模型,该模型确保按其因果关系运行操作。它们被分为 ServerSession
实例和 ClientSession
实例。在本部分中,当我们讨论会话时,我们指的是 ClientSession
。
客户端会话中的操作与会话外部的操作无关。
MongoOperations
和 ReactiveMongoOperations
都提供了将 ClientSession
与操作关联的网关方法。MongoCollection
和 MongoDatabase
使用实现 MongoDB 的集合和数据库界面的会话代理对象,因此无需在每次调用时添加会话。这意味着对 MongoCollection#find()
的潜在调用将委托给 MongoCollection#find(ClientSession)
。
|
ClientSession support
以下示例展示了会话的用法:
-
Imperative
ClientSessionOptions sessionOptions = ClientSessionOptions.builder()
.causallyConsistent(true)
.build();
ClientSession session = client.startSession(sessionOptions); 1
template.withSession(() -> session)
.execute(action -> {
Query query = query(where("name").is("Durzo Blint"));
Person durzo = action.findOne(query, Person.class); 2
Person azoth = new Person("Kylar Stern");
azoth.setMaster(durzo);
action.insert(azoth); 3
return azoth;
});
session.close() 4
1 | 从服务器获取一个新的会话。 |
2 | 与以前一样使用 MongoOperation 方法。ClientSession 会自动应用。 |
3 | 确保关闭 ClientSession 。 |
4 | Close the session. |
在处理 DBRef
实例时,尤其是延迟加载的实例,在加载所有数据之前关闭 ClientSession
至关重要。否则,延迟获取失败。
- Reactive
ClientSessionOptions sessionOptions = ClientSessionOptions.builder()
.causallyConsistent(true)
.build();
Publisher<ClientSession> session = client.startSession(sessionOptions); 1
template.withSession(session)
.execute(action -> {
Query query = query(where("name").is("Durzo Blint"));
return action.findOne(query, Person.class)
.flatMap(durzo -> {
Person azoth = new Person("Kylar Stern");
azoth.setMaster(durzo);
return action.insert(azoth); 2
});
}, ClientSession::close) 3
.subscribe(); 4
1 | 获取一个用于检索新会话的 Publisher 。 |
2 | 与以前一样使用 ReactiveMongoOperation 方法。ClientSession 会自动获取并应用。 |
3 | 确保关闭 ClientSession 。 |
4 | 在您订阅之前不会发生任何事情。有关详细信息,请参见 the Project Reactor Reference Guide。 |
通过使用提供实际会话的 Publisher
,您可以将会话获取推迟到实际订阅之时。但是,完成时仍需关闭会话,以免在服务器中污染陈旧的会话。对 execute
使用 doFinally
钩子,在不再需要会话时调用 ClientSession#close()
。如果您希望对会话本身有更多控制权,可以通过驱动程序获取 ClientSession
,并通过 Supplier
提供它。
|
MongoDB Transactions
从版本 4 开始,MongoDB 支持 Transactions。事务建立在 Sessions 之上,因此需要一个活动的 ClientSession
。
除非在应用程序上下文中指定了 |
要对事务获得充分的编程控制权,您可能希望对 MongoOperations
使用会话回调。
以下示例展示了项目的事务控制:
-
Imperative
ClientSession session = client.startSession(options); 1
template.withSession(session)
.execute(action -> {
session.startTransaction(); 2
try {
Step step = // ...;
action.insert(step);
process(step);
action.update(Step.class).apply(Update.set("state", // ...
session.commitTransaction(); 3
} catch (RuntimeException e) {
session.abortTransaction(); 4
}
}, ClientSession::close) 5
1 | Obtain a new ClientSession . |
2 | Start the transaction. |
3 | 如果一切按预期进行,则提交更改。 |
4 | 出现问题,因此回滚所有操作。 |
5 | 完成后不要忘记关闭会话。 |
前面的示例让您能够完全控制事务行为,同时在回调中使用会话作用域的 MongoOperations
实例,以确保将会话传递给每次服务器调用。要避免这种方法带来的某些开销,您可以使用 TransactionTemplate
来消除手动事务流的一些噪声。
- Reactive
Mono<DeleteResult> result = Mono
.from(client.startSession()) 1
.flatMap(session -> {
session.startTransaction(); 2
return Mono.from(collection.deleteMany(session, ...)) 3
.onErrorResume(e -> Mono.from(session.abortTransaction()).then(Mono.error(e))) 4
.flatMap(val -> Mono.from(session.commitTransaction()).then(Mono.just(val))) 5
.doFinally(signal -> session.close()); 6
});
1 | 首先我们显然需要初始化会话。 |
2 | 一旦我们掌握了 ClientSession ,就启动事务。 |
3 | 通过将 `ClientSession`传递给操作来在事务中操作。 |
4 | 如果操作异常完成,我们需要停止事务并保留错误。 |
5 | 或者当然,在成功的情况下提交更改。仍然保留操作结果。 |
6 | 最后,我们需要确保关闭会话。 |
上述操作的罪魁祸首是保留主要流的 DeleteResult
,而不是保留通过 commitTransaction()
或 abortTransaction()
发布的事务结果,这会导致相当复杂的设置。
除非在应用程序上下文中指定了 |
Transactions with TransactionTemplate / TransactionalOperator
Spring Data MongoDB 事务同时支持 TransactionTemplate
和 TransactionalOperator
。
TransactionTemplate
/ TransactionalOperator
-
Imperative
template.setSessionSynchronization(ALWAYS); 1
// ...
TransactionTemplate txTemplate = new TransactionTemplate(anyTxManager); 2
txTemplate.execute(new TransactionCallbackWithoutResult() {
@Override
protected void doInTransactionWithoutResult(TransactionStatus status) { 3
Step step = // ...;
template.insert(step);
process(step);
template.update(Step.class).apply(Update.set("state", // ...
}
});
1 | 在模板 API 配置期间启用事务同步。 |
2 | 使用提供的 PlatformTransactionManager 创建 TransactionTemplate 。 |
3 | 在回调中,ClientSession 和事务已注册。 |
在运行时更改 MongoTemplate
的状态(就像您可能认为前述清单第 1 项中那样是可能的)会导致线程和可见性问题。
- Reactive
template.setSessionSynchronization(ALWAYS); 1
// ...
TransactionalOperator rxtx = TransactionalOperator.create(anyTxManager,
new DefaultTransactionDefinition()); 2
Step step = // ...;
template.insert(step);
Mono<Void> process(step)
.then(template.update(Step.class).apply(Update.set("state", …))
.as(rxtx::transactional) 3
.then();
1 | 为事务参与启用事务同步。 |
2 | 使用提供的 ReactiveTransactionManager 创建 TransactionalOperator 。 |
3 | TransactionalOperator.transactional(…) 为所有上游操作提供事务管理。 |
Transactions with MongoTransactionManager & ReactiveMongoTransactionManager
MongoTransactionManager
/ ReactiveMongoTransactionManager
是通往著名的 Spring 事务支持的网关。它让应用程序能够使用 the managed transaction features of Spring。MongoTransactionManager
将 ClientSession
绑定到线程,而 ReactiveMongoTransactionManager
为此使用 ReactorContext
。MongoTemplate
检测会话并相应地对与事务相关的这些资源进行操作。MongoTemplate
还可以参与其他正在进行的事务。以下示例演示如何使用 MongoTransactionManager
创建和使用事务:
MongoTransactionManager
/ ReactiveMongoTransactionManager
-
Imperative
@Configuration
static class Config extends AbstractMongoClientConfiguration {
@Bean
MongoTransactionManager transactionManager(MongoDatabaseFactory dbFactory) { 1
return new MongoTransactionManager(dbFactory);
}
// ...
}
@Component
public class StateService {
@Transactional
void someBusinessFunction(Step step) { 2
template.insert(step);
process(step);
template.update(Step.class).apply(Update.set("state", // ...
};
});
1 | 在应用程序上下文中注册 MongoTransactionManager 。 |
2 | Mark methods as transactional. |
|
- Reactive
@Configuration
public class Config extends AbstractReactiveMongoConfiguration {
@Bean
ReactiveMongoTransactionManager transactionManager(ReactiveMongoDatabaseFactory factory) { 1
return new ReactiveMongoTransactionManager(factory);
}
// ...
}
@Service
public class StateService {
@Transactional
Mono<UpdateResult> someBusinessFunction(Step step) { 2
return template.insert(step)
.then(process(step))
.then(template.update(Step.class).apply(Update.set("state", …));
};
});
1 | 在应用程序上下文中注册 ReactiveMongoTransactionManager 。 |
2 | Mark methods as transactional. |
|
Controlling MongoDB-specific Transaction Options
事务服务方法可能需要特定的事务选项来运行事务。Spring Data MongoDB 的事务管理器支持评估事务标签,例如 @Transactional(label = { "mongo:readConcern=available" })
。
默认情况下,使用 mongo:
前缀的标签名称空间由默认配置的 MongoTransactionOptionsResolver
进行评估。事务标签由 TransactionAttribute
提供,开发者可以通过 TransactionTemplate
和 TransactionalOperator
对其进行事务控制。由于其声明式特性,@Transactional(label = …)
提供了一个良好的起点,也可以用作文档。
目前,支持以下选项:
- Max Commit Time
-
Controls the maximum execution time on the server for the commitTransaction operation. The format of the value corresponds with ISO-8601 duration format as used with
Duration.parse(…)
.用法:
mongo:maxCommitTime=PT1S
- Read Concern
-
Sets the read concern for the transaction.
用法:
mongo:readConcern=LOCAL|MAJORITY|LINEARIZABLE|SNAPSHOT|AVAILABLE
- Read Preference
-
Sets the read preference for the transaction.
用法:
mongo:readPreference=PRIMARY|SECONDARY|SECONDARY_PREFERRED|PRIMARY_PREFERRED|NEAREST
- Write Concern
-
Sets the write concern for the transaction.
用法:
mongo:writeConcern=ACKNOWLEDGED|W1|W2|W3|UNACKNOWLEDGED|JOURNALED|MAJORITY
加入外部事务的嵌套事务不会影响初始事务选项,因为事务已启动。事务选项仅在启动新事务时才应用。 |
Special behavior inside transactions
在事务中,MongoDB 服务器具有略有不同的行为。
连接设置
MongoDB 驱动程序提供了一个专用的副本集名称配置选项,可以将驱动程序转为自动检测模式。该选项有助于在事务期间识别主副本集节点和命令路由。
确保向 MongoDB URI 添加`replicaSet`。有关详细信息,请参阅 connection string options。 |
集合操作
MongoDB 不支持事务中的集合操作,例如集合创建。这也影响了首次使用时发生的即时集合创建。因此,请确保所有必需的结构都已到位。
瞬态错误
MongoDB 可以为事务操作期间引发的错误添加特殊标签。这些标签可能表明瞬态故障,只需重试操作即可消除。我们强烈建议将 Spring Retry用于这些目的。不过,可以覆盖 `MongoTransactionManager#doCommit(MongoTransactionObject)`以实现 Retry Commit Operation行为,如 MongoDB 参考手册中所述。
计算
MongoDB count
在集合统计数据上进行操作,这些统计数据可能无法反映事务中的实际情况。当在多文档事务中发出 count
命令时,服务器会以 error 50851 响应。一旦 MongoTemplate
检测到活动事务,所有公开的 count()
方法都将被转换并委托给聚合框架,使用 $match
和 $count
运算符,保留 Query
设置,例如 collation
。
在聚合 count 助手内部使用地理命令时,会应用一些限制。以下运算符不可用,必须替换为其他运算符:
-
$where
→$expr
-
$near
→$geoWithin
与$center
-
$nearSphere
→$geoWithin
与$centerSphere
使用 Criteria.near(…)`和 `Criteria.nearSphere(…)`的查询必须分别重写为 `Criteria.within(…)`和 `Criteria.withinSphere(…)
。对于信息库查询方法中的 near`查询关键字,也适用相同的规则,必须更改为 `within
。另请参阅 MongoDB JIRA 问题单 DRIVERS-518以供进一步参考。
以下片段显示了在会话绑定闭包中使用 count
:
session.startTransaction();
template.withSession(session)
.execute(action -> {
action.count(query(where("state").is("active")), Step.class)
...
上面的片段具体化为以下命令:
db.collection.aggregate(
[
{ $match: { state: "active" } },
{ $count: "totalEntityCount" }
]
)
而不是:
db.collection.find( { state: "active" } ).count()