Pandas 与 PySpark 强强联手,功能与速度齐飞

Pandas做数据处理可以说是yyds!而它的缺点也是非常明显,Pandas 只能单机处理,它不能随数据量线性伸缩。例如,如果 pandas 试图读取的数据集大于一台机器的可用内存,则会因内存不足而失败。

另外 pandas 在处理大型数据方面非常慢,虽然有像Dask 或 Vaex 等其他库来优化提升数据处理速度,但在大数据处理神之框架Spark面前,也是小菜一碟。

幸运的是,在新的 Spark 3.2 版本中,出现了一个新的Pandas API,将pandas大部分功能都集成到PySpark中,使用pandas的接口,就能使用Spark,因为 Spark 上的 Pandas API 在后台使用 Spark,这样就能达到强强联手的效果,可以说是非常强大,非常方便。

Spark 现在集成了 Pandas API,因此可以在 Spark 上运行 Pandas。只需要更改一行代码:

import pyspark.pandas as ps  

由此我们可以获得诸多的优势:

  • 如果我们熟悉使用Python 和 Pandas,但不熟悉 Spark,可以省略了需复杂的学习过程而立即使用PySpark。

  • 可以为所有内容使用一个代码库:无论是小数据和大数据,还是单机和分布式机器。

  • 可以在Spark分布式框架上,更快地运行 Pandas 代码。

最后一点尤其值得注意。

一方面,可以将分布式计算应用于在 Pandas 中的代码。且借助 Spark 引擎,代码即使在单台机器上也会更快!下图展示了在一台机器(具有 96 个 vCPU 和 384 GiBs 内存)上运行 Spark 和单独调用 pandas 分析 130GB 的 CSV 数据集的性能对比。

技术提升

本文由技术群粉丝分享,项目源码、数据、技术交流提升,均可加交流群获取,群友已超过2000人,添加时最好的备注方式为:来源+兴趣方向,方便找到志同道合的朋友

方式①、添加微信号:pythoner666,备注:来自CSDN
方式②、微信搜索公众号:Python学习与数据挖掘,后台回复:加群

多线程和 Spark SQL Catalyst Optimizer 都有助于优化性能。例如,Join count 操作在整个阶段代码生成时快 4 倍:没有代码生成时为 5.9 秒,代码生成时为 1.6 秒。

Spark 在链式操作(chaining operations)中具有特别显着的优势。Catalyst 查询优化器可以识别过滤器以明智地过滤数据并可以应用基于磁盘的连接(disk-based joins),而 Pandas 倾向于每一步将所有数据加载到内存中。

现在是不是迫不及待的想尝试如何在 Spark 上使用 Pandas API 编写一些代码?我们现在就开始吧!

在 Pandas / Pandas-on-Spark / Spark 之间切换

需要知道的第一件事是我们到底在使用什么。在使用 Pandas 时,使用类pandas.core.frame.DataFrame。在 Spark 中使用 pandas API 时,使用pyspark.pandas.frame.DataFrame。虽然两者相似,但不相同。主要区别在于前者在单机中,而后者是分布式的。

可以使用 Pandas-on-Spark 创建一个 Dataframe 并将其转换为 Pandas,反之亦然:

# import Pandas-on-Spark   
import pyspark.pandas as ps  
  
# 使用 Pandas-on-Spark 创建一个 DataFrame   
ps_df = ps.DataFrame(range(10))  
  
# 将 Pandas-on-Spark Dataframe 转换为 Pandas Dataframe   
pd_df = ps_df.to_pandas()  
  
# 将 Pandas Dataframe 转换为 Pandas-on-Spark Dataframe   
ps_df = ps.from_pandas(pd_df)  

注意,如果使用多台机器,则在将 Pandas-on-Spark Dataframe 转换为 Pandas Dataframe 时,数据会从多台机器传输到一台机器,反之亦然(可参阅PySpark 指南[1])。

还可以将 Pandas-on-Spark Dataframe 转换为 Spark DataFrame,反之亦然:

# 使用 Pandas-on-Spark 创建一个 DataFrame   
ps_df = ps.DataFrame(range(10))  
  
# 将 Pandas-on-Spark Dataframe 转换为 Spark Dataframe   
spark_df = ps_df.to_spark()  
  
# 将 Spark Dataframe 转换为 Pandas-on-Spark Dataframe   
ps_df_new = spark_df.to_pandas_on_spark()  

数据类型如何改变?

在使用 Pandas-on-Spark 和 Pandas 时,数据类型基本相同。将 Pandas-on-Spark DataFrame 转换为 Spark DataFrame 时,数据类型会自动转换为适当的类型(请参阅PySpark 指南[2])

下面的示例显示了在转换时是如何将数据类型从 PySpark DataFrame 转换为 pandas-on-Spark DataFrame。

>>> sdf = spark.createDataFrame([  
...     (1, Decimal(1.0), 1., 1., 1, 1, 1, datetime(2020, 10, 27), "1", True, datetime(2020, 10, 27)),  
... ], 'tinyint tinyint, decimal decimal, float float, double double, integer integer, long long, short short, timestamp timestamp, string string, boolean boolean, date date')  
>>> sdf  
DataFrame[tinyint: tinyint, decimal: decimal(10,0),  
float: float, double: double, integer: int,  
long: bigint, short: smallint, timestamp: timestamp,   
string: string, boolean: boolean, date: date]
psdf = sdf.pandas_api()  
psdf.dtypes  
tinyint                int8  
decimal              object  
float               float32  
double              float64  
integer               int32  
long                  int64  
short                 int16  
timestamp    datetime64[ns]  
string               object  
boolean                bool  
date                 object  
dtype: object

Pandas-on-Spark vs Spark 函数

在 Spark 中的 DataFrame 及其在 Pandas-on-Spark 中的最常用函数。注意,Pandas-on-Spark 和 Pandas 在语法上的唯一区别就是 import pyspark.pandas as ps 一行。

当你看完如下内容后,你会发现,即使您不熟悉 Spark,也可以通过 Pandas API 轻松使用。

导入库

# 运行Spark  
from pyspark.sql import SparkSession  
spark = SparkSession.builder \  
          .appName("Spark") \  
          .getOrCreate()  
# 在Spark上运行Pandas  
import pyspark.pandas as ps  

读取数据

以 old dog iris 数据集为例。

# SPARK   
sdf = spark.read.options(inferSchema='True',   
              header='True').csv('iris.csv')  
# PANDAS-ON-SPARK   
pdf = ps.read_csv('iris.csv')  

选择

# SPARK   
sdf.select("sepal_length","sepal_width").show()  
# PANDAS-ON-SPARK   
pdf[["sepal_length","sepal_width"]].head()  

删除列

# SPARK   
sdf.drop('sepal_length').show()# PANDAS-ON-SPARK   
pdf.drop('sepal_length').head()  

删除重复项

# SPARK   
sdf.dropDuplicates(["sepal_length","sepal_width"]).show()  
# PANDAS-ON-SPARK   
pdf[["sepal_length", "sepal_width"]].drop_duplicates()  

筛选

# SPARK   
sdf.filter( (sdf.flower_type == "Iris-setosa") & (sdf.petal_length > 1.5) ).show()  
# PANDAS-ON-SPARK   
pdf.loc[ (pdf.flower_type == "Iris-setosa") & (pdf.petal_length > 1.5) ].head()  

计数

# SPARK   
sdf.filter(sdf.flower_type == "Iris-virginica").count()  
# PANDAS-ON-SPARK   
pdf.loc[pdf.flower_type == "Iris-virginica"].count()  

唯一值

# SPARK   
sdf.select("flower_type").distinct().show()  
# PANDAS-ON-SPARK   
pdf["flower_type"].unique()  

排序

