Coroutines

Kotlin 协程 是 Kotlin 轻量级线程,允许以命令式方式编写非阻塞代码。在语言方面,挂起函数提供了异步操作的抽象,而在库方面https://github.com/Kotlin/kotlinx.coroutines[kotlinx.coroutines] 提供了诸如https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/async.html[async { }]和 Flow之类的函数和类型。 Spring Framework 在以下范围内提供对协程的支持:

  • DeferredFlow 返回值支持 Spring MVC 和 WebFlux 注释的 @Controller

  • Spring MVC 和 WebFlux 注释的 @Controller 中的挂起函数支持

  • WebFlux clientserver 函数式 API 的扩展。

  • WebFlux.fn coRouter { } DSL

  • WebFlux CoWebFilter

  • RSocket 中的挂起函数和 Flow 支持 @MessageMapping 注释的方法

  • Extensions for RSocketRequester

  • Spring AOP

Dependencies

当类路径中存在 kotlinx-coroutines-corekotlinx-coroutines-reactor 依赖项时,将启用协程支持:

build.gradle.kts

dependencies {

	implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:${coroutinesVersion}")
	implementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactor:${coroutinesVersion}")
}

支持版本 1.4.0 及更高版本。

How Reactive translates to Coroutines?

对于返回值,从 Reactive 到协程 API 的转换如下所示:

  • fun handler(): Mono<Void> 变成 suspend fun handler()

  • fun handler(): Mono<T> 根据 Mono 是否可以为空(以静态类型更强的优点)变成 suspend fun handler(): Tsuspend fun handler(): T?

  • fun handler(): Flux<T> 变成 fun handler(): Flow<T>

对于输入参数:

  • 如果不需要惰性,那么 fun handler(mono: Mono<T>) 就会变成 fun handler(value: T),因为可以调用挂起函数来获取值参数。

  • 如果需要惰性,那么 fun handler(mono: Mono<T>) 就会变成 fun handler(supplier: suspend () → T)fun handler(supplier: suspend () → T?)

Flow 是协程世界中相当于 Flux,适用于热流或冷流、有限流或无限流,具有以下主要区别:

  • Flow 是面向推送的,而 Flux 是面向推送-拉取的混合型

  • 反压通过挂起函数实现

  • Flow 只有一个 单一挂起 collect 方法,并且将算子实现为 扩展

  • 多亏了协程,才让 实现算子变得容易

  • 扩展允许向 Flow 添加自定义算子

  • 收集操作是挂起函数

  • map 运算符 支持异步操作 (无需 flatMap),因为它使用挂起函数参数

请阅读关于 使用 Spring、协程和 Kotlin Flow 实现响应式编程的此博客文章以获取更多详细信息,包括如何通过协程同时运行代码。

Controllers

以下是一个 Coroutines @RestController 示例。

@RestController
class CoroutinesRestController(client: WebClient, banner: Banner) {

	@GetMapping("/suspend")
	suspend fun suspendingEndpoint(): Banner {
		delay(10)
		return banner
	}

	@GetMapping("/flow")
	fun flowEndpoint() = flow {
		delay(10)
		emit(banner)
		delay(10)
		emit(banner)
	}

	@GetMapping("/deferred")
	fun deferredEndpoint() = GlobalScope.async {
		delay(10)
		banner
	}

	@GetMapping("/sequential")
	suspend fun sequential(): List<Banner> {
		val banner1 = client
				.get()
				.uri("/suspend")
				.accept(MediaType.APPLICATION_JSON)
				.awaitExchange()
				.awaitBody<Banner>()
		val banner2 = client
				.get()
				.uri("/suspend")
				.accept(MediaType.APPLICATION_JSON)
				.awaitExchange()
				.awaitBody<Banner>()
		return listOf(banner1, banner2)
	}

	@GetMapping("/parallel")
	suspend fun parallel(): List<Banner> = coroutineScope {
		val deferredBanner1: Deferred<Banner> = async {
			client
					.get()
					.uri("/suspend")
					.accept(MediaType.APPLICATION_JSON)
					.awaitExchange()
					.awaitBody<Banner>()
		}
		val deferredBanner2: Deferred<Banner> = async {
			client
					.get()
					.uri("/suspend")
					.accept(MediaType.APPLICATION_JSON)
					.awaitExchange()
					.awaitBody<Banner>()
		}
		listOf(deferredBanner1.await(), deferredBanner2.await())
	}

	@GetMapping("/error")
	suspend fun error() {
		throw IllegalStateException()
	}

	@GetMapping("/cancel")
	suspend fun cancel() {
		throw CancellationException()
	}

}

也支持使用 @Controller 进行视图渲染。

@Controller
class CoroutinesViewController(banner: Banner) {

	@GetMapping("/")
	suspend fun render(model: Model): String {
		delay(10)
		model["banner"] = banner
		return "index"
	}
}

WebFlux.fn

以下是通过 coRouter { } DSL 和相关处理器定义的协程路由示例。

@Configuration
class RouterConfiguration {

	@Bean
	fun mainRouter(userHandler: UserHandler) = coRouter {
		GET("/", userHandler::listView)
		GET("/api/user", userHandler::listApi)
	}
}
class UserHandler(builder: WebClient.Builder) {

	private val client = builder.baseUrl("...").build()

	suspend fun listView(request: ServerRequest): ServerResponse =
			ServerResponse.ok().renderAndAwait("users", mapOf("users" to
			client.get().uri("...").awaitExchange().awaitBody<User>()))

	suspend fun listApi(request: ServerRequest): ServerResponse =
				ServerResponse.ok().contentType(MediaType.APPLICATION_JSON).bodyAndAwait(
				client.get().uri("...").awaitExchange().awaitBody<User>())
}

Transactions

Spring Framework 5.2 提供的反应式事务管理的编程变体支持对协程的事务。

针对挂起函数,提供了一个 TransactionalOperator.executeAndAwait 扩展。

import org.springframework.transaction.reactive.executeAndAwait

class PersonRepository(private val operator: TransactionalOperator) {

    suspend fun initDatabase() = operator.executeAndAwait {
        insertPerson1()
        insertPerson2()
    }

    private suspend fun insertPerson1() {
        // INSERT SQL statement
    }

    private suspend fun insertPerson2() {
        // INSERT SQL statement
    }
}

针对 Kotlin Flow,提供了一个 Flow<T>.transactional 扩展。

import org.springframework.transaction.reactive.transactional

class PersonRepository(private val operator: TransactionalOperator) {

    fun updatePeople() = findPeople().map(::updatePerson).transactional(operator)

    private fun findPeople(): Flow<Person> {
        // SELECT SQL statement
    }

    private suspend fun updatePerson(person: Person): Person {
        // UPDATE SQL statement
    }
}