pytorch 数据预加载

1. Abstract

本文介绍一个工具 PreDataLoader,它包装 torch.utils.data.DataLoader,接收该类的一个实例 loader,启动一个线程 t,创建一个队列 qtloader 中的数据预加载到队列 q 中, 以在模型计算时也能启动启动数据加载程序, 节省数据加载时间。代码:

class PreDataLoader(object):
	"""
	@Author: Yuwei from https://www.zhihu.com/people/aewil-zheng, with few changes

	** 包装 torch.utils.data.DataLoader, 接收该类的一个实例 loader, 启动一个线程 t, 创建一个队列 q
	t 将 loader 中的数据预加载到队列 q 中, 以在模型计算时也能启动启动数据加载程序, 节省数据加载时间

	** 若提供了 cuda device, 数据将直接被加载到 GPU 上
	"""

	def __init__(self, loader, device=None, queue_size=2):
		"""
		:param loader: torch.utils.data.DataLoader
		:param device: torch.device('cuda' or 'cpu'), to use cpu, set None
		:param queue_size: the number of samples to be preloaded
		"""
		self.__loader = loader
		self.__device = device
		self.__queue_size = queue_size

		self.__load_stream = torch.cuda.Stream(device=device) \
			if str(device).startswith('cuda') else None  # 如果提供了 cuda device, 则创建 cuda 流

		self.__queue = Queue(maxsize=self.__queue_size)
		self.__idx = 0
		self.__worker = Thread(target=self._load_loop)
		self.__worker.setDaemon(True)
		self.__worker.start()

	def _load_loop(self):
		""" 不断的将数据加载到队列里 """
		if str(self.__device).startswith('cuda'):
			logging.info(f'>>> data will be preloaded into device \'{self.__device}\'')
			logging.info(f'>>> this may cost more GPU memory!!!')
			# The loop that will load into the queue in the background
			torch.cuda.set_device(self.__device)
			while True:
				for sample in self.__loader:
					self.__queue.put(self._load_instance(sample))
		else:
			while True:
				for sample in self.__loader:
					self.__queue.put(sample)

	def _load_instance(self, sample):
		""" 将 batch 数据从 CPU 加载到 GPU 中 """
		if torch.is_tensor(sample):
			with torch.cuda.stream(self.__load_stream):
				return sample.to(self.__device, non_blocking=True)
		elif sample is None or type(sample) == str:
			return sample
		elif isinstance(sample, dict):
			return {k: self._load_instance(v) for k, v in sample.items()}
		else:
			return [self._load_instance(s) for s in sample]

	def __iter__(self):
		self.__idx = 0
		return self

	def __next__(self):
		# 加载线程挂了
		if not self.__worker.is_alive() and self.__queue.empty():
			self.__idx = 0
			self.__queue.join()
			self.__worker.join()
			raise StopIteration
		# 一个 epoch 加载完了
		elif self.__idx >= len(self.__loader):
			self.__idx = 0
			raise StopIteration
		# 下一个 batch
		else:
			out = self.__queue.get()
			self.__queue.task_done()
			self.__idx += 1
		return out

	def next(self):
		return self.__next__()

	def __len__(self):
		return len(self.__loader)

	@property
	def sampler(self):
		return self.__loader.sampler

	@property
	def dataset(self):
		return self.__loader.dataset

如果你对实现技术细节不感兴趣,也可直接拿来用。后面我将对相关细节展开讨论,包括:

  • python 中的并发与并行;
  • cuda 流:torch.cuda.Stream(device=device)

2. python 中的并发与并行

总所周知,由于 Global Interpreter Lock (GIL) 的存在,Python 语言中,任何时间点只有一个线程在执行,即便在多核 CPU 上,Python 的多线程也无法实现真正的并行计算。

