【Linux系统化学习】生产者消费者模型(阻塞队列和环形队列)

目录

生产者消费者模型

什么是生产者消费者模型

为什么要使用生产者消费者模型

生产者消费者模型的优点

为什么生产者和生产者要互斥?

为什么消费者和消费者要互斥?

为什么生产者和消费者既是互斥又是同步?

基于BlockingQueue的生产者消费者模型 

BlockingQueue

阻塞队列实现

使用阻塞队列实现单生产单消费模型

POSIX信号量

信号量的PV操作

信号量的操作

初始化信号量

销毁信号量

等待信号量(P操作)

 发布信号量(V操作)

基于环形队列的生产者消费者模型

环形队列实现

 使用环形队列实现单生产单消费模型


生产者消费者模型

什么是生产者消费者模型

生产者消费者模型是一种用于解决多线程或多进程间协作的经典问题。在这个模型中,有两种角色:生产者和消费者。生产者负责生成数据,并将其放入共享的缓冲区中,而消费者则负责从缓冲区中取出数据进行处理。这种模型的目标是保持生产者和消费者之间的同步,以避免生产者试图向已满的缓冲区添加数据,或消费者试图从空缓冲区中获取数据。

为什么要使用生产者消费者模型

生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。

生产者消费者模型的优点

  • 解耦
  • 支持并发
  • 支持忙闲不均

总结:

  • 三种关系
  1.         生产者和生产者(互斥)
  2.         消费者和消费者(互斥)
  3.         消费者和生产者 (互斥&&同步)
  • 两种角色
  1.         生产者
  2.         消费者
  • 一个交易场所——内存空间 

为什么生产者和生产者要互斥?

我们可以这样来理解:当含有很多个生产者的时候,第一个生产者在第一个内存中放一个数据,第二个生产者也在第一个内存中放一个数据,覆盖第一个生产者在内存中的数据,两个生产者会产生竞争关系;那么会造成第一个内存中数据的不一致性;所以要互斥。

为什么消费者和消费者要互斥?

当只含有一个数据时候,来了两个消费者,这两个消费者也是竞争关系;因此要互斥; 

为什么生产者和消费者既是互斥又是同步?

当一个生产者在一个内存中放一个数据,消费者瞬间就会拿取这两个数据,那么生产者到底放没放数据?两者竞争关系;当内存中没数据的时候,消费者是拿不到数据的,因此要生产者放入数据消费者才可以拿取数据,具有一定的顺序性,因此又是同步的。


基于BlockingQueue的生产者消费者模型 

BlockingQueue

在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)

阻塞队列实现

#pragma once
#include <iostream>
#include <pthread.h>
#include <queue>
const int defauletcap = 5;
template <class T>
class BlockQueue
{
public:
    BlockQueue(int cap = defauletcap) : _capacity(cap)
    {
        pthread_mutex_init(&_mutex, nullptr);
        pthread_cond_init(&_p_cond, nullptr);
        pthread_cond_init(&_c_cond, nullptr);
    }
    bool IsFull()
    {
        return _q.size() == _capacity;
    }
    void Push(const T &in)
    {
        pthread_mutex_lock(&_mutex); // 每个关系都是互斥的
        while (IsFull())
        {
            // 满了
            // 阻塞等待
            pthread_cond_wait(&_p_cond, &_mutex);
        }
        _q.push(in);
        // 有数据了通知消费者取数据
        pthread_cond_signal(&_c_cond);
        pthread_mutex_unlock(&_mutex);
    }
    bool IsEmpty()
    {
        return _q.size() == 0;
    }
    void Pop(T *out)
    {
        pthread_mutex_lock(&_mutex);
        while (IsEmpty())
        {
            // 空了
            // 阻塞等待
            pthread_cond_wait(&_c_cond, &_mutex);
        }
        *out = _q.front();
        _q.pop();
        // 取了一个数据通知生产者生产数据
        pthread_cond_signal(&_p_cond);
        pthread_mutex_unlock(&_mutex);
    }
    ~BlockQueue()
    {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_p_cond);
        pthread_cond_destroy(&_c_cond);
    }

