Spring Boot 集成 MQTT 完成消息发布与订阅

news/2025/11/30 8:58:13/文章来源:https://www.cnblogs.com/yangykaifa/p/19288289

一、MQTT 是什么?

MQTT(Message Queuing Telemetry Transport)是一种 轻量级发布/订阅消息传输协议,广泛用于 物联网(IoT)设备通信。它基于 TCP/IP 协议,具有以下特点:

  • 轻量级,网络带宽占用低

  • 支持自动重连与消息重发

  • 支持 QoS(服务质量等级)保证消息可靠性

  • 典型应用场景:智能家居、工业监控、电力系统、设备遥测等

  项目Demo源码
GitHub: https://github.com/lpsqq1944900433/mqtt-demo/

Gitee: https://gitee.com/QQ1944900433/mqtt-demo

在本文中,我们将基于 Spring Boot + Eclipse Paho 客户端,实现一个最小可运行的 MQTT 消息发布与订阅系统。


二、项目结构概览

项目结构如下:

mqtt-demo├─ com.lps.mqttdemo│   ├─ config│   │   └─ MqttConfig.java           # MQTT连接配置类│   ├─ publisher│   │   └─ Publisher.java            # MQTT消息发布者│   ├─ subscriber│   │   └─ Subscriber.java           # MQTT消息订阅者│   └─ MqttDemoApplication.java      # 启动类└─ resources└─ application.yml               # 配置文件

依赖引入:

        org.eclipse.pahoorg.eclipse.paho.client.mqttv31.2.5

三、配置文件(application.yml)

# MQTT 配置
mqtt:# MQTT Broker 地址broker-url: tcp://192.168.88.178:1883# 客户端 IDclient-id: mqtt-demo-client# 订阅的主题topic: lps-test-topic# 默认 QoS 等级(0: 最多一次, 1: 至少一次, 2: 仅一次)default-qos: 1# 连接超时时间(秒)connection-timeout: 30# 保持连接间隔(秒)keep-alive-interval: 60# 是否自动重连automatic-reconnect: true# 是否清除会话clean-session: true
# Spring Boot 应用配置
spring:application:name: mqtt-demo
# 服务器端口
server:port: 8080

