springboot整合mqtt实现消息订阅和推送

前言

mica-mqtt-client-spring-boot-starter是一个基于Spring Boot的MQTT客户端启动器,它集成了mica-mqtt客户端,提供了在Spring Boot应用程序中使用MQTT协议进行消息通信的能力。以下是关于mica-mqtt-client-spring-boot-starter的简介:

特点:

  • 简单易用:通过Spring Boot的自动配置,可以轻松地集成到Spring应用程序中,并使用Spring的注解或Java配置进行MQTT客户端的配置。

  • 低延迟:支持MQTT协议,能够实现实时消息通信,具有较低的延迟。

  • 高性能:基于mica-mqtt客户端,具有高效的消息处理和网络通信能力,能够处理大量的并发连接和消息。

  • 集群支持:支持基于Redis的发布/订阅模式的集群,可以实现多个节点之间的消息同步和负载均衡。

  • 使用场景:适用于需要使用MQTT协议进行消息通信的物联网、实时应用、移动应用等领域。可以在云端或边缘端使用,实现设备与设备之间、设备与服务器之间的消息通信。

  • 集成方式:通过在Spring Boot项目中添加相关依赖,并配置MQTT客户端的相关参数,即可快速集成mica-mqtt-client-spring-boot-starter。具体的使用方法可以参考官方文档和示例代码。

  • 注意事项:在使用过程中需要注意确保网络连接的稳定性和安全性,并根据实际需求进行适当的配置和优化。同时,也需要关注数据安全和隐私保护等方面的问题。

总之,mica-mqtt-client-spring-boot-starter是一个方便、高效、可靠的MQTT客户端启动器,适用于需要使用MQTT协议进行消息通信的Spring Boot应用程序。

功能

  • 支持 MQTT v3.1、v3.1.1 以及 v5.0 协议。

  • 支持 websocket mqtt 子协议(支持 mqtt.js)。

  • 支持 http rest api,http api 文档详见[1]。

  • 支持 MQTT client 客户端。

  • 支持 MQTT server 服务端。

  • 支持 MQTT client、server 共享订阅支持(捐助VIP版采用 topic 树存储,跟 topic 数无关,百万 topic 性能依旧)。

  • 支持 MQTT 遗嘱消息。

  • 支持 MQTT 保留消息。

  • 支持自定义消息(mq)处理转发实现集群。

  • MQTT 客户端 阿里云 mqtt 连接 demo。

  • 支持 GraalVM 编译成本机可执行程序。

  • 支持 Spring boot 项目快速接入(mica-mqtt-spring-boot-starter)。

  • mica-mqtt-spring-boot-starter 支持对接 Prometheus + Grafana。

  • 基于 redis pub/sub 实现集群,详见 mica-mqtt-broker 模块[2]

教程

添加依赖

在springboot项目中添加maven依赖:

        <!-- https://mvnrepository.com/artifact/net.dreamlu/mica-mqtt-client-spring-boot-starter -->
        <dependency>
            <groupId>net.dreamlu</groupId>
            <artifactId>mica-mqtt-client-spring-boot-starter</artifactId>
            <version>2.2.8</version>
        </dependency>

配置参数

在spring配置文件中配置mqtt相关参数,配置如下:

mqtt:
  server:    
    enabled: false              # 是否开启服务端,默认:false
  client:
    enabled: true               # 是否开启客户端,默认:false
    ip: 172.16.10.203   # 连接的服务端 ip ,默认:127.0.0.1
    port: 1883                  # 端口:默认:1883
    name: Mica2-Mqtt2-Client      # 名称,默认:Mica-Mqtt-Client
    clientId: coalface_safety_3d            # 客户端Id(非常重要,一般为设备 sn,不可重复)
    user-name: admin           # 认证的用户名 你的用户名
    password: 3@!cHy@j       # 认证的密码
    timeout: 5                  # 连接超时时间,单位:秒,默认:5秒
    reconnect: true             # 是否重连,默认:true
    re-interval: 5000           # 重连时间,默认 5000 毫秒
    version: MQTT_3_1           # mqtt 协议版本,默认:3.1.1
    read-buffer-size: 8092      # 接收数据的 buffer size,默认:8092
    max-bytes-in-message: 8092  # 消息解析最大 bytes 长度,默认:8092
    buffer-allocator: heap      # 堆内存和堆外内存,默认:堆内存
    keep-alive-secs: 60         # keep-alive 心跳维持时间,单位:秒
    clean-session: false         # mqtt clean session,默认:true
    will-message:                # 消息遗嘱
      qos: at_least_once
    ssl:
    enabled: false            # 是否开启 ssl 认证,2.1.0 开始支持双向认证
    keystore-path:            # 可选参数:ssl 双向认证 keystore 目录,支持 classpath:/ 路径。
    keystore-pass:            # 可选参数:ssl 双向认证 keystore 密码
    truststore-path:          # 可选参数:ssl 双向认证 truststore 目录,支持 classpath:/ 路径。
    truststore-pass:          # 可选参数:ssl 双向认证 truststore 密码
  • 注意:ssl 存在三种情况
