Tailable Cursors

默认情况下,当客户端用尽游标提供的所有结果时,MongoDB 会自动关闭该游标。在用尽时关闭游标会将流转变成有限流。对于 capped collections,可以在客户端消耗完最初返回的所有数据之后,使用仍保持开放的 Tailable Cursor

By default, MongoDB automatically closes a cursor when the client exhausts all results supplied by the cursor. Closing a cursor on exhaustion turns a stream into a finite stream. For capped collections, you can use a Tailable Cursor that remains open after the client consumed all initially returned data.

可以通过 MongoOperations.createCollection 创建封底集合。为此,提供必需的 CollectionOptions.empty().capped()…​

Capped collections can be created with MongoOperations.createCollection. To do so, provide the required CollectionOptions.empty().capped()…​.

可追踪游标可以使用命令式和反应式 MongoDB API 消耗。强烈建议使用反应式变量,因为它资源占用更少。但是,如果您无法使用响应式 API,您仍然可以使用 Spring 生态系统中已普遍存在的传递概念。

Tailable cursors can be consumed with both, the imperative and the reactive MongoDB API. It is highly recommended to use the reactive variant, as it is less resource-intensive. However, if you cannot use the reactive API, you can still use a messaging concept that is already prevalent in the Spring ecosystem.

Tailable Cursors with MessageListener

使用 Sync Driver 侦听固定集合会创建需要委派到单独组件的长期阻塞任务。在这种情况下,我们需要首先创建一个 MessageListenerContainer,它将是运行特定 SubscriptionRequest 的主要入口点。Spring Data MongoDB 已随附在 MongoTemplate 上运行的默认实现,并且能够为 TailableCursorRequest 创建和运行 Task 实例。

Listening to a capped collection using a Sync Driver creates a long running, blocking task that needs to be delegated to a separate component. In this case, we need to first create a MessageListenerContainer, which will be the main entry point for running the specific SubscriptionRequest. Spring Data MongoDB already ships with a default implementation that operates on MongoTemplate and is capable of creating and running Task instances for a TailableCursorRequest.

以下示例演示如何将可追踪游标与 MessageListener 实例一起使用:

The following example shows how to use tailable cursors with MessageListener instances:

Example 1. Tailable Cursors with MessageListener instances
MessageListenerContainer container = new DefaultMessageListenerContainer(template);
container.start();                                                                  1

MessageListener<Document, User> listener = System.out::println;                     2

TailableCursorRequest request = TailableCursorRequest.builder()
  .collection("orders")                                                             3
  .filter(query(where("value").lt(100)))                                            4
  .publishTo(listener)                                                              5
  .build();

container.register(request, User.class);                                            6

// ...

container.stop();                                                                   7
1 Starting the container intializes the resources and starts Task instances for already registered SubscriptionRequest instances. Requests added after startup are ran immediately.
2 Define the listener called when a Message is received. The Message#getBody() is converted to the requested domain type. Use Document to receive raw results without conversion.
3 Set the collection to listen to.
4 Provide an optional filter for documents to receive.
5 Set the message listener to publish incoming `Message`s to.
6 Register the request. The returned Subscription can be used to check the current Task state and cancel it to free resources.
7 Do not forget to stop the container once you are sure you no longer need it. Doing so stops all running Task instances within the container.

Reactive Tailable Cursors

将可追踪游标与响应式数据类型配合使用允许构建无限流。可追踪游标在外部关闭之前一直保持打开状态。当新文档到达固定集合时,它会发出数据。

Using tailable cursors with a reactive data types allows construction of infinite streams. A tailable cursor remains open until it is closed externally. It emits data as new documents arrive in a capped collection.

如果查询未返回匹配项或游标返回集合“end”处的文档,并且应用程序随后删除该文档,则可追踪游标可能变成无效或死文档。以下示例演示如何创建和使用无限流查询:

Tailable cursors may become dead, or invalid, if either the query returns no match or the cursor returns the document at the “end” of the collection and the application then deletes that document. The following example shows how to create and use an infinite stream query:

Example 2. Infinite Stream queries with ReactiveMongoOperations
Flux<Person> stream = template.tail(query(where("name").is("Joe")), Person.class);

Disposable subscription = stream.doOnNext(person -> System.out.println(person)).subscribe();

// …

// Later: Dispose the subscription to close the stream
subscription.dispose();

Spring Data MongoDB Reactive 存储库通过使用 @Tailable 批注查询方法来支持无限流。这适用于返回 Flux 和其他能够发出多个元素的反应性类型的函数,如下例所示:

Spring Data MongoDB Reactive repositories support infinite streams by annotating a query method with @Tailable. This works for methods that return Flux and other reactive types capable of emitting multiple elements, as the following example shows:

Example 3. Infinite Stream queries with ReactiveMongoRepository
public interface PersonRepository extends ReactiveMongoRepository<Person, String> {

  @Tailable
  Flux<Person> findByFirstname(String firstname);

}

Flux<Person> stream = repository.findByFirstname("Joe");

Disposable subscription = stream.doOnNext(System.out::println).subscribe();

// …

// Later: Dispose the subscription to close the stream
subscription.dispose();