Просмотр исходного кода

feat:【IoT 物联网】增加消息总线(messagebus)的 local 实现

YunaiV 1 год назад
Родитель
Сommit
39fb31d400
13 измененных файлов с 500 добавлено и 0 удалено
  1. 1 0
      yudao-module-iot/pom.xml
  2. 24 0
      yudao-module-iot/yudao-module-iot-core/pom.xml
  3. 65 0
      yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/pom.xml
  4. 37 0
      yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/src/main/java/cn/iocoder/yudao/module/iot/messagebus/config/IotMessageBusAutoConfiguration.java
  5. 28 0
      yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/src/main/java/cn/iocoder/yudao/module/iot/messagebus/config/IotMessageBusProperties.java
  6. 69 0
      yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/src/main/java/cn/iocoder/yudao/module/iot/messagebus/core/AbstractIotMessageBus.java
  7. 28 0
      yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/src/main/java/cn/iocoder/yudao/module/iot/messagebus/core/IotMessageBus.java
  8. 27 0
      yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/src/main/java/cn/iocoder/yudao/module/iot/messagebus/core/IotMessageBusSubscriber.java
  9. 14 0
      yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/src/main/java/cn/iocoder/yudao/module/iot/messagebus/core/local/LocalIotMessage.java
  10. 39 0
      yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/src/main/java/cn/iocoder/yudao/module/iot/messagebus/core/local/LocalIotMessageBus.java
  11. 2 0
      yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/src/main/resources/META-INF/spring.factories
  12. 162 0
      yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/src/test/java/cn/iocoder/yudao/module/iot/messagebus/core/local/LocalIotMessageBusIntegrationTest.java
  13. 4 0
      yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/src/test/resources/application-test.yml

+ 1 - 0
yudao-module-iot/pom.xml

@@ -11,6 +11,7 @@
         <module>yudao-module-iot-biz</module>
         <module>yudao-module-iot-net-components</module>
         <module>yudao-module-iot-protocol</module>
+        <module>yudao-module-iot-core</module>
     </modules>
     <modelVersion>4.0.0</modelVersion>
 

+ 24 - 0
yudao-module-iot/yudao-module-iot-core/pom.xml

@@ -0,0 +1,24 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>yudao-module-iot</artifactId>
+        <groupId>cn.iocoder.boot</groupId>
+        <version>${revision}</version>
+    </parent>
+    <modules>
+        <module>yudao-module-iot-message-bus</module>
+    </modules>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>yudao-module-iot-core</artifactId>
+    <packaging>pom</packaging>
+
+    <name>${project.artifactId}</name>
+    <description>
+        iot 模块下,提供 biz 和 gateway-server 模块的核心功能。
+        例如说:消息总线、消息协议(编解码)等。
+    </description>
+
+</project>

+ 65 - 0
yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/pom.xml

@@ -0,0 +1,65 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>yudao-module-iot-core</artifactId>
+        <groupId>cn.iocoder.boot</groupId>
+        <version>${revision}</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+    <artifactId>yudao-module-iot-message-bus</artifactId>
+    <packaging>jar</packaging>
+
+    <name>${project.artifactId}</name>
+    <description>
+        iot 模块下,提供消息总线的功能。
+        可选择使用 spring event、redis stream、rocketmq、kafka、rabbitmq 等。
+    </description>
+
+    <dependencies>
+        <dependency>
+            <groupId>cn.iocoder.boot</groupId>
+            <artifactId>yudao-common</artifactId>
+        </dependency>
+
+        <!-- Spring 核心 -->
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter</artifactId>
+        </dependency>
+
+        <!-- 消息中间件相关(可选依赖) -->
+        <dependency>
+            <groupId>org.springframework.data</groupId>
+            <artifactId>spring-data-redis</artifactId>
+            <optional>true</optional>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-spring-boot-starter</artifactId>
+            <optional>true</optional>
+        </dependency>
+
+        <dependency>
+            <groupId>org.springframework.amqp</groupId>
+            <artifactId>spring-rabbit</artifactId>
+            <optional>true</optional>
+        </dependency>
+
+        <dependency>
+            <groupId>org.springframework.kafka</groupId>
+            <artifactId>spring-kafka</artifactId>
+            <optional>true</optional>
+        </dependency>
+
+        <!-- 测试依赖 -->
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-test</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+</project>

+ 37 - 0
yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/src/main/java/cn/iocoder/yudao/module/iot/messagebus/config/IotMessageBusAutoConfiguration.java

