使用paho.mqtt.client实现MQTT Client连接EMQX Broker

目录

概述

1 认识paho.mqtt.client

2 实现MQTT Client

2.1 功能介绍

2.2 paho.mqtt.client库函数介绍

2.3 MQTT Client实现

2.3.1 创建项目

2.3.2 编写MQTT Client代码

2.3.3 Log工具源码

2.4 功能测试代码实现

2.4.1 功能介绍

2.4.2 代码实现

3 测试

3.1 EMQX上创建Client

3.2 运行UserMqttClient

3.3 使用MQTT.fx订阅消息


概述

本文主要介绍使用paho.mqtt.client库实现一个MQTT Client,并使其连接到EMQX物联网平台。其中包括在EMQX创建项目的方法,配置参数的步骤。还是用该MQTT Client发布数据至EMQX,并使用MQTT.fx订阅Topic。MQTT Client也能订阅MQTT.fx发布的Topic数据。

1 认识paho.mqtt.client

paho.mqtt.client是一个Python MQTT客户端库,它提供了与MQTT代理进行通信的功能。MQTT是一种轻量级的消息传递协议,通常用于物联网应用中的设备间通信。

使用paho.mqtt.client,可以创建一个MQTT客户端,并使用其提供的方法来连接到MQTT代理,发布和订阅主题,接收和处理消息。

登录网站可以了解paho.mqtt.client的相关内容。登录地址如下:

https://mqtt.org/software/

2 实现MQTT Client

2.1 功能介绍

使用paho.mqtt.client编写一个 MQTT客户端,可以实现,消息订阅,发布功能。主要功能如下:

1)和Broker相关的参数通过配置文件来实现,不能写死在代码里

2)实现publish topic和subscriber topic功能

3)实时打印publish log和subscriber log

4)使用专用的logging库来处理log数据

5)使用class的方式来实现软件功能

2.2 paho.mqtt.client库函数介绍

在使用paho.mqtt.client库函数编写代码之前,必须对它所提供的重要接口函数的用法有一个清晰的认识,这对后面的如何使用这些接口非常重要。paho.mqtt.client提供了需要参数可供使用,这里只介绍一些,编程中必须用到的接口。其他函数的功能和用法可参考官方文档。

1)connect ()

连接MQTT Broker函数,函数原型如下:

参数介绍:

参数名称功能介绍
hostMQTT Broker地址
port连接Broker的端口号
keeplive心跳包时间间隔

2)username_pw_set()

设置Client的用户名和对应的密码,函数原型如下:

参数介绍:

参数名称功能介绍
usernameClient的用户名
passwordClient的用户名对应的密码

3)loop_start()

调用该接口,启动MQTT Client工作线程,这时就可以进行publish或者subscrib 消息,函数原型如下:

4)loop_stop()

调用该接口,销毁MQTT Client工作线程,函数原型如下:

5)subscribe()

订阅消息接口,函数原型如下:

参数介绍:

参数名称功能介绍
topic订阅的主题,例如:subscribe("my/topic", 2)
qos消息的服务质量等级
options and properties这两个参数在MQTT v5.0的版本中使用,当前版本不使用这两个参数

6)unsubscribe()

取消订阅消息接口,函数原型如下:

参数介绍:

参数名称功能介绍
topic取消订阅的主题,例如:unsubscribe("my/topic")
properties这个参数在MQTT v5.0的版本中使用,当前版本不使用这个参数

7)publish()

发布消息接口,函数原型如下:

参数介绍:

参数名称功能介绍
topic发布消息的主题
payload消息内容
qos消息服务等级
retainBroker是否保留消息
properties这个参数在MQTT v5.0的版本中使用,当前版本不使用这个参数

2.3 MQTT Client实现

2.3.1 创建项目

本项目使用PyCharm 作为开发工具,运行代码前必须安装 paho.mqtt.client库。详细安装方法和步骤见官网文档。创建项目之后编写如下代码

2.3.2 编写MQTT Client代码

编写一个MQTTClient的user类,实现MQTT Client的基本功能,函数列表和介绍如下:

函数名描述
start注册回调函数和连接MQTT Client
stop断开连接和销毁MQTT Client
usr_subscribe订阅函数
usr_publish发布消息函数
usr_unsubscribe取消订阅函数
usr_on_message订阅消息回调函数
usr_log_callbacklog监控函数
receive_msg接收消息函数

详细代码如下:

#!/usr/bin/python3
# -*- coding: utf-8 -*-
# @describe : mqtt handler
# @Time    : 2022/03/18 16:21
# @Author  : ming fei.tang
import logging
from queue import Queue
​
import paho.mqtt.client as mqtt
​
__all__ = ["MQTTClient"]
​
​
class MQTTClient:
​
    def __init__(self, host, port, qos, heartbeat, client_id, username, password):
        self.host = host
        self.port = port
        self.qos = qos
        self.queue = Queue()
        self.mqtt_client_id = client_id
        self.heartbeat = heartbeat
        self.username = username
        self.password = password
        self.mqtt_client = None
