FluxMQ:新一代的高性能MQTT代理服务器

FluxMQ:新一代的高性能MQTT代理服务器

前言

FLuxMQ是一款基于java开发,支持无限设备连接的云原生分布式物联网接入平台。FluxMQ基于Netty开发,底层采用Reactor3反应堆模型,具备低延迟,高吞吐量,千万、亿级别设备连接;方便企业快速构建其物联网平台与应用。

FluxMQ官网:https://www.fluxmq.com

FluxMQ演示系统:http://demo.fluxmq.com/

丰富的功能

通配符

通配符解释
#可以匹配零个或多个层级的主题。只能出现在主题末尾。
+可以匹配一个层级的主题。可以出现在主题的任何位置。

FluxMQ支持两种通配符:+和#。+表示匹配一个层级,#表示匹配零个或多个层级。下面是一些例子:

  • 井号 #:

井号通配符表示可以匹配零个或多个层级的主题。它只能出现在主题的末尾,例如 home/living_room/#。
用法示例:如果一个订阅者订阅了主题 home/living_room/#,那么它将接收到诸如 home/living_room/light、home/living_room/temperature 等所有以 home/living_room/ 开头的消息。

  • 加号 +:

加号通配符表示可以匹配一个层级的主题。它可以出现在主题的任何位置,例如 home/+/light。
用法示例:如果一个订阅者订阅了主题 home/+/light,那么它将接收到诸如 home/living_room/light、home/kitchen/light 等层级为 home/ 下,且以 /light 结尾的消息。

共享订阅

FluxMQ支持共享订阅,单个topic共享订阅集群支持10W/s吞吐量,下面吗我们介绍下共享订阅的使用方式:

如何使用共享订阅
  • 通过Topic的订阅规则,可以指定订阅组名称,从而实现共享订阅。例如,订阅规则为$share/topic1,则订阅组名称为DEFAULT,订阅主题为topic1
  • 分组共享订阅是指将订阅组的消息分发均匀地分发给订阅组成员。在分组共享订阅中,订阅同一主题的客户机依次接收此主题下的消息。同一消息不会发送给多个订阅客户端,从而实现多个订阅客户端之间的负载均衡。订阅的规则为$queue/group1/topic1,则订阅组名称为group1,订阅主题为topic1

规则引擎

FluxMQ Rule Engine (以下简称规则引擎) 用于配置FluxMQ 消息流与设备事件的处理、响应规则。规则引擎不仅提供了清晰、灵活的 “配置式” 的业务集成方案,简化了业务开发流程,提升用户易用性,降低业务系统与 FluxMQ 的耦合度;也为 FluxMQ 的私有功能定制提供了一个更优秀的基础架构。
FluxMQ 在 消息发布或事件触发 时将触发规则引擎,满足触发条件的规则将执行各自的 SQL 语句筛选并处理消息和事件的上下文信息;

流程图

Sql支持

规则的 SQL 语句基本格式为:

SELECT <字段名> FROM <事件类型> [WHERE <条件>]
  • FROM 子句将规则挂载到某个事件类型上,可多选
  • SELECT 子句用于对数据进行变换,并选择出需要的字段
  • WHERE 子句用于对 SELECT 选择出来的某个字段施加条件过滤
## SELECT 语句用于决定最终的输出结果里的字段。比如:
## 下面 SQL 的输出结果中将只有两个字段 "a" 和 "b":

SELECT a, b FROM "$EVENT.PUBLISH"

# 选取 username 为 'abc' 的建立连接消息,输出结果为所有可用字段:

SELECT * FROM "$EVENT.CONNECT" WHERE auth.username = 'abc'

## 选取 clientId 为 'abc' 的终端发来的消息,输出结果将只有 cid 一个字段。

SELECT clientId as cid FROM "$EVENT.PUBLISH" WHERE clientId = 'abc'

## 选取 qos 为 '1' 的发布消息,输出结果将只有 cid 一个字段。
## 注意虽然 SELECT 语句中只选取了 cid 一个字段,所有消息发布事件中的可用字段 (比如 clientId、topic 等) 仍然可以在 WHERE 语句中使用:

