消息中间件RocketMQ4.X急速入门

奋斗吧
奋斗吧
擅长邻域:未填写

标签: 消息中间件RocketMQ4.X急速入门 博客 51CTO博客

2023-07-17 18:24:09 75浏览

消息中间件RocketMQ4.X急速入门,一文搞定RocketMQ全部知识点


文章目录

  • 1.RocketMQ4.X简介
  • 2.Springboot整合RocketMQ4.X
  • 3.RocketMQ4.X集群架构
  • 3.1.RocketMQ4.X集群模式架构
  • 3.2.RocketMQ主从模式搭建
  • 3.3.RocketMQ4.X主从同步总结
  • 4.RocketMQ4.X生产者配置
  • 4.1.生产者常见核心配置
  • 4.2.RocketMQ消息发送状态
  • 4.3.RocketMQ生产和消费消息重试处理
  • 4.4.RocketMQ发送消息的多种场景
  • 4.5.RocketMQ延迟消息介绍和应用
  • 4.6.RocketMQ生产者指定队列发送
  • 4.7.RocketMQ顺序消息讲解
  • 5.RocketMQ4.X消费者配置
  • 5.1.RocketMQ消费者核心配置
  • 5.2.消费者集群模式和广播模式
  • 5.3.Tag实战和消息过滤
  • 5.4.PushConsumer和PullConsumer
  • 6.消息队列Offset和CommitLog
  • 6.1.RocketMQ消息偏移量Offset
  • 6.2.RocketMQ消息存储CommitLog
  • 6.3.ZeroCopy零拷贝技术
  • 7.RocketMQ实现分布式事务
  • 7.1.分布式事务介绍
  • 7.2.RocketMQ分布式事务消息实战
  • 8.RocketMQ搭建双主双从架构

1.RocketMQ4.X简介

阿里开源消息队列RocketMQ介绍

  • Apache RocketMQ作为阿里开源的一款高性能、高吞吐量的分布式消息中间件。
  • 特点:
  • 支持Broker和Consumer端消息过滤,既可以在Broker端过滤消息,也可以在Consumer端过虑消息。
  • 支持发布订阅,点对点模型。
  • 支持消费端pull消息和Broker来push消息。
  • 单一队列百万消息、亿级消息堆积。
  • 支持单master节点,多master节点,多master多slave节点
  • 任意一点都是高可用,水平扩展,Producer、Consumer、队列都可以分布式
  • 消息失败重试机制、支持特定level的定时消息
  • 新版本底层采用Netty
  • 4.3x支持分布式事务
  • 适合金融类业务,高可用性跟踪和审计功能
  • 概念:
  • Producer:消息生产者
  • Producer Group:消息生产者组,发送同类消息的一个消息生产者组
  • Consumer:消费者
  • Consumer Group:消费同类消息的多个实例
  • Tag:标签,子主题(二级分类)对topic的进一步细化,用于区分同一主题下的不同业务的消息。
  • Topic:主题,如订单类消息,queue式消息的物理管理单位,而topic是逻辑管理单位,一个topic下可以有多个queue。默认自动创建4个,手动创建是8个。
  • Message:消息,每个message必须指定一个topic。
  • Broker:MQ程序,接收生产的消息,提供给消费者消费的程序,一个Broker就是一个MQ节点。
  • Name Server:给生产者和消费者提供路由信息,提供轻量级的服务发现、路哟与、元数据信息,可以多个部署,相互独立(比Zookeeper更轻量)。
  • Offset:偏移量,每个消费者都会维护一个当前最大的消费偏移量。
  • commit log:消费存储会写在Commit log文件里面,消息持久化存储的地方。

2.Springboot整合RocketMQ4.X

(1)创建SpringBoot项目,加入相关依赖

<dependency>
   <groupId>org.apache.rocketmq</groupId>
   <artifactId>rocketmq-client</artifactId>
   <version>4.3.0</version>
</dependency>

(2)Message对象

  • topic:主题名称
  • tag:标签,用于过滤
  • key:消息唯一标识,可以是业务字段组合
  • body:消息体,字节数组

注意:发送消息到Broker,需要判断是否有此topic,启动Broker的时候,本地建议开启自动创建topic,生产环境加以关闭自动化创建topic。建议手工创建topic,如果靠程序自动创建,然后在投递消息,会出现延迟的情况。

概念模型:一个topic下面对应多个queue,可以创建Topic时指定,如订单类topic

(3)生产者发送消息编码实战

@Component
public class PayProducer{
    private String producerGroup = "pay_producer_group";
    public DefaultMQProducer producer;
    public PayProducer(){
        //创建Producer对象,设置生产者组
        producer = new DefaultMQProducer(producerGroup);
        //指定NameServer地址,多个地址逗号隔开
        producer.setNamesrvAdder(JmsConfig.NAME_SERVER);
        //开启生产者
        start();
    }
    //得到Producer对象
    public DefaultMQProducer getProducer(){
        return this.producer;
    }
    
    /**
     * 对象在使用之前必须调用一次,只能初始化一次
     */
    public void start(){
        try{
             this.producer.start(); 
        }catch(MQClientException e){
             e.printStackTrace();    
        }
    }
     /**
     * 一般在应用上下文,使用上下文监听器,进行关闭
     */
    public void shutdown(){
        this.producer.shutdown();
    }
    
}
public class JmsConfig {

    public static final String NAME_SERVER = "8.140.116.67:9876";

    public static final String TOPIC  = "pay_topic";

}
@RestController
@RequestMapping("api/v1/pay")
public class PayController {

    @Autowired
    private PayProducer payProducer;

    @RequestMapping("/pay_cb")
    public Object callback(String text) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
		Message message = new Message(JmsConfig.TOPIC,"taga",text.getBytes());
        
        SendResult sendResult = payProducer.getProducer().send(message);
        
        System,out,println(sendResult);
        
        return sendResult;
		
    }

}

(4)常见错误一

MQClientException: No route info of this topic, TopicTest1
原因:Broker禁止自动创建Topic,且用户没有通过手工取创建此Topic,或者broker和nameServer网络不通
解决:sh bin/mqbroker -m 查看配置信息
autoCreateTopicEnable=true 则自动创建topic
centos7关闭防火墙 systemctl stop fitewalld
手动创建topic