​
    def usr_on_message(self, user, data, msg):
        payload = msg.payload.decode('utf-8')
        payload = payload.replace('\n', '').replace('\r', '').replace(' ', '')
        logging.debug('subscribe: %s , payload: %s, QoS = %s' % (msg.topic, payload, msg.qos))
​
        self.queue.put(msg)
​
    def usr_subscribe(self, topic):
        self.mqtt_client.subscribe(topic, self.qos)
        logging.info('subscribe the topic: %s' % topic)
​
    def usr_unsubscribe(self, topic):
        self.mqtt_client.unsubscribe(topic)
        logging.info('unsubscribe %s' % topic)
​
    def receive_msg(self, timeout=None):
        logging.info('waiting for message.')
        if timeout is None:
            timeout = self.heartbeat
        return self.queue.get(timeout=timeout)
​
    def usr_publish(self, topic, payload, qos, retain=False):
        self.mqtt_client.publish(topic, payload, qos, retain)
        logging.debug('public topic = %s, payload = %s , qos = %s, retain = %s' % (topic, payload, qos, retain))
​
    def usr_log_callback(self, client, userdata, level, msg):
        # logging.info('public topic: %s ' % msg)
        pass
​
    def start(self):
        if self.mqtt_client is None:
            self.mqtt_client = mqtt.Client(client_id=self.mqtt_client_id)
            self.mqtt_client.on_log = self.usr_log_callback
            self.mqtt_client.on_message = self.usr_on_message
            self.mqtt_client.username_pw_set(self.username, self.password)
            self.mqtt_client.connect(self.host, self.port, self.heartbeat)
            self.mqtt_client.loop_start()
            logging.info("client('%s') is connected" % self.mqtt_client_id)
        else:
            logging.error("mqtt_client object is None")
​
    def stop(self):
        if self.mqtt_client is not None:
            self.mqtt_client.loop_stop()
            logging.info("client('%s')  is disconnected" % self.mqtt_client_id)
            self.mqtt_client.disconnect()
            self.mqtt_client = None

2.3.3 Log工具源码

创建logging_tool.py,编写如下代码:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# @Time    : 2020/4/9 13:05
# @Author  : ming fei.tang
# @File    : log tools
# ---------------------
import os
import logging
import datetime
import sys
​
import coloredlogs
​
__all__ = ["LogTool"]
​
​
class LogTool:
    def __init__(self):
        self.log_folder = 'clog' + os.sep + datetime.datetime.now().strftime('%Y%m%d')
​
    def setup_logging(self, level=logging.DEBUG, filename=None):
        if os.path.exists(self.log_folder) is False:
            os.makedirs(self.log_folder)
​
        log_filename = None
        if filename is not None:
            log_filename = self.log_folder + os.sep + filename
​
        log_format = '%(asctime)5s - %(levelname)5s - %(lineno)4s - %(filename)18s - %(message)s'
        if log_filename is not None:
            console = logging.StreamHandler(stream=sys.stdout)
            console.setLevel(logging.getLogger().level)
            console.setFormatter(logging.Formatter(log_format))
            logging.getLogger().addHandler(console)
​
        logging.basicConfig(filename=log_filename, level=level, format=log_format)
        coloredlogs.install(level=level, fmt=log_format, milliseconds=True)
​
    def remove_log_folder(self):
        if os.path.exists(self.log_folder) is False:
            os.remove(self.log_folder)
​

2.4 功能测试代码实现

2.4.1 功能介绍

测试函数主要完成功能:

1)连接EMQX服务器上的mqtt_user3 MQTT Client

2)发布消息至EMQX Broker

3) 订阅其他Client的消息,并通过log打印出来

2.4.2 代码实现

编写一个UserMqttClient的类,测试MQTTClient类中的方法,函数列表和介绍如下:

详细代码如下:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# @Time    : 2023/4/9 13:05
# @Author  : ming fei.tang
# @File    : mqtt client test
# ---------------------
import logging
import random
import time
​
from lib.MQTT_Client import MQTTClient
from lib.logging_tool import LogTool
​
​
class UserMqttClient(MQTTClient):
    def __init__(self, broker_address, mqtt_port, qos, heartbeat, client_ID, user_name, user_password):
        # prepare log file
        super().__init__(broker_address, mqtt_port, qos, heartbeat, client_ID, user_name, user_password)
        self.debug = LogTool()
        self.debug.setup_logging()
        self.debug.remove_log_folder()
        try:
            self.start()
        except Exception as e:
            logging.exception(e)
            assert False, e
