diff --git a/src/main/java/com/glxp/api/config/rabbit/TopicRabbitConfig.java b/src/main/java/com/glxp/api/config/rabbit/TopicRabbitConfig.java index 1801f03..78c4cf5 100644 --- a/src/main/java/com/glxp/api/config/rabbit/TopicRabbitConfig.java +++ b/src/main/java/com/glxp/api/config/rabbit/TopicRabbitConfig.java @@ -1,43 +1,69 @@ package com.glxp.api.config.rabbit; -import org.springframework.amqp.core.Binding; -import org.springframework.amqp.core.BindingBuilder; -import org.springframework.amqp.core.Queue; -import org.springframework.amqp.core.TopicExchange; +import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import java.util.HashMap; +import java.util.Map; + @Configuration public class TopicRabbitConfig { //绑定键 public final static String common = "topic.common"; public final static String trace = "topic.trace"; + public final static String delaytrace = "local.trace.delay"; + @Bean - public Queue firstQueue() { + public Queue traceQueue() { return new Queue(TopicRabbitConfig.trace); } @Bean - public Queue secondQueue() { + public Queue commonQueue() { return new Queue(TopicRabbitConfig.common); } + + @Bean + public Queue traceDelayQueue() { + return QueueBuilder + .durable(delaytrace) + .build(); + } + @Bean TopicExchange exchange() { return new TopicExchange("topicExchange"); } + @Bean + public Exchange traceDelayExchange() { + Map args = new HashMap<>(1); + args.put("x-delayed-type", "topic"); + return new CustomExchange("local.trace.delay.exchange", "x-delayed-message", true, false, args); + } @Bean Binding bindingExchangeMessage() { - return BindingBuilder.bind(firstQueue()).to(exchange()).with(trace); + return BindingBuilder.bind(traceQueue()).to(exchange()).with(trace); } @Bean Binding bindingExchangeMessage2() { - return BindingBuilder.bind(secondQueue()).to(exchange()).with("topic.#"); + return BindingBuilder.bind(commonQueue()).to(exchange()).with("topic.#"); } + + @Bean + public Binding orderBinding(Queue traceDelayQueue, Exchange traceDelayExchange) { + return BindingBuilder + .bind(traceDelayQueue) + .to(traceDelayExchange) + .with("trace.delay.*") + .noargs(); + + } } diff --git a/src/main/java/com/glxp/api/entity/trace/MqTaskDelayMessage.java b/src/main/java/com/glxp/api/entity/trace/MqTaskDelayMessage.java new file mode 100644 index 0000000..659e966 --- /dev/null +++ b/src/main/java/com/glxp/api/entity/trace/MqTaskDelayMessage.java @@ -0,0 +1,26 @@ +package com.glxp.api.entity.trace; + +import lombok.Data; + +import java.util.Date; + +/** + * 查询任务超时处理 + */ +@Data +public class MqTaskDelayMessage { + + private Date createTime; + private Long taskId; + + /** + * 任务状态 + * 1:未处理;(超时后,更改状态为4,超时处理失败) + * 2:正在处理(任务被下载) (超时后,更改状态为4,超时处理失败) + * 3:已处理(任务已提交);(超时后,删除该任务) + * 4:超时处理失败 + */ + private Integer taskStatus; + + +} diff --git a/src/main/java/com/glxp/api/task/mq/TraceSearchReceiver.java b/src/main/java/com/glxp/api/task/mq/TraceSearchReceiver.java index 31bc9bf..3a66e53 100644 --- a/src/main/java/com/glxp/api/task/mq/TraceSearchReceiver.java +++ b/src/main/java/com/glxp/api/task/mq/TraceSearchReceiver.java @@ -1,21 +1,48 @@ package com.glxp.api.task.mq; +import com.glxp.api.config.rabbit.TopicRabbitConfig; +import com.glxp.api.entity.trace.MqTaskDelayMessage; import com.glxp.api.entity.trace.MqTraceMessage; +import com.glxp.api.entity.trace.TraceTaskNoticeEntity; +import com.glxp.api.service.trace.TraceTaskNoticeService; +import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; +import javax.annotation.Resource; +import java.util.Date; +@Slf4j @Component - public class TraceSearchReceiver { + + @Resource + TraceTaskNoticeService taskNoticeService; + @RabbitHandler - @RabbitListener(queues = "topic.trace") + @RabbitListener(queues = TopicRabbitConfig.trace) public void traceReceiver(MqTraceMessage mqTraceMessage) { - System.out.println("topic.trace消费者收到消息 : " + mqTraceMessage.toString()); + + log.info("追随查询任务" + mqTraceMessage.toString()); + //todo 测试用 } + @RabbitListener(queues = TopicRabbitConfig.delaytrace) + public void traceDeleyReceiver(MqTaskDelayMessage mqTaskDelayMessage) { + + if (mqTaskDelayMessage.getTaskStatus() == 1 || mqTaskDelayMessage.getTaskStatus() == 2) { + log.info(mqTaskDelayMessage.getTaskId() + ":任务超时未处理,更改状态为超时异常"); + TraceTaskNoticeEntity taskNoticeEntity = taskNoticeService.getById(mqTaskDelayMessage.getTaskId()); + taskNoticeEntity.setStatus(4); + taskNoticeEntity.setUpdateTime(new Date()); + taskNoticeService.updateById(taskNoticeEntity); + } else if (mqTaskDelayMessage.getTaskStatus() == 3) { + log.info(mqTaskDelayMessage.getTaskId() + ":任务已完成,过期删除"); + taskNoticeService.removeById(mqTaskDelayMessage.getTaskId()); + } + } }