RabbitMQ概述

消息队列基础

什么是消息队列

消息指的是两个应用间传递的数据。数据的类型有很多种形式,可能只包含文本字符串,也可能包含嵌入对象。

“消息队列(Message Queue)”是在消息的传输过程中保存消息的容器。在消息队列中,通常有生产者和消费者两个角色。生产者只负责发送数据到消息队列,谁从消息队列中取出数据处理,他不管。消费者只负责从消息队列中取出数据处理,他不管这是谁发送的数据。

为什么使用消息队列

主要有三个作用:

  • 解耦: 如图所示。假设有系统B、C、D都需要系统A的数据,于是系统A调用三个方法发送数据到B、C、D。这时,系统D不需要了,那就需要在系统A把相关的代码删掉。假设这时有个新的系统E需要数据,这时系统A又要增加调用系统E的代码。为了降低这种强耦合,就可以使用MQ,系统A只需要把数据发送到MQ,其他系统如果需要数据,则从MQ中获取即可

  • 异步: 如图所示。一个客户端请求发送进来,系统A会调用系统B、C、D三个系统,同步请求的话,响应时间就是系统A、B、C、D的总和,也就是800ms。如果使用MQ,系统A发送数据到MQ,然后就可以返回响应给客户端,不需要再等待系统B、C、D的响应,可以大大地提高性能。对于一些非必要的业务,比如发送短信,发送邮件等等,就可以采用MQ。

  • 削峰:如图所示。这其实是MQ一个很重要的应用。假设系统A在某一段时间请求数暴增,有5000个请求发送过来,系统A这时就会发送5000条SQL进入MySQL进行执行,MySQL对于如此庞大的请求当然处理不过来,MySQL就会崩溃,导致系统瘫痪。如果使用MQ,系统A不再是直接发送SQL到数据库,而是把数据发送到MQ,MQ短时间积压数据是可以接受的,然后由消费者每次拉取2000条进行处理,防止在请求峰值时期大量的请求直接发送到MySQL导致系统崩溃

常见消息队列产品

MQ产品

  1. ZeroMQ:号称最快的消息队列系统,尤其针对大吞吐量的需求场景。扩展性好,开发比较灵活,采用C语言实现,实际上只是一个socket库的重新封装,如果做为消息队列使用,需要开发大量的代码。ZeroMQ仅提供非持久性的队列,也就是说如果down机,数据将会丢失。其中,Twitter的Storm中使用ZeroMQ作为数据流的传输。

  2. RabbitMQ:结合erlang语言本身的并发优势,支持很多的协议:AMQP,XMPP, SMTP, STOMP,也正是如此,使的它变的非常重量级,更适合于企业级的开发。性能较好,但是不利于做二次开发和维护。

  3. ActiveMQ: 历史悠久的开源项目,是Apache下的一个子项目。已经在很多产品中得到应用,实现了JMS1.1规范,可以和spring-jms轻松融合,实现了多种协议,不够轻巧(源代码比RocketMQ多),支持持久化到数据库,对队列数较多的情况支持不好,不过我们的项目中并不会建很多的队列。

  4. Redis:做为一个基于内存的K-V数据库,其提供了消息订阅的服务,可以当作MQ来使用,目前应用案例较少,且不方便扩展。对于RabbitMQ和Redis的入队和出队操作,各执行100万次,每10万次记录一次执行时间。测试数据分为 128Bytes、512Bytes、1K和10K四个不同大小的数据。实验表明:入队时,当数据比较小时Redis的性能要高于RabbitMQ,而如 果数据大小超过了10K,Redis则慢的无法忍受;出队时,无论数据大小,Redis都表现出非常好的性能,而RabbitMQ的出队性能则远低于 Redis。

  5. RocketMQ: 阿里巴巴的MQ中间件,在其多个产品下使用,并能够撑住双十一的大流量,他并没有实现JMS规范,使用起来很简单。部署由一个 命名服务(nameserver)和一个代理(broker)组成,nameserver和broker以及producer都支持集群,队列的容量受机器硬盘的限制,队列满后可以支持持久化到硬盘(也可以自己适配代码,将其持久化到NOSQL数据库中),队列满后会影响吞吐量,可以采用主备来保证稳定性,支持回溯消费,可以在broker端进行消息过滤。

  6. Jafka/Kafka:Kafka是Apache下的一个子项目,是一个高性能跨语言分布式Publish/Subscribe消息队列系统,而Jafka是在Kafka 之上孵化而来的,即Kafka的一个升级版。

    具有以下特性:快速持久化,可以在O(1)的系统开销下进行消息持久化;高吞吐,在一台普通的服务器上既可以 达到10W/s的吞吐速率;完全的分布式系统,Broker、Producer、Consumer都原生自动支持分布式,自动实现复杂均衡;支持 Hadoop数据并行加载,对于像Hadoop的一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka通过 Hadoop的并行加载机制来统一了在线和离线的消息处理,这一点也是本课题所研究系统所看重的。

    Apache Kafka相对于ActiveMQ是一个非常轻量级的消息系统,除了性能非常好之外,还是一个工作良好的分布式系统。

