1200字范文,内容丰富有趣,写作的好帮手!
1200字范文 > Linux系统编程 -- 多线程之基于阻塞队列生产者与消费者模型

Linux系统编程 -- 多线程之基于阻塞队列生产者与消费者模型

时间:2022-01-24 12:39:40

相关推荐

Linux系统编程 -- 多线程之基于阻塞队列生产者与消费者模型

(1)生产者与消费者模型概述

在现实生活中,当我们缺少某些生活用品时,就回到超市去购买。当你到超市时,你的身份就是消费者,那么这些商品又是哪里来的呢,自然是供应商,那么它们就是生产者,而超市在生产者与消费者之间,就充当了一个交易场所。正是这样的方式才使得人类的交易变得高效,生产者只需要向超市供应商品,消费者只需要去超市购买商品

计算机是现实世界的抽象,因此像这种人类世界的模型,自然也被引入到了计算机当中。在实际软件开发中,进程或线程就是生产者和消费者,他们分别产生大量数据或消耗大量数据,但是他们之间一般不直接进行交流,而是生产者生产好数据之后把数据交到一个缓冲区中,消费者需要数据时直接从缓冲区中取就可以了

我们将其总结为321原则——3种关系,2个角色,1个场所

3种关系:生产者与生产者之间是互斥关系,消费者与消费者之间是互斥关系,生产者与消费者之间是同步关系

2个角色:生产者和消费者

1个场所:它们之间进行数据交互是在一缓冲区当中,这个缓冲区可以有多种表现形式

(2)生产者与消费者模型优点

主要有三点

解耦

支持并发

支持忙闲不均

其中最为重要的就是解耦,我们在软件工程这门课程中,经常听到一个原则就是“高内聚,低耦合”,但是很多同学对这句话的理解不是特别准确。

如下代码,它的耦合性就非常高。因为当main函数调用add函数时,在add函数还没有完成之前,它只能等待add函数执行完毕,这里的代码比较简单,但是如果数据很庞大呢?其实就会造成main函数阻塞很长时间

#include <stdio.h>void add(int x,int y){int ret=x+y;printf("%d\n",ret);}int main(){int a=10,b=20;add(a,b);}

而如果按照生产者与消费者模型将其分在两个线程之间运行,这样的话main函数只负责写数据,它就无需关心add函数了,所以这样就会使其耦合性降低

(3)基于阻塞队列(blockingqueue)的生产者消费者模型(单消费者单生产者)

处于生产者与消费者之间的缓冲区可以有多种实现方式,比较经典的就如阻塞队列。如下当队列为空时,消费者将会被阻塞,直到队列中放了数据;当队列满时,生产者就会被阻塞,直到有数据取出

为了演示方便,先以单消费者和单生产者做演示,共有两个文件,blockqueue.h用于实现阻塞队列,blockqueue.cpp用于控制线程

makefile如下

blockqueue.exe:blockqueue.cppg++ -o $@ $^ -std=c++11 -lpthread.PHONY:cleanclean:rm -f blockqueue.exe

blockqueue.h如下

#include <iostream>#include <unistd.h>#include <pthread.h>#include <queue>using namespace std;class BlockQueue{private://成员变量pthread_mutex_t lock;//锁pthread_cond_t consumer_cond;//消费者的等待条件pthread_cond_t productor_cond;//生产者的等待条件queue<int> q;//队列size_t _cpacity;//队列最大容量private://私有接口void QueueLock()//锁定{pthread_mutex_lock(&lock);}void QueueUnLock()//解锁{pthread_mutex_unlock(&lock);}bool IsFull(){return q.size()>=_cpacity;//满了}bool IsEmpty(){return q.empty();//空了}void WakeUpConsumber()//唤醒消费者{cout<<"【唤醒消费者】"<<endl;pthread_cond_signal(&consumer_cond);}void WakeUpProductor()//唤醒生产者{cout<<"【唤醒生产者】"<<endl;pthread_cond_signal(&productor_cond);}void ConsumberWait()//消费者等待唤醒条件{cout<<"队列已经空了,消费者睡觉了"<<endl;pthread_cond_wait(&consumer_cond,&lock);}void ProductorWait()//生产者等待唤醒条件{cout<<"队列已经满了,生产者睡觉了"<<endl;pthread_cond_wait(&productor_cond,&lock);}public://对外接口BlockQueue(int capacity)//构造函数:_cpacity(capacity){pthread_mutex_init(&lock,nullptr);pthread_cond_init(&consumer_cond,nullptr);pthread_cond_init(&productor_cond,nullptr);}~BlockQueue()//析构{pthread_mutex_destroy(&lock);pthread_cond_destroy(&consumer_cond);pthread_cond_destroy(&productor_cond);}void Push(int val)//生产者放数据{QueueLock();//进入临界资源if(IsFull())//如果队列已经满了{WakeUpConsumber();//唤醒消费者ProductorWait();//生产者睡觉} q.push(val);//没有满就push}QueueUnLock();//走出临界资源·}void Get(int& val)//消费者拿数据{QueueLock();if(IsEmpty())//如果队列空了{WakeUpProductor();//唤醒生产者ConsumberWait();//消费者睡觉}val=q.front();q.pop();QueueUnLock();}};

blockqueue.cpp如下

#include “blockqueue.h”

using namespace std;

void* con_run(void* arg)

{

BlockQueue* bq=(BlockQueue*)arg;

while(1)

{

int data;

bq->Get(data);

sleep(2);

cout<<“消费者拿到了数据:”<<data<<endl;

}

}

void* pro_run(void* arg)

{

BlockQueue* bq=(BlockQueue*)arg;

while(1)

{

int data=rand()%10+1;//产生1-10的随机数

bq->Push(data);

sleep(1);

cout<<“生产者放了一个数据:”<<data<<endl;

}

}

int main()

{

BlockQueue* bq=new BlockQueue(5);//创建阻塞队列,给5个大小

pthread_t con,pro;//创建消费者和生产者线程

pthread_create(&con,nullptr,con_run,(void*)bq);

pthread_create(&pro,nullptr,pro_run,(void*)bq);

pthread_join(con,nullptr);

pthread_join(pro,nullptr);

delete bq;

return 0;

}

运行效果如下:

解释一下为什么pthread_cond_wait第二个要携带一把锁?

举个例子,当生产者调用该函数时,表明生产者需要等待,它等待的原因是因为条件不满足了,而这个条件不满足必须要进入临界资源进行判断才能得到的结论,所以这里必须让他带着锁进入资源进行判断,同时等待时必须释放锁。当该函数调用完毕返回之后,它返回的位置恰好就是临界区,所以该函数的另一个作用就是要让该线程重新获得锁

(4)基于阻塞队列(blockingqueue)的生产者消费者模型(多消费者多生产者)

在前面单消费者和单生产者的基础上,做一些简单的修改就能变为多生产者和多消费者了,具体修改如下

建立锁,分别保证每次只有一个消费者或一个生产者进入队列,然后该生产者或消费者再和对应的消费者或生产者进行同步或互斥,也就是组内竞争,再组外同步或竞争

实际过程中不会让队列到达极端情况,也就是全满或全空,所以下面控制了水平线,当容量大于95%时唤醒消费者,小于%5则唤醒生产者

blockqueue.h如下

#include <iostream>#include <unistd.h>#include <pthread.h>#include <queue>using namespace std;class BlockQueue{private://成员变量pthread_mutex_t lock;//锁pthread_cond_t consumer_cond;//消费者的等待条件pthread_cond_t productor_cond;//生产者的等待条件queue<int> q;//队列size_t _cpacity;//队列最大容量private://私有接口void QueueLock()//锁定{pthread_mutex_lock(&lock);}void QueueUnLock()//解锁{pthread_mutex_unlock(&lock);}bool IsFull(){return q.size()>=(_cpacity*0.95);//如果大于容量的95%就算满了}bool IsEmpty(){return q.size()<=(_cpacity*0.05);//如果小于容量的%5就算空了}void WakeUpConsumber()//唤醒消费者{cout<<"【唤醒消费者】"<<endl;pthread_cond_signal(&consumer_cond);}void WakeUpProductor()//唤醒生产者{cout<<"【唤醒生产者】"<<endl;pthread_cond_signal(&productor_cond);}void ConsumberWait()//消费者等待唤醒条件{cout<<"队列已经空了,消费者睡觉了"<<endl;pthread_cond_wait(&consumer_cond,&lock);}void ProductorWait()//生产者等待唤醒条件{cout<<"队列已经满了,生产者睡觉了"<<endl;pthread_cond_wait(&productor_cond,&lock);}public://对外接口BlockQueue(int capacity)//构造函数:_cpacity(capacity){pthread_mutex_init(&lock,nullptr);pthread_cond_init(&consumer_cond,nullptr);pthread_cond_init(&productor_cond,nullptr);}~BlockQueue()//析构{pthread_mutex_destroy(&lock);pthread_cond_destroy(&consumer_cond);pthread_cond_destroy(&productor_cond);}size_t getcapacity(){return _cpacity-q.size();}void Push(int val)//生产者放数据{QueueLock();//进入临界资源if(IsFull())//如果队列已经满了{sleep(2);WakeUpConsumber();//唤醒消费者ProductorWait();//生产者睡觉} q.push(val);//没有满就push}QueueUnLock();//走出临界资源·}void Get(int& val)//消费者拿数据{QueueLock();if(IsEmpty())//如果队列空了{sleep(2);WakeUpProductor();//唤醒生产者ConsumberWait();//消费者睡觉}val=q.front();q.pop();QueueUnLock();}};

