Przeglądaj źródła

feat:【IoT 物联网】增加消息总线(messagebus)的 rocketmq 实现

YunaiV 11 miesięcy temu
rodzic
commit
70a0df54e4
11 zmienionych plików z 514 dodań i 129 usunięć
  1. 12 0
      yudao-framework/yudao-common/src/main/java/cn/iocoder/yudao/framework/common/util/json/JsonUtils.java
  2. 19 0
      yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/src/main/java/cn/iocoder/yudao/module/iot/messagebus/config/IotMessageBusAutoConfiguration.java
  3. 0 69
      yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/src/main/java/cn/iocoder/yudao/module/iot/messagebus/core/AbstractIotMessageBus.java
  4. 1 2
      yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/src/main/java/cn/iocoder/yudao/module/iot/messagebus/core/IotMessageBus.java
  5. 10 8
      yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/src/main/java/cn/iocoder/yudao/module/iot/messagebus/core/IotMessageBusSubscriber.java
  6. 35 5
      yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/src/main/java/cn/iocoder/yudao/module/iot/messagebus/core/local/LocalIotMessageBus.java
  7. 98 0
      yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/src/main/java/cn/iocoder/yudao/module/iot/messagebus/core/rocketmq/RocketMQIotMessageBus.java
  8. 12 0
      yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/src/test/java/cn/iocoder/yudao/module/iot/messagebus/core/TestMessage.java
  9. 56 41
      yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/src/test/java/cn/iocoder/yudao/module/iot/messagebus/core/local/LocalIotMessageBusIntegrationTest.java
  10. 271 0
      yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/src/test/java/cn/iocoder/yudao/module/iot/messagebus/core/rocketmq/RocketMQIotMessageBusTest.java
  11. 0 4
      yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/src/test/resources/application-test.yml

+ 12 - 0
yudao-framework/yudao-common/src/main/java/cn/iocoder/yudao/framework/common/util/json/JsonUtils.java

@@ -99,6 +99,18 @@ public class JsonUtils {
         }
     }
 
