[Python学习笔记]multiprocess 多进程间变量共享

在TDengine 跨版本迁移实战章节中提到了进行 TDengine 数据迁移的 Python脚本。脚本支持多线程或多进程模式。

但是使用多进程模式时,会出现问题,如下:

多线程模式:

python3 datac_com23v2.py -p
[2023-11-17 13:43:00,526] dataC/multi_thread(1975/MainThread) INFO - --------------------begin------------------
[2023-11-17 13:43:00,527] dataC/multi_thread(1975/MainThread) INFO - ##############################
[2023-11-17 13:43:24,346] dataC/multi_thread(1975/MainThread) INFO - ## 9038/9038 Tables  and 27425 Rows are proceed.
[2023-11-17 13:43:24,347] dataC/multi_thread(1975/MainThread) INFO - ## 0 tables created.
[2023-11-17 13:43:24,348] dataC/multi_thread(1975/MainThread) INFO - ##############################
[2023-11-17 13:43:24,349] dataC/multi_thread(1975/MainThread) INFO - --------------------end------------------

多进程模式:

python3 datac_com23v2.py 
[2023-11-17 14:08:57,023] dataC/multi_thread(2208/MainThread) INFO - --------------------begin------------------
[2023-11-17 14:08:57,024] dataC/multi_thread(2208/MainThread) INFO - ##############################
[2023-11-17 14:09:19,175] dataC/multi_thread(2208/MainThread) INFO - ## 0/9038 Tables  and 0 Rows are proceed.
[2023-11-17 14:09:19,177] dataC/multi_thread(2208/MainThread) INFO - ## 0 tables created.
[2023-11-17 14:09:19,177] dataC/multi_thread(2208/MainThread) INFO - ##############################
[2023-11-17 14:09:19,178] dataC/multi_thread(2208/MainThread) INFO - --------------------end------------------

脚本虽然正常运行了,但是输出结构中没有打印出进度信息。造成这个问题的原因在于多进程模式变量的共享需要特殊处理。

具体处理方法为在调用 multiprocessing.Process 先声明共享变量,然后在进程中使用。

修改步骤如下:

  1. multi_thread 函数中,添加共享变量的定义
    m_tb = multiprocessing.Array('i',threadNum)
    m_rw = multiprocessing.Array('i',threadNum)
    m_ctb = multiprocessing.Array('i',threadNum)
    
  2. 将共享变量传递个子函数
    target = process_func, args=(tblist, tnum, listnum, m_tb, m_rw, m_ctb)
    
  3. 在子函数中将记录写入共享变量
    m_tb[tnum] = len(tb_proced)
    m_rw[tnum] = sum_list(rw_proced)
    m_ctb[tnum] = len(ctb_proced)   
    
  4. 对记录进行统计输出
    if wmethod == 'process':
         logger.info("## "+str(sum_list(m_tb[:]))+"/"+str(len(tblist))+" Tables  and "+str(sum_list(m_rw[:]))+" Rows are proceed.")
         logger.info("## "+str(sum_list(m_ctb[:]))+" tables created.")
    

部分代码如下:

def process_func(tb_list, tnum, list_num, m_tb, m_rw, m_ctb):
    slnum = 1
    irss = requests.session()
    erss = requests.session()
    for ll in range(list_num):
        ii = tnum*list_num+ll
        if ii < len(tb_list):
            etbname = str(tb_list[ii])
            itbname = etbname
            if tableonly == 'false':
                export_table(etbname, itbname, irss, erss)
                slnum += 1
                if slnum == 1000 :
                    time.sleep(1)
                    logger.info("Sleep 1 sec.")
                    slnum = 1
            else:
                if tableonly == 'true':
                    export_table_only(etbname, itbname, irss, erss)
                else:
                    logger.error("CfgFile: tableonly set error!")
    irss.close()
    erss.close()
    m_tb[tnum] = len(tb_proced)
    m_rw[tnum] = sum_list(rw_proced)
    m_ctb[tnum] = len(ctb_proced)

def multi_thread(tblist, wmethod):
    logger.info('--------------------begin------------------')
    logger.info("##############################")
    threads = []
    if len(tblist) < threadNum:
        irss = requests.session()
        erss = requests.session()
        for i in range(len(tblist)):
            tbname = tblist[i]
            export_table(tbname, irss, erss)
            proce = str(i+1)+'/'+str(len(tblist))
            logger.info(proce)
    else:
        listnum = int(len(tblist)/threadNum)+1
        if wmethod == 'process':
            m_tb = multiprocessing.Array('i',threadNum)
            m_rw = multiprocessing.Array('i',threadNum)
            m_ctb = multiprocessing.Array('i',threadNum)
            for tnum in range(threadNum):
                t = multiprocessing.Process(
                    target=process_func, args=(tblist, tnum, listnum, m_tb, m_rw, m_ctb))
                threads.append(t)
        else:
            for tnum in range(threadNum):
                tname = str('Thread_'+str(tnum))
                t = threading.Thread(target=thread_func,
                                     name=tname, args=(tblist, tnum, listnum))
                threads.append(t)
        for t in threads:
            t.start()
        for t in threads:
            t.join()
    if wmethod == 'process':
        logger.info("## "+str(sum_list(m_tb[:]))+"/"+str(len(tblist))+" Tables  and "+str(sum_list(m_rw[:]))+" Rows are proceed.")
        logger.info("## "+str(sum_list(m_ctb[:]))+" tables created.")
    else:
        logger.info("## "+str(sum_list(tb_proced))+"/"+str(len(tblist))+" Tables  and "+str(sum_list(rw_proced))+" Rows are proceed.")
        logger.info("## "+str(sum_list(ctb_proced))+" tables created.")
    logger.info("##############################")
    logger.info('--------------------end------------------')

再次运行程序,已经能正常输出结构了。

datac_com23v2.py -p
[2023-11-17 14:52:30,965] dataC/multi_thread(2840/MainThread) INFO - --------------------begin------------------
[2023-11-17 14:52:30,966] dataC/multi_thread(2840/MainThread) INFO - ##############################
[2023-11-17 14:52:38,869] dataC/multi_thread(2840/MainThread) INFO - ## 9038/9038 Tables  and 27425 Rows are proceed.
[2023-11-17 14:52:38,870] dataC/multi_thread(2840/MainThread) INFO - ## 0 tables created.
[2023-11-17 14:52:38,871] dataC/multi_thread(2840/MainThread) INFO - ##############################
[2023-11-17 14:52:38,872] dataC/multi_thread(2840/MainThread) INFO - --------------------end------------------

知识点

multiprocess 进程间共享变量有三种方式:Value, Array 和 Manager。前两者是共享内存,支持的数据类型有限,最后一种是使用服务进程管理需要共享的变量,支持的数据类型更丰富,但速度不如前两者。

参考

  1. Sharing state between processes

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/165915.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【智能家居】5、主流程设计以及外设框架编写与测试

目录 一、主流程设计 1、工厂模式结构体定义 &#xff08;1&#xff09;指令工厂 inputCmd.h &#xff08;2&#xff09;外设工厂 controlDevices.h 二、外设框架编写 1、创建外设工厂对象bathroomLight 2、编写相关函数框架 3、将浴室灯相关操作插入外设工厂链表等待被调…

Activiti7工作流

文章目录 一、工作流介绍1.1 概念1.2 适用行业1.3 应用领域1.4 传统实现方式1.5 什么是工作流引擎 二、什么是Activiti7&#xff1f;2.1 概述2.2 Activiti7内部核心机制2.3 BPMN2.4 Activiti如何使用2.4.1 整合Activiti2.4.2 业务流程建模2.4.3 部署业务流程2.4.4 启动流程实例…

大反转!OpenAI董事会辞职,求奥特曼重返OpenAI?「奥特曼24小时流放」大揭秘...

大家好&#xff0c;我是二狗。 想必大家昨天都被Sam Altman被董事会解雇的事情刷屏了。 然而才仅仅过去一天&#xff0c;OpenAI 董事会就反悔了&#xff01;正和Sam Altman 商量让他重返CEO职位。 这一反转和Altman被炒鱿鱼一样突然&#xff0c;凄凄惨惨真真假假真真&#x…

340条样本就能让GPT-4崩溃,输出有害内容高达95%?OpenAI的安全防护措施再次失效

仅需340个示例微调GPT-4&#xff0c;即可绕过安全限制&#xff0c;让模型说出“枪支改装方法”、“生化武器制作过程”等有害内容&#xff1f; OpenAI的安全防护措施再次失效&#xff0c;攻击的成功率高达95%&#xff01; 近日&#xff0c;美国顶尖大学UIUC与斯坦福联合对GPT…

python 的 import 机制

引言 对于初学 python&#xff0c;或多或少在 import 一个 module 时遇到过 ImportError: attempted relative import with no known parent package 这样的错误信息。对于初学 python&#xff0c;遇到这样的问题是因为在执行 python xxx.py 程序时&#xff0c;xxx.py 程序中 …

【心得】基于flask的SSTI个人笔记

目录 计算PIN码 例题1 SSTI的引用链 例题2 SSTI利用条件&#xff1a; 渲染字符串可控&#xff0c;也就说模板的内容可控 我们通过模板 语法 {{ xxx }}相当于变相的执行了服务器上的python代码 利用render_template_string函数参数可控&#xff0c;或者部分可控 render_…

分库分表

分库&#xff0c;分表&#xff0c;分库分表 “只分库“&#xff0c;“只分表“&#xff0c;“既分库又分表" 何时分库 在面对高并发的情况下&#xff0c;数据库连接成为性能瓶颈。当数据QPS过高导致数据库连接数不足时&#xff0c;考虑分库。在读多写少的场景下&#x…

基于Vue+SpringBoot的超市账单管理系统 开源项目

项目编号&#xff1a; S 032 &#xff0c;文末获取源码。 \color{red}{项目编号&#xff1a;S032&#xff0c;文末获取源码。} 项目编号&#xff1a;S032&#xff0c;文末获取源码。 目录 一、摘要1.1 项目介绍1.2 项目录屏 二、功能模块三、系统设计3.1 总体设计3.2 前端设计3…

SQL零基础入门教程,贼拉详细!贼拉简单! 速通数据库期末考!(十)

SQL 函数 SQL 拥有很多可用于计数和计算的内建函数。 比如&#xff1a; AVG() - 返回平均值 COUNT() - 返回行数 MAX() - 返回最大值 MIN() - 返回最小值 SUM() - 返回总和 FIRST() - 返回第一个记录的值 LAST() - 返回最后一个记录的值 GROUP BY 学习SQL函数前&#xff0c…

基于卡尔曼滤波实现行人目标跟踪

目录 1. 作者介绍2. 目标跟踪算法介绍2.1 目标跟踪背景2.2 目标跟踪任务分类2.3 目标跟踪遇到的问题2.4 目标跟踪方法 3. 卡尔曼滤波的目标跟踪算法介绍3.1 所用数据视频说明3.2 卡尔曼滤波3.3 单目标跟踪算法3.3.1 IOU匹配算法3.3.2 卡尔曼滤波的使用方法 3.4 多目标跟踪算法 …

腾讯云轻量数据库1核1G评测和租用价格表

腾讯云轻量数据库测评&#xff0c;轻量数据库100%兼容MySQL 5.7和8.0&#xff0c;腾讯云提供1C1G20GB、1C1G40GB、1C2G80GB、2C4G120GB、2C8G240GB五种规格轻量数据库&#xff0c;阿腾云atengyun.com分享腾讯云轻量数据库测评、轻量数据库详细介绍、特性、配置价格和常见问题解…

【算法】最小生成树——普利姆 (Prim) 算法

目录 1.概述2.代码实现2.1.邻接矩阵存储图2.2.邻接表存储图2.3.测试 3.应用 1.概述 &#xff08;1&#xff09;在一给定的无向图 G (V, E) 中&#xff0c;(u, v) 代表连接顶点 u 与顶点 v 的边&#xff0c;而 w(u, v) 代表此边的权重&#xff0c;若存在 T 为 E 的子集且为无循…

华为模拟器dhcp实验

实验需求&#xff0c;pc1 pc2 pc3 获取到地址且能ping通&#xff0c;pc1 pc2 为地址池模式&#xff0c;pc3为接口模式 上配置 #sysname AR1# dhcp enable # interface GigabitEthernet0/0/0ip address 10.0.47.254 255.255.255.0 dhcp select relaydhcp relay server-ip 10.0…

认识.NET Aspire:高效构建云原生应用的利器

简介 在几天前的.NET 8发布会上&#xff0c;来自微软的Glenn Condron和David Fowler为我们演示了.NET Aspire&#xff0c;在Visual Studio的帮助下&#xff0c;它展现出了惊人的开发效率。 短短的十分钟内&#xff0c;David现场演示了如何轻松创建了一个具有服务发现&#xf…

基于不确定性感知的脑肿瘤分割多维互学习

Uncertainty-Aware Multi-Dimensional Mutual Learning for Brain and Brain Tumor Segmentation 一基于不确定性感知的脑肿瘤分割多维互学习背景贡献实验方法Uncertainty-Aware Mutual Learning&#xff08;具有不确定性的相互学习&#xff09; Thinking 一基于不确定性感知的…

设计模式常见面试题

简单梳理下二十三种设计模式&#xff0c;在使用设计模式的时候&#xff0c;不仅要对其分类了然于胸&#xff0c;还要了解每个设计模式的应用场景、设计与实现&#xff0c;以及其优缺点。同时&#xff0c;还要能区分功能相近的设计模式&#xff0c;避免出现误用的情况。 什么是…

Git精讲

Git基本操作 创建Git本地仓库 git initgit clone 配置Git git config [--global] user.name "Your Name" git config [--global] user.email "emailexample.com"–global是一个可选项。如果使用了该选项&#xff0c;表示这台机器上所有的Git仓库都会使…

Network(三)动态路由与ACL配置

一 三层交换机 1 三层交换机概述 三层交换二层交换三层转发 2 虚拟接口概述 在三层交换机上配置的VLAN接口为虚拟接口&#xff0c;使用Vlanif&#xff08;VLAN虚拟接口&#xff09;实现VLAN间路由&#xff0c;VLAN接口的引入使得应用更加灵活 三层交换机VLAN间通信的转发…

Cross-View Transformers for Real-Time Map-View Semantic Segmentation 论文阅读

论文链接 Cross-View Transformers for Real-Time Map-View Semantic Segmentation 0. Abstract 提出了 Cross-View Transformers &#xff0c;一种基于注意力的高效模型&#xff0c;用于来自多个摄像机的地图视图语义分割使用相机感知的跨视图注意机制隐式学习从单个相机视…

第93步 深度学习图像分割:PSPNet建模

基于WIN10的64位系统演示 一、写在前面 本期&#xff0c;我们继续学习深度学习图像分割系列的另一个模型&#xff0c;PSPNet。 二、PSPNet简介 &#xff08;1&#xff09;金字塔池化模块 (Pyramid Pooling Module) PSPNet的核心是其金字塔池化模块&#xff0c;该模块能够捕…
最新文章