新增websocket通信(中继服务同步相关)

dev
anthonywj 2 years ago
parent 2314e8076b
commit ff58a80153

@ -307,6 +307,18 @@
<version>3.5.3</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
<!-- https://github.com/TooTallNate/Java-WebSocket-->
<dependency>
<groupId>org.java-websocket</groupId>
<artifactId>Java-WebSocket</artifactId>
<version>1.5.4</version>
</dependency>
</dependencies>
<build>

@ -0,0 +1,12 @@
package com.glxp.sale.admin.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
@Configuration
public class WebSocketConfig {
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
}

@ -1,125 +0,0 @@
package com.glxp.sale.admin.config;
import com.alibaba.fastjson.JSON;
import com.glxp.sale.admin.entity.info.WebSocketEntity;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.concurrent.CopyOnWriteArraySet;
@Component
@Slf4j
@Service
@ServerEndpoint("/api/websocket/{sid}")
public class WebSocketServer {
private static int onlineCount = 0;
private static CopyOnWriteArraySet<WebSocketServer> webSocketSet = new CopyOnWriteArraySet<WebSocketServer>();
private Session session;
private String sid = "";
/**
*
*/
@OnOpen
public void onOpen(Session session, @PathParam("sid") String sid) {
this.session = session;
webSocketSet.add(this); //加入set中
this.sid = sid;
addOnlineCount(); //在线数加1
try {
sendMessage(new WebSocketEntity("sys", "连接成功"));
log.info("有新窗口开始监听:" + sid + ",当前在线人数为:" + getOnlineCount());
} catch (IOException e) {
log.error("websocket IO Exception");
}
}
/**
*
*/
@OnClose
public void onClose() {
webSocketSet.remove(this); //从set中删除
subOnlineCount(); //在线数减1
//断开连接情况下,更新主板占用情况为释放
log.info("释放的sid为" + sid);
//这里写你 释放的时候,要处理的业务
log.info("有一连接关闭!当前在线人数为" + getOnlineCount());
}
/**
*
*
* @ Param message
*/
@OnMessage
public void onMessage(String message, Session session) {
log.info("收到来自窗口" + sid + "的信息:" + message);
//群发消息
for (WebSocketServer item : webSocketSet) {
try {
item.sendMessage(new WebSocketEntity("back", message));
} catch (IOException e) {
e.printStackTrace();
}
}
}
/**
* @ Param session
* @ Param error
*/
@OnError
public void onError(Session session, Throwable error) {
log.error("发生错误");
error.printStackTrace();
}
/**
*
*/
public void sendMessage(WebSocketEntity webSocketEntity) throws IOException {
String message = JSON.toJSON(webSocketEntity).toString();
this.session.getBasicRemote().sendText(message);
}
public static void sendInfo(String message, String type) {
log.info("推送消息到窗口" + type + ",推送内容:" + message);
for (WebSocketServer item : webSocketSet) {
try {
if (type == null) {
item.sendMessage(new WebSocketEntity("sid", message));
} else {
item.sendMessage(new WebSocketEntity(type, message));
}
} catch (IOException e) {
continue;
}
}
}
public static synchronized int getOnlineCount() {
return onlineCount;
}
public static synchronized void addOnlineCount() {
WebSocketServer.onlineCount++;
}
public static synchronized void subOnlineCount() {
WebSocketServer.onlineCount--;
}
public static CopyOnWriteArraySet<WebSocketServer> getWebSocketSet() {
return webSocketSet;
}
}

@ -8,4 +8,9 @@ public class ConstantStatus {
public static final String SYNC_BUS_ORDER = "AutoUploadBusOrder";
public static final String SYNC_DI_PRODUCTS = "AutoDownloadDiProducts";
}

@ -0,0 +1,7 @@
package com.glxp.sale.admin.constant;
public interface SocketMsgType {
String DL_ALL_DATA = "DL_ALL_DATA";
}

