Asynchronous Requests

  • DeferredResult 或 Callable,用于单个异步返回值。

  • ResponseBodyEmitter、SseEmitter 或 StreamingResponseBody,用于流式传输多个返回值。

  • 响应式类型,例如 Mono、Flux 或 Single,用于支持与响应式客户端库集成。

Spring MVC 的异步请求处理与 Spring WebFlux 不同,它依赖于 Servlet API 的异步功能,而 Spring WebFlux 是异步设计的。Spring MVC 不支持控制器方法参数中的异步类型,而 Spring WebFlux 则支持。

Spring MVC 与 Servlet 的异步请求 processing 进行广泛集成:

关于这与 Spring WebFlux 有何不同,请参阅下面的 Async Spring MVC compared to WebFlux 部分。

DeferredResult

一旦在 Servlet 容器中 enabled了异步请求处理功能,控制器方法就可以使用 DeferredResult 包装任何受支持的控制器方法返回值,如下例所示:

  • Java

  • Kotlin

@GetMapping("/quotes")
@ResponseBody
public DeferredResult<String> quotes() {
	DeferredResult<String> deferredResult = new DeferredResult<>();
	// Save the deferredResult somewhere..
	return deferredResult;
}

// From some other thread...
deferredResult.setResult(result);
@GetMapping("/quotes")
@ResponseBody
fun quotes(): DeferredResult<String> {
	val deferredResult = DeferredResult<String>()
	// Save the deferredResult somewhere..
	return deferredResult
}

// From some other thread...
deferredResult.setResult(result)

控制器可以从不同的线程异步生成返回值 - 例如,响应外部事件(JMS 消息)、预定任务或其他事件。

Callable

控制器可以用 java.util.concurrent.Callable 包装任何受支持的返回值,如下示例所示:

  • Java

  • Kotlin

@PostMapping
public Callable<String> processUpload(final MultipartFile file) {
	return () -> "someView";
}
@PostMapping
fun processUpload(file: MultipartFile) = Callable<String> {
	// ...
	"someView"
}

然后可以通过运行 configured AsyncTaskExecutor 中给定的任务来获得返回值。

Processing

以下是对 Servlet 异步请求处理的非常简洁的概述:

  • `ServletRequest`可通过调用 `request.startAsync()`来置于异步模式。执行此操作的主要效果是 Servlet(以及任何过滤器)可以退出,但响应保持开放状态以让处理稍后完成。

  • request.startAsync()`的调用返回 `AsyncContext,可用于进一步控制异步处理。例如,它提供 `dispatch`方法,该方法类似于 Servlet API 的转发,不同之处在于它允许应用程序在 Servlet 容器线程上恢复请求处理。

  • `ServletRequest`提供对当前 `DispatcherType`的访问,可用于区分处理初始请求、异步调度、转发和其他调度器类型。

DeferredResult 处理按如下方式进行:

  • 控制器返回一个 `DeferredResult`并将其保存到可以在其中访问它的某个内存队列或列表中。

  • Spring MVC calls request.startAsync().

  • 同时,`DispatcherServlet`和所有配置的过滤器退出请求处理线程,但响应保持为开放状态。

  • 应用程序从某个线程设置 DeferredResult,Spring MVC 将请求调度回 Servlet 容器。

  • `DispatcherServlet`再次调用,并使用异步生成的返回值恢复处理。

Callable 处理按如下方式进行:

  • 控制器返回一个 Callable

  • Spring MVC 调用 `request.startAsync()`并将 `Callable`提交给 `AsyncTaskExecutor`以供在单独线程中处理。

  • 同时,`DispatcherServlet`和所有过滤器退出 Servlet 容器线程,但响应保持为开放状态。

  • 最终,`Callable`生成结果,Spring MVC 将请求调度回 Servlet 容器以完成处理。

  • `DispatcherServlet`再次调用,并使用 `Callable`的异步生成的返回值恢复处理。

