RuoYi-Vue分离版集成MQTT客户端,超详细版

一:前言

1、使用场景:最近对接物联网开发需要集成MQTT进行消息订阅

2、前置准备:首先需要下载部署好MQTT服务(推荐emqx),在我上一条博客里面有详细步骤,官方文档部署的也非常详细(因为网站在国外所以访问比较慢,可以参考文档)

链接: Download EMQX

二:代码

我们将代码全部集成到common模块下面

1、第一步导入依赖,在ruoyi-common模块下的pom文件导入依赖

        <!--mqtt依赖-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-integration</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-mqtt</artifactId>
        </dependency>

2、第二步在ruoyi-admin模块中找到application.yml文件中添加配置

  # mqtt
  mqtt:
    username: admin # 用户名
    password: public # 密码
    hostUrl: tcp://IP:1883 # tcp://ip:端口
    #    clientId: mqttx_fdb83555 # 客户端id,线下
    clientId: mqttx_fdb83518 # 客户端id,线上
    defaultTopic: lob,test # 订阅主题
    timeout: 100 # 超时时间 (单位:秒)
    keepalive: 60 # 心跳 (单位:秒)
    enabled: true # 是否使用mqtt功能

3、第三步需要在ruoyi-common模块下src\main\java\com\superVisualization\common\utils目录下新建MqttConfig.java文件和mqtt文件夹添加MqttPushClient.java、PushCallback.java文件

MqttConfig.java

package com.ruoyi.common.utils;
 
 
import com.ruoyi.common.utils.mqtt.MqttPushClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
 
@Component
@ConfigurationProperties("spring.mqtt")
public class MqttConfig {
    @Autowired
    private MqttPushClient mqttPushClient;
 
    /**
     * 用户名
     */
    private String username;
    /**
     * 密码
     */
    private String password;
    /**
     * 连接地址
     */
    private String hostUrl;
    /**
     * 客户Id
     */
    private String clientId;
    /**
     * 默认连接话题
     */
    private String defaultTopic;
    /**
     * 超时时间
     */
    private int timeout;
    /**
     * 保持连接数
     */
    private int keepalive;
    /**
     * mqtt功能使能
     */
    private boolean enabled;
 
    public String getUsername() {
        return username;
    }
 
    public void setUsername(String username) {
        this.username = username;
    }
 
    public String getPassword() {
        return password;
    }
 
    public void setPassword(String password) {
        this.password = password;
    }
 
    public String getHostUrl() {
        return hostUrl;
    }
 
    public void setHostUrl(String hostUrl) {
        this.hostUrl = hostUrl;
    }
 
    public String getClientId() {
        return clientId;
    }
 
    public void setClientId(String clientId) {
        this.clientId = clientId;
    }
 
    public String getDefaultTopic() {
        return defaultTopic;
    }
 
    public void setDefaultTopic(String defaultTopic) {
        this.defaultTopic = defaultTopic;
    }
 
    public int getTimeout() {
        return timeout;
    }
 
    public void setTimeout(int timeout) {
        this.timeout = timeout;
    }
 
    public int getKeepalive() {
        return keepalive;
    }
 
    public void setKeepalive(int keepalive) {
        this.keepalive = keepalive;
    }
 
    public boolean isEnabled() {
        return enabled;
    }
 
    public void setEnabled(boolean enabled) {
        this.enabled = enabled;
    }
 
    @Bean
    public MqttPushClient getMqttPushClient() {
        if(enabled == true){
            String mqtt_topic[] = StringUtils.split(defaultTopic, ",");
            //连接
            mqttPushClient.connect(hostUrl, clientId, username, password, timeout, keepalive);
            for(int i=0; i<mqtt_topic.length; i++){
                //订阅主题
                mqttPushClient.subscribe(mqtt_topic[i], 1);
            }
        }
        return mqttPushClient;
    }
}

MqttPushClient.java、PushCallback.java

package com.ruoyi.common.utils.mqtt;
 

import com.ruoyi.common.core.domain.AjaxResult;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

 
@Component
public class MqttPushClient {
    private static final Logger logger = LoggerFactory.getLogger(MqttPushClient.class);
 
    @Autowired
    private PushCallback pushCallback;
 
    private static MqttClient client;
 
    private static MqttClient getClient() {
        return client;
    }
 
    private static void setClient(MqttClient client) {
        MqttPushClient.client = client;
    }
 
