一、基本概念
1、消息队列
1.1 消息队列是什么
消息(Message)是指在应用间传送的数据。
消息队列(Message Queue)是一种应用间的通信方式,是一种应用间的异步协作机制。消息发送后可以立即返回,由消息系统来确保消息的可靠传递。消息发布者只管把消息发布到 MQ 中而不用管谁来取,消息使用者只管从 MQ 中取消息而不管是谁发布的。这样发布者和使用者都不用知道对方的存在。
1.2 消息队列的模式
1.2.1 点对点模式
一个具体的消息只能由一个消费者消费。多个生产者可以向同一个消息队列发送消息;但是,一个消息在被一个消息者处理的时候,这个消息在队列上会被锁住或者被移除并且其他消费者无法处理该消息。需要额外注意的是,如果消费者处理一个消息失败了,消息系统一般会把这个消息放回队列,这样其他消费者可以继续处理。
1.2.2 发布/订阅模式
单个消息可以被多个订阅者并发的获取和处理。一般来说,订阅有两种类型:
- 临时(ephemeral)订阅,这种订阅只有在消费者启动并且运行的时候才存在。一旦消费者退出,相应的订阅以及尚未处理的消息就会丢失。
- 持久(durable)订阅,这种订阅会一直存在,除非主动去删除。消费者退出后,消息系统会继续维护该订阅,并且后续消息可以被继续处理。
1.3 消息队列选型衡量标准
- 消息顺序:发送到队列的消息,消费时是否可以保证消费的顺序,比如A先下单,B后下单,应该是A先去扣库存,B再去扣,顺序不能反。
- 消息路由:根据路由规则,只订阅匹配路由规则的消息,比如有A/B两者规则的消息,消费者可以只订阅A消息,B消息不会消费。
- 消息可靠性:是否会存在丢消息的情况,比如有A/B两个消息,最后只有B消息能消费,A消息丢失。
- 消息时序:主要包括“消息存活时间”和“延迟/预定的消息”,“消息存活时间”表示生产者可以对消息设置TTL,如果超过该TTL,消息会自动消失;“延迟/预定的消息”指的是可以延迟或者预订消费消息,比如延时5分钟,那么消息会5分钟后才能让消费者消费,时间未到的话,是不能消费的。
- 消息留存:消息消费成功后,是否还会继续保留在消息队列。
- 容错性:当一条消息消费失败后,是否有一些机制,保证这条消息是一种能成功,比如异步第三方退款消息,需要保证这条消息消费掉,才能确定给用户退款成功,所以必须保证这条消息消费成功的准确性。
- 伸缩:当消息队列性能有问题,比如消费太慢,是否可以快速支持库容;当消费队列过多,浪费系统资源,是否可以支持缩容。
- 吞吐量:支持的最高并发数。
2、RabbitMQ是什么
RabbitMQ是一套开源(MPL)的消息队列服务软件,是由 LShift 提供的一个 Advanced Message Queuing Protocol (AMQP) 的开源实现,由以高性能、健壮以及可伸缩性出名的 Erlang 写成。它是应用层协议的一个开放标准,为面向消息的中间件设计,基于此协议的客户端与消息中间件可传递消息,并不受产品、开发语言等条件的限制。
3、RabbitMQ的特点
-
可靠性(Reliability)
RabbitMQ 使用一些机制来保证可靠性,如持久化、传输确认、发布确认。 -
灵活的路由(Flexible Routing)
在消息进入队列之前,通过 Exchange 来路由消息的。对于典型的路由功能,RabbitMQ 已经提供了一些内置的 Exchange 来实现。针对更复杂的路由功能,可以将多个 Exchange 绑定在一起,也通过插件机制实现自己的 Exchange 。 -
消息集群(Clustering)
多个 RabbitMQ 服务器可以组成一个集群,形成一个逻辑 Broker 。 -
高可用(Highly Available Queues)
队列可以在集群中的机器上进行镜像,使得在部分节点出问题的情况下队列仍然可用。 -
多种协议(Multi-protocol)
RabbitMQ 支持多种消息队列协议,比如 STOMP、MQTT 等等。 -
多语言客户端(Many Clients)
RabbitMQ 几乎支持所有常用语言,比如 Java、.NET、Ruby 等等。 -
管理界面(Management UI)
RabbitMQ 提供了一个易用的用户界面,使得用户可以监控和管理消息 Broker 的许多方面。 -
跟踪机制(Tracing)
如果消息异常,RabbitMQ 提供了消息跟踪机制,使用者可以找出发生了什么。 -
插件机制(Plugin System)
RabbitMQ 提供了许多插件,来从多方面进行扩展,也可以编写自己的插件。
4、RabbitMQ 中的概念模型
4.1 消息模型
所有 MQ 产品从模型抽象上来说都是一样的过程:
消费者(consumer)订阅某个队列。生产者(producer)创建消息,然后发布到队列(queue)中,最后将消息发送到监听的消费者。
4.2 具体结构
RabbitMQ 是 AMQP 协议的一个开源实现,所以其内部实际上也是 AMQP 中的基本概念。
AMQP协议模型由三部分组成:生产者、消费者和服务端。
生产者是投递消息的一方,首先连接到Server,建立一个连接,开启一个信道;然后生产者声明交换器和队列,设置相关属性,并通过路由键将交换器和队列进行绑定。
同理,消费者也需要进行建立连接,开启信道等操作,便于接收消息。接着生产者就可以发送消息,发送到服务端中的虚拟主机,虚拟主机中的交换器根据路由键选择路由规则,然后发送到不同的消息队列中,这样订阅了消息队列的消费者就可以获取到消息,进行消费。
-
Message
消息,消息是不具名的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等。 -
Publisher
消息的生产者,也是一个向交换器发布消息的客户端应用程序。 -
Exchange
交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。 -
Binding
绑定,用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。 -
Queue
消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。 -
Connection
网络连接,比如一个TCP连接。 -
Channel
信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内地虚拟连接,AMQP 命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接。 -
Consumer
消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。 -
Virtual Host
虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有自己的队列、交换器、绑定和权限机制。vhost 是 AMQP 概念的基础,必须在连接时指定,RabbitMQ 默认的 vhost 是 / 。 -
Broker
表示消息队列服务器实体。
4.3 AMQP 中的消息路由
AMQP 中增加了 Exchange 和 Binding 的角色。生产者把消息发布到 Exchange 上,消息最终到达队列并被消费者接收,而 Binding 决定交换器的消息应该发送到那个队列。
4.4 Exchange 类型
Exchange分发消息时根据类型的不同分发策略有区别,目前共四种类型:direct、fanout、topic、headers 。headers 匹配 AMQP 消息的 header 而不是路由键,此外 headers 交换器和 direct 交换器完全一致,但性能差很多,目前几乎用不到了。
4.4.1 direct
消息中的路由键(routing key)如果和 Binding 中的 binding key 一致, 交换器就将消息发到对应的队列中。路由键与队列名完全匹配,如果一个队列绑定到交换机要求路由键为“dog”,则只转发 routing key 标记为“dog”的消息,不会转发“dog.puppy”,也不会转发“dog.guard”等等。它是完全匹配、单播的模式。
4.4.2 fanout
每个发到 fanout 类型交换器的消息都会分到所有绑定的队列上去。fanout 交换器不处理路由键,只是简单的将队列绑定到交换器上,每个发送到交换器的消息都会被转发到与该交换器绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。fanout 类型转发消息是最快的。
4.4.3 topic
topic 交换器通过模式匹配分配消息的路由键属性,将路由键和某个模式进行匹配,此时队列需要绑定到一个模式上。它将路由键和绑定键的字符串切分成单词,这些单词之间用点隔开。它同样也会识别两个通配符:“*” 、 “#”。需要注意的是通配符前面必须要加上"."符号。#匹配0个或多个单词,*只匹配一个单词。
4.5 Queue 类型
4.5.1 durable
持久化,保证RabbitMQ在退出或者crash等异常情况下数据没有丢失,需要将queue,exchange和Message都持久化。
若是将queue的持久化标识durable设置为true,则代表是一个持久的队列,那么在服务重启之后,会重新读取之前被持久化的queue。
虽然队列可以被持久化,但是里面的消息是否为持久化,还要看消息的持久化设置。即重启queue,但是queue里面还没有发出去的消息,那队列里面还存在该消息么?这个取决于该消息的设置。
4.5.2 autoDeleted
自动删除,如果该队列没有任何订阅的消费者的话,该队列会被自动删除。这种队列适用于临时队列。
当一个Queue被设置为自动删除时,当消费者断掉之后,queue会被删除,这个主要针对的是一些不是特别重要的数据,不希望出现消息积累的情况。
5、消费原理
- broker:每个节点运行的服务程序,功能为维护该节点的队列的增删以及转发队列操作请求。
- master queue:主队列,每个队列都分为一个主队列和若干个镜像队列。
- mirror queue:镜像队列,作为master queue的备份。在master queue所在节点挂掉之后,系统把mirror queue提升为master queue,负责处理客户端队列操作请求。注意,mirror queue只做镜像,设计目的不是为了承担客户端读写压力。
因为mirror queue要和master queue保持一致,故需要同步机制,正因为一致性的限制,导致所有的读写操作都必须都操作在master queue上,然后由master节点同步操作到mirror queue所在的节点。即使consumer连接到了非master queue节点,该consumer的操作也会被路由到master queue所在的节点上,这样才能进行消费。
6、高级特性
6.1 TTL
TTL即一条消息在队列中的最大存活时间。
- RabbitMQ可以对消息和队列设置TTL。
- RabbitMQ支持设置消息的过期时间,在消息发送的时候可以进行指定,每条消息的过期时间可以不同。
- RabbitMQ支持设置队列的过期时间,从消息入队列开始计算,直到超过了队列的超时时间配置,那么消息会变成死信,自动清除。
- 如果两种方式一起使用,则过期时间以两者中较小的那个数值为准。
- 当然也可以不设置TTL,不设置表示消息不会过期;如果设置为0,则表示除非此时可以直接将消息投递到消费者,否则该消息将被立即丢弃。
6.2 消息确认
消费者订阅队列的时候,可以指定autoAck参数,当autoAck为true的时候,RabbitMQ采用自动确认模式,RabbitMQ自动把发送出去的消息设置为确认,然后从内存或者硬盘中删除,而不管消费者是否真正消费到了这些消息。
当autoAck为false的时候,RabbitMQ会等待消费者回复的确认信号,收到确认信号之后才从内存或者磁盘中删除消息。
消息确认机制是RabbitMQ消息可靠性投递的基础,只要设置autoAck参数为false,消费者就有足够的时间处理消息,不用担心处理消息的过程中消费者进程挂掉后消息丢失的问题。
6.3 持久化
RabbitMQ的持久化分为三个部分:交换器持久化、队列持久化和消息的持久化。
交换器持久化可以通过在声明队列时将durable参数设置为true。如果交换器不设置持久化,那么在RabbitMQ服务重启之后,相关的交换器元数据会丢失,不过消息不会丢失,只是不能将消息发送到这个交换器了。
队列的持久化能保证其本身的元数据不会因异常情况而丢失,但是不能保证内部所存储的消息不会丢失。要确保消息不会丢失,需要将其设置为持久化。队列的持久化可以通过在声明队列时将durable参数设置为true。
设置了队列和消息的持久化,当RabbitMQ服务重启之后,消息依然存在。如果只设置队列持久化或者消息持久化,重启之后消息都会消失。
也可以将所有的消息都设置为持久化,但是这样做会影响RabbitMQ的性能,因为磁盘的写入速度比内存的写入要慢得多。对于可靠性不是那么高的消息可以不采用持久化处理以提高整体的吞吐量。
6.4 死信队列
当消息在一个队列中变成死信之后,他能被重新发送到另一个交换器中,这个交换器成为死信交换器,与该交换器绑定的队列称为死信队列。消息变成死信有下面几种情况:
- 消息被拒绝。
- 消息过期
- 队列达到最大长度
当这个队列中有死信的时候,RabbitMQ会自动将这个消息重新发送到设置的交换器上,进而被路由到另一个队列,我们可以监听这个队列中消息做相应的处理。
当发生异常的时候,消息不能够被消费者正常消费,被加入到了死信队列中。后续的程序可以根据死信队列中的内容分析当时发生的异常,进而改善和优化系统。
6.5 延迟队列
一般的队列,消息一旦进入队列就会被消费者立即消费。
延迟队列就是进入该队列的消息会被消费者延迟消费,延迟队列中存储的对象是的延迟消息,“延迟消息”是指当消息被发送以后,等待特定的时间后,消费者才能拿到这个消息进行消费。
6.6 特性分析
- 消息有序(不支持):当消费消息时,如果消费失败,消息会被放回队列,然后重新消费,这样会导致消息无序;
- 持久化(不太好):没有消费的消息,可以支持持久化,这个是为了保证机器宕机时消息可以恢复,但是消费过的消息,就会被马上删除,因为RabbitMQ设计时,就不是为了去存储历史数据的。
- 高吞吐(中等):因为所有的请求的执行,最后都是在master queue,它的这个设计,导致单机性能达不到十万级的标准。
- 消息回溯(不支持):因为消息不支持永久保存,所以自然就不支持回溯。
7、RabbitMQ 集群
RabbitMQ 最优秀的功能之一就是内建集群,这个功能设计的目的是允许消费者和生产者在节点崩溃的情况下继续运行,以及通过添加更多的节点来线性扩展消息通信吞吐量。RabbitMQ 内部利用 Erlang 提供的分布式通信框架 OTP 来满足上述需求,使客户端在失去一个 RabbitMQ 节点连接的情况下,还是能够重新连接到集群中的任何其他节点继续生产、消费消息。
7.1 集群中一些概念
RabbitMQ 会始终记录以下四种类型的内部元数据:
-
队列元数据
包括队列名称和它们的属性,比如是否可持久化,是否自动删除 -
交换器元数据
交换器名称、类型、属性 -
绑定元数据
内部是一张表格记录如何将消息路由到队列 -
vhost 元数据
为 vhost 内部的队列、交换器、绑定提供命名空间和安全属性
在单一节点中,RabbitMQ 会将所有这些信息存储在内存中,同时将标记为可持久化的队列、交换器、绑定存储到硬盘上。
集群中可以共享 user、vhost、exchange等,所有的数据和状态都是必须在所有节点上复制的,除了消息队列。当在集群中声明队列、交换器、绑定的时候,这些操作会直到所有集群节点都成功提交元数据变更后才返回。
如果在集群中创建队列,集群只会在单个节点而不是所有节点上创建完整的队列信息(元数据、状态、内容)。结果是只有队列的所有者节点知道有关队列的所有信息,因此当集群节点崩溃时,该节点的队列和绑定就消失了,并且任何匹配该队列的绑定的新消息也丢失了。
7.2 集群存储
集群中有内存节点和磁盘节点两种类型,内存节点虽然不写入磁盘,但是它的执行比磁盘节点要好。内存节点可以提供出色的性能,磁盘节点能保障配置信息在节点重启后仍然可用
RabbitMQ 只要求集群中至少有一个磁盘节点,所有其他节点可以是内存节点,当节点加入火离开集群时,它们必须要将该变更通知到至少一个磁盘节点。如果只有一个磁盘节点,刚好又是该节点崩溃了,那么集群可以继续路由消息,但不能创建队列、创建交换器、创建绑定、添加用户、更改权限、添加或删除集群节点。
换句话说集群中的唯一磁盘节点崩溃的话,集群仍然可以运行,但直到该节点恢复,否则无法更改任何东西。
二、安装
1、安装
1.1 linux
由于rabbitMq需要erlang语言的支持,在安装rabbitMq之前需要安装erlang
#由于rabbitMq需要erlang语言的支持,在安装rabbitMq之前需要安装erlang
sudo apt-get install erlang-nox
#更新源
sudo apt-get update
#安装rabbitmq
sudo apt-get install rabbitmq-server
1.2 mac
brew update
brew install rabbitmq
2、基本操作
#启动
sudo rabbitmq-server start
#停止
sudo rabbitmq-server stop
#重启
sudo rabbitmq-server restart
#状态
sudo rabbitmqctl status
rabbitmqctl 提供了 RabbitMQ 管理需要的几乎一站式解决方案,绝大部分的运维命令它都可以提供。
#命令位置
/usr/sbin/
#后台启动
/usr/sbin/rabbitmq-server -detached
#启动应用程序
/usr/sbin/rabbitmqctl start_app
#重置节点,该命令将清除所有的队列
/usr/sbin/rabbitmqctl reset
#关闭整个节点
/usr/sbin/rabbitmqctl stop
#关闭指定节点
# -n node 默认 node 名称是 rabbit@server,如果主机名是 server.example.com ,那么 node 名称就是 rabbit@server.example.com
/usr/sbin/rabbitmqctl -n rabbit@server.example.com stop
#关闭应用程序但保持Erlang节点运行
/usr/sbin/rabbitmqctl stop_app
#查看已声明的队列
/usr/sbin/rabbitmqctl list_queues
#查看交换器
/usr/sbin/rabbitmqctl list_exchanges
#列出交换器的名称、类型、是否持久化、是否自动删除
/usr/sbin/rabbitmqctl list_exchanges name type durable auto_delete
#查看绑定
/usr/sbin/rabbitmqctl list_bindings
3、权限操作
#添加admin用户,密码设置为admin
sudo rabbitmqctl add_user admin admin
#赋予权限
sudo rabbitmqctl set_user_tags admin administrator
#赋予virtual host中所有资源的配置、写、读权限以便管理其中的资源
sudo rabbitmqctl set_permissions -p / admin '.*' '.*' '.*'
4、web管理器连接
http://localhost:15672
5、集群
5.1 集群配置和启动
#第一个节点
/usr/sbin/rabbitmq-server -detached
#第二个节点如果在同一台主机上,修改名字和端口,避免冲突
RABBITMQ_NODENAME=test_rabbit_1
RABBITMQ_NODE_PORT=5672
/usr/sbin/rabbitmq-server -detached
集群中除第一个节点外后加入的节点需要获取集群中的元数据,所以要先停止 Erlang 节点上运行的 RabbitMQ 应用程序,并重置该节点元数据,再加入并且获取集群的元数据,最后重新启动 RabbitMQ 应用程序。
#停止第2个节点的应用程序
/usr/sbin/rabbitmqctl -n test_rabbit_2 stop_app
#重置第2个节点元数据
/usr/sbin/rabbitmqctl -n test_rabbit_2 reset
#第2节点加入第1个节点组成的集群
/usr/sbin/rabbitmqctl -n test_rabbit_2 join_cluster test_rabbit_1@localhost
#启动第2个节点的应用程序
/usr/sbin/rabbitmqctl -n test_rabbit_2 start_app
之后的节点配置过程类比第二个节点
5.2 集群运维
#停止指定节点
RABBITMQ_NODENAME=test_rabbit_2
/usr/sbin/rabbitmqctl stop
#查看指定节点的状态
/usr/sbin/rabbitmqctl -n test_rabbit_3 cluster_status
参考链接:
https://www.jianshu.com/p/79ca08116d57
https://blog.csdn.net/u010889616/article/details/80643892
https://mp.weixin.qq.com/s/adse6qpIiK0RE-Ebo-z5_Q
如有不对,烦请指出,感谢~