动态添加队列

cert
anthonywj 2 years ago
parent d1e9a65aca
commit 38a122ca60

@ -22,6 +22,10 @@ public class ApiAdminApplication extends SpringBootServletInitializer {
SpringApplication application = new SpringApplication(ApiAdminApplication.class);
application.addInitializers(new ToolApplicationContextInitializer());
application.run(args);
// System.out.println(DateUtil.getStartTime("17:30:01")+"");
}

@ -10,6 +10,7 @@ import java.util.Map;
@Configuration
public class TopicRabbitConfig {
//绑定键
public final static String common = "topic.common";
public final static String trace = "topic.trace";
@ -75,4 +76,7 @@ public class TopicRabbitConfig {
.noargs();
}
}

@ -8,6 +8,8 @@ import com.glxp.api.entity.trace.MqTaskDelayMessage;
import com.glxp.api.entity.trace.MqTraceMessage;
import com.glxp.api.util.DateUtil;
import com.glxp.api.util.JsonUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import lombok.RequiredArgsConstructor;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
@ -15,6 +17,7 @@ import org.springframework.stereotype.Service;
import org.springframework.web.bind.annotation.GetMapping;
import javax.annotation.Resource;
import java.io.IOException;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
@ -25,10 +28,23 @@ public class RabbitPushService {
@Resource
private final RabbitTemplate rabbitTemplate;
@Resource
RabbitQueueService rabbitQueueService;
public static String FANOUT_EXCHANGE = " trace_fanout_exchange";
public BaseResponse send(MqTraceMessage mqTraceMessage) {
public void publicPush(MqTraceMessage mqTraceMessage) {
Channel channel = rabbitTemplate.getConnectionFactory().createConnection().createChannel(true);
String queueName = "fanout_" + mqTraceMessage.getCompanyId() + "";
rabbitQueueService.createQueue(queueName);
// channel.basicPublish(FANOUT_EXCHANGE, "", null, JsonUtils.toJsonString(mqTraceMessage).getBytes());
rabbitTemplate.convertAndSend(FANOUT_EXCHANGE, null, JsonUtils.toJsonString(mqTraceMessage));
}
public BaseResponse send(MqTraceMessage mqTraceMessage) {
rabbitTemplate.convertAndSend(TopicRabbitConfig.TRACE_TOPIC_EXCHANGE, TopicRabbitConfig.trace, JsonUtils.toJsonString(mqTraceMessage));
return ResultVOUtils.success("发送成功");
}

@ -0,0 +1,52 @@
package com.glxp.api.service.trace;
import cn.hutool.core.util.StrUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.Properties;
/**
*
*/
@Slf4j
@Service
public class RabbitQueueService {
@Resource
private AmqpAdmin amqpAdmin;
public void createQueue(String queueName) {
//判断队列是否存在
if (!isExistQueue(queueName)) {
Queue queue = new Queue(queueName, true);
FanoutExchange fanoutExchange = new FanoutExchange(RabbitPushService.FANOUT_EXCHANGE);
amqpAdmin.declareQueue(queue);
amqpAdmin.declareExchange(fanoutExchange);
amqpAdmin.declareBinding(BindingBuilder.bind(queue).to(fanoutExchange));
}
}
public boolean isExistQueue(String queueName) {
boolean flag = true;
if (StrUtil.isNotBlank(queueName)) {
Properties queueProperties = amqpAdmin.getQueueProperties(queueName);
if (queueProperties == null) {
flag = false;
}
} else {
throw new RuntimeException("队列名称为空");
}
return flag;
}
}

@ -239,7 +239,8 @@ public class UdiTraceService {
mqTraceMessage.setCompanyId(companyId);
mqTraceMessage.setCerditNo(traceProductDetailEntity.getCreditNum());
mqTraceMessage.setTaskId(traceProductDetailEntity.getId());
rabbitPushService.send(mqTraceMessage);
// rabbitPushService.send(mqTraceMessage);
rabbitPushService.publicPush(mqTraceMessage);
}

@ -21,10 +21,7 @@ public class TraceSearchReceiver {
@Resource
TraceTaskNoticeService taskNoticeService;
@RabbitHandler
@RabbitListener(queues = TopicRabbitConfig.common)
public void traceReceiver(String mqTraceMessage) {
System.out.println("xxxx.xxxx.xxxx收到的消息内容为\n" + mqTraceMessage);
//todo 测试用
}
@ -42,7 +39,6 @@ public class TraceSearchReceiver {
log.info(mqTaskDelayMessage.getTaskId() + ":任务已完成,过期删除");
taskNoticeService.removeById(mqTaskDelayMessage.getTaskId());
}
}
}

@ -9,6 +9,7 @@ import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -84,7 +85,7 @@ public class RedisUtil {
if (key.length == 1) {
redisTemplate.delete(key[0]);
} else {
redisTemplate.delete(CollectionUtils.arrayToList(key));
redisTemplate.delete((Collection<String>) CollectionUtils.arrayToList(key));
}
}
}
@ -602,8 +603,6 @@ public class RedisUtil {
}
/**
* key
*

Loading…
Cancel
Save