【Python】原来处理大文件也可以这么简单?

文章目录

  • 前言
  • 一、开始
  • 二、处理文本
  • 三.串行处理
  • 五多进程处理
  • 六输出
  • 七并行处理
  • 总结


前言

为了进行并行处理,我们将任务划分为子单元。它增加了程序处理的作业数量,减少了整体处理时间。

例如,如果你正在处理一个大的CSV文件,你想修改一个单列。我们将把数据以数组的形式输入函数,它将根据可用的进程数量,一次并行处理多个值。这些进程是基于你的处理器内核的数量。

在这篇文章中,我们将学习如何使用multiprocessing、joblib和tqdm Python包减少大文件的处理时间。这是一个简单的教程,可以适用于任何文件、数据库、图像、视频和音频。

一、开始

我们将使用来自 Kaggle 的 US Accidents (2016 - 2021) 数据集,它包括280万条记录和47个列。

https://www.kaggle.com/datasets/sobhanmoosavi/us-accidents

我们将导入multiprocessing、joblib和tqdm用于并行处理,pandas用于数据导入,re、nltk和string用于文本处理。


# Parallel Computing

import multiprocessing as mp

from joblib import Parallel, delayed

from tqdm.notebook import tqdm

# Data Ingestion 

import pandas as pd

# Text Processing 

import re 

from nltk.corpus import stopwords

import string

在我们开始之前,让我们通过加倍cpu_count()来设置n_workers。正如你所看到的,我们有8个workers。


n_workers = 2 * mp.cpu_count()

print(f"{n_workers} workers are available")

>>> 8 workers are available

下一步,我们将使用pandas read_csv函数读取大型CSV文件。然后打印出dataframe的形状、列的名称和处理时间。

%%time
file_name="../input/us-accidents/US_Accidents_Dec21_updated.csv"
df = pd.read_csv(file_name)

print(f"Shape:{df.shape}\n\nColumn Names:\n{df.columns}\n")

输出:


Shape:(2845342, 47)

Column Names:

Index(['ID', 'Severity', 'Start_Time', 'End_Time', 'Start_Lat', 'Start_Lng',
'End_Lat', 'End_Lng', 'Distance(mi)', 'Description', 'Number', 'Street',
'Side', 'City', 'County', 'State', 'Zipcode', 'Country', 'Timezone',
'Airport_Code', 'Weather_Timestamp', 'Temperature(F)', 'Wind_Chill(F)',
'Humidity(%)', 'Pressure(in)', 'Visibility(mi)', 'Wind_Direction',
'Wind_Speed(mph)', 'Precipitation(in)', 'Weather_Condition', 'Amenity',
'Bump', 'Crossing', 'Give_Way', 'Junction', 'No_Exit', 'Railway',
'Roundabout', 'Station', 'Stop', 'Traffic_Calming', 'Traffic_Signal',
'Turning_Loop', 'Sunrise_Sunset', 'Civil_Twilight', 'Nautical_Twilight',
'Astronomical_Twilight'],
dtype='object')

CPU times: user 33.9 s, sys: 3.93 s, total: 37.9 s
Wall time: 46.9 s

二、处理文本

clean_text是一个用于处理文本的简单函数。我们将使用nltk.copus获得英语停止词,并使用它来过滤掉文本行中的停止词。之后,我们将删除句子中的特殊字符和多余的空格。它将成为确定串行、并行和批处理的处理时间的基准函数。


def clean_text(text): 
  # Remove stop words
  stops = stopwords.words("english")
  text = " ".join([word for word in text.split() if word 
 not in stops])
  # Remove Special Characters
  text = text.translate(str.maketrans('', '', string.punctuation))
  # removing the extra spaces
  text = re.sub(' +',' ', text)
  return text

三.串行处理

对于串行处理,我们可以使用pandas的.apply()函数,但是如果你想看到进度条,你需要为pandas激活tqdm,然后使用.progress_apply()函数。

我们将处理280万条记录,并将结果保存回 “Description” 列中。


```python
%%time
tqdm.pandas()

df['Description'] = df['Description'].progress_apply(clean_text)

该处使用的url网络请求的数据。

---

#四输出
高端处理器串行处理280万行花了9分5秒。



```python
100% 🟩🟩🟩🟩 2845342/2845342 [09:05<00:00, 5724.25it/s]

