package com.glxp.api.service.sync; import cn.hutool.core.thread.ThreadUtil; import cn.hutool.json.JSONUtil; import com.glxp.api.constant.SocketMsgType; import com.glxp.api.dao.system.SyncDataSetDao; import com.glxp.api.entity.system.SyncDataSetEntity; import com.glxp.api.req.inout.FilterStatDataRequest; import com.glxp.api.res.inout.IoStatOrderResponse; import com.glxp.api.service.inout.IoStatOrderService; import com.glxp.api.task.SyncHeartService; import lombok.extern.slf4j.Slf4j; import org.java_websocket.enums.ReadyState; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.CommandLineRunner; import org.springframework.stereotype.Component; import javax.annotation.Resource; import java.net.URISyntaxException; import java.util.List; @Slf4j @Component public class WebSocketComponent implements CommandLineRunner { @Value("${SPMS_WEBSOCKET_TOKEN}") private String socketToken; @Resource SyncHeartService heartTaskService; @Resource private SyncDataSetDao syncDataSetDao; @Resource SocketMsgService socketMsgService; @Resource IoStatOrderService statOrderService; @Override public void run(String... args) throws Exception { ThreadUtil.execAsync(() -> { SyncDataSetEntity syncDataSetEntity = syncDataSetDao.selectSet(); String ip = syncDataSetEntity.getSyncIp(); ip = ip.replace("http://", ""); // ip = ip.replace("/SP_SYNC_SERVER", ""); try { SpsWebSocketClient client = new SpsWebSocketClient("ws://" + ip + "/sps/web/sync/2/" + socketToken); client.setSocketMsgService(socketMsgService); initConnect(client); //等待服务端响应 while (!client.getReadyState().equals(ReadyState.OPEN)) { // log.info("连接中···请稍后"); Thread.sleep(1000); } //等待WebSocket服务端响应 String message = null; while (true) { while ((message = client.getExcptMessage()) == null) { log.info("已连接,等待接收数据--------"); Thread.sleep(1000); if (client.isClosed()) { initConnect(client); } } if (message.equals(SocketMsgType.DL_ALL_DATA)) { heartTaskService.pullData(null); } if (message.contains(SocketMsgType.STAT_DATA)) { cn.hutool.json.JSONObject obj = JSONUtil.parseObj(message); String content = obj.getStr("content"); cn.hutool.json.JSONObject obj1 = JSONUtil.parseObj(content); FilterStatDataRequest request = JSONUtil.toBean(obj1.getStr("content"), FilterStatDataRequest.class); List ioStatOrderResponses = statOrderService.filterList(request); client.sendMessage(ioStatOrderResponses.toString()); } Thread.sleep(500); //打印服务端返回的数据 log.info("成功获取数据:" + message); } } catch (URISyntaxException e) { // e.printStackTrace(); } catch (InterruptedException e) { // e.printStackTrace(); } }); } public void initConnect(SpsWebSocketClient client) { if (client == null && client.isOpen()) { log.info("WebSocket已连接,不需要重连"); return; } log.info("重新建立连接"); client.connect(); while (!client.getReadyState().equals(ReadyState.OPEN)) { // log.info("连接中···请稍后"); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } if (client.getReadyState().equals(ReadyState.OPEN)) { return; } else if (client.getReadyState().equals(ReadyState.CLOSED)) { client.reconnect(); } } } }