Flink问题解决及性能调优-【Flink不同并行度引起sink2es报错问题】

最近需求,仅想提高sink2es的qps,所以仅调节了sink2es的并行度,但在调节不同算子并行度时遇到一些问题,找出问题的根本原因解决问题,并分析整理。

实例代码

--SET table.exec.state.ttl=86400s; --24 hour,默认: 0 ms
SET table.exec.state.ttl=2592000s; --30 days,默认: 0 ms

CREATE TABLE kafka_table (
     mid bigint,
     db string,
     sch string,
     tab string,
     opt string,
     ts bigint,
     ddl string,
     err string,
     src map<string,string>,
     cur map<string,string>,
     cus map<string,string>,
     account_id AS IF(cur['account_id'] IS NOT NULL , cur['account_id'], src ['account_id']),
     publish_time AS IF(cur['publish_time'] IS NOT NULL , cur['publish_time'], src ['publish_time']),
     msg_status AS IF(cur['msg_status'] IS NOT NULL , cur['msg_status'], src ['msg_status']),
     send_type AS IF(cur['send_type'] IS NOT NULL , cur['send_type'], src ['send_type'])
     --event_time as cast(IF(cur['update_time'] IS NOT NULL , cur['update_time'], src ['update_time']) AS TIMESTAMP(3)), -- TIMESTAMP(3)/TIMESTAMP_LTZ(3)
     --WATERMARK FOR event_time AS event_time - INTERVAL '1' MINUTE     --SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 't1',
  'properties.bootstrap.servers' = 'xx.xx.xx.xx:9092',
  'properties.group.id' = 'g1',
  'scan.startup.mode' = 'earliest-offset',  --group-offsets/earliest-offset/latest-offset
   --  'properties.enable.auto.commit',= 'true' -- default:false, 如果为false,则在发生checkpoint时触发offset提交
  'format' = 'json'
);



CREATE TABLE es_sink(
     send_type      STRING
    ,account_id     STRING
    ,publish_time   STRING
    ,grouping_id       INTEGER
    ,init           INTEGER
    ,init_cancel    INTEGER
    ,push          INTEGER
    ,succ           INTEGER
    ,fail           INTEGER
    ,init_delete    INTEGER
    ,update_time    STRING
    ,PRIMARY KEY (group_id,send_type,account_id,publish_time) NOT ENFORCED
)
with (
    'connector' = 'elasticsearch-6',
    'index' = 'es_sink',
    'document-type' = 'es_sink',
    'hosts' = 'http://xxx:9200',
    'format' = 'json',
    'filter.null-value'='true',
    'sink.bulk-flush.max-actions' = '1000',
    'sink.bulk-flush.max-size' = '10mb'
);

CREATE view  tmp as
select
    send_type,
    account_id,
    publish_time,
    msg_status,
    case when UPPER(opt) = 'INSERT' and msg_status='0'  then 1 else 0 end AS init,
    case when UPPER(opt) = 'UPDATE' and send_type='1' and msg_status='4' then 1 else 0 end AS init_cancel,
    case when UPPER(opt) = 'UPDATE' and msg_status='3' then 1 else 0 end AS push,
    case when UPPER(opt) = 'UPDATE' and (msg_status='1' or msg_status='5') then 1 else 0 end AS succ,
    case when UPPER(opt) = 'UPDATE' and (msg_status='2' or msg_status='6') then 1 else 0 end AS fail,
    case when UPPER(opt) = 'DELETE' and send_type='1' and msg_status='0' then  1 else 0 end AS init_delete,
    event_time,
    opt,
    ts
FROM kafka_table
where (UPPER(opt) = 'INSERT' and msg_status='0' )
or        (UPPER(opt) = 'UPDATE' and msg_status in ('1','2','3','4','5','6'))
or        (UPPER(opt) = 'DELETE' and send_type='1' and msg_status='0');


--send_type=1          send_type=0
--初始化->0             初始化->0
--取消->4
--推送->3               推送->3
--成功->1               成功->5
--失败->2               失败->6

CREATE view  tmp_groupby as
select
 COALESCE(send_type,'N') AS send_type
