新增websocket通信(中继服务同步相关)

dev2.0
anthonywj 2 years ago
parent 6647fbb9d7
commit 1476a0f894

@ -0,0 +1,16 @@
package com.glxp.api.constant;
public interface SocketMsgType {
String DL_ALL_DATA = "DL_ALL_DATA"; //生产入库
String DL_NOTICE = "DL_NOTICE"; //通知类消息
/**
*
*/
String TASK_INVOICE_CONFIRM = "TASK_INVOICE_CONFIRM"; //发票确认
}

@ -0,0 +1,20 @@
package com.glxp.api.entity.sync;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* websocket
*/
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class SocketMsgEntity {
private String type;
private String content;
private String remark;
}

@ -0,0 +1,40 @@
package com.glxp.api.service.sync;
import cn.hutool.core.util.StrUtil;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;
import com.glxp.api.common.res.BaseResponse;
import com.glxp.api.constant.SocketMsgType;
import com.glxp.api.entity.sync.SocketMsgEntity;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
@Service
public class SocketMsgService {
@Value("${SPMS_WEBSOCKET_TOKEN}")
String token;
@Async
public void dealNoticeMsg(SocketMsgEntity socketMsgEntity) {
if (socketMsgEntity != null && StrUtil.isNotEmpty(socketMsgEntity.getType())) {
switch (socketMsgEntity.getType()) {
case SocketMsgType.TASK_INVOICE_CONFIRM:
invoiceConfirm(socketMsgEntity);
break;
default:
break;
}
}
}
//发票确认
public void invoiceConfirm(SocketMsgEntity socketMsgEntity) {
}
}

@ -0,0 +1,120 @@
package com.glxp.api.service.sync;
import cn.hutool.core.util.StrUtil;
import com.alibaba.fastjson.JSONObject;
import com.glxp.api.constant.SocketMsgType;
import com.glxp.api.entity.sync.SocketMsgEntity;
import org.java_websocket.WebSocket;
import org.java_websocket.client.WebSocketClient;
import org.java_websocket.framing.Framedata;
import org.java_websocket.handshake.ServerHandshake;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Resource;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
public class SpsWebSocketClient extends WebSocketClient {
private String excptMessage;
Logger log = LoggerFactory.getLogger(SpsWebSocketClient.class);
private static List<SpsWebSocketClient> list = new ArrayList<>();
SocketMsgService socketMsgService;
public void setSocketMsgService(SocketMsgService socketMsgService) {
this.socketMsgService = socketMsgService;
}
public SpsWebSocketClient(String serverUri) throws URISyntaxException {
super(new URI(serverUri));
this.setConnectionLostTimeout(0);
if (list.isEmpty()) {
return;
}
for (SpsWebSocketClient client : list) {
client.close();
}
list.clear();
list.add(this);
}
@Override
public void onOpen(ServerHandshake serverHandshake) {
log.info("在线日志socket连接成功");
}
@Override
public void onMessage(String s) {
log.info("收到消息:" + s);
if (StrUtil.isNotEmpty(s)) {
SocketMsgEntity socketMsgEntity = JSONObject.parseObject(s, SocketMsgEntity.class);
//收到更新下载数据则赋值由外部处理,多个任务只处理一次避免重复处理
if (socketMsgEntity.getType().equals(SocketMsgType.DL_ALL_DATA)) {
this.excptMessage = s;
} else {
//通知类消息则需转发
socketMsgService.dealNoticeMsg(socketMsgEntity);
}
} else {
log.error("无法识别消息!");
}
}
@Override
public void onClose(int i, String s, boolean b) {
log.info("在线日志socket断开");
}
@Override
public void onError(Exception e) {
e.printStackTrace();
}
@Override
public void onWebsocketPing(WebSocket conn, Framedata f) {
try {
Thread.sleep(1000 * 5);
} catch (InterruptedException e) {
e.printStackTrace();
}
send("---pong---");
}
@Deprecated
public static void destroy() {
if (list.isEmpty())
return;
for (SpsWebSocketClient client : list) {
client.close();
}
list.clear();
}
//发送消息
public void sendMessage(String message) {
this.send(message);
System.out.println("已发送消息:" + message);
}
//获取接收到的信息
public String getExcptMessage() {
if (excptMessage != null) {
String message = new String(excptMessage);
excptMessage = null;
return message;
}
return null;
}
}

