基于Locust实现MQTT协议服务的压测脚本

 一、背景简介

业务背景大概介绍一下,就是按照国标规定,车辆需要上传一些指定的数据到ZF的指定平台,同时车辆也会把数据传到企业云端服务上,于是乎就产生了一些性能需求。

目前我们只是先简单的进行了一个性能场景的测试,就是评估目前服务是否能够支持,预期的最大同时在线车辆上传数据。经过评估,在线车辆数据按照预期的10倍来进行的,并且后面增加持续运行12h查看服务链路的稳定性。

本篇并不是一个严谨的性能测试过程结果分享,主要是分享下关于mqtt协议服务的压测脚本的编写。因为之前我也没接触过MQTT协议的压测,网上关于相关的压测脚本的内容也比较杂乱,所以记录一下,仅供参考。

捋一下链路就知道需要生成哪些数据(因为服务还未上线使用,所以产生的压测数据后面可以直接清理掉即可。):

  1. 一些前置数据:比如数据库、缓存里涉及到的车辆数据,通信秘钥数据等等,这些可以之前写脚本一次性生成即可。
  2. 车辆上报的数据:车辆上报到云端的数据,是经过一系列加密转码,期间还要设计到解密等,这个经过评估,可以简化其中的某些环境,所以所有的车可以直接发送相同的数据即可。
  3. 车辆数据:最后就是生成对应的车辆数据,同时在线,按照评估的频率发送数据。

其中第1、2的数据在之前针对性的分别生成即可,第3步的车辆发送数据就是压测脚本要干的事情了。

二、技术选型

这个倒是很快,搜索引擎大概搜了一下,内容很少,或者说对我有用的内容很少。有看到jmeter有相关插件的,但是这个方案基本上我都是否决的,一来我不擅长用,而来我觉得用起来肯定会比自己编码要麻烦的多。

所以就继续编码好了,仍然首选python,想到了locust库,后来看官方文档的时候,看到locust也针对mqtt协议拓展了一些内容。但是我尝试下来不太符合我这的需求,也可能当时我用的不对吧,所以就只能自己来从零开始编写了。

搜索中又发现Python中用于mqtt协议的库叫paho.mqtt,支持连接代理,消息的订阅、收发等等,于是最后确定使用:locust+paho.mqtt的组合来实现本次的负载脚本。

三、代码编写

1. 脚本代码

暂时没做代码分层,目前场景简单,就直接都放一个模块里了,有点长,先贴上来,后面部分会对脚本的重点内容进行拆解。

脚本目前做了这些事情:

  • 从db中查询有效可用的所有测试车辆信息数据
  • 根据命令行的输入参数,指定启动的车辆数,以及与broker代理建立连接的频率
  • 建立连接成功的车辆,就可以根据脚本里指定的频次,来像broker发送数据
  • 脚本统计连接数、请求数、响应时间等信息写到报表中
  • 调试遇到车辆会批量断开连接的情况,增加了当车辆断开连接时,把断开时间、车辆信息写到本地csv中,方便第二天来查看分析。
import csv
import datetime
import queue
import os
import sys
import time
import ssl

from paho.mqtt import client as mqtt_client

# 根据不同系统进行路径适配
if os.name == "nt":
    path = os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
    sys.path.insert(0, path)
    from GB_test.utils.mysql_operating import DB
elif os.name == "posix":
    sys.path.append("/app/qa_test_app/")
    from GB_test.utils.mysql_operating import DB

from locust import User, TaskSet, events, task, between, run_single_user


BROKER_ADDRESS = "broker服务地址"
PORT = 1111
PASSWORD = "111111"
PUBLISH_TIMEOUT = 10000  # 超时时间
TEST_TOPIC = "test_topic"

TEST_VALUE = [16, 3, -26, 4, 0, 36,.......]  # 用来publish的测试数据,仅示意

BYTES_DATA = bytes(i % 256 for i in TEST_VALUE)  # 业务需要转换成 byte 类型后再发送

# 创建队列
client_queue = queue.Queue()