服务端开启ssl客户端
ClientAuth 为 NONE(不需要客户端验证)仅仅需要开启 ssl 即可不用配置证书
ClientAuth 为 OPTIONAL(与客户端协商)需开启 ssl 并且配置 truststore 证书
ClientAuth 为 REQUIRE (必须的客户端验证)需开启 ssl 并且配置 truststore、 keystore证书

创建订阅

创建一个mqtt订阅消息监听类,例如SimulationSubscriber,代码如下:


import com.alibaba.fastjson.JSONObject;
import net.dreamlu.iot.mqtt.spring.client.MqttClientSubscribe;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import org.tio.utils.buffer.ByteBufferUtil;

/**
 * @author tarzan
 */
@Component
@Slf4j
public class SimulationSubscriber {

    @MqttClientSubscribe("tuoyuan/publish/zj/#")
    public void zjOne(String topic, byte[] payload){
        String[] strs=topic.split("/");
        String ID=strs[strs.length-1];
        log.info("topic:{} payload:{} ID:{}", topic, new String(payload, StandardCharsets.UTF_8),ID);
    }
    
	 @MqttClientSubscribe("/sys/${deviceName}/thing/sub/register")
	  public void thingSubRegister(String topic, byte[] payload) {
	    // 1.3.8 开始支持,@MqttClientSubscribe 注解支持 ${} 变量替换,会默认替换成 +
	    // 注意:mica-mqtt 会先从 Spring boot 配置中替换参数 ${},如果存在配置会优先被替换。
	    logger.info("topic:{} payload:{}", topic, new String(payload, StandardCharsets.UTF_8));
	  }

    @MqttClientSubscribe("/tianma/publish/cmj")
    public void cmj(@Header("topic") String topic,@Payload byte[] payload) {
        System.out.println("*****************gc**************************************"+topic);
        JSONObject jsonObject = JSONObject.parseObject(ByteBufferUtil.toString(payload));
        //业务的处理
        System.out.println("*****************test**************************************"+jsonObject);
    }

    @MqttClientSubscribe("/tianma/publish/zj")
    public void zj(@Header("topic") String topic,@Payload byte[] payload) {
        System.out.println("*****************gc**************************************"+topic);
        JSONObject jsonObject = JSONObject.parseObject(ByteBufferUtil.toString(payload));
        //业务的处理
        System.out.println("*****************test**************************************"+jsonObject);
    }

    @MqttClientSubscribe("/tianma/publish/gbj")
    public void gbj(@Header("topic") String topic,@Payload byte[] payload) {
        System.out.println("*****************gc**************************************"+topic);
        JSONObject jsonObject = JSONObject.parseObject(ByteBufferUtil.toString(payload));
        //业务的处理
        System.out.println("*****************test**************************************"+jsonObject);
    }

    @MqttClientSubscribe("/tianma/publish/ltl")
    public void ltl(@Header("topic") String topic,@Payload byte[] payload) {
        System.out.println("*****************gc**************************************"+topic);
        JSONObject jsonObject = JSONObject.parseObject(ByteBufferUtil.toString(payload));
        //业务的处理
        System.out.println("*****************test**************************************"+jsonObject);
    }

    @MqttClientSubscribe("/tianma/publish/ntl")
    public void ntl(@Header("topic") String topic,@Payload byte[] payload) {
        System.out.println("*****************gc**************************************"+topic);
        JSONObject jsonObject = JSONObject.parseObject(ByteBufferUtil.toString(payload));
        //业务的处理
        System.out.println("*****************test**************************************"+jsonObject);
    }

