【Linux】生产者消费者模型


目录

一、生产者消费者模型

1、生产者消费者模型的概念

2、生产者、消费者之间的关系

3、生产者和消费者的特点

二、基于BlockingQueue的生产者消费者模型(条件变量控制同步与互斥)

1、一个生产线程和一个消费线程完成的计算任务

1.1BlockQueue.hpp

1.2Task.hpp 

1.3Main.cc 

2、基于生产者消费者模型的生产、消费、存储的多线程代码

2.1BlockQueue.hpp

2.2Task.hpp

2.3Main.cc

3、基于生产者消费者模型的多生产者多消费者的多线程代码

4、生产者消费者模型真的高效吗?

三、基于环形队列的多生产者多消费者模型(信号量控制同步与互斥)

1、RingQueue.hpp

2、Task.hpp 

3、main.cc


一、生产者消费者模型

1、生产者消费者模型的概念

        生产者消费者模型是通过一个中间容器来解决生产者和消费者之间的强耦合问题。生产者和消费者彼此之间不直接通讯,而是通过阻塞队列进行通讯。所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列相当于一个缓冲区。

2、生产者、消费者之间的关系

三种生产者、消费者之间的关系:

  • 1、生产者和生产者之间的互斥关系
  • 2、消费者和消费者之间的互斥关系
  • 3、生产者和消费者之间的互斥与同步关系。(互斥:需要保证读写安全;同步:当缓冲区数据满了或空了,能够互相等待和通知)

两种角色:生产者线程和消费者线程

一个交易场所:一段特定结构的缓冲区

3、生产者和消费者的特点

  • 1、生产线程和消费线程进行解耦
  • 2、支持生产和消费的一段时间忙闲不均的问题
  • 3、支持并发,提高效率。例如生产者线程在缓冲区生产函数参数的时候,消费者线程也可以正常运行。这两个线程有原来的串行执行变为并发执行,提高效率。

二、基于BlockingQueue的生产者消费者模型(条件变量控制同步与互斥)

        在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。它与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出。

1、一个生产线程和一个消费线程完成的计算任务

代码结果如下:

1.1BlockQueue.hpp

#pragma once
#include <iostream>
#include <queue>
const int gMaxCap=5;
template <class T>
class BlockQueue
{
public:
    BlockQueue(const int capacity=gMaxCap)
    :_capacity(capacity)
    {
        pthread_mutex_init(&_mutex,nullptr);
        pthread_cond_init(&_pcond,nullptr);
        pthread_cond_init(&_ccond,nullptr);
    }
    void push(const T& in)//输入型参数:const &
    {
        pthread_mutex_lock(&_mutex);
        //细节2:充当条件的判断必须是while,不能用if
        //这是因为唤醒的时候存在唤醒异常或伪唤醒的情况
        //需要让线程重新使用IsFull对空间就行判断,确保100%唤醒
        while(IsFull())
        {
            //细节1:
            //该线程被pthread_cond_wait函数挂起后,会自动释放锁。
            //该线程被pthread_cond_signal函数唤醒后,会自动重新获取原来那把锁
            pthread_cond_wait(&_pcond,&_mutex);//因为生产条件不满足,无法生产,此时我们的生产者进行等待
        }
        //走到这里一定没有满
        _q.push(in);
        //刚push了一个数据,可以试着消费者把他取出来(唤醒消费者)
        //细节3:pthread_cond_signal()这个函数,可以放在临界区内部,也可以放在临界区外部
        pthread_cond_signal(&_ccond);//可以设置水位线,满多少就唤醒消费者
        pthread_mutex_unlock(&_mutex);
        //pthread_cond_signal(&_ccond);//也可以放在解锁之后
    }
    void pop(T* out)//输出型参数:*  //输入输出型:&
    {
        pthread_mutex_lock(&_mutex);
        while(IsEmpty())
        {
            pthread_cond_wait(&_ccond,&_mutex);//消费者休眠
        }
        //先把数据处理好,再唤醒消费者
        *out=_q.front();
        _q.pop();
        //走到这里,一定能保证不为空,唤醒生产者进行生产
        pthread_cond_signal(&_pcond);
        pthread_mutex_unlock(&_mutex);
    }
    ~BlockQueue()
    {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_pcond);
        pthread_cond_destroy(&_ccond);
    }
private:
    bool IsEmpty()
    {
        return _q.empty();
    }
    bool IsFull()
    {
        return _q.size()==_capacity;
    }
private:
    std::queue<T> _q;
    int _capacity;//阻塞队列的容量
    pthread_mutex_t _mutex;//互斥锁
    pthread_cond_t _pcond;//生产者对应的条件变量
    pthread_cond_t _ccond;//消费者对应的条件变量
};

1.2Task.hpp 

#pragma once
#include <iostream>
#include <functional>
class Task
{
    //using func=std::function<int(int,int,char)>;
    typedef std::function<int(int,int,char)> func_t;//函数对象
public:
    Task()
    {}
    Task(int x,int y,char op,func_t func)
    :_x(x)
    ,_y(y)
    ,_op(op)
    ,_callBack(func)
    {}
    std::string operator()()
    {
        int result=_callBack(_x,_y,_op);
        char buffer[1024];
        snprintf(buffer,sizeof(buffer),"%d %c %d=%d",_x,_op,_y,result);
        return buffer;
    }
    std::string toTaskString()
    {
        char buffer[1024];
        snprintf(buffer,sizeof(buffer),"%d %c %d=?",_x,_op,_y);
        return buffer;
    }
private:
    int _x;
    int _y;
    char _op;
    func_t _callBack;//回调函数
};

1.3Main.cc 

#include <iostream>
#include <queue>
#include <pthread.h>
#include <ctime>
#include <sys/types.h>
#include <unistd.h>
#include "BlockQueue.hpp"
#include "Task.hpp"
const std::string oper="+-*/%";
int myMath(int x,int y,char op)
{
    int result=0;
    switch(op)
    {
        case '+':
            result=x+y;
            break;
        case '-':
            result=x-y;
            break;
        case '*':
            result=x*y;
            break;
        case '/':
        {
            if(y==0)
            {
                std::cerr<<"div zero"<<std::endl;
                result=-1;
            }
            else 
                result=x/y;
        }
            break;
        case '%':
        {
            if(y==0)
            {
                std::cerr<<"mod zero"<<std::endl;
                result=-1;
            }
            else 
                result=x%y;
        }
            break;
        default:
        break;
    }
    return result;
}
//消费者
void* consumer(void* args)
{
    BlockQueue<Task>* bq=static_cast<BlockQueue<Task>*>(args);
    while(1)
    {
        //消费数据
        Task t;
        bq->pop(&t);
        printf("Consumed task:%s\n",t().c_str());
        //sleep(1);
    }
    return nullptr;
}
//生产者
void* productor(void* args)
{
    BlockQueue<Task>* bq=static_cast<BlockQueue<Task>*>(args);
    while(1)
    {
        //生产数据
        int x=rand()%10+1;
        int y=rand()%5;
        int operCode=rand()%oper.size();
        Task t(x,y,oper[operCode],myMath);
        bq->push(t);//把任务push进阻塞队列
        printf("Production task:%s\n",t.toTaskString().c_str());
        sleep(1);
    }
    return nullptr;
}
int main()
{
    srand((unsigned int)time(nullptr)^getpid());
    BlockQueue<Task>* bq=new BlockQueue<Task>();
    pthread_t c,p;
    pthread_create(&c,nullptr,consumer,bq);
    pthread_create(&p,nullptr,productor,bq);

    pthread_join(c,nullptr);
    pthread_join(p,nullptr);
    delete bq;
    return 0;
}

2、基于生产者消费者模型的生产、消费、存储的多线程代码

        定义一个结构体BlockQueues用于封装计算任务的阻塞队列和存储任务的阻塞队列;

        生产者线程执行productor函数域中的代码,用于生产运算任务对象CalTask t,并将其push进计算任务的阻塞队列中。

        消费者线程执行consumer函数域中的代码,用于从任务队列中获取CalTask t,并通过CalTask类中的“仿函数”对结果进行计算,同时消费者线程还要将计算结果和Save方法生成一个SaveTask对象,将其存储于存储阻塞队列save_bq中;

        存储线程执行saver函数域中的代码,通过拿到存储任务的阻塞队列save_bq,通过operator()间接调用Save方法,将数据结果追加式存储于文本文件中。

2.1BlockQueue.hpp

