Преглед на файлове

feat:Redis Stream 增加清理 Job,避免占用内存过多

YunaiV преди 6 месеца
родител
ревизия
361e50e5de

+ 12 - 0
yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/config/YudaoRedisMQConsumerAutoConfiguration.java

@@ -6,6 +6,7 @@ import cn.hutool.system.SystemUtil;
 import cn.iocoder.yudao.framework.common.enums.DocumentEnum;
 import cn.iocoder.yudao.framework.mq.redis.core.RedisMQTemplate;
 import cn.iocoder.yudao.framework.mq.redis.core.job.RedisPendingMessageResendJob;
+import cn.iocoder.yudao.framework.mq.redis.core.job.RedisStreamMessageCleanupJob;
 import cn.iocoder.yudao.framework.mq.redis.core.pubsub.AbstractRedisChannelMessageListener;
 import cn.iocoder.yudao.framework.mq.redis.core.stream.AbstractRedisStreamMessageListener;
 import cn.iocoder.yudao.framework.redis.config.YudaoRedisAutoConfiguration;
@@ -73,6 +74,17 @@ public class YudaoRedisMQConsumerAutoConfiguration {
         return new RedisPendingMessageResendJob(listeners, redisTemplate, groupName, redissonClient);
     }
 
+    /**
+     * 创建 Redis Stream 消息清理任务
+     */
+    @Bean
+    @ConditionalOnBean(AbstractRedisStreamMessageListener.class)
+    public RedisStreamMessageCleanupJob redisStreamMessageCleanupJob(List<AbstractRedisStreamMessageListener<?>> listeners,
+                                                                     RedisMQTemplate redisTemplate,
+                                                                     RedissonClient redissonClient) {
+        return new RedisStreamMessageCleanupJob(listeners, redisTemplate, redissonClient);
+    }
+
     /**
      * 创建 Redis Stream 集群消费的容器
      *

+ 3 - 3
yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/core/job/RedisPendingMessageResendJob.java

@@ -23,13 +23,13 @@ import java.util.Objects;
 @AllArgsConstructor
 public class RedisPendingMessageResendJob {
 
-    private static final String LOCK_KEY = "redis:pending:msg:lock";
+    private static final String LOCK_KEY = "redis:stream:pending-message-resend:lock";
 
     /**
      * 消息超时时间,默认 5 分钟
      *
      * 1. 超时的消息才会被重新投递
-     * 2. 由于定时任务 1 分钟一次,消息超时后不会被立即重投,极端情况下消息5分钟过期后,再等 1 分钟才会被扫瞄到
+     * 2. 由于定时任务 1 分钟一次,消息超时后不会被立即重投,极端情况下消息 5 分钟过期后,再等 1 分钟才会被扫瞄到
      */
     private static final int EXPIRE_TIME = 5 * 60;
 
@@ -39,7 +39,7 @@ public class RedisPendingMessageResendJob {
     private final RedissonClient redissonClient;
 
     /**
-     * 一分钟执行一次,这里选择每分钟的35秒执行,是为了避免整点任务过多的问题
+     * 一分钟执行一次,这里选择每分钟的 35 秒执行,是为了避免整点任务过多的问题
      */
     @Scheduled(cron = "35 * * * * ?")
     public void messageResend() {

+ 72 - 0
yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/core/job/RedisStreamMessageCleanupJob.java

@@ -0,0 +1,72 @@
+package cn.iocoder.yudao.framework.mq.redis.core.job;
+
+import cn.iocoder.yudao.framework.mq.redis.core.RedisMQTemplate;
+import cn.iocoder.yudao.framework.mq.redis.core.stream.AbstractRedisStreamMessageListener;
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.redisson.api.RLock;
+import org.redisson.api.RedissonClient;
+import org.springframework.data.redis.core.StreamOperations;
+import org.springframework.scheduling.annotation.Scheduled;
+
+import java.util.List;
+
+/**
+ * Redis Stream 消息清理任务
+ * 用于定期清理已消费的消息,防止内存占用过大
+ *
+ * @see <a href="https://www.cnblogs.com/nanxiang/p/16179519.html">记一次 redis stream 数据类型内存不释放问题</a>
+ *
+ * @author 芋道源码
+ */
+@Slf4j
+@AllArgsConstructor
+public class RedisStreamMessageCleanupJob {
+
+    private static final String LOCK_KEY = "redis:stream:message-cleanup:lock";
+
+    /**
+     * 保留的消息数量,默认保留最近 10000 条消息
+     */
+    private static final long MAX_COUNT = 10000;
+
+    private final List<AbstractRedisStreamMessageListener<?>> listeners;
+    private final RedisMQTemplate redisTemplate;
+    private final RedissonClient redissonClient;
+
+    /**
+     * 每小时执行一次清理任务
+     */
+    @Scheduled(cron = "0 0 * * * ?")
+    public void cleanup() {
+        RLock lock = redissonClient.getLock(LOCK_KEY);
+        // 尝试加锁
+        if (lock.tryLock()) {
+            try {
+                execute();
+            } catch (Exception ex) {
+                log.error("[cleanup][执行异常]", ex);
+            } finally {
+                lock.unlock();
+            }
+        }
+    }
+
+    /**
+     * 执行清理逻辑
+     */
+    private void execute() {
+        StreamOperations<String, Object, Object> ops = redisTemplate.getRedisTemplate().opsForStream();
+        listeners.forEach(listener -> {
+            try {
+                // 使用 XTRIM 命令清理消息,只保留最近的 MAX_LEN 条消息
+                Long trimCount = ops.trim(listener.getStreamKey(), MAX_COUNT, true);
+                if (trimCount != null && trimCount > 0) {
+                    log.info("[execute][Stream({}) 清理消息数量({})]", listener.getStreamKey(), trimCount);
+                }
+            } catch (Exception ex) {
+                log.error("[execute][Stream({}) 清理异常]", listener.getStreamKey(), ex);
+            }
+        });
+    }
+}