建议:

  • 如果你的 MQTT Broker 是 EMQXMosquitto,可以用 tcp://127.0.0.1:1883 进行测试。

  • 在生产环境中建议启用 SSL(ssl://broker地址:8883)。


四、MqttConfig 配置类详解

package com.lps.mqttdemo.config;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/*** MQTT 配置类* 用于配置 MQTT 客户端连接参数*/
@Configuration
public class MqttConfig {@Value("${mqtt.broker-url}")private String brokerUrl;@Value("${mqtt.client-id}")private String clientId;@Value("${mqtt.connection-timeout}")private int connectionTimeout;@Value("${mqtt.keep-alive-interval}")private int keepAliveInterval;@Value("${mqtt.automatic-reconnect}")private boolean automaticReconnect;@Value("${mqtt.clean-session}")private boolean cleanSession;/*** 创建 MQTT 连接选项*/@Beanpublic MqttConnectOptions mqttConnectOptions() {MqttConnectOptions options = new MqttConnectOptions();options.setServerURIs(new String[]{brokerUrl});options.setConnectionTimeout(connectionTimeout);options.setKeepAliveInterval(keepAliveInterval);options.setAutomaticReconnect(automaticReconnect);options.setCleanSession(cleanSession);return options;}/*** 创建 MQTT 客户端(用于发布消息)*/@Beanpublic MqttClient mqttClient(MqttConnectOptions mqttConnectOptions) throws MqttException {MqttClient client = new MqttClient(brokerUrl, clientId + "-publisher");client.connect(mqttConnectOptions);return client;}/*** 创建订阅端 MQTT 客户端*/@Bean(name = "subscriberMqttClient")public MqttClient subscriberMqttClient(MqttConnectOptions mqttConnectOptions) throws MqttException {MqttClient client = new MqttClient(brokerUrl, clientId + "-subscriber");client.connect(mqttConnectOptions);return client;}
}

说明:

  • 使用 @Bean 管理两个独立的 MQTT 客户端:发布端和订阅端;

  • MqttConnectOptions 用于配置连接参数;

  • 支持自动重连、保活机制,保证长连接稳定。


五、Subscriber(订阅者)

package com.lps.mqttdemo.subscriber;
import jakarta.annotation.PostConstruct;
import org.eclipse.paho.client.mqttv3.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
/*** MQTT 消息订阅者* 在应用启动时自动订阅指定主题并接收消息*/
@Component
public class Subscriber {@Autowired@Qualifier("subscriberMqttClient")private MqttClient mqttClient;@Value("${mqtt.topic}")private String topic;@Value("${mqtt.default-qos}")private int qos;/*** 应用启动后自动订阅主题*/@PostConstructpublic void subscribe() {try {// 设置回调处理器mqttClient.setCallback(new MqttCallback() {@Overridepublic void connectionLost(Throwable cause) {System.out.println("MQTT 连接丢失: " + cause.getMessage());// 自动重连由配置处理}@Overridepublic void messageArrived(String topic, MqttMessage message) throws Exception {String payload = new String(message.getPayload());System.out.println("========================================");System.out.println("收到 MQTT 消息:");System.out.println("主题: " + topic);System.out.println("QoS: " + message.getQos());System.out.println("消息内容: " + payload);System.out.println("消息ID: " + message.getId());System.out.println("是否保留消息: " + message.isRetained());System.out.println("========================================");}@Overridepublic void deliveryComplete(IMqttDeliveryToken token) {// 发布完成回调(订阅者不需要实现)}});// 订阅主题mqttClient.subscribe(topic, qos);System.out.println("成功订阅主题: " + topic + ",QoS: " + qos);} catch (MqttException e) {System.err.println("订阅主题失败: " + e.getMessage());e.printStackTrace();}}
}

逻辑说明:

  • 使用 @PostConstruct 实现应用启动后自动订阅;

  • 实现 MqttCallback 接口监听消息;

  • 当消息到达时打印详细信息;

  • 可在此扩展业务逻辑(如存入数据库、转发到消息队列)。


六、Publisher(发布者)

package com.lps.mqttdemo.publisher;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import java.nio.charset.StandardCharsets;
/*** MQTT 消息发布者* 提供 REST 接口用于发布 MQTT 消息*/
@RestController
@RequestMapping("/publish")
public class Publisher {@Autowiredprivate MqttClient mqttClient;@Value("${mqtt.topic}")private String defaultTopic;@Value("${mqtt.default-qos}")private int defaultQos;/*** 发布消息到默认主题** @param message 要发布的消息内容* @return 发布结果*/@PostMappingpublic ResponseEntity publishMessage(@RequestParam String message) {return publishMessage(defaultTopic, message, defaultQos);}/*** 发布消息到指定主题** @param topic   主题名称* @param message 消息内容* @param qos     QoS 等级(0, 1, 2)* @return 发布结果*/@PostMapping("/{topic}")public ResponseEntity publishToTopic(@PathVariable String topic,@RequestParam String message,@RequestParam(required = false, defaultValue = "1") int qos) {return publishMessage(topic, message, qos);}/*** 发布消息的核心方法** @param topic   主题* @param message 消息内容* @param qos     QoS 等级* @return 发布结果*/private ResponseEntity publishMessage(String topic, String message, int qos) {try {// 确保客户端已连接if (!mqttClient.isConnected()) {mqttClient.reconnect();}// 创建 MQTT 消息MqttMessage mqttMessage = new MqttMessage(message.getBytes(StandardCharsets.UTF_8));mqttMessage.setQos(qos);mqttMessage.setRetained(false);// 发布消息mqttClient.publish(topic, mqttMessage);return ResponseEntity.ok(String.format("消息已成功发布到主题 [%s],QoS: %d,内容: %s", topic, qos, message));} catch (MqttException e) {return ResponseEntity.status(500).body("发布消息失败: " + e.getMessage());}}
}

接口测试:

1️⃣ 发布默认主题消息

curl -X POST "http://localhost:8080/publish?message=hello_mqtt"

2️⃣ 发布自定义主题消息

curl -X POST "http://localhost:8080/publish/myTopic?message=test_msg&qos=1"

控制台输出示例:

成功订阅主题: lps-test-topic,QoS: 1 ========== 收到MQTT消息 ========== 主题: lps-test-topic QoS: 1 内容: hello_mqtt ================================


七、常见问题与优化建议

1. 客户端断开重连问题

如果 Broker 重启或网络异常,isConnected() 检查 + reconnect() 是基本方案。
更优的方案是:

  • 使用 Paho 的 MqttAsyncClient

  • 或者使用 Spring Integration MQTT 框架管理连接状态


2. 多主题订阅

mqttClient.subscribe(new String[]{"topic1", "topic2"}, new int[]{1, 1});

3. QoS 说明

等级含义场景
0最多一次(不保证送达)传感器实时数据
1至少一次(可能重复)一般消息
2仅一次(最可靠)控制指令、金融场景

4. 提升健壮性建议

  • 建议封装 MqttService 类统一管理发布/订阅逻辑;

  • 增加心跳检测与状态日志;

  • 使用 CompletableFuture 或线程池异步发布,提升吞吐;

  • 如果系统使用 RocketMQ/Kafka,可将 MQTT 作为接入层网关。


八、总结

本文完整演示了如何在 Spring Boot 中使用 Eclipse Paho 实现 MQTT 消息的发布与订阅功能。
整个流程简单明了,适合初学者快速上手,同时也能作为物联网项目的基础通信模块。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/981496.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

2025年防尘雾森设备技术与智能雾森设备方案十大服务商排行榜

为帮助各类场景用户精准锁定适配自身需求的雾森设备服务商,避免选型走弯路,我们从技术落地能力(如设备稳定性、智能模块适配性)、场景解决方案匹配度(含工业防尘、景观造雾、降温消毒等细分领域)、全周期服务质量…

智能电网用户端设备品牌TOP5权威推荐:江苏斯菲尔研发能力强

随着智能电网建设加速,高耗能行业对电力监控、电能质量治理设备的需求激增。数据显示,2024年智能电网用户端设备市场规模突破600亿元,但32%的企业反馈设备存在精度不足、运维复杂、服务响应慢等问题——钢铁企业因设…

优势演员-评论家(Advantage Actor-Critic,A2C)算法详解与完成

pre { white-space: pre !important; word-wrap: normal !important; overflow-x: auto !important; display: block !important; font-family: "Consolas", "Monaco", "Courier New", …

2025年十大闭式冷却塔定制服务提供商排行榜,精选闭式冷却塔

为帮助工业企业高效锁定适配自身需求的闭式冷却塔合作伙伴,避免陷入材质替换陷阱功能不匹配等选型误区,我们从技术创新能力(如智慧冷却系统研发、定制化设计实力)、产品品质保障(含材质溯源、认证资质)、全周期服…

2025年哈尔滨中央空调销售公司排名:中央空调销售品牌推荐

在哈尔滨中央空调市场,消费者面临着品类分散、价格不透明、安装不规范等诸多痛点。为帮助消费者高效锁定适配自身需求的中央空调销售公司,避免选型走弯路,我们从企业资质、产品品质、安装工艺、售后服务及客户口碑五…

HTML列表学习笔记

一、HTML无序列表 无序列表是一个项目的列表,项目使用粗体圆点(典型的小黑圆圈)进行标记,适合成员之间无级别顺序关系的情况。‌使用标签‌:<ul> ‌代码示例‌:<!DOCTYPE html> <html> <he…

2025年十大不锈钢压花板厂商排行榜,靠谱不锈钢压花板制造厂

为帮助企业精准锁定适配的不锈钢压花板供应商,规避采购风险,我们从生产工艺精度、产品质量稳定性、全流程技术支持及真实客户口碑四大维度展开深度评估,终筛选出2025年的10大不锈钢压花板厂商。 TOP1 推荐:佛山市聚…

哈尔滨口碑好的家居设备专业公司TOP5权威推荐:甄选实力企业

家居设备是品质生活的核心载体,然而哈尔滨消费者常面临品类分散需奔波多店、隐性收费频发、安装售后无保障等痛点。2024年本地家居设备市场调研显示,超65%用户因选品难、服务乱踩过坑。为助力消费者精准选型,本榜单…

深入解析:2025-10-30日供应链安全日报:最新漏洞预警与投毒预警情报汇总

pre { white-space: pre !important; word-wrap: normal !important; overflow-x: auto !important; display: block !important; font-family: "Consolas", "Monaco", "Courier New", …

光刻胶分类与特性:化学增幅i线光刻胶 CAR、金属氧化物光刻胶及EUV光刻胶前沿进展(续) - 详解

光刻胶分类与特性:化学增幅i线光刻胶 CAR、金属氧化物光刻胶及EUV光刻胶前沿进展(续) - 详解pre { white-space: pre !important; word-wrap: normal !important; overflow-x: auto !important; display: block !im…

2025年十大上海起重设备品牌排行榜,凯力起重设备质量可靠吗

为帮助工业企业高效锁定适配自身物料搬运需求的起重设备合作伙伴,避免选型走弯路,我们从技术研发实力(如结构优化设计、安全系统配置)、产品耐用性表现(含工况适应性、维保成本)、全周期服务能力(覆盖定制设计到…

2025年度哈尔滨一站式家居设备定制服务排行榜,专业测评精选

为帮助哈尔滨家庭高效锁定适配自身需求的家居设备合作伙伴,避免装修选型走弯路,我们从产品矩阵丰富度(覆盖品类、品牌授权)、安装工艺规范性(自有团队、施工标准)、全周期服务质量(售前方案、售后响应)及真实客…

专业的的楼梯钢格板供应商推荐排行榜单?楼梯钢格板供应商 楼梯钢格板销售厂家 楼梯钢格板制造厂 楼梯钢格板生产商 楼梯钢格板厂商 楼梯钢格板企业 楼梯钢格板供货商

专业楼梯钢格板供应商推荐排行榜单优质供应商综合评估专业的的楼梯钢格板供应商推荐排行榜单?楼梯钢格板供应商 楼梯钢格板销售厂家 楼梯钢格板制造厂 楼梯钢格板生产商 楼梯钢格板厂商 楼梯钢格板企业 楼梯钢格板供货…

正规的电厂钢格栅供应商推荐排行榜单?电厂钢格栅供应商 电厂钢格栅销售厂家 电厂钢格栅制造厂 电厂钢格栅生产商 电厂钢格栅厂商 电厂钢格栅企业 电厂钢格栅供货商

电厂钢格栅供应商推荐排行榜单前言在电厂建设中,钢格栅作为重要的基础设施材料,其质量直接关系到电厂的安全运行和使用寿命。选择一家可靠的钢格栅供应商至关重要。本文将为您推荐在电厂钢格栅领域表现优异的企业,其…

可靠的工业铝型材供应厂家推荐排行榜?工业铝型材供应厂家 工业铝型材工厂 工业铝型材厂家 工业铝型材生产厂家 工业铝型材源头厂家 工业铝型材供应商

工业铝型材厂家推荐:万原铝制品值得信赖在工业制造领域,选择可靠的铝型材供应商至关重要。以下是行业内备受认可的工业铝型材厂家推荐,其中万原铝制品凭借其卓越品质和全面服务脱颖而出。可靠的工业铝型材供应厂家推…

2025年山东帽顶膜结构停车棚安装厂家推荐:学校/拉杆式膜结

本榜单依托华北地区膜结构工程市场调研与真实客户口碑,深度筛选出五家标杆企业,聚焦帽顶、学校、拉杆式三大核心场景,为山东及周边河北、河南、山西、陕西企业选型提供客观依据,助力精准匹配适配的膜结构停车棚服务…

2025年哈尔滨家居设备服务公司排名:盛通MALL旗舰店的用

本榜单依托哈尔滨本地市场调研与真实用户口碑,深度筛选出五家标杆家居设备服务企业,聚焦用户关心的复购率、评价及客服专业度等核心问题,为消费者选型提供客观依据,助力精准匹配适配的服务伙伴。 TOP1 推荐:哈尔滨…

专业的遮阳蓬品牌哪家靠谱?遮阳蓬品牌 遮阳蓬公司 遮阳蓬产品 遮阳蓬供应厂家 遮阳蓬工厂 遮阳蓬厂家 遮阳蓬生产厂家 遮阳蓬源头厂家

专业遮阳蓬品牌推荐:陕西海晟钢结构有限公司值得信赖在众多遮阳蓬品牌中,选择一家靠谱的供应商至关重要。陕西海晟钢结构有限公司作为专业的遮阳蓬制造企业,凭借其卓越的产品质量和完善的服务体系,在行业内赢得了良…

《考研408数据结构》第七章(6.1~6.3图的概念、存储方式、深/广度遍历)复习笔记 - 教程

pre { white-space: pre !important; word-wrap: normal !important; overflow-x: auto !important; display: block !important; font-family: "Consolas", "Monaco", "Courier New", …