RabbitMQ学习笔记

简介

RabbitMQ是一个开源的消息代理和队列服务器,用来通过普通协议在完全不同的应用之前共享数据,RabbitMQ是使用Erlang语言来编写的,并且RabbitMQ是基于AMQP协议的。

特点

  • 开源、性能优秀、稳定保障
  • 提供可靠性信息投递模式(confirm)、返回模式(return)
  • API丰富
  • 集群模式丰富,表达式配置,HA模式,镜像队列模式
  • 保证数据不丢失的前提下做到高可靠性、可用性

AMQP高级消息队列协议

  • AMQP全称:Advanced Message Queuing Protocol
  • AMQP定义:是具有现代特征的二进制协议。是一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开发标准,为面向消息的中间件设计。

AMQP协议模型

AMQP核心概念

  • Server:又称为Broker,接受客户端的连接,实现AMQP实体服务
  • Connection:连接,应用程序已Broker之间的网络连接
  • Channel:网络信道,几乎所有的操作都在Channel中进行,Channel是进行信息读写的通道。客户端可以建立多个Channel,每个Channel代表一个会话任务。
  • Message:消息,服务器和应用程序之间传递的数据,由Properties和Body组成。Properties可以对消息进行修饰,比如消息的优先级、延迟等高级特性;Body就是消息内容实体。
  • Virtaul host:虚拟主机,用于进行逻辑隔离,最上层的消息路由。一个Virtual host里面可以有若干个Exchange和Queue,同一个Vitual host里面不能有相同名称的Exchange或Queue。
  • Exchange:交换机,接收消息,根据路由键转发消息到绑定的队列
  • Binding:Exchange和Queue直接的虚拟连接,Binding中可以包含routing key
  • Routing key:一个路由规则,虚拟机可用它来确定如何路由一个特定消息
  • Queue:也称为Message Queue,消息队列,保存消息并将它们转发给消费者

RabbitMQ整体架构

RabbitMQ消息流转过程

Message会携带 exchange信息和routing key,根据exchange信息确定消息传递到哪个exchange,再有exchange根据routing key路由到指定的Message Queue。

RabbitMQ基础操作

RabbitMQ共有三种主要的操作命令语句:rabbitmqctl、rabbitmq-server、rabbitmq-plugins。其中rabbitmqctl命令是最丰富的。

常用命令如下:

  • 启动应用:rabbitmqctl start_app
  • 关闭应用:rabbitmqctl stop_app
  • 节点状态:rabbitmqctl status

用户相关的命令:

  • 添加用户:rabbitmqctl add_user username password
  • 列出所有用户:rabbitmqctl list_users
  • 删除用户:rabbitmqctl delete_user username
  • 清除用户权限:rabbitmqctl clear_permission -p vhostpath username
  • 修改用户密码:rabbitmqctl change_password username newpassword
  • 列出用户权限:rabbitmqctl list_user_permissions username

案例,新增root用户,并设置root用户的权限和tags:

虚拟主机相关的命令:

  • 创建虚拟主机:rabbitmqctl add_vhost vhostpath
  • 列出所有虚拟主机:rabbitmqctl list_vhosts
  • 列出虚拟主机上的所有权限:rabbitmqctl list_permissions -p vhostpath
  • 删除虚拟主机:rabbitmqctl delete_vhost vhostpath

队列相关的命令:

  • 查看所有队列信息:rabbitmqctl list_queues
  • 清除队列里的消息:rabbitmqctl -p vhostpath purge_queue blue

高级命令:

  • 移除所有数据,要在rabbitmqctl stop_app之后使用:rabbitmqctl reset
  • 组成集群命令:rabbitmqctl join_cluster [–ram | –disc]
  • 查看集群状态:rabbitmqctl cluster_status
  • 修改集群节点的存储形式:rabbitmqctl change_cluster_node_type ram | disc
  • 摘除节点:rabbitmqctl forget_cluster_node [–offline]
  • 修改节点名称:rabbitmqctl rename_cluster_node oldnode1 newnode1 [oldnode2] [newnode2] …

消息生产与消费

