diff --git a/pom.xml b/pom.xml
index 70ec617e..e4a0e65e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -54,12 +54,12 @@
-
-
-
-
-
-
+
+
+
+
+
+
org.springframework.boot
@@ -348,6 +348,13 @@
knife4j-spring-boot-starter
2.0.9
+
+
+
+ org.java-websocket
+ Java-WebSocket
+ 1.5.4
+
diff --git a/src/main/java/com/glxp/api/constant/SocketMsgType.java b/src/main/java/com/glxp/api/constant/SocketMsgType.java
new file mode 100644
index 00000000..42d8f789
--- /dev/null
+++ b/src/main/java/com/glxp/api/constant/SocketMsgType.java
@@ -0,0 +1,8 @@
+package com.glxp.api.constant;
+
+public interface SocketMsgType {
+
+
+ String DL_ALL_DATA = "DL_ALL_DATA"; //生产入库
+ String DL_NOTICE = "DL_NOTICE"; //通知类消息
+}
diff --git a/src/main/java/com/glxp/api/controller/sync/SpsSyncDownloadStatusController.java b/src/main/java/com/glxp/api/controller/sync/SpsSyncDownloadStatusController.java
index 5e9c3d4e..7b9f43f7 100644
--- a/src/main/java/com/glxp/api/controller/sync/SpsSyncDownloadStatusController.java
+++ b/src/main/java/com/glxp/api/controller/sync/SpsSyncDownloadStatusController.java
@@ -6,7 +6,9 @@ import com.glxp.api.annotation.Log;
import com.glxp.api.common.res.BaseResponse;
import com.glxp.api.common.util.ResultVOUtils;
import com.glxp.api.constant.BusinessType;
+import com.glxp.api.constant.SocketMsgType;
import com.glxp.api.entity.sync.BasicDownloadStatusEntity;
+import com.glxp.api.entity.sync.SocketMsgEntity;
import com.glxp.api.req.sync.BasicDownloadRequest;
import com.glxp.api.req.sync.SpsSyncDataRequest;
import com.glxp.api.req.system.DeleteRequest;
@@ -14,6 +16,7 @@ import com.glxp.api.res.PageSimpleResponse;
import com.glxp.api.service.sync.BasicDownloadService;
import com.glxp.api.util.RedisUtil;
import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
@@ -31,13 +34,29 @@ public class SpsSyncDownloadStatusController {
BasicDownloadService basicDownloadService;
@Resource
RedisUtil redisUtil;
-
+ @Value("${WEBSOCKET_TOKEN}")
+ private String socketToken;
+ @Resource
+ SpsSyncWebSocket spsSyncWebSocket;
@GetMapping("/udispsync/sync/testConnection")
public BaseResponse connectTest(SpsSyncDataRequest spsSyncDataRequest) {
return ResultVOUtils.success("连接成功");
}
+ /**
+ * 立即同步
+ * 通知中继服务立即下载数据
+ */
+ @AuthRuleAnnotation("")
+ @GetMapping("/spssync/download/now")
+ public BaseResponse noticeDlNow() {
+ SocketMsgEntity socketMsgEntity = SocketMsgEntity.builder().type(SocketMsgType.DL_ALL_DATA).content("").remark("下载基础信息").build();
+ spsSyncWebSocket.sendMessage(socketMsgEntity, "1:" + socketToken);
+ return ResultVOUtils.success("已通知中级服务下载待同步数据!");
+ }
+
+
@AuthRuleAnnotation("")
@GetMapping("/spssync/download/basic/udiinfo/getStatus")
public BaseResponse getStatus(BasicDownloadRequest basicDownloadRequest) {
diff --git a/src/main/java/com/glxp/api/controller/sync/SpsSyncWebSocket.java b/src/main/java/com/glxp/api/controller/sync/SpsSyncWebSocket.java
new file mode 100644
index 00000000..4109e7cc
--- /dev/null
+++ b/src/main/java/com/glxp/api/controller/sync/SpsSyncWebSocket.java
@@ -0,0 +1,138 @@
+package com.glxp.api.controller.sync;
+
+import cn.hutool.core.util.StrUtil;
+import com.glxp.api.entity.sync.SocketMsgEntity;
+import com.glxp.api.exception.ServiceException;
+import com.glxp.api.util.ByteArraySplitter;
+import com.glxp.api.util.JsonUtils;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+
+import javax.websocket.*;
+import javax.websocket.server.PathParam;
+import javax.websocket.server.ServerEndpoint;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+
+@Component
+@ServerEndpoint(value = "/spms/sync/{type}/{token}")
+@Slf4j
+public class SpsSyncWebSocket {
+
+ //与某个客户端的连接会话,需要通过它来给客户端发送数据
+ private Session session;
+ @Value("${WEBSOCKET_TOKEN}")
+ private String socketToken;
+
+ private static int onlineCount = 0;
+
+ private static ConcurrentHashMap socketServersMap = new ConcurrentHashMap<>();
+
+ private String key = "";
+
+
+ /**
+ * 建立连接
+ *
+ * @param session
+ * @param type type为register/verify
+ */
+ @OnOpen
+ public void onOpen(Session session, @PathParam("type") String type, @PathParam("token") String token) {
+ this.session = session;
+ key = type + ":" + token + ":" + session.getId();
+ log.info("onOpen->>key:{}", key);
+ boolean flag = socketServersMap.containsKey(key);
+ if (flag) {
+ socketServersMap.remove(key);
+ socketServersMap.put(key, this);
+ } else {
+ socketServersMap.put(key, this);
+ addOnlineCount();
+ }
+// if (StrUtil.isEmpty(token) || !token.equals(socketToken)) {
+// throw new ServiceException("token验证异常");
+// }
+ log.info("online number:{}", getOnlineCount());
+ }
+
+ @OnMessage
+ public void onMessage(String message, Session session) {
+ log.info("来自客户端的消息->>message:{};sessionId:{}", message, session.getId());
+ }
+
+ @OnClose
+ public void onClose() {
+ log.info("onClose : {}" + key);
+ if (socketServersMap.containsKey(key)) {
+ socketServersMap.remove(key);
+ subOnlineCount(); //在线数减1
+ }
+ log.info("有一连接关闭!当前在线连接数为: {}", getOnlineCount());
+ }
+
+ @OnError
+ public void onError(Throwable error) {
+ socketServersMap.remove(key);
+ subOnlineCount();
+ log.error("webSocket连接发生错误->>errorMessage:{}", error.getMessage());
+ }
+
+
+ /**
+ * @param socketMsgEntity 数据推送,只做通知消息推送,数据量不宜过大
+ * @param sessionId token
+ */
+ public synchronized void sendMessage(SocketMsgEntity socketMsgEntity, String sessionId) {
+ synchronized (this.getClass()) {
+ for (Map.Entry stringMyWebSocketEntry : socketServersMap.entrySet()) {
+ try {
+ String key = stringMyWebSocketEntry.getKey();
+ SpsSyncWebSocket value = stringMyWebSocketEntry.getValue();
+ if (key.contains(sessionId)) {
+ log.info("推送的消息为:" + key);
+ value.session.getBasicRemote().sendText(JsonUtils.toJsonString(socketMsgEntity));
+// List result = ByteArraySplitter.split(message, 64 * 1024);
+// for (byte[] bytes : result) {
+// String s = new String(bytes);
+// value.session.getBasicRemote().sendText(s);
+// }
+ // 推送结束符
+// value.session.getBasicRemote().sendText("#end#");
+ }
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+ }
+
+
+ /**
+ * 获取在线数
+ *
+ * @return
+ */
+ public static synchronized int getOnlineCount() {
+ return onlineCount;
+ }
+
+ /**
+ * 在线数+1
+ */
+ public static synchronized void addOnlineCount() {
+ SpsSyncWebSocket.onlineCount++;
+ }
+
+ /**
+ * 在线数-1
+ */
+ public static synchronized void subOnlineCount() {
+ SpsSyncWebSocket.onlineCount--;
+ }
+}
+
diff --git a/src/main/java/com/glxp/api/controller/sync/TestController.java b/src/main/java/com/glxp/api/controller/sync/TestController.java
index a1837fdb..4c63bbe2 100644
--- a/src/main/java/com/glxp/api/controller/sync/TestController.java
+++ b/src/main/java/com/glxp/api/controller/sync/TestController.java
@@ -1,17 +1,24 @@
package com.glxp.api.controller.sync;
import cn.hutool.core.util.StrUtil;
+import com.github.pagehelper.PageInfo;
import com.glxp.api.annotation.Log;
import com.glxp.api.common.res.BaseResponse;
import com.glxp.api.common.util.ResultVOUtils;
import com.glxp.api.constant.BusinessType;
+import com.glxp.api.constant.SocketMsgType;
+import com.glxp.api.entity.sync.SocketMsgEntity;
import com.glxp.api.idc.utils.IDCUtils;
+import com.glxp.api.res.inout.IoCodeResponse;
import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.multipart.MultipartFile;
+import javax.annotation.Resource;
import java.io.File;
import java.util.List;
@@ -19,6 +26,11 @@ import java.util.List;
@RestController
public class TestController {
+ @Value("${WEBSOCKET_TOKEN}")
+ private String socketToken;
+ @Resource
+ SpsSyncWebSocket spsSyncWebSocket;
+
@PostMapping("/test/file/uplaod")
@Log(title = "测试上传数据", businessType = BusinessType.INSERT)
public BaseResponse uploadProducts(@RequestParam("file") List files, @RequestParam("key") String key) {
@@ -44,4 +56,14 @@ public class TestController {
}
return ResultVOUtils.success("上传成功");
}
+
+
+ @GetMapping("/testWebSocket")
+ public BaseResponse testWebSocket(String message) {
+ SocketMsgEntity socketMsgEntity = SocketMsgEntity.builder().type(SocketMsgType.DL_ALL_DATA).content(message).remark("下载基础信息").build();
+ spsSyncWebSocket.sendMessage(socketMsgEntity, "1:" + socketToken);
+// spsSyncWebSocket.sendMessage(message, "1:07rKFDFkQvBkbxgc7aUBlONo4gWNdx8b");
+ return ResultVOUtils.success(socketMsgEntity);
+ }
+
}
diff --git a/src/main/java/com/glxp/api/entity/sync/SocketMsgEntity.java b/src/main/java/com/glxp/api/entity/sync/SocketMsgEntity.java
new file mode 100644
index 00000000..0d9eb9ae
--- /dev/null
+++ b/src/main/java/com/glxp/api/entity/sync/SocketMsgEntity.java
@@ -0,0 +1,19 @@
+package com.glxp.api.entity.sync;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/**
+ * websocket 消息体
+ */
+@Data
+@Builder
+@AllArgsConstructor
+@NoArgsConstructor
+public class SocketMsgEntity {
+ private String type;
+ private String content;
+ private String remark;
+}
diff --git a/src/main/java/com/glxp/api/service/sync/SpsSyncDownloadService.java b/src/main/java/com/glxp/api/service/sync/SpsSyncDownloadService.java
index 1beeda9c..a5cbdaf9 100644
--- a/src/main/java/com/glxp/api/service/sync/SpsSyncDownloadService.java
+++ b/src/main/java/com/glxp/api/service/sync/SpsSyncDownloadService.java
@@ -9,12 +9,14 @@ import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.glxp.api.common.res.BaseResponse;
import com.glxp.api.common.util.ResultVOUtils;
import com.glxp.api.constant.*;
+import com.glxp.api.controller.sync.SpsSyncWebSocket;
import com.glxp.api.dao.basic.BasicProductsDao;
import com.glxp.api.entity.basic.*;
import com.glxp.api.entity.inout.*;
import com.glxp.api.entity.purchase.*;
import com.glxp.api.entity.sync.BasicExportStatusEntity;
import com.glxp.api.entity.sync.BasicExportStatusTimeEntity;
+import com.glxp.api.entity.sync.SocketMsgEntity;
import com.glxp.api.entity.sync.SyncDataBustypeEntity;
import com.glxp.api.entity.thrsys.ThrBusTypeOriginEntity;
import com.glxp.api.req.basic.ProductInfoFilterRequest;
@@ -130,6 +132,8 @@ public class SpsSyncDownloadService {
}
}
basicExportService.insertExportStatus(orderStatusEntity);
+ SocketMsgEntity socketMsgEntity = SocketMsgEntity.builder().type(SocketMsgType.DL_ALL_DATA).remark("下载基础信息").build();
+ spsSyncWebSocket.sendMessage(socketMsgEntity, "1:" + socketToken);
}
}
}
@@ -466,6 +470,9 @@ public class SpsSyncDownloadService {
// 异常回滚
this.exportTimeRollback(totalTimeMap, exportType, fileFullPath);
}
+ SocketMsgEntity socketMsgEntity = SocketMsgEntity.builder().type(SocketMsgType.DL_ALL_DATA).remark("下载基础信息").build();
+ spsSyncWebSocket.sendMessage(socketMsgEntity, "1:" + socketToken);
+
return true;
} catch (IOException e) {
logger.error(String.format("syncIdcSps----process------------生成[%s]文件及更改库操作异常,异常信息<%s>"
@@ -481,6 +488,10 @@ public class SpsSyncDownloadService {
}
}
+ @Value("${WEBSOCKET_TOKEN}")
+ private String socketToken;
+ @Resource
+ SpsSyncWebSocket spsSyncWebSocket;
private final IoCodeLostService ioCodeLostService;
diff --git a/src/main/java/com/glxp/api/util/ByteArraySplitter.java b/src/main/java/com/glxp/api/util/ByteArraySplitter.java
new file mode 100644
index 00000000..cb6a1d21
--- /dev/null
+++ b/src/main/java/com/glxp/api/util/ByteArraySplitter.java
@@ -0,0 +1,47 @@
+package com.glxp.api.util;
+
+import cn.hutool.core.util.ArrayUtil;
+import com.beust.jcommander.internal.Lists;
+
+import java.util.List;
+
+public class ByteArraySplitter {
+
+ /**
+ * 对String分片转换为List
+ *
+ * @param source 字符串
+ * @param size 分片的长度 单位字节
+ * @return
+ */
+ public static List split(String source, int size) {
+ // 存放最终结果
+ List result = Lists.newArrayList();
+
+ if (StringUtils.isEmpty(source)) {
+ return null;
+ }
+
+ byte[] sourceBytes = source.getBytes();
+ if (size > sourceBytes.length) {
+ result.add(sourceBytes);
+ return result;
+ }
+ // 开始进行split
+ int startIndex = 0;
+ int endIndex = sourceBytes.length - 1;
+ boolean isRunning = true;
+ while (isRunning) {
+ if ((endIndex + 1) - startIndex > size) {
+ result.add(ArrayUtil.sub(sourceBytes, startIndex, startIndex + size));
+ startIndex += size;
+ } else {
+ result.add(ArrayUtil.sub(sourceBytes, startIndex, endIndex + 1));
+ isRunning = false;
+ }
+ }
+ return result;
+ }
+}
+
+
diff --git a/src/main/resources/application-dev.yml b/src/main/resources/application-dev.yml
index 83193ca0..bd845221 100644
--- a/src/main/resources/application-dev.yml
+++ b/src/main/resources/application-dev.yml
@@ -53,3 +53,4 @@ SPMS_KEY: lCOdWCBKS6Kw45wdnnqUTELXyuSKnXEs
API_KEY: 1101
API_SECRET: zBITspLNvuoEd4FaamlSoqxRHmNsmQ9L
WEB_TITLE: 平潭协和医院
+WEBSOCKET_TOKEN: 07rKFDFkQvBkbxgc7aUBlONo4gWNdx8b
diff --git a/src/main/resources/application-pro.yml b/src/main/resources/application-pro.yml
index fbed1b88..7813e7d6 100644
--- a/src/main/resources/application-pro.yml
+++ b/src/main/resources/application-pro.yml
@@ -53,3 +53,5 @@ SPMS_KEY: lCOdWCBKS6Kw45wdnnqUTELXyuSKnXEs
API_KEY: 1101
API_SECRET: zBITspLNvuoEd4FaamlSoqxRHmNsmQ9L
WEB_TITLE: 漳州市中医院
+
+WEBSOCKET_TOKEN: 07rKFDFkQvBkbxgc7aUBlONo4gWNdx8b
diff --git a/src/main/resources/mybatis/mapper/purchase/SupProductDao.xml b/src/main/resources/mybatis/mapper/purchase/SupProductDao.xml
index b6457c82..779763cd 100644
--- a/src/main/resources/mybatis/mapper/purchase/SupProductDao.xml
+++ b/src/main/resources/mybatis/mapper/purchase/SupProductDao.xml
@@ -396,5 +396,6 @@
INNER JOIN sup_manufacturer on sup_product.manufacturerIdFk = sup_manufacturer.manufacturerId
WHERE sup_manufacturer.companyName = #{manufactory}
and sup_product.recordProductName = #{cpmctymc}
+ limit 1