|
|
@@ -1,6 +1,5 @@
|
|
|
package cn.iocoder.yudao.module.iscs.utils;
|
|
|
|
|
|
-
|
|
|
import jakarta.websocket.*;
|
|
|
import jakarta.websocket.server.PathParam;
|
|
|
import jakarta.websocket.server.ServerEndpoint;
|
|
|
@@ -11,180 +10,240 @@ import java.io.IOException;
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.Collections;
|
|
|
import java.util.List;
|
|
|
+import java.util.Set;
|
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
|
import java.util.concurrent.CopyOnWriteArraySet;
|
|
|
|
|
|
+/**
|
|
|
+ * 支持同一个code多端接收消息的WebSocket(任务工单日志场景)
|
|
|
+ */
|
|
|
@Component
|
|
|
@ServerEndpoint("/websocket/jobTicketLog/{code}")
|
|
|
@Slf4j
|
|
|
public class WebSocketJobTicketLog {
|
|
|
+ // 当前连接的会话对象
|
|
|
private Session session;
|
|
|
+ // 当前连接对应的code
|
|
|
private String code;
|
|
|
- // 静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。
|
|
|
+
|
|
|
+ // 1. 全局状态管理(均保证线程安全)
|
|
|
+ // 总在线连接数(原子性统计)
|
|
|
private static int onlineCount = 0;
|
|
|
+ // 存放所有连接实例的集合(备用)
|
|
|
private static CopyOnWriteArraySet<WebSocketJobTicketLog> webSocketSet = new CopyOnWriteArraySet<>();
|
|
|
- //concurrent包的线程安全set,用来存放每个客户端对应的MyWebSocket对象
|
|
|
- private static ConcurrentHashMap<String, WebSocketJobTicketLog> webSocketMap2 = new ConcurrentHashMap();
|
|
|
-
|
|
|
- // 为了保存在线用户信息,在方法中新建一个list存储一下【实际项目依据复杂度,可以存储到数据库或者缓存】
|
|
|
+ // 核心映射:code -> 该code对应的所有连接(解决同code多连接问题)
|
|
|
+ private static ConcurrentHashMap<String, Set<WebSocketJobTicketLog>> webSocketMap2 = new ConcurrentHashMap<>();
|
|
|
+ // 存放所有会话的集合(备用,群发场景)
|
|
|
private final static List<Session> SESSIONS = Collections.synchronizedList(new ArrayList<>());
|
|
|
+
|
|
|
+ // 心跳线程(每个连接独立线程)
|
|
|
private Thread heartbeatThread;
|
|
|
|
|
|
/**
|
|
|
- * 建立连接
|
|
|
- *
|
|
|
- * @param session
|
|
|
- * @param code
|
|
|
+ * 2. 连接建立逻辑(核心优化:同code连接加入集合,不覆盖)
|
|
|
*/
|
|
|
@OnOpen
|
|
|
public void onOpen(Session session, @PathParam("code") String code) {
|
|
|
this.session = session;
|
|
|
this.code = code;
|
|
|
+
|
|
|
+ // 加入全局集合
|
|
|
webSocketSet.add(this);
|
|
|
SESSIONS.add(session);
|
|
|
- if (webSocketMap2.containsKey(code)) {
|
|
|
- webSocketMap2.remove(code);
|
|
|
- webSocketMap2.put(code, this);
|
|
|
- } else {
|
|
|
- webSocketMap2.put(code, this);
|
|
|
- addOnlineCount();
|
|
|
- }
|
|
|
- // 心跳机制
|
|
|
+
|
|
|
+ // 关键:同code的连接存入同一个Set(不存在则创建,线程安全)
|
|
|
+ webSocketMap2.computeIfAbsent(code, k -> ConcurrentHashMap.newKeySet())
|
|
|
+ .add(this);
|
|
|
+
|
|
|
+ // 在线数+1(线程安全)
|
|
|
+ addOnlineCount();
|
|
|
+
|
|
|
+ // 启动心跳线程(优化:增加中断检查,避免泄漏)
|
|
|
heartbeatThread = new Thread(() -> {
|
|
|
try {
|
|
|
- sendHeartbeats(session, code);
|
|
|
+ sendHeartbeats();
|
|
|
} catch (IOException e) {
|
|
|
- e.printStackTrace();
|
|
|
+ log.error("[code:{}] 心跳线程异常", code, e);
|
|
|
}
|
|
|
- });
|
|
|
+ }, "WebSocket-Heartbeat-" + code + "-" + session.getId()); // 线程命名,便于排查
|
|
|
heartbeatThread.start();
|
|
|
- log.info("[连接ID:{}] 建立连接, 当前连接数:{}", this.code, webSocketMap2.size());
|
|
|
- log.info("线程序号{}, 当前存活线程数{}", heartbeatThread.getId(), Thread.activeCount());
|
|
|
|
|
|
+ // 日志优化:打印总连接数(而非code数量)
|
|
|
+ log.info("[code:{}] 建立WebSocket连接,会话ID:{},当前总在线连接数:{}",
|
|
|
+ code, session.getId(), getOnlineCount());
|
|
|
+ log.info("[code:{}] 心跳线程启动,线程序号:{},当前存活线程数:{}",
|
|
|
+ code, heartbeatThread.getId(), Thread.activeCount());
|
|
|
}
|
|
|
|
|
|
- private void sendHeartbeats(Session session, String code) throws IOException {
|
|
|
- int heartbeatInterval = 60000; // 60 seconds
|
|
|
- while (true) {
|
|
|
+ /**
|
|
|
+ * 3. 心跳发送逻辑(优化:中断感知+连接状态检查)
|
|
|
+ */
|
|
|
+ private void sendHeartbeats() throws IOException {
|
|
|
+ int heartbeatInterval = 60000; // 60秒间隔
|
|
|
+ // 循环条件:线程未中断 + 会话未关闭
|
|
|
+ while (!Thread.currentThread().isInterrupted() && session.isOpen()) {
|
|
|
try {
|
|
|
Thread.sleep(heartbeatInterval);
|
|
|
+ // 仅在会话打开时发送心跳
|
|
|
session.getBasicRemote().sendText("heartbeat");
|
|
|
- } catch (InterruptedException | IOException e) {
|
|
|
+ log.debug("[code:{}] 发送心跳包,会话ID:{}", code, session.getId());
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ // 线程被中断(关闭连接时触发),恢复中断状态并退出
|
|
|
+ Thread.currentThread().interrupt();
|
|
|
+ log.info("[code:{}] 心跳线程被中断,会话ID:{}", code, session.getId());
|
|
|
+ break;
|
|
|
+ } catch (IOException e) {
|
|
|
+ log.error("[code:{}] 心跳包发送失败,会话ID:{}", code, session.getId(), e);
|
|
|
break;
|
|
|
}
|
|
|
}
|
|
|
+ log.debug("[code:{}] 心跳线程退出,会话ID:{}", code, session.getId());
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * 断开连接
|
|
|
+ * 4. 连接关闭逻辑(核心优化:仅删当前连接,不删同code其他连接)
|
|
|
*/
|
|
|
@OnClose
|
|
|
public void onClose() {
|
|
|
+ // 从全局集合移除当前连接
|
|
|
webSocketSet.remove(this);
|
|
|
- if (webSocketMap2.containsKey(code)) {
|
|
|
- webSocketMap2.remove(code);
|
|
|
- subOnlineCount();
|
|
|
+ SESSIONS.remove(this.session);
|
|
|
+
|
|
|
+ // 关键:仅从code对应的Set中移除当前连接,避免误删同code其他连接
|
|
|
+ Set<WebSocketJobTicketLog> codeConnections = webSocketMap2.get(code);
|
|
|
+ if (codeConnections != null) {
|
|
|
+ codeConnections.remove(this);
|
|
|
+ // 若该code无剩余连接,删除code键(避免内存泄漏)
|
|
|
+ if (codeConnections.isEmpty()) {
|
|
|
+ webSocketMap2.remove(code);
|
|
|
+ log.debug("[code:{}] 无剩余连接,移除code映射", code);
|
|
|
+ }
|
|
|
}
|
|
|
- // 关闭当前线程
|
|
|
+
|
|
|
+ // 在线数-1
|
|
|
+ subOnlineCount();
|
|
|
+
|
|
|
+ // 关闭心跳线程(优化:安全中断+超时等待)
|
|
|
if (heartbeatThread != null && heartbeatThread.isAlive()) {
|
|
|
- log.info("关闭线程序号{}, 当前存活线程数{}", heartbeatThread.getId(), Thread.activeCount());
|
|
|
- heartbeatThread.interrupt(); // Interrupt the heartbeat thread
|
|
|
+ log.info("[code:{}] 关闭心跳线程,线程序号:{}", code, heartbeatThread.getId());
|
|
|
+ heartbeatThread.interrupt(); // 触发线程中断
|
|
|
try {
|
|
|
- heartbeatThread.join(); // Wait for the thread to finish
|
|
|
+ heartbeatThread.join(1000); // 等待1秒,确保线程退出
|
|
|
} catch (InterruptedException e) {
|
|
|
- log.error("Error joining heartbeat thread for connection ID: {}", code, e);
|
|
|
- log.info("关闭线程序号{}, 当前存活线程数{}", Thread.currentThread().getId(), Thread.activeCount());
|
|
|
- Thread.currentThread().interrupt(); // Restore the interruption status
|
|
|
+ log.error("[连接ID:{}] 心跳线程关闭异常", code, e);
|
|
|
+ Thread.currentThread().interrupt();
|
|
|
}
|
|
|
}
|
|
|
- log.info("[连接ID:{}] 断开连接, 当前连接数:{}", code, webSocketMap2.size());
|
|
|
+
|
|
|
+ // 日志优化:打印总连接数
|
|
|
+ log.info("[code:{}] 断开WebSocket连接,会话ID:{},当前总在线连接数:{}",
|
|
|
+ code, session.getId(), getOnlineCount());
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * 发送错误
|
|
|
- *
|
|
|
- * @param session
|
|
|
- * @param error
|
|
|
+ * 5. 错误处理(优化:完整日志堆栈)
|
|
|
*/
|
|
|
@OnError
|
|
|
public void onError(Session session, Throwable error) {
|
|
|
- log.info("[连接ID:{}] 错误原因:{}", this.code, error.getMessage());
|
|
|
- error.printStackTrace();
|
|
|
- // 发生错误时,关闭连接
|
|
|
- // conn.close(500, "连接出错");
|
|
|
+ log.error("[code:{}] WebSocket连接错误,会话ID:{},错误原因:",
|
|
|
+ code, session.getId(), error); // 打印完整堆栈
|
|
|
+ // 错误时主动关闭连接(避免僵尸连接)
|
|
|
+ if (session.isOpen()) {
|
|
|
+ try {
|
|
|
+ session.close(new CloseReason(CloseReason.CloseCodes.UNEXPECTED_CONDITION, "连接异常"));
|
|
|
+ } catch (IOException e) {
|
|
|
+ log.error("[code:{}] 错误后关闭会话失败,会话ID:{}", code, session.getId(), e);
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * 收到消息
|
|
|
- *
|
|
|
- * @param message
|
|
|
+ * 6. 接收客户端消息(优化:日志详情)
|
|
|
*/
|
|
|
@OnMessage
|
|
|
public void onMessage(String message) {
|
|
|
- // log.info("【websocket消息】收到客户端发来的消息:{}", message);
|
|
|
- log.info("[连接ID:{}] 收到消息:{}", this.code, message);
|
|
|
+ log.info("[code:{}] 收到客户端消息,会话ID:{},消息内容:{}",
|
|
|
+ code, session.getId(), message);
|
|
|
+ // 如需响应客户端,可添加 session.getBasicRemote().sendText("已收到:" + message);
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * 发送消息
|
|
|
- *
|
|
|
- * @param message
|
|
|
- * @param code
|
|
|
+ * 7. 核心功能:给指定code的所有连接发送消息(修复类型不匹配+遍历群发)
|
|
|
+ * @param code 目标code
|
|
|
+ * @param message 消息内容
|
|
|
*/
|
|
|
public static void sendMessage(String code, String message) {
|
|
|
- WebSocketJobTicketLog webSocketIots = webSocketMap2.get(code);
|
|
|
- log.info("【websocket消息】推送消息, webSocketServer={}", webSocketIots);
|
|
|
- if (webSocketIots != null) {
|
|
|
- log.info("【websocket消息】推送消息, message={}", message);
|
|
|
+ // 1. 获取该code对应的所有连接
|
|
|
+ Set<WebSocketJobTicketLog> codeConnections = webSocketMap2.get(code);
|
|
|
+ if (codeConnections == null || codeConnections.isEmpty()) {
|
|
|
+ log.warn("【WebSocket】给code:{}发送消息失败,无在线连接", code);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ // 2. 遍历所有连接,逐个发送(确保每个连接都收到)
|
|
|
+ int successCount = 0;
|
|
|
+ for (WebSocketJobTicketLog connection : codeConnections) {
|
|
|
try {
|
|
|
- webSocketIots.session.getBasicRemote().sendText(message);
|
|
|
+ Session session = connection.session;
|
|
|
+ if (session.isOpen()) {
|
|
|
+ session.getBasicRemote().sendText(message);
|
|
|
+ successCount++;
|
|
|
+ log.debug("【WebSocket】给code:{}的会话ID:{}发送消息成功,内容:{}",
|
|
|
+ code, session.getId(), message);
|
|
|
+ } else {
|
|
|
+ log.warn("【WebSocket】给code:{}的会话ID:{}发送消息失败,会话已关闭",
|
|
|
+ code, session.getId());
|
|
|
+ }
|
|
|
} catch (Exception e) {
|
|
|
- e.printStackTrace();
|
|
|
- log.error("[连接ID:{}] 发送消息失败, 消息:{}", code, message, e);
|
|
|
+ log.error("【WebSocket】给code:{}的连接发送消息失败,内容:{}",
|
|
|
+ code, message, e);
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ log.info("【WebSocket】给code:{}发送消息完成,共{}个连接,成功{}个",
|
|
|
+ code, codeConnections.size(), successCount);
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * 群发消息
|
|
|
- *
|
|
|
- * @param message
|
|
|
+ * 8. 群发消息(优化:code参数打印格式)
|
|
|
+ * @param message 群发内容
|
|
|
*/
|
|
|
public static void sendMassMessage(String message) {
|
|
|
+ int successCount = 0;
|
|
|
try {
|
|
|
+ // 遍历所有会话,逐个发送
|
|
|
for (Session session : SESSIONS) {
|
|
|
if (session.isOpen()) {
|
|
|
+ // 修复:获取code(getRequestParameterMap返回List,取第一个元素)
|
|
|
+ List<String> codeList = session.getRequestParameterMap().get("code");
|
|
|
+ String code = codeList != null && !codeList.isEmpty() ? codeList.get(0) : "未知";
|
|
|
+
|
|
|
session.getBasicRemote().sendText(message);
|
|
|
- log.info("[连接ID:{}] 发送消息:{}", session.getRequestParameterMap().get("code"), message);
|
|
|
+ successCount++;
|
|
|
+ log.debug("【WebSocket】群发消息成功,会话ID:{},code:{},内容:{}",
|
|
|
+ session.getId(), code, message);
|
|
|
}
|
|
|
}
|
|
|
} catch (Exception e) {
|
|
|
- e.printStackTrace();
|
|
|
+ log.error("【WebSocket】群发消息失败", e);
|
|
|
}
|
|
|
+
|
|
|
+ log.info("【WebSocket】群发消息完成,共{}个会话,成功{}个",
|
|
|
+ SESSIONS.size(), successCount);
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * 获取当前连接数
|
|
|
- *
|
|
|
- * @return
|
|
|
+ * 9. 在线数统计(线程安全,原子操作)
|
|
|
*/
|
|
|
public static synchronized int getOnlineCount() {
|
|
|
return onlineCount;
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- * 当前连接数加一
|
|
|
- */
|
|
|
public static synchronized void addOnlineCount() {
|
|
|
WebSocketJobTicketLog.onlineCount++;
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- * 当前连接数减一
|
|
|
- */
|
|
|
public static synchronized void subOnlineCount() {
|
|
|
WebSocketJobTicketLog.onlineCount--;
|
|
|
}
|
|
|
-
|
|
|
}
|
|
|
-
|