From ff58a80153cc59cb27a7426f903602758ccd409a Mon Sep 17 00:00:00 2001 From: anthonywj Date: Thu, 3 Aug 2023 14:26:16 +0800 Subject: [PATCH] =?UTF-8?q?=E6=96=B0=E5=A2=9Ewebsocket=E9=80=9A=E4=BF=A1?= =?UTF-8?q?=EF=BC=88=E4=B8=AD=E7=BB=A7=E6=9C=8D=E5=8A=A1=E5=90=8C=E6=AD=A5?= =?UTF-8?q?=E7=9B=B8=E5=85=B3=EF=BC=89?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- api-admin/pom.xml | 30 +++- .../sale/admin/config/WebSocketConfig.java | 12 ++ .../sale/admin/config/WebSocketServer.java | 125 ------------- .../sale/admin/constant/ConstantStatus.java | 5 + .../sale/admin/constant/SocketMsgType.java | 7 + .../sale/admin/constant/TypeConstant.java | 53 ------ .../admin/entity/sync/SocketMsgEntity.java | 19 ++ .../admin/socket/client/SocketMsgService.java | 20 +++ .../socket/client/SpsWebSocketClient.java | 113 ++++++++++++ .../socket/client/WebSocketComponent.java | 83 +++++++++ .../admin/socket/server/SpsSyncWebSocket.java | 164 ++++++++++++++++++ .../sale/admin/thread/DlAllDataService.java | 14 +- .../com/glxp/sale/admin/thread/HeartTask.java | 94 +--------- .../sale/admin/thread/HeartTaskService.java | 118 +++++++++++++ .../sale/admin/util/ByteArraySplitter.java | 48 +++++ .../main/resources/application-dev.properties | 5 +- .../main/resources/application-pro.properties | 1 + 17 files changed, 627 insertions(+), 284 deletions(-) create mode 100644 api-admin/src/main/java/com/glxp/sale/admin/config/WebSocketConfig.java delete mode 100644 api-admin/src/main/java/com/glxp/sale/admin/config/WebSocketServer.java create mode 100644 api-admin/src/main/java/com/glxp/sale/admin/constant/SocketMsgType.java delete mode 100644 api-admin/src/main/java/com/glxp/sale/admin/constant/TypeConstant.java create mode 100644 api-admin/src/main/java/com/glxp/sale/admin/entity/sync/SocketMsgEntity.java create mode 100644 api-admin/src/main/java/com/glxp/sale/admin/socket/client/SocketMsgService.java create mode 100644 api-admin/src/main/java/com/glxp/sale/admin/socket/client/SpsWebSocketClient.java create mode 100644 api-admin/src/main/java/com/glxp/sale/admin/socket/client/WebSocketComponent.java create mode 100644 api-admin/src/main/java/com/glxp/sale/admin/socket/server/SpsSyncWebSocket.java create mode 100644 api-admin/src/main/java/com/glxp/sale/admin/thread/HeartTaskService.java create mode 100644 api-admin/src/main/java/com/glxp/sale/admin/util/ByteArraySplitter.java diff --git a/api-admin/pom.xml b/api-admin/pom.xml index 5996192..178d77b 100644 --- a/api-admin/pom.xml +++ b/api-admin/pom.xml @@ -96,11 +96,11 @@ - - - - - + + + + + com.itfsw mybatis-generator-plugin @@ -295,10 +295,10 @@ - org.apache.commons - commons-text - 1.1 - + org.apache.commons + commons-text + 1.1 + @@ -307,6 +307,18 @@ 3.5.3 + + org.springframework.boot + spring-boot-starter-websocket + + + + + org.java-websocket + Java-WebSocket + 1.5.4 + + diff --git a/api-admin/src/main/java/com/glxp/sale/admin/config/WebSocketConfig.java b/api-admin/src/main/java/com/glxp/sale/admin/config/WebSocketConfig.java new file mode 100644 index 0000000..16555e3 --- /dev/null +++ b/api-admin/src/main/java/com/glxp/sale/admin/config/WebSocketConfig.java @@ -0,0 +1,12 @@ +package com.glxp.sale.admin.config; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.web.socket.server.standard.ServerEndpointExporter; + +@Configuration +public class WebSocketConfig { + @Bean + public ServerEndpointExporter serverEndpointExporter() { + return new ServerEndpointExporter(); + } +} diff --git a/api-admin/src/main/java/com/glxp/sale/admin/config/WebSocketServer.java b/api-admin/src/main/java/com/glxp/sale/admin/config/WebSocketServer.java deleted file mode 100644 index 68093ab..0000000 --- a/api-admin/src/main/java/com/glxp/sale/admin/config/WebSocketServer.java +++ /dev/null @@ -1,125 +0,0 @@ -package com.glxp.sale.admin.config; - - -import com.alibaba.fastjson.JSON; -import com.glxp.sale.admin.entity.info.WebSocketEntity; -import lombok.extern.slf4j.Slf4j; -import org.springframework.stereotype.Component; -import org.springframework.stereotype.Service; - -import javax.websocket.*; -import javax.websocket.server.PathParam; -import javax.websocket.server.ServerEndpoint; -import java.io.IOException; -import java.util.concurrent.CopyOnWriteArraySet; - -@Component -@Slf4j -@Service -@ServerEndpoint("/api/websocket/{sid}") -public class WebSocketServer { - private static int onlineCount = 0; - private static CopyOnWriteArraySet webSocketSet = new CopyOnWriteArraySet(); - private Session session; - private String sid = ""; - - /** - * 连接建立成功调用的方法 - */ - @OnOpen - public void onOpen(Session session, @PathParam("sid") String sid) { - this.session = session; - webSocketSet.add(this); //加入set中 - this.sid = sid; - addOnlineCount(); //在线数加1 - try { - sendMessage(new WebSocketEntity("sys", "连接成功")); - log.info("有新窗口开始监听:" + sid + ",当前在线人数为:" + getOnlineCount()); - } catch (IOException e) { - log.error("websocket IO Exception"); - } - } - - /** - * 连接关闭调用的方法 - */ - @OnClose - public void onClose() { - webSocketSet.remove(this); //从set中删除 - subOnlineCount(); //在线数减1 - //断开连接情况下,更新主板占用情况为释放 - log.info("释放的sid为:" + sid); - //这里写你 释放的时候,要处理的业务 - log.info("有一连接关闭!当前在线人数为" + getOnlineCount()); - - } - - /** - * 收到客户端消息后调用的方法 - * - * @ Param message 客户端发送过来的消息 - */ - @OnMessage - public void onMessage(String message, Session session) { - log.info("收到来自窗口" + sid + "的信息:" + message); - //群发消息 - for (WebSocketServer item : webSocketSet) { - try { - item.sendMessage(new WebSocketEntity("back", message)); - } catch (IOException e) { - e.printStackTrace(); - } - } - } - - /** - * @ Param session - * @ Param error - */ - @OnError - public void onError(Session session, Throwable error) { - log.error("发生错误"); - error.printStackTrace(); - } - - /** - * 实现服务器主动推送 - */ - public void sendMessage(WebSocketEntity webSocketEntity) throws IOException { - String message = JSON.toJSON(webSocketEntity).toString(); - this.session.getBasicRemote().sendText(message); - } - - public static void sendInfo(String message, String type) { - log.info("推送消息到窗口" + type + ",推送内容:" + message); - - for (WebSocketServer item : webSocketSet) { - try { - if (type == null) { - item.sendMessage(new WebSocketEntity("sid", message)); - } else { - item.sendMessage(new WebSocketEntity(type, message)); - } - } catch (IOException e) { - continue; - } - } - } - - - public static synchronized int getOnlineCount() { - return onlineCount; - } - - public static synchronized void addOnlineCount() { - WebSocketServer.onlineCount++; - } - - public static synchronized void subOnlineCount() { - WebSocketServer.onlineCount--; - } - - public static CopyOnWriteArraySet getWebSocketSet() { - return webSocketSet; - } -} \ No newline at end of file diff --git a/api-admin/src/main/java/com/glxp/sale/admin/constant/ConstantStatus.java b/api-admin/src/main/java/com/glxp/sale/admin/constant/ConstantStatus.java index 649de46..5b17867 100644 --- a/api-admin/src/main/java/com/glxp/sale/admin/constant/ConstantStatus.java +++ b/api-admin/src/main/java/com/glxp/sale/admin/constant/ConstantStatus.java @@ -8,4 +8,9 @@ public class ConstantStatus { public static final String SYNC_BUS_ORDER = "AutoUploadBusOrder"; public static final String SYNC_DI_PRODUCTS = "AutoDownloadDiProducts"; + + + + + } diff --git a/api-admin/src/main/java/com/glxp/sale/admin/constant/SocketMsgType.java b/api-admin/src/main/java/com/glxp/sale/admin/constant/SocketMsgType.java new file mode 100644 index 0000000..cc75e5f --- /dev/null +++ b/api-admin/src/main/java/com/glxp/sale/admin/constant/SocketMsgType.java @@ -0,0 +1,7 @@ +package com.glxp.sale.admin.constant; + +public interface SocketMsgType { + + + String DL_ALL_DATA = "DL_ALL_DATA"; +} diff --git a/api-admin/src/main/java/com/glxp/sale/admin/constant/TypeConstant.java b/api-admin/src/main/java/com/glxp/sale/admin/constant/TypeConstant.java deleted file mode 100644 index add480c..0000000 --- a/api-admin/src/main/java/com/glxp/sale/admin/constant/TypeConstant.java +++ /dev/null @@ -1,53 +0,0 @@ -package com.glxp.sale.admin.constant; - -public interface TypeConstant { - - String TYPE_PUT_PRODUCT = "ProduceWareHouseIn"; //生产入库 - String TYPE_PUT_PURCHASE = "PurchaseWareHouseIn"; //采购入库 - String TYPE_PUT_RETURN = "ReturnWareHouseIn"; //退货入库 - String TYPE_PUT_ALLOT = "AllocateWareHouseIn"; //调拨入库 - - String TYPE_OUT_SALE = "SalesWareHouseOut"; //销售出库 - String TYPE_OUT_RETURN = "ReturnWareHouseOut"; //退货出库 - String TYPE_OUT_ALLOT = "AllocateWareHouseOut"; //调拨出库 - String TYPE_OUT_DESTORY = "DestoryWareHouseOut"; //销毁出库 - String TYPE_OUT_STRAIGHT = "DirectAllocateWareHouseOut"; //直调出库 - String TYPE_OUT_REWORK = "ReworkWareHouseOut"; //返工出库 - String TYPE_OUT_CHECK = "CheckWareHouseOut"; //盘点 抽检出库 - String TYPE_CODE_REPLACE = "CodeReplace"; //码替换 - String TYPE_CODE_DESTORY = "CodeDestory"; //码注销 - String TYPE_STOCK_CHECK = "StockCheck"; //盘点 - - String TYPE_PUT = "WareHouseIn"; //出库 - String TYPE_OUT = "WareHouseOut"; //入库 - - - String SALE_OUT = "321"; //零售出库 (321) - String RETURNN_IN = "103"; //退货入库 - String PURCHASE_IN = "102"; //采购入库 - String RETURN_OUT = "202"; //退货出库 - String SUPPLY_IN = "107"; //供应入库 - String SUPPLY_OUT = "209"; //供应出库 - String DESTORY_OUT = "205"; //销毁出库 - String CHECK_OUT = "206"; //抽检出库 - String VACCINE = "322"; //疫苗接种 (322) - - - //状态 - //1.未校验,已校验,校验异常:process,success,error - String CHECKED = "checked";//已校验 - - String UP_NOT_UPLOAD = "up_not_upload";// 上游未上传 - String NOT_STOCK = "not_stock"; //无码库存 - - String UN_UPLOAD = "un_upload"; //未上传 - String UPLOADED = "uploaded"; //已上传 - String UPLOAD_FAIL = "upload_fail"; //上传失败 - String DEAL_FAIL = "deal_fail"; //上传处理异常 - - String DEAL_SUCCESS = "deal_success"; //上传处理成功 - String DL_FAIL = "dl_fail";//下载异常 - String DOWNLOADED = "downloaded";//已完成 - String FINISHED = "finished";//已完成 - String REMOVE = "remove"; -} diff --git a/api-admin/src/main/java/com/glxp/sale/admin/entity/sync/SocketMsgEntity.java b/api-admin/src/main/java/com/glxp/sale/admin/entity/sync/SocketMsgEntity.java new file mode 100644 index 0000000..5b6e2f2 --- /dev/null +++ b/api-admin/src/main/java/com/glxp/sale/admin/entity/sync/SocketMsgEntity.java @@ -0,0 +1,19 @@ +package com.glxp.sale.admin.entity.sync; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * websocket 消息体 + */ +@Data +@Builder +@AllArgsConstructor +@NoArgsConstructor +public class SocketMsgEntity { + private String type; + private String content; + private String remark; +} diff --git a/api-admin/src/main/java/com/glxp/sale/admin/socket/client/SocketMsgService.java b/api-admin/src/main/java/com/glxp/sale/admin/socket/client/SocketMsgService.java new file mode 100644 index 0000000..8c8c1e0 --- /dev/null +++ b/api-admin/src/main/java/com/glxp/sale/admin/socket/client/SocketMsgService.java @@ -0,0 +1,20 @@ +package com.glxp.sale.admin.socket.client; + +import com.glxp.sale.admin.socket.server.SpsSyncWebSocket; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Service; + +import javax.annotation.Resource; + +@Service +public class SocketMsgService { + + @Value("${SPMS_WEBSOCKET_TOKEN}") + String token; + @Resource + SpsSyncWebSocket spsSyncWebSocket; + + public void sendNoticeMsg(String message) { + spsSyncWebSocket.sendMessage(message, "2:" + token); + } +} diff --git a/api-admin/src/main/java/com/glxp/sale/admin/socket/client/SpsWebSocketClient.java b/api-admin/src/main/java/com/glxp/sale/admin/socket/client/SpsWebSocketClient.java new file mode 100644 index 0000000..ffd78b7 --- /dev/null +++ b/api-admin/src/main/java/com/glxp/sale/admin/socket/client/SpsWebSocketClient.java @@ -0,0 +1,113 @@ +package com.glxp.sale.admin.socket.client; + +import com.alibaba.fastjson.JSONObject; +import com.glxp.sale.admin.constant.SocketMsgType; +import org.java_websocket.WebSocket; +import org.java_websocket.client.WebSocketClient; +import org.java_websocket.framing.Framedata; +import org.java_websocket.handshake.ServerHandshake; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Resource; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.List; + + +public class SpsWebSocketClient extends WebSocketClient { + + private String excptMessage; + + Logger log = LoggerFactory.getLogger(SpsWebSocketClient.class); + + private static List list = new ArrayList<>(); + @Resource + SocketMsgService socketMsgService; + + public void setSocketMsgService(SocketMsgService socketMsgService) { + this.socketMsgService = socketMsgService; + } + + public SpsWebSocketClient(String serverUri) throws URISyntaxException { + super(new URI(serverUri)); + this.setConnectionLostTimeout(0); + if (list.isEmpty()) { + return; + } + for (SpsWebSocketClient client : list) { + client.close(); + } + list.clear(); + list.add(this); + } + + @Override + public void onOpen(ServerHandshake serverHandshake) { + log.info("在线日志socket连接成功"); + } + + @Override + public void onMessage(String s) { + log.info("收到消息:" + s); + //收到更新下载数据则赋值由外部处理,多个任务只处理一次避免重复处理 + if (s.contains(SocketMsgType.DL_ALL_DATA)) { + this.excptMessage = s; + } else { + //通知类消息则需转发 + socketMsgService.sendNoticeMsg(s); + } + + } + + @Override + public void onClose(int i, String s, boolean b) { + log.info("在线日志socket断开"); + } + + @Override + public void onError(Exception e) { + e.printStackTrace(); + } + + @Override + public void onWebsocketPing(WebSocket conn, Framedata f) { + try { + Thread.sleep(1000 * 5); + } catch (InterruptedException e) { + e.printStackTrace(); + } + send("---pong---"); + } + + @Deprecated + public static void destroy() { + if (list.isEmpty()) + return; + for (SpsWebSocketClient client : list) { + client.close(); + } + list.clear(); + + } + + //发送消息 + public void sendMessage(String message) { + this.send(message); + System.out.println("已发送消息:" + message); + } + + //获取接收到的信息 + public String getExcptMessage() { + if (excptMessage != null) { + String message = new String(excptMessage); + excptMessage = null; + return message; + } + return null; + } + + +} + diff --git a/api-admin/src/main/java/com/glxp/sale/admin/socket/client/WebSocketComponent.java b/api-admin/src/main/java/com/glxp/sale/admin/socket/client/WebSocketComponent.java new file mode 100644 index 0000000..00a5891 --- /dev/null +++ b/api-admin/src/main/java/com/glxp/sale/admin/socket/client/WebSocketComponent.java @@ -0,0 +1,83 @@ +package com.glxp.sale.admin.socket.client; + +import com.glxp.sale.admin.constant.SocketMsgType; +import com.glxp.sale.admin.service.param.SystemParamConfigService; +import com.glxp.sale.admin.thread.HeartTask; +import com.glxp.sale.admin.thread.HeartTaskService; +import lombok.extern.slf4j.Slf4j; +import org.java_websocket.enums.ReadyState; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.CommandLineRunner; +import org.springframework.stereotype.Component; + +import javax.annotation.Resource; +import java.net.URISyntaxException; + +@Slf4j +@Component +public class WebSocketComponent implements CommandLineRunner { + + @Value("${SPMS_WEBSOCKET_TOKEN}") + private String socketToken; + + @Resource + SystemParamConfigService systemParamConfigService; + @Resource + HeartTaskService heartTaskService; + @Resource + SocketMsgService socketMsgService; + + @Override + public void run(String... args) throws Exception { + String ip = systemParamConfigService.selectValueByParamKey("upper_server_ip"); + ip = ip.replace("http://", ""); + try { + SpsWebSocketClient client = new SpsWebSocketClient("ws://" + ip + "/spms/sync/1/" + socketToken); + client.setSocketMsgService(socketMsgService); + initConnect(client); + //等待WebSocket服务端响应 + String message = null; + while (true) { + while ((message = client.getExcptMessage()) == null) { + log.info("已连接,等待接收数据--------"); + Thread.sleep(1000); + if (client.isClosed()) { + initConnect(client); + } + } + if (message.contains(SocketMsgType.DL_ALL_DATA)) { + heartTaskService.dlAllData(); + } + //打印服务端返回的数据 + log.info("成功获取数据:" + message); + } + + } catch (URISyntaxException e) { + e.printStackTrace(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + public void initConnect(SpsWebSocketClient client) { + if (client == null && client.isOpen()) { + log.info("WebSocket已连接,不需要重连"); + return; + } + log.info("重新建立连接"); + client.connect(); + while (!client.getReadyState().equals(ReadyState.OPEN)) { + log.info("连接中···请稍后"); + try { + Thread.sleep(3000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + if (client.getReadyState().equals(ReadyState.OPEN)) { + return; + } else if (client.getReadyState().equals(ReadyState.CLOSED)) { + client.reconnect(); + } + } + } +} diff --git a/api-admin/src/main/java/com/glxp/sale/admin/socket/server/SpsSyncWebSocket.java b/api-admin/src/main/java/com/glxp/sale/admin/socket/server/SpsSyncWebSocket.java new file mode 100644 index 0000000..9eaca43 --- /dev/null +++ b/api-admin/src/main/java/com/glxp/sale/admin/socket/server/SpsSyncWebSocket.java @@ -0,0 +1,164 @@ +package com.glxp.sale.admin.socket.server; + +import cn.hutool.json.JSONUtil; +import com.glxp.sale.admin.entity.sync.SocketMsgEntity; +import com.glxp.sale.admin.util.ByteArraySplitter; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; + +import javax.websocket.*; +import javax.websocket.server.PathParam; +import javax.websocket.server.ServerEndpoint; +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + + +@Component +@ServerEndpoint(value = "/sps/web/sync/{type}/{token}") +@Slf4j +public class SpsSyncWebSocket { + + //与某个客户端的连接会话,需要通过它来给客户端发送数据 + private Session session; + @Value("${SPMS_WEBSOCKET_TOKEN}") + private String socketToken; + + private static int onlineCount = 0; + + private static ConcurrentHashMap socketServersMap = new ConcurrentHashMap<>(); + + private String key = ""; + + + /** + * 建立连接 + * + * @param session + * @param type type为register/verify + */ + @OnOpen + public void onOpen(Session session, @PathParam("type") String type, @PathParam("token") String token) { + this.session = session; + key = type + ":" + token + ":" + session.getId(); + log.info("onOpen->>key:{}", key); + boolean flag = socketServersMap.containsKey(key); + if (flag) { + socketServersMap.remove(key); + socketServersMap.put(key, this); + } else { + socketServersMap.put(key, this); + addOnlineCount(); + } +// if (StrUtil.isEmpty(token) || !token.equals(socketToken)) { +// throw new ServiceException("token验证异常"); +// } + log.info("online number:{}", getOnlineCount()); + } + + @OnMessage + public void onMessage(String message, Session session) { + log.info("来自客户端的消息->>message:{};sessionId:{}", message, session.getId()); + } + + @OnClose + public void onClose() { + log.info("onClose : {}" + key); + if (socketServersMap.containsKey(key)) { + socketServersMap.remove(key); + subOnlineCount(); //在线数减1 + } + log.info("有一连接关闭!当前在线连接数为: {}", getOnlineCount()); + } + + @OnError + public void onError(Throwable error) { + socketServersMap.remove(key); + subOnlineCount(); + log.error("webSocket连接发生错误->>errorMessage:{}", error.getMessage()); + } + + + /** + * @param message 数据推送 + * @param sessionId token + */ + public synchronized void sendMessage(String message, String sessionId) { + synchronized (this.getClass()) { + for (Map.Entry stringMyWebSocketEntry : socketServersMap.entrySet()) { + try { + String key = stringMyWebSocketEntry.getKey(); + SpsSyncWebSocket value = stringMyWebSocketEntry.getValue(); + if (key.contains(sessionId)) { + log.info("推送的消息为:" + key); + value.session.getBasicRemote().sendText(message); +// List result = ByteArraySplitter.split(message, 64 * 1024); +// for (byte[] bytes : result) { +// String s = new String(bytes); +// value.session.getBasicRemote().sendText(s); +// } + // 推送结束符 + value.session.getBasicRemote().sendText("#end#"); + } + } catch (IOException e) { + e.printStackTrace(); + } + } + } + } + + /** + * @param socketMsgEntity 数据推送,只做通知消息推送,数据量不宜过大 + * @param sessionId token + */ + public synchronized void sendMessage(SocketMsgEntity socketMsgEntity, String sessionId) { + synchronized (this.getClass()) { + for (Map.Entry stringMyWebSocketEntry : socketServersMap.entrySet()) { + try { + String key = stringMyWebSocketEntry.getKey(); + SpsSyncWebSocket value = stringMyWebSocketEntry.getValue(); + if (key.contains(sessionId)) { + log.info("推送的消息为:" + key); + value.session.getBasicRemote().sendText(JSONUtil.toJsonStr(socketMsgEntity)); +// List result = ByteArraySplitter.split(message, 64 * 1024); +// for (byte[] bytes : result) { +// String s = new String(bytes); +// value.session.getBasicRemote().sendText(s); +// } + // 推送结束符 + value.session.getBasicRemote().sendText("#end#"); + } + } catch (IOException e) { + e.printStackTrace(); + } + } + } + } + + + /** + * 获取在线数 + * + * @return + */ + public static synchronized int getOnlineCount() { + return onlineCount; + } + + /** + * 在线数+1 + */ + public static synchronized void addOnlineCount() { + SpsSyncWebSocket.onlineCount++; + } + + /** + * 在线数-1 + */ + public static synchronized void subOnlineCount() { + SpsSyncWebSocket.onlineCount--; + } +} + diff --git a/api-admin/src/main/java/com/glxp/sale/admin/thread/DlAllDataService.java b/api-admin/src/main/java/com/glxp/sale/admin/thread/DlAllDataService.java index b4c7ca8..c680535 100644 --- a/api-admin/src/main/java/com/glxp/sale/admin/thread/DlAllDataService.java +++ b/api-admin/src/main/java/com/glxp/sale/admin/thread/DlAllDataService.java @@ -1,7 +1,6 @@ package com.glxp.sale.admin.thread; import cn.hutool.core.collection.CollUtil; -import cn.hutool.core.util.IdUtil; import cn.hutool.core.util.StrUtil; import cn.hutool.json.JSONUtil; import com.alibaba.fastjson.JSON; @@ -13,30 +12,30 @@ import com.glxp.sale.admin.dto.RelaySyncDto; import com.glxp.sale.admin.entity.param.SystemParamConfigEntity; import com.glxp.sale.admin.entity.sync.BasicExportStatusEntity; import com.glxp.sale.admin.entity.sync.ProductInfoEntity; +import com.glxp.sale.admin.entity.sync.SocketMsgEntity; import com.glxp.sale.admin.entity.sync.UdiCompanyEntity; import com.glxp.sale.admin.http.SpGetHttp; import com.glxp.sale.admin.idc.service.IdcService; import com.glxp.sale.admin.req.sync.BasicExportStatusRequest; import com.glxp.sale.admin.req.sync.SpsSyncDataRequest; -import com.glxp.sale.admin.res.sync.*; +import com.glxp.sale.admin.res.sync.SpSyncUdiResponse; +import com.glxp.sale.admin.res.sync.SpsSyncDataResponse; +import com.glxp.sale.admin.res.sync.SpsSyncScheduleResponse; import com.glxp.sale.admin.service.param.SystemParamConfigService; import com.glxp.sale.admin.service.sync.BasicExportService; +import com.glxp.sale.admin.socket.server.SpsSyncWebSocket; import com.glxp.sale.admin.thread.didl.AsyncDiDlHelper; import com.glxp.sale.admin.util.*; -import com.glxp.sale.common.enums.ResultEnum; import com.glxp.sale.common.res.BaseResponse; -import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import javax.annotation.Resource; -import java.awt.*; import java.io.File; import java.io.FileWriter; import java.io.IOException; -import java.util.ArrayList; import java.util.Date; import java.util.List; @@ -260,7 +259,7 @@ public class DlAllDataService { */ @Transactional(rollbackFor = Exception.class) public void pullData(BasicExportTypeEnum exportType) { - switch (exportType){ + switch (exportType) { case IO_ORDER: case COUNTRY_DI_DATA: case SYS_SET_DATA: @@ -501,6 +500,7 @@ public class DlAllDataService { spGetHttp.postUpdateBasicStatus(basicExportStatusEntity1); spGetHttp.updateLastTime("AutoDownloadDiProducts", DateUtil.formatDate(System.currentTimeMillis(), "yyyy-MM-dd HH:mm:ss")); + } else { SpsSyncDataRequest spsSyncDataRequest = new SpsSyncDataRequest(); spsSyncDataRequest.setId(basicExportStatusEntity.getId()); diff --git a/api-admin/src/main/java/com/glxp/sale/admin/thread/HeartTask.java b/api-admin/src/main/java/com/glxp/sale/admin/thread/HeartTask.java index d29fa67..8965aa3 100644 --- a/api-admin/src/main/java/com/glxp/sale/admin/thread/HeartTask.java +++ b/api-admin/src/main/java/com/glxp/sale/admin/thread/HeartTask.java @@ -31,17 +31,13 @@ public class HeartTask implements SchedulingConfigurer { @Resource protected ScheduledDao scheduledDao; @Resource + HeartTaskService heartTaskService; + @Resource SystemParamConfigService systemParamConfigService; @Resource RedisUtil redisUtil; @Resource - ScanUploadService scanUploadService; - @Resource - ScanDownloadService scanDownloadService; - @Resource DlAllDataService dlAllDataService; - @Resource - AsyncDiDlService asyncDiDlService; @Override public void configureTasks(ScheduledTaskRegistrar scheduledTaskRegistrar) { @@ -62,71 +58,6 @@ public class HeartTask implements SchedulingConfigurer { private void process() { logger.info("------------心跳任务-----------------"); - dlData(); - } - - //定时从上游下载数据 - private void dlData() { - SystemParamConfigEntity upConnect = systemParamConfigService.selectByParamKey("sync_upstream_enable"); - if (upConnect != null && upConnect.getParamValue().equals("1")) { - dlAllData(); - scanUpload(); - } - SystemParamConfigEntity donwConnect = systemParamConfigService.selectByParamKey("sync_downstream_enable"); - if (donwConnect != null && donwConnect.getParamValue().equals("1")) { - scanDonwload(); - } - - } - - - private void scanUpload() { - SystemParamConfigEntity systemParamConfigEntity = systemParamConfigService.selectByParamKey("sc_udiinfo_upload"); - long timeInterval = Long.parseLong(systemParamConfigEntity.getParamValue()) * 60 * 1000; - long curTime = System.currentTimeMillis(); - //定时扫描 - Long lastTime = (Long) redisUtil.get("UPLOAD_UDIINFO_STATUS"); - if (lastTime == null) { - lastTime = System.currentTimeMillis(); - redisUtil.set("UPLOAD_UDIINFO_STATUS", lastTime, 30 * 60); - } - if (curTime - lastTime > timeInterval) { - redisUtil.set("UPLOAD_UDIINFO_STATUS", curTime); - scanUploadService.scanAllDatas(); - scanUploadService.scanAllBus(); - scanUploadService.scanAllOrders(); - scanUploadService.scanAllSchedule(); - } - } - - - private void scanDonwload() { - SystemParamConfigEntity upConnect = systemParamConfigService.selectByParamKey("sync_downstream_enable"); - - if (upConnect.getParamValue().equals("1")) { - SystemParamConfigEntity systemParamConfigEntity = systemParamConfigService.selectByParamKey("sc_udiinfo_status"); - long timeInterval = Long.parseLong(systemParamConfigEntity.getParamValue()) * 60 * 1000; - long curTime = System.currentTimeMillis(); - Long lastTime = (Long) redisUtil.get("SC_UDIINFO_DOWNLOAD_STATUS"); - if (lastTime == null) { - lastTime = System.currentTimeMillis(); - redisUtil.set("SC_UDIINFO_DOWNLOAD_STATUS", lastTime); - } - if (curTime - lastTime > timeInterval) { - redisUtil.set("SC_UDIINFO_DOWNLOAD_STATUS", curTime); - scanDownloadService.scanAllData(); - scanDownloadService.scanAllBus(); - scanDownloadService.scanAllOrder(); - scanDownloadService.scanScheduleList(); - scanDownloadService.scanUdis(); - } - } - - } - - private void dlAllData() { - - SystemParamConfigEntity systemParamConfigEntity = systemParamConfigService.selectByParamKey("sc_udiinfo_status"); long timeInterval = Long.parseLong(systemParamConfigEntity.getParamValue()) * 6 * 1000; long curTime = System.currentTimeMillis(); @@ -140,25 +71,12 @@ public class HeartTask implements SchedulingConfigurer { redisUtil.set("SC_UDIINFO_DOWNLOAD_STATUS", lastTime); } if (curTime - lastTime > timeInterval) { - logger.info("定时从上游下载全部据-----"); - redisUtil.set("SC_UDIINFO_DOWNLOAD_STATUS", curTime); - String doing = (String) redisUtil.get("is_doing_download"); - if (doing == null || doing.equals("false")) { - redisUtil.set("is_doing_download", "true", 60); -// dlAllDataService.dllNewAllData(); - dlAllDataService.dllNewAllOrder(); - Arrays.stream(BasicExportTypeEnum.values()).forEach(i -> { - dlAllDataService.pullData(i); - }); - -// dlAllDataService.dllNewAllBusType(); -// dlAllDataService.dlScheduleStatus(); - dlAllDataService.dlAllDi(); - redisUtil.set("is_doing_download", "false"); - } - logger.info("定时从上游下载全部据-----结束"); + heartTaskService.dlAllData(); + } else { + logger.info("时间未到"); } } + } diff --git a/api-admin/src/main/java/com/glxp/sale/admin/thread/HeartTaskService.java b/api-admin/src/main/java/com/glxp/sale/admin/thread/HeartTaskService.java new file mode 100644 index 0000000..68ae99f --- /dev/null +++ b/api-admin/src/main/java/com/glxp/sale/admin/thread/HeartTaskService.java @@ -0,0 +1,118 @@ +package com.glxp.sale.admin.thread; + +import com.glxp.sale.admin.constant.BasicExportTypeEnum; +import com.glxp.sale.admin.constant.SocketMsgType; +import com.glxp.sale.admin.entity.param.SystemParamConfigEntity; +import com.glxp.sale.admin.entity.sync.SocketMsgEntity; +import com.glxp.sale.admin.service.param.SystemParamConfigService; +import com.glxp.sale.admin.socket.server.SpsSyncWebSocket; +import com.glxp.sale.admin.util.RedisUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Service; + +import javax.annotation.Resource; +import java.util.Arrays; + +@Service +public class HeartTaskService { + + final Logger logger = LoggerFactory.getLogger(HeartTask.class); + @Resource + SystemParamConfigService systemParamConfigService; + @Resource + RedisUtil redisUtil; + @Resource + ScanUploadService scanUploadService; + @Resource + ScanDownloadService scanDownloadService; + @Resource + DlAllDataService dlAllDataService; + @Resource + SpsSyncWebSocket spsSyncWebSocket; + @Value("${SPMS_WEBSOCKET_TOKEN}") + private String socketToken; + + //定时从上游下载数据 + public void dlData() { + SystemParamConfigEntity upConnect = systemParamConfigService.selectByParamKey("sync_upstream_enable"); + if (upConnect != null && upConnect.getParamValue().equals("1")) { + dlAllData(); + scanUpload(); + } + SystemParamConfigEntity donwConnect = systemParamConfigService.selectByParamKey("sync_downstream_enable"); + if (donwConnect != null && donwConnect.getParamValue().equals("1")) { + scanDonwload(); + } + } + + private void scanUpload() { + SystemParamConfigEntity systemParamConfigEntity = systemParamConfigService.selectByParamKey("sc_udiinfo_upload"); + long timeInterval = Long.parseLong(systemParamConfigEntity.getParamValue()) * 60 * 1000; + long curTime = System.currentTimeMillis(); + //定时扫描 + Long lastTime = (Long) redisUtil.get("UPLOAD_UDIINFO_STATUS"); + if (lastTime == null) { + lastTime = System.currentTimeMillis(); + redisUtil.set("UPLOAD_UDIINFO_STATUS", lastTime, 30 * 60); + } + if (curTime - lastTime > timeInterval) { + redisUtil.set("UPLOAD_UDIINFO_STATUS", curTime); + scanUploadService.scanAllDatas(); + scanUploadService.scanAllBus(); + scanUploadService.scanAllOrders(); + scanUploadService.scanAllSchedule(); + } + } + + private void scanDonwload() { + SystemParamConfigEntity upConnect = systemParamConfigService.selectByParamKey("sync_downstream_enable"); + if (upConnect.getParamValue().equals("1")) { + SystemParamConfigEntity systemParamConfigEntity = systemParamConfigService.selectByParamKey("sc_udiinfo_status"); + long timeInterval = Long.parseLong(systemParamConfigEntity.getParamValue()) * 60 * 1000; + long curTime = System.currentTimeMillis(); + Long lastTime = (Long) redisUtil.get("SC_UDIINFO_DOWNLOAD_STATUS"); + if (lastTime == null) { + lastTime = System.currentTimeMillis(); + redisUtil.set("SC_UDIINFO_DOWNLOAD_STATUS", lastTime); + } + if (curTime - lastTime > timeInterval) { + redisUtil.set("SC_UDIINFO_DOWNLOAD_STATUS", curTime); + scanDownloadService.scanAllData(); + scanDownloadService.scanAllBus(); + scanDownloadService.scanAllOrder(); + scanDownloadService.scanScheduleList(); + scanDownloadService.scanUdis(); + } + } + + } + + public void dlUpAllData() { + SystemParamConfigEntity upConnect = systemParamConfigService.selectByParamKey("sync_upstream_enable"); + if (upConnect != null && upConnect.getParamValue().equals("1")) { + dlAllData(); + } + } + + + public void dlAllData() { + logger.info("定时从上游下载全部据-----"); + redisUtil.set("SC_UDIINFO_DOWNLOAD_STATUS", System.currentTimeMillis()); + String doing = (String) redisUtil.get("is_doing_download"); + if (doing == null || doing.equals("false")) { + redisUtil.set("is_doing_download", "true", 60); + dlAllDataService.dllNewAllOrder(); + Arrays.stream(BasicExportTypeEnum.values()).forEach(i -> { + dlAllDataService.pullData(i); + }); + dlAllDataService.dlAllDi(); + redisUtil.set("is_doing_download", "false"); + spsSyncWebSocket.sendMessage(SocketMsgEntity.builder().type(SocketMsgType.DL_ALL_DATA).content("").remark("下载基础信息").build(), "2:" + socketToken); + + } + logger.info("定时从上游下载全部据-----结束"); + + } +} diff --git a/api-admin/src/main/java/com/glxp/sale/admin/util/ByteArraySplitter.java b/api-admin/src/main/java/com/glxp/sale/admin/util/ByteArraySplitter.java new file mode 100644 index 0000000..73133f7 --- /dev/null +++ b/api-admin/src/main/java/com/glxp/sale/admin/util/ByteArraySplitter.java @@ -0,0 +1,48 @@ +package com.glxp.sale.admin.util; + +import cn.hutool.core.util.ArrayUtil; +import com.beust.jcommander.internal.Lists; +import org.apache.commons.lang3.StringUtils; + +import java.util.List; + +public class ByteArraySplitter { + + /** + * 对String分片转换为List + * + * @param source 字符串 + * @param size 分片的长度 单位字节 + * @return + */ + public static List split(String source, int size) { + // 存放最终结果 + List result = Lists.newArrayList(); + + if (StringUtils.isEmpty(source)) { + return null; + } + + byte[] sourceBytes = source.getBytes(); + if (size > sourceBytes.length) { + result.add(sourceBytes); + return result; + } + // 开始进行split + int startIndex = 0; + int endIndex = sourceBytes.length - 1; + boolean isRunning = true; + while (isRunning) { + if ((endIndex + 1) - startIndex > size) { + result.add(ArrayUtil.sub(sourceBytes, startIndex, startIndex + size)); + startIndex += size; + } else { + result.add(ArrayUtil.sub(sourceBytes, startIndex, endIndex + 1)); + isRunning = false; + } + } + return result; + } +} + + diff --git a/api-admin/src/main/resources/application-dev.properties b/api-admin/src/main/resources/application-dev.properties index 1f6055f..42404d4 100644 --- a/api-admin/src/main/resources/application-dev.properties +++ b/api-admin/src/main/resources/application-dev.properties @@ -35,8 +35,8 @@ UDIWMS_IP=http://127.0.0.1:9991 #自助平台地址 SPMS_IP=http://127.0.0.1:9906 #SPMS_IP=http://139.9.178.73:8080/SPMS_SERVER -API_KEY=1101 -API_SECRET=zBITspLNvuoEd4FaamlSoqxRHmNsmQ9L +API_KEY:1101 +API_SECRET:zBITspLNvuoEd4FaamlSoqxRHmNsmQ6L #SPMS_IP=http://139.159.187.130:8080/SPMS_SERVER ##端口号 # Redis数据库索引(默认为0) @@ -57,3 +57,4 @@ spring.redis.jedis.pool.max-idle=8 spring.redis.jedis.pool.min-idle=0 # 连接超时时间(毫秒) spring.redis.jedis.timeout=300 +SPMS_WEBSOCKET_TOKEN=07rKFDFkQvBkbxgc7aUBlONo4gWNdx8b diff --git a/api-admin/src/main/resources/application-pro.properties b/api-admin/src/main/resources/application-pro.properties index 1f6055f..646c73f 100644 --- a/api-admin/src/main/resources/application-pro.properties +++ b/api-admin/src/main/resources/application-pro.properties @@ -57,3 +57,4 @@ spring.redis.jedis.pool.max-idle=8 spring.redis.jedis.pool.min-idle=0 # 连接超时时间(毫秒) spring.redis.jedis.timeout=300 +SPMS_WEBSOCKET_TOKEN=07rKFDFkQvBkbxgc7aUBlONo4gWNdx8b