SpringBoot集成MQTT入门
标签: SpringBoot集成MQTT入门 博客 51CTO博客
2023-07-19 18:24:33 168浏览
一、新建消息发布者服务mqtt_publish
1.项目的pom文件如下所示:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.13</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.example</groupId>
<artifactId>mqtt_publish</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>mqtt_publish</name>
<description>mqtt_publish</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-core</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
2.在application.properties添加mqtt的相关配置
#端口
server.port=8091
#MQTT-服务端地址
publish.mqtt.host=tcp://localhost:1883
#MQTT-服务端用户名
publish.mqtt.username=admin
#MQTT-服务端密码
publish.mqtt.password=123456
#MQTT-是否清理session
publish.mqtt.cleansession=false
#MQTT-当前客户端的唯一标识
publish.mqtt.clientid=mqtt_publish
#当前客户端的主题
publish.mqtt.defaultTopic=topic
#发送超时时间
publish.mqtt.timeout=1000
#心跳时间
publish.mqtt.keepalive=10
#连接超时时间
publish.mqtt.connectionTimeout=3000
3.创建MQTT服务端连接工具类,用于加载配置参数
package com.example.mqtt_publish.config;
import lombok.Getter;
import lombok.Setter;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
/**
* @author qx
* @date 2023/07/18
* @desc MQTT配置类
*/
@Configuration
@ConfigurationProperties(prefix = "publish.mqtt")
@Getter
@Setter
public class MQTTConfig {
/**
* 服务端地址
*/
private String host;
/**
* 客户端的唯一标识
*/
private String clientid;
/**
* 服务端用户名
*/
private String username;
/**
* 服务端密码
*/
private String password;
/**
* 是否清理session
*/
private boolean cleansession;
/**
* 客户端的主题
*/
private String defaultTopic;
/**
* 超时时间
*/
private int timeout;
/**
* 心跳时间
*/
private int keepalive;
/**
* 超时时间
*/
private int connectionTimeout;
}
4.建立MQTT服务端连接类MqttConnect
package com.example.mqtt_publish.util;
import com.example.mqtt_publish.config.MQTTConfig;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* @author qx
* @date 2023/07/18
* @desc MQTT服务端连接类
*/
@Component
public class MqttConnect {
@Autowired
private MQTTConfig config;
public MqttConnect(MQTTConfig config) {
this.config = config;
}
public MqttConnectOptions getOptions() {
MqttConnectOptions options = new MqttConnectOptions();
options.setCleanSession(config.isCleansession());
options.setUserName(config.getUsername());
options.setPassword(config.getPassword().toCharArray());
options.setConnectionTimeout(config.getConnectionTimeout());
//设置心跳
options.setKeepAliveInterval(config.getKeepalive());
return options;
}
public MqttConnectOptions getOptions(MqttConnectOptions options) {
options.setCleanSession(options.isCleanSession());
options.setUserName(options.getUserName());
options.setPassword(options.getPassword());
options.setConnectionTimeout(options.getConnectionTimeout());
options.setKeepAliveInterval(options.getKeepAliveInterval());
return options;
}
}
5.编写消息接收回调类PushCallback
package com.example.mqtt_publish.service;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import java.nio.charset.StandardCharsets;
/**
* @author qx
* @date 2023/07/18
* @desc 消息接收回调类
*/
@Slf4j
public class PushCallback implements MqttCallback {
private final MQTTServer mqttServer;
public PushCallback(MQTTServer mqttServer) {
this.mqttServer = mqttServer;
}
public void connectionLost(Throwable cause) {
// 连接丢失后,一般在这里面进行重连
log.info("---------------------连接断开,可以做重连");
mqttServer.subsribeConnect();
while (true) {
try {
//如果没有发生异常说明连接成功,如果发生异常,则死循环
Thread.sleep(1000);
break;
} catch (Exception e) {
continue;
}
}
}
/**
* 发送消息,消息到达后处理方法
*
* @param token
*/
public void deliveryComplete(IMqttDeliveryToken token) {
System.out.println("deliveryComplete---------" + token.isComplete());
}
/**
* 接收所订阅的主题的消息并处理
*
* @param topic 主题
* @param message 消息
*/
public void messageArrived(String topic, MqttMessage message) throws Exception {
// subscribe后得到的消息会执行到这里面
String result = new String(message.getPayload(), StandardCharsets.UTF_8);
System.out.println("接收消息主题 : " + topic);
System.out.println("接收消息Qos : " + message.getQos());
System.out.println("接收消息内容 : " + result);
//这里可以针对收到的消息做处理,比如持久化
}
}
6.建立消息发送类MQTTServer类,实现主题消息的发送,以及订阅主题、取消订阅主题
package com.example.mqtt_publish.service;
import com.example.mqtt_publish.config.MQTTConfig;
import com.example.mqtt_publish.util.MqttConnect;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
* @author qx
* @date 2023/07/18
* @desc MQTT消息发送
*/
@Service
@Slf4j
public class MQTTServer {
/* 订阅者客户端对象 */
private MqttClient subsribeClient;
/**
* 发布者客户端对象
* 这里订阅者和发布者的MqttClient对象分别命名是为了让发布者和订阅者分开,
* 如果订阅者和发布者都用一个MqttClient链接对象,则会出现两方都订阅了某个主题后,
* 谁发送了消息,都会自己接收到自己发的消息,所以分开写,里面主要就是回调类的设置setCallback
* */
private MqttClient publishClient;
/* 主题对象 */
public MqttTopic topic;
/* 消息内容对象 */
public MqttMessage message;
@Autowired
private MqttConnect mqttConnect;
@Autowired
private MQTTConfig config;
public MQTTServer() {
log.info("8091上线了");
}
/**
* 发布者客户端和服务端建立连接
*/
public MqttClient publishConnect() {
//防止重复创建MQTTClient实例
try {
if (publishClient==null) {
//先让客户端和服务器建立连接,MemoryPersistence设置clientid的保存形式,默认为以内存保存
publishClient = new MqttClient(config.getHost(), config.getClientid(), new MemoryPersistence());
//发布消息不需要回调连接
//client.setCallback(new PushCallback());
}
MqttConnectOptions options = mqttConnect.getOptions();
//判断拦截状态,这里注意一下,如果没有这个判断,是非常坑的
if (!publishClient.isConnected()) {
publishClient.connect(options);
log.info("---------------------连接成功");
}else {//这里的逻辑是如果连接成功就重新连接
publishClient.disconnect();
publishClient.connect(mqttConnect.getOptions(options));
log.info("---------------------连接成功");
}
} catch (MqttException e) {
log.info(e.toString());
}
return publishClient;
}
/**
* 订阅端的链接方法,关键是回调类的设置,要对订阅的主题消息进行处理
* 断线重连方法,如果是持久订阅,重连时不需要再次订阅
* 如果是非持久订阅,重连是需要重新订阅主题 取决于options.setCleanSession(true);
* true为非持久订阅
*/
public void subsribeConnect() {
try {
//防止重复创建MQTTClient实例
if (subsribeClient==null) {
//clientId不能和其它的clientId一样,否则会出现频繁断开连接和重连的问题
subsribeClient = new MqttClient(config.getHost(), config.getClientid(), new MemoryPersistence());// MemoryPersistence设置clientid的保存形式,默认为以内存保存
//如果是订阅者则添加回调类,发布不需要,PushCallback类在后面,继续往下看
subsribeClient.setCallback(new PushCallback(MQTTServer.this));
}
MqttConnectOptions options = mqttConnect.getOptions();
//判断拦截状态,这里注意一下,如果没有这个判断,是非常坑的
if (!subsribeClient.isConnected()) {
subsribeClient.connect(options);
}else {//这里的逻辑是如果连接成功就重新连接
subsribeClient.disconnect();
subsribeClient.connect(mqttConnect.getOptions(options));
}
log.info("----------客户端连接成功");
} catch (MqttException e) {
log.info(e.getMessage(), e);
}
}
/**
* 把组装好的消息发出去
* @param topic
* @param message
* @return
*/
public boolean publish(MqttTopic topic , MqttMessage message) {
MqttDeliveryToken token = null;
try {
//把消息发送给对应的主题
token = topic.publish(message);
token.waitForCompletion();
//检查发送是否成功
boolean flag = token.isComplete();
StringBuffer sbf = new StringBuffer(200);
sbf.append("给主题为'"+topic.getName());
sbf.append("'发布消息:");
if (flag) {
sbf.append("成功!消息内容是:"+new String(message.getPayload()));
} else {
sbf.append("失败!");
}
log.info(sbf.toString());
} catch (MqttException e) {
log.info(e.toString());
}
return token.isComplete();
}
/**
* MQTT发送指令:主要是组装消息体
* @param topic 主题
* @param data 消息内容
* @param qos 消息级别
*/
public void sendMQTTMessage(String topic, String data, int qos) {
try {
this.publishClient = publishConnect();
this.topic = this.publishClient.getTopic(topic);
message = new MqttMessage();
//消息等级
//level 0:消息最多传递一次,不再关心它有没有发送到对方,也不设置任何重发机制
//level 1:包含了简单的重发机制,发送消息之后等待接收者的回复,如果没收到回复则重新发送消息。这种模式能保证消息至少能到达一次,但无法保证消息重复
//level 2: 有了重发和重复消息发现机制,保证消息到达对方并且严格只到达一次
message.setQos(qos);
//如果重复消费,则把值改为true,然后发送一条空的消息,之前的消息就会覆盖,然后在改为false
message.setRetained(false);
message.setPayload(data.getBytes());
//将组装好的消息发出去
publish(this.topic, message);
} catch (Exception e) {
log.info(e.toString());
e.printStackTrace();
}
}
/**
* 订阅端订阅消息
* @param topic 要订阅的主题
* @param qos 订阅消息的级别
*/
public void init(String topic, int qos) {
//建立连接
subsribeConnect();
//以某个消息级别订阅某个主题
try {
subsribeClient.subscribe(topic, qos);
} catch (MqttException e) {
log.info(e.getMessage(), e);
}
}
/**
* 订阅端取消订阅消息
* @param topic 要订阅的主题
*/
public void unionInit(String topic) {
//建立连接
subsribeConnect();
//取消订阅某个主题
try {
//MQTT 协议中订阅关系是持久化的,因此如果不需要订阅某些 Topic,需要调用 unsubscribe 方法取消订阅关系。
subsribeClient.unsubscribe(topic);
} catch (MqttException e) {
log.info(e.getMessage(), e);
}
}
}
7.创建控制层进行测试
package com.example.mqtt_publish.controller;
import com.example.mqtt_publish.service.MQTTServer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @author qx
* @date 2023/07/18
* @desc 测试
*/
@RestController
@RequestMapping("/publish")
public class PublishController {
@Autowired
private MQTTServer mqttServer;
/**
* 发送数据
*
* @param topic 主题
* @param msg 消息内容
* @param qos qos消息级别
* @return
*/
@RequestMapping("/sendMsg")
public String testSend(String topic, String msg, int qos) {
mqttServer.sendMQTTMessage(topic, msg, qos);
return "发送了一条主题是‘" + topic + "’,内容是:" + msg + ",消息级别 " + qos;
}
/**
* 订阅主题
*
* @param topic 主题名称
* @param qos qos消息级别
* @return
*/
@RequestMapping("/subsribe")
public String testSubsribe(String topic, int qos) {
mqttServer.init(topic, qos);
return "订阅主题" + topic + "成功";
}
}
8.启动程序,在浏览器输入http://localhost:8091/publish/sendMsg?topic=test&msg=hello&qos=0
控制台显示
2023-07-19 14:30:55.023 INFO 3712 --- [nio-8091-exec-1] o.s.web.servlet.DispatcherServlet : Completed initialization in 5 ms
2023-07-19 14:30:55.438 INFO 3712 --- [nio-8091-exec-1] c.e.mqtt_publish.service.MQTTServer : ---------------------连接成功
2023-07-19 14:30:55.441 INFO 3712 --- [nio-8091-exec-1] c.e.mqtt_publish.service.MQTTServer : 给主题为'test'发布消息:成功!消息内容是:hello
现在EMQ X服务端监控信息如下:
二、新建消息订阅者服务mqtt_subsribe
pom文件和发布端的文件一样,参考一下就可以了
1.application.properties的文件配置和发布端的类似,修改一下端口号和客户端ID就可以了
#端口
server.port=8092
#MQTT-服务端地址
publish.mqtt.host=tcp://localhost:1883
#MQTT-服务端用户名
publish.mqtt.username=admin
#MQTT-服务端密码
publish.mqtt.password=123456
#MQTT-是否清理session
publish.mqtt.cleansession=false
#MQTT-当前客户端的唯一标识
publish.mqtt.clientid=mqtt_subsribe
#当前客户端的主题
publish.mqtt.defaultTopic=topic
#发送超时时间
publish.mqtt.timeout=1000
#心跳时间
publish.mqtt.keepalive=10
#连接超时时间
publish.mqtt.connectionTimeout=3000
2.把发布者端的PushCallback、MQTTConfig、MqttConnect复制过来。
3.把发布者端的MQTTServer服务复制过来修改一下名称为MQTTSubsribe。
4.创建测试控制层
package com.example.mqtt_subsribe.controller;
import com.example.mqtt_subsribe.service.MQTTSubsribe;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @author qx
* @date 2023/07/19
* @desc 测试
*/
@RestController
@RequestMapping("/subsribe")
public class SubsribeController {
@Autowired
private MQTTSubsribe mqttSubsribe;
/**
* 订阅主题
*
* @param topic 主题名称
* @param qos 消息级别
* @return
*/
@RequestMapping("/topic")
public String testTopic(String topic, int qos) {
mqttSubsribe.init(topic, qos);
return "订阅[" + topic + "]成功";
}
/**
* 取消订阅
*
* @param topic 主题名称
* @return
*/
@RequestMapping("/cancel")
public String cancelTopic(String topic) {
mqttSubsribe.unionInit(topic);
return "取消订阅[" + topic + "]成功";
}
}
5.启动程序,先订阅消息。
2023-07-19 15:16:58.222 INFO 5468 --- [nio-8092-exec-1] o.s.web.servlet.DispatcherServlet : Completed initialization in 5 ms
2023-07-19 15:16:58.647 INFO 5468 --- [nio-8092-exec-1] c.e.mqtt_subsribe.service.MQTTSubsribe : ----------客户端连接成功
6.我们重新调用发布者端的服务,发布指定主题的消息
我们的订阅端因为订阅了test这个主题,所以接收到了发布者端发送过来的消息。
订阅者服务的控制台显示如下:
接收消息主题 : test
接收消息Qos : 0
接收消息内容 : haha
此时的MQTT服务器页面显示了两台客户端
7.我们在订阅者服务里面调用取消订阅的接口。
发现MQTT服务器页面里面的订阅者客户端已经没有了订阅的数量
我们再次调用发布者端发布订阅的消息接口
然后我们在订阅者服务的控制台里面并没有打印出获取消息的日志。我们取消订阅的测试成功了。
总而言之, 所有的客户端都可以是订阅端,或者是发布端。任何一个客户端发布一条某个主题的消息,都会通过服务端转发给每一个订阅了该主题的客户端。
好博客就要一起分享哦!分享海报
此处可发布评论
评论(0)展开评论
展开评论