@@ -0,0 +1,37 @@
+package cn.iocoder.yudao.module.iot.messagebus.config;
+
+import cn.iocoder.yudao.module.iot.messagebus.core.IotMessageBus;
+import cn.iocoder.yudao.module.iot.messagebus.core.local.LocalIotMessageBus;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.boot.autoconfigure.AutoConfiguration;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+/**
+ * IoT 消息总线自动配置
+ *
+ * @author 芋道源码
+ */
+@AutoConfiguration
+@EnableConfigurationProperties(IotMessageBusProperties.class)
+@Slf4j
+public class IotMessageBusAutoConfiguration {
+
+    // ==================== Local 实现 ====================
+
+    @Configuration
+    @ConditionalOnProperty(prefix = "yudao.iot.message-bus", name = "type", havingValue = "local", matchIfMissing = true)
+    public static class LocalIotMessageBusConfiguration {
+
+        @Bean
+        public IotMessageBus localIotMessageBus(ApplicationContext applicationContext) {
+            log.info("[localIotMessageBus][创建 Local IoT 消息总线]");
+            return new LocalIotMessageBus(applicationContext);
+        }
+
+    }
+
+}

+ 28 - 0
yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/src/main/java/cn/iocoder/yudao/module/iot/messagebus/config/IotMessageBusProperties.java

@@ -0,0 +1,28 @@
+package cn.iocoder.yudao.module.iot.messagebus.config;
+
+import lombok.Data;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.validation.annotation.Validated;
+
+import jakarta.validation.constraints.NotEmpty;
+import jakarta.validation.constraints.NotNull;
+
+/**
+ * IoT 消息总线配置属性
+ *
+ * @author 芋道源码
+ */
+@ConfigurationProperties("yudao.iot.message-bus")
+@Data
+@Validated
+public class IotMessageBusProperties {
+
+    /**
+     * 消息总线类型
+     *
+     * 可选值:local、redis、rocketmq、rabbitmq
+     */
+    @NotNull(message = "IoT 消息总线类型不能为空")
+    private String type = "local";
+
+}

+ 69 - 0
yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/src/main/java/cn/iocoder/yudao/module/iot/messagebus/core/AbstractIotMessageBus.java

@@ -0,0 +1,69 @@
+package cn.iocoder.yudao.module.iot.messagebus.core;
+
+import cn.hutool.core.collection.CollUtil;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+/**
+ * IoT 消息总线抽象基类
+ *
+ * 提供通用的订阅者管理功能
+ *
+ * @author 芋道源码
+ */
+@Slf4j
+public abstract class AbstractIotMessageBus implements IotMessageBus {
+
+    /**
+     * 订阅者映射表
+     * Key: topic
+     */
+    private final Map<String, List<IotMessageBusSubscriber<?>>> subscribers = new ConcurrentHashMap<>();
+
+    @Override
+    public void register(String topic, IotMessageBusSubscriber<?> subscriber) {
+        // 执行注册
+        doRegister(topic, subscriber);
+
+        // 添加订阅者映射
+        List<IotMessageBusSubscriber<?>> topicSubscribers = subscribers.computeIfAbsent(topic, k -> new CopyOnWriteArrayList<>());
+        topicSubscribers.add(subscriber);
+        topicSubscribers.sort(Comparator.comparingInt(IotMessageBusSubscriber::order));
+        log.info("[register][topic({}) 注册订阅者({})成功]", topic, subscriber.getClass().getName());
+    }
+
+    /**
+     * 注册订阅者
+     *
+     * @param topic 主题
+     * @param subscriber 订阅者
+     */
+    protected abstract void doRegister(String topic, IotMessageBusSubscriber<?> subscriber);
+
+    /**
+     * 通知订阅者
+     *
+     * @param message 消息
+     */
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    protected void notifySubscribers(String topic, Object message) {
+        List<IotMessageBusSubscriber<?>> topicSubscribers = subscribers.get(topic);
+        if (CollUtil.isEmpty(topicSubscribers)) {
+            return;
+        }
+        for (IotMessageBusSubscriber subscriber : topicSubscribers) {
+            try {
+                subscriber.onMessage(topic, message);
+            } catch (Exception e) {
+                log.error("[notifySubscribers][topic({}) message({}) 通知订阅者({})失败]",
+                        topic, e, subscriber.getClass().getName());
+            }
+        }
+    }
+
+}

+ 28 - 0
yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/src/main/java/cn/iocoder/yudao/module/iot/messagebus/core/IotMessageBus.java

