UDP Adapters

本部分介绍如何配置和使用 UDP 适配器。

This section describes how to configure and use the UDP adapters.

Outbound UDP Adapters (XML Configuration)

以下示例配置 UDP 出站通道适配器:

The following example configures a UDP outbound channel adapter:

<int-ip:udp-outbound-channel-adapter id="udpOut"
    host="somehost"
    port="11111"
    multicast="false"
    socket-customizer="udpCustomizer"
    channel="exampleChannel"/>

multicast 设置为 true 时,还应该在 host 属性中提供多播地址。

When setting multicast to true, you should also provide the multicast address in the host attribute.

UDP 是一种高效但不稳定的协议。Spring 集成增加了两个属性以提高可靠性:check-lengthacknowledge。当将 check-length 设置为 true 时,适配器将使用长度字段(以网络字节顺序为四字节)前置消息数据。这使得接收方能够验证接收到的数据包的长度。如果接收系统使用的缓冲区太小以至于无法包含数据包,则该数据包可能会被截断。length 头部提供了一种检测此问题的机制。

UDP is an efficient but unreliable protocol. Spring Integration adds two attributes to improve reliability: check-length and acknowledge. When check-length is set to true, the adapter precedes the message data with a length field (four bytes in network byte order). This enables the receiving side to verify the length of the packet received. If a receiving system uses a buffer that is too short to contain the packet, the packet can be truncated. The length header provides a mechanism to detect this.

从版本 4.3 开始,您可以将 port 设置为 0,在这种情况下,操作系统将选择端口。可以在适配器启动后且 isListening() 返回 true 时,通过调用 getPort() 来发现已选择的端口。

Starting with version 4.3, you can set the port to 0, in which case the operating system chooses the port. The chosen port can be discovered by invoking getPort() after the adapter is started and isListening() returns true.

从版本 5.3.3 开始,您可以添加一个 SocketCustomizer Bean,以在创建 DatagramSocket 之后对其进行修改(例如,调用 setTrafficClass(0x10))。

Starting with version 5.3.3, you can add a SocketCustomizer bean to modify the DatagramSocket after it is created (for example, call setTrafficClass(0x10)).

以下示例展示了一个出站通道适配器,它向数据报数据包中添加长度检查:

The following example shows an outbound channel adapter that adds length checking to the datagram packets:

<int-ip:udp-outbound-channel-adapter id="udpOut"
    host="somehost"
    port="11111"
    multicast="false"
    check-length="true"
    channel="exampleChannel"/>

数据包的接收方还必须配置为预料实际数据前面会带有长度。对于 Spring Integration UDP 入站通道适配器,设置其 check-length 属性。

The recipient of the packet must also be configured to expect a length to precede the actual data. For a Spring Integration UDP inbound channel adapter, set its check-length attribute.

第二项可靠性改进允许使用应用程序级别确认协议。接收方必须在指定时间内向发送方发送一个确认。

The second reliability improvement allows an application-level acknowledgment protocol to be used. The receiver must send an acknowledgment to the sender within a specified time.

以下示例展示了一个出站通道适配器,它向数据报数据包中添加长度检查,并等待确认:

The following example shows an outbound channel adapter that adds length checking to the datagram packets and waits for an acknowledgment:

<int-ip:udp-outbound-channel-adapter id="udpOut"
    host="somehost"
    port="11111"
    multicast="false"
    check-length="true"
    acknowledge="true"
    ack-host="thishost"
    ack-port="22222"
    ack-timeout="10000"
    channel="exampleChannel"/>

acknowledge 设置为 true 意味着数据包的接收方可以解释添加到包含确认数据(主机和端口)的数据包中的标头。接收方很可能是 Spring Integration 入站通道适配器。

Setting acknowledge to true implies that the recipient of the packet can interpret the header added to the packet containing acknowledgment data (host and port). Most likely, the recipient is a Spring Integration inbound channel adapter.

当 multicast 为 true 时,另一个属性(min-acks-for-success)指定了在 ack-timeout 内必须接收多少个确认。

