Sfoglia il codice sorgente

rocketmq接入,多生产者,多消费者,配置开关

车车 5 mesi fa
parent
commit
c74ad0f516

+ 34 - 0
ktg-admin/src/main/java/com/ktg/web/controller/common/OrderController.java

@@ -0,0 +1,34 @@
+package com.ktg.web.controller.common;
+
+import com.ktg.common.utils.rocketmq.RocketMQProducerManager;
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+@Api(tags = "mq测试")
+@RestController
+@RequestMapping("/test")
+public class OrderController {
+    private final RocketMQProducerManager producerManager;
+
+    // 通过构造函数注入生产者管理器
+    public OrderController(RocketMQProducerManager producerManager) {
+        this.producerManager = producerManager;
+    }
+
+    @ApiOperation("发送消息测试")
+    @GetMapping("/create-order")
+    public String createOrder() throws Exception {
+        // 使用order-producer发送订单创建消息
+        producerManager.sendSync(
+                "order-producer", // 生产者名称
+                "order_topic",    // Topic
+                "create",         // Tag
+                "order_123",      // Key
+                "订单创建内容".getBytes() // 消息体
+        );
+        return "Order created";
+    }
+}

+ 23 - 0
ktg-admin/src/main/resources/application.yml

@@ -169,3 +169,26 @@ mail:
   #reminder: MATERIALS_EXPIRATION_REMINDER
   # 物资逾期告警模板
   #alarm: MATERIALS_OVERDUE_ALARM
+
+### MQ CONFIG
+rocketmq:
+  namesrvAddr: localhost:9876 # NameServer地址
+  producers: # 生产者配置
+    order-producer: # 订单生产者
+      group: order_producer_group # 生产者组名
+      instanceName: order-producer-instance # 实例名称
+    payment-producer: # 支付生产者
+      group: payment_producer_group
+      retryTimesWhenSendFailed: 5 # 发送失败重试次数
+  consumers: # 消费者配置
+    order-consumer: # 订单消费者
+      group: order_consumer_group
+      topic: order_topic # 订阅的Topic
+      tags: "create || pay" # 订阅的Tag表达式
+      consumeThreadMin: 10 # 最小消费线程数
+      enabled: false # 是否启用
+    payment-consumer: # 支付消费者
+      group: payment_consumer_group
+      topic: payment_topic
+      tags: "*" # 订阅所有Tag
+      enabled: false

+ 6 - 0
ktg-common/pom.xml

@@ -223,6 +223,12 @@
             <version>3.17.4</version>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-spring-boot-starter</artifactId>
+            <version>2.2.3</version>
+        </dependency>
+
     </dependencies>
 
 </project>

+ 58 - 0
ktg-common/src/main/java/com/ktg/common/utils/rocketmq/RocketMQAutoConfiguration.java

@@ -0,0 +1,58 @@
+package com.ktg.common.utils.rocketmq;
+
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Consumer;
+
+/**
+ * RocketMQ自动配置类
+ * 在Spring Boot应用启动时自动配置RocketMQ相关Bean
+ */
+@Configuration // 声明为配置类
+@ConditionalOnClass(DefaultMQProducer.class) // 当类路径下有DefaultMQProducer类时生效
+@EnableConfigurationProperties(RocketMQProperties.class) // 启用配置属性
+public class RocketMQAutoConfiguration {
+
+    /**
+     * 创建生产者管理器Bean
+     * @param properties RocketMQ配置属性
+     * @return 生产者管理器实例
+     */
+    @Bean
+    @ConditionalOnMissingBean // 当容器中没有该Bean时创建
+    public RocketMQProducerManager rocketMQProducerManager(RocketMQProperties properties) {
+        return new RocketMQProducerManager(properties);
+    }
+
+    /**
+     * 创建默认的消息处理器Map
+     * 应用可以通过@Bean提供自定义的消息处理器
+     * @return 空的处理器Map
+     */
+    @Bean
+    @ConditionalOnMissingBean
+    public Map<String, Consumer<MessageExt>> rocketMQMessageHandlers() {
+        return new HashMap<>();
+    }
+
+    /**
+     * 创建消费者管理器Bean
+     * @param properties RocketMQ配置属性
+     * @param messageHandlers 消息处理器Map
+     * @return 消费者管理器实例
+     */
+    @Bean
+    @ConditionalOnMissingBean
+    public RocketMQConsumerManager rocketMQConsumerManager(RocketMQProperties properties,
+                                                           Map<String, Consumer<MessageExt>> messageHandlers) {
+        return new RocketMQConsumerManager(properties, messageHandlers);
+    }
+}

