STOMP Client

Spring 提供了 STOMP over WebSocket 客户端和 STOMP over TCP 客户端。

Spring provides a STOMP over WebSocket client and a STOMP over TCP client.

首先,您可以创建和配置 WebSocketStompClient,如下例所示:

To begin, you can create and configure WebSocketStompClient, as the following example shows:

WebSocketClient webSocketClient = new StandardWebSocketClient();
WebSocketStompClient stompClient = new WebSocketStompClient(webSocketClient);
stompClient.setMessageConverter(new StringMessageConverter());
stompClient.setTaskScheduler(taskScheduler); // for heartbeats

在前面的示例中,你可以用`SockJsClient`替换`StandardWebSocketClient`,因为这也是`WebSocketClient`的实现。SockJsClient`可以使用 WebSocket 或基于 HTTP 的传输作为后备。有关更多详细信息,请参见`SockJsClient

In the preceding example, you could replace StandardWebSocketClient with SockJsClient, since that is also an implementation of WebSocketClient. The SockJsClient can use WebSocket or HTTP-based transport as a fallback. For more details, see SockJsClient.

接下来,您可以建立连接并为 STOMP 会话提供一个处理程序,如下例所示:

Next, you can establish a connection and provide a handler for the STOMP session, as the following example shows:

String url = "ws://127.0.0.1:8080/endpoint";
StompSessionHandler sessionHandler = new MyStompSessionHandler();
stompClient.connect(url, sessionHandler);

当会话准备好使用时,会通知处理程序,如下例所示:

When the session is ready for use, the handler is notified, as the following example shows:

public class MyStompSessionHandler extends StompSessionHandlerAdapter {

	@Override
	public void afterConnected(StompSession session, StompHeaders connectedHeaders) {
		// ...
	}
}

会话建立后,可以发送任何有效负载,并使用已配置的 MessageConverter 进行序列化,如下例所示:

Once the session is established, any payload can be sent and is serialized with the configured MessageConverter, as the following example shows:

session.send("/topic/something", "payload");

您还可以订阅目的地。subscribe 方法要求一个用于订阅消息的处理程序,并返回一个可用于取消订阅的 Subscription 句柄。对于每个接收到的消息,处理程序可以指定有效负载应被反序列化到的目标 Object 类型,如下例所示:

You can also subscribe to destinations. The subscribe methods require a handler for messages on the subscription and returns a Subscription handle that you can use to unsubscribe. For each received message, the handler can specify the target Object type to which the payload should be deserialized, as the following example shows:

session.subscribe("/topic/something", new StompFrameHandler() {

	@Override
	public Type getPayloadType(StompHeaders headers) {
		return String.class;
	}

	@Override
	public void handleFrame(StompHeaders headers, Object payload) {
		// ...
	}

});

要启用 STOMP 心跳,您可以使用 TaskScheduler 配置 WebSocketStompClient 并根据需要自定义心跳间隔(写不活动 10 秒,从而导致发送心跳,读不活动 10 秒,从而关闭连接)。

To enable STOMP heartbeat, you can configure WebSocketStompClient with a TaskScheduler and optionally customize the heartbeat intervals (10 seconds for write inactivity, which causes a heartbeat to be sent, and 10 seconds for read inactivity, which closes the connection).

WebSocketStompClient 仅在无操作的情况下(即没有发送其他消息时)发送心跳。当使用外部代理时,这可能会造成困难,因为使用非代理目标的消息表示操作但实际上并未转发到代理。在这种情况下,你可以在初始化 External Broker 时配置 TaskScheduler ,确保即使仅发送具有非代理目标的消息,心跳也会转发到该代理。

WebSocketStompClient sends a heartbeat only in case of inactivity, i.e. when no other messages are sent. This can present a challenge when using an external broker since messages with a non-broker destination represent activity but aren’t actually forwarded to the broker. In that case you can configure a TaskScheduler when initializing the External Broker which ensures a heartbeat is forwarded to the broker also when only messages with a non-broker destination are sent.

当您使用 WebSocketStompClient 进行性能测试以从同一台计算机模拟数千个客户端时,请考虑关闭心跳,因为每个连接都会调度自己的心跳任务,这并未针对在同一台计算机上运行的大量客户端进行优化。

When you use WebSocketStompClient for performance tests to simulate thousands of clients from the same machine, consider turning off heartbeats, since each connection schedules its own heartbeat tasks and that is not optimized for a large number of clients running on the same machine.

STOMp 协议还支持回执,客户端必须在其中添加 receipt 头,服务器在处理完发送或订阅后使用 RECEIPT 帧对其进行响应。为了支持此功能,StompSession 提供 setAutoReceipt(boolean),这会使在以后的每个发送或订阅事件中添加 receipt 头。或者,您也可以手动向 StompHeaders 添加回执头。发送和订阅都返回 Receiptable 实例,您可以使用该实例注册回执成功和失败回调。对于此功能,您必须使用 TaskScheduler 配置客户端以及回执过期之前的时间长度(默认情况下为 15 秒)。

The STOMP protocol also supports receipts, where the client must add a receipt header to which the server responds with a RECEIPT frame after the send or subscribe are processed. To support this, the StompSession offers setAutoReceipt(boolean) that causes a receipt header to be added on every subsequent send or subscribe event. Alternatively, you can also manually add a receipt header to the StompHeaders. Both send and subscribe return an instance of Receiptable that you can use to register for receipt success and failure callbacks. For this feature, you must configure the client with a TaskScheduler and the amount of time before a receipt expires (15 seconds by default).

请注意,StompSessionHandler 本身是一个 StompFrameHandler,除了用于处理消息的 handleException 回调和用于包括 ConnectionLostException 在内的传输级错误的 handleTransportError 回调外,它还可以处理 ERROR 帧。

Note that StompSessionHandler itself is a StompFrameHandler, which lets it handle ERROR frames in addition to the handleException callback for exceptions from the handling of messages and handleTransportError for transport-level errors including ConnectionLostException.

你可以使用 WebSocketStompClientinboundMessageSizeLimitoutboundMessageSizeLimit 属性来限制入站和出站 WebSocket 消息的最大大小。当一个出站 STOMP 消息超过限制后,它会被分割成部分帧,然后接收方必须重新组装这些帧。默认情况下,对出站消息没有大小限制。当一个入站 STOMP 消息大小超过配置的限制后,将引发 StompConversionException。入站消息的默认大小限制为 64KB

You can use the inboundMessageSizeLimit and outboundMessageSizeLimit properties of WebSocketStompClient to limit the maximum size of inbound and outbound WebSocket messages. When an outbound STOMP message exceeds the limit, it is split into partial frames, which the receiver would have to reassemble. By default, there is no size limit for outbound messages. When an inbound STOMP message size exceeds the configured limit, a StompConversionException is thrown. The default size limit for inbound messages is 64KB.

WebSocketClient webSocketClient = new StandardWebSocketClient();
WebSocketStompClient stompClient = new WebSocketStompClient(webSocketClient);
stompClient.setInboundMessageSizeLimit(64 * 1024); // 64KB
stompClient.setOutboundMessageSizeLimit(64 * 1024); // 64KB