From 1476a0f894316b73de8bc9d908dd78e9085b3876 Mon Sep 17 00:00:00 2001 From: anthonywj Date: Thu, 3 Aug 2023 14:25:46 +0800 Subject: [PATCH] =?UTF-8?q?=E6=96=B0=E5=A2=9Ewebsocket=E9=80=9A=E4=BF=A1?= =?UTF-8?q?=EF=BC=88=E4=B8=AD=E7=BB=A7=E6=9C=8D=E5=8A=A1=E5=90=8C=E6=AD=A5?= =?UTF-8?q?=E7=9B=B8=E5=85=B3=EF=BC=89?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../com/glxp/api/constant/SocketMsgType.java | 16 +++ .../glxp/api/entity/sync/SocketMsgEntity.java | 20 +++ .../api/service/sync/SocketMsgService.java | 40 ++++++ .../api/service/sync/SpsWebSocketClient.java | 120 ++++++++++++++++++ .../api/service/sync/WebSocketComponent.java | 93 ++++++++++++++ .../com/glxp/api/task/SyncHeartService.java | 105 +++++++++++++++ .../java/com/glxp/api/task/SyncHeartTask.java | 103 +++------------ src/main/resources/application-dev.yml | 4 +- src/main/resources/application-pro.yml | 2 +- src/main/resources/application-test.yml | 2 +- src/main/resources/application-wmd.yml | 2 +- 11 files changed, 418 insertions(+), 89 deletions(-) create mode 100644 src/main/java/com/glxp/api/constant/SocketMsgType.java create mode 100644 src/main/java/com/glxp/api/entity/sync/SocketMsgEntity.java create mode 100644 src/main/java/com/glxp/api/service/sync/SocketMsgService.java create mode 100644 src/main/java/com/glxp/api/service/sync/SpsWebSocketClient.java create mode 100644 src/main/java/com/glxp/api/service/sync/WebSocketComponent.java create mode 100644 src/main/java/com/glxp/api/task/SyncHeartService.java diff --git a/src/main/java/com/glxp/api/constant/SocketMsgType.java b/src/main/java/com/glxp/api/constant/SocketMsgType.java new file mode 100644 index 000000000..d259b0b47 --- /dev/null +++ b/src/main/java/com/glxp/api/constant/SocketMsgType.java @@ -0,0 +1,16 @@ +package com.glxp.api.constant; + +public interface SocketMsgType { + + + String DL_ALL_DATA = "DL_ALL_DATA"; //生产入库 + String DL_NOTICE = "DL_NOTICE"; //通知类消息 + + + /** + * 任务类型 + */ + + String TASK_INVOICE_CONFIRM = "TASK_INVOICE_CONFIRM"; //发票确认 + +} diff --git a/src/main/java/com/glxp/api/entity/sync/SocketMsgEntity.java b/src/main/java/com/glxp/api/entity/sync/SocketMsgEntity.java new file mode 100644 index 000000000..bfe3e5cef --- /dev/null +++ b/src/main/java/com/glxp/api/entity/sync/SocketMsgEntity.java @@ -0,0 +1,20 @@ +package com.glxp.api.entity.sync; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * websocket 消息体 + */ +@Data +@Builder +@AllArgsConstructor +@NoArgsConstructor +public class SocketMsgEntity { + + private String type; + private String content; + private String remark; +} diff --git a/src/main/java/com/glxp/api/service/sync/SocketMsgService.java b/src/main/java/com/glxp/api/service/sync/SocketMsgService.java new file mode 100644 index 000000000..9c9b5f6c0 --- /dev/null +++ b/src/main/java/com/glxp/api/service/sync/SocketMsgService.java @@ -0,0 +1,40 @@ +package com.glxp.api.service.sync; + +import cn.hutool.core.util.StrUtil; +import com.alibaba.fastjson.JSONObject; +import com.alibaba.fastjson.TypeReference; +import com.glxp.api.common.res.BaseResponse; +import com.glxp.api.constant.SocketMsgType; +import com.glxp.api.entity.sync.SocketMsgEntity; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.scheduling.annotation.Async; +import org.springframework.stereotype.Service; + +import javax.annotation.Resource; + +@Service +public class SocketMsgService { + + @Value("${SPMS_WEBSOCKET_TOKEN}") + String token; + + @Async + public void dealNoticeMsg(SocketMsgEntity socketMsgEntity) { + if (socketMsgEntity != null && StrUtil.isNotEmpty(socketMsgEntity.getType())) { + switch (socketMsgEntity.getType()) { + case SocketMsgType.TASK_INVOICE_CONFIRM: + invoiceConfirm(socketMsgEntity); + break; + default: + break; + } + } + + } + + + //发票确认 + public void invoiceConfirm(SocketMsgEntity socketMsgEntity) { + + } +} diff --git a/src/main/java/com/glxp/api/service/sync/SpsWebSocketClient.java b/src/main/java/com/glxp/api/service/sync/SpsWebSocketClient.java new file mode 100644 index 000000000..9ae9b5069 --- /dev/null +++ b/src/main/java/com/glxp/api/service/sync/SpsWebSocketClient.java @@ -0,0 +1,120 @@ +package com.glxp.api.service.sync; + +import cn.hutool.core.util.StrUtil; +import com.alibaba.fastjson.JSONObject; +import com.glxp.api.constant.SocketMsgType; +import com.glxp.api.entity.sync.SocketMsgEntity; +import org.java_websocket.WebSocket; +import org.java_websocket.client.WebSocketClient; +import org.java_websocket.framing.Framedata; +import org.java_websocket.handshake.ServerHandshake; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Resource; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.List; + + +public class SpsWebSocketClient extends WebSocketClient { + + private String excptMessage; + + Logger log = LoggerFactory.getLogger(SpsWebSocketClient.class); + + private static List list = new ArrayList<>(); + SocketMsgService socketMsgService; + + public void setSocketMsgService(SocketMsgService socketMsgService) { + this.socketMsgService = socketMsgService; + } + + public SpsWebSocketClient(String serverUri) throws URISyntaxException { + super(new URI(serverUri)); + this.setConnectionLostTimeout(0); + if (list.isEmpty()) { + return; + } + for (SpsWebSocketClient client : list) { + client.close(); + } + list.clear(); + list.add(this); + } + + @Override + public void onOpen(ServerHandshake serverHandshake) { + log.info("在线日志socket连接成功"); + } + + @Override + public void onMessage(String s) { + log.info("收到消息:" + s); + if (StrUtil.isNotEmpty(s)) { + SocketMsgEntity socketMsgEntity = JSONObject.parseObject(s, SocketMsgEntity.class); + //收到更新下载数据则赋值由外部处理,多个任务只处理一次避免重复处理 + if (socketMsgEntity.getType().equals(SocketMsgType.DL_ALL_DATA)) { + this.excptMessage = s; + } else { + //通知类消息则需转发 + socketMsgService.dealNoticeMsg(socketMsgEntity); + } + } else { + log.error("无法识别消息!"); + } + + + } + + @Override + public void onClose(int i, String s, boolean b) { + log.info("在线日志socket断开"); + } + + @Override + public void onError(Exception e) { + e.printStackTrace(); + } + + @Override + public void onWebsocketPing(WebSocket conn, Framedata f) { + try { + Thread.sleep(1000 * 5); + } catch (InterruptedException e) { + e.printStackTrace(); + } + send("---pong---"); + } + + @Deprecated + public static void destroy() { + if (list.isEmpty()) + return; + for (SpsWebSocketClient client : list) { + client.close(); + } + list.clear(); + + } + + //发送消息 + public void sendMessage(String message) { + this.send(message); + System.out.println("已发送消息:" + message); + } + + //获取接收到的信息 + public String getExcptMessage() { + if (excptMessage != null) { + String message = new String(excptMessage); + excptMessage = null; + return message; + } + return null; + } + + +} + diff --git a/src/main/java/com/glxp/api/service/sync/WebSocketComponent.java b/src/main/java/com/glxp/api/service/sync/WebSocketComponent.java new file mode 100644 index 000000000..a380b32c1 --- /dev/null +++ b/src/main/java/com/glxp/api/service/sync/WebSocketComponent.java @@ -0,0 +1,93 @@ +package com.glxp.api.service.sync; + +import com.glxp.api.constant.SocketMsgType; +import com.glxp.api.dao.system.SyncDataSetDao; +import com.glxp.api.entity.system.SyncDataSetEntity; +import com.glxp.api.service.system.SystemParamConfigService; +import com.glxp.api.task.SyncHeartService; +import lombok.extern.slf4j.Slf4j; +import org.java_websocket.enums.ReadyState; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.CommandLineRunner; +import org.springframework.stereotype.Component; + +import javax.annotation.Resource; +import java.net.URISyntaxException; + +@Slf4j +@Component +public class WebSocketComponent implements CommandLineRunner { + + @Value("${SPMS_WEBSOCKET_TOKEN}") + private String socketToken; + + @Resource + SyncHeartService heartTaskService; + @Resource + private SyncDataSetDao syncDataSetDao; + @Resource + SocketMsgService socketMsgService; + + @Override + public void run(String... args) throws Exception { + + SyncDataSetEntity syncDataSetEntity = syncDataSetDao.selectSet(); + String ip = syncDataSetEntity.getSyncIp(); + ip = ip.replace("http://", ""); + try { + SpsWebSocketClient client = new SpsWebSocketClient("ws://" + ip + "/sps/web/sync/2/" + socketToken); + client.setSocketMsgService(socketMsgService); + initConnect(client); + //等待服务端响应 + while (!client.getReadyState().equals(ReadyState.OPEN)) { + log.info("连接中···请稍后"); + Thread.sleep(1000); + } + //等待WebSocket服务端响应 + String message = null; + while (true) { + while ((message = client.getExcptMessage()) == null) { + log.info("已连接,等待接收数据--------"); + Thread.sleep(1000); + if (client.isClosed()) { + initConnect(client); + } + } + if (message.equals(SocketMsgType.DL_ALL_DATA)) { + heartTaskService.pullData(); + } + //打印服务端返回的数据 + log.info("成功获取数据:" + message); + } + + + } catch (URISyntaxException e) { + e.printStackTrace(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + + public void initConnect(SpsWebSocketClient client) { + if (client == null && client.isOpen()) { + log.info("WebSocket已连接,不需要重连"); + return; + } + log.info("重新建立连接"); + client.connect(); + while (!client.getReadyState().equals(ReadyState.OPEN)) { + log.info("连接中···请稍后"); + try { + Thread.sleep(3000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + if (client.getReadyState().equals(ReadyState.OPEN)) { + return; + } else if (client.getReadyState().equals(ReadyState.CLOSED)) { + client.reconnect(); + } + } + } +} diff --git a/src/main/java/com/glxp/api/task/SyncHeartService.java b/src/main/java/com/glxp/api/task/SyncHeartService.java new file mode 100644 index 000000000..829dc4552 --- /dev/null +++ b/src/main/java/com/glxp/api/task/SyncHeartService.java @@ -0,0 +1,105 @@ +package com.glxp.api.task; + +import com.glxp.api.constant.BasicExportTypeEnum; +import com.glxp.api.dao.system.SyncDataSetDao; +import com.glxp.api.entity.system.SyncDataSetEntity; +import com.glxp.api.service.sync.HeartService; +import com.glxp.api.util.RedisUtil; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.exception.ExceptionUtils; +import org.springframework.stereotype.Service; + +import javax.annotation.Resource; +import java.util.Arrays; + +@Slf4j +@Service +public class SyncHeartService { + + @Resource + RedisUtil redisUtil; + @Resource + HeartService heartService; + @Resource + private SyncDataSetDao syncDataSetDao; + + public void syncProcess() { + //查询数据同步设置 + pushData(); + pushOrder(); + pullData(); + } + + public void pushData() { + SyncDataSetEntity syncDataSetEntity = syncDataSetDao.selectSet(); + if (!syncDataSetEntity.isDownstreamEnable()) { + return; + } + //定时上传最近更新基础数据至上游轮询时间 + long timeInterval1 = syncDataSetEntity.getSyncTime() * 6 * 1000L; + long curTime1 = System.currentTimeMillis(); + Long lastTime1 = (Long) redisUtil.get("SPS_SYNC_UPLOAD_DATA"); + if (lastTime1 == null) { + lastTime1 = System.currentTimeMillis(); + redisUtil.set("SPS_SYNC_UPLOAD_DATA", lastTime1); + } + try { + if (curTime1 - lastTime1 > timeInterval1) { + Arrays.stream(BasicExportTypeEnum.values()).forEach(i -> { + heartService.pushData(syncDataSetEntity, null, i); + }); + redisUtil.set("SPS_SYNC_UPLOAD_DATA", curTime1); + } + } catch (Exception e) { + log.error(ExceptionUtils.getStackTrace(e)); + e.printStackTrace(); + + } + } + + public void pushOrder() { + SyncDataSetEntity syncDataSetEntity = syncDataSetDao.selectSet(); + if (!syncDataSetEntity.isDownstreamEnable()) { + return; + } + //定时上传最近更新单据数据至上游轮询时间 + long timeInterval2 = syncDataSetEntity.getOrderSyncTime() * 6 * 1000L; + long curTime2 = System.currentTimeMillis(); + Long lastTime2 = (Long) redisUtil.get("SPS_SYNC_UPLOAD_ORDER"); + if (lastTime2 == null) { + lastTime2 = System.currentTimeMillis(); + redisUtil.set("SPS_SYNC_UPLOAD_ORDER", lastTime2); + } + try { + if (curTime2 - lastTime2 > timeInterval2) { + heartService.uploadAllOrder(null); + heartService.uploadAllBusOrder(null); + redisUtil.set("SPS_SYNC_UPLOAD_ORDER", curTime2); + } + } catch (Exception e) { + log.error(ExceptionUtils.getStackTrace(e)); + e.printStackTrace(); + } + } + + public void pullData() { + try { + heartService.dlAllOrder(); + } catch (Exception e) { + e.printStackTrace(); + log.error(ExceptionUtils.getStackTrace(e)); + } + try { + heartService.dlAllDiProducts(); + } catch (Exception e) { + e.printStackTrace(); + log.error(ExceptionUtils.getStackTrace(e)); + } + Arrays.stream(BasicExportTypeEnum.values()).forEach(i -> { + heartService.pullData(i); + }); + redisUtil.set("SPS_SYNC_DOWNLOAD_DATA", System.currentTimeMillis()); + } + +} + diff --git a/src/main/java/com/glxp/api/task/SyncHeartTask.java b/src/main/java/com/glxp/api/task/SyncHeartTask.java index aaf5e9027..78cb299df 100644 --- a/src/main/java/com/glxp/api/task/SyncHeartTask.java +++ b/src/main/java/com/glxp/api/task/SyncHeartTask.java @@ -10,8 +10,6 @@ import com.glxp.api.service.sync.HeartService; import com.glxp.api.util.RedisUtil; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.exception.ExceptionUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.springframework.scheduling.annotation.EnableScheduling; import org.springframework.scheduling.annotation.SchedulingConfigurer; import org.springframework.scheduling.config.ScheduledTaskRegistrar; @@ -26,10 +24,11 @@ import java.util.Arrays; @EnableScheduling public class SyncHeartTask implements SchedulingConfigurer { - final Logger logger = LoggerFactory.getLogger(SyncHeartTask.class); @Resource protected ScheduledDao scheduledDao; @Resource + SyncHeartService syncHeartService; + @Resource RedisUtil redisUtil; @Resource HeartService heartService; @@ -54,91 +53,25 @@ public class SyncHeartTask implements SchedulingConfigurer { private void process() { - log.info("数据同步心跳--"); - //查询数据同步设置 - SyncDataSetEntity syncDataSetEntity = syncDataSetDao.selectSet(); - if (syncDataSetEntity.isDownstreamEnable()) { - - - //定时上传最近更新基础数据至上游轮询时间 - long timeInterval1 = syncDataSetEntity.getSyncTime() * 6 * 1000L; - long curTime1 = System.currentTimeMillis(); - Long lastTime1 = (Long) redisUtil.get("SPS_SYNC_UPLOAD_DATA"); - if (lastTime1 == null) { - lastTime1 = System.currentTimeMillis(); - redisUtil.set("SPS_SYNC_UPLOAD_DATA", lastTime1); - } - try { - if (curTime1 - lastTime1 > timeInterval1) { -// heartService.uploadAllBus(null); -// heartService.uploadAllUserData(null); - // heartService.uploadScheduleList(); -// heartService.uploadThrData(null); -// heartService.uploadThrProducts(null); -// heartService.uploadSysSetting(null); - Arrays.stream(BasicExportTypeEnum.values()).forEach(i -> { - heartService.pushData(syncDataSetEntity, null, i); - }); - redisUtil.set("SPS_SYNC_UPLOAD_DATA", curTime1); - } - } catch (Exception e) { - log.error(ExceptionUtils.getStackTrace(e)); - e.printStackTrace(); - - } - - - //定时上传最近更新单据数据至上游轮询时间 - long timeInterval2 = syncDataSetEntity.getOrderSyncTime() * 6 * 1000L; - long curTime2 = System.currentTimeMillis(); - Long lastTime2 = (Long) redisUtil.get("SPS_SYNC_UPLOAD_ORDER"); - if (lastTime2 == null) { - lastTime2 = System.currentTimeMillis(); - redisUtil.set("SPS_SYNC_UPLOAD_ORDER", lastTime2); - } - try { - if (curTime2 - lastTime2 > timeInterval2) { - heartService.uploadAllOrder(null); - heartService.uploadAllBusOrder(null); - redisUtil.set("SPS_SYNC_UPLOAD_ORDER", curTime2); - } - } catch (Exception e) { - log.error(ExceptionUtils.getStackTrace(e)); - e.printStackTrace(); - } - - - //定时下载上游最近更新数据轮询时间 - long timeInterval = syncDataSetEntity.getSyncDownloadTime() * 6 * 1000; - long curTime = System.currentTimeMillis(); - Long lastTime = (Long) redisUtil.get("SPS_SYNC_DOWNLOAD_DATA"); - if (lastTime == null) { - lastTime = System.currentTimeMillis(); - redisUtil.set("SPS_SYNC_DOWNLOAD_DATA", lastTime); - } - if (curTime - lastTime > timeInterval) { - try { - heartService.dlAllOrder(); - } catch (Exception e) { - e.printStackTrace(); - log.error(ExceptionUtils.getStackTrace(e)); - } - try { - heartService.dlAllDiProducts(); - } catch (Exception e) { - e.printStackTrace(); - log.error(ExceptionUtils.getStackTrace(e)); - } - Arrays.stream(BasicExportTypeEnum.values()).forEach(i -> { - heartService.pullData(i); - }); -// heartService.pullBasicData(); -// heartService.pullOtherData(); - redisUtil.set("SPS_SYNC_DOWNLOAD_DATA", curTime); - } + syncHeartService.syncProcess(); + SyncDataSetEntity syncDataSetEntity = syncDataSetDao.selectSet(); + if (!syncDataSetEntity.isDownstreamEnable()) { + return; } + //定时下载上游最近更新数据轮询时间 + long timeInterval = syncDataSetEntity.getSyncDownloadTime() * 6 * 1000; + long curTime = System.currentTimeMillis(); + Long lastTime = (Long) redisUtil.get("SPS_SYNC_DOWNLOAD_DATA"); + if (lastTime == null) { + lastTime = System.currentTimeMillis(); + redisUtil.set("SPS_SYNC_DOWNLOAD_DATA", lastTime); + } + if (curTime - lastTime > timeInterval) { + syncHeartService.pullData(); + } + } diff --git a/src/main/resources/application-dev.yml b/src/main/resources/application-dev.yml index 9463b3440..d6d7df54e 100644 --- a/src/main/resources/application-dev.yml +++ b/src/main/resources/application-dev.yml @@ -4,7 +4,7 @@ server: spring: datasource: driver-class-name: com.p6spy.engine.spy.P6SpyDriver - jdbc-url: jdbc:p6spy:mysql://127.0.0.1:3306/udi_wms_zyy?allowMultiQueries=true&serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=false&allowPublicKeyRetrieval=true + jdbc-url: jdbc:p6spy:mysql://127.0.0.1:3306/udi_wms_pt?allowMultiQueries=true&serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=false&allowPublicKeyRetrieval=true username: root password: 123456 hikari: @@ -55,3 +55,5 @@ back_file_path: D:/share/udisps/back/ API_KEY: 1101 API_SECRET: zBITspLNvuoEd4FaamlSoqxRHmNsmQ6L WEB_TITLE: 平潭协和医院 + +SPMS_WEBSOCKET_TOKEN: 07rKFDFkQvBkbxgc7aUBlONo4gWNdx8b diff --git a/src/main/resources/application-pro.yml b/src/main/resources/application-pro.yml index 7d2de19c9..298791fc5 100644 --- a/src/main/resources/application-pro.yml +++ b/src/main/resources/application-pro.yml @@ -52,5 +52,5 @@ back_file_path: D:/share/udisps/back/ API_KEY: 1101 API_SECRET: zBITspLNvuoEd4FaamlSoqxRHmNsmQ9L WEB_TITLE: 平潭协和医院 - +SPMS_WEBSOCKET_TOKEN: 07rKFDFkQvBkbxgc7aUBlONo4gWNdx8b diff --git a/src/main/resources/application-test.yml b/src/main/resources/application-test.yml index a4c2e52f6..02f954967 100644 --- a/src/main/resources/application-test.yml +++ b/src/main/resources/application-test.yml @@ -51,4 +51,4 @@ SPMS_KEY: lCOdWCBKS6Kw45wdnnqUTELXyuSKnXEs API_KEY: 1101 API_SECRET: zBITspLNvuoEd4FaamlSoqxRHmNsmQ9L WEB_TITLE: 平潭协和医院 - +SPMS_WEBSOCKET_TOKEN: 07rKFDFkQvBkbxgc7aUBlONo4gWNdx8b diff --git a/src/main/resources/application-wmd.yml b/src/main/resources/application-wmd.yml index f1a701c48..ae600f03b 100644 --- a/src/main/resources/application-wmd.yml +++ b/src/main/resources/application-wmd.yml @@ -52,5 +52,5 @@ back_file_path: D:/share/udisps/back/ API_KEY: 1101 API_SECRET: zBITspLNvuoEd4FaamlSoqxRHmNsmQ9L WEB_TITLE: 平潭协和医院 - +SPMS_WEBSOCKET_TOKEN: 07rKFDFkQvBkbxgc7aUBlONo4gWNdx8b