Compare commits

...

23 Commits
test ... dev

Author SHA1 Message Date
chenhc e5c8c2aa44 Merge remote-tracking branch 'origin/dev' into dev 2 days ago
chenhc 6b26bd24f5 feat: 更新线程 2 days ago
yewj a137584591 定时任务优化 4 days ago
chenhc c050d6f73f feat: 图片同步 1 week ago
chenhc 210186d002 feat: 图片同步 1 week ago
wangwei c75f0ebee8 3/24 读取文件优化1.0 2 weeks ago
chenhc 21c9c62cf4 feat: UDI管理系统转至国家库平台 2 months ago
chenhc afd111fb68 feat: 增加阿里健康平台转发 2 months ago
chenhc 395ebd37e8 feat: 增加阿里健康平台转发 2 months ago
chenhc 1fcad6c88f feat: 增加阿里健康平台转发 2 months ago
chenhc 23ae9fabe8 feat: 药品关联关系查重功能 5 months ago
chenhc 8084a9bde6 feat: 多码融合转发 5 months ago
chenhc cdf33c8990 feat: 多码融合转发 5 months ago
chenhc f8c948eee4 feat: 多码融合转发 5 months ago
chenhc e5cd13ff4f feat: 药品关联信息上下传输功能 6 months ago
chenhc a91d50021f feat: 药品关联关系功能 6 months ago
anthonywj 4d2ec8c834 空数据不写入问题 8 months ago
wangwei 2336637294 6/21 同步优化增加枚举以及汇总日报 10 months ago
anthonywj 20d0da6075 消息推送错误结束字符串 2 years ago
anthonywj 60887e6f46 删除任务同步相关调整(后续还需调整) 2 years ago
anthonywj b4dbd61563 转发消息类推送问题 2 years ago
wangwei e263c8febd 同步删除基础数据 2 years ago
anthonywj ff58a80153 新增websocket通信(中继服务同步相关) 2 years ago

@ -96,11 +96,11 @@
<!--<version>1.0</version>-->
<!--</dependency>-->
<!--mybatis -->
<!-- <dependency>-->
<!-- <groupId>org.mybatis.spring.boot</groupId>-->
<!-- <artifactId>mybatis-spring-boot-starter</artifactId>-->
<!-- <version>1.3.2</version>-->
<!-- </dependency>-->
<!-- <dependency>-->
<!-- <groupId>org.mybatis.spring.boot</groupId>-->
<!-- <artifactId>mybatis-spring-boot-starter</artifactId>-->
<!-- <version>1.3.2</version>-->
<!-- </dependency>-->
<dependency>
<groupId>com.itfsw</groupId>
<artifactId>mybatis-generator-plugin</artifactId>
@ -295,10 +295,10 @@
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-text</artifactId>
<version>1.1</version>
</dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-text</artifactId>
<version>1.1</version>
</dependency>
<!--mybatis plus -->
<dependency>
@ -307,6 +307,18 @@
<version>3.5.3</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
<!-- https://github.com/TooTallNate/Java-WebSocket-->
<dependency>
<groupId>org.java-websocket</groupId>
<artifactId>Java-WebSocket</artifactId>
<version>1.5.4</version>
</dependency>
</dependencies>
<build>

