Change Streams
-
Change Streams 允许在不追踪 oplog 的情况下获取更改通知。
-
Change Streams 仅适用于副本集或分片集群。
-
使用命令式和响应式 MongoDB Java 驱动程序以及 Spring 生态系统中的消息传递概念都可以使用 Change Streams。
-
可以监听集合或数据库级别的更改,并使用恢复令牌或服务器时间来恢复流。
从 MongoDB 3.6 开始, Change Streams 允许应用程序在无需跟踪 oplog 的情况下获悉更改。
As of MongoDB 3.6, Change Streams let applications get notified about changes without having to tail the oplog.
变更流支持仅适用于副本集或分片集群。 |
Change Stream support is only possible for replica sets or for a sharded cluster. |
可以使用命令式和响应式 MongoDB Java 驱动程序来使用更改流。强烈建议使用响应式变体,因为它消耗的资源较少。但是,如果您无法使用响应式 API,则仍然可以通过使用 Spring 生态系统中已经普遍存在的消息传递概念来获取更改事件。
Change Streams can be consumed with both, the imperative and the reactive MongoDB Java driver. It is highly recommended to use the reactive variant, as it is less resource-intensive. However, if you cannot use the reactive API, you can still obtain change events by using the messaging concept that is already prevalent in the Spring ecosystem.
可以在集合和数据库级别上进行监听,而数据库级别变体会发布数据库内所有集合中的更改。在订阅数据库更改流时,请务必对事件类型使用合适的类型,因为在不同实体类型之间可能无法正确地应用转换。如果您有疑问,请使用 Document
。
It is possible to watch both on a collection as well as database level, whereas the database level variant publishes
changes from all collections within the database. When subscribing to a database change stream, make sure to use a
suitable type for the event type as conversion might not apply correctly across different entity types.
In doubt, use Document
.
Change Streams with MessageListener
监听 Change Stream by using a Sync Driver 会创建一个需要委派给单独组件的长时间运行阻塞任务。在这种情况下,我们需要首先创建一个 MessageListenerContainer
,它将成为运行特定 SubscriptionRequest
任务的主要入口点。Spring Data MongoDB 已经附带一个默认的 MongoTemplate
实现,该实现可以在 MongoTemplate
上操作,并且能够为 ChangeStreamRequest
创建和运行 Task
实例。
Listening to a Change Stream by using a Sync Driver creates a long running, blocking task that needs to be delegated to a separate component.
In this case, we need to first create a MessageListenerContainer
, which will be the main entry point for running the specific SubscriptionRequest
tasks.
Spring Data MongoDB already ships with a default implementation that operates on MongoTemplate
and is capable of creating and running Task
instances for a ChangeStreamRequest
.
以下示例显示了如何将更改流与 MessageListener
实例结合使用:
The following example shows how to use Change Streams with MessageListener
instances:
MessageListener
instancesMessageListenerContainer container = new DefaultMessageListenerContainer(template);
container.start(); 1
MessageListener<ChangeStreamDocument<Document>, User> listener = System.out::println; 2
ChangeStreamRequestOptions options = new ChangeStreamRequestOptions("db", "user", ChangeStreamOptions.empty()); 3
Subscription subscription = container.register(new ChangeStreamRequest<>(listener, options), User.class); 4
// ...
container.stop(); 5
1 | Starting the container initializes the resources and starts Task instances for already registered SubscriptionRequest instances. Requests added after startup are ran immediately. |
2 | Define the listener called when a Message is received. The Message#getBody() is converted to the requested domain type. Use Document to receive raw results without conversion. |
3 | Set the collection to listen to and provide additional options through ChangeStreamOptions . |
4 | Register the request. The returned Subscription can be used to check the current Task state and cancel it to free resources. |
5 | Do not forget to stop the container once you are sure you no longer need it. Doing so stops all running Task instances within the container. |
在处理过程中产生的错误会传递给 Errors while processing are passed on to an |
Reactive Change Streams
使用响应式 API 订阅变更流是处理流的更为自然的方法。不过,基本构建块(如 ChangeStreamOptions
)保持不变。以下示例展示了如何使用发出 ChangeStreamEvent
的变更流:
Subscribing to Change Streams with the reactive API is a more natural approach to work with streams. Still, the essential building blocks, such as ChangeStreamOptions
, remain the same. The following example shows how to use Change Streams emitting `ChangeStreamEvent`s:
ChangeStreamEvent
Flux<ChangeStreamEvent<User>> flux = reactiveTemplate.changeStream(User.class) 1
.watchCollection("people")
.filter(where("age").gte(38)) 2
.listen(); 3
1 | The event target type the underlying document should be converted to. Leave this out to receive raw results without conversion. |
2 | Use an aggregation pipeline or just a query Criteria to filter events. |
3 | Obtain a Flux of change stream events. The ChangeStreamEvent#getBody() is converted to the requested domain type from (2). |
Resuming Change Streams
变更流可以恢复,并从你离开的地方继续发出事件。要恢复流,你需要提供恢复令牌或上次已知的服务器时间(以 UTC 为单位)。使用 ChangeStreamOptions
适当地设置值。
Change Streams can be resumed and resume emitting events where you left. To resume the stream, you need to supply either a resume
token or the last known server time (in UTC). Use ChangeStreamOptions
to set the value accordingly.
以下示例展示了如何使用服务器时间设置恢复偏移量:
The following example shows how to set the resume offset using server time:
Flux<ChangeStreamEvent<User>> resumed = template.changeStream(User.class)
.watchCollection("people")
.resumeAt(Instant.now().minusSeconds(1)) 1
.listen();
1 | You may obtain the server time of an ChangeStreamEvent through the getTimestamp method or use the resumeToken
exposed through getResumeToken . |
在某些情况下, |
In some cases an |