ruoyi-vue 整合EMQX接收MQTT协议数据

EMQX安装完成后,需要搭建客户端进行接收数据进一步对数据处理,下面介绍基于若依分离版开源框架来整合EMQX方法。

1.application.yml 添加代码

mqtt:hostUrl: tcp://localhost:1883username: devpassword: devclient-id: MQTT-CLIENT-DEVcleanSession: truereconnect: truetimeout: 100keepAlive: 100defaultTopic: client/dev/reportserverTopic: server/dev/reportisOpen: trueqos: 0

2.pom.xml 引入依赖

        <!-- mqtt --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-integration</artifactId></dependency><dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-stream</artifactId></dependency><dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-mqtt</artifactId></dependency><!--配置文件报错问题--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-configuration-processor</artifactId><optional>true</optional></dependency><!--lombok--><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.22</version><scope>provided</scope></dependency>

3.新建 MqttAcceptCallback

package com.ruoyi.iot.mqtt;import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;/*** @Description : MQTT接受服务的回调类* @Author : lsyong* @Date : 2023/8/1 16:29*/@Component
public class MqttAcceptCallback implements MqttCallbackExtended {private static final Logger logger = LoggerFactory.getLogger(MqttAcceptCallback.class);@Autowiredprivate MqttAcceptClient mqttAcceptClient;@Autowiredprivate MqttProperties mqttProperties;/*** 客户端断开后触发** @param throwable*/@Overridepublic void connectionLost(Throwable throwable) {logger.info("连接断开,可以重连");if (MqttAcceptClient.client == null || !MqttAcceptClient.client.isConnected()) {logger.info("【emqx重新连接】....................................................");mqttAcceptClient.reconnection();}}/*** 客户端收到消息触发** @param topic       主题* @param mqttMessage 消息*/@Overridepublic void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {logger.info("【接收消息主题】:" + topic);logger.info("【接收消息Qos】:" + mqttMessage.getQos());logger.info("【接收消息内容】:" + new String(mqttMessage.getPayload()));//        int i = 1/0;}/*** 发布消息成功** @param token token*/@Overridepublic void deliveryComplete(IMqttDeliveryToken token) {String[] topics = token.getTopics();for (String topic : topics) {logger.info("向主题【" + topic + "】发送消息成功!");}try {MqttMessage message = token.getMessage();byte[] payload = message.getPayload();String s = new String(payload, "UTF-8");logger.info("【消息内容】:" + s);} catch (Exception e) {logger.error("MqttAcceptCallback deliveryComplete error,message:{}", e.getMessage());e.printStackTrace();}}/*** 连接emq服务器后触发** @param b* @param s*/@Overridepublic void connectComplete(boolean b, String s) {logger.info("============================= 客户端【" + MqttAcceptClient.client.getClientId() + "】连接成功!=============================");// 以/#结尾表示订阅所有以test开头的主题// 订阅所有机构主题mqttAcceptClient.subscribe(mqttProperties.getDefaultTopic(), 0);}
}

4.新建 MqttAcceptClient

package com.ruoyi.iot.mqtt;import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;/*** @Description : MQTT接受服务的客户端* @Author : lsyong* @Date : 2023/8/1 16:26*/
@Component
public class MqttAcceptClient {private static final Logger logger = LoggerFactory.getLogger(MqttAcceptClient.class);@Autowiredprivate MqttAcceptCallback mqttAcceptCallback;@Autowiredprivate MqttProperties mqttProperties;public static MqttClient client;private static MqttClient getClient() {return client;}private static void setClient(MqttClient client) {MqttAcceptClient.client = client;}/*** 客户端连接*/public void connect() {MqttClient client;try {client = new MqttClient(mqttProperties.getHostUrl(), mqttProperties.getClientId(),new MemoryPersistence());MqttConnectOptions options = new MqttConnectOptions();options.setUserName(mqttProperties.getUsername());options.setPassword(mqttProperties.getPassword().toCharArray());options.setConnectionTimeout(mqttProperties.getTimeout());options.setKeepAliveInterval(mqttProperties.getKeepAlive());options.setAutomaticReconnect(mqttProperties.getReconnect());options.setCleanSession(mqttProperties.getCleanSession());MqttAcceptClient.setClient(client);// 设置回调client.setCallback(mqttAcceptCallback);client.connect(options);} catch (Exception e) {logger.error("MqttAcceptClient connect error,message:{}", e.getMessage());e.printStackTrace();}}/*** 重新连接*/public void reconnection() {try {client.connect();} catch (MqttException e) {logger.error("MqttAcceptClient reconnection error,message:{}", e.getMessage());e.printStackTrace();}}/*** 订阅某个主题** @param topic 主题* @param qos   连接方式*/public void subscribe(String topic, int qos) {logger.info("========================【开始订阅主题:" + topic + "】========================");try {client.subscribe(topic, qos);} catch (MqttException e) {logger.error("MqttAcceptClient subscribe error,message:{}", e.getMessage());e.printStackTrace();}}/*** 取消订阅某个主题** @param topic*/public void unsubscribe(String topic) {logger.info("========================【取消订阅主题:" + topic + "】========================");try {client.unsubscribe(topic);} catch (MqttException e) {logger.error("MqttAcceptClient unsubscribe error,message:{}", e.getMessage());e.printStackTrace();}}
}

5.新建 MqttCondition

package com.ruoyi.iot.mqtt;import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.context.annotation.Condition;
import org.springframework.context.annotation.ConditionContext;
import org.springframework.core.env.Environment;
import org.springframework.core.type.AnnotatedTypeMetadata;/*** @Description : 自定义配置,通过这个配置,来控制启动项目的时候是否启动mqtt* @Author : lsyong* @Date : 2023/8/1 16:32*/
public class MqttCondition implements Condition {@Overridepublic boolean matches(ConditionContext context, AnnotatedTypeMetadata annotatedTypeMetadata) {//1、能获取到ioc使用的beanfactoryConfigurableListableBeanFactory beanFactory = context.getBeanFactory();//2、获取类加载器ClassLoader classLoader = context.getClassLoader();//3、获取当前环境信息Environment environment = context.getEnvironment();String isOpen = environment.getProperty("mqtt.isOpen");return Boolean.valueOf(isOpen);}
}

6.新建 MqttConfig

package com.ruoyi.iot.mqtt;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Conditional;
import org.springframework.context.annotation.Configuration;/*** @Description : 启动服务的时候开启监听客户端* @Author : lsyong* @Date : 2023/8/1 16:35*/
@Configuration
public class MqttConfig {@Autowiredprivate MqttAcceptClient mqttAcceptClient;/*** 订阅mqtt** @return*/@Conditional(MqttCondition.class)@Beanpublic MqttAcceptClient getMqttPushClient() {mqttAcceptClient.connect();return mqttAcceptClient;}
}

7.新建 MqttProperties

package com.ruoyi.iot.mqtt;import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;/*** @Description : MQTT配置信息* @Author : lsyong* @Date : 2023/8/1 16:25*/
@Component
@ConfigurationProperties("mqtt")
@Data
public class MqttProperties {/*** 用户名*/private String username;/*** 密码*/private String password;/*** 连接地址*/private String hostUrl;/*** 客户端Id,同一台服务器下,不允许出现重复的客户端id*/private String clientId;/*** 默认连接主题,以/#结尾表示订阅所有以test开头的主题*/private String defaultTopic;/*** 默认服务器发送主题前缀,格式:server:${env}:report:${topic}*/private String serverTopic;/*** 超时时间*/private int timeout;/*** 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端* 发送个消息判断客户端是否在线,但这个方法并没有重连的机制*/private int keepAlive;/*** 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连* 接记录,这里设置为true表示每次连接到服务器都以新的身份连接*/private Boolean cleanSession;/*** 是否断线重连*/private Boolean reconnect;/*** 启动的时候是否关闭mqtt*/private Boolean isOpen;/*** 连接方式*/private Integer qos;/*** 获取默认主题,以/#结尾表示订阅所有以test开头的主题** @return*/public String getDefaultTopic() {return defaultTopic + "/#";}/*** 获取服务器发送主题,格式:server/${env}/report/${topic}** @param topic* @return*/public String getServerTopic(String topic) {return serverTopic + "/" + topic;}
}

8.新建 MqttSendCallBack

package com.ruoyi.iot.mqtt;import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;/*** @Description : MQTT发送客户端的回调类* @Author : lsyong* @Date : 2023/8/1 16:31*/
@Component
public class MqttSendCallBack implements MqttCallbackExtended {private static final Logger logger = LoggerFactory.getLogger(MqttSendCallBack.class);/*** 客户端断开后触发** @param throwable*/@Overridepublic void connectionLost(Throwable throwable) {logger.info("连接断开,可以重连");}/*** 客户端收到消息触发** @param topic       主题* @param mqttMessage 消息*/@Overridepublic void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {logger.info("【接收消息主题】: " + topic);logger.info("【接收消息Qos】: " + mqttMessage.getQos());logger.info("【接收消息内容】: " + new String(mqttMessage.getPayload()));}/*** 发布消息成功** @param token token*/@Overridepublic void deliveryComplete(IMqttDeliveryToken token) {String[] topics = token.getTopics();for (String topic : topics) {logger.info("向主题【" + topic + "】发送消息成功!");}try {MqttMessage message = token.getMessage();byte[] payload = message.getPayload();String s = new String(payload, "UTF-8");logger.info("【消息内容】:" + s);} catch (Exception e) {logger.error("MqttSendCallBack deliveryComplete error,message:{}", e.getMessage());e.printStackTrace();}}/*** 连接emq服务器后触发** @param b* @param s*/@Overridepublic void connectComplete(boolean b, String s) {logger.info("============================= 客户端【" + MqttAcceptClient.client.getClientId() + "】连接成功!=============================");}
}

9.新建 MqttSendClient

package com.ruoyi.iot.mqtt;import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import java.util.UUID;/*** @Description : MQTT发送客户端* @Author : lsyong* @Date : 2023/8/1 16:30*/
@Component
public class MqttSendClient {private static final Logger logger = LoggerFactory.getLogger(MqttSendClient.class);@Autowiredprivate MqttSendCallBack mqttSendCallBack;@Autowiredprivate MqttProperties mqttProperties;public MqttClient connect() {MqttClient client = null;try {String uuid = UUID.randomUUID().toString().replaceAll("-", "");client = new MqttClient(mqttProperties.getHostUrl(), uuid, new MemoryPersistence());MqttConnectOptions options = new MqttConnectOptions();options.setUserName(mqttProperties.getUsername());options.setPassword(mqttProperties.getPassword().toCharArray());options.setConnectionTimeout(mqttProperties.getTimeout());options.setKeepAliveInterval(mqttProperties.getKeepAlive());options.setCleanSession(true);options.setAutomaticReconnect(false);// 设置回调client.setCallback(mqttSendCallBack);client.connect(options);} catch (Exception e) {logger.error("MqttSendClient connect error,message:{}", e.getMessage());e.printStackTrace();}return client;}/*** 发布消息** @param retained 是否保留* @param topic    主题,格式: server:${env}:report:${topic}* @param content  消息内容*/public void publish(boolean retained, String topic, String content) {MqttMessage message = new MqttMessage();message.setQos(mqttProperties.getQos());message.setRetained(retained);message.setPayload(content.getBytes());MqttDeliveryToken token;MqttClient mqttClient = connect();try {mqttClient.publish(mqttProperties.getServerTopic(topic), message);} catch (MqttException e) {logger.error("MqttSendClient publish error,message:{}", e.getMessage());e.printStackTrace();} finally {disconnect(mqttClient);close(mqttClient);}}/*** 关闭连接** @param mqttClient*/public static void disconnect(MqttClient mqttClient) {try {if (mqttClient != null)mqttClient.disconnect();} catch (MqttException e) {logger.error("MqttSendClient disconnect error,message:{}", e.getMessage());e.printStackTrace();}}/*** 释放资源** @param mqttClient*/public static void close(MqttClient mqttClient) {try {if (mqttClient != null)mqttClient.close();} catch (MqttException e) {logger.error("MqttSendClient close error,message:{}", e.getMessage());e.printStackTrace();}}
}

10.新建测试类 MqttController

package com.ruoyi.iot.mqtt;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;/*** @Description : 测试类* @Author : lsyong* @Date : 2023/8/1 16:35*/
@RestController
@RequestMapping("/mqtt")
public class MqttController {@Autowiredprivate MqttSendClient mqttSendClient;@GetMapping(value = "/publishTopic")public String publishTopic(String topic, String sendMessage) {System.out.println("topic:" + topic);System.out.println("message:" + sendMessage);this.mqttSendClient.publish(false, topic, sendMessage);return "topic:" + topic + "\nmessage:" + sendMessage;}}

放开测试类的访问权限,在com.ruoyi.framework.config 路径下的 SecurityConfig 类中添加如下代码