(5)常见错误二

org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException:
sendDefaultImpl call timeout
原因:阿里云存在多个网卡,rocketmq都会根据当前网卡选择一个ip使用,当你的机器有多个网卡时,很有可能会有问题,阿里云机器有两个网卡,因此需要broker.conf来配置当前公网ip,然后重启broker
broker.conf新增:brokerIP1=公网IP
在以配置文件的方式重启broker

(6)常见错误三

控制台查询不了数据,连接提示10909错误
原因:Rocket默认开启了VIP通道,VIP通道端口为10911-2=10909
解决:阿里云安全组加上10909端口

(7)其他问题

#%E5%BC%82%E5%B8
%B8%E8%AF%B4%E6%98%8E
https://www.jianshu.com/p/bfd6d849f156

(8)消费者消费消息编码实战

@Component
public class PayConsumer {

    private DefaultMQPushConsumer consumer;

    private String consumerGroup = "pay_consumer_group";

    public PayConsumer(){
        //new一个Consumer对象,这是ConsumerGroup
        consumer = new DefaultMQPushConsumer(consumerGroup);
        //设置NameServer地址
        consumer.setNamesrvAddr(JmsConfig.NAME_SERVER);
        //设置消费的策略,从最后一个开始消费
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        try {
            //订阅主题
            consumer.subscribe(JmsConfig.TOPIC,"*");
            //注册消息监听
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {

                    //业务逻辑处理
                    MessageExt message = msgs.get(0);
                    String body = new String(msgs.get(0).getBody());
                    String tags = msgs.get(0).getTags();
                    String keys = msgs.get(0).getKeys();
                    System.out.println("message:"+message+"--body:"+body+"--tags:"+tags+"--keys:"+keys);
                    //消费完成返回给broker消费的状态,成功
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });

        } catch (MQClientException e) {
            e.printStackTrace();
        }
        try {
            //开启消费者
            consumer.start();
            System.out.println("consumer.start()...");
        } catch (MQClientException e) {
            e.printStackTrace();
        }

    }
}

(9)常见问题

1、Caused by: org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to <172.17.42.1:10911> failed 

2、com.alibaba.rocketmq.client.exception.MQClientException: Send [1] times, still failed, cost [1647]ms, Topic: TopicTest1, BrokersSent: [broker-a, null, null]

3、org.apache.rocketmq.client.exception.MQClientException: Send [3] times, still failed, cost [497]ms, Topic: TopicTest, BrokersSent: [Book-Air.local, 	MacBook-Air.local, MacBook-Air.local]
解决:多网卡问题处理
	1、设置producer:  producer.setVipChannelEnabled(false);
	2、编辑ROCKETMQ 配置文件:broker.conf(下列ip为自己的ip)
		namesrvAddr = 192.168.0.101:9876
		brokerIP1 = 192.168.0.101

4、DESC: service not available now, maybe disk full, CL:
	解决:修改启动脚本runbroker.sh,在里面增加一句话即可:		
	JAVA_OPT="${JAVA_OPT} -Drocketmq.broker.diskSpaceWarningLevelRatio=0.98"
	(磁盘保护的百分比设置成98%,只有磁盘空间使用率达到98%时才拒绝接收producer消息)
	
常见问题处理

3.RocketMQ4.X集群架构

3.1.RocketMQ4.X集群模式架构

(1)单机模式

消息中间件RocketMQ4.X急速入门_数据

优点:本地开发测试用,配置简单,同步刷盘消息不会丢失

缺点:不可靠,服务宕机,导致服务不可用,数据丢失

(2)主从模式(异步复制、同步双写)

消息中间件RocketMQ4.X急速入门_数据_02

优点:同步双写不丢失数据,异步复制存在少量数据丢失,主节点宕机,从节点可以对外提供消费服务,但是不提供写服务。

缺点:主备有短暂的消息延迟,毫秒级,主节点宕机后,目前不支持自动切换,需要手动设置从节点成为主节点。

(3)双主、多主模式

消息中间件RocketMQ4.X急速入门_数据_03

优点:配置简单,可以根据配置RAID10磁盘阵列保证消息的可靠性,异步刷盘丢失少量消息

缺点:master宕机期间,未被消费的消息不能被消费,只有当节点恢复才会恢复消费

(4)双主双从,多主多从模式(异步复制)

消息中间件RocketMQ4.X急速入门_后端_04

优点:磁盘损坏,消息丢失的量少,消息消费的实时性不受影响,Master宕机后,会从Slave消费消息

缺点:主备消息同步由延迟,Master宕机会存在少量信息丢失

(5)双主双从,多主多从模式(同步双写)

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-kT8IjZQB-1667542108371)(图片\3.2(4).jpg)]

优点:同步双写方式,主备都写成功,才返回成功,服务可用性与数据的可用性都非常高

缺点:新跟那个相比异步复制要低,主节点宕机后不支持自动切换主机

(6)消息可靠性之同步、异步刷盘

异步刷盘:每个节点,不论是从节点还是主节点都会有一个服务内存数据和磁盘同步的过程,异步刷盘就是当数据到达节点内存后,就返回成功,采用异步复制的方式给磁盘同步数据。

同步刷盘:同步刷盘就是当节点中的数据全部同步到磁盘中时,才会返回确认成功。

(7)消息可靠性之同步、异步复制

同步复制:数据安全性搞,性能相对差一点

异步复制:数据安全性低,性能会高一点

最终建议:同步双写,异步刷盘

消息中间件RocketMQ4.X急速入门_java_05

3.2.RocketMQ主从模式搭建

机器列表

192.168.10.200
192.168.10.201

(1)修改RocketMQ(runserver.sh,runbroker.sh)

vim runserver.sh
JAVA_OPT="${JAVA_OPT} -server -Xms528m -Xmx528m -Xmn256m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"

vim runbroker.sh
JAVA_OPT="${JAVA_OPT} -server -Xms528m -Xmx528m -Xmn256m"

(2)启动两个节点的nameServer

nohup sh runserver.sh

(3)编辑broker配置文件并启动两个broker

消息中间件RocketMQ4.X急速入门_java_06


消息中间件RocketMQ4.X急速入门_rocketmq_07