@ -0,0 +1,93 @@
package com.glxp.api.service.sync;
import com.glxp.api.constant.SocketMsgType;
import com.glxp.api.dao.system.SyncDataSetDao;
import com.glxp.api.entity.system.SyncDataSetEntity;
import com.glxp.api.service.system.SystemParamConfigService;
import com.glxp.api.task.SyncHeartService;
import lombok.extern.slf4j.Slf4j;
import org.java_websocket.enums.ReadyState;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.net.URISyntaxException;
@Slf4j
@Component
public class WebSocketComponent implements CommandLineRunner {
@Value("${SPMS_WEBSOCKET_TOKEN}")
private String socketToken;
@Resource
SyncHeartService heartTaskService;
@Resource
private SyncDataSetDao syncDataSetDao;
@Resource
SocketMsgService socketMsgService;
@Override
public void run(String... args) throws Exception {
SyncDataSetEntity syncDataSetEntity = syncDataSetDao.selectSet();
String ip = syncDataSetEntity.getSyncIp();
ip = ip.replace("http://", "");
try {
SpsWebSocketClient client = new SpsWebSocketClient("ws://" + ip + "/sps/web/sync/2/" + socketToken);
client.setSocketMsgService(socketMsgService);
initConnect(client);
//等待服务端响应
while (!client.getReadyState().equals(ReadyState.OPEN)) {
log.info("连接中···请稍后");
Thread.sleep(1000);
}
//等待WebSocket服务端响应
String message = null;
while (true) {
while ((message = client.getExcptMessage()) == null) {
log.info("已连接,等待接收数据--------");
Thread.sleep(1000);
if (client.isClosed()) {
initConnect(client);
}
}
if (message.equals(SocketMsgType.DL_ALL_DATA)) {
heartTaskService.pullData();
}
//打印服务端返回的数据
log.info("成功获取数据:" + message);
}
} catch (URISyntaxException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public void initConnect(SpsWebSocketClient client) {
if (client == null && client.isOpen()) {
log.info("WebSocket已连接不需要重连");
return;
}
log.info("重新建立连接");
client.connect();
while (!client.getReadyState().equals(ReadyState.OPEN)) {
log.info("连接中···请稍后");
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (client.getReadyState().equals(ReadyState.OPEN)) {
return;
} else if (client.getReadyState().equals(ReadyState.CLOSED)) {
client.reconnect();
}
}
}
}

@ -0,0 +1,105 @@
package com.glxp.api.task;
import com.glxp.api.constant.BasicExportTypeEnum;
import com.glxp.api.dao.system.SyncDataSetDao;
import com.glxp.api.entity.system.SyncDataSetEntity;
import com.glxp.api.service.sync.HeartService;
import com.glxp.api.util.RedisUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.Arrays;
@Slf4j
@Service
public class SyncHeartService {
@Resource
RedisUtil redisUtil;
@Resource
HeartService heartService;
@Resource
private SyncDataSetDao syncDataSetDao;
public void syncProcess() {
//查询数据同步设置
pushData();
pushOrder();
pullData();
}
public void pushData() {
SyncDataSetEntity syncDataSetEntity = syncDataSetDao.selectSet();
if (!syncDataSetEntity.isDownstreamEnable()) {
return;
}
//定时上传最近更新基础数据至上游轮询时间
long timeInterval1 = syncDataSetEntity.getSyncTime() * 6 * 1000L;
long curTime1 = System.currentTimeMillis();
Long lastTime1 = (Long) redisUtil.get("SPS_SYNC_UPLOAD_DATA");
if (lastTime1 == null) {
lastTime1 = System.currentTimeMillis();
redisUtil.set("SPS_SYNC_UPLOAD_DATA", lastTime1);
}
try {
if (curTime1 - lastTime1 > timeInterval1) {
Arrays.stream(BasicExportTypeEnum.values()).forEach(i -> {
heartService.pushData(syncDataSetEntity, null, i);
});
redisUtil.set("SPS_SYNC_UPLOAD_DATA", curTime1);
}
} catch (Exception e) {
log.error(ExceptionUtils.getStackTrace(e));
e.printStackTrace();
}
}
public void pushOrder() {
SyncDataSetEntity syncDataSetEntity = syncDataSetDao.selectSet();
if (!syncDataSetEntity.isDownstreamEnable()) {
return;
}
//定时上传最近更新单据数据至上游轮询时间
long timeInterval2 = syncDataSetEntity.getOrderSyncTime() * 6 * 1000L;
long curTime2 = System.currentTimeMillis();
Long lastTime2 = (Long) redisUtil.get("SPS_SYNC_UPLOAD_ORDER");
if (lastTime2 == null) {
lastTime2 = System.currentTimeMillis();
redisUtil.set("SPS_SYNC_UPLOAD_ORDER", lastTime2);
}
try {
if (curTime2 - lastTime2 > timeInterval2) {
heartService.uploadAllOrder(null);
heartService.uploadAllBusOrder(null);
redisUtil.set("SPS_SYNC_UPLOAD_ORDER", curTime2);
}
} catch (Exception e) {
log.error(ExceptionUtils.getStackTrace(e));
e.printStackTrace();
}
}
public void pullData() {
try {
heartService.dlAllOrder();
} catch (Exception e) {
e.printStackTrace();
log.error(ExceptionUtils.getStackTrace(e));
}
try {
heartService.dlAllDiProducts();
} catch (Exception e) {
e.printStackTrace();
log.error(ExceptionUtils.getStackTrace(e));
}
Arrays.stream(BasicExportTypeEnum.values()).forEach(i -> {
heartService.pullData(i);
});
redisUtil.set("SPS_SYNC_DOWNLOAD_DATA", System.currentTimeMillis());
}
}

@ -10,8 +10,6 @@ import com.glxp.api.service.sync.HeartService;
import com.glxp.api.util.RedisUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.SchedulingConfigurer;
import org.springframework.scheduling.config.ScheduledTaskRegistrar;
@ -26,10 +24,11 @@ import java.util.Arrays;
@EnableScheduling
public class SyncHeartTask implements SchedulingConfigurer {
final Logger logger = LoggerFactory.getLogger(SyncHeartTask.class);
@Resource
protected ScheduledDao scheduledDao;
@Resource
SyncHeartService syncHeartService;
@Resource
RedisUtil redisUtil;
@Resource
HeartService heartService;
@ -54,91 +53,25 @@ public class SyncHeartTask implements SchedulingConfigurer {
private void process() {
log.info("数据同步心跳--");
//查询数据同步设置
SyncDataSetEntity syncDataSetEntity = syncDataSetDao.selectSet();
if (syncDataSetEntity.isDownstreamEnable()) {
//定时上传最近更新基础数据至上游轮询时间
long timeInterval1 = syncDataSetEntity.getSyncTime() * 6 * 1000L;
long curTime1 = System.currentTimeMillis();
Long lastTime1 = (Long) redisUtil.get("SPS_SYNC_UPLOAD_DATA");
if (lastTime1 == null) {
lastTime1 = System.currentTimeMillis();
redisUtil.set("SPS_SYNC_UPLOAD_DATA", lastTime1);
}
try {
if (curTime1 - lastTime1 > timeInterval1) {
// heartService.uploadAllBus(null);
// heartService.uploadAllUserData(null);
// heartService.uploadScheduleList();
// heartService.uploadThrData(null);
// heartService.uploadThrProducts(null);
// heartService.uploadSysSetting(null);
Arrays.stream(BasicExportTypeEnum.values()).forEach(i -> {
heartService.pushData(syncDataSetEntity, null, i);
});
redisUtil.set("SPS_SYNC_UPLOAD_DATA", curTime1);
}
} catch (Exception e) {
log.error(ExceptionUtils.getStackTrace(e));
e.printStackTrace();
}
//定时上传最近更新单据数据至上游轮询时间
long timeInterval2 = syncDataSetEntity.getOrderSyncTime() * 6 * 1000L;
long curTime2 = System.currentTimeMillis();
Long lastTime2 = (Long) redisUtil.get("SPS_SYNC_UPLOAD_ORDER");
if (lastTime2 == null) {
lastTime2 = System.currentTimeMillis();
redisUtil.set("SPS_SYNC_UPLOAD_ORDER", lastTime2);
}
try {
if (curTime2 - lastTime2 > timeInterval2) {
heartService.uploadAllOrder(null);
heartService.uploadAllBusOrder(null);
redisUtil.set("SPS_SYNC_UPLOAD_ORDER", curTime2);
}
} catch (Exception e) {
log.error(ExceptionUtils.getStackTrace(e));
e.printStackTrace();
}
//定时下载上游最近更新数据轮询时间
long timeInterval = syncDataSetEntity.getSyncDownloadTime() * 6 * 1000;
long curTime = System.currentTimeMillis();
Long lastTime = (Long) redisUtil.get("SPS_SYNC_DOWNLOAD_DATA");
if (lastTime == null) {
lastTime = System.currentTimeMillis();
redisUtil.set("SPS_SYNC_DOWNLOAD_DATA", lastTime);
}
if (curTime - lastTime > timeInterval) {
try {
heartService.dlAllOrder();
} catch (Exception e) {
e.printStackTrace();
log.error(ExceptionUtils.getStackTrace(e));
}
try {
heartService.dlAllDiProducts();
} catch (Exception e) {
e.printStackTrace();
log.error(ExceptionUtils.getStackTrace(e));
}
Arrays.stream(BasicExportTypeEnum.values()).forEach(i -> {
heartService.pullData(i);
});
// heartService.pullBasicData();
// heartService.pullOtherData();
redisUtil.set("SPS_SYNC_DOWNLOAD_DATA", curTime);
}
syncHeartService.syncProcess();
SyncDataSetEntity syncDataSetEntity = syncDataSetDao.selectSet();
if (!syncDataSetEntity.isDownstreamEnable()) {
return;
}
//定时下载上游最近更新数据轮询时间
long timeInterval = syncDataSetEntity.getSyncDownloadTime() * 6 * 1000;
long curTime = System.currentTimeMillis();
Long lastTime = (Long) redisUtil.get("SPS_SYNC_DOWNLOAD_DATA");
if (lastTime == null) {
lastTime = System.currentTimeMillis();
redisUtil.set("SPS_SYNC_DOWNLOAD_DATA", lastTime);
}
if (curTime - lastTime > timeInterval) {
syncHeartService.pullData();
}
}

@ -4,7 +4,7 @@ server:
spring:
datasource:
driver-class-name: com.p6spy.engine.spy.P6SpyDriver
jdbc-url: jdbc:p6spy:mysql://127.0.0.1:3306/udi_wms_zyy?allowMultiQueries=true&serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=false&allowPublicKeyRetrieval=true
jdbc-url: jdbc:p6spy:mysql://127.0.0.1:3306/udi_wms_pt?allowMultiQueries=true&serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=false&allowPublicKeyRetrieval=true
username: root
password: 123456
hikari:
@ -55,3 +55,5 @@ back_file_path: D:/share/udisps/back/
API_KEY: 1101
API_SECRET: zBITspLNvuoEd4FaamlSoqxRHmNsmQ6L
WEB_TITLE: 平潭协和医院
SPMS_WEBSOCKET_TOKEN: 07rKFDFkQvBkbxgc7aUBlONo4gWNdx8b

@ -52,5 +52,5 @@ back_file_path: D:/share/udisps/back/
API_KEY: 1101
API_SECRET: zBITspLNvuoEd4FaamlSoqxRHmNsmQ9L
WEB_TITLE: 平潭协和医院
SPMS_WEBSOCKET_TOKEN: 07rKFDFkQvBkbxgc7aUBlONo4gWNdx8b

@ -51,4 +51,4 @@ SPMS_KEY: lCOdWCBKS6Kw45wdnnqUTELXyuSKnXEs
API_KEY: 1101
API_SECRET: zBITspLNvuoEd4FaamlSoqxRHmNsmQ9L
WEB_TITLE: 平潭协和医院
SPMS_WEBSOCKET_TOKEN: 07rKFDFkQvBkbxgc7aUBlONo4gWNdx8b

@ -52,5 +52,5 @@ back_file_path: D:/share/udisps/back/
API_KEY: 1101
API_SECRET: zBITspLNvuoEd4FaamlSoqxRHmNsmQ9L
WEB_TITLE: 平潭协和医院
SPMS_WEBSOCKET_TOKEN: 07rKFDFkQvBkbxgc7aUBlONo4gWNdx8b

Loading…
Cancel
Save