拉取任务拆分

master
admin 2 years ago
parent 664c053e5b
commit b936879443

@ -25,7 +25,7 @@ public interface IdcService {
BaseResponse download(HttpServletRequest request,Map<String,Object> params);
public void asyncFetchTask();
public void asyncFetchUdiTask();
BaseResponse downlaodSuccess(HttpServletRequest request,Map<String,Object> params);
public void asyncIdcTask();

@ -130,12 +130,29 @@ public class IdcServiceImpl implements IdcService {
/*拉取前一级中继服务数据*/
@Async
@Override
public void asyncFetchUdiTask() {
fetchTask(false);
}
@Async
@Override
public void asyncFetchTask() {
Map<String,Object> query = new HashMap<String,Object>();
fetchTask(true);
}
private void fetchTask(boolean isIdc) {
String host = getNextHost();
if(!isIdc) {
Map<String,Object> map = dbDao.get("select * from sync_data_set limit 1");
if(map!=null&&map.get("syncIp")!=null)
host = map.get("syncIp").toString();
}
logger.info("fetch from ip:"+host);
if(!StringUtils.isEmpty(host)) {
String result = post(host+"/spssync/common/list",query);
String result = post(host+"/spssync/common/list",null);
logger.info("fetchTask--->"+result);
if(IDCUtils.isJson(result)) {
JSONObject json = JSON.parseObject(result);
if(json!=null&&json.getInteger("code")==20000&&json.getString("data")!=null) {
@ -144,15 +161,14 @@ public class IdcServiceImpl implements IdcService {
for(Map map:list) {
Map<String,Object> params = new HashMap<String,Object>();
params.put("taskId", map.get("taskId"));
fetchData(host+"/spssync/common/list",params);
fetchData(host+"/spssync/common/download",params);
}
}
}
}
}
}
@Async
@Override
public void asyncIdcTask() {
@ -249,6 +265,7 @@ public class IdcServiceImpl implements IdcService {
boolean success=true;
Response response = client.newCall(request).execute();
result = response.body().string();
logger.info("fetchData-->"+result);
JSONObject json = JSONObject.parseObject(result);
if(isLastLevel()) {
success = analyToDB(json,files);
@ -265,6 +282,7 @@ public class IdcServiceImpl implements IdcService {
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
logger.error("fetchData-->"+e.getMessage());
}
return result;
}
@ -273,7 +291,9 @@ public class IdcServiceImpl implements IdcService {
OkHttpClient client = new OkHttpClient().newBuilder()
.build();
MediaType mediaType = MediaType.parse("application/json");
RequestBody body = RequestBody.create(mediaType, JSON.toJSONString(params));
RequestBody body = RequestBody.create(mediaType, "");
if(params!=null)
body = RequestBody.create(mediaType,JSON.toJSONString(params));
Request request = new Request.Builder()
.url(url)
.method("POST", body)
@ -283,6 +303,7 @@ public class IdcServiceImpl implements IdcService {
try {
Response response = client.newCall(request).execute();
result = response.body().string();
logger.info("post-result-->"+result);
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
@ -567,7 +588,9 @@ public class IdcServiceImpl implements IdcService {
map.put("taskId", json.get("messageId"));
map.put("cacheFilePath", fileName);
map.put("status", success ? "1" : "0");
saveExportStatus(map);
}

@ -1,55 +0,0 @@
package com.glxp.api.idc.thread;
import javax.annotation.Resource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.SchedulingConfigurer;
import org.springframework.scheduling.config.ScheduledTaskRegistrar;
import org.springframework.scheduling.support.CronTrigger;
import org.springframework.stereotype.Component;
import com.glxp.api.dao.schedule.ScheduledDao;
import com.glxp.api.entity.system.ScheduledEntity;
import com.glxp.api.idc.service.IdcService;
import com.glxp.api.req.system.ScheduledRequest;
@Component
@EnableScheduling
public class AsyncFetchTask implements SchedulingConfigurer {
final Logger logger = LoggerFactory.getLogger(AsyncFetchTask.class);
@Resource
private ScheduledDao scheduledDao;
@Resource
private IdcService idcService;
@Override
public void configureTasks(ScheduledTaskRegistrar scheduledTaskRegistrar) {
scheduledTaskRegistrar.addTriggerTask(() -> process(),
triggerContext -> {
ScheduledRequest scheduledRequest = new ScheduledRequest();
scheduledRequest.setCronName("syncFetch");
logger.info("syncFetch----------------");
ScheduledEntity scheduledEntity = scheduledDao.findScheduled(scheduledRequest);
String cron = scheduledEntity!=null ? scheduledEntity.getCron() : "* 0/5 * * * ?";
if (cron.isEmpty()) {
logger.error("cron is null");
}
logger.info("syncFetch----------------");
return new CronTrigger(cron).nextExecutionTime(triggerContext);
});
}
private void process() {
logger.info("syncFetch----process------------");
idcService.asyncFetchTask();
}
}
Loading…
Cancel
Save