Эх сурвалжийг харах

WENSOCKET发送作业操作日志消息

车车 3 сар өмнө
parent
commit
acd1ea2a8d

+ 189 - 0
ktg-framework/src/main/java/com/ktg/framework/websocket/WebSocketJobTicketLog.java

@@ -0,0 +1,189 @@
+package com.ktg.framework.websocket;
+
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Component;
+
+import javax.websocket.*;
+import javax.websocket.server.PathParam;
+import javax.websocket.server.ServerEndpoint;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArraySet;
+
+@Component
+@ServerEndpoint("/websocket/jobTicketLog/{code}")
+@Slf4j
+public class WebSocketJobTicketLog {
+    private Session session;
+    private String code;
+    // 静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。
+    private static int onlineCount = 0;
+    private static CopyOnWriteArraySet<WebSocketJobTicketLog> webSocketSet = new CopyOnWriteArraySet<>();
+    //concurrent包的线程安全set,用来存放每个客户端对应的MyWebSocket对象
+    private static ConcurrentHashMap<String, WebSocketJobTicketLog> webSocketMap2 = new ConcurrentHashMap();
+
+    // 为了保存在线用户信息,在方法中新建一个list存储一下【实际项目依据复杂度,可以存储到数据库或者缓存】
+    private final static List<Session> SESSIONS = Collections.synchronizedList(new ArrayList<>());
+    private Thread heartbeatThread;
+
+    /**
+     * 建立连接
+     *
+     * @param session
+     * @param code
+     */
+    @OnOpen
+    public void onOpen(Session session, @PathParam("code") String code) {
+        this.session = session;
+        this.code = code;
+        webSocketSet.add(this);
+        SESSIONS.add(session);
+        if (webSocketMap2.containsKey(code)) {
+            webSocketMap2.remove(code);
+            webSocketMap2.put(code, this);
+        } else {
+            webSocketMap2.put(code, this);
+            addOnlineCount();
+        }
+        // 心跳机制
+        heartbeatThread = new Thread(() -> {
+            try {
+                sendHeartbeats(session, code);
+            } catch (IOException e) {
+                e.printStackTrace();
+            }
+        });
+        heartbeatThread.start();
+        log.info("[连接ID:{}] 建立连接, 当前连接数:{}", this.code, webSocketMap2.size());
+        log.info("线程序号{}, 当前存活线程数{}", heartbeatThread.getId(), Thread.activeCount());
+
+    }
+
+    private void sendHeartbeats(Session session, String code) throws IOException {
+        int heartbeatInterval = 60000; // 60 seconds
+        while (true) {
+            try {
+                Thread.sleep(heartbeatInterval);
+                session.getBasicRemote().sendText("heartbeat");
+            } catch (InterruptedException | IOException e) {
+                break;
+            }
+        }
+    }
+
+    /**
+     * 断开连接
+     */
+    @OnClose
+    public void onClose() {
+        webSocketSet.remove(this);
+        if (webSocketMap2.containsKey(code)) {
+            webSocketMap2.remove(code);
+            subOnlineCount();
+        }
+        // 关闭当前线程
+        if (heartbeatThread != null && heartbeatThread.isAlive()) {
+            log.info("关闭线程序号{}, 当前存活线程数{}", heartbeatThread.getId(), Thread.activeCount());
+            heartbeatThread.interrupt(); // Interrupt the heartbeat thread
+            try {
+                heartbeatThread.join(); // Wait for the thread to finish
+            } catch (InterruptedException e) {
+                log.error("Error joining heartbeat thread for connection ID: {}", code, e);
+                log.info("关闭线程序号{}, 当前存活线程数{}", Thread.currentThread().getId(), Thread.activeCount());
+                Thread.currentThread().interrupt(); // Restore the interruption status
+            }
+        }
+        log.info("[连接ID:{}] 断开连接, 当前连接数:{}", code, webSocketMap2.size());
+    }
+
+    /**
+     * 发送错误
+     *
+     * @param session
+     * @param error
+     */
+    @OnError
+    public void onError(Session session, Throwable error) {
+        log.info("[连接ID:{}] 错误原因:{}", this.code, error.getMessage());
+        error.printStackTrace();
+        // 发生错误时,关闭连接
+        // conn.close(500, "连接出错");
+    }
+
+    /**
+     * 收到消息
+     *
+     * @param message
+     */
+    @OnMessage
+    public void onMessage(String message) {
+        // log.info("【websocket消息】收到客户端发来的消息:{}", message);
+        log.info("[连接ID:{}] 收到消息:{}", this.code, message);
+    }
+
+    /**
+     * 发送消息
+     *
+     * @param message
+     * @param code
+     */
+    public static void sendMessage(String code, String message) {
+        WebSocketJobTicketLog webSocketIots = webSocketMap2.get(code);
+        log.info("【websocket消息】推送消息, webSocketServer={}", webSocketIots);
+        if (webSocketIots != null) {
+            log.info("【websocket消息】推送消息, message={}", message);
+            try {
+                webSocketIots.session.getBasicRemote().sendText(message);
+            } catch (Exception e) {
+                e.printStackTrace();
+                log.error("[连接ID:{}] 发送消息失败, 消息:{}", code, message, e);
+            }
+        }
+    }
+
+    /**
+     * 群发消息
+     *
+     * @param message
+     */
+    public static void sendMassMessage(String message) {
+        try {
+            for (Session session : SESSIONS) {
+                if (session.isOpen()) {
+                    session.getBasicRemote().sendText(message);
+                    log.info("[连接ID:{}] 发送消息:{}", session.getRequestParameterMap().get("code"), message);
+                }
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+    /**
+     * 获取当前连接数
+     *
+     * @return
+     */
+    public static synchronized int getOnlineCount() {
+        return onlineCount;
+    }
+
+    /**
+     * 当前连接数加一
+     */
+    public static synchronized void addOnlineCount() {
+        WebSocketJobTicketLog.onlineCount++;
+    }
+
+    /**
+     * 当前连接数减一
+     */
+    public static synchronized void subOnlineCount() {
+        WebSocketJobTicketLog.onlineCount--;
+    }
+
+}
+

+ 10 - 0
ktg-iscs/src/main/java/com/ktg/iscs/controller/TestIscsController.java

@@ -12,6 +12,7 @@ import com.ktg.common.utils.poi.ExcelUtil;
 import com.ktg.common.utils.FingerprintComparisonByImg;
 import com.ktg.iscs.domain.TestIscs;
 import com.ktg.common.vo.VerificationVO;
+import com.ktg.iscs.service.IIsTicketOperLogService;
 import com.ktg.iscs.service.ITestIscsService;
 import io.swagger.annotations.Api;
 import io.swagger.annotations.ApiOperation;
@@ -39,6 +40,8 @@ public class TestIscsController extends BaseController
 {
     @Autowired
     private ITestIscsService testIscsService;
+    @Autowired
+    private IIsTicketOperLogService isTicketOperLogService;
 
     /**
      * 查询测试用分页
@@ -137,4 +140,11 @@ public class TestIscsController extends BaseController
         return CommonResult.success(verificationVO);
     }
 
+    @ApiOperation("测试websocket-----------")
+    @GetMapping(value = "/testWebsocketLog")
+    public CommonResult<Boolean> testWebsocketLog(Long id, String name) {
+        Boolean b = isTicketOperLogService.addLog1(id, name);
+        return CommonResult.success(b);
+    }
+
 }

+ 24 - 11
ktg-iscs/src/main/java/com/ktg/iscs/service/impl/IsTicketOperLogServiceImpl.java

@@ -3,7 +3,9 @@ package com.ktg.iscs.service.impl;
 import com.baomidou.mybatisplus.core.toolkit.Wrappers;
 import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
 import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
+import com.ktg.common.utils.DateUtils;
 import com.ktg.common.utils.SecurityUtils;
+import com.ktg.framework.websocket.WebSocketJobTicketLog;
 import com.ktg.iscs.domain.IsTicketOperLog;
 import com.ktg.iscs.mapper.IsTicketOperLogMapper;
 import com.ktg.iscs.service.IIsTicketOperLogService;
@@ -45,8 +47,9 @@ public class IsTicketOperLogServiceImpl extends ServiceImpl<IsTicketOperLogMappe
         isTicketOperLog.setOperationTime(date);
         isTicketOperLog.setOperationUserId(userId);
         isTicketOperLog.setOperationUserName(username);
-        isTicketOperLog.setOperationContent("启动作业:" + date + " 作业[" +  jobName + "]已启动。");
+        isTicketOperLog.setOperationContent("启动作业:" + DateUtils.parseDateToStr(DateUtils.YYYY_MM_DD_HH_MM_SS, date) + " 作业[" +  jobName + "]已启动。");
         save(isTicketOperLog);
+        WebSocketJobTicketLog.sendMessage(String.valueOf(jobId), isTicketOperLog.toString());
         return true;
     }
 
@@ -60,8 +63,9 @@ public class IsTicketOperLogServiceImpl extends ServiceImpl<IsTicketOperLogMappe
         isTicketOperLog.setOperationTime(date);
         isTicketOperLog.setOperationUserId(userId);
         isTicketOperLog.setOperationUserName(userName);
-        isTicketOperLog.setOperationContent("操作确认:" + date + "[" + userName +"]已确认执行[" +  title + "]。");
+        isTicketOperLog.setOperationContent("操作确认:" + DateUtils.parseDateToStr(DateUtils.YYYY_MM_DD_HH_MM_SS, date) + "[" + userName +"]已确认执行[" +  title + "]。");
         save(isTicketOperLog);
+        WebSocketJobTicketLog.sendMessage(String.valueOf(jobId), isTicketOperLog.toString());
         return true;
     }
 
@@ -83,8 +87,9 @@ public class IsTicketOperLogServiceImpl extends ServiceImpl<IsTicketOperLogMappe
         isTicketOperLog.setOperationTime(date);
         isTicketOperLog.setOperationUserId(userId);
         isTicketOperLog.setOperationUserName(username);
-        isTicketOperLog.setOperationContent("⼈员变动:" + date + "[" + username +"]已" + type + lockType + "[" +lockUser + "]。");
+        isTicketOperLog.setOperationContent("⼈员变动:" + DateUtils.parseDateToStr(DateUtils.YYYY_MM_DD_HH_MM_SS, date) + "[" + username +"]已" + type + lockType + "[" +lockUser + "]。");
         save(isTicketOperLog);
+        WebSocketJobTicketLog.sendMessage(String.valueOf(jobId), isTicketOperLog.toString());
         return true;
     }
 
@@ -100,8 +105,9 @@ public class IsTicketOperLogServiceImpl extends ServiceImpl<IsTicketOperLogMappe
         isTicketOperLog.setOperationTime(date);
         isTicketOperLog.setOperationUserId(userId);
         isTicketOperLog.setOperationUserName(username);
-        isTicketOperLog.setOperationContent("启动上锁:" + date + "[" + username +"]启动上锁。");
+        isTicketOperLog.setOperationContent("启动上锁:" + DateUtils.parseDateToStr(DateUtils.YYYY_MM_DD_HH_MM_SS, date) + "[" + username +"]启动上锁。");
         save(isTicketOperLog);
+        WebSocketJobTicketLog.sendMessage(String.valueOf(jobId), isTicketOperLog.toString());
         return true;
     }
 
@@ -117,8 +123,9 @@ public class IsTicketOperLogServiceImpl extends ServiceImpl<IsTicketOperLogMappe
         isTicketOperLog.setOperationTime(date);
         isTicketOperLog.setOperationUserId(userId);
         isTicketOperLog.setOperationUserName(username);
-        isTicketOperLog.setOperationContent("完成上锁:" + date + "[" + username +"]完成上锁," + pointNames + "已上锁。");
+        isTicketOperLog.setOperationContent("完成上锁:" + DateUtils.parseDateToStr(DateUtils.YYYY_MM_DD_HH_MM_SS, date) + "[" + username +"]完成上锁," + pointNames + "已上锁。");
         save(isTicketOperLog);
+        WebSocketJobTicketLog.sendMessage(String.valueOf(jobId), isTicketOperLog.toString());
         return true;
     }
 
@@ -134,8 +141,9 @@ public class IsTicketOperLogServiceImpl extends ServiceImpl<IsTicketOperLogMappe
         isTicketOperLog.setOperationTime(date);
         isTicketOperLog.setOperationUserId(userId);
         isTicketOperLog.setOperationUserName(username);
-        isTicketOperLog.setOperationContent("添加共锁:" + date + "[" + colockUser + "]已添加共锁。");
+        isTicketOperLog.setOperationContent("添加共锁:" + DateUtils.parseDateToStr(DateUtils.YYYY_MM_DD_HH_MM_SS, date) + "[" + colockUser + "]已添加共锁。");
         save(isTicketOperLog);
+        WebSocketJobTicketLog.sendMessage(String.valueOf(jobId), isTicketOperLog.toString());
         return true;
     }
 
@@ -151,8 +159,9 @@ public class IsTicketOperLogServiceImpl extends ServiceImpl<IsTicketOperLogMappe
         isTicketOperLog.setOperationTime(date);
         isTicketOperLog.setOperationUserId(userId);
         isTicketOperLog.setOperationUserName(username);
-        isTicketOperLog.setOperationContent("解除共锁:" + date + "[" + colockUser + "]已解除共锁。");
+        isTicketOperLog.setOperationContent("解除共锁:" + DateUtils.parseDateToStr(DateUtils.YYYY_MM_DD_HH_MM_SS, date) + "[" + colockUser + "]已解除共锁。");
         save(isTicketOperLog);
+        WebSocketJobTicketLog.sendMessage(String.valueOf(jobId), isTicketOperLog.toString());
         return true;
     }
 
@@ -168,8 +177,9 @@ public class IsTicketOperLogServiceImpl extends ServiceImpl<IsTicketOperLogMappe
         isTicketOperLog.setOperationTime(date);
         isTicketOperLog.setOperationUserId(userId);
         isTicketOperLog.setOperationUserName(username);
-        isTicketOperLog.setOperationContent("启动解锁:" + date + "[" + username +"]启动解锁。");
+        isTicketOperLog.setOperationContent("启动解锁:" + DateUtils.parseDateToStr(DateUtils.YYYY_MM_DD_HH_MM_SS, date) + "[" + username +"]启动解锁。");
         save(isTicketOperLog);
+        WebSocketJobTicketLog.sendMessage(String.valueOf(jobId), isTicketOperLog.toString());
         return true;
     }
 
@@ -185,8 +195,9 @@ public class IsTicketOperLogServiceImpl extends ServiceImpl<IsTicketOperLogMappe
         isTicketOperLog.setOperationTime(date);
         isTicketOperLog.setOperationUserId(userId);
         isTicketOperLog.setOperationUserName(username);
-        isTicketOperLog.setOperationContent("完成解锁:" + date + "[" + username +"]完成解锁," + pointNames + "已解锁。");
+        isTicketOperLog.setOperationContent("完成解锁:" + DateUtils.parseDateToStr(DateUtils.YYYY_MM_DD_HH_MM_SS, date) + "[" + username +"]完成解锁," + pointNames + "已解锁。");
         save(isTicketOperLog);
+        WebSocketJobTicketLog.sendMessage(String.valueOf(jobId), isTicketOperLog.toString());
         return true;
     }
 
@@ -202,8 +213,9 @@ public class IsTicketOperLogServiceImpl extends ServiceImpl<IsTicketOperLogMappe
         isTicketOperLog.setOperationTime(date);
         isTicketOperLog.setOperationUserId(userId);
         isTicketOperLog.setOperationUserName(username);
-        isTicketOperLog.setOperationContent("取消作业:" + date + "[" + username +"]已取消作业[" + jobName + "]。");
+        isTicketOperLog.setOperationContent("取消作业:" + DateUtils.parseDateToStr(DateUtils.YYYY_MM_DD_HH_MM_SS, date) + "[" + username +"]已取消作业[" + jobName + "]。");
         save(isTicketOperLog);
+        WebSocketJobTicketLog.sendMessage(String.valueOf(jobId), isTicketOperLog.toString());
         return true;
     }
 
@@ -219,8 +231,9 @@ public class IsTicketOperLogServiceImpl extends ServiceImpl<IsTicketOperLogMappe
         isTicketOperLog.setOperationTime(date);
         isTicketOperLog.setOperationUserId(userId);
         isTicketOperLog.setOperationUserName(username);
-        isTicketOperLog.setOperationContent("结束作业:" + date + "[" + username +"]已完成作业[" + jobName + "]。");
+        isTicketOperLog.setOperationContent("结束作业:" + DateUtils.parseDateToStr(DateUtils.YYYY_MM_DD_HH_MM_SS, date) + "[" + username +"]已完成作业[" + jobName + "]。");
         save(isTicketOperLog);
+        WebSocketJobTicketLog.sendMessage(String.valueOf(jobId), isTicketOperLog.toString());
         return true;
     }