使用Java代码构建消息的生产者与消费者,主要步骤如下:

  1. 获取连接工厂:ConnectionFactory
  2. 获取一个连接:Connection
  3. 数据通信信道,可以发送个接收消息:Channel
  4. 具体的消息存储队列:Queue
  5. 生产者和消费者:Productor & Consumer
  • 生产者端代码:

    package com.rabbitmq.quickstart;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    public class Procuder {
        public static void main(String[] args) throws Exception{
            //1. 创建一个链接工厂,并进行配置
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("127.0.0.1");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("/");
            //2. 通过连接工厂创建连接
            Connection connection = connectionFactory.newConnection();
            //3. 通过connection创建一个channel
            Channel channel = connection.createChannel();
            //4. 通过channel发送数据
            String msg = "Hello World";
            channel.basicPublish("","test001",null,msg.getBytes());
            System.out.println("生产端消息:" + msg);
            //5. 关闭连接
            channel.close();
            connection.close();
        }
    }
  • 消费者端代码:

    package com.rabbitmq.quickstart;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.QueueingConsumer;
    import com.rabbitmq.client.QueueingConsumer.Delivery;
    
    @SuppressWarnings("ALL")
    public class Consumer {
        public static void main(String[] args) throws Exception{
            //1. 创建一个链接工厂,并进行配置
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("localhost");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("/");
            //2. 通过连接工厂创建连接
            Connection connection = connectionFactory.newConnection();
            //3. 通过connection创建一个channel
            Channel channel = connection.createChannel();
            //4. 声明(创建)一个队列
            String queueName = "test001";
            channel.queueDeclare(queueName,true,false,false,null);
            //5. 创建消费者
            QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
    
            //6. 设置channel
            channel.basicConsume(queueName, true, queueingConsumer);
            //7. 获取消息
            while(true){
                try{
                    Delivery delivery = queueingConsumer.nextDelivery();
                    String msg = new String(delivery.getBody());
                    System.err.println("消费端消息:" + msg);
                }catch (Exception e){
                    e.printStackTrace();
                }
            }
        }
    }
  • 先运行消费者端代码,开始监听消息:

  • 然后运行生产者端代码,发布消息:

  • 最后查看消费端是否收到生产端发送的消息:

Exchange交换机

  • Exhange:接收消息,并根据路由键转发消息到队列。

  • 交换机的属性

    • Name:交换机名称
    • Type:交换机类型 direct/topic/fanout/headers
    • Durability:是否需要持久化,true为持久化
    • Auto Delete:当最后一个绑定到exchange上的队列删除后,自动删除该exchange
    • Internal:当前exchange是否用于RabbitMQ内部使用,默认为false
    • Argument:扩展参数,用户扩展AMQP协议自制定化使用
  • Direct Exchange

    • 所有发送到Direct Exchange的消息被转发到Routing Key中指定的Queue

      注:Direct模式可以使用RabbitMQ自带的 default exchange,default exchange不需要将exchange进行任何绑定(binding)操作,消息传递时,Routing Key必须完全匹配才会被队列接收,否则该消息会被抛弃。

  • Topic Exchange

    • 所有发送到topic exchange的消息被转发到所有关心routing key中指定的topic的queue上。

    • exchange将routing key 和某topic进行模糊匹配,此时队列需要绑定一个topic。

      注:可以使用通配符进行模糊匹配

      符号‘#’匹配一个或多个词

      符号‘*’匹配一个词

      例如:log.# 能够匹配到 log.info.la

      log.* 只能匹配到 log.err

  • Fanout Exchange

    • 不处理路由键,只需要简单的将队列绑定到交换机上

    • 发送到交换机的消息都会被转发到与该交换机绑定的所有队列上

    • fanout交换机转发消息是最快的

      注:不使用routing key。

其他概念详解

  • Binding绑定

    • Exchange和Exchange,Exchange和Queue直接的连接关系
    • Binding中可以包含routing key路由规则
  • Queue消息队列

    • 消息队列,实际存储消息数据
    • Durability:是否持久化,Durable为是,Transient为否
    • Auto Delete:如选yes,代表当最后一个监听被移除之后,该Queue会自动被删除
  • Message消息

    • 服务器和应用程序之间传递的数据
    • 本质上就是一段数据,由Properties和Payload(Body)组成
    • 常用属性:delivery mode、headers(自定义属性)
    • 其他属性:content_type、content_encoding、priority、correlation_id、reply_to、expiration、message_id、timestamp、type、user_id、app_id…
  • Virtual Host虚机主机

    • 虚机地址,用于进行逻辑隔离,最上层的消息路由

    • 一个Virtual Host里面可以有若干个exchange和queue

    • 同一个virtual host里面不能有相同名称的exchange和queue

