Spark编程实验三:Spark SQL编程

目录

一、目的与要求

二、实验内容

三、实验步骤

1、Spark SQL基本操作

2、编程实现将RDD转换为DataFrame

3、编程实现利用DataFrame读写MySQL的数据

四、结果分析与实验体会


一、目的与要求

1、通过实验掌握Spark SQL的基本编程方法;
2、熟悉RDD到DataFrame的转化方法;
3、熟悉利用Spark SQL管理来自不同数据源的数据。

二、实验内容

1、Spark SQL基本操作

        将下列JSON格式数据复制到Linux系统中,并保存命名为employee.json。

{ "id":1 , "name":"Ella" , "age":36 }

{ "id":2, "name":"Bob","age":29 }

{ "id":3 , "name":"Jack","age":29 }

{ "id":4 , "name":"Jim","age":28 }

{ "id":5 , "name":"Damon" }

{ "id":5 , "name":"Damon" }

为employee.json创建DataFrame,并写出Python语句完成下列操作:
(1)查询所有数据;
(2)查询所有数据,并去除重复的数据;
(3)查询所有数据,打印时去除id字段;
(4)筛选出age>30的记录;
(5)将数据按age分组;
(6)将数据按name升序排列;
(7)取出前3行数据;
(8)查询所有记录的name列,并为其取别名为username;
(9)查询年龄age的平均值;
(10)查询年龄age的最小值。

2、编程实现将RDD转换为DataFrame

        源文件内容如下(包含id,name,age):

1,Ella,36

2,Bob,29

3,Jack,29

将数据复制保存到Linux系统中,命名为employee.txt,实现从RDD转换得到DataFrame,并按“id:1,name:Ella,age:36”的格式打印出DataFrame的所有数据。请写出程序代码。

3、编程实现利用DataFrame读写MySQL的数据

(1)在MySQL数据库中新建数据库sparktest,再创建表employee,包含如表所示的两行数据。

(2)配置Spark通过JDBC连接数据库MySQL,编程实现利用DataFrame插入如表所示的三行数据到MySQL中,最后打印出age的最大值和age的总和。

三、实验步骤

1、Spark SQL基本操作

        将下列JSON格式数据复制到Linux系统中,并保存命名为employee.json。

{ "id":1 , "name":"Ella" , "age":36 }

{ "id":2, "name":"Bob","age":29 }

{ "id":3 , "name":"Jack","age":29 }

{ "id":4 , "name":"Jim","age":28 }

{ "id":5 , "name":"Damon" }

{ "id":5 , "name":"Damon" }

为employee.json创建DataFrame,并写出Python语句完成下列操作:

>>> spark=SparkSession.builder.getOrCreate()
>>> df = spark.read.json("file:///home/zhc/mycode/sparksql/employee.json")

(1)查询所有数据;

>>> df.show()

(2)查询所有数据,并去除重复的数据;

>>> df.distinct().show()

(3)查询所有数据,打印时去除id字段;

>>> df.drop("id").show()

(4)筛选出age>30的记录;

>>> df.filter(df.age > 30).show()

(5)将数据按age分组;

>>> df.groupBy("age").count().show()

(6)将数据按name升序排列;

>>> df.sort(df.name.asc()).show()

(7)取出前3行数据;

>>> df.take(3)

(8)查询所有记录的name列,并为其取别名为username;

>>> df.select(df.name.alias("username")).show()

(9)查询年龄age的平均值;

>>> df.agg({"age": "mean"}).show()

(10)查询年龄age的最小值。

>>> df.agg({"age": "min"}).show()

2、编程实现将RDD转换为DataFrame

        源文件内容如下(包含id,name,age):

1,Ella,36

2,Bob,29

3,Jack,29

将数据复制保存到Linux系统中,命名为employee.txt,实现从RDD转换得到DataFrame,并按“id:1,name:Ella,age:36”的格式打印出DataFrame的所有数据。请写出程序代码。

首先,在“/home/zhc/mycode/sparksql”目录下创建文件employee.txt

​​​​​​​[root@bigdata sparksql]# vi employee.txt

然后,在该目录下新建一个py文件命名为rddtodf.py,然后写入如下py程序:

[root@bigdata sparksql]# vi rddtodf.py
#/home/zhc/mycode/sparksql/rddtodf.py
from pyspark.conf import SparkConf
from pyspark.sql.session import SparkSession
from pyspark import SparkContext
from pyspark.sql.types import Row
from pyspark.sql import SQLContext
if __name__ == "__main__":
        sc = SparkContext("local","Simple App")
        spark=SparkSession(sc)
        peopleRDD = spark.sparkContext.textFile("file:home/zhc/mycode/sparksql/employee.txt")
        rowRDD = peopleRDD.map(lambda line : line.split(",")).map(lambda attributes : Row(int(attributes[0]),attributes[1],int(attributes[2]))).toDF()
        rowRDD.createOrReplaceTempView("employee")
        personsDF = spark.sql("select * from employee")
        personsDF.rdd.map(lambda t : "id:"+str(t[0])+","+"Name:"+t[1]+","+"age:"+str(t[2])).foreach(print)

最后,运行该程序:

[root@bigdata sparksql]# python3 rddtodf.py

3、编程实现利用DataFrame读写MySQL的数据

(1)在MySQL数据库中新建数据库sparktest,再创建表employee,包含如表所示的两行数据。

(2)配置Spark通过JDBC连接数据库MySQL,编程实现利用DataFrame插入如表所示的三行数据到MySQL中,最后打印出age的最大值和age的总和。

首先,启动mysql服务并进入到mysql数据库中:

[root@bigdata sparksql]# systemctl start mysqld.service
[root@bigdata sparksql]# mysql -u root -p

然后开始接下来的操作。

(1)在MySQL数据库中新建数据库sparktest,再创建表employee,包含如表所示的两行数据。

mysql> create database sparktest;
mysql> use sparktest;
mysql> create table employee (id int(4), name char(20), gender char(4), age int(4));
mysql> insert into employee values(1,'Alice','F',22);
mysql> insert into employee values(2,'John','M',25);

(2)配置Spark通过JDBC连接数据库MySQL,编程实现利用DataFrame插入如表所示的三行数据到MySQL中,最后打印出age的最大值和age的总和。

首先,在“/home/zhc/mycode/sparksql”目录下面新建一个py程序并命名为mysqltest.py。

[root@bigdata sparksql]# vi mysqltest.py

接着,写入如下py程序: 

#/home/zhc/mycode/sparksql/mysqltest.py
from pyspark.sql import Row
from pyspark.sql.types import *
from pyspark import SparkContext,SparkConf
from pyspark.sql import SparkSession
spark = SparkSession.builder.config(conf = SparkConf()).getOrCreate()
#下面设置模式信息
schema = StructType([StructField("id",IntegerType(),True),StructField("name", StringType(), True),StructField("gender", StringType(), True),StructField("age",IntegerType(), True)])
employeeRDD = spark.sparkContext.parallelize(["3 Mary F 26","4 Tom M 23","5 zhanghc M 21"]).map(lambda x:x.split(" "))
#下面创建Row对象,每个Row对象都是rowRDD中的一行
rowRDD = employeeRDD.map(lambda p:Row(int(p[0].strip()), p[1].strip(), p[2].strip(), int(p[3].strip())))
#建立起Row对象和模式之间的对应关系,也就是把数据和模式对应起来
employeeDF = spark.createDataFrame(rowRDD, schema)
#写入数据库
prop = {}
prop['user'] = 'root'
prop['password'] = 'MYsql123!'
prop['driver'] = "com.mysql.jdbc.Driver"
employeeDF.write.jdbc("jdbc:mysql://localhost:3306/sparktest?useSSL=false",'employee','append', prop)
employeeDF.collect()
employeeDF.agg({"age": "max"}).show()
employeeDF.agg({"age": "sum"}).show()

然后,直接运行该py程序即可得到结果:

[root@bigdata sparksql]# python3 mysqltest.py

最后,到MySQL Shell中,即可查看employee表中的所有信息。

mysql> select * from employee;

四、结果分析与实验体会

        Spark SQL是Apache Spark中用于处理结构化数据的模块。它提供了一种类似于SQL的编程接口,可以用于查询和分析数据。通过实验掌握了Spark SQL的基本编程方法,SparkSession支持从不同的数据源加载数据,并把数据转换成DataFrame,并且支持把DataFrame转换成SQLContext自身中的表,然后使用SQL语句来操作数据。
        在使用Spark SQL之前,需要创建一个SparkSession对象。可以使用SparkSession的read方法加载数据。可以使用DataFrame的createOrReplaceTempView方法将DataFrame注册为一个临时视图。可以使用SparkSession的sql方法执行SQL查询。除了使用SQL查询外,还可以使用DataFrame的API进行数据操作和转换。可以使用DataFrame的write方法将数据写入外部存储。在使用完SparkSession后,应该调用其close方法来关闭SparkSession。
        最后,还掌握了RDD到DataFrame的转化方法,并可以利用Spark SQL管理来自不同数据源的数据。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/267444.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

如何利用flume进行日志采集

介绍 Apache Flume 是一个分布式、可靠、高可用的日志收集、聚合和传输系统。它常用于将大量日志数据从不同的源(如Web服务器、应用程序、传感器等)收集到中心化的存储或数据处理系统中。 基本概念 Agent(代理): …

039、转置卷积

之——增大高宽 杂谈 通常来说,卷积不会增大输入的高宽,通常要么不变,要么减半;如果想要直接padding来增加高宽,在不断的卷积过程中,padding的0越来越多,最后要做像素级的判断时候,由…

分布式核心技术之分布式共识

文章目录 什么是分布式共识?分布式共识方法PoWPoSDPoS 三种分布式共识算法对比分析 选主过程就是一个分布式共识问题,因为每个节点在选出主节点之前都可以认为自己会成为主节点,也就是说集群节点“存异”;而通过选举的过程选出主节…

基于Java SSM框架实现医院挂号上班打卡系统项目【项目源码+论文说明】计算机毕业设计

基于java的SSM框架实现医院挂号上班打卡系统演示 摘要 在网络发展的时代,国家对人们的健康越来越重视,医院的医疗设备更加先进,医生的医术、服务水平也不断在提高,给用户带来了很大的选择余地,而且人们越来越追求更个…

【leetcode100-019】【矩阵】螺旋矩阵

【题干】 给你一个 m 行 n 列的矩阵 matrix ,请按照 顺时针螺旋顺序 ,返回矩阵中的所有元素。 【思路】 不难注意到,每进行一次转向,都有一行/列被输出(并失效);既然已经失效,那我…

MyBatis-Plus中默认方法对应的SQL到底长啥样?

我希望成为:自媒体圈中技术最好、实战经验最丰富的达人,技术圈中最会分享的架构师。加油! 我的公众号:Hoeller 过段时间要给公司同事做Mybatis-Plus相关的培训,所以抓紧时间看看Mybatis-Plus的源码,顺便也分…

RT-Thread 内核对象管理框架

内核对象管理框架 RT-Thread采用内核对象管理系统来访问/管理所有内核对象,内核对象包含了内核中绝大部分设施,这些内核对象可以是静态分配的静态对象,也可以是从系统内存堆中分配的动态对象。 RT-Thread内核对象包括:线程&…

使用VisualStutio2022开发第一个C++程序

使用VisualStudio2022创建C项目 第一步:新建C的控制台应用 第二步:填写项目名称和代码存放位置,代码的存放目录不要有中文名 第三步:点击创建,VisualStudio会自动开始帮我们创建项目 第四步:项目创建好以后&…

2023,测试开发人的年终总结

根据TesterHome社区发帖整理。从该年终总结可以看出当前测试行业,哪怕是测试开发也是竞争很厉害。作为测试同仁,不但要掌握功能测试、接口测试、性能测试,掌握各种工具使用,还得懂开发,懂Java/Python,懂VUE…

洛谷——P3884 [JLOI2009] 二叉树问题(最近公共祖先,LCA)c++

文章目录 一、题目[JLOI2009] 二叉树问题题目描述输入格式输出格式样例 #1样例输入 #1样例输出 #1 提示基本思路: 一、题目 [JLOI2009] 二叉树问题 题目描述 如下图所示的一棵二叉树的深度、宽度及结点间距离分别为: 深度: 4 4 4宽度&…

【AI提示词故事】《启示之星:寻找神殿的探险之旅》

故事叙述 在未来的某一天,人类发现了一个神秘的星球,被称为“启示之星”。传说在这颗星球上,有一座可以实现人类最大愿望的神殿。 启示之星 一支探险队被派往这颗星球,他们的目标是寻找神殿并实现自己的最大愿望。 在星球上&…

关于 curl 常用命令的使用整理【不定期更新】

目录 1. HTTP 请求2. 文件操作2.1 文件下载2.2 文件上传2.3 FTP 操作 3. 代理和网络设置4. 身份验证5. 调试和信息显示6. more and more curl 是一个用于在命令行下进行数据传输的工具,支持多种协议,包括 HTTP、HTTPS、FTP、FTPS、SCP、SFTP 等。它通常用…

结合ChatGPT和MINDSHOW自动生成PPT

总结/朱季谦 一、首先,通过chatGPT说明你的需求,学会提问是Ai时代最关键的一步。你需要提供一些关键信息,如果没有关键信息,就按照大纲方式让它设计,例如,我让它帮我写一份《2023年年中述职报告》的模版—…

项目管理进阶之序言

背景 可能任何一个程序猿/媛都有一个梦想,立志成为一个技术Leader,带领一个Team,完成一个组织中重要的Project。 有些人天赋异禀,光彩夺目,从小已形成的某些特质,足以让他/她胜任这个领域,我们…

第11章 GUI Page428 步骤七 设置圆,矩形,文字的前景色

运行效果: 关键代码: 分别设置圆,矩形,和文字的画笔颜色,其中文字的设置方法稍有不同 圆: 矩形: 文字:

CV算法面试题学习

本文记录了CV算法题的学习。 CV算法面试题学习 1 点在多边形内(point in polygon)2 高斯滤波器3 ViTPatch EmbeddingPosition EmbeddingTransformer Encoder完整的ViT模型 4 SE模块5 Dense Block6 Batch Normalization 1 点在多边形内(point …

2000-2021年全国各省三农指标数据(700+指标)

2000-2021年全国各省三农指标数据合集(700指标) 1、时间:2000-2021年 2、来源:整理自2001-2022年农村年鉴 3、范围:31省市 4、指标:、农村经济在国民经济中的地位、社会消费品零售额_亿元、社会消费品零…

持续集成交付CICD:Jira 发布流水线

目录 一、实验 1.环境 2.GitLab 查看项目 3.Jira 远程触发 Jenkins 实现合并 GitLab 分支 4.K8S master节点操作 5.Jira 发布流水线 一、实验 1.环境 (1)主机 表1 主机 主机架构版本IP备注master1K8S master节点1.20.6192.168.204.180 jenkins…

Linux环境变量剖析

一、什么是环境变量 概念:环境变量(environment variables)一般是指在操作系统中用来指定操作系统运行环境的一些参数,是在操作系统中一个具有特定名字的对象,它包含了一个或多个应用程序所将使用到的信息&#xff0c…

Open3D 点云数据处理基础(Python版)

Open3D 点云数据处理基础(Python版) 文章目录 1 概述 2 安装 2.1 PyCharm 与 Python 安装 2.3 Anaconda 安装 2.4 Open3D 0.13.0 安装 2.5 新建一个 Python 项目 3 点云读写 4 点云可视化 2.1 可视化单个点云 2.2 同一窗口可视化多个点云 2.3…