Forráskód Böngészése

feat:【IoT 物联网】优化 MQTT 配置属性,新增 MQTT 消息处理抽象基类

haohao 5 hónapja
szülő
commit
7b10b59541

+ 33 - 7
yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayProperties.java

@@ -103,39 +103,65 @@ public class IotGatewayProperties {
          */
         @NotNull(message = "是否开启不能为空")
         private Boolean enabled;
+
         /**
-         * HTTP 认证端口
+         * HTTP 认证端口(默认:8090)
          */
         private Integer httpAuthPort = 8090;
+
         /**
          * MQTT 服务器地址
          */
+        @NotEmpty(message = "MQTT 服务器地址不能为空")
         private String mqttHost;
+
         /**
-         * MQTT 服务器端口
+         * MQTT 服务器端口(默认:1883)
          */
-        private Integer mqttPort;
+        @NotNull(message = "MQTT 服务器端口不能为空")
+        private Integer mqttPort = 1883;
+
         /**
          * MQTT 用户名
          */
+        @NotEmpty(message = "MQTT 用户名不能为空")
         private String mqttUsername;
+
         /**
          * MQTT 密码
          */
+        @NotEmpty(message = "MQTT 密码不能为空")
         private String mqttPassword;
+
         /**
-         * MQTT 是否开启 SSL
+         * MQTT 是否开启 SSL(默认:false)
          */
-        private Boolean mqttSsl;
+        @NotNull(message = "MQTT 是否开启 SSL 不能为空")
+        private Boolean mqttSsl = false;
+
         /**
-         * MQTT客户端 ID
+         * MQTT 客户端 ID(如果为空,系统将自动生成)
          */
         private String mqttClientId;
+
         /**
-         * MQTT 主题
+         * MQTT 主题列表
          */
+        @NotEmpty(message = "MQTT 主题不能为空")
         private List<String> mqttTopics;
 
+        /**
+         * 获取 MQTT 客户端 ID,如果未配置则自动生成
+         *
+         * @return MQTT 客户端 ID
+         */
+        public String getMqttClientId() {
+            if (cn.hutool.core.util.StrUtil.isBlank(mqttClientId)) {
+                mqttClientId = "iot-gateway-mqtt-" + System.currentTimeMillis();
+            }
+            return mqttClientId;
+        }
+
     }
 
 }

+ 24 - 3
yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotMqttDownstreamSubscriber.java

@@ -3,6 +3,7 @@ package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt;
 import cn.hutool.core.util.StrUtil;
 import cn.hutool.json.JSONObject;
 import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceRespDTO;
+import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum;
 import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus;
 import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageSubscriber;
 import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
@@ -47,19 +48,28 @@ public class IotMqttDownstreamSubscriber implements IotMessageSubscriber<IotDevi
 
     @Override
     public void onMessage(IotDeviceMessage message) {
-        log.info("[onMessage][接收到下行消息:{}]", message);
+        log.info("[onMessage][接收到下行消息][messageId: {}][method: {}][deviceId: {}]",
+                message.getId(), message.getMethod(), message.getDeviceId());
         try {
             // 根据消息方法处理不同的下行消息
             String method = message.getMethod();
             if (method == null) {
-                log.warn("[onMessage][消息({})方法为空]", message);
+                log.warn("[onMessage][消息方法为空][messageId: {}][deviceId: {}]",
+                        message.getId(), message.getDeviceId());
+                return;
+            }
+
+            // 过滤上行消息:下行订阅者只处理下行消息
+            if (isUpstreamMessage(method)) {
+                log.debug("[onMessage][忽略上行消息][method: {}][messageId: {}]", method, message.getId());
                 return;
             }
 
             // 处理下行消息
             handleDownstreamMessage(message);
         } catch (Exception e) {
-            log.error("[onMessage][处理下行消息失败:{}]", message, e);
+            log.error("[onMessage][处理下行消息失败][messageId: {}][method: {}][deviceId: {}]",
+                    message.getId(), message.getMethod(), message.getDeviceId(), e);
         }
     }
 
