赞
踩
线程系列:
Linux–线程的认识(一)
Linux–线程的分离、线程库的地址关系的理解、线程的简单封装(二)
线程的互斥:临界资源只能在同一时间被一个线程使用
生产消费模型是多线程编程和分布式系统中的一个经典概念,它描述了生产者和消费者之间的交互方式。在这个模型中,生产者负责生成数据或任务,而消费者则负责处理这些数据或任务。这种模型在处理并发和异步操作时非常有用,尤其是在需要平衡生产速率和消费速率的情况下。
生产者(Producer):负责生成数据或任务的实体。在多线程环境中,这通常是一个线程或一组线程。
消费者(Consumer):负责处理数据或任务的实体。同样,这也可以是一个线程或一组线程。
缓冲区(Buffer):生产者和消费者之间的中间存储区域,用于临时存放生产者生成的数据,直到消费者准备好处理它们。
生产消费模型是理解和实现高效并发和分布式系统的关键,通过合理设计和优化,可以显著提高系统的性能和稳定性。
BlockQueue.hpp: 阻塞队列
阻塞队列是一种支持两个附加操作的队列。这两个附加的操作是:当队列为空时,获取元素的线程会等待队列变为非空;当队列已满时,存储元素的线程会等待队列可用。
#ifndef __BLOCK_QUEUE_HPP__ #define __BLOCK_QUEUE_HPP__ #include <iostream> #include <string> #include <queue> #include <pthread.h> using namespace std; template <class T> class BlockQueue { public: BlockQueue(int cap) :_cap(cap),_product_wait_num(0),_consum_wait_num(0) { pthread_mutex_init(&_mutex,nullptr); pthread_cond_init(&_product_cond,nullptr); pthread_cond_init(&_consum_cond,nullptr); } void Enqueue(T& in)//生产者所用接口 { pthread_mutex_lock(&_mutex);//对临界资源开启保护 while(IsFull())//当队列存满后需要让生产者停止生产,进入阻塞状态 { _product_wait_num++; pthread_cond_wait(&_product_cond,&_mutex); _product_wait_num--; } //开始生产 _block_queue.push(move(in)); //让消费者来消费 if(_consum_wait_num>0) pthread_cond_signal(&_consum_cond); pthread_mutex_unlock(&_mutex); } void Pop(T* out) { pthread_mutex_lock(&_mutex);//对临界资源开启保护 while(IsEmpty())//当队列空缺后需要让消费者停止消费,进入阻塞状态 { _consum_wait_num++; pthread_cond_wait(&_consum_cond,&_mutex); _consum_wait_num--; } //进行消费 *out=_block_queue.front(); _block_queue.pop(); //通知生产者 if(_product_wait_num>0) pthread_cond_signal(&_product_cond); pthread_mutex_unlock(&_mutex); } ~BlockQueue() { pthread_mutex_destroy(&_mutex); pthread_cond_destroy(&_product_cond); pthread_cond_destroy(&_consum_cond); } private: bool IsFull() { return _block_queue.size() == _cap; } bool IsEmpty() { return _block_queue.empty(); } queue<T> _block_queue;//阻塞队列 int _cap; //总上限 pthread_mutex_t _mutex; //互斥锁 pthread_cond_t _product_cond; //生产者的条件变量 pthread_cond_t _consum_cond; //消费者的条件变量 int _product_wait_num; int _consum_wait_num; }; #endif
代码解释:
main.cc: 主函数
#include"BlockQueue.hpp" #include"Thread.hpp" #include<string> #include<vector> #include<unistd.h> using namespace ThreadMdule; int a=10; //生产者 void Productor(BlockQueue<int>& bq) { int cnt=1; while (true) { bq.Enqueue(cnt); std::cout << "Productor product data is : " << cnt << " addr: " << &bq << std::endl; cnt++; //sleep(3); } } //消费者 void Consumer(BlockQueue<int>& bq) { while (true) { int data; bq.Pop(&data); std::cout << "Consumer Consum data is : " << data << " addr: " << &bq << std::endl; sleep(5); } } //执行创建线程的函数 void StartComm(std::vector<Thread<BlockQueue<int>>> *threads, int num, BlockQueue<int> &bq, func_t<BlockQueue<int>> func) { for (int i = 0; i < num; i++) { std::string name = "thread-" + std::to_string(i + 1); //将线程放入threads中,记录信息 threads->emplace_back(func, bq, name); threads->back().start(); } } void StartProductor(vector<Thread<BlockQueue<int>>>* threads,int num,BlockQueue<int>& bq) { StartComm(threads,num,bq,Productor); } void StartConsumer(vector<Thread<BlockQueue<int>>>* threads,int num,BlockQueue<int>& bq) { StartComm(threads,num,bq,Consumer); } void WaitAllThread(std::vector<Thread<BlockQueue<int>>> &threads) { for (auto &thread : threads) { thread.Join(); } } int main() { BlockQueue<int>* bq=new BlockQueue<int>(5); vector<Thread<BlockQueue<int>>> threads;//用threads来记录线程的信息 StartProductor(&threads,1,*bq); StartConsumer(&threads,1,*bq); WaitAllThread(threads); }
细节:
这里在上面主函数代码上更改生产者的数量即可。
直接验证:
这里用任务类来作为阻塞队列的任务,让生产者产出对应任务,消费者来解决任务;生产出来的任务先放入阻塞队列作为缓冲;
#include<iostream> #include<string> #include<functional> using namespace std; class Task { public: Task(){} Task(int a,int b): _a(a),_b(b),_result(0) {} void Excute() { _result=_a+_b; } string ResultToString() { return to_string(_a) + "+"+to_string(_b)+"="+to_string(_result); } string DebugToString() { return to_string(_a) + "+" + to_string(_b) + "= ?"; } private: int _a; int _b; int _result; };
//类型
//生产者:
//消费者:
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。