STOMP Client

Spring 提供了 STOMP over WebSocket 客户端和 STOMP over TCP 客户端。

首先,您可以创建和配置 WebSocketStompClient,如下例所示:

WebSocketClient webSocketClient = new StandardWebSocketClient();
WebSocketStompClient stompClient = new WebSocketStompClient(webSocketClient);
stompClient.setMessageConverter(new StringMessageConverter());
stompClient.setTaskScheduler(taskScheduler); // for heartbeats

在前面的示例中,你可以用`SockJsClient`替换`StandardWebSocketClient`,因为这也是`WebSocketClient`的实现。SockJsClient`可以使用 WebSocket 或基于 HTTP 的传输作为后备。有关更多详细信息,请参见`SockJsClient

接下来,您可以建立连接并为 STOMP 会话提供一个处理程序,如下例所示:

String url = "ws://127.0.0.1:8080/endpoint";
StompSessionHandler sessionHandler = new MyStompSessionHandler();
stompClient.connect(url, sessionHandler);

当会话准备好使用时,会通知处理程序,如下例所示:

public class MyStompSessionHandler extends StompSessionHandlerAdapter {

	@Override
	public void afterConnected(StompSession session, StompHeaders connectedHeaders) {
		// ...
	}
}

会话建立后,可以发送任何有效负载,并使用已配置的 MessageConverter 进行序列化,如下例所示:

session.send("/topic/something", "payload");

您还可以订阅目的地。subscribe 方法要求一个用于订阅消息的处理程序,并返回一个可用于取消订阅的 Subscription 句柄。对于每个接收到的消息,处理程序可以指定有效负载应被反序列化到的目标 Object 类型,如下例所示:

session.subscribe("/topic/something", new StompFrameHandler() {

	@Override
	public Type getPayloadType(StompHeaders headers) {
		return String.class;
	}

	@Override
	public void handleFrame(StompHeaders headers, Object payload) {
		// ...
	}

});

要启用 STOMP 心跳,您可以使用 TaskScheduler 配置 WebSocketStompClient 并根据需要自定义心跳间隔(写不活动 10 秒,从而导致发送心跳,读不活动 10 秒,从而关闭连接)。

WebSocketStompClient 仅在无操作的情况下(即没有发送其他消息时)发送心跳。当使用外部代理时,这可能会造成困难,因为使用非代理目标的消息表示操作但实际上并未转发到代理。在这种情况下,你可以在初始化 External Broker 时配置 TaskScheduler ,确保即使仅发送具有非代理目标的消息,心跳也会转发到该代理。

当您使用 WebSocketStompClient 进行性能测试以从同一台计算机模拟数千个客户端时,请考虑关闭心跳,因为每个连接都会调度自己的心跳任务,这并未针对在同一台计算机上运行的大量客户端进行优化。

STOMp 协议还支持回执,客户端必须在其中添加 receipt 头,服务器在处理完发送或订阅后使用 RECEIPT 帧对其进行响应。为了支持此功能,StompSession 提供 setAutoReceipt(boolean),这会使在以后的每个发送或订阅事件中添加 receipt 头。或者,您也可以手动向 StompHeaders 添加回执头。发送和订阅都返回 Receiptable 实例,您可以使用该实例注册回执成功和失败回调。对于此功能,您必须使用 TaskScheduler 配置客户端以及回执过期之前的时间长度(默认情况下为 15 秒)。

请注意,StompSessionHandler 本身是一个 StompFrameHandler,除了用于处理消息的 handleException 回调和用于包括 ConnectionLostException 在内的传输级错误的 handleTransportError 回调外,它还可以处理 ERROR 帧。

你可以使用 WebSocketStompClientinboundMessageSizeLimitoutboundMessageSizeLimit 属性来限制入站和出站 WebSocket 消息的最大大小。当一个出站 STOMP 消息超过限制后,它会被分割成部分帧,然后接收方必须重新组装这些帧。默认情况下,对出站消息没有大小限制。当一个入站 STOMP 消息大小超过配置的限制后,将引发 StompConversionException。入站消息的默认大小限制为 64KB

WebSocketClient webSocketClient = new StandardWebSocketClient();
WebSocketStompClient stompClient = new WebSocketStompClient(webSocketClient);
stompClient.setInboundMessageSizeLimit(64 * 1024); // 64KB
stompClient.setOutboundMessageSizeLimit(64 * 1024); // 64KB