早在19年5月就在某站上看到sylar的视频了,一直认为这是一个非常不错的视频,由于本人一直是自学编程,基础不扎实,也没有任何人的督促,没能坚持下去,每每想起倍感惋惜。恰逢互联网寒冬,在家无事,遂提笔再续前缘。
为了能更好的看懂sylar,本套笔记会分两步走,每个系统都会分为两篇博客。
分别是【知识储备篇】和【代码分析篇】
(ps:纯粹做笔记的形式给自己记录下,欢迎大家评论,不足之处请多多赐教)
QQ交流群:957100923
协程调度模块-代码分析篇
一、Schedule调度器是什么【线程级别的调度器】
我的博客已经停更一周了,并不是 五一 假期出去游玩导致的。
原因很简单:就是因为 sylar 视频中协程调度相关的知识过于复杂,导致自己反复观看了数遍才勉强理解一二。
要搞懂协程调度模块一上来就看sylar的视频或代码未免门槛太高了。
我总结了一下这部分知识为什么难以下咽:
- 知识点过多,将调度器、线程、协程糅合在一起讲解,原本基础就不扎实,这样一锅乱炖容易糊掉。
- 想法没有提前告知,不知道要做什么,在做什么,雾里看花。
- 弹幕的误导,sylar-yin 确实会有说错写错的地方,如果你不懂,那么请你百分百遵循他的写法。
- 不够坚持,不要放过每一个你不熟悉的知识点,请暂停视频,把不会的知识点了解了再往下(不要过分深入)。
本人属于不太聪明的类型,所以如果你和我一样,那么请稳扎稳打,别人学一天我们就学三天,别人学一个月,我们就学一年。
1.最普通的多线程使用
首先抛开协程,我们想要使用多线程来完成多个任务需要怎么做呢?
下面我举个例子(该案例中会开启三个子线程来分别执行 唱歌、跳舞、说唱):
#include <chrono>
#include <iostream>
#include <pthread.h>
#include <vector>
#include <list>
#include <unistd.h>
#include <functional>
#include "sylar/thread.h"
#include "sylar/mutex.h"
//测试用的全局累加计数字段
int num = 0;
//循环累加的次数
uint64_t loop_times = 100000000;
typedef sylar::Mutex MutexType;
MutexType g_mutx;
//唱歌的方法,会专门分配一个线程来处理
void sing(){
MutexType::Lock lock(g_mutx);
for(size_t i = 0; i < loop_times; ++i){
++num;
}
std::cout << sylar::Thread::GetName() << " sing~ " << num << std::endl;
}
//跳舞的方法,会专门分配一个线程来处理
void dance(){
MutexType::Lock lock(g_mutx);
for(size_t i = 0; i < loop_times; ++i){
++num;
}
std::cout <<sylar::Thread::GetName() << " dance~ " << num << std::endl;
}
//说唱的方法,会专门分配一个线程来处理
void rap(){
MutexType::Lock lock(g_mutx);
for(size_t i = 0; i < loop_times; ++i){
++num;
}
std::cout <<sylar::Thread::GetName() << " rap~ " << num << std::endl;
}
int main(){
std::cout << "====Main start====" << std::endl;
{
//记录当前时间
auto start = std::chrono::high_resolution_clock::now();
//创建线程用于执行唱歌的方法
sylar::Thread::ptr thr1(new sylar::Thread(&sing,"THREAD_1"));
//创建线程用于执行跳舞的方法
sylar::Thread::ptr thr2(new sylar::Thread(&dance,"THREAD_2"));
//创建线程用于执行说唱的方法
sylar::Thread::ptr thr3(new sylar::Thread(&rap,"THREAD_3"));
thr1->join();
thr2->join();
thr3->join();
//记录结束时间
auto end = std::chrono::high_resolution_clock::now();
//计算花费的时间
std::chrono::duration<double,std::milli> elapsed = end - start;
std::cout << "Elapsed: " << (int)elapsed.count() << std::endl;
}
std::cout << "====Main end====" << std::endl;
return 0;
}
以下是输出(可以看到三个线程按顺序执行了):
===Main start===
THREAD_1 sing~ 100000000
THREAD_2 dance~ 200000000
THREAD_3 rap~ 300000000
Elapsed: 294
===Main end====
2.复用多线程
上述的代码中,我们有三个任务(唱歌、跳舞、说唱),同时我们开辟了三个线程来执行。
但是很多情况下我们的线程数量是需要控制在适当范围内的。
最佳线程数目 = ((线程等待时间+线程CPU时间)/线程CPU时间 )* CPU数目
CPU核数为4核,一个任务线程cpu耗时为20ms,线程等待(网络IO、磁盘IO)耗时80ms,
那最佳线程数目:( 80 + 20 )/20 * 4 = 20。也就是设置20个线程数最佳。
线程的等待时间越大,线程数就要设置越大
1、CPU密集型:操作内存处理的业务,一般线程数设置为:CPU核数 + 1 或者 CPU核数*2。核数为4的话,一般设置 5 或 8
2、IO密集型:文件操作,网络操作,数据库操作,一般线程设置为:cpu核数 / (1-0.9),核数为4的话,一般设置 40
我的机器是1核的,我这里其实适合使用两个线程,但是我在上述代码中使用了三个线程,那么多线程反而变成了低效的。
所以我们可以复用其中一个线程,在该线程执行完成任务后再执行其他的任务。
代码如下(改动很少,改动部分用 !!!标注了):
int main(){
std::cout << "====Main start====" << std::endl;
{
//记录当前时间
auto start = std::chrono::high_resolution_clock::now();
//创建线程用于执行唱歌的方法
sylar::Thread::ptr thr1(new sylar::Thread(&sing,"THREAD_1"));
//创建线程用于执行跳舞的方法
sylar::Thread::ptr thr2(new sylar::Thread(&dance,"THREAD_2"));
thr1->join();
thr2->join();
//!!!复用线程用于执行说唱的方法!!!
thr1.reset(new sylar::Thread(&rap,"THREAD_1"));
thr1->join();
//记录结束时间
auto end = std::chrono::high_resolution_clock::now();
//计算花费的时间
std::chrono::duration<double,std::milli> elapsed = end - start;
std::cout << "Elapsed: " << (int)elapsed.count() << std::endl;
}
std::cout << "====Main end====" << std::endl;
return 0;
}
以下是输出:
===Main start====
THREAD_1 sing~ 100000000
THREAD_2 dance~ 200000000
THREAD_1 rap~ 300000000
Elapsed: 265
==Main end====
问题:
问题来了,以上的两份代码都存在以下问题:
1.就是需要程序员自己来控制线程的创建。
2.需要程序员手动来进行线程的调度。
3.需要程序员手动的去复用一个线程对象。
4.需要程序员手动的join对应的每一个线程。
这样是很繁琐的事情,而且一不小心就会出错,那么我们有什么好的办法吗?
3.调度器的出现
想要解决以上代码带来的问题,我们就要提出需求:
1.将创建线程的事情交由【调度器】执行,我们只需创建调度器时指定线程数量即可。
2.将线程调度和复用线程的事情交由【调度器】执行,我们只需提供所需执行的 任务 即可。
3.等待各个线程完成的jion()方法也由【调度器】执行,我们只需等待调度器结束即可。
综上所述,我们给出以下代码(记得先看main方法中的调度器使用,再看实现):
#include <chrono>
#include <iostream>
#include <pthread.h>
#include <vector>
#include <list>
#include <unistd.h>
#include <functional>
#include "sylar/thread.h"
#include "sylar/mutex.h"
//测试用的全局累加计数字段
int num = 0;
//循环累加的次数
uint64_t loop_times = 100000000;
//唱歌的方法,会专门分配一个线程来处理
void sing(){
for(size_t i = 0; i < loop_times; ++i){
++num;
}
std::cout << sylar::Thread::GetName() << " sing~ " << num << std::endl;
}
//跳舞的方法,会专门分配一个线程来处理
void dance(){
for(size_t i = 0; i < loop_times; ++i){
++num;
}
std::cout <<sylar::Thread::GetName() << " dance~ " << num << std::endl;
}
//说唱的方法,会专门分配一个线程来处理
void rap(){
for(size_t i = 0; i < loop_times; ++i){
++num;
}
std::cout <<sylar::Thread::GetName() << " rap~ " << num << std::endl;
}
//调度器
class Schedule {
public:
//定义锁,使用typedef方便锁类型替换
typedef sylar::Mutex MutexType;
//调度器构造函数,指定线程池的大小
Schedule(size_t size = 1)
:m_poll_size(size){
std::cout << "Specify a thread poll size of "<< size << std::endl;
}
//调度器析构函数
~Schedule(){
}
//调度方法
void run(){
//多个线程操作任务队列,涉及到线程安全,所以要加锁
MutexType::Lock lock(m_mutex);
//定义一个任务对象来存放后续从任务队列中取出的任务
std::function<void()> task;
//开始对任务队列进行迭代
auto it = m_tasks_queue.begin();
while(it != m_tasks_queue.end()){
//将任务从任务队列中取出
task = *it;
//将取出的任务从任务队列中擦除
m_tasks_queue.erase(it);
//每次按顺序取出一个,取出后跳出循环
break;
}
//判断是否有取到任务
if(task != nullptr){
//有取到任务则执行任务
task();
}
}
//开启调度器方法
void start(){
std::cout << "Schedule start..." << std::endl;
std::cout << "Init a thread poll" << std::endl;
//根据调度器初始化时指定的线程池大小,来初始化线程池容器的大小
m_thread_poll.resize(m_poll_size);
std::cout << "Schedule run..." << std::endl;
//构建线程池中的每一个线程对象,每个线程都将绑定调度器的run方法,且给每个线程对象指定了名称用于区分
for(size_t i = 0; i < m_poll_size; ++i){
//这里的reset方法是为了复用线程对象节约内存资源
m_thread_poll[i].reset(new sylar::Thread(std::bind(&Schedule::run,this),"THREAD_"+std::to_string(i)));
}
}
//停止调度器方法
void stop(){
//判断任务队列中的任务是否全部被消化
while(!m_tasks_queue.empty()){
for(size_t i = 0; i < m_poll_size; ++i){
m_thread_poll[i].reset(new sylar::Thread(std::bind(&Schedule::run,this),"THREAD_"+std::to_string(i)));
}
}
//如果任务队列中的任务全部被消化了,那么就在这等待每个子线程的结束
for(size_t i = 0; i < m_poll_size; ++i){
m_thread_poll[i]->join();
}
std::cout << "Schedule stop..." << std::endl;
}
//接受调度任务方法
void schedule(std::function<void()> task){
std::cout << "Push task into queue " << &task << std::endl;
//将任务对象依次存放到调度器的任务队列中
m_tasks_queue.push_back(task);
}
private:
//由于是多线程环境下所以提供锁
MutexType m_mutex;
//线程池的大小
size_t m_poll_size = 0;
//线程池
std::vector<sylar::Thread::ptr> m_thread_poll;
//任务队列
std::list<std::function<void()> > m_tasks_queue;
};
int main(){
std::cout << "====Schedule====" << std::endl;
std::cout << "====Main start====" << std::endl;
{
//记录当前时间
auto start = std::chrono::high_resolution_clock::now();
//构建调度器,指定线程池中线程数量
Schedule sc(2);
//依次将要执行的任务塞入调度器
sc.schedule(std::bind(&sing));
sc.schedule(std::bind(&dance));
sc.schedule(std::bind(&rap));
//执行调度器
sc.start();
//停止调度器
sc.stop();
//记录结束时间
auto end = std::chrono::high_resolution_clock::now();
//计算花费的时间
std::chrono::duration<double,std::milli> elapsed = end - start;
std::cout << "Elapsed: " << (int)elapsed.count() << std::endl;
}
std::cout << "====Main end====" << std::endl;
return 0;
}
以下是输出:
===Schedule===
===Main start====
Push task into queue 0x7ffd602ffb00
Push task into queue 0x7ffd602ffb40
Push task into queue 0x7ffd602ffb80
Schedule start...
Init a thread poll
Schedule run...
THREAD_1 sing~ 100000000
THREAD_2 dance~ 200000000
THREAD_1 rap~ 300000000
Schedule stop...
Elapsed: 234
===Main end====
可以看到以上的代码使用了调度器,代码的编写会很舒服,以下是使用调度器代码的部分:
//构建调度器,指定线程池中线程数量
Schedule sc(2);
//依次将要执行的任务塞入调度器
sc.schedule(std::bind(&sing));
sc.schedule(std::bind(&dance));
sc.schedule(std::bind(&rap));
//执行调度器
sc.start();
//停止调度器
sc.stop();
这样一来,无论是有多少任务需要执行,程序员都不需要关系如何创建线程和如何进行调度了,
只需要把所要执行的任务全部一股脑的交给调度器就行了。
线程也不需要自己来维护了,只需要告诉调度器需要多少个线程就行。
只要调度器完善的够好,程序员就只需要专心处理业务问题就好了。
看到这里相信你对调度器的功能和必要性有了一定的了解,那么我们来继续优化这个调度器,让它更健硕。
二、考虑多线程下的多个调度器问题【线程级别的调度器】
- 由于上边代码调度器Schedule类可以在其他任意线程中构建与执行,所以需要考虑多线程下调度器的区分问题。
- 由于这里为了由浅入深,所以没有使用协程,那么调度器就必须占用一个线程来执行。
接下来的场景会变成:一个调度器线程和多个其他的线程和一系列的函数任务。
所以需要使用线程局部变量来控制一个线程只存在一个调度器
当然这个判断需要将调度器的this存储起来做比较
以下是控制一个线程只存在一个调度器的核心逻辑
static thread_local Schedule* t_schedule = nullptr;
Schedule(size_t size = 1)
:m_poll_size(size){
//断言当前线程是否不存在调度器,如果已存在那么报错
SYLAR_ASSERT2(GetThis()==nullptr,"one thread one schedule");
//设置当前线程上的调度器为当前调度器
setThis();
}
~Schedule(){
//如果当前调度器就是对应线程上的调度器那么将线程上的调度器清空
if(GetThis()==this){
t_schedule = nullptr;
}
}
static Schedule* GetThis(){
return t_schedule;
}
void setThis(){
t_schedule = this;
}
例子(完整代码):
#include <chrono>
#include <iostream>
#include <pthread.h>
#include <vector>
#include <list>
#include <unistd.h>
#include <functional>
#include "sylar/log.h"
#include "sylar/thread.h"
#include "sylar/mutex.h"
#include "sylar/macro.h"
int num = 0;
uint64_t loop_times = 100000000;
void sing(){
for(size_t i = 0; i < loop_times; ++i){
++num;
}
std::cout << sylar::Thread::GetName() << " sing~ " << num << std::endl;
}
void dance(){
for(size_t i = 0; i < loop_times; ++i){
++num;
}
std::cout <<sylar::Thread::GetName() << " dance~ " << num << std::endl;
}
void rap(){
for(size_t i = 0; i < loop_times; ++i){
++num;
}
std::cout <<sylar::Thread::GetName() << " rap~ " << num << std::endl;
}
class Schedule;
static thread_local Schedule* t_schedule = nullptr;
class Schedule {
public:
typedef sylar::Mutex MutexType;
Schedule(size_t size = 1)
:m_poll_size(size){
SYLAR_ASSERT2(GetThis()==nullptr,"one thread one schedule");
setThis();
m_callerThread = sylar::Thread::GetThis();
}
~Schedule(){
if(GetThis()==this){
t_schedule = nullptr;
}
}
static Schedule* GetThis(){
return t_schedule;
}
void setThis(){
t_schedule = this;
}
void run(){
MutexType::Lock lock(m_mutex);
std::function<void()> task;
auto it = m_tasks_queue.begin();
while(it != m_tasks_queue.end()){
task = *it;
m_tasks_queue.erase(it);
break;
}
if(task != nullptr){
task();
}
}
void start(){
std::cout << "Schedule start..." << std::endl;
std::cout << "Init a thread poll" << std::endl;
m_thread_poll.resize(m_poll_size);
std::cout << "Schedule run..." << std::endl;
for(size_t i = 0; i < m_poll_size; ++i){
m_thread_poll[i].reset(new sylar::Thread(std::bind(&Schedule::run,this),"THREAD_"+std::to_string(i)));
}
}
void stop(){
while(!m_tasks_queue.empty()){
for(size_t i = 0; i < m_poll_size; ++i){
m_thread_poll[i].reset(new sylar::Thread(std::bind(&Schedule::run,this),"THREAD_"+std::to_string(i)));
}
}
for(size_t i = 0; i < m_poll_size; ++i){
m_thread_poll[i]->join();
}
std::cout << "Schedule stop..." << std::endl;
}
void schedule(std::function<void()> task){
std::cout << "Push task into queue " << &task << std::endl;
m_tasks_queue.push_back(task);
}
private:
MutexType m_mutex;
size_t m_poll_size = 0;
std::vector<sylar::Thread::ptr> m_thread_poll;
std::list<std::function<void()> > m_tasks_queue;
sylar::Thread* m_callerThread = 0;
};
//第二个调度线程
void subThread(){
std::cout << "Sub Schedule Thread" << std::endl;
Schedule sc(1);
sc.schedule(std::bind(&rap));
sc.start();
sc.stop();
}
int main(){
std::cout << "====Schedule====" << std::endl;
std::cout << "====Main start====" << std::endl;
{
auto start = std::chrono::high_resolution_clock::now();
Schedule sc(2);
sc.schedule(std::bind(&subThread));
sc.schedule(std::bind(&dance));
// sc.schedule(std::bind(&rap));
sc.start();
sc.stop();
auto end = std::chrono::high_resolution_clock::now();
std::chrono::duration<double,std::milli> elapsed = end - start;
std::cout << "Elapsed: " << (int)elapsed.count() << std::endl;
}
std::cout << "====Main end====" << std::endl;
return 0;
}
可以看到,调度器可以在任何一个线程中使用,互不影响。
三、加入协程的概念【协程级别的调度器】
一旦加入协程的概念,那么调度器的复杂度就会上升一个层次。
首先要理清楚有哪些东西参与到这个调度器中。
其次要统一称呼,视频中就是很多称呼不明确导致看不懂。
需要理清楚以下几个点:
- 加入协程的概念后,调度器就不需要占用一个线程来专门做调度,而是退居到某个线程下的 协程 中去。
- 一旦退居到某个协程中,那么该调度器所在的协程我们叫做 【调度协程】。
- 【调度协程】 所在的线程我称它为 【调度线程】。
- 在一个线程中会有一个 【主协程】 ,要记住 【调度协程】 可以是 【主协程】 也可能不是(调度线程是主线程时必然不是)。
- 在多线程中会有一个 【主线程】,要记住 【调度线程】 可以是 【主线程】 也可以不是。
- 调度任务会变成两种类型:【函数任务】、【协程任务】,这里可以把他们包装到一起,统称 【任务】。
- 【函数任务】最终可以包装成【协程任务】,这样统一调用方式。
- 我们封装的协程对象会有很多状态,所以在调度器中需要有比较复杂的判断(这个需要看代码了)。
四、总结
1.要看懂协程调度必须拆分着看,辩证的看。
2.由于协程调度的IO协程篇还没看完,所以存在几个遗留方法,这样无法看到全貌,可以理解大概流程后继续。
3.作者代码中确实有不足的地方,需要先内心赞同他的一切做法,当自己完全看懂后再做修改(比如Fiber中有Schedule的耦合代码)。
4.本人还没完全看完协程调度相关的视频,在看完IO协程调度后可能会回过头来修改这部分的博客内容。