MQTT Support
Spring Integration 提供入站和出站通道适配器来支持消息队列遥测传输 (MQTT) 协议。 你需要将此依赖项包含在你的项目中:
-
Maven
-
Gradle
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
<version>{project-version}</version>
</dependency>
compile "org.springframework.integration:spring-integration-mqtt:{project-version}"
当前实现使用 Eclipse Paho MQTT Client库。
XML 配置和本章的大部分内容都与 MQTT v3.1 协议支持和各自的 Paho Client 有关。有关各自的协议支持,请参阅 MQTT v5 Support 段落。
这两个适配器的配置都使用 DefaultMqttPahoClientFactory
来实现。有关配置选项的详细信息,请参阅 Paho 文档。
我们建议配置 |
Inbound (Message-driven) Channel Adapter
入站通道适配器由 MqttPahoMessageDrivenChannelAdapter
实现。为方便起见,您可以使用以下名称空间来配置它。最基本的配置如下所示:
<bean id="clientFactory"
class="org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory">
<property name="connectionOptions">
<bean class="org.eclipse.paho.client.mqttv3.MqttConnectOptions">
<property name="userName" value="${mqtt.username}"/>
<property name="password" value="${mqtt.password}"/>
</bean>
</property>
</bean>
<int-mqtt:message-driven-channel-adapter id="mqttInbound"
client-id="${mqtt.default.client.id}.src"
url="${mqtt.url}"
topics="sometopic"
client-factory="clientFactory"
channel="output"/>
以下清单显示了可用的属性:
<int-mqtt:message-driven-channel-adapter id="oneTopicAdapter"
client-id="foo" 1
url="tcp://localhost:1883" 2
topics="bar,baz" 3
qos="1,2" 4
converter="myConverter" 5
client-factory="clientFactory" 6
send-timeout="123" 7
error-channel="errors" 8
recovery-interval="10000" 9
manual-acks="false" 10
channel="out" />
1 | The client ID. |
2 | The broker URL. |
3 | 此适配器从中接收消息的一系列主题(用逗号分隔)。 |
4 | 逗号分隔的 QoS 值列表。它可以是应用于所有主题的单个值或每个主题的一个值(在这种情况下,列表必须具有相同的长度)。 |
5 | 一个 MqttMessageConverter (可选)。默认情况下,默认 `DefaultPahoMessageConverter`会创建带有以下标头的消息:
|
6 | The client factory. |
7 | send()`超时。它仅适用于通道可能阻塞(例如当前满的受限 `QueueChannel )的情况。 |
8 | 错误通道。如果提供了错误通道,则下游异常会以 ErrorMessage`的形式发送到此通道。有效载荷是包含失败消息和原因的 `MessagingException 。 |
9 | 恢复间隔。它控制了适配器在发生故障后重试连接的时间间隔。其默认值为 10000ms (十秒)。 |
10 | 应答模式;设置为 true 以进行手动应答。 |
从版本 4.1 开始,你可以省略 URL。相反,你可以在 |
从 4.2.2 版开始,当适配器成功订阅主题后,将发布 MqttSubscribedEvent
。当连接或订阅失败时,将发布 MqttConnectionFailedEvent
事件。这些事件可以通过实现 ApplicationListener
的 bean 来接收。
此外,一个名为 recoveryInterval
的新属性控制了适配器在出现故障后尝试重新连接的时间间隔。它的默认值为 10000ms
(十秒)。
在 4.2.3 版之前,当适配器停止时,客户端始终取消订阅。这是不正确的,因为如果客户端 QOS 大于 0,我们就需要保持订阅活动,这样在适配器停止时到达的消息将在下次启动时被传递。这也要求在客户端工厂上将 |
从版本 5.0 开始,topic
、qos
和 retained
属性映射到 .RECEIVED_…
标头(MqttHeaders.RECEIVED_TOPIC
、MqttHeaders.RECEIVED_QOS
和 MqttHeaders.RECEIVED_RETAINED
),以避免无意中传播到(默认情况下)使用 MqttHeaders.TOPIC
、MqttHeaders.QOS
和 MqttHeaders.RETAINED
标头的出站消息。
Adding and Removing Topics at Runtime
从版本 4.1 开始,您可以通过编程方式更改适配器订阅的主题。Spring Integration 提供 addTopic()
和 removeTopic()
方法。添加主题时,您可以选择指定 QoS
(默认值:1)。您还可以通过向 <control-bus/>
发送适当的消息来修改主题,其中包含适当的负载,例如:"myMqttAdapter.addTopic('foo', 1)"
。
停止和启动适配器对主题列表没有影响(它不会恢复到配置中的原始设置)。这些更改不会保留在应用程序上下文的生命周期之外。一个新的应用程序上下文将恢复到已配置的设置。
在适配器停止(或与代理断开连接)时更改主题会在下次建立连接时生效。
Manual Acks
从版本 5.3 开始,您可以将 manualAcks
属性设置为 true。通常用于异步确认交付。当设置为 true
时,标头(IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK
)会添加到消息中,其值为 SimpleAcknowledgment
。您必须调用 acknowledge()
方法才能完成交付。有关更多信息,请参阅 IMqttClient
setManualAcks()
和 messageArrivedComplete()
的 Javadoc。为了方便起见,提供了一个标头访问器:
StaticMessageHeaderAccessor.acknowledgment(someMessage).acknowledge();
从版本 5.2.11
开始,当消息转换器引发异常或从 MqttMessage
转换返回 null
时,MqttPahoMessageDrivenChannelAdapter
会向 errorChannel
(如果提供)发送 ErrorMessage
。否则,将此转换错误重新抛出到 MQTT 客户端回调中。
Configuring with Java Configuration
以下 Spring Boot 应用展示了如何通过 Java 配置来配置入站适配器的一个示例:
@SpringBootApplication
public class MqttJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(MqttJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public MessageChannel mqttInputChannel() {
return new DirectChannel();
}
@Bean
public MessageProducer inbound() {
MqttPahoMessageDrivenChannelAdapter adapter =
new MqttPahoMessageDrivenChannelAdapter("tcp://localhost:1883", "testClient",
"topic1", "topic2");
adapter.setCompletionTimeout(5000);
adapter.setConverter(new DefaultPahoMessageConverter());
adapter.setQos(1);
adapter.setOutputChannel(mqttInputChannel());
return adapter;
}
@Bean
@ServiceActivator(inputChannel = "mqttInputChannel")
public MessageHandler handler() {
return new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
System.out.println(message.getPayload());
}
};
}
}
Configuring with the Java DSL
下面的 Spring Boot 应用程序提供使用 Java DSL 配置入站适配器的示例:
@SpringBootApplication
public class MqttJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(MqttJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public IntegrationFlow mqttInbound() {
return IntegrationFlow.from(
new MqttPahoMessageDrivenChannelAdapter("tcp://localhost:1883",
"testClient", "topic1", "topic2"))
.handle(m -> System.out.println(m.getPayload()))
.get();
}
}
Outbound Channel Adapter
出站通道适配器由 MqttPahoMessageHandler
实现,该处理程序包装在 ConsumerEndpoint
中。为了方便起见,您可以使用命名空间对其进行配置。
从版本 4.1 开始,适配器支持异步发送操作,避免在确认交付之前阻塞。您可以发出应用程序事件,以便在需要时使应用程序确认交付。
以下列表显示了出站通道适配器可用的属性:
<int-mqtt:outbound-channel-adapter id="withConverter"
client-id="foo" 1
url="tcp://localhost:1883" 2
converter="myConverter" 3
client-factory="clientFactory" 4
default-qos="1" 5
qos-expression="" 6
default-retained="true" 7
retained-expression="" 8
default-topic="bar" 9
topic-expression="" 10
async="false" 11
async-events="false" 12
channel="target" />
1 | The client ID. |
2 | The broker URL. |
3 | 一个 MqttMessageConverter (可选)。默认 `DefaultPahoMessageConverter`识别以下标头:
|
4 | The client factory. |
5 | 默认服务质量。如果没有找到 mqtt_qos`标头或 `qos-expression`返回 `null ,则会使用该服务质量。如果你提供了自定义 converter ,则不会使用此服务质量。 |
6 | 用于评估服务质量的表达式。默认值为 headers[mqtt_qos] 。 |
7 | 保留标志的默认值。如果没有找到 mqtt_retained`标头,则使用该值。如果提供了自定义 `converter ,则不会使用此服务质量。 |
8 | 用于评估保留布尔值的表达式。默认值为 headers[mqtt_retained] 。 |
9 | 发送消息的默认主题(如果未找到 `mqtt_topic`标头,则使用该主题)。 |
10 | 用于评估目标主题的表达式。默认值为 headers['mqtt_topic'] 。 |
11 | 当为 true`时,调用方不会阻塞。相反,它在发送消息时等待传送确认。默认值为 `false (在确认传送之前,发送会阻塞)。 |
12 | 当 async`和 `async-events`均为 `true`时,会发出一个 `MqttMessageSentEvent (参见 Events)。它包含消息、主题、客户端库生成的 messageId 、clientId`和 `clientInstance (每当客户端连接时,此值都会增加)。当客户端库确认传送时,会发出一个 MqttMessageDeliveredEvent 。它包含 messageId 、clientId`和 `clientInstance ,从而能将传送与 send()`相关联。任何 `ApplicationListener`或事件入站通道适配器都可以接收这些事件。请注意,在 `MqttMessageSentEvent`之前收到 `MqttMessageDeliveredEvent`是可能的。默认值为 `false 。 |
从 4.1 版开始,可以省略 URL。相反,服务器 URI 可以提供给 |
Configuring with Java Configuration
以下 Spring Boot 应用程序展示了如何使用 Java 配置配置出站适配器:
@SpringBootApplication
@IntegrationComponentScan
public class MqttJavaApplication {
public static void main(String[] args) {
ConfigurableApplicationContext context =
new SpringApplicationBuilder(MqttJavaApplication.class)
.web(false)
.run(args);
MyGateway gateway = context.getBean(MyGateway.class);
gateway.sendToMqtt("foo");
}
@Bean
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
MqttConnectOptions options = new MqttConnectOptions();
options.setServerURIs(new String[] { "tcp://host1:1883", "tcp://host2:1883" });
options.setUserName("username");
options.setPassword("password".toCharArray());
factory.setConnectionOptions(options);
return factory;
}
@Bean
@ServiceActivator(inputChannel = "mqttOutboundChannel")
public MessageHandler mqttOutbound() {
MqttPahoMessageHandler messageHandler =
new MqttPahoMessageHandler("testClient", mqttClientFactory());
messageHandler.setAsync(true);
messageHandler.setDefaultTopic("testTopic");
return messageHandler;
}
@Bean
public MessageChannel mqttOutboundChannel() {
return new DirectChannel();
}
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MyGateway {
void sendToMqtt(String data);
}
}
Configuring with the Java DSL
以下 Spring Boot 应用程序提供了一个使用 Java DSL 配置出站适配器的示例:
@SpringBootApplication
public class MqttJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(MqttJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public IntegrationFlow mqttOutboundFlow() {
return f -> f.handle(new MqttPahoMessageHandler("tcp://host1:1883", "someMqttClient"));
}
}
Events
适配器会发布某些应用程序事件。
-
MqttConnectionFailedEvent
-如果我们未能连接或连接随后丢失,则它会由两个适配器发布。对于 MQTT v5 Paho 客户端,当服务器执行正常断开连接时,也会发出此事件,在这种情况下,丢失的连接的cause`是 `null
。 -
MqttMessageSentEvent
-如果以异步模式运行,则 outbound 适配器会在发送消息时发布此事件。 -
MqttMessageDeliveredEvent
-如果以异步模式运行,则 outbound 适配器会在客户端表示已传送消息时发布此事件。 -
MqttSubscribedEvent
-在订阅主题后,inbound 适配器会发布此事件。
这些事件可以通过 ApplicationListener<MqttIntegrationEvent>
或带有 @EventListener
方法接收。
要确定事件的来源,请使用以下方法;您可以检查 bean 名称和/或连接选项(以访问服务器 URI 等)。
MqttPahoComponent source = event.getSourceAsType();
String beanName = source.getBeanName();
MqttConnectOptions options = source.getConnectionInfo();
MQTT v5 Support
从版本 5.5.5 开始,spring-integration-mqtt
模块为 MQTT v5 协议提供通道适配器实现。org.eclipse.paho:org.eclipse.paho.mqttv5.client
是一个 optional
依赖项,因此必须明确包含在目标项目中。
由于 MQTT v5 协议支持 MQTT 消息中的额外任意属性,因此引入了 MqttHeaderMapper
实现来在发布和接收操作中进行映射。默认情况下(通过 *
模式),它映射所有接收的 PUBLISH
帧属性(包括用户属性)。在出站方面,它为 PUBLISH
帧映射标头的以下子集:contentType
、mqtt_messageExpiryInterval
、mqtt_responseTopic
、mqtt_correlationData
。
MQTT v5 协议的传出通道适配器以 Mqttv5PahoMessageHandler`的形式出现。它需要一个 `clientId`以及 MQTT 代理 URL 或 `MqttConnectionOptions`引用。它支持 `MqttClientPersistence`选项,在这种情况下可以为 `async`并可以释放 `MqttIntegrationEvent`对象(请参阅 `asyncEvents`选项)。如果请求消息有效载荷是一个 `org.eclipse.paho.mqttv5.common.MqttMessage
,它将通过内部 IMqttAsyncClient`按原样发布。如果有效载荷是 `byte[]
,它会原样用作目标 MqttMessage`要发布的有效载荷。如果有效载荷是一个 `String
,它将被转换为 byte[]`进行发布。其余用例委托给提供的 `MessageConverter
,它是应用程序上下文中一个 IntegrationContextUtils.ARGUMENT_RESOLVER_MESSAGE_CONVERTER_BEAN_NAME
ConfigurableCompositeMessageConverter`bean。注意:仅当请求的消息有效载荷已经是 `MqttMessage`时,才不会使用提供的 `HeaderMapper<MqttProperties>
。下面的 Java DSL 配置示例演示了如何在集成流中使用此通道适配器:
@Bean
public IntegrationFlow mqttOutFlow() {
Mqttv5PahoMessageHandler messageHandler = new Mqttv5PahoMessageHandler(MQTT_URL, "mqttv5SIout");
MqttHeaderMapper mqttHeaderMapper = new MqttHeaderMapper();
mqttHeaderMapper.setOutboundHeaderNames("some_user_header", MessageHeaders.CONTENT_TYPE);
messageHandler.setHeaderMapper(mqttHeaderMapper);
messageHandler.setAsync(true);
messageHandler.setAsyncEvents(true);
messageHandler.setConverter(mqttStringToBytesConverter());
return f -> f.handle(messageHandler);
}
org.springframework.integration.mqtt.support.MqttMessageConverter
不能与 Mqttv5PahoMessageHandler
一起使用,因为其合同仅针对 MQTT v3 协议。
如果连接在启动或运行时失败,则 Mqttv5PahoMessageHandler
会尝试在向此处理程序发送下一条消息时重新连接。如果此手动重新连接失败,则连接异常会抛回给调用者。在这种情况下,将应用标准的 Spring Integration 错误处理过程,包括请求处理程序建议,例如重试或断路器。
在 Mqttv5PahoMessageHandler
javadoc 及其超类中查看更多信息。
MQTT v5 协议的传入通道适配器以 Mqttv5PahoMessageDrivenChannelAdapter`的形式出现。它需要一个 `clientId`以及 MQTT 代理 URL 或 `MqttConnectionOptions`引用,外加从中订阅和使用的主题。它支持一个 `MqttClientPersistence`选项,默认情况下为 in-memory。可以配置预期的 `payloadType
(默认情况下为 byte[]
),它传播到提供的 SmartMessageConverter`以从接收到的 `MqttMessage`的 `byte[]`中进行转换。如果设置了 `manualAck`选项,则会将 `IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK`标头添加到要作为 `SimpleAcknowledgment`实例生成的消息中。`HeaderMapper<MqttProperties>`用于将 `PUBLISH`帧属性(包括用户属性)映射到目标消息标头。标准 `MqttMessage`属性(例如 `qos
、id
、dup
、retained
,以及接收的主题)始终映射到标头。有关更多信息,请参阅 MqttHeaders
。
从 6.3 版开始,Mqttv5PahoMessageDrivenChannelAdapter
基于 MqttSubscription
提供了构造函数,用于细粒度配置而不是普通主题名称。当提供这些订阅时,信道适配器的 qos
选项不可用,因为此 qos
模式是 MqttSubscription
API 的一部分。
以下 Java DSL 配置样本演示了如何在集成流中使用此信道适配器:
@Bean
public IntegrationFlow mqttInFlow() {
Mqttv5PahoMessageDrivenChannelAdapter messageProducer =
new Mqttv5PahoMessageDrivenChannelAdapter(MQTT_URL, "mqttv5SIin", "siTest");
messageProducer.setPayloadType(String.class);
messageProducer.setMessageConverter(mqttStringToBytesConverter());
messageProducer.setManualAcks(true);
return IntegrationFlow.from(messageProducer)
.channel(c -> c.queue("fromMqttChannel"))
.get();
}
org.springframework.integration.mqtt.support.MqttMessageConverter
无法与 Mqttv5PahoMessageDrivenChannelAdapter
一起使用,因为其协定仅旨在用于 MQTT v3 协议。
在 Mqttv5PahoMessageDrivenChannelAdapter
javadoc 及其超类中查看更多信息。
建议将 MqttConnectionOptions#setAutomaticReconnect(boolean)
设置为 true,以让内部 IMqttAsyncClient
实例处理重新连接。否则,只有手动的 Mqttv5PahoMessageDrivenChannelAdapter
重新启动才能处理重新连接,例如通过在断开连接时处理 MqttConnectionFailedEvent
。
Shared MQTT Client Support
如果多个集成需要一个 MQTT ClientID,不能使用多个 MQTT 客户端实例,因为 MQTT 代理可能对每个 ClientID 的连接数有限制(通常,允许单个连接)。要将一个客户端重复用于不同的信道适配器,可以使用 org.springframework.integration.mqtt.core.ClientManager
组件并将其传递给任何需要的信道适配器。它将管理 MQTT 连接生命周期并且在需要时自动重新连接。此外,可以向客户端管理器提供自定义连接选项和 MqttClientPersistence
,就像当前可以对信道适配器组件执行相同操作一样。
请注意,支持 MQTT v5 和 v3 信道适配器。
以下 Java DSL 配置样本演示了如何在集成流中使用此客户端管理器:
@Bean
public ClientManager<IMqttAsyncClient, MqttConnectionOptions> clientManager() {
MqttConnectionOptions connectionOptions = new MqttConnectionOptions();
connectionOptions.setServerURIs(new String[]{ "tcp://localhost:1883" });
connectionOptions.setConnectionTimeout(30000);
connectionOptions.setMaxReconnectDelay(1000);
connectionOptions.setAutomaticReconnect(true);
Mqttv5ClientManager clientManager = new Mqttv5ClientManager(connectionOptions, "client-manager-client-id-v5");
clientManager.setPersistence(new MqttDefaultFilePersistence());
return clientManager;
}
@Bean
public IntegrationFlow mqttInFlowTopic1(
ClientManager<IMqttAsyncClient, MqttConnectionOptions> clientManager) {
Mqttv5PahoMessageDrivenChannelAdapter messageProducer =
new Mqttv5PahoMessageDrivenChannelAdapter(clientManager, "topic1");
return IntegrationFlow.from(messageProducer)
.channel(c -> c.queue("fromMqttChannel"))
.get();
}
@Bean
public IntegrationFlow mqttInFlowTopic2(
ClientManager<IMqttAsyncClient, MqttConnectionOptions> clientManager) {
Mqttv5PahoMessageDrivenChannelAdapter messageProducer =
new Mqttv5PahoMessageDrivenChannelAdapter(clientManager, "topic2");
return IntegrationFlow.from(messageProducer)
.channel(c -> c.queue("fromMqttChannel"))
.get();
}
@Bean
public IntegrationFlow mqttOutFlow(
ClientManager<IMqttAsyncClient, MqttConnectionOptions> clientManager) {
return f -> f.handle(new Mqttv5PahoMessageHandler(clientManager));
}