MQ选型

针对消息中间件的选择可以从以下方面进行考虑,以ActiveMQ和RocketMQ对比作为案例。

  1. 优先级:RocketMQ需要新建一个特殊队列来接收优先级高的队列,无法实现从0-65535这种细粒度的控制,ActiveMQ可以精细控制。

  2. 顺序:activeMQ无法保证严格的顺序,RocketMQ可以保证严格的消费顺序。

  3. 持久化:都支持

  4. 稳定性:RoketMQ在稳定性上可能更值得信赖,支持多种集群方案。

  5. 消息过滤:ActiveMQ仅支持在客户端消费的时候进行判断是否是自己需要的消息,RocketMQ可以在broker端进行过滤,对于我们的消息总线,这里可以节省大量的网络传输是否会有消息重发造成的重复消费:RocketMQ可以保证,ActiveMQ无法保证。

  6. 回溯消费:即重新将某一个时刻之前的消息重新消费一遍,RocketMQ支持,ActiveMQ不支持(RocketMQ的队列是持久化到硬盘的,定期进行清除。

  7. 事务:都支持

  8. 定时消费:RocketMQ支持

  9. 消息堆积:就是当缓存消息的内存满了之后的解决方案,一种是丢弃策略,这种不会影响吞吐量,还有一种就是将消息持久化到磁盘,这种会影响吞吐量,在评估影响程度上,RocketMQ的成绩稍微好一点。

  10. 客户端不在线:RocketMQ可以在客户端上线后继续将未消费的消息推送到客户端。

RabbitMQ基本概念

RabbitMQ是一款使用Erlang语言开发的,实现AMQP(高级消息队列协议)的开源消息中间件。首先要知道一些RabbitMQ的特点,官网可查:

  • 可靠性。支持持久化,传输确认,发布确认等保证了MQ的可靠性。
  • 灵活的分发消息策略。这应该是RabbitMQ的一大特点。在消息进入MQ前由Exchange(交换机)进行路由消息。分发消息策略有:简单模式、工作队列模式、发布订阅模式、路由模式、通配符模式。
  • 支持集群。多台RabbitMQ服务器可以组成一个集群,形成一个逻辑Broker。
  • 多种协议。RabbitMQ支持多种消息队列协议,比如 STOMP、MQTT 等等。
  • 支持多种语言客户端。RabbitMQ几乎支持所有常用编程语言,包括 Java、.NET、Ruby 等等。
  • 可视化管理界面。RabbitMQ提供了一个易用的用户界面,使得用户可以监控和管理消息 Broker。
  • 插件机制。RabbitMQ提供了许多插件,可以通过插件进行扩展,也可以编写自己的插件。

RabbitMQ组成部分

RabbitMQ大致由一下几部分组成:

  • Broker:消息队列服务进程。此进程包括两个部分:Exchange和Queue。
  • Exchange:消息队列交换机。按一定的规则将消息路由转发到某个队列
  • Queue:消息队列,存储消息的队列。
  • Producer:消息生产者。生产方客户端将消息同交换机路由发送到队列中。
  • Consumer:消息消费者。消费队列中存储的消息。

这些组成部分是如何协同工作的呢,大概的流程如下,请看下图:

  • 消息生产者连接到RabbitMQ Broker,创建connection,开启channel。
  • 生产者声明交换机类型、名称、是否持久化等。
  • 生产者发送消息,并指定消息是否持久化等属性和routing key。
  • exchange收到消息之后,根据routing key路由到跟当前交换机绑定的相匹配的队列里面。
  • 消费者监听接收到消息之后开始业务处理。

Exhange类型及用法

Direct Exchange

直连交换机意思是此交换机需要绑定一个队列,要求该消息与一个特定的路由键完全匹配。简单点说就是一对一的,点对点的发送。

Fanout Exchange

这种类型的交换机需要将队列绑定到交换机上。一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。简单点说就是发布订阅。

Topic Exchange

直接翻译的话叫做主题交换机,如果从用法上面翻译可能叫通配符交换机会更加贴切。这种交换机是使用通配符去匹配,路由到对应的队列。通配符有两种:"*" 、 “#”。需要注意的是通配符前面必须要加上"."符号。

* 符号:有且只匹配一个词。比如 a.*可以匹配到"a.b"、“a.c”,但是匹配不了"a.b.c"。

# 符号:匹配一个或多个词。比如"rabbit.#“既可以匹配到"rabbit.a.b”、“rabbit.a”,也可以匹配到"rabbit.a.b.c"。

Headers Exchange

这种交换机用的相对没这么多。它跟上面三种有点区别,它的路由不是用routingKey进行路由匹配,而是在匹配请求头中所带的键值进行路由。如图所示:

创建队列需要设置绑定的头部信息,有两种模式:全部匹配和部分匹配。如上图所示,交换机会根据生产者发送过来的头部信息携带的键值去匹配队列绑定的键值,路由到对应的队列。

6种消息模型

基本消息类型

在上图的模型中,有以下概念:

  • P:生产者,也就是要发送消息的程序
  • C:消费者:消息的接受者,会一直等待消息到来。
  • queue:消息队列,图中红色部分。可以缓存消息;生产者向其中投递消息,消费者从其中取出消息。

work消息模型

工作队列或者竞争消费者模式

work queues与入门程序相比,多了一个消费端,两个消费端共同消费同一个队列中的消息,但是一个消息只能被一个消费者获取。

这个消息模型在Web应用程序中特别有用,可以处理短的HTTP请求窗口中无法处理复杂的任务。

接下来我们来模拟这个流程:

P:生产者:任务的发布者

C1:消费者1:领取任务并且完成任务,假设完成速度较慢(模拟耗时)

C2:消费者2:领取任务并且完成任务,假设完成速度较快

发布订阅模型

Publish/subscribe模型示意图 :

对于生产者,和前面两种模式不同:

  • 1) 声明Exchange,不再声明Queue
  • 2) 发送消息到Exchange,不再发送到Queue