# 连接DB,读取车辆数据
db = DB("db_vmd")
select_sql = "select xxxx"  
client_list = db.fetch_all(select_sql)
print("车辆数据查询完毕,数据量:{}".format(len(client_list)))
for t in client_list:
    # 把可用的车辆信息存到队列中去
    client_queue.put(t)


def fire_success(**kwargs):
    """请求成功时调用"""
    events.request.fire(**kwargs)


def calculate_resp_time(t1, t2):
    """计算响应时间"""
    return int((t2 - t1) * 1000)


class MQTTMessage:
    """已发送的消息实体类"""
    def __init__(self, _type, qos, topic, payload, start_time, timeout):
        self.type = _type,
        self.qos = qos,
        self.topic = topic
        self.payload = payload
        self.start_time = start_time
        self.timeout = timeout


# 统计总共发送成功的消息数量
total_published = 0
disconnect_record_list = []  # 定义存放连接断开的记录的列表容器


class PublishTask(TaskSet):

    @task
    def task_publish(self):
        self.client.loop_start()
        topic = TEST_TOPIC
        payload = BYTES_DATA
        # 记录发送的开始时间
        start_time = time.time()
        mqtt_msg_info = self.client.publish(topic, payload, qos=1, retain=False)
        published_mid = mqtt_msg_info.mid
        # 将发送成功的消息内容,放入client实例的 published_message 字段
        self.client.published_message[published_mid] = MQTTMessage(REQUEST_TYPE,
                                                                   0,
                                                                   topic,
                                                                   payload,
                                                                   start_time,
                                                                   PUBLISH_TIMEOUT)
        # 发送成功回调
        self.client.on_publish = self.on_publish
        # 断开连接回调
        self.client.on_disconnect = self.on_disconnect

    @staticmethod
    def on_disconnect(client, userdata, rc):
        """ broker连接断开,放入列表容器"""
        disconnected_info = [str(client._client_id), rc, datetime.datetime.now()]
        disconnect_record_list.append(disconnected_info)
        print("rc状态:{} - -".format(rc), "{}-broker连接已断开".format(str(client._client_id)))

    @staticmethod
    def on_publish(client, userdata, mid):
        if mid:
            # 记录消息发送成功的时间
            end_time = time.time()
            # 从已发送的消息容器中,取出消息
            message = client.published_message.pop(mid, None)
            # 计算开始发送到发送成功的耗时
            publish_resp_time = calculate_resp_time(message.start_time, end_time)
            fire_success(
                request_type="p_success",
                name="client_id: " + str(client._client_id),
                response_time=publish_resp_time,
                response_length=len(message.payload),
                exception=None,
                context=None
            )
            global total_published
            # 成功发送累加1
            total_published += 1


class MQTTLocustUser(User):
    tasks = [PublishTask]
    wait_time = between(2, 2)

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # 从队列中获取客户端 username 和 client_id
        current_client = client_queue.get()

        self.client = mqtt_client.Client(current_client[1])
        self.client.username_pw_set(current_client[0], PASSWORD)
        # self.client.username_pw_set(current_client[0] + "1", PASSWORD)  # 模拟client连接报错

        # 定义一个容器,存放已发送的消息
        self.client.published_message = {}

    def on_start(self):
        # 设置tls
        context = ssl.SSLContext(ssl.PROTOCOL_TLS)
        self.client.tls_set_context(context)

        self.client.connect(host=BROKER_ADDRESS, port=PORT, keepalive=60)
        self.client.on_connect = self.on_connect

    def on_stop(self):
        print("publish 成功, 当前已成功发送数量:{}".format(total_published))
        if len(disconnect_record_list) == 0:
            print("无断开连接的client")
        else:
            # 把断开记录里的信息写入csv
            with open("disconnect_record.csv", "w", newline='', encoding='UTF8') as csvfile:
                writer = csv.writer(csvfile)
                writer.writerow(['client_id', 'rc_status', 'disconnected_time'])
                for i in disconnect_record_list:
                    writer.writerow(i)
            print("断开连接的client信息已写入csv文件")

    @staticmethod
    def on_connect(client, userdata, flags, rc, props=None):
        if rc == 0:
            print("rc状态:{} - -".format(rc), "{}-连接broker成功".format(str(client._client_id)))
            fire_success(
                request_type="c_success",
                name='count_connected',
                response_time=0,
                response_length=0,
                exception=None,
                context=None
            )
        else:
            print("rc状态:{} - -".format(rc), "{}-连接broker失败".format(str(client._client_id)))
            fire_success(
                request_type="c_fail",
                name="client_id: " + str(client._client_id),
                response_time=0,
                response_length=0,
                exception=None,
                context=None
            )


