当前位置:   article > 正文

生产者-消费者模式golang实现_golang中如何实现producer-consumer模式

golang中如何实现producer-consumer模式

生产者-消费者模式


生产者消费者模型具体来讲,就是在一个系统中,存在生产者和消费者两种角色,他们通过内存缓冲区进行通信,生产者生产消费者需要的资料,消费者把资料做成产品。

基本工作原理

生产者线程增加资源数,如果资源数大于最大值则生产者线程挂起等待,当收到消费者线程的通知后继续生产。
消费者线程减少资源数,如果资源数为0,则消费者线程挂起,等待生产者通知后继续生产。


举例

package main

import (
	"fmt"
	"sync"
	"time"
)

const (
	PRODUCERS  = 5
	CONSUMERS = 2
	PRODUCTS  = 20
)

// productCount为资源的数量,需要互斥处理
var productCount = 0
var lock sync.Mutex
var wg sync.WaitGroup

// 控制生产者阻塞等待
var produceWait chan struct{}
// 控制消费者阻塞等待
var consumeWait chan struct{}

// 当资源达到上限或下限时,挂起单个协程,通过这两个变量休眠同类协程
var stopProduce = false
var stopConsume = false

// Produce 生产者
func Produce(index int, wg *sync.WaitGroup) {
	defer func() {
		if err := recover(); err != nil {
			fmt.Println("Producer ", index, " panic")
		}
		wg.Done()
	}()

	for {
		time.Sleep(time.Second)
		lock.Lock()
		// 生产者协程发现stopProduce为true,则睡眠5秒
		if stopProduce {
			fmt.Println("Producer ", index, " stop produce, sleep 5 seconds")
			lock.Unlock()
			time.Sleep(time.Second * 5)
			continue
		}
		fmt.Println("Producer ", index, " begin produce")
		if productCount >= PRODUCTS {
			fmt.Println("Products are full")
			// 如果产品满了就停止生产
			stopProduce = true
			lock.Unlock()
			//将当前生产者挂起,等待
			<-produceWait
			lock.Lock()
			stopProduce = false
			lock.Unlock()
			continue
		}
		productCount++
		fmt.Println("Products count is ", productCount)
		// 产品从空的状态到被生产了一个,激活消费者
		if stopConsume {
			var consumerActive struct{}
			consumeWait <- consumerActive
		}
		lock.Unlock()
	}
}

// Consume 消费者
func Consume(index int, wg *sync.WaitGroup) {
	defer func() {
		if err := recover(); err != nil {
			fmt.Println("Consumer ", index, " panic")
		}
		wg.Done()
	}()

	for {
		time.Sleep(time.Second)
		lock.Lock()
		if stopConsume {
			fmt.Println("Consumer ", index, " stop consume, sleep 5 seconds")
			lock.Unlock()
			time.Sleep(time.Second * 5)
			continue
		}
		fmt.Println("Consumer ", index, " begin consume")
		if productCount <= 0 {
			fmt.Println("Products are empty")
			stopConsume = true
			lock.Unlock()
			//产品空了,将当前消费者挂起
			<-consumeWait
			lock.Lock()
			stopConsume = false
			lock.Unlock()
			continue
		}
		productCount--
		fmt.Println("Products count is ", productCount)

		// 产品从满的状态被消费了一个,激活生产者
		if stopProduce {
			var producerActive struct{}
			produceWait <- producerActive
		}

		lock.Unlock()
	}
}

func main() {
	wg.Add(PRODUCERS + CONSUMERS)
	produceWait = make(chan struct{})
	consumeWait = make(chan struct{})
	for i := 0; i < CONSUMERS; i++ {
		go Consume(i, &wg)
	}
	for i := 0; i < PRODUCERS; i++ {
		go Produce(i, &wg)
	}

	wg.Wait()
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128

在程序中可以发现,在produce函数中,当我们发现系统资源到达临界值时,使用channel将当前的协程阻塞了,然后将bool变量stopProduce置为true,让其它协程进入休眠。为什么要这样做呢?
因为程序中的produceWait是非缓冲的,所以一次只能激活一个生产者,这么做在一定程度限制了生产者,所以我们引入bool变量通知其他协程睡眠,避免此问题,consume函数中也是同样的道理。


其它并发模式


参考资料:

https://www.cnblogs.com/secondtonone1/p/11843269.html

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/知新_RL/article/detail/726668
推荐阅读
相关标签
  

闽ICP备14008679号