Change Streams

  • Change Streams 允许在不追踪 oplog 的情况下获取更改通知。

  • Change Streams 仅适用于副本集或分片集群。

  • 使用命令式和响应式 MongoDB Java 驱动程序以及 Spring 生态系统中的消息传递概念都可以使用 Change Streams。

  • 可以监听集合或数据库级别的更改,并使用恢复令牌或服务器时间来恢复流。

从 MongoDB 3.6 开始, Change Streams 允许应用程序在无需跟踪 oplog 的情况下获悉更改。

变更流支持仅适用于副本集或分片集群。

可以使用命令式和响应式 MongoDB Java 驱动程序来使用更改流。强烈建议使用响应式变体,因为它消耗的资源较少。但是,如果您无法使用响应式 API,则仍然可以通过使用 Spring 生态系统中已经普遍存在的消息传递概念来获取更改事件。 可以在集合和数据库级别上进行监听,而数据库级别变体会发布数据库内所有集合中的更改。在订阅数据库更改流时,请务必对事件类型使用合适的类型,因为在不同实体类型之间可能无法正确地应用转换。如果您有疑问,请使用 Document

Change Streams with MessageListener

监听 Change Stream by using a Sync Driver 会创建一个需要委派给单独组件的长时间运行阻塞任务。在这种情况下,我们需要首先创建一个 MessageListenerContainer,它将成为运行特定 SubscriptionRequest 任务的主要入口点。Spring Data MongoDB 已经附带一个默认的 MongoTemplate 实现,该实现可以在 MongoTemplate 上操作,并且能够为 ChangeStreamRequest 创建和运行 Task 实例。

以下示例显示了如何将更改流与 MessageListener 实例结合使用:

Example 1. Change Streams with MessageListener instances
MessageListenerContainer container = new DefaultMessageListenerContainer(template);
container.start();                                                                                              1

MessageListener<ChangeStreamDocument<Document>, User> listener = System.out::println;                           2
ChangeStreamRequestOptions options = new ChangeStreamRequestOptions("db", "user", ChangeStreamOptions.empty()); 3

Subscription subscription = container.register(new ChangeStreamRequest<>(listener, options), User.class);       4

// ...

container.stop();                                                                                               5
1 启动容器将初始化资源,并为已注册的 SubscriptionRequest 实例启动 Task 实例。启动后添加的请求会立即运行。
2 定义在接收到 Message 时调用的侦听器。将 Message#getBody() 转换为请求的域类型。使用 Document 接收未转换的原始结果。
3 设置要侦听的集合,并通过 ChangeStreamOptions 提供其他选项。
4 注册请求。返回的 Subscription 可用于检查当前 Task 状态并取消它以释放资源。
5 一旦你确定不再需要它,请不要忘记停止容器。这样做会停止容器中的所有正在运行的 Task 实例。

在处理过程中产生的错误会传递给 org.springframework.util.ErrorHandler。如果没有另行声明,默认情况下会应用 ErrorHandler 来附加日志。请使用 register(request, body, errorHandler) 来提供其他功能。

Reactive Change Streams

使用响应式 API 订阅变更流是处理流的更为自然的方法。不过,基本构建块(如 ChangeStreamOptions)保持不变。以下示例展示了如何使用发出 ChangeStreamEvent 的变更流:

Example 2. Change Streams emitting ChangeStreamEvent
Flux<ChangeStreamEvent<User>> flux = reactiveTemplate.changeStream(User.class) 1
    .watchCollection("people")
    .filter(where("age").gte(38))                                              2
    .listen();                                                                 3
1 底层文档应转换为的目标事件类型。将其留空表示接收未转换的原始结果。
2 使用聚合管道或只使用 Criteria 查询来筛选事件。
3 获取一个 Flux 更改流事件。ChangeStreamEvent#getBody() 转换为 (2) 中请求的域类型。

Resuming Change Streams

变更流可以恢复,并从你离开的地方继续发出事件。要恢复流,你需要提供恢复令牌或上次已知的服务器时间(以 UTC 为单位)。使用 ChangeStreamOptions 适当地设置值。

以下示例展示了如何使用服务器时间设置恢复偏移量:

Example 3. Resume a Change Stream
Flux<ChangeStreamEvent<User>> resumed = template.changeStream(User.class)
    .watchCollection("people")
    .resumeAt(Instant.now().minusSeconds(1)) 1
    .listen();
1 你可以通过 getTimestamp 方法获取 ChangeStreamEvent 的服务器时间,也可以使用 getResumeToken 中公开的 resumeToken

在某些情况下,Instant 在恢复更改流时可能不是足够精细的度量。为此目的,使用 MongoDB 原生 BsonTimestamp