ClickHouse 如何实现数据一致性

文章目录

    • ReplacingMegreTree 引擎
    • 数据一致性实现方式
      • 1.ReplacingMegreTree 引擎
      • 2.ReplacingMegreTree 引擎 + 手动合并
      • 3.ReplacingMegreTree 引擎 + FINAL 查询
      • 4.ReplacingMegreTree 引擎 + 标记 + GroupBy
      • 5.允许偏差

前言:在大数据中,基本上所有组件都要求做到数据的一致性,因为大多数环境都是分布式的情况,如果数据无法做到一致,最终在进行分析计算时,导致指标出现问题,影响业务。

本篇文章将探讨在 ClickHouse 中实现数据一致性的几种方式。

ReplacingMegreTree 引擎

在 ClickHouse 中,推荐使用 ReplacingMegreTree 作为确保数据一致性的引擎。

与传统的 MergeTree 引擎相比,ReplacingMergeTree 允许更新已有的数据,而不是简单地追加新数据。当有新数据插入到表中时,它会删除排序键值相同的重复项,替换旧数据。

当有新数据替换旧数据时,旧数据并不会被立即删除,而是被标记为过时的版本。它定期会触发数据块合并操作,此时才会将历史数据进行删除,只保留新版本数据,优化性能、减少存储空间,做到最终一致性。

只有在触发合并操作后,数据才会发生真正的替换操作,在这之前,你查询到的都是历史版本数据。

数据一致性实现方式

1.ReplacingMegreTree 引擎

如果我们仅仅依赖 ReplacingMegreTree 引擎是无法做到数据一致性的,虽然它可以做到最终一致性,但是在未进行数据合并前,它可能存在重复数据。

而且官方说明,合并时间是无法预测的,也就是说我们并不知道它具体什么时候会发生合并操作。

那么为什么数据的合并时间无法预测呢?

  • 数据量和数据分布不确定性:合并操作的时间受到数据量和数据分布的影响。如果数据量较大,合并操作可能需要花费更长的时间。此外,数据的分布情况也会影响合并操作的时间,例如数据块的大小和数量、数据块之间的差异等。

  • 系统负载和资源竞争:合并操作需要消耗系统资源,包括 CPU、内存、磁盘等。当系统处于高负载状态时,合并操作可能需要等待资源空闲才能执行,这会导致合并操作的时间不确定。

2.ReplacingMegreTree 引擎 + 手动合并

虽然我们无法预知 ReplacingMegreTree 合并的具体时间,但是我们可以提前触发手动合并。

在新数据写入后,通过如下语句,主动执行合并:

OPTIMIZE TABLE table_name FINAL;

# 完成语法
OPTIMIZE TABLE [db.]name [ON CLUSTER cluster] [PARTITION partition | PARTITION ID 'partition_id'] [FINAL] [DEDUPLICATE [BY expression]]

但是手动合并会付出很大的代价,官方建议不要使用 OPTIMIZE TABLE ... FINAL,因为它用于管理(测试),而不是日常操作。

原因是,使用该查询时,它会尝试将指定表计划外的数据部分合并到一个原来的数据部分中。在此过程中,ClickHouse 读取所有数据部分,解压缩、合并、压缩为单个部分,然后重写回对象存储,造成巨大的 CPU 和 IO 消耗。这一优化会重写单个部分,即使它们已经合并为单个部分,所以代价很大,日常避免使用。

3.ReplacingMegreTree 引擎 + FINAL 查询

在进行查询时,添加 FINAL 后缀,如下所示:

SELECT * FROM test FINAL;

当指定 FINAL 时,ClickHouse 会在返回结果之前完全合并数据。但它限制引擎,只适用于从 ReplacingMergeTree、SummingMergeTree、AggregatingMergeTree、CollapsingMergeTree 和 VersionedCollapsingMergeTree 表中选择数据。

FINAL 查询在 v20.5.2.7 之前是单线程操作,十分缓慢,在这之后的版本都是并行执行的,默认采用 16 线程数,如果机器不能满足这么多线程,则默认使用当前机器最大线程数。我们也可以指定线程数运行该语句:

SELECT * FROM test FINAL settings max_threads = n;

当然,它会比日常的查询慢很多,主要原因有:

  • 数据是在查询执行期间才进行合并的。

  • 除了查询中指定的列之外,还会读取主键列的数据。

  • 需要额外的计算和内存资源,因为只有在查询时才会进行合并,所有操作都是在内存中进行的。你虽然可以在查询中使用 FINAL 获得最终所需的结果,但是要注意资源的消耗。

4.ReplacingMegreTree 引擎 + 标记 + GroupBy

在创建表时,我们可以增加一列作为标记,记录该值是否被删除或更新。通过两个字段来完成这一操作:

  • 标记列:使用特殊值标识该行数据是否被删除,例如:0 表示存在,1 表示过期。

  • 时间戳:标记列只能确保数据是否被删除,并不能标识是否发生过更新操作。所以我们可以借助数据中本身存在的时间戳来选择最新的数据,从而避免重复数据,做到数据一致性。

在日常开发中推荐使用这种方式来保证数据的一致性。

实现案例

假设有一个表存储用户的信息,包括用户ID、用户名、邮箱和标记列表示是否被删除。

首先,创建表

CREATE TABLE users (
    id UInt32,
    name String,
    email String,
    is_deleted UInt8,
    event_time DateTime
) ENGINE = ReplacingMergeTree(event_time)
ORDER BY (id, event_time);

在这个表中,我使用了 ReplacingMergeTree 引擎,并指定了 event_time 字段作为排序键,以确保数据的时间顺序。is_deleted 字段表示该行数据是否被删除,0 表示存在,1 表示被删除。

接下来,插入一些测试数据:

INSERT INTO users VALUES
    (1, 'Alice', 'alice@example.com', 0, '2024-04-25 08:00:00'),
    (2, 'Bob', 'bob@example.com', 0, '2024-04-25 09:00:00'),
    (3, 'Charlie', 'charlie@example.com', 0, '2024-04-25 10:00:00');

进行查询:

SELECT * FROM users;

在这里插入图片描述

现在,来模拟更新和删除操作的增量写入,假设用户 Bob 的邮箱地址被更新,用户 Charlie 被删除:

-- 更新 Bob 的邮箱地址
INSERT INTO users VALUES
    (2, 'Bob', 'new_bob@example.com', 0, '2024-04-25 11:00:00');

-- 删除 Charlie
INSERT INTO users VALUES
    (3, 'Charlie', '', 1, '2024-04-25 12:00:00');

增量写入后,表中的数据如下所示:

在这里插入图片描述
可以看到,这种情况就出现了重复数据。

但是,我们现在可以使用标记以及 Group By 语句查询每个用户的最新信息,手动过滤失效信息:

SELECT 
	id,
	argMax(name,event_time) name,
	argMax(email,event_time) email,
	argMax(is_deleted,event_time) is_deleted,
    max(event_time) max_event_time
FROM 
	users
GROUP BY 
	id
HAVING
	is_deleted = 0;

按照用户 ID 进行分组,按 event_time 字段当前的最大值,取对应行所有字段的数据。

这里需要注意的是,最后取 event_time 最大值时,重命名字段必须与之前不同,否则会报错。

例如 max(event_time) event_time 这种写法是错误的,因为该列已经被其它聚合函数 argMax 引用了。

在 ClickHouse 中,可以使用 argMax 函数来获取满足指定条件的某个字段的最大值所在行对应的另一个字段的值。argMax 函数通常与 GROUP BY 结合使用,以便在每个分组中找到满足条件的最大值对应的其他字段的值。

运行结果如下:

在这里插入图片描述

可以看到,我们成功的过滤掉了失效数据(用户 Bob 的邮箱地址被更新,用户 Charlie 被删除)

5.允许偏差

当我们在对某个指标进行计算时,并不关心该指标最终特别准确的值,或者说允许偏差一点,重复的数据量并不大,不会对总体造成影响

那么这种情况我们就不需要去确保该份数据的一致性,只需要确保最终一致性即可,此时选用 ReplacingMegreTree 引擎作为数据去重方案即可,不用大费周章的去花时间设计。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/574590.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

硬件玩物 | 性价比超高的NAS,威联通【TS-464-C2】快速上手初体验!

[ 知识是人生的灯塔,只有不断学习,才能照亮前行的道路。] 大家好,我是【WeiyiGeek/唯一极客】一个正在向全栈工程师(SecDevOps)前进的技术爱好者 作者微信:WeiyiGeeker 公众号/知识星球:全栈工程师修炼指南 主页博…

1、k8s问题pod从service中剔除

一、起因 redis原来由两服务器的集群变为三服务器的集群,通过statefulset扩展了两节点,并把redis-app-0和redis-app-3从集群中去除,但是由于service路由后端不变,导致程序连接后端仍然可能到redis-app-0和redis-app-3 二、处理 …

【Unity 实用工具篇】 | UIEffect 实现一系列UGUI特效,描边及阴影特效

前言 【Unity 实用工具篇】 | UIEffect 实现一系列UGUI特效,描边及阴影特效一、UGUI特效插件:UIEffect1.1 介绍 二、组件属性面板三、代码操作组件 总结 前言 在Unity中 UGUI 的使用几乎是必不可少的,虽然也有NGUI、FGUI等可以使用&#xff…

Spring Kafka——基于 Spring Kafka 实现动态管理 Kafka 连接和 topic 的监听

文章目录 使用 Spring Kafka 动态管理 Kafka 连接和主题监听1. 前言2. 简单的消费程序配置3. Spring Kafka 主要的相关类的说明4. KafkaListener 注解的加载执行流程解析5. 动态监听消费订阅的设计与实现 使用 Spring Kafka 动态管理 Kafka 连接和主题监听 文章内容较长&#x…

Error opening file a bytes-like object is required,not ‘NoneType‘

错误显示,打开的是一个无效路径的文件 查看json文件内容,索引的路径与json文件保存的路径不同 方法:使用python脚本统一修改json文件路径 import json import os import argparse import cv2 from tqdm import tqdm import numpy as np impo…

组合优于继承:什么情况下可以使用继承?

C设计模式专栏:http://t.csdnimg.cn/8Ulj3 目录 1.引言 2.为什么不推荐使用继承 3.相比继承,组合有哪些优势 4.如何决定是使用组合还是使用继承 1.引言 面向对象编程中有一条经典的设计原则:组合优于继承,也常被描述为多用组合&#xff0…

JavaScript原理篇——深入理解作用域、作用域链、闭包、this指向

执行上下文描述了代码执行时的环境,包括变量对象、作用域链和 this 值;而作用域则决定了变量和函数的可访问性范围,分为全局作用域和局部作用域。 变量对象用于存储变量和函数声明:是与执行上下文相关联的数据结构,用于…

USB设备的音频类UAC

一、UAC简介 UAC(USB Audio Class)是USB设备的音频类,它定义了USB音频设备与主机计算机通信的方式。UAC标准是USB规范的一部分,并受到各种操作系统(包括Windows、macOS和Linux)的支持。 UAC是基于libusb,实…

代码随想录算法训练营第五十一天| 309.最佳买卖股票时机含冷冻期,714.买卖股票的最佳时机含手续费,总结

题目与题解 参考资料:买卖股票总结 309.最佳买卖股票时机含冷冻期 题目链接:309.最佳买卖股票时机含冷冻期 代码随想录题解:309.最佳买卖股票时机含冷冻期 视频讲解:动态规划来决定最佳时机,这次有冷冻期!|…

【自然语言处理】InstructGPT、GPT-4 概述

InstructGPT官方论文地址:https://arxiv.org/pdf/2203.02155.pdf GPT-4 Technical Report:https://arxiv.org/pdf/2303.08774.pdf GPT-4:GPT-4 目录 1 InstructGPT 2 GPT-4 1 InstructGPT 在了解ChatGPT之前,我们先看看Instr…

k8s pod 无法启动一直ContainerCreating

情况如下,更新 pod 时,一直在ContainerCreating 查看详细信息如下 Failed to create pod sandbox: rpc error: code Unknown desc [failed to set up sandbox container “334d991a478b9640c66c67b46305122d7f0eefc98b2b4e671301f1981d9b9bc6” networ…

Jsoncpp搭建交叉编译环境(移植到arm)

1. 官网下载源码 github地址:GitHub - open-source-parsers/jsoncpp at update 2. 交叉编译环境 当前平台/开发平台-编译环境: [rootlocalroot ~]# cat /etc/redhat-release CentOS Linux release 7.9.2009 (Core) [rootlocalroot ~]# uname -a Lin…

Django框架之Django安装与使用

一、Django框架下载 首先我们需要先确定好自己电脑上的python解释器环境,否则会导致后面项目所需要的库安装不了以及项目无法运行的问题。 要下载Django并开始使用它,你可以按照以下步骤进行: 1、安装Python 首先,确保你的计算…

C/C++开发,opencv-ml库学习,支持向量机(SVM)应用

目录 一、OpenCV支持向量机(SVM)模块 1.1 openCV的机器学习库 1.2 SVM(支持向量机)模块 1.3 支持向量机(SVM)应用步骤 二、支持向量机(SVM)应用示例 2.1 训练及验证数据获取 2…

报错:OpenGL.error.NullFunctionError: Attempt to call an undefined function”

文件我已经上传 CSDN默认就是收费的 我修改不了 免费链接在文中 请寻找 OpenGL.error.NullFunctionError: Attempt to call an undefined function” 环境陈述: windows11 AMD-R9 python版本3.9.9 背景: 通过pip安装pip install PyOpenGL安装PyOpenGL模块后 运行出现的问题…

NLP Step by Step -- How to use pipeline

正如我们在摸鱼有一手:NLP step by step -- 了解Transformer中看到的那样,Transformers模型通常非常大。对于数以百万计到数千万计数十亿的参数,训练和部署这些模型是一项复杂的任务。此外,由于几乎每天都在发布新模型&#xff0c…

数据挖掘实验一

一、实验环境及背景 使用软件: Anaconda3 Jupyter Notebook 实验内容: 1.使用Tushare或者其他手段获取任意两支股票近三个月的交易数据。做出收盘价的变动图像。2.使用Pandas_datareader获取世界银行数据库中美国(USA)、瑞典&…

Windows电脑中护眼(夜间)模式的开启异常

我的电脑是联想小新16pro,Windows11版本。之前一直可以正常使用夜间模式,但是经过一次电脑的版本更新之后,我重启电脑发现我的夜间模式不能使用了。明明显示开启状态,但是却不能使用,电脑还是无法显示夜间模式。 询问…

基于Spring Boot的考研资讯平台设计与实现

基于Spring Boot的考研资讯平台设计与实现 开发语言:Java框架:springbootJDK版本:JDK1.8数据库工具:Navicat11开发软件:eclipse/myeclipse/idea 系统部分展示 系统功能界面图,在系统首页可以查看首页、考…

【Qt QML】TabBar的用法

Qt Quick中的TabBar提供了一个基于选项卡的导航模型。TabBar由TabButton控件填充,并且可以与任何提供currentIndex属性的布局或容器控件一起使用,例如StackLayout或SwipeView。 import QtQuick import QtQuick.Controls import QtQuick.LayoutsWindow …
最新文章