spark 数据的加载和保存(Parquet、JSON、CSV、MySql)

spark数据的加载和保存
SparkSQL 默认读取和保存的文件格式为 parquet
1.加载数据
spark.read.load 是加载数据的通用方法

scala> spark.read.
csv format jdbc json load option options orc parquet schema 
table text textFile
如果读取不同格式的数据,可以对不同的数据格式进行设定
scala> spark.read.format("…")[.option("…")].load("…")

  format("…"):指定加载的数据类型,包括"csv""jdbc""json""orc""parquet""textFile"。

  load("…"):在"csv""jdbc""json""orc""parquet""textFile"格式下需要传入加载
数据的路径。

  option("…"):在"jdbc"格式下需要传入 JDBC 相应参数,url、user、password 和 dbtable
前面都是使用 read API 先把文件加载到 DataFrame 然后再查询,也可以直接在文件上进行查询: 文件格式.`文件路径`

scala>spark.sql("select * from json.`/opt/tmp/data/test.json`").show

2.保存数据
df.write.save 是保存数据的通用方法

scala>df.write.
csv jdbc json orc parquet textFile…
scala>df.write.format("…")[.option("…")].save("…")

format("…"):指定保存的数据类型,包括"csv""jdbc""json""orc""parquet""textFile"等

save ("…"):在"csv""orc""parquet""textFile"格式下需要传入保存数据的路径。

option("…"):在"jdbc"格式下需要传入 JDBC 相应参数,url、user、password 和 dbtable保存操作可以使用 SaveMode, 用来指明如何处理数据,使用 mode()方法来设置。

SaveMode 是一个枚举类,其中的常量包括:
在这里插入图片描述
保存:

df.write.mode("append").json("/opt/tmp/data/output")

Parquet
Spark SQL 的默认数据源为 Parquet 格式。Parquet 是一种能够有效存储嵌套数据的列式
存储格式。
数据源为 Parquet 文件时,Spark SQL 可以方便的执行所有的操作,不需要使用 format。
修改配置项 spark.sql.sources.default,可修改默认数据源格式。
1.加载数据

scala> val df = spark.read.load("tmp/resources/test.parquet")
scala> df.show

2.保存数据

scala> var df = spark.read.json("/opt/tmp/data/input/test.json")
//保存为 parquet 格式
scala> df.write.mode("append").save("/opt/tmp/data/output")

JSON
Spark SQL 能够自动推测 JSON 数据集的结构,并将它加载为一个 Dataset[Row]. 可以通过 SparkSession.read.json()去加载 JSON 文件。
注意:Spark 读取的 JSON 文件不是传统的 JSON 文件,每一行都应该是一个 JSON 串。格式如下:

{"name":"zhangsan"}
{"name":"lisi""age":21}
[{"name":"wangwu""age":21},{"name":"xiaohua""age":22}]

1.导入隐式转换

import spark.implicits.

2.加载 JSON 文件

val path = "/opt/tmp/spark-test/test.json"
val peopleDF = spark.read.json(path)

3.创建临时表

peopleDF.createOrReplaceTempView("people")

4.数据查询

val teenagerNamesDF = spark.sql("SELECT name FROM test WHERE age BETWEEN 20 AND 22")teenagerNamesDF.show()

CSV
Spark SQL 可以配置 CSV 文件的列表信息,读取 CSV 文件,CSV 文件的第一行设置为数据列

spark.read.format("csv").option("sep", ";").option("inferSchema", "true").option("header", "true").load("data/test.csv")

MySQL
Spark SQL 可以通过 JDBC 从关系型数据库中读取数据的方式创建 DataFrame,通过对
DataFrame 一系列的计算后,还可以将数据再写回关系型数据库中。如果使用 spark-shell 操
作,可在启动 shell 时指定相关的数据库驱动路径或者将相关的数据库驱动放到 spark 的类
路径下

bin/spark-shell 
--jars mysql-connector-java-5.1.27-bin.jar

Idea 中通过 JDBC 对 Mysql 进行操作
1.导入依赖

<dependency>
 <groupId>mysql</groupId>
 <artifactId>mysql-connector-java</artifactId>
 <version>5.1.27</version>
</dependency>

2.读取数据(mysql)

package SparkTest.sparkmysql

import java.util.Properties

import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}

object SparkMysqlRead {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new
        SparkConf().setMaster("local[*]").setAppName("SparkSQL")
    //创建 SparkSession 对象
    val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
    import spark.implicits._
    
    //方式 1:通用的 load 方法读取
    spark.read.format("jdbc")
      .option("url", "jdbc:mysql://192.168.58.203:3306/testdb")
      .option("driver", "com.mysql.jdbc.Driver")
      .option("user", "root")
      .option("password", "123")
      .option("dbtable", "user_test")
      .load().show()

   //方式 2:通用的 load 方法读取 参数另一种形式
  spark.read.format("jdbc")
    .options(Map("url"->"jdbc:mysql://192.168.58.203:3306/testdb?user=root&password=123", "dbtable"->"user_test","driver"->"com.mysql.jdbc.Driver")).load().show()