SELECT clientId as cid FROM "$EVENT.PUBLISH" WHERE qos = 1

## 但下面这个 SQL 语句就不能工作了,因为变量 xyz 不是消息发布事件中的可用字段:

SELECT clientId as cid FROM "$EVENT.PUBLISH" WHERE xyz = 'abc'

比较符号

函数名函数作用返回值
>大于true/false
<小于true/false
<=小于等于true/false
>=大于等于true/false
<>不等于true/false
!=不等于true/false
=比较两者是否完全相等。可用于比较变量和主题true/false
=~比较主题(topic)是否能够匹配到主题过滤器(topic filter)。只能用于主题匹配true/false

SQL 语句示例

基本语法举例

  • 从 topic 为 “t/a” 的消息中提取所有字段:
SELECT * FROM "$EVENT.PUBLISH" WHERE topic = 't/a'
  • 从 topic 为 “t/a” “t/b” 的消息中提取所有字段:
SELECT * FROM "$EVENT.PUBLISH" WHERE topic = 't/a' or topic = 't/b'
  • 从 topic 能够匹配到 ‘t/#’ 的消息中提取所有字段。
SELECT * FROM "$EVENT.PUBLISH" WHERE topic =~ "t/#"
  • 从 topic 能够匹配到 ‘t/#’ 的消息中提取 qos、messageId 和 clientId 字段:
SELECT qos, messageId, clientId FROM "$EVENT.PUBLISH" WHERE topic =~ "t/#"
  • 从 建立连接 消息中提取 username 字段,并且筛选条件为 username = ‘fluxmq’:
select auth.username as username from "$EVENT.CONNECT" where auth.username = 'fluxmq'
  • 从任意 topic 的 JSON 消息体(payload) 中 提取 x 字段,并创建别名 x 以便在 WHERE 子句中使用。WHERE 子句限定条件为 x = 1。下面这个 SQL 语句可以匹配到消息体 {“x”: 1},但不能匹配到消息体 {“x”: 2}:
SELECT payload.x as x FROM "$EVENT.PUBLISH" WHERE payload.x = 1
  • 类似于上面的 SQL 语句,但 **嵌套地提取 **消息体中的数据,下面的 SQL 语句可以匹配到 JSON 消息体 {“x”: {“y”: 1}}:
SELECT payload FROM "$EVENT.PUBLISH" WHERE payload.x.y = 1
  • 在 clientId = ‘c1’ 连接成功时,提取其来源 IP 地址:
SELECT clientIp FROM "$EVENT.CONNECT" WHERE clientId = 'c1'
  • 筛选所有订阅 ‘t/#’ 主题 订阅级别为 QoS 1 的 clientId:
SELECT clientId FROM "$EVENT.SUBSCRIBE" WHERE topic =~ 't/#' and qos = 1
  • 筛选所有订阅主题能匹配到 ‘t/#’ 且订阅级别为 QoS 1 的 clientId。注意与上例不同的是,这里用的是主题匹配操作符 ‘=~’,所以会匹配订阅 ‘t’ 或 ‘t/+/a’ 的订阅事件:
SELECT clientId FROM "$EVENT.SUBSCRIBE" WHERE topic =~ 't/#' and qos = 1
  • 从topic包含"fluxmq"字符的消息中提取所有字段,使用 ‘like’ 语法,包含的字符用%包裹:
SELECT * FROM "$EVENT.PUBLISH" WHERE topic like '%fluxmq%'
  • 使用常量字段,用双引号""包裹字符串,作为常量值;as后面跟字段,作为输出的字段名:
SELECT *, "test" as event FROM "$EVENT.PUBLISH"
  • 可以直接使用Java的String API,如 startsWith,endsWith;获取以’test’开头的所有Publish消息:
select * from "$EVENT.PUBLISH" where topic.startsWith('test')

