SpringBoot整合MQTT实战:基于EMQX构建高可靠物联网通信,从零到一实现设备云端双向对话

一、引言

随着物联网(IoT)技术的快速发展,MQTT(Message Queuing Telemetry Transport)协议因其轻量级、低功耗和高效的特点,已成为物联网设备通信的事实标准。本文将详细介绍如何使用SpringBoot框架整合MQTT协议,基于开源MQTT代理EMQX实现设备与服务器之间的双向通信。

二、技术选型与环境准备

2.1 技术栈介绍

  • SpringBoot 2.7.x:简化Spring应用初始搭建和开发过程

  • EMQX 5.0:开源的大规模分布式MQTT消息服务器

  • Eclipse Paho:流行的MQTT客户端库

  • Lombok:简化Java Bean编写

2.2 环境准备

  1. 安装EMQX服务器(可使用Docker快速部署):

    docker run -d --name emqx -p 1883:1883 -p 8083:8083 -p 8084:8084 -p 8883:8883 -p 18083:18083 emqx/emqx:5.0.14

  2. 确保Java开发环境(JDK 11+)和Maven已安装

三、SpringBoot项目集成MQTT

3.1 创建SpringBoot项目并添加依赖

pom.xml中添加必要的依赖:

<dependencies><!-- SpringBoot Starter --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><!-- MQTT Paho Client --><dependency><groupId>org.eclipse.paho</groupId><artifactId>org.eclipse.paho.client.mqttv3</artifactId><version>1.2.5</version></dependency><!-- Lombok --><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><!-- JSON处理 --><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId></dependency>
</dependencies>

3.2 配置MQTT连接参数

application.yml中添加配置:

mqtt:broker-url: tcp://localhost:1883username: emqxpassword: publicclient-id: springboot-serverdefault-topic: device/statustimeout: 30keepalive: 60qos: 1clean-session: true

创建配置类MqttProperties.java

@Data
@Configuration
@ConfigurationProperties(prefix = "mqtt")
public class MqttProperties {private String brokerUrl;private String username;private String password;private String clientId;private String defaultTopic;private int timeout;private int keepalive;private int qos;private boolean cleanSession;
}

3.3 实现MQTT客户端配置

创建MqttConfiguration.java

@Configuration
@RequiredArgsConstructor
public class MqttConfiguration {private final MqttProperties mqttProperties;@Beanpublic MqttConnectOptions mqttConnectOptions() {MqttConnectOptions options = new MqttConnectOptions();options.setServerURIs(new String[]{mqttProperties.getBrokerUrl()});options.setUserName(mqttProperties.getUsername());options.setPassword(mqttProperties.getPassword().toCharArray());options.setConnectionTimeout(mqttProperties.getTimeout());options.setKeepAliveInterval(mqttProperties.getKeepalive());options.setCleanSession(mqttProperties.isCleanSession());options.setAutomaticReconnect(true);return options;}@Beanpublic IMqttClient mqttClient() throws MqttException {IMqttClient client = new MqttClient(mqttProperties.getBrokerUrl(), mqttProperties.getClientId(), new MemoryPersistence());client.connect(mqttConnectOptions());return client;}
}

3.4 实现MQTT消息发布服务

创建MqttPublisher.java

@Service
@RequiredArgsConstructor
@Slf4j
public class MqttPublisher {private final IMqttClient mqttClient;private final MqttProperties mqttProperties;public void publish(String topic, String payload) throws MqttException {if (!mqttClient.isConnected()) {mqttClient.reconnect();}MqttMessage message = new MqttMessage(payload.getBytes());message.setQos(mqttProperties.getQos());message.setRetained(true);mqttClient.publish(topic, message);log.info("MQTT message published to topic: {}, payload: {}", topic, payload);}public void publish(String payload) throws MqttException {publish(mqttProperties.getDefaultTopic(), payload);}
}

3.5 实现MQTT消息订阅服务

创建MqttSubscriber.java