消息中间件RocketMQ4.X急速入门_java_08

主节点:
nohup sh bin/mqbroker -c conf/2m-2s-async/broker-a.properties &

配置:
namesrvAddr=192.168.159.129:9876;192.168.159.130:9876
brokerClusterName=XdclassCluster
brokerName=broker-a
brokerId=0
deleteWhen=04
fileReservedTime=48
brokerRole=ASYNC_MASTER
flushDiskType=ASYNC_FLUSH
从节点:
nohup sh bin/mqbroker -c conf/2m-2s-async/broker-a-s.properties &

配置:
namesrvAddr=192.168.159.129:9876;192.168.159.130:9876
brokerClusterName=XdclassCluster
brokerName=broker-a
brokerId=1
deleteWhen=04
fileReservedTime=48
brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH

消息中间件RocketMQ4.X急速入门_rocketmq_09

(4)安装控制台

修改事项
pom.xml  里面的rocketmq版本号
application.properties里面的nameserver

增加 rocketmq.config.namesrvAddr=192.168.159.129:9876;192.168.159.130:9876

mvn install -Dmaven.test.skip=true

java -jar rocketmq-console-ng-1.0.0.jar

3.3.RocketMQ4.X主从同步总结

  • Broker分为master和slave,一个master可以对应多个slave,一个slave只能对应一个master,master与slave通过相同的Broker Name来匹配,不同的broker id来定义时master还是slave。
  • Broker向所有的NameServer节点建立长连接,定时注册Topic和发送元数据信息。
  • NameServer定时扫描(默认2分钟)所有存货broker的连接,如果超时未响应则断开连接(心跳检测),但是consumer客户端感知不到,consumer定时30s从NameServer获取topic最新信息,所以broker不可用时,consumer最多也就30s就能发现broker宕机。
  • producer和consumer一样,在未发现broker宕机前发送的消息会失败
  • 只有master才会有写消息的操作,slave只能提供消费,同步的策略取决于master的配置
  • 客户端消费可以从master和slave消费,默认消费者都从master消费,如果master宕机后,客户端从NameServer中感知到Broker宕机,就会从slave消费,感知非实时,存在一定的之后性,slave不能保证master的消息100%同步过来,会有少量的消息丢失,但是一旦master恢复,未同步过去的消息会最终被消费掉。
  • 如果consumer实例的数量比message queue的总数量还要多,多出来的consumer将无法分到queue,也就无法消费消息,无法起到负载的作用,所以需要控制让queue的总数量大于等于consumer的数量。

4.RocketMQ4.X生产者配置

4.1.生产者常见核心配置

  • **compressMsgBodyOverHowmuch:**消息超过默认字节4096后进行压缩
  • **retryTimesWhenSendFailed:**失败重新发送的次数
  • **maxMessageSize:**最大消息配置,默认128k
  • **topicQueueNums:**主题下队列的数量,默认是4个
  • **autoCreateTopicEnable:**是否自动创建topic,开发建议为true,生产建议为false
  • **defaultTopicQueueNums:**自动创建的Topic创建的默认队列数
  • **autoCreateSubscriptionGroup:**是否允许Broker自动创建订阅组,建议线下开启,生产关闭。
  • **brokerClusterName:**集群名称
  • **brokerId:**0表示主节点,大于0表示从节点
  • **brokerIP1:**broker服务器地址,一定要配公网ip,否则外机访问不到
  • **brokerRole:**broker角色 ASYNC_MASTER /SYNC_MASTER /SLAVE
  • **deleteWhen:**每天执行删除过期文件的时间,默认是每天凌晨4点
  • **flushDiskType:**刷盘策略,默认为ASYNC_FLUSH(异步刷盘),另外是SYNC_FLUSH(同步刷盘)
  • **listenPort:**Broker监听的端口号
  • **mapedFileSizeCommitLog:**单个conmmitlog文件大小,默认是1GB
  • **mapedFileSizeConsumeQueue:**ConsumerQueue每个文件默认30w条,可以根据项目进行调整
  • **storePathRootDir:**存储消息以及一些配置信息的根目录,默认是${home}/store
  • **storePathCommitLog:**commitlog存储目录默认为${storePathRootDir}/commitlog
  • **storePathIndex:**消息索引存储路径
  • **syncFlushTimeout:**同步刷盘超时时间
  • **diskMaxUsedSpaceRatio:**检测可用的磁盘空间大小,超过后写入会报错

4.2.RocketMQ消息发送状态

Broker消息投递状态讲解

  • FLUSH_DISK_TIMEOUT
  • 没有在规定的时间内完成刷盘(刷盘策略为SYNC_FLUSH才会出现这个错误)
  • FLUSH_SLAVE_TIMEOUT
  • 主从模式下,broker的brokerRole是SYNC_MASTER,没有在规定时间内同步到slave
  • SLAVE_NOT_AVAILABLE
  • 主从模式下,broker的brokerRole是SYNC_MASTER,没有找到对应匹配的slave
  • SEND_OK
  • 发送成功,一切正常

4.3.RocketMQ生产和消费消息重试处理

(1)生产者Producer重试(异步和SendOneWay下配置无效)

  • 消息重投(保证数据的可靠性),本身内部支持重试,默认是2次,可以修改配置
//设置生产者发送broker失败重复发送的次数
producer.setRetryTimesWhenSendFailed(5);

(2)消费者重试

  • 消息处理异常、broker端到consumer端各种问题,如网络原因闪断,消费处理失败,ACK返回失败等等。
  • 重试间隔时间配置如下,默认重试16次
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
  • 超过重试次数人工补偿
  • 消费端去重
  • 一条消息无论重试多少次,这些重试消息的 Message ID,key 不会改变。
  • 消费重试只针对集群消费方式生效;广播方式不提供失败重试特性,即消费失败后,失败消息不再重试,继续消费新的消息.
//设置广播模式,默认集群模式,广播模式下失败不会重新消费,继续消费下一个
consumer.setMessageModel(MessageModel.BROADCASTING);

4.4.RocketMQ发送消息的多种场景

(1)SYNC

//默认是同步发送
SendResult sendResult = payProducer.getProducer().send(message);
  • 同步响应
  • 应用场景:重要通知邮件、报名短信通知、营销短信通知等等

