Kotlin Support

The framework has also been improved to support Kotlin lambdas for functions, so you can now combine the Kotlin language with Spring Integration flow definitions:

@Bean
@Transformer(inputChannel = "functionServiceChannel")
fun kotlinFunction(): (String) -> String {
    return { it.uppercase() }
}

@Bean
@ServiceActivator(inputChannel = "messageConsumerServiceChannel")
fun kotlinConsumer(): (Message<Any>) -> Unit {
    return { print(it) }
}

@Bean
@InboundChannelAdapter(value = "counterChannel",
        poller = Poller(fixedRate = "10", maxMessagesPerPoll = "1"))
fun kotlinSupplier(): () -> String {
    return { "baz" }
}
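
Because these beans are ordinary Kotlin function types, their bodies can be exercised without a Spring context. The following is a minimal sketch in plain Kotlin; the names `upperCaseFn`, `received`, `collectingConsumer`, and `constantSupplier` are hypothetical and only mirror the shapes of the beans above.

```kotlin
// Plain-Kotlin sketch: no Spring types are involved here.
// All names below are hypothetical and only mirror the bean shapes above.

// Same shape as the transformer bean: (String) -> String
val upperCaseFn: (String) -> String = { it.uppercase() }

// Same shape as a consumer, but it records its input instead of printing it
val received = mutableListOf<Any>()
val collectingConsumer: (Any) -> Unit = { received.add(it) }

// Same shape as the supplier bean: () -> String
val constantSupplier: () -> String = { "baz" }

fun main() {
    println(upperCaseFn("foo"))   // prints FOO
    collectingConsumer("bar")
    println(received)             // prints [bar]
    println(constantSupplier())   // prints baz
}
```

Keeping the lambda bodies free of framework types, as assumed here, makes them easy to unit test before they are wired to channels.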

Kotlin Coroutines

Starting with version 6.0, Spring Integration provides support for Kotlin Coroutines. Suspend functions, as well as kotlinx.coroutines.Deferred and kotlinx.coroutines.flow.Flow return types, can now be used for service methods:

@ServiceActivator(inputChannel = "suspendServiceChannel", outputChannel = "resultChannel")
suspend fun suspendServiceFunction(payload: String) = payload.uppercase()

@ServiceActivator(inputChannel = "flowServiceChannel", outputChannel = "resultChannel", async = "true")
fun flowServiceFunction(payload: String) =
    flow {
        for (i in 1..3) {
            emit("$payload #$i")
        }
    }

The framework treats them as Reactive Streams interactions and uses ReactiveAdapterRegistry to convert them to the respective Reactor Mono and Flux types. The function's reply is then processed in the reply channel if that channel is a ReactiveStreamsSubscribableChannel, or otherwise as the result of a CompletableFuture in the respective callback.
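
To make the coroutine-to-Reactor mapping concrete, the sketch below bridges a suspend function to a Mono and a Flow to a Flux by calling the kotlinx-coroutines-reactor helpers `mono` and `asFlux` directly. It is not the framework's internal code (which goes through ReactiveAdapterRegistry); it assumes kotlinx-coroutines-reactor and reactor-core are on the classpath, and the functions `upper`, `numbered`, `upperAsMono`, and `numberedAsFlux` are hypothetical.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.reactor.asFlux
import kotlinx.coroutines.reactor.mono
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono

// Hypothetical stand-ins for the service methods shown earlier
suspend fun upper(payload: String): String = payload.uppercase()

fun numbered(payload: String): Flow<String> = flow {
    for (i in 1..3) emit("$payload #$i")
}

// A suspend call bridged to a single-value Mono
fun upperAsMono(payload: String): Mono<String> = mono { upper(payload) }

// A Flow bridged to a multi-value Flux
fun numberedAsFlux(payload: String): Flux<String> = numbered(payload).asFlux()

fun main() {
    // block() is used only to print the results in this standalone sketch
    println(upperAsMono("foo").block())
    println(numberedAsFlux("foo").collectList().block())
}
```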

Functions with a Flow result are not async by default on @ServiceActivator, so the Flow instance itself is produced as the reply message payload. It is then the target application's responsibility to process this object as a coroutine or to convert it to a Flux.
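
When the Flow arrives as a plain payload, one way for the application to process it as a coroutine is to collect it explicitly. The sketch below assumes only kotlinx-coroutines-core; `collectPayload` is a hypothetical downstream handler, and `runBlocking` is used just to keep the example standalone.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical downstream handler: receives the Flow payload and
// collects every emitted element into a list.
// runBlocking blocks the calling thread until the Flow completes.
fun collectPayload(payload: Flow<String>): List<String> =
    runBlocking { payload.toList() }

fun main() {
    // Stand-in for the Flow that would arrive as the reply message payload
    val payload = flowOf("foo #1", "foo #2", "foo #3")
    println(collectPayload(payload))
}
```

In code that already runs inside a coroutine, calling `toList()` (or `collect`) directly avoids blocking a thread.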

Methods of a @MessagingGateway interface can also be marked with the suspend modifier when declared in Kotlin. The framework uses a Mono internally to perform the request-reply through the downstream flow. That Mono result is processed internally by the MonoKt.awaitSingleOrNull() API to fulfil the kotlin.coroutines.Continuation argument of the called suspend function of the gateway:

@MessagingGateway(defaultRequestChannel = "suspendRequestChannel")
interface SuspendFunGateway {

    suspend fun suspendGateway(payload: String): String

}

According to Kotlin language requirements, this method has to be called from a coroutine:

@Autowired
private lateinit var suspendFunGateway: SuspendFunGateway

fun someServiceMethod() {
    runBlocking {
        val reply = suspendFunGateway.suspendGateway("test suspend gateway")
    }
}
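
The gateway can also be called from another suspend function, which avoids `runBlocking` except at the outermost entry point. The sketch below is an illustration under stated assumptions, not the generated proxy: `MonoBackedGateway` is a hypothetical hand-written stub that awaits a Mono with `awaitSingleOrNull()`, `GreetingService` is a hypothetical caller, and kotlinx-coroutines-reactor plus reactor-core are assumed to be on the classpath.

```kotlin
import kotlinx.coroutines.reactor.awaitSingleOrNull
import kotlinx.coroutines.runBlocking
import reactor.core.publisher.Mono

// Same shape as the gateway above, redeclared without Spring
// annotations so the sketch is self-contained.
interface SuspendFunGateway {
    suspend fun suspendGateway(payload: String): String
}

// Hypothetical stub standing in for the framework-generated proxy:
// the reply is modelled as a Mono and awaited without blocking.
class MonoBackedGateway : SuspendFunGateway {
    override suspend fun suspendGateway(payload: String): String {
        val reply: Mono<String> = Mono.fromCallable { payload.uppercase() }
        return reply.awaitSingleOrNull() ?: error("no reply received")
    }
}

// Hypothetical caller: a suspend function can invoke the gateway directly.
class GreetingService(private val gateway: SuspendFunGateway) {
    suspend fun greet(name: String): String =
        gateway.suspendGateway("hello $name")
}

fun main() {
    // runBlocking is needed only here, at the non-suspending entry point
    val reply = runBlocking { GreetingService(MonoBackedGateway()).greet("world") }
    println(reply) // prints HELLO WORLD
}
```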