Procházet zdrojové kódy

新增微博socket iot

车车 před 1 rokem
rodič
revize
f7b92f179c

+ 7 - 3
ktg-admin/src/main/java/com/ktg/web/controller/iscs/SysTeamController.java

@@ -18,6 +18,7 @@ import org.springframework.security.access.prepost.PreAuthorize;
 import org.springframework.web.bind.annotation.*;
 
 import javax.servlet.http.HttpServletResponse;
+import java.util.Arrays;
 import java.util.List;
 
 /**
@@ -87,7 +88,8 @@ public class SysTeamController extends BaseController
     @PostMapping("/insertSysTeam")
     public CommonResult<Boolean> insertSysTeam(@RequestBody @Parameter(name = "sysTeam", description = "新增数据类,放到body") SysTeam sysTeam)
     {
-        return CommonResult.success(sysTeamService.insertSysTeam(sysTeam) == 1);
+        int i = sysTeamService.insertSysTeam(sysTeam);
+        return CommonResult.success(i == 1);
     }
 
     /**
@@ -109,8 +111,10 @@ public class SysTeamController extends BaseController
     @PreAuthorize("@ss.hasPermi('iscs:team:remove')")
     @Log(title = "小组", businessType = BusinessType.DELETE)
 	@DeleteMapping("/{ids}")
-    public CommonResult<Boolean> deleteByids(@PathVariable Long[] ids)
+    public CommonResult<Boolean> deleteSysTeamByIds(@PathVariable Long[] ids)
     {
-        return CommonResult.success(sysTeamService.deleteSysTeamByIds(ids) == 1);
+        // return CommonResult.success(sysTeamService.deleteSysTeamByIds(ids) == 1);
+        boolean b = sysTeamService.removeByIds(Arrays.asList(ids));
+        return CommonResult.success(b);
     }
 }

+ 15 - 16
ktg-admin/src/main/java/com/ktg/web/controller/system/SysDeptController.java

@@ -1,19 +1,5 @@
 package com.ktg.web.controller.system;
 
-import java.util.Iterator;
-import java.util.List;
-import org.apache.commons.lang3.ArrayUtils;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.security.access.prepost.PreAuthorize;
-import org.springframework.validation.annotation.Validated;
-import org.springframework.web.bind.annotation.DeleteMapping;
-import org.springframework.web.bind.annotation.GetMapping;
-import org.springframework.web.bind.annotation.PathVariable;
-import org.springframework.web.bind.annotation.PostMapping;
-import org.springframework.web.bind.annotation.PutMapping;
-import org.springframework.web.bind.annotation.RequestBody;
-import org.springframework.web.bind.annotation.RequestMapping;
-import org.springframework.web.bind.annotation.RestController;
 import com.ktg.common.annotation.Log;
 import com.ktg.common.constant.UserConstants;
 import com.ktg.common.core.controller.BaseController;
@@ -22,22 +8,35 @@ import com.ktg.common.core.domain.entity.SysDept;
 import com.ktg.common.enums.BusinessType;
 import com.ktg.common.utils.StringUtils;
 import com.ktg.system.service.ISysDeptService;
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import org.apache.commons.lang3.ArrayUtils;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.security.access.prepost.PreAuthorize;
+import org.springframework.validation.annotation.Validated;
+import org.springframework.web.bind.annotation.*;
+
+import javax.annotation.security.PermitAll;
+import java.util.Iterator;
+import java.util.List;
 
 /**
  * 部门信息
- * 
+ *
  * @author ruoyi
  */