@@ -0,0 +1,28 @@
+package cn.iocoder.yudao.module.iot.messagebus.core;
+
+/**
+ * IoT 消息总线接口
+ *
+ * 用于在 IoT 系统中发布和订阅消息,支持多种消息中间件实现
+ *
+ * @author 芋道源码
+ */
+public interface IotMessageBus {
+
+    /**
+     * 发布消息到消息总线
+     *
+     * @param topic 主题
+     * @param message 消息内容
+     */
+    void post(String topic, Object message);
+
+    /**
+     * 注册消息订阅者
+     *
+     * @param topic 主题
+     * @param subscriber 订阅者
+     */
+    void register(String topic, IotMessageBusSubscriber<?> subscriber);
+
+}

+ 27 - 0
yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/src/main/java/cn/iocoder/yudao/module/iot/messagebus/core/IotMessageBusSubscriber.java

@@ -0,0 +1,27 @@
+package cn.iocoder.yudao.module.iot.messagebus.core;
+
+/**
+ * IoT 消息总线订阅者接口
+ *
+ * 用于处理从消息总线接收到的消息
+ *
+ * @author 芋道源码
+ */
+public interface IotMessageBusSubscriber<T> {
+
+    /**
+     * 处理接收到的消息
+     *
+     * @param topic 主题
+     * @param message 消息内容
+     */
+    void onMessage(String topic, T message);
+
+    /**
+     * 获取订阅者的顺序
+     *
+     * @return 顺序值
+     */
+    int order();
+
+}

+ 14 - 0
yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/src/main/java/cn/iocoder/yudao/module/iot/messagebus/core/local/LocalIotMessage.java

@@ -0,0 +1,14 @@
+package cn.iocoder.yudao.module.iot.messagebus.core.local;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+@Data
+@AllArgsConstructor
+public class LocalIotMessage {
+
+    private String topic;
+
+    private Object message;
+
+}

+ 39 - 0
yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/src/main/java/cn/iocoder/yudao/module/iot/messagebus/core/local/LocalIotMessageBus.java

@@ -0,0 +1,39 @@
+package cn.iocoder.yudao.module.iot.messagebus.core.local;
+
+import cn.iocoder.yudao.module.iot.messagebus.core.AbstractIotMessageBus;
+import cn.iocoder.yudao.module.iot.messagebus.core.IotMessageBus;
+import cn.iocoder.yudao.module.iot.messagebus.core.IotMessageBusSubscriber;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.event.EventListener;
+
+/**
+ * 本地的 {@link IotMessageBus} 实现类
+ *
+ * 注意:仅适用于单机场景!!!
+ *
+ * @author 芋道源码
+ */
+@RequiredArgsConstructor
+@Slf4j
+public class LocalIotMessageBus extends AbstractIotMessageBus {
+
+    private final ApplicationContext applicationContext;
+
+    @Override
+    public void post(String topic, Object message) {
+        applicationContext.publishEvent(new LocalIotMessage(topic, message));
+    }
+
+    @Override
+    protected void doRegister(String topic, IotMessageBusSubscriber<?> subscriber) {
+        // 无需实现,交给 Spring @EventListener 监听
+    }
+
+    @EventListener
+    public void onMessage(LocalIotMessage message) {
+        notifySubscribers(message.getTopic(), message.getMessage());
+    }
+
+}

+ 2 - 0
yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/src/main/resources/META-INF/spring.factories

@@ -0,0 +1,2 @@
+org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
+cn.iocoder.yudao.module.iot.messagebus.config.IotMessageBusAutoConfiguration 

+ 162 - 0
yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/src/test/java/cn/iocoder/yudao/module/iot/messagebus/core/local/LocalIotMessageBusIntegrationTest.java

