Tailable Cursors

默认情况下,当客户端用尽游标提供的所有结果时,MongoDB 会自动关闭该游标。在用尽时关闭游标会将流转变成有限流。对于 capped collections,可以在客户端消耗完最初返回的所有数据之后,使用仍保持开放的 Tailable Cursor

可以通过 MongoOperations.createCollection 创建封底集合。为此,提供必需的 CollectionOptions.empty().capped()…​

可追踪游标可以使用命令式和反应式 MongoDB API 消耗。强烈建议使用反应式变量,因为它资源占用更少。但是,如果您无法使用响应式 API,您仍然可以使用 Spring 生态系统中已普遍存在的传递概念。

Tailable Cursors with MessageListener

使用 Sync Driver 侦听固定集合会创建需要委派到单独组件的长期阻塞任务。在这种情况下,我们需要首先创建一个 MessageListenerContainer,它将是运行特定 SubscriptionRequest 的主要入口点。Spring Data MongoDB 已随附在 MongoTemplate 上运行的默认实现,并且能够为 TailableCursorRequest 创建和运行 Task 实例。

以下示例演示如何将可追踪游标与 MessageListener 实例一起使用:

Example 1. Tailable Cursors with MessageListener instances
MessageListenerContainer container = new DefaultMessageListenerContainer(template);
container.start();                                                                  1

MessageListener<Document, User> listener = System.out::println;                     2

TailableCursorRequest request = TailableCursorRequest.builder()
  .collection("orders")                                                             3
  .filter(query(where("value").lt(100)))                                            4
  .publishTo(listener)                                                              5
  .build();

container.register(request, User.class);                                            6

// ...

container.stop();                                                                   7
1 启动容器时,将初始化资源,并启动已注册 SubscriptionRequest 实例的 Task 实例。在启动后添加请求会立即运行。
2 定义在接收到 Message 时调用的侦听器。将 Message#getBody() 转换为请求的域类型。使用 Document 接收未转换的原始结果。
3 设置要监听的集合。
4 提供要接收的文档的可选过滤器。
5 将消息侦听器设置为发布传入 Message
6 注册请求。返回的 Subscription 可用于检查当前 Task 状态并取消它以释放资源。
7 一旦你确定不再需要它,请不要忘记停止容器。这样做会停止容器中的所有正在运行的 Task 实例。

Reactive Tailable Cursors

将可追踪游标与响应式数据类型配合使用允许构建无限流。可追踪游标在外部关闭之前一直保持打开状态。当新文档到达固定集合时,它会发出数据。

如果查询未返回匹配项或游标返回集合“end”处的文档,并且应用程序随后删除该文档,则可追踪游标可能变成无效或死文档。以下示例演示如何创建和使用无限流查询:

Example 2. Infinite Stream queries with ReactiveMongoOperations
Flux<Person> stream = template.tail(query(where("name").is("Joe")), Person.class);

Disposable subscription = stream.doOnNext(person -> System.out.println(person)).subscribe();

// …

// Later: Dispose the subscription to close the stream
subscription.dispose();

Spring Data MongoDB Reactive 存储库通过使用 @Tailable 批注查询方法来支持无限流。这适用于返回 Flux 和其他能够发出多个元素的反应性类型的函数,如下例所示:

Example 3. Infinite Stream queries with ReactiveMongoRepository
public interface PersonRepository extends ReactiveMongoRepository<Person, String> {

  @Tailable
  Flux<Person> findByFirstname(String firstname);

}

Flux<Person> stream = repository.findByFirstname("Joe");

Disposable subscription = stream.doOnNext(System.out::println).subscribe();

// …

// Later: Dispose the subscription to close the stream
subscription.dispose();