多线程之间的通信
通过锁同步共享内容
因为一个进程中的多个线程是贡献进程的资源的,所以多线程可以通过访问进程中的全局变量来通信,为了避免竞争需要加锁。
线程中的通信,因为存在共享的进程资源,所以主要是要进行线程的同步(即各种方式的加锁)。
关于几种常见的锁,写在锁部分。←这也是这篇文章主要记录的内容。
消息
Windows中,可以通过PostThreadMessage()或者SendThreadMessage()来进行线程间通信。这个需要对Windows的消息机制(消息队列、消息循环等)有基本了解。
参考:
使用PostThreadMessage在Win32线程间传递消息
锁
多线程常用的锁
互斥锁
互斥锁,即std::mutex,对于一个资源,同时只能有一个线程进行访问。
当一个线程持有互斥锁的时候,其余需要获取互斥锁的线程会进入睡眠, 不再占用cpu。当持有锁的线程解锁后,所有等待锁的线程全部激活,其中一个线程获得锁后,其余线程再次陷入睡眠。
互斥锁是最容易理解和使用的锁。
示例代码:
std::mutex mtx;
int data = 0;
void _func(int index)
{
char msg[256] = { 0x00 };
//std::lock_guard<std::mutex> lkg(mtx);
mtx.lock();
ZeroMemory(msg, sizeof(msg));
sprintf_s(msg, 256, "%s%d%s", "this is ", std::this_thread::get_id(), " thread in.");
std::cout << msg << std::endl;
data = index;
std::cout << data << std::endl;
ZeroMemory(msg, sizeof(msg));
sprintf_s(msg, 256, "%s%d%s", "this is ", std::this_thread::get_id(), " thread out.");
std::cout << msg << std::endl;
mtx.unlock();
return;
}
int main()
{
std::thread th1(_func,1);
std::thread th2(_func,2);
std::thread th3(_func,3);
th1.join();
th2.join();
th3.join();
return 0;
}
输出结果:
this is 7816 thread in.
3
this is 7816 thread out.
this is 18136 thread in.
1
this is 18136 thread out.
this is 13060 thread in.
2
this is 13060 thread out.
条件变量
条件变量std::condition_variable ,条件变量是和std::mutex 一起使用的。
等于是对互斥锁的升级,当一个持有互斥锁的线程解锁时,其余线程全部激活,去抢占互斥锁。
条件变量多使用于生产者-消费者这样的模型,生产者控制条件变量的notify,可以选择通知任意一个线程,还是通知所有线程。
例如以下代码:
std::mutex mtx;
std::condition_variable cdn;
int data = 0;
bool exit_threads = 0; //消费者函数_func中循环的退出条件
void _func_manager(int index) //生产者函数
{
Sleep(1000);
char msg[256] = { 0x00 };
ZeroMemory(msg, sizeof(msg));
sprintf_s(msg, 256, "%s%d%s", "this is mgr ", std::this_thread::get_id(), " thread in.");
std::cout << msg << std::endl;
int i = 0;
while (i < 10)
{
i++;
cdn.notify_one(); // 不断的激活消费者线程
Sleep(200);//为了避免,消费者线程正在处理,接收不到notity,这里等待一下
}
exit_threads = true; //改变消费者线程循环退出条件
cdn.notify_all(); //通知所有线程,让所有消费者线程进行循环条件检测
ZeroMemory(msg, sizeof(msg));
sprintf_s(msg, 256, "%s%d%s", "this is mgr ", std::this_thread::get_id(), " thread out.");
std::cout << msg << std::endl;
return;
}
void _func(int index) //消费者线程函数
{
char msg_in[256] = { 0x00 };
char msg_out[256] = { 0x00 };
ZeroMemory(msg_in, sizeof(msg_in));
ZeroMemory(msg_out, sizeof(msg_out));
sprintf_s(msg_in, 256, "%s%d%s", "this is ", std::this_thread::get_id(), " thread active.");
sprintf_s(msg_out, 256, "%s%d%s", "this is ", std::this_thread::get_id(), " thread out.");
while (!exit_threads) // 循环,只要循环退出条件没有达到,就会持续循环
{
std::unique_lock<std::mutex> ulk(mtx); // 创建unique_lick对象,unique_lock 生成即加锁
cdn.wait(ulk); //条件变量等待,传入unique_lock对象,对unique_lock对象解锁。
//当条件变量被通知时,再对unique_lock对象加锁。
if (!exit_threads) { // 此时unicque_lock已自动加锁。再次检测循环退出条件
std::cout << msg_in << std::endl;
data++;
std::cout << data << std::endl;
}
} //循环退出,结束线程
std::cout << msg_out << std::endl;
return;
}
int main()
{
std::thread mgr(_func_manager, 0);
std::thread th1(_func, 1);
std::thread th2(_func, 2);
std::thread th3(_func, 3);
mgr.join(); // join,主线程等待子线程结束,再执行join后面的内容。
th1.join();
th2.join();
th3.join();
return 0;
}
条件变量的wait有两个重载。
//MSVC\14.35.32215\include\mutex
void wait(unique_lock<mutex>& _Lck) { // wait for signal
// Nothing to do to comply with LWG-2135 because std::mutex lock/unlock are nothrow
_Cnd_wait(_Mycnd(), _Lck.mutex()->_Mymtx());
}
template <class _Predicate>
void wait(unique_lock<mutex>& _Lck, _Predicate _Pred) { // wait for signal and test predicate
while (!_Pred()) {
wait(_Lck);
}
}
第一个重载函数,参数只需要一个unique_lock,但是可能存在虚假唤醒的问题,需要在wait 返回后,再通过条件判断,避免虚假唤醒。
第二个重载函数,需要两个参数,第一个是unique_lock,第二个可以传入一个Lambda表达式。当wait被唤醒时,会通过Lambda表达式判断,如果Lambda返回值为false,wait会unlock unique_lock并继续阻塞等待,返回值为true时,wait才继续执行。通过两个参数的重载,可以直接把避免虚假唤醒的判断,添加到wait函数上。
自旋锁
如果使用互斥锁,没有获得锁的线程,会进入休眠,不会占用cpu。
如果使用自旋锁,没有获得锁的线程,不会进入休眠,会不断尝试获取锁,会一直占据cpu。
使用互斥锁时,因为线程会休眠,cpu会切换去其他线程了,所以会有cpu切换的时间。
如果锁定的临界区执行的时间大于线程切换的时间,那使用互斥锁就可以。
如果锁定的临界区执行的事件很小,比线程切换的时间还小,那使用互斥锁所带来的线程切换时间就变成了很大的负担,就应该使用一直占用cpu的自旋锁。
//第是目前为止,我还不知道怎么看什么时候代码的执行时间小于线程切换时间。
自旋锁的实现原理
c++之理解自旋锁
这里提供了一种自旋锁的实现原理。主要是利用原子操作。
自旋锁监控的是锁对象中的atomic bool值flag。当没有加锁时,flag为false,当线程获取锁时,flag为true。
如果线程尝试获取锁,但flag为true, 线程就持续获取flag的状态。
获取flag的状态使用的是系统的CAS接口。
CAS接口一般是这样的,bool CAS(查询的flag,比较的期望值expect,想要设置的值desired);
如果flag和expect不一致,CAS接口返回false,可有用于继续循环,实现自旋。
如果flag和expect一直,把flag设置为desired,并返回true。
其他和锁相关的
读写锁
如果使用互斥锁,只要有一个线程获取锁,无论它对共享资源进行什么操作,别的线程都只能休眠等待。
读写锁,在互斥锁的基础上,区分了读、写操作。当一个线程获取了读取锁时,别的线程也可以获取读取锁;当一个线程获取了写入锁时,其余所有线程都休眠等待。
其中有一个问题就是,如果有线程在进行读操作,想要写入的线程怎么办?
读写锁是一种设计思路,网络上有比较多的示例。
递归锁
递归锁,std::recursive_mutex。
解决了一个线程需要重复获取一个锁,所产生的死锁问题。
使用递归锁,同一个线程,可以重复对一个资源加锁,只要最终释放锁的次数和加锁的次数一致就可以。
参考:
什么时候需要使用递归锁(递归mutex)? - Lion Long的回答 - 知乎
信号量
信号量,Semaphore。
信号量和信号(Signal)不同,信号量相当于多线程中一个共享资源的计数器。
信号量在创建时需要设置一个初始值,表示同时可以有几个任务(线程)可以访问某一块共享资源。
一个任务要想访问共享资源,前提是信号量大于0,当该任务成功获得资源后,将信号量的值减 1;
若当前信号量的值小于 0,表明无法获得信号量,该任务必须被挂起,等待信号量恢复为正值的那一刻;
当任务执行完之后,必须释放信号量,对应操作就是信号量的值加 1。
另外,对信号量的操作(加、减)都是原子的。互斥锁(Mutex)就是信号量初始值为 1 时的特殊情形,即同时只能有一个任务可以访问共享资源区。
来源:https://zhuanlan.zhihu.com/p/512969481
C++11 没有实现信号量(C++20实现的),可以通过系统API使用信号量,或者通过条件变量来模拟信号量(给条件变量增加一个线程计数判断)。
以下通过Windows API实现:
示例一:
#include <winbase.h>
int i = 0;
HANDLE sem_handle;
void _func()
{
char msg[256] = { 0x00 };
//等待信号量信号
WaitForSingleObject(sem_handle, INFINITE);
sprintf_s(msg, 256, "%s%d%s%d", " This is Th:" , std::this_thread::get_id() , " data:" , i);
std::cout << msg << std::endl;
//线程占据信号量后,信号量计数自动减1
//释放时增加信号量计数
ReleaseSemaphore(sem_handle, 1, NULL);
}
int main()
{
//创建一个信号量,只能同时容纳2个线程
sem_handle = CreateSemaphoreA(NULL, 2, 2, "SEMP");
std::thread th1(_func);
std::thread th2(_func);
std::thread th3(_func);
th1.join();
th2.join();
th3.join();
CloseHandle(sem_handle);
}
输出:
This is Th:21748 data:0 This is Th:24000 data:0
This is Th:20240 data:0
线程21748 和 24000 获取了信号量,执行_func,所以没等到21748 的std::endl执行,他们就一起输出了,两个线程的输出连起来了。
线程20240等到前面线程release信号量之后,才获取信号量,所以单独一行输出。
示例二:
#include <winbase.h>
int i = 0;
HANDLE sem_handle;
void _func()
{
char msg[256] = { 0x00 };
WaitForSingleObject(sem_handle, INFINITE);
sprintf_s(msg, 256, "%s%d%s%d", " This is Th:" , std::this_thread::get_id() , " data:" , i);
std::cout << msg << std::endl;
ReleaseSemaphore(sem_handle, 1, NULL);
}
int main()
{
//创建一个信号量,只能同时容纳1个线程
sem_handle = CreateSemaphoreA(NULL, 1, 2, "SEMP");
std::thread th1(_func);
std::thread th2(_func);
std::thread th3(_func);
th1.join();
th2.join();
th3.join();
CloseHandle(sem_handle);
}
输出:
This is Th:16128 data:0
This is Th:21188 data:0
This is Th:5260 data:0
创建了一个信号量,只能容纳一个线程,就相当于互斥锁了,所以3个线程每个线程都占一行输出。
参考:
WINAPI】CreateSemaphore_信号量
原子变量
C++ 提供了原子变量模板
std::atomic,可以把变量例如int、bool等,通过atomic模板作为原子变量使用。
死锁问题
死锁是一个线程获取了锁,但是没有解锁导致的,所有需要获取锁的进程都阻塞,而且无法自行解决的情况。
死锁大概有几种情况:
- 代码中加了锁,但是在return等处忘记解锁。
->可以用lock_guard 或者unique_lock,无需自行解锁 - 一个线程的函数调用中,不同函数都对一个互斥量加锁,会导致一个函数加锁后,调用另一个函数,被调用的函数中也要加锁,形成死锁。
-> 使用递归锁,或者不是使用同一个互斥量加锁。 - 多线程,同时对多个互斥量加锁,例如一个线程先锁mutex1,再锁mutex2,另一个线程,先锁mutex2,再锁mutex1,两个线程形成死锁。
-> 编程时应该避免这种情况,可以使用std::lock(mutex1, mutex2)同时加锁。
参考:
C++ 死锁及解决办法