增加独立发送方法,接口

master
chengqf 2 years ago
parent f0b090c17d
commit fd7d7d160c

@ -57,7 +57,14 @@ public class IdcController {
return idcService.receive(request, content, files);
}
@RequestMapping(value = "/spssync/common/once")
@ResponseBody
public BaseResponse once(HttpServletRequest request,@RequestBody Map<String, Object> params) {
//
boolean isUpload = params.get("isUpload")!=null && params.get("isUpload").equals("1") ? true : false ;
return idcService.onceSync( params.get("tableName").toString(), isUpload);
}
//@Log("数据同步测试")
@RequestMapping(value = "/spssync/common/test")
public BaseResponse test(HttpServletRequest request, @RequestBody Map<String, Object> params) {

@ -9,10 +9,10 @@ import org.springframework.web.multipart.MultipartFile;
import com.glxp.api.common.res.BaseResponse;
/*数据中继数据中心(接收)*/
public interface IdcService {
BaseResponse receive(HttpServletRequest request,
String content,MultipartFile[] files);
@ -23,7 +23,7 @@ public interface IdcService {
BaseResponse send(String messageType,String tableName,Map<String,Object> params);
BaseResponse taskList(HttpServletRequest request,Map<String,Object> params);
BaseResponse download(HttpServletRequest request,Map<String,Object> params);
BaseResponse uploadStatus(HttpServletRequest request,Map<String,Object> params);
BaseResponse downloadStatus(HttpServletRequest request,Map<String,Object> params);
@ -36,4 +36,6 @@ public interface IdcService {
void asyncSpsTask();
void downloadFile(String fileName,HttpServletResponse response) ;
BaseResponse onceSync(String tableName,boolean isUpload);
}

@ -85,6 +85,7 @@ public class IdcServiceImpl implements IdcService {
@Resource
private ScheduledDao scheduledDao;
/*获取拉取任务列表*/
@Override
public BaseResponse taskList(HttpServletRequest request, Map<String, Object> params) {
Map<String, Object> map = new HashMap<String, Object>();
@ -93,6 +94,7 @@ public class IdcServiceImpl implements IdcService {
return ResultVOUtils.success(list);
}
/*下载任务*/
@Override
public BaseResponse download(HttpServletRequest request, Map<String, Object> params) {
@ -132,6 +134,7 @@ public class IdcServiceImpl implements IdcService {
return ResultVOUtils.success(object);
}
/*UDI系统上传自助平台*/
@Override
@ -147,13 +150,14 @@ public class IdcServiceImpl implements IdcService {
asyncDataTask(false);
}
/*拉取前一级中继服务数据*/
/*UDI系统拉取前一级中继服务或自助平台数据*/
@Async
@Override
public void asyncFetchUdiTask() {
fetchTask(false);
}
/*中继服务拉取任务*/
@Async
@Override
public void asyncFetchTask() {
@ -166,6 +170,7 @@ public class IdcServiceImpl implements IdcService {
}
/*任务拉取,isIdc 是否中继服务*/
private void fetchTask(boolean isIdc) {
String host = getNextHost();
@ -174,10 +179,8 @@ public class IdcServiceImpl implements IdcService {
if (map != null && map.get("syncIp") != null)
host = map.get("syncIp").toString();
}
logger.info("fetch from ip:" + host);
if (!StringUtils.isEmpty(host)) {
String result = IDCUtils.post(host + "/spssync/common/list", null);
if (IDCUtils.isJson(result)) {
JSONObject json = JSON.parseObject(result);
if (json != null && json.getInteger("code") == 20000 && json.getString("data") != null) {
@ -194,7 +197,7 @@ public class IdcServiceImpl implements IdcService {
}
}
private void asyncDataTask(boolean isUpload) {
initTable();
Map<String, Object> map = dbDao.get("select * from sync_data_set limit 1");
@ -206,7 +209,6 @@ public class IdcServiceImpl implements IdcService {
}
}
String[] syncTables = TableUtils.syncTables();
for (int i = 0; i < syncTables.length; i++) {
String[] tnames = syncTables[i].split("/");
boolean sync = ((tnames[0]==null||StringUtils.isEmpty(tnames[0])||(tnames[0]!=null&&tnames[0].equals("null")))&&
@ -221,19 +223,53 @@ public class IdcServiceImpl implements IdcService {
}
}
private void syncData(String t, boolean isUpload, String syncIp) {
/*单独表调用tableName可只传表名如传完整参数按SYNCS_TABLES格式*/
@Override
public BaseResponse onceSync(String tableName,boolean isUpload) {
String tnames = tableName.contains("/") ? tableName : "//"+tableName+"///////";
String[] ts = tnames.split("/");
String ip="";
Map<String, Object> map = dbDao.get("select * from sync_data_set limit 1");
if(isUpload) {
if (!(map.get("syncIp") != null && !StringUtils.isEmpty(map.get("syncIp").toString()))) {
logger.error("中继服务地址未配置");
return ResultVOUtils.error(9999, "中继服务地址未配置");
}
ip = map.get("syncIp").toString();
}
if(!StringUtils.isEmpty(ts[0])) {
if(!(map!=null&&map.get(ts[0])!=null&&map.get(ts[0]).equals("1")))
return ResultVOUtils.error(9999, "当前不允许生成");
}
if(syncData(tnames,isUpload,ip))
return ResultVOUtils.success();
return ResultVOUtils.error(9999, "系统错误");
}
private boolean syncData(String t, boolean isUpload, String syncIp) {
boolean sync = true;
boolean result = false;
try {
String[] tnames = t.split("/");
String lastUpdateTime = getUpdateTime(tnames[2] + "." + tnames[0]);
Date nowUpdateTime = new Date();
//if (!StringUtils.isEmpty(tnames[0])) {
List<Map<String, String>> keyList = dbDao.listKeyMysql(tnames[2]);
String keyColumn = keyList != null && keyList.size() > 0 ? keyList.get(0).get("columnName").toString() : "id";
Map<String, Object> map = new HashMap<String, Object>();
String sqlWhere = "";
if (!StringUtils.isEmpty(tnames[6])) {
sqlWhere += " " + tnames[6] + ">= cast('" + lastUpdateTime + "' as datetime)";// and date_add(cast('"+DateUtil.formatDate(nowUpdateTime, "yyyy-MM-dd HH:mm:ss")+"' as datetime),INTERVAL 1 day)";
String updateTimeColumn = tnames[6];
if(StringUtils.isEmpty(updateTimeColumn)) {
Map<String,Object> tMap = dbDao.get("select column_name columnName from information_schema.columns where column_name='updateTime' and lower(table_name) = lower('"+tnames[0]+"') and table_schema = (select database()) limit 1");
updateTimeColumn = tMap !=null ? "updateTime" : "";
}
if (!StringUtils.isEmpty(updateTimeColumn)) {
sqlWhere += " " + updateTimeColumn + ">= cast('" + lastUpdateTime + "' as datetime)";// and date_add(cast('"+DateUtil.formatDate(nowUpdateTime, "yyyy-MM-dd HH:mm:ss")+"' as datetime),INTERVAL 1 day)";
sqlWhere += " and not exists (select fkId from idc_record where type='" + tnames[2] + "' and fkId=" + tnames[2] + "." + keyColumn + " and createTime>date_sub(now(),interval 15 MINUTE))";
} else {
sqlWhere = "not exists (select fkId from idc_record where type='" + tnames[2] + "' and fkId=" + tnames[2] + "." + keyColumn + ")";
@ -261,14 +297,18 @@ public class IdcServiceImpl implements IdcService {
sync = nextTimePoint.before(nowUpdateTime);
}
if (sync) {
if (syncMasterData(map, isUpload, syncIp)) {
result = syncMasterData(map, isUpload, syncIp);
if (result) {
setUpdateTime(tnames[2] + "." + tnames[0], DateUtil.formatDate(nowUpdateTime, "yyyy-MM-dd HH:mm:ss"));
}
}
//}
} catch (Exception ex) {
}
return result;
}
private String fetchData(String host, Map<String, Object> params) {
OkHttpClient client = new OkHttpClient().newBuilder()
.build();
@ -504,11 +544,13 @@ public class IdcServiceImpl implements IdcService {
int childNum = -1;
/*子表*/
String[] syncTables = TableUtils.syncTables();
for (String str : syncTables) {
if (str.contains("/" + tableKey + "/")) {
childNum++;
childs[childNum] = str;
}
if(!StringUtils.isEmpty(tableKey)) {
for (String str : syncTables) {
if (str.contains("/" + tableKey + "/")) {
childNum++;
childs[childNum] = str;
}
}
}
Map<String, String> table = dbDao.getMysql(tableName);
if (!(table != null && table.get("tableName") != null))

Loading…
Cancel
Save