简介
在进行 FastBuild 优化的时候,需要获取所需组件的特征。之前这些特征是串行获取的,而且由于之前的设计存在问题,效率一直很低,主要原因如下:
- 镜像需要先下载,然后检测运行环境和检查镜像元数据
- 有些镜像比较大,下载很花时间;而前端的请求大概只会等待 15 秒,之后就被终止了。
- 检查镜像环境的时候,之前是串行进行的
博客《python concurrent.futures 模块线程处理详解》介绍得不错。
问题代码
def get_image_descriptor(self) -> ImageDescriptor:
    """
    Assemble the descriptor for this image by running every probe in turn.

    Each ``get_*_artifact`` call starts only after the previous one has
    returned, so the probes never overlap.

    :return: the populated ImageDescriptor
    """
    image_info = ImageDescriptor(self.image_name)

    # (descriptor field, probe) pairs, listed in the order they are executed.
    probes_before_id = (
        ("kernel", self.get_kernel_artifact),
        ("os", self.get_os_artifact),
        ("package_manager", self.get_package_manager_artifact),
        ("pip", self.get_pip_artifact),
        ("conda", self.get_conda_artifact),
        ("python", self.get_python_artifact),
    )
    for field, probe in probes_before_id:
        setattr(image_info, field, probe())

    image_info.image_id = self.image_id

    probes_after_id = (
        ("sshd", self.get_sshd_artifact),
        ("jupyter_lab", self.get_jupyter_lab_artifact),
    )
    for field, probe in probes_after_id:
        setattr(image_info, field, probe())

    return image_info
优化如下:
def get_image_descriptor(self) -> ImageDescriptor:
    """
    Assemble the descriptor for this image from one batch of probe results.

    The results are fetched with a single call to
    ``get_artifact_result_parallel`` and looked up under the probe method
    names.

    :return: the populated ImageDescriptor
    """
    image_info = ImageDescriptor(self.image_name)
    image_info.image_id = self.image_id

    probe_results = self.get_artifact_result_parallel()

    # Descriptor field -> key under which its probe result is stored.
    field_to_key = {
        "kernel": "get_kernel_artifact",
        "os": "get_os_artifact",
        "package_manager": "get_package_manager_artifact",
        "pip": "get_pip_artifact",
        "conda": "get_conda_artifact",
        "python": "get_python_artifact",
        "sshd": "get_sshd_artifact",
        "jupyter_lab": "get_jupyter_lab_artifact",
    }
    for field, key in field_to_key.items():
        setattr(image_info, field, probe_results[key])

    return image_info
def get_all_artifact_funcs(self) -> List:
    """Return this object's artifact probe callables in a fixed order."""
    probe_names = (
        "get_kernel_artifact",
        "get_os_artifact",
        "get_package_manager_artifact",
        "get_pip_artifact",
        "get_conda_artifact",
        "get_python_artifact",
        "get_sshd_artifact",
        "get_jupyter_lab_artifact",
    )
    return [getattr(self, name) for name in probe_names]
def get_artifact_result_parallel(self):
    """
    Run every callable from ``get_all_artifact_funcs`` on a thread pool.

    :return: dict mapping each callable's ``__name__`` to its return value
    """
    with concurrent.futures.ThreadPoolExecutor() as pool:
        # Submit everything before waiting on anything so the calls can overlap.
        pending = {}
        for probe in self.get_all_artifact_funcs():
            pending[probe.__name__] = pool.submit(probe)

        # result() blocks until the task finishes and re-raises its exception, if any.
        return {label: task.result() for label, task in pending.items()}
Python代码演示并行和串行的影响
#!/usr/bin/env python
# -*- coding:UTF-8 -*-
"""
@author: songquanheng
@email: wannachan@outlook.com
@time: 2024年4月29日14:12:03
@desc: 测试并行函数
"""
import concurrent
import concurrent.futures
import functools
import time
from time import sleep
from typing import List
def cost_time(func):
    """
    Decorator that prints how long each call to ``func`` takes.

    The elapsed time is measured with ``time.perf_counter`` and printed after
    the call returns; nothing is printed if the call raises.

    :param func: the callable to time
    :return: a wrapper that forwards all arguments and returns func's result
    """
    # Keep the wrapped function's __name__/__doc__; without this every
    # decorated function would report its name as 'fun'.
    @functools.wraps(func)
    def fun(*args, **kwargs):
        t = time.perf_counter()
        result = func(*args, **kwargs)
        print(f'func {func.__name__} cost time:{time.perf_counter() - t:.8f} s')
        return result

    return fun
def get_ret_value(seconds: float = 1):
    """
    Simulate a slow call: sleep for ``seconds`` and then return 12.

    :param seconds: how long to sleep; defaults to 1 second
    :return: the constant 12
    """
    sleep(seconds)
    return 12
def get_all_artifact_funcs() -> List:
    """Return the demo workload: a list of eight references to ``get_ret_value``."""
    task_count = 8
    return [get_ret_value] * task_count
@cost_time
def serial():
    """
    Call every demo task one after another, printing each return value.

    Prints the total elapsed time once all tasks have run.
    """
    start = time.perf_counter()
    for func in get_all_artifact_funcs():
        print(func())
    # Label previously read "coast"; corrected to "cost".
    print(f'serial cost:{time.perf_counter() - start:.8f}s')
@cost_time
def parallel():
    """
    Submit every demo task to a thread pool and print the collected results.

    Prints a dict with one entry per submitted task, then the total elapsed
    time.
    """
    start = time.perf_counter()
    with concurrent.futures.ThreadPoolExecutor() as executor:
        # Submit everything before waiting so the tasks can overlap. The key
        # includes the submission index because callables may share a
        # __name__ (this demo submits the same function eight times); keying
        # on the name alone would keep only the last future.
        futures = {
            f'{func.__name__}_{index}': executor.submit(func)
            for index, func in enumerate(get_all_artifact_funcs())
        }
        # result() blocks until the task finishes and re-raises its exception, if any.
        res = {name: future.result() for name, future in futures.items()}
    print(res)
    # Label previously read "coast"; corrected to "cost".
    print(f'parallel cost:{time.perf_counter() - start:.8f}s')
def get_artifact_result_parallel(self):
    """
    Run every callable from ``self.get_all_artifact_funcs()`` on a thread pool.

    :return: dict mapping each callable's ``__name__`` to its return value
    """
    with concurrent.futures.ThreadPoolExecutor() as pool:
        # Queue all calls first; nothing is awaited until every one is submitted.
        tasks = dict(
            (probe.__name__, pool.submit(probe))
            for probe in self.get_all_artifact_funcs()
        )
        # Collect the outcomes in submission order.
        outcomes = {}
        for label in tasks:
            outcomes[label] = tasks[label].result()
    return outcomes
if __name__ == '__main__':
    # Run the sequential demo first, then the thread-pool one, so the two
    # printed timings can be compared.
    for demo in (serial, parallel):
        demo()