RisingWave最佳实践-利用Dynamic filters 和 Temporal filters 实现监控告警

心得的体会

刚过了年刚开工,闲暇之余调研了分布式SQL流处理数据库–RisingWave,本人是Flink(包括FlinkSQL和Flink DataStream API)的资深用户,但接触到RisingWave令我眼前一亮,并且拿我们生产上的监控告警场景在RisingWave上做了验证,以下是自己的心得体会:

RisingWave架构简单,运维成本底,基于云原生(可以分别基于计算和存储动态伸缩),同时在开发上屏蔽了Flink等实时处理框架底层需要处理的一些技术细节(状态存储,数据一致性,分布式集群扩展等)。提供了与PostgreSQL兼容的标准SQL接口,用户可以像使用 PostgreSQL 一样处理数据流。并且RisingWave不单单可以处理流式数据,还提供了数据其他流式处理框架(如:Flink、storm)所不具备的数据存储能力,基本可以完全取代FlinkSQL。相对于其他OLAP系统(如:apache doris,starrocks),RisingWave采用同步实时,可以保证实时的新鲜度;强一致性,而不是最终一致性。用户需要做的仅仅是通过开发SQL就可以处理流数据,当然首先需要具备流式数据处理思维(相对于离线)。

RisingWave当然也有自身的不足,相对于Flink可以通过DataStream API自定义灵活的处理流式数据,RisingWave只能解决一些特定的流式场景,无法做太多定制开发;相对于Apache Doris等OLAP实时分析性数据库,RisingWave不适合做分析型随机查询。另外RisingWave是个新事物,正在发展阶段,周边生态和相关文档还不健全,作为尝鲜者可能会踩很多坑。然而令人欣慰的是RisingWave的社区回复还是很及时的,RisingWave官方投入了很多精力在做RisingWave的布道和答疑。

至于争论比较厉害的RisingWav VS FLink的性能和吞吐量上孰优孰劣,针对不同应用场景可能有不同表现,因此没有亲自调研就没有发言权。但我认为在不同的场景下他们应该有各自的优势。无论如何RisingWave部署简单,上手容易,试错成本低是一个不争的事实。RisingWave可以应用在一些数据看版,监控,实时指标等场景。

利用动态和时间过滤器实现监控告警

FlinkSQL解决不了定时触发的问题,FlinkSQL的流处理逻辑只是按event触发,不能按时间条件触发,也就是没有触发器机制。FlinkSQL窗口的定时触发,归根结底也是基于event触发,event驱动的机制。因此需要触发器的场景就需要用到Flink DataStream API的KeyedProcessFunction等算子。但RisingWave利用Dynamic filters 和 Temporal filters 可以间接实现类似场景的触发器机制。

场景描述

现有如下群消息实时指标监控场景:
数据有初始化(init)、查询(query)、回调(callback:succ+fail)三种先后顺序状态。
数据是按预设时间批次分组的,例如:2024-01-01 08:00:00、2024-01-01 08:30:00,实时统计每一个批次内三种不同状态的数据count。

监控指标一:在某一个批次延迟指定的时间(query_timeout)之内(例如:2024-01-01 08:00:00延迟1小时触发时间为系统时间2024-01-01 09:00:00),该批次的query状态数据count没有达到init状态的数量count阀值(即query_count<init_count*query_threshold)就触发告警。
同时结束该批次数据统计,下发该批次数据的指标包括:批次时间、init_count、query_count等

监控指标二:如果指标一告警没有被触发,该批次在满足query状态数据count达到init状态数量count的阀值(即query_count>=init_countquery_threshold)以后,在指定的延迟时间内(callback_timeout),该批次的callback状态数据count没有达到query状态的数量count阀值(即callback_count<query_countcallback_threshold)就触发告警。
同时结束该批次数据统计,下发该批次数据的指标包括:批次时间、init_count、query_count、callback_count等

群消息实时指标监控流程图如下:
群消息实时指标监控流程图

实例demo

RisingWave部署可以参考:RisingWave分布式SQL流处理数据库调研

假设:
query_threshold=1, callback_count=1
query_timeout= ‘5 minute’, callback_timeout= ‘1 minute’
0->init,1->query,2->callback

RisingWave SQL:

 
DROP TABLE t_msg;
CREATE TABLE t_msg(
     msg_id int
    ,status smallint 
    ,public_time timestamp
    ,process_time timestamp as proctime()
) APPEND ONLY;

set timezone = 'PRC';--PRC(People’s Republic of China)
show timezone;