CPU times: user 8min 14s, sys: 53.6 s, total: 9min 7s
Wall time: 9min 5s

五多进程处理

有多种方法可以对文件进行并行处理,我们将了解所有这些方法。multiprocessing是一个内置的python包,通常用于并行处理大型文件。

我们将创建一个有8个workers的多处理池,并使用map函数来启动进程。为了显示进度条,我们将使用tqdm。

map函数由两部分组成。第一个部分需要函数,第二个部分需要一个参数或参数列表。

%%time
p = mp.Pool(n_workers) 

df['Description'] = p.map(clean_text,tqdm(df['Description']))

六输出

我们的处理时间几乎提高了3倍。处理时间从9分5秒下降到3分51秒。

100% 🟩🟩🟩🟩 2845342/2845342 [02:58<00:00, 135646.12it/s]

CPU times: user 5.68 s, sys: 1.56 s, total: 7.23 s
Wall time: 3min 51s

七并行处理

我们现在将学习另一个Python包来执行并行处理。在本节中,我们将使用joblib的Parallel和delayed来复制map函数。

Parallel需要两个参数:n_job = 8和backend = multiprocessing。

然后,我们将在delayed函数中加入clean_text。

创建一个循环,每次输入一个值。

下面的过程是相当通用的,你可以根据你的需要修改你的函数和数组。我曾用它来处理成千上万的音频和视频文件,没有任何问题。

建议:使用 "try: "和 "except: "添加异常处理。

def text_parallel_clean(array):
  result = Parallel(n_jobs=n_workers,backend="multiprocessing")(
  delayed(clean_text)
  (text) 
  for text in tqdm(array)
  )
  return result




在text_parallel_clean()中添加“Description”列。



%%time
df['Description'] = text_parallel_clean(df['Description'])

输出

我们的函数比多进程处理Pool多花了13秒。即使如此,并行处理也比串行处理快4分59秒。

100% 🟩🟩🟩🟩 2845342/2845342 [04:03<00:00, 10514.98it/s]

CPU times: user 44.2 s, sys: 2.92 s, total: 47.1 s
Wall time: 4min 4s

并行批量处理

有一个更好的方法来处理大文件,就是把它们分成若干批,然后并行处理。让我们从创建一个批处理函数开始,该函数将在单一批次的值上运行clean_function。

批量处理函数

def proc_batch(batch):
  return [
  clean_text(text)
  for text in batch
  ]

将文件分割成批

下面的函数将根据workers的数量把文件分成多个批次。在我们的例子中,我们得到8个批次。

def batch_file(array,n_workers):
  file_len = len(array)
  batch_size = round(file_len / n_workers)
  batches = [
  array[ix:ix+batch_size]
  for ix in tqdm(range(0, file_len, batch_size))
  ]
  return batches

batches = batch_file(df['Description'],n_workers)

>>> 100% 8/8 [00:00<00:00, 280.01it/s]

运行并行批处理

最后,我们将使用Parallel和delayed来处理批次。

%%time
batch_output = Parallel(n_jobs=n_workers,backend="multiprocessing")(
  delayed(proc_batch)
  (batch) 
  for batch in tqdm(batches)
  )

df['Description'] = [j for i in batch_output for j in i]

输出

我们已经改善了处理时间。这种技术在处理复杂数据和训练深度学习模型方面非常有名。

100% 🟩🟩🟩🟩 8/8 [00:00<00:00, 2.19it/s]

CPU times: user 3.39 s, sys: 1.42 s, total: 4.81 s
Wall time: 3min 56s

tqdm 并发

tqdm将多处理带到了一个新的水平。它简单而强大。

process_map需要:

函数名称
Dataframe 列名
max_workers
chucksize与批次大小类似。我们将用workers的数量来计算批处理的大小,或者你可以根据你的喜好来添加这个数字。

%%time
from tqdm.contrib.concurrent import process_map
batch = round(len(df)/n_workers)

df['Description'] = process_map(clean_text,df['Description'], max_workers=n_workers, chunksize=batch)

输出

通过一行代码,我们得到了最好的结果:

100% 🟩🟩🟩🟩 2845342/2845342 [03:48<00:00, 1426320.93it/s]

CPU times: user 7.32 s, sys: 1.97 s, total: 9.29 s
Wall time: 3min 51s

总结

