新增mq延迟队列;新增任务超时处理
parent
1746e021f5
commit
97deaec85f
@ -1,43 +1,69 @@
|
|||||||
package com.glxp.api.config.rabbit;
|
package com.glxp.api.config.rabbit;
|
||||||
|
|
||||||
import org.springframework.amqp.core.Binding;
|
import org.springframework.amqp.core.*;
|
||||||
import org.springframework.amqp.core.BindingBuilder;
|
|
||||||
import org.springframework.amqp.core.Queue;
|
|
||||||
import org.springframework.amqp.core.TopicExchange;
|
|
||||||
import org.springframework.context.annotation.Bean;
|
import org.springframework.context.annotation.Bean;
|
||||||
import org.springframework.context.annotation.Configuration;
|
import org.springframework.context.annotation.Configuration;
|
||||||
|
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
|
||||||
@Configuration
|
@Configuration
|
||||||
public class TopicRabbitConfig {
|
public class TopicRabbitConfig {
|
||||||
//绑定键
|
//绑定键
|
||||||
public final static String common = "topic.common";
|
public final static String common = "topic.common";
|
||||||
public final static String trace = "topic.trace";
|
public final static String trace = "topic.trace";
|
||||||
|
public final static String delaytrace = "local.trace.delay";
|
||||||
|
|
||||||
|
|
||||||
@Bean
|
@Bean
|
||||||
public Queue firstQueue() {
|
public Queue traceQueue() {
|
||||||
return new Queue(TopicRabbitConfig.trace);
|
return new Queue(TopicRabbitConfig.trace);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Bean
|
@Bean
|
||||||
public Queue secondQueue() {
|
public Queue commonQueue() {
|
||||||
return new Queue(TopicRabbitConfig.common);
|
return new Queue(TopicRabbitConfig.common);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public Queue traceDelayQueue() {
|
||||||
|
return QueueBuilder
|
||||||
|
.durable(delaytrace)
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
|
||||||
@Bean
|
@Bean
|
||||||
TopicExchange exchange() {
|
TopicExchange exchange() {
|
||||||
return new TopicExchange("topicExchange");
|
return new TopicExchange("topicExchange");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public Exchange traceDelayExchange() {
|
||||||
|
Map<String, Object> args = new HashMap<>(1);
|
||||||
|
args.put("x-delayed-type", "topic");
|
||||||
|
return new CustomExchange("local.trace.delay.exchange", "x-delayed-message", true, false, args);
|
||||||
|
}
|
||||||
|
|
||||||
@Bean
|
@Bean
|
||||||
Binding bindingExchangeMessage() {
|
Binding bindingExchangeMessage() {
|
||||||
return BindingBuilder.bind(firstQueue()).to(exchange()).with(trace);
|
return BindingBuilder.bind(traceQueue()).to(exchange()).with(trace);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Bean
|
@Bean
|
||||||
Binding bindingExchangeMessage2() {
|
Binding bindingExchangeMessage2() {
|
||||||
return BindingBuilder.bind(secondQueue()).to(exchange()).with("topic.#");
|
return BindingBuilder.bind(commonQueue()).to(exchange()).with("topic.#");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public Binding orderBinding(Queue traceDelayQueue, Exchange traceDelayExchange) {
|
||||||
|
return BindingBuilder
|
||||||
|
.bind(traceDelayQueue)
|
||||||
|
.to(traceDelayExchange)
|
||||||
|
.with("trace.delay.*")
|
||||||
|
.noargs();
|
||||||
|
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,21 +1,48 @@
|
|||||||
package com.glxp.api.task.mq;
|
package com.glxp.api.task.mq;
|
||||||
|
|
||||||
|
import com.glxp.api.config.rabbit.TopicRabbitConfig;
|
||||||
|
import com.glxp.api.entity.trace.MqTaskDelayMessage;
|
||||||
import com.glxp.api.entity.trace.MqTraceMessage;
|
import com.glxp.api.entity.trace.MqTraceMessage;
|
||||||
|
import com.glxp.api.entity.trace.TraceTaskNoticeEntity;
|
||||||
|
import com.glxp.api.service.trace.TraceTaskNoticeService;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
|
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
|
||||||
import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
import javax.annotation.Resource;
|
||||||
|
import java.util.Date;
|
||||||
|
|
||||||
|
@Slf4j
|
||||||
@Component
|
@Component
|
||||||
|
|
||||||
public class TraceSearchReceiver {
|
public class TraceSearchReceiver {
|
||||||
|
|
||||||
|
|
||||||
|
@Resource
|
||||||
|
TraceTaskNoticeService taskNoticeService;
|
||||||
|
|
||||||
@RabbitHandler
|
@RabbitHandler
|
||||||
@RabbitListener(queues = "topic.trace")
|
@RabbitListener(queues = TopicRabbitConfig.trace)
|
||||||
public void traceReceiver(MqTraceMessage mqTraceMessage) {
|
public void traceReceiver(MqTraceMessage mqTraceMessage) {
|
||||||
System.out.println("topic.trace消费者收到消息 : " + mqTraceMessage.toString());
|
|
||||||
|
log.info("追随查询任务" + mqTraceMessage.toString());
|
||||||
|
//todo 测试用
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@RabbitListener(queues = TopicRabbitConfig.delaytrace)
|
||||||
|
public void traceDeleyReceiver(MqTaskDelayMessage mqTaskDelayMessage) {
|
||||||
|
|
||||||
|
if (mqTaskDelayMessage.getTaskStatus() == 1 || mqTaskDelayMessage.getTaskStatus() == 2) {
|
||||||
|
log.info(mqTaskDelayMessage.getTaskId() + ":任务超时未处理,更改状态为超时异常");
|
||||||
|
TraceTaskNoticeEntity taskNoticeEntity = taskNoticeService.getById(mqTaskDelayMessage.getTaskId());
|
||||||
|
taskNoticeEntity.setStatus(4);
|
||||||
|
taskNoticeEntity.setUpdateTime(new Date());
|
||||||
|
taskNoticeService.updateById(taskNoticeEntity);
|
||||||
|
} else if (mqTaskDelayMessage.getTaskStatus() == 3) {
|
||||||
|
log.info(mqTaskDelayMessage.getTaskId() + ":任务已完成,过期删除");
|
||||||
|
taskNoticeService.removeById(mqTaskDelayMessage.getTaskId());
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue