当前位置:   article > 正文

golang 实现生产者消费者模式(转)

golang生产者消费者模式

方法一:用两个通道 + A协程sleep

一个通道用来传数据,一个用来传停止信号。

  1. package main
  2. import (
  3. "fmt"
  4. "time"
  5. )
  6. // 老师视频里的生产者消费者
  7. func main() {
  8. //知识点: 老师这里用了两个线程,一个用个传数据,一个用来传关闭信号
  9. messages := make(chan int, 10)
  10. done := make(chan bool)
  11. defer close(messages)
  12. // consumer
  13. go func() {
  14. ticker := time.NewTicker(1 * time.Second)
  15. for range ticker.C {
  16. select {
  17. case <-done:
  18. fmt.Println("child process interrupt...") // 数据还没收完,就被停止了。
  19. return
  20. default:
  21. fmt.Printf("receive message:%d\n", <-messages)
  22. }
  23. }
  24. }()
  25. // producer
  26. for i := 0; i < 10; i++ {
  27. messages <- i
  28. }
  29. // 5秒后主线程关闭done通道
  30. time.Sleep(5 * time.Second)
  31. close(done)
  32. time.Sleep(1 * time.Second)
  33. fmt.Println("main process exit!")
  34. }

程序输出如下:

  1. receive message:0
  2. receive message:1
  3. receive message:2
  4. receive message:3
  5. child process interrupt...
  6. main process exit!

方法二:利用无缓冲channel与任务发送/执行分离方式

  1. package main
  2. import (
  3. "fmt"
  4. "math"
  5. "sync"
  6. "runtime"
  7. )
  8. var wg = sync.WaitGroup{}
  9. func busi(ch chan int) {
  10. for t := range ch {
  11. fmt.Println("go task = ", t, ", goroutine count = ", runtime.NumGoroutine())
  12. wg.Done()
  13. }
  14. }
  15. func sendTask(task int, ch chan int) {
  16. wg.Add(1)
  17. ch <- task
  18. }
  19. func main() {
  20. ch := make(chan int) //无buffer channel
  21. goCnt := 3 //启动goroutine的数量
  22. for i := 0; i < goCnt; i++ {
  23. //启动go
  24. go busi(ch)
  25. }
  26. taskCnt := math.MaxInt64 //模拟用户需求业务的数量
  27. for t := 0; t < taskCnt; t++ {
  28. //发送任务
  29. sendTask(t, ch)
  30. }
  31. wg.Wait()
  32. }

结果

  1. //...
  2. go task = 130069 , goroutine count = 4
  3. go task = 130070 , goroutine count = 4
  4. go task = 130071 , goroutine count = 4
  5. go task = 130072 , goroutine count = 4
  6. go task = 130073 , goroutine count = 4
  7. go task = 130074 , goroutine count = 4
  8. go task = 130075 , goroutine count = 4
  9. go task = 130076 , goroutine count = 4
  10. go task = 130077 , goroutine count = 4
  11. go task = 130078 , goroutine count = 4
  12. go task = 130079 , goroutine count = 4
  13. go task = 130080 , goroutine count = 4
  14. go task = 130081 , goroutine count = 4
  15. go task = 130082 , goroutine count = 4
  16. go task = 130083 , goroutine count = 4
  17. go task = 130084 , goroutine count = 4
  18. go task = 130085 , goroutine count = 4
  19. go task = 130086 , goroutine count = 4
  20. go task = 130087 , goroutine count = 4
  21. go task = 130088 , goroutine count = 4
  22. go task = 130089 , goroutine count = 4
  23. go task = 130090 , goroutine count = 4
  24. go task = 130091 , goroutine count = 4
  25. go task = 130092 , goroutine count = 4
  26. go task = 130093 , goroutine count = 4
  27. ...

