From e14a0f8ff66bfe1a667c33a52105e7a4686938b3 Mon Sep 17 00:00:00 2001 From: admin Date: Sun, 26 Mar 2023 09:32:26 +0800 Subject: [PATCH] =?UTF-8?q?=E7=AE=80=E5=8C=96=E4=B8=AD=E7=BB=A7=E6=9C=8D?= =?UTF-8?q?=E5=8A=A1=E5=8F=82=E6=95=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../sale/admin/idc/service/IdcService.java | 3 +- .../idc/service/impl/IdcServiceImpl.java | 85 +++++++++---------- .../sale/admin/idc/thread/AsyncFetchTask.java | 54 ++++++++++++ .../glxp/sale/admin/util/RedirectUtils.java | 5 ++ 4 files changed, 102 insertions(+), 45 deletions(-) create mode 100644 api-admin/src/main/java/com/glxp/sale/admin/idc/thread/AsyncFetchTask.java create mode 100644 api-admin/src/main/java/com/glxp/sale/admin/util/RedirectUtils.java diff --git a/api-admin/src/main/java/com/glxp/sale/admin/idc/service/IdcService.java b/api-admin/src/main/java/com/glxp/sale/admin/idc/service/IdcService.java index 768e9e5..f532526 100644 --- a/api-admin/src/main/java/com/glxp/sale/admin/idc/service/IdcService.java +++ b/api-admin/src/main/java/com/glxp/sale/admin/idc/service/IdcService.java @@ -23,9 +23,10 @@ public interface IdcService { BaseResponse download(HttpServletRequest request,Map params); - void pull(); + public void asyncFetchTask(); BaseResponse downlaodSuccess(HttpServletRequest request,Map params); public void asyncIdcTask(); + } diff --git a/api-admin/src/main/java/com/glxp/sale/admin/idc/service/impl/IdcServiceImpl.java b/api-admin/src/main/java/com/glxp/sale/admin/idc/service/impl/IdcServiceImpl.java index 8828c47..166d8a3 100644 --- a/api-admin/src/main/java/com/glxp/sale/admin/idc/service/impl/IdcServiceImpl.java +++ b/api-admin/src/main/java/com/glxp/sale/admin/idc/service/impl/IdcServiceImpl.java @@ -122,18 +122,27 @@ public class IdcServiceImpl implements IdcService { } /*拉取前一级中继服务数据*/ + @Async @Override - public void pull() { - Map query = new HashMap(); - String result = post(getNextHost("U")+"/spssync/common/list",query); - JSONObject json = JSON.parseObject(result); - if(json.getInteger("code")==20000&&json.getString("data")!=null) { - List list = JSON.parseArray(json.getString("data"), Map.class); - if(list!=null) { - for(Map map:list) { - Map params = new HashMap(); - params.put("taskId", map.get("taskId")); - pullData(getNextHost("U")+"/spssync/common/list",params); + public void asyncFetchTask() { + String[] directions = {"I","U"}; + for(String dir:directions) { + Map query = new HashMap(); + String host = getNextHost(dir); + if(!StringUtils.isEmpty(host)) { + String result = post(host+"/spssync/common/list",query); + if(IDCUtils.isJson(result)) { + JSONObject json = JSON.parseObject(result); + if(json!=null&&json.getInteger("code")==20000&&json.getString("data")!=null) { + List list = JSON.parseArray(json.getString("data"), Map.class); + if(list!=null) { + for(Map map:list) { + Map params = new HashMap(); + params.put("taskId", map.get("taskId")); + fetchData(host+"/spssync/common/list",params); + } + } + } } } } @@ -154,8 +163,8 @@ public class IdcServiceImpl implements IdcService { /*为顶级或末级,以及下游或上游连通,可执行*/ Boolean isLastLevel = isLastLevel(direction); Boolean isRelay = isRelay(direction); - System.out.print("-----数据传输任务开始----"+direction+"\n"); - System.out.print("-----是否顶级或末级服务:"+isLastLevel+",是否转发数据:"+isRelay+"----\n"); + logger.info("-----数据传输任务开始----"+direction+"\n"); + logger.info("-----是否顶级或末级服务:"+isLastLevel+",是否转发数据:"+isRelay+"----\n"); if(isLastLevel&&isRelay) { for(String t:tNames) { uploadData(t); @@ -163,6 +172,8 @@ public class IdcServiceImpl implements IdcService { } } + + private void uploadData(String t) { String[] tn = t.split("/"); String lastUpdateTime = getUpdateTime(tn[0]); @@ -170,7 +181,7 @@ public class IdcServiceImpl implements IdcService { Map map = new HashMap(); String sqlWhere = "not exists (select fkId from idc_record where fkId="+tn[0]+".id)"; if(!StringUtils.isEmpty(tn[1])) { - sqlWhere+=" and ifnull("+tn[1]+",now())>=cast('"+lastUpdateTime+"' as datetime)"; + sqlWhere+=" and "+tn[1]+">=cast('"+lastUpdateTime+"' as datetime)"; } else { map.put("isEnd", "1"); } @@ -189,15 +200,15 @@ public class IdcServiceImpl implements IdcService { Date nextTimePoint = cronSequenceGenerator.next(DateUtil.parseDate(lastUpdateTime)); send = nextTimePoint.before(nowUpdateTime); } - //if(send) { + if(send) { BaseResponse result = send(map); if(result.getCode()==20000) { - setUpdateTime(tn[0],DateUtil.formatDate(nowUpdateTime)); + setUpdateTime(tn[0],DateUtil.formatDate(nowUpdateTime,"yyyy-MM-dd HH:mm:ss")); } - //} + } } - private String pullData(String url,Map params) { + private String fetchData(String url,Map params) { OkHttpClient client = new OkHttpClient().newBuilder() .build(); MediaType mediaType = MediaType.parse("application/json"); @@ -590,43 +601,29 @@ public class IdcServiceImpl implements IdcService { /*获取转发服务地址,当前值允许单向,只使用参数upper_server_ip*/ private String getNextHost(String direction) { - SystemParamConfigEntity systemParamConfigEntity = getSystemParamConfig("upper_server_ip","中继上传服务地址" , "",""); - //direction.equals("U") ? getSystemParamConfig("upper_server_ip","中继上传服务地址" , "",""): - //getSystemParamConfig("sync_idc_lower_host","下级(下发)中继服务地址" , "","") ; - return systemParamConfigEntity.getParamValue(); + SystemParamConfigEntity systemParamConfigEntity = + direction.equals("U") ? getSystemParamConfig("upper_server_host","自助平台数据接收服务地址" , "","") : + direction.equals("I") ? getSystemParamConfig("lower_server_host","UDI系统数据接收服务地址" , "","") : null; + String host = systemParamConfigEntity!=null ? systemParamConfigEntity.getParamValue() : ""; + host = !StringUtils.isEmpty(host)&&host.substring(host.length() -1).equals("/") ? host.substring(0,host.length() -1) : host; + return host; } - /*判断是否最后一级*/ + /*判断是否上传或下发数据*/ private boolean isLastLevel(String direction) { - SystemParamConfigEntity systemParamConfigEntity = - direction.equals("I") ? getSystemParamConfig("sync_idc_top","是否顶级中继服务(连接自助平台)" , "0","0:否;1:是(是,接收下级上传数据后解析入库)"): - getSystemParamConfig("sync_idc_final","是否末级中继服务(连接UDI管理系统)" , "0","0:否;1:是(是,接收上级下发数据后解析入库)") ; - return systemParamConfigEntity.getParamValue().equals("0") ? false : true ; - + return direction.equals("I") || direction.equals("U") ? true : false ; } /*检查当前系统为自助平台(下发)还是UDI系统(上传),返回传输方向,如果都未设置,默认返回下发*/ private String getDirection() { - SystemParamConfigEntity systemParamConfigEntity = getSystemParamConfig("sync_idc_top","是否顶级中继服务(连接自助平台)" , "0","0:否;1:是(是,接收下级上传数据后解析入库)"); - System.out.print(">>>>>>>>>>"+JSON.toJSONString(systemParamConfigEntity)+"\n"); - if(systemParamConfigEntity.getParamValue().equals("0")) { - systemParamConfigEntity = getSystemParamConfig("sync_idc_final","是否末级中继服务(连接UDI管理系统)" , "0","0:否;1:是(是,接收上级下发数据后解析入库)"); - if(systemParamConfigEntity.getParamValue().equals("1")) - return "U"; - } - return "I"; - + SystemParamConfigEntity systemParamConfigEntity = getSystemParamConfig("sync_system_type","系统类型" , "IDC","UDI(UDI管理系统),SPS(自助平台),IDC(中继服务)"); + return systemParamConfigEntity.getParamValue().equals("UDI") ? "U" : systemParamConfigEntity.getParamValue().equals("SPS") ? "I" : "N"; } /*是否需要转发*/ private boolean isRelay(String direction) { - String relayStr = direction.equals("U") ? "sync_upstream_enable" : "sync_downstream_enable"; + String relayHost = getNextHost(direction); - SystemParamConfigEntity systemParamConfigEntity = systemParamConfigService.selectByParamKey(relayStr); - if (systemParamConfigEntity!=null&&systemParamConfigEntity.getParamValue()!=null&& - systemParamConfigEntity.getParamValue().equals("0")) { - return false; - } - return true; + return !StringUtils.isEmpty(relayHost); } diff --git a/api-admin/src/main/java/com/glxp/sale/admin/idc/thread/AsyncFetchTask.java b/api-admin/src/main/java/com/glxp/sale/admin/idc/thread/AsyncFetchTask.java new file mode 100644 index 0000000..fcab829 --- /dev/null +++ b/api-admin/src/main/java/com/glxp/sale/admin/idc/thread/AsyncFetchTask.java @@ -0,0 +1,54 @@ +package com.glxp.sale.admin.idc.thread; + +import javax.annotation.Resource; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.scheduling.annotation.EnableScheduling; +import org.springframework.scheduling.annotation.SchedulingConfigurer; +import org.springframework.scheduling.config.ScheduledTaskRegistrar; +import org.springframework.scheduling.support.CronTrigger; +import org.springframework.stereotype.Component; + +import com.glxp.sale.admin.dao.info.ScheduledDao; +import com.glxp.sale.admin.entity.info.ScheduledEntity; +import com.glxp.sale.admin.idc.service.IdcService; +import com.glxp.sale.admin.req.info.ScheduledRequest; + +@Component +@EnableScheduling +public class AsyncFetchTask implements SchedulingConfigurer { + + final Logger logger = LoggerFactory.getLogger(AsyncIdcTask.class); + + @Resource + private ScheduledDao scheduledDao; + + @Resource + private IdcService idcService; + + @Override + public void configureTasks(ScheduledTaskRegistrar scheduledTaskRegistrar) { + scheduledTaskRegistrar.addTriggerTask(() -> process(), + triggerContext -> { + ScheduledRequest scheduledRequest = new ScheduledRequest(); + scheduledRequest.setCronName("syncFetch"); + logger.info("syncFetch----------------"); + ScheduledEntity scheduledEntity = scheduledDao.findScheduled(scheduledRequest); + String cron = scheduledEntity!=null ? scheduledEntity.getCron() : "15 * * * * ?"; + + if (cron.isEmpty()) { + logger.error("cron is null"); + } + logger.info("syncFetch----------------"); + return new CronTrigger(cron).nextExecutionTime(triggerContext); + }); + } + + private void process() { + logger.info("syncFetch----process------------"); + + idcService.asyncFetchTask(); + } + +} \ No newline at end of file diff --git a/api-admin/src/main/java/com/glxp/sale/admin/util/RedirectUtils.java b/api-admin/src/main/java/com/glxp/sale/admin/util/RedirectUtils.java new file mode 100644 index 0000000..cc82a76 --- /dev/null +++ b/api-admin/src/main/java/com/glxp/sale/admin/util/RedirectUtils.java @@ -0,0 +1,5 @@ +package com.glxp.sale.admin.util; + +public class RedirectUtils { + +}