From 38a122ca60bef91fe6286f3465f332a5c24bbb24 Mon Sep 17 00:00:00 2001 From: anthonywj Date: Thu, 1 Jun 2023 16:33:10 +0800 Subject: [PATCH] =?UTF-8?q?=E5=8A=A8=E6=80=81=E6=B7=BB=E5=8A=A0=E9=98=9F?= =?UTF-8?q?=E5=88=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../com/glxp/api/ApiAdminApplication.java | 4 ++ .../api/config/rabbit/TopicRabbitConfig.java | 4 ++ .../api/service/trace/RabbitPushService.java | 18 ++++++- .../api/service/trace/RabbitQueueService.java | 52 +++++++++++++++++++ .../api/service/trace/UdiTraceService.java | 3 +- .../glxp/api/task/mq/TraceSearchReceiver.java | 4 -- .../java/com/glxp/api/util/RedisUtil.java | 5 +- 7 files changed, 81 insertions(+), 9 deletions(-) create mode 100644 src/main/java/com/glxp/api/service/trace/RabbitQueueService.java diff --git a/src/main/java/com/glxp/api/ApiAdminApplication.java b/src/main/java/com/glxp/api/ApiAdminApplication.java index 4e002d7..7ad1e93 100644 --- a/src/main/java/com/glxp/api/ApiAdminApplication.java +++ b/src/main/java/com/glxp/api/ApiAdminApplication.java @@ -22,6 +22,10 @@ public class ApiAdminApplication extends SpringBootServletInitializer { SpringApplication application = new SpringApplication(ApiAdminApplication.class); application.addInitializers(new ToolApplicationContextInitializer()); application.run(args); + + + + // System.out.println(DateUtil.getStartTime("17:30:01")+""); } diff --git a/src/main/java/com/glxp/api/config/rabbit/TopicRabbitConfig.java b/src/main/java/com/glxp/api/config/rabbit/TopicRabbitConfig.java index d65c988..74688a2 100644 --- a/src/main/java/com/glxp/api/config/rabbit/TopicRabbitConfig.java +++ b/src/main/java/com/glxp/api/config/rabbit/TopicRabbitConfig.java @@ -10,6 +10,7 @@ import java.util.Map; @Configuration public class TopicRabbitConfig { + //绑定键 public final static String common = "topic.common"; public final static String trace = "topic.trace"; @@ -75,4 +76,7 @@ public class TopicRabbitConfig { .noargs(); } + + + } diff --git a/src/main/java/com/glxp/api/service/trace/RabbitPushService.java b/src/main/java/com/glxp/api/service/trace/RabbitPushService.java index 3764706..63a1507 100644 --- a/src/main/java/com/glxp/api/service/trace/RabbitPushService.java +++ b/src/main/java/com/glxp/api/service/trace/RabbitPushService.java @@ -8,6 +8,8 @@ import com.glxp.api.entity.trace.MqTaskDelayMessage; import com.glxp.api.entity.trace.MqTraceMessage; import com.glxp.api.util.DateUtil; import com.glxp.api.util.JsonUtils; +import com.rabbitmq.client.BuiltinExchangeType; +import com.rabbitmq.client.Channel; import lombok.RequiredArgsConstructor; import org.springframework.amqp.core.MessageProperties; import org.springframework.amqp.rabbit.core.RabbitTemplate; @@ -15,6 +17,7 @@ import org.springframework.stereotype.Service; import org.springframework.web.bind.annotation.GetMapping; import javax.annotation.Resource; +import java.io.IOException; import java.util.Date; import java.util.HashMap; import java.util.Map; @@ -25,10 +28,23 @@ public class RabbitPushService { @Resource private final RabbitTemplate rabbitTemplate; + @Resource + RabbitQueueService rabbitQueueService; + public static String FANOUT_EXCHANGE = " trace_fanout_exchange"; - public BaseResponse send(MqTraceMessage mqTraceMessage) { + public void publicPush(MqTraceMessage mqTraceMessage) { + Channel channel = rabbitTemplate.getConnectionFactory().createConnection().createChannel(true); + String queueName = "fanout_" + mqTraceMessage.getCompanyId() + ""; + rabbitQueueService.createQueue(queueName); +// channel.basicPublish(FANOUT_EXCHANGE, "", null, JsonUtils.toJsonString(mqTraceMessage).getBytes()); + rabbitTemplate.convertAndSend(FANOUT_EXCHANGE, null, JsonUtils.toJsonString(mqTraceMessage)); + + } + + + public BaseResponse send(MqTraceMessage mqTraceMessage) { rabbitTemplate.convertAndSend(TopicRabbitConfig.TRACE_TOPIC_EXCHANGE, TopicRabbitConfig.trace, JsonUtils.toJsonString(mqTraceMessage)); return ResultVOUtils.success("发送成功"); } diff --git a/src/main/java/com/glxp/api/service/trace/RabbitQueueService.java b/src/main/java/com/glxp/api/service/trace/RabbitQueueService.java new file mode 100644 index 0000000..45a81e9 --- /dev/null +++ b/src/main/java/com/glxp/api/service/trace/RabbitQueueService.java @@ -0,0 +1,52 @@ +package com.glxp.api.service.trace; + +import cn.hutool.core.util.StrUtil; +import lombok.extern.slf4j.Slf4j; +import org.springframework.amqp.core.AmqpAdmin; +import org.springframework.amqp.core.BindingBuilder; +import org.springframework.amqp.core.FanoutExchange; +import org.springframework.amqp.core.Queue; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import javax.annotation.Resource; +import java.util.Properties; + +/** + * 动态创建队列 + */ +@Slf4j +@Service +public class RabbitQueueService { + + + @Resource + private AmqpAdmin amqpAdmin; + + + public void createQueue(String queueName) { + //判断队列是否存在 + if (!isExistQueue(queueName)) { + Queue queue = new Queue(queueName, true); + FanoutExchange fanoutExchange = new FanoutExchange(RabbitPushService.FANOUT_EXCHANGE); + amqpAdmin.declareQueue(queue); + amqpAdmin.declareExchange(fanoutExchange); + amqpAdmin.declareBinding(BindingBuilder.bind(queue).to(fanoutExchange)); + } + } + + public boolean isExistQueue(String queueName) { + boolean flag = true; + if (StrUtil.isNotBlank(queueName)) { + Properties queueProperties = amqpAdmin.getQueueProperties(queueName); + if (queueProperties == null) { + flag = false; + } + } else { + throw new RuntimeException("队列名称为空"); + } + return flag; + } + + +} diff --git a/src/main/java/com/glxp/api/service/trace/UdiTraceService.java b/src/main/java/com/glxp/api/service/trace/UdiTraceService.java index 824b7d4..8ac3c37 100644 --- a/src/main/java/com/glxp/api/service/trace/UdiTraceService.java +++ b/src/main/java/com/glxp/api/service/trace/UdiTraceService.java @@ -239,7 +239,8 @@ public class UdiTraceService { mqTraceMessage.setCompanyId(companyId); mqTraceMessage.setCerditNo(traceProductDetailEntity.getCreditNum()); mqTraceMessage.setTaskId(traceProductDetailEntity.getId()); - rabbitPushService.send(mqTraceMessage); +// rabbitPushService.send(mqTraceMessage); + rabbitPushService.publicPush(mqTraceMessage); } diff --git a/src/main/java/com/glxp/api/task/mq/TraceSearchReceiver.java b/src/main/java/com/glxp/api/task/mq/TraceSearchReceiver.java index cf8892b..368c998 100644 --- a/src/main/java/com/glxp/api/task/mq/TraceSearchReceiver.java +++ b/src/main/java/com/glxp/api/task/mq/TraceSearchReceiver.java @@ -21,10 +21,7 @@ public class TraceSearchReceiver { @Resource TraceTaskNoticeService taskNoticeService; - @RabbitHandler - @RabbitListener(queues = TopicRabbitConfig.common) public void traceReceiver(String mqTraceMessage) { - System.out.println("xxxx.xxxx.xxxx收到的消息内容为:\n" + mqTraceMessage); //todo 测试用 } @@ -42,7 +39,6 @@ public class TraceSearchReceiver { log.info(mqTaskDelayMessage.getTaskId() + ":任务已完成,过期删除"); taskNoticeService.removeById(mqTaskDelayMessage.getTaskId()); } - } } diff --git a/src/main/java/com/glxp/api/util/RedisUtil.java b/src/main/java/com/glxp/api/util/RedisUtil.java index 4411ddc..04ee2ad 100644 --- a/src/main/java/com/glxp/api/util/RedisUtil.java +++ b/src/main/java/com/glxp/api/util/RedisUtil.java @@ -9,6 +9,7 @@ import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Component; import org.springframework.util.CollectionUtils; +import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Set; @@ -84,7 +85,7 @@ public class RedisUtil { if (key.length == 1) { redisTemplate.delete(key[0]); } else { - redisTemplate.delete(CollectionUtils.arrayToList(key)); + redisTemplate.delete((Collection) CollectionUtils.arrayToList(key)); } } } @@ -602,8 +603,6 @@ public class RedisUtil { } - - /** * 不存在key则缓存放入并设置时间 *