返回

用 Spring Integration 巧妙应对 MQTT 消息,问题解决溜溜的

java

使用 Spring Integration 进行 MQTT 消息响应

简介

消息队列遥测传输(MQTT)是一种轻量级的消息协议,用于在设备和服务器之间进行通信。Spring Integration 提供了与 MQTT 代理交互的功能,使应用程序能够接收和响应传入消息。本指南将介绍如何设置 MQTT 接收端,并使用正确的主题向传入消息发送响应。

设置 MQTT 接收端

  1. 添加必要的依赖项:
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
    <version>6.0.10.RELEASE</version>
</dependency>
  1. 创建一个配置类来设置 MQTT 接收端:
@Configuration
public class MqttReceiverConfig {

    // MQTT 客户端工厂,用于创建客户端连接
    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        MqttConnectOptions options = new MqttConnectOptions();
        // 设置服务器 URI,例如 "tcp://127.0.0.1:1883"
        options.setServerURIs(new String[]{"tcp://127.0.0.1:1883"});
        factory.setConnectionOptions(options);
        return factory;
    }

    // 接收端适配器,用于从 MQTT 代理接收消息
    @Bean
    public MqttPahoMessageDrivenChannelAdapter inbound() {
        MqttPahoMessageDrivenChannelAdapter adapter =
                new MqttPahoMessageDrivenChannelAdapter("tcp://127.0.0.1:1883", "testClient", "#");
        // 设置主题过滤器,例如 "wdzn/#",以接收所有以 "wdzn/" 开头的主题的消息
        adapter.setTopic("wdzn/#");
        adapter.setCompletionTimeout(5000);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(1);
        adapter.setOutputChannel(mqttInputChannel());
        return adapter;
    }

    // 输入通道,用于接收从 MQTT 接收端适配器接收的消息
    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }

    // 输出通道,用于发送响应消息
    @Bean
    public MessageChannel mqttOutputChannel() {
        return new DirectChannel();
    }

    // 处理器,用于处理传入消息并发送响应
    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler handler() {
        return message -> {
            // 从消息中获取接收到的主题
            String topic = (String) message.getHeaders().get("mqtt_receivedTopic");
            // 根据接收到的主题构造响应主题
            // 例如,将 "wdzn/Online/50001000/Register/0" 响应到 "wdzn/50001000/Register/0/1"
            String sendTopic = topic.replace("wdzn", "wdzn") + "/1";
            // 发送响应消息
            mqttOutbound().sendToMqtt("", sendTopic);
        };
    }

    // 输出消息处理器,用于发送响应消息
    @Bean
    @ServiceActivator(inputChannel = "mqttOutputChannel")
    public MessageHandler mqttOutbound() {
        MqttPahoMessageHandler messageHandler =
                new MqttPahoMessageHandler("testClient", mqttClientFactory());
        messageHandler.setAsync(true);
        // 设置默认响应主题
        messageHandler.setDefaultTopic("wdzn/#");
        return messageHandler;
    }
}

发送响应消息

使用 Spring Integration 的 MqttGateway 接口发送响应消息:

@MessagingGateway(defaultRequestChannel = "mqttOutputChannel")
public interface MqttGateway {
    void sendToMqtt(String data, @Header(MqttHeaders.TOPIC) String topic);
}

示例用法

// 构造传入主题为 "wdzn/Online/50001000/Register/0" 的消息
Message<?> message = MessageBuilder.withPayload("Hello MQTT").setHeader("mqtt_receivedTopic", "wdzn/Online/50001000/Register/0").build();

// 处理传入消息,并向主题 "wdzn/50001000/Register/0/1" 发送响应
handler().handleMessage(message);

结论

使用 Spring Integration 和 MQTT,可以轻松创建应用程序,以接收传入 MQTT 消息并使用正确的主题进行响应。这种方法提供了与 MQTT 代理高效交互和实时响应的能力。

常见问题解答

  1. 如何更改接收端适配器的主题过滤器?

    • MqttPahoMessageDrivenChannelAdapter 中设置 topic 属性。
  2. 如何更改响应消息的默认主题?

    • MqttPahoMessageHandler 中设置 defaultTopic 属性。
  3. 如何使用 SSL/TLS 连接到 MQTT 代理?

    • MqttConnectOptions 中设置 setSslProperties
  4. 如何处理异常?

    • MqttPahoMessageDrivenChannelAdapter 中设置 errorHandler
  5. 如何发布二进制有效负载消息?

    • 使用 MqttPahoMessageHandlersetByteArrayConverter 方法。