​
    def load_para(self):
        pass
​
    def user_publish(self, topic, base_payload):
        while True:
            val = random.randint(1, 15) * 0.1
            val = base_payload + val
            val = '{:.2f}'.format(val)
            publish_payload = str(val)
            self.usr_publish(topic, publish_payload, 1, True)
            time.sleep(10)
​
​
if __name__ == '__main__':
    host = "192.168.1.11"
    port = 1883
    client_id = "paho.mqtt.client"
    username = "mqtt_user3"
    password = "123456"
​
    user_client = UserMqttClient(host, port, 0, 60, client_id, username, password)
​
    user_client.usr_subscribe("switch")
    user_client.usr_subscribe("attributes")
​
    user_client.user_publish(topic="temperature", base_payload=12)
​

3 测试

3.1 EMQX上创建Client

打开EMQX创建如下MQTT Client

名称参数值
usermnamemqtt_user3
password123456

在EMQX客户端认证面板上创建该Client

3.2 运行UserMqttClient

运行UserMqttClient之后, mqtt_user3会自动连接至EMQX,可以在客户端面板查看它的状态

查看EMQX上 mqtt_user3状态,其已经正常的连接到了Broker:

mqtt_user3订阅的消息分别为 switch和attributes

3.3 使用MQTT.fx订阅消息

使用MQTT.fx作为MQTT Client登录EMQX, 然后订阅temperature消息,可以看见MQTT.fx这边能正确的收到

查看EMQX上保留的mqtt_user3发布的消息:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/473781.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

学点儿Java_Day6_面向对象:类、封装、构造方法

1 类 1.1 定义 类:对现实世界中事物的抽象。Student 对象:现实世界中具体的个体。张三、李四 这些具体的学生 面向对象的特征:抽象、封装、继承、多态 OOP: Object Oriented Programming 类和对象的总结: 1、现实世界都是由很多…

语音识别教程:Whisper

语音识别教程:Whisper 一、前言 最近看国外教学视频的需求,有些不是很适应,找了找AI字幕效果也不是很好,遂打算基于Whisper和GPT做一个AI字幕给自己。 二、具体步骤 1、安装FFmpeg Windows: 进入 https://github.com/BtbN/FF…

python爬虫学习第二天----类型转换

🎈🎈作者主页: 喔的嘛呀🎈🎈 🎈🎈所属专栏:python爬虫学习🎈🎈 ✨✨谢谢大家捧场,祝屏幕前的小伙伴们每天都有好运相伴左右,一定要天天…

C语言 指针练习

一、 a、b是两个浮点型变量&#xff0c;给a、b赋值&#xff0c;建立两个指针分别指向a的地址和b的地址&#xff0c;输出两个指针的值。 #include<stdio.h> int main() {float a,b,*p1,*p2;a10.2;b2.3;p1&a;p2&b;printf("a%f,b%f\n",a,b);printf("…

墨菲安全在软件供应链安全领域阶段性总结及思考

向外看&#xff1a;墨菲安全在软件供应链安全领域的一些洞察、思考、行动 洞察 现状&挑战&#xff1a; 过去开发安全体系是无法解决软件供应链安全问题的&#xff1b;一些过去专注开发安全领域的厂商正在错误的引导行业用开发安全思维解决软件供应链安全问题&#xff0c;治…

ResNet目标检测算法实现交通灯分类

红绿灯识别方案&#xff1a;https://zhuanlan.zhihu.com/p/674791906 目录 一、制作数据集二、ResNet算法三、pytorch转onnx文件四、onnx推理测试五、onnx转mnn 一、制作数据集 1、数据集划分 将红绿灯数据集大文件夹中不同类别的小文件夹中的图片按照9&#xff1a;1进行划分…

【Flutter】文件选择器(file_picker)的用法

Flutter 没有提供内置的文件选择器&#xff0c;但社区内有人贡献了一个比较完整的解决方案——file_picker。 file_picker 的 API 简洁易用&#xff0c;支持全平台&#xff08;Android / iOS / Mac / Linux / Windows&#xff09;&#xff0c;是我开发桌面应用时的首选。 这边…

MySql实战--一条SQL查询语句是如何执行的?

平时我们使用数据库&#xff0c;看到的通常都是一个整体。比如&#xff0c;你有个最简单的表&#xff0c;表里只有一个ID字段&#xff0c;在执行下面这个查询语句时&#xff1a; select * from T where ID10&#xff1b; 我们看到的只是输入一条语句&#xff0c;返回一个结果…

Chain of Note-CoN增强检索增强型语言模型的鲁棒性