private:
    std::queue<T> _q;
    int _capacity;
    pthread_mutex_t _mutex; // 每个关系都是互斥的,同一时刻只能有一个线程访问数据
    pthread_cond_t _p_cond; // 生产者的环境变量
    pthread_cond_t _c_cond; // 消费者的环境变量
};

使用阻塞队列实现单生产单消费模型

#include<iostream>
#include<pthread.h>
#include<unistd.h>
#include"BlockQueue.hpp"
void* consumer(void* args)
{
    
    BlockQueue<int> *bp = static_cast<BlockQueue<int>*>(args);
    while(true)
    {
        //消费者先休息,生产者瞬间打满队列
        sleep(1);
        int data=0;
        bp->Pop(&data);
        std::cout<<"consumer data:"<<data<<std::endl;
    }
    return nullptr;
}
void* productor(void* args)
{
    BlockQueue<int> *bp = static_cast<BlockQueue<int>*>(args);
    while(true)
    {
        int data = rand()%10+1;
        bp->Push(data);
        std::cout<<"productor data:"<<data<<std::endl;
    }
    return nullptr;
}
int main()
{
    srand(uint16_t(time(nullptr)^getpid()*pthread_self()));
    BlockQueue<int> *bp = new BlockQueue<int>();
    pthread_t c,p;
    pthread_create(&c,nullptr,consumer,bp);
    pthread_create(&p,nullptr,productor,bp);
    pthread_join(c,nullptr);
    pthread_join(p,nullptr);
    return 0;
}

 

注:

  • 生产者和消费者谁先调度我们不清楚,是由调度算法和CPU来决定;因为消费者线程先运行由于队列中没有数据会被阻塞。但是可以确定的是一定是生产者进入阻塞队列填充数据;有了数据以后消费者才可以从阻塞队列中取数据。 
  • 生产者消费者模型的高效并不体现在交易场所,因为交易产所不论是生产者还是消费者只能有一个线程进入,高效率而是体现在生产数据并发和获得处理数据并发;

上面的代码只需要创建一个生产者数组和消费者数组即可实现多生产者多消费者模型


POSIX信号量

当我们在上面使用阻塞队列实现生产者消费者模型的时候,虽然是多生产者但是在内存中放数据的时候只能有一个线程必须先申请锁最后在释放锁,因此每个线程放数据是串行的。这样会降低效率,我们可以使用信号量解决这个问题。

  • 信号量本质是一个计数器,是描述临界资源中资源数目的计数器,信号量能够更细粒度的对临界资源进行管理

  • 每个执行流在进入临界区之前都应先申请信号量,申请成功就有了操作临界资源的权限,当操作完毕后就应该释放信号量

信号量的PV操作

信号量的P操作:将申请信号量称为P操作,申请信号量的本质就是申请获得临界资源中某块资源的使用权限,当申请成功时临界资源中资源的数目应该减一,因此P操作的本质就是让计数器减一

信号量的V操作:释放信号量称为V操作,释放信号量的本质就是归还临界资源中某块资源的使用权限,当释放成功时临界资源中资源的数目就应该加一,因此V操作的本质就是让计数器加一

注:

  • 信号量的PV操作时原子的
  • 信号量申请失败会被挂起等待
  • 信号量并不仅仅是一个计数器,还包括一个等待队列

信号量的操作

初始化信号量

#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value);

参数
pshared:0表示线程间共享,非零表示进程间共享
value:信号量初始值 

销毁信号量

int sem_destroy(sem_t *sem);

等待信号量(P操作)

功能:等待信号量,会将信号量的值减1
int sem_wait(sem_t *sem); //P()

 发布信号量(V操作)

功能:发布信号量,表示资源使用完毕,可以归还资源了。将信号量值加1。
int sem_post(sem_t *sem);//V()

基于环形队列的生产者消费者模型