blockqueue.cpp如下

#include "blockqueue.h"using namespace std;BlockQueue* bq=new BlockQueue(150);//创建阻塞队列,给150个大小pthread_mutex_t c_lock;//消费者组内竞争锁pthread_mutex_t p_lock;//生产者组内竞争锁void* con_run(void* arg){char* id=(char*)arg;while(1){int data;pthread_mutex_lock(&c_lock);//每次先保证一个消费者进入,然后这个消费者再和生产者去竞争bq->Get(data);usleep(30000); cout<<"消费者"<<id<<"拿到了数据:"<<data<<",队伍空间还有:"<<bq->getcapacity()<<endl;pthread_mutex_unlock(&c_lock);}}void* pro_run(void* arg){char* id=(char*)arg;while(1){int data=rand()%10+1;//产生1-10的随机数pthread_mutex_lock(&p_lock);//这里保证仅有一个生产者进入,然后这个生产者再和消费者进行竞争bq->Push(data);cout<<"生产者"<<id<<"放了一个数据:"<<data<<",队伍空间还有:"<<bq->getcapacity()<<endl;usleep(30000);pthread_mutex_unlock(&p_lock);}}int main(){pthread_mutex_init(&c_lock,nullptr);pthread_mutex_init(&p_lock,nullptr);pthread_t con1,con2,con3,con4,pro1,pro2,pro3,pro4;//创建消费者和生产者线程pthread_create(&con1,nullptr,con_run,(void*)"1号");pthread_create(&con2,nullptr,con_run,(void*)"2号");pthread_create(&con3,nullptr,con_run,(void*)"3号");pthread_create(&con4,nullptr,con_run,(void*)"4号");pthread_create(&pro1,nullptr,pro_run,(void*)"1号");pthread_create(&pro2,nullptr,pro_run,(void*)"2号");pthread_create(&pro3,nullptr,pro_run,(void*)"3号");pthread_create(&pro4,nullptr,pro_run,(void*)"4号");pthread_join(con1,nullptr);pthread_join(con2,nullptr);pthread_join(con3,nullptr);pthread_join(con4,nullptr);pthread_join(pro1,nullptr);pthread_join(pro2,nullptr);pthread_join(pro3,nullptr);pthread_join(pro4,nullptr);pthread_mutex_destroy(&p_lock);pthread_mutex_destroy(&c_lock);delete bq;return 0;}

效果如下:

(4)基于阻塞队列(blockingqueue)的生产者消费者模型(实际模型)

就以简单的用户注册为例,当服务器获取的用户的注册信息时,此时至少会存在两个线程,1个线程负责从服务器读取数据,一个则负责将数据写入数据库内。于是我们建立4个生产者,生产者向消费者派发任务,任务是计算两个数的和,消费者得到任务后进行计算

blockqueue.h如下

