2024.1.8 Day04_SparkCore_homeWork

目录

1. 简述Spark持久化中缓存和checkpoint检查点的区别

2 . 如何使用缓存和检查点?

3 . 代码题

浏览器Nginx案例

先进行数据清洗,做后续需求用

1、需求一:点击最多的前10个网站域名

2、需求二:用户最喜欢点击的页面排序TOP10

3、需求三:统计每分钟用户搜索次数

学生系统案例

4. RDD依赖的分类

5. 简述DAG与Stage 形成过程 

DAG :  

Stage : 


1. 简述Spark持久化中缓存和checkpoint检查点的区别

1- 数据存储位置不同
    缓存: 存储在内存或者磁盘 或者 堆外内存中
    checkpoint检查点: 可以将数据存储在磁盘或者HDFS上, 在集群模式下, 仅能保存到HDFS上

2- 数据生命周期:
    缓存: 当程序执行完成后, 或者手动调用unpersist 缓存都会被删除
    checkpoint检查点: 即使程序退出后, checkpoint检查点的数据依然是存在的, 不会删除, 需要手动删除

3- 血缘关系:
    缓存: 不会截断RDD之间的血缘关系, 因为缓存数据有可能是失效, 当失效后, 需要重新回溯计算操作
    checkpoint检查点: 会截断掉依赖关系, 因为checkpoint将数据保存到更加安全可靠的位置, 不会发生数据丢失的问题, 当执行失败的时候, 也不需要重新回溯执行
    
4- 主要作用不同:
    缓存: 提高Spark程序的运行效率
    checkpoint检查点: 提高Spark程序的容错性

2 . 如何使用缓存和检查点?

       将两种方案同时用在一个项目中, 先设置缓存,再设置检查点 ,  最后一同使用Action算子进行触发, 这样程序只会有一次IO操作, 如果先设置检查点的话,就会有2次IO操作;

         当在后续工程中读取数据的时候,优先从缓存中读取,如果缓存中没有数据, 再从检查点读取数据,并且会将数据缓存一份到内存中 ,后续直接从缓存中读取数据

3 . 代码题

浏览器Nginx案例

        

先进行数据清洗,做后续需求用

import os
from pyspark import SparkConf, SparkContext,StorageLevel
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
# 绑定指定的Python解释器
from pyspark.sql.types import StructType, IntegerType, StringType, StructField

if __name__ == '__main__':
# 1- 创建SparkSession对象
    conf = SparkConf().setAppName('需求1').setMaster('local[*]')
    sc = SparkContext(conf=conf)
# 2- 数据输入
    init_rdd = sc.textFile('file:///export/data/2024.1.2_Spark/1.6_day04/SogouQ.sample')

# 3- 数据处理
    filter_tmp_rdd = init_rdd.filter(lambda line:line.strip()!='')
    print('过滤空行的数据',filter_tmp_rdd.take(10))
    map_rdd = filter_tmp_rdd.map(lambda line:line.split())
    print('map出来的数据',map_rdd.take(10))
    len6_rdd = map_rdd.filter(lambda line:len(line)==6)
    print('字段数为6个的字段',len6_rdd.take(10))
    etl_rdd = len6_rdd.map(lambda list:(
        list[0],list[1],list[2][1:-1],list[3],list[4],list[5])  )
    print('转换成元组后的数据',etl_rdd.take(10))
    # 设置缓存
    etl_rdd.persist(storageLevel=StorageLevel.MEMORY_AND_DISK).count()

1、需求一:点击最多的前10个网站域名


    print('点击最多的前10个网站域名','-'*50)
    website_map_rdd = etl_rdd.map(lambda tup:(tup[5].split('/')[0],1))
    print('把网站域名切出来,变成(hello,1)的格式',website_map_rdd.take(10))
    website_reducekey_rdd = website_map_rdd.reduceByKey(lambda agg,curr:agg+curr)
    print('进行聚合',website_reducekey_rdd.take(10))
    sort_rdd =website_reducekey_rdd.sortBy(lambda tup:tup[1],ascending=False)
    print('进行降序排序',sort_rdd.take(10))
# 4- 数据输出
# 5- 释放资源
    sc.stop()

2、需求二:用户最喜欢点击的页面排序TOP10

    print('用户最喜欢点击的页面排序TOP10','-'*100)
    top_10_order = etl_rdd.map(lambda tup:(tup[4],1))
    print('点击量排行',top_10_order.take(10))
    top_10_reducebykey = top_10_order.reduceByKey(lambda agg,curr:agg+curr)
    print('进行聚合',top_10_reducebykey.take(10))
    sortby_top10 = top_10_reducebykey.sortBy(lambda line:line[1],ascending=False)
    print('进行排序',sortby_top10.take(10))
# 4- 数据输出
# 5- 释放资源
    sc.stop()

3、需求三:统计每分钟用户搜索次数

    print('统计每分钟用户搜索次数','-'*50)
    search_map_rdd = etl_rdd.map(lambda tup:(tup[0][0:5],1))
    print('把网站域名切出来,变成(hello,1)的格式',search_map_rdd.take(10))
    search_reducekey_rdd = search_map_rdd.reduceByKey(lambda agg,curr:agg+curr)
    print('进行聚合',search_reducekey_rdd.take(10))
    sort_rdd =search_reducekey_rdd.sortBy(lambda tup:tup)
    print('按照时间进行排序',sort_rdd.take(10))
# 4- 数据输出
# 5- 释放资源
    sc.stop()

学生系统案例

 数据准备

import os
from pyspark import SparkConf, SparkContext, StorageLevel
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
# 绑定指定的Python解释器
from pyspark.sql.types import StructType, IntegerType, StringType, StructField

if __name__ == '__main__':
# 1- 创建SparkSession对象
    conf = SparkConf().setAppName('学生案例').setMaster('local[*]')
    sc = SparkContext(conf=conf)
# 2- 数据输入
    init_rdd= sc.textFile('hdfs://node1:8020/input/day04_home_work.txt')

# 3- 数据处理
    stu_rdd = init_rdd.map(lambda line:line.split(',')).cache()
    print('切分后的数据为',stu_rdd.collect())
# 1、需求一:该系总共有多少学生
    stu_cnt = stu_rdd.map(lambda line:line[0]).distinct().count()
    print(f'该系总共有{stu_cnt}个学生')
# 2、需求二:该系共开设了多少门课程
    subject_cnt = stu_rdd.map(lambda line:line[1]).distinct().count()
    print(f'该系共开设了{subject_cnt}门课程')
# 3、需求三:Tom同学的总成绩平均分是多少
    tom_score_sum = stu_rdd.filter(lambda line:line[0]=='Tom').map(lambda line:int(line[2])).sum()
    tom_subject_num = stu_rdd.filter(lambda line:line[0]=='Tom').map(lambda line:line[1]).distinct().count()
    tom_score_avg = tom_score_sum/tom_subject_num
    print(f'Tom同学的总成绩平均分是{round(tom_score_avg,2)}')

# 4、需求四:求每名同学的选修的课程门数
#     every_student_course_num = stu_rdd.map(lambda x: (x[0], x[1])).distinct().map(lambda tup: (tup[0], 1))\
#         .reduceByKey(lambda agg, curr: agg + curr).collect()
    every_student_course_num = stu_rdd.map(lambda x: (x[0], x[1])).distinct()
    print('学生与选修课,把一个学生重修一门选修课的情况去掉',every_student_course_num.collect())
    every_student_course_num2 = every_student_course_num\
        .map(lambda tup:(tup[0],1))\
        .reduceByKey(lambda agg,curr:agg+curr).collect()
    print('每个同学的选修课数',every_student_course_num2)
# 5、需求五:该系DataBase课程共有多少人选修
    subject_database = stu_rdd.filter(lambda line:line[1]=='DataBase').map(lambda line:line[0]).distinct().count()
    print(f'数据库有{subject_database}人选修')
# 6、需求六:各门课程的平均分是多少
    total_score = stu_rdd.map(lambda x:(x[1],int(x[2]))).groupByKey().map(lambda x:(x[0],sum(x[1])))
    print('各科总分为',total_score.collect())
    total_num = stu_rdd.map(lambda x: (x[1], 1)).groupByKey().map(lambda x: (x[0], sum(x[1])))
    print('各科的数量为',total_num.collect())
    #
    total_join =total_score.join(total_num)
    print('join后结果',total_join.collect())
# 各科总分为 [('DataBase', 170), ('Algorithm', 110), ('DataStructure', 140)]
# 各科的数量为 [('DataBase', 2), ('Algorithm', 2), ('DataStructure', 2)]
# 合表后为 [('DataBase', (170, 2)), ('DataStructure', (140, 2)), ('Algorithm', (110, 2))]
    total_avg =total_score.join(total_num).map(lambda x: (x[0], round(x[1][0] / x[1][1], 2))).collect()
    print('各科目的平均分为',total_avg)
# 4- 数据输出

# 5- 释放资源
    sc.stop()

4. RDD依赖的分类

窄依赖:  父RDD分区与子RDD分区是一对一关系

宽依赖:  父RDD分区与子RDD分区是一对多关系

5. 简述DAG与Stage 形成过程 

DAG :  

1-Spark应用程序,遇到了Action算子以后,就会触发一个Job任务的产生。Job任务首先将它所依赖的全部算子加载到内存中,形成一个完整Stage

2-会根据算子间的依赖关系,从Action算子开始,从后往前进行回溯,如果算子间是窄依赖,就放到同一个Stage中;如果是宽依赖,就形成新的Stage。一直回溯完成。

Stage : 

1-Driver进程启动成功以后,底层基于PY4J创建SparkContext对象,在创建SparkContext对象的过程中,还会同时创建DAGScheduler(DAG调度器)和TaskScheduler(Task调度器)
    DAGScheduler: 对Job任务形成DAG有向无环图和划分Stage阶段
    TaskScheduler: 调度Task线程给到Executor进程进行执行

2-Spark应用程序遇到了一个Action算子以后,就会触发一个Job任务的产生。SparkContext对象将Job任务提交DAG调度器,对Job形成DAG有向无环图和划分Stage阶段。并且确定每个Stage阶段需要有多少个Task线程,将这些Task线程放置在TaskSet集合中。再将TaskSet集合给到Task调度器。

3-Task调度器接收到DAG调度器传递过来的TaskSet集合以后,将Task线程分配给到具体的Executor进行执行,底层是基于调度队列SchedulerBackend。Stage阶段是一个一个按顺序执行的,不能并行执行。

4-Executor进程开始执行具体的Task线程。后续过程就是Driver监控多个Executor的执行状态,直到Job任务执行完成。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/312769.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

制造知识普及--MES系统中的调度排产管理

要想弄清楚MES系统调度排产的管理机制,则要首先搞清楚车间调度排产是一套怎样的工作流程,它的难点在什么地方? 生产调度指的是具体组织实现生产作业计划的工作,是对执行生产作业计划过程中发生的问题和可能出现的问题&#xff0c…

PingCAP 受邀参加 FICC 2023,获 Open100 世纪全球开源贡献奖

2023 年 12 月,2023 国际测试委员会智能计算与芯片联邦大会(FICC 2023)在海南三亚举办,中外院士和数十位领域专家莅临出席。 大会现场 ,开放源代码促进会创始人 Bruce Perens 颁发了 Open100 世纪全球开源贡献奖&…

WXUI 基于uni-app x开发的高性能混合UI库

uni-app x 是什么? uni-app x,是下一代 uni-app,是一个跨平台应用开发引擎。 uni-app x 没有使用js和webview,它基于 uts 语言。在App端,uts在iOS编译为swift、在Android编译为kotlin,完全达到了原生应用…

GPT Store,是否会成为下一个App Store?

经历了一场风波后,原本计划推出的GPT Store终于成功上线。OpenAI在北京时间1月11日推出了GPT Store,被广泛视为类似于苹果的"App Store",为人工智能应用生态系统迈出了重要一步。然而,OpenAI要想将GPT Store打造成苹果般…

vue知识-05

聊天室案例(django接口) # chat.hetm<<script src"/static/axios.js"></script><script src"/static/vue.js"></script><body> <div id"app"><h1>聊天室</h1><button click"handleS…

YOLOv8涨点改进:多层次特征融合(SDI),小目标涨点明显,| UNet v2,比UNet显存占用更少、参数更少

💡💡💡本文独家改进:多层次特征融合(SDI),能够显著提升不同尺度和小目标的识别率 如何引入到YOLOv8 1)替代原始的Concat; 💡💡💡Yolov8魔术师,独家首发创新(原创),适用于Yolov5、Yolov7、Yolov8等各个Yolo系列,专栏文章提供每一步步骤和源码,轻松带你…

汽车级线性电压稳压器LM317MBSTT3G:新能源汽车的理想之选

LM317MBSTT3G是一款可调三端子正向线性稳压器&#xff0c;能够在 1.2 V 至 37 V 的输出电压范围内提供 500 mA 以上的电流。此线性电压稳压器使用非常简便&#xff0c;仅需两个外部电阻即可设置输出电压。另外&#xff0c;它采用内部电流限制、高温关断和安全区域补偿&#xff…

npm发布js函数库

直接上干货吧 首先进入npm官网注册账号下面会用到 1.新建文件夹例如 chengyu 2.cdm chengyu 3.npm init &#xff08;填写一些基本信息一直y就可以 后面可以直接修改文件&#xff09;结束之后多了一个package.json文件就是下面的样子 {"name": "brogramme…

多测师肖sir___ui自动化测试po框架讲解版

po框架 一、ui自动化po框架介绍 &#xff08;1&#xff09;PO是Page Object的缩写 &#xff08;2&#xff09;业务流程与页面元素操作分离的模式&#xff0c;可以简单理解为每个页面下面都有一个配置class&#xff0c; 配置class就用来维护页面元素或操作方法 &#xff08;3&am…

Spring Boot中加@Async和不加@Async有什么区别?设置核心线程数、设置最大线程数、设置队列容量是什么意思?直接在yml中配置线程池

在 Spring 中&#xff0c;Async 注解用于将方法标记为异步执行的方法。当使用 Async 注解时&#xff0c;该方法将在单独的线程中执行&#xff0c;而不会阻塞当前线程。这使得方法可以在后台执行&#xff0c;而不会影响主线程的执行。 在您提供的代码示例中&#xff0c;a1() 和…

【深蓝学院】手写VIO第11章--Square Root Bundle Adjustment

文章目录 1. 文章贡献2. 在Jacobian中添加 d a m p \sqrt{damp} damp ​等价于在Hessian中添加damp2. Givens旋转3. Jacobian推导4. Refernece 1. 文章贡献 这篇文章最大的贡献在于证明了对 J l Jl Jl进行QR分解&#xff0c;对normal equation左乘 Q T Q^T QT等价于Schur comp…

创建 SSL证书并应用于WebSocket

写在前面 由于上一篇介绍 如何使用Fleck创建WebSocket服务器 &#xff0c;感觉不够完善&#xff0c;因为生产环境中肯定是需要用到ssl的&#xff0c;而创建或申请ssl证书&#xff0c;相对而言是比较繁琐的事情&#xff0c;特别是本地如果要构建一个使用ssl的测试环境时&#x…

记录一次数据中包含转义字符\引发的bug

后端返回给前端的数据是: { "bizObj": { "current": 1, "orders": [ ], "pages": 2, "records": [ { "from": "1d85b8a4bd33aaf99adc2e71ef02960e", …

【办公技巧】Word功能区灰色显示不能编辑,怎么破?

Word文档可以设置加密来保护文件禁止修改&#xff0c;但是在word文档中设置限制编辑功能时对它的作用是否有详细的了解呢&#xff1f;今天为大家介绍word限制编辑功能的作用以及忘记了限制编辑密码该如何解决。 设置限制大家应该都清楚&#xff0c;就是点击工具栏中的审阅 – …

如何让GPT/GPT4成为你的编程助手?

详情点击链接&#xff1a;如何让GPT/GPT4成为你的编程助手&#xff1f; 一OpenAI 1.最新大模型GPT-4 Turbo 2.最新发布的高级数据分析&#xff0c;AI画图&#xff0c;图像识别&#xff0c;文档API 3.GPT Store 4.从0到1创建自己的GPT应用 5. 模型Gemini以及大模型Claude2二…

Edge扩展插件安装

目录 一、 Edge扩展插件安装步骤 1.1 打开 Edge 浏览器&#xff0c;并点击右上角的菜单按钮&#xff08;三个水平点&#xff09;。 1.2 在菜单中选择“扩展”选项。 1.3 在扩展页面通过“查找新扩展”来找到你想要安装的扩展插件。 ​编辑 1.4 搜索插件 1.5 在详情页面中…

数据交付变革:研发到产运自助化的转型之路

作者 | Chris 导读 本文讲述为了提升产运侧数据观察、分析、决策的效率&#xff0c;支持业务的快速迭代&#xff0c;移动生态数据研发部对数仓建模与BI工具完成升级&#xff0c;采用宽表建模与TDA平台相结合的方案&#xff0c;一站式自助解决数据应用需求。在此过程中&#xff…

SpringBoot 集成 Kafka 高级实现

1、简介 之前博客中记录了直接使用Kafka客户端实现生产者和消费者之间的交互&#xff0c;这种方式通过设置各种参数编码繁琐&#xff0c;因此通过SpringBoot集成Kafka成为一种常用的实现&#xff0c;下面就详细介绍 SpringBoot 是如何和Kafka进行集成的&#xff0c;本文主要参考…

响应式Web开发项目教程(HTML5+CSS3+Bootstrap)第2版 例3-4 CSS 立方体

代码 <!doctype html> <html> <head> <meta charset"utf-8"> <title>CSS 立方体</title> <link href"CSS/style.css" rel"stylesheet" type"text/css"> <style> .box {width: 200px…

解密!神奇代码消除 Vue 中 Mac 电脑左滑右滑页面跳转

想知道如何让Mac电脑左滑右滑不再意外跳转页面吗&#xff1f;本文将揭示一个独家秘籍&#xff0c;通过简单的一行代码&#xff0c;让你的用户体验飞速提升&#xff01;别错过这个让你的Vue表格组件更顺畅的宝贵技巧&#xff01; 最近&#xff0c;我在使用 Vue 开发表格组件时遇…
最新文章