@ -1,53 +0,0 @@
package com.glxp.sale.admin.constant;
public interface TypeConstant {
String TYPE_PUT_PRODUCT = "ProduceWareHouseIn"; //生产入库
String TYPE_PUT_PURCHASE = "PurchaseWareHouseIn"; //采购入库
String TYPE_PUT_RETURN = "ReturnWareHouseIn"; //退货入库
String TYPE_PUT_ALLOT = "AllocateWareHouseIn"; //调拨入库
String TYPE_OUT_SALE = "SalesWareHouseOut"; //销售出库
String TYPE_OUT_RETURN = "ReturnWareHouseOut"; //退货出库
String TYPE_OUT_ALLOT = "AllocateWareHouseOut"; //调拨出库
String TYPE_OUT_DESTORY = "DestoryWareHouseOut"; //销毁出库
String TYPE_OUT_STRAIGHT = "DirectAllocateWareHouseOut"; //直调出库
String TYPE_OUT_REWORK = "ReworkWareHouseOut"; //返工出库
String TYPE_OUT_CHECK = "CheckWareHouseOut"; //盘点 抽检出库
String TYPE_CODE_REPLACE = "CodeReplace"; //码替换
String TYPE_CODE_DESTORY = "CodeDestory"; //码注销
String TYPE_STOCK_CHECK = "StockCheck"; //盘点
String TYPE_PUT = "WareHouseIn"; //出库
String TYPE_OUT = "WareHouseOut"; //入库
String SALE_OUT = "321"; //零售出库 (321)
String RETURNN_IN = "103"; //退货入库
String PURCHASE_IN = "102"; //采购入库
String RETURN_OUT = "202"; //退货出库
String SUPPLY_IN = "107"; //供应入库
String SUPPLY_OUT = "209"; //供应出库
String DESTORY_OUT = "205"; //销毁出库
String CHECK_OUT = "206"; //抽检出库
String VACCINE = "322"; //疫苗接种 (322)
//状态
//1.未校验已校验校验异常process,success,error
String CHECKED = "checked";//已校验
String UP_NOT_UPLOAD = "up_not_upload";// 上游未上传
String NOT_STOCK = "not_stock"; //无码库存
String UN_UPLOAD = "un_upload"; //未上传
String UPLOADED = "uploaded"; //已上传
String UPLOAD_FAIL = "upload_fail"; //上传失败
String DEAL_FAIL = "deal_fail"; //上传处理异常
String DEAL_SUCCESS = "deal_success"; //上传处理成功
String DL_FAIL = "dl_fail";//下载异常
String DOWNLOADED = "downloaded";//已完成
String FINISHED = "finished";//已完成
String REMOVE = "remove";
}

@ -0,0 +1,19 @@
package com.glxp.sale.admin.entity.sync;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* websocket
*/
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class SocketMsgEntity {
private String type;
private String content;
private String remark;
}

@ -0,0 +1,20 @@
package com.glxp.sale.admin.socket.client;
import com.glxp.sale.admin.socket.server.SpsSyncWebSocket;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
@Service
public class SocketMsgService {
@Value("${SPMS_WEBSOCKET_TOKEN}")
String token;
@Resource
SpsSyncWebSocket spsSyncWebSocket;
public void sendNoticeMsg(String message) {
spsSyncWebSocket.sendMessage(message, "2:" + token);
}
}

@ -0,0 +1,113 @@
package com.glxp.sale.admin.socket.client;
import com.alibaba.fastjson.JSONObject;
import com.glxp.sale.admin.constant.SocketMsgType;
import org.java_websocket.WebSocket;
import org.java_websocket.client.WebSocketClient;
import org.java_websocket.framing.Framedata;
import org.java_websocket.handshake.ServerHandshake;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Resource;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
public class SpsWebSocketClient extends WebSocketClient {
private String excptMessage;
Logger log = LoggerFactory.getLogger(SpsWebSocketClient.class);
private static List<SpsWebSocketClient> list = new ArrayList<>();
@Resource
SocketMsgService socketMsgService;
public void setSocketMsgService(SocketMsgService socketMsgService) {
this.socketMsgService = socketMsgService;
}
public SpsWebSocketClient(String serverUri) throws URISyntaxException {
super(new URI(serverUri));
this.setConnectionLostTimeout(0);
if (list.isEmpty()) {
return;
}
for (SpsWebSocketClient client : list) {
client.close();
}
list.clear();
list.add(this);
}
@Override
public void onOpen(ServerHandshake serverHandshake) {
log.info("在线日志socket连接成功");
}
@Override
public void onMessage(String s) {
log.info("收到消息:" + s);
//收到更新下载数据则赋值由外部处理,多个任务只处理一次避免重复处理
if (s.contains(SocketMsgType.DL_ALL_DATA)) {
this.excptMessage = s;
} else {
//通知类消息则需转发
socketMsgService.sendNoticeMsg(s);
}
}
@Override
public void onClose(int i, String s, boolean b) {
log.info("在线日志socket断开");
}
@Override
public void onError(Exception e) {
e.printStackTrace();
}
@Override
public void onWebsocketPing(WebSocket conn, Framedata f) {
try {
Thread.sleep(1000 * 5);
} catch (InterruptedException e) {
e.printStackTrace();
}
send("---pong---");
}
@Deprecated
public static void destroy() {
if (list.isEmpty())
return;
for (SpsWebSocketClient client : list) {
client.close();
}
list.clear();
}
//发送消息
public void sendMessage(String message) {
this.send(message);
System.out.println("已发送消息:" + message);
}
//获取接收到的信息
public String getExcptMessage() {
if (excptMessage != null) {
String message = new String(excptMessage);
excptMessage = null;
return message;
}
return null;
}
}

