Coroutines
Kotlin 协程 是 Kotlin 轻量级线程,允许以命令式方式编写非阻塞代码。在语言方面,挂起函数提供了异步操作的抽象,而在库方面https://github.com/Kotlin/kotlinx.coroutines[kotlinx.coroutines] 提供了诸如https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/async.html[async { }
]和 Flow
之类的函数和类型。
Spring Framework 在以下范围内提供对协程的支持:
-
Spring MVC 和 WebFlux 注释的
@Controller
中的挂起函数支持 -
WebFlux.fn coRouter { } DSL
-
WebFlux
CoWebFilter
-
RSocket 中的挂起函数和
Flow
支持@MessageMapping
注释的方法 -
Extensions for
RSocketRequester
-
Spring AOP
Dependencies
当类路径中存在 kotlinx-coroutines-core
和 kotlinx-coroutines-reactor
依赖项时,将启用协程支持:
build.gradle.kts
dependencies {
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:${coroutinesVersion}")
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactor:${coroutinesVersion}")
}
支持版本 1.4.0
及更高版本。
How Reactive translates to Coroutines?
对于返回值,从 Reactive 到协程 API 的转换如下所示:
-
fun handler(): Mono<Void>
变成suspend fun handler()
-
fun handler(): Mono<T>
根据Mono
是否可以为空(以静态类型更强的优点)变成suspend fun handler(): T
或suspend fun handler(): T?
-
fun handler(): Flux<T>
变成fun handler(): Flow<T>
对于输入参数:
-
如果不需要惰性,那么
fun handler(mono: Mono<T>)
就会变成fun handler(value: T)
,因为可以调用挂起函数来获取值参数。 -
如果需要惰性,那么
fun handler(mono: Mono<T>)
就会变成fun handler(supplier: suspend () → T)
或fun handler(supplier: suspend () → T?)
Flow
是协程世界中相当于 Flux
,适用于热流或冷流、有限流或无限流,具有以下主要区别:
-
Flow
是面向推送的,而Flux
是面向推送-拉取的混合型 -
反压通过挂起函数实现
-
Flow
只有一个 单一挂起collect
方法,并且将算子实现为 扩展 -
多亏了协程,才让 实现算子变得容易
-
扩展允许向
Flow
添加自定义算子 -
收集操作是挂起函数
-
map
运算符 支持异步操作 (无需flatMap
),因为它使用挂起函数参数
请阅读关于 使用 Spring、协程和 Kotlin Flow 实现响应式编程的此博客文章以获取更多详细信息,包括如何通过协程同时运行代码。
Controllers
以下是一个 Coroutines @RestController
示例。
@RestController
class CoroutinesRestController(client: WebClient, banner: Banner) {
@GetMapping("/suspend")
suspend fun suspendingEndpoint(): Banner {
delay(10)
return banner
}
@GetMapping("/flow")
fun flowEndpoint() = flow {
delay(10)
emit(banner)
delay(10)
emit(banner)
}
@GetMapping("/deferred")
fun deferredEndpoint() = GlobalScope.async {
delay(10)
banner
}
@GetMapping("/sequential")
suspend fun sequential(): List<Banner> {
val banner1 = client
.get()
.uri("/suspend")
.accept(MediaType.APPLICATION_JSON)
.awaitExchange()
.awaitBody<Banner>()
val banner2 = client
.get()
.uri("/suspend")
.accept(MediaType.APPLICATION_JSON)
.awaitExchange()
.awaitBody<Banner>()
return listOf(banner1, banner2)
}
@GetMapping("/parallel")
suspend fun parallel(): List<Banner> = coroutineScope {
val deferredBanner1: Deferred<Banner> = async {
client
.get()
.uri("/suspend")
.accept(MediaType.APPLICATION_JSON)
.awaitExchange()
.awaitBody<Banner>()
}
val deferredBanner2: Deferred<Banner> = async {
client
.get()
.uri("/suspend")
.accept(MediaType.APPLICATION_JSON)
.awaitExchange()
.awaitBody<Banner>()
}
listOf(deferredBanner1.await(), deferredBanner2.await())
}
@GetMapping("/error")
suspend fun error() {
throw IllegalStateException()
}
@GetMapping("/cancel")
suspend fun cancel() {
throw CancellationException()
}
}
也支持使用 @Controller
进行视图渲染。
@Controller
class CoroutinesViewController(banner: Banner) {
@GetMapping("/")
suspend fun render(model: Model): String {
delay(10)
model["banner"] = banner
return "index"
}
}
WebFlux.fn
以下是通过 coRouter { } DSL 和相关处理器定义的协程路由示例。
@Configuration
class RouterConfiguration {
@Bean
fun mainRouter(userHandler: UserHandler) = coRouter {
GET("/", userHandler::listView)
GET("/api/user", userHandler::listApi)
}
}
class UserHandler(builder: WebClient.Builder) {
private val client = builder.baseUrl("...").build()
suspend fun listView(request: ServerRequest): ServerResponse =
ServerResponse.ok().renderAndAwait("users", mapOf("users" to
client.get().uri("...").awaitExchange().awaitBody<User>()))
suspend fun listApi(request: ServerRequest): ServerResponse =
ServerResponse.ok().contentType(MediaType.APPLICATION_JSON).bodyAndAwait(
client.get().uri("...").awaitExchange().awaitBody<User>())
}
Transactions
Spring Framework 5.2 提供的反应式事务管理的编程变体支持对协程的事务。
针对挂起函数,提供了一个 TransactionalOperator.executeAndAwait
扩展。
import org.springframework.transaction.reactive.executeAndAwait
class PersonRepository(private val operator: TransactionalOperator) {
suspend fun initDatabase() = operator.executeAndAwait {
insertPerson1()
insertPerson2()
}
private suspend fun insertPerson1() {
// INSERT SQL statement
}
private suspend fun insertPerson2() {
// INSERT SQL statement
}
}
针对 Kotlin Flow
,提供了一个 Flow<T>.transactional
扩展。
import org.springframework.transaction.reactive.transactional
class PersonRepository(private val operator: TransactionalOperator) {
fun updatePeople() = findPeople().map(::updatePerson).transactional(operator)
private fun findPeople(): Flow<Person> {
// SELECT SQL statement
}
private suspend fun updatePerson(person: Person): Person {
// UPDATE SQL statement
}
}