+ 140 - 0
ktg-common/src/main/java/com/ktg/common/utils/rocketmq/RocketMQConsumerManager.java

@@ -0,0 +1,140 @@
+package com.ktg.common.utils.rocketmq;
+
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.springframework.beans.factory.DisposableBean;
+import org.springframework.beans.factory.InitializingBean;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Consumer;
+
+/**
+ * RocketMQ消费者管理器
+ * 负责管理多个RocketMQ消费者实例
+ * 实现InitializingBean在Spring初始化完成后自动初始化消费者
+ * 实现DisposableBean在Spring销毁时自动关闭消费者
+ */
+public class RocketMQConsumerManager implements InitializingBean, DisposableBean {
+    // RocketMQ配置属性
+    private final RocketMQProperties properties;
+
+    // 消息处理器Map,key为消费者名称,value为消息处理函数
+    private final Map<String, Consumer<MessageExt>> messageHandlers;
+
+    // 消费者Map,key为消费者名称,value为消费者实例
+    private final Map<String, DefaultMQPushConsumer> consumerMap = new ConcurrentHashMap<>();
+
+    /**
+     * 构造函数
+     * @param properties RocketMQ配置属性
+     * @param messageHandlers 消息处理器Map
+     */
+    public RocketMQConsumerManager(RocketMQProperties properties,
+                                   Map<String, Consumer<MessageExt>> messageHandlers) {
+        this.properties = properties;
+        this.messageHandlers = messageHandlers;
+    }
+
+    /**
+     * Spring Bean初始化完成后调用的方法
+     * 初始化所有配置的消费者实例
+     */
+    @Override
+    public void afterPropertiesSet() {
+        // 遍历所有消费者配置
+        properties.getConsumers().forEach((name, config) -> {
+            // 检查消费者是否启用
+            if (!config.isEnabled()) {
+                return;
+            }
+
+            // 获取对应的消息处理器
+            Consumer<MessageExt> messageHandler = messageHandlers.get(name);
+            if (messageHandler == null) {
+                throw new IllegalStateException("No message handler found for consumer: " + name);
+            }
+
+            // 创建消费者实例
+            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(config.getGroup());
+
+            // 设置NameServer地址
+            consumer.setNamesrvAddr(properties.getNamesrvAddr());
+
+            // 设置消费者参数
+            consumer.setConsumeThreadMin(config.getConsumeThreadMin());
+            consumer.setConsumeThreadMax(config.getConsumeThreadMax());
+            consumer.setConsumeMessageBatchMaxSize(config.getConsumeMessageBatchMaxSize());
+
+            // 设置实例名称(如果配置了)
+            if (config.getInstanceName() != null) {
+                consumer.setInstanceName(config.getInstanceName());
+            }
+
+            try {
+                // 订阅Topic和Tags
+                consumer.subscribe(config.getTopic(), config.getTags());
+
+                // 注册消息监听器
+                consumer.registerMessageListener(new MessageListenerConcurrently() {
+                    @Override
+                    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
+                                                                    ConsumeConcurrentlyContext context) {
+                        // 处理每条消息
+                        for (MessageExt msg : msgs) {
+                            try {
+                                // 调用消息处理器处理消息
+                                messageHandler.accept(msg);
+                            } catch (Exception e) {
+                                // 处理失败,稍后重试
+                                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
+                            }
+                        }
+                        // 处理成功
+                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+                    }
+                });
+                try {
+                    // 启动消费者
+                    consumer.start();
+                    System.out.println("=============初始化消费者成功===============");
+                } catch (Exception e) {
+                    System.out.println("初始化消费者失败!");
+                }
+
+                // 将消费者加入Map
+                consumerMap.put(name, consumer);
+            } catch (MQClientException e) {
+                throw new RuntimeException("Failed to start RocketMQ consumer: " + name, e);
+            }
+        });
+    }
+
+    /**
+     * Spring Bean销毁时调用的方法
+     * 关闭所有消费者实例
+     */
+    @Override
+    public void destroy() {
+        System.out.println("=================销毁消费者====================");
+        consumerMap.values().forEach(DefaultMQPushConsumer::shutdown);
+    }
+
+    /**
+     * 获取指定名称的消费者实例
+     * @param name 消费者名称
+     * @return 消费者实例
+     */
+    public DefaultMQPushConsumer getConsumer(String name) {
+        DefaultMQPushConsumer consumer = consumerMap.get(name);
+        if (consumer == null) {
+            throw new IllegalArgumentException("RocketMQ consumer not found: " + name);
+        }
+        return consumer;
+    }
+}

