Compare commits

...

1 Commits

Author SHA1 Message Date
wj 92951209c7 rabbit 2 years ago

@ -307,6 +307,11 @@
<version>3.5.3</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
</dependencies>
<build>

@ -0,0 +1,103 @@
package com.glxp.sale.admin.config.rabbit;
import cn.hutool.core.util.StrUtil;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
@Configuration
public class RabbitConfig {
// private final static String exchangeName = "sync_direct_exchange";
public final static String topicExchangeName = "sync_topic_exchange";
/**
*
*/
public final static String syncQueueName = "sync";
public final static String uploadQueueName = "upload";
private final static String syncTopicKey = "sync.*";
public final static String syncInsertKey = "sync.insert";
public final static String syncUpdateKey = "sync.update";
public final static String syncDeleteKey = "sync.delete";
private final static String uploadTopicKey = "upload.*";
public final static String uploadInsertKey = "upload.insert";
public final static String uploadUpdateKey = "upload.update";
public final static String uploadDeleteKey = "upload.delete";
/**
* direct
*/
// @Bean
// DirectExchange udiExchange() {
// //设置消息持久化
// return new DirectExchange(exchangeName, true, false);
// }
/**
* topic
*/
@Bean
TopicExchange udiTopicExchange() {
//设置消息持久化
return new TopicExchange(topicExchangeName, true, false);
}
/**
*
*/
@Bean
Queue syncQueue() {
//设置消息持久化
return new Queue(syncQueueName, true);
}
@Bean
Queue uploadQueue() {
//设置消息持久化
return new Queue(uploadQueueName, true);
}
/**
* key
*/
@Bean
Binding bindingSync() {
return BindingBuilder.bind(syncQueue()).to(udiTopicExchange()).with(syncTopicKey);
}
/**
* key
*/
@Bean
Binding bindingUpload() {
return BindingBuilder.bind(uploadQueue()).to(udiTopicExchange()).with(uploadTopicKey);
}
// /**
// * 将根据路由key队列绑定交换机
// */
// @Bean
// Binding bindingSync() {
// return BindingBuilder.bind(syncQueue()).to(udiExchange()).with(syncKey);
// }
//
// @Bean
// Binding bindingUpload() {
// return BindingBuilder.bind(uploadQueue()).to(udiExchange()).with(uploadKey);
// }
}

@ -0,0 +1,103 @@
package com.glxp.sale.admin.config.rabbit;
import cn.hutool.core.util.StrUtil;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
@Configuration
public class RabbitConnectionConfig {
/*
*rabbitMQ
*/
@Bean(name = "upperConnectionFactory")
public ConnectionFactory upperConnectionFactory(
@Value("${upper.spring.rabbitmq.host}") String host,
@Value("${upper.spring.rabbitmq.port}") int port,
@Value("${upper.spring.rabbitmq.username}") String username,
@Value("${upper.spring.rabbitmq.password}") String password,
@Value("${upper.spring.rabbitmq.virtual-host}") String virtualHost) {
return connectionFactory(host, port, username, password, virtualHost);
}
/*
*rabbitMQ
*/
@Bean(name = "localConnectionFactory")
@Primary
public ConnectionFactory localConnectionFactory(
@Value("${local.spring.rabbitmq.host}") String host,
@Value("${local.spring.rabbitmq.port}") int port,
@Value("${local.spring.rabbitmq.username}") String username,
@Value("${local.spring.rabbitmq.password}") String password,
@Value("${local.spring.rabbitmq.virtual-host}") String virtualHost) {
return connectionFactory(host, port, username, password, virtualHost);
}
/*
*rabbitMQ
*/
// @Bean(name = "localConnectionFactory")
// @Primary
// public ConnectionFactory localConnectionFactory(
// @Value("${listen.rabbitmq.local.host}") String host,
// @Value("${listen.rabbitmq.local.port}") int port,
// @Value("${listen.rabbitmq.local.username}") String username,
// @Value("${listen.rabbitmq.local.password}") String password,
// @Value("${listen.rabbitmq.local.virtual-host}") String virtualHost) {
// return connectionFactory(host, port, username, password, virtualHost);
// }
public CachingConnectionFactory connectionFactory(String host, int port, String username, String password, String virtualHost) {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setHost(host);
connectionFactory.setPort(port);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
if (StrUtil.isNotBlank(virtualHost)) {
connectionFactory.setVirtualHost(virtualHost);
}
return connectionFactory;
}
@Bean(name = "upperRabbitTemplate")
public RabbitTemplate upperRabbitTemplate(@Qualifier("upperConnectionFactory") ConnectionFactory connectionFactory) {
RabbitTemplate upperRabbitTemplate = new RabbitTemplate(connectionFactory);
return upperRabbitTemplate;
}
@Bean(name = "localRabbitTemplate")
@Primary
public RabbitTemplate localRabbitTemplate(@Qualifier("localConnectionFactory") ConnectionFactory connectionFactory) {
RabbitTemplate localRabbitTemplate = new RabbitTemplate(connectionFactory);
return localRabbitTemplate;
}
@Bean(name = "upperFactory")
public SimpleRabbitListenerContainerFactory upperFactory(
SimpleRabbitListenerContainerFactoryConfigurer configurer,
@Qualifier("upperConnectionFactory") ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
configurer.configure(factory, connectionFactory);
return factory;
}
@Bean(name = "localFactory")
@Primary
public SimpleRabbitListenerContainerFactory localFactory(
SimpleRabbitListenerContainerFactoryConfigurer configurer,
@Qualifier("localConnectionFactory") ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
configurer.configure(factory, connectionFactory);
return factory;
}
}

@ -0,0 +1,39 @@
package com.glxp.sale.admin.rabbitmq;
import com.glxp.sale.admin.config.rabbit.RabbitConfig;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
@Slf4j
@Service
@RequiredArgsConstructor
public class SyncListener {
@Resource(name = "localRabbitTemplate")
private RabbitTemplate rabbitTemplate;
@RabbitListener(queues = RabbitConfig.syncQueueName, containerFactory = "upperFactory", concurrency = "20-40")
public void process(Message message, @Payload String str) {
//获取路由key
String routingKey = message.getMessageProperties().getReceivedRoutingKey();
System.out.println(routingKey);
System.out.println(str);
switch (routingKey) {
case RabbitConfig.syncInsertKey:
case RabbitConfig.syncUpdateKey:
case RabbitConfig.syncDeleteKey:
rabbitTemplate.convertSendAndReceive(RabbitConfig.topicExchangeName, routingKey, str);
default:
break;
}
}
}

@ -0,0 +1,44 @@
package com.glxp.sale.admin.rabbitmq;
import com.glxp.sale.admin.config.rabbit.RabbitConfig;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Primary;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
@Slf4j
@Service
@RequiredArgsConstructor
public class UploadListener {
@Resource(name = "upperRabbitTemplate")
private RabbitTemplate rabbitTemplate;
@Value("${upper.spring.rabbitmq.exchange}")
private String upperExchange;
@RabbitListener(queues = RabbitConfig.uploadQueueName,containerFactory = "localFactory", concurrency = "20-40")
public void process(Message message, @Payload String str) {
//获取路由key
String routingKey = message.getMessageProperties().getReceivedRoutingKey();
System.out.println(routingKey);
System.out.println(str);
switch (routingKey) {
case RabbitConfig.uploadInsertKey:
case RabbitConfig.uploadUpdateKey:
case RabbitConfig.uploadDeleteKey:
rabbitTemplate.convertSendAndReceive(upperExchange, routingKey, str);
default:
break;
}
}
}

@ -1,7 +1,7 @@
# \u751F\u4EA7\u73AF\u5883
server.port=10001
server.port=10002
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.jdbc-url=jdbc:mysql://127.0.0.1:3306/udispsync?allowMultiQueries=true&serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=false&allowPublicKeyRetrieval=true
spring.datasource.jdbc-url=jdbc:mysql://127.0.0.1:3333/udispsync?allowMultiQueries=true&serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=false&allowPublicKeyRetrieval=true
spring.datasource.username=root
spring.datasource.password=123456
#spring.datasource.password=xiamenswan
@ -44,9 +44,9 @@ spring.redis.database=6
# Redis服务器地址
spring.redis.host=127.0.0.1
# Redis服务器连接端口
spring.redis.port=6379
spring.redis.port=6377
# Redis服务器连接密码默认为空
spring.redis.password=
spring.redis.password=123456
#连接池最大连接数(使用负值表示没有限制)
spring.redis.jedis.pool.max-active=8
# 连接池最大阻塞等待时间(使用负值表示没有限制)
@ -57,3 +57,17 @@ spring.redis.jedis.pool.max-idle=8
spring.redis.jedis.pool.min-idle=0
# 连接超时时间(毫秒)
spring.redis.jedis.timeout=300
local.spring.rabbitmq.host=192.168.0.66
local.spring.rabbitmq.port=5672
local.spring.rabbitmq.virtual-host=sync/dev
local.spring.rabbitmq.username=glxp
local.spring.rabbitmq.password=glxp1234
upper.spring.rabbitmq.host=192.168.0.66
upper.spring.rabbitmq.port=5672
upper.spring.rabbitmq.virtual-host=dev
upper.spring.rabbitmq.username=glxp
upper.spring.rabbitmq.password=glxp1234
upper.spring.rabbitmq.exchange=spms_topic_exchange

Loading…
Cancel
Save