方法一:用两个通道 + A协程sleep
一个通道用来传数据,一个用来传停止信号。
- package main
-
- import (
- "fmt"
- "time"
- )
-
- // 老师视频里的生产者消费者
-
- func main() {
- //知识点: 老师这里用了两个线程,一个用个传数据,一个用来传关闭信号
- messages := make(chan int, 10)
- done := make(chan bool)
-
- defer close(messages)
-
- // consumer
- go func() {
- ticker := time.NewTicker(1 * time.Second)
- for range ticker.C {
- select {
- case <-done:
- fmt.Println("child process interrupt...") // 数据还没收完,就被停止了。
- return
- default:
- fmt.Printf("receive message:%d\n", <-messages)
- }
-
- }
- }()
-
- // producer
- for i := 0; i < 10; i++ {
- messages <- i
- }
-
- // 5秒后主线程关闭done通道
- time.Sleep(5 * time.Second)
- close(done)
- time.Sleep(1 * time.Second)
- fmt.Println("main process exit!")
-
- }
程序输出如下:
- receive message:0
- receive message:1
- receive message:2
- receive message:3
- child process interrupt...
- main process exit!
方法二:利用无缓冲channel与任务发送/执行分离方式
- package main
-
- import (
- "fmt"
- "math"
- "sync"
- "runtime"
- )
-
- var wg = sync.WaitGroup{}
-
- func busi(ch chan int) {
-
- for t := range ch {
- fmt.Println("go task = ", t, ", goroutine count = ", runtime.NumGoroutine())
- wg.Done()
- }
- }
-
- func sendTask(task int, ch chan int) {
- wg.Add(1)
- ch <- task
- }
-
- func main() {
-
- ch := make(chan int) //无buffer channel
-
- goCnt := 3 //启动goroutine的数量
- for i := 0; i < goCnt; i++ {
- //启动go
- go busi(ch)
- }
-
- taskCnt := math.MaxInt64 //模拟用户需求业务的数量
- for t := 0; t < taskCnt; t++ {
- //发送任务
- sendTask(t, ch)
- }
-
- wg.Wait()
- }
结果
- //...
- go task = 130069 , goroutine count = 4
- go task = 130070 , goroutine count = 4
- go task = 130071 , goroutine count = 4
- go task = 130072 , goroutine count = 4
- go task = 130073 , goroutine count = 4
- go task = 130074 , goroutine count = 4
- go task = 130075 , goroutine count = 4
- go task = 130076 , goroutine count = 4
- go task = 130077 , goroutine count = 4
- go task = 130078 , goroutine count = 4
- go task = 130079 , goroutine count = 4
- go task = 130080 , goroutine count = 4
- go task = 130081 , goroutine count = 4
- go task = 130082 , goroutine count = 4
- go task = 130083 , goroutine count = 4
- go task = 130084 , goroutine count = 4
- go task = 130085 , goroutine count = 4
- go task = 130086 , goroutine count = 4
- go task = 130087 , goroutine count = 4
- go task = 130088 , goroutine count = 4
- go task = 130089 , goroutine count = 4
- go task = 130090 , goroutine count = 4
- go task = 130091 , goroutine count = 4
- go task = 130092 , goroutine count = 4
- go task = 130093 , goroutine count = 4
- ...
执行流程大致如下,这里实际上是将任务的发送和执行做了业务上的分离。使得消息出去,输入SendTask的频率可设置、执行Goroutine的数量也可设置。也就是既控制输入(生产),又控制输出(消费)。使得可控更加灵活。这也是很多Go框架的Worker工作池的最初设计思想理念。此方法转自:5、Go是否可以无限go? 如何限定数量?
方法三:使用context.WithTimeout
下面的例子比较复杂,基于 Channel 编写一个简单的单协程生产者消费者模型。
要求如下:
1)队列:队列长度 10,队列元素类型为 int
2)生产者:每 1 秒往队列中放入一个类型为 int 的元素,队列满时生产者可以阻塞
3)消费者:每2秒从队列中获取一个元素并打印,队列为空时消费者阻塞
4)主协程30秒后要求所有子协程退出。
5)要求优雅退出,即消费者协程退出前,要先消费完所有的int
6)通过入参支持两种运行模式:
wb(温饱模式)生产速度快过消费速度、
je(饥饿模式)生产速度慢于消费速度
context.WithTimeout见第87行。
- package main
-
- import (
- "context"
- "flag"
- "fmt"
- "sync"
- "time"
- )
-
- // 课后练习 1.2
- // 基于 Channel 编写一个简单的单协程生产者消费者模型。
- // 要求如下:
- // 1)队列:队列长度 10,队列元素类型为 int
- // 2)生产者:每 1 秒往队列中放入一个类型为 int 的元素,队列满时生产者可以阻塞
- // 3)消费者:每2秒从队列中获取一个元素并打印,队列为空时消费者阻塞
- // 4)主协程30秒后要求所有子协程退出。
- // 5)要求优雅退出,即消费者协程退出前,要先消费完所有的int。
-
- // 知识点:
- // 1) 切片的零值也是可用的。
- // 2) context.WithTimeout
- var (
- wg sync.WaitGroup
- p Producer
- c Consumer
- )
-
- type Producer struct {
- Time int
- Interval int
- }
-
- type Consumer struct {
- Producer
- }
-
- func (p Producer) produce(queue chan<- int, ctx context.Context) {
- go func() {
- LOOP:
- for {
- p.Time = p.Time + 1
- queue <- p.Time
- fmt.Printf("生产者进行第%d次生产,值:%d\n", p.Time, p.Time)
- time.Sleep(time.Duration(p.Interval) * time.Second)
-
- select {
- case <-ctx.Done():
- close(queue)
- break LOOP
- }
- }
- wg.Done()
- }()
- }
-
- func (c Consumer) consume(queue <-chan int, ctx context.Context) {
- go func() {
- LOOP:
- for {
- c.Time++
- val := <-queue
- fmt.Printf("-->消费者进行第%d次消费,值:%d\n", c.Time, val)
- time.Sleep(time.Duration(c.Interval) * time.Second)
-
- select {
- case <-ctx.Done():
- //remains := new([]int)
- //remains := []int{}
- var remains []int // 知识点:切片的零值也是可用的。
- for val = range queue {
- remains = append(remains, val)
- fmt.Printf("-->消费者: 最后一次消费, 值为:%v\n", remains)
- break LOOP
- }
- }
- }
- wg.Done()
- }()
- }
-
- func main() {
- wg.Add(2)
-
- // 知识点:context.Timeout
- timeout := 30
- ctx, _ := context.WithTimeout(context.Background(), time.Duration(timeout)*time.Second)
-
- queue := make(chan int, 10)
-
- p.produce(queue, ctx)
- fmt.Println("main waiting...")
- wg.Wait()
- fmt.Println("done")
- }
-
- /*
- 启动命令:
- $ go run main/main.go -m wb
- $ go run main/main.go -m je
- */
- func init() {
- // 解析程序入参,运行模式
- mode := flag.String("m", "wb", "请输入运行模式:\nwb(温饱模式)生产速度快过消费速度、\nje(饥饿模式)生产速度慢于消费速度)")
- flag.Parse()
-
- p = Producer{}
- c = Consumer{}
-
- if *mode == "wb" {
- fmt.Println("运行模式:wb(温饱模式)生产速度快过消费速度")
- p.Interval = 1 // 每隔1秒生产一次
- c.Interval = 5 // 每隔5秒消费一次
-
- // p = Producer{Interval: 1}
- // c = Consumer{Interval: 5} // 这一行会报错,为什么?
-
- } else {
- fmt.Println("运行模式:je(饥饿模式)生产速度慢于消费速度")
- p.Interval = 5 // 每隔5秒生产一次
- c.Interval = 1 // 每隔1秒消费一次
- }
-
- }
wb(温饱模式)生产速度快过消费速度,输出如下:
- 运行模式:wb(温饱模式)生产速度快过消费速度
- 生产者: 第1次生产, 值为:1
- -->消费者: 第1次消费, 值为:1
- 生产者: 第2次生产, 值为:2
- 生产者: 第3次生产, 值为:3
- 生产者: 第4次生产, 值为:4
- 生产者: 第5次生产, 值为:5
- -->消费者: 第2次消费, 值为:2
- 生产者: 第6次生产, 值为:6
- 生产者: 第7次生产, 值为:7
- 生产者: 第8次生产, 值为:8
- 生产者: 第9次生产, 值为:9
- 生产者: 第10次生产, 值为:10
- -->消费者: 第3次消费, 值为:3
- 生产者: 第11次生产, 值为:11
- 生产者: 第12次生产, 值为:12
- 生产者: 第13次生产, 值为:13
- -->消费者: 第4次消费, 值为:4
- 生产者: 第14次生产, 值为:14
- -->消费者: 第5次消费, 值为:5
- 生产者: 第15次生产, 值为:15
- 生产者: 第16次生产, 值为:16
- -->消费者: 第6次消费, 值为:6
- main waiting
- 生产者: 第17次生产, 值为:17
- -->消费者: 最后一次消费, 值为:[7 8 9 10 11 12 13 14 15 16 17]
- -- done --
je(饥饿模式)生产速度慢于消费速度,输出如下:
- 运行模式:je(饥饿模式)生产速度慢于消费速度
- -->消费者: 第1次消费, 值为:1
- 生产者: 第1次生产, 值为:1
- 生产者: 第2次生产, 值为:2
- -->消费者: 第2次消费, 值为:2
- 生产者: 第3次生产, 值为:3
- -->消费者: 第3次消费, 值为:3
- 生产者: 第4次生产, 值为:4
- -->消费者: 第4次消费, 值为:4
- 生产者: 第5次生产, 值为:5
- -->消费者: 第5次消费, 值为:5
- 生产者: 第6次生产, 值为:6
- -->消费者: 第6次消费, 值为:6
- main waiting
- -->消费者: 第7次消费, 值为:0
- -->消费者: 最后一次消费, 值为:[]
- -- done--
————————————————
版权声明:本文为CSDN博主「湾区的候鸟」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/tjg138/article/details/124114511
方法一和方法三均转自:Go语言实现超时的3种方法