环形队列采用数组模拟,用模运算来模拟环状特性

环形结构起始状态和结束状态都是一样的,不好判断为空或者为满,所以可以通过加计数器或者标记位来判断满或者空。另外也可以预留一个空的位置,作为满的状态。

 

但是我们现在有信号量这个计数器,就很简单的进行多线程间的同步过程。

总结:

  • 当消费者阻塞的时候,生产者疯狂生产时,生产者不能把消费者套圈。
  • 当生产者阻塞的时候,消费者疯狂消费时,消费者不能超过生产者,必须在生产者的后面。
  • 生产者和消费者,只有两种情况会指向同一个位置,其他情况根本就不会指向同一个位置。
  1. 为空(只能让生产者跑)
  2. 为满(只能让消费者跑)
  • 空间资源不够时,生产者不在生产;数据资源不够时,消费者不在消费。

上面的是单生产和单消费,不能直接改为多生产和多消费时,因为读写位置的坐标只有一个。为了保证这一点,我们可以加锁。

环形队列实现

#pragma noce
#include <iostream>
#include <vector>
#include <semaphore.h>
using namespace std;
const int defaultsize = 5;

template <class T>
class RingQueue
{
private:
    void P(sem_t &sem)
    {
        sem_wait(&sem);
    }
    void V(sem_t &sem)
    {
        sem_post(&sem);
    }

public:
    RingQueue(int size = defaultsize)
        : _ringqueue(size), _size(size), _p_step(0), _c_step(0)
    {
        sem_init(&_space_sem, 0, size);
        sem_init(&_data_sem, 0, 0);

        pthread_mutex_init(&_p_lock,nullptr);
        pthread_mutex_init(&_c_lock,nullptr);
    }
    void Push(const T &in)
    {
        // 生产
        // P操作
        // 申请空间资源
        P(_space_sem);
        //先申请信号量,再申请锁
        pthread_mutex_lock(&_p_lock);
        {
            _ringqueue[_p_step] = in;
            _p_step++;
            _p_step %= _size; // 防止越界
        }
        pthread_mutex_unlock(&_p_lock);
        V(_data_sem); // 多生产一个数据
    }
    void Pop(T *out)
    {
        // 消费
        // P操作
        // 申请数据资源

        P(_data_sem);
        pthread_mutex_lock(&_c_lock);
        {
            *out = _ringqueue[_c_step];
            _c_step++;
            _c_step %= _size; // 防止越界
        }
        pthread_mutex_unlock(&_c_lock);
        V(_space_sem); // 多释放了一个空间
    }
    ~RingQueue()
    {
        sem_destroy(&_space_sem);
        sem_destroy(&_data_sem);

        pthread_mutex_destroy(&_p_lock);
        pthread_mutex_destroy(&_c_lock);
    }

private:
    std::vector<T> _ringqueue;
    int _size;
    int _p_step; // 生产者的位置
    int _c_step; // 消费者的位置

    sem_t _space_sem; // 空间信号量
    sem_t _data_sem;  // 数据信号量

    pthread_mutex_t _p_lock;
    pthread_mutex_t _c_lock;
};

 使用环形队列实现单生产单消费模型

#include <iostream>
#include <unistd.h>
#include <pthread.h>
#include <semaphore.h>
#include "RingQueue.hpp"
using namespace std;
void *Producter(void *args)
{
    RingQueue<int> *rq = static_cast<RingQueue<int> *>(args);
    int cnt = 100;
    while (true)
    {
        rq->Push(cnt);
        cout << "product done, data is :" << cnt << endl;
        cnt--;
    }
}
void *Consumer(void *args)
{
    RingQueue<int> *rq = static_cast<RingQueue<int> *>(args);
    while (true)
    {
        sleep(1);
        int data = 0;
        rq->Pop(&data);
        cout << "consumer done , data is :" << data << endl;
    }
}
int main()
{
    pthread_t c, p;
    RingQueue<int> *rq = new RingQueue<int>();
    pthread_create(&p, nullptr, Producter, rq);
    pthread_create(&c, nullptr, Consumer, rq);
    pthread_join(p,nullptr);
    pthread_join(c,nullptr);
    return 0;
}

