RocketMQ普通消息的同步、异步、单向消息放送方式详解
标签: RocketMQ普通消息的同步、异步、单向消息放送方式详解 JavaScript博客 51CTO博客
2023-06-24 18:24:02 247浏览
普通消息-三种消息发送方式
1 发送同步消息
同步发送是指消息发送方发出数据后,同步等待,直到收到接收方发回响应之后才发下一个请求。这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知。

代码演示
package com.neu.rocketmq.example.quickstart;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
/**
* Created with IntelliJ IDEA.
*
* @Author: yqq
* @Date: 2023/06/19/18:31
* @Description:
*/
public class SyncProducer {
public static void main(String[] args) throws Exception {
//实例化生产者Producer
DefaultMQProducer producer = new DefaultMQProducer("group-test");
//设置NameServer地址
producer.setNamesrvAddr("node1:9876");
//启动Producer实例
producer.start();
//同步方法发送10条消息
for (int i = 0; i < 10; i++) {
//创建消息,并指定Topic,Tag和消息体
Message message = new Message("TopicTest", "TagA", ("Hello RocketMQ" + i).getBytes());
//同步发送方式
SendResult sr = producer.send(message);
//通过sr返回消息是否送达
System.out.printf("%s%n",sr);
}
//如果不再发送消息,关闭producer
producer.shutdown();
}
}
SendResult [sendStatus=SEND_OK, msgId=7F000001217818B4AAC260B91B3A0000, offsetMsgId=C0A858A100002A9F00000000000012AE, messageQueue=MessageQueue [topic=TopicTest, brokerName=node1.itcast.cn, queueId=2], queueOffset=3]
SendResult [sendStatus=SEND_OK, msgId=7F000001217818B4AAC260B91B3F0001, offsetMsgId=C0A858A100002A9F000000000000139C, messageQueue=MessageQueue [topic=TopicTest, brokerName=node1.itcast.cn, queueId=3], queueOffset=2]
SendResult [sendStatus=SEND_OK, msgId=7F000001217818B4AAC260B91B410002, offsetMsgId=C0A858A100002A9F000000000000148A, messageQueue=MessageQueue [topic=TopicTest, brokerName=node1.itcast.cn, queueId=0], queueOffset=2]
SendResult [sendStatus=SEND_OK, msgId=7F000001217818B4AAC260B91B420003, offsetMsgId=C0A858A100002A9F0000000000001578, messageQueue=MessageQueue [topic=TopicTest, brokerName=node1.itcast.cn, queueId=1], queueOffset=3]
SendResult [sendStatus=SEND_OK, msgId=7F000001217818B4AAC260B91B440004, offsetMsgId=C0A858A100002A9F0000000000001666, messageQueue=MessageQueue [topic=TopicTest, brokerName=node1.itcast.cn, queueId=2], queueOffset=4]
SendResult [sendStatus=SEND_OK, msgId=7F000001217818B4AAC260B91B440005, offsetMsgId=C0A858A100002A9F0000000000001754, messageQueue=MessageQueue [topic=TopicTest, brokerName=node1.itcast.cn, queueId=3], queueOffset=3]
SendResult [sendStatus=SEND_OK, msgId=7F000001217818B4AAC260B91B460006, offsetMsgId=C0A858A100002A9F0000000000001842, messageQueue=MessageQueue [topic=TopicTest, brokerName=node1.itcast.cn, queueId=0], queueOffset=3]
SendResult [sendStatus=SEND_OK, msgId=7F000001217818B4AAC260B91B480007, offsetMsgId=C0A858A100002A9F0000000000001930, messageQueue=MessageQueue [topic=TopicTest, brokerName=node1.itcast.cn, queueId=1], queueOffset=4]
SendResult [sendStatus=SEND_OK, msgId=7F000001217818B4AAC260B91B480008, offsetMsgId=C0A858A100002A9F0000000000001A1E, messageQueue=MessageQueue [topic=TopicTest, brokerName=node1.itcast.cn, queueId=2], queueOffset=5]
SendResult [sendStatus=SEND_OK, msgId=7F000001217818B4AAC260B91B4A0009, offsetMsgId=C0A858A100002A9F0000000000001B0C, messageQueue=MessageQueue [topic=TopicTest, brokerName=node1.itcast.cn, queueId=3], queueOffset=4]
- msgId
消息的全局唯一标识(RocketMQ的ID生成是使用机器IP和消息偏移量的组成),由消息队列 MQ 系统自动生成,唯一标识某条消息
- sendStatus
发送的标识:成功,失败等
- queueId
queueId是Topic的分区;Producer发送具体一条消息的时,对应选择的该Topic下的某一个Queue的标识ID。
- queueOffset
Message queue是无限长的数组。一条消息进来下标就会涨1,而这个数组的下标就是queueOffset,queueOffset是从0开始递增。

2 发送异步消息
异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。消息发送方在发送了一条消息后,不等接收方发回响应,接着进行第二条消息发送。发送方通过回调接口的方式接收服务器响应,并对响应结果进行处理。

代码演示
package com.neu.rocketmq.example.quickstart;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
/**
* Created with IntelliJ IDEA.
*
* @Author: yqq
* @Date: 2023/06/19/18:54
* @Description:
*/
public class AsynProducer {
public static void main(String[] args) throws Exception {
//实例化生产者
DefaultMQProducer producer = new DefaultMQProducer("group-test01");
//设置NameServer的地址
producer.setNamesrvAddr("node1:9876");
//启动Producer
producer.start();
for (int i = 0; i < 10; i++) {
final int index = i;
//创建消息,并指定Topic,Tag和消息体
Message msg = new Message("TopicTest01", "TagB", "OrderID888",
("Hello world").getBytes(RemotingHelper.DEFAULT_CHARSET));
//SendCallback接受异步返回结果的回调
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.printf("%s%n",sendResult);
}
@Override
public void onException(Throwable e) {
System.out.printf("% -10d Exception %s %n",index,e);
e.printStackTrace();
}
});
}
Thread.sleep(1000);
//如果不再发送消息,关闭Producer实例
producer.shutdown();
}
}
发送结果分析跟发送同步消息相同。
3 单向发送
这种方式主要用在不特别关心发送结果的场景,例如日志发送。单向(Oneway)发送特点为发送方只负责发送消息,不等待服务器回应且没有回调函数触发,即只发送请求不等待应答。此方式发送消息的过程耗时非常短,一般在微秒级别。

package com.neu.rocketmq.example.quickstart;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
/**
* Created with IntelliJ IDEA.
*
* @Author: yqq
* @Date: 2023/06/19/19:33
* @Description:
*/
public class OnceProducer {
public static void main(String[] args) throws Exception {
//实例化生产者
DefaultMQProducer producer = new DefaultMQProducer("group-test02");
//设置NameServer地址
producer.setNamesrvAddr("node1:9876");
//启动生产者
producer.start();
for (int i = 0; i < 10; i++) {
//创建消息,并指定Topic,Tag,和消息
Message msg = new Message("TopicTest02", "TagC",
("Hello RocketMQ" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
//发送单向消息,没有任何返回结果
producer.sendOneway(msg);
}
//关闭producer
producer.shutdown();
}
}

4 消息发送的权衡

好博客就要一起分享哦!分享海报
此处可发布评论
评论(0)展开评论
展开评论