为了进一步了解背景和上下文,你还可以阅读介绍 Spring MVC 3.2 中异步请求处理支持的 博客文章

Exception Handling

当您使用 DeferredResult 时,您可以选择使用异常调用 setResultsetErrorResult。在这两种情况下,Spring MVC 将请求分派回 Servlet 容器以完成处理。然后,它被视为控制器方法已返回给定值或好像它生成了给定异常。然后,异常会通过常规异常处理机制(例如,调用 @ExceptionHandler 方法)。

当您使用 Callable 时,会发生类似的处理逻辑,主要区别在于结果是从 Callable 返回的,或者由它引发异常。

Interception

HandlerInterceptor 实例可以是 AsyncHandlerInterceptor 类型,以在启动异步处理的初始请求上接收 afterConcurrentHandlingStarted 回调(而不是 postHandleafterCompletion)。

HandlerInterceptor 实现还可以注册 CallableProcessingInterceptorDeferredResultProcessingInterceptor,以更深入地集成到异步请求的生命周期(例如,处理超时事件)。查看 AsyncHandlerInterceptor,了解更多详情。

DeferredResult 提供了 onTimeout(Runnable)onCompletion(Runnable) 回调。查看 DeferredResultJavaDoc,了解更多详情。Callable 可以替换 WebAsyncTask,它公开了用于超时和完成回调的其他方法。

Async Spring MVC compared to WebFlux

Servlet API 最初是为通过 Filter-Servlet 链进行单次传递而构建的。异步请求处理允许应用程序退出 Filter-Servlet 链,但使响应保持打开状态以供进一步处理。Spring MVC 异步支持围绕该机制构建。当控制器返回 DeferredResult 时,Filter-Servlet 链退出,并且 Servlet 容器线程被释放。稍后,当设置 DeferredResult 时,会执行 ASYNC 分派(到同一个 URL),在过程中控制器再次映射,但不会调用它,而是使用 DeferredResult 值(好像控制器返回了它)恢复处理。

相比之下,Spring WebFlux 既不是构建在 Servlet API 之上的,也不需要这样的异步请求处理特性,因为它在设计上就是异步的。异步处理被构建到所有框架约定中,并在请求处理的所有阶段得到本征支持。

从编程模型的角度来看,Spring MVC 和 Spring WebFlux 均支持在控制器方法中作为返回值的异步和 Reactive Types。Spring MVC 甚至支持流式处理,包括反应式反压。但是,与依赖于非阻塞 I/O 且不需要为每次写入开辟额外线程的 WebFlux 不同,对响应进行单独写入仍会阻塞(并会在单独的线程上执行)。

另一个根本区别是,Spring MVC 也不支持控制器方法参数中的异步或反应类型(例如,@RequestBody@RequestPart 等),也没有任何显式支持作为模型属性的异步和反应类型。Spring WebFlux 支持所有这些。

最后,从配置角度来看,必须 enabled at the Servlet container level 异步请求处理功能。

HTTP Streaming

对于单个异步返回值,可以使用 DeferredResultCallable。如果你想产生多个异步值,并且让它们被写入响应,该怎么做?此部分将说明如何进行此操作。

Objects

您可以使用 ResponseBodyEmitter 返回值产生一个对象流,其中每个对象用 xref:integration/rest-clients.adoc#rest-message-conversion[HttpMessageConverter 序列化,并写入响应,如下例所示:

  • Java

  • Kotlin

@GetMapping("/events")
public ResponseBodyEmitter handle() {
	ResponseBodyEmitter emitter = new ResponseBodyEmitter();
	// Save the emitter somewhere..
	return emitter;
}

// In some other thread
emitter.send("Hello once");

// and again later on
emitter.send("Hello again");

// and done at some point
emitter.complete();
@GetMapping("/events")
fun handle() = ResponseBodyEmitter().apply {
	// Save the emitter somewhere..
}