@@ -155,4 +165,15 @@ public class IotMqttDownstreamSubscriber implements IotMessageSubscriber<IotDevi
         return payload;
     }
 
+    /**
+     * 判断是否为上行消息
+     *
+     * @param method 消息方法
+     * @return 是否为上行消息
+     */
+    private boolean isUpstreamMessage(String method) {
+        IotDeviceMessageMethodEnum methodEnum = IotDeviceMessageMethodEnum.of(method);
+        return methodEnum != null && methodEnum.getUpstream();
+    }
+
 }

+ 43 - 20
yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotMqttUpstreamProtocol.java

@@ -37,10 +37,20 @@ import java.util.concurrent.TimeUnit;
 public class IotMqttUpstreamProtocol {
 
     /**
-     * 默认 QoS 级别
+     * 默认 QoS 级别 - 至少一次
      */
     private static final MqttQoS DEFAULT_QOS = MqttQoS.AT_LEAST_ONCE;
 
+    /**
+     * 连接超时时间(秒)
+     */
+    private static final int CONNECT_TIMEOUT_SECONDS = 10;
+
+    /**
+     * 重连延迟时间(毫秒)
+     */
+    private static final long RECONNECT_DELAY_MS = 5000;
+
     private final IotGatewayProperties.EmqxProperties emqxProperties;
 
     // 共享资源
@@ -227,25 +237,26 @@ public class IotMqttUpstreamProtocol {
      * 连接 MQTT Broker 并订阅主题
      */
     private void connectMqtt() {
+        // 参数校验
         String host = emqxProperties.getMqttHost();
         Integer port = emqxProperties.getMqttPort();
 
         if (StrUtil.isBlank(host)) {
-            String msg = "[connectMqtt][MQTT Host 为空,无法连接]";
-            log.error(msg);
-            return;
+            log.error("[connectMqtt][MQTT Host 为空,无法连接]");
+            throw new IllegalArgumentException("MQTT Host 不能为空");
         }
-        if (port == null) {
-            log.warn("[connectMqtt][MQTT Port 为 null,使用默认端口 1883]");
-            port = 1883;
+        if (port == null || port <= 0) {
+            log.error("[connectMqtt][MQTT Port 无效:{}]", port);
+            throw new IllegalArgumentException("MQTT Port 必须为正整数");
         }
 
-        final Integer finalPort = port;
-        CompletableFuture<Void> connectFuture = mqttClient.connect(finalPort, host)
+        log.info("[connectMqtt][开始连接 MQTT Broker][host: {}][port: {}]", host, port);
+
+        CompletableFuture<Void> connectFuture = mqttClient.connect(port, host)
                 .toCompletionStage()
                 .toCompletableFuture()
                 .thenAccept(connAck -> {
-                    log.info("[connectMqtt][MQTT 客户端连接成功]");
+                    log.info("[connectMqtt][MQTT 客户端连接成功][host: {}][port: {}]", host, port);
                     // 设置断开重连监听器
                     mqttClient.closeHandler(closeEvent -> {
                         log.warn("[closeHandler][MQTT 连接已断开,准备重连]");
@@ -257,17 +268,19 @@ public class IotMqttUpstreamProtocol {
                     subscribeToTopics();
                 })
                 .exceptionally(error -> {
-                    log.error("[connectMqtt][连接 MQTT Broker 失败]", error);
+                    log.error("[connectMqtt][连接 MQTT Broker 失败][host: {}][port: {}]", host, port, error);
+                    // 连接失败时也要尝试重连
                     reconnectWithDelay();
                     return null;
                 });
 
         // 等待连接完成
         try {
-            connectFuture.get(10, TimeUnit.SECONDS);
+            connectFuture.get(CONNECT_TIMEOUT_SECONDS, TimeUnit.SECONDS);
             log.info("[connectMqtt][MQTT 客户端启动完成]");
         } catch (Exception e) {
             log.error("[connectMqtt][MQTT 客户端启动失败]", e);
+            throw new RuntimeException("MQTT 客户端启动失败", e);
         }
     }
 
@@ -284,15 +297,19 @@ public class IotMqttUpstreamProtocol {
      */
     private void subscribeToTopics() {
         List<String> topicList = emqxProperties.getMqttTopics();
-        if (CollUtil.isEmpty(topicList)) {
-            log.warn("[subscribeToTopics][没有配置要订阅的主题]");
-            return;
-        }
+        // @NotEmpty 注解已保证 topicList 不为空,无需重复校验
+
+        log.info("[subscribeToTopics][开始订阅主题,共 {} 个]", topicList.size());
 
         for (String topic : topicList) {
+            if (StrUtil.isBlank(topic)) {
+                log.warn("[subscribeToTopics][跳过空主题]");
+                continue;
+            }
+
             mqttClient.subscribe(topic, DEFAULT_QOS.value(), subscribeResult -> {
                 if (subscribeResult.succeeded()) {
-                    log.info("[subscribeToTopics][订阅主题成功: {}]", topic);
+                    log.info("[subscribeToTopics][订阅主题成功: {}][QoS: {}]", topic, DEFAULT_QOS.value());
                 } else {
                     log.error("[subscribeToTopics][订阅主题失败: {}]", topic, subscribeResult.cause());
                 }
@@ -304,10 +321,16 @@ public class IotMqttUpstreamProtocol {
      * 延迟重连
      */
     private void reconnectWithDelay() {
-        vertx.setTimer(5000, timerId -> {
+        vertx.setTimer(RECONNECT_DELAY_MS, timerId -> {
             if (isRunning && (mqttClient == null || !mqttClient.isConnected())) {
-                log.info("[reconnectWithDelay][开始重连 MQTT Broker]");
-                connectMqtt();
+                log.info("[reconnectWithDelay][开始重连 MQTT Broker,延迟 {} 毫秒]", RECONNECT_DELAY_MS);
+                try {
+                    connectMqtt();
+                } catch (Exception e) {
+                    log.error("[reconnectWithDelay][重连失败,将继续尝试重连]", e);
+                    // 重连失败时继续尝试重连
+                    reconnectWithDelay();
+                }
             }
         });
     }

+ 94 - 0
yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttAbstractHandler.java

@@ -0,0 +1,94 @@
+package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.router;
+
+import cn.hutool.core.util.StrUtil;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * IoT 网关 MQTT 协议的处理器抽象基类
+ * <p>
+ * 提供通用的异常处理、参数校验等功能
+ *
+ * @author 芋道源码
+ */
+@Slf4j
+public abstract class IotMqttAbstractHandler {
+
+    /**
+     * 处理 MQTT 消息的模板方法
+     *
+     * @param topic   主题
+     * @param payload 消息内容
+     */
+    public final void handle(String topic, String payload) {
+        try {
+            // 1. 前置校验
+            if (!validateInput(topic, payload)) {
+                return;
+            }
+
+            // 2. 执行具体逻辑
+            doHandle(topic, payload);
+
+        } catch (Exception e) {
+            log.error("[handle][处理 MQTT 消息失败][topic: {}][payload: {}]", topic, payload, e);
+            handleException(topic, payload, e);
+        }
+    }
+
+    /**
+     * 具体的处理逻辑,由子类实现
+     *
+     * @param topic   主题
+     * @param payload 消息内容
+     */
+    protected abstract void doHandle(String topic, String payload);
+
+    /**
+     * 输入参数校验
+     *
+     * @param topic   主题
+     * @param payload 消息内容
+     * @return 校验是否通过
+     */
+    protected boolean validateInput(String topic, String payload) {
+        if (StrUtil.isBlank(topic)) {
+            log.warn("[validateInput][主题为空,忽略消息]");
+            return false;
+        }
+
+        if (StrUtil.isBlank(payload)) {
+            log.warn("[validateInput][消息内容为空][topic: {}]", topic);
+            return false;
+        }
+
+        return true;
+    }
+
+    /**
+     * 异常处理
+     *
+     * @param topic   主题
+     * @param payload 消息内容
+     * @param e       异常
+     */
+    protected void handleException(String topic, String payload, Exception e) {
+        // 默认实现:记录错误日志
+        // 子类可以重写此方法,添加特定的异常处理逻辑
+        log.error("[handleException][MQTT 消息处理异常][topic: {}]", topic, e);
+    }
+
+    /**
+     * 解析主题,获取主题各部分
+     *
+     * @param topic 主题
+     * @return 主题各部分数组,如果解析失败返回 null
+     */
+    protected String[] parseTopic(String topic) {
+        String[] topicParts = topic.split("/");
+        if (topicParts.length < 7) {
+            log.warn("[parseTopic][主题格式不正确][topic: {}]", topic);
+            return null;
+        }
+        return topicParts;
+    }
+}

+ 31 - 9
yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttHttpAuthHandler.java

@@ -13,16 +13,38 @@ import io.vertx.core.json.JsonObject;
 import io.vertx.ext.web.RoutingContext;
 import lombok.extern.slf4j.Slf4j;
 
+import static cn.iocoder.yudao.module.iot.gateway.enums.ErrorCodeConstants.DEVICE_AUTH_FAIL;
+
 /**
  * IoT 网关 MQTT HTTP 认证处理器
  * <p>
- * 处理 EMQX 的认证请求和事件钩子
+ * 处理 EMQX 的认证请求和事件钩子,提供统一的错误处理和参数校验
  *
  * @author 芋道源码
  */
 @Slf4j
 public class IotMqttHttpAuthHandler {
 
+    /**
+     * 认证成功状态码
+     */
+    private static final int SUCCESS_STATUS_CODE = 200;
+
+    /**
+     * 参数错误状态码
+     */
+    private static final int BAD_REQUEST_STATUS_CODE = 400;
+
+    /**
+     * 认证失败状态码
+     */
+    private static final int UNAUTHORIZED_STATUS_CODE = 401;
+
+    /**
+     * 服务器错误状态码
+     */
+    private static final int INTERNAL_ERROR_STATUS_CODE = 500;
+
     /**
      * EMQX 认证接口
      */
@@ -44,7 +66,7 @@ public class IotMqttHttpAuthHandler {
             // 参数校验
             if (StrUtil.isEmpty(clientid) || StrUtil.isEmpty(username) || StrUtil.isEmpty(password)) {
                 log.warn("[authenticate][认证参数不完整][clientId: {}][username: {}]", clientid, username);
-                sendErrorResponse(context, 400, "认证参数不完整");
+                sendErrorResponse(context, BAD_REQUEST_STATUS_CODE, "认证参数不完整");
                 return;
             }
 
@@ -58,7 +80,7 @@ public class IotMqttHttpAuthHandler {
             result.checkError();
             if (!BooleanUtil.isTrue(result.getData())) {
                 log.warn("[authenticate][设备认证失败][clientId: {}][username: {}]", clientid, username);
-                sendErrorResponse(context, 401, "设备认证失败");
+                sendErrorResponse(context, UNAUTHORIZED_STATUS_CODE, DEVICE_AUTH_FAIL.getMsg());
                 return;
             }
 
@@ -67,7 +89,7 @@ public class IotMqttHttpAuthHandler {
 
         } catch (Exception e) {
             log.error("[authenticate][设备认证异常]", e);
-            sendErrorResponse(context, 500, "认证服务异常");
+            sendErrorResponse(context, INTERNAL_ERROR_STATUS_CODE, "认证服务异常");
         }
     }
 
@@ -79,7 +101,7 @@ public class IotMqttHttpAuthHandler {
             // 解析请求体
             JsonObject body = context.body().asJsonObject();
             if (body == null) {
-                sendErrorResponse(context, 400, "请求体不能为空");
+                sendErrorResponse(context, BAD_REQUEST_STATUS_CODE, "请求体不能为空");
                 return;
             }
 
@@ -94,7 +116,7 @@ public class IotMqttHttpAuthHandler {
 
         } catch (Exception e) {
             log.error("[connected][处理设备连接事件失败]", e);
-            sendErrorResponse(context, 500, "处理失败");
+            sendErrorResponse(context, INTERNAL_ERROR_STATUS_CODE, "处理失败");
         }
     }
 
@@ -106,7 +128,7 @@ public class IotMqttHttpAuthHandler {
             // 解析请求体
             JsonObject body = context.body().asJsonObject();
             if (body == null) {
-                sendErrorResponse(context, 400, "请求体不能为空");
+                sendErrorResponse(context, BAD_REQUEST_STATUS_CODE, "请求体不能为空");
                 return;
             }
 
@@ -123,7 +145,7 @@ public class IotMqttHttpAuthHandler {
 
         } catch (Exception e) {
             log.error("[disconnected][处理设备断开连接事件失败]", e);
-            sendErrorResponse(context, 500, "处理失败");
+            sendErrorResponse(context, INTERNAL_ERROR_STATUS_CODE, "处理失败");
         }
     }
 
@@ -175,7 +197,7 @@ public class IotMqttHttpAuthHandler {
      */
     private void sendSuccessResponse(RoutingContext context, String message) {
         context.response()
-                .setStatusCode(200)
+                .setStatusCode(SUCCESS_STATUS_CODE)
                 .putHeader("Content-Type", "text/plain; charset=utf-8")
                 .end(message);
     }

+ 19 - 58
yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttUpstreamHandler.java

@@ -1,9 +1,7 @@
 package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.router;
 
-import cn.hutool.core.util.StrUtil;
 import cn.hutool.extra.spring.SpringUtil;
 import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
-import cn.iocoder.yudao.module.iot.core.mq.producer.IotDeviceMessageProducer;
 import cn.iocoder.yudao.module.iot.gateway.enums.IotDeviceTopicEnum;
 import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService;
 import io.vertx.mqtt.messages.MqttPublishMessage;
@@ -19,14 +17,12 @@ import java.nio.charset.StandardCharsets;
  * @author 芋道源码
  */
 @Slf4j
-public class IotMqttUpstreamHandler {
+public class IotMqttUpstreamHandler extends IotMqttAbstractHandler {
 
-    private final IotDeviceMessageProducer deviceMessageProducer;
     private final IotDeviceMessageService deviceMessageService;
     private final String serverId;
 
     public IotMqttUpstreamHandler(cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.IotMqttUpstreamProtocol protocol) {
-        this.deviceMessageProducer = SpringUtil.getBean(IotDeviceMessageProducer.class);
         this.deviceMessageService = SpringUtil.getBean(IotDeviceMessageService.class);
         this.serverId = protocol.getServerId();
     }
@@ -38,41 +34,22 @@ public class IotMqttUpstreamHandler {
         String topic = message.topicName();
         String payload = message.payload().toString(StandardCharsets.UTF_8);
 
-        if (StrUtil.isBlank(topic)) {
-            log.warn("[handle][主题为空,忽略消息]");
-            return;
-        }
-
-        if (StrUtil.isBlank(payload)) {
-            log.warn("[handle][消息内容为空][topic: {}]", topic);
-            return;
-        }
-
         log.debug("[handle][收到 MQTT 消息][topic: {}]", topic);
+        // 调用父类的 handle 方法,父类会进行参数校验
         handle(topic, payload);
     }
 
-    /**
-     * 处理 MQTT 消息
-     *
-     * @param topic   主题
-     * @param payload 消息内容
-     */
-    public void handle(String topic, String payload) {
-        try {
-            // 1. 识别并验证消息类型
-            String messageType = getMessageType(topic);
-            if (messageType == null) {
-                log.warn("[handle][未知的消息类型][topic: {}]", topic);
-                return;
-            }
-
-            // 2. 处理消息
-            processMessage(topic, payload, messageType);
-
-        } catch (Exception e) {
-            log.error("[handle][处理消息失败][topic: {}][payload: {}]", topic, payload, e);
+    @Override
+    protected void doHandle(String topic, String payload) {
+        // 1. 识别并验证消息类型
+        String messageType = getMessageType(topic);
+        if (messageType == null) {
+            log.warn("[doHandle][未知的消息类型][topic: {}]", topic);
+            return;
         }
+
+        // 2. 处理消息
+        processMessage(topic, payload, messageType);
     }
 
     /**
@@ -109,39 +86,37 @@ public class IotMqttUpstreamHandler {
      * @return 消息类型描述,如果不支持返回 null
      */
     private String getMessageType(String topic) {
-        if (StrUtil.isBlank(topic)) {
-            return null;
-        }
+        // 此方法由 doHandle 调用,topic 已经在父类中校验过,无需重复校验
 
         // 按优先级匹配主题类型,避免误匹配
 
-        // 1. 设备属性上报: /sys/{productKey}/{deviceName}/thing/event/property/post
+        // 1. 设备属性上报: /sys/{productKey}/{deviceName}/thing/property/post
         if (isPropertyPostTopic(topic)) {
             return IotDeviceTopicEnum.PROPERTY_POST_TOPIC.getDescription();
         }
 
-        // 2. 设备事件上报: /sys/{productKey}/{deviceName}/thing/event/{eventIdentifier}/post
+        // 2. 设备事件上报: /sys/{productKey}/{deviceName}/thing/{eventIdentifier}/post
         if (isEventPostTopic(topic)) {
             return "设备事件上报";
         }
 
-        // 3. 设备属性设置响应: /sys/{productKey}/{deviceName}/thing/service/property/set_reply
+        // 3. 设备属性设置响应: /sys/{productKey}/{deviceName}/thing/property/set_reply
         if (isPropertySetReplyTopic(topic)) {
             return "设备属性设置响应";
         }
 
-        // 4. 设备属性获取响应: /sys/{productKey}/{deviceName}/thing/service/property/get_reply
+        // 4. 设备属性获取响应: /sys/{productKey}/{deviceName}/thing/property/get_reply
         if (isPropertyGetReplyTopic(topic)) {
             return "设备属性获取响应";
         }
 
-        // 5. 设备配置设置响应: /sys/{productKey}/{deviceName}/thing/service/config/set_reply
+        // 5. 设备配置设置响应: /sys/{productKey}/{deviceName}/thing/config/set_reply
         if (isConfigSetReplyTopic(topic)) {
             return IotDeviceTopicEnum.CONFIG_SET_TOPIC.getDescription() + "响应";
         }
 
         // 6. 设备 OTA 升级响应:
-        // /sys/{productKey}/{deviceName}/thing/service/ota/upgrade_reply
+        // /sys/{productKey}/{deviceName}/thing/ota/upgrade_reply
         if (isOtaUpgradeReplyTopic(topic)) {
             return IotDeviceTopicEnum.OTA_UPGRADE_TOPIC.getDescription() + "响应";
         }
@@ -214,18 +189,4 @@ public class IotMqttUpstreamHandler {
                 && !topic.contains("ota");
     }
 
-    /**
-     * 解析主题,获取主题各部分
-     *
-     * @param topic 主题
-     * @return 主题各部分数组,如果解析失败返回 null
-     */
-    private String[] parseTopic(String topic) {
-        String[] topicParts = topic.split("/");
-        if (topicParts.length < 7) {
-            log.warn("[parseTopic][主题格式不正确][topic: {}]", topic);
-            return null;
-        }
-        return topicParts;
-    }
 }