Merge remote-tracking branch 'origin/dev' into dev

dev
chenhc 3 days ago
commit e5c8c2aa44

@ -22,12 +22,22 @@ import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@Component
@EnableScheduling
public class HeartTask implements SchedulingConfigurer {
final Logger logger = LoggerFactory.getLogger(HeartTask.class);
// 使用与HeartTaskService一致的常量名
private static final String REDIS_DOWNLOAD_STATUS_KEY = "SC_UDIINFO_DOWNLOAD_STATUS";
// 用于确保单个JVM实例内的并发安全
private final Lock processLock = new ReentrantLock();
@Resource
protected ScheduledDao scheduledDao;
@Resource
@ -41,42 +51,91 @@ public class HeartTask implements SchedulingConfigurer {
@Override
public void configureTasks(ScheduledTaskRegistrar scheduledTaskRegistrar) {
scheduledTaskRegistrar.addTriggerTask(() -> process(), triggerContext -> {
ScheduledRequest scheduledRequest = new ScheduledRequest();
scheduledRequest.setCronName("heartTask");
ScheduledEntity scheduledEntity = scheduledDao.findScheduled(scheduledRequest);
String cron = scheduledEntity.getCron();
if (cron.isEmpty()) {
logger.error("cron is null");
try {
ScheduledRequest scheduledRequest = new ScheduledRequest();
scheduledRequest.setCronName("heartTask");
ScheduledEntity scheduledEntity = scheduledDao.findScheduled(scheduledRequest);
// 健壮性检查cron表达式
if (scheduledEntity == null) {
logger.error("未找到heartTask的计划任务配置");
return new CronTrigger("0 0/10 * * * ?").nextExecutionTime(triggerContext);
}
String cron = scheduledEntity.getCron();
if (cron == null || cron.isEmpty()) {
logger.error("cron表达式为空使用默认配置");
return new CronTrigger("0 0/10 * * * ?").nextExecutionTime(triggerContext);
}
return new CronTrigger(cron).nextExecutionTime(triggerContext);
} catch (Exception e) {
logger.error("获取cron表达式异常使用默认配置", e);
return new CronTrigger("0 0/10 * * * ?").nextExecutionTime(triggerContext);
}
return new CronTrigger(cron).nextExecutionTime(triggerContext);
});
}
private void process() {
boolean lockAcquired = false;
try {
// 尝试获取锁避免同一JVM内多线程同时执行
lockAcquired = processLock.tryLock(5, TimeUnit.SECONDS);
if (!lockAcquired) {
logger.warn("心跳任务锁获取失败,可能有其他线程正在执行");
return;
}
logger.info("------------心跳任务-----------------");
SystemParamConfigEntity systemParamConfigEntity = systemParamConfigService.selectByParamKey("sc_udiinfo_status");
long timeInterval = Long.parseLong(systemParamConfigEntity.getParamValue()) * 6 * 1000;
long curTime = System.currentTimeMillis();
Long lastTime = (Long) redisUtil.get("SC_UDIINFO_DOWNLOAD_STATUS");
if (lastTime == null) {
logger.info("------------心跳任务开始-----------------");
SystemParamConfigEntity systemParamConfigEntity = systemParamConfigService.selectByParamKey("sc_udiinfo_status");
if (systemParamConfigEntity == null || systemParamConfigEntity.getParamValue() == null) {
logger.error("系统参数sc_udiinfo_status未配置");
return;
}
try {
lastTime = DateUtil.timeToStamp("1949-01-01 00:00:00");
// 确保使用正确的乘数 10
long timeInterval = Long.parseLong(systemParamConfigEntity.getParamValue()) * 10 * 1000;
long curTime = System.currentTimeMillis();
Long lastTime = (Long) redisUtil.get(REDIS_DOWNLOAD_STATUS_KEY);
if (lastTime == null) {
try {
lastTime = DateUtil.timeToStamp("1949-01-01 00:00:00");
} catch (Exception e) {
logger.error("时间戳转换失败", e);
lastTime = 0L; // 使用安全默认值
}
// 首次初始化Redis时间戳
redisUtil.set(REDIS_DOWNLOAD_STATUS_KEY, lastTime);
}
if (curTime - lastTime > timeInterval) {
logger.info("时间间隔已满足,开始执行数据下载");
// 直接调用heartTaskService.dlAllData()
// 注意不在此处更新REDIS_DOWNLOAD_STATUS_KEY避免与HeartTaskService的冲突
// 由HeartTaskService.dlAllData()负责更新REDIS_DOWNLOAD_STATUS_KEY
heartTaskService.dlAllData();
} else {
logger.info("时间未到,距离下次执行还有{}毫秒", timeInterval - (curTime - lastTime));
}
} catch (NumberFormatException e) {
logger.error("解析时间间隔参数失败: {}", systemParamConfigEntity.getParamValue(), e);
} catch (Exception e) {
e.printStackTrace();
logger.error("心跳任务执行异常", e);
}
logger.info("------------心跳任务结束-----------------");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.error("获取心跳任务锁时被中断", e);
} finally {
if (lockAcquired) {
processLock.unlock();
}
redisUtil.set("SC_UDIINFO_DOWNLOAD_STATUS", lastTime);
}
if (curTime - lastTime > timeInterval) {
heartTaskService.dlAllData();
} else {
logger.info("时间未到");
}
}
}

@ -14,11 +14,20 @@ import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.Arrays;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
@Service
public class HeartTaskService {
final Logger logger = LoggerFactory.getLogger(HeartTask.class);
private static final Logger logger = LoggerFactory.getLogger(HeartTaskService.class);
// 确保常量命名与HeartTask保持一致
private static final String REDIS_DOWNLOAD_STATUS_KEY = "SC_UDIINFO_DOWNLOAD_STATUS";
private static final String REDIS_UPLOAD_STATUS_KEY = "UPLOAD_UDIINFO_STATUS";
private static final String DOWNLOAD_LOCK_KEY = "is_doing_download";
// Redis锁过期时间(秒):设置合理的超时时间,避免任务卡死导致锁无法释放
private static final int LOCK_EXPIRY_SECONDS = 30 * 60; // 30分钟
@Resource
SystemParamConfigService systemParamConfigService;
@Resource
@ -34,91 +43,85 @@ public class HeartTaskService {
@Value("${SPMS_WEBSOCKET_TOKEN}")
private String socketToken;
//定时从上游下载数据
public void dlData() {
SystemParamConfigEntity upConnect = systemParamConfigService.selectByParamKey("sync_upstream_enable");
if (upConnect != null && upConnect.getParamValue().equals("1")) {
dlAllData();
scanUpload();
}
SystemParamConfigEntity donwConnect = systemParamConfigService.selectByParamKey("sync_downstream_enable");
if (donwConnect != null && donwConnect.getParamValue().equals("1")) {
scanDonwload();
}
}
private void scanUpload() {
SystemParamConfigEntity systemParamConfigEntity = systemParamConfigService.selectByParamKey("sc_udiinfo_upload");
long timeInterval = Long.parseLong(systemParamConfigEntity.getParamValue()) * 60 * 1000;
long curTime = System.currentTimeMillis();
//定时扫描
Long lastTime = (Long) redisUtil.get("UPLOAD_UDIINFO_STATUS");
if (lastTime == null) {
lastTime = System.currentTimeMillis();
redisUtil.set("UPLOAD_UDIINFO_STATUS", lastTime, 30 * 60);
}
if (curTime - lastTime > timeInterval) {
redisUtil.set("UPLOAD_UDIINFO_STATUS", curTime);
scanUploadService.scanAllDatas();
scanUploadService.scanAllBus();
scanUploadService.scanAllOrders();
scanUploadService.scanAllSchedule();
}
}
private void scanDonwload() {
SystemParamConfigEntity upConnect = systemParamConfigService.selectByParamKey("sync_downstream_enable");
if (upConnect.getParamValue().equals("1")) {
SystemParamConfigEntity systemParamConfigEntity = systemParamConfigService.selectByParamKey("sc_udiinfo_status");
long timeInterval = Long.parseLong(systemParamConfigEntity.getParamValue()) * 60 * 1000;
long curTime = System.currentTimeMillis();
Long lastTime = (Long) redisUtil.get("SC_UDIINFO_DOWNLOAD_STATUS");
if (lastTime == null) {
lastTime = System.currentTimeMillis();
redisUtil.set("SC_UDIINFO_DOWNLOAD_STATUS", lastTime);
}
if (curTime - lastTime > timeInterval) {
redisUtil.set("SC_UDIINFO_DOWNLOAD_STATUS", curTime);
scanDownloadService.scanAllData();
scanDownloadService.scanAllBus();
scanDownloadService.scanAllOrder();
scanDownloadService.scanScheduleList();
scanDownloadService.scanUdis();
}
}
}
public void dlUpAllData() {
SystemParamConfigEntity upConnect = systemParamConfigService.selectByParamKey("sync_upstream_enable");
if (upConnect != null && upConnect.getParamValue().equals("1")) {
dlAllData();
}
}
/**
*
* HeartTask
* Redis
*/
public void dlAllData() {
logger.info("定时从上游下载全部据-----开始");
// 先更新时间戳,防止其他实例重复执行
// 此处的时间戳更新会被HeartTask及其他服务读取用于判断是否执行
redisUtil.set(REDIS_DOWNLOAD_STATUS_KEY, System.currentTimeMillis());
// 生成唯一的锁标识,用于安全释放锁(避免释放其他实例的锁)
String lockValue = UUID.randomUUID().toString();
boolean lockAcquired = false;
public void dlAllData() {
logger.info("定时从上游下载全部据-----");
redisUtil.set("SC_UDIINFO_DOWNLOAD_STATUS", System.currentTimeMillis());
String doing = (String) redisUtil.get("is_doing_download");
if (doing == null || doing.equals("false")) {
redisUtil.set("is_doing_download", "true", 60);
dlAllDataService.dllNewAllOrder();
Arrays.stream(BasicExportTypeEnum.values()).forEach(i -> {
dlAllDataService.pullData(i);
});
dlAllDataService.dlAllDi();
redisUtil.set("is_doing_download", "false");
spsSyncWebSocket.sendMessage(SocketMsgEntity.builder().type(SocketMsgType.DL_ALL_DATA).content("").remark("下载基础信息").build(), "2:" + socketToken);
try {
// 检查是否已有下载任务在进行
String existingLock = (String) redisUtil.get(DOWNLOAD_LOCK_KEY);
if (existingLock != null && !existingLock.equals("false")) {
logger.info("已有下载任务正在执行中,跳过本次执行");
return;
}
// 尝试获取锁,使用唯一值标识锁的拥有者
lockAcquired = redisUtil.set(DOWNLOAD_LOCK_KEY, lockValue, LOCK_EXPIRY_SECONDS);
if (!lockAcquired) {
logger.warn("无法获取下载任务锁,可能有其他进程正在执行");
return;
}
logger.info("成功获取下载锁开始执行数据下载任务锁ID: {}", lockValue);
try {
// 执行订单数据下载
dlAllDataService.dllNewAllOrder();
// 执行其他类型数据下载
Arrays.stream(BasicExportTypeEnum.values()).forEach(i -> {
try {
dlAllDataService.pullData(i);
} catch (Exception e) {
logger.error("下载数据类型[{}]时发生异常", i, e);
}
});
// 注释掉的DI下载保留注释
// dlAllDataService.dlAllDi();
// 发送WebSocket消息通知
try {
spsSyncWebSocket.sendMessage(
SocketMsgEntity.builder()
.type(SocketMsgType.DL_ALL_DATA)
.content("")
.remark("下载基础信息")
.build(),
"2:" + socketToken
);
} catch (Exception e) {
logger.error("发送WebSocket消息失败", e);
}
logger.info("数据下载任务执行完成");
} catch (Exception e) {
logger.error("执行数据下载任务过程中发生异常", e);
}
} finally {
// 确保无论如何都释放锁,但只释放自己的锁
if (lockAcquired) {
// 检查锁是否仍然是我们设置的值,避免释放他人的锁
String currentLock = (String) redisUtil.get(DOWNLOAD_LOCK_KEY);
if (lockValue.equals(currentLock)) {
boolean released = redisUtil.set(DOWNLOAD_LOCK_KEY, "false");
if (!released) {
logger.error("释放下载任务锁失败");
} else {
logger.info("成功释放下载锁锁ID: {}", lockValue);
}
} else {
logger.warn("锁已被其他进程修改,不进行释放,当前锁值: {}, 期望锁值: {}", currentLock, lockValue);
}
}
logger.info("定时从上游下载全部据-----结束");
}
logger.info("定时从上游下载全部据-----结束");
}
public void getData(String message){
logger.error("jiehsou 要发送的数据");
spsSyncWebSocket.sendMessage(SocketMsgEntity.builder().content(message).type(SocketMsgType.STAT_DATA).remark("我瞎说的数据").build(), "2:" + socketToken);
}
}

Loading…
Cancel
Save