@ -4,10 +4,10 @@ import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject ;
import com.glxp.api.common.res.BaseResponse ;
import com.glxp.api.common.util.ResultVOUtils ;
import com.glxp.api.dao.idc.DbDao ;
import com.glxp.api.dao.schedule.ScheduledDao ;
import com.glxp.api.entity.system.ScheduledEntity ;
import com.glxp.api.entity.system.SystemParamConfigEntity ;
import com.glxp.api.dao.idc.DbDao ;
import com.glxp.api.idc.service.IdcService ;
import com.glxp.api.idc.utils.DBAUtils ;
import com.glxp.api.idc.utils.IDCUtils ;
@ -109,18 +109,27 @@ public class IdcServiceImpl implements IdcService {
}
/*拉取前一级中继服务数据*/
@Async
@Override
public void pull ( ) {
Map < String , Object > query = new HashMap < String , Object > ( ) ;
String result = post ( getNextHost ( "U" ) + "/spssync/common/list" , query ) ;
JSONObject json = JSON . parseObject ( result ) ;
if ( json . getInteger ( "code" ) = = 20000 & & json . getString ( "data" ) ! = null ) {
List < Map > list = JSON . parseArray ( json . getString ( "data" ) , Map . class ) ;
if ( list ! = null ) {
for ( Map map : list ) {
Map < String , Object > params = new HashMap < String , Object > ( ) ;
params . put ( "taskId" , map . get ( "taskId" ) ) ;
pullData ( getNextHost ( "U" ) + "/spssync/common/list" , params ) ;
public void asyncFetchTask ( ) {
String [ ] directions = { "I" , "U" } ;
for ( String dir : directions ) {
Map < String , Object > query = new HashMap < String , Object > ( ) ;
String host = getNextHost ( dir ) ;
if ( ! StringUtils . isEmpty ( host ) ) {
String result = post ( host + "/spssync/common/list" , query ) ;
if ( IDCUtils . isJson ( result ) ) {
JSONObject json = JSON . parseObject ( result ) ;
if ( json ! = null & & json . getInteger ( "code" ) = = 20000 & & json . getString ( "data" ) ! = null ) {
List < Map > list = JSON . parseArray ( json . getString ( "data" ) , Map . class ) ;
if ( list ! = null ) {
for ( Map map : list ) {
Map < String , Object > params = new HashMap < String , Object > ( ) ;
params . put ( "taskId" , map . get ( "taskId" ) ) ;
fetchData ( host + "/spssync/common/list" , params ) ;
}
}
}
}
}
}
@ -135,14 +144,14 @@ public class IdcServiceImpl implements IdcService {
if ( direction . equals ( "I" ) ) {
tNames = SPMS_TO_UDI_TABLES ;
} else {
} else if ( direction . equals ( "U" ) ) {
tNames = UDI_TO_SPMS_TABLES ;
}
/*为顶级或末级,以及下游或上游连通,可执行*/
Boolean isLastLevel = isLastLevel ( direction ) ;
Boolean isRelay = isRelay ( direction ) ;
System. out . print ( "-----数据传输任务开始----" + direction + "\n" ) ;
System. out . print ( "-----是否顶级或末级服务:" + isLastLevel + ",是否转发数据 :" + isRelay + "----\n" ) ;
logger. info ( "-----数据传输任务开始----" + direction + "\n" ) ;
logger. info ( "-----是否需要上传或下发数据:" + isLastLevel + ",是否已配置接收地址 :" + isRelay + "----\n" ) ;
if ( isLastLevel & & isRelay ) {
for ( String t : tNames ) {
uploadData ( t ) ;
@ -151,17 +160,22 @@ public class IdcServiceImpl implements IdcService {
}
private void uploadData ( String t ) {
String [ ] tn = t . split ( "/" ) ;
String lastUpdateTime = getUpdateTime ( tn [ 0 ] ) ;
Date nowUpdateTime = new Date ( ) ;
List < Map < String , String > > keyList = dbDao . listKeyMysql ( tn [ 0 ] ) ;
String keyColumn = keyList ! = null & & keyList . size ( ) > 0 ? keyList . get ( 0 ) . get ( "columnName" ) . toString ( ) : "id" ;
Map < String , Object > map = new HashMap < String , Object > ( ) ;
String sqlWhere = "not exists (select fkId from idc_record where fkId=" + tn [ 0 ] + ".id)" ;
String sqlWhere = "not exists (select fkId from idc_record where type='" + tn [ 0 ] + "' and fkId=" + tn [ 0 ] + ". " + keyColumn + " )";
if ( ! StringUtils . isEmpty ( tn [ 1 ] ) ) {
sqlWhere + = " and ifnull( " + tn [ 1 ] + " ,now()) >=cast('" + lastUpdateTime + "' as datetime)" ;
sqlWhere + = " and " + tn [ 1 ] + " >=cast('" + lastUpdateTime + "' as datetime)" ;
} else {
map . put ( "isEnd" , "1" ) ;
}
if ( tn [ 0 ] . equals ( "thr_order" ) )
logger . info ( "-------------------" + sqlWhere ) ;
map . put ( "sqlWhere" , sqlWhere ) ;
map . put ( "tableName" , tn [ 0 ] ) ;
map . put ( "filePathColumn" , tn [ 2 ] ) ;
@ -177,15 +191,15 @@ public class IdcServiceImpl implements IdcService {
Date nextTimePoint = cronSequenceGenerator . next ( DateUtil . parseDate ( lastUpdateTime ) ) ;
send = nextTimePoint . before ( nowUpdateTime ) ;
}
//if(send) {
BaseResponse result = send ( map ) ;
if ( result . getCode ( ) = = 20000 ) {
setUpdateTime ( tn [ 0 ] , DateUtil . formatDate ( nowUpdateTime ) ) ;
if ( send ) {
BaseResponse result = send ( map ) ;
if ( result . getCode ( ) = = 20000 ) {
setUpdateTime ( tn [ 0 ] , DateUtil . formatDate ( nowUpdateTime , "yyyy-MM-dd HH:mm:ss" ) ) ;
}
}
//}
}
private String pull Data( String url , Map < String , Object > params ) {
private String fetch Data( String url , Map < String , Object > params ) {
OkHttpClient client = new OkHttpClient ( ) . newBuilder ( )
. build ( ) ;
MediaType mediaType = MediaType . parse ( "application/json" ) ;
@ -292,13 +306,20 @@ public class IdcServiceImpl implements IdcService {
/*数据同步,从数据库获取数据下发或上传下级中继服务*/
@Override
public BaseResponse send ( Map < String , Object > params ) {
return send ( params . get ( "messageType" ) . toString ( ) , params . get ( "tableName" ) . toString ( ) , params ) ;
}
@Override
public BaseResponse send ( String messageType , String tableName , Map < String , Object > params ) {
if ( sendOnPage ( messageType , tableName , params ) )
return ResultVOUtils . success ( ) ;
try {
if ( sendOnPage ( messageType , tableName , params ) )
return ResultVOUtils . success ( ) ;
} catch ( Exception ex ) {
logger . error ( ex . getMessage ( ) ) ;
}
return ResultVOUtils . error ( 9999 , "" ) ;
}
@ -348,7 +369,7 @@ public class IdcServiceImpl implements IdcService {
}
private boolean sendOnPage ( String messageType , String tableName , Map < String , Object > params ) {
boolean success = tru e;
boolean success = fals e;
Map < String , String > table = dbDao . getMysql ( tableName ) ;
if ( ! ( table ! = null & & table . get ( "tableName" ) ! = null ) )
return false ;
@ -368,9 +389,14 @@ public class IdcServiceImpl implements IdcService {
String filePathColumn = params . get ( "filePathColumn" ) ! = null ? params . get ( "filePathColumn" ) . toString ( ) : "" ;
orderNum + + ;
saveIdcLog ( messageType , "" , tableName + ">" + where , 0 , total ) ;
if ( tableName . equals ( "thr_order" ) )
logger . info ( "tableName-->" + tableName + "-->row-->" + total + "-->sqlWhere-->" + where ) ;
if ( total > 0 ) {
success = true ;
params . put ( "page" , 0 ) ;
params . put ( "limit" , limit ) ;
List < Map < String , String > > keyList = dbDao . listKeyMysql ( tableName ) ;
String keyColumn = keyList ! = null & & keyList . size ( ) > 0 ? keyList . get ( 0 ) . get ( "columnName" ) . toString ( ) : "id" ;
for ( int i = 0 ; i < Math . ceil ( total / limit ) + 1 ; i + + ) {
params . replace ( "page" , i * limit ) ;
params . replace ( "limit" , limit ) ;
@ -387,7 +413,7 @@ public class IdcServiceImpl implements IdcService {
files [ m ] = list . get ( m ) . get ( filePathColumn ) . toString ( ) ;
}
recordSql + = ! StringUtils . isEmpty ( recordSql ) ? "," : "" ;
recordSql + = "('" + UUID . randomUUID ( ) . toString ( ) . replaceAll ( "-" , "" ) + "','" + tableName + "','" + list . get ( m ) . get ( "id" ) + "','" + isEnd + "',now())" ;
recordSql + = "('" + UUID . randomUUID ( ) . toString ( ) . replaceAll ( "-" , "" ) + "','" + tableName + "','" + list . get ( m ) . get ( keyColumn ) + "','" + isEnd + "',now())" ;
}
orderNum + + ;
@ -409,6 +435,7 @@ public class IdcServiceImpl implements IdcService {
}
}
}
}
return success ;
}
@ -477,6 +504,7 @@ public class IdcServiceImpl implements IdcService {
String extra = columnList . get ( k ) . get ( "extra" ) ! = null & & columnList . get ( k ) . get ( "extra" ) . toLowerCase ( ) . contains ( "auto" ) ? "A" : "N" ;
String attrName = columnList . get ( k ) . get ( "attrName" ) ;
String value = list . get ( i ) . get ( attrName ) ! = null ? list . get ( i ) . get ( attrName ) . toString ( ) : "" ;
value = DBAUtils . escape ( value ) ;
String dataType = columnList . get ( k ) . get ( "dataType" ) . toLowerCase ( ) . contains ( "char" ) | | columnList . get ( k ) . get ( "dataType" ) . toLowerCase ( ) . contains ( "text" ) ?
"C" : columnList . get ( k ) . get ( "dataType" ) . toLowerCase ( ) . contains ( "date" ) ? "D" : "N" ;
if ( extra . equals ( "N" ) ) {
@ -575,44 +603,30 @@ public class IdcServiceImpl implements IdcService {
/*获取转发服务地址, 当前值允许单向, 只使用参数upper_server_ip*/
private String getNextHost ( String direction ) {
SystemParamConfigEntity systemParamConfigEntity = getSystemParamConfig ( "upper_server_ip" , "中继上传服务地址" , "" , "" ) ;
//direction.equals("U") ? getSystemParamConfig("upper_server_ip","中继上传服务地址" , "",""):
//getSystemParamConfig("sync_idc_lower_host","下级(下发)中继服务地址" , "","") ;
return systemParamConfigEntity . getParamValue ( ) ;
SystemParamConfigEntity systemParamConfigEntity =
direction . equals ( "U" ) ? getSystemParamConfig ( "upper_server_host" , "自助平台数据接收服务地址" , "" , "接收由UDI系统上传的数据" ) :
direction . equals ( "I" ) ? getSystemParamConfig ( "lower_server_host" , "UDI系统数据接收服务地址" , "" , "接收由自助平台下发的数据" ) : null ;
String host = systemParamConfigEntity ! = null ? systemParamConfigEntity . getParamValue ( ) : "" ;
host = ! StringUtils . isEmpty ( host ) & & host . substring ( host . length ( ) - 1 ) . equals ( "/" ) ? host . substring ( 0 , host . length ( ) - 1 ) : host ;
return host ;
}
/*判断是否 最后一级 */
/*判断是否 上传或下发数据 */
private boolean isLastLevel ( String direction ) {
SystemParamConfigEntity systemParamConfigEntity =
direction . equals ( "I" ) ? getSystemParamConfig ( "sync_idc_top" , "是否顶级中继服务(连接自助平台)" , "0" , "0: 否; 1: 是(是,接收下级上传数据后解析入库)" ) :
getSystemParamConfig ( "sync_idc_final" , "是否末级中继服务(连接UDI管理系统)" , "0" , "0: 否; 1: 是(是,接收上级下发数据后解析入库)" ) ;
return systemParamConfigEntity . getParamValue ( ) . equals ( "0" ) ? false : true ;
return direction . equals ( "I" ) | | direction . equals ( "U" ) ? true : false ;
}
/*检查当前系统为自助平台( 下发) 还是UDI系统(上传),返回传输方向,如果都未设置,默认返回下发*/
private String getDirection ( ) {
SystemParamConfigEntity systemParamConfigEntity = getSystemParamConfig ( "sync_idc_top" , "是否顶级中继服务(连接自助平台)" , "0" , "0: 否; 1: 是(是,接收下级上传数据后解析入库)" ) ;
System . out . print ( ">>>>>>>>>>" + JSON . toJSONString ( systemParamConfigEntity ) + "\n" ) ;
if ( systemParamConfigEntity . getParamValue ( ) . equals ( "0" ) ) {
systemParamConfigEntity = getSystemParamConfig ( "sync_idc_final" , "是否末级中继服务(连接UDI管理系统)" , "0" , "0: 否; 1: 是(是,接收上级下发数据后解析入库)" ) ;
if ( systemParamConfigEntity . getParamValue ( ) . equals ( "1" ) )
return "U" ;
}
return "I" ;
SystemParamConfigEntity systemParamConfigEntity = getSystemParamConfig ( "sync_system_type" , "系统类型" , "IDC" , "UDI(UDI管理系统, 由UDI系统往自助平台上传数据),SPS(自助平台, 由自助平台往UDI系统下发数据),IDC(中继服务,只接收,转发或暂存数据供下一级服务拉取数据)" ) ;
return systemParamConfigEntity . getParamValue ( ) . equals ( "UDI" ) ? "U" : systemParamConfigEntity . getParamValue ( ) . equals ( "SPS" ) ? "I" : "N" ;
}
/*是否需要转发*/
private boolean isRelay ( String direction ) {
String relay Str = direction . equals ( "U" ) ? "sync_upstream_enable" : "sync_downstream_enable" ;
String relayHost = getNextHost ( direction ) ;
SystemParamConfigEntity systemParamConfigEntity = systemParamConfigService . selectByParamKey ( relayStr ) ;
if ( systemParamConfigEntity ! = null & & systemParamConfigEntity . getParamValue ( ) ! = null & &
systemParamConfigEntity . getParamValue ( ) . equals ( "0" ) ) {
return false ;
}
return true ;
return ! StringUtils . isEmpty ( relayHost ) ;
}
@ -707,7 +721,7 @@ public class IdcServiceImpl implements IdcService {
String updateTime = "" ;
Map < String , Object > map = dbDao . get ( "select * from idc_status where id='" + id + "'" ) ;
if ( map ! = null & & map . get ( "id" ) ! = null ) {
updateTime = DateUtil . formatDate ( IDCUtils . parseDate ( map . get ( "statusTime" ) . toString ( ) ) );
updateTime = DateUtil . formatDate ( IDCUtils . parseDate ( map . get ( "statusTime" ) . toString ( ) ) , "yyyy-MM-dd HH:mm:ss" );
} else {
updateTime = "2000-01-01 00:00:00" ;
String sql = "insert into idc_status (id,statusTime) values ('" + id + "',cast('" + updateTime + "' as datetime))" ;