# SPARK   
sdf.sort("sepal_length", "sepal_width").show()  
# PANDAS-ON-SPARK   
pdf.sort_values(["sepal_length", "sepal_width"]).head()  

分组

# SPARK   
sdf.groupBy("flower_type").count().show()  
# PANDAS-ON-SPARK   
pdf.groupby("flower_type").count()  

替换

# SPARK   
sdf.replace("Iris-setosa", "setosa").show()  
# PANDAS-ON-SPARK   
pdf.replace("Iris-setosa", "setosa").head()  

连接

#SPARK   
sdf.union(sdf)  
# PANDAS-ON-SPARK   
pdf.append(pdf)  

transform 和 apply 函数应用

有许多 API 允许用户针对 pandas-on-Spark DataFrame 应用函数,例如:

DataFrame.transform()   
DataFrame.apply()  
DataFrame.pandas_on_spark.transform_batch()    
DataFrame.pandas_on_spark.apply_batch()    
Series.pandas_on_spark.transform_batch()  

每个 API 都有不同的用途,并且在内部工作方式不同。

transform 和 apply

DataFrame.transform()DataFrame.apply()之间的主要区别在于,前者需要返回相同长度的输入,而后者不需要。

# transform  
psdf = ps.DataFrame({'a': [1,2,3], 'b':[4,5,6]})  
def pandas_plus(pser):  
    return pser + 1  # 应该总是返回与输入相同的长度。  
  
psdf.transform(pandas_plus)  
  
# apply  
psdf = ps.DataFrame({'a': [1,2,3], 'b':[5,6,7]})  
def pandas_plus(pser):  
    return pser[pser % 2 == 1]  # 允许任意长度  
  
psdf.apply(pandas_plus)  

在这种情况下,每个函数采用一个 pandas Series,Spark 上的 pandas API 以分布式方式计算函数,如下所示。

在“列”轴的情况下,该函数将每一行作为一个熊猫系列。

psdf = ps.DataFrame({'a': [1,2,3], 'b':[4,5,6]})  
def pandas_plus(pser):  
    return sum(pser)  # 允许任意长度  
psdf.apply(pandas_plus, axis='columns')  

上面的示例将每一行的总和计算为pands Series

pandas_on_spark.transform_batchpandas_on_spark.apply_batch

batch 后缀表示 pandas-on-Spark DataFrame 或 Series 中的每个块。API 对 pandas-on-Spark DataFrame 或 Series 进行切片,然后以 pandas DataFrame 或 Series 作为输入和输出应用给定函数。请参阅以下示例:

psdf = ps.DataFrame({'a': [1,2,3], 'b':[4,5,6]})  
def pandas_plus(pdf):  
    return pdf + 1  # 应该总是返回与输入相同的长度。  
  
psdf.pandas_on_spark.transform_batch(pandas_plus)  
  
psdf = ps.DataFrame({'a': [1,2,3], 'b':[4,5,6]})  
def pandas_plus(pdf):  
    return pdf[pdf.a > 1]  # 允许任意长度  
  
psdf.pandas_on_spark.apply_batch(pandas_plus)  

两个示例中的函数都将 pandas DataFrame 作为 pandas-on-Spark DataFrame 的一个块,并输出一个 pandas DataFrame。Spark 上的 Pandas API 将 pandas 数据帧组合为 pandas-on-Spark 数据帧。

在 Spark 上使用 pandas API的注意事项

避免shuffle

某些操作,例如sort_values在并行或分布式环境中比在单台机器上的内存中更难完成,因为它需要将数据发送到其他节点,并通过网络在多个节点之间交换数据。

避免在单个分区上计算

另一种常见情况是在单个分区上进行计算。目前, DataFrame.rank 等一些 API 使用 PySpark 的 Window 而不指定分区规范。这会将所有数据移动到单个机器中的单个分区中,并可能导致严重的性能下降。对于非常大的数据集,应避免使用此类 API。

不要使用重复的列名

