1. AMQP
AMQP(Advanced Message Queuing Protocol),高级消息队列协议
1.1 是什么
是一个提供统一消息服务的应用层标准高级消息队列协议,是为消息中间件而设计的应用层协议的开放标准;遵循此应用协议的客户端与消息中间件之间能正常工作。
1.2 为什么
在 AMQP 诞生之前,对于异步消息处理技术,不同厂商的解决方案都是闭源的,各大厂商产品不相兼容且价格高昂。
2006 年,AMQP 规范发布,大部分公司开始遵循此标准进行开发,成为全行业广泛使用的消息中间件技术标准;
而消息中间件的目标,是让消息中间件的能力最终被网络本身所具有,使得 AMQP 仿佛 TCP 一般的存在;
1.2.1 AMQP 解决的问题
-
信息的发送者和接收者如何维持连接,如果连接中断数据丢失怎么办;
-
如何降低发送者和接受者的耦合度?
-
如何让优先级接收者最先接收到消息?
-
如何均衡接收者的负载?
-
如何过滤接收者不需要的消息?
-
如何保证接收者接收到完整正确的消息?
1.3 怎么做
AMQP 协议划分为两个部分:
-
Functional Layer:
提供了一套确定的消息交换模型,即“高级消息交换协议”,模型定义了路由和存储消息等功能模块,以及定义了这些模块间交换消息的规则; -
Transport Layer:
定义了网络数据传输协议,不同的客户端通过此协议和消息代理可以实现和模型的通信;基于二进制数据流传输,不同的客户端通过此协议和消息代理可以实现和模型的通信;
1.4 AMQP 模型
-
Blocker:
消息队列服务器实体,即 MQ Server Application。
为 Message 提供一条从 Producer 到确定的 Consumer 之间的路线,保证消息按照指定的方式准确传输 -
Virtual Host:
虚拟主机,缩写为 VHost,可视为一台概念上的 MQ 服务器。
一台 Blocker 中有多个 VHost,即现实中的 MQ 服务器用作多个虚拟消息队列服务器,以供 Blocker 中的不同用户作权限分离 -
Client - Publisher:
提供者客户端,也称 Produce ,创建并将消息发送至 Blocker。
Publisher 发送的消息中携带一个叫 routingKey 的信息,指示 Blocker 根据该 routingKey 将消息放入指定的消息队列; -
Client - Consumer:
消费者客户端,消息的处理者,订阅 Blocker 中的消息队列,接收并处理消息。 -
Connection:
连接,Client 与 Server 之间建立的 TCP 连接。
断开连接的操作只会在client端进行,Broker不会断开连接,除非出现网络故障或 broker 服务出现问题。 -
Channel:
通道,建立在 Connection 中的虚拟连接。
通常是每个 Thread 创建单独的 Channel 进行通讯,以减轻服务器创建连接的开销;AMQP 提供了 Channel id 帮助客户端和服务器识别 Channels,所以 Channel 之间是相互隔离的。 -
Exchange:
消息交换机,消息中携带的分发规则(routingKey),将消息分发到不同的消息队列。
消息交换机与多个消息队列通过 Binding 进行绑定,若无绑定的话,消息同样可以发送给交换机,不过消息将会直接丢失,因为交换机并没有存储的能力。 -
Queue:
消息队列,存储消息并推送消息。
使消息根据指定的规则进行排序或过滤,再不断地将整理后的消息发送给订阅者 -
Binding:
绑定规则,每一个消息队列与消息交换机相绑定,每一条绑定规则对应一个 bindingKey。
bindingKey 等绑定信息存储于消息队列中的查询表中,交换机根据该表选择分发消息到目标队列。
1.5 工作方式
-
客户端与主机建立连接 Connection
ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setProt("5672"); factory.setVirtualHost("/vhost_guest"); factory.setUsername("guest"); factory.setPassword("guest"); Connection connection = factory.newConnection();
-
根据不同业务建立不同通道 Channel
Channel channel = connection.createChannel();
-
提供者客户端绑定交换机并发送消息
channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes("UTF-8"));
-
消费者客户端监听队列
channel.basicConsume(QUEUE_NAME, autoAck, consumer)
-
Exchange 根据消息中的标识将消息路由至相应的队列
-
队列根据订阅它的客户端状态,选择将消息分发给消费者客户端
2. Rabbit MQ
RabbitMQ 是根据 AMQP 协议开发的,其工作原理与 AMQP 工作原理相同
官方教程:RabbitMQ tutorial
2.1 Queue 工作模式
Queue 工作模式负责处理的是:Queue 与 Consumer 之间的关系
2.1.1 Simple Queue
-
介绍:简单模式,单个消费者订阅单个队列,现实中这种情况较为少见,消费者和队列都不需要进行特殊配置
-
缺陷:生产者一一对应消费者,耦合性高;现实中,对于消息的处理业务是比较花时间的,而单一消费者消费队列中信息的话,效率将非常低
2.1.2 Work Queue
-
介绍:工作模式,多个消费者订阅同一队列,此时消费者被称为 Worker。在工作模式中,队列对于消息的分发有两种方式
-
模式:
-
Round-robin:
轮询模式,队列根据订阅它的消费者数量,均分消息,即不同消费者所处理的消息数量是一样的。 消费者工作效率不同,假设有 A、B 消费者,队列使用轮询的方式,不断询问A、B消费者是否空闲,消费者如果空闲,就将消息发送给消费者;如果 B 效率较高,不论 B 是否已经处理完属于它的所有消息,属于 A 的消息都不会分发给 B。
Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { doWork(body); } }; boolean autoAck = true; // 自动确认模式 channel.basicConsume(QUEUE_NAME, autoAck, consumer);
-
fair-dispatch:
公平分发,队列根据消费者处理能力进行消息分发,多劳多得。 消费者每处理完消息,便返回回执给队列,队列再将消息发送给消费者。
int prefetchCount = 1;// 消费者同一时间接收并处理消息的数量 channel.basicQos(prefetchCount); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { doWork(body); channel.basicAck(envelope.getDeliveryTag(), false);// 返回回执 } }; boolean autoAck = false; // 手动确认模式 channel.basicConsume(QUEUE_NAME, autoAck, consumer);
-
2.1.3 Duration
RabbitMQ 的消息默认保存在内存中,但在声明队列时,可配置队列消息持久化
boolean durable = true;
channel.queueDeclare(QUEUE_NAME, durable, false, false, null);
2.2 Exchange 工作模式
Exchange 工作模式负责处理的是:Queue 与 Exchange 之间的关系
2.2.1 fanout
广播模式,直译为“扇展”,将消息发送至所有与该交换机相绑定的队列。
String routingKey = "";
BasicProperties properties = "";
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, routingKey);
...
channel.basicPublish(EXCHANGE_NAME, routingKey, properties, message.getBytes());
2.2.2 direct
直接模式,将消息发送至 bindingKey 与消息标识的 routingKey 相同的队列,比广播模式更具筛选效果,要求全字匹配。
-
类广播模式:
直接模式允许以相同的 bindingKey 绑定同一个交换机,此时交换机会像广播模式一样工作。
-
订阅模式:
直接模式同样允许同一队列与交换机有多个绑定,并称之为“订阅模式”。
2.2.3 topic
主题模式,比直接模式更为灵活,提供模糊匹配功能,规则提供了两个通配符:#
和 *
-
#
:匹配 0 至 n 个单词 -
*
:必须匹配 1 个单词
规则是关键字与通配符中加上一个 “.” 进行连接,如:#.payment.*
、payment.#
、*.payment.#
。可前置可后置,可同时使用。
如:携带 “payment.jojo” routingKey 信息的消息将路由至由 #.payment.*
和 payment.#
所绑定的队列,“refound.payment.jojo” 将路由至 #.payment.*
和 *.payment.#
队列,而 “refound.payment” 将只路由至 *.payment.#
队列
2.2.4 headers
待补充…
2.3 消息确认机制、
2.3.1 消费者消息应答
Message Acknowledgement,RabbitMQ 支持消息应答,通过 autoAck 进行设置,默认为自动应答模式
boolean autoAck = false; // 消息确认模式
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
-
autoAck = true
自动确认模式,一旦消息队列将消息分发给消费者,就会将消息从内存中删除。 -
autoAck = false 手动确认模式,消息队列将消息分发给消费者,消费者处理消息结束后,会发放回执给消息队列,此时消息队列再将消息删除;若发放后,该消费者与消息队列断开连接,则消息队列会将消息发放给其他队列。
2.3.2 生产者事务
消息应答能够保证消息发送到消费者客户端,而如何保证消息提供者的消息能准确到达消息队列,事务功能提供了部分实现:若消息发送至队列时出错,则进行回滚和记录。
-
开启事务:
channel.txSelect();
-
提交事务:
channel.txCommit();
-
回滚事务:
channel.txRollback();
try {
channel.txSelect();
channel.basicPublish(EXCHANGE_NAME, null, null, message.getBytes());
// other codes...
channel.txCommit();
} catch(Exception e) {
channel.txRollback();
// log sth...
}
2.3.3 confirm 模式
channel 可以设置为 confirm 模式,此时所有在该 channel 上发布的消息都会携带一个唯一 id,一旦消息投递到目标队列,broker 会向 Producer 发送一个确认应答。
若队列开启了持久化消息功能,则 broker 将在将消息持久化完成后再返回确认消息。
broker 回传的确认消息中的 deliver-tag 域包含了消息的 id,broker 可以设置 basic.ack 为 multiple,则表示,在此 id 前的所有消息都已经得到处理。
-
普通模式: 每一条消息都需要经过确认,安全性较高,效率较低
... channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.confirmSelect(); channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); if(!channel.waitForConfirms()){ // log fail... } else { // log success... }
-
批量模式: 所有消息发送完再进行确认,效率较高,但是难以确认具体那一条消息出错
... channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.confirmSelect(); for (String message : messages) { channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); } if(!channel.waitForConfirms()){ // log fail... } else { // log success... }
-
异步模式: Channel 对象提供了 ConfirmListener() 回调方法,即监听器;监听器包含两个参数:
deliveryTag(long)
和multiple(boolean)
,即序列号和批量标识。我们可以为未确认消息创建一个集合,采用实现了 SortedSet 的 SynchronizedSortedSet 数据结构,将消息的序列号按照发送顺序存储,方便实现批量删除。channel.confirmSelect(); final SortedSet<Long> unConfirmMsg = Collections.synchronizedSortedSet(new TreeSet<Long>()); channel.addConfirmListener(new ConfirmListener() { // 成功接收消息 public void handleAck(long deliveryTag, boolean multiple) throws IOException { if (multiple) { // log batch success.. unConfirmMsg.headSet(deliveryTag+1).clear(); } else { // log single success.. unConfirmMsg.remove(deliveryTag); } } // 消息接收异常,如果消息多次重传失败,标记为无效消息 public void handleNack(long deliveryTag, boolean multiple) throws IOException { if (multiple) { // log batch fail.. unConfirmMsg.headSet(deliveryTag+1).clear(); // resent the messages in 1s/3s/10s... } else { // log single fail.. unConfirmMsg.remove(deliveryTag); // resent the messages in 1s/3s/10s... } } }); long deliveryTag; ... deliveryTag = channel.getNextPublishSeqNo(); channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); unConfirmMsg.add(deliveryTag);
2.3 RPC 实现
RPC 即 Remote Procedure Call,远程过程调用,其目的在于,提供者可以获取消费者消息的处理结果。
提供者在发送消息时,会为消息设置 “replyTo” (队列的名字)属性和 “correlationId”(标识号);同时提供者会订阅 “replyTo” 队列;消费者在处理完成消息后,将处理结果连同标识号发送至 “replyTo” 所指的队列,提供者就能获取消费者的处理结果。
// Producer
public String doServe(message) throws IOException, InterruptedException {
final String corrId = UUID.randomUUID().toString();
String responseQueueName = channel.queueDeclare().getQueue(); // "reply_to"
BasicProperties requestProperties = new BasicProperties
.Builder()
.correlationId(corrId)
.replyTo(responseQueueName)
.build();
channel.basicPublish("", requestQueueName, requestProperties, message.getBytes());
// the result from the Consumer
final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);
String ctag = channel.basicConsume(replyQueueName, true, (consumerTag, delivery) -> {
if (delivery.getProperties().getCorrelationId().equals(corrId)) {
response.offer(new String(delivery.getBody(), "UTF-8"));
}
});
String result = response.take();
channel.basicCancel(ctag);
return result;
}
// Consumer
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
BasicProperties responseProperties = new BasicProperties
.Builder()
.correlationId(delivery.getProperties().getCorrelationId())
.build();
String responseQueueName = delivery.getProperties().getReplyTo();
String response = "";
try {
String message = new String(delivery.getBody(), "UTF-8");
response = consume(message); // handle the message
} catch (RuntimeException e) {
// log error...
} finally {
// sent the result to the Producer
channel.basicPublish("", responseQueueName, responseProperties, response.getBytes());
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
// RabbitMq consumer worker thread notifies the RPC server owner thread
synchronized (monitor) {
monitor.notify();
}
}
};
channel.basicConsume(requestQueueName, false, deliverCallback);