GIL 的原因在于 Python 的内存管理并不是线程安全的。为了防止多个线程同时操作一个对象,造成数据混乱的问题,Python 设定了 GIL 来限制多线程的并发执行。因此,尽管你可以在 Python 中创建多线程,并且看起来他们是同时运行的,但实质上,在任一时刻,只有一个线程在执行。

既然如此,上面代码使用多线程是如何提高程序的效率的?再看:

然而,如果你的程序是 IO 密集型的,例如大量的网络请求或文件读写操作,那么使用多线程还是能显著提高程序的效率的,因为在等待 IO 的过程中,其他线程还可以继续执行。

数据的预加载应该算是 IO 吧,那模型计算和数据加载能并行吗

2.1 Numpy 和 PyTorch 底层计算是多线程并行的

Numpy 的底层实现是 C 语言,计算速度和并发性远胜于 Python,当我们使用 numpy 进行计算时,特别是复杂的矩阵运算,Python 程序会把这个任务抛给底层的 C 语言进行计算,从而能够使用 CPU 多核。验证:

import time
import numpy as np

def dot():
	start = time.time()
	a = np.random.randn(10000, 10000)
	b = np.random.randn(10000, 10000)
	np.dot(a, b)
	end = time.time()
	print(end - start)

dot()

验证代码用 numpy.dot() 计算两个 10000 10000 10000 维的矩阵乘法,观察 CPU 的使用效率(i5-10400,6核心12线程),发现 CPU 使用率很快从不足 20% 提升至 80% 左右。计算时间约为 15s

为了确定是否真的使用了多核,再设计一个 Python 计算程序:

import time

def add():
	cnt = 1
	start = time.time()
	for i in range(500000000):  # 累加
		cnt += 1
	end = time.time()
	print(end - start)

add()

五亿次加法运算,耗时约 20s,CPU 使用率全程维持在 20% 以下。如此说来,numpy 确实是在使用多核并行计算。

下面看一看 Python 多线程能不能使它们并行计算:

import threading
import time
import numpy as np

def dot():
	start = time.time()
	a = np.random.randn(10000, 10000)
	b = np.random.randn(10000, 10000)
	np.dot(a, b)
	end = time.time()
	print(end - start)

def add():
	cnt = 1
	start = time.time()
	for i in range(500000000):
		cnt += 1
	end = time.time()
	print(end - start)

t = threading.Thread(target=dot)

s = time.time()
add()
t.start()
t1.join()
e = time.time()
print(e - s)

输出:

15.057043313980103
23.129913806915283
23.13091516494751

如果说整个程序只能同时使用一个 CPU 核,那么整体计算时间应该是两部分计算时间的和 35s 左右,但这里只用了 23s,可见 numpy 底层并行计算是实锤了。而且,这两个函数的计算是并行的,即 np.dot() 在计算的时候,add() 也在计算。为什么 add 计算相比其单独运行时多了 3s?而 np.dot() 计算时间基本没变?

可以排除 CPU 资源不够的可能,否则的话,np.dot() 的计算时间也要加长;再者我观察了 CPU 利用率,全程未达到 100%。我觉得这是线程切换的开销add() 可能不是一直在运行的,多个 Python 线程还是只能使用一个 CPU 核,线程之间交替执行,只不过 np.dot() 线程在离开后,底层运行还在继续,而 add() 线程离开后,其不再运行。即:有那么 3s 时间,add() 没运行,“单核 CPU” 转向了线程 np.dot() 检查计算结果是否已返回。

再增加一个 numpy 计算任务线程:

...
t1 = threading.Thread(target=dot)
t2 = threading.Thread(target=dot)

s = time.time()
add()
t1.start()
t2.start()
t1.join()
t2.join()
e = time.time()
print(e - s)

输出:

25.624603986740112
27.81219220161438
30.751672983169556
30.752644538879395

时间增加了不少,基本快赶上计算一次 dot() 时间的两倍了。这大概是由于 CPU 的计算达到了极限:

CPU 利用率长时间维持在 100%。

以上验证对于 PyTorch 也是一样的。

结论:numpy 和 pytorch 的计算不受 GIL 的限制,可以使用 CPU 多核;一个线程中,numpy 和 pytorch 将计算丢给底层的 C/C++ 语言后,“等待计算结果”类似于 IO,会释放 GIL 锁,而计算还在继续,其他 python 线程可以得到执行。
推论:使用 GPU 计算是同样的道理,python 程序将计算丢给 GPU 后,等待计算结果,当前线程阻塞,释放 GIL 锁,其他 python 线程得以执行,从而提高计算效率。

3. torch.cuda.Stream(device=device)


torch.cuda.Stream 是 PyTorch 库中的一个类,用于管理 GPU 上的异步操作。

在 GPU 上执行计算任务时,通常可以使用多个流(stream)来并行执行不同的操作。每个流都有自己的命令队列,可以独立地执行操作,从而提高计算效率。torch.cuda.Stream 就是用来创建和管理这些流的。

使用 torch.cuda.Stream,可以将一系列 GPU 操作放入一个流中,并且可以通过调用流的 synchronize() 方法来等待流中所有操作完成。这对于需要处理多个 GPU 操作的情况非常有用。

以下是一个使用 torch.cuda.Stream 的示例代码:

import torch

stream = torch.cuda.Stream()  # 创建流对象

with torch.cuda.stream(stream):  # 在流中执行操作
	# 执行GPU操作
	# ...

stream.synchronize()  # 等待流中操作完成

在上述示例中,我们首先创建了一个 torch.cuda.Stream 对象 stream。然后,我们使用 with 语句块将一些 GPU 操作放入流中执行。最后,我们调用 stream.synchronize() 来等待流中的操作完成。

通过使用 torch.cuda.Stream,我们可以更灵活地控制 GPU 操作的执行顺序和并行性,以优化计算性能。


以上是 GPT3.5 给出的关于 torch.cuda.Stream 的简介。另外,还可参考教程《如何在 Pytorch 中使用 CUDA 流(CUDA stream)》 讲的不错。我现在将其搬过来:


什么是 CUDA 流(CUDA stream)?

CUDA 流是一种在 GPU 上并行执行操作的机制。在默认情况下,PyTorch 会在默认的流上执行所有的操作,即在主流(default stream)上进行。但是,当我们有一些可以并行执行的操作时,通过将这些操作分配到不同的流上,我们可以在 GPU 上更有效地利用计算资源。

第一句就强调:并行执行操作的机制。

如何创建 CUDA 流?

可以通过 torch.cuda.Stream() 函数来创建 CUDA 流:

stream = torch.cuda.Stream()

使用 torch.cuda.Stream() 函数创建了一个名为 stream 的 CUDA 流。

如何使用 CUDA 流?

通过 with 上下文管理操作,并使用 stream.synchronize() 方法等待操作完成:

import torch

# 创建两个CUDA流
stream1 = torch.cuda.Stream()
stream2 = torch.cuda.Stream()

# 分别将操作记录到两个流上
with torch.cuda.stream(stream1):
	# 执行操作1
	# ...

with torch.cuda.stream(stream2):
	# 执行操作2
	# ...

# 等待两个流上的操作完成
torch.cuda.synchronize(stream1)
torch.cuda.synchronize(stream2)

我们创建了两个 CUDA 流 stream1stream2。然后,在两个流上分别记录操作,并使用torch.cuda.synchronize() 方法等待这些操作完成。

如何利用 CUDA 流提高性能?

一种常见的用法是将计算数据传输操作分配到不同的流上,从而实现计算和数据传输的并行执行


3.1 对 PreDataLoader 中 CUDA 流的解释

with torch.cuda.stream(self.__load_stream):
	return sample.to(self.__device, non_blocking=True)

这一句 sample.to(self.__device, non_blocking=True) 算是数据传输吧,它处在一个数据预加载线程中,想要与模型计算并行。那么按照上面的教程:一个 CUDA 流中的操作是顺序执行的,模型计算使用的是默认流(default stream),平时我们的代码 sample.to(device) 也使用了默认流,这意味着数据的传输和模型计算是串行的。

所以,PreDataLoader 中定义了一个新的 CUDA 流,把 sample.to(self.__device, non_blocking=True) 放入这个新 CUDA 流,就可以和模型计算并行了。

4. @property

@property 是一个装饰器,用于将类的方法转换为属性。通过使用 @property,您可以定义一个方法,并将其作为实例的属性来访问,而不需要使用函数调用的语法。

下面是一个示例,说明如何使用 @property 装饰器:

class Circle:
	def __init__(self, radius):
		self.radius = radius

	@property
	def diameter(self):
		return 2 * self.radius

	@diameter.setter
	def diameter(self, value):
		self.radius = value / 2


# 创建 Circle 对象
circle = Circle(5)

# 访问 diameter 属性(实际上是调用了 diameter 方法)
print(circle.diameter)  # 输出:10

# 设置 diameter 属性(实际上是调用了 diameter.setter 方法)
circle.diameter = 14
print(circle.radius)  # 输出:7

在上面的示例中,Circle 类定义了一个 radius 实例变量和一个 diameter 方法(被 @property 装饰)。当我们像访问属性一样访问 circle.diameter 时,实际上是调用了 diameter 方法并返回其结果。

此外,我们还可以使用 @property 创建一个 setter 方法,用于设置属性的值。在示例中,diameter 属性的 setter 方法名为 diameter.setter,它接受一个参数 value,我们可以在 setter 方法中对 self.radius 进行更新。

总结:使用 @property 装饰器可以将一个方法定义为属性,并提供更加方便和易读的方式来访问和设置属性。

既然担心 Python 线程的 GIL 问题,为何不直接用多进程?

:多进程没那么好用,进程是重量级的,有独立的内存管理,共享内存是比较麻烦的:

import multiprocessing

class Int(object):
	def __init__(self, i):
		self.__int = i

	def add(self):
		self.__int += 1

	def print(self):
		print(self.__int)

def add(integer: Int):
	integer.add()
	integer.print()
	print(id(integer))

if __name__ == '__main__':
	a_integer = Int(0)
	p1 = multiprocessing.Process(target=add, args=(a_integer,))
	p2 = multiprocessing.Process(target=add, args=(a_integer,))
	p3 = multiprocessing.Process(target=add, args=(a_integer,))

	p1.start()
	p2.start()
	p3.start()

	add(a_integer)
	a_integer.print()

输出:

1
1839132811024
1
1
2091010788944
1
1721319788112
1
2095109213776

可见,各进程操作的 Int 对象不是同一个,即,创建子进程时传入参数会是参数的一份拷贝

如果将 multiprocessing.Process 换成 threading.Thread,则输出:

1
2691328945888
2
2691328945888
3
2691328945888
4
2691328945888
4

创建线程时传入参数会是参数对象本身

此外,子进程不能访问主线程的变量,如果:

def add(integer: Int):
	integer.add()
	integer.print()
	b_integer.add()  # 加一个主进程中的变量
	print(id(integer))

则会报错。而线程则可以

可以看到,PreDataLoader 中的线程是访问了主程序的数据了的,如果用进程,一是编程比较麻烦,二是效率也未必就高。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/217912.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

知识蒸馏的蒸馏损失方法代码总结(包括:基于logits的方法:KLDiv,dist,dkd等,基于中间层提示的方法:)

有两种知识蒸馏方法:一种利用教师模型的输出概率(基于logits的方法)[15,14,11],另一种利用教师模型的中间表示(基于提示的方法)[12,13,18,17]。基于logits的方法利用教师的输出作为辅助信号来训练一个较小的…

图像万物分割——Segment Anything算法解析与模型推理

一、概述 在视觉任务中,图像分割任务是一个很广泛的领域,应用于交互式分割,边缘检测,超像素化,感兴趣目标生成,前景分割,语义分割,实例分割,泛视分割等。 交互式分割&am…

(c语言进阶)offsetof函数——计算结构体元素的偏移量

一.基本概念&#xff1a; 头文件&#xff1a;<stddef.h> offsetof(结构体名,结构体元素名) 返回值为size_t&#xff08;unsigned int&#xff09; 二.应用 例题&#xff1a; #include<stdio.h> #include<stddef.h> typedef struct S1 {char a;int b;c…

Clean 架构下的现代 Android 架构指南

Clean 架构下的现代 Android 架构指南 Clean 架构是 Uncle Bob 提出的一种软件架构&#xff0c;Bob 大叔同时也是 SOLID 原则的命名者。 Clean 架构图如下&#xff1a; 这张图描述的是整个软件系统的架构&#xff0c;而不是单体软件&#xff0c;其中至少包括服务端以及客户端…

JVM类加载全过程

Java虚拟机类加载的全过程&#xff0c;即加载&#xff0c;验证&#xff0c;准备&#xff0c;解析&#xff0c;初始化 一、加载 加载 是 类加载过程中的一个阶段&#xff0c; 有以下三部分组成 1&#xff09;通过一个类的全限定名来获取定义此类的二进制流 2&#xff09;将这…

【Java 基础】19 多线程基础

文章目录 进程和线程进程&#xff08;Process&#xff09;线程&#xff08;Thread&#xff09; 线程的创建1&#xff09;继承 Thread 类2&#xff09;实现 Runnable 接口3&#xff09;使用 Lambda 表达式4&#xff09;总结 线程的状态状态的分类状态间转换 多线程是一种 同时执…

Github无法打开

文章目录 一、问题二、解决2.1、科学上网&#xff08;使用中&#xff09;2.2、使用代理&#xff08;不稳定&#xff09;2.3、修改hosts&#xff08;得更新&#xff09;2.3.1、找到hosts文件2.3.2、复制hosts文件2.3.3、添加记录2.3.4、替换原来的hosts文件2.3.5、成功访问Githu…

CefSharp 获取POST(AJAX)、GET消息返回值(request)

CefSharp作为专门为爬虫工具开发的库比Selenium这种开发目的是页面测试工具然后用来做爬虫的工具要贴心得多。我们操作网页的时候发送或者做了某个动作提交表单之后需要知道我们的动作或者提交是否成功&#xff0c;因为有的页面会因为网络延迟问题提交失败&#xff0c;需要准确…

[Azure]azure磁盘加密(Windows/Linux) ADE(Azure Disk Encryption)

Azure 磁盘加密用于保护数据&#xff0c;对于Windows使用BitLocker对磁盘进行加密&#xff0c;同时与Key Vault集成&#xff0c;控制和管理Key和Secret。 本文利用Potal对磁盘进行加密 注&#xff1a;Azure DIsk Encryption 可能会导致VM重启&#xff0c;对VM造成影响&#xff…

哈希与哈希表

哈希表的概念 哈希表又名散列表&#xff0c;官话一点讲就是&#xff1a; 散列表&#xff08;Hash table&#xff0c;也叫哈希表&#xff09;&#xff0c;是根据关键码值(Key value)而直接进行访问的数据结构。也就是说&#xff0c;它通过把关键码值映射到表中一个位置来访问记…

基于SSM的老年公寓信息管理的设计与实现

末尾获取源码 开发语言&#xff1a;Java Java开发工具&#xff1a;JDK1.8 后端框架&#xff1a;SSM 前端&#xff1a;采用JSP技术开发 数据库&#xff1a;MySQL5.7和Navicat管理工具结合 服务器&#xff1a;Tomcat8.5 开发软件&#xff1a;IDEA / Eclipse 是否Maven项目&#x…

