From fad72257047c5b47d14e0315e63bd21dcd5a3ee5 Mon Sep 17 00:00:00 2001 From: chengqf <584883665@139.com> Date: Wed, 12 Apr 2023 07:25:48 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E7=8B=AC=E7=AB=8B=E5=8F=91?= =?UTF-8?q?=E9=80=81=E6=96=B9=E6=B3=95=EF=BC=8C=E6=8E=A5=E5=8F=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../api/idc/controller/IdcController.java | 8 +++ .../com/glxp/api/idc/service/IdcService.java | 6 +- .../api/idc/service/impl/IdcServiceImpl.java | 62 +++++++++++++++---- 3 files changed, 63 insertions(+), 13 deletions(-) diff --git a/src/main/java/com/glxp/api/idc/controller/IdcController.java b/src/main/java/com/glxp/api/idc/controller/IdcController.java index 13a386a2..96ae3b68 100644 --- a/src/main/java/com/glxp/api/idc/controller/IdcController.java +++ b/src/main/java/com/glxp/api/idc/controller/IdcController.java @@ -56,7 +56,15 @@ public class IdcController { return idcService.receive( request, content, files); } + @RequestMapping(value = "/spssync/common/once") + @ResponseBody + public BaseResponse once(HttpServletRequest request,@RequestBody Map params) { + // + boolean isUpload = params.get("isUpload")!=null && params.get("isUpload").equals("1") ? true : false ; + return idcService.onceSync( params.get("tableName").toString(), isUpload); + } + //@Log("数据同步测试") @RequestMapping(value = "/spssync/common/test") public BaseResponse test(HttpServletRequest request, @RequestBody Map params) { diff --git a/src/main/java/com/glxp/api/idc/service/IdcService.java b/src/main/java/com/glxp/api/idc/service/IdcService.java index 72f0634d..095a9abb 100644 --- a/src/main/java/com/glxp/api/idc/service/IdcService.java +++ b/src/main/java/com/glxp/api/idc/service/IdcService.java @@ -9,12 +9,14 @@ import org.springframework.web.multipart.MultipartFile; import com.glxp.api.common.res.BaseResponse; + /*数据中继数据中心(接收)*/ public interface IdcService { BaseResponse receive(HttpServletRequest request, String content,MultipartFile[] files); + BaseResponse receiveJson(HttpServletRequest request,Map params); BaseResponse uploadFile(HttpServletRequest request, String content,MultipartFile[] files); @@ -22,7 +24,7 @@ public interface IdcService { BaseResponse send(String messageType,String tableName,Map params); BaseResponse taskList(HttpServletRequest request,Map params); - + BaseResponse download(HttpServletRequest request,Map params); BaseResponse uploadStatus(HttpServletRequest request,Map params); BaseResponse downloadStatus(HttpServletRequest request,Map params); @@ -35,4 +37,6 @@ public interface IdcService { void asyncSpsTask(); void downloadFile(String fileName,HttpServletResponse response) ; + + BaseResponse onceSync(String tableName,boolean isUpload); } diff --git a/src/main/java/com/glxp/api/idc/service/impl/IdcServiceImpl.java b/src/main/java/com/glxp/api/idc/service/impl/IdcServiceImpl.java index 4041e3d6..b9105858 100644 --- a/src/main/java/com/glxp/api/idc/service/impl/IdcServiceImpl.java +++ b/src/main/java/com/glxp/api/idc/service/impl/IdcServiceImpl.java @@ -85,6 +85,7 @@ public class IdcServiceImpl implements IdcService { @Resource private ScheduledDao scheduledDao; + /*获取拉取任务列表*/ @Override public BaseResponse taskList(HttpServletRequest request, Map params) { Map map = new HashMap(); @@ -93,6 +94,7 @@ public class IdcServiceImpl implements IdcService { return ResultVOUtils.success(list); } + /*下载任务*/ @Override public BaseResponse download(HttpServletRequest request, Map params) { @@ -114,7 +116,8 @@ public class IdcServiceImpl implements IdcService { id = params.get("id").toString(); } - Map map = map = dbDao.get("select * from " + tableName + " where id='" + id + "'"); + Map map = map = dbDao.get("select * from " + tableName + " where id='" + id + "'"); + if (map == null) return ResultVOUtils.error(9000, "下载任务不存在"); if (!(map.get("cacheFilePath") != null && FileUtils.isFileExist(map.get("cacheFilePath").toString()))) @@ -131,7 +134,7 @@ public class IdcServiceImpl implements IdcService { return ResultVOUtils.success(object); } - + /*UDI系统上传自助平台*/ @Override @@ -147,13 +150,14 @@ public class IdcServiceImpl implements IdcService { asyncDataTask(false); } - /*拉取前一级中继服务数据*/ + /*UDI系统拉取前一级中继服务或自助平台数据*/ @Async @Override public void asyncFetchUdiTask() { fetchTask(false); } + /*中继服务拉取任务*/ @Async @Override public void asyncFetchTask() { @@ -166,6 +170,7 @@ public class IdcServiceImpl implements IdcService { } + /*任务拉取,isIdc 是否中继服务*/ private void fetchTask(boolean isIdc) { String host = getNextHost(); @@ -174,10 +179,8 @@ public class IdcServiceImpl implements IdcService { if (map != null && map.get("syncIp") != null) host = map.get("syncIp").toString(); } - logger.info("fetch from ip:" + host); if (!StringUtils.isEmpty(host)) { String result = IDCUtils.post(host + "/spssync/common/list", null); - if (IDCUtils.isJson(result)) { JSONObject json = JSON.parseObject(result); if (json != null && json.getInteger("code") == 20000 && json.getString("data") != null) { @@ -194,7 +197,7 @@ public class IdcServiceImpl implements IdcService { } } - + private void asyncDataTask(boolean isUpload) { initTable(); Map map = dbDao.get("select * from sync_data_set limit 1"); @@ -207,11 +210,11 @@ public class IdcServiceImpl implements IdcService { } String[] syncTables = TableUtils.syncTables(); for (int i = 0; i < syncTables.length; i++) { - String[] tnames = syncTables[i].split("/"); + String[] tnames = syncTables[i].split("/"); boolean sync = ((tnames[0]==null||StringUtils.isEmpty(tnames[0])||(tnames[0]!=null&&tnames[0].equals("null")))&& (tnames[1]==null||StringUtils.isEmpty(tnames[1])||(tnames[1]!=null&&tnames[1].equals("null"))))|| (!StringUtils.isEmpty(tnames[0]) && map != null && map.get(tnames[0]) != null && map.get(tnames[0]).toString().equals("1")); - saveIdcLog("---"+sync, "", map.get(tnames[0]) + syncTables[i], 0, 0); + saveIdcLog("---", "", map.get(tnames[0]) + syncTables[i], 0, 0); if (sync) { String syncIp = map.get("syncIp") != null ? map.get("syncIp").toString() : ""; syncData(syncTables[i], isUpload, syncIp); @@ -220,8 +223,33 @@ public class IdcServiceImpl implements IdcService { } } - private void syncData(String t, boolean isUpload, String syncIp) { + + /*单独表调用,tableName可只传表名,如传完整参数按SYNCS_TABLES格式*/ + @Override + public BaseResponse onceSync(String tableName,boolean isUpload) { + String tnames = tableName.contains("/") ? tableName : "//"+tableName+"///////"; + String[] ts = tnames.split("/"); + String ip=""; + Map map = dbDao.get("select * from sync_data_set limit 1"); + if(isUpload) { + if (!(map.get("syncIp") != null && !StringUtils.isEmpty(map.get("syncIp").toString()))) { + logger.error("中继服务地址未配置"); + return ResultVOUtils.error(9999, "中继服务地址未配置"); + } + ip = map.get("syncIp").toString(); + } + if(!StringUtils.isEmpty(ts[0])) { + if(!(map!=null&&map.get(ts[0])!=null&&map.get(ts[0]).equals("1"))) + return ResultVOUtils.error(9999, "当前不允许生成"); + } + if(syncData(tnames,isUpload,ip)) + return ResultVOUtils.success(); + return ResultVOUtils.error(9999, "系统错误"); + } + + private boolean syncData(String t, boolean isUpload, String syncIp) { boolean sync = true; + boolean result = false; try { String[] tnames = t.split("/"); @@ -233,14 +261,22 @@ public class IdcServiceImpl implements IdcService { String keyColumn = keyList != null && keyList.size() > 0 ? keyList.get(0).get("columnName").toString() : "id"; Map map = new HashMap(); String sqlWhere = ""; - if (!StringUtils.isEmpty(tnames[6])) { - sqlWhere += " " + tnames[6] + ">= cast('" + lastUpdateTime + "' as datetime)";// and date_add(cast('"+DateUtil.formatDate(nowUpdateTime, "yyyy-MM-dd HH:mm:ss")+"' as datetime),INTERVAL 1 day)"; + String updateTimeColumn = tnames[6]; + if(StringUtils.isEmpty(updateTimeColumn)) { + Map tMap = dbDao.get("select column_name columnName from information_schema.columns where column_name='updateTime' and lower(table_name) = lower('"+tnames[0]+"') and table_schema = (select database()) limit 1"); + updateTimeColumn = tMap !=null ? "updateTime" : ""; + } + + + if (!StringUtils.isEmpty(updateTimeColumn)) { + sqlWhere += " " + updateTimeColumn + ">= cast('" + lastUpdateTime + "' as datetime)";// and date_add(cast('"+DateUtil.formatDate(nowUpdateTime, "yyyy-MM-dd HH:mm:ss")+"' as datetime),INTERVAL 1 day)"; sqlWhere += " and not exists (select fkId from idc_record where type='" + tnames[2] + "' and fkId=" + tnames[2] + "." + keyColumn + " and createTime>date_sub(now(),interval 15 MINUTE))"; } else { sqlWhere = "not exists (select fkId from idc_record where type='" + tnames[2] + "' and fkId=" + tnames[2] + "." + keyColumn + ")"; map.put("isEnd", "1"); } map.put("sqlWhere", sqlWhere); + map.put("tableKey", tnames[0]); map.put("tableName", tnames[2]); map.put("uniqueColumn", tnames[3]); @@ -261,7 +297,8 @@ public class IdcServiceImpl implements IdcService { sync = nextTimePoint.before(nowUpdateTime); } if (sync) { - if (syncMasterData(map, isUpload, syncIp)) { + result = syncMasterData(map, isUpload, syncIp); + if (result) { setUpdateTime(tnames[2] + "." + tnames[0], DateUtil.formatDate(nowUpdateTime, "yyyy-MM-dd HH:mm:ss")); } } @@ -269,6 +306,7 @@ public class IdcServiceImpl implements IdcService { } catch (Exception ex) { } + return result; }