+ 52 - 0
ktg-common/src/main/java/com/ktg/common/utils/rocketmq/RocketMQHandlerConfig.java

@@ -0,0 +1,52 @@
+package com.ktg.common.utils.rocketmq;
+
+import org.apache.rocketmq.common.message.MessageExt;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Consumer;
+
+@Configuration
+public class RocketMQHandlerConfig {
+
+    @Bean
+    public Map<String, Consumer<MessageExt>> customMessageHandlers() {
+        Map<String, Consumer<MessageExt>> handlers = new HashMap<>();
+
+        // 订单消息处理器
+        handlers.put("order-consumer", msg -> {
+            System.out.println("处理订单消息,消息ID: " + msg.getMsgId());
+            System.out.println("消息内容: " + new String(msg.getBody()));
+            System.out.println("Topic: " + msg.getTopic());
+            System.out.println("Tags: " + msg.getTags());
+
+            // 业务逻辑处理...
+            // 可以根据不同的Tag执行不同的业务逻辑
+            if ("create".equals(msg.getTags())) {
+                processOrderCreate(msg);
+            } else if ("pay".equals(msg.getTags())) {
+                processOrderPay(msg);
+            }
+        });
+
+        // 支付消息处理器
+        handlers.put("payment-consumer", msg -> {
+            System.out.println("处理支付消息: " + new String(msg.getBody()));
+            // 支付业务逻辑...
+        });
+
+        return handlers;
+    }
+
+    private void processOrderCreate(MessageExt msg) {
+        // 订单创建处理逻辑
+        System.out.println("获取订单创建消息成功:" + msg);
+    }
+
+    private void processOrderPay(MessageExt msg) {
+        // 订单支付处理逻辑
+        System.out.println("获取订单支付消息成功:" + msg);
+    }
+}

+ 135 - 0
ktg-common/src/main/java/com/ktg/common/utils/rocketmq/RocketMQProducerManager.java

