springboot集成MQTT实现消息接收

MQTT介绍

简单来说MQTT是一种协议,用来解决物联网之间的数据传输,它功耗更低,稳定性也不错,现在很多物联网的设备都在使用mqtt。感兴趣可以查看详情中文介绍

SpringBoot集成Mqtt

  1. 引入pom文件
<dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-core</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-integration</artifactId></dependency><dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-stream</artifactId></dependency><dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-mqtt</artifactId></dependency>
  1. 编写mqtt的配置类
publish:mqtt:host: tcp://你自己的mqtt地址:端口clientId: mqtt_iduserName: 你的账号名password: 你的密码# 这里表示会话不过期cleanSession: false# 配置一个默认的主题,加载时不会用到,只能在需要时手动提取defaultTopic: devopstimeout: 1000keepAliveInterval: 10#断线重连方式,自动重新连接与会话不过期配合使用会导致#断线重新连接后会接收到断线期间的消息。需要更改设置请看password联系我automaticReconnect: trueconnectionTimeout: 3000topic: topic_0# 最大链接数maxInFlight: 100topics: topic_1,topic_2,topic_3,topic_4
package com.cshf.receive.common;import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;import java.util.List;/***/
@Configuration
@ConfigurationProperties(MQTTConfig.PREFIX)
@Data
public class MQTTConfig {//配置的名称public static final String PREFIX = "publish.mqtt";private String host;private String clientId;private String username;private String password;private boolean cleanSession;private String defaultTopic;private int timeout;private int keepAliveInterval;private boolean automaticReconnect;private int connectionTimeout;private int maxInFlight;private String topic;private List<String> topics;}
  1. 编写连接mqtt的配置类
package com.cshf.receive.mqtt;import com.cshf.receive.common.MQTTConfig;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;@Configuration
@IntegrationComponentScan
@AllArgsConstructor
@Slf4j
public class MqttSenderConfig {private final MQTTConfig mqttConfig;private MqttMessageHandler messageHandler;@Beanpublic MqttConnectOptions getMqttConnectOptions() {MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();mqttConnectOptions.setCleanSession(true);mqttConnectOptions.setConnectionTimeout(10);mqttConnectOptions.setKeepAliveInterval(90);mqttConnectOptions.setAutomaticReconnect(true);mqttConnectOptions.setUserName(mqttConfig.getUsername());mqttConnectOptions.setPassword(mqttConfig.getPassword().toCharArray());mqttConnectOptions.setServerURIs(new String[]{mqttConfig.getHost()});mqttConnectOptions.setKeepAliveInterval(2);return mqttConnectOptions;}@Beanpublic MqttPahoClientFactory mqttClientFactory() {DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();factory.setConnectionOptions(getMqttConnectOptions());return factory;}@Bean@ServiceActivator(inputChannel = "mqttOutboundChannel")public MessageHandler mqttOutbound() {MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(mqttConfig.getClientId(), mqttClientFactory());messageHandler.setAsync(true);messageHandler.setDefaultTopic(mqttConfig.getTopic());messageHandler.setDefaultQos(1);messageHandler.setDefaultRetained(true);return messageHandler;}@Beanpublic MessageChannel mqttOutboundChannel() {return new DirectChannel();}/*** 接收通道*/@Beanpublic MessageChannel mqttInputChannel() {return new DirectChannel();}/*** 配置client,监听的topic*/@Beanpublic MessageProducer inbound() {String[] topicsArr = mqttConfig.getTopics().toArray(new String[0]);MqttPahoMessageDrivenChannelAdapter adapter =new MqttPahoMessageDrivenChannelAdapter(mqttConfig.getClientId()+"_inbound", mqttClientFactory(), topicsArr);adapter.setConverter(new DefaultPahoMessageConverter());adapter.setQos(1);adapter.setOutputChannel(mqttInputChannel());return adapter;}/*** 通过通道获取数据*/@Bean@ServiceActivator(inputChannel = "mqttInputChannel")public MessageHandler handler() {return messageHandler;}
}
  1. 编写handler类
package com.cshf.receive.mqtt;//import com.cvdmp.domain.exception.ConditionException;
//import com.cvdmp.service.MqttDrugBoxDataService;
import com.cshf.core.exception.ServiceException;
import com.cshf.receive.service.NestDataService;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.springframework.stereotype.Component;import java.util.ArrayList;
import java.util.List;@Component
@Slf4j
@AllArgsConstructor
public class MqttMessageHandler implements MessageHandler {/***接收到的消息和topic*/@Overridepublic void handleMessage(Message<?> message) throws MessagingException {String topic = message.getHeaders().get("mqtt_receivedTopic").toString();log.info("消息主题:{},内容:{}", topic, message.getPayload());}
}
  1. springboot发送消息给mqtt
package com.cshf.receive.mqtt;//import com.cvdmp.domain.exception.ConditionException;
//import com.cvdmp.service.MqttDrugBoxDataService;
import com.cshf.core.exception.ServiceException;
import com.cshf.receive.service.NestDataService;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.springframework.stereotype.Component;import java.util.ArrayList;
import java.util.List;/*** @Program iot-platform* @ClassName MqttMessageHandler* @Author 代志华* @Date 2021/10/18 13:16* @Description: mqtt处理类*/
@Component
@Slf4j
@AllArgsConstructor
public class MqttMessageHandler implements MessageHandler {private final NestDataService nestDataService;@Overridepublic void handleMessage(Message<?> message) throws MessagingException {String topic = message.getHeaders().get("mqtt_receivedTopic").toString();log.debug("消息主题:{},内容:{}", topic, message.getPayload());nestDataService.handleMqttData(topic,message.getPayload().toString());}
}
package com.cvdmp.service;import java.util.Date;import com.cvdmp.mqtt.MqttGateway;
import com.cvdmp.service.util.DateUtil;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;import com.alibaba.fastjson.JSONObject;@Service
@Slf4j
@AllArgsConstructor
public class MqttCommandService{private final MqttGateway mqttGateway;public void sendCommondToMqtt(JSONObject jsonObject) {String content = jsonObject.getString("content");String topic = jsonObject.getString("topic");//log.info("sendCommondToMqtt  -----------------------");mqttGateway.sendToMqtt(content,topic);log.info("发送mqtt指令成功,topic:{} ,content:{}",topic,content);}public static void main(String[] args) {String date = "2024-06-26 17:46:00";Date time = DateUtil.paseHour(date);String recomander = time.getTime()/1000+"-"+(time.getTime()+5*60*1000)/1000+"-"+(time.getTime()+10*60*1000)/1000;System.out.println(recomander);Date ddd = new Date(1634629566000L);System.out.println(DateUtil.dateToStr(ddd,"yyyy-MM-dd HH:mm:ss"));}
}

总结

总结下来就这几步