RabbitMQ高级属性

  • 消息如何保障100%的投递成功

    1. 什么是生产端的可靠性投递?

      • 保障消息的成功发出
      • 保障MQ节点的成功接收
      • 发送端收到MQ节点(Broker)确认应答
      • 完善的消息补偿机制
    2. 互联网大厂的解决方案

      • 消息落库,对消息状态进行打标
      • 消息的延迟投递,做二次确认,回调检查
    3. 生产端可靠性投递方案–消息落库

      step1: 消息投递之前先存数据库,BIZ DB是指业务数据库,MSG DB是指消息数据库,专门存储message

      step2: 进行消息投递,将消息发送的MQ Broker

      step3: 消息确认,broker发送确认消息给生产端,表示消息接收成功

      step4: 将MSG DB中message的status字段更新为status:1,表示该消息已经发送成功

      如果在执行step3的时候由于网络等原因导致确认消息发送失败,那么生产端无法收到确认消息,message的status会一直未0,这种情况就需要依靠其他任务来辅助,比如图中的分布式定时任务。

      step5: 分布式定时任务,定时无获取MSG DB中的messge的status字段信息,如果发现status为0,将会执行消息重传

      step6: 重传消息

      step7:定时任务中会设置消息重传的次数,如图所示,若重传次数大于3,会将message的status设置为2,表示该消息发送失败

    4. 生产端可靠性投递–消息延迟投递

      • 消息落库的方式在高并发场景下不适用

      • 消息延迟投递,做二次确认,回调检查

  • 幂等性概念

    1. 幂等性是什么

      对一个结果执行一次或多次操作后的结果都相同的,唯一的,这就是幂等性。

      借鉴数据库的乐观锁机制:

      比如执行一条更新库存的SQL语句:

      UPDATE T_REPS SET COUNT = COUNT + 1, VERSION = VERSION + 1 WHERE VERSION = 1;

    2. 消费端-幂等性保障

      在海量订单产生的业务高峰期,如何避免消息的重复消费问题?

      • 消费端实现幂等性,就意味着消息永远不会被消费多次,即使收到了多条一样的消息
      • 业界主流的幂等性操作(尤其是互联网金融领域):
        • 唯一ID + 指纹码机制,利用数据库主键去重
        • 利用redis的原子性实现
    3. 消费端幂等性方案–唯一ID + 指纹码

      • 唯一ID + 指纹码 机制,利用数据库主键去重
      • SELECT COUNT(1) FROM T_ORDER WHERE ID = 唯一ID + 指纹码
      • 好处:实现简单
      • 坏处:高并发下有数据库写入的性能瓶颈
      • 解决方案:根据ID进行分库分表,进行算法路由(哈希算法)
    4. 消费端幂等性方案–redis原子性

      • 使用redis进行幂等,需要考虑的问题
        • 第一:是否要进行数据落库,如果落库的话,关键解决的问题是数据库和缓存如何做到原子性?
        • 第二:如果不进行落库,那么都存储到缓存中,如何设置定时同步的策略?
  • Confirm确认消息

    1. 理解Confirm消息确认机制:

      • 消息的确认,是指生产者投递消息之后,如果broker收到消息,则会给生产者一个应答

      • 生产者进行接收应答,用来确定这条消息是否成功的发送到broker,这种方式也是消息的可靠性投递的核心保障

    2. 如何实现Confirm确认消息

      • 第一步:在channel上开启确认模式:channel.confirmSelect()
      • 第二步:在channel上添加监听:addConfirmListener, 监听成功和失败的返回结果,根据具体的结果对消息进行重新发送、或记录日志等后续处理
  • Return消息机制

    Return Listener用于处理一些不可路由的消息!

    • 消息的生产者,通过指定一个exchange和routing key,把消息发送到某一个队列中,然后消费者监听队列,进行消费处理操作

    • 但是在某些情况下,如果在发送消息的时候,当前的exchange不存在或者指定的routing key路由不到,这个时候如果需要监听这种不可达的消息,就要使用return listener。

    • 在基础API中有一个关机的配置项:

      • Mandatory(默认为false, 使用return消息机制时需设置为true),如果为true,则监听器会接收到路由不可达的消息,然后进行后续处理;如果为false,那么broker会自动删除该消息。
    • return机制流程机制:

  • 消费端自定义监听

    • 一般消费端监听就是在代码中编写while循环,进行consumer.nextDelivery方法进行获取下一条消息,然后进行消费处理。
    • 在实际工作中最常用的方式是使用自定义的Consumer,解耦性也更强,通过继承DefaultConsumer类实现
  • 消费端限流

    • 什么是消费端的限流?

      • 假设一个场景,首先,rabbitmq服务器上有上万条未处理的消息,随便打开一个消费者客户端,会出现以下情况:巨量的消息瞬间全部推送过滤,但是单个客户端无法同时处理这个多数据。
    • 消费端限流的解释

      • rabbitmq提供了一种qos(服务质量保证)功能,即在非自动确认消息的前提下,如果一定数目的消息(通过基于consumer或者channel设置qos的值)未被确认前,不消费新的消息。

        非自动确认即手工ACK

  • 消费端ACK和重回队列

    消费端的手工ACK包括:ACK(接收成功)和NACK(接收失败)

    • 消费端进行消费的时候,如果由于业务异常可以进行日志的记录,然后进行补偿

    • 如果由于服务器宕机等严重问题,那就需要手工进行ACK保障消费端消费成功

      消费端的重回队列

    • 消费端重回队列是为了针对没有处理成功的消息,把消息重新传递给broker

    • 一般实际的应用中都会关闭重回队列,也就是设置为false

  • TTL队列/消息

    • TTL是Time To Live的缩写,也就是生存时间
    • rabbitmq支持消息的过期时间,在消息发送时进行制定
    • rabbitmq支持队列的过期时间,从消息入队列开始计算,只要超过了队列的超时时间,那么消息会被自动清除
  • 死信队列

    • 死信队列:DLX, Dead-Letter-Exchange

    • 利用DLX, 当消息在一个队列中变成死信(dead message)之后,它能被重新publish到另一个exchange,这个exchange就是DLX。

    • 消息变成死信有以下几种情况:

      • 消息被拒绝(basic.reject/basic.nack)并且requeue=false
      • 消息TTL过期
      • 队列达到最大长度
    • 死信队列

      • DLX也是一个正常的exchange,和一般的exchange没有区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性
      • 当这个队列中有死信时,rabbitmq就会自动的将这个消息重新发布到设置的exchange上去进而被路由到另一个队列
      • 可以监听这个队列中的消息并做相应的处理,这个特性可以弥补rabbitmq3.0以前支持的immediate参数的功能
    • 死信队列设置的步骤:

      • 首先需要设置死信队列的exchange和queue,然后进行绑定

        • Exchange: dlx.exchange
        • Queue: dlx.queue
        • RoutingKey: #(任何路由)
      • 然后在进行正常声明交换机、队列、绑定,只不过在队列上需要加上一个参数即可:arguments.put(“x-dead-letter-exchange”, “dlx.exchange”);

      • 这样在消息过期、requeue、队列达到最大长度时,消息就可以直接路由到死信队列中去

