Compare commits
1 Commits
test
...
rabbit_dev
Author | SHA1 | Date |
---|---|---|
|
92951209c7 | 2 years ago |
@ -0,0 +1,103 @@
|
||||
package com.glxp.sale.admin.config.rabbit;
|
||||
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import org.springframework.amqp.core.*;
|
||||
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
|
||||
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
|
||||
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
|
||||
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
||||
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
|
||||
import org.springframework.beans.factory.annotation.Qualifier;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.context.annotation.Primary;
|
||||
|
||||
@Configuration
|
||||
public class RabbitConfig {
|
||||
|
||||
// private final static String exchangeName = "sync_direct_exchange";
|
||||
public final static String topicExchangeName = "sync_topic_exchange";
|
||||
|
||||
|
||||
/**
|
||||
* 队列名
|
||||
*/
|
||||
public final static String syncQueueName = "sync";
|
||||
public final static String uploadQueueName = "upload";
|
||||
|
||||
|
||||
private final static String syncTopicKey = "sync.*";
|
||||
public final static String syncInsertKey = "sync.insert";
|
||||
public final static String syncUpdateKey = "sync.update";
|
||||
public final static String syncDeleteKey = "sync.delete";
|
||||
|
||||
private final static String uploadTopicKey = "upload.*";
|
||||
public final static String uploadInsertKey = "upload.insert";
|
||||
public final static String uploadUpdateKey = "upload.update";
|
||||
public final static String uploadDeleteKey = "upload.delete";
|
||||
|
||||
/**
|
||||
* 声明direct交换机
|
||||
*/
|
||||
// @Bean
|
||||
// DirectExchange udiExchange() {
|
||||
// //设置消息持久化
|
||||
// return new DirectExchange(exchangeName, true, false);
|
||||
// }
|
||||
|
||||
/**
|
||||
* 声明topic交换机
|
||||
*/
|
||||
@Bean
|
||||
TopicExchange udiTopicExchange() {
|
||||
//设置消息持久化
|
||||
return new TopicExchange(topicExchangeName, true, false);
|
||||
}
|
||||
|
||||
/**
|
||||
* 声明队列
|
||||
*/
|
||||
@Bean
|
||||
Queue syncQueue() {
|
||||
//设置消息持久化
|
||||
return new Queue(syncQueueName, true);
|
||||
}
|
||||
|
||||
@Bean
|
||||
Queue uploadQueue() {
|
||||
//设置消息持久化
|
||||
return new Queue(uploadQueueName, true);
|
||||
}
|
||||
|
||||
/**
|
||||
* 将根据路由key队列绑定交换机
|
||||
*/
|
||||
@Bean
|
||||
Binding bindingSync() {
|
||||
return BindingBuilder.bind(syncQueue()).to(udiTopicExchange()).with(syncTopicKey);
|
||||
}
|
||||
|
||||
/**
|
||||
* 将根据路由key队列绑定交换机
|
||||
*/
|
||||
@Bean
|
||||
Binding bindingUpload() {
|
||||
return BindingBuilder.bind(uploadQueue()).to(udiTopicExchange()).with(uploadTopicKey);
|
||||
}
|
||||
|
||||
// /**
|
||||
// * 将根据路由key队列绑定交换机
|
||||
// */
|
||||
// @Bean
|
||||
// Binding bindingSync() {
|
||||
// return BindingBuilder.bind(syncQueue()).to(udiExchange()).with(syncKey);
|
||||
// }
|
||||
//
|
||||
// @Bean
|
||||
// Binding bindingUpload() {
|
||||
// return BindingBuilder.bind(uploadQueue()).to(udiExchange()).with(uploadKey);
|
||||
// }
|
||||
|
||||
}
|
@ -0,0 +1,39 @@
|
||||
package com.glxp.sale.admin.rabbitmq;
|
||||
|
||||
import com.glxp.sale.admin.config.rabbit.RabbitConfig;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.amqp.core.Message;
|
||||
import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
||||
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
||||
import org.springframework.beans.factory.annotation.Qualifier;
|
||||
import org.springframework.messaging.handler.annotation.Payload;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
|
||||
@Slf4j
|
||||
@Service
|
||||
@RequiredArgsConstructor
|
||||
public class SyncListener {
|
||||
|
||||
@Resource(name = "localRabbitTemplate")
|
||||
private RabbitTemplate rabbitTemplate;
|
||||
|
||||
@RabbitListener(queues = RabbitConfig.syncQueueName, containerFactory = "upperFactory", concurrency = "20-40")
|
||||
public void process(Message message, @Payload String str) {
|
||||
//获取路由key
|
||||
String routingKey = message.getMessageProperties().getReceivedRoutingKey();
|
||||
System.out.println(routingKey);
|
||||
System.out.println(str);
|
||||
switch (routingKey) {
|
||||
case RabbitConfig.syncInsertKey:
|
||||
case RabbitConfig.syncUpdateKey:
|
||||
case RabbitConfig.syncDeleteKey:
|
||||
rabbitTemplate.convertSendAndReceive(RabbitConfig.topicExchangeName, routingKey, str);
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,44 @@
|
||||
package com.glxp.sale.admin.rabbitmq;
|
||||
|
||||
import com.glxp.sale.admin.config.rabbit.RabbitConfig;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.amqp.core.Message;
|
||||
import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
||||
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
||||
import org.springframework.beans.factory.annotation.Qualifier;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.context.annotation.Primary;
|
||||
import org.springframework.messaging.handler.annotation.Payload;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
|
||||
@Slf4j
|
||||
@Service
|
||||
@RequiredArgsConstructor
|
||||
public class UploadListener {
|
||||
|
||||
@Resource(name = "upperRabbitTemplate")
|
||||
private RabbitTemplate rabbitTemplate;
|
||||
|
||||
@Value("${upper.spring.rabbitmq.exchange}")
|
||||
private String upperExchange;
|
||||
|
||||
@RabbitListener(queues = RabbitConfig.uploadQueueName,containerFactory = "localFactory", concurrency = "20-40")
|
||||
public void process(Message message, @Payload String str) {
|
||||
//获取路由key
|
||||
String routingKey = message.getMessageProperties().getReceivedRoutingKey();
|
||||
System.out.println(routingKey);
|
||||
System.out.println(str);
|
||||
switch (routingKey) {
|
||||
case RabbitConfig.uploadInsertKey:
|
||||
case RabbitConfig.uploadUpdateKey:
|
||||
case RabbitConfig.uploadDeleteKey:
|
||||
rabbitTemplate.convertSendAndReceive(upperExchange, routingKey, str);
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue