【brpc学习实践十二】bthread

概览

bthread(代码)是baidu-rpc使用的M:N线程库,是其稳定和高效的关键组件。能更好地利用多核cpu,能在pthread中运行,需要注意的是,bthread的work stealing机制会da让任务pthread发生切换,从而让thread_local变量不可信,通常在bthread_usleep或这join的时候就有可能发生切换。

thread_local SomeObject obj;
...
SomeObject* p = &obj;
p->bar();
bthread_usleep(1000);
p->bar();

解决方案:

  • 不使用线程级变量传递业务数据。这是一种槽糕的设计模式,依赖线程级数据的函数也难以单测。判断是否滥用:如果不使用线程级变量,业务逻辑是否还能正常运行?线程级变量应只用作优化手段,使用过程中不应直接或间接调用任何可能阻塞的bthread函数。比如使用线程级变量的tcmalloc就不会和bthread有任何冲突。
  • 如果一定要(在业务中)使用线程级变量,使用bthread_key_create和bthread_getspecific。另外,bthread没有优先级,如果需要线程优先级,还是需要使用pthread。

全局视图

在进入代码实现介绍前,通过一张图了解bthread库的主要类/结构体,以及他们之间的关系。
在这里插入图片描述

TaskControl是单例模式的全局控制类。
TaskGroup由每一个Worker线程进行创建,和Worker线程一一对应。
TaskMeta是描述bthread任务的结构体,里面保存了bthread任务的各种信息,包括id,状态,任务函数和参数,栈空间等等。
ParkingLot则是通过futex实现的线程同步工具,用来唤醒(有任务提交时)和阻塞(无任务时)挂在上面的Worker线程(存在多个ParkingLot的目的是减少竞争)。
在关系上,TaskControl中创建了Worker线程,然后Worker线程在线程入口函数里面创建属于自己的TaskGroup。TaskGroup内有rq和remote_rq两个队列,里面保存的是外部提交需要执行的bthread任务id。

所以简单描述bthread库的运行过程就是:外部提交一个bthread任务到某个TaskGroup中的任务队列,然后通过ParkingLot唤醒休眠的Worker线程,Worker线程会从自己的TaskGroup中取出或者其他Worker线程的TaskGroup中“偷”出bthread任务进行执行。某种意义上来说,和线程池有些类似,但是bthread任务粒度更小,所以bthread库也被称为一种类协程库。

这种“偷”任务的实现称为work stealing机制。它保证了即使某一个Worker线程被阻塞,它内部任务队列的bthread任务也可以被其他Worker线程“偷取”执行成功。所以有时候在bthread任务中调用了bthread_join / bthread_usleep等函数把自己切出后,等到函数返回继续执行的时候会发现执行自己的线程已经变了,这时pthread local的数据已经失效,这是在使用的时候一个需要注意的点。

代码剖析

接下来开始介绍bthread库主体功能的代码实现,包括bthread库的初始化、bthread任务的提交、以及bthread任务的执行和较为核心的栈空间切换。
具体的,我们从一个bthread和pthread比较的demo开始。从功能、函数参数和调用形式上,两者非常相似,都是实现后台运行thread_func任务函数。但bthread额外拥有唯一标识自己的bthread id。

#include <iostream>

#include "bthread.h"

const char* bthread_flag = "bthread";
const char* pthread_flag = "pthread";

void* thread_func(void* args) {
   

    std::cout << "flag: " << (char*)(args) <<
            ", pthread_id: " << pthread_self() <<
            ", bthread_id: " << bthread_self() << std::endl;
    return nullptr;
}

int main(int argc, char** argv) {
   
    bthread_t bth;
    pthread_t pth;
    if (bthread_start_background(&bth, nullptr, thread_func, (void*)bthread_flag) != 0) {
   
        std::cout << "fail to create bthread" << std::endl;
        return -1;
    }
    if (pthread_create(&pth, nullptr, thread_func, (void*)pthread_flag) != 0) {
   
        std::cout << "fail to create pthread" << std::endl;
        return -1;
    }
    bthread_join(bth, nullptr);
    pthread_join(pth, nullptr);
    return 0;
}
// $./output/bin/bthread_test 
// flag: pthread, pthread_id: 140281875785472, bthread_id: 0
// flag: bthread, pthread_id: 140282267948800, bthread_id: 4294968064

初始化

bthread_start_background是我们启动一个bthread任务常用的接口函数。
它会首先判断自己的调用是不是由某个Worker线程发起的。这在BRPC框架下是很常见的场景,因为RPC回调中的主体业务代码都是运行在Worker线程内的。它判断的依据是pthread local级别的TaskGroup指针变量tls_task_group是否为空。之前有提到TaskGroup和Worker线程一一对应,实现的方式就是在Worker线程创建好TaskGroup后,赋值给自己的线程变量tls_task_group。

所以如果tls_task_group不为空,这说明bthread库已经初始化过了,并且位于Worker线程中,直接调用TaskGroup的start_background函数提交bthread任务到它的任务队列;否则就需要尝试进行bthread库的初始化,然后从其中随机挑选一个TaskGroup同样调用它的start_background函数进行任务提交。前面的demo很明显走的是后者的逻辑,需要进行bthread库的初始化。

int bthread_start_background(bthread_t* __restrict tid,
                             const bthread_attr_t* __restrict attr,
                             void * (*fn)(void*),
                             void* __restrict arg) __THROW {
   
    bthread::TaskGroup* g = bthread::tls_task_group;
    if (g) {
   
        // start from worker
        return g->start_background<false>(tid, attr, fn, arg);
    }
    return bthread::start_from_non_worker(tid, attr, fn, arg);
}

BASE_FORCE_INLINE int
start_from_non_worker(bthread_t* __restrict tid,
                      const bthread_attr_t* __restrict attr,
                      void * (*fn)(void*),
                      void* __restrict arg) {
   
    TaskControl* c = get_or_new_task_control();
    if (NULL == c) {
   
        return ENOMEM;
    }
    ...
    return c->choose_one_group()->start_background<true>(
        tid, attr, fn, arg);
}

get_or_new_task_control函数实现了全局单例类TaskControl的构造和初始化。它是通过double-checked locking pattern(DCLP)方式实现的单例模式,使用了memory fence保证了正确性(感兴趣的可以看这篇论文(https://www.aristeia.com/Papers/DDJ_Jul_Aug_2004_revised.pdf)和文章(https://zh.wikipedia.org/wiki/%E5%8F%8C%E9%87%8D%E6%A3%80%E6%9F%A5%E9%94%81%E5%AE%9A%E6%A8%A1%E5%BC%8F#Microsoft_Visual_C++_%E4%B8%AD%E7%9A%84%E4%BD%BF%E7%94%A8),C++11后通常用std::call_once实现单例模式)。

inline TaskControl* get_or_new_task_control() {
   
    base::atomic<TaskControl*>* p = (base::atomic<TaskControl*>*)&g_task_control;
    TaskControl* c = p->load(base::memory_order_consume);
    if (c != NULL) {
   
        return c;
    }
    BAIDU_SCOPED_LOCK(g_task_control_mutex);
    c = p->load(base::memory_order_consume);
    if (c != NULL) {
   
        return c;
    }
    c = new (std::nothrow) TaskControl;
    if (NULL == c) {
   
        return NULL;
    }
    int concurrency = FLAGS_bthread_min_concurrency > 0 ?
        FLAGS_bthread_min_concurrency :
        FLAGS_bthread_concurrency;
    if (c->init(concurrency) != 0) {
   
        LOG(ERROR) << "Fail to init g_task_control";
        delete c;
        return NULL;
    }
    p->store(c, base::memory_order_release);
    return c;
}

TaskControl的init函数,首先检查了入参concurrency的正确性,然后创建了concurrency数量的Worker线程。
默认情况下会创建concurrency = FLAGS_bthread_concurrency = 8 + 1 = 9条线程。
BRPC框架下默认情况会设置concurrency为系统CPU核心数 + 1,也可以通过ServerOptions.num_threads配置Worker线程的数量。

int TaskControl::init(int concurrency) {
   
    if (_concurrency != 0) {
   
        LOG(ERROR) << "Already initialized";
        return -1;
    }
    if (concurrency <= 0) {
   
        LOG(ERROR) << "Invalid concurrency=" << concurrency;
        return -1;
    }
    _concurrency = concurrency;
    ...
    _workers.resize(_concurrency);   
    for (int i = 0; i < _concurrency; ++i) {
   
        const int rc = pthread_create(&_workers[i], NULL, worker_thread<true>, this);
        if (rc) {
   
            LOG(ERROR) << "Fail to create _workers[" << i << "], " << berror(rc);
            return -1;
        }
    }
    ...
    while (_ngroup == 0) {
   
        usleep(100);  // TODO: Elaborate
    }
    return 0

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/206790.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

基于单片机的蔬菜大棚温湿度控制系统

1&#xff0e;设计任务 利用AT89C51单片机为核心控制元件,设计一个节日彩灯门&#xff0c;设计的系统实用性强、操作简单&#xff0c;实现了智能化、数字化。 本系统通过SHT11传感器测量出大棚内的温湿度&#xff0c;并将温湿度电信号传至单片机AT89C51,单片机系统通过预先设…

双向ESD保护 汽车级TVS二极管 ESD9B3.3ST5G工作原理、特性参数、封装形式

什么是汽车级TVS二极管&#xff1f; TVS二极管是一种用于保护电子电路的电子元件。它主要用于电路中的过电压保护&#xff0c;防止电压过高而损坏其他部件。TVS二极管通常被称为“汽车级”是因为它们能够满足汽车电子系统的特殊要求。 在汽车电子系统中&#xff0c;由于车辆启…

leetCode 78.子集 + 回溯算法 + 图解 + 笔记

给你一个整数数组 nums &#xff0c;数组中的元素 互不相同 。返回该数组所有可能的子集&#xff08;幂集&#xff09;。解集 不能 包含重复的子集。你可以按 任意顺序 返回解集 示例 1&#xff1a; 输入&#xff1a;nums [1,2,3] 输出&#xff1a;[[],[1],[2],[1,2],[3],[1…

智慧机场视频监控系统方案:AI智能助力机场智慧运营

一、方案背景 随着人们生活物质水平的上升&#xff0c;人们对机场的需求也日益增多&#xff0c;在民航新建、迁建、扩建机场项目猛增的同时&#xff0c;也需同步配备相应的安防监控系统&#xff0c;以满足民航机场安全管理要求和机场运营业务的高速发展。 二、方案概述 智慧机…

【服务部署】常用内网穿透方案

一、前言 由于一些开发及使用需求&#xff0c;需要将内网机器端口映射到公网&#xff0c;达到公网访问内网环境的目的 本文主要介绍几种常用的内网穿透方案 ssh远程端口转发 部署简单&#xff0c;无需额外安装软件包 frp反向代理 功能配置丰富&#xff0c;部署相对复杂&#…

frp 配置内网访问

frp介绍 frp 是一个开源、简洁易用、高性能的内网穿透软件&#xff0c;支持 tcp, udp, http, https 等协议。frp 项目官网是 https://github.com/fatedier/frp 下载地址&#xff1a; https://github.com/fatedier/frp/releases frp工作原理 服务端运行&#xff0c;监听一个…

键盘敲入一个字母,操作系统发生了什么?

一、设备控制器 我们的电脑设备可以接非常多的输入输出设备&#xff0c;比如鼠标键盘网卡硬盘打印机等&#xff0c;每个设备的用法和功能都不相同&#xff0c;那操作系统是如何把这些输入输出设备统一管理的呢&#xff1f; 为了屏蔽设备之间的差异&#xff0c;每个设备都有一…

前端项目环境的搭建

一、下载并且安装Node&#xff08;不安装node&#xff0c;就安装nvm。nvm安装教程&#xff09;&#xff1a; 1.官网下载Node&#xff1a;https://nodejs.org/en/ 2.测试nodejs安装是否成功&#xff1a; 在windows powerShell中输入node -v 和 npm -v&#xff0c;看到版本号就…

【23-24 秋学期】NNDL 作业9 RNN - SRN

简单循环网络&#xff08;Simple Recurrent Network&#xff0c;SRN&#xff09;只有一个隐藏层的神经网络&#xff0e; 目录 1. 实现SRN &#xff08;1&#xff09;使用Numpy &#xff08;2&#xff09;在1的基础上&#xff0c;增加激活函数tanh &#xff08;3&#xff0…

rabbitMQ镜像队列的使用

在rabbitMQ集群中&#xff0c;默认发送消息时&#xff0c;队列默认时在一个节点上存在的。 我们以node01 node02 node03三节点集群为例&#xff0c;在node01声明队列发送消息后&#xff0c;发现&#xff1a; 测试队列只在节点node01上出现。 我们手动停止node01后&#xff0c…

#HarmonyOS:软件安装window和mac预览Hello World

Window软件地址 https://developer.harmonyos.com/cn/develop/deveco-studio#download 安装的建议 这个界面这样选&#xff0c;其他界面全部按照默认路径往下走&#xff01;&#xff01;&#xff01; 等待安装… 安装环境错误处理 一般就是本地node配置异常导致&#xff…

Nginx基线检查

扩展知识: Nginx主配置文件:/etc/nginx/nginx.conf 这是Nginx的主要配置文件,用于配置全局的设置、HTTP块、事件处理、邮件等内容。 打开并编辑配置文件 vim /etc/nginx/nginx.conf 一、关于禁止显示服务器版本号和操作系统版本信息: 简介: 在错误页面和响应头中显示…

SAP_ABAP_编程基础_逻辑流控制_比较运算符 / 比较字符串和数字串 / 比较二进制位结构 / 编程分支和循环

SAP ABAP 顾问&#xff08;开发工程师&#xff09;能力模型_Terry谈企业数字化的博客-CSDN博客文章浏览阅读470次。目标&#xff1a;基于对SAP abap 顾问能力模型的梳理&#xff0c;给一年左右经验的abaper 快速成长为三年经验提供超级燃料&#xff01;https://blog.csdn.net/j…

html/css中用float实现的盒子案例

运行效果&#xff1a; 代码部分&#xff1a; <!doctype html> <html> <head> <meta charset"utf-8"> <title>无标题文档</title> <style type"text/css">.father{width:300px; height:400px; background:gray;…

云轴科技ZStack助力龙芯打造IT产业新生态

11月28日&#xff0c;2023龙芯产品发布暨用户大会在国家会议中心启幕。大会以“到中流击水”为主题&#xff0c;现场发布新一代通用处理器龙芯3A6000、打印机主控芯片龙芯2P0500重磅成果&#xff0c;并对外公布龙芯处理器核IP及龙芯自主指令系统架构授权计划。作为龙芯的卓越行…

element-ui表格滚动效果,el-table滚动条样式重置

项目首页需要展示一个表格滚动区域&#xff0c;特此来记录一下 HTML <div class"table-box" mouseenter"mouseenter" mouseleave"mouseleave"><el-table :data"tableList" border height"400px" v-loading"…

spring日志输出到elasticsearch

1.maven <!--日志elasticsearch--><dependency><groupId>com.agido</groupId><artifactId>logback-elasticsearch-appender</artifactId><version>3.0.8</version></dependency><dependency><groupId>net.l…

单机zk安装与zk四字命令

一、下载 Apache ZooKeeper可以在 Linux 系统中使用 wget 命令直接下载&#xff0c;官网地址 Apache ZooKeeper 二、解压 tar -zxvf apache-zookeeper-3.8.3-bin.tar.gz 进去解压的目录中&#xff0c; 进入到 zk 解压目录的 conf 目录&#xff0c;复制 zoo_sample.cfg 文件&a…

leetCode 93.复原 IP 地址 + 回溯算法 + 图解 + 笔记

93. 复原 IP 地址 - 力扣&#xff08;LeetCode&#xff09; 有效 IP 地址 正好由四个整数&#xff08;每个整数位于 0 到 255 之间组成&#xff0c;且不能含有前导 0&#xff09;&#xff0c;整数之间用 . 分隔。 例如&#xff1a;"0.1.2.201" 和 "192.168.1.1…

基于AT89C51单片机的电子闹钟设计

1&#xff0e;设计任务 利用AT89C51单片机为核心控制元件,设计一个电子闹钟&#xff0c;设计的系统实用性强、操作简单&#xff0c;实现了智能化、数字化。 &#xff08;1&#xff09;按开始键自动进入时间显示&#xff0c;开始为0&#xff0c;按K1键进入更改时间&#xff0c…
最新文章