@@ -0,0 +1,135 @@
+package com.ktg.common.utils.rocketmq;
+
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.SendCallback;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.common.message.Message;
+import org.springframework.beans.factory.DisposableBean;
+import org.springframework.beans.factory.InitializingBean;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * RocketMQ生产者管理器
+ * 负责管理多个RocketMQ生产者实例
+ * 实现InitializingBean在Spring初始化完成后自动初始化生产者
+ * 实现DisposableBean在Spring销毁时自动关闭生产者
+ */
+public class RocketMQProducerManager implements InitializingBean, DisposableBean {
+    // RocketMQ配置属性
+    private final RocketMQProperties properties;
+
+    // 生产者Map,key为生产者名称,value为生产者实例
+    private final Map<String, DefaultMQProducer> producerMap = new ConcurrentHashMap<>();
+
+    /**
+     * 构造函数
+     * @param properties RocketMQ配置属性
+     */
+    public RocketMQProducerManager(RocketMQProperties properties) {
+        this.properties = properties;
+    }
+
+    /**
+     * Spring Bean初始化完成后调用的方法
+     * 初始化所有配置的生产者实例
+     */
+    @Override
+    public void afterPropertiesSet() {
+        // 遍历所有生产者配置
+        properties.getProducers().forEach((name, config) -> {
+            // 创建生产者实例
+            DefaultMQProducer producer = new DefaultMQProducer(config.getGroup());
+
+            // 设置NameServer地址
+            producer.setNamesrvAddr(properties.getNamesrvAddr());
+
+            // 设置生产者参数
+            producer.setRetryTimesWhenSendFailed(config.getRetryTimesWhenSendFailed());
+            producer.setSendMsgTimeout(config.getSendMsgTimeout());
+            producer.setCompressMsgBodyOverHowmuch(config.getCompressMsgBodyOverHowmuch());
+
+            // 设置实例名称(如果配置了)
+            if (config.getInstanceName() != null) {
+                producer.setInstanceName(config.getInstanceName());
+            }
+
+            try {
+                // 启动生产者
+                producer.start();
+                // 将生产者加入Map
+                producerMap.put(name, producer);
+            } catch (MQClientException e) {
+                throw new RuntimeException("Failed to start RocketMQ producer: " + name, e);
+            }
+        });
+    }
+
+    /**
+     * Spring Bean销毁时调用的方法
+     * 关闭所有生产者实例
+     */
+    @Override
+    public void destroy() {
+        producerMap.values().forEach(DefaultMQProducer::shutdown);
+    }
+
+    /**
+     * 同步发送消息
+     * @param producerName 生产者名称(配置中的key)
+     * @param topic 消息Topic
+     * @param tags 消息Tags
+     * @param keys 消息Keys
+     * @param body 消息体
+     * @return 发送结果
+     */
+    public SendResult sendSync(String producerName, String topic, String tags, String keys, byte[] body) throws Exception {
+        DefaultMQProducer producer = getProducer(producerName);
+        Message msg = new Message(topic, tags, keys, body);
+        return producer.send(msg);
+    }
+
+    /**
+     * 异步发送消息
+     * @param producerName 生产者名称
+     * @param topic 消息Topic
+     * @param tags 消息Tags
+     * @param keys 消息Keys
+     * @param body 消息体
+     * @param sendCallback 异步回调接口
+     */
+    public void sendAsync(String producerName, String topic, String tags, String keys, byte[] body, SendCallback sendCallback) throws Exception {
+        DefaultMQProducer producer = getProducer(producerName);
+        Message msg = new Message(topic, tags, keys, body);
+        producer.send(msg, sendCallback);
+    }
+
+    /**
+     * 单向发送消息(不关心发送结果)
+     * @param producerName 生产者名称
+     * @param topic 消息Topic
+     * @param tags 消息Tags
+     * @param keys 消息Keys
+     * @param body 消息体
+     */
+    public void sendOneway(String producerName, String topic, String tags, String keys, byte[] body) throws Exception {
+        DefaultMQProducer producer = getProducer(producerName);
+        Message msg = new Message(topic, tags, keys, body);
+        producer.sendOneway(msg);
+    }
+
+    /**
+     * 获取指定名称的生产者实例
+     * @param name 生产者名称
+     * @return 生产者实例
+     */
+    public DefaultMQProducer getProducer(String name) {
+        DefaultMQProducer producer = producerMap.get(name);
+        if (producer == null) {
+            throw new IllegalArgumentException("RocketMQ producer not found: " + name);
+        }
+        return producer;
+    }
+}

+ 50 - 0
ktg-common/src/main/java/com/ktg/common/utils/rocketmq/RocketMQProperties.java

@@ -0,0 +1,50 @@
+package com.ktg.common.utils.rocketmq;
+
+import lombok.Data;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * RocketMQ 配置属性类
+ * 通过@ConfigurationProperties注解绑定application.yml中的配置
+ */
+@Data // Lombok注解,自动生成getter/setter等方法
+@ConfigurationProperties(prefix = "rocketmq") // 指定配置前缀
+public class RocketMQProperties {
+    // NameServer地址,格式为 ip:port;ip:port
+    private String namesrvAddr;
+
+    // 生产者配置Map,key为生产者名称,value为生产者配置
+    private Map<String, ProducerConfig> producers = new HashMap<>();
+
+    // 消费者配置Map,key为消费者名称,value为消费者配置
+    private Map<String, ConsumerConfig> consumers = new HashMap<>();
+
+    /**
+     * 生产者配置内部类
+     */
+    @Data
+    public static class ProducerConfig {
+        private String group; // 生产者组名
+        private int retryTimesWhenSendFailed = 3; // 发送失败重试次数,默认3次
+        private int sendMsgTimeout = 3000; // 发送消息超时时间,默认3000ms
+        private int compressMsgBodyOverHowmuch = 1024 * 4; // 消息压缩阈值,默认4KB
+        private String instanceName; // 生产者实例名称
+    }
+
+    /**
+     * 消费者配置内部类
+     */
+    @Data
+    public static class ConsumerConfig {
+        private String group; // 消费者组名
+        private String topic; // 订阅的Topic
+        private String tags = "*"; // 订阅的Tag表达式,默认*表示所有
+        private int consumeThreadMin = 5; // 最小消费线程数,默认5
+        private int consumeThreadMax = 10; // 最大消费线程数,默认10
+        private int consumeMessageBatchMaxSize = 1; // 批量消费大小,默认1
+        private String instanceName; // 消费者实例名称
+        private boolean enabled = true; // 是否启用,默认true
+    }
+}