if __name__ == '__main__':
    run_single_user(MQTTLocustUser)

2. 代码分析-locust库部分

并发请求能力还是使用的locust库的能力。官方只提供了http协议接口的相关类,没直接提供mqtt协议的,但是我们可以按照官方的规范,自定义相关的类,只要继承UserTaskSet即可。

User

首先是先定义User类,这里就是用来生成我要用来测试的车辆。

类初始化的时候,黄色框里,会去队列里取出车辆信息,用来做一些相关的设置。client来源于from paho.mqtt import client as mqtt_client提供的能力,固定用法,按照人家的文档使用就行。

红色框里,是User类的2个重要熟悉属性:

  • tasks: 这里定义了生成的用户需要去干哪些事情,也就是对应脚本里的PublishTask类下面定义的内容。
  • wait_time: 用户在执行task时间隔停留的时间,可以是个区间,在里面随机。我这里意思是每2s发送一次数据到broker。

绿色框里,定义了一个字典容器,用来存放当前用户已发送成功的消息内容,因为后面我要取出来把里面相关的数据写到生成的报表中去。

蓝色框里有2个方法,也是locust提供的能力:

  • on_start:当用户开始运行时调用,这里我做了车辆连接broker代理的处理,注意这里需要设置tls,因为服务连接需要。

  • on_stop:当用户结束运行时调用,这里我做了一些其他的处理,比如把运行期间断开连接的车辆信息写到本地csv中。

TaskSet

定义好User类,就需要来定义TaskSet类,你得告诉产生出来的用户,要干点啥。

我这根据业务需要,就是让车辆不停的像broker发送数据即可。

红色部分,同样是paho.mqtt提供的能力,会启动新的线程去执行你定义的事情。

黄色部分,就是做发送数据的操作,并且我可以拿到一些返回,查看源码就可以知道返回的是MQTTMessageInfo类。

注意返回的2个属性:

  • mid: 返回这个消息发送的顺序
  • rc: 表示发送的响应状态,0 就是成功

绿色部分,还记得我在上面的User类中定义了一个容器,在这里就把发送的消息相关信息放到容器中去,留着后面使用。

2. 代码分析-paho.mqtt库部分

上面的代码已经用到了不少paho.mqtt的能力,这里再进行整体梳理下。

  • client.Client():声明一个client
  • client.username_pw_set(): 设置客户端的用户名,密码
  • client.tls_set_context: 设置ssl模式
  • client.connect(): 连接代理
  • client.publish:向代理推送消息

还用到了一些回调函数:

  • on_connect:连接操作成功时回调
  • on_publish:发布成功时回调
  • on_disconnect:客户端与代理断开连接时回调

另外还用到了一个事件函数events.request

当客户端发送请求时会调用,不管是请求成功还是请求失败;当我需要自定义我的报告内容时,就需要用到这个event

查看源码,知道里面要传哪些参数,那我们在调用时候就需要传入对应的参数。

比如我在发送回调函数里调用了该方法。

所以最后在控制台显示的报告里就有我定义的内容了。

由于后来在使用中发现,不知道会在什么时候出现批量断开的情况,于是在on_disconnect回调函数里增加了对应处理,把相关的断开信息记录下来,运行结束的时候写到本地文件里去。

后来我主动尝试客户端断开的情况测试了下文件的写入结果,功能正常。

三、小结

后面就开始运行了,在运行过程中,开发关注链路服务的各项指标,这里就不展开了,业务缠身就并没有过多的去做这个事情,况且也不专业。确实也发现了不少问题,后面逐步优化,再继续测试。