  //方式 3:使用 jdbc 方法读取
  val props: Properties = new Properties()
  props.setProperty("user", "root")
  props.setProperty("password", "123")
  val df: DataFrame = spark.read.jdbc("jdbc:mysql://192.168.58.203:3306/testdb", "user_test", props)
  df.show()
  
  //释放资源
    spark.stop()
  }
}

3.写入数据(mysql)

package SparkTest.sparkmysql

import java.util.Properties

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Dataset, SaveMode, SparkSession}

object SparkMysqlWrite {
  case class User(name: String, age: Long)
  def main(args: Array[String]): Unit = {

    val conf: SparkConf = new
        SparkConf().setMaster("local[*]").setAppName("SparkSQL")
    //创建 SparkSession 对象
    val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
    import spark.implicits._
    val rdd: RDD[User] = spark.sparkContext.makeRDD(List(User("zhangsan", 20), User("lisi", 21)))
    val ds: Dataset[User] = rdd.toDS()


    //方式 1:通用的方式 format 指定写出类型
    ds.write
      .format("jdbc")
      .option("url", "jdbc:mysql://192.168.58.203:3306/testdb")
      .option("user", "root")
      .option("password", "123")
      .option("dbtable", "user1")
      .mode(SaveMode.Append)
      .save()

    //方式 2:通过 jdbc 方法
    val props: Properties = new Properties()
    props.setProperty("user", "root")
    props.setProperty("password", "123")
    ds.write.mode(SaveMode.Append).jdbc("jdbc:mysql://192.168.58.203:3306/testdb", "user2", props)
    
    //释放资源
    spark.stop()

  }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/16771.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

后端要一次性返回我10万条数据

问题描述 面试官&#xff1a;后端一次性返回10万条数据给你&#xff0c;你如何处理&#xff1f;我&#xff1a;歪嘴一笑&#xff0c;what the f**k! 问题考察点 看似无厘头的问题&#xff0c;实际上考查候选人知识的广度和深度&#xff0c;虽然在工作中这种情况很少遇到... …

情景剧本杀闯关系统

情景剧本杀闯关软件的开发需求通常包括以下几个方面&#xff1a; 剧本设计&#xff1a;开发者需要根据用户需求和市场调研&#xff0c;设计不同主题和难度等级的剧本内容&#xff0c;以及游戏过程中的任务、角色和道具等。 游戏引擎开发&#xff1a;为了实现游戏过程中…

如何在 DigitalOcean 中部署 ONLYOFFICE 文档

现在您可使用通过 DigitalOcean 市场提供的一键式应用在 DigitalOcean 云架构中轻松部署 Docker 版本的 ONLYOFFICE 文档。 一键式应用是一个包含所有必要预配置组件的镜像&#xff0c;可用于便捷地在运行有 Ubuntu OS 的 DigitalOcean 服务器上部署 ONLYOFFICE&#xff1a; D…

Azure DevOps Server 2022.0.1升级手册

Contents 1. 概述2. 操作方法 2.1 安装操作系统2.2 安装数据库2.4 还原数据2.3 安装和配置Azure DevOps Server 1. 概述 Azure DevOps Server 是微软公司经过20多年的持续开发&#xff0c;逐渐将需求管理、敏捷实践、源代码管理、持续集成等功能集成一体&#xff0c;实现应用软…

B-Tree (多路查找树)分析-20230503

B-Tree (多路查找树)学习-20230503 前言 B-树是一类多路查询树&#xff0c;它主要用于文件系统和某些数据库的索引&#xff0c;如果采用二叉平衡树访问文件里面的数据&#xff0c;最坏情况下&#xff0c;磁头可能需要进行O(h)次对磁盘的读写&#xff0c;其中h为树的高度&…

微服务不是本地部署的最佳选择,不妨试试模块化单体

微服务仅适用于成熟产品 关于从头开始使用微服务&#xff0c;马丁・福勒&#xff08;Martin Fowler&#xff09;总结道&#xff1a; 1. 几乎所有成功的微服务都是从一个过于庞大而不得不拆分的单体应用开始的。 2. 几乎所有从头开始以微服务构建的系统&#xff0c;最后都会因…

Java 反射机制

目录 一、反射机制概述 二、理解并获取Class实例 三、反射的用法 1. 通过反射创建运行时类的对象 2. 通过反射获取运行时类的属性结构 3. 通过反射获取运行时类的方法结构 4. 通过反射获取运行时类的构造器结构 5. 通过反射获取运行时类的父类 6. 通过反射获取运行时类…

DDD系列:三、Repository模式

为什么需要Repository&#xff1f; ​ Anemic Domain Model&#xff08;贫血领域模型&#xff09;特征&#xff1a; 有大量的XxxDO对象&#xff1a;这里DO虽然有时候代表了Domain Object&#xff0c;但实际上仅仅是数据库表结构的映射&#xff0c;里面没有包含&#xff08;或…

Midjourney之logo设计(建议收藏)

目录 宠物诊所的logo设计 常见的Logo类型 图形logo: 字母LOGO APP LOGO 进阶技巧 设置艺术家风格 去掉不需要的元素 ChatGPT Midjourney设计logo 聊天&#xff08;国产&#xff09;&#xff1a;文心一言通义千问 绘图&#xff08;国产&#xff09; UI设计 ChatGP…

【谷粒商城之服务认证OAuth2.0】

本笔记内容为尚硅谷谷粒商城服务认证OAuth2.0部分 目录 一、OAuth 2.0 二、微博登录测试 1、微博登陆准备工作 2、获取微博Access Token 3、登录测试 1.添加HttpUtils工具类 2.controller 3.service 4.vo 总结 一、OAuth 2.0 OAuth&#xff1a; OAuth&#xff08;开…

Java多线程深入探讨

1. 线程与进程2. 创建和管理线程2.1. 继承Thread类2.2. 实现Runnable接口2.3 利用Callable、FutureTask接口实现。2.4 Thread的常用方法 3. 线程同步3.1. synchronized关键字3.1.1同步代码块&#xff1a;3.1.2 同步方法&#xff1a; 3.2. Lock接口 4. 线程间通信5. 线程池5.1 使…

【Linux】管道

目录 一、前言 二、管道 1、匿名管道 1.1、基本原理 1.2、代码实现 1.3、管道的特点 1.4、基于管道的简单设计 2、命名管道 2.1、匿名管道与命名管道的区别 2.2、代码实现命名管道通信 一、前言 为了满足各种需求&#xff0c;进程之间是需要通信的。进程间通信的主要目…

python函数的递归调用

引入 函数既可以嵌套定义也可以嵌套调用。嵌套定义指的是在定义一个函数时在该函数内部定义另一个函数&#xff1b;嵌套调用指的是在调用一个函数的过程中函数内部有调用另一个函数。而函数的递归调用指的是在调用一个函数的过程中又直接或者间接的调用该函数本身。 函数递归…

PySpark基础入门(3):RDD持久化

RDD的持久化 RDD 的数据是过程数据&#xff0c;因此需要持久化存储&#xff1b; RDD之间进行相互迭代的计算&#xff0c;新的RDD的生成代表着旧的RDD的消失&#xff1b;这样的特性可以最大化地利用资源&#xff0c;老旧地RDD可以及时地从内存中清理&#xff0c;从而给后续地计…

aop切面调用失效问题排查

应用里有较多的地方访问外部供应商接口&#xff0c;由于公网网络不稳定或者外部接口不稳定&#xff08;重启&#xff0c;发版&#xff0c;ip切换&#xff09;的原因&#xff0c;经常报502或者504错误。为了解决HTTP调用的500报错&#xff0c;选择使用spring的retryable注解进行…

Pyinstaller将python文件打包成exe程序——封装LoFTR开源匹配代码

Pyinstaller将python文件打包成exe程序——封装LoFTR开源匹配代码 1.LoFTR代码下载及环境搭建 源码下载&#xff1a;https://github.com/bodhisatan/LoFTR-Stitch 环境搭建&#xff1a;按照github项目中的readme文档进行搭建即可&#xff0c;几乎没有遇到问题&#xff0c;代码…

【Unity入门】22.动态创建实例

【Unity入门】动态创建实例 大家好&#xff0c;我是Lampard~~ 欢迎来到Unity入门系列博客&#xff0c;所学知识来自B站阿发老师~感谢 &#xff08;一&#xff09;脚本实例化预制体对象 &#xff08;1&#xff09;Instantiate克隆创建对象 昨天我们学习了预制体这个概念&#…

文献阅读(50)—— Transformer 用于肺癌诊断预测

文献阅读&#xff08;50&#xff09;—— Transformer 用于肺癌诊断预测 文章目录 文献阅读&#xff08;50&#xff09;—— Transformer 用于肺癌诊断预测先验知识/知识拓展文章结构背景文章方法1. 文章核心网络结构2. Time Encoding ViT &#xff08;TeViT&#xff09;3. Tim…

力扣刷题2023-05-04-1——题目:2614. 对角线上的质数

题目&#xff1a; 给你一个下标从 0 开始的二维整数数组 nums 。 返回位于 nums 至少一条 对角线 上的最大 质数 。如果任一对角线上均不存在质数&#xff0c;返回 0 。 注意&#xff1a; 如果某个整数大于 1 &#xff0c;且不存在除 1 和自身之外的正整数因子&#xff0c;…

Leetcode——66. 加一

&#x1f4af;&#x1f4af;欢迎来到的热爱编程的小K的Leetcode的刷题专栏 文章目录 1、题目2、暴力模拟(自己的第一想法)3、官方题解 1、题目 给定一个由 整数 组成的 非空 数组所表示的非负整数&#xff0c;在该数的基础上加一。最高位数字存放在数组的首位&#xff0c; 数组…
最新文章