(2)ASYNC

//
SendResult sendResult = payProducer.getProducer().send(message,new SendCallback(){
    //发送成功处理函数
    onSuccess(){}
    //异常处理函数
    onException(){}
});
  • 异步响应
  • 应用场景:对RT时间敏感,可以支持更高的并发,会i到成功触发向对应的业务,比如注册成功后通知信人优惠卷发放系统。

(3)ONEWAY

//单向发送
payProducer.getProducer().sendOneway(message);
  • 无需等待响应
  • 使用场景:主要是日志收集,使用与某ixe好事非常短,单对可靠性要寻求不高的场景,LogServer中常用,只负责发送消息,不关注结果。

汇总对比

发送方式

发送TPS

发送结果反馈

可靠性

同步发送(SYNC)



不丢失

异步发送(ASYNC)



不丢失

单向发送(ONEWAY)

最快


可能丢失

4.5.RocketMQ延迟消息介绍和应用

什么是延迟消息

Producer将消息发送到RocketMQ服务端,单是并不希望消息马上被消费,而是推迟到某一个时间点之后在去消费,目前支持固定的精度消息

源码:rocketmq-store > MessageStoreConfig.java 属性messageDelayLevel

"1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

使用message.setDelayTimeLevel(xxxx) //xxx是级别,1表示配置里面的第一个级别,2表示第二个级别

定时消息:目前rocketmq开源版本还不支持,商业版本有,两者是用场景类似

使用场景

  • 通过消息触发一些定时任务,比如在某一固定时间点向用户发送提醒消息。
  • 消息生产和消费有时间窗口要求,淘宝中超时支付关单

4.6.RocketMQ生产者指定队列发送

  • RocketMQ生产者之MessageQueueSelector指定队列发送
  • 应用场景:顺序消息,分摊负载
  • 默认topic下的queue数量是4,可以配置
producer.getProducer().send(message,new MessageQueueSelector(){
    select(List<MessageQueue> mqs, Message msg, Object arg){
				Integer queueNum = (Integer)arg;
				return mqs.get(queueNum);
			}
},0);
  • 支持同步,异步发送指定的MessageQueue
  • 选择的queue数量必须小于配置的,否则会出错
RestController
@RequestMapping("api/v1/pay")
public class PayController {

    @Autowired
    private PayProducer payProducer;

    @RequestMapping("/pay_cb")
    public Object callback(String text) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {

        Message message = new Message(JmsConfig.TOPIC,"taga","6688",text.getBytes());

        /**
         * 异步发送到指定队列
         */
        payProducer.getProducer().send(message, new MessageQueueSelector() {
            @Override
            public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                int queueNum = Integer.parseInt(arg.toString());
                return mqs.get(queueNum);
            }
        }, 3, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                System.out.println(sendResult);
            }

            @Override
            public void onException(Throwable e) {
                e.printStackTrace();
            }
        });

        /**
         * 同步发送到指定队列
         */
       /* SendResult sendResult = payProducer.getProducer().send(message, new MessageQueueSelector() {

            @Override
            public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                //查找队列
                int queueNum = Integer.parseInt(arg.toString());
                return mqs.get(queueNum);
            }
        },0);
        System.out.println(sendResult);*/

        /**
         * 单向发送
         */
     /*   message.setDelayTimeLevel(3);

        SendResult sendResult = payProducer.getProducer().send(message);

        //payProducer.getProducer().sendOneway(message);

        System.out.println(sendResult);*/

        return new HashMap<>();
    }

}

4.7.RocketMQ顺序消息讲解

(1)什么是顺序消息:消息的生产和消费顺序一致

  • 全局顺序:topic下面全部消息都要有序
  • 性能要求不高,所有的消息严格按照FIFO原则进行发布和消费的场景,并行度成为消息的瓶颈,吞吐量不够
  • 局部顺序:只要保证一组消息被顺序消费即可(RocketMQ中使用)
  • 性能要求高,电商的订单创建,同一个订单相关的创建订单消息、订单支付消息、订单物流消息、订单交易成功消息都会按照先后顺序来发布消费
  • 顺序发布:对于指定的一个Topic,客户端将按照一定的先后顺序发送消息
  • 顺序消费:对于指定的一个Topic,按照一定的先后顺序消费接受的消息,机先发送的消息一定会被客户端接收到
  • 注意:
  • 顺序消息不支持广播模式
  • 顺序消息不支持异步发送

(2)RocketMQ中顺序消息的使用

  • 生产端保证发送消息的有序,且发送同一个topic下的queue里面
  • 生产端根据MessageQueueSelector可以自定义策略,根据同个业务的订单id放置到同个queue里面,如订单号取模,同一条订单的消息就会被放在同一个队列中
public MessageQueue select(List<MessageQueue> mqs,Message msg,Object arg){
    //订单id进行取模
    Long orderId  = Integer.parseLong(arg.toString());
    long index = orderId%mqs.size();
    return mqs.get((int)index);
}
  • 消费端要在保证消费同个topic里面的同个queue,不应该用MessageListenerConcurrently,应该使用MessageListenerOrderly,自带单线程消费消息,不能在Consumer端使用多线程去消费,消费端分配到queue数量是固定的,集群会锁住当前正在消费的队列集群的消息,所以会保证顺序消费。

(3)顺序消息实战

  • 生产者编码保证同步发送和发送指定队列
@RestController
@RequestMapping("api/v1/pay")
public class PayController {

    @Autowired
    private PayProducer payProducer;

    @RequestMapping("/pay_cb")
    public Object callback(String text) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {

        List<ProducerOrder> producerList = ProducerOrder.getProducerList();

        for (ProducerOrder producerOrder : producerList) {

            Message message = new Message(JmsConfig.TOPIC,"",producerOrder.getOrderId()+"",producerOrder.toString().getBytes());

            SendResult send = payProducer.getProducer().send(message, new MessageQueueSelector() {
                @Override
                public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                    Long orderId = (Long)arg;
                    //订单号和队列集合长度取模
                    long index = orderId % mqs.size();
                    return mqs.get((int)index);
                }
            },producerOrder.getOrderId());

            MessageQueue messageQueue = send.getMessageQueue();
            int queueId = messageQueue.getQueueId();
            System.out.println("orderId:"+producerOrder.getOrderId()+"---orderType:"+producerOrder.getType()+"---queueId:"+queueId);
            System.out.println("---------------------------");
        }