  1. 导入依赖
  2. 配置mqtt连接参数(要订阅的topic)
  3. mqtt连接
  4. handle服务获取topic的信息

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/873312.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

xhdra的使用记录

XHydra是一个基于Hydra的分布式密码破解工具&#xff0c;用于进行暴力破解攻击。以下是在Kali Linux中使用XHydra的完整步骤和命令&#xff0c;以及一个示例&#xff1a; 安装XHydra&#xff1a; 在Kali Linux中&#xff0c;XHydra通常已经预装在系统中。如果没有安装&#xff…

Python3.4.4 32位

Python 3.4.4是Python编程语言的一个稳定版本&#xff0c;发布于2015年&#xff0c;主要针对32位操作系统设计。对于那些仍然运行Windows XP的用户来说&#xff0c;这是他们可以安装的最高版本的Python&#xff0c;因为从Python 3.5开始&#xff0c;官方停止了对Windows XP的支…

L298N的输出电流与电压

一、L298N的电流输出范围 L298N的输出电流为2A&#xff0c;瞬间峰值电流可达4A。 L298N是一款意法半导体生产的双路全桥式电机驱动芯片&#xff0c;广泛应用于各种电子和机械控制项目中&#xff0c;如驱动继电器、电磁阀、直流电机和步进电机等。其输出电流之所以重要&#x…

Jenkins及其相关插件的具体流程

目录 一、安装Jenkins二、配置Jenkins三、创建项目并配置构建任务四、运行和监控构建任务五、维护和优化 一、安装Jenkins 下载Jenkins安装包&#xff1a; 访问Jenkins官方网站&#xff08;https://www.jenkins.io/&#xff09;下载页面&#xff0c;选择合适的安装包。对于Linu…

Apache BookKeeper 一致性协议解析

导语 Apache Pulsar 是一个多租户、高性能的服务间消息传输解决方案&#xff0c;支持多租户、低延时、读写分离、跨地域复制&#xff08;GEO replication&#xff09;、快速扩容、灵活容错等特性。Pulsar 存储层依托于 BookKeeper 组件&#xff0c;所以本文简单探讨一下 BookK…

Six common classification algorithms in machine learning

分类算法是一种机器学习算法&#xff0c;其主要目的是从数据中发现规律并将数据分成不同的类别。分类算法通过对已知类别训练集的计算和分析&#xff0c;从中发现类别规则并预测新数据的类别。常见的分类算法包括决策树、朴素贝叶斯、逻辑回归、K-最近邻、支持向量机等。分类算…

浅谈:网络协议及网络连接

事情的起因 怪有意思的。&#xff08;纯纯唠嗑&#xff0c;不感兴趣的可以跳过&#xff09; 我们初中&#xff0c;在学期的最后一天换教室&#xff0c;由于我们是十三班&#xff0c;是年级里面的一个“例外”。因为我们其他年级都是12个和10个班级&#xff0c;就我们一个奇数…

【Mysql关于读已提交和可重复读(Read Committed)隔离级别下解决幻读的方案】

目录 读已提交&#xff08;Read Committed&#xff09;隔离级别 解决幻读问题的方法 总结 可重复读&#xff08;Read Committed&#xff09;隔离级别 幻读问题 MVCC机制 解决幻读 数据库支持 示例 注意 读已提交&#xff08;Read Committed&#xff09;隔离级别 在“…

DDei在线设计器-HTML渲染

Html渲染 HtmlViewer插件通过将一个外部DIV附着在图形控件上&#xff0c;从而改变原有图形的显示方式。允许使用者自己定义HTML通过HTML元素。本示例演示了通过Html来扩展渲染图形&#xff0c;从而获得更加丰富的图形展现。 通常情况下&#xff0c;我们创建的图形控件&#xff…

springboot的简单应用

Mvc与三层架构 创建Spring项目 勾选web和mabais框架 配置yml文件&#xff0c;这里创建spring项目默认生成的是propertise文件&#xff0c;但是properties文件的格式看起来没有yml文件一目了然。yml文件配置数据库还有映射mapper层的xml文件以及设置日志级别&#xff0c;比如map…

ESC(ELectronic Stability Control,电子稳定控制系统)

ESC通过实时监测车辆的动态参数&#xff0c;以及车辆轮胎的实际运动状态&#xff0c;通过调节车辆制动系统和发动机输出力&#xff0c;使车辆在紧急或危险情况下保持稳定&#xff0c;防止侧滑和失控。 ESC组成部分 传感器&#xff1a;用于检测车辆的动态参数&#xff0c;如车…

去中心化技术的变革力量:探索Web3的潜力

随着区块链技术的发展和应用&#xff0c;去中心化技术正成为数字世界中的一股强大变革力量。Web3作为去中心化应用的新兴范式&#xff0c;正在重新定义人们对于数据、互联网和价值交换的认知。本文将探索去中心化技术的基本概念、Web3的核心特征及其潜力应用&#xff0c;展示其…

WebRTC音视频-环境搭建

目录 期望效果 1:虚拟机和系统安装 2:WebRTC客户端环境搭建 2.1&#xff1a;VScode安装 2.2&#xff1a;MobaXterm安装 3:WebRTC服务器环境搭建 3.1&#xff1a;安装openssh服务器 3.2&#xff1a;安装Node.js 3.3&#xff1a;coturn穿透和转发服务器 3.3.1&a…

Docker 镜像使用和安装

​ 1、简介 Docker是一个开源的应用容器引擎&#xff1b;是一个轻量级容器技术&#xff1b; Docker支持将软件编译成一个镜像&#xff1b;然后在镜像中各种软件做好配置&#xff0c;将镜像发布出去&#xff0c;其他使用者可以直接使用这个镜像&#xff1b; 运行中的这个镜像…

vue2 使用代码编辑器插件 vue-codemirror

vue 使用代码编辑器插件 vue-codemirror 之前用过一次&#xff0c;当时用的一知半解的&#xff0c;所以也没有成文&#xff0c;前几天又因为项目有需求&#xff0c;所以说有用了一次&#xff0c;当然&#xff0c;依旧是一知半解&#xff0c;但是还是稍微写一下子吧&#xff01;…

双非一本嵌入式方向怎么学?

双非一本&#xff08;非“985”和“211”工程重点建设的本科院校&#xff09;的学生在学习嵌入式方向时&#xff0c;可以通过以下步骤和策略来系统地学习和提升自己。我收集归类了一份嵌入式学习包&#xff0c;对于新手而言简直不要太棒&#xff0c;里面包括了新手各个时期的学…

函数式编程面试题1:什么是合格的函数和有形的函数

目录 面试官:什么是合格的函数和有形的函数合格的函数(Well-Formed Function)有形的函数(Shaped Function)示例代码示例代码:Lambda表达式作为合格的函数该文章专注于面试,面试只要回答关键点即可,不需要对框架有非常深入的回答,如果你想应付面试,是足够了,抓住关键…

【学习笔记】无人机系统(UAS)的连接、识别和跟踪(二)-定义和缩写

引言 3GPP TS 23.256 技术规范&#xff0c;主要定义了3GPP系统对无人机&#xff08;UAV&#xff09;的连接性、身份识别、跟踪及A2X&#xff08;Aircraft-to-Everything&#xff09;服务的支持。 3GPP TS 23.256 技术规范&#xff1a; 【免费】3GPPTS23.256技术报告-无人机系…

前端JS特效第46集:js-实现响应式节庆活动砸金蛋效果

js-实现响应式节庆活动砸金蛋效果&#xff0c;先来看看效果&#xff1a; 部分核心的代码如下(全部代码在文章末尾)&#xff1a; <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><title>响应式节庆活动砸金蛋…

Diffusion【1】:SDSeg——基于Stable Diffusion的单步扩散分割!

文章目录 前言AbstractIntroductionMethodsLatent EstimationConcatenate Latent FusionTrainable Vision Encoder ExperimentDatasets and Evaluation MetricsImplementation DetailsExperimental SettingsInference Stage Main ResultsComparison with State-of-the-ArtsComp…