1200字范文,内容丰富有趣,写作的好帮手!
1200字范文 > Linux基于单链表环形队列的多线程生产者消费者模型

Linux基于单链表环形队列的多线程生产者消费者模型

时间:2020-10-14 16:32:07

相关推荐

Linux基于单链表环形队列的多线程生产者消费者模型

生产者–消费者模型简述

对于生产者–消费者模型,相信我们都不陌生,因为生活中,我们无时无刻不在扮演生产者或消费者。但是对于Linux中的生产者–消费者模型,大家又了解了一个什么程度?

其实,说白了就是一个生产,一个消费的关系,而且保证生产者在不生产的时候,消费者无法消费(这跟生活中一样,只不过生活中我们是确实消费不到任何东西,但在计算机中,我们就有可能消费到无效数据,、乱数据、过期数据等等);消费者的消费欲望饱和之后,生产者不生产(同上,如果市场中大量涌入某个商品,导致市场急速饱和,那么厂家户很果断的暂时放弃生产这个商品)。但是生活中的又跟计算机中的有区别,在于:一般情况下,消费者之间并无任何关系,但是计算机中,消费者是存在竞争关系的,即A消费者拿走了这个数据,那么剩余的B,C,D,E…..等等都无法使用这个数据,因为已经被消费了。生产者之间当然是竞争关系了,两个供销商为了业绩,会竞争成为一个大型超市的供货对象,很相似的是,计算机中,多个生产者都是为了为这块空间生产数据(写数据),为了空间的使用权。而生产者与消费者之间的关系为互斥与同步,互斥就是竞争关系,因为邻界资源的访问一次只能允许一个执行分支访问,所以:如果我在写(生产),你就不能读(消费);同步关系表示为:我在写完之后,就会调用函数给你信号,唤醒你来消费,解决二者之间可能出现的饥饿问题。

生产者–消费者间的关系

所以生产者与消费者之间的关系可以总结如下几点:

生产者与生产者:互斥关系;

消费者与消费者:互斥关系;

生产者与消费者:互斥与同步关系。

生产者与消费者模型的实现

实现生产者与消费者模型时我们需要用到互斥锁,与条件变量。

互斥锁

互斥锁即为保护邻界资源的一个保护“装置”,这个我之前解释过,跟二元信号量有类似的地方。当邻界资源未有执行分支使用时,锁是开着的,只要一有执行分支取访问邻界资源,那么锁就会上锁,向外表明:这块邻界资源已经有“人”使用,别“人”来之只能等待(可以使阻塞和非阻塞),当访问邻界资源的执行分支操作完毕,告诉互斥锁,我已经访问完毕,互斥锁开锁,向外界表明:此时邻界资源无“人”使用,可以申请。互斥锁还会保证高优先级的执行分支不能一直持有锁,这样理解:每当一个执行分支操作完毕,就将他的优先级降低,让他去等待队列的后面排队。

相关函数:

#include <pthread.h>pthread_t lock;//定义一个锁,如果这个锁是全局的,就使用宏来初始化,如果是局部的,就使用函数pthread_mutex_init来初始化int pthread_mutex_lock(pthread_mutex_t *mutex);//上锁函数int pthread_mutex_trylock(pthread_mutex_t *mutex);//试上锁函数,即非阻塞等待int pthread_mutex_unlock(pthread_mutex_t *mutex);//解锁函数

条件变量

条件变量是用来描述邻界资源内部状态的一种方法,假如我一直在等,那么我怎么知道等待什么时候,条件变量就是用来跟这个的,当邻界资源中没有可以用来供当前执行分支访问的时候,那么我就挂起等待,并且,注意!注意!我还要解开我的锁,不然我持有锁并挂起等待,那么别的执行分支申请不到锁资源,也会陷入挂起等待,形成上一篇描述的死锁问题。当邻界资源中满足我访问的条件时,你再回来唤醒我,并且此时我再次持有锁资源,以此保证我的正常操作不被别的执行分支打扰。

相关函数:

#include <pthread.h>int pthread_cond_timedwait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex, const struct timespec *restrict abstime);//计时等待方式如果在给定时刻前条件没有满足,则返回ETIMEDOUT,结束等待//即当前条件不满足我的访问条件,所以是当前执行分支挂起等待int pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex);//同上,没有计时,直到满足条件为止int pthread_cond_broadcast(pthread_cond_t *cond);//唤醒所有的条件变量导致进入挂起等待的执行分支int pthread_cond_signal(pthread_cond_t *cond);//唤醒单个

基于单链表的生产者–消费者模型

