Data Buffers and Codecs
Java NIO 提供 ByteBuffer
,但许多库都基于字节缓冲区的 API 构建了自己的 API,尤其是在重用缓冲区和/或使用直接缓冲区对性能有益的网络操作中。例如 Netty 具有 ByteBuf
层次结构,Undertow 使用 XNIO,Jetty 使用带有回调的池化字节缓冲区来释放,依此类推。spring-core
模块提供了一组抽象,用于处理各种字节缓冲区 API,如下所示:
-
DataBufferFactory
抽象数据缓冲区的创建。 -
DataBuffer
表示字节缓冲区,该缓冲区可能是 pooled。 -
DataBufferUtils
提供数据缓冲区的实用方法。 -
Codecs 将数据缓冲区流解码或编码至更高级别的对象。
DataBufferFactory
DataBufferFactory
用于以以下两种方式之一创建数据缓冲区:
-
分配一个新的数据缓冲区,如果已知,可以选择预先指定容量,这样更高效,即使
DataBuffer
的实现可以按需增长和缩小。 -
包装现有的
byte[]
或java.nio.ByteBuffer
,该包装会使用DataBuffer
实现修饰给定数据,且不涉及分配。
请注意,WebFlux 应用程序不会直接创建 DataBufferFactory
,而是在客户端通过 ServerHttpResponse
或 ClientHttpRequest
访问它。工厂的类型取决于底层的客户端或服务器,例如 Reactor Netty 的 NettyDataBufferFactory
,其他类型的 DefaultDataBufferFactory
。
DataBuffer
DataBuffer
接口提供了与 java.nio.ByteBuffer
类似的操作,但也带来了一些额外的好处,其中一些好处受到 Netty ByteBuf
的启发。以下是一部分好处:
-
使用独立位置读取和写入,即不需要调用
flip()
来在读取和写入之间交替进行。 -
使用
java.lang.StringBuilder
按需扩容。 -
通过
PooledDataBuffer
使用池化缓冲区和引用计数。 -
将缓冲区视为
java.nio.ByteBuffer
、InputStream
或OutputStream
。 -
确定给定字节的索引或最后一个索引。
PooledDataBuffer
如 ByteBuffer 的 Javadoc 中所述,字节缓冲区可以是直接的或非直接的。直接缓冲区可能会驻留在 Java 堆之外,这消除了对于本地 IO 操作复制的需要。这使得直接缓冲区对于通过套接字接收和发送数据特别有用,但它们创建和释放起来也更昂贵,这会导致池缓冲区的想法。
PooledDataBuffer
是 DataBuffer
的扩展,它有助于引用计数,这对于字节缓冲区池化至关重要。它如何工作?当分配 PooledDataBuffer
时,引用计数为 1。对 retain()
的调用会增加计数,而对 release()
的调用会减少计数。只要计数大于 0,则保证不会释放缓冲区。当计数减少到 0 时,可以释放池化缓冲区,实际上这意味着为缓冲区保留的内存被返回到内存池。
请注意,大部分情况下,最好直接使用 PooledDataBuffer
上的便捷方法,而不是使用 DataBufferUtils
中的便捷方法,只有当它是一个 PooledDataBuffer
实例时才能将释放或保留应用到 DataBuffer
中。
DataBufferUtils
DataBufferUtils
提供了许多用于操作数据缓冲区的实用方法:
-
将数据缓冲区流连接到一个缓冲区中,可能支持零拷贝,例如,通过复合缓冲区(如果底层字节缓冲区 API 支持的话)。
-
将
InputStream
或 NIOChannel
转换为Flux<DataBuffer>
,反之亦然,将Publisher<DataBuffer>
转换为OutputStream
或 NIOChannel
。 -
如果缓冲区是
PooledDataBuffer
的一个实例,释放或保留DataBuffer
的方法。 -
从字节流中断过或跳过,直到达到特定的字节计数。
Codecs
org.springframework.core.codec
软件包提供以下策略接口:
-
Encoder
将Publisher<T>
编码为数据缓冲区流。 -
Decoder
将Publisher<DataBuffer>
解码为更高层次对象流。
spring-core
模块提供以下各项: byte[]
、ByteBuffer
、DataBuffer
、Resource
和 String
编码器和解码器实现。spring-web
模块添加了 Jackson JSON、Jackson Smile、JAXB2、Protobuf 和其他编码器和解码器。请参阅 WebFlux 部分中的 Codecs。
Using DataBuffer
在使用数据缓冲区时,必须特别注意确保释放缓冲区,因为它们可能已 pooled。我们将使用编解码器来说明其工作原理,但这些概念的适用范围更广。让我们看看编解码器在内部必须执行哪些操作来管理数据缓冲区。
在创建更高级别的对象之前,Decoder
是读取输入数据缓冲区的最后一步,因此它必须按照如下方式释放它们:
-
如果
Decoder
只需读取每个输入缓冲区,并准备立即释放,则可以通过DataBufferUtils.release(dataBuffer)
来执行该操作。 -
如果
Decoder
正在使用Flux
或Mono
算子,例如flatMap
、reduce
以及内部预取和缓存数据项的其他算子,或者正在使用filter
或skip
等算子,这些算子会将项留出,则必须将doOnDiscard(DataBuffer.class, DataBufferUtils::release)
添加到 composition 链中,以确保在丢弃之前释放此类缓冲区,也有可能作为错误或取消信号的结果。 -
如果
Decoder
以任何其他方式保存一个或多个数据缓冲区,则它必须确保在完全读取时释放它们,或者在缓存数据缓冲区已被读取和释放之前发生的错误或取消信号的情况下释放它们。
请注意,DataBufferUtils#join
提供了一种安全有效的方式,可以将数据缓冲区流聚合到单个数据缓冲区中。同样,skipUntilByteCount
和 takeUntilByteCount
是解码器可以使用的其他安全方法。
Encoder
分配其他必须读取(并释放)的数据缓冲区。因此,Encoder
没有太多事情要做。但是,如果在使用数据填充缓冲区时发生序列化错误,则 Encoder
必须小心释放数据缓冲区。例如:
-
Java
-
Kotlin
DataBuffer buffer = factory.allocateBuffer();
boolean release = true;
try {
// serialize and populate buffer..
release = false;
}
finally {
if (release) {
DataBufferUtils.release(buffer);
}
}
return buffer;
val buffer = factory.allocateBuffer()
var release = true
try {
// serialize and populate buffer..
release = false
} finally {
if (release) {
DataBufferUtils.release(buffer)
}
}
return buffer
Encoder
的使用者负责释放它接收的数据缓冲区。在 WebFlux 应用程序中,Encoder
的输出用于写入 HTTP 服务器响应或客户端 HTTP 请求,在这种情况下,释放数据缓冲区的责任就是编写服务器响应或客户端请求的代码。
请注意,当在 Netty 上运行时,有用于 troubleshooting buffer leaks 的调试选项。