今天对生产者消费者模型的分享到这就结束了,希望大家读完后有很大的收获,也可以在评论区点评文章中的内容和分享自己的看法;个人主页还有很多精彩的内容。您三连的支持就是我前进的动力,感谢大家的支持!!!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/579234.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

将数组中最大的数放在最后一位,最小的数放在第一位

#include <stdio.h> int main() {void input(int number[]);void output(int number[]);void swapmaxmin(int number[]);int number[10];input(number);//swapmaxmin(number);output(number);return 0; }//往一个数组里输入 void input(int number[]) {int i;for(i0;i<…

Bert类模型也具备指令遵循能力吗?

深度学习自然语言处理 原创作者&#xff1a;Winnie BERT模型&#xff0c;依托Transformer架构及其大规模预训练&#xff0c;为自然语言处理领域带来了深远的影响。BERT模型架构包含多层双向Transformer编码器&#xff0c;通过这种结构&#xff0c;BERT及其家族成员&#xff0c;…

tensorflow_decision_forests\tensorflow\ops\inference\inference.so not found

恰好有一个帖子提到了py3.10里面的解决方案 pip install --user tensorflow2.11.0My tensorflow version is 2.11.0 and my tensorflow_decision_forests version is 1.2.0 so those should be compatible. I also am using Python version 3.10.11原文链接&#xff1a; http…

R语言高级数据管理

一&#xff0c;数学函数 绝对值函数abs(x) sqrt(x) 开平方根 不小于某个数的最小整数ceiling(x) 不大于某个数的最大整数floor(x) 四舍五入round(x) sin(x) cos(x) log(x) 二&#xff0c;统计函数 求平均值 > x<-c(2,3,4,5,6,7,8,9,10) > mean(x) 求和 &g…

Entity Framework6 Oracle 官网开发说明

Entity Framework, LINQ and Model-First for the Oracle Database

SAP-ERP TM运输管理模块详解-1

简介 SAP中的运输功能(即TM模块,属于SD的子模块)是后勤执行的一部分,用于自动计算交货成本。也就是说,SAP可以让系统自动对销售发货的商品计算运费,对于运费占这个成本很大比重的销售模式,可以使用该功能。运输功能相对于SD其他模块,相对比较独立的,应用面不是很广。其…

01.JAVAEE初阶之计算机如何工作

1.一台机器如何组成 冯诺依曼体系 CPU 中央处理器: 进行算术运算和逻辑判断.存储器: 分为外存和内存, 用于存储数据(使用二进制方式存储)输入设备: 用户给计算机发号施令的设备.输出设备: 计算机个用户汇报结果的设备. 针对存储空间 硬盘 > 内存 >> CPU针对数据访问…

Xline中区间树实现小结

Table of Contents 实现区间树的起因区间树实现简介 插入/删除查询重叠操作使用Safe Rust实现区间树 问题Rc<RefCell<T>> i. 线程安全问题其他智能指针 i. Arc<Mutex<T>>? ii. QCell数组模拟指针总结 01、实现区间树的起因 在Xline最近的一次重构中…

速卖通自养号测评:如何规避安全风险?

对于初涉电商领域的新卖家而言&#xff0c;进行销量测评显得尤为关键。由于速卖通新店铺往往难以获得平台活动的支持&#xff0c;流量也相对匮乏&#xff0c;因此&#xff0c;开店的首要任务便是进行测评&#xff0c;通过积累一定的评论和销售数据。 测评的益处颇多&#xff0…

生成完美口型同步的 AI 代言人视频(及其实现原理详解)

目录 什么是Heygen? Heygen注册 Video Translation&#xff08;视频翻译 完美口型同步&#xff09; 实现原理详解 视频翻译部分 完美口型同步部分 什么是Heygen? Heygen是一款在线工具&#xff0c;可帮助您生成具有完美口型同步的 AI 代言人视频。 Heygen注册 https:…

