Change Streams
-
Change Streams 允许在不追踪 oplog 的情况下获取更改通知。
-
Change Streams 仅适用于副本集或分片集群。
-
使用命令式和响应式 MongoDB Java 驱动程序以及 Spring 生态系统中的消息传递概念都可以使用 Change Streams。
-
可以监听集合或数据库级别的更改,并使用恢复令牌或服务器时间来恢复流。
从 MongoDB 3.6 开始, Change Streams 允许应用程序在无需跟踪 oplog 的情况下获悉更改。
变更流支持仅适用于副本集或分片集群。 |
可以使用命令式和响应式 MongoDB Java 驱动程序来使用更改流。强烈建议使用响应式变体,因为它消耗的资源较少。但是,如果您无法使用响应式 API,则仍然可以通过使用 Spring 生态系统中已经普遍存在的消息传递概念来获取更改事件。
可以在集合和数据库级别上进行监听,而数据库级别变体会发布数据库内所有集合中的更改。在订阅数据库更改流时,请务必对事件类型使用合适的类型,因为在不同实体类型之间可能无法正确地应用转换。如果您有疑问,请使用 Document
。
Change Streams with MessageListener
监听 Change Stream by using a Sync Driver 会创建一个需要委派给单独组件的长时间运行阻塞任务。在这种情况下,我们需要首先创建一个 MessageListenerContainer
,它将成为运行特定 SubscriptionRequest
任务的主要入口点。Spring Data MongoDB 已经附带一个默认的 MongoTemplate
实现,该实现可以在 MongoTemplate
上操作,并且能够为 ChangeStreamRequest
创建和运行 Task
实例。
以下示例显示了如何将更改流与 MessageListener
实例结合使用:
MessageListener
instancesMessageListenerContainer container = new DefaultMessageListenerContainer(template);
container.start(); 1
MessageListener<ChangeStreamDocument<Document>, User> listener = System.out::println; 2
ChangeStreamRequestOptions options = new ChangeStreamRequestOptions("db", "user", ChangeStreamOptions.empty()); 3
Subscription subscription = container.register(new ChangeStreamRequest<>(listener, options), User.class); 4
// ...
container.stop(); 5
1 | 启动容器将初始化资源,并为已注册的 SubscriptionRequest 实例启动 Task 实例。启动后添加的请求会立即运行。 |
2 | 定义在接收到 Message 时调用的侦听器。将 Message#getBody() 转换为请求的域类型。使用 Document 接收未转换的原始结果。 |
3 | 设置要侦听的集合,并通过 ChangeStreamOptions 提供其他选项。 |
4 | 注册请求。返回的 Subscription 可用于检查当前 Task 状态并取消它以释放资源。 |
5 | 一旦你确定不再需要它,请不要忘记停止容器。这样做会停止容器中的所有正在运行的 Task 实例。 |
在处理过程中产生的错误会传递给 |
Reactive Change Streams
使用响应式 API 订阅变更流是处理流的更为自然的方法。不过,基本构建块(如 ChangeStreamOptions
)保持不变。以下示例展示了如何使用发出 ChangeStreamEvent
的变更流:
ChangeStreamEvent
Flux<ChangeStreamEvent<User>> flux = reactiveTemplate.changeStream(User.class) 1
.watchCollection("people")
.filter(where("age").gte(38)) 2
.listen(); 3
1 | 底层文档应转换为的目标事件类型。将其留空表示接收未转换的原始结果。 |
2 | 使用聚合管道或只使用 Criteria 查询来筛选事件。 |
3 | 获取一个 Flux 更改流事件。ChangeStreamEvent#getBody() 转换为 (2) 中请求的域类型。 |
Resuming Change Streams
变更流可以恢复,并从你离开的地方继续发出事件。要恢复流,你需要提供恢复令牌或上次已知的服务器时间(以 UTC 为单位)。使用 ChangeStreamOptions
适当地设置值。
以下示例展示了如何使用服务器时间设置恢复偏移量:
Flux<ChangeStreamEvent<User>> resumed = template.changeStream(User.class)
.watchCollection("people")
.resumeAt(Instant.now().minusSeconds(1)) 1
.listen();
1 | 你可以通过 getTimestamp 方法获取 ChangeStreamEvent 的服务器时间,也可以使用 getResumeToken 中公开的 resumeToken 。 |
在某些情况下, |