|
|
|
@ -22,143 +22,120 @@ import org.springframework.stereotype.Component;
|
|
|
|
|
|
|
|
|
|
import javax.annotation.Resource;
|
|
|
|
|
import java.util.Arrays;
|
|
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
|
|
import java.util.concurrent.locks.Lock;
|
|
|
|
|
import java.util.concurrent.locks.ReentrantLock;
|
|
|
|
|
|
|
|
|
|
@Component
|
|
|
|
|
@EnableScheduling
|
|
|
|
|
public class HeartTask implements SchedulingConfigurer {
|
|
|
|
|
|
|
|
|
|
final Logger logger = LoggerFactory.getLogger(HeartTask.class);
|
|
|
|
|
|
|
|
|
|
// 使用与HeartTaskService一致的常量名
|
|
|
|
|
private static final String REDIS_DOWNLOAD_STATUS_KEY = "SC_UDIINFO_DOWNLOAD_STATUS";
|
|
|
|
|
|
|
|
|
|
// 用于确保单个JVM实例内的并发安全
|
|
|
|
|
private final Lock processLock = new ReentrantLock();
|
|
|
|
|
|
|
|
|
|
@Resource
|
|
|
|
|
protected ScheduledDao scheduledDao;
|
|
|
|
|
@Resource
|
|
|
|
|
HeartTaskService heartTaskService;
|
|
|
|
|
@Resource
|
|
|
|
|
SystemParamConfigService systemParamConfigService;
|
|
|
|
|
@Resource
|
|
|
|
|
RedisUtil redisUtil;
|
|
|
|
|
@Resource
|
|
|
|
|
ScanUploadService scanUploadService;
|
|
|
|
|
@Resource
|
|
|
|
|
ScanDownloadService scanDownloadService;
|
|
|
|
|
@Resource
|
|
|
|
|
DlAllDataService dlAllDataService;
|
|
|
|
|
@Resource
|
|
|
|
|
AsyncDiDlService asyncDiDlService;
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public void configureTasks(ScheduledTaskRegistrar scheduledTaskRegistrar) {
|
|
|
|
|
|
|
|
|
|
scheduledTaskRegistrar.addTriggerTask(() -> process(), triggerContext -> {
|
|
|
|
|
ScheduledRequest scheduledRequest = new ScheduledRequest();
|
|
|
|
|
scheduledRequest.setCronName("heartTask");
|
|
|
|
|
ScheduledEntity scheduledEntity = scheduledDao.findScheduled(scheduledRequest);
|
|
|
|
|
String cron = scheduledEntity.getCron();
|
|
|
|
|
if (cron.isEmpty()) {
|
|
|
|
|
logger.error("cron is null");
|
|
|
|
|
try {
|
|
|
|
|
ScheduledRequest scheduledRequest = new ScheduledRequest();
|
|
|
|
|
scheduledRequest.setCronName("heartTask");
|
|
|
|
|
ScheduledEntity scheduledEntity = scheduledDao.findScheduled(scheduledRequest);
|
|
|
|
|
|
|
|
|
|
// 健壮性检查cron表达式
|
|
|
|
|
if (scheduledEntity == null) {
|
|
|
|
|
logger.error("未找到heartTask的计划任务配置");
|
|
|
|
|
return new CronTrigger("0 0/10 * * * ?").nextExecutionTime(triggerContext);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
String cron = scheduledEntity.getCron();
|
|
|
|
|
if (cron == null || cron.isEmpty()) {
|
|
|
|
|
logger.error("cron表达式为空,使用默认配置");
|
|
|
|
|
return new CronTrigger("0 0/10 * * * ?").nextExecutionTime(triggerContext);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return new CronTrigger(cron).nextExecutionTime(triggerContext);
|
|
|
|
|
} catch (Exception e) {
|
|
|
|
|
logger.error("获取cron表达式异常,使用默认配置", e);
|
|
|
|
|
return new CronTrigger("0 0/10 * * * ?").nextExecutionTime(triggerContext);
|
|
|
|
|
}
|
|
|
|
|
return new CronTrigger(cron).nextExecutionTime(triggerContext);
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
private void process() {
|
|
|
|
|
boolean lockAcquired = false;
|
|
|
|
|
try {
|
|
|
|
|
// 尝试获取锁,避免同一JVM内多线程同时执行
|
|
|
|
|
lockAcquired = processLock.tryLock(5, TimeUnit.SECONDS);
|
|
|
|
|
if (!lockAcquired) {
|
|
|
|
|
logger.warn("心跳任务锁获取失败,可能有其他线程正在执行");
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
logger.info("------------心跳任务-----------------");
|
|
|
|
|
dlData();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
//定时从上游下载数据
|
|
|
|
|
private void dlData() {
|
|
|
|
|
SystemParamConfigEntity upConnect = systemParamConfigService.selectByParamKey("sync_upstream_enable");
|
|
|
|
|
if (upConnect != null && upConnect.getParamValue().equals("1")) {
|
|
|
|
|
dlAllData();
|
|
|
|
|
scanUpload();
|
|
|
|
|
}
|
|
|
|
|
SystemParamConfigEntity donwConnect = systemParamConfigService.selectByParamKey("sync_downstream_enable");
|
|
|
|
|
if (donwConnect != null && donwConnect.getParamValue().equals("1")) {
|
|
|
|
|
scanDonwload();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
private void scanUpload() {
|
|
|
|
|
SystemParamConfigEntity systemParamConfigEntity = systemParamConfigService.selectByParamKey("sc_udiinfo_upload");
|
|
|
|
|
long timeInterval = Long.parseLong(systemParamConfigEntity.getParamValue()) * 60 * 1000;
|
|
|
|
|
long curTime = System.currentTimeMillis();
|
|
|
|
|
//定时扫描
|
|
|
|
|
Long lastTime = (Long) redisUtil.get("UPLOAD_UDIINFO_STATUS");
|
|
|
|
|
if (lastTime == null) {
|
|
|
|
|
lastTime = System.currentTimeMillis();
|
|
|
|
|
redisUtil.set("UPLOAD_UDIINFO_STATUS", lastTime, 30 * 60);
|
|
|
|
|
}
|
|
|
|
|
if (curTime - lastTime > timeInterval) {
|
|
|
|
|
redisUtil.set("UPLOAD_UDIINFO_STATUS", curTime);
|
|
|
|
|
scanUploadService.scanAllDatas();
|
|
|
|
|
scanUploadService.scanAllBus();
|
|
|
|
|
scanUploadService.scanAllOrders();
|
|
|
|
|
scanUploadService.scanAllSchedule();
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
private void scanDonwload() {
|
|
|
|
|
SystemParamConfigEntity upConnect = systemParamConfigService.selectByParamKey("sync_downstream_enable");
|
|
|
|
|
|
|
|
|
|
if (upConnect.getParamValue().equals("1")) {
|
|
|
|
|
logger.info("------------心跳任务开始-----------------");
|
|
|
|
|
|
|
|
|
|
SystemParamConfigEntity systemParamConfigEntity = systemParamConfigService.selectByParamKey("sc_udiinfo_status");
|
|
|
|
|
long timeInterval = Long.parseLong(systemParamConfigEntity.getParamValue()) * 60 * 1000;
|
|
|
|
|
long curTime = System.currentTimeMillis();
|
|
|
|
|
Long lastTime = (Long) redisUtil.get("SC_UDIINFO_DOWNLOAD_STATUS");
|
|
|
|
|
if (lastTime == null) {
|
|
|
|
|
lastTime = System.currentTimeMillis();
|
|
|
|
|
redisUtil.set("SC_UDIINFO_DOWNLOAD_STATUS", lastTime);
|
|
|
|
|
}
|
|
|
|
|
if (curTime - lastTime > timeInterval) {
|
|
|
|
|
redisUtil.set("SC_UDIINFO_DOWNLOAD_STATUS", curTime);
|
|
|
|
|
scanDownloadService.scanAllData();
|
|
|
|
|
scanDownloadService.scanAllBus();
|
|
|
|
|
scanDownloadService.scanAllOrder();
|
|
|
|
|
scanDownloadService.scanScheduleList();
|
|
|
|
|
scanDownloadService.scanUdis();
|
|
|
|
|
if (systemParamConfigEntity == null || systemParamConfigEntity.getParamValue() == null) {
|
|
|
|
|
logger.error("系统参数sc_udiinfo_status未配置");
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private void dlAllData() {
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
SystemParamConfigEntity systemParamConfigEntity = systemParamConfigService.selectByParamKey("sc_udiinfo_status");
|
|
|
|
|
long timeInterval = Long.parseLong(systemParamConfigEntity.getParamValue()) * 6 * 1000;
|
|
|
|
|
long curTime = System.currentTimeMillis();
|
|
|
|
|
Long lastTime = (Long) redisUtil.get("SC_UDIINFO_DOWNLOAD_STATUS");
|
|
|
|
|
if (lastTime == null) {
|
|
|
|
|
|
|
|
|
|
try {
|
|
|
|
|
lastTime = DateUtil.timeToStamp("1949-01-01 00:00:00");
|
|
|
|
|
// 确保使用正确的乘数 10
|
|
|
|
|
long timeInterval = Long.parseLong(systemParamConfigEntity.getParamValue()) * 10 * 1000;
|
|
|
|
|
long curTime = System.currentTimeMillis();
|
|
|
|
|
Long lastTime = (Long) redisUtil.get(REDIS_DOWNLOAD_STATUS_KEY);
|
|
|
|
|
|
|
|
|
|
if (lastTime == null) {
|
|
|
|
|
try {
|
|
|
|
|
lastTime = DateUtil.timeToStamp("1949-01-01 00:00:00");
|
|
|
|
|
} catch (Exception e) {
|
|
|
|
|
logger.error("时间戳转换失败", e);
|
|
|
|
|
lastTime = 0L; // 使用安全默认值
|
|
|
|
|
}
|
|
|
|
|
// 首次初始化Redis时间戳
|
|
|
|
|
redisUtil.set(REDIS_DOWNLOAD_STATUS_KEY, lastTime);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (curTime - lastTime > timeInterval) {
|
|
|
|
|
logger.info("时间间隔已满足,开始执行数据下载");
|
|
|
|
|
|
|
|
|
|
// 直接调用heartTaskService.dlAllData()
|
|
|
|
|
// 注意:不在此处更新REDIS_DOWNLOAD_STATUS_KEY,避免与HeartTaskService的冲突
|
|
|
|
|
// 由HeartTaskService.dlAllData()负责更新REDIS_DOWNLOAD_STATUS_KEY
|
|
|
|
|
heartTaskService.dlAllData();
|
|
|
|
|
} else {
|
|
|
|
|
logger.info("时间未到,距离下次执行还有{}毫秒", timeInterval - (curTime - lastTime));
|
|
|
|
|
}
|
|
|
|
|
} catch (NumberFormatException e) {
|
|
|
|
|
logger.error("解析时间间隔参数失败: {}", systemParamConfigEntity.getParamValue(), e);
|
|
|
|
|
} catch (Exception e) {
|
|
|
|
|
e.printStackTrace();
|
|
|
|
|
logger.error("心跳任务执行异常", e);
|
|
|
|
|
}
|
|
|
|
|
redisUtil.set("SC_UDIINFO_DOWNLOAD_STATUS", lastTime);
|
|
|
|
|
}
|
|
|
|
|
if (curTime - lastTime > timeInterval) {
|
|
|
|
|
logger.info("定时从上游下载全部据-----");
|
|
|
|
|
redisUtil.set("SC_UDIINFO_DOWNLOAD_STATUS", curTime);
|
|
|
|
|
String doing = (String) redisUtil.get("is_doing_download");
|
|
|
|
|
if (doing == null || doing.equals("false")) {
|
|
|
|
|
redisUtil.set("is_doing_download", "true", 60);
|
|
|
|
|
// dlAllDataService.dllNewAllData();
|
|
|
|
|
dlAllDataService.dllNewAllOrder();
|
|
|
|
|
Arrays.stream(BasicExportTypeEnum.values()).forEach(i -> {
|
|
|
|
|
dlAllDataService.pullData(i);
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
// dlAllDataService.dllNewAllBusType();
|
|
|
|
|
// dlAllDataService.dlScheduleStatus();
|
|
|
|
|
dlAllDataService.dlAllDi();
|
|
|
|
|
redisUtil.set("is_doing_download", "false");
|
|
|
|
|
|
|
|
|
|
logger.info("------------心跳任务结束-----------------");
|
|
|
|
|
} catch (InterruptedException e) {
|
|
|
|
|
Thread.currentThread().interrupt();
|
|
|
|
|
logger.error("获取心跳任务锁时被中断", e);
|
|
|
|
|
} finally {
|
|
|
|
|
if (lockAcquired) {
|
|
|
|
|
processLock.unlock();
|
|
|
|
|
}
|
|
|
|
|
logger.info("定时从上游下载全部据-----结束");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|