select * from t_msg;

--统计不同状态的count
DROP MATERIALIZED VIEW mv_t_msg_groupby; 
CREATE MATERIALIZED VIEW mv_t_msg_groupby AS
SELECT 
 public_time
,sum(case when status = 0  then 1 else 0 end) AS init_count
,sum(case when status = 1  then 1 else 0 end) AS query_count
,sum(case when status = 2  then 1 else 0 end) AS callback_count
,max(process_time) as process_time
FROM t_msg 
group by public_time;

select * from mv_t_msg_groupby;


--sink_query_alarm 
DROP SINK sink_query_alarm;
CREATE SINK sink_query_alarm AS 
SELECT 
 public_time
,init_count
,query_count
,process_time
FROM mv_t_msg_groupby
where public_time + INTERVAL '5 minute' <= now()   --query_timeout=1 minute, public_time相当于当前时间延迟1分钟触发,【Delay table changes】
and   init_count*1 > query_count --query_threshold=1
WITH (
   connector='kafka',
   properties.bootstrap.server='192.168.1.100:8092',
   topic='t_sink_query_alarm'
)
FORMAT PLAIN ENCODE JSON(
   force_append_only='true'
);


--由于RisingWave不支持在【MATERIALIZED VIEW】和【SINK】等【可伸缩流】中指定处理时间字段,因此需要借助外部存储kafka周转
--RisngWave官方给的解释:support a proctime on an append only stream might be easier but on retractable stream could take extra cost. We must think it carefully to introduce such a feature.

--sink_query_succ 
DROP SINK sink_query_succ;
CREATE SINK sink_query_succ AS 
SELECT 
 public_time
,init_count
,query_count
,callback_count
,process_time as query_succ_process_time
FROM mv_t_msg_groupby
where public_time + INTERVAL '5 minute' >= now()   --query_timeout=1 minute, 在指定的时间内,【Delete and clean expired data】
and   init_count*1 <= query_count --query_threshold=1,query_count达到了指定值
WITH (
   connector='kafka',
   properties.bootstrap.server='192.168.1.100:8092',
   topic='t_sink_query_succ'
)
FORMAT PLAIN ENCODE JSON(
   force_append_only='true'
);

--source连接器
DROP SOURCE source_query_succ;
CREATE SOURCE IF NOT EXISTS source_query_succ (
     init_count int
    ,query_count int 
    ,callback_count int 
    ,public_time timestamp
    ,query_succ_process_time timestamp
)
WITH (
   connector='kafka',
   topic='t_sink_query_succ',
   properties.bootstrap.server='192.168.1.100:8092',
   scan.startup.mode='earliest', -- earliest ,latest,default:earliest 
) FORMAT PLAIN ENCODE JSON;

select * from source_query_succ;

--sink_callback_alarm,用到动态过滤器和时间过滤器
DROP SINK sink_callback_alarm;
CREATE SINK sink_callback_alarm AS 
WITH tmp AS ( 
select public_time, min(query_succ_process_time) as query_succ_process_time  -- 动态过滤器
FROM source_query_succ 
group by public_time
)
SELECT 
 b.public_time
,b.init_count
,b.query_count
,b.callback_count
,b.process_time
,a.query_succ_process_time
FROM  tmp a
JOIN  mv_t_msg_groupby b ON a.public_time=b.public_time
where a.query_succ_process_time + INTERVAL '1 minute' <= now()   --query_timeout=1 minute, public_time相当于当前时间延迟1分钟触发,【Delay table changes】
and   b.query_count*1 > b.callback_count --callback_threshold=1
WITH (
   connector='kafka',
   properties.bootstrap.server='192.168.1.100:8092',
   topic='t_sink_callback_alarm'
)
FORMAT PLAIN ENCODE JSON(
   force_append_only='true'
);

-- 模拟数据
--init
INSERT INTO t_msg values(1,0,'2024-02-23 15:55:00'::TIMESTAMP); --比当前系统时间早
INSERT INTO t_msg values(2,0,'2024-02-23 15:55:00'::TIMESTAMP);
INSERT INTO t_msg values(3,0,'2024-02-23 15:55:00'::TIMESTAMP);
--query                                
INSERT INTO t_msg values(1,1,'2024-02-23 15:55:00'::TIMESTAMP);
INSERT INTO t_msg values(2,1,'2024-02-23 15:55:00'::TIMESTAMP);
INSERT INTO t_msg values(3,1,'2024-02-23 15:55:00'::TIMESTAMP);
--callback                             
INSERT INTO t_msg values(1,2,'2024-02-23 15:55:00'::TIMESTAMP);
INSERT INTO t_msg values(2,2,'2024-02-23 15:55:00'::TIMESTAMP);
INSERT INTO t_msg values(3,2,'2024-02-23 15:55:00'::TIMESTAMP);