现在稳定运行12h,服务正常,暂时就先告一段落了。后面还有会相关其他性能测试场景,届时就可以针对性的展开分享下了。

另外,这个脚本分享也只是仅供参考,现在我这是使用简单,本着能用就行,可能存在一些不合理需要优化的地方,有需要的朋友还请自行查阅相关文档。

最后感谢每一个认真阅读我文章的人,礼尚往来总是要有的,这些资料,对于【软件测试】的朋友来说应该是最全面最完整的备战仓库,虽然不是什么很值钱的东西,如果你用得到的话可以直接拿走:

这些资料,对于【软件测试】的朋友来说应该是最全面最完整的备战仓库,这个仓库也陪伴上万个测试工程师们走过最艰难的路程,希望也能帮助到你! 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/384303.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

C++进阶(十五)C++的类型转换

📘北尘_:个人主页 🌎个人专栏:《Linux操作系统》《经典算法试题 》《C》 《数据结构与算法》 ☀️走在路上,不忘来时的初心 文章目录 一、C语言中的类型转换二、为什么C需要四种类型转换三、C强制类型转换1、static_cast2、reint…

中国电子学会2019年12月份青少年软件编程Scratch图形化等级考试试卷三级真题(选择题、判断题)

一、单选题(共 25 题,每题 2 分,共 50 分) 1.怎样修改图章的颜色?( ) A. 只需要一个数字来设置颜色 B. 设置 RGB 的值 C. 在画笔中设置颜色、饱和度、亮度 D. 在外观中设置或修改角色颜色特效 2.以下程序的执…

2024年Midjourney 付费订阅流程 | Midjourney 各版本介绍,使用虚拟信用卡支付买Midjourney流程指南

1.Midjourney介绍 Midjourney 是一款备受欢迎的人工智能生成图像工具,它可以通过输入文字描述,自动生成精美的图像。与许多其他图像生成工具不同,Midjourney 不需要安装任何软件,也不受个人电脑性能的限制,因为它运行…

计算机毕业设计SSM基于的冷链食品物流信息管理系统

项目运行 环境配置: Jdk1.8 Tomcat7.0 Mysql HBuilderX(Webstorm也行) Eclispe(IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持)。 项目技术: vue mybatis Maven mysql5.7或8.0等等组成,B…

微信小程序(四十二)wechat-http拦截器

注释很详细,直接上代码 上一篇 新增内容: 1.wechat-http请求的封装 2.wechat-http请求的拦截器的用法演示 源码: utils/http.js import http from "wechat-http"//设置全局默认请求地址 http.baseURL "https://live-api.ith…

【学网攻】 第(26)节 -- 综合网络实验一

系列文章目录 目录 系列文章目录 文章目录 前言 一、综合实验 二、实验 1.引入 实验目标 实验设备 实验拓扑图 实验配置 文章目录 【学网攻】 第(1)节 -- 认识网络【学网攻】 第(2)节 -- 交换机认识及使用【学网攻】 第(3)节 -- 交换机配置聚合端口【学网攻】 第(4)节…

C++ //练习 6.5 编写一个函数输出其实参的绝对值。