执行流程大致如下,这里实际上是将任务的发送和执行做了业务上的分离。使得消息出去,输入SendTask的频率可设置、执行Goroutine的数量也可设置。也就是既控制输入(生产),又控制输出(消费)。使得可控更加灵活。这也是很多Go框架的Worker工作池的最初设计思想理念。此方法转自:5、Go是否可以无限go? 如何限定数量?

方法三:使用context.WithTimeout

下面的例子比较复杂,基于 Channel 编写一个简单的单协程生产者消费者模型

要求如下:

1)队列:队列长度 10,队列元素类型为 int
2)生产者:每 1 秒往队列中放入一个类型为 int 的元素,队列满时生产者可以阻塞
3)消费者:每2秒从队列中获取一个元素并打印,队列为空时消费者阻塞
4)主协程30秒后要求所有子协程退出。
5)要求优雅退出,即消费者协程退出前,要先消费完所有的int
6)通过入参支持两种运行模式:
wb(温饱模式)生产速度快过消费速度、
je(饥饿模式)生产速度慢于消费速度
context.WithTimeout见第87行。

  1. package main
  2. import (
  3. "context"
  4. "flag"
  5. "fmt"
  6. "sync"
  7. "time"
  8. )
  9. // 课后练习 1.2
  10. // 基于 Channel 编写一个简单的单协程生产者消费者模型。
  11. // 要求如下:
  12. // 1)队列:队列长度 10,队列元素类型为 int
  13. // 2)生产者:每 1 秒往队列中放入一个类型为 int 的元素,队列满时生产者可以阻塞
  14. // 3)消费者:每2秒从队列中获取一个元素并打印,队列为空时消费者阻塞
  15. // 4)主协程30秒后要求所有子协程退出。
  16. // 5)要求优雅退出,即消费者协程退出前,要先消费完所有的int。
  17. // 知识点:
  18. // 1) 切片的零值也是可用的。
  19. // 2) context.WithTimeout
  20. var (
  21. wg sync.WaitGroup
  22. p Producer
  23. c Consumer
  24. )
  25. type Producer struct {
  26. Time int
  27. Interval int
  28. }
  29. type Consumer struct {
  30. Producer
  31. }
  32. func (p Producer) produce(queue chan<- int, ctx context.Context) {
  33. go func() {
  34. LOOP:
  35. for {
  36. p.Time = p.Time + 1
  37. queue <- p.Time
  38. fmt.Printf("生产者进行第%d次生产,值:%d\n", p.Time, p.Time)
  39. time.Sleep(time.Duration(p.Interval) * time.Second)
  40. select {
  41. case <-ctx.Done():
  42. close(queue)
  43. break LOOP
  44. }
  45. }
  46. wg.Done()
  47. }()
  48. }
  49. func (c Consumer) consume(queue <-chan int, ctx context.Context) {
  50. go func() {
  51. LOOP:
  52. for {
  53. c.Time++
  54. val := <-queue
  55. fmt.Printf("-->消费者进行第%d次消费,值:%d\n", c.Time, val)
  56. time.Sleep(time.Duration(c.Interval) * time.Second)
  57. select {
  58. case <-ctx.Done():
  59. //remains := new([]int)
  60. //remains := []int{}
  61. var remains []int // 知识点:切片的零值也是可用的。
  62. for val = range queue {
  63. remains = append(remains, val)
  64. fmt.Printf("-->消费者: 最后一次消费, 值为:%v\n", remains)
  65. break LOOP
  66. }
  67. }
  68. }
  69. wg.Done()
  70. }()
  71. }
  72. func main() {
  73. wg.Add(2)
  74. // 知识点:context.Timeout
  75. timeout := 30
  76. ctx, _ := context.WithTimeout(context.Background(), time.Duration(timeout)*time.Second)
  77. queue := make(chan int, 10)
  78. p.produce(queue, ctx)
  79. fmt.Println("main waiting...")
  80. wg.Wait()
  81. fmt.Println("done")
  82. }
  83. /*
  84. 启动命令:
  85. $ go run main/main.go -m wb
  86. $ go run main/main.go -m je
  87. */
  88. func init() {
  89. // 解析程序入参,运行模式
  90. mode := flag.String("m", "wb", "请输入运行模式:\nwb(温饱模式)生产速度快过消费速度、\nje(饥饿模式)生产速度慢于消费速度)")
  91. flag.Parse()
  92. p = Producer{}
  93. c = Consumer{}
  94. if *mode == "wb" {
  95. fmt.Println("运行模式:wb(温饱模式)生产速度快过消费速度")
  96. p.Interval = 1 // 每隔1秒生产一次
  97. c.Interval = 5 // 每隔5秒消费一次
  98. // p = Producer{Interval: 1}
  99. // c = Consumer{Interval: 5} // 这一行会报错,为什么?
  100. } else {
  101. fmt.Println("运行模式:je(饥饿模式)生产速度慢于消费速度")
  102. p.Interval = 5 // 每隔5秒生产一次
  103. c.Interval = 1 // 每隔1秒消费一次
  104. }
  105. }