查看监控结果:

docker run -it --rm edenhill/kcat:1.7.1 kcat -b 192.168.1.100:8092 -t t_sink_query_alarm -C 
docker run -it --rm edenhill/kcat:1.7.1 kcat -b 192.168.1.100:8092 -t t_sink_callback_alarm -C 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/406451.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

面试经典150题 -- 二叉树搜索树 (总结)

总的链接 : https://leetcode.cn/studyplan/top-interview-150/ 二叉搜索树相关概念 : 二叉搜索树是一个有序树。 若它的左子树不空&#xff0c;则左子树上所有结点的值均小于它的根结点的值&#xff1b;若它的右子树不空&#xff0c;则右子树上所有结点的值均大于它的根结…

PyTorch概述(六)---View

Tensor.view(*shape)-->Tensor 返回一个新的张量同之前的张量具有相同的数据&#xff0c;但是具有不同的形状&#xff1b;返回的张量同之前的张量共享相同的数据&#xff0c;必须具有相同数目的元素&#xff0c;可能具有不同的形状&#xff1b;对于经过view操作的张量&…

【Java程序设计】【C00286】基于Springboot的生鲜交易系统(有论文)

基于Springboot的生鲜交易系统&#xff08;有论文&#xff09; 项目简介项目获取开发环境项目技术运行截图 项目简介 这是一个基于Springboot的生鲜交易系统 本系统分为系统功能模块、管理员功能模块、用户功能模块以及商家功能模块。 系统功能模块&#xff1a;在系统首页可以…

[HTML]Web前端开发技术28(HTML5、CSS3、JavaScript )JavaScript基础——喵喵画网页

希望你开心&#xff0c;希望你健康&#xff0c;希望你幸福&#xff0c;希望你点赞&#xff01; 最后的最后&#xff0c;关注喵&#xff0c;关注喵&#xff0c;关注喵&#xff0c;佬佬会看到更多有趣的博客哦&#xff01;&#xff01;&#xff01; 喵喵喵&#xff0c;你对我真的…

Linux-基础知识(黑马学习笔记)

硬件和软件 我们所熟知的计算机是由&#xff1a;硬件和软件组成。 硬件&#xff1a;计算机系统中电子&#xff0c;机械和光电元件等组成的各种物理装置的总称。 软件&#xff1a;是用户和计算机硬件之间的接口和桥梁&#xff0c;用户通过软件与计算机进行交流。 而操作系统…

【前端素材】推荐优质后台管理系统PORTAL平台模板(附源码)

一、需求分析 后台管理系统是一种具有多层次结构的软件系统&#xff0c;用于管理网站、应用程序或系统的后台操作和管理。下面是对后台管理系统的分层次、详细分析&#xff1a; 第一层&#xff1a;用户界面层 登录界面&#xff1a;提供用户登录验证&#xff0c;确保只有经过授…

村镇医院医疗中心污废水如何处理达标

污废水处理是村镇医院医疗中心运营中不可忽视的重要环节。如何有效处理污废水&#xff0c;使其达到相关标准&#xff0c;是保障医疗中心环境卫生的关键之一。 首先&#xff0c;村镇医院医疗中心应建立科学的废水处理系统。该系统应包括预处理、初级处理、中级处理和高级处理等环…

二十七、图像的均值模糊操作

项目功能实现&#xff1a;对一张图片进行均值模糊操作 按照之前的博文结构来&#xff0c;这里就不在赘述了 更多的图像模糊操作原理可参考博文&#xff1a;七、模糊操作&#xff0c;里面有详细原理讲解&#xff0c;只不过代码是python写的。 一、头文件 blurtest.h #pragma…

网络存储技术

第4章 存储文件系统 1.元数据 文件系统中的数据分为数据和元数据&#xff0c;数据是指普通文件中的实际数据&#xff0c;而元数据是描述数据属性的信息。 在Linux操作系统下&#xff0c;使用文件状态信息stat命令&#xff0c;可以显示文件的元数据如下。 [rootgitlab ~]# s…

英国客户亲临育菁,考察桌面级CNC机床