+    public static <T> T parseObject(byte[] text, Type type) {
+        if (ArrayUtil.isEmpty(text)) {
+            return null;
+        }
+        try {
+            return objectMapper.readValue(text, objectMapper.getTypeFactory().constructType(type));
+        } catch (IOException e) {
+            log.error("json parse err,json:{}", text, e);
+            throw new RuntimeException(e);
+        }
+    }
+
     /**
      * 将字符串解析成指定类型的对象
      * 使用 {@link #parseObject(String, Class)} 时,在@JsonTypeInfo(use = JsonTypeInfo.Id.CLASS) 的场景下,

+ 19 - 0
yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/src/main/java/cn/iocoder/yudao/module/iot/messagebus/config/IotMessageBusAutoConfiguration.java

@@ -2,8 +2,12 @@ package cn.iocoder.yudao.module.iot.messagebus.config;
 
 import cn.iocoder.yudao.module.iot.messagebus.core.IotMessageBus;
 import cn.iocoder.yudao.module.iot.messagebus.core.local.LocalIotMessageBus;
+import cn.iocoder.yudao.module.iot.messagebus.core.rocketmq.RocketMQIotMessageBus;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.rocketmq.spring.autoconfigure.RocketMQProperties;
+import org.apache.rocketmq.spring.core.RocketMQTemplate;
 import org.springframework.boot.autoconfigure.AutoConfiguration;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
 import org.springframework.context.ApplicationContext;
@@ -34,4 +38,19 @@ public class IotMessageBusAutoConfiguration {
 
     }
 
+    // ==================== RocketMQ 实现 ====================
+
+    @Configuration
+    @ConditionalOnProperty(prefix = "yudao.iot.message-bus", name = "type", havingValue = "rocketmq")
+    @ConditionalOnClass(RocketMQTemplate.class)
+    public static class RocketMQIotMessageBusConfiguration {
+
+        @Bean
+        public IotMessageBus rocketMQIotMessageBus(RocketMQProperties rocketMQProperties, RocketMQTemplate rocketMQTemplate) {
+            log.info("[rocketMQIotMessageBus][创建 RocketMQ IoT 消息总线]");
+            return new RocketMQIotMessageBus(rocketMQProperties, rocketMQTemplate);
+        }
+
+    }
+
 }

+ 0 - 69
yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/src/main/java/cn/iocoder/yudao/module/iot/messagebus/core/AbstractIotMessageBus.java

@@ -1,69 +0,0 @@
-package cn.iocoder.yudao.module.iot.messagebus.core;
-
-import cn.hutool.core.collection.CollUtil;
-import lombok.extern.slf4j.Slf4j;
-
-import java.util.Comparator;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArrayList;
-
-/**
- * IoT 消息总线抽象基类
- *
- * 提供通用的订阅者管理功能
- *
- * @author 芋道源码
- */
-@Slf4j
-public abstract class AbstractIotMessageBus implements IotMessageBus {
-
-    /**
-     * 订阅者映射表
-     * Key: topic
-     */
-    private final Map<String, List<IotMessageBusSubscriber<?>>> subscribers = new ConcurrentHashMap<>();
-
-    @Override
-    public void register(String topic, IotMessageBusSubscriber<?> subscriber) {
-        // 执行注册
-        doRegister(topic, subscriber);
-
-        // 添加订阅者映射
-        List<IotMessageBusSubscriber<?>> topicSubscribers = subscribers.computeIfAbsent(topic, k -> new CopyOnWriteArrayList<>());
-        topicSubscribers.add(subscriber);
-        topicSubscribers.sort(Comparator.comparingInt(IotMessageBusSubscriber::order));
-        log.info("[register][topic({}) 注册订阅者({})成功]", topic, subscriber.getClass().getName());
-    }
-
-    /**
-     * 注册订阅者
-     *
-     * @param topic 主题
-     * @param subscriber 订阅者
-     */
-    protected abstract void doRegister(String topic, IotMessageBusSubscriber<?> subscriber);
-
-    /**
-     * 通知订阅者
-     *
-     * @param message 消息
-     */
-    @SuppressWarnings({"rawtypes", "unchecked"})
-    protected void notifySubscribers(String topic, Object message) {
-        List<IotMessageBusSubscriber<?>> topicSubscribers = subscribers.get(topic);
-        if (CollUtil.isEmpty(topicSubscribers)) {
-            return;
-        }
-        for (IotMessageBusSubscriber subscriber : topicSubscribers) {
-            try {
-                subscriber.onMessage(topic, message);
-            } catch (Exception e) {
-                log.error("[notifySubscribers][topic({}) message({}) 通知订阅者({})失败]",
-                        topic, e, subscriber.getClass().getName());
-            }
-        }
-    }
-
-}

+ 1 - 2
yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/src/main/java/cn/iocoder/yudao/module/iot/messagebus/core/IotMessageBus.java

@@ -20,9 +20,8 @@ public interface IotMessageBus {
     /**
      * 注册消息订阅者
      *
-     * @param topic 主题
      * @param subscriber 订阅者
      */
-    void register(String topic, IotMessageBusSubscriber<?> subscriber);
+    void register(IotMessageBusSubscriber<?> subscriber);
 
 }

+ 10 - 8
yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/src/main/java/cn/iocoder/yudao/module/iot/messagebus/core/IotMessageBusSubscriber.java

@@ -10,18 +10,20 @@ package cn.iocoder.yudao.module.iot.messagebus.core;
 public interface IotMessageBusSubscriber<T> {
 
     /**
-     * 处理接收到的消息
-     *
-     * @param topic 主题
-     * @param message 消息内容
+     * @return 主题
+     */
+    String getTopic();
+
+    /**
+     * @return 分组
      */
-    void onMessage(String topic, T message);
+    String getGroup();
 
     /**
-     * 获取订阅者的顺序
+     * 处理接收到的消息
      *
-     * @return 顺序值
+     * @param message 消息内容
      */
-    int order();
+    void onMessage(T message);
 
 }

+ 35 - 5
yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/src/main/java/cn/iocoder/yudao/module/iot/messagebus/core/local/LocalIotMessageBus.java