When multicast is true, an additional attribute (min-acks-for-success) specifies how many acknowledgments must be received within the ack-timeout.

从版本 4.3 开始,您可以将 ackPort 设置为 0,在这种情况下,操作系统将选择端口。

Starting with version 4.3, you can set the ackPort to 0, in which case the operating system chooses the port.

Outbound UDP Adapters (Java Configuration)

以下示例展示了如何使用 Java 配置出站 UDP 适配器:

The following example shows how to configure an outbound UDP adapter with Java:

@Bean
@ServiceActivator(inputChannel = "udpOut")
public UnicastSendingMessageHandler handler() {
    return new UnicastSendingMessageHandler("localhost", 11111);
}

(或 MulticastSendingChannelAdapter 用于多播)。

(or MulticastSendingChannelAdapter for multicast).

Outbound UDP Adapters (Java DSL Configuration)

以下示例展示了如何使用 Java DSL 配置出站 UDP 适配器:

The following example shows how to configure an outbound UDP adapter with the Java DSL:

@Bean
public IntegrationFlow udpOutFlow() {
    return f -> f.handle(Udp.outboundAdapter("localhost", 1234)
                    .configureSocket(socket -> socket.setTrafficClass(0x10)))
                .get();
}

Inbound UDP Adapters (XML Configuration)

以下示例展示了如何配置一个基本的单播入站 UDP 通道适配器。

The following example shows how to configure a basic unicast inbound udp channel adapter.

<int-ip:udp-inbound-channel-adapter id="udpReceiver"
    channel="udpOutChannel"
    port="11111"
    receive-buffer-size="500"
    multicast="false"
    socket-customizer="udpCustomizer"
    check-length="true"/>

以下示例展示了如何配置一个基本的多播入站 UDP 通道适配器:

The following example shows how to configure a basic multicast inbound udp channel adapter:

<int-ip:udp-inbound-channel-adapter id="udpReceiver"
    channel="udpOutChannel"
    port="11111"
    receive-buffer-size="500"
    multicast="true"
    multicast-address="225.6.7.8"
    check-length="true"/>

默认情况下,不针对入站数据包执行反向 DNS 查找:在未配置 DNS(例如,Docker 容器)的环境中,这会导致连接延迟。为了将 IP 地址转换为用于消息标头的主机名,可以通过将 lookup-host 属性设置为 true 来覆盖默认行为。

By default, reverse DNS lookups are not performed on inbound packets: in environments where DNS is not configured (e.g. Docker containers), this can cause connection delays. To convert IP addresses to host names for use in message headers, the default behavior can be overridden by setting the lookup-host attribute to true.

从版本 5.3.3 开始,您可以添加一个 SocketCustomizer bean 来修改在创建 DatagramSocket 之后。它用于接收套接字和任何用于发送 ack 的创建的套接字。

Starting with version 5.3.3, you can add a SocketCustomizer bean to modify the DatagramSocket after it is created. It is called for the receiving socket and any sockets created for sending acks.

Inbound UDP Adapters (Java Configuration)

以下示例展示了如何使用 Java 配置入站 UDP 适配器:

The following example shows how to configure an inbound UDP adapter with Java:

@Bean
public UnicastReceivingChannelAdapter udpIn() {
    UnicastReceivingChannelAdapter adapter = new UnicastReceivingChannelAdapter(11111);
    adapter.setOutputChannelName("udpChannel");
    return adapter;
}

以下示例展示了如何使用 Java DSL 配置入站 UDP 适配器:

The following example shows how to configure an inbound UDP adapter with the Java DSL:

Inbound UDP Adapters (Java DSL Configuration)

@Bean
public IntegrationFlow udpIn() {
    return IntegrationFlow.from(Udp.inboundAdapter(11111))
            .channel("udpChannel")
            .get();
}

Server Listening Events