@@ -0,0 +1,162 @@
+package cn.iocoder.yudao.module.iot.messagebus.core.local;
+
+import cn.iocoder.yudao.module.iot.messagebus.config.IotMessageBusAutoConfiguration;
+import cn.iocoder.yudao.module.iot.messagebus.core.IotMessageBus;
+import cn.iocoder.yudao.module.iot.messagebus.core.IotMessageBusSubscriber;
+import jakarta.annotation.Resource;
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.context.annotation.Import;
+import org.springframework.test.context.TestPropertySource;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+/**
+ * {@link LocalIotMessageBus} 集成测试
+ *
+ * @author 芋道源码
+ */
+@SpringBootTest(classes = LocalIotMessageBusIntegrationTest.class)
+@Import(IotMessageBusAutoConfiguration.class)
+@TestPropertySource(properties = {
+    "yudao.iot.message-bus.type=local"
+})
+@Slf4j
+public class LocalIotMessageBusIntegrationTest {
+
+    @Resource
+    private IotMessageBus messageBus;
+
+    /**
+     * 1 topic 2 subscriber
+     */
+    @Test
+    public void testSendMessageWithTwoSubscribers() throws InterruptedException {
+        // 准备
+        String topic = "test-topic";
+        String testMessage = "Hello IoT Message Bus!";
+        // 用于等待消息处理完成
+        CountDownLatch latch = new CountDownLatch(2);
+        // 用于记录接收到的消息
+        AtomicInteger subscriber1Count = new AtomicInteger(0);
+        AtomicInteger subscriber2Count = new AtomicInteger(0);
+
+        // 创建第一个订阅者
+        IotMessageBusSubscriber<String> subscriber1 = new IotMessageBusSubscriber<>() {
+
+            @Override
+            public void onMessage(String topic, String message) {
+                log.info("[订阅者1] 收到消息 - Topic: {}, Message: {}", topic, message);
+                subscriber1Count.incrementAndGet();
+                assertEquals("test-topic", topic);
+                assertEquals(testMessage, message);
+                latch.countDown();
+            }
+
+            @Override
+            public int order() {
+                return 1;
+            }
+
+        };
+        // 创建第二个订阅者
+        IotMessageBusSubscriber<String> subscriber2 = new IotMessageBusSubscriber<>() {
+
+            @Override
+            public void onMessage(String topic, String message) {
+                log.info("[订阅者2] 收到消息 - Topic: {}, Message: {}", topic, message);
+                subscriber2Count.incrementAndGet();
+                assertEquals("test-topic", topic);
+                assertEquals(testMessage, message);
+                latch.countDown();
+            }
+
+            @Override
+            public int order() {
+                return 0;
+            }
+
+        };
+        // 注册订阅者
+        messageBus.register(topic, subscriber1);
+        messageBus.register(topic, subscriber2);
+
+        // 发送消息
+        log.info("[测试] 发送消息 - Topic: {}, Message: {}", topic, testMessage);
+        messageBus.post(topic, testMessage);
+        // 等待消息处理完成(最多等待5秒)
+        boolean completed = latch.await(5, TimeUnit.SECONDS);
+
+        // 验证结果
+        assertTrue(completed, "消息处理超时");
+        assertEquals(1, subscriber1Count.get(), "订阅者1应该收到1条消息");
+        assertEquals(1, subscriber2Count.get(), "订阅者2应该收到1条消息");
+        log.info("[测试] 测试完成 - 订阅者1收到{}条消息,订阅者2收到{}条消息", subscriber1Count.get(), subscriber2Count.get());
+    }
+
+    /**
+     * 2 topic 2 subscriber
+     */
+    @Test
+    public void testMultipleTopics() throws InterruptedException {
+        // 准备
+        String topic1 = "device-status";
+        String topic2 = "device-data";
+        String message1 = "设备在线";
+        String message2 = "温度:25°C";
+        CountDownLatch latch = new CountDownLatch(2);
+
+        // 创建订阅者 1 - 只订阅设备状态
+        IotMessageBusSubscriber<String> statusSubscriber = new IotMessageBusSubscriber<>() {
+
+            @Override
+            public void onMessage(String topic, String message) {
+                log.info("[状态订阅者] 收到消息 - Topic: {}, Message: {}", topic, message);
+                assertEquals(topic1, topic);
+                assertEquals(message1, message);
+                latch.countDown();
+            }
+
+            @Override
+            public int order() {
+                return 0;
+            }
+
+        };
+        // 创建订阅者 2 - 只订阅设备数据
+        IotMessageBusSubscriber<String> dataSubscriber = new IotMessageBusSubscriber<>() {
+
+            @Override
+            public void onMessage(String topic, String message) {
+                log.info("[数据订阅者] 收到消息 - Topic: {}, Message: {}", topic, message);
+                assertEquals(topic2, topic);
+                assertEquals(message2, message);
+                latch.countDown();
+            }
+
+            @Override
+            public int order() {
+                return 1;
+            }
+
+        };
+        // 注册订阅者到不同主题
+        messageBus.register(topic1, statusSubscriber);
+        messageBus.register(topic2, dataSubscriber);
+
+        // 发送消息到不同主题
+        messageBus.post(topic1, message1);
+        messageBus.post(topic2, message2);
+        // 等待消息处理完成
+        boolean completed = latch.await(5, TimeUnit.SECONDS);
+        assertTrue(completed, "消息处理超时");
+        log.info("[测试] 多主题测试完成");
+    }
+
+}

+ 4 - 0
yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/src/test/resources/application-test.yml

@@ -0,0 +1,4 @@
+yudao:
+  iot:
+    message-bus:
+      type: local