Tailable Cursors
默认情况下,当客户端用尽游标提供的所有结果时,MongoDB 会自动关闭该游标。在用尽时关闭游标会将流转变成有限流。对于 capped collections,可以在客户端消耗完最初返回的所有数据之后,使用仍保持开放的 Tailable Cursor。
可以通过 |
可追踪游标可以使用命令式和反应式 MongoDB API 消耗。强烈建议使用反应式变量,因为它资源占用更少。但是,如果您无法使用响应式 API,您仍然可以使用 Spring 生态系统中已普遍存在的传递概念。
Tailable Cursors with MessageListener
使用 Sync Driver 侦听固定集合会创建需要委派到单独组件的长期阻塞任务。在这种情况下,我们需要首先创建一个 MessageListenerContainer
,它将是运行特定 SubscriptionRequest
的主要入口点。Spring Data MongoDB 已随附在 MongoTemplate
上运行的默认实现,并且能够为 TailableCursorRequest
创建和运行 Task
实例。
以下示例演示如何将可追踪游标与 MessageListener
实例一起使用:
MessageListener
instancesMessageListenerContainer container = new DefaultMessageListenerContainer(template);
container.start(); 1
MessageListener<Document, User> listener = System.out::println; 2
TailableCursorRequest request = TailableCursorRequest.builder()
.collection("orders") 3
.filter(query(where("value").lt(100))) 4
.publishTo(listener) 5
.build();
container.register(request, User.class); 6
// ...
container.stop(); 7
1 | 启动容器时,将初始化资源,并启动已注册 SubscriptionRequest 实例的 Task 实例。在启动后添加请求会立即运行。 |
2 | 定义在接收到 Message 时调用的侦听器。将 Message#getBody() 转换为请求的域类型。使用 Document 接收未转换的原始结果。 |
3 | 设置要监听的集合。 |
4 | 提供要接收的文档的可选过滤器。 |
5 | 将消息侦听器设置为发布传入 Message 。 |
6 | 注册请求。返回的 Subscription 可用于检查当前 Task 状态并取消它以释放资源。 |
7 | 一旦你确定不再需要它,请不要忘记停止容器。这样做会停止容器中的所有正在运行的 Task 实例。 |
Reactive Tailable Cursors
将可追踪游标与响应式数据类型配合使用允许构建无限流。可追踪游标在外部关闭之前一直保持打开状态。当新文档到达固定集合时,它会发出数据。
如果查询未返回匹配项或游标返回集合“end
”处的文档,并且应用程序随后删除该文档,则可追踪游标可能变成无效或死文档。以下示例演示如何创建和使用无限流查询:
Flux<Person> stream = template.tail(query(where("name").is("Joe")), Person.class);
Disposable subscription = stream.doOnNext(person -> System.out.println(person)).subscribe();
// …
// Later: Dispose the subscription to close the stream
subscription.dispose();
Spring Data MongoDB Reactive 存储库通过使用 @Tailable
批注查询方法来支持无限流。这适用于返回 Flux
和其他能够发出多个元素的反应性类型的函数,如下例所示:
public interface PersonRepository extends ReactiveMongoRepository<Person, String> {
@Tailable
Flux<Person> findByFirstname(String firstname);
}
Flux<Person> stream = repository.findByFirstname("Joe");
Disposable subscription = stream.doOnNext(System.out::println).subscribe();
// …
// Later: Dispose the subscription to close the stream
subscription.dispose();