        return new HashMap<>();
    }

}
  • 消费者编码用MessageListenerOrderly
concurrently多线程消费
@Component
public class PayConsumer {

    private DefaultMQPushConsumer consumer;

    private String consumerGroup = "pay_consumer_group";

    public PayConsumer() throws MQClientException {
        consumer = new DefaultMQPushConsumer(consumerGroup);
        consumer.setNamesrvAddr(JmsConfig.NAME_SERVER);
        consumer.setMessageModel(MessageModel.CLUSTERING);
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);

            consumer.subscribe("","*");
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {

                    MessageExt message = msgs.get(0);

                    int times = message.getReconsumeTimes();

                    try{

//                        if(message.getKeys().equalsIgnoreCase("6688")){
//                            throw new Exception();
//                        }

                        String body = new String(msgs.get(0).getBody());
                        String tags = msgs.get(0).getTags();
                        String keys = msgs.get(0).getKeys();
                        System.out.println("message:"+message+"--body:"+body+"--tags:"+tags+"--keys:"+keys);
                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                    }catch (Exception e){

                        if(times>=3){
                            System.out.println("消息失败,转入人工处理");
                            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                        }

                        System.out.println("消息异常,重新投递");
                        System.out.println("消息重投次数:"+times);
                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                    }
                }
            });
            consumer.start();
            System.out.println("consumer.start()...");

    }

}
orderly单线程消费(保证顺序消费必须用单线程消费)
@Component
public class PayOrderlyConsumer {

    private DefaultMQPushConsumer consumer;

    private String consumerGroup = "pay_orderly_consumer_group";

    public PayOrderlyConsumer() throws MQClientException {
        consumer = new DefaultMQPushConsumer(consumerGroup);
        consumer.setNamesrvAddr(JmsConfig.NAME_SERVER);
        consumer.setMessageModel(MessageModel.CLUSTERING);
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);

            consumer.subscribe(JmsConfig.TOPIC,"*");
            consumer.registerMessageListener(new MessageListenerOrderly() {
                @Override
                public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext consumeConcurrentlyContext) {

                    MessageExt message = msgs.get(0);

                    int times = message.getReconsumeTimes();

                    try{

                        String body = new String(msgs.get(0).getBody());
                        String tags = msgs.get(0).getTags();
                        String keys = msgs.get(0).getKeys();
                        System.out.println("message:"+message+"--body:"+body+"--tags:"+tags+"--keys:"+keys);
                        return ConsumeOrderlyStatus.SUCCESS;
                    }catch (Exception e){
                        System.out.println("消息异常,重新投递");
                        System.out.println("消息重投次数:"+times);
                        return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                    }
                }
            });
            consumer.start();
            System.out.println("consumer.start()...");

    }

}

Consumer会平衡分配queue的数量,并不是简单的禁止并发处理,而是为每个Consumer Queue加个锁,消费每个消息前,需要获取这个消息的所在锁,这样同个时间,同个queue不能并发处理,但是不同的queue的消息可以并发处理,采用分段锁Segment。

消息中间件RocketMQ4.X急速入门_JAVA_10

5.RocketMQ4.X消费者配置

5.1.RocketMQ消费者核心配置

  • consumeFromWhere
  • CONSUME_FROM_FIRST_OFFSET:初次从消息队列的头部开始消费,即历史消息全部消费一遍,后续在消费就是接着上次消费的进度来消费
  • CONSUME_FROM_LAST_OFFSET:初次从消息队列的尾部开始消费,及历史消息全部消费一遍,后续在消费就是接着上次消费的进度来消费
  • CONSUME_FROM_TIMESTAMP:从某个时间点开始消费,默认是半小时以前,后须也是跟着上次消费的进度开始消费
  • allocateMessageQueueStrategy
  • 负载均衡算法,即消费者分配到queue的算法,默认值是AllocateMessageQueueAveragely即取模平均分配
  • offsetStore
  • 消息消费进度存储器,offsetStore有两个策略
  • LocalFileOffsetStore本地存储消费进度的具体实现,给广播模式使用
  • RemoteBrokerOffsetStore远程存储消费进度的具体实现,给集群模式使用,将消费进度存储在broker中
  • consumeThreadMin
  • 最小消费线程池数量
  • consumeThreadMax
  • 最大消费线程池数量
  • pullBatchSize
  • 消费者去broker拉取消息时,一次拉去多少条,默认是32条
  • consumeMessageBatchMaxSize
  • 单次消费时一次性拉取多少条消息,批量消费接口才有用,默认是1
  • messageModel
  • 消费者消费模式,CLUSTERING默认是集群模式,BROADCASTING广播模式

5.2.消费者集群模式和广播模式

  • Topic下队列的奇偶数回影响Customer个数里面的消费数量
  • 如果是4个队列,8个消息,4个节点则会各消费2条,如果不等,则会负载均衡分配不均
  • 如果consumer实例的数量比message queue的总数量还多的话,那么多出来的consumer实例将无法分到queue,也就无法消费消息,无法起到负载均衡的作用,锁以要控制让queue的总数量大于等于consumer的数量。

集群模式(默认)CONUSMEING

  • Consumer实例平均分摊消费生产发送的消息
  • 例子:订单消息,一般只消费一次

广播模式 BROADCASTING

  • 广播模式下消费消息:投递到broker的消息会被每个Consumer进行消费,一条消息会被多个Consumer进行消费,广播模式下ConsumerGroup暂时无用
  • 例子:群公告、每个人都需要消费这个消息

通过setMessageModel()方法来进行切换消费者模式

MessageModel.CONSUMEING \ MessageModel.BROADCASTING

5.3.Tag实战和消息过滤

