master
admin 2 years ago
parent f8fef1b86d
commit a1d625c0d6

@ -27,7 +27,6 @@ import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.support.CronSequenceGenerator;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.multipart.MultipartFile;
import com.alibaba.fastjson.JSON;
@ -59,7 +58,6 @@ import okhttp3.Response;
/*数据中继数据中心(接收)*/
@Slf4j
@Service
@Transactional(rollbackFor = Exception.class)
public class IdcServiceImpl implements IdcService {
private static final Logger logger = LoggerFactory.getLogger(IdcServiceImpl.class);
@Value("${file_path}")
@ -75,16 +73,22 @@ public class IdcServiceImpl implements IdcService {
/*同步表,格式:同步设置表列名/同步设置表列名(子表时设置,主表不设置)/主表唯一列(多列逗号分隔)/主表关联列/子表关联列/数据库实际表/时间列/图片或文件列/数据条件/说明*/
private final String[] SYNC_TABLES= {
"entrustAction//basic_entrust_accept/id//////委托验收","basicProducts//basic_udirel/id///updateTime///耗材字典","/basicProducts/basic_products/id/uuid/uuid////耗材字典信息详情",
"//company_product_relevance////updateTime///供应商关联信息",
"/companyCert/company_product_relevance/id/customerId/unitFk/updateTime///供应商关联信息",
"basicCorp//basic_corp/id///updateTime///往来单位","typeThird//thr_bustype_origin/id///updateTime///第三方单据类型",
"basicThirdCorp//thr_corp/id///updateTime///第三方往来单位","//thr_dept/id///updateTime///第三方部门",
"//thr_inv_products/id///updateTime///第三方库存","basicThirdInv//thr_inv_warehouse/id//////第三方仓库",
"basicThirdBusOrder//thr_order/id///updateTime///第三方业务单据","/basicThirdBusOrder/thr_order_detail/orderIdFk/id/orderIdFk/updateTime///第三方单据详情",
"basicThirdProducts//thr_products/id///updateTime///第三方产品信息","//sup_cert/id///updateTime/filePath//资质证书信息",
"basicThirdProducts//thr_products/id///updateTime///第三方产品信息",
"//sup_cert_set/id///updateTime///供应商资质证书设置","companyCert//sup_company/customerId///updateTime///配送企业",
"manufacturerCert//sup_manufacturer/id///updateTime///生产企业","productCert//sup_product/id///updateTime///产品资质信息",
"/companyCert/sup_cert/id/customerId/customerId/updateTime/filePath/type=1/配送企业资质证书信息",
"/manufacturerCert/sup_cert/id/manufacturerId/manufacturerIdFk/updateTime/filePath/type=2/生产企业资质证书信息",
"/productCert/sup_cert/id/productId/productIdFk/updateTime/filePath/type=3/产品资质证书信息",
"//udicompany/id///updateTime///国际库医疗器械注册人信息"};
//"dbDiProducts//productinfo/id///updateTime///DI产品信息",
//"basicInv/////////仓库字典",
//"typeBus/////////业务单据类型",
//"typeScan/////////扫码单据类型",
private int orderNum=0;
@Resource
@ -119,11 +123,11 @@ public class IdcServiceImpl implements IdcService {
BaseResponse response = JSON.parseObject(json,BaseResponse.class);
return response;
}
return ResultVOUtils.success(object);
}
/*UDI系统上传自助平台*/
@Override
public void asyncUdiTask() {
@ -136,7 +140,7 @@ public class IdcServiceImpl implements IdcService {
logger.info("自助平台同步任务开始--------");
asyncDataTask(false);
}
/*拉取前一级中继服务数据*/
@Async
@Override
@ -149,7 +153,7 @@ public class IdcServiceImpl implements IdcService {
fetchTask(true);
}
private void fetchTask(boolean isIdc) {
String host = getNextHost();
if(!isIdc) {
@ -160,7 +164,7 @@ public class IdcServiceImpl implements IdcService {
logger.info("fetch from ip:"+host);
if(!StringUtils.isEmpty(host)) {
String result = post(host+"/spssync/common/list",null);
if(IDCUtils.isJson(result)) {
JSONObject json = JSON.parseObject(result);
if(json!=null&&json.getInteger("code")==20000&&json.getString("data")!=null) {
@ -176,7 +180,7 @@ public class IdcServiceImpl implements IdcService {
}
}
}
@Async
@Override
public void asyncIdcTask() {
@ -186,7 +190,7 @@ public class IdcServiceImpl implements IdcService {
dbDao.save("insert into idc_var (code,content) values ('system_type','IDC')");
}
}
private void asyncDataTask(boolean isUpload) {
initTable();
@ -200,23 +204,25 @@ public class IdcServiceImpl implements IdcService {
}
for(int i=0;i<SYNC_TABLES.length;i++) {
String[] tnames = SYNC_TABLES[i].split("/");
boolean sync = StringUtils.isEmpty(tnames[0]) ||
(!StringUtils.isEmpty(tnames[0])&&map!=null&&map.get(tnames[0])!=null&&map.get(tnames[0]).toString().equals("1"));
saveIdcLog("---","",map.get(tnames[0])+SYNC_TABLES[i],0,0);
if(!StringUtils.isEmpty(tnames[0])&&map!=null&&map.get(tnames[0])!=null&&map.get(tnames[0]).toString().equals("1")) {
if(sync) {
String syncIp = map.get("syncIp")!=null ? map.get("syncIp").toString() : "";
syncData(SYNC_TABLES[i],isUpload,syncIp);
}
}
}
}
private void syncData(String t,boolean isUpload,String syncIp) {
boolean sync = true;
String[] tnames = t.split("/");
String lastUpdateTime = getUpdateTime(tnames[3]);
String lastUpdateTime = getUpdateTime(tnames[2]+"."+tnames[0]);
Date nowUpdateTime = new Date();
if(!StringUtils.isEmpty(tnames[0])) {
List<Map<String, String>> keyList = dbDao.listKeyMysql(tnames[2]);
String keyColumn = keyList!=null&&keyList.size()>0 ? keyList.get(0).get("columnName").toString() : "id";
Map<String,Object> map = new HashMap<String,Object>();
@ -229,7 +235,7 @@ public class IdcServiceImpl implements IdcService {
map.put("isEnd", "1");
}
map.put("sqlWhere", sqlWhere);
map.put("tableKey", tnames[0]);
map.put("tableName", tnames[2]);
map.put("uniqueColumn", tnames[3]);
@ -243,7 +249,7 @@ public class IdcServiceImpl implements IdcService {
scheduledRequest.setCronName("sync"+WordUtils.capitalizeFully(tnames[2], new char[]{'_'}).replace("_", ""));
ScheduledEntity scheduledEntity = scheduledDao.findScheduled(scheduledRequest);
String cron = scheduledEntity!=null ? scheduledEntity.getCron() : "";
if(!StringUtils.isEmpty(cron)) {
CronSequenceGenerator cronSequenceGenerator = new CronSequenceGenerator(cron);
Date nextTimePoint = cronSequenceGenerator.next(DateUtil.parseDate(lastUpdateTime));
@ -251,13 +257,13 @@ public class IdcServiceImpl implements IdcService {
}
if(sync) {
if(syncMasterData(map,isUpload,syncIp)) {
setUpdateTime(tnames[2],DateUtil.formatDate(nowUpdateTime,"yyyy-MM-dd HH:mm:ss"));
setUpdateTime(tnames[2]+"."+tnames[0],DateUtil.formatDate(nowUpdateTime,"yyyy-MM-dd HH:mm:ss"));
}
}
}
}
private String fetchData(String host,Map<String,Object> params) {
OkHttpClient client = new OkHttpClient().newBuilder()
@ -276,12 +282,12 @@ public class IdcServiceImpl implements IdcService {
Response response = client.newCall(request).execute();
result = response.body().string();
JSONObject json = JSONObject.parseObject(result);
if(isLastLevel()) {
if(json.get("code")!=null) {
success = analyToDB(host,json.getJSONObject("data"),files,false);
success = analyMiddle(host,json.getJSONObject("data"),files,false,true);
} else {
success = analyToDB(host,json,files,false);
success = analyMiddle(host,json,files,false,true);
}
if(!success)
logger.info("fetchData-->解析失败");
@ -292,12 +298,12 @@ public class IdcServiceImpl implements IdcService {
syncAddTaskStatus(json,false,success);
}
}
if(success) {
String ret = post(host+"/spssync/common/success",params);
}
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
@ -353,7 +359,7 @@ public class IdcServiceImpl implements IdcService {
IDCUtils.createDirectory(filePath +filePathSlash +datePath+"/");
if(!FileUtils.makeDirectory(backFilePath +backFileSlash +datePath))
IDCUtils.createDirectory(filePath +backFileSlash +datePath+"/");
FileUtils.SaveFileAs(content, fileName);
FileUtils.SaveFileAs(content, backFileName);
String[] saveFiles= {};
@ -380,7 +386,7 @@ public class IdcServiceImpl implements IdcService {
syncAddTaskStatus(json,true,true);
/*解析入库*/
if(isLastLevel()) {
if(!analyToDB("",json,files,true))
if(!analyMiddle("",json,files,true,true))
return ResultVOUtils.error(9000, "解析失败");
} else {
//需要转发
@ -393,7 +399,7 @@ public class IdcServiceImpl implements IdcService {
} else {
return ResultVOUtils.error(9000, "转发失败");
}
}
}
return ResultVOUtils.success();
@ -403,17 +409,17 @@ public class IdcServiceImpl implements IdcService {
/*数据同步,从数据库获取数据下发或上传下级中继服务*/
@Override
public BaseResponse send(Map<String,Object> params) {
return send(params.get("messageType").toString(),params.get("tableName").toString(),params);
return send(params.get("messageType").toString(),params.get("tableName").toString(),params);
}
@Override
public BaseResponse send(String messageType,String tableName,Map<String,Object> params) {
return ResultVOUtils.error(9999, "");
}
@Override
public void downloadFile(String fileName,HttpServletResponse response) {
OutputStream os;
@ -440,7 +446,7 @@ public class IdcServiceImpl implements IdcService {
dbDao.save(sql);
}
private void saveExportStatus(Map<String,Object> params) {
Map<String,Object> map = new HashMap<String,Object>();
map.put("receiveStatus", "0");
@ -480,8 +486,8 @@ public class IdcServiceImpl implements IdcService {
dbDao.save(sql);
}
private boolean syncMasterData(Map<String,Object> params,boolean isUpload,String syncIp) {
boolean success=false;
String tableName = params.get("tableName").toString();
@ -504,13 +510,13 @@ public class IdcServiceImpl implements IdcService {
Map<String,Object> whereParams = new HashMap<String,Object>();
whereParams.put("sqlWhere", params.get("sqlWhere"));
whereParams.put("dataWhere", params.get("dataWhere"));
Map<String,Object> map = new HashMap<String,Object>();
String where = DBAUtils.convertWhere(column,whereParams,"");
sql+=!StringUtils.isEmpty(where) ? " where "+where : "";
map.put("sql", sql);
int total = dbDao.count(map);
String isEnd = params.get("isEnd")!=null ? params.get("isEnd").toString() : "0";
int limit = 50;
String[] files = new String[limit];
@ -519,13 +525,13 @@ public class IdcServiceImpl implements IdcService {
String messageType = params.get("messageType").toString();
saveIdcLog(messageType,"",tableName +">"+where,0,total);
String filePathSlash = filePath.substring(filePath.length() -1).equals("/") ? "" : "/";
if(total>0) {
success = true;
whereParams.put("page", 0);
whereParams.put("limit", limit);
List<Map<String, String>> keyList = dbDao.listKeyMysql(tableName);
String keyColumn = keyList!=null&&keyList.size()>0 ? keyList.get(0).get("columnName").toString() : "id";
for(int i=0;i<Math.ceil(total/limit)+1;i++) {
@ -561,13 +567,13 @@ public class IdcServiceImpl implements IdcService {
}
data.replace("data", list);
orderNum++;
if(isUpload) {
String result = "";
try {
result = relay("",JSON.toJSONString(data),files,syncIp);
} catch (Exception ex) {
}
if(IDCUtils.isJson(result)) {
JSONObject json = JSON.parseObject(result);
@ -596,12 +602,12 @@ public class IdcServiceImpl implements IdcService {
}
}
}
}
}
return success;
}
/*增加同步任务状态*/
private void syncAddTaskStatus(Map<String,Object> json,boolean isUpload,boolean success) {
String content = JSON.toJSONString(json);
@ -612,10 +618,10 @@ public class IdcServiceImpl implements IdcService {
IDCUtils.createDirectory(filePath +datePath+"/");
if(!FileUtils.makeDirectory(backFilePath +datePath))
IDCUtils.createDirectory(filePath +datePath+"/");
FileUtils.SaveFileAs(content, fileName);
FileUtils.SaveFileAs(content, backFileName);
//type,transportType,superiorService,subordinateService,cacheFilePath
Map<String,Object> map = new HashMap<>();
map.put("type", json.get("messageType"));
@ -624,16 +630,12 @@ public class IdcServiceImpl implements IdcService {
map.put("taskId", json.get("messageId"));
map.put("cacheFilePath", fileName);
map.put("status", success ? "1" : "0");
saveExportStatus(map);
}
/*解析到数据库*/
private boolean analyToDB(String host,JSONObject jsonObject,MultipartFile[] files,boolean isUpload) {
private boolean analyMiddle(String host,JSONObject jsonObject,MultipartFile[] files,boolean isUpload,boolean isToDB) {
if(jsonObject.get("data")!=null) {
String tableName = "";
String uniqueColumn = "";
@ -653,7 +655,12 @@ public class IdcServiceImpl implements IdcService {
filePathColumn = obj.getString("filePathColumn");
}
if(!StringUtils.isEmpty(tableName)) {
return analyData(host,tableName,uniqueColumn,filePathColumn,list,isUpload);
if(isToDB) {
return analyToDB(host,tableName,uniqueColumn,filePathColumn,list,isUpload);
} else {
logger.info("files-->"+tableName);
return analyToFile(host,filePathColumn,list);
}
} else {
logger.error("数据格式错误:无数据标记");
}
@ -662,8 +669,24 @@ public class IdcServiceImpl implements IdcService {
return false;
}
private boolean analyToFile(String host,String filePathColumn,List<Map<String,Object>> list) {
if(list!=null&&list.size()>0&&!StringUtils.isEmpty(filePathColumn)) {
String[] files = new String[list.size()];
logger.info("filePathColumn-->"+filePathColumn);
for(int i=0;i<list.size();i++) {
files[i] = list.get(i).get(filePathColumn) !=null ? list.get(i).get(filePathColumn).toString() : "";
logger.info("file-->"+files[i]);
}
batchDownloadFile(host,files);
}
return true;
}
/*解析到数据库*/
/*按表名解析数据到数据库,子表暂未处理*/
private boolean analyData(String host,String tableName,String uniqueColumn,String filePathColumn,List<Map<String,Object>> list,boolean isUpload) {
private boolean analyToDB(String host,String tableName,String uniqueColumn,String filePathColumn,List<Map<String,Object>> list,boolean isUpload) {
String tName = DBAUtils.tableRealName(tableName);
String sql="replace "+tName +"(";
@ -707,7 +730,7 @@ public class IdcServiceImpl implements IdcService {
if(!StringUtils.isEmpty(filePathColumn)) {
files[i] = list.get(i).get(filePathColumn) !=null ? list.get(i).get(filePathColumn).toString() : "";
}
if("A,D,U".contains(operateMode)) {
for(int z=0;z<keyColumn.length;z++ ) {
if(list.get(i).get(keyColumn[z])!=null&&!StringUtils.isEmpty(list.get(i).get(keyColumn[z]).toString())) {
@ -717,7 +740,7 @@ public class IdcServiceImpl implements IdcService {
updateWhere+=keyColumn[z]+" = "+(keyDataType[z].equals("C") ? "'" :"") +value +(keyDataType[z].equals("C") ? "'" :"");
}
}
if("A,D".contains(operateMode)&&!StringUtils.isEmpty(updateWhere))
dbDao.delete(del+updateWhere);
}
@ -734,7 +757,7 @@ public class IdcServiceImpl implements IdcService {
value = DBAUtils.escape(value);
String dataType = columnList.get(k).get("dataType").toLowerCase().contains("char") || columnList.get(k).get("dataType").toLowerCase().contains("text") ?
"C" : columnList.get(k).get("dataType").toLowerCase().contains("date") ? "D" : "N" ;
sql+=m>0 ? "," : "";
boolean ups = list.get(i).containsKey(attrName) ? true : false ;
updateSet+=ups&&h>0 ? "," : "";
@ -762,11 +785,11 @@ public class IdcServiceImpl implements IdcService {
dbDao.update(upd + updateSet+" where "+updateWhere);
h+=ups ? 1 : 0;
}
sql+=")";
n++;
}
for(int m=0;m<30;m++) {
if(list.get(i).get("tableName"+m)!=null&&list.get(i).get("data"+m)!=null) {
Object obj = list.get(i).get("data"+m);
@ -774,12 +797,12 @@ public class IdcServiceImpl implements IdcService {
for (Object o : (List<?>) obj) {
chList.add((Map<String,Object>)o);
}
analyData(host,list.get(i).get("tableName"+m).toString(),"","",chList,isUpload);
analyToDB(host,list.get(i).get("tableName"+m).toString(),"","",chList,isUpload);
} else {
break;
}
}
}
if(n>0) {
result = (dbDao.save(sql)>0);
@ -787,6 +810,7 @@ public class IdcServiceImpl implements IdcService {
logger.error(tableName+"-->fetchSave Fail");
}
if(!isUpload&&!StringUtils.isEmpty(filePathColumn)) {
logger.info("downloadFile-->"+files.toString());
batchDownloadFile(host,files);
}
@ -866,20 +890,20 @@ public class IdcServiceImpl implements IdcService {
.addFormDataPart("fileName", fileName)
.build();
Request request = new Request.Builder()
.url(syncIp+"/spssync/common/downloadFile")
.post(requestBody)
.build();
try {
Response result = client.newCall(request).execute();
if (MediaType.parse("application/force-download").equals(result.body().contentType())) {
try (InputStream inputStream = result.body().byteStream()) {
String filePathSlash = filePath.substring(filePath.length() -1).equals("/") ? "" : "/";
FileOutputStream outputStream =new FileOutputStream(filePath +filePathSlash+fileName);
byte b[]=new byte[1024];
@ -894,14 +918,14 @@ public class IdcServiceImpl implements IdcService {
outputStream.flush();
} catch (Exception e) {
}
}
}
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
@ -909,7 +933,7 @@ public class IdcServiceImpl implements IdcService {
return true;
}
/*获取转发服务地址当前值允许单向只使用参数upper_server_ip*/
private String getNextHost() {
SystemParamConfigEntity systemParamConfigEntity = systemParamConfigService.selectByParamKey("upper_server_ip");
@ -923,7 +947,7 @@ public class IdcServiceImpl implements IdcService {
Map<String,Object> map = dbDao.get("select * from idc_var where code='system_type'");
return !(map!=null&&map.get("content")!=null&&map.get("content").toString().equals("IDC")) ;
}
/*是否需要转发*/
private boolean isRelay() {
String relayHost = getNextHost();
@ -934,7 +958,7 @@ public class IdcServiceImpl implements IdcService {
private List<Map<String,Object>> getList(String tableName,Map<String,Object> params) {
String sql = "select "+tableName+".*,'A' as operateMode from "+tableName;
Map<String,Object> column = getColumn(tableName);
String where = DBAUtils.convertWhere(column,params,"");
sql+=!StringUtils.isEmpty(where) ? " where "+where : "";
Map<String,Object> map = new HashMap<String,Object>();
@ -1034,7 +1058,7 @@ public class IdcServiceImpl implements IdcService {
} else {
//国家库数据
if(id.equals("productinfo")) {
}
updateTime = "2000-01-01 00:00:00";
String sql = "insert into idc_status (id,statusTime) values ('"+id+"',cast('"+updateTime+"' as datetime))";
@ -1064,7 +1088,7 @@ public class IdcServiceImpl implements IdcService {
jdbcTemplate.execute("alter table basic_export_status add column cacheFilePath varchar(255)");
jdbcTemplate.execute("alter table basic_upload_status add column cacheFilePath varchar(255)");
} catch (Exception e) {
}
}
private void alterTable(String tableName,String sql) {
@ -1074,6 +1098,6 @@ public class IdcServiceImpl implements IdcService {
}
}
}

Loading…
Cancel
Save