1、publish/subscribe与work queues有什么区别?

  • 区别:

    1)work queues不用定义交换机,而publish/subscribe需要定义交换机。

    2)publish/subscribe的生产方是面向交换机发送消息,work queues的生产方是面向队列发送消息(底层使用默认交换机)。

    3)publish/subscribe需要设置队列和交换机的绑定,work queues不需要设置,实际上work queues会将队列绑定到默认的交换机 。

  • 相同点:

    所以两者实现的发布/订阅的效果是一样的,多个消费端监听同一个队列不会重复消费消息。

2、实际工作用 publish/subscribe还是work queues?

建议使用 publish/subscribe,发布订阅模式比工作队列模式更强大(也可以做到同一队列竞争),并且发布订阅模式可以指定自己专用的交换机。

Routing路由模型

Routing模型示意图:

P:生产者,向Exchange发送消息,发送消息时,会指定一个routing key。

X:Exchange(交换机),接收生产者的消息,然后把消息递交给 与routing key完全匹配的队列

C1:消费者,其所在队列指定了需要routing key 为 error 的消息

C2:消费者,其所在队列指定了需要routing key 为 info、error、warning 的消息

Topics通配符模型

Topics模型示意图:

每个消费者监听自己的队列,并且设置带统配符的routingkey,生产者将消息发给broker,由交换机根据routingkey来转发消息到指定的队列。