nodejs微信小程序+python+PHP天天网站书城管理系统的设计与实现-计算机毕业设计推荐

目 录 摘 要 I ABSTRACT II 目 录 II 第1章 绪论 1 1.1背景及意义 1 1.2 国内外研究概况 1 1.3 研究的内容 1 第2章 相关技术 3 2.1 nodejs简介 4 2.2 express框架介绍 6 2.4 MySQL数据库 4 第3章 系统分析 5 3.1 需求分析 5 3.2 系统可行性分析 5 3.2.1技术可行性&#xff1a;…

QT 中 QMessageBox 的简单用法

效果 思路 // 创建一个question弹出对话框&#xff0c;添加两个按钮&#xff1a;Yes和NoQMessageBox *box new QMessageBox(QMessageBox::Question, "提示", "确认删除的信息吗&#xff1f;", QMessageBox::Yes | QMessageBox::No, this);box->button(…

成人学生钢笔练字快速入门,硬笔书法行书楷书教程合集

一、教程描述 虽然现在都是电脑打字&#xff0c;需要手写的场合越来越少&#xff0c;但是可以写一手人见人爱&#xff0c;花见花开的好字&#xff0c;仍然是很拉风很惊艳的&#xff0c;可以给人留下深刻印象。本套硬笔书法教程&#xff0c;大小40.90G&#xff0c;共有591个文件…

Python 网络爬虫(二):HTTP 基础知识

《Python入门核心技术》专栏总目录・点这里 文章目录 1. HTTP 协议简述2. HTTP 请求过程3. HTTP 的结构3.1 请求行3.2 请求头3.3 请求体3.4 状态行3.5 响应头3.6 响应体 4. Cookie 状态管理5. HTTP 请求示例6. 总结 大家好&#xff0c;我是水滴~~ 在准备学习网络爬虫之前&…

公有云迁移研究——AWS Route53

大纲 1 什么是Route 532 Route 53能做些什么# 3 通过DNS托管来实现分流3.1 创建DNS托管3.2 对托管创建记录对流量进行分配 4 通过流量策略来对流量进行分流4.1 创建流量策略 5 对比两者的区别6 推荐 在给客户从本地机房往AWS迁移的过程中&#xff0c;我们接到如下需求&#xff…

vue打印功能

安装 vue3-print-nb yarn add vue3-print-nb //或 npm install vue3-print-nb main.js中引入 vue3-print-nb import { createApp } from vue; import App from ./App.vue; const app createApp(App); // 打印插件 import print from vue3-print-nb app.use(print) // 页面…

公有云迁移研究——AWS Translate

大纲 1 什么是Translate2 Aws Translate是怎么运作的3 Aws Translate和Google Translate的区别4 迁移任务4.1 迁移原因 5 Aws Translate的Go demo6 迁移中遇到的问题6.1 账号和权限问题&#xff1a;6.2 小语种 1 什么是Translate Translate是一种文本翻译服务&#xff0c;它使…

制作一个RISC-V的操作系统三-编译与链接

文章目录 GCCGCC简介GCC的命令格式gcc -Egcc -cgcc -Sgcc -ggcc -vGCC的主要执行步骤GCC涉及的文件类型针对多个源文件的处理 ELFELF介绍ELF文件格式ELF文件处理相关工具&#xff1a;Binutils&#xff08;binary utility&#xff09;readlelf -hreadelf -S或readelf -SW&#x…

思维模型 路径依赖定律

本系列文章 主要是 分享 思维模型&#xff0c;涉及各个领域&#xff0c;重在提升认知。难以摆脱的惯性。 1 路径依赖定律的应用 1.1 打破路径依赖定律的苹果 在 20 世纪 80 年代&#xff0c;苹果公司推出了 Macintosh 电脑&#xff0c;这是一款具有图形用户界面和鼠标的创新产…
最新文章