    @MqttClientSubscribe("/tianma/publish/ccl")
    public void ccl(@Header("topic") String topic,@Payload byte[] payload) {
        System.out.println("*****************gc**************************************"+topic);
        JSONObject jsonObject = JSONObject.parseObject(ByteBufferUtil.toString(payload));
        //业务的处理
        System.out.println("*****************test**************************************"+jsonObject);
    }



}

  • @Header(“topic”) 和@Payload 注解可以省略

  • tuoyuan/publish/zj/# 中的# 是通配符

    • 在MQTT协议中,#是一个通配符,代表匹配该主题的所有子主题。例如,如果你订阅了主题sports/baseball/#,那么你将接收到所有以sports/baseball/开头的主题的消息。

    • 请注意,通配符#只能用于多层的主题名称中,并且只能用于最后一个级别。例如,sports/baseball/#是有效的,但#sports/baseball或sports/#/baseball都是无效的。

    • 除了#之外,MQTT协议还支持一个单层通配符+,它代表只匹配该级别的主题。例如,如果你订阅了主题sports/baseball/+,那么你将只接收到以sports/baseball/开头,且后面跟着至少一个字符的主题的消息。

    • 请注意,使用通配符时需要谨慎,因为它们可能会匹配到意外的主题。确保你的订阅主题明确,并且只匹配你感兴趣的主题。

  • /sys/${deviceName}/thing/sub/register

    • 1.3.8 开始支持,@MqttClientSubscribe 注解支持 ${} 变量替换,会默认替换成 +
    • 注意:mica-mqtt 会先从 Spring boot 配置中替换参数 ${},如果存在配置会优先被替换。

创建发布

创建一个mqtt消息发布接口类,例如 MqttTestController,代码如下:


import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import net.dreamlu.iot.mqtt.spring.client.MqttClientTemplate;
import org.springblade.core.secure.annotation.NoToken;
import org.springblade.core.tool.api.R;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.nio.charset.StandardCharsets;

/**
 * @author tarzan
 */
@RestController
@Api(tags = "mqtt测试")
@NoToken
@RequestMapping("/mqtt")
@AllArgsConstructor
@Slf4j
public class MqttTestController {

    private final MqttClientTemplate mqttClientTemplate;

    @ApiOperation(value = "消息发送")
    @PostMapping("/publish")
    private R<Boolean> publish(String topic, String msg) {
        return R.status(mqttClientTemplate.publish(topic, msg.getBytes(StandardCharsets.UTF_8)));
    }


}

接口测试

接口调用
在这里插入图片描述
控制台输出
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/359982.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【Prometheus】Prometheus的PromQL语句

Prometheus promQL的语法&#xff1a; #时间序列 node_cpu_guest_seconds_total{cpu"0"} 监控&#xff08;指标数据&#xff09; {标签} node使用CPU的描述的统计&#xff0c;符合标签CPU0的时间序列的查询结果 指标标签生成时间序列 标签&#xff1a; __address…

DeepSORT算法实现车辆和行人跟踪计数和是否道路违规检测(代码+教程)

DeepSORT算法是一种用于目标跟踪的算法&#xff0c;它可以对车辆和行人进行跟踪计数&#xff0c;并且可以检测是否存在道路违规行为。该算法采用深度学习技术来提取特征&#xff0c;并使用卡尔曼滤波器来估计物体的速度和位置。 DeepSORT算法通过首先使用目标检测算法来识别出…

基于Kubernetes的微服务架构,你学废了吗?

至于服务网关&#xff0c;虽然保留了 Zuul&#xff0c;但没有采用 Kubernetes 的 Ingress 来替代。这里有两个主要考虑因素&#xff1a;首先&#xff0c;Ingress Controller 并非 Kubernetes 的内置组件&#xff0c;有多种可选方案&#xff08;例如 KONG、Nginx、Haproxy 等&am…

目标检测算法训练数据准备——Penn-Fudan数据集预处理实例说明(附代码)

目录 0. 前言 1. Penn-Fudan数据集介绍 2. Penn-Fudan数据集预处理过程 3. 结果展示 4. 完整代码 0. 前言 按照国际惯例&#xff0c;首先声明&#xff1a;本文只是我自己学习的理解&#xff0c;虽然参考了他人的宝贵见解及成果&#xff0c;但是内容可能存在不准确的地方。如…

Springboot项目启动后浏览器不能直接访问接口,而postman可以访问?

在云服务器上部署springboot后端时&#xff0c;项目启动后浏览器不能直接访问接口,而postman可以访问。这是当时困扰了我大半天的小问题&#xff0c;在我打开防火墙和阿里云安全组之后还是没解决。然后在网上搜了很多很多资料&#xff0c;以为是浏览器访问权限或者是https什么证…

微信公众号数量达到上限怎么办

一般可以申请多少个公众号&#xff1f;许多用户在申请公众号时可能会遇到“公众号显示主体已达上限”的问题。这是因为在2018年11月16日对公众号申请数量进行了调整&#xff0c;具体调整如下&#xff1a;1、个人主体申请公众号数量上限从2个调整为1个。2、企业主体申请公众号数…

Mac删除自带的ABC输入法,简单快捷

一、下载PlistEdit Pro软件 二、终端执行 sudo open ~/Library/Preferences/com.apple.HIToolbox.plist 三、其中有一个数字下面的KeyboardLayout Name的value为“ABC”&#xff0c;这就是ABC输入法&#xff0c;点击上面的Delete按钮&#xff0c;删除整项ABC内容&#xff0c…