@Service
@RequiredArgsConstructor
@Slf4j
public class MqttSubscriber {private final IMqttClient mqttClient;private final MqttProperties mqttProperties;@PostConstructpublic void init() throws MqttException {subscribe(mqttProperties.getDefaultTopic());}public void subscribe(String topic) throws MqttException {if (!mqttClient.isConnected()) {mqttClient.reconnect();}mqttClient.subscribe(topic, mqttProperties.getQos(), this::handleMessage);log.info("Subscribed to MQTT topic: {}", topic);}private void handleMessage(String topic, MqttMessage message) {String payload = new String(message.getPayload());log.info("Received MQTT message from topic: {}, payload: {}", topic, payload);// 这里可以添加业务逻辑处理接收到的消息processMessage(topic, payload);}private void processMessage(String topic, String payload) {// 示例:解析JSON格式的消息try {ObjectMapper mapper = new ObjectMapper();JsonNode jsonNode = mapper.readTree(payload);// 根据不同的topic和payload内容进行业务处理if (topic.startsWith("device/status")) {handleDeviceStatus(jsonNode);} else if (topic.startsWith("device/control")) {handleDeviceControl(jsonNode);}} catch (JsonProcessingException e) {log.error("Failed to parse MQTT message payload: {}", payload, e);}}private void handleDeviceStatus(JsonNode jsonNode) {// 处理设备状态上报String deviceId = jsonNode.get("deviceId").asText();String status = jsonNode.get("status").asText();log.info("Device {} status updated to: {}", deviceId, status);}private void handleDeviceControl(JsonNode jsonNode) {// 处理设备控制指令响应String deviceId = jsonNode.get("deviceId").asText();String command = jsonNode.get("command").asText();String result = jsonNode.get("result").asText();log.info("Device {} executed command {} with result: {}", deviceId, command, result);}
}

四、实现双向通信

4.1 服务器向设备发送控制指令

创建REST API接口用于发送控制指令:

@RestController
@RequestMapping("/api/device")
@RequiredArgsConstructor
@Slf4j
public class DeviceController {private final MqttPublisher mqttPublisher;@PostMapping("/control")public ResponseEntity<String> sendControlCommand(@RequestBody DeviceCommand command) {try {ObjectMapper mapper = new ObjectMapper();String payload = mapper.writeValueAsString(command);String topic = "device/control/" + command.getDeviceId();mqttPublisher.publish(topic, payload);return ResponseEntity.ok("Control command sent successfully");} catch (Exception e) {log.error("Failed to send control command", e);return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body("Failed to send control command: " + e.getMessage());}}@Data@NoArgsConstructor@AllArgsConstructorpublic static class DeviceCommand {private String deviceId;private String command;private Map<String, Object> params;}
}

4.2 设备模拟客户端

为了测试双向通信,我们可以创建一个简单的设备模拟客户端:

@Component
@Slf4j
public class DeviceSimulator {private final MqttPublisher mqttPublisher;private final MqttProperties mqttProperties;private IMqttClient deviceClient;public DeviceSimulator(MqttPublisher mqttPublisher, MqttProperties mqttProperties) {this.mqttPublisher = mqttPublisher;this.mqttProperties = mqttProperties;initDeviceClient();}private void initDeviceClient() {try {String deviceId = "device-" + UUID.randomUUID().toString().substring(0, 8);deviceClient = new MqttClient(mqttProperties.getBrokerUrl(), deviceId, new MemoryPersistence());MqttConnectOptions options = new MqttConnectOptions();options.setUserName(mqttProperties.getUsername());options.setPassword(mqttProperties.getPassword().toCharArray());options.setAutomaticReconnect(true);deviceClient.connect(options);// 订阅控制主题String controlTopic = "device/control/" + deviceId;deviceClient.subscribe(controlTopic, (topic, message) -> {String payload = new String(message.getPayload());log.info("Device received control command: {}", payload);// 模拟设备执行命令并返回响应executeCommand(payload, deviceId);});// 模拟设备定期上报状态simulatePeriodicStatusReport(deviceId);} catch (MqttException e) {log.error("Failed to initialize device simulator", e);}}private void executeCommand(String payload, String deviceId) {try {ObjectMapper mapper = new ObjectMapper();JsonNode jsonNode = mapper.readTree(payload);String command = jsonNode.get("command").asText();// 模拟命令执行Thread.sleep(1000); // 模拟执行耗时// 构造响应ObjectNode response = mapper.createObjectNode();response.put("deviceId", deviceId);response.put("command", command);response.put("result", "success");response.put("timestamp", System.currentTimeMillis());// 发布响应String responseTopic = "device/control/response/" + deviceId;mqttPublisher.publish(responseTopic, response.toString());} catch (Exception e) {log.error("Failed to execute command", e);}}private void simulatePeriodicStatusReport(String deviceId) {ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();executor.scheduleAtFixedRate(() -> {try {ObjectMapper mapper = new ObjectMapper();ObjectNode status = mapper.createObjectNode();status.put("deviceId", deviceId);status.put("status", "online");status.put("cpuUsage", Math.random() * 100);status.put("memoryUsage", 30 + Math.random() * 50);status.put("timestamp", System.currentTimeMillis());String topic = "device/status/" + deviceId;mqttPublisher.publish(topic, status.toString());} catch (Exception e) {log.error("Failed to send status report", e);}}, 0, 10, TimeUnit.SECONDS);}
}

五、测试与验证

5.1 测试设备状态上报