育菁桌面级CNC机床生产车间 2024年2月22日&#xff0c;英国某机床服务公司Alston和Gary一行拜访考察育菁&#xff0c;在育菁总经理Jimyang和总工程师Alan及海外营销中心总监Akuma的陪同下&#xff0c;参观了育菁桌面小型数控机床生产装配车间&#xff0c;并对育菁牌桌面型数控机…

《TCP/IP详解 卷一》第2章 Internet地址结构

目录 2.1 引言 2.2 表示IP地址 2.3 基本的IP地址结构 单播地址 全球单播地址&#xff1a; 组播地址 任播地址 2.4 CIDR和聚合 2.5 特殊用途地址 2.6 分配机构 2.7 单播地址分配 2.8 与IP地址相关的攻击 2.9 总结 2.1 引言 2.2 表示IP地址 IPv4地址&#xff1a;3…

【寸铁的刷题笔记】树、dfs、bfs、回溯、递归(二)

【寸铁的刷题笔记】树、dfs、bfs、回溯、递归(二) 大家好 我是寸铁&#x1f44a; 金三银四&#xff0c;树、dfs、bfs、回溯、递归是必考的知识点✨ 快跟着寸铁刷起来&#xff01;面试顺利上岸&#x1f44b; 喜欢的小伙伴可以点点关注 &#x1f49d; 上期回顾 感谢大家的支持&am…

Matlab/simulink基于vsg的风光储调频系统建模仿真(持续更新)

​ 1.Matlab/simulink基于vsg的风光储调频系统建模仿真&#xff08;持续更新&#xff09;

LeetCode 2583.二叉树中的第 K 大层和:层序遍历 + 排序

【LetMeFly】2583.二叉树中的第 K 大层和&#xff1a;层序遍历 排序 力扣题目链接&#xff1a;https://leetcode.cn/problems/kth-largest-sum-in-a-binary-tree/ 给你一棵二叉树的根节点 root 和一个正整数 k 。 树中的 层和 是指 同一层 上节点值的总和。 返回树中第 k …

sql注入 [极客大挑战 2019]FinalSQL1

打开题目 点击1到5号的结果 1号 2号 3号 4号 5号 这里直接令传入的id6 传入id1^1^1 逻辑符号|会被检测到&#xff0c;而&感觉成了注释符&#xff0c;&之后的内容都被替换掉了。 传入id1|1 直接盲注比较慢&#xff0c;还需要利用二分法来编写脚本 这里利用到大佬的脚…

学习使用在mysql中查询指定字段字符串包含多个字符串的方法

学习使用在mysql中查询指定字段字符串包含多个字符串的方法 使用LIKE关键字使用REGEXP关键字使用FIND_IN_SET函数使用INSTR函数和AND关键字 使用LIKE关键字 SELECT * FROM table_name WHERE column_name LIKE %string1% AND column_name LIKE %string2%;使用LIKE关键字&#x…

Python爬虫技术详解:从基础到高级应用,实战与应对反爬虫策略【第93篇—Python爬虫】

前言 随着互联网的快速发展&#xff0c;网络上的信息爆炸式增长&#xff0c;而爬虫技术成为了获取和处理大量数据的重要手段之一。在Python中&#xff0c;requests模块是一个强大而灵活的工具&#xff0c;用于发送HTTP请求&#xff0c;获取网页内容。本文将介绍requests模块的…

深入探究node搭建socket服务器

自从上篇中sokect实现了视频通话&#xff0c;但是是使用ws依赖库实现的服务端&#xff0c;所以最近再看ws源码&#xff0c;不看不知道&#xff0c;一看很惊讶。 接下来一点点记录一下&#xff0c;如何搭建一个简易的服务端socket&#xff0c;来实现上次的视频通讯。 搭建一个…

检索增强生成(RAG)——提示工程方法

在检索增强生成(RAG)过程中,提示工程也是一个不可忽略的部分。提示工程可以降低 RAG 应用出现的幻觉,提高 RAG 应用回答的质量。 下面简单介绍一些关于提示工程的论文。 欢迎关注公众号(NLP Research),及时查看最新内容 1. Thread of Thought(ThoT) 论文:Thread of …

LLM (Large language model)的指标参数

1. 背景介绍 我们训练大模型的时候&#xff0c;或者我们用RAG的时候&#xff0c;不知道我们的算法&#xff0c;或者我们的提示&#xff0c;或者我们的本地知识库是否已经整理得符合要求了。又或我们需要一个指标去评估我们目前的所有围绕大模型&#xff0c;向量数据库或外挂知…
最新文章