#pragma once
#include <iostream>
#include <queue>
const int gMaxCap=5;
template <class T>
class BlockQueue
{
public:
    BlockQueue(const int capacity=gMaxCap)
    :_capacity(capacity)
    {
        pthread_mutex_init(&_mutex,nullptr);
        pthread_cond_init(&_pcond,nullptr);
        pthread_cond_init(&_ccond,nullptr);
    }
    void push(const T& in)//输入型参数:const &
    {
        pthread_mutex_lock(&_mutex);
        //细节2:充当条件的判断必须是while,不能用if
        //这是因为唤醒的时候存在唤醒异常或伪唤醒的情况
        //需要让线程重新使用IsFull对空间就行判断,确保100%唤醒
        while(IsFull())
        {
            //细节1:
            //该线程被pthread_cond_wait函数挂起后,会自动释放锁。
            //该线程被pthread_cond_signal函数唤醒后,会自动重新获取原来那把锁
            pthread_cond_wait(&_pcond,&_mutex);//因为生产条件不满足,无法生产,此时我们的生产者进行等待
        }
        //走到这里一定没有满
        _q.push(in);
        //刚push了一个数据,可以试着消费者把他取出来(唤醒消费者)
        //细节3:pthread_cond_signal()这个函数,可以放在临界区内部,也可以放在临界区外部
        pthread_cond_signal(&_ccond);//可以设置水位线,满多少就唤醒消费者
        pthread_mutex_unlock(&_mutex);
        //pthread_cond_signal(&_ccond);//也可以放在解锁之后
    }
    void pop(T* out)//输出型参数:*  //输入输出型:&
    {
        pthread_mutex_lock(&_mutex);
        while(IsEmpty())
        {
            pthread_cond_wait(&_ccond,&_mutex);//消费者休眠
        }
        //先把数据处理好,再唤醒消费者
        *out=_q.front();
        _q.pop();
        //走到这里,一定能保证不为空,唤醒生产者进行生产
        pthread_cond_signal(&_pcond);
        pthread_mutex_unlock(&_mutex);
    }
    ~BlockQueue()
    {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_pcond);
        pthread_cond_destroy(&_ccond);
    }
private:
    bool IsEmpty()
    {
        return _q.empty();
    }
    bool IsFull()
    {
        return _q.size()==_capacity;
    }
private:
    std::queue<T> _q;
    int _capacity;//阻塞队列的容量
    pthread_mutex_t _mutex;//互斥锁
    pthread_cond_t _pcond;//生产者对应的条件变量
    pthread_cond_t _ccond;//消费者对应的条件变量
};

2.2Task.hpp

#pragma once
#include <iostream>
#include <functional>
#include <string>
class CalTask
{
    //using func=std::function<int(int,int,char)>;
    typedef std::function<int(int,int,char)> func_t;//函数对象
public:
    CalTask()
    {}
    CalTask(int x,int y,char op,func_t func)
    :_x(x)
    ,_y(y)
    ,_op(op)
    ,_callBack(func)
    {}
    std::string operator()()//消费者调用
    {
        int result=_callBack(_x,_y,_op);
        char buffer[1024];
        snprintf(buffer,sizeof(buffer),"%d %c %d=%d",_x,_op,_y,result);//结果字符串
        return buffer;
    }
    std::string toTaskString()//生产者调用
    {
        char buffer[1024];
        snprintf(buffer,sizeof(buffer),"%d %c %d=?",_x,_op,_y);
        return buffer;
    }
private:
    int _x;
    int _y;
    char _op;//加减乘除取模
    func_t _callBack;//回调函数
};

const std::string oper = "+-*/%";
    int myMath(int x, int y, char op)
    {
        int result = 0;
        switch (op)
        {
        case '+':
            result = x + y;
            break;
        case '-':
            result = x - y;
            break;
        case '*':
            result = x * y;
            break;
        case '/':
        {
            if (y == 0)
            {
                std::cerr << "div zero error" << std::endl;
                result = -1;
            }
            else
                result = x / y;
        }
            break;
        case '%':
        {
            if (y == 0)
            {
                std::cerr << "mod zero" << std::endl;
                result = -1;
            }
            else
                result = x % y;
        }
        break;
        default:
            break;
        }
        return result;
    }
class SaveTask
{
    typedef std::function<void(const std::string& )> func_t;
public:
    SaveTask()
    {}
    SaveTask(const std::string& message,func_t func)
    :_message(message)
    ,_func(func)
    {}
    void operator()()
    {
        _func(_message);//Main.cc传入的result
    }
private:
    std::string _message;
    func_t _func;
};
void Save(const std::string& message)
{
    const std::string target="./log.txt";
    FILE* fp=fopen(target.c_str(),"a+");
    if(fp==nullptr)
    {
        std::cerr<<"fopen error"<<std::endl;
        return;
    }
    fputs(message.c_str(),fp);
    fputs("\n",fp);
    fclose(fp);
}

2.3Main.cc