我们需要找到一个平衡点,它可以是串行处理,并行处理,或批处理。如果你正在处理一个较小的、不太复杂的数据集,并行处理可能会适得其反。

在这个教程中,我们已经了解了各种处理大文件的Python包,它们允许我们对数据函数进行并行处理。

如果你只处理一个表格数据集,并且想提高处理性能,那么建议你尝试Dask、datatable和RAPIDS。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/14019.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

[Java]JavaWeb开发中的MVC设计模式

一、有关Java Web与MVC设计模式 学习过基本Java Web开发的人都已经了解了如何编写基本的Servlet&#xff0c;如何编写jsp及如何更新浏览器中显示的内容。但是我们之前自己编写的应用一般存在无条理性&#xff0c;对于一个小型的网站这样的编写没有任何问题&#xff0c;但是一但…

ETL工具-pentaho企业实战部署

&#x1f4e2;&#x1f4e2;&#x1f4e2;&#x1f4e3;&#x1f4e3;&#x1f4e3; 哈喽&#xff01;大家好&#xff0c;我是【IT邦德】&#xff0c;江湖人称jeames007&#xff0c;10余年DBA及大数据工作经验 一位上进心十足的【大数据领域博主】&#xff01;&#x1f61c;&am…

TinyOS 配置教程

系列文章目录 TinyOS 系列文章【一】&#xff1a;TinyOS 配置教程 TinyOS 系列文章【二】&#xff1a;Tossim 教程 文章目录 系列文章目录前言1. 安装1.1. 实验环境1.2. TinyOS基础工作1.3. TinyOS 的配置1.4. 安装 java1.5. 安装编译器 2. 测试仿真程序总结 前言 本文主要用…

kafka集群搭建

1.本次搭建涉及3台centos7主机&#xff0c;防火墙与selinux服务均关闭 2.主机参数如下表所示 nameIPportserviceA10.1.60.1122128、2888、3888、9092kafka、zookeeperB10.1.60.1142128、2888、3888、9092kafka、zookeeperC10.1.60.1152128、2888、3888、9092kafka、zookeeper…

让人悲观的国内ChatGPT的未来

最近关于ChatGPT的火爆已经不是简单的AI圈的事了&#xff0c;它已经席卷了所有的IT、媒体圈子&#xff0c;甚至是不同领域不同行业的人&#xff0c;只要你还对于变化与AI有一定的了解&#xff0c;那我相信你一定能知道ChatGPT是什么了。ChatGPT在某种程度上已经是相当于AGI通用…

图论-匈牙利算法学习

本文讲述的是匈牙利算法&#xff0c;即图论中寻找最大匹配的算法。解决的问题是从二分图中找到尽量多的匹配。 原题-华为-HJ28 素数伴侣 描述 题目描述 若两个正整数的和为素数&#xff0c;则这两个正整数称之为“素数伴侣”&#xff0c;如2和5、6和13&#xff0c;它们能应用…

【已解决】SpringBoot 工程 war包服务部署与调用测试

1.开发环境&#xff1a;IDEA&#xff0c;JDK1.8 2.服务打包类型&#xff1a; war包 3.war包部署环境&#xff1a;Linux系统&#xff0c;tomcat服务器&#xff0c;端口号&#xff1a;8081 4.war包部署位置&#xff1a;tomcat-8081/webapps/temp.war 5.服务名为&#xff1a;t…

瑞吉外卖项目——瑞吉外卖

软件开发整体介绍 软件开发流程 需求分析&#xff1a;产品原型、需求规格说明书 设计&#xff1a;产品文档、UI界面设计、概要设计、详细设计、数据库设计 编码&#xff1a;项目代码、单元测试 测试&#xff1a;测试用例、测试报告 上线运维&#xff1a;软件环境安装、配置…

python-day6(补充四:私有属性与函数)

私有属性与函数 私有属性与函数的用途如何定义私有属性与函数如何访问私有属性与函数 私有属性与函数的用途 在面向对象的封装中&#xff0c;私有的属性与函数其根本目的是防止它们在类的外部被使用&#xff0c;python中主要通过命名来进行区分。 把可能使用到的东西封装起来…

从零基础到条码高手:傻瓜式操作,告别excel、AI和PS的烦恼

