【SparkSQL】DataFrame入门(重点:df代码操作、数据清洗API、通过JDBC读写数据库)

【大家好,我是爱干饭的猿,本文重点介绍DataFrame的组成、DataFrame的代码构建、DataFrame的入门操作、词频统计案例、电影数据分析、SparkSQL Shuffle 分区数目、SparkSQL 数据清洗API、DataFrame数据写出、DataFrame 通过JDBC读写数据库(MySQL示例)

后续会继续分享其他重要知识点总结,如果喜欢这篇文章,点个赞👍,关注一下吧】

上一篇文章:《【SparkSQL】基础入门(重点:SparkSQL和Hive的异同、SparkSQL数据抽象)》

3. DataFrame入门

3.1 DataFrame的组成

DataFrame是一个二维表结构, 那么表格结构就有无法
绕开的三个点:

  • 表结构描述

比如,在MySQL中的一张表:

  • 由许多行组成
  • 数据也被分成多个列
  • 表也有表结构信息(列、列名、列类型、列约束等)

基于这个前提,DataFrame的组成如下:

  • 在结构层面:
    • StructType对象描述整个DataFrame的表结构
    • StructField对象描述一个列的信息
  • 在数据层面
    • Row对象记录一行数据
    • Column对象记录一列数据并包含列的信息

在这里插入图片描述
如图, 在表结构层面,DataFrame的表结构由:
StructType描述,如下图:

在这里插入图片描述
一个StructField记录:列名、列类型、列是否运行为空
多个StructField组成一个StructType对象。
一个StructType对象可以描述一个DataFrame:有几个列、每个列的名字和类型、每个列是否为空

同时,一行数据描述为Row对象,如Row(1, 张三, 11)
一列数据描述为Column对象,Column对象包含一列数据和列的信息

3.2 DataFrame的代码构建

1. 基于RDD方式1

DataFrame对象可以从RDD转换而来,都是分布式数据集
其实就是转换一下内部存储的结构,转换为二维表结构

通过SparkSession对象的createDataFrame方法来将RDD转换为DataFrame
这里只传入列名称,类型从RDD中进行推断,是否允许为空默认为允许(True)

# coding:utf8

from pyspark.sql import SparkSession


if __name__ == '__main__':
    # 0. 构建执行环境入口对象SparkSession
    spark = SparkSession.builder.\
        appName("test").\
        master("local[*]").\
        getOrCreate()
    sc = spark.sparkContext

    # 基于RDD转换成DataFrame
    rdd = sc.textFile("rent.txt").\
        map(lambda x: x.split(",")).\
        map(lambda x: (x[0], x[1], int(x[2])))

    # 构建DataFrame对象
    # 参数1 被转换的RDD
    # 参数2 指定列名, 通过list的形式指定, 按照顺序依次提供字符串名称即可
    df = spark.createDataFrame(rdd, schema=['address', 'area', 'price'])

    # 打印DataFrame的表结构
    df.printSchema()

    # 打印df中的数据
    # 参数1 表示 展示出多少条数据, 默认不传的话是20
    # 参数2 表示是否对列进行截断, 如果列的数据长度超过20个字符串长度, 后续的内容不显示以...代替
    # 如果给False 表示不阶段全部显示, 默认是True
    df.show(20, False)

    # 将DF对象转换成临时视图表, 可供sql语句查询
    df.createOrReplaceTempView("people")
    spark.sql("select * from people where price > 2000").show()

2. 基于RDD方式2

将RDD转换为DataFrame方式2:
通过StructType对象来定义DataFrame的“表结构”转换RDD

# coding:utf8

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, IntegerType, StringType

if __name__ == '__main__':
    # 0. 构建执行环境入口对象SparkSession
    spark = SparkSession.builder.\
        appName("test").\
        master("local[*]").\
        getOrCreate()
    sc = spark.sparkContext

    # 基于RDD转换成DataFrame
    rdd = sc.textFile("rent.txt").\
        map(lambda x: x.split(",")).\
        map(lambda x: (x[0], x[1], int(x[2])))

    # 构建表结构的描述对象: StructType对象
    schema = StructType().\
        add("address", StringType(), nullable=True).\
        add("area", StringType(), nullable=True).\
        add("price", IntegerType(), nullable=True)

    # 基于StructType对象去构建RDD到DF的转换
    df = spark.createDataFrame(rdd, schema=schema)

    df.printSchema()
    df.show()

3. 基于RDD方式3

将RDD转换为DataFrame方式3:
使用RDD的toDF方法转换RDD

# coding:utf8

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, IntegerType, StringType

if __name__ == '__main__':
    # 0. 构建执行环境入口对象SparkSession
    spark = SparkSession.builder.\
        appName("test").\
        master("local[*]").\
        getOrCreate()
    sc = spark.sparkContext

    # 基于RDD转换成DataFrame
    rdd = sc.textFile("rent.txt").\
        map(lambda x: x.split(",")).\
        map(lambda x: (x[0], x[1], int(x[2])))

    # 1. toDF的方式构建DataFrame
    df1 = rdd.toDF(["address", "area", "price"])
    df1.printSchema()
    df1.show()

    # 2. toDF的方式2 通过StructType来构建
    schema = StructType().\
        add("address", StringType(), nullable=True).\
        add("area", StringType(), nullable=True).\
        add("price", IntegerType(), nullable=True)

    df2 = rdd.toDF(schema=schema)
    df2.printSchema()
    df2.show()

4. 基于Pandas的DataFrame

将Pandas的DataFrame对象,转变为分布式的SparkSQL DataFrame对象

# coding:utf8
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, IntegerType, StringType

if __name__ == '__main__':
    # 0. 构建执行环境入口对象SparkSession
    spark = SparkSession.builder.\
        appName("test").\
        master("local[*]").\
        getOrCreate()
    sc = spark.sparkContext

    # 基于Pandas的DataFrame构建SparkSQL的DataFrame对象
    pdf = pd.DataFrame(
        {
            "id": [1, 2, 3],
            "name": ["张大仙", "王晓晓", "吕不为"],
            "age": [11, 21, 11]
        }
    )

    df = spark.createDataFrame(pdf)

    df.printSchema()
    df.show()

5. 读取外部数据-test

读取text数据源
使用format(“text”)读取文本数据
读取到的DataFrame只会有一个列,列名默认称之为:value

# coding:utf8
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, IntegerType, StringType

if __name__ == '__main__':
    # 0. 构建执行环境入口对象SparkSession
    spark = SparkSession.builder.\
        appName("test").\
        master("local[*]").\
        getOrCreate()
    sc = spark.sparkContext

    # 构建StructType, text数据源, 读取数据的特点是, 将一整行只作为`一个列`读取, 默认列名是value 类型是String
    schema = StructType().\
        add("data", StringType(), nullable=True)
    df = spark.read.format("text").schema(schema=schema).load("rent.txt")

    df.printSchema()
    df.show()

6. 读取外部数据-json

读取json数据源
使用format(“json”)读取json数据

# coding:utf8
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, IntegerType, StringType

if __name__ == '__main__':
    # 0. 构建执行环境入口对象SparkSession
    spark = SparkSession.builder.\
        appName("test").\
        master("local[*]").\
        getOrCreate()
    sc = spark.sparkContext

    # JSON类型自带有Schema信息
    df = spark.read.format("json").load("rent.json")
    df.printSchema()
    df.show()

7. 读取外部数据-csv

读取csv数据源
使用format(“csv”)读取csv数据

# coding:utf8
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, IntegerType, StringType

if __name__ == '__main__':
    # 0. 构建执行环境入口对象SparkSession
    spark = SparkSession.builder.\
        appName("test").\
        master("local[*]").\
        getOrCreate()
    sc = spark.sparkContext

    # 读取csv文件
    df = spark.read.format("csv").\
        option("sep", ",").\
        option("header", True).\
        option("encoding", "utf-8").\
        schema("address STRING, area STRING, price INT").\
        load("rent.csv")

    df.printSchema()
    df.show()

8. 读取外部数据-parquet

parquet: 是Spark中常用的一种列式存储文件格式,和Hive中的ORC差不多, 他俩都是列存储格式
parquet对比普通的文本文件的区别:

  • parquet 内置schema (列名\ 列类型\ 是否为空)
  • 存储是以列作为存储格式
  • 存储是序列化存储在文件中的(有压缩属性体积小)

pycharm查看parquet文件插件:Avro and Parquet Viewer

读取parquet数据源
使用format(“parquet”)读取parquet数据

# coding:utf8
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, IntegerType, StringType

if __name__ == '__main__':
    # 0. 构建执行环境入口对象SparkSession
    spark = SparkSession.builder.\
        appName("test").\
        master("local[*]").\
        getOrCreate()
    sc = spark.sparkContext

    # 读取parquet类型的文件
    df = spark.read.format("parquet").load("rent.parquet")

    df.printSchema()
    df.show()

3.3 DataFrame的入门操作

DataFrame支持两种风格进行编程,分别是:

  • DSL风格
  • SQL风格
  1. DSL语法风格
    DSL称之为:领域特定语言。
    其实就是指DataFrame的特有API
    DSL风格意思就是以调用API的方式来处理Data
    比如:df.where().limit()

  2. SQL语法风格
    SQL风格就是使用SQL语句处理DataFrame的数据
    比如:spark.sql(“SELECT * FROM xxx)

1. DSL风格代码演示

# coding:utf8
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, IntegerType, StringType

if __name__ == '__main__':
    # 0. 构建执行环境入口对象SparkSession
    spark = SparkSession.builder.\
        appName("test").\
        master("local[*]").\
        getOrCreate()
    sc = spark.sparkContext

    df = spark.read.format("csv").\
        schema("address STRING, area STRING, price INT").\
        load("rent.txt")

    # Column对象的获取
    address_column = df['address']
    price_column = df['price']

    # TODO: DLS风格演示
    df.select(["address", "price"]).show()
    df.select("address", "price").show()
    df.select(address_column, price_column).show()

    # 1. filter API
    df.filter("price > 2500").show()
    df.filter(df['price'] > 2500).show()

    # 2. where API
    df.where("price > 2500").show()
    df.where(df['price']> 2500).show()

    # 3. group By API
    df.groupBy("address").count().show()
    df.groupBy(df['address']).count().show()


    # df.groupBy API的返回值 GroupedData
    # GroupedData对象 不是DataFrame
    # 它是一个 有分组关系的数据结构, 有一些API供我们对分组做聚合
    # SQL: group by 后接上聚合: sum avg count min man
    # GroupedData 类似于SQL分组后的数据结构, 同样有上述5种聚合方法
    # GroupedData 调用聚合方法后, 返回值依旧是DataFrame
    # GroupedData 只是一个中转的对象, 最终还是要获得DataFrame的结果
    r = df.groupBy("address")

2. SQL风格代码演示

# coding:utf8
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, IntegerType, StringType

if __name__ == '__main__':
    # 0. 构建执行环境入口对象SparkSession
    spark = SparkSession.builder.\
        appName("test").\
        master("local[*]").\
        getOrCreate()
    sc = spark.sparkContext

    df = spark.read.format("csv").\
        schema("address STRING, area STRING, price INT").\
        load("rent.txt")

    # 注册成临时表
    df.createTempView("rent")  # 注册临时视图(表)
    df.createOrReplaceTempView("rent_2")  # 注册 或者 替换  临时视图
    df.createGlobalTempView("rent_3")  # 注册全局临时视图 全局临时视图在使用的时候 需要在前面带上global_temp. 前缀

    # 可以通过SparkSession对象的sql api来完成sql语句的执行
    spark.sql("SELECT area, COUNT(*) AS cnt FROM rent GROUP BY area").show()
    spark.sql("SELECT area, COUNT(*) AS cnt FROM rent_2 GROUP BY area").show()
    spark.sql("SELECT area, COUNT(*) AS cnt FROM global_temp.rent_3 GROUP BY area").show()

3.4 词频统计案例

我们来完成一个单词计数需求,使用DSL和SQL两种风格来实现。

# coding:utf8
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, IntegerType, StringType
from pyspark.sql import functions as F

if __name__ == '__main__':
    # 0. 构建执行环境入口对象SparkSession
    spark = SparkSession.builder.\
        appName("test").\
        master("local[*]").\
        getOrCreate()
    sc = spark.sparkContext

    # words.txt:
    # hello hadoop
    # hello spark
    # hello flink

    # TODO 1: SQL 风格进行处理
    rdd = sc.textFile("words.txt").\
        flatMap(lambda x: x.split(" ")).\
        map(lambda x: [x])

    df = rdd.toDF(["word"])

    # 注册DF为表格
    df.createTempView("words")
    spark.sql("select word, count(*) as cnt from words group by word order by cnt desc").show()

    # TODO 2: DSL 风格处理
    df = spark.read.format("text").load("words.txt")

    # withColumn方法
    # 方法功能: 对已存在的列进行操作, 返回一个新的列, 如果名字和老列相同, 那么替换, 否则作为新列存在
    df2 = df.withColumn("value", F.explode(F.split(df["value"], " ")))

    df2.groupBy("value").\
        count().\
        withColumnRenamed("value", "word").\
        withColumnRenamed("count", "cnt").\
        orderBy("cnt", ascending=False).\
        show()

3.5 电影数据分析

MovieLens数据集
MovieLens数据集包含多个用户对多部电影的评级数据,也包括电影元数据信息和用户属性信息。
下载地址: http://files.grouplens.org/datasets/movielens
介绍:下面以ml-10Ok数据集为例进行。
介绍:下载u.data文件。
u.data -由943个用户对1682个电影的10000条评分组成。每个用户至少评分20部电影。用户和电影从1号开始连续编号。数据是随机排序的。
在这里插入图片描述

需求:

  1. 查询用户平均分
  2. 查询电影平均分
  3. 查询大于平均分的电影的数量
  4. 查询高分电影中(>3)打分次数最多的用户,并求出此人打的平均分
  5. 查询每个用户的平均打分,最低打分,最高打分
  6. 查询被评分超过100次的电影,的平均分排名TOP10
# coding:utf8
import time

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType
import pandas as pd
from pyspark.sql import functions as F


if __name__ == '__main__':
    # 0. 构建执行环境入口对象SparkSession
    spark = SparkSession.builder.\
        appName("test").\
        master("local[*]").\
        config("spark.sql.shuffle.partitions", 2).\
        getOrCreate()
    sc = spark.sparkContext

    """
    spark.sql.shuffle.partitions 参数指的是, 在sql计算中, shuffle算子阶段默认的分区数是200个.
    对于集群模式来说, 200个默认也算比较合适
    如果在local下运行, 200个很多, 在调度上会带来额外的损耗
    所以在local下建议修改比较低 比如2\4\10均可
    这个参数和Spark RDD中设置并行度的参数 是相互独立的.
    """

    # 1. 读取数据集
    schema = StructType().\
        add("user_id", StringType(), nullable=True).\
        add("movie_id", IntegerType(), nullable=True).\
        add("rank", IntegerType(), nullable=True).\
        add("ts", StringType(), nullable=True)
    df = spark.read.format("csv").\
        option("sep", "\t").\
        option("header", False).\
        schema(schema=schema).\
        load("u.data")

    df.printSchema()

    # TODO 1: 用户平均分
    df.groupBy("user_id").\
        avg("rank").\
        withColumnRenamed("avg(rank)", "avg_rank").\
        withColumn("avg_rank", F.round("avg_rank", 2)).\
        orderBy("avg_rank", ascending=False).\
        show()

    df.createTempView("movie")
    spark.sql("select user_id, round(avg(rank), 2) as avg_rank from movie group by user_id").show()

    # TODO 2: 电影的平均分查询
    df.groupBy("movie_id").\
        avg("rank").\
        withColumnRenamed("avg(rank)", "avg_rank").\
        withColumn("avg_rank", F.round("avg_rank", 2)).\
        orderBy("avg_rank", ascending=False).\
        show()

    spark.sql("select movie_id, round(avg(rank), 2) as avg_rank from movie group by movie_id order by avg_rank desc").show()

    # TODO 3: 查询大于平均分的电影的数量 # Row
    print("大于平均分电影的数量: {}".format(df.where(df['rank'] > df.select(F.avg(df['rank'])).first()['avg(rank)']).count()))

    spark.sql("select count(*) as cnt from movie where rank > (select avg(rank) from movie)").show()

    # TODO 4: 查询高分电影中(>3)打分次数最多的用户, 此人打分的平均分
    user_id = df.where("rank > 3").\
        groupBy("user_id").\
        count().\
        withColumnRenamed("count", "cnt").\
        orderBy("cnt", ascending=False).\
        limit(1).\
        first()["user_id"]
    print("user_id: {}".format(user_id))

    df.filter(df["user_id"] == user_id).\
        select(F.round(F.avg("rank"), 2)).show()

    spark.sql("select movie.user_id, round(avg(rank), 2) as avg_rank from movie, (select user_id from movie where rank > 3 group by user_id order by count(*) desc limit 1) as u where u.user_id == movie.user_id group by movie.user_id").show()

    # TODO 5: 查询每个用户的平局打分, 最低打分, 最高打分
    df.groupBy("user_id").\
        agg(
        F.round(F.avg("rank"), 2).alias("avg_rank"),
        F.min("rank").alias("min_rank"),
        F.max("rank").alias("max_rank")
    ).orderBy("avg_rank", ascending=False).\
    show()

    spark.sql("select user_id, round(avg(rank), 2) as avg_rank, min(rank) as min_rank, max(rank) as max_rank from movie group by user_id order by avg_rank desc limit 10").show()

    # TODO 6: 查询评分超过100次的电影, 的平均分 排名 TOP10
    df.groupBy("movie_id").\
        agg(
        F.count("movie_id").alias("cnt"),
        F.round(F.avg("rank"), 2).alias("avg_rank")
    ).where("cnt > 100").\
        orderBy("avg_rank", ascending=False).\
        limit(10).\
        show()

    spark.sql("select movie_id, count(rank) as cnt, round(avg(rank), 2) as avg_rank from movie group by movie_id having count(rank) > 100 order by avg_rank desc limit 10").show()

    # time.sleep(10000) 可以打开spark运行地址查看task任务执行情况 127.0.0.1:4040

"""
1. agg: 它是GroupedData对象的API, 作用是 在里面可以写多个聚合
2. alias: 它是Column对象的API, 可以针对一个列 进行改名
3. withColumnRenamed: 它是DataFrame的API, 可以对DF中的列进行改名, 一次改一个列, 改多个列 可以链式调用
4. orderBy: DataFrame的API, 进行排序, 参数1是被排序的列, 参数2是 升序(True) 或 降序 False
5. first: DataFrame的API, 取出DF的第一行数据, 返回值结果是Row对象.
# Row对象 就是一个数组, 你可以通过row['列名'] 来取出当前行中, 某一列的具体数值. 返回值不再是DF 或者GroupedData 或者Column而是具体的值(字符串, 数字等)
"""

3.6 SparkSQL Shuffle 分区数目

在这里插入图片描述

3.7 SparkSQL 数据清洗API

前面我们处理的数据实际上都是已经被处理好的规整数据,但是在大数据整个生产过程中,需要先对数据进行数据清洗,将杂乱无章的数据整理为符合后面处理要求的规整数据。

# coding:utf8

from pyspark.sql import SparkSession


if __name__ == '__main__':
    # 0. 构建执行环境入口对象SparkSession
    spark = SparkSession.builder.\
        appName("test").\
        master("local[*]").\
        config("spark.sql.shuffle.partitions", 2).\
        getOrCreate()
    sc = spark.sparkContext

    """读取数据"""
    df = spark.read.format("csv").\
        option("sep", ";").\
        option("header", True).\
        load("../data/input/sql/people.csv")

    # TODO 1: 数据清洗: 数据去重
    # dropDuplicates 是DataFrame的API, 可以完成数据去重
    # 无参数使用, 对全部的列 联合起来进行比较, 去除重复值, 只保留一条
    df.dropDuplicates().show()

    df.dropDuplicates(['age', 'job']).show()

    # TODO 2: 数据清洗: 缺失值处理
    # dropna api是可以对缺失值的数据进行删除
    # 无参数使用, 只要列中有null 就删除这一行数据
    df.dropna().show()
    # thresh = 3表示, 最少满足3个有效列,  不满足 就删除当前行数据
    df.dropna(thresh=3).show()

    df.dropna(thresh=2, subset=['name', 'age']).show()

    # TODO 3: 缺失值处理也可以完成对缺失值进行填充
    # DataFrame的 fillna 对缺失的列进行填充
    df.fillna("loss").show()

    # 指定列进行填充
    df.fillna("N/A", subset=['job']).show()

    # 设定一个字典, 对所有的列 提供填充规则
    df.fillna({"name": "未知姓名", "age": 1, "job": "worker"}).show()

3.8 DataFrame数据写出

SparkSQL统一API写出DataFrame数据

# coding:utf8
import time

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType
import pandas as pd
from pyspark.sql import functions as F


if __name__ == '__main__':
    # 0. 构建执行环境入口对象SparkSession
    spark = SparkSession.builder.\
        appName("test").\
        master("local[*]").\
        config("spark.sql.shuffle.partitions", 2).\
        getOrCreate()
    sc = spark.sparkContext

    # 1. 读取数据集
    schema = StructType().add("user_id", StringType(), nullable=True). \
        add("movie_id", IntegerType(), nullable=True). \
        add("rank", IntegerType(), nullable=True). \
        add("ts", StringType(), nullable=True)
    df = spark.read.format("csv"). \
        option("sep", "\t"). \
        option("header", False). \
        option("encoding", "utf-8"). \
        schema(schema=schema). \
        load("u.data")

    # 2. Write text 写出, 只能写出一个列的数据, 需要将df转换为单列df
    df.select(F.concat_ws("---", "user_id", "movie_id", "rank", "ts")).\
        write.\
        mode("overwrite").\
        format("text").\
        save("./sql/text")

    # 3. Write csv
    df.write.mode("overwrite").\
        format("csv").\
        option("sep", ";").\
        option("header", True).\
        save("./sql/csv")

    # 4. Write json
    df.write.mode("overwrite").\
        format("json").\
        save("./sql/json")

    # 5. Write parquet
    df.write.mode("overwrite").\
        format("parquet").\
        save("./sql/parquet")

3.9 DataFrame 通过JDBC读写数据库(MySQL示例)

读取JDBC 是需要有驱动的,我们读取的是 jdbc:mysql://

这个协议,也就是读取的是mysql的数据,既然如此,就需要有mysql的驱动jar包给spark程序用.
如果不给驱动jar包,会提示:No suitable Driver
在这里插入图片描述

  • 对于windows系统(使用本地解释器)(以Anaconda环境演示)
    将jar包放在: Anaconda3的安装路径下\envs\虚拟环境\Liblsite-packages\pyspark\jars

  • 对于Linux系统(使用远程解释器执行)(以Anaconda环境演示)
    将jar包放在:Anaconda3的安装路径下/envs/虚拟环境/lib/python3.8/site-packages/pyspark/jars

# coding:utf8
import time

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType
import pandas as pd
from pyspark.sql import functions as F


if __name__ == '__main__':
    # 0. 构建执行环境入口对象SparkSession
    spark = SparkSession.builder.\
        appName("test").\
        master("local[*]").\
        config("spark.sql.shuffle.partitions", 2).\
        getOrCreate()
    sc = spark.sparkContext

    # 1. 读取数据集
    schema = StructType().add("user_id", StringType(), nullable=True). \
        add("movie_id", IntegerType(), nullable=True). \
        add("rank", IntegerType(), nullable=True). \
        add("ts", StringType(), nullable=True)
    df = spark.read.format("csv"). \
        option("sep", "\t"). \
        option("header", False). \
        option("encoding", "utf-8"). \
        schema(schema=schema). \
        load("u.data")

    # TODO 1: 写出df到mysql数据库中
    df.write.mode("overwrite").\
        format("jdbc").\
        option("url", "jdbc:mysql://127.0.0.1:3306/python_learn?useSSL=false&useUnicode=true").\
        option("dbtable", "movie_date").\
        option("user", "root").\
        option("password", "123456").\
        save()

    # TODO 2: 读取MySQL数据
    df2 = spark.read.\
        format("jdbc").\
        option("url", "jdbc:mysql://127.0.0.1:3306/python_learn?useSSL=false&useUnicode=true").\
        option("dbtable", "movie_date").\
        option("user", "root").\
        option("password", "123456").\
        load()
    df2.printSchema()
    df2.show()

"""
JDBC写出, 会自动创建表的.
因为DataFrame中有表结构信息, StructType记录的 各个字段的 名称 类型  和是否运行为空
"""

3.10 总结

  1. DataFrame 在结构层面上由StructField组成列描述,由StructType构造表描述。在数据层面上,Column对象记录列数据,Row对象记录行数据
  2. DataFrame可以从RDD转换、Pandas DF转换、读取文件、读取JDBC等方法构建
  3. spark.read.format()和df.write.format() 是DataFrame读取和写出的统一化标准API
  4. SparkSQL默认在Shuffle阶段200个分区,可以修改参数获得最好性能
  5. dropDuplicates可以去重、dropna可以删除缺失值、fillna可以填充缺失值
  6. SparkSQL支持JDBC读写,可用标准API对数据库进行读写操作

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/205827.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

压力测试+接口测试

jmeter是apache公司基于java开发的一款开源压力测试工具,体积小,功能全,使用方便,是一个比较轻量级的测试工具,使用起来非常简单。因 为jmeter是java开发的,所以运行的时候必须先要安装jdk才可以。jmeter是…

记i18n ally工具检测语言失败的一则思路

情况 只有某个文件检测不到汉字,其余都可以检测出来,困扰许久,发个博客记一下思路 解决方法: 1、肯定不是i18n ally工具的问题,因为其他的vue都能检测成功 2、是这个文件的问题 采用排除法 先删掉所有代码&#…

容器有挂载目录的时候,容器反向生成为镜像,挂载的内容不会保留。只有实打实拷贝进容器的反向生成镜像才会保留。

无容器目录挂载 1、也就是说宿主机未与容器进行路径映射,故我们可以直接使用指令: docker commit 容器名称/容器ID 像名:标签号,把容器保存为镜像; (其中镜像名和标签号是我们随机取的,新镜像名以及我们的标签号!) 2、我们在不能判断容器与宿…

甘草书店记:2023年10月24日 星期二 「在完美和高效之间寻求平衡」

书店装修设计图出了第一版,不能够完全满意也在不在预料之外。 中国人的哲学是中庸的哲学。在高效中去追逐完美,在追逐完美中提升效率。 分享余华先生在节目中的一段话: 一种阅读和一本书的相遇有时候也是一种缘分。但也可能就是所有人都说…

Diffusion:通过扩散和逆扩散过程生成图像的生成式模型

在当今人工智能大火的时代,AIGC 可以帮助用户完成各种任务。作为 AIGC 主流模型的 DDPM,也时常在各种论文中被提起。DDPM 本质就是一种扩散模型,可以用来生成图片或者为图片去噪。 扩散模型定义了一个扩散的马尔科夫过程,每一步逐…

【C++】了解模板

这里是目录 前言函数模板函数模板的实例化类模板 前言 如果我们要交换两个数字,那么我们就需要写一个Swap函数来进行交换,那如果我们要交换char类型的数据呢?那又要写一份Swap的函数重载,参数的两个类型是char,那我们…

Rocketmq架构

NameServer:作为注册中心,提供路由注册、路由踢出、路由发现功能,舍弃强一致,保证高可用,集群中各个节点不会实时通讯,其中一个节点下线之后,会提供另外一个节点保证路由功能。 Rocket mq name…

AI视觉识别有哪些工业应用

AI视觉识别,主要是利用人工智能算法对图像或视频数据进行分析和处理,以提取关键信息并执行筛选、判断、预警等任务。AI视觉识别涵盖多种应用,如人脸识别、目标检测和识别、图像分割、行为识别、视频分析等。本篇就简单介绍一下AI视觉识别的应…

传智杯第五届题解

B.莲子的机械动力学 分析&#xff1a;这题有个小坑&#xff0c;如果是00 0&#xff0c;结果记得要输出0。 得到的教训是&#xff0c;避免前导0出现时&#xff0c;要注意答案为0的情况。否则有可能会没有输出 #include<assert.h> #include<cstdio> #include<…

J签证、移民、绿卡都是怎么回事?

随着全球化的不断推进&#xff0c;越来越多的人开始关注国际间的移民与签证政策&#xff0c;其中包括J签证、移民以及绿卡的申请问题。本文将简要介绍J签证、移民绿卡的基本概念&#xff0c;并提供相关申请的一般步骤&#xff0c;以帮助读者更好地了解这些程序。 首先&#xff…

【raect.js + hooks】useRef 搭配 Houdini 创造 useRipple

水波纹点击特效 really cool&#xff0c;实现水波纹的方案也有很多&#xff0c;笔者经常使用 material 组件&#xff0c;非常喜欢 mui 中的 ripple&#xff0c;他家的 ripple 特效就是通过 css Houdini 实现的。 今天&#xff0c;我们将复刻一个 ripple&#xff0c;并封装成 ho…

vue3 router-view 使用keep-alive报错parentcomponent.ctx.deactivate is not a function

问题 如下图&#xff0c;在component组件上添加v-if判断&#xff0c;会报错: parentcomponent.ctx.deactivate is not a function 解决方法 去除v-if&#xff0c;将key直接添加上。由于有的公用页面&#xff0c;需要刷新&#xff0c;不希望缓存&#xff0c;所以需要添加key…

【23真题】快跑,考太偏了这所211!

今天分享的是23年湖南师范997的信号与系统试题及解析。 小马哥Tips&#xff1a; 本套试卷难度分析&#xff1a;22年湖南师范997考研真题&#xff0c;我也发布过&#xff0c;若有需要&#xff0c;戳这里自取&#xff01;本套试题难度中等&#xff0c;题量适中&#xff0c;但是…

升级python后sudo apt-get update报错

sudo apt-get update 报错&#xff1a; sh: /usr/lib/cnf-update-db: /usr/bin/python3.7.5: bad interpreter: No such file or directory Reading package lists... Done E: Problem executing scripts APT::Update::Post-Invoke-Success if /usr/bin/test -w /var/lib/c…

CANDENCE: PCB 如何高亮网络、器件

PCB 如何高亮网络、器件 开始前先学习一个单词&#xff1a;assign CANDECE 高亮网络 step1: 选择一个颜色&#xff1a;红色 step2: 筛选要高亮什么&#xff1a;网络 or 器件&#xff0c;这里选择网络。 step3&#xff1a;鼠标点击要高亮的网络&#xff1a; 这里是GND 这里…

罐装葡萄酒会成为主流吗?

许多人认为罐装葡萄酒可能是葡萄酒行业的下一个大事件&#xff0c;一个有待提出的问题&#xff0c;罐装葡萄酒会成为主流吗&#xff1f;来自云仓酒庄品牌雷盛红酒分享还是这种形式的基础永远会限制它的吸引力&#xff1f;在这里&#xff0c;我们一起来探讨支持和反对罐装葡萄酒…

力扣题:单词-11.20

力扣题-11.20 [力扣刷题攻略] Re&#xff1a;从零开始的力扣刷题生活 力扣题1&#xff1a;58. 最后一个单词的长度 解题思想&#xff1a;按空格划分&#xff0c;然后统计单词长度即可 class Solution(object):def lengthOfLastWord(self, s):""":type s: str…

Java——TreeSet用法

Java——TreeSet TreeSet 是 Java 中的一个有序集合类&#xff0c;它基于红黑树&#xff08;Red-Black Tree&#xff09;实现。 下面详细介绍 TreeSet 的用法和特点&#xff1a; 有序性&#xff1a;TreeSet 中的元素按照自然顺序或者通过自定义的比较器进行排序。它保证了元素…

网工内推 | 云计算运维,云相关认证优先,最高30K,带薪年假

01 安畅网络 招聘岗位&#xff1a;云计算运维工程师 职责描述&#xff1a; 1、负责对公有云平台的计算、存储、网络资源等IAAS/SAAS/PAAS层产品组件日常交付部署运维工作&#xff0c;包括调试、配置、维护、监控、优化等工作&#xff1b; 2、负责对操作系统及应用日常运行维护…

OSG编程指南<十七>:OSG光照与材质

1、OSG光照 OSG 全面支持 OpenGL 的光照特性&#xff0c;包括材质属性&#xff08;material property&#xff09;、光照属性&#xff08;light property&#xff09;和光照模型&#xff08;lighting model&#xff09;。与 OpenGL 相似&#xff0c;OSG 中的光源也是不可见的&a…