MQTT

news/2024/10/11 13:28:42

安装

服务端

EMQX

客户端

MQTTX
image

Java集成SrpingBoot

pom.xml

<dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-mqtt</artifactId>
</dependency>
<dependency><groupId>org.eclipse.paho</groupId><artifactId>org.eclipse.paho.client.mqttv3</artifactId>
</dependency>

application.yml

mqtt:broker:# 设备MQTT应用配置url: tcp://your_server_ip:1883username: your_usernamepassword: your_password# 自定义约定client_idclientId: your_client_idtopics:# 默认监听default: your/default_topic# 自定义监听subscriptions:- your/topic1- your/topic2# application.properties配置如下
# MQTT 设备应用配置
# mqtt.broker.url=tcp://your_server_ip:1883
# mqtt.broker.username=your_username
# mqtt.broker.password=your_password
# mqtt.broker.clientId=terminalTcpServer
# MQTT 监听的默认主题和自定义订阅主题
# mqtt.topics.default=your/default_topic
# mqtt.topics.subscriptions=your/topic1,your/topic2

MqttConfig配置类

import com.baoer.terminaltcpserver.mqtt.utils.MqttUtil;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.beans.factory.annotation.Autowired;@Configuration
public class MqttConfig {//    @Autowired
//    private MqttUtil mqttUtil;@Value("${mqtt.broker.url}")private String brokerUrl;@Value("${mqtt.broker.username}")private String username;@Value("${mqtt.broker.password}")private String password;@Value("${mqtt.broker.clientId}")private String clientId;@Value("${mqtt.topics.default}")private String defaultTopic;@Value("${mqtt.topics.subscriptions}")private String[] topics;// 配置MQTT客户端工厂@Beanpublic MqttPahoClientFactory mqttClientFactory() {DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();MqttConnectOptions options = new MqttConnectOptions();options.setServerURIs(new String[]{brokerUrl});options.setUserName(username);options.setPassword(password.toCharArray());factory.setConnectionOptions(options);return factory;}// (接收消息:步骤一)定义一个消息通道,用于接收消息@Beanpublic MessageChannel mqttInputChannel() {return new DirectChannel();}// (接收消息:步骤二)配置消息驱动通道适配器,用于接收指定主题的消息@Beanpublic MessageProducer inbound() {MqttPahoMessageDrivenChannelAdapter adapter =new MqttPahoMessageDrivenChannelAdapter(clientId, mqttClientFactory(), topics);adapter.setOutputChannel(mqttInputChannel());return adapter;}// (接收消息:步骤三)配置消息处理器,用于处理接收到的消息@Bean@ServiceActivator(inputChannel = "mqttInputChannel")public MessageHandler handler(MqttUtil mqttUtil) {return message -> {String topic = message.getHeaders().get("mqtt_receivedTopic").toString();String payload = message.getPayload().toString();System.out.println("接收到消息: 主题 = " + topic + ", 内容 = " + payload);// 发布接收到的消息事件mqttUtil.publishReceivedMessage(topic, payload);};}// 配置MQTT消息发送处理器@Bean@ServiceActivator(inputChannel = "mqttOutboundChannel")public MessageHandler mqttOutbound() {MqttPahoMessageHandler messageHandler =new MqttPahoMessageHandler(clientId, mqttClientFactory());messageHandler.setAsync(true);messageHandler.setDefaultTopic(defaultTopic); // 设置默认主题return messageHandler;}// 定义一个消息通道,用于发送消息@Beanpublic MessageChannel mqttOutboundChannel() {return new DirectChannel();}
}

MqttUtil 工具类

import com.baoer.terminaltcpserver.mqtt.config.MqttMessageEvent;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;@Component
public class MqttUtil {@Autowiredprivate MessageChannel mqttOutboundChannel;@Autowiredprivate ApplicationEventPublisher eventPublisher;/*** 发送消息到默认主题* @param payload 消息内容*/public void sendMessage(String payload) {mqttOutboundChannel.send(MessageBuilder.withPayload(payload).build());}/*** 发送消息到指定主题* @param topic 主题* @param payload 消息内容*/public void sendMessage(String topic, String payload) {mqttOutboundChannel.send(MessageBuilder.withPayload(payload).setHeader("mqtt_topic", topic).build());}/*** (接收消息:步骤四)发布接收到的MQTT消息事件* @param topic 主题* @param payload 消息内容*/public void publishReceivedMessage(String topic, String payload) {MqttMessageEvent event = new MqttMessageEvent(this, topic, payload);eventPublisher.publishEvent(event);}
}

MqttMessageEvent消息事件类

import org.springframework.context.ApplicationEvent;public class MqttMessageEvent extends ApplicationEvent {private final String topic;private final String payload;public MqttMessageEvent(Object source, String topic, String payload) {super(source);this.topic = topic;this.payload = payload;}public String getTopic() {return topic;}public String getPayload() {return payload;}
}

MqttMessageListener消息监听器