 .antMatchers("/mqtt/**").permitAll()

11.启动项目进行测试

如果连接不上,确认emqx是否启动成功,详细可以查看Windows安装EMQX(搭建MQTT服务)-CSDN博客

连接成功后可以登入EMQX去查看

浏览器访问 http://localhost:8080/mqtt/publishTopic?sendMessage=你好啊

控制台打印

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/183581.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【物联网与大数据应用】Hadoop数据处理

Hadoop是目前最成熟的大数据处理技术。Hadoop利用分而治之的思想为大数据提供了一整套解决方案&#xff0c;如分布式文件系统HDFS、分布式计算框架MapReduce、NoSQL数据库HBase、数据仓库工具Hive等。 Hadoop的两个核心解决了数据存储问题&#xff08;HDFS分布式文件系统&#…

mysql5.7生成SSL证书

1、创建 CA 私钥和 CA 证书 &#xff08;1&#xff09;下载并安装openssl,将bin目录配置到环境变量&#xff1b; &#xff08;2&#xff09;设置openssl.cfg路径&#xff08;若不设置会报错&#xff0c;找不到openssl配置文件&#xff09; set OPENSSL_CONFG:\Program Files\…

nexus 制品库管理

目录 一、nexus 介绍 二、nexus 支持的仓库 三、nexus 部署 四、nexus 数据备份 五、创建一个内网yum源 六、创建一个代理yum仓库 七、jenkins 使用 nexus插件 7.1 jenkins 安装插件 7.2 配置 maven 工程 7.3 查看构建和上传 一、nexus 介绍 Nexus 是一个强大的仓库管…

在氮化镓和AlGaN上的湿式数字蚀刻

引言 由于其独特的材料特性&#xff0c;III族氮化物半导体广泛应用于电力、高频电子和固态照明等领域。加热的四甲基氢氧化铵(TMAH)和KOH3处理的取向相关蚀刻已经被用于去除III族氮化物材料中干法蚀刻引起的损伤&#xff0c;并缩小垂直结构。 不幸的是&#xff0c;由于化学蚀…

基于协同过滤算法的职业发展推荐系统设计

点我完整下载&#xff1a;基于协同过滤算法的职业发展推荐系统设计 基于协同过滤算法的职业发展推荐系统设计 Design of Career Development Recommendation System Based on Collaborative Filtering Algorithm 目录 目录 2 摘要 3 关键词 3 第一章 引言 3 1.1 研究背景 3 1.2…

谱方法学习笔记-下(超详细)

谱方法学习笔记&#x1f4d2; 谱方法学习笔记-上(超详细) 声明&#xff1a;鉴于CSDN使用 K a T e X KaTeX KaTeX 渲染公式&#xff0c; KaTeX \KaTeX KATE​X 与 L a T e X LaTeX LaTeX 不同&#xff0c;不支持直接的交叉引用命令&#xff0c;如\label和\eqref。 KaTeX \KaT…

MySQL报错:sql_mode=only_full_group_by 解决方法含举例

方法一&#xff1a;直接修改数据库配置 首先&#xff0c;打开数据库&#xff0c;输入 select global.sql_mode;这个时候&#xff0c;就会返回得到以下的信息&#xff1a;&#xff08;不同电脑返回的信息可能不同&#xff09; ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ENG…

Docker + Jenkins + Nginx实现前端自动化部署

目录 前言一、前期准备工作1、示例环境2、安装docker3、安装Docker Compose4、安装Git5、安装Nginx和Jenkinsnginx.confdocker-compose.yml 6、启动环境7、验证Nginx8、验证Jenkins 二、Jenkins 自动化部署配置1、设置中文2、安装Publish Over SSH、NodeJS&#xff08;1&#x…

Stream API练习题

作者简介&#xff1a;大家好&#xff0c;我是smart哥&#xff0c;前中兴通讯、美团架构师&#xff0c;现某互联网公司CTO 联系qq&#xff1a;184480602&#xff0c;加我进群&#xff0c;大家一起学习&#xff0c;一起进步&#xff0c;一起对抗互联网寒冬 考虑到Stream API在实际…

关于前端学习的思考-浮动元素和块级元素的关系

先摆关系&#xff1a;浮动元素嵌套块级元素&#xff0c;浮动元素和块级元素是上下关系。 1、浮动元素为父盒子&#xff0c;块级元素为子盒子。 父盒子为浮动元素&#xff0c;子盒子不会继承。如图floatnone&#xff1b; 摆结论&#xff1a;子盒子为行内元素&#xff0c;行内块…

37.从0到上线三天搭建个人网站(第一天)

点赞收藏加关注&#xff0c;你也能住大别墅&#xff01; 挑战三天搭建个人网站 从0到上线 一、项目的主要功能 1.作为自己在网上的一个工作室。 2.发帖 3.展示个人项目连接 4.介绍自己&#xff08;没准儿还能接点活儿&#xff09; 二、UI风格参考 三、技术选型 1.前端&a…

设计规则:模块化的力量

这是一本比较冷门的书**《设计规则&#xff1a;模块化的力量》**&#xff0c;虽然豆瓣上只有58个评价&#xff0c;但是确实能学到很多东西。 这本书对我非常深远。不是是投资&#xff0c;创业&#xff0c;还是其他领域&#xff0c;模块化思想都能帮上你。这本书告诉我们生万物…

数据结构中的二分查找(折半查找)

二分法&#xff1a;顾名思义&#xff0c;把问题一分为2的处理&#xff0c;是一种常见的搜索算法&#xff0c;用于在有序数组或这有序列表中查找指定元素的位置&#xff0c;它的思想是将待搜索的区间不断二分&#xff0c;然后比较目标值与中间元素的大小关系&#xff0c;然后确定…

第八天:信息打点-系统端口CDN负载均衡防火墙

信息打点-系统篇&端口扫描&CDN服务&负载均衡&WAF防火墙 一、知识点 1、获取网络信息-服务器厂商&#xff1a; 阿里云&#xff0c;腾讯云&#xff0c;机房内部等。 网络架构&#xff1a; 内外网环境。 2、获取服务信息-应用协议-内网资产&#xff1a; FTP…

OD机考真题搜集:矩阵中非1的元素个数

题目 存在一个m*n的二维数组,其成员取值范围为0,1,2。其中值为1的元素具备同化特性,每经过1S,将上下左右值为0的元素同化为1。而值为2的元素,免疫同化。将数组所有成员随机初始化为0或2,再将矩阵的[0,0]元素修改成1,在经过足够长的时间后求矩阵中有多少个元素是0或2(即…

芯片及设计制造 - 小记

文章目录 关于芯片制造材料 芯片分类ASICASSPSoCFPGA可编程SoC或SoC FPGA微处理器&#xff08;μP 或 MPU&#xff09;微控制器&#xff08;μC 或 MCU&#xff09; 芯片设计和制造过程&#xff1a;需求 & 方案架构设计架构验证形式验证/属性检查 前端设计RTL 设计逻辑综合…

Making Reconstruction-based Method Great Again for Video Anomaly Detection

Making Reconstruction-based Method Great Again for Video Anomaly Detection 文章信息&#xff1a; 发表于ICDM 2022&#xff08;CCF B会议&#xff09; 原文地址&#xff1a;https://arxiv.org/abs/2301.12048 代码地址&#xff1a;https://github.com/wyzjack/MRMGA4VAD…

Android : Handler -简单应用

主线程才能操作UI界面 实现子线程处理主线程UI MainActivity.java package com.example.myhandler;import androidx.annotation.NonNull; import androidx.appcompat.app.AppCompatActivity;import android.annotation.SuppressLint; import android.os.Bundle; import andr…

layui提示框没有渲染bug解决

bug&#xff1a;使用layui时或许是依赖导入又或是ideal和浏览器缓存问题导致前面明明正常的页面显示&#xff0c;后面出现提示框没有css样式&#xff0c;弹出框没有背景css 效果如下 解决后 解决方法 在你的代码中引入layer.js 我这是jsp页面 <script type"text/jav…

Unity求向量A在平面L上的投影向量

如题&#xff1a;求向量A在平面L上的投影向量(图左) 即求 其实等价于求向量&#xff0c;那在中&#xff0c;,所以只需要求即可 而就是在平面L的法向量的投影坐标&#xff0c;所以代码就是 /// <summary>/// 求向量A在平面B上的投影向量/// </summary>/// <para…