开发实战角度:distinct实现原理及具体优化总结

1.背景

Distinct是一种常用的操作,在所有数据库的SQl语言中都是一个非常重要的操作,在Hive中,Distinct去重原理是通过MapReduce来实现的,Distinct操作可以应用于单个列,亦可以应用于多个列。基本原理是将输入的数据集按照指定的列进行分组,在每个分组内部去除重复的值,最后将每个分组的唯一值合并成一个结果集。

最近一位好学的小伙伴在学习的过程中,学习到count distinct 的这块内容的时候,基于单个count  distinct 的原理,可以理解的不错,但是对于多字段的count distinct的时候讲的就很难让人理解,今天就来给大家总结一下。

2.Group By

group by的操作适合我们的聚合时distinct息息相关的,所以在我们总结distinct 之前 ,我们不得不先来看一下group by 操作的具体实现原理。

  1. map阶段,将group by后的字段组合作为一个key,如果group by单个字段,那么key就一个。将group by之后要进行的聚合操作字段作为值,如果要进行count,则value是赋1;如要sum另一个字段,那么value就是该字段。

  2. shuffle阶段,按照key的不同分发到不同的reducer。注意此时可能因为key分布不均匀而出现数据倾斜的问题。这个问题是我们处理数据倾斜比较常规的查找原因的方法之一,也是我们解决数据倾斜的处理阶段。

  3. reduce阶段,如果是count将相同key的值累加,如果是其他操作,按需要的聚合操作,得到结果。

实例如下图,对应语句是:

with tmp1 as (
  select
    'a' as pro,
    '1' as city
  union all
  select
    'a',
    '1'
  union all
  select
    'a',
    '1'
  union all
  select
    'b',
    '0'
)
select
  pro,
  city,
  count(*)
from
  tmp1
group by
  pro,
  city

在这个阶段如果出现数据倾斜,我们也可以进行相应的处理,常见的就是把key单独拉出来计算,也可以替换随机数,当然除去替换key为随机数、提前挑出大数量级key值等通用调优方法,适用于group by的特殊方法有以下几种:

  1. set hive.map.aggr=true,就是开启map端的combiner,减少传到reducer的数量级,同时需要设置参数hive.groupby.mapaggr.checkinterval, 规定在 map 端进行聚合操作的条目数目。

  2. 设置mapred.reduce.tasks为较大数量,用来降低每个reducer处理的数据量。

  3. set hive.groupby.skewindata=true,该参数也是比较常规的设置,该参数可自动进行负载均衡。生成的查询计划会有两个 MR Job。第一个 MR Job 中,Map 的输出结果集合会随机分布到Reduce 中,每个 Reduce 做部分聚合操作,并输出结果,这样处理的结果是相同的 Group By Key有可能被分发到不同的 Reduce 中,从而达到负载均衡的目的;第二个 MR Job 再根据预处理的数据结果按照 Group ByKey 分布到 Reduce 中(这个过程可以保证相同的 Group By Key 被分布到同一个 Reduce中,最后完成最终的聚合操作。

3.Distinct 单字段

原理

上面我们可以清晰的明白group  by的流程,那么接下来我们再来回顾一下distinct 的具体实现,当执行Distinct操作时,Hive会将操作转化为一个MapReduce作业,并按照指定的列进行分组。在Map阶段,每个Mapper会读取输入数据,并将指定的列作为输出的key,然后,通过Shuffle过程将具有相同key的数据发送到同一个Reducer中。

我们可以看出,由于使用了distinct,这就导致在map端的combine无法合并重复数据,所以必须将id作为Key输出,在Reduce阶段再对来自于不同Map Task、相同Key的结果进行消重,计入最终统计值。

对于这种count(distinct)全聚合操作的时候,即使我们设定了reduce task的具体个数,例如set mapred.reduce.tasks=100;hive最终也只会启动一个reducer。这就造成了所有map端传来的数据都在一个tasks中执行,这唯一的Reduce Task需要Shuffle大量的数据,并且进行排序聚合等处理,这使得这个操作成为整个作业的IO和运算瓶颈。

当distinct一个字段时,这里会将group by的字段和distinct的字段组合在一起作为map输出的key,value设置为1,同时将group by的字段定为分区键,这一步非常重要,这样就可以将GroupBy字段作为reduce的key,在reduce阶段,利用mapreduce的排序,输入天然就是按照组合key排好序的。根据分区键将记录分发到reduce端后,按顺序取出组合键中的distinct字段,这时distinct字段也是排好序的。依次遍历distinct字段,每找到一个不同值,计数器就自增1,即可得到count distinct结果。例如下面的SQL语句,过程可以下图示意。

with tmp1 as (
  select
    'a' as pro,
    '1' as city,
    '张三' as userid
  union all
  select
    'a',
    '1',
    '张三'
  union all
  select
    'a',
    '1',
    '张三'
  union all
  select
    'b',
    '0',
    '张三'
)
select
  pro,
  count(distinct userid)
from
  tmp1
group by
  pro

优化

对于单distinct 的优化,我们的课程也提到过很多次,利用Hive对嵌套语句的支持,将原来一个MapReduce作业转换为两个作业,在第一阶段选出全部的非重复的字段id,在第二阶段再对这些已消重的id进行计数;这样在第一阶段我们可以通过增大Reduce的并发数,并发处理Map输出。在第二阶段,由于id已经消重,因此COUNT(*)操作在Map阶段不需要输出原id数据,只输出一个合并后的计数即可。这样即使第二阶段Hive强制指定一个Reduce Task的时候,极少量的Map输出数据也不会使单一的Reduce Task成为瓶颈。

其实在实际运行时,Hive还对这两阶段的作业做了额外的优化。它将第二个MapReduce作业Map中的Count过程移到了第一个作业的Reduce阶段。这样在第一阶Reduce就可以输出计数值,而不是消重的全部id。这一优化大幅地减少了第一个作业的Reduce输出IO以及第二个作业Map的输入数据量。最终在同样的运行环境下优化后的语句可以说是大大提升了执行效率。sql优化模板如下:

SELECT
  COUNT(*)
FROM
  (
    SELECT
      DISTINCT id
    FROM
      TABLE_NAME
    WHERE
      …
  ) t;

4.Distinct 多字段(mult-distinct)

原理

对于mult-distinct,如果我们仍然按照上面一个distinct字段的方法,即下图这种实现方式,无法根据uid和date分别排序,也就无法通过LastKey去重,仍然需要在reduce阶段在内存中通过Hash去重。

with tmp1 as (
  select
    'a' as pro,
    '1' as city,
    '张三' as userid
  union all
  select
    'a',
    '1',
    '张三'
  union all
  select
    'a',
    '1',
    '张三'
  union all
  select
    'b',
    '0',
    '张三'
)
select
  pro,
  count(distinct userid),
  count(distinct city)
from
  tmp1
group by
  pro

所以hive会使用另外一种处理方式,如果查询中有多个 distinct-expression,同一条record,会生成多条记录进行Shuffle,增大Shuffle量;假设有N个distinct,处置数据有M条,那么这布处置后的输出是N*M条数据(这里产生数据膨胀,也是数据倾斜产生的阶段),所以尽量在MAP端过滤尽可能多的数据生。处理方法是对所有的distinct字段编号,那么相同字段就会分别排序,这时只需要在reduce阶段记录LastKey即可去重。这种实现方式很好的利用了MapReduce的排序,节省了reduce阶段去重的内存消耗,但是缺点是增加了shuffle的数据量。需要注意的是,在生成reduce value时,除第一个distinct字段所在行需要保留value值,其余distinct数据行value字段均可为空。

当然我们有读源码的能力的话,而也可以从hive源码里具体代码可参考ReduceSinkOperator.process方法,代码片段如下图所示。

优化

select
  sum(case when d.pv_flag = 1 then 1 else 0 end) as pv,
  count(distinct id) as uv,
  count(distinct ip) as ip,
  sum(distinct otime),
  count(distinct cookie)
from
  access_dap d
where
  log_date = '$YESTERDAY'

对于这种多重distinct的操作,我们也会经常遇见,就比如上面的的sql,这个sql我们可以按照三步走的步骤进行优化。

  • 1.预处理,去重汇总,将能去重的重复的数据先去重,并且尽量过滤数据,收敛数据。

  • 2.以空间换时间,我们的磁盘空间是非常容易获取到,我们可以用union all的把数据根据distinct的字段扩充起来,假如有4个distinct,相当于数据扩充4倍,用rownumber=1来达到间接去重的目的,这里的union all只走一个job。

  • 3.嵌套查询,得到最终结果,这里没有一个distinct,全部走的是普通sum,可以在mapper端提前聚合,同样可以调节reduce数,这个会快很多。

----1
create table tmp1 as
select
  id,
  ip,
  cookie,
  idis_zero,
  sum(case when pv_flag = 1 then 1 else 0 end) as pv,
  sum(otime) as onlinetime
from
  login_log
group by
  id,
  ip,
  cookie,
  idis_zero;

---2
  create table tmp2 as
select
  type,
  type_value,
  rownumber(type, type_value) as rn,
  pv,
  onlinetime
from
  (
    select
      type,
      type_value,
      pv,
      onlinetime
    from
      (
        select
          'id' as type,
          cast(id as string) as type_value,
          pv,
          onlinetime
        from
          tmp1
        where
          idis_zero = 0
        union all
        select
          'ip' as type,
          ip as type_value,
          pv,
          onlinetime
        from
          tmp1
        union all
        select
          'cookie' as type,
          case when cookie = 'null' then 'acorn_cookie' else cookie end as type_value,
          pv,
          onlinetime
        from
          tmp1
      ) t1 cluster by type,
      type_value
  ) t2;
-----------------------------------------------------------------------------------------------------
select
  sum(
    case when type = 'ip' then pv else cast(0 as bigint) end
  ) as pv,
  sum(
    case when type = 'id'
    and rn = 1 then 1 else 0 end
  ) as uv,
  sum(
    case when type = 'ip'
    and rn = 1 then 1 else 0 end
  ) as ip,
  sum(
    case when type = 'ip' then onlinetime else cast('0' as bigint) end
  ) as onlinetime,
  sum(
    case when type = 'cookie'
    and rn = 1 then 1 else 0 end
  ) as cookie,
  '$STA_TYPE',
  '$STA_TYPE'
from
  tmp2

开发实战角度:distinct实现原理及具体优化总结

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/343393.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

基于SpringBoot的教务管理系统设计与实现(源码+调试)

项目描述 临近学期结束,还是毕业设计,你还在做java程序网络编程,期末作业,老师的作业要求觉得大了吗?不知道毕业设计该怎么办?网页功能的数量是否太多?没有合适的类型或系统?等等。今天给大家介绍一篇基于SpringBoot的教务管…

StableDiffusion新版汉化

新旧版不同,这里以新版为例,用的是带链接,可以更新的方法。 步骤: 1.找到这个位置,依次点击,注意选项。 2.点击加载,等待刷新。 ctrlF搜索 zh_CN Localization 右边点击install&#xff0c…

[Linux 杂货铺] —— 权限(文件权限和目录配置)

目录 🌈前言 📁 文件的属性 📁 权限的概念 📂拥有者和所属组(角色): 📂用户(具体的人): 📁 权限的管理 📂1. chmod…

Object.defineProperty、Proxy、Reflect-个人总结

Object.defineProperty 前言 用于给一个对象添加或者修改一个属性,返回操作后的对象。 写法:Object.defineProperty(对象,属性,配置对象) 配置对象 通过对配置对象不同的配置,可以将属性分为数据属性和存取属性。 数据…

[Linux]HTTP状态响应码列举

1xx:信息响应类,表示接收到请求并且继续处理 2xx:处理成功响应类,表示动作被成功接收、理解和接受 3xx:重定向响应类,为了完成指定的动作,必须接受进一步处理 4xx:客户端错误&#x…

7.Feign远程调用

2.Feign远程调用 先来看我们以前利用RestTemplate发起远程调用的代码: 存在下面的问题: •代码可读性差,编程体验不统一 •参数复杂URL难以维护 Feign是一个声明式的http客户端,官方地址:https://github.com/OpenF…

RabbitMQ消息应答与发布

消息应答 RabbitMQ一旦向消费者发送了一个消息,便立即将该消息,标记为删除. 消费者完成一个任务可能需要一段时间,如果其中一个消费者处理一个很长的任务并仅仅执行了一半就突然挂掉了,在这种情况下,我们将丢失正在处理的消息,后续给消费者发送的消息也就无法接收到了. 为了…

【BIAI】Lecture 6 - Somatosensory systems

Lecture 6- Somatosensory systems 专业术语 somatosensory system 体感系统 Thermoreceptors 温度感受器 Photoreceptors 光感受器 Chemoreceptoprs 化学感受器 hairy skin 毛发皮肤 glabrous skin 光滑皮肤 sensory receptors 感觉受体 dermal 真皮的 epidermal 表皮的 axon…

外包干了2个多月,技术退步明显。。。。。

先说一下自己的情况,本科生,19年通过校招进入广州某软件公司,干了接近3年的功能测试,今年年初,感觉自己不能够在这样下去了,长时间呆在一个舒适的环境会让一个人堕落!而我已经在一个企业干了四年的功能测试…

创建高打开率邮件标题的技巧:吸引潜在客户的秘诀

邮件打开率是指什么? 邮件打开率是指打开邮件的人数占发送的收件人总人数的比例。 邮件的打开率是衡量营销效果如何的一个非常重要的指标,而邮件标题又是影响邮件打开率非常重要的因素之一。所以,我们要要重视邮件标题。那我们应该如何编辑…

《移动通信原理与应用》——QPSK调制解调仿真

目录 一、QPSK调制与解调流程图: 二、仿真运行结果: 三、MATLAB仿真代码: 一、QPSK调制与解调流程图: QPSK调制流程图: QPSK解调流程图: 二、仿真运行结果: 1、Figure1:为发送端比特流情…

深入了解WPF控件:常用属性与用法(七)

掌握WPF控件:熟练常用属性(七) Menu 用于为应用程序指定命令或选项的项列表。它允许用户通过选择不同的菜单项来执行不同的命令或操作。 每个 Menu 可以包含多个 MenuItem 控件。 每个 MenuItem 都可以调用命令或调用 Click 事件处理程序。…

竞赛保研 电影评论情感分析 - python 深度学习 情感分类

1 前言 🔥学长分享优质竞赛项目,今天要分享的是 🚩 GRU的 电影评论情感分析 - python 深度学习 情感分类 🥇学长这里给一个题目综合评分(每项满分5分) 难度系数:3分工作量:3分创新点:4分 这…

vue3-组件基础

什么是组件 组件允许我们将 UI 划分为独立的、可重用的部分,并且可以对每个部分进行处理。在实际应用中,组件常常被组织成层层嵌套的树状结构。 定义一个组件 我们一般会将 Vue 组件定义在一个单独的 .vue 文件中,这被叫做单文件组件 (简称…

【加速】Ubuntu 22.04 LTS Steam++ Watt Toolkit 加速 github

项目地址 SteamTools: 🛠「Watt Toolkit」是一个开源跨平台的多功能 Steam 工具箱。 下载linux版本 wget https://gitee.com/rmbgame/SteamTools/releases/download/3.0.0-rc.3/Steam%20%20_v3.0.0-rc.3_linux_x64.tgz 解压到/opt/steam sudo mkdir /opt/steam…

【C语言】扫雷游戏完整代码实现

目录 1.game.h 2.game.c 3.progress.c 4.运行结果 1.game.h #define _CRT_SECURE_NO_WARNINGS#include <string.h> #include <stdio.h> #include <time.h> #include<stdlib.h>#define ROW 9 #define COL 9 #define ROWS 11 #define COLS 11 #defin…

ctfshow-反序列化(web271-web276)

目录 web271 web272-273 web274 web275 web276 为什么不用分析具体为什么能成功 ,后面会有几个专题 会对php框架进行更深入的了解 这里面会专门的研究 为什么能够实现RCE 前面作为初步的熟悉 首先知道一下他的框架 知道框架的风格 知道啥版本可以用什么来打 首先先不用太研…

CopyOnWriteArrayList源码

CopyOnWriteArrayList源码 介绍 CopyOnWriteArrayList底层采用数组对元素进行存储&#xff0c;采用写时复制技术:写的时候加锁&#xff0c;将原数组拷贝一份&#xff0c;对新数组进行操作&#xff0c;新数组长度为原数组长度1,写入完成后替换原数组&#xff0c;原数组使用vol…

【Linux】Vagrant搭建Linux环境

1. Vagrant Vagrant是一个基于Ruby的工具&#xff0c;用于创建和部署虚拟化开发环境。它使用Oracle的开源VirtualBox虚拟化系统&#xff0c;使用 Chef创建自动化虚拟环境。 1.1 安装Vagrant 从Vagrant官网下载安装包&#xff0c;执行安装。 1.2 安装VirtualBox 从官网下载…

【linux基础】linux root用户密码忘记解决方式

方式一&#xff1a;进入单用户模式 1.开启虚拟机的时候&#xff0c;按下“e”健进入用户引导界面 2.再按下“e”&#xff0c;进入内核的编辑 3.找到kernel开头的选项&#xff0c;再次按下“e” 4. 输入 5. 按下“b”&#xff0c;启动 6. 启动后&#xff0c;输入passwd&#xf…