#include <stdio.h>#include <stdlib.h>#include <pthread.h>#include <unistd.h>typedef struct list{struct list* next;int data;}Node,*PNode;Node* head;//定义一个全局的单链表,是两个的两个线程公共资源pthread_cond_t procon=PTHREAD_COND_INITIALIZER;//定义条件变量并初始化pthread_mutex_t lock=PTHREAD_MUTEX_INITIALIZER;//定义互斥锁并初始化void initlist(PNode* head){*head=NULL;}void* consumer(void* val){int i=1;while(i){Node* tmp=NULL;pthread_mutex_lock(&lock);//上锁,保证访问安全while(NULL == head)//用while原因是pthread_cond_wait可能调用失败{printf("list empty,consunistd.humer just waitting...\n");pthread_cond_wait(&procon,&lock);//如果此时链表为空,即无法访问,所以consumer挂起等待product函数插入节点后访问,//当product函数插入节点时,会唤醒consumer}tmp=head;head=tmp->next;tmp->next=NULL;printf("----------------pthread tid: %u\n",pthread_self());//当前线程的tidprintf("consumer success, total: %d\n",i++);pthread_mutex_unlock(&lock);//解锁free(tmp);tmp=NULL;}return NULL;}void* product(void* val){initlist(&head);int i=1;while(i){pthread_mutex_lock(&lock);Node* node=(Node*)malloc(sizeof(Node));node->data=i;node->next=head;head=node;printf("product success,total: %d\n",i++);pthread_mutex_unlock(&lock);sleep(1);pthread_cond_signal(&procon);//取唤醒consumer}return NULL;}int main(){void *ret;pthread_t pro,con,con1;pthread_create(&pro,NULL,product,NULL);pthread_create(&con,NULL,consumer,NULL);pthread_create(&con1,NULL,consumer,NULL);pthread_join(pro,&ret);pthread_join(con,&ret);return 0;}

程序结果截图:

注意事项:

提醒一点,create线程后必须join线程,不然线程不会运行,别问我为什么。我去哭会/(ㄒoㄒ)/~

基于环形队列的多线程的生产者–消费者模型

相关函数:

初始化函数,*sem表示信号量,pshared为0时表示可供同一进程中的多个线程同步,value为信号量的数量,即邻界资源的数量

#include <semaphore.h>int sem_init(sem_t *sem, int pshared, unsigned int value);

sem_wait函数相当于二元信号量的P操作,给当前信号量表示的邻界资源的数量减一

#include <semaphore.h>int sem_wait(sem_t *sem);int sem_trywait(sem_t *sem);int sem_timedwait(sem_t *sem, const struct timespec *abs_timeout);

sem_post函数相当于二元信号量的V操作,给当前信号量表示的邻界资源的数量加一

#include <semaphore.h>int sem_post(sem_t *sem);

sem_destroy函数,销毁*sem表示的信号量。

#include <semaphore.h>int sem_destroy(sem_t *sem);

#include <stdio.h>#include <pthread.h>#include <unistd.h>#include <stdlib.h>#include <signal.h>#include <semaphore.h>#define SIZE 10sem_t product_sem;sem_t consumer_sem;static int pro=SIZE;//一开始生产者可以生产static int con=0;//一开始没有产品可供消费static int p=0;//p,c,d变量均是为了打印定义的static int c=0;static int d=0;static int table[SIZE];//用数组表示一个环形队列void* product_fun(void* val){int i=1;int index=0;while(i){sem_wait(&product_sem);//申请一个可生产的空间,即申请成功表示邻界资源可写,否则表示邻界资源已满不可写入。table[p]=i;pro--;con++;printf("-----------------------------------------\n");printf("which product:%u\n",pthread_self());//哪个生产者生产的printf("product# product success,total:%d\n",i++);//一共生产了多少个printf("profuct# you have %d blank source!\n",pro);//生产者可用的生产空间printf("product# you have %d data source!\n",con);//消费可供消费的产品的数量for(index=0;index<SIZE;index++)//输出当前生产的所有产品{if(table[index]!=0)printf("array[%d]:%d ",index,table[index]);}printf("\n");printf("-----------------------------------------\n");printf("\n");sem_post(&consumer_sem);//给消费者可供消费的产品数量加一,即将邻界资源中的可使用的数据加一p=(p+1)%SIZE;//下标调整sleep(1);}return NULL;}void* consumer_fun(void* val){int i=1;int temp=0;while(i){sem_wait(&consumer_sem);// 申请一个消费者可消费的产品,即此时邻界资源中有可使用的数据。if(c!=0)d=c-1;temp=table[c];table[c]=0;pro++;con--;printf("##########################################\n");printf("which consumer:%u\n",pthread_self());//哪个消费者消费printf("consume: %d\n",temp);//消费的数据为tempprintf("consumer# you have %d data source!\n",con);//可供消费的数据量printf("consumer# you have %d blank source!\n",pro);//可供生产的空间printf("##########################################\n");printf("\n");sem_post(&product_sem);//给邻界资源可写的资源加一,表示消费一个数据,拿走数据后,空出一个位置可写c=(c+1)%SIZE;}}void destroy(){sem_destroy(&product_sem);sem_destroy(&consumer_sem);exit(0);}void initsem(){signal(2,destroy);//信号捕捉函数,自定义2号信号量为销毁两个信号量int i=0;for(i=0;i<SIZE;++i)table[i]=0;sem_init(&product_sem,0,SIZE);sem_init(&consumer_sem,0,0);}int main(){initsem();pthread_t product1,product2,consumer1,consumer2;pthread_create(&product1,NULL,product_fun,NULL);pthread_create(&product2,NULL,product_fun,NULL);pthread_create(&consumer1,NULL,consumer_fun,NULL);pthread_create(&consumer2,NULL,consumer_fun,NULL);pthread_join(product1,NULL);pthread_join(product2,NULL);pthread_join(consumer1,NULL);pthread_join(consumer2,NULL);return 0;}

程序结果截图:

如有错误,请指出!

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。