Zohar's blog

RabbitMQ 使用笔记

ArchitectureMessage Queuemessage queuerabbit mq

本文记录了 RabbitMQ 的背后协议 AMQP 和 RabbitMQ 的工作模式

1. AMQP

AMQP(Advanced Message Queuing Protocol),高级消息队列协议

1.1 是什么

是一个提供统一消息服务的应用层标准高级消息队列协议,是为消息中间件而设计的应用层协议的开放标准;遵循此应用协议的客户端与消息中间件之间能正常工作。

1.2 为什么

在 AMQP 诞生之前,对于异步消息处理技术,不同厂商的解决方案都是闭源的,各大厂商产品不相兼容且价格高昂。

2006 年,AMQP 规范发布,大部分公司开始遵循此标准进行开发,成为全行业广泛使用的消息中间件技术标准;

而消息中间件的目标,是让消息中间件的能力最终被网络本身所具有,使得 AMQP 仿佛 TCP 一般的存在;

1.2.1 AMQP 解决的问题

  1. 信息的发送者和接收者如何维持连接,如果连接中断数据丢失怎么办;

  2. 如何降低发送者和接受者的耦合度?

  3. 如何让优先级接收者最先接收到消息?

  4. 如何均衡接收者的负载?

  5. 如何过滤接收者不需要的消息?

  6. 如何保证接收者接收到完整正确的消息?

1.3 怎么做

AMQP 协议划分为两个部分:

  1. Functional Layer
    提供了一套确定的消息交换模型,即“高级消息交换协议”,模型定义了路由和存储消息等功能模块,以及定义了这些模块间交换消息的规则;

  2. Transport Layer:
    定义了网络数据传输协议,不同的客户端通过此协议和消息代理可以实现和模型的通信;基于二进制数据流传输,不同的客户端通过此协议和消息代理可以实现和模型的通信;

1.4 AMQP 模型

AMQP模型

  • Blocker
    消息队列服务器实体,即 MQ Server Application。
    为 Message 提供一条从 Producer 到确定的 Consumer 之间的路线,保证消息按照指定的方式准确传输

  • Virtual Host
    虚拟主机,缩写为 VHost,可视为一台概念上的 MQ 服务器。
    一台 Blocker 中有多个 VHost,即现实中的 MQ 服务器用作多个虚拟消息队列服务器,以供 Blocker 中的不同用户作权限分离

  • Client - Publisher
    提供者客户端,也称 Produce ,创建并将消息发送至 Blocker。
    Publisher 发送的消息中携带一个叫 routingKey 的信息,指示 Blocker 根据该 routingKey 将消息放入指定的消息队列;

  • Client - Consumer
    消费者客户端,消息的处理者,订阅 Blocker 中的消息队列,接收并处理消息。

  • Connection
    连接,Client 与 Server 之间建立的 TCP 连接。
    断开连接的操作只会在client端进行,Broker不会断开连接,除非出现网络故障或 broker 服务出现问题。

  • Channel
    通道,建立在 Connection 中的虚拟连接。
    通常是每个 Thread 创建单独的 Channel 进行通讯,以减轻服务器创建连接的开销;AMQP 提供了 Channel id 帮助客户端和服务器识别 Channels,所以 Channel 之间是相互隔离的。

  • Exchange
    消息交换机,消息中携带的分发规则(routingKey),将消息分发到不同的消息队列。
    消息交换机与多个消息队列通过 Binding 进行绑定,若无绑定的话,消息同样可以发送给交换机,不过消息将会直接丢失,因为交换机并没有存储的能力。

  • Queue
    消息队列,存储消息并推送消息。
    使消息根据指定的规则进行排序或过滤,再不断地将整理后的消息发送给订阅者

  • Binding
    绑定规则,每一个消息队列与消息交换机相绑定,每一条绑定规则对应一个 bindingKey。
    bindingKey 等绑定信息存储于消息队列中的查询表中,交换机根据该表选择分发消息到目标队列。

1.5 工作方式

  1. 客户端与主机建立连接 Connection

     ConnectionFactory factory = new ConnectionFactory();
     factory.setHost("127.0.0.1");
     factory.setProt("5672");
     factory.setVirtualHost("/vhost_guest");
     factory.setUsername("guest");
     factory.setPassword("guest");
     Connection connection = factory.newConnection();
    
  2. 根据不同业务建立不同通道 Channel

     Channel channel = connection.createChannel();
    
  3. 提供者客户端绑定交换机并发送消息

     channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes("UTF-8"));
    
  4. 消费者客户端监听队列

     channel.basicConsume(QUEUE_NAME, autoAck, consumer)
    
  5. Exchange 根据消息中的标识将消息路由至相应的队列

  6. 队列根据订阅它的客户端状态,选择将消息分发给消费者客户端

2. Rabbit MQ

RabbitMQ 是根据 AMQP 协议开发的,其工作原理与 AMQP 工作原理相同

