From 7ed62f3b388e6786e47745129d41b5714c41b823 Mon Sep 17 00:00:00 2001 From: anthonywj Date: Thu, 3 Aug 2023 14:26:46 +0800 Subject: [PATCH] =?UTF-8?q?=E6=96=B0=E5=A2=9Ewebsocket=E9=80=9A=E4=BF=A1?= =?UTF-8?q?=EF=BC=88=E4=B8=AD=E7=BB=A7=E6=9C=8D=E5=8A=A1=E5=90=8C=E6=AD=A5?= =?UTF-8?q?=E7=9B=B8=E5=85=B3=EF=BC=89?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pom.xml | 19 ++- .../com/glxp/api/constant/SocketMsgType.java | 8 + .../sync/SpsSyncDownloadStatusController.java | 21 ++- .../api/controller/sync/SpsSyncWebSocket.java | 138 ++++++++++++++++++ .../api/controller/sync/TestController.java | 22 +++ .../glxp/api/entity/sync/SocketMsgEntity.java | 19 +++ .../service/sync/SpsSyncDownloadService.java | 11 ++ .../com/glxp/api/util/ByteArraySplitter.java | 47 ++++++ src/main/resources/application-dev.yml | 1 + src/main/resources/application-pro.yml | 2 + .../mybatis/mapper/purchase/SupProductDao.xml | 1 + 11 files changed, 282 insertions(+), 7 deletions(-) create mode 100644 src/main/java/com/glxp/api/constant/SocketMsgType.java create mode 100644 src/main/java/com/glxp/api/controller/sync/SpsSyncWebSocket.java create mode 100644 src/main/java/com/glxp/api/entity/sync/SocketMsgEntity.java create mode 100644 src/main/java/com/glxp/api/util/ByteArraySplitter.java diff --git a/pom.xml b/pom.xml index 70ec617e..e4a0e65e 100644 --- a/pom.xml +++ b/pom.xml @@ -54,12 +54,12 @@ - - - - - - + + + + + + org.springframework.boot @@ -348,6 +348,13 @@ knife4j-spring-boot-starter 2.0.9 + + + + org.java-websocket + Java-WebSocket + 1.5.4 + diff --git a/src/main/java/com/glxp/api/constant/SocketMsgType.java b/src/main/java/com/glxp/api/constant/SocketMsgType.java new file mode 100644 index 00000000..42d8f789 --- /dev/null +++ b/src/main/java/com/glxp/api/constant/SocketMsgType.java @@ -0,0 +1,8 @@ +package com.glxp.api.constant; + +public interface SocketMsgType { + + + String DL_ALL_DATA = "DL_ALL_DATA"; //生产入库 + String DL_NOTICE = "DL_NOTICE"; //通知类消息 +} diff --git a/src/main/java/com/glxp/api/controller/sync/SpsSyncDownloadStatusController.java b/src/main/java/com/glxp/api/controller/sync/SpsSyncDownloadStatusController.java index 5e9c3d4e..7b9f43f7 100644 --- a/src/main/java/com/glxp/api/controller/sync/SpsSyncDownloadStatusController.java +++ b/src/main/java/com/glxp/api/controller/sync/SpsSyncDownloadStatusController.java @@ -6,7 +6,9 @@ import com.glxp.api.annotation.Log; import com.glxp.api.common.res.BaseResponse; import com.glxp.api.common.util.ResultVOUtils; import com.glxp.api.constant.BusinessType; +import com.glxp.api.constant.SocketMsgType; import com.glxp.api.entity.sync.BasicDownloadStatusEntity; +import com.glxp.api.entity.sync.SocketMsgEntity; import com.glxp.api.req.sync.BasicDownloadRequest; import com.glxp.api.req.sync.SpsSyncDataRequest; import com.glxp.api.req.system.DeleteRequest; @@ -14,6 +16,7 @@ import com.glxp.api.res.PageSimpleResponse; import com.glxp.api.service.sync.BasicDownloadService; import com.glxp.api.util.RedisUtil; import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody; @@ -31,13 +34,29 @@ public class SpsSyncDownloadStatusController { BasicDownloadService basicDownloadService; @Resource RedisUtil redisUtil; - + @Value("${WEBSOCKET_TOKEN}") + private String socketToken; + @Resource + SpsSyncWebSocket spsSyncWebSocket; @GetMapping("/udispsync/sync/testConnection") public BaseResponse connectTest(SpsSyncDataRequest spsSyncDataRequest) { return ResultVOUtils.success("连接成功"); } + /** + * 立即同步 + * 通知中继服务立即下载数据 + */ + @AuthRuleAnnotation("") + @GetMapping("/spssync/download/now") + public BaseResponse noticeDlNow() { + SocketMsgEntity socketMsgEntity = SocketMsgEntity.builder().type(SocketMsgType.DL_ALL_DATA).content("").remark("下载基础信息").build(); + spsSyncWebSocket.sendMessage(socketMsgEntity, "1:" + socketToken); + return ResultVOUtils.success("已通知中级服务下载待同步数据!"); + } + + @AuthRuleAnnotation("") @GetMapping("/spssync/download/basic/udiinfo/getStatus") public BaseResponse getStatus(BasicDownloadRequest basicDownloadRequest) { diff --git a/src/main/java/com/glxp/api/controller/sync/SpsSyncWebSocket.java b/src/main/java/com/glxp/api/controller/sync/SpsSyncWebSocket.java new file mode 100644 index 00000000..4109e7cc --- /dev/null +++ b/src/main/java/com/glxp/api/controller/sync/SpsSyncWebSocket.java @@ -0,0 +1,138 @@ +package com.glxp.api.controller.sync; + +import cn.hutool.core.util.StrUtil; +import com.glxp.api.entity.sync.SocketMsgEntity; +import com.glxp.api.exception.ServiceException; +import com.glxp.api.util.ByteArraySplitter; +import com.glxp.api.util.JsonUtils; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; + +import javax.websocket.*; +import javax.websocket.server.PathParam; +import javax.websocket.server.ServerEndpoint; +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + + +@Component +@ServerEndpoint(value = "/spms/sync/{type}/{token}") +@Slf4j +public class SpsSyncWebSocket { + + //与某个客户端的连接会话,需要通过它来给客户端发送数据 + private Session session; + @Value("${WEBSOCKET_TOKEN}") + private String socketToken; + + private static int onlineCount = 0; + + private static ConcurrentHashMap socketServersMap = new ConcurrentHashMap<>(); + + private String key = ""; + + + /** + * 建立连接 + * + * @param session + * @param type type为register/verify + */ + @OnOpen + public void onOpen(Session session, @PathParam("type") String type, @PathParam("token") String token) { + this.session = session; + key = type + ":" + token + ":" + session.getId(); + log.info("onOpen->>key:{}", key); + boolean flag = socketServersMap.containsKey(key); + if (flag) { + socketServersMap.remove(key); + socketServersMap.put(key, this); + } else { + socketServersMap.put(key, this); + addOnlineCount(); + } +// if (StrUtil.isEmpty(token) || !token.equals(socketToken)) { +// throw new ServiceException("token验证异常"); +// } + log.info("online number:{}", getOnlineCount()); + } + + @OnMessage + public void onMessage(String message, Session session) { + log.info("来自客户端的消息->>message:{};sessionId:{}", message, session.getId()); + } + + @OnClose + public void onClose() { + log.info("onClose : {}" + key); + if (socketServersMap.containsKey(key)) { + socketServersMap.remove(key); + subOnlineCount(); //在线数减1 + } + log.info("有一连接关闭!当前在线连接数为: {}", getOnlineCount()); + } + + @OnError + public void onError(Throwable error) { + socketServersMap.remove(key); + subOnlineCount(); + log.error("webSocket连接发生错误->>errorMessage:{}", error.getMessage()); + } + + + /** + * @param socketMsgEntity 数据推送,只做通知消息推送,数据量不宜过大 + * @param sessionId token + */ + public synchronized void sendMessage(SocketMsgEntity socketMsgEntity, String sessionId) { + synchronized (this.getClass()) { + for (Map.Entry stringMyWebSocketEntry : socketServersMap.entrySet()) { + try { + String key = stringMyWebSocketEntry.getKey(); + SpsSyncWebSocket value = stringMyWebSocketEntry.getValue(); + if (key.contains(sessionId)) { + log.info("推送的消息为:" + key); + value.session.getBasicRemote().sendText(JsonUtils.toJsonString(socketMsgEntity)); +// List result = ByteArraySplitter.split(message, 64 * 1024); +// for (byte[] bytes : result) { +// String s = new String(bytes); +// value.session.getBasicRemote().sendText(s); +// } + // 推送结束符 +// value.session.getBasicRemote().sendText("#end#"); + } + } catch (IOException e) { + e.printStackTrace(); + } + } + } + } + + + /** + * 获取在线数 + * + * @return + */ + public static synchronized int getOnlineCount() { + return onlineCount; + } + + /** + * 在线数+1 + */ + public static synchronized void addOnlineCount() { + SpsSyncWebSocket.onlineCount++; + } + + /** + * 在线数-1 + */ + public static synchronized void subOnlineCount() { + SpsSyncWebSocket.onlineCount--; + } +} + diff --git a/src/main/java/com/glxp/api/controller/sync/TestController.java b/src/main/java/com/glxp/api/controller/sync/TestController.java index a1837fdb..4c63bbe2 100644 --- a/src/main/java/com/glxp/api/controller/sync/TestController.java +++ b/src/main/java/com/glxp/api/controller/sync/TestController.java @@ -1,17 +1,24 @@ package com.glxp.api.controller.sync; import cn.hutool.core.util.StrUtil; +import com.github.pagehelper.PageInfo; import com.glxp.api.annotation.Log; import com.glxp.api.common.res.BaseResponse; import com.glxp.api.common.util.ResultVOUtils; import com.glxp.api.constant.BusinessType; +import com.glxp.api.constant.SocketMsgType; +import com.glxp.api.entity.sync.SocketMsgEntity; import com.glxp.api.idc.utils.IDCUtils; +import com.glxp.api.res.inout.IoCodeResponse; import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; import org.springframework.web.multipart.MultipartFile; +import javax.annotation.Resource; import java.io.File; import java.util.List; @@ -19,6 +26,11 @@ import java.util.List; @RestController public class TestController { + @Value("${WEBSOCKET_TOKEN}") + private String socketToken; + @Resource + SpsSyncWebSocket spsSyncWebSocket; + @PostMapping("/test/file/uplaod") @Log(title = "测试上传数据", businessType = BusinessType.INSERT) public BaseResponse uploadProducts(@RequestParam("file") List files, @RequestParam("key") String key) { @@ -44,4 +56,14 @@ public class TestController { } return ResultVOUtils.success("上传成功"); } + + + @GetMapping("/testWebSocket") + public BaseResponse testWebSocket(String message) { + SocketMsgEntity socketMsgEntity = SocketMsgEntity.builder().type(SocketMsgType.DL_ALL_DATA).content(message).remark("下载基础信息").build(); + spsSyncWebSocket.sendMessage(socketMsgEntity, "1:" + socketToken); +// spsSyncWebSocket.sendMessage(message, "1:07rKFDFkQvBkbxgc7aUBlONo4gWNdx8b"); + return ResultVOUtils.success(socketMsgEntity); + } + } diff --git a/src/main/java/com/glxp/api/entity/sync/SocketMsgEntity.java b/src/main/java/com/glxp/api/entity/sync/SocketMsgEntity.java new file mode 100644 index 00000000..0d9eb9ae --- /dev/null +++ b/src/main/java/com/glxp/api/entity/sync/SocketMsgEntity.java @@ -0,0 +1,19 @@ +package com.glxp.api.entity.sync; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * websocket 消息体 + */ +@Data +@Builder +@AllArgsConstructor +@NoArgsConstructor +public class SocketMsgEntity { + private String type; + private String content; + private String remark; +} diff --git a/src/main/java/com/glxp/api/service/sync/SpsSyncDownloadService.java b/src/main/java/com/glxp/api/service/sync/SpsSyncDownloadService.java index 1beeda9c..a5cbdaf9 100644 --- a/src/main/java/com/glxp/api/service/sync/SpsSyncDownloadService.java +++ b/src/main/java/com/glxp/api/service/sync/SpsSyncDownloadService.java @@ -9,12 +9,14 @@ import com.baomidou.mybatisplus.core.toolkit.Wrappers; import com.glxp.api.common.res.BaseResponse; import com.glxp.api.common.util.ResultVOUtils; import com.glxp.api.constant.*; +import com.glxp.api.controller.sync.SpsSyncWebSocket; import com.glxp.api.dao.basic.BasicProductsDao; import com.glxp.api.entity.basic.*; import com.glxp.api.entity.inout.*; import com.glxp.api.entity.purchase.*; import com.glxp.api.entity.sync.BasicExportStatusEntity; import com.glxp.api.entity.sync.BasicExportStatusTimeEntity; +import com.glxp.api.entity.sync.SocketMsgEntity; import com.glxp.api.entity.sync.SyncDataBustypeEntity; import com.glxp.api.entity.thrsys.ThrBusTypeOriginEntity; import com.glxp.api.req.basic.ProductInfoFilterRequest; @@ -130,6 +132,8 @@ public class SpsSyncDownloadService { } } basicExportService.insertExportStatus(orderStatusEntity); + SocketMsgEntity socketMsgEntity = SocketMsgEntity.builder().type(SocketMsgType.DL_ALL_DATA).remark("下载基础信息").build(); + spsSyncWebSocket.sendMessage(socketMsgEntity, "1:" + socketToken); } } } @@ -466,6 +470,9 @@ public class SpsSyncDownloadService { // 异常回滚 this.exportTimeRollback(totalTimeMap, exportType, fileFullPath); } + SocketMsgEntity socketMsgEntity = SocketMsgEntity.builder().type(SocketMsgType.DL_ALL_DATA).remark("下载基础信息").build(); + spsSyncWebSocket.sendMessage(socketMsgEntity, "1:" + socketToken); + return true; } catch (IOException e) { logger.error(String.format("syncIdcSps----process------------生成[%s]文件及更改库操作异常,异常信息<%s>" @@ -481,6 +488,10 @@ public class SpsSyncDownloadService { } } + @Value("${WEBSOCKET_TOKEN}") + private String socketToken; + @Resource + SpsSyncWebSocket spsSyncWebSocket; private final IoCodeLostService ioCodeLostService; diff --git a/src/main/java/com/glxp/api/util/ByteArraySplitter.java b/src/main/java/com/glxp/api/util/ByteArraySplitter.java new file mode 100644 index 00000000..cb6a1d21 --- /dev/null +++ b/src/main/java/com/glxp/api/util/ByteArraySplitter.java @@ -0,0 +1,47 @@ +package com.glxp.api.util; + +import cn.hutool.core.util.ArrayUtil; +import com.beust.jcommander.internal.Lists; + +import java.util.List; + +public class ByteArraySplitter { + + /** + * 对String分片转换为List + * + * @param source 字符串 + * @param size 分片的长度 单位字节 + * @return + */ + public static List split(String source, int size) { + // 存放最终结果 + List result = Lists.newArrayList(); + + if (StringUtils.isEmpty(source)) { + return null; + } + + byte[] sourceBytes = source.getBytes(); + if (size > sourceBytes.length) { + result.add(sourceBytes); + return result; + } + // 开始进行split + int startIndex = 0; + int endIndex = sourceBytes.length - 1; + boolean isRunning = true; + while (isRunning) { + if ((endIndex + 1) - startIndex > size) { + result.add(ArrayUtil.sub(sourceBytes, startIndex, startIndex + size)); + startIndex += size; + } else { + result.add(ArrayUtil.sub(sourceBytes, startIndex, endIndex + 1)); + isRunning = false; + } + } + return result; + } +} + + diff --git a/src/main/resources/application-dev.yml b/src/main/resources/application-dev.yml index 83193ca0..bd845221 100644 --- a/src/main/resources/application-dev.yml +++ b/src/main/resources/application-dev.yml @@ -53,3 +53,4 @@ SPMS_KEY: lCOdWCBKS6Kw45wdnnqUTELXyuSKnXEs API_KEY: 1101 API_SECRET: zBITspLNvuoEd4FaamlSoqxRHmNsmQ9L WEB_TITLE: 平潭协和医院 +WEBSOCKET_TOKEN: 07rKFDFkQvBkbxgc7aUBlONo4gWNdx8b diff --git a/src/main/resources/application-pro.yml b/src/main/resources/application-pro.yml index fbed1b88..7813e7d6 100644 --- a/src/main/resources/application-pro.yml +++ b/src/main/resources/application-pro.yml @@ -53,3 +53,5 @@ SPMS_KEY: lCOdWCBKS6Kw45wdnnqUTELXyuSKnXEs API_KEY: 1101 API_SECRET: zBITspLNvuoEd4FaamlSoqxRHmNsmQ9L WEB_TITLE: 漳州市中医院 + +WEBSOCKET_TOKEN: 07rKFDFkQvBkbxgc7aUBlONo4gWNdx8b diff --git a/src/main/resources/mybatis/mapper/purchase/SupProductDao.xml b/src/main/resources/mybatis/mapper/purchase/SupProductDao.xml index b6457c82..779763cd 100644 --- a/src/main/resources/mybatis/mapper/purchase/SupProductDao.xml +++ b/src/main/resources/mybatis/mapper/purchase/SupProductDao.xml @@ -396,5 +396,6 @@ INNER JOIN sup_manufacturer on sup_product.manufacturerIdFk = sup_manufacturer.manufacturerId WHERE sup_manufacturer.companyName = #{manufactory} and sup_product.recordProductName = #{cpmctymc} + limit 1