  1. 启动SpringBoot应用

  2. 观察日志输出,应该能看到设备模拟客户端定期上报状态信息

5.2 测试服务器控制指令

使用Postman或curl发送控制指令:

curl -X POST http://localhost:8080/api/device/control \
-H "Content-Type: application/json" \
-d '{"deviceId": "device-123456","command": "restart","params": {"delay": 5}
}'

5.3 验证双向通信

  1. 服务器发送控制指令到特定设备

  2. 设备接收指令并执行

  3. 设备发送执行结果回服务器

  4. 服务器接收并处理设备响应

六、高级功能扩展

6.1 消息持久化与QoS级别

  • QoS 0:最多一次,消息可能丢失

  • QoS 1:至少一次,消息不会丢失但可能重复

  • QoS 2:恰好一次,消息不丢失且不重复

根据业务需求选择合适的QoS级别:

// 在发布消息时设置QoS
message.setQos(2); // 使用最高级别的QoS

6.2 安全配置

  1. 启用TLS加密:

mqtt:broker-url: ssl://localhost:8883
  1. 配置EMQX的ACL规则,限制客户端权限

6.3 集群部署

对于生产环境,可以部署EMQX集群:

# 启动第一个节点
docker run -d --name emqx1 -p 1883:1883 -p 8081:8081 -e EMQX_NODE_NAME=emqx@node1.emqx.io -e EMQX_CLUSTER__DISCOVERY=static -e EMQX_CLUSTER__STATIC__SEEDS="emqx@node1.emqx.io,emqx@node2.emqx.io" emqx/emqx:5.0.14# 启动第二个节点
docker run -d --name emqx2 -p 1884:1883 -p 8082:8081 -e EMQX_NODE_NAME=emqx@node2.emqx.io -e EMQX_CLUSTER__DISCOVERY=static -e EMQX_CLUSTER__STATIC__SEEDS="emqx@node1.emqx.io,emqx@node2.emqx.io" emqx/emqx:5.0.14

6.4 消息桥接与WebHook

通过EMQX的桥接功能,可以将消息转发到其他MQTT服务器或Kafka等消息队列。也可以通过WebHook将消息推送到HTTP服务。

七、总结

本文详细介绍了如何使用SpringBoot整合MQTT协议,基于EMQX实现设备与服务器之间的双向通信。主要内容包括:

  1. SpringBoot项目中集成MQTT客户端

  2. 实现消息发布和订阅功能

  3. 设计双向通信机制

  4. 设备模拟与测试验证

  5. 高级功能扩展建议

这种架构非常适合物联网场景,能够支持海量设备连接和实时消息通信。开发者可以根据实际业务需求,在此基础上进行扩展和优化,构建稳定可靠的物联网平台。

八、参考资料