,COALESCE(account_id,'N') AS account_id
,COALESCE(publish_time,'N') AS publish_time
,case when send_type is null and account_id is null and publish_time is null then 1
         when send_type is not null and account_id is null and publish_time is null then 2
         when send_type is not null and account_id is not null and publish_time is null then 3
         when send_type is not null and account_id is not null and publish_time is not null then 4
         end grouping_id
,sum(init) as init
,sum(init_cancel) as init_cancel
,sum(push) as push
,sum(succ) as succ
,sum(fail) as fail
,sum(init_delete) as init_delete
from tmp
--GROUP BY GROUPING SETS ((send_type,account_id,publish_time), (send_type,account_id),(send_type), ())
GROUP BY ROLLUP (send_type,account_id,publish_time); --等同于以上

INSERT INTO es_sink
select
     send_type
    ,account_id
    ,publish_time
    ,grouping_id
    ,init
    ,init_cancel
    ,push
    ,succ
    ,fail
    ,init_delete
    ,CAST(LOCALTIMESTAMP AS STRING) as update_time
from tmp_groupby

发现问题

由于groupby或join聚合等算子操作的并行度与sink2es算子操作的并行度不同,上游算子同一个key的数据可能会下发到下游多个不同算子中。
导致sink2es出现多个subtask同时操作同一个key(这里key作为主键id),报错如下:

...
	Caused by: [test1/cfPTBYhcRIaYTIh3oavCvg][[test1][2]] ElasticsearchException[Elasticsearch exception [type=version_conflict_engine_exception, reason=[test1][4_1_92_2024-01-15 16:30:00]: version conflict, required seqNo [963], primary term [1]. current document has seqNo [964] and primary term [1]]]
		at org.elasticsearch.ElasticsearchException.innerFromXContent(ElasticsearchException.java:510)
		at org.elasticsearch.ElasticsearchException.fromXContent(ElasticsearchException.java:421)
		at org.elasticsearch.action.bulk.BulkItemResponse.fromXContent(BulkItemResponse.java:135)
		at org.elasticsearch.action.bulk.BulkResponse.fromXContent(BulkResponse.java:198)
		at org.elasticsearch.client.RestHighLevelClient.parseEntity(RestHighLevelClient.java:653)
		at org.elasticsearch.client.RestHighLevelClient.lambda$performRequestAsyncAndParseEntity$3(RestHighLevelClient.java:549)
		at org.elasticsearch.client.RestHighLevelClient$1.onSuccess(RestHighLevelClient.java:580)
		at org.elasticsearch.client.RestClient$FailureTrackingResponseListener.onSuccess(RestClient.java:621)
		at org.elasticsearch.client.RestClient$1.completed(RestClient.java:375)
		at org.elasticsearch.client.RestClient$1.completed(RestClient.java:366)
		at org.apache.http.concurrent.BasicFuture.completed(BasicFuture.java:122)
		at org.apache.http.impl.nio.client.DefaultClientExchangeHandlerImpl.responseCompleted(DefaultClientExchangeHandlerImpl.java:177)
		at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.processResponse(HttpAsyncRequestExecutor.java:436)
		at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.inputReady(HttpAsyncRequestExecutor.java:326)
		at org.apache.http.impl.nio.DefaultNHttpClientConnection.consumeInput(DefaultNHttpClientConnection.java:265)
		at org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:81)
		at org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:39)
		at org.apache.http.impl.nio.reactor.AbstractIODispatch.inputReady(AbstractIODispatch.java:114)
		at org.apache.http.impl.nio.reactor.BaseIOReactor.readable(BaseIOReactor.java:162)
		at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvent(AbstractIOReactor.java:337)
		at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvents(AbstractIOReactor.java:315)
		at org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:276)
		at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
		at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:588)
		... 1 more
	[CIRCULAR REFERENCE:[test1/cfPTBYhcRIaYTIh3oavCvg][[test1][2]] ElasticsearchException[Elasticsearch exception [type=version_conflict_engine_exception, reason=[test1][4_1_92_2024-01-15 16:30:00]: version conflict, required seqNo [963], primary term [1]. current document has seqNo [964] and primary term [1]]]]