(1)讲解RocketMQ里面的Tag作用和消息过滤原理

  • 一个Message只有一个Tag,tag是二级分类
  • 举例:订单类----数码订单,服装订单
  • 过滤分为Broker端和Consumer端过滤
  • Broker端过滤,减少了无用的消息进行网络传输,但是增加了broker 的负担
  • Consumer端过滤,完全可以根据业务需要进行筛选,但是增加了无用的消息的传输
  • 一般监听是"*",或者指定Tag,||运算,SLQ92,FilterServer等
  • tag性能高,逻辑简单
  • SQL92性能较差,支持复杂的逻辑,MessageSelector.bySql
  • 语法:>,<,=,IS NULL, OR, AND ,NOT等,sql where后面的基本都支持
  • 注意:消费者订阅关系要一直,不然会消费混乱,甚至会消息丢失
  • 订阅者关系一致,订阅关系由topic和tag组成,同一个group name,订阅的topic和tag必须是一样的

常见错误:

The broker does not support consumer to filter message by SQL92

broker.conf配置文件中配置
enablePropertyFilter=true
备注,修改之后要重启Broker

(2)消息过滤

RocketMQ的消息过滤方式有别于其他的消息中间件,是在订阅时,在做过滤。

Consume Queue的存储结构

消息中间件RocketMQ4.X急速入门_后端_11

在Broker端进行Message Tag比对,先遍历Conusme Queue,如果存储的Message Tag与订阅Message Tag不符合,则跳过,继续比对下一个,符合则传输给Consumer 。Consumer收到过滤的消息后,同样也是执行在broker端的操作,但是对别的真实的Message Tag字符串,而不是HashCode。

  • 在Conusme Queue中存储HashCode,是为了在Consume Queue中定长的方式存储,节约空间。
  • 过滤式不会访问Commit Log的数据,可以保证高效的过滤
  • 即使存在Hash冲突,也可以在Conusmer端进行修正

5.4.PushConsumer和PullConsumer

(1)消费模式

  • Push:实时性高,但增加服务端的负载,消费能力不同,如果推送的过快,消费端会出现很多问题
  • pull:消费者从server拉去数据,主动权在消费者端,可控性好,但是间隔时间不好设置,间隔太短,则空请求,间隔太长则消息不能及时处理
  • 长轮询:Client请求Server端也就是Broker,Broker会保持一段时间的长连接,默认是15s,超过15s则返回空,在进行重新请求。缺点:服务端需要保持Consumer的请求,会占用资源,需要客户端连接数可控否则会堆积一堆连接

(2)PushConsumer本质就是长轮询

  • 系统收到消息后自动处理消息和offset,如果有新的Consumer加入会自动做负载均衡
  • 在broker端可通过longPollingEnable=true来开启长轮询
  • 消费端代码:DefaultMQPushConsumerImpl->pullMessage->PullCallback
  • 服务端代码:broker.longing
  • 虽然是push,但是代码里大量用了pull,因为使用了长轮询方式达到了push的效果也有pull的效果

(3)PullConsumer需要自己维护Offset

  • 获取MessageQueue遍历
  • 客户端维护Offset,需要用本地存储Offset,存储内存、磁盘、数据库等
  • 处理不同状态的消息,FOUND(新消息)、NO_NEW_MSG(不是新消息)、OFFSET_ILLRGL(非法偏移量)、NO_MATCHED_MSG(筛选结果不匹配)四种状态
  • 灵活性高可控性强,但是编码复杂度会高
  • 优雅关闭:释放资源和保存Offset,需要程序自己保护好Offset,特别是异常处理的时候

6.消息队列Offset和CommitLog

6.1.RocketMQ消息偏移量Offset

(1)什么是offset

  • message queue是无限长的数组,一条消息进来下标就会增长1,小标就是offset,消息在某个MessageQueue里的位置,通过offset的值可以定位这条消息,或者指示Consumer从这条消息开始消费。
  • message queue中maxOffset表示消息的最大offset,maxOffset并不是最新的那条Offset而是新的消息的offset+1,minOffset则是现存的最小的offset
  • fileReserveTime=48默认消息存储48消息后,消息就会被从磁盘中删除,message queue的min offset也就对应的增长,锁以比minOffset还要小的那些消息已经不在broker上了,就无法消费

(2)类型(父类是OffsetStore)

  • 本地文件类型
  • DefaultMQPushConsumer的BROADCASTING模式,各个Conusmer没有相互干扰,使用LocalFileOffsetStore,把Offset存储在本地
  • Broker代存储类型
  • DefalutMQPushConsumer的CLUSTERING模式,由Broker端存储和控制Offset的值,使用RemoteBrokerOffsetStore
  • 主要是记录消息消费的偏移量,由多个消费者进行消费
  • 集群模式下采用RemoteBrokerOffsetStore,Broker控制offset的值
  • 广播模式下采用LocalFileOffsetStore,消费端存储,消费者控制
  • 建议采用pushConsumer,RocketMQ自动维护OffsetStore,如果是PullConsumer则需要自己维护OffsetStore

6.2.RocketMQ消息存储CommitLog

(1)消息存储是由ConsumeQueue和CommitLog

  • ConsumeQueue是逻辑队列,CommitLog是真正存储消息文件的,存储的是指向物理存储地址
  • Topic下的每个message queue都有对应的ConsumeQueue文件,内容也会被持续化到磁盘
  • 默认地址:store/consumequeue/{topicname}/{queueid}/fileName
  • CommitLog是消息文件的存储地址。CommitLog的生成规则是每个文件默认达到1G开始切割,CommitLog文件名fileName,名字长度为20位,左边补零,剩余为起始偏移量,比如00000000000000000000代表了第一个文件,起始偏移量为0,文件大小为1G=1 073 741 824Byte,当着文件满了,第二个文件名字为00000000001073741824,起始便宜量为1073741825=上一个最大的偏移量+1,消息存储的时候会顺序写入,当文件满了则写入下一个文件。
  • Broker里面多个Topic,一个Topic中有多个MessageQueue,每个MessageQueue对应一个ConusmeQueue,ConsumeQueue里面记录的是消息在CommitLog里面的物理存储地址

6.3.ZeroCopy零拷贝技术

(1)高效原因

  • CommitLog顺序写,存储了MessageBody、MessageKey、Tag等消息
  • ConsumeQueue随机读+操作系统PageCache+零拷贝技术ZeroCopy

(2)Linux将一个File文件发送出去(Linux有两个上下文,内核态,用户态)

File经历了4次copy

  • 调用read,将文件从磁盘拷贝到了kernel内核态
  • CPU控制kernel态的数据copy到用户态
  • 调用write时,user态下的内容会copy到内核态的socket的buffer中
  • 最后将内核态socket buffer的数据copy到网卡设备中传送

