当前位置:   article > 正文

golang并发安全-select_golang sellock

golang sellock

前面说了golang的channel, 今天我们看看golang select 是怎么实现的。

数据结构

  1. type scase struct {
  2. c *hchan // chan
  3. elem unsafe.Pointer // 数据
  4. }

select 非默认的case 中都是处理channel 的 接受和发送,所有scase 结构体中c是用来存储select 的case中使用的channel

处理流程

select case 场景

编译器在中间代码生成期间会根据 select 中 case 的不同对控制语句进行优化,这一过程都发生在cmd/compile/internal/walk/select.go 中,下面会根据不同的场景进行分析代码。

没有case

代码示例

  1. func main() {
  2. select {}
  3. }

如果是空的select语句,程序会被阻塞,golang 带有死锁监测机制:如果当前写成无法被唤醒,则会panic

源码解读

在runtime/select.go中可以看到:如果cases为空直接调用gopark函数以waitReasonSelectNoCases的原因挂起当前的协程,并且无法被唤醒,golang监测到直接panic。

同样我们在walk/select.go的walkSelectCases函数中可以看到,如果case为空直接调用runtime.block函数

只有一个case

代码示例

  1. func main() {
  2. ch := make(chan int)
  3. go func() {
  4. ch <- 1
  5. }()
  6. select {
  7. case data := <-ch:
  8. fmt.Println("ch data:", data)
  9. }
  10. }

如果有输入直接打印ch data : 1 , 没有的话会被检测出all goroutines are asleep - deadlock!(和没有case的一样)

源码解读

如果一个非default case ,将读写转换成 ch <- 或 <- ch, 正常的channel读写

  1. func walkSelectCases(cases []*ir.CommClause) []ir.Node {
  2. // optimization: one-case select: single op.
  3. if ncas == 1 {
  4. cas := cases[0] //获取case
  5. ir.SetPos(cas)
  6. l := cas.Init()
  7. if cas.Comm != nil { // 不是默认
  8. n := cas.Comm // 获取case的条件语句
  9. l = append(l, ir.TakeInit(n)...)
  10. switch n.Op() {
  11. default:
  12. base.Fatalf("select %v", n.Op())
  13. case ir.OSEND: // 如果是 send, 无须处理
  14. // already ok
  15. case ir.OSELRECV2:
  16. r := n.(*ir.AssignListStmt)
  17. // 如果不是 data, ok := <- ch 类型,处理成<- ch
  18. if ir.IsBlank(r.Lhs[0]) && ir.IsBlank(r.Lhs[1]) {
  19. n = r.Rhs[0]
  20. break
  21. }
  22. // 是的话, op设置成data, ok := <- ch形式
  23. r.SetOp(ir.OAS2RECV)
  24. }
  25. l = append(l, n)
  26. }
  27. // 将case 条件后要执行的语句加入带执行的列表
  28. l = append(l, cas.Body...)
  29. // 加入 break类型,跳出select-case
  30. l = append(l, ir.NewBranchStmt(base.Pos, ir.OBREAK, nil))
  31. return l
  32. }
  33. // convert case value arguments to addresses.
  34. // this rewrite is used by both the general code and the next optimization.
  35. var dflt *ir.CommClause
  36. for _, cas := range cases {
  37. ir.SetPos(cas)
  38. n := cas.Comm
  39. if n == nil {
  40. dflt = cas
  41. continue
  42. }
  43. switch n.Op() {
  44. case ir.OSEND:
  45. n := n.(*ir.SendStmt)
  46. n.Value = typecheck.NodAddr(n.Value)
  47. n.Value = typecheck.Expr(n.Value)
  48. case ir.OSELRECV2:
  49. n := n.(*ir.AssignListStmt)
  50. if !ir.IsBlank(n.Lhs[0]) {
  51. n.Lhs[0] = typecheck.NodAddr(n.Lhs[0])
  52. n.Lhs[0] = typecheck.Expr(n.Lhs[0])
  53. }
  54. }
  55. }
  56. }

两个case(一个default)

代码示例

  1. func main() {
  2. ch := make(chan int)
  3. select {
  4. case data := <-ch:
  5. fmt.Println("ch data:", data)
  6. default:
  7. fmt.Println("default")
  8. }
  9. }

如果写入就走<- 读取,反之走默认

源码解读

如果是两个case,其中一个是default,非default的会根据send还是recv 调用channel的selectnbsend和 selectnbrecv。这两个方法是非阻塞的

  1. func walkSelectCases(cases []*ir.CommClause) []ir.Node){
  2. // optimization: two-case select but one is default: single non-blocking op.
  3. if ncas == 2 && dflt != nil {
  4. cas := cases[0]
  5. if cas == dflt { // 如果是default 放在 cases[1]
  6. cas = cases[1]
  7. }
  8. n := cas.Comm
  9. ir.SetPos(n)
  10. r := ir.NewIfStmt(base.Pos, nil, nil, nil)
  11. r.SetInit(cas.Init())
  12. var cond ir.Node
  13. switch n.Op() {
  14. default:
  15. base.Fatalf("select %v", n.Op())
  16. case ir.OSEND:
  17. // 调用selectnbsend(c, v)
  18. // if selectnbsend(c, v) { body } else { default body }
  19. n := n.(*ir.SendStmt)
  20. ch := n.Chan
  21. cond = mkcall1(chanfn("selectnbsend", 2, ch.Type()), types.Types[types.TBOOL], r.PtrInit(), ch, n.Value)
  22. case ir.OSELRECV2:
  23. n := n.(*ir.AssignListStmt)
  24. recv := n.Rhs[0].(*ir.UnaryExpr)
  25. ch := recv.X
  26. elem := n.Lhs[0]
  27. if ir.IsBlank(elem) { //空的话 elem= NodNil
  28. elem = typecheck.NodNil()
  29. }
  30. cond = typecheck.Temp(types.Types[types.TBOOL])
  31. // 调用 selectnbrecv
  32. fn := chanfn("selectnbrecv", 2, ch.Type())
  33. call := mkcall1(fn, fn.Type().Results(), r.PtrInit(), elem, ch)
  34. as := ir.NewAssignListStmt(r.Pos(), ir.OAS2, []ir.Node{cond, n.Lhs[1]}, []ir.Node{call})
  35. r.PtrInit().Append(typecheck.Stmt(as))
  36. }
  37. r.Cond = typecheck.Expr(cond)
  38. r.Body = cas.Body
  39. r.Else = append(dflt.Init(), dflt.Body...)
  40. return []ir.Node{r, ir.NewBranchStmt(base.Pos, ir.OBREAK, nil)}
  41. }
  42. }

每次尝试从channel读/写值,如果不成功则直接返回,不会阻塞。从selectnbsend和selectnbrecv看出,最后转换成if-else

  1. // compiler implements
  2. //
  3. // select {
  4. // case c <- v:
  5. // ... foo
  6. // default:
  7. // ... bar
  8. // }
  9. //
  10. // as
  11. //
  12. // if selectnbsend(c, v) {
  13. // ... foo
  14. // } else {
  15. // ... bar
  16. // }
  17. //
  18. func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) {
  19. // block:false
  20. // chan将 select准换if-else
  21. return chansend(c, elem, false, getcallerpc())
  22. }
  23. // compiler implements
  24. //
  25. // select {
  26. // case v, ok = <-c:
  27. // ... foo
  28. // default:
  29. // ... bar
  30. // }
  31. //
  32. // as
  33. //
  34. // if selected, ok = selectnbrecv(&v, c); selected {
  35. // ... foo
  36. // } else {
  37. // ... bar
  38. // }
  39. //
  40. func selectnbrecv(elem unsafe.Pointer, c *hchan) (selected, received bool) {
  41. // block:false
  42. // chan将 select准换if-else
  43. return chanrecv(c, elem, false)
  44. }

多个case

代码示例

  1. func main() {
  2. ch := make(chan int)
  3. go func() {
  4. tempArr := []int{1,2,3,4,5,6}
  5. for i := range tempArr {
  6. ch <- i
  7. }
  8. }()
  9. go func() {
  10. for {
  11. select {
  12. case i := <-ch:
  13. println("first: ", i)
  14. case i := <-ch:
  15. println("second", i)
  16. }
  17. }
  18. }()
  19. time.Sleep(3 * time.Second)
  20. }

可以看到多个case,会随机选取一个case执行

源码解读

  1. func walkSelectCases(cases []*ir.CommClause) []ir.Node {
  2. ncas := len(cases)
  3. sellineno := base.Pos
  4. if dflt != nil {
  5. ncas--
  6. }
  7. // 定义casorder为ncas大小的case语句的数组
  8. casorder := make([]*ir.CommClause, ncas)
  9. // 分别定义nsends为发送channel的case个数,nrecvs为接收channel的case个数
  10. nsends, nrecvs := 0, 0
  11. // 多case编译后待执行的语句列表
  12. var init []ir.Node
  13. // generate sel-struct
  14. base.Pos = sellineno
  15. // 定义selv为长度为ncas的scase类型的数组
  16. // scasetype()函数返回的就是scase结构体,包含c和elem两个字段
  17. selv := typecheck.Temp(types.NewArray(scasetype(), int64(ncas)))
  18. init = append(init, typecheck.Stmt(ir.NewAssignStmt(base.Pos, selv, nil)))
  19. // No initialization for order; runtime.selectgo is responsible for that.
  20. // 定义order为2倍的ncas长度的TUINT16类型的数组
  21. // 注意:selv和order作为runtime.selectgo()函数的入参,前者存放scase列表内存地址,后者用来做scase排序使用,排序是为了便于挑选出待执行的case
  22. order := typecheck.Temp(types.NewArray(types.Types[types.TUINT16], 2*int64(ncas)))
  23. var pc0, pcs ir.Node
  24. if base.Flag.Race {
  25. pcs = typecheck.Temp(types.NewArray(types.Types[types.TUINTPTR], int64(ncas)))
  26. pc0 = typecheck.Expr(typecheck.NodAddr(ir.NewIndexExpr(base.Pos, pcs, ir.NewInt(0))))
  27. } else {
  28. pc0 = typecheck.NodNil()
  29. }
  30. // register cases 遍历case生成scase对象放到selv中
  31. for _, cas := range cases {
  32. ir.SetPos(cas)
  33. init = append(init, ir.TakeInit(cas)...)
  34. n := cas.Comm
  35. if n == nil { // default:
  36. continue
  37. }
  38. var i int
  39. var c, elem ir.Node
  40. switch n.Op() { // 根据类型获取chan, elem的值
  41. default:
  42. base.Fatalf("select %v", n.Op())
  43. case ir.OSEND: // 发送chan类型,i从0开始递增
  44. n := n.(*ir.SendStmt)
  45. i = nsends
  46. nsends++
  47. c = n.Chan
  48. elem = n.Value
  49. case ir.OSELRECV2: // 接收chan,i从ncas开始递减
  50. n := n.(*ir.AssignListStmt)
  51. nrecvs++
  52. i = ncas - nrecvs
  53. recv := n.Rhs[0].(*ir.UnaryExpr)
  54. c = recv.X
  55. elem = n.Lhs[0]
  56. }
  57. casorder[i] = cas
  58. // 定义一个函数,写入c或elem到selv数组
  59. setField := func(f string, val ir.Node) {
  60. // 放到selv数组
  61. r := ir.NewAssignStmt(base.Pos, ir.NewSelectorExpr(base.Pos, ir.ODOT, ir.NewIndexExpr(base.Pos, selv, ir.NewInt(int64(i))), typecheck.Lookup(f)), val)
  62. // 添加到带执行列表
  63. init = append(init, typecheck.Stmt(r))
  64. }
  65. c = typecheck.ConvNop(c, types.Types[types.TUNSAFEPTR])
  66. setField("c", c)
  67. if !ir.IsBlank(elem) {
  68. elem = typecheck.ConvNop(elem, types.Types[types.TUNSAFEPTR])
  69. setField("elem", elem)
  70. }
  71. // TODO(mdempsky): There should be a cleaner way to
  72. // handle this.
  73. if base.Flag.Race {
  74. r := mkcallstmt("selectsetpc", typecheck.NodAddr(ir.NewIndexExpr(base.Pos, pcs, ir.NewInt(int64(i)))))
  75. init = append(init, r)
  76. }
  77. }
  78. // 如果发送chan和接收chan的个数不等于ncas,直接报错
  79. if nsends+nrecvs != ncas {
  80. base.Fatalf("walkSelectCases: miscount: %v + %v != %v", nsends, nrecvs, ncas)
  81. }
  82. // run the select 开始执行select动作
  83. base.Pos = sellineno
  84. // 定义chosen, recvOK作为selectgo()函数的两个返回值
  85. // chosen 表示被选中的case的索引,recvOK表示对于接收操作,是否成功接收
  86. chosen := typecheck.Temp(types.Types[types.TINT])
  87. recvOK := typecheck.Temp(types.Types[types.TBOOL])
  88. r := ir.NewAssignListStmt(base.Pos, ir.OAS2, nil, nil)
  89. r.Lhs = []ir.Node{chosen, recvOK}
  90. // 调用runtime.selectgo()函数作为运行时实际执行多case的select动作的函数
  91. fn := typecheck.LookupRuntime("selectgo")
  92. var fnInit ir.Nodes
  93. r.Rhs = []ir.Node{mkcall1(fn, fn.Type().Results(), &fnInit, bytePtrToIndex(selv, 0), bytePtrToIndex(order, 0), pc0, ir.NewInt(int64(nsends)), ir.NewInt(int64(nrecvs)), ir.NewBool(dflt == nil))}
  94. init = append(init, fnInit...)
  95. init = append(init, typecheck.Stmt(r))
  96. // selv and order are no longer alive after selectgo.
  97. // 执行完selectgo()函数后,销毁selv和order数组.
  98. init = append(init, ir.NewUnaryExpr(base.Pos, ir.OVARKILL, selv))
  99. init = append(init, ir.NewUnaryExpr(base.Pos, ir.OVARKILL, order))
  100. if base.Flag.Race {
  101. init = append(init, ir.NewUnaryExpr(base.Pos, ir.OVARKILL, pcs))
  102. }
  103. // dispatch cases
  104. //定义一个函数,根据chosen确定的case分支生成if语句,执行该分支的语句
  105. dispatch := func(cond ir.Node, cas *ir.CommClause) {
  106. cond = typecheck.Expr(cond)
  107. cond = typecheck.DefaultLit(cond, nil)
  108. r := ir.NewIfStmt(base.Pos, cond, nil, nil)
  109. if n := cas.Comm; n != nil && n.Op() == ir.OSELRECV2 {
  110. n := n.(*ir.AssignListStmt)
  111. if !ir.IsBlank(n.Lhs[1]) {
  112. x := ir.NewAssignStmt(base.Pos, n.Lhs[1], recvOK)
  113. r.Body.Append(typecheck.Stmt(x))
  114. }
  115. }
  116. r.Body.Append(cas.Body.Take()...)
  117. r.Body.Append(ir.NewBranchStmt(base.Pos, ir.OBREAK, nil))
  118. init = append(init, r)
  119. }
  120. // 如果多case中有default分支,并且chosen小于0,执行该default分支
  121. if dflt != nil {
  122. ir.SetPos(dflt)
  123. dispatch(ir.NewBinaryExpr(base.Pos, ir.OLT, chosen, ir.NewInt(0)), dflt)
  124. }
  125. // 如果有chosen选中的case分支,即chosen等于i,则执行该分支
  126. for i, cas := range casorder {
  127. ir.SetPos(cas)
  128. dispatch(ir.NewBinaryExpr(base.Pos, ir.OEQ, chosen, ir.NewInt(int64(i))), cas)
  129. }
  130. return init
  131. }

从上面代码可以看出:

1- 初始化过程: 生成scase数组,定义selv 存放scase数组内存地址,定义order 来给scase排序

2- 遍历所有的case ,将case放到带执行列表(不包括default)

3- 调用runtime。selectgo并将selv和order作为入参传入selectgo

4- 根据selectgo返回的chosen来生成if语句,执行对应的case

解锁加锁

加锁的顺序和解锁的顺序相反。

  1. func sellock(scases []scase, lockorder []uint16) {
  2. var c *hchan
  3. for _, o := range lockorder {
  4. c0 := scases[o].c
  5. if c0 != c {
  6. c = c0
  7. lock(&c.lock)
  8. }
  9. }
  10. }
  11. func selunlock(scases []scase, lockorder []uint16) {
  12. // 我们必须非常小心,在解锁最后一把锁后不要触摸sel,因为sel可以在最后一次解锁后立即释放。
  13. //考虑以下情况。第一个M调用runtime·park()在runtime·selectgo()中传递sel。
  14. //一旦runtime·park()解锁了最后一个锁,另一个M会使调用select的G再次可运行,
  15. //并安排其执行。当G在另一个M上运行时,它锁定所有锁并释放sel。现在,如果第一个M触摸sel,它将访问释放的内存。
  16. for i := len(lockorder) - 1; i >= 0; i-- {
  17. c := scases[lockorder[i]].c
  18. if i > 0 && c == scases[lockorder[i-1]].c {
  19. continue // will unlock it on the next iteration
  20. }
  21. unlock(&c.lock)
  22. }
  23. }

selectgo

selectgo 处理逻辑

  1. // cas0指向[ncases]scase类型的数组,order0指向[2*ncases]uint16类型的数组(其中ncases必须<=65536)。
  2. // 返回值有两个, chosen 和 recvOK,分别表示选中的case的序号,和对接收操作是否接收成功的布尔值
  3. func selectgo(cas0 *scase, order0 *uint16, pc0 *uintptr, nsends, nrecvs int, block bool) (int, bool) {
  4. if debugSelect {
  5. print("select: cas0=", cas0, "\n")
  6. }
  7. //==== 执行必要的初始化操作,并生成处理case的两种顺序:轮询顺序polIorder和加锁顺序lockorder。
  8. // 为了将scase分配到栈上,这里直接给cas1分配了64KB大小的数组,同理, 给order1分配了128KB大小的数组
  9. // NOTE: In order to maintain a lean stack size, the number of scases
  10. // is capped at 65536.
  11. cas1 := (*[1 << 16]scase)(unsafe.Pointer(cas0))
  12. order1 := (*[1 << 17]uint16)(unsafe.Pointer(order0))
  13. // ncases个数 = 发送chan个数+ 接收chan个数
  14. ncases := nsends + nrecvs
  15. // scases是cas1数组的前ncases个元素
  16. scases := cas1[:ncases:ncases]
  17. // 顺序列表pollorder是order1的0- ncases个元素
  18. pollorder := order1[:ncases:ncases]
  19. // 加锁列表lockorder是order1的ncase到 2 ncases 个元素
  20. lockorder := order1[ncases:][:ncases:ncases]
  21. // NOTE: 编译器初始化的pollorder/lockorder的基础数组不是零。
  22. // Even when raceenabled is true, there might be select
  23. // statements in packages compiled without -race (e.g.,
  24. // ensureSigM in runtime/signal_unix.go).
  25. var pcs []uintptr
  26. if raceenabled && pc0 != nil {
  27. pc1 := (*[1 << 16]uintptr)(unsafe.Pointer(pc0))
  28. pcs = pc1[:ncases:ncases]
  29. }
  30. casePC := func(casi int) uintptr {
  31. if pcs == nil {
  32. return 0
  33. }
  34. return pcs[casi]
  35. }
  36. var t0 int64
  37. if blockprofilerate > 0 {
  38. t0 = cputicks()
  39. }
  40. // 生成排列顺序
  41. norder := 0
  42. for i := range scases {
  43. cas := &scases[i]
  44. // Omit cases without channels from the poll and lock orders.
  45. // 处理case中channel为空的情况
  46. if cas.c == nil {
  47. cas.elem = nil // 便于GC
  48. continue
  49. }
  50. // 通过fastrandn函数引入随机性,确定pollorder列表中case的随机顺序索引
  51. j := fastrandn(uint32(norder + 1))
  52. pollorder[norder] = pollorder[j]
  53. pollorder[j] = uint16(i)
  54. norder++
  55. }
  56. // 重新生成列表
  57. pollorder = pollorder[:norder]
  58. lockorder = lockorder[:norder]
  59. // 根据chan地址确定lockorder加锁排序列表的顺序
  60. // 简单的堆排序,以保证nlogn时间复杂度完成排序
  61. for i := range lockorder {
  62. j := i
  63. // 从轮询顺序开始,在同一channel上排序。
  64. c := scases[pollorder[i]].c
  65. for j > 0 && scases[lockorder[(j-1)/2]].c.sortkey() < c.sortkey() {
  66. k := (j - 1) / 2
  67. lockorder[j] = lockorder[k]
  68. j = k
  69. }
  70. lockorder[j] = pollorder[i]
  71. }
  72. for i := len(lockorder) - 1; i >= 0; i-- {
  73. o := lockorder[i]
  74. c := scases[o].c
  75. lockorder[i] = lockorder[0]
  76. j := 0
  77. for {
  78. k := j*2 + 1
  79. if k >= i {
  80. break
  81. }
  82. if k+1 < i && scases[lockorder[k]].c.sortkey() < scases[lockorder[k+1]].c.sortkey() {
  83. k++
  84. }
  85. if c.sortkey() < scases[lockorder[k]].c.sortkey() {
  86. lockorder[j] = lockorder[k]
  87. j = k
  88. continue
  89. }
  90. break
  91. }
  92. lockorder[j] = o
  93. }
  94. if debugSelect {
  95. for i := 0; i+1 < len(lockorder); i++ {
  96. if scases[lockorder[i]].c.sortkey() > scases[lockorder[i+1]].c.sortkey() {
  97. print("i=", i, " x=", lockorder[i], " y=", lockorder[i+1], "\n")
  98. throw("select: broken sort")
  99. }
  100. }
  101. }
  102. // 锁定select中涉及的所有channel
  103. sellock(scases, lockorder)
  104. var (
  105. gp *g
  106. sg *sudog
  107. c *hchan
  108. k *scase
  109. sglist *sudog
  110. sgnext *sudog
  111. qp unsafe.Pointer
  112. nextp **sudog
  113. )
  114. // === pass 1 - 查找可以等待处理的channel
  115. var casi int
  116. var cas *scase
  117. var caseSuccess bool
  118. var caseReleaseTime int64 = -1
  119. var recvOK bool
  120. for _, casei := range pollorder {
  121. casi = int(casei) // case的索引
  122. cas = &scases[casi]
  123. c = cas.c
  124. if casi >= nsends { // 处理接收channel的case
  125. sg = c.sendq.dequeue()
  126. if sg != nil {
  127. // 如果当前channel的sendq上有等待的goroutine,
  128. // 跳到recv代码 并从缓冲区读取数据后将等待goroutine中的数据放入到缓冲区中相同的位置
  129. goto recv
  130. }
  131. if c.qcount > 0 {
  132. //如果当前channel的缓冲区不为空,就会跳到bufrecv标签处从缓冲区获取数据;
  133. goto bufrecv
  134. }
  135. if c.closed != 0 {
  136. //如果当前channel已经被关闭,就会跳到rclose读取末尾数据和收尾工作;
  137. goto rclose
  138. }
  139. } else { // 处理发送channel的case
  140. if raceenabled {
  141. racereadpc(c.raceaddr(), casePC(casi), chansendpc)
  142. }
  143. if c.closed != 0 {
  144. // 如果当前channel已经被关闭就会直接跳到sclose标签(panic中止程序)
  145. goto sclose
  146. }
  147. sg = c.recvq.dequeue()
  148. if sg != nil {
  149. // 如果当前channel的recvq上有等待的goroutine,就会跳到 send标签向channel发送数据;
  150. goto send
  151. }
  152. if c.qcount < c.dataqsiz {
  153. // 如果当前channel的缓冲区存在空闲位置,就会将待发送的数据存入缓冲区;
  154. goto bufsend
  155. }
  156. }
  157. }
  158. if !block { // 如果是非阻塞,即包含default分支,解锁所有channel并返回
  159. selunlock(scases, lockorder)
  160. casi = -1
  161. goto retc
  162. }
  163. // === pass 2 - 将当前goroutine根据需要挂在chan的sendq或recvq上
  164. gp = getg() // 获取当前的groutine
  165. if gp.waiting != nil {
  166. throw("gp.waiting != nil")
  167. }
  168. nextp = &gp.waiting // 正在等待的sudog结构;按锁定顺序
  169. for _, casei := range lockorder {
  170. casi = int(casei)
  171. cas = &scases[casi]
  172. c = cas.c
  173. sg := acquireSudog()
  174. // 获取sudog,将当前goroutine绑定到sudog上
  175. sg.g = gp
  176. sg.isSelect = true
  177. // 在分配elem和在gp.waiting上排队sg之间没有堆栈分割,copystack可以找到它。
  178. sg.elem = cas.elem
  179. sg.releasetime = 0
  180. if t0 != 0 {
  181. sg.releasetime = -1
  182. }
  183. sg.c = c
  184. // 按锁定顺序构建waiting list 。
  185. *nextp = sg
  186. nextp = &sg.waitlink
  187. // 加入相应等待队列
  188. if casi < nsends {
  189. c.sendq.enqueue(sg)
  190. } else {
  191. c.recvq.enqueue(sg)
  192. }
  193. }
  194. // 被唤醒后会根据 param 来判断是否是由 close 操作唤醒的,所以先置为 nil
  195. gp.param = nil
  196. // Signal to anyone trying to shrink our stack that we're about
  197. // to park on a channel. The window between when this G's status
  198. // changes and when we set gp.activeStackChans is not safe for
  199. // stack shrinking.
  200. atomic.Store8(&gp.parkingOnChan, 1)
  201. // 挂起当前goroutine
  202. gopark(selparkcommit, nil, waitReasonSelect, traceEvGoBlockSelect, 1)
  203. gp.activeStackChans = false
  204. // 加锁所有的channel
  205. sellock(scases, lockorder)
  206. gp.selectDone = 0
  207. sg = (*sudog)(gp.param)
  208. // param 存放唤醒 goroutine 的 sudog,如果是关闭操作唤醒的,那么就为 nil
  209. gp.param = nil
  210. // === pass 3 - 当前 Goroutine 被唤醒之后找到满足条件的 Channel 并进行处理
  211. //dequeue from unsuccessful chans
  212. // otherwise they stack up on quiet channels
  213. // record the successful case, if any.
  214. // We singly-linked up the SudoGs in lock order.
  215. // 从不成功的通道中退出队列,否则它们会堆积在安静的通道上,记录成功的案例(如果有的话)。我们单独将SudoG按锁定顺序连接起来。
  216. casi = -1
  217. cas = nil
  218. caseSuccess = false
  219. // 当前goroutine 的 waiting 链表按照lockorder顺序存放着case的sudog
  220. sglist = gp.waiting
  221. // 在从 gp.waiting 取消case的sudog链接之前清除所有元素,便于GC
  222. for sg1 := gp.waiting; sg1 != nil; sg1 = sg1.waitlink {
  223. sg1.isSelect = false
  224. sg1.elem = nil
  225. sg1.c = nil
  226. }
  227. // 清楚当前goroutine的waiting链表,因为被sg代表的协程唤醒了
  228. gp.waiting = nil
  229. for _, casei := range lockorder {
  230. k = &scases[casei]
  231. // 如果相等说明,goroutine是被当前case的channel收发操作唤醒的
  232. if sg == sglist {
  233. // sg唤醒了当前goroutine, 则当前G已经从sg的队列中出队,这里不需要再次出队
  234. casi = int(casei)
  235. cas = k
  236. caseSuccess = sglist.success
  237. if sglist.releasetime > 0 {
  238. caseReleaseTime = sglist.releasetime
  239. }
  240. } else {
  241. // 不是此case唤醒当前goroutine, 将goroutine从case对应的队列(发送或接收)出队
  242. c = k.c
  243. if int(casei) < nsends {
  244. c.sendq.dequeueSudoG(sglist)
  245. } else {
  246. c.recvq.dequeueSudoG(sglist)
  247. }
  248. }
  249. // 释放当前case的sudog,然后处理下一个case的sudog
  250. sgnext = sglist.waitlink
  251. sglist.waitlink = nil
  252. releaseSudog(sglist)
  253. sglist = sgnext
  254. }
  255. if cas == nil {
  256. throw("selectgo: bad wakeup")
  257. }
  258. c = cas.c
  259. if debugSelect {
  260. print("wait-return: cas0=", cas0, " c=", c, " cas=", cas, " send=", casi < nsends, "\n")
  261. }
  262. if casi < nsends {
  263. if !caseSuccess {
  264. goto sclose
  265. }
  266. } else {
  267. recvOK = caseSuccess
  268. }
  269. if raceenabled {
  270. if casi < nsends {
  271. raceReadObjectPC(c.elemtype, cas.elem, casePC(casi), chansendpc)
  272. } else if cas.elem != nil {
  273. raceWriteObjectPC(c.elemtype, cas.elem, casePC(casi), chanrecvpc)
  274. }
  275. }
  276. if msanenabled {
  277. if casi < nsends {
  278. msanread(cas.elem, c.elemtype.size)
  279. } else if cas.elem != nil {
  280. msanwrite(cas.elem, c.elemtype.size)
  281. }
  282. }
  283. if asanenabled {
  284. if casi < nsends {
  285. asanread(cas.elem, c.elemtype.size)
  286. } else if cas.elem != nil {
  287. asanwrite(cas.elem, c.elemtype.size)
  288. }
  289. }
  290. selunlock(scases, lockorder)
  291. goto retc
  292. bufrecv:
  293. // 能从buffer获取数据
  294. if raceenabled {
  295. if cas.elem != nil {
  296. raceWriteObjectPC(c.elemtype, cas.elem, casePC(casi), chanrecvpc)
  297. }
  298. racenotify(c, c.recvx, nil)
  299. }
  300. if msanenabled && cas.elem != nil {
  301. msanwrite(cas.elem, c.elemtype.size)
  302. }
  303. if asanenabled && cas.elem != nil {
  304. asanwrite(cas.elem, c.elemtype.size)
  305. }
  306. recvOK = true
  307. qp = chanbuf(c, c.recvx)
  308. if cas.elem != nil {
  309. typedmemmove(c.elemtype, cas.elem, qp)
  310. }
  311. typedmemclr(c.elemtype, qp)
  312. c.recvx++
  313. if c.recvx == c.dataqsiz {
  314. c.recvx = 0
  315. }
  316. c.qcount--
  317. selunlock(scases, lockorder)
  318. goto retc
  319. bufsend:
  320. // 发送数据到缓存
  321. if raceenabled {
  322. racenotify(c, c.sendx, nil)
  323. raceReadObjectPC(c.elemtype, cas.elem, casePC(casi), chansendpc)
  324. }
  325. if msanenabled {
  326. msanread(cas.elem, c.elemtype.size)
  327. }
  328. if asanenabled {
  329. asanread(cas.elem, c.elemtype.size)
  330. }
  331. typedmemmove(c.elemtype, chanbuf(c, c.sendx), cas.elem)
  332. c.sendx++
  333. if c.sendx == c.dataqsiz {
  334. c.sendx = 0
  335. }
  336. c.qcount++
  337. selunlock(scases, lockorder)
  338. goto retc
  339. recv:
  340. // 从休眠sender(sg)接收
  341. recv(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2)
  342. if debugSelect {
  343. print("syncrecv: cas0=", cas0, " c=", c, "\n")
  344. }
  345. recvOK = true
  346. goto retc
  347. rclose:
  348. // 读取结束的channel
  349. selunlock(scases, lockorder)
  350. recvOK = false
  351. if cas.elem != nil {
  352. typedmemclr(c.elemtype, cas.elem)
  353. }
  354. if raceenabled {
  355. raceacquire(c.raceaddr())
  356. }
  357. goto retc
  358. send:
  359. // 想休眠的接收房发送数据
  360. if raceenabled {
  361. raceReadObjectPC(c.elemtype, cas.elem, casePC(casi), chansendpc)
  362. }
  363. if msanenabled {
  364. msanread(cas.elem, c.elemtype.size)
  365. }
  366. if asanenabled {
  367. asanread(cas.elem, c.elemtype.size)
  368. }
  369. send(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2)
  370. if debugSelect {
  371. print("syncsend: cas0=", cas0, " c=", c, "\n")
  372. }
  373. goto retc
  374. retc:
  375. if caseReleaseTime > 0 {
  376. blockevent(caseReleaseTime-t0, 1)
  377. }
  378. return casi, recvOK
  379. sclose:
  380. // 向关闭的channel发送数据
  381. selunlock(scases, lockorder)
  382. panic(plainError("send on closed channel"))
  383. }

总结

简单总结下select对case处理逻辑:

1- 空的case 会被golang监听到无法唤醒的协程,会panic

2- 如果只有一个case, 根据操作类型转换成 <- ch 或 成ch <- () (会跳用channel 的 chansend , chanrecv)

3- 如果一个default 一个非default 的case,非default会走 selectnbsend 和 selectnbrecv 非阻塞的方法(最后转换成if-else 语句)

4- 多个case 的情况下, cmd/compile/internal/walk/select.go 优化程序中:

4.1 对 scase 数组, selv ,order数组初始化,将case放在带执行列表中

4.2 调用selectgo函数,根据返回的chosen 结果来生成if语句,执行对应的case

selectgo 函数:

1- 随机生成一个便利case 的 轮询 poollorder, 根据channel 地址生成一个枷锁顺序的lockorder。(随机顺序保证公平性,加锁顺序能够避免思索)

2- 根据pollorder顺序查找cases是否包含立即处理的chan, 如果有就处理。没有处理的话,创建 sudo 结构,将当前的G 加入各case的channel 对应的 接收发送队列,等待其他G唤醒

3- 当调度器 唤醒当前的G,会按照lockorder ,访问所有的case。从中找到需要处理的case进行读写处理,同时从所有的case 的发送姐搜队列中移除当前的

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Li_阴宅/article/detail/770693
推荐阅读
相关标签
  

闽ICP备14008679号