不允许使用重复的列名,因为 Spark SQL 通常不允许这样做。Spark 上的 Pandas API 继承了这种行为。例如,见下文:

import pyspark.pandas as ps  
psdf = ps.DataFrame({'a': [1, 2], 'b':[3, 4]})  
psdf.columns = ["a", "a"]  
Reference 'a' is ambiguous, could be: a, a.;

此外,强烈建议不要使用区分大小写的列名。Spark 上的 Pandas API 默认不允许它。

import pyspark.pandas as ps  
psdf = ps.DataFrame({'a': [1, 2], 'A':[3, 4]})  
Reference 'a' is ambiguous, could be: a, a.;

但可以在 Spark 配置spark.sql.caseSensitive中打开以启用它,但需要自己承担风险。

from pyspark.sql import SparkSession  
builder = SparkSession.builder.appName("pandas-on-spark")  
builder = builder.config("spark.sql.caseSensitive", "true")  
builder.getOrCreate()  
  
import pyspark.pandas as ps  
psdf = ps.DataFrame({'a': [1, 2], 'A':[3, 4]})  
psdf  
   a  A  
0  1  3  
1  2  4

使用默认索引

pandas-on-Spark 用户面临的一个常见问题是默认索引导致性能下降。当索引未知时,Spark 上的 Pandas API 会附加一个默认索引,例如 Spark DataFrame 直接转换为 pandas-on-Spark DataFrame。

如果计划在生产中处理大数据,请通过将默认索引配置为distributeddistributed-sequence来使其确保为分布式。

有关配置默认索引的更多详细信息,请参阅默认索引类型[3]。

在 Spark 上使用 pandas API

尽管 Spark 上的 pandas API 具有大部分与 pandas 等效的 API,但仍有一些 API 尚未实现或明确不受支持。因此尽可能直接在 Spark 上使用 pandas API。

例如,Spark 上的 pandas API 没有实现__iter__(),阻止用户将所有数据从整个集群收集到客户端(驱动程序)端。不幸的是,许多外部 API,例如 min、max、sum 等 Python 的内置函数,都要求给定参数是可迭代的。对于 pandas,它开箱即用,如下所示:

>>> import pandas as pd  
>>> max(pd.Series([1, 2, 3]))  
3  
>>> min(pd.Series([1, 2, 3]))  
1  
>>> sum(pd.Series([1, 2, 3]))  
6  

Pandas 数据集存在于单台机器中,自然可以在同一台机器内进行本地迭代。但是,pandas-on-Spark 数据集存在于多台机器上,并且它们是以分布式方式计算的。很难在本地迭代,很可能用户在不知情的情况下将整个数据收集到客户端。因此,最好坚持使用 pandas-on-Spark API。上面的例子可以转换如下:

>>> import pyspark.pandas as ps  
>>> ps.Series([1, 2, 3]).max()  
3  
>>> ps.Series([1, 2, 3]).min()  
1  
>>> ps.Series([1, 2, 3]).sum()  
6  

pandas 用户的另一个常见模式可能是依赖列表推导式或生成器表达式。但是,它还假设数据集在引擎盖下是本地可迭代的。因此,它可以在 pandas 中无缝运行,如下所示:

import pandas as pd  
data = []  
countries = ['London', 'New York', 'Helsinki']  
pser = pd.Series([20., 21., 12.], index=countries)  
for temperature in pser:  
     assert temperature > 0  
     if temperature > 1000:  
         temperature = None  
     data.append(temperature ** 2)  
  
pd.Series(data, index=countries)  
London      400.0  
New York    441.0  
Helsinki    144.0  
dtype: float64

但是,对于 Spark 上的 pandas API,它的工作原理与上述相同。上面的示例也可以更改为直接使用 pandas-on-Spark API,如下所示:

import pyspark.pandas as ps  
import numpy as np  
countries = ['London', 'New York', 'Helsinki']  
psser = ps.Series([20., 21., 12.], index=countries)  
def square(temperature) -> np.float64:  
     assert temperature > 0  
     if temperature > 1000:  
         temperature = None  
     return temperature ** 2  
  