@ -1,7 +1,9 @@
package com.glxp.sale.admin.config;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
@ -10,29 +12,65 @@ import java.util.concurrent.ThreadPoolExecutor;
/**
* 线
*/
@Configuration
@EnableAsync
@Configuration
@EnableConfigurationProperties(ThreadPoolConfigProperties.class)
public class TaskPoolConfig {
// @Bean(name = "taskExecutor")
// @Primary
// public ThreadPoolTaskExecutor taskExecutor() {
// // 获取当前主机的cpu核心数
// int threadCount = Runtime.getRuntime().availableProcessors();
// ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
// //核心池的大小
// taskExecutor.setCorePoolSize(threadCount);
// //线程池最大线程数
// taskExecutor.setMaxPoolSize(threadCount * 2);
// //队列最大长度
// taskExecutor.setQueueCapacity(200);
// //线程空闲时间
// taskExecutor.setKeepAliveSeconds(60);
// //配置线程前缀
// taskExecutor.setThreadNamePrefix("custom_executor");
// //配置拒绝策略
// taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// //执行初始化
// taskExecutor.initialize();
// return taskExecutor;
// }
@Bean(name = "taskExecutor")
public ThreadPoolTaskExecutor taskExecutor() {
// 获取当前主机的cpu核心数
int threadCount = Runtime.getRuntime().availableProcessors();
@Primary
public ThreadPoolTaskExecutor taskExecutor(ThreadPoolConfigProperties properties) {
// 动态获取CPU核心数
final int cpuCores = Runtime.getRuntime().availableProcessors();
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
//核心池的大小
taskExecutor.setCorePoolSize(threadCount);
//线程池最大线程数
taskExecutor.setMaxPoolSize(threadCount * 2);
//队列最大长度
taskExecutor.setQueueCapacity(200);
//线程空闲时间
taskExecutor.setKeepAliveSeconds(60);
//配置线程前缀
taskExecutor.setThreadNamePrefix("custom_executor");
//配置拒绝策略
taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
//执行初始化
// 核心线程数配置优先默认CPU核心数
taskExecutor.setCorePoolSize(
properties.getCorePoolSize() != null ?
properties.getCorePoolSize() : cpuCores
);
// 最大线程数配置优先默认CPU核心数*2
taskExecutor.setMaxPoolSize(
properties.getMaxPoolSize() != null ?
properties.getMaxPoolSize() : cpuCores * 2
);
// 其他固定配置项
taskExecutor.setQueueCapacity(properties.getQueueCapacity());
taskExecutor.setKeepAliveSeconds(properties.getKeepAliveSeconds());
taskExecutor.setThreadNamePrefix(properties.getThreadNamePrefix());
// 拒绝策略保持原样
taskExecutor.setRejectedExecutionHandler(
new ThreadPoolExecutor.CallerRunsPolicy()
);
taskExecutor.initialize();
return taskExecutor;
}
}

@ -0,0 +1,36 @@
package com.glxp.sale.admin.config;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
// 1. 创建配置类建议放在config包下
@ConfigurationProperties(prefix = "thread.pool")
@Data
public class ThreadPoolConfigProperties {
/**
* 线CPU
*/
private Integer corePoolSize;
/**
* 线CPU*2
*/
private Integer maxPoolSize;
/**
* 200
*/
private int queueCapacity = 200;
/**
* 线60
*/
private int keepAliveSeconds = 60;
/**
* 线"custom_executor_"
*/
private String threadNamePrefix = "custom_executor_";
}

@ -0,0 +1,12 @@
package com.glxp.sale.admin.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
@Configuration
public class WebSocketConfig {
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
}

@ -1,125 +0,0 @@
package com.glxp.sale.admin.config;
import com.alibaba.fastjson.JSON;
import com.glxp.sale.admin.entity.info.WebSocketEntity;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.concurrent.CopyOnWriteArraySet;
@Component
@Slf4j
@Service
@ServerEndpoint("/api/websocket/{sid}")
public class WebSocketServer {
private static int onlineCount = 0;
private static CopyOnWriteArraySet<WebSocketServer> webSocketSet = new CopyOnWriteArraySet<WebSocketServer>();
private Session session;
private String sid = "";
/**
*
*/
@OnOpen
public void onOpen(Session session, @PathParam("sid") String sid) {
this.session = session;
webSocketSet.add(this); //加入set中
this.sid = sid;
addOnlineCount(); //在线数加1
try {
sendMessage(new WebSocketEntity("sys", "连接成功"));
log.info("有新窗口开始监听:" + sid + ",当前在线人数为:" + getOnlineCount());
} catch (IOException e) {
log.error("websocket IO Exception");
}
}
/**
*
*/
@OnClose
public void onClose() {
webSocketSet.remove(this); //从set中删除
subOnlineCount(); //在线数减1
//断开连接情况下,更新主板占用情况为释放
log.info("释放的sid为" + sid);
//这里写你 释放的时候,要处理的业务
log.info("有一连接关闭!当前在线人数为" + getOnlineCount());
}
/**
*
*
* @ Param message
*/
@OnMessage
public void onMessage(String message, Session session) {
log.info("收到来自窗口" + sid + "的信息:" + message);
//群发消息
for (WebSocketServer item : webSocketSet) {
try {
item.sendMessage(new WebSocketEntity("back", message));
} catch (IOException e) {
e.printStackTrace();
}
}
}
/**
* @ Param session
* @ Param error
*/
@OnError
public void onError(Session session, Throwable error) {
log.error("发生错误");
error.printStackTrace();
}
/**
*
*/
public void sendMessage(WebSocketEntity webSocketEntity) throws IOException {
String message = JSON.toJSON(webSocketEntity).toString();
this.session.getBasicRemote().sendText(message);
}
public static void sendInfo(String message, String type) {
log.info("推送消息到窗口" + type + ",推送内容:" + message);
for (WebSocketServer item : webSocketSet) {
try {
if (type == null) {
item.sendMessage(new WebSocketEntity("sid", message));
} else {
item.sendMessage(new WebSocketEntity(type, message));
}
} catch (IOException e) {
continue;
}
}
}
public static synchronized int getOnlineCount() {
return onlineCount;
}
public static synchronized void addOnlineCount() {
WebSocketServer.onlineCount++;
}
public static synchronized void subOnlineCount() {
WebSocketServer.onlineCount--;
}
public static CopyOnWriteArraySet<WebSocketServer> getWebSocketSet() {
return webSocketSet;
}
}

@ -52,6 +52,25 @@ public enum BasicExportTypeEnum {
*/
IO_ORDER("io_order", "单据信息"),
/**
*
*
*/
PRODUCE_BUSINESS_DATA("thr_manufacturer", "生产企业数据"),
DEVICE_INFO_DATA("device_info", "设备信息数据"),
DEVICE_CHECK_DATA("device_check", "巡检管理数据"),
DEVICE_REPAIR_DATA("device_repair_apply", "报修管理数据"),
DEVICE_UPKEEP_DATA("device__upkeep", "设备保养数据"),
/**
*
*/
DEVICE_TASK("device_task", "设备任务"),
/**
*
*/
DRUG_DATA_TASK("drug_data_task", "药品关联信息任"),
;
private String key;

@ -8,4 +8,9 @@ public class ConstantStatus {
public static final String SYNC_BUS_ORDER = "AutoUploadBusOrder";
public static final String SYNC_DI_PRODUCTS = "AutoDownloadDiProducts";
}

@ -0,0 +1,15 @@
package com.glxp.sale.admin.constant;
public interface SocketMsgType {
String DL_ALL_DATA = "DL_ALL_DATA";
String BASIC_DATA_DELETE = "BASIC_DATA_DELETE";
String BASIC_MANAGE_DELETE = "BASIC_MANAGE_DELETE";
String BASIC_CORP_MAINTAIN_DELETE = "BASIC_CORP_MAINTAIN_DELETE"; //供应商字典
String BASIC_BUSINESS_TYPE_DELETE = "BASIC_BUSINESS_TYPE_DELETE"; //单据类型
String STAT_DATA_REQUEST = "STAT_DATA_REQUEST"; //汇总日报
String STAT_DATA = "STAT_DATA"; //
}

@ -1,53 +0,0 @@
package com.glxp.sale.admin.constant;
public interface TypeConstant {
String TYPE_PUT_PRODUCT = "ProduceWareHouseIn"; //生产入库
String TYPE_PUT_PURCHASE = "PurchaseWareHouseIn"; //采购入库
String TYPE_PUT_RETURN = "ReturnWareHouseIn"; //退货入库
String TYPE_PUT_ALLOT = "AllocateWareHouseIn"; //调拨入库
String TYPE_OUT_SALE = "SalesWareHouseOut"; //销售出库
String TYPE_OUT_RETURN = "ReturnWareHouseOut"; //退货出库
String TYPE_OUT_ALLOT = "AllocateWareHouseOut"; //调拨出库
String TYPE_OUT_DESTORY = "DestoryWareHouseOut"; //销毁出库
String TYPE_OUT_STRAIGHT = "DirectAllocateWareHouseOut"; //直调出库
String TYPE_OUT_REWORK = "ReworkWareHouseOut"; //返工出库
String TYPE_OUT_CHECK = "CheckWareHouseOut"; //盘点 抽检出库
String TYPE_CODE_REPLACE = "CodeReplace"; //码替换
String TYPE_CODE_DESTORY = "CodeDestory"; //码注销
String TYPE_STOCK_CHECK = "StockCheck"; //盘点
String TYPE_PUT = "WareHouseIn"; //出库
String TYPE_OUT = "WareHouseOut"; //入库
String SALE_OUT = "321"; //零售出库 (321)
String RETURNN_IN = "103"; //退货入库
String PURCHASE_IN = "102"; //采购入库
String RETURN_OUT = "202"; //退货出库
String SUPPLY_IN = "107"; //供应入库
String SUPPLY_OUT = "209"; //供应出库
String DESTORY_OUT = "205"; //销毁出库
String CHECK_OUT = "206"; //抽检出库
String VACCINE = "322"; //疫苗接种 (322)
//状态
//1.未校验已校验校验异常process,success,error
String CHECKED = "checked";//已校验
String UP_NOT_UPLOAD = "up_not_upload";// 上游未上传
String NOT_STOCK = "not_stock"; //无码库存
String UN_UPLOAD = "un_upload"; //未上传
String UPLOADED = "uploaded"; //已上传
String UPLOAD_FAIL = "upload_fail"; //上传失败
String DEAL_FAIL = "deal_fail"; //上传处理异常
String DEAL_SUCCESS = "deal_success"; //上传处理成功
String DL_FAIL = "dl_fail";//下载异常
String DOWNLOADED = "downloaded";//已完成
String FINISHED = "finished";//已完成
String REMOVE = "remove";
}

@ -4,6 +4,7 @@ import cn.hutool.core.util.StrUtil;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;
import com.glxp.sale.admin.annotation.AuthRuleAnnotation;
import com.glxp.sale.admin.entity.sync.CodeRel;
import com.glxp.sale.admin.entity.sync.PostOrderRequest;
import com.glxp.sale.admin.http.UHttpClient;
import com.glxp.sale.admin.req.sync.OrderFilterRequest;
@ -12,13 +13,11 @@ import com.glxp.sale.common.res.BaseResponse;
import com.glxp.sale.common.util.ResultVOUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.HttpHeaders;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.bind.annotation.*;
import javax.annotation.Resource;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -141,4 +140,48 @@ public class DirectConnectController {
}
@GetMapping("/udiwms/relCode/detailByParentCode")
public BaseResponse detailByParentCode(@RequestParam("parentCode") String parentCode) {
Map<String, Object> paramMap = new HashMap<>(2);
paramMap.put("parentCode", parentCode);
String response = UHttpClient.mipsGetHead(getSpmsUrl() + "/udiwms/relCode/detailByParentCode", paramMap, getHeaders());
try {
BaseResponse data = JSONObject.parseObject(response, new TypeReference<BaseResponse>() {
});
return data;
} catch (Exception e) {
e.printStackTrace();
return ResultVOUtils.error(500, "连接失败!");
}
}
@PostMapping("/spms/udchs/codeRe/list")
public BaseResponse getCodeRelDetail(@RequestBody CodeRel codeRel) {
String result = UHttpClient.postJson(getSpmsUrl() + "/spms/udchs/codeRe/list", codeRel, getHeaders());
BaseResponse<List<CodeRel>> response =
JSONObject.parseObject(result, new TypeReference<BaseResponse<List<CodeRel>>>() {
});
return response;
}
@PostMapping("/udiwms/relCode/checkCode")
public BaseResponse checkCode(@RequestBody List<String> codeList) {
String result = UHttpClient.postJson(getSpmsUrl() + "/udiwms/relCode/checkCode", codeList, getHeaders());
BaseResponse<List> response =
JSONObject.parseObject(result, new TypeReference<BaseResponse<List>>() {
});
return response;
}
@PostMapping("/spms/udchs/codeRe/update")
public BaseResponse updateCodeRelDetail(@RequestBody CodeRel codeRel) {
String result = UHttpClient.postJson(getSpmsUrl() + "/spms/udchs/codeRe/update", codeRel, getHeaders());
BaseResponse response =
JSONObject.parseObject(result, new TypeReference<BaseResponse>() {
});
return response;
}
}

@ -40,6 +40,7 @@ import org.springframework.web.client.RestTemplate;
import javax.annotation.Resource;
import javax.servlet.http.HttpServletRequest;
import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
@ -268,19 +269,15 @@ public class SpsSyncDataController {
private String readDataFile(String cacheFilePath) {
// 读取文件数据
try {
FileReader reader = new FileReader(cacheFilePath);
StringBuilder str = new StringBuilder();
int data;
while ((data = reader.read()) != -1) {
str.append((char) data);
try (BufferedReader reader = new BufferedReader(new FileReader(cacheFilePath))) {
StringBuilder str = new StringBuilder(1024);
String line;
while ((line = reader.readLine()) != null) {
str.append(line);
}
reader.close();
return str.toString();
} catch (FileNotFoundException e) {
throw new RuntimeException("系统异常,未找到对应数据文件");
} catch (IOException e) {
throw new RuntimeException(e);
} catch (IOException e) { // 3. 合并异常处理
throw new RuntimeException("文件读取失败: " + e.getMessage(), e);
}
}

@ -2,7 +2,9 @@ package com.glxp.sale.admin.controller.sync;
import cn.hutool.core.util.StrUtil;
import cn.hutool.http.HttpUtil;
import cn.hutool.json.JSONException;
import cn.hutool.json.JSONUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.glxp.sale.admin.constant.BasicProcessStatus;
@ -21,9 +23,7 @@ import com.glxp.sale.common.util.ResultVOUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.poi.ss.formula.functions.T;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpMethod;
import org.springframework.http.ResponseEntity;
import org.springframework.http.*;
import org.springframework.scheduling.annotation.Async;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
@ -37,8 +37,14 @@ import javax.servlet.http.HttpServletResponse;
import javax.servlet.http.PushBuilder;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.URISyntaxException;
import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
*
@ -173,6 +179,49 @@ public class SyncController {
return jsonObject;
}
//UDI管理系统转至阿里健康api平台
@RequestMapping("/syncToAliapi/**")
public Object syncToAliapi(HttpServletRequest request, HttpServletResponse httpServletResponse) {
String uri = uriUtils.parseUri(request.getRequestURL().toString());
String isTopService = systemParamConfigService.selectValueByParamKey("is_top_aliService");
if (StrUtil.isNotEmpty(isTopService) && isTopService.equals("1")) {
uri = uri.replace("syncToAliapi/", "");
uri = uri.replace("syncToAliapi", "");
}
String upperServerHost = systemParamConfigService.selectValueByParamKey("aliApi_server_ip");
if (StrUtil.isNotEmpty(upperServerHost)) {
if (StrUtil.isNotEmpty(uri)){
uri = upperServerHost + "/" + uri;
}else {
uri = upperServerHost;
}
}
JSONObject jsonObject = redirectAliApi(request, uri);
return jsonObject;
}
//UDI管理系统转至国家库平台
@RequestMapping("/syncToUdiDL/**")
public Object syncToUdiDL(HttpServletRequest request, HttpServletResponse httpServletResponse) {
String uri = uriUtils.parseUri(request.getRequestURL().toString());
String isTopService = systemParamConfigService.selectValueByParamKey("is_udi_dl_service");
if (StrUtil.isNotEmpty(isTopService) && isTopService.equals("1")) {
uri = uri.replace("syncToUdiDL/", "");
uri = uri.replace("syncToUdiDL", "");
}
String upperServerHost = systemParamConfigService.selectValueByParamKey("udi_dl_server_ip");
if (StrUtil.isNotEmpty(upperServerHost)) {
uri = upperServerHost + "/" + uri;
}
JSONObject jsonObject = redirectNolog(request, uri);
return jsonObject;
}
//UDI管理系统同步数据中转
@RequestMapping("/directToSpms/**")
public Object directToSpms(HttpServletRequest request, HttpServletResponse httpServletResponse) {
@ -229,6 +278,41 @@ public class SyncController {
return result;
}
private JSONObject redirectAliApi(HttpServletRequest request, String uri) {
String queryString = request.getQueryString();
Map<String, Object> queryMap = null;
try {
queryMap = parseQueryString(queryString);
} catch (URISyntaxException e) {
e.printStackTrace();
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
if (request.getMethod().equals("POST")) {
String json = HttpUtil.post(uri,queryMap);
return JSONObject.parseObject(json);
} else if (request.getMethod().equals("GET")) {
String json = HttpUtil.get(uri,queryMap);
return JSONObject.parseObject(json);
}
return new JSONObject();
}
public static Map<String, Object> parseQueryString(String queryString) throws URISyntaxException, UnsupportedEncodingException {
Map<String, Object> queryParams = new HashMap<>();
String[] pairs = queryString.split("&");
for (String pair : pairs) {
int idx = pair.indexOf("=");
String key = URLDecoder.decode(pair.substring(0, idx), StandardCharsets.UTF_8.toString());
String value = URLDecoder.decode(pair.substring(idx + 1), StandardCharsets.UTF_8.toString());
queryParams.put(key, value);
}
return queryParams;
}
private JSONObject redirect(HttpServletRequest request, String uri, String idDatas, String type) {
RestTemplate restTemplate = new RestTemplate();
HttpEntity<String> httpEntity = uriUtils.buildHeader(request);

@ -0,0 +1,80 @@
package com.glxp.sale.admin.entity.sync;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import java.util.Date;
@Data
@TableName(value = "code_rel")
public class CodeRel {
@TableId(type = IdType.AUTO)
public Integer id;
/**
*
*/
@TableField(value = "diNameCode")
public String diNameCode;
/**
*
*/
@TableField(value = "drugCode")
public String drugCode;
/**
*
*/
@TableField(value = "ybbm")
public String ybbm;
/**
*
*/
@TableField(value = "ycCode")
public String ycCode;
/**
*
*/
@TableField(value = "sptm")
public String sptm;
/**
*
*/
@TableField(value = "ypbwm")
public String ypbwm;
/**
*
*/
@TableField(value = "tyshxyh")
public String tyshxyh;
/**
* 9
*/
@TableField(value = "gjbm")
public String gjbm;
@TableField(value = "status")
public String status;
@TableField(value = "updateTime")
public Date updateTime;
@TableField(value = "createTime")
public Date createTime;
@TableField(value = "fromType")
public Integer fromType;
}

@ -0,0 +1,21 @@
package com.glxp.sale.admin.entity.sync;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.Objects;
/**
* websocket
*/
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class SocketMsgEntity {
private String type;
private Object content;
private String remark;
}

@ -52,6 +52,7 @@ public class FileServiceImpl implements FileService {
@Resource
private IdcService idcService;
private String imagePath = "register/file/image2/";
private String devImagePath = "register/file/device/";
@Override
public BaseResponse fileUpload(HttpServletRequest request,Map<String,Object> params) {
String host="";
@ -128,13 +129,13 @@ public class FileServiceImpl implements FileService {
ArrayList<String> saveFiles = new ArrayList<>();
Date startTime = new Date();
if (files != null) {
if(!FileUtils.makeDirectory(filePath + filePathSlash + imagePath))
IDCUtils.createDirectory(filePath + filePathSlash + imagePath);
if(!FileUtils.makeDirectory(filePath + filePathSlash + devImagePath))
IDCUtils.createDirectory(filePath + filePathSlash + devImagePath);
try {
for (MultipartFile file : files) {
String imageName = filePath + filePathSlash + imagePath + file.getOriginalFilename();
String imageName = filePath + filePathSlash + devImagePath + file.getOriginalFilename();
saveFiles.add(imageName);
IDCUtils.writeFile(file.getBytes(), filePath + filePathSlash + imagePath, file.getOriginalFilename());
IDCUtils.writeFile(file.getBytes(), filePath + filePathSlash + devImagePath, file.getOriginalFilename());
}
} catch (IOException e) {
// TODO Auto-generated catch block

@ -86,6 +86,7 @@ public class IdcServiceImpl implements IdcService {
private ScheduledDao scheduledDao;
private String imagePath = "register/file/image2/";
private String devImagePath = "register/file/device/";
/*获取拉取任务列表*/
@Override
@ -627,24 +628,45 @@ public class IdcServiceImpl implements IdcService {
@Override
public void downloadFile(String fileName, HttpServletResponse response) {
OutputStream os;
String filePathSlash = filePath.substring(filePath.length() - 1).equals("/") ? "" : fileName.substring(0, 1).equals("/") ? "" : "/";
String sourceFileName = filePath + filePathSlash + imagePath + fileName;
// String sourceFileName = fileName;
try {
if (FileUtils.isFileExist(sourceFileName)) {
byte[] bytes = FileUtils.readFileByBytes(sourceFileName);
os = response.getOutputStream();
os.write(bytes);
os.flush();
os.close();
String sourceFileName2 = filePath + filePathSlash + devImagePath + fileName;
try (OutputStream os = response.getOutputStream()) {
String targetFile = null;
// 优先级检查文件列表(按优先级顺序)
String[] checkFiles = {sourceFileName2, sourceFileName};
for (String file : checkFiles) {
if (FileUtils.isFileExist(file)) {
targetFile = file;
break;
}
}
if (targetFile != null) {
// 读取文件并写入响应流
byte[] bytes = FileUtils.readFileByBytes(targetFile);
if (bytes != null) {
os.write(bytes);
os.flush(); // 强制刷新缓冲区
} else {
response.setStatus(HttpServletResponse.SC_NOT_FOUND); // 404
logger.error("文件内容为空: {}", targetFile);
}
} else {
logger.error("file not exists:" + sourceFileName);
// 处理文件不存在的情况(示例)
response.setStatus(HttpServletResponse.SC_NOT_FOUND);
os.write("File not found".getBytes());
}
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
response.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR); // 500
logger.error("写入响应流失败: ", e);
}
}
private void saveUploadStatus(Map<String, Object> params) {

@ -0,0 +1,34 @@
package com.glxp.sale.admin.socket.client;
import com.glxp.sale.admin.socket.server.SpsSyncWebSocket;
import org.jfree.util.Log;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
@Service
public class SocketMsgService {
@Value("${SPMS_WEBSOCKET_TOKEN}")
String token;
@Resource
SpsSyncWebSocket spsSyncWebSocket;
public void sendNoticeMsg(String message) {
spsSyncWebSocket.sendMessage(message, "2:" + token);
}
public void sendDeleteMsg(String message) {
spsSyncWebSocket.sendMessage(message, "2:" + token);
}
public void sendGetMsg(String message) {
Log.error("发送管理系统的信息{}"+message);
spsSyncWebSocket.sendMessage(message, "2:" + token);
}
}

@ -0,0 +1,127 @@
package com.glxp.sale.admin.socket.client;
import com.glxp.sale.admin.constant.SocketMsgType;;
import org.java_websocket.WebSocket;
import org.java_websocket.client.WebSocketClient;
import org.java_websocket.framing.Framedata;
import org.java_websocket.handshake.ServerHandshake;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
public class SpsWebSocketClient extends WebSocketClient {
private String excptMessage;
Logger log = LoggerFactory.getLogger(SpsWebSocketClient.class);
private static List<SpsWebSocketClient> list = new ArrayList<>();
@Resource
SocketMsgService socketMsgService;
public void setSocketMsgService(SocketMsgService socketMsgService) {
this.socketMsgService = socketMsgService;
}
public SpsWebSocketClient(String serverUri) throws URISyntaxException {
super(new URI(serverUri));
this.setConnectionLostTimeout(0);
if (list.isEmpty()) {
return;
}
for (SpsWebSocketClient client : list) {
client.close();
}
list.clear();
list.add(this);
}
@Override
public void onOpen(ServerHandshake serverHandshake) {
log.info("在线日志socket连接成功");
}
@Override
public synchronized void onMessage(String s) {
log.info("收到消息:" + s);
//收到更新下载数据则赋值由外部处理,多个任务只处理一次避免重复处理
if (s.contains(SocketMsgType.DL_ALL_DATA)) {
this.excptMessage = s;
} else if (s.contains(SocketMsgType.BASIC_DATA_DELETE)) {
socketMsgService.sendDeleteMsg(s);
} else if (s.contains(SocketMsgType.BASIC_MANAGE_DELETE)) {
socketMsgService.sendDeleteMsg(s);
} else if (s.contains(SocketMsgType.BASIC_CORP_MAINTAIN_DELETE)) {
socketMsgService.sendDeleteMsg(s);
} else if (s.contains(SocketMsgType.BASIC_BUSINESS_TYPE_DELETE)) {
socketMsgService.sendDeleteMsg(s);
}else if (s.contains(SocketMsgType.STAT_DATA)) {
socketMsgService.sendGetMsg(s);
} else {
//通知类消息则需转发
socketMsgService.sendNoticeMsg(s);
}
}
@Override
public void onClose(int i, String s, boolean b) {
log.info("在线日志socket断开");
}
@Override
public void onError(Exception e) {
e.printStackTrace();
}
@Override
public void onWebsocketPing(WebSocket conn, Framedata f) {
try {
Thread.sleep(1000 * 5);
} catch (InterruptedException e) {
e.printStackTrace();
}
send("---pong---");
}
@Deprecated
public static void destroy() {
if (list.isEmpty())
return;
for (SpsWebSocketClient client : list) {
client.close();
}
list.clear();
}
//发送消息
public void sendMessage(String message) {
this.send(message);
System.out.println("已发送消息:" + message);
}
//获取接收到的信息
public String getExcptMessage() {
if (excptMessage != null) {
String message = new String(excptMessage);
excptMessage = null;
return message;
}
return null;
}
}

@ -0,0 +1,102 @@
package com.glxp.sale.admin.socket.client;
import cn.hutool.core.thread.ThreadUtil;
import com.glxp.sale.admin.constant.SocketMsgType;
import com.glxp.sale.admin.service.param.SystemParamConfigService;
import com.glxp.sale.admin.thread.HeartTaskService;
import lombok.extern.slf4j.Slf4j;
import org.java_websocket.enums.ReadyState;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.net.URISyntaxException;
@Slf4j
@Component
public class WebSocketComponent implements CommandLineRunner {
@Value("${SPMS_WEBSOCKET_TOKEN}")
private String socketToken;
@Resource
SystemParamConfigService systemParamConfigService;
@Resource
HeartTaskService heartTaskService;
@Resource
SocketMsgService socketMsgService;
private SpsWebSocketClient client;
@Override
public void run(String... args) throws Exception {
ThreadUtil.execAsync(() -> {
String ip = systemParamConfigService.selectValueByParamKey("upper_server_ip");
ip = ip.replace("http://", "");
try {
SpsWebSocketClient client = new SpsWebSocketClient("ws://" + ip + "/spms/sync/1/" + socketToken);
client.setSocketMsgService(socketMsgService);
initConnect(client);
//等待WebSocket服务端响应
String message = null;
while (true) {
while ((message = client.getExcptMessage()) == null) {
log.info("已连接,等待接收数据--------");
Thread.sleep(1000);
if (client.isClosed()) {
initConnect(client);
}
}
if (message.contains(SocketMsgType.DL_ALL_DATA)) {
heartTaskService.dlAllData();
}
//打印服务端返回的数据
log.info("成功获取数据:" + message);
}
} catch (URISyntaxException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
);
}
// 添加一个成员变量来保存 SpsWebSocketClient 实例
// 提供公共方法来访问 SpsWebSocketClient 实例
public SpsWebSocketClient getClient() {
if (client == null) {
log.error("WebSocket client is not initialized");
throw new IllegalStateException("WebSocket client is not initialized");
}
return client;
}
public void initConnect(SpsWebSocketClient client) {
if (client == null && client.isOpen()) {
log.info("WebSocket已连接不需要重连");
return;
}
log.info("重新建立连接");
client.connect();
while (!client.getReadyState().equals(ReadyState.OPEN)) {
log.info("连接中···请稍后");
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (client.getReadyState().equals(ReadyState.OPEN)) {
return;
} else if (client.getReadyState().equals(ReadyState.CLOSED)) {
client.reconnect();
}
}
}
}

@ -0,0 +1,182 @@
package com.glxp.sale.admin.socket.server;
import cn.hutool.json.JSONUtil;
import com.glxp.sale.admin.entity.sync.SocketMsgEntity;
import com.glxp.sale.admin.service.param.SystemParamConfigService;
import com.glxp.sale.admin.socket.client.SocketMsgService;
import com.glxp.sale.admin.socket.client.SpsWebSocketClient;
import com.glxp.sale.admin.socket.client.WebSocketComponent;
import com.glxp.sale.admin.util.ByteArraySplitter;
import com.glxp.sale.common.res.BaseResponse;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@Component
@ServerEndpoint(value = "/sps/web/sync/{type}/{token}")
@Slf4j
public class SpsSyncWebSocket {
//与某个客户端的连接会话,需要通过它来给客户端发送数据
private Session session;
@Value("${SPMS_WEBSOCKET_TOKEN}")
private String socketToken;
private static int onlineCount = 0;
private static ConcurrentHashMap<String, SpsSyncWebSocket> socketServersMap = new ConcurrentHashMap<>();
private String key = "";
/**
*
*
* @param session
* @param type typeregister/verify
*/
@OnOpen
public void onOpen(Session session, @PathParam("type") String type, @PathParam("token") String token) {
this.session = session;
key = type + ":" + token + ":" + session.getId();
log.info("onOpen->>key:{}", key);
boolean flag = socketServersMap.containsKey(key);
if (flag) {
socketServersMap.remove(key);
socketServersMap.put(key, this);
} else {
socketServersMap.put(key, this);
addOnlineCount();
}
// if (StrUtil.isEmpty(token) || !token.equals(socketToken)) {
// throw new ServiceException("token验证异常");
// }
log.info("online number:{}", getOnlineCount());
}
@Autowired
WebSocketComponent webSocketComponent;
@OnMessage
public void onMessage(String message, Session session) {
//收到管理系统响应的信息
log.info("来自客户端的消息->>message:{};sessionId:{}", message, session.getId());
//TODO 已经接受到消息 准备转发给spms 自助平台
}
@OnClose
public void onClose() {
log.info("onClose : {}" + key);
if (socketServersMap.containsKey(key)) {
socketServersMap.remove(key);
subOnlineCount(); //在线数减1
}
log.info("有一连接关闭!当前在线连接数为: {}", getOnlineCount());
}
@OnError
public void onError(Throwable error) {
socketServersMap.remove(key);
subOnlineCount();
log.error("webSocket连接发生错误->>errorMessage:{}", error.getMessage());
}
/**
* @param message
* @param sessionId token
*/
public synchronized void
sendMessage(String message, String sessionId) {
synchronized (this.getClass()) {
for (Map.Entry<String, SpsSyncWebSocket> stringMyWebSocketEntry : socketServersMap.entrySet()) {
try {
String key = stringMyWebSocketEntry.getKey();
SpsSyncWebSocket value = stringMyWebSocketEntry.getValue();
if (key.contains(sessionId)) {
log.info("推送的消息为:" + key);
value.session.getBasicRemote().sendText(message);
// List<byte[]> result = ByteArraySplitter.split(message, 64 * 1024);
// for (byte[] bytes : result) {
// String s = new String(bytes);
// value.session.getBasicRemote().sendText(s);
// }
// 推送结束符
// value.session.getBasicRemote().sendText("#end#");
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
/**
* @param socketMsgEntity ,
* @param sessionId token
*/
public synchronized void sendMessage(SocketMsgEntity socketMsgEntity, String sessionId) {
synchronized (this.getClass()) {
for (Map.Entry<String, SpsSyncWebSocket> stringMyWebSocketEntry : socketServersMap.entrySet()) {
try {
String key = stringMyWebSocketEntry.getKey();
SpsSyncWebSocket value = stringMyWebSocketEntry.getValue();
if (key.contains(sessionId)) {
log.info("推送的消息为:" + key);
value.session.getBasicRemote().sendText(JSONUtil.toJsonStr(socketMsgEntity));
// List<byte[]> result = ByteArraySplitter.split(message, 64 * 1024);
// for (byte[] bytes : result) {
// String s = new String(bytes);
// value.session.getBasicRemote().sendText(s);
// }
// 推送结束符
// value.session.getBasicRemote().sendText("#end#");
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
/**
* 线
*
* @return
*/
public static synchronized int getOnlineCount() {
return onlineCount;
}
/**
* 线+1
*/
public static synchronized void addOnlineCount() {
SpsSyncWebSocket.onlineCount++;
}
/**
* 线-1
*/
public static synchronized void subOnlineCount() {
SpsSyncWebSocket.onlineCount--;
}
}

@ -1,7 +1,6 @@
package com.glxp.sale.admin.thread;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.IdUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.json.JSONUtil;
import com.alibaba.fastjson.JSON;
@ -13,30 +12,30 @@ import com.glxp.sale.admin.dto.RelaySyncDto;
import com.glxp.sale.admin.entity.param.SystemParamConfigEntity;
import com.glxp.sale.admin.entity.sync.BasicExportStatusEntity;
import com.glxp.sale.admin.entity.sync.ProductInfoEntity;
import com.glxp.sale.admin.entity.sync.SocketMsgEntity;
import com.glxp.sale.admin.entity.sync.UdiCompanyEntity;
import com.glxp.sale.admin.http.SpGetHttp;
import com.glxp.sale.admin.idc.service.IdcService;
import com.glxp.sale.admin.req.sync.BasicExportStatusRequest;
import com.glxp.sale.admin.req.sync.SpsSyncDataRequest;
import com.glxp.sale.admin.res.sync.*;
import com.glxp.sale.admin.res.sync.SpSyncUdiResponse;
import com.glxp.sale.admin.res.sync.SpsSyncDataResponse;
import com.glxp.sale.admin.res.sync.SpsSyncScheduleResponse;
import com.glxp.sale.admin.service.param.SystemParamConfigService;
import com.glxp.sale.admin.service.sync.BasicExportService;
import com.glxp.sale.admin.socket.server.SpsSyncWebSocket;
import com.glxp.sale.admin.thread.didl.AsyncDiDlHelper;
import com.glxp.sale.admin.util.*;
import com.glxp.sale.common.enums.ResultEnum;
import com.glxp.sale.common.res.BaseResponse;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;
import java.awt.*;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
@ -260,7 +259,7 @@ public class DlAllDataService {
*/
@Transactional(rollbackFor = Exception.class)
public void pullData(BasicExportTypeEnum exportType) {
switch (exportType){
switch (exportType) {
case IO_ORDER:
case COUNTRY_DI_DATA:
case SYS_SET_DATA:
@ -291,30 +290,43 @@ public class DlAllDataService {
idcService.batchDownloadFile(spGetHttp.getSpmsUrl(), list.toArray(new String[list.size()]));
}
if (StrUtil.isNotBlank(bean.getTaskId())) {
// 将数据写入文件
// try {
FileUtils.makeDirectory(filePath + "sync/");
String fileName = filePath + "sync/" + exportType.getRemark() + "-" + bean.getTaskId() + ".udi";
FileUtils.SaveFileAs(bean.getFileContent(), fileName);
// } catch (IOException e) {
// log.error("从UDI自助平台拉取基础数据 文件[{}]写入异常----{}",bean.getFilePath(), e.getMessage());
// throw new RuntimeException(e);
// }
BasicExportStatusEntity exportStatusEntity = BasicExportStatusEntity.builder()
.id(bean.getTaskId())
.taskId(bean.getTaskId())
.status(BasicExportStatusEnum.WAIT_SYNC.getCode())
.type(exportType.getRemark())
.transportType(1)
.scheduleType(1)
.fromType(1).remark(remark)
.startTime(DateUtil.getDateTime())
.updateTime(new Date())
.cacheFilePath(fileName)
.build();
basicExportService.insertExportStatus(exportStatusEntity);
//通知自助平台任务已完成
spGetHttp.finishTask(bean.getTaskId());
if (StrUtil.isNotBlank(bean.getFileContent()) && !bean.getFileContent().contains("系统繁忙")) {
FileUtils.makeDirectory(filePath + "sync/");
String fileName = filePath + "sync/" + exportType.getRemark() + "-" + bean.getTaskId() + ".udi";
FileUtils.SaveFileAs(bean.getFileContent(), fileName);
BasicExportStatusEntity exportStatusEntity = BasicExportStatusEntity.builder()
.id(bean.getTaskId())
.taskId(bean.getTaskId())
.status(BasicExportStatusEnum.WAIT_SYNC.getCode())
.type(exportType.getRemark())
.transportType(1)
.scheduleType(1)
.fromType(1).remark(remark)
.startTime(DateUtil.getDateTime())
.updateTime(new Date())
.cacheFilePath(fileName)
.build();
basicExportService.insertExportStatus(exportStatusEntity);
//通知自助平台任务已完成
spGetHttp.finishTask(bean.getTaskId());
} else {
BasicExportStatusEntity exportStatusEntity = BasicExportStatusEntity.builder()
.id(bean.getTaskId())
.taskId(bean.getTaskId())
.status(BasicExportStatusEnum.WAIT_SYNC.getCode())
.type(exportType.getRemark())
.transportType(1)
.scheduleType(1)
.fromType(1).remark("异常数据,空数据")
.startTime(DateUtil.getDateTime())
.updateTime(new Date())
.cacheFilePath("")
.build();
basicExportService.insertExportStatus(exportStatusEntity);
spGetHttp.finishTask(bean.getTaskId());
}
}
}
}
@ -501,6 +513,7 @@ public class DlAllDataService {
spGetHttp.postUpdateBasicStatus(basicExportStatusEntity1);
spGetHttp.updateLastTime("AutoDownloadDiProducts", DateUtil.formatDate(System.currentTimeMillis(), "yyyy-MM-dd HH:mm:ss"));
} else {
SpsSyncDataRequest spsSyncDataRequest = new SpsSyncDataRequest();
spsSyncDataRequest.setId(basicExportStatusEntity.getId());

@ -22,143 +22,120 @@ import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@Component
@EnableScheduling
public class HeartTask implements SchedulingConfigurer {
final Logger logger = LoggerFactory.getLogger(HeartTask.class);
// 使用与HeartTaskService一致的常量名
private static final String REDIS_DOWNLOAD_STATUS_KEY = "SC_UDIINFO_DOWNLOAD_STATUS";
// 用于确保单个JVM实例内的并发安全
private final Lock processLock = new ReentrantLock();
@Resource
protected ScheduledDao scheduledDao;
@Resource
HeartTaskService heartTaskService;
@Resource
SystemParamConfigService systemParamConfigService;
@Resource
RedisUtil redisUtil;
@Resource
ScanUploadService scanUploadService;
@Resource
ScanDownloadService scanDownloadService;
@Resource
DlAllDataService dlAllDataService;
@Resource
AsyncDiDlService asyncDiDlService;
@Override
public void configureTasks(ScheduledTaskRegistrar scheduledTaskRegistrar) {
scheduledTaskRegistrar.addTriggerTask(() -> process(), triggerContext -> {
ScheduledRequest scheduledRequest = new ScheduledRequest();
scheduledRequest.setCronName("heartTask");
ScheduledEntity scheduledEntity = scheduledDao.findScheduled(scheduledRequest);
String cron = scheduledEntity.getCron();
if (cron.isEmpty()) {
logger.error("cron is null");
try {
ScheduledRequest scheduledRequest = new ScheduledRequest();
scheduledRequest.setCronName("heartTask");
ScheduledEntity scheduledEntity = scheduledDao.findScheduled(scheduledRequest);
// 健壮性检查cron表达式
if (scheduledEntity == null) {
logger.error("未找到heartTask的计划任务配置");
return new CronTrigger("0 0/10 * * * ?").nextExecutionTime(triggerContext);
}
String cron = scheduledEntity.getCron();
if (cron == null || cron.isEmpty()) {
logger.error("cron表达式为空使用默认配置");
return new CronTrigger("0 0/10 * * * ?").nextExecutionTime(triggerContext);
}
return new CronTrigger(cron).nextExecutionTime(triggerContext);
} catch (Exception e) {
logger.error("获取cron表达式异常使用默认配置", e);
return new CronTrigger("0 0/10 * * * ?").nextExecutionTime(triggerContext);
}
return new CronTrigger(cron).nextExecutionTime(triggerContext);
});
}
private void process() {
boolean lockAcquired = false;
try {
// 尝试获取锁避免同一JVM内多线程同时执行
lockAcquired = processLock.tryLock(5, TimeUnit.SECONDS);
if (!lockAcquired) {
logger.warn("心跳任务锁获取失败,可能有其他线程正在执行");
return;
}
logger.info("------------心跳任务-----------------");
dlData();
}
//定时从上游下载数据
private void dlData() {
SystemParamConfigEntity upConnect = systemParamConfigService.selectByParamKey("sync_upstream_enable");
if (upConnect != null && upConnect.getParamValue().equals("1")) {
dlAllData();
scanUpload();
}
SystemParamConfigEntity donwConnect = systemParamConfigService.selectByParamKey("sync_downstream_enable");
if (donwConnect != null && donwConnect.getParamValue().equals("1")) {
scanDonwload();
}
}
private void scanUpload() {
SystemParamConfigEntity systemParamConfigEntity = systemParamConfigService.selectByParamKey("sc_udiinfo_upload");
long timeInterval = Long.parseLong(systemParamConfigEntity.getParamValue()) * 60 * 1000;
long curTime = System.currentTimeMillis();
//定时扫描
Long lastTime = (Long) redisUtil.get("UPLOAD_UDIINFO_STATUS");
if (lastTime == null) {
lastTime = System.currentTimeMillis();
redisUtil.set("UPLOAD_UDIINFO_STATUS", lastTime, 30 * 60);
}
if (curTime - lastTime > timeInterval) {
redisUtil.set("UPLOAD_UDIINFO_STATUS", curTime);
scanUploadService.scanAllDatas();
scanUploadService.scanAllBus();
scanUploadService.scanAllOrders();
scanUploadService.scanAllSchedule();
}
}
private void scanDonwload() {
SystemParamConfigEntity upConnect = systemParamConfigService.selectByParamKey("sync_downstream_enable");
if (upConnect.getParamValue().equals("1")) {
logger.info("------------心跳任务开始-----------------");
SystemParamConfigEntity systemParamConfigEntity = systemParamConfigService.selectByParamKey("sc_udiinfo_status");
long timeInterval = Long.parseLong(systemParamConfigEntity.getParamValue()) * 60 * 1000;
long curTime = System.currentTimeMillis();
Long lastTime = (Long) redisUtil.get("SC_UDIINFO_DOWNLOAD_STATUS");
if (lastTime == null) {
lastTime = System.currentTimeMillis();
redisUtil.set("SC_UDIINFO_DOWNLOAD_STATUS", lastTime);
}
if (curTime - lastTime > timeInterval) {
redisUtil.set("SC_UDIINFO_DOWNLOAD_STATUS", curTime);
scanDownloadService.scanAllData();
scanDownloadService.scanAllBus();
scanDownloadService.scanAllOrder();
scanDownloadService.scanScheduleList();
scanDownloadService.scanUdis();
if (systemParamConfigEntity == null || systemParamConfigEntity.getParamValue() == null) {
logger.error("系统参数sc_udiinfo_status未配置");
return;
}
}
}
private void dlAllData() {
SystemParamConfigEntity systemParamConfigEntity = systemParamConfigService.selectByParamKey("sc_udiinfo_status");
long timeInterval = Long.parseLong(systemParamConfigEntity.getParamValue()) * 6 * 1000;
long curTime = System.currentTimeMillis();
Long lastTime = (Long) redisUtil.get("SC_UDIINFO_DOWNLOAD_STATUS");
if (lastTime == null) {
try {
lastTime = DateUtil.timeToStamp("1949-01-01 00:00:00");
// 确保使用正确的乘数 10
long timeInterval = Long.parseLong(systemParamConfigEntity.getParamValue()) * 10 * 1000;
long curTime = System.currentTimeMillis();
Long lastTime = (Long) redisUtil.get(REDIS_DOWNLOAD_STATUS_KEY);
if (lastTime == null) {
try {
lastTime = DateUtil.timeToStamp("1949-01-01 00:00:00");
} catch (Exception e) {
logger.error("时间戳转换失败", e);
lastTime = 0L; // 使用安全默认值
}
// 首次初始化Redis时间戳
redisUtil.set(REDIS_DOWNLOAD_STATUS_KEY, lastTime);
}
if (curTime - lastTime > timeInterval) {
logger.info("时间间隔已满足,开始执行数据下载");
// 直接调用heartTaskService.dlAllData()
// 注意不在此处更新REDIS_DOWNLOAD_STATUS_KEY避免与HeartTaskService的冲突
// 由HeartTaskService.dlAllData()负责更新REDIS_DOWNLOAD_STATUS_KEY
heartTaskService.dlAllData();
} else {
logger.info("时间未到,距离下次执行还有{}毫秒", timeInterval - (curTime - lastTime));
}
} catch (NumberFormatException e) {
logger.error("解析时间间隔参数失败: {}", systemParamConfigEntity.getParamValue(), e);
} catch (Exception e) {
e.printStackTrace();
logger.error("心跳任务执行异常", e);
}
redisUtil.set("SC_UDIINFO_DOWNLOAD_STATUS", lastTime);
}
if (curTime - lastTime > timeInterval) {
logger.info("定时从上游下载全部据-----");
redisUtil.set("SC_UDIINFO_DOWNLOAD_STATUS", curTime);
String doing = (String) redisUtil.get("is_doing_download");
if (doing == null || doing.equals("false")) {
redisUtil.set("is_doing_download", "true", 60);
// dlAllDataService.dllNewAllData();
dlAllDataService.dllNewAllOrder();
Arrays.stream(BasicExportTypeEnum.values()).forEach(i -> {
dlAllDataService.pullData(i);
});
// dlAllDataService.dllNewAllBusType();
// dlAllDataService.dlScheduleStatus();
dlAllDataService.dlAllDi();
redisUtil.set("is_doing_download", "false");
logger.info("------------心跳任务结束-----------------");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.error("获取心跳任务锁时被中断", e);
} finally {
if (lockAcquired) {
processLock.unlock();
}
logger.info("定时从上游下载全部据-----结束");
}
}
}

@ -0,0 +1,127 @@
package com.glxp.sale.admin.thread;
import com.glxp.sale.admin.constant.BasicExportTypeEnum;
import com.glxp.sale.admin.constant.SocketMsgType;
import com.glxp.sale.admin.entity.param.SystemParamConfigEntity;
import com.glxp.sale.admin.entity.sync.SocketMsgEntity;
import com.glxp.sale.admin.service.param.SystemParamConfigService;
import com.glxp.sale.admin.socket.server.SpsSyncWebSocket;
import com.glxp.sale.admin.util.RedisUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.Arrays;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
@Service
public class HeartTaskService {
private static final Logger logger = LoggerFactory.getLogger(HeartTaskService.class);
// 确保常量命名与HeartTask保持一致
private static final String REDIS_DOWNLOAD_STATUS_KEY = "SC_UDIINFO_DOWNLOAD_STATUS";
private static final String REDIS_UPLOAD_STATUS_KEY = "UPLOAD_UDIINFO_STATUS";
private static final String DOWNLOAD_LOCK_KEY = "is_doing_download";
// Redis锁过期时间(秒):设置合理的超时时间,避免任务卡死导致锁无法释放
private static final int LOCK_EXPIRY_SECONDS = 30 * 60; // 30分钟
@Resource
SystemParamConfigService systemParamConfigService;
@Resource
RedisUtil redisUtil;
@Resource
ScanUploadService scanUploadService;
@Resource
ScanDownloadService scanDownloadService;
@Resource
DlAllDataService dlAllDataService;
@Resource
SpsSyncWebSocket spsSyncWebSocket;
@Value("${SPMS_WEBSOCKET_TOKEN}")
private String socketToken;
/**
*
* HeartTask
* Redis
*/
public void dlAllData() {
logger.info("定时从上游下载全部据-----开始");
// 先更新时间戳,防止其他实例重复执行
// 此处的时间戳更新会被HeartTask及其他服务读取用于判断是否执行
redisUtil.set(REDIS_DOWNLOAD_STATUS_KEY, System.currentTimeMillis());
// 生成唯一的锁标识,用于安全释放锁(避免释放其他实例的锁)
String lockValue = UUID.randomUUID().toString();
boolean lockAcquired = false;
try {
// 检查是否已有下载任务在进行
String existingLock = (String) redisUtil.get(DOWNLOAD_LOCK_KEY);
if (existingLock != null && !existingLock.equals("false")) {
logger.info("已有下载任务正在执行中,跳过本次执行");
return;
}
// 尝试获取锁,使用唯一值标识锁的拥有者
lockAcquired = redisUtil.set(DOWNLOAD_LOCK_KEY, lockValue, LOCK_EXPIRY_SECONDS);
if (!lockAcquired) {
logger.warn("无法获取下载任务锁,可能有其他进程正在执行");
return;
}
logger.info("成功获取下载锁开始执行数据下载任务锁ID: {}", lockValue);
try {
// 执行订单数据下载
dlAllDataService.dllNewAllOrder();
// 执行其他类型数据下载
Arrays.stream(BasicExportTypeEnum.values()).forEach(i -> {
try {
dlAllDataService.pullData(i);
} catch (Exception e) {
logger.error("下载数据类型[{}]时发生异常", i, e);
}
});
// 注释掉的DI下载保留注释
// dlAllDataService.dlAllDi();
// 发送WebSocket消息通知
try {
spsSyncWebSocket.sendMessage(
SocketMsgEntity.builder()
.type(SocketMsgType.DL_ALL_DATA)
.content("")
.remark("下载基础信息")
.build(),
"2:" + socketToken
);
} catch (Exception e) {
logger.error("发送WebSocket消息失败", e);
}
logger.info("数据下载任务执行完成");
} catch (Exception e) {
logger.error("执行数据下载任务过程中发生异常", e);
}
} finally {
// 确保无论如何都释放锁,但只释放自己的锁
if (lockAcquired) {
// 检查锁是否仍然是我们设置的值,避免释放他人的锁
String currentLock = (String) redisUtil.get(DOWNLOAD_LOCK_KEY);
if (lockValue.equals(currentLock)) {
boolean released = redisUtil.set(DOWNLOAD_LOCK_KEY, "false");
if (!released) {
logger.error("释放下载任务锁失败");
} else {
logger.info("成功释放下载锁锁ID: {}", lockValue);
}
} else {
logger.warn("锁已被其他进程修改,不进行释放,当前锁值: {}, 期望锁值: {}", currentLock, lockValue);
}
}
logger.info("定时从上游下载全部据-----结束");
}
}
}

@ -1,24 +1,15 @@
package com.glxp.sale.admin.thread.didl;
import com.alibaba.fastjson.JSON;
import com.glxp.sale.admin.constant.FileConstant;
import com.glxp.sale.admin.entity.param.SystemParamConfigEntity;
import com.glxp.sale.admin.entity.sync.BasicExportStatusEntity;
import com.glxp.sale.admin.entity.sync.UdiCompanyEntity;
import com.glxp.sale.admin.service.param.SystemParamConfigService;
import com.glxp.sale.admin.service.sync.BasicExportService;
import com.glxp.sale.admin.util.FileUtils;
import com.glxp.sale.admin.util.RedisUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
@Service
public class AsyncCompanyDlService {
@ -41,41 +32,41 @@ public class AsyncCompanyDlService {
@Async
public void asyncDiByTime(String updateTime) {
SystemParamConfigEntity systemParamConfigEntity = systemParamConfigService.selectByParamKey("sync_upstream_enable");
if (systemParamConfigEntity.getParamValue().equals("1")) {
List<UdiCompanyEntity> mUdiCompanyEntities = new ArrayList<>();
int page = 1;
int limit = 300;
while (true) {
logger.info("更新时间:" + updateTime + "----" + page + "----" + limit);
List<UdiCompanyEntity> udiCompanyEntities = asyncDiDlHelper.dlCompanyByTime(page, limit, updateTime);
if (udiCompanyEntities != null && udiCompanyEntities.size() > 0) {
mUdiCompanyEntities.addAll(udiCompanyEntities);
if (udiCompanyEntities.size() < limit) {
break;
} else {
page++;
}
} else {
break;
}
}
logger.info("更新时间:" + updateTime + "----" + "下载结束");
SystemParamConfigEntity downstream = systemParamConfigService.selectByParamKey("sync_downstream_enable");
String key = "UdiCompanys" + System.currentTimeMillis();
String datas = JSON.toJSON(mUdiCompanyEntities).toString();
if (downstream.getParamValue().equals("1")) {
redisUtil.set(key, datas);
BasicExportStatusEntity myEntity = new BasicExportStatusEntity();
myEntity.setId(key);
basicExportService.insertExportStatus(myEntity);
} else {
String path = getPath();
String fileName = path + FileConstant.upWaitCopy_products + "UdiCompanys_" + key + ".LowerIn";
FileUtils.SaveFileAs(datas, fileName);
}
}
// SystemParamConfigEntity systemParamConfigEntity = systemParamConfigService.selectByParamKey("sync_upstream_enable");
// if (systemParamConfigEntity.getParamValue().equals("1")) {
// List<UdiCompanyEntity> mUdiCompanyEntities = new ArrayList<>();
// int page = 1;
// int limit = 300;
// while (true) {
// logger.info("更新时间:" + updateTime + "----" + page + "----" + limit);
// List<UdiCompanyEntity> udiCompanyEntities = asyncDiDlHelper.dlCompanyByTime(page, limit, updateTime);
// if (udiCompanyEntities != null && udiCompanyEntities.size() > 0) {
// mUdiCompanyEntities.addAll(udiCompanyEntities);
// if (udiCompanyEntities.size() < limit) {
// break;
// } else {
// page++;
// }
// } else {
// break;
// }
// }
// logger.info("更新时间:" + updateTime + "----" + "下载结束");
//
// SystemParamConfigEntity downstream = systemParamConfigService.selectByParamKey("sync_downstream_enable");
// String key = "UdiCompanys" + System.currentTimeMillis();
// String datas = JSON.toJSON(mUdiCompanyEntities).toString();
// if (downstream.getParamValue().equals("1")) {
// redisUtil.set(key, datas);
// BasicExportStatusEntity myEntity = new BasicExportStatusEntity();
// myEntity.setId(key);
// basicExportService.insertExportStatus(myEntity);
// } else {
// String path = getPath();
// String fileName = path + FileConstant.upWaitCopy_products + "UdiCompanys_" + key + ".LowerIn";
// FileUtils.SaveFileAs(datas, fileName);
// }
// }
}

@ -0,0 +1,48 @@
package com.glxp.sale.admin.util;
import cn.hutool.core.util.ArrayUtil;
import com.beust.jcommander.internal.Lists;
import org.apache.commons.lang3.StringUtils;
import java.util.List;
public class ByteArraySplitter {
/**
* StringList<byte[]>
*
* @param source
* @param size
* @return
*/
public static List<byte[]> split(String source, int size) {
// 存放最终结果
List<byte[]> result = Lists.newArrayList();
if (StringUtils.isEmpty(source)) {
return null;
}
byte[] sourceBytes = source.getBytes();
if (size > sourceBytes.length) {
result.add(sourceBytes);
return result;
}
// 开始进行split
int startIndex = 0;
int endIndex = sourceBytes.length - 1;
boolean isRunning = true;
while (isRunning) {
if ((endIndex + 1) - startIndex > size) {
result.add(ArrayUtil.sub(sourceBytes, startIndex, startIndex + size));
startIndex += size;
} else {
result.add(ArrayUtil.sub(sourceBytes, startIndex, endIndex + 1));
isRunning = false;
}
}
return result;
}
}

@ -23,7 +23,7 @@ server.connectionTimeout=180000
# \u8F93\u51FA\u65E5\u5FD7\u5230\u9879\u76EE\u6839\u76EE\u5F55\u4E0B\u7684springboot.log\u6587\u4EF6\u4E2D // \u914D\u7F6E logback-spring.xml\u65F6 \u6B64\u65E5\u5FD7\u8F93\u51FA\u65B9\u5F0F\u4F1A\u88AB\u8986\u76D6\u3002
logging.file=D:/1s/udiwms/udiwms.log
#logging.file=/www/glxpdata/spsyc/
file_path=D:/share/udisps/
file_path=D:/udi/udiwms/udiwmsfile/
#file_path=/share/order/sync
back_file_path=D:/share/udisps/back/
#back_file_path=/share/order/sync_back
@ -35,8 +35,8 @@ UDIWMS_IP=http://127.0.0.1:9991
#自助平台地址
SPMS_IP=http://127.0.0.1:9906
#SPMS_IP=http://139.9.178.73:8080/SPMS_SERVER
API_KEY=1101
API_SECRET=zBITspLNvuoEd4FaamlSoqxRHmNsmQ9L
API_KEY:1101
API_SECRET:zBITspLNvuoEd4FaamlSoqxRHmNsmQ6L
#SPMS_IP=http://139.159.187.130:8080/SPMS_SERVER
##端口号
# Redis数据库索引默认为0
@ -57,3 +57,4 @@ spring.redis.jedis.pool.max-idle=8
spring.redis.jedis.pool.min-idle=0
# 连接超时时间(毫秒)
spring.redis.jedis.timeout=300
SPMS_WEBSOCKET_TOKEN=07rKFDFkQvBkbxgc7aUBlONo4gWNdx8b

@ -57,3 +57,4 @@ spring.redis.jedis.pool.max-idle=8
spring.redis.jedis.pool.min-idle=0
# 连接超时时间(毫秒)
spring.redis.jedis.timeout=300
SPMS_WEBSOCKET_TOKEN=07rKFDFkQvBkbxgc7aUBlONo4gWNdx8b

@ -9,3 +9,10 @@ server.max-http-header-size=100MB
#\u6700\u5927\u4E0A\u4F20\u6587\u4EF6\u5927\u5C0F(10MB)
spring.servlet.multipart.max-file-size=200MB
spring.servlet.multipart.max-request-size=200MB
# ?????????8?CPU???
thread.pool.core-pool-size= 8
thread.pool.max-pool-size= 16
thread.pool.queue-capacity= 1000
thread.pool.keep-alive-seconds= 160

@ -5,3 +5,13 @@ CALL Pro_Temp_ColumnWork('basic_export_status', 'receiveStatus', 'varchar(255) '
INSERT ignore INTO `system_param_config`(`id`, `parentId`, `paramName`, `paramKey`, `paramValue`, `paramStatus`,
`paramType`, `paramExplain`)
VALUES (516, NULL, '是否顶级中继服务', 'is_top_service', '1', 1, 0, '0:否1');
INSERT ignore INTO `system_param_config` (`id`, `parentId`, `paramName`, `paramKey`, `paramValue`, `paramStatus`, `paramType`, `paramExplain`)
VALUES (666, 0, '是否启用阿里健康平台', 'is_top_aliService', '1', 1, 0, '0:否1');
INSERT ignore INTO `system_param_config` (`id`, `parentId`, `paramName`, `paramKey`, `paramValue`, `paramStatus`, `paramType`, `paramExplain`)
VALUES (668, 0, '阿里健康平台服务地址', 'aliApi_server_ip', 'http://gw.api.taobao.com/router/rest', 1, 0, '');
INSERT ignore INTO `system_param_config` (`id`, `parentId`, `paramName`, `paramKey`, `paramValue`, `paramStatus`, `paramType`, `paramExplain`)
VALUES (669, 0, '是否启用国家库平台', 'is_udi_dl_service', '1', 1, 0, '0:否1');
INSERT ignore INTO `system_param_config` (`id`, `parentId`, `paramName`, `paramKey`, `paramValue`, `paramStatus`, `paramType`, `paramExplain`)
VALUES (670, 0, '国家库平台服务地址', 'udi_dl_server_ip', 'https://www.udims.com/UDI_DL_Server_test', 1, 0, '');

Loading…
Cancel
Save