官方教程:RabbitMQ tutorial

2.1 Queue 工作模式

Queue 工作模式负责处理的是:Queue 与 Consumer 之间的关系

2.1.1 Simple Queue

simple-queue

  • 介绍:简单模式,单个消费者订阅单个队列,现实中这种情况较为少见,消费者和队列都不需要进行特殊配置

  • 缺陷:生产者一一对应消费者,耦合性高;现实中,对于消息的处理业务是比较花时间的,而单一消费者消费队列中信息的话,效率将非常低

2.1.2 Work Queue

  • 介绍:工作模式,多个消费者订阅同一队列,此时消费者被称为 Worker。在工作模式中,队列对于消息的分发有两种方式

  • 模式:

    • Round-robin: work-queue 轮询模式,队列根据订阅它的消费者数量,均分消息,即不同消费者所处理的消息数量是一样的。 消费者工作效率不同,假设有 A、B 消费者,队列使用轮询的方式,不断询问A、B消费者是否空闲,消费者如果空闲,就将消息发送给消费者;如果 B 效率较高,不论 B 是否已经处理完属于它的所有消息,属于 A 的消息都不会分发给 B。

      Consumer consumer = new DefaultConsumer(channel) {
          @Override
          public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
              doWork(body);
          }
      };
      boolean autoAck = true; // 自动确认模式
      channel.basicConsume(QUEUE_NAME, autoAck, consumer);
      
    • fair-dispatch: fair-dispatch 公平分发,队列根据消费者处理能力进行消息分发,多劳多得。 消费者每处理完消息,便返回回执给队列,队列再将消息发送给消费者。

      int prefetchCount = 1;// 消费者同一时间接收并处理消息的数量
      channel.basicQos(prefetchCount);
      Consumer consumer = new DefaultConsumer(channel) {
          @Override
          public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
              doWork(body);
              channel.basicAck(envelope.getDeliveryTag(), false);// 返回回执
          }
      };
      boolean autoAck = false; // 手动确认模式
      channel.basicConsume(QUEUE_NAME, autoAck, consumer);
      

2.1.3 Duration

RabbitMQ 的消息默认保存在内存中,但在声明队列时,可配置队列消息持久化

boolean durable = true;
channel.queueDeclare(QUEUE_NAME, durable, false, false, null);

2.2 Exchange 工作模式

Exchange 工作模式负责处理的是:Queue 与 Exchange 之间的关系

2.2.1 fanout

fanout

广播模式,直译为“扇展”,将消息发送至所有与该交换机相绑定的队列。

String routingKey = "";
BasicProperties properties = "";
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, routingKey);
...
channel.basicPublish(EXCHANGE_NAME, routingKey, properties, message.getBytes());

2.2.2 direct

exchange-direct 直接模式,将消息发送至 bindingKey 与消息标识的 routingKey 相同的队列,比广播模式更具筛选效果,要求全字匹配。

  1. 类广播模式direct-exchange-multiple

    直接模式允许以相同的 bindingKey 绑定同一个交换机,此时交换机会像广播模式一样工作。

  2. 订阅模式direct-exchange-subscribe

    直接模式同样允许同一队列与交换机有多个绑定,并称之为“订阅模式”。

2.2.3 topic

exchange-topic

主题模式,比直接模式更为灵活,提供模糊匹配功能,规则提供了两个通配符:#*

  • #:匹配 0 至 n 个单词

  • *:必须匹配 1 个单词

规则是关键字与通配符中加上一个 “.” 进行连接,如:#.payment.*payment.#*.payment.#。可前置可后置,可同时使用。

如:携带 “payment.jojo” routingKey 信息的消息将路由至由 #.payment.*payment.# 所绑定的队列,“refound.payment.jojo” 将路由至 #.payment.**.payment.# 队列,而 “refound.payment” 将只路由至 *.payment.# 队列

2.2.4 headers

待补充…

2.3 消息确认机制、

2.3.1 消费者消息应答

Message Acknowledgement,RabbitMQ 支持消息应答,通过 autoAck 进行设置,默认为自动应答模式

boolean autoAck = false; // 消息确认模式
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
  • autoAck = true
    自动确认模式,一旦消息队列将消息分发给消费者,就会将消息从内存中删除。

  • autoAck = false 手动确认模式,消息队列将消息分发给消费者,消费者处理消息结束后,会发放回执给消息队列,此时消息队列再将消息删除;若发放后,该消费者与消息队列断开连接,则消息队列会将消息发放给其他队列。

2.3.2 生产者事务

消息应答能够保证消息发送到消费者客户端,而如何保证消息提供者的消息能准确到达消息队列,事务功能提供了部分实现:若消息发送至队列时出错,则进行回滚和记录。

  • 开启事务:channel.txSelect();

  • 提交事务:channel.txCommit();

  • 回滚事务:channel.txRollback();