psser.apply(square)  
London      400.0  
New York    441.0  
Helsinki    144.0

减少对不同 DataFrame 的操作

Spark 上的 Pandas API 默认不允许对不同 DataFrame(或 Series)进行操作,以防止昂贵的操作。只要有可能,就应该避免这种操作。

写在最后

到目前为止,我们将能够在 Spark 上使用 Pandas。这将会导致Pandas 速度的大大提高,迁移到 Spark 时学习曲线的减少,以及单机计算和分布式计算在同一代码库中的合并。

参考资料

[1]PySpark 指南: https://spark.apache.org/docs/latest/api/python/user_guide/pandas_on_spark/pandas_pyspark.html

[2]PySpark 指南: https://spark.apache.org/docs/latest/api/python/user_guide/pandas_on_spark/types.html

[3]默认索引类型: https://spark.apache.org/docs/latest/api/python/user_guide/pandas_on_spark/options.html#default-index-type

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/1550.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Linux分文件编程:静态库与动态库的生成和使用

目录 一,Linux库引入之分文件编程 ① 简单说明 ② 分文件编程优点 ③ 操作逻辑 ④ 代码实现说明 二,Linux库的基本说明 三,Linux库之静态库的生成与使用 ① 静态库命名规则 ② 静态库制作步骤 ③ 静态库的使用 四,Linu…

django-celery-beat搭建定时任务

一、创建django项目和app 1、安装定时任务第三方包 pip install django-celery-beat # 插件用来动态配置定时任务,一般会配合 django_celery_results 一起使用,所以一起安装 django_celery_results pip install django_celery_results pip install eventlet # win…

Keil MDK6要来了,将嵌入式软件开发水平带到新高度,支持跨平台(2023-03-11)

注:这个是MDK6,不是MDK5 AC6,属于下一代MDK视频版: https://www.bilibili.com/video/BV16s4y157WF Keil MDK6要来了,将嵌入式软件开发水平带到新高度,支持跨平台一年一度的全球顶级嵌入式会展Embedded Wor…

操作系统(1.3)--习题

一、课堂习题 1、一个作业第一 次执行时用了5min ,而第二次执行时用了6min,这说明了操作系统的( )特点。 A、并发性 B、共享性 C、虚拟性 D、不确定性 D 2、在计算机系统中,操作系统是( )。 A、处于裸机之上的第一层软件 B、处于硬件之下的低层软件 C、处于应用软件之上的系统软…

对象的创建以及数组中常见的属性与方法

(一)对象创建的三种方法 1、利用对象字面量创建对象 const obj{ name:小开心 } 2、利用new Object创建对象 const obj1new Object({ name:小开心 }) 3、利用构造函数创建对象 构造函数:是一种特殊的函数,主要用来初始化对象&…

Vector的扩容机制

到需要扩容的时候,Vector会根据需要的大小,创建一个新数组,然后把旧数组的元素复制进新数组。 我们可以看到,扩容后,其实是一个新数组,内部元素的地址已经改变了。所以扩容之后,原先的迭代器会…

【Spring事务】声明式事务 使用详解

个人简介:Java领域新星创作者;阿里云技术博主、星级博主、专家博主;正在Java学习的路上摸爬滚打,记录学习的过程~ 个人主页:.29.的博客 学习社区:进去逛一逛~ 声明式事务一、编程式事务二、声明式事务&…

PMSM矢量控制笔记(1.1)——电机的机械结构与运行原理

前言:重新整理以前的知识和文章发现,仍然有许多地方没有学得明白,懵懵懂懂含含糊糊的地方多如牛毛,尤其是到了真正实际写东西或者做项目时,如果不是系统的学习了知识,很容易遇到问题就卡壳,也想…

C语言的灵魂---指针(基础)