RabbitMQ集群

  • RabbitMQ集群架构模式–主备模式

    • 主备模式:实现rabbitmq的高可用集群,一般在并发和数据量不高的情况下,这种模型非常好用且简单。主备模式也称之为Warren模式

    • RabbitMQ主备模式架构:

    • HaProxy配置说明:

      listen rabbitmq_cluster
      bind 0.0.0.0:5672   
      mode tcp       # 配置tcp模式
      balance roundrobin   # 简单的轮询
      server server10 192.168.1.10:5672 check inter 5000 rise 2 fall 2  # 主节点
      server server11 192.168.1.11:5672 backup check inter 5000 rise 2 fall 2 # 备节点

      注:inter 5000 表示每个5秒钟对mq集群做一次健康检查

      rise 2 表示2次正确代表服务可用

      fall 2 表示2次失败代表服务不可用

  • RabbitMQ集群架构模式–远程模式(使用较少)

    • 远程模式:远程模式是可以实现双活的一种模式,简称为Shovel模式,所谓Shovel就是可以把消息进行不同数据中心的复制工作,可以跨地域让两个mq集群互联
    • 通过使用Shovel插件实现
  • RabbitMQ集群架构模式–镜像模式

    • 镜像模式:这是一种非常经典的mq集群模式–Mirror镜像模式,可以保证100%数据不丢失,在实际工作中也是使用最多的。并且实现该集群模式的非常简单,一般互联网大厂都会构建这种集群模式。

    • 镜像队列:目的是为了保证rabbitmq数据的高可靠性,主要实现数据的同步,一般2-3个节点实现数据同步。

    • 镜像模式架构:

  • RabbitMQ集群架构模式–多活模式

    • 多活模式:这种模式也是实现异地数据复制的主流模式,因为Shovel模式配置比较复杂,所以一般实现异地集群都是使用这种双活或多活模型来实现。这种模式需要依赖rabbitmq的federation插件,可以实现持续可靠的AMQP数据通信,多活模式的实际配置与应用也非常简单。

    • Federation插件是一个不需要构建cluster,而在broker直接传输消息的高性能插件,federation插件可以在brokers或者cluster直接传输消息,连接的双方可以使用不同的users和virtual hosts,双方也可以使用版本不同的rabbitmq和Erlang。federation插件使用AMQP协议,可以接收不连续的传输。

    • 多活模式架构: