STOMP Support
Spring Integration version 4.2 introduced STOMP (Simple Text Orientated Messaging Protocol) client support.
It is based on the architecture, infrastructure, and API of the stomp package in the Spring Framework's messaging module.
Spring Integration uses many of the Spring STOMP components (such as StompSession and StompClientSupport).
For more information, see the STOMP Support chapter in the Spring Framework reference manual.
You need to include this dependency in your project:
Maven:
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-stomp</artifactId>
    <version>{project-version}</version>
</dependency>
Gradle:
compile "org.springframework.integration:spring-integration-stomp:{project-version}"
For server-side components, you need to add the org.springframework:spring-websocket and/or io.projectreactor.netty:reactor-netty dependencies.
Overview
To configure STOMP, you should start with the STOMP client object. The Spring Framework provides the following implementations:
- WebSocketStompClient: Built on the Spring WebSocket API with support for standard JSR-356 WebSocket, Jetty 9, and SockJS for HTTP-based WebSocket emulation with SockJS Client.
- ReactorNettyTcpStompClient: Built on ReactorNettyTcpClient from the reactor-netty project.
You can provide any other StompClientSupport implementation.
See the Javadoc of those classes.
The StompClientSupport class is designed as a factory that produces a StompSession for the provided StompSessionHandler. All the remaining work is done through callbacks to that StompSessionHandler and the StompSession abstraction.
With the Spring Integration adapter abstraction, we need a managed shared object that represents our application as a STOMP client with its own unique session.
For this purpose, Spring Integration provides the StompSessionManager abstraction, which manages a single StompSession shared among any provided StompSessionHandler instances.
This allows the use of inbound or outbound channel adapters (or both) for a particular STOMP broker.
See the Javadoc for StompSessionManager (and its implementations) for more information.
STOMP Inbound Channel Adapter
The StompInboundChannelAdapter is a one-stop MessageProducer component that subscribes your Spring Integration application to the provided STOMP destinations and receives messages from them (converted from the STOMP frames by using the provided MessageConverter on the connected StompSession).
You can change the destinations (and therefore the STOMP subscriptions) at runtime by using the appropriate operations, annotated with @ManagedOperation, on the StompInboundChannelAdapter.
For more configuration options, see STOMP Namespace Support and the StompInboundChannelAdapter Javadoc.
STOMP Outbound Channel Adapter
The StompMessageHandler is the MessageHandler for the <int-stomp:outbound-channel-adapter> and is used to send the outgoing Message<?> instances to the STOMP destination (pre-configured or determined at runtime with a SpEL expression) through the StompSession (which is provided by the shared StompSessionManager).
For more configuration options, see STOMP Namespace Support and the StompMessageHandler Javadoc.
STOMP Headers Mapping
The STOMP protocol provides headers as part of its frame. The entire structure of the STOMP frame has the following format:
....
COMMAND
header1:value1
header2:value2

Body^@
....
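As a rough illustration of this layout, the following plain-Java sketch serializes and parses a frame by hand. It is not the codec that Spring uses: the class and method names (StompFrameSketch, encode, parseHeaders, parseBody) are hypothetical, and header escaping and binary bodies are ignored. In the STOMP specification, a blank line separates the headers from the body, and ^@ denotes the NULL octet that terminates the frame.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Plain-Java sketch of the STOMP frame layout; hypothetical helper, not the Spring codec. */
final class StompFrameSketch {

    /** The frame terminator, rendered as ^@ in the frame listing. */
    static final char NULL_OCTET = '\0';

    /** Serializes a frame: command line, header lines, blank line, body, NULL octet. */
    static String encode(String command, Map<String, String> headers, String body) {
        StringBuilder frame = new StringBuilder(command).append('\n');
        headers.forEach((name, value) -> frame.append(name).append(':').append(value).append('\n'));
        frame.append('\n'); // the blank line separates the headers from the body
        return frame.append(body).append(NULL_OCTET).toString();
    }

    /** Extracts the headers of a frame produced by encode(). */
    static Map<String, String> parseHeaders(String frame) {
        Map<String, String> headers = new LinkedHashMap<>();
        String head = frame.substring(0, frame.indexOf("\n\n"));
        String[] lines = head.split("\n");
        for (int i = 1; i < lines.length; i++) { // line 0 is the command
            int colon = lines[i].indexOf(':');
            headers.put(lines[i].substring(0, colon), lines[i].substring(colon + 1));
        }
        return headers;
    }

    /** Extracts the body: everything between the blank line and the NULL octet. */
    static String parseBody(String frame) {
        int start = frame.indexOf("\n\n") + 2;
        return frame.substring(start, frame.indexOf(NULL_OCTET, start));
    }
}
```

For example, encoding a SEND command with a destination header and the body hello yields a frame whose headers and body can be read back with parseHeaders() and parseBody().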
Spring Framework provides StompHeaders to represent these headers.
See the Javadoc for more details.
STOMP frames are converted to and from Message<?> instances and these headers are mapped to and from MessageHeaders instances.
Spring Integration provides a default HeaderMapper implementation for the STOMP adapters.
The implementation is StompHeaderMapper.
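It provides fromHeaders() and toHeaders() operations: the outbound adapter uses fromHeaders() to populate the STOMP headers from MessageHeaders, and the inbound adapter uses toHeaders() to populate MessageHeaders from the STOMP headers.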
As with many other Spring Integration modules, the IntegrationStompHeaders class has been introduced to map standard STOMP headers to MessageHeaders, with stomp_ as the header name prefix.
In addition, all MessageHeaders entries with that prefix are mapped to the StompHeaders when sending to a destination.
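The prefix convention can be pictured with a deliberately simplified plain-Java model. This is not the StompHeaderMapper implementation: the class and method names are hypothetical, and the real mapper also decides which headers are selected and how individual names are spelled. The sketch only shows a stomp_ prefix being added in one direction and stripped in the other.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Simplified model of the stomp_ prefix convention; not the StompHeaderMapper code. */
final class StompPrefixSketch {

    static final String PREFIX = "stomp_";

    /** Inbound direction: STOMP frame headers become prefixed message headers. */
    static Map<String, Object> toMessageHeaders(Map<String, String> stompHeaders) {
        Map<String, Object> messageHeaders = new LinkedHashMap<>();
        stompHeaders.forEach((name, value) -> messageHeaders.put(PREFIX + name, value));
        return messageHeaders;
    }

    /** Outbound direction: only prefixed message headers are copied, with the prefix removed. */
    static Map<String, String> toStompHeaders(Map<String, Object> messageHeaders) {
        Map<String, String> stompHeaders = new LinkedHashMap<>();
        messageHeaders.forEach((name, value) -> {
            if (name.startsWith(PREFIX)) {
                stompHeaders.put(name.substring(PREFIX.length()), String.valueOf(value));
            }
        });
        return stompHeaders;
    }
}
```

In this model, a frame header named destination surfaces as stomp_destination on the message, and an unprefixed message header is left out of the outgoing frame.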
For more information, see the Javadoc for those classes and the mapped-headers attribute description in STOMP Namespace Support.
STOMP Integration Events
Many STOMP operations are asynchronous, including error handling.
For example, STOMP has a RECEIPT server frame that it returns when a client frame has requested one by adding the receipt header.
To provide access to these asynchronous events, Spring Integration emits StompIntegrationEvent instances, which you can obtain by implementing an ApplicationListener or by using an <int-event:inbound-channel-adapter> (see Receiving Spring Application Events).
Specifically, a StompExceptionEvent is emitted from the AbstractStompSessionManager when a stompSessionListenableFuture receives onFailure() because of a failure to connect to the STOMP broker.
Another example is the StompMessageHandler.
It processes ERROR STOMP frames, which are server responses to improper (unaccepted) messages sent by this StompMessageHandler.
The StompMessageHandler emits a StompReceiptEvent as part of the StompSession.Receiptable callbacks, which carry the asynchronous answers for the messages sent to the StompSession.
The StompReceiptEvent can be positive or negative, depending on whether the RECEIPT frame was received from the server within the receiptTimeLimit period, which you can configure on the StompClientSupport instance.
It defaults to 15 * 1000 (in milliseconds, so 15 seconds).
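The positive or negative outcome can be modeled with a plain CompletableFuture. The sketch below is only an analogy for the timing rule described above, not how Spring implements receipts: ReceiptTimeoutSketch and awaitReceipt are hypothetical names, and the receiptFrame future stands in for the arrival of the RECEIPT frame.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

/** Models the positive/negative receipt outcome; an analogy, not the Spring implementation. */
final class ReceiptTimeoutSketch {

    /**
     * Completes with true when the receipt arrives within the limit
     * and with false (a negative, or lost, receipt) when the limit elapses first.
     */
    static CompletableFuture<Boolean> awaitReceipt(CompletableFuture<Void> receiptFrame,
            long receiptTimeLimitMillis) {
        return receiptFrame
                .thenApply(ignored -> true)
                .completeOnTimeout(false, receiptTimeLimitMillis, TimeUnit.MILLISECONDS);
    }
}
```

A future that is already complete produces a positive result, while one that never completes produces a negative result once the time limit has passed.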
仅当要发送的消息的 |
The |
See STOMP Adapters Java Configuration for more information on how to configure Spring Integration to accept those ApplicationEvent instances.
STOMP Adapters Java Configuration
The following example shows a comprehensive Java configuration for STOMP adapters:
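import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.event.inbound.ApplicationEventListeningMessageProducer;
import org.springframework.integration.stomp.ReactorNettyTcpStompSessionManager;
import org.springframework.integration.stomp.StompSessionManager;
import org.springframework.integration.stomp.event.StompIntegrationEvent;
import org.springframework.integration.stomp.inbound.StompInboundChannelAdapter;
import org.springframework.integration.stomp.outbound.StompMessageHandler;
import org.springframework.integration.support.converter.PassThruMessageConverter;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.PollableChannel;
import org.springframework.messaging.simp.stomp.ReactorNettyTcpStompClient;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;

@Configuration
@EnableIntegration
public class StompConfiguration {

    // STOMP client over TCP; the receipt time limit is lowered to 5 seconds
    @Bean
    public ReactorNettyTcpStompClient stompClient() {
        ReactorNettyTcpStompClient stompClient = new ReactorNettyTcpStompClient("127.0.0.1", 61613);
        stompClient.setMessageConverter(new PassThruMessageConverter());
        ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
        taskScheduler.afterPropertiesSet();
        stompClient.setTaskScheduler(taskScheduler);
        stompClient.setReceiptTimeLimit(5000);
        return stompClient;
    }

    // Shared session manager used by both adapters
    @Bean
    public StompSessionManager stompSessionManager() {
        ReactorNettyTcpStompSessionManager stompSessionManager = new ReactorNettyTcpStompSessionManager(stompClient());
        stompSessionManager.setAutoReceipt(true);
        return stompSessionManager;
    }

    @Bean
    public PollableChannel stompInputChannel() {
        return new QueueChannel();
    }

    // Inbound adapter: subscribes to /topic/myTopic
    @Bean
    public StompInboundChannelAdapter stompInboundChannelAdapter() {
        StompInboundChannelAdapter adapter =
                new StompInboundChannelAdapter(stompSessionManager(), "/topic/myTopic");
        adapter.setOutputChannel(stompInputChannel());
        return adapter;
    }

    // Outbound adapter: sends messages from stompOutputChannel to /topic/myTopic
    @Bean
    @ServiceActivator(inputChannel = "stompOutputChannel")
    public MessageHandler stompMessageHandler() {
        StompMessageHandler handler = new StompMessageHandler(stompSessionManager());
        handler.setDestination("/topic/myTopic");
        return handler;
    }

    @Bean
    public PollableChannel stompEvents() {
        return new QueueChannel();
    }

    // Routes StompIntegrationEvent instances to the stompEvents channel
    @Bean
    public ApplicationListener<ApplicationEvent> stompEventListener() {
        ApplicationEventListeningMessageProducer producer = new ApplicationEventListeningMessageProducer();
        producer.setEventTypes(StompIntegrationEvent.class);
        producer.setOutputChannel(stompEvents());
        return producer;
    }

}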
STOMP Namespace Support
The Spring Integration STOMP namespace implements the inbound and outbound channel adapter components. To include it in your configuration, provide the following namespace declaration in your application context configuration file:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:int="http://www.springframework.org/schema/integration"
    xmlns:int-stomp="http://www.springframework.org/schema/integration/stomp"
    xsi:schemaLocation="
        http://www.springframework.org/schema/beans
        https://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/integration
        https://www.springframework.org/schema/integration/spring-integration.xsd
        http://www.springframework.org/schema/integration/stomp
        https://www.springframework.org/schema/integration/stomp/spring-integration-stomp.xsd">
    ...
</beans>
Understanding the <int-stomp:outbound-channel-adapter> Element
The following listing shows the available attributes for the STOMP outbound channel adapter:
<int-stomp:outbound-channel-adapter
        id=""                       <1>
        channel=""                  <2>
        stomp-session-manager=""    <3>
        header-mapper=""            <4>
        mapped-headers=""           <5>
        destination=""              <6>
        destination-expression=""   <7>
        auto-startup=""             <8>
        phase=""/>                  <9>
<1> The component bean name.
The MessageHandler is registered with a bean alias of id plus .handler.
If you do not set the channel attribute, a DirectChannel is created and registered in the application context with the value of this id attribute as the bean name.
In this case, the endpoint is registered with a bean name of id plus .adapter.
<2> Identifies the channel attached to this adapter if id is present.
See id.
Optional.
<3> Reference to a StompSessionManager bean, which encapsulates the low-level connection and StompSession handling operations.
Required.
<4> Reference to a bean that implements HeaderMapper<StompHeaders>, which maps Spring Integration MessageHeaders to and from STOMP frame headers.
It is mutually exclusive with mapped-headers.
It defaults to StompHeaderMapper.
<5> Comma-separated list of names of STOMP headers to be mapped to the STOMP frame headers.
It can be provided only if the header-mapper reference is not set.
The values in this list can also be simple patterns to be matched against the header names (such as myheader* or *myheader).
A special token (STOMP_OUTBOUND_HEADERS) represents all the standard STOMP headers (content-length, receipt, heart-beat, and so on).
They are included by default.
If you want to add your own headers and want the standard headers to also be mapped, you must include this token or provide your own HeaderMapper implementation by using header-mapper.
<6> Name of the destination to which STOMP messages are sent.
It is mutually exclusive with destination-expression.
<7> A SpEL expression to be evaluated at runtime against each Spring Integration Message as the root object.
It is mutually exclusive with destination.
<8> Boolean value indicating whether this endpoint should start automatically.
It defaults to true.
<9> The lifecycle phase within which this endpoint should start and stop.
The lower the value, the earlier this endpoint starts and the later it stops.
The default is Integer.MIN_VALUE.
Values can be negative.
See SmartLifecycle.
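To make the simple patterns accepted by mapped-headers more concrete, here is a plain-Java sketch of wildcard matching in which * stands for any run of characters. It illustrates the idea only and is not the framework's matching code. HeaderPatternSketch, simpleMatch, and isMapped are hypothetical names, and special tokens such as STOMP_OUTBOUND_HEADERS are not handled.

```java
import java.util.List;
import java.util.regex.Pattern;

/** Illustrative wildcard matching for header names; hypothetical helper, not framework code. */
final class HeaderPatternSketch {

    /** Returns true if headerName matches the pattern, where * matches any run of characters. */
    static boolean simpleMatch(String pattern, String headerName) {
        String[] literals = pattern.split("\\*", -1); // keep empty leading/trailing parts
        StringBuilder regex = new StringBuilder();
        for (int i = 0; i < literals.length; i++) {
            if (i > 0) {
                regex.append(".*"); // each * in the pattern becomes "any characters"
            }
            regex.append(Pattern.quote(literals[i])); // everything else is matched literally
        }
        return Pattern.matches(regex.toString(), headerName);
    }

    /** Returns true if any pattern in the list matches the header name. */
    static boolean isMapped(List<String> patterns, String headerName) {
        return patterns.stream().anyMatch(pattern -> simpleMatch(pattern, headerName));
    }
}
```

With the patterns myheader* and *myheader, the names myheader1 and x-myheader match respectively, while an unrelated name such as other does not.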
Understanding the <int-stomp:inbound-channel-adapter> Element
The following listing shows the available attributes for the STOMP inbound channel adapter:
<int-stomp:inbound-channel-adapter
        id=""                       <1>
        channel=""                  <2>
        error-channel=""            <3>
        stomp-session-manager=""    <4>
        header-mapper=""            <5>
        mapped-headers=""           <6>
        destinations=""             <7>
        send-timeout=""             <8>
        payload-type=""             <9>
        auto-startup=""             <10>
        phase=""/>                  <11>
<1> The component bean name.
If you do not set the channel attribute, a DirectChannel is created and registered in the application context with the value of this id attribute as the bean name.
In this case, the endpoint is registered with the bean name id plus .adapter.
<2> Identifies the channel attached to this adapter.
<3> The MessageChannel bean reference to which ErrorMessage instances should be sent.
<4> See the same option on the <int-stomp:outbound-channel-adapter>.
<5> See the same option on the <int-stomp:outbound-channel-adapter>.
<6> Comma-separated list of names of STOMP headers to be mapped from the STOMP frame headers.
You can only provide this if the header-mapper reference is not set.
The values in this list can also be simple patterns to be matched against the header names (for example, myheader* or *myheader).
A special token (STOMP_INBOUND_HEADERS) represents all the standard STOMP headers (content-length, receipt, heart-beat, and so on).
They are included by default.
If you want to add your own headers and want the standard headers to also be mapped, you must also include this token or provide your own HeaderMapper implementation by using header-mapper.
<7> Comma-separated list of STOMP destination names to subscribe to.
The list of destinations (and therefore subscriptions) can be modified at runtime through the addDestination() and removeDestination() operations, which are annotated with @ManagedOperation.
<8> Maximum amount of time (in milliseconds) to wait when sending a message to the channel if the channel can block.
For example, a QueueChannel can block until space is available if its maximum capacity has been reached.
<9> Fully qualified name of the Java type for the target payload to convert from the incoming STOMP frame.
It defaults to String.class.
<10> See the same option on the <int-stomp:outbound-channel-adapter>.
<11> See the same option on the <int-stomp:outbound-channel-adapter>.