车车 1 сар өмнө
parent
commit
cc38aae6ce

+ 149 - 78
ktg-framework/src/main/java/com/ktg/framework/websocket/WebSocketJobTicketLog.java

@@ -10,180 +10,251 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArraySet;
 
+/**
+ * 支持同code多端接收消息的WebSocket(任务工单日志场景)
+ * 优化点:解决同code连接覆盖、消息群发、线程泄漏、日志准确性问题
+ */
 @Component
 @ServerEndpoint("/websocket/jobTicketLog/{code}")
 @Slf4j
 public class WebSocketJobTicketLog {
+    // 当前连接的会话对象(每个连接独立)
     private Session session;
+    // 当前连接对应的业务code(如工单ID)
     private String code;
-    // 静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。
+
+    // -------------------------- 全局状态管理(均保证线程安全)--------------------------
+    // 1. 总在线连接数(原子性统计,避免并发计数错误)
     private static int onlineCount = 0;
+    // 2. 存放所有连接实例的集合(备用,用于全局连接管理)
     private static CopyOnWriteArraySet<WebSocketJobTicketLog> webSocketSet = new CopyOnWriteArraySet<>();
-    //concurrent包的线程安全set,用来存放每个客户端对应的MyWebSocket对象
-    private static ConcurrentHashMap<String, WebSocketJobTicketLog> webSocketMap2 = new ConcurrentHashMap();
-
-    // 为了保存在线用户信息,在方法中新建一个list存储一下【实际项目依据复杂度,可以存储到数据库或者缓存】
+    // 3. 核心映射:code -> 该code对应的所有连接(解决同code多连接问题,关键优化)
+    private static ConcurrentHashMap<String, Set<WebSocketJobTicketLog>> webSocketMap2 = new ConcurrentHashMap<>();
+    // 4. 存放所有会话的集合(备用,用于全量群发场景)
     private final static List<Session> SESSIONS = Collections.synchronizedList(new ArrayList<>());
+
+    // 心跳线程(每个连接独立启动,避免单线程阻塞)
     private Thread heartbeatThread;
 
     /**
-     * 建立连接
-     *
-     * @param session
-     * @param code
+     * 1. 连接建立逻辑(核心优化:同code连接加入集合,不覆盖)
      */
     @OnOpen
     public void onOpen(Session session, @PathParam("code") String code) {
         this.session = session;
         this.code = code;
+
+        // 加入全局连接/会话集合
         webSocketSet.add(this);
         SESSIONS.add(session);
-        if (webSocketMap2.containsKey(code)) {
-            webSocketMap2.remove(code);
-            webSocketMap2.put(code, this);
-        } else {
-            webSocketMap2.put(code, this);
-            addOnlineCount();
-        }
-        // 心跳机制
+
+        // 关键操作:同code的连接存入同一个Set(不存在则创建,线程安全)
+        // ConcurrentHashMap.newKeySet() 生成线程安全的Set,避免多线程操作冲突
+        webSocketMap2.computeIfAbsent(code, k -> ConcurrentHashMap.newKeySet())
+                .add(this);
+
+        // 在线数+1(synchronized保证原子性)
+        addOnlineCount();
+
+        // 启动心跳线程(优化:线程命名+中断感知,避免泄漏)
         heartbeatThread = new Thread(() -> {
             try {
-                sendHeartbeats(session, code);
+                sendHeartbeats();
             } catch (IOException e) {
-                e.printStackTrace();
+                log.error("[code:{}] 心跳线程异常,会话ID:{}", code, session.getId(), e);
             }
-        });
+        }, "WebSocket-Heartbeat-" + code + "-" + session.getId()); // 线程命名,便于排查
         heartbeatThread.start();
-        log.info("[连接ID:{}] 建立连接, 当前连接数:{}", this.code, webSocketMap2.size());
-        log.info("线程序号{}, 当前存活线程数{}", heartbeatThread.getId(), Thread.activeCount());
 
+        // 日志优化:打印“总在线连接数”而非“code数量”,补充会话ID
+        log.info("[code:{}] WebSocket连接建立成功,会话ID:{},当前总在线连接数:{}",
+                code, session.getId(), getOnlineCount());
+        log.info("[code:{}] 心跳线程启动,线程序号:{},当前存活线程数:{}",
+                code, heartbeatThread.getId(), Thread.activeCount());
     }
 
-    private void sendHeartbeats(Session session, String code) throws IOException {
-        int heartbeatInterval = 60000; // 60 seconds
-        while (true) {
+    /**
+     * 2. 心跳发送逻辑(优化:中断感知+连接状态检查,避免线程泄漏)
+     */
+    private void sendHeartbeats() throws IOException {
+        int heartbeatInterval = 60000; // 心跳间隔:60秒(可配置化)
+        // 循环条件:线程未中断 + 会话未关闭(双重保障,确保连接关闭后线程退出)
+        while (!Thread.currentThread().isInterrupted() && session.isOpen()) {
             try {
                 Thread.sleep(heartbeatInterval);
+                // 仅在会话打开时发送心跳(避免无效发送)
                 session.getBasicRemote().sendText("heartbeat");
-            } catch (InterruptedException | IOException e) {
+                log.debug("[code:{}] 心跳包发送成功,会话ID:{}", code, session.getId());
+            } catch (InterruptedException e) {
+                // 线程被中断(连接关闭时触发),恢复中断状态并退出循环
+                Thread.currentThread().interrupt();
+                log.info("[code:{}] 心跳线程被中断,会话ID:{}", code, session.getId());
+                break;
+            } catch (IOException e) {
+                log.error("[code:{}] 心跳包发送失败,会话ID:{}", code, session.getId(), e);
                 break;
             }
         }
+        log.debug("[code:{}] 心跳线程正常退出,会话ID:{}", code, session.getId());
     }
 
     /**
-     * 断开连接
+     * 3. 连接关闭逻辑(核心优化:仅删当前连接,不删同code其他连接)
      */
     @OnClose
     public void onClose() {
+        // 从全局集合移除当前连接
         webSocketSet.remove(this);
-        if (webSocketMap2.containsKey(code)) {
-            webSocketMap2.remove(code);
-            subOnlineCount();
+        SESSIONS.remove(this.session);
+
+        // 关键操作:仅移除当前连接,保留同code其他连接
+        Set<WebSocketJobTicketLog> codeConnections = webSocketMap2.get(code);
+        if (codeConnections != null) {
+            codeConnections.remove(this);
+            // 若该code无剩余连接,删除code键(避免内存泄漏)
+            if (codeConnections.isEmpty()) {
+                webSocketMap2.remove(code);
+                log.debug("[code:{}] 无剩余连接,移除code映射", code);
+            }
         }
-        // 关闭当前线程
+
+        // 在线数-1(原子操作)
+        subOnlineCount();
+
+        // 关闭心跳线程(优化:安全中断+超时等待,确保线程退出)
         if (heartbeatThread != null && heartbeatThread.isAlive()) {
-            log.info("关闭线程序号{}, 当前存活线程数{}", heartbeatThread.getId(), Thread.activeCount());
-            heartbeatThread.interrupt(); // Interrupt the heartbeat thread
+            log.info("[code:{}] 准备关闭心跳线程,线程序号:{},会话ID:{}",
+                    code, heartbeatThread.getId(), session.getId());
+            heartbeatThread.interrupt(); // 触发线程中断
             try {
-                heartbeatThread.join(); // Wait for the thread to finish
+                // 等待1秒,确保线程有时间退出(避免资源占用)
+                heartbeatThread.join(1000);
             } catch (InterruptedException e) {
-                log.error("Error joining heartbeat thread for connection ID: {}", code, e);
-                log.info("关闭线程序号{}, 当前存活线程数{}", Thread.currentThread().getId(), Thread.activeCount());
-                Thread.currentThread().interrupt(); // Restore the interruption status
+                log.error("[code:{}] 心跳线程关闭异常,会话ID:{}", code, session.getId(), e);
+                Thread.currentThread().interrupt(); // 恢复当前线程中断状态
             }
         }
-        log.info("[连接ID:{}] 断开连接, 当前连接数:{}", code, webSocketMap2.size());
+
+        // 日志优化:补充会话ID,打印真实在线数
+        log.info("[code:{}] WebSocket连接断开,会话ID:{},当前总在线连接数:{}",
+                code, session.getId(), getOnlineCount());
     }
 
     /**
-     * 发送错误
-     *
-     * @param session
-     * @param error
+     * 4. 错误处理(优化:完整异常堆栈+主动关闭会话,避免僵尸连接)
      */
     @OnError
     public void onError(Session session, Throwable error) {
-        log.info("[连接ID:{}] 错误原因:{}", this.code, error.getMessage());
-        error.printStackTrace();
-        // 发生错误时,关闭连接
-        // conn.close(500, "连接出错");
+        // 打印完整异常堆栈(原代码仅打印message,无法定位问题)
+        log.error("[code:{}] WebSocket连接发生错误,会话ID:{},错误原因:",
+                code, session.getId(), error);
+
+        // 主动关闭异常会话(避免僵尸连接占用资源)
+        if (session.isOpen()) {
+            try {
+                session.close(new CloseReason(CloseReason.CloseCodes.UNEXPECTED_CONDITION,
+                        "连接异常,主动关闭"));
+            } catch (IOException e) {
+                log.error("[code:{}] 异常后关闭会话失败,会话ID:{}", code, session.getId(), e);
+            }
+        }
     }
 
     /**
-     * 收到消息
-     *
-     * @param message
+     * 5. 接收客户端消息(优化:补充会话ID,便于定位消息来源)
      */
     @OnMessage
     public void onMessage(String message) {
-        // log.info("【websocket消息】收到客户端发来的消息:{}", message);
-        log.info("[连接ID:{}] 收到消息:{}", this.code, message);
+        log.info("[code:{}] 收到客户端消息,会话ID:{},消息内容:{}",
+                code, session.getId(), message);
+        // 可选:如需响应客户端,可添加如下代码
+        // try { session.getBasicRemote().sendText("已收到消息:" + message); }
+        // catch (IOException e) { log.error("响应客户端失败", e); }
     }
 
     /**
-     * 发送消息
-     *
-     * @param message
-     * @param code
+     * 6. 核心功能:给指定code的所有连接发送消息(修复类型错误+群发逻辑)
+     * @param code 目标业务code(如工单ID)
+     * @param message 消息内容(建议JSON格式,便于客户端解析)
      */
     public static void sendMessage(String code, String message) {
-        WebSocketJobTicketLog webSocketIots = webSocketMap2.get(code);
-        log.info("【websocket消息】推送消息, webSocketServer={}", webSocketIots);
-        if (webSocketIots != null) {
-            log.info("【websocket消息】推送消息, message={}", message);
+        // 1. 获取该code对应的所有连接
+        Set<WebSocketJobTicketLog> codeConnections = webSocketMap2.get(code);
+        if (codeConnections == null || codeConnections.isEmpty()) {
+            log.warn("【WebSocket】给code:{}发送消息失败,无在线连接", code);
+            return;
+        }
+
+        // 2. 遍历所有连接,逐个发送(确保每个连接都收到)
+        int successCount = 0;
+        for (WebSocketJobTicketLog connection : codeConnections) {
             try {
-                webSocketIots.session.getBasicRemote().sendText(message);
+                Session session = connection.session;
+                if (session.isOpen()) {
+                    session.getBasicRemote().sendText(message);
+                    successCount++;
+                    log.debug("【WebSocket】给code:{}的会话ID:{}发送消息成功,内容:{}",
+                            code, session.getId(), message);
+                } else {
+                    log.warn("【WebSocket】给code:{}的会话ID:{}发送消息失败,会话已关闭",
+                            code, session.getId());
+                }
             } catch (Exception e) {
-                e.printStackTrace();
-                log.error("[连接ID:{}] 发送消息失败, 消息:{}", code, message, e);
+                log.error("【WebSocket】给code:{}的连接发送消息失败,内容:{}",
+                        code, message, e);
             }
         }
+
+        // 日志统计:明确发送结果,便于问题排查
+        log.info("【WebSocket】给code:{}发送消息完成,共{}个连接,成功发送{}个",
+                code, codeConnections.size(), successCount);
     }
 
     /**
-     * 群发消息
-     *
-     * @param message
+     * 7. 群发消息(优化:code参数格式化+发送结果统计)
+     * @param message 全量群发的消息内容
      */
     public static void sendMassMessage(String message) {
+        int successCount = 0;
         try {
+            // 遍历所有会话,逐个发送
             for (Session session : SESSIONS) {
                 if (session.isOpen()) {
+                    // 修复:getRequestParameterMap返回List<String>,需取第一个元素(原代码直接打印List)
+                    List<String> codeList = session.getRequestParameterMap().get("code");
+                    String code = (codeList != null && !codeList.isEmpty()) ? codeList.get(0) : "未知code";
+
                     session.getBasicRemote().sendText(message);
-                    log.info("[连接ID:{}] 发送消息:{}", session.getRequestParameterMap().get("code"), message);
+                    successCount++;
+                    log.debug("【WebSocket】群发消息成功,会话ID:{},code:{},内容:{}",
+                            session.getId(), code, message);
                 }
             }
         } catch (Exception e) {
-            e.printStackTrace();
+            log.error("【WebSocket】群发消息失败", e);
         }
+
+        // 统计群发结果
+        log.info("【WebSocket】群发消息完成,共{}个会话,成功发送{}个",
+                SESSIONS.size(), successCount);
     }
 
     /**
-     * 获取当前连接数
-     *
-     * @return
+     * 8. 在线数统计(synchronized保证线程安全,避免并发计数错误)
      */
     public static synchronized int getOnlineCount() {
         return onlineCount;
     }
 
-    /**
-     * 当前连接数加一
-     */
     public static synchronized void addOnlineCount() {
         WebSocketJobTicketLog.onlineCount++;
     }
 
-    /**
-     * 当前连接数减一
-     */
     public static synchronized void subOnlineCount() {
         WebSocketJobTicketLog.onlineCount--;
     }
-
 }
-