本网站集成rabbitMq同步myslq与ES数据

上文说到本网站集成了ElasticSearch,留下了一个数据一致性问题 因为作者想跟更加熟练Mq的使用,所以选择了以消息中间件异步通知的方式来更新数据

整合SpringAMQP

<!--AMQP依赖,包含RabbitMQ-->
<dependency>
    <groupId>org.springframework.boot</groupId>    		
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

添加配置信息:

spring:
  rabbitmq:
      host: 192.168.0.1 # 主机名
      port: 5672 # 端口
      virtual-host: / # 虚拟主机 
      username: guest # 用户名
      password: guest # 密码

声明和绑定队列与交换机

package cn.llg.hotel.constants;

public class HotelMqConstants {
	//交换机名称
    public static final String EXCHANGE_NAME = "hotel.topic";
    //新增和修改队列
    public static final String INSERT_QUEUE_NAME = "hotel.insert.queue";
    //删除队列
    public static final String DELETE_QUEUE_NAME = "hotel.delete.queue";
    //RoutingKey
    public static final String INSERT_KEY = "hotel.insert";
    public static final String DELETE_KEY = "hotel.delete";
}

可以基于注解,也可以基于Bean,这里是基于Bean

package cn.llg.hotel.config;

import cn.llg.hotel.constants.HotelMqConstants;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MqConfig {

    @Bean
    public TopicExchange topicExchange(){
        return new TopicExchange(HotelMqConstants.EXCHANGE_NAME,true,false);
    }

    @Bean
    public Queue insertQueue(){
        return new Queue(HotelMqConstants.INSERT_QUEUE_NAME,true);
    }

    @Bean
    public Queue deleteQueue(){
        return new Queue(HotelMqConstants.DELETE_QUEUE_NAME,true);
    }

    /**
     * 绑定队列和交换机关系
     */
    @Bean
    public Binding insertQueueBinding(){
        return BindingBuilder
                .bind(insertQueue())
                .to(topicExchange())
                .with(HotelMqConstants.INSERT_KEY);
    }

    @Bean
    public Binding deleteQueueBinding(){
        return BindingBuilder
                .bind(deleteQueue())
                .to(topicExchange())
                .with(HotelMqConstants.DELETE_KEY);
    }
}
#基于注解
   @RabbitListener(bindings = @QueueBinding(
            value = @Queue("myQueue"), // 队列名称
            exchange = @Exchange("myExchange"), // 交换机名称
            key = "myRoutingKey")) // 路由键

发送消息MQ

注入RabbitTemplate的对象之后,这里就直接在controller中发送MQ消息了,convertAndSend()方法的三个参数:

交换机名称 routingKey 消息内容,这里消息体使用id

	@Resource
    private RabbitTemplate rabbitTemplate; #注入
	
rabbitTemplate.convertAndSend(Constants.EXCHANGE_NAME,Constants.INSERT_KEY,blog.getBlogId()); #新增与修改
rabbitTemplate.convertAndSend(Constants.EXCHANGE_NAME,Constants.DELETE_KEY,ids); # 删除文档

监听MQ消息

@Component
public class BlogListener {
    @Resource
    private BlogService blogService;

    @RabbitListener(queues = Constants.INSERT_QUEUE_NAME)
    private void insertUpdateDoc(Long id){
        blogService.insertUpdateDoc(id); # 调用ElasticsearchRestTemplate的save方法,根据id查询数据,直接放入save
		#使用 ElasticsearchRestTemplate 的 save 方法来保存或更新文档。如果传递的文档对象具有已存在的文档 ID,则会执行更新操作;如果文档对象没有 ID,或者指定的 ID 在 Elasticsearch 中不存在,则会执行新增操作
    }

    @RabbitListener(queues = Constants.DELETE_QUEUE_NAME)
    private void deleteDoc(Integer[] ids){
        blogService.deleteDoc(Arrays.asList(ids)); # 调用ElasticsearchRestTemplate的delete方法,传入id与文档的class类型
    }
}

后续

既然我们使用了mq,也要考虑到mq消息的可靠性,这又是待完成功能。。。。