@@ -1,6 +1,6 @@
 package cn.iocoder.yudao.module.iot.messagebus.core.local;
 
-import cn.iocoder.yudao.module.iot.messagebus.core.AbstractIotMessageBus;
+import cn.hutool.core.collection.CollUtil;
 import cn.iocoder.yudao.module.iot.messagebus.core.IotMessageBus;
 import cn.iocoder.yudao.module.iot.messagebus.core.IotMessageBusSubscriber;
 import lombok.RequiredArgsConstructor;
@@ -8,6 +8,13 @@ import lombok.extern.slf4j.Slf4j;
 import org.springframework.context.ApplicationContext;
 import org.springframework.context.event.EventListener;
 
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+
 /**
  * 本地的 {@link IotMessageBus} 实现类
  *
@@ -17,23 +24,46 @@ import org.springframework.context.event.EventListener;
  */
 @RequiredArgsConstructor
 @Slf4j
-public class LocalIotMessageBus extends AbstractIotMessageBus {
+public class LocalIotMessageBus implements IotMessageBus {
 
     private final ApplicationContext applicationContext;
 
+    /**
+     * 订阅者映射表
+     * Key: topic
+     */
+    private final Map<String, List<IotMessageBusSubscriber<?>>> subscribers = new HashMap<>();
+
     @Override
     public void post(String topic, Object message) {
         applicationContext.publishEvent(new LocalIotMessage(topic, message));
     }
 
     @Override
-    protected void doRegister(String topic, IotMessageBusSubscriber<?> subscriber) {
-        // 无需实现,交给 Spring @EventListener 监听
+    public void register(IotMessageBusSubscriber<?> subscriber) {
+        String topic = subscriber.getTopic();
+        List<IotMessageBusSubscriber<?>> topicSubscribers = subscribers.computeIfAbsent(topic, k -> new ArrayList<>());
+        topicSubscribers.add(subscriber);
+        log.info("[register][topic({}/{}) 注册消费者({})成功]",
+                topic, subscriber.getGroup(), subscriber.getClass().getName());
     }
 
     @EventListener
+    @SuppressWarnings({"unchecked", "rawtypes"})
     public void onMessage(LocalIotMessage message) {
-        notifySubscribers(message.getTopic(), message.getMessage());
+        String topic = message.getTopic();
+        List<IotMessageBusSubscriber<?>> topicSubscribers = subscribers.get(topic);
+        if (CollUtil.isEmpty(topicSubscribers)) {
+            return;
+        }
+        for (IotMessageBusSubscriber subscriber : topicSubscribers) {
+            try {
+                subscriber.onMessage(message.getMessage());
+            } catch (Exception ex) {
+                log.error("[onMessage][topic({}/{}) message({}) 消费者({}) 处理异常]",
+                        subscriber.getTopic(), subscriber.getGroup(), message.getMessage(), subscriber.getClass().getName(), ex);
+            }
+        }
     }
 
 }

+ 98 - 0
yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/src/main/java/cn/iocoder/yudao/module/iot/messagebus/core/rocketmq/RocketMQIotMessageBus.java

@@ -0,0 +1,98 @@
+package cn.iocoder.yudao.module.iot.messagebus.core.rocketmq;
+
+import cn.hutool.core.util.TypeUtil;
+import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
+import cn.iocoder.yudao.module.iot.messagebus.core.IotMessageBus;
+import cn.iocoder.yudao.module.iot.messagebus.core.IotMessageBusSubscriber;
+import lombok.RequiredArgsConstructor;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.spring.autoconfigure.RocketMQProperties;
+import org.apache.rocketmq.spring.core.RocketMQTemplate;
+
+import jakarta.annotation.PreDestroy;
+
+import java.lang.reflect.Type;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * 基于 RocketMQ 的 {@link IotMessageBus} 实现类
+ *
+ * @author 芋道源码
+ */
+@RequiredArgsConstructor
+@Slf4j
+public class RocketMQIotMessageBus implements IotMessageBus {
+
+    private final RocketMQProperties rocketMQProperties;
+
+    private final RocketMQTemplate rocketMQTemplate;
+
+    /**
+     * 主题对应的消费者映射
+     */
+    private final List<DefaultMQPushConsumer> topicConsumers = new ArrayList<>();
+
+    @Override
+    public void post(String topic, Object message) {
+        SendResult result = rocketMQTemplate.syncSend(topic, JsonUtils.toJsonString(message));
+        log.info("[post][topic({}) 发送消息({}) result({})]", topic, message, result);
+    }
+
+    @Override
+    @SneakyThrows
+    public void register(IotMessageBusSubscriber<?> subscriber) {
+        Type type = TypeUtil.getTypeArgument(subscriber.getClass(), 0);
+        if (type == null) {
+            throw new IllegalStateException(String.format("类型(%s) 需要设置消息类型", getClass().getName()));
+        }
+
+        // 1.1 创建 DefaultMQPushConsumer
+        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();
+        consumer.setNamesrvAddr(rocketMQProperties.getNameServer());
+        consumer.setConsumerGroup(subscriber.getGroup());
+        // 1.2 订阅主题
+        consumer.subscribe(subscriber.getTopic(), "*");
+        // 1.3 设置消息监听器
+        consumer.setMessageListener((MessageListenerConcurrently) (messages, context) -> {
+            for (MessageExt messageExt : messages) {
+                try {
+                    byte[] body = messageExt.getBody();
+                    subscriber.onMessage(JsonUtils.parseObject(body, type));
+                } catch (Exception ex) {
+                    log.error("[onMessage][topic({}/{}) message({}) 消费者({}) 处理异常]",
+                            subscriber.getTopic(), subscriber.getGroup(), messageExt, subscriber.getClass().getName(), ex);
+                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
+                }
+            }
+            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+        });
+        // 1.4 启动消费者
+        consumer.start();
+
+        // 2. 保存消费者引用
+        topicConsumers.add(consumer);
+    }
+
+    /**
+     * 销毁时关闭所有消费者
+     */
+    @PreDestroy
+    public void destroy() {
+        for (DefaultMQPushConsumer consumer : topicConsumers) {
+            try {
+                consumer.shutdown();
+                log.info("[destroy][关闭 group({}) 的消费者成功]", consumer.getConsumerGroup());
+            } catch (Exception e) {
+                log.error("[destroy]关闭 group({}) 的消费者异常]", consumer.getConsumerGroup(), e);
+            }
+        }
+    }
+
+}

+ 12 - 0
yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/src/test/java/cn/iocoder/yudao/module/iot/messagebus/core/TestMessage.java

@@ -0,0 +1,12 @@
+package cn.iocoder.yudao.module.iot.messagebus.core;
+
+import lombok.Data;
+
+@Data
+public class TestMessage {
+
+    private String nickname;
+
+    private Integer age;
+
+}

+ 56 - 41
yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/src/test/java/cn/iocoder/yudao/module/iot/messagebus/core/local/LocalIotMessageBusIntegrationTest.java

@@ -6,7 +6,6 @@ import cn.iocoder.yudao.module.iot.messagebus.core.IotMessageBusSubscriber;
 import jakarta.annotation.Resource;
 import lombok.extern.slf4j.Slf4j;
 import org.junit.jupiter.api.Test;
-import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.test.context.SpringBootTest;
 import org.springframework.context.annotation.Import;
 import org.springframework.test.context.TestPropertySource;
@@ -51,17 +50,21 @@ public class LocalIotMessageBusIntegrationTest {
         IotMessageBusSubscriber<String> subscriber1 = new IotMessageBusSubscriber<>() {
 
             @Override
-            public void onMessage(String topic, String message) {
-                log.info("[订阅者1] 收到消息 - Topic: {}, Message: {}", topic, message);
-                subscriber1Count.incrementAndGet();
-                assertEquals("test-topic", topic);
-                assertEquals(testMessage, message);
-                latch.countDown();
+            public String getTopic() {
+                return topic;
+            }
+
+            @Override
+            public String getGroup() {
+                return "group1";
             }
 
             @Override
-            public int order() {
-                return 1;
+            public void onMessage(String message) {
+                log.info("[订阅者1] 收到消息 - Topic: {}, Message: {}", getTopic(), message);
+                subscriber1Count.incrementAndGet();
+                assertEquals(testMessage, message);
+                latch.countDown();
             }
 
         };
@@ -69,35 +72,39 @@ public class LocalIotMessageBusIntegrationTest {
         IotMessageBusSubscriber<String> subscriber2 = new IotMessageBusSubscriber<>() {
 
             @Override
-            public void onMessage(String topic, String message) {
-                log.info("[订阅者2] 收到消息 - Topic: {}, Message: {}", topic, message);
-                subscriber2Count.incrementAndGet();
-                assertEquals("test-topic", topic);
-                assertEquals(testMessage, message);
-                latch.countDown();
+            public String getTopic() {
+                return topic;
             }
 
             @Override
-            public int order() {
-                return 0;
+            public String getGroup() {
+                return "group2";
+            }
+
+            @Override
+            public void onMessage(String message) {
+                log.info("[订阅者2] 收到消息 - Topic: {}, Message: {}", getTopic(), message);
+                subscriber2Count.incrementAndGet();
+                assertEquals(testMessage, message);
+                latch.countDown();
             }
 
         };
         // 注册订阅者
-        messageBus.register(topic, subscriber1);
-        messageBus.register(topic, subscriber2);
+        messageBus.register(subscriber1);
+        messageBus.register(subscriber2);
 
         // 发送消息
         log.info("[测试] 发送消息 - Topic: {}, Message: {}", topic, testMessage);
         messageBus.post(topic, testMessage);
-        // 等待消息处理完成(最多等待5秒)
-        boolean completed = latch.await(5, TimeUnit.SECONDS);
+        // 等待消息处理完成(最多等待 10 秒)
+        boolean completed = latch.await(10, TimeUnit.SECONDS);
 
         // 验证结果
         assertTrue(completed, "消息处理超时");
-        assertEquals(1, subscriber1Count.get(), "订阅者1应该收到1条消息");
-        assertEquals(1, subscriber2Count.get(), "订阅者2应该收到1条消息");
-        log.info("[测试] 测试完成 - 订阅者1收到{}条消息,订阅者2收到{}条消息", subscriber1Count.get(), subscriber2Count.get());
+        assertEquals(1, subscriber1Count.get(), "订阅者 1 应该收到 1 条消息");
+        assertEquals(1, subscriber2Count.get(), "订阅者 2 应该收到 1 条消息");
+        log.info("[测试] 测试完成 - 订阅者 1 收到{}条消息,订阅者 2 收到{}条消息", subscriber1Count.get(), subscriber2Count.get());
     }
 
     /**
@@ -116,16 +123,20 @@ public class LocalIotMessageBusIntegrationTest {
         IotMessageBusSubscriber<String> statusSubscriber = new IotMessageBusSubscriber<>() {
 
             @Override
-            public void onMessage(String topic, String message) {
-                log.info("[状态订阅者] 收到消息 - Topic: {}, Message: {}", topic, message);
-                assertEquals(topic1, topic);
-                assertEquals(message1, message);
-                latch.countDown();
+            public String getTopic() {
+                return topic1;
             }
 
             @Override
-            public int order() {
-                return 0;
+            public String getGroup() {
+                return "status-group";
+            }
+
+            @Override
+            public void onMessage(String message) {
+                log.info("[状态订阅者] 收到消息 - Topic: {}, Message: {}", getTopic(), message);
+                assertEquals(message1, message);
+                latch.countDown();
             }
 
         };
@@ -133,28 +144,32 @@ public class LocalIotMessageBusIntegrationTest {
         IotMessageBusSubscriber<String> dataSubscriber = new IotMessageBusSubscriber<>() {
 
             @Override
-            public void onMessage(String topic, String message) {
-                log.info("[数据订阅者] 收到消息 - Topic: {}, Message: {}", topic, message);
-                assertEquals(topic2, topic);
-                assertEquals(message2, message);
-                latch.countDown();
+            public String getTopic() {
+                return topic2;
+            }
+
+            @Override
+            public String getGroup() {
+                return "data-group";
             }
 
             @Override
-            public int order() {
-                return 1;
+            public void onMessage(String message) {
+                log.info("[数据订阅者] 收到消息 - Topic: {}, Message: {}", getTopic(), message);
+                assertEquals(message2, message);
+                latch.countDown();
             }
 
         };
         // 注册订阅者到不同主题
-        messageBus.register(topic1, statusSubscriber);
-        messageBus.register(topic2, dataSubscriber);
+        messageBus.register(statusSubscriber);
+        messageBus.register(dataSubscriber);
 
         // 发送消息到不同主题
         messageBus.post(topic1, message1);
         messageBus.post(topic2, message2);
         // 等待消息处理完成
-        boolean completed = latch.await(5, TimeUnit.SECONDS);
+        boolean completed = latch.await(10, TimeUnit.SECONDS);
         assertTrue(completed, "消息处理超时");
         log.info("[测试] 多主题测试完成");
     }

+ 271 - 0
yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/src/test/java/cn/iocoder/yudao/module/iot/messagebus/core/rocketmq/RocketMQIotMessageBusTest.java

@@ -0,0 +1,271 @@
+package cn.iocoder.yudao.module.iot.messagebus.core.rocketmq;
+
+import cn.hutool.core.util.IdUtil;
+import cn.iocoder.yudao.module.iot.messagebus.config.IotMessageBusAutoConfiguration;
+import cn.iocoder.yudao.module.iot.messagebus.core.IotMessageBus;
+import cn.iocoder.yudao.module.iot.messagebus.core.IotMessageBusSubscriber;
+import cn.iocoder.yudao.module.iot.messagebus.core.TestMessage;
+import jakarta.annotation.Resource;
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration;
+import org.apache.rocketmq.spring.core.RocketMQTemplate;
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.context.annotation.Import;
+import org.springframework.test.context.TestPropertySource;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+/**
+ * {@link RocketMQIotMessageBus} 集成测试
+ *
+ * @author 芋道源码
+ */
+@SpringBootTest(classes = RocketMQIotMessageBusTest.class)
+@Import({RocketMQAutoConfiguration.class, IotMessageBusAutoConfiguration.class})
+@TestPropertySource(properties = {
+    "yudao.iot.message-bus.type=rocketmq",
+    "rocketmq.name-server=127.0.0.1:9876",
+    "rocketmq.producer.group=test-rocketmq-group",
+    "rocketmq.producer.send-message-timeout=10000"
+})
+@Slf4j
+public class RocketMQIotMessageBusTest {
+
+    @Resource
+    private IotMessageBus messageBus;
+
+    /**
+     * 1 topic 1 subscriber(string)
+     */
+    @Test
+    public void testSendMessageWithOneSubscriber() throws InterruptedException {
+        // 准备
+        String topic = "test-topic-" + IdUtil.simpleUUID();
+//        String topic = "test-topic-pojo";
+        String testMessage = "Hello IoT Message Bus!";
+        // 用于等待消息处理完成
+        CountDownLatch latch = new CountDownLatch(1);
+        // 用于记录接收到的消息
+        AtomicInteger subscriberCount = new AtomicInteger(0);
+        AtomicReference<String> subscriberMessageRef = new AtomicReference<>();
+
+        // 发送消息(需要提前发,保证 RocketMQ 路由的创建)
+        log.info("[测试] 发送消息 - Topic: {}, Message: {}", topic, testMessage);
+        messageBus.post(topic, testMessage);
+
+        // 创建订阅者
+        IotMessageBusSubscriber<String> subscriber1 = new IotMessageBusSubscriber<>() {
+
+            @Override
+            public String getTopic() {
+                return topic;
+            }
+
+            @Override
+            public String getGroup() {
+                return "test-topic-" + IdUtil.simpleUUID() + "-consumer";
+//                return "test-topic-consumer-01";
+            }
+
+            @Override
+            public void onMessage(String message) {
+                log.info("[订阅者] 收到消息 - Topic: {}, Message: {}", getTopic(), message);
+                subscriberCount.incrementAndGet();
+                subscriberMessageRef.set(message);
+                assertEquals(testMessage, message);
+                latch.countDown();
+            }
+
+        };
+        // 注册订阅者
+        messageBus.register(subscriber1);
+
+        // 等待消息处理完成(最多等待 5 秒)
+        boolean completed = latch.await(10, TimeUnit.SECONDS);
+
+        // 验证结果
+        assertTrue(completed, "消息处理超时");
+        assertEquals(1, subscriberCount.get(), "订阅者应该收到 1 条消息");
+        log.info("[测试] 测试完成 - 订阅者收到{}条消息", subscriberCount.get());
+        assertEquals(testMessage, subscriberMessageRef.get(), "接收到的消息内容不匹配");
+    }
+
+    /**
+     * 1 topic 2 subscriber(pojo)
+     */
+    @Test
+    public void testSendMessageWithTwoSubscribers() throws InterruptedException {
+        // 准备
+        String topic = "test-topic-" + IdUtil.simpleUUID();
+//        String topic = "test-topic-pojo";
+        TestMessage testMessage = new TestMessage().setNickname("yunai").setAge(18);
+        // 用于等待消息处理完成
+        CountDownLatch latch = new CountDownLatch(2);
+        // 用于记录接收到的消息
+        AtomicInteger subscriber1Count = new AtomicInteger(0);
+        AtomicReference<TestMessage> subscriber1MessageRef = new AtomicReference<>();
+        AtomicInteger subscriber2Count = new AtomicInteger(0);
+        AtomicReference<TestMessage> subscriber2MessageRef = new AtomicReference<>();
+
+        // 发送消息(需要提前发,保证 RocketMQ 路由的创建)
+        log.info("[测试] 发送消息 - Topic: {}, Message: {}", topic, testMessage);
+        messageBus.post(topic, testMessage);
+
+        // 创建第一个订阅者
+        IotMessageBusSubscriber<TestMessage> subscriber1 = new IotMessageBusSubscriber<>() {
+
+            @Override
+            public String getTopic() {
+                return topic;
+            }
+
+            @Override
+            public String getGroup() {
+                return "test-topic-" + IdUtil.simpleUUID() + "-consumer";
+//                return "test-topic-consumer-01";
+            }
+
+            @Override
+            public void onMessage(TestMessage message) {
+                log.info("[订阅者1] 收到消息 - Topic: {}, Message: {}", getTopic(), message);
+                subscriber1Count.incrementAndGet();
+                subscriber1MessageRef.set(message);
+                assertEquals(testMessage, message);
+                latch.countDown();
+            }
+
+        };
+        // 创建第二个订阅者
+        IotMessageBusSubscriber<TestMessage> subscriber2 = new IotMessageBusSubscriber<>() {
+
+            @Override
+            public String getTopic() {
+                return topic;
+            }
+
+            @Override
+            public String getGroup() {
+                return "test-topic-" + IdUtil.simpleUUID() + "-consumer";
+//                return "test-topic-consumer-02";
+            }
+
+            @Override
+            public void onMessage(TestMessage message) {
+                log.info("[订阅者2] 收到消息 - Topic: {}, Message: {}", getTopic(), message);
+                subscriber2Count.incrementAndGet();
+                subscriber2MessageRef.set(message);
+                assertEquals(testMessage, message);
+                latch.countDown();
+            }
+
+        };
+        // 注册订阅者
+        messageBus.register(subscriber1);
+        messageBus.register(subscriber2);
+
+        // 等待消息处理完成(最多等待 5 秒)
+        boolean completed = latch.await(10, TimeUnit.SECONDS);
+
+        // 验证结果
+        assertTrue(completed, "消息处理超时");
+        assertEquals(1, subscriber1Count.get(), "订阅者 1 应该收到 1 条消息");
+        assertEquals(1, subscriber2Count.get(), "订阅者 2 应该收到 1 条消息");
+        log.info("[测试] 测试完成 - 订阅者 1 收到{}条消息,订阅者2收到{}条消息", subscriber1Count.get(), subscriber2Count.get());
+        assertEquals(testMessage, subscriber1MessageRef.get(), "接收到的消息内容不匹配");
+        assertEquals(testMessage, subscriber2MessageRef.get(), "接收到的消息内容不匹配");
+    }
+
+    /**
+     * 2 topic 2 subscriber
+     */
+    @Test
+    public void testMultipleTopics() throws InterruptedException {
+        // 准备
+        String topic1 = "device-status-" + IdUtil.simpleUUID();
+        String topic2 = "device-data-" + IdUtil.simpleUUID();
+        String message1 = "设备在线";
+        String message2 = "温度:25°C";
+        CountDownLatch latch = new CountDownLatch(2);
+        AtomicInteger subscriber1Count = new AtomicInteger(0);
+        AtomicReference<String> subscriber1MessageRef = new AtomicReference<>();
+        AtomicInteger subscriber2Count = new AtomicInteger(0);
+        AtomicReference<String> subscriber2MessageRef = new AtomicReference<>();
+
+
+        // 发送消息到不同主题(需要提前发,保证 RocketMQ 路由的创建)
+        log.info("[测试] 发送消息 - Topic1: {}, Message1: {}", topic1, message1);
+        messageBus.post(topic1, message1);
+        log.info("[测试] 发送消息 - Topic2: {}, Message2: {}", topic2, message2);
+        messageBus.post(topic2, message2);
+
+        // 创建订阅者 1 - 只订阅设备状态
+        IotMessageBusSubscriber<String> statusSubscriber = new IotMessageBusSubscriber<>() {
+
+            @Override
+            public String getTopic() {
+                return topic1;
+            }
+
+            @Override
+            public String getGroup() {
+                return "status-group-" + IdUtil.simpleUUID();
+            }
+
+            @Override
+            public void onMessage(String message) {
+                log.info("[状态订阅者] 收到消息 - Topic: {}, Message: {}", getTopic(), message);
+                subscriber1Count.incrementAndGet();
+                subscriber1MessageRef.set(message);
+                assertEquals(message1, message);
+                latch.countDown();
+            }
+
+        };
+        // 创建订阅者 2 - 只订阅设备数据
+        IotMessageBusSubscriber<String> dataSubscriber = new IotMessageBusSubscriber<>() {
+
+            @Override
+            public String getTopic() {
+                return topic2;
+            }
+
+            @Override
+            public String getGroup() {
+                return "data-group-" + IdUtil.simpleUUID();
+            }
+
+            @Override
+            public void onMessage(String message) {
+                log.info("[数据订阅者] 收到消息 - Topic: {}, Message: {}", getTopic(), message);
+                subscriber2Count.incrementAndGet();
+                subscriber2MessageRef.set(message);
+                assertEquals(message2, message);
+                latch.countDown();
+            }
+
+        };
+        // 注册订阅者到不同主题
+        messageBus.register(statusSubscriber);
+        messageBus.register(dataSubscriber);
+
+        // 等待消息处理完成
+        boolean completed = latch.await(10, TimeUnit.SECONDS);
+
+        // 验证结果
+        assertTrue(completed, "消息处理超时");
+        assertEquals(1, subscriber1Count.get(), "状态订阅者应该收到 1 条消息");
+        assertEquals(message1, subscriber1MessageRef.get(), "状态订阅者接收到的消息内容不匹配");
+        assertEquals(1, subscriber2Count.get(), "数据订阅者应该收到 1 条消息");
+        assertEquals(message2, subscriber2MessageRef.get(), "数据订阅者接收到的消息内容不匹配");
+        log.info("[测试] 多主题测试完成 - 状态订阅者收到{}条消息,数据订阅者收到{}条消息", subscriber1Count.get(), subscriber2Count.get());
+    }
+
+}

+ 0 - 4
yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/src/test/resources/application-test.yml

@@ -1,4 +0,0 @@
-yudao:
-  iot:
-    message-bus:
-      type: local