第12章 Spring Boot整合RabbitMQ(2次课)
分类: springboot 专栏: springboot3.0新教材 标签: rabbitmq
2024-02-19 19:34:56 1795浏览
消息中间件是基于队列与消息传递的技术,是在网络中为系统提供可靠的消息传递的软件系统,rabbitmq是常用的消息中间件,
简介
RabbitMQ是个由erlang开发的AMQP(Advanved Message Queue Protocol)的开源实现。翻译过来就是高级消息队列协议。说白了mq整体上是一个生产者与消费者模型,主要负责接收、存储的转发消息。可以把消息传递的过程想象成发货人将一个快件送到快递站,快递公司暂时存储后发货并最终将快件通过快递员送到收件人的手上,rabbitmq就好比由快件、快递站和快递员组成的一个系统,该系统的两端还维系着发货人和收件人,同样rabbitmq对应有消息的生产者和消息的消费者
应用场景
异步处理
案例:用户注册成功后,给该用户发送邮件和短信提醒其注册成功
采用消息队列后可以增强用户体验,让用户更快地得到响应。
应用解耦
当用户下单后要对库存系统进行一系列增删改查操作。传统方式的话耦合性太高。
或者是两个系统压根不是同一种语言开发的,比如一个是java开发的,一个是Python开发的,也可以用消息队列的方式做到解耦。
流量削峰
案例:10万人秒杀1万个奖品。
这里就可以创建一个1万个座位的消息队列,开始活动时,当这一万个座位已经抢完后,剩下的9万人再点击秒杀按钮的时候立马返回秒杀失败谢谢参与的提醒,而占到座位的那1万个客户再由队列慢慢处理消化。
核心概念
消息message
类似快件
由消息头和消息体组成。
消息体是不透明的,而消息头是由一系列的可选属性组成的。
这些属性routing-key (路由键)、priority (相对于其他消息的优先权)、delivery-mode (指出
该消息可能需要持久性存储)等。
消息的生产者publisher
类似发件人
也是一个向交换器发布消息的客户端应用程序。
交换器exchange
类似物流公司——顺丰
用来接收生产者发送的消息并将这些消息路由给到服务器中队列。
Exchange有4种类型: direct(默认), fanout, topic,和headers,不同类型的Exchange转发消息的策略有所区别。
消息队列queue
类似菜鸟驿站
用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。
一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。
绑定binding
用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。
Exchange和Queue的绑定可以是多对多的关系。
网络连接connection
比如一个TCP连接。
信道 channel
多路复用连接中的一条独立的双向数据流通道,信道是建立在真实的TCP连接内的虚拟连接。AMQP命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这都是通过信道完成,因为对于操作系统来说建立和销毁TCP都是非常昂贵的开销,所了信道的概念,以复用一条TCP连接。
消费者consumer
收件人
表示一个从消息队列中取得消息的客户端应用程序。
虚拟主机virtual host
虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个vhost本质上就是一 个mini版的RabbitMQ服务器,拥有自己的队列、交换器、绑定和权限机制。vhost是AMQP概念的基础,必须在连接时指定,RabbitMQ默认的vhost是/。
服务器实体broker
表示消息队列服务器实体
运行机制
核心就是交换机和绑定规则
原理图
单播
广播
主题
Windows下安装rabbitmq(了解)
参考文章:http://www.taodudu.cc/news/show-2912695.html
启动rabbitmq失败的话怎么解决:
我这边遇到的问题解决方法是:
进到RabbitMQ安装目录下的sbin下,注意:=后面的是写对应你的安装目录
set RABBITMQ_BASE=D:\RabbitMQ\rabbitmq_server-3.7.3\sbin
启动rabbitmq之前必须设置下这个才行(某前不大清楚啥情况)
启动rabbitmq
rabbitmq-server.bat
参考文章:http://t.csdnimg.cn/ybPQ1
最终效果:
默认账号密码都是guest
docker安装rabbitmq(推荐)
- 先安装docker
步骤如下:
- 检查系统内核版本。确保CentOS 7系统的内核版本高于3.10,可以通过命令`uname -r`查看当前的内核版本。
- 使用root权限登录CentOS。确保yum包更新到最新,使用命令
sudo yum update
- 卸载旧版本的Docker(如果安装过旧版本的话)。使用命令`sudo yum remove docker docker-common docker-selinux docker-engine`。
- 安装必要的软件包。使用命令`sudo yum install -y yum-utils device-mapper-persistent-data lvm2`。
- 设置Docker的yum源。可以使用阿里云的镜像源,使用命令`sudo yum-config-manager --add-repo https://download.docker.com/linux/centos/docker-ce.repo`或`sudo yum-config-manager --add-repohttp://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo`。
- 查看所有仓库中所有Docker版本,并选择特定版本安装。使用命令`sudo yum list docker-ce --showduplicates | sort -r`。
- 安装Docker。使用命令`sudo yum install docker-ce`,如果需要指定版本安装,可以使用命令`sudo yum install docker-ce-18.03.1.ce-1.el7.centos`。
- 启动并加入开机启动。使用命令`sudo systemctl start docker`和`sudo systemctl enable docker`。
- 验证安装是否成功。使用命令`docker version`,如果显示client和service两部分,表示Docker安装启动都成功了
下载镜像
docker pull rabbitmq:3.11.13-alpine
创建并启动容器
docker run -d -p 15672:15672 -p 5672:5672 --name mq 镜像ID
遇到的问题:
· 防火墙关闭了,但宿主机还是访问不到mq的15672的页面解决方案:
https://blog.csdn.net/weixin_44912902/article/details/129709071
· 访问到页面了但登录进去点击交换器报错Management API returned status code 500
https://blog.csdn.net/qq_38080370/article/details/124749083
docker的使用,查看所有镜像之类的命令熟悉一下。
参考文章:https://www.jf3q.com/article/detail/3730
Spring Boot 整合 RabbitMQ
- 创建生产者
- 创建队列
- 创建交换机并绑定路由键
- 创建消费者监听和异步接收消息
- 发送消息
java使用流程
1.pom
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2.配置
spring.rabbitmq.host=192.168.56.15
3.创建交换机-消息队列-绑定关系
方式一:配置类的方式
直连交换机
//单播的形式
@Configuration
public class DirectConfig {
@Bean
public DirectExchange directExchange() {
//durable是否实例化持久化,autodelete是否自动删除
DirectExchange directExchange = new DirectExchange("email_direct");//durable是否实例化,autodelete是否自动删除
return directExchange;
}
@Bean
public Queue emailQueue() {
Queue queue = new Queue("email_queue");
return queue;
}
@Bean
Binding bindingEmailQueue(){
return BindingBuilder.bind(emailQueue()).to(directExchange()).with("email_direct_queue");
}
}
方式二:AmqpAdmin
RabbitMQ系统管理功能组件;创建和删除queue exchange binding
@Autowired
AmqpAdmin amqpAdmin;
@Test
void testamqp(){
//创建一个Direct的交换器
//amqpAdmin.declareExchange(new DirectExchange("amqp.exchange"));
//创建一个永久的队列
/*amqpAdmin.declareQueue(new Queue("amqp.queue",true));*/
//创建一个绑定规则
amqpAdmin.declareBinding(new Binding("amqp.queue", Binding.DestinationType.QUEUE,"amqp.exchange","amqp.hh",null));
}
4.消息发送者
@Resource
RabbitTemplate rabbitTemplate;
//消息提供者
@Test
void testSend() {
rabbitTemplate.convertAndSend("email_direct","email_direct_queue","141134487@qq.com");
}
这里注意下:如果发送的是对象或者list之类的这种数据的话,要考虑下序列化的问题,默认采用的是jvm的序列化方式,修改成json序列化方式
@Configuration
public class MyAMQPConfig {
@Bean
public MessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}
}
5.消息消费
@Component
public class EmailConsumer {
//模拟多个消费者
@RabbitListener(queues = "email_queue")
public void show(String message){
System.out.println("客户1接收到的邮件消息:"+message);
}
@RabbitListener(queues = "email_queue")
public void show2(String message){
System.out.println("客户2接收到的邮件消息:"+message);
}
}
如果是有多个消费者同时监听一个队列的话,默认是采取轮询的机制。比如上面的代码就是第一次客户1接收消息第二次的时候就是客户2接收消息了。
广播交换机
创建交换机、队列、绑定关系
@Configuration
public class FanoutConfig {
@Bean
public FanoutExchange newsExchange() {//公司新闻交换机
//durable是否实例化持久化,autodelete是否自动删除
FanoutExchange fanoutExchange = new FanoutExchange("news_fanout");
return fanoutExchange;
}
@Bean
public Queue itQueue() {//技术部新闻队列
Queue queue = new Queue("it_news_queue");
return queue;
}
@Bean
public Queue marketQueue() {//市场部新闻队列
Queue queue = new Queue("market_news_queue");
return queue;
}
@Bean
Binding bindingQueue1() {//绑定关系
return BindingBuilder.bind(itQueue()).to(newsExchange());
}
@Bean
Binding bindingQueue2() {//绑定关系
return BindingBuilder.bind(marketQueue()).to(newsExchange());
}
}
发送消息
//测试广播
@Test
void testFanout(){
rabbitTemplate.convertAndSend("news_fanout", null,"杰凡it放假10天");
}
接收消息
@Component
public class NewsConsumer {
@RabbitListener(queues = "it_news_queue")
public void show(String message){
System.out.println("技术部接收到的消息:"+message);
}
@RabbitListener(queues = "market_news_queue")
public void getmes(String message){
System.out.println("市场部接收到的消息:"+message);
}
}
主题交换机
参考文章:
创建交换机、队列、绑定关系
@Component
@Configuration
public class TopicConfig {
@Bean
public TopicExchange topicExchange() {
//durable是否实例化持久化,autodelete是否自动删除
TopicExchange directExchange = new TopicExchange("exchange.topic");
return directExchange;
}
//湖南的消息队列
@Bean
public Queue hunanQueue() {
Queue queue = new Queue("hunan.queue");
return queue;
}
//湖北的消息队列
@Bean
public Queue hubeiQueue() {
Queue queue = new Queue("hubei.queue");
return queue;
}
//新闻的消息队列
@Bean
public Queue newsQueue() {
Queue queue = new Queue("news.queue");
return queue;
}
//天气的消息队列
@Bean
public Queue weatherQueue() {
Queue queue = new Queue("weather.queue");
return queue;
}
@Bean
Binding bindingChinaQueue(){
//#:代表0个或多个关键词
//*:代表1个单词
return BindingBuilder.bind(hunanQueue()).to(topicExchange()).with("hunan.#");
}
@Bean
Binding bindingJapanQueue(){
return BindingBuilder.bind(hubeiQueue()).to(topicExchange()).with("hubei.#");
}
@Bean
Binding bindingNewsQueue(){
// return BindingBuilder.bind(newsQueue()).to(topicExchange()).with("#.news");
return BindingBuilder.bind(newsQueue()).to(topicExchange()).with("*.news");
}
@Bean
Binding bindingWeatherQueue(){
return BindingBuilder.bind(weatherQueue()).to(topicExchange()).with("#.weather");
}
}
发送消息
//测试主题
@Test
void testTopic(){
rabbitTemplate.convertAndSend("exchange.topic", "hunan.yueyang.news","湖南岳阳的新闻");
// rabbitTemplate.convertAndSend("exchange.topic", "hubei.news","湖北新闻");
// rabbitTemplate.convertAndSend("exchange.topic", "hunan.weather","湖南天气");
// rabbitTemplate.convertAndSend("exchange.topic", "hubei.weather","湖北天气");
}
接收消息
@Component
public class TopicConsumer {
@RabbitListener(queues = "hunan.queue")
public void getHunan(String message){
System.out.println("监听湖南的信息:"+message);
}
@RabbitListener(queues = "hubei.queue")
public void getHubei(String message){
System.out.println("监听湖北的消息:"+message);
}
@RabbitListener(queues = "news.queue")
public void getNews(String message){
System.out.println("监听新闻消息:"+message);
}
@RabbitListener(queues = "weather.queue")
public void get(String message){
System.out.println("监听天气消息:"+message);
}
}
消息发送确认
由于网络可能出现故障,不能保证发送的消息都能够到达对方或由它成功地处理,那mq是怎么知道消息到底有没有被消费者消费呢?生产者是怎么知道自己发送的消息是不是真的已经发送到mq中了呢?
消息确认是保证消息可靠性传递的重要步骤,消息确认又包含消息发送确认和消息接收(消费)确认。
消息发送确认,是指生产者投递消息后,如果mq收到消息,则会给生产者一个应答,生产者接收应答,用来确定这条消息是否正常地被发送到mq,这种方式也是消息的可靠性投递的核心保障。
消息发送确认又分为生产者到交换器的确认和交换器到队列的确认。
1. 生产者到交换器的确认
通过实现ConfirmCallback回调接口,当消息发送到mq后触发回调,此时需要确认消息是否到达mq服务器,也就是只确认是否正确到达交换机
- 开启发布者确认
#确认消息已发送到交换机(Exchange)
spring.rabbitmq.publisher-confirm-type=correlated
- 在生产者项目中创建配置类,实现ConfirmCallback接口
@Configuration
public class ConfirmConfig {
@Bean
public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory){
RabbitTemplate rabbitTemplate=new RabbitTemplate();
rabbitTemplate.setConnectionFactory(connectionFactory);
//设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数
rabbitTemplate.setMandatory(true);
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
//ack表示发送结果,如果成功则返回true,否则返回false
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("ConfirmCallback相关数据:"+correlationData);
System.out.println("ConfirmCallback确认情况:"+ack);
System.out.println("ConfirmCallback原因:"+cause);
}
});
return rabbitTemplate;
}
}
- 测试错误发送的情况(把消息推送到一个没有创建且没有配置过的交换器)
//测试一个错误的交换器
@Test
void testError(){
rabbitTemplate.convertAndSend("exchange_null","email_direct_queue","1913284695@qq.com");
}
ConfirmCallback相关数据:null
ConfirmCallback确认情况:false
ConfirmCallback原因:channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'exchange_null' in vhost '/', class-id=60, method-id=40)
- 测试正确发送的情况
2. 交换器到队列的确认
通过实现ReturnCallback回调接口,启动消息失败返回,此接口是在交换机路由不到队列时触发
- 启动消息失败返回
# 确认消息已经发送到队列
spring.rabbitmq.publisher-returns=true
- 在生产者项目中mq配置类创建ReturnsCallback回调函数
//setReturnCallback已过时,可以使用setReturnsCallback代替
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
System.out.println("ReturnsCallback消息:"+returnedMessage);
}
});
- 测试路由不到的情况(在单播配置类中创建一个名为AloneDirectExchange的交换器,但不给它绑定队列)
@Bean
DirectExchange AloneDirectExchange() {
return new DirectExchange("AloneDirectExchange");
}
//测试一个没绑定路由的交换器(也就意味着无法找到对应队列)
@Test
void testError2(){
rabbitTemplate.convertAndSend("AloneDirectExchange","email_direct_queue","1913284695@qq.com");
}
ConfirmCallback相关数据:null
ConfirmCallback确认情况:true
ConfirmCallback原因:null
ReturnsCallback消息:ReturnedMessage [message=(Body:'1913284695@qq.com' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0]), replyCode=312, replyText=NO_ROUTE, exchange=AloneDirectExchange, routingKey=email_direct_queue]
- 测试正常发送的情况
不会触发ReturnsCallback,只触发ConfirmCallback回调
ConfirmCallback相关数据:null
ConfirmCallback确认情况:true
ConfirmCallback原因:null
消息接收确认:
消费者消费消息后需要对 RabbitMQ Server进行消息 ACK 确认,默认情况下如果一个 Message被消费者所正确接收则会被从 Queue中移除。消费者入如何通知rabbit消息消费成功了呢?
消息通过ack确认是否被正确接收,可以手动去ack或自动ack
一共有三种消息确认模式:
1. AcknowledgeMode.NONE: 自动确认
2. AcknowledgeMode.AUTO: 根据情况确认
3. AcknowledgeMode.MANUAL: 手动确认
主要学习手动确认。
- 消费者项目配置文件
# 开启消费者手动确认机制
spring.rabbitmq.listener.direct.acknowledge-mode=MANUAL
- 消费者项目配置类
@Autowired
private CachingConnectionFactory connectionFactory;
@Bean
public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(){
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
//factory.setMessageConverter(new Jackson2JsonMessageConverter());
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); //开启手动 ack
return factory;
}
- 消费者监听队列
这段代码在没有发生异常的时候可以正常消费消息,使用basicAck进行手动确认消息,通知服务器消息已经被正常处理,服务器手打确认消息后将正常删除队列中的消息。
但如果发生异常,basicNack方法将通知服务器消息未能正常处理,服务器将重新发送消息,但显然如果这段程序重新执行,则结果仍然是异常,再次重发,程序将不断重发,死循环。basicReject方法也一样,解决方法是将basicNack的第三个参数或者basicReject的第二个参数由true改成false,这样就不再重发了。
@RabbitListener(queues = "email_queue")
public void show(String email, ReceiveMessage, Message message, Channel channel) throws IOException {
try {
System.out.println("消费消息:" + email);
// int a = 1 / 0;
channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
System.out.println("消费消息确认" + message.getMessageProperties().getConsumerQueue() + ",接收到了回调方法");
} catch (Exception e) {
//重新回到队列
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
System.out.println("尝试重发:" + message.getMessageProperties().getConsumerQueue());
//requeue =true 重回队列,false 丢弃
// channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
// TODO 该消息已经导致异常,重发无意义,应自己实现补偿机制
}
}
商品秒杀实战
当有大量用户同时抢购商品时,按正常流程, 数据库无法实时处理那么多数据,为了应对这个流量商峰,采用 RabbitMQ 进行流量削峰,首先将瞬时请求数据发送到 RabbitMQ 缓存,然后 RabbitMQ再根据顺序(不紧不慢地)将订单数据一个一个取出来连接数据库进行处理。
案例代码:
1.模拟200人秒杀商品,写到一个测试类就行(不要急着操作数据库,而是把用户id直接作为消息生产者发送的信息扔到队列中去)
核心代码如下:
初始化交换机、队列和绑定关系
@Configuration
public class DirectConfig {
@Bean
DirectExchange directExchange(){
// this(name, true, false); 第三个参数是是否自动删除
return new DirectExchange("miaosha_exchange");
}
@Bean
Queue queue(){
return new Queue("miaosha_queue");
}
@Bean
Binding binding(){
return BindingBuilder.bind(queue()).to(directExchange()).with("miaosha_exchange_queue");
}
}
//模拟两百人同时抢商品
for(int i=1;i<=200;i++) {
final int userId=i;
new Thread(new Runnable() {
@Override
public void run() {
//将消息封装为Map
String messageId = String.valueOf(UUID.randomUUID());
String messageData = userId+"";
String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
Map<String, Object> map = new HashMap<>();
map.put("messageId", messageId);
map.put("messageData", messageData);
map.put("createTime", createTime);
//将消息绑定路由键:miaosha_exchange_queue 发送到交换机miaosha_exchange
rabbitTemplate.convertAndSend("miaosha_exchange", "miaosha_exchange_queue", map);
}
}).start();
}
2.然后建消息消费者去慢慢消化去,消费的时候要把库存减去同时生成秒杀记录,
核心代码如下:
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Product {
@TableId(value="id",type= IdType.AUTO)
private int id;
@TableField("productNo")
private String productNo;
private int total;
}
@Data
@AllArgsConstructor
@NoArgsConstructor
@TableName("productrecord")
@Accessors(chain = true)
public class ProductRecord {
@TableId(value="id",type= IdType.AUTO)
private int id;
@TableField("productNo")
private String productNo;
@TableField("userId")
private int userId;
}
server.port=80
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.datasource.url=jdbc:mysql:///miaosha?serverTimezone=Asia/Shanghai
spring.datasource.username=root
spring.datasource.password=123456
@Component
@Slf4j
public class MiaoShaConsumer {
@Autowired
ProductService productService;
@Autowired
ProductRecordService productRecordService;
@RabbitListener(queues = "miaosha_queue")
public void miao(Map ReceiveMessage){
System.out.println("消费者收到消息:" + ReceiveMessage.get("messageData"));
Integer userId= Integer.valueOf(ReceiveMessage.get("messageData").toString());//获取消息中的关键数据,即用户编号
//假设秒杀的就是id为1 的这个产品
Product product = productService.getById(1);//获得产品信息
if (product != null && product.getTotal() > 0) { //判断库存
//更新库存表,库存量减少1。返回1说明更新成功。返回0说明库存已经为0
product.setTotal(product.getTotal()-1);
boolean flag = productService.saveOrUpdate(product);
if(flag){
//插入记录
productRecordService.save(new ProductRecord().setProductNo(product.getProductNo()).setUserId(userId));
//发送短信
log.info("用户{}抢单成功", userId);
}else {
log.error("用户{}抢单失败", userId);
}
} else {
log.error("用户{}抢单失败", userId);
}
}
}
测试发现:有的用户抢购成功,有的抢购失败。再观察数据库, 表 product中商品的数量已经清0了,即被抢购完毕。Productrecord表中记录了抢到商品的用户。
好博客就要一起分享哦!分享海报
此处可发布评论
评论(1)展开评论
您可能感兴趣的博客