  1. EMQX官方文档:Introduction | EMQX 5.0 Docs

  2. Eclipse Paho项目:Eclipse Paho | The Eclipse Foundation

  3. MQTT协议规范:MQTT Version 3.1.1

  4. Spring Boot官方文档:Spring Boot

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/81824.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

zData X zStorage 为什么采用全闪存架构而非混闪架构?

点击蓝字 关注我们 最近有用户问到 zData X 的存储底座 zStorage 分布式存储为什么采用的是全闪存架构而非混闪架构&#xff1f;主要原因还是在于全闪存架构在性能和可靠性方面具有更显著的优势。zData X 的上一代产品 zData 的早期版本也使用了SSD盘作为缓存的技术架构&#x…

Fiddler抓包教程->HTTP和HTTPS基础知识

1.简介 有的伙伴可能会好奇&#xff0c;不是讲解和分享抓包工具,怎么这里开始讲解HTTP和HTTPS协议了。这是因为你对HTTP协议越了解&#xff0c;你就能越掌握Fiddler的使用方法&#xff0c;反过来你越使用Fiddler&#xff0c;就越能帮助你了解HTTP协议。 Fiddler无论对开发人员…

虚拟机NAT模式获取不到ip

虚拟机NAT模式获取不到ip 如图所示 解决方案&#xff1a; 先查看NetworkManager是否启动 systemctl status NetworkManager如果没启动就启动一遍 使用DHCP手动获取一遍ip sudo dhclient ens33成功得到ip 这是后遇到了另一个问题&#xff0c;ip释放后&#xff0c;不能自动…

Sass 基础用法速览

Sass 基础用法速览 目录 Sass 基础用法速览1. 什么是 Sass&#xff1f;2. 安装 Sass2.1 使用 npm 安装&#xff08;推荐&#xff09;2.2 使用 Dart Sass&#xff08;官方推荐&#xff09;2.3 使用 GUI 工具 3. Sass 基本用法3.1 编译 Sass 4. Sass 语法详解4.1 变量4.2 嵌套4.3…

洛谷B3840 [GESP202306 二级] 找素数

题目描述 小明刚刚学习了素数的概念&#xff1a;如果一个大于 1 的正整数&#xff0c;除了 1 和它自身外&#xff0c;不能被其他正整数整除&#xff0c;则这个正整数是素数。现在&#xff0c;小明想找到两个正整数 A 和 B 之间&#xff08;包括 A 和 B&#xff09;有多少个素数…

idea部署本地仓库和连接放送远程仓库

1.下载git&#xff0c;安装好后任意地方又键会出现两个带git的东西 2.点击bash here的那个&#xff0c;召唤出git的小黑窗&#xff0c;输入 git config --global user.name "你自己取名" git config --global user.email "你自己输入你的邮箱" 3.打开id…

C++(20): 文件输入输出库 —— <fstream>

目录 一、 的核心功能 二、核心类及功能 三、核心操作示例 1. 文本文件写入&#xff08;ofstream&#xff09; 2. 文本文件读取&#xff08;ifstream&#xff09; 3. 二进制文件操作&#xff08;fstream&#xff09; 四、文件打开模式 五、文件指针操作 六、错误处理技巧…

elementUI 循环出来的表单,怎么做表单校验?

数据结构如下&#xff1a; diversionParamList: [ { length: null, positionNumber: null, value: null, } ] 思路&#xff1a;可根据 index 动态绑定 :props 属性值&#xff0c;校验规则写在:rules <div class"config-item" v-for"(item, index) in form.…

x-cmd install | Pillager:Go 语言打造的敏感信息文件系统扫描利器

目录 Pillager 的独特优势安装Pillager 的应用场景Pillager 的核心功能 还在为文件系统中潜在的敏感信息泄露而担忧吗&#xff1f;Pillager 是一款由 Go 语言编写的强大工具&#xff0c;旨在帮助你轻松扫描文件系统&#xff0c;发现隐藏的密钥、密码、API 令牌等敏感信息。 Pil…

大模型(2)——提示工程(Prompt Engineering)

文章目录 一、提示工程的核心概念为什么需要提示工程&#xff1f; 二、提示设计的基本原则三、实用提示工程技巧1. 角色设定法2. 示例引导法&#xff08;Few-Shot Learning&#xff09;3. 分阶段提问4. 负面约束5. 温度&#xff08;Temperature&#xff09;控制 四、不同任务类…

环境搭建

一个简单的请求在加入spring security之前的样子, 在浏览器中输入地址就可以直接访问 <!--引入spring security依赖--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-security</artifactId>&…

院校机试刷题第六天:1134矩阵翻转、1052学生成绩管理、1409对称矩阵

一、1134矩阵翻转 1.题目描述 2.解题思路 很简单的模拟题&#xff0c;甚至只是上下翻转&#xff0c;遍历输出的时候先把最下面那一行输出即可。 3.代码 #include <iostream> #include <vector> using namespace std;int main() {int n;cin >> n;vector&l…

软件架构风格系列(5):数据共享架构

数据共享架构&#xff1a;如何让数据在系统间自由“流淌”&#xff1f; 引言 在企业数字化转型的浪潮中&#xff0c;“数据孤岛”成为横在业务创新面前的大山&#xff1a;营销系统的用户画像无法同步到客服系统&#xff0c;供应链的库存数据难以为销售决策提供支撑…… 此时&…

SAP-13-内表与工作区

内表 作用&#xff1a; 内表是 ABAP 程序中一种非常重要的数据结构&#xff0c;它类似于数据库表&#xff0c;用于在程序运行时存储和处理数据。与数据库表不同的是&#xff0c;内表存在于程序的内存中&#xff0c;数据的读写速度比从数据库中读取要快很多。它可以存储多条具有…

dali本地安装和使用

Dali&#xff08;Distance-matrix ALIgnment&#xff09;是一种广泛使用的蛋白质结构比对工具&#xff0c;主要用于比较蛋白质三维结构之间的相似性。它通过计算蛋白质结构之间的距离矩阵来评估结构之间的相似性&#xff0c;并生成比对结果。 1. 安装 wget http://ekhidna2.b…

Unreal 从入门到精通之SceneCaptureComponent2D实现UI层3D物体360°预览

文章目录 前言SceneCaptureComponent2D实现步骤新建渲染目标新建材质UI控件激活3DPreview鼠标拖动旋转模型最后前言 我们在(电商展示/角色预览/装备查看)等应用场景中,经常会看到这种3D展示的页面。 即使用相机捕获一个3D的模型的视图,然后把这个视图显示在一个UI画布上,…

2024CCPC辽宁省赛 个人补题 ABCEGJL

Dashboard - 2024 CCPC Liaoning Provincial Contest - Codeforces 过题难度 B A J C L E G 铜奖 4 953 银奖 6 991 金奖 8 1664 B&#xff1a; 模拟题 // Code Start Here string s;cin >> s;reverse(all(s));cout << s << endl;A&#xff1a;很…

Java基础 Day17

一、递归 方法直接或者间接调用本身 将大问题, 层层转化为一个与原问题相似的、规模更小的问题来解决 二、异常 程序在编译或执行过程中&#xff0c;出现的非正常的情况 (错误) 语法错误不是异常 1、阅读异常信息 从下往上看&#xff1a;发生异常的位置、异常名称、发生异…

hook原理和篡改猴编写hook脚本

hook原理&#xff1a; hook是常用于js反编译的技术&#xff1b;翻译就是钩子&#xff0c;他的原理就是劫持js的函数然后进行篡改 一段简单的js代码 &#xff1a;这个代码是顺序执行的 function test01(){console.log(test01)test02() } function test02(){console.log(02)tes…

使用 Vue 展示 Markdown 文本

使用 Vue 展示 Markdown 文本可以通过以下几种方法&#xff1a; 方法 1&#xff1a;使用 v-html 指令 可以使用 v-html 指令来渲染 Markdown 文本&#xff1a; <template><div v-html"markdownText"></div> </template> <script>e…