高可用环境kafka消息未按顺序消费问题

目录

1、背景

2、问题排查

3、问题解决


1、背景

质检任务是异步执行,正常情况下任务状态扭转是    等待中》运行中》成功(失败)。在质量平台生成任务实例,此时状态是等待中,生成实例之后把具体的任务sql给到大数据平台执行,大数据平台会发运行中、成功、失败状态的kafka消息,正常情况下状态是顺序下发。

升级部署某个项目,生产环境突然出现很多任务,一直是运行中状态。

2、问题排查

(1)怀疑大数据平台,任务没有正常执行完成,所以任务一直是运行中

1、在yarn平台以及数据库中,都没有发现正在运行中的质量sql任务

2、排查质量平台服务器日志(kafka消息打印接收消息日志很必要,出问题利于排查),发现这个某个sqlId,正常返回了kafka消息,包括运行中、成功、失败等消息。

通过上面的排查,大数据平台没问题,正常执行了任务,正常按顺序给质量平台发了kafka消息

(2)排查质量平台处理kafka消息逻辑

kafka按顺序返回了状态,质量平台没按顺序消费,看质量平台代码如下。

    @Override
    @Async("zyslClThreadPool")
    public void procZyslJobData(StatusInfo statusInfo) {
        try {
            String zyslId = statusInfo.getTaskId();
            String sqlId = statusInfo.getSqlId();
          -dosomething();
            //运行中任务把检核状态更新成运行中
            if (StatusEnum.RUNNING.getCode().equals(statusInfo.getStatus().getCode())) {
                if (ZyslYxztEnum.WAITING_SUBMIT.getBm().equals(oldGxZysl.getYxzt())
               || ZyslYxztEnum.WAITING.getBm().equals(oldGxZysl.getYxzt())) {
                  //运行中
                oldGxZysl.setYxzt(ZyslYxztEnum.RUNNING.getBm());


                    gxZyslMapper.updateById(oldGxZysl);
                }
                //运行中状态
               oldGxZyrzUpdate.setYxzt(ZyslYxztEnum.RUNNING.getBm());
                gxZyrzMapper.updateById(oldGxZyrzUpdate);
                return;
            }
            if (StatusEnum.FAILED.getCode().equals(statusInfo.getStatus().getCode())) {
                log.error("大数据job错误,执行任务失败:{},zyslid:{},参数:{}", statusInfo.getMessage(), zyslId,
                        JSON.toJSONString(statusInfo));
                //处理失败
                procFail(statusInfo, oldGxZyrzUpdate);
            }
            if (StatusEnum.FINISH.getCode().equals(statusInfo.getStatus().getCode())) {
                //正常sql和异常sql都执行完成
                oldGxZyrzUpdate.setYxzt(ZyslYxztEnum.SUCCESS.getBm());
            }
            // 先更新状态,后处理事件
             dosomething2();
        } catch (Exception e) {
            log.error("大数据作业实例结果处理报错:{}", e.getMessage());
        }
    }
    
     public void  dosomething2(GxZyrz gxZyrz, SsZyxx zyxx) {
        if (Constants.CODE_SUCCESS.equals(gxZyrz.getYxzt())) {
              //成功状态查询es、发邮件等
             dosomething3();
        } else if (Constants.CODE_FAILED.equals(gxZyrz.getYxzt())) {
            gxZyrz.setYcs(0L);
            gxZyrz.setZyl(0L);
            gxZyrz.setSfgj("N");
        }
        gxZyrzMapper.updateById(gxZyrz);
    }

kafka消息被多线程异步处理了。

1、如果任务sql执行成功,kafka返回运行中、执行成功消息

线程1 - 处理待运行任务

线程2- 处理成功状态

通过代码分析,线程1更新任务之前会更新另外一张表状态,假如gxZyslMapper.updateById是2秒时间,   线程2更新状态之前会dosomething3()查询es、查询数据表、发邮件,肯定超过5秒,然后更新sql任务状态。

结论: 如果sql是执行成功,这种情况,应该不会出现线程2先把任务状态更新成成功,然后线程1把状态更新是运行中。

2、如果任务sql执行失败,kafka返回运行中、执行失败消息

线程1 - 处理待运行任务

线程2- 处理失败状态

通过代码分析,线程1更新任务之前会更新另外一张表状态,假如update是2秒时间,   线程2直接更新sql任务状态。