wb(温饱模式)生产速度快过消费速度,输出如下:

  1. 运行模式:wb(温饱模式)生产速度快过消费速度
  2. 生产者: 第1次生产, 值为:1
  3. -->消费者: 第1次消费, 值为:1
  4. 生产者: 第2次生产, 值为:2
  5. 生产者: 第3次生产, 值为:3
  6. 生产者: 第4次生产, 值为:4
  7. 生产者: 第5次生产, 值为:5
  8. -->消费者: 第2次消费, 值为:2
  9. 生产者: 第6次生产, 值为:6
  10. 生产者: 第7次生产, 值为:7
  11. 生产者: 第8次生产, 值为:8
  12. 生产者: 第9次生产, 值为:9
  13. 生产者: 第10次生产, 值为:10
  14. -->消费者: 第3次消费, 值为:3
  15. 生产者: 第11次生产, 值为:11
  16. 生产者: 第12次生产, 值为:12
  17. 生产者: 第13次生产, 值为:13
  18. -->消费者: 第4次消费, 值为:4
  19. 生产者: 第14次生产, 值为:14
  20. -->消费者: 第5次消费, 值为:5
  21. 生产者: 第15次生产, 值为:15
  22. 生产者: 第16次生产, 值为:16
  23. -->消费者: 第6次消费, 值为:6
  24. main waiting
  25. 生产者: 第17次生产, 值为:17
  26. -->消费者: 最后一次消费, 值为:[7 8 9 10 11 12 13 14 15 16 17]
  27. -- done --

je(饥饿模式)生产速度慢于消费速度,输出如下:

  1. 运行模式:je(饥饿模式)生产速度慢于消费速度
  2. -->消费者: 第1次消费, 值为:1
  3. 生产者: 第1次生产, 值为:1
  4. 生产者: 第2次生产, 值为:2
  5. -->消费者: 第2次消费, 值为:2
  6. 生产者: 第3次生产, 值为:3
  7. -->消费者: 第3次消费, 值为:3
  8. 生产者: 第4次生产, 值为:4
  9. -->消费者: 第4次消费, 值为:4
  10. 生产者: 第5次生产, 值为:5
  11. -->消费者: 第5次消费, 值为:5
  12. 生产者: 第6次生产, 值为:6
  13. -->消费者: 第6次消费, 值为:6
  14. main waiting
  15. -->消费者: 第7次消费, 值为:0
  16. -->消费者: 最后一次消费, 值为:[]
  17. -- done--

————————————————
版权声明:本文为CSDN博主「湾区的候鸟」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/tjg138/article/details/124114511

方法一和方法三均转自:Go语言实现超时的3种方法

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/菜鸟追梦旅行/article/detail/726664
推荐阅读
相关标签
  

闽ICP备14008679号