#include <iostream>#include <unistd.h>#include <pthread.h>#include <queue>using namespace std;class Task//生产者派发给消费者的任务{public:int _x;int _y;public:Task(int x,int y):_x(x),_y(y){}Task(){}int sum(){return _x+_y;}};class BlockQueue{private://成员变量pthread_mutex_t lock;//锁pthread_cond_t consumer_cond;//消费者的等待条件pthread_cond_t productor_cond;//生产者的等待条件queue<Task> q;//队列size_t _cpacity;//队列最大容量private://私有接口void QueueLock()//锁定{pthread_mutex_lock(&lock);}void QueueUnLock()//解锁{pthread_mutex_unlock(&lock);}bool IsFull(){return q.size()>=_cpacity;}bool IsEmpty(){return q.empty();}void WakeUpConsumber()//唤醒消费者{cout<<"【唤醒消费者】"<<endl;pthread_cond_signal(&consumer_cond);}void WakeUpProductor()//唤醒生产者{cout<<"【唤醒生产者】"<<endl;pthread_cond_signal(&productor_cond);}void ConsumberWait()//消费者等待唤醒条件{cout<<"队列已经空了,消费者睡觉了"<<endl;pthread_cond_wait(&consumer_cond,&lock);}void ProductorWait()//生产者等待唤醒条件{cout<<"队列已经满了,生产者睡觉了"<<endl;pthread_cond_wait(&productor_cond,&lock);}public://对外接口BlockQueue(int capacity)//构造函数:_cpacity(capacity){pthread_mutex_init(&lock,nullptr);pthread_cond_init(&consumer_cond,nullptr);pthread_cond_init(&productor_cond,nullptr);}~BlockQueue()//析构{pthread_mutex_destroy(&lock);pthread_cond_destroy(&consumer_cond);pthread_cond_destroy(&productor_cond);}size_t getcapacity(){return _cpacity-q.size();}void Push(Task t)//生产者放数据{QueueLock();//进入临界资源while(IsFull())//如果队列已经满了,注意使用while防止挂起失败{sleep(1);WakeUpConsumber();//唤醒消费者ProductorWait();//生产者睡觉} q.push(t);//没有满就pushQueueUnLock();//走出临界资源·}void Get(Task& t)//消费者拿数据{QueueLock();while(IsEmpty())//如果队列空了,注意使用while防止挂起失败{sleep(2);WakeUpProductor();//唤醒生产者ConsumberWait();//消费者睡觉}t=q.front();q.pop();QueueUnLock();}};

blockqueue.cpp如下

#include "blockqueue.h"using namespace std;BlockQueue* bq=new BlockQueue(10000);//创建任务队列,任务10000个pthread_mutex_t c_lock;//消费者组内竞争锁pthread_mutex_t p_lock;//生产者组内竞争锁void* con_run(void* arg){char* id=(char*)arg;while(1){pthread_mutex_lock(&c_lock);//每次先保证一个消费者进入,然后这个消费者再和生产者去竞争Task ret;bq->Get(ret);//消费者完成计算printf("消费者%s完成任务,%d+%d=%d\n",id,ret._x,ret._y,ret.sum());pthread_mutex_unlock(&c_lock);}}void* pro_run(void* arg){char* id=(char*)arg;while(1){pthread_mutex_lock(&p_lock);//这里保证仅有一个生产者进入,然后这个生产者再和消费者进行竞争int x=rand()%10000+1;int y=rand()%10000+1;Task task(x,y);//生产者产生任务bq->Push(task);printf("生产者%s派发任务:%d+%d=?\n",id,task._x,task._y);pthread_mutex_unlock(&p_lock);}}int main(){pthread_mutex_init(&c_lock,nullptr);pthread_mutex_init(&p_lock,nullptr);pthread_t con1,con2,pro1,pro2,pro3,pro4;//创建消费者和生产者线程pthread_create(&con1,nullptr,con_run,(void*)"1号");pthread_create(&con2,nullptr,con_run,(void*)"2号");pthread_create(&pro1,nullptr,pro_run,(void*)"1号");pthread_create(&pro2,nullptr,pro_run,(void*)"2号");pthread_create(&pro3,nullptr,pro_run,(void*)"2号");pthread_create(&pro4,nullptr,pro_run,(void*)"2号");pthread_join(con1,nullptr);pthread_join(con2,nullptr);pthread_join(pro1,nullptr);pthread_join(pro2,nullptr);pthread_join(pro3,nullptr);pthread_join(pro4,nullptr);pthread_mutex_destroy(&p_lock);pthread_mutex_destroy(&c_lock);delete bq;return 0;}

效果如下:

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。