+@Api(tags = "部门")
 @RestController
 @RequestMapping("/system/dept")
 public class SysDeptController extends BaseController
 {
     @Autowired
     private ISysDeptService deptService;
-
     /**
      * 获取部门列表
      */
+    @PermitAll
+    @ApiOperation("查询部门-分页")
     @GetMapping("/list")
     public AjaxResult list(SysDept dept)
     {

+ 1 - 1
ktg-common/src/main/java/com/ktg/common/exception/enums/GlobalErrorCodeConstants.java

@@ -15,7 +15,7 @@ import com.ktg.common.exception.ErrorCode;
  */
 public interface GlobalErrorCodeConstants {
 
-    ErrorCode SUCCESS = new ErrorCode(0, "成功");
+    ErrorCode SUCCESS = new ErrorCode(200, "成功");
 
     // ========== 客户端错误段 ==========
 

+ 175 - 0
ktg-framework/src/main/java/com/ktg/framework/websocket/WebSocketIots.java

@@ -0,0 +1,175 @@
+package com.ktg.framework.websocket;
+
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Component;
+
+import javax.websocket.*;
+import javax.websocket.server.PathParam;
+import javax.websocket.server.ServerEndpoint;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArraySet;
+
+@Component
+@ServerEndpoint("/websocket/iot/{code}")
+@Slf4j
+public class WebSocketIots {
+    private Session session;
+    private String code;
+    /**静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。*/
+    private static int onlineCount = 0;
+    private static CopyOnWriteArraySet<WebSocketIots> webSocketSet = new CopyOnWriteArraySet<>();
+
+    /**
+     * concurrent包的线程安全set,用来存放每个客户端对应的MyWebSocket对象
+     */
+    private static ConcurrentHashMap<String, WebSocketIots> webSocketMap2 = new ConcurrentHashMap();
+
+    /**
+     * 为了保存在线用户信息,在方法中新建一个list存储一下【实际项目依据复杂度,可以存储到数据库或者缓存】
+     */
+    private final static List<Session> SESSIONS = Collections.synchronizedList(new ArrayList<>());
+
+
+    /**
+     * 建立连接
+     * @param session
+     * @param code
+     */
+    @OnOpen
+    public void onOpen(Session session, @PathParam("code") String code) {
+        this.session = session;
+        this.code = code;
+        webSocketSet.add(this);
+        SESSIONS.add(session);
+        if (webSocketMap2.containsKey(code)) {
+            webSocketMap2.remove(code);
+            webSocketMap2.put(code,this);
+        } else {
+            webSocketMap2.put(code,this);
+            addOnlineCount();
+        }
+        // 心跳机制
+       /* new Thread(() -> {
+            try {
+                sendHeartbeats(session, code);
+            } catch (IOException e) {
+                e.printStackTrace();
+            }
+        }).start();*/
+        log.info("[连接ID:{}] 建立连接, 当前连接数:{}", this.code, webSocketMap2.size());
+    }
+    private void sendHeartbeats(Session session, String code) throws IOException {
+        System.out.println();
+        int heartbeatInterval = 5000; // 5 seconds
+        while (true) {
+            try {
+                Thread.sleep(heartbeatInterval);
+                session.getBasicRemote().sendText("heartbeat");
+                System.out.println("Sent heartbeat to client: " + session.getId() + "--code:" + code);
+            } catch (InterruptedException | IOException e) {
+                break; // Stop sending heartbeats if an error occurs
+            }
+        }
+    }
+
+    /**
+     * 断开连接
+     */
+    @OnClose
+    public void onClose() {
+        webSocketSet.remove(this);
+        if (webSocketMap2.containsKey(code)) {
+            webSocketMap2.remove(code);
+            subOnlineCount();
+        }
+        // log.info("【websocket消息】连接断开, 总数:{}", webSocketSet.size());
+        log.info("[连接ID:{}] 断开连接, 当前连接数:{}", code, webSocketMap2.size());
+    }
+
+    /**
+     * 发送错误
+     * @param session
+     * @param error
+     */
+    @OnError
+    public void onError(Session session, Throwable error) {
+        log.info("[连接ID:{}] 错误原因:{}", this.code, error.getMessage());
+        error.printStackTrace();
+        // 发生错误时,关闭连接
+        // conn.close(500, "连接出错");
+    }
+
+    /**
+     * 收到消息
+     * @param message
+     */
+    @OnMessage
+    public void onMessage(String message) {
+        // log.info("【websocket消息】收到客户端发来的消息:{}", message);
+        log.info("[连接ID:{}] 收到消息:{}", this.code, message);
+    }
+
+    /**
+     * 发送消息
+     * @param message
+     * @param code
+     */
+    public void sendMessage(String code, String message) {
+        WebSocketIots webSocketIots = webSocketMap2.get(code);
+        log.info("【websocket消息】推送消息, webSocketServer={}", webSocketIots);
+        if (webSocketIots!=null){
+            log.info("【websocket消息】推送消息, message={}", message);
+            try {
+                webSocketIots.session.getBasicRemote().sendText(message);
+            } catch (Exception e) {
+                e.printStackTrace();
+                log.error("[连接ID:{}] 发送消息失败, 消息:{}", this.code, message, e);
+            }
+        }
+    }
+
+    /**
+     * 群发消息
+     * @param message
+     */
+    public void sendMassMessage(String message) {
+        try {
+            for (Session session : SESSIONS) {
+                if (session.isOpen()) {
+                    session.getBasicRemote().sendText(message);
+                    log.info("[连接ID:{}] 发送消息:{}",session.getRequestParameterMap().get("code"),message);
+                }
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+    /**
+     * 获取当前连接数
+     * @return
+     */
+    public static synchronized int getOnlineCount() {
+        return onlineCount;
+    }
+
+    /**
+     * 当前连接数加一
+     */
+    public static synchronized void addOnlineCount() {
+        WebSocketIots.onlineCount++;
+    }
+
+    /**
+     * 当前连接数减一
+     */
+    public static synchronized void subOnlineCount() {
+        WebSocketIots.onlineCount--;
+    }
+
+}
+

+ 31 - 42
ktg-framework/src/main/java/com/ktg/framework/websocket/WebSocketServer.java

@@ -1,20 +1,8 @@
 package com.ktg.framework.websocket;
 
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Semaphore;
-import javax.websocket.OnClose;
-import javax.websocket.OnError;
-import javax.websocket.OnMessage;
-import javax.websocket.OnOpen;
-import javax.websocket.Session;
-import javax.websocket.server.PathParam;
-import javax.websocket.server.ServerEndpoint;
-
 import com.alibaba.fastjson.JSON;
-import com.alibaba.fastjson.JSONObject;
 import com.alibaba.fastjson.TypeReference;
 import com.ktg.common.constant.UserConstants;
-import com.ktg.common.core.domain.entity.SysUser;
 import com.ktg.common.core.domain.model.LoginUser;
 import com.ktg.common.utils.StringUtils;
 import com.ktg.common.utils.spring.SpringUtils;
@@ -22,9 +10,14 @@ import com.ktg.framework.web.service.TokenService;
 import com.ktg.system.domain.SysMessage;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
+import javax.websocket.*;
+import javax.websocket.server.PathParam;
+import javax.websocket.server.ServerEndpoint;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Semaphore;
+
 /**
  * websocket 消息处理
  *
@@ -32,8 +25,7 @@ import org.springframework.stereotype.Component;
  */
 @Component
 @ServerEndpoint("/websocket/message/{token}")
-public class WebSocketServer
-{
+public class WebSocketServer {
     /**
      * WebSocketServer 日志控制器
      */
@@ -47,25 +39,27 @@ public class WebSocketServer
 
     private static Semaphore socketSemaphore = new Semaphore(socketMaxOnlineCount);
 
-    /**静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。*/
+    /**
+     * 静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。
+     */
     private static int onlineCount = 0;
 
     /**
      * concurrent包的线程安全set,用来存放每个客户端对应的MyWebSocket对象
      */
-    private static ConcurrentHashMap<String,WebSocketServer> webSocketMap = new ConcurrentHashMap();
+    private static ConcurrentHashMap<String, WebSocketServer> webSocketMap = new ConcurrentHashMap();
 
 
     /**
      * 连接建立成功调用的方法
      */
     @OnOpen
-    public void onOpen(Session session,@PathParam("token") String token) throws Exception
-    {
+    public void onOpen(Session session, @PathParam("token") String token) throws Exception {
         LOGGER.info("[连接token:{}] 建立连接, 当前连接数:{}", token, webSocketMap.size());
         boolean semaphoreFlag = false;
         //身份验证
-        if(!StringUtils.isNotNull(token)){
+        if (!StringUtils.isNotNull(token)) {
+            LOGGER.error("当前token为空!");
             session.close();
             return;
         }
@@ -73,27 +67,25 @@ public class WebSocketServer
         TokenService tokenService = SpringUtils.getBean(TokenService.class);
 
         LoginUser user = tokenService.getUserByToken(token);
-        if(!StringUtils.isNotNull(user)){
+        if (!StringUtils.isNotNull(user)) {
+            LOGGER.error("当前user不存在!");
             session.close();
             return;
         }
 
         // 尝试获取信号量
         semaphoreFlag = SemaphoreUtils.tryAcquire(socketSemaphore);
-        if (!semaphoreFlag)
-        {
+        if (!semaphoreFlag) {
             // 未获取到信号量
             LOGGER.error("\n 当前在线人数超过限制数- {}", socketMaxOnlineCount);
             WebSocketUsers.sendMessageToUserByText(session, "当前在线人数超过限制数:" + socketMaxOnlineCount);
             session.close();
-        }
-        else
-        {
+        } else {
             if (webSocketMap.containsKey(token)) {
                 webSocketMap.remove(token);
-                webSocketMap.put(token,this);
+                webSocketMap.put(token, this);
             } else {
-                webSocketMap.put(token,this);
+                webSocketMap.put(token, this);
                 addOnlineCount();
             }
 
@@ -109,8 +101,7 @@ public class WebSocketServer
      * 连接关闭时处理
      */
     @OnClose
-    public void onClose(Session session)
-    {
+    public void onClose(Session session) {
         LOGGER.info("\n 关闭连接 - {}", session);
         // 移除用户
         WebSocketUsers.remove(session);
@@ -122,10 +113,8 @@ public class WebSocketServer
      * 抛出异常时处理
      */
     @OnError
-    public void onError(Session session, Throwable exception) throws Exception
-    {
-        if (session.isOpen())
-        {
+    public void onError(Session session, Throwable exception) throws Exception {
+        if (session.isOpen()) {
             // 关闭连接
             session.close();
         }
@@ -142,16 +131,16 @@ public class WebSocketServer
      * 服务器接收到客户端消息时调用的方法
      */
     @OnMessage
-    public void onMessage(String message, Session session)
-    {
-        if(!UserConstants.WEBSOCKET_HEARTBEAT.equals(message)){
-            try{
-                SysMessage msg = JSON.parseObject(message, new TypeReference<SysMessage>(){});
-                if(StringUtils.isNotNull(msg.getRecipientName())){
+    public void onMessage(String message, Session session) {
+        if (!UserConstants.WEBSOCKET_HEARTBEAT.equals(message)) {
+            try {
+                SysMessage msg = JSON.parseObject(message, new TypeReference<SysMessage>() {
+                });
+                if (StringUtils.isNotNull(msg.getRecipientName())) {
                     //这里必须传递username
-                    WebSocketUsers.sendMesssageToUserByName(msg.getRecipientName(),message);
+                    WebSocketUsers.sendMesssageToUserByName(msg.getRecipientName(), message);
                 }
-            }catch (Exception e){
+            } catch (Exception e) {
                 LOGGER.error("\n 错误的websocket信息格式 - {}", message);
             }
         }

+ 1 - 1
ktg-generator/src/main/resources/vm/java/controller.java.vm

@@ -121,7 +121,7 @@ public class ${ClassName}Controller extends BaseController
     @PreAuthorize("@ss.hasPermi('${permissionPrefix}:remove')")
     @Log(title = "${functionName}", businessType = BusinessType.DELETE)
 	@DeleteMapping("/{${pkColumn.javaField}s}")
-    public CommonResult<Boolean> deleteBy${pkColumn.capJavaField}s(@PathVariable ${pkColumn.javaType}[] ${pkColumn.javaField}s)
+    public CommonResult<Boolean> delete${ClassName}By${pkColumn.capJavaField}s(@PathVariable ${pkColumn.javaType}[] ${pkColumn.javaField}s)
     {
         return CommonResult.success(${className}Service.delete${ClassName}By${pkColumn.capJavaField}s(${pkColumn.javaField}s) == 1);
     }