// In some other thread
emitter.send("Hello once")

// and again later on
emitter.send("Hello again")

// and done at some point
emitter.complete()

你还可以将 ResponseBodyEmitter 用作 ResponseEntity 中的主体,这使你能够自定义响应状态和标头。

emitter 引发 IOException(例如,如果远程客户端消失),应用程序将不会负责清理连接,并且不应调用 emitter.completeemitter.completeWithError。相反,servlet 容器会自动启动 AsyncListener 错误通知,其中 Spring MVC 会调用 completeWithError。然后,此调用会执行到应用程序的最后一次 ASYNC 派发,在此期间,Spring MVC 会调用配置的异常解析器并完成请求。

SSE

SseEmitterResponseBodyEmitter 的一个子类)提供对 Server-Sent Events 的支持,在此服务器发送的事件根据 W3C SSE 规范进行格式化。要从一个控制器产生一个 SSE 流,返回 SseEmitter,如下例所示:

  • Java

  • Kotlin

@GetMapping(path="/events", produces=MediaType.TEXT_EVENT_STREAM_VALUE)
public SseEmitter handle() {
	SseEmitter emitter = new SseEmitter();
	// Save the emitter somewhere..
	return emitter;
}

// In some other thread
emitter.send("Hello once");

// and again later on
emitter.send("Hello again");

// and done at some point
emitter.complete();
@GetMapping("/events", produces = [MediaType.TEXT_EVENT_STREAM_VALUE])
fun handle() = SseEmitter().apply {
	// Save the emitter somewhere..
}

// In some other thread
emitter.send("Hello once")

// and again later on
emitter.send("Hello again")

// and done at some point
emitter.complete()

虽然 SSE 是用于在浏览器中进行流式传输的主要选项,但请注意 Internet Explorer 不支持服务器发送的事件。考虑与 SockJS fallback 传输(包括 SSE)结合使用 Spring 的 WebSocket messaging,这些传输针对范围广泛的浏览器。

另请参阅 previous section 以获取有关异常处理的注意事项。

Raw Data

有时,绕过消息转换并将流直接传输到响应 OutputStream(例如,用于文件下载)非常有用。你可以使用 StreamingResponseBody 返回值类型来执行此操作,如下例所示:

  • Java

  • Kotlin

@GetMapping("/download")
public StreamingResponseBody handle() {
	return new StreamingResponseBody() {
		@Override
		public void writeTo(OutputStream outputStream) throws IOException {
			// write...
		}
	};
}
@GetMapping("/download")
fun handle() = StreamingResponseBody {
	// write...
}

可以将 StreamingResponseBody 用作 ResponseEntity 中的主体,以自定义响应的状态和标头。

Reactive Types

Spring MVC 支持在控制器中使用反应式客户端库(在 WebFlux 部分中也已读过 Reactive Libraries)。其中包括来自 spring-webflux 的 `WebClient`以及其他库,例如 Spring 数据反应式数据仓库。在这样的场景中,能够从控制器方法返回反应式类型非常方便。

响应式返回值的处理如下:

  • 类似于使用 DeferredResult,单值承诺得到调整。示例包括 Mono(Reactor)或 Single(RxJava)。

  • 类似于使用 ResponseBodyEmitter`或 `SseEmitter,多值流与流媒体类型(如 application/x-ndjson`或 `text/event-stream)相适应。示例包括 Flux(Reactor)或 Observable(RxJava)。应用程序还可以返回 Flux&lt;ServerSentEvent&gt;`或 `Observable&lt;ServerSentEvent&gt;

  • 类似于使用 DeferredResult&lt;List&lt;?&gt;&gt;,多值流与任何其他媒体类型(如 application/json)相适应。