    /**
     * 客户端连接
     *
     * @param host      ip+端口
     * @param clientID  客户端Id
     * @param username  用户名
     * @param password  密码
     * @param timeout   超时时间
     * @param keepalive 保留数
     */
    public void connect(String host, String clientID, String username, String password, int timeout, int keepalive) {
        MqttClient client;
        try {
            client = new MqttClient(host, clientID, new MemoryPersistence());
            MqttConnectOptions options = new MqttConnectOptions();
            options.setCleanSession(true);
            options.setUserName(username);
            options.setPassword(password.toCharArray());
            options.setConnectionTimeout(timeout);
            options.setKeepAliveInterval(keepalive);
            MqttPushClient.setClient(client);
            try {
                client.setCallback(pushCallback);
                client.connect(options);
            } catch (Exception e) {
                e.printStackTrace();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
 
    /**
     * 发布
     *
     * @param qos         连接方式
     * @param retained    是否保留
     * @param topic       主题
     * @param pushMessage 消息体
     */
    public AjaxResult publish(int qos, boolean retained, String topic, String pushMessage) {
        MqttMessage message = new MqttMessage();
        message.setQos(qos);
        message.setRetained(retained);
        message.setPayload(pushMessage.getBytes());
        MqttTopic mTopic = MqttPushClient.getClient().getTopic(topic);
        if (null == mTopic) {
            logger.error("topic not exist");
        }
        MqttDeliveryToken token;
        try {
            token = mTopic.publish(message);
            token.waitForCompletion();
            return AjaxResult.success();
        } catch (MqttPersistenceException e)  {
            e.printStackTrace();
            return AjaxResult.error();
        } catch (MqttException e) {
            e.printStackTrace();
            return AjaxResult.error();
        }
    }
 
    /**
     * 订阅某个主题
     *
     * @param topic 主题
     * @param qos   连接方式
     */
    public void subscribe(String topic, int qos) {
        logger.info("开始订阅主题" + topic);
        try {
            MqttPushClient.getClient().subscribe(topic, qos);
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }
 
}
 
package com.ruoyi.common.utils.mqtt;
 
import com.alibaba.fastjson2.JSONObject;
import com.ruoyi.common.utils.MqttConfig;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
 
@Component
public class PushCallback implements MqttCallback {
    private static final Logger logger = LoggerFactory.getLogger(MqttPushClient.class);
 
    @Autowired
    private MqttConfig mqttConfig;
 
    private static MqttClient client;
 
    private static String _topic;
    private static String _qos;
    private static String _msg;
 
    @Override
    public void connectionLost(Throwable throwable) {
        // 连接丢失后,一般在这里面进行重连
        logger.info("连接断开,可以做重连");
        if (client == null || !client.isConnected()) {
            mqttConfig.getMqttPushClient();
        }
    }

    @Override
    public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
        // subscribe后得到的消息会执行到这里面
        logger.info("接收消息主题 : " + topic);
        logger.info("接收消息Qos : " + mqttMessage.getQos());
        logger.info("接收消息内容 : " + new String(mqttMessage.getPayload()));
 
        _topic = topic;
        _qos = mqttMessage.getQos()+"";
        _msg = new String(mqttMessage.getPayload());
    }

    /**
     * 参数回调的方法
     * @param iMqttDeliveryToken
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
        logger.info("deliveryComplete---------" + iMqttDeliveryToken.isComplete());
    }
 
    //别的Controller层会调用这个方法来  获取  接收到的硬件数据
    public String receive() {
        JSONObject jsonObject = new JSONObject();
        jsonObject.put("topic", _topic);
        jsonObject.put("qos", _qos);
        jsonObject.put("msg", _msg);
        return jsonObject.toString();
    }
 
}

4、第四步修改emqx服务的白名单,去已经下载的emqx文件夹中打开到etc文件夹,打开acl.conf配置文件,修改里面的白名单

改完保存,重启emqx服务:命令: emqx restart

到这一步代码基本是已经集成完毕了

三:测试连接

1、打开浏览器,输入:http://ip:18083

18083端口是浏览器默认端口

1883是TCP协议端口

我是在本机启动的emqx服务所以用的是localhost,如果是线上的可以用线上的IP;

然后登录,账号:admin 密码:public

2、进入emqx服务之后连接websocket客户端

注意:端口号也要与项目配置的一样

全部连接之后就可以互相订阅了,在MqttPushClient.java文件中也有发送订阅的方法就留给你们自己测试了

若有不足伙计们请见谅

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/257667.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【QT】C++/Qt使用Qt自带工具windeployqt打包

基本操作 运行项目debug或者release 将运行后的可执行文件单独放到一个文件夹中 根据项目使用的kits来选择Qt的打包工具 打开工具后移动到exe文件夹下执行windeployqt xxx.exe 预览图 问题 打包后再其他电脑上运行出现下图错误 将自己电脑的这个文件拷到可执行文件夹中既…

一文读懂什么是智能工厂?

引言 在当今快速变革的制造业中&#xff0c;智能工厂如一盏明灯&#xff0c;照亮着未来生产的道路。它们不仅代表着技术的进步&#xff0c;更是制造业向前迈进的里程碑。智能工厂利用先进的技术和创新方法&#xff0c;将传统工厂转化为高度自动化、数字化和智能化的生产中心。…

SpringMVC01

SpringMVC 1. 学习⽬标2. 什么叫MVC&#xff1f;3. SpringMVC 框架概念与特点4. SpringMVC 请求流程5. Spring MVC 环境搭建6. URL 地址映射配置7. 参数绑定8. JSON 数据开发JSON普通数组步骤1:pom.xml添加依赖步骤2&#xff1a; 修改配置⽂件步骤3. 注解使⽤ 1. 学习⽬标 2. 什…

LeetCode Hot100 79.单词搜索

题目&#xff1a; 给定一个 m x n 二维字符网格 board 和一个字符串单词 word 。如果 word 存在于网格中&#xff0c;返回 true &#xff1b;否则&#xff0c;返回 false 。 单词必须按照字母顺序&#xff0c;通过相邻的单元格内的字母构成&#xff0c;其中“相邻”单元格是那…

[C++] 多态(下) -- 多态原理 -- 动静态绑定

文章目录 1、多态原理2、动态绑定和静态绑定3、单继承和多继承关系的虚函数表3.1 单继承中的虚函数表5.2 多继承中的虚函数表 上一篇文章我们了解了虚函数表&#xff0c;虚函数表指针&#xff0c;本篇文章我们来了解多态的底层原理&#xff0c;更好的理解多态的机制。 [C] 多态…

文件操作(下)

标题的顺序是接着之前写的&#xff0c;希望这篇博客对你有帮助 七. 随机读写函数 实际上&#xff0c;无论是读还是写&#xff0c;在一次调用顺序读写函数&#xff0c;文件指针会移到已经读过或者写过的下一个位置&#xff0c;从那个位置开始下一次读和写&#xff08;在文件没有…

Jenkins插件安装失败时这么做就搞定啦!

1.网络或墙的问题导致插件下载安装失败 这种错误提示很明显&#xff0c;就是无法连接到插件下载地址&#xff0c;导致插件下载失败。 解决方法 为Jenkins更换源 点击Jenkins主页面左侧列表中【系统管理】—— 下拉找到【管理插件】 选择【高级】选项卡 替换最下方【升级站点…

JS基础之模块化

JS基础之模块化 JS模块化模块化前端发展 什么是模块&#xff1f;怎么定义模块化IIFE匿名函数自调用IIFE模式增强模块化的好处 JS模块化 模块化 JS DOM操作 代码规范管理的标准 不同模块间的管理模块内部自组织 标准bundler (模块构建工具) ESNext TS -> ES5 前端发展 生态 …

第二百一十六回 分享一种更新页面数据的方法

文章目录 1. 概念介绍2. 实现方法2.1 实现思路2.2 实现方法3. 示例代码4. 内容总结我们在上一章回中介绍了"如何创建单例模式"相关的内容,本章回中将 分享一种更新页面数据的方法.闲话休提,让我们一起Talk Flutter吧。 1. 概念介绍 我们在本章回中介绍一种更新页…

【Maven-Helper】利用 Maven-Helper 解决依赖冲突问题

【Maven-Helper】利用 Maven-Helper 解决依赖冲突问题 1&#xff09;安装 Maven-Helper 插件2&#xff09;Maven Helper 插件使用方法3&#xff09;Idea-Maven 可视化依赖树 1&#xff09;安装 Maven-Helper 插件 这里我们已经安装过了&#xff0c;如果没有安装过&#xff0c;点…

【JUC】三十一、AQS源码

&#x1f4d5;前置笔记&#xff1a;【AQS核心概念与核心类】 文章目录 1、ReentrantLock与AQS类的联系2、lock方法3、acquire方法4、源码分析Demo背景案例5、tryAcquire方法6、addWaiter方法7、acquireQueued方法8、unlock方法9、cancelAcquire方法 AQS是JUC的基石&#xff0c;…

智能优化算法应用:基于寄生捕食算法3D无线传感器网络(WSN)覆盖优化 - 附代码

智能优化算法应用&#xff1a;基于寄生捕食算法3D无线传感器网络(WSN)覆盖优化 - 附代码 文章目录 智能优化算法应用&#xff1a;基于寄生捕食算法3D无线传感器网络(WSN)覆盖优化 - 附代码1.无线传感网络节点模型2.覆盖数学模型及分析3.寄生捕食算法4.实验参数设定5.算法结果6.…

数字化技术助力英语习得 iEnglish成智慧化学习新选择

日前,美剧《老友记》中钱德勒的扮演者马修派瑞去世的消息引发不少人的回忆杀。《老友记》官方发文悼念马修派瑞:“对于马修派瑞去世的消息,我们深感悲痛,他是给我们所有人的真正礼物,我们的心和他的家人、爱人、所有的粉丝在一起。” 作为不少国人刷剧学习英语的首选,《老友记…

C语言-数组指针笔试题讲解(1)-干货满满!!!

文章目录 ▶️1.sizeof和strlen的对比&#x1f4af;➡️1.1 sizeof是什么&#xff1f;&#x1f4af;➡️1.2sizeof用法举例&#x1f4af;▶️1.3strlen是什么&#xff1f;&#x1f4af;▶️1.4 strlen函数用法举例&#xff1a;&#x1f4af;▶️1.5 strlen和sizeof的对比&#…

优化问题笔记(2)

目录 3. 约束优化问题的全局解3.1 凸优化问题3.2 二次优化问题3.3 无约束二次优化问题3.4 一个典型的二次等式约束二次优化问题 Reference 3. 约束优化问题的全局解 3.1 凸优化问题 局部解成为全局解的一类重要的优化问题是所谓凸优化问题. 我们称优化问题 ( f , D ) (f,\ma…

性能测试之全链路压测流量模型你了解多少

前言 现在全链路越来越火&#xff0c;各大厂商也纷纷推出了自己的全链路压测测试方案。特别是针对全链路压测流量模型&#xff0c;各家方案都有所不同。最近我看了一些这方面的资料&#xff0c;有一些感悟。分享给大家。 全链路压测流量模型的梳理呢&#xff0c;这里就先不讲…

Gemini Pro API 详细申请步骤

Gemini Pro API 详细申请步骤 什么是 Gemini ? 上周&#xff0c;谷歌发布了 Gemini&#xff08;双子座&#xff09;&#xff0c;它是谷歌最新、最强大的人工智能模型&#xff0c;旨在迎头痛击 OpenAI 的 GPT。Gemini 在构建时考虑到了多模态性&#xff0c;这意味着它能够理解…

【日常总结】连接Mysql,打开数据表非常慢

问题 Navicat 连接mysql时&#xff0c;第二次打开非常慢 原因 Mysql服务器端会定时清理长时间不活跃空闲的数据库连接&#xff0c;以此优化数据库的性能。 解决方案 数据库右键---编辑连接--高级---保持连接间隔30秒 带来的问题 每次打开Navicat时&#xff0c;设置设置自动…

2023-南京荣耀Honor最新探访-OPEN DAY,实况记录!

前言 金九银十的秋招季缓缓落幕&#xff01; 接完offer&#xff0c;博主也回归啦&#xff01;有幸带着公开公平的心态&#xff0c;给大家分享一波南京荣耀总部的实况解密&#xff01; 最基础的环境状况&#xff0c;有图有真相&#xff01;~上图~ 民以食为天-南京荣耀食堂&…

issue阶段的选择电路的实现

1-of-M的仲裁电路 为什么要实现oldest-first 功能的仲裁呢&#xff1f; 这是考虑到越是旧的指令&#xff0c;和它存在相关性的指令也就越多&#xff0c;因此优先执行最旧的指令&#xff0c;则可以唤醒更多的指令&#xff0c;能够有效地提高处理器执行指令的并行度,而且最旧的指…
最新文章