提示

  • FROM 子句后面的主题需要用双引号 “”,或者单引号 ‘’ 引起来。
  • WHERE 子句后面接筛选条件,如果使用到字符串需要用单引号 ‘’ 引起来。
  • FROM 子句里如有多个事件,需要用逗号 “,” 分隔。例如 SELECT * FROM “t/1”, “t/2” 。
  • 可以使用使用 “.” 符号对json字段进行嵌套选择。
  • 尽量不要给 payload 创建别名,否则会影响运行性能。即尽量不要这么写:SELECT payload as p

SQL函数列表

函数名说明
json- 使用在SQL语句中,可以将嵌套的Json结构转换成字符串
- 使用在支持模板替换的想象中,比如数据库保存的SQL,KAFKA的TOPIC等地方
bytes将对象转成字节数组,会先将对象JSON化,再转成UTF8格式的字节数组
int8将对象转为Int8类型,相当于byte
int16将对象转为Int16类型,相当于short
int32将对象转为Int32类型,相当于int
int64将对象转为Int64类型,相当于long
toDouble将对象转为double类型
hexStr将对象转成HEX字符串, 会先将对象JSON化,再转成对象的16进制字符串
date格式化时间字符串:yyyy-MM-dd
datetime格式化时间字符串:yyyy-MM-dd HH:mm:ss
dateToTimestampyyyy-MM-dd 时间字符串转成时间戳
datetimeToTimestampyyyy-MM-dd HH:mm:ss 时间字符串转成时间戳
uuid生成32位随机小写字符串
uuidUpper生成32位随机大写字符串
isBytes判断是否是字节数组
isJson判断是否是json

SQL事件

规则的 SQL 语句可以处理事件(发布消息、客户端上下线、客户端订阅等),FROM 子句后面跟事件主题。
事件主题以 $EVENT/ 开头,比如 E V E N T . P U B L I S H , EVENT.PUBLISH, EVENT.PUBLISH,EVENT.SUBSCRIBE 。

事件名称事件主题名释义
发布事件$EVENT.PUBLISH发布消息
订阅事件$EVENT.SUBSCRIBE订阅成功消息
取消订阅事件$EVENT.UNSUBSCRIBE取消订阅成功消息
发布回复事件$EVENT.ACK消息接收成功并回复
心跳事件$EVENT.PING连接保活心跳消息
取消连接事件$EVENT.DISCONNECT客户端主动断开连接
连接断开事件$EVENT.CLOSE服务端关闭连接
建立连接事件$EVENT.CONNECT连接成功
离线消息事件$EVENT.OFFLINE离线期间接收的消息
支持的数据源
  • [✔] MySQL
  • [✔] PostgreSQL
  • [✔] Oracle
  • [✔] SQL Server
  • [✔] Redis
  • [✔] TDengine
  • [✔] ClickHourse
  • [✔] Kafka
  • [✔] RabbitMQ
  • [✔] RocketMQ
  • [✔] Pulsar
  • [✔] MQTT
  • [✔] Mogodb
  • [✔] Log

多协议

目前已经支持的协议如下:

  • [✔] Websocket
  • [✔] Ocpp
  • [✔] GBT32960
  • [✔] JT808
  • [✔] V2C
  • [✔] I1
  • [✔] Coap

通过控制台启动协议插件后,默认会在集群每个节点气动对应的协议插件,依托于FluxMQ,该协议天然支持集群,订阅的规则如下:

下行指令通过MQTT下发给具体的设备,TOPIC规则为:{协议名称}/down/{vin}
上行是设备上报的报文,TOPIC规则为:{协议名称}/up/{vin}

脚本

脚本的处理时机是在MQTT事件处理器之前,因此脚本的适用于修改、解析、加解密MQTT传输的Payload。脚本默认匹配主题支持通配符,处理Payload可以直接返回byte[]或者String。

Javascript脚本
 function convert(topic, payload){
   return payload;
 }
Lua脚本
function convert(topic, payload)
   return payload
end
Groovy脚本
def convert(String topic, byte[] payload){
    return payload
}

FluxMQ免费推广