【计算机毕业设计】128电脑配件销售系统

&#x1f64a;作者简介&#xff1a;拥有多年开发工作经验&#xff0c;分享技术代码帮助学生学习&#xff0c;独立完成自己的项目或者毕业设计。 代码可以私聊博主获取。&#x1f339;赠送计算机毕业设计600个选题excel文件&#xff0c;帮助大学选题。赠送开题报告模板&#xff…

public class和class的区别

不用public修饰的类 一个Java源文件中可以定义多个不用public修饰的class&#xff0c;且类名不用和java源文件名一致。public修饰的类可以没有。编译之后&#xff0c;一个class就会对应生成一个class字节码文件 对于用public修饰的类 如果一个类用了public修饰&#xff0c;那…

搞懂Nginx的.conf文件路径配置

详解server中各部分作用及如何配置 如下图所示&#xff0c;这是我配置好的一个server代码块&#xff0c;我这里配置了https&#xff0c;所以会比默认的多一部分内容&#xff0c;如果你只需要配置http&#xff0c;则只需关注红色方框的部分即可&#xff0c;下面会按顺序讲解。 ①…

C++进阶--继承

概念 继承&#xff0c;允许一个类&#xff08;称为子类或派生类&#xff09;从另一个类&#xff08;称为父类或基类&#xff09;继承属性和方法。 继承的主要目的是实现代码的重用和构建类之间的层次关系。通过继承&#xff0c;子类可以获得父类的特性&#xff0c;包括数据成员…

海外短剧系统国际短剧源码h5多语言版app挂载tiktok油管ins

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目 前言 一、海外短剧系统是什么&#xff1f; 二、海外短剧系统功能与运营方式介绍 1.系统功能 2.短剧APP运营方式 总结 前言 本文简单介绍海外短剧系统的功能&#xff…

【已解决】Ubuntu64位无法运行32位ELF文件的问题

问题起因 因为在做一道逆向题&#xff0c;发现题目给的文件是32位elf文件&#xff0c;所以想在Linux下执行一下&#xff0c;然后发现会报错。 于是查了一下资料&#xff0c;发现报错的原因是64位的Ubuntu无法直接运行32位的程序&#xff0c;需要下载兼容32位的库。 解决方法…

JUC并发编程10——ThreadLocal

目录 1. ThreadLocal是什么&#xff1f; 2. ThreadLocal怎么用&#xff1f; 3. ThreadLocal源码分析 3.1set方法 3.2get()方法 3.3remove()方法 4.为什么key使用弱引用&#xff1f; 5.ThreadLocalMap 和 HashMap 区别 6.ThreadLocal变量不具有传递性 7.InheritableTh…

Android 跳转应用设置/热点界面或等常用操作

Android 跳转应用设置/热点界面或等常用操作 https://www.jianshu.com/p/ba7164126690 android学习进阶——Setting https://blog.csdn.net/csdn_wanziooo/article/details/81980984 Android 7.1 以太网反射 EthernetManager 配置 DHCP、静态 IP https://codeleading.com/art…

防火墙综合拓扑(NAT、双机热备)

实验需求 拓扑 实验注意点&#xff1a; 先配置双机热备&#xff0c;再来配置安全策略和NAT两台双机热备的防火墙的接口号必须一致如果其中一台防火墙有过配置&#xff0c;最好清空或重启&#xff0c;不然配置会同步失败两台防火墙同步完成后&#xff0c;可以直接在主状态防火墙…

MyBatis 的注解实现方法

MyBatis 的注解实现方法 MyBatis 的注解实现方法引入依赖添加配置创建表创建实体类创建mapper接口InsertDeleteSelectResults和ResultMap通过配置文件解决 UpdateOptions MyBatis 的注解实现方法 引入依赖 在springBoot项目中下载了EditStarters插件的,可以直接在配置文件处右…

JVM学习

1.Java虚拟机内部有哪些线程共享&#xff0c;那些线程隔离 程序计数器&#xff1a; 通过改变这个计数器的值来选取下一条需要执行的字节码命令 Java虚拟机栈&#xff1a; 栈&#xff0c;每个方法被执行时&#xff0c;Java虚拟机都会同步的创建一个栈帧用于存储局部变量表&…

Linux:进度条的创建

目录 使用工具的简单介绍&#xff1a; \r &#xff1a; fflush &#xff1a; 倒计时的创建&#xff1a; 倒计时的工作原理&#xff1a; 进度条的创建&#xff1a; 不同场景下、打印任意长度的进度条&#xff1a; main .c procbor.c 测试效果&#xff1a; 使用工具…
最新文章