Merge remote-tracking branch 'origin/master'

cert
schry 2 years ago
commit cf35a6572a

@ -1,43 +1,69 @@
package com.glxp.api.config.rabbit;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class TopicRabbitConfig {
//绑定键
public final static String common = "topic.common";
public final static String trace = "topic.trace";
public final static String delaytrace = "local.trace.delay";
@Bean
public Queue firstQueue() {
public Queue traceQueue() {
return new Queue(TopicRabbitConfig.trace);
}
@Bean
public Queue secondQueue() {
public Queue commonQueue() {
return new Queue(TopicRabbitConfig.common);
}
@Bean
public Queue traceDelayQueue() {
return QueueBuilder
.durable(delaytrace)
.build();
}
@Bean
TopicExchange exchange() {
return new TopicExchange("topicExchange");
}
@Bean
public Exchange traceDelayExchange() {
Map<String, Object> args = new HashMap<>(1);
args.put("x-delayed-type", "topic");
return new CustomExchange("local.trace.delay.exchange", "x-delayed-message", true, false, args);
}
@Bean
Binding bindingExchangeMessage() {
return BindingBuilder.bind(firstQueue()).to(exchange()).with(trace);
return BindingBuilder.bind(traceQueue()).to(exchange()).with(trace);
}
@Bean
Binding bindingExchangeMessage2() {
return BindingBuilder.bind(secondQueue()).to(exchange()).with("topic.#");
return BindingBuilder.bind(commonQueue()).to(exchange()).with("topic.#");
}
@Bean
public Binding orderBinding(Queue traceDelayQueue, Exchange traceDelayExchange) {
return BindingBuilder
.bind(traceDelayQueue)
.to(traceDelayExchange)
.with("trace.delay.*")
.noargs();
}
}

@ -0,0 +1,26 @@
package com.glxp.api.entity.trace;
import lombok.Data;
import java.util.Date;
/**
*
*/
@Data
public class MqTaskDelayMessage {
private Date createTime;
private Long taskId;
/**
*
* 1:(4,
* 2: (4,
* 3(
* 4:
*/
private Integer taskStatus;
}

@ -1,21 +1,48 @@
package com.glxp.api.task.mq;
import com.glxp.api.config.rabbit.TopicRabbitConfig;
import com.glxp.api.entity.trace.MqTaskDelayMessage;
import com.glxp.api.entity.trace.MqTraceMessage;
import com.glxp.api.entity.trace.TraceTaskNoticeEntity;
import com.glxp.api.service.trace.TraceTaskNoticeService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.Date;
@Slf4j
@Component
public class TraceSearchReceiver {
@Resource
TraceTaskNoticeService taskNoticeService;
@RabbitHandler
@RabbitListener(queues = "topic.trace")
@RabbitListener(queues = TopicRabbitConfig.trace)
public void traceReceiver(MqTraceMessage mqTraceMessage) {
System.out.println("topic.trace消费者收到消息 : " + mqTraceMessage.toString());
log.info("追随查询任务" + mqTraceMessage.toString());
//todo 测试用
}
@RabbitListener(queues = TopicRabbitConfig.delaytrace)
public void traceDeleyReceiver(MqTaskDelayMessage mqTaskDelayMessage) {
if (mqTaskDelayMessage.getTaskStatus() == 1 || mqTaskDelayMessage.getTaskStatus() == 2) {
log.info(mqTaskDelayMessage.getTaskId() + ":任务超时未处理,更改状态为超时异常");
TraceTaskNoticeEntity taskNoticeEntity = taskNoticeService.getById(mqTaskDelayMessage.getTaskId());
taskNoticeEntity.setStatus(4);
taskNoticeEntity.setUpdateTime(new Date());
taskNoticeService.updateById(taskNoticeEntity);
} else if (mqTaskDelayMessage.getTaskStatus() == 3) {
log.info(mqTaskDelayMessage.getTaskId() + ":任务已完成,过期删除");
taskNoticeService.removeById(mqTaskDelayMessage.getTaskId());
}
}
}

Loading…
Cancel
Save