FluxMQ默认提供免费的接入License许可,直接参考我们文档下载即可。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/366732.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

ClickHouse基于数据分析常用函数

文章标题 一、WITH语法-定义变量1.1 定义变量1.2 调用函数1.3 子查询 二、GROUP BY子句&#xff08;结合WITH ROLLUP、CUBE、TOTALS&#xff09;三、FORM语法3.1表函数3.1.1 file3.1.2 numbers3.1.3 mysql3.1.4 hdfs 四、ARRAY JOIN语法&#xff08;区别于arrayJoin(arr)函数&a…

创建自己的Hexo博客

目录 一、Github新建仓库二、支持环境安装Git安装Node.js安装Hexo安装 三、博客本地运行本地hexo文件初始化本地启动Hexo服务 四、博客与Github绑定建立SSH密钥&#xff0c;并将公钥配置到github配置Hexo与Github的联系检查github链接访问hexo生成的博客 一、Github新建仓库 登…

完整的 HTTP 请求所经历的步骤及分布式事务解决方案

1. 对分布式事务的了解 分布式事务是企业集成中的一个技术难点&#xff0c;也是每一个分布式系统架构中都会涉及到的一个东西&#xff0c; 特别是在微服务架构中&#xff0c;几乎可以说是无法避免。 首先要搞清楚&#xff1a;ACID、CAP、BASE理论。 ACID 指数据库事务正确执行…

小程序中picker多列选择器

需求&#xff1a;实现类似省市联动的效果&#xff0c;选择第一列后&#xff0c;第二列数据变化 html部分: <view class"section"><view>多列选择器</view><picker mode"multiSelector" bindchange"bindMultiPickerChange"…

大模型实践笔记(1)——GLM-6B实践

目录 在Ubuntu上的配置Git Large File Storage 安装Git LFS&#xff1a; 设置Git LFS&#xff1a; 使用Git LFS&#xff1a; 安装GLM-6B 环境依赖 ChatGLM2-6B介绍 配置GLM 下载代码 构建环境 安装依赖 本地部署 网页UI 很多模型在hugging face上面&#xff0c;…

Pudgy Penguins NFT 概览与数据分析

作者&#xff1a;stellafootprint.network 数据来源&#xff1a;Pudgy Penguins NFT Collection Dashboard “胖企鹅” Pudgy Penguins NFT 系列是由 8,888 个独特的企鹅头像组成的以太坊区块链项目。这个 NFT 项目能否在 2024 年达到发展的高峰&#xff1f; 关于 Pudgy Pe…

微信小程序新手入门教程三:基础语法介绍

WXML&#xff08;WeiXin Markup Language&#xff09;是框架设计的一套标签语言&#xff0c;可以与各种组件相结合&#xff0c;进行页面构建。 一 常用标签 wxml的语法结构与我们熟悉的html很像&#xff0c;但在细节处略有不同&#xff0c;我们可以参考html标签对比记忆。wxm…

12.scala下划线使用总结

目录 概述实践变量初始化导包引入方法转变为函数用户访问Tuple元素简化函数参数传递定义偏函数变长参数 结束 概述 实践 变量初始化 在Scala中&#xff0c;变量在声明时需要显式指定初始值。可以使用下划线为变量提供初始值&#xff0c;但这种语法仅限于成员变量&#xff0c;…

IDEA反编译Jar包

反编译步骤 使用IDEA安装decompiler插件 找到decompiler插件文件夹所在位置&#xff08;IDEA安装路径/plugins/java-decompiler/lib &#xff09;&#xff0c;将需要反编译的jar包放到decompiler插件文件夹下&#xff0c;并创建一个空的文件夹&#xff0c;用来存放反编译后的…

前端玩Word:Word文档解析成浏览器认识的HTML

前言 领导跟富文本编辑器杠上啦&#xff0c;领导有一个类似于某雀导入Word文档&#xff0c;解析内容后渲染到编辑器编辑的需求。某雀功能效果如下&#xff1a; 怎么搞定呢&#xff1f;自己写一个解析器&#xff1f; 本文分享Word文件转换成浏览器认识的HTML实战经验。 什么是…