条形码是一种用于商品识别、库存管理等方面的编码标识系统&#xff0c;它是通过将数字和字符以特定的图案排列组合起来&#xff0c;从而形成一组能被机器扫描和识别的条纹图案。 通常情况下&#xff0c;条形码的生成可以分为如下几个步骤&#xff1a; 1、编号&#xff1a;首先…

数据可视化工具汇总:数字孪生产品的得力助手

数字孪生技术是一项快速发展的新兴技术&#xff0c;已经在许多领域得到广泛应用。数字孪生技术不仅可以提供完整的虚拟模型&#xff0c;还可以模拟物理系统的行为。在数字孪生技术的推动下&#xff0c;越来越多的数字孪生产品开始涌现出来&#xff0c;为不同的领域提供支持和解…

如何通过FAQ页面减轻客户支持压力,提高工作效率?

作为现代企业不可或缺的一部分&#xff0c;客户支持服务是为客户提供解决方案、回答问题和解决技术难题的关键部分。无论是产品管理还是销售环节&#xff0c;客户支持都是重要的一环。然而&#xff0c;有效地处理技术支持问题和客户请求并不容易。卓越的客户支持需要组织结构&a…

excle表格打印相关问题

ps&#xff1a;无论是打印word,还是打印excel, 最后最好都保存成pdf&#xff0c;再打印。 ps&#xff1a;无论是打印word,还是打印excel, 最后最好都保存成pdf&#xff0c;再打印。 ps&#xff1a;无论是打印word,还是打印excel, 最后最好都保存成pdf&#xff0c;再打印。 …

ThreadLocal InheritableThreadLocal TransmittableThreadLocal的使用以及原理

ThreadLocal 每个线程向ThreadLocal设置值&#xff0c;再取值&#xff0c;实现线程之间的隔离 public class ThreadLocalCase1 {private static ThreadLocal<Integer> threadLocal new ThreadLocal<>();public static void main(String[] args) {Random random …

浅析提高倾斜摄影超大场景的三维模型轻量化的数据质量关键技术

浅析提高倾斜摄影超大场景的三维模型轻量化的数据质量关键技术 倾斜摄影超大场景的三维模型轻量化的质量关键技术主要包括&#xff1a; 1、保持数据精度。在进行轻量化处理时&#xff0c;必须确保数据的精度不受损失&#xff0c;否则会影响后续分析和应用方案。因此&#xff0…

【Leetcode -剑指Offer 22.链表中倒数第k个结点 -203.移除链表元素】

Leetcode Leetcode -剑指Offer 22.链表中倒数第k个结点Leetcode -203.移除链表元素 Leetcode -剑指Offer 22.链表中倒数第k个结点 题目&#xff1a;输入一个链表&#xff0c;输出该链表中倒数第k个节点。为了符合大多数人的习惯&#xff0c;本题从1开始计数&#xff0c;即链表…

DAY829

学习目标&#xff1a;成就上瘾&#xff0c;学到欲罢不能 4月&#xff08;复习完高数18讲内容&#xff0c;背诵21篇短文&#xff0c;熟词僻义300词基础词&#xff09; 学习内容&#xff1a; 暴力英语&#xff1a;背单词150个&#xff0c;背《死亡诗社》经典语段&#xff0c;抄写…

【Spring Cloud】Spring Cloud 是什么?

文章目录 前言一、子项目二、常用组件三、把 Spring Cloud 官方、Netflix、Alibaba 三者整理成如下表格&#xff1a; 前言 Spring 以 Bean&#xff08;对象&#xff09; 为中心&#xff0c;提供 IOC、AOP 等功能。Spring Boot 以 Application&#xff08;应用&#xff09; 为中…

LightGBM面试题

1.偏差 vs 方差? 偏差是指由有所采样得到的大小为m的训练数据集&#xff0c;训练出的所有模型的输出的平均值和真实模型输出之间的偏差。 通常是由对学习算法做了错误的假设导致的描述模型输出结果的期望与样本真实结果的差距。分类器表达能力有限导致的系统性错误&#xff0c…

linux学习记录 和文件系统相关的命令

记录过程&#xff0c;会有错误,硬链接与软链接哪里可能没有说清楚 文件,目录操作命令 pwd 获取当前处于哪个目录当中&#xff0c;返回的是绝对路径 [rootlocalhost home]# pwd /homecd cd 相对/绝对路径 切换目录的&#xff0c;change directory .代表当前目录 …代表上一级…
最新文章