|
@@ -1,13 +1,13 @@
|
|
|
-package cn.iocoder.yudao.module.iot.component.emqx.upstream;
|
|
|
|
|
|
|
+package cn.iocoder.yudao.module.iot.net.component.emqx.upstream;
|
|
|
|
|
|
|
|
import cn.hutool.core.util.ArrayUtil;
|
|
import cn.hutool.core.util.ArrayUtil;
|
|
|
import cn.hutool.core.util.StrUtil;
|
|
import cn.hutool.core.util.StrUtil;
|
|
|
import cn.iocoder.yudao.module.iot.api.device.IotDeviceUpstreamApi;
|
|
import cn.iocoder.yudao.module.iot.api.device.IotDeviceUpstreamApi;
|
|
|
-import cn.iocoder.yudao.module.iot.component.core.heartbeat.IotComponentRegistry;
|
|
|
|
|
-import cn.iocoder.yudao.module.iot.component.emqx.config.IotComponentEmqxProperties;
|
|
|
|
|
-import cn.iocoder.yudao.module.iot.component.emqx.upstream.router.IotDeviceAuthVertxHandler;
|
|
|
|
|
-import cn.iocoder.yudao.module.iot.component.emqx.upstream.router.IotDeviceMqttMessageHandler;
|
|
|
|
|
-import cn.iocoder.yudao.module.iot.component.emqx.upstream.router.IotDeviceWebhookVertxHandler;
|
|
|
|
|
|
|
+import cn.iocoder.yudao.module.iot.net.component.core.heartbeat.IotNetComponentRegistry;
|
|
|
|
|
+import cn.iocoder.yudao.module.iot.net.component.emqx.config.IotNetComponentEmqxProperties;
|
|
|
|
|
+import cn.iocoder.yudao.module.iot.net.component.emqx.upstream.router.IotDeviceAuthVertxHandler;
|
|
|
|
|
+import cn.iocoder.yudao.module.iot.net.component.emqx.upstream.router.IotDeviceMqttMessageHandler;
|
|
|
|
|
+import cn.iocoder.yudao.module.iot.net.component.emqx.upstream.router.IotDeviceWebhookVertxHandler;
|
|
|
import io.netty.handler.codec.mqtt.MqttQoS;
|
|
import io.netty.handler.codec.mqtt.MqttQoS;
|
|
|
import io.vertx.core.Future;
|
|
import io.vertx.core.Future;
|
|
|
import io.vertx.core.Vertx;
|
|
import io.vertx.core.Vertx;
|
|
@@ -21,7 +21,7 @@ import java.util.concurrent.CompletableFuture;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
import java.util.concurrent.TimeUnit;
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
|
- * IoT 设备下行服务端,接收来自 device 设备的请求,转发给 server 服务器
|
|
|
|
|
|
|
+ * IoT 设备上行服务端,接收来自 device 设备的请求,转发给 server 服务器
|
|
|
* <p>
|
|
* <p>
|
|
|
* 协议:HTTP、MQTT
|
|
* 协议:HTTP、MQTT
|
|
|
*
|
|
*
|
|
@@ -30,15 +30,6 @@ import java.util.concurrent.TimeUnit;
|
|
|
@Slf4j
|
|
@Slf4j
|
|
|
public class IotDeviceUpstreamServer {
|
|
public class IotDeviceUpstreamServer {
|
|
|
|
|
|
|
|
- // TODO @haohao:抽到 IotComponentEmqxProperties 里?
|
|
|
|
|
- /**
|
|
|
|
|
- * 重连延迟时间(毫秒)
|
|
|
|
|
- */
|
|
|
|
|
- private static final int RECONNECT_DELAY_MS = 5000;
|
|
|
|
|
- /**
|
|
|
|
|
- * 连接超时时间(毫秒)
|
|
|
|
|
- */
|
|
|
|
|
- private static final int CONNECTION_TIMEOUT_MS = 10000;
|
|
|
|
|
/**
|
|
/**
|
|
|
* 默认 QoS 级别
|
|
* 默认 QoS 级别
|
|
|
*/
|
|
*/
|
|
@@ -47,20 +38,20 @@ public class IotDeviceUpstreamServer {
|
|
|
private final Vertx vertx;
|
|
private final Vertx vertx;
|
|
|
private final HttpServer server;
|
|
private final HttpServer server;
|
|
|
private final MqttClient client;
|
|
private final MqttClient client;
|
|
|
- private final IotComponentEmqxProperties emqxProperties;
|
|
|
|
|
|
|
+ private final IotNetComponentEmqxProperties emqxProperties;
|
|
|
private final IotDeviceMqttMessageHandler mqttMessageHandler;
|
|
private final IotDeviceMqttMessageHandler mqttMessageHandler;
|
|
|
- private final IotComponentRegistry componentRegistry;
|
|
|
|
|
|
|
+ private final IotNetComponentRegistry componentRegistry;
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
|
* 服务运行状态标志
|
|
* 服务运行状态标志
|
|
|
*/
|
|
*/
|
|
|
private volatile boolean isRunning = false;
|
|
private volatile boolean isRunning = false;
|
|
|
|
|
|
|
|
- public IotDeviceUpstreamServer(IotComponentEmqxProperties emqxProperties,
|
|
|
|
|
|
|
+ public IotDeviceUpstreamServer(IotNetComponentEmqxProperties emqxProperties,
|
|
|
IotDeviceUpstreamApi deviceUpstreamApi,
|
|
IotDeviceUpstreamApi deviceUpstreamApi,
|
|
|
Vertx vertx,
|
|
Vertx vertx,
|
|
|
MqttClient client,
|
|
MqttClient client,
|
|
|
- IotComponentRegistry componentRegistry) {
|
|
|
|
|
|
|
+ IotNetComponentRegistry componentRegistry) {
|
|
|
this.vertx = vertx;
|
|
this.vertx = vertx;
|
|
|
this.emqxProperties = emqxProperties;
|
|
this.emqxProperties = emqxProperties;
|
|
|
this.client = client;
|
|
this.client = client;
|
|
@@ -70,8 +61,7 @@ public class IotDeviceUpstreamServer {
|
|
|
Router router = Router.router(vertx);
|
|
Router router = Router.router(vertx);
|
|
|
router.route().handler(BodyHandler.create()); // 处理 Body
|
|
router.route().handler(BodyHandler.create()); // 处理 Body
|
|
|
router.post(IotDeviceAuthVertxHandler.PATH)
|
|
router.post(IotDeviceAuthVertxHandler.PATH)
|
|
|
- // TODO @haohao:疑问,mqtt 的认证,需要通过 http 呀?
|
|
|
|
|
- // 回复:MQTT 认证不必须通过 HTTP 进行,但 HTTP 认证是 EMQX 等 MQTT 服务器支持的一种灵活的认证方式
|
|
|
|
|
|
|
+ // MQTT 认证不必须通过 HTTP 进行,但 HTTP 认证是 EMQX 等 MQTT 服务器支持的一种灵活的认证方式
|
|
|
.handler(new IotDeviceAuthVertxHandler(deviceUpstreamApi));
|
|
.handler(new IotDeviceAuthVertxHandler(deviceUpstreamApi));
|
|
|
// 添加 Webhook 处理器,用于处理设备连接和断开连接事件
|
|
// 添加 Webhook 处理器,用于处理设备连接和断开连接事件
|
|
|
router.post(IotDeviceWebhookVertxHandler.PATH)
|
|
router.post(IotDeviceWebhookVertxHandler.PATH)
|
|
@@ -91,15 +81,20 @@ public class IotDeviceUpstreamServer {
|
|
|
}
|
|
}
|
|
|
log.info("[start][开始启动服务]");
|
|
log.info("[start][开始启动服务]");
|
|
|
|
|
|
|
|
- // 检查authPort是否为null
|
|
|
|
|
|
|
+ // 检查 authPort 是否为 null
|
|
|
Integer authPort = emqxProperties.getAuthPort();
|
|
Integer authPort = emqxProperties.getAuthPort();
|
|
|
if (authPort == null) {
|
|
if (authPort == null) {
|
|
|
- log.warn("[start][authPort为null,使用默认端口8080]");
|
|
|
|
|
|
|
+ log.warn("[start][authPort 为 null,使用默认端口 8080]");
|
|
|
authPort = 8080; // 默认端口
|
|
authPort = 8080; // 默认端口
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+ // 获取连接超时时间
|
|
|
|
|
+ int connectionTimeoutMs = emqxProperties.getConnectionTimeoutMs() != null
|
|
|
|
|
+ ? emqxProperties.getConnectionTimeoutMs()
|
|
|
|
|
+ : 10000;
|
|
|
|
|
+
|
|
|
// 1. 启动 HTTP 服务器
|
|
// 1. 启动 HTTP 服务器
|
|
|
- final Integer finalAuthPort = authPort; // 为lambda表达式创建final变量
|
|
|
|
|
|
|
+ final Integer finalAuthPort = authPort; // 为 lambda 表达式创建 final 变量
|
|
|
CompletableFuture<Void> httpFuture = server.listen(finalAuthPort)
|
|
CompletableFuture<Void> httpFuture = server.listen(finalAuthPort)
|
|
|
.toCompletionStage()
|
|
.toCompletionStage()
|
|
|
.toCompletableFuture()
|
|
.toCompletableFuture()
|
|
@@ -115,13 +110,13 @@ public class IotDeviceUpstreamServer {
|
|
|
log.warn("[closeHandler][MQTT 连接已断开,准备重连]");
|
|
log.warn("[closeHandler][MQTT 连接已断开,准备重连]");
|
|
|
reconnectWithDelay();
|
|
reconnectWithDelay();
|
|
|
});
|
|
});
|
|
|
- // 2. 设置 MQTT 消息处理器
|
|
|
|
|
|
|
+ // 2.2 设置 MQTT 消息处理器
|
|
|
setupMessageHandler();
|
|
setupMessageHandler();
|
|
|
});
|
|
});
|
|
|
|
|
|
|
|
// 3. 等待所有服务启动完成
|
|
// 3. 等待所有服务启动完成
|
|
|
CompletableFuture.allOf(httpFuture, mqttFuture)
|
|
CompletableFuture.allOf(httpFuture, mqttFuture)
|
|
|
- .orTimeout(CONNECTION_TIMEOUT_MS, TimeUnit.MILLISECONDS)
|
|
|
|
|
|
|
+ .orTimeout(connectionTimeoutMs, TimeUnit.MILLISECONDS)
|
|
|
.whenComplete((result, error) -> {
|
|
.whenComplete((result, error) -> {
|
|
|
if (error != null) {
|
|
if (error != null) {
|
|
|
log.error("[start][服务启动失败]", error);
|
|
log.error("[start][服务启动失败]", error);
|
|
@@ -149,7 +144,12 @@ public class IotDeviceUpstreamServer {
|
|
|
return;
|
|
return;
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- vertx.setTimer(RECONNECT_DELAY_MS, id -> {
|
|
|
|
|
|
|
+ // 获取重连延迟时间
|
|
|
|
|
+ int reconnectDelayMs = emqxProperties.getReconnectDelayMs() != null
|
|
|
|
|
+ ? emqxProperties.getReconnectDelayMs()
|
|
|
|
|
+ : 5000;
|
|
|
|
|
+
|
|
|
|
|
+ vertx.setTimer(reconnectDelayMs, id -> {
|
|
|
log.info("[reconnectWithDelay][开始重新连接 MQTT]");
|
|
log.info("[reconnectWithDelay][开始重新连接 MQTT]");
|
|
|
connectMqtt();
|
|
connectMqtt();
|
|
|
});
|
|
});
|
|
@@ -158,14 +158,14 @@ public class IotDeviceUpstreamServer {
|
|
|
/**
|
|
/**
|
|
|
* 连接 MQTT Broker 并订阅主题
|
|
* 连接 MQTT Broker 并订阅主题
|
|
|
*
|
|
*
|
|
|
- * @return 连接结果的Future
|
|
|
|
|
|
|
+ * @return 连接结果的 Future
|
|
|
*/
|
|
*/
|
|
|
private Future<Void> connectMqtt() {
|
|
private Future<Void> connectMqtt() {
|
|
|
// 检查必要的 MQTT 配置
|
|
// 检查必要的 MQTT 配置
|
|
|
String host = emqxProperties.getMqttHost();
|
|
String host = emqxProperties.getMqttHost();
|
|
|
Integer port = emqxProperties.getMqttPort();
|
|
Integer port = emqxProperties.getMqttPort();
|
|
|
- if (host == null) {
|
|
|
|
|
- String msg = "[connectMqtt][MQTT Host 为 null,无法连接]";
|
|
|
|
|
|
|
+ if (StrUtil.isBlank(host)) {
|
|
|
|
|
+ String msg = "[connectMqtt][MQTT Host 为空,无法连接]";
|
|
|
log.error(msg);
|
|
log.error(msg);
|
|
|
return Future.failedFuture(new IllegalStateException(msg));
|
|
return Future.failedFuture(new IllegalStateException(msg));
|
|
|
}
|
|
}
|
|
@@ -177,11 +177,11 @@ public class IotDeviceUpstreamServer {
|
|
|
final Integer finalPort = port;
|
|
final Integer finalPort = port;
|
|
|
return client.connect(finalPort, host)
|
|
return client.connect(finalPort, host)
|
|
|
.compose(connAck -> {
|
|
.compose(connAck -> {
|
|
|
- log.info("[connectMqtt][MQTT客户端连接成功]");
|
|
|
|
|
|
|
+ log.info("[connectMqtt][MQTT 客户端连接成功]");
|
|
|
return subscribeToTopics();
|
|
return subscribeToTopics();
|
|
|
})
|
|
})
|
|
|
.recover(error -> {
|
|
.recover(error -> {
|
|
|
- log.error("[connectMqtt][连接MQTT Broker失败:]", error);
|
|
|
|
|
|
|
+ log.error("[connectMqtt][连接 MQTT Broker 失败:]", error);
|
|
|
reconnectWithDelay();
|
|
reconnectWithDelay();
|
|
|
return Future.failedFuture(error);
|
|
return Future.failedFuture(error);
|
|
|
});
|
|
});
|
|
@@ -198,62 +198,67 @@ public class IotDeviceUpstreamServer {
|
|
|
log.warn("[subscribeToTopics][未配置 MQTT 主题或为 null,使用默认主题]");
|
|
log.warn("[subscribeToTopics][未配置 MQTT 主题或为 null,使用默认主题]");
|
|
|
topics = new String[]{"/device/#"}; // 默认订阅所有设备上下行主题
|
|
topics = new String[]{"/device/#"}; // 默认订阅所有设备上下行主题
|
|
|
}
|
|
}
|
|
|
- log.info("[subscribeToTopics][开始订阅设备上行消息主题]");
|
|
|
|
|
|
|
|
|
|
- Future<Void> compositeFuture = Future.succeededFuture();
|
|
|
|
|
|
|
+ // 使用协调器追踪多个 Future 的完成状态
|
|
|
|
|
+ Future<Void> result = Future.succeededFuture();
|
|
|
for (String topic : topics) {
|
|
for (String topic : topics) {
|
|
|
- String trimmedTopic = StrUtil.trim(topic);
|
|
|
|
|
- if (StrUtil.isBlank(trimmedTopic)) {
|
|
|
|
|
|
|
+ if (StrUtil.isBlank(topic)) {
|
|
|
|
|
+ log.warn("[subscribeToTopics][跳过空主题]");
|
|
|
continue;
|
|
continue;
|
|
|
}
|
|
}
|
|
|
- compositeFuture = compositeFuture.compose(v -> client.subscribe(trimmedTopic, DEFAULT_QOS.value())
|
|
|
|
|
|
|
+
|
|
|
|
|
+ result = result.compose(v -> client.subscribe(topic, DEFAULT_QOS.value())
|
|
|
.<Void>map(ack -> {
|
|
.<Void>map(ack -> {
|
|
|
- log.info("[subscribeToTopics][成功订阅主题: {}]", trimmedTopic);
|
|
|
|
|
|
|
+ log.info("[subscribeToTopics][订阅主题成功: {}]", topic);
|
|
|
return null;
|
|
return null;
|
|
|
})
|
|
})
|
|
|
- .recover(error -> {
|
|
|
|
|
- log.error("[subscribeToTopics][订阅主题失败: {}]", trimmedTopic, error);
|
|
|
|
|
- return Future.<Void>succeededFuture(); // 继续订阅其他主题
|
|
|
|
|
|
|
+ .recover(err -> {
|
|
|
|
|
+ log.error("[subscribeToTopics][订阅主题失败: {}]", topic, err);
|
|
|
|
|
+ return Future.failedFuture(err);
|
|
|
}));
|
|
}));
|
|
|
}
|
|
}
|
|
|
- return compositeFuture;
|
|
|
|
|
|
|
+ return result;
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
|
- * 停止所有服务
|
|
|
|
|
|
|
+ * 停止服务
|
|
|
*/
|
|
*/
|
|
|
public void stop() {
|
|
public void stop() {
|
|
|
if (!isRunning) {
|
|
if (!isRunning) {
|
|
|
- log.warn("[stop][服务未运行,无需停止]");
|
|
|
|
|
|
|
+ log.warn("[stop][服务已经停止,无需再次停止]");
|
|
|
return;
|
|
return;
|
|
|
}
|
|
}
|
|
|
- log.info("[stop][开始关闭服务]");
|
|
|
|
|
- isRunning = false;
|
|
|
|
|
|
|
+ log.info("[stop][开始停止服务]");
|
|
|
|
|
+
|
|
|
|
|
+ // 1. 取消 MQTT 主题订阅
|
|
|
|
|
+ if (client.isConnected()) {
|
|
|
|
|
+ for (String topic : emqxProperties.getMqttTopics()) {
|
|
|
|
|
+ try {
|
|
|
|
|
+ client.unsubscribe(topic);
|
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
|
+ log.warn("[stop][取消订阅主题异常: {}]", topic, e);
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
|
|
|
|
|
+ // 2. 关闭 MQTT 客户端
|
|
|
try {
|
|
try {
|
|
|
- CompletableFuture<Void> serverFuture = server != null
|
|
|
|
|
- ? server.close().toCompletionStage().toCompletableFuture()
|
|
|
|
|
- : CompletableFuture.completedFuture(null);
|
|
|
|
|
- CompletableFuture<Void> clientFuture = client != null
|
|
|
|
|
- ? client.disconnect().toCompletionStage().toCompletableFuture()
|
|
|
|
|
- : CompletableFuture.completedFuture(null);
|
|
|
|
|
- CompletableFuture<Void> vertxFuture = vertx != null
|
|
|
|
|
- ? vertx.close().toCompletionStage().toCompletableFuture()
|
|
|
|
|
- : CompletableFuture.completedFuture(null);
|
|
|
|
|
|
|
+ if (client.isConnected()) {
|
|
|
|
|
+ client.disconnect();
|
|
|
|
|
+ }
|
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
|
+ log.warn("[stop][关闭 MQTT 客户端异常]", e);
|
|
|
|
|
+ }
|
|
|
|
|
|
|
|
- // 等待所有资源关闭
|
|
|
|
|
- CompletableFuture.allOf(serverFuture, clientFuture, vertxFuture)
|
|
|
|
|
- .orTimeout(CONNECTION_TIMEOUT_MS, TimeUnit.MILLISECONDS)
|
|
|
|
|
- .whenComplete((result, error) -> {
|
|
|
|
|
- if (error != null) {
|
|
|
|
|
- log.error("[stop][服务关闭过程中发生异常]", error);
|
|
|
|
|
- } else {
|
|
|
|
|
- log.info("[stop][所有服务关闭完成]");
|
|
|
|
|
- }
|
|
|
|
|
- });
|
|
|
|
|
|
|
+ // 3. 关闭 HTTP 服务器
|
|
|
|
|
+ try {
|
|
|
|
|
+ server.close();
|
|
|
} catch (Exception e) {
|
|
} catch (Exception e) {
|
|
|
- log.error("[stop][关闭服务异常]", e);
|
|
|
|
|
- throw new RuntimeException("关闭 IoT 设备上行服务失败", e);
|
|
|
|
|
|
|
+ log.warn("[stop][关闭 HTTP 服务器异常]", e);
|
|
|
}
|
|
}
|
|
|
|
|
+
|
|
|
|
|
+ // 4. 更新状态
|
|
|
|
|
+ isRunning = false;
|
|
|
|
|
+ log.info("[stop][服务已停止]");
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|