当前位置:   article > 正文

五个同步问题的经典模型之一:生产者/消费者问题_多个 产者进程和多个消费者进程共享 个初始为空、固定 为n的缓存(缓

多个 产者进程和多个消费者进程共享 个初始为空、固定 为n的缓存(缓

也叫缓存绑定问题(bounded- buffer),是一个经典的、多进程同步问题。

单生产者和单消费者

有两个进程:一组生产者进程和一组消费者进程共享一个初始为空、固定大小为n的缓存(缓冲区)。生产者的工作是制造一段数据,只有缓冲区没满时,生产者才能把消息放入到缓冲区,否则必须等待,如此反复; 同时,只有缓冲区不空时,消费者才能从中取出消息,一次消费一段数据(即将其从缓存中移出),否则必须等待。由于缓冲区是临界资源,它只允许一个生产者放入消息,或者一个消费者从中取出消息。

问题的核心是:
1.要保证不让生产者在缓存还是满的时候仍然要向内写数据;
2.不让消费者试图从空的缓存中取出数据。

生产者和消费者对缓冲区互斥访问是互斥关系,同时生产者和消费者又是一个相互协作的关系,只有生产者生产之后,消费者才能消费,他们也是同步关系。

  • 解决思路:对于生产者,如果缓存是满的就去睡觉。消费者从缓存中取走数据后就叫醒生产者,让它再次将缓存填满。若消费者发现缓存是空的,就去睡觉了。下一轮中生产者将数据写入后就叫醒消费者。
    不完善的解决方案会造成“死锁”,即两个进程都在“睡觉”等着对方来“唤醒”。

只有生产者和消费者两个进程,正好是这两个进程存在着互斥关系和同步关系。那么需要解决的是互斥和同步PV操作的位置。使用“进程间通信”,“信号标”semaphore就可以解决唤醒的问题:

我们使用了两个信号标:full 和 empty 。信号量mutex作为互斥信号量,它用于控制互斥访问缓冲池,互斥信号量初值为 1;信号量 full 用于记录当前缓冲池中“满”缓冲区数,初值为0。信号量 empty 用于记录当前缓冲池中“空”缓冲区数,初值为n。新的数据添加到缓存中后,full 在增加,而 empty 则减少。如果生产者试图在 empty 为0时减少其值,生产者就会被“催眠”。下一轮中有数据被消费掉时,empty就会增加,生产者就会被“唤醒”。

伪代码:

  1. semaphore mutex=1; //临界区互斥信号量
  2. semaphore empty=n; //空闲缓冲区
  3. semaphore full=0; //缓冲区初始化为空
  4. producer ()//生产者进程
  5. {
  6. while(1)
  7. {
  8. produce an item in nextp; //生产数据
  9. P(empty); //获取空缓冲区单元
  10. P(mutex); //进入临界区.
  11. add nextp to buffer; //将数据放入缓冲区
  12. V(mutex); //离开临界区,释放互斥信号量
  13. V(full); //满缓冲区数加1
  14. }
  15. }
  16. consumer ()//消费者进程
  17. {
  18. while(1)
  19. {
  20. P(full); //获取满缓冲区单元
  21. P(mutex); // 进入临界区
  22. remove an item from buffer; //从缓冲区中取出数据
  23. V (mutex); //离开临界区,释放互斥信号量
  24. V (empty) ; //空缓冲区数加1
  25. consume the item; //消费数据
  26. }
  27. }

该类问题要注意对缓冲区大小为n的处理,当缓冲区中有空时便可对empty变量执行P 操作,一旦取走一个产品便要执行V操作以释放空闲区。对empty和full变量的P操作必须放在对mutex的P操作之前。

1、若生产者进程已经将缓冲区放满,消费者进程并没有取产品,即 empty = 0,当下次仍然是生产者进程运行时,它先执行 P(mutex)封锁信号量,再执行 P(empty)时将被阻塞,希望消费者取出产品后将其唤醒。轮到消费者进程运行时,它先执行 P(mutex),然而由于生产者进程已经封锁 mutex 信号量,消费者进程也会被阻塞,这样一来生产者进程与消费者进程都
将阻塞,都指望对方唤醒自己,陷入了无休止的等待。
2、若消费者进程已经将缓冲区取空,即 full = 0,下次如果还是消费者先运行,也会出现类似的死锁。
不过生产者释放信号量时,mutex、full 先释放哪一个无所谓,消费者先释放 mutex 还是 empty 都可以。

多生产者多消费者

在多个制造商和多个消费者出现的情况下就会造成拥护不堪的情况,会导致两个或多个进程同时向一个磁道写入或读出数据。要理解这种情况是如何出现的,我们可以借助于putItemIntoBuffer()函数。它包含两个动作:一个来判断是否有可用磁道,另一个则用来向其写入数据。如果进程可以由多个制造商并发执行,下面的情况则会出现:
1、 两个制造商为emptyCount减值;
2、 一个制造商判断缓存中有可用磁道;
3、 第二个制造商与第一个制造商一样判断缓存中有可用磁道;
4、 两个制造商同时向同一个磁道写入数据。

多个生产者向一个缓冲区中存入数据,多个生产者从缓冲区中取数据。这是有界缓冲区问题,队列改写,生产者们之间、消费者们之间、生产者消费者之间互相互斥。
共享缓冲区作为一个环绕缓冲区,存数据到尾时再从头开始。

  • 我们使用一个互斥量保护生产者向缓冲区中存入数据。由于有多个生产者,因此需要记住现在向缓冲区中存入的位置。
  • 使用一个互斥量保护缓冲区中消息的数目,这个生产的数据数目作为生产者和消费者沟通的桥梁。
  • 使用一个条件变量用于唤醒消费者。由于有多个消费者,同样消费者也需要记住每次取的位置。

在选项中选择生产条目的数目,生产者的线程数目,消费者的线程数目。生产者将条目数目循环放入缓冲区中,消费者从缓冲区中循环取出并在屏幕上打印出来。
为了克服这个问题,我们需要一个方法,以确保一次只有一个制造商在执行调用函数。换个说法来讲,我们需要一个有“互斥信号标”(mutal exclusion)的“关键扇区”(critical section)。为了实现这一点,我们使用一个叫mutex二位信号标。因为一个二位信号标的值只能是1或0,只有一个进程能执行down(mutex)或up(mutex)。

代码:

  1. #include "unp.h"
  2. static const int NBUFF = 10000;
  3. static const int MAXNTHREADS = 100;
  4. static int nitems; //总共生产的条目数
  5. static int buff[NBUFF]; //生产者向其中放数据,消费者从中取数据
  6. static struct put//生产者使用的结构,向其中互斥的放数据
  7. {
  8. pthread_mutex_t mutex;
  9. int nput; //net position to put
  10. int nval; //next value to store
  11. } put =
  12. {
  13. PTHREAD_MUTEX_INITIALIZER
  14. };
  15. //记录缓冲区的状态,准备好的数目,消费者唯一关注的结构,当然生产者也会使用
  16. static struct nready
  17. {
  18. pthread_mutex_t mutex;
  19. pthread_cond_t cond;
  20. int nget;
  21. int nready; //number ready for consumer
  22. } nready =
  23. {
  24. PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER
  25. };//
  26. void *produce(void*);
  27. void *consume(void*);
  28. int main(int argc, char **argv)
  29. {
  30. if (argc != 4)
  31. {
  32. err_quit("Usage: a.out <#items> <#produce_nthreads>
  33. <#consume_nthreads>");
  34. }
  35. nitems = atoi(argv[1]);
  36. int produce_nthreads = min(atoi(argv[2]), MAXNTHREADS);
  37. int consume_nthreads = min(atoi(argv[3]), MAXNTHREADS);
  38. //Solaris 2.6需要设置线程并发数
  39. //Set_concurrency(nthreads + 1);
  40. pthread_t tid_produce[MAXNTHREADS];
  41. for (int i = 0; i < produce_nthreads; ++i)
  42. {
  43. Pthread_create(&tid_produce[i], NULL, produce, NULL);
  44. }
  45. pthread_t tid_consume[MAXNTHREADS];
  46. for (int i = 0; i < consume_nthreads; ++i)
  47. {
  48. Pthread_create(&tid_consume[i], NULL, consume, NULL);
  49. }
  50. //等待线程终止
  51. for (int i = 0; i < produce_nthreads; ++i)
  52. {
  53. Pthread_join(tid_produce[i], NULL);
  54. }
  55. for (int i = 0; i < consume_nthreads; ++i)
  56. {
  57. Pthread_join(tid_consume[i], NULL);
  58. }
  59. exit(0);
  60. }
  61. void *produce(void *arg)
  62. {
  63. printf("producd\n");
  64. //多个生产者
  65. for ( ; ; )
  66. {
  67. Pthread_mutex_lock(&put.mutex);
  68. //已存了需要多的数
  69. if (put.nval >= nitems)
  70. {
  71. Pthread_mutex_unlock(&put.mutex);
  72. return NULL;
  73. }
  74. buff[put.nput] = put.nval;
  75. if (++put.nput >= NBUFF)
  76. {
  77. put.nput = 0;
  78. }
  79. ++put.nval;
  80. Pthread_mutex_unlock(&put.mutex);
  81. //当生产了数据后通知条件变量,应该使临界区尽量短,宁愿使用多个互斥量
  82. Pthread_mutex_lock(&nready.mutex);
  83. if (nready.nready == 0)
  84. {
  85. Pthread_cond_signal(&nready.cond);
  86. }
  87. ++nready.nready;
  88. Pthread_mutex_unlock(&nready.mutex);
  89. } //end for(;;)
  90. return NULL;
  91. }
  92. void *consume(void *argv)
  93. {
  94. printf("consume\n");
  95. //多个消费者
  96. //只生产nitems个选项
  97. for ( ; ; )
  98. {
  99. Pthread_mutex_lock(&nready.mutex);
  100. //while避免虚假唤醒
  101. while (nready.nready == 0)
  102. {
  103. Pthread_cond_wait(&nready.cond, &nready.mutex);
  104. }
  105. //int ival = buff[nready.nget];
  106. //if (++nready.nget == NBUFF) {
  107. // nready.nget = 0;
  108. //}
  109. if (++nready.nget >= nitems)
  110. {
  111. //nget比较的取值为1..nitems,当为nitems时少操作了一次,总共操作nitems次
  112. if (nready.nget == nitems)
  113. {
  114. printf("buff[%d] = %d\n", nready.nget - 1,
  115. buff[(nready.nget - 1) % NBUFF]);
  116. }
  117. Pthread_cond_signal(&nready.cond);
  118. Pthread_mutex_unlock(&nready.mutex);
  119. return NULL;
  120. }
  121. --nready.nready;
  122. Pthread_mutex_unlock(&nready.mutex);
  123. //仅仅读数据不许要互斥
  124. //if (buff[nready.nget - 1] != nready.nget - 1)
  125. {
  126. printf("buff[%d] = %d\n", nready.nget - 1,
  127. buff[(nready.nget - 1) % NBUFF]);
  128. //printf("buff[%d] = %d\n", nready.nget,
  129. buff[nready.nget]);
  130. //}
  131. } //end for(i:0..nitems)
  132. return NULL;
  133. }
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/繁依Fanyi0/article/detail/267922
推荐阅读
相关标签
  

闽ICP备14008679号