Spring MVC 通过来自 spring-core 的 [|ReactiveAdapterRegistry|](https://docs.spring.io/spring-framework/docs/current/javadoc-api/org/springframework/core/ReactiveAdapterRegistry.html) 来支持 Reactor 和 RxJava,它允许它从多个响应式库进行适配。

对于流式处理响应,支持反应式反压,但对响应的写入仍然会阻塞,并通过 configured AsyncTaskExecutor 在单独的线程上运行,以避免阻塞上游源,例如来自 WebClientFlux

Context Propagation

通过 java.lang.ThreadLocal 传播上下文很常见。这对于在同一线程上进行处理是透明的,但对于跨多个线程的异步处理则需要额外的工作。Micrometer Context Propagation 库简化了跨线程以及跨上下文机制(例如 ThreadLocal 值)的上下文传播,Reactor context、GraphQL Java context 等等。

如果类路径上存在 Micrometer 上下文传播,当控制器方法返回 reactive type 例如 FluxMono 时,所有 ThreadLocal 值(已为其注册 io.micrometer.ThreadLocalAccessor)都会使用由 ThreadLocalAccessor 分配的键作为键值对写入到 Reactor Context

对于其他异步处理场景,可以直接使用 Context Propagation 库。例如:

Java
// Capture ThreadLocal values from the main thread ...
ContextSnapshot snapshot = ContextSnapshot.captureAll();

// On a different thread: restore ThreadLocal values
try (ContextSnapshot.Scope scope = snapshot.setThreadLocals()) {
	// ...
}

以下 ThreadLocalAccessor 实现是直接提供的:

  • LocaleContextThreadLocalAccessor — propagates LocaleContext via LocaleContextHolder

  • RequestAttributesThreadLocalAccessor — propagates RequestAttributes via RequestContextHolder

以上内容不会自动注册。你需要在启动时通过 ContextRegistry.getInstance() 注册它们。

有关更多详细信息,请参阅 Micrometer 上下文传播库的 documentation

Disconnects

Servlet API 不会在远程客户端消失时提供任何通知。因此,在通过 SseEmitterreactive types对响应进行流式处理时,定期发送数据非常重要,因为如果客户端已断开连接,则写入操作将失败。发送可以采用空 (仅注释) SSE 事件或任何其他数据形式,接收方必将将其解释为心跳并予以忽略。

或者,考虑使用具有内置心跳机制的网络消息传递解决方案(例如 STOMP over WebSocket 或与 SockJS 结合使用的 WebSocket)。

Configuration

必须在 Servlet 容器级别启用异步请求处理功能。MVC 配置也为异步请求提供了多个选项。

Servlet Container

Filter 和 Servlet 声明具有 asyncSupported 标志,而该标志需要设为 true 以启用异步请求处理。此外,应声明 Filter 映射以处理 ASYNC jakarta.servlet.DispatchType

在 Java 配置中,使用 AbstractAnnotationConfigDispatcherServletInitializer 初始化 Servlet 容器时,将会自动完成此操作。

web.xml 配置中,可以向 DispatcherServletFilter 声明中添加 <async-supported>true</async-supported>,并向过滤器映射中添加 <dispatcher>ASYNC</dispatcher>

Spring MVC

MVC 配置为异步请求处理提供了以下选项:

  • Java 配置:在 `WebMvcConfigurer`上使用 `configureAsyncSupport`回调。

  • XML 命名空间:在 `&lt;mvc:annotation-driven&gt;`下使用 `&lt;async-support&gt;`元素。

可以配置以下内容:

  • 除非显式设置,否则异步请求的默认超时值取决于底层 Servlet 容器。

  • `AsyncTaskExecutor`供在使用 Reactive Types流式处理和执行控制器方法返回的 `Callable`实例时用于阻塞写入。默认使用的在负载下不适合生产环境。

  • `DeferredResultProcessingInterceptor`实现和 `CallableProcessingInterceptor`实现。

请注意,还可以在 DeferredResultResponseBodyEmitterSseEmitter 上设置默认超时值。对于 Callable,可以使用 WebAsyncTask 提供超时值。