rabbit
parent
75c9544048
commit
92951209c7
@ -0,0 +1,103 @@
|
|||||||
|
package com.glxp.sale.admin.config.rabbit;
|
||||||
|
|
||||||
|
import cn.hutool.core.util.StrUtil;
|
||||||
|
import org.springframework.amqp.core.*;
|
||||||
|
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
|
||||||
|
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
|
||||||
|
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
|
||||||
|
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
||||||
|
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
|
||||||
|
import org.springframework.beans.factory.annotation.Qualifier;
|
||||||
|
import org.springframework.beans.factory.annotation.Value;
|
||||||
|
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
|
||||||
|
import org.springframework.context.annotation.Bean;
|
||||||
|
import org.springframework.context.annotation.Configuration;
|
||||||
|
import org.springframework.context.annotation.Primary;
|
||||||
|
|
||||||
|
@Configuration
|
||||||
|
public class RabbitConfig {
|
||||||
|
|
||||||
|
// private final static String exchangeName = "sync_direct_exchange";
|
||||||
|
public final static String topicExchangeName = "sync_topic_exchange";
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 队列名
|
||||||
|
*/
|
||||||
|
public final static String syncQueueName = "sync";
|
||||||
|
public final static String uploadQueueName = "upload";
|
||||||
|
|
||||||
|
|
||||||
|
private final static String syncTopicKey = "sync.*";
|
||||||
|
public final static String syncInsertKey = "sync.insert";
|
||||||
|
public final static String syncUpdateKey = "sync.update";
|
||||||
|
public final static String syncDeleteKey = "sync.delete";
|
||||||
|
|
||||||
|
private final static String uploadTopicKey = "upload.*";
|
||||||
|
public final static String uploadInsertKey = "upload.insert";
|
||||||
|
public final static String uploadUpdateKey = "upload.update";
|
||||||
|
public final static String uploadDeleteKey = "upload.delete";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 声明direct交换机
|
||||||
|
*/
|
||||||
|
// @Bean
|
||||||
|
// DirectExchange udiExchange() {
|
||||||
|
// //设置消息持久化
|
||||||
|
// return new DirectExchange(exchangeName, true, false);
|
||||||
|
// }
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 声明topic交换机
|
||||||
|
*/
|
||||||
|
@Bean
|
||||||
|
TopicExchange udiTopicExchange() {
|
||||||
|
//设置消息持久化
|
||||||
|
return new TopicExchange(topicExchangeName, true, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 声明队列
|
||||||
|
*/
|
||||||
|
@Bean
|
||||||
|
Queue syncQueue() {
|
||||||
|
//设置消息持久化
|
||||||
|
return new Queue(syncQueueName, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
Queue uploadQueue() {
|
||||||
|
//设置消息持久化
|
||||||
|
return new Queue(uploadQueueName, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 将根据路由key队列绑定交换机
|
||||||
|
*/
|
||||||
|
@Bean
|
||||||
|
Binding bindingSync() {
|
||||||
|
return BindingBuilder.bind(syncQueue()).to(udiTopicExchange()).with(syncTopicKey);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 将根据路由key队列绑定交换机
|
||||||
|
*/
|
||||||
|
@Bean
|
||||||
|
Binding bindingUpload() {
|
||||||
|
return BindingBuilder.bind(uploadQueue()).to(udiTopicExchange()).with(uploadTopicKey);
|
||||||
|
}
|
||||||
|
|
||||||
|
// /**
|
||||||
|
// * 将根据路由key队列绑定交换机
|
||||||
|
// */
|
||||||
|
// @Bean
|
||||||
|
// Binding bindingSync() {
|
||||||
|
// return BindingBuilder.bind(syncQueue()).to(udiExchange()).with(syncKey);
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// @Bean
|
||||||
|
// Binding bindingUpload() {
|
||||||
|
// return BindingBuilder.bind(uploadQueue()).to(udiExchange()).with(uploadKey);
|
||||||
|
// }
|
||||||
|
|
||||||
|
}
|
@ -0,0 +1,39 @@
|
|||||||
|
package com.glxp.sale.admin.rabbitmq;
|
||||||
|
|
||||||
|
import com.glxp.sale.admin.config.rabbit.RabbitConfig;
|
||||||
|
import lombok.RequiredArgsConstructor;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.amqp.core.Message;
|
||||||
|
import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
||||||
|
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
||||||
|
import org.springframework.beans.factory.annotation.Qualifier;
|
||||||
|
import org.springframework.messaging.handler.annotation.Payload;
|
||||||
|
import org.springframework.stereotype.Service;
|
||||||
|
|
||||||
|
import javax.annotation.Resource;
|
||||||
|
|
||||||
|
@Slf4j
|
||||||
|
@Service
|
||||||
|
@RequiredArgsConstructor
|
||||||
|
public class SyncListener {
|
||||||
|
|
||||||
|
@Resource(name = "localRabbitTemplate")
|
||||||
|
private RabbitTemplate rabbitTemplate;
|
||||||
|
|
||||||
|
@RabbitListener(queues = RabbitConfig.syncQueueName, containerFactory = "upperFactory", concurrency = "20-40")
|
||||||
|
public void process(Message message, @Payload String str) {
|
||||||
|
//获取路由key
|
||||||
|
String routingKey = message.getMessageProperties().getReceivedRoutingKey();
|
||||||
|
System.out.println(routingKey);
|
||||||
|
System.out.println(str);
|
||||||
|
switch (routingKey) {
|
||||||
|
case RabbitConfig.syncInsertKey:
|
||||||
|
case RabbitConfig.syncUpdateKey:
|
||||||
|
case RabbitConfig.syncDeleteKey:
|
||||||
|
rabbitTemplate.convertSendAndReceive(RabbitConfig.topicExchangeName, routingKey, str);
|
||||||
|
default:
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
@ -0,0 +1,44 @@
|
|||||||
|
package com.glxp.sale.admin.rabbitmq;
|
||||||
|
|
||||||
|
import com.glxp.sale.admin.config.rabbit.RabbitConfig;
|
||||||
|
import lombok.RequiredArgsConstructor;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.amqp.core.Message;
|
||||||
|
import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
||||||
|
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
||||||
|
import org.springframework.beans.factory.annotation.Qualifier;
|
||||||
|
import org.springframework.beans.factory.annotation.Value;
|
||||||
|
import org.springframework.context.annotation.Primary;
|
||||||
|
import org.springframework.messaging.handler.annotation.Payload;
|
||||||
|
import org.springframework.stereotype.Service;
|
||||||
|
|
||||||
|
import javax.annotation.Resource;
|
||||||
|
|
||||||
|
@Slf4j
|
||||||
|
@Service
|
||||||
|
@RequiredArgsConstructor
|
||||||
|
public class UploadListener {
|
||||||
|
|
||||||
|
@Resource(name = "upperRabbitTemplate")
|
||||||
|
private RabbitTemplate rabbitTemplate;
|
||||||
|
|
||||||
|
@Value("${upper.spring.rabbitmq.exchange}")
|
||||||
|
private String upperExchange;
|
||||||
|
|
||||||
|
@RabbitListener(queues = RabbitConfig.uploadQueueName,containerFactory = "localFactory", concurrency = "20-40")
|
||||||
|
public void process(Message message, @Payload String str) {
|
||||||
|
//获取路由key
|
||||||
|
String routingKey = message.getMessageProperties().getReceivedRoutingKey();
|
||||||
|
System.out.println(routingKey);
|
||||||
|
System.out.println(str);
|
||||||
|
switch (routingKey) {
|
||||||
|
case RabbitConfig.uploadInsertKey:
|
||||||
|
case RabbitConfig.uploadUpdateKey:
|
||||||
|
case RabbitConfig.uploadDeleteKey:
|
||||||
|
rabbitTemplate.convertSendAndReceive(upperExchange, routingKey, str);
|
||||||
|
default:
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
Loading…
Reference in New Issue