From fd7d7d160cecf85427c39a65a3100b4ecf33e056 Mon Sep 17 00:00:00 2001 From: chengqf <584883665@139.com> Date: Wed, 12 Apr 2023 07:28:27 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E7=8B=AC=E7=AB=8B=E5=8F=91?= =?UTF-8?q?=E9=80=81=E6=96=B9=E6=B3=95=EF=BC=8C=E6=8E=A5=E5=8F=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../api/idc/controller/IdcController.java | 9 ++- .../com/glxp/api/idc/service/IdcService.java | 6 +- .../api/idc/service/impl/IdcServiceImpl.java | 72 +++++++++++++++---- 3 files changed, 69 insertions(+), 18 deletions(-) diff --git a/src/main/java/com/glxp/api/idc/controller/IdcController.java b/src/main/java/com/glxp/api/idc/controller/IdcController.java index 667afbff0..ae07bdb9d 100644 --- a/src/main/java/com/glxp/api/idc/controller/IdcController.java +++ b/src/main/java/com/glxp/api/idc/controller/IdcController.java @@ -57,7 +57,14 @@ public class IdcController { return idcService.receive(request, content, files); } - + @RequestMapping(value = "/spssync/common/once") + @ResponseBody + public BaseResponse once(HttpServletRequest request,@RequestBody Map params) { + // + boolean isUpload = params.get("isUpload")!=null && params.get("isUpload").equals("1") ? true : false ; + return idcService.onceSync( params.get("tableName").toString(), isUpload); + } + //@Log("数据同步测试") @RequestMapping(value = "/spssync/common/test") public BaseResponse test(HttpServletRequest request, @RequestBody Map params) { diff --git a/src/main/java/com/glxp/api/idc/service/IdcService.java b/src/main/java/com/glxp/api/idc/service/IdcService.java index ca4459ecc..031d2f49c 100644 --- a/src/main/java/com/glxp/api/idc/service/IdcService.java +++ b/src/main/java/com/glxp/api/idc/service/IdcService.java @@ -9,10 +9,10 @@ import org.springframework.web.multipart.MultipartFile; import com.glxp.api.common.res.BaseResponse; + /*数据中继数据中心(接收)*/ public interface IdcService { - BaseResponse receive(HttpServletRequest request, String content,MultipartFile[] files); @@ -23,7 +23,7 @@ public interface IdcService { BaseResponse send(String messageType,String tableName,Map params); BaseResponse taskList(HttpServletRequest request,Map params); - + BaseResponse download(HttpServletRequest request,Map params); BaseResponse uploadStatus(HttpServletRequest request,Map params); BaseResponse downloadStatus(HttpServletRequest request,Map params); @@ -36,4 +36,6 @@ public interface IdcService { void asyncSpsTask(); void downloadFile(String fileName,HttpServletResponse response) ; + + BaseResponse onceSync(String tableName,boolean isUpload); } diff --git a/src/main/java/com/glxp/api/idc/service/impl/IdcServiceImpl.java b/src/main/java/com/glxp/api/idc/service/impl/IdcServiceImpl.java index 959483859..1b66a8e8f 100644 --- a/src/main/java/com/glxp/api/idc/service/impl/IdcServiceImpl.java +++ b/src/main/java/com/glxp/api/idc/service/impl/IdcServiceImpl.java @@ -85,6 +85,7 @@ public class IdcServiceImpl implements IdcService { @Resource private ScheduledDao scheduledDao; + /*获取拉取任务列表*/ @Override public BaseResponse taskList(HttpServletRequest request, Map params) { Map map = new HashMap(); @@ -93,6 +94,7 @@ public class IdcServiceImpl implements IdcService { return ResultVOUtils.success(list); } + /*下载任务*/ @Override public BaseResponse download(HttpServletRequest request, Map params) { @@ -132,6 +134,7 @@ public class IdcServiceImpl implements IdcService { return ResultVOUtils.success(object); } + /*UDI系统上传自助平台*/ @Override @@ -147,13 +150,14 @@ public class IdcServiceImpl implements IdcService { asyncDataTask(false); } - /*拉取前一级中继服务数据*/ + /*UDI系统拉取前一级中继服务或自助平台数据*/ @Async @Override public void asyncFetchUdiTask() { fetchTask(false); } + /*中继服务拉取任务*/ @Async @Override public void asyncFetchTask() { @@ -166,6 +170,7 @@ public class IdcServiceImpl implements IdcService { } + /*任务拉取,isIdc 是否中继服务*/ private void fetchTask(boolean isIdc) { String host = getNextHost(); @@ -174,10 +179,8 @@ public class IdcServiceImpl implements IdcService { if (map != null && map.get("syncIp") != null) host = map.get("syncIp").toString(); } - logger.info("fetch from ip:" + host); if (!StringUtils.isEmpty(host)) { String result = IDCUtils.post(host + "/spssync/common/list", null); - if (IDCUtils.isJson(result)) { JSONObject json = JSON.parseObject(result); if (json != null && json.getInteger("code") == 20000 && json.getString("data") != null) { @@ -194,7 +197,7 @@ public class IdcServiceImpl implements IdcService { } } - + private void asyncDataTask(boolean isUpload) { initTable(); Map map = dbDao.get("select * from sync_data_set limit 1"); @@ -206,7 +209,6 @@ public class IdcServiceImpl implements IdcService { } } String[] syncTables = TableUtils.syncTables(); - for (int i = 0; i < syncTables.length; i++) { String[] tnames = syncTables[i].split("/"); boolean sync = ((tnames[0]==null||StringUtils.isEmpty(tnames[0])||(tnames[0]!=null&&tnames[0].equals("null")))&& @@ -221,19 +223,53 @@ public class IdcServiceImpl implements IdcService { } } - private void syncData(String t, boolean isUpload, String syncIp) { + + /*单独表调用,tableName可只传表名,如传完整参数按SYNCS_TABLES格式*/ + @Override + public BaseResponse onceSync(String tableName,boolean isUpload) { + String tnames = tableName.contains("/") ? tableName : "//"+tableName+"///////"; + String[] ts = tnames.split("/"); + String ip=""; + Map map = dbDao.get("select * from sync_data_set limit 1"); + if(isUpload) { + if (!(map.get("syncIp") != null && !StringUtils.isEmpty(map.get("syncIp").toString()))) { + logger.error("中继服务地址未配置"); + return ResultVOUtils.error(9999, "中继服务地址未配置"); + } + ip = map.get("syncIp").toString(); + } + if(!StringUtils.isEmpty(ts[0])) { + if(!(map!=null&&map.get(ts[0])!=null&&map.get(ts[0]).equals("1"))) + return ResultVOUtils.error(9999, "当前不允许生成"); + } + if(syncData(tnames,isUpload,ip)) + return ResultVOUtils.success(); + return ResultVOUtils.error(9999, "系统错误"); + } + + private boolean syncData(String t, boolean isUpload, String syncIp) { boolean sync = true; + boolean result = false; + try { String[] tnames = t.split("/"); String lastUpdateTime = getUpdateTime(tnames[2] + "." + tnames[0]); Date nowUpdateTime = new Date(); //if (!StringUtils.isEmpty(tnames[0])) { + List> keyList = dbDao.listKeyMysql(tnames[2]); String keyColumn = keyList != null && keyList.size() > 0 ? keyList.get(0).get("columnName").toString() : "id"; Map map = new HashMap(); String sqlWhere = ""; - if (!StringUtils.isEmpty(tnames[6])) { - sqlWhere += " " + tnames[6] + ">= cast('" + lastUpdateTime + "' as datetime)";// and date_add(cast('"+DateUtil.formatDate(nowUpdateTime, "yyyy-MM-dd HH:mm:ss")+"' as datetime),INTERVAL 1 day)"; + String updateTimeColumn = tnames[6]; + if(StringUtils.isEmpty(updateTimeColumn)) { + Map tMap = dbDao.get("select column_name columnName from information_schema.columns where column_name='updateTime' and lower(table_name) = lower('"+tnames[0]+"') and table_schema = (select database()) limit 1"); + updateTimeColumn = tMap !=null ? "updateTime" : ""; + } + + + if (!StringUtils.isEmpty(updateTimeColumn)) { + sqlWhere += " " + updateTimeColumn + ">= cast('" + lastUpdateTime + "' as datetime)";// and date_add(cast('"+DateUtil.formatDate(nowUpdateTime, "yyyy-MM-dd HH:mm:ss")+"' as datetime),INTERVAL 1 day)"; sqlWhere += " and not exists (select fkId from idc_record where type='" + tnames[2] + "' and fkId=" + tnames[2] + "." + keyColumn + " and createTime>date_sub(now(),interval 15 MINUTE))"; } else { sqlWhere = "not exists (select fkId from idc_record where type='" + tnames[2] + "' and fkId=" + tnames[2] + "." + keyColumn + ")"; @@ -261,14 +297,18 @@ public class IdcServiceImpl implements IdcService { sync = nextTimePoint.before(nowUpdateTime); } if (sync) { - if (syncMasterData(map, isUpload, syncIp)) { + result = syncMasterData(map, isUpload, syncIp); + if (result) { setUpdateTime(tnames[2] + "." + tnames[0], DateUtil.formatDate(nowUpdateTime, "yyyy-MM-dd HH:mm:ss")); } } //} + } catch (Exception ex) { + + } + return result; } - private String fetchData(String host, Map params) { OkHttpClient client = new OkHttpClient().newBuilder() .build(); @@ -504,11 +544,13 @@ public class IdcServiceImpl implements IdcService { int childNum = -1; /*子表*/ String[] syncTables = TableUtils.syncTables(); - for (String str : syncTables) { - if (str.contains("/" + tableKey + "/")) { - childNum++; - childs[childNum] = str; - } + if(!StringUtils.isEmpty(tableKey)) { + for (String str : syncTables) { + if (str.contains("/" + tableKey + "/")) { + childNum++; + childs[childNum] = str; + } + } } Map table = dbDao.getMysql(tableName); if (!(table != null && table.get("tableName") != null))