try {
    channel.txSelect();
    channel.basicPublish(EXCHANGE_NAME, null, null, message.getBytes());
    // other codes...
    channel.txCommit();
} catch(Exception e) {
    channel.txRollback();
    // log sth...
}

2.3.3 confirm 模式

channel 可以设置为 confirm 模式,此时所有在该 channel 上发布的消息都会携带一个唯一 id,一旦消息投递到目标队列,broker 会向 Producer 发送一个确认应答。

若队列开启了持久化消息功能,则 broker 将在将消息持久化完成后再返回确认消息。

broker 回传的确认消息中的 deliver-tag 域包含了消息的 id,broker 可以设置 basic.ack 为 multiple,则表示,在此 id 前的所有消息都已经得到处理。

  • 普通模式: 每一条消息都需要经过确认,安全性较高,效率较低

      ...
      channel.queueDeclare(QUEUE_NAME, false, false, false, null);
      channel.confirmSelect();
      channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
      if(!channel.waitForConfirms()){
          // log fail...
      } else {
          // log success...
      }
    
  • 批量模式: 所有消息发送完再进行确认,效率较高,但是难以确认具体那一条消息出错

      ...
      channel.queueDeclare(QUEUE_NAME, false, false, false, null);
      channel.confirmSelect();
      for (String message : messages) {
          channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
      }
      if(!channel.waitForConfirms()){
          // log fail...
      } else {
          // log success...
      }
    
  • 异步模式: Channel 对象提供了 ConfirmListener() 回调方法,即监听器;监听器包含两个参数:deliveryTag(long)multiple(boolean),即序列号和批量标识。我们可以为未确认消息创建一个集合,采用实现了 SortedSet 的 SynchronizedSortedSet 数据结构,将消息的序列号按照发送顺序存储,方便实现批量删除。

      channel.confirmSelect();
      final SortedSet<Long> unConfirmMsg = Collections.synchronizedSortedSet(new TreeSet<Long>());
    
      channel.addConfirmListener(new ConfirmListener() {
          // 成功接收消息
          public void handleAck(long deliveryTag, boolean multiple) throws IOException {
              if (multiple) {
                  // log batch success..
                  unConfirmMsg.headSet(deliveryTag+1).clear();
              } else {
                  // log single success..
                  unConfirmMsg.remove(deliveryTag);
              }
          }
          // 消息接收异常,如果消息多次重传失败,标记为无效消息
          public void handleNack(long deliveryTag, boolean multiple) throws IOException {
              if (multiple) {
                  // log batch fail..
                  unConfirmMsg.headSet(deliveryTag+1).clear();
                  // resent the messages in 1s/3s/10s...
              } else {
                  // log single fail..
                  unConfirmMsg.remove(deliveryTag);
                  // resent the messages in 1s/3s/10s...
              }
          }
      });
    
      long deliveryTag;
      ...
      deliveryTag = channel.getNextPublishSeqNo();
      channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
      unConfirmMsg.add(deliveryTag);
    

2.3 RPC 实现

RPC 即 Remote Procedure Call,远程过程调用,其目的在于,提供者可以获取消费者消息的处理结果。

RPC

提供者在发送消息时,会为消息设置 “replyTo” (队列的名字)属性和 “correlationId”(标识号);同时提供者会订阅 “replyTo” 队列;消费者在处理完成消息后,将处理结果连同标识号发送至 “replyTo” 所指的队列,提供者就能获取消费者的处理结果。

// Producer
public String doServe(message) throws IOException, InterruptedException {
    final String corrId = UUID.randomUUID().toString();
    String responseQueueName = channel.queueDeclare().getQueue(); // "reply_to"
    BasicProperties requestProperties = new BasicProperties
                            .Builder()
                            .correlationId(corrId)
                            .replyTo(responseQueueName)
                            .build();

    channel.basicPublish("", requestQueueName, requestProperties, message.getBytes());
    // the result from the Consumer
    final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);
    String ctag = channel.basicConsume(replyQueueName, true, (consumerTag, delivery) -> {
        if (delivery.getProperties().getCorrelationId().equals(corrId)) {
            response.offer(new String(delivery.getBody(), "UTF-8"));
        }
    });
    String result = response.take();
    channel.basicCancel(ctag);
    return result;
}
// Consumer
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
    BasicProperties responseProperties = new BasicProperties
            .Builder()
            .correlationId(delivery.getProperties().getCorrelationId())
            .build();

    String responseQueueName = delivery.getProperties().getReplyTo();
    String response = "";

    try {
        String message = new String(delivery.getBody(), "UTF-8");
        response = consume(message); // handle the message
    } catch (RuntimeException e) {
        // log error...
    } finally {
        // sent the result to the Producer
        channel.basicPublish("", responseQueueName, responseProperties, response.getBytes());
        channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        // RabbitMq consumer worker thread notifies the RPC server owner thread
        synchronized (monitor) {
            monitor.notify();
        }
    }
};
channel.basicConsume(requestQueueName, false, deliverCallback);