C语言灵魂指针1.什么是指针?2.指针的大小3.指针的分类3.1比较常规的指针类型3.2指针的解引用操作3.3野指针野指针的成因:4.指针运算4.1指针加减整数4.2指针-指针1.什么是指针? 这个问题我们通常解释为两种情况: 1.指针本质&#…

Unity学习日记13(画布相关)

目录 创建画布 对画布的目标图片进行射线检测 拉锚点 UI文本框使用 按钮 按钮导航 按钮触发事件 输入框 实现单选框 下拉菜单 多选框选项加图片 创建画布 渲染模式 第一个,保持画布在最前方,画布内的内容显示优先级最高。 第二个,…

GitHub 上有些什么好玩的项目?

前言 各个领域模块的都整理了一下,包含游戏、一些沙雕的工具、实用正经的工具以及一些相关的电商项目,希望他们可以给你学习的路上增加几分的乐趣,我们直接进入正题~ 游戏 1.吃豆人 一款经典的游戏开发案例,包括地图绘制、玩家控…

并发基础之线程池(Thread Pool)

目录前言何为线程池线程池优势创建线程池方式直接实例化ThreadPoolExecutor类JUC Executors 创建线程池线程池挖掘Executors简单介绍ThreadPoolExecutor核心类ThreadPoolExecutor 类构造参数含义线程池运行规则线程设置数量结语前言 相信大家都知道当前的很多系统架构都要求高…

echart图表之highcharts

提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 文章目录前言一、HighCharts是什么?二、使用步骤1.引入库2.前端代码3.展现结果4.后台自动截图总结前言 提示:这里可以添加本文要记录的大概内容&…

linux kernel 5.0 inline hook框架

github:https://github.com/WeiJiLab/kernel-hook-framework 一、项目介绍 Usually we want to hack a kernel function, to insert customized code before or after a certain kernel function been called, or to totally replace a function with new one. How can we…

计算机图形学11:二维观察之多边形的裁剪

作者:非妃是公主 专栏:《计算机图形学》 博客地址:https://blog.csdn.net/myf_666 个性签:顺境不惰,逆境不馁,以心制境,万事可成。——曾国藩 文章目录专栏推荐专栏系列文章序一、多边形的裁剪…

Activity工作流(三):Service服务

3. Service服务 所有的Service都通过流程引擎获得。 3.1 RepositoryService 仓库服务是存储相关的服务,一般用来部署流程文件,获取流程文件(bpmn和图片),查询流程定义信息等操作,是引擎中的一个重要的服务。…

Anaconda配置Python新版本tensorflow库(CPU、GPU通用)的方法

本文介绍在Anaconda环境中,下载并配置Python中机器学习、深度学习常用的新版tensorflow库的方法。 在之前的两篇文章基于Python TensorFlow Estimator的深度学习回归与分类代码——DNNRegressor(https://blog.csdn.net/zhebushibiaoshifu/article/detail…

【C++学习】日积月累——SLT中stack使用详解(1)

一、stack的相关概念 stack是一种容器适配器,专门用在具有后进先出的上下文环境中,其删除只能从容器的一端进行元素的插入与提取操作; stack是作为容器适配器被实现的,容器适配器即是对特定类封装作为其底层的容器,并提…

图形视图框架的坐标

图形视图基于笛卡尔坐标系;项目在场景中的位置和几何图形由两组数字表示:X 坐标和 Y 坐标。使用未变换的视图观察场景时,场景上的一个单元由屏幕上的一个像素表示。 图形视图中有三种有效的坐标系: 项目坐标场景坐标视图坐标为了简化实现图形…

opencv仿射变换之获取变换矩阵

大家好,我是csdn的博主:lqj_本人 这是我的个人博客主页: lqj_本人的博客_CSDN博客-微信小程序,前端,python领域博主lqj_本人擅长微信小程序,前端,python,等方面的知识https://blog.csdn.net/lbcyllqj?spm1011.2415.3001.5343哔哩哔哩欢迎关注…
最新文章