springboot项目引入redis数据库的简单使用案例

springboot项目引入redis数据库的简单使用案例&#xff01;很多项目都需要使用到redis数据库&#xff0c;这是一个内存型的&#xff0c;非关系型数据库。它的读取速度非常快。因为存在了内存中。不是在硬盘中。而且它可以解决很多棘手的问题&#xff0c;比如&#xff1a;解决一…

Django的web框架Django Rest_Framework精讲(二)

文章目录 1.自定义校验功能&#xff08;1&#xff09;validators&#xff08;2&#xff09;局部钩子&#xff1a;单字段校验&#xff08;3&#xff09;全局钩子&#xff1a;多字段校验 2.raise_exception 参数3.context参数4.反序列化校验后保存&#xff0c;新增和更新数据&…

项目开发 多行编辑

问题 项目开发中&#xff0c;如何进行多行编辑 详细问题 笔者使用IDEA&#xff0c;Android Studio进行项目开发时&#xff0c;由于代码冗余&#xff0c;修改过程中若是逐一删除或编辑&#xff0c;效率相对低&#xff0c;如何进行多行删除或编辑 本文将提供IDEA&#xff0c;A…

152基于matlab的GUI滚动轴承特征频率计算

基于matlab的GUI滚动轴承特征频率计算&#xff0c;输入轴承参数&#xff0c;包括转速&#xff0c;节圆直径、滚子直径、滚子数、接触角&#xff0c;就可得滚动特征频率结果&#xff0c;程序已调通&#xff0c;可直接运行。 152 matlab 轴承特征频率 GUI (xiaohongshu.com)

MongoDB的分片集群(一) : 基础知识

转载说明&#xff1a;如果您喜欢这篇文章并打算转载它&#xff0c;请私信作者取得授权。感谢您喜爱本文&#xff0c;请文明转载&#xff0c;谢谢。 目录导读 1. 什么是MongoDB分片 2. MongoDB分片集群介绍 2.1 MongoDB分片集群架构 2.2 MongoDB分片集群优势 3. 分片键…

排序(5)——归并排序

六、归并排序 1.简介 归并排序也是一种很经典的排序算法&#xff0c;采用分治的思想方法进行数据的处理。归并讲究的是先拆后合&#xff0c;也就是分治中的分而治之。在拿到一组数据后&#xff0c;程序会将整个数据进行不断拆分直至有序&#xff0c;因为单个元素必然有序&…

【类和对象】4

日期类的拓展 c语言中的printf函数只能打印内置类型&#xff0c;为了弥补这一不足&#xff0c;c利用运算符重载可以打印自定义类型。 void operator<<(ostream&out);//声明在date.h中void Date::operator<<(ostream& out)//定义在date.cpp中 {out<<…

Flutter解析后台发来的jwt字段数据,了解jwt是否过期

前言 了解JWT是什么可以看这一篇博文 JWT&#xff08;JSON Web Token&#xff09;详解以及在go-zero中配置的方法-CSDN博客 流程 采用jwt_decoder库 添加至pubspec.yaml jwt_decoder: ^2.0.1 解析字段 查看是否过期 获取过期时间和token颁发的年龄

PythonStudio 控件使用常用方式(五)TListBox

PythonStudio是一个极强的开发Python的IDE工具&#xff0c;它使用的是Delphi的控件&#xff0c;常用的内容是与Delphi一致的。但是相关文档并一定完整。现在我试试能否逐步把它的控件常用用法写一点点&#xff0c;也作为PythonStudio的参考。 TListBox是列表框 常用属性 col…

【操作系统·考研】I/O管理概述

1.I/O设备 1.1 块设备 信息交换以数据块为单位&#xff0c;它属于有结构设备。 块设备传输速率较高&#xff0c;可寻址&#xff0c;且可对该设备随机地的读写。 栗子&#x1f330;&#xff1a;磁盘。 1.2 字符设备 信息交换以字符为单位&#xff0c;属于无结构类型。 字符…