RabbitMQ
应用场景
异步处理
- 异步并发执行可并行的业务逻辑,缩短事务时间。例如:用户注册成功,发提醒邮件、发提醒短信、送积分、送优惠券等。
- 异步、分步处理事务,降低了事务处理的复杂度,同时提高了事务的容错能力。对于复杂的大事务,将其拆分为多个小的子事务,最后通过最终一致性来保证事务处理的完整性,事务处理过程中,只要关键的子事务处理完成,后续的各个子事务处理失败都可以单独重试处理,大大提高了系统的容错能力。
应用解耦
- 解耦分布式系统之间的强依赖,提高系统可用性。例如:订单系统下单到库存系统去减库存,如果库存系统暂时不可用会导致无法下单,通过MQ解耦两个子系统后,即使库存系统暂时不可用,也可以完成下单,待后续库存系统故障解决后,可以继续对未处理减库存的订单进行处理。
流量削峰
- 限制系统访问流量,防止海量并发访问导致服务宕机,保证系统可用性。例如:秒杀活动可以使用MQ将高并发访问量通过异步方式处理,从而达到削峰的目的。
系统架构

Broker
它提供一种传输服务,它的角色就是维护一条从生产者到消费者的路线,保证数据能按照指定的方式进行传输。
Exchange
消息交换机,它指定消息按什么规则,路由到哪个队列。
Exchange Type
fanout
direct
topic
headers
Queue
消息的载体,每个消息都会被投到一个或多个队列。
Name
Durable
Exclusive
Auto-delete
Binding
绑定,它的作用就是把exchange和queue按照路由规则绑定起来。
Routing Key
路由关键字,exchange根据这个关键字进行消息投递。
Vhost
虚拟主机,一个broker里可以有多个vhost,用作不同用户的权限分离。
Producer
消息生产者,就是投递消息的程序。
Consumer
消息消费者,就是接受消息的程序。
Channel
消息通道,在客户端的每个连接里,可建立多个channel。
Consumer消费模式(Push & Pull)
可靠性
发送可靠性
Transaction
开启事务发送消息,保证消息肯定发送成功。但开启事务会造成MQ的吞吐量急剧下降(降低250倍,非事务模式可处理100000条左右,事务模式每秒仅能处理几百条),不适用于对性能有要求的场景。
在通过txSelect开启事务之后,我们便可以发布消息给broker代理服务器了,如果txCommit提交成功了,则消息一定是到达broker了,如果在txCommit执行之前broker异常奔溃或者由于其他原因抛出异常,这个时候我们便可以捕获异常通过txRollback回滚事务了。
try{
channel.TxSelect();
channel.BasicPublish("yu.exchange", "yu.1", props, msg);
channel.TxCommit();
}
catch (Exception ex)
{
channel.TxRollback();
}
Publisher Confirm
Publisher Confirm机制(又称为Confirms或Publisher Acknowledgements)是作为解决事务机制性能开销大(导致吞吐量下降)而提出的另外一种保证消息不会丢失的方式。confirm模式最大的好处在于他是异步的,一旦发布一条消息,生产者应用程序就可以在等信道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用便可以通过回调方法来处理该确认消息,如果RabbitMQ因为自身内部错误导致消息丢失,就会发送一条nack消息,生产者应用程序同样可以在回调方法中处理该nack消息。
在channel 被设置成 confirm 模式之后,所有被 publish 的后续消息都将被 confirm(即 ack) 或者被nack一次。但是没有对消息被 confirm 的快慢做任何保证,并且同一条消息不会既被 confirm又被nack。
Confirm机制在性能上要比事务优越很多。但是Confirm机制,无法进行回滚,就是一旦服务器崩溃,生产者无法得到Confirm信息,生产者其实本身也不知道该消息吃否已经被持久化,只有继续重发来保证消息不丢失,但是如果原先已经持久化的消息,并不会被回滚,这样队列中就会存在两条相同的消息,系统需要支持去重。
普通confirm模式
每发送一条消息后,调用waitForConfirms()方法,等待服务器端confirm。实际上是一种串行confirm了。
普通confirm模式最简单,publish一条消息后,等待服务器端confirm,如果服务端返回false或者超时时间内未返回,客户端进行消息重传。
channel.confirmSelect();
channel.basicPublish(ConfirmConfig.exchangeName, ConfirmConfig.routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, ConfirmConfig.msg_10B.getBytes());
if(!channel.waitForConfirms()){
System.out.println("send message failed.");
}
批量confirm模式
每发送一批消息后,调用waitForConfirms()方法,等待服务器端confirm。
批量confirm模式稍微复杂一点,客户端程序需要定期(每隔多少秒)或者定量(达到多少条)或者两则结合起来publish消息,然后等待服务器端confirm, 相比普通confirm模式,批量极大提升confirm效率,但是问题在于一旦出现confirm返回false或者超时的情况时,客户端需要将这一批次的消息全部重发,这会带来明显的重复消息数量,并且,当消息经常丢失时,批量confirm性能应该是不升反降的。
channel.confirmSelect();
for(int i=0;i<batchCount;i++){
channel.basicPublish(ConfirmConfig.exchangeName, ConfirmConfig.routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, ConfirmConfig.msg_10B.getBytes());
}
if(!channel.waitForConfirms()){
System.out.println("send message failed.");
}
异步confirm模式
提供一个回调方法,服务端confirm了一条或者多条消息后Client端会回调这个方法。
异步confirm模式的编程实现最复杂,Channel对象提供的ConfirmListener()回调方法只包含deliveryTag(当前Chanel发出的消息序号),我们需要自己为每一个Channel维护一个unconfirm的消息序号集合,每publish一条数据,集合中元素加1,每回调一次handleAck方法,unconfirm集合删掉相应的一条(multiple=false)或多条(multiple=true)记录。从程序运行效率上看,这个unconfirm集合最好采用有序集合SortedSet存储结构。实际上,SDK中的waitForConfirms()方法也是通过SortedSet维护消息序号的。
SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<Long>());
channel.confirmSelect();
channel.addConfirmListener(new ConfirmListener() {
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
if (multiple) {
confirmSet.headSet(deliveryTag + 1).clear();
} else {
confirmSet.remove(deliveryTag);
}
}
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.out.println("Nack, SeqNo: " + deliveryTag + ", multiple: " + multiple);
if (multiple) {
confirmSet.headSet(deliveryTag + 1).clear();
} else {
confirmSet.remove(deliveryTag);
}
}
});
while (true) {
long nextSeqNo = channel.getNextPublishSeqNo();
channel.basicPublish(ConfirmConfig.exchangeName, ConfirmConfig.routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, ConfirmConfig.msg_10B.getBytes());
confirmSet.add(nextSeqNo);
}
服务可靠性
为了保证消费者和生产者在RabbitMQ节点崩溃的情况下继续运行,或者通过增加更多的节点来扩展消息通信的吞吐量,可以通过搭建RabbitMQ分布式集群架构保证高可用性(HA)。
cluster
不支持跨网段,用于同一个网段内的局域网;可以随意的动态增加或者减少;节点之间需要运行相同版本的RabbitMQ和Erlang。
普通模式
默认的集群模式,以两个节点(rabbit01、rabbit02)为例来进行说明。对于Queue来说,消息实体只存在于其中一个节点rabbit01(或者rabbit02),rabbit01和rabbit02两个节点仅有相同的元数据,即队列的结构。当消息进入rabbit01节点的Queue后,consumer从rabbit02节点消费时,RabbitMQ会临时在rabbit01、rabbit02间进行消息传输,把A中的消息实体取出并经过B发送给consumer。
镜像模式
将需要消费的队列变为镜像队列,存在于多个节点,这样就可以实现RabbitMQ的HA高可用性。作用就是消息实体会主动在镜像节点之间实现同步,而不是像普通模式那样,在consumer消费数据时临时读取。缺点就是,集群内部的同步通讯会占用大量的网络带宽。
federation
应用于广域网,允许单台服务器上的交换机或队列接收发布到另一台服务器上交换机或队列的消息,可以是单独机器或集群。
shovel
连接方式与federation的连接方式类似,但它工作在更低层次。可以应用于广域网。
消费可靠性
手动确认(ACK)模式
消息的消费模式默认为自动确认模式,只要消息从队列中获取,无论消费者获取到消息后是否成功消息,都认为是消息已经成功消费。此种模式适用于无需保证消费可靠性的场景。
为了确保消费的可靠性,一般采用手动确认模式,消费者从队列中获取消息后,服务器会将该消息标记为不可用状态,等待消费者的反馈,如果消费者一直没有反馈,那么该消息将一直处于不可用状态。
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
}
};
channel.basicConsume(QUEUE_NAME, false, consumer);
if(process()){
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}else{
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}
可靠性方案
要求:可重试、幂等、去重、消息ID、ACK、requeue、集群、事务、持久化
发送可靠性通过publisher confirms机制保证
发送方通过异步事务publisher confirm来保证发送的可靠性,同时为了提高发送的鲁棒性,将消息持久化到消息数据库(mongodb)再发送,发送成功则删除,发送失败则由调度进行重试,最大程度地保证发送的可靠性。
服务可靠性通过持久化队列消息、镜像模式集群保证
RabbitMQ服务的可靠性通过持久化队列、消息到磁盘中,同时通过镜像模式的集群部署保证服务的高可用性。
消费可靠性通过手动消息确认机制保证
消费方通过手动的消息确认机制保证消息的可靠性,消费成功则通知RabbitMQ删除消息,消费失败则将消息发送到失败队列(或者持久化到数据库)单独处理。
延迟队列
延迟队列存储的对象肯定是对应的延迟消息,所谓”延迟消息”是指当消息被发送以后,并不想让消费者立即拿到消息,而是等待指定时间后,消费者才拿到这个消息进行消费。
- 场景一:在订单系统中,一个用户下单之后通常有30分钟的时间进行支付,如果30分钟之内没有支付成功,那么这个订单将进行一场处理。这是就可以使用延迟队列将订单信息发送到延迟队列。
- 场景二:用户希望通过手机远程遥控家里的智能设备在指定的时间进行工作。这时候就可以将用户指令发送到延迟队列,当指令设定的时间到了再将指令推送到只能设备。
RPC
参考文献
RabbitMQ的应用场景以及基本原理介绍
RabbitMQ基础概念详细介绍
rabbitmq可靠发送的自动重试机制
rabbitmq可靠确认模式的java封装及示例
rabbitMq生产者角度:消息持久化、事务机制、PublisherConfirm、mandatory
RabbitMQ之PublisherConfirm实战问题总结
RabbitMQ不同Confirm模式下的性能对比
RabbitMQ之消息确认机制(事务+Confirm)
RabbitMQ分布式集群架构和高可用性(HA)
RabbitMQ如何实现延迟队列?
RabbitMQ之RPC实现
RabbitMQ之Consumer消费模式(Push & Pull)
RabbitMQ消息可靠性分析