@ -226,11 +226,11 @@ public class IdcServiceImpl implements IdcService {
/*未发送重新发送*/
asyncFailTask ( map . get ( "syncIp" ) . toString ( ) ) ;
}
String [ ] syncTables = TableUtils . syncTables ( ) ;
for ( int i = 0 ; i < syncTables . length ; i + + ) {
String [ ] tnames = syncTables [ i ] . split ( "/" ) ;
boolean sync = ( StringUtils . isEmpty ( tnames [ 0 ] ) & & StringUtils . isEmpty ( tnames [ 1 ] ) ) | |
boolean sync = ( StringUtils . isEmpty ( tnames [ 0 ] ) & & StringUtils . isEmpty ( tnames [ 1 ] ) ) | |
( ! StringUtils . isEmpty ( tnames [ 0 ] ) & & map ! = null & & map . get ( tnames [ 0 ] ) ! = null & & map . get ( tnames [ 0 ] ) . toString ( ) . equals ( "1" ) ) ;
saveIdcLog ( "---" , "" , map . get ( tnames [ 0 ] ) + syncTables [ i ] , 0 , 0 ) ;
if ( sync ) {
@ -242,23 +242,24 @@ public class IdcServiceImpl implements IdcService {
}
}
/*上传失败重新上传*/
private void asyncFailTask ( String host ) {
try {
try
{
String filePathSlash = filePath . substring ( filePath . length ( ) - 1 ) . equals ( "/" ) ? "" : "/" ;
String sql = "select * from basic_export_status where status='1' and receiveStatus='0' where updateTime<date_sub(now(),interval 6 minute) order by updateTime" ;
Map < String , Object > map = new HashMap < String , Object > ( ) ;
Map < String , Object > map = new HashMap < String , Object > ( ) ;
map . put ( "sql" , sql ) ;
List < Map < String , Object > > list = dbDao . list ( map ) ;
List < Map < String , Object > > list = dbDao . list ( map ) ;
ArrayList < String > files = new ArrayList < > ( ) ;
for ( int i = 0 ; i < list . size ( ) ; i + + ) {
for ( int i = 0 ; i < list . size ( ) ; i + + ) {
String json = FileUtils . readFileAll ( list . get ( i ) . get ( "cacheFilePath" ) . toString ( ) ) ;
JSONObject object = JSONObject . parseObject ( json ) ;
List < Map > dataList = JSONObject . parseArray ( JSON . toJSONString ( object . get ( "data" ) ) , Map . class ) ;
for ( int m = 0 ; m < dataList . size ( ) ; m + + ) {
if ( object . get ( "filePathColumn" ) ! = null & & ! StringUtils . isEmpty ( object . get ( "filePathColumn" ) . toString ( ) ) & &
dataList . get ( m ) . get ( object . get ( "filePathColumn" ) . toString ( ) ) ! = null ) {
if ( object . get ( "filePathColumn" ) ! = null & & ! StringUtils . isEmpty ( object . get ( "filePathColumn" ) . toString ( ) ) & &
dataList . get ( m ) . get ( object . get ( "filePathColumn" ) . toString ( ) ) ! = null ) {
String fileNames = dataList . get ( m ) . get ( object . get ( "filePathColumn" ) . toString ( ) ) . toString ( ) ;
String [ ] str = fileNames . split ( "," ) ;
for ( int r = 0 ; r < str . length ; r + + ) {
@ -266,20 +267,20 @@ public class IdcServiceImpl implements IdcService {
files . add ( filePath + filePathSlash + imagePath + str [ r ] ) ;
}
}
for ( int k = 0 ; k < 30 ; k + + ) {
if ( dataList . get ( k ) . get ( "tableName" + k ) = = null )
for ( int k = 0 ; k < 30 ; k + + ) {
if ( dataList . get ( k ) . get ( "tableName" + k ) = = null )
break ;
if ( dataList . get ( k ) . get ( "data" + k ) ! = null & & dataList . get ( k ) . get ( "filePathColumn" + k ) ! = null ) {
List < Map > childList = JSONObject . parseArray ( JSON . toJSONString ( dataList . get ( i ) . get ( "data" + k ) ) , Map . class ) ;
for ( int x = 0 ; x < childList . size ( ) ; x + + ) {
if ( childList . get ( x ) . get ( dataList . get ( k ) . get ( "filePathColumn" + k ) . toString ( ) ) ! = null ) {
String [ ] str = childList . get ( x ) . get ( dataList . get ( k ) . get ( "filePathColumn" + k ) . toString ( ) ) . toString ( ) . split ( "," ) ;
for ( int s = 0 ; s < str . length ; s + + ) {
if ( ! StringUtils . isEmpty ( str [ s ] ) & & FileUtils . isFileExist ( filePath + filePathSlash + imagePath + str [ s ] ) )
files . add ( filePath + filePathSlash + imagePath + str [ s ] ) ;
}
}
}
if ( dataList . get ( k ) . get ( "data" + k ) ! = null & & dataList . get ( k ) . get ( "filePathColumn" + k ) ! = null ) {
List < Map > childList = JSONObject . parseArray ( JSON . toJSONString ( dataList . get ( i ) . get ( "data" + k ) ) , Map . class ) ;
for ( int x = 0 ; x < childList . size ( ) ; x + + ) {
if ( childList . get ( x ) . get ( dataList . get ( k ) . get ( "filePathColumn" + k ) . toString ( ) ) ! = null ) {
String [ ] str = childList . get ( x ) . get ( dataList . get ( k ) . get ( "filePathColumn" + k ) . toString ( ) ) . toString ( ) . split ( "," ) ;
for ( int s = 0 ; s < str . length ; s + + ) {
if ( ! StringUtils . isEmpty ( str [ s ] ) & & FileUtils . isFileExist ( filePath + filePathSlash + imagePath + str [ s ] ) )
files . add ( filePath + filePathSlash + imagePath + str [ s ] ) ;
}
}
}
}
}
}
@ -292,7 +293,7 @@ public class IdcServiceImpl implements IdcService {
if ( IDCUtils . isJson ( result ) ) {
JSONObject res = JSON . parseObject ( result ) ;
if ( res . getInteger ( "code" ) = = 20000 ) {
executeSql ( "update basic_export_status set receiveStatus='1',endTime=now() where id='" + list . get ( i ) . get ( "id" ) + "'" ) ;
executeSql ( "update basic_export_status set receiveStatus='1',endTime=now() where id='" + list . get ( i ) . get ( "id" ) + "'" ) ;
saveIdcLog ( object . getString ( "messageType" ) , list . get ( i ) . get ( "id" ) . toString ( ) , object . getString ( "tableName" ) + ">reUpload->success" , 0 , 0 ) ;
} else {
saveIdcLog ( object . getString ( "messageType" ) , list . get ( i ) . get ( "id" ) . toString ( ) , object . getString ( "tableName" ) + ">reUpload->fail" , 0 , 0 ) ;
@ -302,78 +303,78 @@ public class IdcServiceImpl implements IdcService {
}
}
} catch ( Exception ex ) {
}
}
/*数据删除同步*/
private void asyncDelete ( String tname , boolean isUpload , String syncIp ) {
private void asyncDelete ( String tname , boolean isUpload , String syncIp ) {
String [ ] tnames = tname . split ( "/" ) ;
String lastUpdateTime = getUpdateTime ( tnames [ 2 ] + "." + tnames [ 0 ] + "." + tnames [ 1 ] + ".delete" ) ;
String lastUpdateTime = getUpdateTime ( tnames [ 2 ] + "." + tnames [ 0 ] + "." + tnames [ 1 ] + ".delete" ) ;
Date nowUpdateTime = new Date ( ) ;
String where = "tableName='" + tnames [ 2 ] . toLowerCase ( ) + "' and updateTime between cast('" + lastUpdateTime + "' as datetime) " +
" and cast('" + DateUtil . formatDate ( nowUpdateTime , "yyyy-MM-dd HH:mm:ss" ) + "' as datetime)" ;
Map < String , Object > count = new HashMap < String , Object > ( ) ;
count . put ( "sql" , "select count(*) from idc_delete where " + where ) ;
String where = "tableName='" + tnames [ 2 ] . toLowerCase ( ) + "' and updateTime between cast('" + lastUpdateTime + "' as datetime) " +
" and cast('" + DateUtil . formatDate ( nowUpdateTime , "yyyy-MM-dd HH:mm:ss" ) + "' as datetime)" ;
Map < String , Object > count = new HashMap < String , Object > ( ) ;
count . put ( "sql" , "select count(*) from idc_delete where " + where ) ;
int total = dbDao . count ( count ) ;
int limit = 50 ;
if ( total > 0 ) {
boolean success = true ;
for ( int i = 0 ; i < Math . ceil ( total / limit ) + 1 ; i + + ) {
Date startTime = new Date ( ) ;
Map < String , Object > map = new HashMap < String , Object > ( ) ;
Map < String , Object > map = new HashMap < String , Object > ( ) ;
map . put ( "sql" , "select * from idc_delete" ) ;
map . put ( "sqlWhere" , where ) ;
map . put ( "limit" , limit ) ;
map . put ( "page" , i * limit ) ;
List < Map < String , Object > > list = dbDao . list ( map ) ;
if ( list ! = null & & map . size ( ) > 0 ) {
List < Map < String , Object > > data = new ArrayList < > ( ) ;
for ( int k = 0 ; i < list . size ( ) ; k + + ) {
String line = list . get ( k ) . get ( "uniqueValue" ) . toString ( ) ;
JSONObject obj = JSON . parseObject ( line ) ;
String uniqueColumn = "" ;
for ( String key : obj . keySet ( ) ) {
uniqueColumn + = uniqueColumn . length ( ) > 0 ? "," + key : key ;
}
obj . put ( "uniqueColumn" , uniqueColumn ) ;
obj . put ( "operateMode" , "D" ) ;
data . add ( obj ) ;
}
Map < String , Object > msg = new HashMap < String , Object > ( ) ;
msg . put ( "messageId" , CustomUtil . getId ( ) ) ;
msg . put ( "messageType" , tnames [ 9 ] + "(删除)" ) ;
msg . put ( "apiCode" , "common" ) ;
msg . put ( "tableName" , DBAUtils . tableAliasName ( tnames [ 2 ] ) ) ;
msg . put ( "sendTime" , new Date ( ) ) ;
msg . put ( "version" , "1.0" ) ;
msg . put ( "total" , data . size ( ) ) ;
msg . put ( "data" , data ) ;
if ( isUpload ) {
String result = "" ;
try {
result = relay ( "" , JSON . toJSONString ( msg ) , null , syncIp ) ;
} catch ( Exception ex ) {
}
if ( IDCUtils . isJson ( result ) ) {
JSONObject json = JSON . parseObject ( result ) ;
if ( json . getInteger ( "code" ) = = 20000 ) {
saveIdcLog ( tnames [ 9 ] , "" , tnames [ 2 ] + ">success(delete)" , i * limit , total ) ;
} else {
success = false ;
saveIdcLog ( tnames [ 9 ] , "" , tnames [ 2 ] + ">" + result , i * limit , total ) ;
}
} else {
success = false ;
saveIdcLog ( tnames [ 9 ] , "" , syncIp + ":" + tnames [ 2 ] + ">fail:上传地址未连通" , i * limit , total ) ;
}
} else {
saveIdcLog ( tnames [ 9 ] , "" , tnames [ 2 ] + ">success(delete)" , i * limit , total ) ;
}
syncAddTaskStatus ( msg , isUpload ? 1 : 0 , true , startTime , isUpload , success ) ;
List < Map < String , Object > > list = dbDao . list ( map ) ;
if ( list ! = null & & map . size ( ) > 0 ) {
List < Map < String , Object > > data = new ArrayList < > ( ) ;
for ( int k = 0 ; i < list . size ( ) ; k + + ) {
String line = list . get ( k ) . get ( "uniqueValue" ) . toString ( ) ;
JSONObject obj = JSON . parseObject ( line ) ;
String uniqueColumn = "" ;
for ( String key : obj . keySet ( ) ) {
uniqueColumn + = uniqueColumn . length ( ) > 0 ? "," + key : key ;
}
obj . put ( "uniqueColumn" , uniqueColumn ) ;
obj . put ( "operateMode" , "D" ) ;
data . add ( obj ) ;
}
Map < String , Object > msg = new HashMap < String , Object > ( ) ;
msg . put ( "messageId" , CustomUtil . getId ( ) ) ;
msg . put ( "messageType" , tnames [ 9 ] + "(删除)" ) ;
msg . put ( "apiCode" , "common" ) ;
msg . put ( "tableName" , DBAUtils . tableAliasName ( tnames [ 2 ] ) ) ;
msg . put ( "sendTime" , new Date ( ) ) ;
msg . put ( "version" , "1.0" ) ;
msg . put ( "total" , data . size ( ) ) ;
msg . put ( "data" , data ) ;
if ( isUpload ) {
String result = "" ;
try {
result = relay ( "" , JSON . toJSONString ( msg ) , null , syncIp ) ;
} catch ( Exception ex ) {
}
if ( IDCUtils . isJson ( result ) ) {
JSONObject json = JSON . parseObject ( result ) ;
if ( json . getInteger ( "code" ) = = 20000 ) {
saveIdcLog ( tnames [ 9 ] , "" , tnames [ 2 ] + ">success(delete)" , i * limit , total ) ;
} else {
success = false ;
saveIdcLog ( tnames [ 9 ] , "" , tnames [ 2 ] + ">" + result , i * limit , total ) ;
}
} else {
success = false ;
saveIdcLog ( tnames [ 9 ] , "" , syncIp + ":" + tnames [ 2 ] + ">fail:上传地址未连通" , i * limit , total ) ;
}
} else {
saveIdcLog ( tnames [ 9 ] , "" , tnames [ 2 ] + ">success(delete)" , i * limit , total ) ;
}
syncAddTaskStatus ( msg , isUpload ? 1 : 0 , true , startTime , isUpload , success ) ;
}
}
}
@ -492,11 +493,11 @@ public class IdcServiceImpl implements IdcService {
} else {
success = false ;
}
syncAddTaskStatus ( json . getJSONObject ( "data" ) , 3 , true , startTime , true , success ) ;
syncAddTaskStatus ( json . getJSONObject ( "data" ) , 3 , true , startTime , true , success ) ;
} else {
if ( json . get ( "code" ) ! = null & & json . get ( "data" ) ! = null ) {
//logger.info("res1-->"+JSON.toJSONString(json));
syncAddTaskStatus ( json . getJSONObject ( "data" ) , 0 , true , startTime , true , success ) ;
syncAddTaskStatus ( json . getJSONObject ( "data" ) , 0 , true , startTime , true , success ) ;
analyMiddle ( host , json . getJSONObject ( "data" ) , files , false , false ) ;
}
}
@ -584,7 +585,7 @@ public class IdcServiceImpl implements IdcService {
boolean success = false ;
if ( isLastLevel ( ) ) {
success = analyMiddle ( "" , json , files , true , true ) ;
syncAddTaskStatus ( json , 3 , true , startTime , true , success ) ;
syncAddTaskStatus ( json , 3 , true , startTime , true , success ) ;
if ( ! success ) {
return ResultVOUtils . error ( 9000 , "解析失败" ) ;
@ -595,11 +596,11 @@ public class IdcServiceImpl implements IdcService {
String host = getNextHost ( ) ;
String result = relay ( request . getHeader ( "reqNo" ) , content , saveFiles , host ) ;
if ( IDCUtils . isJson ( result ) ) {
syncAddTaskStatus ( json , 2 , true , startTime , true , true ) ;
syncAddTaskStatus ( json , 2 , true , startTime , true , true ) ;
BaseResponse object = JSON . parseObject ( result , BaseResponse . class ) ;
return object ;
} else {
syncAddTaskStatus ( json , 2 , true , startTime , false , false ) ;
syncAddTaskStatus ( json , 2 , true , startTime , false , false ) ;
return ResultVOUtils . error ( 9000 , "转发失败" ) ;
}
@ -630,7 +631,6 @@ public class IdcServiceImpl implements IdcService {
OutputStream os ;
String filePathSlash = filePath . substring ( filePath . length ( ) - 1 ) . equals ( "/" ) ? "" : fileName . substring ( 0 , 1 ) . equals ( "/" ) ? "" : "/" ;
String sourceFileName = filePath + filePathSlash + imagePath + fileName ;
// String sourceFileName = fileName;
try {
if ( FileUtils . isFileExist ( sourceFileName ) ) {
byte [ ] bytes = FileUtils . readFileByBytes ( sourceFileName ) ;
@ -827,7 +827,7 @@ public class IdcServiceImpl implements IdcService {
}
saveIdcLog ( messageType , "" , tableName + ">success" , i * limit , total ) ;
}
syncAddTaskStatus ( data , isUpload ? 1 : 0 , true , startTime , isUpload , success ) ;
syncAddTaskStatus ( data , isUpload ? 1 : 0 , true , startTime , isUpload , success ) ;
}
}
}
@ -839,7 +839,7 @@ public class IdcServiceImpl implements IdcService {
/*增加同步任务状态*/
private void syncAddTaskStatus ( Map < String , Object > json , int scheduleType , boolean success , Date startTime ,
boolean isEnd , boolean isReceive ) {
boolean isEnd , boolean isReceive ) {
try {
String content = JSON . toJSONString ( json ) ;
String datePath = DateUtil . formatDate ( new Date ( ) , "yyyy-MM-dd" ) ;
@ -866,7 +866,7 @@ public class IdcServiceImpl implements IdcService {
map . put ( "status" , success ? "1" : "0" ) ;
map . put ( "receiveStatus" , isReceive ? "1" : "0" ) ;
map . put ( "startTime" , startTime ! = null ? startTime : new Date ( ) ) ;
if ( isReceive )
if ( isReceive )
map . put ( "endTime" , new Date ( ) ) ;
map . put ( "updateTime" , new Date ( ) ) ;
map . put ( "remark" , json . get ( "messageType" ) + ": " + json . get ( "total" ) + "条" ) ;
@ -960,7 +960,7 @@ public class IdcServiceImpl implements IdcService {
String [ ] keyColumn = new String [ 30 ] ;
String [ ] keyDataType = new String [ 30 ] ;
List < Map < String , String > > columnList = dbDao . listColumnsMysql ( tName ) ;
Map < String , Object > column = getColumn ( tName ) ;
Map < String , Object > column = getColumn ( tName ) ;
boolean result = false ;
int key = 0 ;
int col = 0 ;
@ -998,13 +998,13 @@ public class IdcServiceImpl implements IdcService {
}
if ( "A,D,U" . contains ( operateMode ) ) {
if ( operateMode . equals ( "D" ) & & list . get ( i ) . get ( "uniqueColumn" ) ! = null ) {
if ( operateMode . equals ( "D" ) & & list . get ( i ) . get ( "uniqueColumn" ) ! = null ) {
String [ ] ucs = list . get ( i ) . get ( "uniqueColumn" ) . toString ( ) . split ( "," ) ;
for ( String str : ucs ) {
Map < String , Object > map = ( Map < String , Object > ) column . get ( str ) ;
for ( String str : ucs ) {
Map < String , Object > map = ( Map < String , Object > ) column . get ( str ) ;
String dataType = map . get ( "dataType" ) . toString ( ) ;
updateWhere + = ! StringUtils . isEmpty ( updateWhere ) ? " and " : " " ;
updateWhere + = str + " = " + ( dataType . equals ( "C" ) ? "'" : "" ) + list . get ( i ) . get ( str ) + ( dataType . equals ( "C" ) ? "'" : "" ) ;
updateWhere + = str + " = " + ( dataType . equals ( "C" ) ? "'" : "" ) + list . get ( i ) . get ( str ) + ( dataType . equals ( "C" ) ? "'" : "" ) ;
}
} else {
for ( int z = 0 ; z < keyColumn . length ; z + + ) {
@ -1180,14 +1180,15 @@ public class IdcServiceImpl implements IdcService {
. post ( body )
. addHeader ( "Content-Type" , "application/x-www-form-urlencoded" )
. build ( ) ;
int total = 0 ;
int total = 0 ;
try {
Response result = client . newCall ( request ) . execute ( ) ;
String msg = result ! = null ? result . message ( ) . length ( ) > 200 ? result . message ( ) . substring ( 0 , 200 ) : result . message ( ) : "" ;
if ( result ! = null & & result . isSuccessful ( ) & & MediaType . parse ( "application/force-download" ) . equals ( result . body ( ) . contentType ( ) ) ) {
try ( InputStream inputStream = result . body ( ) . byteStream ( ) ) {
FileOutputStream outputStream = new FileOutputStream ( filePath + filePathSlash + imagePath + fileName ) ;
byte b [ ] = new byte [ 1024 ] ;
@ -1207,8 +1208,8 @@ public class IdcServiceImpl implements IdcService {
}
} catch ( Exception e ) {
e . printStackTrace ( ) ;
log . error ( "" , e ) ;
}
}
if ( ! ( total > 0 ) ) {
@ -1221,71 +1222,9 @@ public class IdcServiceImpl implements IdcService {
e . printStackTrace ( ) ;
}
return ( total > 0 ) ;
return ( total > 0 ) ;
}
// @Override
// public boolean signleDownloadFile(String syncIp, String fileFullName) {
// OkHttpClient client = new OkHttpClient().newBuilder()
// .build();
// ;
// MediaType mediaType = MediaType.parse("application/x-www-form-urlencoded");
//
// RequestBody body = RequestBody.create(mediaType, "fileName=" + fileFullName);
//
// Request request = new Request.Builder()
// .url(syncIp + "/spssync/common/downloadFile")
// .post(body)
// .addHeader("Content-Type", "application/x-www-form-urlencoded")
// .build();
// int total = 0;
// try {
// Response result = client.newCall(request).execute();
// String msg = result!=null ? result.message().length()>200 ? result.message().substring(0,200) : result.message() : "";
//
// if (result!=null&&result.isSuccessful()&&MediaType.parse("application/force-download").equals(result.body().contentType())) {
// try (InputStream inputStream = result.body().byteStream()) {
//
// String path = fileFullName.substring(0,fileFullName.lastIndexOf("/"));
// if(!FileUtils.makeDirectory(path))
// IDCUtils.createDirectory(path);
//
//
// FileOutputStream outputStream = new FileOutputStream(fileFullName);
//
// byte b[] = new byte[1024];
//
// int len = 0;
// while ((len = inputStream.read(b)) != -1) {
// total += len;
// outputStream.write(b, 0, len);
//
// }
//
// outputStream.flush();
// outputStream.close();
// if(!(total>0)) {
// new File(fileFullName).delete();
// executeSql("delete from idc_file where filePath='"+fileFullName+"'");
// }
//
// } catch (Exception e) {
//
//
// }
// }
// if(!(total>0)) {
// String sql = "replace idc_file (filePath,createTime,msg) values ('"+fileFullName+"',now(),'"+msg+"')";
// executeSql(sql);
// }
//
// } catch (IOException e) {
// // TODO Auto-generated catch block
// e.printStackTrace();
// }
// return (total>0);
// }
/*获取转发服务地址, 当前值允许单向, 只使用参数upper_server_ip*/
private String getNextHost ( ) {
SystemParamConfigEntity systemParamConfigEntity = systemParamConfigService . selectByParamKey ( "upper_server_ip" ) ;