import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;@Component
public class MqttMessageListener {/*** (接收消息:步骤五)事件监听器中处理消息* 说明:此方法将接收到上方 MqttUtil.publishReceivedMessage() 发布的事件并处理这些消息** @param event 接收到的事件*/@EventListenerpublic void handleMqttMessage(MqttMessageEvent event) {String topic = event.getTopic();String payload = event.getPayload();System.out.println("处理接收到的消息: 主题 = " + topic + ", 内容 = " + payload);// 根据不同的主题进行不同的处理if ("your/topic1".equals(topic)) {handleTopic1Message(payload);} else if ("your/topic2".equals(topic)) {handleTopic2Message(payload);}}private void handleTopic1Message(String payload) {// 处理来自 topic1 的消息System.out.println("处理 topic1 消息: " + payload);}private void handleTopic2Message(String payload) {// 处理来自 topic2 的消息System.out.println("处理 topic2 消息: " + payload);}
}

测试

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;@RestController
@RequestMapping("/mqtt")
public class MqttController {@Autowiredprivate MqttUtil mqttUtil;@RequestMapping("/send")@ResponseBodypublic String sendMessage() {mqttUtil.sendMessage("your/topic1", "Hello from Spring Boot");return "Message sent!";}
}

调用接口发送成功
image
使用MQTT X查看是否接受成功,并发送测试
image
查看EMQX Dashboard中MQTTX工具的连接
image

运行流程

1.在 件中配置MQTT服务器的URL、用户名、密码、client_id、默认主题和订阅的主题。
2.启动MQTT Broker。
3.确保硬件设备连接到MQTT Broker,并发布和订阅消息。
4.启动Spring Boot应用程序,确保它可以发送和接收MQTT消息。
5.在控制台中查看接收到的消息,并根据不同的主题进行处理(设备上行消息要用不同的topic主题发布)。

参考:
https://blog.csdn.net/weixin_43822632/article/details/141194093

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.ryyt.cn/news/70203.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈,一经查实,立即删除!

相关文章

AI云平台怎么构建

构建AI云平台是一个复杂而系统的过程,涉及多个环节和技术栈。从准备工作到最终的部署运行,每一步都需要精心设计和实现。构建AI云平台是一个复杂而系统的过程,涉及多个环节和技术栈。从准备工作到最终的部署运行,每一步都需要精心设计和实现。下面,petacloud.ai小编将详细…

AI云平台建设意义

AI云平台,作为AI技术与云计算深度融合的产物,其建设不仅标志着技术创新的又一高峰,更蕴含着对社会经济发展、产业升级、创新生态构建等多方面的深远意义。AI云平台,作为AI技术与云计算深度融合的产物,其建设不仅标志着技术创新的又一高峰,更蕴含着对社会经济发展、产业升…

dash-plotly项目

dash-plotly项目的文件解压后如下:将项目放到同一个局域网内的一台linux服务器上运行,服务器在局域网内的ip为10.3.135.103一、将该项目在linux服务器上运行先把pycharm连接到linux服务器上,参考链接:https://www.cnblogs.com/kakafa/p/18405178配置本地目录和远程目录的映…

Scoop

安装先决条件PowerShell 最新版本或者 Windows PowerShell 5.1 Set-ExecutionPolicy -ExecutionPolicy RemoteSigned -Scope CurrentUserScoop 默认安装在C盘当中,我们想要更改安装路径输入以下命令irm get.scoop.sh -outfile install.ps1您可以将 scoop 安装到自定义目录,配…

数仓开发理论(二)数仓构建分层概念

数仓建模的好处好的数据仓库能够支持复杂数据分析和决策,能够提供高性能查询,能够做到数据的通用集成和保持数据的一致性,可以说得上是面向业务分析的数据库 数仓功能本质就是通过建模来达成对复杂业务的抽象,清晰准确完整的刻画业务场景,以便用户通过业务视角便捷的获取所…

【日记】有老师带真的会好很多(1080 字)

正文昨天练舞,可能是太激烈了,总感觉血液在往肉芽组织那边聚集,总给我下一秒就要血崩的感觉。今天没有了。今天开始尝试用左半边牙吃东西。有些勉强。可能再过几天会好一些。练到最后向他请教。感觉自己进步很慢,有许多困惑。何老师说出了我很多问题。最主要的一点:无意义…

github 上将 stable 合并到 master 分支步骤

本地仓库分支:origin 远端仓库分支:upstream 切到非 master 分支上,比如 dev# 本地操作 git branch -D master git fetch upstream master::master git checkout master # 这步是拉取远端 stable 到 master 上,可能会出错误 # fatal: Not possible to fast-forward, aborti…

不要慌,FastGPT 告诉我这是技术性调整,利好大 A!

一觉醒来,股市又变天了,到处一片哀嚎,我看了下前几天牛市的赚钱名单,咱们公众号的粉丝没有一个在里面,说实话很失望,希望大家多做些有意义的事情,而不是整天虚度光阴。一个个平时看着都挺厉害,也没赚到钱,我很失望。 你们什么时候才能起飞?我都替你们着急如果你对自己…