1. MQ介绍
##1.1 为什么要用MQ
消息队列是一种“先进先出”的数据结构
其应用场景主要包含以下3个方面
- 应用解耦
系统的耦合性越高,容错性就越低。以电商应用为例,用户创建订单后,如果耦合调用库存系统、物流系统、支付系统,任何一个子系统出了故障或者因为升级等原因暂时不可用,都会造成下单操作异常,影响用户使用体验。
使用消息队列解耦合,系统的耦合性就会提高了。比如物流系统发生故障,需要几分钟才能来修复,在这段时间内,物流系统要处理的数据被缓存到消息队列中,用户的下单操作正常完成。当物流系统回复后,补充处理存在消息队列中的订单消息即可,终端系统感知不到物流系统发生过几分钟故障。
- 流量削峰
应用系统如果遇到系统请求流量的瞬间猛增,有可能会将系统压垮。有了消息队列可以将大量请求缓存起来,分散到很长一段时间处理,这样可以大大提到系统的稳定性和用户体验。
一般情况,为了保证系统的稳定性,如果系统负载超过阈值,就会阻止用户请求,这会影响用户体验,而如果使用消息队列将请求缓存起来,等待系统处理完毕后通知用户下单完毕,这样总不能下单体验要好。
处于经济考量目的:
业务系统正常时段的QPS如果是1000,流量最高峰是10000,为了应对流量高峰配置高性能的服务器显然不划算,这时可以使用消息队列对峰值流量削峰
- 数据分发
通过消息队列可以让数据在多个系统更加之间进行流通。数据的产生方不需要关心谁来使用数据,只需要将数据发送到消息队列,数据使用方直接在消息队列中直接获取数据即可
1.2 MQ的优点和缺点
优点:解耦、削峰、数据分发
缺点包含以下几点:
系统可用性降低
系统引入的外部依赖越多,系统稳定性越差。一旦MQ宕机,就会对业务造成影响。
如何保证MQ的高可用?
系统复杂度提高
MQ的加入大大增加了系统的复杂度,以前系统间是同步的远程调用,现在是通过MQ进行异步调用。
如何保证消息没有被重复消费?怎么处理消息丢失情况?那么保证消息传递的顺序性?
一致性问题
A系统处理完业务,通过MQ给B、C、D三个系统发消息数据,如果B系统、C系统处理成功,D系统处理失败。
如何保证消息数据处理的一致性?
1.3 各种MQ产品的比较
常见的MQ产品包括Kafka、ActiveMQ、RabbitMQ、RocketMQ。
2. RocketMQ快速入门
RocketMQ是阿里巴巴2016年MQ中间件,使用Java语言开发,在阿里内部,RocketMQ承接了例如“双11”等高并发场景的消息流转,能够处理万亿级别的消息。
2.1 准备工作
2.1.1 下载RocketMQ
RocketMQ最新版本:4.5.1
2.2.2 环境要求
Linux64位系统
JDK1.8(64位)
源码安装需要安装Maven 3.2.x
2.2 安装RocketMQ
2.2.1 安装步骤
本次以二进制包方式安装
解压安装包
unzip rocketmq-all-4.4.0-bin-release.zip
- 进入安装目录
2.2.2 目录介绍
- bin:启动脚本,包括shell脚本和CMD脚本
- conf:实例配置文件 ,包括broker配置文件、logback配置文件等
- lib:依赖jar包,包括Netty、commons-lang、FastJSON等
2.3 启动RocketMQ
进入bin目录启动。
- 启动NameServer
|
- 启动Broker
|
问题描述:
RocketMQ默认的虚拟机内存较大,启动Broker如果因为内存不足失败,需要编辑如下两个配置文件,修改JVM内存大小
|
- 参考设置:
vi runbroker.sh文件修改为:
|
修改完成后,如果之前已经启动了nameserver,那么需要关闭重启,然后再启动broker。
2.4 测试RocketMQ
2.4.1 发送消息
|
2.4.2 接收消息
|
2.5 关闭RocketMQ
|
3. RocketMQ集群搭建
3.1 各角色介绍
- Producer:消息的发送者,发送前需要询问nameser到底要发到哪个broker,broker地址在哪里
- Consumer:消息接收者,订阅消息时,也要询问namesesr需要订阅的消息在那个borker
- Broker:暂存和传输消息,启动后也需要向nameser汇报自己的状况信息。
- NameServer:管理Broker。接受生产者和消费者的请求
- Topic:区分消息的种类;一个发送者可以发送消息给一个或者多个Topic;一个消息的接收者可以订阅一个或者多个Topic消息
- Message Queue:相当于是Topic的分区;用于并行发送和接收消息
3.2 集群搭建方式
3.2.1 集群特点
- NameServer是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。
- 每一个nameser节点的数据都是一样的,但是他们之间是没有进行相互数据同步的,那么只能说明,broker启动后需要都给所有的nameser通信,汇报自己的情况。
- Broker部署相对复杂,Broker分为Master与Slave,一个Master可以对应多个Slave,但是一个Slave只能对应一个Master,Master与Slave的对应关系通过指定相同的BrokerName,不同的BrokerId来定义,BrokerId为0表示Master,非0表示Slave。Master也可以部署多个。每个Broker与NameServer集群中的所有节点建立长连接,定时注册Topic信息到所有NameServer。
- master和slave的出现目的是为了,进行读写分离。主节点是负责写。从节点负责读,他们之间进行数据同步(类似zk的集群架构)。
- Producer与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer取Topic路由信息,并向提供Topic服务的Master建立长连接,且定时向Master发送心跳。Producer完全无状态,可集群部署。
- Consumer与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer取Topic路由信息,并向提供Topic服务的Master、Slave建立长连接,且定时向Master、Slave发送心跳。Consumer既可以从Master订阅消息,也可以从Slave订阅消息,订阅规则由Broker配置决定。
3.2.3 集群模式
1)单Master模式
这种方式风险较大,一旦Broker重启或者宕机时,会导致整个服务不可用。不建议线上环境使用,可以用于本地测试。
2)多Master模式
一个集群无Slave,全是Master,例如2个Master或者3个Master,这种模式的优缺点如下:
- 优点:配置简单,单个Master宕机或重启维护对应用无影响,在磁盘配置为RAID10时,即使机器宕机不可恢复情况下,由于RAID10磁盘非常可靠,消息也不会丢(异步刷盘丢失少量消息,同步刷盘一条不丢),性能最高;
- 缺点:单台机器宕机期间,这台机器上未被消费的消息在机器恢复之前不可订阅,消息实时性会受到影响。
3)多Master多Slave模式(消息异步更新)
每个Master配置一个Slave,有多对Master-Slave,HA采用异步复制方式(master和broker之间数据同步采用异步方式),主备有短暂消息延迟(毫秒级),这种模式的优缺点如下:
- 优点:即使磁盘损坏,消息丢失的非常少,且消息实时性不会受影响,同时Master宕机后,消费者仍然可以从Slave消费,而且此过程对应用透明,不需要人工干预,性能同多Master模式几乎一样;
- 缺点:Master宕机,磁盘损坏情况下会丢失少量消息。
4)多Master多Slave模式(消息同步更新)
每个Master配置一个Slave,有多对Master-Slave,HA采用同步双写方式,即只有主备都写成功,才向应用返回成功,这种模式的优缺点如下:
- 优点:数据与服务都无单点故障,Master宕机情况下,消息无延迟,服务可用性与数据可用性都非常高;
- 缺点:性能比异步复制模式略低(大约低10%左右),发送单个消息的RT会略高,且目前版本在主节点宕机后,备机不能自动切换为主机。
3.3 双主双从集群搭建
3.3.1 总体架构
消息高可用采用2m-2s(同步双写)方式
3.3.2 集群工作流程
- 启动NameServer,NameServer起来后监听端口,等待Broker、Producer、Consumer连上来,相当于一个路由控制中心。
- Broker启动,跟所有的NameServer保持长连接,定时发送心跳包。心跳包中包含当前Broker信息(IP+端口等)以及存储所有Topic信息。注册成功后,NameServer集群中就有Topic跟Broker的映射关系。
- 收发消息前,先创建Topic,创建Topic时需要指定该Topic要存储在哪些Broker上,也可以在发送消息时自动创建Topic。
- Producer发送消息,启动时先跟NameServer集群中的其中一台建立长连接,并从NameServer中获取当前发送的Topic存在哪些Broker上,轮询从队列列表中选择一个队列,然后与队列所在的Broker建立长连接从而向Broker发消息。
- Consumer跟Producer类似,跟其中一台NameServer建立长连接,获取当前订阅Topic存在哪些Broker上,然后直接跟Broker建立连接通道,开始消费消息。
3.3.3 服务器环境
序号 | IP | 角色 | 架构模式 |
---|---|---|---|
1 | 192.168.25.135 | nameserver、brokerserver | Master1、Slave2 |
2 | 192.168.25.138 | nameserver、brokerserver | Master2、Slave1 |
3.3.4 Host添加信息
|
配置如下:
|
配置完成后, 重启网卡
|
3.3.5 防火墙配置
宿主机需要远程访问虚拟机的rocketmq服务和web服务,需要开放相关的端口号,简单粗暴的方式是直接关闭防火墙
|
或者为了安全,只开放特定的端口号,RocketMQ默认使用3个端口:9876 、10911 、11011 。如果防火墙没有关闭的话,那么防火墙就必须开放这些端口:
nameserver
默认使用 9876 端口master
默认使用 10911 端口slave
默认使用11011 端口
执行以下命令:
|
3.3.6 环境变量配置
|
在profile文件的末尾加入如下命令
|
输入:wq! 保存并退出, 并使得配置立刻生效:
|
3.3.7 创建消息存储路径
|
3.3.8 broker配置文件
我们可以打开rmq的conf配置目录:
可以看到他本身就提供了三种样例的配置目录,从上往下分别是:双master和双slave的消息异步同步,双master和双slave的消息同步同步,双master。分别对应了我们上面所讲的三种集群模式。
因为本次搭建的是双m双s的消息同步的方式,所以只需要修改2m-2s-sync目录下的配置文件即可。
1)master1
服务器:192.168.25.135
|
修改配置如下:
|
2)slave2
服务器:192.168.25.135
|
修改配置如下:
|
3)master2
服务器:192.168.25.138
|
修改配置如下:
|
4)slave1
服务器:192.168.25.138
|
修改配置如下:
|
3.3.81
同理把其余两个服务器还没有配置的配置文件,根据上面的broker配置,在配置即可
3.3.9 修改启动脚本文件
1)runbroker.sh
|
需要根据内存大小进行适当的对JVM参数进行调整:
|
####2)runserver.sh
|
|
3.3.10 服务启动
1)启动NameServe集群
分别在192.168.25.135和192.168.25.138启动NameServer
|
2)启动Broker集群
- 在192.168.25.135上启动master1和slave2
master1:
|
slave2:
|
- 在192.168.25.138上启动master2和slave2
master2
|
slave1
|
3.3.11 查看进程状态
启动后通过JPS查看启动进程
3.3.12 查看日志
|
3.6 重要提示!!!!!
如果在云服务器上部署,配置跟以上是一样的,但是需要注意的是,为了避免启动的master或者slave使用内网ip进行通信,所以我们在配置《3.3.8broker配置文件中》
在每个properties配置文件中,需要显式声明master或者salve所属的ip地址:
|
3.4 mqadmin管理工具
3.4.1 使用方式
进入RocketMQ安装位置,在bin目录下执行
注意:打包前在
启动rocketmq-console:
|
启动成功后,我们就可以通过浏览器访问http://localhost:8080
进入控制台界面了,如下图:
集群状态:
4. 消息发送样例
- 导入MQ客户端依赖
|
- 消息发送者步骤分析
|
- 消息消费者步骤分析
|
4.1 基本样例
4.1.1 消息发送
1)发送同步消息
意思就是:生产者发送消息后,阻塞等待rmq的ack通知。
这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知。
|
2)发送异步消息
异步消息通常用在对响应时间敏感(不用阻塞等待rmq响应)的业务场景,即发送端不能容忍长时间地等待Broker的响应。
提供了通过回调函数的方式,触发broker响应的结果
|
3)单向发送消息
这种方式主要用在不特别关心发送结果的场景,例如日志发送。
|
4.1.2 消费消息
消息消费模式由消费者来决定,可以由消费者设置MessageModel来决定消息模式。
消息模式默认为集群消费模式
|
1)负载均衡/集群模式 - 默认模式
消费者采用负载均衡方式消费消息,多个消费者共同消费队列消息,每个消费者处理的消息不同
|
架构图
集群消息是指集群化部署消费者
当使用集群消费模式时,MQ 认为任意一条消息只需要被集群内的任意一个消费者处理即可。
特点
- 每条消息只需要被处理一次,broker只会把消息发送给消费集群中的一个消费者
- 在消息重投时(为什么会重投,那是因为你没有返回ack给rmq,所以rmq认为你没收到会再重发),但是不能保证路由到同一台机器上
- 消费状态由broker维护
2)广播模式
消费者采用广播的方式消费消息,每个消费者消费的消息都是相同的。
|
当使用广播消费模式时,MQ 会将每条消息推送给集群内所有注册过的客户端,保证消息至少被每台机器消费一次。
特点
消费进度由consumer维护
保证每个消费者消费一次消息
消费失败的消息不会重投
为什么要定义组名!!!!
我们可以看到在创建生产者或者消费者的时候,构造函数都需要传递一个组名;
|
我们可以回过头看一下rmq集群架构图:
可以看到生产者和消费者,都会构建一个集群。那么例如生产者构建集群的目的到底有设么用?生产者不就负责发送消息就可以了么?
答案是:事务消息的时候会使用。我们是否还记得,rmq会回查生产者的接口,确认本次半消息是否投递commit。
而消费者定义组名,那么就可以做负载均衡。相同组内的所有消费者在消费消息时可以做负载均衡。
4.2 顺序消息
消息有序指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ可以严格的保证消息有序,可以分为分区有序或者全局有序。
顺序消费的原理解析,在默认的情况下消息发送会采取Round Robin轮询方式把消息发送到不同的queue(分区队列)(broker中 可能存在多个队列);而消费消息的时候从多个queue上拉取消息(消费者多线程消费消息),这种情况发送和消费是不能保证顺序。但是如果控制发送的顺序消息只依次发送到同一个queue中,消费的时候只从这个queue上依次拉取,则就保证了顺序。当发送和消费参与的queue只有一个,则是全局有序;如果多个queue参与,则为分区有序,即相对每个queue,消息都是有序的。
下面用订单进行分区有序的示例。一个订单的顺序流程是:创建、付款、推送、完成。订单号相同的消息会被先后发送到同一个队列中,消费时,同一个OrderId获取到的肯定是同一个队列。
4.2.1 顺序消息生产
|
4.2.2 顺序消费消息
|
输出:
|
同一个队列,都是用相同的线程消费(一个queue开启一个线程消费)。
4.3 延时消息
比如电商里,提交了一个订单就可以发送一个延时消息,1h后去检查这个订单的状态,如果还是未付款就取消订单释放库存。
4.3.1 启动消息消费者
|
4.3.2 发送延时消息
|
###4.3.3 验证
您将会看到消息的消费比存储时间晚10秒
4.3.4 使用限制
|
现在RocketMq并不支持任意时间的延时,需要设置几个固定的延时等级,从1s到2h分别对应着等级1到18
4.4 批量消息
批量发送消息能显著提高传递小消息的性能。限制是这些批量消息应该有相同的topic,相同的waitStoreMsgOK,而且不能是延时消息。此外,这一批消息的总大小不应超过4MB。
4.4.1 发送批量消息
如果您每次只发送不超过4MB的消息,则很容易使用批处理,样例如下:
|
如果消息的总长度可能大于4MB时,这时候最好把消息进行分割
|
4.5 过滤消息
在大多数情况下,TAG是一个简单而有用的设计,其可以来选择您想要的消息。例如:
|
消费者将接收包含TAGA或TAGB或TAGC的消息。但是限制是一个消息只能有一个标签,这对于复杂的场景可能不起作用。在这种情况下,可以使用SQL表达式筛选消息。SQL特性可以通过发送消息时的属性来进行计算。在RocketMQ定义的语法下,可以实现一些简单的逻辑。下面是一个例子:
|
4.5.1 SQL基本语法
RocketMQ只定义了一些基本语法来支持这个特性。你也可以很容易地扩展它。
- 数值比较,比如:>,>=,<,<=,BETWEEN,=;
- 字符比较,比如:=,<>,IN;
- IS NULL 或者 IS NOT NULL;
- 逻辑符号 AND,OR,NOT;
常量支持类型为:
- 数值,比如:123,3.1415;
- 字符,比如:‘abc’,必须用单引号包裹起来;
- NULL,特殊的常量
- 布尔值,TRUE 或 FALSE
只有使用push模式的消费者才能用使用SQL92标准的sql语句,接口如下:
|
4.5.2 消息生产者
发送消息时,你能通过putUserProperty
来设置消息的属性
|
4.5.3 消息消费者
用MessageSelector.bySql来使用sql筛选消息
|
4.6 事务消息
###4.6.1 流程分析
上图说明了事务消息的大致方案,其中分为两个流程:正常事务消息的发送及提交、事务消息的补偿流程。
分布式系统中的事务可以使用TCC(Try、Confirm、Cancel)、2pc来解决分布式系统中的消息原子性
rmq的事务消息默认使用,2PC。
####1)事务消息发送及提交
(1) 发送消息(half消息)(预处理消息,当broker收到此类消息后,会存储到RMQ_SYS_TRANS_HALF_TOPIC的消息消费队列中)。
(2) 服务端响应消息写入结果。
(3) 根据发送结果执行本地事务(如果写入失败,此时half消息对业务不可见,本地逻辑不执行)。
(4) 根据本地事务状态执行Commit或者Rollback(Commit操作生成消息索引,消息对消费者可见)
2)事务补偿
(1) 对没有Commit/Rollback的事务消息(pending状态的消息),从服务端发起一次“回查”
(2) Producer收到回查消息,检查回查消息对应的本地事务的状态
(3) 根据本地事务状态,重新Commit或者Rollback
其中,补偿阶段用于解决消息Commit或者Rollback发生超时或者失败的情况。
3)事务消息状态
事务消息共有三种状态,提交状态、回滚状态、中间状态:
- TransactionStatus.CommitTransaction: 提交事务,它允许消费者消费此消息。
- TransactionStatus.RollbackTransaction: 回滚事务,它代表该消息将被删除,不允许被消费。broker端会删除半消息
- TransactionStatus.Unknown: 中间状态,它代表需要检查消息队列来确定状态。
RocketMQ实现方式
Half Message:预处理消息,当broker收到此类消息后,会存储到RMQ_SYS_TRANS_HALF_TOPIC的消息消费队列中
检查事务状态:Broker会开启一个定时任务,消费RMQ_SYS_TRANS_HALF_TOPIC队列中的消息,每次执行任务会向消息发送者确认事务执行状态(提交、回滚、未知),如果是未知,等待下一次回调。
超时:如果超过回查次数,默认回滚消息
TransactionListener的两个方法
executeLocalTransaction
半消息发送成功触发此方法来执行本地事务
checkLocalTransaction
broker将发送检查消息来检查事务状态,并将调用此方法来获取本地事务状态
###4.6.1 发送事务消息
1) 创建事务性生产者
使用 TransactionMQProducer
类创建生产者,并指定唯一的 ProducerGroup
,就可以设置自定义线程池来处理这些检查请求。执行本地事务后、需要根据执行结果对消息队列进行回复。回传的事务状态在请参考前一节。
|
2)实现事务的监听接口
当发送半消息成功时,我们使用 executeLocalTransaction
方法来执行本地事务。它返回前一节中提到的三个事务状态之一。checkLocalTranscation
方法用于检查本地事务状态,并回应消息队列的检查请求。它也是返回前一节中提到的三个事务状态之一。
|
4.6.2 使用限制
- 事务消息不支持延时消息和批量消息。
- 为了避免单个消息被检查太多次而导致半队列消息累积,我们默认将单个消息的检查次数限制为 15 次,但是用户可以通过 Broker 配置文件的
transactionCheckMax
参数来修改此限制。如果已经检查某条消息超过 N 次的话( N =transactionCheckMax
) 则 Broker 将丢弃此消息,并在默认情况下同时打印错误日志。用户可以通过重写AbstractTransactionCheckListener
类来修改这个行为。 - 事务消息将在 Broker 配置文件中的参数 transactionMsgTimeout 这样的特定时间长度之后被检查。当发送事务消息时,用户还可以通过设置用户属性 CHECK_IMMUNITY_TIME_IN_SECONDS 来改变这个限制,该参数优先于
transactionMsgTimeout
参数。 - 事务性消息可能不止一次被检查或消费。
- 提交给用户的目标主题消息可能会失败,目前这依日志的记录而定。它的高可用性通过 RocketMQ 本身的高可用性机制来保证,如果希望确保事务消息不丢失、并且事务完整性得到保证,建议使用同步的双重写入机制。
- 事务消息的生产者 ID 不能与其他类型消息的生产者 ID 共享。与其他类型的消息不同,事务消息允许反向查询、MQ服务器能通过它们的生产者 ID 查询到消费者。
kafka和rmq
kafka的定位是:日志处理,那么他对消息的丢失是可以容忍的。
rmq的定位是:消息中间件,对于消息的丢失和处理要求更高。rmq他是借鉴与kafka产生的。
rmq中,broker的master之间是没有进行数据同步的,生产者发送的消息会路由到其中某一个master。也就是说,2m-2s架构中两个master的作用就相当于负载均衡的效果,就是为了承担生产者的写请求压力。两个slave就是承担消费者的读压力。
也就是说,整个rmq的集群你会发现,nameserver之间也是不进行通信的,broker之间也是不进行通信的。仅仅只有master和slave才会存在通信(需要做数据同步)
rmq做到了读写分离,master负责写,slave负责读。而且假设master挂掉了,slave也不会重新选举master,挂了就挂了。但是此时与挂掉的master相关联的slave还是可以接受读请求的。