#include <iostream>
#include <queue>
#include <pthread.h>
#include <ctime>
#include <sys/types.h>
#include <unistd.h>
#include "BlockQueue.hpp"
#include "Task.hpp"
//阻塞队列类。 C:计算任务,S:存储任务
template <class C, class S>
struct BlockQueues
{
    BlockQueue<C> *c_bq;
    BlockQueue<S> *s_bq;
};
// 生产者跑这个函数,参与生产任务
void *productor(void *args)
{
    BlockQueue<CalTask>* bq = (static_cast<BlockQueues<CalTask, SaveTask>*>(args))->c_bq;//计算任务
    while (1)
    {
        // 生产数据
        int x = rand() % 10 + 1;
        int y = rand() % 5;
        int operCode = rand() % oper.size();
        CalTask t(x, y, oper[operCode], myMath);
        bq->push(t); // 把任务push进阻塞队列
        printf("Productor thread,生产计算任务:%s\n", t.toTaskString().c_str());
        sleep(1);
    }
    return nullptr;
}
// 消费者跑这个函数,参与计算任务和存储任务
void *consumer(void *args)
{
    BlockQueue<CalTask>* bq = (static_cast<BlockQueues<CalTask, SaveTask>*>(args))->c_bq;//计算任务
    BlockQueue<SaveTask>* save_bq = (static_cast<BlockQueues<CalTask, SaveTask>*>(args))->s_bq;//存储任务

    while (1)
    {
        // 消费数据,计算任务
        CalTask t;
        bq->pop(&t);
        std::string result=t();
        printf("Cal thread,完成计算任务:%s...done\n", result.c_str());

        //存储任务
        SaveTask save(result,Save);
        save_bq->push(save);//把save对象push进储存阻塞队列中
        printf("Cal thread,推送存储任务完成\n");
        //sleep(1);
    }
    return nullptr;
}

//储存线程跑这个函数,参与存储任务
void* saver(void* args)
{
    BlockQueue<SaveTask>* save_bq = (static_cast<BlockQueues<CalTask, SaveTask>*>(args))->s_bq;//拿到存储任务
    while(1)
    {
        SaveTask t;
        save_bq->pop(&t);
        t();//调用消费者类中的Save方法
        printf("save thread,保存任务完成...\n");
    }
    return nullptr;
}

int main()
{
    srand((unsigned int)time(nullptr) ^ getpid());
    BlockQueues<CalTask, SaveTask> bqs;
    // 两个阻塞队列
    bqs.c_bq = new BlockQueue<CalTask>();
    bqs.s_bq = new BlockQueue<SaveTask>();
    pthread_t c, p,s;//消费者、生产者、保存者线程
    pthread_create(&p, nullptr, productor, &bqs);
    pthread_create(&c, nullptr, consumer, &bqs);
    pthread_create(&s, nullptr,saver ,&bqs);

    pthread_join(c, nullptr);
    pthread_join(p, nullptr);
    pthread_join(s, nullptr);
    delete bqs.c_bq;
    delete bqs.s_bq;
    return 0;
}

3、基于生产者消费者模型的多生产者多消费者的多线程代码

#include <iostream>
#include <queue>
#include <pthread.h>
#include <ctime>
#include <sys/types.h>
#include <unistd.h>
#include "BlockQueue.hpp"
#include "Task.hpp"
//阻塞队列类。 C:计算任务,S:存储任务
template <class C, class S>
struct BlockQueues
{
    BlockQueue<C> *c_bq;
    BlockQueue<S> *s_bq;
};
// 生产者跑这个函数,参与生产任务
void *productor(void *args)
{
    BlockQueue<CalTask>* bq = (static_cast<BlockQueues<CalTask, SaveTask>*>(args))->c_bq;//计算任务
    while (1)
    {
        // 生产数据
        int x = rand() % 10 + 1;
        int y = rand() % 5;
        int operCode = rand() % oper.size();
        CalTask t(x, y, oper[operCode], myMath);
        bq->push(t); // 把任务push进阻塞队列
        printf("Productor thread,生产计算任务:%s\n", t.toTaskString().c_str());
        sleep(1);
    }
    return nullptr;
}
// 消费者跑这个函数,参与计算任务和存储任务
void *consumer(void *args)
{
    BlockQueue<CalTask>* bq = (static_cast<BlockQueues<CalTask, SaveTask>*>(args))->c_bq;//计算任务
    //BlockQueue<SaveTask>* save_bq = (static_cast<BlockQueues<CalTask, SaveTask>*>(args))->s_bq;//存储任务

    while (1)
    {
        // 消费数据,计算任务
        CalTask t;
        bq->pop(&t);
        std::string result=t();
        printf("Cal thread,完成计算任务:%s...done\n", result.c_str());
    }
    return nullptr;
}

//储存线程跑这个函数,参与存储任务
void* saver(void* args)
{
    BlockQueue<SaveTask>* save_bq = (static_cast<BlockQueues<CalTask, SaveTask>*>(args))->s_bq;//拿到存储任务
    while(1)
    {
        SaveTask t;
        save_bq->pop(&t);
        t();//调用消费者类中的Save方法
        printf("save thread,保存任务完成...\n");
    }
    return nullptr;
}

int main()
{
    srand((unsigned int)time(nullptr) ^ getpid());
    BlockQueues<CalTask, SaveTask> bqs;
    // 两个阻塞队列
    bqs.c_bq = new BlockQueue<CalTask>();
    bqs.s_bq = new BlockQueue<SaveTask>();
    pthread_t c[2], p[3],s;//消费者、生产者、保存者线程
    pthread_create(c, nullptr, consumer, &bqs);
    pthread_create(c+1, nullptr, consumer, &bqs);

    pthread_create(p, nullptr, productor, &bqs);
    pthread_create(p+1, nullptr, productor, &bqs);
    pthread_create(p+2, nullptr, productor, &bqs);
    //pthread_create(&s, nullptr,saver ,&bqs);

    pthread_join(c[0], nullptr);
    pthread_join(c[1], nullptr);
    pthread_join(p[0], nullptr);
    pthread_join(p[1], nullptr);
    pthread_join(p[2], nullptr);
    //pthread_join(s, nullptr);
    delete bqs.c_bq;
    delete bqs.s_bq;
    return 0;
}

        对Main.cc进行改造,另外两个头文件不变。

        该代码有多个生产者和消费者,通过BlockQueue.hpp中的BlockQueue类实例化出的对象中的push和pop方法,让各生产者和消费者共同去竞争同一把锁,保证在同一时间,只有一个线程抢到锁并执行对应的任务。

4、生产者消费者模型真的高效吗?

        从本段第三节我们看到,大量的生产者、消费者全部在虎视眈眈的争夺同一把锁,也就是说,一次只能放一个线程去阻塞队列中完成任务,那效率不是非常慢?既然如此为何要采用这种模型?

        因为传统的线程运作方式会让大部分线程阻塞在临界区之外,而生产者消费者模型则是将任务的工序拆开,一组线程分为生产者,另一组分为消费者。充分利用了生产者的阻塞时间,用以提前准备好生产资源;同时也利用了消费者计算耗时的问题,让消费者线程将更多的时间花在计算上,而不是抢不到锁造成线程“干等”。

        生产者消费者模型可以在生产前和消费后,让线程并行执行,减少线程阻塞时间。

三、基于环形队列的多生产者多消费者模型(信号量控制同步与互斥)

        环形队列本质是一个数组模拟的首尾相连的队列。

        在环形队列中,只有队列全空或者全满时,生产者和消费者才会站在队列的同一格里。当环形队列全空时,必须生产者线程先生产;当环形队列为满时,必须消费者线程先生产。在其他情况下,生产者和消费者线程可以并发执行,只有在队列为空或满的时候,才有同步与互斥问题。

        我们可以给生产者定义一个信号量,用以表示剩余空间资源;给消费者定义一个信号量,用以表示数据资源。

void Push(const T& in)//向环形队列中push数据
{
    P(_spaceSem);//调用sem_wait,信号量申请成功,未来一定能访问到临界资源

    pthread_mutex_lock(&_pmutex);
    _queue[_productorStep++]=in;
    _productorStep%=_cap;

    pthread_mutex_unlock(&_pmutex);
    V(_dataSem);//生产完毕后,调用V操作,让消费者信号量++。
}

        环形队列中最少有一个线程行动(为空或为满);最多有两个线程行动(生产消费各一个)。那么多生产者多消费者的意义是什么?

        这个意义和上面讲的生产者消费者模型一样,线程在资源生产和消费的过程可能巨花时间,该方式可以让线程利用原先被阻塞的时间,用以生产和消费活动,提升总体效率。

1、RingQueue.hpp

#pragma once
#include <iostream>
#include <semaphore.h>
#include <unistd.h>
#include <vector>
#include <cassert>
#include <pthread.h>
static const int gcap=5;
template <class T>
class RingQueue
{
private:
    void P(sem_t& spaceSem)
    {
        int n=sem_wait(&spaceSem);
        assert(0==n);//这里最好if
        (void)n;
    }
    void V(sem_t& dataSem)
    {
        int n=sem_post(&dataSem);
        assert(0==n);//这里最好if
        (void)n;
    }
public:
    RingQueue(const int& cap=gcap)
    :_queue(cap)//可以这样初始化vector吗?
    ,_cap(cap)
    ,_productorStep(0)
    ,_consumerStep(0)
    {
        int n=sem_init(&_spaceSem,0,_cap);
        assert(0==n);
        n=sem_init(&_dataSem,0,0);
        assert(0==n);
        
        pthread_mutex_init(&_pmutex,nullptr);
        pthread_mutex_init(&_cmutex,nullptr);
    }
    //生产者调用Push
    void Push(const T& in)//向环形队列中push数据
    {
        P(_spaceSem);//调用sem_wait,信号量申请成功,未来一定能访问到临界资源

        pthread_mutex_lock(&_pmutex);
        _queue[_productorStep++]=in;
        _productorStep%=_cap;

        pthread_mutex_unlock(&_pmutex);
        V(_dataSem);//生产完毕后,调用V操作,让消费者信号量++。
    }
    //消费者调用Pop
    void Pop(T* out)//向环形队列中pop数据,out是输出型参数
    {
        P(_dataSem);//确认消费者信号量是否可以--,不能减就阻塞

        pthread_mutex_lock(&_cmutex);
        *out=_queue[_consumerStep++];
        _consumerStep%=_cap;
        
        pthread_mutex_unlock(&_cmutex);
        V(_spaceSem);//消费完毕后,生产者信号量++
    }
    ~RingQueue()
    {
        sem_destroy(&_spaceSem);
        sem_destroy(&_dataSem);

        pthread_mutex_destroy(&_pmutex);
        pthread_mutex_destroy(&_cmutex);
    }
private:
    std::vector<T> _queue;//vector模拟队列
    int _cap;//队列的最大容量
    sem_t _spaceSem;//生产者信号量,表明环形队列中剩余空间资源数量
    sem_t _dataSem;//消费者信号量,表明环形队列中存在的数据资源数量
    int _productorStep;//生产者在循环列表的脚步
    int _consumerStep;//消费者在循环列表的脚步
    pthread_mutex_t _pmutex;//生产者锁
    pthread_mutex_t _cmutex;//消费者锁
};

2、Task.hpp 

#pragma once
#include <iostream>
#include <functional>
#include <string>
class Task
{
    //using func=std::function<int(int,int,char)>;
    typedef std::function<int(int,int,char)> func_t;//函数对象
public:
    Task()
    {}
    Task(int x,int y,char op,func_t func)
    :_x(x)
    ,_y(y)
    ,_op(op)
    ,_callBack(func)
    {}
    std::string operator()()//消费者调用
    {
        int result=_callBack(_x,_y,_op);
        char buffer[1024];
        snprintf(buffer,sizeof(buffer),"%d %c %d=%d",_x,_op,_y,result);//结果字符串
        return buffer;
    }
    std::string toTaskString()//生产者调用
    {
        char buffer[1024];
        snprintf(buffer,sizeof(buffer),"%d %c %d=?",_x,_op,_y);
        return buffer;
    }
private:
    int _x;
    int _y;
    char _op;//加减乘除取模
    func_t _callBack;//回调函数
};

const std::string oper = "+-*/%";
int myMath(int x, int y, char op)
{
    int result = 0;
    switch (op)
    {
    case '+':
        result = x + y;
        break;
    case '-':
        result = x - y;
        break;
    case '*':
        result = x * y;
        break;
    case '/':
    {
        if (y == 0)
        {
            std::cerr << "div zero error" << std::endl;
            result = -1;
        }
        else
            result = x / y;
    }
        break;
    case '%':
    {
        if (y == 0)
        {
            std::cerr << "mod zero" << std::endl;
            result = -1;
        }
        else
            result = x % y;
    }
    break;
    default:
        break;
    }
    return result;
}

3、main.cc

#include <iostream>
#include <pthread.h>
#include <ctime>
#include <cstdlib>
#include <unistd.h>
#include "RingQueue.hpp"
#include "Task.hpp"
std::string SelfName()
{
    char name[128];
    snprintf(name,sizeof(name),"thread:0X%x",pthread_self());
    return name;
}
void* ProductorRoutine(void* product)
{
    RingQueue<Task>* rqueue=static_cast<RingQueue<Task>*>(product);
    while(1)
    {
        //模拟构建一个任务
        int x=rand()%1000;
        int y=rand()%2000;
        char op=oper[rand()%oper.size()];
        Task t(x,y,op,myMath);//构建任务对象
        rqueue->Push(t);
        std::cout<<SelfName()<<"生产者生产任务成功:"<<t.toTaskString()<<std::endl;
        sleep(1);
    }
}
void* ConsumerRoutine(void* consume)
{
    RingQueue<Task>* rqueue=static_cast<RingQueue<Task>*>(consume);
    while(1)
    {
        Task t;
        rqueue->Pop(&t);
        std::cout<<SelfName()<<"消费者消费任务成功"<<t()<<std::endl;
    }
}
int main()
{
    srand((unsigned int)time(nullptr)^getpid()^pthread_self());


    RingQueue<Task>* rq=new RingQueue<Task>();
    pthread_t p[4],c[7];//定义生产者消费者线程
    for(int i=0;i<4;++i)
        pthread_create(p+i,nullptr,ProductorRoutine,rq);
    for(int i=0;i<7;++i)
        pthread_create(c+i,nullptr,ConsumerRoutine,rq);
    
    for(int i=0;i<4;++i)
        pthread_join(p[i],nullptr);
    for(int i=0;i<7;++i)
        pthread_join(c[i],nullptr);
    return 0;
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/17065.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Kubernetes服务搭建[配置-部署](Kubeadm)

文章目录 **[1 — 7] ** [ 配置K8S主从集群前置准备操作 ]一&#xff1a;主节点操作 查看主机域名->编辑域名1.1 编辑HOST 从节点也做相应操作1.2 从节点操作 查看从节点102域名->编辑域名1.3 从节点操作 查看从节点103域名->编辑域名 二&#xff1a;安装自动填充&…

进程地址空间与页表方面知识点(缺页中断及写时拷贝部分原理)

谢谢阅读&#xff0c;如有错误请大佬留言&#xff01;&#xff01; 目录 谢谢阅读&#xff0c;如有错误请大佬留言&#xff01;&#xff01; 抛出总结 开始介绍 发现问题 进程地址空间&#xff08;虚拟地址&#xff09; 页表 物理内存与进程地址空间映射 缺页中断基本…

Spring--AOP详细介绍--和详细代码演示证明理解

目录 Spring--AOP详细介绍 基本介绍 代码演示—入门 需求说明 定义一个接口类Vehicle 定义一个实现接口类的Car类 定义一个实现接口类的Ship类 创建测试类Test.java 来思考一下&#xff0c; 解决方案-动态代理方式-2 修改 Car类 修改 Ship类 创建VehicleProxyProvid…

Stable Diffusion使用方法

SD的本地安装教程有很多我就不重复了&#xff0c;这里主要是记录我在使用SD Webui的过程中遇到的问题&#xff0c;总结的一些提升出图效率&#xff0c;出好图概率的经验。 先搞几张看看效果 二次元妹妹 高达 &#xff1f; Ok&#xff0c;以上只是一小部分成品 &#xff0c;属…

PyQt5桌面应用开发(6):文件对话框

本文目录 PyQt5桌面应用系列介绍QFileDialog的静态接口QFileDialog的对象接口 示例结论后记 PyQt5桌面应用系列 PyQt5桌面应用开发&#xff08;1&#xff09;&#xff1a;需求分析 PyQt5桌面应用开发&#xff08;2&#xff09;&#xff1a;事件循环 PyQt5桌面应用开发&#xff…

MRI k空间概念整理

以下内容为MRI期末复习笔记&#xff0c;仅供复习参考使用。 K空间概念 K空间为包含MR数据的阵列&#xff0c;也可定义为原始数据阵列相位编码轴和频率编码轴的交叉点 MR扫描得到的数据为谱空间数据&#xff0c;谱空间数据与空间数据位置无直接对应关系 k空间每一数据点或数据…

不能使用chatGPT?这3个平替甚至比chatGPT更强

不能使用chatGPT&#xff1f;这3个平替甚至比chatGPT更强 chatGPT&#xff0c;一款由OpenAI开发的新型AI聊天机器人&#xff0c;正在势如破竹地改变着许多人的工作和生活方式。作为一款基于大语言模型的聊天机器人&#xff0c;chatGPT能够理解自然语言并进行人机对话。与传统的…

用于scATAC-seq有监督分类的Cellcano

细胞类型识别是单细胞数据分析的基本步骤。由于高质量参考数据集的可用性&#xff0c;有监督细胞分类方法在scRNA-seq数据中很受欢迎。染色质可及性分析&#xff08;scATAC-seq&#xff09;的最新技术进步为理解表观遗传异质性带来了新的见解。随着scATAC-seq数据集的不断积累&…

html5地理位置信息介绍, 百度地图使用

文章目录 1. HTML5中地理信息API1.1 Geolocation 接口 2. 在vue中使用百度地图3. 在react中使用百度地图 1. HTML5中地理信息API HTML5 的地理位置 API 可以让你获取用户的地理位置信息&#xff0c;并将其用于许多不同的应用场景&#xff0c;例如&#xff1a; 在地图上显示用…

钴基双金属氧化物储能材料的高效制备和电化学应用

一、引言 钴金属氧化物作为一类典型的储能材料&#xff0c;既可以用于锂离子电池负极材料&#xff0c;又可以用于超级电容器电极材料&#xff0c;因而备受关注 。在作为锂离子电池负极材料时&#xff0c;具有较高的理论比容量&#xff0c;但充放电体积变化较大、材料导电性较差…

爬虫为什么需要ip

爬虫需要使用爬虫ip主要是为了解决以下问题&#xff1a; 1、反爬虫机制&#xff1a;许多网站会设置反爬虫机制来防止爬虫程序的访问&#xff0c;例如限制IP地址的访问频率、检测访问来源等。使用爬虫ip可以绕过这些限制&#xff0c;使得爬虫程序更难被检测到。 2、访问限制&a…

浅拷贝和深拷贝

浅拷贝&#xff1a; 定义&#xff1a;浅拷贝&#xff08;Shallow Copy&#xff09;是一种简单的对象复制方式&#xff0c;将一个对象的数据成员直接复制给另一个对象&#xff08;通常是通过默认的复制构造函数或赋值运算符实现&#xff09;&#xff0c;这些数据成员可以是基本…

JavaScript:字符串

文章目录 字符串344. 反转字符串reverse() 方法&#xff08;打基础的时候&#xff0c;不要太迷恋库函数&#xff09;代码及思路 541. 反转字符串 IIJavaScript String split() 方法JavaScript Array join() 方法代码分析见注释 剑指 Offer 05. 替换空格思路注意&#xff1a;上面…

网络基础学习:什么是网络与网络发展史

什么是网络与网络发展史 什么是网络&#xff1f;什么是网络发展史&#xff1f;分组交换技术TCP/IP技术Web技术ARPANET&#xff08;1969年&#xff09;Internet&#xff08;1983年&#xff09;万维网&#xff08;1990年&#xff09;移动互联网&#xff08;2007年&#xff09;物联…

KDGK-F断路器机械特性测试仪

一、产品概述 KDGK-F 断路器机械特性测试仪可用于各电压等级的真空、六氟化硫、少油、多油等电力系统高压开关的机械特性参数测试与测量。测量数据稳定&#xff0c;抗干扰性强&#xff0c;可在500KV等级及以下电站做实验&#xff0c;接线方便&#xff0c;操作简单&#xff0c;是…

第14章 项目采购管理

文章目录 采购管理包括如下几个过程14.2 编制采购计划 462编制采购计划的输出1&#xff09;采购管理计划2&#xff09;采购工作说明书3&#xff09;采购文件 14.2.3 工作说明书&#xff08;SOW&#xff09; 14.3 实施采购 47414.3.2 实施采购的方法和技术 476&#xff08;1&…

No.054<软考>《(高项)备考大全》【冲刺8】《软考之 119个工具 (6)》

《软考之 119个工具 &#xff08;6&#xff09;》 99.应急应对策略:100.风险在评估:101.风险审计:102.偏差和趋势分析:103.技术绩效测量:104.自制或外购分析:105.市场调研:106.投标人会议:107.建议书评价技术:108.独立核算:109.广告:110.采购谈判:111.合同变更控制系统:112.采购…

ArduPilot之GPS Glitch问题M8N模块配置

ArduPilot之GPS Glitch问题&M8N模块配置 1. 源由2. 现象3. 视频分析3.1 配置&#xff08;不理想&#xff09;3.2 配置优化3.3 优化配置短时间3D LockGlitch3.4 优化配置长时间3D DGPS Lock3.5 使用尽量多的卫星系统3.5.1 配置一3.5.2 配置二 3.6 同一时间段&#xff08;M8N…

3.3 泰勒公式例题分析

例1 写出函数f(x)带有拉格朗日余项的n阶麦克劳林公式 我的答案&#xff1a; 一、信息 1.f(x)的表达式 2.目标求这个f(x)的n阶麦克劳林公式 二、分析 条件1&#xff1a;告诉我f(x)的表达式为我后续带入公式做准备 条件2&#xff1a;告诉我用什么公式和此次求解的方向 三…

平面设计师都在用的设计素材网站,免费下载~

很多新手设计师不知道去哪里找高清、免费的设计素材&#xff0c;今天我就给大家推荐5个设计素材网站&#xff0c;免费下载&#xff0c;赶紧收藏起来把&#xff01; 1、菜鸟图库 https://www.sucai999.com/?vNTYwNDUx 菜鸟图库是我推荐过很多次的网站&#xff0c;主要是站内素…