C Primer(第5版) 练习 6.5 练习 6.5 编写一个函数输出其实参的绝对值。 环境:Linux Ubuntu(云服务器) 工具:vim 代码块 /*************************************************************************&…

linux优化空间完全卸载mysql——centos7.9

文章目录 ⭐前言⭐linux命令使用💖 基础命令💖 内存优化💖 完全删除mysql ⭐结束 ⭐前言 大家好,我是yma16,本文分享 linux优化空间&完全卸载mysql——centos7.9。 linux内存分配 在Linux中,内存分配是…

安装faiss环境教程

文章目录 打开环境安装faiss环境检查已安装的环境切换环境至faiss 打开环境 source activate # 打开环境安装faiss环境 conda create -n faiss_env # 安装faiss环境检查已安装的环境 conda info --envs # 检查已安装的环境切换环境至faiss conda a…

【数据结构】13:表达式转换(中缀表达式转成后缀表达式)

思想: 从头到尾依次读取中缀表达式里的每个对象,对不同对象按照不同的情况处理。 如果遇到空格,跳过如果遇到运算数字,直接输出如果遇到左括号,压栈如果遇到右括号,表示括号里的中缀表达式已经扫描完毕&a…

AlmaLinux右键菜单(基于GNOME桌面)

文章目录 前言前提说明在文件上右键在文件夹上右键 前言 在使用VSCode的过程中,AlmaLinux没能像Windows一样在右键菜单上显示打开方式,所以找了一下解决方案,罗列出来 前提说明 虽然说无论是media还是StackOverflow都推荐使用这条命令&…

Impala-架构与设计

架构与设计 一、背景和起源二、框架概述1.设计特点2.框架优点3.框架限制 三、架构图1.Impala Daemon2.Statestore3.Catalog 四、Impala查询流程1.发起查询2.生成执行计划3.分配任务4.交换中间数据5.汇集结果6.返回结果 总结参考链接 一、背景和起源 现有的大数据查询分析工具H…

数据结构:并查集讲解

并查集 1.并查集原理2.并查集实现3.并查集应用4.并查集的路径压缩 1.并查集原理 在一些应用问题中,需要将n个不同的元素划分成一些不相交的集合。开始时,每个元素自成一个单元素集合,然后按一定的规律将归于同一组元素的集合合并。在此过程中…

华为 huawei 交换机 接口 MAC 地址学习限制接入用户数量 配置示例

目录 组网需求: 配置思路: 操作步骤: 配置文件: 组网需求: 如 图 2-14 所示,用户网络 1 和用户网络 2 通过 LSW 与 Switch 相连, Switch 连接 LSW 的接口为GE0/0/1 。用户网络 1 和用户网络 2 分别属于 VLAN10 和 V…

没更新的日子也在努力呀,布局2024!

文章目录 ⭐ 没更新的日子也在努力呀⭐ 近期的一个状态 - 已圆满⭐ 又到了2024的许愿时间了⭐ 开发者要如何去 "创富" ⭐ 没更新的日子也在努力呀 感觉很久没有更新视频了,好吧,其实真的很久没有更新短视频了。最近的一两个月真的太忙了&#…

shell脚本之文件处理命令及字符切片处理

目录 一、文件处理工具 1、tr命令 1.1 转换字符 1.2 压缩字符及删除字符 2、seq命令 3、cut命令 ​4、tac命令 5、rev命令 6、sort命令 ​​​​​7、uniq命令 ​8、echo命令 9、date命令 二、字符串切片处理 1、取字符串的长度 2、跳过字符串最前边的字符 3、…

腾讯云4核8G服务器多少钱?轻量和CVM报价2024新版

腾讯云4核8G服务器S5和轻量应用服务器优惠价格表,轻量应用服务器和CVM云服务器均有活动,云服务器CVM标准型S5实例4核8G配置价格15个月1437.3元,5年6490.44元,标准型SA2服务器1444.8元一年,轻量应用服务器4核8G12M带宽一…

Netty应用(三) 之 NIO开发使用 网络编程 多路复用

目录 重要:logback日志的引入以及整合步骤 5.NIO的开发使用 5.1 文件操作 5.1.1 读取文件内容 5.1.2 写入文件内容 5.1.3 文件的复制 5.2 网络编程 5.2.1 accept,read阻塞的NIO编程 5.2.2 把accept,read设置成非阻塞的NIO编程 5.2.3…

WPF中值转换器的使用

什么是值转换器 在WPF(Windows Presentation Foundation)中,值转换器(Value Converter)是一种机制,允许你在绑定时转换绑定源和绑定目标之间的值。值转换器实现了 IValueConverter 接口,该接口…

SSM实现支付宝沙盒支付

文章目录 沙盒支付准备配置测试 沙盒支付 这里用的支付宝的一个沙盒环境,是支付宝提供给开发者测试用的。 下面主要梳理一下,支付功能的实现,其实还是很简单的,因为支付宝都提供好了,我们只要调用接口去传入参数即可…
最新文章