赞
踩
生产者消费者问题 (英语:Producer-consumer problem),也称有限缓冲问题(英语:Bounded-buffer problem),是一个多线程同步问题的经典案例。该问题描述了两个共享固定大小缓冲区的线程——即所谓的“生产者”和“消费者”——在实际运行时会发生的问题。生产者的主要作用是生成一定量的数据放到缓冲区中,然后重复此过程。与此同时,消费者也在缓冲区消耗这些数据。该问题的关键就是要保证生产者不会在缓冲区已经装满时加入数据,消费者也不会在缓冲区为空时消耗数据。
#include <condition_variable> #include <iostream> #include <mutex> #include <queue> #include <string> #include <thread> #include <vector> constexpr int WORKERTHREAD_COUNT = 3; class ProducerConsumerProblem { private: // 任务队列 std::queue<std::string> m_queue; // mutex std::mutex m_mutex; // condition_variable std::condition_variable m_cv; public: /** * 生产者函数 */ void produce(const int cnt) { printf("(Produce Thread:%lld) start\n", std::this_thread::get_id()); // RAII 自动解锁 std::lock_guard<std::mutex> lock(m_mutex); char buff[1024] = {}; for (int i = 0; i < cnt; i += 1) { sprintf(buff, "Produce[%02d]", i); m_queue.push(buff); } // 唤醒所有消费者 m_cv.notify_all(); // 唤醒任意一个消费者 // m_cv.notify_one(); } /** * 消费者线程 */ void consumer() { printf("(Consumer Thread:%lld) start\n", std::this_thread::get_id()); constexpr int SLEEP_TIME = 5; std::string receiveBuffer; while (1) { // 比lock_guard更细致的控制力度 std::unique_lock<std::mutex> lock(m_mutex); /** * @brief * 如果任务队列为空,则等待 * * 为什么是循环判断? * >条件变量虚假唤醒: * > 消费者线程被唤醒后 * > 缓存队列中没有数据 * * 条件变量 wait(mutex)的作用 * 1. 把互斥锁解开 * 2. 阻塞,等待被唤醒 * 3. 给互斥锁加锁 * 等效写法 * m_cv.wait(lock, [this]() { return !m_queue.empty(); }); */ while (m_queue.empty()) { // 等待生产者唤醒 printf("(Thread:%lld) is waiting ~~~\n", std::this_thread::get_id()); m_cv.wait(lock); } // 此时任务队列有数据,且cv被唤醒 // 消费者处理一个任务 receiveBuffer = m_queue.front(); m_queue.pop(); // 当前消费者线程获得数据,手动解锁 lock.unlock(); // 模拟随机设定一个消费时间 int solveTimeCost = (rand() % SLEEP_TIME); printf("(Thread:%lld) => %s need %d seconds ...\n", std::this_thread::get_id(), receiveBuffer.c_str(), solveTimeCost); std::this_thread::sleep_for(std::chrono::seconds(solveTimeCost)); } } }; int main() { srand(time(0)); ProducerConsumerProblem obj; std::vector<std::thread> thdVec; for (int i = 0; i < WORKERTHREAD_COUNT; i += 1) { // 只能move,放入vector延长对象的生存周期 auto thd = std::thread(&ProducerConsumerProblem::consumer, &obj); thdVec.push_back(std::move(thd)); thdVec[i].detach(); } int cnt; // 通过这里的输入阻塞程序 while (printf(">>Please input produce number:>\n"), std::cin >> cnt) { obj.produce(cnt); } return 0; }
环境
g++ (x86_64-posix-seh-rev0, Built by MinGW-W64 project) 7.3.0
效果
分别输入2 10 3
>>Please input produce number:> (Consumer Thread:2) start (Consumer Thread:3) start (Consumer Thread:4) start (Thread:2) is waiting ~~~ (Thread:3) is waiting ~~~ (Thread:4) is waiting ~~~ 2 (Produce Thread:1) start >>Please input produce number:> (Thread:4) => Produce[00] need 1 seconds ... (Thread:2) => Produce[01] need 1 seconds ... (Thread:3) is waiting ~~~ (Thread:4) is waiting ~~~ (Thread:2) is waiting ~~~ 10 (Produce Thread:1) start >>Please input produce number:> (Thread:2) => Produce[00] need 2 seconds ... (Thread:4) => Produce[01] need 2 seconds ... (Thread:3) => Produce[02] need 1 seconds ... (Thread:3) => Produce[03] need 2 seconds ... (Thread:2) => Produce[04] need 4 seconds ... (Thread:4) => Produce[05] need 4 seconds ... (Thread:3) => Produce[06] need 4 seconds ... (Thread:2) => Produce[07] need 0 seconds ... (Thread:2) => Produce[08] need 4 seconds ... (Thread:4) => Produce[09] need 0 seconds ... (Thread:4) is waiting ~~~ (Thread:3) is waiting ~~~ (Thread:2) is waiting ~~~ 3 (Produce Thread:1) start >>Please input produce number:> (Thread:4) => Produce[00] need 4 seconds ... (Thread:3) => Produce[02] need 0 seconds ... (Thread:3) is waiting ~~~ (Thread:2) => Produce[01] need 4 seconds ... (Thread:4) is waiting ~~~ (Thread:2) is waiting ~~~ ^Z
int main() { // 设置随机数种子 srand(time(0)); // 采用对象+成员函数的方式进行多线程 ProducerConsumerProblem obj; std::vector<std::thread> thdVec; for (int i = 0; i < WORKERTHREAD_COUNT; i += 1) { // 只能move,放入vector延长对象的生存周期 auto thd = std::thread(&ProducerConsumerProblem::consumer, &obj); thdVec.push_back(std::move(thd)); thdVec[i].detach(); } // 通过手动输入,生产物品 int cnt; // 通过这里的输入阻塞程序 while (printf(">>Please input produce number:>\n"), std::cin >> cnt) { obj.produce(cnt); } return 0; }
这里采用对象+成员函数的形式创建线程,因为成员函数的第一个参数是一个隐式的this,因此写std::thread(&ProducerConsumerProblem::consumer, &obj);
注意,std::thread
只能move,不能copy。
要将线程对象的生命周期延长,因此移动到vector中。
为了让下文的code执行,将线程进行分离detach()
。
class ProducerConsumerProblem { private: // 任务队列 std::queue<std::string> m_queue; // mutex std::mutex m_mutex; // condition_variable std::condition_variable m_cv; public: /** * 生产者函数 */ void produce(const int cnt) ; /** * 消费者函数 */ void consumer() ; };
这里的核心数据结构就是std::condition_variable
通常配合std::unique_lock<>
使用。
void produce(const int cnt) { printf("(Produce Thread:%lld) start\n", std::this_thread::get_id()); // RAII 自动解锁 std::lock_guard<std::mutex> lock(m_mutex); char buff[1024] = {}; for (int i = 0; i < cnt; i += 1) { sprintf(buff, "Produce[%02d]", i); m_queue.push(buff); } // 唤醒所有消费者 m_cv.notify_all(); // 唤醒任意一个消费者 // m_cv.notify_one(); }
std::lock_guard<std::mutex>
可以做到最简单的构造时上锁,析构时解锁。
notify_all()
唤醒所有wait中的对象。
notify_one()
唤醒一个wait中的对象。
void consumer() { printf("(Consumer Thread:%lld) start\n", std::this_thread::get_id()); constexpr int SLEEP_TIME = 5; std::string receiveBuffer; while (1) { // 比lock_guard更细致的控制力度 std::unique_lock<std::mutex> lock(m_mutex); /// !!!最重要的部分!!! while (m_queue.empty()) { // 等待生产者唤醒 printf("(Thread:%lld) is waiting ~~~\n", std::this_thread::get_id()); m_cv.wait(lock); } // 此时任务队列有数据,且cv被唤醒 // 消费者处理一个任务 receiveBuffer = m_queue.front(); m_queue.pop(); // 当前消费者线程获得数据,手动解锁 lock.unlock(); // 模拟随机设定一个消费时间 int solveTimeCost = (rand() % SLEEP_TIME); printf("(Thread:%lld) => %s need %d seconds ...\n", std::this_thread::get_id(), receiveBuffer.c_str(), solveTimeCost); std::this_thread::sleep_for(std::chrono::seconds(solveTimeCost)); } }
std::unique_lock<std::mutex>
居于更细致的操作力度,可以手动上锁解锁。
m_cv.wait(lock);
整个demo的核心!!!
在消费者在处理任务时,可以让unique_lock主动解锁。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。