(3)ZeroCopy

  • 请求kernel直接把disk的data传输给socket,而不是通过应用程序传输。ZeroCopy大大提高了应用程序的性能,较少不必要的内核缓冲区跟用户缓冲区的拷贝,从而减少CPU的开销和小勺kernel和user模式的上下文切换

(4)对应的零拷贝技术有mmap和sendfile

  • mmap:小文件传输快
  • sendfile:大文件传输快

7.RocketMQ实现分布式事务

7.1.分布式事务介绍

(1)什么是分布式事务

  • 来源:单体应用->拆分成分布式应用
  • 一个接口需要调用多个服务,且操作不同的数据库,数据一致性难保证。

(2)常见的解决方案

  • 2PC:两阶段提交,基于XA协议
  • TCC:Try(尝试)、Confirm(证实)、Cancel(取消)

(3)框架

  • GTS->开源Fescar:地址:https://github.com/alibaba/fescar
  • LCN:地址:https://github.com/codingapi/tx-lcn

(4)RocketMQ分布式事务消息

  • RocketMQ提供分布式事务功能,通过RocketMQ事务消息道道分布式事务的一致性。

(5)半消息Half Message

  • 暂时不能消费的消息,是Producer投递到Broker的消息,但是服务端还未收到生产者的二次确认,此时该消息被标记为“暂不能投递状态”,也就是不能投递给消费者,处于该状态的消息为半消息。

(6)整体交互流程

消息中间件RocketMQ4.X急速入门_rocketmq_12

  • Producer向Broker发送消息
  • 服务端将消息持久化成功后,向发送方ACK确认消息已经发送成功,此时消息为半消息
  • 发送方执行本地事务逻辑
  • 发送方(SpringBoot应用)根据本地事务的结果向服务器发送二次确认(Commit或者Rollback),服务端收到Commit状态则将版消息标记为可投递,订阅放最终消费该消息,如果收到Rollback状态,则订阅方不会消费该消息,三天后Broker进行删除该消息
  • 在断网或者应用重启的情况下,也就是发送方没有给服务器提交二次确认的结果,这回服务端会经过固定时间对发送方的这条消息进行回查
  • 发送方收到消息回查后,需要检查对应用消息的本地事务执行的最终结果
  • 发送方检查得到本地事务的最终状态在进行二次确认,服务端仍按照Commit或者rollback结果进行相应的处理

(7)RocketMQ事务消息的状态

  • COMMIT_MESSAGE:提交事务的消息,消费者可以消费此消息
  • ROLLBACK_MESSAGE:回滚事务消息,消息会在broker中删除,消费者不能在消费
  • UNKNOW:broker需要回查确认消息的状态

(8)关于事务消息的消费

  • 事务消息consumer端的消费方式和普通消息是一样的,RocketMQ能保证消息被consumer收到

7.2.RocketMQ分布式事务消息实战

(1)编写TransactionListenerImpl类实现TransactionListener接口

public class TransactionListenerImpl implements TransactionListener {

    /**
     * 本地事务方法
     * @param msg 消息体
     * @param arg 附加参数
     * @return
     */
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {

        System.out.println("executeLocalTransaction.....");

        System.out.println("tag:"+msg.getTags());

        System.out.println("transactionId:"+msg.getTransactionId());

        System.out.println("body:"+msg.getBody().toString());

        if("1".equalsIgnoreCase(arg.toString())){
            return LocalTransactionState.COMMIT_MESSAGE;
        }else if("2".equalsIgnoreCase(arg.toString())){
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }else if("3".equalsIgnoreCase(arg.toString())){
            return LocalTransactionState.UNKNOW;
        }

        return null;
    }

    /**
     * 回查方法
     * @param msg
     * @return
     */
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {

        System.out.println("checkLocalTransaction.....");

        System.out.println("tag:"+msg.getTags());

        System.out.println("transactionId:"+msg.getTransactionId());

        System.out.println("body:"+msg.getBody());

        return LocalTransactionState.COMMIT_MESSAGE;
    }


}

(2)编写TransactionProdcuer类

@Component
public class TransactionProducer {

    private String producerGroup = "transaction_producer_group";

    private TransactionListener transactionListener = new TransactionListenerImpl();

    private TransactionMQProducer producer;

    /**
     * 设置自定义线程池
     * corePoolSize:池中锁保存的核心线程数
     * maximumPoolSize:池中允许的最大线程数
     * keepActiveTime:非核心线程空闲等待新任务的最长时间
     * timeUnit:keepActiveTime参数的时间单位
     * blockingQueue:任务队列
     */
    ExecutorService executorService = new ThreadPoolExecutor(2, 5,
            100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000),
            new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread thread = new Thread(r);
            thread.setName("client-transaction-msg-check-thread");
            return thread;
        }
    });

    public TransactionProducer(){
        producer = new TransactionMQProducer(producerGroup);

        //设置生产者发送broker失败重复发送的次数
        producer.setRetryTimesWhenSendFailed(5);

        //指定NameServer地址,多个地址;隔开
        producer.setNamesrvAddr(JmsConfig.NAME_SERVER);

        //设置事务监听器
        producer.setTransactionListener(transactionListener);

        //设置线程池
        producer.setExecutorService(executorService);

        start();
    }

    public TransactionMQProducer getProducer(){
        return this.producer;
    }

    /**
     * 对象在使用之前必须调用一次,只能初始化一次
     */
    public void start(){
        try {
            this.producer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }
    }

    /**
     * 一般在应用上下文,使用上下文监听器,进行关闭
     */
    public void shutdown(){
        this.producer.shutdown();
    }

}

(3)TransactionController编写

@RestController
@RequestMapping("api/v1/tran")
public class TransactionController {

    @Autowired
    private TransactionProducer transactionProducer;

    @RequestMapping("/pay_cb1")
    public Object callback1(String tag,String otherArgs) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {

        Message message = new Message(JmsConfig.TOPIC,tag,"",tag.getBytes());

        SendResult sendResult = transactionProducer.getProducer().sendMessageInTransaction(message,otherArgs);

        System.out.println(sendResult);

        return new HashMap<>();
    }

}

