Data Buffers and Codecs

Java NIO 提供 ByteBuffer,但许多库都基于字节缓冲区的 API 构建了自己的 API,尤其是在重用缓冲区和/或使用直接缓冲区对性能有益的网络操作中。例如 Netty 具有 ByteBuf 层次结构,Undertow 使用 XNIO,Jetty 使用带有回调的池化字节缓冲区来释放,依此类推。spring-core 模块提供了一组抽象,用于处理各种字节缓冲区 API,如下所示:

Java NIO provides ByteBuffer but many libraries build their own byte buffer API on top, especially for network operations where reusing buffers and/or using direct buffers is beneficial for performance. For example Netty has the ByteBuf hierarchy, Undertow uses XNIO, Jetty uses pooled byte buffers with a callback to be released, and so on. The spring-core module provides a set of abstractions to work with various byte buffer APIs as follows:

DataBufferFactory

DataBufferFactory 用于以以下两种方式之一创建数据缓冲区:

DataBufferFactory is used to create data buffers in one of two ways:

  1. Allocate a new data buffer, optionally specifying capacity upfront, if known, which is more efficient even though implementations of DataBuffer can grow and shrink on demand.

  2. Wrap an existing byte[] or java.nio.ByteBuffer, which decorates the given data with a DataBuffer implementation and that does not involve allocation.

请注意,WebFlux 应用程序不会直接创建 DataBufferFactory,而是在客户端通过 ServerHttpResponseClientHttpRequest 访问它。工厂的类型取决于底层的客户端或服务器,例如 Reactor Netty 的 NettyDataBufferFactory,其他类型的 DefaultDataBufferFactory

Note that WebFlux applications do not create a DataBufferFactory directly but instead access it through the ServerHttpResponse or the ClientHttpRequest on the client side. The type of factory depends on the underlying client or server, e.g. NettyDataBufferFactory for Reactor Netty, DefaultDataBufferFactory for others.

DataBuffer

DataBuffer 接口提供了与 java.nio.ByteBuffer 类似的操作,但也带来了一些额外的好处,其中一些好处受到 Netty ByteBuf 的启发。以下是一部分好处:

The DataBuffer interface offers similar operations as java.nio.ByteBuffer but also brings a few additional benefits some of which are inspired by the Netty ByteBuf. Below is a partial list of benefits:

  • Read and write with independent positions, i.e. not requiring a call to flip() to alternate between read and write.

  • Capacity expanded on demand as with java.lang.StringBuilder.

  • Pooled buffers and reference counting via PooledDataBuffer.

  • View a buffer as java.nio.ByteBuffer, InputStream, or OutputStream.

  • Determine the index, or the last index, for a given byte.

PooledDataBuffer

ByteBuffer 的 Javadoc 中所述,字节缓冲区可以是直接的或非直接的。直接缓冲区可能会驻留在 Java 堆之外,这消除了对于本地 IO 操作复制的需要。这使得直接缓冲区对于通过套接字接收和发送数据特别有用,但它们创建和释放起来也更昂贵,这会导致池缓冲区的想法。

As explained in the Javadoc for ByteBuffer, byte buffers can be direct or non-direct. Direct buffers may reside outside the Java heap which eliminates the need for copying for native I/O operations. That makes direct buffers particularly useful for receiving and sending data over a socket, but they’re also more expensive to create and release, which leads to the idea of pooling buffers.

PooledDataBufferDataBuffer 的扩展,它有助于引用计数,这对于字节缓冲区池化至关重要。它如何工作?当分配 PooledDataBuffer 时,引用计数为 1。对 retain() 的调用会增加计数,而对 release() 的调用会减少计数。只要计数大于 0,则保证不会释放缓冲区。当计数减少到 0 时,可以释放池化缓冲区,实际上这意味着为缓冲区保留的内存被返回到内存池。

PooledDataBuffer is an extension of DataBuffer that helps with reference counting which is essential for byte buffer pooling. How does it work? When a PooledDataBuffer is allocated the reference count is at 1. Calls to retain() increment the count, while calls to release() decrement it. As long as the count is above 0, the buffer is guaranteed not to be released. When the count is decreased to 0, the pooled buffer can be released, which in practice could mean the reserved memory for the buffer is returned to the memory pool.

请注意,大部分情况下,最好直接使用 PooledDataBuffer 上的便捷方法,而不是使用 DataBufferUtils 中的便捷方法,只有当它是一个 PooledDataBuffer 实例时才能将释放或保留应用到 DataBuffer 中。

Note that instead of operating on PooledDataBuffer directly, in most cases it’s better to use the convenience methods in DataBufferUtils that apply release or retain to a DataBuffer only if it is an instance of PooledDataBuffer.

DataBufferUtils

DataBufferUtils 提供了许多用于操作数据缓冲区的实用方法:

DataBufferUtils offers a number of utility methods to operate on data buffers:

  • Join a stream of data buffers into a single buffer possibly with zero copy, e.g. via composite buffers, if that’s supported by the underlying byte buffer API.

  • Turn InputStream or NIO Channel into Flux<DataBuffer>, and vice versa a Publisher<DataBuffer> into OutputStream or NIO Channel.

  • Methods to release or retain a DataBuffer if the buffer is an instance of PooledDataBuffer.

  • Skip or take from a stream of bytes until a specific byte count.

Codecs

org.springframework.core.codec 软件包提供以下策略接口:

The org.springframework.core.codec package provides the following strategy interfaces:

  • Encoder to encode Publisher<T> into a stream of data buffers.

  • Decoder to decode Publisher<DataBuffer> into a stream of higher level objects.

spring-core 模块提供以下各项: byte[]ByteBufferDataBufferResourceString 编码器和解码器实现。spring-web 模块添加了 Jackson JSON、Jackson Smile、JAXB2、Protobuf 和其他编码器和解码器。请参阅 WebFlux 部分中的 Codecs

The spring-core module provides byte[], ByteBuffer, DataBuffer, Resource, and String encoder and decoder implementations. The spring-web module adds Jackson JSON, Jackson Smile, JAXB2, Protocol Buffers and other encoders and decoders. See Codecs in the WebFlux section.

Using DataBuffer

在使用数据缓冲区时,必须特别注意确保释放缓冲区,因为它们可能已 pooled。我们将使用编解码器来说明其工作原理,但这些概念的适用范围更广。让我们看看编解码器在内部必须执行哪些操作来管理数据缓冲区。

When working with data buffers, special care must be taken to ensure buffers are released since they may be pooled. We’ll use codecs to illustrate how that works but the concepts apply more generally. Let’s see what codecs must do internally to manage data buffers.

在创建更高级别的对象之前,Decoder 是读取输入数据缓冲区的最后一步,因此它必须按照如下方式释放它们:

A Decoder is the last to read input data buffers, before creating higher level objects, and therefore it must release them as follows:

  1. If a Decoder simply reads each input buffer and is ready to release it immediately, it can do so via DataBufferUtils.release(dataBuffer).

  2. If a Decoder is using Flux or Mono operators such as flatMap, reduce, and others that prefetch and cache data items internally, or is using operators such as filter, skip, and others that leave out items, then doOnDiscard(DataBuffer.class, DataBufferUtils::release) must be added to the composition chain to ensure such buffers are released prior to being discarded, possibly also as a result of an error or cancellation signal.

  3. If a Decoder holds on to one or more data buffers in any other way, it must ensure they are released when fully read, or in case of an error or cancellation signals that take place before the cached data buffers have been read and released.

请注意,DataBufferUtils#join 提供了一种安全有效的方式,可以将数据缓冲区流聚合到单个数据缓冲区中。同样,skipUntilByteCounttakeUntilByteCount 是解码器可以使用的其他安全方法。

Note that DataBufferUtils#join offers a safe and efficient way to aggregate a data buffer stream into a single data buffer. Likewise skipUntilByteCount and takeUntilByteCount are additional safe methods for decoders to use.

Encoder 分配其他必须读取(并释放)的数据缓冲区。因此,Encoder 没有太多事情要做。但是,如果在使用数据填充缓冲区时发生序列化错误,则 Encoder 必须小心释放数据缓冲区。例如:

An Encoder allocates data buffers that others must read (and release). So an Encoder doesn’t have much to do. However an Encoder must take care to release a data buffer if a serialization error occurs while populating the buffer with data. For example:

  • Java

  • Kotlin

DataBuffer buffer = factory.allocateBuffer();
boolean release = true;
try {
	// serialize and populate buffer..
	release = false;
}
finally {
	if (release) {
		DataBufferUtils.release(buffer);
	}
}
return buffer;
val buffer = factory.allocateBuffer()
var release = true
try {
	// serialize and populate buffer..
	release = false
} finally {
	if (release) {
		DataBufferUtils.release(buffer)
	}
}
return buffer

Encoder 的使用者负责释放它接收的数据缓冲区。在 WebFlux 应用程序中,Encoder 的输出用于写入 HTTP 服务器响应或客户端 HTTP 请求,在这种情况下,释放数据缓冲区的责任就是编写服务器响应或客户端请求的代码。

The consumer of an Encoder is responsible for releasing the data buffers it receives. In a WebFlux application, the output of the Encoder is used to write to the HTTP server response, or to the client HTTP request, in which case releasing the data buffers is the responsibility of the code writing to the server response, or to the client request.

请注意,当在 Netty 上运行时,有用于 troubleshooting buffer leaks 的调试选项。

Note that when running on Netty, there are debugging options for troubleshooting buffer leaks.