RocketMQ总结
1 rocketmq是什么?
是一个队列模型的消息中间件,具有高性能、高可靠、高实时、分布式特点。
- Producer、Consumer、队列都可以分布式。
- Producer 向一些队列轮流发送消息,队列集合称为 Topic,Consumer 如果做广播消费,则一个 consumer
实例消费这个 Topic 对应的所有队列,如果做集群消费,则多个 Consumer 实例平均消费这个 topic 对应的队列集合。
- 能够保证严格的消息顺序
- 提供丰富的消息拉取模式
- 高效的订阅者水平扩展能力
- 实时的消息订阅机制
- 亿级消息堆积能力
- 较少的依赖
2 rocketmq网络结构
RocketMQ 网络部署特点
Name Server 是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。
Broker 部署相对复杂,Broker 分为 Master 与 Slave,一个 Master 可以对应多个 Slave,但是一个 Slave 只能对应一个 Master,Master 与 Slave 的对应关系通过指定相同的 BrokerName,不同的 BrokerId 来定义,BrokerId为 0 表示 Master,非 0 表示 Slave。Master 也可以部署多个。每个 Broker 与 Name Server 集群中的所有节点建立长连接,定时注册 Topic 信息到所有 Name Server。
Producer 与 Name Server 集群中的其中一个节点(随机选择)建立长连接,定期从 Name Server 取 Topic 路由信息,并向提供 Topic 服务的 Master 建立长连接,且定时向 Master 发送心跳。Producer 完全无状态,可集群部署。
Consumer 与 Name Server 集群中的其中一个节点(随机选择)建立长连接,定期从 Name Server 取 Topic 路由信息,并向提供 Topic 服务的 Master、Slave 建立长连接,且定时向 Master、Slave 发送心跳。Consumer既可以从 Master 订阅消息,也可以从 Slave 订阅消息,订阅规则由 Broker 配置决定。
3 模块功能特性
3.1 Namesrv
Namesrv用于存储Topic、Broker关系信息,功能简单,稳定性高。多个Namesrv之间相互没有通信,单台Namesrv宕机不影响其他Namesrv与集群;即使整个Namesrv集群宕机,已经正常工作的Producer,Consumer,Broker仍然能正常工作,但新起的Producer, Consumer,Broker就无法工作。
Namesrv压力不会太大,平时主要开销是在维持心跳和提供Topic-Broker的关系数据。但有一点需要注意,Broker向Namesr发心跳时,会带上当前自己所负责的所有Topic信息,如果Topic个数太多(万级别),会导致一次心跳中,就Topic的数据就几十M,网络情况差的话,网络传输失败,心跳失败,导致Namesrv误认为Broker心跳失败。
3.2 Broker
1)高并发读写服务
Broker的高并发读写主要是依靠以下两点:
- 消息顺序写,所有Topic数据同时只会写一个文件,一个文件满1G,再写新文件,真正的顺序写盘,使得发消息TPS大幅提高。
- 消息随机读,RocketMQ尽可能让读命中系统pagecache,因为操作系统访问pagecache时,即使只访问1K的消息,系统也会提前预读出更多的数据,在下次读时就可能命中pagecache,减少IO操作。
2) 负载均衡与动态伸缩
负载均衡:Broker上存Topic信息,Topic由多个队列组成,队列会平均分散在多个Broker上,而Producer的发送机制保证消息尽量平均分布到所有队列中,最终效果就是所有消息都平均落在每个Broker上。
动态伸缩能力(非顺序消息):Broker的伸缩性体现在两个维度:Topic, Broker。
- Topic维度:假如一个Topic的消息量特别大,但集群水位压力还是很低,就可以扩大该Topic的队列数,Topic的队列数跟发送、消费速度成正比。
- Broker维度:如果集群水位很高了,需要扩容,直接加机器部署Broker就可以。Broker起来后想Namesrv注册,Producer、Consumer通过Namesrv发现新Broker,立即跟该Broker直连,收发消息。
3) 高可用&高可靠
高可用:集群部署时一般都为主备,备机实时从主机同步消息,如果其中一个主机宕机,备机提供消费服务,但不提供写服务。
高可靠:所有发往broker的消息,有同步刷盘和异步刷盘机制;同步刷盘时,消息写入物理文件才会返回成功,异步刷盘时,只有机器宕机,才会产生消息丢失,broker挂掉可能会发生,但是机器宕机崩溃是很少发生的,除非突然断电
4)Broker与Namesrv的心跳机制
单个Broker跟所有Namesrv保持心跳请求,心跳间隔为30秒,心跳请求中包括当前Broker所有的Topic信息。Namesrv会反查Broer的心跳信息,如果某个Broker在2分钟之内都没有心跳,则认为该Broker下线,调整Topic跟Broker的对应关系。但此时Namesrv不会主动通知Producer、Consumer有Broker宕机。
3.3 消费者
消费者启动时需要指定Namesrv地址,与其中一个Namesrv建立长连接。消费者每隔30秒从nameserver获取所有topic的最新队列情况,这意味着某个broker如果宕机,客户端最多要30秒才能感知。连接建立后,从namesrv中获取当前消费Topic所涉及的Broker,直连Broker。
Consumer跟Broker是长连接,会每隔30秒发心跳信息到Broker。Broker端每10秒检查一次当前存活的Consumer,若发现某个Consumer 2分钟内没有心跳,就断开与该Consumer的连接,并且向该消费组的其他实例发送通知,触发该消费者集群的负载均衡。
消费者端的负载均衡
先讨论消费者的消费模式,消费者有两种模式消费:集群消费,广播消费。
- 广播消费:每个消费者消费Topic下的所有队列。
- 集群消费:一个topic可以由同一个ID下所有消费者分担消费。具体例子:假如TopicA有6个队列,某个消费者ID起了2个消费者实例,那么每个消费者负责消费3个队列。如果再增加一个消费者ID相同消费者实例,即当前共有3个消费者同时消费6个队列,那每个消费者负责2个队列的消费。
消费者端的负载均衡,就是集群消费模式下,同一个ID的所有消费者实例平均消费该Topic的所有队列。
3.4 生产者(Producer)
Producer启动时,也需要指定Namesrv的地址,从Namesrv集群中选一台建立长连接。如果该Namesrv宕机,会自动连其他Namesrv。直到有可用的Namesrv为止。
生产者每30秒从Namesrv获取Topic跟Broker的映射关系,更新到本地内存中。再跟Topic涉及的所有Broker建立长连接,每隔30秒发一次心跳。在Broker端也会每10秒扫描一次当前注册的Producer,如果发现某个Producer超过2分钟都没有发心跳,则断开连接。
生产者端的负载均衡
生产者发送时,会自动轮询当前所有可发送的broker,一条消息发送成功,下次换另外一个broker发送,以达到消息平均落到所有的broker上。
这里需要注意一点:假如某个Broker宕机,意味生产者最长需要30秒才能感知到。在这期间会向宕机的Broker发送消息。当一条消息发送到某个Broker失败后,会往该broker自动再重发2次,假如还是发送失败,则抛出发送失败异常。业务捕获异常,重新发送即可。客户端里会自动轮询另外一个Broker重新发送,这个对于用户是透明的。
4 集群说明
单个 Master
这种方式风险较大,一旦Broker重启或者宕机时,会导致整个服务不可用,不建议线上环境使用。
多 Master 模式(2m-noslave)
| brokerClusterName | brokerName | brokerRole | brokerId |
| —————– | ———- | ———— | ——– |
| DefaultCluster | broker-a | ASYNC_MASTER | 0 |
| DefaultCluster | broker-b | ASYNC_MASTER | 0 |1
2
3
4
5说明:一个集群无 Slave,全是 Master,例如 2 个 Master 或者 3 个 Master优点:配置简单,单个Master 宕机或重启维护对应用无影响,在磁盘配置为RAID10 时,即使机器宕机不可恢复情况下,由与 RAID10磁盘非常可靠,消息也不会丢(异步刷盘丢失少量消息,同步刷盘一条不丢)。性能最高。
缺点:单台机器宕机期间,这台机器上未被消费的消息在机器恢复之前不可订阅,消息实时性会受到受到影响。
启动说明:先启动 NameServer在机器 A,启动第一个 Master在机器 B,启动第二个 Master多 Master 多 Slave 模式,异步复制(2m-2s-async)
| brokerClusterName | brokerName | brokerRole | brokerId |
| —————– | ———- | ———— | ——– |
| DefaultCluster | broker-a | ASYNC_MASTER | 0 |
| DefaultCluster | broker-a | SLAVE | 1 |
| DefaultCluster | broker-b | ASYNC_MASTER | 0 |
| DefaultCluster | broker-b | SLAVE | 1 |1
2
3
4
5
6
7
8
9
10
11
12说明:每个 Master 配置一个 Slave,有多对Master-Slave,HA。采用异步复制方式,主备有短暂消息延迟,毫秒级。
优点:即使磁盘损坏,消息丢失的非常少,且消息实时性不会受影响,因为Master 宕机后,消费者仍然可以从 Slave消费,此过程对应用透明。不需要人工干预。性能同多 Master 模式几乎一样。
缺点:Master 宕机,磁盘损坏情况,会丢失少量消息。
启动说明:
先启动 NameServer
在机器 A,启动第一个 Master
在机器 B,启动第二个 Master
在机器 C,启动第一个 Slave
在机器 D,启动第二个 Slave多 Master 多 Slave 模式,同步双写(2m-2s-sync)
brokerClusterName | brokerName | brokerRole | brokerId |
---|---|---|---|
DefaultCluster | broker-a | SYNC_MASTER | 0 |
DefaultCluster | broker-a | SLAVE | 1 |
DefaultCluster | broker-b | SYNC_MASTER | 0 |
DefaultCluster | broker-b | SLAVE | 1 |
1 | 说明:每个 Master 配置一个 Slave,有多对Master-Slave,HA。采用同步双写方式,主备都写成功,向应用返回成功。 |
5 集群部署案例-2m-2s-sync
5.1 环境说明
1) 软件及其机器
软件及版本 | 下载地址 | |
---|---|---|
系统 | centos7 | https://www.centos.org/download/ |
软件 | rocketmq-all-4.2.0 | https://www.apache.org/dyn/closer.cgi?path=rocketmq/4.2.0/rocketmq-all-4.2.0-source-release.zip |
依赖 | jdk1.8+ | oracle官网 |
ip | hostname | 部署服务 | role/brokerid |
---|---|---|---|
192.168.59.2 | mqnamesrv-*.env.rocketmq.com | NameServer | No |
192.168.59.3 | mqnamesrv-*.env.rocketmq.com | NameServer | No |
192.168.59.4 | mqbroker-*.env.rocketmq.com | Broker | SYNC_MASTER/0 |
192.168.59.5 | mqbroker-*.env.rocketmq.com | Broker | SLAVE/1 |
192.168.59.6 | mqbroker-*.env.rocketmq.com | Broker | SYNC_MASTER/0 |
192.168.59.7 | mqbroker-*.env.rocketmq.com | Broker | SLAVE/1 表 5.2 |
绑定hosts或dns:
1 | 测试环境hosts |
主机命名说明:
主机名命名规范:项目名-随机数.环境.组件.公司/机房
例如:表5.2中第一行,项目名:mqnamesrv,部署环境:dev,ip:192.168.59.2,组件是rocketmq,命名主机名为:
mqnamesrv-192168059002.dev.rocketmq.com
该命名好处:通过主机名可以判断当前机器部署的服务以及部署环境、机房情况,方便在报警系统里或者cmdb中很快判断到该主机的的影响。
2) 多环境说明
在实际应用中都会涉及多环境的问题,比如有线下环境(dev)和生产环境(prod),不同环境的应用最好保持配置一致,减少各个每个环境的配置工作量。
Rocketmq各环境统一连接地址:
NAMESRV_ADDR=”nameserver1.rocketmq.test.com:9876;nameserver2.rocketmq.test.com:9876”
根据Rocketmq集群说明,其实最终只需暴露nameserver的地址给应用即可,因此,各个环境绑定各个环境对应的hosts/dns即可使用统一连接的地址。
3) 打包部署
编译
官方提供的是源码包,需要编译成二进制包:
1
2
3
4unzip rocketmq-all-4.2.0-source-release.zip
cd rocketmq-all-4.2.0/
mvn -Prelease-all -DskipTests clean install -U
cd distribution/target/apache-rocketmq打包
二进制包虽然可以直接运行,批量安装和批量管理以及不利于统一管理,这里我直接打成rpm包,规范目录(安装目录、日志目录、数据目录),指定rocketmq用户运行,设置服务自动启动等;
因为nameserver和broker基本上是一个包,只是启动命令不一样,因此,打包也是打成一个包了,根据启动命令和当前主机名判断是何服务。
1
2使用fpm打rocketmq,然后上传到yum仓库
fpm -s dir -t rpm -n apache-rocketmq --epoch 1 -a 'x86_64' -v 4.2.0 --iteration 1.el7 -C rocketmq/root -d 'jdk >= 1.8.0' -d 'git' --license 'Apache License, Version 2.0' --description "Apache RocketMQ is a distributed messaging and streaming platform with low latency, high performance and reliability, trillion-level capacity and flexible scalability." --no-rpm-sign --url 'http://rocketmq.apache.org/' --before-install rocketmq/install_before.sh --after-install rocketmq/install_after.sh --before-remove rocketmq/remove_before.sh部署
打包以后,部署非常简单,直接yum安装
1
2
3
4
5
6安装rocketmq
yum install apache-rocketmq
启动/停止/重启 NameServer 服务
systemctl start/stop/restart rocketmq-mqnamesrv
启动/停止/重启 Broker 服务
systemctl start/stop/restart rocketmq-mqbroker配置文件
NameServer没有配置文件,直接可以启动
Broker配置文件:配置基本上一致,需要更改下表的一些内容
| brokerClusterName | brokerName | brokerRole | brokerId |
| —————– | ———- | ———– | ——– |
| iotcls | broker01 | SYNC_MASTER | 0 |
| iotcls | broker01 | SLAVE | 1 |
| iotcls | broker02 | SYNC_MASTER | 0 |
| iotcls | broker02 | SLAVE | 1 |1
2
3
4
5
6
7
8
9
10
11#broker.conf.j2
brokerClusterName=iotcls
brokerIP1={{broker_name}}-{{broker_id}}.rocketmq.test.com
brokerIP2={{broker_name}}-{{broker_id}}.rocketmq.test.com
brokerName={{broker_name}}
brokerId={{broker_id}}
deleteWhen=04
fileReservedTime=48
brokerRole={{broker_role}}
flushDiskType=SYNC_FLUSH
namesrvAddr=nameserver1.rocketmq.test.com:9876;nameserver2.rocketmq.test.com:9876需要注意的是:
多网卡环境:需要配置brokerIP1(broker ip)和brokerIP2(ha ip),brokerIP1注册到NameServer,brokerIP2 这个ip是master和slave同步数据的ip,如果不配置,默认会选择第一个网卡。
BrokerName,master和slave组成一个broker group,通过broker name来区别是否是一个broker group。
ansible role 一键部署
1
ansible-playbook -i environments/dev/hosts rocketmq.yml
ansible role配置文件说明:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25rocketmq
├── README.md
├── defaults
│ └── main.yml
├── files
├── handlers
│ └── main.yml
├── meta
│ └── main.yml
├── tasks
│ ├── install.yml
│ ├── main.yml
│ ├── mqbroker.yml
│ └── mqnamesrv.yml
├── templates
│ ├── broker.conf.j2
│ ├── rocketmq-mqbroker.service
│ └── rocketmq-mqnamesrv.service
├── tests
│ ├── README.md
│ ├── etc_hosts
│ ├── inventory
│ └── test.yml
└── vars
└── main.yml1
2
3
4
5
6
7
8
9
10
11
12
13
14
15#rocetmq.yml
- name: run rocketmq role for all hosts
hosts: rocketmq
roles:
- { role: commons, tags: ["hosts"] }
- name: run mqnamesrv
hosts: mqnamesrv
roles:
- { role: "rocketmq", rocket_type: "mqnamesrv"}
- name: run mqbroker
hosts: mqbroker
roles:
- { role: "rocketmq", rocket_type: "mqbroker"}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21hosts
[rocketmq:children]
mqnamesrv
mqbroker
[mqnamesrv]
192.168.59.2
192.168.59.3
[mqbroker]
192.168.59.3 broker_name="broker03" broker_id=0 broker_role=SYNC_MASTER
192.168.59.4 broker_name="broker01" broker_id=0 broker_role=SYNC_MASTER
192.168.59.5 broker_name="broker01" broker_id=1 broker_role=SLAVE
192.168.59.6 broker_name="broker02" broker_id=0 broker_role=SYNC_MASTER
192.168.59.7 broker_name="broker02" broker_id=1 broker_role=SLAVE
[mqbroker:vars]
PROJECT_NAME=mqbroker
[mqnamesrv:vars]
PROJECT_NAME=mqnamesrv1
2
3
4
5
6
7
8
9
10
11# broker.conf.j2
brokerClusterName=iotcls
brokerIP1={{broker_name}}-{{broker_id}}.rocketmq.test.com
brokerIP2={{broker_name}}-{{broker_id}}.rocketmq.test.com
brokerName={{broker_name}}
brokerId={{broker_id}}
deleteWhen=04
fileReservedTime=48
brokerRole={{broker_role}}
flushDiskType=SYNC_FLUSH
namesrvAddr=nameserver1.rocketmq.test.com:9876;nameserver2.rocketmq.test.com:98761
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36# role-rocketmq tasks/main.yml
# install jdk1.8
- include: ../../commons/tasks/jdk.yml
- name: install apache-rocketmq
yum: name=apache-rocketmq state=present
tags: install_apache-rocketmq
- name: Update mqnamesrv.service
template:
src: rocketmq-mqnamesrv.service
dest: "/etc/systemd/system/"
force: true
owner: root
group: root
mode: "0755"
notify: reload systemd
when: rocket_type == 'mqnamesrv'
- include: mqbroker.yml
when: rocket_type == 'mqbroker'
- name: enable apache-rocketmq
service:
name: "rocketmq-{{ rocket_type }}"
state: started
enabled: True
handlers:
- name: reload systemd
command: "systemctl daemon-reload"
- name: restart apache-rocketmq
service:
name: "rocketmq-{{ rocket_type }}"
state: restarted1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21# role-rocketmq tasks/mqbroker.yml
- name: Update mqbroker.service
template:
src: rocketmq-mqbroker.service
dest: "/etc/systemd/system/"
force: true
owner: root
group: root
mode: "0755"
notify: reload systemd
- name: update conf for mqbroker
template:
src: broker.conf.j2
dest: "/opt/apache-rocketmq/conf/broker.conf"
force: true
owner: root
group: root
mode: "0755"
notify: restart apache-rocketmq4) 集群扩容
rocketmq各个组件都支持横向扩容:
| 组件 | 扩容 |
| ———- | ——————— |
| Producer | 横向扩容,添加机器 |
| Consumer | 横向扩容,添加机器,数量<=队列数(分区) |
| NameServer | 横向扩容,无状态 |
| Broker | 横向扩容,新增topic会自动负载 |5) WEB管理(rocketmq-console)
通过web可以查看集群状态,查看topic信息以及创建更改topic,管理producer和consumer等。
1
2
3安装 && 启动
mvn clean package -Dmaven.test.skip=true
java -jar target/rocketmq-console-ng-1.0.0.jar用户手册:https://github.com/apache/rocketmq-externals/blob/master/rocketmq-console/doc/1_0_0/UserGuide_CN.md