结论: 如果sql是失败成功,这种情况,如果运行中、运行失败状态消息时间建个在2秒内,应该会出现线程2先把任务状态更新成失败,然后线程1把状态更新是运行中。

代码分析之后,带着结论去现场验证,发现确实是失败状态任务状态被逆写了。

按这个结论,按理来讲部署的所有现场都会出现问题,为什么只有这个现场有问题呢?

大数据那边升级了代码,以前执行失败的任务,运行中和运行失败,他们发消息间隔至少耗时在5秒,改了逻辑之后直接失败的任务,发信息间隔在2秒内。这就验证了这个问题

3、问题解决

1、运行中状态,先更新sqlId对应的任务状态,然后更新别的数据表状态;2、更新运行中的状态不直接更新,带着状态更新    update zyrz set yxzt = '2' where id = 'xxx' and yxzt not in('0','1')

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/559292.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

C++ | Leetcode C++题解之第38题外观数列

题目&#xff1a; 题解&#xff1a; class Solution { public:string countAndSay(int n) {string prev "1";for (int i 2; i < n; i) {string curr "";int start 0;int pos 0;while (pos < prev.size()) {while (pos < prev.size() &&…

vue全屏后下拉框失效

如图&#xff0c;vue页面有个全屏功能 问题&#xff1a;全屏后下拉菜单消失 解决&#xff1a;加个这个 :teleported"false"如果不行试试这个 :popper-append-to-body"false"ok我话说完

nvidia-smi CUDA Version:N/A

问题 nvidia-smi显示&#xff1a;CUDA Version:N/A nvidia-smi -a显示&#xff1a;CUDA Version: Not Found 解决方法 查看Nvidia驱动版本 nvidia-smi如下图&#xff0c;版本为530.41.03 搜索cuda库 apt search libcuda注&#xff1a;不同的源&#xff0c;同一个库的命…

【大数据】bigtable,分布式数据库的鼻祖

目录 1.概述 2.数据模型 3.API 4.架构 5.一个完整的读写过程 6.如何查找到要的tablet 7.LSM树 1.概述 本文是作者阅读完bigtable论文后对bigtable进行的一个梳理&#xff0c;只涉及核心概念不涉及具体实操&#xff0c;具体实操会在后续的文章中推出。 GFS的出现虽然解…

海纳斯新装系统设置,安装删除卸载应用

文章目录 一、修改密码二、修改网卡地址三、修改主机名称四、挂载硬盘五、卸载应用省流版&#xff0c;直接执行以下脚本即可 六、安装网络流量可视化监控面板serverBee总结 一、修改密码 passwd root passwd ubuntu二、修改网卡地址 vi /etc/network/interfaces.d/eth0三、修…

HLS数据可以一起下载sentinel2源和Landsat89的数据吗?

可以的&#xff0c;地图资源工具可以同时下载同一时间段、同一范围的不同类别的数据&#xff0c;这对我们利用不同数据进行综合数据分析很有意义&#xff01;下面视频就是操作方法&#xff1a; 地图资源工具可以同时下载同一时间段、同一范围的不同类别的数据

人体行为识别/人体姿态估计AI算法模型介绍及场景应用

AI算法模型训练是指利用大量的数据以及特定的算法来训练出一个能够完成任务的计算模型。在进行AI算法模型训练时&#xff0c;通常需要经过以下几个步骤&#xff1a; 数据收集和预处理&#xff1a;首先需要收集用于训练的数据&#xff0c;然后对数据进行清洗、标注、归一化等处…

揭秘App广告变现,高效开发者必看攻略

在移动互联网高速发展的今天&#xff0c;应用市场竞争日益激烈。如何有效地进行app广告变现&#xff0c;是每个移动应用开发者都需要面对的挑战。以下是一些有效的广告变现策略。 选择合适的广告形式至关重要。插屏广告、横幅广告、视频广告等各有优劣&#xff0c;开发者需要根…

SQL注入作业

目录 一、万能密码和二阶注入测试 1.万能密码 2.二阶注入测试 二、联合查询注入测试 1.判断注入点 2.判断当前查询语句的列数 3.查询数据库基本信息 4.查询数据库中的数据 三、报错注入 1. 报错注入函数EXTRATVALUE 2.UPDATEXML 四、盲注测试 1.布尔盲注 判断数据…

16.4 冒泡排序