Enhancing Robustness in Retrieval-Augmented Language Models 检索增强型语言模型&#xff08;RALMs&#xff09;在大型语言模型的能力上取得了重大进步&#xff0c;特别是在利用外部知识源减少事实性幻觉方面。然而&#xff0c;检索到的信息的可靠性并不总是有保证的。检索…

阿里云99元服务器40G ESSD Entry系统盘够用吗?

阿里云99元服务器40G ESSD Entry云盘够用吗&#xff1f;够用&#xff0c;操作系统占15GB左右&#xff0c;还有25G富余。如果是40G ESSD Entry系统盘不够用&#xff0c;还可以为云服务器另外挂载数据盘&#xff0c;所以不用担心40G系统盘不够用。可以在阿里云CLUB中心查看 aliyu…

基于SpringBoot实现WebSocket实时通讯的服务端和客户端

实现功能 服务端注册的客户端的列表&#xff1b;服务端向客户端发送广播消息&#xff1b;服务端向指定客户端发送消息&#xff1b;服务端向多个客户端发送消息&#xff1b;客户端给服务端发送消息&#xff1b; 效果&#xff1a; 环境 jdk&#xff1a;1.8 SpringBoot&#x…

some/ip CAN CANFD

关于SOME/IP的理解 在CAN总线的车载网络中&#xff0c;通信过程是面向信号的 当ECU的信号的值发生了改变&#xff0c;或者发送周期到了&#xff0c;就会发送消息&#xff0c;而不考虑接收者是否需要&#xff0c;这样就会造成总线上出现不必要的信息&#xff0c;占用了带宽 …

get_local_ip.bat:快速获取IPv4地址

批处理脚本&#xff0c;用于在Windows命令提示符下获取本地计算机的IPv4地址。 echo off ipconfig | findstr IPv4 pause - echo off&#xff1a;这会关闭命令提示符窗口中的命令回显&#xff0c;使得在运行脚本时不会显示每条命令的执行结果。 - ipconfig&#xff1a;这是一…

流畅的 Python 第二版(GPT 重译)(十三)

第二十四章&#xff1a;类元编程 每个人都知道调试比一开始编写程序要困难两倍。所以如果你在编写时尽可能聪明&#xff0c;那么你将如何调试呢&#xff1f; Brian W. Kernighan 和 P. J. Plauger&#xff0c;《编程风格的要素》 类元编程是在运行时创建或自定义类的艺术。在 P…

元素定位之xpath和css

元素定位 xpath绝对路径相对路径案例xpath策略&#xff08;路径&#xff09;案例xpath策略&#xff08;层级、扩展&#xff09;属性层级与属性层级与属性拓展层级与属性综合 csscss选择器&#xff08;id、类、标签、属性&#xff09;id选择器类选择器标签选择器属性选择器案例-…

按面积筛选填充二值图中的孔洞-python源码

目录 &#x1f64b;&#x1f64b;需求 &#x1f345;&#x1f345;解决方案 &#x1f64b;&#x1f64b;需求 前提条件是二值图中0是背景&#xff0c;255是前景。 二值化后的影像中有很多小孔洞&#xff0c;现在需要按孔洞面积进行筛选&#xff0c;填充面积小于阈值的孔洞&…

何恺明重提十年之争——模型表现好是源于能力提升还是捕获数据集偏见

2011年,知名学者Antonio Torralba和Alyosha Efros提出了“数据集偏差之战”&#xff0c;他们发现机器学习模型很容易“过拟合”到特定的数据集上&#xff0c;导致在其他数据集上表现不佳。过去十年&#xff0c;随着深度学习革命的到来&#xff0c;建立多样化、大规模、全面且尽…

三级数据库技术考点(详解!!)

1、 答疑:【解析】分布式数据库系统按不同层次提供的分布透明性有:分片透明性;②位置透明性;③局部映像透明性&#xff0c;位置透明性是指数据分片的分配位置对用户是透明的&#xff0c;用户编写程序时只需 要考虑数据分片情况&#xff0c;不需要了解各分片在各个场地的分配情…

GitHub配置SSH Key(详细版本)

GitHub配置SSH Key的目的是为了帮助我们在通过git提交代码是&#xff0c;不需要繁琐的验证过程&#xff0c;简化操作流程。比如新建的仓库可以下载, 但是提交需要账号密码。 步骤 一、设置git的user name和email 如果你是第一次使用&#xff0c;或者还没有配置过的话需要操作…

zookeeper底层细节

zk 临时节点和watch机制实现注册中心自动注册和发现&#xff0c;数据都在内存&#xff0c;nio 多线程模型&#xff1b; cp注重一致性&#xff0c;数据不一致时集群不可用 事务请求处理方式 1.all事务由唯一服务器处理 2.将客户端事务请求转成proposal分发follower 3.等待半…
最新文章