首页 > 编程学习 > 生产者、消费者模式

生产者、消费者模式

发布时间:2022/8/26 14:36:13

是什么

生产者消费者模式(生产者消费者模式)是经典的线程同步案例,也称为有限缓冲问题。
生产者产生数据,但是数据不能超出缓冲区的限制,当缓冲区满时,停止生产。
消费者消费生产者产生的数据,当缓冲区为空时,停止消费。

能干什么

生产者消费者模式时一种设计思想,并不是一种固定的写法。就像23种设计模式一样,它所传递的解决问题的思路(思想、思维)。

例如,消息队列。消息队列就使用了生产者消费者模式的思想,
线程A(或系统A,生产者)产生消息,将消息入队,其他的线程(消费者)从消息队列中取出消息,并响应。

怎么干

为了方便展示生产者和消费之间的关系,特将队列换成栈去实现,因为入栈和出栈在同一端完成,可以根据出栈的顺序观察到生产者和消费者之间的关系。
以函数指针来表示一种消息。

std::stack<int (*)(const int x)> product_stk;

生产者pdoductor_fun的实现,在进入临界区时加锁,当栈(缓冲区)等于栈的预设大小时阻塞当前线程,wait等待消费者唤醒;否则生产消息,notify_all通知消费者可以消费。
用随机数表示产生的不同的消息(调用不同的函数)

void pdoductor_fun() {
    srand(time(NULL));
    while (1) {
        {
            std::unique_lock<std::mutex> ulk(mtx);
            while (product_stk.size() >= 20) {
                cond_full_var.wait(ulk);
            }
            if (rand() % 2 == 1) {
                product_stk.push(foo1);
            } else {
                product_stk.push(foo2);
            }
            ++global_num;
            cond_null_var.notify_all();
            std::cout << product_stk.size() << std::endl;
        }
        sleep(1);
    }
    return;
}

消费者cunsumer_fun当栈(缓冲区)空时,wait阻塞消费者线程,等待生产者唤醒;否则,取出栈中的消息,notify_all通知生产者可以生产。

void cunsumer_fun() {
    int (*ptr_fun)(const int x);
    int x;
    while (1) {
        {
            std::unique_lock<std::mutex> ulk(mtx);
            // lock_guard<std::mutex> lg(m);
            while (product_stk.empty()) {
                cond_null_var.wait(ulk);
            }
            x = global_num;
            ptr_fun = product_stk.top();
            product_stk.pop();
            cond_full_var.notify_all();
        }
        ptr_fun(x);
        sleep(3);
    }
    return;
}

完整代码如下,用sleep来控制消息产生的速度和消费的速度。虽然示例程序中有一个生产者、两个消费者,但消息产生的速度仍大于消费,可以观察出当栈满时消息不在产生。
注:linux下sleep的单位是秒s,windows下sleep的单位时毫秒ms。

#include <iostream>
#include <condition_variable>
#include <thread>
// #include <queue>
#include <stack>
#include <stdlib.h>
#include <unistd.h>
#include <mutex>

using namespace std;

int foo1(const int x);
int foo2(const int x);
void pdoductor_fun();
void cunsumer_fun();

const int STK_MAX_SIZE = 20;
std::stack<int (*)(const int x)> product_stk;
mutex mtx;
int global_num = 0;
std::condition_variable cond_null_var, cond_full_var;

int main() {

    std::thread productor_thread(pdoductor_fun);

    // std::cout << productor_thread.get_id() << std::endl;

    std::thread consumer_thread_1(cunsumer_fun);
    std::thread consumer_thread_2(cunsumer_fun);

    consumer_thread_1.join();
    consumer_thread_2.join();
    productor_thread.join();
    return 0;
}

void cunsumer_fun() {
    int (*ptr_fun)(const int x);
    int x;
    while (1) {
        {
            std::unique_lock<std::mutex> ulk(mtx);
            // lock_guard<std::mutex> lg(m);
            while (product_stk.empty()) {
                cond_null_var.wait(ulk);
            }
            x = global_num;
            ptr_fun = product_stk.top();
            product_stk.pop();
            cond_full_var.notify_all();
        }
        ptr_fun(x);
        sleep(3);
    }
    return;
}

void pdoductor_fun() {
    srand(time(NULL));
    while (1) {
        {
            std::unique_lock<std::mutex> ulk(mtx);
            while (product_stk.size() >= 20) {
                cond_full_var.wait(ulk);
            }
            if (rand() % 2 == 1) {
                product_stk.push(foo1);
            } else {
                product_stk.push(foo2);
            }
            ++global_num;
            cond_null_var.notify_all();
            std::cout << product_stk.size() << std::endl;
        }
        sleep(1);
    }
    return;
}

int foo1(const int x) {
    std::cout << "foo1 " << x << std::endl;
    return 1;
}

int foo2(const int x) {
    std::cout << "foo2 " << -x << std::endl;
    return 2;
}

cmake版本 3.16,源代码文件放在项目目录src文件夹下。CMakeLists.txt如下

cmake_minimum_required(VERSION 3.16)

project(product_consumer)

set(CXX_STANDARD c++11)
set(CMAKE_CXX_STANDARD_REQUIRED True)

# set(THREADS_PREFER_PTHREAD_FLAG ON)

find_package(Threads)

file(GLOB SRC_CC ${PROJECT_SOURCE_DIR}/src/*.cc)
file(GLOB SRC_H ${PROJECT_SOURCE_DIR}/src/*.h)

add_executable(product_consumer ${SRC_CC} ${SRC_H})

target_link_libraries(product_consumer PUBLIC ${CMAKE_THREAD_LIBS_INIT})

写在最后,是关于函数指针的问题。
假设有函数指针

int (*ptr_foo)();

在c中,ptr_foo既可以指向,返回值为int有参的函数,也可以指向返回值int无参的函数。

int foo1() { return 0; }
int foo2(const int x) { return 0; }
int foo3(const int x, char c) { return 0; }

ptr_foo = foo1;  // 正确
ptr_foo = foo1;  // 正确
ptr_foo = foo1;  // 正确

只能指向无参的函数指针应该这样写

int (*ptr_foo)(void);

而在c++中函数指针的参数列表需要和函数的参数列表相匹配

Copyright © 2010-2022 mfbz.cn 版权所有 |关于我们| 联系方式|豫ICP备15888888号