@ -0,0 +1,83 @@
package com.glxp.sale.admin.socket.client;
import com.glxp.sale.admin.constant.SocketMsgType;
import com.glxp.sale.admin.service.param.SystemParamConfigService;
import com.glxp.sale.admin.thread.HeartTask;
import com.glxp.sale.admin.thread.HeartTaskService;
import lombok.extern.slf4j.Slf4j;
import org.java_websocket.enums.ReadyState;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.net.URISyntaxException;
@Slf4j
@Component
public class WebSocketComponent implements CommandLineRunner {
@Value("${SPMS_WEBSOCKET_TOKEN}")
private String socketToken;
@Resource
SystemParamConfigService systemParamConfigService;
@Resource
HeartTaskService heartTaskService;
@Resource
SocketMsgService socketMsgService;
@Override
public void run(String... args) throws Exception {
String ip = systemParamConfigService.selectValueByParamKey("upper_server_ip");
ip = ip.replace("http://", "");
try {
SpsWebSocketClient client = new SpsWebSocketClient("ws://" + ip + "/spms/sync/1/" + socketToken);
client.setSocketMsgService(socketMsgService);
initConnect(client);
//等待WebSocket服务端响应
String message = null;
while (true) {
while ((message = client.getExcptMessage()) == null) {
log.info("已连接,等待接收数据--------");
Thread.sleep(1000);
if (client.isClosed()) {
initConnect(client);
}
}
if (message.contains(SocketMsgType.DL_ALL_DATA)) {
heartTaskService.dlAllData();
}
//打印服务端返回的数据
log.info("成功获取数据:" + message);
}
} catch (URISyntaxException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public void initConnect(SpsWebSocketClient client) {
if (client == null && client.isOpen()) {
log.info("WebSocket已连接不需要重连");
return;
}
log.info("重新建立连接");
client.connect();
while (!client.getReadyState().equals(ReadyState.OPEN)) {
log.info("连接中···请稍后");
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (client.getReadyState().equals(ReadyState.OPEN)) {
return;
} else if (client.getReadyState().equals(ReadyState.CLOSED)) {
client.reconnect();
}
}
}
}

@ -0,0 +1,164 @@
package com.glxp.sale.admin.socket.server;
import cn.hutool.json.JSONUtil;
import com.glxp.sale.admin.entity.sync.SocketMsgEntity;
import com.glxp.sale.admin.util.ByteArraySplitter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@Component
@ServerEndpoint(value = "/sps/web/sync/{type}/{token}")
@Slf4j
public class SpsSyncWebSocket {
//与某个客户端的连接会话,需要通过它来给客户端发送数据
private Session session;
@Value("${SPMS_WEBSOCKET_TOKEN}")
private String socketToken;
private static int onlineCount = 0;
private static ConcurrentHashMap<String, SpsSyncWebSocket> socketServersMap = new ConcurrentHashMap<>();
private String key = "";
/**
*
*
* @param session
* @param type typeregister/verify
*/
@OnOpen
public void onOpen(Session session, @PathParam("type") String type, @PathParam("token") String token) {
this.session = session;
key = type + ":" + token + ":" + session.getId();
log.info("onOpen->>key:{}", key);
boolean flag = socketServersMap.containsKey(key);
if (flag) {
socketServersMap.remove(key);
socketServersMap.put(key, this);
} else {
socketServersMap.put(key, this);
addOnlineCount();
}
// if (StrUtil.isEmpty(token) || !token.equals(socketToken)) {
// throw new ServiceException("token验证异常");
// }
log.info("online number:{}", getOnlineCount());
}
@OnMessage
public void onMessage(String message, Session session) {
log.info("来自客户端的消息->>message:{};sessionId:{}", message, session.getId());
}
@OnClose
public void onClose() {
log.info("onClose : {}" + key);
if (socketServersMap.containsKey(key)) {
socketServersMap.remove(key);
subOnlineCount(); //在线数减1
}
log.info("有一连接关闭!当前在线连接数为: {}", getOnlineCount());
}
@OnError
public void onError(Throwable error) {
socketServersMap.remove(key);
subOnlineCount();
log.error("webSocket连接发生错误->>errorMessage:{}", error.getMessage());
}
/**
* @param message
* @param sessionId token
*/
public synchronized void sendMessage(String message, String sessionId) {
synchronized (this.getClass()) {
for (Map.Entry<String, SpsSyncWebSocket> stringMyWebSocketEntry : socketServersMap.entrySet()) {
try {
String key = stringMyWebSocketEntry.getKey();
SpsSyncWebSocket value = stringMyWebSocketEntry.getValue();
if (key.contains(sessionId)) {
log.info("推送的消息为:" + key);
value.session.getBasicRemote().sendText(message);
// List<byte[]> result = ByteArraySplitter.split(message, 64 * 1024);
// for (byte[] bytes : result) {
// String s = new String(bytes);
// value.session.getBasicRemote().sendText(s);
// }
// 推送结束符
value.session.getBasicRemote().sendText("#end#");
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
/**
* @param socketMsgEntity ,
* @param sessionId token
*/
public synchronized void sendMessage(SocketMsgEntity socketMsgEntity, String sessionId) {
synchronized (this.getClass()) {
for (Map.Entry<String, SpsSyncWebSocket> stringMyWebSocketEntry : socketServersMap.entrySet()) {
try {
String key = stringMyWebSocketEntry.getKey();
SpsSyncWebSocket value = stringMyWebSocketEntry.getValue();
if (key.contains(sessionId)) {
log.info("推送的消息为:" + key);
value.session.getBasicRemote().sendText(JSONUtil.toJsonStr(socketMsgEntity));
// List<byte[]> result = ByteArraySplitter.split(message, 64 * 1024);
// for (byte[] bytes : result) {
// String s = new String(bytes);
// value.session.getBasicRemote().sendText(s);
// }
// 推送结束符
value.session.getBasicRemote().sendText("#end#");
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
/**
* 线
*
* @return
*/
public static synchronized int getOnlineCount() {
return onlineCount;
}
/**
* 线+1
*/
public static synchronized void addOnlineCount() {
SpsSyncWebSocket.onlineCount++;
}
/**
* 线-1
*/
public static synchronized void subOnlineCount() {
SpsSyncWebSocket.onlineCount--;
}
}

@ -1,7 +1,6 @@
package com.glxp.sale.admin.thread;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.IdUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.json.JSONUtil;
import com.alibaba.fastjson.JSON;
@ -13,30 +12,30 @@ import com.glxp.sale.admin.dto.RelaySyncDto;
import com.glxp.sale.admin.entity.param.SystemParamConfigEntity;
import com.glxp.sale.admin.entity.sync.BasicExportStatusEntity;
import com.glxp.sale.admin.entity.sync.ProductInfoEntity;
import com.glxp.sale.admin.entity.sync.SocketMsgEntity;
import com.glxp.sale.admin.entity.sync.UdiCompanyEntity;
import com.glxp.sale.admin.http.SpGetHttp;
import com.glxp.sale.admin.idc.service.IdcService;
import com.glxp.sale.admin.req.sync.BasicExportStatusRequest;
import com.glxp.sale.admin.req.sync.SpsSyncDataRequest;
import com.glxp.sale.admin.res.sync.*;
import com.glxp.sale.admin.res.sync.SpSyncUdiResponse;
import com.glxp.sale.admin.res.sync.SpsSyncDataResponse;
import com.glxp.sale.admin.res.sync.SpsSyncScheduleResponse;
import com.glxp.sale.admin.service.param.SystemParamConfigService;
import com.glxp.sale.admin.service.sync.BasicExportService;
import com.glxp.sale.admin.socket.server.SpsSyncWebSocket;
import com.glxp.sale.admin.thread.didl.AsyncDiDlHelper;
import com.glxp.sale.admin.util.*;
import com.glxp.sale.common.enums.ResultEnum;
import com.glxp.sale.common.res.BaseResponse;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;
import java.awt.*;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
@ -501,6 +500,7 @@ public class DlAllDataService {
spGetHttp.postUpdateBasicStatus(basicExportStatusEntity1);
spGetHttp.updateLastTime("AutoDownloadDiProducts", DateUtil.formatDate(System.currentTimeMillis(), "yyyy-MM-dd HH:mm:ss"));
} else {
SpsSyncDataRequest spsSyncDataRequest = new SpsSyncDataRequest();
spsSyncDataRequest.setId(basicExportStatusEntity.getId());

@ -31,17 +31,13 @@ public class HeartTask implements SchedulingConfigurer {
@Resource
protected ScheduledDao scheduledDao;
@Resource
HeartTaskService heartTaskService;
@Resource
SystemParamConfigService systemParamConfigService;
@Resource
RedisUtil redisUtil;
@Resource
ScanUploadService scanUploadService;
@Resource
ScanDownloadService scanDownloadService;
@Resource
DlAllDataService dlAllDataService;
@Resource
AsyncDiDlService asyncDiDlService;
@Override
public void configureTasks(ScheduledTaskRegistrar scheduledTaskRegistrar) {
@ -62,71 +58,6 @@ public class HeartTask implements SchedulingConfigurer {
private void process() {
logger.info("------------心跳任务-----------------");
dlData();
}
//定时从上游下载数据
private void dlData() {
SystemParamConfigEntity upConnect = systemParamConfigService.selectByParamKey("sync_upstream_enable");
if (upConnect != null && upConnect.getParamValue().equals("1")) {
dlAllData();
scanUpload();
}
SystemParamConfigEntity donwConnect = systemParamConfigService.selectByParamKey("sync_downstream_enable");
if (donwConnect != null && donwConnect.getParamValue().equals("1")) {
scanDonwload();
}
}
private void scanUpload() {
SystemParamConfigEntity systemParamConfigEntity = systemParamConfigService.selectByParamKey("sc_udiinfo_upload");
long timeInterval = Long.parseLong(systemParamConfigEntity.getParamValue()) * 60 * 1000;
long curTime = System.currentTimeMillis();
//定时扫描
Long lastTime = (Long) redisUtil.get("UPLOAD_UDIINFO_STATUS");
if (lastTime == null) {
lastTime = System.currentTimeMillis();
redisUtil.set("UPLOAD_UDIINFO_STATUS", lastTime, 30 * 60);
}
if (curTime - lastTime > timeInterval) {
redisUtil.set("UPLOAD_UDIINFO_STATUS", curTime);
scanUploadService.scanAllDatas();
scanUploadService.scanAllBus();
scanUploadService.scanAllOrders();
scanUploadService.scanAllSchedule();
}
}
private void scanDonwload() {
SystemParamConfigEntity upConnect = systemParamConfigService.selectByParamKey("sync_downstream_enable");
if (upConnect.getParamValue().equals("1")) {
SystemParamConfigEntity systemParamConfigEntity = systemParamConfigService.selectByParamKey("sc_udiinfo_status");
long timeInterval = Long.parseLong(systemParamConfigEntity.getParamValue()) * 60 * 1000;
long curTime = System.currentTimeMillis();
Long lastTime = (Long) redisUtil.get("SC_UDIINFO_DOWNLOAD_STATUS");
if (lastTime == null) {
lastTime = System.currentTimeMillis();
redisUtil.set("SC_UDIINFO_DOWNLOAD_STATUS", lastTime);
}
if (curTime - lastTime > timeInterval) {
redisUtil.set("SC_UDIINFO_DOWNLOAD_STATUS", curTime);
scanDownloadService.scanAllData();
scanDownloadService.scanAllBus();
scanDownloadService.scanAllOrder();
scanDownloadService.scanScheduleList();
scanDownloadService.scanUdis();
}
}
}
private void dlAllData() {
SystemParamConfigEntity systemParamConfigEntity = systemParamConfigService.selectByParamKey("sc_udiinfo_status");
long timeInterval = Long.parseLong(systemParamConfigEntity.getParamValue()) * 6 * 1000;
long curTime = System.currentTimeMillis();
@ -140,25 +71,12 @@ public class HeartTask implements SchedulingConfigurer {
redisUtil.set("SC_UDIINFO_DOWNLOAD_STATUS", lastTime);
}
if (curTime - lastTime > timeInterval) {
logger.info("定时从上游下载全部据-----");
redisUtil.set("SC_UDIINFO_DOWNLOAD_STATUS", curTime);
String doing = (String) redisUtil.get("is_doing_download");
if (doing == null || doing.equals("false")) {
redisUtil.set("is_doing_download", "true", 60);
// dlAllDataService.dllNewAllData();
dlAllDataService.dllNewAllOrder();
Arrays.stream(BasicExportTypeEnum.values()).forEach(i -> {
dlAllDataService.pullData(i);
});
// dlAllDataService.dllNewAllBusType();
// dlAllDataService.dlScheduleStatus();
dlAllDataService.dlAllDi();
redisUtil.set("is_doing_download", "false");
}
logger.info("定时从上游下载全部据-----结束");
heartTaskService.dlAllData();
} else {
logger.info("时间未到");
}
}
}

@ -0,0 +1,118 @@
package com.glxp.sale.admin.thread;
import com.glxp.sale.admin.constant.BasicExportTypeEnum;
import com.glxp.sale.admin.constant.SocketMsgType;
import com.glxp.sale.admin.entity.param.SystemParamConfigEntity;
import com.glxp.sale.admin.entity.sync.SocketMsgEntity;
import com.glxp.sale.admin.service.param.SystemParamConfigService;
import com.glxp.sale.admin.socket.server.SpsSyncWebSocket;
import com.glxp.sale.admin.util.RedisUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.Arrays;
@Service
public class HeartTaskService {
final Logger logger = LoggerFactory.getLogger(HeartTask.class);
@Resource
SystemParamConfigService systemParamConfigService;
@Resource
RedisUtil redisUtil;
@Resource
ScanUploadService scanUploadService;
@Resource
ScanDownloadService scanDownloadService;
@Resource
DlAllDataService dlAllDataService;
@Resource
SpsSyncWebSocket spsSyncWebSocket;
@Value("${SPMS_WEBSOCKET_TOKEN}")
private String socketToken;
//定时从上游下载数据
public void dlData() {
SystemParamConfigEntity upConnect = systemParamConfigService.selectByParamKey("sync_upstream_enable");
if (upConnect != null && upConnect.getParamValue().equals("1")) {
dlAllData();
scanUpload();
}
SystemParamConfigEntity donwConnect = systemParamConfigService.selectByParamKey("sync_downstream_enable");
if (donwConnect != null && donwConnect.getParamValue().equals("1")) {
scanDonwload();
}
}
private void scanUpload() {
SystemParamConfigEntity systemParamConfigEntity = systemParamConfigService.selectByParamKey("sc_udiinfo_upload");
long timeInterval = Long.parseLong(systemParamConfigEntity.getParamValue()) * 60 * 1000;
long curTime = System.currentTimeMillis();
//定时扫描
Long lastTime = (Long) redisUtil.get("UPLOAD_UDIINFO_STATUS");
if (lastTime == null) {
lastTime = System.currentTimeMillis();
redisUtil.set("UPLOAD_UDIINFO_STATUS", lastTime, 30 * 60);
}
if (curTime - lastTime > timeInterval) {
redisUtil.set("UPLOAD_UDIINFO_STATUS", curTime);
scanUploadService.scanAllDatas();
scanUploadService.scanAllBus();
scanUploadService.scanAllOrders();
scanUploadService.scanAllSchedule();
}
}
private void scanDonwload() {
SystemParamConfigEntity upConnect = systemParamConfigService.selectByParamKey("sync_downstream_enable");
if (upConnect.getParamValue().equals("1")) {
SystemParamConfigEntity systemParamConfigEntity = systemParamConfigService.selectByParamKey("sc_udiinfo_status");
long timeInterval = Long.parseLong(systemParamConfigEntity.getParamValue()) * 60 * 1000;
long curTime = System.currentTimeMillis();
Long lastTime = (Long) redisUtil.get("SC_UDIINFO_DOWNLOAD_STATUS");
if (lastTime == null) {
lastTime = System.currentTimeMillis();
redisUtil.set("SC_UDIINFO_DOWNLOAD_STATUS", lastTime);
}
if (curTime - lastTime > timeInterval) {
redisUtil.set("SC_UDIINFO_DOWNLOAD_STATUS", curTime);
scanDownloadService.scanAllData();
scanDownloadService.scanAllBus();
scanDownloadService.scanAllOrder();
scanDownloadService.scanScheduleList();
scanDownloadService.scanUdis();
}
}
}
public void dlUpAllData() {
SystemParamConfigEntity upConnect = systemParamConfigService.selectByParamKey("sync_upstream_enable");
if (upConnect != null && upConnect.getParamValue().equals("1")) {
dlAllData();
}
}
public void dlAllData() {
logger.info("定时从上游下载全部据-----");
redisUtil.set("SC_UDIINFO_DOWNLOAD_STATUS", System.currentTimeMillis());
String doing = (String) redisUtil.get("is_doing_download");
if (doing == null || doing.equals("false")) {
redisUtil.set("is_doing_download", "true", 60);
dlAllDataService.dllNewAllOrder();
Arrays.stream(BasicExportTypeEnum.values()).forEach(i -> {
dlAllDataService.pullData(i);
});
dlAllDataService.dlAllDi();
redisUtil.set("is_doing_download", "false");
spsSyncWebSocket.sendMessage(SocketMsgEntity.builder().type(SocketMsgType.DL_ALL_DATA).content("").remark("下载基础信息").build(), "2:" + socketToken);
}
logger.info("定时从上游下载全部据-----结束");
}
}

@ -0,0 +1,48 @@
package com.glxp.sale.admin.util;
import cn.hutool.core.util.ArrayUtil;
import com.beust.jcommander.internal.Lists;
import org.apache.commons.lang3.StringUtils;
import java.util.List;
public class ByteArraySplitter {
/**
* StringList<byte[]>
*
* @param source
* @param size
* @return
*/
public static List<byte[]> split(String source, int size) {
// 存放最终结果
List<byte[]> result = Lists.newArrayList();
if (StringUtils.isEmpty(source)) {
return null;
}
byte[] sourceBytes = source.getBytes();
if (size > sourceBytes.length) {
result.add(sourceBytes);
return result;
}
// 开始进行split
int startIndex = 0;
int endIndex = sourceBytes.length - 1;
boolean isRunning = true;
while (isRunning) {
if ((endIndex + 1) - startIndex > size) {
result.add(ArrayUtil.sub(sourceBytes, startIndex, startIndex + size));
startIndex += size;
} else {
result.add(ArrayUtil.sub(sourceBytes, startIndex, endIndex + 1));
isRunning = false;
}
}
return result;
}
}

@ -35,8 +35,8 @@ UDIWMS_IP=http://127.0.0.1:9991
#自助平台地址
SPMS_IP=http://127.0.0.1:9906
#SPMS_IP=http://139.9.178.73:8080/SPMS_SERVER
API_KEY=1101
API_SECRET=zBITspLNvuoEd4FaamlSoqxRHmNsmQ9L
API_KEY:1101
API_SECRET:zBITspLNvuoEd4FaamlSoqxRHmNsmQ6L
#SPMS_IP=http://139.159.187.130:8080/SPMS_SERVER
##端口号
# Redis数据库索引默认为0
@ -57,3 +57,4 @@ spring.redis.jedis.pool.max-idle=8
spring.redis.jedis.pool.min-idle=0
# 连接超时时间(毫秒)
spring.redis.jedis.timeout=300
SPMS_WEBSOCKET_TOKEN=07rKFDFkQvBkbxgc7aUBlONo4gWNdx8b

@ -57,3 +57,4 @@ spring.redis.jedis.pool.max-idle=8
spring.redis.jedis.pool.min-idle=0
# 连接超时时间(毫秒)
spring.redis.jedis.timeout=300
SPMS_WEBSOCKET_TOKEN=07rKFDFkQvBkbxgc7aUBlONo4gWNdx8b

Loading…
Cancel
Save