题目简介 排序动画模拟网站 phttps://www.cs.usfca.edugalles/visualization/ComparisonSort.htm 简洁版 #include <stdio.h> int main() {int a[10]{9,3,6,5,8,2,4,1,7,0};int n sizeof(a)/sizeof(int);int temp 0;for(int j0;j<n-1;j){ //外层循环循环9轮即可f…

Scala 第一篇 基础篇

Scala 第一篇 基础篇 一、变量与常量 1、变量2、常量 二、数据类型 1、数据基本类型概览2、元组的声明与使用3、Range介绍和使用4、Option 类型的使用和设计5、类型别名 三、运算符四、程序逻辑 1、一切都是表达式2、分支语句3、循环语句 五、集合 1、List2、Set3、Map4、Arra…

【大语言模型+Lora微调】10条对话微调Qwen-7B-Chat并进行推理 (聊天助手)

代码&#xff1a;https://github.com/QwenLM/Qwen/tree/main 国内源安装说明&#xff1a;https://modelscope.cn/models/qwen/Qwen-7B-Chat/summary 通义千问&#xff1a;https://tongyi.aliyun.com/qianwen 一、环境搭建 下载源码 git clone https://github.com/QwenLM/Qwen…

【python】如何通过python来发邮件,各种发邮件方式详细解析

✨✨ 欢迎大家来到景天科技苑✨✨ &#x1f388;&#x1f388; 养成好习惯&#xff0c;先赞后看哦~&#x1f388;&#x1f388; &#x1f3c6; 作者简介&#xff1a;景天科技苑 &#x1f3c6;《头衔》&#xff1a;大厂架构师&#xff0c;华为云开发者社区专家博主&#xff0c;…

27 管道

概念 管道式Unix中最古老的进程间通信的形式 把从一个进程连接到另一个进程的一个数据流称为一个“管道” 原理 task_struct中保存了一个files的结构体数组&#xff0c;里面存储了所有打开文件的编号&#xff0c;新打开一个文件&#xff0c;数据会写入到文件对应的 缓冲区中去…

程序,进程,进程管理的相关命令

程序 程序是执行特定任务的代码 1.是一组计算机能识别和执行的指令&#xff0c;运行于电子计算机上&#xff0c;满足人们某种需求的信息化工具 2.用于描述进程要完成的功能&#xff0c;是控制进程执行的指令集 进程的状态 为了对进程进行管理&#xff0c;操作系统首先定义…

上位机图像处理和嵌入式模块部署(树莓派4b实现xmlrpc通信)

【 声明&#xff1a;版权所有&#xff0c;欢迎转载&#xff0c;请勿用于商业用途。 联系信箱&#xff1a;feixiaoxing 163.com】 前面&#xff0c;我们也用纯API实现过上位机和开发板之间的通信。当时使用的方法&#xff0c;就是用windows自带的网络sdk和linux自带的api函数来完…

rc_visard 3D Stereo Senso

1 简介 rc_visard 3D立体视觉传感器 支持的接口标准 GenICam Generic Interface for CamerasGigE Gigabit Ethernet 词汇表 SGM semi-global matching 半全局匹配 SLAM Simultaneous Localization and Mapping 即时定位与地图构建 2 安全 3 硬件规格 坐标系 rc_visar…

linux信号相关概念

signal 信号引入什么是信号&#xff1f;如何产生信号&#xff1f;通过按键产生信号调用系统函数向进程发信号系统调用函数发送信号的流程: 由软件条件产生信号软件发送信号的流程&#xff1a; 硬件异常产生信号硬件异常的流程&#xff1a; Deliver、Pending、Block概念信号在内…

【Ne4j图数据库入门笔记1】图形数据建模初识

1.1 图形建模指南 图形数据建模是用户将任意域描述为节点的连接图以及与属性和标签关系的过程。Neo4j 图数据模型旨在以 Cypher 查询的形式回答问题&#xff0c;并通过组织图数据库的数据结构来解决业务和技术问题。 1.1.1 图形数据模型介绍 图形数据模型通常被称为对白板友…

明文scheme拉起此小程序

微信开发文档说明&#xff1a;https://developers.weixin.qq.com/miniprogram/dev/framework/open-ability/url-scheme.html 1、开发者无需调用平台接口&#xff0c;在MP平台->设置->隐私与安全->明文Scheme拉起此小程序声明后&#xff0c;可自行根据如下格式拼接app…
最新文章