diff --git a/api-admin/src/main/java/com/glxp/sale/admin/thread/HeartTask.java b/api-admin/src/main/java/com/glxp/sale/admin/thread/HeartTask.java index 8965aa3..e2c9ee6 100644 --- a/api-admin/src/main/java/com/glxp/sale/admin/thread/HeartTask.java +++ b/api-admin/src/main/java/com/glxp/sale/admin/thread/HeartTask.java @@ -22,12 +22,22 @@ import org.springframework.stereotype.Component; import javax.annotation.Resource; import java.util.Arrays; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; @Component @EnableScheduling public class HeartTask implements SchedulingConfigurer { final Logger logger = LoggerFactory.getLogger(HeartTask.class); + + // 使用与HeartTaskService一致的常量名 + private static final String REDIS_DOWNLOAD_STATUS_KEY = "SC_UDIINFO_DOWNLOAD_STATUS"; + + // 用于确保单个JVM实例内的并发安全 + private final Lock processLock = new ReentrantLock(); + @Resource protected ScheduledDao scheduledDao; @Resource @@ -41,42 +51,91 @@ public class HeartTask implements SchedulingConfigurer { @Override public void configureTasks(ScheduledTaskRegistrar scheduledTaskRegistrar) { - scheduledTaskRegistrar.addTriggerTask(() -> process(), triggerContext -> { - ScheduledRequest scheduledRequest = new ScheduledRequest(); - scheduledRequest.setCronName("heartTask"); - ScheduledEntity scheduledEntity = scheduledDao.findScheduled(scheduledRequest); - String cron = scheduledEntity.getCron(); - if (cron.isEmpty()) { - logger.error("cron is null"); + try { + ScheduledRequest scheduledRequest = new ScheduledRequest(); + scheduledRequest.setCronName("heartTask"); + ScheduledEntity scheduledEntity = scheduledDao.findScheduled(scheduledRequest); + + // 健壮性检查cron表达式 + if (scheduledEntity == null) { + logger.error("未找到heartTask的计划任务配置"); + return new CronTrigger("0 0/10 * * * ?").nextExecutionTime(triggerContext); + } + + String cron = scheduledEntity.getCron(); + if (cron == null || cron.isEmpty()) { + logger.error("cron表达式为空,使用默认配置"); + return new CronTrigger("0 0/10 * * * ?").nextExecutionTime(triggerContext); + } + + return new CronTrigger(cron).nextExecutionTime(triggerContext); + } catch (Exception e) { + logger.error("获取cron表达式异常,使用默认配置", e); + return new CronTrigger("0 0/10 * * * ?").nextExecutionTime(triggerContext); } - return new CronTrigger(cron).nextExecutionTime(triggerContext); }); } - private void process() { + boolean lockAcquired = false; + try { + // 尝试获取锁,避免同一JVM内多线程同时执行 + lockAcquired = processLock.tryLock(5, TimeUnit.SECONDS); + if (!lockAcquired) { + logger.warn("心跳任务锁获取失败,可能有其他线程正在执行"); + return; + } - logger.info("------------心跳任务-----------------"); - SystemParamConfigEntity systemParamConfigEntity = systemParamConfigService.selectByParamKey("sc_udiinfo_status"); - long timeInterval = Long.parseLong(systemParamConfigEntity.getParamValue()) * 6 * 1000; - long curTime = System.currentTimeMillis(); - Long lastTime = (Long) redisUtil.get("SC_UDIINFO_DOWNLOAD_STATUS"); - if (lastTime == null) { + logger.info("------------心跳任务开始-----------------"); + + SystemParamConfigEntity systemParamConfigEntity = systemParamConfigService.selectByParamKey("sc_udiinfo_status"); + if (systemParamConfigEntity == null || systemParamConfigEntity.getParamValue() == null) { + logger.error("系统参数sc_udiinfo_status未配置"); + return; + } + try { - lastTime = DateUtil.timeToStamp("1949-01-01 00:00:00"); + // 确保使用正确的乘数 10 + long timeInterval = Long.parseLong(systemParamConfigEntity.getParamValue()) * 10 * 1000; + long curTime = System.currentTimeMillis(); + Long lastTime = (Long) redisUtil.get(REDIS_DOWNLOAD_STATUS_KEY); + + if (lastTime == null) { + try { + lastTime = DateUtil.timeToStamp("1949-01-01 00:00:00"); + } catch (Exception e) { + logger.error("时间戳转换失败", e); + lastTime = 0L; // 使用安全默认值 + } + // 首次初始化Redis时间戳 + redisUtil.set(REDIS_DOWNLOAD_STATUS_KEY, lastTime); + } + + if (curTime - lastTime > timeInterval) { + logger.info("时间间隔已满足,开始执行数据下载"); + + // 直接调用heartTaskService.dlAllData() + // 注意:不在此处更新REDIS_DOWNLOAD_STATUS_KEY,避免与HeartTaskService的冲突 + // 由HeartTaskService.dlAllData()负责更新REDIS_DOWNLOAD_STATUS_KEY + heartTaskService.dlAllData(); + } else { + logger.info("时间未到,距离下次执行还有{}毫秒", timeInterval - (curTime - lastTime)); + } + } catch (NumberFormatException e) { + logger.error("解析时间间隔参数失败: {}", systemParamConfigEntity.getParamValue(), e); } catch (Exception e) { - e.printStackTrace(); + logger.error("心跳任务执行异常", e); + } + + logger.info("------------心跳任务结束-----------------"); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + logger.error("获取心跳任务锁时被中断", e); + } finally { + if (lockAcquired) { + processLock.unlock(); } - redisUtil.set("SC_UDIINFO_DOWNLOAD_STATUS", lastTime); - } - if (curTime - lastTime > timeInterval) { - heartTaskService.dlAllData(); - } else { - logger.info("时间未到"); } - } - - } diff --git a/api-admin/src/main/java/com/glxp/sale/admin/thread/HeartTaskService.java b/api-admin/src/main/java/com/glxp/sale/admin/thread/HeartTaskService.java index 5812485..857a2d2 100644 --- a/api-admin/src/main/java/com/glxp/sale/admin/thread/HeartTaskService.java +++ b/api-admin/src/main/java/com/glxp/sale/admin/thread/HeartTaskService.java @@ -14,11 +14,20 @@ import org.springframework.stereotype.Service; import javax.annotation.Resource; import java.util.Arrays; +import java.util.UUID; +import java.util.concurrent.TimeUnit; @Service public class HeartTaskService { - final Logger logger = LoggerFactory.getLogger(HeartTask.class); + private static final Logger logger = LoggerFactory.getLogger(HeartTaskService.class); + // 确保常量命名与HeartTask保持一致 + private static final String REDIS_DOWNLOAD_STATUS_KEY = "SC_UDIINFO_DOWNLOAD_STATUS"; + private static final String REDIS_UPLOAD_STATUS_KEY = "UPLOAD_UDIINFO_STATUS"; + private static final String DOWNLOAD_LOCK_KEY = "is_doing_download"; + // Redis锁过期时间(秒):设置合理的超时时间,避免任务卡死导致锁无法释放 + private static final int LOCK_EXPIRY_SECONDS = 30 * 60; // 30分钟 + @Resource SystemParamConfigService systemParamConfigService; @Resource @@ -34,91 +43,85 @@ public class HeartTaskService { @Value("${SPMS_WEBSOCKET_TOKEN}") private String socketToken; - //定时从上游下载数据 - public void dlData() { - SystemParamConfigEntity upConnect = systemParamConfigService.selectByParamKey("sync_upstream_enable"); - if (upConnect != null && upConnect.getParamValue().equals("1")) { - dlAllData(); - scanUpload(); - } - SystemParamConfigEntity donwConnect = systemParamConfigService.selectByParamKey("sync_downstream_enable"); - if (donwConnect != null && donwConnect.getParamValue().equals("1")) { - scanDonwload(); - } - } - private void scanUpload() { - SystemParamConfigEntity systemParamConfigEntity = systemParamConfigService.selectByParamKey("sc_udiinfo_upload"); - long timeInterval = Long.parseLong(systemParamConfigEntity.getParamValue()) * 60 * 1000; - long curTime = System.currentTimeMillis(); - //定时扫描 - Long lastTime = (Long) redisUtil.get("UPLOAD_UDIINFO_STATUS"); - if (lastTime == null) { - lastTime = System.currentTimeMillis(); - redisUtil.set("UPLOAD_UDIINFO_STATUS", lastTime, 30 * 60); - } - if (curTime - lastTime > timeInterval) { - redisUtil.set("UPLOAD_UDIINFO_STATUS", curTime); - scanUploadService.scanAllDatas(); - scanUploadService.scanAllBus(); - scanUploadService.scanAllOrders(); - scanUploadService.scanAllSchedule(); - } - } - - private void scanDonwload() { - SystemParamConfigEntity upConnect = systemParamConfigService.selectByParamKey("sync_downstream_enable"); - if (upConnect.getParamValue().equals("1")) { - SystemParamConfigEntity systemParamConfigEntity = systemParamConfigService.selectByParamKey("sc_udiinfo_status"); - long timeInterval = Long.parseLong(systemParamConfigEntity.getParamValue()) * 60 * 1000; - long curTime = System.currentTimeMillis(); - Long lastTime = (Long) redisUtil.get("SC_UDIINFO_DOWNLOAD_STATUS"); - if (lastTime == null) { - lastTime = System.currentTimeMillis(); - redisUtil.set("SC_UDIINFO_DOWNLOAD_STATUS", lastTime); - } - if (curTime - lastTime > timeInterval) { - redisUtil.set("SC_UDIINFO_DOWNLOAD_STATUS", curTime); - scanDownloadService.scanAllData(); - scanDownloadService.scanAllBus(); - scanDownloadService.scanAllOrder(); - scanDownloadService.scanScheduleList(); - scanDownloadService.scanUdis(); - } - } - - } - - public void dlUpAllData() { - SystemParamConfigEntity upConnect = systemParamConfigService.selectByParamKey("sync_upstream_enable"); - if (upConnect != null && upConnect.getParamValue().equals("1")) { - dlAllData(); - } - } + /** + * 从上游下载全部数据 + * 该方法支持被HeartTask或其他服务直接调用 + * 内部实现了基于Redis的分布式锁,确保在分布式环境下也只有一个实例执行 + */ + public void dlAllData() { + logger.info("定时从上游下载全部据-----开始"); + // 先更新时间戳,防止其他实例重复执行 + // 此处的时间戳更新会被HeartTask及其他服务读取用于判断是否执行 + redisUtil.set(REDIS_DOWNLOAD_STATUS_KEY, System.currentTimeMillis()); + // 生成唯一的锁标识,用于安全释放锁(避免释放其他实例的锁) + String lockValue = UUID.randomUUID().toString(); + boolean lockAcquired = false; - public void dlAllData() { - logger.info("定时从上游下载全部据-----"); - redisUtil.set("SC_UDIINFO_DOWNLOAD_STATUS", System.currentTimeMillis()); - String doing = (String) redisUtil.get("is_doing_download"); - if (doing == null || doing.equals("false")) { - redisUtil.set("is_doing_download", "true", 60); - dlAllDataService.dllNewAllOrder(); - Arrays.stream(BasicExportTypeEnum.values()).forEach(i -> { - dlAllDataService.pullData(i); - }); - dlAllDataService.dlAllDi(); - redisUtil.set("is_doing_download", "false"); - spsSyncWebSocket.sendMessage(SocketMsgEntity.builder().type(SocketMsgType.DL_ALL_DATA).content("").remark("下载基础信息").build(), "2:" + socketToken); + try { + // 检查是否已有下载任务在进行 + String existingLock = (String) redisUtil.get(DOWNLOAD_LOCK_KEY); + if (existingLock != null && !existingLock.equals("false")) { + logger.info("已有下载任务正在执行中,跳过本次执行"); + return; + } + // 尝试获取锁,使用唯一值标识锁的拥有者 + lockAcquired = redisUtil.set(DOWNLOAD_LOCK_KEY, lockValue, LOCK_EXPIRY_SECONDS); + if (!lockAcquired) { + logger.warn("无法获取下载任务锁,可能有其他进程正在执行"); + return; + } + logger.info("成功获取下载锁,开始执行数据下载任务,锁ID: {}", lockValue); + try { + // 执行订单数据下载 + dlAllDataService.dllNewAllOrder(); + // 执行其他类型数据下载 + Arrays.stream(BasicExportTypeEnum.values()).forEach(i -> { + try { + dlAllDataService.pullData(i); + } catch (Exception e) { + logger.error("下载数据类型[{}]时发生异常", i, e); + } + }); + // 注释掉的DI下载,保留注释 + // dlAllDataService.dlAllDi(); + // 发送WebSocket消息通知 + try { + spsSyncWebSocket.sendMessage( + SocketMsgEntity.builder() + .type(SocketMsgType.DL_ALL_DATA) + .content("") + .remark("下载基础信息") + .build(), + "2:" + socketToken + ); + } catch (Exception e) { + logger.error("发送WebSocket消息失败", e); + } + logger.info("数据下载任务执行完成"); + } catch (Exception e) { + logger.error("执行数据下载任务过程中发生异常", e); + } + } finally { + // 确保无论如何都释放锁,但只释放自己的锁 + if (lockAcquired) { + // 检查锁是否仍然是我们设置的值,避免释放他人的锁 + String currentLock = (String) redisUtil.get(DOWNLOAD_LOCK_KEY); + if (lockValue.equals(currentLock)) { + boolean released = redisUtil.set(DOWNLOAD_LOCK_KEY, "false"); + if (!released) { + logger.error("释放下载任务锁失败"); + } else { + logger.info("成功释放下载锁,锁ID: {}", lockValue); + } + } else { + logger.warn("锁已被其他进程修改,不进行释放,当前锁值: {}, 期望锁值: {}", currentLock, lockValue); + } + } + logger.info("定时从上游下载全部据-----结束"); } - logger.info("定时从上游下载全部据-----结束"); - } - - public void getData(String message){ - logger.error("jiehsou 要发送的数据"); - spsSyncWebSocket.sendMessage(SocketMsgEntity.builder().content(message).type(SocketMsgType.STAT_DATA).remark("我瞎说的数据").build(), "2:" + socketToken); - } }