Routingkey一般都是有一个或者多个单词组成,多个单词之间以“.”分割,例如:inform.sms

通配符规则:

#:匹配一个或多个词

*:匹配不多不少恰好1个词

举例:

audit.#:能够匹配audit.irs.corporate 或者 audit.irs

audit.*:只能匹配audit.irs

从示意图可知,我们将发送所有描述动物的消息。消息将使用由三个字(两个点)组成的Routing key发送。路由关键字中的第一个单词将描述速度,第二个颜色和第三个种类:“..”。

我们创建了三个绑定:Q1绑定了“.orange.”,Q2绑定了“...rabbit”和“lazy.#”。

Q1匹配所有的橙色动物。

Q2匹配关于兔子以及懒惰动物的消息。

RPC模型

RPC模型示意图:

一、基本概念

Callback queue 回调队列,客户端向服务器发送请求,服务器端处理请求后,将其处理结果保存在一个存储体中。而客户端为了获得处理结果,那么客户在向服务器发送请求时,同时发送一个回调队列地址reply_to。

Correlation id 关联标识,客户端可能会发送多个请求给服务器,当服务器处理完后,客户端无法辨别在回调队列中的响应具体和那个请求时对应的。为了处理这种情况,客户端在发送每个请求时,同时会附带一个独有correlation_id属性,这样客户端在回调队列中根据correlation_id字段的值就可以分辨此响应属于哪个请求。

二、流程说明

  • 当客户端启动的时候,它创建一个匿名独享的回调队列。
  • 在 RPC 请求中,客户端发送带有两个属性的消息:一个是设置回调队列的 reply_to 属性,另一个是设置唯一值的 correlation_id 属性。
  • 将请求发送到一个 rpc_queue 队列中。
  • 服务器等待请求发送到这个队列中来。当请求出现的时候,它执行他的工作并且将带有执行结果的消息发送给 reply_to 字段指定的队列。
  • 客户端等待回调队列里的数据。当有消息出现的时候,它会检查 correlation_id 属性。如果此属性的值与请求匹配,将它返回给应用。

常见面试题

  1. 避免消息堆积?

    1) 采用workqueue,多个消费者监听同一队列。

    2)接收到消息以后,而是通过线程池,异步消费。

  2. 如何避免消息丢失?

    1) 消费者的ACK机制。可以防止消费者丢失消息。但是,如果在消费者消费之前,MQ就宕机了,消息就没了?

    2)可以将消息进行持久化。要将消息持久化,前提是:队列、Exchange都持久化

参考资料

  1. https://developer.aliyun.com/article/769883
  2. https://blog.51cto.com/u_15076204/2582716
  3. https://blog.csdn.net/jisuanjiguoba/article/details/118722644
打赏
  • 版权声明: 本博客所有文章除特别声明外,著作权归作者所有。转载请注明出处!
  • Copyrights © 2021-2022 Yin Peng
  • 引擎: Hexo   |  主题:修改自 Ayer
  • 访问人数: | 浏览次数:

请我喝杯咖啡吧~

支付宝
微信