From 92951209c7b82424a0efebd9b6a7fd2c03bb2090 Mon Sep 17 00:00:00 2001 From: wj <1285151836@qq.com> Date: Tue, 9 May 2023 20:10:09 +0800 Subject: [PATCH] rabbit --- api-admin/pom.xml | 5 + .../admin/config/rabbit/RabbitConfig.java | 103 ++++++++++++++++++ .../config/rabbit/RabbitConnectionConfig.java | 103 ++++++++++++++++++ .../sale/admin/rabbitmq/SyncListener.java | 39 +++++++ .../sale/admin/rabbitmq/UploadListener.java | 44 ++++++++ .../main/resources/application-dev.properties | 22 +++- 6 files changed, 312 insertions(+), 4 deletions(-) create mode 100644 api-admin/src/main/java/com/glxp/sale/admin/config/rabbit/RabbitConfig.java create mode 100644 api-admin/src/main/java/com/glxp/sale/admin/config/rabbit/RabbitConnectionConfig.java create mode 100644 api-admin/src/main/java/com/glxp/sale/admin/rabbitmq/SyncListener.java create mode 100644 api-admin/src/main/java/com/glxp/sale/admin/rabbitmq/UploadListener.java diff --git a/api-admin/pom.xml b/api-admin/pom.xml index 5996192..6272e2b 100644 --- a/api-admin/pom.xml +++ b/api-admin/pom.xml @@ -307,6 +307,11 @@ 3.5.3 + + org.springframework.boot + spring-boot-starter-amqp + + diff --git a/api-admin/src/main/java/com/glxp/sale/admin/config/rabbit/RabbitConfig.java b/api-admin/src/main/java/com/glxp/sale/admin/config/rabbit/RabbitConfig.java new file mode 100644 index 0000000..0aaf241 --- /dev/null +++ b/api-admin/src/main/java/com/glxp/sale/admin/config/rabbit/RabbitConfig.java @@ -0,0 +1,103 @@ +package com.glxp.sale.admin.config.rabbit; + +import cn.hutool.core.util.StrUtil; +import org.springframework.amqp.core.*; +import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; +import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; +import org.springframework.amqp.rabbit.connection.ConnectionFactory; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Primary; + +@Configuration +public class RabbitConfig { + + // private final static String exchangeName = "sync_direct_exchange"; + public final static String topicExchangeName = "sync_topic_exchange"; + + + /** + * 队列名 + */ + public final static String syncQueueName = "sync"; + public final static String uploadQueueName = "upload"; + + + private final static String syncTopicKey = "sync.*"; + public final static String syncInsertKey = "sync.insert"; + public final static String syncUpdateKey = "sync.update"; + public final static String syncDeleteKey = "sync.delete"; + + private final static String uploadTopicKey = "upload.*"; + public final static String uploadInsertKey = "upload.insert"; + public final static String uploadUpdateKey = "upload.update"; + public final static String uploadDeleteKey = "upload.delete"; + + /** + * 声明direct交换机 + */ +// @Bean +// DirectExchange udiExchange() { +// //设置消息持久化 +// return new DirectExchange(exchangeName, true, false); +// } + + /** + * 声明topic交换机 + */ + @Bean + TopicExchange udiTopicExchange() { + //设置消息持久化 + return new TopicExchange(topicExchangeName, true, false); + } + + /** + * 声明队列 + */ + @Bean + Queue syncQueue() { + //设置消息持久化 + return new Queue(syncQueueName, true); + } + + @Bean + Queue uploadQueue() { + //设置消息持久化 + return new Queue(uploadQueueName, true); + } + + /** + * 将根据路由key队列绑定交换机 + */ + @Bean + Binding bindingSync() { + return BindingBuilder.bind(syncQueue()).to(udiTopicExchange()).with(syncTopicKey); + } + + /** + * 将根据路由key队列绑定交换机 + */ + @Bean + Binding bindingUpload() { + return BindingBuilder.bind(uploadQueue()).to(udiTopicExchange()).with(uploadTopicKey); + } + +// /** +// * 将根据路由key队列绑定交换机 +// */ +// @Bean +// Binding bindingSync() { +// return BindingBuilder.bind(syncQueue()).to(udiExchange()).with(syncKey); +// } +// +// @Bean +// Binding bindingUpload() { +// return BindingBuilder.bind(uploadQueue()).to(udiExchange()).with(uploadKey); +// } + +} diff --git a/api-admin/src/main/java/com/glxp/sale/admin/config/rabbit/RabbitConnectionConfig.java b/api-admin/src/main/java/com/glxp/sale/admin/config/rabbit/RabbitConnectionConfig.java new file mode 100644 index 0000000..2a5d5ff --- /dev/null +++ b/api-admin/src/main/java/com/glxp/sale/admin/config/rabbit/RabbitConnectionConfig.java @@ -0,0 +1,103 @@ +package com.glxp.sale.admin.config.rabbit; + + +import cn.hutool.core.util.StrUtil; +import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; +import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; +import org.springframework.amqp.rabbit.connection.ConnectionFactory; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Primary; + +@Configuration +public class RabbitConnectionConfig { + + /* + *定义上级连接,rabbitMQ + */ + @Bean(name = "upperConnectionFactory") + public ConnectionFactory upperConnectionFactory( + @Value("${upper.spring.rabbitmq.host}") String host, + @Value("${upper.spring.rabbitmq.port}") int port, + @Value("${upper.spring.rabbitmq.username}") String username, + @Value("${upper.spring.rabbitmq.password}") String password, + @Value("${upper.spring.rabbitmq.virtual-host}") String virtualHost) { + return connectionFactory(host, port, username, password, virtualHost); + } + /* + *定义本地连接,rabbitMQ + */ + @Bean(name = "localConnectionFactory") + @Primary + public ConnectionFactory localConnectionFactory( + @Value("${local.spring.rabbitmq.host}") String host, + @Value("${local.spring.rabbitmq.port}") int port, + @Value("${local.spring.rabbitmq.username}") String username, + @Value("${local.spring.rabbitmq.password}") String password, + @Value("${local.spring.rabbitmq.virtual-host}") String virtualHost) { + return connectionFactory(host, port, username, password, virtualHost); + } + + /* + *定义本地连接,rabbitMQ + */ +// @Bean(name = "localConnectionFactory") +// @Primary +// public ConnectionFactory localConnectionFactory( +// @Value("${listen.rabbitmq.local.host}") String host, +// @Value("${listen.rabbitmq.local.port}") int port, +// @Value("${listen.rabbitmq.local.username}") String username, +// @Value("${listen.rabbitmq.local.password}") String password, +// @Value("${listen.rabbitmq.local.virtual-host}") String virtualHost) { +// return connectionFactory(host, port, username, password, virtualHost); +// } + + public CachingConnectionFactory connectionFactory(String host, int port, String username, String password, String virtualHost) { + CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); + connectionFactory.setHost(host); + connectionFactory.setPort(port); + connectionFactory.setUsername(username); + connectionFactory.setPassword(password); + if (StrUtil.isNotBlank(virtualHost)) { + connectionFactory.setVirtualHost(virtualHost); + } + return connectionFactory; + } + + @Bean(name = "upperRabbitTemplate") + public RabbitTemplate upperRabbitTemplate(@Qualifier("upperConnectionFactory") ConnectionFactory connectionFactory) { + RabbitTemplate upperRabbitTemplate = new RabbitTemplate(connectionFactory); + return upperRabbitTemplate; + } + + @Bean(name = "localRabbitTemplate") + @Primary + public RabbitTemplate localRabbitTemplate(@Qualifier("localConnectionFactory") ConnectionFactory connectionFactory) { + RabbitTemplate localRabbitTemplate = new RabbitTemplate(connectionFactory); + return localRabbitTemplate; + } + + + @Bean(name = "upperFactory") + public SimpleRabbitListenerContainerFactory upperFactory( + SimpleRabbitListenerContainerFactoryConfigurer configurer, + @Qualifier("upperConnectionFactory") ConnectionFactory connectionFactory) { + SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); + configurer.configure(factory, connectionFactory); + return factory; + } + @Bean(name = "localFactory") + @Primary + public SimpleRabbitListenerContainerFactory localFactory( + SimpleRabbitListenerContainerFactoryConfigurer configurer, + @Qualifier("localConnectionFactory") ConnectionFactory connectionFactory) { + SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); + configurer.configure(factory, connectionFactory); + return factory; + } +} diff --git a/api-admin/src/main/java/com/glxp/sale/admin/rabbitmq/SyncListener.java b/api-admin/src/main/java/com/glxp/sale/admin/rabbitmq/SyncListener.java new file mode 100644 index 0000000..bf1b9c1 --- /dev/null +++ b/api-admin/src/main/java/com/glxp/sale/admin/rabbitmq/SyncListener.java @@ -0,0 +1,39 @@ +package com.glxp.sale.admin.rabbitmq; + +import com.glxp.sale.admin.config.rabbit.RabbitConfig; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.amqp.core.Message; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.messaging.handler.annotation.Payload; +import org.springframework.stereotype.Service; + +import javax.annotation.Resource; + +@Slf4j +@Service +@RequiredArgsConstructor +public class SyncListener { + + @Resource(name = "localRabbitTemplate") + private RabbitTemplate rabbitTemplate; + + @RabbitListener(queues = RabbitConfig.syncQueueName, containerFactory = "upperFactory", concurrency = "20-40") + public void process(Message message, @Payload String str) { + //获取路由key + String routingKey = message.getMessageProperties().getReceivedRoutingKey(); + System.out.println(routingKey); + System.out.println(str); + switch (routingKey) { + case RabbitConfig.syncInsertKey: + case RabbitConfig.syncUpdateKey: + case RabbitConfig.syncDeleteKey: + rabbitTemplate.convertSendAndReceive(RabbitConfig.topicExchangeName, routingKey, str); + default: + break; + } + } + +} diff --git a/api-admin/src/main/java/com/glxp/sale/admin/rabbitmq/UploadListener.java b/api-admin/src/main/java/com/glxp/sale/admin/rabbitmq/UploadListener.java new file mode 100644 index 0000000..5f89658 --- /dev/null +++ b/api-admin/src/main/java/com/glxp/sale/admin/rabbitmq/UploadListener.java @@ -0,0 +1,44 @@ +package com.glxp.sale.admin.rabbitmq; + +import com.glxp.sale.admin.config.rabbit.RabbitConfig; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.amqp.core.Message; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Primary; +import org.springframework.messaging.handler.annotation.Payload; +import org.springframework.stereotype.Service; + +import javax.annotation.Resource; + +@Slf4j +@Service +@RequiredArgsConstructor +public class UploadListener { + + @Resource(name = "upperRabbitTemplate") + private RabbitTemplate rabbitTemplate; + + @Value("${upper.spring.rabbitmq.exchange}") + private String upperExchange; + + @RabbitListener(queues = RabbitConfig.uploadQueueName,containerFactory = "localFactory", concurrency = "20-40") + public void process(Message message, @Payload String str) { + //获取路由key + String routingKey = message.getMessageProperties().getReceivedRoutingKey(); + System.out.println(routingKey); + System.out.println(str); + switch (routingKey) { + case RabbitConfig.uploadInsertKey: + case RabbitConfig.uploadUpdateKey: + case RabbitConfig.uploadDeleteKey: + rabbitTemplate.convertSendAndReceive(upperExchange, routingKey, str); + default: + break; + } + } + +} diff --git a/api-admin/src/main/resources/application-dev.properties b/api-admin/src/main/resources/application-dev.properties index 1f6055f..2a5666f 100644 --- a/api-admin/src/main/resources/application-dev.properties +++ b/api-admin/src/main/resources/application-dev.properties @@ -1,7 +1,7 @@ # \u751F\u4EA7\u73AF\u5883 -server.port=10001 +server.port=10002 spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver -spring.datasource.jdbc-url=jdbc:mysql://127.0.0.1:3306/udispsync?allowMultiQueries=true&serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=false&allowPublicKeyRetrieval=true +spring.datasource.jdbc-url=jdbc:mysql://127.0.0.1:3333/udispsync?allowMultiQueries=true&serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=false&allowPublicKeyRetrieval=true spring.datasource.username=root spring.datasource.password=123456 #spring.datasource.password=xiamenswan @@ -44,9 +44,9 @@ spring.redis.database=6 # Redis服务器地址 spring.redis.host=127.0.0.1 # Redis服务器连接端口 -spring.redis.port=6379 +spring.redis.port=6377 # Redis服务器连接密码(默认为空) -spring.redis.password= +spring.redis.password=123456 #连接池最大连接数(使用负值表示没有限制) spring.redis.jedis.pool.max-active=8 # 连接池最大阻塞等待时间(使用负值表示没有限制) @@ -57,3 +57,17 @@ spring.redis.jedis.pool.max-idle=8 spring.redis.jedis.pool.min-idle=0 # 连接超时时间(毫秒) spring.redis.jedis.timeout=300 + + +local.spring.rabbitmq.host=192.168.0.66 +local.spring.rabbitmq.port=5672 +local.spring.rabbitmq.virtual-host=sync/dev +local.spring.rabbitmq.username=glxp +local.spring.rabbitmq.password=glxp1234 + +upper.spring.rabbitmq.host=192.168.0.66 +upper.spring.rabbitmq.port=5672 +upper.spring.rabbitmq.virtual-host=dev +upper.spring.rabbitmq.username=glxp +upper.spring.rabbitmq.password=glxp1234 +upper.spring.rabbitmq.exchange=spms_topic_exchange