本网站集成rabbitMq同步myslq与ES数据
上文说到本网站集成了ElasticSearch,留下了一个数据一致性问题 因为作者想跟更加熟练Mq的使用,所以选择了以消息中间件异步通知的方式来更新数据
整合SpringAMQP
<!--AMQP依赖,包含RabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
添加配置信息:
spring:
rabbitmq:
host: 192.168.0.1 # 主机名
port: 5672 # 端口
virtual-host: / # 虚拟主机
username: guest # 用户名
password: guest # 密码
声明和绑定队列与交换机
package cn.llg.hotel.constants;
public class HotelMqConstants {
//交换机名称
public static final String EXCHANGE_NAME = "hotel.topic";
//新增和修改队列
public static final String INSERT_QUEUE_NAME = "hotel.insert.queue";
//删除队列
public static final String DELETE_QUEUE_NAME = "hotel.delete.queue";
//RoutingKey
public static final String INSERT_KEY = "hotel.insert";
public static final String DELETE_KEY = "hotel.delete";
}
可以基于注解,也可以基于Bean,这里是基于Bean
package cn.llg.hotel.config;
import cn.llg.hotel.constants.HotelMqConstants;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class MqConfig {
@Bean
public TopicExchange topicExchange(){
return new TopicExchange(HotelMqConstants.EXCHANGE_NAME,true,false);
}
@Bean
public Queue insertQueue(){
return new Queue(HotelMqConstants.INSERT_QUEUE_NAME,true);
}
@Bean
public Queue deleteQueue(){
return new Queue(HotelMqConstants.DELETE_QUEUE_NAME,true);
}
/**
* 绑定队列和交换机关系
*/
@Bean
public Binding insertQueueBinding(){
return BindingBuilder
.bind(insertQueue())
.to(topicExchange())
.with(HotelMqConstants.INSERT_KEY);
}
@Bean
public Binding deleteQueueBinding(){
return BindingBuilder
.bind(deleteQueue())
.to(topicExchange())
.with(HotelMqConstants.DELETE_KEY);
}
}
#基于注解
@RabbitListener(bindings = @QueueBinding(
value = @Queue("myQueue"), // 队列名称
exchange = @Exchange("myExchange"), // 交换机名称
key = "myRoutingKey")) // 路由键
发送消息MQ
注入RabbitTemplate的对象之后,这里就直接在controller中发送MQ消息了,convertAndSend()方法的三个参数:
交换机名称 routingKey 消息内容,这里消息体使用id
@Resource
private RabbitTemplate rabbitTemplate; #注入
rabbitTemplate.convertAndSend(Constants.EXCHANGE_NAME,Constants.INSERT_KEY,blog.getBlogId()); #新增与修改
rabbitTemplate.convertAndSend(Constants.EXCHANGE_NAME,Constants.DELETE_KEY,ids); # 删除文档
监听MQ消息
@Component
public class BlogListener {
@Resource
private BlogService blogService;
@RabbitListener(queues = Constants.INSERT_QUEUE_NAME)
private void insertUpdateDoc(Long id){
blogService.insertUpdateDoc(id); # 调用ElasticsearchRestTemplate的save方法,根据id查询数据,直接放入save
#使用 ElasticsearchRestTemplate 的 save 方法来保存或更新文档。如果传递的文档对象具有已存在的文档 ID,则会执行更新操作;如果文档对象没有 ID,或者指定的 ID 在 Elasticsearch 中不存在,则会执行新增操作
}
@RabbitListener(queues = Constants.DELETE_QUEUE_NAME)
private void deleteDoc(Integer[] ids){
blogService.deleteDoc(Arrays.asList(ids)); # 调用ElasticsearchRestTemplate的delete方法,传入id与文档的class类型
}
}
后续
既然我们使用了mq,也要考虑到mq消息的可靠性,这又是待完成功能。。。。