网络安全实训Day23

网络空间安全实训-渗透测试 文件上传攻击 定义 将Webshell文件上传到网站服务器上&#xff0c;从而获得网站整台服务器控制权限的攻击方式 Webshell 一种以网页形式存在的命令行执行环境&#xff0c;又称网页木马 分类 一句话木马 只有一行代码&#xff0c;功能强大&#xff…

(bevfusion:多模态融合)报错:AttributeError: module ‘numpy‘ has no attribute ‘long‘

解决办法1&#xff1a;降低numpy版本&#xff08;我的报错版本是1.24.4&#xff09; pip install numpy1.20.3解决办法2&#xff1a;或者将np.long改为np.int64 (由于我的报错在环境内部&#xff0c;不好修改&#xff0c;所以直接降低的numpy版本)

Java中的StringBuilder

为什么要用StringBuilder StringBuilder是一个可变的字符串类&#xff08;StringBuilder对象中的内容可变&#xff09; 为什么不用String拼接呢&#xff1f; 因为拼接字符串会造成前两个字符串的空间浪费 package dayhou40.day44; ​ public class test {public static voi…

Java线程池让使用线程变得更加高效

使用一个线程需要经过创建、运行、销毁三大步骤&#xff0c;如果业务系统每个线程都要经历这个过程&#xff0c;那势必带来过多不必要的资源消耗。线程池就是为了解决这个问题而生&#xff0c;需要时就从池中拿取&#xff0c;使用完毕就放回去&#xff0c;池化思想通过复用对象…

SpringBoot---------Hutool

第一步&#xff1a;引入依赖 <dependency><groupId>cn.hutool</groupId><artifactId>hutool-parent</artifactId><version>5.7.17</version></dependency> 第二步&#xff1a;各种用法 ①生成随机数 //生成验证码 String s …

游戏新手村21:再谈游戏广告页面设计

前文我们说到了网页游戏的LandingPage页面设计中需要遵循的一些规范和注意事项&#xff0c;本章我们重点谈下网络游戏的广告页面设计。 之前在金山的时候&#xff0c;大家习惯或者喜欢称LandingPage为分流页&#xff0c;这个页面需要加入哪些游戏信息才能在短时间内俘获玩家的…

cJSON的使用

文章目录 一、CJSON初识二、CJSON解析器基础三、CJSON解析数据JSON解析基础CJSON解析数组数据CJSON解析嵌套数据 五、创建JSON数据 一、CJSON初识 JSON (JavaScript Object Notation)是一种轻量级的数据交换格式&#xff0c;常用于在网络之间传输数据。它是一种文本格式&#…

Linux计划任务书以及定时任务的编写

一、程序可以通过两种方式执行&#xff1a; 手动执行利用调度任务&#xff0c;依据一定的条件自动执行 自动执行可通过一下两个命令来实现: &#xff08;1&#xff09;At &#xff08;单一工作调度&#xff09; &#xff08;2&#xff09;Cron &#xff08;循环工作调度&a…

微信小程序的开发

1.了解项目的基本组成结构 pages 用来存放所有小程序的页面 utils 用来存放工具性质的模块(例如:格式化时间的自定义模块) app.js 小程序项目的入口文件 app.json 小程序项目的全局配置文件 app.wxss 小程序项目的全局样式文件 project.config.json 项目的配置文件 sitem…

(GEE)2000-2020年黄河流域时序渐变图及高程模型计算 JavaScript版

文章目录 一. 选取目标区域二. NDVI实现三. 高程模型DEM实现四. 时序图五. 植被覆盖类型六. 参考文献 首先推荐吴秋生老师团队开源的便捷构建网站&#xff1a;适用于地理空间应用的Streamlight 吴秋生老师团队的工具请自行探索。本文讲解基于GEE云开发平台实现&#xff0c;基于…
最新文章