Kotlin Support
该框架还得到了改进,以支持用于函数的 Kotlin lambda,因此现在您可以将 Kotlin 语言和 Spring Integration 流定义结合起来使用:
@Bean
@Transformer(inputChannel = "functionServiceChannel")
fun kotlinFunction(): (String) -> String {
return { it.toUpperCase() }
}
@Bean
@ServiceActivator(inputChannel = "messageConsumerServiceChannel")
fun kotlinConsumer(): (Message<Any>) -> Unit {
return { print(it) }
}
@Bean
@InboundChannelAdapter(value = "counterChannel",
poller = Poller(fixedRate = "10", maxMessagesPerPoll = "1"))
fun kotlinSupplier(): () -> String {
return { "baz" }
}
Kotlin Coroutines
从版本 6.0 开始,Spring Integration 为 Kotlin 协程提供支持。现在,可以在服务方法中使用 suspend
函数和 kotlinx.coroutines.Deferred
& kotlinx.coroutines.flow.Flow
返回类型:
@ServiceActivator(inputChannel = "suspendServiceChannel", outputChannel = "resultChannel")
suspend fun suspendServiceFunction(payload: String) = payload.uppercase()
@ServiceActivator(inputChannel = "flowServiceChannel", outputChannel = "resultChannel", async = "true")
fun flowServiceFunction(payload: String) =
flow {
for (i in 1..3) {
emit("$payload #$i")
}
}
框架将它们视作 Reactive Streams 交互,并使用 ReactiveAdapterRegistry
转换为相应的 Mono 和 Flux 反应器类型。然后在回复通道中处理此类函数回复(如果是 ReactiveStreamsSubscribableChannel
),或者作为相应回调中的 CompletableFuture
的结果进行处理。
|
当在 Kotlin 中声明时,@MessagingGateway
接口方法也可以标记为 suspend
修饰符。框架在内部使用 Mono 执行请求-回复操作,使用下游流。内部通过 MonoKt.awaitSingleOrNull()
API 处理此类 Mono 结果,以满足网关的已调用 suspend
函数的 kotlin.coroutines.Continuation
参数:
@MessagingGateway(defaultRequestChannel = "suspendRequestChannel")
interface SuspendFunGateway {
suspend fun suspendGateway(payload: String): String
}
根据 Kotlin 语言要求,该方法必须以协程的形式调用:
@Autowired
private lateinit var suspendFunGateway: SuspendFunGateway
fun someServiceMethod() {
runBlocking {
val reply = suspendFunGateway.suspendGateway("test suspend gateway")
}
}