从版本 5.0.2 开始,当入站适配器启动并开始监听时,会发送一个 UdpServerListeningEvent。当适配器配置为在端口 0 上进行监听时,这非常有用,这意味着操作系统选择了该端口。如果您需要在连接到套接字的其他一些进程启动之前进行等待,也可以使用它代替轮询 isListening()

Starting with version 5.0.2, a UdpServerListeningEvent is emitted when an inbound adapter is started and has begun listening. This is useful when the adapter is configured to listen on port 0, meaning that the operating system chooses the port. It can also be used instead of polling isListening(), if you need to wait before starting some other process that will connect to the socket.

Advanced Outbound Configuration

<int-ip:udp-outbound-channel-adapter> (UnicastSendingMessageHandler) 有 destination-expressionsocket-expression 选项。

The <int-ip:udp-outbound-channel-adapter> (UnicastSendingMessageHandler) has destination-expression and socket-expression options.

您可以将 destination-expression 用作硬编码 host-port 对的运行时替代,以根据 requestMessage 确定发往数据报数据包的目标地址(其中评估上下文使用根对象)。表达式必须计算为 URI、URI 样式中的 String(参见 RFC-2396)或 SocketAddress。您还可以对此表达式使用入站 IpHeaders.PACKET_ADDRESS 标头。在此框架中,DatagramPacketMessageMapper 在我们以消息形式接收并转换数据报时填充此标头。标头值恰好是输入数据报的 DatagramPacket.getSocketAddress() 的结果。

You can use the destination-expression as a runtime alternative to the hardcoded host-port pair to determine the destination address for the outgoing datagram packet against a requestMessage (with the root object for the evaluation context). The expression must evaluate to an URI, a String in the URI style (see RFC-2396), or a SocketAddress. You can also use the inbound IpHeaders.PACKET_ADDRESS header for this expression. In the framework, the DatagramPacketMessageMapper populates this header when we receive datagrams in the UnicastReceivingChannelAdapter and convert them to messages. The header value is exactly the result of DatagramPacket.getSocketAddress() of the incoming datagram.

使用 socket-expression,出站通道适配器可以使用(例如)一个入站通道适配器套接字通过其接收数据的相同端口发送数据报。在我们的应用程序充当 UDP 服务器并且客户端在网络地址转换 (NAT) 的后面操作的情况下,这非常有用。该表达式必须评估为 DatagramSocketrequestMessage 用作评估上下文的根对象。您不能将 socket-expression 参数与 multicastacknowledge 参数一起使用。以下示例展示了如何配置一个带有转换器的 UDP 入站通道适配器,该转换器转换为大写并使用套接字:

With the socket-expression, the outbound channel adapter can use (for example) an inbound channel adapter socket to send datagrams through the same port which they were received. It is useful in a scenario where our application works as a UDP server and clients operate behind network address translation (NAT). This expression must evaluate to a DatagramSocket. The requestMessage is used as the root object for the evaluation context. You cannot use the socket-expression parameter with the multicast and acknowledge parameters. The following example shows how to configure a UDP inbound channel adapter with a transformer that converts to upper case and uses a socket:

<int-ip:udp-inbound-channel-adapter id="inbound" port="0" channel="in" />

<int:channel id="in" />

<int:transformer expression="new String(payload).toUpperCase()"
                       input-channel="in" output-channel="out"/>

<int:channel id="out" />

<int-ip:udp-outbound-channel-adapter id="outbound"
                        socket-expression="@inbound.socket"
                        destination-expression="headers['ip_packetAddress']"
                        channel="out" />

以下示例展示了使用 Java DSL 的等效配置:

The following example shows the equivalent configuration with the Java DSL:

@Bean
public IntegrationFlow udpEchoUpcaseServer() {
    return IntegrationFlow.from(Udp.inboundAdapter(11111).id("udpIn"))
            .<byte[], String>transform(p -> new String(p).toUpperCase())
            .handle(Udp.outboundAdapter("headers['ip_packetAddress']")
                    .socketExpression("@udpIn.socket"))
            .get();
}