增加数据删除同步

master
chengqf 2 years ago
parent 852dd4a41f
commit d63bcbf598

@ -55,6 +55,7 @@ import okhttp3.Request;
import okhttp3.RequestBody;
import okhttp3.Response;
/*数据中继数据中心(接收)*/
@Slf4j
@Service
public class IdcServiceImpl implements IdcService {
@ -202,7 +203,7 @@ public class IdcServiceImpl implements IdcService {
}
}
private void fetchFailFile(String host) {
Map<String,Object> map = new HashMap<String,Object>();
map.put("sql", "select * from idc_file where createTime<date_sub(now(),interval 30 minute) order by createTime");
@ -222,6 +223,7 @@ public class IdcServiceImpl implements IdcService {
return;
}
}
String[] syncTables = TableUtils.syncTables();
for (int i = 0; i < syncTables.length; i++) {
String[] tnames = syncTables[i].split("/");
@ -231,12 +233,84 @@ public class IdcServiceImpl implements IdcService {
if (sync) {
String syncIp = map.get("syncIp") != null ? map.get("syncIp").toString() : "";
syncData(syncTables[i], isUpload, syncIp);
asyncDelete(syncTables[i], isUpload, syncIp);
}
}
}
}
/*数据删除同步*/
private void asyncDelete(String tname,boolean isUpload,String syncIp) {
String[] tnames = tname.split("/");
String lastUpdateTime = getUpdateTime(tnames[2] + "." + tnames[0]+"."+tnames[1]+".delete");
Date nowUpdateTime = new Date();
String where = "tableName='"+tnames[2].toLowerCase()+"' and updateTime between cast('"+lastUpdateTime+"' as datetime) "+
" and cast('"+DateUtil.formatDate(nowUpdateTime, "yyyy-MM-dd HH:mm:ss")+"' as datetime)";
Map<String,Object> count = new HashMap<String,Object>();
count.put("sql", "select count(*) from idc_delete where "+where);
int total = dbDao.count(count);
int limit = 50;
if (total > 0) {
boolean success = true;
for (int i = 0; i < Math.ceil(total / limit) + 1; i++) {
Date startTime = new Date();
Map<String,Object> map = new HashMap<String,Object>();
map.put("sql", "select * from idc_delete");
map.put("sqlWhere", where);
map.put("limit", limit);
map.put("page", i * limit);
List<Map<String,Object>> list = dbDao.list(map);
if(list!=null&&map.size()>0) {
List<Map<String,Object>> data = new ArrayList<>();
for(int k=0;i<list.size();k++) {
String line = list.get(k).get("uniqueValue").toString();
JSONObject obj = JSON.parseObject(line);
String uniqueColumn = "";
for(String key : obj.keySet()){
uniqueColumn += uniqueColumn.length()>0 ? ","+key : key;
}
obj.put("uniqueColumn", uniqueColumn);
obj.put("operateMode", "D");
data.add(obj);
}
Map<String,Object> msg = new HashMap<String, Object>();
msg.put("messageId", CustomUtil.getId());
msg.put("messageType", tnames[9]+"(删除)");
msg.put("apiCode", "common");
msg.put("tableName", DBAUtils.tableAliasName(tnames[2]));
msg.put("sendTime", new Date());
msg.put("version", "1.0");
msg.put("total", data.size());
msg.put("data", data);
if (isUpload) {
String result = "";
try {
result = relay("", JSON.toJSONString(msg), null, syncIp);
} catch (Exception ex) {
}
if (IDCUtils.isJson(result)) {
JSONObject json = JSON.parseObject(result);
if (json.getInteger("code") == 20000) {
saveIdcLog(tnames[9], "", tnames[2] + ">success(delete)", i * limit, total);
} else {
success = false;
saveIdcLog(tnames[9], "", tnames[2] + ">" + result, i * limit, total);
}
} else {
success = false;
saveIdcLog(tnames[9], "", syncIp + ":" + tnames[2] + ">fail:上传地址未连通", i * limit, total);
}
} else {
saveIdcLog(tnames[9], "", tnames[2] + ">success(delete)", i * limit, total);
}
syncAddTaskStatus(msg, isUpload ? 1 : 0, success, startTime, isUpload);
}
}
}
}
/*单独表调用tableName可只传表名如传完整参数按SYNCS_TABLES格式*/
@Override
@ -579,7 +653,7 @@ public class IdcServiceImpl implements IdcService {
Map<String, Object> whereParams = new HashMap<String, Object>();
whereParams.put("sqlWhere", params.get("sqlWhere"));
String dataWhere = params.get("dataWhere")!=null ? params.get("dataWhere").toString() : "";
Map<String, Object> map = new HashMap<String, Object>();
String where = DBAUtils.convertWhere(column, whereParams, dataWhere);
@ -811,7 +885,7 @@ public class IdcServiceImpl implements IdcService {
private boolean analyToDB(String host, String tableName, String uniqueColumn, String
filePathColumn, List<Map<String, Object>> list, boolean isUpload) {
String tName = DBAUtils.tableRealName(tableName);
String sql = "replace " + tName + "(";
String del = "delete from " + tName + " where ";
@ -819,6 +893,7 @@ public class IdcServiceImpl implements IdcService {
String[] keyColumn = new String[30];
String[] keyDataType = new String[30];
List<Map<String, String>> columnList = dbDao.listColumnsMysql(tName);
Map<String,Object> column = getColumn(tName);
boolean result = false;
int key = 0;
int col = 0;
@ -856,15 +931,24 @@ public class IdcServiceImpl implements IdcService {
}
if ("A,D,U".contains(operateMode)) {
for (int z = 0; z < keyColumn.length; z++) {
if (list.get(i).get(keyColumn[z]) != null && !StringUtils.isEmpty(list.get(i).get(keyColumn[z]).toString())) {
if(operateMode.equals("D")&&list.get(i).get("uniqueColumn")!=null) {
String[] ucs = list.get(i).get("uniqueColumn").toString().split(",");
for(String str:ucs) {
Map<String,Object> map = (Map<String, Object>) column.get(str);
String dataType = map.get("dataType").toString();
updateWhere += !StringUtils.isEmpty(updateWhere) ? " and " : " ";
String value = list.get(i).get(keyColumn[z]) != null ? list.get(i).get(keyColumn[z]).toString() : "";
value = keyDataType[z].equals("D") ? "cast('" + DateUtil.formatDate(IDCUtils.parseDate(value), "yyyy-MM-dd HH:mm:ss") + "' as datetime)" : value;
updateWhere += keyColumn[z] + " = " + (keyDataType[z].equals("C") ? "'" : "") + value + (keyDataType[z].equals("C") ? "'" : "");
updateWhere += str +" = "+(dataType.equals("C") ? "'" : "") + list.get(i).get(str) + (dataType.equals("C") ? "'" : "");
}
} else {
for (int z = 0; z < keyColumn.length; z++) {
if (list.get(i).get(keyColumn[z]) != null && !StringUtils.isEmpty(list.get(i).get(keyColumn[z]).toString())) {
updateWhere += !StringUtils.isEmpty(updateWhere) ? " and " : " ";
String value = list.get(i).get(keyColumn[z]) != null ? list.get(i).get(keyColumn[z]).toString() : "";
value = keyDataType[z].equals("D") ? "cast('" + DateUtil.formatDate(IDCUtils.parseDate(value), "yyyy-MM-dd HH:mm:ss") + "' as datetime)" : value;
updateWhere += keyColumn[z] + " = " + (keyDataType[z].equals("C") ? "'" : "") + value + (keyDataType[z].equals("C") ? "'" : "");
}
}
}
if ("A,D".contains(operateMode) && !StringUtils.isEmpty(updateWhere))
dbDao.delete(del + updateWhere);
}
@ -1034,27 +1118,27 @@ public class IdcServiceImpl implements IdcService {
int total = 0;
if (result!=null&&result.isSuccessful()&&MediaType.parse("application/force-download").equals(result.body().contentType())) {
try (InputStream inputStream = result.body().byteStream()) {
FileOutputStream outputStream = new FileOutputStream(filePath + filePathSlash + imagePath+fileName);
byte b[] = new byte[1024];
int len = 0;
while ((len = inputStream.read(b)) != -1) {
total += len;
outputStream.write(b, 0, len);
}
outputStream.flush();
outputStream.close();
if(!(total>0)) {
new File(filePath + filePathSlash + imagePath+fileName).delete();
executeSql("delete from idc_file where filePath='"+fileName+"'");
}
FileOutputStream outputStream = new FileOutputStream(filePath + filePathSlash + imagePath+fileName);
byte b[] = new byte[1024];
int len = 0;
while ((len = inputStream.read(b)) != -1) {
total += len;
outputStream.write(b, 0, len);
}
outputStream.flush();
outputStream.close();
if(!(total>0)) {
new File(filePath + filePathSlash + imagePath+fileName).delete();
executeSql("delete from idc_file where filePath='"+fileName+"'");
}
} catch (Exception e) {
@ -1252,7 +1336,7 @@ public class IdcServiceImpl implements IdcService {
}
private void executeSql(String sql) {
try {
jdbcTemplate.execute(sql);
@ -1263,4 +1347,4 @@ public class IdcServiceImpl implements IdcService {
}
}
}
Loading…
Cancel
Save