问题原因

Flink中存在八种分区策略,常用Operator Chain链接方式有三种分区器:

  • forward:上下游并行度相同,且不发生shuffle,直连的分区器
  • hash:将数据按照key的Hash值下发到下游的算子中
  • rebalance:数据会被循环或者随机下发到下游算子中,改变并行度若无keyby,默认使用RebalancePartitioner分区策略

rebalance分区器,可能会将上游算子的同一个key随机下发到下游不同算子中,因而引起报错,如下图:
在这里插入图片描述在这里插入图片描述模型如下:

在这里插入图片描述

解决方案

  • 分组聚合算子与sink2es算子配置成相同的并行度,即使用forward分区器,如下图:
    在这里插入图片描述在这里插入图片描述
    模型如下:
    ![在这里插入图片描述](https://img-blog.csdnimg.cn/direct/215b7ba208b142da803a78d85b0f474e.png

  • sink2es算子的并行度配置为1,如下图:

operator chain为forward分区器模型如下:
在这里插入图片描述

总结

归根结底就是需要保证:上游subtask中同一个key只能下发到下游一个subtask中

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/347956.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

对比损失函数

多看大佬的文章&#xff0c;总结的太好了&#xff01; 善于利用工具&#xff1a;researchrabbit 所以应该是contrastive metric learning

美,英,法,德、意大利和西班牙的geojson,以及区域json

美&#xff0c;英&#xff0c;法&#xff0c;德、意大利和西班牙的geojson文件 json地址 https://pan.baidu.com/s/1nio1bV_j-jAEVqgEHXWsNw?pwdqwer#list/path/GEOJSON 感谢大佬提供的 大佬连接 大佬的知乎原地址 国内geojson获取工具地址 http://da![在这里插入图片描述](h…

【大数据】流处理基础概念(二):时间语义(处理时间、事件时间、水位线)

流处理基础概念&#xff08;一&#xff09;&#xff1a;Dataflow 编程基础、并行流处理流处理基础概念&#xff08;二&#xff09;&#xff1a;时间语义&#xff08;处理时间、事件时间、水位线&#xff09;流处理基础概念&#xff08;三&#xff09;&#xff1a;状态和一致性模…

【并发】什么是 Future?

&#x1f34e;个人博客&#xff1a;个人主页 &#x1f3c6;个人专栏&#xff1a;JAVA ⛳️ 功不唐捐&#xff0c;玉汝于成 目录 前言 正文 关键特性和操作包括&#xff1a; 提交任务&#xff1a; 查询完成状态&#xff1a; 等待结果&#xff1a; 取消任务&#xff1a…

[linux] 域名解析失败案例定位 [Errno -2] Name or service not known

首先发现代码里这段运行报错&#xff1a; socket.gethostbyname_ex(host_name) Traceback (most recent call last): File “”, line 1, in socket.gaierror: [Errno -2] Name or service not known import socket host_name socket.gethostname() print(socket.gethostby…

搜索(3):双向BFS

对于一些问题我们使用普通BFS求解时&#xff0c;队列中最多会存在两层的搜索节点&#xff0c;搜索空间的上界取决于目标节点所在的搜索层次的深度所对应的宽度&#xff0c;一旦层数比较多&#xff0c;那可能会出现搜索爆炸的问题&#xff0c;双向BFS可以让我们不使用这么宽的搜…

Java Web(四)--JavaScript

介绍 JavaScript 教程 JavaScript 能改变 HTML 内容&#xff0c;能改变 HTML 属性&#xff0c;能改变 HTML 样式 (CSS)&#xff0c;能完成页面的数据验证&#xff1b; JS 需要运行浏览器来解析执行JavaScript 代码&#xff1b; JS 是 Netscape 网景公司的产品&#xf…

HTML 炫酷进度条

下面是代码 <!DOCTYPE html> <html><head><meta charset"UTF-8"><title>Light Loader - CodePen</title><style> html, body, div, span, applet, object, iframe, h1, h2, h3, h4, h5, h6, p, blockquote, pre, a, abbr…

1.25学习总结

今天学习了二叉树&#xff0c;了解了二叉树的创建和遍历的过程 今天所了解的遍历过程主要分为三种&#xff0c;前序中序和后序&#xff0c;都是DFS的想法 前序遍历&#xff1a;先输出在遍历左节点和右节点&#xff08;输出->左->右&#xff09; 中序遍历&#xff1a;先…

【mongoDB】下载与安装教程

目录 1. 下载 2.安装 3.启动 mangoDB是否启动成功 ? 1. 下载 官网地址&#xff1a;https://www.mongodb.com/try/download/community 2.安装 3.启动 在此输入命令操作数据库 mangoDB是否启动成功 ? 在浏览器访问&#xff1a;http://127.0.0.1:27017/ 出现该页面表示m…

注册表学习——注册表结构

简介&#xff1a;注册表是由很多项和值构成的。 HEKY_USERS&#xff08;HKU&#xff09; 主要保存默认用户及当前登录用户配置信息。 .DEFAULT 该项是针对未来创建的新用户所保存的默认配置项。 S-1-5-18等项 这些项叫作安全标识符&#xff08;SID&#xff09;用来表示Windows操…

HarmonyOS4.0系统性深入开发26方舟开发框架(ArkUI)概述

方舟开发框架&#xff08;ArkUI&#xff09;概述 方舟开发框架&#xff08;简称ArkUI&#xff09;为HarmonyOS应用的UI开发提供了完整的基础设施&#xff0c;包括简洁的UI语法、丰富的UI功能&#xff08;组件、布局、动画以及交互事件&#xff09;&#xff0c;以及实时界面预览…

计算机网络 第4章(网络层)

系列文章目录 计算机网络 第1章&#xff08;概述&#xff09; 计算机网络 第2章&#xff08;物理层&#xff09; 计算机网络 第3章&#xff08;数据链路层&#xff09; 计算机网络 第4章&#xff08;网络层&#xff09; 计算机网络 第5章&#xff08;运输层&#xff09; 计算机…

Offset Noise

如果尝试用stable diffusion生成特别暗或特别亮的图像&#xff0c;它几乎总是生成平均值相对接近 0.5 的图像。如下图所示&#xff0c;生成暗的图片总是带着明亮的区域&#xff08;暗的街道明亮的光&#xff09;&#xff0c;生成亮的图片总是带着暗的区域&#xff08;白的雪暗的…

list的介绍及其模拟实现

今天我们了解list&#xff0c;list在python中是列表的意思 &#xff0c;但是在C中它是一个带头双向循环链表&#xff1a; list的介绍 list是可以在常数范围内在任意位置进行插入和删除的序列式容器&#xff0c;并且该容器可以前后双向迭代。list的底层是双向链表结构&#xf…

Cuda笔记1

1、培训001 1 1…100&#xff0c;CPU是串行执行&#xff0c;GPU是分成几部分同时计算&#xff0c;如123,456… 2、培训002 一来一回 每种定义有对应的调用位置&#xff0c;和执行位置&#xff0c;不对会报错。 下图是用NVPROF时间分析 下图是资源分析 1&#xff09; CUDA…

W3School离线手册(2017.03.11版)

点击下载 W3School离线手册(2017.03.11版)

企业软件项目成果-图像识别

下面图像识别仅仅使用了OpenCV库而已&#xff0c;并没有涉及深度学习、机器学习。 整盘样本的拍照识别结果&#xff08;识别准确率达100%&#xff09;&#xff1a; 宫颈刷图像识别的测试结果&#xff08;识别准确率达100%&#xff09;&#xff1a;

springboot druid数据库配置密码加密

1.使用的druid版本 <!-- 阿里数据库连接池 --> <dependency><groupId>com.alibaba</groupId><artifactId>druid-spring-boot-3-starter</artifactId><version>1.2.21</version> </dependency> 2.配置文件 # Spring配置 …

Linux文件管理技术实践

shell shell的种类(了解) shell是用于和Linux内核进行交互的一个程序&#xff0c;他的功能和window系统下的cmd是一样的。而且shell的种类也有很多常见的有c shell、bash shell、Korn shell等等。而本文就是使用Linux最常见的bash shell对Linux常见指令展开探讨。 内置shell…