ZeroMQ Support
Spring Integration 提供组件以支持应用程序中的 ZeroMQ 通信。该实现基于 JeroMQ 库的良好支持的 Java API。所有组件都封装了 ZeroMQ 套接字生命周期,并对它们内部管理线程,从而使与这些组件的交互无锁和线程安全。
Spring Integration provides components to support ZeroMQ communication in the application. The implementation is based on the well-supported Java API of the JeroMQ library. All components encapsulate ZeroMQ socket lifecycles and manage threads for them internally making interactions with these components lock-free and thread-safe.
你需要将此依赖项包含在你的项目中:
You need to include this dependency into your project:
-
Maven
-
Gradle
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-zeromq</artifactId>
<version>{project-version}</version>
</dependency>
compile "org.springframework.integration:spring-integration-zeromq:{project-version}"
ZeroMQ Proxy
ZeroMqProxy
是内置 ZMQ.proxy()
function 的 Spring 友好封装。它封装了套接字生命周期和线程管理。此代理程序的客户端仍然可以使用标准的 ZeroMQ 套接字连接和交互 API。除了标准 ZContext
之外,它还需要众所周知的 ZeroMQ 代理模式之一:SUB/PUB、PULL/PUSH 或 ROUTER/DEALER。这样就为代理程序的前端和后端使用了一对适当的 ZeroMQ 套接字类型。有关详细信息,请参阅 ZeroMqProxy.Type
。
The ZeroMqProxy
is a Spring-friendly wrapper for the built-in ZMQ.proxy()
function.
It encapsulates socket lifecycles and thread management.
The clients of this proxy still can use a standard ZeroMQ socket connection and interaction API.
Alongside with the standard ZContext
it requires one of the well-known ZeroMQ proxy modes: SUB/PUB, PULL/PUSH or ROUTER/DEALER.
This way an appropriate pair of ZeroMQ socket types are used for the frontend and backend of the proxy.
See ZeroMqProxy.Type
for details.
ZeroMqProxy
实现 SmartLifecycle
来创建、绑定和配置套接字,以及在来自 Executor
(如果有的话)的专用线程中启动 ZMQ.proxy()
。前端和后端套接字的绑定通过 tcp://
协议在所有可用的网络接口上进行,并使用提供的端口。否则,它们会绑定到稍后可以通过相应的 getFrontendPort()
和 getBackendPort()
API 方法获得的随机端口。
The ZeroMqProxy
implements SmartLifecycle
to create, bind and configure the sockets and to start ZMQ.proxy()
in a dedicated thread from an Executor
(if any).
The binding for frontend and backend sockets is done over the tcp://
protocol onto all of the available network interfaces with the provided ports.
Otherwise, they are bound to random ports which can be obtained later via the respective getFrontendPort()
and getBackendPort()
API methods.
控制套接字公开为 SocketType.PAIR
,在 "inproc://" + beanName + ".control"
地址上进行线程间传输;可以通过 getControlAddress()
获得它。它应当与来自另一个 SocketType.PAIR
套接字的相同应用程序一起使用,以发送 ZMQ.PROXY_TERMINATE
、ZMQ.PROXY_PAUSE
和/或 ZMQ.PROXY_RESUME
命令。当调用 stop()
来终止 ZMQ.proxy()
循环并正常关闭所有绑定的套接字时,ZeroMqProxy
会执行 ZMQ.PROXY_TERMINATE
命令。
The control socket is exposed as a SocketType.PAIR
with an inter-thread transport on the "inproc://" + beanName + ".control"
address; it can be obtained via getControlAddress()
.
It should be used with the same application from another SocketType.PAIR
socket to send ZMQ.PROXY_TERMINATE
, ZMQ.PROXY_PAUSE
and/or ZMQ.PROXY_RESUME
commands.
The ZeroMqProxy
performs a ZMQ.PROXY_TERMINATE
command when stop()
is called for its lifecycle to terminate the ZMQ.proxy()
loop and close all the bound sockets gracefully.
setExposeCaptureSocket(boolean)
选项会导致此组件使用 SocketType.PUB
绑定一个额外的线程间套接字,以便捕获和发布前端和后端套接字之间的所有通信,正如 ZMQ.proxy()
实现所述。此套接字绑定到 "inproc://" + beanName + ".capture"
地址,并且不要求任何特定订阅进行筛选。
The setExposeCaptureSocket(boolean)
option causes this component to bind an additional inter-thread socket with SocketType.PUB
to capture and publish all the communication between the frontend and backend sockets as it states with ZMQ.proxy()
implementation.
This socket is bound to the "inproc://" + beanName + ".capture"
address and doesn’t expect any specific subscription for filtering.
可以通过其他属性,如读/写超时或安全性来定制前端和后端套接字。这种定制可分别通过 setFrontendSocketConfigurer(Consumer<ZMQ.Socket>)
和 setBackendSocketConfigurer(Consumer<ZMQ.Socket>)
回调获得。
The frontend and backend sockets can be customized with additional properties, such as read/write timeout or security.
This customization is available through setFrontendSocketConfigurer(Consumer<ZMQ.Socket>)
and setBackendSocketConfigurer(Consumer<ZMQ.Socket>)
callbacks, respectively.
ZeroMqProxy
可作为以下的简单 Bean 提供:
The ZeroMqProxy
could be provided as simple bean like this:
@Bean
ZeroMqProxy zeroMqProxy() {
ZeroMqProxy proxy = new ZeroMqProxy(CONTEXT, ZeroMqProxy.Type.SUB_PUB);
proxy.setExposeCaptureSocket(true);
proxy.setFrontendPort(6001);
proxy.setBackendPort(6002);
return proxy;
}
所有客户端节点都应通过 tcp://
连接到此代理的主机,并使用它们各自感兴趣的端口。
All the client nodes should connect to the host of this proxy via tcp://
and use the respective port of their interest.
ZeroMQ Message Channel
ZeroMqChannel
是一个 SubscribableChannel
,它使用一对 ZeroMQ 套接字来连接发布者和订阅者,以进行消息传递交互。它可以在 PUB/SUB 模式下工作(默认为 PUSH/PULL);它还用作本地线程间通道(使用 PAIR
套接字) - 在这种情况下,不提供 connectUrl
。在分布式模式下,它必须连接到外部管理的 ZeroMQ 代理,在那里它可以与连接到同一代理的其他类似通道交换消息。连接 URL 选项是一个标准的 ZeroMQ 连接字符串,其中包含协议和主机,以及冒号分隔的用作 ZeroMQ 代理前端和后端套接字的一对端口。为了方便,如果在与代理相同的应用程序中配置通道,则可以使用 ZeroMqProxy
实例(而不是连接字符串)来提供通道。
The ZeroMqChannel
is a SubscribableChannel
which uses a pair of ZeroMQ sockets to connect publishers and subscribers for messaging interaction.
It can work in a PUB/SUB mode (defaults to PUSH/PULL); it can also be used as a local inter-thread channel (uses PAIR
sockets) - the connectUrl
is not provided in this case.
In distributed mode it has to be connected to an externally managed ZeroMQ proxy, where it can exchange messages with other similar channels connected to the same proxy.
The connection url option is a standard ZeroMQ connection string with the protocol and host and a pair of ports over colon for frontend and backend sockets of the ZeroMQ proxy.
For convenience, the channel could be supplied with the ZeroMqProxy
instance instead of connection string, if it is configured in the same application as the proxy.
发送和接收套接字在其各自的专用线程中进行管理,这使得此通道对并行处理很友好。这样,我们可以从不同的线程向/从 ZeroMqChannel
发布和使用数据,而无需同步。
Both sending and receiving sockets are managed in their own dedicated threads making this channel concurrency-friendly.
This way we can publish and consume to/from a ZeroMqChannel
from different threads without synchronization.
默认情况下,ZeroMqChannel
使用 EmbeddedJsonHeadersMessageMapper
来使用 Jackson JSON 处理程序从 byte[]
(de)序列化 Message
(包括头信息)。此逻辑可以通过 `setMessageMapper(BytesMessageMapper)`进行配置。
By default, the ZeroMqChannel
uses an EmbeddedJsonHeadersMessageMapper
to (de)serialize the Message
(including headers) from/to byte[]
using a Jackson JSON processor.
This logic can be configured via setMessageMapper(BytesMessageMapper)
.
通过各个 setSendSocketConfigurer(Consumer<ZMQ.Socket>)
和 setSubscribeSocketConfigurer(Consumer<ZMQ.Socket>)
回调,可以为任何选项(读/写超时、安全性等)自定义发送和接收套接字。
Sending and receiving sockets can be customized for any options (read/write timeout, security etc.) via respective setSendSocketConfigurer(Consumer<ZMQ.Socket>)
and setSubscribeSocketConfigurer(Consumer<ZMQ.Socket>)
callbacks.
ZeroMqChannel
的内部逻辑基于响应式流,通过项目反应器 Flux
和 Mono
操作符。这提供了更简单的线程控制,并允许对通道进行无锁并发发布和消费。本地 PUB/SUB 逻辑实现为 Flux.publish()
操作符,以允许该通道的所有本地订阅者接收相同的已发布消息,作为 PUB
套接字的分布式订阅者。
The internal logic of the ZeroMqChannel
is based on the reactive streams via Project Reactor Flux
and Mono
operators.
This provides easier threading control and allows lock-free concurrent publication and consumption to/from the channel.
Local PUB/SUB logic is implemented as a Flux.publish()
operator to allow all of the local subscribers to this channel to receive the same published message, as distributed subscribers to the PUB
socket.
以下是一个 ZeroMqChannel
配置的简单示例:
The following is a simple example of a ZeroMqChannel
configuration:
@Bean
ZeroMqChannel zeroMqPubSubChannel(ZContext context) {
ZeroMqChannel channel = new ZeroMqChannel(context, true);
channel.setConnectUrl("tcp://localhost:6001:6002");
channel.setConsumeDelay(Duration.ofMillis(100));
return channel;
}
ZeroMQ Inbound Channel Adapter
ZeroMqMessageProducer
是一种带有响应式语义的 MessageProducerSupport
实现。它以非阻塞方式持续读取 ZeroMQ 套接字中的数据,并将消息发布到无限 Flux
中,该 Flux
由 FluxMessageChannel
订阅,或者如果输出通道不是响应式,则在 start()
方法中明确订阅。当套接字上没有收到数据时,在下次读取尝试之前应用 consumeDelay
(默认为 1 秒)。
The ZeroMqMessageProducer
is a MessageProducerSupport
implementation with reactive semantics.
It constantly reads the data from a ZeroMQ socket in a non-blocking manner and publishes the messages to an infinite Flux
which is subscribed to by a FluxMessageChannel
or explicitly in the start()
method, if the output channel is not reactive.
When no data are received on the socket, a consumeDelay
(defaults to 1 second) is applied before the next read attempt.
ZeroMqMessageProducer
仅支持 SocketType.PAIR
、SocketType.PULL
和 SocketType.SUB
。此组件可以连接到远程套接字或使用提供的或随机端口绑定到 TCP 协议。可以在此组件启动后通过 getBoundPort()
获取实际端口,并且 ZeroMQ 套接字已绑定。套接字选项(例如安全性或写超时)可以通过 setSocketConfigurer(Consumer<ZMQ.Socket> socketConfigurer)
回调进行配置。
Only SocketType.PAIR
, SocketType.PULL
and SocketType.SUB
are supported by the ZeroMqMessageProducer
.
This component can connect to the remote socket or bind onto TCP protocol with the provided or random port.
The actual port can be obtained via getBoundPort()
after this component is started and ZeroMQ socket is bound.
The socket options (e.g. security or write timeout) can be configured via setSocketConfigurer(Consumer<ZMQ.Socket> socketConfigurer)
callback.
如果 receiveRaw
选项设置为 true
,则从套接字中获取的 ZMsg
会按原样发送到生成消息的有效负载中:由下游流程解析并转换 ZMsg
。否则,InboundMessageMapper
用于将获取的数据转换为 Message
。如果接收到的 ZMsg
是多帧的,则第一帧将被视为 ZeroMQ 消息发布到的 ZeroMqHeaders.TOPIC
头。
If the receiveRaw
option is set to true
, a ZMsg
, consumed from the socket, is sent as is in the payload of the produced Message
: it’s up to the downstream flow to parse and convert the ZMsg
.
Otherwise, an InboundMessageMapper
is used to convert the consumed data into a Message
.
If the received ZMsg
is multi-frame, the first frame is treated as the ZeroMqHeaders.TOPIC
header this ZeroMQ message was published to.
对于 SocketType.SUB
,ZeroMqMessageProducer
使用提供的 topics
选项进行订阅;默认为订阅所有。可以通过 subscribeToTopics()
和 unsubscribeFromTopics()
@ManagedOperation
在运行时调整订阅。
With SocketType.SUB
, the ZeroMqMessageProducer
uses the provided topics
option for subscriptions; defaults to subscribe to all.
Subscriptions can be adjusted at runtime using subscribeToTopics()
and unsubscribeFromTopics()
@ManagedOperation
s.
以下是 ZeroMqMessageProducer
配置的示例:
Here is a sample of ZeroMqMessageProducer
configuration:
@Bean
ZeroMqMessageProducer zeroMqMessageProducer(ZContext context, MessageChannel outputChannel) {
ZeroMqMessageProducer messageProducer = new ZeroMqMessageProducer(context, SocketType.SUB);
messageProducer.setOutputChannel(outputChannel);
messageProducer.setTopics("some");
messageProducer.setReceiveRaw(true);
messageProducer.setBindPort(7070);
messageProducer.setConsumeDelay(Duration.ofMillis(100));
return messageProducer;
}
ZeroMQ Outbound Channel Adapter
ZeroMqMessageHandler
是一种 ReactiveMessageHandler
实现,用于将发布消息生成到 ZeroMQ 套接字中。只支持 SocketType.PAIR
、SocketType.PUSH
和 SocketType.PUB
。ZeroMqMessageHandler
只支持连接 ZeroMQ 套接字;不支持绑定。当使用 SocketType.PUB
时,将根据请求消息评估 topicExpression
,以在 ZeroMQ 消息中注入主题帧(如果它不为 null)。订阅方 (SocketType.SUB
) 必须先接收主题帧,然后才能解析实际的数据。当请求消息的有效负载为 ZMsg
时,不执行转换或主题提取:ZMsg
被原样发送到套接字中,并且不会将其销毁以备后续可能的重复使用。否则,使用 OutboundMessageMapper<byte[]>
将请求消息(或仅其有效负载)转换为 ZeroMQ 帧以发布。默认情况下,使用带有 ConfigurableCompositeMessageConverter
的 ConvertingBytesMessageMapper
。套接字选项(例如安全性或写超时)可以通过 setSocketConfigurer(Consumer<ZMQ.Socket> socketConfigurer)
回调进行配置。
The ZeroMqMessageHandler
is a ReactiveMessageHandler
implementation to produce publish messages into a ZeroMQ socket.
Only SocketType.PAIR
, SocketType.PUSH
and SocketType.PUB
are supported.
The ZeroMqMessageHandler
only supports connecting the ZeroMQ socket; binding is not supported.
When the SocketType.PUB
is used, the topicExpression
is evaluated against a request message to inject a topic frame into a ZeroMQ message if it is not null.
The subscriber side (SocketType.SUB
) must receive the topic frame first before parsing the actual data.
When the payload of the request message is a ZMsg
, no conversion or topic extraction is performed: the ZMsg
is sent into a socket as is and it is not destroyed for possible further reuse.
Otherwise, an OutboundMessageMapper<byte[]>
is used to convert a request message (or just its payload) into a ZeroMQ frame to publish.
By default, a ConvertingBytesMessageMapper
is used supplied with a ConfigurableCompositeMessageConverter
.
The socket options (e.g. security or write timeout) can be configured via setSocketConfigurer(Consumer<ZMQ.Socket> socketConfigurer)
callback.
以下是 ZeroMqMessageHandler
配置的示例:
Here is a sample of ZeroMqMessageHandler
configuration:
@Bean
@ServiceActivator(inputChannel = "zeroMqPublisherChannel")
ZeroMqMessageHandler zeroMqMessageHandler(ZContext context) {
ZeroMqMessageHandler messageHandler =
new ZeroMqMessageHandler(context, "tcp://localhost:6060", SocketType.PUB);
messageHandler.setTopicExpression(
new FunctionExpression<Message<?>>((message) -> message.getHeaders().get("topic")));
messageHandler.setMessageMapper(new EmbeddedJsonHeadersMessageMapper());
}
ZeroMQ Java DSL Support
spring-integration-zeromq
通过 ZeroMq
工厂和上述组件的 IntegrationComponentSpec
实现提供了一个便捷的 Java DSL 流畅 API。
The spring-integration-zeromq
provide a convenient Java DSL fluent API via ZeroMq
factory and IntegrationComponentSpec
implementations for the components mentioned above.
以下是 ZeroMqChannel
的 Java DSL 示例:
This is a sample of Java DSL for ZeroMqChannel
:
.channel(ZeroMq.zeroMqChannel(this.context)
.connectUrl("tcp://localhost:6001:6002")
.consumeDelay(Duration.ofMillis(100)))
}
ZeroMQ Java DSL 的入站通道适配器是:
The Inbound Channel Adapter for ZeroMQ Java DSL is:
IntegrationFlow.from(
ZeroMq.inboundChannelAdapter(this.context, SocketType.SUB)
.connectUrl("tcp://localhost:9000")
.topics("someTopic")
.receiveRaw(true)
.consumeDelay(Duration.ofMillis(100)))
}
ZeroMQ Java DSL 的出站通道适配器是:
The Outbound Channel Adapter for ZeroMQ Java DSL is:
.handle(ZeroMq.outboundChannelAdapter(this.context, "tcp://localhost:9001", SocketType.PUB)
.topicFunction(message -> message.getHeaders().get("myTopic")))
}