8.RocketMQ搭建双主双从架构

  • 4台机器,2态部署NameServer,4台部署Broker,双主双从同步双鞋,异步刷盘
  • 环境准备:jdk、maven、rocketmq

(1)机器列表

server1 ssh root@192.168.10.200   部署nameServer  Broker-a
server2 ssh root@192.168.10.201   部署nameServer	 Broker-a-s
server3 ssh root@192.168.10.202				  Broker-b
server4 ssh root@192.168.10.203				  Broker-b-s

(2)修改RocketMQ配置文件(启动内存配置,runbroker.sh和runserver.sh)

vim runserver.sh
JAVA_OPT="${JAVA_OPT} -server -Xms528m -Xmx528m -Xmn256m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"

vim runbroker.sh
JAVA_OPT="${JAVA_OPT} -server -Xms528m -Xmx528m -Xmn256m"

(3)修改RocketMQ的配置文件并启动

broker-a主节点
nohup sh bin/mqbroker -c conf/2m-2s-sync/broker-a.properties &

namesrvAddr=192.168.159.133:9876;192.168.159.130:9876
brokerClusterName=XdclassCluster
brokerName=broker-a
brokerId=0
deleteWhen=04
fileReservedTime=48
brokerRole=SYNC_MASTER
flushDiskType=ASYNC_FLUSH

defaultTopicQueueNums=4
#是否允许自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=false

#存储路径,根据需求进行配置绝对路径,默认是家目录下面
#storePathRootDir= 
#storePathCommitLog
broker-a从节点
nohup sh bin/mqbroker -c conf/2m-2s-sync/broker-a-s.properties &

namesrvAddr=192.168.159.133:9876;192.168.159.130:9876
brokerClusterName=XdclassCluster
brokerName=broker-a
brokerId=1
deleteWhen=04
fileReservedTime=48
brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH

defaultTopicQueueNums=4
#是否允许自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=false

#存储路径,根据需求进行配置绝对路径,默认是家目录下面
#storePathRootDir= 
#storePathCommitLog
broker-b主节点
nohup sh bin/mqbroker -c conf/2m-2s-sync/broker-b.properties &

namesrvAddr=192.168.159.133:9876;192.168.159.130:9876
brokerClusterName=XdclassCluster
brokerName=broker-b
brokerId=0
deleteWhen=04
fileReservedTime=48
brokerRole=SYNC_MASTER
flushDiskType=ASYNC_FLUSH

defaultTopicQueueNums=4
#是否允许自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=false

#存储路径,根据需求进行配置绝对路径,默认是家目录下面
#storePathRootDir= 
#storePathCommitLog
broker-b从节点
nohup sh bin/mqbroker -c conf/2m-2s-sync/broker-b-s.properties &

namesrvAddr=192.168.159.133:9876;192.168.159.130:9876
brokerClusterName=XdclassCluster
brokerName=broker-b
brokerId=1
deleteWhen=04
fileReservedTime=48
brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH

defaultTopicQueueNums=4
#是否允许自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=false

#存储路径,根据需求进行配置绝对路径,默认是家目录下面
#storePathRootDir= 
#storePathCommitLog

(4)双主双从控制台搭建

修改事项
pom.xml  里面的rocketmq版本号
路径:/usr/local/software/rocketmq-externals-master/rocketmq-console/src/main/resources
application.properties  里面的nameServer


增加
rocket.config.namesrvAddr=192.168.159.133:9876;192.168.159.130:9876

mvn install -Dmaven.test.skip=true

java -jar rocketmq-console-ng-1.0.0.jar

(5)RocketMQ生产环境配置

  • Topic创建上线禁止自动创建
  • 一般是由专门的后台管理队列的CRUD,应用上线需要申请队列名称
  • 生产环境推荐配置
  • NameServer配置多个不同机器多个节点
  • 多Master每个Master带有Slave
  • 主从设置SYNC_MASTER同步双写
  • Producer用同部方式投递Broker
  • 刷盘策略为ASYNC_FLUSH
  • 性能思路分析
  • CPU:top
  • 网卡:sar -n DEV 2 10 、netstat -t
  • 磁盘:iostat -xdm 1
  • JVM:jstack、MAT、jinfo

    autoCreateTopicEnable=true
    #是否允许自动创建订阅组,建议线下开启,线上关闭
    autoCreateSubscriptionGroup=false

#存储路径,根据需求进行配置绝对路径,默认是家目录下面
#storePathRootDir=
#storePathCommitLog

```bash
broker-b从节点
nohup sh bin/mqbroker -c conf/2m-2s-sync/broker-b-s.properties &

namesrvAddr=192.168.159.133:9876;192.168.159.130:9876
brokerClusterName=XdclassCluster
brokerName=broker-b
brokerId=1
deleteWhen=04
fileReservedTime=48
brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH

defaultTopicQueueNums=4
#是否允许自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=false

#存储路径,根据需求进行配置绝对路径,默认是家目录下面
#storePathRootDir= 
#storePathCommitLog

(4)双主双从控制台搭建

修改事项
pom.xml  里面的rocketmq版本号
路径:/usr/local/software/rocketmq-externals-master/rocketmq-console/src/main/resources
application.properties  里面的nameServer


增加
rocket.config.namesrvAddr=192.168.159.133:9876;192.168.159.130:9876

mvn install -Dmaven.test.skip=true

java -jar rocketmq-console-ng-1.0.0.jar

(5)RocketMQ生产环境配置

  • Topic创建上线禁止自动创建
  • 一般是由专门的后台管理队列的CRUD,应用上线需要申请队列名称
  • 生产环境推荐配置
  • NameServer配置多个不同机器多个节点
  • 多Master每个Master带有Slave
  • 主从设置SYNC_MASTER同步双写
  • Producer用同部方式投递Broker
  • 刷盘策略为ASYNC_FLUSH
  • 性能思路分析
  • CPU:top
  • 网卡:sar -n DEV 2 10 、netstat -t
  • 磁盘:iostat -xdm 1
  • JVM:jstack、MAT、jinfo


好博客就要一起分享哦!分享海报

此处可发布评论

评论(0展开评论

暂无评论,快来写一下吧

展开评论

您可能感兴趣的博客

客服QQ 1913284695